How to Subscribe to Conversation Events via the Genesys Cloud Notification API WebSocket

What You Will Build

  • You will build a persistent WebSocket client that subscribes to real-time conversation events (start, wrap-up, transfer) in Genesys Cloud.
  • This tutorial uses the Genesys Cloud v2 Notification API and the websockets library in Python.
  • The implementation covers authentication, subscription management, heartbeat handling, and reconnection logic.

Prerequisites

  • OAuth Client Type: Confidential Client with the Client Credentials Grant, which suits server-to-server communication and is used throughout this tutorial. A Public Client with the User Agent flow is the alternative if you need user-specific events.
  • Required Scopes: view:conversation, view:call, view:email, view:webchat, view:task (depending on conversation types you wish to monitor). For broad visibility, view:conversation is often sufficient for metadata, but specific type scopes are required for detailed event payloads.
  • SDK Version: none required. The genesyscloud Python SDK v2.x is an option, but this tutorial uses httpx for auth and websockets for the WS connection to demonstrate raw protocol control in a long-running script.
  • Language/Runtime: Python 3.9+.
  • External Dependencies: pip install httpx websockets

Authentication Setup

The Genesys Cloud Notification API WebSocket requires a valid access token. Unlike REST calls, which carry the token in the Authorization header, the WebSocket connection in this tutorial receives it as a query parameter named access_token in the initial handshake URL.

First, you must obtain the token. We will use httpx to handle the OAuth 2.0 Client Credentials flow.

import httpx
import json
import time
from typing import Optional

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, org_id: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.org_id = org_id
        self.token_url = f"https://api.{org_id}.mypurecloud.com/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expires_at: float = 0

    async def get_access_token(self) -> str:
        """
        Retrieves an access token using Client Credentials Grant.
        Implements simple caching to avoid requesting new tokens unnecessarily.
        """
        # Check if token is still valid (subtract 60s for buffer)
        if self.access_token and time.time() < (self.token_expires_at - 60):
            return self.access_token

        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )
            
            response.raise_for_status()
            token_data = response.json()
            
            self.access_token = token_data["access_token"]
            self.token_expires_at = time.time() + token_data["expires_in"]
            
            return self.access_token
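
The caching rule above (reuse the token until 60 seconds before it expires) can be pulled out into a small pure function, which makes the safety buffer easy to unit test. This is a minimal sketch: token_is_fresh and its parameters are hypothetical names chosen for illustration, not part of any Genesys SDK.

```python
import time
from typing import Optional


def token_is_fresh(
    expires_at: float,
    now: Optional[float] = None,
    buffer_seconds: float = 60.0,
) -> bool:
    """Return True while the cached token is still safely usable.

    expires_at is an absolute timestamp (time.time() + expires_in).
    buffer_seconds is the safety margin subtracted from the expiry.
    """
    if now is None:
        now = time.time()
    return now < expires_at - buffer_seconds


# Hypothetical timestamps: a token issued at t=1000 that lives for one hour.
issued_at = 1_000.0
expires_at = issued_at + 3600

print(token_is_fresh(expires_at, now=issued_at))        # just issued
print(token_is_fresh(expires_at, now=expires_at - 30))  # inside the buffer
```

Passing now explicitly keeps the check deterministic in tests; production code can omit it and fall back to time.time().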

Note on Scopes: If your client application does not have the necessary scopes, the WebSocket connection will open, but the subscription request will fail with a 403 Forbidden event payload. Ensure your OAuth client in the Genesys Admin console has view:conversation and specific interaction type scopes enabled.

Implementation

Step 1: Establishing the WebSocket Connection

The Notification API WebSocket endpoint is located at wss://api.{org_id}.mypurecloud.com/api/v2/notifications.

Append the access token as a query parameter named access_token. Because the token then appears in the URL, avoid writing the full connection URL to logs.

import asyncio
import websockets
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GenesysWebSocketClient:
    def __init__(self, org_id: str, auth_service: GenesysAuth):
        self.org_id = org_id
        self.auth = auth_service
        self.ws_uri = f"wss://api.{org_id}.mypurecloud.com/api/v2/notifications"
        self.subscriptions = []
        self.is_running = False

    async def connect_and_subscribe(self, subscription_ids: list[str]):
        """
        Connects to the WebSocket and subscribes to a list of existing subscription IDs.
        """
        self.is_running = True
        token = await self.auth.get_access_token()
        
        # The token must be passed in the query string
        ws_url = f"{self.ws_uri}?access_token={token}"
        
        try:
            async with websockets.connect(ws_url, ping_interval=20, ping_timeout=10) as websocket:
                logger.info("WebSocket connected successfully.")
                
                # Send subscription requests
                for sub_id in subscription_ids:
                    await self.subscribe_to_notification(websocket, sub_id)
                
                # Start listening for messages
                await self.listen_for_events(websocket)
                
        except websockets.exceptions.ConnectionClosed as e:
            logger.error(f"Connection closed: {e.code} {e.reason}")
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
        finally:
            self.is_running = False
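
Since the token travels in the query string, it is safer to URL-encode it than to interpolate it into the URL directly. The following is a minimal sketch using only the standard library; build_ws_url is a hypothetical helper and the host is a placeholder, not a real Genesys endpoint.

```python
from urllib.parse import urlencode, urlsplit, urlunsplit


def build_ws_url(base_uri: str, access_token: str) -> str:
    """Append access_token to base_uri as a URL-encoded query parameter."""
    parts = urlsplit(base_uri)
    if parts.scheme != "wss":
        raise ValueError("expected a secure wss:// URI")
    query = urlencode({"access_token": access_token})
    if parts.query:
        # Preserve any query parameters already present on the base URI.
        query = f"{parts.query}&{query}"
    return urlunsplit((parts.scheme, parts.netloc, parts.path, query, ""))


# Placeholder host and a token containing characters that need escaping.
url = build_ws_url("wss://api.example.invalid/api/v2/notifications", "abc+/=123")
print(url)
```

Rejecting anything other than wss:// up front also guards against accidentally sending the token over an unencrypted connection.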

Step 2: Managing Subscriptions

Subscriptions are not created over the WebSocket. Create them in advance, either with the REST API (POST /api/v2/notifications/subscriptions) or manually in the Genesys Admin UI under “Integrations > Notifications”. The WebSocket client then subscribes to those existing IDs.

The subscription payload for conversation events looks like this:

{
  "name": "Realtime Conversation Monitor",
  "description": "Monitors all conversation lifecycle events",
  "enabled": true,
  "topics": [
    "conversation"
  ],
  "notificationType": "REALTIME",
  "endpointType": "WEBHOOK", 
  "webhookEndpoint": {
    "url": "https://placeholder.com" 
  }
}

Note: For WebSocket consumption, the endpointType is irrelevant to the payload structure, but the API requires an endpoint. You can use a dummy URL. The key is the topics array containing "conversation".

Once you have the id from the created subscription, pass it to the subscribe_to_notification method:

    async def subscribe_to_notification(self, websocket, subscription_id: str):
        """
        Sends a 'subscribe' message to the WebSocket server for a specific subscription ID.
        """
        subscribe_message = {
            "id": subscription_id
        }
        
        await websocket.send(json.dumps(subscribe_message))
        logger.info(f"Sent subscribe request for ID: {subscription_id}")
        self.subscriptions.append(subscription_id)
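
The subscribe step can be exercised without a network connection by passing in a stand-in object that records what would have been sent. The sketch below assumes the message shape used in this tutorial ({"id": ...}); FakeWebSocket and send_subscriptions are hypothetical names introduced only for this example.

```python
import asyncio
import json


class FakeWebSocket:
    """Test double that records outgoing frames instead of sending them."""

    def __init__(self) -> None:
        self.sent: list[str] = []

    async def send(self, message: str) -> None:
        self.sent.append(message)


async def send_subscriptions(websocket, subscription_ids: list[str]) -> int:
    """Send one subscribe message per ID and return how many were sent."""
    for sub_id in subscription_ids:
        await websocket.send(json.dumps({"id": sub_id}))
    return len(subscription_ids)


fake = FakeWebSocket()
count = asyncio.run(send_subscriptions(fake, ["sub-1", "sub-2"]))
print(count, fake.sent)
```

Because the function only relies on an object with an async send method, the same code works with a real connection from the websockets library.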

Step 3: Processing Events and Heartbeats

The WebSocket stream is a mix of two types of messages:

  1. Heartbeats: To keep the connection alive. These are usually empty or contain a specific heartbeat identifier.
  2. Notification Events: JSON payloads containing the actual data.

Heartbeats need no reply from your code: the websockets library answers protocol-level pings on its own. Filter heartbeats out so they never reach the event handler, then parse each remaining payload to extract the conversation data.

    async def listen_for_events(self, websocket):
        """
        Listens for incoming messages from the WebSocket server.
        Handles heartbeats and conversation events.
        """
        async for message in websocket:
            try:
                # An empty frame is treated as a heartbeat. Check for it before
                # parsing, because json.loads("") raises JSONDecodeError.
                if not message.strip():
                    logger.debug("Received empty heartbeat frame.")
                    continue

                data = json.loads(message)

                # A heartbeat may also arrive as an object such as {"type": "heartbeat"}
                if isinstance(data, dict) and data.get("type") == "heartbeat":
                    logger.debug("Received heartbeat.")
                    continue
                
                # Process the actual notification event
                await self.handle_notification_event(data)
                
            except json.JSONDecodeError:
                logger.warning(f"Received non-JSON message: {message[:100]}")
            except Exception as e:
                logger.error(f"Error processing message: {e}")

    async def handle_notification_event(self, event: dict):
        """
        Parses the notification event and extracts conversation details.
        """
        # The event structure typically looks like:
        # {
        #   "subscriptionId": "...",
        #   "topic": "conversation",
        #   "event": "conversation:wrapup" (or start, transfer, etc.),
        #   "data": { ... conversation object ... }
        # }
        
        topic = event.get("topic")
        event_type = event.get("event")
        data = event.get("data", {})
        
        if topic == "conversation":
            logger.info(f"Event: {event_type}")
            self.process_conversation_data(event_type, data)
        else:
            logger.warning(f"Unhandled topic: {topic}")

    def process_conversation_data(self, event_type: str, data: dict):
        """
        Extracts key fields from the conversation data.
        """
        conv_id = data.get("id", "unknown")
        conv_type = data.get("type", "unknown")
        
        if event_type == "conversation:start":
            logger.info(f"New {conv_type} started: {conv_id}")
        elif event_type == "conversation:wrapup":
            logger.info(f"{conv_type} wrapped up: {conv_id}")
        elif event_type == "conversation:transfer":
            logger.info(f"{conv_type} transferred: {conv_id}")
        elif event_type == "conversation:participant:added":
            logger.info(f"Participant added to {conv_type}: {conv_id}")
        else:
            logger.info(f"Other event ({event_type}) for {conv_type}: {conv_id}")
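
The heartbeat-versus-event decision described in this step can also be written as a pure function, which keeps the receive loop short and lets you test the edge cases (empty frames, bytes, malformed JSON) in isolation. This is a sketch under the assumptions stated in this tutorial about heartbeat shapes; classify_message is a hypothetical name.

```python
import json
from typing import Optional, Tuple, Union


def classify_message(raw: Union[str, bytes]) -> Tuple[str, Optional[dict]]:
    """Return ("heartbeat" | "event" | "invalid", parsed payload or None)."""
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8", errors="replace")
    if not raw.strip():
        return "heartbeat", None  # empty frame, nothing to parse
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return "invalid", None
    if not isinstance(data, dict):
        return "invalid", None  # events are expected to be JSON objects
    if data.get("type") == "heartbeat":
        return "heartbeat", data
    return "event", data


# Sample frames, using the payload shape assumed in this tutorial.
frames = [
    "",
    '{"type": "heartbeat"}',
    "not json",
    '{"topic": "conversation", "event": "conversation:start", "data": {"id": "c1"}}',
]
for frame in frames:
    kind, _payload = classify_message(frame)
    print(kind)
```

In the receive loop, only frames classified as "event" would be passed on to handle_notification_event.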

Complete Working Example

Below is the full, copy-pasteable script. It includes a simple reconnection loop to handle network blips or token expiration.

import asyncio
import json
import logging
import time
import httpx
import websockets
from typing import Optional

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, org_id: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.org_id = org_id
        self.token_url = f"https://api.{org_id}.mypurecloud.com/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expires_at: float = 0

    async def get_access_token(self) -> str:
        if self.access_token and time.time() < (self.token_expires_at - 60):
            return self.access_token

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    self.token_url,
                    data={
                        "grant_type": "client_credentials",
                        "client_id": self.client_id,
                        "client_secret": self.client_secret
                    },
                    headers={"Content-Type": "application/x-www-form-urlencoded"}
                )
                response.raise_for_status()
                token_data = response.json()
                
                self.access_token = token_data["access_token"]
                self.token_expires_at = time.time() + token_data["expires_in"]
                logger.info("Successfully retrieved new access token.")
                return self.access_token
            except httpx.HTTPStatusError as e:
                logger.error(f"Failed to get token: {e.response.status_code} {e.response.text}")
                raise

class GenesysWebSocketClient:
    def __init__(self, org_id: str, auth_service: GenesysAuth, subscription_ids: list[str]):
        self.org_id = org_id
        self.auth = auth_service
        self.subscription_ids = subscription_ids
        self.ws_uri = f"wss://api.{org_id}.mypurecloud.com/api/v2/notifications"
        self.is_running = False

    async def run(self):
        """
        Main loop with reconnection logic.
        """
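        while True:
            try:
                await self.connect_and_subscribe()
            except Exception as e:
                logger.error(f"Connection failed or lost: {e}")
            # Always pause before reconnecting, including after a handled or
            # clean close, so a server that keeps closing the socket does not
            # trigger a tight reconnect loop.
            logger.info("Reconnecting in 10 seconds...")
            await asyncio.sleep(10)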

    async def connect_and_subscribe(self):
        """
        Connects to the WebSocket and subscribes to notifications.
        """
        token = await self.auth.get_access_token()
        ws_url = f"{self.ws_uri}?access_token={token}"
        
        # Log the base URI only; ws_url contains the access token.
        logger.info(f"Connecting to {self.ws_uri}...")
        
        try:
            async with websockets.connect(
                ws_url, 
                ping_interval=20, 
                ping_timeout=10,
                close_timeout=5
            ) as websocket:
                logger.info("WebSocket connection established.")
                
                # Subscribe to all provided subscription IDs
                for sub_id in self.subscription_ids:
                    await self.subscribe_to_notification(websocket, sub_id)
                
                # Listen for events until connection drops
                await self.listen_for_events(websocket)
                
        except websockets.exceptions.ConnectionClosedError as e:
            logger.warning(f"Connection closed by server: {e.code} {e.reason}")
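        except websockets.exceptions.InvalidHandshake as e:
            # Base class for handshake rejections (for example an HTTP 401 or 403).
            # These exceptions have no .reason attribute, so log the exception itself.
            logger.error(f"Handshake rejected: {e}")
            raise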
        except Exception as e:
            logger.error(f"Unexpected error in WebSocket loop: {e}")
            raise

    async def subscribe_to_notification(self, websocket, subscription_id: str):
        """
        Sends a subscribe message for a specific subscription ID.
        """
        subscribe_message = {
            "id": subscription_id
        }
        await websocket.send(json.dumps(subscribe_message))
        logger.info(f"Subscribed to notification ID: {subscription_id}")

    async def listen_for_events(self, websocket):
        """
        Listens for incoming messages.
        """
        async for message in websocket:
            try:
                # Handle heartbeat (empty string or specific object)
                if not message.strip():
                    continue
                
                data = json.loads(message)
                
                if isinstance(data, dict) and data.get("type") == "heartbeat":
                    continue
                
                await self.process_event(data)
                
            except json.JSONDecodeError:
                logger.warning("Received malformed JSON.")
            except Exception as e:
                logger.error(f"Error processing message: {e}")

    async def process_event(self, event: dict):
        """
        Processes the notification event payload.
        """
        topic = event.get("topic")
        event_type = event.get("event")
        data = event.get("data", {})
        sub_id = event.get("subscriptionId")
        
        logger.info(f"[Sub: {sub_id}] Topic: {topic} | Event: {event_type}")
        
        if topic == "conversation":
            conv_id = data.get("id")
            conv_type = data.get("type")
            logger.info(f"  -> Conversation ID: {conv_id}, Type: {conv_type}")
            
            # Example: Extract participant details if available
            participants = data.get("participants", [])
            if participants:
                logger.info(f"  -> Participants count: {len(participants)}")

# --- Main Execution ---

async def main():
    # CONFIGURATION
    CLIENT_ID = "your_client_id"
    CLIENT_SECRET = "your_client_secret"
    ORG_ID = "your_org_id"
    
    # REPLACE THIS WITH YOUR ACTUAL SUBSCRIPTION ID
    # You must create a subscription in Genesys Admin > Integrations > Notifications
    # Or via API: POST /api/v2/notifications/subscriptions
    SUBSCRIPTION_IDS = ["your_subscription_id_here"]
    
    if CLIENT_ID == "your_client_id":
        logger.error("Please update the CONFIGURATION section with your credentials.")
        return

    auth_service = GenesysAuth(CLIENT_ID, CLIENT_SECRET, ORG_ID)
    client = GenesysWebSocketClient(ORG_ID, auth_service, SUBSCRIPTION_IDS)
    
    await client.run()

if __name__ == "__main__":
    # Ctrl+C surfaces as KeyboardInterrupt from asyncio.run(), not inside main().
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Shutting down...")
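
The script above retries after a fixed 10 seconds. A common alternative, when many clients might reconnect at the same moment, is capped exponential backoff with jitter. The sketch below is not part of the original script; backoff_ceiling and backoff_delay are hypothetical helpers, and the base and cap values are arbitrary defaults.

```python
import random
from typing import Optional


def backoff_ceiling(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Upper bound for the delay: base * 2**attempt, capped at `cap` seconds."""
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    return min(cap, base * (2 ** attempt))


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Full-jitter backoff: a random delay between 0 and the ceiling."""
    generator = rng if rng is not None else random
    return generator.uniform(0.0, backoff_ceiling(attempt, base, cap))


# Show how the ceiling grows and then flattens at the cap.
for attempt in range(8):
    print(attempt, backoff_ceiling(attempt))
```

To use it, the run loop would keep an attempt counter, sleep for backoff_delay(attempt) after each failure, and reset the counter to zero once a connection has been established.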

Common Errors & Debugging

Error: 401 Unauthorized

Cause: The access token is expired, invalid, or missing scopes.
Fix:

  1. Ensure your GenesysAuth class is refreshing the token before it expires. The code above checks token_expires_at - 60.
  2. Verify that the OAuth Client in Genesys Cloud has the view:conversation scope. If you are trying to see detailed call data, ensure view:call is also granted.
  3. Check that the access_token is correctly appended to the WebSocket URL as a query parameter.
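
When checking the URL during debugging, it helps to log it with the token masked rather than in full. A minimal sketch using the standard library; redact_token is a hypothetical helper and the host is a placeholder.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def redact_token(url: str, param: str = "access_token") -> str:
    """Return url with the value of `param` replaced by a placeholder."""
    parts = urlsplit(url)
    pairs = [
        (key, "REDACTED" if key == param else value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
    ]
    return urlunsplit(
        (parts.scheme, parts.netloc, parts.path, urlencode(pairs), parts.fragment)
    )


safe_url = redact_token(
    "wss://api.example.invalid/api/v2/notifications?access_token=secret&x=1"
)
print(safe_url)
```

The masked URL still shows whether the access_token parameter is present and correctly named, which is usually all a log line needs to confirm.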

Error: 403 Forbidden (on Subscribe)

Cause: The subscription ID exists, but the client does not have permission to view the topics defined in that subscription.
Fix:

  1. Go to Genesys Admin > Integrations > Notifications.
  2. Open the subscription.
  3. Check the “Topics” configured. If it includes “call”, ensure your OAuth client has view:call.
  4. Ensure the subscription is “Enabled”.

Error: Connection Closed Immediately (1006)

Cause: Network firewall blocking WebSocket upgrades, or the server rejecting the handshake due to malformed token.
Fix:

  1. Verify that your server can reach wss://api.{org_id}.mypurecloud.com on port 443.
  2. Ensure you are using wss:// (secure), not ws://.
  3. Check that the token URL used for authentication matches the WebSocket URL’s organization domain.
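
To separate firewall problems from token problems, one option is to test plain TCP/TLS reachability of the WebSocket host before involving the Notification API at all. The sketch below uses only asyncio from the standard library; host_and_port and can_reach are hypothetical helpers, and a successful result only shows that port 443 is reachable, not that the WebSocket upgrade will be accepted.

```python
import asyncio
from urllib.parse import urlsplit


def host_and_port(ws_url: str) -> tuple[str, int]:
    """Extract the host and effective port from a ws:// or wss:// URL."""
    parts = urlsplit(ws_url)
    if parts.scheme not in ("ws", "wss"):
        raise ValueError("expected a ws:// or wss:// URL")
    if parts.hostname is None:
        raise ValueError("URL has no host")
    default_port = 443 if parts.scheme == "wss" else 80
    return parts.hostname, parts.port or default_port


async def can_reach(ws_url: str, timeout: float = 5.0) -> bool:
    """Attempt a TCP connection (with TLS for wss) and report success."""
    host, port = host_and_port(ws_url)
    use_tls = urlsplit(ws_url).scheme == "wss"
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port, ssl=use_tls or None),
            timeout=timeout,
        )
    except (OSError, asyncio.TimeoutError):
        return False  # DNS failure, refused connection, TLS error, or timeout
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        pass  # the connection was established; errors on close do not matter here
    return True


print(host_and_port("wss://api.example.invalid/api/v2/notifications"))
```

Running asyncio.run(can_reach(your_ws_uri)) from the same machine as the client gives a quick yes/no answer; a False result points at DNS, firewall, or TLS interception rather than at the token.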

Error: No Events Received

Cause: The subscription is active, but no conversations are starting, or the subscription filter is too narrow.
Fix:

  1. Create a test subscription with topic conversation and no additional filters.
  2. Initiate a test conversation (call, chat, or email) in Genesys Cloud.
  3. Check the logs. If you see conversation:start, the pipeline is working. If not, check the subscription’s “Filter” expression in the Genesys Admin UI.

Official References