Querying Genesys Cloud Analytics Aggregation Data Grouped by Queue and Media Type
What You Will Build
This tutorial demonstrates how to construct an Analytics API aggregation query that retrieves conversation metrics grouped by queue and media type. You will build a Python script that uses the official Genesys Cloud SDK to authenticate, define the query body, execute the request with pagination handling, and parse the results into a structured format.
Prerequisites
To follow this tutorial, you need the following:
- OAuth Application: A Genesys Cloud OAuth application with the analytics:conversation:read scope. This scope is mandatory for accessing any analytics data.
- SDK Version: Genesys Cloud Python SDK v2 (purecloudplatformclientv2). Ensure you install the latest stable release.
- Runtime: Python 3.8 or higher.
- Dependencies: The purecloudplatformclientv2 package and python-dotenv for secure credential management.
Install the required packages using pip:
pip install purecloudplatformclientv2 python-dotenv
Authentication Setup
Genesys Cloud uses OAuth 2.0 for authentication. The Python SDK handles the token retrieval and refresh logic internally, provided you configure the client correctly. You must store your Client ID and Client Secret securely. Using environment variables is the standard practice for production code.
Create a .env file in your project root with the following content:
GENESYS_CLIENT_ID=your_client_id_here
GENESYS_CLIENT_SECRET=your_client_secret_here
GENESYS_ENVIRONMENT=us-east-1
The GENESYS_ENVIRONMENT variable determines the API endpoint region. Common values include us-east-1, eu-west-1, and ap-southeast-2.
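To make the link between a region name and an API endpoint concrete, here is a minimal sketch of a lookup helper. The names REGION_API_HOSTS and resolve_api_host are hypothetical and not part of the SDK, and the table is deliberately partial; check the official region list before relying on it.

```python
# Illustrative, partial mapping from region names to Genesys Cloud API hosts.
# REGION_API_HOSTS and resolve_api_host are hypothetical helpers, not SDK names.
REGION_API_HOSTS = {
    "us-east-1": "https://api.mypurecloud.com",
    "us-west-2": "https://api.usw2.pure.cloud",
    "eu-west-1": "https://api.mypurecloud.ie",
}


def resolve_api_host(environment: str) -> str:
    """Return the API base URL for a region name, or raise ValueError."""
    key = environment.strip().lower()
    try:
        return REGION_API_HOSTS[key]
    except KeyError:
        known = ", ".join(sorted(REGION_API_HOSTS))
        raise ValueError(
            f"Unknown region {environment!r}; known regions: {known}"
        ) from None


if __name__ == "__main__":
    print(resolve_api_host("us-east-1"))
```

Failing fast on an unknown region name gives a clearer message than a connection error against the wrong host.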
Below is the code to initialize the platform client. The SDK automatically caches the access token and handles refresh tokens when the initial token expires.
import os
from purecloudplatformclientv2 import (
    PlatformClient,
    AnalyticsApi,
    PureCloudRegionHost
)
from dotenv import load_dotenv

def get_platform_client() -> PlatformClient:
    """
    Initializes and returns an authenticated PlatformClient instance.
    """
    load_dotenv()
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "us-east-1")
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment variables.")
    client = PlatformClient()
    # Set the region host based on the environment variable
    try:
        region_host = PureCloudRegionHost.get_host(environment)
    except ValueError as e:
        raise ValueError(f"Invalid environment: {environment}. {e}")
    client.set_environment(region_host)
    client.set_credentials(client_id, client_secret)
    return client

# Initialize the client
client = get_platform_client()
Implementation
The core of this tutorial involves constructing the AnalyticsQueryBody. This object defines what data you want, how it is grouped, and the time range. The Analytics Aggregation API (/api/v2/analytics/conversations/details/query) validates the request body strictly: if a grouping key is not recognized or the time range is invalid, the API returns a 400 Bad Request.
Step 1: Constructing the Query Body
The AnalyticsQueryBody requires a groupBy array. To group by queue and media type, you must specify the exact grouping keys: queue and mediaType. You must also define the interval for the time series data. For a simple aggregation without time-series breakdown, use PT0S (zero seconds) or omit the interval if you only want totals. However, most use cases require an interval. We will use P1D (one day) to get daily aggregates.
The filter object allows you to scope the data to specific queues, users, or time ranges. We will filter by a specific date range to ensure the query is performant.
from purecloudplatformclientv2 import AnalyticsQueryBody, AnalyticsFilter

def build_analytics_query(start_date: str, end_date: str, queue_ids: list = None) -> AnalyticsQueryBody:
    """
    Builds the AnalyticsQueryBody for grouping by queue and media type.

    Args:
        start_date: ISO 8601 start date (e.g., "2023-10-01T00:00:00.000Z")
        end_date: ISO 8601 end date (e.g., "2023-10-31T23:59:59.999Z")
        queue_ids: Optional list of Queue IDs to filter. If None, all queues are included.

    Returns:
        AnalyticsQueryBody instance
    """
    # Define the grouping keys
    group_by = ["queue", "mediaType"]
    # Define the time interval
    # P1D means one day. Other options: PT1H (hour), PT15M (15 minutes)
    interval = "P1D"
    # Define the filter
    filter_obj = AnalyticsFilter()
    filter_obj.date_from = start_date
    filter_obj.date_to = end_date
    # Optional: Filter by specific queues
    if queue_ids and len(queue_ids) > 0:
        filter_obj.queues = queue_ids
    # Construct the query body
    query_body = AnalyticsQueryBody()
    query_body.group_by = group_by
    query_body.interval = interval
    query_body.filter = filter_obj
    return query_body
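To see how the interval interacts with the date range, the following sketch estimates how many time buckets a query would span. It uses only the standard library; parse_timestamp, parse_duration, and count_buckets are hypothetical helper names, and the duration parser covers only the day, hour, and minute forms mentioned above (P1D, PT1H, PT15M), not the full ISO 8601 duration grammar.

```python
import math
import re
from datetime import datetime, timedelta

# Supports only a subset of ISO 8601 durations: PnD, PTnH, PTnM and combinations.
_DURATION = re.compile(r"^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?)?$")


def parse_timestamp(value: str) -> datetime:
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on,
    # so it is rewritten as an explicit UTC offset first.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def parse_duration(value: str) -> timedelta:
    match = _DURATION.match(value)
    if not match or not any(match.groups()):
        raise ValueError(f"Unsupported interval: {value!r}")
    days, hours, minutes = (int(g) if g else 0 for g in match.groups())
    return timedelta(days=days, hours=hours, minutes=minutes)


def count_buckets(start_date: str, end_date: str, interval: str) -> int:
    """Number of interval-sized buckets needed to cover the date range."""
    step = parse_duration(interval)
    if step <= timedelta(0):
        raise ValueError("interval must be longer than zero")
    span = parse_timestamp(end_date) - parse_timestamp(start_date)
    if span <= timedelta(0):
        raise ValueError("start_date must be earlier than end_date")
    return math.ceil(span / step)


if __name__ == "__main__":
    print(count_buckets("2023-10-01T00:00:00.000Z", "2023-10-31T23:59:59.999Z", "P1D"))
```

For the October range used in this tutorial, a P1D interval gives 31 buckets per queue and media type combination, which is a quick way to sanity-check the number of rows you expect back.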
Step 2: Executing the Query with Pagination
The Analytics Aggregation API returns data in pages. The response includes a nextPageId if more data is available. You must handle pagination to retrieve the complete dataset. The SDK method post_analytics_conversations_details_query sends the POST request.
Note that the API enforces rate limits. If you receive a 429 Too Many Requests response, you must wait before retrying. The SDK does not automatically retry, so you should implement exponential backoff in production code. For this tutorial, we will assume standard traffic conditions but include error handling for common HTTP errors.
from purecloudplatformclientv2.rest import ApiException
import time

def fetch_analytics_data(client: PlatformClient, query_body: AnalyticsQueryBody) -> list:
    """
    Fetches analytics data with pagination handling.

    Args:
        client: Authenticated PlatformClient
        query_body: The AnalyticsQueryBody to execute

    Returns:
        List of analytics result objects
    """
    api_instance = AnalyticsApi(client)
    all_results = []
    page_id = None
    print("Starting analytics query...")
    while True:
        try:
            # Execute the query
            # The nextPageId parameter is None for the first call
            response = api_instance.post_analytics_conversations_details_query(
                body=query_body,
                next_page_id=page_id
            )
            # Append the results from this page
            if response.entities:
                all_results.extend(response.entities)
                print(f"Retrieved {len(response.entities)} records. Total so far: {len(all_results)}")
            # Check for pagination
            if response.next_page_id:
                page_id = response.next_page_id
                # Small delay to respect rate limits between pages
                time.sleep(0.5)
            else:
                # No more pages
                break
        except ApiException as e:
            if e.status == 429:
                print("Rate limit exceeded. Waiting 5 seconds before retrying...")
                time.sleep(5)
                continue
            elif e.status == 400:
                print(f"Bad Request: {e.body}")
                raise ValueError(f"Invalid query body. Check date format and grouping keys. Error: {e.body}")
            else:
                print(f"API Exception: {e.status} - {e.reason}")
                raise e
    return all_results
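The cursor-following loop can be exercised without calling the API. The sketch below isolates the pattern behind a hypothetical fetch_page callable that returns a dictionary with entities and nextPageId keys; that response shape, the FAKE_PAGES data, and the max_pages guard are assumptions made for illustration, not SDK behavior.

```python
from typing import Callable, Dict, List, Optional


def collect_all_pages(
    fetch_page: Callable[[Optional[str]], Dict],
    max_pages: int = 1000,
) -> List[dict]:
    """Follow nextPageId cursors until none is returned, collecting entities."""
    items: List[dict] = []
    page_id: Optional[str] = None
    for _ in range(max_pages):
        page = fetch_page(page_id)
        items.extend(page.get("entities") or [])
        page_id = page.get("nextPageId")
        if not page_id:
            return items
    # Guard against a cursor that never terminates
    raise RuntimeError(f"Gave up after {max_pages} pages")


# Stubbed pages standing in for API responses (hypothetical data).
FAKE_PAGES = {
    None: {"entities": [{"id": 1}, {"id": 2}], "nextPageId": "p2"},
    "p2": {"entities": [{"id": 3}], "nextPageId": "p3"},
    "p3": {"entities": [], "nextPageId": None},
}


def fake_fetch(page_id: Optional[str]) -> Dict:
    return FAKE_PAGES[page_id]


if __name__ == "__main__":
    print(len(collect_all_pages(fake_fetch)))
```

Keeping the loop separate from the HTTP call makes it testable with stubbed pages, and the page cap prevents an endless loop if the server keeps returning a cursor.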
Step 3: Processing and Structuring the Results
The raw response from the API contains nested objects. Each entity in response.entities represents a single combination of queue and media type for the specified interval. The summary object contains the aggregated metrics.
You need to extract the relevant fields: queue.name, queue.id, mediaType, summary.handleTime, summary.wrapUpTime, summary.totalConversations, and summary.abandonedConversations, along with the interval boundaries.
from typing import List, Dict, Any

def process_analytics_results(results: List[Any]) -> List[Dict[str, Any]]:
    """
    Parses the raw analytics results into a clean list of dictionaries.

    Args:
        results: List of AnalyticsQueryResponseEntity objects

    Returns:
        List of dictionaries with flattened keys
    """
    processed_data = []
    for entity in results:
        if not entity.summary:
            continue
        # Extract queue information
        queue_name = entity.queue.name if entity.queue and entity.queue.name else "Unknown Queue"
        queue_id = entity.queue.id if entity.queue and entity.queue.id else None
        # Extract media type
        media_type = entity.media_type if hasattr(entity, 'media_type') and entity.media_type else "Unknown"
        # Extract summary metrics
        handle_time = entity.summary.handle_time if entity.summary.handle_time else 0
        wrap_up_time = entity.summary.wrap_up_time if entity.summary.wrap_up_time else 0
        total_conversations = entity.summary.total_conversations if entity.summary.total_conversations else 0
        abandoned_conversations = entity.summary.abandoned_conversations if entity.summary.abandoned_conversations else 0
        # Construct the output dictionary
        record = {
            "queue_id": queue_id,
            "queue_name": queue_name,
            "media_type": media_type,
            "handle_time_seconds": handle_time,
            "wrap_up_time_seconds": wrap_up_time,
            "total_conversations": total_conversations,
            "abandoned_conversations": abandoned_conversations,
            "interval_start": entity.interval_start,
            "interval_end": entity.interval_end
        }
        processed_data.append(record)
    return processed_data
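Because the query returns one record per interval, a common next step is to roll the daily records up into a single total per queue and media type. The sketch below assumes records shaped like the dictionaries produced above; summarize_by_queue_and_media is a hypothetical helper name and the sample rows are invented for illustration.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple


def summarize_by_queue_and_media(
    records: List[Dict[str, Any]],
) -> Dict[Tuple[str, str], Dict[str, float]]:
    """Sum per-interval records into one total per (queue_name, media_type)."""
    totals = defaultdict(lambda: {"handle_time_seconds": 0, "total_conversations": 0})
    for record in records:
        key = (record["queue_name"], record["media_type"])
        totals[key]["handle_time_seconds"] += record["handle_time_seconds"]
        totals[key]["total_conversations"] += record["total_conversations"]

    summary = {}
    for key, total in totals.items():
        count = total["total_conversations"]
        # Avoid dividing by zero when a group had no conversations
        average = total["handle_time_seconds"] / count if count else 0.0
        summary[key] = {**total, "avg_handle_time_seconds": average}
    return summary


if __name__ == "__main__":
    sample = [  # invented sample rows
        {"queue_name": "Support", "media_type": "voice",
         "handle_time_seconds": 1200, "total_conversations": 10},
        {"queue_name": "Support", "media_type": "voice",
         "handle_time_seconds": 600, "total_conversations": 5},
        {"queue_name": "Support", "media_type": "chat",
         "handle_time_seconds": 0, "total_conversations": 0},
    ]
    for key, value in summarize_by_queue_and_media(sample).items():
        print(key, value)
```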
Complete Working Example
The following script combines all the previous steps into a single, runnable module. It authenticates, builds the query, fetches all pages of data, processes the results, and prints the final output.
Replace the placeholder dates in the main function with your desired date range.
import os
import sys
from purecloudplatformclientv2 import (
    PlatformClient,
    AnalyticsApi,
    PureCloudRegionHost,
    AnalyticsQueryBody,
    AnalyticsFilter
)
from purecloudplatformclientv2.rest import ApiException
from dotenv import load_dotenv
import time
from typing import List, Dict, Any

def get_platform_client() -> PlatformClient:
    load_dotenv()
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "us-east-1")
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment variables.")
    client = PlatformClient()
    region_host = PureCloudRegionHost.get_host(environment)
    client.set_environment(region_host)
    client.set_credentials(client_id, client_secret)
    return client

def build_analytics_query(start_date: str, end_date: str, queue_ids: list = None) -> AnalyticsQueryBody:
    group_by = ["queue", "mediaType"]
    interval = "P1D"
    filter_obj = AnalyticsFilter()
    filter_obj.date_from = start_date
    filter_obj.date_to = end_date
    if queue_ids and len(queue_ids) > 0:
        filter_obj.queues = queue_ids
    query_body = AnalyticsQueryBody()
    query_body.group_by = group_by
    query_body.interval = interval
    query_body.filter = filter_obj
    return query_body

def fetch_analytics_data(client: PlatformClient, query_body: AnalyticsQueryBody) -> list:
    api_instance = AnalyticsApi(client)
    all_results = []
    page_id = None
    while True:
        try:
            response = api_instance.post_analytics_conversations_details_query(
                body=query_body,
                next_page_id=page_id
            )
            if response.entities:
                all_results.extend(response.entities)
                print(f"Retrieved {len(response.entities)} records. Total so far: {len(all_results)}")
            if response.next_page_id:
                page_id = response.next_page_id
                time.sleep(0.5)
            else:
                break
        except ApiException as e:
            if e.status == 429:
                print("Rate limit exceeded. Waiting 5 seconds before retrying...")
                time.sleep(5)
                continue
            else:
                raise e
    return all_results

def process_analytics_results(results: List[Any]) -> List[Dict[str, Any]]:
    processed_data = []
    for entity in results:
        if not entity.summary:
            continue
        queue_name = entity.queue.name if entity.queue and entity.queue.name else "Unknown Queue"
        queue_id = entity.queue.id if entity.queue and entity.queue.id else None
        media_type = entity.media_type if hasattr(entity, 'media_type') and entity.media_type else "Unknown"
        handle_time = entity.summary.handle_time if entity.summary.handle_time else 0
        total_conversations = entity.summary.total_conversations if entity.summary.total_conversations else 0
        record = {
            "queue_id": queue_id,
            "queue_name": queue_name,
            "media_type": media_type,
            "handle_time_seconds": handle_time,
            "total_conversations": total_conversations,
            "interval_start": entity.interval_start,
            "interval_end": entity.interval_end
        }
        processed_data.append(record)
    return processed_data

def main():
    try:
        # 1. Authenticate
        client = get_platform_client()
        print("Authentication successful.")
        # 2. Define Date Range
        # Use ISO 8601 format
        start_date = "2023-10-01T00:00:00.000Z"
        end_date = "2023-10-31T23:59:59.999Z"
        # 3. Build Query
        # Optional: Pass specific queue IDs to filter
        # queue_ids = ["queue-id-1", "queue-id-2"]
        query_body = build_analytics_query(start_date, end_date)
        # 4. Fetch Data
        raw_results = fetch_analytics_data(client, query_body)
        # 5. Process Results
        final_data = process_analytics_results(raw_results)
        # 6. Output
        print("\n--- Final Analytics Data ---")
        for row in final_data:
            print(row)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
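Printing dictionaries is fine for a quick check, but the flattened rows are usually easier to work with as CSV. The sketch below is an optional extension using only the standard library; records_to_csv and FIELDS are hypothetical names, and the column list assumes the record keys produced by the script above.

```python
import csv
import io
from typing import Any, Dict, List, Sequence

# Column order assumed from the record dictionaries built in the script above.
FIELDS = [
    "queue_id",
    "queue_name",
    "media_type",
    "handle_time_seconds",
    "total_conversations",
    "interval_start",
    "interval_end",
]


def records_to_csv(records: List[Dict[str, Any]], fields: Sequence[str] = FIELDS) -> str:
    """Serialize flattened records to CSV text with a header row."""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer,
        fieldnames=list(fields),
        extrasaction="ignore",  # silently drop keys that are not listed in fields
        lineterminator="\n",
    )
    writer.writeheader()
    for record in records:
        writer.writerow(record)  # missing keys are written as empty cells
    return buffer.getvalue()


if __name__ == "__main__":
    sample = [{  # invented sample row
        "queue_id": "q-1",
        "queue_name": "Support",
        "media_type": "voice",
        "handle_time_seconds": 1200,
        "total_conversations": 10,
        "interval_start": "2023-10-01T00:00:00.000Z",
        "interval_end": "2023-10-02T00:00:00.000Z",
    }]
    print(records_to_csv(sample), end="")
```

Returning the CSV as a string keeps the helper free of file-system side effects; the caller can write it to a file or upload it elsewhere.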
Common Errors & Debugging
Error: 400 Bad Request
Cause: The query body is malformed. Common reasons include:
- Invalid date_from or date_to format. The API requires ISO 8601 with timezone.
- Invalid interval. The interval must be a valid ISO 8601 duration (e.g., P1D, PT1H).
- Missing required fields in the filter object.
Fix: Validate the ISO 8601 strings. Ensure the start date is before the end date. Check the SDK documentation for valid interval formats.
# Correct ISO 8601 format
start_date = "2023-10-01T00:00:00.000Z"
# Incorrect format (missing timezone)
# start_date = "2023-10-01T00:00:00"
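These checks can be run locally before the request is sent. The following is a minimal sketch using only the standard library; validate_date_range is a hypothetical helper name, and it checks only what the fix above describes: parseable ISO 8601 strings, an explicit timezone, and a start that precedes the end.

```python
from datetime import datetime


def validate_date_range(start_date: str, end_date: str) -> None:
    """Raise ValueError if either timestamp is malformed or the range is reversed."""
    parsed = []
    for label, value in (("start_date", start_date), ("end_date", end_date)):
        try:
            # fromisoformat() accepts a trailing "Z" only from Python 3.11 on
            moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError(f"{label} is not valid ISO 8601: {value!r}") from None
        if moment.tzinfo is None:
            raise ValueError(f"{label} has no timezone designator: {value!r}")
        parsed.append(moment)
    if parsed[0] >= parsed[1]:
        raise ValueError("start_date must be earlier than end_date")


if __name__ == "__main__":
    validate_date_range("2023-10-01T00:00:00.000Z", "2023-10-31T23:59:59.999Z")
    print("date range looks valid")
```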
Error: 403 Forbidden
Cause: The OAuth application does not have the required scope.
Fix: Ensure the OAuth application has the analytics:conversation:read scope. If you are using a custom OAuth app, go to the Admin Console > Apps > [Your App] > Scopes and add the missing scope. Then request a new access token, because a token issued before the change does not carry the added scope.
Error: 429 Too Many Requests
Cause: You are exceeding the API rate limits. The Analytics API has strict rate limits, especially for aggregation queries which are computationally expensive.
Fix: Implement exponential backoff. The code example above includes a simple 5-second wait. In production, implement a retry mechanism with increasing delays (e.g., 1s, 2s, 4s, 8s).
import time
import random
from purecloudplatformclientv2.rest import ApiException

def retry_with_backoff(func, *args, max_retries=5, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func(*args)
        except ApiException as e:
            if e.status == 429:
                # Double the delay on each attempt and add up to 1s of jitter
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Rate limited. Retrying in {delay:.2f} seconds...")
                time.sleep(delay)
            else:
                raise e
    raise Exception("Max retries exceeded")
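Unbounded doubling can produce very long waits after several failures, so a cap on the delay is a common refinement. The sketch below only computes the delay schedule, which makes it easy to inspect and test; backoff_delays and its max_delay and jitter parameters are hypothetical and not part of the SDK.

```python
import random
from typing import Callable, List


def backoff_delays(
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    jitter: float = 0.0,
    rng: Callable[[], float] = random.random,
) -> List[float]:
    """Return the wait before each retry: exponential growth capped at max_delay."""
    delays = []
    for attempt in range(max_retries):
        delay = min(base_delay * (2 ** attempt), max_delay)
        # jitter adds between 0 and `jitter` seconds of random spread
        delays.append(delay + jitter * rng())
    return delays


if __name__ == "__main__":
    print(backoff_delays(max_retries=6, max_delay=10.0))
```

With jitter left at zero the schedule is deterministic, which helps when testing; in production a small jitter spreads out retries from clients that were rate limited at the same moment.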
Error: Empty Results
Cause: No conversations occurred in the specified date range, or the filter is too restrictive.
Fix: Check the date range. Ensure the queues exist and had activity. Try removing the queue_ids filter to see if data returns for all queues.