Handling 5xx Webhook Failures with a Dead Letter Queue in Genesys Cloud
What You Will Build
- A Python microservice that ingests Genesys Cloud webhooks, handles transient 5xx failures from your downstream system, and persists failed payloads to a Dead Letter Queue (DLQ) for later retry or analysis.
- This implementation uses the Genesys Cloud Webhooks API to configure the endpoint and the Python requests library to handle the ingestion logic.
- The tutorial covers Python 3.9+ with fastapi for the webhook receiver and boto3 for Amazon SQS as the DLQ provider.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Grant) for API configuration; Public or Confidential for testing.
- Required Scopes:
  - webhooks:webhook:create (to register the webhook)
  - webhooks:webhook:view (to verify configuration)
- SDK/API Version: Genesys Cloud API v2.
- Language/Runtime: Python 3.9+.
- External Dependencies:
  - fastapi (Web framework)
  - uvicorn (ASGI server)
  - boto3 (AWS SDK for SQS DLQ)
  - pydantic (Data validation)
  - requests (HTTP client for internal retries)
  - tenacity (Retry logic library)
Authentication Setup
To interact with the Genesys Cloud API for configuration, you must implement the Client Credentials Grant flow. This flow is suitable for server-to-server communication where no user context is required.
Python OAuth Token Helper
This helper handles token acquisition and caching to avoid unnecessary requests to the authorization server.
import time
from typing import Optional

import requests


class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, base_url: str = "https://api.mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url.rstrip('/')
        self.token: Optional[str] = None
        self.token_expiry: float = 0

    def get_token(self) -> str:
        # Reuse the cached token while it is still valid
        if self.token and time.time() < self.token_expiry:
            return self.token

        auth_url = f"{self.base_url}/oauth/token"
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }

        try:
            # A timeout keeps a stalled authorization server from hanging the caller
            response = requests.post(auth_url, headers=headers, data=data, timeout=10)
            response.raise_for_status()
            token_data = response.json()
            self.token = token_data["access_token"]
            # Subtract 60 seconds to provide a buffer for expiration
            self.token_expiry = time.time() + token_data["expires_in"] - 60
            return self.token
        except requests.exceptions.HTTPError as e:
            raise RuntimeError(f"Failed to acquire OAuth token: {e.response.text}") from e
        except Exception as e:
            raise RuntimeError(f"Unexpected error during OAuth token acquisition: {e}") from e
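The caching rule can be exercised without contacting the authorization server. The sketch below is an illustration only: TokenCache, fetch, and clock are hypothetical names that do not appear in the helper above, and the stub fetcher stands in for the POST to /oauth/token.

```python
from typing import Callable, List, Optional, Tuple


class TokenCache:
    """Caches a token until shortly before it expires (hypothetical helper)."""

    def __init__(self, fetch: Callable[[], Tuple[str, int]],
                 clock: Callable[[], float], buffer_s: int = 60):
        self._fetch = fetch        # returns (access_token, expires_in_seconds)
        self._clock = clock        # injectable so the demo can control time
        self._buffer_s = buffer_s  # refresh this many seconds before expiry
        self._token: Optional[str] = None
        self._expiry = 0.0

    def get(self) -> str:
        if self._token is not None and self._clock() < self._expiry:
            return self._token
        token, expires_in = self._fetch()
        self._token = token
        self._expiry = self._clock() + expires_in - self._buffer_s
        return token


# Stub fetcher: counts calls instead of hitting the network.
calls: List[int] = []


def fake_fetch() -> Tuple[str, int]:
    calls.append(1)
    return f"token-{len(calls)}", 3600


now = [0.0]
cache = TokenCache(fake_fetch, clock=lambda: now[0])

first = cache.get()    # cache is empty, so this fetches
second = cache.get()   # still valid, served from the cache
now[0] = 3541.0        # just past 3600 - 60 = 3540 seconds
third = cache.get()    # inside the buffer window, so this fetches again
```

Injecting the clock is a testing convenience; the helper above reads time.time() directly, which behaves the same way in production.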
Implementation
Step 1: Configure the Genesys Cloud Webhook
Before building the receiver, you must register a webhook in Genesys Cloud. This example registers a webhook that triggers on conversation:analytics:summary events.
Required Scope: webhooks:webhook:create
# NOTE: the import path and class names below are as given in this tutorial.
# The Genesys Cloud Python SDK is commonly imported as PureCloudPlatformClientV2,
# so confirm the exact module and class names against the SDK version you install.
from purecloudplatformclientv2 import WebhooksApi, WebhookCreateRequest, PlatformClientConfiguration


def register_webhook(auth: GenesysAuth):
    """
    Registers a new webhook in Genesys Cloud using the official SDK.
    """
    # Initialize the SDK client
    config = PlatformClientConfiguration()
    config.host = "api.mypurecloud.com"
    config.access_token = auth.get_token()
    webhooks_api = WebhooksApi(config)

    # Define the webhook configuration
    webhook_body = WebhookCreateRequest(
        name="DLQ Webhook Receiver",
        description="Webhook for testing 5xx handling and DLQ",
        enabled=True,
        event="conversation:analytics:summary",
        url="https://your-domain.com/webhooks/genesys",  # Your FastAPI endpoint
        # Optional: Add retry settings if supported by your specific use case,
        # but typically you handle retries server-side for granular control.
    )

    try:
        response = webhooks_api.post_webhooks(body=webhook_body)
        print(f"Webhook created successfully. ID: {response.id}")
        return response.id
    except Exception as e:
        print(f"Error creating webhook: {e}")
        raise  # re-raise with the original traceback
Step 2: Build the Webhook Receiver with DLQ Logic
The core of this tutorial is the FastAPI application. It must:
- Accept the webhook payload.
- Attempt to process it (simulated here as an HTTP POST to a downstream service).
- If the downstream service returns a 5xx error, retry a few times with exponential backoff.
- If all retries fail, push the payload to an SQS Dead Letter Queue.
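The four steps above can be sketched without any web framework or AWS dependency. This is a minimal illustration under stated assumptions, not the receiver itself: deliver, TransientError, and PermanentError are hypothetical names, the send callable stands in for the downstream POST, and a plain list stands in for the SQS queue.

```python
import time
from typing import Callable, List


class TransientError(Exception):
    """Stands in for a 5xx response or a connection failure."""


class PermanentError(Exception):
    """Stands in for a 4xx response that retrying cannot fix."""


def deliver(payload: dict,
            send: Callable[[dict], None],
            dlq: List[dict],
            max_attempts: int = 3,
            base_delay: float = 1.0,
            sleep: Callable[[float], None] = time.sleep) -> bool:
    """Try `send`; retry transient failures, then dead-letter the payload."""
    for attempt in range(1, max_attempts + 1):
        try:
            send(payload)
            return True
        except PermanentError as exc:
            # Retrying will not help, so dead-letter immediately.
            dlq.append({"original_payload": payload, "error": str(exc)})
            return False
        except TransientError as exc:
            if attempt == max_attempts:
                dlq.append({"original_payload": payload, "error": str(exc)})
                return False
            sleep(base_delay * 2 ** (attempt - 1))  # 1s, then 2s, ...
    return False


# Demo: a downstream that always answers 503.
attempts: List[dict] = []


def always_503(payload: dict) -> None:
    attempts.append(payload)
    raise TransientError("Server Error: 503")


dead_letters: List[dict] = []
delays: List[float] = []  # record the waits instead of actually sleeping
ok = deliver({"conversationId": "abc"}, always_503, dead_letters, sleep=delays.append)
```

Separating transient from permanent failures up front is the design choice that matters here: only the former are worth the retry delay, while both end up in the DLQ for inspection.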
Note: Genesys Cloud expects a 200 OK response within 15 seconds. If you do not return 200, Genesys will retry the webhook. For this DLQ pattern, we want to acknowledge receipt immediately (200) even if downstream processing fails, shifting the burden of reliability to our internal DLQ system.
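A quick back-of-the-envelope check shows why the acknowledgement should not wait on downstream processing. The sketch assumes the retry settings used in this tutorial (three attempts, a 5-second timeout, exponential backoff) and treats the timeout as a single per-attempt cap, which is a simplification; worst_case_seconds is a hypothetical helper.

```python
from typing import List


def worst_case_seconds(attempts: int, timeout_s: float, backoff_s: List[float]) -> float:
    """Upper bound on blocking time: every attempt times out, plus the waits between attempts."""
    return attempts * timeout_s + sum(backoff_s[: attempts - 1])


budget = worst_case_seconds(attempts=3, timeout_s=5, backoff_s=[1, 2, 4])
# 3 * 5 + (1 + 2) = 18 seconds, which is above the 15-second window
exceeds_window = budget > 15
```

Under these assumptions, a handler that processed the payload inline could miss the response window whenever the downstream service is slow, which would trigger the very retries the DLQ pattern is meant to avoid.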
import json
import logging
from datetime import datetime, timezone

import boto3
import requests as http_requests
from fastapi import BackgroundTasks, FastAPI, Request
from fastapi.responses import JSONResponse
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(title="Genesys Webhook DLQ Handler")

# Initialize AWS SQS Client (DLQ)
sqs_client = boto3.client('sqs', region_name='us-east-1')
DLQ_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/genesys-webhook-dlq"

# Downstream service URL for simulation
DOWNSTREAM_URL = "https://your-downstream-api.com/process-conversation"


class DownstreamServerError(Exception):
    """Raised when the downstream service answers with a 5xx status."""


@app.post("/webhooks/genesys")
async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
    """
    Receives webhook from Genesys Cloud.
    Returns 200 immediately to acknowledge receipt, then processes in the background.
    """
    try:
        body = await request.json()
    except Exception as e:
        # A body that is not valid JSON will not parse on a retry either
        logger.error(f"Failed to parse request: {e}")
        return JSONResponse(content={"error": "Invalid JSON payload"}, status_code=400)

    # 1. Schedule processing to run after the response has been sent.
    #    In a larger system, hand the payload to a message queue instead.
    background_tasks.add_task(process_payload, body)

    # 2. Acknowledge receipt to Genesys Cloud immediately.
    #    This prevents Genesys from retrying this specific webhook delivery.
    #    The reliability is now handled by our DLQ system.
    return JSONResponse(content={"status": "accepted"}, status_code=200)


def process_payload(payload: dict):
    """
    Attempts to send payload to downstream service.
    Implements retry logic and DLQ fallback.
    """
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    # Define retry strategy:
    # - Retry only on 5xx responses, connection errors, and timeouts
    # - Exponential backoff between attempts (roughly 1s, then 2s)
    # - Max 3 attempts; reraise=True surfaces the last underlying error
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((
            DownstreamServerError,
            http_requests.exceptions.ConnectionError,
            http_requests.exceptions.Timeout,
        )),
        reraise=True,
    )
    def send_to_downstream():
        response = http_requests.post(
            DOWNSTREAM_URL,
            json=payload,
            headers=headers,
            timeout=5
        )
        # Raise a dedicated exception on 5xx so that tenacity retries it
        if response.status_code >= 500:
            raise DownstreamServerError(f"Server Error: {response.status_code}")
        # A 4xx raises HTTPError, which is not retried (bad request, auth error, etc.)
        response.raise_for_status()
        return response.json()

    try:
        send_to_downstream()
        logger.info(f"Successfully processed webhook for conversation {payload.get('conversationId', 'unknown')}")
    except Exception as e:
        logger.error(f"Downstream delivery failed. Sending to DLQ. Error: {e}")
        send_to_dlq(payload, str(e))


def send_to_dlq(payload: dict, error_message: str):
    """
    Sends the failed payload and error context to the SQS Dead Letter Queue.
    """
    dlq_message = {
        "original_payload": payload,
        "error": error_message,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    try:
        sqs_client.send_message(
            QueueUrl=DLQ_URL,
            MessageBody=json.dumps(dlq_message)
        )
        logger.info("Failed webhook payload successfully sent to DLQ.")
    except Exception as e:
        logger.critical(f"CRITICAL: Failed to send message to DLQ. Data loss risk. Error: {e}")
        # In production, you might alert here (PagerDuty, Slack, etc.)
Step 3: Processing Results from the DLQ
The DLQ is not a black hole. You need a mechanism to inspect and retry failed messages. This script polls the SQS queue, processes the messages, and deletes them upon success.
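The delete-only-on-success rule is the part worth getting right, and it can be checked in isolation. The sketch below uses FakeQueue, a hypothetical in-memory stand-in that mimics only the response shape of the two SQS calls involved; it is not boto3, and it ignores details such as QueueUrl and visibility timeouts.

```python
import json
from typing import Callable, Dict, List


class FakeQueue:
    """In-memory stand-in for receive_message/delete_message (not boto3)."""

    def __init__(self, bodies: List[dict]):
        # Map a made-up receipt handle to each serialized message body.
        self._messages: Dict[str, str] = {
            f"rh-{i}": json.dumps(body) for i, body in enumerate(bodies)
        }

    def receive_message(self, **kwargs) -> dict:
        limit = kwargs.get("MaxNumberOfMessages", 1)
        batch = list(self._messages.items())[:limit]
        return {"Messages": [{"ReceiptHandle": rh, "Body": body} for rh, body in batch]}

    def delete_message(self, **kwargs) -> None:
        self._messages.pop(kwargs["ReceiptHandle"], None)

    def remaining(self) -> int:
        return len(self._messages)


def drain_once(queue, handler: Callable[[dict], None]) -> int:
    """Process one batch; delete only the messages whose handler succeeds."""
    processed = 0
    response = queue.receive_message(MaxNumberOfMessages=10, WaitTimeSeconds=0)
    for message in response.get("Messages", []):
        try:
            handler(json.loads(message["Body"])["original_payload"])
        except Exception:
            continue  # leave the message in the queue for a later attempt
        queue.delete_message(ReceiptHandle=message["ReceiptHandle"])
        processed += 1
    return processed


def handler(payload: dict) -> None:
    # Simulate a downstream that still rejects one specific conversation.
    if payload.get("conversationId") == "bad":
        raise RuntimeError("downstream still failing")


queue = FakeQueue([
    {"original_payload": {"conversationId": "good"}},
    {"original_payload": {"conversationId": "bad"}},
])
done = drain_once(queue, handler)
```

Because a message is deleted only after its handler returns, a crash mid-batch can at worst cause a message to be processed twice, so the downstream service should tolerate duplicates.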
import json
import logging
import time

import boto3
import requests as http_requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Same values as in the receiver
DLQ_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/genesys-webhook-dlq"
DOWNSTREAM_URL = "https://your-downstream-api.com/process-conversation"


class DLQProcessor:
    def __init__(self, dlq_url: str, downstream_url: str):
        self.sqs_client = boto3.client('sqs', region_name='us-east-1')
        self.dlq_url = dlq_url
        self.downstream_url = downstream_url
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def process_queue(self, max_iterations: int = 10):
        """
        Polls the DLQ and attempts to process messages.
        """
        iteration = 0
        while iteration < max_iterations:
            iteration += 1
            # Poll for messages
            response = self.sqs_client.receive_message(
                QueueUrl=self.dlq_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=5  # Long polling
            )
            messages = response.get('Messages', [])
            if not messages:
                logger.info("No messages in DLQ. Exiting.")
                break

            for message in messages:
                receipt_handle = message['ReceiptHandle']
                try:
                    body = json.loads(message['Body'])
                    logger.info(f"Processing DLQ message: {body.get('timestamp')}")
                    # Retry logic for DLQ processing
                    self._retry_downstream(body['original_payload'])
                    # Success: Delete from queue
                    self.sqs_client.delete_message(
                        QueueUrl=self.dlq_url,
                        ReceiptHandle=receipt_handle
                    )
                    logger.info("Message processed and deleted from DLQ.")
                except Exception as e:
                    logger.error(f"Failed to process DLQ message: {e}")
                    # Optional: Re-queue or move to a secondary error queue.
                    # For now, the message is not deleted, so it becomes visible
                    # again once the queue's visibility timeout expires.

        logger.info("DLQ Processing cycle complete.")

    def _retry_downstream(self, payload: dict):
        """
        Attempts to send payload to downstream service with retries.
        """
        for attempt in range(3):
            try:
                response = http_requests.post(
                    self.downstream_url,
                    json=payload,
                    headers=self.headers,
                    timeout=5
                )
                response.raise_for_status()
                return response.json()
            except http_requests.exceptions.HTTPError as e:
                # Only 5xx responses are worth retrying; 4xx is re-raised at once
                if e.response is not None and e.response.status_code >= 500:
                    logger.warning(f"Attempt {attempt + 1} failed with 5xx. Retrying...")
                    time.sleep(2 ** attempt)  # Exponential backoff
                else:
                    raise
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                raise
        raise Exception("Max retries exceeded for DLQ message.")


# Usage
if __name__ == "__main__":
    processor = DLQProcessor(DLQ_URL, DOWNSTREAM_URL)
    processor.process_queue()
Complete Working Example
Below is the consolidated main.py file for the FastAPI webhook receiver.
import json
import logging
import time
from datetime import datetime, timezone
from typing import Optional

import boto3
import requests
from fastapi import BackgroundTasks, FastAPI, Request
from fastapi.responses import JSONResponse
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

# --- Configuration ---
GENESYS_CLIENT_ID = "YOUR_CLIENT_ID"
GENESYS_CLIENT_SECRET = "YOUR_CLIENT_SECRET"
DLQ_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/genesys-webhook-dlq"
DOWNSTREAM_URL = "https://your-downstream-api.com/process-conversation"
AWS_REGION = "us-east-1"

# --- Logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- AWS SQS Client ---
sqs_client = boto3.client('sqs', region_name=AWS_REGION)


# --- Genesys Auth Helper ---
class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token: Optional[str] = None
        self.token_expiry: float = 0

    def get_token(self) -> str:
        if self.token and time.time() < self.token_expiry:
            return self.token

        auth_url = "https://api.mypurecloud.com/oauth/token"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }
        response = requests.post(auth_url, headers=headers, data=data, timeout=10)
        response.raise_for_status()
        token_data = response.json()
        self.token = token_data["access_token"]
        # Refresh 60 seconds early to provide a buffer for expiration
        self.token_expiry = time.time() + token_data["expires_in"] - 60
        return self.token


# --- Retry Signal ---
class DownstreamServerError(Exception):
    """Raised when the downstream service answers with a 5xx status."""


# --- FastAPI App ---
app = FastAPI(title="Genesys Webhook DLQ Handler")


@app.post("/webhooks/genesys")
async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
    try:
        body = await request.json()
    except Exception as e:
        logger.error(f"Failed to parse request: {e}")
        return JSONResponse(content={"error": "Invalid JSON payload"}, status_code=400)

    # Genesys Cloud expects 200 OK within 15s.
    # Processing runs after the response is sent, so we return 200 right away
    # to stop Genesys retries, shifting reliability to our DLQ.
    background_tasks.add_task(process_payload, body)
    return JSONResponse(content={"status": "accepted"}, status_code=200)


def process_payload(payload: dict):
    headers = {"Content-Type": "application/json", "Accept": "application/json"}

    # Retry 5xx responses, connection errors, and timeouts; never retry 4xx.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((
            DownstreamServerError,
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
        )),
        reraise=True,
    )
    def send_to_downstream():
        response = requests.post(
            DOWNSTREAM_URL,
            json=payload,
            headers=headers,
            timeout=5
        )
        if response.status_code >= 500:
            raise DownstreamServerError(f"Server Error: {response.status_code}")
        response.raise_for_status()
        return response.json()

    try:
        send_to_downstream()
        logger.info(f"Successfully processed webhook for conversation {payload.get('conversationId', 'unknown')}")
    except Exception as e:
        logger.error(f"Downstream delivery failed. Sending to DLQ. Error: {e}")
        send_to_dlq(payload, str(e))


def send_to_dlq(payload: dict, error_message: str):
    dlq_message = {
        "original_payload": payload,
        "error": error_message,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    try:
        sqs_client.send_message(
            QueueUrl=DLQ_URL,
            MessageBody=json.dumps(dlq_message)
        )
        logger.info("Failed webhook payload sent to DLQ.")
    except Exception as e:
        logger.critical(f"CRITICAL: Failed to send message to DLQ. Error: {e}")


# --- Main Entry Point for Testing ---
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Common Errors & Debugging
Error: 429 Too Many Requests
- Cause: Genesys Cloud rate limits webhook registrations or your downstream service is overwhelmed.
- Fix: Implement exponential backoff in your retry logic. For Genesys API calls, check the Retry-After header.
- Code: The tenacity library in the example handles backoff. Ensure your downstream service also respects rate limits.
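Honoring Retry-After means turning the header into a wait time. HTTP allows the value to be either a number of seconds or an HTTP date, so a tolerant parser handles both. The helper below is a hypothetical sketch (retry_after_seconds is not part of requests or the Genesys SDK), and the 1-second fallback is an arbitrary assumption.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(header_value: Optional[str],
                        now: Optional[datetime] = None,
                        default: float = 1.0) -> float:
    """Convert a Retry-After header (delta-seconds or HTTP-date) to a wait in seconds."""
    if not header_value:
        return default
    value = header_value.strip()
    if value.isdigit():
        return float(value)
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Unparseable value: fall back rather than fail the retry loop
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


# Usage with a fixed "now" so the date form is reproducible
fixed_now = datetime(2015, 10, 21, 7, 27, 30, tzinfo=timezone.utc)
wait_from_seconds = retry_after_seconds("120")
wait_from_date = retry_after_seconds("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now)
```

In a retry loop, the returned value would replace the computed backoff delay for that attempt, for example time.sleep(retry_after_seconds(response.headers.get("Retry-After"))).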
Error: 401 Unauthorized
- Cause: OAuth token expired or invalid client credentials.
- Fix: Ensure the GenesysAuth class is used to fetch a fresh token before API calls. Check that the client ID and secret are correct.
- Code: Verify the get_token method is called before post_webhooks.
Error: 500 Internal Server Error (Genesys Webhook)
- Cause: Your endpoint returned a non-2xx status or timed out.
- Fix: Ensure your FastAPI endpoint returns 200 OK quickly. Do not block the request thread on long-running downstream processing. Use background tasks or message queues.
- Code: receive_webhook should return 200 without waiting for process_payload to finish. If process_payload is slow, run it as a background task, for example with FastAPI's BackgroundTasks or asyncio.create_task.
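The asyncio.create_task route can be sketched with the standard library alone. This is an illustration under assumptions rather than the tutorial's receiver: handle and process_payload_blocking are hypothetical stand-ins, and asyncio.to_thread (available from Python 3.9) moves the blocking call off the event loop.

```python
import asyncio
import time
from typing import List

processed: List[dict] = []


def process_payload_blocking(payload: dict) -> None:
    """Stand-in for slow, blocking downstream work (hypothetical)."""
    time.sleep(0.2)
    processed.append(payload)


# Keep references so pending tasks are not garbage-collected mid-flight.
background_tasks = set()


async def handle(payload: dict) -> dict:
    # Start the blocking function in a worker thread and return without awaiting it.
    task = asyncio.create_task(asyncio.to_thread(process_payload_blocking, payload))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return {"status": "accepted"}


async def main() -> float:
    start = time.perf_counter()
    await handle({"conversationId": "abc"})
    ack_latency = time.perf_counter() - start
    # Demo only: wait for the background work so the script can exit cleanly.
    await asyncio.gather(*background_tasks)
    return ack_latency


ack_latency = asyncio.run(main())
```

The acknowledgement returns well before the 0.2-second job finishes. The trade-off is that work scheduled this way is lost if the process exits first, which is one reason to prefer a durable queue for anything that must not be dropped.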
Error: SQS Queue Not Found
- Cause: Incorrect DLQ_URL or AWS credentials not configured.
- Fix: Verify the SQS queue exists in the specified region. Ensure the IAM role or user has sqs:SendMessage permissions.
- Code: Check the DLQ_URL variable in the configuration section.
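One mismatch that is easy to check before startup is a queue URL that points at a different region than the SQS client. The sketch below assumes the URL form used in this tutorial (https://sqs.<region>.amazonaws.com/<account>/<name>); other URL forms exist and would need different parsing, and parse_queue_url and QueueRef are hypothetical names.

```python
from typing import NamedTuple
from urllib.parse import urlparse


class QueueRef(NamedTuple):
    region: str
    account_id: str
    name: str


def parse_queue_url(url: str) -> QueueRef:
    """Split a URL of the form https://sqs.<region>.amazonaws.com/<account>/<name>."""
    parsed = urlparse(url)
    host_parts = (parsed.hostname or "").split(".")
    path_parts = [part for part in parsed.path.split("/") if part]
    if len(host_parts) < 4 or host_parts[0] != "sqs" or len(path_parts) != 2:
        raise ValueError(f"Unrecognized SQS queue URL: {url}")
    return QueueRef(region=host_parts[1], account_id=path_parts[0], name=path_parts[1])


DLQ_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/genesys-webhook-dlq"
AWS_REGION = "us-east-1"

ref = parse_queue_url(DLQ_URL)
region_matches = ref.region == AWS_REGION
```

Failing fast on region_matches being False at startup gives a clearer message than a queue-not-found error surfacing later from the first failed delivery.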