How to Filter EventBridge Events for Conversation End in a Specific Queue

How to Filter EventBridge Events for Conversation End in a Specific Queue

What You Will Build

  • You will configure an AWS EventBridge rule that triggers exclusively when a conversation ends in a specific Genesys Cloud queue.
  • You will use the Genesys Cloud Python SDK to identify the correct queue ID and validate the event structure.
  • You will use Python and AWS Boto3 to programmatically create the EventBridge rule with precise JSON path matching.

Prerequisites

  • Genesys Cloud OAuth Application: A server-to-server application with the scope event:read.
  • AWS Account: An account with permissions to create EventBridge rules and targets.
  • SDK Version: Genesys Cloud Python SDK (genesyscloud) version 142.0.0 or higher.
  • Runtime: Python 3.9+ with boto3 and genesyscloud installed.
  • External Dependencies:
    pip install genesyscloud boto3 requests
    

Authentication Setup

Before interacting with Genesys Cloud or AWS, you must establish authenticated sessions. For Genesys Cloud, use the server-to-server OAuth flow. For AWS, use standard IAM credentials or a role assumed by your execution environment.

Genesys Cloud OAuth Token Acquisition

The following Python function retrieves an access token using client credentials. This token is required to query the Genesys Cloud API for queue IDs.

import requests
import os
from typing import Optional

def get_genesys_token(client_id: str, client_secret: str, base_url: str = "https://api.mypurecloud.com") -> str:
    """
    Acquires an OAuth2 access token from Genesys Cloud.
    
    Args:
        client_id: The OAuth client ID from Genesys Cloud.
        client_secret: The OAuth client secret from Genesys Cloud.
        base_url: The Genesys Cloud API base URL.
        
    Returns:
        The access token string.
        
    Raises:
        requests.exceptions.HTTPError: If authentication fails.
    """
    url = f"{base_url}/oauth/token"
    data = {
        "grant_type": "client_credentials",
        "scope": "event:read queue:read"
    }
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    
    response = requests.post(url, data=data, auth=(client_id, client_secret))
    response.raise_for_status()
    
    token_data = response.json()
    return token_data["access_token"]

AWS Session Initialization

Initialize the Boto3 client for EventBridge. Ensure your environment variables AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION are set.

import boto3
from botocore.exceptions import ClientError

def get_eventbridge_client() -> boto3.client:
    """
    Initializes an AWS EventBridge client.
    
    Returns:
        A boto3 client for EventBridge.
    """
    try:
        return boto3.client('events')
    except ClientError as e:
        print(f"AWS Client Initialization Error: {e}")
        raise

Implementation

Step 1: Identify the Target Queue ID

EventBridge filtering relies on exact values. You must know the internal Genesys Cloud id of the queue you want to monitor. You cannot filter by queue name directly in the EventBridge rule; you must filter by the id field found in the event payload.

The following code uses the Genesys Cloud Python SDK to search for a queue by name and return its ID.

from genesyscloud import PlatformClient
from genesyscloud.routing.api import RoutingApi
from genesyscloud.routing.model import QueueSearchRequest

def find_queue_id_by_name(platform_client: PlatformClient, queue_name: str) -> Optional[str]:
    """
    Finds the Genesys Cloud queue ID by its name.
    
    Args:
        platform_client: An initialized Genesys Cloud PlatformClient.
        queue_name: The exact name of the queue in Genesys Cloud.
        
    Returns:
        The queue ID string, or None if not found.
    """
    routing_api = RoutingApi(platform_client)
    
    try:
        # Search for queues matching the name
        search_request = QueueSearchRequest(
            query=queue_name,
            size=1  # Only need the first match
        )
        
        response = routing_api.post_routing_queues_search(query=search_request)
        
        if response.entities and len(response.entities) > 0:
            return response.entities[0].id
        else:
            print(f"Queue '{queue_name}' not found.")
            return None
            
    except Exception as e:
        print(f"Error searching for queue: {e}")
        return None

Step 2: Construct the EventBridge Pattern

Genesys Cloud sends events to EventBridge with a specific structure. The detail object contains the core event data. For a conversation end event, the structure looks like this:

{
  "source": "com.genesyscloud",
  "detail-type": "conversation.end",
  "detail": {
    "conversationId": "uuid-here",
    "queueId": "queue-uuid-here",
    "type": "voice",
    "startTimestamp": "2023-10-27T10:00:00Z",
    "endTimestamp": "2023-10-27T10:05:00Z"
  }
}

To filter for a specific queue, you must match the source, detail-type, and detail.queueId. The EventBridge event pattern must be a JSON object.

import json

def create_eventbridge_pattern(queue_id: str) -> str:
    """
    Creates the JSON event pattern for EventBridge filtering.
    
    Args:
        queue_id: The Genesys Cloud queue ID.
        
    Returns:
        A JSON string representing the EventBridge event pattern.
    """
    pattern = {
        "source": [
            "com.genesyscloud"
        ],
        "detail-type": [
            "conversation.end"
        ],
        "detail": {
            "queueId": [
                queue_id
            ]
        }
    }
    return json.dumps(pattern)

Step 3: Create the EventBridge Rule

With the queue ID and the pattern, you can create the rule. This rule will only trigger if an event matches all three criteria: source, detail-type, and queueId.

def create_eventbridge_rule(eventbridge_client: boto3.client, rule_name: str, pattern: str, description: str) -> str:
    """
    Creates an AWS EventBridge rule.
    
    Args:
        eventbridge_client: The boto3 EventBridge client.
        rule_name: A unique name for the rule.
        pattern: The JSON string event pattern.
        description: A description of the rule.
        
    Returns:
        The ARN of the created rule.
        
    Raises:
        ClientError: If the rule creation fails.
    """
    try:
        response = eventbridge_client.put_rule(
            Name=rule_name,
            Description=description,
            EventPattern=pattern,
            State='ENABLED',
            RoleArn='' # Optional: If you need to invoke targets that require cross-account permissions
        )
        print(f"Rule '{rule_name}' created successfully. ARN: {response['RuleArn']}")
        return response['RuleArn']
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceAlreadyExistsException':
            print(f"Rule '{rule_name}' already exists. Skipping creation.")
            # Optionally retrieve the existing ARN
            return eventbridge_client.describe_rule(Name=rule_name)['Arn']
        else:
            print(f"Error creating rule: {e}")
            raise

Step 4: Attach a Target (Lambda or SQS)

A rule without a target does nothing. You must attach a target, such as an AWS Lambda function or an SQS queue. Here is how to attach a Lambda function.

def add_lambda_target(eventbridge_client: boto3.client, rule_arn: str, lambda_arn: str, target_id: str) -> None:
    """
    Adds a Lambda function as a target for the EventBridge rule.
    
    Args:
        eventbridge_client: The boto3 EventBridge client.
        rule_arn: The ARN of the EventBridge rule.
        lambda_arn: The ARN of the Lambda function.
        target_id: A unique ID for the target within the rule.
    """
    try:
        eventbridge_client.put_targets(
            Rule=rule_arn,
            Targets=[
                {
                    'Id': target_id,
                    'Arn': lambda_arn,
                    'RoleArn': '' # Optional: If Lambda is in a different account
                }
            ]
        )
        print(f"Lambda target '{target_id}' added to rule '{rule_arn}'.")
    except ClientError as e:
        print(f"Error adding target: {e}")
        raise

Complete Working Example

This script ties everything together. It authenticates with Genesys Cloud, finds the queue ID, creates the EventBridge pattern, creates the rule, and attaches a Lambda target.

import os
import boto3
from genesyscloud import PlatformClient
from genesyscloud.oauth.api import OAuthApi
from genesyscloud.routing.api import RoutingApi
from genesyscloud.routing.model import QueueSearchRequest
from botocore.exceptions import ClientError
import json

def main():
    # Configuration
    GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
    GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
    GENESYS_BASE_URL = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
    TARGET_QUEUE_NAME = os.getenv("TARGET_QUEUE_NAME", "Support Queue")
    AWS_LAMBDA_ARN = os.getenv("AWS_LAMBDA_ARN", "arn:aws:lambda:us-east-1:123456789012:function:my-handler")
    RULE_NAME = "GenesysConversationEnd_SupportQueue"
    
    # 1. Authenticate with Genesys Cloud
    platform_client = PlatformClient()
    oauth_api = OAuthApi(platform_client)
    
    try:
        token_response = oauth_api.post_oauth_token(
            body={
                "grant_type": "client_credentials",
                "client_id": GENESYS_CLIENT_ID,
                "client_secret": GENESYS_CLIENT_SECRET,
                "scope": "event:read queue:read"
            }
        )
        print("Genesys Cloud authentication successful.")
    except Exception as e:
        print(f"Failed to authenticate with Genesys Cloud: {e}")
        return

    # 2. Find Queue ID
    routing_api = RoutingApi(platform_client)
    queue_id = None
    
    try:
        search_request = QueueSearchRequest(query=TARGET_QUEUE_NAME, size=1)
        response = routing_api.post_routing_queues_search(query=search_request)
        
        if response.entities and len(response.entities) > 0:
            queue_id = response.entities[0].id
            print(f"Found Queue ID: {queue_id}")
        else:
            print(f"Queue '{TARGET_QUEUE_NAME}' not found.")
            return
    except Exception as e:
        print(f"Error finding queue: {e}")
        return

    # 3. Initialize AWS EventBridge Client
    eventbridge_client = boto3.client('events')
    
    # 4. Create Event Pattern
    pattern = {
        "source": ["com.genesyscloud"],
        "detail-type": ["conversation.end"],
        "detail": {
            "queueId": [queue_id]
        }
    }
    pattern_str = json.dumps(pattern)
    
    # 5. Create EventBridge Rule
    try:
        rule_response = eventbridge_client.put_rule(
            Name=RULE_NAME,
            Description=f"Filters conversation.end events for queue {TARGET_QUEUE_NAME}",
            EventPattern=pattern_str,
            State='ENABLED'
        )
        rule_arn = rule_response['RuleArn']
        print(f"Rule created: {rule_arn}")
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceAlreadyExistsException':
            rule_arn = eventbridge_client.describe_rule(Name=RULE_NAME)['Arn']
            print(f"Rule already exists: {rule_arn}")
        else:
            print(f"Error creating rule: {e}")
            return

    # 6. Add Lambda Target
    try:
        eventbridge_client.put_targets(
            Rule=rule_arn,
            Targets=[
                {
                    'Id': 'ProcessConversationEnd',
                    'Arn': AWS_LAMBDA_ARN
                }
            ]
        )
        print("Lambda target added successfully.")
    except ClientError as e:
        print(f"Error adding target: {e}")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 403 Forbidden on Queue Search

Cause: The OAuth token does not have the queue:read scope.
Fix: Update the scope parameter in the post_oauth_token call to include queue:read.

# Correct scope
"scope": "event:read queue:read"

Error: Rule Created but No Events Trigger

Cause: The queueId in the event pattern does not exactly match the queueId in the Genesys Cloud event.
Fix: Verify the queue ID. Genesys Cloud events use the internal UUID, not the external ID or name. Print the queue_id variable in the script to confirm it matches the ID seen in the Genesys Cloud admin console.

Error: ResourceAlreadyExistsException

Cause: You are trying to create a rule with a name that already exists in your AWS account.
Fix: Use a unique rule name or check for existence before creation, as shown in the complete example.

Error: Lambda Permission Denied

Cause: EventBridge does not have permission to invoke the Lambda function.
Fix: Add a permission statement to the Lambda function allowing EventBridge to invoke it.

import boto3

def add_lambda_permission(lambda_arn: str, rule_arn: str, principal: str = "events.amazonaws.com"):
    lambda_client = boto3.client('lambda')
    source_account = rule_arn.split(":")[4]
    
    lambda_client.add_permission(
        FunctionName=lambda_arn,
        StatementId='AllowEventBridgeInvoke',
        Action='lambda:InvokeFunction',
        Principal=principal,
        SourceArn=rule_arn,
        SourceAccount=source_account
    )

Official References