Filter EventBridge Events for Specific Queue Conversation Ends in Genesys Cloud

What You Will Build

  • You will configure a Genesys Cloud event subscription to trigger on conversation.end events.
  • You will apply an attribute filter to isolate events for a specific Queue ID.
  • You will implement a Python worker using boto3 to receive, parse, and process these events from an AWS SQS queue.

Prerequisites

  • Genesys Cloud Account: A Developer or Admin role with permissions to manage Event Subscriptions.
  • AWS Account: An active SQS queue and an IAM role with sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes permissions.
  • OAuth 2.0 Client: A Genesys Cloud OAuth client with the event:subscribe:read and event:subscribe:write scopes.
  • Python 3.9+: With pip installed.
  • Dependencies: requests, boto3, purecloud-platform-client.
pip install requests boto3 purecloud-platform-client

Authentication Setup

Genesys Cloud uses OAuth 2.0 Client Credentials flow for server-to-server API calls. You must obtain an access token before creating or managing event subscriptions.

import requests
import os
from typing import Optional

def get_genesys_access_token(
    client_id: str, 
    client_secret: str, 
    environment: str = "mypurecloud.com"
) -> str:
    """
    Retrieves a Genesys Cloud OAuth access token.
    
    Args:
        client_id: The OAuth Client ID from Genesys Cloud.
        client_secret: The OAuth Client Secret.
        environment: The Genesys Cloud environment suffix (e.g., mypurecloud.com).
        
    Returns:
        str: The access token.
        
    Raises:
        requests.exceptions.HTTPError: If authentication fails.
    """
    url = f"https://login.{environment}/oauth/token"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "event:subscribe:read event:subscribe:write"
    }

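    # requests has no default timeout, so set one to avoid hanging on a stalled connection.
    response = requests.post(url, headers=headers, data=data, timeout=10)
    response.raise_for_status()

    # Index directly so a missing token raises KeyError instead of returning None.
    token_data = response.json()
    return token_data["access_token"]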

# Example usage
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
ACCESS_TOKEN = get_genesys_access_token(CLIENT_ID, CLIENT_SECRET)

Implementation

Step 1: Create the Event Subscription with Attribute Filtering

Genesys Cloud Event Subscriptions allow filtering at the source using JSONata expressions in the attributeFilter field. This is critical for reducing noise. If you do not filter here, your AWS infrastructure receives every conversation.end event in your entire organization, which incurs unnecessary costs and processing load.

We will create a subscription that:

  1. Subscribes to event:conversation:end.
  2. Filters for a specific queueId.
  3. Targets an AWS SQS queue as the endpoint.

Required Scope: event:subscribe:write

import json
import requests

def create_filtered_event_subscription(
    access_token: str,
    queue_id: str,
    sqs_arn: str,
    region: str,
    environment: str = "mypurecloud.com"
) -> dict:
    """
    Creates a Genesys Cloud event subscription filtered for a specific queue.
    
    Args:
        access_token: Valid Genesys Cloud OAuth token.
        queue_id: The Genesys Cloud Queue ID to filter on.
        sqs_arn: The ARN of the target AWS SQS queue.
        region: The AWS region of the SQS queue.
        environment: Genesys Cloud environment suffix.
        
    Returns:
        dict: The response from the Genesys Cloud API.
    """
    url = f"https://api.{environment}/api/v2/events/subscriptions"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    # The attributeFilter is a JSONata expression evaluated against each event.
    # This filter assumes 'queueId' is a top-level attribute of the
    # conversation.end payload. If your events carry the ID only inside a
    # nested 'queue' object, match on queue.id instead.
    
    payload = {
        "name": f"Queue-Specific-End-{queue_id}",
        "description": "Filters conversation.end events for a specific queue.",
        "eventTypes": [
            "event:conversation:end"
        ],
        "attributeFilter": f"queueId = '{queue_id}'",
        "destination": {
            "type": "aws_sqs",
            "awsSqs": {
                "arn": sqs_arn,
                "region": region
            }
        },
        "status": "active"
    }

    response = requests.post(url, headers=headers, json=payload, timeout=10)

    if response.status_code == 409:
        # Conflict: report it and return the error body for inspection.
        print("Subscription already exists or conflict.")
        return response.json()

    # Raise on any other 4xx/5xx so the function never silently returns None.
    response.raise_for_status()
    print("Subscription created successfully.")
    return response.json()

# Example Usage
# QUEUE_ID = "a1b2c3d4-e5f6-7890-1234-567890abcdef"
# SQS_ARN = "arn:aws:sqs:us-east-1:123456789012:my-queue-endpoints"
# create_filtered_event_subscription(ACCESS_TOKEN, QUEUE_ID, SQS_ARN, "us-east-1")

Critical Note on Attribute Filtering: The attributeFilter string queueId = '...' assumes that queueId is a top-level attribute of the conversation.end payload. The payload also includes a queue object, so if a conversation touches multiple queues (for example, after a transfer), you may need a more specific JSONata expression such as $exists(queue) and queue.id = 'YOUR_QUEUE_ID'. Note that JSONata built-in functions carry a $ prefix. For standard single-queue interactions, the direct attribute match is sufficient.
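
Because the queue ID is interpolated into the filter string, a stray quote in the input would change the expression. The following is a minimal sketch of a helper that builds either filter form described above. The name build_queue_filter and the nested flag are hypothetical, and the sketch assumes queue IDs are UUIDs, as in the example ID used in this guide.

```python
import uuid


def build_queue_filter(queue_id: str, nested: bool = False) -> str:
    """Return an attributeFilter expression for one queue.

    Hypothetical helper: validates the ID as a UUID before embedding it,
    so a stray quote cannot alter the expression.
    """
    # uuid.UUID raises ValueError for anything that is not a valid UUID.
    canonical = str(uuid.UUID(queue_id))
    if nested:
        # Assumed shape: the ID lives inside a nested 'queue' object.
        return f"$exists(queue) and queue.id = '{canonical}'"
    # Assumed shape: 'queueId' is a top-level attribute.
    return f"queueId = '{canonical}'"


print(build_queue_filter("a1b2c3d4-e5f6-7890-1234-567890abcdef"))
print(build_queue_filter("a1b2c3d4-e5f6-7890-1234-567890abcdef", nested=True))
```

Which form is correct depends on the payload your organization actually receives, so inspect a real event before choosing one.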

Step 2: Verify the Subscription

Before processing events, verify the subscription is active and the filter is applied correctly.

Required Scope: event:subscribe:read

def get_subscription(access_token: str, subscription_id: str, environment: str = "mypurecloud.com") -> dict:
    url = f"https://api.{environment}/api/v2/events/subscriptions/{subscription_id}"
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()
    return response.json()

# subscription_data = get_subscription(ACCESS_TOKEN, "subscription-id-from-step-1")
# print(json.dumps(subscription_data, indent=2))
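
Reading the printed JSON by eye is error-prone, so the check can be automated. This is a minimal sketch that assumes the response echoes the status, attributeFilter, and eventTypes fields sent in Step 1. The function name check_subscription is hypothetical.

```python
from typing import Any, Dict, List


def check_subscription(
    sub: Dict[str, Any],
    expected_filter: str,
    expected_event: str = "event:conversation:end",
) -> List[str]:
    """Return a list of problems found in a subscription response (empty if none)."""
    problems = []
    if sub.get("status") != "active":
        problems.append(f"status is {sub.get('status')!r}, expected 'active'")
    if sub.get("attributeFilter") != expected_filter:
        problems.append(f"attributeFilter is {sub.get('attributeFilter')!r}")
    # Treat a missing or null eventTypes field as an empty list.
    if expected_event not in (sub.get("eventTypes") or []):
        problems.append(f"eventTypes does not include {expected_event!r}")
    return problems


example = {
    "status": "active",
    "attributeFilter": "queueId = 'queue-456'",
    "eventTypes": ["event:conversation:end"],
}
print(check_subscription(example, "queueId = 'queue-456'"))
```

An empty list means the subscription matches what was requested. Any entries describe the field that differs.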

Step 3: Process Events from AWS SQS

Genesys Cloud sends events to your SQS queue in a specific format. The message body is a JSON object containing metadata and the actual event payload. You must parse this structure to extract the conversation details.

The structure of the message body received from Genesys Cloud looks like this:

{
  "eventType": "event:conversation:end",
  "eventTime": "2023-10-27T10:00:00.000Z",
  "attributes": {
    "conversationId": "conv-123",
    "queueId": "queue-456",
    "queue": {
      "id": "queue-456",
      "name": "Support Queue"
    },
    "participants": [...],
    "metrics": [...]
  }
}
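
Before wiring this into a worker, it can help to isolate the parsing step. The sketch below assumes the message shape shown above and falls back to the nested queue object when the top-level queueId is absent. The names ConversationEnd and parse_conversation_end are hypothetical.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ConversationEnd:
    conversation_id: str
    queue_id: str
    event_time: Optional[str]


def parse_conversation_end(body: str, target_queue_id: str) -> Optional[ConversationEnd]:
    """Return a ConversationEnd for the target queue, or None if the message does not qualify."""
    event = json.loads(body)  # raises json.JSONDecodeError on malformed input
    if not isinstance(event, dict) or event.get("eventType") != "event:conversation:end":
        return None
    attrs = event.get("attributes") or {}
    # Prefer the top-level queueId; fall back to the nested queue object.
    queue_id = attrs.get("queueId") or (attrs.get("queue") or {}).get("id")
    conversation_id = attrs.get("conversationId")
    if not conversation_id or queue_id != target_queue_id:
        return None
    return ConversationEnd(conversation_id, queue_id, event.get("eventTime"))


sample = json.dumps({
    "eventType": "event:conversation:end",
    "eventTime": "2023-10-27T10:00:00.000Z",
    "attributes": {"conversationId": "conv-123", "queueId": "queue-456"},
})
print(parse_conversation_end(sample, "queue-456"))
```

Comparing against the target queue ID here also gives a local check that the source-side filter behaved as expected.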

Here is the Python worker code to poll SQS, process valid events, and handle errors.

import boto3
import json
import time
from botocore.exceptions import ClientError
from typing import List, Dict, Any

class GenesysEventProcessor:
    def __init__(self, sqs_queue_url: str, region: str):
        self.sqs_client = boto3.client('sqs', region_name=region)
        self.queue_url = sqs_queue_url
        self.max_wait_time = 5  # Seconds for long polling
        self.max_messages = 10

    def receive_messages(self) -> List[Dict[str, Any]]:
        """
        Polls SQS for new messages.
        """
        try:
            response = self.sqs_client.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=self.max_messages,
                WaitTimeSeconds=self.max_wait_time
            )
            return response.get('Messages', [])
        except ClientError as e:
            print(f"Error receiving messages: {e}")
            return []

    def delete_message(self, receipt_handle: str) -> bool:
        """
        Deletes a message from SQS after processing.
        """
        try:
            self.sqs_client.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=receipt_handle
            )
            return True
        except ClientError as e:
            print(f"Error deleting message: {e}")
            return False

    def process_event(self, event_payload: Dict[str, Any]) -> None:
        """
        Processes a single Genesys Cloud event.
        """
        event_type = event_payload.get("eventType")
        if event_type != "event:conversation:end":
            print(f"Ignoring non-target event type: {event_type}")
            return

        attributes = event_payload.get("attributes", {})
        conversation_id = attributes.get("conversationId")
        queue_id = attributes.get("queueId")
        
        # Guard against payloads that lack a queue ID. This only checks presence;
        # to enforce the filter locally as defense in depth, also compare
        # queue_id against your target queue ID.
        if not queue_id:
            print("Warning: Queue ID not found in attributes.")
            return

        print(f"Processing conversation end: ID={conversation_id}, Queue={queue_id}")
        
        # TODO: Implement your business logic here
        # e.g., update CRM, trigger analytics, send notification

    def run(self):
        """
        Main loop to poll and process messages.
        """
        print("Starting event processor...")
        while True:
            messages = self.receive_messages()
            
            for message in messages:
                receipt_handle = message.get('ReceiptHandle')
                body = message.get('Body')
                
                if not body:
                    print("Empty message body received.")
                    continue
                
                try:
                    # Parse the JSON body
                    event_data = json.loads(body)
                    
                    # Process the event
                    self.process_event(event_data)
                    
                    # Delete the message from SQS upon success
                    if self.delete_message(receipt_handle):
                        print("Message deleted successfully.")
                    else:
                        print("Failed to delete message. It will be redelivered.")
                        
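                except json.JSONDecodeError:
                    print("Failed to parse message body as JSON.")
                    # The message is not deleted, so SQS redelivers it after the
                    # visibility timeout. Configure a DLQ so malformed messages
                    # do not keep retrying until they expire.
                except Exception as e:
                    print(f"Error processing message: {e}")
                    # Do not delete the message; SQS redelivers it after the visibility timeout.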
            
            # Brief pause when a poll returns nothing, including after a
            # receive error, to avoid a tight loop.
            if not messages:
                time.sleep(1)

# Usage
# processor = GenesysEventProcessor("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", "us-east-1")
# processor.run()
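
The run loop above mixes polling with per-message handling, which makes the delete-only-on-success rule hard to test without a live queue. As a minimal sketch, the per-batch logic can be pulled into a plain function that takes the handler and the delete operation as callables. The name process_batch is hypothetical.

```python
from typing import Any, Callable, Dict, List, Tuple


def process_batch(
    messages: List[Dict[str, Any]],
    handler: Callable[[str], None],
    delete: Callable[[str], bool],
) -> Tuple[int, int]:
    """Run handler on each message body and delete only the messages that succeed.

    Returns a (processed, failed) tuple.
    """
    processed = failed = 0
    for message in messages:
        try:
            handler(message["Body"])
        except Exception as exc:
            # Leave the message in the queue so SQS redelivers it.
            print(f"Handler failed for {message.get('MessageId')}: {exc}")
            failed += 1
            continue
        if delete(message["ReceiptHandle"]):
            processed += 1
        else:
            failed += 1
    return processed, failed


# Stand-in callables for a dry run; no AWS access is needed.
batch = [{"Body": "{}", "ReceiptHandle": "handle-1", "MessageId": "m-1"}]
print(process_batch(batch, handler=lambda body: None, delete=lambda handle: True))
```

With the class above, the callables could be a function that parses the body and calls processor.process_event, and processor.delete_message.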

Complete Working Example

This script combines authentication, subscription creation (skipped when a subscription with the same filter already exists), and event processing. It reads credentials and queue settings from environment variables.

import os
import json
import time
import requests
import boto3
from botocore.exceptions import ClientError
from typing import Dict, Any, List, Optional

# Configuration
GENESYS_ENV = os.getenv("GENESYS_ENV", "mypurecloud.com")
GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
TARGET_QUEUE_ID = os.getenv("TARGET_QUEUE_ID")
AWS_SQS_ARN = os.getenv("AWS_SQS_ARN")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
AWS_SQS_QUEUE_URL = os.getenv("AWS_SQS_QUEUE_URL")

class GenesysEventBridge:
    def __init__(self):
        self.access_token = None
        self.subscription_id = None
        
    def authenticate(self) -> bool:
        if self.access_token:
            return True
        try:
            url = f"https://login.{GENESYS_ENV}/oauth/token"
            headers = {"Content-Type": "application/x-www-form-urlencoded"}
            data = {
                "grant_type": "client_credentials",
                "client_id": GENESYS_CLIENT_ID,
                "client_secret": GENESYS_CLIENT_SECRET,
                "scope": "event:subscribe:read event:subscribe:write"
            }
            resp = requests.post(url, headers=headers, data=data, timeout=10)
            resp.raise_for_status()
            self.access_token = resp.json().get("access_token")
            return True
        except Exception as e:
            print(f"Authentication failed: {e}")
            return False

    def ensure_subscription_exists(self) -> bool:
        """
        Checks if subscription exists, creates if not.
        """
        if not self.authenticate():
            return False
            
        url = f"https://api.{GENESYS_ENV}/api/v2/events/subscriptions"
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }
        
        # List existing subscriptions to check for duplicates
        resp = requests.get(url, headers=headers, timeout=10)
        if resp.status_code == 200:
            subscriptions = resp.json().get("entities", [])
            for sub in subscriptions:
                if sub.get("attributeFilter") == f"queueId = '{TARGET_QUEUE_ID}'":
                    self.subscription_id = sub.get("id")
                    print(f"Subscription already exists: {self.subscription_id}")
                    return True
        
        # Create new subscription
        payload = {
            "name": f"Filtered-End-{TARGET_QUEUE_ID}",
            "description": "Auto-created filtered subscription for queue end events.",
            "eventTypes": ["event:conversation:end"],
            "attributeFilter": f"queueId = '{TARGET_QUEUE_ID}'",
            "destination": {
                "type": "aws_sqs",
                "awsSqs": {
                    "arn": AWS_SQS_ARN,
                    "region": AWS_REGION
                }
            },
            "status": "active"
        }
        
        resp = requests.post(url, headers=headers, json=payload, timeout=10)
        if resp.status_code == 201:
            self.subscription_id = resp.json().get("id")
            print(f"Subscription created: {self.subscription_id}")
            return True
        else:
            print(f"Failed to create subscription: {resp.text}")
            return False

    def process_events(self):
        if not self.ensure_subscription_exists():
            print("Aborting: Could not establish subscription.")
            return
            
        sqs_client = boto3.client('sqs', region_name=AWS_REGION)
        
        print("Starting event loop...")
        while True:
            try:
                response = sqs_client.receive_message(
                    QueueUrl=AWS_SQS_QUEUE_URL,
                    MaxNumberOfMessages=10,
                    WaitTimeSeconds=5
                )
                messages = response.get('Messages', [])
                
                for msg in messages:
                    try:
                        body = json.loads(msg['Body'])
                        self.handle_event(body)
                        sqs_client.delete_message(
                            QueueUrl=AWS_SQS_QUEUE_URL,
                            ReceiptHandle=msg['ReceiptHandle']
                        )
                    except Exception as e:
                        print(f"Error processing message: {e}")
                
                if not messages:
                    time.sleep(1)
                    
            except ClientError as e:
                print(f"AWS Error: {e}")
                time.sleep(5)

    def handle_event(self, event: Dict[str, Any]):
        if event.get("eventType") == "event:conversation:end":
            attrs = event.get("attributes", {})
            conv_id = attrs.get("conversationId")
            print(f"Received end event for conversation: {conv_id}")
            # Add business logic here

if __name__ == "__main__":
    if not all([GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, TARGET_QUEUE_ID, AWS_SQS_ARN, AWS_SQS_QUEUE_URL]):
        print("Missing environment variables. Please set GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, TARGET_QUEUE_ID, AWS_SQS_ARN, AWS_SQS_QUEUE_URL")
    else:
        app = GenesysEventBridge()
        app.process_events()

Common Errors & Debugging

Error: 403 Forbidden on Subscription Creation

  • Cause: The OAuth token lacks the event:subscribe:write scope.
  • Fix: Update your OAuth client in Genesys Cloud Admin > Platform > OAuth 2.0 Clients to include event:subscribe:write. Regenerate the token.

Error: No Events Received in SQS

  • Cause: The attributeFilter syntax is incorrect, or the Queue ID does not match exactly.
  • Fix:
    1. Verify the Queue ID in Genesys Cloud Admin > Routing > Queues.
    2. Check the attributeFilter in the subscription response. It should match the expression you submitted character for character, for example queueId = 'YOUR_QUEUE_ID'.
    3. Ensure the queue has active conversations ending. If no conversations end in that queue, no events are generated.
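
It also helps to confirm whether messages are reaching SQS at all, which separates a filter problem from a worker problem. The sketch below uses the SQS GetQueueAttributes call (covered by the sqs:GetQueueAttributes permission from the prerequisites) to read the approximate queue depth. The function name queue_depth is hypothetical, and the client is passed in so the function can be exercised with a stub.

```python
from typing import Any, Dict


def queue_depth(sqs_client: Any, queue_url: str) -> Dict[str, int]:
    """Return approximate visible and in-flight message counts for an SQS queue."""
    response = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            "ApproximateNumberOfMessages",
            "ApproximateNumberOfMessagesNotVisible",
        ],
    )
    attrs = response.get("Attributes", {})
    # SQS returns attribute values as strings.
    return {
        "visible": int(attrs.get("ApproximateNumberOfMessages", 0)),
        "in_flight": int(attrs.get("ApproximateNumberOfMessagesNotVisible", 0)),
    }


# Usage with a real client (requires boto3 and AWS credentials):
# import boto3
# print(queue_depth(boto3.client("sqs", region_name="us-east-1"), AWS_SQS_QUEUE_URL))
```

If both counts stay at zero while conversations are ending in the queue, the events are probably not being delivered. A growing visible count suggests the worker is not consuming them.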

Error: 429 Too Many Requests

  • Cause: Your Python script is polling Genesys Cloud APIs too frequently during setup.
  • Fix: Implement exponential backoff in your requests calls. The SQS worker polls AWS rather than Genesys Cloud, so this limit does not apply to it; the concern there is to delete a message only after processing completes.
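
As a minimal sketch of the backoff mentioned in the fix, the helper below retries a callable with exponentially growing, randomized delays. The names call_with_backoff and RetryableError are hypothetical, and the sleep function is injectable so the logic can be tested without waiting.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Raised by the wrapped call to signal that a retry is worthwhile (e.g. HTTP 429)."""


def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on RetryableError with exponential backoff and full jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except RetryableError:
            if attempt == max_attempts - 1:
                raise  # out of attempts; surface the last error
            # The delay ceiling doubles each attempt, capped at max_delay.
            ceiling = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, ceiling))
    raise RuntimeError("max_attempts must be at least 1")
```

To use it with requests, wrap the API call in a function that raises RetryableError when response.status_code is 429 and returns the response otherwise. If the response includes a Retry-After header, waiting for that duration instead of the computed delay is a reasonable refinement.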

Error: SQS Message Accumulation

  • Cause: The process_event method (handle_event in the complete example) raises an exception, so the message is not deleted and SQS redelivers it after the visibility timeout.
  • Fix: Catch exceptions per message so that one failure does not stop the loop, and attach a Dead Letter Queue (DLQ) with a redrive policy so that messages which fail repeatedly are moved aside instead of being retried until they expire.
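
A DLQ is attached to the source queue through its RedrivePolicy attribute. The sketch below builds that attribute and applies it with the SQS SetQueueAttributes call. The function names and the default of five receives are assumptions, and the call needs the sqs:SetQueueAttributes permission, which is not in the prerequisites list above.

```python
import json
from typing import Any, Dict


def build_redrive_attributes(dlq_arn: str, max_receive_count: int = 5) -> Dict[str, str]:
    """Build the Attributes dict that attaches a DLQ to a source queue."""
    if max_receive_count < 1:
        raise ValueError("max_receive_count must be at least 1")
    policy = {
        "deadLetterTargetArn": dlq_arn,
        "maxReceiveCount": str(max_receive_count),
    }
    # SQS expects the RedrivePolicy attribute as a JSON-encoded string.
    return {"RedrivePolicy": json.dumps(policy)}


def attach_dlq(sqs_client: Any, queue_url: str, dlq_arn: str, max_receive_count: int = 5) -> None:
    """Apply the redrive policy to the source queue."""
    sqs_client.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes=build_redrive_attributes(dlq_arn, max_receive_count),
    )


# Usage with a real client (requires boto3 and AWS credentials):
# import boto3
# attach_dlq(boto3.client("sqs", region_name="us-east-1"), AWS_SQS_QUEUE_URL, "<your DLQ ARN>")
```

After a message has been received max_receive_count times without being deleted, SQS moves it to the DLQ, where it can be inspected without blocking the main queue.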

Official References