Querying Live Conversations vs. Analytics Data in Genesys Cloud
What You Will Build
- This tutorial demonstrates how to retrieve real-time conversation details using the Conversations API and historical conversation metrics using the Analytics API.
- The code uses the Genesys Cloud Platform REST API v2 and the Python
genesyscloudSDK. - The programming language covered is Python 3.9+.
Prerequisites
- OAuth Client: A Genesys Cloud OAuth client with the following scopes:
- For Conversations API:
conversation:read,conversation:view - For Analytics API:
analytics:read,analytics:query
- For Conversations API:
- SDK Version:
genesyscloudPython SDK (version 150.0.0 or later). - Language/Runtime: Python 3.9 or higher.
- External Dependencies:
pip install genesyscloudpip install python-dotenv(for secure credential management)
Authentication Setup
Genesys Cloud uses OAuth 2.0 for authentication. The Python SDK handles token acquisition and refresh automatically when initialized correctly. You must store your client credentials securely. This example uses a .env file.
Create a .env file in your project root:
GENESYS_CLIENT_ID=your_client_id
GENESYS_CLIENT_SECRET=your_client_secret
GENESYS_REGION=us-east-1
Initialize the SDK in your script:
import os
from dotenv import load_dotenv
from purecloudplatformclientv2 import Configuration, ApiClient, PureCloudAuthFlow
# Load environment variables
load_dotenv()
def get_api_client():
"""
Initializes and returns a configured PureCloudPlatformClientV2 ApiClient.
"""
# Create configuration with OAuth flow
config = Configuration(
host=f"https://api.{os.getenv('GENESYS_REGION')}.mypurecloud.com",
oauth_client_id=os.getenv('GENESYS_CLIENT_ID'),
oauth_client_secret=os.getenv('GENESYS_CLIENT_SECRET')
)
# Configure authentication
config.access_token = PureCloudAuthFlow(
client_id=config.oauth_client_id,
client_secret=config.oauth_client_secret
)
# Create API client
api_client = ApiClient(configuration=config)
return api_client
# Initialize client
api_client = get_api_client()
Implementation
Step 1: Understanding the Core Distinction
The /api/v2/conversations endpoint returns live, active interaction objects. It is stateful, real-time, and designed for operational systems (e.g., CTI softphones, real-time dashboards, IVR state management). It does not aggregate data. It tells you “what is happening right now.”
The /api/v2/analytics/conversations endpoint returns historical, aggregated metrics. It is stateless, batch-oriented, and designed for reporting, BI, and performance analysis. It tells you “what happened over a period of time.”
Key Difference:
- Conversations API: Returns individual conversation entities with current status, participants, and media.
- Analytics API: Returns time-series buckets of aggregated statistics (handle time, wait time, count) across many conversations.
Step 2: Retrieving Live Conversations (Operational Use Case)
Use the Conversations API when you need to interact with an active call, chat, or task. For example, an agent assist tool that displays customer context during a live call.
Endpoint: GET /api/v2/conversations
Required Scopes: conversation:read, conversation:view
from purecloudplatformclientv2 import ConversationApi, ConversationType
def get_live_conversations():
"""
Retrieves all active conversations for the authenticated user/organization.
"""
conversation_api = ConversationApi(api_client)
try:
# Query parameters
# expand: Include additional details like participants and media
# type: Filter by conversation type (CALL, CHAT, TASK, etc.)
response = conversation_api.get_conversations(
expand=['participants', 'media'],
type=ConversationType.CALL # Optional: filter by type
)
print(f"Found {len(response.entities)} active conversations.")
for conv in response.entities:
print(f"Conversation ID: {conv.id}")
print(f"Status: {conv.state}")
print(f"Type: {conv.type}")
# Access participant details
if conv.participants:
for participant in conv.participants:
print(f" Participant: {participant.name} (Role: {participant.role})")
# Access media details
if conv.media and conv.media.call:
print(f" Call Direction: {conv.media.call.direction}")
print(f" Call Duration: {conv.media.call.duration}s")
except Exception as e:
print(f"Error retrieving conversations: {e}")
raise
# Execute
get_live_conversations()
Expected Response Structure (Simplified):
{
"entities": [
{
"id": "12345678-1234-1234-1234-123456789012",
"state": "connected",
"type": "call",
"participants": [
{
"id": "agent-123",
"name": "John Doe",
"role": "agent"
}
],
"media": {
"call": {
"direction": "inbound",
"duration": 120
}
}
}
]
}
Step 3: Querying Historical Analytics (Reporting Use Case)
Use the Analytics API when you need to calculate average handle time, total call volume, or agent utilization over a date range. This endpoint does not return individual conversation IDs by default; it returns aggregated metrics.
Endpoint: POST /api/v2/analytics/conversations/details/query
Required Scopes: analytics:read, analytics:query
from purecloudplatformclientv2 import AnalyticsApi, ConversationDetailsQueryRequest
from datetime import datetime, timedelta
def query_call_analytics():
"""
Queries aggregated call metrics for the last 7 days.
"""
analytics_api = AnalyticsApi(api_client)
# Define time range
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=7)
# Construct the query body
# This is a complex object; ensure all required fields are present
query_body = ConversationDetailsQueryRequest(
interval=f"{start_date.isoformat()}Z/{end_date.isoformat()}Z",
group_by=['user'], # Aggregate by agent
metric_ids=['callHandleTime', 'callCount', 'callWaitTime'],
filters=[
{
"field": "conversationType",
"operator": "eq",
"values": ["call"]
}
],
size=100 # Pagination size
)
try:
response = analytics_api.post_analytics_conversations_details_query(
body=query_body
)
print(f"Query ID: {response.id}")
print(f"Total Results: {response.total}")
# Process buckets (time intervals)
if response.buckets:
for bucket in response.buckets:
print(f"Time Bucket: {bucket.interval}")
# Process entities (aggregated groups, e.g., per agent)
if bucket.entities:
for entity in bucket.entities:
print(f" Entity ID: {entity.entity_id}")
print(f" Entity Name: {entity.entity_name}")
# Extract metrics
for metric in entity.metrics:
if metric.metric_id == 'callCount':
print(f" Call Count: {metric.total}")
elif metric.metric_id == 'callHandleTime':
print(f" Total Handle Time: {metric.total}s")
except Exception as e:
print(f"Error querying analytics: {e}")
raise
# Execute
query_call_analytics()
Expected Response Structure (Simplified):
{
"id": "query-12345",
"total": 1,
"buckets": [
{
"interval": "2023-10-01T00:00:00Z/2023-10-08T00:00:00Z",
"entities": [
{
"entity_id": "agent-123",
"entity_name": "John Doe",
"metrics": [
{
"metric_id": "callCount",
"total": 150
},
{
"metric_id": "callHandleTime",
"total": 45000
}
]
}
]
}
]
}
Step 4: Handling Pagination and Rate Limits
Both APIs support pagination. The Conversations API uses pageSize and pageToken. The Analytics API uses size and continuationToken. Always implement retry logic for 429 Too Many Requests errors.
import time
from purecloudplatformclientv2.rest import ApiException
def fetch_all_live_conversations():
"""
Fetches all live conversations with pagination.
"""
conversation_api = ConversationApi(api_client)
all_conversations = []
page_token = None
while True:
try:
response = conversation_api.get_conversations(
expand=['participants'],
page_token=page_token,
page_size=50
)
all_conversations.extend(response.entities)
# Check for next page
if response.next_page_token:
page_token = response.next_page_token
time.sleep(1) # Rate limit protection
else:
break
except ApiException as e:
if e.status == 429:
# Retry after delay
retry_after = int(e.headers.get('Retry-After', 5))
print(f"Rate limited. Waiting {retry_after} seconds.")
time.sleep(retry_after)
continue
else:
raise e
return all_conversations
def fetch_all_analytics_data(query_body):
"""
Fetches all analytics data with pagination.
"""
analytics_api = AnalyticsApi(api_client)
all_entities = []
continuation_token = None
while True:
try:
# Update query body with continuation token if present
if continuation_token:
query_body.continuation_token = continuation_token
response = analytics_api.post_analytics_conversations_details_query(
body=query_body
)
# Collect entities from all buckets
for bucket in response.buckets:
if bucket.entities:
all_entities.extend(bucket.entities)
# Check for next page
if response.continuation_token:
continuation_token = response.continuation_token
time.sleep(1) # Rate limit protection
else:
break
except ApiException as e:
if e.status == 429:
retry_after = int(e.headers.get('Retry-After', 5))
print(f"Rate limited. Waiting {retry_after} seconds.")
time.sleep(retry_after)
continue
else:
raise e
return all_entities
Complete Working Example
This script demonstrates both APIs in a single workflow. It first checks for live calls, then retrieves historical analytics for the same agent.
import os
from dotenv import load_dotenv
from purecloudplatformclientv2 import Configuration, ApiClient, PureCloudAuthFlow, ConversationApi, AnalyticsApi, ConversationDetailsQueryRequest, ConversationType
from datetime import datetime, timedelta
import time
from purecloudplatformclientv2.rest import ApiException
load_dotenv()
def get_api_client():
config = Configuration(
host=f"https://api.{os.getenv('GENESYS_REGION')}.mypurecloud.com",
oauth_client_id=os.getenv('GENESYS_CLIENT_ID'),
oauth_client_secret=os.getenv('GENESYS_CLIENT_SECRET')
)
config.access_token = PureCloudAuthFlow(
client_id=config.oauth_client_id,
client_secret=config.oauth_client_secret
)
return ApiClient(configuration=config)
def get_live_calls(api_client):
conversation_api = ConversationApi(api_client)
try:
response = conversation_api.get_conversations(
expand=['participants', 'media'],
type=ConversationType.CALL
)
return response.entities
except ApiException as e:
print(f"Error getting live calls: {e}")
return []
def get_agent_analytics(api_client, agent_id):
analytics_api = AnalyticsApi(api_client)
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=7)
query_body = ConversationDetailsQueryRequest(
interval=f"{start_date.isoformat()}Z/{end_date.isoformat()}Z",
group_by=['user'],
metric_ids=['callCount', 'callHandleTime'],
filters=[
{
"field": "conversationType",
"operator": "eq",
"values": ["call"]
},
{
"field": "userId",
"operator": "eq",
"values": [agent_id]
}
],
size=100
)
try:
response = analytics_api.post_analytics_conversations_details_query(body=query_body)
return response
except ApiException as e:
print(f"Error getting analytics: {e}")
return None
def main():
api_client = get_api_client()
# 1. Get Live Conversations
print("=== Live Conversations ===")
live_convs = get_live_calls(api_client)
if live_convs:
for conv in live_convs:
print(f"ID: {conv.id}, State: {conv.state}")
else:
print("No active calls.")
# 2. Get Analytics for a specific agent (example)
print("\n=== Analytics for Agent ===")
# Replace with a real agent ID
agent_id = "your-agent-id-here"
if agent_id != "your-agent-id-here":
analytics = get_agent_analytics(api_client, agent_id)
if analytics and analytics.buckets:
for bucket in analytics.buckets:
for entity in bucket.entities:
print(f"Agent: {entity.entity_name}")
for metric in entity.metrics:
print(f" {metric.metric_id}: {metric.total}")
else:
print("Skipping analytics (no agent ID provided).")
if __name__ == "__main__":
main()
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: Invalid OAuth credentials or expired token.
- Fix: Verify
GENESYS_CLIENT_IDandGENESYS_CLIENT_SECRETin.env. Ensure the OAuth client is active and has the correct scopes. The SDK should refresh tokens automatically, but if the client secret is wrong, initialization fails.
Error: 403 Forbidden
- Cause: Missing OAuth scopes or insufficient user permissions.
- Fix:
- For Conversations API: Add
conversation:readandconversation:viewto the OAuth client. - For Analytics API: Add
analytics:readandanalytics:queryto the OAuth client. - Ensure the user associated with the OAuth client has access to the relevant data (e.g., can view calls, can view analytics).
- For Conversations API: Add
Error: 429 Too Many Requests
- Cause: Exceeding rate limits.
- Fix: Implement exponential backoff and respect the
Retry-Afterheader. The code examples above include basic retry logic. For high-volume queries, stagger requests and use pagination efficiently.
Error: 400 Bad Request (Analytics Query)
- Cause: Invalid query body structure.
- Fix:
- Ensure
intervalis in ISO 8601 format withZsuffix. - Ensure
metric_idsare valid for the conversation type (e.g.,callHandleTimeis for calls, not chats). - Ensure
filtersuse valid operators (eq,neq,gt, etc.). - Check that
group_byfields are compatible with the selected metrics.
- Ensure