How to Subscribe to Conversation Events via the Genesys Cloud Notification API WebSocket
What You Will Build
- You will build a persistent WebSocket client that subscribes to real-time conversation events (start, wrap-up, transfer) in Genesys Cloud.
- This tutorial uses the Genesys Cloud v2 Notification API and the
websocketslibrary in Python. - The implementation covers authentication, subscription management, heartbeat handling, and reconnection logic.
Prerequisites
- OAuth Client Type: Public Client or Confidential Client. For this tutorial, we assume a Confidential Client (Client Credentials Grant) for server-to-server communication, or a Public Client with User Agent flow for user-specific events. We will use the Client Credentials flow for stability in this example.
- Required Scopes:
view:conversation,view:call,view:email,view:webchat,view:task(depending on conversation types you wish to monitor). For broad visibility,view:conversationis often sufficient for metadata, but specific type scopes are required for detailed event payloads. - SDK Version:
genesyscloudPython SDK v2.x (though we will usehttpxfor auth andwebsocketsfor the WS connection to demonstrate raw protocol control, which is often more reliable for long-running WS scripts than the high-level SDK wrapper). - Language/Runtime: Python 3.9+.
- External Dependencies:
pip install httpx websockets pyjwt
Authentication Setup
The Genesys Cloud Notification API WebSocket requires a valid Access Token. Unlike REST calls, you cannot pass the token in the query string. You must pass it as a query parameter named access_token in the initial WebSocket handshake URL.
First, you must obtain the token. We will use httpx to handle the OAuth 2.0 Client Credentials flow.
import httpx
import json
import time
from typing import Optional
class GenesysAuth:
def __init__(self, client_id: str, client_secret: str, org_id: str):
self.client_id = client_id
self.client_secret = client_secret
self.org_id = org_id
self.token_url = f"https://api.{org_id}.mypurecloud.com/oauth/token"
self.access_token: Optional[str] = None
self.token_expires_at: float = 0
async def get_access_token(self) -> str:
"""
Retrieves an access token using Client Credentials Grant.
Implements simple caching to avoid requesting new tokens unnecessarily.
"""
# Check if token is still valid (subtract 60s for buffer)
if self.access_token and time.time() < (self.token_expires_at - 60):
return self.access_token
async with httpx.AsyncClient() as client:
response = await client.post(
self.token_url,
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.token_expires_at = time.time() + token_data["expires_in"]
return self.access_token
Note on Scopes: If your client application does not have the necessary scopes, the WebSocket connection will open, but the subscription request will fail with a 403 Forbidden event payload. Ensure your OAuth client in the Genesys Admin console has view:conversation and specific interaction type scopes enabled.
Implementation
Step 1: Establishing the WebSocket Connection
The Notification API WebSocket endpoint is located at wss://api.{org_id}.mypurecloud.com/api/v2/notifications.
You must append the access_token as a query parameter. Unlike HTTP headers, WebSocket handshakes rely on the URL path and query string for initial authentication.
import asyncio
import websockets
import json
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GenesysWebSocketClient:
def __init__(self, org_id: str, auth_service: GenesysAuth):
self.org_id = org_id
self.auth = auth_service
self.ws_uri = f"wss://api.{org_id}.mypurecloud.com/api/v2/notifications"
self.subscriptions = []
self.is_running = False
async def connect_and_subscribe(self, subscription_ids: list[str]):
"""
Connects to the WebSocket and subscribes to a list of existing subscription IDs.
"""
self.is_running = True
token = await self.auth.get_access_token()
# The token must be passed in the query string
ws_url = f"{self.ws_uri}?access_token={token}"
try:
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=10) as websocket:
logger.info("WebSocket connected successfully.")
# Send subscription requests
for sub_id in subscription_ids:
await self.subscribe_to_notification(websocket, sub_id)
# Start listening for messages
await self.listen_for_events(websocket)
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"Connection closed: {e.code} {e.reason}")
except Exception as e:
logger.error(f"Unexpected error: {e}")
finally:
self.is_running = False
Step 2: Managing Subscriptions
You do not create subscriptions in real-time via the WebSocket. You must pre-create them using the REST API (POST /api/v2/notifications/subscriptions). The WebSocket client only subscribes to these existing IDs.
However, for this tutorial to be complete, we will include a helper to create a subscription if one does not exist, or you can manually create one in the Genesys Admin UI under “Integrations > Notifications”.
The subscription payload for conversation events looks like this:
{
"name": "Realtime Conversation Monitor",
"description": "Monitors all conversation lifecycle events",
"enabled": true,
"topics": [
"conversation"
],
"notificationType": "REALTIME",
"endpointType": "WEBHOOK",
"webhookEndpoint": {
"url": "https://placeholder.com"
}
}
Note: For WebSocket consumption, the endpointType is irrelevant to the payload structure, but the API requires an endpoint. You can use a dummy URL. The key is the topics array containing "conversation".
Once you have the id from the created subscription, pass it to the subscribe_to_notification method:
async def subscribe_to_notification(self, websocket, subscription_id: str):
"""
Sends a 'subscribe' message to the WebSocket server for a specific subscription ID.
"""
subscribe_message = {
"id": subscription_id
}
await websocket.send(json.dumps(subscribe_message))
logger.info(f"Sent subscribe request for ID: {subscription_id}")
self.subscriptions.append(subscription_id)
Step 3: Processing Events and Heartbeats
The WebSocket stream is a mix of two types of messages:
- Heartbeats: To keep the connection alive. These are usually empty or contain a specific heartbeat identifier.
- Notification Events: JSON payloads containing the actual data.
You must handle the heartbeat to prevent the server from timing you out, and you must parse the event payload to extract the conversation data.
async def listen_for_events(self, websocket):
"""
Listens for incoming messages from the WebSocket server.
Handles heartbeats and conversation events.
"""
async for message in websocket:
try:
data = json.loads(message)
# Genesys Cloud sends empty strings or specific heartbeat objects
# Depending on the SDK version and region, heartbeats might be "" or {"type": "heartbeat"}
if not data or (isinstance(data, dict) and data.get("type") == "heartbeat"):
logger.debug("Received heartbeat.")
continue
# Process the actual notification event
await self.handle_notification_event(data)
except json.JSONDecodeError:
logger.warning(f"Received non-JSON message: {message[:100]}")
except Exception as e:
logger.error(f"Error processing message: {e}")
async def handle_notification_event(self, event: dict):
"""
Parses the notification event and extracts conversation details.
"""
# The event structure typically looks like:
# {
# "subscriptionId": "...",
# "topic": "conversation",
# "event": "conversation:wrapup" (or start, transfer, etc.),
# "data": { ... conversation object ... }
# }
topic = event.get("topic")
event_type = event.get("event")
data = event.get("data", {})
if topic == "conversation":
logger.info(f"Event: {event_type}")
self.process_conversation_data(event_type, data)
else:
logger.warning(f"Unhandled topic: {topic}")
def process_conversation_data(self, event_type: str, data: dict):
"""
Extracts key fields from the conversation data.
"""
conv_id = data.get("id", "unknown")
conv_type = data.get("type", "unknown")
if event_type == "conversation:start":
logger.info(f"New {conv_type} started: {conv_id}")
elif event_type == "conversation:wrapup":
logger.info(f"{conv_type} wrapped up: {conv_id}")
elif event_type == "conversation:transfer":
logger.info(f"{conv_type} transferred: {conv_id}")
elif event_type == "conversation:participant:added":
logger.info(f"Participant added to {conv_type}: {conv_id}")
else:
logger.info(f"Other event ({event_type}) for {conv_type}: {conv_id}")
Complete Working Example
Below is the full, copy-pasteable script. It includes a simple reconnection loop to handle network blips or token expiration.
import asyncio
import json
import logging
import time
import httpx
import websockets
from typing import Optional
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class GenesysAuth:
def __init__(self, client_id: str, client_secret: str, org_id: str):
self.client_id = client_id
self.client_secret = client_secret
self.org_id = org_id
self.token_url = f"https://api.{org_id}.mypurecloud.com/oauth/token"
self.access_token: Optional[str] = None
self.token_expires_at: float = 0
async def get_access_token(self) -> str:
if self.access_token and time.time() < (self.token_expires_at - 60):
return self.access_token
async with httpx.AsyncClient() as client:
try:
response = await client.post(
self.token_url,
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.token_expires_at = time.time() + token_data["expires_in"]
logger.info("Successfully retrieved new access token.")
return self.access_token
except httpx.HTTPStatusError as e:
logger.error(f"Failed to get token: {e.response.status_code} {e.response.text}")
raise
class GenesysWebSocketClient:
def __init__(self, org_id: str, auth_service: GenesysAuth, subscription_ids: list[str]):
self.org_id = org_id
self.auth = auth_service
self.subscription_ids = subscription_ids
self.ws_uri = f"wss://api.{org_id}.mypurecloud.com/api/v2/notifications"
self.is_running = False
async def run(self):
"""
Main loop with reconnection logic.
"""
while True:
try:
await self.connect_and_subscribe()
except Exception as e:
logger.error(f"Connection failed or lost: {e}. Retrying in 10 seconds...")
await asyncio.sleep(10)
async def connect_and_subscribe(self):
"""
Connects to the WebSocket and subscribes to notifications.
"""
token = await self.auth.get_access_token()
ws_url = f"{self.ws_uri}?access_token={token}"
logger.info(f"Connecting to {ws_url}...")
try:
async with websockets.connect(
ws_url,
ping_interval=20,
ping_timeout=10,
close_timeout=5
) as websocket:
logger.info("WebSocket connection established.")
# Subscribe to all provided subscription IDs
for sub_id in self.subscription_ids:
await self.subscribe_to_notification(websocket, sub_id)
# Listen for events until connection drops
await self.listen_for_events(websocket)
except websockets.exceptions.ConnectionClosedError as e:
logger.warning(f"Connection closed by server: {e.code} {e.reason}")
except websockets.exceptions.InvalidStatusCode as e:
logger.error(f"Invalid status code: {e.status_code} {e.reason}")
raise
except Exception as e:
logger.error(f"Unexpected error in WebSocket loop: {e}")
raise
async def subscribe_to_notification(self, websocket, subscription_id: str):
"""
Sends a subscribe message for a specific subscription ID.
"""
subscribe_message = {
"id": subscription_id
}
await websocket.send(json.dumps(subscribe_message))
logger.info(f"Subscribed to notification ID: {subscription_id}")
async def listen_for_events(self, websocket):
"""
Listens for incoming messages.
"""
async for message in websocket:
try:
# Handle heartbeat (empty string or specific object)
if not message.strip():
continue
data = json.loads(message)
if isinstance(data, dict) and data.get("type") == "heartbeat":
continue
await self.process_event(data)
except json.JSONDecodeError:
logger.warning("Received malformed JSON.")
except Exception as e:
logger.error(f"Error processing message: {e}")
async def process_event(self, event: dict):
"""
Processes the notification event payload.
"""
topic = event.get("topic")
event_type = event.get("event")
data = event.get("data", {})
sub_id = event.get("subscriptionId")
logger.info(f"[Sub: {sub_id}] Topic: {topic} | Event: {event_type}")
if topic == "conversation":
conv_id = data.get("id")
conv_type = data.get("type")
logger.info(f" -> Conversation ID: {conv_id}, Type: {conv_type}")
# Example: Extract participant details if available
participants = data.get("participants", [])
if participants:
logger.info(f" -> Participants count: {len(participants)}")
# --- Main Execution ---
async def main():
# CONFIGURATION
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
ORG_ID = "your_org_id"
# REPLACE THIS WITH YOUR ACTUAL SUBSCRIPTION ID
# You must create a subscription in Genesys Admin > Integrations > Notifications
# Or via API: POST /api/v2/notifications/subscriptions
SUBSCRIPTION_IDS = ["your_subscription_id_here"]
if CLIENT_ID == "your_client_id":
logger.error("Please update the CONFIGURATION section with your credentials.")
return
auth_service = GenesysAuth(CLIENT_ID, CLIENT_SECRET, ORG_ID)
client = GenesysWebSocketClient(ORG_ID, auth_service, SUBSCRIPTION_IDS)
try:
await client.run()
except KeyboardInterrupt:
logger.info("Shutting down...")
if __name__ == "__main__":
asyncio.run(main())
Common Errors & Debugging
Error: 401 Unauthorized
Cause: The access token is expired, invalid, or missing scopes.
Fix:
- Ensure your
GenesysAuthclass is refreshing the token before it expires. The code above checkstoken_expires_at - 60. - Verify that the OAuth Client in Genesys Cloud has the
view:conversationscope. If you are trying to see detailed call data, ensureview:callis also granted. - Check that the
access_tokenis correctly appended to the WebSocket URL as a query parameter.
Error: 403 Forbidden (on Subscribe)
Cause: The subscription ID exists, but the client does not have permission to view the topics defined in that subscription.
Fix:
- Go to Genesys Admin > Integrations > Notifications.
- Open the subscription.
- Check the “Topics” configured. If it includes “call”, ensure your OAuth client has
view:call. - Ensure the subscription is “Enabled”.
Error: Connection Closed Immediately (1006)
Cause: Network firewall blocking WebSocket upgrades, or the server rejecting the handshake due to malformed token.
Fix:
- Verify that your server can reach
wss://api.{org_id}.mypurecloud.comon port 443. - Ensure you are using
wss://(secure), notws://. - Check that the token URL used for authentication matches the WebSocket URL’s organization domain.
Error: No Events Received
Cause: The subscription is active, but no conversations are starting, or the subscription filter is too narrow.
Fix:
- Create a test subscription with topic
conversationand no additional filters. - Initiate a test conversation (call, chat, or email) in Genesys Cloud.
- Check the logs. If you see
conversation:start, the pipeline is working. If not, check the subscription’s “Filter” expression in the Genesys Admin UI.