How to query agent utilization metrics (tHandle, tAcw, tHold) broken down by 30-minute intervals
What You Will Build
- A script that retrieves historical conversation details for a specific agent or group, filtering for conversations that occurred within a defined time window.
- This tutorial uses the Genesys Cloud CX Analytics Conversations API (
/api/v2/analytics/conversations/details/query) to fetch granular interaction data. - The implementation is provided in Python using the official Genesys Cloud Python SDK (
genesys-cloud-sdk).
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Grant) or Resource Owner Password Credentials (ROPC) for testing. For production, use Client Credentials.
- Required OAuth Scopes:
analytics:conversation:view(Required to query conversation details)user:view(Optional, if you need to resolve user IDs from names)
- SDK Version:
genesys-cloud-sdkv1.0.0 or later. - Language/Runtime: Python 3.8+.
- External Dependencies:
genesys-cloud-sdk: The official SDK.pandas: For data manipulation and time-bucketing logic (optional but recommended for interval calculation).python-dotenv: To manage environment variables securely.
Install dependencies:
pip install genesys-cloud-sdk pandas python-dotenv
Authentication Setup
Genesys Cloud uses OAuth 2.0. The Python SDK handles the token acquisition and refresh automatically when initialized with client credentials. You must provide your Client ID and Client Secret.
Create a .env file in your project root:
GENESYS_CLIENT_ID=your_client_id_here
GENESYS_CLIENT_SECRET=your_client_secret_here
GENESYS_REGION=us-east-1
Initialize the SDK client:
import os
from dotenv import load_dotenv
from platform_sdk.client import PlatformClient
from platform_sdk.auth import OAuthClientCredentials
from platform_sdk.auth import OAuthResourceOwner
# Load environment variables
load_dotenv()
def get_platform_client() -> PlatformClient:
"""
Initializes and returns the Genesys Cloud Platform Client.
Uses Client Credentials flow for service-to-service authentication.
"""
client_id = os.getenv("GENESYS_CLIENT_ID")
client_secret = os.getenv("GENESYS_CLIENT_SECRET")
region = os.getenv("GENESYS_REGION", "us-east-1")
if not client_id or not client_secret:
raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment.")
# Configure the OAuth provider
oauth_provider = OAuthClientCredentials(
client_id=client_id,
client_secret=client_secret,
region=region
)
# Initialize the platform client
platform_client = PlatformClient(oauth_provider)
# Verify connection by fetching a simple resource (optional but good for debugging)
try:
platform_client.users.get_user_by_id("0") # This will fail but validates auth handshake
except Exception as e:
# We expect this to fail because user ID 0 doesn't exist,
# but if it fails with 401, auth is broken.
if "401" in str(e) or "403" in str(e):
print(f"Authentication successful (unauthorized access to invalid user is expected).")
else:
print(f"Connection test failed: {e}")
return platform_client
Implementation
Step 1: Construct the Analytics Query Payload
The core of this tutorial is the POST /api/v2/analytics/conversations/details/query endpoint. Unlike simple list endpoints, this accepts a complex JSON body that defines the time window, filter criteria, and specific metrics to return.
To get tHandle (Total Handle Time), tAcw (After Call Work), and tHold (Hold Time), you must request the conversations view with specific metrics.
Key Parameters:
view: Must beconversations.interval: Set toPT30Mfor 30-minute intervals. Note: The API aggregates data into these buckets. If you need precise per-conversation timestamps to calculate custom buckets, you must useview=conversationswithincludeMetrics=falseor rely on thetimestampfield in the result. However, for utilization metrics, the aggregated view is more efficient.filter: Useentitiesto scope down to specific agents or groups.metrics: SpecifytHandle,tAcw,tHold.
from platform_sdk.models import AnalyticsConversationDetailsQueryBody
from platform_sdk.models import AnalyticsConversationFilter
from platform_sdk.models import AnalyticsConversationEntityFilter
import datetime
def build_query_body(agent_ids: list[str], start_time: datetime.datetime, end_time: datetime.datetime) -> dict:
"""
Constructs the payload for the analytics conversation details query.
Args:
agent_ids: List of user IDs (strings) to filter by.
start_time: Start of the reporting window (UTC).
end_time: End of the reporting window (UTC).
Returns:
dict: The JSON-compatible payload for the API call.
"""
# Define the filter for specific agents
filter_criteria = AnalyticsConversationFilter(
entities=[
AnalyticsConversationEntityFilter(
ids=agent_ids,
type="users" # Filter by User (Agent) IDs
)
]
)
# Define the query body
# Note: The SDK models may vary slightly by version.
# We construct the dict directly here to ensure compatibility with the raw API structure
# if the SDK model is incomplete for complex nested queries.
payload = {
"view": "conversations",
"interval": "PT30M", # 30-minute intervals
"dateFrom": start_time.isoformat(),
"dateTo": end_time.isoformat(),
"filter": {
"entities": [
{
"ids": agent_ids,
"type": "users"
}
]
},
"metrics": [
"tHandle",
"tAcw",
"tHold",
"count" # Total number of conversations in the bucket
],
"groupings": [] # No additional groupings; we are aggregating by time interval only
}
return payload
Step 2: Execute the Query and Handle Pagination
The Analytics API returns a maximum of 1000 intervals per request. If your date range is large (e.g., last 30 days with 30-minute intervals), you will exceed this limit. You must implement pagination using the nextPageUrl provided in the response.
import requests
import time
def fetch_conversation_analytics(platform_client: PlatformClient, payload: dict) -> list[dict]:
"""
Fetches analytics data, handling pagination automatically.
Args:
platform_client: The initialized PlatformClient.
payload: The query payload constructed in Step 1.
Returns:
list[dict]: A list of all interval buckets returned by the API.
"""
all_intervals = []
next_page_url = platform_client.base_url + "/api/v2/analytics/conversations/details/query"
page_count = 0
while next_page_url:
page_count += 1
print(f"Fetching page {page_count}...")
try:
# Get the OAuth token from the platform client
token = platform_client.oauth_provider.get_access_token()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
response = requests.post(next_page_url, json=payload, headers=headers)
# Handle HTTP Errors
if response.status_code == 429:
# Rate Limited. Wait and retry.
retry_after = int(response.headers.get("Retry-After", 5))
print(f"Rate limited. Waiting {retry_after} seconds...")
time.sleep(retry_after)
continue
elif response.status_code == 401:
raise Exception("Authentication failed. Token may be expired.")
elif response.status_code == 403:
raise Exception("Forbidden. Check OAuth scopes (analytics:conversation:view).")
elif response.status_code >= 400:
raise Exception(f"API Error {response.status_code}: {response.text}")
data = response.json()
# Append intervals from this page
if "intervals" in data and data["intervals"]:
all_intervals.extend(data["intervals"])
# Check for next page
next_page_url = data.get("nextPageUrl")
# Safety break to prevent infinite loops if API is buggy
if page_count > 100:
print("Safety break: Too many pages.")
break
except requests.exceptions.RequestException as e:
print(f"Network error: {e}")
time.sleep(5)
continue
return all_intervals
Step 3: Process Results and Calculate Utilization
The API returns data in buckets. Each bucket contains the sum of metrics for that 30-minute window. To calculate utilization, you need to compare the sum of tHandle, tAcw, and tHold against the total available time in the interval (30 minutes = 1800 seconds).
Note on Logic:
tHandle= Talk + Hold + Wrap.tAcw= After Call Work (often included in Handle depending on view, but explicitly requested here).tHold= Hold time.
If you request tHandle, tAcw, and tHold separately, be aware of overlaps. In Genesys Cloud:
tHandletypically includestTalkandtHold.tAcwis separate fromtHandlein some views, but in theconversationsview,tHandleis often the total duration from answer to wrap-up.- To avoid double counting, we will use
tHandleas the primary “busy” metric. If you need specific breakdowns, you can analyzetTalk,tHold, andtAcwindividually.
For this tutorial, we will calculate Agent Occupancy for each 30-minute bucket:
$$ \text{Occupancy} = \frac{\sum tHandle}{\text{Bucket Duration (seconds)}} $$
import pandas as pd
from datetime import datetime
def process_utilization_data(intervals: list[dict]) -> pd.DataFrame:
"""
Converts the raw API intervals into a DataFrame with calculated utilization metrics.
Args:
intervals: List of interval dictionaries from the API.
Returns:
pd.DataFrame: Processed data with occupancy percentages.
"""
if not intervals:
return pd.DataFrame()
# Flatten the intervals for easier processing
records = []
for interval in intervals:
# The interval key is the start time of the bucket (ISO format)
interval_start = interval.get("interval")
if not interval_start:
continue
# Extract metrics. The API returns sums for the bucket.
# Metrics are nested under 'metrics' or directly in the interval object
# depending on the view. For 'conversations' view with groupings=[],
# the metrics are usually at the top level of the interval object if no entities are grouped,
# OR inside an 'entities' array if grouped by user.
# In our query, we did NOT group by user, we filtered by user.
# Therefore, the result is aggregated across all selected users.
# If you want per-user breakdown, you must add "users" to "groupings" in Step 1.
# Let's assume we want per-user breakdown for accurate utilization.
# We will modify the query in Step 1 to include groupings.
pass
# REVISION: The previous query aggregates all agents into one bucket.
# To get per-agent utilization, we MUST group by user.
# Let's update the processing logic to handle grouped results.
# If the query included "groupings": ["users"], the response structure changes.
# It returns a list of entities (users), each with their own intervals.
# Since we didn't include groupings in Step 1, let's assume the user wants
# aggregate utilization for a TEAM, or we will adjust the code to support per-agent.
# For this tutorial, let's assume we want PER-AGENT metrics.
# We will re-read the intervals structure. If no groupings, it's one set of metrics.
# If groupings=["users"], it's a list of users.
# Let's stick to the simpler aggregate view for the code example below,
# but note that for individual agent utilization, you must group by user.
# Parsing the aggregate view (no groupings):
for interval in intervals:
timestamp = interval.get("interval")
metrics = interval.get("metrics", {})
# Extract values, defaulting to 0 if null
t_handle = metrics.get("tHandle", 0)
t_acw = metrics.get("tAcw", 0)
t_hold = metrics.get("tHold", 0)
count = metrics.get("count", 0)
# Convert handle time from seconds to minutes for readability
handle_minutes = t_handle / 60 if t_handle else 0
hold_minutes = t_hold / 60 if t_hold else 0
acw_minutes = t_acw / 60 if t_acw else 0
# Calculate Occupancy (Handle Time / 30 mins)
# 30 minutes = 1800 seconds
occupancy = (t_handle / 1800) * 100 if t_handle else 0
records.append({
"interval_start": timestamp,
"conversation_count": count,
"t_handle_seconds": t_handle,
"t_handle_minutes": handle_minutes,
"t_hold_seconds": t_hold,
"t_hold_minutes": hold_minutes,
"t_acw_seconds": t_acw,
"t_acw_minutes": acw_minutes,
"occupancy_percent": round(occupancy, 2)
})
df = pd.DataFrame(records)
return df
Revised Step 1: Query for Per-Agent Utilization
To get utilization for each agent, you must add "groupings": ["users"] to the payload. This changes the response structure significantly. The API returns a list of entities (users), and each user has their own intervals array.
Update build_query_body:
def build_query_body_grouped(agent_ids: list[str], start_time: datetime.datetime, end_time: datetime.datetime) -> dict:
payload = {
"view": "conversations",
"interval": "PT30M",
"dateFrom": start_time.isoformat(),
"dateTo": end_time.isoformat(),
"filter": {
"entities": [
{
"ids": agent_ids,
"type": "users"
}
]
},
"metrics": [
"tHandle",
"tAcw",
"tHold",
"count"
],
"groupings": ["users"] # Crucial for per-agent breakdown
}
return payload
Revised Step 3: Process Grouped Results
def process_grouped_utilization_data(intervals: list[dict]) -> pd.DataFrame:
"""
Processes intervals where data is grouped by user.
"""
records = []
# The top-level 'intervals' in the response contains the aggregated time buckets.
# Inside each bucket, there is an 'entities' array containing the users.
for interval in intervals:
interval_start = interval.get("interval")
entities = interval.get("entities", [])
for entity in entities:
user_id = entity.get("id")
user_name = entity.get("name")
metrics = entity.get("metrics", {})
t_handle = metrics.get("tHandle", 0)
t_acw = metrics.get("tAcw", 0)
t_hold = metrics.get("tHold", 0)
count = metrics.get("count", 0)
# Calculate Occupancy
occupancy = (t_handle / 1800) * 100 if t_handle else 0
records.append({
"interval_start": interval_start,
"user_id": user_id,
"user_name": user_name,
"conversation_count": count,
"t_handle_seconds": t_handle,
"t_handle_minutes": round(t_handle / 60, 2),
"t_hold_seconds": t_hold,
"t_hold_minutes": round(t_hold / 60, 2),
"t_acw_seconds": t_acw,
"t_acw_minutes": round(t_acw / 60, 2),
"occupancy_percent": round(occupancy, 2)
})
df = pd.DataFrame(records)
return df
Complete Working Example
This script ties everything together. It authenticates, queries the last 24 hours for a specific agent, and prints a summary of their 30-minute utilization buckets.
import os
import datetime
from dotenv import load_dotenv
from platform_sdk.client import PlatformClient
from platform_sdk.auth import OAuthClientCredentials
import requests
import time
import pandas as pd
# --- Authentication ---
def get_platform_client() -> PlatformClient:
load_dotenv()
client_id = os.getenv("GENESYS_CLIENT_ID")
client_secret = os.getenv("GENESYS_CLIENT_SECRET")
region = os.getenv("GENESYS_REGION", "us-east-1")
oauth_provider = OAuthClientCredentials(client_id=client_id, client_secret=client_secret, region=region)
return PlatformClient(oauth_provider)
# --- Query Construction ---
def build_query_body(agent_id: str, start_time: datetime.datetime, end_time: datetime.datetime) -> dict:
return {
"view": "conversations",
"interval": "PT30M",
"dateFrom": start_time.isoformat(),
"dateTo": end_time.isoformat(),
"filter": {
"entities": [{"ids": [agent_id], "type": "users"}]
},
"metrics": ["tHandle", "tAcw", "tHold", "count"],
"groupings": ["users"]
}
# --- Data Fetching ---
def fetch_analytics(platform_client: PlatformClient, payload: dict) -> list[dict]:
all_intervals = []
next_page_url = platform_client.base_url + "/api/v2/analytics/conversations/details/query"
while next_page_url:
token = platform_client.oauth_provider.get_access_token()
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
response = requests.post(next_page_url, json=payload, headers=headers)
if response.status_code == 429:
time.sleep(int(response.headers.get("Retry-After", 5)))
continue
elif response.status_code != 200:
raise Exception(f"API Error {response.status_code}: {response.text}")
data = response.json()
if data.get("intervals"):
all_intervals.extend(data["intervals"])
next_page_url = data.get("nextPageUrl")
return all_intervals
# --- Processing ---
def process_data(intervals: list[dict]) -> pd.DataFrame:
records = []
for interval in intervals:
interval_start = interval.get("interval")
for entity in interval.get("entities", []):
metrics = entity.get("metrics", {})
t_handle = metrics.get("tHandle", 0) or 0
t_hold = metrics.get("tHold", 0) or 0
t_acw = metrics.get("tAcw", 0) or 0
records.append({
"Time": interval_start,
"Agent": entity.get("name"),
"Handle (min)": round(t_handle / 60, 2),
"Hold (min)": round(t_hold / 60, 2),
"ACW (min)": round(t_acw / 60, 2),
"Occupancy %": round((t_handle / 1800) * 100, 2)
})
return pd.DataFrame(records)
# --- Main Execution ---
if __name__ == "__main__":
# Configuration
AGENT_ID = "YOUR_AGENT_ID_HERE" # Replace with a valid User ID
END_TIME = datetime.datetime.utcnow()
START_TIME = END_TIME - datetime.timedelta(hours=24) # Last 24 hours
if AGENT_ID == "YOUR_AGENT_ID_HERE":
print("Please set AGENT_ID in the script.")
exit()
try:
client = get_platform_client()
print("Authenticating...")
payload = build_query_body(AGENT_ID, START_TIME, END_TIME)
print(f"Querying analytics for {AGENT_ID} from {START_TIME} to {END_TIME}...")
intervals = fetch_analytics(client, payload)
print(f"Fetched {len(intervals)} time buckets.")
df = process_data(intervals)
if not df.empty:
print("\n--- Agent Utilization Report (30-min Intervals) ---")
print(df.to_string(index=False))
# Optional: Save to CSV
df.to_csv("agent_utilization_report.csv", index=False)
print("\nReport saved to agent_utilization_report.csv")
else:
print("No data found for the specified agent and time range.")
except Exception as e:
print(f"Error: {e}")
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: Invalid Client ID/Secret, or the OAuth token has expired.
- Fix: Ensure your
.envfile contains correct credentials. The SDK handles refresh, but if you are using a raw HTTP request outside the SDK, ensure you are fetching a fresh token.
Error: 403 Forbidden
- Cause: The OAuth client lacks the
analytics:conversation:viewscope. - Fix: Go to Genesys Cloud Admin > Security > OAuth Clients. Edit your client and add the
analytics:conversation:viewscope. Save and regenerate the client secret if necessary.
Error: 429 Too Many Requests
- Cause: You have exceeded the rate limit for the Analytics API.
- Fix: Implement exponential backoff. The code example above checks the
Retry-Afterheader and sleeps.
Error: Empty Response or Null Metrics
- Cause: The time range does not contain any conversations for the specified agent, or the agent ID is invalid.
- Fix: Verify the
AGENT_IDis a valid User ID (not a name). Check thedateFromanddateToformat (ISO 8601 UTC). Ensure the agent actually had conversations in that window.
Error: Metrics Not Summing Correctly
- Cause: Confusion between
tHandleandtTalk. - Fix:
tHandleincludes Talk, Hold, and Wrap. If you sumtTalk+tHold+tAcw, you may get a different number thantHandledepending on how the system defines “Wrap”. For utilization,tHandleis the standard metric for “busy” time.