Deduplicating Genesys Cloud EventBridge Events with Python
What You Will Build
- You will build a Python Lambda function that receives Genesys Cloud events from Amazon EventBridge, detects duplicates with a DynamoDB-backed deduplication store, and processes only unique events.
- This solution uses Amazon EventBridge, AWS Lambda, Amazon DynamoDB, and the Genesys Cloud REST API, which it calls to fetch current conversation state when necessary.
- The tutorial targets Python 3.10+ with boto3 and the requests library.
Prerequisites
- AWS Account: Access to EventBridge, Lambda, DynamoDB, and IAM.
- Genesys Cloud OAuth: A private app (Client Credentials OAuth client) with the analytics:conversations:view and routing:queues:view scopes.
- Python Runtime: Python 3.10 or later.
- Dependencies: boto3 (AWS SDK), requests (HTTP library), and pydantic (optional, for data validation).
- EventBridge Rule: A rule configured to send Genesys Cloud events to a target Lambda or SQS queue.
Authentication Setup
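To call the Genesys Cloud API, you must obtain an OAuth 2.0 access token. In a serverless environment, caching the token is critical. Without a cache, every invocation requests a new token, which adds latency and risks hitting rate limits.
The following code caches the token in a module-level variable with a TTL (time to live). The cache lives only as long as the Lambda execution environment. Warm invocations reuse it, but each cold start and each concurrently running environment fetches its own token. For production, store the client ID and secret in AWS Secrets Manager or Systems Manager Parameter Store instead of hard-coding them. If your use case allows, you can also increase the token duration configured on the OAuth client.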
import time
import requests
from typing import Optional, Dict, Any

# Configuration
GENESYS_CLOUD_REGION = "mypurecloud.com"  # e.g., usw2.pure.cloud
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

# Token Cache (module-level, so it survives across warm invocations)
_token_cache: Dict[str, Any] = {
    "token": None,
    "expires_at": 0.0
}


def get_genesys_access_token() -> str:
    """
    Retrieves a Genesys Cloud OAuth access token.
    Uses an in-memory cache so that sequential invocations reusing the same
    Lambda execution environment do not request a new token every time.
    """
    now = time.time()

    # Return cached token if valid
    if _token_cache["token"] and now < _token_cache["expires_at"]:
        return _token_cache["token"]

    # Fetch a new token: token requests go to the login host, not the api host
    url = f"https://login.{GENESYS_CLOUD_REGION}/oauth/token"
    payload = {"grant_type": "client_credentials"}

    try:
        # auth=(id, secret) sends the client credentials as an HTTP Basic header
        response = requests.post(url, data=payload, auth=(CLIENT_ID, CLIENT_SECRET), timeout=10)
        response.raise_for_status()
        data = response.json()

        # Cache the token and set expiration (subtract 60s for safety buffer)
        _token_cache["token"] = data["access_token"]
        _token_cache["expires_at"] = now + data["expires_in"] - 60
        return _token_cache["token"]
    except requests.exceptions.RequestException as e:
        # Log error and re-raise
        print(f"Failed to fetch OAuth token: {e}")
        raise
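You can verify the expiry-with-buffer logic without calling Genesys Cloud by injecting the token fetcher and the clock. The sketch below does that. The TokenCache class and its invalidate() method are hypothetical names, and the fetcher stands in for the POST to /oauth/token. It uses the same 60-second safety buffer as the function above.

```python
import time
from typing import Callable, Optional, Tuple

# A fetcher returns (access_token, expires_in_seconds); in the Lambda it would
# wrap the POST to the Genesys Cloud /oauth/token endpoint.
Fetcher = Callable[[], Tuple[str, int]]


class TokenCache:
    """Hypothetical in-memory token cache that refreshes shortly before expiry."""

    def __init__(self, fetch: Fetcher, clock: Callable[[], float] = time.time,
                 buffer_seconds: int = 60) -> None:
        self._fetch = fetch
        self._clock = clock
        self._buffer = buffer_seconds
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is not None and now < self._expires_at:
            return self._token
        token, expires_in = self._fetch()
        self._token = token
        # Treat the token as expired `buffer_seconds` early so it never lapses mid-request.
        self._expires_at = now + expires_in - self._buffer
        return token

    def invalidate(self) -> None:
        """Drop the cached token, e.g. after Genesys Cloud answers 401."""
        self._token = None
        self._expires_at = 0.0


if __name__ == "__main__":
    fake_now = [1000.0]
    calls = []

    def fake_fetch() -> Tuple[str, int]:
        calls.append(fake_now[0])
        return f"token-{len(calls)}", 3600

    cache = TokenCache(fake_fetch, clock=lambda: fake_now[0])
    print(cache.get())  # token-1 (fetched)
    fake_now[0] += 3000
    print(cache.get())  # token-1 (still before 3600 - 60 seconds)
    fake_now[0] += 600
    print(cache.get())  # token-2 (refreshed after the buffer)
```

Injecting the clock keeps the test deterministic. In the Lambda you would pass a fetcher built on requests and leave clock at its time.time default.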
Implementation
Step 1: Understanding EventBridge Event Structure and Duplicates
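Genesys Cloud sends events to EventBridge, and EventBridge delivery is at-least-once, so the same business event can reach your function more than once. Duplicates occur due to:
- EventBridge and Lambda retries: EventBridge invokes Lambda asynchronously. If your function returns an error, Lambda retries the invocation (by default up to two more times). EventBridge also retries deliveries that Lambda did not accept, for example because of throttling.
- Genesys Cloud Replays: During system maintenance or connectivity issues, Genesys Cloud may replay events.
- Idempotency Gaps: Duplicates only cause harm when your processing logic is not idempotent. In that case every retry repeats its side effects, such as double writes or duplicate alerts.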
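To see why idempotency matters, the following toy example counts events per conversation under a simulated redelivery. The event fields and counting logic are hypothetical. The in-memory set only illustrates the idea, because it does not survive across Lambda execution environments. That is why the rest of this tutorial keeps the "seen" record in DynamoDB.

```python
from typing import Dict, List, Set

# Simulated at-least-once delivery: the same business event arrives twice.
deliveries: List[Dict[str, str]] = [
    {"eventId": "conv-12345-abcde", "conversationId": "c-1"},
    {"eventId": "conv-12345-abcde", "conversationId": "c-1"},  # retry or replay
]


def count_naively(events: List[Dict[str, str]]) -> Dict[str, int]:
    """Non-idempotent: every delivery increments the counter."""
    counts: Dict[str, int] = {}
    for e in events:
        counts[e["conversationId"]] = counts.get(e["conversationId"], 0) + 1
    return counts


def count_idempotently(events: List[Dict[str, str]]) -> Dict[str, int]:
    """Idempotent: deliveries whose eventId was already seen are skipped."""
    seen: Set[str] = set()
    counts: Dict[str, int] = {}
    for e in events:
        if e["eventId"] in seen:
            continue
        seen.add(e["eventId"])
        counts[e["conversationId"]] = counts.get(e["conversationId"], 0) + 1
    return counts


print(count_naively(deliveries))       # {'c-1': 2}
print(count_idempotently(deliveries))  # {'c-1': 1}
```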
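When EventBridge invokes Lambda directly, the Genesys Cloud payload is in the event's detail field. If an SQS queue sits between EventBridge and Lambda, each SQS record's body holds the entire EventBridge event as a JSON string, so you must parse it before reading detail. The critical identifier is the eventId field within the detail object.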
Sample EventBridge Event Payload:
{
  "version": "0",
  "id": "a1b2c3d4-5678-90ab-cdef-1234567890ab",
  "detail-type": "Genesys Cloud Conversation Update",
  "source": "genesys.cloud",
  "account": "123456789012",
  "time": "2023-10-27T10:00:00Z",
  "region": "us-east-1",
  "resources": [],
  "detail": {
    "eventType": "conversation.update",
    "eventId": "conv-12345-abcde",
    "timestamp": "2023-10-27T10:00:00.000Z",
    "data": { ... }
  }
}
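The id field at the root level (a1b2c3d4-...) identifies that specific EventBridge dispatch. If Genesys Cloud replays the same business event, detail.eventId stays the same, but the root id may differ depending on the replay mechanism. For reliable deduplication, key on the business-level ID (detail.eventId), optionally combined with a fingerprint of the payload such as a hash. This tutorial hashes the entire detail object. If a replay changes any field inside detail (for example timestamp), the hash changes too and the replay is not detected. In that case, hash only the fields that define the business event.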
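The sketch below shows how to read the business key from either delivery path. It assumes that an EventBridge-to-SQS target forwards the full event unchanged as the message body, which is the case when no input transformer is configured. The helper names extract_detail and business_key are hypothetical. Two dispatches with different root ids but the same detail.eventId produce the same key.

```python
import json
from typing import Any, Dict


def extract_detail(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return the EventBridge `detail` object for a direct or SQS-wrapped record."""
    if "detail" in record:
        # Direct EventBridge -> Lambda: the invocation payload is the event itself.
        return record["detail"]
    if "body" in record:
        # EventBridge -> SQS -> Lambda: the body is the whole event as a JSON string.
        envelope = json.loads(record["body"])
        return envelope.get("detail", envelope)
    raise ValueError("record has neither 'detail' nor 'body'")


def business_key(record: Dict[str, Any]) -> str:
    """Deduplicate on detail.eventId, not on the per-dispatch root id."""
    return extract_detail(record)["eventId"]


direct = {
    "id": "a1b2c3d4-5678-90ab-cdef-1234567890ab",
    "detail": {"eventId": "conv-12345-abcde"},
}
via_sqs = {  # a replayed dispatch with a different root id, wrapped by SQS
    "messageId": "m-1",
    "body": json.dumps({
        "id": "0f0e0d0c-1111-2222-3333-444455556666",
        "detail": {"eventId": "conv-12345-abcde"},
    }),
}

print(business_key(direct), business_key(via_sqs))  # conv-12345-abcde conv-12345-abcde
```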
Step 2: Implementing Deduplication with DynamoDB
Use Amazon DynamoDB as a deduplication store. The first version below uses the eventId and a hash of the event payload as a composite primary key to ensure uniqueness.
DynamoDB Table Structure:
- Partition Key: event_id (String)
- Sort Key: payload_hash (String)
- TTL Attribute: expires_at (Number, Unix epoch seconds)
This setup allows you to store processed events and automatically expire them after a defined period (e.g., 24 hours) to save costs.
import time
import hashlib
import json
from datetime import datetime, timezone

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
DEDUP_TABLE_NAME = "GenesysEventDedupTable"


def get_dedup_table():
    return dynamodb.Table(DEDUP_TABLE_NAME)


def is_event_duplicate(event_id: str, payload_hash: str) -> bool:
    """
    Checks if an event has already been processed.
    Uses a conditional write to ensure atomicity.
    """
    table = get_dedup_table()
    try:
        # Attempt to put the item only if no item with this (event_id, payload_hash) key exists
        # If it exists, a ConditionalCheckFailedException is raised
        table.put_item(
            Item={
                'event_id': event_id,
                'payload_hash': payload_hash,
                'processed_at': datetime.now(timezone.utc).isoformat(),
                'expires_at': int(time.time()) + (24 * 3600)  # Expire in 24 hours
            },
            ConditionExpression="attribute_not_exists(event_id) AND attribute_not_exists(payload_hash)"
        )
        return False  # Not a duplicate, successfully stored
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return True  # Duplicate found
        else:
            raise
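Note on Condition Expression: DynamoDB evaluates a put_item condition against the existing item with the same full primary key (partition key plus sort key). The expression above therefore works, and attribute_not_exists(event_id) alone would be sufficient. A global secondary index would not help here, because GSIs do not enforce uniqueness. For simplicity, the rest of this tutorial uses a single partition key, dedup_key, which concatenates the eventId and a truncated payload hash.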
Revised DynamoDB Logic:
import hashlib
import json
import time
from datetime import datetime, timezone
from typing import Any, Dict

from botocore.exceptions import ClientError


def create_dedup_key(event_id: str, payload: Dict[str, Any]) -> str:
    """Creates a unique key for deduplication."""
    # Sort keys to ensure consistent hashing regardless of JSON key order
    canonical_json = json.dumps(payload, sort_keys=True)
    hash_part = hashlib.sha256(canonical_json.encode('utf-8')).hexdigest()[:16]
    return f"{event_id}#{hash_part}"


def check_and_store_event(dedup_key: str) -> bool:
    """
    Returns True if the event is a duplicate, False if it is new.
    A new key is stored as a side effect of the check.
    """
    table = get_dedup_table()
    try:
        table.put_item(
            Item={
                'dedup_key': dedup_key,
                'processed_at': datetime.now(timezone.utc).isoformat(),
                'expires_at': int(time.time()) + (24 * 3600)
            },
            ConditionExpression="attribute_not_exists(dedup_key)"
        )
        return False
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return True
        raise
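The revised logic expects a table keyed only on dedup_key, with TTL enabled on expires_at. One way to create it from Python is sketched below. The helper names are hypothetical, and on-demand billing (PAY_PER_REQUEST) is an assumption. You can substitute provisioned capacity, or create the table with the console, CloudFormation, or Terraform instead. The parameter-building functions run without AWS credentials; only create_dedup_table() calls AWS.

```python
from typing import Any, Dict

TABLE_NAME = "GenesysEventDedupTable"


def dedup_table_definition(table_name: str = TABLE_NAME) -> Dict[str, Any]:
    """Keyword arguments for DynamoDB CreateTable: one String partition key."""
    return {
        "TableName": table_name,
        "KeySchema": [{"AttributeName": "dedup_key", "KeyType": "HASH"}],
        "AttributeDefinitions": [{"AttributeName": "dedup_key", "AttributeType": "S"}],
        "BillingMode": "PAY_PER_REQUEST",  # assumption: on-demand capacity
    }


def ttl_specification(table_name: str = TABLE_NAME) -> Dict[str, Any]:
    """Keyword arguments for UpdateTimeToLive on the expires_at attribute."""
    return {
        "TableName": table_name,
        "TimeToLiveSpecification": {"Enabled": True, "AttributeName": "expires_at"},
    }


def create_dedup_table() -> None:
    """Creates the table, waits until it exists, then enables TTL (requires AWS credentials)."""
    import boto3  # imported here so the helpers above work without boto3 configured

    client = boto3.client("dynamodb")
    client.create_table(**dedup_table_definition())
    client.get_waiter("table_exists").wait(TableName=TABLE_NAME)
    client.update_time_to_live(**ttl_specification())
```

Keeping the parameters in plain dictionaries makes them easy to review and test separately from the AWS calls.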
Step 3: Processing the Event and Handling Edge Cases
Now, combine authentication, deduplication, and business logic. This step shows how to handle the conversation.update event type.
import json
import logging
from datetime import datetime
from typing import Any, Dict

import requests

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def process_unique_event(detail: Dict[str, Any]) -> None:
    """
    Implements the business logic for a unique event.
    """
    event_type = detail.get("eventType")
    event_id = detail.get("eventId")

    if event_type == "conversation.update":
        # Example: Fetch full conversation details from Genesys Cloud
        token = get_genesys_access_token()
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

        # Extract conversation ID from detail
        conversation_id = detail.get("data", {}).get("id")
        if not conversation_id:
            logger.warning(f"Missing conversation ID in event: {event_id}")
            return

        # Fetch conversation details
        url = f"https://api.{GENESYS_CLOUD_REGION}/api/v2/conversations/{conversation_id}"
        response = requests.get(url, headers=headers, timeout=10)

        if response.status_code == 200:
            conv_data = response.json()
            # Process conv_data here (e.g., send to data warehouse, trigger alert)
            logger.info(f"Processed conversation {conversation_id} successfully.")
        else:
            logger.error(f"Failed to fetch conversation {conversation_id}: {response.status_code}")
            raise Exception(f"API Error: {response.status_code}")

    elif event_type == "queue.member.wrapup":
        # Handle queue member wrapup
        logger.info(f"Processed queue member wrapup for event: {event_id}")

    else:
        logger.warning(f"Unhandled event type: {event_type}")


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    AWS Lambda handler for EventBridge events.
    """
    # Direct EventBridge -> Lambda delivers one event per invocation;
    # SQS -> Lambda delivers a batch of records under "Records".
    records = event.get("Records", [event])
    processed_count = 0
    skipped_count = 0

    for record in records:
        try:
            # Extract detail from a direct EventBridge event or an SQS record
            if "detail" in record:
                detail = record["detail"]
            elif "body" in record:
                # The SQS body is the full EventBridge event serialized as a JSON string
                detail = json.loads(record["body"]).get("detail", {})
            else:
                detail = record

            event_id = detail.get("eventId")
            if not event_id:
                logger.warning("Event missing 'eventId', skipping.")
                continue

            # Create deduplication key
            dedup_key = create_dedup_key(event_id, detail)

            # Check for duplicates (a new key is stored by this call)
            if check_and_store_event(dedup_key):
                logger.info(f"Duplicate event detected and skipped: {event_id}")
                skipped_count += 1
                continue

            # Process the event; on failure, release the key so the retry is not skipped
            try:
                process_unique_event(detail)
            except Exception:
                get_dedup_table().delete_item(Key={"dedup_key": dedup_key})
                raise
            processed_count += 1

        except Exception as e:
            logger.error(f"Error processing event: {e}", exc_info=True)
            # Re-raise to trigger Lambda retry if necessary
            raise

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": f"Processed {processed_count}, Skipped {skipped_count} duplicates."
        })
    }
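The deduplication key is written before the business logic runs. A failure during processing must therefore remove that key. Otherwise the retried delivery is treated as a duplicate and silently dropped. The sketch below shows the claim, process, and release-on-failure ordering, using a hypothetical in-memory store as a stand-in for the DynamoDB table. One limitation remains: if the function crashes or times out between claim and release, the key stays until its TTL expires. A production design might store a status and timestamp instead of a bare key.

```python
from typing import Callable, Set


class InMemoryDedupStore:
    """Hypothetical stand-in for the DynamoDB table: put-if-absent plus delete."""

    def __init__(self) -> None:
        self._keys: Set[str] = set()

    def claim(self, key: str) -> bool:
        """True if the key was newly stored, like a successful conditional put."""
        if key in self._keys:
            return False
        self._keys.add(key)
        return True

    def release(self, key: str) -> None:
        self._keys.discard(key)


def handle_once(store: InMemoryDedupStore, key: str,
                process: Callable[[], None]) -> str:
    if not store.claim(key):
        return "skipped"
    try:
        process()
    except Exception:
        # Undo the claim so the retried delivery is processed, not skipped.
        store.release(key)
        raise
    return "processed"


store = InMemoryDedupStore()
attempts = {"n": 0}


def flaky() -> None:
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise RuntimeError("simulated Genesys Cloud API failure")


try:
    handle_once(store, "conv-12345-abcde#abc", flaky)
except RuntimeError:
    pass  # in Lambda, re-raising here is what triggers the retry
print(handle_once(store, "conv-12345-abcde#abc", flaky))  # processed
print(handle_once(store, "conv-12345-abcde#abc", flaky))  # skipped
```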
Complete Working Example
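Below is the full, copy-pasteable Lambda function. Create the GenesysEventDedupTable DynamoDB table with dedup_key (String) as the partition key and TTL enabled on expires_at. Grant the function's execution role permission to put and delete items in that table (dynamodb:PutItem, dynamodb:DeleteItem). The requests library is not included in the Lambda Python runtime, so package it with your deployment or in a layer.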
import json
import time
import hashlib
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Optional

import boto3
import requests
from botocore.exceptions import ClientError

# Configure Logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Configuration
GENESYS_CLOUD_REGION = "mypurecloud.com"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
DEDUP_TABLE_NAME = "GenesysEventDedupTable"

# Global Resources (created once per execution environment)
dynamodb = boto3.resource('dynamodb')
_token_cache: Dict[str, Any] = {
    "token": None,
    "expires_at": 0.0
}


def get_genesys_access_token() -> str:
    now = time.time()
    if _token_cache["token"] and now < _token_cache["expires_at"]:
        return _token_cache["token"]

    # Token requests go to the login host; credentials are sent via HTTP Basic auth
    url = f"https://login.{GENESYS_CLOUD_REGION}/oauth/token"
    payload = {"grant_type": "client_credentials"}
    try:
        response = requests.post(url, data=payload, auth=(CLIENT_ID, CLIENT_SECRET), timeout=10)
        response.raise_for_status()
        data = response.json()
        _token_cache["token"] = data["access_token"]
        _token_cache["expires_at"] = now + data["expires_in"] - 60
        return _token_cache["token"]
    except requests.exceptions.RequestException as e:
        logger.error(f"OAuth Error: {e}")
        raise


def create_dedup_key(event_id: str, payload: Dict[str, Any]) -> str:
    # Sorted keys give the same hash regardless of JSON key order
    canonical_json = json.dumps(payload, sort_keys=True)
    hash_part = hashlib.sha256(canonical_json.encode('utf-8')).hexdigest()[:16]
    return f"{event_id}#{hash_part}"


def check_and_store_event(dedup_key: str) -> bool:
    """Returns True if dedup_key already exists; otherwise stores it and returns False."""
    table = dynamodb.Table(DEDUP_TABLE_NAME)
    try:
        table.put_item(
            Item={
                'dedup_key': dedup_key,
                'processed_at': datetime.now(timezone.utc).isoformat(),
                'expires_at': int(time.time()) + (24 * 3600)
            },
            ConditionExpression="attribute_not_exists(dedup_key)"
        )
        return False
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return True
        raise


def release_event(dedup_key: str) -> None:
    """Deletes a stored key so that a failed event is processed again on retry."""
    dynamodb.Table(DEDUP_TABLE_NAME).delete_item(Key={'dedup_key': dedup_key})


def process_unique_event(detail: Dict[str, Any]) -> None:
    event_type = detail.get("eventType")
    event_id = detail.get("eventId")

    if event_type == "conversation.update":
        token = get_genesys_access_token()
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }
        conversation_id = detail.get("data", {}).get("id")
        if not conversation_id:
            logger.warning(f"Missing conversation ID in event: {event_id}")
            return

        url = f"https://api.{GENESYS_CLOUD_REGION}/api/v2/conversations/{conversation_id}"
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code == 200:
            logger.info(f"Processed conversation {conversation_id}")
        else:
            logger.error(f"Failed to fetch conversation {conversation_id}: {response.status_code}")
            raise Exception(f"API Error: {response.status_code}")
    else:
        logger.info(f"Processed event type: {event_type}")


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    records = event.get("Records", [event])
    processed_count = 0
    skipped_count = 0

    for record in records:
        try:
            if "detail" in record:
                detail = record["detail"]
            elif "body" in record:
                # SQS record: the body is the EventBridge event as a JSON string
                detail = json.loads(record["body"]).get("detail", {})
            else:
                detail = record

            event_id = detail.get("eventId")
            if not event_id:
                logger.warning("Event missing 'eventId', skipping.")
                continue

            dedup_key = create_dedup_key(event_id, detail)
            if check_and_store_event(dedup_key):
                logger.info(f"Duplicate skipped: {event_id}")
                skipped_count += 1
                continue

            try:
                process_unique_event(detail)
            except Exception:
                release_event(dedup_key)
                raise
            processed_count += 1
        except Exception as e:
            logger.error(f"Error processing event: {e}", exc_info=True)
            raise

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": f"Processed {processed_count}, Skipped {skipped_count} duplicates."
        })
    }
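When events arrive through SQS, raising on one record makes Lambda retry the whole batch. The deduplication store absorbs the records that already succeeded, but each of them still costs a write. An alternative is sketched below, assuming you enable ReportBatchItemFailures on the SQS event source mapping. With that setting, the handler returns the IDs of the failed messages and only those are retried. Without it, Lambda ignores this return value and treats the whole batch as successful, so the failed messages are lost. handle_sqs_batch is a hypothetical name. handle_detail stands for your per-event logic, meaning the dedup check, processing, and release on failure.

```python
import json
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


def handle_sqs_batch(event: Dict[str, Any],
                     handle_detail: Callable[[Dict[str, Any]], None]) -> Dict[str, Any]:
    """Process each SQS record independently and report only the failed message IDs."""
    failures: List[Dict[str, str]] = []
    for record in event.get("Records", []):
        try:
            envelope = json.loads(record["body"])  # full EventBridge event
            handle_detail(envelope["detail"])
        except Exception:
            logger.exception("Failed to process message %s", record.get("messageId"))
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


def fake_handler(detail: Dict[str, Any]) -> None:
    if detail["eventId"] == "bad":
        raise ValueError("simulated processing error")


batch = {"Records": [
    {"messageId": "m-1", "body": json.dumps({"detail": {"eventId": "ok"}})},
    {"messageId": "m-2", "body": json.dumps({"detail": {"eventId": "bad"}})},
]}
print(handle_sqs_batch(batch, fake_handler))
# {'batchItemFailures': [{'itemIdentifier': 'm-2'}]}
```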
Common Errors & Debugging
Error: ConditionalCheckFailedException
- What causes it: The DynamoDB item already exists. This is expected behavior for deduplication.
- How to fix it: Ensure your code catches this exception and treats it as a successful deduplication (skip processing). Do not retry the business logic.
Error: 401 Unauthorized from Genesys Cloud
- What causes it: The OAuth token is expired or invalid.
- How to fix it: Verify your CLIENT_ID and CLIENT_SECRET. Ensure the private app has the correct scopes (analytics:conversations:view). Check that the token caching logic is not returning a stale token; if a cached token is rejected, clear the cache and request a new one.
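A cached token can be rejected before its computed expiry, for example if it was revoked. The following minimal sketch retries once with a fresh token. It assumes three hypothetical callables: send(token) performs the Genesys Cloud request and returns the HTTP status code, get_token returns a possibly cached token, and invalidate clears the cache. With this tutorial's cache, invalidate could be lambda: _token_cache.update(token=None, expires_at=0.0). A second 401 points to the credentials or permissions rather than the cache.

```python
from typing import Callable, Dict


def call_with_token_refresh(send: Callable[[str], int],
                            get_token: Callable[[], str],
                            invalidate: Callable[[], None]) -> int:
    """Call send(token); on 401, drop the cached token and retry exactly once."""
    status = send(get_token())
    if status == 401:
        invalidate()
        status = send(get_token())
    return status


cache: Dict[str, str] = {}
issued = iter(["stale-token", "fresh-token"])


def get_token() -> str:
    if "token" not in cache:
        cache["token"] = next(issued)
    return cache["token"]


def send(token: str) -> int:
    # Pretend Genesys Cloud has revoked the first token.
    return 200 if token == "fresh-token" else 401


print(call_with_token_refresh(send, get_token, cache.clear))  # 200
```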
Error: Event Missing ‘eventId’
- What causes it: The event structure does not match expectations.
- How to fix it: Log the raw event payload. Ensure you are extracting detail correctly from the EventBridge record. If you use SQS as an intermediary, the event structure changes: each record's body is the EventBridge event as a JSON string, so parse it with json.loads and then read its detail field.
Error: High DynamoDB Write Costs
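- What causes it: A high volume of events. Every event costs one conditional write, and writes rejected by the condition still consume write capacity.
- How to fix it: For most CX use cases, a single conditional write per event is acceptable. Use on-demand or right-sized provisioned capacity, keep items small, and set the TTL only as long as your retry and replay window requires; TTL deletions do not consume write capacity. Note that BatchWriteItem does not support condition expressions, so it cannot replace the conditional put_item used for deduplication.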