Implementing a Dead Letter Queue for NICE CXone Data Actions with Go and S3

What You Will Build

  • A Go microservice that ingests NICE CXone Data Action webhooks, routes processing failures to an Amazon S3 dead letter queue with enriched debugging metadata, and exposes a paginated REST endpoint to manually retry failed events after schema corrections.
  • This implementation uses the NICE CXone REST API for OAuth authentication and event validation, alongside the AWS SDK for Go v2 for S3 object storage.
  • The tutorial covers Go 1.21+ with standard library HTTP servers, gorilla/mux for routing, and aws-sdk-go-v2 for cloud storage.

Prerequisites

  • NICE CXone OAuth Client Credentials (Client ID and Client Secret) with data-actions:read and data-actions:write scopes
  • AWS IAM user with s3:PutObject, s3:GetObject, s3:DeleteObject, and s3:ListBucket permissions
  • Go 1.21 or higher installed
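  • External dependencies: github.com/aws/aws-sdk-go-v2 (core aws package), github.com/aws/aws-sdk-go-v2/config, github.com/aws/aws-sdk-go-v2/service/s3, github.com/gorilla/mux, and optionally github.com/google/uuid if you prefer UUID trace IDs over the timestamp-based IDs used below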
  • S3 bucket created with versioning enabled (recommended for DLQs)

Authentication Setup

NICE CXone requires OAuth 2.0 Client Credentials flow for programmatic access. The following code demonstrates token acquisition, caching, and automatic refresh logic. The token is cached in memory and refreshed before expiration to prevent 401 errors during high-throughput processing.

package auth

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"
)

// CxoneToken holds the OAuth response plus the local time it was obtained.
type CxoneToken struct {
	AccessToken string `json:"access_token"`
	ExpiresIn   int64  `json:"expires_in"`
	RefreshedAt time.Time
}

var (
	tokenCache        *CxoneToken
	tokenMutex        sync.Mutex
	cxoneBaseURL      string
	cxoneClientID     string
	cxoneClientSecret string
)

func InitOAuth(baseURL, clientID, clientSecret string) {
	cxoneBaseURL = baseURL
	cxoneClientID = clientID
	cxoneClientSecret = clientSecret
}

func GetValidToken(ctx context.Context) (string, error) {
	tokenMutex.Lock()
	defer tokenMutex.Unlock()

	// Reuse the cached token until 30 seconds before it expires.
	if tokenCache != nil && time.Since(tokenCache.RefreshedAt) < time.Duration(tokenCache.ExpiresIn-30)*time.Second {
		return tokenCache.AccessToken, nil
	}

	// Build a URL-encoded form body so credentials with special characters survive.
	form := url.Values{}
	form.Set("grant_type", "client_credentials")
	form.Set("client_id", cxoneClientID)
	form.Set("client_secret", cxoneClientSecret)
	form.Set("scope", "data-actions:read data-actions:write")

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, cxoneBaseURL+"/api/v2/oauth/token", strings.NewReader(form.Encode()))
	if err != nil {
		return "", fmt.Errorf("failed to create oauth request: %w", err)
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	req.Header.Set("Accept", "application/json")

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return "", fmt.Errorf("oauth request failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("oauth authentication failed with status %d", resp.StatusCode)
	}

	var tokenResp CxoneToken
	if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
		return "", fmt.Errorf("failed to decode oauth response: %w", err)
	}

	tokenCache = &CxoneToken{
		AccessToken: tokenResp.AccessToken,
		ExpiresIn:   tokenResp.ExpiresIn,
		RefreshedAt: time.Now(),
	}

	return tokenCache.AccessToken, nil
}

Required OAuth Scope: data-actions:read data-actions:write
Endpoint: POST /api/v2/oauth/token
Error Handling: The function returns wrapped errors for network failures, non-200 status codes, and JSON decoding issues. Token caching reduces authentication overhead and prevents rate limiting on the OAuth endpoint.

Implementation

Step 1: Ingest CXone Data Action Webhooks and Simulate Processing

CXone delivers Data Action events to a configured webhook URL. The consumer must validate the payload, attempt business logic, and route failures to the DLQ. The following handler demonstrates webhook ingestion, schema validation, and processing simulation.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"
)

type CxoneEvent struct {
	EventID   string `json:"event_id"`
	TenantID  string `json:"tenant_id"`
	Timestamp string `json:"timestamp"`
	Payload   map[string]interface{} `json:"payload"`
}

type DLQPayload struct {
	OriginalEvent CxoneEvent      `json:"original_event"`
	Error         string          `json:"error"`
	TraceID       string          `json:"trace_id"`
	FailedAt      time.Time       `json:"failed_at"`
	RetryCount    int             `json:"retry_count"`
	SchemaVersion string          `json:"schema_version"`
}

func WebhookHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var event CxoneEvent
	if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
		log.Printf("Invalid CXone webhook payload: %v", err)
		http.Error(w, "Invalid payload", http.StatusBadRequest)
		return
	}

	// Simulate processing that may fail due to schema changes or missing fields
	if err := processEvent(event); err != nil {
		dlqItem := DLQPayload{
			OriginalEvent: event,
			Error:         err.Error(),
			TraceID:       generateTraceID(),
			FailedAt:      time.Now().UTC(), // UTC keeps S3 partitions independent of the host time zone
			RetryCount:    0,
			SchemaVersion: "v2.1",
		}
		
		if pushErr := pushToDLQ(dlqItem); pushErr != nil {
			log.Printf("Critical: Failed to push event %s to DLQ: %v", event.EventID, pushErr)
			http.Error(w, "Internal server error", http.StatusInternalServerError)
			return
		}
	}

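	// Headers must be set before WriteHeader, or they are silently dropped.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(map[string]string{"status": "accepted"})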
}

func processEvent(event CxoneEvent) error {
	// Example: Validate required field exists in payload
	if _, exists := event.Payload["interaction_id"]; !exists {
		return fmt.Errorf("missing required field: interaction_id in event %s", event.EventID)
	}
	return nil
}

func generateTraceID() string {
	return fmt.Sprintf("DLQ-%d", time.Now().UnixNano())
}

Expected Response: {"status": "accepted"}
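Error Handling: Invalid JSON returns 400. Processing failures trigger DLQ insertion and still return 200, because the event is safely stored. DLQ push failures return 500 so that the sender sees the delivery as failed and can retry it, assuming webhook retries are enabled on the CXone side.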
OAuth Scope: None required for webhook ingestion (CXone pushes to your endpoint).
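
processEvent above checks a single field and stops at the first problem. When debugging schema drift it can help to record every missing field in the DLQ error at once. The following is a minimal sketch of that idea; missingFields is a hypothetical helper, and agent_id is an invented second field used only for illustration (only interaction_id comes from the handler above).

```go
package main

import (
	"fmt"
	"sort"
)

// requiredFields lists the payload keys the processor needs.
// Assumption: agent_id is illustrative; only interaction_id appears in the tutorial.
var requiredFields = []string{"interaction_id", "agent_id"}

// missingFields returns, in sorted order, every required key absent from payload.
func missingFields(payload map[string]interface{}, required []string) []string {
	var missing []string
	for _, key := range required {
		if _, ok := payload[key]; !ok {
			missing = append(missing, key)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	payload := map[string]interface{}{"interaction_id": "int_789"}
	if missing := missingFields(payload, requiredFields); len(missing) > 0 {
		fmt.Printf("route to DLQ, missing fields: %v\n", missing)
	}
}
```

The returned slice could be joined into the Error field of DLQPayload so that one retry cycle surfaces all schema gaps.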

Step 2: Enrich Failures and Push to S3 with Partitioned Keys

The dead letter queue stores enriched payloads in S3 using time-partitioned keys for efficient querying and lifecycle management. The partition structure follows dlq/year=YYYY/month=MM/day=DD/hour=HH/{trace_id}.json.

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

var s3Client *s3.Client
var dlqBucket string

func InitS3(ctx context.Context) error {
	// The region is resolved from AWS_REGION (see Run Instructions) or the shared AWS config.
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return fmt.Errorf("failed to load AWS config: %w", err)
	}
	s3Client = s3.NewFromConfig(cfg)
	dlqBucket = "cxone-dlq-events"
	return nil
}

func pushToDLQ(payload DLQPayload) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	key := fmt.Sprintf("dlq/year=%d/month=%02d/day=%02d/hour=%02d/%s.json",
		payload.FailedAt.Year(),
		payload.FailedAt.Month(),
		payload.FailedAt.Day(),
		payload.FailedAt.Hour(),
		payload.TraceID)

	jsonData, err := json.MarshalIndent(payload, "", "  ")
	if err != nil {
		return fmt.Errorf("failed to marshal DLQ payload: %w", err)
	}

	_, err = s3Client.PutObject(ctx, &s3.PutObjectInput{
		Bucket:        aws.String(dlqBucket),
		Key:           aws.String(key),
		Body:          bytes.NewReader(jsonData),
		ContentType:   aws.String("application/json"),
		Metadata: map[string]string{
			"event_id":    payload.OriginalEvent.EventID,
			"schema_ver":  payload.SchemaVersion,
			"retry_count": fmt.Sprintf("%d", payload.RetryCount),
		},
	})
	if err != nil {
		return fmt.Errorf("failed to upload to S3: %w", err)
	}

	log.Printf("Event %s successfully routed to DLQ at %s", payload.OriginalEvent.EventID, key)
	return nil
}

S3 Key Structure: dlq/year=2024/month=10/day=23/hour=14/DLQ-1729692000000000000.json (generateTraceID appends a 19-digit Unix nanosecond timestamp)
Error Handling: Context timeout prevents hanging AWS calls. Marshal failures and S3 upload errors are wrapped and returned to the caller.
Metadata: S3 object metadata stores event ID and schema version for efficient filtering during retry operations.

Step 3: Expose a Paginated Retry API for Manual Reprocessing

Administrators or automated scripts use the retry endpoint to fetch failed events, validate schema corrections, and reprocess them. The endpoint supports pagination using next_token; any CXone API calls made during reprocessing should back off exponentially on 429 rate limits, as shown in the helper at the end of this step.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

type RetryRequest struct {
	NextToken string `json:"next_token"`
	Limit     int    `json:"limit"`
}

type RetryResponse struct {
	Processed []string `json:"processed"`
	Failed    []string `json:"failed"`
	NextToken string   `json:"next_token"`
}

func RetryDLQEvents(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var req RetryRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	if req.Limit <= 0 || req.Limit > 100 {
		req.Limit = 20
	}

	ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
	defer cancel()

	input := &s3.ListObjectsV2Input{
		Bucket: aws.String(dlqBucket),
		Prefix: aws.String("dlq/"),
		MaxKeys: aws.Int32(int32(req.Limit)),
	}
	if req.NextToken != "" {
		input.ContinuationToken = aws.String(req.NextToken)
	}

	output, err := s3Client.ListObjectsV2(ctx, input)
	if err != nil {
		log.Printf("Failed to list DLQ objects: %v", err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return
	}

	var processed, failed []string
	for _, obj := range output.Contents {
		// Fetch and reprocess each DLQ item
		if err := fetchAndRetryEvent(ctx, *obj.Key); err != nil {
			failed = append(failed, *obj.Key)
			log.Printf("Retry failed for %s: %v", *obj.Key, err)
		} else {
			processed = append(processed, *obj.Key)
		}
	}

	resp := RetryResponse{
		Processed: processed,
		Failed:    failed,
		NextToken: aws.ToString(output.NextContinuationToken),
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(resp)
}

func fetchAndRetryEvent(ctx context.Context, s3Key string) error {
	// Retrieve object from S3
	output, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(dlqBucket),
		Key:    aws.String(s3Key),
	})
	if err != nil {
		return fmt.Errorf("failed to fetch DLQ object: %w", err)
	}
	defer output.Body.Close()

	var dlqItem DLQPayload
	if err := json.NewDecoder(output.Body).Decode(&dlqItem); err != nil {
		return fmt.Errorf("failed to decode DLQ payload: %w", err)
	}

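	// Increment the retry count. FailedAt is left unchanged so that pushToDLQ
	// derives the same key and overwrites this object instead of writing a
	// duplicate under a new time partition.
	dlqItem.RetryCount++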

	if err := processEvent(dlqItem.OriginalEvent); err != nil {
		// Still failing, update DLQ with new retry count
		dlqItem.Error = fmt.Sprintf("retry %d failed: %v", dlqItem.RetryCount, err)
		if pushErr := pushToDLQ(dlqItem); pushErr != nil {
			return fmt.Errorf("failed to update DLQ after retry: %w", pushErr)
		}
		return fmt.Errorf("processing still failed after retry")
	}

	// Success: delete from S3
	_, err = s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(dlqBucket),
		Key:    aws.String(s3Key),
	})
	if err != nil {
		return fmt.Errorf("failed to delete processed DLQ object: %w", err)
	}

	log.Printf("Successfully reprocessed event %s after %d attempts", dlqItem.OriginalEvent.EventID, dlqItem.RetryCount)
	return nil
}

Pagination: Uses ContinuationToken from ListObjectsV2 response to handle large DLQ volumes.
429 Retry Logic: The fetchAndRetryEvent function assumes CXone API calls may occur during processing. In production, wrap CXone HTTP calls with a retry client. The following snippet shows the retry wrapper used inside processEvent or external API calls:

func callCxoneAPIWithRetry(ctx context.Context, url string, token string) (*http.Response, error) {
	client := &http.Client{Timeout: 15 * time.Second}
	const maxRetries = 3

	for attempt := 0; attempt <= maxRetries; attempt++ {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			return nil, fmt.Errorf("failed to create request: %w", err)
		}
		req.Header.Set("Authorization", "Bearer "+token)
		req.Header.Set("Accept", "application/json")

		resp, err := client.Do(req)
		if err != nil {
			return nil, fmt.Errorf("network error: %w", err)
		}

		if resp.StatusCode != http.StatusTooManyRequests {
			return resp, nil // the caller is responsible for closing resp.Body
		}
		resp.Body.Close() // release the connection before waiting

		if attempt == maxRetries {
			break // no point sleeping after the final attempt
		}

		// Exponential backoff: 2^attempt seconds (1s, 2s, 4s)
		backoff := time.Duration(1<<attempt) * time.Second
		log.Printf("Received 429 from CXone. Retrying in %v", backoff)
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return nil, ctx.Err() // stop waiting if the request was cancelled
		}
	}

	return nil, fmt.Errorf("max retries exceeded for CXone API")
}

OAuth Scope: data-actions:read (for validation), data-actions:write (for acknowledging retries)
Error Handling: S3 fetch failures, JSON decode errors, and processing failures are tracked. Successful retries delete the S3 object. Failed retries update the DLQ with incremented retry_count.
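
On the client side, the next_token pagination described above amounts to calling the endpoint repeatedly until the token comes back empty. The sketch below shows that loop with the HTTP call abstracted behind a function type; fetchPage and drain are hypothetical names, and the page cap is an added safeguard rather than something the endpoint requires.

```go
package main

import "fmt"

// page mirrors the fields of RetryResponse.
type page struct {
	Processed []string
	Failed    []string
	NextToken string
}

// fetchPage is a hypothetical stand-in for POSTing {"next_token": ..., "limit": ...}
// to /api/v1/dlq/retry and decoding the response.
type fetchPage func(nextToken string, limit int) (page, error)

// drain follows NextToken until it is empty or maxPages pages have been read.
func drain(fetch fetchPage, limit, maxPages int) ([]string, []string, error) {
	var processed, failed []string
	token := ""
	for i := 0; i < maxPages; i++ {
		p, err := fetch(token, limit)
		if err != nil {
			return processed, failed, fmt.Errorf("page %d: %w", i, err)
		}
		processed = append(processed, p.Processed...)
		failed = append(failed, p.Failed...)
		if p.NextToken == "" {
			return processed, failed, nil // last page reached
		}
		token = p.NextToken
	}
	return processed, failed, fmt.Errorf("stopped after %d pages with more results pending", maxPages)
}

func main() {
	// Fake two-page backend keyed by token, standing in for the HTTP call.
	pages := map[string]page{
		"":   {Processed: []string{"a.json"}, NextToken: "t1"},
		"t1": {Failed: []string{"b.json"}},
	}
	fetch := func(token string, limit int) (page, error) { return pages[token], nil }

	processed, failed, err := drain(fetch, 20, 10)
	fmt.Println(processed, failed, err)
}
```

Capping the number of pages bounds the work done by one script run when the DLQ is unexpectedly large.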

Complete Working Example

The following module combines authentication, S3 initialization, webhook ingestion, and the retry API into a single runnable service.

package main

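import (
	"context"
	"log"
	"net/http"
	"os"

	"github.com/gorilla/mux"

	// Assumption: the auth package from Authentication Setup lives under your
	// own module; replace this placeholder import path accordingly.
	"example.com/cxone-dlq/auth"
)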

func main() {
	ctx := context.Background()

	// Initialize OAuth
	auth.InitOAuth(
		os.Getenv("CXONE_BASE_URL"),
		os.Getenv("CXONE_CLIENT_ID"),
		os.Getenv("CXONE_CLIENT_SECRET"),
	)

	// Initialize S3
	if err := InitS3(ctx); err != nil {
		log.Fatalf("Failed to initialize S3: %v", err)
	}

	router := mux.NewRouter()
	router.HandleFunc("/webhook/cxone-data-actions", WebhookHandler).Methods(http.MethodPost)
	router.HandleFunc("/api/v1/dlq/retry", RetryDLQEvents).Methods(http.MethodPost)

	log.Println("DLQ Consumer listening on :8080")
	if err := http.ListenAndServe(":8080", router); err != nil {
		log.Fatalf("Server failed: %v", err)
	}
}

Run Instructions:

  1. Set environment variables: CXONE_BASE_URL, CXONE_CLIENT_ID, CXONE_CLIENT_SECRET, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
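  2. Execute: go run . (the handlers from Steps 1 to 3 live in separate files of package main, so run the whole package rather than main.go alone)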
  3. Test webhook ingestion: curl -X POST http://localhost:8080/webhook/cxone-data-actions -H "Content-Type: application/json" -d '{"event_id":"evt_123","tenant_id":"t_456","timestamp":"2024-10-23T14:00:00Z","payload":{}}'
  4. Test retry API: curl -X POST http://localhost:8080/api/v1/dlq/retry -H "Content-Type: application/json" -d '{"limit":10}'

Common Errors & Debugging

Error: 401 Unauthorized from CXone OAuth Endpoint

  • Cause: Invalid client credentials, expired tokens, or missing data-actions scopes.
  • Fix: Verify Client ID and Secret match the CXone Developer Portal. Ensure the OAuth client has data-actions:read and data-actions:write scopes assigned. Check the token cache expiration logic to prevent using stale tokens.
  • Code Fix: The GetValidToken function treats a cached token as expired 30 seconds before its actual expiration and fetches a new one on the next call. If 401 persists, force cache invalidation by setting tokenCache = nil while holding tokenMutex.

Error: 403 Forbidden on S3 PutObject

  • Cause: IAM policy lacks s3:PutObject permission, or bucket policy blocks the request.
  • Fix: Attach a custom policy granting s3:PutObject, s3:GetObject, s3:DeleteObject, and s3:ListBucket on the DLQ bucket. The AWS managed policy AmazonS3FullAccess also resolves the error but grants far more than this service needs. Verify the AWS region matches the bucket location.

Error: 429 Too Many Requests on CXone API Calls

  • Cause: Exceeding CXone rate limits during batch retries or token refreshes.
  • Fix: Implement exponential backoff as shown in callCxoneAPIWithRetry. Distribute retry operations over time using a scheduled worker instead of synchronous API calls. Monitor the Retry-After header if CXone returns it.

Error: JSON Unmarshal Failure on DLQ Payload

  • Cause: Schema evolution between ingestion and retry, or corrupted S3 objects.
  • Fix: Use SchemaVersion in the DLQ payload to route retries to version-specific processors. Keep decoding lenient during migration periods: encoding/json ignores unknown fields by default, so avoid calling Decoder.DisallowUnknownFields() on DLQ payloads until every stored object matches the new schema. Validate S3 objects with checksums before processing.
