Constructing Analytics Queries Grouped by Queue and Media Type in Genesys Cloud
What You Will Build
- A Python script that constructs and executes an analytics aggregation query against the Genesys Cloud API.
- The query groups conversation metrics by queue ID and media type to produce a matrix of volume and handle time data.
- The implementation uses the Genesys Cloud Python SDK (
genesys-cloud-purecloud-platform-client) and handles pagination, rate limiting, and date range formatting.
Prerequisites
- OAuth Client Type: Service Account or Confidential Client.
- Required Scopes:
analytics:query:readis mandatory for executing aggregation queries. - SDK Version:
genesys-cloud-purecloud-platform-client >= 170.0.0. - Runtime: Python 3.8 or higher.
- Dependencies: Install the SDK via pip:
pip install genesys-cloud-purecloud-platform-client
Authentication Setup
Genesys Cloud uses OAuth 2.0 for all API access. For service integrations, the Client Credentials flow is the standard approach. This flow exchanges a Client ID and Client Secret for an access token.
The following code initializes the Genesys Cloud SDK client. The SDK handles the internal token caching and refresh logic automatically once configured.
import os
from purecloudplatformclientv2 import (
Configuration,
PureCloudPlatformClientV2,
AnalyticsApi,
CreateQueryRequest,
QuerySettings,
DataQuery,
GroupBy,
Interval,
Metric,
Filter,
FilterGroup
)
from purecloudplatformclientv2.rest import ApiException
import logging
# Configure logging to see SDK debug info if needed
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_purecloud_client() -> PureCloudPlatformClientV2:
"""
Initializes and returns a configured PureCloudPlatformClientV2 instance.
Uses environment variables for credentials.
"""
config = Configuration()
config.host = os.getenv("GENESYS_CLOUD_REGION", "us-east-1") # e.g., us-east-1, eu-west-1
config.client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
config.client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
if not config.client_id or not config.client_secret:
raise ValueError("GENESYS_CLOUD_CLIENT_ID and GENESYS_CLOUD_CLIENT_SECRET environment variables are required.")
client = PureCloudPlatformClientV2(config)
return client
if __name__ == "__main__":
try:
client = get_purecloud_client()
print("Authentication successful. Client initialized.")
except Exception as e:
logger.error(f"Failed to initialize client: {e}")
Implementation
Step 1: Constructing the DataQuery Object
The core of the analytics API is the CreateQueryRequest. This object contains the QuerySettings, which defines the DataQuery. The DataQuery object specifies the interval (time granularity), groupBys (dimensions), and metrics (measures).
To group by queue and media type, we must define two GroupBy objects: one for queue and one for mediaType.
from datetime import datetime, timezone
def build_query_request(start_date: str, end_date: str) -> CreateQueryRequest:
"""
Constructs a CreateQueryRequest that groups by queue and media type.
Args:
start_date: ISO 8601 string (e.g., "2023-10-01T00:00:00Z")
end_date: ISO 8601 string (e.g., "2023-10-02T00:00:00Z")
"""
# 1. Define the time interval
# 'hourly' is common for analytics. Options include: 'daily', 'weekly', 'monthly', 'hourly', '15min'
time_interval = Interval(
type_="hourly",
startDate=start_date,
endDate=end_date
)
# 2. Define GroupBys (Dimensions)
# Grouping by queue allows us to see data per queue.
group_by_queue = GroupBy(
type_="queue",
idType="id" # Use 'id' for UUIDs, 'name' for string names
)
# Grouping by mediaType allows us to distinguish between Voice, Chat, Email, etc.
group_by_media = GroupBy(
type_="mediaType"
)
# 3. Define Metrics (Measures)
# Common metrics: 'conversationCount', 'totalHandleTime', 'abandonedCount'
metric_volume = Metric(
type_="conversationCount",
displayType="sum"
)
metric_handle_time = Metric(
type_="totalHandleTime",
displayType="sum"
)
# 4. Assemble the DataQuery
data_query = DataQuery(
interval=time_interval,
groupBys=[group_by_queue, group_by_media],
metrics=[metric_volume, metric_handle_time],
# Optional: Filter by specific queues if needed.
# For this example, we retrieve all queues.
filter=None
)
# 5. Wrap in QuerySettings and CreateQueryRequest
query_settings = QuerySettings(
dataQueries=[data_query]
)
request = CreateQueryRequest(
querySettings=query_settings
)
return request
Step 2: Executing the Query and Handling Pagination
The Analytics API returns results in pages. The AnalyticsApi.post_analytics_conversations_details_query method returns a response object containing a nextPageUri. You must loop through these pages until nextPageUri is null.
Additionally, the API enforces rate limits. A 429 Too Many Requests response requires implementing exponential backoff.
from purecloudplatformclientv2 import AnalyticsApi
import time
import json
def fetch_analytics_data(client: PureCloudPlatformClientV2, request: CreateQueryRequest) -> list:
"""
Executes the analytics query and handles pagination and rate limiting.
Args:
client: The authenticated PureCloudPlatformClientV2 client.
request: The CreateQueryRequest object.
Returns:
A list of all aggregated result rows.
"""
analytics_api = AnalyticsApi(client)
all_results = []
page_uri = "/api/v2/analytics/conversations/details/query"
max_retries = 3
retry_count = 0
while page_uri:
try:
# Use the API client's post method
# Note: The SDK method name corresponds to the endpoint
response = analytics_api.post_analytics_conversations_details_query(
body=request
)
# Process the current page
if response.result and response.result.dataQueries:
data_query_result = response.result.dataQueries[0]
if data_query_result.rows:
all_results.extend(data_query_result.rows)
logger.info(f"Retrieved {len(data_query_result.rows)} rows from page.")
# Check for pagination
page_uri = response.result.nextPageUri
retry_count = 0 # Reset retry count on success
except ApiException as e:
logger.error(f"API Exception: {e.status} {e.reason}")
# Handle Rate Limiting (429)
if e.status == 429:
retry_count += 1
if retry_count > max_retries:
raise Exception("Max retries exceeded for 429 Too Many Requests.")
# Exponential backoff: 2^retry_count seconds
backoff_time = 2 ** retry_count
logger.warning(f"Rate limited. Retrying in {backoff_time} seconds...")
time.sleep(backoff_time)
continue
# Handle other errors (401, 403, 500)
elif e.status in [401, 403]:
logger.error("Authentication or Authorization failed. Check scopes.")
raise
else:
logger.error(f"Unexpected error: {e.body}")
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise
return all_results
Step 3: Processing and Formatting Results
The raw response from the Analytics API contains complex nested objects. Each row in data_query_result.rows contains groupBys and metrics. We need to flatten this structure for consumption by a dashboard or database.
def format_results(rows: list) -> list:
"""
Flattens the analytics response rows into a list of dictionaries.
Args:
rows: List of Row objects from the Analytics API response.
Returns:
List of dictionaries with keys: queue_id, queue_name, media_type, conversation_count, total_handle_time
"""
formatted_data = []
for row in rows:
# Extract GroupBy values
queue_info = None
media_type = None
for group_by in row.groupBys:
if group_by.name == "queue":
queue_info = group_by.value
# Note: group_by.value is usually the ID if idType="id"
# To get the name, you might need a separate lookup or use idType="name" in the query.
# For this example, we assume the value is the ID.
elif group_by.name == "mediaType":
media_type = group_by.value
# Extract Metric values
conversation_count = 0
total_handle_time = 0.0
for metric in row.metrics:
if metric.type_ == "conversationCount":
conversation_count = metric.value
elif metric.type_ == "totalHandleTime":
# Handle time is returned in milliseconds
total_handle_time = metric.value / 1000.0 # Convert to seconds
formatted_data.append({
"queue_id": queue_info,
"media_type": media_type,
"conversation_count": conversation_count,
"total_handle_time_seconds": total_handle_time
})
return formatted_data
Complete Working Example
The following script combines all previous steps into a single executable module. It defines the date range, builds the query, executes it with pagination, and prints the results as JSON.
import os
import json
import logging
from datetime import datetime, timezone, timedelta
from purecloudplatformclientv2 import (
Configuration,
PureCloudPlatformClientV2,
AnalyticsApi,
CreateQueryRequest,
QuerySettings,
DataQuery,
GroupBy,
Interval,
Metric
)
from purecloudplatformclientv2.rest import ApiException
import time
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def get_purecloud_client() -> PureCloudPlatformClientV2:
config = Configuration()
# Default to us-east-1 if not set
config.host = os.getenv("GENESYS_CLOUD_REGION", "us-east-1")
config.client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
config.client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
if not config.client_id or not config.client_secret:
raise ValueError("Environment variables GENESYS_CLOUD_CLIENT_ID and GENESYS_CLOUD_CLIENT_SECRET are required.")
return PureCloudPlatformClientV2(config)
def build_query_request(start_date: str, end_date: str) -> CreateQueryRequest:
# Define Interval
interval = Interval(
type_="hourly",
startDate=start_date,
endDate=end_date
)
# Define GroupBys
group_by_queue = GroupBy(type_="queue", idType="id")
group_by_media = GroupBy(type_="mediaType")
# Define Metrics
metric_count = Metric(type_="conversationCount", displayType="sum")
metric_handle_time = Metric(type_="totalHandleTime", displayType="sum")
# Build DataQuery
data_query = DataQuery(
interval=interval,
groupBys=[group_by_queue, group_by_media],
metrics=[metric_count, metric_handle_time]
)
# Build Request
return CreateQueryRequest(
querySettings=QuerySettings(dataQueries=[data_query])
)
def execute_query(client: PureCloudPlatformClientV2, request: CreateQueryRequest) -> list:
analytics_api = AnalyticsApi(client)
all_rows = []
page_uri = "/api/v2/analytics/conversations/details/query"
max_retries = 5
retry_count = 0
while page_uri:
try:
response = analytics_api.post_analytics_conversations_details_query(body=request)
if response.result and response.result.dataQueries:
dq_result = response.result.dataQueries[0]
if dq_result.rows:
all_rows.extend(dq_result.rows)
logger.info(f"Page fetched. Total rows so far: {len(all_rows)}")
page_uri = response.result.nextPageUri
retry_count = 0
except ApiException as e:
if e.status == 429:
retry_count += 1
if retry_count > max_retries:
raise Exception("Max retries exceeded due to rate limiting.")
wait_time = 2 ** retry_count
logger.warning(f"Rate limited (429). Waiting {wait_time}s...")
time.sleep(wait_time)
continue
else:
raise
except Exception as e:
logger.error(f"Error during query execution: {e}")
raise
return all_rows
def format_and_print_results(rows: list):
if not rows:
print("No data found for the specified range and filters.")
return
output_data = []
for row in rows:
queue_id = None
media_type = None
for gb in row.groupBys:
if gb.name == "queue":
queue_id = gb.value
elif gb.name == "mediaType":
media_type = gb.value
conv_count = 0
handle_time_ms = 0
for m in row.metrics:
if m.type_ == "conversationCount":
conv_count = m.value
elif m.type_ == "totalHandleTime":
handle_time_ms = m.value
output_data.append({
"queue_id": queue_id,
"media_type": media_type,
"conversation_count": conv_count,
"total_handle_time_seconds": round(handle_time_ms / 1000, 2)
})
print(json.dumps(output_data, indent=2))
if __name__ == "__main__":
try:
# 1. Initialize Client
client = get_purecloud_client()
# 2. Define Date Range (Last 24 Hours)
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=1)
# Format as ISO 8601 with Z suffix
start_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
end_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
logger.info(f"Querying analytics from {start_str} to {end_str}")
# 3. Build Query
request = build_query_request(start_str, end_str)
# 4. Execute Query
rows = execute_query(client, request)
# 5. Format and Output
format_and_print_results(rows)
except Exception as e:
logger.error(f"Fatal error: {e}")
raise
Common Errors & Debugging
Error: 401 Unauthorized or 403 Forbidden
Cause: The OAuth token is invalid, expired, or lacks the required scope.
Fix: Ensure your environment variables are correct. Verify that the OAuth client has the analytics:query:read scope assigned in the Genesys Cloud Admin Console under Manage > Integrations > OAuth 2.0 Clients.
# Debug tip: Print the token info if available
# The SDK does not expose the raw token easily, so check the Admin Console.
# If using a Service Account, ensure the user associated with the service account has the "Analytics Query User" role or higher.
Error: 429 Too Many Requests
Cause: The Analytics API has strict rate limits. Aggregation queries are resource-intensive.
Fix: Implement exponential backoff, as shown in the execute_query function. Do not fire multiple concurrent analytics queries. Serialize your requests.
Error: Empty Results
Cause: The date range might have no data, or the filters are too restrictive.
Fix:
- Check the date range. Ensure
startDateis beforeendDate. - Verify that the queues selected actually had conversations in that period.
- If filtering by specific queue IDs, ensure the IDs are valid UUIDs.
# To debug empty results, remove filters first.
# data_query = DataQuery(interval=interval, groupBys=[group_by_queue, group_by_media], metrics=[metric_count], filter=None)
Error: AttributeError: 'NoneType' object has no attribute 'rows'
Cause: The API response structure might vary slightly if the query fails silently or returns an error object instead of a result object.
Fix: Always check if response.result and response.result.dataQueries before accessing rows.