Genesys Cloud EventBridge Integration: Implementing Idempotent Deduplication for Duplicate Events
What You Will Build
- A Python service that consumes events from AWS EventBridge, detects duplicates using a Redis-based idempotency store, and processes only unique events.
- This tutorial uses the AWS SDK (boto3) for EventBridge interaction and the redis-py library for state management, alongside standard HTTP libraries for downstream processing.
- The implementation is written in Python 3.9+ with type hints and production-ready error handling.
Prerequisites
- AWS Credentials: An IAM user or role with permissions for events:PutEvents, events:DescribeEventBus, and kms:Decrypt (if using KMS encryption).
- Genesys Cloud EventBridge Integration: An active integration configured in the Genesys Cloud Admin portal sending events to an EventBridge Event Bus.
- Redis Instance: A running Redis server (local or managed like ElastiCache) for storing deduplication keys.
- Python Environment: Python 3.9 or higher.
- Dependencies:
  - boto3 (AWS SDK)
  - redis (Redis client)
  - requests (for downstream HTTP calls)
  - pydantic (for data validation)
pip install boto3 redis requests pydantic
Authentication Setup
This solution runs as a Lambda function or an EC2/ECS service. Authentication relies on AWS IAM roles. You do not need to hardcode access keys. Ensure your execution role has the following policy attached:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "events:PutEvents",
        "events:DescribeEventBus"
      ],
      "Resource": "arn:aws:events:*:*:event-bus/genesys-cloud-bus"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt"
      ],
      "Resource": "arn:aws:kms:*:*:key/your-kms-key-id"
    }
  ]
}
For the Genesys Cloud side, no additional authentication is required in the code. The integration delivers events through an EventBridge partner event source, which you associate with an event bus in your AWS account. The security boundary is therefore the AWS-side configuration of that event source and bus, not credentials held by this service.
Implementation
Step 1: Define the Idempotency Store with Redis
Delivery through EventBridge is at-least-once, so in high-volume scenarios or during network retries the same event may arrive multiple times. A robust deduplication strategy is to store a unique identifier from the event payload in Redis with a Time-To-Live (TTL), so that a repeat delivery inside the TTL window is recognized and skipped.
Genesys Cloud Conversation events typically contain a conversationId. For other event types (like User updates), the id field is used. We will create a generic deduplication service.
import logging
from typing import Optional, Dict, Any

import redis

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class IdempotencyStore:
    """
    Manages deduplication using Redis SET with the NX option (set only if the key does not exist).
    """

    def __init__(self, redis_host: str, redis_port: int, redis_password: Optional[str] = None, ttl_seconds: int = 300):
        """
        Initialize Redis connection.
        Args:
            redis_host: Hostname or IP of Redis instance.
            redis_port: Port of Redis instance.
            redis_password: Optional password for Redis.
            ttl_seconds: How long to keep the deduplication key (default 5 minutes).
        """
        try:
            self.client = redis.Redis(
                host=redis_host,
                port=redis_port,
                password=redis_password,
                decode_responses=True,
                socket_connect_timeout=2,
                socket_timeout=2
            )
            # Test connection
            self.client.ping()
            logger.info("Successfully connected to Redis.")
        except redis.exceptions.ConnectionError as e:
            logger.error(f"Failed to connect to Redis: {e}")
            raise
        self.ttl = ttl_seconds

    def is_duplicate(self, event_key: str) -> bool:
        """
        Check if an event has already been processed.
        Args:
            event_key: A unique string identifier for the event (e.g., 'conv-12345-update').
        Returns:
            True if the event was already processed (duplicate), False otherwise.
        """
        # With nx=True, redis-py returns True if the key was set (new event)
        # and None if the key already existed (duplicate). The check and the
        # write happen in one atomic command, and ex= sets the TTL in seconds.
        set_result = self.client.set(event_key, "1", nx=True, ex=self.ttl)
        if set_result is None:
            # Key already exists
            logger.debug(f"Duplicate event detected: {event_key}")
            return True
        else:
            logger.debug(f"New event processed: {event_key}")
            return False
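For unit tests that should not depend on a running Redis server, the same contract can be imitated in memory. The following is a minimal sketch, not part of the original service: the class name InMemoryIdempotencyStore and its injectable clock parameter are hypothetical, and it only approximates the claim-with-TTL behavior of is_duplicate (it is not safe across processes).

```python
import time
from typing import Callable, Dict


class InMemoryIdempotencyStore:
    """Hypothetical test double that mirrors IdempotencyStore.is_duplicate()."""

    def __init__(self, ttl_seconds: int = 300, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self._clock = clock
        self._expires_at: Dict[str, float] = {}

    def is_duplicate(self, event_key: str) -> bool:
        now = self._clock()
        expiry = self._expires_at.get(event_key)
        if expiry is not None and expiry > now:
            # Key is still live: same outcome as SET NX finding an existing key.
            return True
        # Key is absent or expired: claim it for another TTL window.
        self._expires_at[event_key] = now + self.ttl
        return False


# A controllable clock shows expiry without sleeping.
fake_now = [0.0]
store = InMemoryIdempotencyStore(ttl_seconds=300, clock=lambda: fake_now[0])
first = store.is_duplicate("gc-event-evt-unique-123")   # new key
second = store.is_duplicate("gc-event-evt-unique-123")  # inside the TTL window
fake_now[0] = 301.0
third = store.is_duplicate("gc-event-evt-unique-123")   # TTL has elapsed
print(first, second, third)  # False True False
```

The third call illustrates the limit of any TTL-based approach: a redelivery that arrives after the window has closed is treated as new.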
Step 2: Parse and Normalize Genesys Cloud Events
Genesys Cloud sends events with a specific structure. The detail field contains the actual payload. We must extract a unique ID to serve as the deduplication key. Different event types have different ID fields.
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field


class GenesysEventDetail(BaseModel):
    """
    Represents the 'detail' section of a Genesys Cloud EventBridge event.
    """
    eventId: str = Field(..., alias="eventId")
    eventType: str = Field(..., alias="eventType")
    timestamp: str
    body: Dict[str, Any] = {}

    class Config:
        populate_by_name = True


class GenesysEvent(BaseModel):
    """
    Represents the full EventBridge event structure.
    """
    id: str
    source: str
    account: str
    time: str
    region: str
    resources: List[str]
    detail: GenesysEventDetail

    def get_dedup_key(self) -> str:
        """
        Generate a unique key for deduplication.
        Strategy:
        1. For Conversation events: Use 'eventId', so that distinct updates to
           the same conversation are never collapsed into one key.
        2. For User/Queue events: Use the event type plus the resource 'id'.
        3. Otherwise: Fall back to 'eventId'.
        """
        event_type = self.detail.eventType
        body = self.detail.body
        # Handle Conversation events specifically as they have high volume
        if "conversation" in event_type.lower():
            conv_id = body.get("conversationId")
            if conv_id:
                # Genesys Cloud eventId is globally unique for the event instance,
                # so legitimate rapid updates to one conversation get distinct keys.
                return f"gc-event-{self.detail.eventId}"
        # Fallback for other event types.
        # Caution: this key is shared by every event for the same resource, so a
        # second, distinct update arriving inside the TTL window is also skipped.
        resource_id = body.get("id")
        if resource_id:
            return f"gc-resource-{event_type}-{resource_id}"
        # Ultimate fallback to the global event ID
        return f"gc-event-{self.detail.eventId}"
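To see how this key strategy behaves on concrete input, here is a dependency-free sketch that mirrors get_dedup_key on plain dictionaries. The function name dedup_key and the sample event IDs are made up for illustration; the branching follows the method above.

```python
from typing import Any, Dict


def dedup_key(detail: Dict[str, Any]) -> str:
    """Plain-dict mirror of GenesysEvent.get_dedup_key (illustrative only)."""
    event_type = detail["eventType"]
    body = detail.get("body", {})
    if "conversation" in event_type.lower() and body.get("conversationId"):
        return f"gc-event-{detail['eventId']}"
    resource_id = body.get("id")
    if resource_id:
        return f"gc-resource-{event_type}-{resource_id}"
    return f"gc-event-{detail['eventId']}"


# Two different updates to the same conversation.
conv_a = {"eventId": "evt-1", "eventType": "conversation.update", "body": {"conversationId": "conv-98765"}}
conv_b = {"eventId": "evt-2", "eventType": "conversation.update", "body": {"conversationId": "conv-98765"}}
# Two different updates to the same user.
user_a = {"eventId": "evt-3", "eventType": "user.update", "body": {"id": "user-42"}}
user_b = {"eventId": "evt-4", "eventType": "user.update", "body": {"id": "user-42"}}

keys = [dedup_key(d) for d in (conv_a, conv_b, user_a, user_b)]
for key in keys:
    print(key)
```

The conversation events get distinct keys, while both user events map to gc-resource-user.update-user-42. If distinct updates to one resource must all be processed, keying every event type on eventId avoids that collapse.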
Step 3: Process Events with Deduplication Logic
This step combines the Redis store with the event parser. It simulates the downstream action (e.g., writing to a database).
import json
import requests
from typing import Dict, Any


class EventProcessor:
    def __init__(self, idempotency_store: IdempotencyStore, downstream_url: str):
        self.store = idempotency_store
        self.downstream_url = downstream_url

    def process_event(self, event_dict: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process a single Genesys Cloud event.
        Args:
            event_dict: The raw dictionary from EventBridge.
        Returns:
            A status dictionary indicating success, duplicate, or error.
        """
        try:
            # 1. Parse the event
            gc_event = GenesysEvent(**event_dict)
            # 2. Generate deduplication key
            dedup_key = gc_event.get_dedup_key()
            # 3. Check for duplicates (this also claims the key for new events)
            if self.store.is_duplicate(dedup_key):
                return {
                    "status": "duplicate_skipped",
                    "event_id": gc_event.detail.eventId,
                    "key": dedup_key
                }
            # 4. Perform downstream action. If it fails, remove the key so that
            #    a retried delivery is not mistaken for a duplicate and dropped.
            try:
                self._send_to_downstream(gc_event)
            except Exception:
                self.store.client.delete(dedup_key)
                raise
            return {
                "status": "processed",
                "event_id": gc_event.detail.eventId
            }
        except Exception as e:
            logger.error(f"Error processing event: {e}")
            # Note: In a real Lambda, you might send failed events to a Dead Letter Queue (DLQ)
            return {
                "status": "error",
                "message": str(e)
            }

    def _send_to_downstream(self, event: GenesysEvent) -> None:
        """
        Simulate sending data to a downstream system (e.g., Data Warehouse).
        """
        payload = {
            "eventType": event.detail.eventType,
            "body": event.detail.body,
            "timestamp": event.detail.timestamp
        }
        try:
            response = requests.post(
                self.downstream_url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=5
            )
            response.raise_for_status()
            logger.info(f"Successfully sent event {event.detail.eventId} to downstream.")
        except requests.exceptions.RequestException as e:
            logger.error(f"Downstream request failed: {e}")
            raise
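One design point deserves a closer look: the deduplication key is claimed before the downstream call runs. If the call fails and the key stays in place, a later retry of the same event is skipped as a duplicate and the event is lost. The sketch below illustrates the claim-then-release pattern with an in-memory stand-in; ClaimStore, process_once, and flaky_send are hypothetical names used only for this illustration.

```python
from typing import Callable, Set


class ClaimStore:
    """Hypothetical in-memory stand-in for the Redis-backed store."""

    def __init__(self) -> None:
        self._claimed: Set[str] = set()

    def claim(self, key: str) -> bool:
        """Return True if the key was newly claimed, False if already present."""
        if key in self._claimed:
            return False
        self._claimed.add(key)
        return True

    def release(self, key: str) -> None:
        self._claimed.discard(key)


def process_once(store: ClaimStore, key: str, send: Callable[[], None]) -> str:
    if not store.claim(key):
        return "duplicate_skipped"
    try:
        send()
    except Exception:
        # Undo the claim so a retried delivery gets another chance.
        store.release(key)
        return "error"
    return "processed"


attempts = {"count": 0}


def flaky_send() -> None:
    """Fails on the first call and succeeds afterwards."""
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise RuntimeError("downstream unavailable")


store = ClaimStore()
outcomes = [process_once(store, "gc-event-evt-unique-123", flaky_send) for _ in range(3)]
print(outcomes)  # ['error', 'processed', 'duplicate_skipped']
```

The trade-off is that releasing the key makes downstream delivery at-least-once: if a request reached the downstream system but its response timed out, the retry sends it again, so the downstream endpoint should tolerate repeats.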
Step 4: AWS Lambda Entry Point
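This code ties everything together. An EventBridge rule with a Lambda target invokes the function with a single event object; the handler also accepts a list under "Records" so that batched payloads can be replayed through the same code path.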
import json
import os

import boto3
from botocore.exceptions import ClientError

# Initialize services
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")
DOWNSTREAM_URL = os.environ.get("DOWNSTREAM_URL", "http://localhost:8080/webhook")

# Module-level instances are created once per cold start and reused by
# subsequent warm invocations of the same execution environment.
redis_store = IdempotencyStore(
    redis_host=REDIS_HOST,
    redis_port=REDIS_PORT,
    redis_password=REDIS_PASSWORD,
    ttl_seconds=600  # 10 minutes TTL for dedup window
)
processor = EventProcessor(
    idempotency_store=redis_store,
    downstream_url=DOWNSTREAM_URL
)


def lambda_handler(event, context):
    """
    AWS Lambda handler for EventBridge events.
    """
    # For an EventBridge Rule -> Lambda target, `event` is the EventBridge
    # event itself. A "Records" list is also accepted so that batched payloads
    # (for example in tests) run through the same loop.
    records = event.get("Records", [])
    if not records:
        # The event was passed directly
        records = [event]
    results = []
    failed_events = []
    for record in records:
        # Safety check: if the record is a string (JSON), parse it
        if isinstance(record, str):
            try:
                record_data = json.loads(record)
            except json.JSONDecodeError:
                logger.error(f"Invalid JSON in record: {record[:100]}...")
                continue
        else:
            record_data = record
        result = processor.process_event(record_data)
        results.append(result)
        if result["status"] == "error":
            failed_events.append(record_data)
    # Optional: Send failed events to DLQ using EventBridge PutEvents
    if failed_events:
        send_to_dlq(failed_events)
    return {
        "statusCode": 200,
        "body": json.dumps({
            "processed": len([r for r in results if r["status"] == "processed"]),
            "skipped": len([r for r in results if r["status"] == "duplicate_skipped"]),
            "errors": len([r for r in results if r["status"] == "error"])
        })
    }


def send_to_dlq(events: list) -> None:
    """
    Send failed events to a Dead Letter Queue Event Bus.
    """
    client = boto3.client('events')
    dlq_bus_name = os.environ.get("DLQ_EVENT_BUS", "genesys-dlq")
    entries = []
    for evt in events:
        entries.append({
            'Source': evt.get('source', 'unknown'),
            'DetailType': evt.get('detail', {}).get('eventType', 'unknown'),
            'Detail': json.dumps(evt),
            'EventBusName': dlq_bus_name
        })
    # PutEvents accepts at most 10 entries per call, so send in batches
    for start in range(0, len(entries), 10):
        batch = entries[start:start + 10]
        try:
            response = client.put_events(Entries=batch)
        except ClientError as e:
            logger.error(f"Failed to send to DLQ: {e}")
            continue
        # put_events can succeed as a call while rejecting individual entries
        failed = response.get("FailedEntryCount", 0)
        if failed:
            logger.error(f"{failed} of {len(batch)} DLQ entries were rejected.")
        else:
            logger.info(f"Sent {len(batch)} failed events to DLQ.")
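Because the invocation shape depends on how the function is wired up, it can help to isolate the unwrapping logic in one small function. The following is a sketch under stated assumptions: normalize_records is a hypothetical helper, and it assumes that queue-style records (such as those from an SQS trigger) carry the EventBridge event as a JSON string in a "body" field.

```python
import json
from typing import Any, Dict, List


def normalize_records(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return a list of EventBridge-shaped dicts from several invocation shapes."""
    raw_records = event.get("Records")
    if not raw_records:
        # Direct EventBridge -> Lambda invocation: the event is the payload.
        return [event]

    normalized: List[Dict[str, Any]] = []
    for record in raw_records:
        if isinstance(record, dict) and isinstance(record.get("body"), str):
            # Queue-style record: the EventBridge event is JSON inside 'body'.
            try:
                parsed = json.loads(record["body"])
            except json.JSONDecodeError:
                continue  # skipped here; a real handler should log or dead-letter it
            if isinstance(parsed, dict):
                normalized.append(parsed)
        elif isinstance(record, dict):
            # Already an event-shaped dict.
            normalized.append(record)
    return normalized


direct = {"id": "1", "detail": {"eventId": "evt-1"}}
queued = {"Records": [{"body": json.dumps(direct)}, {"body": "not json"}]}
print(normalize_records(direct) == [direct])  # True
print(normalize_records(queued) == [direct])  # True
```

Keeping this separate from lambda_handler makes the shape handling testable without Redis or network access.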
Complete Working Example
Combine the above modules into a single file app.py for local testing or deployment.
import os
import json
import logging
import redis
import requests
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field

# --- Configuration ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")
DOWNSTREAM_URL = os.environ.get("DOWNSTREAM_URL", "http://localhost:8080/webhook")


# --- Models ---
class GenesysEventDetail(BaseModel):
    eventId: str = Field(..., alias="eventId")
    eventType: str = Field(..., alias="eventType")
    timestamp: str
    body: Dict[str, Any] = {}

    class Config:
        populate_by_name = True


class GenesysEvent(BaseModel):
    id: str
    source: str
    account: str
    time: str
    region: str
    resources: List[str]
    detail: GenesysEventDetail

    def get_dedup_key(self) -> str:
        if "conversation" in self.detail.eventType.lower():
            return f"gc-event-{self.detail.eventId}"
        resource_id = self.detail.body.get("id")
        if resource_id:
            return f"gc-resource-{self.detail.eventType}-{resource_id}"
        return f"gc-event-{self.detail.eventId}"


# --- Services ---
class IdempotencyStore:
    def __init__(self, redis_host: str, redis_port: int, redis_password: Optional[str] = None, ttl_seconds: int = 300):
        self.client = redis.Redis(
            host=redis_host,
            port=redis_port,
            password=redis_password,
            decode_responses=True,
            socket_connect_timeout=2,
            socket_timeout=2
        )
        self.client.ping()
        self.ttl = ttl_seconds

    def is_duplicate(self, event_key: str) -> bool:
        # SET with nx=True returns None when the key already exists
        set_result = self.client.set(event_key, "1", nx=True, ex=self.ttl)
        return set_result is None


class EventProcessor:
    def __init__(self, idempotency_store: IdempotencyStore, downstream_url: str):
        self.store = idempotency_store
        self.downstream_url = downstream_url

    def process_event(self, event_dict: Dict[str, Any]) -> Dict[str, Any]:
        try:
            gc_event = GenesysEvent(**event_dict)
            dedup_key = gc_event.get_dedup_key()
            if self.store.is_duplicate(dedup_key):
                return {"status": "duplicate_skipped", "event_id": gc_event.detail.eventId}
            try:
                self._send_to_downstream(gc_event)
            except Exception:
                # Remove the key so a retried delivery is not skipped as a duplicate
                self.store.client.delete(dedup_key)
                raise
            return {"status": "processed", "event_id": gc_event.detail.eventId}
        except Exception as e:
            logger.error(f"Error processing event: {e}")
            return {"status": "error", "message": str(e)}

    def _send_to_downstream(self, event: GenesysEvent) -> None:
        payload = {
            "eventType": event.detail.eventType,
            "body": event.detail.body,
            "timestamp": event.detail.timestamp
        }
        response = requests.post(self.downstream_url, json=payload, timeout=5)
        response.raise_for_status()


# --- Lambda Handler ---
redis_store = IdempotencyStore(
    redis_host=REDIS_HOST,
    redis_port=REDIS_PORT,
    redis_password=REDIS_PASSWORD,
    ttl_seconds=600
)
processor = EventProcessor(
    idempotency_store=redis_store,
    downstream_url=DOWNSTREAM_URL
)


def lambda_handler(event, context):
    records = event.get("Records", [event])
    results = []
    for record in records:
        if isinstance(record, str):
            try:
                record_data = json.loads(record)
            except json.JSONDecodeError:
                continue
        else:
            record_data = record
        result = processor.process_event(record_data)
        results.append(result)
    return {
        "statusCode": 200,
        "body": json.dumps({
            "processed": len([r for r in results if r["status"] == "processed"]),
            "skipped": len([r for r in results if r["status"] == "duplicate_skipped"]),
            "errors": len([r for r in results if r["status"] == "error"])
        })
    }


# --- Local Testing ---
if __name__ == "__main__":
    # Simulate a Genesys Cloud Event
    test_event = {
        "id": "1234567890",
        "source": "genesys.cloud",
        "account": "123456789",
        "time": "2023-10-27T10:00:00Z",
        "region": "us-east-1",
        "resources": ["arn:aws:events:us-east-1:123456789:event-bus/genesys-bus"],
        "detail": {
            "eventId": "evt-unique-123",
            "eventType": "conversation.update",
            "timestamp": "2023-10-27T10:00:00Z",
            "body": {
                "conversationId": "conv-98765",
                "status": "connected"
            }
        }
    }
    # Mock downstream URL for local test
    processor.downstream_url = "http://httpbin.org/post"
    # Run twice to test deduplication
    print("First run:")
    print(processor.process_event(test_event))
    print("Second run (should be skipped):")
    print(processor.process_event(test_event))
Common Errors & Debugging
Error: Redis Connection Timeout
- Cause: The Lambda function is in a VPC, but the Redis instance is not in a subnet within that VPC, or security groups are blocking port 6379.
- Fix: Ensure the Lambda function has VPC configuration matching the Redis subnet. Update the Security Group attached to the Redis instance to allow inbound traffic from the Lambda’s Security Group on port 6379.
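Before adjusting Redis client settings, it can be useful to confirm whether the port is reachable at all from the function's network. The helper below is a minimal sketch using only the standard library; can_reach is a hypothetical name, and a True result only shows that a TCP connection opens, not that authentication or TLS will succeed.

```python
import os
import socket


def can_reach(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


if __name__ == "__main__":
    host = os.environ.get("REDIS_HOST", "localhost")
    port = int(os.environ.get("REDIS_PORT", 6379))
    print(f"Redis reachable at {host}:{port}: {can_reach(host, port)}")
```

A False result from inside the Lambda, combined with a True result from a host in the Redis subnet, points at VPC or security group configuration rather than at the Redis server.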
Error: pydantic.ValidationError
- Cause: The structure of the incoming event does not match the GenesysEvent model. Genesys Cloud may update event schemas.
- Fix: Log the raw event_dict before parsing. Update the GenesysEvent model to make fields optional with a default (Optional[str] = None) if they are not guaranteed to be present. Pydantic ignores unknown fields by default; in Pydantic v2, set model_config = ConfigDict(extra="allow") if you want to keep them on the model instead.
Error: Downstream 429 Too Many Requests
- Cause: The downstream system (e.g., Data Warehouse API) is rate-limiting you.
- Fix: Implement exponential backoff in the _send_to_downstream method. Do not retry immediately.
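import time

# Intended as a method of EventProcessor
def _send_to_downstream_with_retry(self, event: GenesysEvent, max_retries: int = 3) -> None:
    # Same payload as _send_to_downstream
    payload = {
        "eventType": event.detail.eventType,
        "body": event.detail.body,
        "timestamp": event.detail.timestamp
    }
    for attempt in range(max_retries):
        try:
            response = requests.post(self.downstream_url, json=payload, timeout=5)
            response.raise_for_status()
            return
        except requests.exceptions.HTTPError as e:
            # Read the status from the exception rather than a possibly stale local
            status = e.response.status_code if e.response is not None else None
            if status == 429 and attempt < max_retries - 1:
                wait_time = 2 ** attempt  # 1s, 2s, 4s, ...
                logger.warning(f"Rate limited. Retrying in {wait_time} seconds.")
                time.sleep(wait_time)
            else:
                raise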
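When many concurrent invocations are rate limited at once, fixed exponential delays can make them all retry at the same moment. A common refinement is to add random jitter. The sketch below shows the idea independently of requests: RateLimited and call_with_backoff are hypothetical names, and the sleep parameter is injectable so the behavior can be checked without waiting.

```python
import random
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical signal that the downstream answered with HTTP 429."""


def call_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on RateLimited with exponential backoff and full jitter."""
    for attempt in range(max_retries):
        try:
            return func()
        except RateLimited:
            if attempt == max_retries - 1:
                raise
            # Full jitter: a random delay between 0 and the exponential ceiling.
            ceiling = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, ceiling))
    raise ValueError("max_retries must be at least 1")


calls = {"n": 0}
slept: List[float] = []


def sometimes_limited() -> str:
    """Rate limited on the first two calls, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RateLimited()
    return "ok"


result = call_with_backoff(sometimes_limited, max_retries=3, sleep=slept.append)
print(result, len(slept))  # ok 2
```

In a Lambda function, the sum of the worst-case delays should stay well below the function timeout, otherwise the invocation is cut off mid-retry.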
Error: Duplicate Events Still Processing
- Cause: The eventId in the Genesys Cloud payload is changing for what should be the same logical update, or the Redis TTL is too short.
- Fix: Genesys Cloud eventId is unique per event instance. If you see duplicates, verify that Genesys Cloud is not retrying the delivery. If it is, the eventId should remain the same. If you are deduplicating based on conversationId + timestamp, ensure the timestamp granularity is sufficient. Relying on eventId is the most accurate method for EventBridge integrations.