Handling 5xx Webhook Failures with a Dead Letter Queue and Retry Logic
What You Will Build
- You will build a Python application that receives Genesys Cloud CX webhooks, detects 5xx server errors returned by your downstream service, and stores failed events in a Dead Letter Queue (DLQ) for later retry.
- You will use the Genesys Cloud CX messages API to acknowledge receipt and the webhooks API to verify configuration.
- You will use Python with FastAPI for the webhook receiver, SQLAlchemy for the DLQ storage, and asyncio for the retry logic.
Prerequisites
- OAuth Client Type: Client Credentials Grant.
- Required Scopes: webhooks:read, webhooks:write, messages:read.
- SDK Version: genesys-cloud-py-client v2.0+.
- Language/Runtime: Python 3.9+.
- External Dependencies: fastapi, uvicorn, sqlalchemy, aiosqlite (or asyncpg for PostgreSQL, since the engine in this guide is asynchronous), httpx, pydantic.
Authentication Setup
Genesys Cloud CX uses OAuth 2.0. For server-to-server integrations, use the Client Credentials flow. The token response reports its lifetime in the expires_in field (this guide assumes 3600 seconds, i.e. 1 hour; the actual value depends on how the OAuth client is configured). Your application must cache the token and refresh it before expiration to avoid 401 errors during high-volume webhook processing.
import time
from typing import Optional

import httpx


class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, org_id: str,
                 region_host: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.org_id = org_id
        # The token endpoint lives on the regional login host
        # (e.g. login.mypurecloud.com, login.mypurecloud.ie), not on an
        # org-specific API host. Set region_host to match your region.
        self.token_url = f"https://login.{region_host}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0

    async def get_token(self) -> str:
        """Fetches a new OAuth token using Client Credentials."""
        async with httpx.AsyncClient() as client:
            # Credentials go in an HTTP Basic Authorization header;
            # httpx form-encodes the data dict automatically.
            response = await client.post(
                self.token_url,
                data={"grant_type": "client_credentials"},
                auth=(self.client_id, self.client_secret),
            )
        if response.status_code != 200:
            raise Exception(f"Failed to obtain token: {response.text}")
        data = response.json()
        self.access_token = data["access_token"]
        # Subtract 60 seconds to refresh before actual expiry
        self.token_expiry = time.time() + (data["expires_in"] - 60)
        return self.access_token

    async def get_valid_token(self) -> str:
        """Returns cached token if valid, otherwise fetches a new one."""
        if not self.access_token or time.time() >= self.token_expiry:
            return await self.get_token()
        return self.access_token
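Under high webhook volume, several coroutines can find the cached token expired at the same moment and each call the token endpoint. One way to avoid that is to serialize the refresh behind an asyncio.Lock. The sketch below is a minimal illustration, not part of the GenesysAuth class: TokenCache, fetch_token, and refresh_margin are hypothetical names, and the fetcher is injected so the example runs without network access.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Tuple

# Hypothetical fetcher signature: returns (access_token, expires_in_seconds).
TokenFetcher = Callable[[], Awaitable[Tuple[str, int]]]


class TokenCache:
    """Caches a token and lets only one coroutine refresh it at a time."""

    def __init__(self, fetch_token: TokenFetcher, refresh_margin: float = 60.0):
        self._fetch_token = fetch_token
        self._refresh_margin = refresh_margin
        self._token: Optional[str] = None
        self._expiry: float = 0.0
        self._lock = asyncio.Lock()

    def _is_valid(self) -> bool:
        return self._token is not None and time.monotonic() < self._expiry

    async def get(self) -> str:
        # Fast path: no locking while the cached token is still valid.
        if self._is_valid():
            return self._token
        async with self._lock:
            # Re-check: another coroutine may have refreshed while we waited.
            if not self._is_valid():
                token, expires_in = await self._fetch_token()
                self._token = token
                self._expiry = time.monotonic() + expires_in - self._refresh_margin
            return self._token
```

Create the TokenCache inside a running event loop (for example in the startup handler) so the lock is bound to the loop that serves requests; this matters on Python 3.9.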
Implementation
Step 1: Define the Dead Letter Queue Schema
A Dead Letter Queue is not a feature of Genesys Cloud; it is an architectural pattern you implement in your data layer. When a webhook payload fails to process (specifically when your downstream service returns 5xx), you must store the raw payload, the error context, and a retry count.
We will use SQLAlchemy with an asynchronous driver for high-concurrency storage.
from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
import datetime

Base = declarative_base()


class FailedWebhook(Base):
    __tablename__ = "failed_webhooks"

    id = Column(Integer, primary_key=True, autoincrement=True)
    webhook_id = Column(String, nullable=False, index=True)
    event_type = Column(String, nullable=False)
    payload = Column(Text, nullable=False)  # Store raw JSON
    error_message = Column(Text, nullable=True)
    status_code = Column(Integer, nullable=True)
    retry_count = Column(Integer, default=0)
    max_retries = Column(Integer, default=5)
    next_retry_at = Column(DateTime, nullable=False)
    is_processed = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)


# Database setup
DATABASE_URL = "sqlite+aiosqlite:///./dlq.db"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Why this schema?
- payload: You must store the raw JSON. Genesys may not resend the event once you acknowledge it, so dropping the event means losing data.
- next_retry_at: Allows for exponential backoff. You do not want to retry every second.
- is_processed: Marks the record as successfully re-delivered to your downstream system.
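To make the next_retry_at column concrete, here is a small sketch of the doubling delay this guide uses (1, 2, 4, 8, 16 minutes). The function name compute_next_retry_at and the max_minutes cap are assumptions added for illustration; the guide's own code does not cap the delay.

```python
import datetime


def compute_next_retry_at(
    retry_count: int,
    now: datetime.datetime,
    base_minutes: int = 1,
    max_minutes: int = 60,
) -> datetime.datetime:
    """Return the time of the next attempt after `retry_count` failed retries (1-based)."""
    if retry_count < 1:
        raise ValueError("retry_count must be >= 1")
    # Delay doubles with every failed retry and is capped at max_minutes.
    delay = min(base_minutes * 2 ** (retry_count - 1), max_minutes)
    return now + datetime.timedelta(minutes=delay)


if __name__ == "__main__":
    start = datetime.datetime(2024, 1, 1, 12, 0, 0)
    for attempt in range(1, 6):
        print(attempt, compute_next_retry_at(attempt, start).time())
```

Passing the current time in as an argument, rather than reading the clock inside the function, keeps the schedule easy to unit test.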
Step 2: Implement the Webhook Receiver
Genesys Cloud sends webhooks as HTTP POST requests to your endpoint. You must respond with a 2xx status code within the timeout window (typically 10-15 seconds) to acknowledge receipt. If you take longer, Genesys will retry the delivery, potentially causing duplicates.
The critical logic here is: Acknowledge Genesys immediately, then process asynchronously. If your downstream service fails with 5xx, you save the event to the DLQ.
from fastapi import BackgroundTasks, FastAPI, Request
from fastapi.responses import JSONResponse
import json
import asyncio
import logging

app = FastAPI()
logger = logging.getLogger(__name__)


@app.post("/webhooks/genesys")
async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
    """
    Receives webhook from Genesys Cloud.
    Must return 200 OK quickly to prevent Genesys retries.
    """
    try:
        body = await request.json()
    except Exception:
        return JSONResponse(status_code=400, content={"error": "Invalid JSON"})
    if not isinstance(body, dict):
        return JSONResponse(status_code=400, content={"error": "Invalid JSON"})

    # Extract key identifiers
    webhook_id = body.get("webhookId")
    event_type = body.get("eventType")
    if not webhook_id or not event_type:
        return JSONResponse(status_code=400, content={"error": "Missing webhookId or eventType"})

    # Offload processing to a background task.
    # FastAPI injects BackgroundTasks per request and runs the task after
    # the response is sent, so we can return 200 OK immediately.
    background_tasks.add_task(process_webhook_async, body, webhook_id, event_type)
    return JSONResponse(status_code=200, content={"status": "accepted"})


async def process_webhook_async(body: dict, webhook_id: str, event_type: str):
    """
    Processes the webhook payload.
    If downstream fails with 5xx, saves to DLQ.
    """
    try:
        # Call your downstream business logic
        success, status_code, error_msg = await call_downstream_service(body)
        if not success:
            # Only save to DLQ if it is a server error (5xx)
            # 4xx errors usually indicate bad data, which won't be fixed by retrying
            if 500 <= status_code < 600:
                await save_to_dlq(webhook_id, event_type, body, status_code, error_msg)
                logger.warning(f"Saved to DLQ: Webhook {webhook_id} failed with {status_code}")
            else:
                logger.error(f"Client error {status_code} for Webhook {webhook_id}: {error_msg}")
    except Exception as e:
        logger.exception(f"Unexpected error processing webhook {webhook_id}")
        # In case of unexpected crash, still save to DLQ
        await save_to_dlq(webhook_id, event_type, body, 500, str(e))
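The 5xx-only rule used in process_webhook_async can be factored into a small predicate so the receiver and the retry worker share one definition. The sketch below is one possible shape: is_retryable is a hypothetical helper, and treating 408 and 429 as retryable is an assumption that goes beyond the rule in this guide. Pass an empty set to reproduce the strict 5xx-only behavior.

```python
from typing import FrozenSet

# Assumption: 408 (Request Timeout) and 429 (Too Many Requests) are transient.
DEFAULT_RETRYABLE_4XX: FrozenSet[int] = frozenset({408, 429})


def is_retryable(
    status_code: int,
    retryable_4xx: FrozenSet[int] = DEFAULT_RETRYABLE_4XX,
) -> bool:
    """Return True if a failed downstream call is worth storing in the DLQ."""
    if 500 <= status_code < 600:
        return True
    return status_code in retryable_4xx


if __name__ == "__main__":
    for code in (400, 404, 408, 429, 500, 503):
        print(code, is_retryable(code))
```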
Step 3: Implement the Downstream Call and DLQ Storage
This section defines how you call your internal service and how you persist failures.
import httpx
from sqlalchemy import select

# FailedWebhook and async_session come from Step 1; json and datetime are
# imported in the earlier snippets.


async def call_downstream_service(payload: dict) -> tuple[bool, int, str]:
    """
    Calls an internal microservice (placeholder URL).
    Returns (success, status_code, error_message)
    """
    internal_url = "https://internal-service.example.com/process-event"
    async with httpx.AsyncClient(timeout=5.0) as client:
        try:
            response = await client.post(
                internal_url,
                json=payload,
                headers={"Content-Type": "application/json"}
            )
            if response.is_success:
                # Any 2xx response counts as success, not only 200
                return True, response.status_code, ""
            # 4xx and 5xx are both failures; the caller decides whether to retry
            return False, response.status_code, response.text
        except httpx.TimeoutException:
            return False, 504, "Timeout"
        except Exception as e:
            return False, 500, str(e)


async def save_to_dlq(webhook_id: str, event_type: str, payload: dict, status_code: int, error_message: str):
    """
    Saves failed webhook to the Dead Letter Queue database.
    Implements exponential backoff for next_retry_at.
    """
    async with async_session() as session:
        # Check if already in DLQ (idempotency).
        # NOTE: this lookup assumes webhook_id is unique per event. If it
        # identifies the webhook configuration instead, key on a per-event ID.
        stmt = select(FailedWebhook).where(
            FailedWebhook.webhook_id == webhook_id,
            FailedWebhook.is_processed == False
        )
        result = await session.execute(stmt)
        existing = result.scalar_one_or_none()
        if existing:
            # Update retry count and next retry time
            existing.retry_count += 1
            existing.error_message = error_message
            existing.status_code = status_code
            # Exponential backoff: 1 min, 2 min, 4 min, 8 min, 16 min
            backoff_minutes = 2 ** (existing.retry_count - 1)
            existing.next_retry_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=backoff_minutes)
        else:
            # New failure
            next_retry = datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
            new_record = FailedWebhook(
                webhook_id=webhook_id,
                event_type=event_type,
                payload=json.dumps(payload),
                error_message=error_message,
                status_code=status_code,
                next_retry_at=next_retry
            )
            session.add(new_record)
        await session.commit()
Step 4: Implement the Retry Worker
The retry worker is a background process (or cron job) that scans the DLQ for records where next_retry_at is in the past and retry_count < max_retries.
from sqlalchemy import select
import datetime


async def retry_worker():
    """
    Runs periodically to retry failed webhooks.
    """
    async with async_session() as session:
        now = datetime.datetime.utcnow()
        # Find records ready for retry
        stmt = select(FailedWebhook).where(
            FailedWebhook.is_processed == False,
            FailedWebhook.retry_count < FailedWebhook.max_retries,
            FailedWebhook.next_retry_at <= now
        )
        result = await session.execute(stmt)
        pending_retries = result.scalars().all()
        for record in pending_retries:
            try:
                payload = json.loads(record.payload)
                # Attempt to resend to downstream
                success, status_code, error_msg = await call_downstream_service(payload)
                if success:
                    # Mark as processed
                    record.is_processed = True
                    record.error_message = None
                    record.status_code = 200
                    await session.commit()
                    logger.info(f"Successfully retried Webhook {record.webhook_id}")
                else:
                    # Still failing
                    if 500 <= status_code < 600:
                        record.retry_count += 1
                        record.error_message = error_msg
                        record.status_code = status_code
                        # Update next retry time
                        backoff_minutes = 2 ** (record.retry_count - 1)
                        record.next_retry_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=backoff_minutes)
                        await session.commit()
                        logger.warning(f"Retry {record.retry_count} failed for Webhook {record.webhook_id}")
                    else:
                        # 4xx error on retry: give up
                        record.is_processed = True  # Mark as done, but note it failed permanently
                        record.error_message = f"Permanent failure: {error_msg}"
                        await session.commit()
                        logger.error(f"Permanent failure for Webhook {record.webhook_id}")
            except Exception as e:
                logger.exception(f"Error during retry of Webhook {record.webhook_id}")
                record.retry_count += 1
                record.next_retry_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=5)
                await session.commit()
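The worker's query skips records whose retry_count has reached max_retries, so those rows stay in the table with is_processed still False and are never picked up again. The sketch below mirrors the selection rule in plain Python to make the four possible states explicit; DlqRecord and classify are hypothetical names used only for illustration, and the dataclass is an in-memory stand-in for a FailedWebhook row.

```python
import datetime
from dataclasses import dataclass


@dataclass
class DlqRecord:
    # Hypothetical in-memory stand-in for a FailedWebhook row.
    retry_count: int
    max_retries: int
    next_retry_at: datetime.datetime
    is_processed: bool = False


def classify(record: DlqRecord, now: datetime.datetime) -> str:
    """Return 'done', 'exhausted', 'due', or 'waiting' for a DLQ record."""
    if record.is_processed:
        return "done"
    if record.retry_count >= record.max_retries:
        # Never selected by the worker again; needs alerting or manual replay.
        return "exhausted"
    if record.next_retry_at <= now:
        return "due"
    return "waiting"
```

Only records in the "due" state match the worker's query. Counting "exhausted" rows periodically is one way to notice events that need manual attention.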
Complete Working Example
Below is the full main.py file. It combines the FastAPI app, the database setup, and a simple scheduler for the retry worker.
import asyncio
import datetime
import json
import logging
import random
import time
from typing import Optional

import httpx
from fastapi import BackgroundTasks, FastAPI, Request
from fastapi.responses import JSONResponse
from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker

# --- Configuration ---
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
ORG_ID = "your_org_id"
DATABASE_URL = "sqlite+aiosqlite:///./dlq.db"

# --- Logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Database Models ---
Base = declarative_base()


class FailedWebhook(Base):
    __tablename__ = "failed_webhooks"

    id = Column(Integer, primary_key=True, autoincrement=True)
    webhook_id = Column(String, nullable=False, index=True)
    event_type = Column(String, nullable=False)
    payload = Column(Text, nullable=False)
    error_message = Column(Text, nullable=True)
    status_code = Column(Integer, nullable=True)
    retry_count = Column(Integer, default=0)
    max_retries = Column(Integer, default=5)
    next_retry_at = Column(DateTime, nullable=False)
    is_processed = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)


engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


# --- Auth ---
class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, org_id: str,
                 region_host: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.org_id = org_id
        # Token endpoint is on the regional login host; adjust region_host
        # (e.g. "mypurecloud.ie") to match your region.
        self.token_url = f"https://login.{region_host}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0

    async def get_token(self) -> str:
        async with httpx.AsyncClient() as client:
            # Credentials are sent as HTTP Basic auth
            response = await client.post(
                self.token_url,
                data={"grant_type": "client_credentials"},
                auth=(self.client_id, self.client_secret),
            )
        if response.status_code != 200:
            raise Exception(f"Auth failed: {response.text}")
        data = response.json()
        self.access_token = data["access_token"]
        self.token_expiry = time.time() + (data["expires_in"] - 60)
        return self.access_token

    async def get_valid_token(self) -> str:
        if not self.access_token or time.time() >= self.token_expiry:
            return await self.get_token()
        return self.access_token


auth = GenesysAuth(CLIENT_ID, CLIENT_SECRET, ORG_ID)


# --- Downstream Service Simulation ---
async def call_downstream_service(payload: dict) -> tuple[bool, int, str]:
    # Replace this with your actual internal API call
    # For demo, we simulate a 500 error 50% of the time
    if random.random() < 0.5:
        return False, 500, "Internal Server Error"
    return True, 200, "OK"


# --- DLQ Logic ---
async def save_to_dlq(webhook_id: str, event_type: str, payload: dict, status_code: int, error_message: str):
    async with async_session() as session:
        stmt = select(FailedWebhook).where(
            FailedWebhook.webhook_id == webhook_id,
            FailedWebhook.is_processed == False
        )
        result = await session.execute(stmt)
        existing = result.scalar_one_or_none()
        if existing:
            existing.retry_count += 1
            existing.error_message = error_message
            existing.status_code = status_code
            backoff_minutes = 2 ** (existing.retry_count - 1)
            existing.next_retry_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=backoff_minutes)
        else:
            next_retry = datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
            new_record = FailedWebhook(
                webhook_id=webhook_id,
                event_type=event_type,
                payload=json.dumps(payload),
                error_message=error_message,
                status_code=status_code,
                next_retry_at=next_retry
            )
            session.add(new_record)
        await session.commit()


async def process_webhook_async(body: dict, webhook_id: str, event_type: str):
    try:
        success, status_code, error_msg = await call_downstream_service(body)
        if not success and 500 <= status_code < 600:
            await save_to_dlq(webhook_id, event_type, body, status_code, error_msg)
    except Exception as e:
        await save_to_dlq(webhook_id, event_type, body, 500, str(e))


# --- FastAPI App ---
app = FastAPI()


@app.post("/webhooks/genesys")
async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
    try:
        body = await request.json()
    except Exception:
        return JSONResponse(status_code=400, content={"error": "Invalid JSON"})
    if not isinstance(body, dict):
        return JSONResponse(status_code=400, content={"error": "Invalid JSON"})
    webhook_id = body.get("webhookId")
    event_type = body.get("eventType")
    if not webhook_id or not event_type:
        return JSONResponse(status_code=400, content={"error": "Missing fields"})
    # BackgroundTasks is injected per request; the task runs after the response is sent
    background_tasks.add_task(process_webhook_async, body, webhook_id, event_type)
    return JSONResponse(status_code=200, content={"status": "accepted"})


# --- Retry Worker ---
async def retry_worker():
    async with async_session() as session:
        now = datetime.datetime.utcnow()
        stmt = select(FailedWebhook).where(
            FailedWebhook.is_processed == False,
            FailedWebhook.retry_count < FailedWebhook.max_retries,
            FailedWebhook.next_retry_at <= now
        )
        result = await session.execute(stmt)
        pending = result.scalars().all()
        for record in pending:
            try:
                payload = json.loads(record.payload)
                success, status_code, error_msg = await call_downstream_service(payload)
                if success:
                    record.is_processed = True
                    record.error_message = None
                    record.status_code = 200
                else:
                    if 500 <= status_code < 600:
                        record.retry_count += 1
                        record.error_message = error_msg
                        record.status_code = status_code
                        record.next_retry_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=2 ** (record.retry_count - 1))
                    else:
                        record.is_processed = True
                        record.error_message = f"Permanent: {error_msg}"
                await session.commit()
            except Exception:
                logger.exception(f"Error during retry of Webhook {record.webhook_id}")
                record.retry_count += 1
                record.next_retry_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=5)
                await session.commit()


async def periodic_retry():
    while True:
        try:
            await retry_worker()
        except Exception:
            # Keep the loop alive if a single pass fails
            logger.exception("Retry worker pass failed")
        await asyncio.sleep(60)  # Check every minute


@app.on_event("startup")
async def startup_event():
    # Initialize DB
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    # Start retry worker loop; keep a reference so the task is not garbage collected
    app.state.retry_task = asyncio.create_task(periodic_retry())


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Common Errors & Debugging
Error: 401 Unauthorized on Token Refresh
- Cause: The client_id or client_secret is incorrect, or the OAuth client has been disabled in Genesys Cloud.
- Fix: Verify credentials in the Genesys Cloud Admin Console under Admin > Integrations > OAuth. Ensure the client status is Active.
Error: 429 Too Many Requests on Downstream
- Cause: Your retry worker is firing too aggressively, overwhelming your downstream service.
- Fix: Increase the backoff_minutes calculation in save_to_dlq. Use a jitter factor: random.uniform(0.5 * base, 1.5 * base).
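A minimal sketch of the jitter idea, assuming the same doubling schedule as save_to_dlq. The function name backoff_with_jitter and its parameters are hypothetical; the optional rng argument exists only so the randomness can be seeded in tests.

```python
import random
from typing import Optional


def backoff_with_jitter(
    retry_count: int,
    base_minutes: float = 1.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return a delay in minutes: exponential base scaled by a factor in [0.5, 1.5]."""
    source = rng if rng is not None else random
    # Same doubling schedule as the guide: 1, 2, 4, 8, 16 minutes.
    base = base_minutes * 2 ** (retry_count - 1)
    # Spread retries around the base so failed events do not all fire at once.
    return source.uniform(0.5 * base, 1.5 * base)


if __name__ == "__main__":
    for attempt in range(1, 6):
        print(attempt, round(backoff_with_jitter(attempt), 2))
```

The returned value can be passed to datetime.timedelta(minutes=...) in place of backoff_minutes.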
Error: Webhook Payload Missing webhookId
- Cause: The incoming HTTP request is not a valid Genesys Cloud webhook.
- Fix: Validate the X-Genesys-Webhook-Id header or the webhookId field in the body. Reject requests lacking these fields with a 400 Bad Request immediately.
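The body check can be isolated in a small function that returns the reason for rejection, which makes the 400 response easier to debug. This is a sketch using only the two fields the receiver in this guide reads (webhookId and eventType); validation_error and REQUIRED_FIELDS are hypothetical names, and header validation is left out.

```python
from typing import Any, Optional

# Fields the receiver in this guide reads from the body.
REQUIRED_FIELDS = ("webhookId", "eventType")


def validation_error(body: Any) -> Optional[str]:
    """Return an error message for a rejected body, or None if it looks valid."""
    if not isinstance(body, dict):
        return "Body must be a JSON object"
    # A field is treated as missing if it is absent, not a string, or empty.
    missing = [
        name for name in REQUIRED_FIELDS
        if not isinstance(body.get(name), str) or not body[name]
    ]
    if missing:
        return "Missing or empty fields: " + ", ".join(missing)
    return None
```

In the receiver, a non-None result would be returned as the error content of the 400 response.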
Error: Database Lock (SQLite)
- Cause: SQLite does not handle high-concurrency writes well. If you receive >100 webhooks/second, the DLQ write may fail.
- Fix: Switch to PostgreSQL or MySQL with an async driver such as asyncpg or aiomysql (the synchronous psycopg2 driver does not work with create_async_engine). Use connection pooling.