Subscribing to Conversation Events via Genesys Cloud WebSocket Notifications

Subscribing to Conversation Events via Genesys Cloud WebSocket Notifications

What You Will Build

  • One sentence: You will build a persistent WebSocket client that connects to the Genesys Cloud Notification API to receive real-time updates for conversation lifecycle events.
  • One sentence: This uses the Genesys Cloud REST API for authentication and the WebSocket protocol for the event subscription stream.
  • One sentence: The implementation covers Python using the websockets library and the official purecloudplatformclientv2 SDK for token management.

Prerequisites

  • OAuth client type: Confidential Client (Client Credentials flow) or Public Client (Authorization Code flow). For this tutorial, we assume a Confidential Client for server-to-server integration.
  • Required Scopes: analytics:call:view, analytics:email:view, analytics:chat:view, analytics:message:view, analytics:socialmedia:view (depending on conversation types), and notification:subscribe.
  • SDK Version: purecloudplatformclientv2 >= 168.0.0.
  • Language/Runtime: Python 3.8+.
  • External Dependencies:
    • pip install purecloudplatformclientv2
    • pip install websockets
    • pip install httpx

Authentication Setup

The Notification API WebSocket does not accept a bearer token in the HTTP header during the upgrade request. Instead, the token is passed as a query parameter in the WebSocket URL. You must first obtain a valid OAuth access token using the standard Client Credentials flow.

The following code demonstrates how to retrieve a token using the official Python SDK. This token has a default lifetime of 3600 seconds. In a production environment, you must implement token caching and refresh logic to avoid hitting rate limits on the /oauth/token endpoint.

from purecloudplatformclientv2 import Configuration, AuthApi
from purecloudplatformclientv2.rest import ApiException
import os

def get_access_token() -> str:
    """
    Retrieves an OAuth access token using the Client Credentials flow.
    """
    # Environment variables must be set for these values
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Configure the SDK
    config = Configuration()
    config.host = f"https://{environment}"
    config.access_token = None  # We are going to fetch it

    # Initialize the Auth API
    auth_api = AuthApi(configuration=config)

    try:
        # Exchange client credentials for a token
        token_response = auth_api.post_oauth_token(
            grant_type="client_credentials",
            client_id=client_id,
            client_secret=client_secret
        )
        
        return token_response.access_token
    except ApiException as e:
        print(f"Failed to retrieve token: {e.status} - {e.reason}")
        raise

Implementation

Step 1: Constructing the WebSocket Endpoint

The Notification API WebSocket endpoint follows a specific pattern. You must append your OAuth token as a query parameter. The base path is wss://{environment}/api/v2/notifications.

Unlike REST endpoints, the WebSocket connection is long-lived. If the connection drops, the server does not resend missed events. You must handle reconnection logic explicitly.

The URL structure is:
wss://{environment}/api/v2/notifications?access_token={token}

Step 2: Defining the Subscription Payload

Once the WebSocket connection is established, the server sends a connected event. You must immediately send a JSON payload to subscribe to specific event types. For conversation events, you typically subscribe to conversation:updated or specific conversation IDs.

The subscription payload requires:

  • subscribeTo: A list of event types or specific resource IDs.
  • conversationIds: Optional. If provided, you only receive events for these specific conversations. If omitted, you receive events for all conversations the authenticated user/client has access to. Warning: Subscribing to all conversations without filtering can result in massive throughput and high costs. Always filter by conversationIds or specific event types if possible.

Here is the structure of the subscription message:

{
  "subscribeTo": [
    "conversation:updated"
  ],
  "conversationIds": [
    "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
  ]
}

Step 3: Implementing the WebSocket Client

We will use the websockets library to manage the connection. The client must:

  1. Connect to the endpoint.
  2. Send the subscription payload.
  3. Listen for incoming messages.
  4. Handle ping/pong frames to keep the connection alive.
  5. Handle disconnections and reconnect.
import asyncio
import json
import os
import websockets
import logging
from purecloudplatformclientv2 import Configuration

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class GenesysNotificationClient:
    def __init__(self, environment: str, client_id: str, client_secret: str, conversation_ids: list[str]):
        self.environment = environment
        self.client_id = client_id
        self.client_secret = client_secret
        self.conversation_ids = conversation_ids
        self.ws_url = f"wss://{environment}/api/v2/notifications"
        self.token = None
        self.connected = False

    async def connect_and_subscribe(self):
        """
        Establishes the WebSocket connection, authenticates, and subscribes to events.
        """
        # 1. Get Token
        logger.info("Retrieving OAuth token...")
        self.token = self._get_token_sync()
        if not self.token:
            raise ConnectionError("Failed to retrieve OAuth token.")

        # 2. Construct WebSocket URL with Token
        ws_uri = f"{self.ws_url}?access_token={self.token}"
        
        logger.info(f"Connecting to {ws_uri}...")
        
        # 3. Connect
        async with websockets.connect(ws_uri, ping_interval=20, ping_timeout=10) as websocket:
            self.connected = True
            logger.info("Connected to Notification API.")

            # 4. Send Subscription Payload
            subscription_payload = {
                "subscribeTo": ["conversation:updated"],
                "conversationIds": self.conversation_ids
            }
            
            await websocket.send(json.dumps(subscription_payload))
            logger.info("Subscription request sent.")

            # 5. Listen for Events
            async for message in websocket:
                await self.handle_message(message)

    def _get_token_sync(self) -> str:
        """
        Synchronous wrapper for token retrieval to fit within the async context.
        In a full async app, you would use httpx.AsyncClient.
        """
        config = Configuration()
        config.host = f"https://{self.environment}"
        auth_api = AuthApi(configuration=config)
        try:
            response = auth_api.post_oauth_token(
                grant_type="client_credentials",
                client_id=self.client_id,
                client_secret=self.client_secret
            )
            return response.access_token
        except Exception as e:
            logger.error(f"Token retrieval failed: {e}")
            return None

    async def handle_message(self, message: str):
        """
        Processes incoming WebSocket messages.
        """
        try:
            data = json.loads(message)
            event_type = data.get('type')
            
            if event_type == 'connected':
                logger.info("Server confirmed connection.")
            elif event_type == 'conversation:updated':
                self.process_conversation_event(data)
            elif event_type == 'error':
                logger.error(f"Server error: {data.get('message')}")
            else:
                logger.info(f"Received event type: {event_type}")
        except json.JSONDecodeError:
            logger.warning(f"Received non-JSON message: {message}")

    def process_conversation_event(self, event_data: dict):
        """
        Handles the conversation:updated event payload.
        """
        conversation = event_data.get('conversation')
        if conversation:
            conv_id = conversation.get('id')
            conv_type = conversation.get('type')
            state = conversation.get('state')
            
            logger.info(f"Conversation Update | ID: {conv_id} | Type: {conv_type} | State: {state}")
            
            # Check for specific participants or interactions
            interactions = conversation.get('interactions', [])
            for interaction in interactions:
                participant = interaction.get('participant')
                if participant:
                    participant_id = participant.get('id')
                    participant_status = participant.get('status')
                    logger.debug(f"  Participant {participant_id} status: {participant_status}")

Step 4: Handling Reconnection and Token Expiry

The OAuth token expires after one hour. The WebSocket server will likely close the connection or send an error when the token becomes invalid. You must detect this and reconnect with a fresh token.

We wrap the connect_and_subscribe method in a retry loop.

async def run_notification_listener():
    env = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    
    # Replace with actual conversation IDs you want to monitor
    target_conversations = [
        "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
    ]

    client = GenesysNotificationClient(env, client_id, client_secret, target_conversations)

    max_retries = 5
    retry_count = 0

    while retry_count < max_retries:
        try:
            await client.connect_and_subscribe()
        except websockets.exceptions.ConnectionClosed as e:
            retry_count += 1
            logger.warning(f"Connection closed: {e}. Retrying ({retry_count}/{max_retries})...")
            await asyncio.sleep(2 ** retry_count)  # Exponential backoff
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            break
    else:
        logger.error("Max retries reached. Exiting.")

Complete Working Example

Below is the full, copy-pasteable script. Save this as genesys_ws_listener.py. Ensure you have the required environment variables set.

import asyncio
import json
import os
import logging
import websockets
from purecloudplatformclientv2 import Configuration, AuthApi

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class GenesysNotificationClient:
    def __init__(self, environment: str, client_id: str, client_secret: str, conversation_ids: list[str]):
        self.environment = environment
        self.client_id = client_id
        self.client_secret = client_secret
        self.conversation_ids = conversation_ids
        self.ws_url = f"wss://{environment}/api/v2/notifications"
        self.token = None

    def _get_token(self) -> str:
        """
        Retrieves an OAuth access token using the Client Credentials flow.
        """
        config = Configuration()
        config.host = f"https://{self.environment}"
        auth_api = AuthApi(configuration=config)
        
        try:
            response = auth_api.post_oauth_token(
                grant_type="client_credentials",
                client_id=self.client_id,
                client_secret=self.client_secret
            )
            return response.access_token
        except Exception as e:
            logger.error(f"Token retrieval failed: {e}")
            raise

    async def connect_and_subscribe(self):
        """
        Establishes the WebSocket connection, authenticates, and subscribes to events.
        """
        # 1. Get Token
        logger.info("Retrieving OAuth token...")
        self.token = self._get_token()
        
        # 2. Construct WebSocket URL with Token
        ws_uri = f"{self.ws_url}?access_token={self.token}"
        
        logger.info(f"Connecting to {ws_uri}...")
        
        # 3. Connect
        # ping_interval and ping_timeout help keep the connection alive and detect stale connections
        async with websockets.connect(ws_uri, ping_interval=20, ping_timeout=10) as websocket:
            logger.info("Connected to Notification API.")

            # 4. Send Subscription Payload
            # We subscribe to conversation:updated for specific conversation IDs
            subscription_payload = {
                "subscribeTo": ["conversation:updated"],
                "conversationIds": self.conversation_ids
            }
            
            await websocket.send(json.dumps(subscription_payload))
            logger.info("Subscription request sent.")

            # 5. Listen for Events
            async for message in websocket:
                await self.handle_message(message)

    async def handle_message(self, message: str):
        """
        Processes incoming WebSocket messages.
        """
        try:
            data = json.loads(message)
            event_type = data.get('type')
            
            if event_type == 'connected':
                logger.info("Server confirmed connection.")
            elif event_type == 'conversation:updated':
                self.process_conversation_event(data)
            elif event_type == 'error':
                logger.error(f"Server error: {data.get('message')}")
            else:
                logger.info(f"Received event type: {event_type}")
        except json.JSONDecodeError:
            logger.warning(f"Received non-JSON message: {message}")

    def process_conversation_event(self, event_data: dict):
        """
        Handles the conversation:updated event payload.
        """
        conversation = event_data.get('conversation')
        if conversation:
            conv_id = conversation.get('id')
            conv_type = conversation.get('type')
            state = conversation.get('state')
            
            logger.info(f"Conversation Update | ID: {conv_id} | Type: {conv_type} | State: {state}")
            
            # Optional: Log interactions for deeper inspection
            interactions = conversation.get('interactions', [])
            for interaction in interactions:
                participant = interaction.get('participant')
                if participant:
                    participant_id = participant.get('id')
                    participant_status = participant.get('status')
                    logger.debug(f"  Participant {participant_id} status: {participant_status}")

async def main():
    env = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    
    # Replace with actual conversation IDs you want to monitor
    # If you leave this empty or None, you must have broad permissions and will receive ALL conversations
    target_conversations = [
        "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
    ]

    if not client_id or not client_secret:
        logger.error("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")
        return

    client = GenesysNotificationClient(env, client_id, client_secret, target_conversations)

    max_retries = 5
    retry_count = 0

    while retry_count < max_retries:
        try:
            await client.connect_and_subscribe()
        except websockets.exceptions.ConnectionClosed as e:
            retry_count += 1
            logger.warning(f"Connection closed: {e}. Retrying ({retry_count}/{max_retries})...")
            await asyncio.sleep(2 ** retry_count)  # Exponential backoff
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            break
    else:
        logger.error("Max retries reached. Exiting.")

if __name__ == "__main__":
    asyncio.run(main())

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The OAuth token in the WebSocket URL query parameter is invalid, expired, or missing.
  • Fix: Ensure the token is retrieved immediately before establishing the WebSocket connection. Check that your client ID and secret are correct. Verify that the token has not expired (standard lifetime is 3600 seconds).
  • Code Check: Verify the ws_uri construction includes ?access_token={self.token}.

Error: 403 Forbidden

  • Cause: The OAuth token does not have the required scopes to subscribe to the requested event type.
  • Fix: Add notification:subscribe and the relevant analytics scopes (e.g., analytics:call:view) to your OAuth application in the Genesys Cloud Admin portal.
  • Code Check: Ensure the subscription_payload only requests event types that the token is authorized for.

Error: Connection Refused or Timeout

  • Cause: The environment URL is incorrect, or a firewall/proxy is blocking WebSocket upgrades.
  • Fix: Verify the GENESYS_ENVIRONMENT variable. Ensure your network allows outbound connections to port 443 and supports the Upgrade: websocket header.
  • Code Check: Test connectivity with curl -v wss://{environment}/api/v2/notifications?access_token={token}.

Error: High CPU or Memory Usage

  • Cause: Subscribing to conversation:updated without filtering by conversationIds can result in thousands of events per second in large organizations.
  • Fix: Always provide a list of conversationIds in the subscription payload. If you need to monitor all conversations, implement server-side filtering or use the Conversation API to poll specific conversations instead of using the high-volume Notification API.
  • Code Check: Verify that self.conversation_ids is populated before sending the subscription payload.

Official References