Constructing an Analytics API Aggregation Query Grouped by Queue and Media Type
What You Will Build
- A Python script that queries the Genesys Cloud Analytics API to retrieve aggregated conversation metrics.
- The query groups results by queue ID and media type, allowing you to see volume and handle times per channel per queue.
- The implementation uses the official Genesys Cloud Python SDK (genesyscloud) with production-grade error handling and pagination logic.
Prerequisites
- OAuth Client Type: Private Key Authentication (PKCS #8) or Client Credentials. Private Key is recommended for server-to-server integrations.
- Required Scopes: analytics:query:read is the minimum scope required. If you need detailed conversation data, analytics:conversation:read may also be needed depending on the groupBy depth, but for standard aggregations, analytics:query:read suffices.
- SDK Version: genesyscloud Python package version 12.0.0 or higher.
- Runtime: Python 3.8+.
- External Dependencies: genesyscloud, pydantic (included with SDK), cryptography (for private key handling).
Install the dependencies via pip:
pip install genesyscloud cryptography
Authentication Setup
Genesys Cloud uses OAuth 2.0 with JWT assertions for server-to-server authentication. The Genesys Cloud Python SDK simplifies this via the genesyscloud.init module. You must provide your Organization ID, Client ID, and the Private Key content.
The following code demonstrates how to initialize the platform client with proper error handling for authentication failures.
import sys
import logging

from genesyscloud import init
from genesyscloud.platformclient.rest import ApiException

# Configure logging to see SDK debug info if necessary
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def initialize_client(org_id: str, client_id: str, private_key_content: str):
    """
    Initializes the Genesys Cloud Platform Client.

    Args:
        org_id: The unique identifier for your Genesys Cloud organization.
        client_id: The OAuth client ID.
        private_key_content: The raw string content of the PKCS#8 private key.

    Returns:
        PureCloudPlatformClientV2: The initialized platform client instance.
    """
    try:
        # The init function handles the OAuth token exchange and caching.
        # It uses the provided private key to sign a JWT and exchange it for an access token.
        init.init(
            org_id=org_id,
            client_id=client_id,
            private_key=private_key_content,
            debug=False  # Set to True for verbose HTTP tracing during debugging
        )
        # Access the platform client from the global context
        from genesyscloud.platformclient import platform_client
        return platform_client
    except Exception as e:
        logger.error(f"Failed to initialize Genesys Cloud client: {e}")
        raise SystemExit(1)
# Example usage configuration (load these from environment variables in production)
ORG_ID = "your-organization-id"
CLIENT_ID = "your-client-id"
PRIVATE_KEY = """-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC...
-----END PRIVATE KEY-----"""
# Initialize the client
try:
    client = initialize_client(ORG_ID, CLIENT_ID, PRIVATE_KEY)
    logging.info("Successfully authenticated with Genesys Cloud.")
except SystemExit:
    logging.error("Authentication failed. Check credentials and scopes.")
    sys.exit(1)
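The configuration comment above recommends loading credentials from environment variables in production. A minimal sketch of that step follows. The variable names (GENESYS_ORG_ID, GENESYS_CLIENT_ID, GENESYS_PRIVATE_KEY) and the load_settings helper are hypothetical and not part of the SDK; substitute whatever your deployment tooling provides.

```python
import os


class MissingSettingError(RuntimeError):
    """Raised when a required environment variable is absent or blank."""


def load_settings(env=None):
    """Read the three credentials from environment variables.

    The variable names below are hypothetical placeholders.
    """
    env = os.environ if env is None else env
    names = {
        "org_id": "GENESYS_ORG_ID",
        "client_id": "GENESYS_CLIENT_ID",
        "private_key": "GENESYS_PRIVATE_KEY",
    }
    settings = {}
    missing = []
    for field, var in names.items():
        value = env.get(var, "").strip()
        if value:
            settings[field] = value
        else:
            missing.append(var)
    if missing:
        raise MissingSettingError("Missing environment variables: " + ", ".join(missing))
    # Keys stored on a single line often carry literal "\n" sequences;
    # turn them back into real line breaks so the PEM block is well formed.
    settings["private_key"] = settings["private_key"].replace("\\n", "\n")
    return settings
```

The returned dictionary can then be passed to initialize_client in place of the hard-coded ORG_ID, CLIENT_ID, and PRIVATE_KEY constants. Failing fast with the list of missing names tends to be easier to diagnose than an authentication error raised later.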
Implementation
Step 1: Define the Query Parameters
The core of the Analytics API is the AnalyticsQuery. To group by queue and media type, you must construct a query body that specifies the groupBy fields, the interval, the dateFrom/dateTo range, and the metrics you wish to aggregate.
Key considerations for the query body:
- groupBy: This is an array of strings. To group by queue and media type, you must include "queueId" and "mediaType".
- metrics: You must specify which metrics to return. Common metrics include conversationCount, handledCount, offerCount, and handleTime.
- interval: Defines the time granularity. Use "PT1H" for hourly, "P1D" for daily, or "P1W" for weekly. For a simple aggregation over a range, "P1D" is often useful.
- dateFrom and dateTo: ISO 8601 timestamps.
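To make the shape of the query body concrete, the sketch below assembles the fields just described into a plain dictionary and prints it as JSON. It is an illustration only: build_query_body and ALLOWED_INTERVALS are hypothetical names, the field names are taken from the list above, and the exact wire format the SDK produces may differ.

```python
import json
from datetime import datetime, timedelta, timezone

ALLOWED_INTERVALS = {"PT1H", "P1D", "P1W"}  # the granularities named in the text


def build_query_body(date_from, date_to, metrics, group_by, interval="P1D"):
    """Return a plain dict using the field names described in the text."""
    if interval not in ALLOWED_INTERVALS:
        raise ValueError(f"Unsupported interval: {interval}")
    if date_from >= date_to:
        raise ValueError("date_from must be earlier than date_to")
    if not metrics:
        raise ValueError("At least one metric is required")
    return {
        "groupBy": list(group_by),
        "metrics": list(metrics),
        "interval": interval,
        "dateFrom": date_from.isoformat(),
        "dateTo": date_to.isoformat(),
    }


# A fixed end date keeps the example reproducible.
end = datetime(2024, 1, 8, tzinfo=timezone.utc)
body = build_query_body(
    end - timedelta(days=7),
    end,
    metrics=["conversationCount", "handleTime"],
    group_by=["queueId", "mediaType"],
)
print(json.dumps(body, indent=2))
```

Using timezone-aware datetimes means isoformat() emits an explicit offset (for example 2024-01-01T00:00:00+00:00), which avoids ambiguity about which zone the range refers to.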
The following code constructs the query body using the SDK’s data models. Using the SDK models ensures type safety and proper JSON serialization.
from datetime import datetime, timedelta
from typing import List

from genesyscloud.models.analytics_query import AnalyticsQuery


def build_analytics_query(
    date_from: datetime,
    date_to: datetime,
    metrics: List[str],
    group_by: List[str]
) -> AnalyticsQuery:
    """
    Constructs an AnalyticsQuery object for the Genesys Cloud SDK.

    Args:
        date_from: Start of the query range.
        date_to: End of the query range.
        metrics: List of metric names to aggregate (e.g., ['conversationCount', 'handleTime']).
        group_by: List of grouping fields (e.g., ['queueId', 'mediaType']).

    Returns:
        AnalyticsQuery: The configured query object.
    """
    # No query filter is set, so the query covers all queues and all media types.
    query = AnalyticsQuery(
        group_by=group_by,
        interval="P1D",  # Daily aggregation
        date_from=date_from.isoformat(),
        date_to=date_to.isoformat(),
        metrics=metrics
    )
    return query


# Example: Query the last 7 days.
# astimezone() makes the timestamps timezone-aware, so isoformat() includes the UTC offset.
end_date = datetime.now().astimezone()
start_date = end_date - timedelta(days=7)

# Define metrics of interest
requested_metrics = [
    "conversationCount",
    "handledCount",
    "offerCount",
    "handleTime",
    "wrapUpTime"
]

# Define grouping dimensions
requested_group_by = [
    "queueId",
    "mediaType"
]

query_body = build_analytics_query(start_date, end_date, requested_metrics, requested_group_by)
logging.info(f"Constructed query for range {start_date} to {end_date}")
Step 2: Execute the Query and Handle Pagination
The Genesys Cloud Analytics API returns paginated results. The response contains a nextPageUri if more data is available. You must loop through these pages to get the complete dataset.
The SDK method post_analytics_conversations_details_query is used for conversation-level analytics. For high-volume aggregations, post_analytics_conversations_summary_query is more efficient, while the details method allows more flexible groupings. This guide uses post_analytics_conversations_details_query.
OAuth Scope Required: analytics:query:read
import time

from genesyscloud.analytics.api import AnalyticsApi
from genesyscloud.platformclient.rest import ApiException

MAX_RATE_LIMIT_RETRIES = 5  # Give up after this many consecutive 429 responses


def fetch_analytics_data(client, query_body: AnalyticsQuery):
    """
    Fetches analytics data using the provided query, handling pagination.

    Args:
        client: The initialized Genesys Cloud platform client.
        query_body: The AnalyticsQuery object.

    Returns:
        list: A list of all result entities from all pages.
    """
    analytics_api = AnalyticsApi(client)
    all_results = []
    next_page_uri = None
    rate_limit_retries = 0
    logging.info("Starting analytics query execution...")

    while True:
        try:
            # Execute the query
            # The SDK converts the AnalyticsQuery object to JSON automatically
            response = analytics_api.post_analytics_conversations_details_query(
                body=query_body,
                next_page_uri=next_page_uri
            )
            rate_limit_retries = 0  # The request succeeded, so reset the counter

            # Append the entities from this page
            if response.entities:
                all_results.extend(response.entities)
                logging.info(f"Fetched {len(response.entities)} records. Total so far: {len(all_results)}")
            else:
                logging.info("No entities returned in this page.")

            # Check for pagination
            if response.next_page_uri:
                next_page_uri = response.next_page_uri
                # Respect rate limits: add a small delay between requests
                time.sleep(0.5)
            else:
                logging.info("No more pages. Pagination complete.")
                break
        except ApiException as e:
            if e.status == 429:
                rate_limit_retries += 1
                if rate_limit_retries > MAX_RATE_LIMIT_RETRIES:
                    logging.error(f"Rate limit (429) persisted after {MAX_RATE_LIMIT_RETRIES} retries.")
                    raise
                logging.warning("Rate limit (429) encountered. Waiting 5 seconds before retrying...")
                time.sleep(5)
                continue
            elif e.status in (401, 403):
                logging.error(f"Authentication/Authorization error: {e.reason}")
                raise
            else:
                logging.error(f"API Exception occurred: {e.status} - {e.reason}")
                raise
        except Exception as e:
            logging.error(f"Unexpected error during query: {e}")
            raise

    return all_results


# Execute the fetch
try:
    analytics_results = fetch_analytics_data(client, query_body)
    logging.info(f"Query complete. Total records retrieved: {len(analytics_results)}")
except Exception as e:
    logging.error(f"Failed to fetch analytics data: {e}")
    sys.exit(1)
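The next_page_uri loop in fetch_analytics_data can be exercised without network access. The sketch below isolates the same pattern behind a hypothetical fetch_page callable (not an SDK function) and feeds it three fake pages. It also guards against a service that keeps returning the same link, which is an added safety check rather than something the text above requires.

```python
def collect_pages(fetch_page):
    """Follow next-page links until the service stops returning one.

    fetch_page is a hypothetical callable: it takes the URI of the page to
    load (None for the first page) and returns (entities, next_page_uri).
    """
    results = []
    seen = set()
    uri = None
    while True:
        entities, next_uri = fetch_page(uri)
        results.extend(entities or [])
        if not next_uri:
            return results
        if next_uri in seen:
            raise RuntimeError(f"Pagination loop detected at {next_uri}")
        seen.add(next_uri)
        uri = next_uri


# Three fake pages keyed by the URI that requests them.
FAKE_PAGES = {
    None: (["a", "b"], "/page/2"),
    "/page/2": (["c"], "/page/3"),
    "/page/3": ([], None),
}

all_rows = collect_pages(lambda uri: FAKE_PAGES[uri])
print(all_rows)  # ['a', 'b', 'c']
```

Keeping the loop separate from the HTTP call makes the termination condition easy to unit test, and the real API call can be wrapped in a small function that returns the same (entities, next_page_uri) pair.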
Step 3: Process and Structure the Results
The raw response from the Analytics API is a list of ConversationDetail or ConversationSummary entities. Each entity contains a groupBy object that mirrors the structure of your query. Since we grouped by queueId and mediaType, each entity will have these keys in its groupBy field.
To make this data useful, we will transform it into a dictionary keyed by (queue_id, media_type) for easy lookup, and then print a structured summary.
from typing import Dict, Tuple, Any
import json


def process_analytics_results(results: list) -> Dict[Tuple[str, str], Any]:
    """
    Transforms the raw analytics results into a structured dictionary.

    Args:
        results: The list of analytics entities returned by the API.

    Returns:
        dict: A dictionary keyed by (queue_id, media_type) with metric values.
    """
    structured_data = {}
    for entity in results:
        # Extract the groupBy values
        group_by = entity.group_by
        queue_id = group_by.get("queueId")
        media_type = group_by.get("mediaType")
        if not queue_id or not media_type:
            continue
        key = (queue_id, media_type)

        # Initialize the key if it does not exist
        if key not in structured_data:
            structured_data[key] = {
                "queue_id": queue_id,
                "media_type": media_type,
                "metrics": {}
            }

        # Extract metrics
        # The metrics field is a dictionary where keys are metric names and values are metric objects
        if entity.metrics:
            for metric_name, metric_obj in entity.metrics.items():
                # metric_obj usually has a 'count' or 'value' field depending on the metric type
                # For handleTime, it might be 'sum' or 'value'
                # We will store the raw metric object for flexibility
                structured_data[key]["metrics"][metric_name] = metric_obj
    return structured_data
def print_summary(data: Dict[Tuple[str, str], Any]):
    """
    Prints a human-readable summary of the aggregated data.
    """
    if not data:
        logging.info("No data to display.")
        return
    print("\n--- Analytics Summary ---")
    print(f"{'Queue ID':<35} | {'Media Type':<15} | {'Conversations':<15} | {'Handled':<15} | {'Avg Handle Time (ms)':<20}")
    print("-" * 100)
    for (queue_id, media_type), info in sorted(data.items()):
        metrics = info["metrics"]
        conv_count = metrics.get("conversationCount", {}).get("count", 0)
        handled_count = metrics.get("handledCount", {}).get("count", 0)
        # handleTime is typically reported in milliseconds. This code assumes the
        # metric exposes its total under 'sum'; some query types may instead
        # return an average under 'value'.
        handle_time_ms = metrics.get("handleTime", {}).get("sum", 0)
        # Calculate average handle time if any conversations were handled
        avg_handle_time = (handle_time_ms / handled_count) if handled_count > 0 else 0
        print(f"{queue_id:<35} | {media_type:<15} | {conv_count:<15} | {handled_count:<15} | {avg_handle_time:<20.0f}")
    print("-" * 100)
# Process and print
processed_data = process_analytics_results(analytics_results)
print_summary(processed_data)

# Optional: Save to JSON for further analysis.
# The keys are tuples, which are not JSON serializable, so convert them to strings first.
serializable_data = {
    f"{queue_id}_{media_type}": v
    for (queue_id, media_type), v in processed_data.items()
}
with open("analytics_results.json", "w") as f:
    json.dump(serializable_data, f, indent=2, default=str)
logging.info("Results saved to analytics_results.json")
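With a daily interval over a seven-day range, the same (queue, media type) pair may appear in more than one result row, and process_analytics_results keeps only the last metric object it sees for each key. If totals across the whole range are wanted, the rows can be summed instead. The sketch below assumes each row is a plain dictionary with "groupBy" and "metrics" keys and that each metric carries "count" and "sum" fields; real SDK entities may expose attributes rather than dictionary keys, so treat sum_metrics_by_group as a hypothetical helper to adapt.

```python
from collections import defaultdict


def sum_metrics_by_group(rows):
    """Add up 'count' and 'sum' per (queue_id, media_type) and metric name.

    Each row is assumed to look like:
    {"groupBy": {"queueId": ..., "mediaType": ...},
     "metrics": {"handleTime": {"count": 3, "sum": 90000}}}
    """
    totals = defaultdict(lambda: defaultdict(lambda: {"count": 0, "sum": 0}))
    for row in rows:
        group = row.get("groupBy", {})
        key = (group.get("queueId"), group.get("mediaType"))
        if None in key:
            continue  # skip rows that lack either dimension
        for name, stats in row.get("metrics", {}).items():
            totals[key][name]["count"] += stats.get("count", 0)
            totals[key][name]["sum"] += stats.get("sum", 0)
    return {key: dict(metrics) for key, metrics in totals.items()}


# Two daily rows for the same queue and channel, plus one for another channel.
rows = [
    {"groupBy": {"queueId": "q1", "mediaType": "voice"},
     "metrics": {"handleTime": {"count": 2, "sum": 60000}}},
    {"groupBy": {"queueId": "q1", "mediaType": "voice"},
     "metrics": {"handleTime": {"count": 1, "sum": 30000}}},
    {"groupBy": {"queueId": "q1", "mediaType": "chat"},
     "metrics": {"handleTime": {"count": 4, "sum": 40000}}},
]
totals = sum_metrics_by_group(rows)
voice = totals[("q1", "voice")]["handleTime"]
print(voice["sum"] / voice["count"])  # average handle time in ms: 30000.0
```

Summing counts and sums separately, and dividing only at the end, gives a correctly weighted average; averaging the daily averages would not.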
Complete Working Example
Below is the complete, consolidated script. You can copy this into a file named genesys_analytics_query.py, update the credentials at the top, and run it.
import sys
import logging
import time
from datetime import datetime, timedelta
from typing import Dict, Tuple, Any
import json
# Import Genesys Cloud SDK modules
from genesyscloud import init
from genesyscloud.platformclient.rest import ApiException
from genesyscloud.models.analytics_query import AnalyticsQuery
from genesyscloud.analytics.api import AnalyticsApi
from genesyscloud.platformclient import platform_client as pc
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def initialize_client(org_id: str, client_id: str, private_key_content: str):
    """
    Initializes the Genesys Cloud Platform Client.
    """
    try:
        init.init(
            org_id=org_id,
            client_id=client_id,
            private_key=private_key_content,
            debug=False
        )
        return pc
    except Exception as e:
        logger.error(f"Failed to initialize Genesys Cloud client: {e}")
        raise SystemExit(1)


def build_analytics_query(
    date_from: datetime,
    date_to: datetime,
    metrics: list,
    group_by: list
) -> AnalyticsQuery:
    """
    Constructs an AnalyticsQuery object.
    """
    query = AnalyticsQuery(
        group_by=group_by,
        interval="P1D",
        date_from=date_from.isoformat(),
        date_to=date_to.isoformat(),
        metrics=metrics
    )
    return query
def fetch_analytics_data(client, query_body: AnalyticsQuery):
    """
    Fetches analytics data with pagination and retry logic.
    """
    analytics_api = AnalyticsApi(client)
    all_results = []
    next_page_uri = None
    max_retries = 5  # Give up after this many consecutive 429 responses
    retries = 0
    logger.info("Starting analytics query execution...")

    while True:
        try:
            response = analytics_api.post_analytics_conversations_details_query(
                body=query_body,
                next_page_uri=next_page_uri
            )
            retries = 0  # The request succeeded, so reset the counter
            if response.entities:
                all_results.extend(response.entities)
                logger.info(f"Fetched {len(response.entities)} records. Total: {len(all_results)}")
            else:
                logger.info("No entities in this page.")
            if response.next_page_uri:
                next_page_uri = response.next_page_uri
                time.sleep(0.5)  # Rate limiting courtesy
            else:
                logger.info("Pagination complete.")
                break
        except ApiException as e:
            if e.status == 429 and retries < max_retries:
                retries += 1
                logger.warning("Rate limit (429). Waiting 5s...")
                time.sleep(5)
                continue
            logger.error(f"API Error {e.status}: {e.reason}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise

    return all_results
def process_and_print_results(results: list):
    """
    Processes results and prints a summary.
    """
    structured_data = {}
    for entity in results:
        group_by = entity.group_by
        queue_id = group_by.get("queueId")
        media_type = group_by.get("mediaType")
        if not queue_id or not media_type:
            continue
        key = (queue_id, media_type)
        if key not in structured_data:
            structured_data[key] = {
                "queue_id": queue_id,
                "media_type": media_type,
                "metrics": {}
            }
        if entity.metrics:
            for metric_name, metric_obj in entity.metrics.items():
                structured_data[key]["metrics"][metric_name] = metric_obj

    if not structured_data:
        logger.info("No data to display.")
        return

    print("\n--- Analytics Summary ---")
    print(f"{'Queue ID':<35} | {'Media Type':<15} | {'Conversations':<15} | {'Handled':<15} | {'Sum Handle Time (ms)':<25}")
    print("-" * 100)
    for (queue_id, media_type), info in sorted(structured_data.items()):
        metrics = info["metrics"]
        conv_count = metrics.get("conversationCount", {}).get("count", 0)
        handled_count = metrics.get("handledCount", {}).get("count", 0)
        handle_time_sum = metrics.get("handleTime", {}).get("sum", 0)
        print(f"{queue_id:<35} | {media_type:<15} | {conv_count:<15} | {handled_count:<15} | {handle_time_sum:<25.0f}")
    print("-" * 100)

    # Save to JSON
    serializable_data = {
        f"{q}_{m}": v for (q, m), v in structured_data.items()
    }
    with open("analytics_output.json", "w") as f:
        json.dump(serializable_data, f, indent=2, default=str)
    logger.info("Saved results to analytics_output.json")
if __name__ == "__main__":
    # CONFIGURATION
    ORG_ID = "your-organization-id"
    CLIENT_ID = "your-client-id"
    PRIVATE_KEY = """-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC...
-----END PRIVATE KEY-----"""

    # 1. Initialize Client
    try:
        client = initialize_client(ORG_ID, CLIENT_ID, PRIVATE_KEY)
    except SystemExit:
        sys.exit(1)

    # 2. Build Query
    # astimezone() makes the timestamps timezone-aware, so isoformat() includes the UTC offset.
    end_date = datetime.now().astimezone()
    start_date = end_date - timedelta(days=7)
    metrics = ["conversationCount", "handledCount", "handleTime"]
    group_by = ["queueId", "mediaType"]
    query_body = build_analytics_query(start_date, end_date, metrics, group_by)

    # 3. Fetch Data
    try:
        results = fetch_analytics_data(client, query_body)
    except Exception as e:
        logger.error(f"Failed to fetch data: {e}")
        sys.exit(1)

    # 4. Process Results
    process_and_print_results(results)
Common Errors & Debugging
Error: 401 Unauthorized
Cause: The OAuth token is invalid, expired, or the client credentials are incorrect.
Fix: Ensure the private_key content is correct and matches the client_id. Check that the private key is in PKCS#8 format. If using environment variables, verify they are loaded correctly.
Code Check:
# Verify the private key is not empty
if not PRIVATE_KEY.strip():
    raise ValueError("Private key is empty")
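Beyond the emptiness check, the PEM header gives a quick hint about the key format. An unencrypted PKCS#8 key begins with "-----BEGIN PRIVATE KEY-----", whereas the older PKCS#1 RSA format begins with "-----BEGIN RSA PRIVATE KEY-----". The sketch below inspects only those marker lines; describe_pem is a hypothetical helper and does not validate the key material itself.

```python
PKCS8_HEADER = "-----BEGIN PRIVATE KEY-----"
PKCS8_FOOTER = "-----END PRIVATE KEY-----"


def describe_pem(pem_text):
    """Classify a PEM string by its marker lines only (no cryptographic check)."""
    lines = [line.strip() for line in pem_text.strip().splitlines() if line.strip()]
    if not lines:
        return "empty"
    first, last = lines[0], lines[-1]
    if first == PKCS8_HEADER and last == PKCS8_FOOTER:
        return "pkcs8"
    if first == "-----BEGIN RSA PRIVATE KEY-----":
        return "pkcs1"  # traditional RSA format; would need converting to PKCS#8
    if first == "-----BEGIN ENCRYPTED PRIVATE KEY-----":
        return "encrypted-pkcs8"  # passphrase-protected PKCS#8
    return "unknown"
```

A result other than "pkcs8" suggests the key should be converted or re-exported before it is passed to the client initialization code.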
Error: 403 Forbidden
Cause: The OAuth client lacks the required scope analytics:query:read.
Fix: Go to the Genesys Cloud Admin Console → Security → OAuth Clients. Edit your client and ensure analytics:query:read is added to the scopes. Re-initialize the client to get a new token with the updated scopes.
Error: 429 Too Many Requests
Cause: You have exceeded the rate limit for the Analytics API. Analytics queries are computationally expensive and have stricter rate limits than other APIs.
Fix: Implement exponential backoff. The code above only pauses for a fixed 0.5 seconds between pages and waits a fixed 5 seconds after a 429. For high-frequency polling, reduce the query frequency or aggregate over longer time intervals (P1W instead of P1D).
Code Adjustment:
# Increase sleep duration if 429s persist
if e.status == 429:
    time.sleep(5)  # Wait 5 seconds before retrying
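For the exponential backoff recommended in the fix, one possible shape is a small wrapper that doubles the maximum wait after each failed attempt and adds random jitter. This is a sketch under stated assumptions: call_with_backoff, its parameter names, and its defaults are illustrative choices, not part of the SDK.

```python
import random
import time


def call_with_backoff(func, is_retryable, max_attempts=5, base_delay=1.0,
                      max_delay=30.0, sleep=time.sleep):
    """Call func(); on a retryable exception wait and try again.

    The wait before retry n is drawn uniformly from
    [0, min(max_delay, base_delay * 2**n)].
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception as exc:
            last_attempt = attempt == max_attempts - 1
            if last_attempt or not is_retryable(exc):
                raise
            delay = min(max_delay, base_delay * (2 ** attempt))
            # Full jitter spreads retries out so clients do not retry in lockstep.
            sleep(random.uniform(0, delay))
```

In the fetch loop, the API call could be passed as func, with is_retryable set to something like lambda e: getattr(e, "status", None) == 429. The injectable sleep argument exists so the helper can be tested without real delays.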
Error: Empty Results
Cause: The date range has no data, or the grouping filters are too restrictive.
Fix: Verify that date_from and date_to are in ISO 8601 format and cover a period with activity. Ensure the queues you expect to see are active and have had conversations. Check that mediaType values match Genesys Cloud standards (e.g., “voice”, “chat”, “email”, “sms”).
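The checks in this fix can be run locally before the query is sent. The sketch below parses both timestamps, confirms the range is in the right order, and flags media type strings outside the examples listed above. check_query_inputs and KNOWN_MEDIA_TYPES are hypothetical names, and the media type set contains only the examples from this section, so it is not exhaustive.

```python
from datetime import datetime

KNOWN_MEDIA_TYPES = {"voice", "chat", "email", "sms"}  # examples from the text, not exhaustive


def check_query_inputs(date_from, date_to, media_types=()):
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []
    parsed = {}
    for label, text in (("date_from", date_from), ("date_to", date_to)):
        try:
            # Note: before Python 3.11, fromisoformat() rejects a trailing "Z".
            parsed[label] = datetime.fromisoformat(text)
        except ValueError:
            problems.append(f"{label} is not ISO 8601: {text!r}")
    if len(parsed) == 2:
        start, end = parsed["date_from"], parsed["date_to"]
        if (start.tzinfo is None) != (end.tzinfo is None):
            problems.append("date_from and date_to mix naive and timezone-aware values")
        elif start >= end:
            problems.append("date_from is not earlier than date_to")
    for media_type in media_types:
        if media_type not in KNOWN_MEDIA_TYPES:
            problems.append(f"unrecognized mediaType: {media_type!r}")
    return problems
```

An empty list does not guarantee data exists for the range; it only rules out malformed timestamps, an inverted range, and misspelled or miscased media types such as "Voice".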