Subscribing to Real-Time Genesys Cloud Analytics Streams with a Python Async WebSocket Consumer
What You Will Build
A Python asynchronous consumer that connects to the Genesys Cloud /api/v2/events WebSocket, subscribes to conversation and queue state changes, and pushes parsed events into an in-memory dashboard structure with sub-second latency. This tutorial uses the Genesys Cloud WebSocket API alongside the websockets and httpx libraries. The implementation covers OAuth token lifecycle management, connection heartbeats, event filtering, and automatic reconnection logic.
Prerequisites
- OAuth 2.0 Client Credentials grant configured with platform:events:read and analytics:conversations:read scopes
- Genesys Cloud API v2
- Python 3.10 or higher
- External dependencies: pip install websockets httpx aiofiles
- A valid Genesys Cloud tenant environment identifier (e.g., us-east-1, eu-west-1)
Authentication Setup
Genesys Cloud WebSocket connections require a valid OAuth access token passed as a query parameter. The token must be refreshed before expiration to prevent connection drops. The following code demonstrates a production-grade token fetcher with exponential backoff for 429 Too Many Requests responses.
import asyncio
import httpx
import logging
from typing import Optional

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

GENESYS_BASE_URL = "https://api.us-east-1.mypurecloud.com"
OAUTH_TOKEN_URL = f"{GENESYS_BASE_URL}/oauth/token"


async def fetch_oauth_token(
    client_id: str,
    client_secret: str,
    scopes: str = "platform:events:read analytics:conversations:read"
) -> dict:
    """
    Acquires an OAuth 2.0 client credentials token with 429 retry logic.
    Returns a dict containing 'access_token' and 'expires_in'.
    """
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {
        "grant_type": "client_credentials",
        "scope": scopes
    }
    async with httpx.AsyncClient(timeout=15.0) as client:
        max_retries = 3
        for attempt in range(1, max_retries + 1):
            try:
                # The client ID and secret are sent as HTTP Basic credentials
                response = await client.post(
                    OAUTH_TOKEN_URL, headers=headers, data=data, auth=(client_id, client_secret)
                )
                if response.status_code == 429:
                    # Honor Retry-After when present, otherwise back off exponentially
                    retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
                    logger.warning("Received 429 rate limit. Retrying in %d seconds (attempt %d/%d)",
                                   retry_after, attempt, max_retries)
                    await asyncio.sleep(retry_after)
                    continue
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                # Non-429 HTTP errors (e.g., bad credentials) are not retried
                logger.error("OAuth request failed with status %d: %s", e.response.status_code, e.response.text)
                raise
            except httpx.RequestError as e:
                logger.error("Network error during OAuth request: %s", e)
                await asyncio.sleep(2 ** attempt)
    raise RuntimeError("Failed to acquire OAuth token after maximum retries")
Expected HTTP Request:
POST /oauth/token HTTP/1.1
Host: api.us-east-1.mypurecloud.com
Authorization: Basic <base64(client_id:client_secret)>
Content-Type: application/x-www-form-urlencoded

grant_type=client_credentials&scope=platform%3Aevents%3Aread%20analytics%3Aconversations%3Aread
Expected HTTP Response:
{
  "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "Bearer",
  "expires_in": 86400,
  "scope": "platform:events:read analytics:conversations:read"
}
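Because the token must be replaced before it expires, it can help to record when each token was issued. The following is a minimal sketch, assuming the response shape shown above. CachedToken and the 300-second safety margin are illustrative choices, not part of any Genesys SDK.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class CachedToken:
    """Hypothetical helper: wraps a token response and tracks its age."""
    access_token: str
    expires_in: int  # lifetime in seconds, as returned by the token endpoint
    issued_at: float = field(default_factory=time.monotonic)

    def seconds_until_refresh(self, margin: float = 300.0, now: Optional[float] = None) -> float:
        """Seconds left before the token should be replaced (never negative)."""
        current = time.monotonic() if now is None else now
        return max(0.0, self.issued_at + self.expires_in - margin - current)

    def needs_refresh(self, margin: float = 300.0, now: Optional[float] = None) -> bool:
        """True once the token is within `margin` seconds of expiring."""
        return self.seconds_until_refresh(margin, now) == 0.0
```

A caller could build one with CachedToken(resp["access_token"], resp["expires_in"]) right after the token request and check needs_refresh() before each reconnect. The now parameter exists only to make the helper easy to test with a fixed clock.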
Implementation
Step 1: WebSocket Connection and Subscription
The Genesys Cloud events WebSocket endpoint accepts a subscription message immediately after connection. The message must specify eventTypes and optional filters. The connection URL appends the access token as a query parameter. The code below establishes the connection, sends the subscription payload, and validates the initial server acknowledgment.
import json
import websockets
from websockets.exceptions import ConnectionClosed, WebSocketException


async def establish_websocket_connection(access_token: str, queue_ids: list[str]) -> websockets.WebSocketClientProtocol:
    """
    Connects to the Genesys Cloud events WebSocket and subscribes to specified event types.
    The caller owns the returned connection and is responsible for closing it.
    """
    ws_url = f"wss://api.us-east-1.mypurecloud.com/api/v2/events?access_token={access_token}"
    subscription_message = {
        "type": "subscribe",
        "subscription": {
            "eventTypes": ["conversation:updated", "routing:queue:updated"],
            "filters": {
                "routingQueueIds": queue_ids
            }
        }
    }
    # Await connect() directly: returning from inside "async with" would close the connection
    websocket = await websockets.connect(ws_url, ping_interval=20, ping_timeout=10)
    try:
        logger.info("WebSocket connection established. Sending subscription request.")
        await websocket.send(json.dumps(subscription_message))
        # Wait for subscription acknowledgment
        ack = await websocket.recv()
        ack_data = json.loads(ack)
        if ack_data.get("type") != "ack":
            raise RuntimeError(f"Subscription failed. Server responded with: {ack_data}")
    except Exception:
        # Do not leak the socket if the subscription handshake fails
        await websocket.close()
        raise
    logger.info("Subscription acknowledged. Waiting for events.")
    return websocket
Critical Parameter Explanation:
- ping_interval=20: Sends a WebSocket ping every 20 seconds to prevent idle timeouts from load balancers or NAT gateways.
- routingQueueIds: Filters events to only those matching your target queues. Omitting this filter returns events for all queues accessible by the token, which may trigger rate limiting or memory pressure.
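Since the token travels in the query string and the subscription payload is plain JSON, both can be built by small pure functions that are easy to test. The sketch below assumes the endpoint path and message shape used in this tutorial. build_events_url and build_subscription are hypothetical helper names.

```python
import json
from urllib.parse import urlencode

DEFAULT_EVENT_TYPES = ("conversation:updated", "routing:queue:updated")


def build_events_url(api_host: str, access_token: str) -> str:
    """Builds the WebSocket URL, percent-encoding the token for the query string."""
    query = urlencode({"access_token": access_token})
    return f"wss://{api_host}/api/v2/events?{query}"


def build_subscription(queue_ids=None, event_types=DEFAULT_EVENT_TYPES) -> str:
    """Serializes the subscribe message; the filters block is omitted when no queues are given."""
    subscription = {"eventTypes": list(event_types)}
    if queue_ids:
        subscription["filters"] = {"routingQueueIds": list(queue_ids)}
    return json.dumps({"type": "subscribe", "subscription": subscription})
```

Encoding the token guards against characters such as + or = that would otherwise be misread in a query string.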
Step 2: Async Event Processing and Dashboard Update
Real-time analytics streams require non-blocking event consumption. The following consumer reads messages continuously, parses the JSON payload, and pushes structured data into an asyncio.Queue. A separate dashboard updater task consumes this queue at sub-second intervals without blocking the WebSocket reader.
from dataclasses import dataclass, field
from datetime import datetime, timezone
import asyncio


@dataclass
class DashboardState:
    """In-memory dashboard state. Not thread-safe: mutate it only from the event loop."""
    active_conversations: int = 0
    queue_wait_times: dict = field(default_factory=dict)
    last_updated: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def update(self, event_data: dict) -> None:
        """Processes a parsed Genesys Cloud event and mutates dashboard state."""
        event_type = event_data.get("type")
        if event_type == "conversation:updated":
            # Simulate real-time conversation count tracking
            state = event_data.get("data", {}).get("state")
            if state in ("connected", "consulting"):
                self.active_conversations += 1
            elif state in ("wrapup", "closed"):
                self.active_conversations = max(0, self.active_conversations - 1)
        elif event_type == "routing:queue:updated":
            queue_id = event_data.get("data", {}).get("id")
            wait_time = event_data.get("data", {}).get("statistics", {}).get("averageWaitTime", 0)
            self.queue_wait_times[queue_id] = wait_time
        self.last_updated = datetime.now(timezone.utc)
        logger.debug("Dashboard updated. Active conversations: %d", self.active_conversations)


async def event_consumer(websocket: websockets.WebSocketClientProtocol, dashboard: DashboardState, event_queue: asyncio.Queue) -> None:
    """
    Continuously reads WebSocket messages, parses events, and pushes them to the processing queue.
    """
    try:
        async for message in websocket:
            try:
                payload = json.loads(message)
                if payload.get("type") == "event":
                    event_payload = payload.get("event", {})
                    # Blocks when the queue is full, which applies backpressure to the reader
                    await event_queue.put(event_payload)
                elif payload.get("type") == "error":
                    logger.error("WebSocket server reported error: %s", payload.get("message"))
            except json.JSONDecodeError:
                logger.warning("Received non-JSON WebSocket message: %s", message[:100])
    except ConnectionClosed as e:
        logger.warning("WebSocket connection closed: code=%s, reason=%s", e.code, e.reason)
        raise
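The counter in DashboardState.update increments on every update that reports a connected state, so several updates for the same conversation would be counted more than once. One way around this is to track conversation IDs in a set. This is a sketch under the assumption that each event's data object carries an id field. ConversationTracker is a hypothetical name.

```python
from dataclasses import dataclass, field

ACTIVE_STATES = {"connected", "consulting"}
ENDED_STATES = {"wrapup", "closed"}


@dataclass
class ConversationTracker:
    """Counts active conversations idempotently by remembering their IDs."""
    active_ids: set = field(default_factory=set)

    def apply(self, event_data: dict) -> int:
        """Applies one conversation event and returns the current active count."""
        data = event_data.get("data", {})
        conversation_id = data.get("id")  # assumed field name
        state = data.get("state")
        if conversation_id is None:
            return len(self.active_ids)  # cannot attribute the event; ignore it
        if state in ACTIVE_STATES:
            self.active_ids.add(conversation_id)  # adding twice has no effect
        elif state in ENDED_STATES:
            self.active_ids.discard(conversation_id)  # safe even if never seen
        return len(self.active_ids)
```

Because set membership is idempotent, replayed or duplicated events after a reconnect leave the count unchanged.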
Step 3: Dashboard Updater and Reconnection Logic
The dashboard updater runs concurrently, draining the event queue and simulating a UI refresh. The outer connection loop handles authentication expiry and network interruptions by catching ConnectionClosed exceptions, refreshing the OAuth token, and re-establishing the WebSocket session automatically.
async def dashboard_updater(dashboard: DashboardState, event_queue: asyncio.Queue) -> None:
    """
    Consumes events from the queue and updates the dashboard state.
    Runs independently to maintain sub-second latency perception.
    """
    while True:
        try:
            event_data = await asyncio.wait_for(event_queue.get(), timeout=0.2)
            dashboard.update(event_data)
            event_queue.task_done()
        except asyncio.TimeoutError:
            # No event arrived within 200 ms; loop and wait again
            continue
async def run_realtime_consumer(
    client_id: str,
    client_secret: str,
    queue_ids: list[str],
    dashboard: DashboardState,
    max_reconnect_attempts: int = 5
) -> None:
    """
    Orchestrates token refresh, WebSocket connection, and automatic reconnection.
    """
    event_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
    current_token: Optional[dict] = None
    reconnect_attempts = 0
    while reconnect_attempts < max_reconnect_attempts:
        try:
            logger.info("Acquiring OAuth token (consecutive failures: %d)...", reconnect_attempts)
            current_token = await fetch_oauth_token(client_id, client_secret)
            token_expiry_seconds = current_token.get("expires_in", 86400)
            async with websockets.connect(
                f"wss://api.us-east-1.mypurecloud.com/api/v2/events?access_token={current_token['access_token']}",
                ping_interval=20,
                ping_timeout=10
            ) as websocket:
                await websocket.send(json.dumps({
                    "type": "subscribe",
                    "subscription": {
                        "eventTypes": ["conversation:updated", "routing:queue:updated"],
                        "filters": {"routingQueueIds": queue_ids}
                    }
                }))
                ack = json.loads(await websocket.recv())
                if ack.get("type") != "ack":
                    raise RuntimeError(f"Subscription rejected: {ack}")
                reconnect_attempts = 0
                logger.info("Connected successfully. Streaming events.")
                # Run consumer and updater concurrently
                consumer = asyncio.create_task(event_consumer(websocket, dashboard, event_queue))
                updater = asyncio.create_task(dashboard_updater(dashboard, event_queue))
                try:
                    # Stream until a task stops or the token is 5 minutes from expiry
                    done, _ = await asyncio.wait(
                        {consumer, updater},
                        timeout=max(token_expiry_seconds - 300, 60),
                        return_when=asyncio.FIRST_COMPLETED,
                    )
                finally:
                    # Always stop both tasks before leaving the connection context
                    for task in (consumer, updater):
                        task.cancel()
                    await asyncio.gather(consumer, updater, return_exceptions=True)
                for task in done:
                    exc = task.exception()
                    if exc is not None:
                        raise exc
                logger.info("Session ended. Reconnecting with a fresh token.")
        except (ConnectionClosed, WebSocketException, OSError, RuntimeError) as e:
            reconnect_attempts += 1
            logger.warning("Connection lost (%s). Reconnecting in 5 seconds...", e)
            await asyncio.sleep(5)
        except Exception as e:
            logger.error("Fatal error in consumer loop: %s", e)
            return
    logger.error("Maximum reconnection attempts reached. Stopping consumer.")
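One way to run the consumer and updater together while still honoring a token-refresh deadline is to wait for the first task to finish, with a timeout, and then cancel whatever is left. The sketch below can be tried without a network connection. run_until_first_done is a hypothetical helper, and it uses asyncio.wait rather than TaskGroup so it also runs on Python 3.10.

```python
import asyncio


async def run_until_first_done(coros, timeout: float) -> bool:
    """
    Runs the coroutines concurrently until one finishes or the timeout elapses,
    then cancels the rest. Returns True if a task finished, False on timeout.
    Re-raises the exception of any task that failed.
    """
    tasks = [asyncio.create_task(c) for c in coros]
    try:
        done, _pending = await asyncio.wait(
            tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )
    finally:
        # Cancel everything still running and wait for the cancellations to land
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    for task in done:
        exc = task.exception()
        if exc is not None:
            raise exc
    return len(done) > 0
```

In the reconnection loop, a False return would mean the refresh deadline was reached, while a raised exception would mean the connection failed and the loop should back off before retrying.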
Complete Working Example
The following script combines all components into a single runnable module. Replace the placeholder credentials and queue identifiers before execution.
import asyncio
import json
import logging
import httpx
import websockets
from websockets.exceptions import ConnectionClosed, WebSocketException
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

GENESYS_BASE_URL = "https://api.us-east-1.mypurecloud.com"
OAUTH_TOKEN_URL = f"{GENESYS_BASE_URL}/oauth/token"


@dataclass
class DashboardState:
    active_conversations: int = 0
    queue_wait_times: dict = field(default_factory=dict)
    last_updated: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def update(self, event_data: dict) -> None:
        event_type = event_data.get("type")
        if event_type == "conversation:updated":
            state = event_data.get("data", {}).get("state")
            if state in ("connected", "consulting"):
                self.active_conversations += 1
            elif state in ("wrapup", "closed"):
                self.active_conversations = max(0, self.active_conversations - 1)
        elif event_type == "routing:queue:updated":
            queue_id = event_data.get("data", {}).get("id")
            wait_time = event_data.get("data", {}).get("statistics", {}).get("averageWaitTime", 0)
            self.queue_wait_times[queue_id] = wait_time
        self.last_updated = datetime.now(timezone.utc)


async def fetch_oauth_token(client_id: str, client_secret: str, scopes: str) -> dict:
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {"grant_type": "client_credentials", "scope": scopes}
    async with httpx.AsyncClient(timeout=15.0) as client:
        for attempt in range(1, 4):
            try:
                # The client ID and secret are sent as HTTP Basic credentials
                response = await client.post(
                    OAUTH_TOKEN_URL, headers=headers, data=data, auth=(client_id, client_secret)
                )
                if response.status_code == 429:
                    await asyncio.sleep(2 ** attempt)
                    continue
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                logger.error("OAuth failed: %d %s", e.response.status_code, e.response.text)
                raise
            except httpx.RequestError as e:
                logger.error("Network error: %s", e)
                await asyncio.sleep(2 ** attempt)
    raise RuntimeError("Max retries exceeded for OAuth token")


async def run_realtime_consumer(client_id: str, client_secret: str, queue_ids: list[str]) -> DashboardState:
    dashboard = DashboardState()
    event_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
    max_reconnect_attempts = 5
    failures = 0

    async def read_events(websocket) -> None:
        # Parse each message and enqueue event payloads
        async for message in websocket:
            try:
                payload = json.loads(message)
            except json.JSONDecodeError:
                continue
            if payload.get("type") == "event":
                await event_queue.put(payload.get("event", {}))
            elif payload.get("type") == "error":
                logger.error("Server error: %s", payload.get("message"))

    async def drain_events() -> None:
        # Apply queued events to the dashboard as they arrive
        while True:
            dashboard.update(await event_queue.get())

    while failures < max_reconnect_attempts:
        try:
            token_data = await fetch_oauth_token(client_id, client_secret, "platform:events:read analytics:conversations:read")
            ws_url = f"wss://api.us-east-1.mypurecloud.com/api/v2/events?access_token={token_data['access_token']}"
            async with websockets.connect(ws_url, ping_interval=20, ping_timeout=10) as websocket:
                await websocket.send(json.dumps({
                    "type": "subscribe",
                    "subscription": {
                        "eventTypes": ["conversation:updated", "routing:queue:updated"],
                        "filters": {"routingQueueIds": queue_ids}
                    }
                }))
                ack = json.loads(await websocket.recv())
                if ack.get("type") != "ack":
                    raise RuntimeError(f"Subscription rejected: {ack}")
                failures = 0
                logger.info("Connected. Streaming events.")
                drainer = asyncio.create_task(drain_events())
                try:
                    # Stream until 5 minutes before token expiry, then reconnect with a fresh token
                    await asyncio.wait_for(
                        read_events(websocket),
                        timeout=max(token_data.get("expires_in", 86400) - 300, 60),
                    )
                except asyncio.TimeoutError:
                    logger.info("Token nearing expiry. Reconnecting with a fresh token.")
                finally:
                    drainer.cancel()
                    await asyncio.gather(drainer, return_exceptions=True)
        except (ConnectionClosed, WebSocketException, OSError, RuntimeError) as e:
            failures += 1
            logger.warning("Connection lost. Reconnecting in 5s... (%s)", e)
            await asyncio.sleep(5)
        except Exception as e:
            logger.error("Fatal error: %s", e)
            break
    return dashboard


if __name__ == "__main__":
    CLIENT_ID = "your-client-id"
    CLIENT_SECRET = "your-client-secret"
    TARGET_QUEUE_IDS = ["your-queue-id-1", "your-queue-id-2"]
    try:
        final_dashboard = asyncio.run(run_realtime_consumer(CLIENT_ID, CLIENT_SECRET, TARGET_QUEUE_IDS))
        print("Final dashboard state:", final_dashboard)
    except KeyboardInterrupt:
        print("\nConsumer stopped by user.")
Common Errors & Debugging
Error: HTTP 401 Unauthorized on WebSocket Connect
What causes it: The access token passed in the query parameter has expired, been revoked, or was generated with insufficient scopes. Because the token is part of the connection URL, it is checked during the WebSocket opening handshake (the HTTP upgrade request), before any subscription message is sent.
How to fix it: Implement token refresh logic before the expires_in window closes. The provided consumer refreshes the token on each reconnection cycle. Verify that the OAuth client has the platform:events:read scope assigned in the Admin Console under Users and Integrations.
Code showing the fix:
# Refresh token 5 minutes before expiry
await asyncio.sleep(token_data.get("expires_in", 86400) - 300)
# Reconnect loop automatically fetches a fresh token
Error: HTTP 403 Forbidden on Subscription Acknowledgment
What causes it: The token lacks permissions to view the specified queue IDs or event types. The routingQueueIds filter requires the token to have read access to those specific routing queues.
How to fix it: Assign the OAuth user or service account to a security profile with Routing Queue Read permissions. Alternatively, remove the filters block to subscribe to all accessible queues, then filter events client-side.
Code showing the fix:
# Remove filter to bypass queue-specific permission checks
subscription_message = {
    "type": "subscribe",
    "subscription": {
        "eventTypes": ["conversation:updated", "routing:queue:updated"]
    }
}
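When the filters block is removed, the filtering has to happen in the client. A minimal sketch follows, assuming queue events carry the queue ID in data.id as DashboardState.update already expects. filter_events and queue_id_of are hypothetical helpers, and events without a recognizable queue ID are passed through unchanged.

```python
from typing import Callable, Iterable, Iterator, Optional


def queue_id_of(event: dict) -> Optional[str]:
    """Extracts the queue ID from a queue event (assumed layout: data.id)."""
    if event.get("type") == "routing:queue:updated":
        return event.get("data", {}).get("id")
    return None


def filter_events(
    events: Iterable[dict],
    allowed_queue_ids: Iterable[str],
    extract: Callable[[dict], Optional[str]] = queue_id_of,
) -> Iterator[dict]:
    """Yields events whose queue ID is allowed, plus events with no queue ID."""
    allowed = set(allowed_queue_ids)
    for event in events:
        queue_id = extract(event)
        if queue_id is None or queue_id in allowed:
            yield event
```

Passing a different extract function lets the same filter handle other event types once their payload layout is known.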
Error: WebSocket Close Code 1006 (Abnormal Closure)
What causes it: Network interruption, idle timeout from intermediate proxies, or server-side resource exhaustion. Genesys Cloud terminates connections that fail to respond to pings.
How to fix it: Ensure ping_interval and ping_timeout are configured on the client, and reconnect with exponential backoff. The provided code uses ping_interval=20 and ping_timeout=10, which keeps traffic flowing often enough for typical load balancer and NAT idle timeouts; lower the interval if your network path enforces shorter limits.
Code showing the fix:
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=10) as websocket:
    # Ping/pong handled automatically by the websockets library
    ...
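The exponential backoff mentioned above can be expressed as a small delay function. This sketch uses the common "full jitter" variant, which picks a random delay below an exponentially growing ceiling so that many clients do not reconnect at the same instant. backoff_delay and its default values are illustrative assumptions.

```python
import random
from typing import Callable


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Returns a delay in seconds, chosen uniformly below min(cap, base * 2**attempt)."""
    ceiling = min(cap, base * (2 ** attempt))
    return rng() * ceiling
```

In a reconnect loop this would replace a fixed sleep, for example await asyncio.sleep(backoff_delay(reconnect_attempts)). The rng parameter is there so tests can substitute a deterministic source.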
Error: Memory Pressure from Unbounded Event Queue
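What causes it: High-volume queues generate thousands of events per second. If the dashboard updater cannot drain the queue fast enough, an asyncio.Queue created without a maxsize grows without bound. With a maxsize, the awaited put() call instead blocks the WebSocket reader once the queue is full, which delays every later event.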
How to fix it: Set a maxsize on the queue and implement a drop-tail or oldest-first eviction strategy when the queue fills. For production dashboards, aggregate events into fixed time windows before updating the UI state.
Code showing the fix:
# Limit queue size to prevent OOM
event_queue: asyncio.Queue = asyncio.Queue(maxsize=5000)
# Evict oldest event when full
try:
    event_queue.put_nowait(event_data)
except asyncio.QueueFull:
    event_queue.get_nowait()  # Drop oldest
    event_queue.put_nowait(event_data)
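The fixed-window aggregation suggested above can be sketched as follows. WindowAggregator is a hypothetical helper that averages wait-time samples per queue and emits one summary per window, so the dashboard is updated once per window instead of once per event. Timestamps are passed in explicitly to keep the example deterministic.

```python
from collections import defaultdict
from typing import Optional


class WindowAggregator:
    """Averages wait-time samples per queue over fixed-length windows."""

    def __init__(self, window_seconds: float = 1.0) -> None:
        self.window_seconds = window_seconds
        self._window_start: Optional[float] = None
        self._samples = defaultdict(list)

    def add(self, queue_id: str, wait_time: float, now: float) -> Optional[dict]:
        """Records a sample. Returns the previous window's averages when the window rolls over."""
        flushed = None
        if self._window_start is None:
            self._window_start = now
        elif now - self._window_start >= self.window_seconds:
            flushed = self.flush()
            self._window_start = now
        self._samples[queue_id].append(wait_time)
        return flushed

    def flush(self) -> dict:
        """Returns per-queue averages for the current window and clears it."""
        averages = {q: sum(v) / len(v) for q, v in self._samples.items()}
        self._samples.clear()
        return averages
```

A caller would pass time.monotonic() as now and apply the returned dict to the dashboard whenever add returns a value other than None.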