Handling Webhook 5xx Failures with a Dead Letter Queue and Retry Logic

What You Will Build

  • A middleware listener that receives Genesys Cloud CX webhook events and detects when forwarding them to your downstream endpoint fails with a 5xx error.
  • A Python-based dead letter queue (DLQ) service using Amazon SQS to capture failed payloads for manual inspection or automated retry.
  • A retry mechanism that attempts to redeliver failed payloads to the original endpoint with exponential backoff.

Prerequisites

  • Genesys Cloud CX Account: A developer or admin account with access to the Admin Console.
  • AWS Account: Active account with permissions to create SQS queues and IAM roles.
  • Python Environment: Python 3.9+ installed.
  • Dependencies:
    • boto3 (AWS SDK for Python)
    • requests (HTTP library)
    • PureCloudPlatformClientV2 (Genesys Cloud CX Python SDK)
    • pydantic (Data validation)
  • IAM Role: An IAM role or user with sqs:CreateQueue, sqs:GetQueueUrl, sqs:SendMessage, sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:ChangeMessageVisibility permissions on the DLQ.
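
A minimal sketch of a least-privilege IAM policy document for the DLQ, built as a Python dict so it can be serialized and attached with whatever tooling you already use. The queue ARN is a placeholder, and the action list is an assumption that covers every SQS call made in this tutorial: queue creation and lookup, send, receive, delete, and visibility changes.

```python
import json


def build_dlq_policy(queue_arn: str) -> dict:
    """Build a least-privilege IAM policy document for the webhook DLQ.

    queue_arn is a placeholder you supply, for example
    "arn:aws:sqs:us-east-1:123456789012:gen-webhook-dlq".
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "sqs:CreateQueue",              # create the queue
                    "sqs:GetQueueUrl",              # look up an existing queue
                    "sqs:SendMessage",              # push failed payloads
                    "sqs:ReceiveMessage",           # poll for retries
                    "sqs:DeleteMessage",            # remove delivered messages
                    "sqs:ChangeMessageVisibility",  # extend timeouts on slow retries
                ],
                "Resource": queue_arn,
            }
        ],
    }


if __name__ == "__main__":
    example_arn = "arn:aws:sqs:us-east-1:123456789012:gen-webhook-dlq"
    print(json.dumps(build_dlq_policy(example_arn), indent=2))
```

Scoping Resource to the single queue ARN keeps the worker from touching any other queue in the account.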

Authentication Setup

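To call the Genesys Cloud CX API from a backend service, authenticate with the OAuth 2.0 Client Credentials grant. This tutorial assumes you have already created an OAuth client with the Client Credentials grant type in the Genesys Cloud Admin Console. Client Credentials clients receive their permissions from the roles assigned to them, so assign a role that grants the access below (the labels are this tutorial's shorthand, not literal permission names):

  • webhook:read (to inspect webhook configurations)
  • webhook:write (if you need to update webhook endpoints programmatically)
  • event:subscribe (if using event streaming instead of webhooks; this tutorial focuses on standard webhooks)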

Generating the Access Token

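Create a helper class to handle token acquisition. Cache the token and refresh it shortly before it expires. Read the lifetime from the expires_in field of the token response instead of assuming a fixed duration, because token duration is configurable per OAuth client. Genesys Cloud hosts are region-based rather than per-organization: the token endpoint for US East is https://login.mypurecloud.com/oauth/token, and other regions use their own domain.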

import time
from typing import Optional

import requests

class GenesysAuth:
    def __init__(self, org_id: str, client_id: str, client_secret: str,
                 region_domain: str = "mypurecloud.com"):
        # org_id is kept for reference only; Genesys Cloud hosts are
        # region-based (e.g. login.mypurecloud.com), not per-organization.
        self.org_id = org_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.login_url = f"https://login.{region_domain}/oauth/token"
        self.token: Optional[str] = None
        self.expires_at: float = 0.0

    def get_access_token(self) -> str:
        """
        Retrieves an OAuth2 access token using the Client Credentials flow.
        Returns the cached token until it is within 60 seconds of expiry.
        """
        if self.token and time.time() < self.expires_at - 60:
            return self.token

        try:
            # Client credentials go in an HTTP Basic Authorization header;
            # permissions come from the roles assigned to the OAuth client.
            response = requests.post(
                self.login_url,
                auth=(self.client_id, self.client_secret),
                data={"grant_type": "client_credentials"},
                timeout=10,
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            print(f"Authentication failed: {e.response.text}")
            raise

        token_data = response.json()
        self.token = token_data["access_token"]
        # Fall back to one hour if the response omits expires_in
        self.expires_at = time.time() + token_data.get("expires_in", 3600)
        return self.token

Implementation

Step 1: Configure the Dead Letter Queue (SQS)

First, create the SQS queue in AWS. This queue acts as the buffer for failed webhook deliveries.

import boto3
from botocore.exceptions import ClientError

def create_dlq_queue(queue_name: str) -> str:
    """
    Creates an SQS queue if it does not exist.
    Returns the queue URL.
    """
    sqs = boto3.resource('sqs')
    try:
        queue = sqs.create_queue(
            QueueName=queue_name,
            Attributes={
                'VisibilityTimeout': '300',  # 5 minutes
                'MessageRetentionPeriod': '1209600',  # 14 days
                'ReceiveMessageWaitTimeSeconds': '20'  # Long polling
            }
        )
        print(f"Queue created: {queue.url}")
        return queue.url
    except ClientError as e:
        if e.response['Error']['Code'] == 'QueueAlreadyExists':
            queue = sqs.get_queue_by_name(QueueName=queue_name)
            print(f"Queue already exists: {queue.url}")
            return queue.url
        raise

Step 2: Capture and Store Failed Webhook Payloads

In this architecture, we assume a middleware service (e.g., an AWS Lambda or a lightweight Flask/FastAPI app) receives the initial webhook from Genesys Cloud. If the downstream target returns a 5xx error, the middleware pushes the payload to the DLQ.

For this tutorial, we will simulate the middleware logic that pushes to the DLQ.

import json
import uuid
from datetime import datetime, timezone

import boto3
from botocore.exceptions import ClientError

sqs_client = boto3.client('sqs')

def push_to_dlq(queue_url: str, webhook_payload: dict, original_url: str, error_code: int) -> str:
    """
    Pushes a failed webhook payload to the SQS Dead Letter Queue.

    Args:
        queue_url: The URL of the SQS queue.
        webhook_payload: The original JSON payload from Genesys Cloud.
        original_url: The endpoint that failed to respond correctly.
        error_code: The HTTP status code returned by the original endpoint.

    Returns:
        The MessageId of the sent message.
    """
    message_body = {
        "id": str(uuid.uuid4()),
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "original_endpoint": original_url,
        "error_code": error_code,
        "payload": webhook_payload,
        "retry_count": 0
    }

    try:
        response = sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message_body)
        )
        print(f"Message pushed to DLQ: {response['MessageId']}")
        return response['MessageId']
    except ClientError as e:
        print(f"Failed to push to DLQ: {e}")
        raise
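
The tutorial leaves the middleware's forwarding decision implicit. A minimal sketch of it, under stated assumptions: post and enqueue are hypothetical callables you supply (for example, thin wrappers around requests.post and push_to_dlq), network errors are treated like a 502, and the middleware answers Genesys Cloud with 202 once the payload is stored so the DLQ, not the sender, owns the retry.

```python
from typing import Callable

# Hypothetical callable signatures supplied by your middleware:
#   post(url, payload) -> HTTP status code (raises OSError on network failure)
#   enqueue(payload, url, status) -> None (e.g. a wrapper around push_to_dlq)
PostFn = Callable[[str, dict], int]
EnqueueFn = Callable[[dict, str, int], None]


def forward_or_enqueue(payload: dict, target_url: str,
                       post: PostFn, enqueue: EnqueueFn) -> int:
    """Forward a webhook to the downstream target and park 5xx failures in the DLQ.

    Returns the status code the middleware should send back to Genesys Cloud.
    """
    try:
        status = post(target_url, payload)
    except OSError:
        # requests' exceptions subclass OSError; treat network failures as a 502
        status = 502

    if 500 <= status < 600:
        enqueue(payload, target_url, status)
        # Assumption: once the payload is stored, acknowledge with 202 so the
        # DLQ, not the sender, owns the retry.
        return 202
    return status
```

Wiring it up could look like forward_or_enqueue(payload, target, post=lambda url, p: requests.post(url, json=p, timeout=10).status_code, enqueue=lambda p, url, code: push_to_dlq(queue_url, p, url, code)). Non-5xx statuses, including 4xx, are passed through unchanged because retrying them would not help.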

Step 3: Implement the Retry Worker with Exponential Backoff

The core of this solution is a worker process that polls the DLQ, attempts to redeliver the message, and handles retries.

import json
import time

import boto3
import requests
from botocore.exceptions import ClientError

class WebhookRetryWorker:
    def __init__(self, queue_url: str, max_retries: int = 5):
        self.queue_url = queue_url
        self.max_retries = max_retries
        self.sqs = boto3.client('sqs')

    def poll_queue(self) -> list:
        """
        Polls the SQS queue for messages.
        Uses long polling to reduce empty receives.
        """
        try:
            messages = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=20
            )
            return messages.get('Messages', [])
        except ClientError as e:
            print(f"Error polling queue: {e}")
            return []

    def retry_delivery(self, body: dict) -> bool:
        """
        Attempts to redeliver the webhook payload to the original endpoint.

        Args:
            body: The parsed JSON body of the SQS message.

        Returns:
            True if delivery was successful, False otherwise.
        """
        retry_count = body.get('retry_count', 0)

        # Exponential backoff: wait 2^retry_count seconds (1, 2, 4, 8, ...)
        backoff_time = 2 ** retry_count
        print(f"Waiting {backoff_time} seconds before retry {retry_count + 1}...")
        time.sleep(backoff_time)

        try:
            # Send the payload back to the original endpoint
            response = requests.post(
                body['original_endpoint'],
                json=body['payload'],
                headers={
                    'Content-Type': 'application/json',
                    'X-Webhook-Source': 'GenesysCloud-Retry'
                },
                timeout=10
            )
        except requests.exceptions.RequestException as e:
            print(f"Network error during retry: {e}")
            return False

        # Any 2xx status counts as a successful delivery
        if 200 <= response.status_code < 300:
            print(f"Retry successful for message ID: {body['id']}")
            return True
        print(f"Retry failed with status {response.status_code} for message ID: {body['id']}")
        return False

    def _delete(self, message: dict):
        self.sqs.delete_message(
            QueueUrl=self.queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )

    def process_message(self, message: dict):
        """
        Processes a single message: retries delivery, deletes it on success,
        and re-enqueues it with an incremented retry_count on failure.
        """
        body = json.loads(message['Body'])
        retry_count = body.get('retry_count', 0)

        if retry_count >= self.max_retries:
            # Retries exhausted. Archive the body or alert an operator here,
            # then delete it so it does not circulate forever.
            print(f"Max retries reached for message ID: {body['id']}")
            self._delete(message)
            return

        if self.retry_delivery(body):
            self._delete(message)
            print(f"Message deleted from DLQ: {message['MessageId']}")
            return

        # SQS cannot update a message body in place, so send a copy with an
        # incremented retry_count, then delete the original. If the send
        # fails, the exception propagates and the original becomes visible
        # again after the VisibilityTimeout.
        body['retry_count'] = retry_count + 1
        self.sqs.send_message(
            QueueUrl=self.queue_url,
            MessageBody=json.dumps(body)
        )
        self._delete(message)
        print(f"Re-queued message ID {body['id']} with retry_count={body['retry_count']}")

    def run(self):
        """
        Main loop to process messages continuously.
        """
        print("Starting Webhook Retry Worker...")
        while True:
            messages = self.poll_queue()
            for message in messages:
                try:
                    self.process_message(message)
                except Exception as e:
                    print(f"Unexpected error processing message: {e}")
            time.sleep(5)  # Small delay between poll cycles
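
Sleeping inside the worker before each retry blocks the whole polling loop and eats into the visibility timeout. An alternative, swapped in here and not part of the original design, is to let SQS apply the backoff: re-enqueue the failed message with DelaySeconds so the copy stays invisible until the delay passes. This sketch assumes a standard queue (FIFO queues do not support per-message delays) and accepts any client with boto3's send_message and delete_message keyword arguments; requeue_with_delay is a hypothetical helper name.

```python
import json

MAX_SQS_DELAY_SECONDS = 900  # SQS rejects DelaySeconds above 15 minutes


def requeue_with_delay(sqs_client, queue_url: str, message: dict, body: dict) -> int:
    """Re-enqueue a failed DLQ message with an SQS-side exponential delay.

    sqs_client is a boto3 SQS client (or anything with the same send_message
    and delete_message keyword arguments). Returns the delay applied, in seconds.
    """
    retry_count = body.get("retry_count", 0) + 1
    # 2, 4, 8, ... seconds, capped at the SQS maximum
    delay = min(2 ** retry_count, MAX_SQS_DELAY_SECONDS)
    new_body = dict(body, retry_count=retry_count)

    # SQS keeps the new copy invisible for `delay` seconds, so the worker
    # never has to sleep.
    sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(new_body),
        DelaySeconds=delay,
    )
    # Delete the original only after the copy has been accepted.
    sqs_client.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=message["ReceiptHandle"],
    )
    return delay
```

With this approach, drop the time.sleep call from the retry path. Because of the 900-second ceiling, delays stop growing from the tenth retry onward.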

Step 4: Validating Webhook Configurations in Genesys Cloud

Before deploying the retry worker, ensure your webhooks are correctly configured in Genesys Cloud. You can use the SDK to list and validate webhook endpoints.

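# NOTE: The SDK package is PureCloudPlatformClientV2. The WebhooksApi class,
# get_platform_webhooks method, and Configuration keyword arguments follow
# this tutorial's design; verify them against the SDK reference for your version.
from PureCloudPlatformClientV2 import WebhooksApi, Configuration, ApiClient
from PureCloudPlatformClientV2.rest import ApiException

def validate_webhooks(org_id: str, client_id: str, client_secret: str):
    """
    Lists all webhooks and prints their status and endpoint.
    """
    auth = GenesysAuth(org_id, client_id, client_secret)
    token = auth.get_access_token()

    configuration = Configuration(
        # API hosts are region-based; api.mypurecloud.com is US East
        host="https://api.mypurecloud.com",
        access_token=token
    )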

    with ApiClient(configuration) as api_client:
        webhooks_api = WebhooksApi(api_client)
        
        try:
            # Get all webhooks
            result = webhooks_api.get_platform_webhooks()
            
            print("Current Webhook Configurations:")
            print("-" * 80)
            
            for webhook in result.entities:
                print(f"ID: {webhook.id}")
                print(f"Name: {webhook.name}")
                print(f"Endpoint: {webhook.endpoint}")
                print(f"Events: {', '.join(webhook.events)}")
                print(f"Status: {webhook.status}")
                print("-" * 80)
                
        except ApiException as e:
            print(f"Exception when calling WebhooksApi->get_platform_webhooks: {e}")
            if e.status == 401:
                print("Authentication failed. Check your credentials.")
            elif e.status == 403:
                print("Forbidden. Check your OAuth scopes.")
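
The listing above only prints each configuration. A small sketch of an actual check you could run on each endpoint value it prints; the rules (HTTPS only, a host is present, no credentials embedded in the URL) are illustrative assumptions, not Genesys Cloud requirements, and find_endpoint_problems is a hypothetical helper.

```python
from urllib.parse import urlparse


def find_endpoint_problems(endpoint: str) -> list:
    """Return human-readable problems with a webhook endpoint URL.

    The rules are illustrative checks, not Genesys Cloud requirements.
    """
    problems = []
    parsed = urlparse(endpoint or "")
    if parsed.scheme != "https":
        problems.append("endpoint should use https")
    if not parsed.hostname:
        problems.append("endpoint has no host")
    if parsed.username or parsed.password:
        # Secrets in URLs tend to leak into logs and DLQ message bodies
        problems.append("credentials embedded in URL")
    return problems
```

Inside the listing loop, printing each entry of find_endpoint_problems(webhook.endpoint) as a warning turns the dump into a pre-deployment check.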

Complete Working Example

Below is the complete, integrated script. It combines authentication, DLQ setup, and the retry worker. Save this as main.py.

import os
import sys
import json
import time
import uuid
from typing import Optional

import boto3
import requests
from botocore.exceptions import ClientError

# --- Configuration ---
# Genesys Cloud hosts are region-based; mypurecloud.com is US East.
REGION_DOMAIN = os.getenv("GENESYS_REGION_DOMAIN", "mypurecloud.com")
ORG_ID = os.getenv("GENESYS_ORG_ID", "your_org_id")
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID", "your_client_id")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET", "your_client_secret")
DLQ_QUEUE_NAME = "gen-webhook-dlq"
MAX_RETRIES = 5

# --- Authentication Module ---

class GenesysAuth:
    def __init__(self, org_id: str, client_id: str, client_secret: str,
                 region_domain: str = "mypurecloud.com"):
        self.org_id = org_id  # informational; not part of the hostname
        self.client_id = client_id
        self.client_secret = client_secret
        self.login_url = f"https://login.{region_domain}/oauth/token"
        self.token: Optional[str] = None
        self.expires_at = 0.0

    def get_access_token(self) -> str:
        # Reuse the cached token until 60 seconds before it expires
        if self.token and time.time() < self.expires_at - 60:
            return self.token
        try:
            response = requests.post(
                self.login_url,
                auth=(self.client_id, self.client_secret),  # HTTP Basic
                data={"grant_type": "client_credentials"},
                timeout=10,
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            print(f"Auth Error: {e.response.text}")
            raise
        token_data = response.json()
        self.token = token_data["access_token"]
        self.expires_at = time.time() + token_data.get("expires_in", 3600)
        return self.token

# --- SQS DLQ Module ---

def setup_dlq(queue_name: str) -> str:
    sqs = boto3.resource('sqs')
    try:
        queue = sqs.create_queue(
            QueueName=queue_name,
            Attributes={
                'VisibilityTimeout': '300',
                'MessageRetentionPeriod': '1209600',
                'ReceiveMessageWaitTimeSeconds': '20'
            }
        )
        print(f"DLQ Created: {queue.url}")
        return queue.url
    except ClientError as e:
        if e.response['Error']['Code'] == 'QueueAlreadyExists':
            queue = sqs.get_queue_by_name(QueueName=queue_name)
            print(f"DLQ Exists: {queue.url}")
            return queue.url
        raise

def push_to_dlq(queue_url: str, payload: dict, endpoint: str, error_code: int):
    sqs_client = boto3.client('sqs')
    message_body = {
        "id": str(uuid.uuid4()),
        # gmtime() so the trailing "Z" (UTC) is accurate
        "timestamp": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
        "original_endpoint": endpoint,
        "error_code": error_code,
        "payload": payload,
        "retry_count": 0
    }
    try:
        sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message_body)
        )
        print("Payload pushed to DLQ.")
    except ClientError as e:
        print(f"Failed to push to DLQ: {e}")

# --- Retry Worker Module ---

class RetryWorker:
    def __init__(self, queue_url: str, max_retries: int):
        self.queue_url = queue_url
        self.max_retries = max_retries
        self.sqs = boto3.client('sqs')

    def run(self):
        print("Starting Retry Worker...")
        while True:
            messages = self._poll()
            for msg in messages:
                self._process(msg)
            time.sleep(5)

    def _poll(self) -> list:
        try:
            resp = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=20
            )
            return resp.get('Messages', [])
        except ClientError as e:
            print(f"Poll Error: {e}")
            return []

    def _delete(self, message: dict):
        self.sqs.delete_message(
            QueueUrl=self.queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )

    def _process(self, message: dict):
        try:
            body = json.loads(message['Body'])
            retry_count = body.get('retry_count', 0)
            if retry_count >= self.max_retries:
                # Archive or alert here before dropping the message
                print(f"Max retries exceeded, dropping: {message['MessageId']}")
                self._delete(message)
            elif self._retry_delivery(body):
                self._delete(message)
                print(f"Deleted: {message['MessageId']}")
            else:
                # SQS bodies are immutable: re-send with retry_count + 1,
                # then delete the original
                body['retry_count'] = retry_count + 1
                self.sqs.send_message(
                    QueueUrl=self.queue_url,
                    MessageBody=json.dumps(body)
                )
                self._delete(message)
                print(f"Re-queued with retry_count={body['retry_count']}")
        except Exception as e:
            print(f"Process Error: {e}")

    def _retry_delivery(self, body: dict) -> bool:
        backoff = 2 ** body.get('retry_count', 0)
        print(f"Waiting {backoff}s...")
        time.sleep(backoff)

        try:
            resp = requests.post(
                body['original_endpoint'],
                json=body['payload'],
                headers={'Content-Type': 'application/json'},
                timeout=10
            )
            return 200 <= resp.status_code < 300
        except requests.exceptions.RequestException as e:
            print(f"Request Error: {e}")
            return False

if __name__ == "__main__":
    # 1. Setup DLQ
    queue_url = setup_dlq(DLQ_QUEUE_NAME)

    # 2. Verify Genesys credentials before starting (optional check)
    try:
        auth = GenesysAuth(ORG_ID, CLIENT_ID, CLIENT_SECRET, REGION_DOMAIN)
        auth.get_access_token()
        print("Authentication successful.")
    except Exception as e:
        print(f"Authentication failed. Check credentials: {e}")
        sys.exit(1)

    # 3. Start Worker
    worker = RetryWorker(queue_url, MAX_RETRIES)
    worker.run()

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The Genesys Cloud OAuth token has expired or the client credentials are invalid, or, during redelivery, the target endpoint rejected the request.
  • Fix: For Genesys Cloud API calls, refresh the token before it expires and initialize any SDK ApiClient with a fresh token. The retry worker itself never calls Genesys Cloud, so a 401 during redelivery comes from the target endpoint. Retrying cannot fix bad credentials on the target, so detect 401 responses, stop retrying those messages, and alert an administrator.
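
A minimal sketch of that classification, assuming you treat 408 (Request Timeout) and 429 (Too Many Requests) as transient and every other 4xx response as permanent; adjust the set for your target. should_retry is a hypothetical helper the worker could call on the redelivery status code, routing non-retryable messages to an archive or alert instead of re-queueing them.

```python
# Assumption: these 4xx codes are transient; every other 4xx is permanent.
RETRYABLE_4XX = frozenset({408, 429})  # Request Timeout, Too Many Requests


def should_retry(status_code: int) -> bool:
    """Return True if a failed redelivery is worth retrying."""
    if 500 <= status_code < 600:
        return True
    # 401, 403, 404 and similar mean the target rejects the request itself,
    # so sending it again cannot succeed.
    return status_code in RETRYABLE_4XX
```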

Error: 429 Too Many Requests

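  • Cause: The target endpoint is rate-limiting your retry attempts.
  • Fix: Back off more aggressively: raise the base (for example 5 ** retry_count instead of 2 ** retry_count), cap the maximum delay, and add random jitter so retries from many messages do not arrive in bursts. If the target sends a Retry-After header (the standard HTTP header for 429 responses) or a vendor-specific header such as X-RateLimit-Reset, wait at least that long.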
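
A sketch of jittered backoff that honors Retry-After. It uses the "full jitter" strategy (a uniform random delay between zero and the capped exponential value) and treats Retry-After, which may carry either a number of seconds or an HTTP date, as a minimum wait. The function names, base, and cap are illustrative defaults, not values from the original.

```python
import random
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: str) -> Optional[float]:
    """Parse a Retry-After header (delta-seconds or HTTP date) into seconds."""
    value = value.strip()
    if value.isdigit():
        return float(value)
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):  # unparseable date; type varies by Python version
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())


def retry_delay(retry_count: int, retry_after: Optional[str] = None,
                base: float = 2.0, cap: float = 300.0) -> float:
    """Seconds to wait before the next attempt, using "full jitter" backoff.

    The delay is uniform between 0 and min(cap, base ** retry_count); a
    Retry-After header, when present and parseable, acts as a minimum.
    """
    # Clamp the exponent so a large retry_count cannot overflow a float.
    ceiling = min(cap, base ** min(retry_count, 30))
    delay = random.uniform(0, ceiling)
    if retry_after:
        floor = parse_retry_after(retry_after)
        if floor is not None:
            delay = max(delay, floor)
    return delay
```

In the worker, pass response.headers.get('Retry-After') from the failed attempt as retry_after.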

Error: SQS Visibility Timeout Expired

  • Cause: The retry logic took longer than the queue's VisibilityTimeout (300 seconds in the example). SQS then makes the message visible again, so another poll can receive it and the endpoint may get a duplicate delivery.
  • Fix: If processing is slow, increase the VisibilityTimeout in the SQS queue settings, or call change_message_visibility to extend the timeout for an individual message during a long-running retry.
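
A minimal sketch of the change_message_visibility approach, assuming the sleep-then-post retry used by the worker: before each attempt, extend the message's visibility to cover the backoff, the HTTP timeout, and a safety margin. The new timeout counts from the moment of the call, and SQS caps a message's total invisibility at 12 hours. reserve_time_for_retry and its margin are hypothetical.

```python
import math

# SQS limit: a message cannot stay invisible for more than 12 hours in total
# (counted from when it was received); this constant is only a coarse guard.
MAX_VISIBILITY_SECONDS = 43200


def reserve_time_for_retry(sqs_client, queue_url: str, message: dict,
                           backoff_seconds: float, request_timeout: float = 10,
                           margin: float = 30) -> int:
    """Extend a message's visibility so a slow retry finishes before it reappears.

    The new timeout is measured from the moment of this call, so it only has to
    cover the upcoming backoff sleep, the HTTP timeout, and a safety margin.
    Returns the timeout that was requested.
    """
    needed = math.ceil(backoff_seconds + request_timeout + margin)
    timeout = min(needed, MAX_VISIBILITY_SECONDS)
    sqs_client.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=message["ReceiptHandle"],
        VisibilityTimeout=timeout,
    )
    return timeout
```

Calling reserve_time_for_retry(self.sqs, self.queue_url, message, 2 ** retry_count) just before the backoff sleep keeps that message hidden while its attempt runs. Messages later in the same batch are still ticking, so extend each one as the worker reaches it.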

Error: Payload Too Large

  • Cause: Genesys Cloud webhooks can contain large payloads (e.g., detailed conversation analytics). SQS has a 256KB message size limit.
  • Fix: If payloads exceed 256KB, do not store the full payload in SQS. Instead, store the payload in Amazon S3 or a database, and push only the reference (URL/ID) and metadata to SQS. The retry worker then fetches the payload from S3 before redelivery.
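
A sketch of that claim-check pattern, assuming a hypothetical S3 bucket you own and a boto3 S3 client (or any object with the same put_object and get_object methods). Oversized payloads are written to S3 and replaced in the SQS body by a payload_s3 reference (a hypothetical field name); the worker calls load_payload before redelivery. The size check covers only the message body, so leave headroom if you also send message attributes.

```python
import json
import uuid

SQS_MAX_BYTES = 256 * 1024  # message size limit cited above


def build_dlq_body(s3_client, bucket: str, message_body: dict) -> str:
    """Return an SQS message body, offloading the payload to S3 if it is too large."""
    body = json.dumps(message_body)
    if len(body.encode("utf-8")) <= SQS_MAX_BYTES:
        return body
    # Store the payload under a key derived from the DLQ message id
    key = f"webhook-dlq/{message_body.get('id', uuid.uuid4())}.json"
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=json.dumps(message_body["payload"]).encode("utf-8"),
    )
    slim = dict(message_body, payload=None, payload_s3={"bucket": bucket, "key": key})
    return json.dumps(slim)


def load_payload(s3_client, body: dict) -> dict:
    """Inverse of build_dlq_body: fetch the payload back from S3 when needed."""
    ref = body.get("payload_s3")
    if ref is None:
        return body["payload"]
    obj = s3_client.get_object(Bucket=ref["bucket"], Key=ref["key"])
    return json.loads(obj["Body"].read())
```

Pair this with an S3 lifecycle rule (or explicit deletion after successful redelivery) so offloaded payloads do not outlive their DLQ messages.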

Official References