EventBridge Integration Sending Duplicate Events — Deduplication Strategy

What You Will Build

  • A robust Python service that ingests events from AWS EventBridge, detects duplicates using content-based hash keys, and forwards only unique events to a downstream system.
  • This solution uses the AWS SDK for Python (Boto3) to talk to DynamoDB (and, optionally, Kinesis) and a standard HTTP library for the downstream integration.
  • The programming language covered is Python 3.9+.

Prerequisites

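  • AWS Credentials: An IAM user or role with permissions to read and write DynamoDB for deduplication state, plus read access to Kinesis Data Streams if you use the fan-out pattern. EventBridge pushes events to its targets, so the consumer needs no read permission on the event bus itself.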
  • Required IAM Permissions:
    • dynamodb:PutItem, dynamodb:GetItem (for state storage).
    • kinesis:GetRecords, kinesis:GetShardIterator (if consuming from Kinesis).
    • events:PutEvents (if re-pushing to another EventBridge bus).
  • SDK Version: Boto3 >= 1.26.0, Botocore >= 1.29.0.
  • Dependencies:
    • boto3
    • hashlib (standard library)
    • json (standard library)
    • time, os, base64 (standard library)

Authentication Setup

This tutorial assumes the application runs within an environment that provides IAM credentials automatically (e.g., AWS Lambda, ECS Task Role, or EC2 Instance Profile). If running locally, configure the AWS CLI or set environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

For production-grade services, never hardcode credentials. Use IAM Roles for Execution.

import boto3
import os
import json
import hashlib
import time
from typing import Dict, List, Any, Optional

# Initialize clients
def get_clients() -> Dict[str, Any]:
    """
    Initializes Boto3 clients for DynamoDB and Kinesis.
    Uses default credential provider chain.
    """
    region = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
    
    session = boto3.Session(region_name=region)
    
    # DynamoDB for storing deduplication hashes
    dynamodb = session.client('dynamodb')
    
    # Kinesis for consuming events (if using fan-out)
    kinesis = session.client('kinesis')
    
    return {
        "dynamodb": dynamodb,
        "kinesis": kinesis,
        "region": region
    }

Implementation

Step 1: Define the Deduplication Key Strategy

EventBridge delivers events at least once, so a target can receive the same event more than once, and a producer that retries can publish the same logical event again. To deduplicate, you must derive a stable identifier for each logical event.

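A dependable method is a content-based hash. We will use SHA-256 on the detail field of the event, as this contains the payload data. The id field is assigned by EventBridge to each event it accepts, so if the producer retries the same logical action, the retried event arrives with a different id. Hashing the business payload (detail) therefore works better for idempotency. The trade-off is that two genuinely distinct events with identical detail are also treated as duplicates within the TTL window, so the payload should carry a distinguishing field such as an order ID.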

import hashlib
import json
from typing import Any, Dict

def generate_dedup_key(event: Dict[str, Any]) -> str:
    """
    Generates a deterministic SHA-256 hash based on the event's detail content.
    
    Args:
        event: The raw EventBridge event dictionary.
        
    Returns:
        A hexadecimal string representing the hash of the event detail.
    """
    try:
        # Extract the detail object. If it is already a string, parse it.
        # If it is a dict, serialize it.
        detail = event.get("detail", {})
        
        if isinstance(detail, str):
            # Ensure consistent parsing
            detail_obj = json.loads(detail)
        else:
            detail_obj = detail
            
        # Sort keys to ensure deterministic hashing regardless of JSON order
        canonical_json = json.dumps(detail_obj, sort_keys=True, separators=(',', ':'))
        
        # Generate SHA-256 hash
        hash_object = hashlib.sha256(canonical_json.encode('utf-8'))
        hex_digest = hash_object.hexdigest()
        
        return hex_digest
    except Exception as e:
        # Fallback: if parsing fails, use the event ID as the key
        # This prevents blocking the stream but may allow duplicates
        print(f"Warning: Failed to hash detail for event {event.get('id')}. Error: {e}")
        return event.get("id", "unknown")
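
As a quick check of the canonicalization step, the sketch below hashes payloads that differ only in key order and confirms they produce the same key. The helper name canonical_hash and the sample payloads are illustrative assumptions, not part of the service itself.

```python
import hashlib
import json
from typing import Any, Dict


def canonical_hash(detail: Dict[str, Any]) -> str:
    """Hash a detail payload after serializing it with sorted keys."""
    canonical = json.dumps(detail, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Same business payload, different key order (hypothetical sample data).
first = {"orderId": "ORD-999", "amount": 100.5, "currency": "USD"}
second = {"currency": "USD", "orderId": "ORD-999", "amount": 100.5}
# A genuinely different payload.
third = {"orderId": "ORD-1000", "amount": 100.5, "currency": "USD"}

same_key = canonical_hash(first) == canonical_hash(second)
different_key = canonical_hash(first) != canonical_hash(third)
print(same_key, different_key)
```

Note that values which are equal in meaning but typed differently (the string "100.5" versus the number 100.5) still hash differently, so producers should serialize fields consistently.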

Step 2: Implement State Management with DynamoDB

We need a persistent store to check if a hash has been seen recently. DynamoDB is ideal for this due to its low latency and high throughput. We will use a Time-To-Live (TTL) attribute to automatically clean up old hashes, preventing the table from growing indefinitely.

DynamoDB Table Schema:

  • Partition Key: dedup_key (String)
  • TTL Attribute: expires_at (Number, Unix timestamp)
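
One possible way to create a table with this schema is sketched below. It assumes on-demand billing and uses the function names build_table_definition, build_ttl_specification, and create_dedup_table, all of which are illustrative choices. Running it also requires dynamodb:CreateTable, dynamodb:DescribeTable, and dynamodb:UpdateTimeToLive, which are not in the permission list above.

```python
from typing import Any, Dict


def build_table_definition(table_name: str) -> Dict[str, Any]:
    """Keyword arguments for create_table: one string partition key."""
    return {
        "TableName": table_name,
        "AttributeDefinitions": [
            {"AttributeName": "dedup_key", "AttributeType": "S"}
        ],
        "KeySchema": [{"AttributeName": "dedup_key", "KeyType": "HASH"}],
        # Assumption: on-demand billing; use provisioned throughput if preferred.
        "BillingMode": "PAY_PER_REQUEST",
    }


def build_ttl_specification(table_name: str) -> Dict[str, Any]:
    """Keyword arguments for update_time_to_live on the expires_at attribute."""
    return {
        "TableName": table_name,
        "TimeToLiveSpecification": {
            "Enabled": True,
            "AttributeName": "expires_at",
        },
    }


def create_dedup_table(dynamodb_client: Any, table_name: str) -> None:
    """Create the table, wait until it is active, then enable TTL."""
    dynamodb_client.create_table(**build_table_definition(table_name))
    dynamodb_client.get_waiter("table_exists").wait(TableName=table_name)
    dynamodb_client.update_time_to_live(**build_ttl_specification(table_name))
```

Only the key attribute appears in AttributeDefinitions; expires_at is an ordinary item attribute that DynamoDB reads once TTL is enabled.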

class DeduplicationManager:
    def __init__(self, dynamodb_client: Any, table_name: str, ttl_seconds: int = 3600):
        self.dynamodb = dynamodb_client
        self.table_name = table_name
        self.ttl_seconds = ttl_seconds

    def is_duplicate(self, dedup_key: str) -> bool:
        """
        Checks if the dedup_key exists in DynamoDB.
        
        Args:
            dedup_key: The SHA-256 hash of the event.
            
        Returns:
            True if the key exists (duplicate), False otherwise.
        """
        try:
            response = self.dynamodb.get_item(
                TableName=self.table_name,
                Key={
                    "dedup_key": {"S": dedup_key}
                }
            )
            # If 'Item' is present, it is a duplicate
            return "Item" in response
        except self.dynamodb.exceptions.ResourceNotFoundException:
            # Table does not exist, treat as not duplicate to avoid blocking
            print(f"Error: DynamoDB table {self.table_name} not found.")
            return False
        except Exception as e:
            # In case of network error, fail open (process the event)
            # to avoid data loss, though duplicates may occur.
            print(f"Error checking duplicate status: {e}")
            return False

    def mark_as_processed(self, dedup_key: str) -> None:
        """
        Stores the dedup_key in DynamoDB with a TTL.
        
        Args:
            dedup_key: The SHA-256 hash of the event.
        """
        try:
            # Calculate expiration time
            expires_at = int(time.time()) + self.ttl_seconds
            
            self.dynamodb.put_item(
                TableName=self.table_name,
                Item={
                    "dedup_key": {"S": dedup_key},
                    "expires_at": {"N": str(expires_at)}
                }
                # No condition is set: put_item overwrites any existing item,
                # which simply refreshes the TTL if the key is written again.
                # (UpdateExpression belongs to update_item; passing it to
                # put_item fails parameter validation.)
            )
        except Exception as e:
            print(f"Error marking item as processed: {e}")
            # Critical failure: log but do not crash the consumer
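
Because is_duplicate and mark_as_processed are separate calls, two consumers handling the same event at the same moment can both see "not a duplicate". A possible alternative, sketched below, is a single conditional write that only the first caller wins. The function name claim_dedup_key and the now parameter are assumptions for illustration. This variant records the key before the event is forwarded, so a failed forward would need the key deleted again or the event would be dropped on retry.

```python
import time
from typing import Any, Optional


def claim_dedup_key(
    dynamodb_client: Any,
    table_name: str,
    dedup_key: str,
    ttl_seconds: int = 3600,
    now: Optional[int] = None,
) -> bool:
    """Atomically record dedup_key; return True only for the first caller."""
    current = int(time.time()) if now is None else now
    try:
        dynamodb_client.put_item(
            TableName=table_name,
            Item={
                "dedup_key": {"S": dedup_key},
                "expires_at": {"N": str(current + ttl_seconds)},
            },
            # Succeed if the key is new, or if the stored entry has expired
            # but TTL cleanup has not removed it yet (TTL deletion is not
            # immediate).
            ConditionExpression=(
                "attribute_not_exists(dedup_key) OR expires_at < :now"
            ),
            ExpressionAttributeValues={":now": {"N": str(current)}},
        )
        return True
    except Exception as exc:
        # botocore's ClientError exposes the service error code in .response
        code = getattr(exc, "response", {}).get("Error", {}).get("Code")
        if code == "ConditionalCheckFailedException":
            return False  # Another writer already claimed this key
        raise
```

With this approach a ConditionalCheckFailedException is the expected signal for a duplicate, not an error to be logged.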

Step 3: Process Events and Apply Deduplication

This step ties the hash generation and state management together. We assume the input is a list of EventBridge events: a single-element list when EventBridge invokes Lambda directly, or the decoded records of a Kinesis batch.

def process_event_batch(events: List[Dict[str, Any]], dedup_manager: DeduplicationManager) -> Dict[str, int]:
    """
    Processes a batch of events, filtering out duplicates.
    
    Args:
        events: List of EventBridge event dictionaries.
        dedup_manager: Instance of DeduplicationManager.
        
    Returns:
        A dictionary with counts of processed, skipped, and failed events.
    """
    stats = {
        "processed": 0,
        "skipped_duplicate": 0,
        "failed": 0
    }
    
    for event in events:
        try:
            # Step 1: Generate the deduplication key
            dedup_key = generate_dedup_key(event)
            
            # Step 2: Check if it is a duplicate
            if dedup_manager.is_duplicate(dedup_key):
                stats["skipped_duplicate"] += 1
                print(f"Duplicate detected for key: {dedup_key[:16]}...")
                continue
            
            # Step 3: Process the unique event
            # Replace this with your actual business logic (e.g., API call, DB insert)
            forward_event_to_downstream(event)
            
            # Step 4: Mark as processed AFTER successful forward
            # This ensures at-least-once semantics for the downstream system
            dedup_manager.mark_as_processed(dedup_key)
            
            stats["processed"] += 1
            print(f"Processed event: {event.get('id')}")
            
        except Exception as e:
            stats["failed"] += 1
            print(f"Failed to process event {event.get('id')}: {e}")
            # Depending on requirements, you might want to retry or send to DLQ
            
    return stats

def forward_event_to_downstream(event: Dict[str, Any]) -> None:
    """
    Placeholder for downstream integration logic.
    In a real scenario, this might call an HTTP API or insert into a database.
    """
    # Example: Simulating an HTTP POST
    print(f"Forwarding event {event.get('id')} to downstream system...")
    # requests.post("https://api.example.com/events", json=event)
    pass
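
To make the placeholder concrete, here is a minimal sketch of the forwarding step using only the standard library. The endpoint is the placeholder URL from the comment above, and the helper name build_forward_request and the 5-second timeout are assumptions for illustration.

```python
import json
import urllib.request
from typing import Any, Dict

# Placeholder endpoint taken from the commented-out example; replace with yours.
DOWNSTREAM_URL = "https://api.example.com/events"


def build_forward_request(
    event: Dict[str, Any], url: str = DOWNSTREAM_URL
) -> urllib.request.Request:
    """Build a JSON POST request carrying the full event."""
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def forward_event_to_downstream(event: Dict[str, Any], timeout: float = 5.0) -> None:
    """POST the event; raise if the downstream system does not accept it."""
    request = build_forward_request(event)
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses, so an
    # exception here keeps the event from being marked as processed.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        if not 200 <= response.status < 300:
            raise RuntimeError(f"Downstream returned HTTP {response.status}")
```

Raising on failure matters for the flow above: mark_as_processed runs only after this function returns, so a failed forward leaves the key unrecorded and a retry can go through.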

Complete Working Example

The following script demonstrates a complete consumer that simulates receiving events from EventBridge. In production, this logic would reside in an AWS Lambda function triggered by EventBridge or a Kinesis Data Stream.

import boto3
import json
import time
import hashlib
import os
from typing import Dict, List, Any

# --- Configuration ---
DYNAMODB_TABLE_NAME = os.environ.get("DEDUP_TABLE_NAME", "EventBridgeDedupTable")
TTL_SECONDS = 3600  # 1 hour retention for dedup keys

# --- Helpers ---

def generate_dedup_key(event: Dict[str, Any]) -> str:
    """Generates a SHA-256 hash of the event detail."""
    try:
        detail = event.get("detail", {})
        if isinstance(detail, str):
            detail_obj = json.loads(detail)
        else:
            detail_obj = detail
            
        # Canonical JSON: sorted keys, no whitespace
        canonical_json = json.dumps(detail_obj, sort_keys=True, separators=(',', ':'))
        return hashlib.sha256(canonical_json.encode('utf-8')).hexdigest()
    except Exception as e:
        print(f"Hash generation failed: {e}")
        return event.get("id", "fallback-id")

class DeduplicationManager:
    def __init__(self, dynamodb_client: Any, table_name: str, ttl_seconds: int):
        self.dynamodb = dynamodb_client
        self.table_name = table_name
        self.ttl_seconds = ttl_seconds

    def is_duplicate(self, dedup_key: str) -> bool:
        try:
            response = self.dynamodb.get_item(
                TableName=self.table_name,
                Key={"dedup_key": {"S": dedup_key}}
            )
            return "Item" in response
        except Exception as e:
            print(f"DynamoDB check failed: {e}")
            return False

    def mark_as_processed(self, dedup_key: str) -> None:
        try:
            expires_at = int(time.time()) + self.ttl_seconds
            self.dynamodb.put_item(
                TableName=self.table_name,
                Item={
                    "dedup_key": {"S": dedup_key},
                    "expires_at": {"N": str(expires_at)}
                }
            )
        except Exception as e:
            print(f"DynamoDB write failed: {e}")

def forward_event_to_downstream(event: Dict[str, Any]) -> None:
    """Simulates sending the event to a downstream API."""
    print(f"[DOWNSTREAM] Received unique event: {event.get('id')}")
    # In production:
    # import requests
    # resp = requests.post("https://your-api.com/webhook", json=event)
    # if resp.status_code != 200:
    #     raise Exception(f"Downstream failed: {resp.status_code}")

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    AWS Lambda entry point.
    Expects EventBridge format or Kinesis batch format.
    """
    dynamodb = boto3.client('dynamodb')
    dedup_manager = DeduplicationManager(dynamodb, DYNAMODB_TABLE_NAME, TTL_SECONDS)
    
    # Handle EventBridge direct trigger (single event or batch)
    # EventBridge Lambda trigger sends a single event object, not a list
    # However, Kinesis triggers send a list of records.
    
    records = []
    if "detail" in event:
        # Direct EventBridge trigger
        records = [event]
    elif "Records" in event:
        # Kinesis or SQS trigger
        for record in event["Records"]:
            # Parse the body if it's a Kinesis record
            if "kinesis" in record:
                import base64
                payload = base64.b64decode(record["kinesis"]["data"])
                try:
                    event_obj = json.loads(payload)
                    records.append(event_obj)
                except json.JSONDecodeError:
                    print(f"Failed to decode Kinesis data for record {record['kinesis']['sequenceNumber']}")
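            elif "body" in record:
                # SQS record: the EventBridge event is the JSON message body.
                # Appending the raw record would hash an empty detail and make
                # every SQS message look like the same duplicate.
                try:
                    records.append(json.loads(record["body"]))
                except json.JSONDecodeError:
                    print(f"Failed to decode SQS body for message {record.get('messageId')}")
            else:
                # Assume raw event for other triggers
                records.append(record)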
    else:
        print("Unknown event format")
        return {"statusCode": 400, "body": "Unknown format"}

    processed_count = 0
    skipped_count = 0
    failed_count = 0

    for record_event in records:
        try:
            dedup_key = generate_dedup_key(record_event)
            
            if dedup_manager.is_duplicate(dedup_key):
                skipped_count += 1
                print(f"Skipped duplicate: {dedup_key[:12]}...")
                continue
            
            # Process unique event
            forward_event_to_downstream(record_event)
            
            # Mark as processed
            dedup_manager.mark_as_processed(dedup_key)
            processed_count += 1
            
        except Exception as e:
            failed_count += 1
            print(f"Error processing event: {e}")

    return {
        "statusCode": 200,
        "body": json.dumps({
            "processed": processed_count,
            "skipped_duplicates": skipped_count,
            "failed": failed_count
        })
    }

# --- Local Testing Simulation ---
if __name__ == "__main__":
    # Mock DynamoDB for local testing (using boto3 mock or localstack)
    # For this example, we assume a real DynamoDB table exists
    
    sample_event = {
        "version": "0",
        "id": "12345678-1234-1234-1234-123456789012",
        "detail-type": "OrderCreated",
        "source": "com.example.orders",
        "account": "123456789012",
        "time": "2023-10-27T10:00:00Z",
        "region": "us-east-1",
        "resources": ["arn:aws:dynamodb:us-east-1:123456789012:table/Orders"],
        "detail": {
            "orderId": "ORD-999",
            "amount": 100.50,
            "currency": "USD"
        }
    }

    # Simulate Lambda invocation
    print("=== Test 1: First Event (Should Process) ===")
    result1 = lambda_handler({"detail": sample_event["detail"], "id": sample_event["id"], "source": sample_event["source"]}, None)
    print(f"Result: {result1}")

    print("\n=== Test 2: Duplicate Event (Should Skip) ===")
    # Send the exact same event again
    result2 = lambda_handler({"detail": sample_event["detail"], "id": "different-id-but-same-detail", "source": sample_event["source"]}, None)
    print(f"Result: {result2}")
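
If no real table is available, the two client calls the dedup logic depends on can be imitated in memory. The class below is a hypothetical stand-in (InMemoryDynamoDB is not a Boto3 or AWS class) that ignores TTL expiry and condition expressions, so it only suits simple local checks.

```python
from typing import Any, Dict


class InMemoryDynamoDB:
    """Minimal stand-in for the get_item and put_item calls used above."""

    def __init__(self) -> None:
        # table name -> dedup_key -> stored item
        self._tables: Dict[str, Dict[str, Dict[str, Any]]] = {}

    def get_item(self, TableName: str, Key: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
        item = self._tables.get(TableName, {}).get(Key["dedup_key"]["S"])
        # The real client omits "Item" from the response when nothing matches.
        return {"Item": item} if item is not None else {}

    def put_item(self, TableName: str, Item: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
        self._tables.setdefault(TableName, {})[Item["dedup_key"]["S"]] = Item
        return {}


fake = InMemoryDynamoDB()
key = {"dedup_key": {"S": "abc123"}}
seen_before = "Item" in fake.get_item(TableName="EventBridgeDedupTable", Key=key)
fake.put_item(TableName="EventBridgeDedupTable", Item={**key, "expires_at": {"N": "0"}})
seen_after = "Item" in fake.get_item(TableName="EventBridgeDedupTable", Key=key)
print(seen_before, seen_after)
```

An instance can be passed to DeduplicationManager in place of the Boto3 client. Since lambda_handler creates its own client internally, exercising the handler this way would require constructing the manager directly or injecting the client.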

Common Errors & Debugging

Error: ConditionalCheckFailedException

What causes it: This occurs when a put_item (or update_item) call carries a ConditionExpression that requires the item not to exist, but it does. The deduplication logic in this tutorial uses an unconditional put_item, so overwrites simply refresh the TTL. If you are seeing this error, look for a ConditionExpression that was added to the write.

How to fix it: Do not use attribute_not_exists(dedup_key) if you intend to refresh the TTL. Use a plain put_item, or an update_item that always sets the TTL. If the condition is deliberate (a first-writer-wins check), catch the exception and treat it as "duplicate" rather than as a failure.

# Fails if the key already exists (put_item or update_item)
ConditionExpression="attribute_not_exists(dedup_key)"

# Always updates the TTL (update_item only; put_item has no UpdateExpression,
# and :val must be supplied through ExpressionAttributeValues)
UpdateExpression="SET expires_at = :val"

Error: ThrottlingException

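What causes it: DynamoDB has read/write capacity limits. If your EventBridge throughput is high (e.g., thousands of events per second), you may hit provisioned limits. On provisioned tables this typically surfaces as ProvisionedThroughputExceededException rather than ThrottlingException, so handle both.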

How to fix it:

  1. Switch to On-Demand capacity mode in DynamoDB.
  2. Implement exponential backoff in your DynamoDB client.
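  3. If you consume through Kinesis, note that fan-out spreads stream reads across consumers but does not reduce the number of DynamoDB requests. To cut those, look up the keys of a whole Kinesis batch together (for example with BatchGetItem) instead of one get_item per event.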

import boto3
import botocore.config

# Configure retry strategy in Boto3
config = botocore.config.Config(
    retries={
        'max_attempts': 5,
        'mode': 'adaptive' # Adaptive mode backs off when throttling is detected
    }
)

dynamodb = boto3.client('dynamodb', config=config)
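
Boto3's built-in retries cover individual API calls. If you want to retry a larger unit of work, a hand-rolled exponential backoff with jitter is one option. The sketch below is generic Python, not a Boto3 feature; the name call_with_backoff and its default delays are assumptions.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    operation: Callable[[], T],
    is_retryable: Callable[[Exception], bool],
    max_attempts: int = 5,
    base_delay: float = 0.1,
    max_delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run operation, retrying retryable failures with capped, jittered delays."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception as exc:
            last_attempt = attempt == max_attempts - 1
            if last_attempt or not is_retryable(exc):
                raise
            # Full jitter: wait a random time up to the capped exponential delay.
            cap = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, cap))
    raise RuntimeError("max_attempts must be at least 1")
```

A caller might pass an is_retryable function that inspects the error code of a botocore ClientError and returns True only for throttling codes. The sleep parameter exists so tests can substitute a recorder for real waiting.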

Error: Event Detail is Not JSON Serializable

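What causes it: Events delivered by EventBridge carry detail as valid JSON, so this error usually means the event was built or modified in code before hashing (for example, it now holds bytes, datetime, or Decimal values), or that a Kinesis record did not contain valid JSON.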

How to fix it: Wrap the hashing logic in a try-except block. If hashing fails, fall back to the Event ID. This ensures the pipeline does not block on malformed data.

except (json.JSONDecodeError, TypeError, ValueError) as e:
    print(f"Cannot hash detail: {e}. Using Event ID.")
    return event.get("id")

Official References