Query Agent Utilization Metrics (tHandle, tAcw, tHold) by 30-Minute Intervals
What You Will Build
- You will build a Python script that queries Genesys Cloud CX for granular agent interaction metrics, specifically time in handle, after-call work, and hold.
- You will use the POST /api/v2/analytics/conversations/details/query endpoint with a 30-minute time bucket configuration.
- You will use the Python requests library with manual OAuth2 token management to retrieve and parse the JSON response.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Grant). Public clients cannot access the Analytics API.
- Required OAuth Scopes:
  - analytics:conversations:view
  - analytics:agents:view
- SDK/API Version: Genesys Cloud CX API v2.
- Language/Runtime: Python 3.8+.
- External Dependencies:
  - requests (third-party HTTP client library)
  - python-dotenv (for loading credentials from a .env file)
Install dependencies via pip:
pip install requests python-dotenv
Authentication Setup
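Genesys Cloud uses OAuth 2.0. For server-to-server integrations, the Client Credentials Grant is the standard choice: you exchange your client ID and secret for an access token. The token's lifetime is reported in the expires_in field of the token response, so the code in this section reads that value rather than assuming a fixed duration.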
Create a .env file in your project root:
GENESYS_REGION=us-east-1
GENESYS_CLIENT_ID=your_client_id_here
GENESYS_CLIENT_SECRET=your_client_secret_here
The following Python class handles token acquisition and basic caching. It avoids the overhead of importing the full Genesys SDK for this specific analytics query, giving you precise control over the HTTP payload.
import os
import time
import requests
from dotenv import load_dotenv

load_dotenv()


class GenesysAuth:
    def __init__(self):
        self.region = os.getenv("GENESYS_REGION", "us-east-1")
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
        if not self.client_id or not self.client_secret:
            raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in .env")

        # Determine the domain based on region
        if self.region == "us-east-1":
            domain = "mypurecloud.com"
        elif self.region == "eu-west-1":
            domain = "mypurecloud.ie"
        else:
            # Assumes GENESYS_REGION holds the short code used in pure.cloud hostnames
            domain = f"{self.region}.pure.cloud"

        # API calls go to the api host; tokens are issued by the login host
        self.base_url = f"https://api.{domain}"
        self.login_url = f"https://login.{domain}"
        self.access_token = None
        self.token_expiry = 0

    def get_access_token(self) -> str:
        """
        Returns a valid access token. If the current token is expired or missing,
        it fetches a new one from the OAuth endpoint.
        """
        # Check if token is still valid (add 60s buffer for clock skew)
        if self.access_token and time.time() < (self.token_expiry - 60):
            return self.access_token

        token_url = f"{self.login_url}/oauth/token"
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "analytics:conversations:view analytics:agents:view"
        }
        try:
            response = requests.post(token_url, data=data, timeout=30)
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            self.token_expiry = time.time() + token_data["expires_in"]
            return self.access_token
        except requests.exceptions.HTTPError as e:
            status = e.response.status_code
            if status == 401:
                raise Exception("OAuth 401: Invalid client credentials.") from e
            raise Exception(f"OAuth Error {status}: {e.response.text}") from e

    def get_headers(self) -> dict:
        """Returns headers required for API calls."""
        token = self.get_access_token()
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
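The caching rule inside get_access_token can be isolated as a small pure function, which makes the 60-second buffer easy to check without calling the OAuth endpoint. This is a minimal sketch: is_token_fresh and its parameters are hypothetical names introduced here for illustration and are not part of the class above.

```python
import time
from typing import Optional


def is_token_fresh(token: Optional[str], expiry_epoch: float,
                   now: Optional[float] = None, buffer_seconds: int = 60) -> bool:
    """Return True if a cached token exists and will not expire within the buffer."""
    if now is None:
        now = time.time()
    return bool(token) and now < (expiry_epoch - buffer_seconds)


# A token that expires in 30 seconds is treated as stale because of the 60s buffer.
print(is_token_fresh("abc", expiry_epoch=1030.0, now=1000.0))  # False
# A token that expires in 10 minutes is reused.
print(is_token_fresh("abc", expiry_epoch=1600.0, now=1000.0))  # True
```

Passing now explicitly keeps the check deterministic in tests; in normal use it falls back to time.time().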
Implementation
Step 1: Constructing the Analytics Query
The core of this tutorial is the query body sent to /api/v2/analytics/conversations/details/query. To get 30-minute intervals, you must set the timeBucket parameter to PT30M (ISO 8601 duration format).
You must also specify the groupBy field as agent to break down metrics by individual user. If you omit this, you get aggregate data for the entire organization, which is rarely useful for utilization analysis.
Define the query parameters. You need to specify a dateFrom and dateTo. For this example, we will query the last 24 hours.
from datetime import datetime, timedelta, timezone


def build_query_body() -> dict:
    """
    Constructs the JSON body for the analytics query.
    """
    now = datetime.now(timezone.utc)
    yesterday = now - timedelta(hours=24)

    # Format dates as ISO 8601 strings
    date_to = now.isoformat()
    date_from = yesterday.isoformat()

    query_body = {
        "dateFrom": date_from,
        "dateTo": date_to,
        "timeBucket": "PT30M",   # 30-minute intervals
        "groupBy": ["agent"],    # Breakdown by agent
        "view": "default",
        "select": [
            "agent.name",
            "agent.id",
            "tHandle",
            "tAcw",
            "tHold",
            "interactions"
        ],
        "where": [
            {
                "path": "conversationType",
                "operator": "eq",
                "value": "voice"
            }
        ],
        "size": 1000  # Max records per page
    }
    return query_body
Critical Parameter Explanation:
- timeBucket: Setting this to PT30M tells the analytics engine to bin results into 30-minute slots. Without this, you receive a single aggregate sum for the entire time range.
- groupBy: Including agent here is mandatory. The API returns a groups array where each item represents a unique agent.
- select: Explicitly selecting tHandle, tAcw, and tHold ensures these fields are populated in the metrics object. While the default view includes many metrics, being explicit reduces payload size and clarifies intent.
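To see what a 30-minute bucket size implies for a 24-hour window, the sketch below enumerates the bucket boundaries locally. It does not call the API. Both helpers (floor_to_half_hour and iter_buckets) are hypothetical, and aligning the window to a half-hour boundary is an assumption made here so the buckets line up with clock times.

```python
from datetime import datetime, timedelta, timezone


def floor_to_half_hour(ts: datetime) -> datetime:
    """Round a timestamp down to the previous :00 or :30 boundary."""
    minute = 0 if ts.minute < 30 else 30
    return ts.replace(minute=minute, second=0, microsecond=0)


def iter_buckets(start: datetime, end: datetime, minutes: int = 30):
    """Yield (bucket_start, bucket_end) pairs covering [start, end)."""
    step = timedelta(minutes=minutes)
    current = start
    while current < end:
        yield current, min(current + step, end)
        current += step


end = floor_to_half_hour(datetime(2024, 1, 2, 10, 47, tzinfo=timezone.utc))
start = end - timedelta(hours=24)
buckets = list(iter_buckets(start, end))
print(len(buckets))               # 48
print(buckets[0][0].isoformat())  # 2024-01-01T10:30:00+00:00
```

A 24-hour window therefore produces 48 buckets per agent, which is useful when estimating how many records a query may return.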
Step 2: Executing the Query with Pagination
The Analytics API supports pagination. If you have many agents or many time buckets, the response may exceed the size limit. You must check the nextPage token in the response and loop until all data is retrieved.
Additionally, you must handle HTTP 429 (Too Many Requests). Genesys Cloud enforces strict rate limits on analytics endpoints. A naive script will fail if it hammers the API. The following implementation includes a basic exponential backoff retry mechanism.
import time
import requests


def fetch_utilization_data(auth: GenesysAuth) -> list:
    """
    Fetches agent utilization metrics with pagination and retry logic.
    Returns a list of all result groups.
    """
    endpoint = f"{auth.base_url}/api/v2/analytics/conversations/details/query"
    body = build_query_body()
    all_results = []
    page_token = None
    max_retries = 3
    base_delay = 2  # seconds

    while True:
        # Add pagination token if it exists
        if page_token:
            body["nextPage"] = page_token

        for attempt in range(1, max_retries + 1):
            # Rebuild headers on every attempt so an expired token is refreshed
            response = requests.post(endpoint, headers=auth.get_headers(), json=body, timeout=30)

            if response.ok:
                break  # Success, exit retry loop

            # Authentication/authorization failures will not succeed on retry
            if response.status_code in (401, 403):
                raise Exception(f"Authentication/Authorization failed: {response.status_code}")

            # Give up once the retry budget is spent instead of parsing an error body
            if attempt == max_retries:
                raise Exception(f"Failed to fetch data after {max_retries} attempts: {response.text}")

            if response.status_code == 429:
                # Handle rate limiting: prefer the server's Retry-After hint
                delay = int(response.headers.get("Retry-After", base_delay * (2 ** attempt)))
                print(f"Rate limited (429). Waiting {delay} seconds...")
            else:
                # Other errors: exponential backoff
                delay = base_delay * (2 ** attempt)
            time.sleep(delay)

        data = response.json()

        # Accumulate results
        if data.get("groups"):
            all_results.extend(data["groups"])

        # Check for next page
        page_token = data.get("nextPage")
        if not page_token:
            break

        print(f"Page fetched. Total groups so far: {len(all_results)}")
        # Small delay to be polite to the API between pages
        time.sleep(1)

    return all_results
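The delay calculation used in the retry loop can also be written as a standalone helper, which makes the backoff schedule easy to inspect. This is a sketch under stated assumptions: backoff_delay is a hypothetical name, and the 60-second cap is an illustrative choice, not a documented Genesys limit.

```python
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base_delay: float = 2.0, max_delay: float = 60.0) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # Prefer the server's hint when it is a plain number of seconds
            return min(float(retry_after), max_delay)
        except ValueError:
            pass  # Header was not numeric; fall back to exponential backoff
    return min(base_delay * (2 ** attempt), max_delay)


print([backoff_delay(a) for a in range(4)])  # [2.0, 4.0, 8.0, 16.0]
print(backoff_delay(0, retry_after="7"))     # 7.0
print(backoff_delay(10))                     # 60.0
```

Capping the delay keeps a long run of failures from stalling the script for minutes at a time.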
Step 3: Processing and Calculating Utilization
The raw response provides seconds for tHandle, tAcw, and tHold. To calculate “utilization,” you typically compare active time against the total available time in the bucket.
However, the API does not directly return “available time” per agent per bucket because agents have different shifts, breaks, and statuses. A common proxy for utilization in a fixed time bucket is:
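$$ \text{Utilization} = \frac{\text{tHandle} + \text{tAcw} + \text{tHold}}{\text{Bucket Duration in Seconds}} $$
For a 30-minute bucket, the denominator is 1800 seconds. This ratio can exceed 100% if an agent handles multiple concurrent interactions (rare in standard voice queues but possible in blended environments) or if the data aggregates multiple queues, so it is better read as the density of work in the bucket than as a strict occupancy figure. Also check how your data defines tHandle: if it already includes hold and after-call work time, use tHandle alone as the numerator to avoid double counting.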
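As a worked example with illustrative numbers (not taken from real data), suppose an agent records 900 seconds of handle time, 300 seconds of after-call work, and 120 seconds of hold in a single 30-minute bucket:

$$ \text{Utilization} = \frac{900 + 300 + 120}{1800} = \frac{1320}{1800} \approx 0.733 = 73.3\% $$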
def calculate_utilization_metrics(results: list) -> list:
    """
    Processes the raw API groups into a structured list of utilization records.
    """
    bucket_duration_seconds = 30 * 60  # 30 minutes in seconds
    processed_metrics = []

    for group in results:
        agent_id = group.get("agent", {}).get("id")
        agent_name = group.get("agent", {}).get("name", "Unknown")

        # The 'metrics' object contains the sums for this group
        metrics = group.get("metrics", {})

        # Extract seconds. Default to 0 if missing
        t_handle = metrics.get("tHandle", 0)
        t_acw = metrics.get("tAcw", 0)
        t_hold = metrics.get("tHold", 0)
        interactions = metrics.get("interactions", 0)

        # Calculate total active time
        total_active_seconds = t_handle + t_acw + t_hold

        # Calculate utilization percentage relative to the bucket size
        # This represents how "full" the 30-min block was for this agent
        utilization_pct = (total_active_seconds / bucket_duration_seconds) * 100

        record = {
            "agent_id": agent_id,
            "agent_name": agent_name,
            "t_handle_sec": t_handle,
            "t_acw_sec": t_acw,
            "t_hold_sec": t_hold,
            "total_active_sec": total_active_seconds,
            "interactions": interactions,
            "utilization_pct": round(utilization_pct, 2)
        }
        processed_metrics.append(record)

    return processed_metrics
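If the processed list contains one record per agent per bucket (an assumption about the response shape that you should verify against your own data), a per-agent average can be derived with a short rollup. The sketch below runs on hand-written sample records; average_utilization_by_agent is a hypothetical helper name.

```python
from collections import defaultdict


def average_utilization_by_agent(records: list) -> dict:
    """Average the utilization_pct of all records that share an agent_id."""
    totals = defaultdict(lambda: [0.0, 0])  # agent_id -> [sum of pct, record count]
    for rec in records:
        entry = totals[rec["agent_id"]]
        entry[0] += rec["utilization_pct"]
        entry[1] += 1
    return {agent: round(total / count, 2) for agent, (total, count) in totals.items()}


sample = [
    {"agent_id": "a1", "utilization_pct": 50.0},
    {"agent_id": "a1", "utilization_pct": 100.0},
    {"agent_id": "a2", "utilization_pct": 25.0},
]
print(average_utilization_by_agent(sample))  # {'a1': 75.0, 'a2': 25.0}
```

Note that buckets in which an agent has no record are not counted, so this average describes only the buckets where the agent had activity.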
Complete Working Example
Combine the authentication, query, and processing logic into a single runnable script.
import os
import sys
import time
import requests
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv

# Load environment variables
load_dotenv()


class GenesysAuth:
    def __init__(self):
        self.region = os.getenv("GENESYS_REGION", "us-east-1")
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
        if not self.client_id or not self.client_secret:
            raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in .env")

        if self.region == "us-east-1":
            domain = "mypurecloud.com"
        elif self.region == "eu-west-1":
            domain = "mypurecloud.ie"
        else:
            # Assumes GENESYS_REGION holds the short code used in pure.cloud hostnames
            domain = f"{self.region}.pure.cloud"

        # API calls go to the api host; tokens are issued by the login host
        self.base_url = f"https://api.{domain}"
        self.login_url = f"https://login.{domain}"
        self.access_token = None
        self.token_expiry = 0

    def get_access_token(self) -> str:
        # Reuse the cached token unless it expires within 60 seconds
        if self.access_token and time.time() < (self.token_expiry - 60):
            return self.access_token

        token_url = f"{self.login_url}/oauth/token"
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "analytics:conversations:view analytics:agents:view"
        }
        try:
            response = requests.post(token_url, data=data, timeout=30)
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            self.token_expiry = time.time() + token_data["expires_in"]
            return self.access_token
        except requests.exceptions.HTTPError as e:
            raise Exception(f"OAuth Error: {e}") from e

    def get_headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_access_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }


def build_query_body() -> dict:
    now = datetime.now(timezone.utc)
    yesterday = now - timedelta(hours=24)
    return {
        "dateFrom": yesterday.isoformat(),
        "dateTo": now.isoformat(),
        "timeBucket": "PT30M",
        "groupBy": ["agent"],
        "view": "default",
        "select": ["agent.name", "agent.id", "tHandle", "tAcw", "tHold", "interactions"],
        "where": [{"path": "conversationType", "operator": "eq", "value": "voice"}],
        "size": 1000
    }


def fetch_utilization_data(auth: GenesysAuth) -> list:
    endpoint = f"{auth.base_url}/api/v2/analytics/conversations/details/query"
    body = build_query_body()
    all_results = []
    page_token = None
    max_retries = 3
    base_delay = 2

    while True:
        if page_token:
            body["nextPage"] = page_token

        for attempt in range(1, max_retries + 1):
            # Rebuild headers on every attempt so an expired token is refreshed
            response = requests.post(endpoint, headers=auth.get_headers(), json=body, timeout=30)
            if response.ok:
                break
            # Authentication/authorization failures will not succeed on retry
            if response.status_code in (401, 403):
                response.raise_for_status()
            # Give up once the retry budget is spent instead of parsing an error body
            if attempt == max_retries:
                response.raise_for_status()
            if response.status_code == 429:
                delay = int(response.headers.get("Retry-After", base_delay * (2 ** attempt)))
            else:
                delay = base_delay * (2 ** attempt)
            time.sleep(delay)

        data = response.json()
        if data.get("groups"):
            all_results.extend(data["groups"])

        page_token = data.get("nextPage")
        if not page_token:
            break
        time.sleep(1)

    return all_results


def calculate_utilization_metrics(results: list) -> list:
    bucket_duration_seconds = 30 * 60
    processed_metrics = []

    for group in results:
        agent_id = group.get("agent", {}).get("id")
        agent_name = group.get("agent", {}).get("name", "Unknown")
        metrics = group.get("metrics", {})

        t_handle = metrics.get("tHandle", 0)
        t_acw = metrics.get("tAcw", 0)
        t_hold = metrics.get("tHold", 0)
        interactions = metrics.get("interactions", 0)

        total_active_seconds = t_handle + t_acw + t_hold
        utilization_pct = (total_active_seconds / bucket_duration_seconds) * 100

        processed_metrics.append({
            "agent_id": agent_id,
            "agent_name": agent_name,
            "t_handle_sec": t_handle,
            "t_acw_sec": t_acw,
            "t_hold_sec": t_hold,
            "interactions": interactions,
            "utilization_pct": round(utilization_pct, 2)
        })

    return processed_metrics


def main():
    try:
        auth = GenesysAuth()
        print("Fetching utilization data...")
        raw_data = fetch_utilization_data(auth)
        print(f"Retrieved {len(raw_data)} data points.")

        metrics = calculate_utilization_metrics(raw_data)

        # Sort by utilization descending to find busiest agents
        metrics.sort(key=lambda x: x["utilization_pct"], reverse=True)

        print("\nTop 5 Agent Utilization Buckets:")
        print("-" * 60)
        for m in metrics[:5]:
            print(f"Agent: {m['agent_name']}")
            print(f"  Handle: {m['t_handle_sec']}s | ACW: {m['t_acw_sec']}s | Hold: {m['t_hold_sec']}s")
            print(f"  Interactions: {m['interactions']}")
            print(f"  Utilization: {m['utilization_pct']}%")
            print("-" * 60)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 403 Forbidden
Cause: The OAuth client does not have the analytics:conversations:view scope.
Fix:
- Go to Admin > Security > OAuth 2.0 Clients.
- Select your client.
- Navigate to the “Scopes” tab.
- Ensure analytics:conversations:view is checked.
- Regenerate the access token.
Error: 429 Too Many Requests
Cause: The analytics engine is under heavy load, or your script is sending requests faster than the rate limit that applies to your organization.
Fix:
- Implement the retry logic shown in fetch_utilization_data.
- Check the Retry-After header in the response.
- Increase the time.sleep() duration between pages.
Error: Empty Groups or Zero Metrics
Cause: The time range (dateFrom to dateTo) contains no voice conversations, or the where filter is too restrictive.
Fix:
- Verify that conversationType is set correctly. If you are querying chat, change "value": "voice" to "value": "chat".
- Widen the time range to ensure data exists.
- Check if the agents in the groupBy are currently active. Historical data for inactive agents may be sparse.
Error: tHandle is Null or Missing
Cause: The select array in the query body did not include tHandle.
Fix: Ensure the select array in build_query_body explicitly lists the metric names you require. The default view does not guarantee all metric fields are present in the JSON if they are not explicitly selected or if the data type does not support them (e.g., tAcw is not applicable for all chat interactions depending on configuration).
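Because the processing code defaults missing metrics to 0, an absent field is indistinguishable from a genuine zero in the output. A small check run before processing can surface the difference. This is a minimal sketch on a hand-written sample group; missing_metrics is a hypothetical helper, and the group shape mirrors the one assumed elsewhere in this tutorial.

```python
def missing_metrics(group: dict, required=("tHandle", "tAcw", "tHold")) -> list:
    """Return the required metric names that are absent or null in a group."""
    metrics = group.get("metrics") or {}
    return [name for name in required if metrics.get(name) is None]


sample_group = {"agent": {"id": "a1"}, "metrics": {"tHandle": 420, "tHold": None}}
print(missing_metrics(sample_group))  # ['tAcw', 'tHold']
```

Logging the result for each group makes it clear whether a zero in the report reflects real data or a field the response never contained.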