Building a Queue and Media Type Analytics Aggregation Query
What You Will Build
- You will write a script that retrieves historical conversation metrics aggregated by queue and media type.
- This tutorial uses the Genesys Cloud CX Analytics API (/api/v2/analytics/conversations/details/query).
- The implementation is provided in Python using the official genesyscloud SDK.
Prerequisites
- OAuth Client Type: Service Account or Client Credentials flow.
- Required Scopes: analytics:conversation:view and analytics:queue:view.
- SDK Version: genesyscloud Python SDK version 137.0.0 or later.
- Runtime: Python 3.8 or later.
- Dependencies:
  - genesyscloud
  - python-dotenv (for managing secrets securely)
Install the dependencies via pip:
pip install genesyscloud python-dotenv
Authentication Setup
The Genesys Cloud SDK handles token acquisition and refresh automatically when configured correctly. You must provide the client ID, client secret, and environment URL.
Create a .env file in your project root:
GENESYS_CLIENT_ID=your_client_id_here
GENESYS_CLIENT_SECRET=your_client_secret_here
GENESYS_ENV_URL=https://api.mypurecloud.com
Initialize the SDK in your code. This step establishes the session and caches the token for subsequent requests.
import os
from dotenv import load_dotenv
from genesyscloud import Configuration, ApiClient, AnalyticsApi

# Load environment variables
load_dotenv()

def get_authenticated_analytics_client():
    """
    Configures and returns an authenticated AnalyticsApi client.
    """
    # Load credentials from environment
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    env_url = os.getenv("GENESYS_ENV_URL")
    if not all([client_id, client_secret, env_url]):
        raise ValueError("Missing required environment variables.")
    # Configure the SDK
    config = Configuration()
    config.host = env_url
    config.oauth_client_id = client_id
    config.oauth_client_secret = client_secret
    # Create the API client
    api_client = ApiClient(configuration=config)
    # Initialize the Analytics API wrapper
    analytics_api = AnalyticsApi(api_client)
    return analytics_api
Implementation
Step 1: Define the Aggregation Query Body
The core of this tutorial is constructing the body parameter for the post_analytics_conversations_details_query method. The Analytics API uses a query object that defines the time window, the metrics to retrieve, and the grouping keys.
To group by queue and media type, you must specify these entities in the groupBys array. The API requires that the interval is specified for time-series data, even if you are only interested in the total aggregate.
def build_query_body(start_time: str, end_time: str) -> dict:
    """
    Constructs the request body for the analytics query.

    Args:
        start_time: ISO 8601 formatted start time (e.g., '2023-10-01T00:00:00Z')
        end_time: ISO 8601 formatted end time (e.g., '2023-10-02T00:00:00Z')

    Returns:
        A dictionary representing the query body.
    """
    # Define the metrics you want to aggregate
    # Common metrics: answered, abandoned, offered, totalhandle
    metrics = [
        "answered",
        "abandoned",
        "offered",
        "totalhandle",
        "wait"
    ]
    # Define the grouping keys
    # This is the critical part for the tutorial
    group_bys = ["queue", "mediaType"]
    # Construct the body object
    # Using a dictionary here for clarity, though the SDK supports model objects
    query_body = {
        # Note: The API returns data for each interval bucket.
        # For a simple total, use a single large interval or iterate over daily intervals.
        "interval": {
            "startTime": start_time,
            "endTime": end_time,
            "size": "P1D"  # One day granularity
        },
        "metrics": metrics,
        "groupBys": group_bys,
        "filter": {
            "type": "and",
            "clauses": [
                {
                    "type": "equals",
                    "path": "mediaType",
                    "value": "voice"  # Optional: Filter only voice if desired
                }
            ]
        }
    }
    return query_body
Key Parameter Explanation:
- interval: Defines the time range. The size parameter determines the granularity of the returned buckets. If you set size to P1D, you get one row per day per queue per media type. If you want a single total for the entire range, you can omit the size or use a size equal to the duration, but the API often expects a standard ISO 8601 duration format.
- groupBys: This array tells the engine how to slice the data. queue groups by the queue ID/name. mediaType groups by voice, chat, email, etc.
- filter: You can pre-filter the data before aggregation. In the example above, we filter for voice only. Remove the filter block to include all media types.
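To make the effect of the size parameter concrete, here is a minimal sketch that splits a time window into one-day buckets and estimates an upper bound on the number of rows. It uses only the standard library and does not call the API; the function names daily_buckets and expected_row_count are hypothetical helpers, and the estimate assumes every queue and media type combination has data in every bucket, which may not hold in practice.

```python
from datetime import datetime, timedelta, timezone

# Timestamp format used throughout this tutorial (UTC with a trailing 'Z').
ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def daily_buckets(start_time: str, end_time: str):
    """Yield (bucket_start, bucket_end) pairs of at most one day each."""
    start = datetime.strptime(start_time, ISO_FORMAT).replace(tzinfo=timezone.utc)
    end = datetime.strptime(end_time, ISO_FORMAT).replace(tzinfo=timezone.utc)
    cursor = start
    while cursor < end:
        # Clip the last bucket so it never runs past the end of the window.
        bucket_end = min(cursor + timedelta(days=1), end)
        yield cursor, bucket_end
        cursor = bucket_end


def expected_row_count(start_time: str, end_time: str,
                       queue_count: int, media_type_count: int) -> int:
    """Upper bound on rows: buckets x queues x media types."""
    bucket_count = sum(1 for _ in daily_buckets(start_time, end_time))
    return bucket_count * queue_count * media_type_count
```

For example, a two-day window with three queues and two media types yields at most 2 x 3 x 2 = 12 rows.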
Step 2: Execute the Query and Handle Pagination
The Analytics API does not return all data in one call if the dataset is large. You must handle the nextPageUri to fetch subsequent pages. Additionally, you must implement retry logic for 429 Too Many Requests errors, which are common in analytics queries due to the heavy computational load.
import time
from typing import List, Dict, Any

def fetch_analytics_data(
    analytics_api: AnalyticsApi,
    query_body: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """
    Executes the analytics query and handles pagination.

    Args:
        analytics_api: The authenticated AnalyticsApi client.
        query_body: The query definition.

    Returns:
        A list of aggregation result dictionaries.
    """
    all_results = []
    next_page_uri = None
    max_retries = 3
    retry_delay = 2  # seconds
    while True:
        attempt = 0
        while attempt < max_retries:
            try:
                # The SDK method call
                # Note: The SDK maps the body to the correct endpoint
                response = analytics_api.post_analytics_conversations_details_query(
                    body=query_body,
                    page_uri=next_page_uri
                )
                # Break the retry loop on success
                break
            except Exception as e:
                attempt += 1
                # Check for 429 Too Many Requests
                if hasattr(e, 'status') and e.status == 429:
                    print(f"Rate limited (429). Retrying in {retry_delay * attempt} seconds...")
                    time.sleep(retry_delay * attempt)
                else:
                    # Re-raise other exceptions with the original traceback
                    raise
        if attempt >= max_retries:
            raise Exception("Max retries exceeded for analytics query.")
        # Extract entities from the response
        if response.entities:
            all_results.extend(response.entities)
        # Check for pagination
        if response.next_page_uri:
            next_page_uri = response.next_page_uri
        else:
            break
    return all_results
Error Handling Note:
The SDK raises exceptions for HTTP errors. Always check the status code. A 400 error usually indicates a malformed query body (e.g., invalid ISO date or unsupported metric name). A 401 or 403 indicates authentication or scope issues.
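The status codes described above can be turned into readable hints with a small lookup. The sketch below is illustrative only: explain_error and STATUS_HINTS are hypothetical names, and it assumes, as the retry loop in this tutorial does, that the raised exception exposes the HTTP code in a status attribute.

```python
from typing import Optional

# Hints taken from the error descriptions in this tutorial.
STATUS_HINTS = {
    400: "Malformed query body: check ISO 8601 dates and metric names.",
    401: "Authentication failed: check the client ID and secret.",
    403: "Missing scope or permission: check the OAuth client's scopes.",
    429: "Rate limited: wait and retry with a growing delay.",
}


def explain_error(exc: Exception) -> str:
    """Return a short hint for an exception raised by an API call."""
    # Assumption: HTTP errors carry the status code in a `status` attribute.
    status: Optional[int] = getattr(exc, "status", None)
    if status is None:
        return f"Non-HTTP error: {exc}"
    return STATUS_HINTS.get(status, f"Unexpected HTTP status {status}.")
```

Calling explain_error inside an except block keeps the log message specific without hiding the original exception, which can still be re-raised.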
Step 3: Process and Flatten the Results
The response from the Analytics API is hierarchical. Each entity contains an interval, groupBys, and metrics. The groupBys is a dictionary mapping the group key name to its value. The metrics is a dictionary mapping the metric name to its value.
You need to flatten this structure into a usable format, such as a list of dictionaries or a pandas DataFrame.
def flatten_analytics_results(results: List[Any]) -> List[Dict[str, Any]]:
    """
    Flattens the hierarchical analytics response into a list of flat dictionaries.

    Args:
        results: The list of entity objects from the API response.

    Returns:
        A list of dictionaries with keys: interval, queueId, queueName, mediaType, metrics...
    """
    flattened = []
    for entity in results:
        # Initialize the row
        row = {
            "interval": entity.interval,
            "startTime": entity.start_time,
            "endTime": entity.end_time
        }
        # Extract groupBys
        if hasattr(entity, 'group_bys') and entity.group_bys:
            for key, value in entity.group_bys.items():
                if key == "queue":
                    row["queueId"] = value.id
                    row["queueName"] = value.name
                elif key == "mediaType":
                    row["mediaType"] = value
                else:
                    row[f"group_{key}"] = value
        # Extract metrics
        if hasattr(entity, 'metrics') and entity.metrics:
            for metric_name, metric_value in entity.metrics.items():
                # Handle complex metric objects (e.g., distribution, histogram)
                # For simple aggregations, the value is usually a number or a simple object
                if hasattr(metric_value, 'count'):
                    row[f"{metric_name}_count"] = metric_value.count
                elif hasattr(metric_value, 'value'):
                    row[f"{metric_name}_value"] = metric_value.value
                else:
                    # Simple numeric value
                    row[f"{metric_name}"] = metric_value
        flattened.append(row)
    return flattened
Complete Working Example
This script combines all steps into a runnable module. It fetches voice conversation metrics for the last 7 days, grouped by queue and media type.
import os
import sys
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
from genesyscloud import Configuration, ApiClient, AnalyticsApi

# Import helper functions defined above
# In a real module, these would be imported from a separate file
# from analytics_utils import get_authenticated_analytics_client, build_query_body, fetch_analytics_data, flatten_analytics_results

def main():
    # 1. Setup Authentication
    try:
        analytics_api = get_authenticated_analytics_client()
    except Exception as e:
        print(f"Authentication failed: {e}")
        sys.exit(1)
    # 2. Define Time Window (Last 7 Days)
    # Take "now" once so both ends of the window share the same reference,
    # and drop microseconds to match the 'YYYY-MM-DDTHH:MM:SSZ' format.
    now = datetime.now(timezone.utc).replace(microsecond=0)
    end_time = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    start_time = (now - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ")
    # 3. Build Query
    query_body = build_query_body(start_time, end_time)
    print(f"Executing analytics query for {start_time} to {end_time}...")
    # 4. Fetch Data
    try:
        raw_results = fetch_analytics_data(analytics_api, query_body)
    except Exception as e:
        print(f"Query execution failed: {e}")
        sys.exit(1)
    # 5. Process Results
    if not raw_results:
        print("No data found for the specified criteria.")
        return
    flat_data = flatten_analytics_results(raw_results)
    # 6. Output Results
    print(f"Retrieved {len(flat_data)} aggregation records.")
    print("-" * 59)
    # Print header
    print(f"{'Queue Name':<20} | {'Media Type':<10} | {'Answered':<10} | {'Abandoned':<10}")
    print("-" * 59)
    for row in flat_data:
        queue_name = row.get("queueName", "Unknown")
        media_type = row.get("mediaType", "Unknown")
        # flatten_analytics_results may store a metric as "<name>" or "<name>_count"
        answered = row.get("answered", row.get("answered_count", 0))
        abandoned = row.get("abandoned", row.get("abandoned_count", 0))
        print(f"{queue_name:<20} | {media_type:<10} | {answered:<10} | {abandoned:<10}")

if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 400 Bad Request - Invalid Interval Size
Cause: The interval.size parameter must be a valid ISO 8601 duration string. Common mistakes include using “1 day” instead of “P1D” or “24 hours” instead of “PT24H”.
Fix: Ensure the size string follows ISO 8601 duration format.
- Daily: P1D
- Hourly: PT1H
- Weekly: P1W
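A quick syntax check before sending the query can catch strings like "1 day" early. The sketch below is a simplified pattern for ISO 8601 durations with integer components; is_iso8601_duration is a hypothetical helper, and passing this check only confirms the syntax, not that the API accepts that particular granularity.

```python
import re

# Simplified ISO 8601 duration pattern (integer components only).
# (?!$) rejects a bare "P"; (?=\d) rejects a "T" with nothing after it.
_DURATION_RE = re.compile(
    r"P(?!$)(\d+Y)?(\d+M)?(\d+W)?(\d+D)?"
    r"(T(?=\d)(\d+H)?(\d+M)?(\d+S)?)?"
)


def is_iso8601_duration(value: str) -> bool:
    """Return True if value looks like an ISO 8601 duration such as P1D or PT1H."""
    return _DURATION_RE.fullmatch(value) is not None
```

For instance, is_iso8601_duration("P1D") and is_iso8601_duration("PT24H") return True, while is_iso8601_duration("1 day") returns False.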
Error: 403 Forbidden - Insufficient Scope
Cause: The OAuth token does not have the analytics:conversation:view scope.
Fix: Verify the OAuth Client in the Genesys Cloud Admin Console has the correct scope assigned. Regenerate the token.
Error: 429 Too Many Requests
Cause: Analytics queries are resource-intensive. The API limits the number of concurrent queries or the frequency of queries for a single client.
Fix: Implement exponential backoff. The code above includes a basic retry loop, but its delay grows linearly (2, 4, then 6 seconds) rather than exponentially. For production systems, use a robust retry library like tenacity or backoff.
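As a rough illustration of exponential backoff without a third-party library, the sketch below doubles the delay after each 429 and adds a little random jitter. The names call_with_backoff and RateLimited are hypothetical; RateLimited stands in for whatever exception the SDK raises, on the assumption that it exposes the HTTP code in a status attribute. The sleep parameter exists only so the delay can be replaced in tests.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical stand-in for an SDK exception carrying an HTTP status."""

    def __init__(self, status: int = 429):
        super().__init__(f"HTTP {status}")
        self.status = status


def call_with_backoff(
    func: Callable[[], T],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on status 429 with exponentially growing delays."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception as exc:
            # Only retry rate-limit errors, and give up after the last attempt.
            if getattr(exc, "status", None) != 429 or attempt == max_retries:
                raise
            # Delay doubles each attempt: base, 2*base, 4*base, ... capped at max_delay.
            delay = min(max_delay, base_delay * (2 ** attempt))
            # Up to 10% jitter so concurrent clients do not retry in lockstep.
            sleep(delay + random.uniform(0, delay * 0.1))
    raise RuntimeError("unreachable: the loop either returns or raises")
```

A call such as call_with_backoff(lambda: fetch_analytics_data(analytics_api, query_body)) would wrap the whole fetch; wrapping each page request individually avoids refetching earlier pages after a late failure.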
Error: Empty Results
Cause: No conversations match the filter criteria within the time window.
Fix:
- Verify the time window contains data.
- Check the filter clause. If you filter by mediaType: voice but the queue only handles chat, results will be empty.
- Confirm the queues exist and were active during the period.