Handling Genesys Cloud Webhook 5xx Failures with a Local Dead Letter Queue
What You Will Build
- A Python microservice that acts as an intermediate receiver for Genesys Cloud webhooks, buffering payloads when the downstream target returns a 5xx error.
- Implementation of an exponential backoff retry mechanism with a configurable maximum attempt limit before moving the message to a dead letter queue (DLQ).
- A complete, runnable Python script using requests and sqlite3 to simulate the DLQ storage and retry logic.
Prerequisites
- Platform: Genesys Cloud CX
- API Surface: Genesys Cloud Webhooks API (/api/v2/webhooks) and standard HTTP POST delivery.
- Language: Python 3.9+
- Dependencies:
  - requests: For HTTP communication.
  - sqlite3: Standard library module for local DLQ storage (simulates a persistent queue).
  - fastapi and uvicorn: For creating the local webhook endpoint (optional, but recommended for testing).
  - python-dotenv: For managing environment variables.
Required OAuth Scopes:
- webhook:read (to verify webhook configuration)
- webhook:write (if you need to update webhook status programmatically, though this tutorial focuses on the receiver side)
Note: This tutorial focuses on the receiver side of the webhook integration. Genesys Cloud handles initial retries (typically 3 attempts with exponential backoff) before marking a webhook as failed. This solution handles the “last mile” delivery when your downstream system is temporarily unavailable.
Authentication Setup
While the webhook payload itself does not contain OAuth tokens (it contains event data), your receiver service may need to authenticate back to Genesys Cloud to query conversation details or update entities. We will use a service account for this.
import os
import time
from typing import Optional

import requests


class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, base_url: str = "https://login.mypurecloud.com"):
        # The token endpoint lives on the region's login host
        # (login.mypurecloud.com for US East), not on the api.* host.
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url
        self.token: Optional[str] = None
        self.expires_at: float = 0

    def get_token(self) -> str:
        """
        Retrieves an OAuth2 access token using the Client Credentials flow.
        Implements basic caching to avoid requesting a new token on every call.
        """
        # Check if we have a valid cached token
        if self.token and time.time() < self.expires_at:
            return self.token

        url = f"{self.base_url}/oauth/token"
        data = {"grant_type": "client_credentials"}

        try:
            # Client ID and secret are sent as HTTP Basic credentials
            response = requests.post(
                url,
                data=data,
                auth=(self.client_id, self.client_secret),
                timeout=10,
            )
            response.raise_for_status()
            token_data = response.json()
            self.token = token_data["access_token"]
            # Use the lifetime reported by the server, minus 60s as a safety buffer.
            self.expires_at = time.time() + token_data["expires_in"] - 60
            return self.token
        except requests.exceptions.HTTPError as e:
            print(f"Authentication failed: {e.response.status_code} {e.response.text}")
            raise
        except requests.exceptions.RequestException as e:
            print(f"Network error during authentication: {e}")
            raise
Implementation
Step 1: Define the Dead Letter Queue Storage
We need a persistent store to hold messages that have failed delivery. For this tutorial, we will use SQLite. In production, you might use Redis, RabbitMQ, or AWS SQS.
import sqlite3
import json
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional


class DLQStore:
    def __init__(self, db_path: str = "dlq.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Initialize the SQLite database and create the DLQ table if it does not exist."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS dead_letter_queue (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    webhook_name TEXT NOT NULL,
                    payload TEXT NOT NULL,
                    headers TEXT NOT NULL,
                    error_message TEXT,
                    attempt_count INTEGER NOT NULL DEFAULT 0,
                    next_retry_time TIMESTAMP NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    status TEXT DEFAULT 'pending'
                )
            """)
            conn.commit()

    def add_to_dlq(self, webhook_name: str, payload: Dict, headers: Dict, error_msg: str, attempt_count: int) -> int:
        """
        Adds a failed message to the DLQ.
        Calculates next_retry_time based on exponential backoff.
        """
        # Exponential backoff: 2^attempt seconds, capped at 1 hour (3600s)
        backoff_seconds = min(2 ** attempt_count, 3600)
        next_retry = datetime.utcnow() + timedelta(seconds=backoff_seconds)
        headers_json = json.dumps(headers)
        payload_json = json.dumps(payload)

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                INSERT INTO dead_letter_queue
                (webhook_name, payload, headers, error_message, attempt_count, next_retry_time, status)
                VALUES (?, ?, ?, ?, ?, ?, 'pending')
            """, (webhook_name, payload_json, headers_json, error_msg, attempt_count, next_retry.isoformat()))
            conn.commit()
            return cursor.lastrowid

    def get_pending_messages(self, max_attempts: int = 5) -> List[Dict]:
        """
        Retrieves messages that are ready for retry.
        Filters out messages that have exceeded max_attempts.
        """
        now = datetime.utcnow().isoformat()

        # First, mark messages that have exceeded max attempts as 'failed'
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                UPDATE dead_letter_queue
                SET status = 'failed'
                WHERE status = 'pending' AND attempt_count >= ?
            """, (max_attempts,))
            conn.commit()

        # Fetch pending messages that are ready for retry
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT id, webhook_name, payload, headers, error_message, attempt_count
                FROM dead_letter_queue
                WHERE status = 'pending' AND next_retry_time <= ?
                ORDER BY next_retry_time ASC
                LIMIT 100
            """, (now,))
            rows = cursor.fetchall()

        results = []
        for row in rows:
            results.append({
                "id": row[0],
                "webhook_name": row[1],
                "payload": json.loads(row[2]),
                "headers": json.loads(row[3]),
                "error_message": row[4],
                "attempt_count": row[5]
            })
        return results

    def update_message_status(self, message_id: int, status: str,
                              error_msg: Optional[str] = None, attempt_count: Optional[int] = None):
        """Updates the status of a specific message in the DLQ."""
        with sqlite3.connect(self.db_path) as conn:
            if status == 'success':
                conn.execute("""
                    UPDATE dead_letter_queue
                    SET status = 'success'
                    WHERE id = ?
                """, (message_id,))
            elif status == 'failed':
                # Terminal state: without this branch the row would stay 'pending'
                # and be picked up again on every poll.
                conn.execute("""
                    UPDATE dead_letter_queue
                    SET status = 'failed', error_message = ?
                    WHERE id = ?
                """, (error_msg, message_id))
            elif status == 'pending':
                # Re-calculate next retry time for next attempt
                backoff_seconds = min(2 ** attempt_count, 3600)
                next_retry = datetime.utcnow() + timedelta(seconds=backoff_seconds)
                conn.execute("""
                    UPDATE dead_letter_queue
                    SET status = 'pending', attempt_count = ?, next_retry_time = ?, error_message = ?
                    WHERE id = ?
                """, (attempt_count, next_retry.isoformat(), error_msg, message_id))
            conn.commit()
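To see what the backoff rule in add_to_dlq produces, the short sketch below computes the delay for each attempt in isolation. The helper name backoff_seconds and its cap parameter are illustrative only; the formula is the one used in the class above.

```python
def backoff_seconds(attempt_count: int, cap: int = 3600) -> int:
    """Delay before the next retry: 2^attempt seconds, capped at `cap`."""
    return min(2 ** attempt_count, cap)


# Delays for attempts 1 through 5 (the default max_attempts in this tutorial)
schedule = [backoff_seconds(n) for n in range(1, 6)]
print(schedule)       # [2, 4, 8, 16, 32]
print(sum(schedule))  # 62

# The one-hour cap only takes effect from attempt 12 onward (2^12 = 4096 > 3600)
print(backoff_seconds(11), backoff_seconds(12))  # 2048 3600
```

With the default limit of five attempts, the whole retry window is on the order of a minute, so a higher limit is probably needed if downstream outages usually last longer than that.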
Step 2: Implement the Downstream Delivery Logic
This function attempts to send the webhook payload to your actual downstream system (e.g., a CRM, Data Warehouse, or internal API).
import requests
from typing import Dict, Any, Tuple

# Headers that describe the inbound connection and should not be forwarded as-is
EXCLUDED_HEADERS = {"host", "content-length", "transfer-encoding", "connection"}


def deliver_to_downstream(payload: Dict, headers: Dict, target_url: str) -> Tuple[bool, str]:
    """
    Attempts to deliver the payload to the downstream target.
    Returns (success: bool, error_message: str).
    """
    # Drop hop-by-hop headers and Host so requests can set them for the new target
    forward_headers = {k: v for k, v in headers.items() if k.lower() not in EXCLUDED_HEADERS}

    try:
        # In a real scenario, you might need to sign the request or add specific auth headers
        response = requests.post(
            target_url,
            json=payload,
            headers=forward_headers,
            timeout=10  # Short timeout to avoid hanging the retry worker
        )

        # Success: 2xx status codes
        if 200 <= response.status_code < 300:
            return True, "Delivered successfully"

        # Server Error: 5xx. This is what triggers the DLQ retry.
        if 500 <= response.status_code < 600:
            return False, f"Downstream server error: {response.status_code}"

        # Client Error: 4xx. Usually indicates a bad payload or auth issue.
        # Only transient cases such as 429 are worth retrying. Note that the
        # callers in this tutorial queue every failure, because the boolean
        # result does not distinguish permanent errors from transient ones.
        if response.status_code == 429:
            return False, "Rate limited (429). Consider implementing custom backoff."
        return False, f"Client error: {response.status_code} - {response.text}"

    except requests.exceptions.Timeout:
        return False, "Downstream connection timed out"
    except requests.exceptions.ConnectionError:
        return False, "Downstream connection refused"
    except Exception as e:
        return False, f"Unexpected error: {str(e)}"
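The comments in this step distinguish failures worth retrying (5xx, 429) from permanent ones (other 4xx), but the boolean return value does not carry that distinction. One possible way to make it explicit is a small classifier, sketched below. The function name is_retryable_status is hypothetical and not part of the tutorial code, and the rule it encodes is an assumption you may want to adjust.

```python
def is_retryable_status(status_code: int) -> bool:
    """Return True when a failed delivery is worth retrying later.

    Assumption for this sketch: 5xx and 429 are transient, every other
    status is treated as not retryable.
    """
    if 500 <= status_code < 600:
        return True
    return status_code == 429


for code in (500, 503, 429, 400, 404):
    print(code, is_retryable_status(code))
```

A caller could check this before add_to_dlq and record permanent failures directly as failed instead of scheduling retries that cannot succeed.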
Step 3: The Retry Worker Loop
This is the core logic that polls the DLQ, attempts delivery, and updates the status.
import time
import logging
from typing import Dict

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class WebhookRetryWorker:
    def __init__(self, dlq_store: DLQStore, downstream_url: str, max_attempts: int = 5, poll_interval: int = 10):
        self.dlq_store = dlq_store
        self.downstream_url = downstream_url
        self.max_attempts = max_attempts
        self.poll_interval = poll_interval

    def run(self):
        """Main loop to process pending DLQ messages."""
        logger.info(f"Starting Webhook Retry Worker. Target: {self.downstream_url}")
        while True:
            try:
                pending_messages = self.dlq_store.get_pending_messages(self.max_attempts)
                if not pending_messages:
                    logger.debug("No pending messages. Sleeping...")
                    time.sleep(self.poll_interval)
                    continue

                logger.info(f"Processing {len(pending_messages)} pending messages.")
                for msg in pending_messages:
                    self._process_message(msg)
            except Exception as e:
                logger.error(f"Critical error in worker loop: {e}")
                time.sleep(self.poll_interval)

    def _process_message(self, msg: Dict):
        """Processes a single message from the DLQ."""
        msg_id = msg["id"]
        attempt_count = msg["attempt_count"] + 1
        webhook_name = msg["webhook_name"]

        logger.info(f"Attempting delivery (Attempt {attempt_count}) for message ID {msg_id}")

        # Attempt delivery
        success, error_message = deliver_to_downstream(
            payload=msg["payload"],
            headers=msg["headers"],
            target_url=self.downstream_url
        )

        if success:
            logger.info(f"Successfully delivered message ID {msg_id}")
            self.dlq_store.update_message_status(msg_id, status='success')
        else:
            logger.warning(f"Failed delivery for message ID {msg_id}: {error_message}")
            # Check if we have exceeded max attempts
            if attempt_count >= self.max_attempts:
                logger.error(f"Max attempts reached for message ID {msg_id}. Marking as permanently failed.")
                self.dlq_store.update_message_status(msg_id, status='failed', error_msg=error_message)
            else:
                # Update status to pending with new retry time
                self.dlq_store.update_message_status(
                    msg_id,
                    status='pending',
                    error_msg=error_message,
                    attempt_count=attempt_count
                )
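The run loop in this step never exits on its own, which makes clean shutdown and testing awkward. A minimal sketch of a stoppable polling loop built on threading.Event follows. The names poll_until_stopped and process_batch are hypothetical; process_batch stands in for the get_pending_messages and _process_message pair.

```python
import threading
from typing import Callable


def poll_until_stopped(process_batch: Callable[[], int],
                       stop_event: threading.Event,
                       poll_interval: float = 10.0) -> int:
    """Call process_batch repeatedly until stop_event is set.

    process_batch returns the number of messages it handled. When it
    handled none, wait up to poll_interval seconds. Unlike time.sleep,
    Event.wait returns early if a stop is requested.
    """
    total = 0
    while not stop_event.is_set():
        handled = process_batch()
        total += handled
        if handled == 0:
            stop_event.wait(poll_interval)
    return total


# Demo: a fake batch source that yields 3, then 2 messages, then requests a stop.
stop = threading.Event()
batches = [3, 2]


def fake_batch() -> int:
    if batches:
        return batches.pop(0)
    stop.set()
    return 0


processed = poll_until_stopped(fake_batch, stop, poll_interval=0.1)
print(processed)  # 5
```

In a real service the event would be set from a signal handler or a shutdown hook rather than from the batch function itself.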
Step 4: The Webhook Receiver Endpoint
This FastAPI endpoint receives the initial webhook from Genesys Cloud. If the downstream delivery fails immediately, it pushes the message to the DLQ.
import asyncio
import json
import threading

from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel

app = FastAPI(title="Genesys Webhook Receiver with DLQ")

# Initialize components
DLQ = DLQStore("dlq.db")
WORKER = WebhookRetryWorker(DLQ, downstream_url="http://localhost:8080/api/webhook", max_attempts=5)

# Start the worker in a background thread (for demo purposes)
threading.Thread(target=WORKER.run, daemon=True).start()


class GenesysWebhookPayload(BaseModel):
    """
    Simplified model for Genesys Webhook payload.
    Actual payloads vary by event type (conversation, user, etc.).
    """
    eventType: str
    eventId: str
    timestamp: str
    data: dict


@app.post("/webhook/receive")
async def receive_webhook(request: Request):
    """
    Receives the webhook from Genesys Cloud.
    Genesys expects a 2xx response quickly.
    """
    try:
        body = await request.json()
        headers = dict(request.headers)

        # Extract webhook name from headers if available, else use a default
        webhook_name = headers.get("x-genesys-cloud-webhook-name", "unknown")

        # Attempt immediate delivery. requests is blocking, so run it in a
        # worker thread to keep the event loop free for other webhooks.
        success, error_msg = await asyncio.to_thread(
            deliver_to_downstream,
            payload=body,
            headers=headers,
            target_url=WORKER.downstream_url
        )

        if success:
            return {"status": "delivered"}

        # If it fails, push to DLQ for retry
        # We start with attempt_count = 1 because this is the first failure
        DLQ.add_to_dlq(
            webhook_name=webhook_name,
            payload=body,
            headers=headers,
            error_msg=error_msg,
            attempt_count=1
        )
        logger.info(f"Immediate delivery failed. Pushed to DLQ for retry. Error: {error_msg}")
        return {"status": "queued_for_retry"}

    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON payload")
    except Exception as e:
        logger.error(f"Error processing webhook: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
Complete Working Example
Below is the complete, consolidated Python script. Save this as main.py.
Prerequisites:
- Install dependencies: pip install fastapi uvicorn requests python-dotenv
- Create a mock downstream service (or set the DOWNSTREAM_URL environment variable to a real endpoint). For testing, you can use httpbin.org/post, but note that it always returns 200. To test the failure path, point the script at a local mock server that returns 500, or at an endpoint such as httpbin.org/status/500.
import asyncio
import os
import json
import time
import sqlite3
import logging
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

import requests
from fastapi import FastAPI, Request, HTTPException

# --- Configuration ---
DOWNSTREAM_URL = os.getenv("DOWNSTREAM_URL", "http://localhost:9999/mock-endpoint")
MAX_ATTEMPTS = int(os.getenv("MAX_ATTEMPTS", "5"))
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "10"))
DB_PATH = os.getenv("DLQ_DB_PATH", "dlq.db")

# Headers that describe the inbound connection and should not be forwarded as-is
EXCLUDED_HEADERS = {"host", "content-length", "transfer-encoding", "connection"}

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# --- DLQ Storage Class ---
class DLQStore:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS dead_letter_queue (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    webhook_name TEXT NOT NULL,
                    payload TEXT NOT NULL,
                    headers TEXT NOT NULL,
                    error_message TEXT,
                    attempt_count INTEGER NOT NULL DEFAULT 0,
                    next_retry_time TIMESTAMP NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    status TEXT DEFAULT 'pending'
                )
            """)
            conn.commit()

    def add_to_dlq(self, webhook_name: str, payload: Dict, headers: Dict, error_msg: str, attempt_count: int) -> int:
        # Exponential backoff: 2^attempt seconds, capped at 1 hour
        backoff_seconds = min(2 ** attempt_count, 3600)
        next_retry = datetime.utcnow() + timedelta(seconds=backoff_seconds)
        headers_json = json.dumps(headers)
        payload_json = json.dumps(payload)
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                INSERT INTO dead_letter_queue
                (webhook_name, payload, headers, error_message, attempt_count, next_retry_time, status)
                VALUES (?, ?, ?, ?, ?, ?, 'pending')
            """, (webhook_name, payload_json, headers_json, error_msg, attempt_count, next_retry.isoformat()))
            conn.commit()
            return cursor.lastrowid

    def get_pending_messages(self, max_attempts: int) -> List[Dict]:
        now = datetime.utcnow().isoformat()
        # Mark messages that have used up their attempts as 'failed'
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                UPDATE dead_letter_queue
                SET status = 'failed'
                WHERE status = 'pending' AND attempt_count >= ?
            """, (max_attempts,))
            conn.commit()
        # Fetch pending messages whose retry time has arrived
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT id, webhook_name, payload, headers, error_message, attempt_count
                FROM dead_letter_queue
                WHERE status = 'pending' AND next_retry_time <= ?
                ORDER BY next_retry_time ASC
                LIMIT 100
            """, (now,))
            rows = cursor.fetchall()
        results = []
        for row in rows:
            results.append({
                "id": row[0],
                "webhook_name": row[1],
                "payload": json.loads(row[2]),
                "headers": json.loads(row[3]),
                "error_message": row[4],
                "attempt_count": row[5]
            })
        return results

    def update_message_status(self, message_id: int, status: str,
                              error_msg: Optional[str] = None, attempt_count: Optional[int] = None):
        with sqlite3.connect(self.db_path) as conn:
            if status == 'success':
                conn.execute("UPDATE dead_letter_queue SET status = 'success' WHERE id = ?", (message_id,))
            elif status == 'failed':
                # Terminal state: without this branch the row would stay 'pending'
                # and be picked up again on every poll.
                conn.execute(
                    "UPDATE dead_letter_queue SET status = 'failed', error_message = ? WHERE id = ?",
                    (error_msg, message_id)
                )
            elif status == 'pending':
                backoff_seconds = min(2 ** attempt_count, 3600)
                next_retry = datetime.utcnow() + timedelta(seconds=backoff_seconds)
                conn.execute("""
                    UPDATE dead_letter_queue
                    SET status = 'pending', attempt_count = ?, next_retry_time = ?, error_message = ?
                    WHERE id = ?
                """, (attempt_count, next_retry.isoformat(), error_msg, message_id))
            conn.commit()


# --- Delivery Logic ---
def deliver_to_downstream(payload: Dict, headers: Dict, target_url: str) -> Tuple[bool, str]:
    # Drop hop-by-hop headers and Host so requests can set them for the new target
    forward_headers = {k: v for k, v in headers.items() if k.lower() not in EXCLUDED_HEADERS}
    try:
        response = requests.post(
            target_url,
            json=payload,
            headers=forward_headers,
            timeout=10
        )
        if 200 <= response.status_code < 300:
            return True, "Delivered successfully"
        if 500 <= response.status_code < 600:
            return False, f"Downstream server error: {response.status_code}"
        if response.status_code == 429:
            return False, "Rate limited (429)"
        return False, f"Client error: {response.status_code} - {response.text}"
    except requests.exceptions.Timeout:
        return False, "Downstream connection timed out"
    except requests.exceptions.ConnectionError:
        return False, "Downstream connection refused"
    except Exception as e:
        return False, f"Unexpected error: {str(e)}"


# --- Retry Worker ---
class WebhookRetryWorker:
    def __init__(self, dlq_store: DLQStore, downstream_url: str, max_attempts: int, poll_interval: int):
        self.dlq_store = dlq_store
        self.downstream_url = downstream_url
        self.max_attempts = max_attempts
        self.poll_interval = poll_interval

    def run(self):
        logger.info(f"Starting Webhook Retry Worker. Target: {self.downstream_url}")
        while True:
            try:
                pending_messages = self.dlq_store.get_pending_messages(self.max_attempts)
                if not pending_messages:
                    time.sleep(self.poll_interval)
                    continue
                for msg in pending_messages:
                    self._process_message(msg)
            except Exception as e:
                logger.error(f"Critical error in worker loop: {e}")
                time.sleep(self.poll_interval)

    def _process_message(self, msg: Dict):
        msg_id = msg["id"]
        attempt_count = msg["attempt_count"] + 1
        success, error_message = deliver_to_downstream(
            payload=msg["payload"],
            headers=msg["headers"],
            target_url=self.downstream_url
        )
        if success:
            logger.info(f"Successfully delivered message ID {msg_id}")
            self.dlq_store.update_message_status(msg_id, status='success')
        else:
            logger.warning(f"Failed delivery for message ID {msg_id}: {error_message}")
            if attempt_count >= self.max_attempts:
                logger.error(f"Max attempts reached for message ID {msg_id}. Marking as permanently failed.")
                self.dlq_store.update_message_status(msg_id, status='failed', error_msg=error_message)
            else:
                self.dlq_store.update_message_status(
                    msg_id,
                    status='pending',
                    error_msg=error_message,
                    attempt_count=attempt_count
                )


# --- FastAPI App ---
app = FastAPI(title="Genesys Webhook Receiver with DLQ")
DLQ = DLQStore(DB_PATH)
WORKER = WebhookRetryWorker(DLQ, DOWNSTREAM_URL, MAX_ATTEMPTS, POLL_INTERVAL)

# Start worker in background thread
threading.Thread(target=WORKER.run, daemon=True).start()


@app.post("/webhook/receive")
async def receive_webhook(request: Request):
    try:
        body = await request.json()
        headers = dict(request.headers)
        webhook_name = headers.get("x-genesys-cloud-webhook-name", "unknown")
        # requests is blocking, so run the delivery in a worker thread
        success, error_msg = await asyncio.to_thread(
            deliver_to_downstream,
            payload=body,
            headers=headers,
            target_url=WORKER.downstream_url
        )
        if success:
            return {"status": "delivered"}
        DLQ.add_to_dlq(
            webhook_name=webhook_name,
            payload=body,
            headers=headers,
            error_msg=error_msg,
            attempt_count=1
        )
        return {"status": "queued_for_retry"}
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON payload")
    except Exception as e:
        logger.error(f"Error processing webhook: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
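To exercise the retry path you need a downstream that fails. A minimal sketch of such a mock, using only the standard library, is shown below. The handler name AlwaysFailHandler is made up for this example. The demo binds an ephemeral port and checks itself; for a manual test you would bind port 9999 to match the default DOWNSTREAM_URL and leave the server running.

```python
import threading
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class AlwaysFailHandler(BaseHTTPRequestHandler):
    """Answers every POST with 500 to simulate a broken downstream."""

    def do_POST(self):
        # Drain the request body so the client can finish sending it
        length = int(self.headers.get("Content-Length", 0))
        self.rfile.read(length)
        body = b'{"error": "simulated failure"}'
        self.send_response(500)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the demo output quiet


# Port 0 asks the OS for any free port; use 9999 for a manual test.
server = HTTPServer(("127.0.0.1", 0), AlwaysFailHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

url = f"http://127.0.0.1:{server.server_port}/mock-endpoint"
request = urllib.request.Request(url, data=b"{}", method="POST",
                                 headers={"Content-Type": "application/json"})
try:
    urllib.request.urlopen(request, timeout=5)
    status = 200
except urllib.error.HTTPError as exc:
    status = exc.code
print(status)  # 500

server.shutdown()
server.server_close()
```

With the mock running on port 9999, a POST to /webhook/receive should come back as queued_for_retry, and the row should move to failed once the attempt limit is reached.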
Common Errors & Debugging
Error: 429 Too Many Requests
What causes it: Your downstream system is rate-limiting your requests.
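How to fix it: The retry logic in this tutorial treats a 429 like any other failure and retries on its own exponential schedule, which may be shorter than the wait the downstream asks for. In your deliver_to_downstream function, detect 429 responses and honor the Retry-After header with a single bounded wait-and-retry before pushing to the DLQ.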
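# Inside deliver_to_downstream, after the first requests.post call (requires "import time")
if response.status_code == 429:
    # Retry-After may be an HTTP date instead of seconds; fall back to 60s
    try:
        retry_after = int(response.headers.get("Retry-After", 60))
    except ValueError:
        retry_after = 60
    time.sleep(min(retry_after, 60))  # cap the wait so the caller is not blocked for long
    # Retry once after waiting
    response = requests.post(target_url, json=payload, headers=headers, timeout=10)
    if 200 <= response.status_code < 300:
        return True, "Delivered after 429 retry"
    return False, "Persistent 429"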
Error: SQLite Database Locked
What causes it: Multiple threads writing to the database simultaneously.
How to fix it: SQLite allows only one writer at a time. That is fine for low volumes, but under sustained load a second writer can wait longer than the sqlite3 module's default timeout of 5 seconds and fail with "database is locked". Use sqlite3.connect(..., timeout=10) to increase the wait time for the lock to release, or switch to a more robust queue like RabbitMQ or AWS SQS for production workloads.
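The sketch below applies the longer lock timeout and additionally enables SQLite's write-ahead logging (WAL) mode, which lets readers proceed while a writer is active. The helper name open_dlq_connection is hypothetical, and the demo uses a temporary file so it does not touch dlq.db.

```python
import os
import sqlite3
import tempfile


def open_dlq_connection(db_path: str, lock_timeout: float = 10.0) -> sqlite3.Connection:
    """Open a connection that waits longer for locks and uses WAL mode."""
    conn = sqlite3.connect(db_path, timeout=lock_timeout)
    # journal_mode is persistent: once set, the database file stays in WAL mode
    conn.execute("PRAGMA journal_mode=WAL")
    return conn


with tempfile.TemporaryDirectory() as tmp_dir:
    conn = open_dlq_connection(os.path.join(tmp_dir, "dlq_demo.db"))
    journal_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
    conn.close()

print(journal_mode)  # wal
```

If you adopt this, each sqlite3.connect(self.db_path) call in DLQStore would go through the helper instead. WAL still permits only one writer at a time, so it reduces reader/writer contention rather than removing the limit.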
Error: Webhook Payload Too Large
What causes it: Genesys Cloud webhooks can be large, especially for detailed conversation events.
How to fix it: Ensure your downstream system and the DLQ storage can handle the payload size. SQLite itself stores large TEXT values without special configuration (the default limit is about 1 GB per value), so the practical limits are usually request-size limits in front of your receiver and in the downstream system. If payloads exceed 1MB, consider keeping only a hash and a reference in the DLQ row and storing the full payload in a separate object store (like S3).
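One way to apply the size threshold described above is sketched below. It hashes the serialized payload and, above the threshold, writes the body to a separate location and keeps only a reference. A local directory stands in for the object store here, and split_large_payload, MAX_INLINE_BYTES, and the returned field names are all hypothetical.

```python
import hashlib
import json
import os
import tempfile
from typing import Any, Dict

MAX_INLINE_BYTES = 1_000_000  # assumed threshold, roughly the 1MB mentioned above


def split_large_payload(payload: Dict[str, Any], blob_dir: str,
                        max_inline: int = MAX_INLINE_BYTES) -> Dict[str, Any]:
    """Return the record to store in the DLQ row for this payload.

    Small payloads are kept inline. Large ones are written to blob_dir
    (a stand-in for S3 or similar) under their SHA-256 digest.
    """
    raw = json.dumps(payload, sort_keys=True).encode("utf-8")
    digest = hashlib.sha256(raw).hexdigest()
    if len(raw) <= max_inline:
        return {"sha256": digest, "inline": True, "payload": payload}
    blob_path = os.path.join(blob_dir, f"{digest}.json")
    with open(blob_path, "wb") as fh:
        fh.write(raw)
    return {"sha256": digest, "inline": False, "blob_path": blob_path}


with tempfile.TemporaryDirectory() as blob_dir:
    small = split_large_payload({"eventType": "demo"}, blob_dir)
    large = split_large_payload({"data": "x" * 2000}, blob_dir, max_inline=1000)
    large_exists = os.path.exists(large["blob_path"])

print(small["inline"], large["inline"], large_exists)  # True False True
```

The retry worker would then need to load the body back from the stored reference before calling deliver_to_downstream, and the hash lets it verify that the body it loaded is the one that was queued.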