Handling 5xx Webhook Failures with a Dead Letter Queue and Retry Logic

What You Will Build

  • You will build a Python application that receives Genesys Cloud CX webhooks, detects 5xx server errors returned by your downstream service, and stores the failed events in a Dead Letter Queue (DLQ) for later retry.
  • You will acknowledge each delivery by returning a 2xx response from your receiver, and use an OAuth token to call the Genesys Cloud CX API when you need to verify the webhook configuration.
  • You will use Python with FastAPI for the webhook receiver, SQLAlchemy for the DLQ storage, and asyncio for the retry logic.

Prerequisites

  • OAuth Client Type: Client Credentials Grant.
  • Required Scopes: webhooks:read, webhooks:write, messages:read.
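  • SDK Version: none required. The examples call the REST API directly with httpx; the official Python SDK (PureCloudPlatformClientV2) is optional.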
  • Language/Runtime: Python 3.9+.
  • External Dependencies: fastapi, uvicorn, sqlalchemy, aiosqlite (or asyncpg for PostgreSQL, since create_async_engine needs an async driver), httpx, pydantic.

Authentication Setup

Genesys Cloud CX uses OAuth 2.0. For server-to-server integrations, use the Client Credentials flow. The token response includes an expires_in value in seconds (3600 in this example, or 1 hour); read it from the response rather than hard-coding it. Your application must cache the token and refresh it before expiration to avoid 401 errors during high-volume webhook processing.

import httpx
import time
from typing import Optional

class GenesysAuth:
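    def __init__(self, client_id: str, client_secret: str, org_id: str,
                 region_domain: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.org_id = org_id
        # Tokens are issued by the regional login host, not the API host.
        # Set region_domain to your org's region (for example mypurecloud.ie).
        self.token_url = f"https://login.{region_domain}/oauth/token"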
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0

    async def get_token(self) -> str:
        """Fetches a new OAuth token using Client Credentials."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )
            
            if response.status_code != 200:
                raise Exception(f"Failed to obtain token: {response.text}")
            
            data = response.json()
            self.access_token = data["access_token"]
            # Subtract 60 seconds to refresh before actual expiry
            self.token_expiry = time.time() + (data["expires_in"] - 60)
            return self.access_token

    async def get_valid_token(self) -> str:
        """Returns cached token if valid, otherwise fetches a new one."""
        if not self.access_token or time.time() >= self.token_expiry:
            return await self.get_token()
        return self.access_token
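
Under high webhook volume, several coroutines can find the token expired at the same moment and each request a new one. The sketch below is one possible way to avoid that, not part of the class above: it guards the refresh with an asyncio.Lock and re-checks the cache after acquiring it. TokenCache and fetch are hypothetical names; fetch stands in for any coroutine that returns the token and its lifetime in seconds.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Tuple

# Hypothetical type: a coroutine returning (access_token, expires_in_seconds).
FetchToken = Callable[[], Awaitable[Tuple[str, int]]]


class TokenCache:
    """Caches a token and lets only one coroutine refresh it at a time."""

    def __init__(self, fetch: FetchToken, safety_margin: int = 60):
        self._fetch = fetch
        self._safety_margin = safety_margin
        self._token: Optional[str] = None
        self._expiry: float = 0.0
        # Create the cache inside a running event loop (e.g. at startup).
        self._lock = asyncio.Lock()

    def _is_valid(self) -> bool:
        return self._token is not None and time.time() < self._expiry

    async def get(self) -> str:
        # Fast path: no locking while the cached token is still valid.
        if self._is_valid():
            return self._token
        async with self._lock:
            # Re-check: another coroutine may have refreshed while we waited.
            if not self._is_valid():
                token, expires_in = await self._fetch()
                self._token = token
                self._expiry = time.time() + expires_in - self._safety_margin
            return self._token
```

With this shape, ten concurrent callers that all see an expired token trigger a single token request; the other nine wait on the lock and then reuse the fresh value.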

Implementation

Step 1: Define the Dead Letter Queue Schema

A Dead Letter Queue is not a feature of Genesys Cloud; it is an architectural pattern you implement in your data layer. When a webhook payload fails to process (specifically when your downstream service returns 5xx), you must store the raw payload, the error context, and a retry count.

We will use SQLAlchemy with an asynchronous driver for high-concurrency storage.

from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
import datetime

Base = declarative_base()

class FailedWebhook(Base):
    __tablename__ = "failed_webhooks"

    id = Column(Integer, primary_key=True, autoincrement=True)
    webhook_id = Column(String, nullable=False, index=True)
    event_type = Column(String, nullable=False)
    payload = Column(Text, nullable=False) # Store raw JSON
    error_message = Column(Text, nullable=True)
    status_code = Column(Integer, nullable=True)
    retry_count = Column(Integer, default=0)
    max_retries = Column(Integer, default=5)
    next_retry_at = Column(DateTime, nullable=False)
    is_processed = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)

# Database setup
DATABASE_URL = "sqlite+aiosqlite:///./dlq.db"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

Why this schema?

  • payload: You must store the raw JSON. Genesys may not resend the event if you acknowledge it. If you drop the event, you lose data.
  • next_retry_at: Allows for exponential backoff. You do not want to retry every second.
  • is_processed: Marks the record as successfully re-delivered to your downstream system.
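
To make the role of next_retry_at concrete, here is a minimal sketch of the backoff arithmetic used later in this guide (2 ** (retry_count - 1) minutes). The helper names backoff_delay and next_retry_at are illustrative and do not appear elsewhere in the code.

```python
import datetime


def backoff_delay(retry_count: int, base_minutes: int = 1) -> datetime.timedelta:
    """Delay before the next attempt; doubles with every recorded retry."""
    if retry_count < 1:
        raise ValueError("retry_count must be >= 1")
    return datetime.timedelta(minutes=base_minutes * 2 ** (retry_count - 1))


def next_retry_at(retry_count: int, now: datetime.datetime) -> datetime.datetime:
    """Timestamp to store in the next_retry_at column."""
    return now + backoff_delay(retry_count)


# Delay in minutes for retry counts 1 through 5
schedule = [int(backoff_delay(n).total_seconds() // 60) for n in range(1, 6)]
print(schedule)  # [1, 2, 4, 8, 16]
```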

Step 2: Implement the Webhook Receiver

Genesys Cloud sends webhooks as HTTP POST requests to your endpoint. You must respond with a 2xx status code within the timeout window (typically 10-15 seconds) to acknowledge receipt. If you take longer, Genesys will retry the delivery, potentially causing duplicates.

The critical logic here is: Acknowledge Genesys immediately, then process asynchronously. If your downstream service fails with 5xx, you save the event to the DLQ.

from fastapi import BackgroundTasks, FastAPI, Request
from fastapi.responses import JSONResponse
import json
import asyncio
import logging

app = FastAPI()
logger = logging.getLogger(__name__)

@app.post("/webhooks/genesys")
async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
    """
    Receives webhook from Genesys Cloud.
    Must return 200 OK quickly to prevent Genesys retries.
    """
    try:
        body = await request.json()
    except Exception:
        return JSONResponse(status_code=400, content={"error": "Invalid JSON"})

    # A JSON array or scalar has no .get(); accept only JSON objects
    if not isinstance(body, dict):
        return JSONResponse(status_code=400, content={"error": "Expected a JSON object"})

    # Extract key identifiers
    webhook_id = body.get("webhookId")
    event_type = body.get("eventType")
    
    if not webhook_id or not event_type:
        return JSONResponse(status_code=400, content={"error": "Missing webhookId or eventType"})

    # Offload processing to a background task. FastAPI runs it after the
    # response is sent, which allows us to return 200 OK immediately
    background_tasks.add_task(process_webhook_async, body, webhook_id, event_type)
    
    return JSONResponse(status_code=200, content={"status": "accepted"})

async def process_webhook_async(body: dict, webhook_id: str, event_type: str):
    """
    Processes the webhook payload.
    If downstream fails with 5xx, saves to DLQ.
    """
    try:
        # Simulate calling your downstream business logic
        success, status_code, error_msg = await call_downstream_service(body)
        
        if not success:
            # Only save to DLQ if it is a server error (5xx)
            # 4xx errors usually indicate bad data, which won't be fixed by retrying
            if 500 <= status_code < 600:
                await save_to_dlq(webhook_id, event_type, body, status_code, error_msg)
                logger.warning(f"Saved to DLQ: Webhook {webhook_id} failed with {status_code}")
            else:
                logger.error(f"Client error {status_code} for Webhook {webhook_id}: {error_msg}")
                
    except Exception as e:
        logger.exception(f"Unexpected error processing webhook {webhook_id}")
        # In case of unexpected crash, still save to DLQ
        await save_to_dlq(webhook_id, event_type, body, 500, str(e))
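
Because a late acknowledgement can cause the same event to be delivered again, the receiver may see duplicates. The following is a minimal sketch of one way to detect them, assuming an exact payload match is a good enough definition of "duplicate". idempotency_key and DuplicateFilter are hypothetical helpers, and the in-memory store only works for a single process; a shared store would be needed behind a load balancer.

```python
import hashlib
import json
import time
from typing import Dict, Optional


def idempotency_key(body: dict) -> str:
    """Stable fingerprint of a payload: SHA-256 of its canonical JSON form."""
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class DuplicateFilter:
    """Remembers recently seen payloads for ttl_seconds (single process only)."""

    def __init__(self, ttl_seconds: float = 300.0):
        self._ttl = ttl_seconds
        self._seen: Dict[str, float] = {}

    def is_duplicate(self, body: dict, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        # Drop entries older than the TTL so the dict does not grow unbounded.
        self._seen = {k: t for k, t in self._seen.items() if now - t < self._ttl}
        key = idempotency_key(body)
        if key in self._seen:
            return True
        self._seen[key] = now
        return False
```

In receive_webhook, such a filter would be consulted before scheduling the background task; a duplicate should still be answered with 200 so the sender stops retrying.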

Step 3: Implement the Downstream Call and DLQ Storage

This section defines how you call your internal service and how you persist failures.

import datetime
import json

import httpx
from sqlalchemy import select

async def call_downstream_service(payload: dict) -> tuple[bool, int, str]:
    """
    Simulates calling an internal microservice.
    Returns (success, status_code, error_message)
    """
    internal_url = "https://internal-service.example.com/process-event"
    
    async with httpx.AsyncClient(timeout=5.0) as client:
        try:
            response = await client.post(
                internal_url,
                json=payload,
                headers={"Content-Type": "application/json"}
            )
            
            # Treat any 2xx as success (e.g. 201 Created, 204 No Content)
            if 200 <= response.status_code < 300:
                return True, response.status_code, ""
            # Report the failure as-is; the caller decides whether to
            # retry (5xx) or give up (4xx)
            return False, response.status_code, response.text
                
        except httpx.TimeoutException:
            return False, 504, "Timeout"
        except Exception as e:
            return False, 500, str(e)

async def save_to_dlq(webhook_id: str, event_type: str, payload: dict, status_code: int, error_message: str):
    """
    Saves failed webhook to the Dead Letter Queue database.
    Implements exponential backoff for next_retry_at.
    """
    async with async_session() as session:
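        # Check if already in DLQ (idempotency). This assumes webhook_id is
        # unique per delivery. If it only identifies the webhook configuration,
        # key on a per-event ID instead, or later events will be dropped.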
        stmt = select(FailedWebhook).where(
            FailedWebhook.webhook_id == webhook_id,
            FailedWebhook.is_processed == False
        )
        result = await session.execute(stmt)
        existing = result.scalar_one_or_none()
        
        if existing:
            # Update retry count and next retry time
            existing.retry_count += 1
            existing.error_message = error_message
            existing.status_code = status_code
            # Exponential backoff: 1 min, 2 min, 4 min, 8 min, 16 min
            backoff_minutes = 2 ** (existing.retry_count - 1)
            existing.next_retry_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=backoff_minutes)
        else:
            # New failure
            next_retry = datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
            new_record = FailedWebhook(
                webhook_id=webhook_id,
                event_type=event_type,
                payload=json.dumps(payload),
                error_message=error_message,
                status_code=status_code,
                next_retry_at=next_retry
            )
            session.add(new_record)
        
        await session.commit()

Step 4: Implement the Retry Worker

The retry worker is a background process (or cron job) that scans the DLQ for records where next_retry_at is in the past and retry_count < max_retries.

from sqlalchemy import select
import datetime

async def retry_worker():
    """
    Runs periodically to retry failed webhooks.
    """
    async with async_session() as session:
        now = datetime.datetime.utcnow()
        
        # Find records ready for retry
        stmt = select(FailedWebhook).where(
            FailedWebhook.is_processed == False,
            FailedWebhook.retry_count < FailedWebhook.max_retries,
            FailedWebhook.next_retry_at <= now
        )
        
        result = await session.execute(stmt)
        pending_retries = result.scalars().all()
        
        for record in pending_retries:
            try:
                payload = json.loads(record.payload)
                
                # Attempt to resend to downstream
                success, status_code, error_msg = await call_downstream_service(payload)
                
                if success:
                    # Mark as processed
                    record.is_processed = True
                    record.error_message = None
                    record.status_code = 200
                    await session.commit()
                    logger.info(f"Successfully retried Webhook {record.webhook_id}")
                else:
                    # Still failing
                    if 500 <= status_code < 600:
                        record.retry_count += 1
                        record.error_message = error_msg
                        record.status_code = status_code
                        # Update next retry time
                        backoff_minutes = 2 ** (record.retry_count - 1)
                        record.next_retry_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=backoff_minutes)
                        await session.commit()
                        logger.warning(f"Retry {record.retry_count} failed for Webhook {record.webhook_id}")
                    else:
                        # 4xx error on retry: give up
                        record.is_processed = True # Mark as done, but note it failed permanently
                        record.error_message = f"Permanent failure: {error_msg}"
                        await session.commit()
                        logger.error(f"Permanent failure for Webhook {record.webhook_id}")
                        
            except Exception as e:
                logger.exception(f"Error during retry of Webhook {record.webhook_id}")
                record.retry_count += 1
                record.next_retry_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=5)
                await session.commit()
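
The branching inside retry_worker can be summarized as a small pure function, which is easier to unit-test than the database loop. This is an illustrative sketch, not a replacement for the worker: RetryAction and decide are hypothetical names, and the EXHAUSTED state is an addition that names the case the worker leaves implicit (a record whose retry_count has reached max_retries is simply never selected again).

```python
from enum import Enum


class RetryAction(Enum):
    DONE = "done"            # delivered; mark is_processed
    RETRY = "retry"          # 5xx with attempts left; reschedule with backoff
    GIVE_UP = "give_up"      # non-5xx failure; retrying will not help
    EXHAUSTED = "exhausted"  # 5xx, but this was the last allowed retry


def decide(success: bool, status_code: int, retry_count: int, max_retries: int) -> RetryAction:
    """retry_count is the value stored on the record before this attempt."""
    if success:
        return RetryAction.DONE
    if not 500 <= status_code < 600:
        return RetryAction.GIVE_UP
    # The worker increments retry_count on failure; once it reaches
    # max_retries the record no longer matches the worker's query.
    if retry_count + 1 >= max_retries:
        return RetryAction.EXHAUSTED
    return RetryAction.RETRY
```

Surfacing EXHAUSTED explicitly gives you a place to raise an alert, since those records otherwise sit in the table with is_processed still False.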

Complete Working Example

Below is the full main.py file. It combines the FastAPI app, the database setup, and a simple scheduler for the retry worker.

import asyncio
import datetime
import json
import logging
import time
from typing import Optional

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker

# --- Configuration ---
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
ORG_ID = "your_org_id"
DATABASE_URL = "sqlite+aiosqlite:///./dlq.db"

# --- Logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Database Models ---
Base = declarative_base()

class FailedWebhook(Base):
    __tablename__ = "failed_webhooks"

    id = Column(Integer, primary_key=True, autoincrement=True)
    webhook_id = Column(String, nullable=False, index=True)
    event_type = Column(String, nullable=False)
    payload = Column(Text, nullable=False)
    error_message = Column(Text, nullable=True)
    status_code = Column(Integer, nullable=True)
    retry_count = Column(Integer, default=0)
    max_retries = Column(Integer, default=5)
    next_retry_at = Column(DateTime, nullable=False)
    is_processed = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)

engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

# --- Auth ---
class GenesysAuth:
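    def __init__(self, client_id: str, client_secret: str, org_id: str,
                 region_domain: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.org_id = org_id
        # Tokens are issued by the regional login host, not the API host.
        # Set region_domain to your org's region (for example mypurecloud.ie).
        self.token_url = f"https://login.{region_domain}/oauth/token"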
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0

    async def get_token(self) -> str:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )
            if response.status_code != 200:
                raise Exception(f"Auth failed: {response.text}")
            data = response.json()
            self.access_token = data["access_token"]
            self.token_expiry = time.time() + (data["expires_in"] - 60)
            return self.access_token

    async def get_valid_token(self) -> str:
        if not self.access_token or time.time() >= self.token_expiry:
            return await self.get_token()
        return self.access_token

auth = GenesysAuth(CLIENT_ID, CLIENT_SECRET, ORG_ID)

# --- Downstream Service Simulation ---
async def call_downstream_service(payload: dict) -> tuple[bool, int, str]:
    # Replace this with your actual internal API call
    # For demo, we simulate a 500 error 50% of the time
    import random
    if random.random() < 0.5:
        return False, 500, "Internal Server Error"
    return True, 200, "OK"

# --- DLQ Logic ---
async def save_to_dlq(webhook_id: str, event_type: str, payload: dict, status_code: int, error_message: str):
    async with async_session() as session:
        stmt = select(FailedWebhook).where(
            FailedWebhook.webhook_id == webhook_id,
            FailedWebhook.is_processed == False
        )
        result = await session.execute(stmt)
        existing = result.scalar_one_or_none()
        
        if existing:
            existing.retry_count += 1
            existing.error_message = error_message
            existing.status_code = status_code
            backoff_minutes = 2 ** (existing.retry_count - 1)
            existing.next_retry_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=backoff_minutes)
        else:
            next_retry = datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
            new_record = FailedWebhook(
                webhook_id=webhook_id,
                event_type=event_type,
                payload=json.dumps(payload),
                error_message=error_message,
                status_code=status_code,
                next_retry_at=next_retry
            )
            session.add(new_record)
        await session.commit()

async def process_webhook_async(body: dict, webhook_id: str, event_type: str):
    try:
        success, status_code, error_msg = await call_downstream_service(body)
        
        if not success and 500 <= status_code < 600:
            await save_to_dlq(webhook_id, event_type, body, status_code, error_msg)
    except Exception as e:
        await save_to_dlq(webhook_id, event_type, body, 500, str(e))

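# --- FastAPI App ---
# BackgroundTasks runs work after the response has been sent
from fastapi import BackgroundTasks

app = FastAPI()

@app.post("/webhooks/genesys")
async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
    try:
        body = await request.json()
    except Exception:
        return JSONResponse(status_code=400, content={"error": "Invalid JSON"})

    # A JSON array or scalar has no .get(); accept only JSON objects
    if not isinstance(body, dict):
        return JSONResponse(status_code=400, content={"error": "Expected a JSON object"})

    webhook_id = body.get("webhookId")
    event_type = body.get("eventType")
    
    if not webhook_id or not event_type:
        return JSONResponse(status_code=400, content={"error": "Missing fields"})

    background_tasks.add_task(process_webhook_async, body, webhook_id, event_type)
    return JSONResponse(status_code=200, content={"status": "accepted"})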

# --- Retry Worker ---
async def retry_worker():
    async with async_session() as session:
        now = datetime.datetime.utcnow()
        stmt = select(FailedWebhook).where(
            FailedWebhook.is_processed == False,
            FailedWebhook.retry_count < FailedWebhook.max_retries,
            FailedWebhook.next_retry_at <= now
        )
        result = await session.execute(stmt)
        pending = result.scalars().all()
        
        for record in pending:
            try:
                payload = json.loads(record.payload)
                success, status_code, error_msg = await call_downstream_service(payload)
                
                if success:
                    record.is_processed = True
                    record.error_message = None
                    record.status_code = 200
                else:
                    if 500 <= status_code < 600:
                        record.retry_count += 1
                        record.error_message = error_msg
                        record.status_code = status_code
                        record.next_retry_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=2 ** (record.retry_count - 1))
                    else:
                        record.is_processed = True
                        record.error_message = f"Permanent: {error_msg}"
                await session.commit()
            except Exception as e:
                record.retry_count += 1
                record.next_retry_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=5)
                await session.commit()

@app.on_event("startup")
async def startup_event():
    # Initialize DB
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    
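    # Start retry worker loop; keep a reference so the task is not garbage-collected
    app.state.retry_task = asyncio.create_task(periodic_retry())

async def periodic_retry():
    while True:
        try:
            await retry_worker()
        except Exception:
            # Log and keep looping so one failed pass does not stop all retries
            logger.exception("Retry worker pass failed")
        await asyncio.sleep(60) # Check every minute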

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
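
While the application runs, it can help to check how many events are waiting in the DLQ. The sketch below reads the SQLite file directly with the standard library; dlq_summary is a hypothetical helper, and it assumes the table and column names from the FastAPI example above, with SQLAlchemy storing the Boolean column as 0 or 1.

```python
import sqlite3


def dlq_summary(db_path: str) -> dict:
    """Counts DLQ rows by state, using the FailedWebhook column names."""
    query = """
        SELECT
            COALESCE(SUM(CASE WHEN is_processed = 1 THEN 1 ELSE 0 END), 0),
            COALESCE(SUM(CASE WHEN is_processed = 0
                               AND retry_count < max_retries THEN 1 ELSE 0 END), 0),
            COALESCE(SUM(CASE WHEN is_processed = 0
                               AND retry_count >= max_retries THEN 1 ELSE 0 END), 0)
        FROM failed_webhooks
    """
    conn = sqlite3.connect(db_path)
    try:
        processed, pending, exhausted = conn.execute(query).fetchone()
    finally:
        conn.close()
    return {"processed": processed, "pending": pending, "exhausted": exhausted}
```

Calling dlq_summary("./dlq.db") returns the three counts. Note that "processed" includes records the worker closed as permanent failures, because the worker sets is_processed on those as well.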

Common Errors & Debugging

Error: 401 Unauthorized on Token Refresh

  • Cause: The client_id or client_secret is incorrect, or the OAuth client has been disabled in Genesys Cloud.
  • Fix: Verify the credentials in the Genesys Cloud Admin console under Admin > Integrations > OAuth. Ensure the client status is Active.

Error: 429 Too Many Requests on Downstream

  • Cause: Your retry worker is firing too aggressively, overwhelming your downstream service.
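  • Fix: Increase the backoff computed in save_to_dlq and retry_worker, and add a jitter factor so retries do not arrive in synchronized bursts: random.uniform(0.5 * base, 1.5 * base). Note that retry_worker treats any non-5xx response, including 429, as a permanent failure, so handle 429 as retryable if your downstream service rate-limits.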
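
The jitter suggestion can be sketched as follows. backoff_with_jitter is a hypothetical helper; base is the un-jittered exponential delay used elsewhere in this guide, and the rng parameter exists only so the randomness can be seeded in tests.

```python
import random


def backoff_with_jitter(retry_count: int, base_minutes: float = 1.0, rng=random) -> float:
    """Exponential backoff in minutes, scaled by a random factor in [0.5, 1.5]."""
    base = base_minutes * 2 ** (retry_count - 1)
    return rng.uniform(0.5 * base, 1.5 * base)


# Example: the third retry lands somewhere between 2 and 6 minutes
delay_minutes = backoff_with_jitter(3)
```

The returned value can be passed to datetime.timedelta(minutes=...) when setting next_retry_at.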

Error: Webhook Payload Missing webhookId

  • Cause: The incoming HTTP request is not a valid Genesys Cloud webhook.
  • Fix: Validate the X-Genesys-Webhook-Id header or the webhookId field in the body. Reject requests lacking these fields with a 400 Bad Request immediately.

Error: Database Lock (SQLite)

  • Cause: SQLite does not handle high-concurrency writes well. If you receive >100 webhooks/second, the DLQ write may fail.
  • Fix: Switch to PostgreSQL with asyncpg or MySQL with aiomysql (psycopg2 is synchronous and does not work with create_async_engine). Use connection pooling.
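
If moving off SQLite is not immediately possible, two SQLite settings may reduce lock errors in the meantime: write-ahead logging, which lets readers proceed while one writer is active, and a busy timeout, which makes a blocked connection wait instead of failing at once. The sketch below shows them with the standard-library sqlite3 module; open_dlq_connection is a hypothetical helper, and applying the same PRAGMAs through SQLAlchemy is not shown here. SQLite still allows only one writer at a time, so this is a stopgap rather than a fix.

```python
import sqlite3


def open_dlq_connection(db_path: str, busy_timeout_ms: int = 10000) -> sqlite3.Connection:
    """Opens a SQLite connection tuned for concurrent readers and one writer."""
    conn = sqlite3.connect(db_path)
    # WAL mode is stored in the database file, so it persists across connections.
    conn.execute("PRAGMA journal_mode=WAL")
    # Wait up to busy_timeout_ms for a lock before raising "database is locked".
    conn.execute(f"PRAGMA busy_timeout = {int(busy_timeout_ms)}")
    return conn
```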

Official References