Processing high-volume interaction events from EventBridge without hitting Lambda concurrency limits

What You Will Build

  • You will build a Python Lambda function that processes Genesys Cloud interaction events from EventBridge in batches, using DynamoDB for state tracking and SQS for dead-letter queuing, designed to prevent concurrency throttling during traffic spikes.
  • This tutorial uses the AWS SDK for Python (Boto3) together with the AWS Lambda, EventBridge, SQS, and DynamoDB APIs.
  • The code is written for Python 3.9+ and uses the requests library for Genesys Cloud API calls and boto3 for AWS service interactions.

Prerequisites

  • AWS Account: With permissions to create/modify Lambda, EventBridge, SQS, and DynamoDB resources.
  • Genesys Cloud Account: An OAuth 2.0 Public or Confidential Client with the interaction:read and interaction:write scopes.
  • SDK Version: AWS SDK for Python (Boto3) version 1.26.0 or higher.
  • Language/Runtime: Python 3.9 or higher.
  • External Dependencies: requests, python-dotenv, boto3.

Authentication Setup

Genesys Cloud uses OAuth 2.0. In a Lambda environment you should cache tokens rather than request a new one from the Genesys Cloud auth server for every event. Requesting a token per event adds latency and increases the chance of hitting rate limits on the auth endpoint itself.

We will use a singleton pattern within the Lambda execution environment to cache the token. Note that Lambda containers are often kept warm for several minutes, making this cache effective.

import os
import time
import requests
import boto3
import json
from typing import Dict, Any, Optional

# Genesys Cloud Configuration
GENESYS_REGION = os.environ.get('GENESYS_REGION', 'mypurecloud.ie')
CLIENT_ID = os.environ.get('GENESYS_CLIENT_ID')
CLIENT_SECRET = os.environ.get('GENESYS_CLIENT_SECRET')
AUTH_URL = f"https://{GENESYS_REGION}/oauth/token"

# AWS Configuration
DYNAMODB_TABLE = os.environ.get('STATE_TABLE_NAME')
SQS_DLQ_URL = os.environ.get('SQS_DLQ_URL')

# Global Token Cache (Persists across invocations in the same container)
_token_cache: Dict[str, Any] = {
    "access_token": None,
    "refresh_token": None,
    "expires_at": 0
}

def get_genesis_access_token() -> str:
    """
    Retrieves a valid Genesys Cloud access token.
    Uses cached token if valid, otherwise requests a new one.
    """
    current_time = time.time()
    
    # Check if we have a valid token
    if _token_cache["access_token"] and current_time < _token_cache["expires_at"]:
        return _token_cache["access_token"]
    
    # Token expired or missing, request new one
    try:
        response = requests.post(
            AUTH_URL,
            data={
                "grant_type": "client_credentials",
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET
            },
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=5  # Fail fast instead of hanging until the Lambda timeout
        )
        response.raise_for_status()
        token_data = response.json()
        
        # Update cache
        _token_cache["access_token"] = token_data["access_token"]
        _token_cache["refresh_token"] = token_data.get("refresh_token") # Usually null for client_credentials
        # Expire 5 minutes before actual expiry to be safe
        _token_cache["expires_at"] = current_time + (token_data["expires_in"] - 300)
        
        return _token_cache["access_token"]
    
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Failed to obtain Genesys Cloud token: {str(e)}") from e
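
The expiry logic above is easier to verify when the clock and the token request are injectable. The following is a minimal sketch of the same caching idea, not a replacement for the function above: TokenCache, fetch, and clock are hypothetical names introduced only for illustration, and fetch stands in for the OAuth request.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches a token and refreshes it a safety margin before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, int]],  # returns (access_token, expires_in_seconds)
        margin_seconds: int = 300,
        clock: Callable[[], float] = time.time,
    ) -> None:
        self._fetch = fetch
        self._margin = margin_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is not None and now < self._expires_at:
            return self._token
        token, expires_in = self._fetch()
        # Clamp at zero so a short-lived token never gets an expiry in the past.
        lifetime = max(expires_in - self._margin, 0)
        self._token = token
        self._expires_at = now + lifetime
        return token
```

With a fake clock, a unit test can step past the margin and confirm that exactly one refresh happens, without waiting for a real token to expire.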

Implementation

Step 1: Configuring Lambda Concurrency and EventBridge Batch Size

Before writing the Lambda code, you must understand the infrastructure constraints. If EventBridge sends 100 events per second and your Lambda has a reserved concurrency of 10, you will hit limits immediately if each event is processed individually.

Critical Configuration:

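  1. Batching: An EventBridge rule that targets Lambda directly invokes the function once per event, so batching has to come from an intermediate queue. Route the rule to an SQS queue and attach the queue to the function with an event source mapping, then set MaximumBatchingWindowInSeconds (for example, 5 seconds) and BatchSize (up to 10,000 for standard queues). A single invocation then receives as many events as fit within the batch size and Lambda's 6 MB invocation payload limit.
  2. Lambda Concurrency: Set Reserved Concurrency on the Lambda function rather than relying on the account-level limit. Required concurrency is roughly invocations per second multiplied by the duration of each invocation. At 1,000 events/sec with 200 ms per batch, 5 concurrent executions is enough only if each batch carries about 40 events (25 invocations/sec × 0.2 s); unbatched, the same load needs about 200.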

Why this matters: Without batching, each event triggers a new Lambda execution context startup (cold start or warm), which is expensive and slow. Batching amortizes this cost.
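
The concurrency estimate in item 2 depends on how many events each invocation carries. A rough sketch of the arithmetic, assuming steady-state load and a fixed batch duration (required_concurrency is a hypothetical helper, not an AWS API):

```python
import math


def required_concurrency(
    events_per_second: float,
    events_per_batch: int,
    batch_duration_seconds: float,
) -> int:
    """Estimate concurrent executions as invocations/sec x seconds per invocation."""
    if events_per_batch <= 0:
        raise ValueError("events_per_batch must be positive")
    invocations_per_second = events_per_second / events_per_batch
    # round() absorbs floating-point noise before taking the ceiling.
    return max(1, math.ceil(round(invocations_per_second * batch_duration_seconds, 9)))


print(required_concurrency(1000, 1, 0.2))   # unbatched
print(required_concurrency(1000, 40, 0.2))  # batches of 40 events
```

Treat the result as a floor: real traffic is bursty and batch duration grows with batch size, so leave headroom above the estimate.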

Step 2: Implementing the Batch Processor with Idempotency

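We will process the detail of each EventBridge event. Genesys Cloud delivers interaction events through EventBridge, and a transient error can cause the same event to be delivered more than once, so we must not process it twice. We use DynamoDB to track processed interaction IDs. Note that the code below records the ID before the business logic runs, so an event that fails afterward will be skipped if it is later replayed from the DLQ unless its DynamoDB item is deleted first.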

from datetime import datetime

from botocore.exceptions import ClientError

def check_and_mark_processed(interaction_id: str) -> bool:
    """
    Atomically claims an interaction ID.
    Returns False if this call recorded the ID (the caller should process it).
    Returns True if the ID was already recorded (the caller should skip it).
    Uses a single DynamoDB conditional write for atomicity; a separate read
    beforehand is unnecessary because the condition already rejects duplicates.
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(DYNAMODB_TABLE)

    try:
        table.put_item(
            Item={
                'interactionId': interaction_id,
                'processedAt': datetime.utcnow().isoformat(),
                'processedBy': os.environ.get('AWS_LAMBDA_FUNCTION_NAME', 'unknown')
            },
            ConditionExpression='attribute_not_exists(interactionId)'
        )
        return False # Newly recorded
    except ClientError as e:
        # The condition fails when the item already exists, including when a
        # concurrent invocation wrote it first.
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return True
        raise

def process_single_event(event_detail: Dict[str, Any]) -> bool:
    """
    Processes a single Genesys Cloud interaction event.
    Returns True if processed successfully, False otherwise.
    """
    interaction_id = event_detail.get('id')
    if not interaction_id:
        print("Warning: Event missing interaction ID")
        return False
    
    # Check idempotency
    if check_and_mark_processed(interaction_id):
        print(f"Interaction {interaction_id} already processed. Skipping.")
        return True # Considered successful because it's already done
    
    # Business Logic: Update Genesys Cloud Interaction or External System
    # Example: Fetch full interaction details from Genesys Cloud
    token = get_genesis_access_token()
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    
    try:
        # Real API Endpoint: Get Interaction Details
        url = f"https://{GENESYS_REGION}/api/v2/interactions/details/{interaction_id}"
        response = requests.get(url, headers=headers, timeout=5)
        
        if response.status_code == 429:
            # Rate limited by Genesys Cloud
            raise Exception(f"Genesys Cloud rate limited (429) for interaction {interaction_id}")
        
        response.raise_for_status()
        
        # Process the data
        interaction_data = response.json()
        print(f"Processed interaction {interaction_id}: Type={interaction_data.get('type')}")
        
        return True
        
    except Exception as e:
        print(f"Error processing interaction {interaction_id}: {str(e)}")
        return False

def send_to_dlq(failed_event: Dict[str, Any], error_message: str) -> None:
    """
    Sends failed events to an SQS Dead Letter Queue for later retry or analysis.
    """
    sqs = boto3.client('sqs')
    try:
        sqs.send_message(
            QueueUrl=SQS_DLQ_URL,
            MessageBody=json.dumps({
                "originalEvent": failed_event,
                "errorMessage": error_message,
                "timestamp": datetime.utcnow().isoformat()
            })
        )
    except Exception as e:
        print(f"Critical: Failed to send to DLQ: {e}")

Step 3: Handling the Lambda Handler with Batch Processing

The Lambda handler receives an array of events. We must iterate through them, process each, and track failures. If any event fails, we do not return an error for the entire batch immediately. Instead, we log the failures and send them to the DLQ. This prevents EventBridge from retrying the entire batch for one bad record, which would cause massive redundancy.

import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Main Lambda handler for EventBridge events.
    Processes batches of Genesys Cloud interaction events.
    """
    # The payload may carry a list of events under 'detail', a single event
    # with a 'detail' object, or a bare event. Normalize all shapes to a list;
    # the loop below unwraps 'detail' from each item where present.
    if isinstance(event.get('detail'), list):
        events_to_process = event['detail']
    else:
        events_to_process = [event]
    
    if not events_to_process:
        logger.info("No events to process")
        return {'statusCode': 200, 'body': 'No events'}

    logger.info(f"Processing batch of {len(events_to_process)} events")
    
    failed_events = []
    processed_count = 0
    
    for i, raw_event in enumerate(events_to_process):
        try:
            # Extract the actual interaction data from the EventBridge payload
            # Genesys Cloud EventBridge payload structure:
            # { "source": "genesys.cloud", "detail": { ... interaction data ... } }
            # However, when batched by EventBridge, the top-level event is the wrapper.
            # We need to look at 'detail' if it exists, else use the root.
            
            interaction_detail = raw_event.get('detail', raw_event)
            
            success = process_single_event(interaction_detail)
            
            if success:
                processed_count += 1
            else:
                failed_events.append({
                    "event": raw_event,
                    "error": "Processing failed or idempotency check failed unexpectedly"
                })
                
        except Exception as e:
            logger.error(f"Unhandled exception processing event {i}: {str(e)}")
            failed_events.append({
                "event": raw_event,
                "error": str(e)
            })
    
    logger.info(f"Batch complete. Processed: {processed_count}, Failed: {len(failed_events)}")
    
    # Send failed events to DLQ
    for failed in failed_events:
        send_to_dlq(failed['event'], failed['error'])

    # Note: We do not return an error here. 
    # Returning an error would cause EventBridge to retry the entire batch.
    # By sending failures to DLQ, we acknowledge receipt and move on.
    return {
        'statusCode': 200,
        'body': json.dumps(f'Processed {processed_count} events')
    }
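
The normalization at the top of the handler is the part most likely to break when the payload shape changes, so it can help to isolate it in a helper that is testable without AWS. A minimal sketch, assuming the three shapes discussed in this step (normalize_events is a hypothetical name, not part of the handler above):

```python
from typing import Any, Dict, List


def normalize_events(event: Any) -> List[Dict[str, Any]]:
    """Return a list of interaction payloads regardless of the input shape."""
    if isinstance(event, list):
        items = event
    elif isinstance(event, dict) and isinstance(event.get("detail"), list):
        items = event["detail"]
    else:
        items = [event]

    normalized: List[Dict[str, Any]] = []
    for item in items:
        if not isinstance(item, dict):
            continue  # Skip malformed entries rather than failing the batch
        detail = item.get("detail", item)
        # Unwrap the EventBridge envelope only when 'detail' is an object.
        normalized.append(detail if isinstance(detail, dict) else item)
    return normalized


print(normalize_events({"source": "genesys.cloud", "detail": {"id": "abc"}}))
```

Keeping this logic in one place also means the handler body only ever iterates over plain interaction dictionaries.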

Complete Working Example

Below is the full, copy-pasteable lambda_function.py file.

import os
import time
import requests
import boto3
import json
import logging
from typing import Dict, Any, List
from datetime import datetime

# --- Configuration ---
GENESYS_REGION = os.environ.get('GENESYS_REGION', 'mypurecloud.ie')
CLIENT_ID = os.environ.get('GENESYS_CLIENT_ID')
CLIENT_SECRET = os.environ.get('GENESYS_CLIENT_SECRET')
AUTH_URL = f"https://{GENESYS_REGION}/oauth/token"
DYNAMODB_TABLE = os.environ.get('STATE_TABLE_NAME')
SQS_DLQ_URL = os.environ.get('SQS_DLQ_URL')

# --- Token Cache ---
_token_cache: Dict[str, Any] = {
    "access_token": None,
    "expires_at": 0
}

# --- Logging ---
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def get_genesis_access_token() -> str:
    """Retrieves a valid Genesys Cloud access token with caching."""
    current_time = time.time()
    
    if _token_cache["access_token"] and current_time < _token_cache["expires_at"]:
        return _token_cache["access_token"]
    
    try:
        response = requests.post(
            AUTH_URL,
            data={
                "grant_type": "client_credentials",
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET
            },
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=5
        )
        response.raise_for_status()
        token_data = response.json()
        
        _token_cache["access_token"] = token_data["access_token"]
        _token_cache["expires_at"] = current_time + (token_data["expires_in"] - 300)
        
        return _token_cache["access_token"]
    
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Failed to obtain Genesys Cloud token: {str(e)}") from e

def check_and_mark_processed(interaction_id: str) -> bool:
    """Checks DynamoDB for idempotency. Returns True if already processed."""
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(DYNAMODB_TABLE)
    
    try:
        response = table.get_item(Key={'interactionId': interaction_id})
        if 'Item' in response:
            return True
    except Exception as e:
        logger.warning(f"Error checking DynamoDB: {e}")
        
    try:
        table.put_item(
            Item={
                'interactionId': interaction_id,
                'processedAt': datetime.utcnow().isoformat()
            },
            ConditionExpression='attribute_not_exists(interactionId)'
        )
        return False
    except Exception as e:
        if 'ConditionalCheckFailed' in str(e):
            return True
        raise e

def process_single_event(event_detail: Dict[str, Any]) -> bool:
    """Processes a single interaction. Returns True on success."""
    interaction_id = event_detail.get('id')
    if not interaction_id:
        logger.warning("Event missing interaction ID")
        return False
    
    if check_and_mark_processed(interaction_id):
        logger.info(f"Interaction {interaction_id} already processed. Skipping.")
        return True
    
    token = get_genesis_access_token()
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    
    try:
        url = f"https://{GENESYS_REGION}/api/v2/interactions/details/{interaction_id}"
        response = requests.get(url, headers=headers, timeout=5)
        
        if response.status_code == 429:
            raise Exception(f"Genesys Cloud rate limited (429) for interaction {interaction_id}")
        
        response.raise_for_status()
        interaction_data = response.json()
        logger.info(f"Processed interaction {interaction_id}: Type={interaction_data.get('type')}")
        return True
        
    except Exception as e:
        logger.error(f"Error processing interaction {interaction_id}: {str(e)}")
        return False

def send_to_dlq(failed_event: Dict[str, Any], error_message: str) -> None:
    """Sends failed events to SQS DLQ."""
    sqs = boto3.client('sqs')
    try:
        sqs.send_message(
            QueueUrl=SQS_DLQ_URL,
            MessageBody=json.dumps({
                "originalEvent": failed_event,
                "errorMessage": error_message,
                "timestamp": datetime.utcnow().isoformat()
            })
        )
    except Exception as e:
        logger.critical(f"Failed to send to DLQ: {e}")

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Main Lambda handler."""
    # Normalize event structure
    events_to_process: List[Dict[str, Any]] = []
    
    if 'detail' in event:
        if isinstance(event['detail'], list):
            events_to_process = event['detail']
        else:
            events_to_process = [event['detail']]
    else:
        events_to_process = [event]
    
    if not events_to_process:
        logger.info("No events to process")
        return {'statusCode': 200, 'body': 'No events'}

    logger.info(f"Processing batch of {len(events_to_process)} events")
    
    failed_events = []
    processed_count = 0
    
    for i, raw_event in enumerate(events_to_process):
        try:
            # The detail is the interaction payload
            interaction_detail = raw_event.get('detail', raw_event)
            
            success = process_single_event(interaction_detail)
            
            if success:
                processed_count += 1
            else:
                failed_events.append({
                    "event": raw_event,
                    "error": "Processing failed"
                })
                
        except Exception as e:
            logger.error(f"Unhandled exception processing event {i}: {str(e)}")
            failed_events.append({
                "event": raw_event,
                "error": str(e)
            })
    
    logger.info(f"Batch complete. Processed: {processed_count}, Failed: {len(failed_events)}")
    
    for failed in failed_events:
        send_to_dlq(failed['event'], failed['error'])

    return {
        'statusCode': 200,
        'body': json.dumps(f'Processed {processed_count} events')
    }

Common Errors & Debugging

Error: 429 Too Many Requests from Genesys Cloud

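  • What causes it: Your Lambda concurrency is too high, or you are making too many API calls per second to Genesys Cloud. Rate limits are enforced per OAuth client. This tutorial assumes a limit of 100 requests per second per client for /api/v2/interactions/details; verify the actual value for your organization in the Genesys Cloud rate limit documentation before sizing concurrency against it.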
  • How to fix it:
    1. Reduce the Lambda Reserved Concurrency.
    2. Implement exponential backoff in your requests.get call.
    3. Use the Genesys Cloud SDK which handles retries automatically.
  • Code Fix: Add a retry decorator using tenacity.
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def fetch_interaction(interaction_id: str, token: str) -> Dict[str, Any]:
    url = f"https://{GENESYS_REGION}/api/v2/interactions/details/{interaction_id}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers, timeout=5)
    if response.status_code == 429:
        raise requests.exceptions.RetryError("Rate limited")
    response.raise_for_status()
    return response.json()
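
If you would rather not add a dependency, the wait time between retries can be computed by hand. The sketch below assumes the 429 response may carry a Retry-After header expressed in seconds (an assumption to verify against the responses you actually receive) and otherwise falls back to exponential backoff with full jitter. backoff_delay is a hypothetical helper.

```python
import random
from typing import Mapping, Optional


def backoff_delay(
    attempt: int,
    headers: Optional[Mapping[str, str]] = None,
    base: float = 1.0,
    cap: float = 30.0,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if headers:
        # Compare names case-insensitively because a plain dict does not.
        for name, value in headers.items():
            if name.lower() == "retry-after":
                try:
                    return min(max(float(value), 0.0), cap)
                except ValueError:
                    break  # Not a number of seconds; use exponential backoff
    # Full jitter: uniform between 0 and the capped exponential ceiling.
    ceiling = min(cap, base * (2 ** attempt))
    return random.uniform(0.0, ceiling)
```

Call time.sleep(backoff_delay(attempt, response.headers)) between attempts, and keep the total wait well below the Lambda timeout so a throttled batch fails cleanly instead of timing out.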

Error: Lambda Concurrency Limit Exceeded

  • What causes it: You have hit the AWS account-level or function-level concurrency limit. EventBridge will retry, causing a cascade.
  • How to fix it:
    1. Increase the Reserved Concurrency for the Lambda function.
    2. Increase the EventBridge batch size to reduce the number of Lambda invocations needed.
    3. Optimize the Lambda code to run faster.
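  • Debugging: A throttled invocation never runs, so it leaves no entry in the function's own log stream. Check the Throttles and ConcurrentExecutions metrics for the function in CloudWatch, and monitor UnreservedConcurrentExecutions at the account level.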

Error: DynamoDB ConditionalCheckFailedException

  • What causes it: Two Lambda instances processed the same event simultaneously. Both tried to write the item, but only one succeeded. The other failed the condition.
  • How to fix it: This is expected behavior. The code already handles this by returning True (already processed). Ensure your business logic is idempotent.
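
Matching on the exception's string representation works but is fragile. botocore's ClientError exposes the error code in its response dictionary, so the check can be made explicit. A small sketch (is_conditional_check_failure is a hypothetical helper; it inspects the response attribute so it can be exercised without AWS credentials):

```python
from typing import Any


def is_conditional_check_failure(exc: BaseException) -> bool:
    """True when the exception carries the ConditionalCheckFailedException code."""
    response: Any = getattr(exc, "response", None)
    if not isinstance(response, dict):
        return False
    return response.get("Error", {}).get("Code") == "ConditionalCheckFailedException"
```

Inside an except ClientError as e block, return True when is_conditional_check_failure(e) holds and re-raise otherwise, so genuine DynamoDB errors such as throttling still surface.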

Official References

My usual workaround is to adjust the event source mapping batch size to 10,000 and enable partial batch response support. This lets your Lambda function process the valid records while reporting only the failures, so one bad record does not force the whole batch to be retried.

Configuration                     Value
MaximumBatchingWindowInSeconds    300
MaximumRecordAgeInSeconds         -1
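
These settings can be applied with Boto3's create_event_source_mapping call. The sketch below only builds the parameters, assuming the events arrive through a standard SQS queue. The queue ARN and function name are placeholders, and sqs_mapping_params is a hypothetical helper. MaximumRecordAgeInSeconds is left out because, to my knowledge, it applies to stream sources rather than SQS.

```python
from typing import Any, Dict


def sqs_mapping_params(
    queue_arn: str,
    function_name: str,
    batch_size: int = 10000,
    window_seconds: int = 300,
) -> Dict[str, Any]:
    """Build keyword arguments for lambda_client.create_event_source_mapping."""
    if not 1 <= batch_size <= 10000:
        raise ValueError("batch_size must be between 1 and 10000")
    if not 0 <= window_seconds <= 300:
        raise ValueError("window_seconds must be between 0 and 300")
    if batch_size > 10 and window_seconds < 1:
        raise ValueError("batches larger than 10 need a batching window of at least 1 second")
    return {
        "EventSourceArn": queue_arn,
        "FunctionName": function_name,
        "BatchSize": batch_size,
        "MaximumBatchingWindowInSeconds": window_seconds,
        # Enables partial batch responses from the handler.
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
    }


params = sqs_mapping_params(
    "arn:aws:sqs:eu-west-1:123456789012:interaction-events",  # placeholder ARN
    "interaction-processor",                                  # placeholder name
)
# To apply: boto3.client("lambda").create_event_source_mapping(**params)
```

A 300-second window trades latency for fewer invocations; a shorter window is usually the better starting point when events need near-real-time handling.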

The docs actually state the batch size alone won’t save you if your Lambda execution time exceeds the window.

  • Set MaximumBatchingWindowInSeconds to 300.
  • Use TumblingWindowInSeconds for consistent throughput. Note that this setting applies to Kinesis and DynamoDB Streams event sources, not to SQS.
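
With partial batch responses enabled on an SQS event source mapping, the handler reports failures by message ID instead of raising. A minimal sketch, assuming each SQS message body is the JSON of one EventBridge event; handle_sqs_batch and the process callback are hypothetical names, and process stands in for a function such as process_single_event:

```python
import json
from typing import Any, Callable, Dict, List


def handle_sqs_batch(
    event: Dict[str, Any],
    process: Callable[[Dict[str, Any]], bool],
) -> Dict[str, List[Dict[str, str]]]:
    """Process every record and report only the failed message IDs."""
    failures: List[Dict[str, str]] = []
    for record in event.get("Records", []):
        try:
            body = json.loads(record["body"])
            # Unwrap the EventBridge envelope if the body carries one.
            ok = process(body.get("detail", body))
        except Exception:
            ok = False  # Malformed JSON or a processing error fails this record only
        if not ok:
            failures.append({"itemIdentifier": record["messageId"]})
    # Lambda retries only the listed messages; an empty list means full success.
    return {"batchItemFailures": failures}
```

Only the listed messages return to the queue, so pair this with a redrive policy on the queue to stop a permanently failing message from retrying forever.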