Processing dead-letter events from Genesys Cloud EventBridge subscriptions using a Go consumer

Processing dead-letter events from Genesys Cloud EventBridge subscriptions using a Go consumer

What You Will Build

  • A Go HTTP consumer that receives EventBridge dead-letter events, parses retry metadata, reconstructs original payloads, and re-submits them to the target API with incremented attempt counters.
  • This implementation uses the Genesys Cloud REST API and the official Go SDK for authentication and subscription validation.
  • The tutorial covers Go 1.21+ with standard library HTTP handling, JSON parsing, and production-grade retry logic.

Prerequisites

  • OAuth 2.0 Client Credentials grant type with scopes: eventstreams:read, eventstreams:write
  • Genesys Cloud Platform Client SDK for Go version 125+ (github.com/mypurecloud/platform-client-sdk-go/v125)
  • Go runtime 1.21 or higher
  • External dependencies: none beyond standard library and the official SDK. Install SDK via go get github.com/mypurecloud/platform-client-sdk-go/v125

Authentication Setup

Genesys Cloud APIs require a bearer token obtained through the client credentials flow. The consumer must cache the token and handle expiration before making API calls or validating subscriptions.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"
)

// OAuthConfig holds client credentials and token endpoint
type OAuthConfig struct {
	ClientID     string
	ClientSecret string
	Environment  string // e.g., "mypurecloud.com"
}

// TokenResponse matches the Genesys Cloud OAuth2 response structure
type TokenResponse struct {
	AccessToken string `json:"access_token"`
	ExpiresIn   int    `json:"expires_in"`
	TokenType   string `json:"token_type"`
}

type TokenCache struct {
	mu        sync.RWMutex
	token     string
	expiresAt time.Time
}

func NewTokenCache() *TokenCache {
	return &TokenCache{
		expiresAt: time.Time{},
	}
}

func (tc *TokenCache) GetValidToken() (string, error) {
	tc.mu.RLock()
	defer tc.mu.RUnlock()
	if time.Now().Before(tc.expiresAt) {
		return tc.token, nil
	}
	return "", fmt.Errorf("token expired or not initialized")
}

func (tc *TokenCache) SetToken(token string, expiresIn int) {
	tc.mu.Lock()
	defer tc.mu.Unlock()
	tc.token = token
	tc.expiresAt = time.Now().Add(time.Duration(expiresIn-60) * time.Second)
}

// FetchOAuthToken retrieves a new bearer token from Genesys Cloud
// Required scope: eventstreams:read, eventstreams:write
func FetchOAuthToken(cfg OAuthConfig) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	payload := fmt.Sprintf(
		"client_id=%s&client_secret=%s&grant_type=client_credentials&scope=eventstreams:read+eventstreams:write",
		cfg.ClientID, cfg.ClientSecret,
	)

	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		fmt.Sprintf("https://api.%s/api/v2/oauth2/token", cfg.Environment),
		nil,
	)
	if err != nil {
		return "", fmt.Errorf("failed to create oauth request: %w", err)
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	req.SetBasicAuth(cfg.ClientID, cfg.ClientSecret)

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return "", fmt.Errorf("oauth request failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("oauth token request returned %d", resp.StatusCode)
	}

	var tokenResp TokenResponse
	if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
		return "", fmt.Errorf("failed to decode oauth response: %w", err)
	}

	return tokenResp.AccessToken, nil
}

The token cache subtracts 60 seconds from the expiration window to prevent race conditions during concurrent requests. Every external API call must check the cache first and refresh if expired.

Implementation

Step 1: Parse incoming dead-letter events and validate subscription

EventBridge dead-letter events arrive as POST requests to your subscription endpoint. The payload wraps the original event with retry metadata. The consumer must unmarshal the wrapper, extract the original payload, and verify the subscription exists in Genesys Cloud.

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/mypurecloud/platform-client-sdk-go/v125/platformclientv2"
)

// DeadLetterEvent matches Genesys Cloud EventBridge dead-letter structure
type DeadLetterEvent struct {
	EventType      string          `json:"eventType"`
	SubscriptionID string          `json:"subscriptionId"`
	RetryCount     int             `json:"retryCount"`
	FailureReason  string          `json:"failureReason"`
	Timestamp      string          `json:"timestamp"`
	OriginalEvent  json.RawMessage `json:"originalEvent"`
}

// ReconstructOriginalPayload extracts and validates the original event data
func ReconstructOriginalPayload(deadLetter DeadLetterEvent) ([]byte, error) {
	if len(deadLetter.OriginalEvent) == 0 {
		return nil, fmt.Errorf("originalEvent field is empty in dead-letter payload")
	}

	// Validate that the original event is valid JSON
	var raw interface{}
	if err := json.Unmarshal(deadLetter.OriginalEvent, &raw); err != nil {
		return nil, fmt.Errorf("invalid JSON in originalEvent: %w", err)
	}

	return deadLetter.OriginalEvent, nil
}

// ValidateSubscription checks if the subscription exists and is active
// Required scope: eventstreams:read
func ValidateSubscription(apiClient *platformclientv2.ApiClient, subscriptionID string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()

	config, err := apiClient.GetConfiguration()
	if err != nil {
		return fmt.Errorf("failed to get SDK configuration: %w", err)
	}

	api := platformclientv2.NewEventstreamsApi(apiClient)
	resp, httpResp, err := api.GetEventstreamsSubscription(ctx, subscriptionID)
	if httpResp != nil {
		defer httpResp.Body.Close()
	}

	if err != nil {
		return fmt.Errorf("subscription validation failed: %w", err)
	}

	if resp.StatusCode() != nil && *resp.StatusCode() != "ACTIVE" {
		return fmt.Errorf("subscription %s is not active: %s", subscriptionID, *resp.StatusCode())
	}

	return nil
}

The SDK call api.GetEventstreamsSubscription maps directly to GET /api/v2/eventstreams/subscriptions/{id}. The response includes subscription status, target URL, and filtering rules. Validation prevents processing dead-letters from deleted or disabled subscriptions.

Step 2: Re-submit reconstructed payloads with incremented attempt counters

After extraction, the consumer POSTs the original payload to the target endpoint. The attempt counter increments from the dead-letter retry count. The HTTP client implements exponential backoff for 429 responses and retries up to three times.

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"time"
)

// RetryConfig defines backoff parameters for 429 handling
type RetryConfig struct {
	MaxRetries    int
	BaseDelay     time.Duration
	MaxDelay      time.Duration
}

// ResubmitPayload posts the reconstructed event to the target API
// Returns the HTTP response status and body for logging
func ResubmitPayload(client *http.Client, targetURL string, payload []byte, attemptCount int, retryCfg RetryConfig) (int, []byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	currentAttempt := attemptCount
	delay := retryCfg.BaseDelay

	for i := 0; i <= retryCfg.MaxRetries; i++ {
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, targetURL, bytes.NewReader(payload))
		if err != nil {
			return 0, nil, fmt.Errorf("failed to create resubmit request: %w", err)
		}

		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("X-Attempt-Count", fmt.Sprintf("%d", currentAttempt))
		req.Header.Set("X-Resubmit-Source", "dead-letter-consumer")

		resp, err := client.Do(req)
		if err != nil {
			return 0, nil, fmt.Errorf("resubmit request failed: %w", err)
		}

		respBody, _ := io.ReadAll(resp.Body)
		resp.Body.Close()

		if resp.StatusCode == http.StatusTooManyRequests {
			if i == retryCfg.MaxRetries {
				return resp.StatusCode, respBody, fmt.Errorf("max retries exceeded after 429 responses")
			}
			currentAttempt++
			time.Sleep(delay)
			delay = min(delay * 2, retryCfg.MaxDelay)
			continue
		}

		if resp.StatusCode >= 500 {
			if i == retryCfg.MaxRetries {
				return resp.StatusCode, respBody, fmt.Errorf("server error after retries: %d", resp.StatusCode)
			}
			currentAttempt++
			time.Sleep(delay)
			delay = min(delay * 2, retryCfg.MaxDelay)
			continue
		}

		return resp.StatusCode, respBody, nil
	}

	return 0, nil, fmt.Errorf("unexpected retry loop termination")
}

func min(a, b time.Duration) time.Duration {
	if a < b {
		return a
	}
	return b
}

The X-Attempt-Count header allows downstream systems to track processing history. The retry loop handles 429 and 5xx responses with exponential backoff capped at MaxDelay. Successful responses (2xx) exit immediately. The function returns the final status code and response body for audit logging.

Step 3: Wire the HTTP handler and process events end-to-end

The HTTP handler ties authentication, parsing, validation, and resubmission together. It enforces request size limits, validates JSON structure, and logs outcomes for observability.

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"time"
)

type ConsumerConfig struct {
	OAuth        OAuthConfig
	TargetURL    string
	RetryConfig  RetryConfig
	MaxBodySize  int64
}

func DeadLetterHandler(cfg ConsumerConfig, tokenCache *TokenCache, sdkClient *platformclientv2.ApiClient) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
			return
		}

		r.Body = http.MaxBytesReader(w, r.Body, cfg.MaxBodySize)
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Failed to read request body", http.StatusBadRequest)
			return
		}

		var deadLetter DeadLetterEvent
		if err := json.Unmarshal(body, &deadLetter); err != nil {
			http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
			return
		}

		if deadLetter.EventType != "dead-letter" {
			http.Error(w, "Unsupported event type", http.StatusBadRequest)
			return
		}

		payload, err := ReconstructOriginalPayload(deadLetter)
		if err != nil {
			log.Printf("Payload reconstruction failed: %v", err)
			http.Error(w, "Internal processing error", http.StatusInternalServerError)
			return
		}

		// Refresh token if expired
		token, err := tokenCache.GetValidToken()
		if err != nil {
			newToken, fetchErr := FetchOAuthToken(cfg.OAuth)
			if fetchErr != nil {
				log.Printf("Token refresh failed: %v", fetchErr)
				http.Error(w, "Authentication failed", http.StatusUnauthorized)
				return
			}
			tokenCache.SetToken(newToken, 3600)
			token = newToken
		}

		// Validate subscription exists
		if err := ValidateSubscription(sdkClient, deadLetter.SubscriptionID); err != nil {
			log.Printf("Subscription validation failed: %v", err)
			http.Error(w, "Subscription invalid", http.StatusForbidden)
			return
		}

		httpClient := &http.Client{
			Timeout: 30 * time.Second,
			Transport: &http.Transport{
				MaxIdleConns:       10,
				IdleConnTimeout:    90 * time.Second,
			},
		}

		statusCode, respBody, err := ResubmitPayload(
			httpClient,
			cfg.TargetURL,
			payload,
			deadLetter.RetryCount+1,
			cfg.RetryConfig,
		)

		if err != nil {
			log.Printf("Resubmission failed [%d]: %v | Response: %s", statusCode, err, string(respBody))
			http.Error(w, "Resubmission failed", http.StatusBadGateway)
			return
		}

		log.Printf("Successfully resubmitted event %s | Attempt: %d | Status: %d",
			deadLetter.SubscriptionID, deadLetter.RetryCount+1, statusCode)
		w.WriteHeader(http.StatusOK)
		w.Write([]byte(`{"status":"processed","attempt":` + fmt.Sprintf("%d", deadLetter.RetryCount+1) + `}`))
	}
}

The handler enforces a MaxBodySize limit to prevent memory exhaustion. It refreshes the OAuth token before API calls. Validation ensures the subscription remains active. The resubmission result determines the HTTP response status. Successful processing returns 200 with a JSON acknowledgment.

Complete Working Example

package main

import (
	"log"
	"net/http"
	"os"
	"time"

	"github.com/mypurecloud/platform-client-sdk-go/v125/platformclientv2"
)

func main() {
	clientID := os.Getenv("GENESYS_CLIENT_ID")
	clientSecret := os.Getenv("GENESYS_CLIENT_SECRET")
	environment := os.Getenv("GENESYS_ENVIRONMENT")
	targetURL := os.Getenv("TARGET_API_URL")

	if clientID == "" || clientSecret == "" || targetURL == "" {
		log.Fatal("Required environment variables missing")
	}

	cfg := ConsumerConfig{
		OAuth: OAuthConfig{
			ClientID:     clientID,
			ClientSecret: clientSecret,
			Environment:  environment,
		},
		TargetURL: targetURL,
		RetryConfig: RetryConfig{
			MaxRetries: 3,
			BaseDelay:  1 * time.Second,
			MaxDelay:   30 * time.Second,
		},
		MaxBodySize: 1024 * 1024, // 1MB
	}

	tokenCache := NewTokenCache()

	// Initial token fetch
	initialToken, err := FetchOAuthToken(cfg.OAuth)
	if err != nil {
		log.Fatalf("Failed to fetch initial token: %v", err)
	}
	tokenCache.SetToken(initialToken, 3600)

	// Initialize SDK client
	config, err := platformclientv2.NewConfiguration()
	if err != nil {
		log.Fatalf("SDK configuration error: %v", err)
	}
	config.SetBasePath("https://api." + cfg.OAuth.Environment)
	config.SetAccessToken(initialToken)

	sdkClient, err := platformclientv2.NewApiClient(config)
	if err != nil {
		log.Fatalf("Failed to create SDK client: %v", err)
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/dead-letter", DeadLetterHandler(cfg, tokenCache, sdkClient))

	server := &http.Server{
		Addr:         ":8080",
		Handler:      mux,
		ReadTimeout:  15 * time.Second,
		WriteTimeout: 15 * time.Second,
		IdleTimeout:  60 * time.Second,
	}

	log.Printf("Dead-letter consumer listening on :8080/dead-letter")
	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		log.Fatalf("Server failed: %v", err)
	}
}

Run with environment variables set. The server binds to port 8080 and accepts POST requests at /dead-letter. Configure your Genesys Cloud EventBridge subscription to point to https://your-host:8080/dead-letter.

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: OAuth token expired, invalid client credentials, or missing eventstreams:read scope.
  • Fix: Verify environment variables contain valid credentials. Check the token cache expiration logic. Ensure the OAuth client in Genesys Cloud has the eventstreams:read and eventstreams:write scopes assigned.
  • Code fix: The DeadLetterHandler already refreshes tokens on expiration. Add logging around FetchOAuthToken to capture credential mismatches.

Error: 403 Forbidden

  • Cause: Subscription ID in the dead-letter payload does not exist, is disabled, or the OAuth client lacks permission to read it.
  • Fix: Validate the subscription ID matches an active EventBridge subscription. Confirm the OAuth client has eventstreams:read. Check subscription status via GET /api/v2/eventstreams/subscriptions/{id}.
  • Code fix: The ValidateSubscription function returns a descriptive error. Log the subscription ID and status code from the SDK response to identify disabled subscriptions.

Error: 429 Too Many Requests

  • Cause: Target API enforces rate limits, or Genesys Cloud API throttles subscription validation calls.
  • Fix: The ResubmitPayload function implements exponential backoff up to MaxRetries. Increase BaseDelay or MaxRetries if the target system has stricter limits. Monitor X-RateLimit-Reset headers if provided by the target API.
  • Code fix: Adjust RetryConfig values in main(). Add header inspection in ResubmitPayload to parse Retry-After if present.

Error: JSON Unmarshal Failure

  • Cause: Malformed dead-letter payload, missing originalEvent field, or nested JSON encoding issues.
  • Fix: Verify the EventBridge subscription sends raw JSON. Ensure the consumer reads the full body. Use json.RawMessage to preserve nested structure without double-encoding.
  • Code fix: The ReconstructOriginalPayload function validates JSON structure. Add a fallback logger that writes the raw payload to disk for manual inspection when unmarshaling fails.

Official References