Processing high-volume interaction events from EventBridge without hitting Lambda concurrency limits
What You Will Build
- You will build a Python Lambda function that processes Genesys Cloud interaction events from EventBridge in batches, using DynamoDB for state tracking and SQS for dead-letter queuing, designed to avoid concurrency throttling during traffic spikes.
- This tutorial uses the AWS SDK for Python (Boto3) with the AWS Lambda, Amazon EventBridge, Amazon SQS, and Amazon DynamoDB APIs.
- The code is written in Python 3.9+ and uses the `requests` library for Genesys Cloud API calls and `boto3` for AWS service interactions.
Prerequisites
- AWS Account: With permissions to create/modify Lambda, EventBridge, SQS, and DynamoDB resources.
- Genesys Cloud account: An OAuth client configured for the Client Credentials grant, assigned a role with the permissions needed to read conversation data.
- SDK version: AWS SDK for Python (Boto3) version 1.26.0 or higher.
- Language/Runtime: Python 3.9 or higher.
- External dependencies: `requests`, `python-dotenv`, `boto3`.
Authentication Setup
Genesys Cloud uses OAuth 2.0. In Lambda, cache the access token instead of requesting a new one from the Genesys Cloud auth server for every event: a token request per event adds latency and increases the chance of hitting rate limits on the auth endpoint itself.
We cache the token in a module-level variable. Lambda reuses an execution environment for subsequent invocations, and module-level state survives between them, so a warm environment requests a new token only when the cached one is close to expiry.
```python
import os
import time
import json
from typing import Any, Dict

import boto3
import requests

# Genesys Cloud configuration
# GENESYS_REGION is the environment domain (e.g. mypurecloud.ie). Genesys Cloud
# serves OAuth from login.<domain> and the Platform API from api.<domain>.
GENESYS_REGION = os.environ.get('GENESYS_REGION', 'mypurecloud.ie')
CLIENT_ID = os.environ.get('GENESYS_CLIENT_ID')
CLIENT_SECRET = os.environ.get('GENESYS_CLIENT_SECRET')
AUTH_URL = f"https://login.{GENESYS_REGION}/oauth/token"

# AWS configuration
DYNAMODB_TABLE = os.environ.get('STATE_TABLE_NAME')
SQS_DLQ_URL = os.environ.get('SQS_DLQ_URL')

# Module-level token cache (persists across invocations in the same execution environment)
_token_cache: Dict[str, Any] = {
    "access_token": None,
    "expires_at": 0.0,
}


def get_genesis_access_token() -> str:
    """
    Returns a valid Genesys Cloud access token.
    Uses the cached token while it is valid; otherwise requests a new one.
    """
    current_time = time.time()

    # Cache hit: no network call
    if _token_cache["access_token"] and current_time < _token_cache["expires_at"]:
        return _token_cache["access_token"]

    # Token expired or missing: request a new one
    try:
        # Client credentials grant: client ID and secret go in HTTP Basic auth;
        # requests form-encodes the body and sets the Content-Type header
        response = requests.post(
            AUTH_URL,
            data={"grant_type": "client_credentials"},
            auth=(CLIENT_ID, CLIENT_SECRET),
            timeout=10,
        )
        response.raise_for_status()
        token_data = response.json()

        # Update the cache; client credentials tokens have no refresh token.
        # Expire 5 minutes before the real expiry to be safe.
        _token_cache["access_token"] = token_data["access_token"]
        _token_cache["expires_at"] = current_time + (token_data["expires_in"] - 300)
        return _token_cache["access_token"]
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Failed to obtain Genesys Cloud token: {e}") from e
```
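To see the expiry logic in isolation, here is a minimal sketch that separates the cache from the HTTP call. `TokenCache`, `fetch`, and `clock` are hypothetical names introduced for illustration: `fetch` stands in for the token `requests.post` call and is assumed to return a dict with `access_token` and `expires_in`, as the token response above does, and `clock` is injectable so the refresh point can be checked without waiting.

```python
import time
from typing import Any, Callable, Dict, List, Optional


class TokenCache:
    """Hypothetical helper: caches a token until `margin` seconds before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Dict[str, Any]],
        clock: Callable[[], float] = time.time,
        margin: float = 300.0,
    ) -> None:
        self._fetch = fetch      # returns {"access_token": ..., "expires_in": seconds}
        self._clock = clock      # injectable time source
        self._margin = margin    # refresh this many seconds early
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is not None and now < self._expires_at:
            return self._token  # cache hit: no network call
        data = self._fetch()
        self._token = data["access_token"]
        self._expires_at = now + data["expires_in"] - self._margin
        return self._token


# Usage with a fake fetcher and a controllable clock
calls: List[int] = []
fake_now = [1000.0]


def fake_fetch() -> Dict[str, Any]:
    calls.append(1)
    return {"access_token": f"token-{len(calls)}", "expires_in": 3600}


cache = TokenCache(fake_fetch, clock=lambda: fake_now[0])
first = cache.get()    # fetches token-1, cached until 1000 + 3600 - 300 = 4300
fake_now[0] = 4000.0
second = cache.get()   # still token-1, no fetch
fake_now[0] = 4300.0
third = cache.get()    # refresh point reached: fetches token-2
```

Only two fetches happen across three calls, which is the saving the module-level cache provides in a warm Lambda environment.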
Implementation
Step 1: Configuring Lambda Concurrency and Batching
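Before writing the Lambda code, you must understand the infrastructure constraints. The concurrency a function needs is its invocation rate multiplied by its invocation duration. If 100 events per second arrive and each event is processed in its own invocation, a reserved concurrency of 10 is exceeded as soon as invocations take longer than 100 ms (100 events/sec × 0.1 s = 10).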
Critical Configuration:
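- Batching: An EventBridge rule that targets Lambda directly invokes the function asynchronously with one event per invocation; the rule target has no batch-size setting. To batch, point the rule at an Amazon SQS queue and attach the queue to the function with a Lambda event source mapping. Set `BatchSize` (up to 10,000 for a standard queue) and `MaximumBatchingWindowInSeconds` (for example, 5 seconds; a `BatchSize` above 10 requires a window of at least 1 second). The invocation payload is still capped at 6 MB, so large events produce smaller batches.
- Lambda concurrency: Set Reserved Concurrency on the Lambda function rather than relying on the account-level limit. Required concurrency is roughly (events per second ÷ batch size) × batch duration in seconds. For example, 1,000 events/sec delivered in batches of 40, each batch taking 200 ms, needs about 25 × 0.2 = 5 concurrent executions.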
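The concurrency estimate is Little's Law applied to Lambda: concurrent executions roughly equal invocations per second multiplied by seconds per invocation. A small helper makes the arithmetic explicit. `required_concurrency` is a hypothetical name, and the batch duration is an input you have to measure: a batch that calls the Genesys Cloud API once per event takes longer as the batch grows.

```python
import math


def required_concurrency(events_per_second: float, batch_size: int, batch_seconds: float) -> int:
    """Estimate concurrent executions as (invocations per second) x (seconds per invocation)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    invocations_per_second = events_per_second / batch_size
    return math.ceil(invocations_per_second * batch_seconds)


# 1,000 events/sec in batches of 40, each batch taking 200 ms
print(required_concurrency(1000, 40, 0.2))  # 5
# The same traffic processed one event per invocation at 200 ms each
print(required_concurrency(1000, 1, 0.2))   # 200
```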
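Why this matters: Without batching, every event is a separate invocation, so the per-invocation overhead (plus a cold start whenever Lambda scales out to a new execution environment) is paid once per event. Batching amortizes this cost across many events and reduces the number of concurrent executions needed.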
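A sketch of the batching and concurrency settings, assuming an SQS queue sits between the EventBridge rule and the function (EventBridge rule targets invoke Lambda one event at a time, so batching is configured on the event source mapping instead). `batching_settings`, `configure_batching`, and the default values are hypothetical; the keyword arguments are those of the Boto3 Lambda client's `create_event_source_mapping` and `put_function_concurrency`. The client is passed in, for example `boto3.client('lambda')`, so the settings can be reviewed or stubbed without AWS credentials.

```python
from typing import Any, Dict


def batching_settings(queue_arn: str, function_name: str, batch_size: int = 100,
                      window_seconds: int = 5, max_concurrency: int = 5) -> Dict[str, Any]:
    """Build kwargs for create_event_source_mapping with an SQS source."""
    if batch_size > 10 and window_seconds < 1:
        raise ValueError("BatchSize above 10 requires MaximumBatchingWindowInSeconds >= 1")
    if max_concurrency < 2:
        raise ValueError("SQS maximum concurrency must be at least 2")
    return {
        "EventSourceArn": queue_arn,
        "FunctionName": function_name,
        "BatchSize": batch_size,                           # up to 10,000 for standard queues
        "MaximumBatchingWindowInSeconds": window_seconds,  # wait up to N seconds to fill a batch
        # Caps how many concurrent invocations this mapping can drive
        "ScalingConfig": {"MaximumConcurrency": max_concurrency},
    }


def configure_batching(lambda_client: Any, queue_arn: str, function_name: str,
                       reserved_concurrency: int = 10) -> Dict[str, Any]:
    """Create the mapping, then reserve concurrency for the function."""
    settings = batching_settings(queue_arn, function_name)
    mapping = lambda_client.create_event_source_mapping(**settings)
    # Keep reserved concurrency at or above the mapping's maximum concurrency
    lambda_client.put_function_concurrency(
        FunctionName=function_name,
        ReservedConcurrentExecutions=reserved_concurrency,
    )
    return mapping
```

Capping the mapping's maximum concurrency below the reserved concurrency lets the queue absorb spikes instead of the function being throttled.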
Step 2: Implementing the Batch Processor with Idempotency
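Each EventBridge event carries its payload in the `detail` field. EventBridge, Lambda's asynchronous retries, and SQS all deliver at least once, so the same event can arrive more than once (for example, after a transient error). To avoid processing it twice, we record processed interaction IDs in DynamoDB with a conditional write. If a single interaction can produce several distinct events that all need processing, key the table on the EventBridge envelope's top-level `id` instead of the interaction ID.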
```python
import json
import os
from datetime import datetime, timezone
from typing import Any, Dict

import boto3
import requests
from botocore.exceptions import ClientError

# Create AWS clients once per execution environment; warm invocations reuse them
_dynamodb_table = boto3.resource('dynamodb').Table(DYNAMODB_TABLE)
_sqs = boto3.client('sqs')


def check_and_mark_processed(interaction_id: str) -> bool:
    """
    Atomically claims an interaction ID in DynamoDB.
    Returns False if this call created the marker (not processed yet).
    Returns True if the marker already existed (processed, or claimed by
    another concurrent invocation).
    A single conditional write replaces a separate read-then-write, which
    would leave a race window between the two calls.
    """
    try:
        _dynamodb_table.put_item(
            Item={
                'interactionId': interaction_id,
                'processedAt': datetime.now(timezone.utc).isoformat(),
                'processedBy': os.environ.get('AWS_LAMBDA_FUNCTION_NAME', 'unknown'),
            },
            ConditionExpression='attribute_not_exists(interactionId)',
        )
        return False  # Newly claimed
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return True  # Already processed or in progress elsewhere
        raise


def release_claim(interaction_id: str) -> None:
    """Deletes the marker so a later retry (for example, a DLQ replay) is not skipped."""
    _dynamodb_table.delete_item(Key={'interactionId': interaction_id})


def process_single_event(event_detail: Dict[str, Any]) -> bool:
    """
    Processes a single Genesys Cloud interaction event.
    Returns True if processed successfully (or already processed), False otherwise.
    """
    # Assumes the interaction ID is a top-level 'id' field in the event detail;
    # adjust this lookup to match the payload of the topic you subscribe to
    interaction_id = event_detail.get('id')
    if not interaction_id:
        print("Warning: Event missing interaction ID")
        return False

    if check_and_mark_processed(interaction_id):
        print(f"Interaction {interaction_id} already processed. Skipping.")
        return True  # Counts as success: the work is already done

    try:
        token = get_genesis_access_token()
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        # Genesys Cloud exposes interactions as conversations; the Platform API
        # is served from api.<region>, e.g. api.mypurecloud.ie
        url = f"https://api.{GENESYS_REGION}/api/v2/conversations/{interaction_id}"
        response = requests.get(url, headers=headers, timeout=5)
        if response.status_code == 429:
            raise RuntimeError(f"Genesys Cloud rate limited (429) for interaction {interaction_id}")
        response.raise_for_status()

        interaction_data = response.json()
        participants = interaction_data.get('participants', [])
        print(f"Processed interaction {interaction_id}: participants={len(participants)}")
        return True
    except Exception as e:
        print(f"Error processing interaction {interaction_id}: {e}")
        # Without this, the marker written above would make every retry skip the event
        release_claim(interaction_id)
        return False


def send_to_dlq(failed_event: Any, error_message: str) -> None:
    """Sends a failed event to the SQS dead-letter queue for later retry or analysis."""
    try:
        _sqs.send_message(
            QueueUrl=SQS_DLQ_URL,
            MessageBody=json.dumps({
                "originalEvent": failed_event,
                "errorMessage": error_message,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }, default=str),
        )
    except Exception as e:
        print(f"Critical: Failed to send to DLQ: {e}")
```
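The ordering of claim, work, and release is what makes retries safe: if the marker is written before the work and never removed, a failed event is skipped on every retry. A minimal in-memory sketch of the pattern, where `InMemoryClaims` and `process_once` are hypothetical stand-ins for the DynamoDB table and the processing function; `claim` mirrors a `put_item` with `attribute_not_exists`, made atomic here with a lock.

```python
import threading
from typing import Callable, Set


class InMemoryClaims:
    """Hypothetical stand-in for the DynamoDB marker table."""

    def __init__(self) -> None:
        self._keys: Set[str] = set()
        self._lock = threading.Lock()

    def claim(self, key: str) -> bool:
        """Atomically add the key; False if it exists (like a failed conditional put)."""
        with self._lock:
            if key in self._keys:
                return False
            self._keys.add(key)
            return True

    def release(self, key: str) -> None:
        with self._lock:
            self._keys.discard(key)


def process_once(key: str, claims: InMemoryClaims, work: Callable[[], None]) -> str:
    if not claims.claim(key):
        return "skipped"       # duplicate delivery
    try:
        work()
        return "processed"
    except Exception:
        claims.release(key)    # let a later retry run the work again
        return "failed"


def flaky() -> None:
    raise RuntimeError("transient error")


claims = InMemoryClaims()
print(process_once("conv-1", claims, flaky))          # failed
print(process_once("conv-1", claims, lambda: None))   # processed (the retry is not skipped)
print(process_once("conv-1", claims, lambda: None))   # skipped
```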
Step 3: Writing the Lambda Handler for Batch Processing
The handler receives either a single EventBridge event (when the rule targets the function directly) or a batch of SQS records whose bodies are EventBridge events (when an SQS queue sits in between). It processes each event and tracks failures. A failed event does not fail the whole invocation: the handler logs it and sends it to the DLQ. Raising an error instead would cause Lambda to retry the entire invocation (for SQS, to return the entire batch to the queue), redelivering every event in it because of one bad record.
```python
import json
import logging
from typing import Any, Dict, List

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Main Lambda handler.
    Accepts a single EventBridge event or an SQS batch whose record bodies are
    EventBridge events, and processes each Genesys Cloud interaction event.
    """
    # Normalize both invocation shapes to a list of raw items
    if 'Records' in event:
        # SQS event source mapping: each record body is an EventBridge event as a JSON string
        raw_items: List[Any] = [record.get('body', '{}') for record in event['Records']]
    else:
        # Direct EventBridge rule target: one event per invocation
        raw_items = [event]

    if not raw_items:
        logger.info("No events to process")
        return {'statusCode': 200, 'body': json.dumps('No events')}

    logger.info(f"Processing batch of {len(raw_items)} events")
    failed_events = []
    processed_count = 0

    for i, raw_item in enumerate(raw_items):
        try:
            raw_event = json.loads(raw_item) if isinstance(raw_item, str) else raw_item
            # The Genesys Cloud payload is in the EventBridge envelope's 'detail' field
            interaction_detail = raw_event.get('detail', raw_event)
            if process_single_event(interaction_detail):
                processed_count += 1
            else:
                failed_events.append({"event": raw_event, "error": "Processing failed"})
        except Exception as e:
            logger.error(f"Unhandled exception processing event {i}: {e}")
            failed_events.append({"event": raw_item, "error": str(e)})

    logger.info(f"Batch complete. Processed: {processed_count}, Failed: {len(failed_events)}")

    # Forward failures to the DLQ instead of raising. Raising would make Lambda
    # retry the whole invocation (or, for SQS, return the whole batch to the
    # queue), redelivering events that already succeeded.
    for failed in failed_events:
        send_to_dlq(failed['event'], failed['error'])

    return {
        'statusCode': 200,
        'body': json.dumps(f'Processed {processed_count} events'),
    }
```
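When the function is fed by an SQS event source mapping, Lambda also supports partial batch responses, an alternative to forwarding failures to a separate DLQ from your own code. With `ReportBatchItemFailures` in the mapping's `FunctionResponseTypes`, the handler returns the message IDs that failed; Lambda deletes the successful messages and leaves the reported ones on the queue, where they become visible again after the visibility timeout (and reach the queue's own redrive DLQ after repeated failures). A sketch, where `handle_sqs_batch` and `handle_record` are hypothetical names:

```python
import json
from typing import Any, Callable, Dict, List


def handle_sqs_batch(event: Dict[str, Any],
                     handle_record: Callable[[Dict[str, Any]], bool]) -> Dict[str, List[Dict[str, str]]]:
    """Return the partial batch response shape Lambda expects for SQS sources."""
    failures: List[Dict[str, str]] = []
    for record in event.get("Records", []):
        try:
            eventbridge_event = json.loads(record["body"])
            ok = handle_record(eventbridge_event.get("detail", {}))
        except Exception:
            ok = False
        if not ok:
            # Only these message IDs stay on the queue for another attempt
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


sample = {"Records": [
    {"messageId": "m1", "body": json.dumps({"detail": {"id": "conv-1"}})},
    {"messageId": "m2", "body": "not json"},
]}
print(handle_sqs_batch(sample, lambda detail: "id" in detail))
# {'batchItemFailures': [{'itemIdentifier': 'm2'}]}
```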
Complete Working Example
Below is the full, copy-pasteable lambda_function.py file.
```python
import os
import time
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List

import boto3
import requests
from botocore.exceptions import ClientError

# --- Configuration ---
# GENESYS_REGION is the environment domain; OAuth is served from login.<domain>
# and the Platform API from api.<domain>
GENESYS_REGION = os.environ.get('GENESYS_REGION', 'mypurecloud.ie')
CLIENT_ID = os.environ.get('GENESYS_CLIENT_ID')
CLIENT_SECRET = os.environ.get('GENESYS_CLIENT_SECRET')
AUTH_URL = f"https://login.{GENESYS_REGION}/oauth/token"
API_BASE_URL = f"https://api.{GENESYS_REGION}"
DYNAMODB_TABLE = os.environ.get('STATE_TABLE_NAME')
SQS_DLQ_URL = os.environ.get('SQS_DLQ_URL')

# --- AWS clients (created once per execution environment) ---
_table = boto3.resource('dynamodb').Table(DYNAMODB_TABLE)
_sqs = boto3.client('sqs')

# --- Token cache ---
_token_cache: Dict[str, Any] = {
    "access_token": None,
    "expires_at": 0.0,
}

# --- Logging ---
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def get_genesis_access_token() -> str:
    """Retrieves a valid Genesys Cloud access token with caching."""
    current_time = time.time()
    if _token_cache["access_token"] and current_time < _token_cache["expires_at"]:
        return _token_cache["access_token"]
    try:
        response = requests.post(
            AUTH_URL,
            data={"grant_type": "client_credentials"},
            auth=(CLIENT_ID, CLIENT_SECRET),  # HTTP Basic auth with the client credentials
            timeout=10,
        )
        response.raise_for_status()
        token_data = response.json()
        _token_cache["access_token"] = token_data["access_token"]
        # Refresh 5 minutes before the token actually expires
        _token_cache["expires_at"] = current_time + (token_data["expires_in"] - 300)
        return _token_cache["access_token"]
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Failed to obtain Genesys Cloud token: {e}") from e


def check_and_mark_processed(interaction_id: str) -> bool:
    """Atomically claims the ID. Returns True if it was already processed or claimed."""
    try:
        _table.put_item(
            Item={
                'interactionId': interaction_id,
                'processedAt': datetime.now(timezone.utc).isoformat(),
            },
            ConditionExpression='attribute_not_exists(interactionId)',
        )
        return False
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return True
        raise


def release_claim(interaction_id: str) -> None:
    """Removes the marker so a retried event is not skipped."""
    _table.delete_item(Key={'interactionId': interaction_id})


def process_single_event(event_detail: Dict[str, Any]) -> bool:
    """Processes a single interaction. Returns True on success."""
    interaction_id = event_detail.get('id')  # adjust to your topic's payload
    if not interaction_id:
        logger.warning("Event missing interaction ID")
        return False
    if check_and_mark_processed(interaction_id):
        logger.info(f"Interaction {interaction_id} already processed. Skipping.")
        return True
    try:
        headers = {
            "Authorization": f"Bearer {get_genesis_access_token()}",
            "Content-Type": "application/json",
        }
        url = f"{API_BASE_URL}/api/v2/conversations/{interaction_id}"
        response = requests.get(url, headers=headers, timeout=5)
        if response.status_code == 429:
            raise RuntimeError("Genesys Cloud rate limited (429)")
        response.raise_for_status()
        interaction_data = response.json()
        participants = interaction_data.get('participants', [])
        logger.info(f"Processed interaction {interaction_id}: participants={len(participants)}")
        return True
    except Exception as e:
        logger.error(f"Error processing interaction {interaction_id}: {e}")
        release_claim(interaction_id)  # allow a later retry to reprocess it
        return False


def send_to_dlq(failed_event: Any, error_message: str) -> None:
    """Sends failed events to the SQS DLQ."""
    try:
        _sqs.send_message(
            QueueUrl=SQS_DLQ_URL,
            MessageBody=json.dumps({
                "originalEvent": failed_event,
                "errorMessage": error_message,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }, default=str),
        )
    except Exception as e:
        logger.critical(f"Failed to send to DLQ: {e}")


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Handles one EventBridge event, or an SQS batch of EventBridge events."""
    if 'Records' in event:
        # SQS event source mapping: each record body is an EventBridge event (JSON string)
        raw_items: List[Any] = [record.get('body', '{}') for record in event['Records']]
    else:
        # Direct EventBridge rule target: one event per invocation
        raw_items = [event]

    if not raw_items:
        logger.info("No events to process")
        return {'statusCode': 200, 'body': json.dumps('No events')}

    logger.info(f"Processing batch of {len(raw_items)} events")
    failed_events = []
    processed_count = 0

    for i, raw_item in enumerate(raw_items):
        try:
            raw_event = json.loads(raw_item) if isinstance(raw_item, str) else raw_item
            # The interaction payload is in the EventBridge envelope's 'detail' field
            interaction_detail = raw_event.get('detail', raw_event)
            if process_single_event(interaction_detail):
                processed_count += 1
            else:
                failed_events.append({"event": raw_event, "error": "Processing failed"})
        except Exception as e:
            logger.error(f"Unhandled exception processing event {i}: {e}")
            failed_events.append({"event": raw_item, "error": str(e)})

    logger.info(f"Batch complete. Processed: {processed_count}, Failed: {len(failed_events)}")

    # Forward failures to the DLQ instead of raising, so one bad record
    # does not cause the whole invocation or batch to be retried
    for failed in failed_events:
        send_to_dlq(failed['event'], failed['error'])

    return {
        'statusCode': 200,
        'body': json.dumps(f'Processed {processed_count} events'),
    }
```
Common Errors & Debugging
Error: 429 Too Many Requests from Genesys Cloud
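- What causes it: Too many concurrent Lambda executions are calling Genesys Cloud, so the combined request rate exceeds its API rate limits. Genesys Cloud applies limits per OAuth token (the documented default is 300 requests per minute per token) as well as organization-wide limits; check the Genesys Cloud rate-limit documentation for the values that apply to your organization.
- How to fix it:
  - Reduce the Lambda Reserved Concurrency.
  - Implement exponential backoff around your `requests.get` call.
  - Consider the Genesys Cloud Python SDK (`PureCloudPlatformClientV2`) and check its documentation for retry options.
- Code fix: Add a retry decorator using `tenacity`.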
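```python
from typing import Any, Dict

import requests
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


@retry(
    retry=retry_if_exception_type(requests.exceptions.RetryError),  # retry only on 429
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    reraise=True,  # surface the last error instead of tenacity.RetryError
)
def fetch_interaction(interaction_id: str, token: str) -> Dict[str, Any]:
    url = f"https://api.{GENESYS_REGION}/api/v2/conversations/{interaction_id}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers, timeout=5)
    if response.status_code == 429:
        raise requests.exceptions.RetryError("Rate limited")
    response.raise_for_status()  # other HTTP errors are not retried
    return response.json()
```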
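If you prefer not to add a dependency, the delay calculation fits in a few lines of standard-library code. This sketch uses the `Retry-After` header when a 429 response includes one (an assumption to verify against the responses you actually receive) and otherwise falls back to capped exponential backoff with full jitter. `backoff_delay` is a hypothetical helper name.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 10.0,
                  rng: Optional[random.Random] = None) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            return max(0.0, float(retry_after))  # server-provided hint, in seconds
        except ValueError:
            pass  # e.g. an HTTP-date value: fall back to exponential backoff
    # Full jitter: uniform between 0 and the capped exponential ceiling
    ceiling = min(cap, base * (2 ** attempt))
    return (rng or random).uniform(0.0, ceiling)
```

In a retry loop, call `time.sleep(backoff_delay(attempt, response.headers.get("Retry-After")))` after each 429. Jitter spreads retries from many concurrent Lambda executions so they do not hit the API again in lockstep.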
Error: Lambda Concurrency Limit Exceeded
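- What causes it: The function has reached its reserved concurrency or the account's Regional concurrency limit, so new invocations are throttled. For asynchronous invocations (the path a direct EventBridge rule target uses), Lambda keeps throttled events in its internal queue and retries them, so a sustained spike builds a growing backlog. With an SQS event source mapping, throttled messages return to the queue after the visibility timeout and count toward the queue's `maxReceiveCount`.
- How to fix it:
  - Increase the Reserved Concurrency for the Lambda function, or cap the SQS event source mapping's maximum concurrency below it.
  - Increase the batch size to reduce the number of Lambda invocations needed.
  - Optimize the Lambda code to run faster.
- Debugging: Monitor the function's `Throttles` and `ConcurrentExecutions` metrics and the account-level `UnreservedConcurrentExecutions` metric in CloudWatch. Throttled invocations never start, so they produce no entries in the function's CloudWatch Logs.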
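To check for throttling from code rather than the console, here is a sketch that sums the `Throttles` metric for the function over a recent window. `recent_throttles` is a hypothetical helper; the call is the Boto3 CloudWatch client's `get_metric_statistics`, and the client is passed in (for example `boto3.client('cloudwatch')`) so the function has no hard AWS dependency.

```python
from datetime import datetime, timedelta, timezone
from typing import Any


def recent_throttles(cloudwatch: Any, function_name: str, minutes: int = 15) -> float:
    """Total throttled invocations for the function over the last `minutes` minutes."""
    end = datetime.now(timezone.utc)
    response = cloudwatch.get_metric_statistics(
        Namespace="AWS/Lambda",
        MetricName="Throttles",
        Dimensions=[{"Name": "FunctionName", "Value": function_name}],
        StartTime=end - timedelta(minutes=minutes),
        EndTime=end,
        Period=60,            # one datapoint per minute
        Statistics=["Sum"],
    )
    return sum(point["Sum"] for point in response.get("Datapoints", []))
```

A non-zero result during a traffic spike confirms that the bottleneck is concurrency rather than an error in the function code.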
Error: DynamoDB ConditionalCheckFailedException
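- What causes it: Two invocations received the same event at the same time (EventBridge and SQS both deliver at least once) and tried to write the same marker item. Only the first conditional write succeeded; the other failed the condition.
- How to fix it: This is expected behavior. The code handles it by treating the event as already processed (`check_and_mark_processed` returns `True`). Catch it as a `botocore.exceptions.ClientError` whose `e.response['Error']['Code']` is `ConditionalCheckFailedException` rather than matching on the exception message, and ensure your business logic is itself idempotent.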