Filter Genesys Cloud EventBridge Events for Specific Queue Conversation Ends
What You Will Build
- A Python script that registers an Event Subscription in Genesys Cloud to listen for conversation.end events.
- Logic to filter these events so that only conversations associated with a specific queue are processed.
- A working AWS Lambda function skeleton that receives the EventBridge payload, validates the source, and extracts conversation details.
Prerequisites
- Platform: Genesys Cloud CX
- OAuth Client Type: Private Client (for service-to-service authentication) or Public Client with PKCE. This tutorial uses a Private Client for simplicity.
- Required Scopes: event:subscriptions:read, event:subscriptions:write, queue:read
- Language: Python 3.9+
- Dependencies: requests, PureCloudPlatformClientV2 (the Genesys Cloud Python SDK), boto3 (for AWS Lambda context), pydantic (for payload validation)
- AWS Account: An active AWS account with permissions to create EventBridge rules and Lambda functions.
Authentication Setup
Genesys Cloud API access requires OAuth 2.0. For server-side integrations like EventBridge subscriptions, a Private Client is the standard approach. You must obtain a Client ID and Client Secret from the Genesys Cloud Admin Console under Admin > Integrations > OAuth.
The following Python class handles token acquisition. For simplicity it requests a fresh token on every call. In production, cache the token instead of requesting a new one for every event, since each token stays valid for the number of seconds reported in the response's expires_in field.
import os
import time
from typing import Optional

import requests


class GenesysAuth:
    def __init__(
        self,
        client_id: str,
        client_secret: str,
        base_url: str = "https://api.mypurecloud.com",
        login_url: str = "https://login.mypurecloud.com",
    ):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url
        # Tokens are issued by the login host, not the API host
        self.token_url = f"{login_url}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0

    def get_access_token(self) -> str:
        """
        Retrieves an OAuth access token using the client credentials grant.
        In a production Lambda, you would cache this token in a global variable
        or use a shared layer to avoid re-authenticating on every invocation.
        """
        # For this tutorial, we fetch fresh every time to ensure validity
        data = {"grant_type": "client_credentials"}
        try:
            # requests builds the Basic Authorization header from the auth tuple
            response = requests.post(
                self.token_url,
                data=data,
                auth=(self.client_id, self.client_secret),
                timeout=10,
            )
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            # Store the absolute time at which the token expires
            self.token_expiry = time.time() + token_data.get("expires_in", 3600)
            return self.access_token
        except requests.exceptions.HTTPError as e:
            status = e.response.status_code
            if status == 401:
                raise Exception("Authentication failed: Invalid Client ID or Secret.")
            elif status == 403:
                raise Exception("Authentication failed: Client lacks permission to request tokens.")
            else:
                raise Exception(f"Authentication error: {e}")
        except requests.exceptions.RequestException as e:
            raise Exception(f"Network error during authentication: {e}")
# Usage
# auth = GenesysAuth(os.environ["GENESYS_CLIENT_ID"], os.environ["GENESYS_CLIENT_SECRET"])
# token = auth.get_access_token()
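The caching recommended above can be sketched as a small wrapper that reuses a token until shortly before it expires. This is a minimal illustration, not part of the Genesys SDK: CachedToken, its fetch callable (returning the token and its lifetime in seconds), and the 60-second safety margin are all hypothetical names and values chosen for the example.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Caches a token until shortly before it expires.

    `fetch` is a hypothetical callable returning (access_token, expires_in_seconds);
    in this tutorial it would wrap the POST to the OAuth token endpoint.
    """

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, int]],
        safety_margin: int = 60,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch
        self._safety_margin = safety_margin
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at: float = 0.0

    def get(self) -> str:
        # Refresh when nothing is cached or the cached token is about to expire
        if self._token is None or self._clock() >= self._expires_at:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = self._clock() + max(expires_in - self._safety_margin, 0)
        return self._token
```

In a Lambda function, creating one such object at module level would let warm invocations share the token, since module state persists between invocations of the same execution environment.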
Implementation
Step 1: Identify the Target Queue ID
To filter by queue, you need the specific id of the queue in Genesys Cloud. The EventBridge filter uses this ID. You can retrieve this via the Queues API.
from typing import Optional

import requests


def get_queue_id(auth: GenesysAuth, queue_name: str) -> Optional[str]:
    """
    Searches the first page of queues for an exact name match and returns its ID.
    """
    token = auth.get_access_token()
    url = f"{auth.base_url}/api/v2/routing/queues"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    # Pagination parameters: only the first 100 queues are inspected
    params = {
        "pageSize": 100,
        "pageNumber": 1
    }
    try:
        response = requests.get(url, headers=headers, params=params, timeout=10)
        response.raise_for_status()
        queues = response.json().get("entities", [])
        for queue in queues:
            if queue.get("name") == queue_name:
                return queue.get("id")
        return None
    except requests.exceptions.HTTPError as e:
        print(f"Error fetching queues: {e}")
        return None
# Example usage:
# queue_id = get_queue_id(auth, "Support-English")
# if not queue_id:
# print("Queue not found.")
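The lookup above reads a single page of results, so an organization with more queues than one page holds could miss the target. A hedged sketch of walking every page follows. The fetch_page callable is hypothetical (it would wrap the GET request for one page number), and the sketch assumes the response body carries an entities list and, optionally, a pageCount field.

```python
from typing import Any, Callable, Dict, Optional


def find_queue_id(
    fetch_page: Callable[[int], Dict[str, Any]],
    queue_name: str,
    max_pages: int = 50,
) -> Optional[str]:
    """Walks result pages until a queue with the exact name is found.

    `fetch_page(page_number)` is a hypothetical callable that returns the parsed
    JSON body of one page of queue results, with an "entities" list.
    """
    for page_number in range(1, max_pages + 1):
        body = fetch_page(page_number)
        entities = body.get("entities", [])
        for queue in entities:
            if queue.get("name") == queue_name:
                return queue.get("id")
        # Stop on an empty page or once the reported last page has been read
        page_count = body.get("pageCount")
        if not entities or (page_count is not None and page_number >= page_count):
            break
    return None
```

The max_pages cap is an arbitrary guard against looping forever if the paging fields are missing or inconsistent.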
Step 2: Register the Event Subscription
Genesys Cloud EventBridge integration works by creating an Event Subscription in Genesys Cloud that points to an EventBridge Partner Event Source. You must first have the Partner Event Source configured in the Genesys Admin Console.
- Go to Admin > Integrations > EventBridge.
- Connect your AWS Account.
- Note the Event Source ARN provided by AWS after the connection is established.
Now, use the Genesys SDK to create the subscription. The critical part is the eventTypes list, which limits the subscription at the source to conversation.end events.
Note: The Genesys Event Subscription API primarily filters by eventTypes, so fine-grained filtering by queue ID is done with an EventBridge Rule in AWS rather than in the subscription itself. This tutorial subscribes to every conversation.end event and applies the queue filter in the AWS EventBridge Rule.
from genesyscloud import event_subscription_builder
from PureCloudPlatformClientV2 import (
    ApiClient,
    Configuration,
    EventSubscriptionApi,
    EventSubscriptionEventTypes,
)


def create_event_subscription(auth: GenesysAuth, event_source_arn: str, subscription_name: str) -> dict:
    """
    Creates an Event Subscription in Genesys Cloud that sends events to EventBridge.
    """
    # Initialize SDK
    configuration = Configuration()
    configuration.host = auth.base_url
    configuration.access_token = auth.get_access_token()
    api_client = ApiClient(configuration)
    event_api = EventSubscriptionApi(api_client)

    # Define the event types to listen to
    # We specifically want conversation.end
    event_types = [
        EventSubscriptionEventTypes().event_type("conversation.end")
    ]

    # Define the filter
    # Note: Genesys Event Subscriptions allow basic filtering.
    # For queue-specific filtering, it is often more robust to capture all conversation.end
    # and filter in AWS EventBridge, as the Genesys filter syntax for nested objects
    # can be complex and limited in real-time push scenarios.
    # Here we create a basic subscription.
    subscription_body = event_subscription_builder.EventSubscriptionBuilder(
        name=subscription_name,
        description="Subscription for Queue Conversation Ends",
        event_types=event_types,
        delivery_mode="eventbridge",
        eventbridge_config=event_subscription_builder.EventBridgeDeliveryConfig(
            event_source_arn=event_source_arn
        )
    ).build()

    try:
        response = event_api.post_eventsubscriptions(body=subscription_body)
        print(f"Subscription created with ID: {response.id}")
        return response
    except Exception as e:
        print(f"Error creating subscription: {e}")
        raise
Step 3: Configure AWS EventBridge Rule for Queue Filtering
Since Genesys Cloud pushes the event to EventBridge, you must create an EventBridge Rule to filter these events. This is where the “only receive for a specific queue” logic lives.
The event payload from Genesys Cloud looks like this (simplified):
{
  "version": "0",
  "id": "unique-event-id",
  "detail-type": "conversation.end",
  "source": "genesys.cloud",
  "account": "123456789012",
  "time": "2023-10-27T10:00:00Z",
  "region": "us-east-1",
  "resources": [],
  "detail": {
    "conversationId": "conv-123",
    "eventType": "conversation.end",
    "entity": {
      "id": "conv-123",
      "type": "conversation",
      "attributes": {},
      "wrapUpCode": null,
      "queue": {
        "id": "queue-id-abc",
        "name": "Support-English"
      },
      "participants": []
    }
  }
}
To filter for a specific queue, you create an EventBridge Rule with the following pattern:
{
  "source": ["genesys.cloud"],
  "detail-type": ["conversation.end"],
  "detail": {
    "entity": {
      "queue": {
        "id": ["YOUR_QUEUE_ID_HERE"]
      }
    }
  }
}
Steps to create this rule in AWS Console:
- Go to EventBridge > Rules > Create Rule.
- Name: GenesysQueueConversationEnd.
- Event Source: Event Pattern.
- Paste the JSON pattern above, replacing YOUR_QUEUE_ID_HERE with the ID from Step 1.
- Target: Select your Lambda Function.
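The same rule can also be created from code instead of the console. The sketch below builds the pattern from Step 3 for a given queue ID and registers it with the boto3 EventBridge client calls put_rule and put_targets. The function names, the event_bus_name parameter, and the target Id "queue-end-lambda" are assumptions made for illustration.

```python
import json
from typing import Any, Dict


def build_queue_pattern(queue_id: str) -> Dict[str, Any]:
    """Builds the event pattern shown above for a single queue ID."""
    return {
        "source": ["genesys.cloud"],
        "detail-type": ["conversation.end"],
        "detail": {"entity": {"queue": {"id": [queue_id]}}},
    }


def create_queue_rule(
    queue_id: str,
    lambda_arn: str,
    event_bus_name: str,
    rule_name: str = "GenesysQueueConversationEnd",
) -> str:
    """Creates the rule and attaches the Lambda target; returns the rule ARN."""
    import boto3  # imported here so the pattern helper works without boto3

    events = boto3.client("events")
    rule = events.put_rule(
        Name=rule_name,
        EventBusName=event_bus_name,
        EventPattern=json.dumps(build_queue_pattern(queue_id)),
        State="ENABLED",
    )
    events.put_targets(
        Rule=rule_name,
        EventBusName=event_bus_name,
        Targets=[{"Id": "queue-end-lambda", "Arn": lambda_arn}],
    )
    return rule["RuleArn"]
```

Unlike the console flow, this sketch does not grant EventBridge permission to invoke the function. That requires a separate statement in the Lambda function's resource-based policy.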
Step 4: Process the Event in AWS Lambda
Now, implement the Lambda function that receives the filtered event. This function should validate the payload and process the business logic.
import json
import logging
import os
from typing import Any, Dict

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    AWS Lambda handler for Genesys Cloud EventBridge events.
    """
    logger.info(f"Received event: {json.dumps(event)}")

    # 1. Validate the event structure
    if event.get("source") != "genesys.cloud":
        logger.error(f"Invalid source: {event.get('source')}")
        return {
            "statusCode": 400,
            "body": json.dumps({"message": "Invalid event source"})
        }
    if event.get("detail-type") != "conversation.end":
        logger.error(f"Invalid detail-type: {event.get('detail-type')}")
        return {
            "statusCode": 400,
            "body": json.dumps({"message": "Invalid event type"})
        }
    detail = event.get("detail", {})
    entity = detail.get("entity", {})

    # 2. Extract Conversation and Queue Information
    conversation_id = entity.get("id")
    queue_id = entity.get("queue", {}).get("id")
    queue_name = entity.get("queue", {}).get("name")

    # 3. Double-check queue ID (Defense in Depth)
    # Although EventBridge filtered this, it is good practice to verify.
    expected_queue_id = os.environ.get("EXPECTED_QUEUE_ID")
    if queue_id != expected_queue_id:
        logger.warning(f"Queue ID mismatch. Expected: {expected_queue_id}, Received: {queue_id}")
        # Depending on your needs, you might return an error or just log it.
        # Since EventBridge filtered it, this should rarely happen.

    # 4. Process the Conversation End
    # Example: Save to DynamoDB, send to Slack, update CRM
    try:
        process_conversation_end(conversation_id, queue_id, queue_name, entity)
        return {
            "statusCode": 200,
            "body": json.dumps({"message": "Event processed successfully"})
        }
    except Exception as e:
        logger.error(f"Error processing event: {e}")
        # EventBridge invokes Lambda asynchronously and ignores the return value.
        # Re-raise so the invocation is recorded as failed and the configured
        # retry policy (and any dead-letter queue) applies.
        raise


def process_conversation_end(conversation_id: str, queue_id: str, queue_name: str, entity: Dict[str, Any]):
    """
    Placeholder for business logic.
    """
    logger.info(f"Processing conversation end for ID: {conversation_id} in queue: {queue_name}")
    # Example: Extract participant details
    participants = entity.get("participants", [])
    for participant in participants:
        role = participant.get("role")
        if role == "agent":
            agent_id = participant.get("user", {}).get("id")
            logger.info(f"Agent {agent_id} completed conversation {conversation_id}")
    # Add your integration logic here (e.g., boto3 call to DynamoDB)
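The handler reads fields with chained .get() calls, which silently yields None when the payload shape differs from what is expected. One way to make the validation explicit is to parse the event into a small typed object first. The sketch below uses a standard-library dataclass rather than pydantic to stay dependency-free. ConversationEnd and from_event are hypothetical names, and the field paths follow the simplified payload shown in Step 3.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class ConversationEnd:
    conversation_id: str
    queue_id: Optional[str]
    queue_name: Optional[str]

    @classmethod
    def from_event(cls, event: Dict[str, Any]) -> "ConversationEnd":
        """Extracts the fields the handler uses, raising ValueError if malformed."""
        detail = event.get("detail")
        if not isinstance(detail, dict):
            raise ValueError("event has no 'detail' object")
        entity = detail.get("entity")
        if not isinstance(entity, dict):
            raise ValueError("event detail has no 'entity' object")
        conversation_id = entity.get("id")
        if not isinstance(conversation_id, str) or not conversation_id:
            raise ValueError("entity has no conversation id")
        # A missing or null queue is tolerated and reported as None
        queue = entity.get("queue")
        if not isinstance(queue, dict):
            queue = {}
        return cls(conversation_id, queue.get("id"), queue.get("name"))
```

A handler could call ConversationEnd.from_event(event) at the top and treat a ValueError as a malformed event, keeping the business logic free of shape checks.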
Complete Working Example
This is a consolidated Python script that can be run locally to test the Genesys side (Authentication + Queue Lookup + Subscription Creation). The AWS side requires the Lambda code and EventBridge configuration described above.
import os
import sys
from typing import Optional

import requests

# Add your local path to the Genesys SDK if not installed globally
# sys.path.append('/path/to/genesys-cloud-sdk-python')
try:
    from PureCloudPlatformClientV2 import ApiClient, Configuration, EventSubscriptionApi, EventSubscriptionEventTypes
    from genesyscloud import event_subscription_builder
except ImportError:
    print("Please install the Genesys Cloud SDK: pip install PureCloudPlatformClientV2")
    sys.exit(1)


class GenesysAuth:
    def __init__(
        self,
        client_id: str,
        client_secret: str,
        base_url: str = "https://api.mypurecloud.com",
        login_url: str = "https://login.mypurecloud.com",
    ):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url
        # Tokens are issued by the login host, not the API host
        self.token_url = f"{login_url}/oauth/token"

    def get_access_token(self) -> str:
        data = {"grant_type": "client_credentials"}
        # requests builds the Basic Authorization header from the auth tuple
        response = requests.post(
            self.token_url,
            data=data,
            auth=(self.client_id, self.client_secret),
            timeout=10,
        )
        response.raise_for_status()
        return response.json()["access_token"]


def get_queue_id(auth: GenesysAuth, queue_name: str) -> Optional[str]:
    token = auth.get_access_token()
    url = f"{auth.base_url}/api/v2/routing/queues"
    headers = {"Authorization": f"Bearer {token}"}
    params = {"pageSize": 100, "pageNumber": 1}
    response = requests.get(url, headers=headers, params=params, timeout=10)
    response.raise_for_status()
    queues = response.json().get("entities", [])
    for queue in queues:
        if queue.get("name") == queue_name:
            return queue.get("id")
    return None


def create_event_subscription(auth: GenesysAuth, event_source_arn: str, subscription_name: str) -> dict:
    configuration = Configuration()
    configuration.host = auth.base_url
    configuration.access_token = auth.get_access_token()
    api_client = ApiClient(configuration)
    event_api = EventSubscriptionApi(api_client)
    event_types = [EventSubscriptionEventTypes().event_type("conversation.end")]
    subscription_body = event_subscription_builder.EventSubscriptionBuilder(
        name=subscription_name,
        description="Subscription for Queue Conversation Ends",
        event_types=event_types,
        delivery_mode="eventbridge",
        eventbridge_config=event_subscription_builder.EventBridgeDeliveryConfig(
            event_source_arn=event_source_arn
        )
    ).build()
    response = event_api.post_eventsubscriptions(body=subscription_body)
    return response


if __name__ == "__main__":
    CLIENT_ID = os.environ.get("GENESYS_CLIENT_ID")
    CLIENT_SECRET = os.environ.get("GENESYS_CLIENT_SECRET")
    QUEUE_NAME = os.environ.get("TARGET_QUEUE_NAME", "Support-English")
    EVENT_SOURCE_ARN = os.environ.get("EVENTBRIDGE_SOURCE_ARN")

    if not all([CLIENT_ID, CLIENT_SECRET, EVENT_SOURCE_ARN]):
        print("Missing environment variables. Please set GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, EVENTBRIDGE_SOURCE_ARN")
        sys.exit(1)

    auth = GenesysAuth(CLIENT_ID, CLIENT_SECRET)

    print(f"Looking up queue ID for: {QUEUE_NAME}")
    queue_id = get_queue_id(auth, QUEUE_NAME)
    if not queue_id:
        print("Queue not found. Please check the queue name.")
        sys.exit(1)
    print(f"Found Queue ID: {queue_id}")
    print("IMPORTANT: Use this Queue ID in your AWS EventBridge Rule filter.")

    print("Creating Event Subscription...")
    try:
        subscription = create_event_subscription(auth, EVENT_SOURCE_ARN, "QueueEndSub")
        print(f"Subscription created successfully. ID: {subscription.id}")
    except Exception as e:
        print(f"Failed to create subscription: {e}")
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: Invalid Client ID, Client Secret, or expired token.
- Fix: Verify the credentials in the Genesys Admin Console. Ensure the OAuth Client has the event:subscriptions:write scope.
Error: 403 Forbidden
- Cause: The OAuth Client does not have sufficient permissions.
- Fix: In the Genesys Admin Console, go to Admin > Integrations > OAuth. Edit your client and ensure it is assigned a role that has permissions for Event Subscriptions and Queues.
Error: EventBridge Rule Not Triggering
- Cause: The filter pattern in EventBridge does not match the incoming payload.
- Fix:
  - In the EventBridge Console, open Event Buses, select the bus that receives the Genesys events, and check its archive (create an archive if none exists so that all events are captured).
  - Look for a conversation.end event.
  - Compare the detail.entity.queue.id in the archived event with the ID in your Rule pattern.
  - Ensure the Rule pattern uses detail.entity.queue.id exactly. Note that Genesys payloads are nested.
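When comparing an archived event against the rule pattern, it can help to check the match locally before editing the rule. The function below is a deliberately simplified matcher, not a reimplementation of EventBridge. It handles only nested objects and lists of exact values, which is all the pattern in Step 3 uses. The name matches_pattern is hypothetical.

```python
from typing import Any, Dict


def matches_pattern(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Simplified matcher: nested objects recurse, lists mean "any of these exact values".

    Prefix, numeric, anything-but, and other EventBridge operators are not
    implemented, and array-valued event fields are not expanded.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Descend into nested objects such as detail.entity.queue
            if not isinstance(actual, dict) or not matches_pattern(expected, actual):
                return False
        elif isinstance(expected, list):
            # A leaf list matches when the event value equals one of its entries
            if actual not in expected:
                return False
        else:
            # Leaves in an event pattern must be lists; anything else never matches
            return False
    return True
```

Running this against a captured payload shows quickly whether a mismatch comes from the queue ID value or from the nesting path.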
Error: Genesys Subscription Fails to Create
- Cause: The event_source_arn is invalid or the EventBridge Partner Event Source is not active.
- Fix: Go to Admin > Integrations > EventBridge in Genesys. Ensure the status is Connected. Copy the ARN directly from this page.