EventBridge Integration Deduplication Strategy
What You Will Build
- A robust consumer service that ingests events from Amazon EventBridge and eliminates duplicates based on a unique event identifier.
- A deduplication logic layer that uses a TTL-based cache to prevent processing the same event twice within a configurable time window.
- A complete implementation in Python using boto3 and redis (or an in-memory alternative) to handle high-throughput event streams.
Prerequisites
- AWS Account: Access to Amazon EventBridge and Amazon SQS (or Lambda).
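- SDK: boto3 (AWS SDK for Python) version 1.28 or higher.
- Cache Layer: A running Redis instance, or an in-memory dictionary for this tutorial. Production deployments need a shared, persistent cache such as Redis or Amazon ElastiCache so that every consumer instance sees the same event IDs.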
- Language: Python 3.9+
- Dependencies: pip install boto3 redis
Authentication Setup
For this integration, authentication is handled by the AWS Identity and Access Management (IAM) role attached to the execution environment (Lambda, ECS, or EC2), so there are no OAuth tokens to manage for EventBridge. For this tutorial, the role needs the following AWS managed policies (or equivalent permissions); scope them down for production:
- AmazonSQSFullAccess (if using SQS as the target), or permission for lambda:InvokeFunction (if using Lambda).
- AmazonEventBridgeFullAccess (for creating the test rule and putting test events).
- AmazonS3ReadOnlyAccess (only if you read dead-letter queue logs archived in Amazon S3).
If you are running this locally for development, ensure your ~/.aws/credentials file is configured with an IAM user that has these permissions.
```bash
# Verify AWS configuration
aws sts get-caller-identity
```
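The managed policies listed above are broad. As a rough sketch, a consumer that only polls and deletes could run with a much narrower inline policy like the one built below. The queue ARN is a placeholder, and the action list assumes that SQS authorizes DeleteMessageBatch through sqs:DeleteMessage, as AWS documents for its batch actions; the setup code later in this tutorial (creating queues and rules) needs more than this.

```python
import json


def consumer_policy(queue_arn: str) -> dict:
    """Build a minimal IAM policy for a consumer that only receives and deletes.

    Assumption: DeleteMessageBatch is authorized by sqs:DeleteMessage,
    so no separate batch action is listed.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"],
                "Resource": queue_arn,
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder ARN: substitute your own region, account ID, and queue name.
    arn = "arn:aws:sqs:us-east-1:123456789012:TestDedupQueue"
    print(json.dumps(consumer_policy(arn), indent=2))
```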
Implementation
Step 1: Define the Deduplication Contract
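Before writing the consumer, we must define what makes an event “unique.” In Genesys Cloud or NICE CXone integrations, events often contain a conversationId, interactionId, or a system-generated eventId. Note that a conversationId identifies a whole conversation rather than a single event, so deduplicating on it alone also drops distinct events from the same conversation. The EventBridge envelope has its own id field, but EventBridge generates a new one for every PutEvents call: it identifies redeliveries of one published event, not a publisher that sends the same payload twice. Prefer an ID assigned by the source system when one exists.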
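When neither the source nor the publisher supplies a stable ID, one alternative (not used in the rest of this tutorial) is to derive a key from the event content itself. The sketch below hashes the detail payload with sorted keys, so field order does not matter, and ignores the envelope id entirely. The volatile_fields parameter is a hypothetical knob for excluding values, such as timestamps, that a retrying publisher may regenerate.

```python
import hashlib
import json
from typing import Iterable


def content_key(detail: dict, volatile_fields: Iterable[str] = ("timestamp",)) -> str:
    """Derive a deduplication key from an event's detail payload.

    Fields listed in volatile_fields are dropped before hashing, so two copies
    that differ only in those fields map to the same key.
    """
    skip = set(volatile_fields)
    stable = {k: v for k, v in detail.items() if k not in skip}
    # sort_keys makes the serialization independent of dict insertion order
    canonical = json.dumps(stable, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    a = {"conversationId": "conv-12345", "status": "connected", "timestamp": 1.0}
    b = {"status": "connected", "timestamp": 2.0, "conversationId": "conv-12345"}
    print(content_key(a) == content_key(b))  # True: only the timestamp differs
```

The trade-off is that two genuinely separate events with identical content collapse into one, so this only suits sources whose payloads are distinct per event.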
We will create a DeduplicationService class that manages a cache of recently seen event IDs. Each ID is stored with a Time-To-Live (TTL), because keeping a record of every event forever would let the cache grow without bound. We only care about duplicates that arrive within a short window (e.g., 5 minutes).
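Why a TTL Window?
Network retries and at-least-once delivery typically produce duplicates within seconds or minutes of the original, so remembering each ID only for that window catches most of them while keeping memory bounded. The trade-off is that a duplicate arriving after its ID has expired is treated as new, so choose a TTL longer than your worst-case retry and redelivery delay (for SQS, roughly the visibility timeout multiplied by the number of receives you allow before a message moves to a dead-letter queue). The window starts when an ID is first seen; later duplicates do not extend it.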
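To make the window concrete, here is a minimal in-memory sketch with an injectable clock, so the behavior can be shown without sleeping. TtlSeenSet is a hypothetical name; it mirrors the Redis SET NX EX semantics used below, where a duplicate does not refresh the expiry.

```python
from typing import Callable, Dict


class TtlSeenSet:
    """Remembers IDs for ttl_seconds after they are first seen.

    Expired entries are overwritten lazily; a real cache such as Redis
    evicts them on its own.
    """

    def __init__(self, ttl_seconds: float, clock: Callable[[], float]):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._expires_at: Dict[str, float] = {}

    def is_duplicate(self, event_id: str) -> bool:
        now = self.clock()
        expires_at = self._expires_at.get(event_id)
        if expires_at is not None and now < expires_at:
            return True  # still inside the window
        # New (or expired) ID: start a fresh window. Duplicates never extend it.
        self._expires_at[event_id] = now + self.ttl_seconds
        return False


if __name__ == "__main__":
    now = [0.0]
    seen = TtlSeenSet(ttl_seconds=300, clock=lambda: now[0])
    print(seen.is_duplicate("evt-1"))  # False: first sighting at t=0
    now[0] = 30
    print(seen.is_duplicate("evt-1"))  # True: a retry 30 s later
    now[0] = 400
    print(seen.is_duplicate("evt-1"))  # False: the window expired at t=300
```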
```python
import json
from typing import Optional

import redis


class DeduplicationService:
    """
    Handles deduplication of incoming events using a TTL-based cache.
    """

    def __init__(self, cache: redis.Redis, ttl_seconds: int = 300):
        """
        :param cache: Redis client instance.
        :param ttl_seconds: Time in seconds to keep an event ID in the cache.
        """
        self.cache = cache
        self.ttl_seconds = ttl_seconds

    def is_duplicate(self, event_id: str) -> bool:
        """
        Checks if an event_id has been seen recently.
        If not, adds it to the cache.
        :param event_id: Unique identifier for the event.
        :return: True if duplicate, False if new.
        """
        # SET with NX (only set if Not eXists) and EX (expire after N seconds)
        # is a single atomic Redis command, so two consumers cannot both "win".
        result = self.cache.set(f"event:{event_id}", "1", nx=True, ex=self.ttl_seconds)
        # redis-py returns None when NX prevented the write (key already existed)
        return result is None

    def process_event(self, event: dict) -> Optional[dict]:
        """
        Extracts the ID and checks for duplicates.
        :param event: The raw EventBridge event payload.
        :return: The processed event payload if new, None if duplicate.
        """
        # Extract the unique ID.
        # Adjust _extract_event_id based on your source (Genesys/NICE).
        event_id = self._extract_event_id(event)
        if not event_id:
            raise ValueError("Event does not contain a valid unique ID.")
        if self.is_duplicate(event_id):
            print(f"Deduplication hit: Skipping event {event_id}")
            return None
        print(f"New event detected: {event_id}")
        return event

    def _extract_event_id(self, event: dict) -> Optional[str]:
        """
        Helper to extract the unique ID from various event formats.
        Order: a source-assigned ID in detail, then the EventBridge envelope id,
        then vendor-specific fallbacks for payloads without an envelope.
        """
        detail = event.get("detail")
        # EventBridge delivers detail as an object; tolerate a JSON string too
        if isinstance(detail, str):
            try:
                detail = json.loads(detail)
            except json.JSONDecodeError:
                detail = None
        # Source-assigned ID (field name is an assumption; match your schema)
        if isinstance(detail, dict) and detail.get("eventId"):
            return detail["eventId"]
        # Standard EventBridge structure: { "id": "...", "source": "...", "detail": { ... } }
        # The envelope id is regenerated on every PutEvents call.
        if event.get("id"):
            return event["id"]
        # Genesys Cloud: conversation/interaction IDs in detail.
        # Caution: these identify a conversation, not a single event.
        if isinstance(detail, dict):
            if detail.get("conversationId"):
                return f"gc_{detail['conversationId']}"
            if detail.get("interactionId"):
                return f"gc_{detail['interactionId']}"
        # NICE CXone: interaction ID in payload
        payload = event.get("payload")
        if isinstance(payload, dict) and payload.get("interactionId"):
            return f"nice_{payload['interactionId']}"
        return None
```
Step 2: Build the Event Consumer
Now we create the consumer. This example assumes the events land in an Amazon SQS queue, a common pattern for reliability. SQS standard queues provide at-least-once delivery, and EventBridge can also deliver the same event to a target more than once, so the consumer must expect duplicates.
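To make the at-least-once behavior concrete, here is a self-contained simulation with no AWS calls. It assumes, as is the case for an SQS target of an EventBridge rule, that the full event arrives as the message Body with detail as a nested JSON object. make_sqs_message and consume are hypothetical helpers, and a plain set stands in for the cache.

```python
import json
import uuid
from typing import List


def make_sqs_message(envelope: dict) -> dict:
    """Shape an EventBridge event the way an SQS target receives it:
    the whole event, serialized as JSON, becomes the message Body."""
    return {
        "MessageId": str(uuid.uuid4()),
        "ReceiptHandle": str(uuid.uuid4()),
        "Body": json.dumps(envelope),
    }


def consume(messages: List[dict]) -> List[str]:
    """Return the envelope ids that reached business logic, in order."""
    seen = set()
    handled = []
    for message in messages:
        event = json.loads(message["Body"])
        if event["id"] in seen:
            continue  # duplicate delivery: acknowledge without reprocessing
        seen.add(event["id"])
        handled.append(event["id"])
    return handled


if __name__ == "__main__":
    envelope = {
        "id": "evt-a",  # real envelope ids are UUIDs; shortened for readability
        "source": "com.mycompany.test",
        "detail-type": "TestEvent",
        "detail": {"conversationId": "conv-12345"},
    }
    first = make_sqs_message(envelope)
    # An SQS redelivery keeps the MessageId and Body but gets a new ReceiptHandle.
    redelivery = dict(first, ReceiptHandle=str(uuid.uuid4()))
    print(consume([first, redelivery]))  # ['evt-a']
```

Keying on the envelope id only catches redeliveries of one published event; copies published by separate PutEvents calls carry different envelope ids.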
We will use boto3 to poll the queue.
```python
import json
import logging

import boto3

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class EventConsumer:
    def __init__(self, queue_url: str, dedup_service: DeduplicationService):
        self.sqs = boto3.client('sqs')
        self.queue_url = queue_url
        self.dedup_service = dedup_service
        self.max_messages = 10  # SQS max batch size

    def poll_and_process(self):
        """
        Polls the SQS queue for messages, processes them, and deletes them.
        """
        try:
            # Receive messages from the queue
            response = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=self.max_messages,
                WaitTimeSeconds=5,  # Long polling
                VisibilityTimeout=30,  # Should exceed your worst-case processing time
            )
            messages = response.get('Messages', [])
            if not messages:
                logger.info("No messages received.")
                return
            logger.info(f"Received {len(messages)} messages.")
            successful_deletes = []
            for message in messages:
                body = None
                claimed = False
                try:
                    # 1. Parse the message body (the full EventBridge event as JSON)
                    body = json.loads(message['Body'])
                    # 2. Deduplicate (this also records the ID as seen)
                    processed_event = self.dedup_service.process_event(body)
                    if processed_event is None:
                        # It was a duplicate: delete it without running business logic
                        logger.info(f"Skipping duplicate: {message['MessageId']}")
                        successful_deletes.append(message['ReceiptHandle'])
                        continue
                    claimed = True
                    # 3. Process the unique event
                    # Replace this with your actual business logic
                    self._handle_business_logic(processed_event)
                    # 4. Mark as successful for deletion
                    successful_deletes.append(message['ReceiptHandle'])
                except Exception as e:
                    logger.error(f"Error processing message {message['MessageId']}: {e}")
                    # Do NOT delete the message: it becomes visible again after
                    # VisibilityTimeout. Release the recorded ID first, otherwise
                    # the retry would be skipped as a duplicate.
                    if claimed:
                        self._release(body)
            # 5. Delete handled messages in one batch (at most 10, matching max_messages)
            if successful_deletes:
                delete_entries = [
                    {"Id": str(i), "ReceiptHandle": handle}
                    for i, handle in enumerate(successful_deletes)
                ]
                result = self.sqs.delete_message_batch(
                    QueueUrl=self.queue_url,
                    Entries=delete_entries,
                )
                # Batch deletes can partially fail; those messages will be redelivered
                for failure in result.get('Failed', []):
                    logger.warning(f"Delete failed for entry {failure['Id']}: {failure.get('Message')}")
                logger.info(f"Deleted {len(result.get('Successful', []))} messages.")
        except Exception as e:
            logger.error(f"Error polling queue: {e}")

    def _release(self, event: dict):
        """
        Forget a recorded event ID so a redelivered copy is processed, not skipped.
        Uses the same key format as DeduplicationService.is_duplicate.
        """
        event_id = self.dedup_service._extract_event_id(event)
        if event_id:
            self.dedup_service.cache.delete(f"event:{event_id}")

    def _handle_business_logic(self, event: dict):
        """
        Placeholder for actual business logic.
        """
        logger.info(f"Processing unique event: {event.get('id', 'unknown')}")
        # Example: Update a database, trigger a webhook, etc.
```
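Because the ID is recorded before the business logic runs, a failed attempt must release that record; otherwise the redelivered message is classified as a duplicate and deleted unprocessed. Below is a minimal in-memory sketch of this claim, process, release pattern. ClaimStore and handle are hypothetical names, with claim standing in for Redis SET NX and release for DEL.

```python
from typing import Callable, Set


class ClaimStore:
    """In-memory stand-in for the Redis key space (hypothetical helper)."""

    def __init__(self) -> None:
        self._claimed: Set[str] = set()

    def claim(self, event_id: str) -> bool:
        """Return True if this caller claimed the ID (like SET NX succeeding)."""
        if event_id in self._claimed:
            return False
        self._claimed.add(event_id)
        return True

    def release(self, event_id: str) -> None:
        """Forget the ID so a later redelivery is processed (like DEL)."""
        self._claimed.discard(event_id)


def handle(store: ClaimStore, event_id: str, work: Callable[[], None]) -> str:
    """Claim the ID, run the work, and release the claim if the work fails."""
    if not store.claim(event_id):
        return "duplicate"
    try:
        work()
    except Exception:
        store.release(event_id)  # let the retry through instead of dropping it
        return "failed"
    return "processed"


if __name__ == "__main__":
    store = ClaimStore()
    attempts = []

    def flaky() -> None:
        attempts.append(1)
        if len(attempts) == 1:
            raise RuntimeError("transient failure")

    print(handle(store, "evt-1", flaky))  # failed
    print(handle(store, "evt-1", flaky))  # processed (the retry is not skipped)
    print(handle(store, "evt-1", flaky))  # duplicate
```

This only covers failures the process survives. If the consumer crashes between claim and completion, the ID stays claimed until its TTL expires and the redelivered message is skipped; writing a short-lived "processing" marker first and a longer-lived "done" marker after success is one way to narrow that gap.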
Step 3: Implement the Producer (For Testing)
To verify the deduplication strategy, we need a producer that sends the same event multiple times.
```python
import json
import time

import boto3


class EventProducer:
    def __init__(self, event_bus_name: str):
        self.eventbridge = boto3.client('events')
        self.event_bus_name = event_bus_name

    def send_test_event(self, event_id: str, count: int = 3):
        """
        Sends the same event multiple times to simulate retries/duplicates.
        event_id is placed in detail so every copy carries the same stable ID;
        EventBridge still assigns each copy a different envelope id.
        """
        base_event = {
            "Source": "com.mycompany.test",
            "DetailType": "TestEvent",
            "Detail": json.dumps({
                "eventId": event_id,
                "conversationId": "conv-12345",
                "timestamp": time.time(),  # evaluated once, so every copy is identical
                "data": {"status": "connected"}
            }),
            "EventBusName": self.event_bus_name
        }
        for i in range(count):
            try:
                response = self.eventbridge.put_events(Entries=[base_event])
                if response.get('FailedEntryCount', 0) > 0:
                    print(f"Failed to send event: {response.get('Entries')}")
                else:
                    print(f"Sent event copy {i+1}/{count}")
                time.sleep(0.5)  # Space the copies out the way retries would be
            except Exception as e:
                print(f"Error sending event: {e}")
```
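Producer-side retries are a common source of duplicates. PutEvents can partially succeed: the response's FailedEntryCount is non-zero, each failed result in Entries carries an ErrorCode, and each successful one carries an EventId. Resending the whole batch would duplicate the events that already succeeded. A minimal sketch that selects only the failed entries for a retry, assuming the response entries are positionally aligned with the request entries; it is exercised here against a hand-built response dict rather than a live call.

```python
from typing import List


def entries_to_retry(request_entries: List[dict], response: dict) -> List[dict]:
    """Return the request entries whose PutEvents result carries an ErrorCode.

    Relies on response["Entries"] being in the same order as the request.
    """
    if response.get("FailedEntryCount", 0) == 0:
        return []
    return [
        entry
        for entry, result in zip(request_entries, response.get("Entries", []))
        if "ErrorCode" in result
    ]


if __name__ == "__main__":
    sent = [{"Source": "com.mycompany.test", "Detail": '{"n": 1}'},
            {"Source": "com.mycompany.test", "Detail": '{"n": 2}'}]
    # Hand-built response shaped like a partial failure (values are made up).
    response = {
        "FailedEntryCount": 1,
        "Entries": [{"EventId": "abc"},
                    {"ErrorCode": "InternalFailure", "ErrorMessage": "retry me"}],
    }
    print(entries_to_retry(sent, response))  # only the second entry
```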
Complete Working Example
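This script combines the producer, deduplication service, and consumer. It uses an in-memory dictionary for the cache so you can run it without a Redis server; because that cache lives in one process, it does not survive restarts and is not shared between consumers. For production, pass a redis.Redis client as in Step 1 instead of SimpleCache. Running the script creates a real SQS queue and EventBridge rule in your account; delete them when you are done.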
```python
import json
import logging
import time
from typing import Optional

import boto3

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class SimpleCache:
    """
    In-memory cache for demonstration purposes.
    Replace with Redis for production.
    """

    def __init__(self, ttl_seconds: int = 300):
        self.ttl_seconds = ttl_seconds
        self.cache: dict = {}  # key -> (value, expiry timestamp)

    def set(self, key: str, value: str, nx: bool = False, ex: Optional[int] = None) -> Optional[bool]:
        """
        Mimics the subset of Redis SET used here.
        nx=True: only set if the key does not exist.
        ex: expire after this many seconds (None means never).
        """
        self.cleanup()
        if nx and key in self.cache:
            return None  # Key exists, do nothing (redis-py also returns None)
        expires_at = time.time() + ex if ex is not None else float("inf")
        self.cache[key] = (value, expires_at)
        return True

    def delete(self, key: str) -> None:
        """Mimics Redis DEL."""
        self.cache.pop(key, None)

    def cleanup(self):
        """Remove expired keys."""
        now = time.time()
        expired_keys = [k for k, (_, exp) in self.cache.items() if now > exp]
        for k in expired_keys:
            del self.cache[k]


class DeduplicationService:
    def __init__(self, cache: SimpleCache):
        self.cache = cache

    def is_duplicate(self, event_id: str) -> bool:
        result = self.cache.set(f"event:{event_id}", "1", nx=True, ex=self.cache.ttl_seconds)
        return result is None

    def release(self, event_id: str) -> None:
        """Forget an ID so a redelivered copy of a failed event is processed."""
        self.cache.delete(f"event:{event_id}")

    def process_event(self, event: dict) -> Optional[dict]:
        event_id = self._extract_event_id(event)
        if not event_id:
            raise ValueError("Event does not contain a valid unique ID.")
        if self.is_duplicate(event_id):
            logger.info(f"Deduplication hit: Skipping event {event_id}")
            return None
        logger.info(f"New event detected: {event_id}")
        return event

    def _extract_event_id(self, event: dict) -> Optional[str]:
        detail = event.get("detail")
        # An EventBridge rule delivers detail to SQS as a JSON object;
        # accept a JSON string too
        if isinstance(detail, str):
            try:
                detail = json.loads(detail)
            except json.JSONDecodeError:
                detail = None
        # Prefer the producer-assigned ID: the envelope id differs for each PutEvents call
        if isinstance(detail, dict) and detail.get("eventId"):
            return detail["eventId"]
        if event.get("id"):
            return event["id"]
        if isinstance(detail, dict) and detail.get("conversationId"):
            return f"gc_{detail['conversationId']}"
        return None


class EventConsumer:
    def __init__(self, queue_url: str, dedup_service: DeduplicationService):
        self.sqs = boto3.client('sqs')
        self.queue_url = queue_url
        self.dedup_service = dedup_service

    def poll_and_process(self) -> int:
        """Poll once; return the number of messages received."""
        try:
            response = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=5,
                VisibilityTimeout=30
            )
            messages = response.get('Messages', [])
            if not messages:
                return 0
            successful_deletes = []
            for message in messages:
                event_id = None
                try:
                    body = json.loads(message['Body'])
                    processed_event = self.dedup_service.process_event(body)
                    if processed_event is None:
                        successful_deletes.append(message['ReceiptHandle'])
                        continue
                    event_id = self.dedup_service._extract_event_id(body)
                    self._handle_business_logic(processed_event)
                    successful_deletes.append(message['ReceiptHandle'])
                except Exception as e:
                    logger.error(f"Error processing message: {e}")
                    if event_id:
                        # Let the redelivered message through instead of skipping it
                        self.dedup_service.release(event_id)
            if successful_deletes:
                delete_entries = [
                    {"Id": str(i), "ReceiptHandle": handle}
                    for i, handle in enumerate(successful_deletes)
                ]
                result = self.sqs.delete_message_batch(
                    QueueUrl=self.queue_url,
                    Entries=delete_entries
                )
                for failure in result.get('Failed', []):
                    logger.warning(f"Delete failed for entry {failure['Id']}: {failure.get('Message')}")
            return len(messages)
        except Exception as e:
            logger.error(f"Error polling queue: {e}")
            return 0

    def _handle_business_logic(self, event: dict):
        logger.info(f"Processing unique event: {event.get('id', 'unknown')}")


def create_test_infrastructure(event_bus_name: str, queue_name: str) -> str:
    """
    Helper to create an SQS queue and an EventBridge rule that targets it.
    """
    sqs = boto3.client('sqs')
    events = boto3.client('events')
    # create_queue returns the existing queue's URL if a queue with this
    # name and the same attributes already exists
    queue_url = sqs.create_queue(QueueName=queue_name)['QueueUrl']
    queue_arn = sqs.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=['QueueArn']
    )['Attributes']['QueueArn']
    print(f"Created/Found Queue: {queue_arn}")
    # Create (or update) the rule first: the queue policy needs its ARN
    rule_name = f"TestRule_{queue_name}"
    rule_arn = events.put_rule(
        Name=rule_name,
        EventPattern=json.dumps({"source": ["com.mycompany.test"]}),
        State="ENABLED",
        EventBusName=event_bus_name
    )['RuleArn']
    # Allow this rule to send to the queue. For EventBridge targets,
    # aws:SourceArn is the rule ARN, not the event bus ARN.
    sqs.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={
            'Policy': json.dumps({
                "Version": "2012-10-17",
                "Statement": [{
                    "Effect": "Allow",
                    "Principal": {"Service": "events.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": queue_arn,
                    "Condition": {"ArnEquals": {"aws:SourceArn": rule_arn}}
                }]
            })
        }
    )
    events.put_targets(
        Rule=rule_name,
        EventBusName=event_bus_name,
        Targets=[{"Id": "1", "Arn": queue_arn}]
    )
    print(f"Created/Updated Rule: {rule_name}")
    return queue_url


if __name__ == "__main__":
    EVENT_BUS_NAME = "default"  # Or your custom bus
    QUEUE_NAME = "TestDedupQueue"
    # 1. Setup Infrastructure
    queue_url = create_test_infrastructure(EVENT_BUS_NAME, QUEUE_NAME)
    # 2. Initialize Services
    cache = SimpleCache(ttl_seconds=300)
    dedup_service = DeduplicationService(cache)
    consumer = EventConsumer(queue_url, dedup_service)
    producer = boto3.client('events')
    # 3. Send Duplicate Events: identical detail with a stable eventId.
    # EventBridge still gives each copy its own envelope id.
    detail = json.dumps({"eventId": "evt-demo-001", "conversationId": "conv-12345", "timestamp": time.time()})
    print("\n--- Sending 5 Duplicate Events ---")
    for i in range(5):
        try:
            response = producer.put_events(
                Entries=[{
                    "Source": "com.mycompany.test",
                    "DetailType": "TestEvent",
                    "Detail": detail,
                    "EventBusName": EVENT_BUS_NAME
                }]
            )
            if response.get('FailedEntryCount', 0):
                print(f"Event {i+1} failed: {response['Entries']}")
            else:
                print(f"Sent event {i+1}")
        except Exception as e:
            print(f"Error: {e}")
    # 4. Consume and Deduplicate. A single receive call may return only some
    # of the messages, so poll a few times.
    print("\n--- Consuming Events ---")
    time.sleep(2)  # Allow events to propagate
    for _ in range(3):
        consumer.poll_and_process()
    print("\n--- Done ---")
```
Common Errors & Debugging
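Error: Events are published but never reach the queue (or AccessDenied when calling SendMessage)
- Cause: The queue policy does not let the EventBridge rule send messages to the SQS queue. PutEvents still succeeds in the producer; EventBridge reports the failed deliveries in the rule's FailedInvocations CloudWatch metric instead.
- Fix: Ensure the SQS queue policy allows events.amazonaws.com to call sqs:SendMessage. If the policy has an aws:SourceArn condition, it must match the rule ARN (arn:aws:events:<region>:<account>:rule/<rule-name>, or rule/<bus-name>/<rule-name> on a custom bus), not the event bus ARN.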
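As a sketch, the queue policy can be built from the ARN that put_rule returns in its RuleArn field, which keeps the aws:SourceArn condition scoped to that one rule. The ARNs below are placeholders, and queue_policy is a hypothetical helper.

```python
import json


def queue_policy(queue_arn: str, rule_arn: str) -> str:
    """Queue policy letting a single EventBridge rule send to the queue."""
    return json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Sid": "AllowEventBridgeRule",
            "Effect": "Allow",
            "Principal": {"Service": "events.amazonaws.com"},
            "Action": "sqs:SendMessage",
            "Resource": queue_arn,
            # For EventBridge targets, aws:SourceArn is the rule ARN, not the bus ARN
            "Condition": {"ArnEquals": {"aws:SourceArn": rule_arn}},
        }],
    })


if __name__ == "__main__":
    policy = queue_policy(
        "arn:aws:sqs:us-east-1:123456789012:TestDedupQueue",
        "arn:aws:events:us-east-1:123456789012:rule/TestRule_TestDedupQueue",
    )
    print(policy)
    # With boto3, this string is passed as
    # sqs.set_queue_attributes(QueueUrl=..., Attributes={"Policy": policy})
```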
Error: ValueError: Event does not contain a valid unique ID
- Cause: The _extract_event_id method could not find a unique identifier in the event payload. Because the consumer never deletes such a message, SQS keeps redelivering it until it moves to a dead-letter queue (if one is configured) or the retention period expires.
- Fix: Inspect the raw event JSON. Genesys Cloud events often nest the ID in detail.conversationId. NICE CXone events may use payload.interactionId. Update the _extract_event_id logic to match your specific event schema.
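Error: Duplicate events are still processed
- Cause: The cache does not persist across restarts, each consumer instance has its own in-memory cache, or the TTL is shorter than the gap between duplicates.
- Fix: For production, use a shared Redis cache. Ensure the event_id is truly unique and stable across retries. If your source system generates a new ID for each retry (as EventBridge does for the envelope id on every PutEvents call), deduplication by that ID will fail. In that case, you must deduplicate by a business key (e.g., conversationId + timestamp rounded to the nearest minute).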
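A sketch of that business-key fallback, assuming the detail carries conversationId and an epoch-seconds timestamp (the field names used by the test events in this tutorial). It floors the timestamp into 60-second buckets rather than rounding to the nearest minute; either way, two copies that straddle a bucket boundary still get different keys, so this narrows the gap rather than closing it.

```python
def business_key(detail: dict, bucket_seconds: int = 60) -> str:
    """Build a dedup key from conversationId plus a coarse time bucket."""
    bucket = int(detail["timestamp"]) // bucket_seconds
    return f"{detail['conversationId']}:{bucket}"


if __name__ == "__main__":
    original = {"conversationId": "conv-12345", "timestamp": 1_700_000_010.2}
    retry = {"conversationId": "conv-12345", "timestamp": 1_700_000_015.9}
    late_retry = {"conversationId": "conv-12345", "timestamp": 1_700_000_041.0}
    print(business_key(original) == business_key(retry))       # True: same bucket
    print(business_key(original) == business_key(late_retry))  # False: crossed a boundary
```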
Error: The specified queue does not exist (QueueDoesNotExist / AWS.SimpleQueueService.NonExistentQueue)
- Cause: The SQS queue was not created or the name is incorrect.
- Fix: Verify the queue exists in the AWS Console under SQS. Ensure the region in your AWS CLI/SDK matches the region where the queue was created.
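A quick way to spot a region mismatch is to compare the region embedded in the queue URL with the region your client is configured for. A minimal parser, assuming the current https://sqs.<region>.amazonaws.com/<account>/<name> URL format; older queue URLs use other host names, and for those it returns None. queue_region is a hypothetical helper.

```python
from typing import Optional
from urllib.parse import urlparse


def queue_region(queue_url: str) -> Optional[str]:
    """Extract the region from a URL like https://sqs.us-east-1.amazonaws.com/123/Q."""
    host = urlparse(queue_url).hostname or ""
    parts = host.split(".")
    if len(parts) >= 4 and parts[0] == "sqs" and parts[2] == "amazonaws":
        return parts[1]
    return None


if __name__ == "__main__":
    url = "https://sqs.eu-west-1.amazonaws.com/123456789012/TestDedupQueue"
    print(queue_region(url))  # eu-west-1
    # Compare with the client's region, e.g. boto3.client("sqs").meta.region_name
```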