Deduplication Strategy for Genesys Cloud EventBridge Integrations

What You Will Build

  • You will build a Python application that receives events from AWS EventBridge, detects duplicates using content-based deduplication, and forwards unique events to a downstream handler.
  • You will use the AWS Boto3 SDK for EventBridge and SQS interaction, alongside standard Python libraries for hashing.
  • The code is written for Python 3.9+ and targets AWS Lambda.

Prerequisites

  • AWS Account: An active account with permissions to create EventBridge rules, SQS queues, and Lambda functions.
  • Genesys Cloud OAuth Client: A confidential client with the analytics:query scope to verify event payloads (optional for testing, but recommended for validation).
  • Python Runtime: Python 3.9 or higher.
  • Dependencies:
    • boto3 (AWS SDK for Python)
    • hashlib (standard library)
    • json (standard library)

Authentication Setup

This integration relies on AWS IAM roles rather than OAuth tokens for the transport layer. A Genesys Cloud OAuth client is needed only if you choose to verify that incoming payloads are legitimate Genesys events, for example by re-querying the API. This tutorial focuses on the deduplication logic within the AWS ecosystem.

Ensure your Lambda function or application has an IAM role attached with the following policies:

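  1. AmazonSQSFullAccess (or, preferably, a custom policy allowing only sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes on the queue).
  2. AWSLambdaBasicExecutionRole (for logging).
  3. dynamodb:PutItem on the deduplication table, if you use the DynamoDB store described in Step 4.

The code in this tutorial shares the following imports and clients:

import boto3
import json
import hashlib
from typing import Dict
from botocore.exceptions import ClientError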

# Initialize AWS clients
sqs_client = boto3.client('sqs')
eventbridge_client = boto3.client('events')

Implementation

Step 1: Configure the EventBridge Rule and SQS Target

Before writing the deduplication code, decide where deduplication will happen. Standard SQS queues provide at-least-once delivery, so a message can occasionally be delivered more than once. You must either use a FIFO (First-In-First-Out) queue, which deduplicates within a five-minute interval, or implement a custom deduplication window in your consumer code.

For this tutorial, we assume a Standard SQS queue as the target. EventBridge can deliver to FIFO queues, but the target then needs a message group ID and is subject to FIFO throughput limits. Therefore, the deduplication logic is implemented in the consumer (a Lambda function or another consumer application).

Configuration:

  1. Create a Standard SQS Queue named GenesysEventsQueue.
  2. Create an EventBridge Rule targeting this SQS queue.
  3. Set the Event Pattern in EventBridge to match Genesys Cloud events:
    {
      "source": ["genesys.cloud"],
      "detail-type": ["Genesys Cloud Event"]
    }
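
The configuration steps above can also be expressed as API request parameters. The sketch below only builds the dictionaries; it does not call AWS. The helper names, rule name, and ARNs are hypothetical, and the event bus name is left as a parameter because it depends on how your Genesys Cloud integration is set up. It also assumes the queue needs a resource policy that lets EventBridge send messages, which API-driven setups usually have to add themselves.

```python
import json
from typing import Any, Dict

# Event pattern from step 3 above.
EVENT_PATTERN: Dict[str, Any] = {
    "source": ["genesys.cloud"],
    "detail-type": ["Genesys Cloud Event"],
}

def build_put_rule_kwargs(rule_name: str, event_bus_name: str) -> Dict[str, Any]:
    """Keyword arguments for the EventBridge put_rule call."""
    return {
        "Name": rule_name,
        "EventBusName": event_bus_name,
        "EventPattern": json.dumps(EVENT_PATTERN),
        "State": "ENABLED",
    }

def build_queue_policy(queue_arn: str, rule_arn: str) -> str:
    """Resource policy (as a JSON string) letting the rule deliver to the queue."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "events.amazonaws.com"},
            "Action": "sqs:SendMessage",
            "Resource": queue_arn,
            "Condition": {"ArnEquals": {"aws:SourceArn": rule_arn}},
        }],
    }
    return json.dumps(policy)

# With boto3 and credentials available, these could be applied roughly as:
#   events.put_rule(**build_put_rule_kwargs("GenesysEventsRule", bus_name))
#   sqs.set_queue_attributes(QueueUrl=queue_url,
#                            Attributes={"Policy": build_queue_policy(queue_arn, rule_arn)})
#   events.put_targets(Rule="GenesysEventsRule", EventBusName=bus_name,
#                      Targets=[{"Id": "genesys-sqs", "Arn": queue_arn}])
```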
    

Step 2: Implement Content-Based Deduplication Logic

The core problem is that EventBridge may retry events if the SQS queue returns an error, or if there is a transient network issue. This results in duplicate events arriving at your consumer.

We will implement a sliding window deduplication strategy using an in-memory cache (for demonstration) or a distributed cache like Redis (for production). For this tutorial, we will use a simple in-memory dictionary with a Time-To-Live (TTL) simulation to keep the code self-contained. In a production Lambda environment, you would use a global variable that persists across invocations in the same container, or DynamoDB/Redis for cross-container deduplication.

The Deduplication Key:
We will generate a hash of the event payload. If the hash exists in our cache and has not expired, we discard the event.

import time
from collections import OrderedDict

class DeduplicationCache:
    """
    A simple in-memory cache for deduplication.
    In production, replace this with Redis or DynamoDB.
    """
    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        self.cache: OrderedDict = OrderedDict()
        self.last_cleanup = time.time()

    def is_duplicate(self, event_hash: str) -> bool:
        """
        Check if an event hash exists in the cache.
        If it exists, return True (duplicate).
        If it does not exist, add it to the cache and return False (unique).
        """
        current_time = time.time()

        # Evict expired entries on every check so that a stale hash is never
        # reported as a duplicate after its window has passed.
        self._cleanup()
        self.last_cleanup = current_time

        if event_hash in self.cache:
            # Event is within the window, so it is a duplicate
            return True

        # Event is new, add to cache
        self.cache[event_hash] = current_time
        return False

    def _cleanup(self):
        """Remove entries older than the window."""
        current_time = time.time()
        cutoff_time = current_time - self.window_seconds
        
        while self.cache and self.cache[next(iter(self.cache))] < cutoff_time:
            self.cache.popitem(last=False)

# Module-level instance: reused by warm invocations in the same execution
# environment; every cold start begins with an empty cache.
dedup_cache = DeduplicationCache(window_seconds=300)
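
To check the window behaviour without waiting five minutes, one option is a variant of the cache with an injectable clock. The sketch below is an illustration, not part of the tutorial's handler: the class name TtlDedupCache and the clock parameter are assumptions introduced here.

```python
import time
from collections import OrderedDict
from typing import Callable

class TtlDedupCache:
    """In-memory dedup cache whose notion of 'now' can be replaced in tests."""

    def __init__(self, window_seconds: float = 300,
                 clock: Callable[[], float] = time.monotonic):
        self.window_seconds = window_seconds
        self.clock = clock
        self._seen: "OrderedDict[str, float]" = OrderedDict()

    def is_duplicate(self, key: str) -> bool:
        now = self.clock()
        # Entries are stored in insertion order, so the oldest is always first.
        while self._seen:
            _, first_seen = next(iter(self._seen.items()))
            if now - first_seen < self.window_seconds:
                break
            self._seen.popitem(last=False)
        if key in self._seen:
            return True
        self._seen[key] = now
        return False

# Drive the cache with a fake clock instead of sleeping.
fake_now = [0.0]
cache = TtlDedupCache(window_seconds=300, clock=lambda: fake_now[0])

first = cache.is_duplicate("abc")    # new key
fake_now[0] = 299.0
second = cache.is_duplicate("abc")   # still inside the window
fake_now[0] = 301.0
third = cache.is_duplicate("abc")    # window elapsed, treated as new again
```

Here first, second, and third evaluate to False, True, and False respectively.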

Step 3: Process SQS Messages and Apply Deduplication

Now we combine the cache with the SQS consumer logic. We will read messages from the SQS queue, calculate a hash of the detail field from the EventBridge payload, and check for duplicates.

Important: EventBridge sends events in the MessageBody of the SQS message. The structure is:

{
  "source": "genesys.cloud",
  "account": "123456789",
  "region": "us-east-1",
  "time": "2023-10-27T10:00:00Z",
  "id": "unique-event-id",
  "detail-type": "Genesys Cloud Event",
  "resources": [],
  "detail": {
    "eventType": "conversation/updated",
    "payload": { ... }
  }
}

The helpers below hash the detail field, process each record, and delete acknowledged messages:

def generate_event_hash(event_payload: Dict) -> str:
    """
    Generate a deterministic hash for the event detail.
    We hash the 'detail' field because it contains the actual event data.
    """
    # Normalize the JSON string to ensure consistent hashing regardless of key order
    normalized_json = json.dumps(event_payload.get('detail', {}), sort_keys=True)
    return hashlib.sha256(normalized_json.encode('utf-8')).hexdigest()

from functools import lru_cache

def process_genesys_event(event_detail: Dict) -> None:
    """
    Placeholder for your business logic.
    """
    print(f"Processing unique event: {event_detail.get('eventType')}")
    # Add your downstream logic here (e.g., update CRM, send notification)

@lru_cache(maxsize=None)
def queue_url_from_arn(queue_arn: str) -> str:
    """
    Resolve the queue URL from the ARN carried in the SQS record.
    delete_message expects a queue URL, not an ARN. Requires sqs:GetQueueUrl.
    """
    # ARN format: arn:aws:sqs:<region>:<account-id>:<queue-name>
    parts = queue_arn.split(':')
    response = sqs_client.get_queue_url(
        QueueName=parts[5],
        QueueOwnerAWSAccountId=parts[4]
    )
    return response['QueueUrl']

def handle_sqs_event(sqs_event: Dict) -> Dict:
    """
    Main handler for SQS events.
    Returns a partial batch response so that only failed messages are retried.
    This requires ReportBatchItemFailures on the Lambda event source mapping;
    without it, Lambda deletes the whole batch whenever the handler returns normally.
    """
    failures = []

    for record in sqs_event.get('Records', []):
        event_hash = None
        try:
            # Parse the SQS message body
            message_body = json.loads(record['body'])
            queue_url = queue_url_from_arn(record['eventSourceARN'])

            # Generate hash for deduplication
            event_hash = generate_event_hash(message_body)

            # Check for duplicates
            if dedup_cache.is_duplicate(event_hash):
                print(f"Duplicate event detected. Skipping. Hash: {event_hash[:10]}...")
                # We assume idempotency: the event was already handled once,
                # so the duplicate is deleted rather than left for retry.
                delete_message(record['receiptHandle'], queue_url)
                continue

            # Process the unique event
            process_genesys_event(message_body.get('detail', {}))

            # Delete the message from the queue to acknowledge processing
            delete_message(record['receiptHandle'], queue_url)

        except json.JSONDecodeError as e:
            print(f"Invalid JSON in message body: {e}")
            # Report as failed so the redrive policy can move it to the Dead Letter Queue (DLQ)
            failures.append({'itemIdentifier': record['messageId']})
        except Exception as e:
            print(f"Error processing message: {e}")
            # Forget the hash so the retry is not mistaken for a duplicate
            if event_hash is not None:
                dedup_cache.cache.pop(event_hash, None)
            failures.append({'itemIdentifier': record['messageId']})

    return {'batchItemFailures': failures}

def delete_message(receipt_handle: str, queue_url: str) -> None:
    """
    Delete a message from the SQS queue.
    """
    try:
        sqs_client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=receipt_handle
        )
    except ClientError as e:
        print(f"Error deleting message from SQS: {e}")
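
To exercise a handler like the one above locally, it can help to build a Lambda-style SQS event by hand. The sketch below is one possible way to do that; the helper name wrap_in_sqs_event and the sample payload fields are hypothetical, and the record contains only the keys this tutorial reads. The receipt handles are placeholders, so a real delete call would fail and should be stubbed out in local tests.

```python
import json
import uuid
from typing import Any, Dict, List

def wrap_in_sqs_event(envelopes: List[Dict[str, Any]], queue_arn: str) -> Dict[str, Any]:
    """Build a minimal SQS event whose bodies are EventBridge envelopes."""
    return {
        "Records": [
            {
                "messageId": str(uuid.uuid4()),
                "receiptHandle": f"handle-{index}",  # placeholder, not a real handle
                "body": json.dumps(envelope),
                "eventSource": "aws:sqs",
                "eventSourceARN": queue_arn,
            }
            for index, envelope in enumerate(envelopes)
        ]
    }

envelope = {
    "source": "genesys.cloud",
    "detail-type": "Genesys Cloud Event",
    "id": "unique-event-id",
    "detail": {"eventType": "conversation/updated", "payload": {"id": "c-1"}},
}

# Passing the same envelope twice simulates a redelivery of one event.
sample_event = wrap_in_sqs_event(
    [envelope, envelope],
    "arn:aws:sqs:us-east-1:123456789012:GenesysEventsQueue",
)
bodies = [json.loads(record["body"]) for record in sample_event["Records"]]
```

Feeding sample_event to the handler should result in one processed event and one detected duplicate, since both bodies carry the same detail.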

Step 4: Handle Edge Cases and Idempotency

The in-memory cache approach has a limitation: if your Lambda function scales out to multiple containers, each container has its own cache. An event processed in Container A will not be known to Container B, leading to duplicates.

To solve this in production, you must use a distributed state store. Below is an example using DynamoDB for global deduplication.

DynamoDB Table Structure:

  • Table Name: GenesysEventDeduplication
  • Partition Key: event_hash (String)
  • Time-To-Live (TTL): expires_at (Number, Unix timestamp)
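
The table structure above could be created with two DynamoDB API calls. The sketch below only builds the request parameters and does not contact AWS; the helper names are hypothetical, and on-demand billing is an assumption rather than a requirement of the tutorial.

```python
from typing import Any, Dict

TABLE_NAME = "GenesysEventDeduplication"

def build_create_table_kwargs(table_name: str = TABLE_NAME) -> Dict[str, Any]:
    """Keyword arguments for the DynamoDB client's create_table call."""
    return {
        "TableName": table_name,
        # Only key attributes are declared; expires_at is a plain item attribute.
        "AttributeDefinitions": [{"AttributeName": "event_hash", "AttributeType": "S"}],
        "KeySchema": [{"AttributeName": "event_hash", "KeyType": "HASH"}],
        "BillingMode": "PAY_PER_REQUEST",  # assumption: on-demand capacity
    }

def build_ttl_kwargs(table_name: str = TABLE_NAME) -> Dict[str, Any]:
    """Keyword arguments for update_time_to_live, enabling TTL on expires_at."""
    return {
        "TableName": table_name,
        "TimeToLiveSpecification": {"Enabled": True, "AttributeName": "expires_at"},
    }

# With boto3 and credentials available, roughly:
#   client = boto3.client("dynamodb")
#   client.create_table(**build_create_table_kwargs())
#   client.get_waiter("table_exists").wait(TableName=TABLE_NAME)
#   client.update_time_to_live(**build_ttl_kwargs())
```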
import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
dedup_table = dynamodb.Table('GenesysEventDeduplication')

def is_duplicate_global(event_hash: str, window_seconds: int = 300) -> bool:
    """
    Check DynamoDB for duplicate events.
    Uses conditional writes to ensure atomicity.
    """
    import time

    now = int(time.time())
    expires_at = now + window_seconds

    try:
        # Write the item only if no unexpired entry exists. DynamoDB TTL deletes
        # expired items asynchronously, so an expired item may still be present
        # and must not count as a duplicate.
        dedup_table.put_item(
            Item={
                'event_hash': event_hash,
                'expires_at': expires_at
            },
            ConditionExpression='attribute_not_exists(event_hash) OR expires_at < :now',
            ExpressionAttributeValues={':now': now}
        )
        # If put_item succeeds, no live entry was there before.
        return False
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            # A live entry already exists, so it is a duplicate
            return True
        # Other errors (e.g., network, permissions)
        raise

Replace dedup_cache.is_duplicate() in the handle_sqs_event function with is_duplicate_global() for production-grade deduplication.
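
The conditional-write logic can be unit tested without AWS by passing the table in as a parameter. The sketch below is an assumption-laden illustration: FakeTable and FakeConditionalCheckFailed are hypothetical stand-ins that mimic only the attribute_not_exists check and the response shape of botocore's ClientError, and is_duplicate_with is a parameterised variant written for this example.

```python
import time
from typing import Any, Dict, Optional

class FakeConditionalCheckFailed(Exception):
    """Stand-in carrying the same `response` shape as botocore's ClientError."""
    def __init__(self) -> None:
        super().__init__("conditional check failed")
        self.response = {"Error": {"Code": "ConditionalCheckFailedException"}}

class FakeTable:
    """Mimics a conditional put keyed on event_hash; ignores expiry."""
    def __init__(self) -> None:
        self.items: Dict[str, Dict[str, Any]] = {}

    def put_item(self, Item: Dict[str, Any],
                 ConditionExpression: Optional[str] = None, **_: Any) -> None:
        if ConditionExpression and Item["event_hash"] in self.items:
            raise FakeConditionalCheckFailed()
        self.items[Item["event_hash"]] = Item

def is_duplicate_with(table: Any, event_hash: str, window_seconds: int = 300) -> bool:
    """Same idea as is_duplicate_global, but the table is injected."""
    try:
        table.put_item(
            Item={"event_hash": event_hash,
                  "expires_at": int(time.time()) + window_seconds},
            ConditionExpression="attribute_not_exists(event_hash)",
        )
        return False
    except Exception as exc:
        code = getattr(exc, "response", {}).get("Error", {}).get("Code")
        if code == "ConditionalCheckFailedException":
            return True
        raise

table = FakeTable()
results = [
    is_duplicate_with(table, "h1"),  # first sighting
    is_duplicate_with(table, "h1"),  # repeat
    is_duplicate_with(table, "h2"),  # different hash
]
```

With the fake table, results is [False, True, False].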

Complete Working Example

This is a complete Lambda handler script. It includes the SQS record handling (Lambda's event source mapping does the polling), the deduplication cache (in-memory for simplicity), and the error handling.

import json
import hashlib
import boto3
from botocore.exceptions import ClientError
from typing import Dict

# AWS Clients
sqs_client = boto3.client('sqs')
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/GenesysEventsQueue' # Replace with your Queue URL

# In-Memory Deduplication Cache
class SimpleDedupCache:
    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        self.seen_hashes = set()
        self.timestamps = {}

    def is_duplicate(self, event_hash: str) -> bool:
        import time
        now = time.time()
        
        # Cleanup old hashes
        expired = [h for h, t in self.timestamps.items() if now - t > self.window_seconds]
        for h in expired:
            self.seen_hashes.discard(h)
            del self.timestamps[h]
            
        if event_hash in self.seen_hashes:
            return True
        
        self.seen_hashes.add(event_hash)
        self.timestamps[event_hash] = now
        return False

cache = SimpleDedupCache()

def lambda_handler(event: Dict, context) -> Dict:
    """
    AWS Lambda handler for SQS events.
    Returns a partial batch response; enable ReportBatchItemFailures on the
    event source mapping so that only the listed messages are retried.
    Without it, Lambda deletes the whole batch when the handler returns normally.
    """
    records = event.get('Records', [])
    processed_count = 0
    duplicate_count = 0
    failures = []

    for record in records:
        event_hash = None
        try:
            body = json.loads(record['body'])

            # Generate hash from the 'detail' part of the EventBridge event
            detail = body.get('detail', {})
            normalized_detail = json.dumps(detail, sort_keys=True)
            event_hash = hashlib.sha256(normalized_detail.encode('utf-8')).hexdigest()

            if cache.is_duplicate(event_hash):
                print(f"Duplicate event ignored. Hash: {event_hash[:10]}...")
                duplicate_count += 1
                # Delete the duplicate message to prevent reprocessing
                delete_from_sqs(record['receiptHandle'])
                continue

            # Process the unique event
            process_event(detail)
            processed_count += 1

            # Delete the message after successful processing
            delete_from_sqs(record['receiptHandle'])

        except json.JSONDecodeError as e:
            print(f"JSON Error: {e}")
            # Report as failed so SQS can retry it or move it to the DLQ
            failures.append({'itemIdentifier': record['messageId']})
        except Exception as e:
            print(f"Processing Error: {e}")
            # Forget the hash so the retry is not mistaken for a duplicate
            if event_hash is not None:
                cache.seen_hashes.discard(event_hash)
                cache.timestamps.pop(event_hash, None)
            failures.append({'itemIdentifier': record['messageId']})

    print(json.dumps({
        'processed': processed_count,
        'duplicates': duplicate_count,
        'errors': len(failures)
    }))
    return {'batchItemFailures': failures}

def process_event(detail: Dict) -> None:
    """
    Placeholder for business logic.
    """
    event_type = detail.get('eventType', 'unknown')
    print(f"Successfully processed event type: {event_type}")
    # Add your Genesys Cloud API calls or database updates here

def delete_from_sqs(receipt_handle: str) -> None:
    """
    Delete message from SQS.
    """
    try:
        sqs_client.delete_message(
            QueueUrl=QUEUE_URL,
            ReceiptHandle=receipt_handle
        )
    except ClientError as e:
        print(f"Failed to delete message from SQS: {e}")

Common Errors & Debugging

Error: ConditionalCheckFailedException in DynamoDB

  • What causes it: This is the expected behavior when using DynamoDB for deduplication. It means the event hash already exists in the table.
  • How to fix it: Catch this exception and treat it as a “duplicate found” signal. Do not log it as an error.

Error: MessageNotInFlight or ReceiptHandleIsInvalid

  • What causes it: You are trying to delete a message from SQS using a receipt handle that has already expired or been used. SQS receipt handles are valid for a limited time (default 30 seconds, configurable up to 12 hours).
  • How to fix it: Ensure your Lambda function processes messages quickly. If processing takes longer than the visibility timeout, the message becomes visible again, and the original receipt handle becomes invalid. Increase the SQS queue’s VisibilityTimeout or optimize your processing logic.
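
As a rough sizing aid for the fix above, AWS guidance for Lambda consumers suggests a visibility timeout of at least six times the function timeout. The sketch below applies that rule of thumb and builds the parameters for the SQS set_queue_attributes call without contacting AWS; the helper names are hypothetical.

```python
from typing import Any, Dict

SQS_MAX_VISIBILITY_SECONDS = 43200  # 12 hours, the SQS upper limit

def recommended_visibility_timeout(function_timeout_seconds: int,
                                   multiplier: int = 6) -> int:
    """Rule-of-thumb visibility timeout, capped at the SQS maximum."""
    return min(function_timeout_seconds * multiplier, SQS_MAX_VISIBILITY_SECONDS)

def build_visibility_kwargs(queue_url: str, function_timeout_seconds: int) -> Dict[str, Any]:
    """Keyword arguments for set_queue_attributes; attribute values are strings."""
    timeout = recommended_visibility_timeout(function_timeout_seconds)
    return {
        "QueueUrl": queue_url,
        "Attributes": {"VisibilityTimeout": str(timeout)},
    }

# With boto3 available, roughly:
#   sqs_client.set_queue_attributes(**build_visibility_kwargs(QUEUE_URL, 30))
```

For a function timeout of 30 seconds this yields a visibility timeout of 180 seconds.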

Error: Duplicate Events Still Occurring

  • What causes it: If using the in-memory cache, every Lambda cold start begins with an empty cache, and concurrent execution environments do not share state. If using DynamoDB, check what goes into the hash. Two events with identical detail payloads produce the same hash even when the outer EventBridge envelope differs (for example, in its timestamp). This is correct behavior for content-based deduplication. If you need to treat such events as distinct, ensure the detail payload contains a unique ID (e.g., conversationId + timestamp).
  • How to fix it: For production, always use DynamoDB or Redis. For in-memory testing, warm up your Lambda instances to maintain cache state.
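
If the payload does carry identifying fields, one option is to hash only those fields instead of the whole detail object. The sketch below assumes the detail contains top-level conversationId and timestamp fields; those names, the attempt field, and the dedup_key helper are hypothetical and should be adapted to the actual event schema.

```python
import hashlib
import json
from typing import Any, Dict, Iterable

def dedup_key(detail: Dict[str, Any], fields: Iterable[str]) -> str:
    """Hash only the named top-level fields; missing fields count as None."""
    selected = {name: detail.get(name) for name in fields}
    canonical = json.dumps(selected, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

KEY_FIELDS = ("conversationId", "timestamp")  # assumed field names

retry = dedup_key(
    {"conversationId": "c-1", "timestamp": "2023-10-27T10:00:00Z", "attempt": 1}, KEY_FIELDS)
again = dedup_key(
    {"conversationId": "c-1", "timestamp": "2023-10-27T10:00:00Z", "attempt": 2}, KEY_FIELDS)
later = dedup_key(
    {"conversationId": "c-1", "timestamp": "2023-10-27T10:05:00Z", "attempt": 1}, KEY_FIELDS)

same_logical_event = retry == again   # volatile fields outside the key are ignored
distinct_event = retry != later       # a different timestamp gives a different key
```

The trade-off is that any field left out of the key no longer influences deduplication, so the key fields must identify a logical event on their own.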

Official References