Filter EventBridge Events for Specific Queue Conversation End

Filter EventBridge Events for Specific Queue Conversation End

What You Will Build

  • This tutorial demonstrates how to configure an AWS EventBridge rule to filter Genesys Cloud conversation.end events for a single specific queue.
  • It uses the Genesys Cloud EventBridge Integration API and AWS CLI/SDK for rule configuration.
  • It covers Python for the Genesys Cloud setup and AWS CLI for the EventBridge rule definition.

Prerequisites

  • Genesys Cloud Environment: A Genesys Cloud org with an active EventBridge integration enabled.
  • AWS Account: An account with permissions to create EventBridge rules and targets.
  • Python Runtime: Python 3.8+ with requests library installed.
  • AWS CLI: Installed and configured with appropriate credentials.
  • OAuth Credentials: Genesys Cloud Client ID and Client Secret with event:write scope.
  • Queue ID: The specific ID of the Genesys Cloud queue you want to monitor.

Authentication Setup

To interact with the Genesys Cloud API to manage the EventBridge integration, you must first obtain an OAuth 2.0 access token. The following Python script handles the client credentials flow.

import requests
import json
import time

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, base_url: str = "https://api.mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url
        self.access_token = None
        self.token_expiry = 0

    def get_token(self) -> str:
        """
        Retrieves an OAuth access token if not expired.
        """
        if self.access_token and time.time() < self.token_expiry:
            return self.access_token

        url = f"{self.base_url}/oauth/token"
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "event:write"
        }

        try:
            response = requests.post(url, headers=headers, data=data)
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            # Set expiry slightly before actual expiry to avoid race conditions
            self.token_expiry = time.time() + (token_data["expires_in"] - 60)
            return self.access_token
        except requests.exceptions.HTTPError as e:
            print(f"Authentication failed: {e.response.text}")
            raise
        except Exception as e:
            print(f"Error during authentication: {e}")
            raise

# Example usage
# auth = GenesysAuth("YOUR_CLIENT_ID", "YOUR_CLIENT_SECRET")
# token = auth.get_token()

Implementation

Step 1: Enable Genesys Cloud EventBridge Integration

Before filtering events in AWS, you must ensure the Genesys Cloud side is configured to send events. This involves retrieving your existing integration or creating a new one. We will use the event:write scope to manage this.

API Endpoint: PUT /api/v2/platform/eventbridge/integrations/{integrationId}

import requests
import json

def update_eventbridge_integration(auth: GenesysAuth, integration_id: str, event_types: list):
    """
    Updates the Genesys Cloud EventBridge integration to include conversation.end events.
    """
    url = f"{auth.base_url}/api/v2/platform/eventbridge/integrations/{integration_id}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {auth.get_token()}"
    }

    # Define the payload to enable conversation.end events
    payload = {
        "enabled": True,
        "eventTypes": event_types
    }

    try:
        response = requests.put(url, headers=headers, json=payload)
        response.raise_for_status()
        print("Integration updated successfully.")
        return response.json()
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            print(f"Integration {integration_id} not found.")
        elif e.response.status_code == 403:
            print("Insufficient permissions. Check 'event:write' scope.")
        else:
            print(f"Error: {e.response.text}")
        raise

# Usage
# auth = GenesysAuth("CLIENT_ID", "CLIENT_SECRET")
# update_eventbridge_integration(auth, "your-integration-id", ["conversation.end"])

Note: If you do not have an integration ID, you must first create one using POST /api/v2/platform/eventbridge/integrations. The response will provide the integrationId.

Step 2: Understand the EventBridge Event Schema

Genesys Cloud sends events to EventBridge with a specific structure. Filtering requires understanding this structure. A conversation.end event looks like this:

{
    "version": "0",
    "id": "unique-event-id",
    "detail-type": "conversation.end",
    "source": "genesys.cloud",
    "account": "aws-account-id",
    "time": "2023-10-27T12:00:00Z",
    "region": "us-east-1",
    "resources": [],
    "detail": {
        "conversationId": "conv-12345",
        "type": "conversation",
        "queueId": "queue-abcde",
        "queueName": "Support Queue",
        "wrapUpCode": "Resolved",
        "metrics": {
            "handleTime": 120,
            "holdTime": 0
        }
    }
}

Critical Fields for Filtering:

  • source: Must be genesys.cloud.
  • detail-type: Must be conversation.end.
  • detail.queueId: Must match your specific queue ID.

Step 3: Create the EventBridge Rule with Filters

Now, create an AWS EventBridge rule that filters for these specific attributes. We will use the AWS CLI for this step, as it provides a clear view of the JSON filter policy.

AWS CLI Command:

aws events put-rule \
    --name "GenesysCloudQueueEndRule" \
    --event-pattern '{
        "source": ["genesys.cloud"],
        "detail-type": ["conversation.end"],
        "detail": {
            "queueId": ["your-specific-queue-id"]
        }
    }' \
    --state ENABLED

Explanation of Filter Policy:

  • source: Restricts events to those originating from Genesys Cloud.
  • detail-type: Restricts events to conversation.end only.
  • detail.queueId: The key filter. By specifying the array ["your-specific-queue-id"], EventBridge will only trigger the rule when the queueId in the event detail matches this value.

Step 4: Add a Target to the Rule

A rule without a target is useless. Add a target, such as an AWS Lambda function, to process these filtered events.

AWS CLI Command:

aws events put-targets \
    --rule "GenesysCloudQueueEndRule" \
    --targets '[
        {
            "Id": "LambdaTarget",
            "Arn": "arn:aws:lambda:us-east-1:123456789012:function:ProcessGenesysEndEvents",
            "RoleArn": "arn:aws:iam::123456789012:role/EventBridge_InvokeLambda_Role"
        }
    ]'

Note: Ensure the IAM role associated with EventBridge has permission to invoke the Lambda function.

Complete Working Example

Below is a complete Python script that combines authentication and integration update. You will still need to run the AWS CLI commands separately or use the boto3 library for full automation.

import requests
import json
import time
import sys

class GenesysEventBridgeConfig:
    def __init__(self, client_id: str, client_secret: str, base_url: str = "https://api.mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url
        self.access_token = None
        self.token_expiry = 0

    def get_token(self) -> str:
        """Retrieves an OAuth access token if not expired."""
        if self.access_token and time.time() < self.token_expiry:
            return self.access_token

        url = f"{self.base_url}/oauth/token"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "event:write"
        }

        try:
            response = requests.post(url, headers=headers, data=data)
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            self.token_expiry = time.time() + (token_data["expires_in"] - 60)
            return self.access_token
        except requests.exceptions.HTTPError as e:
            print(f"Authentication failed: {e.response.text}")
            raise

    def update_integration(self, integration_id: str, event_types: list) -> dict:
        """Updates the integration to enable specific event types."""
        url = f"{self.base_url}/api/v2/platform/eventbridge/integrations/{integration_id}"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.get_token()}"
        }
        payload = {
            "enabled": True,
            "eventTypes": event_types
        }

        try:
            response = requests.put(url, headers=headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            print(f"Failed to update integration: {e.response.text}")
            raise

def main():
    # Configuration
    CLIENT_ID = "YOUR_CLIENT_ID"
    CLIENT_SECRET = "YOUR_CLIENT_SECRET"
    INTEGRATION_ID = "YOUR_INTEGRATION_ID"
    
    # Initialize
    config = GenesysEventBridgeConfig(CLIENT_ID, CLIENT_SECRET)
    
    try:
        # Enable conversation.end events
        print("Updating Genesys Cloud Integration...")
        result = config.update_integration(INTEGRATION_ID, ["conversation.end"])
        print("Integration updated successfully.")
        print(json.dumps(result, indent=2))
        
        print("\nNext Steps:")
        print("1. Run the AWS CLI command to create the EventBridge rule with the queue filter.")
        print("2. Verify the rule triggers by ending a conversation in the specified queue.")
        
    except Exception as e:
        print(f"An error occurred: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 403 Forbidden on Genesys API

Cause: The OAuth token lacks the event:write scope.
Fix: Ensure your grant_type request includes "scope": "event:write". Verify the client credentials have this scope assigned in the Genesys Cloud Admin Console under Platform > APIs > OAuth 2.0.

Error: EventBridge Rule Not Triggering

Cause: The queueId in the filter does not exactly match the queueId in the event.
Fix:

  1. Check the Genesys Cloud Admin Console for the correct Queue ID.
  2. Verify the EventBridge rule’s detail.queueId filter uses the exact ID.
  3. Use the EventBridge “Test Event Pattern” feature in the AWS Console. Paste a sample conversation.end event (with the correct queueId) to see if it matches.

Error: 429 Too Many Requests

Cause: Hitting Genesys Cloud API rate limits.
Fix: Implement exponential backoff in your Python script. For simple scripts, add a time.sleep(1) between requests if making multiple calls.

Error: IAM Policy Error on EventBridge Target

Cause: EventBridge lacks permission to invoke the Lambda function.
Fix: Attach the AWSLambdaBasicExecutionRole or a custom policy allowing lambda:InvokeFunction to the IAM role specified in the put-targets command.

Official References