Constructing Analytics Aggregation Queries Grouped by Queue and Media Type
What You Will Build
- You will build a Python script that queries the Genesys Cloud Analytics API to retrieve conversation metrics aggregated by queue and media type.
- This tutorial uses the Genesys Cloud
analyticsdomain, specifically thePOST /api/v2/analytics/conversations/details/queryendpoint. - The implementation is written in Python using the
requestslibrary for HTTP handling andjsonfor payload construction.
Prerequisites
- OAuth Client Type: Service Account (Client Credentials Grant) or User Agent (Authorization Code Grant). Service Account is recommended for background aggregation jobs.
- Required Scopes:
analytics:conversation:view(Required to query conversation details)analytics:report:view(Optional, if you plan to save this as a saved report later, but not required for the query itself)
- SDK/Library Version: This tutorial uses raw HTTP via
requeststo expose the exact JSON structure required by the API. If using the official Python SDK (genesyscloudv13+), the underlying JSON structure remains identical. - Runtime Requirements: Python 3.8+
- External Dependencies:
requests(for HTTP calls)python-dateutil(for parsing ISO dates, though standard librarydatetimeis sufficient for this example)
Authentication Setup
Genesys Cloud APIs require a valid Bearer token in the Authorization header. For automated scripts, the Client Credentials Grant is the standard pattern. You must obtain a token from the OAuth2 endpoint before making any API calls.
The following function handles token retrieval and basic caching. In a production environment, you should implement a TTL (Time-To-Live) cache to avoid requesting a new token on every single API call, as tokens are valid for one hour.
import requests
import json
import time
from typing import Optional
class GenesysClient:
def __init__(self, client_id: str, client_secret: str, org_id: str, base_url: str = "https://api.mypurecloud.com"):
self.client_id = client_id
self.client_secret = client_secret
self.org_id = org_id
self.base_url = base_url
self.token_url = f"{base_url}/oauth/token"
self._access_token: Optional[str] = None
self._token_expiry: float = 0.0
def _get_token(self) -> str:
"""
Retrieves an OAuth2 access token using Client Credentials Grant.
Returns:
str: The access token.
"""
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
# Note: In a real production script, check if self._access_token is still valid
# against self._token_expiry before making this network call.
try:
response = requests.post(self.token_url, data=data)
response.raise_for_status()
token_data = response.json()
self._access_token = token_data["access_token"]
self._token_expiry = time.time() + token_data["expires_in"]
return self._access_token
except requests.exceptions.HTTPError as e:
if response.status_code == 401:
raise ValueError("Invalid Client ID or Secret. Check your credentials.") from e
raise e
def _get_headers(self) -> dict:
"""
Returns the headers required for all Genesys Cloud API calls.
"""
if not self._access_token or time.time() >= self._token_expiry:
self._get_token()
return {
"Authorization": f"Bearer {self._access_token}",
"Content-Type": "application/json",
"Accept": "application/json"
}
Implementation
Step 1: Constructing the Analytics Query Payload
The core of this tutorial is constructing the correct JSON payload for the POST /api/v2/analytics/conversations/details/query endpoint. This endpoint uses a complex nested structure to define time ranges, groupings, and metrics.
To group by Queue and Media Type, you must define these entities in the groupBy array. Genesys Cloud analytics allows grouping by multiple dimensions, but the order matters for the resulting JSON structure. The first item in groupBy becomes the primary key, and subsequent items become nested keys.
Critical Parameters:
dateFrom/dateTo: Must be in ISO 8601 format.groupBy: An array of strings. Use"queue"and"mediaType".metrics: An array of metric IDs. For a basic volume check, useconversationsHandled. For wait times, usewaitTime.interval: Defines the time resolution. Use"P1D"for daily,"PT1H"for hourly, or"PT15M"for 15-minute intervals.
def build_aggregation_query(date_from: str, date_to: str, interval: str = "P1D") -> dict:
"""
Constructs the analytics query payload.
Args:
date_from: Start date in ISO 8601 format (e.g., "2023-10-01T00:00:00.000Z")
date_to: End date in ISO 8601 format (e.g., "2023-10-02T00:00:00.000Z")
interval: Time interval string. Default is P1D (Daily).
Returns:
dict: The JSON payload ready for POST request.
"""
payload = {
"dateFrom": date_from,
"dateTo": date_to,
"interval": interval,
"groupBy": [
"queue",
"mediaType"
],
"metrics": [
"conversationsHandled",
"waitTime",
"handleTime"
],
"filter": {
"type": "and",
"clauses": []
# Optional: Add filters here if you want to exclude specific queues or media types
# Example:
# {
# "type": "in",
# "field": "queueId",
# "values": ["queue-id-1", "queue-id-2"]
# }
}
}
return payload
Step 2: Executing the Query and Handling Pagination
The Analytics API does not return all data in a single response if the dataset is large. It uses pagination via the nextPageToken field. You must loop through responses until nextPageToken is null or empty.
Additionally, the Analytics API is subject to rate limiting. A 429 status code requires an exponential backoff strategy.
import time
def fetch_analytics_data(client: GenesysClient, query_payload: dict) -> list:
"""
Executes the analytics query and handles pagination.
Args:
client: The GenesysClient instance.
query_payload: The dictionary payload from build_aggregation_query.
Returns:
list: A list of all response objects from the API.
"""
url = f"{client.base_url}/api/v2/analytics/conversations/details/query"
headers = client._get_headers()
all_results = []
page_token = None
max_retries = 3
while True:
# Prepare request data
data = query_payload.copy()
# Add pagination token if it exists
if page_token:
data["pageToken"] = page_token
retries = 0
while retries < max_retries:
try:
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
result = response.json()
all_results.append(result)
# Check for next page
page_token = result.get("nextPageToken")
if not page_token:
return all_results
# Small delay to be respectful of rate limits between pages
time.sleep(0.5)
break # Break retry loop, continue pagination loop
elif response.status_code == 429:
# Rate limit hit
retries += 1
wait_time = 2 ** retries
print(f"Rate limited. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
elif response.status_code in [401, 403]:
# Token expired or insufficient permissions
client._access_token = None # Force token refresh
headers = client._get_headers()
retries += 1
else:
response.raise_for_status()
except requests.exceptions.RequestException as e:
retries += 1
if retries >= max_retries:
raise Exception(f"Failed to fetch analytics data after {max_retries} retries: {e}")
time.sleep(2 ** retries)
return all_results
Step 3: Processing the Nested GroupBy Structure
When you group by multiple fields (queue and mediaType), the response structure is nested. The partitions object in the response will have keys representing the first group-by field (Queue IDs). Each Queue ID key will contain another object with keys representing the second group-by field (Media Types).
Response Structure Example:
{
"partitions": {
"queue-id-123": {
"voice": {
"metrics": {
"conversationsHandled": { "value": 150 }
}
},
"chat": {
"metrics": {
"conversationsHandled": { "value": 45 }
}
}
},
"queue-id-456": {
"voice": {
"metrics": {
"conversationsHandled": { "value": 200 }
}
}
}
}
}
The following function flattens this nested structure into a list of dictionaries, which is easier to export to CSV or load into a DataFrame.
def flatten_analytics_results(all_results: list) -> list:
"""
Flattens the nested partitions from the analytics API response.
Args:
all_results: List of response dictionaries from fetch_analytics_data.
Returns:
list: List of dictionaries with flat keys: queueId, mediaType, metricName, value.
"""
flat_data = []
for result in all_results:
partitions = result.get("partitions", {})
# Iterate over Queue IDs (First GroupBy)
for queue_id, queue_data in partitions.items():
# queue_data is a dict of mediaType -> metrics
# Iterate over Media Types (Second GroupBy)
for media_type, media_data in queue_data.items():
metrics = media_data.get("metrics", {})
# Iterate over Metrics
for metric_name, metric_data in metrics.items():
value = metric_data.get("value", 0)
flat_data.append({
"queueId": queue_id,
"mediaType": media_type,
"metricName": metric_name,
"value": value,
"interval": result.get("interval", "unknown")
})
return flat_data
Complete Working Example
The following script combines all previous steps into a single runnable file. Replace the placeholder credentials with your actual Genesys Cloud Service Account details.
import requests
import json
import time
from typing import Optional, List, Dict
# ==========================
# Configuration
# ==========================
CLIENT_ID = "your-client-id-here"
CLIENT_SECRET = "your-client-secret-here"
ORG_ID = "your-org-id-here"
BASE_URL = "https://api.mypurecloud.com"
# Date range for the query (ISO 8601)
# Example: Last 24 hours
from datetime import datetime, timedelta
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=1)
DATE_FROM = start_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
DATE_TO = end_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
# ==========================
# Genesys Client Class
# ==========================
class GenesysClient:
def __init__(self, client_id: str, client_secret: str, org_id: str, base_url: str):
self.client_id = client_id
self.client_secret = client_secret
self.org_id = org_id
self.base_url = base_url
self.token_url = f"{base_url}/oauth/token"
self._access_token: Optional[str] = None
self._token_expiry: float = 0.0
def _get_token(self) -> str:
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
response = requests.post(self.token_url, data=data)
response.raise_for_status()
token_data = response.json()
self._access_token = token_data["access_token"]
self._token_expiry = time.time() + token_data["expires_in"]
return self._access_token
def _get_headers(self) -> dict:
if not self._access_token or time.time() >= self._token_expiry:
self._get_token()
return {
"Authorization": f"Bearer {self._access_token}",
"Content-Type": "application/json",
"Accept": "application/json"
}
# ==========================
# Logic Functions
# ==========================
def build_aggregation_query(date_from: str, date_to: str, interval: str = "P1D") -> dict:
return {
"dateFrom": date_from,
"dateTo": date_to,
"interval": interval,
"groupBy": [
"queue",
"mediaType"
],
"metrics": [
"conversationsHandled",
"waitTime",
"handleTime"
],
"filter": {
"type": "and",
"clauses": []
}
}
def fetch_analytics_data(client: GenesysClient, query_payload: dict) -> list:
url = f"{client.base_url}/api/v2/analytics/conversations/details/query"
headers = client._get_headers()
all_results = []
page_token = None
max_retries = 3
while True:
data = query_payload.copy()
if page_token:
data["pageToken"] = page_token
retries = 0
while retries < max_retries:
try:
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
result = response.json()
all_results.append(result)
page_token = result.get("nextPageToken")
if not page_token:
return all_results
time.sleep(0.5)
break
elif response.status_code == 429:
retries += 1
wait_time = 2 ** retries
print(f"Rate limited. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
elif response.status_code in [401, 403]:
client._access_token = None
headers = client._get_headers()
retries += 1
else:
response.raise_for_status()
except requests.exceptions.RequestException as e:
retries += 1
if retries >= max_retries:
raise Exception(f"Failed to fetch analytics data: {e}")
time.sleep(2 ** retries)
return all_results
def flatten_analytics_results(all_results: list) -> list:
flat_data = []
for result in all_results:
partitions = result.get("partitions", {})
for queue_id, queue_data in partitions.items():
for media_type, media_data in queue_data.items():
metrics = media_data.get("metrics", {})
for metric_name, metric_data in metrics.items():
value = metric_data.get("value", 0)
flat_data.append({
"queueId": queue_id,
"mediaType": media_type,
"metricName": metric_name,
"value": value,
"interval": result.get("interval", "unknown")
})
return flat_data
# ==========================
# Main Execution
# ==========================
if __name__ == "__main__":
print("Initializing Genesys Client...")
client = GenesysClient(CLIENT_ID, CLIENT_SECRET, ORG_ID, BASE_URL)
print("Building Query...")
query = build_aggregation_query(DATE_FROM, DATE_TO, interval="P1D")
print(f"Fetching data from {DATE_FROM} to {DATE_TO}...")
try:
raw_results = fetch_analytics_data(client, query)
print(f"Received {len(raw_results)} page(s) of data.")
print("Flattening results...")
flat_data = flatten_analytics_results(raw_results)
print(f"Total rows extracted: {len(flat_data)}")
# Output sample data
if flat_data:
print("\nSample Data (First 5 rows):")
print(json.dumps(flat_data[:5], indent=2))
else:
print("No data found for the specified criteria.")
except Exception as e:
print(f"Error: {e}")
Common Errors & Debugging
Error: 401 Unauthorized
Cause: The OAuth token is invalid, expired, or the Client ID/Secret is incorrect.
Fix: Ensure your CLIENT_ID and CLIENT_SECRET are correct. Check that the Service Account has not been disabled. In the code, the _get_token method handles refresh, but if the credentials themselves are wrong, it will throw a 401 on the token endpoint.
Error: 403 Forbidden
Cause: The Service Account lacks the required scope analytics:conversation:view.
Fix: Go to the Genesys Cloud Admin Console → Platform → Integrations → [Your Integration]. Edit the OAuth Scopes and ensure analytics:conversation:view is checked. Save the integration and regenerate the Client Secret if prompted.
Error: 429 Too Many Requests
Cause: You have exceeded the rate limit for the Analytics API. Analytics queries are computationally expensive.
Fix: The code includes a retry mechanism with exponential backoff. If you still see 429s, increase the time.sleep duration between requests or reduce the frequency of your queries. Do not parallelize analytics queries aggressively.
Error: Empty Partitions
Cause: The query returned successfully, but partitions is empty.
Fix:
- Check the
dateFromanddateTorange. Ensure it is not in the future. - Ensure the
intervalis appropriate. If you query a 1-hour range withP1D(Daily), you may get aggregated data, but if there is no data in that day, it will be empty. - Verify that the queues you expect to see actually had conversations in that media type during that time.
Error: KeyError in Flatten Function
Cause: The API response structure changed, or the metric requested does not exist for a specific partition.
Fix: The flatten function uses .get() methods with default values (e.g., metrics.get("metrics", {})). If you encounter a KeyError, inspect the raw result JSON. Sometimes, if a queue has no data for a specific media type, that media type key might be omitted entirely from the nested object. The current code handles this by iterating over existing keys only.