Constructing Analytics API Aggregation Queries Grouped by Queue and Media Type
What You Will Build
- A Python script that queries the Genesys Cloud CX Analytics API to retrieve conversation metrics aggregated by specific queue and media type.
- The solution uses the POST /api/v2/analytics/conversations/aggregates/query endpoint, because the filtering and grouping requirements are too complex to express as GET parameters.
- The implementation is written in Python 3.9+ using the requests library for HTTP communication and standard library modules for JSON processing.
Prerequisites
- OAuth Client Type: Machine-to-Machine (MTM) client credentials.
- Required Scopes: analytics:conversation:read is mandatory for accessing conversation details and aggregate metrics.
- SDK/API Version: Genesys Cloud CX REST API v2.
- Language/Runtime: Python 3.9 or higher.
- External Dependencies: requests (v2.28+). Install via pip install requests.
Authentication Setup
Genesys Cloud CX uses OAuth 2.0 for authentication. For backend integrations and analytics queries, the Client Credentials flow is the standard approach. This flow exchanges your client ID and client secret for a short-lived access token.
The following class handles the token exchange. It caches the token and requests a new one five minutes before expiry, taking the lifetime from the expires_in field of the token response and assuming one hour if that field is missing.
import time
from typing import Optional

import requests


class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, base_url: str = "https://api.us.genesyscloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url
        self.token_url = f"{base_url}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0

    def get_access_token(self) -> str:
        # Reuse the cached token until five minutes before it expires
        if self.access_token and time.time() < (self.token_expiry - 300):
            return self.access_token

        # Exchange client credentials for a token
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }
        try:
            # A timeout keeps a stalled connection from blocking the script forever
            response = requests.post(self.token_url, data=payload, timeout=30)
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            # Fall back to 3600 seconds (1 hour) if the response omits expires_in
            self.token_expiry = time.time() + token_data.get("expires_in", 3600)
            return self.access_token
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                raise Exception("Authentication failed: Invalid client ID or secret.") from e
            elif e.response.status_code == 403:
                raise Exception("Authentication failed: Client lacks permission or is disabled.") from e
            else:
                raise Exception(f"Authentication error: {e}") from e
        except requests.exceptions.RequestException as e:
            raise Exception(f"Network error during authentication: {e}") from e

    def get_headers(self) -> dict:
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.get_access_token()}"
        }
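The caching rule can be exercised without touching the network. The following is a minimal sketch rather than the class above: CachedToken, fetch, and clock are hypothetical names introduced here so that the five-minute refresh buffer can be driven by a fake clock.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Caches a token and refreshes it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],  # returns (token, lifetime in seconds)
        buffer_seconds: float = 300,
        clock: Callable[[], float] = time.time,
    ):
        self._fetch = fetch
        self._buffer = buffer_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._expiry: float = 0.0

    def get(self) -> str:
        # Reuse the cached token while it is still outside the refresh buffer
        if self._token is not None and self._clock() < self._expiry - self._buffer:
            return self._token
        token, lifetime = self._fetch()
        self._token = token
        self._expiry = self._clock() + lifetime
        return token


# A fake clock and a counter stand in for real time and the token endpoint
now = [0.0]
calls = []


def fake_fetch() -> Tuple[str, float]:
    calls.append(1)
    return f"token-{len(calls)}", 3600


cache = CachedToken(fake_fetch, clock=lambda: now[0])
first = cache.get()   # no token yet, so this fetches
now[0] = 3000         # more than 300 s before expiry
second = cache.get()  # served from the cache
now[0] = 3400         # inside the 300 s buffer
third = cache.get()   # fetches again
print(first, second, third, len(calls))
```

Injecting the clock is a design choice made for this sketch; it keeps the expiry logic testable without waiting for real time to pass.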
Implementation
Step 1: Constructing the Query Payload
The Analytics API distinguishes between simple queries (via GET) and complex aggregations (via POST). When grouping by multiple dimensions such as queue and mediaType, use the POST /api/v2/analytics/conversations/aggregates/query endpoint, which accepts a JSON body containing groupBy, interval, metrics, and filter.
The groupBy array defines the dimensions for aggregation. Each dimension object specifies an id (the dimension name) and a type (the dimension type, e.g., queue, mediaType).
from typing import Optional


def build_analytics_query(
    start_time: str,
    end_time: str,
    queue_ids: list[str],
    media_types: Optional[list[str]] = None,
) -> dict:
    """
    Constructs the JSON payload for the Analytics API.

    Args:
        start_time: ISO 8601 start time (e.g., '2023-10-01T00:00:00.000Z')
        end_time: ISO 8601 end time (e.g., '2023-10-02T00:00:00.000Z')
        queue_ids: List of queue IDs to filter by.
        media_types: Optional list of media types (voice, chat, email). Defaults to ['voice'] if not provided.

    Returns:
        dict: The query payload ready for POST.
    """
    if not media_types:
        media_types = ["voice"]

    # Define the dimensions to group by
    group_by = [
        {"id": "queue", "type": "queue"},
        {"id": "mediaType", "type": "mediaType"}
    ]

    # Define the metrics to retrieve
    # 'handledCount' is the number of conversations handled
    # 'waitTime' is the total wait time in milliseconds
    metrics = [
        {"id": "handledCount"},
        {"id": "waitTime"}
    ]

    # Define the time interval
    # 'R/DAILY' means rollup daily. Other options: R/HOURLY, R/WEEKLY
    interval = "R/DAILY"

    # Build the query object. No pageToken is set here: the first request
    # carries none, and the fetch step adds it for later pages.
    query_payload = {
        "groupBy": group_by,
        "interval": interval,
        "metrics": metrics,
        "filter": {
            "type": "and",
            "clauses": [
                {
                    "type": "dimension",
                    "dimension": "queueId",
                    "operator": "in",
                    "values": queue_ids
                },
                {
                    "type": "dimension",
                    "dimension": "mediaType",
                    "operator": "in",
                    "values": media_types
                },
                {
                    "type": "dimension",
                    "dimension": "conversationDateTime",
                    "operator": "ge",
                    "values": [start_time]
                },
                {
                    "type": "dimension",
                    "dimension": "conversationDateTime",
                    "operator": "lt",
                    "values": [end_time]
                }
            ]
        },
        # Limit results to prevent memory overflow on large datasets
        "pageSize": 1000
    }
    return query_payload
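The start_time and end_time arguments are expected as UTC ISO 8601 strings with millisecond precision and a Z suffix, matching the docstring examples. A small helper can produce such a pair; this is a sketch, and to_iso_z and utc_window are hypothetical names that are not part of any Genesys library.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def to_iso_z(moment: datetime) -> str:
    """Formats an aware datetime as UTC with millisecond precision and a Z suffix."""
    utc = moment.astimezone(timezone.utc)
    return utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{utc.microsecond // 1000:03d}Z"


def utc_window(hours: int, end: Optional[datetime] = None) -> Tuple[str, str]:
    """Returns (start_time, end_time) strings covering the `hours` before `end`."""
    end = end or datetime.now(timezone.utc)
    return to_iso_z(end - timedelta(hours=hours)), to_iso_z(end)


# A fixed end time keeps the example reproducible
start_time, end_time = utc_window(24, end=datetime(2023, 10, 2, tzinfo=timezone.utc))
print(start_time, end_time)
```

Both strings derive from the same end value, so the window is exactly the requested length even if the clock ticks between calls.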
Step 2: Executing the Query and Handling Pagination
The Analytics API returns paginated results. The nextPageToken field in the response indicates whether more data is available: send its value back as pageToken on the next request, and keep looping until the response no longer contains a token. The API also enforces rate limits, so a 429 Too Many Requests response should be retried with exponential backoff.
import time

import requests


def fetch_analytics_data(auth: GenesysAuth, query_payload: dict) -> list[dict]:
    """
    Executes the analytics query, handling pagination and rate limiting.

    Args:
        auth: GenesysAuth instance.
        query_payload: The JSON payload from build_analytics_query.

    Returns:
        list: A flat list of all aggregated result rows.
    """
    endpoint = f"{auth.base_url}/api/v2/analytics/conversations/aggregates/query"
    payload = dict(query_payload)   # Shallow copy so the caller's dict is not mutated
    payload.pop("pageToken", None)  # The first request carries no page token
    all_results = []
    max_retries = 3
    base_delay = 1  # Seconds

    while True:
        for attempt in range(max_retries):
            delay = base_delay * (2 ** attempt)
            try:
                # get_headers() is called per attempt so a refreshed token is picked up
                response = requests.post(endpoint, json=payload, headers=auth.get_headers(), timeout=30)
            except requests.exceptions.RequestException as e:
                print(f"Network error: {e}. Retrying in {delay}s...")
                time.sleep(delay)
                continue

            if response.status_code == 200:
                break
            elif response.status_code == 429:
                # Rate limited
                print(f"Rate limited. Waiting {delay} seconds...")
                time.sleep(delay)
            elif response.status_code == 401:
                # Token might have expired: clear the cache so the next attempt refreshes it
                auth.access_token = None
                auth.token_expiry = 0
            elif response.status_code == 400:
                # Bad Request - likely invalid query structure
                raise Exception(f"Bad Request: {response.text}")
            else:
                raise Exception(f"API Error {response.status_code}: {response.text}")
        else:
            # The loop ended without a successful response
            raise Exception("Max retries reached due to rate limiting, expired tokens, or network errors.")

        data = response.json()
        all_results.extend(data.get("rows", []))

        # Check for next page
        page_token = data.get("nextPageToken")
        if not page_token:
            return all_results
        payload["pageToken"] = page_token
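The backoff schedule used above doubles the delay on each attempt. The sketch below isolates that calculation and, as an assumption beyond the original code, prefers a numeric Retry-After header when the server sends one. next_delay is a hypothetical helper name, and max_delay is an arbitrary cap chosen for illustration.

```python
from typing import Mapping, Optional


def next_delay(
    attempt: int,
    headers: Optional[Mapping[str, str]] = None,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> float:
    """Returns the seconds to wait before retry number `attempt` (0-based)."""
    if headers:
        # With a plain dict this lookup is case-sensitive; requests' response
        # headers are case-insensitive, so either spelling works there.
        retry_after = headers.get("Retry-After")
        if retry_after is not None:
            try:
                return min(float(retry_after), max_delay)
            except ValueError:
                pass  # Retry-After can also be an HTTP date; fall back to backoff
    return min(base_delay * (2 ** attempt), max_delay)


schedule = [next_delay(n) for n in range(4)]
print(schedule)
print(next_delay(0, {"Retry-After": "7"}))
```

Capping the delay keeps a long retry sequence from sleeping for minutes at a time; the right cap depends on how long the calling job can afford to wait.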
Step 3: Processing and Formatting Results
The raw response contains nested objects. Each row in the rows array represents a unique combination of the grouped dimensions (Queue ID + Media Type) for the specified time interval. You must parse these to make them useful.
def process_results(results: list[dict], queue_map: dict[str, str]) -> list[dict]:
    """
    Transforms raw API rows into a readable format.

    Args:
        results: List of row dictionaries from the API.
        queue_map: Dictionary mapping queue IDs to queue names.

    Returns:
        list: List of cleaned data dictionaries.
    """
    processed_data = []
    for row in results:
        # Extract dimension values
        # The structure is typically: row -> dimensions -> [ {id: 'queue', value: 'queue_id'}, ... ]
        dimensions = row.get("dimensions", [])
        queue_id = None
        media_type = None
        interval_start = None
        for dim in dimensions:
            if dim.get("id") == "queue":
                queue_id = dim.get("value")
            elif dim.get("id") == "mediaType":
                media_type = dim.get("value")
            elif dim.get("id") == "interval":
                interval_start = dim.get("value")

        # Extract metric values
        # The structure is: row -> metrics -> [ {id: 'handledCount', value: 10}, ... ]
        metrics = row.get("metrics", [])
        handled_count = 0
        wait_time_ms = 0
        for metric in metrics:
            if metric.get("id") == "handledCount":
                handled_count = metric.get("value", 0)
            elif metric.get("id") == "waitTime":
                wait_time_ms = metric.get("value", 0)

        # Resolve queue name
        queue_name = queue_map.get(queue_id, f"Unknown Queue ({queue_id})")

        processed_data.append({
            "queue_id": queue_id,
            "queue_name": queue_name,
            "media_type": media_type,
            "interval_start": interval_start,
            "handled_count": handled_count,
            "wait_time_seconds": round(wait_time_ms / 1000, 2)
        })
    return processed_data
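To make the row shape concrete, the sketch below flattens one hand-written sample row. The sample values are invented for illustration and follow the structure described in the comments above; they were not captured from a live response. flatten_row is a hypothetical, more compact variant of the extraction logic.

```python
def flatten_row(row: dict) -> dict:
    """Collapses the dimension and metric lists of one row into a flat dict."""
    dims = {d.get("id"): d.get("value") for d in row.get("dimensions", [])}
    metrics = {m.get("id"): m.get("value", 0) for m in row.get("metrics", [])}
    return {
        "queue_id": dims.get("queue"),
        "media_type": dims.get("mediaType"),
        "interval_start": dims.get("interval"),
        "handled_count": metrics.get("handledCount", 0),
        # waitTime is in milliseconds; convert to seconds
        "wait_time_seconds": round(metrics.get("waitTime", 0) / 1000, 2),
    }


# Hypothetical row, shaped like the structure the text assumes
sample_row = {
    "dimensions": [
        {"id": "queue", "value": "queue-123"},
        {"id": "mediaType", "value": "voice"},
        {"id": "interval", "value": "2023-10-01T00:00:00.000Z"},
    ],
    "metrics": [
        {"id": "handledCount", "value": 10},
        {"id": "waitTime", "value": 45250},
    ],
}

flat = flatten_row(sample_row)
print(flat)
```

Building lookup dictionaries first avoids the chain of if/elif branches, at the cost of silently keeping only the last value if an id appears twice in a row.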
Complete Working Example
This script combines authentication, query construction, execution, and processing. It retrieves analytics for the last 24 hours for a specified set of queues.
import os
import sys
import time  # Used by the token cache and the retry backoff below
from datetime import datetime, timedelta

import requests

# Import classes defined above
# In a real project, these would be in separate modules
# from auth import GenesysAuth
# from query_builder import build_analytics_query
# from fetcher import fetch_analytics_data
# from processor import process_results
# Re-define classes here for copy-paste functionality
class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, base_url: str = "https://api.us.genesyscloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url
        self.token_url = f"{base_url}/oauth/token"
        self.access_token = None
        self.token_expiry = 0

    def get_access_token(self) -> str:
        # Reuse the cached token until five minutes before it expires
        if self.access_token and time.time() < (self.token_expiry - 300):
            return self.access_token
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }
        try:
            response = requests.post(self.token_url, data=payload, timeout=30)
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            self.token_expiry = time.time() + token_data.get("expires_in", 3600)
            return self.access_token
        except Exception as e:
            raise Exception(f"Auth Error: {e}") from e

    def get_headers(self) -> dict:
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.get_access_token()}"
        }


def build_analytics_query(start_time: str, end_time: str, queue_ids: list, media_types: list = None) -> dict:
    if not media_types:
        media_types = ["voice"]
    group_by = [
        {"id": "queue", "type": "queue"},
        {"id": "mediaType", "type": "mediaType"}
    ]
    metrics = [
        {"id": "handledCount"},
        {"id": "waitTime"}
    ]
    interval = "R/DAILY"
    query_payload = {
        "groupBy": group_by,
        "interval": interval,
        "metrics": metrics,
        "filter": {
            "type": "and",
            "clauses": [
                {"type": "dimension", "dimension": "queueId", "operator": "in", "values": queue_ids},
                {"type": "dimension", "dimension": "mediaType", "operator": "in", "values": media_types},
                {"type": "dimension", "dimension": "conversationDateTime", "operator": "ge", "values": [start_time]},
                {"type": "dimension", "dimension": "conversationDateTime", "operator": "lt", "values": [end_time]}
            ]
        },
        "pageSize": 1000
    }
    return query_payload


def fetch_analytics_data(auth: GenesysAuth, query_payload: dict) -> list:
    endpoint = f"{auth.base_url}/api/v2/analytics/conversations/aggregates/query"
    payload = dict(query_payload)   # Copy so the caller's dict is not mutated
    payload.pop("pageToken", None)  # The first request carries no page token
    all_results = []
    max_retries = 3
    base_delay = 1
    while True:
        for attempt in range(max_retries):
            try:
                response = requests.post(endpoint, json=payload, headers=auth.get_headers(), timeout=30)
            except requests.exceptions.RequestException:
                time.sleep(base_delay * (2 ** attempt))
                continue
            if response.status_code == 200:
                break
            elif response.status_code == 429:
                time.sleep(base_delay * (2 ** attempt))
            elif response.status_code == 401:
                # Clear the cached token so the next attempt requests a fresh one
                auth.access_token = None
                auth.token_expiry = 0
            else:
                raise Exception(f"API Error: {response.text}")
        else:
            # No attempt succeeded
            raise Exception("Max retries reached due to rate limiting or network errors")
        data = response.json()
        all_results.extend(data.get("rows", []))
        page_token = data.get("nextPageToken")
        if not page_token:
            return all_results
        payload["pageToken"] = page_token


def process_results(results: list, queue_map: dict) -> list:
    processed_data = []
    for row in results:
        dimensions = row.get("dimensions", [])
        queue_id = media_type = interval_start = None
        for dim in dimensions:
            if dim.get("id") == "queue":
                queue_id = dim.get("value")
            elif dim.get("id") == "mediaType":
                media_type = dim.get("value")
            elif dim.get("id") == "interval":
                interval_start = dim.get("value")
        metrics = row.get("metrics", [])
        handled_count = wait_time_ms = 0
        for metric in metrics:
            if metric.get("id") == "handledCount":
                handled_count = metric.get("value", 0)
            elif metric.get("id") == "waitTime":
                wait_time_ms = metric.get("value", 0)
        queue_name = queue_map.get(queue_id, f"Unknown ({queue_id})")
        processed_data.append({
            "queue_id": queue_id,
            "queue_name": queue_name,
            "media_type": media_type,
            "interval_start": interval_start,
            "handled_count": handled_count,
            "wait_time_seconds": round(wait_time_ms / 1000, 2)
        })
    return processed_data


def get_queue_map(auth: GenesysAuth) -> dict:
    """Fetches all queues to map IDs to names."""
    endpoint = f"{auth.base_url}/api/v2/routing/queues"
    queue_map = {}
    page_number = 1
    while True:
        params = {"pageSize": 100, "pageNumber": page_number}
        response = requests.get(endpoint, headers=auth.get_headers(), params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        for queue in data.get("entities", []):
            queue_map[queue["id"]] = queue["name"]
        # The queue listing is paged by number; stop after the last page
        if page_number >= data.get("pageCount", 1):
            break
        page_number += 1
    return queue_map


if __name__ == "__main__":
    # Configuration
    CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
    CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
    REGION = os.getenv("GENESYS_REGION", "us")  # us, eu, au, jp
    if not CLIENT_ID or not CLIENT_SECRET:
        print("Error: Set GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables.")
        sys.exit(1)

    # Determine base URL based on region
    if REGION == "eu":
        BASE_URL = "https://api.eus.genesyscloud.com"
    elif REGION == "au":
        BASE_URL = "https://api.ap-sydney-1.genesyscloud.com"
    elif REGION == "jp":
        BASE_URL = "https://api.jp-tokyo-1.genesyscloud.com"
    else:
        BASE_URL = "https://api.us.genesyscloud.com"

    try:
        # 1. Authenticate (request a token now so bad credentials fail early)
        auth = GenesysAuth(CLIENT_ID, CLIENT_SECRET, BASE_URL)
        auth.get_access_token()
        print("Authenticated successfully.")

        # 2. Get Queue Map for readable output
        print("Fetching queue list...")
        queue_map = get_queue_map(auth)
        # Select first 5 queues for demonstration (or provide specific IDs)
        target_queue_ids = list(queue_map.keys())[:5]
        if not target_queue_ids:
            print("No queues found.")
            sys.exit(0)
        print(f"Analyzing queues: {target_queue_ids}")

        # 3. Define Time Range (Last 24 Hours)
        # Read the clock once so both bounds come from the same instant
        now = datetime.utcnow().replace(microsecond=0)
        end_time = now.isoformat() + ".000Z"
        start_time = (now - timedelta(days=1)).isoformat() + ".000Z"

        # 4. Build Query
        query_payload = build_analytics_query(start_time, end_time, target_queue_ids, media_types=["voice", "chat"])

        # 5. Fetch Data
        print("Fetching analytics data...")
        raw_results = fetch_analytics_data(auth, query_payload)

        # 6. Process Results
        final_data = process_results(raw_results, queue_map)

        # 7. Output
        print(f"\nRetrieved {len(final_data)} aggregation rows.\n")
        print(f"{'Queue Name':<20} | {'Media Type':<10} | {'Handled':<10} | {'Wait (s)':<10} | {'Interval Start'}")
        print("-" * 80)
        for row in final_data:
            # str() guards against None, which does not accept a width specifier
            print(f"{str(row['queue_name']):<20} | {str(row['media_type']):<10} | {row['handled_count']:<10} | {row['wait_time_seconds']:<10} | {row['interval_start']}")
    except Exception as e:
        print(f"Fatal Error: {e}")
        sys.exit(1)
Common Errors & Debugging
Error: 400 Bad Request - Invalid Filter Clause
Cause: The filter object in the query payload is malformed. Common mistakes include using invalid operators (e.g., equals instead of eq) or mismatched dimension types.
Fix: Ensure the operator matches the dimension type. For queueId (a string ID), use in for lists or eq for single values. For conversationDateTime (a string ISO date), use ge (greater than or equal) and lt (less than).
# Correct
{"type": "dimension", "dimension": "queueId", "operator": "in", "values": ["id1", "id2"]}
# Incorrect
{"type": "dimension", "dimension": "queueId", "operator": "equals", "values": ["id1"]} # 'equals' is not valid here
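A malformed clause can be caught before the request is sent. The following is a sketch of such a pre-flight check: validate_filter is a hypothetical helper, and KNOWN_OPERATORS lists only the operators this article uses, which is an assumption and not the API's complete set.

```python
# Operators used in this article; an assumption, not the API's complete list
KNOWN_OPERATORS = {"in", "eq", "ge", "lt"}


def validate_filter(filter_obj: dict) -> list[str]:
    """Returns a list of human-readable problems found in a filter object."""
    problems = []
    for index, clause in enumerate(filter_obj.get("clauses", [])):
        operator = clause.get("operator")
        if operator not in KNOWN_OPERATORS:
            problems.append(f"clause {index}: unknown operator {operator!r}")
        values = clause.get("values")
        if not isinstance(values, list) or not values:
            problems.append(f"clause {index}: 'values' must be a non-empty list")
        elif operator != "in" and len(values) != 1:
            problems.append(f"clause {index}: operator {operator!r} expects exactly one value")
    return problems


good = {"type": "and", "clauses": [
    {"type": "dimension", "dimension": "queueId", "operator": "in", "values": ["id1", "id2"]},
]}
bad = {"type": "and", "clauses": [
    {"type": "dimension", "dimension": "queueId", "operator": "equals", "values": ["id1"]},
]}

print(validate_filter(good))
print(validate_filter(bad))
```

Returning a list of problems, rather than raising on the first one, lets a caller report every issue in a single pass.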
Error: 403 Forbidden - Insufficient Scope
Cause: The OAuth token was generated without the analytics:conversation:read scope.
Fix: Update your Machine-to-Machine application in the Genesys Cloud Admin Portal. Navigate to Admin > Apps & Integrations > Machine-to-Machine. Edit your client and ensure analytics:conversation:read is checked. Regenerate the secret if necessary, and update your environment variables.
Error: 429 Too Many Requests
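Cause: The Analytics API enforces rate limits, and complex aggregation queries sent in quick succession are a common way to hit them. The limits that apply are listed in the Genesys Cloud rate-limit documentation, and a 429 response typically carries a Retry-After header indicating how long to wait.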
Fix: Implement exponential backoff as shown in the fetch_analytics_data function. If you are running multiple queries in parallel, serialize them or add a delay between requests.
# Example of adding a static delay if backoff is not sufficient
import time
time.sleep(2) # Wait 2 seconds before next request
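When several queries run back to back, a small throttle can enforce the spacing instead of scattering sleep calls through the code. This is a minimal sketch: Throttle is a hypothetical class, and the clock and sleep parameters exist only so the behavior can be demonstrated with fake time.

```python
import time
from typing import Callable, Optional


class Throttle:
    """Enforces a minimum gap between consecutive calls to wait()."""

    def __init__(
        self,
        min_interval: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ):
        self._min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._last: Optional[float] = None

    def wait(self) -> float:
        """Sleeps if the previous call was too recent; returns seconds slept."""
        now = self._clock()
        pause = 0.0
        if self._last is not None:
            pause = max(0.0, self._min_interval - (now - self._last))
            if pause:
                self._sleep(pause)
        self._last = now + pause
        return pause


# Fake clock and sleep so the example runs instantly
fake_now = [100.0]
slept = []


def fake_sleep(seconds: float) -> None:
    slept.append(seconds)
    fake_now[0] += seconds


throttle = Throttle(2.0, clock=lambda: fake_now[0], sleep=fake_sleep)
first_pause = throttle.wait()   # first call never sleeps
fake_now[0] += 0.5              # half a second of work
second_pause = throttle.wait()  # sleeps for the remaining gap
print(first_pause, second_pause)
```

In real use, Throttle(2.0) with the default clock and sleep would be called once before each request.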
Error: Empty Results
Cause: The time range does not overlap with any conversation data, or the queue IDs are invalid.
Fix: Verify that start_time and end_time are in correct ISO 8601 format with the 'Z' suffix (UTC). Check that the queue_ids exist and have had conversations in the specified media types during the window. Use the GET /api/v2/routing/queues/{queueId} endpoint to verify queue existence.
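The format checks described above can be automated before a query is sent. The sketch below assumes timestamps shaped like the earlier examples; check_window is a hypothetical helper, and it checks only the Z suffix, parseability, and ordering.

```python
from datetime import datetime


def check_window(start_time: str, end_time: str) -> list[str]:
    """Returns a list of problems with a start/end timestamp pair."""
    problems = []
    parsed = []
    for label, value in (("start_time", start_time), ("end_time", end_time)):
        if not value.endswith("Z"):
            problems.append(f"{label} lacks the Z suffix")
            continue
        try:
            # Strip the Z before parsing; fromisoformat on Python 3.9 rejects it
            parsed.append(datetime.fromisoformat(value[:-1]))
        except ValueError:
            problems.append(f"{label} is not valid ISO 8601")
    if len(parsed) == 2 and parsed[0] >= parsed[1]:
        problems.append("start_time must be earlier than end_time")
    return problems


print(check_window("2023-10-01T00:00:00.000Z", "2023-10-02T00:00:00.000Z"))
print(check_window("2023-10-02T00:00:00.000Z", "2023-10-01T00:00:00.000Z"))
```

An empty list means the window passed these checks; it does not guarantee that any conversations fall inside it.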