Handling Genesys Cloud Webhook 5xx Failures with a Python Dead Letter Queue

What You Will Build

  • You will build a Python service that intercepts failed Genesys Cloud webhooks, stores the payload in a persistent dead letter queue, and implements a retry mechanism.
  • This solution uses the Genesys Cloud REST API for webhook configuration and standard Python libraries for HTTP handling and persistence.
  • The tutorial covers Python 3.9+ with requests and Flask for HTTP handling, sqlite3 for persistence, and threading for background retry processing.

Prerequisites

  • OAuth Client Type: Service Account with webhooks:read and webhooks:write scopes.
  • SDK/API Version: Genesys Cloud REST API v2.
  • Language/Runtime: Python 3.9 or higher.
  • External Dependencies:
    • requests (for HTTP interactions)
    • Flask (for the webhook receiver endpoint)
    • sqlite3 (built-in, for the dead letter queue)
    • threading (built-in, for background retry workers)

Authentication Setup

Genesys Cloud uses OAuth 2.0 for API authentication. For server-to-server integrations like webhook management, an OAuth client using the Client Credentials grant (the Service Account referred to throughout this tutorial) is the standard approach. Create the client in the Genesys Cloud Admin Console under Admin > Integrations > OAuth to obtain a client ID and client secret.

The following code demonstrates how to acquire an access token and refresh it when it expires. This token is required for all subsequent API calls to manage webhooks.

import requests
import time
from typing import Optional

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0

    def get_token(self) -> str:
        """
        Retrieves an OAuth access token. Caches the token until it expires.
        """
        current_time = time.time()
        
        # Return cached token if valid
        if self.access_token and current_time < self.token_expiry:
            return self.access_token

        # The token endpoint lives on the login host, not the api host.
        token_url = f"https://login.{self.environment}/oauth/token"
        payload = {"grant_type": "client_credentials"}

        try:
            # Client ID and secret are sent via HTTP Basic authentication.
            response = requests.post(
                token_url,
                data=payload,
                auth=(self.client_id, self.client_secret),
                timeout=10,
            )
            response.raise_for_status()
            data = response.json()

            self.access_token = data["access_token"]
            # Use the lifetime reported by the server, minus a 60s safety buffer.
            self.token_expiry = current_time + data["expires_in"] - 60

            return self.access_token
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"Failed to acquire OAuth token: {e}") from e

    def get_headers(self) -> dict:
        """
        Returns standard headers for Genesys Cloud API requests.
        """
        return {
            "Authorization": f"Bearer {self.get_token()}",
            "Content-Type": "application/json"
        }
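
The caching rule in get_token() can be exercised without any network access by injecting the token fetcher and the clock. The following is a minimal sketch under that assumption; TokenCache, fetch, and clock are hypothetical names introduced for illustration and are not part of the Genesys Cloud API.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches a token until shortly before its reported expiry.

    `fetch` is a hypothetical callable returning (token, expires_in_seconds);
    in the tutorial's class that role is played by the POST to the token URL.
    """

    def __init__(self, fetch: Callable[[], Tuple[str, float]],
                 clock: Callable[[], float] = time.time,
                 safety_buffer: float = 60.0):
        self._fetch = fetch
        self._clock = clock
        self._buffer = safety_buffer
        self._token: Optional[str] = None
        self._expiry: float = 0.0

    def get(self) -> str:
        now = self._clock()
        # Reuse the cached token while it is still inside its safe lifetime.
        if self._token is not None and now < self._expiry:
            return self._token
        # Otherwise fetch a new token and record when it stops being safe.
        token, expires_in = self._fetch()
        self._token = token
        self._expiry = now + expires_in - self._buffer
        return token
```

Passing a fake clock makes the expiry boundary testable in milliseconds instead of waiting for a real token to age out.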

Implementation

Step 1: Define the Dead Letter Queue Structure

A dead letter queue (DLQ) must persist data across restarts. We will use SQLite for simplicity, as it requires no external database server. The schema needs to store the original webhook payload, the target URL, the number of retry attempts, and the timestamp of the last failure.

import sqlite3
import json
import time
from typing import List, Dict, Any

class DeadLetterQueue:
    def __init__(self, db_path: str = "dlq.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """
        Creates the DLQ table if it does not exist.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS failed_webhooks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                target_url TEXT NOT NULL,
                payload TEXT NOT NULL,
                headers TEXT,
                retry_count INTEGER DEFAULT 0,
                last_error TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
        conn.close()

    def add_failure(self, target_url: str, payload: str, headers: dict, error_msg: str):
        """
        Adds a failed webhook attempt to the queue.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO failed_webhooks (target_url, payload, headers, retry_count, last_error)
            VALUES (?, ?, ?, 0, ?)
        """, (target_url, payload, json.dumps(headers), error_msg))
        conn.commit()
        conn.close()

    def get_pending_retries(self, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Retrieves items from the DLQ that have not exceeded the max retry limit.
        """
        MAX_RETRIES = 5
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        
        # Select items with retry_count < MAX_RETRIES
        cursor.execute("""
            SELECT * FROM failed_webhooks 
            WHERE retry_count < ? 
            ORDER BY created_at ASC 
            LIMIT ?
        """, (MAX_RETRIES, limit))
        
        rows = cursor.fetchall()
        results = [dict(row) for row in rows]
        conn.close()
        return results

    def update_retry_status(self, item_id: int, success: bool, error_msg: str = ""):
        """
        Updates the status of a specific item in the DLQ.
        If success is True, the item is deleted.
        If success is False, the retry_count is incremented.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        if success:
            cursor.execute("DELETE FROM failed_webhooks WHERE id = ?", (item_id,))
        else:
            cursor.execute("""
                UPDATE failed_webhooks 
                SET retry_count = retry_count + 1, 
                    last_error = ?, 
                    updated_at = CURRENT_TIMESTAMP 
                WHERE id = ?
            """, (error_msg, item_id))
        
        conn.commit()
        conn.close()
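
Because get_pending_retries() skips rows whose retry_count has reached the limit, those rows stay in the table until someone inspects them. The following is a minimal sketch of how they could be listed, assuming the failed_webhooks schema above; get_exhausted is a hypothetical helper, and its default of 5 mirrors MAX_RETRIES.

```python
import sqlite3
from typing import Any, Dict, List


def get_exhausted(db_path: str, max_retries: int = 5) -> List[Dict[str, Any]]:
    """Returns rows that have used up their retries (hypothetical helper)."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute(
            """
            SELECT id, target_url, retry_count, last_error, updated_at
            FROM failed_webhooks
            WHERE retry_count >= ?
            ORDER BY updated_at ASC
            """,
            (max_retries,),
        ).fetchall()
        return [dict(row) for row in rows]
    finally:
        # Close the connection even if the query raises.
        conn.close()
```

An operator script could call get_exhausted("dlq.db") on a schedule and alert when the list is non-empty.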

Step 2: Implement the Webhook Receiver and Failure Handler

This component acts as the initial endpoint that Genesys Cloud calls. When your downstream system returns a 5xx error, this receiver catches the exception and pushes the payload into the DLQ.

import json
import logging

import requests
from flask import Flask, request, jsonify

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
dlq = DeadLetterQueue("dlq.db")

@app.route("/webhook/genesys", methods=["POST"])
def receive_webhook():
    """
    Endpoint for Genesys Cloud to call.
    Attempts to forward the payload to the internal processing system.
    On failure, adds to DLQ.
    """
    # 1. Validate the incoming request before the try block, so payload and
    #    target_url are always defined when the failure handler runs.
    #    silent=True returns None instead of raising on a missing or invalid body.
    payload = request.get_json(silent=True)
    if payload is None:
        return jsonify({"error": "Request body must be valid JSON"}), 400

    target_url = "https://your-internal-system.com/process-event"

    try:
        # 2. Attempt to forward to the internal system. The timeout keeps a
        #    hung downstream from blocking the receiver indefinitely.
        response = requests.post(
            target_url,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=5
        )

        # 3. Treat server errors (5xx) as retryable failures
        if response.status_code >= 500:
            raise RuntimeError(f"Internal system returned {response.status_code}: {response.text}")

        # Any non-5xx response counts as handled: return 200 to Genesys
        # so it considers the event delivered
        return jsonify({"status": "accepted"}), 200

    except Exception as e:
        logger.error(f"Webhook processing failed: {e}")

        # 4. Add to Dead Letter Queue
        try:
            dlq.add_failure(
                target_url=target_url,
                payload=json.dumps(payload),
                headers=dict(request.headers),
                error_msg=str(e)
            )
            # Return 200 to Genesys to stop their native retry mechanism
            # We are now responsible for retries via the DLQ
            return jsonify({"status": "queued_for_retry"}), 200
        except Exception as dlq_error:
            logger.critical(f"Failed to add to DLQ: {dlq_error}")
            # If the DLQ write fails, return 500 so Genesys retries on its side
            return jsonify({"error": "DLQ failure"}), 500

Key Logic Explanation:

  • Immediate Acknowledgment: When a 5xx error occurs in your internal system, return a 200 OK to Genesys Cloud once the payload is safely stored. If you return a 5xx instead, Genesys Cloud treats the delivery as failed and may retry on its own schedule, which duplicates the work of your DLQ. By returning 200, you signal that the message was received and take control of the retry logic yourself.
  • Payload Persistence: The entire JSON payload is stored in the DLQ. This ensures that if the internal system is down for hours, the event data is not lost.
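
The acknowledgment rule above reduces to a small decision table. The sketch below expresses it as a pure function so it can be unit tested separately from Flask; ack_status and its parameter names are hypothetical and exist only for illustration.

```python
def ack_status(forwarded_ok: bool, queued_ok: bool) -> int:
    """HTTP status to return to Genesys Cloud for one delivery.

    forwarded_ok: the internal system accepted the event (no 5xx, no exception).
    queued_ok:    the payload was written to the DLQ after a failed forward.
    """
    if forwarded_ok:
        return 200  # delivered downstream
    if queued_ok:
        return 200  # not delivered yet, but persisted for our own retries
    return 500      # nothing holds the event, so let the sender retry
```

The only path that returns 500 is the one where the event would otherwise be lost.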

Step 3: Implement the Retry Worker

This background process polls the DLQ, attempts to redeliver the payloads, and updates the status. It implements exponential backoff between retries to avoid overwhelming the internal system.

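import json
import logging
import threading
import time

import requests

# Module-level logger so the worker also runs when saved in its own file
logger = logging.getLogger(__name__)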

class RetryWorker:
    def __init__(self, dlq: DeadLetterQueue):
        self.dlq = dlq
        self.running = False
        self.thread = None

    def start(self):
        """
        Starts the background retry worker thread.
        """
        self.running = True
        self.thread = threading.Thread(target=self._run_worker)
        self.thread.daemon = True
        self.thread.start()
        logger.info("Retry worker started.")

    def stop(self):
        """
        Stops the background retry worker.
        """
        self.running = False
        if self.thread:
            self.thread.join()
        logger.info("Retry worker stopped.")

    def _run_worker(self):
        """
        Main loop for the retry worker.
        """
        while self.running:
            try:
                # 1. Get pending items
                pending_items = self.dlq.get_pending_retries(limit=10)
                
                if not pending_items:
                    time.sleep(10) # No items, wait before checking again
                    continue

                # 2. Process each item
                for item in pending_items:
                    self._process_item(item)

            except Exception as e:
                logger.error(f"Worker loop error: {str(e)}")
                time.sleep(5)

    def _process_item(self, item: dict):
        """
        Attempts to redeliver a single item from the DLQ.
        """
        item_id = item["id"]
        target_url = item["target_url"]
        payload = json.loads(item["payload"])
        retry_count = item["retry_count"]

        # Calculate backoff: 2^retry_count seconds, max 60 seconds
        backoff = min(2 ** retry_count, 60)
        
        # Items that have already failed at least one retry wait out the
        # backoff period first. Note: In a real production system, you would
        # check the timestamp of the last attempt to ensure the backoff has
        # elapsed, instead of sleeping inside the worker loop.
        if retry_count > 0:
            logger.info(f"Waiting {backoff}s before retrying item {item_id}...")
            time.sleep(backoff)

        try:
            # Attempt redelivery
            response = requests.post(
                target_url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=5
            )

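            if response.status_code < 400:
                # Success (2xx or 3xx)
                logger.info(f"Item {item_id} delivered successfully after {retry_count} retries.")
                self.dlq.update_retry_status(item_id, success=True)
            elif response.status_code < 500:
                # 4xx indicates a permanent failure that retrying will not fix.
                # The item is removed here; you may want to move 4xx errors
                # to a separate 'failed' table instead.
                logger.error(f"Item {item_id} rejected with {response.status_code}; removing without further retries.")
                self.dlq.update_retry_status(item_id, success=True)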
            else:
                # 5xx Error again
                logger.warning(f"Item {item_id} retry failed with {response.status_code}")
                self.dlq.update_retry_status(item_id, success=False, error_msg=f"HTTP {response.status_code}")

        except requests.exceptions.RequestException as e:
            logger.error(f"Item {item_id} retry failed due to network error: {str(e)}")
            self.dlq.update_retry_status(item_id, success=False, error_msg=str(e))
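
The comment in _process_item notes that a production system would compare the last-attempt timestamp against the backoff instead of sleeping. The following is a minimal sketch of that check, assuming updated_at holds SQLite's CURRENT_TIMESTAMP format (UTC, YYYY-MM-DD HH:MM:SS); backoff_seconds and is_due are hypothetical helper names. Unlike the worker above, this sketch also applies the one-second delay to the first retry.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def backoff_seconds(retry_count: int, cap: int = 60) -> int:
    """Same schedule as the worker: 2^retry_count seconds, capped."""
    return min(2 ** retry_count, cap)


def is_due(updated_at: str, retry_count: int,
           now: Optional[datetime] = None) -> bool:
    """True once the backoff has elapsed since the item's last attempt."""
    # SQLite's CURRENT_TIMESTAMP is UTC without an explicit offset.
    last_attempt = datetime.strptime(updated_at, "%Y-%m-%d %H:%M:%S").replace(
        tzinfo=timezone.utc
    )
    now = now or datetime.now(timezone.utc)
    return now >= last_attempt + timedelta(seconds=backoff_seconds(retry_count))
```

With a check like this, the worker could skip items that are not yet due rather than sleeping, so one item with a long backoff does not delay the rest of the batch.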

Step 4: Configure Genesys Cloud Webhook via API

To ensure your webhook is configured correctly to work with this DLQ pattern, you can use the Genesys Cloud API to create or update the webhook. This step is optional if you configure the webhook via the Admin Console, but it demonstrates API usage.

def create_webhook(auth: GenesysAuth, name: str, url: str, events: list):
    """
    Creates a webhook in Genesys Cloud using the API.
    """
    api_url = f"https://api.{auth.environment}/api/v2/webhooks/webhooks"
    headers = auth.get_headers()

    payload = {
        "name": name,
        "description": "Webhook with DLQ retry logic",
        "uri": url,
        "enabled": True,
        "type": "web",
        "eventFilters": events,
        "securityToken": None, # Optional: Add a token for validation
        "headers": {
            "Content-Type": "application/json"
        }
    }

    try:
        # A timeout keeps a stalled API call from hanging the script
        response = requests.post(api_url, json=payload, headers=headers, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        logger.error(f"Failed to create webhook: {e.response.text}")
        raise

Complete Working Example

The following script combines all components into a runnable application. It starts the Flask server for receiving webhooks and launches the background retry worker.

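import json
import logging
import os

import requests
from flask import Flask, request, jsonify

# Assumes the classes from the earlier sections are saved as
# auth.py, dlq.py, and worker.py next to this script.
from auth import GenesysAuth
from dlq import DeadLetterQueue
from worker import RetryWorker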

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def main():
    # 1. Initialize Components
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")

    auth = GenesysAuth(client_id, client_secret)
    dlq = DeadLetterQueue("dlq.db")
    worker = RetryWorker(dlq)

    # 2. Start Retry Worker
    worker.start()

    # 3. Setup Flask App (using the route from Step 2)
    app = Flask(__name__)

    @app.route("/webhook/genesys", methods=["POST"])
    def receive_webhook():
        # Validate before the try block so payload and target_url are
        # always defined when the failure handler runs.
        payload = request.get_json(silent=True)
        if payload is None:
            return jsonify({"error": "Request body must be valid JSON"}), 400

        target_url = "https://your-internal-system.com/process-event"

        try:
            response = requests.post(
                target_url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=5
            )

            if response.status_code >= 500:
                raise RuntimeError(f"Internal system returned {response.status_code}: {response.text}")

            return jsonify({"status": "accepted"}), 200

        except Exception as e:
            logger.error(f"Webhook processing failed: {e}")

            try:
                dlq.add_failure(
                    target_url=target_url,
                    payload=json.dumps(payload),
                    headers=dict(request.headers),
                    error_msg=str(e)
                )
                return jsonify({"status": "queued_for_retry"}), 200
            except Exception as dlq_error:
                logger.critical(f"Failed to add to DLQ: {dlq_error}")
                return jsonify({"error": "DLQ failure"}), 500

    # 4. Run Flask App
    # In production, use a WSGI server like Gunicorn
    try:
        app.run(host="0.0.0.0", port=5000)
    finally:
        # Runs on Ctrl+C as well as on unexpected errors
        worker.stop()
        logger.info("Shutting down...")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized on Webhook Creation

  • Cause: The OAuth token is missing, invalid, or expired. A token that is valid but lacks the webhooks:write scope typically produces 403 Forbidden instead.
  • Fix: Build the request headers with get_headers() so that get_token() attaches a fresh token to every call. If the error persists, confirm the Service Account has the correct scopes in the Genesys Cloud Admin Console.

Error: 500 Internal Server Error Returned to Genesys Cloud

  • Cause: Your endpoint returned a 5xx status code to Genesys Cloud.
  • Fix: If your internal system is down, your webhook receiver must return 200 OK and store the payload in the DLQ. Returning 500 triggers Genesys Cloud’s native retry mechanism, which may conflict with your custom DLQ logic.

Error: Database is locked in SQLite

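  • Cause: Multiple threads (the Flask request handlers and the retry worker) are trying to write to the SQLite database simultaneously.
  • Fix: Raise the timeout argument of sqlite3.connect() (the default is 5 seconds) so writers wait longer for the lock, and enable write-ahead logging with PRAGMA journal_mode=WAL so readers do not block the writer. Setting check_same_thread=False does not help here: it only permits sharing one connection object across threads, and this code already opens a new connection per call. For high-volume webhooks, consider using a dedicated message broker like RabbitMQ or AWS SQS instead of SQLite.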
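
One way to reduce lock contention, offered as a sketch and not as the tutorial's own implementation, is to open every DLQ connection through a helper that sets a longer lock timeout and switches the database to write-ahead logging. open_dlq_connection and the 30-second default are assumptions chosen for illustration.

```python
import sqlite3


def open_dlq_connection(db_path: str, timeout_s: float = 30.0) -> sqlite3.Connection:
    """Opens a connection configured for concurrent access (hypothetical helper)."""
    # timeout: how long a statement waits for a lock before sqlite3 raises
    # OperationalError("database is locked").
    conn = sqlite3.connect(db_path, timeout=timeout_s)
    # WAL lets readers proceed while a single writer is active. The mode is
    # stored in the database file, so it persists across connections.
    conn.execute("PRAGMA journal_mode=WAL")
    return conn
```

Each sqlite3.connect(self.db_path) call in DeadLetterQueue could be swapped for open_dlq_connection(self.db_path) without changing the rest of the class.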

Error: Payload Too Large

  • Cause: The webhook payload exceeds the storage limit of your DLQ or the request body limit of your web server.
  • Fix: Increase the MAX_CONTENT_LENGTH in Flask. Ensure your SQLite database file has sufficient disk space.

Official References