How to filter Genesys Cloud EventBridge events to only receive conversation.end for a specific queue
What You Will Build
- You will configure a Genesys Cloud EventStream subscription via the REST API to emit events only when a conversation associated with a specific queue ends.
- You will implement a Python service that consumes these events from an AWS SQS queue (via EventBridge) and filters them locally to ensure strict adherence to the queue ID.
- This tutorial uses the Genesys Cloud Python SDK (
genesyscloud) and theboto3library for AWS interaction.
Prerequisites
- OAuth Client: A Genesys Cloud OAuth Client with the scope
eventstream:readoreventstream:write(depending on whether you are reading or creating subscriptions). For this tutorial, you needeventstream:writeto create the subscription andeventstream:readto validate it. - AWS Account: An active AWS account with permissions to create EventBridge rules, SQS queues, and IAM roles.
- Genesys Cloud Region: You must know your Genesys Cloud region (e.g.,
mypurecloud.com,usw2.pure.cloud,euw1.pure.cloud). - Python Environment: Python 3.8+ with
genesyscloud(v2.20+) andboto3installed.
pip install genesyscloud boto3
- Queue ID: The UUID of the Genesys Cloud Queue you wish to monitor. You can find this in the Admin UI or via the
/api/v2/queuesendpoint.
Authentication Setup
Genesys Cloud uses OAuth 2.0 Client Credentials Grant. You must obtain a valid access token before making any API calls. The token expires after one hour, so production code should handle refresh logic. For this tutorial, we will assume a function get_access_token handles the credential exchange.
import requests
import os
from typing import Optional
def get_access_token(client_id: str, client_secret: str, region: str) -> str:
"""
Exchange Client ID and Secret for an OAuth Access Token.
"""
token_url = f"https://{region}/oauth/token"
payload = {
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret
}
response = requests.post(token_url, data=payload)
if response.status_code != 200:
raise Exception(f"Failed to obtain token: {response.text}")
return response.json().get("access_token")
Implementation
Step 1: Create the EventStream Subscription
The core filtering happens at the Genesys Cloud EventStream level. You must define a filter object in the subscription request. The filter syntax follows the Genesys Cloud EventStream query language.
To filter for conversation.end events for a specific queue, you must filter on:
event.type: Must beconversation.end.conversation.queueIds: Must contain the specific Queue ID.
Note: The conversation.queueIds field is an array. You filter using the contains operator or by checking if the specific ID is in the list.
The API Request
Endpoint: POST /api/v2/eventstreams/subscriptions
Required Scope: eventstream:write
Request Body:
{
"name": "Queue-Specific-End-Events",
"description": "Emits conversation.end events for Queue ID: 12345678-1234-1234-1234-123456789012",
"filter": {
"eventType": "conversation.end",
"conditions": [
{
"field": "conversation.queueIds",
"operator": "contains",
"value": "12345678-1234-1234-1234-123456789012"
}
]
},
"deliveryMethod": {
"type": "aws-eventbridge",
"config": {
"region": "us-east-1",
"accountId": "123456789012",
"eventBusName": "default"
}
}
}
Python SDK Implementation
from genesyscloud import rest
from genesyscloud.eventstream.api import event_stream_api
def create_eventstream_subscription(
api_client: rest.ApiClient,
queue_id: str,
aws_region: str,
aws_account_id: str,
event_bus_name: str = "default"
) -> dict:
"""
Creates an EventStream subscription for conversation.end events
filtered by a specific Queue ID.
"""
api_instance = event_stream_api.EventStreamApi(api_client)
# Construct the filter object
# The 'conditions' array allows for complex filtering logic.
# Here we ensure the event type is conversation.end AND the queue ID is present.
filter_obj = {
"eventType": "conversation.end",
"conditions": [
{
"field": "conversation.queueIds",
"operator": "contains",
"value": queue_id
}
]
}
# Construct the delivery configuration
delivery_config = {
"type": "aws-eventbridge",
"config": {
"region": aws_region,
"accountId": aws_account_id,
"eventBusName": event_bus_name
}
}
# Create the subscription request body
subscription_body = {
"name": f"Queue-{queue_id}-End-Events",
"description": f"Tracks conversation.end for queue {queue_id}",
"filter": filter_obj,
"deliveryMethod": delivery_config
}
try:
# Call the API
response = api_instance.post_eventstreams_subscriptions(body=subscription_body)
if response.status_code == 201:
print(f"Subscription created successfully. ID: {response.body.id}")
return response.body
else:
raise Exception(f"API Error {response.status_code}: {response.body}")
except Exception as e:
print(f"Failed to create subscription: {e}")
raise
Step 2: Configure AWS EventBridge Rule
While Genesys Cloud pushes events to the EventBridge bus, you need an EventBridge Rule to route these events to your consumer (e.g., SQS, Lambda, or HTTP Endpoint). The rule should filter further if necessary, although the Genesys filter is usually sufficient.
EventBridge Pattern:
{
"source": ["genesys.cloud"],
"detail-type": ["Genesys Cloud EventStream Event"],
"detail": {
"eventType": ["conversation.end"]
}
}
Boto3 Implementation to Create the Rule:
import boto3
import json
def setup_eventbridge_rule(
aws_region: str,
rule_name: str,
target_arn: str # ARN of SQS Queue or Lambda
) -> dict:
"""
Creates an EventBridge rule to capture Genesys conversation.end events.
"""
client = boto3.client('events', region_name=aws_region)
# The event pattern matches Genesys Cloud events
event_pattern = {
"source": ["genesys.cloud"],
"detail-type": ["Genesys Cloud EventStream Event"],
"detail": {
"eventType": ["conversation.end"]
}
}
try:
response = client.put_rule(
Name=rule_name,
EventPattern=json.dumps(event_pattern),
State='ENABLED',
Description='Rule to capture Genesys conversation.end events'
)
# Add the target (SQS/Lambda)
client.put_targets(
Rule=rule_name,
Targets=[
{
'Id': 'Target1',
'Arn': target_arn
}
]
)
return response
except Exception as e:
print(f"Failed to setup EventBridge rule: {e}")
raise
Step 3: Process the Event Payload
When the event arrives at your AWS target (e.g., SQS), it will be wrapped in an EventBridge envelope. You must parse this envelope to extract the actual Genesys Cloud event data.
EventBridge Envelope Structure:
{
"version": "0",
"id": "abc123...",
"detail-type": "Genesys Cloud EventStream Event",
"source": "genesys.cloud",
"account": "123456789012",
"time": "2023-10-27T10:00:00Z",
"region": "us-east-1",
"resources": [],
"detail": {
"eventType": "conversation.end",
"conversationId": "conv-uuid",
"queueIds": ["queue-uuid"],
"participants": [ ... ],
"metrics": { ... }
}
}
Python Consumer Logic
import json
from typing import Dict, Any, Optional
def process_eventbridge_event(event: Dict[str, Any], expected_queue_id: str) -> Optional[Dict[str, Any]]:
"""
Processes an incoming EventBridge event.
Validates that the event is for the expected queue and extracts relevant data.
"""
# 1. Extract the Genesys Cloud payload from the EventBridge envelope
detail = event.get("detail")
if not detail:
raise ValueError("Invalid EventBridge event: missing 'detail' field")
event_type = detail.get("eventType")
if event_type != "conversation.end":
# This should be filtered by EventBridge rule, but double-check for safety
return None
# 2. Verify Queue ID (Redundant if EventStream filter is strict, but good practice)
queue_ids = detail.get("queueIds", [])
if expected_queue_id not in queue_ids:
# Log warning: Event passed filter but does not match expected queue
print(f"Warning: Event for queue {queue_ids} does not match expected {expected_queue_id}")
return None
# 3. Extract relevant conversation data
conversation_id = detail.get("conversationId")
participants = detail.get("participants", [])
# Example: Extract agent wrap-up code or disposition if available
# Note: Wrap-up codes are often in the 'participants' array under 'wrapupCode'
agent_wrapup = None
for p in participants:
if p.get("role") == "agent":
agent_wrapup = p.get("wrapupCode")
break
return {
"conversationId": conversation_id,
"queueId": expected_queue_id,
"agentWrapup": agent_wrapup,
"timestamp": event.get("time")
}
Complete Working Example
This script combines authentication, subscription creation, and a mock consumer logic. It assumes you have environment variables set for your credentials.
import os
import sys
import json
import boto3
from genesyscloud import rest
from genesyscloud.eventstream.api import event_stream_api
# Configuration
GENESYS_REGION = os.getenv("GENESYS_REGION", "mypurecloud.com")
GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
TARGET_QUEUE_ID = os.getenv("TARGET_QUEUE_ID") # The Genesys Queue UUID
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
AWS_ACCOUNT_ID = os.getenv("AWS_ACCOUNT_ID")
SQS_QUEUE_ARN = os.getenv("SQS_QUEUE_ARN") # ARN of the SQS queue to receive events
def get_access_token() -> str:
token_url = f"https://{GENESYS_REGION}/oauth/token"
payload = {
"grant_type": "client_credentials",
"client_id": GENESYS_CLIENT_ID,
"client_secret": GENESYS_CLIENT_SECRET
}
response = requests.post(token_url, data=payload)
if response.status_code != 200:
raise Exception(f"OAuth Error: {response.text}")
return response.json().get("access_token")
def setup_genesis_subscription(api_client: rest.ApiClient) -> str:
api_instance = event_stream_api.EventStreamApi(api_client)
filter_obj = {
"eventType": "conversation.end",
"conditions": [
{
"field": "conversation.queueIds",
"operator": "contains",
"value": TARGET_QUEUE_ID
}
]
}
delivery_config = {
"type": "aws-eventbridge",
"config": {
"region": AWS_REGION,
"accountId": AWS_ACCOUNT_ID,
"eventBusName": "default"
}
}
subscription_body = {
"name": f"Queue-{TARGET_QUEUE_ID}-End-Events",
"description": f"Tracks conversation.end for queue {TARGET_QUEUE_ID}",
"filter": filter_obj,
"deliveryMethod": delivery_config
}
response = api_instance.post_eventstreams_subscriptions(body=subscription_body)
if response.status_code == 201:
return response.body.id
else:
raise Exception(f"Failed to create subscription: {response.body}")
def setup_aws_rule(rule_name: str) -> None:
client = boto3.client('events', region_name=AWS_REGION)
event_pattern = {
"source": ["genesys.cloud"],
"detail-type": ["Genesys Cloud EventStream Event"],
"detail": {
"eventType": ["conversation.end"]
}
}
client.put_rule(
Name=rule_name,
EventPattern=json.dumps(event_pattern),
State='ENABLED',
Description='Rule to capture Genesys conversation.end events'
)
client.put_targets(
Rule=rule_name,
Targets=[
{
'Id': 'Target1',
'Arn': SQS_QUEUE_ARN
}
]
)
def main():
if not all([GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, TARGET_QUEUE_ID, SQS_QUEUE_ARN]):
print("Missing environment variables. Check README.")
sys.exit(1)
try:
# 1. Authenticate
token = get_access_token()
config = rest.Configuration()
config.host = f"https://{GENESYS_REGION}"
config.access_token = token
api_client = rest.ApiClient(config)
# 2. Create Genesys Subscription
sub_id = setup_genesis_subscription(api_client)
print(f"Created Genesys Subscription: {sub_id}")
# 3. Setup AWS EventBridge Rule
rule_name = f"Genesys-Queue-{TARGET_QUEUE_ID}-Ends"
setup_aws_rule(rule_name)
print(f"Created EventBridge Rule: {rule_name}")
print("Setup complete. Events will now flow to your SQS queue.")
except Exception as e:
print(f"Error during setup: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
Common Errors & Debugging
Error: 400 Bad Request on Subscription Creation
- Cause: The filter syntax is invalid, or the Queue ID does not exist.
- Fix: Verify the Queue ID exists via
GET /api/v2/queues/{queueId}. Ensure theconditionsarray uses the correct field nameconversation.queueIdsand operatorcontains. - Debug Code:
# Check if queue exists before creating subscription
api_instance = queues_api.QueuesApi(api_client)
try:
queue = api_instance.get_queue(queue_id)
print(f"Queue found: {queue.body.name}")
except Exception as e:
print(f"Queue not found or error: {e}")
Error: Events Not Arriving in SQS
- Cause: The EventBridge Rule pattern does not match the event source, or the IAM role lacks permissions to write to the SQS queue.
- Fix:
- Check CloudWatch Logs for EventBridge rule failures.
- Ensure the SQS Queue Policy allows
events.amazonaws.comto send messages. - Verify the Genesys Cloud subscription status is
active.
- Debug Code:
# Check subscription status
api_instance = event_stream_api.EventStreamApi(api_client)
response = api_instance.get_eventstreams_subscriptions(subscription_id=sub_id)
print(f"Subscription Status: {response.body.status}")
Error: 401 Unauthorized
- Cause: The OAuth token has expired or is invalid.
- Fix: Implement token refresh logic. The
get_access_tokenfunction should be called before every API request if the token is older than 55 minutes.