Querying Live vs. Historical Conversations in Genesys Cloud CX
What You Will Build
- This tutorial provides working Python code to retrieve live, in-progress interactions using the Conversations API and historical conversation detail records using the Analytics API.
- It demonstrates the technical differences between real-time conversation state and post-interaction analytics records.
- The implementation targets Python 3.9+ and the official PureCloudPlatformClientV2 SDK.
Prerequisites
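- OAuth Client Type: Client Credentials grant (a confidential client with a client ID and secret).
- Required Access:
  - For Conversations API: the conversations:readonly scope (or conversations)
  - For Analytics API: the analytics:readonly scope (or analytics), plus the analytics:conversationDetail:view permission for the details query
  - Client Credentials clients receive their permissions from the roles assigned to the client; scopes apply to user-based grants.
- SDK Version: PureCloudPlatformClientV2 >= 130.0.0
- Runtime: Python 3.9 or later
- Dependencies: pip install PureCloudPlatformClientV2 python-dotenv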
Authentication Setup
Genesys Cloud CX uses OAuth 2.0 for all API access. The Conversations API and Analytics API share the same authentication mechanism, but each requires its own permissions. Make sure your OAuth client has the access listed above.
The following code requests a Client Credentials token and builds the two API clients. This is the foundation for both API calls.
import os

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2 import ApiClient, ConversationsApi, AnalyticsApi
from PureCloudPlatformClientV2.rest import ApiException
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()


def get_authenticated_client() -> ApiClient:
    """
    Requests a Client Credentials token and returns an authenticated ApiClient.
    """
    # Update to your region's API host (e.g., https://api.mypurecloud.ie)
    PureCloudPlatformClientV2.configuration.host = "https://api.mypurecloud.com"

    # Credentials from environment variables
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")

    # The token carries the permissions of the roles assigned to the OAuth client,
    # so no scope list is passed with the request
    return ApiClient().get_client_credentials_token(client_id, client_secret)


def create_api_clients(api_client: ApiClient):
    """
    Instantiates the ConversationsApi and AnalyticsApi clients.
    """
    # Client for real-time/live conversations
    conversations_api = ConversationsApi(api_client)
    # Client for historical/analytics data
    analytics_api = AnalyticsApi(api_client)
    return conversations_api, analytics_api
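Because the API host has to match the region of your org, it can help to derive the host names from a single region domain instead of hard-coding them. The sketch below assumes the usual api.<domain> and login.<domain> naming; region_hosts is a hypothetical helper, not part of the SDK.

```python
def region_hosts(domain: str) -> dict:
    """Derive API and login base URLs from a region domain.

    Hypothetical helper: assumes hosts follow the api.<domain> and
    login.<domain> pattern (e.g., mypurecloud.ie).
    """
    domain = domain.strip().lower()
    # Tolerate a full URL being passed in
    for scheme in ("https://", "http://"):
        if domain.startswith(scheme):
            domain = domain[len(scheme):]
    domain = domain.rstrip("/")
    # Tolerate a host that already carries a service prefix
    for prefix in ("api.", "login.", "apps."):
        if domain.startswith(prefix):
            domain = domain[len(prefix):]
    if not domain or "/" in domain:
        raise ValueError("expected a bare region domain such as 'mypurecloud.com'")
    return {
        "api": f"https://api.{domain}",
        "login": f"https://login.{domain}",
    }
```

The "api" value is what you would assign to the SDK's host setting before requesting a token.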
Implementation
Step 1: Retrieving Live Conversations with /api/v2/conversations
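The /api/v2/conversations endpoint returns active, in-progress interactions. This data is ephemeral. Once a conversation ends (hangup or disconnect), it drops out of this endpoint's result set after a short grace period. The endpoint reports the conversations of the user the token belongs to, so a token without a user context (such as one from a Client Credentials grant) may be rejected; verify this against your org before relying on it.
Use Case: You need to display the current status of an agent (e.g., “On Call with John Doe”) or trigger a real-time event (e.g., send a Slack notification when a call starts).
Key Parameters:
- communicationType (communication_type in the Python SDK): Optional filter by communication type, such as call or chat.
- There is no status filter on this endpoint. To narrow by state, inspect each participant's communications (e.g., calls) in the response.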
def get_live_conversations(conversations_api: ConversationsApi, communication_type: str = "call"):
    """
    Retrieves currently active conversations, optionally filtered by communication type.

    Args:
        conversations_api: The initialized ConversationsApi client.
        communication_type: The communication type filter (e.g., 'call', 'chat').

    Returns:
        A list of active Conversation objects.
    """
    try:
        # GET /api/v2/conversations
        # We filter by communication type to keep the response payload small
        response = conversations_api.get_conversations(
            communication_type=communication_type
        )
        if not response.entities:
            print(f"No active {communication_type} conversations found.")
            return []
        print(f"Found {len(response.entities)} active {communication_type} conversation(s).")
        return response.entities
    except ApiException as e:
        print(f"Exception when calling ConversationsApi->get_conversations: {e}")
        if e.status == 401:
            print("Error: Unauthorized. Check your OAuth token.")
        elif e.status == 403:
            print("Error: Forbidden. Check the permissions or scopes granted to the OAuth client.")
        raise
Expected Response Structure (Simplified):
{
  "entities": [
    {
      "id": "conv-uuid-123",
      "startTime": "2023-10-27T10:15:00.000Z",
      "participants": [
        {
          "id": "participant-uuid-456",
          "name": "Agent Name",
          "userId": "agent-id-789",
          "purpose": "agent",
          "calls": [
            {
              "state": "connected",
              "direction": "inbound"
            }
          ]
        }
      ]
    }
  ],
  "pageSize": 25,
  "pageNumber": 1
}
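For the real-time notification use case, one simple approach is to poll the endpoint and compare consecutive snapshots of conversation IDs. The following is a minimal sketch of that comparison only; diff_snapshots is a hypothetical helper, and it assumes each conversation exposes a stable id.

```python
def diff_snapshots(previous_ids, current_ids):
    """Compare two polls and report which conversations started or ended.

    Returns a (started, ended) tuple of sorted ID lists.
    """
    previous, current = set(previous_ids), set(current_ids)
    started = sorted(current - previous)  # present now, absent before
    ended = sorted(previous - current)    # present before, absent now
    return started, ended


# Example: conversation "a" ended and "c" started between two polls
started, ended = diff_snapshots(["a", "b"], ["b", "c"])
```

Each ID in started would be a candidate for a "call started" notification. Polling frequency is bounded by the API rate limits, so keep the interval conservative.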
Step 2: Querying Historical Data with /api/v2/analytics/conversations/details/query
The /api/v2/analytics/conversations/details/query endpoint returns detail records for conversations that fall within a time interval. Records are served from the analytics data store rather than from live conversation state. They carry per-session metrics such as talk time, hold time, and after-call work, reported in milliseconds, most of which are only final once the conversation ends.
Use Case: You need to generate a report of call durations for the last hour, calculate average handle time (AHT), or archive conversation transcripts for compliance.
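Key Parameters:
- interval: ISO 8601 time range (e.g., 2023-10-01T00:00:00Z/2023-10-01T01:00:00Z).
- paging: Page size (up to 100) and page number, sent in the request body.
- order and orderBy: Optional sort direction and sort field (e.g., asc by conversationStart).
- conversationFilters and segmentFilters: Optional filters (e.g., restricting segments to mediaType voice).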
from datetime import datetime, timedelta, timezone


def get_historical_conversations(analytics_api: AnalyticsApi, lookback_hours: int = 1):
    """
    Queries conversation detail records for a specific time interval.

    Args:
        analytics_api: The initialized AnalyticsApi client.
        lookback_hours: How many hours back to query.

    Returns:
        A list of conversation detail records (first page only).
    """
    # Define the time interval
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=lookback_hours)
    # Format as ISO 8601 with Z suffix for UTC
    interval = f"{start_time.strftime('%Y-%m-%dT%H:%M:%SZ')}/{end_time.strftime('%Y-%m-%dT%H:%M:%SZ')}"

    # Define the query body
    # The interval selects the conversations; paging lives in the body, not the URL
    query_body = {
        "interval": interval,
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {
            "pageSize": 25,  # Max 100 per page
            "pageNumber": 1
        }
    }

    try:
        # POST /api/v2/analytics/conversations/details/query
        response = analytics_api.post_analytics_conversations_details_query(
            body=query_body
        )
        if not response.conversations:
            print(f"No conversations found in the last {lookback_hours} hour(s).")
            return []
        print(f"Found {len(response.conversations)} conversation(s).")
        return response.conversations
    except ApiException as e:
        print(f"Exception when calling AnalyticsApi->post_analytics_conversations_details_query: {e}")
        if e.status == 400:
            print("Error: Bad Request. Check the interval format or query syntax.")
        elif e.status == 429:
            print("Error: Rate Limited. Implement exponential backoff.")
        raise
Expected Response Structure (Simplified):
{
  "conversations": [
    {
      "conversationId": "conv-uuid-123",
      "conversationStart": "2023-10-27T10:15:00.000Z",
      "conversationEnd": "2023-10-27T10:20:00.000Z",
      "originatingDirection": "inbound",
      "participants": [
        {
          "participantId": "participant-uuid-456",
          "purpose": "agent",
          "sessions": [
            {
              "mediaType": "voice",
              "metrics": [
                { "name": "tTalk", "value": 240500 },
                { "name": "tHeld", "value": 10200 },
                { "name": "tAcw", "value": 30000 }
              ]
            }
          ]
        }
      ]
    }
  ],
  "totalHits": 1
}
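For the AHT use case mentioned above, the arithmetic is straightforward once per-conversation durations have been extracted. The sketch below assumes AHT is defined as talk plus hold plus after-call work, averaged over handled conversations (definitions vary between organizations), and it uses hypothetical flat records with talk_s, hold_s, and acw_s fields rather than the raw API payload.

```python
def average_handle_time(records):
    """Return AHT in seconds for a list of per-conversation duration records.

    Each record is a dict with talk_s, hold_s, and acw_s (all in seconds).
    These field names are illustrative, not part of the API response.
    """
    if not records:
        return 0.0
    total = sum(r["talk_s"] + r["hold_s"] + r["acw_s"] for r in records)
    return total / len(records)


sample = [
    {"talk_s": 240.0, "hold_s": 10.0, "acw_s": 30.0},  # 280 s handled
    {"talk_s": 120.0, "hold_s": 0.0, "acw_s": 20.0},   # 140 s handled
]
aht = average_handle_time(sample)  # (280 + 140) / 2 = 210.0 seconds
```

If your source durations are in milliseconds, convert them to seconds before building the records.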
Step 3: Handling Pagination and Large Datasets
The Analytics API can return thousands of records, while the Conversations API result set is generally much smaller. The details query caps the page size at 100 and takes its paging settings in the request body, so you page through results by incrementing paging.pageNumber until a page comes back empty or shorter than the page size. The response also reports totalHits, which you can use as a sanity check on the number of records fetched.
def get_all_historical_conversations(analytics_api: AnalyticsApi, lookback_hours: int = 1):
    """
    Iterates through all pages of historical conversation data.
    """
    all_conversations = []
    page = 1
    size = 100  # Max page size for the details query
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=lookback_hours)
    interval = f"{start_time.strftime('%Y-%m-%dT%H:%M:%SZ')}/{end_time.strftime('%Y-%m-%dT%H:%M:%SZ')}"

    while True:
        # Paging is part of the request body, so rebuild it for every page
        query_body = {
            "interval": interval,
            "order": "asc",
            "orderBy": "conversationStart",
            "paging": {"pageSize": size, "pageNumber": page}
        }
        try:
            response = analytics_api.post_analytics_conversations_details_query(
                body=query_body
            )
        except ApiException as e:
            # Stop on error and return whatever was fetched so far
            print(f"Error fetching page {page}: {e}")
            break

        conversations = response.conversations or []
        if not conversations:
            break  # No more data
        all_conversations.extend(conversations)
        print(f"Fetched page {page}: {len(conversations)} records.")
        # If we got fewer records than requested, we have reached the end
        if len(conversations) < size:
            break
        page += 1

    return all_conversations
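The stopping rule used above (stop on an empty page or a short page) can be factored into a small generator that is independent of any particular API. This is a sketch under that assumption; iter_pages and fake_fetch are hypothetical names, and fake_fetch stands in for a real page request.

```python
from typing import Callable, Iterator, List


def iter_pages(fetch_page: Callable[[int], List], page_size: int,
               max_pages: int = 1000) -> Iterator:
    """Yield items page by page until an empty or short page is returned.

    fetch_page takes a 1-based page number and returns a list of items.
    max_pages is a safety cap against endless loops.
    """
    for page_number in range(1, max_pages + 1):
        items = fetch_page(page_number)
        if not items:
            return  # Empty page: nothing left
        yield from items
        if len(items) < page_size:
            return  # Short page: this was the last one


# Stand-in data source: 250 records served 100 at a time
records = list(range(250))
calls = []


def fake_fetch(page_number: int, size: int = 100) -> List:
    calls.append(page_number)
    start = (page_number - 1) * size
    return records[start:start + size]


fetched = list(iter_pages(fake_fetch, page_size=100))
```

With 250 records and a page size of 100, the generator requests three pages and stops after the short third page without issuing a fourth request.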
Complete Working Example
This script combines authentication, live query, and historical query into a single runnable module.
import os
import sys
from datetime import datetime, timedelta, timezone

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2 import ApiClient, ConversationsApi, AnalyticsApi
from PureCloudPlatformClientV2.rest import ApiException
from dotenv import load_dotenv

PAGE_SIZE = 100  # Max page size for the details query


def get_live_conversations(conversations_api, communication_type="call"):
    try:
        response = conversations_api.get_conversations(communication_type=communication_type)
        return response.entities if response.entities else []
    except ApiException as e:
        print(f"Conversations API Error: {e}")
        return []


def get_all_historical_conversations(analytics_api, lookback_hours=1):
    all_conversations = []
    page = 1
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=lookback_hours)
    interval = f"{start_time.strftime('%Y-%m-%dT%H:%M:%SZ')}/{end_time.strftime('%Y-%m-%dT%H:%M:%SZ')}"
    while True:
        # Paging is part of the request body, so rebuild it for every page
        query_body = {
            "interval": interval,
            "order": "asc",
            "orderBy": "conversationStart",
            "paging": {"pageSize": PAGE_SIZE, "pageNumber": page}
        }
        try:
            response = analytics_api.post_analytics_conversations_details_query(body=query_body)
        except ApiException as e:
            print(f"Analytics API Error: {e}")
            break
        conversations = response.conversations or []
        all_conversations.extend(conversations)
        if len(conversations) < PAGE_SIZE:
            break
        page += 1
    return all_conversations


def total_metric_ms(conv, metric_name="tTalk"):
    # Sum one metric (in milliseconds) across all participants and sessions
    total = 0
    for participant in conv.participants or []:
        for session in participant.sessions or []:
            for metric in session.metrics or []:
                if metric.name == metric_name:
                    total += metric.value
    return total


def main():
    # Load .env before checking for credentials
    load_dotenv()
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    if not client_id or not client_secret:
        print("Error: GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in .env")
        sys.exit(1)

    # Update the host to match your region
    PureCloudPlatformClientV2.configuration.host = "https://api.mypurecloud.com"
    api_client = ApiClient().get_client_credentials_token(client_id, client_secret)
    conversations_api = ConversationsApi(api_client)
    analytics_api = AnalyticsApi(api_client)

    # 1. Get Live Voice Conversations
    print("--- Fetching Live Voice Conversations ---")
    try:
        live_convs = get_live_conversations(conversations_api, communication_type="call")
        for conv in live_convs:
            print(f"Live ID: {conv.id}, Start: {conv.start_time}, Participants: {len(conv.participants or [])}")
    except Exception as e:
        print(f"Failed to fetch live conversations: {e}")

    # 2. Get Historical Conversations (Last 1 Hour)
    print("\n--- Fetching Historical Conversations (Last 1 Hour) ---")
    try:
        hist_convs = get_all_historical_conversations(analytics_api, lookback_hours=1)
        for conv in hist_convs:
            talk_seconds = total_metric_ms(conv, "tTalk") / 1000
            print(f"Hist ID: {conv.conversation_id}, Start: {conv.conversation_start}, Talk Time: {talk_seconds:.1f}s")
    except Exception as e:
        print(f"Failed to fetch historical conversations: {e}")


# The helpers are defined above main() so they exist before the script runs
if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 401 Unauthorized
Cause: The OAuth token is expired, invalid, or the client credentials are incorrect.
Fix: Ensure your client_id and client_secret belong to a Client Credentials OAuth client in Genesys Cloud and that the API host matches your org's region. If the error appears immediately, check your environment variables. If it appears after the token has been in use for a while, the token has probably expired; request a new one.
Error: 403 Forbidden
Cause: The OAuth token does not contain the required scope.
Fix:
- For Conversations API: Grant the client the conversations:readonly scope, or a role with the equivalent conversation permissions.
- For Analytics API: Grant analytics:readonly, and assign a role that includes analytics:conversationDetail:view.
- Note: For Client Credentials, access is granted at the client level, not the token request level. You must update the client in the Genesys Cloud Admin Console.
Error: 400 Bad Request (Analytics)
Cause: Invalid interval format or unsupported fields in the query body.
Fix:
- Ensure the interval is in ISO 8601 format with a Z suffix for UTC.
- Ensure the start time is before the end time.
- Check that the body fields match the documented request schema for the details query, and that paging.pageSize does not exceed 100.
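The first two checks can be enforced before the request is sent. The following sketch builds the interval string from two datetimes and rejects the common mistakes; build_interval is a hypothetical helper, and it assumes second-level precision is sufficient for the query.

```python
from datetime import datetime, timedelta, timezone


def build_interval(start: datetime, end: datetime) -> str:
    """Return 'start/end' in ISO 8601 UTC with a Z suffix.

    Raises ValueError for naive datetimes or a start that is not before the end.
    """
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("start and end must be timezone-aware")
    if start >= end:
        raise ValueError("start must be before end")
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    # Normalize to UTC so the Z suffix is accurate
    start_utc = start.astimezone(timezone.utc)
    end_utc = end.astimezone(timezone.utc)
    return f"{start_utc.strftime(fmt)}/{end_utc.strftime(fmt)}"


start = datetime(2023, 10, 1, 0, 0, tzinfo=timezone.utc)
interval = build_interval(start, start + timedelta(hours=1))
```

Normalizing to UTC first means a datetime created in a local time zone still produces a correct Z-suffixed value.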
Error: 429 Too Many Requests
Cause: You have exceeded the rate limit for the API endpoint.
Fix: Implement exponential backoff. The Analytics API has stricter rate limits than the Conversations API. Do not query analytics in tight loops without delays.
import time

from PureCloudPlatformClientV2.rest import ApiException


def api_call_with_retry(func, *args, max_retries=3, **kwargs):
    for attempt in range(max_retries):
        try:
            return func(*args, **kwargs)
        except ApiException as e:
            # Retry only on 429, and re-raise once the final attempt has failed
            if e.status != 429 or attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            print(f"Rate limited. Waiting {wait_time} seconds...")
            time.sleep(wait_time)
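A fixed doubling schedule can cause many clients to retry at the same moment. One common refinement is to honor a server-supplied delay when present and otherwise add random jitter. The sketch below assumes the 429 response carries a Retry-After header expressed in seconds, which is worth confirming for your org; backoff_delay is a hypothetical helper that only computes the wait.

```python
import random


def backoff_delay(attempt, headers=None, base=1.0, cap=60.0, rng=random.random):
    """Return the number of seconds to wait before retry `attempt` (0-based).

    Prefers a numeric Retry-After header (assumed to be in seconds);
    otherwise uses exponential backoff with full jitter, capped at `cap`.
    """
    retry_after = (headers or {}).get("Retry-After")
    if retry_after is not None:
        try:
            return min(cap, max(0.0, float(retry_after)))
        except ValueError:
            pass  # Header present but not numeric: fall back to backoff
    ceiling = min(cap, base * (2 ** attempt))
    return rng() * ceiling  # Full jitter: uniform in [0, ceiling)
```

The rng parameter exists so the function can be tested deterministically; in normal use the default random source is fine. The returned value would replace the fixed wait in a retry loop.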