Constructing an Analytics API Aggregation Query: Grouping by Queue and Media Type
What You Will Build
- This tutorial demonstrates how to construct a robust analytics query that aggregates conversation metrics, grouping results by queue ID and media type.
- The implementation uses the Genesys Cloud CX Analytics API aggregation endpoint /api/v2/analytics/conversations/aggregates/query. The similarly named /api/v2/analytics/conversations/details/query endpoint returns individual conversation records and does not accept groupings or metric lists.
- The code examples are provided in Python, using raw HTTP requests via httpx and the official Genesys Cloud Python SDK (PureCloudPlatformClientV2).
Prerequisites
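- OAuth Client Type: A Client Credentials grant. A Client Credentials client is authorized through the roles assigned to it rather than through scopes; an assigned role needs the analytics permission for conversation aggregates (analytics:conversationAggregate:view).
- SDK: The official Python SDK, published on PyPI as PureCloudPlatformClientV2.
- Language/Runtime: Python 3.10+ (the examples use the list[str] | None annotation syntax).
- External Dependencies:
  - pip install PureCloudPlatformClientV2
  - pip install httpx
  - pip install python-dotenv (for secure credential management)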
Authentication Setup
Genesys Cloud APIs require a valid OAuth 2.0 Bearer token. For backend integrations and analytics queries, the Client Credentials flow is the standard approach. This flow exchanges a Client ID and Client Secret for an access token without user interaction.
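The following Python function handles token acquisition. In a production environment, cache the token and reuse it until it expires instead of requesting a new one for every query, which avoids hitting rate limits on the /oauth/token endpoint. The token response includes an expires_in field that gives the token lifetime in seconds.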
import os

import httpx
from dotenv import load_dotenv

load_dotenv()

# Configuration
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
# Region domain of your organization, e.g., "mypurecloud.com" or "mypurecloud.ie"
ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
# Tokens are issued by the region's login host
OAUTH_URL = f"https://login.{ENVIRONMENT}/oauth/token"


async def get_access_token() -> str:
    """
    Retrieves an OAuth2 access token using Client Credentials flow.

    Returns:
        str: The access token string.

    Raises:
        httpx.HTTPStatusError: If authentication fails (401, 403).
    """
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment variables.")

    async with httpx.AsyncClient() as client:
        try:
            # HTTP Basic auth carries the client ID and secret. No scope is sent:
            # a Client Credentials client is authorized by its assigned roles.
            response = await client.post(
                OAUTH_URL,
                auth=(CLIENT_ID, CLIENT_SECRET),
                data={"grant_type": "client_credentials"},
            )
            response.raise_for_status()
            token_data = response.json()
            return token_data["access_token"]
        except httpx.HTTPStatusError as e:
            print(f"Authentication failed with status {e.response.status_code}: {e.response.text}")
            raise
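The caching requirement mentioned above can be met with a small in-memory cache. The sketch below is one possible approach rather than anything provided by Genesys Cloud: CachedToken, its fetch callable, and the 60-second safety margin are hypothetical names and values chosen for illustration. It assumes the fetch callable returns the token together with its lifetime in seconds, as reported by the expires_in field of an OAuth token response.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Caches an access token until shortly before it expires (illustrative helper)."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        margin_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch            # returns (access_token, lifetime_in_seconds)
        self._margin = margin_seconds  # refresh this many seconds before expiry
        self._clock = clock            # injectable so the cache can be tested
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return the cached token, fetching a new one only when needed."""
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            token, lifetime = self._fetch()
            self._token = token
            self._expires_at = now + max(lifetime - self._margin, 0.0)
        return self._token


if __name__ == "__main__":
    calls = []

    def fake_fetch() -> Tuple[str, float]:
        # Stand-in for a real token request
        calls.append(1)
        return f"token-{len(calls)}", 3600.0

    cache = CachedToken(fake_fetch)
    print(cache.get(), cache.get(), "fetches:", len(calls))
```

The clock is injected so that expiry can be exercised without waiting. An async variant would follow the same logic with an awaited fetch.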
Implementation
Step 1: Defining the Query Structure
The core of this tutorial is the request body sent to /api/v2/analytics/conversations/aggregates/query. The body is a JSON object that defines the time window, filters, groupings, and metrics.
To group by Queue and Media Type, list both dimensions in the groupBy array. The metrics array names the metrics to aggregate, for example nOffered, tHandle, tAbandon, tTalk, tHeld, and tWait. Each returned metric carries a stats object with count and sum, so a handled count is the count of tHandle, and an average talk time is sum divided by count of tTalk. Durations are reported in milliseconds.
Critical Parameter Explanation:
- interval: The analysis window as an ISO 8601 interval in the form "start/end", in UTC, with an inclusive start and an exclusive end.
- granularity: An optional ISO 8601 duration that sets the time bucket size. Omit it if you want a single total across the entire time window. If you want hourly breakdowns, use "PT1H".
- groupBy: An array of dimension names. Valid values include "queueId", "mediaType", "wrapUpCode", etc.
- filter: An optional object that combines predicates with "and" or "or". A dimension predicate names a dimension, an operator such as "matches", and a single value.
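Because the query window is expressed as UTC ISO 8601 timestamps, it helps to normalize datetimes before building the payload. The helper below is a minimal sketch: to_utc_iso is a hypothetical name, and the millisecond precision with a trailing Z is an assumed formatting choice rather than a documented requirement.

```python
from datetime import datetime, timedelta, timezone


def to_utc_iso(moment: datetime) -> str:
    """Format an aware datetime as a UTC ISO 8601 string with milliseconds and a Z suffix."""
    if moment.tzinfo is None:
        # Naive datetimes are ambiguous, so refuse them instead of guessing a zone
        raise ValueError("expected a timezone-aware datetime")
    utc = moment.astimezone(timezone.utc)
    return utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{utc.microsecond // 1000:03d}Z"


if __name__ == "__main__":
    # 09:30 at UTC+02:00 corresponds to 07:30 UTC
    local = datetime(2024, 3, 1, 9, 30, tzinfo=timezone(timedelta(hours=2)))
    print(to_utc_iso(local))
```

Rejecting naive datetimes is a deliberate choice here: silently treating local time as UTC is a common cause of queries that return no data.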
Here is the construction of the query payload:
from datetime import datetime, timedelta, timezone


def build_analytics_query(queue_ids: list[str] | None = None) -> dict:
    """
    Constructs the JSON payload for the aggregates query
    (/api/v2/analytics/conversations/aggregates/query).

    Args:
        queue_ids: Optional list of queue IDs to filter. If None, all queues are included.

    Returns:
        dict: The query payload.
    """
    # Define time window: last 7 days, in UTC
    end_time = datetime.now(timezone.utc).replace(microsecond=0)
    start_time = end_time - timedelta(days=7)

    # ISO 8601 interval "start/end": inclusive start, exclusive end
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    interval = f"{start_time.strftime(fmt)}/{end_time.strftime(fmt)}"

    # Base query structure. No "granularity" is set, so each group is
    # aggregated into a single bucket covering the whole window.
    query_payload = {
        "interval": interval,
        "groupBy": ["queueId", "mediaType"],
        "metrics": ["nOffered", "tHandle", "tAbandon", "tTalk", "tHeld", "tWait"],
    }

    # Add queue filter if specific queues are requested.
    # Each dimension predicate matches one value, so the queues are OR-ed together.
    if queue_ids:
        query_payload["filter"] = {
            "type": "or",
            "predicates": [
                {
                    "type": "dimension",
                    "dimension": "queueId",
                    "operator": "matches",
                    "value": queue_id,
                }
                for queue_id in queue_ids
            ],
        }
    return query_payload
Step 2: Executing the Query with Raw HTTP
While SDKs are convenient, understanding the raw HTTP request is essential for debugging. The following function sends the query constructed in Step 1.
Note on Rate Limiting: The Analytics API is rate limited. If you receive a 429 Too Many Requests, you must wait before retrying; the Retry-After header states how many seconds to wait. The code below includes a basic retry mechanism for 429s.
import asyncio

# API requests go to the region's api host
ANALYTICS_URL = f"https://api.{ENVIRONMENT}/api/v2/analytics/conversations/aggregates/query"


async def fetch_analytics_data(query_payload: dict, token: str, max_retries: int = 3) -> dict:
    """
    Sends the analytics query to Genesys Cloud.

    Args:
        query_payload: The JSON body for the query.
        token: The OAuth access token.
        max_retries: Number of attempts allowed when 429 errors occur.

    Returns:
        dict: The parsed JSON response.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        for attempt in range(max_retries):
            try:
                response = await client.post(
                    ANALYTICS_URL,
                    headers=headers,
                    json=query_payload,
                )
            except httpx.RequestError as e:
                # Network failures are not retried here; surface them to the caller
                print(f"Network error: {e}")
                raise

            if response.status_code == 200:
                return response.json()
            if response.status_code == 429:
                # Retry-After header is usually present; fall back to exponential backoff
                wait_time = int(response.headers.get("Retry-After", 2 ** attempt))
                print(f"Rate limited. Waiting {wait_time} seconds before retry {attempt + 1}/{max_retries}")
                await asyncio.sleep(wait_time)
                continue

            # Any other status is an error: log the body, then raise httpx.HTTPStatusError
            print(f"Request failed with status {response.status_code}: {response.text}")
            response.raise_for_status()

    raise RuntimeError("Max retries exceeded for analytics query.")
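The retry loop above trusts the Retry-After header to hold an integer. A slightly more defensive way to compute the wait is sketched here; retry_delay and its 60-second cap are hypothetical, and the fallback is plain exponential backoff without jitter.

```python
from typing import Mapping


def retry_delay(headers: Mapping[str, str], attempt: int, cap_seconds: float = 60.0) -> float:
    """Return how many seconds to wait before retrying after attempt number `attempt` (0-based)."""
    raw = headers.get("Retry-After")
    if raw is not None:
        try:
            # Honor the server's hint when it is a plain number of seconds
            return min(max(float(raw), 0.0), cap_seconds)
        except ValueError:
            pass  # e.g., an HTTP-date; fall back to exponential backoff
    # 1, 2, 4, 8, ... seconds, never more than the cap
    return min(float(2 ** attempt), cap_seconds)


if __name__ == "__main__":
    print(retry_delay({"Retry-After": "5"}, attempt=0))
    print(retry_delay({}, attempt=3))
```

The cap keeps a script from sleeping for an unexpectedly long time if the header carries a large value; whether to cap at all is a judgment call for your workload.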
Step 3: Processing Results with the SDK
The raw JSON response from the Analytics API can be deeply nested. The Genesys Cloud Python SDK provides model classes that map directly to this structure, so you can iterate over results with attribute access instead of manual dictionary lookups.
The response object contains a results list. Each item in this list represents a unique combination of the groupBy dimensions (Queue ID + Media Type). Its group attribute holds the dimension values, and its data attribute holds one entry per time bucket, each with a metrics list whose stats expose count and sum.
from datetime import datetime, timedelta, timezone

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException


def process_analytics_with_sdk(token: str) -> list[dict]:
    """
    Uses the official SDK to fetch and parse analytics data.

    Args:
        token: OAuth access token.

    Returns:
        list[dict]: A simplified list of analytics records.
    """
    # Configure the SDK client
    PureCloudPlatformClientV2.configuration.host = f"https://api.{ENVIRONMENT}"
    PureCloudPlatformClientV2.configuration.access_token = token
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi()

    # Time window: last 7 days, as an ISO 8601 "start/end" interval in UTC
    end_time = datetime.now(timezone.utc).replace(microsecond=0)
    start_time = end_time - timedelta(days=7)
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"

    # Construct the query using the SDK model
    query = PureCloudPlatformClientV2.ConversationAggregationQuery()
    query.interval = f"{start_time.strftime(fmt)}/{end_time.strftime(fmt)}"
    query.group_by = ["queueId", "mediaType"]
    query.metrics = ["tHandle", "tAbandon", "tTalk"]

    try:
        # Execute the query
        api_response = analytics_api.post_analytics_conversations_aggregates_query(query)
    except ApiException as e:
        print(f"SDK Error: {e}")
        raise

    simplified_results = []
    for result_item in api_response.results or []:
        group = result_item.group or {}
        total_handled = 0
        total_abandoned = 0
        talk_sum_ms = 0.0
        talk_count = 0
        # One data entry per time bucket; sum across buckets
        for bucket in result_item.data or []:
            for metric in bucket.metrics or []:
                stats = metric.stats
                if stats is None:
                    continue
                if metric.metric == "tHandle":
                    total_handled += stats.count or 0
                elif metric.metric == "tAbandon":
                    total_abandoned += stats.count or 0
                elif metric.metric == "tTalk":
                    talk_sum_ms += stats.sum or 0.0
                    talk_count += stats.count or 0

        simplified_results.append({
            "queue_id": group.get("queueId"),
            "media_type": group.get("mediaType"),
            "total_handled": total_handled,
            "total_abandoned": total_abandoned,
            # Durations are reported in milliseconds
            "avg_talk_time_seconds": (talk_sum_ms / talk_count / 1000) if talk_count else 0.0,
        })
    return simplified_results
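The same flattening can be done on the raw JSON without the SDK. The sketch below assumes the aggregates response shape, in which each result has a group mapping and a data list whose metrics carry stats.count and stats.sum, with durations in milliseconds. The summarize_results name and the SAMPLE payload are made up for illustration and do not come from a real organization.

```python
def summarize_results(response: dict) -> list[dict]:
    """Flatten an aggregates-style response into one row per (queue, media type) group."""
    rows = []
    for result in response.get("results") or []:
        group = result.get("group") or {}
        counts: dict[str, float] = {}
        sums: dict[str, float] = {}
        # Add up count and sum per metric across all time buckets
        for bucket in result.get("data") or []:
            for metric in bucket.get("metrics") or []:
                name = metric.get("metric")
                stats = metric.get("stats") or {}
                counts[name] = counts.get(name, 0) + (stats.get("count") or 0)
                sums[name] = sums.get(name, 0.0) + (stats.get("sum") or 0.0)
        talk_count = counts.get("tTalk", 0)
        rows.append({
            "queue_id": group.get("queueId"),
            "media_type": group.get("mediaType"),
            "handled": counts.get("tHandle", 0),
            # Convert the millisecond average to seconds; guard against division by zero
            "avg_talk_seconds": (sums.get("tTalk", 0.0) / talk_count / 1000) if talk_count else 0.0,
        })
    return rows


# Hand-written sample in the assumed response shape
SAMPLE = {
    "results": [
        {
            "group": {"queueId": "q-1", "mediaType": "voice"},
            "data": [
                {
                    "interval": "2024-03-01T00:00:00.000Z/2024-03-08T00:00:00.000Z",
                    "metrics": [
                        {"metric": "tHandle", "stats": {"count": 4, "sum": 480000}},
                        {"metric": "tTalk", "stats": {"count": 4, "sum": 360000}},
                    ],
                }
            ],
        }
    ]
}

if __name__ == "__main__":
    for row in summarize_results(SAMPLE):
        print(row)
```

Summing count and sum separately before dividing keeps the average correct when a granularity splits the window into several buckets; averaging the per-bucket averages would not.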
Complete Working Example
The following script combines authentication, query construction, and result processing into a single runnable module. It uses the raw HTTP approach for maximum transparency but can be adapted to the SDK pattern shown above.
import asyncio
import os
from datetime import datetime, timedelta, timezone

import httpx
from dotenv import load_dotenv

load_dotenv()

# --- Configuration ---
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
# Region domain of your organization, e.g., "mypurecloud.com"
ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
OAUTH_URL = f"https://login.{ENVIRONMENT}/oauth/token"
ANALYTICS_URL = f"https://api.{ENVIRONMENT}/api/v2/analytics/conversations/aggregates/query"


# --- Authentication ---
async def get_access_token() -> str:
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("Missing Credentials")
    async with httpx.AsyncClient() as client:
        response = await client.post(
            OAUTH_URL,
            auth=(CLIENT_ID, CLIENT_SECRET),
            data={"grant_type": "client_credentials"},
        )
        response.raise_for_status()
        return response.json()["access_token"]


# --- Query Construction ---
def build_query() -> dict:
    end_time = datetime.now(timezone.utc).replace(microsecond=0)
    start_time = end_time - timedelta(days=7)
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    return {
        # ISO 8601 "start/end" interval; no granularity means one bucket per group
        "interval": f"{start_time.strftime(fmt)}/{end_time.strftime(fmt)}",
        "groupBy": ["queueId", "mediaType"],
        "metrics": ["tHandle", "tAbandon", "tTalk", "tHeld"],
    }


# --- Result Processing ---
def metric_totals(item: dict) -> dict:
    """Adds up count and sum per metric across all time buckets of one result."""
    totals: dict = {}
    for bucket in item.get("data", []):
        for m in bucket.get("metrics", []):
            stats = m.get("stats", {})
            entry = totals.setdefault(m["metric"], {"count": 0, "sum": 0.0})
            entry["count"] += stats.get("count", 0)
            entry["sum"] += stats.get("sum", 0.0)
    return totals


# --- Execution ---
async def run_analytics_report():
    try:
        # 1. Get Token
        print("Authenticating...")
        token = await get_access_token()

        # 2. Build Query
        query_payload = build_query()
        print(f"Query Interval: {query_payload['interval']}")
        print(f"Group By: {query_payload['groupBy']}")

        # 3. Execute Query
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(ANALYTICS_URL, headers=headers, json=query_payload)

        if response.status_code != 200:
            print(f"API Error {response.status_code}: {response.text}")
            return

        # 4. Process Results
        data = response.json()
        print("\n--- Analytics Results ---")
        print(f"{'Queue ID':<35} | {'Media Type':<10} | {'Handled':<8} | {'Abandoned':<10} | {'Avg Talk (s)':<12}")
        print("-" * 80)

        results = data.get("results") or []
        if not results:
            print("No results found for the specified criteria.")
            return

        for item in results:
            group = item.get("group", {})
            queue_id = group.get("queueId", "N/A")
            media_type = group.get("mediaType", "N/A")
            totals = metric_totals(item)
            handled = totals.get("tHandle", {}).get("count", 0)
            abandoned = totals.get("tAbandon", {}).get("count", 0)
            talk = totals.get("tTalk", {"count": 0, "sum": 0.0})
            # Durations are reported in milliseconds
            avg_talk = talk["sum"] / talk["count"] / 1000 if talk["count"] else 0.0
            print(f"{queue_id:<35} | {media_type:<10} | {handled:<8} | {abandoned:<10} | {avg_talk:<12.2f}")
    except Exception as e:
        print(f"Fatal Error: {e}")


if __name__ == "__main__":
    asyncio.run(run_analytics_report())
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token is missing, malformed, or expired.
- Fix: Ensure your client_id and client_secret are correct and that the token is requested from the login host of your organization's region. Tokens expire; for long-running scripts, request a new token once the lifetime reported in expires_in has elapsed.
Error: 403 Forbidden
- Cause: The OAuth client lacks permission to view analytics data for the specified queues or the tenant.
- Fix: In Genesys Cloud Admin, open the OAuth client and review the roles assigned to it. At least one role must include the analytics permission for conversation aggregates (analytics:conversationAggregate:view), and the divisions granted with that role must cover the queues you are filtering by.
Error: 429 Too Many Requests
- Cause: You have exceeded the rate limit for the Analytics API. Genesys Cloud enforces strict limits on analytics queries to protect database performance.
- Fix: Implement exponential backoff. Check the Retry-After header in the response. Do not fire multiple analytics queries in parallel without staggering them. Cache results if the data does not need to be real-time.
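One way to stagger queries is to run them one at a time with a fixed pause between them. The sketch below is illustrative only: run_staggered, the one-second default pause, and the injectable sleep parameter are assumptions, and the job callables stand in for calls to your own query function.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


async def run_staggered(
    jobs: Sequence[Callable[[], Awaitable[object]]],
    pause_seconds: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> list:
    """Run async jobs sequentially, pausing between consecutive jobs."""
    results = []
    for index, job in enumerate(jobs):
        if index > 0:
            await sleep(pause_seconds)  # no pause before the first job
        results.append(await job())
    return results


if __name__ == "__main__":
    async def fake_query() -> str:
        # Stand-in for a real analytics request
        return "ok"

    print(asyncio.run(run_staggered([fake_query, fake_query], pause_seconds=0.1)))
```

Sequential execution is the simplest option; if throughput matters more, a bounded semaphore with a smaller pause is a reasonable alternative.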
Error: Empty Results
- Cause: The time window is too narrow, or the filters are too restrictive.
- Fix: Verify the start and end times of the query window. Ensure the timezone is correct (UTC is required for the API). If filtering by queueId, confirm that the queue IDs exist and had activity during the specified period. Check that mediaType matches actual data (e.g., "voice", "chat", "callback").