Handling Webhook 5xx Failures: Building a Dead Letter Queue with Genesys Cloud
What You Will Build
- A Python script that consumes failed webhook deliveries from Genesys Cloud and persists them to a local JSON dead letter queue for manual inspection or automated retry.
- Integration with the Genesys Cloud Integration API to fetch specific failed message logs.
- Implementation in Python 3.9+ using the genesyscloud SDK and requests for fallback HTTP calls.
Prerequisites
- OAuth Client Type: Application (Client Credentials) or User (OAuth2).
- Required Scopes: integration:webhook:view, integration:webhook:read.
- SDK Version: genesyscloud Python SDK version 150.0.0 or higher.
- Runtime: Python 3.9+.
- Dependencies: genesyscloud, requests, python-dotenv (for secure credential management)
Authentication Setup
Genesys Cloud APIs require OAuth 2.0 authentication. For server-to-server integrations like this dead letter queue processor, the Client Credentials flow is the standard approach. This flow exchanges a Client ID and Client Secret for an access token without requiring user interaction.
The following code demonstrates how to initialize the Genesys Cloud Python SDK with proper error handling for authentication failures.
import os
import logging
from typing import Optional
from purecloud_platform_client_v2 import ApiClient, Configuration, PureCloudPlatformClientV2
from purecloud_platform_client_v2.rest import ApiException

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def get_genesis_client() -> Optional[PureCloudPlatformClientV2]:
    """
    Initializes and returns the Genesys Cloud SDK client.

    Returns:
        PureCloudPlatformClientV2: The configured client instance.
        None: If authentication fails.
    """
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    if not client_id or not client_secret:
        logger.error("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")
        return None
    try:
        # Create the configuration object
        configuration = Configuration(
            client_id=client_id,
            client_secret=client_secret,
            environment=environment
        )
        # Initialize the API client
        api_client = ApiClient(configuration)
        # Initialize the platform client
        platform_client = PureCloudPlatformClientV2(api_client)
        logger.info("Successfully authenticated with Genesys Cloud.")
        return platform_client
    except ApiException as e:
        logger.error(f"Authentication failed: {e.status} - {e.reason}")
        if e.body:
            logger.error(f"Response body: {e.body}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error during initialization: {e}")
        return None
Key Authentication Details:
- The Configuration object handles the token exchange internally.
- The SDK caches the token and automatically refreshes it before expiration.
- If the token refresh fails (e.g., invalid credentials), the SDK raises an ApiException with a 401 status code.
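To make the caching behavior described above concrete, here is a minimal sketch of a token cache that refreshes shortly before expiry. It is not the SDK's internal implementation: the CachedToken class, the fetch callable, and the 60-second refresh margin are all hypothetical names and values chosen for illustration.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Caches an access token and refreshes it shortly before it expires."""

    def __init__(self, fetch: Callable[[], Tuple[str, float]],
                 refresh_margin: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        # fetch is a hypothetical callable returning (token, lifetime_in_seconds)
        self._fetch = fetch
        self._margin = refresh_margin
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        # Refresh when nothing is cached or the cached token is about to expire.
        if self._token is None or self._clock() >= self._expires_at - self._margin:
            token, lifetime = self._fetch()
            self._token = token
            self._expires_at = self._clock() + lifetime
        return self._token
```

Injecting the clock keeps the class easy to test without sleeping; in real use the default monotonic clock is usually sufficient.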
Implementation
Step 1: Identify the Webhook Integration
To fetch failed webhook deliveries, you first need the unique identifier (id) of the webhook integration. While you can hardcode this ID, it is best practice to locate it dynamically by name, especially in multi-environment deployments (Dev, Stage, Prod).
The endpoint /api/v2/integrations/webhooks lists all webhook integrations.
from purecloud_platform_client_v2.models import WebhookIntegrationSearchQuery
from purecloud_platform_client_v2.api import IntegrationApi
import time

def find_webhook_by_name(platform_client: PureCloudPlatformClientV2, webhook_name: str) -> Optional[str]:
    """
    Searches for a webhook integration by name and returns its ID.

    Args:
        platform_client: The authenticated Genesys Cloud client.
        webhook_name: The name of the webhook to find.

    Returns:
        str: The webhook ID if found, None otherwise.
    """
    api_instance = IntegrationApi(platform_client)
    # We use pagination to ensure we find the webhook even if there are many
    page_size = 20
    page_number = 1
    while True:
        try:
            # The SDK method wraps POST /api/v2/integrations/webhooks/search
            # and takes the search criteria as a query object.
            response = api_instance.post_integration_webhook_search(
                body=WebhookIntegrationSearchQuery(
                    name=webhook_name,
                    size=page_size,
                    page=page_number
                )
            )
            if response.entities:
                for webhook in response.entities:
                    # Require an exact match in case the search returns partial matches
                    if webhook.name == webhook_name:
                        logger.info(f"Found webhook ID: {webhook.id}")
                        return webhook.id
            # Check if there are more pages
            if page_number * page_size >= response.total:
                break
            page_number += 1
            # Small delay to avoid rate limiting if looping many times
            time.sleep(0.1)
        except ApiException as e:
            logger.error(f"Error searching for webhook: {e.status} - {e.reason}")
            return None
    logger.warning(f"Webhook with name '{webhook_name}' not found.")
    return None
API Endpoint Note:
- Method: POST
- Path: /api/v2/integrations/webhooks/search
- Scope: integration:webhook:view
- Request Body: A JSON object containing name, size, and page.
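For the fallback HTTP path mentioned at the start of this tutorial, the request described in the note above could be assembled roughly as follows. This is a sketch under assumptions: build_webhook_search_request is a hypothetical helper, the api.{environment} host pattern is assumed, and the path and body fields are taken from the note rather than verified against the live API.

```python
import json
from typing import Dict, Tuple


def build_webhook_search_request(environment: str, access_token: str,
                                 name: str, size: int = 20,
                                 page: int = 1) -> Tuple[str, Dict[str, str], str]:
    """Returns (url, headers, json_body) for the search call described above."""
    # Assumption: the API host is "api." followed by the environment domain.
    url = f"https://api.{environment}/api/v2/integrations/webhooks/search"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    body = json.dumps({"name": name, "size": size, "page": page})
    return url, headers, body
```

The returned values can be passed to requests.post(url, headers=headers, data=body, timeout=10). Building the request separately from sending it keeps the payload easy to log and inspect.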
Step 2: Fetch Failed Webhook Deliveries
Genesys Cloud retains webhook delivery logs. To retrieve failed deliveries, you query the integration history. The relevant endpoint is /api/v2/integrations/{integrationId}/history. You must filter by status set to failed.
This step involves handling pagination because a single webhook might have hundreds of failures during a prolonged outage.
from purecloud_platform_client_v2.models import IntegrationHistoryQuery
from purecloud_platform_client_v2.api import IntegrationApi
import time

class DeadLetterItem:
    """
    Data class to hold details of a failed webhook delivery.
    """
    def __init__(self, delivery_id: str, timestamp: str, status_code: int,
                 status_message: str, request_body: str, response_body: str,
                 webhook_id: str):
        self.delivery_id = delivery_id
        self.timestamp = timestamp
        self.status_code = status_code
        self.status_message = status_message
        self.request_body = request_body
        self.response_body = response_body
        self.webhook_id = webhook_id

    def to_dict(self) -> dict:
        return {
            "delivery_id": self.delivery_id,
            "timestamp": self.timestamp,
            "status_code": self.status_code,
            "status_message": self.status_message,
            "request_body": self.request_body,
            "response_body": self.response_body,
            "webhook_id": self.webhook_id
        }

def fetch_failed_deliveries(platform_client: PureCloudPlatformClientV2,
                            webhook_id: str,
                            max_items: int = 100) -> list[DeadLetterItem]:
    """
    Fetches failed webhook deliveries for a specific webhook.

    Args:
        platform_client: The authenticated Genesys Cloud client.
        webhook_id: The ID of the webhook integration.
        max_items: Maximum number of failed items to retrieve.

    Returns:
        list[DeadLetterItem]: A list of failed delivery objects.
    """
    api_instance = IntegrationApi(platform_client)
    failed_items = []
    page_size = 20
    page_number = 1
    logger.info(f"Fetching failed deliveries for webhook ID: {webhook_id}")
    while len(failed_items) < max_items:
        try:
            # Create the query object for history
            # status='failed' is critical here
            query = IntegrationHistoryQuery(
                status="failed",
                size=page_size,
                page=page_number
            )
            response = api_instance.post_integration_history(
                integration_id=webhook_id,
                body=query
            )
            if not response.entities:
                logger.info("No more failed deliveries found.")
                break
            for history_item in response.entities:
                # Extract relevant details
                # The history item contains the delivery details
                delivery_id = history_item.id
                timestamp = history_item.timestamp
                # Status code and message might be in the response details
                status_code = history_item.status_code if history_item.status_code else 500
                status_message = history_item.status_message or "Unknown error"
                # Request and Response bodies are often truncated in history logs
                # For full bodies, you might need to store them at the time of failure
                # via a custom handler, but history often provides snippets.
                request_body = history_item.request_body or "{}"
                response_body = history_item.response_body or "{}"
                item = DeadLetterItem(
                    delivery_id=delivery_id,
                    timestamp=timestamp,
                    status_code=status_code,
                    status_message=status_message,
                    request_body=request_body,
                    response_body=response_body,
                    webhook_id=webhook_id
                )
                failed_items.append(item)
                if len(failed_items) >= max_items:
                    break
            # Check if we have more pages
            if page_number * page_size >= response.total:
                break
            page_number += 1
            time.sleep(0.1)  # Rate limit protection
        except ApiException as e:
            if e.status == 404:
                logger.error(f"Webhook ID {webhook_id} not found in history.")
            else:
                logger.error(f"Error fetching history: {e.status} - {e.reason}")
            break
    return failed_items
Critical Parameter Explanation:
- status="failed": This filter is mandatory. Without it, you will retrieve successful deliveries (2xx), which are not part of the dead letter queue.
- size: The maximum number of items per page. The API caps this at 200. Using 20 reduces memory footprint during pagination.
- IntegrationHistoryQuery: This model class maps directly to the JSON body expected by the /api/v2/integrations/{integrationId}/history endpoint.
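The page-walking logic used in this step can also be isolated into a small generator, which makes the stopping conditions easier to test without calling the API. The sketch below assumes a hypothetical fetch_page(page_number, page_size) callable that returns a pair of (entities, total); it is not part of the SDK.

```python
from typing import Any, Callable, Iterator, List, Tuple


def iter_pages(fetch_page: Callable[[int, int], Tuple[List[Any], int]],
               page_size: int = 20, max_items: int = 100) -> Iterator[Any]:
    """Yields up to max_items entities, requesting one page at a time."""
    yielded = 0
    page_number = 1
    while yielded < max_items:
        entities, total = fetch_page(page_number, page_size)
        if not entities:
            return  # empty page: nothing left to read
        for entity in entities:
            yield entity
            yielded += 1
            if yielded >= max_items:
                return  # caller's cap reached
        if page_number * page_size >= total:
            return  # last page consumed
        page_number += 1
```

Because the generator stops as soon as the cap is reached, it never requests a page it does not need.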
Step 3: Persist to Dead Letter Queue
Once the failed deliveries are fetched, they must be persisted. For this tutorial, we will implement a simple JSON file-based dead letter queue. In a production environment, you would replace this with a message broker (e.g., AWS SQS, RabbitMQ) or a database (e.g., PostgreSQL).
The persistence logic must be atomic to prevent data corruption if the script crashes mid-write.
import json
import os
from datetime import datetime

def save_to_dead_letter_queue(items: list[DeadLetterItem], output_file: str = "dead_letter_queue.json"):
    """
    Saves failed webhook items to a JSON file.

    Args:
        items: List of DeadLetterItem objects.
        output_file: Path to the output JSON file.
    """
    if not items:
        logger.info("No items to save.")
        return
    # Load existing queue if it exists
    existing_items = []
    if os.path.exists(output_file):
        try:
            with open(output_file, 'r') as f:
                existing_items = json.load(f)
        except json.JSONDecodeError:
            logger.warning(f"Could not parse existing file {output_file}. Starting fresh.")
            existing_items = []
    # Avoid duplicates based on delivery_id
    existing_ids = {item.get('delivery_id') for item in existing_items}
    new_items_to_add = []
    for item in items:
        if item.delivery_id not in existing_ids:
            new_items_to_add.append(item.to_dict())
            existing_ids.add(item.delivery_id)
        else:
            logger.info(f"Skipping duplicate delivery ID: {item.delivery_id}")
    if not new_items_to_add:
        logger.info("No new items to add to the dead letter queue.")
        return
    # Append new items
    existing_items.extend(new_items_to_add)
    # Write back to file atomically
    temp_file = output_file + ".tmp"
    try:
        with open(temp_file, 'w') as f:
            json.dump(existing_items, f, indent=2, default=str)
        # Rename temp file to actual file (atomic on POSIX systems)
        os.replace(temp_file, output_file)
        logger.info(f"Successfully saved {len(new_items_to_add)} items to {output_file}")
    except IOError as e:
        logger.error(f"Failed to write to dead letter queue: {e}")
        if os.path.exists(temp_file):
            os.remove(temp_file)
Error Handling Considerations:
- Duplicate Prevention: The code checks for existing delivery_id entries to prevent re-processing the same failure if the script runs multiple times.
- Atomic Writes: Using a temporary file and os.replace ensures that if the process crashes during write, the original file remains intact and valid.
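If the JSON file is later swapped for a database, as suggested at the start of this step, duplicate prevention and atomicity can be delegated to the database itself. The following is a minimal sketch using SQLite from the standard library; the dead_letters table, its columns, and the save_to_sqlite function are hypothetical and cover only a subset of the DeadLetterItem fields.

```python
import sqlite3
from typing import Dict, Iterable


def save_to_sqlite(conn: sqlite3.Connection, items: Iterable[Dict]) -> int:
    """Inserts items, skipping delivery_ids already stored. Returns rows added."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS dead_letters (
               delivery_id TEXT PRIMARY KEY,
               webhook_id TEXT,
               status_code INTEGER,
               request_body TEXT
           )"""
    )
    added = 0
    with conn:  # commits on success, rolls back if an exception is raised
        for item in items:
            cursor = conn.execute(
                "INSERT OR IGNORE INTO dead_letters VALUES (?, ?, ?, ?)",
                (item["delivery_id"], item["webhook_id"],
                 item["status_code"], item["request_body"]),
            )
            added += cursor.rowcount  # 0 when the primary key already exists
    return added
```

The primary key on delivery_id plays the role of the existing_ids set, and the transaction replaces the temporary-file rename.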
Complete Working Example
The following script combines all previous steps into a single runnable module. It authenticates, finds the webhook, fetches failed deliveries, and saves them to a local JSON file.
#!/usr/bin/env python3
"""
Genesys Cloud Webhook Dead Letter Queue Processor

This script fetches failed webhook deliveries from Genesys Cloud and stores them
in a local JSON file for manual inspection or automated retry logic.

Prerequisites:
- Install dependencies: pip install genesyscloud requests python-dotenv
- Set environment variables:
    - GENESYS_CLIENT_ID
    - GENESYS_CLIENT_SECRET
    - GENESYS_ENVIRONMENT (default: mypurecloud.com)
"""
import json  # required by save_to_dead_letter_queue
import os
import logging
import sys
from typing import Optional

# Import Genesys Cloud SDK
from purecloud_platform_client_v2 import ApiClient, Configuration, PureCloudPlatformClientV2
from purecloud_platform_client_v2.rest import ApiException
from purecloud_platform_client_v2.models import WebhookIntegrationSearchQuery, IntegrationHistoryQuery
from purecloud_platform_client_v2.api import IntegrationApi

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class DeadLetterItem:
    def __init__(self, delivery_id: str, timestamp: str, status_code: int,
                 status_message: str, request_body: str, response_body: str,
                 webhook_id: str):
        self.delivery_id = delivery_id
        self.timestamp = timestamp
        self.status_code = status_code
        self.status_message = status_message
        self.request_body = request_body
        self.response_body = response_body
        self.webhook_id = webhook_id

    def to_dict(self) -> dict:
        return {
            "delivery_id": self.delivery_id,
            "timestamp": self.timestamp,
            "status_code": self.status_code,
            "status_message": self.status_message,
            "request_body": self.request_body,
            "response_body": self.response_body,
            "webhook_id": self.webhook_id
        }

def get_genesis_client() -> Optional[PureCloudPlatformClientV2]:
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    if not client_id or not client_secret:
        logger.error("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")
        return None
    try:
        configuration = Configuration(
            client_id=client_id,
            client_secret=client_secret,
            environment=environment
        )
        api_client = ApiClient(configuration)
        platform_client = PureCloudPlatformClientV2(api_client)
        logger.info("Successfully authenticated with Genesys Cloud.")
        return platform_client
    except ApiException as e:
        logger.error(f"Authentication failed: {e.status} - {e.reason}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error during initialization: {e}")
        return None

def find_webhook_by_name(platform_client: PureCloudPlatformClientV2, webhook_name: str) -> Optional[str]:
    api_instance = IntegrationApi(platform_client)
    page_size = 20
    page_number = 1
    while True:
        try:
            response = api_instance.post_integration_webhook_search(
                body=WebhookIntegrationSearchQuery(
                    name=webhook_name,
                    size=page_size,
                    page=page_number
                )
            )
            if response.entities:
                for webhook in response.entities:
                    if webhook.name == webhook_name:
                        logger.info(f"Found webhook ID: {webhook.id}")
                        return webhook.id
            if page_number * page_size >= response.total:
                break
            page_number += 1
        except ApiException as e:
            logger.error(f"Error searching for webhook: {e.status} - {e.reason}")
            return None
    logger.warning(f"Webhook with name '{webhook_name}' not found.")
    return None

def fetch_failed_deliveries(platform_client: PureCloudPlatformClientV2,
                            webhook_id: str,
                            max_items: int = 50) -> list[DeadLetterItem]:
    api_instance = IntegrationApi(platform_client)
    failed_items = []
    page_size = 20
    page_number = 1
    logger.info(f"Fetching failed deliveries for webhook ID: {webhook_id}")
    while len(failed_items) < max_items:
        try:
            query = IntegrationHistoryQuery(
                status="failed",
                size=page_size,
                page=page_number
            )
            response = api_instance.post_integration_history(
                integration_id=webhook_id,
                body=query
            )
            if not response.entities:
                logger.info("No more failed deliveries found.")
                break
            for history_item in response.entities:
                delivery_id = history_item.id
                timestamp = history_item.timestamp
                status_code = history_item.status_code if history_item.status_code else 500
                status_message = history_item.status_message or "Unknown error"
                request_body = history_item.request_body or "{}"
                response_body = history_item.response_body or "{}"
                item = DeadLetterItem(
                    delivery_id=delivery_id,
                    timestamp=timestamp,
                    status_code=status_code,
                    status_message=status_message,
                    request_body=request_body,
                    response_body=response_body,
                    webhook_id=webhook_id
                )
                failed_items.append(item)
                if len(failed_items) >= max_items:
                    break
            if page_number * page_size >= response.total:
                break
            page_number += 1
        except ApiException as e:
            logger.error(f"Error fetching history: {e.status} - {e.reason}")
            break
    return failed_items

def save_to_dead_letter_queue(items: list[DeadLetterItem], output_file: str = "dead_letter_queue.json"):
    if not items:
        logger.info("No items to save.")
        return
    existing_items = []
    if os.path.exists(output_file):
        try:
            with open(output_file, 'r') as f:
                existing_items = json.load(f)
        except json.JSONDecodeError:
            logger.warning(f"Could not parse existing file {output_file}. Starting fresh.")
            existing_items = []
    existing_ids = {item.get('delivery_id') for item in existing_items}
    new_items_to_add = []
    for item in items:
        if item.delivery_id not in existing_ids:
            new_items_to_add.append(item.to_dict())
            existing_ids.add(item.delivery_id)
    if not new_items_to_add:
        logger.info("No new items to add to the dead letter queue.")
        return
    existing_items.extend(new_items_to_add)
    temp_file = output_file + ".tmp"
    try:
        with open(temp_file, 'w') as f:
            json.dump(existing_items, f, indent=2, default=str)
        os.replace(temp_file, output_file)
        logger.info(f"Successfully saved {len(new_items_to_add)} items to {output_file}")
    except IOError as e:
        logger.error(f"Failed to write to dead letter queue: {e}")
        if os.path.exists(temp_file):
            os.remove(temp_file)

if __name__ == "__main__":
    # Configuration
    WEBHOOK_NAME = os.getenv("GENESYS_WEBHOOK_NAME", "MyCriticalWebhook")
    MAX_ITEMS = int(os.getenv("MAX_DLQ_ITEMS", "50"))
    # Step 1: Authenticate
    client = get_genesis_client()
    if not client:
        sys.exit(1)
    # Step 2: Find Webhook
    webhook_id = find_webhook_by_name(client, WEBHOOK_NAME)
    if not webhook_id:
        logger.error("Could not find webhook. Exiting.")
        sys.exit(1)
    # Step 3: Fetch Failures
    failed_items = fetch_failed_deliveries(client, webhook_id, max_items=MAX_ITEMS)
    # Step 4: Save to DLQ
    save_to_dead_letter_queue(failed_items)
    logger.info("Process complete.")
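The script above stops once failures are persisted. For the automated retry use case mentioned at the start of this tutorial, a replay pass over the saved items might look like the sketch below. It assumes a hypothetical send(item) callable that re-delivers one payload and returns the HTTP status code it received; how that callable reaches the target endpoint is left open.

```python
from typing import Callable, Dict, List, Tuple


def replay_dead_letters(items: List[Dict],
                        send: Callable[[Dict], int]) -> Tuple[List[Dict], List[Dict]]:
    """Re-sends each queued item and returns (delivered, still_failed)."""
    delivered: List[Dict] = []
    still_failed: List[Dict] = []
    for item in items:
        try:
            status = send(item)
        except Exception:
            status = None  # treat network errors like a failed delivery
        if status is not None and 200 <= status < 300:
            delivered.append(item)
        else:
            still_failed.append(item)
    return delivered, still_failed
```

Writing only the still_failed list back to the queue file keeps the dead letter queue from growing with items that have already been recovered.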
Common Errors & Debugging
Error: 401 Unauthorized
Cause: The Client ID or Client Secret is incorrect, or the OAuth token has expired and the SDK failed to refresh it.
Fix:
- Verify the GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are set correctly.
- Ensure the client application in Genesys Cloud is not disabled.
- Check that the client has the integration:webhook:view scope assigned.
Code Check:
# In get_genesis_client()
try:
    ...  # initialization
except ApiException as e:
    if e.status == 401:
        logger.error("Invalid credentials. Check Client ID and Secret.")
Error: 403 Forbidden
Cause: The OAuth client does not have the required scopes, or the user associated with the token (if using User OAuth) does not have the necessary permissions in Genesys Cloud.
Fix:
- Navigate to the Genesys Cloud Admin Console > Administration > Security > OAuth.
- Select your client application.
- Ensure the scope integration:webhook:view is checked.
- If using User OAuth, ensure the user has the “Integrations” role or equivalent permissions.
Error: 404 Not Found (Webhook History)
Cause: The webhook ID is invalid, or no history exists for the specified webhook.
Fix:
- Verify the webhook ID is correct by printing it after find_webhook_by_name.
- Check if the webhook has actually failed recently. If the webhook was just created and has never failed, the history endpoint will return an empty list, not a 404. A 404 on the history endpoint usually means the integration ID itself is wrong.
Error: 429 Too Many Requests
Cause: The script is making API calls too frequently.
Fix:
- Implement exponential backoff in the retry logic.
- Increase the delay between pagination requests.
- Use the Retry-After header from the response to determine the wait time.
Code Fix:
import time
import random

def make_api_call_with_retry(api_call_func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return api_call_func()
        except ApiException as e:
            if e.status == 429:
                # Prefer the server's Retry-After hint; otherwise back off exponentially
                headers = e.headers or {}
                retry_after = int(headers.get('Retry-After', 2 ** attempt))
                logger.warning(f"Rate limited. Waiting {retry_after} seconds...")
                time.sleep(retry_after + random.uniform(0, 1))  # Jitter
            else:
                raise
    raise Exception("Max retries exceeded")
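The retry helper above converts Retry-After with int(), which assumes the header carries a number of seconds. The HTTP specification also allows an HTTP-date in that header, so a more defensive parser could look like the following sketch. The parse_retry_after name and its default parameter are hypothetical, and whether Genesys Cloud ever sends the date form is not confirmed here.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], default: float,
                      now: Optional[datetime] = None) -> float:
    """Converts a Retry-After header value to a non-negative delay in seconds."""
    if not value:
        return default
    try:
        return max(0.0, float(value))  # delta-seconds form, e.g. "30"
    except ValueError:
        pass
    try:
        target = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return default  # unparseable header: fall back to the caller's default
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (target - now).total_seconds())
```

Passing 2 ** attempt as the default preserves the exponential backoff whenever the header is missing or malformed.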