Querying Genesys Cloud Analytics by Queue and Media Type
What You Will Build
- A Python script that constructs a valid aggregation query to retrieve conversation metrics grouped by queue ID and media type.
- This tutorial uses the Genesys Cloud analytics/conversations/details/query endpoint and the official genesys-cloud-sdk-python library.
- The implementation is written in Python 3.9+ using the httpx library for low-level HTTP control; the query body is built as a plain dictionary.
Prerequisites
- OAuth Client Type: Service Account or Web Application (Client Credentials Grant).
- Required Scopes: analytics:query:read is mandatory for executing aggregation queries.
- SDK Version: genesys-cloud-sdk-python >= 140.0.0.
- Runtime: Python 3.9 or higher.
- Dependencies:
  - genesys-cloud-sdk-python: The official client library.
  - httpx: A modern HTTP client library for making raw requests to verify payload structure.
Install the dependencies:
pip install genesys-cloud-sdk-python httpx
Authentication Setup
Genesys Cloud APIs require a valid access token obtained via OAuth 2.0. For backend integrations, the Client Credentials Grant is the standard flow. You must create a Service Account in the Genesys Cloud Admin Console and assign it the analytics:query:read scope.
The following code demonstrates how to obtain a token using httpx. In production, you should cache this token and refresh it before expiration (tokens typically last 1 hour).
import os

import httpx

# Configuration from environment variables
CLIENT_ID = os.environ.get("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.environ.get("GENESYS_CLIENT_SECRET")
REGION = os.environ.get("GENESYS_REGION", "us-east-1")


def get_access_token() -> str:
    """
    Retrieves an OAuth 2.0 access token from Genesys Cloud.
    """
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")
    base_url = f"https://{REGION}.mygenesys.com"
    url = f"{base_url}/oauth/token"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    data = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET
    }
    try:
        response = httpx.post(url, headers=headers, data=data)
        response.raise_for_status()
        token_data = response.json()
        return token_data["access_token"]
    except httpx.HTTPStatusError as e:
        print(f"Authentication failed: {e.response.status_code} - {e.response.text}")
        raise
    except Exception as e:
        print(f"Unexpected error during authentication: {e}")
        raise


# Retrieve token
token = get_access_token()
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}
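The caching advice above can be sketched as follows. This is a minimal illustration rather than part of the original script: the TokenCache class, its fetch callable, and the 60-second safety margin are hypothetical names and values. It assumes the token response carries the standard OAuth 2.0 expires_in field (lifetime in seconds).

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches an access token and refetches it shortly before it expires.

    Hypothetical helper: `fetch` must return (access_token, expires_in_seconds).
    """

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        margin: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._margin = margin  # refresh this many seconds before expiry
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Returns the cached token, fetching a new one if it is missing or stale."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = now + expires_in
        return self._token
```

To use a cache like this, get_access_token would need to return the expires_in value alongside the token, and callers would call get() before each request instead of holding on to a single token string.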
Implementation
Step 1: Constructing the Aggregation Query Body
The analytics/conversations/details/query endpoint accepts a complex JSON body defining the date range, metrics, and groupings. The critical component for this tutorial is the groupings array. To group by queue and media type, you must specify the type and id for each grouping dimension.
- Queue Grouping: type: "queue", id: "id" (or name if you prefer name-based grouping, but id is recommended for performance and consistency).
- Media Type Grouping: type: "mediaType", id: "id".
The following code constructs the query payload. Note that dateFrom and dateTo are sent as ISO 8601 timestamps.
import json
from datetime import datetime, timedelta, timezone


def build_query_payload(start_time: datetime, end_time: datetime) -> dict:
    """
    Constructs the JSON payload for the analytics aggregation query.
    Groups by Queue ID and Media Type. Expects UTC datetimes.
    """
    # Define the metrics to retrieve.
    # 'totalConversations' is a standard metric available across most media types.
    metrics = ["totalConversations", "totalHandledConversations"]
    # ISO 8601 with an explicit "Z" (UTC) suffix, e.g. 2023-10-01T00:00:00Z
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    payload = {
        "dateFrom": start_time.strftime(timestamp_format),
        "dateTo": end_time.strftime(timestamp_format),
        "interval": "P1D",  # Daily interval (ISO 8601 Duration format)
        "metricFilters": [],
        "metrics": metrics,
        "groupings": [
            {
                "type": "queue",
                "id": "id"
            },
            {
                "type": "mediaType",
                "id": "id"
            }
        ],
        "pageSize": 1000,
        "totalCount": True
    }
    return payload


# Example usage: Last 7 days
end_dt = datetime.now(timezone.utc)
start_dt = end_dt - timedelta(days=7)
query_body = build_query_payload(start_dt, end_dt)
print("Query Payload:")
print(json.dumps(query_body, indent=2))
Expected Response Structure Preview:
When sent to the API, this payload will return a JSON object containing a data array. Each item in data represents a unique combination of Queue and Media Type for a specific time interval.
{
  "data": [
    {
      "interval": "2023-10-25T00:00:00Z",
      "groups": [
        {
          "queue": {
            "id": "12345678-1234-1234-1234-123456789012",
            "name": "Sales Support"
          },
          "mediaType": {
            "id": "voice",
            "name": "Voice"
          },
          "metricResults": {
            "totalConversations": {
              "value": 150
            },
            "totalHandledConversations": {
              "value": 145
            }
          }
        },
        {
          "queue": {
            "id": "12345678-1234-1234-1234-123456789012",
            "name": "Sales Support"
          },
          "mediaType": {
            "id": "chat",
            "name": "Chat"
          },
          "metricResults": {
            "totalConversations": {
              "value": 30
            },
            "totalHandledConversations": {
              "value": 30
            }
          }
        }
      ]
    }
  ],
  "totalCount": 14,
  "nextPageToken": null
}
Step 2: Executing the Query with Retry Logic
Analytics queries can be resource-intensive. Genesys Cloud may return a 429 Too Many Requests if you exceed rate limits, or a 503 Service Unavailable if the analytics engine is under heavy load. Production code must implement exponential backoff for retries.
We will use httpx to send the raw request to demonstrate full control over headers and retries, rather than relying solely on the SDK’s internal retry mechanism, which can sometimes obscure specific error details.
import os
import random
import time

import httpx


def query_analytics(base_url: str, headers: dict, payload: dict) -> dict:
    """
    Executes the analytics query with exponential backoff retry logic.
    """
    url = f"{base_url}/api/v2/analytics/conversations/details/query"
    max_retries = 3
    base_delay = 2.0  # seconds
    for attempt in range(max_retries):
        backoff = base_delay * (2 ** attempt)
        try:
            response = httpx.post(url, headers=headers, json=payload, timeout=60.0)
            # Handle Rate Limiting
            if response.status_code == 429:
                try:
                    retry_after = float(response.headers.get("Retry-After", backoff))
                except ValueError:
                    # Header was not a plain number of seconds; use the backoff delay
                    retry_after = backoff
                print(f"Rate limited. Retrying in {retry_after} seconds...")
                time.sleep(retry_after)
                continue
            # Handle Service Unavailable
            if response.status_code == 503:
                delay = backoff + random.uniform(0, 1)
                print(f"Service unavailable. Retrying in {delay:.2f} seconds...")
                time.sleep(delay)
                continue
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            # These errors are not retried; report the cause and re-raise
            if e.response.status_code in [401, 403]:
                print(f"Authentication/Authorization error: {e.response.status_code}")
            elif e.response.status_code == 422:
                print(f"Unprocessable Entity. Check query payload syntax: {e.response.text}")
            else:
                print(f"HTTP Error: {e.response.status_code} - {e.response.text}")
            raise
        except httpx.RequestError as e:
            print(f"Network error: {e}")
            time.sleep(backoff)
    raise RuntimeError("Max retries exceeded for analytics query.")


# Execute the query
region = os.environ.get("GENESYS_REGION", "us-east-1")
base_url = f"https://{region}.mygenesys.com"
try:
    result = query_analytics(base_url, headers, query_body)
    print("Query successful.")
except Exception as e:
    print(f"Failed to retrieve analytics data: {e}")
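To make the retry timing concrete, the sketch below computes the wait before each retry using the same formula as the 503 branch above (base_delay * 2 ** attempt plus random jitter). The backoff_delays helper and its cap parameter are illustrative additions, not part of the original function.

```python
import random
from typing import List, Optional


def backoff_delays(
    base_delay: float = 2.0,
    max_retries: int = 3,
    jitter: float = 1.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Returns the wait (in seconds) before each retry attempt.

    The delay for attempt n is base_delay * 2**n plus up to `jitter` seconds
    of random noise, limited to `cap` (a hypothetical upper bound).
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = base_delay * (2 ** attempt) + rng.uniform(0, jitter)
        delays.append(min(delay, cap))
    return delays


# With jitter disabled the schedule is deterministic: 2s, 4s, 8s.
print(backoff_delays(jitter=0.0))
```

The jitter spreads out retries from clients that were throttled at the same moment, and the cap keeps the wait bounded if max_retries is raised later.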
Step 3: Processing and Flattening Results
The response from analytics/conversations/details/query is nested. The data array contains intervals. Inside each interval, the groups array contains the aggregated metrics for each unique combination of the grouping dimensions (Queue and Media Type).
To make this data useful for reporting or database insertion, we should flatten it into a list of dictionaries, where each dictionary represents a single row of data (Queue ID, Queue Name, Media Type, Metric Value, Interval).
def flatten_analytics_data(response: dict) -> list:
    """
    Flattens the nested analytics response into a list of flat records.
    """
    flat_records = []
    if "data" not in response:
        return flat_records
    for interval_data in response["data"]:
        interval = interval_data.get("interval")
        groups = interval_data.get("groups", [])
        for group in groups:
            queue_info = group.get("queue", {})
            media_info = group.get("mediaType", {})
            metrics = group.get("metricResults", {})
            # Extract metric values safely
            total_conversations = metrics.get("totalConversations", {}).get("value", 0)
            total_handled = metrics.get("totalHandledConversations", {}).get("value", 0)
            record = {
                "interval": interval,
                "queue_id": queue_info.get("id"),
                "queue_name": queue_info.get("name"),
                "media_type_id": media_info.get("id"),
                "media_type_name": media_info.get("name"),
                "total_conversations": total_conversations,
                "total_handled_conversations": total_handled
            }
            flat_records.append(record)
    return flat_records


# Process the result from Step 2
if 'result' in locals():
    flat_data = flatten_analytics_data(result)
    for row in flat_data[:5]:  # Print first 5 rows
        print(row)
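For the reporting use case mentioned above, one option is to serialize the flat records as CSV with the standard library. The records_to_csv helper below is a hypothetical sketch; the sample row reuses values from the response preview and mirrors the record shape produced by flatten_analytics_data.

```python
import csv
import io
from typing import Dict, List


def records_to_csv(records: List[Dict]) -> str:
    """Serializes flat records to CSV text; returns an empty string for no records."""
    if not records:
        return ""
    buffer = io.StringIO()
    # Column order follows the keys of the first record.
    writer = csv.DictWriter(
        buffer, fieldnames=list(records[0].keys()), lineterminator="\n"
    )
    writer.writeheader()
    writer.writerows(records)
    return buffer.getvalue()


sample = [
    {
        "interval": "2023-10-25T00:00:00Z",
        "queue_id": "12345678-1234-1234-1234-123456789012",
        "queue_name": "Sales Support",
        "media_type_id": "voice",
        "media_type_name": "Voice",
        "total_conversations": 150,
        "total_handled_conversations": 145,
    }
]
print(records_to_csv(sample))
```

Writing to an in-memory buffer keeps the helper easy to test; the same DictWriter calls work against a file opened with newline="".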
Complete Working Example
The following script combines authentication, query construction, execution with retries, and result flattening into a single runnable module.
import json
import os
import random
import time
from datetime import datetime, timedelta, timezone

import httpx

# --- Configuration ---
CLIENT_ID = os.environ.get("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.environ.get("GENESYS_CLIENT_SECRET")
REGION = os.environ.get("GENESYS_REGION", "us-east-1")
if not CLIENT_ID or not CLIENT_SECRET:
    raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")
BASE_URL = f"https://{REGION}.mygenesys.com"
# ISO 8601 with an explicit "Z" (UTC) suffix, e.g. 2023-10-01T00:00:00Z
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def get_access_token() -> str:
    """Retrieves an OAuth 2.0 access token."""
    url = f"{BASE_URL}/oauth/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET
    }
    response = httpx.post(url, headers=headers, data=data)
    response.raise_for_status()
    return response.json()["access_token"]


def build_query_payload(start_time: datetime, end_time: datetime) -> dict:
    """Constructs the analytics aggregation query payload. Expects UTC datetimes."""
    return {
        "dateFrom": start_time.strftime(TIMESTAMP_FORMAT),
        "dateTo": end_time.strftime(TIMESTAMP_FORMAT),
        "interval": "P1D",
        "metrics": ["totalConversations", "totalHandledConversations"],
        "groupings": [
            {"type": "queue", "id": "id"},
            {"type": "mediaType", "id": "id"}
        ],
        "pageSize": 1000,
        "totalCount": True
    }


def execute_query(headers: dict, payload: dict) -> dict:
    """Executes the query with exponential backoff."""
    url = f"{BASE_URL}/api/v2/analytics/conversations/details/query"
    max_retries = 3
    base_delay = 2.0
    for attempt in range(max_retries):
        backoff = base_delay * (2 ** attempt)
        try:
            response = httpx.post(url, headers=headers, json=payload, timeout=60.0)
            if response.status_code == 429:
                try:
                    retry_after = float(response.headers.get("Retry-After", backoff))
                except ValueError:
                    # Header was not a plain number of seconds; use the backoff delay
                    retry_after = backoff
                time.sleep(retry_after)
                continue
            if response.status_code == 503:
                time.sleep(backoff + random.uniform(0, 1))
                continue
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            # Client errors (401, 403, 422, ...) will not succeed on retry
            if e.response.status_code < 500:
                raise
            print(f"HTTP Error {e.response.status_code}: {e.response.text}")
            time.sleep(backoff)
        except httpx.RequestError as e:
            print(f"Network error: {e}")
            time.sleep(backoff)
    raise RuntimeError("Max retries exceeded.")


def flatten_data(response: dict) -> list:
    """Flattens nested analytics response."""
    flat_records = []
    for interval_data in response.get("data", []):
        interval = interval_data.get("interval")
        for group in interval_data.get("groups", []):
            queue = group.get("queue", {})
            media = group.get("mediaType", {})
            metrics = group.get("metricResults", {})
            flat_records.append({
                "interval": interval,
                "queue_id": queue.get("id"),
                "queue_name": queue.get("name"),
                "media_type": media.get("id"),
                "total_conversations": metrics.get("totalConversations", {}).get("value", 0),
                "total_handled": metrics.get("totalHandledConversations", {}).get("value", 0)
            })
    return flat_records


if __name__ == "__main__":
    print("Starting Analytics Query...")
    # 1. Authenticate
    token = get_access_token()
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    # 2. Build Query
    end_dt = datetime.now(timezone.utc)
    start_dt = end_dt - timedelta(days=7)
    payload = build_query_payload(start_dt, end_dt)
    print(f"Querying data from {payload['dateFrom']} to {payload['dateTo']}")
    # 3. Execute
    try:
        result = execute_query(headers, payload)
        # 4. Process
        flat_data = flatten_data(result)
        print(f"Retrieved {len(flat_data)} records.")
        # Output sample
        for record in flat_data[:3]:
            print(json.dumps(record, indent=2))
    except Exception as e:
        print(f"Error: {e}")
Common Errors & Debugging
Error: 422 Unprocessable Entity
Cause: The query payload contains invalid syntax, unsupported metrics, or conflicting groupings.
Fix:
- Verify that the metrics array contains only valid metric names for the selected media types. For example, talkTime is not valid for Chat media.
- Ensure dateFrom and dateTo are in strict ISO 8601 format (e.g., 2023-10-01T00:00:00Z).
- Check that groupings use valid types (queue, mediaType, user, etc.).
# Debugging Tip: Print the payload before sending
print(json.dumps(payload, indent=2))
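Beyond printing the payload, a few of the checks listed above can be automated before the request is sent. The sketch below is illustrative only: validate_payload and ALLOWED_GROUPING_TYPES are hypothetical names, and the allowed set contains just the grouping types this tutorial mentions, not an authoritative list from the API.

```python
import re
from typing import Dict, List

# Assumption: only the grouping types named in this tutorial.
ALLOWED_GROUPING_TYPES = {"queue", "mediaType", "user"}

# Matches timestamps such as 2023-10-01T00:00:00Z (optional fractional seconds).
ISO_UTC_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?Z$")


def validate_payload(payload: Dict) -> List[str]:
    """Returns a list of readable problems; an empty list means none were found."""
    problems = []
    for key in ("dateFrom", "dateTo"):
        value = payload.get(key)
        if not isinstance(value, str) or not ISO_UTC_PATTERN.match(value):
            problems.append(f"{key} is not an ISO 8601 UTC timestamp: {value!r}")
    if not payload.get("metrics"):
        problems.append("metrics must contain at least one metric name")
    for grouping in payload.get("groupings", []):
        if grouping.get("type") not in ALLOWED_GROUPING_TYPES:
            problems.append(f"unknown grouping type: {grouping.get('type')!r}")
    return problems
```

Calling validate_payload(payload) just before the POST and logging any returned problems gives a faster signal than waiting for a 422 from the server, although it cannot confirm that a metric name is valid for a given media type.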
Error: 403 Forbidden
Cause: The OAuth token lacks the required scope.
Fix: Ensure the Service Account or Web Application used to generate the token has the analytics:query:read scope assigned in the Genesys Cloud Admin Console under Users > Service Accounts or Integrations > OAuth 2.0.
Error: 429 Too Many Requests
Cause: Rate limit exceeded. The Analytics API has strict rate limits, especially for heavy aggregation queries.
Fix: Implement exponential backoff as shown in the execute_query function. Monitor the Retry-After header in the response.
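Per the HTTP specification, Retry-After may carry either a number of seconds or an HTTP date, so converting it directly to a number can fail. The hypothetical parse_retry_after helper below sketches a tolerant parser using only the standard library; the fallback argument is an assumption standing in for the computed backoff delay.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(
    value: Optional[str], fallback: float, now: Optional[datetime] = None
) -> float:
    """Converts a Retry-After header value into a non-negative wait in seconds."""
    if value is None:
        return fallback
    try:
        # Most common form: delay in seconds, e.g. "30".
        return max(0.0, float(value))
    except ValueError:
        pass
    try:
        # Alternative form: HTTP date, e.g. "Wed, 25 Oct 2023 12:00:30 GMT".
        target = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Unparseable header: fall back to the caller's delay.
        return fallback
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (target - now).total_seconds())
```

In the retry loop, the result could replace the direct conversion of the header, for example parse_retry_after(response.headers.get("Retry-After"), base_delay * (2 ** attempt)).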
Error: Empty data Array
Cause: No conversations match the criteria.
Fix:
- Verify the date range contains actual conversation data.
- Ensure the interval is compatible with the date range. A P1D interval on a 1-hour range may return empty results if no daily bucket is fully formed.
- Check if the queues specified (if using metricFilters) exist and are active.
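The interval check above can be automated with a small helper. This sketch assumes daily buckets are aligned to UTC midnight, which is an assumption made for illustration rather than documented API behavior; count_full_utc_days is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone


def count_full_utc_days(start: datetime, end: datetime) -> int:
    """Counts whole UTC calendar days that lie entirely inside [start, end).

    Expects timezone-aware datetimes.
    """
    start = start.astimezone(timezone.utc)
    end = end.astimezone(timezone.utc)
    # First UTC midnight at or after `start`.
    midnight = start.replace(hour=0, minute=0, second=0, microsecond=0)
    if midnight < start:
        midnight += timedelta(days=1)
    if end <= midnight:
        return 0
    return (end - midnight) // timedelta(days=1)
```

A result of 0 for a P1D query suggests widening the date range or choosing a shorter interval before sending the request.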