How to filter Genesys Cloud EventBridge events to only receive conversation.end for a specific queue

What You Will Build

  • You will configure a Genesys Cloud EventStream subscription via the REST API to emit events only when a conversation associated with a specific queue ends.
  • You will implement a Python service that consumes these events from an AWS SQS queue (via EventBridge) and filters them locally to ensure strict adherence to the queue ID.
  • This tutorial uses the Genesys Cloud Python SDK (genesyscloud) and the boto3 library for AWS interaction.

Prerequisites

  • OAuth Client: A Genesys Cloud OAuth client that uses the Client Credentials grant. This tutorial needs the eventstream:write scope to create the subscription and eventstream:read to validate it.
  • AWS Account: An active AWS account with permissions to create EventBridge rules, SQS queues, and IAM roles.
  • Genesys Cloud Region: You must know your Genesys Cloud region domain (e.g., mypurecloud.com, usw2.pure.cloud, euw2.pure.cloud).
  • Python Environment: Python 3.8+ with genesyscloud (v2.20+), boto3, and requests installed (requests is used for the token exchange).
pip install genesyscloud boto3 requests
  • Queue ID: The UUID of the Genesys Cloud queue you wish to monitor. You can find this in the Admin UI or via the /api/v2/routing/queues endpoint.

Authentication Setup

Genesys Cloud uses the OAuth 2.0 Client Credentials grant. You must obtain a valid access token before making any API calls. The token is short-lived (the token response reports its lifetime in seconds in the expires_in field), so production code should handle refresh logic. For this tutorial, we will assume a function get_access_token handles the credential exchange.

import requests

def get_access_token(client_id: str, client_secret: str, region: str) -> str:
    """
    Exchange Client ID and Secret for an OAuth Access Token.

    `region` is the bare region domain (e.g. mypurecloud.com);
    the login host is login.<region>.
    """
    token_url = f"https://login.{region}/oauth/token"

    # Credentials go in an HTTP Basic auth header; the grant type goes in the form body.
    response = requests.post(
        token_url,
        data={"grant_type": "client_credentials"},
        auth=(client_id, client_secret),
        timeout=10,
    )
    if response.status_code != 200:
        raise RuntimeError(f"Failed to obtain token ({response.status_code}): {response.text}")

    return response.json()["access_token"]
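
The refresh logic mentioned above can be sketched as a small cache that refetches the token shortly before it expires. This is a minimal sketch, not part of any SDK: TokenCache, its fetch callable, and the skew_seconds margin are hypothetical names introduced for illustration. fetch is any function you supply that returns the token and its lifetime in seconds (for example, the access_token and expires_in fields of the token response).

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches an access token and refetches it shortly before it expires.

    Hypothetical helper for illustration; not part of the Genesys Cloud SDK.
    """

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        skew_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch          # returns (token, lifetime_in_seconds)
        self._skew = skew_seconds    # refresh this long before expiry
        self._clock = clock          # injectable so the cache can be tested
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return a cached token, fetching a new one if it is missing or near expiry."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._skew:
            token, lifetime = self._fetch()
            self._token = token
            self._expires_at = now + lifetime
        return self._token
```

Callers then ask the cache for a token before each API request instead of calling the token endpoint directly, so a new token is requested only when the old one is about to lapse.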

Implementation

Step 1: Create the EventStream Subscription

The core filtering happens at the Genesys Cloud EventStream level. You must define a filter object in the subscription request. The filter syntax follows the Genesys Cloud EventStream query language.

To filter for conversation.end events for a specific queue, you must filter on:

  1. event.type: Must be conversation.end.
  2. conversation.queueIds: Must contain the specific Queue ID.

Note: The conversation.queueIds field is an array, so the condition uses the contains operator, which matches when the given Queue ID is one of the array's elements.
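
Because a malformed Queue ID is an easy way to end up with a filter that never matches, it can help to validate the ID before building the filter. The following is a minimal sketch: build_queue_end_filter is a hypothetical helper, and the filter shape it returns is simply the one used in this tutorial's request body.

```python
import uuid
from typing import Any, Dict


def build_queue_end_filter(queue_id: str) -> Dict[str, Any]:
    """Build the tutorial's filter object after checking that queue_id is a UUID.

    Hypothetical helper; the dictionary layout mirrors this tutorial's request body.
    """
    try:
        # uuid.UUID raises ValueError for malformed input and normalizes the casing.
        normalized = str(uuid.UUID(queue_id))
    except ValueError as exc:
        raise ValueError(f"Queue ID is not a valid UUID: {queue_id!r}") from exc

    return {
        "eventType": "conversation.end",
        "conditions": [
            {
                "field": "conversation.queueIds",
                "operator": "contains",
                "value": normalized,
            }
        ],
    }
```

Failing fast on a bad ID keeps the error close to its cause instead of surfacing later as a subscription that silently delivers nothing.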

The API Request

Endpoint: POST /api/v2/eventstreams/subscriptions

Required Scope: eventstream:write

Request Body:

{
  "name": "Queue-Specific-End-Events",
  "description": "Emits conversation.end events for Queue ID: 12345678-1234-1234-1234-123456789012",
  "filter": {
    "eventType": "conversation.end",
    "conditions": [
      {
        "field": "conversation.queueIds",
        "operator": "contains",
        "value": "12345678-1234-1234-1234-123456789012"
      }
    ]
  },
  "deliveryMethod": {
    "type": "aws-eventbridge",
    "config": {
      "region": "us-east-1",
      "accountId": "123456789012",
      "eventBusName": "default"
    }
  }
}

Python SDK Implementation

from genesyscloud import rest
from genesyscloud.eventstream.api import event_stream_api

def create_eventstream_subscription(
    api_client: rest.ApiClient,
    queue_id: str,
    aws_region: str,
    aws_account_id: str,
    event_bus_name: str = "default"
) -> dict:
    """
    Creates an EventStream subscription for conversation.end events 
    filtered by a specific Queue ID.
    """
    api_instance = event_stream_api.EventStreamApi(api_client)
    
    # Construct the filter object
    # The 'conditions' array allows for complex filtering logic.
    # Here we ensure the event type is conversation.end AND the queue ID is present.
    filter_obj = {
        "eventType": "conversation.end",
        "conditions": [
            {
                "field": "conversation.queueIds",
                "operator": "contains",
                "value": queue_id
            }
        ]
    }
    
    # Construct the delivery configuration
    delivery_config = {
        "type": "aws-eventbridge",
        "config": {
            "region": aws_region,
            "accountId": aws_account_id,
            "eventBusName": event_bus_name
        }
    }
    
    # Create the subscription request body
    subscription_body = {
        "name": f"Queue-{queue_id}-End-Events",
        "description": f"Tracks conversation.end for queue {queue_id}",
        "filter": filter_obj,
        "deliveryMethod": delivery_config
    }
    
    try:
        # Call the API
        response = api_instance.post_eventstreams_subscriptions(body=subscription_body)
        
        if response.status_code == 201:
            print(f"Subscription created successfully. ID: {response.body.id}")
            return response.body
        else:
            raise Exception(f"API Error {response.status_code}: {response.body}")
            
    except Exception as e:
        print(f"Failed to create subscription: {e}")
        raise

Step 2: Configure AWS EventBridge Rule

Genesys Cloud pushes events to the EventBridge bus, but you still need an EventBridge rule to route them to your consumer (e.g., SQS, Lambda, or an HTTP endpoint). The rule can filter further if necessary, although the Genesys Cloud filter is usually sufficient.

EventBridge Pattern:

{
  "source": ["genesys.cloud"],
  "detail-type": ["Genesys Cloud EventStream Event"],
  "detail": {
    "eventType": ["conversation.end"]
  }
}

Boto3 Implementation to Create the Rule:

import boto3
import json

def setup_eventbridge_rule(
    aws_region: str,
    rule_name: str,
    target_arn: str  # ARN of SQS Queue or Lambda
) -> dict:
    """
    Creates an EventBridge rule to capture Genesys conversation.end events.
    """
    client = boto3.client('events', region_name=aws_region)
    
    # The event pattern matches Genesys Cloud events
    event_pattern = {
        "source": ["genesys.cloud"],
        "detail-type": ["Genesys Cloud EventStream Event"],
        "detail": {
            "eventType": ["conversation.end"]
        }
    }
    
    try:
        response = client.put_rule(
            Name=rule_name,
            EventPattern=json.dumps(event_pattern),
            State='ENABLED',
            Description='Rule to capture Genesys conversation.end events'
        )
        
        # Add the target (SQS/Lambda)
        client.put_targets(
            Rule=rule_name,
            Targets=[
                {
                    'Id': 'Target1',
                    'Arn': target_arn
                }
            ]
        )
        
        return response
    except Exception as e:
        print(f"Failed to setup EventBridge rule: {e}")
        raise
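
If you do want the rule itself to filter by queue, the event pattern can also match on the queue ID. EventBridge matches an array field in the event when any of its elements equals one of the values listed in the pattern. The sketch below assumes the queue IDs appear under detail.queueIds, as in this tutorial's sample payload. build_event_pattern and matches are hypothetical helpers, and matches is a simplified local check (exact values only), not a full implementation of EventBridge pattern semantics.

```python
import json
from typing import Any, Dict


def build_event_pattern(queue_id: str) -> Dict[str, Any]:
    """Event pattern that also requires the queue ID (assumes detail.queueIds exists)."""
    return {
        "source": ["genesys.cloud"],
        "detail-type": ["Genesys Cloud EventStream Event"],
        "detail": {
            "eventType": ["conversation.end"],
            "queueIds": [queue_id],
        },
    }


def matches(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Simplified local matcher for testing: exact-value matching only."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: recurse into the nested event object.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        else:
            # A leaf pattern is a list of allowed values; an array in the
            # event matches if any of its elements is allowed.
            values = actual if isinstance(actual, list) else [actual]
            if not any(value in expected for value in values):
                return False
    return True


if __name__ == "__main__":
    pattern = build_event_pattern("12345678-1234-1234-1234-123456789012")
    print(json.dumps(pattern, indent=2))
```

The resulting dictionary can be passed to put_rule as EventPattern=json.dumps(pattern) in place of the broader pattern shown above.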

Step 3: Process the Event Payload

When the event arrives at your AWS target (e.g., SQS), it will be wrapped in an EventBridge envelope. You must parse this envelope to extract the actual Genesys Cloud event data.

EventBridge Envelope Structure:

{
  "version": "0",
  "id": "abc123...",
  "detail-type": "Genesys Cloud EventStream Event",
  "source": "genesys.cloud",
  "account": "123456789012",
  "time": "2023-10-27T10:00:00Z",
  "region": "us-east-1",
  "resources": [],
  "detail": {
    "eventType": "conversation.end",
    "conversationId": "conv-uuid",
    "queueIds": ["queue-uuid"],
    "participants": [ ... ],
    "metrics": { ... }
  }
}

Python Consumer Logic

import json
from typing import Dict, Any, Optional

def process_eventbridge_event(event: Dict[str, Any], expected_queue_id: str) -> Optional[Dict[str, Any]]:
    """
    Processes an incoming EventBridge event.
    Validates that the event is for the expected queue and extracts relevant data.
    """
    # 1. Extract the Genesys Cloud payload from the EventBridge envelope
    detail = event.get("detail")
    if not detail:
        raise ValueError("Invalid EventBridge event: missing 'detail' field")
    
    event_type = detail.get("eventType")
    if event_type != "conversation.end":
        # This should be filtered by EventBridge rule, but double-check for safety
        return None
    
    # 2. Verify Queue ID (Redundant if EventStream filter is strict, but good practice)
    queue_ids = detail.get("queueIds", [])
    if expected_queue_id not in queue_ids:
        # Log warning: Event passed filter but does not match expected queue
        print(f"Warning: Event for queue {queue_ids} does not match expected {expected_queue_id}")
        return None
    
    # 3. Extract relevant conversation data
    conversation_id = detail.get("conversationId")
    participants = detail.get("participants", [])
    
    # Example: Extract agent wrap-up code or disposition if available
    # Note: Wrap-up codes are often in the 'participants' array under 'wrapupCode'
    agent_wrapup = None
    for p in participants:
        if p.get("role") == "agent":
            agent_wrapup = p.get("wrapupCode")
            break
    
    return {
        "conversationId": conversation_id,
        "queueId": expected_queue_id,
        "agentWrapup": agent_wrapup,
        "timestamp": event.get("time")
    }
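
To connect this function to SQS, a consumer has to poll the queue, decode each message body (EventBridge delivers the event envelope as the JSON body of the SQS message), and delete the message once it has been handled. The following is a minimal sketch under those assumptions: drain_once is a hypothetical helper, and sqs_client is expected to be a boto3 SQS client created with boto3.client("sqs"). It is passed in as a parameter so the loop can be exercised with a stub.

```python
import json
from typing import Any, Callable, Dict


def drain_once(
    sqs_client: Any,
    queue_url: str,
    handler: Callable[[Dict[str, Any]], Any],
) -> int:
    """Poll the queue once and pass each decoded EventBridge envelope to handler.

    Returns the number of messages handled and deleted.
    """
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,  # SQS returns at most 10 messages per call
        WaitTimeSeconds=20,      # long polling
    )
    handled = 0
    for message in response.get("Messages", []):
        try:
            envelope = json.loads(message["Body"])
        except json.JSONDecodeError:
            # Leave the message on the queue so a redrive policy can deal with it.
            print(f"Skipping non-JSON message {message.get('MessageId')}")
            continue

        handler(envelope)

        # Delete only after the handler returned without raising.
        sqs_client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message["ReceiptHandle"],
        )
        handled += 1
    return handled
```

A typical handler would be lambda envelope: process_eventbridge_event(envelope, expected_queue_id), with drain_once called in a loop by the service.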

Complete Working Example

This script combines authentication, subscription creation, and EventBridge rule setup. It assumes you have environment variables set for your credentials.

import os
import sys
import json
import boto3
import requests
from genesyscloud import rest
from genesyscloud.eventstream.api import event_stream_api

# Configuration
GENESYS_REGION = os.getenv("GENESYS_REGION", "mypurecloud.com")
GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
TARGET_QUEUE_ID = os.getenv("TARGET_QUEUE_ID")  # The Genesys Queue UUID
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
AWS_ACCOUNT_ID = os.getenv("AWS_ACCOUNT_ID")
SQS_QUEUE_ARN = os.getenv("SQS_QUEUE_ARN")  # ARN of the SQS queue to receive events

def get_access_token() -> str:
    # The login host is login.<region>, e.g. login.mypurecloud.com
    token_url = f"https://login.{GENESYS_REGION}/oauth/token"
    # Credentials go in an HTTP Basic auth header; the grant type goes in the form body.
    response = requests.post(
        token_url,
        data={"grant_type": "client_credentials"},
        auth=(GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET),
        timeout=10,
    )
    if response.status_code != 200:
        raise RuntimeError(f"OAuth Error ({response.status_code}): {response.text}")
    return response.json()["access_token"]

def setup_genesys_subscription(api_client: rest.ApiClient) -> str:
    api_instance = event_stream_api.EventStreamApi(api_client)
    
    filter_obj = {
        "eventType": "conversation.end",
        "conditions": [
            {
                "field": "conversation.queueIds",
                "operator": "contains",
                "value": TARGET_QUEUE_ID
            }
        ]
    }
    
    delivery_config = {
        "type": "aws-eventbridge",
        "config": {
            "region": AWS_REGION,
            "accountId": AWS_ACCOUNT_ID,
            "eventBusName": "default"
        }
    }
    
    subscription_body = {
        "name": f"Queue-{TARGET_QUEUE_ID}-End-Events",
        "description": f"Tracks conversation.end for queue {TARGET_QUEUE_ID}",
        "filter": filter_obj,
        "deliveryMethod": delivery_config
    }
    
    response = api_instance.post_eventstreams_subscriptions(body=subscription_body)
    if response.status_code == 201:
        return response.body.id
    else:
        raise Exception(f"Failed to create subscription: {response.body}")

def setup_aws_rule(rule_name: str) -> None:
    client = boto3.client('events', region_name=AWS_REGION)
    event_pattern = {
        "source": ["genesys.cloud"],
        "detail-type": ["Genesys Cloud EventStream Event"],
        "detail": {
            "eventType": ["conversation.end"]
        }
    }
    client.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(event_pattern),
        State='ENABLED',
        Description='Rule to capture Genesys conversation.end events'
    )
    client.put_targets(
        Rule=rule_name,
        Targets=[
            {
                'Id': 'Target1',
                'Arn': SQS_QUEUE_ARN
            }
        ]
    )

def main():
    # AWS_ACCOUNT_ID is required too: it goes into the subscription's delivery config
    required = [GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, TARGET_QUEUE_ID, AWS_ACCOUNT_ID, SQS_QUEUE_ARN]
    if not all(required):
        print("Missing environment variables. Check README.")
        sys.exit(1)

    try:
        # 1. Authenticate
        token = get_access_token()
        config = rest.Configuration()
        # API calls go to the api.<region> host, not the bare region domain
        config.host = f"https://api.{GENESYS_REGION}"
        config.access_token = token
        api_client = rest.ApiClient(config)

        # 2. Create Genesys Subscription
        sub_id = setup_genesys_subscription(api_client)
        print(f"Created Genesys Subscription: {sub_id}")

        # 3. Setup AWS EventBridge Rule
        rule_name = f"Genesys-Queue-{TARGET_QUEUE_ID}-Ends"
        setup_aws_rule(rule_name)
        print(f"Created EventBridge Rule: {rule_name}")

        print("Setup complete. Events will now flow to your SQS queue.")

    except Exception as e:
        print(f"Error during setup: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 400 Bad Request on Subscription Creation

  • Cause: The filter syntax is invalid, or the Queue ID does not exist.
  • Fix: Verify the Queue ID exists via GET /api/v2/routing/queues/{queueId}. Ensure the conditions array uses the correct field name conversation.queueIds and operator contains.
  • Debug Code:
# Check if queue exists before creating subscription
# (assumes queues_api has been imported from the SDK and api_client is authenticated)
api_instance = queues_api.QueuesApi(api_client)
try:
    queue = api_instance.get_queue(queue_id)
    print(f"Queue found: {queue.body.name}")
except Exception as e:
    print(f"Queue not found or error: {e}")

Error: Events Not Arriving in SQS

  • Cause: The EventBridge rule pattern does not match the event source, or the SQS queue's resource policy does not allow EventBridge to send messages to it.
  • Fix:
    1. Check the rule's CloudWatch metrics (e.g., FailedInvocations) for delivery failures.
    2. Ensure the SQS Queue Policy allows events.amazonaws.com to send messages.
    3. Verify the Genesys Cloud subscription status is active.
  • Debug Code:
# Check subscription status
api_instance = event_stream_api.EventStreamApi(api_client)
response = api_instance.get_eventstreams_subscriptions(subscription_id=sub_id)
print(f"Subscription Status: {response.body.status}")
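
The queue policy from fix 2 can be sketched as follows. This is a minimal example rather than a complete setup: build_sqs_policy is a hypothetical helper, and the ARNs in the usage note are placeholders. The policy allows the EventBridge service principal to send messages to the queue, restricted to events coming from one specific rule.

```python
import json


def build_sqs_policy(queue_arn: str, rule_arn: str) -> str:
    """Return a queue policy (as JSON) that lets one EventBridge rule send to the queue."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowEventBridgeSendMessage",
                "Effect": "Allow",
                "Principal": {"Service": "events.amazonaws.com"},
                "Action": "sqs:SendMessage",
                "Resource": queue_arn,
                # Only accept messages that originate from this rule.
                "Condition": {"ArnEquals": {"aws:SourceArn": rule_arn}},
            }
        ],
    }
    return json.dumps(policy)
```

The policy can then be applied with the boto3 SQS client, for example sqs.set_queue_attributes(QueueUrl=queue_url, Attributes={"Policy": build_sqs_policy(queue_arn, rule_arn)}), where queue_url, queue_arn, and rule_arn are your own values.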

Error: 401 Unauthorized

  • Cause: The OAuth token has expired or is invalid.
  • Fix: Implement token refresh logic. Cache the token and call get_access_token again shortly before it expires (the token response reports its lifetime in the expires_in field), rather than requesting a new token on every API call.
