Handling 5xx Webhook Failures with a Custom Dead Letter Queue in Python

What You Will Build

  • You will build a Python FastAPI service that acts as a Genesys Cloud CX webhook receiver.
  • You will implement retry logic with exponential backoff and jitter for transient errors.
  • You will route permanently failed payloads to a simulated Dead Letter Queue (DLQ) for manual inspection or reprocessing.

Prerequisites

  • OAuth Client Type: Machine-to-Machine (Client Credentials) is not required for receiving webhooks, but is required if you need to call Genesys APIs to update entity states based on the webhook data. For this tutorial, we assume you are receiving the webhook payload and processing it locally.
  • SDK/API Version: Genesys Cloud CX REST API v2.
  • Language/Runtime: Python 3.9+.
  • External Dependencies:
    • fastapi: For the HTTP server.
    • uvicorn: ASGI server to run FastAPI.
    • pydantic: For data validation.
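    • asyncio: For async retry logic (part of the Python standard library; no installation needed).
    • httpx: For optional outbound calls to Genesys APIs.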
    • redis (optional): For a production-grade DLQ, though this tutorial uses an in-memory list for simplicity.
pip install fastapi uvicorn pydantic httpx

Authentication Setup

Webhooks are pushed from Genesys Cloud to your endpoint. Genesys does not send an OAuth token in the webhook header by default. However, to verify the source of the webhook, you should validate the X-Genesys-Webhook-Id header or implement a shared secret challenge if configured.

For this tutorial, we assume the webhook is configured in Genesys Cloud with a target URL pointing to your service. The incoming request does not use OAuth. httpx is installed only so that you can later call a Genesys API (for example, to update a user profile) with a cached token; the code in this tutorial does not make such calls.

Validating the Webhook Source

Genesys Cloud sends specific headers with every webhook. For security, the critical step is confirming that the request really comes from Genesys. The standard webhook payload carries no cryptographic signature, so you can check the User-Agent or, at the network level, apply IP allow-listing. In code, we log the X-Genesys-Webhook-Id for tracing.
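
One way to add a source check in code is a shared secret carried in a custom header and compared in constant time. The sketch below assumes you have configured the webhook (or a proxy in front of your service) to send such a header. The header name X-Webhook-Secret, the WEBHOOK_SHARED_SECRET environment variable, and the is_trusted_request helper are hypothetical names chosen for illustration; they are not part of the Genesys Cloud API.

```python
import hmac
import os
from typing import Mapping, Optional

# Hypothetical names: adjust to whatever your deployment actually sends.
SECRET_HEADER = "x-webhook-secret"
SECRET_ENV_VAR = "WEBHOOK_SHARED_SECRET"


def is_trusted_request(headers: Mapping[str, str], expected: Optional[str] = None) -> bool:
    """Return True only if the request carries the expected shared secret."""
    if expected is None:
        expected = os.environ.get(SECRET_ENV_VAR, "")
    if not expected:
        # Fail closed: with no secret configured, nothing is trusted.
        return False

    # HTTP header names are case-insensitive, so normalize before the lookup.
    normalized = {name.lower(): value for name, value in headers.items()}
    supplied = normalized.get(SECRET_HEADER, "")

    # compare_digest runs in constant time, which avoids leaking how many
    # leading characters of the secret matched.
    return hmac.compare_digest(supplied.encode(), expected.encode())
```

In the endpoint, you could call is_trusted_request(request.headers) before parsing the body and reject the request when it returns False.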

Implementation

Step 1: Define the Webhook Payload and DLQ Structure

First, define the data structures. Genesys webhooks vary by entity (Conversation, User, Queue, etc.). We will create a generic structure that captures the essential metadata and the payload body.

We also define the DeadLetterQueueItem which stores the failed payload and the error context.

from pydantic import BaseModel, Field
from datetime import datetime
from typing import Any, Optional, List
import uuid

class GenesysWebhookPayload(BaseModel):
    """
    Represents the incoming payload from Genesys Cloud Webhooks.
    The 'payload' field contains the actual event data (e.g., conversation details).
    """
    webhookId: str
    webhookName: str
    webhookDescription: Optional[str] = None
    payload: Any  # The actual event data structure varies by webhook type
    timestamp: str

class DeadLetterQueueItem(BaseModel):
    """
    Structure for storing failed webhook attempts.
    """
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    original_payload: GenesysWebhookPayload
    error_message: str
    failed_at: datetime = Field(default_factory=datetime.utcnow)
    retry_count: int = 0

# In-memory DLQ. In production, use Redis, Kafka, or a database.
dlq: List[DeadLetterQueueItem] = []

Step 2: Implement the Retry Logic with Exponential Backoff

When a downstream service (or your own processing logic) fails with a 5xx error, you do not want to immediately discard the message. You also do not want to retry instantly, which could cause a cascade failure.

We implement an asynchronous retry function with exponential backoff. This function attempts to process the webhook data. If it fails, it retries up to a maximum number of times. If all retries fail, it pushes the item to the DLQ.

import asyncio
import logging
import random

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MAX_RETRIES = 3
BASE_DELAY = 2  # seconds

async def process_webhook_logic(payload: GenesysWebhookPayload) -> bool:
    """
    Simulates business logic that might fail.
    Returns True if successful, raises Exception if it fails.
    """
    logger.info(f"Processing webhook: {payload.webhookId}")
    
    # SIMULATION: Randomly fail to demonstrate 5xx handling
    # In real code, this is where you call your database or other services.
    if random.random() < 0.7:  # 70% chance of failure for demo purposes
        raise Exception("Simulated downstream service error (500 Internal Server Error)")
    
    logger.info(f"Successfully processed webhook: {payload.webhookId}")
    return True

async def retry_with_backoff(payload: GenesysWebhookPayload):
    """
    Retries the processing logic with exponential backoff.
    If all retries fail, adds the payload to the Dead Letter Queue.
    """
    retry_count = 0
    last_exception = None

    while retry_count <= MAX_RETRIES:
        try:
            await process_webhook_logic(payload)
            logger.info(f"Webhook {payload.webhookId} processed successfully on attempt {retry_count + 1}")
            return  # Exit if successful
        except Exception as e:
            last_exception = e
            retry_count += 1
            
            if retry_count > MAX_RETRIES:
                break
            
            # Calculate delay: Base * 2^(attempt-1) + jitter
            delay = BASE_DELAY * (2 ** (retry_count - 1))
            jitter = random.uniform(0, 1)  # Add jitter to prevent thundering herd
            total_delay = delay + jitter
            
            logger.warning(f"Attempt {retry_count} failed for {payload.webhookId}. Retrying in {total_delay:.2f}s. Error: {str(e)}")
            await asyncio.sleep(total_delay)

    # If we reach here, all retries failed
    logger.error(f"All retries failed for {payload.webhookId}. Moving to DLQ.")
    
    # Push to DLQ
    dlq_item = DeadLetterQueueItem(
        original_payload=payload,
        error_message=str(last_exception),
        retry_count=retry_count - 1  # Failed attempts minus the initial try
    )
    dlq.append(dlq_item)
    logger.info(f"Added to DLQ: {dlq_item.id}")
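
To see the schedule the loop above produces, the delay formula can be evaluated on its own. This is a small sketch that reuses the tutorial's BASE_DELAY and MAX_RETRIES values; the backoff_schedule helper is a hypothetical name, and jitter is left out so the output is deterministic.

```python
from typing import List

BASE_DELAY = 2   # seconds, same value as the tutorial
MAX_RETRIES = 3  # same value as the tutorial


def backoff_schedule(base_delay: float, max_retries: int) -> List[float]:
    """Delay before each retry, excluding jitter: base * 2^(retry - 1)."""
    return [base_delay * (2 ** (retry - 1)) for retry in range(1, max_retries + 1)]


delays = backoff_schedule(BASE_DELAY, MAX_RETRIES)
print(delays)       # [2, 4, 8]
print(sum(delays))  # 14
```

With up to one second of jitter added to each delay, a payload that fails every attempt waits roughly 14 to 17 seconds in total, plus processing time, before it lands in the DLQ.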

Step 3: Create the FastAPI Webhook Endpoint

This endpoint receives the POST request from Genesys Cloud. It validates the payload, then offloads the processing to the retry logic.

Return a 200 OK response to Genesys Cloud as quickly as possible. Genesys Cloud retries delivery when it receives a non-2xx status code or a timeout, which can lead to duplicate processing if your local retry logic is also running.

Important: By returning 200 immediately and handling retries internally, you take control of the retry strategy. The trade-off is that Genesys Cloud then considers the event delivered, so any event still being retried in this process is lost if the service restarts before processing finishes.

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI(title="Genesys Webhook Receiver with DLQ")

# The event loop holds only weak references to tasks, so keep strong
# references here until each background task finishes.
running_tasks: set = set()

def spawn_background(coro) -> None:
    """Schedule a coroutine as a task and keep it referenced until done."""
    task = asyncio.create_task(coro)
    running_tasks.add(task)
    task.add_done_callback(running_tasks.discard)

@app.post("/webhook/genesys")
async def receive_webhook(request: Request):
    """
    Endpoint to receive webhooks from Genesys Cloud.
    Returns 200 immediately to acknowledge receipt.
    Processing happens asynchronously in the background.
    """
    try:
        body = await request.json()
        payload = GenesysWebhookPayload(**body)
    except Exception as e:
        # Malformed JSON or a body that fails validation: return 400.
        # This assumes Genesys does not retry 4xx responses; verify that
        # against the Genesys Cloud documentation.
        logger.error(f"Invalid webhook payload: {str(e)}")
        raise HTTPException(status_code=400, detail=f"Invalid payload: {str(e)}")

    # Log the incoming webhook for audit trails
    logger.info(f"Received webhook: {payload.webhookName} (ID: {payload.webhookId})")

    # Offload to a background task so Genesys gets a 200 OK quickly
    spawn_background(retry_with_backoff(payload))

    return JSONResponse(content={"status": "accepted"}, status_code=200)

@app.get("/dlq")
async def get_dlq_items():
    """
    Endpoint to retrieve items from the Dead Letter Queue.
    Useful for debugging and manual reprocessing.
    """
    return {"dlq_items": dlq, "count": len(dlq)}

@app.post("/dlq/{item_id}/retry")
async def retry_dlq_item(item_id: str):
    """
    Manually retry a specific item from the DLQ.
    """
    for i, item in enumerate(dlq):
        if item.id == item_id:
            # Remove from DLQ
            removed_item = dlq.pop(i)

            # Re-process with retry logic; a new DLQ item is created if it fails again
            spawn_background(retry_with_backoff(removed_item.original_payload))

            return JSONResponse(content={"status": "retried", "item_id": item_id}, status_code=200)

    raise HTTPException(status_code=404, detail="DLQ item not found")

Step 4: Running the Server

To run this application, use Uvicorn.

uvicorn main:app --reload --port 8000
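
Once the server is running, you can exercise the endpoint without waiting for a real Genesys event. The following sketch builds a test POST using only the standard library. The field values in SAMPLE_EVENT are made up for illustration and only mirror the GenesysWebhookPayload model from Step 1; real Genesys Cloud payloads will differ. The build_request and send_sample helpers are hypothetical names.

```python
import json
import urllib.request

# Hypothetical sample values; real Genesys Cloud payloads will differ.
SAMPLE_EVENT = {
    "webhookId": "test-webhook-001",
    "webhookName": "Local smoke test",
    "payload": {"conversationId": "abc-123"},
    "timestamp": "2024-01-01T00:00:00Z",
}


def build_request(url: str = "http://localhost:8000/webhook/genesys") -> urllib.request.Request:
    """Build a JSON POST request carrying the sample event."""
    data = json.dumps(SAMPLE_EVENT).encode("utf-8")
    return urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_sample(url: str = "http://localhost:8000/webhook/genesys") -> int:
    """Send the sample event and return the HTTP status code."""
    with urllib.request.urlopen(build_request(url), timeout=5) as response:
        return response.status
```

With the server listening on port 8000, calling send_sample() should return 200. Watch the server logs for the retry attempts, then request /dlq to see any payload that exhausted its retries.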

Complete Working Example

Here is the complete, copy-pasteable Python script. Save this as main.py.

import asyncio
import logging
import random
import uuid
from datetime import datetime
from typing import Any, Optional, List

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Configuration ---
MAX_RETRIES = 3
BASE_DELAY = 2  # seconds

# --- Data Models ---

class GenesysWebhookPayload(BaseModel):
    """
    Represents the incoming payload from Genesys Cloud Webhooks.
    """
    webhookId: str
    webhookName: str
    webhookDescription: Optional[str] = None
    payload: Any  # The actual event data structure varies by webhook type
    timestamp: str

class DeadLetterQueueItem(BaseModel):
    """
    Structure for storing failed webhook attempts.
    """
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    original_payload: GenesysWebhookPayload
    error_message: str
    failed_at: datetime = Field(default_factory=datetime.utcnow)
    retry_count: int = 0

# In-memory DLQ. In production, use Redis, Kafka, or a database.
dlq: List[DeadLetterQueueItem] = []

# --- Business Logic & Retry Strategy ---

async def process_webhook_logic(payload: GenesysWebhookPayload) -> bool:
    """
    Simulates business logic that might fail.
    Returns True if successful, raises Exception if it fails.
    """
    logger.info(f"Processing webhook: {payload.webhookId}")
    
    # SIMULATION: Randomly fail to demonstrate 5xx handling
    # In real code, this is where you call your database or other services.
    if random.random() < 0.7:  # 70% chance of failure for demo purposes
        raise Exception("Simulated downstream service error (500 Internal Server Error)")
    
    logger.info(f"Successfully processed webhook: {payload.webhookId}")
    return True

async def retry_with_backoff(payload: GenesysWebhookPayload):
    """
    Retries the processing logic with exponential backoff.
    If all retries fail, adds the payload to the Dead Letter Queue.
    """
    retry_count = 0
    last_exception = None

    while retry_count <= MAX_RETRIES:
        try:
            await process_webhook_logic(payload)
            logger.info(f"Webhook {payload.webhookId} processed successfully on attempt {retry_count + 1}")
            return  # Exit if successful
        except Exception as e:
            last_exception = e
            retry_count += 1
            
            if retry_count > MAX_RETRIES:
                break
            
            # Calculate delay: Base * 2^(attempt-1) + jitter
            delay = BASE_DELAY * (2 ** (retry_count - 1))
            jitter = random.uniform(0, 1)  # Add jitter to prevent thundering herd
            total_delay = delay + jitter
            
            logger.warning(f"Attempt {retry_count} failed for {payload.webhookId}. Retrying in {total_delay:.2f}s. Error: {str(e)}")
            await asyncio.sleep(total_delay)

    # If we reach here, all retries failed
    logger.error(f"All retries failed for {payload.webhookId}. Moving to DLQ.")
    
    # Push to DLQ
    dlq_item = DeadLetterQueueItem(
        original_payload=payload,
        error_message=str(last_exception),
        retry_count=retry_count - 1  # Failed attempts minus the initial try
    )
    dlq.append(dlq_item)
    logger.info(f"Added to DLQ: {dlq_item.id}")

# --- FastAPI Application ---

app = FastAPI(title="Genesys Webhook Receiver with DLQ")

# The event loop holds only weak references to tasks, so keep strong
# references here until each background task finishes.
running_tasks: set = set()

def spawn_background(coro) -> None:
    """Schedule a coroutine as a task and keep it referenced until done."""
    task = asyncio.create_task(coro)
    running_tasks.add(task)
    task.add_done_callback(running_tasks.discard)

@app.post("/webhook/genesys")
async def receive_webhook(request: Request):
    """
    Endpoint to receive webhooks from Genesys Cloud.
    Returns 200 immediately to acknowledge receipt.
    Processing happens asynchronously in the background.
    """
    try:
        body = await request.json()
        payload = GenesysWebhookPayload(**body)
    except Exception as e:
        # Malformed JSON or a body that fails validation: return 400.
        # This assumes Genesys does not retry 4xx responses; verify that
        # against the Genesys Cloud documentation.
        logger.error(f"Invalid webhook payload: {str(e)}")
        raise HTTPException(status_code=400, detail=f"Invalid payload: {str(e)}")

    # Log the incoming webhook for audit trails
    logger.info(f"Received webhook: {payload.webhookName} (ID: {payload.webhookId})")

    # Offload to a background task so Genesys gets a 200 OK quickly
    spawn_background(retry_with_backoff(payload))

    return JSONResponse(content={"status": "accepted"}, status_code=200)

@app.get("/dlq")
async def get_dlq_items():
    """
    Endpoint to retrieve items from the Dead Letter Queue.
    Useful for debugging and manual reprocessing.
    """
    return {"dlq_items": dlq, "count": len(dlq)}

@app.post("/dlq/{item_id}/retry")
async def retry_dlq_item(item_id: str):
    """
    Manually retry a specific item from the DLQ.
    """
    for i, item in enumerate(dlq):
        if item.id == item_id:
            # Remove from DLQ
            removed_item = dlq.pop(i)

            # Re-process with retry logic; a new DLQ item is created if it fails again
            spawn_background(retry_with_backoff(removed_item.original_payload))

            return JSONResponse(content={"status": "retried", "item_id": item_id}, status_code=200)

    raise HTTPException(status_code=404, detail="DLQ item not found")

Common Errors & Debugging

Error: Genesys Cloud Retries the Webhook Multiple Times

Cause: Your endpoint returned a 4xx or 5xx status code, or timed out. Genesys Cloud has a built-in retry mechanism for webhooks. If you do not return 200 OK, Genesys will resend the payload, potentially causing duplicate processing if your internal logic does not handle idempotency.

Fix: Ensure your /webhook/genesys endpoint returns 200 OK immediately upon receiving valid JSON. Do not perform heavy processing in the request handler. Use asyncio.create_task or a message queue (like RabbitMQ or Kafka) to offload processing.

Code Adjustment:

# BAD: the response waits for processing (and every retry) to finish
@app.post("/webhook/genesys")
async def bad_receive(request: Request):
    payload = GenesysWebhookPayload(**(await request.json()))
    await retry_with_backoff(payload)  # Holds the connection open until done
    return {"status": "ok"}

# GOOD: schedule the work and respond immediately
@app.post("/webhook/genesys")
async def good_receive(request: Request):
    payload = GenesysWebhookPayload(**(await request.json()))
    asyncio.create_task(retry_with_backoff(payload))  # Returns immediately
    return {"status": "ok"}

Error: DLQ is Filling Up with Duplicate Events

Cause: Genesys Cloud retries the webhook because your initial response was not 200. Your internal retry logic also retried. This results in multiple copies of the same event.

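Fix: Implement idempotency checks. Because webhookId alone may repeat across events from the same webhook, combine it with an event-specific value such as the timestamp, or use a unique identifier from the payload (e.g., conversationId), to check whether the event has already been processed.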

Code Adjustment:

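processed_ids: set = set()

async def process_webhook_logic(payload: GenesysWebhookPayload) -> bool:
    # Check for idempotency
    unique_key = f"{payload.webhookId}-{payload.timestamp}"
    if unique_key in processed_ids:
        logger.info(f"Skipping duplicate event: {unique_key}")
        return True

    # ... rest of logic (raises on failure)

    # Record the key only after success. Adding it earlier would make the
    # internal retry loop treat a failed event as an already-processed duplicate.
    processed_ids.add(unique_key)
    return True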
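
Note that a plain set such as processed_ids grows for as long as the process runs. A minimal sketch of a size-capped alternative follows; the SeenKeys class and its max_size default are hypothetical, and because the state lives in memory it is still lost on restart.

```python
from collections import OrderedDict


class SeenKeys:
    """Remembers the most recent keys, evicting the oldest beyond max_size."""

    def __init__(self, max_size: int = 10_000) -> None:
        self._keys: "OrderedDict[str, None]" = OrderedDict()
        self._max_size = max_size

    def __contains__(self, key: str) -> bool:
        return key in self._keys

    def __len__(self) -> int:
        return len(self._keys)

    def add(self, key: str) -> None:
        # Re-adding a key refreshes its position so it is evicted last.
        self._keys[key] = None
        self._keys.move_to_end(key)
        while len(self._keys) > self._max_size:
            self._keys.popitem(last=False)  # Drop the oldest entry
```

Because the class supports both the in operator and add, it could stand in for the set in the idempotency check. Pick a max_size large enough to cover the window in which duplicates can realistically arrive.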

Error: Memory Leak in DLQ

Cause: The in-memory list dlq grows indefinitely.

Fix: In production, persist the DLQ to a database (PostgreSQL, MongoDB) or a cache (Redis). Set up a cleanup job to archive old DLQ items.

Example with Redis:

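import redis  # Requires: pip install redis

# decode_responses=True makes Redis return str instead of bytes
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

def add_to_dlq_redis(item: DeadLetterQueueItem):
    # .json() is the Pydantic v1 API; on Pydantic v2 use item.model_dump_json()
    r.lpush('genesys:dlq', item.json())

def get_dlq_from_redis():
    # Returns the serialized items, newest first (lpush adds to the head)
    return r.lrange('genesys:dlq', 0, -1)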
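
The cleanup job mentioned in the fix can be as simple as partitioning items by age. The sketch below is storage-agnostic and assumes each record is a plain dict with a failed_at datetime; the DlqRecord alias and the split_expired helper are hypothetical names, and where expired records get archived is left to you.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple

# Hypothetical record shape: {"id": ..., "failed_at": datetime, ...}
DlqRecord = Dict[str, Any]


def split_expired(
    items: List[DlqRecord], max_age: timedelta, now: datetime
) -> Tuple[List[DlqRecord], List[DlqRecord]]:
    """Partition records into (kept, expired) by the age of failed_at."""
    cutoff = now - max_age
    kept = [item for item in items if item["failed_at"] >= cutoff]
    expired = [item for item in items if item["failed_at"] < cutoff]
    return kept, expired
```

Run this periodically, write the expired records to your archive, and only then drop them from the live DLQ. Pass now in the same convention as failed_at; the model in this tutorial stores naive UTC datetimes.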

Official References