Constructing a Genesys Cloud Analytics Aggregation Query by Queue and Media Type
What You Will Build
- One sentence: You will build a script that queries the Genesys Cloud Analytics API to retrieve conversation volume metrics aggregated by specific queues and media types (voice, chat, email).
- One sentence: This tutorial uses the Genesys Cloud
analytics/conversations/details/queryendpoint via the Python SDK. - One sentence: The code is written in Python 3.9+ using the
genesyscloudSDK andhttpxfor underlying request handling.
Prerequisites
- OAuth Client Type: Service Account (Client Credentials Grant) or Resource Owner Password Credentials.
- Required Scopes:
analytics:conversation:read,analytics:report:read. - SDK Version:
genesyscloudPython SDK >= 140.0.0. - Runtime Requirements: Python 3.9 or higher.
- Dependencies:
pip install genesyscloud requests(the SDK relies onrequestsfor HTTP transport).
Authentication Setup
Genesys Cloud APIs require OAuth 2.0 authentication. For backend integrations and analytics queries, the Client Credentials Grant is the standard pattern. This flow exchanges a Client ID and Client Secret for an access token.
The Python SDK handles token management automatically if you configure the PlatformClient correctly. However, understanding the underlying token lifecycle is critical for debugging 401 Unauthorized errors.
import os
from purecloudplatformclientv2 import PlatformClient
# Environment variables must be set before running
# GENESYS_CLOUD_REGION: e.g., 'mypurecloud.com', 'usw2.pure.cloud', 'au02.pure.cloud'
# GENESYS_CLOUD_CLIENT_ID: Your OAuth Client ID
# GENESYS_CLOUD_CLIENT_SECRET: Your OAuth Client Secret
def get_platform_client() -> PlatformClient:
"""
Initializes and returns an authenticated PlatformClient instance.
"""
region = os.getenv("GENESYS_CLOUD_REGION")
client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
if not all([region, client_id, client_secret]):
raise ValueError("Missing required environment variables: REGION, CLIENT_ID, CLIENT_SECRET")
# The PlatformClient automatically handles token acquisition and refresh
# when using the client credentials flow.
platform_client = PlatformClient(
region=region,
client_id=client_id,
client_secret=client_secret
)
return platform_client
Implementation
Step 1: Define the Analytics Query Body
The core of this tutorial is constructing the body parameter for the post_analytics_conversations_details_query method. The Genesys Cloud Analytics API uses a declarative query structure. You define what you want (metrics), how you want it grouped (dimensions), and when it occurred (date range).
To group by Queue and Media Type, you must include queue.id and mediaType in the groupBy array.
from purecloudplatformclientv2 import PostAnalyticsConversationsDetailsQueryRequestBody
from purecloudplatformclientv2 import ConversationDetailsQuery
def build_query_body() -> PostAnalyticsConversationsDetailsQueryRequestBody:
"""
Constructs the request body for the Analytics API.
Returns:
PostAnalyticsConversationsDetailsQueryRequestBody: The configured query object.
"""
# Define the date range (ISO 8601 format)
# Example: Last 7 days
from datetime import datetime, timedelta
end_time = datetime.utcnow().isoformat()
start_time = (datetime.utcnow() - timedelta(days=7)).isoformat()
# Define the metrics you want to aggregate
# 'count' is the number of conversations
# 'talkTime' is the total time agents spent talking (in seconds)
metrics = ["count", "talkTime"]
# Define the dimensions to group by
# 'queue.id' groups results by Queue ID
# 'mediaType' groups results by Voice, Chat, Email, etc.
group_by = ["queue.id", "mediaType"]
# Define filters to narrow down the data
# Here we filter for active queues only.
# You can add more filters like 'queue.name' if needed.
filters = {
"queue.id": {
"type": "in",
"values": [] # We will populate this in Step 2
}
}
# Construct the query object
query = ConversationDetailsQuery(
date_from=start_time,
date_to=end_time,
group_by=group_by,
metrics=metrics,
filters=filters
)
# Wrap in the request body class
request_body = PostAnalyticsConversationsDetailsQueryRequestBody(
query=query
)
return request_body
Step 2: Resolve Queue IDs
The groupBy dimension queue.id returns results keyed by the internal Queue ID (e.g., a1b2c3d4-e5f6-7890-1234-567890abcdef). To make the output human-readable, you often need to map these IDs to Queue Names. While the Analytics API can return some display names in certain contexts, it is robust practice to fetch the list of queues separately if you need precise naming control, or simply handle the ID mapping in your post-processing logic.
For this tutorial, we will assume you want to query all queues. If you need to filter by specific queues, you must first retrieve their IDs.
from purecloudplatformclientv2 import QueueApi
from purecloudplatformclientv2 import PaginationConfiguration
def get_all_queue_ids(platform_client: PlatformClient) -> list:
"""
Retrieves a list of all active Queue IDs.
Args:
platform_client: Authenticated PlatformClient instance.
Returns:
list: A list of Queue ID strings.
"""
queue_api = QueueApi(platform_client)
queue_ids = []
# Pagination configuration for fetching queues
pagination_config = PaginationConfiguration(
page_size=100,
max_pages=10 # Adjust based on expected number of queues
)
try:
# Fetch all queues
for page in queue_api.get_queues(pagination_config=pagination_config):
for queue in page.entities:
# Filter for active queues only
if queue.status == "ACTIVE":
queue_ids.append(queue.id)
except Exception as e:
print(f"Error fetching queues: {e}")
raise
return queue_ids
Step 3: Execute the Query and Handle Pagination
The post_analytics_conversations_details_query endpoint returns a ConversationDetailsQueryResponse. This response contains a partitions array. Each partition represents a subset of the aggregated data. If the result set is large, Genesys Cloud splits it into multiple partitions. You must iterate through all partitions to get the complete dataset.
from purecloudplatformclientv2 import AnalyticsConversationsApi
from purecloudplatformclientv2 import ConversationDetailsQueryResponse
def execute_analytics_query(
platform_client: PlatformClient,
request_body: PostAnalyticsConversationsDetailsQueryRequestBody
) -> list:
"""
Executes the analytics query and flattens the results from all partitions.
Args:
platform_client: Authenticated PlatformClient instance.
request_body: The constructed query body.
Returns:
list: A list of dictionaries containing the aggregated metrics.
"""
analytics_api = AnalyticsConversationsApi(platform_client)
all_results = []
try:
# Execute the query
# The API returns a response with a 'partitions' field
response: ConversationDetailsQueryResponse = analytics_api.post_analytics_conversations_details_query(
body=request_body
)
# Check if partitions exist
if response.partitions:
for partition in response.partitions:
# Each partition contains 'rows' which are the aggregated data points
if partition.rows:
for row in partition.rows:
# Convert the SDK object to a dictionary for easier handling
# The 'group' field contains the values for the groupBy dimensions
# The 'metrics' field contains the values for the requested metrics
result_entry = {
"queue_id": row.group.get("queue.id"),
"media_type": row.group.get("mediaType"),
"conversation_count": row.metrics.get("count"),
"total_talk_time_seconds": row.metrics.get("talkTime")
}
all_results.append(result_entry)
else:
print("No partitions returned. Check date range and filters.")
except Exception as e:
# Handle specific API errors
if hasattr(e, 'status') and e.status == 429:
print("Rate limited (429). Implement exponential backoff.")
elif hasattr(e, 'status') and e.status == 400:
print(f"Bad Request (400). Check query syntax: {e.body}")
else:
print(f"Error executing query: {e}")
raise
return all_results
Complete Working Example
This script combines authentication, queue resolution, query construction, and execution. It outputs a CSV-formatted string of the results.
import os
import csv
import io
from purecloudplatformclientv2 import PlatformClient
from purecloudplatformclientv2 import PostAnalyticsConversationsDetailsQueryRequestBody
from purecloudplatformclientv2 import ConversationDetailsQuery
from purecloudplatformclientv2 import AnalyticsConversationsApi
from purecloudplatformclientv2 import QueueApi
from purecloudplatformclientv2 import PaginationConfiguration
from datetime import datetime, timedelta
def main():
# 1. Authenticate
region = os.getenv("GENESYS_CLOUD_REGION")
client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
if not all([region, client_id, client_secret]):
raise ValueError("Set GENESYS_CLOUD_REGION, CLIENT_ID, CLIENT_SECRET in environment.")
platform_client = PlatformClient(
region=region,
client_id=client_id,
client_secret=client_secret
)
# 2. Get Queue IDs (Optional: Filter specific queues here)
queue_api = QueueApi(platform_client)
queue_ids = []
pagination_config = PaginationConfiguration(page_size=100, max_pages=5)
try:
for page in queue_api.get_queues(pagination_config=pagination_config):
for queue in page.entities:
if queue.status == "ACTIVE":
queue_ids.append(queue.id)
except Exception as e:
print(f"Failed to fetch queues: {e}")
return
if not queue_ids:
print("No active queues found.")
return
# 3. Build Query
end_time = datetime.utcnow().isoformat()
start_time = (datetime.utcnow() - timedelta(days=7)).isoformat()
query = ConversationDetailsQuery(
date_from=start_time,
date_to=end_time,
group_by=["queue.id", "mediaType"],
metrics=["count", "talkTime"],
filters={
"queue.id": {
"type": "in",
"values": queue_ids
}
}
)
request_body = PostAnalyticsConversationsDetailsQueryRequestBody(query=query)
# 4. Execute Query
analytics_api = AnalyticsConversationsApi(platform_client)
try:
response = analytics_api.post_analytics_conversations_details_query(body=request_body)
# 5. Process Results
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["Queue ID", "Media Type", "Conversation Count", "Total Talk Time (s)"])
if response.partitions:
for partition in response.partitions:
if partition.rows:
for row in partition.rows:
queue_id = row.group.get("queue.id", "N/A")
media_type = row.group.get("mediaType", "N/A")
count = row.metrics.get("count", 0)
talk_time = row.metrics.get("talkTime", 0)
writer.writerow([queue_id, media_type, count, talk_time])
print(output.getvalue())
except Exception as e:
print(f"Error: {e}")
if hasattr(e, 'body'):
print(f"Response Body: {e.body}")
if __name__ == "__main__":
main()
Common Errors & Debugging
Error: 400 Bad Request - Invalid Group By Dimension
What causes it:
The groupBy array contains a dimension that is not supported for the requested metrics, or the syntax is incorrect. For example, trying to group by agent.id and queue.id simultaneously in certain metric combinations might be invalid if the metric does not support that granularity.
How to fix it:
Verify that the dimensions listed in groupBy are compatible with the metrics array. queue.id and mediaType are generally safe together. Ensure you are using the correct field names (queue.id, not queueId).
Code showing the fix:
# Incorrect
group_by = ["queueId", "mediaType"]
# Correct
group_by = ["queue.id", "mediaType"]
Error: 429 Too Many Requests
What causes it:
Genesys Cloud APIs enforce rate limits. Analytics queries can be resource-intensive. If you run this script in a loop or during peak hours, you may hit the limit.
How to fix it:
Implement exponential backoff. The Python SDK does not automatically retry 429s for all endpoints. You must catch the exception and retry with a delay.
Code showing the fix:
import time
def execute_with_retry(platform_client, request_body, max_retries=3):
analytics_api = AnalyticsConversationsApi(platform_client)
for attempt in range(max_retries):
try:
return analytics_api.post_analytics_conversations_details_query(body=request_body)
except Exception as e:
if hasattr(e, 'status') and e.status == 429:
wait_time = 2 ** attempt # Exponential backoff: 2s, 4s, 8s
print(f"Rate limited. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
raise
raise Exception("Max retries exceeded")
Error: Empty Results
What causes it:
- The date range is in the future.
- The filters are too restrictive (e.g., filtering by a queue ID that does not exist).
- No conversations occurred in the selected queues during the time period.
How to fix it:
- Check
date_fromanddate_to. Ensuredate_fromis beforedate_to. - Remove filters temporarily to see if data appears.
- Verify that the queue IDs retrieved in Step 2 are active and have received traffic.
Code showing the fix:
# Debugging: Print the query JSON to verify syntax
import json
print(json.dumps(request_body.to_dict(), indent=2))