Handling 5xx Webhook Failures: Implementing a Dead Letter Queue with Genesys Cloud
What You Will Build
- You will build a Python service that intercepts failed Genesys Cloud webhooks, stores the payload in a persistent dead letter queue (DLQ) using SQLite, and implements an exponential backoff retry mechanism.
- You will use the Genesys Cloud Platform Client SDK for Python to validate webhook configurations and the requests library for custom HTTP handling.
- The implementation uses Python 3.9+ with sqlite3, requests, and purecloudplatformclientv2.
Prerequisites
- OAuth Client Type: Service Account (Client Credentials Grant) with webhook:read and webhook:write scopes.
- SDK Version: purecloudplatformclientv2 >= 150.0.0.
- Language/Runtime: Python 3.9 or higher.
- External Dependencies:
  - pip install purecloudplatformclientv2
  - pip install requests
  - pip install python-dotenv (for managing secrets)
Authentication Setup
Genesys Cloud uses OAuth 2.0 Client Credentials Grant for server-to-server communication. You must generate an access token before interacting with the Webhooks API or validating endpoint health.
The following code establishes a secure authentication context. It caches the token in memory and handles expiration by requesting a new token only when necessary.
import os
import time

import requests
# The SDK's module name is case-sensitive when imported.
from PureCloudPlatformClientV2 import Configuration, ApiClient
from PureCloudPlatformClientV2.rest import ApiException
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()


class GenesysAuth:
    def __init__(self):
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
        self.env = os.getenv("GENESYS_ENV", "mypurecloud.com")
        self._token_cache = {}
        self._token_expiry = 0

    def get_token(self) -> str:
        """
        Returns a valid OAuth access token.
        Implements simple in-memory caching with a 5-minute safety buffer.
        """
        now = time.time()
        # Refresh if token is expired or will expire in the next 5 minutes
        if now >= self._token_expiry - 300:
            self._refresh_token()
        return self._token_cache.get("access_token")

    def _refresh_token(self):
        """
        Fetches a new token from the Genesys Cloud OAuth endpoint.
        """
        try:
            url = f"https://login.{self.env}/oauth/token"
            headers = {
                "Content-Type": "application/x-www-form-urlencoded"
            }
            data = {
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret
            }
            # A timeout keeps a stalled login endpoint from blocking the caller forever
            response = requests.post(url, headers=headers, data=data, timeout=10)
            response.raise_for_status()
            token_data = response.json()
            self._token_cache = token_data
            # Use the lifetime reported by the server; fall back to 3600 seconds
            self._token_expiry = time.time() + token_data.get("expires_in", 3600)
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"Failed to refresh OAuth token: {e}") from e

    def get_api_client(self) -> ApiClient:
        """
        Configures and returns the PureCloud Platform Client.
        """
        config = Configuration()
        config.host = f"https://api.{self.env}"
        config.access_token = self.get_token()
        # For explicit control in this tutorial, we inject the token manually.
        # Note: this assumes ApiClient accepts a Configuration object; confirm
        # the constructor signature for your installed SDK version.
        api_client = ApiClient(config)
        return api_client


# Initialize global auth instance
auth_service = GenesysAuth()
Implementation
Step 1: Define the Dead Letter Queue Structure
A Dead Letter Queue (DLQ) is a storage mechanism for messages that could not be processed. In this tutorial, we use SQLite for persistence. This ensures that if the retry service restarts, no failed webhooks are lost.
The schema must store the original payload, the target URL, the error status, and retry metadata.
import sqlite3
import json
import uuid
from datetime import datetime

DB_PATH = "dlq_webhooks.db"


def init_db():
    """
    Creates the DLQ table if it does not exist.
    """
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS failed_webhooks (
            id TEXT PRIMARY KEY,
            target_url TEXT NOT NULL,
            payload TEXT NOT NULL,
            error_status INTEGER,
            error_message TEXT,
            retry_count INTEGER DEFAULT 0,
            next_retry_at TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
    """)
    conn.commit()
    conn.close()


def enqueue_failed_webhook(target_url: str, payload: dict, status_code: int, error_msg: str):
    """
    Adds a failed webhook payload to the DLQ.
    """
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    webhook_id = str(uuid.uuid4())
    # A new item is due immediately (next_retry_at = now, in UTC).
    # The retry worker applies exponential backoff after each failed retry.
    next_retry_timestamp = datetime.utcnow().isoformat()
    cursor.execute("""
        INSERT INTO failed_webhooks (id, target_url, payload, error_status, error_message, retry_count, next_retry_at)
        VALUES (?, ?, ?, ?, ?, 0, ?)
    """, (
        webhook_id,
        target_url,
        json.dumps(payload),
        status_code,
        error_msg,
        next_retry_timestamp
    ))
    conn.commit()
    conn.close()
    return webhook_id
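The next_retry_at column is stored as TEXT, so "is this item due?" becomes a string comparison. That is safe as long as every timestamp comes from isoformat() in the same timezone, because such strings sort in chronological order. A minimal check of that assumption, using an in-memory database and a hypothetical table named demo:

```python
import sqlite3
from datetime import datetime, timedelta

# Illustrative only: an in-memory table standing in for failed_webhooks.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (id TEXT, next_retry_at TEXT)")

now = datetime(2024, 1, 1, 12, 0, 0)  # fixed time so the result is reproducible
rows = [
    ("due", (now - timedelta(seconds=30)).isoformat()),
    ("not_due", (now + timedelta(seconds=30)).isoformat()),
]
conn.executemany("INSERT INTO demo VALUES (?, ?)", rows)

# Text comparison on same-timezone ISO 8601 strings acts like a time comparison.
due_ids = [row[0] for row in conn.execute(
    "SELECT id FROM demo WHERE next_retry_at <= ?", (now.isoformat(),)
)]
conn.close()
print(due_ids)  # ['due']
```

Mixing timezone-aware and naive timestamps in the same column would break this ordering, so it is worth picking one form and keeping it.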
Step 2: Simulate and Capture 5xx Failures
In a production scenario, you would receive the webhook from Genesys Cloud. For this tutorial, we simulate the reception of a webhook and the subsequent failure when calling your downstream endpoint.
We assume you have an endpoint registered in Genesys Cloud. When Genesys sends a payload, your receiver attempts to forward it to a final business logic endpoint. If that endpoint returns a 5xx error, we capture it.
import requests
# WebhooksApi is used in Step 4. The SDK's module name is case-sensitive.
from PureCloudPlatformClientV2 import WebhooksApi


def receive_and_forward_webhook(payload: dict, target_downstream_url: str):
    """
    Simulates receiving a webhook from Genesys and forwarding it.
    If the downstream service returns 5xx, it enqueues the payload.
    """
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "GenesysWebhookHandler/1.0"
    }
    try:
        # Attempt to forward the payload to the actual business logic endpoint
        response = requests.post(target_downstream_url, json=payload, headers=headers, timeout=5)
        if response.status_code >= 500:
            # Server error: Enqueue for retry
            print(f"Downstream server error {response.status_code}. Enqueuing for retry.")
            enqueue_failed_webhook(
                target_downstream_url,
                payload,
                response.status_code,
                response.text
            )
            # Return 200 to Genesys immediately to acknowledge receipt
            # This prevents Genesys from retrying, leaving the retry logic to our DLQ
            return {"status": "accepted_for_retry"}
        elif 200 <= response.status_code < 300:
            # Any 2xx counts as success, matching the retry worker in Step 3
            print("Webhook processed successfully.")
            return {"status": "success"}
        else:
            # Remaining codes are mostly 4xx client errors (bad data).
            # Decide based on business logic if these should be retried.
            # Typically, 4xx are NOT retried.
            print(f"Client error {response.status_code}. Dropping payload.")
            return {"status": "dropped"}
    except requests.exceptions.RequestException as e:
        # Network error, timeout, or connection refused.
        # No HTTP status exists here, so 503 is recorded as a placeholder.
        print(f"Network error forwarding webhook: {e}. Enqueuing for retry.")
        enqueue_failed_webhook(
            target_downstream_url,
            payload,
            503,
            str(e)
        )
        return {"status": "accepted_for_retry"}
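To exercise the 5xx branch without a real downstream outage, one option is a local stub that always fails. The sketch below is an assumption, not part of the original service: AlwaysFailHandler, start_failing_server, and post_status are hypothetical names, and it uses only the standard library (urllib rather than requests) so it runs without extra packages.

```python
import threading
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class AlwaysFailHandler(BaseHTTPRequestHandler):
    """Hypothetical downstream stub that answers every POST with 500."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        self.rfile.read(length)  # drain the request body
        self.send_response(500)
        self.end_headers()
        self.wfile.write(b"simulated failure")

    def log_message(self, format, *args):
        pass  # keep output quiet


def start_failing_server() -> HTTPServer:
    # Port 0 asks the operating system for any free port.
    server = HTTPServer(("127.0.0.1", 0), AlwaysFailHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def post_status(url: str, body: bytes) -> int:
    """Return the HTTP status code for a POST, including error statuses."""
    request = urllib.request.Request(url, data=body, method="POST")
    try:
        with urllib.request.urlopen(request, timeout=5) as response:
            return response.status
    except urllib.error.HTTPError as err:
        return err.code


server = start_failing_server()
stub_url = f"http://127.0.0.1:{server.server_port}/log"
status = post_status(stub_url, b'{"ping": true}')
server.shutdown()
server.server_close()
print(status)  # 500
```

Passing stub_url as target_downstream_url to receive_and_forward_webhook (while the stub is still running) should then take the enqueue path.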
Step 3: Implement the Retry Worker with Exponential Backoff
The core of the DLQ system is the worker that pulls failed items, attempts delivery again, and updates their status. We implement exponential backoff to avoid hammering the failing downstream service.
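Backoff Strategy (as implemented by the code below):
- Retry 1: As soon as the worker next polls, because a newly enqueued item is due immediately
- Retry 2: 20 seconds after retry 1 fails
- Retry 3: 40 seconds after retry 2 fails
- Retry 4: 80 seconds after retry 3 fails
- Retry 5: 160 seconds after retry 4 fails
- Max Retries: 5
If the payload still fails after the maximum number of retries, the worker logs it as dead and stops selecting it. The row remains in failed_webhooks for manual inspection; a production system would typically move it to a separate archival table or raise an alert.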
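The delays come from applying the formula to the retry count after it has been incremented, which is how the worker code in this step calls it. A small sketch of that arithmetic, reusing the same constants (delay_after_failed_retry is a hypothetical name for illustration):

```python
MAX_RETRIES = 5
BASE_DELAY_SECONDS = 10


def delay_after_failed_retry(new_retry_count: int) -> int:
    """Same formula as the worker, applied to the incremented retry count."""
    return BASE_DELAY_SECONDS * (2 ** new_retry_count)


# Delays scheduled after failed retries 1 to 4; a fifth failure is terminal.
schedule = [delay_after_failed_retry(n) for n in range(1, MAX_RETRIES)]
print(schedule)       # [20, 40, 80, 160]
print(sum(schedule))  # 300
```

With these constants, an item that never succeeds spends roughly five minutes in backoff, plus polling latency, before it is declared dead.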
import time
import sqlite3
import json
from datetime import datetime, timedelta

import requests

MAX_RETRIES = 5
BASE_DELAY_SECONDS = 10


def calculate_backoff(retry_count: int) -> int:
    """
    Calculates delay in seconds using exponential backoff.
    """
    return BASE_DELAY_SECONDS * (2 ** retry_count)


def process_dlq_queue():
    """
    Runs one pass over the DLQ and retries every item that is due.
    """
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    # Get current time to find items ready for retry
    now = datetime.utcnow().isoformat()
    # Select items where next_retry_at is in the past
    cursor.execute("""
        SELECT id, target_url, payload, retry_count, error_status
        FROM failed_webhooks
        WHERE next_retry_at <= ? AND retry_count < ?
    """, (now, MAX_RETRIES))
    pending_items = cursor.fetchall()
    for item in pending_items:
        webhook_id, target_url, payload_json, retry_count, _ = item
        payload = json.loads(payload_json)
        print(f"Retrying webhook {webhook_id} (Attempt {retry_count + 1}/{MAX_RETRIES})")
        success = attempt_retry(target_url, payload)
        if success:
            # Remove from DLQ on success
            cursor.execute("DELETE FROM failed_webhooks WHERE id = ?", (webhook_id,))
            print(f"Webhook {webhook_id} delivered successfully.")
        else:
            # Update retry count and next retry time
            new_retry_count = retry_count + 1
            backoff_seconds = calculate_backoff(new_retry_count)
            next_retry_time = datetime.utcnow() + timedelta(seconds=backoff_seconds)
            cursor.execute("""
                UPDATE failed_webhooks
                SET retry_count = ?, next_retry_at = ?
                WHERE id = ?
            """, (new_retry_count, next_retry_time.isoformat(), webhook_id))
            if new_retry_count >= MAX_RETRIES:
                print(f"Webhook {webhook_id} exceeded max retries. Marked as dead.")
                # In a production system, you would move this to a 'dead' table
                # or send an alert to Slack/Email.
        # Commit per item so the write lock is not held during the next HTTP call
        conn.commit()
    conn.close()


def attempt_retry(target_url: str, payload: dict) -> bool:
    """
    Attempts to deliver the payload to the target URL.
    Returns True if successful, False otherwise.
    """
    try:
        response = requests.post(target_url, json=payload, timeout=5)
        # Consider 2xx as success
        return 200 <= response.status_code < 300
    except requests.exceptions.RequestException:
        return False
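One way to get dead items out of the active table is to copy them into an archive and delete the originals in the same transaction. The sketch below assumes the failed_webhooks schema from Step 1. The dead_webhooks table, its died_at column, and the archive_dead_webhooks function are hypothetical names, not part of the original code.

```python
import sqlite3

MAX_RETRIES = 5


def archive_dead_webhooks(conn: sqlite3.Connection) -> int:
    """Move exhausted rows into dead_webhooks and return how many were moved."""
    with conn:  # the copy and the delete are committed together, or rolled back together
        conn.execute("""
            CREATE TABLE IF NOT EXISTS dead_webhooks (
                id TEXT PRIMARY KEY,
                target_url TEXT NOT NULL,
                payload TEXT NOT NULL,
                error_status INTEGER,
                error_message TEXT,
                retry_count INTEGER,
                died_at TEXT DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.execute("""
            INSERT INTO dead_webhooks
                (id, target_url, payload, error_status, error_message, retry_count)
            SELECT id, target_url, payload, error_status, error_message, retry_count
            FROM failed_webhooks
            WHERE retry_count >= ?
        """, (MAX_RETRIES,))
        cursor = conn.execute(
            "DELETE FROM failed_webhooks WHERE retry_count >= ?", (MAX_RETRIES,)
        )
        return cursor.rowcount
```

Calling this at the end of each worker pass, with a connection opened on the DLQ database file, would keep failed_webhooks limited to items that can still be retried.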
Step 4: Validate Webhook Configuration via SDK
Before relying on your DLQ, you should verify that the webhook is correctly configured in Genesys Cloud. Misconfigured webhooks (e.g., wrong event types) will never trigger, causing your DLQ to remain empty while issues persist.
We use the WebhooksApi to list and inspect webhooks.
def verify_webhook_config(webhook_id: str):
    """
    Uses the Genesys Cloud SDK to fetch webhook details and validate configuration.
    Relies on os, ApiException, WebhooksApi, and auth_service from the earlier steps.
    """
    api_client = auth_service.get_api_client()
    webhooks_api = WebhooksApi(api_client)
    try:
        # Fetch the specific webhook.
        # The method name is kept as given in this tutorial; confirm that it
        # exists in your installed SDK version before relying on it.
        response = webhooks_api.post_webhooks_view_by_id(webhook_id)
        print(f"Webhook ID: {response.id}")
        print(f"Name: {response.name}")
        print(f"Status: {response.status}")
        print(f"Event Type: {response.event_type}")
        print(f"Target URL: {response.target_url}")
        # Check if the webhook is enabled
        if response.status != "enabled":
            print("WARNING: Webhook is not enabled. It will not trigger.")
        # Validate target URL matches your expected endpoint.
        # Skip the check when EXPECTED_WEBHOOK_URL is not set.
        expected_url = os.getenv("EXPECTED_WEBHOOK_URL")
        if expected_url and response.target_url != expected_url:
            print(f"WARNING: Target URL mismatch. Expected {expected_url}, got {response.target_url}")
        return response
    except ApiException as e:
        print(f"Exception when calling WebhooksApi->post_webhooks_view_by_id: {e}")
        if e.status == 401:
            print("Authentication failed. Check your OAuth token.")
        elif e.status == 404:
            print(f"Webhook {webhook_id} not found.")
        return None
Complete Working Example
This script combines authentication, DLQ management, and the retry worker into a single executable module. It includes a simple HTTP server to simulate the webhook receiver for testing purposes.
import json
import os
import sqlite3
import threading
import time
import uuid
from datetime import datetime, timedelta
from http.server import HTTPServer, BaseHTTPRequestHandler

import requests
from dotenv import load_dotenv
# The SDK's module name is case-sensitive when imported.
# Only the names this script actually uses are imported.
from PureCloudPlatformClientV2 import Configuration, ApiClient

load_dotenv()

# --- Configuration ---
DB_PATH = "dlq_webhooks.db"
MAX_RETRIES = 5
BASE_DELAY_SECONDS = 10
PORT = 8080
# --- Authentication ---
class GenesysAuth:
    def __init__(self):
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
        self.env = os.getenv("GENESYS_ENV", "mypurecloud.com")
        self._token_cache = {}
        self._token_expiry = 0

    def get_token(self) -> str:
        now = time.time()
        if now >= self._token_expiry - 300:
            self._refresh_token()
        return self._token_cache.get("access_token")

    def _refresh_token(self):
        try:
            url = f"https://login.{self.env}/oauth/token"
            headers = {"Content-Type": "application/x-www-form-urlencoded"}
            data = {
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret
            }
            response = requests.post(url, headers=headers, data=data)
            response.raise_for_status()
            token_data = response.json()
            self._token_cache = token_data
            self._token_expiry = time.time() + token_data.get("expires_in", 3600)
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"Failed to refresh OAuth token: {e}")

    def get_api_client(self) -> ApiClient:
        config = Configuration()
        config.host = f"https://api.{self.env}"
        config.access_token = self.get_token()
        return ApiClient(config)


auth_service = GenesysAuth()
# --- DLQ Database Functions ---
def init_db():
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS failed_webhooks (
            id TEXT PRIMARY KEY,
            target_url TEXT NOT NULL,
            payload TEXT NOT NULL,
            error_status INTEGER,
            error_message TEXT,
            retry_count INTEGER DEFAULT 0,
            next_retry_at TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
    """)
    conn.commit()
    conn.close()


def enqueue_failed_webhook(target_url: str, payload: dict, status_code: int, error_msg: str):
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    webhook_id = str(uuid.uuid4())
    next_retry_timestamp = datetime.utcnow().isoformat()
    cursor.execute("""
        INSERT INTO failed_webhooks (id, target_url, payload, error_status, error_message, retry_count, next_retry_at)
        VALUES (?, ?, ?, ?, ?, 0, ?)
    """, (webhook_id, target_url, json.dumps(payload), status_code, error_msg, next_retry_timestamp))
    conn.commit()
    conn.close()
    return webhook_id
def process_dlq_queue():
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    now = datetime.utcnow().isoformat()
    cursor.execute("""
        SELECT id, target_url, payload, retry_count, error_status
        FROM failed_webhooks
        WHERE next_retry_at <= ? AND retry_count < ?
    """, (now, MAX_RETRIES))
    pending_items = cursor.fetchall()
    for item in pending_items:
        webhook_id, target_url, payload_json, retry_count, _ = item
        payload = json.loads(payload_json)
        print(f"Retrying webhook {webhook_id} (Attempt {retry_count + 1}/{MAX_RETRIES})")
        success = False
        try:
            response = requests.post(target_url, json=payload, timeout=5)
            success = 200 <= response.status_code < 300
        except requests.exceptions.RequestException:
            success = False
        if success:
            cursor.execute("DELETE FROM failed_webhooks WHERE id = ?", (webhook_id,))
            print(f"Webhook {webhook_id} delivered successfully.")
        else:
            new_retry_count = retry_count + 1
            backoff_seconds = BASE_DELAY_SECONDS * (2 ** new_retry_count)
            next_retry_time = datetime.utcnow() + timedelta(seconds=backoff_seconds)
            cursor.execute("""
                UPDATE failed_webhooks
                SET retry_count = ?, next_retry_at = ?
                WHERE id = ?
            """, (new_retry_count, next_retry_time.isoformat(), webhook_id))
            if new_retry_count >= MAX_RETRIES:
                print(f"Webhook {webhook_id} exceeded max retries. Dead.")
        # Commit per item so the receiver thread is not locked out
        # of the database while the next HTTP call is in flight
        conn.commit()
    conn.close()
# --- HTTP Handler for Webhook Reception ---
class WebhookHandler(BaseHTTPRequestHandler):
    def _send_json(self, status_code: int, body: dict):
        self.send_response(status_code)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps(body).encode())

    def do_POST(self):
        # Treat a missing Content-Length header as an empty body
        content_length = int(self.headers.get("Content-Length", 0))
        post_data = self.rfile.read(content_length)
        try:
            payload = json.loads(post_data)
        except ValueError:
            # Not valid JSON: nothing useful to forward or retry
            self._send_json(400, {"status": "invalid_json"})
            return
        # Simulate forwarding to a downstream service that might fail
        downstream_url = os.getenv("DOWNSTREAM_URL", "http://localhost:9090/log")
        try:
            # Simulate a 500 error for demonstration if flag is set
            if os.getenv("SIMULATE_FAILURE") == "true":
                raise Exception("Simulated Downstream 500 Error")
            response = requests.post(downstream_url, json=payload, timeout=5)
            if response.status_code >= 500:
                enqueue_failed_webhook(downstream_url, payload, response.status_code, response.text)
                self._send_json(200, {"status": "accepted_for_retry"})
            elif 200 <= response.status_code < 300:
                self._send_json(200, {"status": "success"})
            else:
                self._send_json(200, {"status": "dropped"})
        except Exception as e:
            enqueue_failed_webhook(downstream_url, payload, 503, str(e))
            self._send_json(200, {"status": "accepted_for_retry"})

    def log_message(self, format, *args):
        pass  # Suppress default logging
def start_webhook_server():
    server = HTTPServer(('0.0.0.0', PORT), WebhookHandler)
    print(f"Webhook receiver listening on port {PORT}")
    server.serve_forever()


def start_dlq_worker():
    while True:
        process_dlq_queue()
        time.sleep(5)  # Check queue every 5 seconds


if __name__ == "__main__":
    init_db()
    # Start webhook receiver in a thread
    receiver_thread = threading.Thread(target=start_webhook_server, daemon=True)
    receiver_thread.start()
    # Start DLQ worker in a thread
    worker_thread = threading.Thread(target=start_dlq_worker, daemon=True)
    worker_thread.start()
    print("System started. Send POST requests to http://localhost:8080")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Shutting down...")
Common Errors & Debugging
Error: 401 Unauthorized on Webhooks API Call
- Cause: The OAuth token has expired or the client credentials are incorrect.
- Fix: Verify your .env file contains the correct GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET. Ensure the token refresh logic is triggered before the API call. In the code above, auth_service.get_token() handles this automatically.
Error: 429 Too Many Requests
- Cause: You are polling the DLQ or retrying webhooks too frequently, hitting Genesys or your downstream service rate limits.
- Fix: Implement exponential backoff. The code above waits BASE_DELAY_SECONDS * (2 ** retry_count) seconds between attempts. If you are still hitting limits, increase BASE_DELAY_SECONDS or add jitter (randomness) to the delay.
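Jitter spreads retries out so that many items that failed at the same moment do not all retry at the same moment. A minimal sketch of the "full jitter" variant, in which the delay is drawn uniformly between zero and the exponential ceiling. The MAX_DELAY_SECONDS cap and the backoff_with_jitter name are assumptions for illustration, not part of the code above.

```python
import random

BASE_DELAY_SECONDS = 10
MAX_DELAY_SECONDS = 600  # hypothetical cap, not part of the original code


def backoff_with_jitter(retry_count: int, rng=None) -> float:
    """Full jitter: a uniform delay between 0 and the capped exponential delay."""
    ceiling = min(MAX_DELAY_SECONDS, BASE_DELAY_SECONDS * (2 ** retry_count))
    # An explicit random.Random instance can be passed in for reproducible tests.
    uniform = rng.uniform if rng is not None else random.uniform
    return uniform(0, ceiling)


print(backoff_with_jitter(3))  # some value between 0 and 80
```

Swapping this in for the deterministic formula trades a predictable schedule for a smoother load on the downstream service.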
Error: SQLite Database is Locked
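- Cause: SQLite allows only one writer at a time. A second connection that tries to write waits for the lock and raises this error if the wait exceeds its timeout (5 seconds by default in Python's sqlite3 module). Here the receiver thread and the worker thread both write to the same file.
- Fix: Each function above opens its own connection, so no connection is shared across threads. Keep write transactions short by committing promptly, raise the connection timeout, or enable WAL mode. If you scale to multiple processes, use a store with better concurrency such as PostgreSQL or Redis. For this tutorial, the single-process SQLite implementation is sufficient.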
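Two connection settings are commonly used to reduce lock errors: a longer busy timeout and write-ahead logging. The helper below is a sketch under those assumptions. The open_dlq_connection name and the 30-second value are illustrative choices, not part of the code above.

```python
import sqlite3


def open_dlq_connection(db_path: str) -> sqlite3.Connection:
    # Wait up to 30 seconds for a competing writer instead of the default 5.
    conn = sqlite3.connect(db_path, timeout=30)
    # WAL lets readers proceed while one writer is active.
    # It applies to file-backed databases, not to ":memory:".
    conn.execute("PRAGMA journal_mode=WAL")
    return conn
```

Replacing each sqlite3.connect(DB_PATH) call with open_dlq_connection(DB_PATH) would apply both settings everywhere. The journal mode is stored in the database file, so it persists across connections once set.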
Error: Webhook Payload Missing Required Fields
- Cause: The downstream service expects specific fields that Genesys Cloud did not send.
- Fix: Use the verify_webhook_config function to inspect the event_type. Ensure the event_type matches the payload structure you expect. For example, routing:conversation:updated payloads differ significantly from analytics:report:ready.
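A cheap guard is to check for required fields before forwarding, so that a payload the downstream service is bound to reject is logged instead of retried. The field names below are hypothetical placeholders. Substitute whatever your downstream service actually requires.

```python
from typing import Iterable, List


def missing_fields(payload: dict, required: Iterable[str]) -> List[str]:
    """Return the required top-level keys that are absent from the payload."""
    return [name for name in required if name not in payload]


# Hypothetical field names, for illustration only.
REQUIRED_FIELDS = ("id", "eventType")

sample = {"id": "abc-123"}
print(missing_fields(sample, REQUIRED_FIELDS))  # ['eventType']
```

An empty list means the payload has every required key. A non-empty list can be written to the log alongside the event type to show which webhook configuration produced the mismatch.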