Constructing Analytics Aggregation Queries by Queue and Media Type in Genesys Cloud
What You Will Build
- You will build a Python script that queries the Genesys Cloud Analytics API to retrieve interaction metrics aggregated by queue and media type.
- You will use the Genesys Cloud REST API directly via the
requestslibrary to construct the query payload. - You will use Python 3.9+ with type hints and robust error handling for production-grade reliability.
Prerequisites
- OAuth Client Type: Service Account or Client Credentials flow.
- Required Scopes:
analytics:query:readis mandatory. If querying specific conversation details,conversation:readmay be required, but for aggregate analytics, only the analytics scope is needed. - SDK/API Version: Genesys Cloud API v2.
- Language/Runtime: Python 3.9 or higher.
- External Dependencies:
requests(HTTP client)python-dotenv(for secure credential management)
Install the dependencies:
pip install requests python-dotenv
Authentication Setup
Genesys Cloud uses OAuth 2.0 for authentication. For server-to-server integrations, the Client Credentials flow is the standard. This flow exchanges your client ID and secret for an access token. You must implement token caching to avoid hitting rate limits on the /oauth/token endpoint.
Create a .env file in your project root:
GENESYS_CLOUD_REGION=us-east-1 # Change to your region, e.g., eu-west-1
GENESYS_CLOUD_CLIENT_ID=your_client_id
GENESYS_CLOUD_CLIENT_SECRET=your_client_secret
Create a helper module auth.py to handle token acquisition and caching:
import os
import time
import requests
from typing import Optional
from dotenv import load_dotenv
load_dotenv()
# Configuration
REGION = os.getenv("GENESYS_CLOUD_REGION", "us-east-1")
CLIENT_ID = os.getenv("GENESYS_CLOUD_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
# Base URLs
AUTH_URL = f"https://api.{REGION}.mypurecloud.com/oauth/token"
API_BASE_URL = f"https://api.{REGION}.mypurecloud.com/api/v2"
class GenesysAuth:
def __init__(self):
self.access_token: Optional[str] = None
self.token_expiry: float = 0
self.headers: dict = {}
def _get_token(self) -> str:
"""Exchanges client credentials for an access token."""
if not CLIENT_ID or not CLIENT_SECRET:
raise ValueError("Client ID and Secret must be set in environment variables.")
payload = {
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
}
response = requests.post(AUTH_URL, data=payload)
response.raise_for_status()
return response.json()["access_token"]
def get_headers(self) -> dict:
"""Returns headers with a valid Bearer token, refreshing if necessary."""
# Check if token exists and is not expired (subtract 60s buffer)
if self.access_token and time.time() < (self.token_expiry - 60):
return self.headers
# Refresh token
token = self._get_token()
self.access_token = token
# Token expiry is typically 3600 seconds (1 hour)
self.token_expiry = time.time() + 3600
self.headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
return self.headers
# Singleton instance for reuse
auth_client = GenesysAuth()
Implementation
The core of this tutorial involves constructing the correct JSON payload for the /api/v2/analytics/conversations/details/query endpoint. This endpoint accepts a POST request with a complex query body that defines what data to retrieve, how to group it, and what time range to cover.
Step 1: Constructing the Query Payload
The query body consists of several key components:
dateFrom/dateTo: The time range for the data.groupBy: A list of grouping dimensions. We will usequeueandmediaType.select: The metrics to aggregate (e.g.,conversationCount,handleTime).filter: Optional constraints to narrow down the data (e.g., specific queues or media types).
Here is the function to build this payload:
from datetime import datetime, timedelta
from typing import List, Dict, Any
def build_analytics_query(
days_back: int = 7,
group_by: List[str] = None,
select_metrics: List[str] = None,
filter_queues: List[str] = None
) -> Dict[str, Any]:
"""
Constructs a Genesys Cloud Analytics query payload.
Args:
days_back: Number of days to look back for data.
group_by: List of fields to group by (e.g., ['queue', 'mediaType']).
select_metrics: List of metrics to include in the response.
filter_queues: Optional list of queue IDs to filter by.
Returns:
A dictionary representing the JSON payload for the API.
"""
if group_by is None:
group_by = ["queue", "mediaType"]
if select_metrics is None:
select_metrics = [
"conversationCount",
"holdTime",
"talkTime",
"waitTime",
"wrapUpTime"
]
now = datetime.utcnow()
start_time = now - timedelta(days=days_back)
# Format dates to ISO 8601 with UTC timezone
date_from = start_time.isoformat() + "Z"
date_to = now.isoformat() + "Z"
payload: Dict[str, Any] = {
"dateFrom": date_from,
"dateTo": date_to,
"groupBy": group_by,
"select": select_metrics,
"timeGrain": "none" # No time aggregation, just total for the range
}
# Add filter if specific queues are requested
if filter_queues:
payload["filter"] = {
"type": "or",
"clauses": [
{
"type": "field",
"field": "queue.id",
"op": "in",
"values": filter_queues
}
]
}
else:
# Default filter to exclude empty queues if desired, or leave empty for all
# Here we leave it empty to query all queues
pass
return payload
Step 2: Executing the Query and Handling Pagination
The Analytics API supports large result sets. While aggregation queries often return fewer rows than detail queries, you must still handle pagination if the number of groups exceeds the page size (default is 100, max is 1000).
Create the execution function in main.py:
import json
import requests
from typing import List, Dict, Any
def execute_analytics_query(payload: Dict[str, Any], auth_headers: dict) -> List[Dict[str, Any]]:
"""
Executes the analytics query and handles pagination.
Args:
payload: The query payload constructed in Step 1.
auth_headers: Headers with valid OAuth token.
Returns:
A list of all result groups.
"""
url = f"{API_BASE_URL}/analytics/conversations/details/query"
all_results: List[Dict[str, Any]] = []
page_token = None
max_pages = 10 # Safety break to prevent infinite loops
for _ in range(max_pages):
# Add pagination token if present
if page_token:
payload["pageToken"] = page_token
print(f"Requesting data... (Page Token: {page_token or 'None'})")
try:
response = requests.post(
url,
json=payload,
headers=auth_headers
)
# Handle HTTP Errors
response.raise_for_status()
data = response.json()
# Extract results
if "results" in data and data["results"]:
all_results.extend(data["results"])
print(f"Retrieved {len(data['results'])} groups.")
else:
print("No results returned.")
break
# Check for next page
page_token = data.get("nextPageToken")
if not page_token:
print("No more pages.")
break
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
print("Rate limited. Waiting 5 seconds before retry...")
time.sleep(5)
# Retry the same request
continue
elif e.response.status_code == 401:
print("Authentication failed. Token may be expired.")
raise
else:
print(f"HTTP Error: {e.response.status_code}")
print(e.response.text)
raise
except requests.exceptions.RequestException as e:
print(f"Network error: {e}")
raise
return all_results
Step 3: Processing and Formatting Results
The raw response from Genesys Cloud contains grouped data. Each result object includes the grouping keys (e.g., queue.id, mediaType) and the selected metrics. You need to parse this into a clean structure for reporting or database insertion.
def process_results(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Flattens the analytics results into a clean list of dictionaries.
Args:
results: Raw results from the API.
Returns:
A list of simplified dictionaries.
"""
cleaned_data = []
for row in results:
# Extract group keys
queue_id = row.get("groupBy", {}).get("queue.id", "Unknown")
queue_name = row.get("groupBy", {}).get("queue.name", "Unknown")
media_type = row.get("groupBy", {}).get("mediaType", "Unknown")
# Extract metrics
metrics = row.get("metrics", {})
cleaned_row = {
"queue_id": queue_id,
"queue_name": queue_name,
"media_type": media_type,
"conversation_count": metrics.get("conversationCount", 0),
"total_hold_time_sec": metrics.get("holdTime", 0),
"total_talk_time_sec": metrics.get("talkTime", 0),
"total_wait_time_sec": metrics.get("waitTime", 0),
"total_wrap_up_time_sec": metrics.get("wrapUpTime", 0)
}
cleaned_data.append(cleaned_row)
return cleaned_data
Complete Working Example
Combine all parts into a single executable script analytics_runner.py.
import os
import time
import json
import requests
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv
# --- Authentication Module ---
load_dotenv()
REGION = os.getenv("GENESYS_CLOUD_REGION", "us-east-1")
CLIENT_ID = os.getenv("GENESYS_CLOUD_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
AUTH_URL = f"https://api.{REGION}.mypurecloud.com/oauth/token"
API_BASE_URL = f"https://api.{REGION}.mypurecloud.com/api/v2"
class GenesysAuth:
def __init__(self):
self.access_token: Optional[str] = None
self.token_expiry: float = 0
self.headers: dict = {}
def _get_token(self) -> str:
if not CLIENT_ID or not CLIENT_SECRET:
raise ValueError("Client ID and Secret must be set in environment variables.")
payload = {
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
}
response = requests.post(AUTH_URL, data=payload)
response.raise_for_status()
return response.json()["access_token"]
def get_headers(self) -> dict:
if self.access_token and time.time() < (self.token_expiry - 60):
return self.headers
token = self._get_token()
self.access_token = token
self.token_expiry = time.time() + 3600
self.headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
return self.headers
auth_client = GenesysAuth()
# --- Query Construction ---
def build_analytics_query(
days_back: int = 7,
group_by: List[str] = None,
select_metrics: List[str] = None,
filter_queues: List[str] = None
) -> Dict[str, Any]:
if group_by is None:
group_by = ["queue", "mediaType"]
if select_metrics is None:
select_metrics = [
"conversationCount",
"holdTime",
"talkTime",
"waitTime",
"wrapUpTime"
]
now = datetime.utcnow()
start_time = now - timedelta(days=days_back)
date_from = start_time.isoformat() + "Z"
date_to = now.isoformat() + "Z"
payload: Dict[str, Any] = {
"dateFrom": date_from,
"dateTo": date_to,
"groupBy": group_by,
"select": select_metrics,
"timeGrain": "none"
}
if filter_queues:
payload["filter"] = {
"type": "or",
"clauses": [
{
"type": "field",
"field": "queue.id",
"op": "in",
"values": filter_queues
}
]
}
return payload
# --- Execution & Processing ---
def execute_analytics_query(payload: Dict[str, Any], auth_headers: dict) -> List[Dict[str, Any]]:
url = f"{API_BASE_URL}/analytics/conversations/details/query"
all_results: List[Dict[str, Any]] = []
page_token = None
max_pages = 10
for _ in range(max_pages):
if page_token:
payload["pageToken"] = page_token
print(f"Requesting data... (Page Token: {page_token or 'None'})")
try:
response = requests.post(
url,
json=payload,
headers=auth_headers
)
response.raise_for_status()
data = response.json()
if "results" in data and data["results"]:
all_results.extend(data["results"])
print(f"Retrieved {len(data['results'])} groups.")
else:
print("No results returned.")
break
page_token = data.get("nextPageToken")
if not page_token:
print("No more pages.")
break
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
print("Rate limited. Waiting 5 seconds before retry...")
time.sleep(5)
continue
elif e.response.status_code == 401:
print("Authentication failed.")
raise
else:
print(f"HTTP Error: {e.response.status_code}")
raise
except requests.exceptions.RequestException as e:
print(f"Network error: {e}")
raise
return all_results
def process_results(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
cleaned_data = []
for row in results:
queue_id = row.get("groupBy", {}).get("queue.id", "Unknown")
queue_name = row.get("groupBy", {}).get("queue.name", "Unknown")
media_type = row.get("groupBy", {}).get("mediaType", "Unknown")
metrics = row.get("metrics", {})
cleaned_row = {
"queue_id": queue_id,
"queue_name": queue_name,
"media_type": media_type,
"conversation_count": metrics.get("conversationCount", 0),
"total_hold_time_sec": metrics.get("holdTime", 0),
"total_talk_time_sec": metrics.get("talkTime", 0),
"total_wait_time_sec": metrics.get("waitTime", 0),
"total_wrap_up_time_sec": metrics.get("wrapUpTime", 0)
}
cleaned_data.append(cleaned_row)
return cleaned_data
# --- Main Entry Point ---
if __name__ == "__main__":
try:
# 1. Get Auth Headers
headers = auth_client.get_headers()
# 2. Build Query
# Example: Query last 7 days, group by queue and media type
query_payload = build_analytics_query(days_back=7)
print("Query Payload:")
print(json.dumps(query_payload, indent=2))
print("-" * 50)
# 3. Execute Query
raw_results = execute_analytics_query(query_payload, headers)
# 4. Process Results
final_data = process_results(raw_results)
# 5. Output
print(f"Total Groups Retrieved: {len(final_data)}")
print("Sample Data:")
for item in final_data[:3]: # Print first 3 items
print(json.dumps(item, indent=2))
except Exception as e:
print(f"Fatal Error: {e}")
exit(1)
Common Errors & Debugging
Error: 403 Forbidden
Cause: The OAuth token does not have the required scope.
Fix: Ensure your Service Account or API Client has the analytics:query:read scope assigned in the Genesys Cloud Admin Portal under Admin > Security > API Clients.
Error: 400 Bad Request - “Invalid Group By”
Cause: The groupBy field contains an unsupported value for the selected metrics or time grain.
Fix: Verify that queue and mediaType are valid grouping dimensions for the conversations/details/query endpoint. If you use timeGrain (e.g., “hourly”), ensure you do not group by fields that conflict with time aggregation in unexpected ways. For simple aggregations, timeGrain: "none" is safest.
Error: 429 Too Many Requests
Cause: You have exceeded the API rate limit for your organization.
Fix: Implement exponential backoff. The code above includes a basic 5-second retry for 429s. For production systems, use a library like tenacity to implement robust retry logic with jitter.
Error: Empty Results
Cause: No conversations occurred in the specified time range for the selected queues/media types.
Fix: Verify the dateFrom and dateTo are correct. Check if the filter_queues list contains valid queue IDs. Ensure the queues are active and have received interactions.