Filter EventBridge Events for Queue-Specific Conversation Endings
What You Will Build
- A Python application that consumes events from AWS EventBridge and filters inbound Genesys Cloud events to process only conversation.end events originating from a specific Genesys Cloud queue.
- The code receives each event from an EventBridge bus (through a Lambda target) and applies strict payload inspection to isolate the target queue.
- The programming language covered is Python 3.9+.
Prerequisites
- Genesys Cloud Account: You must have an organization with Event Streams enabled and configured to send events to an AWS EventBridge Bus.
- AWS Account: You need an active AWS account with permissions to create EventBridge Rules and invoke Lambda functions (or local testing via LocalStack).
- OAuth Scopes: While this tutorial focuses on the consumer side (EventBridge), the Genesys Cloud Event Stream configuration requires the eventstream:all scope in your Genesys Cloud integration settings.
- Python Dependencies: boto3, requests (for optional validation), pydantic (for robust payload parsing).
Authentication Setup
This tutorial assumes the Genesys Cloud to EventBridge bridge is already configured. The “authentication” for this consumer code is handled by AWS IAM roles attached to your EventBridge Rule or Lambda function. You do not need to manage Genesys OAuth tokens in this specific consumer script because the event payload is pushed to you by Genesys Cloud.
However, to verify the queue ID you are filtering for, you must authenticate with Genesys Cloud once to retrieve the correct UUID.
import os

import requests


def get_queue_id_from_name(queue_name: str, region_domain: str = "mypurecloud.com") -> str:
    """
    Helper to find the Queue UUID using the Genesys Cloud API.
    Requires environment variables: GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET
    region_domain is the domain of your Genesys Cloud region
    (for example "mypurecloud.com" or "mypurecloud.ie").
    """
    client_id = os.environ.get("GENESYS_CLIENT_ID")
    client_secret = os.environ.get("GENESYS_CLIENT_SECRET")
    if not client_id or not client_secret:
        raise ValueError("Missing Genesys Cloud credentials in environment variables.")

    # 1. Get OAuth Token (client credentials grant) from the login host
    token_url = f"https://login.{region_domain}/oauth/token"
    auth_response = requests.post(
        token_url,
        auth=(client_id, client_secret),
        data={"grant_type": "client_credentials"},
        timeout=10,
    )
    auth_response.raise_for_status()
    access_token = auth_response.json()["access_token"]

    # 2. Search for Queue on the API host
    # Note: In production, cache this ID. Do not call this on every event.
    search_url = f"https://api.{region_domain}/api/v2/routing/queues"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    # Fetch queues filtered by name (pagination may be needed for large orgs, simplified here)
    queues_response = requests.get(
        search_url, headers=headers, params={"name": queue_name}, timeout=10
    )
    queues_response.raise_for_status()
    queues = queues_response.json().get("entities", [])
    for queue in queues:
        if queue["name"] == queue_name:
            return queue["id"]
    raise ValueError(f"Queue '{queue_name}' not found.")


# Example Usage:
# TARGET_QUEUE_ID = get_queue_id_from_name("Sales Support")
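The helper above reads only a single page of results. For organizations with many queues, the lookup can walk the pages one at a time. The following is a minimal sketch, not a definitive implementation: find_queue_id and fetch_page are hypothetical names, and the sketch assumes each page of the listing reports its total page count in a pageCount field alongside entities.

```python
from typing import Any, Callable, Dict, Optional


def find_queue_id(
    fetch_page: Callable[[int], Dict[str, Any]], queue_name: str
) -> Optional[str]:
    """Walk queue listing pages until a queue with an exact name match is found."""
    page_number = 1
    while True:
        page = fetch_page(page_number)
        for queue in page.get("entities", []):
            if queue.get("name") == queue_name:
                return queue.get("id")
        # Assumption: the listing reports the total number of pages as "pageCount".
        if page_number >= page.get("pageCount", 1):
            return None
        page_number += 1


if __name__ == "__main__":
    # Stand-in for an HTTP call; a real fetch_page would request the queue
    # listing with the page number passed as a query parameter.
    fake_pages = {
        1: {"entities": [{"id": "q-1", "name": "Billing Support"}], "pageCount": 2},
        2: {"entities": [{"id": "q-2", "name": "Sales Support"}], "pageCount": 2},
    }
    print(find_queue_id(lambda n: fake_pages[n], "Sales Support"))
```

Passing the page fetcher in as a callable keeps the paging logic testable without network access.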
Implementation
Step 1: Define the Event Structure and Filter Logic
Genesys Cloud sends events to EventBridge with a specific structure. The critical fields for filtering are:
- detail-type: Must be conversation.end.
- detail.queue.id: Must match your target Queue UUID.
We will create a Pydantic model to enforce type safety and a filter function. This ensures that malformed events or events from other queues are rejected immediately.
import logging
from typing import Any, Dict, List, Optional

# Written against Pydantic v2 (ConfigDict, field_validator, model_validate)
from pydantic import BaseModel, ConfigDict, Field, field_validator

logger = logging.getLogger(__name__)


# Define the structure of the Genesys Cloud Conversation End Event
class QueueInfo(BaseModel):
    id: str
    name: str


class ConversationEndDetail(BaseModel):
    """
    Represents the 'detail' payload of a Genesys Cloud conversation.end event.
    """
    conversationId: str
    queue: Optional[QueueInfo] = None
    direction: Optional[str] = None
    wrapupCode: Optional[str] = None


class GenesysEventBridgeEvent(BaseModel):
    """
    Represents the full EventBridge event structure.
    """
    # Accept either the alias ("detail-type") or the field name (detail_type)
    model_config = ConfigDict(populate_by_name=True)

    id: str
    source: str
    account: str
    time: str
    region: str
    resources: List[str]
    detail_type: str = Field(..., alias="detail-type")
    detail: ConversationEndDetail

    @field_validator("source")
    @classmethod
    def check_source(cls, v: str) -> str:
        if not v.startswith("com.genesys.cloud"):
            raise ValueError("Event source is not from Genesys Cloud")
        return v


def is_target_queue_event(event_dict: Dict[str, Any], target_queue_id: str) -> bool:
    """
    Determines if the event is a conversation.end for the specific queue.

    Args:
        event_dict: The raw JSON payload from EventBridge.
        target_queue_id: The UUID of the queue to filter for.

    Returns:
        True if the event matches, False otherwise.
    """
    try:
        # Parse and validate the event structure
        event = GenesysEventBridgeEvent.model_validate(event_dict)

        # Filter 1: Check Detail Type
        if event.detail_type != "conversation.end":
            logger.debug(f"Skipping event {event.id}: detail-type is '{event.detail_type}', not 'conversation.end'")
            return False

        # Filter 2: Check Queue ID
        # Note: Not all conversations are associated with a queue (e.g., direct agent calls).
        if event.detail.queue is None:
            logger.debug(f"Skipping event {event.id}: No queue associated with conversation.")
            return False
        if event.detail.queue.id != target_queue_id:
            logger.debug(f"Skipping event {event.id}: Queue ID {event.detail.queue.id} does not match target {target_queue_id}")
            return False
        return True
    except Exception as e:
        # Log parsing errors but do not crash the consumer
        logger.error(f"Failed to parse event: {e}")
        return False
Step 2: Implement the AWS Lambda Handler
In a production environment, this logic runs inside an AWS Lambda function triggered by an EventBridge Rule. The handler receives the event, applies the filter, and processes the data.
import json
import logging
import os
from typing import Any, Dict

# is_target_queue_event is the filter from Step 1. This assumes both pieces live
# in the same module; otherwise import it from wherever you placed it.

# Configure logging. The Lambda runtime pre-installs a root handler, so
# logging.basicConfig() has no effect there; set the level on the logger instead.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Load Target Queue ID from Environment Variable
TARGET_QUEUE_ID = os.environ.get("TARGET_QUEUE_ID")
if not TARGET_QUEUE_ID:
    raise EnvironmentError("TARGET_QUEUE_ID environment variable is not set.")


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    AWS Lambda handler for Genesys Cloud EventBridge events.
    Filters for 'conversation.end' events for a specific queue.
    """
    # An EventBridge rule invokes a Lambda target once per matching event and
    # passes that event as a single dict. Accepting a list as well keeps the
    # handler usable for local tests that feed it several events at once.
    events_to_process = event if isinstance(event, list) else [event]
    processed_count = 0
    skipped_count = 0

    for single_event in events_to_process:
        try:
            # Step 1: Apply Filter
            if is_target_queue_event(single_event, TARGET_QUEUE_ID):
                # Step 2: Process the Event
                process_conversation_end(single_event)
                processed_count += 1
            else:
                skipped_count += 1
        except Exception as e:
            logger.error(f"Error processing event {single_event.get('id', 'unknown')}: {str(e)}")
            # Depending on requirements, you might want to re-raise or send to DLQ
            raise

    return {
        "statusCode": 200,
        "body": json.dumps({
            "processed": processed_count,
            "skipped": skipped_count
        })
    }


def process_conversation_end(event: Dict[str, Any]) -> None:
    """
    Business logic to handle the filtered conversation.end event.
    """
    detail = event.get("detail", {})
    conversation_id = detail.get("conversationId")
    # "queue" can be present with a null value, so fall back to {} explicitly
    queue_name = (detail.get("queue") or {}).get("name", "Unknown Queue")
    wrapup_code = detail.get("wrapupCode")

    logger.info(
        f"Processing Conversation End: "
        f"ConvID={conversation_id}, "
        f"Queue={queue_name}, "
        f"Wrapup={wrapup_code}"
    )
    # Example: Send to DynamoDB, SQS, or another API
    # save_to_database(conversation_id, queue_name, wrapup_code)
Step 3: Configure the EventBridge Rule (Infrastructure as Code)
To ensure your Lambda receives the events, you must configure an EventBridge Rule. While you can do this in the AWS Console, using Terraform (HCL) ensures reproducibility. This rule filters at the infrastructure level before the Lambda is invoked, saving costs.
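resource "aws_cloudwatch_event_rule" "genesys_conversation_end" {
  name        = "genesys-queue-conversation-end"
  description = "Capture Genesys Cloud conversation.end events for specific queue"
  # Rules attach to the default bus unless event_bus_name is set. If Genesys Cloud
  # delivers to a partner or custom bus, set event_bus_name here and on the target.
  event_pattern = jsonencode({
    "source"      = ["com.genesys.cloud"]
    "detail-type" = ["conversation.end"]
  })
}

# No input_transformer: the handler expects the unmodified EventBridge envelope
# ("detail-type", "detail", ...) at the top level of the payload.
resource "aws_cloudwatch_event_target" "genesys_lambda_target" {
  rule      = aws_cloudwatch_event_rule.genesys_conversation_end.name
  target_id = "GenesysConversationEndTarget"
  arn       = aws_lambda_function.genesys_processor.arn
}

# EventBridge needs explicit permission to invoke the function.
resource "aws_lambda_permission" "allow_eventbridge" {
  statement_id  = "AllowExecutionFromEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.genesys_processor.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.genesys_conversation_end.arn
}

resource "aws_lambda_function" "genesys_processor" {
  function_name = "genesys-event-processor"
  role          = aws_iam_role.lambda_exec.arn # IAM role defined elsewhere in your configuration
  handler       = "index.lambda_handler"
  runtime       = "python3.9"
  filename      = "lambda_package.zip"

  environment {
    variables = {
      TARGET_QUEUE_ID = "YOUR-GENESYS-QUEUE-UUID-HERE"
    }
  }
}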
Note on Infrastructure Filtering:
The EventBridge rule above matches conversation.end for every queue. EventBridge event patterns can also match nested fields inside detail, so adding a detail.queue.id condition to event_pattern would stop other queues' events before the Lambda is invoked and save those invocations as well. The Lambda function should still perform the Queue ID check: it guards against a misconfigured rule and handles events that arrive without a queue. As written, the infrastructure rule only saves you from receiving conversation.start, participant.added, etc.
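To make the nested-field option concrete, the sketch below builds an event pattern that also constrains detail.queue.id and checks it against sample events locally. It rests on assumptions: build_queue_event_pattern and matches_pattern are hypothetical helpers, the payload shape is the one used throughout this tutorial, and the matcher covers only exact-value matching, which is a small subset of what EventBridge supports.

```python
from typing import Any, Dict


def build_queue_event_pattern(queue_id: str) -> Dict[str, Any]:
    """Event pattern for conversation.end events from a single queue."""
    return {
        "source": ["com.genesys.cloud"],
        "detail-type": ["conversation.end"],
        # Nested keys under "detail" are matched like top-level keys.
        "detail": {"queue": {"id": [queue_id]}},
    }


def matches_pattern(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Simplified local check: exact-value matching only (no prefix, numeric, etc.)."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: recurse into the corresponding event object.
            if not isinstance(actual, dict) or not matches_pattern(expected, actual):
                return False
        elif actual not in expected:
            # Leaf: the pattern lists the accepted values.
            return False
    return True


if __name__ == "__main__":
    pattern = build_queue_event_pattern("12345678-1234-1234-1234-123456789012")
    sample = {
        "source": "com.genesys.cloud",
        "detail-type": "conversation.end",
        "detail": {"queue": {"id": "12345678-1234-1234-1234-123456789012"}},
    }
    print(matches_pattern(pattern, sample))
```

The same dictionary can be mirrored in the Terraform event_pattern, or serialized with json.dumps and supplied as the EventPattern of a rule. The local matcher is only a debugging aid; the service itself remains the authority on whether a pattern matches.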
Complete Working Example
Below is the complete, runnable Python script. You can test this locally by simulating the EventBridge payload.
import json
import logging
import os
from typing import Any, Dict, List, Optional

# Written against Pydantic v2 (ConfigDict, field_validator)
from pydantic import BaseModel, ConfigDict, Field, field_validator

# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# --- Data Models ---
class QueueInfo(BaseModel):
    id: str
    name: str


class ConversationEndDetail(BaseModel):
    conversationId: str
    queue: Optional[QueueInfo] = None
    direction: Optional[str] = None
    wrapupCode: Optional[str] = None
    # Add other relevant fields from Genesys payload if needed


class GenesysEventBridgeEvent(BaseModel):
    # Accept either the alias ("detail-type") or the field name (detail_type)
    model_config = ConfigDict(populate_by_name=True)

    id: str
    source: str
    account: str
    time: str
    region: str
    resources: List[str]
    detail_type: str = Field(..., alias="detail-type")
    detail: ConversationEndDetail

    @field_validator("source")
    @classmethod
    def validate_source(cls, v: str) -> str:
        if not v.startswith("com.genesys.cloud"):
            raise ValueError(f"Invalid source: {v}")
        return v


# --- Core Logic ---
def is_target_queue_event(event_dict: Dict[str, Any], target_queue_id: str) -> bool:
    """
    Validates and filters the event.
    """
    try:
        event = GenesysEventBridgeEvent.model_validate(event_dict)

        # 1. Check Detail Type
        if event.detail_type != "conversation.end":
            logger.debug(f"Event {event.id} skipped: wrong detail-type '{event.detail_type}'")
            return False

        # 2. Check Queue Presence
        if event.detail.queue is None:
            logger.debug(f"Event {event.id} skipped: no queue association")
            return False

        # 3. Check Queue ID Match
        if event.detail.queue.id != target_queue_id:
            logger.debug(f"Event {event.id} skipped: queue mismatch ({event.detail.queue.id} != {target_queue_id})")
            return False
        return True
    except Exception as e:
        logger.error(f"Event parsing failed: {e}")
        return False


def process_event(event: Dict[str, Any]) -> None:
    """
    Placeholder for business logic.
    """
    detail = event.get("detail", {})
    conv_id = detail.get("conversationId")
    # "queue" can be present with a null value, so fall back to {} explicitly
    queue_name = (detail.get("queue") or {}).get("name")
    logger.info(f"SUCCESS: Processing end event for Conversation {conv_id} from Queue {queue_name}")


# --- Simulation/Test Harness ---
def simulate_event_bridge_trigger():
    """
    Simulates incoming EventBridge events to test the filter logic.
    """
    # Define Target Queue ID (Mock)
    TARGET_QUEUE_ID = "12345678-1234-1234-1234-123456789012"

    # Event 1: Correct Queue, conversation.end (Should PASS)
    event_pass = {
        "id": "evt-001",
        "source": "com.genesys.cloud",
        "account": "111222333444",
        "time": "2023-10-27T10:00:00Z",
        "region": "us-east-1",
        "resources": [],
        "detail-type": "conversation.end",
        "detail": {
            "conversationId": "conv-abc-123",
            "queue": {
                "id": "12345678-1234-1234-1234-123456789012",
                "name": "Sales Support"
            },
            "direction": "inbound",
            "wrapupCode": "Sale Closed"
        }
    }

    # Event 2: Wrong Queue, conversation.end (Should FAIL)
    event_fail_queue = {
        "id": "evt-002",
        "source": "com.genesys.cloud",
        "account": "111222333444",
        "time": "2023-10-27T10:01:00Z",
        "region": "us-east-1",
        "resources": [],
        "detail-type": "conversation.end",
        "detail": {
            "conversationId": "conv-def-456",
            "queue": {
                "id": "99999999-9999-9999-9999-999999999999",
                "name": "Billing Support"
            },
            "direction": "inbound",
            "wrapupCode": "Refund Issued"
        }
    }

    # Event 3: Correct Queue, conversation.start (Should FAIL)
    event_fail_type = {
        "id": "evt-003",
        "source": "com.genesys.cloud",
        "account": "111222333444",
        "time": "2023-10-27T10:02:00Z",
        "region": "us-east-1",
        "resources": [],
        "detail-type": "conversation.start",
        "detail": {
            "conversationId": "conv-ghi-789",
            "queue": {
                "id": "12345678-1234-1234-1234-123456789012",
                "name": "Sales Support"
            }
        }
    }

    # Event 4: No Queue (Direct Agent Call) (Should FAIL)
    event_fail_no_queue = {
        "id": "evt-004",
        "source": "com.genesys.cloud",
        "account": "111222333444",
        "time": "2023-10-27T10:03:00Z",
        "region": "us-east-1",
        "resources": [],
        "detail-type": "conversation.end",
        "detail": {
            "conversationId": "conv-jkl-012",
            "queue": None,
            "direction": "internal"
        }
    }

    events = [event_pass, event_fail_queue, event_fail_type, event_fail_no_queue]
    print(f"--- Running Filter Simulation for Target Queue: {TARGET_QUEUE_ID} ---")
    for event in events:
        if is_target_queue_event(event, TARGET_QUEUE_ID):
            process_event(event)
        else:
            logger.info(f"Event {event['id']} filtered out.")


if __name__ == "__main__":
    simulate_event_bridge_trigger()
Common Errors & Debugging
Error: detail.queue is None
- What causes it: The conversation was not routed through a queue. This happens for direct calls to agents, internal calls, or API-initiated conversations.
- How to fix it: Ensure your filter logic explicitly checks if event.detail.queue is None before accessing event.detail.queue.id. If your business logic requires queue data, simply skip these events.
Error: KeyError: 'queue' or AttributeError: 'NoneType' object has no attribute 'id'
- What causes it: Accessing nested dictionary keys without validation.
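- How to fix it: Use Pydantic models (as shown in Step 1) or defensive dictionary access ((detail.get("queue") or {}).get("id")). Note that detail.get("queue", {}) still returns None when the key is present with a null value, so the explicit or {} fallback is needed. Never assume the payload structure is perfect.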
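A small accessor makes the defensive pattern reusable. This is a minimal sketch under the payload shape assumed in this tutorial; get_queue_id is a hypothetical helper name, not part of any library.

```python
from typing import Any, Dict, Optional


def get_queue_id(event: Dict[str, Any]) -> Optional[str]:
    """Return detail.queue.id, or None when any level is missing or null."""
    detail = event.get("detail") or {}
    queue = detail.get("queue") or {}
    return queue.get("id")


if __name__ == "__main__":
    print(get_queue_id({"detail": {"queue": {"id": "q-1"}}}))
    print(get_queue_id({"detail": {"queue": None}}))
    print(get_queue_id({}))
```

Using `or {}` rather than a default argument to `.get()` covers both a missing key and a key whose value is null.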
Error: EventBridge Rule Not Triggering
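- What causes it: The EventBridge rule detail-type does not match exactly. Genesys Cloud uses conversation.end (lowercase), and pattern matching is case-sensitive.
- How to fix it: Verify the EventBridge rule JSON pattern. It must be "detail-type": ["conversation.end"]. Note that Genesys Cloud sends the source as com.genesys.cloud. Also confirm that the rule is attached to the event bus that actually receives the Genesys Cloud events; a rule only sees events on its own bus.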
Error: 429 Too Many Requests (If calling Genesys API inside Lambda)
- What causes it: If your process_event function calls back into Genesys Cloud (e.g., to update a custom attribute), you may hit rate limits if many conversations end simultaneously.
- How to fix it: Implement exponential backoff in your HTTP client. Better yet, write the event to an SQS queue inside the Lambda and have a separate worker process consume the queue and call Genesys APIs. This decouples the ingestion rate from the processing rate.
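The backoff suggestion can be sketched as a small retry wrapper. This is an illustrative sketch, not a definitive implementation: call_with_backoff and its parameters are hypothetical names, and the send callable is assumed to perform one HTTP request and return a (status_code, body) tuple.

```python
import random
import time
from typing import Callable, Tuple


def call_with_backoff(
    send: Callable[[], Tuple[int, str]],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[int, str]:
    """Call send() until it returns a status other than 429 or attempts run out."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        status, body = send()
        if status != 429:
            return status, body
        if attempt < max_attempts - 1:
            # Full jitter: wait a random time up to base_delay * 2^attempt.
            sleep(random.uniform(0, base_delay * (2 ** attempt)))
    # Still rate limited after the final attempt; hand the last response back.
    return status, body


if __name__ == "__main__":
    responses = iter([(429, "slow down"), (429, "slow down"), (200, "ok")])
    print(call_with_backoff(lambda: next(responses), sleep=lambda s: None))
```

Injecting sleep keeps the wrapper testable without real delays. Inside a Lambda, keep the total wait well below the function timeout; the SQS hand-off described above avoids that constraint.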