Handling Webhook 5xx Failures with a Dead Letter Queue
What You Will Build
- You will build a Python service that intercepts failed Genesys Cloud CX webhook deliveries and stores them in a persistent dead letter queue (DLQ) using Amazon SQS.
- You will use the Genesys Cloud CX Python SDK (genesyscloud) to configure the initial webhook and verify its status, and the AWS SDK (boto3) to manage the DLQ.
- The language covered is Python 3.10+.
Prerequisites
- Genesys Cloud CX: An OAuth Client with webhooks:view, webhooks:edit, and integrations:view scopes.
- AWS Account: Permissions to create SQS queues and send messages (sqs:CreateQueue, sqs:SendMessage).
- Python Environment: Python 3.10 or higher with pip.
- Dependencies:
  - genesyscloud (latest stable version)
  - boto3 (latest stable version)
  - requests (for manual HTTP validation if needed)
Authentication Setup
Genesys Cloud CX uses OAuth 2.0 Client Credentials flow for server-to-server integration. For AWS, you will use IAM roles or access keys.
Genesys Cloud CX Authentication
You must initialize the PureCloudPlatformClientV2 client. In production, cache the token. The SDK handles refresh logic automatically if configured correctly, but for a script, a direct login is sufficient.
from genesyscloud.rest import Configuration
from genesyscloud.platform.client import PlatformClient
from genesyscloud.webhooks.api import WebhooksApi


def get_genesys_client(client_id: str, client_secret: str, base_url: str = "https://api.mypurecloud.com"):
    """
    Initializes and returns an authenticated Genesys Cloud Platform Client.
    """
    config = Configuration(
        base_url=base_url,
        access_token=None  # Will be set by login
    )
    client = PlatformClient(config)
    # Login using Client Credentials
    client.login(client_id=client_id, client_secret=client_secret)
    return client
AWS SQS Setup
You need an SQS client to send failed webhook payloads to the DLQ.
import boto3
from botocore.exceptions import ClientError


def get_sqs_client(region: str = "us-east-1"):
    """
    Returns an AWS SQS client.
    Assumes IAM Role or Environment Variables for credentials.
    """
    return boto3.client('sqs', region_name=region)
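The prerequisites list sqs:CreateQueue, but the queue itself still has to be created. The following is a minimal sketch of one way to provision it. The helper name ensure_dlq, the queue name webhook-dlq, and the attribute values are illustrative assumptions, not requirements. The client is passed in as a parameter, so the one returned by get_sqs_client can be used directly.

```python
def ensure_dlq(sqs_client, queue_name: str = "webhook-dlq") -> str:
    """Create the queue if it is missing and return its URL.

    The queue name and attribute values are illustrative assumptions.
    """
    response = sqs_client.create_queue(
        QueueName=queue_name,
        Attributes={
            # 14 days, the longest retention period SQS allows
            "MessageRetentionPeriod": "1209600",
            # Give a worker one minute to finish before the message reappears
            "VisibilityTimeout": "60",
        },
    )
    return response["QueueUrl"]
```

A call such as dlq_url = ensure_dlq(get_sqs_client()) would then yield the URL used in the rest of this guide. create_queue returns the URL of an existing queue when the name and attributes match, so repeated calls with the same values are safe.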
Implementation
Step 1: Verify Webhook Configuration
Before handling failures, you must ensure the webhook is configured correctly to understand what data you are catching. A 5xx error indicates the target server failed, but you must verify the Genesys side is healthy.
Scope Required: webhooks:view
def verify_webhook(webhooks_api: WebhooksApi, webhook_id: str):
    """
    Retrieves a specific webhook to verify its configuration and status.
    """
    try:
        # Get the webhook details
        response = webhooks_api.get_webhook(
            webhook_id=webhook_id,
            expand=["endpoint", "security"]  # Expand critical fields
        )
        print(f"Webhook '{response.name}' is in state: {response.status}")
        if response.status != "enabled":
            raise Exception(f"Webhook is not enabled. Current status: {response.status}")
        return response
    except Exception as e:
        print(f"Error retrieving webhook {webhook_id}: {e}")
        raise
Step 2: Simulate and Capture 5xx Failures
Genesys Cloud CX retries failed webhook deliveries with exponential backoff (usually 3 attempts). If the target server still returns a 5xx error after the final retry, Genesys marks the delivery as failed.
There is no native “push” from Genesys to a DLQ. You must poll for failed deliveries or use an intermediate serverless function (like AWS Lambda) that acts as the webhook target.
Strategy: The most robust pattern for a DLQ is to have a lightweight receiver service (e.g., AWS API Gateway + Lambda) that acts as the Genesys Webhook Endpoint. This receiver validates the payload. If the downstream consumer (your actual business logic) fails with 5xx, the Lambda catches the error and sends the original Genesys payload to the SQS DLQ.
Here is the architecture for the Receiver Lambda Function:
- Genesys sends POST to https://your-lambda-url.com/webhook.
- Lambda receives payload.
- Lambda attempts to process business logic (e.g., write to database).
- If business logic throws a 5xx-equivalent error, Lambda catches it.
- Lambda sends the original Genesys event to SQS DLQ.
- Lambda returns HTTP 200 to Genesys (acknowledging receipt) to stop Genesys retries.
Code: The Receiver Lambda (Python)
import json
import logging
import time

import boto3
from botocore.exceptions import ClientError  # needed by send_to_dlq

# Initialize SQS client
sqs = boto3.client('sqs')
DLQ_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/webhook-dlq"


# Mock downstream service that might fail
def process_business_logic(event_payload: dict):
    """
    Simulates downstream processing that may fail with 5xx.
    """
    # Example: Intentional failure for demonstration
    target = event_payload.get('data', {}).get('to', {}).get('name')
    if event_payload.get('type') == 'conversation:created' and target == 'CriticalSystem':
        raise Exception("Downstream Database Connection Refused (500)")
    # Successful processing
    print(f"Processing event: {event_payload.get('type')}")
    return {"status": "success"}


def send_to_dlq(payload: dict, error_context: str):
    """
    Sends the failed webhook payload to the SQS Dead Letter Queue.
    """
    try:
        message_body = {
            "original_payload": payload,
            "error_context": error_context,
            "timestamp": time.time(),
            "dlq_action_required": True
        }
        sqs.send_message(
            QueueUrl=DLQ_URL,
            MessageBody=json.dumps(message_body),
            MessageAttributes={
                'source': {
                    'DataType': 'String',
                    'StringValue': 'genesys-webhook-failure'
                }
            }
        )
        print("Message sent to DLQ successfully.")
    except ClientError as e:
        logging.error(f"Failed to send message to DLQ: {e}")
        # In a real scenario, you might have a secondary alerting mechanism here


def lambda_handler(event, context):
    """
    AWS Lambda handler acting as the Genesys Webhook Target.
    """
    # Default to the raw event so the except block always has a payload,
    # even when parsing the request body is what failed.
    body = event
    try:
        # Parse the incoming request body
        body = json.loads(event['body']) if 'body' in event else event
        # 1. Attempt downstream processing
        result = process_business_logic(body)
        # 2. Success: Return 200 to Genesys
        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps(result)
        }
    except Exception as e:
        # 3. Failure: Log the error and send to DLQ
        error_message = str(e)
        logging.error(f"Downstream processing failed: {error_message}")
        # Send original payload to DLQ
        send_to_dlq(body, error_message)
        # 4. Return 200 to Genesys to acknowledge receipt and stop retries
        # This is critical. If you return 500, Genesys will keep retrying.
        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps({"status": "received_but_failed", "error": error_message})
        }
Step 3: Processing the Dead Letter Queue
Now that failed events are in SQS, you need a worker process to consume them, attempt retries with backoff, or archive them for manual review.
Scope Required: None (This is internal processing)
import json
import logging
import random
import time

import boto3
from botocore.exceptions import ClientError


class DLQWorker:
    def __init__(self, dlq_url: str, region: str = "us-east-1"):
        self.sqs = boto3.client('sqs', region_name=region)
        self.dlq_url = dlq_url
        self.max_retries = 3
        self.retry_delay = 5  # seconds

    def process_dlq(self):
        """
        Polls the SQS DLQ and attempts to reprocess messages.
        """
        print(f"Starting DLQ worker for queue: {self.dlq_url}")
        while True:
            try:
                # Receive messages (up to 10 at a time)
                response = self.sqs.receive_message(
                    QueueUrl=self.dlq_url,
                    MaxNumberOfMessages=10,
                    WaitTimeSeconds=5  # Long polling
                )
                messages = response.get('Messages', [])
                if not messages:
                    print("No messages in DLQ. Waiting...")
                    time.sleep(10)
                    continue
                for message in messages:
                    self._handle_message(message)
            except ClientError as e:
                logging.error(f"SQS Error: {e}")
                time.sleep(10)

    def _handle_message(self, message: dict):
        """
        Processes a single message from the DLQ.
        """
        receipt_handle = message['ReceiptHandle']
        body_str = message['Body']
        try:
            body = json.loads(body_str)
            original_payload = body.get('original_payload')
            error_context = body.get('error_context')
            print(f"Retrying payload: {original_payload.get('type', 'unknown')}")
            # Simulate retry logic
            # In production, you would call your downstream service here again
            # with exponential backoff or different parameters if applicable.
            success = self._attempt_retry(original_payload)
            if success:
                # Delete message from DLQ if successful
                self.sqs.delete_message(
                    QueueUrl=self.dlq_url,
                    ReceiptHandle=receipt_handle
                )
                print("Message processed successfully. Deleted from DLQ.")
            else:
                # If retry fails again, keep it in DLQ or move to an archive queue
                print("Retry failed. Keeping in DLQ for next cycle or moving to archive.")
        except Exception as e:
            logging.error(f"Error processing DLQ message: {e}")
            # Do not delete the message; let it become visible again after visibility timeout

    def _attempt_retry(self, payload: dict) -> bool:
        """
        Attempts to reprocess the payload.
        Returns True if successful, False otherwise.
        """
        # Placeholder for actual retry logic
        # For demonstration, we assume retries succeed 50% of the time
        return random.choice([True, False])


if __name__ == "__main__":
    worker = DLQWorker(dlq_url="https://sqs.us-east-1.amazonaws.com/123456789012/webhook-dlq")
    worker.process_dlq()
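The worker defines max_retries and retry_delay but never applies them. One way to put them to use is exponential backoff with jitter. The sketch below assumes a doubling factor and a 60-second cap; the helper names backoff_delays and retry_with_backoff are hypothetical and not part of any SDK.

```python
import random
import time
from typing import Callable, Iterator


def backoff_delays(base: float, attempts: int, cap: float = 60.0) -> Iterator[float]:
    """Yield one delay per attempt: base, 2*base, 4*base, ... capped at `cap`."""
    for attempt in range(attempts):
        yield min(cap, base * (2 ** attempt))


def retry_with_backoff(
    operation: Callable[[], bool],
    max_retries: int = 3,
    base_delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call `operation` until it returns True or the attempts run out."""
    delays = list(backoff_delays(base_delay, max_retries))
    for attempt, delay in enumerate(delays, start=1):
        if operation():
            return True
        if attempt < len(delays):
            # Full jitter: sleep a random fraction of the delay so that
            # many workers do not retry in lockstep
            sleep(random.uniform(0, delay))
    return False
```

Inside _handle_message, the single retry call could become retry_with_backoff(lambda: self._attempt_retry(original_payload), self.max_retries, self.retry_delay). Because the sleeps block the worker, the total wait should stay below the queue's visibility timeout, or the message may be delivered to another consumer in the meantime.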
Complete Working Example
Below is a complete Python script for the DLQ worker. In a production environment, the Lambda function and the DLQ worker would be separate services, so this script covers only the worker side and does not include the Genesys verification step.
import json
import logging
import random
import time

import boto3
from botocore.exceptions import ClientError

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class GenesysWebhookDLQProcessor:
    def __init__(self, dlq_url: str, region: str = "us-east-1"):
        self.sqs = boto3.client('sqs', region_name=region)
        self.dlq_url = dlq_url
        self.max_retries = 5
        self.base_delay = 5

    def start_processing(self):
        logger.info(f"Starting DLQ processor for: {self.dlq_url}")
        while True:
            try:
                # Long polling for efficiency
                response = self.sqs.receive_message(
                    QueueUrl=self.dlq_url,
                    MaxNumberOfMessages=10,
                    WaitTimeSeconds=20,
                    VisibilityTimeout=30
                )
                messages = response.get('Messages', [])
                if not messages:
                    logger.info("No messages in DLQ. Polling again...")
                    time.sleep(5)
                    continue
                logger.info(f"Received {len(messages)} message(s) from DLQ.")
                for message in messages:
                    self._process_single_message(message)
            except ClientError as e:
                logger.error(f"AWS Client Error: {e}")
                time.sleep(10)
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                time.sleep(10)

    def _process_single_message(self, message: dict):
        receipt_handle = message['ReceiptHandle']
        body_str = message['Body']
        try:
            # Parse the DLQ message
            dlq_record = json.loads(body_str)
            original_event = dlq_record.get('original_payload')
            error_reason = dlq_record.get('error_context')
            logger.info(f"Processing failed event: {original_event.get('type', 'N/A')}")
            logger.warning(f"Original Failure: {error_reason}")
            # Attempt to reprocess
            success = self._retry_downstream_logic(original_event)
            if success:
                # Remove from DLQ
                self.sqs.delete_message(
                    QueueUrl=self.dlq_url,
                    ReceiptHandle=receipt_handle
                )
                logger.info("Successfully reprocessed event. Removed from DLQ.")
            else:
                logger.error("Retry failed. Event remains in DLQ.")
                # Optional: Move to an 'Archive' queue if max retries exceeded
                # self._move_to_archive(dlq_record)
        except json.JSONDecodeError:
            # A malformed message can never be reprocessed, so remove it
            logger.error("Invalid JSON in DLQ message.")
            self.sqs.delete_message(
                QueueUrl=self.dlq_url,
                ReceiptHandle=receipt_handle
            )
        except Exception as e:
            logger.error(f"Error handling message: {e}")
            # Leave message in DLQ for next poll

    def _retry_downstream_logic(self, event: dict) -> bool:
        """
        Simulates the retry of the downstream business logic.
        Replace this with actual API calls or database operations.
        """
        # Simulate network jitter or transient errors
        # 80% chance of success on retry
        return random.random() < 0.8


if __name__ == "__main__":
    # Replace with your actual SQS DLQ URL
    DLQ_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/webhook-dlq"
    processor = GenesysWebhookDLQProcessor(dlq_url=DLQ_QUEUE_URL)
    processor.start_processing()
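The script leaves the commented-out self._move_to_archive(dlq_record) call undefined. The sketch below shows one possible shape for that step. It assumes a second queue whose URL is passed as archive_url (hypothetical), and it assumes receive_message is called with AttributeNames=['ApproximateReceiveCount'] (MessageSystemAttributeNames in newer boto3 releases) so that each message reports how often it has been delivered. The function names are illustrative.

```python
def receive_count(message: dict) -> int:
    """Number of times SQS has delivered this message (1 if not reported)."""
    return int(message.get("Attributes", {}).get("ApproximateReceiveCount", "1"))


def archive_if_exhausted(sqs_client, message: dict, dlq_url: str,
                         archive_url: str, max_receives: int = 5) -> bool:
    """Move a message to the archive queue once it has been delivered too often.

    Returns True if the message was archived, False if it was left in place.
    """
    if receive_count(message) < max_receives:
        return False
    # Copy first, delete second: a crash in between leaves a duplicate
    # in the archive rather than losing the event
    sqs_client.send_message(QueueUrl=archive_url, MessageBody=message["Body"])
    sqs_client.delete_message(QueueUrl=dlq_url, ReceiptHandle=message["ReceiptHandle"])
    return True
```

An alternative that needs no worker code is an SQS redrive policy on the DLQ itself, where maxReceiveCount and deadLetterTargetArn make SQS move exhausted messages to the archive queue automatically.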
Common Errors & Debugging
Error: 403 Forbidden on SQS SendMessage
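- Cause: The IAM role attached to the Lambda or the EC2 instance running the worker does not have sqs:SendMessage permissions.
- Fix: Attach a custom policy allowing sqs:SendMessage on the specific ARN of the DLQ. The managed AmazonSQSFullAccess policy also works but grants far more than the Lambda needs. The worker additionally needs sqs:ReceiveMessage and sqs:DeleteMessage, as in the policy below.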
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:SendMessage",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage"
      ],
      "Resource": "arn:aws:sqs:us-east-1:123456789012:webhook-dlq"
    }
  ]
}
Error: Lambda Returns 500 to Genesys
- Cause: The Lambda function threw an unhandled exception.
- Fix: Ensure all exceptions in lambda_handler are caught. Always return HTTP 200 to Genesys if you want to stop retries. If you return 500, Genesys will retry the webhook, potentially creating duplicate messages in your DLQ if the receiver is not idempotent.
Error: MessageBody Too Large
- Cause: Genesys webhook payloads for complex conversations (e.g., long transcripts) can exceed SQS's 256 KB message size limit.
- Fix: Store the full payload in S3 and send only the Genesys event.id and the S3 key in the SQS message. Switching to a FIFO queue does not help, because FIFO queues have the same message size limit as standard queues.
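The S3 pointer approach can be sketched as follows. This is a minimal illustration under stated assumptions: the function name build_dlq_body, the key prefix webhook-dlq/, and the pointer field names are hypothetical, and the event is assumed to carry its identifier in a top-level "id" field. The only S3 call used is put_object, and the client is passed in so the function can be exercised without AWS access.

```python
import json
import uuid

SQS_LIMIT_BYTES = 256 * 1024  # limit quoted above; adjust to your queue's quota


def build_dlq_body(payload: dict, s3_client, bucket: str,
                   limit: int = SQS_LIMIT_BYTES) -> str:
    """Return a JSON message body, offloading oversized payloads to S3."""
    body = json.dumps({"original_payload": payload})
    if len(body.encode("utf-8")) <= limit:
        return body  # small enough to send inline
    # Hypothetical key layout; any unique key works
    key = f"webhook-dlq/{uuid.uuid4()}.json"
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=json.dumps(payload).encode("utf-8"),
    )
    # The SQS message now carries only a pointer to the stored payload
    return json.dumps({
        "event_id": payload.get("id"),  # assumes a top-level "id" field
        "s3_bucket": bucket,
        "s3_key": key,
    })
```

The returned string would be passed as MessageBody to send_message. Message attributes count toward the size limit as well, so leaving some headroom below the quota is a reasonable precaution. The worker then has to check for s3_key and fetch the object before reprocessing.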
Error: Duplicate Events in DLQ
- Cause: The downstream service failed and the Lambda sent the event to the DLQ, but the DLQ worker then failed to delete the message from SQS after reprocessing it, so the message became visible again and was processed a second time.
- Fix: Call delete_message as soon as processing succeeds, and never before. Implement idempotency in your downstream service using the Genesys event.id to prevent double-processing if duplicates slip through.
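The idempotency check can be sketched as a thin wrapper around the downstream handler. This is an illustration only: the class name IdempotentProcessor is hypothetical, the event is assumed to carry a unique top-level "id" field, and the in-memory set stands in for a durable store shared by all workers.

```python
from typing import Callable


class IdempotentProcessor:
    """Runs a handler at most once per event ID.

    The in-memory set is for illustration only: it is lost on restart and
    is not shared between workers. A durable store would replace it.
    """

    def __init__(self, handler: Callable[[dict], None]):
        self._handler = handler
        self._seen: set[str] = set()

    def process(self, event: dict) -> bool:
        """Return True if the handler ran, False if the event was a duplicate."""
        event_id = event.get("id")  # assumes the payload carries a unique "id"
        if event_id is None:
            raise ValueError("event has no id; cannot deduplicate")
        if event_id in self._seen:
            return False
        self._handler(event)
        # Record the ID only after success so a failed attempt can be retried
        self._seen.add(event_id)
        return True
```

With this wrapper, a message redelivered by SQS is acknowledged and deleted without the business logic running twice.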