Real-Time Conversation Events via Genesys Cloud Notification API WebSocket
What You Will Build
- You will build a Python application that connects to the Genesys Cloud Notification API WebSocket, subscribes to conversation events, and prints real-time updates for call states, transcripts, and participant changes.
- This tutorial uses the Genesys Cloud Platform API v2 Notification endpoint and the websocket-client library.
- The programming language covered is Python 3.8+.
Prerequisites
- OAuth Client Type: Client Credentials (machine-to-machine, M2M) or a user-based grant such as Authorization Code. M2M is recommended for server-side integrations.
- Required Scopes:
  - view:conversation (to view conversation details)
  - view:interaction (to view interaction data)
  - view:agent (optional, for agent status context)
- SDK Version: This tutorial uses raw HTTP/WebSocket connections for transparency, but the logic applies to the genesys-cloud-purecloud-platform-client SDK if you prefer wrapper methods.
- Language/Runtime: Python 3.8 or higher.
- External Dependencies:
  - requests (for OAuth token retrieval)
  - websocket-client (for WebSocket connection)
  - pyjwt (optional, for debugging JWT tokens)
Install dependencies:
pip install requests websocket-client
Authentication Setup
The Genesys Cloud Notification API requires a valid OAuth 2.0 access token. In this tutorial the token is not sent as an Authorization header during the WebSocket handshake; the client obtains it first and subscribes only once a token is available.
First, obtain the token using the Client Credentials flow (M2M).
import os

import requests


def get_access_token(client_id: str, client_secret: str, environment: str = "us-east-1") -> str:
    """
    Retrieves an OAuth 2.0 access token from Genesys Cloud.

    Args:
        client_id: Your OAuth client ID.
        client_secret: Your OAuth client secret.
        environment: The Genesys Cloud region (e.g., us-east-1, eu-west-1).

    Returns:
        The access token string.
    """
    # Construct the base URL based on the environment
    base_url = f"https://{environment}.mygen.com"
    # Define the token endpoint
    token_url = f"{base_url}/oauth/token"
    # Define the payload
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret
    }
    try:
        # A timeout keeps the call from hanging indefinitely on network problems
        response = requests.post(token_url, data=data, timeout=10)
        response.raise_for_status()
        token_data = response.json()
        return token_data["access_token"]
    except requests.exceptions.HTTPError as e:
        print(f"Failed to get token: {e}")
        raise
    except requests.exceptions.ConnectionError:
        print("Connection error. Check your internet connection and environment URL.")
        raise


# Example usage
# TOKEN = get_access_token(os.getenv("GENESYS_CLIENT_ID"), os.getenv("GENESYS_CLIENT_SECRET"))
Critical Note: The token expires in 1 hour. For a production WebSocket connection, you must implement a background thread to refresh the token before expiration and re-subscribe if the connection drops. This tutorial focuses on the subscription mechanism; a production system should wrap the WebSocket client in a retry loop with token refresh logic.
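The refresh requirement in the note above can be sketched as a small token cache. This is a minimal sketch, not the tutorial's method: it swaps the background thread for a lazy check on every call, which is simpler to reason about. TokenCache, fetch, and refresh_margin are hypothetical names, and fetch is assumed to return the token together with its lifetime in seconds (for example, from an expires_in field, if the token response provides one).

```python
import threading
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches an access token and refetches it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        refresh_margin: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        # fetch() is a hypothetical callable you supply; it must return
        # (token, lifetime_in_seconds). It is not part of any library.
        self._fetch = fetch
        self._margin = refresh_margin
        self._clock = clock
        self._lock = threading.Lock()  # makes get() safe to call from several threads
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return a token with at least refresh_margin seconds of validity left."""
        with self._lock:
            expiring = self._clock() >= self._expires_at - self._margin
            if self._token is None or expiring:
                token, lifetime = self._fetch()
                self._token = token
                self._expires_at = self._clock() + lifetime
            return self._token
```

Calling cache.get() right before each connection attempt means a reconnect uses a fresh token without a separate timer thread.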
Implementation
Step 1: Establish the WebSocket Connection
The Genesys Cloud Notification API WebSocket endpoint is wss://<environment>.mygen.com/api/v2/notifications.
Unlike standard REST calls, you do not pass the Authorization header in the WebSocket handshake. Instead, you send a JSON subscription message immediately after connecting.
import websocket


def connect_to_notifications(environment: str = "us-east-1"):
    """
    Establishes a WebSocket connection to the Genesys Cloud Notification API.

    Args:
        environment: The Genesys Cloud region.

    Returns:
        A websocket.WebSocketApp instance.
    """
    base_url = f"wss://{environment}.mygen.com/api/v2/notifications"
    # Create the WebSocket application.
    # The on_open, on_message, on_error, and on_close callbacks are defined in Step 2.
    ws = websocket.WebSocketApp(
        base_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close
    )
    return ws
Step 2: Define Event Handlers and Subscription Logic
You must define callbacks for on_open, on_message, on_error, and on_close. The most critical part is the on_open handler, where you send the subscription request.
The subscription payload follows a specific schema. You subscribe to event types (e.g., conversation) and can filter by conversation IDs or participant IDs.
import json

# Global variables for demonstration; use a class in production
ws_app = None
access_token = None


def on_open(ws):
    """
    Called when the WebSocket connection is opened.
    Sends the subscription message.
    """
    global access_token
    if not access_token:
        print("Error: No access token available. Cannot subscribe.")
        ws.close()
        return
    # Define the subscription message
    # We subscribe to 'conversation' events.
    # You can also subscribe to 'interaction', 'agent', 'routing', etc.
    subscription_message = {
        "topic": "conversation",
        "subscribed": True,
        "filter": {
            # Optional: Filter by specific conversation IDs
            # "conversationIds": ["your-conversation-id-123"],
            # Optional: Filter by participant IDs (e.g., specific agents)
            # "participantIds": ["your-agent-id-456"]
        }
    }
    # Send the subscription message
    try:
        ws.send(json.dumps(subscription_message))
        print("Subscription message sent. Waiting for events...")
    except Exception as e:
        print(f"Failed to send subscription message: {e}")
        ws.close()


def on_message(ws, message):
    """
    Called when a message is received from the server.
    """
    try:
        data = json.loads(message)
        # The Notification API returns a wrapper object
        # The actual event is in the 'events' list
        if 'events' in data:
            for event in data['events']:
                process_event(event)
        else:
            # Handle other server messages (e.g., keep-alive, errors)
            print(f"Server message: {message}")
    except json.JSONDecodeError:
        print(f"Received non-JSON message: {message}")


def on_error(ws, error):
    """
    Called when an error occurs.
    """
    print(f"WebSocket Error: {error}")


def on_close(ws, close_status_code, close_msg):
    """
    Called when the connection is closed.
    """
    print(f"WebSocket Closed: {close_status_code} - {close_msg}")
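The subscription payload from Step 2 can also be built by a small helper, so the optional filters are included only when they are set. This is a minimal sketch: build_subscription is a hypothetical name, and the field names (topic, subscribed, filter, conversationIds, participantIds) are taken from the payload shown in this tutorial rather than verified independently.

```python
import json
from typing import Iterable, Optional


def build_subscription(
    topic: str = "conversation",
    conversation_ids: Optional[Iterable[str]] = None,
    participant_ids: Optional[Iterable[str]] = None,
) -> str:
    """Serialize a subscription message, adding only the filters that were provided."""
    filters = {}
    conv_ids = list(conversation_ids or [])
    part_ids = list(participant_ids or [])
    if conv_ids:
        filters["conversationIds"] = conv_ids
    if part_ids:
        filters["participantIds"] = part_ids
    # An empty filter object means "no filtering", matching the Step 2 payload
    message = {"topic": topic, "subscribed": True, "filter": filters}
    return json.dumps(message)
```

Inside on_open, ws.send(build_subscription(participant_ids=["your-agent-id-456"])) would then replace the hand-written dictionary.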
Step 3: Processing Conversation Events
The process_event function handles the payload structure. Genesys Cloud notification events contain a topic, id, and data object. The data object structure varies by event type. For conversation events, the data contains the full conversation snapshot or delta.
def process_event(event: dict):
    """
    Processes a single notification event.

    Args:
        event: The event dictionary from the 'events' list.
    """
    topic = event.get("topic")
    event_id = event.get("id")
    data = event.get("data", {})
    print(f"\n--- New Event: {topic} (ID: {event_id}) ---")
    if topic == "conversation":
        handle_conversation_event(data)
    elif topic == "interaction":
        handle_interaction_event(data)
    else:
        print(f"Unhandled topic: {topic}")


def handle_conversation_event(data: dict):
    """
    Handles conversation-specific events.
    The 'data' object contains the conversation resource.

    Key fields:
    - id: Conversation ID
    - state: Current state (e.g., 'connected', 'ended')
    - participants: List of participants
    - type: Conversation type (e.g., 'voice', 'chat', 'email')
    """
    conversation_id = data.get("id")
    state = data.get("state")
    conv_type = data.get("type")
    print(f"Conversation ID: {conversation_id}")
    print(f"State: {state}")
    print(f"Type: {conv_type}")
    # Example: Check if the conversation has ended
    if state == "ended":
        print("Conversation has ended. You can now archive or process the transcript.")
    # Example: Log participant changes
    participants = data.get("participants", [])
    print(f"Participant Count: {len(participants)}")
    for participant in participants:
        part_id = participant.get("id")
        part_type = participant.get("type")
        part_state = participant.get("state")
        print(f"  Participant {part_id} ({part_type}): {part_state}")


def handle_interaction_event(data: dict):
    """
    Handles interaction-specific events.
    """
    interaction_id = data.get("id")
    print(f"Interaction ID: {interaction_id}")
    # Interaction events are less frequent than conversation events
    # and usually relate to billing or routing analytics.
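To make the event shape described in Step 3 concrete, the sketch below reduces a conversation event to a plain summary that is easier to log or test than printed lines. The sample payload is invented for illustration and follows only the fields named in this tutorial (id, state, type, participants); summarize_conversation is a hypothetical helper.

```python
from typing import Any, Dict, List


def summarize_conversation(data: Dict[str, Any]) -> Dict[str, Any]:
    """Extract the fields this tutorial relies on from a conversation 'data' object."""
    participants: List[Dict[str, Any]] = data.get("participants") or []
    return {
        "id": data.get("id"),
        "state": data.get("state"),
        "type": data.get("type"),
        "ended": data.get("state") == "ended",
        # One (id, type, state) tuple per participant
        "participants": [
            (p.get("id"), p.get("type"), p.get("state")) for p in participants
        ],
    }


# Hypothetical event, shaped like the structure described above
sample_event = {
    "topic": "conversation",
    "id": "evt-1",
    "data": {
        "id": "conv-123",
        "state": "connected",
        "type": "voice",
        "participants": [
            {"id": "p-1", "type": "agent", "state": "connected"},
            {"id": "p-2", "type": "customer", "state": "connected"},
        ],
    },
}

summary = summarize_conversation(sample_event["data"])
print(summary)
```

Returning a dictionary instead of printing keeps the parsing logic separate from the output, so the same function can feed a logger or a queue.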
Complete Working Example
Below is the full, copy-pasteable script. It retrieves the token, connects to the WebSocket, and processes events.
import json
import os
import sys

import requests
import websocket

# Configuration
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID", "your_client_id")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET", "your_client_secret")
ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "us-east-1")

# Global state
access_token = None
ws_app = None


def get_access_token(client_id: str, client_secret: str, environment: str) -> str:
    """Retrieves an OAuth 2.0 access token."""
    base_url = f"https://{environment}.mygen.com"
    token_url = f"{base_url}/oauth/token"
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret
    }
    try:
        # A timeout keeps the call from hanging indefinitely on network problems
        response = requests.post(token_url, data=data, timeout=10)
        response.raise_for_status()
        token_data = response.json()
        return token_data["access_token"]
    except requests.exceptions.HTTPError as e:
        print(f"Failed to get token: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Error retrieving token: {e}")
        sys.exit(1)


def on_open(ws):
    """Sends subscription message on connection open."""
    global access_token
    if not access_token:
        print("Error: No access token. Closing connection.")
        ws.close()
        return
    subscription_message = {
        "topic": "conversation",
        "subscribed": True,
        "filter": {}
    }
    try:
        ws.send(json.dumps(subscription_message))
        print("Subscribed to conversation events.")
    except Exception as e:
        print(f"Failed to send subscription: {e}")
        ws.close()


def on_message(ws, message):
    """Handles incoming messages."""
    try:
        data = json.loads(message)
        if 'events' in data:
            for event in data['events']:
                process_event(event)
    except json.JSONDecodeError:
        # Ignore non-JSON messages
        pass


def on_error(ws, error):
    """Handles WebSocket errors."""
    print(f"WebSocket Error: {error}")


def on_close(ws, close_status_code, close_msg):
    """Handles WebSocket closure."""
    print(f"WebSocket Closed: {close_status_code} - {close_msg}")


def process_event(event: dict):
    """Processes a single event."""
    topic = event.get("topic")
    data = event.get("data", {})
    if topic == "conversation":
        conv_id = data.get("id")
        state = data.get("state")
        print(f"[{state}] Conversation {conv_id}")


def main():
    global access_token, ws_app
    print("Starting Genesys Cloud Notification Subscriber...")
    # Step 1: Get Token
    print("Retrieving access token...")
    access_token = get_access_token(CLIENT_ID, CLIENT_SECRET, ENVIRONMENT)
    print("Token retrieved successfully.")
    # Step 2: Connect WebSocket
    base_url = f"wss://{ENVIRONMENT}.mygen.com/api/v2/notifications"
    ws_app = websocket.WebSocketApp(
        base_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close
    )
    # Step 3: Run the WebSocket loop
    try:
        ws_app.run_forever()
    except KeyboardInterrupt:
        print("\nShutting down...")
        ws_app.close()
        print("Closed.")


if __name__ == "__main__":
    main()
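The script above keeps the token and the WebSocket app in module-level globals. One possible class-based arrangement, offered as a sketch rather than a required structure, is shown below. ConversationSubscriber and on_event are hypothetical names; the handlers rely only on the ws object having send and close methods, which is what the callbacks in this tutorial already assume.

```python
import json
from typing import Any, Callable, Dict


class ConversationSubscriber:
    """Holds the token and event callback so the handlers need no globals."""

    def __init__(self, access_token: str, on_event: Callable[[Dict[str, Any]], None]):
        self.access_token = access_token
        self.on_event = on_event

    def on_open(self, ws) -> None:
        """Send the subscription message, or close if no token is available."""
        if not self.access_token:
            ws.close()
            return
        ws.send(json.dumps({"topic": "conversation", "subscribed": True, "filter": {}}))

    def on_message(self, ws, message) -> None:
        """Forward each entry of the 'events' list to the callback."""
        try:
            data = json.loads(message)
        except (TypeError, ValueError):
            return  # not JSON; ignore
        if not isinstance(data, dict):
            return
        for event in data.get("events", []):
            self.on_event(event)
```

The bound methods can be handed to the client directly, for example websocket.WebSocketApp(url, on_open=subscriber.on_open, on_message=subscriber.on_message), because they accept the same arguments as the module-level handlers.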
Common Errors & Debugging
Error: 401 Unauthorized on WebSocket Handshake or Subscription
Cause: The access token is invalid, expired, or missing the required scopes.
Fix:
- Verify your Client ID and Secret.
- Ensure the OAuth client has the view:conversation scope.
- Check if the token has expired. The Notification API does not automatically refresh tokens. If you receive a 401 error in the WebSocket stream, the server will close the connection. You must reconnect with a new token.
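Reconnecting with a new token after a 401 can be wrapped in a retry loop with exponential backoff, so a persistent failure does not turn into a tight reconnect loop. The sketch below is one way to structure it. backoff_delays and run_with_reconnect are hypothetical helpers; get_token and run_session are callables you would supply, with run_session assumed to block until the connection ends and to return True only on a deliberate shutdown.

```python
import random
import time
from typing import Callable, Iterator, Optional


def backoff_delays(base: float = 1.0, cap: float = 60.0, jitter: float = 0.0) -> Iterator[float]:
    """Yield delays that double each time (base, 2*base, ...) up to cap, plus optional jitter."""
    delay = base
    while True:
        yield min(cap, delay) + random.uniform(0, jitter)
        delay = min(cap, delay * 2)


def run_with_reconnect(
    get_token: Callable[[], str],
    run_session: Callable[[str], bool],
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
    delays: Optional[Iterator[float]] = None,
) -> int:
    """Run sessions until one ends cleanly; return the number of attempts used."""
    delays = delays if delays is not None else backoff_delays()
    for attempt in range(1, max_attempts + 1):
        token = get_token()  # fetch a fresh token for every attempt
        if run_session(token):
            return attempt
        if attempt < max_attempts:
            sleep(next(delays))
    return max_attempts
```

Fetching the token inside the loop, rather than once before it, is what guarantees that each reconnect uses a token that has not expired.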
Error: Connection Refused or Timeout
Cause: Incorrect environment URL or network firewall blocking WebSocket ports.
Fix:
- Verify the environment URL. Common regions are us-east-1, us-east-2, eu-west-1, ap-southeast-1.
- Ensure your server allows outbound traffic on port 443 (WSS).
- Check if a proxy is intercepting the WebSocket upgrade request. Proxies must support HTTP/1.1 upgrade headers.
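A quick way to separate firewall problems from application problems is to test plain TCP reachability on port 443 before involving the WebSocket client. This is a minimal sketch using only the standard library; can_reach is a hypothetical helper, and it confirms only that a TCP connection can be opened, not that a proxy passes the WebSocket upgrade.

```python
import socket


def can_reach(host: str, port: int = 443, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures
        return False
```

If can_reach returns False for the host in your WebSocket URL, the issue lies with the network path or the hostname rather than with the token or the subscription.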
Error: No Events Received
Cause: The subscription filter is too narrow, or there is no active conversation matching the criteria.
Fix:
- Remove all filters from the subscription message to receive all conversation events for the user/client.
- Trigger a test conversation in the Genesys Cloud UI (e.g., place a call to an agent) and check if the event appears.
- Ensure the OAuth user has permissions to view the conversations. If the user is not a participant or supervisor, they may not receive events for all conversations.
Error: JSON Decode Error
Cause: The server sends a non-JSON message (e.g., a ping/pong frame or a binary frame).
Fix:
The websocket-client library handles ping/pong frames automatically. If you receive a JSON decode error, check if the message is empty or binary. In the on_message handler, wrap the json.loads call in a try-except block and ignore non-JSON messages.
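The fix above can be isolated in a small parsing helper that the on_message handler calls first. This is a sketch under the assumption that only JSON objects are of interest; parse_json_message is a hypothetical name, and empty, binary, or non-object payloads are all reported as None rather than raising.

```python
import json
from typing import Any, Dict, Optional, Union


def parse_json_message(message: Union[str, bytes, None]) -> Optional[Dict[str, Any]]:
    """Return the decoded JSON object, or None for empty, binary, or non-JSON input."""
    if not message:
        return None
    if isinstance(message, (bytes, bytearray)):
        try:
            message = message.decode("utf-8")
        except UnicodeDecodeError:
            return None  # binary frame that is not UTF-8 text
    try:
        parsed = json.loads(message)
    except (TypeError, ValueError):
        return None
    # Only JSON objects can carry an 'events' list
    return parsed if isinstance(parsed, dict) else None
```

In the handler, data = parse_json_message(message) followed by an early return when data is None keeps the event loop free of try-except blocks.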