Handling Webhook 5xx Failures with a Dead Letter Queue and Retry Logic
What You Will Build
- A middleware hook that captures Genesys Cloud CX webhook payloads when the downstream endpoint fails with a 5xx error.
- A Python-based dead letter queue (DLQ) service using Amazon SQS to capture failed payloads for manual inspection or automated retry.
- A retry mechanism that attempts to redeliver failed payloads to the original endpoint with exponential backoff.
Prerequisites
- Genesys Cloud CX Account: A developer or admin account with access to the Admin Console.
- AWS Account: Active account with permissions to create SQS queues and IAM roles.
- Python Environment: Python 3.9+ installed.
- Dependencies:
  - boto3 (AWS SDK for Python)
  - requests (HTTP library)
  - PureCloudPlatformClientV2 (Genesys Cloud CX Python SDK)
  - pydantic (data validation)
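- IAM Role: An IAM role or user with sqs:CreateQueue, sqs:GetQueueUrl, sqs:SendMessage, sqs:ReceiveMessage, and sqs:DeleteMessage permissions. Add sqs:ChangeMessageVisibility if you extend visibility timeouts during long retries.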
Authentication Setup
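To interact with the Genesys Cloud CX API, you must authenticate using the OAuth 2.0 Client Credentials grant. This tutorial assumes you have already created an OAuth client in Genesys Cloud with the Client Credentials grant type.

Client Credentials clients do not request OAuth scopes. Their permissions come from the roles assigned to the OAuth client. Assign a role that can read webhook configurations. If you plan to change webhook endpoints programmatically, the role must also be able to update them. This tutorial focuses on standard webhooks rather than EventStreams.

Genesys Cloud hosts are region-specific, not org-specific. For an organization in US East, tokens come from login.mypurecloud.com and API calls go to api.mypurecloud.com. Other regions use their own domains, such as mypurecloud.ie or mypurecloud.com.au.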
Generating the Access Token
Create a helper class to handle token acquisition. Cache the token and request a new one before it expires. The token response includes an expires_in value (in seconds) that tells you when the token expires.
```python
import time
from typing import Optional

import requests


class GenesysAuth:
    """Fetches and caches a Genesys Cloud OAuth token (Client Credentials grant)."""

    def __init__(self, org_id: str, client_id: str, client_secret: str,
                 region: str = "mypurecloud.com"):
        # org_id is kept for reference only: Genesys Cloud hostnames depend on
        # the region (e.g. mypurecloud.com for US East), not on the org ID.
        self.org_id = org_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.login_url = f"https://login.{region}"
        self.api_url = f"https://api.{region}"
        self.token: Optional[str] = None
        self.expires_at: float = 0.0

    def get_access_token(self) -> str:
        """
        Returns a cached access token, requesting a new one when none is cached
        or the cached one is within 60 seconds of expiring.
        """
        if self.token and time.time() < self.expires_at - 60:
            return self.token

        try:
            # Client ID and secret go in an HTTP Basic Authorization header;
            # the form body carries only the grant type. Permissions come from
            # the roles assigned to the OAuth client, not from scopes.
            response = requests.post(
                f"{self.login_url}/oauth/token",
                data={"grant_type": "client_credentials"},
                auth=(self.client_id, self.client_secret),
                timeout=10,
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            print(f"Authentication failed: {e.response.text}")
            raise

        token_data = response.json()
        self.token = token_data["access_token"]
        self.expires_at = time.time() + token_data.get("expires_in", 0)
        return self.token
```
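Genesys Cloud's Client Credentials grant is a standard OAuth 2.0 token request. The client POSTs a form-encoded body to the region's login host, with the client ID and secret in an HTTP Basic Authorization header. If you use an HTTP client other than requests, the sketch below shows how to build that request without sending it.

build_token_request is a hypothetical helper. The mypurecloud.com default (US East) is only an example, so pass your organization's region domain.

```python
import base64
from urllib.parse import urlencode


def build_token_request(client_id: str, client_secret: str,
                        region: str = "mypurecloud.com") -> dict:
    """Build (but do not send) a Client Credentials token request.

    Hypothetical helper: returns the URL, headers, and form body that an
    HTTP client would POST to the Genesys Cloud login host for `region`.
    """
    # HTTP Basic auth: base64 of "client_id:client_secret"
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    basic = base64.b64encode(credentials).decode("ascii")
    return {
        "url": f"https://login.{region}/oauth/token",
        "headers": {
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        "body": urlencode({"grant_type": "client_credentials"}),
    }
```

For example, build_token_request(client_id, client_secret, region="mypurecloud.ie") targets the EU (Ireland) login host.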
Implementation
Step 1: Configure the Dead Letter Queue (SQS)
First, create the SQS queue in AWS. This queue acts as the buffer for failed webhook deliveries.
```python
import boto3
from botocore.exceptions import ClientError


def create_dlq_queue(queue_name: str) -> str:
    """
    Creates the SQS queue if it does not exist.
    Returns the queue URL.
    """
    sqs = boto3.resource('sqs')
    try:
        # CreateQueue is idempotent: if a queue with this name and the same
        # attributes already exists, SQS simply returns its URL.
        queue = sqs.create_queue(
            QueueName=queue_name,
            Attributes={
                'VisibilityTimeout': '300',  # 5 minutes
                'MessageRetentionPeriod': '1209600',  # 14 days (the SQS maximum)
                'ReceiveMessageWaitTimeSeconds': '20'  # Long polling (the SQS maximum)
            }
        )
        print(f"Queue ready: {queue.url}")
        return queue.url
    except ClientError as e:
        # Only raised when the queue exists with *different* attributes.
        if e.response['Error']['Code'] in ('QueueAlreadyExists', 'QueueNameExists'):
            queue = sqs.get_queue_by_name(QueueName=queue_name)
            print(f"Queue already exists with different attributes: {queue.url}")
            return queue.url
        raise
```
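SQS expects queue attribute values as strings and rejects values outside its documented ranges:

- VisibilityTimeout: 0 to 43,200 seconds (12 hours)
- MessageRetentionPeriod: 60 to 1,209,600 seconds (14 days)
- ReceiveMessageWaitTimeSeconds: 0 to 20 seconds

The sketch below validates your settings before calling create_queue and returns the dictionary it expects. build_queue_attributes is a hypothetical helper, not part of boto3.

```python
# SQS limits (in seconds) for the three attributes used in Step 1.
LIMITS = {
    "VisibilityTimeout": (0, 43_200),
    "MessageRetentionPeriod": (60, 1_209_600),
    "ReceiveMessageWaitTimeSeconds": (0, 20),
}


def build_queue_attributes(visibility_timeout: int = 300,
                           retention_period: int = 1_209_600,
                           wait_time: int = 20) -> dict:
    """Check each value against its SQS range and return string attributes."""
    values = {
        "VisibilityTimeout": visibility_timeout,
        "MessageRetentionPeriod": retention_period,
        "ReceiveMessageWaitTimeSeconds": wait_time,
    }
    for name, value in values.items():
        low, high = LIMITS[name]
        if not low <= value <= high:
            raise ValueError(f"{name}={value} is outside the SQS range {low}-{high}")
    # The SQS API takes attribute values as strings, not integers.
    return {name: str(value) for name, value in values.items()}
```

You would then call sqs.create_queue(QueueName=queue_name, Attributes=build_queue_attributes()). A typo such as wait_time=30 fails locally instead of at the AWS API.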
Step 2: Capture and Store Failed Webhook Payloads
In this architecture, we assume a middleware service (e.g., an AWS Lambda or a lightweight Flask/FastAPI app) receives the initial webhook from Genesys Cloud. If the downstream target returns a 5xx error, the middleware pushes the payload to the DLQ.
For this tutorial, we will simulate the middleware logic that pushes to the DLQ.
```python
import json
import uuid
from datetime import datetime, timezone

import boto3
from botocore.exceptions import ClientError

sqs_client = boto3.client('sqs')


def push_to_dlq(queue_url: str, webhook_payload: dict, original_url: str, error_code: int) -> str:
    """
    Pushes a failed webhook payload to the SQS dead letter queue.

    Args:
        queue_url: The URL of the SQS queue.
        webhook_payload: The original JSON payload from Genesys Cloud.
        original_url: The endpoint that failed to respond correctly.
        error_code: The HTTP status code returned by the original endpoint.

    Returns:
        The MessageId of the sent message.
    """
    message_body = {
        "id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),  # timezone-aware UTC
        "original_endpoint": original_url,
        "error_code": error_code,
        "payload": webhook_payload,
        "retry_count": 0
    }
    try:
        response = sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message_body)
        )
        print(f"Message pushed to DLQ: {response['MessageId']}")
        return response['MessageId']
    except ClientError as e:
        print(f"Failed to push to DLQ: {e}")
        raise
```
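The middleware described in this step decides when to call push_to_dlq. The sketch below shows that decision, assuming the middleware forwards each webhook downstream and treats 5xx responses and network errors as failures.

forward_webhook and its post and enqueue parameters are hypothetical. The HTTP call and the enqueue function are passed in as callables so you can test the logic without AWS or a live endpoint. Recording a network error with status 0 is also an assumption, since no HTTP status exists in that case.

```python
from typing import Callable, Optional


def forward_webhook(payload: dict, target_url: str,
                    post: Callable[..., object],
                    enqueue: Callable[[dict, str, int], object]) -> Optional[int]:
    """Forward a webhook; on a 5xx or a network error, hand it to `enqueue`.

    `post` should behave like requests.post: return an object with a
    .status_code attribute, or raise when no response arrives.
    Returns the downstream status code, or None if there was no response.
    """
    try:
        response = post(target_url, json=payload, timeout=10)
    except Exception:
        # Timeout, DNS failure, connection reset, etc.: no HTTP status exists,
        # so record 0 as a placeholder error code.
        enqueue(payload, target_url, 0)
        return None
    if response.status_code >= 500:
        enqueue(payload, target_url, response.status_code)
    return response.status_code
```

In the middleware you might call forward_webhook(payload, url, post=requests.post, enqueue=lambda p, u, code: push_to_dlq(queue_url, p, u, code)). With requests, narrowing the except clause to requests.exceptions.RequestException is more precise.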
Step 3: Implement the Retry Worker with Exponential Backoff
The core of this solution is a worker process that polls the DLQ, attempts to redeliver the message, and handles retries.
```python
import json
import time

import boto3
import requests
from botocore.exceptions import ClientError


class WebhookRetryWorker:
    def __init__(self, queue_url: str, max_retries: int = 5):
        self.queue_url = queue_url
        self.max_retries = max_retries
        self.sqs = boto3.client('sqs')

    def poll_queue(self) -> list:
        """
        Polls the SQS queue for up to 10 messages.
        Long polling (WaitTimeSeconds=20) reduces empty receives.
        """
        try:
            response = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=20
            )
            return response.get('Messages', [])
        except ClientError as e:
            print(f"Error polling queue: {e}")
            return []

    def retry_delivery(self, body: dict) -> bool:
        """
        Waits 2 ** retry_count seconds, then redelivers the payload to the
        original endpoint.

        Args:
            body: The parsed DLQ message body.
        Returns:
            True if the endpoint answered with a 2xx status, False otherwise.
        """
        retry_count = body.get('retry_count', 0)

        # Exponential backoff: 1s, 2s, 4s, 8s, ...
        backoff_time = 2 ** retry_count
        print(f"Waiting {backoff_time} seconds before retry {retry_count + 1}...")
        time.sleep(backoff_time)

        try:
            # json= serializes the payload and sets Content-Type: application/json
            response = requests.post(
                body['original_endpoint'],
                json=body['payload'],
                headers={'X-Webhook-Source': 'GenesysCloud-Retry'},
                timeout=10
            )
        except requests.exceptions.RequestException as e:
            print(f"Network error during retry: {e}")
            return False

        if 200 <= response.status_code < 300:
            print(f"Retry successful for message ID: {body['id']}")
            return True
        print(f"Retry failed with status {response.status_code} for message ID: {body['id']}")
        return False

    def requeue_with_incremented_count(self, body: dict) -> None:
        """
        SQS cannot edit a message in place, so a failed attempt is recorded by
        sending a copy of the body with retry_count + 1.
        """
        updated = {**body, 'retry_count': body.get('retry_count', 0) + 1}
        self.sqs.send_message(QueueUrl=self.queue_url, MessageBody=json.dumps(updated))
        print(f"Re-queued message ID {body['id']} with retry_count={updated['retry_count']}")

    def process_message(self, message: dict) -> None:
        """
        Retries one message. On success, deletes it. On failure, enqueues a
        copy with an incremented retry_count and then deletes the original.
        """
        body = json.loads(message['Body'])

        if body.get('retry_count', 0) >= self.max_retries:
            # Leave the message in the queue for manual inspection
            # (or move it to an archive / raise an alert here).
            print(f"Max retries reached for message ID: {body['id']}")
            return

        if not self.retry_delivery(body):
            # Send the updated copy *before* deleting the original, so a crash
            # in between causes a duplicate rather than a lost payload.
            self.requeue_with_incremented_count(body)

        self.sqs.delete_message(
            QueueUrl=self.queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )
        print(f"Message deleted from DLQ: {message['MessageId']}")

    def run(self):
        """
        Main loop: poll, process each message, pause briefly, repeat.
        """
        print("Starting Webhook Retry Worker...")
        while True:
            for message in self.poll_queue():
                try:
                    self.process_message(message)
                except Exception as e:
                    print(f"Unexpected error processing message: {e}")
            time.sleep(5)  # Small delay between poll cycles
```
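Sleeping inside the worker has a cost. It blocks every other message in the batch, and the time counts against the visibility timeout.

An alternative is to let SQS schedule the backoff through the DelaySeconds parameter of send_message, which accepts 0 to 900 seconds. The sketch below assumes you re-enqueue failed messages with an incremented retry_count. build_requeue_kwargs is a hypothetical helper whose result you pass as keyword arguments to the boto3 client's send_message. Per-message DelaySeconds applies to standard queues like the one created in Step 1; FIFO queues do not support it.

```python
import json

SQS_MAX_DELAY_SECONDS = 900  # DelaySeconds upper bound (15 minutes)


def build_requeue_kwargs(queue_url: str, body: dict, base: int = 2) -> dict:
    """Return send_message kwargs for the next attempt of a failed message.

    The new body carries retry_count + 1. The backoff (base ** new count,
    capped at 900 s) is applied by SQS instead of time.sleep in the worker.
    """
    next_count = body.get("retry_count", 0) + 1
    delay = min(base ** next_count, SQS_MAX_DELAY_SECONDS)
    return {
        "QueueUrl": queue_url,
        "MessageBody": json.dumps({**body, "retry_count": next_count}),
        "DelaySeconds": delay,
    }
```

In the worker, self.sqs.send_message(**build_requeue_kwargs(self.queue_url, body)) followed by delete_message on the original would replace the in-process sleep. The worker then never holds a message longer than one delivery attempt.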
Step 4: Validating Webhook Configurations in Genesys Cloud
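Before deploying the retry worker, confirm that your webhooks are correctly configured in Genesys Cloud. You can use the Python SDK to list webhook endpoints and review them. The WebhooksApi class and get_platform_webhooks method used below are not confirmed SDK names. Check the Genesys Cloud Python SDK reference for the API that manages your webhook configuration, and adapt the calls accordingly.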
```python
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException


def validate_webhooks(client_id: str, client_secret: str, region: str = "mypurecloud.com"):
    """
    Lists all webhooks and prints their status and endpoint.
    The API host depends on the org's region (e.g. mypurecloud.com for US East).
    """
    # Point the SDK at the region's API host, then authenticate with the
    # SDK's Client Credentials helper.
    PureCloudPlatformClientV2.configuration.host = f"https://api.{region}"
    api_client = PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        client_id, client_secret
    )

    # ASSUMPTION: WebhooksApi and get_platform_webhooks are not confirmed SDK
    # names. Replace them with the API class and method that manage your
    # webhook configuration, and adjust the attribute names printed below.
    webhooks_api = PureCloudPlatformClientV2.WebhooksApi(api_client)
    try:
        result = webhooks_api.get_platform_webhooks()
        print("Current Webhook Configurations:")
        print("-" * 80)
        for webhook in result.entities:
            print(f"ID: {webhook.id}")
            print(f"Name: {webhook.name}")
            print(f"Endpoint: {webhook.endpoint}")
            print(f"Events: {', '.join(webhook.events or [])}")
            print(f"Status: {webhook.status}")
            print("-" * 80)
    except ApiException as e:
        print(f"Exception when listing webhooks: {e}")
        if e.status == 401:
            print("Authentication failed. Check your client ID and secret.")
        elif e.status == 403:
            print("Forbidden. Check the roles assigned to the OAuth client.")
```
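The function above only prints each configuration. To turn this step into an actual check, you could run a few rules over the listed entries.

The sketch below is hypothetical. find_webhook_problems takes plain dictionaries with id, endpoint, and events keys, mirroring the fields printed above rather than a confirmed SDK model. Its two rules, an HTTPS endpoint and at least one subscribed event, are example policies, not Genesys Cloud requirements.

```python
from urllib.parse import urlparse


def find_webhook_problems(webhooks: list) -> list:
    """Return human-readable problems for each misconfigured webhook dict."""
    problems = []
    for hook in webhooks:
        hook_id = hook.get("id", "<unknown>")
        endpoint = hook.get("endpoint") or ""
        parsed = urlparse(endpoint)
        # Example rule 1: payloads should only travel over HTTPS.
        if parsed.scheme != "https" or not parsed.netloc:
            problems.append(f"{hook_id}: endpoint {endpoint!r} is not a valid HTTPS URL")
        # Example rule 2: a webhook with no events never fires.
        if not hook.get("events"):
            problems.append(f"{hook_id}: no events subscribed")
    return problems
```

Convert each SDK entity to a dictionary first, for example {"id": webhook.id, "endpoint": webhook.endpoint, "events": webhook.events}. Then print or alert on whatever the function returns.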
Complete Working Example
Below is the complete, integrated script. It combines authentication, DLQ setup, and the retry worker. Save this as main.py.
```python
import json
import os
import sys
import time
import uuid
from typing import Optional

import boto3
import requests
from botocore.exceptions import ClientError

# --- Configuration ---
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID", "your_client_id")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET", "your_client_secret")
# Region domain of your Genesys Cloud org, e.g. mypurecloud.com (US East),
# mypurecloud.ie (EU Ireland), mypurecloud.com.au (Asia Pacific Sydney).
REGION = os.getenv("GENESYS_REGION", "mypurecloud.com")
DLQ_QUEUE_NAME = "gen-webhook-dlq"
MAX_RETRIES = 5


# --- Authentication Module ---
class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, region: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = f"https://login.{region}/oauth/token"
        self.token: Optional[str] = None
        self.expires_at = 0.0

    def get_access_token(self) -> str:
        # Reuse the cached token until 60 seconds before it expires.
        if self.token and time.time() < self.expires_at - 60:
            return self.token
        try:
            response = requests.post(
                self.token_url,
                data={"grant_type": "client_credentials"},
                auth=(self.client_id, self.client_secret),  # HTTP Basic auth
                timeout=10
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            print(f"Auth Error: {e.response.text}")
            raise
        token_data = response.json()
        self.token = token_data["access_token"]
        self.expires_at = time.time() + token_data.get("expires_in", 0)
        return self.token


# --- SQS DLQ Module ---
def setup_dlq(queue_name: str) -> str:
    sqs = boto3.resource('sqs')
    try:
        # Idempotent when the queue already exists with the same attributes.
        queue = sqs.create_queue(
            QueueName=queue_name,
            Attributes={
                'VisibilityTimeout': '300',
                'MessageRetentionPeriod': '1209600',
                'ReceiveMessageWaitTimeSeconds': '20'
            }
        )
        print(f"DLQ Ready: {queue.url}")
        return queue.url
    except ClientError as e:
        # Raised when the queue exists with different attributes.
        if e.response['Error']['Code'] in ('QueueAlreadyExists', 'QueueNameExists'):
            queue = sqs.get_queue_by_name(QueueName=queue_name)
            print(f"DLQ Exists: {queue.url}")
            return queue.url
        raise


def push_to_dlq(queue_url: str, payload: dict, endpoint: str, error_code: int) -> None:
    sqs_client = boto3.client('sqs')
    message_body = {
        "id": str(uuid.uuid4()),
        "timestamp": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),  # UTC
        "original_endpoint": endpoint,
        "error_code": error_code,
        "payload": payload,
        "retry_count": 0
    }
    try:
        sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message_body)
        )
        print("Payload pushed to DLQ.")
    except ClientError as e:
        print(f"Failed to push to DLQ: {e}")
        raise  # Don't silently lose the payload


# --- Retry Worker Module ---
class RetryWorker:
    def __init__(self, queue_url: str, max_retries: int):
        self.queue_url = queue_url
        self.max_retries = max_retries
        self.sqs = boto3.client('sqs')

    def run(self):
        print("Starting Retry Worker...")
        while True:
            for msg in self._poll():
                self._process(msg)
            time.sleep(5)

    def _poll(self) -> list:
        try:
            resp = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=20
            )
            return resp.get('Messages', [])
        except ClientError as e:
            print(f"Poll Error: {e}")
            return []

    def _process(self, message: dict):
        try:
            body = json.loads(message['Body'])
            retry_count = body.get('retry_count', 0)
            if retry_count >= self.max_retries:
                # Left in the queue for manual inspection.
                print(f"Max retries exceeded: {message['MessageId']}")
                return
            if not self._retry_delivery(body):
                # SQS bodies can't be edited, so enqueue a copy with retry_count + 1
                # before deleting the original.
                body['retry_count'] = retry_count + 1
                self.sqs.send_message(QueueUrl=self.queue_url, MessageBody=json.dumps(body))
                print(f"Re-queued with retry_count={retry_count + 1}: {message['MessageId']}")
            self.sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
            print(f"Deleted: {message['MessageId']}")
        except Exception as e:
            print(f"Process Error: {e}")

    def _retry_delivery(self, body: dict) -> bool:
        # Exponential backoff: 1s, 2s, 4s, ...
        backoff = 2 ** body.get('retry_count', 0)
        print(f"Waiting {backoff}s...")
        time.sleep(backoff)
        try:
            resp = requests.post(
                body['original_endpoint'],
                json=body['payload'],  # also sets Content-Type: application/json
                headers={'X-Webhook-Source': 'GenesysCloud-Retry'},
                timeout=10
            )
            return 200 <= resp.status_code < 300
        except requests.exceptions.RequestException as e:
            print(f"Request Error: {e}")
            return False


if __name__ == "__main__":
    # 1. Set up the DLQ
    queue_url = setup_dlq(DLQ_QUEUE_NAME)

    # 2. Verify Genesys Cloud credentials before starting the worker
    try:
        GenesysAuth(CLIENT_ID, CLIENT_SECRET, REGION).get_access_token()
        print("Authentication successful.")
    except Exception as e:
        print(f"Authentication failed. Check credentials: {e}")
        sys.exit(1)

    # 3. Start the worker
    worker = RetryWorker(queue_url, MAX_RETRIES)
    worker.run()
```
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token has expired or the client credentials are invalid.
- Fix: Ensure the GenesysAuth class refreshes the token before it expires. If the RetryWorker uses the Genesys SDK for other operations, initialize the ApiClient with a fresh token. For the retry worker itself, 401 errors usually come from the target endpoint, not from Genesys Cloud. If the target returns 401, every retry will fail the same way. Consider detecting 401 responses and either skipping further retries or alerting an administrator.
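One way to act on this is to classify each redelivery response before deciding what to do with the message. The sketch below assumes a set of reasonable defaults; these are not rules from Genesys Cloud or SQS.

- 2xx means the payload was delivered.
- 5xx, 408, and 429 are worth retrying.
- Any other status, including 401 and 403, is permanent, so the worker stops retrying and alerts someone instead.

Outcome and classify_response are hypothetical names.

```python
from enum import Enum


class Outcome(Enum):
    DELIVERED = "delivered"   # delete the message
    RETRY = "retry"           # re-enqueue with retry_count + 1
    PERMANENT = "permanent"   # stop retrying; alert or archive


RETRYABLE_4XX = {408, 429}  # Request Timeout, Too Many Requests


def classify_response(status_code: int) -> Outcome:
    """Map a redelivery HTTP status code to the worker's next action."""
    if 200 <= status_code < 300:
        return Outcome.DELIVERED
    if status_code >= 500 or status_code in RETRYABLE_4XX:
        return Outcome.RETRY
    # 401/403 usually mean the target rejects our credentials; retrying won't help.
    return Outcome.PERMANENT
```

In the worker, a PERMANENT outcome would skip the re-enqueue and route the message to whatever alerting or archive step you use for exhausted retries.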
Error: 429 Too Many Requests
- Cause: The target endpoint is rate-limiting your retry attempts.
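- Fix: Increase the exponential backoff base (for example, use 5 ** retry_count instead of 2 ** retry_count), or add random jitter so retries do not arrive in bursts. Also check whether the target API sends a Retry-After header or a rate-limit header such as X-RateLimit-Reset, and wait at least that long before retrying.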
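The sketch below shows the jitter suggestion using the "full jitter" approach: a random delay between zero and the capped exponential value. It also honors a Retry-After header when the target sends one in its delay-seconds form.

Several details are assumptions made to keep the example short: the next_delay name, the 300-second cap, and skipping the HTTP-date form of Retry-After. Pass response.headers from requests, which does case-insensitive lookups.

```python
import random
from typing import Mapping, Optional


def next_delay(retry_count: int,
               headers: Optional[Mapping[str, str]] = None,
               base: float = 2.0,
               cap: float = 300.0,
               rng: Optional[random.Random] = None) -> float:
    """Seconds to wait before the next redelivery attempt."""
    # If the target says how long to wait (Retry-After: <seconds>), obey it.
    retry_after = (headers or {}).get("Retry-After")
    if retry_after is not None and retry_after.strip().isdigit():
        return float(retry_after)
    # Full jitter: uniform between 0 and the capped exponential backoff.
    rng = rng or random.Random()
    return rng.uniform(0, min(cap, base ** retry_count))
```

Randomizing the whole interval spreads retries from many failed messages across time, so they do not hit a rate-limited endpoint in lockstep.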
Error: SQS Visibility Timeout Expired
- Cause: The retry logic took longer than the queue’s VisibilityTimeout (300 seconds in the example).
- Fix: If processing is slow, increase the queue's VisibilityTimeout. Alternatively, call change_message_visibility to extend a message's timeout while a long-running retry is in progress.
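The sketch below shows the change_message_visibility approach. A background thread re-extends the message's visibility on a fixed interval while a slow retry runs, and stops when the with block exits.

VisibilityHeartbeat, the 60-second interval, and the 300-second extension are assumptions. The sqs argument is a boto3 SQS client. Any object with a change_message_visibility method also works, which makes the class testable without AWS.

```python
import threading


class VisibilityHeartbeat:
    """Context manager that keeps an SQS message invisible while work runs."""

    def __init__(self, sqs, queue_url: str, receipt_handle: str,
                 extend_to: int = 300, interval: float = 60.0):
        self.sqs = sqs
        self.queue_url = queue_url
        self.receipt_handle = receipt_handle
        self.extend_to = extend_to  # new timeout, counted from each call
        self.interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._beat, daemon=True)

    def _beat(self):
        # Event.wait returns False on timeout and True once stop is requested.
        while not self._stop.wait(self.interval):
            self.sqs.change_message_visibility(
                QueueUrl=self.queue_url,
                ReceiptHandle=self.receipt_handle,
                VisibilityTimeout=self.extend_to,
            )

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc):
        self._stop.set()
        self._thread.join()
        return False  # don't swallow exceptions from the with-block
```

Wrap the slow call, for example: with VisibilityHeartbeat(self.sqs, self.queue_url, message['ReceiptHandle']): success = self.retry_delivery(body). Pick an interval comfortably shorter than the extension so the message never becomes visible mid-retry.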
Error: Payload Too Large
- Cause: Genesys Cloud webhooks can contain large payloads (e.g., detailed conversation analytics). SQS has a 256KB message size limit.
- Fix: If payloads exceed 256KB, do not store the full payload in SQS. Instead, store the payload in Amazon S3 or a database, and push only the reference (URL/ID) and metadata to SQS. The retry worker then fetches the payload from S3 before redelivery.
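The sketch below shows this claim-check pattern, with storage passed in as a callable so you can test the size decision on its own. build_dlq_body, the store callable, and the payload_ref field are hypothetical. In practice, store would wrap an S3 put_object call and return the object key.

The 256 KB default mirrors the limit stated above. The check measures the serialized message body, because that is what gets sent to SQS.

```python
import json
from typing import Callable

MAX_SQS_BYTES = 256 * 1024  # limit stated above; confirm the current SQS quota


def build_dlq_body(envelope: dict, store: Callable[[bytes], str],
                   max_bytes: int = MAX_SQS_BYTES) -> str:
    """Return the SQS message body, offloading the payload if it is too large.

    `envelope` is the DLQ message (id, original_endpoint, payload, ...).
    If the serialized message exceeds max_bytes, the payload is handed to
    `store`, and only the returned reference travels through SQS.
    """
    body = json.dumps(envelope)
    if len(body.encode("utf-8")) <= max_bytes:
        return body
    ref = store(json.dumps(envelope["payload"]).encode("utf-8"))
    slim = {k: v for k, v in envelope.items() if k != "payload"}
    slim["payload_ref"] = ref
    return json.dumps(slim)
```

On the consuming side, the retry worker would check for payload_ref and load the payload from S3 before redelivering it.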