Constructing an Analytics API Aggregation Query by Queue and Media Type
What You Will Build
- You will build a script that queries the Genesys Cloud Analytics API to retrieve conversation metrics grouped by queue and media type.
- This uses the /api/v2/analytics/conversations/details/query endpoint via the Python SDK.
- The implementation covers Python with the genesyscloud SDK.
Prerequisites
- OAuth Client Type: Confidential client (Client Credentials Grant) or Public client with PKCE. For server-side scripts, Client Credentials is standard.
- Required Scopes: analytics:conversation:view is mandatory. If you need user-specific details, add user:view.
- SDK Version: Genesys Cloud Python SDK version 153.0.0 or higher.
- Language/Runtime: Python 3.8+.
- External Dependencies: genesys-cloud, python-dotenv for credential management, and pandas for the result processing in Step 3.
Install the dependencies:
pip install genesys-cloud python-dotenv pandas
Authentication Setup
The Genesys Cloud SDK handles the OAuth token lifecycle automatically when initialized with client credentials. You must set the region correctly, because each region has its own API hostname. For the US East region (the default), the hostname is api.mypurecloud.com. For EU (Ireland), use api.mypurecloud.ie.
Create a .env file in your project root:
GENESYS_CLOUD_REGION=us_east
GENESYS_CLOUD_CLIENT_ID=your_client_id
GENESYS_CLOUD_CLIENT_SECRET=your_client_secret
Initialize the SDK in your script:
import os
from dotenv import load_dotenv
from purecloud_platform_client_v2 import PlatformClient

load_dotenv()


def get_platform_client() -> PlatformClient:
    """
    Initializes and returns a configured Genesys Cloud PlatformClient.
    """
    client = PlatformClient()

    # Configure OAuth
    client.set_oauth_client_credentials(
        os.getenv("GENESYS_CLOUD_CLIENT_ID"),
        os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
    )

    # Fall back to us_east when GENESYS_CLOUD_REGION is not set
    region = os.getenv("GENESYS_CLOUD_REGION", "us_east")
    if region == "eu":
        # EU (Ireland)
        client.set_base_url("https://api.mypurecloud.ie")
    elif region == "ap":
        # Asia Pacific (Sydney); other AP regions use different hosts
        client.set_base_url("https://api.mypurecloud.com.au")
    # us_east defaults to https://api.mypurecloud.com
    return client


client = get_platform_client()
analytics_api = client.analytics_api
Implementation
Step 1: Define the Date Range and Query Parameters
The Analytics API requires a strict ISO 8601 date range. The to date must be later than the from date. For aggregated data, you often want to query only through the end of “yesterday” to avoid partial-day data, or use a fixed historical window.
We also need to define the groupBy fields. This tutorial groups by queue and mediaType.
from datetime import datetime, timedelta, timezone
from purecloud_platform_client_v2.model import AnalyticsQuery


def build_analytics_query() -> AnalyticsQuery:
    """
    Constructs the AnalyticsQuery object with specific grouping and filtering.
    """
    # Define date range: last 7 days up to now.
    # datetime.utcnow() is deprecated since Python 3.12, so use an aware UTC datetime.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=7)

    # Format as ISO 8601 in UTC (the trailing "Z" denotes UTC)
    from_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    to_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")

    # Initialize the query object
    query = AnalyticsQuery(
        date_from=from_str,
        date_to=to_str,
        group_by=["queue", "mediaType"],
        # Select specific metrics to reduce payload size and improve performance
        select=[
            "queue/name",
            "queue/id",
            "mediaType",
            "conversationCount",
            "wrapUpTime",
            "talkTime",
            "holdTime"
        ]
    )
    return query
Key Parameter Explanation:
- group_by: This is the core of the aggregation. By including queue and mediaType, the API returns one row for every unique combination of Queue ID and Media Type (e.g., “Sales Queue” + “voice”, “Sales Queue” + “chat”).
- select: You cannot select *. You must list specific metrics. queue/name pulls the human-readable name. conversationCount is a standard metric. talkTime and holdTime are useful for calculating efficiency.
- date_from / date_to: Must be inclusive. The API calculates metrics for conversations that occurred within this window.
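The “yesterday” window mentioned at the start of this step can be sketched with the standard library alone. This is a minimal sketch, not part of the SDK: previous_full_day_window is a hypothetical helper name, and it assumes you want UTC day boundaries and the same millisecond "Z" timestamp format used in the query above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

# Same timestamp layout as the tutorial's query (assumption: UTC with a literal "Z")
ISO_FORMAT = "%Y-%m-%dT%H:%M:%S.000Z"


def previous_full_day_window(now: Optional[datetime] = None) -> Tuple[str, str]:
    """Return (from, to) strings covering the last complete UTC day.

    `now` should be a timezone-aware datetime; it defaults to the current UTC time.
    """
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    # Midnight at the start of the current UTC day closes the window...
    end = now.replace(hour=0, minute=0, second=0, microsecond=0)
    # ...and the window opens exactly 24 hours earlier.
    start = end - timedelta(days=1)
    return start.strftime(ISO_FORMAT), end.strftime(ISO_FORMAT)


if __name__ == "__main__":
    fixed_now = datetime(2024, 3, 15, 10, 30, tzinfo=timezone.utc)
    print(previous_full_day_window(fixed_now))
```

Passing a fixed now makes the window reproducible in tests; the returned strings can be supplied as date_from and date_to in place of the rolling 7-day range.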
Step 2: Execute the Query and Handle Pagination
The Analytics API returns paginated results. The default page size is often small (e.g., 50-100 records). To get all data, you must iterate through pages using the nextPage token.
import time
from typing import Any, Dict, List

from purecloud_platform_client_v2.rest import ApiException


def fetch_analytics_data(client: PlatformClient, query: AnalyticsQuery) -> List[Dict[str, Any]]:
    """
    Executes the analytics query and handles pagination.
    Returns a flat list of aggregated data rows.
    """
    all_results = []
    next_page_token = None
    page_count = 0
    retries = 0
    # Cap both pages and rate-limit retries so the loop always terminates
    max_pages = 50
    max_retries = 5

    while page_count < max_pages:
        try:
            # The API call: post_analytics_conversations_details_query
            # Parameters:
            # - body: The AnalyticsQuery object
            # - size: Records per page (max 1000 for details, but aggregation limits may vary)
            # - next_page: Token from previous response
            response = client.analytics_api.post_analytics_conversations_details_query(
                body=query,
                size=100,
                next_page=next_page_token
            )
            retries = 0  # a successful call resets the retry budget

            # Check if response has entities
            if response.entities:
                all_results.extend(response.entities)
                print(f"Fetched page {page_count + 1} with {len(response.entities)} records.")

            # Check for next page
            if response.next_page:
                next_page_token = response.next_page
                page_count += 1
            else:
                # No more pages
                break
        except ApiException as e:
            print(f"API Exception: {e.status} {e.reason}")
            print(f"Body: {e.body}")
            if e.status == 429 and retries < max_retries:
                retries += 1
                print("Rate limited. Waiting 5 seconds before retry...")
                time.sleep(5)
                continue
            elif e.status == 400:
                raise ValueError(f"Bad Request. Check date format and group_by fields. {e.body}") from e
            else:
                raise
        except Exception as e:
            print(f"Unexpected error: {e}")
            raise

    return all_results
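The token-driven loop above follows a general pattern that can be exercised without calling the API. The sketch below is illustrative only: fetch_page is a hypothetical callable standing in for the SDK call, assumed to return an (items, next_token) pair with next_token set to None on the last page, and the fake pages are made-up data.

```python
from typing import Any, Callable, List, Optional, Tuple

Page = Tuple[List[Any], Optional[str]]


def collect_all_pages(
    fetch_page: Callable[[Optional[str]], Page],
    max_pages: int = 50,
) -> List[Any]:
    """Follow next-page tokens until they run out or max_pages is reached."""
    items: List[Any] = []
    token: Optional[str] = None
    for _ in range(max_pages):
        page_items, token = fetch_page(token)
        items.extend(page_items)
        if token is None:
            # The server signalled that this was the last page.
            break
    return items


# Hypothetical three-page result set keyed by the token that requests each page.
_FAKE_PAGES = {
    None: ([1, 2], "t1"),
    "t1": ([3, 4], "t2"),
    "t2": ([5], None),
}


def fake_fetch(token: Optional[str]) -> Page:
    return _FAKE_PAGES[token]


if __name__ == "__main__":
    print(collect_all_pages(fake_fetch))  # [1, 2, 3, 4, 5]
```

Keeping the loop separate from the transport makes the page cap and the termination condition easy to unit test.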
Error Handling Notes:
- 400 Bad Request: Often caused by invalid date_from/date_to formats or requesting metrics that do not exist for the specified group_by fields. For example, you cannot group by user and select queue/name in a single aggregation without flattening logic, but queue and mediaType is a safe combination.
- 401 Unauthorized: Ensure the OAuth token is valid and the client has the analytics:conversation:view scope.
- 429 Too Many Requests: The Analytics API has strict rate limits (typically 10 requests per second for detail queries). If you hit this, implement exponential backoff. The example above uses a simple 5-second sleep for brevity.
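The exponential backoff suggested for 429 responses can be sketched with the standard library. This is one possible shape, not the SDK's own retry mechanism: backoff_delays and call_with_backoff are hypothetical helper names, and the base delay, cap, and attempt count are assumed values you would tune for your workload.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 30.0) -> List[float]:
    """Delays that double each time (base, 2*base, 4*base, ...), capped at `cap`."""
    return [min(cap, base * (2 ** n)) for n in range(attempts)]


def call_with_backoff(
    func: Callable[[], T],
    is_retryable: Callable[[Exception], bool],
    attempts: int = 5,
    base: float = 1.0,
    cap: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, sleeping with growing delays between retryable failures."""
    delays = backoff_delays(attempts - 1, base, cap)
    for attempt in range(attempts):
        try:
            return func()
        except Exception as exc:
            # Give up on the final attempt or on errors that should not be retried.
            if attempt == attempts - 1 or not is_retryable(exc):
                raise
            sleep(delays[attempt])
    raise RuntimeError("attempts must be at least 1")


if __name__ == "__main__":
    print(backoff_delays(5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

In the fetch loop, is_retryable could be a small function that checks whether the exception is an ApiException with status 429; the sleep parameter is injectable so tests do not have to wait.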
Step 3: Process and Flatten the Results
The response structure from Genesys Cloud Analytics is hierarchical. When you group by queue and mediaType, the entities list contains objects where the grouping keys are nested.
A typical entity looks like this:
{
  "queue": {
    "id": "12345",
    "name": "Support Queue"
  },
  "mediaType": "voice",
  "conversationCount": 150,
  "talkTime": "PT1200S",
  "wrapUpTime": "PT300S"
}
You need to flatten this for consumption by a spreadsheet or database.
import re
from typing import Any, Dict, List

import pandas as pd


def process_results(results: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Flattens the hierarchical API response into a flat DataFrame.
    Converts ISO duration strings (PT...S) to seconds.
    """
    if not results:
        return pd.DataFrame()

    flat_data = []
    for item in results:
        # Extract nested queue info safely
        queue_info = item.get("queue", {})
        queue_id = queue_info.get("id", "Unknown")
        queue_name = queue_info.get("name", "Unknown")

        # Extract media type
        media_type = item.get("mediaType", "Unknown")

        # Extract metrics
        conversation_count = item.get("conversationCount", 0)

        # Parse ISO 8601 Duration strings (e.g., "PT1200S")
        talk_time_sec = parse_duration_to_seconds(item.get("talkTime", "PT0S"))
        hold_time_sec = parse_duration_to_seconds(item.get("holdTime", "PT0S"))
        wrap_up_time_sec = parse_duration_to_seconds(item.get("wrapUpTime", "PT0S"))

        flat_data.append({
            "Queue ID": queue_id,
            "Queue Name": queue_name,
            "Media Type": media_type,
            "Conversation Count": conversation_count,
            "Talk Time (seconds)": talk_time_sec,
            "Hold Time (seconds)": hold_time_sec,
            "Wrap Up Time (seconds)": wrap_up_time_sec
        })

    df = pd.DataFrame(flat_data)

    # Optional: Calculate average talk time per conversation
    if not df.empty:
        df["Avg Talk Time (sec)"] = df.apply(
            lambda row: row["Talk Time (seconds)"] / row["Conversation Count"] if row["Conversation Count"] > 0 else 0,
            axis=1
        )
    return df


def parse_duration_to_seconds(duration_str: str) -> float:
    """
    Converts ISO 8601 duration string (e.g., PT1H30M20S) to total seconds.
    Returns 0.0 for missing or unparseable values.
    """
    if not isinstance(duration_str, str) or not duration_str:
        return 0.0

    # Matches PT[n]H[n]M[n]S; every component is optional and seconds may be fractional.
    # fullmatch rejects strings with trailing content the pattern does not understand.
    match = re.fullmatch(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?", duration_str)
    if not match:
        return 0.0

    hours = int(match.group(1)) if match.group(1) else 0
    minutes = int(match.group(2)) if match.group(2) else 0
    seconds = float(match.group(3)) if match.group(3) else 0.0
    return (hours * 3600) + (minutes * 60) + seconds
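As a worked check of the flattening and the average calculation, the sample entity shown earlier can be pushed through a dependency-free version of the same logic. This is a minimal sketch that assumes entities arrive as plain dictionaries shaped like that sample; duration_seconds and flatten_entity are hypothetical names used only here.

```python
import re
from typing import Any, Dict

# PT[n]H[n]M[n]S with optional components; seconds may be fractional.
_DURATION = re.compile(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?")


def duration_seconds(value: str) -> float:
    """Convert an ISO 8601 time-only duration to seconds (0.0 if unparseable)."""
    match = _DURATION.fullmatch(value or "")
    if not match:
        return 0.0
    hours, minutes, seconds = (float(g) if g else 0.0 for g in match.groups())
    return hours * 3600 + minutes * 60 + seconds


def flatten_entity(entity: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one grouped entity into a single row with an average talk time."""
    queue = entity.get("queue") or {}
    count = entity.get("conversationCount", 0)
    talk = duration_seconds(entity.get("talkTime", "PT0S"))
    return {
        "queue_id": queue.get("id", "Unknown"),
        "queue_name": queue.get("name", "Unknown"),
        "media_type": entity.get("mediaType", "Unknown"),
        "conversation_count": count,
        "talk_time_sec": talk,
        # Guard against division by zero for groups with no conversations.
        "avg_talk_time_sec": talk / count if count else 0.0,
    }


sample = {
    "queue": {"id": "12345", "name": "Support Queue"},
    "mediaType": "voice",
    "conversationCount": 150,
    "talkTime": "PT1200S",
    "wrapUpTime": "PT300S",
}

if __name__ == "__main__":
    row = flatten_entity(sample)
    print(row["talk_time_sec"], row["avg_talk_time_sec"])  # 1200.0 8.0
```

For the sample, 1200 seconds of talk time across 150 conversations gives an average of 8 seconds per conversation.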
Complete Working Example
This script combines all steps into a single executable module. Save this as analytics_queue_media_query.py.
import os
import re
import sys
import time
from datetime import datetime, timedelta, timezone

import pandas as pd
from dotenv import load_dotenv

# Import Genesys Cloud SDK
from purecloud_platform_client_v2 import PlatformClient
from purecloud_platform_client_v2.model import AnalyticsQuery
from purecloud_platform_client_v2.rest import ApiException


def load_env():
    load_dotenv()
    if not all([os.getenv("GENESYS_CLOUD_CLIENT_ID"), os.getenv("GENESYS_CLOUD_CLIENT_SECRET")]):
        raise EnvironmentError("Missing GENESYS_CLOUD_CLIENT_ID or GENESYS_CLOUD_CLIENT_SECRET in .env")


def get_platform_client() -> PlatformClient:
    client = PlatformClient()
    client.set_oauth_client_credentials(
        os.getenv("GENESYS_CLOUD_CLIENT_ID"),
        os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
    )
    region = os.getenv("GENESYS_CLOUD_REGION", "us_east")
    if region == "eu":
        # EU (Ireland)
        client.set_base_url("https://api.mypurecloud.ie")
    elif region == "ap":
        # Asia Pacific (Sydney); other AP regions use different hosts
        client.set_base_url("https://api.mypurecloud.com.au")
    # us_east defaults to https://api.mypurecloud.com
    return client


def build_analytics_query() -> AnalyticsQuery:
    # Last 7 days up to now, as timezone-aware UTC timestamps
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=7)
    from_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    to_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    return AnalyticsQuery(
        date_from=from_str,
        date_to=to_str,
        group_by=["queue", "mediaType"],
        select=[
            "queue/name",
            "queue/id",
            "mediaType",
            "conversationCount",
            "talkTime",
            "holdTime",
            "wrapUpTime"
        ]
    )


def parse_duration_to_seconds(duration_str: str) -> float:
    # Returns 0.0 for missing or unparseable values
    if not isinstance(duration_str, str) or not duration_str:
        return 0.0
    # PT[n]H[n]M[n]S; every component is optional and seconds may be fractional
    match = re.fullmatch(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?", duration_str)
    if not match:
        return 0.0
    hours = int(match.group(1)) if match.group(1) else 0
    minutes = int(match.group(2)) if match.group(2) else 0
    seconds = float(match.group(3)) if match.group(3) else 0.0
    return (hours * 3600) + (minutes * 60) + seconds


def fetch_and_process_analytics():
    load_env()
    client = get_platform_client()
    analytics_api = client.analytics_api
    query = build_analytics_query()

    all_results = []
    next_page_token = None
    page_count = 0
    retries = 0
    # Cap both pages and rate-limit retries so the loop always terminates
    max_pages = 50
    max_retries = 5

    print("Starting Analytics Query...")
    while page_count < max_pages:
        try:
            response = analytics_api.post_analytics_conversations_details_query(
                body=query,
                size=100,
                next_page=next_page_token
            )
            retries = 0  # a successful call resets the retry budget
            if response.entities:
                all_results.extend(response.entities)
                print(f"Page {page_count + 1}: Retrieved {len(response.entities)} records.")
            if response.next_page:
                next_page_token = response.next_page
                page_count += 1
            else:
                break
        except ApiException as e:
            print(f"API Error {e.status}: {e.reason}")
            if e.status == 429 and retries < max_retries:
                retries += 1
                print("Rate limited. Retrying in 5s...")
                time.sleep(5)
                continue
            raise

    if not all_results:
        print("No data returned.")
        return

    # Flatten Data
    flat_data = []
    for item in all_results:
        queue_info = item.get("queue", {})
        flat_data.append({
            "Queue ID": queue_info.get("id", "Unknown"),
            "Queue Name": queue_info.get("name", "Unknown"),
            "Media Type": item.get("mediaType", "Unknown"),
            "Conversation Count": item.get("conversationCount", 0),
            "Talk Time (sec)": parse_duration_to_seconds(item.get("talkTime", "PT0S")),
            "Hold Time (sec)": parse_duration_to_seconds(item.get("holdTime", "PT0S")),
            "Wrap Up Time (sec)": parse_duration_to_seconds(item.get("wrapUpTime", "PT0S"))
        })
    df = pd.DataFrame(flat_data)

    # Calculate Averages
    if not df.empty:
        df["Avg Talk Time (sec)"] = df.apply(
            lambda row: row["Talk Time (sec)"] / row["Conversation Count"] if row["Conversation Count"] > 0 else 0,
            axis=1
        )
        # Sort by Queue Name and Media Type
        df = df.sort_values(by=["Queue Name", "Media Type"])

        # Export to CSV
        output_filename = "analytics_queue_media_report.csv"
        df.to_csv(output_filename, index=False)
        print(f"Report saved to {output_filename}")
        print(df.head(10))
    else:
        print("No valid records to process.")


if __name__ == "__main__":
    try:
        fetch_and_process_analytics()
    except Exception as e:
        print(f"Fatal Error: {e}")
        sys.exit(1)
Common Errors & Debugging
Error: 400 Bad Request - “Invalid group by field”
Cause: The group_by field specified does not exist for the selected metrics or the query type. For example, trying to group by user and queue simultaneously in a single aggregation query can sometimes fail if the metrics requested are not compatible with that level of granularity.
Fix: Verify the group_by fields against the Genesys Cloud Analytics API documentation. Ensure that queue and mediaType are valid. If you need user-level data, you may need to run a separate query grouped by user and then join the data in your application logic.
Error: 401 Unauthorized - “Insufficient permissions”
Cause: The OAuth token does not have the analytics:conversation:view scope.
Fix: Go to the Genesys Cloud Admin Console → Platform → OAuth. Edit your client application. Add analytics:conversation:view to the Scopes list. Save the changes. Note: Existing tokens may need to be refreshed.
Error: 429 Too Many Requests
Cause: You are exceeding the rate limit for the Analytics API. The limit is typically 10 requests per second for detail queries.
Fix: Implement exponential backoff. In the provided code, a simple 5-second sleep is used. For production systems, use a library like tenacity or backoff to handle retries gracefully.
from tenacity import retry, wait_exponential, stop_after_attempt


@retry(wait=wait_exponential(multiplier=1, min=4, max=10), stop=stop_after_attempt(5))
def safe_api_call():
    # API call here
    pass
Error: Empty Result Set
Cause: The date range is too narrow, or there is no data for the specific queues/media types in that window.
Fix: Expand the date_from window. Ensure that the queues selected (if you filter by queue ID) actually had conversations in that period. Check the select list to ensure you are requesting metrics that exist for mediaType (e.g., talkTime exists for voice, but not for SMS).
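When results come back but some columns are empty, it can help to see which requested metrics are absent for each media type. The following is a small diagnostic sketch, assuming rows are plain dictionaries keyed like the sample entity in Step 3; missing_metrics_by_media_type is a hypothetical helper and the rows below are made-up data.

```python
from typing import Any, Dict, Iterable, List, Set


def missing_metrics_by_media_type(
    rows: Iterable[Dict[str, Any]],
    expected: List[str],
) -> Dict[str, List[str]]:
    """For each media type, list expected metrics missing from at least one row."""
    missing: Dict[str, Set[str]] = {}
    for row in rows:
        media = row.get("mediaType", "Unknown")
        # A metric counts as missing when the key is absent or its value is None.
        absent = {name for name in expected if row.get(name) is None}
        missing.setdefault(media, set()).update(absent)
    return {media: sorted(names) for media, names in missing.items()}


# Hypothetical rows: the voice group reports every metric, the sms group does not.
rows = [
    {"mediaType": "voice", "conversationCount": 150, "talkTime": "PT1200S", "holdTime": "PT60S"},
    {"mediaType": "sms", "conversationCount": 40},
]

if __name__ == "__main__":
    print(missing_metrics_by_media_type(rows, ["conversationCount", "talkTime", "holdTime"]))
```

An empty list for a media type means every expected metric was present in all of its rows, so a missing value there points at the data rather than at the select list.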