Processing High-Volume Interaction Events from EventBridge Without Hitting Lambda Concurrency Limits
What You Will Build
- A Python-based AWS Lambda function that processes Genesys Cloud interaction events streamed via EventBridge.
- The solution uses Step Functions Standard Workflows to decouple ingestion from processing, so that traffic spikes are absorbed by a short-lived ingestion function rather than by long-running processing invocations.
- The code covers the EventBridge rule creation, the ingestion Lambda, the processor Lambda, and the Step Functions state machine definition.
Prerequisites
- AWS Account: With permissions to create Lambda, EventBridge, Step Functions, and CloudWatch Logs resources.
- Genesys Cloud OAuth Client: A confidential client with the interaction:view scope.
- SDK: boto3 (AWS SDK for Python) version 1.26.0 or later.
- Runtime: Python 3.9 or later.
- External Dependencies: requests for Genesys API calls, botocore for error handling.
Authentication Setup
To process Genesys Cloud interactions, the Lambda function must authenticate using OAuth 2.0. In a high-throughput scenario, caching tokens is critical to avoid being rate-limited by the Genesys Cloud authorization server.
The following snippet demonstrates a minimal token cache: an in-memory dictionary with a TTL (time-to-live). The cache lives inside a single Lambda execution environment, so every concurrent environment fetches its own token. To share one token across environments, use ElastiCache or DynamoDB for shared state.
import os
import time
from typing import Dict

import requests

# Configuration
# GENESYS_REGION holds the base domain of your Genesys Cloud region,
# e.g. 'mypurecloud.com' (US East) or 'mypurecloud.ie' (EU West).
GENESYS_REGION = os.environ.get('GENESYS_REGION', 'mypurecloud.com')
GENESYS_CLIENT_ID = os.environ.get('GENESYS_CLIENT_ID')
GENESYS_CLIENT_SECRET = os.environ.get('GENESYS_CLIENT_SECRET')
GENESYS_LOGIN_URL = f'https://login.{GENESYS_REGION}'  # OAuth host
GENESYS_BASE_URL = f'https://api.{GENESYS_REGION}'     # Platform API host

# Token Cache
_token_cache: Dict[str, dict] = {}
_TOKEN_TTL = 3000     # Fallback lifetime (seconds) if the response omits expires_in
_EXPIRY_MARGIN = 60   # Refresh this many seconds before the token expires


def get_access_token() -> str:
    """
    Retrieves a valid Genesys Cloud access token.
    Implements a simple TTL cache to reduce API calls to the auth server.
    """
    now = time.time()
    # Check cache
    if 'access_token' in _token_cache:
        token_data = _token_cache['access_token']
        if now < token_data['expires_at']:
            return token_data['token']
    # Fetch new token (client credentials are sent via HTTP Basic auth)
    auth_url = f"{GENESYS_LOGIN_URL}/oauth/token"
    try:
        response = requests.post(
            auth_url,
            data={'grant_type': 'client_credentials'},
            auth=(GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET),
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        # Cache the token for the lifetime the server reports, minus a margin,
        # so a cached token is never served after it has expired.
        lifetime = int(data.get('expires_in', _TOKEN_TTL))
        _token_cache['access_token'] = {
            'token': data['access_token'],
            'expires_at': now + lifetime - _EXPIRY_MARGIN
        }
        return data['access_token']
    except requests.exceptions.RequestException as e:
        # Re-raise so Step Functions can apply its retry policy
        raise RuntimeError(f"Failed to fetch Genesys token: {e}") from e
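The shared-state option mentioned above can be sketched as follows. This is a minimal illustration rather than a hardened implementation: the item key `cacheKey`, the attribute names `token` and `expiresAt`, the 3600-second fallback, and the `fetch_token` callable are all assumptions made for the example. The `table` argument is expected to behave like a boto3 DynamoDB `Table` resource (only `get_item` and `put_item` are used).

```python
import time
from typing import Any, Callable, Dict, Optional

# Hypothetical primary key of the single cache item in the shared table.
CACHE_KEY = {'cacheKey': 'genesys_access_token'}


def get_shared_token(
    table: Any,
    fetch_token: Callable[[], Dict[str, Any]],
    now: Optional[float] = None,
    margin: int = 60,
) -> str:
    """Return a token from the shared table, refreshing it when near expiry."""
    now = time.time() if now is None else now

    # get_item returns a dict that contains 'Item' only when the item exists.
    item = table.get_item(Key=CACHE_KEY).get('Item')
    if item and now < int(item['expiresAt']) - margin:
        return item['token']

    # Cache miss or stale entry: fetch a fresh token and publish it so that
    # other execution environments can reuse it.
    data = fetch_token()
    expires_at = int(now + int(data.get('expires_in', 3600)))  # assumed fallback
    table.put_item(
        Item={**CACHE_KEY, 'token': data['access_token'], 'expiresAt': expires_at}
    )
    return data['access_token']
```

Two environments that miss the cache at the same moment will both fetch a token and the later write wins, which is harmless here because either token is valid. Keeping the in-memory cache in front of this lookup avoids a DynamoDB read on every invocation.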
Implementation
Step 1: Create the Ingestion Lambda Function
The ingestion Lambda is the entry point. Its sole responsibility is to receive the EventBridge event, extract the interaction ID, and start a Step Functions execution. This Lambda must be fast and lightweight to handle high concurrency.
OAuth Scope Required: None (Authentication happens in the processor Lambda).
import json
import os
from typing import Any, Dict

import boto3
from botocore.exceptions import ClientError

step_functions = boto3.client('stepfunctions')
STATE_MACHINE_ARN = os.environ.get('STATE_MACHINE_ARN')


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Ingestion Lambda: Receives EventBridge events and starts a Step Functions execution.
    This decouples the ingestion rate from the processing rate.
    """
    # EventBridge places the event payload in the 'detail' object
    detail = event.get('detail', {})
    interaction_id = detail.get('interactionId')
    if not interaction_id:
        raise ValueError("Missing interactionId in event detail")
    # Start Step Functions Execution
    # Input is passed as a stringified JSON object
    execution_input = json.dumps({
        'interactionId': interaction_id,
        'eventType': detail.get('eventType'),
        'timestamp': detail.get('timestamp')
    })
    try:
        # Execution names must be unique per state machine: a second event for
        # the same interaction with different input raises ExecutionAlreadyExists.
        response = step_functions.start_execution(
            stateMachineArn=STATE_MACHINE_ARN,
            input=execution_input,
            name=f"process-interaction-{interaction_id}"
        )
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Execution started',
                'executionArn': response['executionArn']
            })
        }
    except ClientError as e:
        # If Step Functions is throttled, we fail fast.
        # In production, consider a dead-letter queue or exponential backoff here.
        raise RuntimeError(f"Failed to start Step Functions execution: {e}") from e
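Because Standard Workflow execution names must be unique per state machine (for 90 days) and are limited to 80 characters, a name derived from the interaction ID alone collides as soon as one interaction emits several events. One possible approach, sketched below, is to append a short hash of the fields that distinguish events. The helper name `build_execution_name` is hypothetical, and the sketch assumes that `eventType` and `timestamp` together identify an event.

```python
import hashlib
import re

MAX_NAME_LENGTH = 80  # Step Functions limit for execution names


def build_execution_name(interaction_id: str, event_type: str, timestamp: str) -> str:
    """Build a deterministic execution name that is unique per event."""
    # Hash every field that distinguishes one event from another, so that a
    # redelivered event maps to the same name and a new event to a new name.
    digest = hashlib.sha256(
        f"{interaction_id}|{event_type}|{timestamp}".encode('utf-8')
    ).hexdigest()[:16]

    # Keep only characters that are safe in execution names.
    safe_id = re.sub(r'[^A-Za-z0-9_-]', '-', interaction_id)

    prefix = f"process-interaction-{safe_id}"
    # Reserve room for the separator and the digest within the length limit.
    return f"{prefix[:MAX_NAME_LENGTH - len(digest) - 1]}-{digest}"
```

A deterministic name also makes redelivery safe: starting a Standard execution again with the same name and the same input returns the existing execution instead of creating a duplicate.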
Step 2: Define the Step Functions State Machine
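Step Functions Standard Workflows are not bound by Lambda's concurrency limit. They are governed by their own service quotas (such as the StartExecution request rate and the number of open executions), and they allow you to process millions of executions over days. Each execution still invokes the processor Lambda, so the Retry policy below is what absorbs throttling on that function. A Map state can process batches if necessary; for single-interaction processing, one Task state is enough.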
For high-volume interaction processing, we use a Standard Workflow with a Retry Policy for the processor Lambda.
Create a file named state_machine.asl.json:
{
  "Comment": "Process Genesys Cloud Interactions",
  "StartAt": "FetchInteractionDetails",
  "States": {
    "FetchInteractionDetails": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:GenesysProcessorFunction",
      "Parameters": {
        "interactionId.$": "$.interactionId",
        "eventType.$": "$.eventType"
      },
      "Retry": [
        {
          "ErrorEquals": ["States.TaskFailed"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        },
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 5,
          "MaxAttempts": 5,
          "BackoffRate": 2.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.error",
          "Next": "SendToDeadLetterQueue"
        }
      ],
      "End": true
    },
    "SendToDeadLetterQueue": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage",
      "Parameters": {
        "QueueUrl": "DLQ_QUEUE_URL",
        "MessageBody.$": "$"
      },
      "End": true
    }
  }
}
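To see what the two Retry rules above mean in wall-clock terms, the wait before each retry can be computed as IntervalSeconds multiplied by BackoffRate raised to the retry index. The short sketch below does that arithmetic; it assumes no MaxDelaySeconds or JitterStrategy is set, which matches the definition above, and the helper name `retry_delays` is introduced only for illustration.

```python
from typing import List


def retry_delays(interval_seconds: float, max_attempts: int, backoff_rate: float) -> List[float]:
    """Return the wait before each retry attempt for one Retry rule."""
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]


task_failed = retry_delays(interval_seconds=2, max_attempts=3, backoff_rate=2.0)
catch_all = retry_delays(interval_seconds=5, max_attempts=5, backoff_rate=2.0)

print(task_failed, sum(task_failed))  # [2.0, 4.0, 8.0] 14.0
print(catch_all, sum(catch_all))      # [5.0, 10.0, 20.0, 40.0, 80.0] 155.0
```

With these settings, a task that keeps failing with States.TaskFailed reaches the Catch block after roughly 14 seconds of waiting plus the run time of each attempt, which is worth keeping in mind when sizing the intervals against the Genesys rate limit window.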
Step 3: Implement the Processor Lambda
This Lambda performs the heavy lifting: authenticating, fetching data from Genesys Cloud, and writing it to your data store (e.g., DynamoDB, S3, or a database).
OAuth Scope Required: interaction:view
import os
from datetime import datetime, timezone
from typing import Any, Dict

import boto3
import requests

# Reuse the get_access_token function from Authentication Setup
# ... [Insert get_access_token code here] ...

dynamodb = boto3.resource('dynamodb')
TABLE_NAME = os.environ.get('INTERACTION_TABLE_NAME')
# GENESYS_REGION holds the region's base domain, e.g. 'mypurecloud.com'
GENESYS_API_URL = f"https://api.{os.environ.get('GENESYS_REGION', 'mypurecloud.com')}"


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Processor Lambda: Fetches interaction details from Genesys Cloud and stores them.
    """
    interaction_id = event['interactionId']
    try:
        # 1. Authenticate
        token = get_access_token()
        # 2. Fetch Interaction Details from Genesys Cloud
        # Endpoint: GET /api/v2/interactions/events/details/{interactionId}
        url = f"{GENESYS_API_URL}/api/v2/interactions/events/details/{interaction_id}"
        headers = {
            'Authorization': f'Bearer {token}',
            'Accept': 'application/json'
        }
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code == 401:
            # Token expired or invalid: clear the cache so the retry fetches a new one
            _token_cache.pop('access_token', None)
            raise RuntimeError("Token invalid, retrying...")
        if response.status_code == 429:
            # Rate limited, Step Functions will retry based on retry policy
            raise RuntimeError("Genesys API Rate Limited")
        response.raise_for_status()
        interaction_data = response.json()
        # 3. Process and Store Data
        # Example: Store in DynamoDB
        item = {
            'interactionId': interaction_id,
            'startTime': interaction_data.get('startTime'),
            'channels': interaction_data.get('channels', []),
            'processedAt': datetime.now(timezone.utc).isoformat()
        }
        dynamodb.Table(TABLE_NAME).put_item(Item=item)
        return {
            'statusCode': 200,
            'message': 'Interaction processed successfully'
        }
    except Exception as e:
        # Log the error and re-raise to trigger the Step Functions Retry
        print(f"Error processing interaction {interaction_id}: {e}")
        raise
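When a Python Lambda raises an unhandled exception, Step Functions sees the exception's class name as the error name, so ErrorEquals can match on it. Raising a generic RuntimeError for both 401 and 429 therefore prevents the state machine from treating them differently. The sketch below shows one way to give each case its own name; the exception classes and the `raise_for_genesys_status` helper are hypothetical and not part of any SDK.

```python
class GenesysRateLimitedError(Exception):
    """Raised on HTTP 429; safe to retry after a longer wait."""


class GenesysAuthError(Exception):
    """Raised on HTTP 401; retry once the cached token has been cleared."""


class GenesysClientError(Exception):
    """Raised on other 4xx responses; retrying will not help."""


def raise_for_genesys_status(status_code: int) -> None:
    """Translate an HTTP status code into a named, matchable exception."""
    if status_code == 429:
        raise GenesysRateLimitedError("Genesys API rate limited")
    if status_code == 401:
        raise GenesysAuthError("Genesys token rejected")
    if 400 <= status_code < 500:
        raise GenesysClientError(f"Genesys request rejected with HTTP {status_code}")
    if status_code >= 500:
        raise RuntimeError(f"Genesys server error: HTTP {status_code}")
```

With names like these, a Retry rule whose ErrorEquals lists GenesysRateLimitedError could use a longer interval, while GenesysClientError could be left out of every Retry rule so that it goes straight to the Catch block.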
Step 4: Deploy Infrastructure with Terraform
To ensure the components are wired correctly, use Terraform to create the EventBridge Rule, Lambda functions, and Step Functions state machine.
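provider "aws" {
  region = "us-east-1"
}
# 1. IAM Roles
resource "aws_iam_role" "ingestion_lambda_role" {
  name = "gen_ingestion_lambda_role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}
resource "aws_iam_role_policy_attachment" "ingestion_lambda_policy" {
  role       = aws_iam_role.ingestion_lambda_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
# Allow the ingestion Lambda to start executions of the state machine
resource "aws_iam_role_policy" "ingestion_start_execution" {
  name = "start-execution"
  role = aws_iam_role.ingestion_lambda_role.id
  policy = jsonencode({
    Version   = "2012-10-17"
    Statement = [{ Effect = "Allow", Action = "states:StartExecution", Resource = aws_sfn_state_machine.gen_processor.arn }]
  })
}
resource "aws_iam_role" "processor_lambda_role" {
  name = "gen_processor_lambda_role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}
resource "aws_iam_role_policy_attachment" "processor_lambda_policy" {
  role       = aws_iam_role.processor_lambda_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
# Allow the processor Lambda to write to the interactions table
resource "aws_iam_role_policy" "processor_put_item" {
  name = "put-interaction"
  role = aws_iam_role.processor_lambda_role.id
  policy = jsonencode({
    Version   = "2012-10-17"
    Statement = [{ Effect = "Allow", Action = "dynamodb:PutItem", Resource = aws_dynamodb_table.interactions.arn }]
  })
}
# Execution role for the state machine (referenced as sfn_role below)
resource "aws_iam_role" "sfn_role" {
  name = "gen_sfn_role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "states.amazonaws.com"
        }
      }
    ]
  })
}
resource "aws_iam_role_policy" "sfn_invoke_processor" {
  name = "invoke-processor"
  role = aws_iam_role.sfn_role.id
  policy = jsonencode({
    Version   = "2012-10-17"
    Statement = [{ Effect = "Allow", Action = "lambda:InvokeFunction", Resource = aws_lambda_function.processor_function.arn }]
  })
}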
# 2. Lambda Functions
variable "gen_client_id" {
  type = string
}
variable "gen_client_secret" {
  type      = string
  sensitive = true
}
variable "gen_region" {
  type = string
}
resource "aws_lambda_function" "ingestion_function" {
  filename      = "lambda/ingestion.zip"
  function_name = "GenesysIngestionFunction"
  role          = aws_iam_role.ingestion_lambda_role.arn
  handler       = "ingestion.lambda_handler"
  runtime       = "python3.9"
  timeout       = 10
  memory_size   = 128
  environment {
    variables = {
      STATE_MACHINE_ARN = aws_sfn_state_machine.gen_processor.arn
    }
  }
}
resource "aws_lambda_function" "processor_function" {
  filename      = "lambda/processor.zip"
  function_name = "GenesysProcessorFunction"
  role          = aws_iam_role.processor_lambda_role.arn
  handler       = "processor.lambda_handler"
  runtime       = "python3.9"
  timeout       = 60
  memory_size   = 512
  environment {
    variables = {
      GENESYS_CLIENT_ID      = var.gen_client_id
      GENESYS_CLIENT_SECRET  = var.gen_client_secret
      GENESYS_REGION         = var.gen_region
      INTERACTION_TABLE_NAME = aws_dynamodb_table.interactions.name
    }
  }
}
# 3. Step Functions State Machine
resource "aws_sfn_state_machine" "gen_processor" {
  name     = "GenesysInteractionProcessor"
  role_arn = aws_iam_role.sfn_role.arn
  type     = "STANDARD" # Critical for high volume, low cost
  definition = jsonencode({
    Comment = "Process Genesys Interactions"
    StartAt = "FetchInteractionDetails"
    States = {
      FetchInteractionDetails = {
        Type = "Task"
        # Step Functions needs the function ARN; invoke_arn is the API Gateway form
        Resource = aws_lambda_function.processor_function.arn
        Parameters = {
          # Keys ending in ".$" must be quoted in HCL
          "interactionId.$" = "$.interactionId"
          "eventType.$"     = "$.eventType"
        }
        Retry = [
          {
            ErrorEquals     = ["States.TaskFailed"]
            IntervalSeconds = 2
            MaxAttempts     = 3
            BackoffRate     = 2.0
          }
        ]
        End = true
      }
    }
  })
}
# 4. EventBridge Rule
resource "aws_cloudwatch_event_rule" "genesys_interactions" {
  name        = "GenesysInteractionEvents"
  description = "Capture Genesys Cloud Interaction Events"
  event_pattern = jsonencode({
    source      = ["genesys.cloud"]
    detail-type = ["Genesys Cloud Interaction Event"]
  })
}
resource "aws_cloudwatch_event_target" "ingestion_target" {
  rule      = aws_cloudwatch_event_rule.genesys_interactions.name
  target_id = "IngestionLambda"
  arn       = aws_lambda_function.ingestion_function.arn
}
resource "aws_lambda_permission" "allow_eventbridge" {
  statement_id  = "AllowExecutionFromEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.ingestion_function.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.genesys_interactions.arn
}
# 5. DynamoDB Table
resource "aws_dynamodb_table" "interactions" {
  name         = "GenesysInteractions"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "interactionId"
  attribute {
    name = "interactionId"
    type = "S"
  }
}
Complete Working Example
The complete solution requires three files: ingestion.py, processor.py, and state_machine.asl.json (or embedded in Terraform as shown above).
File: processor.py
import os
import time
from datetime import datetime, timezone
from typing import Any, Dict

import boto3
import requests

# GENESYS_REGION holds the region's base domain, e.g. 'mypurecloud.com'
GENESYS_REGION = os.environ.get('GENESYS_REGION', 'mypurecloud.com')
GENESYS_CLIENT_ID = os.environ.get('GENESYS_CLIENT_ID')
GENESYS_CLIENT_SECRET = os.environ.get('GENESYS_CLIENT_SECRET')
GENESYS_LOGIN_URL = f'https://login.{GENESYS_REGION}'
GENESYS_BASE_URL = f'https://api.{GENESYS_REGION}'
TABLE_NAME = os.environ.get('INTERACTION_TABLE_NAME')

_token_cache: Dict[str, dict] = {}
_TOKEN_TTL = 3000     # Fallback lifetime (seconds) if the response omits expires_in
_EXPIRY_MARGIN = 60   # Refresh this many seconds before the token expires

dynamodb = boto3.resource('dynamodb')


def get_access_token() -> str:
    now = time.time()
    if 'access_token' in _token_cache:
        token_data = _token_cache['access_token']
        if now < token_data['expires_at']:
            return token_data['token']
    auth_url = f"{GENESYS_LOGIN_URL}/oauth/token"
    try:
        # Client credentials are sent via HTTP Basic auth
        response = requests.post(
            auth_url,
            data={'grant_type': 'client_credentials'},
            auth=(GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET),
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        lifetime = int(data.get('expires_in', _TOKEN_TTL))
        _token_cache['access_token'] = {
            'token': data['access_token'],
            'expires_at': now + lifetime - _EXPIRY_MARGIN
        }
        return data['access_token']
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Auth failed: {e}") from e


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    interaction_id = event['interactionId']
    try:
        token = get_access_token()
        url = f"{GENESYS_BASE_URL}/api/v2/interactions/events/details/{interaction_id}"
        headers = {'Authorization': f'Bearer {token}', 'Accept': 'application/json'}
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code == 401:
            # Clear the cached token so the next attempt fetches a fresh one
            _token_cache.pop('access_token', None)
            raise RuntimeError("Token expired")
        if response.status_code == 429:
            raise RuntimeError("Rate Limited")
        response.raise_for_status()
        data = response.json()
        item = {
            'interactionId': interaction_id,
            'startTime': data.get('startTime'),
            'channels': data.get('channels', []),
            # ISO string: the boto3 DynamoDB resource rejects Python floats
            'processedAt': datetime.now(timezone.utc).isoformat()
        }
        dynamodb.Table(TABLE_NAME).put_item(Item=item)
        return {'statusCode': 200}
    except Exception as e:
        print(f"Error: {e}")
        raise
Common Errors & Debugging
Error: 429 Too Many Requests
- What causes it: Genesys Cloud API enforces rate limits per client ID. If your Step Functions executions retry too aggressively, you will hit this limit.
- How to fix it: Increase the IntervalSeconds and BackoffRate in the Step Functions Retry policy. Implement exponential backoff in the Lambda itself if calling multiple endpoints.
- Code showing the fix:
"Retry": [
  {
    "ErrorEquals": ["States.TaskFailed"],
    "IntervalSeconds": 10,
    "MaxAttempts": 5,
    "BackoffRate": 3.0
  }
]
Error: Lambda Concurrency Limit Reached
- What causes it: The ingestion Lambda is invoked directly by EventBridge. If Genesys sends 10,000 events in a second, Lambda throttles invocations once the function reaches its reserved concurrency or, if none is set, the account's regional concurrency limit.
- How to fix it: Ensure the ingestion Lambda is extremely lightweight (just starting the Step Functions execution), so that each invocation frees its concurrency slot quickly. The longer-running processing then happens inside Step Functions executions. If you still see throttling, increase the Lambda Reserved Concurrency or use SQS as a buffer between EventBridge and Lambda.
- Code showing the fix: No code change needed in the Lambda. Adjust AWS Lambda configuration via Console or Terraform:
resource "aws_lambda_function" "ingestion_function" {
  reserved_concurrent_executions = 500
  # ...
}
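If you take the SQS buffer route, the ingestion handler receives batches of SQS records instead of single EventBridge events. The sketch below shows one way such a handler might look, using Lambda's partial batch response format so that only failed messages return to the queue. It assumes the event source mapping has ReportBatchItemFailures enabled and that the SQS message body is the unmodified EventBridge event; the `start_execution` callable is a hypothetical stand-in for the Step Functions call.

```python
import json
from typing import Any, Callable, Dict, List


def handle_sqs_batch(
    event: Dict[str, Any],
    start_execution: Callable[[Dict[str, Any]], None],
) -> Dict[str, List[Dict[str, str]]]:
    """Start one execution per SQS record and report only the failed records."""
    failures = []
    for record in event.get('Records', []):
        try:
            # With an EventBridge -> SQS target, the body is the event JSON.
            eventbridge_event = json.loads(record['body'])
            start_execution(eventbridge_event.get('detail', {}))
        except Exception as exc:
            print(f"Record {record['messageId']} failed: {exc}")
            # Only this message returns to the queue; the rest are deleted.
            failures.append({'itemIdentifier': record['messageId']})
    return {'batchItemFailures': failures}
```

The queue then absorbs bursts, and the event source mapping's batch size and maximum concurrency settings determine how quickly executions are started.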
Error: 401 Unauthorized
- What causes it: The OAuth token has expired, or the client credentials are incorrect.
- How to fix it: Verify GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET. Ensure the token cache is cleared when a 401 is received, as shown in the processor.py example.
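Clearing the cache and letting Step Functions retry costs a full retry interval for what is usually a routine token expiry. An alternative, sketched here under assumptions, is to refresh the token and repeat the request once inside the same invocation. The `TokenRejected` exception and the three callables are hypothetical; in processor.py they would correspond to a 401 response, get_access_token, and popping the entry from _token_cache.

```python
from typing import Callable, TypeVar

T = TypeVar('T')


class TokenRejected(Exception):
    """Hypothetical signal that the API answered 401 for the supplied token."""


def call_with_reauth(
    get_token: Callable[[], str],
    invalidate_token: Callable[[], None],
    request: Callable[[str], T],
) -> T:
    """Run request(token); on a rejected token, refresh once and try again."""
    try:
        return request(get_token())
    except TokenRejected:
        # The cached token is stale: drop it so get_token() fetches a new one.
        invalidate_token()
        # A second rejection propagates, which points at bad credentials
        # rather than an expired token.
        return request(get_token())
```

Limiting the retry to a single attempt keeps a misconfigured client ID or secret from looping against the authorization server.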