Filter EventBridge Events for Specific Queue Conversation Ends
What You Will Build
- You will configure an Amazon EventBridge rule that triggers only when a
conversation.endevent occurs for a specific Genesys Cloud queue. - This implementation uses the Genesys Cloud EventBridge integration and AWS SDKs for rule creation and target invocation.
- The tutorial covers Python (Boto3) for AWS resource management and JavaScript (Node.js) for the Lambda target that processes the filtered event.
Prerequisites
- Genesys Cloud:
- An active Genesys Cloud organization with an EventBridge integration configured.
- A specific Queue ID (e.g.,
12345678-abcd-1234-abcd-123456789abc) you wish to monitor. - The EventBridge integration must be enabled and publishing events to your AWS Account.
- AWS:
- An AWS Account with an EventBridge Bus (default or custom) receiving Genesys events.
- An IAM Role with permissions for
events:PutRule,events:PutTargets, andlambda:InvokeFunction. - A Lambda function (or placeholder) to serve as the target for the rule.
- Language/SDK Requirements:
- Python 3.9+ with
boto3installed (pip install boto3). - Node.js 18+ for the Lambda handler example.
- Python 3.9+ with
Authentication Setup
This tutorial assumes you are running the AWS configuration script from a local development environment or a CI/CD pipeline. You must configure AWS credentials via environment variables or an IAM role attached to the execution context.
For the Python script, boto3 automatically handles credential resolution. Ensure your AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION are set.
import boto3
import json
import os
# Initialize the EventBridge client
# boto3 uses default credential chain (env vars, IAM role, ~/.aws/credentials)
events_client = boto3.client('events', region_name='us-east-1')
No OAuth scopes are required for the AWS side of this integration. The Genesys Cloud side requires the EventBridge integration to be pre-configured in the Genesys Admin Console under Integrations > EventBridge. Ensure the integration is “Active” and that the conversation.end event type is selected for publishing.
Implementation
Step 1: Define the Event Pattern Filter
The core of this tutorial is the EventBridge Event Pattern. Genesys Cloud publishes events with a specific structure. To filter for conversation.end events for a specific queue, you must match against the detail object within the event.
The Genesys Cloud EventBridge payload structure for conversation.end includes a routing.queueId field. We will construct a JSON filter that matches this ID.
Required OAuth Scope (for Genesys side configuration): integration:eventbridge:write (if configuring via API, though console configuration is standard).
Here is the precise event pattern structure:
{
"source": [
"genesys.cloud"
],
"detail-type": [
"conversation.end"
],
"detail": {
"routing": {
"queueId": [
"YOUR_QUEUE_ID_HERE"
]
}
}
}
Why this works:
source: Genesys Cloud always usesgenesys.cloudas the event source.detail-type: This filters outconversation.start,interaction.start, etc.detail.routing.queueId: This is the critical filter. If the conversation was not associated with this queue at the time of ending, the event is dropped by EventBridge before hitting your Lambda.
Step 2: Create the EventBridge Rule via Python
We will use the AWS SDK for Python (Boto3) to create a rule that applies this filter. This step ensures that only the specific events you want are forwarded to your target.
import boto3
import json
import uuid
from botocore.exceptions import ClientError
def create_queue_end_rule(queue_id: str, target_arn: str, region: str = 'us-east-1') -> dict:
"""
Creates an EventBridge rule that triggers on conversation.end for a specific queue.
Args:
queue_id: The Genesys Cloud Queue ID to filter on.
target_arn: The ARN of the Lambda function to invoke.
region: The AWS region where the EventBridge bus exists.
Returns:
The response from the PutRule API call.
"""
events = boto3.client('events', region_name=region)
# Generate a unique rule name to avoid conflicts in dev/test environments
rule_name = f"genesys-conv-end-queue-{queue_id[:8]}"
# Define the event pattern
event_pattern = {
"source": ["genesys.cloud"],
"detail-type": ["conversation.end"],
"detail": {
"routing": {
"queueId": [queue_id]
}
}
}
# Define the schedule expression (none needed for event-driven)
# We use 'rate' or 'cron' only for scheduled events, so this is empty for event patterns
try:
# Step 1: Create the Rule
rule_response = events.put_rule(
Name=rule_name,
Description=f"Triggers on conversation.end for Queue {queue_id}",
EventPattern=json.dumps(event_pattern),
State='ENABLED',
RoleArn=None # Not needed for Lambda targets, Lambda permission is handled separately
)
print(f"Rule '{rule_name}' created successfully.")
# Step 2: Attach the Target (Lambda Function)
events.put_targets(
Rule=rule_name,
Targets=[
{
'Id': 'genesys-queue-end-target-1',
'Arn': target_arn,
'InputPath': '$', # Pass the entire event to the Lambda
'DeadLetterConfig': {
'Arn': f'arn:aws:sqs:{region}:{os.getenv("AWS_ACCOUNT_ID")}:dlq-genesis-events' # Optional DLQ
}
}
]
)
print(f"Target attached to rule '{rule_name}'.")
return rule_response
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'ResourceAlreadyExistsException':
print(f"Rule '{rule_name}' already exists. Skipping creation.")
# You may want to update the rule here instead
else:
print(f"Failed to create rule: {e}")
raise
# Example usage
# QUEUE_ID = "12345678-abcd-1234-abcd-123456789abc"
# LAMBDA_ARN = "arn:aws:lambda:us-east-1:123456789012:function:MyGenesysProcessor"
# create_queue_end_rule(QUEUE_ID, LAMBDA_ARN)
Error Handling Note:
ResourceAlreadyExistsException: If you run this script twice, EventBridge will throw this error. In production, implement anupdate_rulefallback or check for existence first usingdescribe_rule.InvalidEventPatternException: If the JSON structure of the event pattern is malformed, AWS will reject the rule. EnsurequeueIdis an array of strings, even if there is only one ID.
Step 3: Process the Event in Lambda (JavaScript)
Once the rule matches, EventBridge invokes the target Lambda. Here is a Node.js handler that validates the event structure and extracts the relevant conversation data.
import { LambdaClient, InvokeCommand } from "@aws-sdk/client-lambda";
/**
* Handler for Genesys Cloud conversation.end events filtered by Queue ID.
*
* @param {EventBridgeEvent<GenesysConversationEnd>} event
*/
export const handler = async (event) => {
console.log("Received Event:", JSON.stringify(event, null, 2));
// 1. Validate the event structure
if (!event.detail || !event.detail.routing) {
throw new Error("Invalid Genesys Cloud event structure: missing detail.routing");
}
const queueId = event.detail.routing.queueId;
const conversationId = event.detail.id;
const status = event.detail.status;
const wrapUpCode = event.detail.wrapUpCode;
const duration = event.detail.duration; // in milliseconds
// 2. Business Logic: Process the ended conversation
try {
// Example: Send to a data warehouse or update a CRM
await processConversationEnd({
queueId,
conversationId,
status,
wrapUpCode,
duration
});
return {
statusCode: 200,
body: JSON.stringify({ message: `Successfully processed conversation end for Queue ${queueId}` })
};
} catch (error) {
console.error("Error processing conversation end:", error);
// Re-throw to trigger retry or DLQ
throw error;
}
};
/**
* Simulates processing the conversation data.
*/
async function processConversationEnd(data) {
console.log(`Processing conversation ${data.conversationId} for queue ${data.queueId}`);
console.log(`Status: ${data.status}, Wrap-up: ${data.wrapUpCode}, Duration: ${data.duration}ms`);
// Here you would typically call an RDS database, S3, or another API
// For example, inserting into DynamoDB:
// await dynamoDBClient.put({ Item: data, TableName: 'GenesysConversations' });
return true;
}
Key Fields in the Payload:
event.detail.id: The unique Genesys Cloud conversation ID.event.detail.routing.queueId: The ID of the queue this conversation was assigned to.event.detail.wrapUpCode: The wrap-up code selected by the agent (if applicable).event.detail.duration: Total duration of the conversation in milliseconds.
Step 4: Verify the Filter with a Test Event
To ensure the filter works without waiting for a real call, you can send a test event to the EventBridge bus using the AWS CLI or Python. This simulates a conversation.end event.
import boto3
import json
def send_test_event(queue_id: str, region: str = 'us-east-1') -> None:
"""
Sends a synthetic conversation.end event to EventBridge for testing.
"""
events = boto3.client('events', region_name=region)
# Construct a realistic Genesys Cloud event payload
test_payload = {
"source": ["genesys.cloud"],
"account": os.getenv("AWS_ACCOUNT_ID"),
"region": region,
"time": "2023-10-27T10:00:00Z",
"id": str(uuid.uuid4()),
"detail-type": "conversation.end",
"resources": [],
"detail": {
"id": "test-conversation-id-123",
"type": "voice",
"status": "completed",
"routing": {
"queueId": queue_id, # This MUST match the rule filter
"queueName": "Support Queue"
},
"wrapUpCode": "resolved",
"duration": 120000, # 2 minutes in ms
"participantCount": 2
}
}
try:
response = events.put_events(
Entries=[
{
'Source': 'genesys.cloud',
'DetailType': 'conversation.end',
'Detail': json.dumps(test_payload['detail']),
'EventBusName': 'default' # Use your bus name if custom
}
]
)
if response['FailedEntryCount'] == 0:
print("Test event sent successfully. Check CloudWatch Logs for Lambda execution.")
else:
print(f"Failed to send event: {response['Entries']}")
except Exception as e:
print(f"Error sending test event: {e}")
# Usage
# send_test_event("12345678-abcd-1234-abcd-123456789abc")
Debugging Tip:
If the Lambda does not trigger, check the CloudWatch Logs for the EventBridge service. If the queueId in the test payload does not exactly match the queueId in the Rule’s Event Pattern, the rule will not match, and the event will be silently dropped.
Complete Working Example
Below is the full Python script that combines rule creation and test event sending. Save this as manage_genesys_events.py.
import boto3
import json
import uuid
import os
from botocore.exceptions import ClientError
class GenesysEventBridgeManager:
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.events_client = boto3.client('events', region_name=region)
self.lambda_client = boto3.client('lambda', region_name=region)
def create_filter_rule(self, queue_id: str, target_lambda_arn: str) -> str:
"""
Creates an EventBridge rule for a specific queue's conversation.end events.
"""
rule_name = f"genesys-conv-end-{queue_id[:8]}"
event_pattern = {
"source": ["genesys.cloud"],
"detail-type": ["conversation.end"],
"detail": {
"routing": {
"queueId": [queue_id]
}
}
}
try:
self.events_client.put_rule(
Name=rule_name,
Description=f"Filter for conversation.end on Queue {queue_id}",
EventPattern=json.dumps(event_pattern),
State='ENABLED'
)
print(f"Rule '{rule_name}' created/updated.")
except ClientError as e:
if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
raise
try:
self.events_client.put_targets(
Rule=rule_name,
Targets=[
{
'Id': 'target-1',
'Arn': target_lambda_arn,
'InputPath': '$',
'DeadLetterConfig': {
'Arn': f'arn:aws:sqs:{self.region}:{os.getenv("AWS_ACCOUNT_ID")}:dlq-genesis'
}
}
]
)
print(f"Target attached to '{rule_name}'.")
except ClientError as e:
print(f"Failed to attach target: {e}")
raise
return rule_name
def send_test_event(self, queue_id: str) -> None:
"""
Sends a test conversation.end event to verify the filter.
"""
test_detail = {
"id": f"test-conv-{uuid.uuid4()}",
"type": "voice",
"status": "completed",
"routing": {
"queueId": queue_id,
"queueName": "Test Queue"
},
"wrapUpCode": "test-resolved",
"duration": 5000,
"participantCount": 2
}
try:
response = self.events_client.put_events(
Entries=[
{
'Source': 'genesys.cloud',
'DetailType': 'conversation.end',
'Detail': json.dumps(test_detail),
'EventBusName': 'default'
}
]
)
if response['FailedEntryCount'] == 0:
print("Test event dispatched successfully.")
else:
print(f"Event dispatch failed: {response['Entries']}")
except Exception as e:
print(f"Error sending test event: {e}")
if __name__ == "__main__":
# Configuration
QUEUE_ID = os.getenv("GENESYS_QUEUE_ID", "12345678-abcd-1234-abcd-123456789abc")
LAMBDA_ARN = os.getenv("AWS_LAMBDA_ARN", "arn:aws:lambda:us-east-1:123456789012:function:MyGenesysProcessor")
if not QUEUE_ID or not LAMBDA_ARN:
raise ValueError("GENESYS_QUEUE_ID and AWS_LAMBDA_ARN environment variables are required.")
manager = GenesysEventBridgeManager(region='us-east-1')
# Step 1: Create/Update Rule
manager.create_filter_rule(QUEUE_ID, LAMBDA_ARN)
# Step 2: Send Test Event
manager.send_test_event(QUEUE_ID)
Common Errors & Debugging
Error: ResourceAlreadyExistsException
What causes it: You ran the script twice with the same rule_name. EventBridge rules are unique within a bus.
How to fix it: Use a deterministic name based on the Queue ID (as shown in the code) and implement a check or ignore this specific error code during updates.
Code Fix: The create_filter_rule method above wraps the put_rule call in a try-except block that handles this specific error code gracefully.
Error: Lambda Permission Denied
What causes it: EventBridge tries to invoke the Lambda, but the Lambda’s resource-based policy does not grant permission to the EventBridge service principal.
How to fix it: Add a permission statement to the Lambda function.
Code Fix (Python):
lambda_client = boto3.client('lambda')
lambda_client.add_permission(
FunctionName='MyGenesysProcessor',
StatementId='genesys-eventbridge-permission',
Action='lambda:InvokeFunction',
Principal='events.amazonaws.com',
SourceArn=f'arn:aws:events:{region}:{account_id}:rule/genesys-conv-end-*'
)
Error: Event Not Matching
What causes it: The queueId in the Genesys event does not match the filter, or the detail-type is different (e.g., interaction.end vs conversation.end).
How to fix it:
- Verify the Queue ID in Genesys Cloud matches the ID in the filter.
- Check CloudWatch Logs for the Lambda. If the Lambda is not triggered, the filter failed.
- Use the
send_test_eventmethod to inject a known-good event and verify the Lambda triggers. If it does, the filter is correct, and the issue is with the live Genesys events (e.g., the conversation was not assigned to that queue).