Filter EventBridge Events for Specific Queue Conversation Ends in Genesys Cloud
What You Will Build
- You will configure a Genesys Cloud event subscription to trigger on conversation.end events.
- You will apply an attribute filter to isolate events for a specific Queue ID.
- You will implement a Python worker using boto3 to receive, parse, and process these events from an AWS SQS queue.
Prerequisites
- Genesys Cloud Account: A Developer or Admin role with permissions to manage Event Subscriptions.
- AWS Account: An active SQS queue and an IAM role with sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes permissions.
- OAuth 2.0 Client: A Genesys Cloud OAuth client with the event:subscribe:read and event:subscribe:write scopes.
- Python 3.9+: With pip installed.
- Dependencies: requests, boto3, purecloud-platform-client.
pip install requests boto3 purecloud-platform-client
Authentication Setup
Genesys Cloud uses OAuth 2.0 Client Credentials flow for server-to-server API calls. You must obtain an access token before creating or managing event subscriptions.
import requests
import os
from typing import Optional


def get_genesys_access_token(
    client_id: str,
    client_secret: str,
    environment: str = "mypurecloud.com"
) -> str:
    """
    Retrieves a Genesys Cloud OAuth access token.

    Args:
        client_id: The OAuth Client ID from Genesys Cloud.
        client_secret: The OAuth Client Secret.
        environment: The Genesys Cloud environment suffix (e.g., mypurecloud.com).

    Returns:
        str: The access token.

    Raises:
        requests.exceptions.HTTPError: If authentication fails.
    """
    url = f"https://login.{environment}/oauth/token"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "event:subscribe:read event:subscribe:write"
    }
    # A timeout keeps the call from hanging indefinitely on network problems.
    response = requests.post(url, headers=headers, data=data, timeout=10)
    response.raise_for_status()
    token_data = response.json()
    # Index directly so a missing token raises KeyError instead of returning None.
    return token_data["access_token"]


# Example usage
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
ACCESS_TOKEN = get_genesys_access_token(CLIENT_ID, CLIENT_SECRET)
Implementation
Step 1: Create the Event Subscription with Attribute Filtering
Genesys Cloud Event Subscriptions allow filtering at the source using JSONata expressions in the attributeFilter field. This is critical for reducing noise. If you do not filter here, your AWS infrastructure receives every conversation.end event in your entire organization, which incurs unnecessary costs and processing load.
We will create a subscription that:
- Subscribes to event:conversation:end.
- Filters for a specific queueId.
- Targets an AWS SQS queue as the endpoint.
Required Scope: event:subscribe:write
import json
import requests


def create_filtered_event_subscription(
    access_token: str,
    queue_id: str,
    sqs_arn: str,
    region: str,
    environment: str = "mypurecloud.com"
) -> dict:
    """
    Creates a Genesys Cloud event subscription filtered for a specific queue.

    Args:
        access_token: Valid Genesys Cloud OAuth token.
        queue_id: The Genesys Cloud Queue ID to filter on.
        sqs_arn: The ARN of the target AWS SQS queue.
        region: The AWS region of the SQS queue.
        environment: Genesys Cloud environment suffix.

    Returns:
        dict: The response from the Genesys Cloud API.
    """
    url = f"https://api.{environment}/api/v2/events/subscriptions"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    # The attributeFilter uses JSONata.
    # This filter assumes 'queueId' is a top-level attribute of the event.
    # Depending on the event version, the queue ID may instead sit inside
    # a 'queue' object; see the note after this code.
    payload = {
        "name": f"Queue-Specific-End-{queue_id}",
        "description": "Filters conversation.end events for a specific queue.",
        "eventTypes": [
            "event:conversation:end"
        ],
        "attributeFilter": f"queueId = '{queue_id}'",
        "destination": {
            "type": "aws_sqs",
            "awsSqs": {
                "arn": sqs_arn,
                "region": region
            }
        },
        "status": "active"
    }
    response = requests.post(url, headers=headers, json=payload)
    if response.status_code == 201:
        print("Subscription created successfully.")
        return response.json()
    if response.status_code == 409:
        print("Subscription already exists or conflict.")
        return response.json()
    # Raise on 4xx/5xx. Any other non-error status is returned as-is,
    # so the function never falls through and returns None.
    response.raise_for_status()
    return response.json()


# Example Usage
# QUEUE_ID = "a1b2c3d4-e5f6-7890-1234-567890abcdef"
# SQS_ARN = "arn:aws:sqs:us-east-1:123456789012:my-queue-endpoints"
# create_filtered_event_subscription(ACCESS_TOKEN, QUEUE_ID, SQS_ARN, "us-east-1")
Critical Note on Attribute Filtering: The attributeFilter string queueId = '...' assumes that queueId is a top-level attribute in the conversation.end event payload. Genesys Cloud conversation events also include a queue object. If your conversations involve multiple queues (for example, after a transfer), you may need a more specific JSONata expression such as $exists(queue) and queue.id = 'YOUR_QUEUE_ID' (JSONata built-in functions carry a $ prefix). For standard single-queue interactions, the direct attribute match is efficient.
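The two filter shapes described in the note can be mirrored locally, which helps when checking which one your events actually satisfy. The following is a minimal sketch, not part of any Genesys SDK: the helper names event_queue_ids and matches_queue are hypothetical, and the two payload locations (a top-level queueId and a nested queue.id) are the ones this guide assumes.

```python
from typing import Any, Dict, List


def event_queue_ids(attributes: Dict[str, Any]) -> List[str]:
    """Collect queue IDs from both locations this guide assumes:
    a top-level 'queueId' and a nested 'queue' object with an 'id'."""
    ids: List[str] = []
    top_level = attributes.get("queueId")
    if isinstance(top_level, str):
        ids.append(top_level)
    queue = attributes.get("queue")
    if isinstance(queue, dict) and isinstance(queue.get("id"), str):
        # Avoid listing the same ID twice when both locations agree.
        if queue["id"] not in ids:
            ids.append(queue["id"])
    return ids


def matches_queue(attributes: Dict[str, Any], target_queue_id: str) -> bool:
    """Return True if either assumed location carries the target queue ID."""
    return target_queue_id in event_queue_ids(attributes)
```

Running sample payloads from your own organization through matches_queue is one way to decide whether the simple queueId filter is enough or the nested form is needed.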
Step 2: Verify the Subscription
Before processing events, verify the subscription is active and the filter is applied correctly.
Required Scope: event:subscribe:read
def get_subscription(access_token: str, subscription_id: str, environment: str = "mypurecloud.com") -> dict:
    url = f"https://api.{environment}/api/v2/events/subscriptions/{subscription_id}"
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()


# subscription_data = get_subscription(ACCESS_TOKEN, "subscription-id-from-step-1")
# print(json.dumps(subscription_data, indent=2))
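Reading the printed JSON by eye works, but the same checks can be made explicit. The sketch below is hypothetical: subscription_problems is not an API call, and it assumes the subscription response uses the same field names as the creation payload in Step 1 (status, attributeFilter, eventTypes).

```python
from typing import Any, Dict, List


def subscription_problems(subscription: Dict[str, Any], queue_id: str) -> List[str]:
    """Return human-readable problems; an empty list means all checks passed.

    Assumes the response mirrors the creation payload's field names.
    """
    problems: List[str] = []
    expected_filter = f"queueId = '{queue_id}'"

    status = subscription.get("status")
    if status != "active":
        problems.append(f"status is {status!r}, expected 'active'")

    actual_filter = subscription.get("attributeFilter")
    if actual_filter != expected_filter:
        problems.append(
            f"attributeFilter is {actual_filter!r}, expected {expected_filter!r}"
        )

    # 'or []' guards against the key being present with a null value.
    if "event:conversation:end" not in (subscription.get("eventTypes") or []):
        problems.append("event:conversation:end is missing from eventTypes")

    return problems
```

For example, subscription_problems(subscription_data, QUEUE_ID) could be called right after get_subscription, aborting setup if the returned list is not empty.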
Step 3: Process Events from AWS SQS
Genesys Cloud sends events to your SQS queue in a specific format. The message body is a JSON object containing metadata and the actual event payload. You must parse this structure to extract the conversation details.
The structure of the message body received from Genesys Cloud looks like this:
{
  "eventType": "event:conversation:end",
  "eventTime": "2023-10-27T10:00:00.000Z",
  "attributes": {
    "conversationId": "conv-123",
    "queueId": "queue-456",
    "queue": {
      "id": "queue-456",
      "name": "Support Queue"
    },
    "participants": [...],
    "metrics": [...]
  }
}
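To make the shape above concrete, here is a small sketch that turns a raw message body into a typed record. The ConversationEnd dataclass and parse_conversation_end function are hypothetical names introduced for illustration, and the field names are taken from the sample structure shown above rather than from a published schema.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ConversationEnd:
    """Fields of interest from the sample message structure."""
    conversation_id: Optional[str]
    queue_id: Optional[str]
    event_time: Optional[str]


def parse_conversation_end(body: str) -> Optional[ConversationEnd]:
    """Parse an SQS message body; return None for other event types.

    Raises json.JSONDecodeError if the body is not valid JSON.
    """
    event = json.loads(body)
    if not isinstance(event, dict):
        return None
    if event.get("eventType") != "event:conversation:end":
        return None
    # Treat a missing or null 'attributes' field as an empty object.
    attributes = event.get("attributes") or {}
    return ConversationEnd(
        conversation_id=attributes.get("conversationId"),
        queue_id=attributes.get("queueId"),
        event_time=event.get("eventTime"),
    )
```

Keeping parsing separate from the polling loop means it can be unit-tested with plain strings, without an SQS queue.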
Here is the Python worker code to poll SQS, process valid events, and handle errors.
import boto3
import json
import time
from botocore.exceptions import ClientError
from typing import List, Dict, Any


class GenesysEventProcessor:
    def __init__(self, sqs_queue_url: str, region: str):
        self.sqs_client = boto3.client('sqs', region_name=region)
        self.queue_url = sqs_queue_url
        self.max_wait_time = 5  # Seconds for long polling
        self.max_messages = 10

    def receive_messages(self) -> List[Dict[str, Any]]:
        """
        Polls SQS for new messages.
        """
        try:
            response = self.sqs_client.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=self.max_messages,
                WaitTimeSeconds=self.max_wait_time
            )
            return response.get('Messages', [])
        except ClientError as e:
            print(f"Error receiving messages: {e}")
            return []

    def delete_message(self, receipt_handle: str) -> bool:
        """
        Deletes a message from SQS after processing.
        """
        try:
            self.sqs_client.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=receipt_handle
            )
            return True
        except ClientError as e:
            print(f"Error deleting message: {e}")
            return False

    def process_event(self, event_payload: Dict[str, Any]) -> None:
        """
        Processes a single Genesys Cloud event.
        """
        event_type = event_payload.get("eventType")
        if event_type != "event:conversation:end":
            print(f"Ignoring non-target event type: {event_type}")
            return
        attributes = event_payload.get("attributes", {})
        conversation_id = attributes.get("conversationId")
        queue_id = attributes.get("queueId")
        # Genesys should already have filtered on the queue, so this check
        # only guards against events that arrive without any queue ID.
        # It does not compare the value against a target queue.
        if not queue_id:
            print("Warning: Queue ID not found in attributes.")
            return
        print(f"Processing conversation end: ID={conversation_id}, Queue={queue_id}")
        # TODO: Implement your business logic here
        # e.g., update CRM, trigger analytics, send notification

    def run(self):
        """
        Main loop to poll and process messages.
        """
        print("Starting event processor...")
        while True:
            messages = self.receive_messages()
            for message in messages:
                receipt_handle = message.get('ReceiptHandle')
                body = message.get('Body')
                if not body:
                    print("Empty message body received.")
                    continue
                try:
                    # Parse the JSON body
                    event_data = json.loads(body)
                    # Process the event
                    self.process_event(event_data)
                    # Delete the message from SQS upon success
                    if self.delete_message(receipt_handle):
                        print("Message deleted successfully.")
                    else:
                        print("Failed to delete message. It will be redelivered.")
                except json.JSONDecodeError:
                    print("Failed to parse message body as JSON.")
                    # Optionally send to DLQ or log for debugging
                except Exception as e:
                    print(f"Error processing message: {e}")
                    # Do not delete message; let SQS retry based on visibility timeout
            # Small sleep to prevent tight looping if no messages
            if not messages:
                time.sleep(1)


# Usage
# processor = GenesysEventProcessor("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", "us-east-1")
# processor.run()
Complete Working Example
This script combines authentication, subscription creation (idempotent check), and event processing. It assumes you have environment variables set for credentials.
import os
import json
import time
import requests
import boto3
from botocore.exceptions import ClientError
from typing import Dict, Any, List, Optional

# Configuration
GENESYS_ENV = os.getenv("GENESYS_ENV", "mypurecloud.com")
GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
TARGET_QUEUE_ID = os.getenv("TARGET_QUEUE_ID")
AWS_SQS_ARN = os.getenv("AWS_SQS_ARN")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
AWS_SQS_QUEUE_URL = os.getenv("AWS_SQS_QUEUE_URL")


class GenesysEventBridge:
    def __init__(self):
        self.access_token = None
        self.subscription_id = None

    def authenticate(self) -> bool:
        if self.access_token:
            return True
        try:
            url = f"https://login.{GENESYS_ENV}/oauth/token"
            headers = {"Content-Type": "application/x-www-form-urlencoded"}
            data = {
                "grant_type": "client_credentials",
                "client_id": GENESYS_CLIENT_ID,
                "client_secret": GENESYS_CLIENT_SECRET,
                "scope": "event:subscribe:read event:subscribe:write"
            }
            resp = requests.post(url, headers=headers, data=data)
            resp.raise_for_status()
            self.access_token = resp.json().get("access_token")
            return True
        except Exception as e:
            print(f"Authentication failed: {e}")
            return False

    def ensure_subscription_exists(self) -> bool:
        """
        Checks if subscription exists, creates if not.
        """
        if not self.authenticate():
            return False
        url = f"https://api.{GENESYS_ENV}/api/v2/events/subscriptions"
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }
        # List existing subscriptions to check for duplicates
        resp = requests.get(url, headers=headers)
        if resp.status_code == 200:
            subscriptions = resp.json().get("entities", [])
            for sub in subscriptions:
                if sub.get("attributeFilter") == f"queueId = '{TARGET_QUEUE_ID}'":
                    self.subscription_id = sub.get("id")
                    print(f"Subscription already exists: {self.subscription_id}")
                    return True
        # Create new subscription
        payload = {
            "name": f"Filtered-End-{TARGET_QUEUE_ID}",
            "description": "Auto-created filtered subscription for queue end events.",
            "eventTypes": ["event:conversation:end"],
            "attributeFilter": f"queueId = '{TARGET_QUEUE_ID}'",
            "destination": {
                "type": "aws_sqs",
                "awsSqs": {
                    "arn": AWS_SQS_ARN,
                    "region": AWS_REGION
                }
            },
            "status": "active"
        }
        resp = requests.post(url, headers=headers, json=payload)
        if resp.status_code == 201:
            self.subscription_id = resp.json().get("id")
            print(f"Subscription created: {self.subscription_id}")
            return True
        else:
            print(f"Failed to create subscription: {resp.text}")
            return False

    def process_events(self):
        if not self.ensure_subscription_exists():
            print("Aborting: Could not establish subscription.")
            return
        sqs_client = boto3.client('sqs', region_name=AWS_REGION)
        print("Starting event loop...")
        while True:
            try:
                response = sqs_client.receive_message(
                    QueueUrl=AWS_SQS_QUEUE_URL,
                    MaxNumberOfMessages=10,
                    WaitTimeSeconds=5
                )
                messages = response.get('Messages', [])
                for msg in messages:
                    try:
                        body = json.loads(msg['Body'])
                        self.handle_event(body)
                        sqs_client.delete_message(
                            QueueUrl=AWS_SQS_QUEUE_URL,
                            ReceiptHandle=msg['ReceiptHandle']
                        )
                    except Exception as e:
                        print(f"Error processing message: {e}")
                if not messages:
                    time.sleep(1)
            except ClientError as e:
                print(f"AWS Error: {e}")
                time.sleep(5)

    def handle_event(self, event: Dict[str, Any]):
        if event.get("eventType") == "event:conversation:end":
            attrs = event.get("attributes", {})
            conv_id = attrs.get("conversationId")
            print(f"Received end event for conversation: {conv_id}")
            # Add business logic here


if __name__ == "__main__":
    if not all([GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, TARGET_QUEUE_ID, AWS_SQS_ARN, AWS_SQS_QUEUE_URL]):
        print("Missing environment variables. Please set GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, TARGET_QUEUE_ID, AWS_SQS_ARN, AWS_SQS_QUEUE_URL")
    else:
        app = GenesysEventBridge()
        app.process_events()
Common Errors & Debugging
Error: 403 Forbidden on Subscription Creation
- Cause: The OAuth token lacks the event:subscribe:write scope.
- Fix: Update your OAuth client in Genesys Cloud Admin > Platform > OAuth 2.0 Clients to include event:subscribe:write. Regenerate the token.
Error: No Events Received in SQS
- Cause: The attributeFilter syntax is incorrect, or the Queue ID does not match exactly.
- Fix:
  - Verify the Queue ID in Genesys Cloud Admin > Routing > Queues.
  - Check the attributeFilter in the subscription response. It must be exactly queueId = 'YOUR_QUEUE_ID'.
  - Ensure the queue has active conversations ending. If no conversations end in that queue, no events are generated.
Error: 429 Too Many Requests
- Cause: Your Python script is calling Genesys Cloud APIs too frequently during setup.
- Fix: Implement exponential backoff in your requests calls. For the SQS worker, AWS handles rate limiting, but ensure you do not delete messages before processing completes.
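One way to apply the backoff advice is a small wrapper that retries only on 429. This is a sketch under stated assumptions: backoff_delays and call_with_backoff are hypothetical helpers, the delay values are illustrative defaults, and send can be any zero-argument callable whose result has a status_code attribute (as a requests response does).

```python
import time
from typing import Any, Callable, Iterator


def backoff_delays(
    base: float = 1.0, factor: float = 2.0, cap: float = 30.0, attempts: int = 5
) -> Iterator[float]:
    """Yield exponentially growing delays, each capped at 'cap' seconds."""
    delay = base
    for _ in range(attempts):
        yield min(delay, cap)
        delay *= factor


def call_with_backoff(
    send: Callable[[], Any],
    sleep: Callable[[float], None] = time.sleep,
    attempts: int = 5,
) -> Any:
    """Call send(); while it returns HTTP 429, wait and retry.

    Makes at most 'attempts' retries after the first call and returns
    the last response, whatever its status.
    """
    response = send()
    for delay in backoff_delays(attempts=attempts):
        if response.status_code != 429:
            return response
        sleep(delay)
        response = send()
    return response
```

Usage might look like call_with_backoff(lambda: requests.post(url, headers=headers, json=payload)). The sleep parameter is injectable so the wrapper can be tested without waiting. If the 429 response carries a Retry-After header, honoring it instead of the computed delay would be a reasonable refinement, as would adding random jitter.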
Error: SQS Message Accumulation
- Cause: The process_event method throws an exception, and the message is not deleted. SQS will retry delivery.
- Fix: Ensure the try/except block in process_events catches all exceptions. If a message fails for a reason that retrying cannot fix (for example, a malformed payload rather than a transient outage), move it to a Dead Letter Queue (DLQ) to prevent infinite retries.
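In SQS, a DLQ is attached through the source queue's RedrivePolicy attribute, so that messages received more than a set number of times are moved automatically. The helper below is a hypothetical sketch that only builds that attribute; the ARN, queue URL, and receive count are placeholders you would replace.

```python
import json
from typing import Dict


def build_redrive_attributes(dlq_arn: str, max_receive_count: int = 5) -> Dict[str, str]:
    """Build the Attributes dict that attaches a DLQ to a source queue.

    SQS expects RedrivePolicy as a JSON-encoded string.
    """
    if max_receive_count < 1:
        raise ValueError("max_receive_count must be at least 1")
    policy = {
        "deadLetterTargetArn": dlq_arn,
        "maxReceiveCount": str(max_receive_count),
    }
    return {"RedrivePolicy": json.dumps(policy)}


# Applying it with boto3 (placeholder values, left commented out):
# import boto3
# sqs = boto3.client("sqs", region_name="us-east-1")
# sqs.set_queue_attributes(
#     QueueUrl="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",
#     Attributes=build_redrive_attributes(
#         "arn:aws:sqs:us-east-1:123456789012:my-queue-dlq"
#     ),
# )
```

Note that set_queue_attributes needs the sqs:SetQueueAttributes permission, which is not among the permissions listed in the prerequisites, so this step is typically done once by an administrator or in infrastructure code rather than by the worker itself.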