Deduplication Strategy for Genesys Cloud EventBridge Integrations
What You Will Build
- You will build a Python application that receives Genesys Cloud events routed by AWS EventBridge to an SQS queue, detects duplicates using content-based deduplication, and forwards unique events to a downstream handler.
- You will use the AWS SDK for Python (Boto3) to interact with EventBridge, SQS, and DynamoDB, alongside standard Python libraries for hashing.
- The code is written for Python 3.9+ and targets AWS Lambda.
Prerequisites
- AWS Account: An active account with permissions to create EventBridge rules, SQS queues, and Lambda functions.
- Genesys Cloud OAuth Client: A confidential client with the analytics:query scope to verify event payloads (optional for testing, but recommended for validation).
- Python Runtime: Python 3.9 or higher.
- Dependencies:
  - boto3 (AWS SDK for Python)
  - uuid (standard library)
  - hashlib (standard library)
  - json (standard library)
Authentication Setup
This integration uses AWS IAM roles, not OAuth tokens, for the transport layer. You need a Genesys Cloud OAuth client only if you choose to validate that incoming payloads are legitimate Genesys events, for example by verifying signatures or re-querying the API. This tutorial focuses on the deduplication logic within the AWS ecosystem.
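Ensure your Lambda function or application has an IAM role attached with the following policies:
- AmazonSQSFullAccess (or a custom policy allowing sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes).
- AWSLambdaBasicExecutionRole (for logging).
- If you use the DynamoDB-based deduplication in Step 4, permission for dynamodb:PutItem on the deduplication table.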
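If you prefer the custom policy over AmazonSQSFullAccess, you can generate the statement instead of writing it by hand. This is a minimal sketch that assumes a single queue and an optional deduplication table. build_consumer_policy is a hypothetical helper, and the ARNs are placeholders. Attaching the document to the role (console, CLI, or infrastructure as code) is left to you.

```python
import json
from typing import Any, Dict, Optional


def build_consumer_policy(queue_arn: str, table_arn: Optional[str] = None) -> Dict[str, Any]:
    """Build a least-privilege IAM policy document for the SQS consumer (hypothetical helper)."""
    statements = [
        {
            "Effect": "Allow",
            # The three SQS actions named in the custom-policy option above
            "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes"],
            "Resource": queue_arn,
        }
    ]
    if table_arn is not None:
        # Only needed for DynamoDB-based deduplication (conditional put_item)
        statements.append(
            {"Effect": "Allow", "Action": ["dynamodb:PutItem"], "Resource": table_arn}
        )
    return {"Version": "2012-10-17", "Statement": statements}


# Placeholder ARN; replace with your queue's ARN
policy = build_consumer_policy("arn:aws:sqs:us-east-1:123456789012:GenesysEventsQueue")
print(json.dumps(policy, indent=2))
```

AWSLambdaBasicExecutionRole still covers logging. This policy only replaces the broad SQS permissions.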
```python
import boto3
import json
import hashlib
import uuid
from typing import Dict, List, Optional
from botocore.exceptions import ClientError

# Initialize AWS clients
sqs_client = boto3.client('sqs')
eventbridge_client = boto3.client('events')
```
Implementation
Step 1: Configure the EventBridge Rule and SQS Target
Before writing the deduplication code, make sure EventBridge sends events to a queue you can deduplicate against. Standard SQS queues provide at-least-once delivery, so the same message can arrive more than once. You must either use a FIFO (First-In-First-Out) queue, which deduplicates messages within a five-minute interval, or implement a deduplication window in your consumer code.
For this tutorial, we use a Standard SQS queue as the target. EventBridge can deliver to FIFO queues, but the target must specify a message group ID and the queue is subject to FIFO throughput limits. With a Standard queue, the deduplication logic must live in the consumer (the Lambda function or consumer application).
Configuration:
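- Create a Standard SQS queue named GenesysEventsQueue.
- Create an EventBridge rule that targets this SQS queue.
- Set the rule's event pattern to match Genesys Cloud events:

```json
{ "source": ["genesys.cloud"], "detail-type": ["Genesys Cloud Event"] }
```

The source and detail-type values above are this tutorial's placeholders. Check them against the events your Genesys Cloud event source actually delivers before relying on the rule.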
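You can also script the configuration steps above. This is a minimal sketch with two assumptions. First, the rule lives on the bus named by event_bus_name. For a partner integration such as Genesys Cloud, that is usually the partner event bus associated with your event source rather than "default". Second, the rule name GenesysEventsToSqs and target ID genesys-events-queue are hypothetical.

The clients are passed in (boto3.client("sqs") and boto3.client("events") in practice), so you can exercise the function with stubs. The queue policy is required because EventBridge must be allowed to call sqs:SendMessage on the queue.

```python
import json
from typing import Any, Dict

# Placeholder pattern from the configuration above
EVENT_PATTERN: Dict[str, Any] = {
    "source": ["genesys.cloud"],
    "detail-type": ["Genesys Cloud Event"],
}


def build_queue_policy(queue_arn: str, rule_arn: str) -> str:
    """Queue policy that lets EventBridge, for this rule only, send messages to the queue."""
    return json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "events.amazonaws.com"},
            "Action": "sqs:SendMessage",
            "Resource": queue_arn,
            "Condition": {"ArnEquals": {"aws:SourceArn": rule_arn}},
        }],
    })


def configure_routing(sqs, events, queue_name: str = "GenesysEventsQueue",
                      rule_name: str = "GenesysEventsToSqs",
                      event_bus_name: str = "default") -> str:
    """Create the Standard queue, the rule, and the SQS target; return the rule ARN."""
    queue_url = sqs.create_queue(QueueName=queue_name)["QueueUrl"]
    queue_arn = sqs.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]
    rule_arn = events.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(EVENT_PATTERN),
        EventBusName=event_bus_name,
    )["RuleArn"]
    # Without this policy, EventBridge cannot deliver to the queue
    sqs.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={"Policy": build_queue_policy(queue_arn, rule_arn)},
    )
    events.put_targets(
        Rule=rule_name,
        EventBusName=event_bus_name,
        Targets=[{"Id": "genesys-events-queue", "Arn": queue_arn}],
    )
    return rule_arn
```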
Step 2: Implement Content-Based Deduplication Logic
Duplicates reach your consumer for two reasons:
- EventBridge retries delivery when a call to the target fails, for example because of an SQS error or a transient network issue.
- Standard SQS queues deliver each message at least once.

We will implement a sliding-window deduplication strategy backed by a cache. For demonstration the cache is in memory; in production it would be a distributed store such as Redis or DynamoDB. To keep the code self-contained, this step uses an in-memory ordered dictionary whose entries expire after a fixed window. In Lambda, a module-level (global) cache persists across invocations in the same container. However, each concurrent container has its own copy, so cross-container deduplication requires DynamoDB or Redis.
The Deduplication Key:
We generate a SHA-256 hash of the event's detail field. If the hash is already in the cache and has not expired, the event is a duplicate and we discard it.
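To see why the key comes from a normalized detail field, consider two deliveries of one event. Their envelopes differ, and their detail keys arrive in a different order. The sketch below uses made-up payload values for illustration.

```python
import hashlib
import json
from typing import Any, Dict


def event_key(envelope: Dict[str, Any]) -> str:
    """SHA-256 of the 'detail' field, serialized with sorted keys (nested dicts included)."""
    normalized = json.dumps(envelope.get("detail", {}), sort_keys=True)
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


# Two deliveries of the same event: different envelope 'id' and 'time',
# same detail content with keys in a different order.
first = {"id": "a1", "time": "2023-10-27T10:00:00Z",
         "detail": {"eventType": "conversation/updated", "payload": {"x": 1, "y": 2}}}
second = {"id": "b2", "time": "2023-10-27T10:00:05Z",
          "detail": {"payload": {"y": 2, "x": 1}, "eventType": "conversation/updated"}}

print(event_key(first) == event_key(second))  # True
```

Because json.dumps with sort_keys=True sorts keys at every nesting level, key order never changes the hash. Envelope fields such as id and time are not part of the key at all.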
```python
import time
from collections import OrderedDict


class DeduplicationCache:
    """
    A simple in-memory cache for deduplication.
    In production, replace this with Redis or DynamoDB.
    """

    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        # Maps event hash -> first-seen timestamp; insertion order keeps the oldest first
        self.cache: OrderedDict = OrderedDict()

    def is_duplicate(self, event_hash: str) -> bool:
        """
        Check if an event hash was seen within the window.
        If it was, return True (duplicate).
        If not, add it to the cache and return False (unique).
        """
        current_time = time.time()
        # Evict expired entries first so a stale hash is never reported as a duplicate
        self._cleanup(current_time)
        if event_hash in self.cache:
            # Event is within the window, so it is a duplicate
            return True
        # Event is new, add to cache
        self.cache[event_hash] = current_time
        return False

    def _cleanup(self, current_time: float) -> None:
        """Remove entries older than the window (the oldest entries are at the front)."""
        cutoff_time = current_time - self.window_seconds
        while self.cache and next(iter(self.cache.values())) < cutoff_time:
            self.cache.popitem(last=False)


# Module-level instance persists across warm invocations in the same Lambda container;
# a cold start creates a fresh, empty cache.
dedup_cache = DeduplicationCache(window_seconds=300)
```
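Because the cache reads the wall clock, its window is awkward to unit test. The sketch below implements the same first-seen sliding-window rule with the clock injected. ClockedDedupCache and its clock parameter are hypothetical additions for testing, not part of the tutorial's class. In production you would pass time.time or time.monotonic.

```python
from collections import OrderedDict
from typing import Callable


class ClockedDedupCache:
    """First-seen sliding-window deduplication with an injectable clock (hypothetical)."""

    def __init__(self, window_seconds: float, clock: Callable[[], float]):
        self.window_seconds = window_seconds
        self.clock = clock
        self.seen: OrderedDict = OrderedDict()  # hash -> first-seen time, oldest first

    def is_duplicate(self, event_hash: str) -> bool:
        now = self.clock()
        cutoff = now - self.window_seconds
        # Evict expired hashes from the front (insertion order matches time order)
        while self.seen and next(iter(self.seen.values())) < cutoff:
            self.seen.popitem(last=False)
        if event_hash in self.seen:
            return True
        self.seen[event_hash] = now
        return False


fake_now = [1000.0]
cache = ClockedDedupCache(window_seconds=300, clock=lambda: fake_now[0])
print(cache.is_duplicate("abc"))  # False: first sighting
fake_now[0] += 60
print(cache.is_duplicate("abc"))  # True: within the 300-second window
fake_now[0] += 301
print(cache.is_duplicate("abc"))  # False: window expired, hash recorded again
```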
Step 3: Process SQS Messages and Apply Deduplication
Now we combine the cache with the SQS consumer logic. For each message in the SQS batch, we parse the body, hash the detail field of the EventBridge event, and check for duplicates.
Important: EventBridge delivers the full event envelope as the SQS message body. The body appears in the body field of each record that Lambda passes to your function. The structure is:

```json
{
  "source": "genesys.cloud",
  "account": "123456789",
  "region": "us-east-1",
  "time": "2023-10-27T10:00:00Z",
  "id": "unique-event-id",
  "detail-type": "Genesys Cloud Event",
  "resources": [],
  "detail": {
    "eventType": "conversation/updated",
    "payload": { ... }
  }
}
```
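To exercise the handler locally, you can wrap envelopes like the one above in the record shape Lambda uses for SQS triggers. This is a minimal sketch with a few caveats:
- make_sqs_test_event is a hypothetical helper.
- It fills in only a subset of the real record fields.
- The receipt handle is a placeholder that real SQS calls would reject, so stub the SQS client for fully offline runs.

```python
import json
import uuid
from typing import Any, Dict, List


def make_sqs_test_event(envelopes: List[Dict[str, Any]],
                        queue_arn: str = "arn:aws:sqs:us-east-1:123456789012:GenesysEventsQueue"
                        ) -> Dict[str, Any]:
    """Wrap EventBridge envelopes in the record shape Lambda passes to SQS-triggered functions."""
    records = []
    for envelope in envelopes:
        records.append({
            "messageId": str(uuid.uuid4()),
            "receiptHandle": "fake-receipt-handle",  # placeholder; real handles come from SQS
            "body": json.dumps(envelope),              # the full envelope is the message body
            "attributes": {},
            "messageAttributes": {},
            "eventSource": "aws:sqs",
            "eventSourceARN": queue_arn,
            "awsRegion": queue_arn.split(":")[3],      # region is the fourth ARN segment
        })
    return {"Records": records}
```

Passing the same envelope twice, as in make_sqs_test_event([envelope, envelope]), simulates a duplicated delivery.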
```python
def generate_event_hash(event_payload: Dict) -> str:
    """
    Generate a deterministic hash for the event detail.
    We hash the 'detail' field because it contains the actual event data.
    """
    # Normalize the JSON string so the hash does not depend on key order
    normalized_json = json.dumps(event_payload.get('detail', {}), sort_keys=True)
    return hashlib.sha256(normalized_json.encode('utf-8')).hexdigest()


def process_genesys_event(event_detail: Dict) -> None:
    """
    Placeholder for your business logic.
    """
    print(f"Processing unique event: {event_detail.get('eventType')}")
    # Add your downstream logic here (e.g., update CRM, send notification)


def queue_url_from_arn(queue_arn: str) -> str:
    """
    Convert an SQS queue ARN (arn:aws:sqs:<region>:<account-id>:<queue-name>)
    into the queue URL that delete_message requires (standard AWS partition).
    """
    _, _, _, region, account_id, queue_name = queue_arn.split(':')
    return f"https://sqs.{region}.amazonaws.com/{account_id}/{queue_name}"


def handle_sqs_event(sqs_event: Dict) -> None:
    """
    Main handler for SQS events delivered to Lambda.
    Raises if any record fails so that the failed messages stay in the queue.
    """
    failed_message_ids = []
    for record in sqs_event.get('Records', []):
        recorded_hash = None
        try:
            queue_url = queue_url_from_arn(record['eventSourceARN'])
            # Parse the SQS message body (the full EventBridge envelope)
            message_body = json.loads(record['body'])
            # Generate hash for deduplication
            event_hash = generate_event_hash(message_body)
            if dedup_cache.is_duplicate(event_hash):
                print(f"Duplicate event detected. Skipping. Hash: {event_hash[:10]}...")
                # This content was already processed within the window; delete the copy so it
                # is not redelivered (the handler assumes processing is idempotent).
                delete_message(record['receiptHandle'], queue_url)
                continue
            # is_duplicate() recorded the hash; remember it in case processing fails
            recorded_hash = event_hash
            # Process the unique event
            process_genesys_event(message_body.get('detail', {}))
            # Delete the message from the queue to acknowledge processing
            delete_message(record['receiptHandle'], queue_url)
        except json.JSONDecodeError as e:
            print(f"Invalid JSON in message body: {e}")
            # Do not delete; after maxReceiveCount attempts it moves to the Dead Letter Queue (DLQ)
            failed_message_ids.append(record['messageId'])
        except Exception as e:
            print(f"Error processing message: {e}")
            if recorded_hash is not None:
                # Forget the hash so the redelivered message is processed, not skipped
                dedup_cache.cache.pop(recorded_hash, None)
            failed_message_ids.append(record['messageId'])
    if failed_message_ids:
        # After a successful invocation, Lambda deletes the whole batch. Failing the invocation
        # keeps the undeleted (failed) messages in the queue for retry; successful ones were
        # already deleted above.
        raise RuntimeError(f"Failed to process messages: {failed_message_ids}")


def delete_message(receipt_handle: str, queue_url: str) -> None:
    """
    Delete a message from the SQS queue.
    """
    try:
        sqs_client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=receipt_handle
        )
    except ClientError as e:
        print(f"Error deleting message from SQS: {e}")
```
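When Lambda reads the queue through an event source mapping, it deletes the whole batch after a successful invocation. A message is retried only if the invocation fails or the function reports that message as failed. If you enable ReportBatchItemFailures on the event source mapping, a partial batch response is an alternative to deleting messages yourself. Below is a minimal sketch; run_batch and handle_record (your per-record parse, deduplicate, and process step) are hypothetical names. If handle_record uses the deduplication cache, it should also remove the hash when processing fails, so the retry is not skipped as a duplicate.

```python
from typing import Any, Callable, Dict, List


def run_batch(records: List[Dict[str, Any]],
              handle_record: Callable[[Dict[str, Any]], None]) -> Dict[str, Any]:
    """Process each record and report only the failed ones back to Lambda."""
    failures = []
    for record in records:
        try:
            handle_record(record)
        except Exception as exc:  # any failure means "retry this message"
            print(f"Record {record['messageId']} failed: {exc}")
            failures.append({"itemIdentifier": record["messageId"]})
    # Lambda deletes every message not listed here; listed ones become visible again.
    return {"batchItemFailures": failures}
```

Your lambda_handler would return run_batch(event["Records"], handle_record). Without ReportBatchItemFailures enabled, Lambda ignores this return value.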
Step 4: Handle Edge Cases and Idempotency
The in-memory cache approach has a limitation: if your Lambda function scales out to multiple containers, each container has its own cache. An event processed in Container A will not be known to Container B, leading to duplicates.
To solve this in production, you must use a distributed state store. Below is an example using DynamoDB for global deduplication.
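DynamoDB Table Structure:
- Table Name: GenesysEventDeduplication
- Partition Key: event_hash (String)
- Time-To-Live (TTL): expires_at (Number, Unix timestamp)

DynamoDB deletes expired items in the background, typically within a few days of expiry rather than at the exact timestamp. The deduplication check therefore has to compare expires_at itself instead of relying on the item being gone.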
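You can create the table with Boto3. This is a minimal sketch that assumes on-demand (PAY_PER_REQUEST) billing. create_dedup_table is a hypothetical helper, and the client is passed in (boto3.client("dynamodb") in practice). Only the key attribute appears in AttributeDefinitions, because DynamoDB rejects definitions for non-key attributes such as expires_at.

```python
def create_dedup_table(dynamodb_client, table_name: str = "GenesysEventDeduplication") -> None:
    """Create the deduplication table and enable TTL on expires_at (hypothetical helper)."""
    dynamodb_client.create_table(
        TableName=table_name,
        KeySchema=[{"AttributeName": "event_hash", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "event_hash", "AttributeType": "S"}],
        BillingMode="PAY_PER_REQUEST",  # assumption: on-demand capacity
    )
    # Wait until the table is ACTIVE before changing its TTL settings
    dynamodb_client.get_waiter("table_exists").wait(TableName=table_name)
    dynamodb_client.update_time_to_live(
        TableName=table_name,
        TimeToLiveSpecification={"Enabled": True, "AttributeName": "expires_at"},
    )
```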
```python
import time

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
dedup_table = dynamodb.Table('GenesysEventDeduplication')


def is_duplicate_global(event_hash: str, window_seconds: int = 300) -> bool:
    """
    Check DynamoDB for duplicate events.
    Uses a conditional write so the check-and-record step is atomic.
    """
    now = int(time.time())
    try:
        # Write the item only if it does not exist or its TTL has already passed.
        # TTL deletion is not immediate, so an expired item may still be present
        # and must not count as a duplicate.
        dedup_table.put_item(
            Item={
                'event_hash': event_hash,
                'expires_at': now + window_seconds
            },
            ConditionExpression='attribute_not_exists(event_hash) OR expires_at < :now',
            ExpressionAttributeValues={':now': now}
        )
        # The write succeeded, so no live record existed: the event is unique.
        return False
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            # A live record already exists, so it is a duplicate
            return True
        # Other errors (e.g., throttling, permissions) are re-raised
        raise
```
Replace dedup_cache.is_duplicate() in the handle_sqs_event function with is_duplicate_global() for production-grade deduplication.
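The conditional write records the hash before your business logic runs. If processing then fails, the leftover marker makes the redelivered message look like a duplicate. The sketch below shows one way to undo the marker. It assumes is_duplicate is is_duplicate_global and table is dedup_table. process_once is a hypothetical helper, and the role would also need dynamodb:DeleteItem.

```python
from typing import Any, Callable, Dict


def process_once(event_hash: str, detail: Dict[str, Any], table,
                 is_duplicate: Callable[[str], bool],
                 process: Callable[[Dict[str, Any]], None]) -> bool:
    """Process an event at most once per window; undo the marker if processing fails.

    Returns True if the event was processed, False if it was a duplicate.
    """
    if is_duplicate(event_hash):
        return False
    try:
        process(detail)
    except Exception:
        # Remove the marker so a redelivered copy is processed instead of skipped
        table.delete_item(Key={"event_hash": event_hash})
        raise
    return True
```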
Complete Working Example
This is a complete Lambda handler script for a function that the SQS queue triggers through an event source mapping. It includes the deduplication cache (in memory for simplicity), message deletion, and error handling.
```python
import json
import hashlib
import time
from typing import Any, Dict

import boto3
from botocore.exceptions import ClientError

# AWS Clients
sqs_client = boto3.client('sqs')
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/GenesysEventsQueue'  # Replace with your Queue URL


# In-Memory Deduplication Cache (one per Lambda container)
class SimpleDedupCache:
    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        self.timestamps: Dict[str, float] = {}  # event hash -> first-seen time

    def is_duplicate(self, event_hash: str) -> bool:
        now = time.time()
        # Drop hashes older than the window
        expired = [h for h, t in self.timestamps.items() if now - t > self.window_seconds]
        for h in expired:
            del self.timestamps[h]
        if event_hash in self.timestamps:
            return True
        self.timestamps[event_hash] = now
        return False

    def forget(self, event_hash: str) -> None:
        """Remove a hash so a redelivered copy of a failed event is processed again."""
        self.timestamps.pop(event_hash, None)


cache = SimpleDedupCache()


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, int]:
    """
    AWS Lambda handler for SQS events.
    """
    records = event.get('Records', [])
    processed_count = 0
    duplicate_count = 0
    failed_ids = []

    for record in records:
        recorded_hash = None
        try:
            body = json.loads(record['body'])
            # Generate hash from the 'detail' part of the EventBridge event
            detail = body.get('detail', {})
            normalized_detail = json.dumps(detail, sort_keys=True)
            event_hash = hashlib.sha256(normalized_detail.encode('utf-8')).hexdigest()

            if cache.is_duplicate(event_hash):
                print(f"Duplicate event ignored. Hash: {event_hash[:10]}...")
                duplicate_count += 1
                # Delete the duplicate message to prevent reprocessing
                delete_from_sqs(record['receiptHandle'])
                continue

            recorded_hash = event_hash  # is_duplicate() stored it; undo on failure
            # Process the unique event
            process_event(detail)
            processed_count += 1
            # Delete the message after successful processing
            delete_from_sqs(record['receiptHandle'])
        except json.JSONDecodeError as e:
            print(f"JSON Error: {e}")
            # Do not delete; SQS retries it and eventually moves it to the DLQ
            failed_ids.append(record['messageId'])
        except Exception as e:
            print(f"Processing Error: {e}")
            if recorded_hash is not None:
                cache.forget(recorded_hash)
            failed_ids.append(record['messageId'])

    summary = {'processed': processed_count, 'duplicates': duplicate_count, 'errors': len(failed_ids)}
    print(json.dumps(summary))
    if failed_ids:
        # Fail the invocation so Lambda does not delete the batch. Successful records were
        # already deleted above, so only the failed ones are retried.
        raise RuntimeError(f"Failed message IDs: {failed_ids}")
    # For SQS triggers, the return value is informational only
    return summary


def process_event(detail: Dict) -> None:
    """
    Placeholder for business logic.
    """
    event_type = detail.get('eventType', 'unknown')
    print(f"Successfully processed event type: {event_type}")
    # Add your Genesys Cloud API calls or database updates here


def delete_from_sqs(receipt_handle: str) -> None:
    """
    Delete message from SQS.
    """
    try:
        sqs_client.delete_message(
            QueueUrl=QUEUE_URL,
            ReceiptHandle=receipt_handle
        )
    except ClientError as e:
        print(f"Failed to delete message from SQS: {e}")
```
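The handler assumes Lambda is invoked by the queue through an SQS event source mapping. The sketch below creates that mapping with Boto3 and assumes the function is already deployed. connect_queue_to_function is a hypothetical helper, and the client is passed in (boto3.client("lambda") in practice). The function's execution role needs the sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes permissions listed in the prerequisites.

```python
def connect_queue_to_function(lambda_client, queue_arn: str, function_name: str,
                              batch_size: int = 10) -> str:
    """Create the SQS event source mapping that invokes the handler; return its UUID."""
    response = lambda_client.create_event_source_mapping(
        EventSourceArn=queue_arn,
        FunctionName=function_name,
        BatchSize=batch_size,  # number of messages delivered per invocation
        Enabled=True,
    )
    return response["UUID"]
```

For example, connect_queue_to_function(boto3.client("lambda"), queue_arn, "genesys-dedup-handler") connects the queue to the handler. The function name in this call is hypothetical.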
Common Errors & Debugging
Error: ConditionalCheckFailedException in DynamoDB
- What causes it: This is the expected behavior when using DynamoDB for deduplication. It means the event hash already exists in the table.
- How to fix it: Catch this exception and treat it as a “duplicate found” signal. Do not log it as an error.
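Error: MessageNotInFlight or ReceiptHandleIsInvalid
- What causes it: You are using a receipt handle that is no longer valid. A receipt handle belongs to one specific receipt of a message. It is only usable while that message is in flight, that is, within the queue's visibility timeout (default 30 seconds, configurable up to 12 hours). If the message has been received again, only the most recent receipt handle deletes it. A DeleteMessage call with an older handle can succeed without removing the message.
- How to fix it: Make sure your Lambda function processes messages quickly. If processing takes longer than the visibility timeout, the message becomes visible again and the original receipt handle becomes invalid. Either increase the queue's VisibilityTimeout or optimize your processing logic. For queues that trigger Lambda, AWS recommends a visibility timeout of at least six times the function timeout.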
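The sketch below raises the visibility timeout with Boto3. It sizes the value from the Lambda function timeout, following AWS's guidance of at least six times that timeout for queues that trigger Lambda. set_visibility_timeout is a hypothetical helper, and the client is passed in (boto3.client("sqs") in practice).

```python
def set_visibility_timeout(sqs_client, queue_url: str, function_timeout_seconds: int) -> int:
    """Set the queue's visibility timeout to six times the function timeout; return the value."""
    # SQS caps the visibility timeout at 12 hours (43,200 seconds)
    timeout = min(6 * function_timeout_seconds, 43200)
    sqs_client.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={"VisibilityTimeout": str(timeout)},  # SQS attribute values are strings
    )
    return timeout
```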
Error: Duplicate Events Still Occurring
- What causes it: With the in-memory cache, each Lambda cold start begins with an empty cache, and concurrent containers do not share entries. With DynamoDB, check that the partition key (event_hash) is derived only from content that is identical in every copy of the same event. Two events with identical detail payloads but different timestamps in the outer EventBridge envelope produce the same hash. This is correct behavior for content-based deduplication. If you need to keep two genuinely separate events with identical detail payloads apart, make sure the detail payload contains a unique identifier (e.g., conversationId + timestamp).
- How to fix it: For production, always use DynamoDB or Redis. For in-memory testing, keep your Lambda instances warm to preserve cache state.
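When the detail payload carries identifying fields such as the conversationId and timestamp mentioned above, a variation is to hash only those fields instead of the whole detail. Copies of one logical event then match even when unrelated fields differ, while events with a different identity do not match. This is a sketch under stated assumptions:
- business_key is a hypothetical helper.
- The dotted field paths in the usage line are assumptions about your payload shape.
- Which fields identify an event is your decision.

```python
import hashlib
import json
from typing import Any, Dict, Iterable


def business_key(detail: Dict[str, Any], id_fields: Iterable[str]) -> str:
    """Hash only the fields that identify one logical event.

    id_fields are dotted paths into detail; a missing path contributes None.
    """
    parts = {}
    for path in id_fields:
        value: Any = detail
        for segment in path.split("."):
            # Walk one level down; anything that is not a dict ends the path
            value = value.get(segment) if isinstance(value, dict) else None
        parts[path] = value
    normalized = json.dumps(parts, sort_keys=True)
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
```

For example, business_key(detail, ["payload.conversationId", "payload.timestamp"]) treats two updates as one event only when both fields match.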