Deduplicating Genesys Cloud EventBridge Integration Events with Python and SQS
What You Will Build
- A Python service that consumes events from an Amazon SQS queue fed by the Genesys Cloud EventBridge integration.
- Logic to identify and discard duplicate events by their Genesys Cloud event `id` within a configurable time window (each event must also carry a valid `timestamp`), preventing downstream processing errors.
- An implementation using the `boto3` SDK for AWS and standard Python libraries for message deduplication.
Prerequisites
- AWS Account: Access to Amazon SQS, Amazon EventBridge, and AWS Lambda (optional for deployment).
- Genesys Cloud Account: Admin access to configure the EventBridge integration.
- Python 3.9+: Installed with `pip`.
- Dependencies:
  - `boto3` (AWS SDK for Python)
  - `botocore` (included with `boto3`)
  - `pydantic` (optional, for robust data validation)
Authentication Setup
This tutorial assumes you have already configured the Genesys Cloud EventBridge integration in Genesys Cloud Admin. The integration publishes events to an AWS partner event source; once that source is associated with an EventBridge event bus, a rule on the bus routes matching events to an SQS FIFO or Standard queue.
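Assuming the EventBridge rule forwards the whole event to SQS (no input transformer), each SQS message body is the EventBridge event serialized as JSON. For local testing, a minimal sketch that builds such a body: the envelope keys are standard EventBridge fields, while the `source` string, the topic name, and the keys inside `detail` are placeholders chosen to match the parser in this tutorial, not a documented Genesys Cloud schema. `make_sample_body` is a hypothetical helper.

```python
import json


def make_sample_body(event_id: str, topic: str, timestamp: str) -> str:
    """Build a test SQS message body shaped like a full EventBridge event."""
    event = {
        # Standard EventBridge envelope fields
        "version": "0",
        "id": event_id,
        "detail-type": topic,
        # Placeholder partner source; copy the real value from your queue
        "source": "aws.partner/genesys.com/cloud/example-org/example-integration",
        "account": "123456789012",
        "time": timestamp,
        "region": "us-east-1",
        "resources": [],
        # Assumed payload keys, matching what this tutorial's parser reads
        "detail": {
            "id": event_id,
            "type": topic,
            "timestamp": timestamp,
        },
    }
    return json.dumps(event)


body = make_sample_body("evt-123", "v2.users.example.presence", "2024-01-01T00:00:00.000Z")
print(json.loads(body)["detail-type"])
```

Before relying on any field, compare this shape with a real message pulled from the queue in the SQS console.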
Your Python service requires AWS credentials to access the SQS queue. Use one of the following methods:
- Environment Variables: Set `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_DEFAULT_REGION`.
- IAM Role: If running on AWS Lambda or EC2, attach an IAM role with `sqs:ReceiveMessage`, `sqs:DeleteMessage`, and `sqs:GetQueueAttributes` permissions (plus `sqs:ChangeMessageVisibility` if you extend visibility timeouts during long processing).
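A least-privilege policy for this consumer, sketched as a Python dict so it can be printed or handed to IAM tooling. The queue ARN is hypothetical (it mirrors the example queue URL used later), and `sqs:ChangeMessageVisibility` is only needed if you extend visibility timeouts as described under Common Errors.

```python
import json

# Hypothetical queue ARN; replace with your own queue's ARN.
QUEUE_ARN = "arn:aws:sqs:us-east-1:123456789012:genesys-events.fifo"


def consumer_policy(queue_arn: str) -> dict:
    """Return an IAM policy document scoped to a single SQS queue."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "sqs:ReceiveMessage",
                    "sqs:DeleteMessage",
                    "sqs:GetQueueAttributes",
                    # Only needed when calling change_message_visibility
                    "sqs:ChangeMessageVisibility",
                ],
                "Resource": queue_arn,
            }
        ],
    }


print(json.dumps(consumer_policy(QUEUE_ARN), indent=2))
```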
No Genesys Cloud OAuth tokens are required on the consumer side, because the integration handles authentication server-to-server.
Implementation
Step 1: Configure AWS SQS Client
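Initialize the `boto3` client for SQS. This tutorial uses a FIFO queue (`*.fifo`) to preserve order; the client code is identical for a Standard queue. A FIFO queue with content-based deduplication enabled also discards identical messages sent within a 5-minute interval, whereas a Standard queue delivers at least once. In either case, the application-level check in Step 3 still catches repeats that reach the consumer, such as redeliveries after a visibility timeout expires.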
```python
import boto3
import json
import time
from typing import List, Dict, Any, Optional
from datetime import datetime, timezone


class GenesysEventConsumer:
    def __init__(self, queue_url: str, region_name: str = "us-east-1"):
        """
        Initialize the SQS client and queue URL.
        """
        self.sqs = boto3.client('sqs', region_name=region_name)
        self.queue_url = queue_url
        # Cache for deduplication: key is event ID, value is timestamp
        self.processed_events: Dict[str, float] = {}
        # Time window for deduplication (seconds). Events with same ID within this window are duplicates.
        self.dedup_window_seconds = 300  # 5 minutes

    def _get_messages(self) -> List[Dict[str, Any]]:
        """
        Receive messages from the SQS queue.
        """
        try:
            response = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=5,
                VisibilityTimeout=30
            )
            messages = response.get('Messages', [])
            return messages
        except Exception as e:
            print(f"Error receiving messages: {e}")
            return []
```
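The receive parameters have hard limits in SQS: `MaxNumberOfMessages` 1 to 10, `WaitTimeSeconds` 0 to 20, and `VisibilityTimeout` 0 to 43,200 seconds (12 hours). A small, hypothetical helper that validates them before the call, so a bad value fails fast instead of surfacing as a `receive_message` error inside the polling loop:

```python
from typing import Any, Dict


def build_receive_kwargs(queue_url: str, max_messages: int = 10,
                         wait_seconds: int = 5,
                         visibility_timeout: int = 30) -> Dict[str, Any]:
    """Validate and assemble keyword arguments for sqs.receive_message()."""
    if not 1 <= max_messages <= 10:
        raise ValueError("MaxNumberOfMessages must be between 1 and 10")
    if not 0 <= wait_seconds <= 20:
        raise ValueError("WaitTimeSeconds must be between 0 and 20")
    if not 0 <= visibility_timeout <= 43200:
        raise ValueError("VisibilityTimeout must be between 0 and 43200 seconds")
    return {
        "QueueUrl": queue_url,
        "MaxNumberOfMessages": max_messages,
        "WaitTimeSeconds": wait_seconds,
        "VisibilityTimeout": visibility_timeout,
    }
```

Calling `self.sqs.receive_message(**build_receive_kwargs(self.queue_url, wait_seconds=20))` would use the maximum long-poll wait, which reduces empty responses and makes the `time.sleep(1)` backoff in the main loop largely redundant.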
Step 2: Parse and Validate Event Structure
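Assuming the rule forwards the full event without an input transformer, each SQS message body is the EventBridge event serialized as JSON: a standard envelope (`id`, `source`, `detail-type`, `time`, and others) with the Genesys Cloud payload under `detail`. We extract the core fields needed to identify duplicates.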
```python
    # Assumed prefix of the EventBridge `source` for Genesys Cloud partner events,
    # e.g. "aws.partner/genesys.com/cloud/<org-id>/<integration-name>".
    # Confirm against a raw message from your queue and adjust if needed.
    GENESYS_SOURCE_PREFIX = "aws.partner/genesys.com"

    def _parse_event(self, body: str) -> Optional[Dict[str, Any]]:
        """
        Parse the SQS message body to extract the Genesys Cloud event.
        Returns None if the event is malformed or not from Genesys.
        """
        try:
            # SQS message body is the EventBridge event as a JSON string:
            # {
            #   "id": "eventbridge-event-id",
            #   "source": "aws.partner/genesys.com/...",
            #   "detail-type": "...",
            #   "time": "2024-01-01T00:00:00Z",
            #   "detail": { ... actual event payload ... }
            # }
            message_data = json.loads(body)
            source = message_data.get('source') or ''
            if not source.startswith(self.GENESYS_SOURCE_PREFIX):
                print(f"Ignoring non-Genesys event: {source}")
                return None
            detail = message_data.get('detail') or {}
            # Prefer fields inside `detail`; fall back to the EventBridge envelope,
            # which always carries `detail-type`, `id`, and `time`.
            event_type = detail.get('type') or message_data.get('detail-type')
            if not event_type:
                print("Missing event type")
                return None
            event_id = detail.get('id') or message_data.get('id')
            timestamp_str = (detail.get('timestamp') or detail.get('date')
                             or message_data.get('time'))
            if not event_id or not timestamp_str:
                print(f"Missing ID or timestamp for event type {event_type}")
                return None
            # Convert the ISO 8601 timestamp to a Unix timestamp for comparison.
            # Before Python 3.11, fromisoformat() rejects a trailing 'Z'.
            try:
                dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                if dt.tzinfo is None:
                    # Treat offset-less timestamps as UTC, not local time
                    dt = dt.replace(tzinfo=timezone.utc)
                unix_timestamp = dt.timestamp()
            except ValueError:
                print(f"Invalid timestamp format: {timestamp_str}")
                return None
            return {
                'id': event_id,
                'type': event_type,
                'timestamp': unix_timestamp,
                'detail': detail,
                'original_message_id': message_data.get('id')  # EventBridge ID
            }
        except json.JSONDecodeError:
            print("Failed to decode JSON body")
            return None
        except Exception as e:
            print(f"Error parsing event: {e}")
            return None
```
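Timestamp parsing is the most version-sensitive part of the parser: before Python 3.11, `datetime.fromisoformat()` rejects a trailing `Z`, and `.timestamp()` interprets a datetime without an offset as local time. A standalone sketch of the conversion that handles both cases (`iso8601_to_epoch` is a hypothetical name):

```python
from datetime import datetime, timezone


def iso8601_to_epoch(value: str) -> float:
    """Convert an ISO 8601 timestamp to Unix seconds, treating naive values as UTC."""
    # Normalize a trailing 'Z' so Python 3.9/3.10 can parse it
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    dt = datetime.fromisoformat(value)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.timestamp()


print(iso8601_to_epoch("2024-01-01T00:00:00.000Z"))  # 1704067200.0
```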
Step 3: Implement Deduplication Logic
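This is the core logic. We check whether an event with the same `id` has been processed within the last `dedup_window_seconds`. The window is measured from when this consumer processed the event (`time.time()`), not from the event's own `timestamp`, and the cache lives in process memory, so it resets on restart and is not shared between consumers.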
```python
    def _is_duplicate(self, event: Dict[str, Any]) -> bool:
        """
        Check if the event is a duplicate based on ID and timestamp window.
        """
        event_id = event['id']
        current_time = time.time()
        # Clean up old entries from the cache
        self._cleanup_cache(current_time)
        if event_id in self.processed_events:
            last_processed_time = self.processed_events[event_id]
            # If the event ID exists and was processed within the window, it's a duplicate
            if current_time - last_processed_time < self.dedup_window_seconds:
                return True
        return False

    def _cleanup_cache(self, current_time: float):
        """
        Remove entries from the processed_events cache that are older than the dedup window.
        """
        expired_keys = [
            k for k, v in self.processed_events.items()
            if current_time - v > self.dedup_window_seconds
        ]
        for key in expired_keys:
            del self.processed_events[key]

    def process_event(self, event: Dict[str, Any]):
        """
        Placeholder for your business logic.
        """
        print(f"Processing event: {event['id']} of type {event['type']}")
        # Example: Store in database, trigger workflow, etc.
```
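Scanning the whole cache on every call makes `_cleanup_cache` O(n) per message. Because entries are recorded in processing order, an `OrderedDict` can expire them from the oldest end and stop at the first live entry. A sketch of that alternative (`TimeWindowDedupCache` is a hypothetical class, not part of the consumer above); it swaps in `time.monotonic()` so wall-clock adjustments cannot shrink or stretch the window, and it accepts an injectable clock for testing:

```python
import time
from collections import OrderedDict
from typing import Callable


class TimeWindowDedupCache:
    """Remember event IDs for `window_seconds`, expiring the oldest entries first."""

    def __init__(self, window_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self.window_seconds = window_seconds
        self.clock = clock
        # Maps event ID -> time it was marked; insertion order matches time order
        self._seen: "OrderedDict[str, float]" = OrderedDict()

    def _evict_expired(self, now: float) -> None:
        # Oldest entries sit at the front, so stop at the first one still in the window
        while self._seen:
            oldest_time = next(iter(self._seen.values()))
            if now - oldest_time < self.window_seconds:
                break
            self._seen.popitem(last=False)

    def is_duplicate(self, event_id: str) -> bool:
        self._evict_expired(self.clock())
        return event_id in self._seen

    def mark_processed(self, event_id: str) -> None:
        # Move a re-marked ID to the back so the dict stays ordered by time
        self._seen.pop(event_id, None)
        self._seen[event_id] = self.clock()

    def __len__(self) -> int:
        return len(self._seen)
```

Swapping it in would mean calling `cache.is_duplicate(...)` inside `_is_duplicate` and `cache.mark_processed(...)` where the main loop currently writes to `processed_events`.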
Step 4: Main Processing Loop
Combine receiving, parsing, deduplication, processing, and deletion.
```python
    def run(self):
        """
        Main loop to process messages.
        """
        print("Starting Genesys Event Consumer...")
        while True:
            messages = self._get_messages()
            if not messages:
                time.sleep(1)
                continue
            for message in messages:
                receipt_handle = message['ReceiptHandle']
                body = message['Body']
                parsed_event = self._parse_event(body)
                if not parsed_event:
                    # Delete message even if parsing failed to avoid reprocessing garbage
                    self._delete_message(receipt_handle)
                    continue
                if self._is_duplicate(parsed_event):
                    print(f"Duplicate event detected: {parsed_event['id']}")
                    self._delete_message(receipt_handle)
                    continue
                # Process the event
                try:
                    self.process_event(parsed_event)
                    # Mark as processed in cache
                    self.processed_events[parsed_event['id']] = time.time()
                    # Delete from queue
                    self._delete_message(receipt_handle)
                except Exception as e:
                    print(f"Error processing event {parsed_event['id']}: {e}")
                    # Do not delete the message; it becomes visible again after
                    # VisibilityTimeout and is retried. A RedrivePolicy on the queue
                    # moves it to a dead-letter queue after maxReceiveCount receives.

    def _delete_message(self, receipt_handle: str):
        """
        Delete a message from the SQS queue.
        """
        try:
            self.sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=receipt_handle
            )
        except Exception as e:
            print(f"Error deleting message: {e}")
```
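The retry path in the loop relies on a dead-letter queue configured through the source queue's `RedrivePolicy` attribute, a JSON string with `deadLetterTargetArn` and `maxReceiveCount`. A hypothetical helper that builds it; the DLQ ARN in the commented usage is a placeholder, and a FIFO source queue needs a FIFO dead-letter queue.

```python
import json


def build_redrive_policy(dlq_arn: str, max_receive_count: int = 5) -> str:
    """Serialize the value for the SQS RedrivePolicy queue attribute."""
    if max_receive_count < 1:
        raise ValueError("max_receive_count must be at least 1")
    return json.dumps({
        "deadLetterTargetArn": dlq_arn,
        "maxReceiveCount": str(max_receive_count),
    })


# Usage with a boto3 SQS client (placeholder ARN):
# sqs.set_queue_attributes(
#     QueueUrl=QUEUE_URL,
#     Attributes={"RedrivePolicy": build_redrive_policy(
#         "arn:aws:sqs:us-east-1:123456789012:genesys-events-dlq.fifo")},
# )
```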
Complete Working Example
```python
import boto3
import json
import time
from typing import List, Dict, Any, Optional
from datetime import datetime, timezone


class GenesysEventConsumer:
    # Assumed prefix of the EventBridge `source` for Genesys Cloud partner events;
    # confirm against a raw message from your queue.
    GENESYS_SOURCE_PREFIX = "aws.partner/genesys.com"

    def __init__(self, queue_url: str, region_name: str = "us-east-1"):
        self.sqs = boto3.client('sqs', region_name=region_name)
        self.queue_url = queue_url
        self.processed_events: Dict[str, float] = {}
        self.dedup_window_seconds = 300  # 5 minutes

    def _get_messages(self) -> List[Dict[str, Any]]:
        try:
            response = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=5,
                VisibilityTimeout=30
            )
            return response.get('Messages', [])
        except Exception as e:
            print(f"Error receiving messages: {e}")
            return []

    def _parse_event(self, body: str) -> Optional[Dict[str, Any]]:
        try:
            message_data = json.loads(body)
            source = message_data.get('source') or ''
            if not source.startswith(self.GENESYS_SOURCE_PREFIX):
                print(f"Ignoring non-Genesys event: {source}")
                return None
            detail = message_data.get('detail') or {}
            # Fall back to EventBridge envelope fields when `detail` lacks them
            event_type = detail.get('type') or message_data.get('detail-type')
            if not event_type:
                print("Missing event type")
                return None
            event_id = detail.get('id') or message_data.get('id')
            timestamp_str = (detail.get('timestamp') or detail.get('date')
                             or message_data.get('time'))
            if not event_id or not timestamp_str:
                print(f"Missing ID or timestamp for event type {event_type}")
                return None
            try:
                dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                if dt.tzinfo is None:
                    dt = dt.replace(tzinfo=timezone.utc)  # treat naive values as UTC
                unix_timestamp = dt.timestamp()
            except ValueError:
                print(f"Invalid timestamp format: {timestamp_str}")
                return None
            return {
                'id': event_id,
                'type': event_type,
                'timestamp': unix_timestamp,
                'detail': detail,
                'original_message_id': message_data.get('id')
            }
        except json.JSONDecodeError:
            print("Failed to decode JSON body")
            return None
        except Exception as e:
            print(f"Error parsing event: {e}")
            return None

    def _is_duplicate(self, event: Dict[str, Any]) -> bool:
        event_id = event['id']
        current_time = time.time()
        self._cleanup_cache(current_time)
        if event_id in self.processed_events:
            last_processed_time = self.processed_events[event_id]
            if current_time - last_processed_time < self.dedup_window_seconds:
                return True
        return False

    def _cleanup_cache(self, current_time: float):
        expired_keys = [
            k for k, v in self.processed_events.items()
            if current_time - v > self.dedup_window_seconds
        ]
        for key in expired_keys:
            del self.processed_events[key]

    def process_event(self, event: Dict[str, Any]):
        print(f"Processing event: {event['id']} of type {event['type']}")
        # Add your business logic here

    def _delete_message(self, receipt_handle: str):
        try:
            self.sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=receipt_handle
            )
        except Exception as e:
            print(f"Error deleting message: {e}")

    def run(self):
        print("Starting Genesys Event Consumer...")
        while True:
            messages = self._get_messages()
            if not messages:
                time.sleep(1)
                continue
            for message in messages:
                receipt_handle = message['ReceiptHandle']
                body = message['Body']
                parsed_event = self._parse_event(body)
                if not parsed_event:
                    self._delete_message(receipt_handle)
                    continue
                if self._is_duplicate(parsed_event):
                    print(f"Duplicate event detected: {parsed_event['id']}")
                    self._delete_message(receipt_handle)
                    continue
                try:
                    self.process_event(parsed_event)
                    self.processed_events[parsed_event['id']] = time.time()
                    self._delete_message(receipt_handle)
                except Exception as e:
                    # Leave the message on the queue; it is retried after VisibilityTimeout
                    print(f"Error processing event {parsed_event['id']}: {e}")


if __name__ == "__main__":
    QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/genesys-events.fifo"
    REGION = "us-east-1"
    consumer = GenesysEventConsumer(queue_url=QUEUE_URL, region_name=REGION)
    consumer.run()
```
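The `__main__` block hardcodes the queue URL and region. A sketch that reads them from the environment instead; `GENESYS_QUEUE_URL` is a hypothetical variable name, while `AWS_DEFAULT_REGION` is the standard variable the AWS SDKs read. Passing the mapping as a parameter keeps it testable without touching the real environment.

```python
import os
from typing import Mapping, Tuple


def load_settings(environ: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Return (queue_url, region) from environment variables."""
    queue_url = environ.get("GENESYS_QUEUE_URL")  # hypothetical variable name
    if not queue_url:
        raise RuntimeError("Set GENESYS_QUEUE_URL to the SQS queue URL")
    region = environ.get("AWS_DEFAULT_REGION", "us-east-1")
    return queue_url, region
```

With this in place, the entry point reduces to `GenesysEventConsumer(*load_settings()).run()`, since the tuple order matches the constructor's `queue_url, region_name` parameters.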
Common Errors & Debugging
Error: Message Not Deleted (Reprocessing)
- Cause: The `ReceiptHandle` is invalid, or the message's visibility timeout expired before deletion.
- Fix: Ensure the `VisibilityTimeout` passed to `receive_message` is long enough to process the message. If processing takes longer, increase the timeout or call `change_message_visibility` to extend it.
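A minimal sketch of the `change_message_visibility` fix, assuming `sqs` is the `boto3` SQS client (any object with the same method works, which keeps it testable). The new timeout counts from the moment of the call, not from the original receive, so long-running work would call it periodically; `extend_visibility` is a hypothetical helper name.

```python
from typing import Any


def extend_visibility(sqs: Any, queue_url: str, receipt_handle: str,
                      seconds: int) -> None:
    """Keep a message hidden from other receivers for `seconds` more seconds."""
    if not 0 <= seconds <= 43200:
        raise ValueError("VisibilityTimeout must be between 0 and 43200 seconds")
    sqs.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=seconds,
    )
```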
Error: Duplicate Events Still Occur
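- Cause: `dedup_window_seconds` is shorter than the gap between redeliveries, a retried event arrives with a different `id`, or the earlier copy was handled by another consumer process or before a restart (`processed_events` lives only in this process's memory).
- Fix: Increase `dedup_window_seconds`. Verify that the `id` field in the `detail` object identifies a single event instance; for some event types, Genesys Cloud may retry with the same `id`, which the cache catches only while the entry is still in the window. If you run several consumers or restart often, move the cache to a shared store.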
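If retries can arrive with a different `id`, one option is to key the cache on a hash of the payload instead. A sketch using SHA-256 over canonical JSON (sorted keys, compact separators); `detail_fingerprint` is a hypothetical helper, and any per-delivery fields inside `detail` (such as a delivery timestamp) would need to be removed first, or identical events will hash differently.

```python
import hashlib
import json
from typing import Any, Dict


def detail_fingerprint(detail: Dict[str, Any]) -> str:
    """Hash an event payload so identical content maps to the same cache key."""
    # sort_keys makes the hash independent of key order in the payload
    canonical = json.dumps(detail, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

In the consumer, `detail_fingerprint(parsed_event['detail'])` would replace `parsed_event['id']` as the key in both the duplicate check and the cache update.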
Error: JSONDecodeError
- Cause: The SQS message body is not valid JSON.
- Fix: Check the SQS queue directly in the AWS Console to inspect the raw message body. Ensure the EventBridge integration is correctly formatting the payload.
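The parser's `JSONDecodeError` branch prints only a generic message. A hypothetical helper that returns the decoder's position details and a truncated preview of the raw body, which you could log in that branch to see what actually arrived:

```python
import json
from typing import Any, Dict, Optional


def describe_json_error(body: str, preview_chars: int = 200) -> Optional[Dict[str, Any]]:
    """Return decode-error details for a non-JSON body, or None if it parses."""
    try:
        json.loads(body)
    except json.JSONDecodeError as exc:
        return {
            "error": exc.msg,
            "line": exc.lineno,
            "column": exc.colno,
            "preview": body[:preview_chars],  # truncated to keep logs small
        }
    return None
```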
Error: Permission Denied (AccessDenied)
- Cause: The AWS IAM role or user lacks `sqs:ReceiveMessage`, `sqs:DeleteMessage`, or `sqs:GetQueueAttributes` permissions.
- Fix: Update the IAM policy to include the necessary SQS actions for the specific queue.