Constructing Analytics Aggregation Queries for Queue and Media Type in Genesys Cloud
What You Will Build
- You will build a Python script that queries the Genesys Cloud Analytics API to retrieve aggregated conversation metrics.
- The query will group results by
queueandmediaTypeto identify performance trends across different communication channels. - The tutorial uses the Genesys Cloud Python SDK (
genesys-cloud-sdk) and therequestslibrary for raw HTTP validation.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Grant).
- Required Scopes:
analytics:query:read(Required for accessing aggregated analytics data).analytics:conversations:view(Optional, if you need to drill down into specific conversation details later).
- SDK Version: Genesys Cloud Python SDK >= 160.0.0.
- Runtime: Python 3.8 or higher.
- External Dependencies:
genesys-cloud-sdkrequests(for debugging raw payloads)pydantic(included with SDK, used for model validation)
Authentication Setup
Genesys Cloud uses OAuth 2.0 Client Credentials flow for server-to-server API access. You must obtain an access token before making any analytics queries. The SDK handles token caching and refresh automatically, but understanding the underlying flow is critical for debugging 401 errors.
Step 1: Install the SDK
Install the official Genesys Cloud Python SDK via pip.
pip install genesys-cloud-sdk requests
Step 2: Initialize the Platform Client
The PureCloudPlatformClientV2 is the entry point for all SDK interactions. It manages the OAuth token lifecycle.
import os
from purecloudplatformclientv2 import PlatformClient
def get_platform_client():
"""
Initializes and returns a configured PureCloudPlatformClientV2 instance.
"""
# These environment variables must be set in your execution environment
client_id = os.environ.get('GENESYS_CLIENT_ID')
client_secret = os.environ.get('GENESYS_CLIENT_SECRET')
environment = os.environ.get('GENESYS_ENVIRONMENT', 'mypurecloud.com')
if not client_id or not client_secret:
raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")
# The SDK automatically handles token acquisition and refresh
platform_client = PlatformClient(client_id, client_secret, environment)
return platform_client
# Initialize the client
platform_client = get_platform_client()
Note: The SDK caches the token in memory. If your script runs for a long duration, the SDK will automatically refresh the token before it expires. If you are running this in a short-lived container (like AWS Lambda), the token will be fetched on every invocation.
Implementation
The core of this tutorial is constructing the QueryBody for the POST /api/v2/analytics/conversations/details/query endpoint. This endpoint allows for complex aggregations using a JSON-based query language.
Step 1: Constructing the Query Body
The Analytics API uses a specific structure for aggregation queries. You must define:
interval: The time range for the query.view: The data view (e.g.,default,custom).groupBy: The dimensions to aggregate by (queue,mediaType).metrics: The specific metrics to calculate (e.g.,conversationCount,handledCount).
Defining the Time Interval
Analytics queries require a start and end time. The API accepts ISO 8601 formatted strings.
from datetime import datetime, timedelta
def get_time_interval(days_back=7):
"""
Returns a start and end time for the query.
"""
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=days_back)
# Format as ISO 8601 with timezone
start_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
end_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
return start_str, end_str
start_time, end_time = get_time_interval(days_back=7)
print(f"Querying from {start_time} to {end_time}")
Building the Aggregation Query
We will use the SDK’s data models to build the query. This ensures type safety and proper JSON serialization.
from purecloudplatformclientv2 import (
QueryBody,
MetricFilter,
GroupBy,
View
)
def build_analytics_query(start_time: str, end_time: str):
"""
Constructs the QueryBody for the Analytics API.
"""
# 1. Define the metrics we want to aggregate
# We want the total number of conversations and the number of handled conversations
metrics = [
"conversationCount",
"handledCount",
"abandonedCount",
"averageHandleTime"
]
# 2. Define the groupBy dimensions
# We group by 'queue' and 'mediaType'
group_by = [
GroupBy(name="queue"),
GroupBy(name="mediaType")
]
# 3. Define the view
# 'default' is the standard view for conversation analytics
view_name = "default"
# 4. Construct the QueryBody
# The SDK allows us to build this object programmatically
query_body = QueryBody(
interval=f"{start_time}/{end_time}",
view=view_name,
group_by=group_by,
metrics=metrics
)
return query_body
query_body = build_analytics_query(start_time, end_time)
Important: The interval field in QueryBody expects a string in the format start/end. The SDK does not automatically concatenate the datetime strings, so you must provide the full interval string.
Step 2: Executing the Query
Now that the query body is constructed, we execute it using the AnalyticsApi.
from purecloudplatformclientv2 import AnalyticsApi, ApiException
import json
def execute_analytics_query(platform_client: PlatformClient, query_body: QueryBody):
"""
Executes the analytics query and returns the response.
"""
analytics_api = AnalyticsApi(platform_client)
try:
# The endpoint is POST /api/v2/anversations/details/query
# The SDK maps this to analytics_api.post_analytics_conversations_details_query
response = analytics_api.post_analytics_conversations_details_query(body=query_body)
return response
except ApiException as e:
print(f"Exception when calling AnalyticsApi->post_analytics_conversations_details_query: {e}\n")
# Print the response body for debugging
print(f"Response Body: {e.body}")
raise
# Execute the query
response = execute_analytics_query(platform_client, query_body)
Step 3: Processing the Results
The response from the Analytics API is a complex JSON structure. The results array contains the aggregated data. Each item in the results array represents a unique combination of queue and mediaType.
Understanding the Response Structure
A typical response looks like this:
{
"results": [
{
"groupByValues": [
{
"name": "queue",
"values": [
{
"id": "queue-id-1",
"name": "Sales Queue"
}
]
},
{
"name": "mediaType",
"values": [
{
"id": "voice",
"name": "Voice"
}
]
}
],
"metrics": {
"conversationCount": {
"value": 150
},
"handledCount": {
"value": 140
},
"abandonedCount": {
"value": 10
},
"averageHandleTime": {
"value": 180.5
}
}
}
],
"nextPageUri": null
}
Parsing the Results
We will write a function to parse this response and output a clean table of data.
def process_analytics_results(response):
"""
Parses the analytics response and prints a summary table.
"""
if not response or not response.results:
print("No results found.")
return
print(f"{'Queue Name':<20} | {'Media Type':<10} | {'Conversations':<12} | {'Handled':<10} | {'Abandoned':<10} | {'Avg Handle (s)':<12}")
print("-" * 80)
for result in response.results:
# Extract groupBy values
queue_name = "Unknown"
media_type = "Unknown"
for group in result.group_by_values:
if group.name == "queue" and group.values:
queue_name = group.values[0].name
elif group.name == "mediaType" and group.values:
media_type = group.values[0].name
# Extract metrics
metrics = result.metrics
conversation_count = metrics.get("conversationCount", {}).get("value", 0)
handled_count = metrics.get("handledCount", {}).get("value", 0)
abandoned_count = metrics.get("abandonedCount", {}).get("value", 0)
avg_handle_time = metrics.get("averageHandleTime", {}).get("value", 0)
# Format and print
print(f"{queue_name:<20} | {media_type:<10} | {conversation_count:<12} | {handled_count:<10} | {abandoned_count:<10} | {avg_handle_time:<12.2f}")
# Process the results
process_analytics_results(response)
Complete Working Example
Below is the complete, copy-pasteable Python script. Save this as analytics_query.py.
import os
import sys
from datetime import datetime, timedelta
from purecloudplatformclientv2 import (
PlatformClient,
AnalyticsApi,
QueryBody,
GroupBy,
ApiException
)
def get_platform_client():
"""
Initializes and returns a configured PureCloudPlatformClientV2 instance.
"""
client_id = os.environ.get('GENESYS_CLIENT_ID')
client_secret = os.environ.get('GENESYS_CLIENT_SECRET')
environment = os.environ.get('GENESYS_ENVIRONMENT', 'mypurecloud.com')
if not client_id or not client_secret:
raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")
platform_client = PlatformClient(client_id, client_secret, environment)
return platform_client
def get_time_interval(days_back=7):
"""
Returns a start and end time for the query in ISO 8601 format.
"""
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=days_back)
start_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
end_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
return start_str, end_str
def build_analytics_query(start_time: str, end_time: str):
"""
Constructs the QueryBody for the Analytics API.
Groups by queue and mediaType.
"""
metrics = [
"conversationCount",
"handledCount",
"abandonedCount",
"averageHandleTime"
]
group_by = [
GroupBy(name="queue"),
GroupBy(name="mediaType")
]
view_name = "default"
query_body = QueryBody(
interval=f"{start_time}/{end_time}",
view=view_name,
group_by=group_by,
metrics=metrics
)
return query_body
def execute_analytics_query(platform_client: PlatformClient, query_body: QueryBody):
"""
Executes the analytics query and returns the response.
"""
analytics_api = AnalyticsApi(platform_client)
try:
response = analytics_api.post_analytics_conversations_details_query(body=query_body)
return response
except ApiException as e:
print(f"Exception when calling AnalyticsApi->post_analytics_conversations_details_query: {e}\n")
print(f"Response Body: {e.body}")
raise
def process_analytics_results(response):
"""
Parses the analytics response and prints a summary table.
"""
if not response or not response.results:
print("No results found.")
return
print(f"{'Queue Name':<20} | {'Media Type':<10} | {'Conversations':<12} | {'Handled':<10} | {'Abandoned':<10} | {'Avg Handle (s)':<12}")
print("-" * 80)
for result in response.results:
queue_name = "Unknown"
media_type = "Unknown"
if result.group_by_values:
for group in result.group_by_values:
if group.name == "queue" and group.values:
queue_name = group.values[0].name
elif group.name == "mediaType" and group.values:
media_type = group.values[0].name
metrics = result.metrics
conversation_count = metrics.get("conversationCount", {}).get("value", 0)
handled_count = metrics.get("handledCount", {}).get("value", 0)
abandoned_count = metrics.get("abandonedCount", {}).get("value", 0)
avg_handle_time = metrics.get("averageHandleTime", {}).get("value", 0)
print(f"{queue_name:<20} | {media_type:<10} | {conversation_count:<12} | {handled_count:<10} | {abandoned_count:<10} | {avg_handle_time:<12.2f}")
def main():
try:
# 1. Initialize Client
platform_client = get_platform_client()
# 2. Define Time Interval
start_time, end_time = get_time_interval(days_back=7)
print(f"Querying analytics data from {start_time} to {end_time}")
# 3. Build Query
query_body = build_analytics_query(start_time, end_time)
# 4. Execute Query
response = execute_analytics_query(platform_client, query_body)
# 5. Process Results
process_analytics_results(response)
except Exception as e:
print(f"An error occurred: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
Common Errors & Debugging
Error: 401 Unauthorized
Cause: The OAuth token is invalid, expired, or the client credentials are incorrect.
Fix:
- Verify that
GENESYS_CLIENT_IDandGENESYS_CLIENT_SECRETare set correctly. - Ensure the client has the
analytics:query:readscope. You can check this in the Genesys Cloud Admin Console under Developers > OAuth Clients. - If using the SDK, ensure you are not sharing the
PlatformClientinstance across multiple threads without proper locking, as the token cache is not thread-safe by default.
Error: 400 Bad Request - Invalid Interval
Cause: The interval string is malformed or the time range is too large.
Fix:
- Ensure the interval format is
start/end(e.g.,2023-10-01T00:00:00.000Z/2023-10-08T00:00:00.000Z). - The maximum time range for a single query is 90 days. If you need more data, you must paginate using multiple queries.
- Ensure the start time is before the end time.
Error: 429 Too Many Requests
Cause: You have exceeded the rate limit for the Analytics API.
Fix:
- Implement exponential backoff and retry logic.
- The Analytics API has a lower rate limit than other APIs. Do not fire queries in a tight loop.
- Use the
Retry-Afterheader in the response to determine how long to wait.
import time
def execute_with_retry(platform_client, query_body, max_retries=3):
analytics_api = AnalyticsApi(platform_client)
for attempt in range(max_retries):
try:
return analytics_api.post_analytics_conversations_details_query(body=query_body)
except ApiException as e:
if e.status == 429:
wait_time = 2 ** attempt # Exponential backoff
print(f"Rate limited. Waiting {wait_time} seconds...")
time.sleep(wait_time)
else:
raise
raise Exception("Max retries exceeded")
Error: Empty Results
Cause: No conversations match the query criteria.
Fix:
- Verify that the time interval contains data. Try expanding the date range.
- Ensure that the queues you are querying have active conversations.
- Check if the
viewis correct. Thedefaultview is usually sufficient, but custom views may have different data availability.