Querying Live Conversations vs. Analytical Data: A Practical Guide to Genesys Cloud APIs
What You Will Build
- You will build two distinct Python modules: one to poll active conversation states in real-time and another to query historical conversation metrics for reporting.
- This tutorial utilizes the Genesys Cloud Platform API v2, specifically the
conversationsandanalyticsendpoints. - The code is written in Python 3.9+ using the
genesyscloudSDK and therequestslibrary for raw HTTP examples.
Prerequisites
- OAuth Client Type: Machine-to-Machine (M2M) or Client Credentials Grant.
- Required Scopes:
- For
/conversations:conversation:read - For
/analytics:analytics:conversation:read
- For
- SDK Version:
genesyscloudPython SDK v11.0.0 or later. - Runtime Requirements: Python 3.9+.
- Dependencies:
pip install genesyscloud requests python-dotenv
Authentication Setup
Genesys Cloud uses OAuth 2.0. For backend services, the Client Credentials Grant is the standard. You must obtain an access token before making any API calls. The token expires after 30 minutes, so your application must handle refresh logic or re-authentication.
Below is a robust authentication helper using the requests library. This approach gives you visibility into the raw HTTP headers, which is useful for debugging scope issues.
import os
import requests
from dotenv import load_dotenv
load_dotenv()
GENESYS_CLOUD_REGION = os.getenv("GENESYS_CLOUD_REGION", "my.genesys.cloud")
CLIENT_ID = os.getenv("GENESYS_CLOUD_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
def get_access_token() -> str:
"""
Retrieves an OAuth 2.0 access token from Genesys Cloud.
Returns the token string. Raises an exception on failure.
"""
url = f"https://{GENESYS_CLOUD_REGION}/oauth/token"
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
}
response = requests.post(url, headers=headers, data=data)
if response.status_code != 200:
raise Exception(f"Authentication failed with status {response.status_code}: {response.text}")
return response.json()["access_token"]
# Example usage
# token = get_access_token()
Implementation
Step 1: Understanding /api/v2/conversations (Real-Time State)
The /api/v2/conversations endpoint returns the current state of conversations. It is a snapshot of what is happening right now. It is not designed for historical reporting, aggregations, or long-term data retention.
When to use this:
- Building a real-time dashboard showing active queue wait times.
- Triggering immediate actions based on conversation status (e.g., sending a notification when a call connects).
- Debugging active session issues.
Key Constraints:
- Data is ephemeral. Once a conversation ends, it disappears from this endpoint (moved to analytics).
- It returns full object graphs (participants, media streams, properties).
- It is expensive to poll frequently due to payload size.
Code Example: Polling Active Conversations
We will use the Genesys Cloud Python SDK for this example, as it handles the complex object mapping of conversation entities.
from genesyscloud import PlatformClient
from genesyscloud.conversations.api import ConversationApi
from genesyscloud.auth import OAuthClientCredentialsConfig
def fetch_active_conversations():
"""
Fetches all currently active conversations.
"""
# Initialize the platform client
auth_config = OAuthClientCredentialsConfig(CLIENT_ID, CLIENT_SECRET)
platform_client = PlatformClient()
platform_client.set_auth_config(auth_config)
# Initialize the Conversation API
conversation_api = ConversationApi(platform_client)
try:
# Fetch active conversations
# Note: 'active' is the default filter, but explicit is better
response = conversation_api.get_conversations_active()
if response.body is None:
print("No active conversations found.")
return
for conversation in response.body.entities:
print(f"Conversation ID: {conversation.id}")
print(f"Type: {conversation.conversation_type}")
print(f"State: {conversation.state}")
# Access participants
for participant in conversation.participants:
print(f" Participant: {participant.name} (Role: {participant.role})")
except Exception as e:
print(f"Error fetching active conversations: {e}")
# fetch_active_conversations()
Expected Response Structure (Simplified):
{
"entities": [
{
"id": "123e4567-e89b-12d3-a456-426614174000",
"conversation_type": "call",
"state": "connected",
"participants": [
{
"id": "participant-id-1",
"name": "John Doe",
"role": "agent"
}
]
}
]
}
Step 2: Understanding /api/v2/analytics/conversations (Historical & Aggregated Data)
The /api/v2/analytics/conversations endpoint is designed for reporting, analytics, and historical queries. It processes data that has been settled into the Genesys Cloud analytics warehouse.
When to use this:
- Generating daily/weekly/monthly performance reports.
- Calculating average handle time (AHT) for agents.
- Querying data from the last 30 days (or more, depending on retention settings).
- Aggregating metrics (sum, avg, count) across multiple conversations.
Key Constraints:
- Latency: There is a delay (typically 1-5 minutes) between a conversation ending and it appearing in analytics. Do not use this for real-time triggers.
- Query Structure: You must send a JSON body with
dateRange,interval, andmetrics. - Pagination: Results are paginated via
nextPageUri.
Code Example: Querying Historical Analytics
We will use requests for this example to demonstrate the raw JSON payload structure, which is critical for understanding the analytics query language.
import json
from datetime import datetime, timedelta
def query_historical_conversations():
"""
Queries conversation analytics for the last 24 hours.
"""
token = get_access_token()
url = f"https://{GENESYS_CLOUD_REGION}/api/v2/analytics/conversations/details/query"
# Define the time range: Last 24 hours
end_time = datetime.utcnow().isoformat() + "Z"
start_time = (datetime.utcnow() - timedelta(days=1)).isoformat() + "Z"
# Define the query body
# This query fetches individual conversation details, not aggregates
query_body = {
"dateRange": {
"startTime": start_time,
"endTime": end_time
},
"interval": "PT1H", # Hourly intervals
"metrics": [
"totalHandleTime",
"talkTime",
"holdTime"
],
"groupings": [
"wrapupcode"
],
"selection": {
"type": "conversation",
"filter": {
"type": "conversation",
"path": "type",
"operator": "eq",
"value": "call"
}
},
"pageSize": 100
}
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
# Note: analytics:conversation:read scope is required
}
try:
response = requests.post(url, headers=headers, json=query_body)
if response.status_code == 401:
raise Exception("Unauthorized: Check OAuth token and scopes.")
elif response.status_code == 403:
raise Exception("Forbidden: Check if the user has analytics:conversation:read scope.")
elif response.status_code == 429:
print("Rate limited. Implement exponential backoff.")
return
response.raise_for_status()
data = response.json()
if not data.get("entities"):
print("No analytics data found for the specified range.")
return
print(f"Total entities returned: {len(data['entities'])}")
for entity in data["entities"][:5]: # Print first 5
print(f"Interval: {entity['interval']}")
for metric in entity.get("metrics", []):
print(f" Metric: {metric['name']} = {metric['value']}")
except requests.exceptions.RequestException as e:
print(f"HTTP Request failed: {e}")
# query_historical_conversations()
Step 3: Handling Pagination in Analytics
Analytics endpoints almost always return paginated results. You must follow the nextPageUri until it is null.
def fetch_all_analytics_pages(token: str, initial_url: str, query_body: dict) -> list:
"""
Iterates through all pages of an analytics query.
"""
all_entities = []
next_page_uri = initial_url
while next_page_uri:
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
# On first request, use POST with body. On subsequent requests, use GET with URI.
if next_page_uri == initial_url:
response = requests.post(next_page_uri, headers=headers, json=query_body)
else:
# The nextPageUri is a full URL, often with query params
response = requests.get(next_page_uri, headers=headers)
response.raise_for_status()
data = response.json()
all_entities.extend(data.get("entities", []))
# Check for next page
next_page_uri = data.get("nextPageUri")
# Simple rate limiting to avoid 429s
import time
time.sleep(0.5)
return all_entities
Complete Working Example
Below is a complete, runnable Python script that demonstrates both approaches. It fetches active calls and then queries the last hour of call analytics.
import os
import requests
import time
from datetime import datetime, timedelta
from dotenv import load_dotenv
from genesyscloud import PlatformClient
from genesyscloud.conversations.api import ConversationApi
from genesyscloud.auth import OAuthClientCredentialsConfig
load_dotenv()
GENESYS_CLOUD_REGION = os.getenv("GENESYS_CLOUD_REGION", "my.genesys.cloud")
CLIENT_ID = os.getenv("GENESYS_CLOUD_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
def get_access_token() -> str:
url = f"https://{GENESYS_CLOUD_REGION}/oauth/token"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
}
response = requests.post(url, headers=headers, data=data)
response.raise_for_status()
return response.json()["access_token"]
def get_active_conversations():
"""
Retrieves live conversation states using the SDK.
"""
auth_config = OAuthClientCredentialsConfig(CLIENT_ID, CLIENT_SECRET)
platform_client = PlatformClient()
platform_client.set_auth_config(auth_config)
conversation_api = ConversationApi(platform_client)
try:
response = conversation_api.get_conversations_active()
if not response.body or not response.body.entities:
print("No active conversations found.")
return []
active_ids = []
for conv in response.body.entities:
print(f"[ACTIVE] ID: {conv.id}, Type: {conv.conversation_type}, State: {conv.state}")
active_ids.append(conv.id)
return active_ids
except Exception as e:
print(f"Error fetching active conversations: {e}")
return []
def get_analytics_conversations():
"""
Retrieves historical conversation data using raw requests.
"""
token = get_access_token()
url = f"https://{GENESYS_CLOUD_REGION}/api/v2/analytics/conversations/details/query"
end_time = datetime.utcnow().isoformat() + "Z"
start_time = (datetime.utcnow() - timedelta(hours=1)).isoformat() + "Z"
query_body = {
"dateRange": {
"startTime": start_time,
"endTime": end_time
},
"interval": "PT1H",
"metrics": ["totalHandleTime", "talkTime"],
"selection": {
"type": "conversation",
"filter": {
"type": "conversation",
"path": "type",
"operator": "eq",
"value": "call"
}
},
"pageSize": 10
}
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
try:
response = requests.post(url, headers=headers, json=query_body)
response.raise_for_status()
data = response.json()
if not data.get("entities"):
print("No analytics data found for the last hour.")
return
print(f"\n[ANALYTICS] Found {len(data['entities'])} conversation records in last hour.")
for entity in data["entities"]:
print(f" Interval: {entity['interval']}")
for metric in entity.get("metrics", []):
print(f" {metric['name']}: {metric['value']}")
except Exception as e:
print(f"Error fetching analytics: {e}")
if __name__ == "__main__":
print("=== Fetching Active Conversations ===")
get_active_conversations()
print("\n=== Fetching Analytics Conversations ===")
get_analytics_conversations()
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token is expired, invalid, or missing.
- Fix: Ensure your
get_access_token()function is called before every request or that you implement a token refresh mechanism. Genesys tokens expire in 30 minutes.
Error: 403 Forbidden
- Cause: The OAuth client lacks the required scope.
- Fix:
- For
/conversations: Addconversation:readto your OAuth client. - For
/analytics: Addanalytics:conversation:readto your OAuth client. - Note: Scopes are added in the Genesys Cloud Admin Console under Administration > Security > OAuth clients.
- For
Error: 429 Too Many Requests
- Cause: You have exceeded the rate limit for the API endpoint. Analytics endpoints have stricter rate limits than conversation endpoints.
- Fix: Implement exponential backoff. Do not poll analytics endpoints in a tight loop. Use the
Retry-Afterheader if provided.
Error: Empty Entities in Analytics
- Cause: The time range is too short, or the data has not yet propagated to the analytics warehouse.
- Fix: Increase the time range. Remember that there is a 1-5 minute delay between a conversation ending and it appearing in analytics. Do not query for “now”.
Error: Invalid JSON in Analytics Query
- Cause: The
dateRangeformat is incorrect or themetricsarray contains invalid metric names. - Fix: Ensure ISO 8601 format with “Z” suffix for UTC. Use the official metric names (e.g.,
totalHandleTime, nothandleTime).