Filter EventBridge Events for Conversation End by Queue in Python
What You Will Build
- This tutorial demonstrates how to configure an AWS EventBridge rule that filters Genesys Cloud
conversation.endevents to trigger only when a specific queue is involved. - The solution uses the Genesys Cloud Python SDK to identify the target queue ID and the AWS SDK (boto3) to create the precise EventBridge rule.
- The implementation is written in Python 3.9+ using
requestsfor API validation andboto3for infrastructure management.
Prerequisites
- Genesys Cloud OAuth Client: A Service Account or OAuth Client with the following scopes:
queue:queue:read(to look up queue IDs)analytics:conversation:read(optional, for verification)
- AWS Account: An IAM user or role with permissions to create EventBridge rules and targets (
events:PutRule,events:PutTargets). - Genesys Cloud Integration: The “Amazon EventBridge” integration must be enabled and active in your Genesys Cloud organization.
- SDKs:
genesyscloud-python-sdk(latest version)boto3(latest version)
- Environment Variables:
GENESYS_CLOUD_CLIENT_IDGENESYS_CLOUD_CLIENT_SECRETGENESYS_CLOUD_REGION(e.g.,my.genesys.cloud)AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYAWS_DEFAULT_REGION(e.g.,us-east-1)
Authentication Setup
Before interacting with either platform, you must establish authenticated sessions. Genesys Cloud uses OAuth 2.0 Client Credentials flow, while AWS uses IAM signatures handled by boto3.
Genesys Cloud OAuth Helper
Create a utility function to handle token acquisition and caching. This avoids re-authenticating on every script run.
import requests
import time
import os
from typing import Optional
# Cache for the access token
_token_cache: dict = {}
def get_genesys_access_token() -> str:
"""
Retrieves an OAuth access token from Genesys Cloud.
Implements basic caching with a 50-minute TTL to avoid unnecessary requests.
"""
current_time = time.time()
# Check cache
if "token" in _token_cache and "expiry" in _token_cache:
if current_time < _token_cache["expiry"]:
return _token_cache["token"]
# Fetch new token
client_id = os.environ["GENESYS_CLOUD_CLIENT_ID"]
client_secret = os.environ["GENESYS_CLOUD_CLIENT_SECRET"]
region = os.environ["GENESYS_CLOUD_REGION"]
url = f"https://{region}/oauth/token"
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret
}
try:
response = requests.post(url, headers=headers, data=data)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if response.status_code == 401:
raise Exception("Authentication failed: Invalid Client ID or Secret.") from e
elif response.status_code == 403:
raise Exception("Authentication failed: Client does not have required scopes.") from e
else:
raise Exception(f"Unexpected HTTP error: {response.status_code}") from e
token_data = response.json()
access_token = token_data["access_token"]
expires_in = token_data["expires_in"]
# Cache with a slight buffer
_token_cache["token"] = access_token
_token_cache["expiry"] = current_time + (expires_in - 60)
return access_token
AWS Boto3 Session
Initialize the boto3 session for EventBridge interactions.
import boto3
from botocore.exceptions import ClientError
def get_eventbridge_client():
"""
Creates an AWS EventBridge client using default credential chain.
"""
try:
client = boto3.client('events')
# Verify connectivity and permissions
client.list_rules()
return client
except ClientError as e:
if e.response['Error']['Code'] == 'UnauthorizedOperation':
raise Exception("AWS IAM permissions missing for EventBridge.") from e
raise Exception(f"Failed to initialize AWS EventBridge client: {e}") from e
Implementation
Step 1: Identify the Queue ID in Genesys Cloud
EventBridge filters operate on JSON attributes. To filter by queue, you must know the exact UUID of the queue in Genesys Cloud. You cannot filter by queue name directly in the EventBridge rule pattern; you must use the ID.
Use the Genesys Cloud Python SDK to search for the queue by name and return its ID.
from genesyscloud.platform.client import PlatformClient
from genesyscloud.queue.api import QueueApi
from genesyscloud.auth import AuthClient
def get_queue_id_by_name(queue_name: str) -> str:
"""
Searches Genesys Cloud for a queue by name and returns its ID.
Args:
queue_name: The exact name of the queue in Genesys Cloud.
Returns:
The UUID string of the queue.
Raises:
ValueError: If the queue is not found or multiple queues match.
"""
# Initialize Auth and Platform Client
auth_client = AuthClient(
client_id=os.environ["GENESYS_CLOUD_CLIENT_ID"],
client_secret=os.environ["GENESYS_CLOUD_CLIENT_SECRET"],
region=os.environ["GENESYS_CLOUD_REGION"]
)
platform_client = PlatformClient()
queue_api = QueueApi(platform_client)
try:
# Search for queues matching the name
# Note: The search endpoint is /api/v2/queues/search
response = queue_api.post_queue_search(
body={
"query": queue_name,
"size": 20
}
)
except Exception as e:
raise Exception(f"Failed to search queues: {e}") from e
if response.entities is None or len(response.entities) == 0:
raise ValueError(f"No queue found with name: {queue_name}")
if len(response.entities) > 1:
# In case of multiple matches, prefer exact match or return first
exact_match = next((q for q in response.entities if q.name == queue_name), None)
if exact_match:
return exact_match.id
else:
raise ValueError(f"Multiple queues found for '{queue_name}'. Please specify a unique name.")
return response.entities[0].id
Step 2: Construct the EventBridge Filter Pattern
The core of this tutorial is constructing the JSON filter pattern. Genesys Cloud sends events to EventBridge with a specific structure. For conversation.end events, the queue information is nested within the detail object.
The relevant path for the queue ID is detail.queue.id.
Critical Note on Event Structure:
Genesys Cloud EventBridge events follow this schema:
{
"source": "genesys.cloud",
"detail-type": "conversation.end",
"detail": {
"conversationId": "...",
"queue": {
"id": "uuid-of-queue",
"name": "Queue Name"
},
...
}
}
You must filter on detail.queue.id. However, a conversation may not always have a queue assigned (e.g., direct skill-based routing without a queue, or abandoned calls before queue assignment). Therefore, the filter must check for the existence of the queue object and then match the ID.
Construct the filter pattern in Python:
def create_eventbridge_filter_pattern(queue_id: str) -> dict:
"""
Constructs the EventBridge event pattern for filtering conversation.end events
for a specific queue.
"""
# The pattern matches the top-level attributes and the nested detail queue id
pattern = {
"source": ["genesys.cloud"],
"detail-type": ["conversation.end"],
"detail": {
"queue": {
"id": [queue_id]
}
}
}
return pattern
Step 3: Create the EventBridge Rule
Now, use boto3 to create the rule. This rule will capture only events that match the pattern defined in Step 2.
def create_filtered_eventbridge_rule(rule_name: str, queue_id: str, target_arn: str) -> str:
"""
Creates an EventBridge rule that filters Genesys Cloud conversation.end events
for a specific queue.
Args:
rule_name: A unique name for the rule (e.g., 'GC_ConvEnd_SupportQueue').
queue_id: The UUID of the Genesys Cloud queue.
target_arn: The ARN of the target (e.g., Lambda function, SNS topic, SQS queue).
Returns:
The ARN of the created rule.
"""
client = get_eventbridge_client()
filter_pattern = create_eventbridge_filter_pattern(queue_id)
# Convert dict to JSON string as required by boto3
import json
pattern_json = json.dumps(filter_pattern)
try:
response = client.put_rule(
Name=rule_name,
EventPattern=pattern_json,
State='ENABLED',
Description=f'Genesys Cloud conversation.end filter for Queue ID: {queue_id}',
RoleArn=os.environ.get('AWS_EVENTBRIDGE_ROLE_ARN', '') # Optional: if cross-account or specific role needed
)
rule_arn = response['RuleArn']
print(f"Rule created successfully: {rule_arn}")
# Add the target to the rule
client.put_targets(
Rule=rule_name,
Targets=[
{
'Id': 'Target1',
'Arn': target_arn
}
]
)
print(f"Target added to rule: {target_arn}")
return rule_arn
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'LimitExceededException':
raise Exception("EventBridge rule limit exceeded. Check existing rules.") from e
elif error_code == 'InvalidEventPatternException':
raise Exception(f"Invalid event pattern. Check JSON structure: {pattern_json}") from e
else:
raise Exception(f"AWS Error: {e}") from e
Step 4: Verification and Testing
To ensure the filter works, you can simulate a test event or monitor the rule’s metrics. However, the most reliable method is to check the rule configuration.
def verify_rule_configuration(rule_name: str) -> dict:
"""
Retrieves and prints the configuration of the created rule to verify the filter pattern.
"""
client = get_eventbridge_client()
try:
response = client.describe_rule(Name=rule_name)
print(f"Rule Name: {response['Name']}")
print(f"State: {response['State']}")
print(f"Event Pattern:")
import json
print(json.dumps(json.loads(response['EventPattern']), indent=2))
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
print(f"Rule '{rule_name}' not found.")
else:
raise Exception(f"Failed to describe rule: {e}") from e
Complete Working Example
This script combines all steps into a single executable module. It authenticates with Genesys Cloud, finds the queue ID, constructs the filter, and creates the EventBridge rule.
import os
import sys
import time
import requests
import boto3
import json
from typing import Optional
from botocore.exceptions import ClientError
# --- Configuration ---
GENESYS_CLIENT_ID = os.environ.get("GENESYS_CLOUD_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.environ.get("GENESYS_CLOUD_CLIENT_SECRET")
GENESYS_REGION = os.environ.get("GENESYS_CLOUD_REGION", "my.genesys.cloud")
AWS_REGION = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
TARGET_ARN = os.environ.get("AWS_TARGET_ARN", "arn:aws:lambda:us-east-1:123456789012:function:MyGenesysHandler")
QUEUE_NAME = os.environ.get("TARGET_QUEUE_NAME", "General Support")
RULE_NAME = os.environ.get("EVENTBRIDGE_RULE_NAME", "GC_ConvEnd_SpecificQueue")
# --- Auth Helpers ---
_token_cache = {}
def get_genesys_access_token() -> str:
current_time = time.time()
if "token" in _token_cache and "expiry" in _token_cache:
if current_time < _token_cache["expiry"]:
return _token_cache["token"]
url = f"https://{GENESYS_REGION}/oauth/token"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {
"grant_type": "client_credentials",
"client_id": GENESYS_CLIENT_ID,
"client_secret": GENESYS_CLIENT_SECRET
}
try:
response = requests.post(url, headers=headers, data=data)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
raise Exception(f"Genesys Auth Error: {response.status_code} - {response.text}") from e
token_data = response.json()
_token_cache["token"] = token_data["access_token"]
_token_cache["expiry"] = current_time + (token_data["expires_in"] - 60)
return _token_cache["token"]
def get_eventbridge_client():
try:
return boto3.client('events', region_name=AWS_REGION)
except Exception as e:
raise Exception(f"Failed to init AWS Client: {e}") from e
# --- Genesys Logic ---
def get_queue_id_by_name(queue_name: str) -> str:
token = get_genesys_access_token()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
url = f"https://{GENESYS_REGION}/api/v2/queues/search"
body = {
"query": queue_name,
"size": 20
}
try:
response = requests.post(url, headers=headers, json=body)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
raise Exception(f"Genesys Queue Search Error: {response.status_code}") from e
data = response.json()
entities = data.get("entities", [])
if not entities:
raise ValueError(f"No queue found with name: {queue_name}")
# Prefer exact match
exact_match = next((q for q in entities if q["name"] == queue_name), None)
if exact_match:
return exact_match["id"]
# Fallback to first match
return entities[0]["id"]
# --- AWS Logic ---
def create_filtered_rule(queue_id: str, target_arn: str, rule_name: str):
client = get_eventbridge_client()
# Define the filter pattern
# This pattern ensures ONLY conversation.end events with the specific queue ID are passed
filter_pattern = {
"source": ["genesys.cloud"],
"detail-type": ["conversation.end"],
"detail": {
"queue": {
"id": [queue_id]
}
}
}
pattern_json = json.dumps(filter_pattern)
try:
# Create Rule
client.put_rule(
Name=rule_name,
EventPattern=pattern_json,
State='ENABLED',
Description=f'Filter for Queue ID: {queue_id}'
)
print(f"Rule '{rule_name}' created/updated.")
# Attach Target
client.put_targets(
Rule=rule_name,
Targets=[
{
'Id': 'Target1',
'Arn': target_arn
}
]
)
print(f"Target {target_arn} attached to rule '{rule_name}'.")
except ClientError as e:
print(f"AWS Error: {e.response['Error']['Code']} - {e.response['Error']['Message']}")
sys.exit(1)
# --- Main Execution ---
def main():
print(f"Starting setup for Queue: {QUEUE_NAME}")
try:
# 1. Get Queue ID
print("Fetching Queue ID from Genesys Cloud...")
queue_id = get_queue_id_by_name(QUEUE_NAME)
print(f"Found Queue ID: {queue_id}")
# 2. Create EventBridge Rule
print(f"Creating EventBridge Rule: {RULE_NAME}")
create_filtered_rule(queue_id, TARGET_ARN, RULE_NAME)
print("Setup complete. EventBridge will now filter conversation.end events for this queue.")
except ValueError as ve:
print(f"Configuration Error: {ve}")
except Exception as e:
print(f"Unexpected Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
Common Errors & Debugging
Error: InvalidEventPatternException
- Cause: The JSON structure of the filter pattern is malformed or contains invalid syntax.
- Fix: Ensure the
detailobject is correctly nested. Verify thatqueueis a dictionary andidis a list of strings. - Code Check:
# Correct "detail": {"queue": {"id": ["uuid"]}} # Incorrect (id is not a list) "detail": {"queue": {"id": "uuid"}}
Error: ResourceNotFoundException (Genesys Queue)
- Cause: The queue name provided does not exist in the Genesys Cloud organization, or the OAuth client lacks
queue:queue:readscope. - Fix: Verify the queue name in the Genesys Cloud Admin Console. Check the OAuth client scopes in the Genesys Cloud API Console.
Error: No Events Received
- Cause: The queue ID in the filter does not match the queue ID in the event payload. This often happens if the queue was deleted and recreated (new ID) or if the event payload structure changed.
- Fix:
- Temporarily create a rule with a broad filter (
"detail-type": ["conversation.end"]) and log all events to CloudWatch. - Trigger a test conversation end in the target queue.
- Inspect the logged event in CloudWatch Logs to verify the exact path of the queue ID (e.g.,
detail.queue.id). - Update the filter pattern to match the observed structure.
- Temporarily create a rule with a broad filter (
Error: AccessDeniedException (AWS)
- Cause: The IAM user/role does not have
events:PutRuleorevents:PutTargetspermissions. - Fix: Attach the
AmazonEventBridgeFullAccesspolicy or a custom policy allowing these actions to the IAM entity running the script.