Genesys Cloud EventBridge Integration: Implementing Idempotent Deduplication for Duplicate Events

What You Will Build

  • A Python service that consumes events from AWS EventBridge, detects duplicates using a Redis-based idempotency store, and processes only unique events.
  • This tutorial uses the AWS SDK (boto3) for EventBridge interaction and the redis-py library for state management, alongside standard HTTP libraries for downstream processing.
  • The implementation is written in Python 3.9+ with type hints and production-ready error handling.

Prerequisites

  • AWS Credentials: An IAM user or role with permissions for events:PutEvents, events:DescribeEventBus, and kms:Decrypt (if using KMS encryption).
  • Genesys Cloud EventBridge Integration: An active integration configured in the Genesys Cloud Admin portal sending events to an EventBridge Event Bus.
  • Redis Instance: A running Redis server (local or managed like ElastiCache) for storing deduplication keys.
  • Python Environment: Python 3.9 or higher.
  • Dependencies:
    • boto3 (AWS SDK)
    • redis (Redis client)
    • requests (For downstream HTTP calls)
    • pydantic (For data validation)
pip install boto3 redis requests "pydantic>=2"

Authentication Setup

This solution runs as a Lambda function or an EC2/ECS service. Authentication relies on AWS IAM roles. You do not need to hardcode access keys. Ensure your execution role has the following policy attached:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "events:PutEvents",
                "events:DescribeEventBus"
            ],
            "Resource": [
                "arn:aws:events:*:*:event-bus/genesys-cloud-bus",
                "arn:aws:events:*:*:event-bus/genesys-dlq"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt"
            ],
            "Resource": "arn:aws:kms:*:*:key/your-kms-key-id"
        }
    ]
}

For the Genesys Cloud side, your code needs no Genesys credentials. Genesys Cloud publishes events through an EventBridge partner event source; once you associate that source with an event bus in your AWS account, AWS handles delivery onto the bus. An EventBridge rule on that bus then routes matching events to your function.

Implementation

Step 1: Define the Idempotency Store with Redis

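EventBridge guarantees at-least-once delivery to its targets, and Lambda retries failed asynchronous invocations, so the same Genesys Cloud event can reach your function more than once, particularly in high-volume scenarios or after network retries. A robust deduplication strategy is to take a unique identifier from the event payload and record it in Redis with a Time-To-Live (TTL): the first copy claims the key, and any copy that arrives before the key expires is skipped.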

Genesys Cloud Conversation events typically contain a conversationId. For other event types (like User updates), the id field is used. We will create a generic deduplication service.

import redis
import time
import logging
from typing import Optional, Dict, Any

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class IdempotencyStore:
    """
    Manages deduplication with an atomic Redis SET ... NX EX (set if not exists, with expiry).
    """
    def __init__(self, redis_host: str, redis_port: int, redis_password: Optional[str] = None, ttl_seconds: int = 300):
        """
        Initialize Redis connection.
        
        Args:
            redis_host: Hostname or IP of Redis instance.
            redis_port: Port of Redis instance.
            redis_password: Optional password for Redis.
            ttl_seconds: How long to keep the deduplication key (default 5 minutes).
        """
        try:
            self.client = redis.Redis(
                host=redis_host,
                port=redis_port,
                password=redis_password,
                decode_responses=True,
                socket_connect_timeout=2,
                socket_timeout=2
            )
            # Test connection
            self.client.ping()
            logger.info("Successfully connected to Redis.")
        except redis.exceptions.ConnectionError as e:
            logger.error(f"Failed to connect to Redis: {e}")
            raise

        self.ttl = ttl_seconds

    def is_duplicate(self, event_key: str) -> bool:
        """
        Atomically claim an event key and report whether it was already claimed.

        Note: this call has a side effect. The first caller sets the key, so any
        later copy of the same event is reported as a duplicate until the TTL expires.

        Args:
            event_key: A unique string identifier for the event (e.g., 'gc-event-evt-123').

        Returns:
            True if the key already existed (duplicate), False if this call claimed it.
        """
        # With nx=True, redis-py returns True when the key is newly set
        # and None when the key already exists.
        set_result = self.client.set(event_key, "1", nx=True, ex=self.ttl)

        if set_result is None:
            logger.debug(f"Duplicate event detected: {event_key}")
            return True

        logger.debug(f"New event claimed: {event_key}")
        return False
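Because IdempotencyStore relies only on the `set(..., nx=True, ex=...)` contract, you can unit-test the deduplication logic without a Redis server. The sketch below is a hypothetical in-memory stand-in (`InMemoryRedisStub` is an illustrative name, not part of redis-py) that mimics the return values redis-py gives for that call and expires keys against an injectable clock, so tests can advance time without sleeping.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class InMemoryRedisStub:
    """Hypothetical test double for the subset of redis-py that IdempotencyStore uses.

    set(..., nx=True, ex=...) returns True when the key is newly set and None when
    the key already exists and has not expired. Not thread-safe; tests only.
    """

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # key -> (value, absolute expiry time, or None for no expiry)
        self._data: Dict[str, Tuple[str, Optional[float]]] = {}

    def _alive(self, key: str) -> bool:
        entry = self._data.get(key)
        if entry is None:
            return False
        _, expires_at = entry
        if expires_at is not None and self._clock() >= expires_at:
            del self._data[key]  # evict lazily when an expired key is touched
            return False
        return True

    def set(self, name: str, value: str, nx: bool = False,
            ex: Optional[int] = None) -> Optional[bool]:
        if nx and self._alive(name):
            return None  # key exists: a failed NX set returns None
        expires_at = self._clock() + ex if ex is not None else None
        self._data[name] = (value, expires_at)
        return True

    def delete(self, *names: str) -> int:
        # Return the number of keys actually removed
        removed = 0
        for name in names:
            if self._alive(name):
                del self._data[name]
                removed += 1
        return removed
```

Since `IdempotencyStore.__init__` pings a real server, one option in tests is to create the instance with `IdempotencyStore.__new__(IdempotencyStore)` and then assign `client` and `ttl` by hand.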

Step 2: Parse and Normalize Genesys Cloud Events

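Each EventBridge event wraps the Genesys Cloud payload in its detail field, and we must extract a unique ID from it to serve as the deduplication key. Different event types carry different ID fields. The models below assume a detail object with eventId, eventType, timestamp, and body fields; confirm these names against a real event captured from your bus before relying on them, because Genesys Cloud payload schemas vary by topic and may change.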

from typing import Any, Dict, List

from pydantic import BaseModel, Field


class GenesysEventDetail(BaseModel):
    """
    Represents the 'detail' section of a Genesys Cloud EventBridge event.
    """
    eventId: str
    eventType: str
    timestamp: str
    body: Dict[str, Any] = Field(default_factory=dict)


class GenesysEvent(BaseModel):
    """
    Represents the full EventBridge event structure.
    """
    id: str
    source: str
    account: str
    time: str
    region: str
    resources: List[str]
    detail: GenesysEventDetail

    def get_dedup_key(self) -> str:
        """
        Generate a unique key for deduplication.

        Strategy:
        1. Conversation events: use the eventId. Conversations produce many rapid
           updates, and the Genesys Cloud eventId is globally unique for each event
           instance, so legitimate updates stay distinct while redeliveries collapse.
        2. Other events with a resource 'id' (e.g., User/Queue): combine event type,
           resource id, and timestamp so successive updates to the same resource
           are not suppressed within the TTL window.
        3. Fallback: the eventId.
        """
        event_type = self.detail.eventType
        body = self.detail.body

        # Conversation events are high volume; key each event instance separately
        if "conversation" in event_type.lower():
            return f"gc-event-{self.detail.eventId}"

        # Other event types: resource id plus timestamp
        resource_id = body.get("id")
        if resource_id:
            return f"gc-resource-{event_type}-{resource_id}-{self.detail.timestamp}"

        # Ultimate fallback to the global event ID
        return f"gc-event-{self.detail.eventId}"
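When an event fails Pydantic validation, it can still help to log a best-effort key for correlation. The following is a hypothetical, dependency-free helper (`dedup_key_from_raw` is an illustrative name) that works on the raw EventBridge dict. It assumes the same detail field names as the models above and prefers eventId, in line with the troubleshooting advice that eventId is the most accurate basis for deduplication; the last-resort fallback to the EventBridge envelope id is an assumption you should verify for your own redelivery path.

```python
from typing import Any, Dict


def dedup_key_from_raw(event: Dict[str, Any]) -> str:
    """Hypothetical helper: derive a dedup key from a raw EventBridge event dict."""
    detail = event.get("detail") or {}

    # Preferred: the Genesys Cloud eventId, unique per event instance
    event_id = detail.get("eventId")
    if event_id:
        return f"gc-event-{event_id}"

    # Next: resource id plus timestamp, so successive updates stay distinct
    body = detail.get("body") or {}
    resource_id = body.get("id")
    if resource_id:
        event_type = detail.get("eventType", "unknown")
        timestamp = detail.get("timestamp", "")
        return f"gc-resource-{event_type}-{resource_id}-{timestamp}"

    # Last resort (assumption): the EventBridge envelope "id"
    return f"eb-{event.get('id', 'unknown')}"
```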

Step 3: Process Events with Deduplication Logic

This step combines the Redis store with the event parser. It simulates the downstream action (e.g., writing to a database).

import json
import requests
from typing import Dict, Any

class EventProcessor:
    def __init__(self, idempotency_store: IdempotencyStore, downstream_url: str):
        self.store = idempotency_store
        self.downstream_url = downstream_url

    def process_event(self, event_dict: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process a single Genesys Cloud event.

        Args:
            event_dict: The raw dictionary from EventBridge.

        Returns:
            A status dictionary indicating success, duplicate, or error.
        """
        try:
            # 1. Parse and validate the event
            gc_event = GenesysEvent.model_validate(event_dict)

            # 2. Generate deduplication key
            dedup_key = gc_event.get_dedup_key()

            # 3. Atomically claim the key; skip the event if it was already claimed
            if self.store.is_duplicate(dedup_key):
                return {
                    "status": "duplicate_skipped",
                    "event_id": gc_event.detail.eventId,
                    "key": dedup_key
                }

            # 4. Perform downstream action. If it fails, release the claim so a
            #    retry of this event is processed instead of skipped as a duplicate.
            try:
                self._send_to_downstream(gc_event)
            except Exception:
                self.store.client.delete(dedup_key)
                raise

            return {
                "status": "processed",
                "event_id": gc_event.detail.eventId
            }

        except Exception as e:
            logger.error(f"Error processing event: {e}")
            # Note: In a real Lambda, you might send failed events to a Dead Letter Queue (DLQ)
            return {
                "status": "error",
                "message": str(e)
            }

    def _send_to_downstream(self, event: GenesysEvent) -> None:
        """
        Simulate sending data to a downstream system (e.g., Data Warehouse).
        """
        payload = {
            "eventType": event.detail.eventType,
            "body": event.detail.body,
            "timestamp": event.detail.timestamp
        }
        
        try:
            response = requests.post(
                self.downstream_url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=5
            )
            response.raise_for_status()
            logger.info(f"Successfully sent event {event.detail.eventId} to downstream.")
        except requests.exceptions.RequestException as e:
            logger.error(f"Downstream request failed: {e}")
            raise

Step 4: AWS Lambda Entry Point

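This code ties everything together. An EventBridge rule with a Lambda target invokes the function asynchronously with a single event per invocation. The handler also accepts a batch under Records, which is the shape Lambda uses when an SQS queue sits between the rule and the function.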

import os
import boto3
from botocore.exceptions import ClientError

# Initialize services
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")
DOWNSTREAM_URL = os.environ.get("DOWNSTREAM_URL", "http://localhost:8080/webhook")

# Module-level instances are created once per execution environment (at cold start)
# and reused across warm invocations
redis_store = IdempotencyStore(
    redis_host=REDIS_HOST,
    redis_port=REDIS_PORT,
    redis_password=REDIS_PASSWORD,
    ttl_seconds=600  # 10-minute dedup window
)

processor = EventProcessor(
    idempotency_store=redis_store,
    downstream_url=DOWNSTREAM_URL
)

def lambda_handler(event, context):
    """
    AWS Lambda handler for EventBridge events.

    An EventBridge rule with a Lambda target invokes the function with a single
    event object (no "Records" array). If an SQS queue sits between the rule and
    the function, Lambda instead delivers a batch under "Records", and each record
    carries the original EventBridge event as a JSON string in "body".
    """
    # Fall back to treating the invocation payload itself as the only event
    records = event.get("Records") or [event]

    results = []
    failed_events = []

    for record in records:
        # SQS-wrapped record: the EventBridge event is a JSON string in 'body'
        if isinstance(record, dict) and isinstance(record.get("body"), str):
            try:
                record_data = json.loads(record["body"])
            except json.JSONDecodeError:
                logger.error(f"Invalid JSON in record body: {record['body'][:100]}...")
                continue
        else:
            # Direct EventBridge -> Lambda invocation: the record is the event itself
            record_data = record

        result = processor.process_event(record_data)
        results.append(result)

        if result["status"] == "error":
            failed_events.append(record_data)

    # Optional: Send failed events to DLQ using EventBridge PutEvents
    if failed_events:
        send_to_dlq(failed_events)

    # EventBridge does not read this return value; it is mainly useful for direct test invocations.
    return {
        "statusCode": 200,
        "body": json.dumps({
            "processed": len([r for r in results if r["status"] == "processed"]),
            "skipped": len([r for r in results if r["status"] == "duplicate_skipped"]),
            "errors": len([r for r in results if r["status"] == "error"])
        })
    }

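def send_to_dlq(events: list) -> None:
    """
    Send failed events to a Dead Letter Queue Event Bus.
    """
    client = boto3.client('events')
    dlq_bus_name = os.environ.get("DLQ_EVENT_BUS", "genesys-dlq")

    entries = []
    for evt in events:
        entries.append({
            # Sources starting with "aws." (including partner sources) are reserved,
            # so republish under a custom source (illustrative name) and keep the
            # original event, including its source, inside Detail.
            'Source': 'custom.genesys.dedup',
            'DetailType': str(evt.get('detail', {}).get('eventType', 'unknown')),
            'Detail': json.dumps(evt),
            'EventBusName': dlq_bus_name
        })

    # PutEvents accepts at most 10 entries per call
    for i in range(0, len(entries), 10):
        batch = entries[i:i + 10]
        try:
            response = client.put_events(Entries=batch)
        except ClientError as e:
            logger.error(f"Failed to send to DLQ: {e}")
            continue
        # PutEvents reports per-entry failures in the response instead of raising
        if response.get("FailedEntryCount", 0):
            logger.error(f"{response['FailedEntryCount']} of {len(batch)} DLQ entries failed.")
        else:
            logger.info(f"Sent {len(batch)} failed events to DLQ.")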
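If you route events through SQS, Lambda wraps each one in a record whose body is a JSON string, so it is worth testing that path locally as well as the bare-event path. The helper below is a hypothetical test utility (`build_sqs_batch` is an illustrative name) that builds a minimal SQS-shaped payload; it fills in only a few fields, while real SQS records also carry receiptHandle, attributes, md5OfBody, eventSourceARN, and more.

```python
import json
import uuid
from typing import Any, Dict, List


def build_sqs_batch(events: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Wrap EventBridge events roughly the way an SQS-triggered Lambda receives them."""
    return {
        "Records": [
            {
                "messageId": str(uuid.uuid4()),
                "eventSource": "aws:sqs",
                # SQS delivers the message body as a string, not a dict
                "body": json.dumps(evt),
            }
            for evt in events
        ]
    }
```

Passing the same test event twice, for example `lambda_handler(build_sqs_batch([test_event, test_event]), None)`, exercises the batch path and the duplicate check in a single call.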

Complete Working Example

Combine the above modules into a single file app.py for local testing or deployment.

import os
import json
import logging
import redis
import requests
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field

# --- Configuration ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")
DOWNSTREAM_URL = os.environ.get("DOWNSTREAM_URL", "http://localhost:8080/webhook")

# --- Models ---

class GenesysEventDetail(BaseModel):
    eventId: str
    eventType: str
    timestamp: str
    body: Dict[str, Any] = Field(default_factory=dict)

class GenesysEvent(BaseModel):
    id: str
    source: str
    account: str
    time: str
    region: str
    resources: List[str]
    detail: GenesysEventDetail

    def get_dedup_key(self) -> str:
        # Conversation updates: eventId is unique per event instance
        if "conversation" in self.detail.eventType.lower():
            return f"gc-event-{self.detail.eventId}"
        # Other resources: include the timestamp so successive updates stay distinct
        resource_id = self.detail.body.get("id")
        if resource_id:
            return f"gc-resource-{self.detail.eventType}-{resource_id}-{self.detail.timestamp}"
        return f"gc-event-{self.detail.eventId}"

# --- Services ---

class IdempotencyStore:
    def __init__(self, redis_host: str, redis_port: int, redis_password: Optional[str] = None, ttl_seconds: int = 300):
        self.client = redis.Redis(
            host=redis_host,
            port=redis_port,
            password=redis_password,
            decode_responses=True,
            socket_connect_timeout=2,
            socket_timeout=2
        )
        self.client.ping()
        self.ttl = ttl_seconds

    def is_duplicate(self, event_key: str) -> bool:
        set_result = self.client.set(event_key, "1", nx=True, ex=self.ttl)
        return set_result is None

class EventProcessor:
    def __init__(self, idempotency_store: IdempotencyStore, downstream_url: str):
        self.store = idempotency_store
        self.downstream_url = downstream_url

    def process_event(self, event_dict: Dict[str, Any]) -> Dict[str, Any]:
        try:
            gc_event = GenesysEvent.model_validate(event_dict)
            dedup_key = gc_event.get_dedup_key()

            if self.store.is_duplicate(dedup_key):
                return {"status": "duplicate_skipped", "event_id": gc_event.detail.eventId}

            try:
                self._send_to_downstream(gc_event)
            except Exception:
                # Release the claim so a retry of this event is not skipped as a duplicate
                self.store.client.delete(dedup_key)
                raise
            return {"status": "processed", "event_id": gc_event.detail.eventId}

        except Exception as e:
            logger.error(f"Error processing event: {e}")
            return {"status": "error", "message": str(e)}

    def _send_to_downstream(self, event: GenesysEvent) -> None:
        payload = {
            "eventType": event.detail.eventType,
            "body": event.detail.body,
            "timestamp": event.detail.timestamp
        }
        response = requests.post(self.downstream_url, json=payload, timeout=5)
        response.raise_for_status()

# --- Lambda Handler ---

redis_store = IdempotencyStore(
    redis_host=REDIS_HOST,
    redis_port=REDIS_PORT,
    redis_password=REDIS_PASSWORD,
    ttl_seconds=600
)

processor = EventProcessor(
    idempotency_store=redis_store,
    downstream_url=DOWNSTREAM_URL
)

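def lambda_handler(event, context):
    # Direct EventBridge -> Lambda: one event per invocation.
    # SQS in between: a batch under "Records", each event a JSON string in "body".
    records = event.get("Records") or [event]
    results = []

    for record in records:
        if isinstance(record, dict) and isinstance(record.get("body"), str):
            try:
                record_data = json.loads(record["body"])
            except json.JSONDecodeError:
                logger.error("Skipping record with invalid JSON body.")
                continue
        else:
            record_data = record

        result = processor.process_event(record_data)
        results.append(result)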

    return {
        "statusCode": 200,
        "body": json.dumps({
            "processed": len([r for r in results if r["status"] == "processed"]),
            "skipped": len([r for r in results if r["status"] == "duplicate_skipped"]),
            "errors": len([r for r in results if r["status"] == "error"])
        })
    }

# --- Local Testing ---
if __name__ == "__main__":
    # Simulate a Genesys Cloud Event
    test_event = {
        "id": "1234567890",
        "source": "genesys.cloud",
        "account": "123456789",
        "time": "2023-10-27T10:00:00Z",
        "region": "us-east-1",
        "resources": ["arn:aws:events:us-east-1:123456789:event-bus/genesys-bus"],
        "detail": {
            "eventId": "evt-unique-123",
            "eventType": "conversation.update",
            "timestamp": "2023-10-27T10:00:00Z",
            "body": {
                "conversationId": "conv-98765",
                "status": "connected"
            }
        }
    }

    # Mock downstream URL for local test
    processor.downstream_url = "http://httpbin.org/post"

    # Run twice to test deduplication
    print("First run:")
    print(processor.process_event(test_event))
    
    print("Second run (should be skipped):")
    print(processor.process_event(test_event))

Common Errors & Debugging

Error: Redis Connection Timeout

  • Cause: The Lambda function is in a VPC, but the Redis instance is not in a subnet within that VPC, or security groups are blocking port 6379.
  • Fix: Ensure the Lambda function has VPC configuration matching the Redis subnet. Update the Security Group attached to the Redis instance to allow inbound traffic from the Lambda’s Security Group on port 6379.
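To tell a network-level block apart from an authentication or Redis-side problem, a plain TCP connect from inside the same VPC configuration is a quick check. The helper below is a hypothetical sketch (`can_reach` is an illustrative name) that uses only the standard library; you could call it temporarily from a test invocation of the function.

```python
import socket


def can_reach(host: str, port: int = 6379, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, DNS failures, and timeouts (all OSError subclasses)
        return False
```

If `can_reach(os.environ["REDIS_HOST"])` returns False, look at subnets, route tables, and security groups. If it returns True but redis-py still fails, the problem is more likely the password, TLS settings, or the Redis endpoint itself.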

Error: pydantic.ValidationError

  • Cause: The structure of the incoming event does not match the GenesysEvent model. Genesys Cloud may update event schemas.
  • Fix: Log the raw event_dict before parsing. Declare fields that are not guaranteed to be present as Optional[str] = None. Pydantic v2 already ignores unknown fields by default (extra="ignore"); set model_config = ConfigDict(extra="allow") only if you want to keep them on the model.
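Before tightening or loosening the model, it helps to see exactly how incoming payloads differ from what it expects. The sketch below is a hypothetical drift check (`detail_field_drift` and `EXPECTED_DETAIL_FIELDS` are illustrative names) that compares the top-level keys of `detail` with the fields GenesysEventDetail declares; note that body has a default, so a "missing" body alone would not fail validation.

```python
from typing import Any, Dict, Iterable, Set, Tuple

# Hypothetical: the detail fields the GenesysEventDetail model declares
EXPECTED_DETAIL_FIELDS = {"eventId", "eventType", "timestamp", "body"}


def detail_field_drift(event: Dict[str, Any],
                       expected: Iterable[str] = EXPECTED_DETAIL_FIELDS) -> Tuple[Set[str], Set[str]]:
    """Return (missing, unexpected) top-level keys of event["detail"]."""
    detail = event.get("detail")
    keys = set(detail) if isinstance(detail, dict) else set()
    expected_set = set(expected)
    return expected_set - keys, keys - expected_set
```

Logging both sets just before `model_validate` turns a bare ValidationError into a concrete list of renamed or newly added fields.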

Error: Downstream 429 Too Many Requests

  • Cause: The downstream system (e.g., Data Warehouse API) is rate-limiting you.
  • Fix: Implement exponential backoff in the _send_to_downstream method. Do not retry immediately.
import time

import requests


def _send_to_downstream_with_retry(self, event: GenesysEvent, max_retries: int = 3) -> None:
    """Replacement for EventProcessor._send_to_downstream that backs off on HTTP 429."""
    payload = {
        "eventType": event.detail.eventType,
        "body": event.detail.body,
        "timestamp": event.detail.timestamp
    }
    for attempt in range(max_retries):
        response = requests.post(self.downstream_url, json=payload, timeout=5)
        if response.status_code == 429 and attempt < max_retries - 1:
            wait_time = 2 ** attempt  # 1s, 2s, ...
            logger.warning(f"Rate limited. Retrying in {wait_time} seconds.")
            time.sleep(wait_time)
            continue
        # Raises for the final 429 and for any other 4xx/5xx response
        response.raise_for_status()
        return
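The loop above waits a fixed `2 ** attempt` seconds. A common refinement, swapped in here as a sketch rather than something the downstream API is known to require, is to honor a numeric Retry-After header when the server sends one and otherwise use exponential backoff with "full jitter" (a random wait between zero and the exponential cap), so that many Lambda instances throttled at the same moment do not retry in lockstep. `backoff_delay` is a hypothetical helper name.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 30.0) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # Numeric Retry-After: trust the server, but clamp to [0, cap]
            return min(cap, max(0.0, float(retry_after)))
        except ValueError:
            pass  # Retry-After can also be an HTTP date; fall back to backoff
    # Full jitter: uniform in [0, min(cap, base * 2**attempt)]
    return random.uniform(0, min(cap, base * (2 ** attempt)))
```

Inside the loop, `time.sleep(backoff_delay(attempt, response.headers.get("Retry-After")))` would replace the fixed wait.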

Error: Duplicate Events Still Processing

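  • Cause: The eventId in the Genesys Cloud payload changes for what should be the same logical update, a redelivered copy arrives after its Redis key has expired, or the dedup key is built from fields that differ between copies.
  • Fix: The Genesys Cloud eventId is unique per event instance, so a redelivery of the same event should carry the same eventId. Log the dedup key and eventId for each duplicate you observe: if the eventIds differ, Genesys Cloud emitted two separate events; if they match, increase ttl_seconds so the window covers the gap between deliveries. If you key on a resource id plus timestamp, make sure the timestamp is fine-grained enough to distinguish successive updates. Relying on eventId is the most accurate method for EventBridge integrations.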
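To spot the TTL case, one option is to compare each event's EventBridge `time` field with `ttl_seconds` and log a warning when the event is older than the dedup window, since a copy arriving that late may find its key already expired. The helper below is a hypothetical sketch (`older_than_ttl` is an illustrative name); it assumes `time` is an ISO-8601 UTC string ending in `Z`, as in the local test event.

```python
from datetime import datetime, timezone
from typing import Optional


def older_than_ttl(event_time: str, ttl_seconds: int,
                   now: Optional[datetime] = None) -> bool:
    """Return True if an event's EventBridge 'time' is older than the dedup TTL."""
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on
    sent = datetime.fromisoformat(event_time.replace("Z", "+00:00"))
    current = now if now is not None else datetime.now(timezone.utc)
    return (current - sent).total_seconds() > ttl_seconds
```

For example, `if older_than_ttl(record_data["time"], redis_store.ttl): logger.warning(...)` in the handler flags late arrivals that the Redis check may no longer catch.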
