Choosing Between Real-Time Conversations and Historical Analytics APIs in Genesys Cloud CX
What You Will Build
- You will build a Python service that retrieves active conversation details using the Real-Time API and historical interaction data using the Analytics API.
- You will use the Genesys Cloud Python SDK (genesys-cloud-sdk) and raw requests to demonstrate the distinct data models and query patterns.
- You will implement a hybrid workflow that correlates a live conversation ID with its historical context to demonstrate when each API is appropriate.
Prerequisites
- Genesys Cloud Org: An active Genesys Cloud organization with API access.
- OAuth Client: A Confidential Client (Client Credentials Grant) with the following scopes:
  - conversation:view (for /api/v2/conversations)
  - analytics:query (for /api/v2/analytics/conversations/details/query)
  - conversation:read (optional, for deeper metadata)
- Python Environment: Python 3.9+ installed.
- Dependencies:
  - genesys-cloud-sdk (latest version)
  - requests
  - pydantic (for data validation in examples)
Install the dependencies:
pip install genesys-cloud-sdk requests pydantic
Authentication Setup
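Genesys Cloud uses OAuth 2.0 for all API access. For server-to-server integrations, the Client Credentials Grant is the standard flow. The token is short-lived (this tutorial assumes 59 minutes; the expires_in value in the token response is authoritative), so your application must refresh or regenerate it before it expires.
Below is an authentication helper built on genesys-cloud-sdk. The SDK caches and refreshes the token automatically when configured correctly; here we initialize the client explicitly so that each step is visible. The package name and import paths shown follow this tutorial; the SDK that Genesys publishes on PyPI is named PureCloudPlatformClientV2, so adjust the imports and client construction if that is the package you install.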
import os
from genesyscloud.platform.client import PlatformClientBuilder


def get_genesis_client():
    """
    Initializes and returns a Genesys Cloud Platform Client.
    Reads credentials from environment variables so they stay out of source code.
    """
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment.")

    try:
        # The SDK automatically handles OAuth token acquisition and refresh
        builder = PlatformClientBuilder()
        client = builder.build(
            environment=environment,
            client_id=client_id,
            client_secret=client_secret
        )
        return client
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback
        raise RuntimeError(f"Failed to initialize Genesys Client: {e}") from e
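When an endpoint is called with raw requests rather than through the SDK, the token has to be renewed before it expires. The following is a minimal sketch of that bookkeeping, not part of any SDK: the class name CachedToken, the fetch_token callable, and the 60-second safety margin are assumptions for illustration. fetch_token is expected to return the access token together with its lifetime in seconds, as reported by the expires_in field of a standard OAuth 2.0 token response.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Caches an access token and renews it shortly before it expires.

    Hypothetical helper: fetch_token is any callable that returns
    (access_token, expires_in_seconds), for example a function that posts
    to the OAuth token endpoint and reads the JSON response.
    """

    def __init__(
        self,
        fetch_token: Callable[[], Tuple[str, float]],
        safety_margin: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch_token = fetch_token
        self._safety_margin = safety_margin
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Returns the cached token, fetching a new one when it is near expiry."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._safety_margin:
            token, expires_in = self._fetch_token()
            self._token = token
            self._expires_at = now + expires_in
        return self._token
```

The clock parameter exists only so that expiry can be exercised in tests without waiting; production code can leave it at its default.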
Implementation
Step 1: Understanding the Data Models
The fundamental difference between the two APIs lies in their temporal scope and data granularity.
- /api/v2/conversations (Real-Time API):
  - Scope: Current, active interactions only.
  - Data Model: Returns the full conversation object, including participants, channels, and real-time state (e.g., active, queued, closed).
  - Use Case: Live dashboards, real-time agent assist, immediate routing decisions, or fetching metadata for a conversation that is happening right now.
  - Limitation: Data is ephemeral. This endpoint reflects live state only; once a conversation closes, it may disappear from the results, depending on retention policies.
- /api/v2/analytics/conversations/details/query (Analytics API):
  - Scope: Historical data, typically up to 30 days or more depending on your org’s storage tier.
  - Data Model: Returns aggregated or detailed metrics about interactions (duration, wrap-up time, queue wait time). It does not return the full conversation transcript, and its participant data is not structured the same way as in the Real-Time API.
  - Use Case: Reporting, SLA monitoring, historical trend analysis, and auditing past performance.
  - Limitation: Not suitable for tracking real-time state changes. There is a delay (usually 1-5 minutes) between an event occurring and it appearing in Analytics.
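The trade-offs above can be condensed into a small decision helper. This is only a sketch of the reasoning: the DataNeed type, the choose_api function, and the 300-second latency constant (taken from the upper end of the 1-5 minute figure above) are assumptions for illustration, not part of any Genesys API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataNeed:
    """Describes what a caller needs from conversation data (hypothetical type)."""
    needs_live_state: bool        # e.g. current participant or queue state
    max_staleness_seconds: float  # how old the data is allowed to be
    lookback_seconds: float       # how far into the past the question reaches


# Assumed upper bound on Analytics ingestion delay; tune it for your org.
ANALYTICS_LATENCY_SECONDS = 300.0


def choose_api(need: DataNeed) -> str:
    """Returns "real-time", "analytics", or "both" for the given need."""
    wants_history = need.lookback_seconds > 0
    too_fresh_for_analytics = need.max_staleness_seconds < ANALYTICS_LATENCY_SECONDS
    if need.needs_live_state or too_fresh_for_analytics:
        # Live state can only come from the Real-Time API; add Analytics
        # when the caller also wants historical context.
        return "both" if wants_history else "real-time"
    return "analytics"
```

A live dashboard tile maps to "real-time", a weekly SLA report maps to "analytics", and the hybrid workflow built later in this tutorial is the "both" case.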
Step 2: Fetching Active Conversations (Real-Time API)
We will use the ConversationsApi from the SDK to fetch active conversations with a single GET request. The example reads only the first page of results (up to 25 conversations).
Required Scope: conversation:view
from genesyscloud.conversations.api import ConversationsApi
from genesyscloud.api.exceptions import ApiError


def get_active_conversations(client):
    """
    Fetches the first page of currently active conversations.
    """
    conversations_api = ConversationsApi(client)
    try:
        # The SDK handles pagination automatically if you use the iterator,
        # but for a simple list, we fetch only the first page.
        response = conversations_api.get_conversations(
            expand=["participants"],  # Include participant details
            page_size=25
        )
        if response.body and response.body.conversations:
            return response.body.conversations
        return []
    except ApiError as e:
        print(f"Error fetching conversations: {e.status_code} - {e.reason}")
        raise
Expected Response Snippet:
{
  "conversations": [
    {
      "id": "123e4567-e89b-12d3-a456-426614174000",
      "type": "voice",
      "state": "active",
      "startTime": "2023-10-27T10:00:00.000Z",
      "participants": [
        {
          "id": "agent-uuid",
          "type": "agent",
          "state": "connected"
        }
      ]
    }
  ],
  "pageSize": 25,
  "pageCount": 1
}
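A payload shaped like the snippet above can be reduced to the few fields a dashboard needs. The sketch below works on the raw JSON dictionary rather than on SDK objects; the LiveConversation type and both function names are hypothetical, and the field names are assumed to match the snippet.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class LiveConversation:
    """Minimal view of one conversation from the snippet above (hypothetical type)."""
    id: str
    type: str
    state: str
    connected_agents: int


def parse_live_conversations(payload: Dict[str, Any]) -> List[LiveConversation]:
    """Extracts the fields shown above, tolerating missing keys."""
    result = []
    for conv in payload.get("conversations") or []:
        participants = conv.get("participants") or []
        # Count only participants that are agents and currently connected.
        agents = sum(
            1 for p in participants
            if p.get("type") == "agent" and p.get("state") == "connected"
        )
        result.append(
            LiveConversation(
                id=conv.get("id", ""),
                type=conv.get("type", "unknown"),
                state=conv.get("state", "unknown"),
                connected_agents=agents,
            )
        )
    return result


def count_by_state(conversations: List[LiveConversation]) -> Dict[str, int]:
    """Counts conversations per state, e.g. for a live dashboard tile."""
    return dict(Counter(c.state for c in conversations))
```

Reading keys with get and falling back to defaults keeps the parser from failing when a conversation arrives without participants.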
Step 3: Querying Historical Data (Analytics API)
The Analytics API is significantly more complex. It uses a query body to filter and aggregate data. We will query for detailed conversation metrics from the last 24 hours.
Required Scope: analytics:query
import requests
from datetime import datetime, timedelta, timezone


def get_historical_conversations(client, days_back: int = 1):
    """
    Queries the Analytics API for conversation details over a specific time range.
    Uses raw requests to demonstrate the complex query body structure.
    """
    # Get the current access token from the client
    # Note: In production, use the SDK's analytics API if available,
    # but raw requests offer clearer visibility into the query structure.
    token = client.get_access_token()
    base_url = client.get_base_url()
    endpoint = "/api/v2/analytics/conversations/details/query"

    # Calculate the time range in UTC so the timestamps carry timezone info
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=days_back)

    # Format times in ISO 8601 (isoformat() emits an explicit +00:00 offset)
    start_time_str = start_time.isoformat()
    end_time_str = end_time.isoformat()

    # The Query Body
    # This structure is critical. 'metrics' defines what data you want back.
    # 'groupings' allows you to aggregate data (e.g., by queue or agent).
    query_body = {
        "interval": "PT1H",  # Aggregate by 1-hour intervals
        "dateRange": {
            "startDate": start_time_str,
            "endDate": end_time_str
        },
        "metrics": [
            "conversation.count",
            "conversation.duration",
            "queue.waittime",
            "agent.wrapuptime"
        ],
        "groupings": [
            {
                "name": "queue",
                "type": "queue"
            }
        ],
        "filter": {
            "type": "conversation",
            "typeFilter": {
                "types": ["voice", "chat"]
            }
        }
    }

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    try:
        response = requests.post(
            f"{base_url}{endpoint}",
            headers=headers,
            json=query_body,
            timeout=30  # Avoid hanging indefinitely on a stalled connection
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Analytics API Error: {e}")
        raise
Key Parameters Explained:
- interval: Defines the time bucket for aggregation. Use PT1H for hourly, P1D for daily.
- metrics: The specific data points you want. conversation.count is the number of interactions. conversation.duration is the total talk time.
- groupings: Allows you to slice the data. Here, we group by queue. If you remove groupings, you get a single aggregated row for the entire time range.
- filter: Restricts the data to specific conversation types (voice, chat, webchat, etc.).
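The parameters above can be assembled by a small builder that also rejects the two mistakes most likely to produce an empty result: naive timestamps and a zero-length range. This is a sketch only. The function name build_query_body is hypothetical, and the field names simply mirror the query body used in this tutorial rather than being independently checked against the API reference.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional


def build_query_body(
    start: datetime,
    end: datetime,
    interval: str = "PT1H",
    metrics: Optional[List[str]] = None,
    conversation_types: Optional[List[str]] = None,
    group_by_queue: bool = True,
) -> Dict[str, Any]:
    """Builds a query body in the shape used in this tutorial."""
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("start and end must be timezone-aware datetimes")
    if start >= end:
        raise ValueError("start must be earlier than end")

    body: Dict[str, Any] = {
        "interval": interval,
        "dateRange": {
            # Normalize to UTC so both timestamps carry the same offset.
            "startDate": start.astimezone(timezone.utc).isoformat(),
            "endDate": end.astimezone(timezone.utc).isoformat(),
        },
        "metrics": list(metrics or ["conversation.count", "queue.waittime"]),
        "filter": {
            "type": "conversation",
            "typeFilter": {"types": list(conversation_types or ["voice", "chat"])},
        },
    }
    # Omitting groupings yields a single aggregated row for the whole range.
    if group_by_queue:
        body["groupings"] = [{"name": "queue", "type": "queue"}]
    return body


# Example: the last 24 hours, bucketed hourly and grouped by queue.
now = datetime.now(timezone.utc)
example_body = build_query_body(now - timedelta(hours=24), now)
```

Failing early in the builder is cheaper than sending a request that can only return an empty result.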
Step 4: Correlating Live and Historical Data
A common pattern is to fetch a live conversation and then look up its historical context, or to verify that a conversation that was active has since reached the analytics store.
Note, however, that the details query is built around time ranges and filters rather than single-ID lookups, so it is a poor fit for retrieving one specific conversation. For a specific conversation, use /api/v2/conversations/{id} while it is still within the short-term retention window, or the single-conversation analytics endpoint /api/v2/analytics/conversations/{conversationId}/details once the conversation has been ingested. Use the query endpoint for aggregated trends.
For this tutorial, we will demonstrate a hybrid approach:
- Fetch active conversations.
- For each active conversation, extract the Queue ID.
- Query Analytics to see the average wait time for that Queue over the last 24 hours.
- Display this context alongside the live conversation.
def correlate_live_and_history(client):
    """
    Demonstrates a hybrid workflow:
    1. Get live conversations.
    2. Extract Queue IDs.
    3. Query Analytics for recent queue performance.
    """
    # Step 1: Get Active Conversations
    active_convs = get_active_conversations(client)
    if not active_convs:
        print("No active conversations found.")
        return

    # Collect unique queue IDs from active conversations
    queue_ids = set()
    for conv in active_convs:
        # Participants might have queue info, but often we need to fetch
        # the conversation details to get the routing context.
        # For simplicity, assume we can derive queue from participant or
        # we query all queues if unknown.
        # In a real app, you might fetch /api/v2/conversations/{id} to get full routing details.
        pass

    # Step 2: Query Analytics for Queue Performance
    # We query all queues for the last 24 hours. days_back must be at least 1:
    # a value of 0 would make the start and end of the range identical.
    analytics_data = get_historical_conversations(client, days_back=1)

    # Process Analytics Data
    queue_metrics = {}
    if 'data' in analytics_data:
        for row in analytics_data['data']:
            # The structure depends on groupings.
            # If grouped by queue, each row has a 'queue' object.
            if 'queue' in row and row['queue']:
                queue_name = row['queue']['name']
                queue_id = row['queue']['id']
                metrics = row.get('metrics', {})
                wait_time = metrics.get('queue.waittime', 0)
                count = metrics.get('conversation.count', 0)
                queue_metrics[queue_id] = {
                    'name': queue_name,
                    'avg_wait_time': wait_time,
                    'total_conversations': count
                }

    # Step 3: Output Correlated Data
    print(f"{'Conversation ID':<40} | {'Type':<10} | {'Queue Avg Wait (Sec)':<20}")
    print("-" * 80)
    for conv in active_convs:
        conv_id = conv.id
        conv_type = conv.type
        # In a real scenario, you would need to map the conversation to a queue.
        # This requires fetching the conversation details or checking participant routing context.
        # For this example, we will print the conversation and note that queue mapping
        # requires an additional API call to /api/v2/conversations/{id}
        print(f"{conv_id:<40} | {conv_type:<10} | {'(See Note)':<20}")

    print("\nQueue Performance (Last 24 Hours):")
    for q_id, metrics in queue_metrics.items():
        print(f"Queue: {metrics['name']} | Avg Wait: {metrics['avg_wait_time']}s | Count: {metrics['total_conversations']}")
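Once each live conversation has been mapped to a queue ID, the correlation itself is a plain dictionary join. The sketch below assumes that mapping already exists: conversation_to_queue is a hypothetical dictionary your application would build from routing details, and queue_metrics uses the same keys as the dictionary assembled in the function above.

```python
from typing import Any, Dict, List, Optional


def attach_queue_context(
    conversation_ids: List[str],
    conversation_to_queue: Dict[str, str],
    queue_metrics: Dict[str, Dict[str, Any]],
) -> List[Dict[str, Optional[Any]]]:
    """Joins live conversation IDs with per-queue metrics.

    conversation_to_queue is a hypothetical mapping from conversation ID to
    queue ID, built from whatever routing details your application fetches.
    """
    rows = []
    for conv_id in conversation_ids:
        queue_id = conversation_to_queue.get(conv_id)
        # A conversation may have no known queue, or a queue with no recent data.
        metrics = queue_metrics.get(queue_id) if queue_id else None
        rows.append(
            {
                "conversation_id": conv_id,
                "queue_id": queue_id,
                "queue_name": metrics["name"] if metrics else None,
                "avg_wait_time": metrics["avg_wait_time"] if metrics else None,
            }
        )
    return rows
```

Conversations without a known queue stay in the output with empty context, so a dashboard can still list them instead of silently dropping them.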
Complete Working Example
The following script combines authentication, real-time fetching, and analytics querying into a single executable module.
import os
import requests
from datetime import datetime, timedelta, timezone
from genesyscloud.platform.client import PlatformClientBuilder
from genesyscloud.conversations.api import ConversationsApi
from genesyscloud.api.exceptions import ApiError


def get_genesis_client():
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment.")

    try:
        builder = PlatformClientBuilder()
        client = builder.build(
            environment=environment,
            client_id=client_id,
            client_secret=client_secret
        )
        return client
    except Exception as e:
        raise RuntimeError(f"Failed to initialize Genesys Client: {e}") from e


def get_active_conversations(client):
    conversations_api = ConversationsApi(client)
    try:
        # Only the first page (up to 25 conversations) is read here
        response = conversations_api.get_conversations(expand=["participants"], page_size=25)
        if response.body and response.body.conversations:
            return response.body.conversations
        return []
    except ApiError as e:
        print(f"Error fetching conversations: {e.status_code} - {e.reason}")
        return []


def get_queue_analytics(client, queue_id=None):
    token = client.get_access_token()
    base_url = client.get_base_url()
    endpoint = "/api/v2/analytics/conversations/details/query"

    # Query window: the last hour, in UTC
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=1)

    query_body = {
        "interval": "PT1H",  # One hourly bucket, matching the one-hour window
        "dateRange": {
            "startDate": start_time.isoformat(),
            "endDate": end_time.isoformat()
        },
        "metrics": [
            "queue.waittime",
            "conversation.count"
        ],
        "groupings": [
            {
                "name": "queue",
                "type": "queue"
            }
        ],
        "filter": {
            "type": "conversation",
            "typeFilter": {
                "types": ["voice"]
            }
        }
    }

    # If a specific queue ID is provided, add it to the filter
    if queue_id:
        query_body["filter"]["queueFilter"] = {
            "ids": [queue_id]
        }

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    try:
        response = requests.post(f"{base_url}{endpoint}", headers=headers, json=query_body, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Analytics API Error: {e}")
        return {}


def main():
    print("Initializing Genesys Client...")
    client = get_genesis_client()

    print("Fetching active conversations...")
    active_convs = get_active_conversations(client)
    if not active_convs:
        print("No active conversations found.")
        return
    print(f"Found {len(active_convs)} active conversation(s).")

    # Fetch analytics for all queues
    print("Fetching analytics data for the last hour...")
    analytics_data = get_queue_analytics(client)

    # Map queue IDs to metrics
    queue_metrics = {}
    if 'data' in analytics_data:
        for row in analytics_data['data']:
            if 'queue' in row and row['queue']:
                q_id = row['queue']['id']
                q_name = row['queue']['name']
                metrics = row.get('metrics', {})
                wait_time = metrics.get('queue.waittime', 0)
                count = metrics.get('conversation.count', 0)
                queue_metrics[q_id] = {
                    'name': q_name,
                    'wait_time': wait_time,
                    'count': count
                }

    print("\n--- Live Conversation Status ---")
    for conv in active_convs:
        print(f"ID: {conv.id}")
        print(f"Type: {conv.type}")
        print(f"State: {conv.state}")
        print(f"Start Time: {conv.startTime}")
        # Note: To link this conversation to a specific queue, you would need
        # to inspect the participants or routing details.
        # This example assumes you have that context from a deeper fetch.
        print("-" * 40)

    print("\n--- Queue Performance Summary ---")
    for q_id, metrics in queue_metrics.items():
        print(f"Queue: {metrics['name']} (ID: {q_id})")
        print(f"  Total Conversations: {metrics['count']}")
        print(f"  Avg Wait Time: {metrics['wait_time']} seconds")
        print("-" * 40)


if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token is expired or invalid.
- Fix: Ensure your GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET are correct. The SDK handles refresh, but if you are using raw requests, you must regenerate the token before it expires (after 59 minutes, per the setup above). Check the expires_in value in the token response.
Error: 403 Forbidden
- Cause: The OAuth client lacks the required scope.
- Fix:
  - For /api/v2/conversations, ensure conversation:view is added to the client’s scopes.
  - For /api/v2/analytics/conversations/details/query, ensure analytics:query is added.
  - Go to Admin > Security > OAuth Clients, select your client, and edit the Scopes.
Error: 429 Too Many Requests
- Cause: You have exceeded the rate limit for the API endpoint.
- Fix: Implement exponential backoff. The Analytics API has stricter rate limits than the Real-Time API. Do not poll the Analytics API in a tight loop. Cache analytics results if possible.
import time
import requests


def safe_analytics_request(func, *args, max_retries=3):
    # Retries func on HTTP 429 with exponential backoff: 1s, 2s, 4s, ...
    for attempt in range(max_retries):
        try:
            return func(*args)
        except requests.exceptions.HTTPError as e:
            if e.response is not None and e.response.status_code == 429:
                wait_time = 2 ** attempt
                print(f"Rate limited. Waiting {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                # Any other HTTP error is not retried
                raise
    raise RuntimeError("Max retries exceeded for Analytics API")
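The advice to cache analytics results can be sketched with a small time-based cache. This is a generic pattern rather than anything provided by Genesys: the TTLCache class and its get_or_fetch method are hypothetical names, and a suitable time-to-live depends on how fresh your reports need to be.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Caches results per key for a fixed number of seconds (hypothetical helper)."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        # Maps each key to (time stored, value).
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get_or_fetch(self, key: Hashable, fetch: Callable[[], Any]) -> Any:
        """Returns the cached value for key, calling fetch only when it is stale."""
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        value = fetch()
        self._entries[key] = (now, value)
        return value
```

A caller could key the cache by queue ID and pass a lambda that wraps the analytics request, so repeated dashboard refreshes within the time-to-live reuse one response instead of issuing a new query each time.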
Error: Empty Analytics Data
- Cause: The time range is incorrect, or no data exists for the filtered criteria.
- Fix: Verify that the startDate and endDate are in ISO 8601 format with timezone info (e.g., 2023-10-27T10:00:00Z). Ensure the filter types match the conversations in your org (e.g., if you filter for voice but only have chat data, you will get an empty result).
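The timestamp checks described above can be automated before a query is sent. The following sketch uses only the standard library; check_date_range is a hypothetical helper name, and the extra check that the start precedes the end is an assumption about another likely cause of empty results.

```python
from datetime import datetime
from typing import List


def check_date_range(start_date: str, end_date: str) -> List[str]:
    """Returns a list of problems found in an ISO 8601 date range; empty means none."""
    problems = []
    parsed = []
    for label, value in (("startDate", start_date), ("endDate", end_date)):
        try:
            # fromisoformat() rejects a trailing "Z" before Python 3.11,
            # so normalize it to an explicit UTC offset first.
            moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            problems.append(f"{label} is not valid ISO 8601: {value!r}")
            continue
        if moment.tzinfo is None:
            problems.append(f"{label} has no timezone information: {value!r}")
            continue
        parsed.append(moment)
    # Compare only when both values parsed and carry timezone info.
    if len(parsed) == 2 and parsed[0] >= parsed[1]:
        problems.append("startDate must be earlier than endDate")
    return problems
```

Running this on the dateRange values of a query body before posting it separates formatting mistakes from the case where the org simply has no matching data.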