Choosing Between Real-Time Conversations and Analytics History in Genesys Cloud
What You Will Build
- You will build a Python script that retrieves active, in-progress conversations using the Real-Time API.
- You will build a second Python script that queries historical conversation data using the Analytics API.
- The tutorial covers Python using the
purecloudplatformclientv2SDK and therequestslibrary for raw HTTP comparison.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Flow).
- Required Scopes:
- Real-Time:
conversation:active:read,conversation:participant:read. - Analytics:
analytics:query:read.
- Real-Time:
- SDK Version:
purecloudplatformclientv2>= 160.0.0. - Runtime: Python 3.8+.
- Dependencies:
pip install purecloudplatformclientv2 requests pyyaml
Authentication Setup
Both APIs require a valid OAuth 2.0 access token. The token acquisition process is identical for both endpoints. You must use the Client Credentials Grant flow.
import requests
import time
from typing import Optional
class GenesysAuth:
def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
self.client_id = client_id
self.client_secret = client_secret
self.environment = environment
self.token_url = f"https://login.{environment}/oauth/token"
self.access_token: Optional[str] = None
self.token_expiry: Optional[float] = None
def get_token(self) -> str:
"""
Retrieves an OAuth token. Caches it until expiry.
"""
if self.access_token and self.token_expiry and time.time() < self.token_expiry:
return self.access_token
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
response = requests.post(self.token_url, headers=headers, data=data)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
# Tokens expire in 3600 seconds; refresh 60 seconds early
self.token_expiry = time.time() + (token_data["expires_in"] - 60)
return self.access_token
def get_headers(self) -> dict:
return {
"Authorization": f"Bearer {self.get_token()}",
"Content-Type": "application/json"
}
Implementation
Step 1: Understanding the Real-Time API (/api/v2/conversations)
The /api/v2/conversations endpoint is part of the Real-Time API. It returns a snapshot of conversations that are currently active. “Active” means the conversation has not yet been terminated. This includes:
- Calls currently on the phone.
- Chats currently connected to an agent.
- Messages currently being processed in a queue.
- Screen shares currently active.
Key Characteristic: This is a live snapshot. If you poll this endpoint every second, the data will change. It is designed for dashboards, live wallboards, and real-time routing logic. It does not store historical data. Once a conversation ends, it disappears from this endpoint.
Code: Retrieving Active Conversations
We will use the official Python SDK for this example, as it handles the complex filtering parameters for real-time data.
from purecloudplatformclientv2 import (
ApiClient,
Configuration,
ConversationApi,
ConversationFilterOptions,
ConversationState
)
import purecloudplatformclientv2
def get_active_conversations(auth: GenesysAuth):
"""
Retrieves currently active conversations using the Real-Time API.
"""
# 1. Configure the SDK
configuration = Configuration()
configuration.host = f"https://api.{auth.environment}"
configuration.access_token = auth.get_token()
# 2. Initialize the API client
api_client = ApiClient(configuration)
conversation_api = ConversationApi(api_client)
try:
# 3. Define filter options
# We only want conversations that are currently 'active'
filter_options = ConversationFilterOptions(
states=[ConversationState.ACTIVE]
)
# 4. Execute the query
# Note: max_records defaults to 1000. For large deployments,
# you may need to implement pagination if you expect >1000 active chats.
response = conversation_api.post_conversations_query(filter_options=filter_options)
print(f"--- Real-Time API Result ---")
print(f"Total Active Conversations: {len(response.entities)}")
for conv in response.entities[:5]: # Show first 5
print(f"ID: {conv.id}, Type: {conv.type}, State: {conv.state}")
if conv.participants:
print(f" Participants: {len(conv.participants)}")
return response
except purecloudplatformclientv2.exceptions.ApiException as e:
print(f"Real-Time API Error: {e.status} - {e.reason}")
print(f"Response Body: {e.body}")
raise
# Usage
# auth = GenesysAuth("CLIENT_ID", "CLIENT_SECRET")
# get_active_conversations(auth)
Why use this?
- You need to know how many agents are currently busy.
- You need to trigger an action immediately when a chat enters a queue.
- You are building a live supervisor dashboard.
Step 2: Understanding the Analytics API (/api/v2/analytics/conversations)
The /api/v2/analytics/conversations endpoint is part of the Analytics API. It returns historical data based on a time range. This data is aggregated and stored in Genesys Cloud’s data warehouse.
Key Characteristic: This is a historical query. It is not real-time. There is a delay (typically 15-60 minutes, depending on your data freshness settings) before a completed conversation appears here. It is designed for reporting, compliance, quality assurance, and business intelligence.
Code: Querying Historical Conversations
The Analytics API uses a different query structure. Instead of simple filters, it uses a JSON body with groupBy, interval, and where clauses.
import json
from datetime import datetime, timedelta
import requests
def get_historical_conversations(auth: GenesysAuth):
"""
Retrieves historical conversation data using the Analytics API.
"""
base_url = f"https://api.{auth.environment}"
endpoint = "/api/v2/analytics/conversations/details/query"
# 1. Define the time range
# Analytics requires a start and end time.
# We query the last 24 hours.
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=24)
# Format as ISO 8601 with timezone (UTC)
start_iso = start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
end_iso = end_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
# 2. Construct the query body
# This query asks for all conversation details in the last 24 hours.
# It groups by 'type' (voice, chat, etc.) to show counts per type.
query_body = {
"interval": f"{start_iso}/{end_iso}",
"groupBy": ["type"],
"where": [],
"select": [
"count",
"sum(handleDuration), sum(waitDuration)"
]
}
headers = auth.get_headers()
try:
# 3. Execute the POST request
response = requests.post(
f"{base_url}{endpoint}",
headers=headers,
json=query_body
)
response.raise_for_status()
data = response.json()
print(f"--- Analytics API Result ---")
print(f"Query Interval: {data['interval']}")
print(f"Total Records Returned: {len(data['entities'])}")
for entity in data['entities']:
conv_type = entity.get('type', 'Unknown')
count = entity.get('count', 0)
handle_dur = entity.get('sum(handleDuration)', 0)
wait_dur = entity.get('sum(waitDuration)', 0)
print(f"Type: {conv_type}")
print(f" Count: {count}")
print(f" Total Handle Duration (ms): {handle_dur}")
print(f" Total Wait Duration (ms): {wait_dur}")
print("-" * 20)
return data
except requests.exceptions.HTTPError as e:
print(f"Analytics API HTTP Error: {e.response.status_code}")
print(f"Response Body: {e.response.text}")
raise
except Exception as e:
print(f"Unexpected Error: {e}")
raise
# Usage
# auth = GenesysAuth("CLIENT_ID", "CLIENT_SECRET")
# get_historical_conversations(auth)
Why use this?
- You need to calculate average handle time (AHT) for last week.
- You need to generate a PDF report of all calls handled by a specific queue.
- You are training an ML model on past conversation transcripts.
Step 3: Comparing Performance and Rate Limits
The two APIs have different performance characteristics and rate limits.
| Feature | Real-Time (/conversations) |
Analytics (/analytics/conversations) |
|---|---|---|
| Data Freshness | Live (Seconds) | Delayed (15-60 mins) |
| Rate Limit | Lower (Polling heavy) | Higher (Batch queries) |
| Response Size | Small (Current state only) | Large (Historical aggregates) |
| Use Case | Live Dashboards | Reporting/BI |
Handling Rate Limits (429 Errors)
If you poll the Real-Time API too frequently, you will hit a 429 Too Many Requests error. The Analytics API is less prone to this because queries are heavier but less frequent.
Here is a robust retry mechanism for the Analytics query, which is more likely to timeout or be throttled due to complex data aggregation.
import time
def robust_analytics_query(auth: GenesysAuth, max_retries: int = 3):
"""
Executes an analytics query with exponential backoff for 429 errors.
"""
base_url = f"https://api.{auth.environment}"
endpoint = "/api/v2/analytics/conversations/details/query"
# Simplified query for demonstration
query_body = {
"interval": "2023-01-01T00:00:00.000Z/2023-01-02T00:00:00.000Z",
"groupBy": ["type"],
"select": ["count"]
}
headers = auth.get_headers()
for attempt in range(max_retries):
try:
response = requests.post(
f"{base_url}{endpoint}",
headers=headers,
json=query_body
)
if response.status_code == 429:
# Extract Retry-After header if present
retry_after = response.headers.get("Retry-After", 5)
wait_time = float(retry_after) * (2 ** attempt) # Exponential backoff
print(f"Rate limited. Waiting {wait_time} seconds...")
time.sleep(wait_time)
continue
else:
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt == max_retries - 1:
raise
raise Exception("Max retries exceeded for Analytics Query")
Complete Working Example
Below is a unified script that demonstrates both calls. Replace CLIENT_ID and CLIENT_SECRET with your actual credentials.
import requests
import time
from datetime import datetime, timedelta
import json
from typing import Optional
# --- Configuration ---
CLIENT_ID = "YOUR_CLIENT_ID"
CLIENT_SECRET = "YOUR_CLIENT_SECRET"
ENVIRONMENT = "mypurecloud.com"
# --- Authentication Module ---
class GenesysAuth:
def __init__(self, client_id: str, client_secret: str, environment: str):
self.client_id = client_id
self.client_secret = client_secret
self.environment = environment
self.token_url = f"https://login.{environment}/oauth/token"
self.access_token: Optional[str] = None
self.token_expiry: Optional[float] = None
def get_token(self) -> str:
if self.access_token and self.token_expiry and time.time() < self.token_expiry:
return self.access_token
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
response = requests.post(self.token_url, headers=headers, data=data)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.token_expiry = time.time() + (token_data["expires_in"] - 60)
return self.access_token
def get_headers(self) -> dict:
return {
"Authorization": f"Bearer {self.get_token()}",
"Content-Type": "application/json"
}
# --- Real-Time API Function ---
def fetch_realtime_conversations(auth: GenesysAuth):
print("\n=== Fetching Real-Time Conversations ===")
# For Real-Time, we often use raw HTTP if we want to avoid SDK overhead for simple polls
# Endpoint: POST /api/v2/conversations/query
url = f"https://api.{auth.environment}/api/v2/conversations/query"
# Filter for active conversations
body = {
"states": ["active"],
"maxRecords": 100
}
try:
response = requests.post(url, headers=auth.get_headers(), json=body)
response.raise_for_status()
data = response.json()
print(f"Found {len(data.get('entities', []))} active conversations.")
for conv in data.get('entities', [])[:3]:
print(f" - ID: {conv['id']}, Type: {conv['type']}, State: {conv['state']}")
except requests.exceptions.HTTPError as e:
print(f"Error: {e.response.status_code} - {e.response.text}")
# --- Analytics API Function ---
def fetch_analytics_conversations(auth: GenesysAuth):
print("\n=== Fetching Historical Analytics Conversations ===")
url = f"https://api.{auth.environment}/api/v2/analytics/conversations/details/query"
# Define last 24 hours
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=24)
body = {
"interval": f"{start_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')}/{end_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')}",
"groupBy": ["type"],
"select": ["count", "sum(handleDuration)"]
}
try:
response = requests.post(url, headers=auth.get_headers(), json=body)
response.raise_for_status()
data = response.json()
print(f"Query Interval: {data['interval']}")
print(f"Total Entities in Report: {len(data.get('entities', []))}")
for entity in data.get('entities', []):
conv_type = entity.get('type', 'N/A')
count = entity.get('count', 0)
handle_dur = entity.get('sum(handleDuration)', 0)
print(f" - Type: {conv_type}, Count: {count}, Total Handle (ms): {handle_dur}")
except requests.exceptions.HTTPError as e:
print(f"Error: {e.response.status_code} - {e.response.text}")
# --- Main Execution ---
if __name__ == "__main__":
auth = GenesysAuth(CLIENT_ID, CLIENT_SECRET, ENVIRONMENT)
# 1. Get Real-Time Data
fetch_realtime_conversations(auth)
# 2. Get Historical Data
fetch_analytics_conversations(auth)
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token has expired or is invalid.
- Fix: Ensure your
GenesysAuthclass checks thetoken_expirybefore every request. The token expires in 3600 seconds. If your script runs longer than an hour, it must refresh the token.
Error: 403 Forbidden
- Cause: The OAuth client does not have the required scopes.
- Fix:
- For Real-Time: Add
conversation:active:readto your client’s scopes in the Genesys Admin console. - For Analytics: Add
analytics:query:readto your client’s scopes. - After changing scopes, you must generate a new token.
- For Real-Time: Add
Error: 400 Bad Request (Analytics)
- Cause: The
intervalformat is incorrect or theselectfields are invalid. - Fix: Ensure the interval is in ISO 8601 format (
YYYY-MM-DDTHH:mm:ss.sssZ). Verify that the fields inselectare valid for thegroupBydimensions. You cannot select a metric that is not compatible with the group by dimension.
Error: Empty Results from Real-Time API
- Cause: There are no active conversations at the moment of the poll.
- Fix: This is normal behavior. The Real-Time API returns an empty
entitiesarray if no conversations match the filter. It does not return historical data.