Handling Webhook 5xx Failures with a Dead Letter Queue Retry Pattern

What You Will Build

  • A Python service that intercepts failed Genesys Cloud CX webhooks (5xx errors) and stores them in a durable queue for later retry.
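  • A secondary worker that pulls items from the queue, retries delivery to the target endpoint, and removes each item from the queue once delivery succeeds. A helper for updating the Genesys Cloud webhook configuration is included, but the worker shown here does not call it.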
  • Python 3.10+ with httpx for async HTTP requests and boto3 for AWS SQS integration.

Prerequisites

  • Genesys Cloud CX: A valid OAuth 2.0 Client ID and Secret with webhooks:read and webhooks:write scopes.
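  • AWS Account: Credentials with access to a Standard SQS queue. The code in this tutorial sets a per-message DelaySeconds and sends no MessageGroupId, so it will not work unchanged against a FIFO queue.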
  • Python Runtime: Python 3.10 or later.
  • Dependencies:
    • httpx>=0.24.0: For async HTTP requests and retries.
    • boto3>=1.28.0: For AWS SQS interaction.
    • pydantic>=2.0.0: For data validation.
    • python-dotenv>=1.0.0: For environment variable management.
    • fastapi and uvicorn: For the webhook receiver endpoint and the ASGI server that runs it.

Authentication Setup

Genesys Cloud uses the OAuth 2.0 Client Credentials Grant for server-to-server communication. You must obtain an access token before calling the Webhooks API. The token lifetime is set on the OAuth client and reported in the expires_in field of the token response, so your service should cache the token and request a new one shortly before that lifetime runs out instead of relying on a hard-coded duration.

import httpx
import os
from datetime import datetime, timedelta
from typing import Optional

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, base_url: str = "https://api.mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url.rstrip('/')
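        # Tokens are issued by the regional login host (login.<domain>), not the API host.
        self.token_url = self.base_url.replace("://api.", "://login.", 1) + "/oauth/token"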
        self.access_token: Optional[str] = None
        self.token_expiry: Optional[datetime] = None

    async def get_token(self) -> str:
        """
        Retrieves an OAuth2 access token.
        Returns a cached token if valid, otherwise fetches a new one.
        """
        if self.access_token and self.token_expiry and datetime.utcnow() < self.token_expiry:
            return self.access_token

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    self.token_url,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                    # Send the client credentials via HTTP Basic authentication.
                    auth=(self.client_id, self.client_secret),
                    data={"grant_type": "client_credentials"},
                    timeout=10.0
                )
                response.raise_for_status()
                
                data = response.json()
                self.access_token = data["access_token"]
                # expires_in is the token lifetime in seconds, as reported by Genesys.
                # Subtract 30 seconds as a safety buffer.
                self.token_expiry = datetime.utcnow() + timedelta(seconds=data["expires_in"] - 30)
                
                return self.access_token

            except httpx.HTTPStatusError as e:
                raise RuntimeError(f"Failed to obtain OAuth token: {e.response.status_code} - {e.response.text}") from e
            except httpx.RequestError as e:
                raise RuntimeError(f"Network error during OAuth request: {e}") from e
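The caching rule used by get_token can be isolated into two small functions, which makes the buffer arithmetic easy to check without calling Genesys. This is a minimal sketch: the names compute_expiry and is_fresh are illustrative and not part of the class above, the 840-second lifetime is only an example value, and timezone-aware datetimes are used in place of datetime.utcnow(), which is deprecated from Python 3.12.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def compute_expiry(expires_in: int, now: datetime, buffer_seconds: int = 30) -> datetime:
    """Return the moment after which a cached token should be treated as stale."""
    # Never let the buffer push the expiry into the past for very short lifetimes.
    usable = max(expires_in - buffer_seconds, 0)
    return now + timedelta(seconds=usable)

def is_fresh(expiry: Optional[datetime], now: datetime) -> bool:
    """True if a cached token exists and has not yet reached its expiry."""
    return expiry is not None and now < expiry

# Example with a fixed clock so the result is reproducible.
now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
expiry = compute_expiry(expires_in=840, now=now)
print(expiry.isoformat())  # 2024-01-01T12:13:30+00:00
```

Passing the current time in as an argument, instead of reading the clock inside the function, is what makes the expiry logic testable.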

Implementation

Step 1: The Failure Interceptor (Webhook Receiver)

Genesys Cloud sends webhooks to your specified URL. If your endpoint returns a 5xx error, Genesys will retry up to 5 times with exponential backoff. If all retries fail, the event is lost unless you have a dead letter mechanism.

In this pattern, your receiving endpoint is designed to always return 200 OK to Genesys immediately, acknowledging receipt, even if the actual business logic fails. The actual processing happens asynchronously. If the business logic fails (e.g., downstream service is down), you push the payload to a Dead Letter Queue (DLQ) instead of dropping it.

Required Scope: none is needed to receive deliveries. webhooks:read matters only if the service also looks up the webhook configuration, as the retry worker in Step 2 does.

import asyncio
import json
import logging
import os
from datetime import datetime

import boto3
from botocore.exceptions import ClientError
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

# Configure Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Genesys Webhook DLQ Handler")

# SQS Client Setup
sqs_client = boto3.client('sqs', region_name=os.getenv("AWS_REGION"))
DLQ_URL = os.getenv("SQS_DLQ_URL")

@app.post("/webhook/genesys")
async def receive_webhook(request: Request):
    """
    Receives webhook from Genesys Cloud.
    Always returns 200 to Genesys to stop retries.
    Processes payload asynchronously.
    """
    try:
        body = await request.json()
    except json.JSONDecodeError:
        # A body that cannot be parsed cannot be replayed later, so reject it
        # with 400 instead of queueing it.
        raise HTTPException(status_code=400, detail="Invalid JSON")

    # Extract key metadata
    webhook_id = body.get("id")
    event_type = body.get("eventType")
    timestamp = body.get("timestamp")

    logger.info(f"Received webhook: ID={webhook_id}, Type={event_type}")

    # Asynchronously process the webhook.
    # We do not await here. This allows the HTTP response to return 200 immediately.
    # This ensures Genesys considers the delivery successful.
    asyncio.create_task(process_webhook_async(body, webhook_id, event_type))

    return JSONResponse(content={"status": "received"}, status_code=200)

async def process_webhook_async(payload: dict, webhook_id: str, event_type: str):
    """
    Performs the actual business logic.
    If it fails, pushes to DLQ.
    """
    try:
        # Simulate business logic (e.g., sending to CRM, updating DB)
        await simulate_business_logic(payload)
        logger.info(f"Successfully processed webhook {webhook_id}")
        
    except Exception as e:
        logger.error(f"Business logic failed for webhook {webhook_id}: {str(e)}")
        await push_to_dlq(payload, webhook_id, event_type, str(e))

async def simulate_business_logic(payload: dict):
    """
    Placeholder for your actual integration logic.
    Raise an exception to trigger DLQ push.
    """
    # Example: Check if a downstream service is reachable
    # if not await check_downstream_service():
    #     raise ConnectionError("Downstream service unavailable")
    
    # For this tutorial, we simulate a 50% failure rate
    import random
    if random.random() < 0.5:
        raise ConnectionError("Simulated downstream 5xx error")

async def push_to_dlq(payload: dict, webhook_id: str, event_type: str, error_reason: str):
    """
    Pushes the failed webhook payload to the SQS Dead Letter Queue.
    """
    dlq_message = {
        "original_payload": payload,
        "webhook_id": webhook_id,
        "event_type": event_type,
        "error_reason": error_reason,
        "failed_at": datetime.utcnow().isoformat(),
        "retry_count": 0
    }

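    try:
        # boto3 is synchronous; run the call in a worker thread so it does not
        # block the event loop.
        await asyncio.to_thread(
            sqs_client.send_message,
            QueueUrl=DLQ_URL,
            MessageBody=json.dumps(dlq_message),
        )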
        logger.info(f"Pushed webhook {webhook_id} to DLQ")
    except ClientError as e:
        logger.critical(f"Failed to push to DLQ for webhook {webhook_id}: {e}")
        # In a production system, you might want to log to a separate error sink here
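One caveat about the fire-and-forget asyncio.create_task call in the receiver: the event loop keeps only a weak reference to a task, so a task with no other reference can be garbage-collected before it finishes. A common remedy, sketched here, is to hold each task in a set until it completes. The names background_tasks and spawn are hypothetical helpers, not part of the tutorial code.

```python
import asyncio

# Hypothetical module-level registry; the name is illustrative.
background_tasks: set[asyncio.Task] = set()

def spawn(coro) -> asyncio.Task:
    """Schedule a coroutine and hold a strong reference until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference once the task completes, whatever the outcome.
    task.add_done_callback(background_tasks.discard)
    return task

async def demo() -> tuple[int, int]:
    async def work():
        await asyncio.sleep(0.01)

    task = spawn(work())
    tracked_while_running = len(background_tasks)
    await task
    # Yield once so any pending done-callbacks have run.
    await asyncio.sleep(0)
    return tracked_while_running, len(background_tasks)

result = asyncio.run(demo())
print(result)  # (1, 0)
```

In the receiver, this would mean calling spawn(process_webhook_async(...)) where asyncio.create_task is used now.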

Step 2: The Retry Worker

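This service runs continuously, polling the SQS DLQ. It attempts to redeliver the webhook payload to the original target URL specified in the Genesys webhook configuration. The code below does not implement a fallback URL: if the configuration cannot be fetched, the message is logged and dropped.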

Required Scope: webhooks:write (to update the webhook configuration if needed, or to verify the webhook exists). webhooks:read is required to fetch the current configuration.

First, we need a helper to fetch the webhook configuration from Genesys to know where to retry.

import httpx
import asyncio

class GenesysWebhookService:
    def __init__(self, auth: GenesysAuth):
        self.auth = auth
        self.base_url = auth.base_url

    async def get_webhook_config(self, webhook_id: str) -> dict:
        """
        Fetches the webhook configuration from Genesys Cloud.
        Scope: webhooks:read
        """
        token = await self.auth.get_token()
        url = f"{self.base_url}/api/v2/webhooks/{webhook_id}"
        
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(
                    url,
                    headers={"Authorization": f"Bearer {token}"},
                    timeout=10.0
                )
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 404:
                    raise ValueError(f"Webhook {webhook_id} not found in Genesys")
                raise RuntimeError(f"Failed to fetch webhook config: {e.response.text}") from e

    async def update_webhook_status(self, webhook_id: str, active: bool) -> None:
        """
        Updates the active status of a webhook.
        Scope: webhooks:write
        """
        token = await self.auth.get_token()
        url = f"{self.base_url}/api/v2/webhooks/{webhook_id}"
        
        # Fetch current config first to preserve other settings
        current_config = await self.get_webhook_config(webhook_id)
        current_config["active"] = active

        async with httpx.AsyncClient() as client:
            try:
                response = await client.put(
                    url,
                    headers={
                        "Authorization": f"Bearer {token}",
                        "Content-Type": "application/json"
                    },
                    json=current_config,
                    timeout=10.0
                )
                response.raise_for_status()
            except httpx.HTTPStatusError as e:
                raise RuntimeError(f"Failed to update webhook status: {e.response.text}") from e

Now, the worker logic. It retrieves messages from SQS, attempts to resend the payload to the original target URL, and handles retries.

import json

MAX_RETRIES = 3
RETRY_DELAY_SECONDS = 10

async def retry_worker(auth: GenesysAuth, sqs_client, dlq_url: str):
    """
    Polls the DLQ and retries failed webhooks.
    """
    webhook_service = GenesysWebhookService(auth)
    
    logger.info("Starting retry worker...")
    
    while True:
        try:
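            # Long-poll SQS in a worker thread. boto3 is synchronous and would
            # otherwise block the event loop (and the FastAPI server) while waiting.
            response = await asyncio.to_thread(
                sqs_client.receive_message,
                QueueUrl=dlq_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=5,
            )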
            
            messages = response.get("Messages", [])
            if not messages:
                await asyncio.sleep(1)
                continue

            for msg in messages:
                message_body = json.loads(msg["Body"])
                receipt_handle = msg["ReceiptHandle"]
                webhook_id = message_body["webhook_id"]
                original_payload = message_body["original_payload"]
                retry_count = message_body.get("retry_count", 0)

                logger.info(f"Processing DLQ message for webhook {webhook_id}, retry count: {retry_count}")

                if retry_count >= MAX_RETRIES:
                    logger.warning(f"Max retries reached for webhook {webhook_id}. Deleting from DLQ.")
                    sqs_client.delete_message(QueueUrl=dlq_url, ReceiptHandle=receipt_handle)
                    continue

                # Get the target URL from Genesys Config
                try:
                    config = await webhook_service.get_webhook_config(webhook_id)
                    target_url = config.get("targetUrl")
                    if not target_url:
                        raise ValueError("No targetUrl found in webhook config")
                except Exception as e:
                    logger.error(f"Could not fetch config for {webhook_id}: {e}")
                    sqs_client.delete_message(QueueUrl=dlq_url, ReceiptHandle=receipt_handle)
                    continue

                # Attempt to resend
                success = await attempt_redelivery(target_url, original_payload)

                if success:
                    logger.info(f"Successfully redelivered webhook {webhook_id}")
                    # Delete from DLQ
                    sqs_client.delete_message(QueueUrl=dlq_url, ReceiptHandle=receipt_handle)
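                else:
                    logger.error(f"Redelivery failed for webhook {webhook_id}")
                    # Re-enqueue a copy with an incremented retry count, then delete
                    # the original so it does not reappear after the visibility timeout.
                    message_body["retry_count"] = retry_count + 1
                    sqs_client.send_message(
                        QueueUrl=dlq_url,
                        MessageBody=json.dumps(message_body),
                        DelaySeconds=RETRY_DELAY_SECONDS
                    )
                    sqs_client.delete_message(QueueUrl=dlq_url, ReceiptHandle=receipt_handle)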

        except Exception as e:
            logger.error(f"Worker loop error: {e}")
            await asyncio.sleep(5)

async def attempt_redelivery(target_url: str, payload: dict) -> bool:
    """
    Attempts to send the webhook payload to the target URL.
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.post(
                target_url,
                json=payload,
                headers={"Content-Type": "application/json"}
            )
            # Only 2xx counts as delivered. httpx does not follow redirects by
            # default, so a 3xx means the payload never reached its destination.
            if response.is_success:
                return True
            else:
                logger.warning(f"Redelivery returned status {response.status_code}")
                return False
        except httpx.RequestError as e:
            logger.error(f"Network error during redelivery: {e}")
            return False
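The worker re-enqueues failures with a fixed delay. If a growing delay is preferred, one option is to derive DelaySeconds from the retry count. The sketch below assumes a doubling schedule starting at 10 seconds; the function name backoff_delay is hypothetical. The cap reflects the SQS limit of 900 seconds (15 minutes) for DelaySeconds.

```python
SQS_MAX_DELAY_SECONDS = 900  # SQS rejects DelaySeconds above 15 minutes

def backoff_delay(retry_count: int, base_seconds: int = 10) -> int:
    """Delay before the next attempt: base * 2**retry_count, capped for SQS."""
    if retry_count < 0:
        raise ValueError("retry_count must be non-negative")
    return min(base_seconds * (2 ** retry_count), SQS_MAX_DELAY_SECONDS)

delays = [backoff_delay(n) for n in range(8)]
print(delays)  # [10, 20, 40, 80, 160, 320, 640, 900]
```

In the worker, the value would be passed as DelaySeconds=backoff_delay(retry_count) when the message is sent back to the queue.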

Step 3: Orchestrating the Service

Combine the receiver and the worker into a single application entry point. In production, these would likely be separate microservices (one for receiving, one for processing), but for this tutorial, we run them concurrently using asyncio.

import asyncio
import os

async def main():
    # Initialize Auth
    auth = GenesysAuth(
        client_id=os.getenv("GENESYS_CLIENT_ID"),
        client_secret=os.getenv("GENESYS_CLIENT_SECRET")
    )

    # Start the Retry Worker in the background
    worker_task = asyncio.create_task(
        retry_worker(auth, sqs_client, DLQ_URL)
    )

    # Start the FastAPI Server
    import uvicorn
    config = uvicorn.Config(app, host="0.0.0.0", port=8000)
    server = uvicorn.Server(config)

    # Serve until shutdown, then stop the background worker.
    try:
        await server.serve()
    finally:
        worker_task.cancel()

if __name__ == "__main__":
    asyncio.run(main())

Complete Working Example

Below is the consolidated code structure. Save this as main.py.

import asyncio
import json
import logging
import os
import uuid
from datetime import datetime, timedelta
from typing import Optional

import boto3
import httpx
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from botocore.exceptions import ClientError

# --- Configuration ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
GENESYS_BASE_URL = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
SQS_DLQ_URL = os.getenv("SQS_DLQ_URL")

if not all([GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, SQS_DLQ_URL]):
    raise ValueError("Missing required environment variables: GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, SQS_DLQ_URL")

# --- Clients ---
sqs_client = boto3.client('sqs', region_name=AWS_REGION)

# --- Authentication ---
class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, base_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url.rstrip('/')
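        # Tokens are issued by the regional login host (login.<domain>), not the API host.
        self.token_url = self.base_url.replace("://api.", "://login.", 1) + "/oauth/token"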
        self.access_token: Optional[str] = None
        self.token_expiry: Optional[datetime] = None

    async def get_token(self) -> str:
        if self.access_token and self.token_expiry and datetime.utcnow() < self.token_expiry:
            return self.access_token

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    self.token_url,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                    # Send the client credentials via HTTP Basic authentication.
                    auth=(self.client_id, self.client_secret),
                    data={"grant_type": "client_credentials"},
                    timeout=10.0
                )
                response.raise_for_status()
                data = response.json()
                self.access_token = data["access_token"]
                self.token_expiry = datetime.utcnow() + timedelta(seconds=data["expires_in"] - 30)
                return self.access_token
            except httpx.HTTPStatusError as e:
                raise RuntimeError(f"OAuth Error: {e.response.text}") from e

# --- Webhook Service ---
class GenesysWebhookService:
    def __init__(self, auth: GenesysAuth):
        self.auth = auth

    async def get_webhook_config(self, webhook_id: str) -> dict:
        token = await self.auth.get_token()
        url = f"{self.auth.base_url}/api/v2/webhooks/{webhook_id}"
        async with httpx.AsyncClient() as client:
            response = await client.get(url, headers={"Authorization": f"Bearer {token}"})
            response.raise_for_status()
            return response.json()

# --- FastAPI App ---
app = FastAPI(title="Genesys Webhook DLQ Handler")

@app.post("/webhook/genesys")
async def receive_webhook(request: Request):
    try:
        body = await request.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")

    webhook_id = body.get("id")
    event_type = body.get("eventType")
    
    # Fire and forget processing
    asyncio.create_task(process_webhook_async(body, webhook_id, event_type))
    
    return JSONResponse(content={"status": "received"}, status_code=200)

async def process_webhook_async(payload: dict, webhook_id: str, event_type: str):
    try:
        await simulate_business_logic(payload)
        logger.info(f"Processed {webhook_id}")
    except Exception as e:
        logger.error(f"Logic failed for {webhook_id}: {e}")
        await push_to_dlq(payload, webhook_id, event_type, str(e))

async def simulate_business_logic(payload: dict):
    import random
    if random.random() < 0.5:
        raise ConnectionError("Simulated downstream 5xx")

async def push_to_dlq(payload: dict, webhook_id: str, event_type: str, error_reason: str):
    dlq_message = {
        "original_payload": payload,
        "webhook_id": webhook_id,
        "event_type": event_type,
        "error_reason": error_reason,
        "failed_at": datetime.utcnow().isoformat(),
        "retry_count": 0
    }
    try:
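        # boto3 is synchronous; run it in a worker thread to keep the event loop free.
        await asyncio.to_thread(
            sqs_client.send_message,
            QueueUrl=SQS_DLQ_URL,
            MessageBody=json.dumps(dlq_message),
        )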
    except ClientError as e:
        logger.critical(f"DLQ Push Failed: {e}")

# --- Retry Worker ---
MAX_RETRIES = 3
RETRY_DELAY = 10

async def retry_worker(auth: GenesysAuth):
    webhook_service = GenesysWebhookService(auth)
    logger.info("Retry Worker Started")
    
    while True:
        try:
            # Long-poll in a worker thread so the event loop keeps serving requests.
            response = await asyncio.to_thread(
                sqs_client.receive_message,
                QueueUrl=SQS_DLQ_URL,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=5,
            )
            
            messages = response.get("Messages", [])
            if not messages:
                await asyncio.sleep(1)
                continue

            for msg in messages:
                body = json.loads(msg["Body"])
                receipt_handle = msg["ReceiptHandle"]
                webhook_id = body["webhook_id"]
                payload = body["original_payload"]
                retries = body.get("retry_count", 0)

                if retries >= MAX_RETRIES:
                    logger.warning(f"Max retries for {webhook_id}")
                    sqs_client.delete_message(QueueUrl=SQS_DLQ_URL, ReceiptHandle=receipt_handle)
                    continue

                try:
                    config = await webhook_service.get_webhook_config(webhook_id)
                    target_url = config.get("targetUrl")
                except Exception as e:
                    logger.error(f"Config fetch failed for {webhook_id}: {e}")
                    sqs_client.delete_message(QueueUrl=SQS_DLQ_URL, ReceiptHandle=receipt_handle)
                    continue

                success = await attempt_redelivery(target_url, payload)

                if success:
                    sqs_client.delete_message(QueueUrl=SQS_DLQ_URL, ReceiptHandle=receipt_handle)
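                else:
                    # Re-enqueue with an incremented count, then delete the original
                    # so it does not reappear after the visibility timeout.
                    body["retry_count"] = retries + 1
                    sqs_client.send_message(
                        QueueUrl=SQS_DLQ_URL,
                        MessageBody=json.dumps(body),
                        DelaySeconds=RETRY_DELAY
                    )
                    sqs_client.delete_message(QueueUrl=SQS_DLQ_URL, ReceiptHandle=receipt_handle)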

        except Exception as e:
            logger.error(f"Worker Error: {e}")
            await asyncio.sleep(5)

async def attempt_redelivery(url: str, payload: dict) -> bool:
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
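            resp = await client.post(url, json=payload)
            # Only 2xx counts as delivered; httpx does not follow redirects by default.
            return resp.is_success
        except httpx.RequestError as e:
            logger.error(f"Network error during redelivery: {e}")
            return False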

# --- Entry Point ---
async def main():
    auth = GenesysAuth(GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, GENESYS_BASE_URL)
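    # Keep a reference so the worker task is not garbage-collected mid-run.
    worker_task = asyncio.create_task(retry_worker(auth))
    config = uvicorn.Config(app, host="0.0.0.0", port=8000)
    server = uvicorn.Server(config)
    try:
        await server.serve()
    finally:
        worker_task.cancel()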

if __name__ == "__main__":
    asyncio.run(main())

Common Errors & Debugging

Error: 403 Forbidden on Webhook API Calls

  • Cause: The OAuth token lacks the required scope.
  • Fix: Ensure your OAuth client in Genesys Cloud has webhooks:read and webhooks:write scopes assigned.
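  • Code Check: A 403 means the token was accepted but lacks permission. A wrong grant_type or client secret fails earlier, at the token request, so check the token step first if no token is being issued at all.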

Error: 404 Not Found on Webhook Config

  • Cause: The webhook_id in the DLQ message does not exist or was deleted.
  • Fix: The worker logs this and deletes the DLQ message. If this happens frequently, check if webhooks are being deleted manually after failure.
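Note that the worker as written deletes the DLQ message on any configuration lookup failure, not only on a 404. One way to keep messages through transient failures is to classify the failure first. The sketch below is an assumption about how that might look: should_drop_message is a hypothetical helper, and treating every non-404 response as transient is a simplification.

```python
from typing import Optional

def should_drop_message(status_code: Optional[int]) -> bool:
    """Decide whether a failed config lookup is permanent.

    status_code is None when no HTTP response was received (network error).
    """
    if status_code is None:
        return False  # network failure: leave the message for a later attempt
    if status_code == 404:
        return True   # the webhook no longer exists; retrying cannot succeed
    # Other responses (401, 429, 5xx, ...) are treated as transient in this sketch.
    return False

for code in (404, 503, None):
    print(code, should_drop_message(code))
```

When the helper returns False, the worker can skip the delete_message call; SQS makes the message visible again once its visibility timeout expires.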

Error: SQS Queue Not Found

  • Cause: SQS_DLQ_URL environment variable points to a non-existent queue.
  • Fix: Create the SQS queue in AWS and ensure the IAM user/role running the Python script has sqs:SendMessage, sqs:ReceiveMessage, and sqs:DeleteMessage permissions.
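A blank or unset SQS_DLQ_URL is easier to diagnose if the startup check names the variable at fault. The following is a minimal sketch with a hypothetical helper, missing_settings. It only detects unset or blank values and cannot tell whether the queue actually exists.

```python
from typing import Mapping, Sequence

REQUIRED_VARS = ("GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET", "SQS_DLQ_URL")

def missing_settings(env: Mapping[str, str], required: Sequence[str] = REQUIRED_VARS) -> list[str]:
    """Return the required variable names that are unset or blank in env."""
    return [name for name in required if not env.get(name, "").strip()]

# Example with a plain dict; in the service, os.environ would be passed instead.
example_env = {"GENESYS_CLIENT_ID": "abc", "SQS_DLQ_URL": "  "}
print(missing_settings(example_env))  # ['GENESYS_CLIENT_SECRET', 'SQS_DLQ_URL']
```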

Error: 502 Bad Gateway from Genesys during initial receipt

  • Cause: Your FastAPI server is crashing or timing out before returning 200.
  • Fix: Ensure the receive_webhook endpoint returns 200 immediately. Do not perform heavy synchronous work in the endpoint. Use asyncio.create_task to offload work.

Official References