Choosing Between Real-Time Conversations and Analytics Data in Genesys Cloud
What You Will Build
- You will build two distinct data retrieval scripts: one for live, event-driven conversation updates and one for historical, aggregated business intelligence.
- This tutorial uses the Genesys Cloud REST API and the Python
genesys-cloud-pythonSDK. - The primary language is Python, with specific focus on handling WebSocket streams versus paginated REST queries.
Prerequisites
- OAuth Client Type: Service Account (Confidential Client) with
client_credentialsgrant type. - Required Scopes:
- For Real-Time:
conversation:read,presence:read,routing:read(depending on data needs). - For Analytics:
analytics:conversation:read,analytics:detail:read.
- For Real-Time:
- SDK Version:
genesys-cloud-pythonv2.0.0 or later. - Runtime: Python 3.9+.
- Dependencies:
pip install genesys-cloud-python requests python-dotenv
Authentication Setup
Both endpoints require a valid OAuth 2.0 access token. The Genesys Cloud API does not support API key authentication for these specific endpoints. You must use the Client Credentials flow.
The following utility function handles token acquisition and caching. In production, implement a cache with a TTL slightly less than the token expiration (usually 59 minutes for a 60-minute token).
import os
import requests
from datetime import datetime, timedelta
from dotenv import load_dotenv
load_dotenv()
GENESYS_CLOUD_REGION = os.getenv("GENESYS_CLOUD_REGION", "my.genesys.cloud")
CLIENT_ID = os.getenv("GENESYS_CLOUD_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
# In-memory cache for demo purposes. Use Redis or database in production.
_token_cache = {
"token": None,
"expires_at": None
}
def get_access_token() -> str:
"""
Retrieves a Genesys Cloud OAuth access token.
Returns cached token if valid, otherwise fetches a new one.
"""
now = datetime.utcnow()
# Check cache
if _token_cache["token"] and _token_cache["expires_at"] and now < _token_cache["expires_at"]:
return _token_cache["token"]
# Fetch new token
url = f"https://{GENESYS_CLOUD_REGION}/oauth/token"
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
}
try:
response = requests.post(url, headers=headers, data=data)
response.raise_for_status()
token_data = response.json()
# Cache the token
expires_in = token_data.get("expires_in", 3600)
_token_cache["token"] = token_data["access_token"]
_token_cache["expires_at"] = now + timedelta(seconds=expires_in - 60) # Buffer for safety
return _token_cache["token"]
except requests.exceptions.HTTPError as e:
raise Exception(f"Failed to fetch token: {e.response.text}")
except requests.exceptions.RequestException as e:
raise Exception(f"Network error during token fetch: {str(e)}")
Implementation
Step 1: Real-Time Data with /api/v2/conversations (WebSocket)
The /api/v2/conversations endpoint is not a standard REST resource you query for a list. It is the entry point for a WebSocket subscription. You use this when you need to know right now what is happening. Use this for:
- Live dashboards showing current queue wait times.
- Real-time agent assistance tools.
- Triggering immediate workflows based on conversation events (e.g., “send SMS if customer waits > 2 mins”).
Technical Constraint: You cannot “pull” history from this endpoint. If you miss a WebSocket event, you must query the Analytics API to fill the gap.
Code: Subscribing to Conversation Events
This example uses the websockets library (standard in modern Python async stacks) to connect to the Genesys Cloud WebSocket server.
import asyncio
import json
import websockets
import ssl
from urllib.parse import urlencode
async def listen_to_conversations(token: str):
"""
Connects to the Genesys Cloud WebSocket for real-time conversation updates.
"""
# The WebSocket endpoint differs from the REST API base URL
ws_url = f"wss://{GENESYS_CLOUD_REGION.replace('my.', '')}/api/v2/conversations"
# Query parameters for subscription filters
# You can filter by types (voice, chat, message, callback, etc.)
params = {
"types": "voice,chat,message",
"limit": "1000" # Max events per buffer
}
ws_url_with_params = f"{ws_url}?{urlencode(params)}"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
# Configure SSL context to handle potential certificate issues in dev environments
ssl_context = ssl._create_unverified_context() if os.getenv("DEV_MODE") else ssl.create_default_context()
try:
print(f"Connecting to {ws_url_with_params}...")
async with websockets.connect(ws_url_with_params, extra_headers=headers, ssl=ssl_context) as websocket:
print("Connected. Listening for events...")
async for message in websocket:
try:
data = json.loads(message)
event_type = data.get("eventType")
conversation_id = data.get("conversationId")
if event_type == "conversation:created":
print(f"[NEW] Conversation started: {conversation_id}")
elif event_type == "conversation:updated":
# Check for specific updates like status changes
changes = data.get("changes", [])
for change in changes:
if change.get("field") == "state":
print(f"[UPDATE] Conversation {conversation_id} moved to: {change.get('value')}")
elif event_type == "conversation:deleted":
print(f"[END] Conversation ended: {conversation_id}")
except json.JSONDecodeError:
print(f"Received non-JSON data: {message}")
except Exception as e:
print(f"Error processing message: {e}")
except websockets.exceptions.InvalidStatusCode as e:
print(f"Connection failed with status {e.status_code}. Check OAuth scope: conversation:read")
except ConnectionRefusedError:
print("Could not connect to WebSocket server. Check network/firewall.")
except Exception as e:
print(f"Unexpected error: {e}")
# To run this, you would call:
# asyncio.run(listen_to_conversations(get_access_token()))
Error Handling Note: If you receive a 401 Unauthorized on the WebSocket handshake, it usually means the token has expired or lacks the conversation:read scope. The WebSocket connection does not automatically refresh tokens. You must implement a heartbeat monitor and reconnect with a fresh token when the current one nears expiration.
Step 2: Historical Data with /api/v2/analytics/conversations (REST Query)
The /api/v2/analytics/conversations/details/query endpoint is a standard REST POST endpoint. You use this when you need to answer questions about the past. Use this for:
- Daily performance reports (Average Handle Time, Abandon Rate).
- Compliance audits (retrieving transcripts from last week).
- Training analysis (finding specific interactions where a keyword was spoken).
Technical Constraint: This is not real-time. There is typically a latency of 1-5 minutes before a completed conversation appears in the analytics store. Do not use this for live monitoring.
Code: Querying Historical Conversation Details
This example uses the genesys-cloud-python SDK to construct a complex query. The Analytics API uses a specific query object structure that supports filtering, grouping, and sorting.
from genesyscloud import AnalyticsApi, Configuration
from genesyscloud.rest import ApiException
from datetime import datetime, timedelta
def get_historical_conversations(token: str) -> list:
"""
Queries the Analytics API for completed voice conversations from the last 24 hours.
"""
# Configure the API client
configuration = Configuration()
configuration.host = f"https://{GENESYS_CLOUD_REGION}/api/v2"
configuration.access_token = token
analytics_api = AnalyticsApi(configuration)
# Define the time range
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=24)
# Construct the query body
# Note: The SDK generates the classes, but the structure mirrors the JSON body
from genesyscloud.models import ConversationDetailQuery
query_body = ConversationDetailQuery(
view="raw", # 'raw' returns individual records, 'summary' returns aggregates
time_grouping="none", # Do not aggregate by time bucket
interval=f"{start_time.isoformat()}Z/{end_time.isoformat()}Z",
size=25, # Page size (max 250)
sort_by=["conversation.startTime"],
sort_order="desc",
filters=[
{
"type": "conversation",
"path": "type",
"operation": "eq",
"value": ["voice"] # Filter for voice only
}
]
)
try:
print(f"Querying analytics for {start_time.isoformat()} to {end_time.isoformat()}...")
# Execute the query
response = analytics_api.post_analytics_conversations_details_query(body=query_body)
if response.entity and response.entity.total > 0:
print(f"Found {response.entity.total} conversations.")
return response.entity.conversations
else:
print("No conversations found in the specified time range.")
return []
except ApiException as e:
print(f"Analytics API Error: {e.status} {e.reason}")
if e.body:
print(f"Response Body: {e.body}")
raise e
except Exception as e:
print(f"Unexpected error during analytics query: {e}")
raise e
# Example usage:
# conversations = get_historical_conversations(get_access_token())
# if conversations:
# print(f"Latest conversation ID: {conversations[0].id}")
Pagination Handling: The Analytics API supports pagination via the nextPageToken field in the response. For large datasets, you must loop until nextPageToken is null.
def get_all_historical_conversations(token: str) -> list:
"""
Handles pagination to retrieve all matching conversations.
"""
configuration = Configuration()
configuration.host = f"https://{GENESYS_CLOUD_REGION}/api/v2"
configuration.access_token = token
analytics_api = AnalyticsApi(configuration)
all_conversations = []
next_page_token = None
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=24)
while True:
query_body = ConversationDetailQuery(
view="raw",
time_grouping="none",
interval=f"{start_time.isoformat()}Z/{end_time.isoformat()}Z",
size=250, # Max page size
sort_by=["conversation.startTime"],
sort_order="desc",
filters=[{"type": "conversation", "path": "type", "operation": "eq", "value": ["voice"]}]
)
if next_page_token:
query_body.page_token = next_page_token
try:
response = analytics_api.post_analytics_conversations_details_query(body=query_body)
if response.entity and response.entity.conversations:
all_conversations.extend(response.entity.conversations)
print(f"Retrieved {len(response.entity.conversations)} records. Total so far: {len(all_conversations)}")
next_page_token = response.entity.next_page_token if response.entity else None
if not next_page_token:
break
except ApiException as e:
print(f"Error on page fetch: {e}")
break
return all_conversations
Step 3: Processing Results and Mapping Data Models
The data models returned by these two endpoints are fundamentally different.
- Real-Time (
/api/v2/conversations): Returns event objects. A single conversation generates multiple events (created,updated,participant:added,media:connected). You must maintain state in your application to reconstruct the current status of a conversation. - Analytics (
/api/v2/analytics/conversations): Returns a denormalized snapshot of the conversation at the time of completion (or the latest update). It contains calculated fields liketotalHoldTime,wrapUpTime, andqueueWaitTimewhich are not available in real-time events.
Code: Comparing Data Structures
def analyze_data_difference():
"""
Illustrates the structural difference between real-time events and analytics records.
"""
# Example Real-Time Event Payload (simplified)
real_time_event = {
"eventType": "conversation:updated",
"conversationId": "abc-123",
"changes": [
{"field": "state", "value": "connected", "previousValue": "queued"}
]
}
# Example Analytics Record Payload (simplified)
analytics_record = {
"id": "abc-123",
"type": "voice",
"startTime": "2023-10-27T10:00:00Z",
"endTime": "2023-10-27T10:05:00Z",
"metrics": {
"totalHoldTime": 120.5,
"wrapUpTime": 30.0,
"queueWaitTime": 15.0
}
}
print("Real-Time Data Structure:")
print(f"- Focus: Event-driven changes")
print(f"- Key Field: eventType")
print(f"- Usage: Immediate reaction to state change")
print("\nAnalytics Data Structure:")
print(f"- Focus: Aggregated metrics and historical state")
print(f"- Key Field: metrics (calculated values)")
print(f"- Usage: Reporting, auditing, trend analysis")
Complete Working Example
The following script combines authentication, real-time listening, and historical querying into a single module. It demonstrates the proper separation of concerns.
import os
import asyncio
import json
import websockets
import ssl
from datetime import datetime, timedelta
from urllib.parse import urlencode
from dotenv import load_dotenv
from genesyscloud import AnalyticsApi, Configuration
from genesyscloud.rest import ApiException
from genesyscloud.models import ConversationDetailQuery
load_dotenv()
GENESYS_CLOUD_REGION = os.getenv("GENESYS_CLOUD_REGION", "my.genesys.cloud")
CLIENT_ID = os.getenv("GENESYS_CLOUD_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
_token_cache = {"token": None, "expires_at": None}
def get_access_token() -> str:
from datetime import timedelta
import requests
now = datetime.utcnow()
if _token_cache["token"] and _token_cache["expires_at"] and now < _token_cache["expires_at"]:
return _token_cache["token"]
url = f"https://{GENESYS_CLOUD_REGION}/oauth/token"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {"grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET}
try:
response = requests.post(url, headers=headers, data=data)
response.raise_for_status()
token_data = response.json()
_token_cache["token"] = token_data["access_token"]
_token_cache["expires_at"] = now + timedelta(seconds=token_data.get("expires_in", 3600) - 60)
return _token_cache["token"]
except Exception as e:
raise Exception(f"Token error: {e}")
async def realtime_listener(token: str, duration_seconds: int = 60):
ws_url = f"wss://{GENESYS_CLOUD_REGION.replace('my.', '')}/api/v2/conversations"
params = {"types": "voice", "limit": "100"}
ws_url_with_params = f"{ws_url}?{urlencode(params)}"
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
ssl_context = ssl.create_default_context()
try:
async with websockets.connect(ws_url_with_params, extra_headers=headers, ssl=ssl_context) as websocket:
end_time = asyncio.get_event_loop().time() + duration_seconds
while asyncio.get_event_loop().time() < end_time:
try:
message = await asyncio.wait_for(websocket.recv(), timeout=1.0)
data = json.loads(message)
print(f"[REAL-TIME] {data.get('eventType')}: {data.get('conversationId')}")
except asyncio.TimeoutError:
continue
except Exception as e:
print(f"WS Error: {e}")
except Exception as e:
print(f"WS Connection Error: {e}")
def historical_query(token: str):
configuration = Configuration()
configuration.host = f"https://{GENESYS_CLOUD_REGION}/api/v2"
configuration.access_token = token
analytics_api = AnalyticsApi(configuration)
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=1)
query_body = ConversationDetailQuery(
view="raw",
time_grouping="none",
interval=f"{start_time.isoformat()}Z/{end_time.isoformat()}Z",
size=10,
filters=[{"type": "conversation", "path": "type", "operation": "eq", "value": ["voice"]}]
)
try:
response = analytics_api.post_analytics_conversations_details_query(body=query_body)
if response.entity and response.entity.conversations:
print(f"[ANALYTICS] Found {len(response.entity.conversations)} conversations in last hour.")
for conv in response.entity.conversations:
print(f" - ID: {conv.id}, Start: {conv.startTime}")
else:
print("[ANALYTICS] No conversations found.")
except ApiException as e:
print(f"Analytics Error: {e}")
def main():
token = get_access_token()
print("=== Fetching Historical Data ===")
historical_query(token)
print("\n=== Listening to Real-Time Data (30 seconds) ===")
asyncio.run(realtime_listener(token, duration_seconds=30))
print("\n=== Done ===")
if __name__ == "__main__":
main()
Common Errors & Debugging
Error: 401 Unauthorized on WebSocket
- Cause: The OAuth token provided in the WebSocket header has expired or is invalid.
- Fix: Ensure your token refresh logic is robust. WebSocket connections are long-lived. If your token expires mid-connection, the server will close the connection. You must detect this closure and reconnect with a fresh token.
- Code Fix: Implement a reconnection loop with exponential backoff in your
realtime_listener.
Error: 403 Forbidden on Analytics Query
- Cause: The OAuth client lacks the
analytics:conversation:readscope. - Fix: Go to the Genesys Cloud Admin Console > Applications > [Your App] > Scopes. Add
analytics:conversation:readand regenerate the client secret if necessary (though usually, scope updates are immediate).
Error: “No data found” in Analytics
- Cause: You are querying a time range that has not yet been processed.
- Fix: Analytics data is not real-time. It is typically available 1-5 minutes after the conversation ends. If you just ended a test call, wait 5 minutes before querying.
- Debugging: Use the
view="summary"to check if any data exists in the bucket, or widen the time interval tolast 24 hours.
Error: WebSocket Connection Refused
- Cause: Firewall or network proxy blocking WebSocket traffic.
- Fix: Ensure your environment allows outbound traffic to
wss://*.genesys.cloud/api/v2/conversations. Standard HTTP proxies often block WebSocket upgrades. You may need to configure your proxy to allowUpgrade: websocketheaders.