Query Agent Utilization Metrics by 30-Minute Intervals via Genesys Cloud API
What You Will Build
- A Python script that retrieves agent handle time (tHandle), after-call work time (tAcw), and hold time (tHold) aggregated into 30-minute intervals for a specific date range.
- This tutorial uses the Genesys Cloud Platform API analytics/conversations/details/query endpoint.
- The code is written in Python 3.9+ using the official genesys-cloud-sdk and the requests library for granular control over the request payload.
Prerequisites
OAuth Configuration
- Client Type: Confidential Client (Client Credentials Grant) or Public Client (Authorization Code Grant with PKCE). For server-side scripts, Confidential Client is recommended.
- Required Scopes:
  - analytics:conversation:read (required for querying conversation details and metrics)
  - user:read (optional, if you need to resolve user IDs to names)
Environment Setup
- Python Version: 3.9 or higher.
- Dependencies:
  - genesys-cloud-sdk: The official SDK.
  - requests: For raw HTTP calls if SDK limitations arise.
  - pandas: For easy data manipulation of the results (optional but recommended).
Install dependencies via pip:
pip install genesys-cloud-sdk pandas requests
Authentication Setup
Genesys Cloud uses OAuth 2.0. For a background script, the Client Credentials flow is the most robust method. You will need the client ID and client secret of an OAuth client, which you create in the Genesys Cloud Admin Console under Integrations > OAuth.
The SDK handles token acquisition and refresh automatically when initialized correctly.
import os
from platformclientv2 import Configuration, ApiClient

# Load credentials from environment variables for security
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")


def get_auth_client():
    """
    Initializes the Genesys Cloud API Client with OAuth credentials.
    """
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment variables.")

    # Create the configuration object
    config = Configuration(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        environment=f"https://{ENVIRONMENT}"
    )

    # Initialize the API client
    api_client = ApiClient(configuration=config)
    return api_client
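For readers who want to see what the Client Credentials grant looks like on the wire, the following is a minimal standard-library sketch that builds (but does not send) the token request. It assumes the token endpoint lives at https://login.<environment>/oauth/token, which you should verify for your region; build_token_request is an illustrative helper name and not part of the SDK.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(client_id: str, client_secret: str,
                        environment: str = "mypurecloud.com") -> urllib.request.Request:
    """Build (but do not send) an OAuth 2.0 client-credentials token request."""
    # Assumption: the login host is "login." followed by the region domain.
    url = f"https://login.{environment}/oauth/token"

    # HTTP Basic authentication: base64("client_id:client_secret")
    basic = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()

    # The grant type is sent as a form-encoded body.
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode()

    return urllib.request.Request(
        url,
        data=body,
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        method="POST",
    )


# Placeholder credentials, for illustration only.
request = build_token_request("my-client-id", "my-client-secret")
print(request.get_method(), request.full_url)
```

Sending the request (for example with urllib.request.urlopen) should return a JSON document containing an access token, which is then passed as a Bearer token on API calls.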
Implementation
Step 1: Define the Query Payload
The analytics/conversations/details/query endpoint accepts a JSON body that defines the scope of the query. To get utilization metrics by 30-minute intervals, you must configure the interval field and select the correct metrics.
Key parameters:
- interval: "PT30M" (ISO 8601 duration for 30 minutes).
- groupBy: ["time"] (Aggregates data by the time interval).
- metrics: ["tHandle", "tAcw", "tHold"] (The specific utilization metrics).
- view: "agent" (Ensures data is scoped to individual agent performance).
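To make the PT30M bucketing concrete, here is a small sketch that enumerates the 30-minute bucket start times a query over one day would be expected to cover. The helper name interval_starts is illustrative, and the half-open range [start, end) is an assumption about how the buckets line up.

```python
from datetime import datetime, timedelta, timezone


def interval_starts(start: datetime, end: datetime,
                    step: timedelta = timedelta(minutes=30)) -> list:
    """Return the start time of every step-sized bucket in [start, end)."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    starts = []
    current = start
    while current < end:
        starts.append(current)
        current += step
    return starts


day_start = datetime(2023, 10, 1, tzinfo=timezone.utc)
day_end = datetime(2023, 10, 2, tzinfo=timezone.utc)
buckets = interval_starts(day_start, day_end)

print(len(buckets))            # 48 buckets in a 24-hour day
print(buckets[1].isoformat())  # 2023-10-01T00:30:00+00:00
```

A list like this is also handy later for spotting intervals in which an agent has no row at all.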
from typing import Optional


def build_query_payload(start_date_str: str, end_date_str: str, user_ids: Optional[list] = None) -> dict:
    """
    Constructs the JSON payload for the analytics query.

    Args:
        start_date_str: ISO 8601 start date (e.g., "2023-10-01T00:00:00Z")
        end_date_str: ISO 8601 end date (e.g., "2023-10-02T00:00:00Z")
        user_ids: Optional list of user IDs to filter. If None, returns all agents.

    Returns:
        dict: The query payload.
    """
    payload = {
        "interval": "PT30M",
        "groupBy": ["time"],
        "metrics": {
            "tHandle": {},
            "tAcw": {},
            "tHold": {}
        },
        "view": "agent",
        "dateFrom": start_date_str,
        "dateTo": end_date_str,
        "select": ["id", "name"]  # Select agent identifiers
    }

    if user_ids:
        # Optional: restrict the query to specific users
        payload["filter"] = {
            "type": "user",
            "ids": user_ids
        }
    else:
        # No user filter: group by user as well as time so that each row is
        # one agent in one 30-minute interval. For the 'agent' view, grouping
        # by ['user', 'time'] is usually supported, but the API often
        # restricts more complex groupings.
        payload["groupBy"] = ["user", "time"]

    return payload
Step 2: Execute the Query with Pagination and Retry Logic
The analytics API is prone to rate limiting (429) and often requires pagination for large datasets. The SDK provides post_analytics_conversations_details_query; to keep control over retries and pagination, we wrap that SDK call in our own loop.
We implement a custom retry mechanism for 429 errors and detect further pages through the response's next-page link. The example stops after the first page, so production code must extend the loop to fetch the remaining pages.
import time
import logging
from platformclientv2 import AnalyticsApi

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def fetch_utilization_data(api_client, payload: dict, max_retries: int = 5):
    """
    Fetches utilization data from Genesys Cloud with retry logic and pagination detection.

    Args:
        api_client: The initialized ApiClient.
        payload: The query payload dictionary.
        max_retries: Number of retries on 429 error.

    Returns:
        list: A list of all response entities.
    """
    analytics_api = AnalyticsApi(api_client)
    all_entities = []
    retry_count = 0

    while True:
        try:
            # Note: The SDK method might vary slightly based on version.
            # Here we use the standard post method.
            response = analytics_api.post_analytics_conversations_details_query(body=payload)
        except Exception as e:
            status_code = getattr(e, 'status', None)
            if status_code != 429:
                logger.error(f"API Error: {e}")
                raise
            retry_count += 1
            if retry_count > max_retries:
                logger.error(f"Max retries exceeded for 429. Last error: {e}")
                raise
            wait_time = 2 ** retry_count  # Exponential backoff
            logger.warning(f"Rate limited (429). Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
            continue

        # Reset retry count on success
        retry_count = 0

        # Process current page
        if response.entities:
            all_entities.extend(response.entities)
            logger.info(f"Fetched {len(response.entities)} records. Total so far: {len(all_entities)}")

        # Check for pagination.
        # The SDK does not automatically follow pagination for POST queries in
        # all versions. Depending on the SDK version, the next page is requested
        # either by following 'next_page_link' or by passing a page token in the
        # body of the next request. For brevity this example stops after the
        # first page; production code MUST iterate until no further page is
        # reported.
        if getattr(response, 'next_page_link', None):
            logger.warning("Pagination detected. Only the first page was fetched.")
        else:
            logger.info("No more pages. Query complete.")
        break

    return all_entities
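The retry logic above can also be factored into a reusable helper. The sketch below is one way to do that, not the SDK's own mechanism: call_with_backoff is a hypothetical name, and it assumes the raised exception exposes a status attribute and, optionally, a headers mapping that may carry a Retry-After value in seconds. The sleep function is injectable so the helper can be exercised without real waiting.

```python
import random
import time


def _retry_after_seconds(exc):
    """Return the Retry-After value in seconds, or None if absent or unparsable."""
    headers = getattr(exc, "headers", None) or {}
    value = headers.get("Retry-After")
    try:
        return float(value) if value is not None else None
    except (TypeError, ValueError):
        return None


def call_with_backoff(func, max_retries=5, base_delay=1.0, max_delay=60.0,
                      sleep=time.sleep):
    """Call func(); on a 429-style error, wait and retry up to max_retries times."""
    attempt = 0
    while True:
        try:
            return func()
        except Exception as exc:
            # Only rate-limit errors are retried; everything else propagates.
            if getattr(exc, "status", None) != 429 or attempt >= max_retries:
                raise
            attempt += 1
            delay = _retry_after_seconds(exc)
            if delay is None:
                # Exponential backoff, capped, with up to 10% random jitter.
                delay = min(base_delay * 2 ** attempt, max_delay)
                delay += random.uniform(0, delay * 0.1)
            sleep(delay)
```

In the context of this tutorial, the helper could wrap the SDK call as call_with_backoff(lambda: analytics_api.post_analytics_conversations_details_query(body=payload)).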
Step 3: Process and Aggregate Results
The raw response from Genesys Cloud is a nested JSON structure. Each entity contains a time field, a user object (if grouped by user), and the metric values. We need to flatten this data into a usable format, such as a Pandas DataFrame, to calculate total utilization.
Utilization is typically calculated as:
$$ \text{Utilization} = \frac{\text{tHandle} + \text{tAcw} + \text{tHold}}{\text{Interval Duration}} $$
For a 30-minute interval (1800 seconds), we can calculate the percentage of time spent in these states.
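As a worked instance of the formula above, take illustrative (not measured) values of tHandle = 600 s, tAcw = 120 s, and tHold = 60 s within a single 30-minute interval:

$$ \text{Utilization} = \frac{600 + 120 + 60}{1800} = \frac{780}{1800} \approx 0.433 = 43.3\% $$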
import pandas as pd
from datetime import datetime


def process_utilization_data(entities: list):
    """
    Transforms raw Genesys Cloud API entities into a structured DataFrame.

    Args:
        entities: List of entity dicts from the API response.

    Returns:
        pd.DataFrame: Structured data with columns for Time, Agent, Metrics, and Utilization %.
    """
    data_rows = []

    for entity in entities:
        # Extract time
        time_str = entity.get('time')
        if not time_str:
            continue
        time_obj = datetime.fromisoformat(time_str.replace('Z', '+00:00'))

        # Extract user info
        user = entity.get('user', {})
        user_id = user.get('id')
        user_name = user.get('name')

        # Extract metrics
        # The metrics are nested under 'metrics' -> 'tHandle' -> 'sum' (or 'count', 'avg')
        # For utilization, we need the SUM of seconds in that interval.
        metrics = entity.get('metrics', {})
        t_handle = metrics.get('tHandle', {}).get('sum', 0)
        t_acw = metrics.get('tAcw', {}).get('sum', 0)
        t_hold = metrics.get('tHold', {}).get('sum', 0)

        # Calculate total active time
        total_active_seconds = t_handle + t_acw + t_hold

        # Interval duration in seconds (30 minutes = 1800 seconds)
        interval_seconds = 1800

        # Calculate utilization percentage
        if interval_seconds > 0:
            utilization_pct = (total_active_seconds / interval_seconds) * 100
        else:
            utilization_pct = 0

        # Cap at 100% due to overlapping events or rounding errors
        utilization_pct = min(utilization_pct, 100.0)

        data_rows.append({
            'timestamp': time_obj,
            'user_id': user_id,
            'user_name': user_name,
            't_handle_sec': t_handle,
            't_acw_sec': t_acw,
            't_hold_sec': t_hold,
            'total_active_sec': total_active_seconds,
            'utilization_pct': utilization_pct
        })

    if not data_rows:
        return pd.DataFrame()

    df = pd.DataFrame(data_rows)

    # Sort by user and time
    df = df.sort_values(by=['user_id', 'timestamp']).reset_index(drop=True)
    return df
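Once the per-interval rows exist, a common next step is a per-agent summary. The sketch below uses plain dictionaries with the same column names as the rows built above (user_id, total_active_sec); summarize_by_agent is a hypothetical helper and the sample rows are made up for illustration. It averages only over intervals that actually have a row, so intervals with no data are not counted as idle time.

```python
from collections import defaultdict


def summarize_by_agent(rows, interval_seconds=1800):
    """Aggregate per-interval rows into one utilization summary per agent."""
    totals = defaultdict(lambda: {"active_sec": 0.0, "intervals": 0})
    for row in rows:
        bucket = totals[row["user_id"]]
        bucket["active_sec"] += row["total_active_sec"]
        bucket["intervals"] += 1

    summary = {}
    for user_id, total in totals.items():
        # Time available is the number of reported intervals times their length.
        available_sec = total["intervals"] * interval_seconds
        summary[user_id] = {
            "intervals": total["intervals"],
            "active_sec": total["active_sec"],
            "avg_utilization_pct": round(total["active_sec"] / available_sec * 100, 2),
        }
    return summary


# Made-up sample rows for illustration.
sample_rows = [
    {"user_id": "agent-a", "total_active_sec": 900},
    {"user_id": "agent-a", "total_active_sec": 450},
    {"user_id": "agent-b", "total_active_sec": 1800},
]
for user_id, stats in summarize_by_agent(sample_rows).items():
    print(user_id, stats["avg_utilization_pct"])
```

With a DataFrame from the processing step, the rows can be obtained via df.to_dict("records").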
Complete Working Example
This script combines all steps into a single executable module. It authenticates, builds the query, fetches the data, processes it, and outputs a summary CSV.
import os
import sys
import time
import logging
import pandas as pd
from platformclientv2 import Configuration, ApiClient, AnalyticsApi
from datetime import datetime, timedelta, timezone

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class GenesysUtilizationFetcher:
    def __init__(self, client_id, client_secret, environment="mypurecloud.com"):
        self.env = environment
        self.client_id = client_id
        self.client_secret = client_secret
        self.api_client = None
        self.analytics_api = None

    def initialize(self):
        """Initializes the API client."""
        try:
            config = Configuration(
                client_id=self.client_id,
                client_secret=self.client_secret,
                environment=f"https://{self.env}"
            )
            self.api_client = ApiClient(configuration=config)
            self.analytics_api = AnalyticsApi(self.api_client)
            logger.info("API Client initialized successfully.")
        except Exception as e:
            logger.error(f"Failed to initialize API client: {e}")
            raise

    def fetch_data(self, start_date_iso, end_date_iso, user_ids=None):
        """
        Fetches utilization data for the given date range.
        """
        payload = {
            "interval": "PT30M",
            "groupBy": ["user", "time"],
            "metrics": {
                "tHandle": {},
                "tAcw": {},
                "tHold": {}
            },
            "view": "agent",
            "dateFrom": start_date_iso,
            "dateTo": end_date_iso,
            "select": ["id", "name"]
        }
        if user_ids:
            payload["filter"] = {
                "type": "user",
                "ids": user_ids
            }

        all_entities = []
        retries = 0
        max_retries = 5

        while True:
            try:
                response = self.analytics_api.post_analytics_conversations_details_query(body=payload)
            except Exception as e:
                status = getattr(e, 'status', None)
                if status != 429:
                    raise
                retries += 1
                if retries > max_retries:
                    raise Exception("Max retries exceeded due to rate limiting.") from e
                wait = 2 ** retries
                logger.warning(f"Rate limited (429). Waiting {wait}s...")
                time.sleep(wait)
                continue

            if response.entities:
                all_entities.extend(response.entities)
                logger.info(f"Retrieved {len(response.entities)} entities. Total: {len(all_entities)}")

            # Handle Pagination
            # The SDK response object may contain 'next_page_link'. For POST
            # queries the SDK may not follow it automatically, so this tutorial
            # assumes a single-page response for brevity. If you encounter large
            # datasets, implement the request for the next page here.
            if getattr(response, 'next_page_link', None):
                logger.info("Pagination link found. Implementing full pagination is recommended for large datasets.")
            break

        return all_entities

    def process_and_save(self, entities, output_csv="utilization_report.csv"):
        """Processes entities and saves to CSV. Always returns a DataFrame."""
        df = self._process_entities(entities)
        if df.empty:
            logger.warning("No data retrieved.")
            return df  # Return the empty DataFrame so callers can check df.empty
        df.to_csv(output_csv, index=False)
        logger.info(f"Data saved to {output_csv}")
        return df

    @staticmethod
    def _process_entities(entities):
        """Helper to convert entities to DataFrame."""
        rows = []
        for entity in entities:
            time_str = entity.get('time')
            if not time_str:
                continue
            user = entity.get('user', {})
            metrics = entity.get('metrics', {})
            t_handle = metrics.get('tHandle', {}).get('sum', 0)
            t_acw = metrics.get('tAcw', {}).get('sum', 0)
            t_hold = metrics.get('tHold', {}).get('sum', 0)
            total_active = t_handle + t_acw + t_hold
            # 1800 seconds = one 30-minute interval; cap at 100%
            utilization = min((total_active / 1800) * 100, 100.0)
            rows.append({
                'timestamp': time_str,
                'user_id': user.get('id'),
                'user_name': user.get('name'),
                't_handle_sec': t_handle,
                't_acw_sec': t_acw,
                't_hold_sec': t_hold,
                'total_active_sec': total_active,
                'utilization_pct': utilization
            })
        return pd.DataFrame(rows)


if __name__ == "__main__":
    # Configuration
    CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
    CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
    if not CLIENT_ID or not CLIENT_SECRET:
        print("Error: Set GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables.")
        sys.exit(1)

    # Date Range: Yesterday 00:00 to Yesterday 23:59 (UTC)
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    start_date = yesterday.replace(hour=0, minute=0, second=0, microsecond=0).strftime("%Y-%m-%dT%H:%M:%SZ")
    end_date = yesterday.replace(hour=23, minute=59, second=59, microsecond=0).strftime("%Y-%m-%dT%H:%M:%SZ")
    print(f"Fetching data from {start_date} to {end_date}")

    try:
        fetcher = GenesysUtilizationFetcher(CLIENT_ID, CLIENT_SECRET)
        fetcher.initialize()

        # Fetch data (No user filter = all agents)
        entities = fetcher.fetch_data(start_date, end_date)

        # Process and save
        df = fetcher.process_and_save(entities, "agent_utilization_30m.csv")
        if not df.empty:
            print("\nSample Data:")
            print(df.head())
            print(f"\nTotal records: {len(df)}")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)
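The complete example stops after the first page. A generic way to finish the job is a page-numbered loop, sketched below. fetch_all_pages and its fetch_page callable are hypothetical names, and the sketch assumes the query accepts a page number and page size and signals the end with a short or empty page. Check the API reference for the paging fields that your endpoint and SDK version actually use.

```python
from typing import Callable, List


def fetch_all_pages(fetch_page: Callable[[int, int], list],
                    page_size: int = 100, max_pages: int = 1000) -> List:
    """Collect records page by page until a short or empty page is returned.

    fetch_page(page_number, page_size) is a hypothetical callable that returns
    the list of records for one page (page numbers start at 1).
    """
    records = []
    for page_number in range(1, max_pages + 1):
        page = fetch_page(page_number, page_size) or []
        records.extend(page)
        # A page shorter than page_size means there is nothing left to fetch.
        if len(page) < page_size:
            break
    return records


# Demonstration with an in-memory stand-in for the API.
fake_records = list(range(250))


def fake_fetch(page_number, page_size):
    start = (page_number - 1) * page_size
    return fake_records[start:start + page_size]


print(len(fetch_all_pages(fake_fetch)))  # 250
```

In the fetcher class, fetch_page would copy the payload, add the paging values for the requested page, call the SDK method, and return that page's records. The max_pages guard keeps a misbehaving endpoint from looping forever.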
Common Errors & Debugging
Error: 403 Forbidden
- Cause: The OAuth token lacks the analytics:conversation:read scope.
- Fix: Verify your OAuth client in the Genesys Cloud Admin Console. Ensure the scope is added and a new token is generated (or the SDK refreshes it).
- Code Check: In Configuration, ensure you are using the correct client_id associated with the scoped application.
Error: 429 Too Many Requests
- Cause: Genesys Cloud enforces strict rate limits on the Analytics API, especially for complex queries with small intervals like PT30M.
- Fix: Implement exponential backoff. The provided code includes a retry loop with time.sleep(2 ** retry_count). If the issue persists, increase the query interval to PT1H or reduce the date range.
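Reducing the date range can be automated by splitting one long range into smaller windows and querying each in turn. The following is a minimal sketch; split_date_range and to_iso_z are illustrative helper names, and the 6-hour window size is an arbitrary choice rather than a documented limit.

```python
from datetime import datetime, timedelta, timezone


def split_date_range(start: datetime, end: datetime,
                     chunk: timedelta = timedelta(hours=6)) -> list:
    """Split [start, end) into consecutive (window_start, window_end) tuples."""
    if chunk <= timedelta(0):
        raise ValueError("chunk must be positive")
    windows = []
    current = start
    while current < end:
        window_end = min(current + chunk, end)
        windows.append((current, window_end))
        current = window_end
    return windows


def to_iso_z(moment: datetime) -> str:
    """Format a timezone-aware datetime as an ISO 8601 UTC string with a Z suffix."""
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


start = datetime(2023, 10, 1, tzinfo=timezone.utc)
end = datetime(2023, 10, 2, tzinfo=timezone.utc)
for window_start, window_end in split_date_range(start, end):
    print(to_iso_z(window_start), "->", to_iso_z(window_end))
```

Each window's pair of strings can be passed as the start and end dates of a separate query, with the results concatenated afterwards. Keeping the chunk a multiple of 30 minutes avoids splitting an interval across two queries.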
Error: Empty Entities List
- Cause:
  - No conversations occurred in the specified date range.
  - The dateFrom and dateTo are in the future.
  - The view or groupBy combination is invalid for the selected metrics.
- Fix:
  - Verify dates are in ISO 8601 format with a "Z" suffix.
  - Ensure tHandle, tAcw, and tHold are available for the selected view. The agent view is standard for these metrics.
  - Check if groupBy: ["user", "time"] is supported. If not, try groupBy: ["time"] and aggregate user data manually from the user field in each entity.
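The date checks in this list can be run before sending a query. Below is a small sketch of such a pre-flight check; parse_utc and check_query_range are hypothetical helper names, and the accepted format is the second-precision form used in this tutorial's examples (for instance 2023-10-01T00:00:00Z).

```python
from datetime import datetime, timezone
from typing import List, Optional


def parse_utc(value: str) -> datetime:
    """Parse a timestamp such as '2023-10-01T00:00:00Z' into an aware datetime."""
    # strptime raises ValueError if the string does not match, including a missing 'Z'.
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)


def check_query_range(date_from: str, date_to: str,
                      now: Optional[datetime] = None) -> List[str]:
    """Return a list of problems found; an empty list means the range looks sane."""
    now = now or datetime.now(timezone.utc)
    try:
        start, end = parse_utc(date_from), parse_utc(date_to)
    except ValueError as exc:
        return [f"invalid timestamp: {exc}"]

    problems = []
    if start >= end:
        problems.append("dateFrom must be earlier than dateTo")
    if start > now:
        problems.append("dateFrom is in the future")
    return problems


print(check_query_range("2023-10-01T00:00:00Z", "2023-10-02T00:00:00Z"))  # []
```

An empty result does not guarantee data, only that the dates themselves are not the reason for an empty response.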
Error: Metric Values are Zero
- Cause: The agent was not logged in or did not have any conversation states during the interval.
- Fix: This is expected behavior. Utilization is only calculated for intervals where the agent was active. Filter out rows where total_active_sec is 0 if you only want active periods.