Handling Webhook 5xx Failures with a Dead Letter Queue Retry Pattern
What You Will Build
- A Python service that intercepts failed Genesys Cloud CX webhooks (5xx errors) and stores them in a durable queue for later retry.
- A secondary worker that pulls items from the queue, retries the delivery to the target endpoint, and updates the Genesys Cloud webhook configuration upon success.
- Python 3.10+ with httpx for async HTTP requests and boto3 for AWS SQS integration.
Prerequisites
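- Genesys Cloud CX: A valid OAuth 2.0 Client ID and Secret with webhooks:read and webhooks:write scopes.
- AWS Account: Access keys for an SQS queue. The code in this tutorial assumes a Standard queue; a FIFO queue additionally requires a MessageGroupId on every send and does not support per-message DelaySeconds.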
- Python Runtime: Python 3.10 or later.
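- Dependencies:
  - httpx>=0.24.0: For async HTTP requests and retries.
  - boto3>=1.28.0: For AWS SQS interaction.
  - pydantic>=2.0.0: For data validation.
  - python-dotenv>=1.0.0: For environment variable management.
  - fastapi and uvicorn: For the webhook receiver endpoint and the ASGI server that hosts it (both are imported by the code below).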
Authentication Setup
Genesys Cloud uses OAuth 2.0 Client Credentials Grant for server-to-server communication. You must obtain an access token before calling the Webhooks API. The token response includes an expires_in field giving the token lifetime in seconds, so your service should cache the token, read the lifetime from that field rather than hard-coding it, and request a new token shortly before the current one expires.
import httpx
import os
from datetime import datetime, timedelta
from typing import Optional


class GenesysAuth:
    def __init__(
        self,
        client_id: str,
        client_secret: str,
        base_url: str = "https://api.mypurecloud.com",
        login_url: str = "https://login.mypurecloud.com",
    ):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url.rstrip('/')
        # Tokens are issued by the region's login host, not the API host.
        self.token_url = f"{login_url.rstrip('/')}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: Optional[datetime] = None

    async def get_token(self) -> str:
        """
        Retrieves an OAuth2 access token.
        Returns a cached token if valid, otherwise fetches a new one.
        """
        if self.access_token and self.token_expiry and datetime.utcnow() < self.token_expiry:
            return self.access_token
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    self.token_url,
                    # Client credentials are sent as HTTP Basic auth.
                    auth=(self.client_id, self.client_secret),
                    data={"grant_type": "client_credentials"},
                    timeout=10.0
                )
                response.raise_for_status()
                data = response.json()
                self.access_token = data["access_token"]
                # Use the lifetime reported by the server,
                # minus a 30-second safety buffer.
                self.token_expiry = datetime.utcnow() + timedelta(seconds=data["expires_in"] - 30)
                return self.access_token
            except httpx.HTTPStatusError as e:
                raise RuntimeError(f"Failed to obtain OAuth token: {e.response.status_code} - {e.response.text}") from e
            except httpx.RequestError as e:
                raise RuntimeError(f"Network error during OAuth request: {e}") from e
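If several coroutines call get_token at the moment the cached token expires, each of them would request its own token. The following is a minimal sketch of one way to avoid that, not part of the tutorial's service: TokenCache is an illustrative name, and the fetch argument is a hypothetical coroutine returning (access_token, expires_in) that stands in for the HTTP call above.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Tuple

# Hypothetical fetcher type: returns (access_token, expires_in_seconds).
Fetcher = Callable[[], Awaitable[Tuple[str, int]]]


class TokenCache:
    """Caches a token and lets only one coroutine refresh it at a time."""

    def __init__(self, fetch: Fetcher, buffer_seconds: int = 30):
        self._fetch = fetch
        self._buffer = buffer_seconds
        self._token: Optional[str] = None
        self._expires_at = 0.0
        self._lock = asyncio.Lock()

    def _fresh(self) -> bool:
        return self._token is not None and time.monotonic() < self._expires_at

    async def get(self) -> str:
        if self._fresh():
            return self._token
        async with self._lock:
            # Re-check: another coroutine may have refreshed while we waited.
            if not self._fresh():
                token, expires_in = await self._fetch()
                self._token = token
                self._expires_at = time.monotonic() + expires_in - self._buffer
            return self._token
```

The second freshness check inside the lock is what keeps the number of token requests at one per expiry, even when many callers arrive together.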
Implementation
Step 1: The Failure Interceptor (Webhook Receiver)
Genesys Cloud sends webhooks to your specified URL. If your endpoint returns a 5xx error, Genesys will retry up to 5 times with exponential backoff. If all retries fail, the event is lost unless you have a dead letter mechanism.
In this pattern, your receiving endpoint is designed to always return 200 OK to Genesys immediately, acknowledging receipt, even if the actual business logic fails. The actual processing happens asynchronously. If the business logic fails (e.g., downstream service is down), you push the payload to a Dead Letter Queue (DLQ) instead of dropping it.
Required Scope: webhooks:read (to verify webhook signature if enabled, though not strictly required for receiving).
import asyncio
import json
import logging
import os
from datetime import datetime
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import boto3
from botocore.exceptions import ClientError

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Genesys Webhook DLQ Handler")

# SQS client setup
sqs_client = boto3.client('sqs', region_name=os.getenv("AWS_REGION"))
DLQ_URL = os.getenv("SQS_DLQ_URL")

# Strong references to in-flight tasks; the event loop only keeps weak ones,
# so an unreferenced task can be garbage collected before it finishes.
background_tasks: set[asyncio.Task] = set()


@app.post("/webhook/genesys")
async def receive_webhook(request: Request):
    """
    Receives webhook from Genesys Cloud.
    Always returns 200 to Genesys to stop retries.
    Processes payload asynchronously.
    """
    try:
        body = await request.json()
    except json.JSONDecodeError:
        # A payload we cannot parse cannot be retried later either,
        # so reject it with 400 instead of queueing it.
        raise HTTPException(status_code=400, detail="Invalid JSON")

    # Extract key metadata
    webhook_id = body.get("id")
    event_type = body.get("eventType")
    logger.info(f"Received webhook: ID={webhook_id}, Type={event_type}")

    # Process in the background without awaiting, so the HTTP response
    # returns 200 immediately and Genesys considers the delivery successful.
    task = asyncio.create_task(process_webhook_async(body, webhook_id, event_type))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return JSONResponse(content={"status": "received"}, status_code=200)


async def process_webhook_async(payload: dict, webhook_id: str, event_type: str):
    """
    Performs the actual business logic.
    If it fails, pushes to DLQ.
    """
    try:
        # Simulate business logic (e.g., sending to CRM, updating DB)
        await simulate_business_logic(payload)
        logger.info(f"Successfully processed webhook {webhook_id}")
    except Exception as e:
        logger.error(f"Business logic failed for webhook {webhook_id}: {str(e)}")
        await push_to_dlq(payload, webhook_id, event_type, str(e))


async def simulate_business_logic(payload: dict):
    """
    Placeholder for your actual integration logic.
    Raise an exception to trigger DLQ push.
    """
    # Example: Check if a downstream service is reachable
    # if not await check_downstream_service():
    #     raise ConnectionError("Downstream service unavailable")
    # For this tutorial, we simulate a 50% failure rate
    import random
    if random.random() < 0.5:
        raise ConnectionError("Simulated downstream 5xx error")


async def push_to_dlq(payload: dict, webhook_id: str, event_type: str, error_reason: str):
    """
    Pushes the failed webhook payload to the SQS Dead Letter Queue.
    """
    dlq_message = {
        "original_payload": payload,
        "webhook_id": webhook_id,
        "event_type": event_type,
        "error_reason": error_reason,
        "failed_at": datetime.utcnow().isoformat(),
        "retry_count": 0
    }
    try:
        # boto3 is blocking, so run the call in a thread to keep the event loop free
        await asyncio.to_thread(
            sqs_client.send_message,
            QueueUrl=DLQ_URL,
            MessageBody=json.dumps(dlq_message)
        )
        logger.info(f"Pushed webhook {webhook_id} to DLQ")
    except ClientError as e:
        logger.critical(f"Failed to push to DLQ for webhook {webhook_id}: {e}")
        # In a production system, you might want to log to a separate error sink here
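The DLQ message is built as a plain dict, so a misspelled or missing key only surfaces later, when the worker reads it. As a minimal sketch of a typed envelope with the same fields, using only the standard library: the DlqMessage name and its helper methods are illustrative, and a Pydantic model would serve the same purpose.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class DlqMessage:
    """Same fields as the dlq_message dict built in push_to_dlq."""
    original_payload: Dict[str, Any]
    webhook_id: Optional[str]
    event_type: Optional[str]
    error_reason: str
    failed_at: str
    retry_count: int = 0

    @classmethod
    def from_failure(cls, payload: Dict[str, Any], webhook_id: Optional[str],
                     event_type: Optional[str], error_reason: str) -> "DlqMessage":
        return cls(
            original_payload=payload,
            webhook_id=webhook_id,
            event_type=event_type,
            error_reason=error_reason,
            failed_at=datetime.now(timezone.utc).isoformat(),
        )

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "DlqMessage":
        # Raises TypeError if a required key is missing or an unknown key is present.
        return cls(**json.loads(raw))
```

With this in place, the producer would send DlqMessage.from_failure(...).to_json() as the MessageBody and the worker would call DlqMessage.from_json(msg["Body"]), so a malformed message fails loudly at the point it is read.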
Step 2: The Retry Worker
This service runs continuously, polling the SQS DLQ. It attempts to redeliver the webhook payload to the original target URL specified in the Genesys webhook configuration, or to a fallback URL.
Required Scope: webhooks:write (to update the webhook configuration if needed, or to verify the webhook exists). webhooks:read is required to fetch the current configuration.
First, we need a helper to fetch the webhook configuration from Genesys to know where to retry.
import httpx
import asyncio


class GenesysWebhookService:
    def __init__(self, auth: GenesysAuth):
        self.auth = auth
        self.base_url = auth.base_url

    async def get_webhook_config(self, webhook_id: str) -> dict:
        """
        Fetches the webhook configuration from Genesys Cloud.
        Scope: webhooks:read
        """
        token = await self.auth.get_token()
        url = f"{self.base_url}/api/v2/webhooks/{webhook_id}"
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(
                    url,
                    headers={"Authorization": f"Bearer {token}"},
                    timeout=10.0
                )
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 404:
                    raise ValueError(f"Webhook {webhook_id} not found in Genesys")
                raise RuntimeError(f"Failed to fetch webhook config: {e.response.text}") from e

    async def update_webhook_status(self, webhook_id: str, active: bool) -> None:
        """
        Updates the active status of a webhook.
        Scope: webhooks:write
        """
        token = await self.auth.get_token()
        url = f"{self.base_url}/api/v2/webhooks/{webhook_id}"
        # Fetch current config first to preserve other settings
        current_config = await self.get_webhook_config(webhook_id)
        current_config["active"] = active
        async with httpx.AsyncClient() as client:
            try:
                response = await client.put(
                    url,
                    headers={
                        "Authorization": f"Bearer {token}",
                        "Content-Type": "application/json"
                    },
                    json=current_config,
                    timeout=10.0
                )
                response.raise_for_status()
            except httpx.HTTPStatusError as e:
                raise RuntimeError(f"Failed to update webhook status: {e.response.text}") from e
Now, the worker logic. It retrieves messages from SQS, attempts to resend the payload to the original target URL, and handles retries.
import asyncio
import json

MAX_RETRIES = 3
RETRY_DELAY_SECONDS = 10


async def retry_worker(auth: GenesysAuth, sqs_client, dlq_url: str):
    """
    Polls the DLQ and retries failed webhooks.
    boto3 calls are blocking, so they run in a thread via asyncio.to_thread
    to keep the event loop (and the webhook receiver) responsive.
    """
    webhook_service = GenesysWebhookService(auth)
    logger.info("Starting retry worker...")

    async def delete(receipt_handle: str) -> None:
        await asyncio.to_thread(
            sqs_client.delete_message, QueueUrl=dlq_url, ReceiptHandle=receipt_handle
        )

    while True:
        try:
            # Poll SQS (long polling for up to 5 seconds)
            response = await asyncio.to_thread(
                sqs_client.receive_message,
                QueueUrl=dlq_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=5
            )
            messages = response.get("Messages", [])
            if not messages:
                await asyncio.sleep(1)
                continue
            for msg in messages:
                message_body = json.loads(msg["Body"])
                receipt_handle = msg["ReceiptHandle"]
                webhook_id = message_body["webhook_id"]
                original_payload = message_body["original_payload"]
                retry_count = message_body.get("retry_count", 0)
                logger.info(f"Processing DLQ message for webhook {webhook_id}, retry count: {retry_count}")

                if retry_count >= MAX_RETRIES:
                    logger.warning(f"Max retries reached for webhook {webhook_id}. Deleting from DLQ.")
                    await delete(receipt_handle)
                    continue

                # Get the target URL from Genesys config
                try:
                    config = await webhook_service.get_webhook_config(webhook_id)
                    target_url = config.get("targetUrl")
                    if not target_url:
                        raise ValueError("No targetUrl found in webhook config")
                except Exception as e:
                    logger.error(f"Could not fetch config for {webhook_id}: {e}")
                    await delete(receipt_handle)
                    continue

                # Attempt to resend
                success = await attempt_redelivery(target_url, original_payload)
                if success:
                    logger.info(f"Successfully redelivered webhook {webhook_id}")
                    # Delete from DLQ
                    await delete(receipt_handle)
                else:
                    logger.error(f"Redelivery failed for webhook {webhook_id}")
                    # Re-enqueue a copy with an incremented retry count
                    message_body["retry_count"] = retry_count + 1
                    await asyncio.to_thread(
                        sqs_client.send_message,
                        QueueUrl=dlq_url,
                        MessageBody=json.dumps(message_body),
                        DelaySeconds=RETRY_DELAY_SECONDS
                    )
                    # Remove the original so it does not reappear with the old count
                    await delete(receipt_handle)
        except Exception as e:
            logger.error(f"Worker loop error: {e}")
            await asyncio.sleep(5)


async def attempt_redelivery(target_url: str, payload: dict) -> bool:
    """
    Attempts to send the webhook payload to the target URL.
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.post(
                target_url,
                json=payload,
                headers={"Content-Type": "application/json"}
            )
            # Consider 2xx and 3xx as success
            if 200 <= response.status_code < 400:
                return True
            else:
                logger.warning(f"Redelivery returned status {response.status_code}")
                return False
        except httpx.RequestError as e:
            logger.error(f"Network error during redelivery: {e}")
            return False
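The worker above waits a fixed RETRY_DELAY_SECONDS between attempts. If the downstream service needs longer to recover, a growing delay may work better. A minimal sketch of an exponential backoff helper follows; backoff_delay and its parameters are hypothetical names, and the 900-second cap reflects the maximum DelaySeconds that SQS accepts on a message.

```python
import random

SQS_MAX_DELAY_SECONDS = 900  # SQS does not accept DelaySeconds above 15 minutes


def backoff_delay(retry_count: int, base: int = 10,
                  cap: int = SQS_MAX_DELAY_SECONDS, jitter: bool = True) -> int:
    """Delay in seconds before the next attempt.

    Grows as base * 2**retry_count, capped at `cap`. With jitter enabled,
    a random value between 0 and that delay is returned so that many
    failed messages do not all retry at the same instant.
    """
    delay = min(cap, base * (2 ** retry_count))
    if jitter:
        return random.randint(0, delay)
    return delay
```

In the worker, this could replace the constant when re-enqueueing, for example DelaySeconds=backoff_delay(retry_count). Per-message DelaySeconds applies to Standard queues; FIFO queues only support a delay configured on the queue itself.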
Step 3: Orchestrating the Service
Combine the receiver and the worker into a single application entry point. In production, these would likely be separate microservices (one for receiving, one for processing), but for this tutorial, we run them concurrently using asyncio.
import asyncio
import os
import uvicorn


async def main():
    # Initialize auth
    auth = GenesysAuth(
        client_id=os.getenv("GENESYS_CLIENT_ID"),
        client_secret=os.getenv("GENESYS_CLIENT_SECRET")
    )
    # Start the retry worker in the background
    worker_task = asyncio.create_task(
        retry_worker(auth, sqs_client, DLQ_URL)
    )
    # Start the FastAPI server
    config = uvicorn.Config(app, host="0.0.0.0", port=8000)
    server = uvicorn.Server(config)
    # Run both concurrently; serve() returns when the server shuts down
    try:
        await server.serve()
    finally:
        # Stop the worker once the server exits
        worker_task.cancel()


if __name__ == "__main__":
    asyncio.run(main())
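The orchestration pattern itself, a background task that lives exactly as long as a foreground coroutine, can be tried without uvicorn or SQS. The sketch below is illustrative only: run_with_worker is a hypothetical helper, and serve and worker stand in for server.serve and retry_worker.

```python
import asyncio
from typing import Awaitable, Callable


async def run_with_worker(serve: Callable[[], Awaitable[None]],
                          worker: Callable[[], Awaitable[None]]) -> None:
    """Run `worker` in the background for as long as `serve` is running."""
    worker_task = asyncio.create_task(worker())
    try:
        await serve()
    finally:
        worker_task.cancel()
        # Wait for the cancellation to complete so no task is left pending
        # when the event loop closes.
        await asyncio.gather(worker_task, return_exceptions=True)
```

Awaiting the cancelled task gives the worker a chance to run its cleanup (for example, an except asyncio.CancelledError block) before the process exits.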
Complete Working Example
Below is the consolidated code structure. Save this as main.py.
import asyncio
import json
import logging
import os
import uuid
from datetime import datetime, timedelta
from typing import Optional
import boto3
import httpx
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from botocore.exceptions import ClientError
# --- Configuration ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
GENESYS_BASE_URL = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
SQS_DLQ_URL = os.getenv("SQS_DLQ_URL")
if not all([GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, SQS_DLQ_URL]):
    raise ValueError("Missing required environment variables: GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, SQS_DLQ_URL")
# --- Clients ---
sqs_client = boto3.client('sqs', region_name=AWS_REGION)
# --- Authentication ---
class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, base_url: str,
                 login_url: str = "https://login.mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url.rstrip('/')
        # Tokens are issued by the region's login host, not the API host.
        self.token_url = f"{login_url.rstrip('/')}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: Optional[datetime] = None

    async def get_token(self) -> str:
        if self.access_token and self.token_expiry and datetime.utcnow() < self.token_expiry:
            return self.access_token
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    self.token_url,
                    # Client credentials are sent as HTTP Basic auth.
                    auth=(self.client_id, self.client_secret),
                    data={"grant_type": "client_credentials"},
                    timeout=10.0
                )
                response.raise_for_status()
                data = response.json()
                self.access_token = data["access_token"]
                self.token_expiry = datetime.utcnow() + timedelta(seconds=data["expires_in"] - 30)
                return self.access_token
            except httpx.HTTPStatusError as e:
                raise RuntimeError(f"OAuth Error: {e.response.text}") from e
# --- Webhook Service ---
class GenesysWebhookService:
    def __init__(self, auth: GenesysAuth):
        self.auth = auth

    async def get_webhook_config(self, webhook_id: str) -> dict:
        token = await self.auth.get_token()
        url = f"{self.auth.base_url}/api/v2/webhooks/{webhook_id}"
        async with httpx.AsyncClient() as client:
            response = await client.get(url, headers={"Authorization": f"Bearer {token}"})
            response.raise_for_status()
            return response.json()
# --- FastAPI App ---
app = FastAPI(title="Genesys Webhook DLQ Handler")
# Strong references to in-flight tasks so they are not garbage collected
background_tasks: set[asyncio.Task] = set()


@app.post("/webhook/genesys")
async def receive_webhook(request: Request):
    try:
        body = await request.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")
    webhook_id = body.get("id")
    event_type = body.get("eventType")
    # Fire-and-forget processing
    task = asyncio.create_task(process_webhook_async(body, webhook_id, event_type))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return JSONResponse(content={"status": "received"}, status_code=200)


async def process_webhook_async(payload: dict, webhook_id: str, event_type: str):
    try:
        await simulate_business_logic(payload)
        logger.info(f"Processed {webhook_id}")
    except Exception as e:
        logger.error(f"Logic failed for {webhook_id}: {e}")
        await push_to_dlq(payload, webhook_id, event_type, str(e))


async def simulate_business_logic(payload: dict):
    import random
    if random.random() < 0.5:
        raise ConnectionError("Simulated downstream 5xx")


async def push_to_dlq(payload: dict, webhook_id: str, event_type: str, error_reason: str):
    dlq_message = {
        "original_payload": payload,
        "webhook_id": webhook_id,
        "event_type": event_type,
        "error_reason": error_reason,
        "failed_at": datetime.utcnow().isoformat(),
        "retry_count": 0
    }
    try:
        # boto3 is blocking, so run the call in a thread
        await asyncio.to_thread(
            sqs_client.send_message,
            QueueUrl=SQS_DLQ_URL,
            MessageBody=json.dumps(dlq_message)
        )
    except ClientError as e:
        logger.critical(f"DLQ Push Failed: {e}")
# --- Retry Worker ---
MAX_RETRIES = 3
RETRY_DELAY = 10


async def delete_from_dlq(receipt_handle: str) -> None:
    # boto3 is blocking, so run the call in a thread
    await asyncio.to_thread(
        sqs_client.delete_message, QueueUrl=SQS_DLQ_URL, ReceiptHandle=receipt_handle
    )


async def retry_worker(auth: GenesysAuth):
    webhook_service = GenesysWebhookService(auth)
    logger.info("Retry Worker Started")
    while True:
        try:
            response = await asyncio.to_thread(
                sqs_client.receive_message,
                QueueUrl=SQS_DLQ_URL,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=5
            )
            messages = response.get("Messages", [])
            if not messages:
                await asyncio.sleep(1)
                continue
            for msg in messages:
                body = json.loads(msg["Body"])
                receipt_handle = msg["ReceiptHandle"]
                webhook_id = body["webhook_id"]
                payload = body["original_payload"]
                retries = body.get("retry_count", 0)
                if retries >= MAX_RETRIES:
                    logger.warning(f"Max retries for {webhook_id}")
                    await delete_from_dlq(receipt_handle)
                    continue
                try:
                    config = await webhook_service.get_webhook_config(webhook_id)
                    target_url = config.get("targetUrl")
                    if not target_url:
                        raise ValueError("No targetUrl found in webhook config")
                except Exception as e:
                    logger.error(f"Config fetch failed for {webhook_id}: {e}")
                    await delete_from_dlq(receipt_handle)
                    continue
                success = await attempt_redelivery(target_url, payload)
                if success:
                    await delete_from_dlq(receipt_handle)
                else:
                    # Re-enqueue a copy with an incremented retry count
                    body["retry_count"] = retries + 1
                    await asyncio.to_thread(
                        sqs_client.send_message,
                        QueueUrl=SQS_DLQ_URL,
                        MessageBody=json.dumps(body),
                        DelaySeconds=RETRY_DELAY
                    )
                    # Remove the original so it does not reappear with the old count
                    await delete_from_dlq(receipt_handle)
        except Exception as e:
            logger.error(f"Worker Error: {e}")
            await asyncio.sleep(5)


async def attempt_redelivery(url: str, payload: dict) -> bool:
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.post(url, json=payload)
            return 200 <= resp.status_code < 400
        except httpx.RequestError as e:
            logger.error(f"Network error during redelivery: {e}")
            return False
# --- Entry Point ---
async def main():
    auth = GenesysAuth(GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, GENESYS_BASE_URL)
    worker_task = asyncio.create_task(retry_worker(auth))
    config = uvicorn.Config(app, host="0.0.0.0", port=8000)
    server = uvicorn.Server(config)
    try:
        await server.serve()
    finally:
        # Stop the worker once the server exits
        worker_task.cancel()


if __name__ == "__main__":
    asyncio.run(main())
Common Errors & Debugging
Error: 403 Forbidden on Webhook API Calls
- Cause: The OAuth token lacks the required scope.
- Fix: Ensure your OAuth client in Genesys Cloud has webhooks:read and webhooks:write scopes assigned.
- Code Check: Verify the grant_type is client_credentials and the client secret is correct.
Error: 404 Not Found on Webhook Config
- Cause: The webhook_id in the DLQ message does not exist or was deleted.
- Fix: The worker logs this and deletes the DLQ message. If this happens frequently, check if webhooks are being deleted manually after failure.
Error: SQS Queue Not Found
- Cause: The SQS_DLQ_URL environment variable points to a non-existent queue.
- Fix: Create the SQS queue in AWS and ensure the IAM user/role running the Python script has sqs:SendMessage, sqs:ReceiveMessage, and sqs:DeleteMessage permissions (the worker deletes messages after handling them).
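A missing or inaccessible queue is easier to diagnose at startup than from the first failed push. One possible startup check is sketched below, with assumptions: queue_problem is a hypothetical helper, the client is passed in so the function works with any object exposing get_queue_attributes (as the boto3 SQS client does), and the calling identity needs the sqs:GetQueueAttributes permission for the check to succeed.

```python
from typing import Any, Optional


def queue_problem(sqs_client: Any, queue_url: Optional[str]) -> Optional[str]:
    """Return a description of the problem, or None if the queue is reachable."""
    if not queue_url:
        return "SQS_DLQ_URL is not set"
    try:
        sqs_client.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=["QueueArn"],
        )
    except Exception as exc:
        # With boto3 this is typically a botocore ClientError,
        # for example when the queue does not exist or access is denied.
        return f"Cannot access queue {queue_url}: {exc}"
    return None
```

Calling this once before starting the server and the worker, and exiting if it returns a message, turns a silent loss of failed webhooks into an immediate configuration error.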
Error: 502 Bad Gateway from Genesys during initial receipt
- Cause: Your FastAPI server is crashing or timing out before returning 200.
- Fix: Ensure the receive_webhook endpoint returns 200 immediately. Do not perform heavy synchronous work in the endpoint. Use asyncio.create_task to offload work.