Choosing the Right Endpoint: /api/v2/conversations vs /api/v2/analytics/conversations in Genesys Cloud CX
What You Will Build
- This tutorial demonstrates how to retrieve real-time or recent conversation data using the Conversations API versus aggregated historical metrics using the Analytics API.
- The code samples utilize the Genesys Cloud PureCloud Platform Client V2 SDK in Python.
- The programming language covered is Python 3.9+.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials) or Resource Owner Password Credentials (ROPC).
- Required Scopes:
- For Conversations API:
conversation:all(read all conversations) orconversation:read(read specific conversations). - For Analytics API:
analytics:conversation:query(query conversation analytics).
- For Conversations API:
- SDK Version:
genesyscloud-purecloud-platform-client-v2>= 140.0.0. - Runtime: Python 3.9 or higher.
- External Dependencies:
genesyscloud-purecloud-platform-client-v2python-dotenv(for secure credential management)
Authentication Setup
Before making any API calls, you must authenticate with the Genesys Cloud environment. The Python SDK handles token generation and refresh automatically when configured correctly.
Create a .env file in your project root with the following variables:
GENESYS_CLOUD_REGION=us-east-1
GENESYS_CLOUD_CLIENT_ID=your_client_id
GENESYS_CLOUD_CLIENT_SECRET=your_client_secret
GENESYS_CLOUD_USER_EMAIL=your_user_email
GENESYS_CLOUD_USER_PASSWORD=your_user_password
Install the required packages:
pip install genesyscloud-purecloud-platform-client-v2 python-dotenv
Initialize the SDK client in your Python script:
import os
from dotenv import load_dotenv
from purecloudplatformclientv2 import (
ApiClient,
Configuration,
OAuthClient,
ConversationApi,
AnalyticsApi
)
load_dotenv()
def get_api_client():
"""
Initializes and returns an authenticated API client.
Uses ROPC flow for demonstration. In production, consider Client Credentials
if the actions do not require user-specific context.
"""
configuration = Configuration()
configuration.host = f"https://{os.getenv('GENESYS_CLOUD_REGION')}.pure.cloudengage.com"
api_client = ApiClient(configuration)
oauth_client = OAuthClient(configuration)
# Authenticate using Resource Owner Password Credentials
token = oauth_client.authenticate(
os.getenv('GENESYS_CLOUD_USER_EMAIL'),
os.getenv('GENESYS_CLOUD_USER_PASSWORD')
)
api_client.configuration.access_token = token.access_token
return api_client
# Initialize clients for both APIs
api_client = get_api_client()
conversation_api = ConversationApi(api_client)
analytics_api = AnalyticsApi(api_client)
Implementation
Step 1: Understanding the Data Model Difference
The fundamental difference between these two endpoints lies in the granularity and purpose of the data.
-
/api/v2/conversations(Conversations API)- Purpose: Retrieves the actual conversation objects. This includes the transcript, media type (voice, chat, email), participants, and the current state (active, queued, closed).
- Use Case: You need to display the chat transcript to an agent, perform real-time sentiment analysis on live text, or retrieve the specific email body for an archival system.
- Data Structure: Returns a list of
Conversationobjects. Each object contains detailed message history, participant IDs, and media-specific attributes.
-
/api/v2/analytics/conversations/details/query(Analytics API)- Purpose: Retrieves aggregated metrics about conversations. This includes wait times, talk times, wrap-up codes, and disposition outcomes. It does not contain the transcript.
- Use Case: You need to generate a weekly report on average handle time (AHT), identify agents with high abandonment rates, or calculate service level percentages.
- Data Structure: Returns a
QueryResponsecontaining agroupsarray with numeric metrics (e.g.,sum(talkTime),avg(waitTime)).
Step 2: Retrieving Recent Conversations (Conversations API)
Let us retrieve the 10 most recent voice conversations. This endpoint is ideal when you need the context of the interaction.
Required Scope: conversation:all
from purecloudplatformclientv2 import ConversationApi, Conversation
def get_recent_conversations(api_client: ApiClient, limit: int = 10) -> list[Conversation]:
"""
Retrieves the most recent conversations.
Args:
api_client: Authenticated API client.
limit: Number of conversations to retrieve (max 100 per request).
Returns:
List of Conversation objects.
"""
conversation_api = ConversationApi(api_client)
try:
# GET /api/v2/conversations
# Parameters:
# - limit: Max items returned (1-100)
# - expand: Optional. Use 'participants' to include participant details
response = conversation_api.get_conversations(
limit=limit,
expand="participants"
)
if response.entities:
print(f"Retrieved {len(response.entities)} conversations.")
return response.entities
else:
print("No recent conversations found.")
return []
except Exception as e:
# Handle 401, 403, 429 errors
if hasattr(e, 'status') and e.status == 429:
print("Rate limit exceeded. Implement exponential backoff.")
elif hasattr(e, 'status') and e.status in [401, 403]:
print(f"Authentication/Authorization error: {e}")
else:
print(f"Error retrieving conversations: {e}")
raise
# Execute
recent_convos = get_recent_conversations(api_client)
# Inspect the first conversation
if recent_convos:
first_convo = recent_convos[0]
print(f"Conversation ID: {first_convo.id}")
print(f"Type: {first_convo.type}")
print(f"State: {first_convo.state}")
# Access transcript for chat/email
if first_convo.type == "chat" and first_convo.transcript:
print(f"Transcript Lines: {len(first_convo.transcript)}")
Expected Response Snippet:
{
"entities": [
{
"id": "8c7b6a5d-4e3f-2a1b-9c8d-7e6f5a4b3c2d",
"type": "voice",
"state": "closed",
"createdTime": "2023-10-27T14:30:00.000Z",
"updatedTime": "2023-10-27T14:35:00.000Z",
"participants": [
{
"id": "agent-id-123",
"name": "John Doe",
"type": "agent"
}
]
}
],
"pageSize": 10,
"pageNumber": 1
}
Step 3: Querying Conversation Metrics (Analytics API)
Now, let us retrieve the same data but focused on performance metrics. We will query for conversations from the last 24 hours.
Required Scope: analytics:conversation:query
The Analytics API uses a POST body to define the query, which is more powerful than simple GET parameters. This allows for complex filtering and grouping.
from purecloudplatformclientv2 import (
AnalyticsApi,
QueryPostBody,
QueryFilter,
QueryGroupBy,
Metric
)
from datetime import datetime, timedelta
def get_daily_metrics(api_client: ApiClient) -> dict:
"""
Queries analytics for conversations from the last 24 hours.
Args:
api_client: Authenticated API client.
Returns:
Dictionary containing aggregated metrics.
"""
analytics_api = AnalyticsApi(api_client)
# Define the time range
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=24)
# Define the metrics to retrieve
# Common metrics: talkTime, waitTime, wrapTime, holdTime, transferTime
metrics = [
Metric(name="talkTime"),
Metric(name="waitTime"),
Metric(name="count")
]
# Define filters
# Filter by type (voice) and state (closed)
filters = [
QueryFilter(name="type", operator="eq", value="voice"),
QueryFilter(name="state", operator="eq", value="closed")
]
# Define grouping
# Group by date to see daily trends
group_by = [
QueryGroupBy(name="date")
]
# Construct the query body
query_body = QueryPostBody(
metrics=metrics,
filters=filters,
group_by=group_by,
time_range={
"startTime": start_time.isoformat(),
"endTime": end_time.isoformat()
}
)
try:
# POST /api/v2/analytics/conversations/details/query
response = analytics_api.post_analytics_conversations_details_query(
body=query_body,
)
if response.groups:
print(f"Query returned {len(response.groups)} groups.")
for group in response.groups:
print(f"Date: {group.group_by}")
for metric in group.metrics:
if metric.name == "talkTime":
print(f" Total Talk Time (ms): {metric.total}")
elif metric.name == "count":
print(f" Conversation Count: {metric.total}")
return response.groups
else:
print("No analytics data found for the specified period.")
return []
except Exception as e:
if hasattr(e, 'status') and e.status == 400:
print(f"Bad Request. Check filter syntax: {e}")
elif hasattr(e, 'status') and e.status == 429:
print("Rate limit exceeded. Wait before retrying.")
else:
print(f"Error querying analytics: {e}")
raise
# Execute
daily_metrics = get_daily_metrics(api_client)
Expected Response Snippet:
{
"queryId": "abc123-def456",
"status": "complete",
"groups": [
{
"groupBy": {
"date": "2023-10-27"
},
"metrics": [
{
"name": "talkTime",
"total": 1250000,
"avg": 45000
},
{
"name": "count",
"total": 28
}
]
}
]
}
Step 4: Handling Pagination and Large Datasets
Both APIs handle large datasets differently.
Conversations API:
Uses standard pagination with pageSize and pageNumber. The maximum page size is 100. To fetch all conversations, you must loop until response.pageNumber * response.pageSize exceeds the total count or no more entities are returned.
Analytics API:
The Analytics API is designed for large aggregations. It does not use traditional pagination for the result set in the same way. Instead, you define the timeRange and groupBy. If the result set is extremely large, the API may return a queryId and a status of processing. You must then poll the GET /api/v2/analytics/conversations/details/query/{queryId} endpoint until the status changes to complete.
Here is how to handle asynchronous analytics queries:
import time
def poll_analytics_query(api_client: ApiClient, query_id: str, max_wait_seconds: int = 300) -> dict:
"""
Polls an asynchronous analytics query until completion.
Args:
api_client: Authenticated API client.
query_id: The ID returned from the initial POST request.
max_wait_seconds: Maximum time to wait for the query to complete.
Returns:
The final query response.
"""
analytics_api = AnalyticsApi(api_client)
start_time = time.time()
while time.time() - start_time < max_wait_seconds:
try:
response = analytics_api.get_analytics_conversations_details_query(query_id)
if response.status == "complete":
return response
elif response.status == "failed":
raise Exception(f"Analytics query failed: {response.message}")
else:
# Still processing
time.sleep(5)
except Exception as e:
print(f"Error polling query: {e}")
raise
raise TimeoutError("Analytics query did not complete within the specified time.")
Complete Working Example
This script combines authentication, conversation retrieval, and analytics querying into a single runnable module.
import os
import time
from datetime import datetime, timedelta
from dotenv import load_dotenv
from purecloudplatformclientv2 import (
ApiClient,
Configuration,
OAuthClient,
ConversationApi,
AnalyticsApi,
QueryPostBody,
QueryFilter,
QueryGroupBy,
Metric
)
load_dotenv()
def init_api_client():
configuration = Configuration()
configuration.host = f"https://{os.getenv('GENESYS_CLOUD_REGION')}.pure.cloudengage.com"
api_client = ApiClient(configuration)
oauth_client = OAuthClient(configuration)
token = oauth_client.authenticate(
os.getenv('GENESYS_CLOUD_USER_EMAIL'),
os.getenv('GENESYS_CLOUD_USER_PASSWORD')
)
api_client.configuration.access_token = token.access_token
return api_client
def fetch_recent_voice_conversations(api_client: ApiClient):
"""Fetches the 5 most recent closed voice conversations."""
conversation_api = ConversationApi(api_client)
try:
# Filter for voice conversations is not directly available in GET /conversations
# We retrieve all recent and filter client-side, or use expand='participants' to check type
response = conversation_api.get_conversations(limit=5, expand="participants")
voice_conversations = []
for convo in response.entities:
if convo.type == "voice":
voice_conversations.append(convo)
return voice_conversations
except Exception as e:
print(f"Error fetching conversations: {e}")
return []
def fetch_voice_metrics(api_client: ApiClient):
"""Fetches talk time and wait time metrics for the last 24 hours."""
analytics_api = AnalyticsApi(api_client)
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=24)
query_body = QueryPostBody(
metrics=[
Metric(name="talkTime"),
Metric(name="waitTime"),
Metric(name="count")
],
filters=[
QueryFilter(name="type", operator="eq", value="voice"),
QueryFilter(name="state", operator="eq", value="closed")
],
group_by=[QueryGroupBy(name="date")],
time_range={
"startTime": start_time.isoformat(),
"endTime": end_time.isoformat()
}
)
try:
response = analytics_api.post_analytics_conversations_details_query(body=query_body)
# Handle async processing
if response.status == "processing":
print("Query processing asynchronously...")
for _ in range(60): # Wait up to 5 minutes
time.sleep(5)
response = analytics_api.get_analytics_conversations_details_query(response.query_id)
if response.status == "complete":
break
else:
raise TimeoutError("Query did not complete.")
return response.groups
except Exception as e:
print(f"Error fetching analytics: {e}")
return []
def main():
print("Initializing Genesys Cloud API Client...")
api_client = init_api_client()
print("\n--- Fetching Recent Voice Conversations ---")
convos = fetch_recent_voice_conversations(api_client)
for convo in convos:
print(f"ID: {convo.id}, Created: {convo.createdTime}")
print("\n--- Fetching Voice Metrics (Last 24 Hours) ---")
metrics = fetch_voice_metrics(api_client)
if metrics:
for group in metrics:
date = group.group_by.get("date", "Unknown")
talk_time_ms = next((m.total for m in group.metrics if m.name == "talkTime"), 0)
count = next((m.total for m in group.metrics if m.name == "count"), 0)
print(f"Date: {date}, Count: {count}, Total Talk Time: {talk_time_ms}ms")
else:
print("No metrics data available.")
if __name__ == "__main__":
main()
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token has expired or was never generated correctly.
- Fix: Ensure the
authenticatemethod is called before any API requests. The SDK does not automatically refresh tokens if the initial authentication fails. Check your.envcredentials.
Error: 403 Forbidden
- Cause: The OAuth token lacks the required scope.
- Fix:
- For Conversations API: Add
conversation:allto your client’s scopes in the Genesys Admin console. - For Analytics API: Add
analytics:conversation:queryto your client’s scopes. - Re-authenticate after changing scopes.
- For Conversations API: Add
Error: 400 Bad Request (Analytics)
- Cause: Invalid filter syntax or unsupported metric name.
- Fix: Verify metric names against the Analytics API documentation. Common mistakes include using
talktimeinstead oftalkTime(case-sensitive) or using an operator likeeqon a numeric field that expectsgtorlt.
Error: 429 Too Many Requests
- Cause: Exceeding the rate limit (typically 10-20 requests per second per client).
- Fix: Implement exponential backoff. The
time.sleep()function in the polling example demonstrates a basic wait. For production systems, use a library liketenacityto handle retries automatically.
Error: Empty Results
- Cause: The time range is too narrow, or the filters are too restrictive.
- Fix:
- For Conversations API: Ensure you are querying for a time frame where conversations actually occurred.
- For Analytics API: Check that the
time_rangeis in ISO 8601 format and that thestartTimeis beforeendTime. Also, verify that thetypefilter matches the actual conversation types in your environment (e.g.,voice,chat,email,sms).