Implementing a Dead Letter Queue for Genesys Cloud Webhook 5xx Failures
What You Will Build
- A Python service that receives Genesys Cloud webhooks, stores any that fail downstream with a 5xx error in a persistent Dead Letter Queue (DLQ), and retries them with exponential backoff.
- This solution uses the Genesys Cloud Webhooks API (/api/v2/webhooks) and standard HTTP libraries.
- The programming language covered is Python 3.9+ using requests, sqlalchemy, and schedule, with FastAPI and Uvicorn serving the webhook endpoint.
Prerequisites
- OAuth Client Type: Service Account (Client Credentials Grant).
- Required Scopes: webhook:read, webhook:write, integration:read.
- SDK/API Version: Genesys Cloud API v2.
- Language/Runtime: Python 3.9 or higher.
- External Dependencies:
  - requests (HTTP client)
  - sqlalchemy (database ORM for DLQ persistence)
  - schedule (job scheduler for retries)
  - fastapi and uvicorn (web framework and ASGI server for the webhook endpoint)
  - sqlite3 (database engine for this example; part of Python's standard library, so nothing to install)
Authentication Setup
To interact with the Genesys Cloud Webhooks API, you must obtain a valid OAuth 2.0 access token. This tutorial uses the Client Credentials flow, which is standard for backend services that operate without user interaction.
The following class handles token acquisition, caching, and automatic refresh before expiration. The token lifetime is configured on the OAuth client in Genesys Cloud and returned in the expires_in field of the token response, so the class reads it from the response instead of hardcoding it. It caches the token to avoid unnecessary network calls. The token endpoint lives on your region's login host (for example, https://login.mypurecloud.com/oauth/token for US East), so environment is the region's domain, such as mypurecloud.com.
import json
import time
from typing import Optional

import requests


class GenesysAuth:
    def __init__(self, environment: str, client_id: str, client_secret: str):
        # environment is the region's domain, e.g. "mypurecloud.com" (US East)
        self.environment = environment
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = f"https://login.{environment}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0

    def get_token(self) -> str:
        """
        Returns a valid OAuth access token.
        Refreshes the token if it has expired or will expire within 60 seconds.
        """
        current_time = time.time()

        # Return cached token if valid and not expiring soon
        if self.access_token and current_time < (self.token_expiry - 60):
            return self.access_token

        # Fetch a new token. The client credentials go in an HTTP Basic auth
        # header; requests form-encodes the data dict automatically.
        data = {"grant_type": "client_credentials"}
        try:
            response = requests.post(
                self.token_url,
                data=data,
                auth=(self.client_id, self.client_secret),
                timeout=10,
            )
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            # Use the lifetime reported by the server rather than assuming one
            self.token_expiry = current_time + float(token_data.get("expires_in", 0))
            return self.access_token
        except requests.exceptions.HTTPError as e:
            if response.status_code == 401:
                raise Exception("Invalid Client ID or Secret provided.") from e
            raise Exception(f"Failed to acquire token: {response.text}") from e
        except requests.exceptions.RequestException as e:
            raise Exception(f"Network error during token acquisition: {str(e)}") from e
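Separating the caching rule from the HTTP call makes the refresh logic testable without contacting Genesys Cloud. The sketch below uses a hypothetical CachedToken helper, which is not part of any Genesys SDK. It assumes fetch is a callable of your own that performs the token POST and returns the access_token and expires_in values from the response. It also assumes the clock is injectable, so you can simulate the passage of time.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Caches a bearer token and refreshes it shortly before it expires.

    fetch: hypothetical callable returning (access_token, expires_in_seconds).
    clock: injectable time source so the refresh rule can be tested instantly.
    Not thread-safe; add a threading.Lock if several threads share one instance.
    """

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, int]],
        refresh_margin: float = 60.0,
        clock: Callable[[], float] = time.time,
    ):
        self._fetch = fetch
        self._margin = refresh_margin
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        # Refresh when there is no token or it expires within the margin
        if self._token is None or now >= self._expires_at - self._margin:
            token, expires_in = self._fetch()
            self._token = token
            # Trust the lifetime the server reported instead of a hardcoded value
            self._expires_at = now + expires_in
        return self._token

    def invalidate(self) -> None:
        """Force the next get() to fetch a new token (for example after a 401)."""
        self._token = None
```

With a fake clock and a fetch function that reports a 900-second lifetime, get() returns the cached token at t = 800. It fetches a new one at t = 850, because that falls inside the 60-second margin before expiry.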
Implementation
Step 1: Defining the Dead Letter Queue Schema
A robust DLQ must persist failed payloads to ensure no data is lost during service restarts or crashes. We use SQLAlchemy with SQLite for simplicity, but this schema maps directly to PostgreSQL or MySQL.
The DLQ record stores:
- The original webhook payload (JSON).
- The target endpoint URL.
- The HTTP status code that caused the failure.
- The number of retry attempts.
- The next scheduled retry timestamp.
import uuid
from datetime import datetime, timezone

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class FailedWebhook(Base):
    __tablename__ = "failed_webhooks"

    id = Column(Integer, primary_key=True, autoincrement=True)
    external_id = Column(String, unique=True, default=lambda: str(uuid.uuid4()))
    target_url = Column(String, nullable=False)
    original_payload = Column(Text, nullable=False)  # JSON-encoded webhook body
    http_status_code = Column(Integer, nullable=False)
    error_message = Column(String, nullable=True)
    retry_count = Column(Integer, default=0)
    # timezone=True keeps the UTC offset on databases that support it (e.g. PostgreSQL);
    # SQLite stores the value as text without an offset
    next_retry_at = Column(DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc))
    created_at = Column(DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc))
    is_resolved = Column(Integer, default=0)  # 0 = pending, 1 = resolved


# Setup Database Engine
DB_URL = "sqlite:///dlq.db"
engine = create_engine(DB_URL, echo=False)
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
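To see what the worker's "ready for retry" query does at the SQL level, the sketch below mirrors the failed_webhooks table using the standard-library sqlite3 module. The column types, the fixed-width UTC timestamp format, and the helper names are assumptions for illustration only; SQLAlchemy generates its own DDL.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

# Hypothetical raw-SQL mirror of the FailedWebhook model
DDL = """
CREATE TABLE IF NOT EXISTS failed_webhooks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    external_id TEXT UNIQUE,
    target_url TEXT NOT NULL,
    original_payload TEXT NOT NULL,
    http_status_code INTEGER NOT NULL,
    error_message TEXT,
    retry_count INTEGER DEFAULT 0,
    next_retry_at TEXT NOT NULL,
    created_at TEXT NOT NULL,
    is_resolved INTEGER DEFAULT 0
)
"""


def utc_text(dt: datetime) -> str:
    # Fixed-width UTC text sorts and compares correctly as a string
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")


def due_items(conn: sqlite3.Connection, now: datetime) -> list:
    # Same filter as the worker: unresolved and scheduled at or before now
    rows = conn.execute(
        "SELECT external_id FROM failed_webhooks "
        "WHERE is_resolved = 0 AND next_retry_at <= ? ORDER BY next_retry_at",
        (utc_text(now),),
    )
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute(DDL)
now = datetime.now(timezone.utc)
# (external_id, seconds from now, is_resolved)
for ext_id, offset, resolved in [("a", -5, 0), ("b", 30, 0), ("c", -5, 1)]:
    conn.execute(
        "INSERT INTO failed_webhooks (external_id, target_url, original_payload, "
        "http_status_code, next_retry_at, created_at, is_resolved) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)",
        (ext_id, "https://example.invalid/process", "{}", 503,
         utc_text(now + timedelta(seconds=offset)), utc_text(now), resolved),
    )
print(due_items(conn, now))  # ['a']
```

Only "a" is returned. Record "b" is scheduled 30 seconds in the future, and "c" is already resolved.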
Step 2: Capturing the Failed Webhook
When a third-party system sends a webhook to your endpoint, your server processes it. If the downstream system (or your own processing logic) returns a 5xx error, you must capture that failure immediately.
In this scenario, assume your application receives a webhook from Genesys Cloud. Your application attempts to forward this data to an internal analytics engine. If the analytics engine returns 503 Service Unavailable, you push the original Genesys payload into the DLQ.
import json
import logging
from datetime import datetime, timezone

import requests
from fastapi import FastAPI, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import JSONResponse

app = FastAPI()
logger = logging.getLogger(__name__)

# Absolute URL of the internal analytics service (placeholder). The retry worker
# posts to the stored target_url later, so it must be a full URL, not a path.
ANALYTICS_URL = "https://analytics.internal.example.com/ingest"


@app.post("/webhook/genesys/incoming")
async def receive_genesys_webhook(request: Request):
    """
    Endpoint that receives webhooks from Genesys Cloud.
    """
    try:
        payload = await request.json()
    except ValueError:
        # Malformed body: retrying it will not help, so reject it outright
        return JSONResponse(content={"error": "Invalid JSON"}, status_code=400)

    try:
        # requests is blocking, so run the forward in a worker thread
        internal_response = await run_in_threadpool(forward_to_analytics, payload)
        status_code = internal_response.status_code
    except requests.exceptions.RequestException as e:
        # Connection errors and timeouts are transient; record them as 503
        logger.warning(f"Forwarding failed: {e}")
        status_code = 503

    if status_code >= 500:
        # Persist the original payload. If that fails, return 500 so the sender
        # can redeliver instead of the event being lost.
        if not await run_in_threadpool(push_to_dlq, payload, ANALYTICS_URL, status_code):
            return JSONResponse(content={"error": "Internal Server Error"}, status_code=500)
        # Return 200 to acknowledge receipt so Genesys has no reason to redeliver.
        # From here on, retries are handled by our DLQ.
        return JSONResponse(content={"status": "received_but_failed_downstream"}, status_code=200)

    return JSONResponse(content={"status": "processed"}, status_code=200)


def forward_to_analytics(payload: dict) -> requests.Response:
    """
    Sends the payload to the internal analytics service.
    """
    return requests.post(ANALYTICS_URL, json=payload, timeout=10)


def push_to_dlq(payload: dict, target_url: str, status_code: int) -> bool:
    """
    Persists the failed webhook to the database. Returns True on success.
    """
    db = SessionLocal()
    try:
        db_entry = FailedWebhook(
            target_url=target_url,
            original_payload=json.dumps(payload),
            http_status_code=status_code,
            retry_count=0,
            next_retry_at=datetime.now(timezone.utc),
        )
        db.add(db_entry)
        db.commit()
        logger.info(f"Pushed failed webhook to DLQ: {db_entry.external_id}")
        return True
    except Exception as e:
        db.rollback()
        logger.error(f"Failed to push to DLQ: {str(e)}")
        return False
    finally:
        db.close()
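The handler above dead-letters a forward when the downstream returns a 5xx status. Making that policy an explicit, testable predicate helps when you later decide which other failures deserve a retry. The sketch below is a hypothetical helper. Treating 408 and 429 as retryable and every other 4xx as permanent is an assumption about typical HTTP semantics, not something Genesys Cloud prescribes.

```python
from typing import Optional

# 4xx codes treated as transient in addition to all 5xx (assumed; adjust per service)
RETRYABLE_4XX = {408, 429}


def should_dead_letter(status_code: Optional[int]) -> bool:
    """Decide whether a failed forward belongs in the DLQ.

    status_code is None when no response arrived at all
    (connection refused, DNS failure, timeout).
    """
    if status_code is None:
        return True  # network failure: usually transient
    if 500 <= status_code <= 599:
        return True  # server-side failure: the case this tutorial targets
    # 2xx means success; other 4xx mean the request itself is bad, so retrying won't help
    return status_code in RETRYABLE_4XX
```

Keeping permanent 4xx failures out of the DLQ stops the worker from spending its retry budget on payloads that will never succeed.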
Step 3: Implementing the Retry Logic with Exponential Backoff
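The core of the DLQ is the retry worker. It polls the database for unresolved records whose next_retry_at is in the past and resends each payload. If a retry fails, the worker increments retry_count and schedules the next attempt with exponential backoff, base_delay * 2^retry_count. With BASE_DELAY_SECONDS = 1, this produces waits of 2s, 4s, 8s, and 16s between attempts. Because the worker polls every 10 seconds, the shorter waits are effectively rounded up to the polling interval.
We also enforce a maximum number of retries (MAX_RETRIES = 5). Once it is reached, the record is marked is_resolved = 1 so the worker stops picking it up. The row stays in the table, with the last error in error_message, for manual inspection.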
import json
import logging
from datetime import datetime, timedelta, timezone

import requests

logger = logging.getLogger(__name__)

MAX_RETRIES = 5
BASE_DELAY_SECONDS = 1  # Base delay for the backoff formula


def calculate_backoff(retry_count: int) -> float:
    """
    Calculates exponential backoff delay.
    Formula: base_delay * (2 ^ retry_count)
    """
    return BASE_DELAY_SECONDS * (2 ** retry_count)


def process_dlq_queue(auth: GenesysAuth):
    """
    Worker function that processes the Dead Letter Queue.
    Should be run periodically (e.g., every 10 seconds).
    """
    db = SessionLocal()
    try:
        # Find all failed webhooks that are ready for retry
        now = datetime.now(timezone.utc)
        pending_items = db.query(FailedWebhook).filter(
            FailedWebhook.is_resolved == 0,
            FailedWebhook.next_retry_at <= now,
        ).all()

        for item in pending_items:
            try:
                payload = json.loads(item.original_payload)
                # Fetch the token per item so it is refreshed mid-batch if needed.
                # Only send a Genesys token if the target expects one; an internal
                # service may require different credentials.
                headers = {
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {auth.get_token()}",
                }
                response = requests.post(item.target_url, json=payload, headers=headers, timeout=10)
                if 200 <= response.status_code < 300:
                    # Success
                    item.is_resolved = 1
                    logger.info(f"Successfully retried webhook {item.external_id}")
                else:
                    # Failure
                    handle_retry_failure(db, item, response.status_code, response.text)
            except requests.exceptions.RequestException as e:
                # Network error or timeout
                handle_retry_failure(db, item, 0, str(e))
            except Exception as e:
                logger.error(f"Unexpected error processing DLQ item {item.external_id}: {str(e)}")
                handle_retry_failure(db, item, 0, "Unexpected internal error")
            # Commit per item so a crash mid-batch does not resend delivered payloads
            db.commit()
    except Exception as e:
        db.rollback()
        logger.error(f"Error processing DLQ queue: {str(e)}")
    finally:
        db.close()


def handle_retry_failure(db_session, item: FailedWebhook, status_code: int, error_msg: str):
    """
    Updates the DLQ item with retry information and calculates next retry time.
    """
    item.retry_count += 1
    item.error_message = f"Status: {status_code}, Msg: {error_msg[:200]}"  # Truncate error msg
    if item.retry_count >= MAX_RETRIES:
        # Stop retrying; the row stays in the table for manual inspection
        item.is_resolved = 1
        logger.warning(f"Max retries reached for webhook {item.external_id}. Moving to archive.")
    else:
        # Calculate next retry time with exponential backoff
        delay_seconds = calculate_backoff(item.retry_count)
        next_retry = datetime.now(timezone.utc) + timedelta(seconds=delay_seconds)
        item.next_retry_at = next_retry
        logger.info(f"Retry {item.retry_count}/{MAX_RETRIES} for {item.external_id}. Next attempt at {next_retry}")
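To see what this schedule means in wall-clock time, the sketch below replays the worker's rules for an item that fails every attempt. It assumes the item becomes due at t = 0 and the worker polls at exact multiples of the interval. It also assumes each attempt takes negligible time. retry_timeline is a hypothetical helper, not part of the service.

```python
import math

MAX_RETRIES = 5
BASE_DELAY_SECONDS = 1


def calculate_backoff(retry_count: int) -> float:
    return BASE_DELAY_SECONDS * (2 ** retry_count)


def retry_timeline(poll_interval: int) -> list:
    """Approximate start time (seconds) of each retry when every attempt fails."""
    attempts = []
    due = 0  # push_to_dlq sets next_retry_at = now
    for retry_count in range(1, MAX_RETRIES + 1):
        # The worker only notices the item at the first poll at or after it is due
        run_at = math.ceil(due / poll_interval) * poll_interval
        attempts.append(run_at)
        # handle_retry_failure increments retry_count, then schedules the next attempt
        due = run_at + calculate_backoff(retry_count)
    return attempts
```

With a 10-second poll, retry_timeline(10) gives attempts at roughly [0, 10, 20, 30, 50] seconds, so the first backoff steps are dominated by the polling interval. With a 1-second poll, retry_timeline(1) gives [0, 2, 6, 14, 30]. Raise BASE_DELAY_SECONDS if you want the backoff itself to control spacing.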
Step 4: Orchestrating the Worker
Use the schedule library to run the process_dlq_queue function periodically. This ensures that failed webhooks are retried without blocking the main application thread.
import threading
import time

import schedule


def start_dlq_worker(auth: GenesysAuth, interval_seconds: int = 10):
    """
    Starts a background thread that runs the DLQ processor periodically.
    """
    def job():
        process_dlq_queue(auth)

    schedule.every(interval_seconds).seconds.do(job)

    def run_scheduler():
        while True:
            schedule.run_pending()
            time.sleep(1)

    thread = threading.Thread(target=run_scheduler, daemon=True)
    thread.start()
    logger.info(f"DLQ Worker started. Checking every {interval_seconds} seconds.")
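The scheduler loop above never exits, which makes clean shutdown and testing awkward. One common alternative, sketched here without the schedule library, sleeps with threading.Event.wait() so the thread can be stopped on demand. start_periodic_worker is a hypothetical name. You would start it with start_periodic_worker(lambda: process_dlq_queue(auth), 10) and call .set() on the returned event during application shutdown.

```python
import logging
import threading
from typing import Callable

logger = logging.getLogger(__name__)


def start_periodic_worker(job: Callable[[], None], interval_seconds: float) -> threading.Event:
    """Run job every interval_seconds on a daemon thread until the returned event is set.

    Event.wait() doubles as the sleep, so setting the event ends the loop promptly
    instead of waiting for the next tick.
    """
    stop = threading.Event()

    def loop() -> None:
        while not stop.is_set():
            try:
                job()
            except Exception:
                # Keep the worker alive if a single run fails
                logger.exception("DLQ job failed")
            stop.wait(interval_seconds)

    threading.Thread(target=loop, daemon=True, name="dlq-worker").start()
    return stop
```

Because job() runs sequentially on one thread, a slow run delays the next one instead of overlapping with it. That is the same guarantee the schedule-based loop provides.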
Complete Working Example
Below is the complete, runnable Python script. It combines authentication, database setup, webhook reception, and the DLQ retry worker.
Instructions:
- Install dependencies: pip install requests sqlalchemy schedule fastapi uvicorn
- Set environment variables: GENESYS_ENV, GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET.
- Run the script: python dlq_webhook_service.py
import json
import logging
import os
import random
import threading
import time
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional

import requests
import schedule
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text
from sqlalchemy.orm import declarative_base, sessionmaker
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

# Configure Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Configuration ---
# GENESYS_ENV is your region's domain, e.g. "mypurecloud.com" for US East
GENESYS_ENV = os.getenv("GENESYS_ENV", "mypurecloud.com")
GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID", "")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET", "")
DB_URL = "sqlite:///dlq.db"
MAX_RETRIES = 5
BASE_DELAY_SECONDS = 1

# --- Database Setup ---
Base = declarative_base()


class FailedWebhook(Base):
    __tablename__ = "failed_webhooks"

    id = Column(Integer, primary_key=True, autoincrement=True)
    external_id = Column(String, unique=True, default=lambda: str(uuid.uuid4()))
    target_url = Column(String, nullable=False)
    original_payload = Column(Text, nullable=False)
    http_status_code = Column(Integer, nullable=False)
    error_message = Column(String, nullable=True)
    retry_count = Column(Integer, default=0)
    next_retry_at = Column(DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc))
    created_at = Column(DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc))
    is_resolved = Column(Integer, default=0)  # 0 = pending, 1 = resolved


engine = create_engine(DB_URL, echo=False)
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


# --- Authentication ---
class GenesysAuth:
    def __init__(self, environment: str, client_id: str, client_secret: str):
        self.environment = environment
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = f"https://login.{environment}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0

    def get_token(self) -> str:
        current_time = time.time()
        if self.access_token and current_time < (self.token_expiry - 60):
            return self.access_token
        try:
            # Client credentials grant: credentials go in an HTTP Basic auth header
            response = requests.post(
                self.token_url,
                data={"grant_type": "client_credentials"},
                auth=(self.client_id, self.client_secret),
                timeout=10,
            )
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            # Use the lifetime reported by the server
            self.token_expiry = current_time + float(token_data.get("expires_in", 0))
            return self.access_token
        except Exception as e:
            logger.error(f"Auth Error: {str(e)}")
            raise


# --- DLQ Logic ---
def calculate_backoff(retry_count: int) -> float:
    return BASE_DELAY_SECONDS * (2 ** retry_count)


def handle_retry_failure(db_session, item: FailedWebhook, status_code: int, error_msg: str):
    item.retry_count += 1
    item.error_message = f"Status: {status_code}, Msg: {error_msg[:200]}"
    if item.retry_count >= MAX_RETRIES:
        item.is_resolved = 1  # stop retrying; keep the row for manual inspection
        logger.warning(f"Max retries reached for {item.external_id}.")
    else:
        delay_seconds = calculate_backoff(item.retry_count)
        next_retry = datetime.now(timezone.utc) + timedelta(seconds=delay_seconds)
        item.next_retry_at = next_retry
        logger.info(f"Retry {item.retry_count}/{MAX_RETRIES} for {item.external_id}. Next at {next_retry}")


def process_dlq_queue(auth: GenesysAuth):
    db = SessionLocal()
    try:
        now = datetime.now(timezone.utc)
        pending_items = db.query(FailedWebhook).filter(
            FailedWebhook.is_resolved == 0,
            FailedWebhook.next_retry_at <= now,
        ).all()
        for item in pending_items:
            try:
                payload = json.loads(item.original_payload)
                headers = {
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {auth.get_token()}",
                }
                # Retry the request to the target URL
                response = requests.post(item.target_url, json=payload, headers=headers, timeout=10)
                if 200 <= response.status_code < 300:
                    item.is_resolved = 1
                    logger.info(f"Successfully retried webhook {item.external_id}")
                else:
                    handle_retry_failure(db, item, response.status_code, response.text)
            except requests.exceptions.RequestException as e:
                handle_retry_failure(db, item, 0, str(e))
            except Exception as e:
                logger.error(f"Error processing {item.external_id}: {str(e)}")
                handle_retry_failure(db, item, 0, "Internal Error")
            db.commit()  # persist each outcome immediately
    except Exception as e:
        db.rollback()
        logger.error(f"DLQ Processing Error: {str(e)}")
    finally:
        db.close()


# --- FastAPI App ---
app = FastAPI()
auth = GenesysAuth(GENESYS_ENV, GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET)


@app.post("/webhook/genesys/incoming")
async def receive_genesys_webhook(request: Request):
    try:
        payload = await request.json()
    except ValueError:
        # Malformed body: nothing useful to queue
        return JSONResponse(content={"error": "Invalid JSON"}, status_code=400)

    try:
        # Simulate downstream failure for demonstration
        # In production, replace this with actual business logic
        if random.random() < 0.5:  # 50% chance of simulated failure
            raise RuntimeError("Simulated Downstream 503 Error")
        return JSONResponse(content={"status": "success"}, status_code=200)
    except Exception as e:
        logger.warning(f"Downstream processing failed: {e}")

    # Push to DLQ
    db = SessionLocal()
    try:
        db_entry = FailedWebhook(
            target_url="https://your-internal-api.com/process",  # Replace with actual target
            original_payload=json.dumps(payload),
            http_status_code=503,
            retry_count=0,
            next_retry_at=datetime.now(timezone.utc),
        )
        db.add(db_entry)
        db.commit()
        logger.info(f"Pushed to DLQ: {db_entry.external_id}")
    except Exception as db_err:
        db.rollback()
        logger.error(f"DB Error: {str(db_err)}")
        # Nothing was stored, so ask the sender to redeliver
        return JSONResponse(content={"error": "Internal Server Error"}, status_code=500)
    finally:
        db.close()

    # Return 200 so Genesys does not redeliver; the DLQ owns retries from here
    return JSONResponse(content={"status": "queued_for_retry"}, status_code=200)


def start_scheduler():
    schedule.every(10).seconds.do(process_dlq_queue, auth)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    import uvicorn

    # Start DLQ Worker in background thread
    worker_thread = threading.Thread(target=start_scheduler, daemon=True)
    worker_thread.start()

    # Start FastAPI Server
    uvicorn.run(app, host="0.0.0.0", port=8000)
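Records that exhausted their retries stay in dlq.db with is_resolved = 1. Because a successful retry never increments retry_count, those records can be told apart from successes by retry_count >= MAX_RETRIES. The hypothetical helper below runs that query with the standard-library sqlite3 module. It assumes the table layout created by the script above.

```python
import sqlite3

MAX_RETRIES = 5  # must match the value used by the worker


def permanently_failed(conn: sqlite3.Connection, max_retries: int = MAX_RETRIES) -> list:
    """Return rows that stopped retrying because they hit the retry limit."""
    return conn.execute(
        "SELECT external_id, http_status_code, error_message, retry_count "
        "FROM failed_webhooks "
        "WHERE is_resolved = 1 AND retry_count >= ? "
        "ORDER BY created_at",
        (max_retries,),
    ).fetchall()
```

For example, permanently_failed(sqlite3.connect("dlq.db")) lists each exhausted record with its last error message, ready for manual replay or investigation.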
Common Errors & Debugging
Error: 401 Unauthorized during Retry
- Cause: The OAuth token used in the retry header has expired.
- Fix: Ensure your GenesysAuth.get_token() method is called immediately before every retry request, and that the token cache checks expiration.
- Code Fix: In process_dlq_queue, call auth.get_token() fresh for each item.
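Calling get_token() per item still cannot recover from a token that is rejected before its cached expiry, for example because it was revoked. A common pattern, sketched below with hypothetical names, is to clear the cache and retry once on a 401. With the GenesysAuth class, invalidate_token could be lambda: setattr(auth, "access_token", None), and send would wrap the requests.post call and return its status code.

```python
from typing import Callable


def send_with_refresh(
    send: Callable[[str], int],
    get_token: Callable[[], str],
    invalidate_token: Callable[[], None],
) -> int:
    """Send once; on 401, drop the cached token, fetch a new one, and retry once.

    send: hypothetical callable that performs the request with the given bearer
    token and returns the HTTP status code.
    """
    status = send(get_token())
    if status == 401:
        invalidate_token()  # the cached token was rejected (expired or revoked)
        status = send(get_token())
    return status
```

Retrying only once keeps a genuinely misconfigured client from looping on the token endpoint.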
Error: 429 Too Many Requests
- Cause: The retry worker is firing too fast, overwhelming the target endpoint or Genesys Cloud APIs.
- Fix: Increase BASE_DELAY_SECONDS or add jitter. A small random delay in the backoff calculation keeps many failed items from retrying in lockstep (the "thundering herd" problem).
- Code Fix: Modify calculate_backoff:
import random

def calculate_backoff(retry_count: int) -> float:
    base = BASE_DELAY_SECONDS * (2 ** retry_count)
    jitter = random.uniform(0, 1)  # up to one extra second
    return base + jitter
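The jitter in the snippet above adds at most one second. A different technique worth considering is "full jitter", which picks the whole delay at random up to a capped exponential bound. You can combine it with honoring a Retry-After header when the target sends one. The sketch below is not a fix to the snippet above but an alternative. The 300-second cap and all helper names are assumptions, and only the delta-seconds form of Retry-After is handled.

```python
import random
from typing import Optional

BASE_DELAY_SECONDS = 1
MAX_DELAY_SECONDS = 300  # assumed cap so late retries don't wait for hours


def full_jitter_backoff(retry_count: int, rng: Optional[random.Random] = None) -> float:
    """Delay chosen uniformly between 0 and the capped exponential bound."""
    bound = min(MAX_DELAY_SECONDS, BASE_DELAY_SECONDS * (2 ** retry_count))
    return (rng or random).uniform(0, bound)


def retry_after_seconds(header_value: Optional[str]) -> Optional[int]:
    """Parse Retry-After given as delta-seconds; the HTTP-date form is ignored here."""
    if header_value is None:
        return None
    try:
        seconds = int(header_value.strip())
    except ValueError:
        return None
    return seconds if seconds >= 0 else None


def next_delay(retry_count: int, retry_after_header: Optional[str] = None,
               rng: Optional[random.Random] = None) -> float:
    delay = full_jitter_backoff(retry_count, rng)
    hinted = retry_after_seconds(retry_after_header)
    # Never retry sooner than the server asked
    return max(delay, hinted) if hinted is not None else delay
```

In handle_retry_failure, you would pass response.headers.get("Retry-After") when the failure came from an HTTP response.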
Error: Database Lock (SQLite)
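- Cause: SQLite allows only one writer at a time. When the webhook handler and the retry worker write concurrently, a writer that cannot obtain the lock within the driver's timeout fails with a "database is locked" error.
- Fix: For production, switch DB_URL to PostgreSQL or MySQL. In this example, SessionLocal() creates a new session per call and closes it afterwards, so each thread uses its own connection; this is safe for low-to-medium concurrency. If you pass connect_args={"check_same_thread": False} to create_engine, that disables SQLite's same-thread check, so make sure sessions are still never shared across threads.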
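If you stay on SQLite, two settings usually reduce lock errors. The first is a longer busy timeout; in SQLAlchemy you can pass it as create_engine(DB_URL, connect_args={"timeout": 30}). The second is write-ahead logging (WAL), which lets readers proceed while a writer holds the lock. The sketch below shows both with the standard-library sqlite3 module. open_dlq_connection is a hypothetical helper, and the 30-second value is an assumption.

```python
import sqlite3


def open_dlq_connection(path: str) -> sqlite3.Connection:
    # timeout: seconds a connection waits for a lock before raising
    # "database is locked" (sqlite3's default is 5.0)
    conn = sqlite3.connect(path, timeout=30.0)
    # WAL lets readers proceed while one writer holds the lock; the setting
    # persists in the database file once enabled
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    if mode.lower() != "wal":
        conn.close()
        raise RuntimeError(f"could not enable WAL, journal_mode is {mode}")
    return conn
```

WAL does not apply to in-memory databases (the pragma reports "memory"), so the helper raises in that case rather than silently running without it.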
Error: Payload Too Large
- Cause: Genesys Cloud conversation detail webhooks can contain large JSON payloads. If your target endpoint has a body size limit, the retry fails with 413 Payload Too Large.
- Fix: Remove non-essential fields from the payload before storing it in the DLQ, or increase the body size limit on your target server.
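If you trim payloads, record which fields were removed so whoever inspects the DLQ knows the body is partial. A minimal sketch, assuming you maintain your own ordered list of top-level fields that are safe to drop. fit_payload is hypothetical, and the field names in the usage note are made up rather than real Genesys Cloud payload fields.

```python
import json
from typing import Iterable, List, Tuple


def _size(obj: dict) -> int:
    # Size of the JSON body as it would be sent (UTF-8 bytes)
    return len(json.dumps(obj).encode("utf-8"))


def fit_payload(payload: dict, max_bytes: int, droppable: Iterable[str]) -> Tuple[dict, List[str]]:
    """Drop top-level keys from droppable, in order, until the JSON body fits.

    Returns a trimmed copy (the original is untouched) and the removed keys.
    Raises ValueError if the payload is still too large.
    """
    trimmed = dict(payload)
    removed: List[str] = []
    for key in droppable:
        if _size(trimmed) <= max_bytes:
            break
        if key in trimmed:
            del trimmed[key]
            removed.append(key)
    if _size(trimmed) > max_bytes:
        raise ValueError("payload still exceeds max_bytes after dropping optional fields")
    return trimmed, removed
```

For example, fit_payload(payload, 1_000_000, ["transcript_text", "attachments"]) drops "transcript_text" first and stops as soon as the body fits. You could then store the removed list in error_message or a dedicated column.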