Processing dead-letter events from Genesys Cloud EventBridge subscriptions using a Go consumer
What You Will Build
- A Go HTTP consumer that receives EventBridge dead-letter events, parses retry metadata, reconstructs original payloads, and re-submits them to the target API with incremented attempt counters.
- This implementation uses the Genesys Cloud REST API and the official Go SDK for authentication and subscription validation.
- The tutorial covers Go 1.21+ with standard library HTTP handling, JSON parsing, and production-grade retry logic.
Prerequisites
- OAuth 2.0 Client Credentials grant type with scopes:
eventstreams:read,eventstreams:write - Genesys Cloud Platform Client SDK for Go version 125+ (
github.com/mypurecloud/platform-client-sdk-go/v125) - Go runtime 1.21 or higher
- External dependencies: none beyond standard library and the official SDK. Install SDK via
go get github.com/mypurecloud/platform-client-sdk-go/v125
Authentication Setup
Genesys Cloud APIs require a bearer token obtained through the client credentials flow. The consumer must cache the token and handle expiration before making API calls or validating subscriptions.
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
)
// OAuthConfig holds client credentials and token endpoint
type OAuthConfig struct {
ClientID string
ClientSecret string
Environment string // e.g., "mypurecloud.com"
}
// TokenResponse matches the Genesys Cloud OAuth2 response structure
type TokenResponse struct {
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
TokenType string `json:"token_type"`
}
type TokenCache struct {
mu sync.RWMutex
token string
expiresAt time.Time
}
func NewTokenCache() *TokenCache {
return &TokenCache{
expiresAt: time.Time{},
}
}
func (tc *TokenCache) GetValidToken() (string, error) {
tc.mu.RLock()
defer tc.mu.RUnlock()
if time.Now().Before(tc.expiresAt) {
return tc.token, nil
}
return "", fmt.Errorf("token expired or not initialized")
}
func (tc *TokenCache) SetToken(token string, expiresIn int) {
tc.mu.Lock()
defer tc.mu.Unlock()
tc.token = token
tc.expiresAt = time.Now().Add(time.Duration(expiresIn-60) * time.Second)
}
// FetchOAuthToken retrieves a new bearer token from Genesys Cloud
// Required scope: eventstreams:read, eventstreams:write
func FetchOAuthToken(cfg OAuthConfig) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
payload := fmt.Sprintf(
"client_id=%s&client_secret=%s&grant_type=client_credentials&scope=eventstreams:read+eventstreams:write",
cfg.ClientID, cfg.ClientSecret,
)
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
fmt.Sprintf("https://api.%s/api/v2/oauth2/token", cfg.Environment),
nil,
)
if err != nil {
return "", fmt.Errorf("failed to create oauth request: %w", err)
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.SetBasicAuth(cfg.ClientID, cfg.ClientSecret)
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("oauth request failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("oauth token request returned %d", resp.StatusCode)
}
var tokenResp TokenResponse
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
return "", fmt.Errorf("failed to decode oauth response: %w", err)
}
return tokenResp.AccessToken, nil
}
The token cache subtracts 60 seconds from the expiration window to prevent race conditions during concurrent requests. Every external API call must check the cache first and refresh if expired.
Implementation
Step 1: Parse incoming dead-letter events and validate subscription
EventBridge dead-letter events arrive as POST requests to your subscription endpoint. The payload wraps the original event with retry metadata. The consumer must unmarshal the wrapper, extract the original payload, and verify the subscription exists in Genesys Cloud.
package main
import (
"encoding/json"
"fmt"
"time"
"github.com/mypurecloud/platform-client-sdk-go/v125/platformclientv2"
)
// DeadLetterEvent matches Genesys Cloud EventBridge dead-letter structure
type DeadLetterEvent struct {
EventType string `json:"eventType"`
SubscriptionID string `json:"subscriptionId"`
RetryCount int `json:"retryCount"`
FailureReason string `json:"failureReason"`
Timestamp string `json:"timestamp"`
OriginalEvent json.RawMessage `json:"originalEvent"`
}
// ReconstructOriginalPayload extracts and validates the original event data
func ReconstructOriginalPayload(deadLetter DeadLetterEvent) ([]byte, error) {
if len(deadLetter.OriginalEvent) == 0 {
return nil, fmt.Errorf("originalEvent field is empty in dead-letter payload")
}
// Validate that the original event is valid JSON
var raw interface{}
if err := json.Unmarshal(deadLetter.OriginalEvent, &raw); err != nil {
return nil, fmt.Errorf("invalid JSON in originalEvent: %w", err)
}
return deadLetter.OriginalEvent, nil
}
// ValidateSubscription checks if the subscription exists and is active
// Required scope: eventstreams:read
func ValidateSubscription(apiClient *platformclientv2.ApiClient, subscriptionID string) error {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
config, err := apiClient.GetConfiguration()
if err != nil {
return fmt.Errorf("failed to get SDK configuration: %w", err)
}
api := platformclientv2.NewEventstreamsApi(apiClient)
resp, httpResp, err := api.GetEventstreamsSubscription(ctx, subscriptionID)
if httpResp != nil {
defer httpResp.Body.Close()
}
if err != nil {
return fmt.Errorf("subscription validation failed: %w", err)
}
if resp.StatusCode() != nil && *resp.StatusCode() != "ACTIVE" {
return fmt.Errorf("subscription %s is not active: %s", subscriptionID, *resp.StatusCode())
}
return nil
}
The SDK call api.GetEventstreamsSubscription maps directly to GET /api/v2/eventstreams/subscriptions/{id}. The response includes subscription status, target URL, and filtering rules. Validation prevents processing dead-letters from deleted or disabled subscriptions.
Step 2: Re-submit reconstructed payloads with incremented attempt counters
After extraction, the consumer POSTs the original payload to the target endpoint. The attempt counter increments from the dead-letter retry count. The HTTP client implements exponential backoff for 429 responses and retries up to three times.
package main
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"
)
// RetryConfig defines backoff parameters for 429 handling
type RetryConfig struct {
MaxRetries int
BaseDelay time.Duration
MaxDelay time.Duration
}
// ResubmitPayload posts the reconstructed event to the target API
// Returns the HTTP response status and body for logging
func ResubmitPayload(client *http.Client, targetURL string, payload []byte, attemptCount int, retryCfg RetryConfig) (int, []byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
currentAttempt := attemptCount
delay := retryCfg.BaseDelay
for i := 0; i <= retryCfg.MaxRetries; i++ {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, targetURL, bytes.NewReader(payload))
if err != nil {
return 0, nil, fmt.Errorf("failed to create resubmit request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Attempt-Count", fmt.Sprintf("%d", currentAttempt))
req.Header.Set("X-Resubmit-Source", "dead-letter-consumer")
resp, err := client.Do(req)
if err != nil {
return 0, nil, fmt.Errorf("resubmit request failed: %w", err)
}
respBody, _ := io.ReadAll(resp.Body)
resp.Body.Close()
if resp.StatusCode == http.StatusTooManyRequests {
if i == retryCfg.MaxRetries {
return resp.StatusCode, respBody, fmt.Errorf("max retries exceeded after 429 responses")
}
currentAttempt++
time.Sleep(delay)
delay = min(delay * 2, retryCfg.MaxDelay)
continue
}
if resp.StatusCode >= 500 {
if i == retryCfg.MaxRetries {
return resp.StatusCode, respBody, fmt.Errorf("server error after retries: %d", resp.StatusCode)
}
currentAttempt++
time.Sleep(delay)
delay = min(delay * 2, retryCfg.MaxDelay)
continue
}
return resp.StatusCode, respBody, nil
}
return 0, nil, fmt.Errorf("unexpected retry loop termination")
}
func min(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}
The X-Attempt-Count header allows downstream systems to track processing history. The retry loop handles 429 and 5xx responses with exponential backoff capped at MaxDelay. Successful responses (2xx) exit immediately. The function returns the final status code and response body for audit logging.
Step 3: Wire the HTTP handler and process events end-to-end
The HTTP handler ties authentication, parsing, validation, and resubmission together. It enforces request size limits, validates JSON structure, and logs outcomes for observability.
package main
import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
)
type ConsumerConfig struct {
OAuth OAuthConfig
TargetURL string
RetryConfig RetryConfig
MaxBodySize int64
}
func DeadLetterHandler(cfg ConsumerConfig, tokenCache *TokenCache, sdkClient *platformclientv2.ApiClient) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
r.Body = http.MaxBytesReader(w, r.Body, cfg.MaxBodySize)
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read request body", http.StatusBadRequest)
return
}
var deadLetter DeadLetterEvent
if err := json.Unmarshal(body, &deadLetter); err != nil {
http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
return
}
if deadLetter.EventType != "dead-letter" {
http.Error(w, "Unsupported event type", http.StatusBadRequest)
return
}
payload, err := ReconstructOriginalPayload(deadLetter)
if err != nil {
log.Printf("Payload reconstruction failed: %v", err)
http.Error(w, "Internal processing error", http.StatusInternalServerError)
return
}
// Refresh token if expired
token, err := tokenCache.GetValidToken()
if err != nil {
newToken, fetchErr := FetchOAuthToken(cfg.OAuth)
if fetchErr != nil {
log.Printf("Token refresh failed: %v", fetchErr)
http.Error(w, "Authentication failed", http.StatusUnauthorized)
return
}
tokenCache.SetToken(newToken, 3600)
token = newToken
}
// Validate subscription exists
if err := ValidateSubscription(sdkClient, deadLetter.SubscriptionID); err != nil {
log.Printf("Subscription validation failed: %v", err)
http.Error(w, "Subscription invalid", http.StatusForbidden)
return
}
httpClient := &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
MaxIdleConns: 10,
IdleConnTimeout: 90 * time.Second,
},
}
statusCode, respBody, err := ResubmitPayload(
httpClient,
cfg.TargetURL,
payload,
deadLetter.RetryCount+1,
cfg.RetryConfig,
)
if err != nil {
log.Printf("Resubmission failed [%d]: %v | Response: %s", statusCode, err, string(respBody))
http.Error(w, "Resubmission failed", http.StatusBadGateway)
return
}
log.Printf("Successfully resubmitted event %s | Attempt: %d | Status: %d",
deadLetter.SubscriptionID, deadLetter.RetryCount+1, statusCode)
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status":"processed","attempt":` + fmt.Sprintf("%d", deadLetter.RetryCount+1) + `}`))
}
}
The handler enforces a MaxBodySize limit to prevent memory exhaustion. It refreshes the OAuth token before API calls. Validation ensures the subscription remains active. The resubmission result determines the HTTP response status. Successful processing returns 200 with a JSON acknowledgment.
Complete Working Example
package main
import (
"log"
"net/http"
"os"
"time"
"github.com/mypurecloud/platform-client-sdk-go/v125/platformclientv2"
)
func main() {
clientID := os.Getenv("GENESYS_CLIENT_ID")
clientSecret := os.Getenv("GENESYS_CLIENT_SECRET")
environment := os.Getenv("GENESYS_ENVIRONMENT")
targetURL := os.Getenv("TARGET_API_URL")
if clientID == "" || clientSecret == "" || targetURL == "" {
log.Fatal("Required environment variables missing")
}
cfg := ConsumerConfig{
OAuth: OAuthConfig{
ClientID: clientID,
ClientSecret: clientSecret,
Environment: environment,
},
TargetURL: targetURL,
RetryConfig: RetryConfig{
MaxRetries: 3,
BaseDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
},
MaxBodySize: 1024 * 1024, // 1MB
}
tokenCache := NewTokenCache()
// Initial token fetch
initialToken, err := FetchOAuthToken(cfg.OAuth)
if err != nil {
log.Fatalf("Failed to fetch initial token: %v", err)
}
tokenCache.SetToken(initialToken, 3600)
// Initialize SDK client
config, err := platformclientv2.NewConfiguration()
if err != nil {
log.Fatalf("SDK configuration error: %v", err)
}
config.SetBasePath("https://api." + cfg.OAuth.Environment)
config.SetAccessToken(initialToken)
sdkClient, err := platformclientv2.NewApiClient(config)
if err != nil {
log.Fatalf("Failed to create SDK client: %v", err)
}
mux := http.NewServeMux()
mux.HandleFunc("/dead-letter", DeadLetterHandler(cfg, tokenCache, sdkClient))
server := &http.Server{
Addr: ":8080",
Handler: mux,
ReadTimeout: 15 * time.Second,
WriteTimeout: 15 * time.Second,
IdleTimeout: 60 * time.Second,
}
log.Printf("Dead-letter consumer listening on :8080/dead-letter")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Server failed: %v", err)
}
}
Run with environment variables set. The server binds to port 8080 and accepts POST requests at /dead-letter. Configure your Genesys Cloud EventBridge subscription to point to https://your-host:8080/dead-letter.
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: OAuth token expired, invalid client credentials, or missing
eventstreams:readscope. - Fix: Verify environment variables contain valid credentials. Check the token cache expiration logic. Ensure the OAuth client in Genesys Cloud has the
eventstreams:readandeventstreams:writescopes assigned. - Code fix: The
DeadLetterHandleralready refreshes tokens on expiration. Add logging aroundFetchOAuthTokento capture credential mismatches.
Error: 403 Forbidden
- Cause: Subscription ID in the dead-letter payload does not exist, is disabled, or the OAuth client lacks permission to read it.
- Fix: Validate the subscription ID matches an active EventBridge subscription. Confirm the OAuth client has
eventstreams:read. Check subscription status viaGET /api/v2/eventstreams/subscriptions/{id}. - Code fix: The
ValidateSubscriptionfunction returns a descriptive error. Log the subscription ID and status code from the SDK response to identify disabled subscriptions.
Error: 429 Too Many Requests
- Cause: Target API enforces rate limits, or Genesys Cloud API throttles subscription validation calls.
- Fix: The
ResubmitPayloadfunction implements exponential backoff up toMaxRetries. IncreaseBaseDelayorMaxRetriesif the target system has stricter limits. MonitorX-RateLimit-Resetheaders if provided by the target API. - Code fix: Adjust
RetryConfigvalues inmain(). Add header inspection inResubmitPayloadto parseRetry-Afterif present.
Error: JSON Unmarshal Failure
- Cause: Malformed dead-letter payload, missing
originalEventfield, or nested JSON encoding issues. - Fix: Verify the EventBridge subscription sends raw JSON. Ensure the consumer reads the full body. Use
json.RawMessageto preserve nested structure without double-encoding. - Code fix: The
ReconstructOriginalPayloadfunction validates JSON structure. Add a fallback logger that writes the raw payload to disk for manual inspection when unmarshaling fails.