Deduplicating Genesys Cloud EventBridge Events Using Python and Idempotency Keys
What You Will Build
- A Python service that receives events from AWS EventBridge, detects duplicates using a Redis-backed idempotency store, and processes each unique event only once within the deduplication window.
- This tutorial uses the AWS EventBridge API surface and standard Python libraries (boto3, redis) rather than a specific Genesys Cloud SDK, because the deduplication logic runs on the consumer side.
- The programming language covered is Python 3.9+.
Prerequisites
- AWS Account: Access to AWS EventBridge, Lambda, and ElastiCache (or LocalStack for local testing).
- Genesys Cloud Integration: An existing Genesys Cloud Integration configured to send events to EventBridge.
- Python Environment: Python 3.9 or higher.
- Dependencies:
  - boto3: For AWS interactions.
  - redis: For the idempotency store.
  - pydantic (v2): For event validation; the code uses the v2 methods model_validate and model_dump.
  - pymongo (optional): If you prefer MongoDB over Redis for the idempotency store.
Authentication Setup
This tutorial assumes the Lambda function or EC2 instance running this code has an IAM role attached with the permissions needed to reach your idempotency store (e.g., elasticache:Connect if you use IAM authentication for Redis) and to write logs to CloudWatch. No OAuth is required for the EventBridge consumer itself: with a Lambda target, EventBridge invokes the function directly; with an HTTP target, EventBridge calls your endpoint through an API destination, and authentication is whatever you configure on its connection rather than anything in this code.
For local testing, ensure your ~/.aws/credentials are configured with an IAM user that has events:PutEvents (for testing) and access to your chosen cache layer.
import boto3
import redis
from botocore.exceptions import ClientError
# Initialize clients
eventbridge_client = boto3.client('events')
redis_client = redis.Redis(host='your-redis-endpoint.cache.amazonaws.com', port=6379, db=0, decode_responses=True)
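For local testing with the events:PutEvents permission mentioned above, a test event can be published to a bus whose rule targets your consumer. The following is a minimal sketch under stated assumptions: the bus name genesys-dedup-test, the source test.genesys.dedup, and the helper names build_test_entry and send_twice are hypothetical, and the detail fields simply mirror the model used later in this tutorial.

```python
import json
from typing import Any, Dict


def build_test_entry(conversation_id: str, event_id: str) -> Dict[str, Any]:
    """Build one PutEvents entry; Detail must be a JSON string."""
    detail = {
        "id": event_id,
        "eventType": "routing:conversation:created",
        "timestamp": "2024-01-01T00:00:00Z",
        "conversationId": conversation_id,
    }
    return {
        "Source": "test.genesys.dedup",        # hypothetical source name
        "DetailType": "Genesys Test Event",    # hypothetical detail type
        "Detail": json.dumps(detail),
        "EventBusName": "genesys-dedup-test",  # hypothetical custom bus
    }


def send_twice(entry: Dict[str, Any]) -> None:
    """Publish the same entry twice to exercise the duplicate path."""
    import boto3  # imported lazily so the builder works without AWS access

    client = boto3.client("events")
    for _ in range(2):
        response = client.put_events(Entries=[entry])
        print("FailedEntryCount:", response["FailedEntryCount"])
```

Calling send_twice(build_test_entry("conv-1", "evt-1")) should produce two deliveries with the same conversationId, so the second one is expected to take the duplicate branch of the handler.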
Implementation
Step 1: Define the Event Structure and Validation
Genesys Cloud sends events to EventBridge with a specific schema. The detail field contains the actual Genesys payload. To deduplicate effectively, you must identify a unique key within this payload. For most Genesys events (like routing:conversation:created), the conversationId is the unique identifier. For others, like analytics:report:generated, it might be the reportId or a combination of startTime and endTime.
We will use Pydantic to validate the incoming EventBridge envelope and extract the deduplication key.
from pydantic import BaseModel, Field
from typing import Any, Optional
from datetime import datetime


class GenesysDetail(BaseModel):
    """
    Represents the core Genesys Cloud event payload.
    Adjust fields based on the specific event type you are consuming.
    """
    id: str = Field(..., alias="id")
    eventType: str = Field(..., alias="eventType")
    timestamp: str = Field(..., alias="timestamp")
    # Generic fields for common Genesys events
    conversationId: Optional[str] = Field(None, alias="conversationId")
    userId: Optional[str] = Field(None, alias="userId")
    # Include other relevant fields based on the specific event schema


class EventBridgePayload(BaseModel):
    """
    The standard AWS EventBridge event envelope.
    """
    version: str
    id: str = Field(..., alias="id")  # The EventBridge event ID
    source: str
    account: str
    time: datetime
    region: str
    resources: list[str]
    # The envelope key is "detail-type" (hyphenated), not "detailType"
    detail_type: str = Field(..., alias="detail-type")
    detail: GenesysDetail

    def get_deduplication_key(self) -> str:
        """
        Generates a unique key for deduplication.
        Strategy: Combine event type and the primary business ID.
        """
        event_type = self.detail.eventType
        # Fallback to EventBridge ID if no business ID is present
        business_id = self.detail.conversationId or self.id
        # Create a composite key to prevent collisions across different event types
        return f"{event_type}_{business_id}"
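The key strategy above can be checked on raw dictionaries before any model is involved. This sketch assumes the same field names as the GenesysDetail model in this tutorial (eventType, conversationId); the helper name dedup_key_from_event and the sample values are hypothetical, and it is meant as an illustration rather than a replacement for the model method.

```python
from typing import Any, Dict


def dedup_key_from_event(event: Dict[str, Any]) -> str:
    """Mirror get_deduplication_key() for a raw EventBridge event dict."""
    detail = event.get("detail", {})
    event_type = detail.get("eventType", "unknown")
    # Prefer the business ID; fall back to the EventBridge envelope ID.
    business_id = detail.get("conversationId") or event["id"]
    return f"{event_type}_{business_id}"


first = {"id": "eb-1", "detail": {"eventType": "created", "conversationId": "c-42"}}
redelivery = {"id": "eb-2", "detail": {"eventType": "created", "conversationId": "c-42"}}
no_business_id = {"id": "eb-3", "detail": {"eventType": "created"}}

print(dedup_key_from_event(first))           # created_c-42
print(dedup_key_from_event(redelivery))      # created_c-42
print(dedup_key_from_event(no_business_id))  # created_eb-3
```

The first two events map to the same key even though their envelope IDs differ, which is the point of preferring a business ID. The fallback case only deduplicates deliveries that carry the same envelope ID.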
Step 2: Implement the Idempotency Store
The core of the deduplication strategy is a fast, atomic check-and-set operation. Redis is well suited to this because its SET command accepts the NX option (set only if the key does not exist, the behavior of the older SETNX command) together with EX (a TTL in seconds), so the existence check, the write, and the expiry happen in one atomic call. If the same event is received again within the window (e.g., 1 hour), it is rejected. If it arrives after the window has expired, it is processed again, which may or may not be acceptable for late-arriving events depending on your business logic.
import json
import logging
from datetime import timedelta

logger = logging.getLogger(__name__)

# Configuration
DEDUP_TTL_SECONDS = 3600  # 1 hour deduplication window
REDIS_PREFIX = "genesys_dedup:"


class IdempotencyStore:
    def __init__(self, redis_conn: redis.Redis):
        self.redis = redis_conn

    def is_duplicate(self, dedup_key: str) -> bool:
        """
        Checks if the key exists in Redis.
        Returns True if it is a duplicate (key exists), False otherwise.
        Atomically sets the key with a TTL if it does not exist.
        """
        full_key = f"{REDIS_PREFIX}{dedup_key}"
        # SET with nx=True returns True if the key was set (new)
        # and None if it already existed (duplicate).
        # We use the 'ex' parameter for TTL
        is_new = self.redis.set(full_key, "1", nx=True, ex=DEDUP_TTL_SECONDS)
        if is_new:
            logger.info(f"New event detected for key: {dedup_key}")
            return False
        else:
            logger.warning(f"Duplicate event detected for key: {dedup_key}. Ignoring.")
            return True

    def cleanup(self, dedup_key: str):
        """
        Optional: Explicitly remove a key if processing failed and you want to allow retry.
        """
        full_key = f"{REDIS_PREFIX}{dedup_key}"
        self.redis.delete(full_key)
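The window semantics described in this step can be exercised without a Redis server. The sketch below is a single-process stand-in, not the Redis-backed store: the class name InMemoryWindowStore and the injectable clock are assumptions made so the expiry behavior can be demonstrated deterministically. It is not thread-safe and shares nothing between processes.

```python
import time
from typing import Callable, Dict


class InMemoryWindowStore:
    """Dict-based stand-in for SET key value NX EX ttl (single process only)."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._expires_at: Dict[str, float] = {}

    def is_duplicate(self, key: str) -> bool:
        now = self._clock()
        expiry = self._expires_at.get(key)
        if expiry is not None and expiry > now:
            return True  # key is still inside the window
        self._expires_at[key] = now + self._ttl  # new or expired: (re)claim it
        return False


fake_now = [0.0]  # mutable cell so the lambda below sees updates
store = InMemoryWindowStore(ttl_seconds=3600, clock=lambda: fake_now[0])

print(store.is_duplicate("created_c-42"))  # False: first delivery
fake_now[0] = 10.0
print(store.is_duplicate("created_c-42"))  # True: inside the 1-hour window
fake_now[0] = 3601.0
print(store.is_duplicate("created_c-42"))  # False: window expired
```

The third call shows the trade-off of a TTL-based store: once the window has passed, the same key is treated as new again.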
Step 3: Processing Logic and Lambda Handler
Now we combine the validation, idempotency check, and business logic. In a production environment, you would replace the simulated result in process_event with a database write, an API call to another service, or a message queue publish.
import json
import os
import logging
from typing import Dict, Any

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize the store
idempotency_store = IdempotencyStore(redis_client)


def process_event(payload: EventBridgePayload) -> Dict[str, Any]:
    """
    Your business logic goes here.
    This function is only called if the event is NOT a duplicate.
    """
    # Example: Log the event or write to a database
    event_data = payload.detail.model_dump(by_alias=True)
    logger.info(f"Processing event: {payload.detail.eventType} with ID: {payload.detail.id}")
    # Simulate processing
    result = {
        "status": "success",
        "processed_event_id": payload.detail.id,
        "conversation_id": payload.detail.conversationId
    }
    return result


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    AWS Lambda handler for EventBridge events.
    """
    try:
        # 1. Validate the incoming event structure
        try:
            payload = EventBridgePayload.model_validate(event)
        except Exception as e:
            logger.error(f"Invalid event structure: {e}")
            return {
                "statusCode": 400,
                "body": json.dumps({"error": "Invalid event structure"})
            }
        # 2. Generate deduplication key
        dedup_key = payload.get_deduplication_key()
        # 3. Check for duplicates
        if idempotency_store.is_duplicate(dedup_key):
            return {
                "statusCode": 200,
                "body": json.dumps({"status": "duplicate", "key": dedup_key})
            }
        # 4. Process the event
        try:
            result = process_event(payload)
            return {
                "statusCode": 200,
                "body": json.dumps(result)
            }
        except Exception as e:
            # If processing fails, you might want to remove the dedup key
            # to allow retry, or keep it to prevent infinite loops.
            # Here we keep it to prevent infinite loops on hard failures.
            logger.error(f"Processing failed for event {dedup_key}: {e}")
            return {
                "statusCode": 500,
                "body": json.dumps({"error": "Processing failed", "details": str(e)})
            }
    except Exception as e:
        logger.error(f"Unhandled exception: {e}")
        return {
            "statusCode": 500,
            "body": json.dumps({"error": "Internal server error"})
        }
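The handler keeps the deduplication key when processing fails, which means a failed event is not retried within the window. One alternative, sketched here under assumptions, releases the key and re-raises. EventBridge invokes Lambda asynchronously, so a raised exception, not a returned statusCode, is what marks the invocation as failed and makes it eligible for retry. The names process_once and DictStore are hypothetical; DictStore only stands in for a store exposing is_duplicate and cleanup.

```python
from typing import Any, Callable, Dict


class DictStore:
    """Hypothetical stand-in exposing the same is_duplicate/cleanup methods."""

    def __init__(self) -> None:
        self._keys: Dict[str, bool] = {}

    def is_duplicate(self, key: str) -> bool:
        if key in self._keys:
            return True
        self._keys[key] = True
        return False

    def cleanup(self, key: str) -> None:
        self._keys.pop(key, None)


def process_once(store: DictStore, key: str, work: Callable[[], Any]) -> Any:
    """Run work() once per key; release the key if work() raises."""
    if store.is_duplicate(key):
        return {"status": "duplicate", "key": key}
    try:
        return work()
    except Exception:
        store.cleanup(key)  # allow a later retry to claim the key again
        raise               # surface the failure to the caller/invoker


def failing() -> None:
    raise RuntimeError("downstream unavailable")


store = DictStore()
try:
    process_once(store, "created_c-42", failing)
except RuntimeError:
    pass  # the first attempt failed and released its key

print(process_once(store, "created_c-42", lambda: {"status": "success"}))
# {'status': 'success'}
```

Whether to release the key depends on the failure: releasing suits transient errors, while keeping the key (as the handler above does) avoids repeated retries of an event that can never succeed.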
Complete Working Example
Below is the complete, copy-pasteable Python script. Save this as lambda_function.py. You will need to configure the REDIS_HOST environment variable in your Lambda environment or local settings.
import json
import os
import logging
import redis
from typing import Dict, Any, Optional
from datetime import datetime
from pydantic import BaseModel, Field

# --- Configuration ---
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_DB = int(os.getenv("REDIS_DB", 0))
DEDUP_TTL_SECONDS = int(os.getenv("DEDUP_TTL_SECONDS", 3600))
REDIS_PREFIX = "genesys_dedup:"

# --- Logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Redis Client ---
try:
    redis_client = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        db=REDIS_DB,
        decode_responses=True,
        socket_timeout=5,
        retry_on_timeout=True
    )
    # Test connection
    redis_client.ping()
    logger.info("Connected to Redis")
except Exception as e:
    logger.error(f"Failed to connect to Redis: {e}")
    redis_client = None


# --- Models ---
class GenesysDetail(BaseModel):
    """
    Represents the core Genesys Cloud event payload.
    """
    id: str = Field(..., alias="id")
    eventType: str = Field(..., alias="eventType")
    timestamp: str = Field(..., alias="timestamp")
    conversationId: Optional[str] = Field(None, alias="conversationId")
    userId: Optional[str] = Field(None, alias="userId")
    # Add other fields as needed based on your specific Genesys event types


class EventBridgePayload(BaseModel):
    """
    The standard AWS EventBridge event envelope.
    """
    version: str
    id: str = Field(..., alias="id")
    source: str
    account: str
    time: datetime
    region: str
    resources: list[str]
    # The envelope key is "detail-type" (hyphenated), not "detailType"
    detail_type: str = Field(..., alias="detail-type")
    detail: GenesysDetail

    def get_deduplication_key(self) -> str:
        """
        Generates a unique key for deduplication.
        """
        event_type = self.detail.eventType
        business_id = self.detail.conversationId or self.id
        return f"{event_type}_{business_id}"


# --- Idempotency Store ---
class IdempotencyStore:
    def __init__(self, redis_conn: redis.Redis):
        self.redis = redis_conn

    def is_duplicate(self, dedup_key: str) -> bool:
        """
        Checks if the key exists in Redis.
        Returns True if it is a duplicate (key exists), False otherwise.
        """
        if not self.redis:
            logger.warning("Redis not available. Skipping deduplication.")
            return False
        full_key = f"{REDIS_PREFIX}{dedup_key}"
        try:
            # SET with nx=True returns True if the key was set (new)
            # and None if it already existed (duplicate).
            is_new = self.redis.set(full_key, "1", nx=True, ex=DEDUP_TTL_SECONDS)
            if is_new:
                logger.info(f"New event detected for key: {dedup_key}")
                return False
            else:
                logger.warning(f"Duplicate event detected for key: {dedup_key}. Ignoring.")
                return True
        except Exception as e:
            logger.error(f"Redis error during dedup check: {e}")
            # Fail open: process the event if Redis is down to avoid data loss
            return False


idempotency_store = IdempotencyStore(redis_client)


# --- Business Logic ---
def process_event(payload: EventBridgePayload) -> Dict[str, Any]:
    """
    Your business logic goes here.
    """
    event_data = payload.detail.model_dump(by_alias=True)
    logger.info(f"Processing event: {payload.detail.eventType} with ID: {payload.detail.id}")
    # Simulate processing (e.g., DB write)
    result = {
        "status": "success",
        "processed_event_id": payload.detail.id,
        "conversation_id": payload.detail.conversationId
    }
    return result


# --- Lambda Handler ---
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    AWS Lambda handler for EventBridge events.
    """
    try:
        # 1. Validate
        try:
            payload = EventBridgePayload.model_validate(event)
        except Exception as e:
            logger.error(f"Invalid event structure: {e}")
            return {
                "statusCode": 400,
                "body": json.dumps({"error": "Invalid event structure"})
            }
        # 2. Deduplication Key
        dedup_key = payload.get_deduplication_key()
        # 3. Check Duplicate
        if idempotency_store.is_duplicate(dedup_key):
            return {
                "statusCode": 200,
                "body": json.dumps({"status": "duplicate", "key": dedup_key})
            }
        # 4. Process
        try:
            result = process_event(payload)
            return {
                "statusCode": 200,
                "body": json.dumps(result)
            }
        except Exception as e:
            logger.error(f"Processing failed for event {dedup_key}: {e}")
            return {
                "statusCode": 500,
                "body": json.dumps({"error": "Processing failed", "details": str(e)})
            }
    except Exception as e:
        logger.error(f"Unhandled exception: {e}")
        return {
            "statusCode": 500,
            "body": json.dumps({"error": "Internal server error"})
        }
Common Errors & Debugging
Error: Redis Connection Timeout
- What causes it: The Lambda execution environment cannot reach the Redis endpoint due to VPC configuration, security groups, or incorrect hostnames.
- How to fix it: Ensure the Lambda is in the same VPC as the ElastiCache cluster. Check that the cluster's security group allows inbound traffic on port 6379 from the Lambda’s security group.
- Code Fix: Add a retry mechanism or fallback to a local in-memory store for development.
# Example: Fallback to local memory if Redis fails
import threading


class FallbackIdempotencyStore:
    def __init__(self):
        self._store = {}
        self._lock = threading.Lock()

    def is_duplicate(self, dedup_key: str) -> bool:
        with self._lock:
            if dedup_key in self._store:
                return True
            self._store[dedup_key] = True
            return False
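The other option named in the code fix, a retry mechanism, might look like the sketch below. The helper with_retries and its parameters are hypothetical and the delays are illustrative; in Lambda, the total retry time has to fit inside the function timeout. The built-in ConnectionError and TimeoutError are used here only so the example runs without Redis.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_retries(
    operation: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.1,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation(), retrying with exponential backoff on retry_on errors."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: the caller decides to fail open or closed
            sleep(base_delay * 2 ** (attempt - 1))  # 0.1s, 0.2s, 0.4s, ...
    raise AssertionError("unreachable")  # the loop always returns or raises


calls = {"n": 0}


def flaky() -> str:
    """Fail twice with a simulated connection error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated timeout")
    return "ok"


print(with_retries(flaky, sleep=lambda _: None))  # ok
```

With redis-py, the exceptions to pass as retry_on would be redis.exceptions.ConnectionError and redis.exceptions.TimeoutError, since those do not derive from the built-in classes of the same name.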
Error: Pydantic Validation Error
- What causes it: The Genesys Cloud event schema changed, or you are testing with a malformed event.
- How to fix it: Inspect the event payload in the Lambda logs. Update the GenesysDetail model to include missing fields or make them optional.
- Debugging Tip: Log json.dumps(event, indent=2) for the raw event, or call model_dump_json(indent=2) on a payload that validated, to see exactly which fields are present.
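When validation fails, it can help to compare the raw event against the keys the models require before reading the full Pydantic error. The sketch below is an assumption-laden helper, not part of the tutorial's code: missing_fields is a hypothetical name, and the two lists mirror the required fields of the models in this tutorial, using the envelope's own hyphenated detail-type key.

```python
import json
from typing import Any, Dict, List

# Assumed required keys, mirroring the models in this tutorial.
REQUIRED_ENVELOPE = [
    "version", "id", "source", "account", "time",
    "region", "resources", "detail-type", "detail",
]
REQUIRED_DETAIL = ["id", "eventType", "timestamp"]


def missing_fields(event: Dict[str, Any]) -> List[str]:
    """Return dotted paths of required keys absent from a raw event."""
    missing = [key for key in REQUIRED_ENVELOPE if key not in event]
    detail = event.get("detail")
    if isinstance(detail, dict):
        missing += [f"detail.{key}" for key in REQUIRED_DETAIL if key not in detail]
    return missing


event = {"version": "0", "id": "eb-1", "detail": {"id": "g-1"}}
print(json.dumps(event, indent=2))  # the raw event, as it would appear in logs
print(missing_fields(event))
```

For the sample event, the helper reports the six absent envelope keys followed by detail.eventType and detail.timestamp.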
Error: Duplicate Events Still Processing
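- What causes it: The deduplication key is scoped incorrectly. If the key is too coarse, for example only conversationId, two different events (e.g., created and updated) for the same conversation collide and the second is dropped as a false duplicate. If the key includes a value that changes between deliveries, true duplicates get different keys and are processed again.
- How to fix it: Refine the get_deduplication_key method to include the eventType or a specific action identifier, and build the key only from values that are identical across redeliveries.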
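# Improved key generation
def get_deduplication_key(self) -> str:
    event_type = self.detail.eventType
    # Some events have an 'action' field in the detail.
    # self.detail is a Pydantic model, not a dict, so use getattr;
    # declare 'action' as an Optional field on GenesysDetail to populate it.
    action = getattr(self.detail, "action", None) or "none"
    business_id = self.detail.conversationId or self.id
    return f"{event_type}_{action}_{business_id}"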