Querying Live Conversations vs. Historical Analytics: A Developer’s Guide to Genesys Cloud APIs
What You Will Build
- You will build two distinct Python modules: one to fetch active, real-time conversation details and another to query historical conversation metrics for reporting.
- This tutorial uses the Genesys Cloud Platform APIs (
/api/v2/conversationsand/api/v2/analytics/conversations) and the official Python SDK (genesyscloud). - The programming language covered is Python 3.9+, utilizing the
requestslibrary for raw HTTP examples and the official SDK for production-ready patterns.
Prerequisites
- OAuth Client Type: A Confidential Client (Client Credentials Grant) or Public Client (Authorization Code Grant with PKCE). For background services, Confidential is recommended.
- Required Scopes:
- For Live Conversations:
conversation:read,telephony:read,user:read(if accessing user-specific context). - For Analytics:
analytics:conversations:read,analytics:conversations:query.
- For Live Conversations:
- SDK Version:
genesyscloud>= 3.0.0. - Runtime Requirements: Python 3.9 or higher.
- External Dependencies:
pip install genesyscloudpip install requestspip install python-dotenv(for secure credential management)
Authentication Setup
Genesys Cloud uses OAuth 2.0. For API-to-API communication, the Client Credentials flow is standard. You must store your Client ID, Client Secret, and Environment URL securely. Never hardcode these values.
Below is a helper function using requests to obtain and cache an access token. In a production application, you would implement token caching to avoid requesting a new token on every API call, as tokens are valid for one hour.
import os
import requests
from datetime import datetime, timedelta
from typing import Optional
class GenesysAuth:
def __init__(self, client_id: str, client_secret: str, env_url: str):
self.client_id = client_id
self.client_secret = client_secret
self.env_url = env_url.rstrip('/')
self.token_url = f"{self.env_url}/oauth/token"
self.access_token: Optional[str] = None
self.expires_at: Optional[datetime] = None
def get_token(self) -> str:
"""
Retrieves a valid OAuth2 access token.
Returns an existing token if valid, otherwise fetches a new one.
"""
if self.access_token and self.expires_at and datetime.utcnow() < self.expires_at:
return self.access_token
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
try:
response = requests.post(self.token_url, data=data)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if response.status_code == 401:
raise Exception("Authentication failed: Invalid client ID or secret.")
elif response.status_code == 403:
raise Exception("Authentication failed: Client does not have permission to request tokens.")
else:
raise Exception(f"Unexpected error during token request: {e}")
token_data = response.json()
self.access_token = token_data['access_token']
# Token lifespan is typically 3600 seconds.
# We subtract 60 seconds to ensure we refresh before expiry.
expires_in = int(token_data.get('expires_in', 3600)) - 60
self.expires_at = datetime.utcnow() + timedelta(seconds=expires_in)
return self.access_token
# Usage Example
# auth = GenesysAuth(
# client_id=os.getenv("GENESYS_CLIENT_ID"),
# client_secret=os.getenv("GENESYS_CLIENT_SECRET"),
# env_url=os.getenv("GENESYS_ENV_URL") # e.g., "https://api.mypurecloud.com"
# )
# token = auth.get_token()
Implementation
Step 1: Fetching Live Conversations (/api/v2/conversations)
The /api/v2/conversations endpoint provides a snapshot of currently active interactions. This includes calls, chats, messages, and screen shares that are currently in progress or recently concluded (within the immediate session window). This endpoint is designed for real-time dashboards, agent assist tools, or immediate routing logic.
Key Characteristics:
- Latency: Low (sub-second).
- Data Depth: Shallow. It returns the current state, not historical metrics.
- Pagination: Supports
pageSizeandpageNumber, but typically returns a manageable set of active items. - Scope:
conversation:read.
Code Example: Retrieving Active Calls
import requests
from typing import List, Dict, Any
def get_active_conversations(auth: GenesysAuth, conversation_type: str = "call") -> List[Dict[str, Any]]:
"""
Fetches currently active conversations of a specified type.
Args:
auth: GenesysAuth instance with a valid token.
conversation_type: Type of conversation (e.g., 'call', 'message', 'chat').
Returns:
List of conversation objects.
"""
headers = {
"Authorization": f"Bearer {auth.get_token()}",
"Content-Type": "application/json"
}
# The base URL for conversations
url = f"{auth.env_url}/api/v2/conversations"
# Query parameters
params = {
"conversationTypes": conversation_type,
"pageSize": 25,
"pageNumber": 1
}
try:
response = requests.get(url, headers=headers, params=params)
# Handle 401 Unauthorized (Token expired or invalid)
if response.status_code == 401:
print("Warning: Token expired. Refreshing...")
auth.access_token = None # Force refresh
return get_active_conversations(auth, conversation_type) # Retry
# Handle 403 Forbidden (Missing Scopes)
if response.status_code == 403:
raise Exception("Forbidden: Ensure the OAuth client has 'conversation:read' scope.")
response.raise_for_status()
data = response.json()
return data.get('entities', [])
except requests.exceptions.RequestException as e:
print(f"Network error fetching conversations: {e}")
return []
# Example Usage:
# active_calls = get_active_conversations(auth, "call")
# for call in active_calls:
# print(f"Active Call ID: {call['id']}")
# print(f"State: {call['state']}")
Expected Response Structure:
{
"entities": [
{
"id": "7a6f8b1c-2d3e-4f5a-9b8c-7d6e5f4a3b2c",
"type": "call",
"state": "connected",
"createdTimestamp": "2023-10-27T10:00:00.000Z",
"updatedTimestamp": "2023-10-27T10:00:05.000Z",
"participants": [
{
"id": "user-123",
"role": "agent",
"state": "connected"
},
{
"id": "external-456",
"role": "customer",
"state": "connected"
}
]
}
],
"pageSize": 25,
"pageNumber": 1,
"pageCount": 1,
"total": 1
}
Step 2: Querying Historical Analytics (/api/v2/analytics/conversations)
The /api/v2/analytics/conversations endpoint is used for historical reporting. It does not return the live state of a conversation. Instead, it aggregates data about conversations that have already occurred within a specified time window. This endpoint is computationally intensive because it aggregates metrics (duration, hold time, wait time) across potentially millions of records.
Key Characteristics:
- Latency: High (seconds to minutes, depending on date range and granularity).
- Data Depth: Deep. Returns aggregated metrics and detailed records if
detailLevelis set. - Pagination: Uses
nextPageTokeninstead of simple page numbers. - Scope:
analytics:conversations:read. - Rate Limiting: Strict. This endpoint is expensive. Use exponential backoff for 429 errors.
Code Example: Querying Conversation Metrics
import json
import time
from typing import Dict, Any, Generator
def query_conversation_analytics(auth: GenesysAuth, start_time: str, end_time: str) -> Generator[Dict[str, Any], None, None]:
"""
Queries historical conversation analytics.
Uses a generator to handle pagination efficiently.
Args:
auth: GenesysAuth instance.
start_time: ISO 8601 start time (e.g., "2023-10-01T00:00:00.000Z").
end_time: ISO 8601 end time (e.g., "2023-10-02T00:00:00.000Z").
Yields:
Individual conversation analytics records.
"""
headers = {
"Authorization": f"Bearer {auth.get_token()}",
"Content-Type": "application/json"
}
url = f"{auth.env_url}/api/v2/analytics/conversations/details/query"
# The request body is a JSON object defining the query
payload = {
"dateFrom": start_time,
"dateTo": end_time,
"interval": "PT1H", # Aggregate by 1-hour intervals
"metrics": [
"conversationCount",
"duration",
"holdTime",
"waitTime"
],
"groupBy": ["conversationType"],
"detailLevel": "all" # Returns detailed records for each conversation
}
next_page_token = None
max_retries = 3
while True:
if next_page_token:
payload['nextPageToken'] = next_page_token
attempt = 0
while attempt < max_retries:
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 429:
# Rate Limited: Wait and retry
wait_time = (2 ** attempt) + 1
print(f"Rate limited (429). Retrying in {wait_time} seconds...")
time.sleep(wait_time)
attempt += 1
continue
if response.status_code == 401:
auth.access_token = None
headers["Authorization"] = f"Bearer {auth.get_token()}"
continue # Retry with new token
if response.status_code == 403:
raise Exception("Forbidden: Ensure the OAuth client has 'analytics:conversations:read' scope.")
response.raise_for_status()
break # Success, exit retry loop
except requests.exceptions.RequestException as e:
print(f"Network error: {e}")
attempt += 1
if attempt == max_retries:
return
data = response.json()
# Yield individual entities
for entity in data.get('entities', []):
yield entity
# Check for pagination
next_page_token = data.get('nextPageToken')
if not next_page_token:
break
# Example Usage:
# start = "2023-10-01T00:00:00.000Z"
# end = "2023-10-02T00:00:00.000Z"
# for record in query_conversation_analytics(auth, start, end):
# print(f"Conversation ID: {record.get('conversationId')}")
# print(f"Duration (ms): {record.get('metrics', {}).get('duration', {}).get('sum', 0)}")
Expected Response Structure (Simplified):
{
"entities": [
{
"dateFrom": "2023-10-01T00:00:00.000Z",
"dateTo": "2023-10-01T01:00:00.000Z",
"conversationType": "call",
"metrics": {
"conversationCount": {
"count": 150
},
"duration": {
"sum": 4500000 // Total milliseconds
}
}
}
],
"nextPageToken": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...",
"pageSize": 100,
"total": 1500
}
Step 3: Processing Results and Edge Cases
When processing analytics data, you must handle the aggregation logic. The /analytics endpoint returns sums, averages, and counts. If you need the raw duration of a single call, you must ensure detailLevel is set to all or summarized appropriately, but be aware that all can return massive payloads.
Edge Case: Large Date Ranges
If you query a large date range (e.g., 30 days) with detailLevel: all, the API may timeout or return incomplete data. Always paginate and process in smaller intervals (e.g., 1 hour or 1 day) if you need detailed records.
Edge Case: Missing Metrics
If a metric is not applicable to a conversation type (e.g., holdTime for a chat), the metric value may be 0 or null. Always check for null values before performing arithmetic.
def process_analytics_record(record: Dict[str, Any]) -> Dict[str, Any]:
"""
Safely processes an analytics record, handling missing metrics.
"""
metrics = record.get('metrics', {})
duration_ms = metrics.get('duration', {}).get('sum', 0) or 0
hold_time_ms = metrics.get('holdTime', {}).get('sum', 0) or 0
# Calculate hold percentage
if duration_ms > 0:
hold_percentage = (hold_time_ms / duration_ms) * 100
else:
hold_percentage = 0
return {
"conversation_type": record.get('conversationType'),
"duration_seconds": round(duration_ms / 1000, 2),
"hold_percentage": round(hold_percentage, 2)
}
Complete Working Example
This script combines authentication, live conversation fetching, and historical analytics querying into a single module.
import os
import sys
import requests
import json
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
# --- Authentication Module ---
class GenesysAuth:
def __init__(self, client_id: str, client_secret: str, env_url: str):
self.client_id = client_id
self.client_secret = client_secret
self.env_url = env_url.rstrip('/')
self.token_url = f"{self.env_url}/oauth/token"
self.access_token: Optional[str] = None
self.expires_at: Optional[datetime] = None
def get_token(self) -> str:
if self.access_token and self.expires_at and datetime.utcnow() < self.expires_at:
return self.access_token
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
try:
response = requests.post(self.token_url, data=data)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
raise Exception(f"Auth Error: {e}")
token_data = response.json()
self.access_token = token_data['access_token']
expires_in = int(token_data.get('expires_in', 3600)) - 60
self.expires_at = datetime.utcnow() + timedelta(seconds=expires_in)
return self.access_token
# --- Live Conversations Module ---
def get_active_calls(auth: GenesysAuth) -> List[Dict[str, Any]]:
headers = {"Authorization": f"Bearer {auth.get_token()}"}
url = f"{auth.env_url}/api/v2/conversations"
params = {"conversationTypes": "call", "pageSize": 10}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json().get('entities', [])
# --- Analytics Module ---
def get_daily_analytics(auth: GenesysAuth, days_back: int = 1) -> List[Dict[str, Any]]:
headers = {"Authorization": f"Bearer {auth.get_token()}", "Content-Type": "application/json"}
url = f"{auth.env_url}/api/v2/analytics/conversations/details/query"
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=days_back)
payload = {
"dateFrom": start_time.isoformat() + "Z",
"dateTo": end_time.isoformat() + "Z",
"interval": "PT1H",
"metrics": ["conversationCount", "duration"],
"groupBy": ["conversationType"],
"detailLevel": "all"
}
all_records = []
next_page_token = None
while True:
if next_page_token:
payload['nextPageToken'] = next_page_token
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 429:
print("Rate limited. Waiting 10s...")
import time; time.sleep(10)
continue
response.raise_for_status()
data = response.json()
all_records.extend(data.get('entities', []))
next_page_token = data.get('nextPageToken')
if not next_page_token:
break
return all_records
# --- Main Execution ---
if __name__ == "__main__":
# Load credentials from environment variables
client_id = os.getenv("GENESYS_CLIENT_ID")
client_secret = os.getenv("GENESYS_CLIENT_SECRET")
env_url = os.getenv("GENESYS_ENV_URL", "https://api.mypurecloud.com")
if not all([client_id, client_secret, env_url]):
print("Error: Missing environment variables. Set GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, GENESYS_ENV_URL")
sys.exit(1)
auth = GenesysAuth(client_id, client_secret, env_url)
print("=== Fetching Active Calls ===")
try:
active_calls = get_active_calls(auth)
print(f"Found {len(active_calls)} active calls.")
for call in active_calls[:3]: # Print first 3
print(f"ID: {call['id']}, State: {call['state']}")
except Exception as e:
print(f"Error fetching active calls: {e}")
print("\n=== Fetching Historical Analytics (Last 1 Day) ===")
try:
analytics_records = get_daily_analytics(auth, days_back=1)
print(f"Found {len(analytics_records)} analytics records.")
# Aggregate total duration
total_duration_ms = sum(r.get('metrics', {}).get('duration', {}).get('sum', 0) for r in analytics_records)
print(f"Total Duration (ms): {total_duration_ms}")
except Exception as e:
print(f"Error fetching analytics: {e}")
Common Errors & Debugging
Error: 403 Forbidden
- Cause: The OAuth client lacks the required scopes.
- Fix: Go to Genesys Cloud Admin > Platform > Applications > OAuth Clients. Select your client and ensure
conversation:read(for live) oranalytics:conversations:read(for analytics) is checked. Save and re-authenticate.
Error: 429 Too Many Requests
- Cause: You have exceeded the API rate limit. This is common with the Analytics endpoint when querying large datasets.
- Fix: Implement exponential backoff. Wait
2^nseconds before retrying, wherenis the retry attempt number. Reduce the date range of your query if the issue persists.
Error: 400 Bad Request (Analytics)
- Cause: Invalid date format or
dateFromis afterdateTo. - Fix: Ensure ISO 8601 format with ‘Z’ suffix (UTC). Verify
dateFrom<dateTo. The analytics API does not support future dates.
Error: Token Expired During Long Query
- Cause: The analytics query takes longer than the token lifespan (1 hour).
- Fix: Implement token refresh logic in your retry loop. If a 401 is received, regenerate the token and retry the request.