Filter EventBridge Events for Conversation End by Queue in Python

What You Will Build

  • This tutorial shows how to configure an Amazon EventBridge rule that filters Genesys Cloud conversation.end events so that it triggers only when a specific queue is involved.
  • The solution uses the Genesys Cloud Python SDK to identify the target queue ID and the AWS SDK (boto3) to create the precise EventBridge rule.
  • The implementation is written in Python 3.9+ using requests for API validation and boto3 for infrastructure management.

Prerequisites

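  • Genesys Cloud OAuth Client: An OAuth client that uses the Client Credentials grant. Client Credentials clients are authorized through assigned roles rather than scopes, so the client's role needs the following permissions:
    • routing:queue:view (to look up queue IDs)
    • analytics:conversationDetail:view (optional, for verification)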
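  • AWS Account: An IAM user or role with permissions to create EventBridge rules and targets (events:PutRule, events:PutTargets), plus events:ListRules and events:DescribeRule for the connectivity and verification checks used in this tutorial.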
  • Genesys Cloud Integration: The “Amazon EventBridge” integration must be enabled and active in your Genesys Cloud organization.
  • SDKs:
    • PureCloudPlatformClientV2 (the Genesys Cloud Platform API Python SDK, latest version)
    • boto3 (latest version)
  • Environment Variables:
    • GENESYS_CLOUD_CLIENT_ID
    • GENESYS_CLOUD_CLIENT_SECRET
    • GENESYS_CLOUD_REGION (the base domain of your region, e.g., mypurecloud.com; the login and API hosts are login.<domain> and api.<domain>)
    • AWS_ACCESS_KEY_ID
    • AWS_SECRET_ACCESS_KEY
    • AWS_DEFAULT_REGION (e.g., us-east-1)

Authentication Setup

Before interacting with either platform, you must establish authenticated sessions. Genesys Cloud uses OAuth 2.0 Client Credentials flow, while AWS uses IAM signatures handled by boto3.

Genesys Cloud OAuth Helper

Create a utility function that acquires and caches the token. The cache lives in process memory, so it avoids re-authenticating on every call within a single run; it does not persist across separate script runs.

import os
import time

import requests

# In-memory cache for the access token (lives only for this process)
_token_cache: dict = {}

def get_genesys_access_token() -> str:
    """
    Retrieves an OAuth access token from Genesys Cloud.
    Caches the token until 60 seconds before the expiry reported by the server.
    """
    current_time = time.time()

    # Reuse the cached token while it is still valid
    if current_time < _token_cache.get("expiry", 0):
        return _token_cache["token"]

    # Fetch new token
    client_id = os.environ["GENESYS_CLOUD_CLIENT_ID"]
    client_secret = os.environ["GENESYS_CLOUD_CLIENT_SECRET"]
    # Accept the base domain (mypurecloud.com) and tolerate a login./api. prefix
    domain = os.environ["GENESYS_CLOUD_REGION"].removeprefix("login.").removeprefix("api.")

    # The token endpoint lives on the login host of the region
    url = f"https://login.{domain}/oauth/token"
    data = {"grant_type": "client_credentials"}

    try:
        # The client ID and secret are sent with HTTP Basic authentication
        response = requests.post(
            url, data=data, auth=(client_id, client_secret), timeout=10
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        status = e.response.status_code
        if status == 401:
            raise Exception("Authentication failed: Invalid Client ID or Secret.") from e
        elif status == 403:
            raise Exception("Authentication failed: Client is not authorized.") from e
        else:
            raise Exception(f"Unexpected HTTP error: {status}") from e

    token_data = response.json()
    access_token = token_data["access_token"]
    expires_in = token_data["expires_in"]

    # Cache with a 60-second safety buffer before the real expiry
    _token_cache["token"] = access_token
    _token_cache["expiry"] = current_time + (expires_in - 60)

    return access_token
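
The caching rule used by the helper can be exercised without any network access. The following is a minimal sketch and not part of the original helper: TokenCache, fetch, and clock are hypothetical names introduced here so that the expiry arithmetic (server-reported lifetime minus a 60-second buffer) can be checked with a fake clock.

```python
from typing import Callable, Tuple


class TokenCache:
    """Caches a token until `buffer` seconds before its reported expiry."""

    def __init__(self, fetch: Callable[[], Tuple[str, int]],
                 clock: Callable[[], float], buffer: int = 60) -> None:
        self._fetch = fetch    # hypothetical: returns (access_token, expires_in_seconds)
        self._clock = clock    # hypothetical: returns the current time in seconds
        self._buffer = buffer
        self._token = ""
        self._expiry = 0.0

    def get(self) -> str:
        now = self._clock()
        if now >= self._expiry:
            # Cache is empty or stale: fetch a token and record the new expiry
            self._token, expires_in = self._fetch()
            self._expiry = now + expires_in - self._buffer
        return self._token
```

Injecting the clock keeps the logic testable; in a real script, time.time and the HTTP call would take the place of the two callables.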

AWS Boto3 Session

Initialize the boto3 session for EventBridge interactions.

import boto3
from botocore.exceptions import ClientError

def get_eventbridge_client():
    """
    Creates an AWS EventBridge client using default credential chain.
    """
    try:
        client = boto3.client('events')
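        # Verify connectivity and permissions (this call requires events:ListRules)
        client.list_rules(Limit=1)
        return client
    except ClientError as e:
        # EventBridge reports missing IAM permissions as AccessDeniedException
        if e.response['Error']['Code'] == 'AccessDeniedException':
            raise Exception("AWS IAM permissions missing for EventBridge.") from e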
        raise Exception(f"Failed to initialize AWS EventBridge client: {e}") from e

Implementation

Step 1: Identify the Queue ID in Genesys Cloud

EventBridge filters operate on JSON attributes. To filter by queue, you must know the exact UUID of the queue in Genesys Cloud. You cannot filter by queue name directly in the EventBridge rule pattern; you must use the ID.

Use the Genesys Cloud Python SDK to search for the queue by name and return its ID.

import os

import PureCloudPlatformClientV2

def get_queue_id_by_name(queue_name: str) -> str:
    """
    Searches Genesys Cloud for a queue by name and returns its ID.
    
    Args:
        queue_name: The exact name of the queue in Genesys Cloud.
        
    Returns:
        The UUID string of the queue.
        
    Raises:
        ValueError: If the queue is not found or multiple queues match.
    """
    # Point the SDK at the API host of the region, e.g. https://api.mypurecloud.com
    domain = os.environ["GENESYS_CLOUD_REGION"].removeprefix("login.").removeprefix("api.")
    PureCloudPlatformClientV2.configuration.host = f"https://api.{domain}"

    try:
        # Authenticate with the Client Credentials grant
        api_client = PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
            os.environ["GENESYS_CLOUD_CLIENT_ID"],
            os.environ["GENESYS_CLOUD_CLIENT_SECRET"],
        )
        routing_api = PureCloudPlatformClientV2.RoutingApi(api_client)

        # List queues filtered by name (GET /api/v2/routing/queues)
        response = routing_api.get_routing_queues(name=queue_name, page_size=25)
    except Exception as e:
        raise Exception(f"Failed to search queues: {e}") from e

    entities = response.entities or []
    if not entities:
        raise ValueError(f"No queue found with name: {queue_name}")

    # The name filter can return several queues; prefer the exact match
    exact_match = next((q for q in entities if q.name == queue_name), None)
    if exact_match:
        return exact_match.id

    if len(entities) > 1:
        raise ValueError(f"Multiple queues found for '{queue_name}'. Please specify a unique name.")

    return entities[0].id
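
Because the rule pattern depends on an exact UUID, it can help to check the format of the returned value before it goes into a rule. This is a small sketch using only the standard library; is_queue_uuid is a hypothetical helper name, and the check covers the string format only, not whether the queue exists.

```python
import uuid


def is_queue_uuid(value: str) -> bool:
    """Returns True if `value` is a canonical, hyphenated UUID string."""
    try:
        parsed = uuid.UUID(value)
    except (ValueError, AttributeError, TypeError):
        return False
    # uuid.UUID also accepts forms without hyphens or with braces;
    # comparing against the canonical form rejects those variants
    return str(parsed) == value.lower()
```

A queue name passed by mistake, such as "General Support", fails this check immediately instead of producing a rule that never matches.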

Step 2: Construct the EventBridge Filter Pattern

The core of this tutorial is constructing the JSON filter pattern. Genesys Cloud sends events to EventBridge with a specific structure. For conversation.end events, the queue information is nested within the detail object.

The relevant path for the queue ID is detail.queue.id.

Critical Note on Event Structure:
Genesys Cloud EventBridge events follow this schema:

{
  "source": "genesys.cloud",
  "detail-type": "conversation.end",
  "detail": {
    "conversationId": "...",
    "queue": {
      "id": "uuid-of-queue",
      "name": "Queue Name"
    },
    ...
  }
}

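You must filter on detail.queue.id. A conversation may not always have a queue assigned (e.g., direct skill-based routing without a queue, or calls abandoned before queue assignment). An exact-value match on detail.queue.id already excludes those events, because a pattern field that is absent from the event never matches, so no separate existence check is needed. Treat the schema above as the shape this tutorial assumes: partner event sources typically report a source value beginning with aws.partner/ rather than a short name, so confirm the source, detail-type, and detail layout against a captured event (see Common Errors & Debugging) before relying on the pattern.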
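
To see how an exact-value pattern treats events that carry no queue, the matching rule can be approximated locally. The function below is a deliberately simplified sketch: it handles only nested objects and lists of exact values, it is not the EventBridge matching engine, and matches is a hypothetical name. The sample events follow the schema assumed in this tutorial.

```python
from typing import Any, Dict


def matches(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Simplified exact-value matcher; not the real EventBridge engine."""
    for key, expected in pattern.items():
        if key not in event:
            return False  # a field missing from the event never matches
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: descend into the corresponding object
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            # Leaf: the event value must equal one of the listed values
            return False
    return True


pattern = {
    "source": ["genesys.cloud"],
    "detail-type": ["conversation.end"],
    "detail": {"queue": {"id": ["queue-a"]}},
}
with_queue = {
    "source": "genesys.cloud",
    "detail-type": "conversation.end",
    "detail": {"conversationId": "c1", "queue": {"id": "queue-a"}},
}
without_queue = {
    "source": "genesys.cloud",
    "detail-type": "conversation.end",
    "detail": {"conversationId": "c2"},
}
```

Under this approximation, with_queue matches and without_queue does not, which is the behavior the filter relies on.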

Construct the filter pattern in Python:

def create_eventbridge_filter_pattern(queue_id: str) -> dict:
    """
    Constructs the EventBridge event pattern for filtering conversation.end events
    for a specific queue.
    """
    # The pattern matches the top-level attributes and the nested detail queue id
    pattern = {
        "source": ["genesys.cloud"],
        "detail-type": ["conversation.end"],
        "detail": {
            "queue": {
                "id": [queue_id]
            }
        }
    }
    return pattern
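
The values listed for a field in an EventBridge pattern are alternatives, so the same structure extends to several queues. The sketch below assumes the event schema used in this tutorial; build_multi_queue_pattern is a hypothetical helper, not part of the original code.

```python
import json
from typing import Iterable


def build_multi_queue_pattern(queue_ids: Iterable[str]) -> str:
    """Returns a JSON event pattern that matches any of the given queue IDs."""
    ids = sorted(set(queue_ids))  # de-duplicate and keep the output stable
    if not ids:
        raise ValueError("At least one queue ID is required")
    pattern = {
        "source": ["genesys.cloud"],
        "detail-type": ["conversation.end"],
        # Any one of the listed IDs satisfies this field
        "detail": {"queue": {"id": ids}},
    }
    return json.dumps(pattern)
```

The returned string can be passed as the EventPattern argument of put_rule in place of the single-queue pattern.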

Step 3: Create the EventBridge Rule

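Now use boto3 to create the rule, which captures only events that match the pattern defined in Step 2. Events from a partner integration are delivered to the partner event bus associated with the integration's event source, so the rule has to be created on that bus; a rule on the default bus does not see them.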

import json
import os

from botocore.exceptions import ClientError

def create_filtered_eventbridge_rule(rule_name: str, queue_id: str, target_arn: str) -> str:
    """
    Creates an EventBridge rule that filters Genesys Cloud conversation.end events
    for a specific queue.
    
    Args:
        rule_name: A unique name for the rule (e.g., 'GC_ConvEnd_SupportQueue').
        queue_id: The UUID of the Genesys Cloud queue.
        target_arn: The ARN of the target (e.g., Lambda function, SNS topic, SQS queue).
        
    Returns:
        The ARN of the created rule.
    """
    client = get_eventbridge_client()

    # boto3 expects the event pattern as a JSON string
    pattern_json = json.dumps(create_eventbridge_filter_pattern(queue_id))

    # Assumption: AWS_EVENT_BUS_NAME names the bus that receives the Genesys Cloud events
    event_bus_name = os.environ.get('AWS_EVENT_BUS_NAME', 'default')

    rule_kwargs = {
        'Name': rule_name,
        'EventPattern': pattern_json,
        'State': 'ENABLED',
        'Description': f'Genesys Cloud conversation.end filter for Queue ID: {queue_id}',
        'EventBusName': event_bus_name,
    }
    # RoleArn is optional; an empty string fails parameter validation, so pass it only when set
    role_arn = os.environ.get('AWS_EVENTBRIDGE_ROLE_ARN')
    if role_arn:
        rule_kwargs['RoleArn'] = role_arn

    try:
        response = client.put_rule(**rule_kwargs)
        rule_arn = response['RuleArn']
        print(f"Rule created successfully: {rule_arn}")

        # Add the target to the rule
        result = client.put_targets(
            Rule=rule_name,
            EventBusName=event_bus_name,
            Targets=[{'Id': 'Target1', 'Arn': target_arn}]
        )
        # put_targets reports per-target failures in the response instead of raising
        if result.get('FailedEntryCount', 0):
            raise Exception(f"Failed to attach target: {result['FailedEntries']}")
        print(f"Target added to rule: {target_arn}")

        return rule_arn

    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'LimitExceededException':
            raise Exception("EventBridge rule limit exceeded. Check existing rules.") from e
        elif error_code == 'InvalidEventPatternException':
            raise Exception(f"Invalid event pattern. Check JSON structure: {pattern_json}") from e
        else:
            raise Exception(f"AWS Error: {e}") from e
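
If the target is a Lambda function, EventBridge also needs permission to invoke it; a rule created through the API does not add that permission on its own. The following is a hedged sketch of that extra step: allow_rule_to_invoke_lambda is a hypothetical helper, the statement ID format is an arbitrary choice, and the Lambda client is passed in so the function stays easy to test.

```python
import re


def allow_rule_to_invoke_lambda(lambda_client, function_arn: str,
                                rule_name: str, rule_arn: str) -> str:
    """Grants the given EventBridge rule permission to invoke the function."""
    # Statement IDs allow only letters, digits, hyphens, and underscores
    statement_id = "eventbridge-" + re.sub(r"[^A-Za-z0-9_-]", "-", rule_name)
    # Repeating this call with the same StatementId raises a conflict error
    lambda_client.add_permission(
        FunctionName=function_arn,
        StatementId=statement_id,
        Action="lambda:InvokeFunction",
        Principal="events.amazonaws.com",
        SourceArn=rule_arn,  # restrict the grant to this one rule
    )
    return statement_id
```

A typical call would pass boto3.client("lambda") together with the target ARN and the rule ARN returned by put_rule. SNS and SQS targets use resource policies on the topic or queue instead.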

Step 4: Verification and Testing

To confirm that the rule was stored as intended, start by checking its configuration. This verifies the stored pattern, not that live events match it, so follow up by monitoring the rule's metrics or by sending a test event.

import json
from typing import Optional

from botocore.exceptions import ClientError

def verify_rule_configuration(rule_name: str) -> Optional[dict]:
    """
    Retrieves and prints the configuration of the created rule to verify the filter pattern.
    Returns None if the rule does not exist.
    """
    client = get_eventbridge_client()
    
    try:
        response = client.describe_rule(Name=rule_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFoundException':
            print(f"Rule '{rule_name}' not found.")
            return None
        raise Exception(f"Failed to describe rule: {e}") from e

    print(f"Rule Name: {response['Name']}")
    print(f"State: {response['State']}")
    print("Event Pattern:")
    # The pattern is stored as a JSON string; re-indent it for readability
    print(json.dumps(json.loads(response['EventPattern']), indent=2))
    return response

Complete Working Example

This script combines all steps into a single executable module. It authenticates with Genesys Cloud, finds the queue ID, constructs the filter, and creates the EventBridge rule.

import os
import sys
import time
import requests
import boto3
import json
from typing import Optional
from botocore.exceptions import ClientError

# --- Configuration ---
GENESYS_CLIENT_ID = os.environ.get("GENESYS_CLOUD_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.environ.get("GENESYS_CLOUD_CLIENT_SECRET")
# Base domain of the Genesys Cloud region, e.g. mypurecloud.com; a login./api. prefix is tolerated
GENESYS_REGION = os.environ.get("GENESYS_CLOUD_REGION", "mypurecloud.com").removeprefix("login.").removeprefix("api.")
AWS_REGION = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
TARGET_ARN = os.environ.get("AWS_TARGET_ARN", "arn:aws:lambda:us-east-1:123456789012:function:MyGenesysHandler")
QUEUE_NAME = os.environ.get("TARGET_QUEUE_NAME", "General Support")
RULE_NAME = os.environ.get("EVENTBRIDGE_RULE_NAME", "GC_ConvEnd_SpecificQueue")

# --- Auth Helpers ---
_token_cache = {}

def get_genesys_access_token() -> str:
    current_time = time.time()
    # Reuse the cached token while it is still valid
    if current_time < _token_cache.get("expiry", 0):
        return _token_cache["token"]

    # The token endpoint lives on the login host of the region
    url = f"https://login.{GENESYS_REGION}/oauth/token"

    try:
        # The client ID and secret are sent with HTTP Basic authentication
        response = requests.post(
            url,
            data={"grant_type": "client_credentials"},
            auth=(GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET),
            timeout=10,
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        raise Exception(f"Genesys Auth Error: {e.response.status_code} - {e.response.text}") from e

    token_data = response.json()
    _token_cache["token"] = token_data["access_token"]
    _token_cache["expiry"] = current_time + (token_data["expires_in"] - 60)
    return _token_cache["token"]

def get_eventbridge_client():
    try:
        return boto3.client('events', region_name=AWS_REGION)
    except Exception as e:
        raise Exception(f"Failed to init AWS Client: {e}") from e

# --- Genesys Logic ---
def get_queue_id_by_name(queue_name: str) -> str:
    token = get_genesys_access_token()
    headers = {"Authorization": f"Bearer {token}"}
    # Queues are listed through the Routing API; the name parameter filters by queue name
    url = f"https://api.{GENESYS_REGION}/api/v2/routing/queues"
    params = {"name": queue_name, "pageSize": 25}

    try:
        response = requests.get(url, headers=headers, params=params, timeout=10)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        raise Exception(f"Genesys Queue Search Error: {e.response.status_code}") from e

    entities = response.json().get("entities") or []
    
    if not entities:
        raise ValueError(f"No queue found with name: {queue_name}")
    
    # Prefer exact match
    exact_match = next((q for q in entities if q["name"] == queue_name), None)
    if exact_match:
        return exact_match["id"]
    
    # Refuse to guess between several partial matches
    if len(entities) > 1:
        raise ValueError(f"Multiple queues found for '{queue_name}'. Please specify a unique name.")

    return entities[0]["id"]

# --- AWS Logic ---
def create_filtered_rule(queue_id: str, target_arn: str, rule_name: str):
    client = get_eventbridge_client()
    
    # Define the filter pattern
    # This pattern ensures ONLY conversation.end events with the specific queue ID are passed
    filter_pattern = {
        "source": ["genesys.cloud"],
        "detail-type": ["conversation.end"],
        "detail": {
            "queue": {
                "id": [queue_id]
            }
        }
    }

    pattern_json = json.dumps(filter_pattern)

    try:
        # Create Rule
        client.put_rule(
            Name=rule_name,
            EventPattern=pattern_json,
            State='ENABLED',
            Description=f'Filter for Queue ID: {queue_id}'
        )
        print(f"Rule '{rule_name}' created/updated.")

        # Attach Target
        client.put_targets(
            Rule=rule_name,
            Targets=[
                {
                    'Id': 'Target1',
                    'Arn': target_arn
                }
            ]
        )
        print(f"Target {target_arn} attached to rule '{rule_name}'.")
        
    except ClientError as e:
        print(f"AWS Error: {e.response['Error']['Code']} - {e.response['Error']['Message']}")
        sys.exit(1)

# --- Main Execution ---
def main():
    print(f"Starting setup for Queue: {QUEUE_NAME}")
    
    try:
        # 1. Get Queue ID
        print("Fetching Queue ID from Genesys Cloud...")
        queue_id = get_queue_id_by_name(QUEUE_NAME)
        print(f"Found Queue ID: {queue_id}")

        # 2. Create EventBridge Rule
        print(f"Creating EventBridge Rule: {RULE_NAME}")
        create_filtered_rule(queue_id, TARGET_ARN, RULE_NAME)
        
        print("Setup complete. EventBridge will now filter conversation.end events for this queue.")
        
    except ValueError as ve:
        print(f"Configuration Error: {ve}")
        sys.exit(1)
    except Exception as e:
        print(f"Unexpected Error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: InvalidEventPatternException

  • Cause: The JSON structure of the filter pattern is malformed or contains invalid syntax.
  • Fix: Ensure the detail object is correctly nested. Verify that queue is a dictionary and id is a list of strings.
  • Code Check:
    # Correct
    "detail": {"queue": {"id": ["uuid"]}}
    # Incorrect (id is not a list)
    "detail": {"queue": {"id": "uuid"}}
    
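
The list requirement can be checked before the pattern is sent to AWS. This is a minimal sketch with a hypothetical helper name, find_non_list_leaves; it flags only scalar leaves and does not validate the rest of the EventBridge pattern syntax.

```python
from typing import Any, List


def find_non_list_leaves(pattern: Any, path: str = "") -> List[str]:
    """Returns dotted paths whose value is neither an object nor a list."""
    if isinstance(pattern, dict):
        problems: List[str] = []
        for key, value in pattern.items():
            child = f"{path}.{key}" if path else key
            problems.extend(find_non_list_leaves(value, child))
        return problems
    if isinstance(pattern, list):
        return []      # a list of values is a valid leaf
    return [path]      # a bare scalar is what triggers the exception
```

An empty result means every leaf is a list; any returned path points at the value to wrap in brackets.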

Error: No queue found (Genesys Cloud)

  • Cause: The queue name provided does not exist in the Genesys Cloud organization. If the OAuth client lacks permission to view queues, the API responds with HTTP 403 instead of an empty result.
  • Fix: Verify the queue name in Genesys Cloud Admin. Check the roles and permissions assigned to the OAuth client.

Error: No Events Received

  • Cause: The queue ID in the filter does not match the queue ID in the event payload. This often happens if the queue was deleted and recreated (new ID) or if the event payload structure changed.
  • Fix:
    1. Temporarily create a rule with a broad filter ("detail-type": ["conversation.end"]) and log all events to CloudWatch.
    2. Trigger a test conversation end in the target queue.
    3. Inspect the logged event in CloudWatch Logs to verify the exact path of the queue ID (e.g., detail.queue.id).
    4. Update the filter pattern to match the observed structure.
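
For step 3, a short search over the captured event can locate every place where the queue ID appears. The helper below is a sketch with a hypothetical name, find_value_paths, and the sample event is invented for illustration; real payloads may differ.

```python
from typing import Any, List


def find_value_paths(node: Any, target: str, path: str = "") -> List[str]:
    """Returns the paths of all values in `node` that equal `target`."""
    if isinstance(node, dict):
        found: List[str] = []
        for key, value in node.items():
            child = f"{path}.{key}" if path else key
            found.extend(find_value_paths(value, target, child))
        return found
    if isinstance(node, list):
        found = []
        for index, item in enumerate(node):
            found.extend(find_value_paths(item, target, f"{path}[{index}]"))
        return found
    return [path] if node == target else []


# Invented sample; paste the event copied from CloudWatch Logs instead
captured = {
    "detail-type": "conversation.end",
    "detail": {
        "queue": {"id": "queue-a"},
        "participants": [{"queueId": "queue-a"}],
    },
}
paths = find_value_paths(captured, "queue-a")
```

Each returned path is a candidate for the filter pattern; object paths translate directly into nested pattern keys.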

Error: AccessDeniedException (AWS)

  • Cause: The IAM user/role does not have events:PutRule or events:PutTargets permissions.
  • Fix: Attach the AmazonEventBridgeFullAccess policy or a custom policy allowing these actions to the IAM entity running the script.

Official References