Processing Genesys Cloud EventBridge Dead-Letter Queue Messages with Python
What You Will Build
A Python service that polls Genesys Cloud EventBridge failed events, deserializes payloads to extract error codes and stack traces, categorizes failures by root cause, retries transient errors with exponential backoff, archives persistent failures to S3, tracks processing status, generates health reports, and exposes a replay mechanism.
This tutorial uses the Genesys Cloud analytics:events and eventbridge API surfaces alongside the official genesyscloud-python SDK.
The implementation is written in Python 3.10+ using boto3 for archival storage and httpx for direct HTTP fallback operations.
Prerequisites
- OAuth 2.0 client using the Client Credentials grant type
- Required scopes: analytics:events:read, eventbridge:subscriptions:read, eventbridge:failed-events:read, eventbridge:replay:write
- Genesys Cloud Python SDK version 135.0.0 or higher
- Python 3.10+ runtime
- External dependencies: genesyscloud, boto3, httpx, pydantic, tenacity
- AWS credentials with s3:PutObject permissions for archival storage
Authentication Setup
The Genesys Cloud Python SDK manages OAuth token lifecycles automatically. You initialize the ApiClient with your client credentials, and the SDK requests the access token, caches it, and silently refreshes it before it expires. You never need to parse token responses manually.
import os
import logging

from genesyscloud import platformclientv2
from platformclientv2.api_client import ApiClient
from platformclientv2.rest import ApiException

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


def initialize_genesys_client(client_id: str, client_secret: str) -> platformclientv2.AnalyticsEventsApi:
    """
    Creates an authenticated AnalyticsEventsApi client.
    The SDK caches the access token and refreshes it automatically.
    """
    api_client = ApiClient()
    api_client.configuration.host = "https://api.mypurecloud.com"

    # Client credentials flow. The SDK stores the token internally.
    api_client.configuration.access_token = api_client.client_credentials_access_token(
        client_id=client_id,
        client_secret=client_secret
    )
    return platformclientv2.AnalyticsEventsApi(api_client)
Implementation
Step 1: Retrieving Failed Events with Pagination
The /api/v2/analytics/event-bridge/failed-events/{subscriptionId} endpoint returns a paginated collection. The SDK exposes get_analytics_eventbridge_failedevents, which accepts limit and offset parameters. To process all queued failures, you advance offset by limit after each page and stop at the first empty page.
from typing import List, Generator

from platformclientv2.model.eventbridge_failed_event import EventbridgeFailedEvent


def fetch_failed_events(
    api: platformclientv2.AnalyticsEventsApi,
    subscription_id: str,
    limit: int = 100
) -> Generator[List[EventbridgeFailedEvent], None, None]:
    """
    Paginates through all failed events for a subscription.
    Yields batches of events to prevent memory exhaustion.
    """
    offset = 0
    while True:
        try:
            response = api.get_analytics_eventbridge_failedevents(
                subscription_id=subscription_id,
                limit=limit,
                offset=offset
            )
        except ApiException as e:
            if e.status == 429:
                # Log the throttle, then let the caller decide how to back off.
                logging.warning("Rate limited on failed events query.")
            raise

        # An empty or missing page means the queue has been fully read.
        if not response.entities:
            break

        yield response.entities
        offset += limit
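The offset-based loop above can be exercised without a Genesys Cloud connection. The following is a minimal sketch, assuming a hypothetical `fetch_page(offset, limit)` callable and an in-memory `FAKE_QUEUE` that stand in for the SDK call; neither name comes from the SDK. It also stops on a short page, which saves one request when the queue size is not a multiple of the page size.

```python
from typing import Callable, Iterator, List


def paginate_by_offset(
    fetch_page: Callable[[int, int], List[dict]],
    limit: int = 100,
) -> Iterator[List[dict]]:
    """Yield pages until the source returns an empty or short page."""
    offset = 0
    while True:
        page = fetch_page(offset, limit)
        if not page:
            break
        yield page
        if len(page) < limit:
            # A short page is the last one, so skip the extra empty request.
            break
        offset += limit


# Hypothetical in-memory queue of 5 failed events, used only for illustration.
FAKE_QUEUE = [{"id": f"evt-{n}"} for n in range(5)]


def fake_fetch(offset: int, limit: int) -> List[dict]:
    """Stand-in for the SDK call: returns one slice of the fake queue."""
    return FAKE_QUEUE[offset:offset + limit]


pages = list(paginate_by_offset(fake_fetch, limit=2))
print([len(page) for page in pages])  # [2, 2, 1]
```

Offset pagination assumes the queue does not shrink while you read it. If processed events are removed from the queue between requests, later pages shift and some events can be skipped.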
Step 2: Deserializing Payloads and Categorizing Root Causes
EventBridge failed events contain a failure_reason string, a failure_code identifier, and an optional stack_trace. You parse these fields to determine whether the failure is transient (network timeout, provider rate limit) or permanent (malformed payload, missing permissions). The SDK returns raw model objects, which you convert to typed pydantic models for reliable analysis. Transient indicators are checked first, so a failure that matches both lists is treated as transient.
from enum import Enum
from typing import Dict, Any, Optional

from pydantic import BaseModel


class FailureCategory(Enum):
    TRANSIENT = "transient"
    PERMANENT = "permanent"
    UNKNOWN = "unknown"


class ParsedFailure(BaseModel):
    event_id: str
    event_type: str
    failure_code: Optional[str]
    failure_reason: str
    stack_trace: Optional[str]
    category: FailureCategory
    original_payload: Dict[str, Any]

    @staticmethod
    def categorize(failure_code: Optional[str], failure_reason: str) -> FailureCategory:
        reason_lower = failure_reason.lower()
        code_lower = (failure_code or "").lower()

        transient_indicators = ["timeout", "rate limit", "retry", "503", "502", "network", "throttle"]
        permanent_indicators = ["invalid", "malformed", "400", "401", "403", "permission", "schema", "missing"]

        # Transient indicators take precedence over permanent ones.
        if any(ind in reason_lower or ind in code_lower for ind in transient_indicators):
            return FailureCategory.TRANSIENT
        elif any(ind in reason_lower or ind in code_lower for ind in permanent_indicators):
            return FailureCategory.PERMANENT
        return FailureCategory.UNKNOWN

    @classmethod
    def from_sdk_model(cls, event: EventbridgeFailedEvent) -> "ParsedFailure":
        return cls(
            event_id=event.id or "unknown",
            event_type=event.event_type or "unknown",
            failure_code=event.failure_code,
            failure_reason=event.failure_reason or "No reason provided",
            stack_trace=event.stack_trace,
            category=cls.categorize(event.failure_code, event.failure_reason or ""),
            original_payload=event.payload or {}
        )
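To see how the keyword matching behaves on edge cases, the rules can be run in isolation. This is a minimal sketch that copies the two indicator lists into a standalone function returning plain strings; the sample codes and reasons are invented for illustration and are not real Genesys Cloud values.

```python
from typing import Optional

TRANSIENT_INDICATORS = ["timeout", "rate limit", "retry", "503", "502", "network", "throttle"]
PERMANENT_INDICATORS = ["invalid", "malformed", "400", "401", "403", "permission", "schema", "missing"]


def categorize(failure_code: Optional[str], failure_reason: str) -> str:
    """Substring match on code and reason; transient indicators win ties."""
    reason = failure_reason.lower()
    code = (failure_code or "").lower()
    if any(ind in reason or ind in code for ind in TRANSIENT_INDICATORS):
        return "transient"
    if any(ind in reason or ind in code for ind in PERMANENT_INDICATORS):
        return "permanent"
    return "unknown"


# Hypothetical sample failures, chosen to hit each branch.
samples = [
    ("TARGET_TIMEOUT", "Connection timeout after 30s"),
    ("SCHEMA_ERR", "Malformed JSON body"),
    (None, "Target returned 418"),
    ("E1", "Invalid request, retry later"),
]
results = [categorize(code, reason) for code, reason in samples]
print(results)  # ['transient', 'permanent', 'unknown', 'transient']
```

The last sample shows the precedence rule: the reason contains both "invalid" and "retry", and the transient check runs first. Substring matching is also loose ("400" matches any text containing those digits), so treat the lists as a starting point to tune against the failure codes you actually observe.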
Step 3: Retry Logic with Exponential Backoff
Transient failures require automated retry attempts. You implement exponential backoff with jitter to prevent thundering herd problems. The tenacity library can handle the retry loop, but the helper below writes the loop by hand so that 429 responses are caught explicitly and the Retry-After header is applied when available.
import time
import random
import logging
import httpx
from typing import Callable, Any


def retry_with_backoff(
    func: Callable[..., Any],
    max_attempts: int = 5,
    base_delay: float = 2.0,
    max_delay: float = 60.0,
    jitter: bool = True
) -> Any:
    """
    Executes a function with exponential backoff and optional jitter.
    Handles 429 responses by parsing the Retry-After header.
    """
    last_error = None
    for attempt in range(max_attempts):
        # Exponential delay capped at max_delay, with up to 10% jitter on top.
        delay = min(base_delay * (2 ** attempt), max_delay)
        if jitter:
            delay += random.uniform(0, delay * 0.1)
        try:
            return func()
        except httpx.HTTPStatusError as e:
            status = e.response.status_code
            if status == 429:
                # Prefer the server's hint when Retry-After is a number of seconds.
                try:
                    delay = min(float(e.response.headers.get("Retry-After", delay)), max_delay)
                except ValueError:
                    pass  # Retry-After was an HTTP date; keep the computed delay.
            elif status < 500:
                raise  # Other client errors will not succeed on retry.
            last_error = e
        except Exception as e:
            last_error = e
        if attempt < max_attempts - 1:
            # Do not sleep after the final attempt; fall through to the raise below.
            logging.warning(f"Attempt {attempt + 1} failed: {last_error}. Waiting {delay:.2f}s")
            time.sleep(delay)
    # Chain the last error so the root cause stays visible in the traceback.
    raise RuntimeError("Max retry attempts exceeded") from last_error
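Two details of the retry helper are easier to check as pure functions: the delay schedule before jitter, and the Retry-After value, which HTTP allows to be either a number of seconds or an HTTP date. The sketch below is one way to handle both; `backoff_schedule` and `parse_retry_after` are hypothetical helper names, not part of httpx or the Genesys Cloud SDK.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import List, Optional


def backoff_schedule(max_attempts: int, base_delay: float = 2.0, max_delay: float = 60.0) -> List[float]:
    """Delay before each retry, doubling per attempt and capped at max_delay."""
    return [min(base_delay * (2 ** attempt), max_delay) for attempt in range(max_attempts)]


def parse_retry_after(value: Optional[str], now: Optional[datetime] = None) -> Optional[float]:
    """Return seconds to wait from a Retry-After header, or None if unusable."""
    if not value:
        return None
    try:
        # Form 1: a plain number of seconds.
        return max(float(value), 0.0)
    except ValueError:
        pass
    try:
        # Form 2: an HTTP date such as "Wed, 21 Oct 2015 07:28:00 GMT".
        target = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max((target - now).total_seconds(), 0.0)


print(backoff_schedule(6))  # [2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

With the defaults, the sixth delay would be 64 seconds uncapped, so the cap at 60 takes effect on that attempt. A caller can use `parse_retry_after` first and fall back to the schedule when it returns None.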
Step 4: Routing Persistent Failures to Archival Storage
Permanent failures bypass retry logic. You serialize the full event context, error metadata, and categorization result into a JSON payload, then upload it to an S3 bucket for manual review. The archival path includes the event type and failure code for efficient downstream querying.
import json
import boto3
from datetime import datetime, timezone
from typing import Dict, Any


def archive_permanent_failure(s3_client: Any, bucket_name: str, failure: ParsedFailure, metadata: Dict[str, Any]) -> str:
    """
    Uploads a permanent failure to S3 with structured metadata.
    Returns the S3 object key for audit tracking.
    s3_client is the object returned by boto3.client("s3").
    """
    # Take one timestamp so the object key and the archived_at field agree.
    now = datetime.now(timezone.utc)
    timestamp = now.strftime("%Y%m%d-%H%M%S-%f")
    safe_event_type = failure.event_type.replace("/", "_").replace(":", "_")
    safe_code = (failure.failure_code or "unknown").replace("/", "_")
    object_key = f"dlq/archives/{safe_event_type}/{safe_code}/{failure.event_id}-{timestamp}.json"

    archival_payload = {
        "event_id": failure.event_id,
        "event_type": failure.event_type,
        "failure_code": failure.failure_code,
        "failure_reason": failure.failure_reason,
        "stack_trace": failure.stack_trace,
        "category": failure.category.value,
        "original_payload": failure.original_payload,
        "processing_metadata": metadata,
        "archived_at": now.isoformat()
    }

    s3_client.put_object(
        Bucket=bucket_name,
        Key=object_key,
        # default=str keeps non-JSON values (datetime, bytes) from raising TypeError.
        Body=json.dumps(archival_payload, indent=2, default=str),
        ContentType="application/json",
        ServerSideEncryption="AES256"
    )
    return object_key
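The key layout matters because S3 prefix listing is how you later pull every archive for one event type or failure code. As a sketch of the key construction alone, the hypothetical helper below uses a stricter sanitizer than the two replace calls above: anything outside letters, digits, dot, underscore, and hyphen becomes an underscore. The event type in the example is invented for illustration.

```python
import re
from datetime import datetime, timezone
from typing import Optional


def build_archive_key(event_type: str, failure_code: Optional[str], event_id: str, when: datetime) -> str:
    """Build dlq/archives/<event_type>/<failure_code>/<event_id>-<timestamp>.json."""

    def safe(segment: str) -> str:
        # Replace characters that would add path levels or need URL escaping.
        return re.sub(r"[^A-Za-z0-9._-]", "_", segment) or "unknown"

    stamp = when.strftime("%Y%m%d-%H%M%S-%f")
    return f"dlq/archives/{safe(event_type)}/{safe(failure_code or 'unknown')}/{safe(event_id)}-{stamp}.json"


key = build_archive_key(
    "v2.detail.events.conversation:user/end",  # hypothetical event type
    None,
    "abc 123",
    datetime(2024, 1, 2, 3, 4, 5, 678, tzinfo=timezone.utc),
)
print(key)
# dlq/archives/v2.detail.events.conversation_user_end/unknown/abc_123-20240102-030405-000678.json
```

With this layout, listing the prefix dlq/archives/<event_type>/ returns every archived failure for that event type, grouped by failure code.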
Step 5: Status Tracking, Health Reports, and Replay Mechanism
You maintain a local processing registry to track event states (pending, retried, archived, replayed). The health report aggregates success/failure ratios by category and event type. The replay mechanism calls /api/v2/analytics/event-bridge/replay with a batch of event IDs, which instructs Genesys Cloud to re-inject the payloads into the original subscription pipeline.
from typing import List, Dict, Any
import json
import os


class DLQProcessor:
    def __init__(
        self,
        api: platformclientv2.AnalyticsEventsApi,
        s3_client: boto3.client,
        archive_bucket: str,
        state_file: str = "dlq_state.json"
    ):
        self.api = api
        self.s3_client = s3_client
        self.archive_bucket = archive_bucket
        self.state_file = state_file
        self._load_state()

    def _load_state(self) -> None:
        if os.path.exists(self.state_file):
            with open(self.state_file, "r") as f:
                self.state: Dict[str, Dict[str, Any]] = json.load(f)
        else:
            self.state = {}

    def _save_state(self) -> None:
        with open(self.state_file, "w") as f:
            json.dump(self.state, f, indent=2)

    def process_subscription(self, subscription_id: str) -> Dict[str, int]:
        metrics = {"processed": 0, "archived": 0, "retried": 0, "failed": 0}

        for event_batch in fetch_failed_events(self.api, subscription_id):
            for event in event_batch:
                parsed = ParsedFailure.from_sdk_model(event)
                event_id = parsed.event_id

                # Skip events that a previous run already archived.
                if event_id in self.state and self.state[event_id]["status"] == "archived":
                    continue

                if parsed.category == FailureCategory.PERMANENT:
                    s3_key = archive_permanent_failure(
                        self.s3_client,
                        self.archive_bucket,
                        parsed,
                        {"processed_by": "dlq_worker", "attempt": 0}
                    )
                    self.state[event_id] = {"status": "archived", "s3_key": s3_key, "category": parsed.category.value}
                    metrics["archived"] += 1

                elif parsed.category == FailureCategory.TRANSIENT:
                    try:
                        def retry_action():
                            return self._simulate_reprocess(parsed)

                        retry_with_backoff(retry_action, max_attempts=3)
                        self.state[event_id] = {"status": "processed", "category": parsed.category.value}
                        metrics["retried"] += 1
                        metrics["processed"] += 1
                    except Exception as e:
                        self.state[event_id] = {"status": "failed_retry", "error": str(e), "category": parsed.category.value}
                        metrics["failed"] += 1
                else:
                    self.state[event_id] = {"status": "unknown_category", "category": parsed.category.value}
                    metrics["failed"] += 1

            # Persist after every batch so a crash loses at most one batch of progress.
            self._save_state()

        return metrics

    def _simulate_reprocess(self, failure: ParsedFailure) -> bool:
        """
        Placeholder for actual business logic re-execution.
        In production, this calls your internal service that originally failed.
        """
        if "timeout" in failure.failure_reason.lower():
            return True
        raise RuntimeError("Reprocessing failed")

    def generate_health_report(self) -> Dict[str, Any]:
        total = len(self.state)
        statuses = {}
        categories = {}

        for evt_id, data in self.state.items():
            status = data.get("status", "unknown")
            cat = data.get("category", "unknown")
            statuses[status] = statuses.get(status, 0) + 1
            categories[cat] = categories.get(cat, 0) + 1

        return {
            "total_tracked": total,
            "status_distribution": statuses,
            "category_distribution": categories,
            "success_rate": (statuses.get("processed", 0) / max(total, 1)) * 100,
            "generated_at": datetime.now(timezone.utc).isoformat()
        }

    def replay_events(self, subscription_id: str, event_ids: List[str]) -> bool:
        """
        Calls the Genesys Cloud replay endpoint to reprocess specific events.
        """
        replay_request = {
            "subscriptionId": subscription_id,
            "eventIds": event_ids
        }
        try:
            self.api.post_analytics_eventbridge_replay(body=replay_request)
            for eid in event_ids:
                if eid in self.state:
                    self.state[eid]["status"] = "replayed"
            self._save_state()
            return True
        except ApiException as e:
            logging.error(f"Replay failed: {e.reason}")
            return False
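Because the registry is a single JSON file rewritten after every batch, a crash in the middle of a write can leave it truncated, and the next start would then fail in json.load. One common mitigation, sketched here under the assumption that the state file lives on a local filesystem, is to write to a temporary file in the same directory and swap it into place with os.replace. The helper name `save_state_atomically` is hypothetical.

```python
import json
import os
import tempfile
from typing import Any, Dict


def save_state_atomically(state: Dict[str, Dict[str, Any]], state_file: str) -> None:
    """Write the state to a temp file, then rename it over the real file."""
    directory = os.path.dirname(os.path.abspath(state_file))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # Make sure the bytes reach disk before the rename.
        # os.replace swaps the file in one step, so readers never see a partial file.
        os.replace(tmp_path, state_file)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


# Demonstration in a throwaway directory.
with tempfile.TemporaryDirectory() as workdir:
    path = os.path.join(workdir, "dlq_state.json")
    save_state_atomically({"evt-1": {"status": "archived"}}, path)
    with open(path) as f:
        restored = json.load(f)
    leftovers = [name for name in os.listdir(workdir) if name.endswith(".tmp")]

print(restored)  # {'evt-1': {'status': 'archived'}}
```

The body of `_save_state` could delegate to a helper like this. A local file still limits you to one worker process; running several workers would need a shared store instead.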
Complete Working Example
import os
import logging
from genesyscloud import platformclientv2
from platformclientv2.api_client import ApiClient
from platformclientv2.rest import ApiException
import boto3
from typing import Generator, List
from platformclientv2.model.eventbridge_failed_event import EventbridgeFailedEvent

# Import classes from Step 2-5
from dlq_processor_module import (
    ParsedFailure,
    DLQProcessor,
    fetch_failed_events,
    retry_with_backoff,
    archive_permanent_failure
)


def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    subscription_id = os.getenv("GENESYS_SUBSCRIPTION_ID")
    aws_region = os.getenv("AWS_REGION", "us-east-1")
    archive_bucket = os.getenv("S3_ARCHIVE_BUCKET", "genesys-dlq-archives")

    if not all([client_id, client_secret, subscription_id]):
        raise ValueError("Missing required environment variables")

    api_client = ApiClient()
    api_client.configuration.host = "https://api.mypurecloud.com"
    api_client.configuration.access_token = api_client.client_credentials_access_token(client_id, client_secret)
    events_api = platformclientv2.AnalyticsEventsApi(api_client)

    s3_client = boto3.client("s3", region_name=aws_region)

    processor = DLQProcessor(
        api=events_api,
        s3_client=s3_client,
        archive_bucket=archive_bucket,
        state_file="dlq_state.json"
    )

    logging.info("Starting DLQ processing pipeline")
    metrics = processor.process_subscription(subscription_id)
    logging.info(f"Processing complete. Metrics: {metrics}")

    report = processor.generate_health_report()
    logging.info(f"Health Report: {report}")

    # Example replay of failed retries
    failed_ids = [eid for eid, data in processor.state.items() if data.get("status") == "failed_retry"]
    if failed_ids:
        logging.info(f"Replaying {len(failed_ids)} failed events")
        processor.replay_events(subscription_id, failed_ids)


if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 401 Unauthorized or 403 Forbidden
- Cause: A 403 means the OAuth client lacks the required eventbridge:failed-events:read or eventbridge:replay:write scopes. A 401 means the token was rejected, typically because it expired and the refresh failed due to invalid credentials.
- Fix: Verify the client credentials in the Genesys Cloud admin console under Organization Settings > OAuth. Ensure the exact scopes are attached to the client. Restart the process to force a fresh token request.
- Code Check:
try:
    api.get_analytics_eventbridge_failedevents(subscription_id=subscription_id, limit=1)
except ApiException as e:
    if e.status in (401, 403):
        logging.error("Credential or scope mismatch. Verify OAuth configuration.")
    raise
Error: 429 Too Many Requests
- Cause: Genesys Cloud enforces strict rate limits on analytics endpoints. Polling failed events too frequently or replaying large batches triggers throttling.
- Fix: Implement the exponential backoff shown in Step 3. Respect the Retry-After header. Limit each replay call to at most 100 event IDs.
- Code Check:
import httpx
# The retry_with_backoff function already parses Retry-After headers.
# Ensure you never call replay_events in a tight loop without delays.
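Spacing replay calls out can be sketched as a small loop that pauses between batches. The helper below is illustrative only: `replay_in_paced_batches`, the injectable `send_batch` and `sleep` callables, and the one-second default pause are assumptions, not documented Genesys Cloud limits. In real use, `send_batch` would wrap `processor.replay_events`.

```python
import time
from typing import Callable, List, Sequence


def replay_in_paced_batches(
    event_ids: Sequence[str],
    send_batch: Callable[[List[str]], bool],
    batch_size: int = 100,
    pause_seconds: float = 1.0,  # Assumed pause; tune to your observed rate limits.
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Send IDs in batches with a pause between calls. Returns successful batch count."""
    batches = [list(event_ids[i:i + batch_size]) for i in range(0, len(event_ids), batch_size)]
    succeeded = 0
    for index, batch in enumerate(batches):
        if send_batch(batch):
            succeeded += 1
        if index < len(batches) - 1:
            sleep(pause_seconds)  # No pause is needed after the final batch.
    return succeeded


# Demonstration with fake send and sleep functions so nothing is called or delayed.
sent_sizes: List[int] = []
pauses: List[float] = []


def fake_send(batch: List[str]) -> bool:
    sent_sizes.append(len(batch))
    return True


ids = [f"evt-{n}" for n in range(250)]
ok_batches = replay_in_paced_batches(ids, fake_send, sleep=pauses.append)
print(ok_batches, sent_sizes, pauses)  # 3 [100, 100, 50] [1.0, 1.0]
```

Passing `sleep` as a parameter keeps the pacing testable without real delays.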
Error: 400 Bad Request on Replay
- Cause: The eventIds array contains duplicates, references events outside the specified subscriptionId, or exceeds the 100-item limit.
- Fix: Deduplicate IDs before sending. Validate subscription ownership. Chunk large lists.
- Code Check:
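# dict.fromkeys removes duplicates while preserving the original order.
unique_ids = list(dict.fromkeys(event_ids))
if len(unique_ids) > 100:
    logging.warning("Chunking replay request to 100 IDs per batch")
# Lists of 100 or fewer IDs go out as a single batch.
for i in range(0, len(unique_ids), 100):
    processor.replay_events(subscription_id, unique_ids[i:i + 100])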
Error: Malformed JSON Payload in Archival
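- Cause: The original EventBridge payload contains values that json.dumps cannot serialize, such as bytes or datetime objects, so serialization raises a TypeError before the S3 upload.
- Fix: Base64-encode binary fields before archival, or pass a default= handler to json.dumps for other non-serializable types. ensure_ascii=True is already the default and only escapes non-ASCII characters, so it does not fix this error.
- Code Check: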
import base64
if isinstance(payload, bytes):
    archival_payload["original_payload"] = base64.b64encode(payload).decode("utf-8")
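The isinstance check above only covers a payload that is bytes at the top level. When binary or datetime values are nested inside a dictionary, a default= handler on json.dumps reaches them wherever they appear. The sketch below is one possible handler; the function name `json_fallback` and the `__base64__` wrapper key are invented conventions for this example.

```python
import base64
import json
from datetime import datetime, timezone
from typing import Any


def json_fallback(value: Any) -> Any:
    """Called by json.dumps only for values it cannot serialize itself."""
    if isinstance(value, (bytes, bytearray)):
        # Wrap binary data so a reader can tell it apart from ordinary strings.
        return {"__base64__": base64.b64encode(bytes(value)).decode("ascii")}
    if isinstance(value, datetime):
        return value.isoformat()
    # Last resort: keep a readable representation instead of failing the archive.
    return str(value)


# Hypothetical payload with nested values that plain json.dumps would reject.
payload = {
    "body": b"\x00\x01",
    "seen_at": datetime(2024, 1, 2, tzinfo=timezone.utc),
    "note": "ok",
}
encoded = json.dumps(payload, default=json_fallback, sort_keys=True)
decoded = json.loads(encoded)
print(decoded["body"])  # {'__base64__': 'AAE='}
```

Falling back to str() trades fidelity for reliability: the archive always succeeds, but values of unexpected types are stored as text and cannot be restored to their original type automatically.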