How to subscribe to conversation events via the Notification API WebSocket

How to subscribe to conversation events via the Notification API WebSocket

What You Will Build

  • This tutorial demonstrates how to establish a persistent WebSocket connection to Genesys Cloud to receive real-time conversation event streams.
  • This implementation uses the Genesys Cloud v2 API Notification endpoints and the websocket library for Python.
  • The primary programming language covered is Python, with concepts applicable to JavaScript and other WebSocket-capable languages.

Prerequisites

  • OAuth Client Type: Service Account or Client Credentials flow.
  • Required Scopes:
    • conversation:read (to subscribe to general conversation events)
    • conversation:view (if accessing specific conversation details)
    • user:read (if resolving user IDs in events)
    • notification:subscribe (implicit in most modern clients, but ensure your client has permission to create subscriptions)
  • SDK/API Version: Genesys Cloud v2 API.
  • Language/Runtime Requirements: Python 3.8+.
  • External Dependencies:
    • requests (for OAuth token acquisition)
    • websocket-client (for the WebSocket connection)
    • purecloudplatformclientv2 (optional, for helper functions, but this tutorial uses raw HTTP/WebSocket for transparency)

Authentication Setup

The Notification API requires a valid JWT (JSON Web Token) for authentication. Unlike REST calls where you pass the token in every header, the WebSocket handshake occurs over HTTP, so the token is passed as a query parameter during the upgrade request.

First, you must obtain a token using the Client Credentials flow. This token expires after 24 hours, but the WebSocket connection itself may drop or require re-authentication if idle for too long or if the token expires mid-stream.

import requests
import json
import time
from typing import Optional

def get_access_token(client_id: str, client_env: str) -> str:
    """
    Obtains a JWT from Genesys Cloud using Client Credentials flow.
    
    :param client_id: Your OAuth Client ID
    :param client_env: Your OAuth Client Secret
    :return: The access token string
    """
    auth_url = "https://api.mypurecloud.com/oauth/token"
    
    payload = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_env
    }
    
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    
    try:
        response = requests.post(auth_url, data=payload, headers=headers)
        response.raise_for_status()
        token_data = response.json()
        return token_data["access_token"]
    except requests.exceptions.HTTPError as e:
        print(f"Authentication failed: {e.response.text}")
        raise
    except Exception as e:
        print(f"Unexpected error during authentication: {str(e)}")
        raise

Implementation

Step 1: Establish the WebSocket Connection

The Genesys Cloud Notification API endpoint for WebSockets is wss://api.mypurecloud.com/api/v2/notifications. You must append your access token as a query parameter ?access_token=YOUR_TOKEN.

When the connection is established, Genesys Cloud expects you to send a JSON message to subscribe to specific event types. If you do not send a subscription message, the server will close the connection.

import websocket
import json
import threading
import time

class GenesysNotificationClient:
    def __init__(self, access_token: str):
        self.access_token = access_token
        self.ws_url = f"wss://api.mypurecloud.com/api/v2/notifications?access_token={self.access_token}"
        self.ws = None
        self.connected = False
        self.subscription_id = None
        
    def on_message(self, ws, message):
        """
        Callback for incoming messages from Genesys Cloud.
        """
        try:
            data = json.loads(message)
            # The message structure varies by event type.
            # Common fields: 'type', 'id', 'payload'
            print(f"Received event: {data.get('type')}")
            
            # Handle subscription confirmation
            if data.get("type") == "SubscriptionConfirmation":
                self.subscription_id = data.get("subscriptionId")
                print(f"Subscription confirmed. ID: {self.subscription_id}")
                
            # Handle actual conversation events
            elif data.get("type") == "ConversationEvent":
                self.process_conversation_event(data)
                
            # Handle errors
            elif data.get("type") == "Error":
                print(f"Server Error: {data.get('message')}")
                
        except json.JSONDecodeError:
            print(f"Received non-JSON data: {message}")
        except Exception as e:
            print(f"Error processing message: {str(e)}")

    def on_error(self, ws, error):
        print(f"WebSocket Error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        self.connected = False
        print(f"WebSocket closed. Status: {close_status_code}, Message: {close_msg}")
        # Implement reconnection logic here if desired

    def on_open(self, ws):
        self.connected = True
        print("WebSocket connection established. Sending subscription request...")
        self.subscribe_to_events()

    def subscribe_to_events(self):
        """
        Sends the initial subscription message to define which events to receive.
        """
        # Define the subscription payload
        # We are subscribing to Conversation events
        subscription_payload = {
            "type": "SubscriptionRequest",
            "topics": [
                {
                    "type": "ConversationEvent",
                    "filter": {
                        "include": ["conversationId", "eventType", "timestamp"]
                    }
                }
            ]
        }
        
        try:
            self.ws.send(json.dumps(subscription_payload))
        except Exception as e:
            print(f"Failed to send subscription request: {str(e)}")

    def process_conversation_event(self, event_data: dict):
        """
        Process the incoming conversation event.
        """
        payload = event_data.get("payload", {})
        conversation_id = payload.get("conversationId")
        event_type = payload.get("eventType")
        
        print(f"Conversation {conversation_id}: {event_type}")
        
        # Example: If a new message is added
        if event_type == "MessageAdded":
            message_text = payload.get("message", {}).get("text")
            print(f"  New Message: {message_text}")

    def connect(self):
        """
        Starts the WebSocket connection.
        """
        self.ws = websocket.WebSocketApp(
            self.ws_url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        
        # Run in a separate thread to keep the main thread free
        ws_thread = threading.Thread(target=self.ws.run_forever)
        ws_thread.daemon = True
        ws_thread.start()
        
        return ws_thread

Step 2: Handle Reconnection and Token Refresh

WebSocket connections are not permanent. Genesys Cloud may close the connection due to idle timeouts, network issues, or server maintenance. Furthermore, the JWT expires every 24 hours. A production-grade client must handle reconnection and token refresh seamlessly.

The following logic adds a reconnection loop with exponential backoff.

import time
import random

class RobustNotificationClient:
    def __init__(self, client_id: str, client_secret: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.current_token = None
        self.client = None
        self.max_retries = 5
        self.base_delay = 2  # seconds
        
    def get_token(self) -> str:
        # Reuse the get_access_token function defined earlier
        return get_access_token(self.client_id, self.client_secret)

    def start_listening(self):
        retry_count = 0
        
        while retry_count < self.max_retries:
            try:
                # 1. Get a fresh token
                print(f"Attempt {retry_count + 1}: Obtaining new token...")
                self.current_token = self.get_token()
                
                # 2. Initialize the WebSocket client
                self.client = GenesysNotificationClient(self.current_token)
                
                # 3. Start the connection
                thread = self.client.connect()
                
                # 4. Wait for the connection to stabilize (simplified check)
                time.sleep(5)
                
                if self.client.connected:
                    print("Connection stable. Listening for events...")
                    retry_count = 0 # Reset retry count on success
                    
                    # Keep the main thread alive
                    while self.client.connected:
                        time.sleep(1)
                    else:
                        print("Connection lost. Reconnecting...")
                        
                else:
                    print("Connection failed to stabilize.")
                    
            except Exception as e:
                print(f"Fatal error in loop: {str(e)}")
                retry_count += 1
                
                if retry_count < self.max_retries:
                    delay = self.base_delay * (2 ** retry_count) + random.uniform(0, 1)
                    print(f"Retrying in {delay:.2f} seconds...")
                    time.sleep(delay)
                else:
                    print("Max retries reached. Exiting.")
                    break

Step 3: Filtering and Specific Event Handling

Subscribing to ConversationEvent broadly can generate a high volume of traffic. Genesys Cloud allows filtering via the filter object in the subscription request. You can filter by conversationId to listen to only one specific conversation, or use eventType to ignore low-priority events.

For example, to listen only to MessageAdded and ConversationUpdated events for a specific conversation:

def subscribe_to_specific_conversation(self, conversation_id: str):
    """
    Overrides the default subscription to focus on a single conversation.
    """
    subscription_payload = {
        "type": "SubscriptionRequest",
        "topics": [
            {
                "type": "ConversationEvent",
                "filter": {
                    "conversationId": conversation_id,
                    "include": ["eventType", "timestamp", "message"],
                    "exclude": ["statusChanged"] # Ignore status changes if not needed
                }
            }
        ]
    }
    
    if self.ws and self.connected:
        self.ws.send(json.dumps(subscription_payload))
        print(f"Subscribed to conversation: {conversation_id}")
    else:
        print("Not connected. Cannot subscribe.")

Complete Working Example

This script combines authentication, WebSocket connection, error handling, and event processing into a single runnable module. It includes a simple reconnection mechanism.

import requests
import websocket
import json
import threading
import time
import sys
import os

# Configuration - Use environment variables in production
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")

if not CLIENT_ID or not CLIENT_SECRET:
    raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")

def get_access_token(client_id: str, client_secret: str) -> str:
    auth_url = "https://api.mypurecloud.com/oauth/token"
    payload = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    
    try:
        response = requests.post(auth_url, data=payload, headers=headers)
        response.raise_for_status()
        return response.json()["access_token"]
    except requests.exceptions.HTTPError as e:
        print(f"Auth Error: {e.response.text}")
        sys.exit(1)

class GenesysWSClient:
    def __init__(self, token: str):
        self.token = token
        self.ws_url = f"wss://api.mypurecloud.com/api/v2/notifications?access_token={token}"
        self.ws = None
        self.is_running = False

    def on_message(self, ws, message):
        try:
            data = json.loads(message)
            msg_type = data.get("type")
            
            if msg_type == "SubscriptionConfirmation":
                print(f"[INFO] Subscription Confirmed: {data.get('subscriptionId')}")
            elif msg_type == "ConversationEvent":
                payload = data.get("payload", {})
                conv_id = payload.get("conversationId")
                evt_type = payload.get("eventType")
                
                # Log relevant events
                if evt_type in ["MessageAdded", "ConversationUpdated", "StatusChanged"]:
                    timestamp = payload.get("timestamp")
                    print(f"[EVENT] {timestamp} | Conv: {conv_id} | Type: {evt_type}")
                    
                    if evt_type == "MessageAdded":
                        msg_text = payload.get("message", {}).get("text")
                        if msg_text:
                            print(f"  >> {msg_text}")
            elif msg_type == "Error":
                print(f"[ERROR] Server Error: {data.get('message')}")
                
        except Exception as e:
            print(f"[ERROR] Processing message: {e}")

    def on_error(self, ws, error):
        print(f"[ERROR] WebSocket Error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        print(f"[CLOSE] Connection closed. Status: {close_status_code}")
        self.is_running = False

    def on_open(self, ws):
        print("[OPEN] Connection established. Subscribing...")
        self.is_running = True
        
        # Subscribe to all conversation events
        sub_request = {
            "type": "SubscriptionRequest",
            "topics": [
                {
                    "type": "ConversationEvent",
                    "filter": {
                        "include": ["conversationId", "eventType", "timestamp", "message"]
                    }
                }
            ]
        }
        ws.send(json.dumps(sub_request))

    def start(self):
        self.ws = websocket.WebSocketApp(
            self.ws_url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        
        # Run in a thread
        thread = threading.Thread(target=self.ws.run_forever)
        thread.daemon = True
        thread.start()
        return thread

def main():
    print("Starting Genesys Cloud Notification Client...")
    
    while True:
        try:
            # 1. Authenticate
            print("Authenticating...")
            token = get_access_token(CLIENT_ID, CLIENT_SECRET)
            
            # 2. Connect
            client = GenesysWSClient(token)
            thread = client.start()
            
            # 3. Monitor
            while client.is_running:
                time.sleep(1)
                
            print("Connection lost. Reconnecting in 5 seconds...")
            time.sleep(5)
            
        except KeyboardInterrupt:
            print("Exiting...")
            break
        except Exception as e:
            print(f"Unexpected error: {e}")
            time.sleep(10)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized on WebSocket Handshake

  • Cause: The access token provided in the query parameter is invalid, expired, or missing.
  • Fix: Verify that the client_id and client_secret are correct. Ensure the token was obtained successfully before initiating the WebSocket connection. Check the expiration time of the token.
  • Code Check:
    # Ensure token is not empty
    if not token:
        raise ValueError("Token is empty")
    

Error: Connection Closed Immediately After Opening

  • Cause: The server closed the connection because no SubscriptionRequest was sent within the timeout period (usually a few seconds).
  • Fix: Ensure your on_open handler sends the subscription JSON payload immediately after the connection is established.
  • Code Check:
    def on_open(self, ws):
        # Must send subscription request immediately
        ws.send(json.dumps(subscription_payload))
    

Error: 429 Too Many Requests

  • Cause: You have exceeded the rate limit for WebSocket connections or subscription requests. Genesys Cloud limits the number of concurrent WebSocket connections per client ID.
  • Fix: Implement connection pooling or reuse existing connections. Do not create a new WebSocket for every event listener. Use a single connection and filter events within the application.
  • Code Check:
    # Reuse the 'client' instance instead of creating a new GenesysWSClient()
    

Error: No Events Received

  • Cause: The subscription filter is too restrictive, or the client does not have the necessary permissions (conversation:read) to see the events.
  • Fix: Check the OAuth scopes of the client ID. Ensure the filter object in the SubscriptionRequest is not excluding the events you expect. Start with a broad filter (include: ["*"]) to verify connectivity, then narrow it down.
  • Code Check:
    "filter": {
        "include": ["*"] # Broad filter for debugging
    }
    

Official References