EventBridge Integration Deduplication Strategy

What You Will Build

  • A robust consumer service that ingests events from Amazon EventBridge and eliminates duplicates based on a unique event identifier.
  • A deduplication logic layer that uses a sliding window cache to prevent processing the same event twice within a configurable time frame.
  • A complete implementation in Python using boto3 and redis (or an in-memory alternative) to handle high-throughput event streams.

Prerequisites

  • AWS Account: Access to Amazon EventBridge and Amazon SQS (or Lambda).
  • SDK: boto3 (AWS SDK for Python) version 1.28 or higher.
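  • Cache Layer: A running instance of Redis, or an in-memory dictionary for this tutorial. Production requires Redis or Amazon ElastiCache, because an in-memory cache is lost on restart and is not shared between consumer instances.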
  • Language: Python 3.9+
  • Dependencies: pip install boto3 redis

Authentication Setup

For this integration, authentication is handled via the AWS Identity and Access Management (IAM) role attached to the execution environment (Lambda, ECS, or EC2). You do not need to manually manage OAuth tokens for EventBridge. However, the following permissions are required:

  1. AmazonSQSFullAccess (if using SQS as the target). If you use Lambda as the target instead, the function needs a resource-based policy that allows events.amazonaws.com to call lambda:InvokeFunction.
  2. AmazonEventBridgeFullAccess (for creating the test rule and putting test events).
  3. AmazonS3ReadOnlyAccess (if storing dead-letter queue logs in S3).

If you are running this locally for development, ensure your ~/.aws/credentials file is configured with an IAM user that has these permissions.

# Verify AWS configuration
aws sts get-caller-identity
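The managed policies listed above grant much more than the consumer itself uses. As a narrower alternative for the consumer role, here is a minimal sketch that builds a policy document scoped to a single queue. The helper name build_consumer_policy and the example ARN are hypothetical; sqs:ReceiveMessage and sqs:DeleteMessage are the IAM actions behind receive_message and delete_message_batch.

```python
import json


def build_consumer_policy(queue_arn: str) -> dict:
    """Return an IAM policy document covering only the calls the SQS consumer makes.

    Hypothetical helper: it only builds the document; attaching it to the
    execution role is done separately (console, CLI, or infrastructure as code).
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # receive_message -> sqs:ReceiveMessage
                # delete_message_batch -> sqs:DeleteMessage (batch calls use the single-message action)
                "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"],
                "Resource": queue_arn,
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder account ID and queue name
    policy = build_consumer_policy("arn:aws:sqs:us-east-1:123456789012:TestDedupQueue")
    print(json.dumps(policy, indent=2))
```

The producer and the test-infrastructure helper still need EventBridge and queue-management permissions, which this policy deliberately leaves out.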

Implementation

Step 1: Define the Deduplication Contract

Before writing the consumer, we must define what makes an event “unique.” In Genesys Cloud or NICE CXone integrations, events often contain a conversationId, interactionId, or a system-generated eventId.

We will create a DeduplicationService class. This class will manage a cache of seen event IDs. We use a Time-To-Live (TTL) approach because keeping a record of every event forever is inefficient. We only care about duplicates that arrive within a short window (e.g., 5 minutes).

Why a Sliding Window?
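Network retries can cause the same event to be published or delivered several times within seconds, whereas an event from yesterday is not a duplicate of today's event. A time window (TTL) balances memory usage with deduplication accuracy. Strictly speaking, the Redis SET with NX and EX used below gives a fixed window per ID rather than a sliding one: the TTL starts when an ID is first seen and is not extended by later duplicates.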
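To make the difference between a fixed and a sliding window concrete, here is a small in-memory sketch with an injectable clock. The class name TtlSeenSet is hypothetical. With refresh_on_hit=False it mirrors the behavior of Redis SET NX EX (the TTL starts at first sighting); with refresh_on_hit=True every duplicate extends the window.

```python
from typing import Callable, Dict


class TtlSeenSet:
    """Hypothetical in-memory 'seen IDs' store for comparing window semantics."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float], refresh_on_hit: bool = False):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self.refresh_on_hit = refresh_on_hit
        self._expires_at: Dict[str, float] = {}

    def seen_before(self, event_id: str) -> bool:
        now = self.clock()
        expires_at = self._expires_at.get(event_id)
        if expires_at is not None and now < expires_at:
            if self.refresh_on_hit:
                # Sliding window: a duplicate pushes the expiry forward
                self._expires_at[event_id] = now + self.ttl_seconds
            return True
        # New (or expired) ID: record it and start its window
        self._expires_at[event_id] = now + self.ttl_seconds
        return False


fake_now = [0.0]
fixed = TtlSeenSet(300, clock=lambda: fake_now[0])
sliding = TtlSeenSet(300, clock=lambda: fake_now[0], refresh_on_hit=True)
for t in (0, 200, 301):
    fake_now[0] = t
    print(t, fixed.seen_before("evt-1"), sliding.seen_before("evt-1"))
# 0 False False
# 200 True True
# 301 False True
```

A sliding window keeps suppressing an ID for as long as copies keep arriving, which can hide a genuinely new occurrence of a frequently repeated event; the fixed window bounds how long any single ID is suppressed.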

import json
from typing import Optional

import redis

class DeduplicationService:
    """
    Handles deduplication of incoming events using a TTL-based cache.
    """
    def __init__(self, cache: redis.Redis, ttl_seconds: int = 300):
        """
        :param cache: Redis client instance.
        :param ttl_seconds: Time in seconds to keep an event ID in the cache.
        """
        self.cache = cache
        self.ttl_seconds = ttl_seconds

    def is_duplicate(self, event_id: str) -> bool:
        """
        Checks if an event_id has been seen recently.
        If not, adds it to the cache.

        :param event_id: Unique identifier for the event.
        :return: True if duplicate, False if new.
        """
        # SET with NX (only set if the key does Not eXist) and EX (expiry in seconds)
        # is a single atomic command, so two consumers cannot both claim the same ID
        result = self.cache.set(f"event:{event_id}", "1", nx=True, ex=self.ttl_seconds)

        # redis-py returns None when NX prevented the write, i.e. the key already existed
        return result is None

    def process_event(self, event: dict) -> Optional[dict]:
        """
        Extracts the ID and checks for duplicates.

        :param event: The raw EventBridge event payload.
        :return: The processed event payload if new, None if duplicate.
        """
        # Extract the unique ID.
        # Adjust _extract_event_id based on your source (Genesys/NICE)
        event_id = self._extract_event_id(event)

        if not event_id:
            raise ValueError("Event does not contain a valid unique ID.")

        if self.is_duplicate(event_id):
            print(f"Deduplication hit: Skipping event {event_id}")
            return None

        print(f"New event detected: {event_id}")
        return event

    def _extract_event_id(self, event: dict) -> Optional[str]:
        """
        Helper to extract the unique ID from various event formats.
        """
        # Standard EventBridge structure
        # { "id": "...", "source": "...", "detail": { ... } }
        #
        # Source-specific IDs are checked before the envelope "id": EventBridge
        # assigns a new "id" to every PutEvents entry, so copies re-published
        # by the source would otherwise never match.
        # Caution: a conversationId identifies a conversation, not one event. If your
        # source emits several events per conversation, combine it with an event type
        # or sequence field so that distinct events are not collapsed.
        detail = event.get("detail")
        # EventBridge delivers "detail" as an object; tolerate a JSON string as well
        if isinstance(detail, str):
            try:
                detail = json.loads(detail)
            except json.JSONDecodeError:
                detail = None

        # Try Genesys Cloud specific IDs in detail
        if isinstance(detail, dict):
            if detail.get("conversationId"):
                return f"gc_{detail['conversationId']}"
            if detail.get("interactionId"):
                return f"gc_{detail['interactionId']}"

        # Try NICE CXone specific ID
        payload = event.get("payload")
        if isinstance(payload, dict) and payload.get("interactionId"):
            return f"nice_{payload['interactionId']}"

        # Fall back to the standard EventBridge ID
        if event.get("id"):
            return event["id"]

        return None
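When a source re-publishes an event, EventBridge gives each copy a new envelope id, and some sources carry no stable identifier at all. One alternative, sketched here under the assumption that two events with the same source, detail-type, and detail (minus volatile fields such as timestamp) are the same event, is a content hash. The function name content_dedup_key and its volatile_fields default are hypothetical; SQS FIFO queues use a similar idea in content-based deduplication, where the deduplication ID is a SHA-256 hash of the message body.

```python
import hashlib
import json
from typing import Iterable


def content_dedup_key(event: dict, volatile_fields: Iterable[str] = ("timestamp",)) -> str:
    """Hash the parts of an event that identify it, ignoring fields that differ per copy.

    Hypothetical helper: which fields count as volatile depends on your source.
    """
    detail = event.get("detail") or {}
    # EventBridge delivers 'detail' as an object; accept a JSON string as well
    if isinstance(detail, str):
        detail = json.loads(detail)
    skip = set(volatile_fields)
    stable_detail = {k: v for k, v in detail.items() if k not in skip}
    # sort_keys (applied recursively) yields the same string regardless of key order
    canonical = json.dumps(
        {"source": event.get("source"), "detail-type": event.get("detail-type"), "detail": stable_detail},
        sort_keys=True,
        separators=(",", ":"),
    )
    return "hash_" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

The returned string can be passed to is_duplicate in place of an extracted ID. Only top-level detail fields are filtered, and two genuinely separate events with identical content would also be treated as duplicates, so this fits best when identical content really does mean "same event".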

Step 2: Build the Event Consumer

Now we create the consumer. This example assumes the events land in an Amazon SQS queue, which is a common pattern for reliability. Both EventBridge and standard SQS queues provide "at-least-once" delivery, so the same event can reach the consumer more than once, and publisher retries add further copies.

We will use boto3 to poll the queue.
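By default, when an EventBridge rule targets an SQS queue, each message Body is the full EventBridge event serialized as JSON, with detail as a nested object rather than a string. The sample below is hand-written to illustrate that shape (the IDs, account, and timestamps are placeholders, not captured output), and parse_eventbridge_body is a hypothetical helper.

```python
import json

# Hand-written sample shaped like one entry of receive_message()['Messages']
sample_message = {
    "MessageId": "example-message-id",
    "ReceiptHandle": "example-receipt-handle",
    "Body": json.dumps({
        "version": "0",
        "id": "example-eventbridge-id",
        "detail-type": "TestEvent",
        "source": "com.mycompany.test",
        "account": "123456789012",
        "time": "2024-01-01T00:00:00Z",
        "region": "us-east-1",
        "resources": [],
        "detail": {"conversationId": "conv-12345", "data": {"status": "connected"}},
    }),
}


def parse_eventbridge_body(message: dict) -> dict:
    """Decode an SQS message Body back into the EventBridge event dictionary."""
    event = json.loads(message["Body"])
    # 'detail' normally arrives as an object; decode it only if some producer
    # upstream stored it as a JSON string
    if isinstance(event.get("detail"), str):
        event["detail"] = json.loads(event["detail"])
    return event


event = parse_eventbridge_body(sample_message)
print(event["id"], event["detail"]["conversationId"])
# example-eventbridge-id conv-12345
```

If an input transformer is configured on the rule, the Body will instead contain whatever the transformer produces.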

import boto3
import json
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class EventConsumer:
    def __init__(self, queue_url: str, dedup_service: DeduplicationService):
        self.sqs = boto3.client('sqs')
        self.queue_url = queue_url
        self.dedup_service = dedup_service
        self.max_messages = 10  # SQS max batch size

    def poll_and_process(self):
        """
        Polls the SQS queue for messages, processes them, and deletes them.
        """
        try:
            # Receive messages from the queue
            response = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=self.max_messages,
                WaitTimeSeconds=5,  # Long polling
                VisibilityTimeout=30
            )
            
            messages = response.get('Messages', [])
            
            if not messages:
                logger.info("No messages received.")
                return

            logger.info(f"Received {len(messages)} messages.")

            successful_deletes = []
            
            for message in messages:
                processed_event = None
                try:
                    # 1. Parse the message body
                    body = json.loads(message['Body'])

                    # 2. Deduplicate
                    processed_event = self.dedup_service.process_event(body)

                    if processed_event is None:
                        # It was a duplicate, so delete it
                        # without running the business logic
                        logger.info(f"Skipping duplicate: {message['MessageId']}")
                        successful_deletes.append(message['ReceiptHandle'])
                        continue

                    # 3. Process the unique event
                    # Replace this with your actual business logic
                    self._handle_business_logic(processed_event)

                    # 4. Mark as successful for deletion
                    successful_deletes.append(message['ReceiptHandle'])

                except Exception as e:
                    logger.error(f"Error processing message {message['MessageId']}: {str(e)}")
                    if processed_event is not None:
                        # The ID was recorded before the business logic failed. Release it,
                        # otherwise the redelivered message would be skipped as a duplicate.
                        event_id = self.dedup_service._extract_event_id(processed_event)
                        self.dedup_service.cache.delete(f"event:{event_id}")
                    # Do NOT delete the message. It becomes visible again after
                    # VisibilityTimeout and is retried, so the business logic must be idempotent.

            # 5. Delete successfully processed messages in batch
            if successful_deletes:
                delete_entries = [
                    {"Id": str(i), "ReceiptHandle": handle}
                    for i, handle in enumerate(successful_deletes)
                ]

                delete_resp = self.sqs.delete_message_batch(
                    QueueUrl=self.queue_url,
                    Entries=delete_entries
                )
                # A batch delete can partially fail; those messages will be redelivered
                for failure in delete_resp.get('Failed', []):
                    logger.error(f"Failed to delete entry {failure['Id']}: {failure.get('Message')}")
                logger.info(f"Deleted {len(delete_resp.get('Successful', []))} messages.")

        except Exception as e:
            logger.error(f"Error polling queue: {str(e)}")

    def _handle_business_logic(self, event: dict):
        """
        Placeholder for actual business logic.
        """
        logger.info(f"Processing unique event: {event.get('id', 'unknown')}")
        # Example: Update a database, trigger a webhook, etc.
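Because is_duplicate records the ID before the business logic runs, a failure after that point leaves the ID in the cache, and the redelivered message would then be skipped as a duplicate unless the claim is released. The sketch below isolates that claim/release pattern with a hypothetical in-memory stand-in for the Redis key space (TTL omitted for brevity); InMemoryClaims and handle_delivery are illustrative names, not part of any library.

```python
from typing import Callable, Dict, List


class InMemoryClaims:
    """Stand-in for Redis: claim() acts like SET NX, release() like DELETE."""

    def __init__(self) -> None:
        self._keys: Dict[str, str] = {}

    def claim(self, event_id: str) -> bool:
        key = f"event:{event_id}"
        if key in self._keys:
            return False
        self._keys[key] = "1"
        return True

    def release(self, event_id: str) -> None:
        self._keys.pop(f"event:{event_id}", None)


def handle_delivery(claims: InMemoryClaims, event_id: str, work: Callable[[], None]) -> str:
    """Return 'skipped', 'processed', or 'failed' for one delivery of an event."""
    if not claims.claim(event_id):
        return "skipped"
    try:
        work()
    except Exception:
        # Undo the claim so the SQS redelivery is processed instead of skipped
        claims.release(event_id)
        return "failed"
    return "processed"


attempts: List[str] = []


def flaky_work() -> None:
    attempts.append("try")
    if len(attempts) == 1:
        raise RuntimeError("downstream unavailable")


claims = InMemoryClaims()
print([handle_delivery(claims, "evt-1", flaky_work) for _ in range(3)])
# ['failed', 'processed', 'skipped']
```

One gap remains: if the process crashes between claiming and releasing, the claim survives until its TTL expires, so the TTL should be longer than the SQS visibility timeout only if you accept that such an event may be skipped.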

Step 3: Implement the Producer (For Testing)

To verify the deduplication strategy, we need a producer that sends the same event multiple times.

import boto3
import json
import time

class EventProducer:
    def __init__(self, event_bus_name: str):
        self.eventbridge = boto3.client('events')
        self.event_bus_name = event_bus_name

    def send_test_event(self, event_id: str, count: int = 3):
        """
        Sends the same event multiple times to simulate retries/duplicates.
        """
        base_event = {
            "Source": "com.mycompany.test",
            "DetailType": "TestEvent",
            "Detail": json.dumps({
                # The same ID on every copy lets the consumer match the re-sent events
                "conversationId": event_id,
                "timestamp": time.time(),
                "data": {"status": "connected"}
            }),
            "EventBusName": self.event_bus_name
        }

        for i in range(count):
            try:
                response = self.eventbridge.put_events(
                    Entries=[base_event]
                )
                failed_count = response.get('FailedEntryCount', 0)
                if failed_count > 0:
                    print(f"Failed to send event: {response.get('Entries')}")
                else:
                    print(f"Sent event copy {i+1}/{count}")
                time.sleep(0.5)  # Small delay between copies, like a publisher retrying after a timeout
            except Exception as e:
                print(f"Error sending event: {e}")
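PutEvents accepts at most 10 entries per request, and a successful call can still report per-entry failures: FailedEntryCount is non-zero and the failed positions in the response Entries carry an ErrorCode. Response entries line up by position with the request entries. A small helper sketch for chunking entries and picking out the failed ones for a retry follows; the function names are hypothetical, and the response in the test is hand-written rather than real output.

```python
from typing import Iterator, List

MAX_PUT_EVENTS_ENTRIES = 10  # PutEvents limit on entries per request


def chunk_entries(entries: List[dict], size: int = MAX_PUT_EVENTS_ENTRIES) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most `size` entries."""
    for start in range(0, len(entries), size):
        yield entries[start:start + size]


def failed_entries(request_entries: List[dict], response: dict) -> List[dict]:
    """Return the request entries whose matching response entry has an ErrorCode."""
    if response.get("FailedEntryCount", 0) == 0:
        return []
    return [
        request
        for request, result in zip(request_entries, response.get("Entries", []))
        if "ErrorCode" in result
    ]


entries = [{"Source": "com.mycompany.test", "DetailType": "TestEvent", "Detail": "{}"} for _ in range(23)]
print([len(batch) for batch in chunk_entries(entries)])
# [10, 10, 3]
```

Retrying only the failed entries avoids needless copies, but if a whole call is retried after a network timeout, entries that were already accepted are published again with new EventBridge ids, which is exactly the duplicate case this tutorial targets.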

Complete Working Example

This script combines the producer, deduplication service, and consumer. It uses an in-memory dictionary as the cache so you can run it without a Redis server. For production, replace SimpleCache with a redis.Redis client passed to the DeduplicationService from Step 1; the in-memory cache is lost on restart and is not shared between consumer processes.

import boto3
import json
import time
import logging
from typing import Optional
from collections import OrderedDict

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class SimpleCache:
    """
    In-memory cache for demonstration purposes.
    Replace with Redis for production.
    """
    def __init__(self, ttl_seconds: int = 300):
        self.ttl_seconds = ttl_seconds
        self.cache: OrderedDict = OrderedDict()

    def set(self, key: str, value: str, nx: bool = False, ex: Optional[int] = None) -> Optional[bool]:
        """
        Mimics redis-py set behavior.
        nx=True: Only set if the key does not exist; returns None if it does.
        ex: Expire in seconds (defaults to the cache TTL when omitted).
        """
        now = time.time()
        existing = self.cache.get(key)
        if nx and existing is not None and existing[1] > now:
            return None  # Live key exists, do nothing
        ttl = ex if ex is not None else self.ttl_seconds
        self.cache[key] = (value, now + ttl)
        return True

    def delete(self, key: str) -> int:
        """Mimics redis-py delete: returns the number of keys removed."""
        return 1 if self.cache.pop(key, None) is not None else 0

    def cleanup(self):
        """Remove expired keys"""
        now = time.time()
        expired_keys = [k for k, (v, exp) in self.cache.items() if exp <= now]
        for k in expired_keys:
            del self.cache[k]

class DeduplicationService:
    def __init__(self, cache: SimpleCache):
        self.cache = cache

    def is_duplicate(self, event_id: str) -> bool:
        self.cache.cleanup()
        result = self.cache.set(f"event:{event_id}", "1", nx=True, ex=self.cache.ttl_seconds)
        return result is None

    def release(self, event_id: str) -> None:
        """Forget an ID so that a retried delivery is processed rather than skipped."""
        self.cache.delete(f"event:{event_id}")

    def process_event(self, event: dict) -> Optional[dict]:
        event_id = self._extract_event_id(event)
        if not event_id:
            raise ValueError("Event does not contain a valid unique ID.")

        if self.is_duplicate(event_id):
            logger.info(f"Deduplication hit: Skipping event {event_id}")
            return None

        logger.info(f"New event detected: {event_id}")
        return event

    def _extract_event_id(self, event: dict) -> Optional[str]:
        # EventBridge delivers "detail" to SQS as an object; accept a JSON string too
        detail = event.get("detail")
        if isinstance(detail, str):
            try:
                detail = json.loads(detail)
            except json.JSONDecodeError:
                detail = None
        # Prefer the business ID: every put_events call gets a new envelope "id",
        # so the re-published copies below can only be matched on conversationId
        if isinstance(detail, dict) and detail.get("conversationId"):
            return f"gc_{detail['conversationId']}"
        return event.get("id")

class EventConsumer:
    def __init__(self, queue_url: str, dedup_service: DeduplicationService):
        self.sqs = boto3.client('sqs')
        self.queue_url = queue_url
        self.dedup_service = dedup_service

    def poll_and_process(self):
        try:
            response = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=5,
                VisibilityTimeout=30
            )

            messages = response.get('Messages', [])
            if not messages:
                return

            successful_deletes = []

            for message in messages:
                claimed_id = None
                try:
                    body = json.loads(message['Body'])
                    processed_event = self.dedup_service.process_event(body)

                    if processed_event is None:
                        successful_deletes.append(message['ReceiptHandle'])
                        continue

                    claimed_id = self.dedup_service._extract_event_id(processed_event)
                    self._handle_business_logic(processed_event)
                    successful_deletes.append(message['ReceiptHandle'])

                except Exception as e:
                    logger.error(f"Error processing message: {str(e)}")
                    if claimed_id is not None:
                        # Release the claim so the redelivered message is retried
                        self.dedup_service.release(claimed_id)

            if successful_deletes:
                delete_entries = [
                    {"Id": str(i), "ReceiptHandle": handle}
                    for i, handle in enumerate(successful_deletes)
                ]
                self.sqs.delete_message_batch(
                    QueueUrl=self.queue_url,
                    Entries=delete_entries
                )

        except Exception as e:
            logger.error(f"Error polling queue: {str(e)}")

    def _handle_business_logic(self, event: dict):
        logger.info(f"Processing unique event: {event.get('id', 'unknown')}")

def create_test_infrastructure(event_bus_name: str, queue_name: str) -> str:
    """
    Helper to create an SQS queue and an EventBridge rule that targets it, for testing.
    """
    sqs = boto3.client('sqs')
    events = boto3.client('events')

    # Create the SQS queue. create_queue returns the existing queue's URL
    # when a queue with the same name and attributes already exists.
    try:
        queue_url = sqs.create_queue(QueueName=queue_name)['QueueUrl']
    except sqs.exceptions.QueueNameExists:
        # Raised only when the existing queue has different attributes
        queue_url = sqs.get_queue_url(QueueName=queue_name)['QueueUrl']
    queue_arn = sqs.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=['QueueArn']
    )['Attributes']['QueueArn']
    print(f"Created/Found Queue: {queue_arn}")

    # Create (or update) the EventBridge rule. put_rule is an upsert and returns the rule ARN.
    rule_name = f"TestRule_{queue_name}"
    rule_arn = events.put_rule(
        Name=rule_name,
        EventPattern=json.dumps({"source": ["com.mycompany.test"]}),
        State="ENABLED",
        EventBusName=event_bus_name
    )['RuleArn']
    print(f"Created/Updated Rule: {rule_arn}")

    # Allow EventBridge to send to SQS. For an SQS target, aws:SourceArn
    # is the ARN of the rule, not of the event bus.
    sqs.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={
            'Policy': json.dumps({
                "Version": "2012-10-17",
                "Statement": [{
                    "Effect": "Allow",
                    "Principal": {"Service": "events.amazonaws.com"},
                    "Action": "sqs:SendMessage",
                    "Resource": queue_arn,
                    "Condition": {
                        "ArnEquals": {"aws:SourceArn": rule_arn}
                    }
                }]
            })
        }
    )

    # Point the rule at the queue
    targets_resp = events.put_targets(
        Rule=rule_name,
        EventBusName=event_bus_name,
        Targets=[{
            "Id": "1",
            "Arn": queue_arn
        }]
    )
    if targets_resp.get('FailedEntryCount', 0) > 0:
        print(f"Failed to add target: {targets_resp.get('FailedEntries')}")

    return queue_url

if __name__ == "__main__":
    EVENT_BUS_NAME = "default" # Or your custom bus
    QUEUE_NAME = "TestDedupQueue"
    
    # 1. Setup Infrastructure
    queue_url = create_test_infrastructure(EVENT_BUS_NAME, QUEUE_NAME)
    
    # 2. Initialize Services
    cache = SimpleCache(ttl_seconds=300)
    dedup_service = DeduplicationService(cache)
    consumer = EventConsumer(queue_url, dedup_service)
    
    producer = boto3.client('events')
    
    # 3. Send Duplicate Events
    print("\n--- Sending 5 Duplicate Events ---")
    for i in range(5):
        try:
            producer.put_events(
                Entries=[{
                    "Source": "com.mycompany.test",
                    "DetailType": "TestEvent",
                    "Detail": json.dumps({"conversationId": "conv-12345", "timestamp": time.time()}),
                    "EventBusName": EVENT_BUS_NAME
                }]
            )
            print(f"Sent event {i+1}")
        except Exception as e:
            print(f"Error: {e}")
    
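    # 4. Consume and Deduplicate
    print("\n--- Consuming Events ---")
    time.sleep(2)  # Allow events to propagate
    # A single receive_message call may return only some of the available
    # messages, so poll a few times
    for _ in range(3):
        consumer.poll_and_process()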
    
    print("\n--- Done ---")

Common Errors & Debugging

Error: botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the SendMessage

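  • Cause: The SQS queue policy does not allow EventBridge to call sqs:SendMessage, or its aws:SourceArn condition does not match. For an SQS target, aws:SourceArn must be the ARN of the EventBridge rule, not the event bus. When EventBridge itself is denied, no exception reaches your code; the failure appears as FailedInvocations in the rule's CloudWatch metrics.
  • Fix: Ensure the SQS queue policy allows events.amazonaws.com to call sqs:SendMessage, with any aws:SourceArn condition set to the rule ARN. The create_test_infrastructure function above sets a queue policy automatically; check that its condition references the rule ARN.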

Error: ValueError: Event does not contain a valid unique ID

  • Cause: The _extract_event_id method could not find a unique identifier in the event payload.
  • Fix: Inspect the raw event JSON. Genesys Cloud events often nest the ID in detail.conversationId. NICE CXone events may use payload.interactionId. Update the _extract_event_id logic to match your specific event schema.
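Rather than editing _extract_event_id for every new source, the lookup order can be kept as data. A minimal sketch, assuming IDs live at fixed dotted paths; ID_PATHS, get_path, and extract_event_id are hypothetical names, and the paths mirror the ones this tutorial mentions (detail.conversationId, detail.interactionId, payload.interactionId, and the envelope id). Adjust them to your actual schema.

```python
import json
from typing import Any, Optional, Sequence, Tuple

# (dotted path, key prefix) pairs, tried in order; adjust to your sources
ID_PATHS: Sequence[Tuple[str, str]] = (
    ("detail.conversationId", "gc_"),
    ("detail.interactionId", "gc_"),
    ("payload.interactionId", "nice_"),
    ("id", ""),
)


def get_path(obj: Any, dotted: str) -> Any:
    """Follow a dotted path through nested dicts; return None if any step is missing."""
    for part in dotted.split("."):
        if isinstance(obj, str):
            # Tolerate nested JSON strings such as a serialized 'detail'
            try:
                obj = json.loads(obj)
            except json.JSONDecodeError:
                return None
        if not isinstance(obj, dict):
            return None
        obj = obj.get(part)
    return obj


def extract_event_id(event: dict, paths: Sequence[Tuple[str, str]] = ID_PATHS) -> Optional[str]:
    """Return the first non-empty ID found, with its source prefix, or None."""
    for dotted, prefix in paths:
        value = get_path(event, dotted)
        if value:
            return f"{prefix}{value}"
    return None


print(extract_event_id({"id": "eb-1", "detail": {"conversationId": "conv-12345"}}))
# gc_conv-12345
```

Logging which path matched (or that none did) for a sample of raw events is a quick way to confirm the schema before changing the order.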

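Error: Duplicate events are still processed

  • Cause: The copies do not share an ID, the cache is not persistent across restarts or shared between consumer instances, or the TTL is shorter than the gap between copies.
  • Fix: For production, use Redis so that every consumer sees the same keys. Ensure the event_id is truly unique per logical event and stable across copies. EventBridge assigns a new id to every PutEvents entry, so if your source system re-publishes on retry, deduplication by the envelope ID will fail. In that case, you must deduplicate by a business key (e.g., conversationId + timestamp rounded to the nearest minute).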
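The business-key approach can be sketched as follows. This version floors the event time to the start of its bucket rather than rounding to the nearest minute; either way, two copies that fall on opposite sides of a bucket boundary get different keys, so choose a bucket comfortably larger than your retry spread. The function names are hypothetical, and the time string is assumed to be in the ISO 8601 UTC format EventBridge uses for the envelope time field. Note that the envelope time defaults to the time of the PutEvents call, so a timestamp set by the source system is preferable when available.

```python
from datetime import datetime, timezone


def parse_event_time(value: str) -> datetime:
    """Parse an ISO 8601 UTC time such as '2024-01-01T00:00:05Z'."""
    # datetime.fromisoformat only accepts a trailing 'Z' from Python 3.11,
    # so normalize it to an explicit UTC offset first
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def business_dedup_key(conversation_id: str, event_time: datetime, bucket_seconds: int = 60) -> str:
    """Key = conversationId + event time floored to a fixed bucket (default one minute)."""
    epoch = int(event_time.astimezone(timezone.utc).timestamp())
    bucket_start = epoch - (epoch % bucket_seconds)
    return f"{conversation_id}:{bucket_start}"


print(business_dedup_key("conv-12345", parse_event_time("2024-01-01T00:00:05Z")))
# conv-12345:1704067200
```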

Error: QueueUrl not found

  • Cause: The SQS queue was not created or the name is incorrect.
  • Fix: Verify the queue exists in the AWS Console under SQS. Ensure the region in your AWS CLI/SDK matches the region where the queue was created.
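A quick way to spot a region mismatch is to read the region out of the queue URL and compare it with the region your SDK session uses (for example boto3.session.Session().region_name, or the AWS_DEFAULT_REGION environment variable). A minimal sketch, assuming the current queue URL form https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>; the helper name is hypothetical and legacy URL forms are not handled.

```python
from urllib.parse import urlparse


def region_from_queue_url(queue_url: str) -> str:
    """Extract the region from an SQS queue URL such as
    https://sqs.us-east-1.amazonaws.com/123456789012/TestDedupQueue.
    """
    host = urlparse(queue_url).hostname or ""
    parts = host.split(".")
    if len(parts) >= 3 and parts[0] == "sqs":
        return parts[1]
    raise ValueError(f"Unrecognized SQS queue URL host: {host!r}")


print(region_from_queue_url("https://sqs.us-east-1.amazonaws.com/123456789012/TestDedupQueue"))
# us-east-1
```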
