Filter EventBridge Events for Queue-Specific Conversation Endings

Filter EventBridge Events for Queue-Specific Conversation Endings

What You Will Build

  • A Python application that subscribes to AWS EventBridge and filters inbound Genesys Cloud events to process only conversation.end events originating from a specific Genesys Queue.
  • The code uses the AWS SDK for Python (Boto3) to receive events from an EventBridge Bus and applies strict payload inspection to isolate the target queue.
  • The programming language covered is Python 3.9+.

Prerequisites

  • Genesys Cloud Account: You must have an organization with Event Streams enabled and configured to send events to an AWS EventBridge Bus.
  • AWS Account: You need an active AWS account with permissions to create EventBridge Rules and invoke Lambda functions (or local testing via LocalStack).
  • OAuth Scopes: While this tutorial focuses on the consumer side (EventBridge), the Genesys Cloud Event Stream configuration requires the eventstream:all scope in your Genesys Cloud integration settings.
  • Python Dependencies: boto3, requests (for optional validation), pydantic (for robust payload parsing).

Authentication Setup

This tutorial assumes the Genesys Cloud to EventBridge bridge is already configured. The “authentication” for this consumer code is handled by AWS IAM roles attached to your EventBridge Rule or Lambda function. You do not need to manage Genesys OAuth tokens in this specific consumer script because the event payload is pushed to you by Genesys Cloud.

However, to verify the queue ID you are filtering for, you must authenticate with Genesys Cloud once to retrieve the correct UUID.

import requests
import os

def get_queue_id_from_name(org_name: str, queue_name: str) -> str:
    """
    Helper to find the Queue UUID using Genesys Cloud API.
    Requires environment variables: GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, GENESYS_ORG_ID
    """
    client_id = os.environ.get("GENESYS_CLIENT_ID")
    client_secret = os.environ.get("GENESYS_CLIENT_SECRET")
    org_id = os.environ.get("GENESYS_ORG_ID")

    if not client_id or not client_secret or not org_id:
        raise ValueError("Missing Genesys Cloud credentials in environment variables.")

    # 1. Get OAuth Token
    token_url = f"https://api.mypurecloud.com/oauth/token"
    auth_response = requests.post(
        token_url,
        auth=(client_id, client_secret),
        data={"grant_type": "client_credentials"}
    )
    auth_response.raise_for_status()
    access_token = auth_response.json()["access_token"]

    # 2. Search for Queue
    # Note: In production, cache this ID. Do not call this on every event.
    search_url = f"https://{org_id}.mypurecloud.com/api/v2/routing/queues"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    
    # Fetch queues (pagination may be needed for large orgs, simplified here)
    queues_response = requests.get(search_url, headers=headers)
    queues_response.raise_for_status()
    
    queues = queues_response.json().get("entities", [])
    
    for queue in queues:
        if queue["name"] == queue_name:
            return queue["id"]
            
    raise ValueError(f"Queue '{queue_name}' not found.")

# Example Usage:
# TARGET_QUEUE_ID = get_queue_id_from_name("my-org-id", "Sales Support")

Implementation

Step 1: Define the Event Structure and Filter Logic

Genesys Cloud sends events to EventBridge with a specific structure. The critical fields for filtering are:

  1. detail-type: Must be conversation.end.
  2. detail.queue.id: Must match your target Queue UUID.

We will create a Pydantic model to enforce type safety and a filter function. This ensures that malformed events or events from other queues are rejected immediately.

from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field, validator
import logging

logger = logging.getLogger(__name__)

# Define the structure of the Genesys Cloud Conversation End Event
class QueueInfo(BaseModel):
    id: str
    name: str

class ConversationEndDetail(BaseModel):
    """
    Represents the 'detail' payload of a Genesys Cloud conversation.end event.
    """
    conversationId: str = Field(..., alias="conversationId")
    queue: Optional[QueueInfo] = None
    direction: Optional[str] = None
    wrapupCode: Optional[str] = None
    
    class Config:
        populate_by_name = True  # Allows using both camelCase and snake_case if needed

class GenesysEventBridgeEvent(BaseModel):
    """
    Represents the full EventBridge event structure.
    """
    id: str
    source: str
    account: str
    time: str
    region: str
    resources: List[str]
    detail_type: str = Field(..., alias="detail-type")
    detail: ConversationEndDetail

    @validator('source')
    def check_source(cls, v):
        if not v.startswith("com.genesys.cloud"):
            raise ValueError("Event source is not from Genesys Cloud")
        return v

def is_target_queue_event(event_dict: Dict[str, Any], target_queue_id: str) -> bool:
    """
    Determines if the event is a conversation.end for the specific queue.
    
    Args:
        event_dict: The raw JSON payload from EventBridge.
        target_queue_id: The UUID of the queue to filter for.
        
    Returns:
        True if the event matches, False otherwise.
    """
    try:
        # Parse and validate the event structure
        event = GenesysEventBridgeEvent.parse_obj(event_dict)
        
        # Filter 1: Check Detail Type
        if event.detail_type != "conversation.end":
            logger.debug(f"Skipping event {event.id}: detail-type is '{event.detail_type}', not 'conversation.end'")
            return False
        
        # Filter 2: Check Queue ID
        # Note: Not all conversations are associated with a queue (e.g., direct agent calls).
        if event.detail.queue is None:
            logger.debug(f"Skipping event {event.id}: No queue associated with conversation.")
            return False
            
        if event.detail.queue.id != target_queue_id:
            logger.debug(f"Skipping event {event.id}: Queue ID {event.detail.queue.id} does not match target {target_queue_id}")
            return False
            
        return True

    except Exception as e:
        # Log parsing errors but do not crash the consumer
        logger.error(f"Failed to parse event: {e}")
        return False

Step 2: Implement the AWS Lambda Handler

In a production environment, this logic runs inside an AWS Lambda function triggered by an EventBridge Rule. The handler receives the event, applies the filter, and processes the data.

import json
import logging
import os
from typing import Dict, Any, Optional

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Load Target Queue ID from Environment Variable
TARGET_QUEUE_ID = os.environ.get("TARGET_QUEUE_ID")

if not TARGET_QUEUE_ID:
    raise EnvironmentError("TARGET_QUEUE_ID environment variable is not set.")

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    AWS Lambda handler for Genesys Cloud EventBridge events.
    Filters for 'conversation.end' events for a specific queue.
    """
    
    # EventBridge sends a list of events in some configurations, 
    # but typically one event per invocation for standard rules.
    # However, if using a batch, event['detail'] might be wrapped.
    # Standard EventBridge direct trigger passes the event directly.
    
    # Check if this is a batched event (from EventBridge Archive or specific rule configs)
    # Usually, direct Lambda triggers from EventBridge pass the single event.
    events_to_process = event if isinstance(event, list) else [event]

    processed_count = 0
    skipped_count = 0

    for single_event in events_to_process:
        try:
            # Step 1: Apply Filter
            if is_target_queue_event(single_event, TARGET_QUEUE_ID):
                # Step 2: Process the Event
                process_conversation_end(single_event)
                processed_count += 1
            else:
                skipped_count += 1
                
        except Exception as e:
            logger.error(f"Error processing event {single_event.get('id', 'unknown')}: {str(e)}")
            # Depending on requirements, you might want to re-raise or send to DLQ
            raise e

    return {
        "statusCode": 200,
        "body": json.dumps({
            "processed": processed_count,
            "skipped": skipped_count
        })
    }

def process_conversation_end(event: Dict[str, Any]) -> None:
    """
    Business logic to handle the filtered conversation.end event.
    """
    detail = event.get("detail", {})
    conversation_id = detail.get("conversationId")
    queue_name = detail.get("queue", {}).get("name", "Unknown Queue")
    wrapup_code = detail.get("wrapupCode")
    
    logger.info(
        f"Processing Conversation End: "
        f"ConvID={conversation_id}, "
        f"Queue={queue_name}, "
        f"Wrapup={wrapup_code}"
    )
    
    # Example: Send to DynamoDB, SQS, or another API
    # save_to_database(conversation_id, queue_name, wrapup_code)

Step 3: Configure the EventBridge Rule (Infrastructure as Code)

To ensure your Lambda receives the events, you must configure an EventBridge Rule. While you can do this in the AWS Console, using Terraform (HCL) ensures reproducibility. This rule filters at the infrastructure level before the Lambda is invoked, saving costs.

resource "aws_cloudwatch_event_rule" "genesys_conversation_end" {
  name           = "genesys-queue-conversation-end"
  description    = "Capture Genesys Cloud conversation.end events for specific queue"
  event_pattern = jsonencode({
    "source"      = ["com.genesys.cloud"]
    "detail-type" = ["conversation.end"]
  })
}

resource "aws_cloudwatch_event_target" "genesys_lambda_target" {
  rule      = aws_cloudwatch_event_rule.genesys_conversation_end.name
  target_id = "GenesysConversationEndTarget"
  arn       = aws_lambda_function.genesys_processor.arn

  input_transformer {
    input_paths = {
      "event" = "$"
    }
    input_template = "{\"event\": <event>}"
  }
}

resource "aws_lambda_function" "genesys_processor" {
  function_name = "genesys-event-processor"
  role          = aws_iam_role.lambda_exec.arn
  handler       = "index.lambda_handler"
  runtime       = "python3.9"
  filename      = "lambda_package.zip"

  environment {
    variables = {
      TARGET_QUEUE_ID = "YOUR-GENESYS-QUEUE-UUID-HERE"
    }
  }
}

Note on Infrastructure Filtering:
The EventBridge rule above filters for conversation.end globally. It does not filter by Queue ID at the infrastructure level because Genesys Cloud does not include the Queue ID in the top-level EventBridge metadata (it is nested in detail.queue.id). Therefore, the Lambda function must perform the Queue ID check. The infrastructure rule only saves you from receiving conversation.start, participant.added, etc.

Complete Working Example

Below is the complete, runnable Python script. You can test this locally by simulating the EventBridge payload.

import json
import os
import logging
from typing import Dict, Any, Optional, List
from pydantic import BaseModel, Field, validator

# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Data Models ---

class QueueInfo(BaseModel):
    id: str
    name: str

class ConversationEndDetail(BaseModel):
    conversationId: str
    queue: Optional[QueueInfo] = None
    direction: Optional[str] = None
    wrapupCode: Optional[str] = None
    # Add other relevant fields from Genesys payload if needed
    
    class Config:
        populate_by_name = True

class GenesysEventBridgeEvent(BaseModel):
    id: str
    source: str
    account: str
    time: str
    region: str
    resources: List[str]
    detail_type: str = Field(..., alias="detail-type")
    detail: ConversationEndDetail

    @validator('source')
    def validate_source(cls, v):
        if not v.startswith("com.genesys.cloud"):
            raise ValueError(f"Invalid source: {v}")
        return v

# --- Core Logic ---

def is_target_queue_event(event_dict: Dict[str, Any], target_queue_id: str) -> bool:
    """
    Validates and filters the event.
    """
    try:
        event = GenesysEventBridgeEvent.parse_obj(event_dict)
        
        # 1. Check Detail Type
        if event.detail_type != "conversation.end":
            logger.debug(f"Event {event.id} skipped: wrong detail-type '{event.detail_type}'")
            return False
        
        # 2. Check Queue Presence
        if event.detail.queue is None:
            logger.debug(f"Event {event.id} skipped: no queue association")
            return False
        
        # 3. Check Queue ID Match
        if event.detail.queue.id != target_queue_id:
            logger.debug(f"Event {event.id} skipped: queue mismatch ({event.detail.queue.id} != {target_queue_id})")
            return False
            
        return True

    except Exception as e:
        logger.error(f"Event parsing failed: {e}")
        return False

def process_event(event: Dict[str, Any]) -> None:
    """
    Placeholder for business logic.
    """
    detail = event.get("detail", {})
    conv_id = detail.get("conversationId")
    queue_name = detail.get("queue", {}).get("name")
    
    logger.info(f"SUCCESS: Processing end event for Conversation {conv_id} from Queue {queue_name}")

# --- Simulation/Test Harness ---

def simulate_event_bridge_trigger():
    """
    Simulates incoming EventBridge events to test the filter logic.
    """
    # Define Target Queue ID (Mock)
    TARGET_QUEUE_ID = "12345678-1234-1234-1234-123456789012"
    
    # Event 1: Correct Queue, conversation.end (Should PASS)
    event_pass = {
        "id": "evt-001",
        "source": "com.genesys.cloud",
        "account": "111222333444",
        "time": "2023-10-27T10:00:00Z",
        "region": "us-east-1",
        "resources": [],
        "detail-type": "conversation.end",
        "detail": {
            "conversationId": "conv-abc-123",
            "queue": {
                "id": "12345678-1234-1234-1234-123456789012",
                "name": "Sales Support"
            },
            "direction": "inbound",
            "wrapupCode": "Sale Closed"
        }
    }

    # Event 2: Wrong Queue, conversation.end (Should FAIL)
    event_fail_queue = {
        "id": "evt-002",
        "source": "com.genesys.cloud",
        "account": "111222333444",
        "time": "2023-10-27T10:01:00Z",
        "region": "us-east-1",
        "resources": [],
        "detail-type": "conversation.end",
        "detail": {
            "conversationId": "conv-def-456",
            "queue": {
                "id": "99999999-9999-9999-9999-999999999999",
                "name": "Billing Support"
            },
            "direction": "inbound",
            "wrapupCode": "Refund Issued"
        }
    }

    # Event 3: Correct Queue, conversation.start (Should FAIL)
    event_fail_type = {
        "id": "evt-003",
        "source": "com.genesys.cloud",
        "account": "111222333444",
        "time": "2023-10-27T10:02:00Z",
        "region": "us-east-1",
        "resources": [],
        "detail-type": "conversation.start",
        "detail": {
            "conversationId": "conv-ghi-789",
            "queue": {
                "id": "12345678-1234-1234-1234-123456789012",
                "name": "Sales Support"
            }
        }
    }

    # Event 4: No Queue (Direct Agent Call) (Should FAIL)
    event_fail_no_queue = {
        "id": "evt-004",
        "source": "com.genesys.cloud",
        "account": "111222333444",
        "time": "2023-10-27T10:03:00Z",
        "region": "us-east-1",
        "resources": [],
        "detail-type": "conversation.end",
        "detail": {
            "conversationId": "conv-jkl-012",
            "queue": None,
            "direction": "internal"
        }
    }

    events = [event_pass, event_fail_queue, event_fail_type, event_fail_no_queue]

    print(f"--- Running Filter Simulation for Target Queue: {TARGET_QUEUE_ID} ---")
    
    for event in events:
        if is_target_queue_event(event, TARGET_QUEUE_ID):
            process_event(event)
        else:
            logger.info(f"Event {event['id']} filtered out.")

if __name__ == "__main__":
    simulate_event_bridge_trigger()

Common Errors & Debugging

Error: detail.queue is None

  • What causes it: The conversation was not routed through a queue. This happens for direct calls to agents, internal calls, or API-initiated conversations.
  • How to fix it: Ensure your filter logic explicitly checks if event.detail.queue is None before accessing event.detail.queue.id. If your business logic requires queue data, simply skip these events.

Error: KeyError: 'queue' or AttributeError: 'NoneType' object has no attribute 'id'

  • What causes it: Accessing nested dictionary keys without validation.
  • How to fix it: Use Pydantic models (as shown in Step 1) or defensive dictionary access (detail.get("queue", {}).get("id")). Never assume the payload structure is perfect.

Error: EventBridge Rule Not Triggering

  • What causes it: The EventBridge rule detail-type does not match exactly. Genesys Cloud uses conversation.end (lowercase).
  • How to fix it: Verify the EventBridge rule JSON pattern. It must be "detail-type": ["conversation.end"]. Note that Genesys Cloud sends the source as com.genesys.cloud.

Error: 429 Too Many Requests (If calling Genesys API inside Lambda)

  • What causes it: If your process_event function calls back into Genesys Cloud (e.g., to update a custom attribute), you may hit rate limits if many conversations end simultaneously.
  • How to fix it: Implement exponential backoff in your HTTP client. Better yet, write the event to an SQS queue inside the Lambda and have a separate worker process consume the queue and call Genesys APIs. This decouples the ingestion rate from the processing rate.

Official References