Querying Agent Utilization Metrics (tHandle, tAcw, tHold) by 30-Minute Intervals
What You Will Build
- This tutorial demonstrates how to retrieve granular agent utilization metrics, specifically handle time, after-call work, and hold time, segmented into 30-minute intervals.
- The solution utilizes the Genesys Cloud CX Analytics Conversations API (/api/v2/analytics/conversations/details/query).
- The implementation is provided in Python using the genesys-cloud-sdk-python package.
Prerequisites
- OAuth Client Type: Service Account or Confidential Client.
- Required Scopes:
  - analytics:conversation:view (Primary requirement for querying conversation data).
  - conversation:view (Optional, but often required if you need to resolve user IDs to names in post-processing).
- SDK Version: genesys-cloud-sdk-python v10.0.0 or higher.
- Language/Runtime: Python 3.8+.
- External Dependencies:
  - genesys-cloud-sdk-python: The official Genesys Cloud SDK.
  - python-dotenv: For secure credential management.
Install the dependencies using pip:
pip install genesys-cloud-sdk-python python-dotenv
Authentication Setup
Genesys Cloud uses OAuth 2.0 for authentication. For server-to-server integrations, the Client Credentials Flow is the standard approach. The SDK handles token acquisition and refresh automatically when initialized correctly.
Create a .env file in your project root to store your credentials securely. Never hardcode these values.
GENESYS_CLIENT_ID=your_client_id_here
GENESYS_CLIENT_SECRET=your_client_secret_here
GENESYS_REGION=us-east-1
Initialize the SDK client in your script. The PlatformClient acts as the facade for all API calls.
from genesyscloud.platform.client import PlatformClient
from genesyscloud import configuration
from dotenv import load_dotenv
import os

# Load environment variables
load_dotenv()

def get_platform_client():
    """
    Initializes and returns the Genesys Cloud Platform Client.
    """
    config = configuration.Configuration()
    config.client_id = os.getenv("GENESYS_CLIENT_ID")
    config.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    # Set the region explicitly. Default is us-east-1.
    # Change this if your organization is in a different region.
    if os.getenv("GENESYS_REGION"):
        config.base_path = f"https://{os.getenv('GENESYS_REGION')}.mypurecloud.com"
    return PlatformClient(config)
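Because os.getenv returns None for an unset variable, a missing credential would otherwise only surface at the first API call. The following is a minimal sketch of a fail-fast check, assuming the credential variable names from the .env file above. The helper name require_env and its environ parameter are hypothetical and not part of the SDK.

```python
import os

# Variable names taken from the .env example; adjust if yours differ.
REQUIRED_VARS = ("GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET")


def require_env(names=REQUIRED_VARS, environ=None) -> dict:
    """Return the requested variables, raising if any is missing or empty.

    `environ` defaults to os.environ; passing a plain dict makes the
    function easy to exercise without touching the real environment.
    """
    env = os.environ if environ is None else environ
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in names}
```

Calling require_env() right after load_dotenv() would stop the script with a clear message before any client is constructed.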
Implementation
Step 1: Constructing the Analytics Query Body
The core of this tutorial is the query payload sent to the Analytics API. To get metrics broken down by 30-minute intervals, we add interval to the groupBy array and supply a top-level interval object that specifies the timeUnit and intervalSize.
The critical fields for utilization are:
- tHandle: Total handle time (Talk + Hold + Wrap-up).
- tAcw: After-call work time.
- tHold: Hold time.
We will group by user to get per-agent data and by interval to get the time segmentation.
from genesyscloud.analytics.rest import AnalyticsApi

def build_query_body(start_time: str, end_time: str, user_ids: list = None) -> dict:
    """
    Constructs the JSON body for the analytics query.

    Args:
        start_time: ISO 8601 start time (e.g., '2023-10-01T00:00:00Z')
        end_time: ISO 8601 end time (e.g., '2023-10-01T23:59:59Z')
        user_ids: Optional list of user IDs to filter by. If None, queries all users.

    Returns:
        dict: The query payload.
    """
    # Define the metrics we want to aggregate
    metrics = [
        "tHandle",
        "tAcw",
        "tHold"
    ]
    # Define the group-by clauses
    group_by = [
        "user",
        "interval"
    ]
    # Configure the time interval
    # This is the key to getting 30-minute buckets
    interval_config = {
        "timeUnit": "minute",
        "intervalSize": 30
    }
    query_body = {
        "dateFrom": start_time,
        "dateTo": end_time,
        "metrics": metrics,
        "groupBy": group_by,
        "interval": interval_config
    }
    # Optional: Filter by specific agents
    if user_ids:
        query_body["filter"] = {
            "type": "and",
            "filters": [
                {
                    "type": "field",
                    "field": "userId",
                    "op": "in",
                    "values": user_ids
                }
            ]
        }
    return query_body
Why this structure?
The interval object inside the query body tells the analytics engine to bucket the raw conversation data into 30-minute windows starting from dateFrom. Without this, the API returns a single aggregated total for the entire dateFrom to dateTo range.
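To make the bucketing concrete, here is a small sketch that computes the 30-minute window starts a given range would produce. It only illustrates the arithmetic described above; the function name interval_starts is hypothetical, and the assumption that windows are aligned to dateFrom comes from the description in this section rather than from the API reference.

```python
from datetime import datetime, timedelta, timezone


def interval_starts(date_from: datetime, date_to: datetime, minutes: int = 30):
    """Yield the start of each window from date_from (inclusive) to date_to (exclusive)."""
    step = timedelta(minutes=minutes)
    current = date_from
    while current < date_to:
        yield current
        current += step


start = datetime(2023, 10, 1, 0, 0, tzinfo=timezone.utc)
end = datetime(2023, 10, 1, 2, 0, tzinfo=timezone.utc)
buckets = list(interval_starts(start, end))
# Two hours split into 30-minute windows: 00:00, 00:30, 01:00, 01:30
print(len(buckets))  # 4
```

By the same arithmetic, a full 24-hour range yields 48 windows per agent, which is a useful sanity check on the number of rows to expect.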
Step 2: Executing the Query and Handling Pagination
The Analytics API returns paginated results. The nextPageUri field indicates if more data is available. You must loop through pages until nextPageUri is null.
Additionally, you must handle HTTP errors. Common errors include:
- 401 Unauthorized: Invalid OAuth token.
- 403 Forbidden: Missing analytics:conversation:view scope.
- 400 Bad Request: Invalid date format or query structure.
- 429 Too Many Requests: Rate limiting. The SDK handles basic retries, but you should implement exponential backoff for robustness.
import time
from genesyscloud.analytics.rest import ApiException

def fetch_utilization_data(client: PlatformClient, query_body: dict) -> list:
    """
    Fetches the first page of analytics data and handles API errors.

    Args:
        client: The initialized PlatformClient.
        query_body: The dictionary payload for the query.

    Returns:
        list: The result objects from the first page.
    """
    analytics_api = AnalyticsApi(client)
    all_results = []
    try:
        # Initial request
        response = analytics_api.post_analytics_conversations_details_query(body=query_body)
        # Process the first page
        if response.entities:
            all_results.extend(response.entities)
        # Re-sending the same POST would return the first page again, so
        # looping on it never advances. Further pages must be fetched with
        # get_analytics_conversations_details_query_result instead.
        if response.next_page_uri:
            print(f"More results available: {response.next_page_uri}")
        return all_results
    except ApiException as e:
        print(f"API Exception: {e.status} {e.reason}")
        if e.body:
            print(f"Error Body: {e.body}")
        raise
Correction on Pagination Strategy:
The SDK's post_analytics_conversations_details_query method returns a PostAnalyticsConversationsDetailsQueryResponse. If the dataset is large, the response includes a nextPageUri. To fetch the remaining pages, call get_analytics_conversations_details_query_result with the ID extracted from that URI, or use the SDK's paginator if one is available.
Here is the robust implementation using the get method for subsequent pages:
import re

def fetch_all_pages(client: PlatformClient, query_body: dict) -> list:
    """
    Robust pagination handler for Analytics API.
    """
    analytics_api = AnalyticsApi(client)
    all_results = []
    try:
        # 1. Post the query to create the temporary result set
        response = analytics_api.post_analytics_conversations_details_query(body=query_body)
        current_response = response
        while True:
            # 2. Collect entities from the current page
            if current_response.entities:
                all_results.extend(current_response.entities)
            # 3. Check for next page
            if not current_response.next_page_uri:
                break
            # 4. Extract the query result ID from the next_page_uri
            # URI format: /api/v2/analytics/conversations/details/query/{id}/result
            match = re.search(r"/api/v2/analytics/conversations/details/query/([^/]+)/result", current_response.next_page_uri)
            if not match:
                print("Could not parse next page URI.")
                break
            query_result_id = match.group(1)
            # 5. Fetch the next page using GET
            print(f"Fetching page with ID: {query_result_id}")
            current_response = analytics_api.get_analytics_conversations_details_query_result(query_result_id)
            # 6. Respect rate limits (sleep briefly if needed, though SDK usually handles this)
            time.sleep(0.5)
    except ApiException as e:
        print(f"Failed to fetch analytics data: {e.status} - {e.reason}")
        raise
    return all_results
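The ID extraction in step 4 is the part most likely to break silently, so it can help to isolate it and test it without any network access. A minimal sketch, assuming the URI format given in the comment above; the helper name extract_result_id and the sample URIs are hypothetical.

```python
import re
from typing import Optional

# Same pattern as in fetch_all_pages, compiled once for reuse.
RESULT_URI_PATTERN = re.compile(
    r"/api/v2/analytics/conversations/details/query/([^/]+)/result"
)


def extract_result_id(next_page_uri: Optional[str]) -> Optional[str]:
    """Return the ID segment of a next-page URI, or None if it does not match."""
    if not next_page_uri:
        return None
    match = RESULT_URI_PATTERN.search(next_page_uri)
    return match.group(1) if match else None


# Made-up URIs for illustration only.
print(extract_result_id("/api/v2/analytics/conversations/details/query/abc-123/result"))
print(extract_result_id("/api/v2/some/other/path"))
```

Because the pattern uses search rather than a full match, it accepts both relative paths and full URLs that contain the same path.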
Step 3: Processing and Formatting Results
The raw response contains a list of entity objects. Each entity represents a unique combination of user and interval. You need to map the user IDs to names (optional but recommended) and format the metrics into a readable structure.
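Note: The code below assumes the metrics are returned in seconds and converts them to minutes for reporting. Genesys Cloud's analytics documentation describes time-based metrics such as tHandle in milliseconds, so verify the unit in your responses and adjust the divisor (60000 instead of 60) if needed.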
from typing import Dict, List
from datetime import datetime

def process_results(results: list, user_map: Dict[str, str] = None) -> List[dict]:
    """
    Processes raw analytics entities into a clean list of utilization records.

    Args:
        results: List of entity objects from the API.
        user_map: Optional dictionary mapping userId to userName.

    Returns:
        List[dict]: Cleaned data records.
    """
    processed_data = []
    for entity in results:
        # Extract metrics (in seconds)
        t_handle_sec = entity.metrics.get("tHandle", {}).get("value", 0)
        t_acw_sec = entity.metrics.get("tAcw", {}).get("value", 0)
        t_hold_sec = entity.metrics.get("tHold", {}).get("value", 0)
        # Extract interval start time
        interval_start = entity.interval.get("timeFrom", "")
        # Extract User ID
        user_id = entity.user.get("id", "")
        user_name = user_id
        if user_map and user_id in user_map:
            user_name = user_map[user_id]
        # Convert seconds to minutes for readability
        record = {
            "user_id": user_id,
            "user_name": user_name,
            "interval_start": interval_start,
            "t_handle_minutes": round(t_handle_sec / 60, 2),
            "t_acw_minutes": round(t_acw_sec / 60, 2),
            "t_hold_minutes": round(t_hold_sec / 60, 2)
        }
        processed_data.append(record)
    return processed_data

def get_user_map(client: PlatformClient, user_ids: list) -> Dict[str, str]:
    """
    Fetches user names for a list of user IDs.
    """
    users_api = client.users
    user_map = {}
    for user_id in user_ids:
        try:
            user = users_api.get_user(user_id, expand=["name"])
            user_map[user_id] = user.name
        except Exception:
            user_map[user_id] = user_id  # Fallback to ID if fetch fails
    return user_map
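Once the records are flattened, a common next step is to roll the 30-minute rows up into per-agent totals. The sketch below assumes the record shape produced by process_results above. The function name summarize_by_user is hypothetical, and the sample rows are made up for illustration.

```python
from collections import defaultdict

METRIC_KEYS = ("t_handle_minutes", "t_acw_minutes", "t_hold_minutes")


def summarize_by_user(records):
    """Sum each metric across all intervals, keyed by user_name."""
    totals = defaultdict(lambda: {key: 0.0 for key in METRIC_KEYS})
    for record in records:
        user_totals = totals[record["user_name"]]
        for key in METRIC_KEYS:
            user_totals[key] += record[key]
    # Round once at the end so per-row rounding errors do not accumulate.
    return {
        user: {key: round(value, 2) for key, value in values.items()}
        for user, values in totals.items()
    }


# Made-up sample rows in the shape returned by process_results.
sample = [
    {"user_name": "agent-a", "interval_start": "2023-10-01T09:00:00Z",
     "t_handle_minutes": 10.5, "t_acw_minutes": 2.0, "t_hold_minutes": 1.0},
    {"user_name": "agent-a", "interval_start": "2023-10-01T09:30:00Z",
     "t_handle_minutes": 4.25, "t_acw_minutes": 1.0, "t_hold_minutes": 0.5},
]
summary = summarize_by_user(sample)
print(summary["agent-a"]["t_handle_minutes"])  # 14.75
```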
Complete Working Example
This script combines all steps into a single executable module. It queries the last 24 hours of data for all agents, groups them by 30-minute intervals, and prints the results.
import os
import re
import sys
import time
from datetime import datetime, timedelta, timezone
from genesyscloud.platform.client import PlatformClient
from genesyscloud import configuration
from genesyscloud.analytics.rest import AnalyticsApi, ApiException
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

def init_client():
    config = configuration.Configuration()
    config.client_id = os.getenv("GENESYS_CLIENT_ID")
    config.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    region = os.getenv("GENESYS_REGION", "us-east-1")
    config.base_path = f"https://{region}.mypurecloud.com"
    return PlatformClient(config)

def get_query_body():
    # Define time range: Last 24 hours (timezone-aware UTC)
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=24)
    # Format as ISO 8601 with Z suffix for UTC
    start_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    query_body = {
        "dateFrom": start_str,
        "dateTo": end_str,
        "metrics": ["tHandle", "tAcw", "tHold"],
        "groupBy": ["user", "interval"],
        "interval": {
            "timeUnit": "minute",
            "intervalSize": 30
        }
    }
    return query_body

def main():
    print("Initializing Genesys Cloud Client...")
    client = init_client()
    print("Building Analytics Query...")
    query_body = get_query_body()
    print("Fetching Data (this may take a moment)...")
    analytics_api = AnalyticsApi(client)
    try:
        # Step 1: Post Query
        response = analytics_api.post_analytics_conversations_details_query(body=query_body)
        all_entities = []
        current_response = response
        # Step 2: Handle Pagination
        while True:
            if current_response.entities:
                all_entities.extend(current_response.entities)
            if not current_response.next_page_uri:
                break
            # Extract ID from next_page_uri
            match = re.search(r"/api/v2/analytics/conversations/details/query/([^/]+)/result", current_response.next_page_uri)
            if not match:
                break
            query_id = match.group(1)
            print(f"Fetching next page ID: {query_id}")
            # Small delay to avoid rate limiting
            time.sleep(0.5)
            current_response = analytics_api.get_analytics_conversations_details_query_result(query_id)
        # Step 3: Process Data
        if not all_entities:
            print("No conversation data found for the selected time period.")
            return
        print(f"\nRetrieved {len(all_entities)} data points.")
        print("User ID | Interval Start | Handle (min) | ACW (min) | Hold (min)")
        print("-" * 70)
        for entity in all_entities:
            user_id = entity.user.get("id", "Unknown")
            interval_start = entity.interval.get("timeFrom", "Unknown")
            # Get metric values safely
            t_handle = entity.metrics.get("tHandle", {}).get("value", 0)
            t_acw = entity.metrics.get("tAcw", {}).get("value", 0)
            t_hold = entity.metrics.get("tHold", {}).get("value", 0)
            # Convert to minutes
            h_min = round(t_handle / 60, 2)
            a_min = round(t_acw / 60, 2)
            ho_min = round(t_hold / 60, 2)
            print(f"{user_id[:10]:<10} | {interval_start:<20} | {h_min:<12} | {a_min:<10} | {ho_min}")
    except ApiException as e:
        print(f"API Error: {e.status} {e.reason}")
        print(f"Response Body: {e.body}")
        sys.exit(1)
    except Exception as e:
        print(f"Unexpected Error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 400 Bad Request - “Invalid interval configuration”
Cause: The intervalSize does not align with the timeUnit or the total query duration is too large for the engine to process in one go.
Fix: Ensure intervalSize is a positive integer. For 30-minute intervals, timeUnit must be minute and intervalSize must be 30. If querying a very large date range (e.g., 1 year), break it down into smaller chunks (e.g., monthly queries).
# Correct Configuration
"interval": {
    "timeUnit": "minute",
    "intervalSize": 30
}
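Splitting a large date range into smaller queries, as suggested in the fix above, can be sketched as follows. The function name split_range and the 7-day default are arbitrary choices for illustration, not limits documented by the API. Each resulting pair would be formatted and passed as dateFrom and dateTo in a separate query.

```python
from datetime import datetime, timedelta, timezone


def split_range(date_from: datetime, date_to: datetime, max_days: int = 7):
    """Split [date_from, date_to) into consecutive chunks of at most max_days."""
    if max_days <= 0:
        raise ValueError("max_days must be a positive integer")
    step = timedelta(days=max_days)
    chunks = []
    chunk_start = date_from
    while chunk_start < date_to:
        # The last chunk is clipped so it never runs past date_to.
        chunk_end = min(chunk_start + step, date_to)
        chunks.append((chunk_start, chunk_end))
        chunk_start = chunk_end
    return chunks


start = datetime(2023, 10, 1, tzinfo=timezone.utc)
end = datetime(2023, 10, 20, tzinfo=timezone.utc)
for chunk_start, chunk_end in split_range(start, end):
    print(chunk_start.isoformat(), "->", chunk_end.isoformat())
```

The chunks share boundaries (one chunk ends exactly where the next begins), so no interval is skipped or queried twice as long as the end of each range is treated as exclusive.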
Error: 403 Forbidden - “Insufficient permissions”
Cause: The OAuth token lacks the analytics:conversation:view scope.
Fix: Check your Service Account or Client Credentials configuration in the Genesys Cloud Admin Console. Navigate to Admin > Security > OAuth Clients (or Service Accounts) and ensure the scope is checked. Regenerate the token after updating the scope.
Error: 429 Too Many Requests
Cause: You are hitting the rate limit for the Analytics API. Analytics queries are computationally expensive.
Fix: Implement exponential backoff. The provided code includes a time.sleep(0.5) between pages. For production systems, monitor the Retry-After header in 429 responses and delay accordingly.
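import time

# `response` is assumed to be a raw HTTP response object that exposes
# status_code and headers (for example, one returned by the requests library).
if response.status_code == 429:
    retry_after = int(response.headers.get("Retry-After", 5))
    print(f"Rate limited. Waiting {retry_after} seconds...")
    time.sleep(retry_after)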
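The exponential backoff mentioned in the fix can be sketched as a small wrapper around any callable. This is one possible shape, not the SDK's own retry mechanism: the names call_with_backoff and is_rate_limited are hypothetical, and the caller decides what counts as a rate-limit error (for instance, an exception whose status attribute equals 429, as with the ApiException usage earlier in this tutorial).

```python
import random
import time


def call_with_backoff(func, is_rate_limited, max_attempts=5, base_delay=1.0,
                      sleep=time.sleep):
    """Call func(), retrying with exponential backoff on rate-limit errors.

    is_rate_limited(exc) should return True for errors worth retrying.
    The delay doubles on each attempt, plus random jitter so that several
    clients do not retry in lockstep. `sleep` is injectable for testing.
    """
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception as exc:
            # Re-raise non-retryable errors, and the final failed attempt.
            if not is_rate_limited(exc) or attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            sleep(delay)
```

A call might look like call_with_backoff(lambda: fetch_all_pages(client, query_body), lambda e: getattr(e, "status", None) == 429). When the server supplies a Retry-After value, preferring it over the computed delay is usually the safer choice.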
Error: Empty Results for Specific Users
Cause: The user did not have any conversations in the specified time range, or the user ID is incorrect.
Fix: Verify the user ID is valid. Check if the user was active in the system during the dateFrom to dateTo window. Analytics only returns data for users who participated in conversations.