Query Agent Utilization Metrics (tHandle, tAcw, tHold) in 30-Minute Intervals
What You Will Build
- A Python script that retrieves granular agent performance metrics, specifically
tHandle,tAcw, andtHold, segmented by 30-minute time intervals. - This tutorial utilizes the Genesys Cloud CX Analytics API (
/api/v2/analytics/conversations/details/query) to fetch raw conversation detail data. - The implementation covers Python using the official
genesys-cloud-purecloud-platform-clientSDK.
Prerequisites
- OAuth Client Type: Service Account with appropriate permissions.
- Required Scopes:
analytics:conversation:view(Required for querying conversation details)analytics:dashboard:view(Optional, if validating against dashboard data)
- SDK Version:
genesys-cloud-purecloud-platform-clientv5.0.0 or later. - Language/Runtime: Python 3.8+
- External Dependencies:
genesys-cloud-purecloud-platform-clientpandas(for data manipulation and interval bucketing)pytz(for timezone handling)
Authentication Setup
Genesys Cloud uses OAuth 2.0 for authentication. For server-side integrations, a Service Account with Client Credentials flow is the standard approach. The SDK handles token acquisition and refresh automatically when initialized correctly.
from purecloudplatformclientv2 import (
ApiClient,
Configuration,
PureCloudAuthFlow,
AnalyticsApi
)
def get_analytics_client(client_id: str, client_secret: str, env_name: str = "mypurecloud.com") -> AnalyticsApi:
"""
Initializes the Genesys Cloud Analytics API client.
Args:
client_id: OAuth Client ID
client_secret: OAuth Client Secret
env_name: Genesys Cloud environment domain (default: mypurecloud.com)
Returns:
Configured AnalyticsApi instance
"""
configuration = Configuration()
configuration.host = f"https://api.{env_name}"
# Configure OAuth credentials
configuration.client_id = client_id
configuration.client_secret = client_secret
# Use the PureCloudAuthFlow for automatic token management
auth_flow = PureCloudAuthFlow(configuration)
auth_flow.get_access_token()
# Create the API client
api_client = ApiClient(configuration)
return AnalyticsApi(api_client)
Implementation
Step 1: Constructing the Analytics Query
The Analytics API does not return pre-bucketed 30-minute intervals for agent-specific metrics like tHandle or tAcw in a single summary call. Instead, you must query conversationDetails to get the raw timestamps and metric values for each interaction, then aggregate them client-side.
We define a ConversationDetailsQuery body. Key parameters include:
interval: Set toPT30M(ISO 8601 duration) to request data in 30-minute chunks. Note: While the API supports this for summary reports, forconversationDetails, theintervalparameter dictates the pagination window size, not the grouping of metrics. We will use a large date range and process the results.groupBy: We do not group by time here; we fetch individual conversation records.select: We select specific metrics to reduce payload size:tHandle,tAcw,tHold,startTime,agentId.
from purecloudplatformclientv2 import ConversationDetailsQuery
def build_query_body(start_date: str, end_date: str, agent_ids: list = None) -> dict:
"""
Constructs the request body for the analytics query.
Args:
start_date: ISO 8601 start timestamp (e.g., "2023-10-01T00:00:00Z")
end_date: ISO 8601 end timestamp (e.g., "2023-10-01T23:59:59Z")
agent_ids: Optional list of Agent IDs to filter. If None, fetches all.
Returns:
Dictionary representing the ConversationDetailsQuery body
"""
query_body = ConversationDetailsQuery(
interval="PT30M", # Suggests a 30-minute window for pagination chunks
date_from=start_date,
date_to=end_date
)
# Define the metrics to retrieve
query_body.select = [
"conversationId",
"startTime",
"endTime",
"tHandle",
"tAcw",
"tHold",
"agentId"
]
# Optional: Filter by specific agents
if agent_ids:
query_body.filter = {
"agentId": {"in": agent_ids}
}
return query_body
Step 2: Executing the Query and Handling Pagination
The Analytics API returns paginated results. For high-volume environments, a single day of data for a large team may exceed the default page size (typically 1000 records). We must implement a loop to consume all pages until nextPageUri is null.
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def fetch_all_conversation_details(api_client: AnalyticsApi, query_body: dict) -> list:
"""
Fetches all conversation details pages for the given query.
Args:
api_client: The AnalyticsApi instance
query_body: The ConversationDetailsQuery dictionary
Returns:
List of ConversationDetail objects
"""
all_conversations = []
next_page_uri = None
while True:
try:
# The API call
response = api_client.post_analytics_conversations_details_query(
body=query_body,
next_page_uri=next_page_uri
)
# Append results
if response.entities:
all_conversations.extend(response.entities)
logger.info(f"Fetched {len(response.entities)} conversations. Total: {len(all_conversations)}")
# Check for next page
next_page_uri = response.next_page_uri
if not next_page_uri:
break
except Exception as e:
logger.error(f"Error fetching analytics data: {e}")
raise e
return all_conversations
Step 3: Processing Results and Bucketing into 30-Minute Intervals
The raw API response provides startTime for each conversation. To calculate utilization metrics by 30-minute intervals, we must:
- Parse the
startTimeof each conversation. - Determine which 30-minute bucket the
startTimefalls into. - Sum
tHandle,tAcw, andtHoldfor that bucket.
Note: tHandle (Total Handle Time) includes talk time, hold time, and wrap-up time in many contexts, but Genesys defines it specifically as Talk + Hold + Wrap (ACW). Therefore, tHandle = tTalk + tHold + tAcw. To avoid double-counting, we will sum tHold and tAcw separately as requested, and use tHandle as the total engagement metric.
import pandas as pd
from datetime import datetime, timedelta
def bucket_metrics_by_30_min(conversations: list) -> pd.DataFrame:
"""
Aggregates conversation metrics into 30-minute time buckets.
Args:
conversations: List of ConversationDetail objects from the API
Returns:
Pandas DataFrame with columns: interval_start, total_tHandle, total_tAcw, total_tHold, conversation_count
"""
if not conversations:
return pd.DataFrame()
# Convert to DataFrame for easier manipulation
data = []
for conv in conversations:
# Skip conversations without agent or start time
if not conv.start_time or not conv.agent_id:
continue
# Extract metrics (handle potential None values)
t_handle = conv.t_handle or 0
t_acw = conv.t_acw or 0
t_hold = conv.t_hold or 0
# Parse start time
start_dt = conv.start_time
# Calculate the 30-minute bucket start
# Floor the minute to the nearest 30 (0 or 30)
bucket_minute = 0 if start_dt.minute < 30 else 30
bucket_start = start_dt.replace(minute=bucket_minute, second=0, microsecond=0)
data.append({
"interval_start": bucket_start,
"tHandle": t_handle,
"tAcw": t_acw,
"tHold": t_hold,
"conversation_id": conv.conversation_id
})
if not data:
return pd.DataFrame()
df = pd.DataFrame(data)
# Group by interval_start and sum the metrics
aggregated = df.groupby("interval_start").agg(
total_tHandle=("tHandle", "sum"),
total_tAcw=("tAcw", "sum"),
total_tHold=("tHold", "sum"),
conversation_count=("conversation_id", "count")
).reset_index()
# Sort by time
aggregated = aggregated.sort_values("interval_start")
return aggregated
Complete Working Example
This script combines authentication, querying, and processing into a single runnable module. Replace the placeholder credentials with your Service Account details.
import os
import sys
import logging
from datetime import datetime, timedelta
from purecloudplatformclientv2 import (
ApiClient,
Configuration,
PureCloudAuthFlow,
AnalyticsApi,
ConversationDetailsQuery
)
import pandas as pd
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def main():
# 1. Configuration
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
ENV_NAME = os.getenv("GENESYS_ENV", "mypurecloud.com")
if not CLIENT_ID or not CLIENT_SECRET:
logger.error("Missing GENESYS_CLIENT_ID or GENESYS_CLIENT_SECRET environment variables.")
sys.exit(1)
# 2. Initialize Client
try:
configuration = Configuration()
configuration.host = f"https://api.{ENV_NAME}"
configuration.client_id = CLIENT_ID
configuration.client_secret = CLIENT_SECRET
auth_flow = PureCloudAuthFlow(configuration)
auth_flow.get_access_token()
api_client = AnalyticsApi(ApiClient(configuration))
logger.info("Successfully authenticated with Genesys Cloud.")
except Exception as e:
logger.error(f"Authentication failed: {e}")
sys.exit(1)
# 3. Define Query Parameters
# Query data for the last 24 hours
end_date = datetime.utcnow()
start_date = end_date - timedelta(hours=24)
# Format as ISO 8601 with Z suffix for UTC
date_from = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
date_to = end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
logger.info(f"Querying data from {date_from} to {date_to}")
# Build the query body
query_body = ConversationDetailsQuery(
interval="PT30M",
date_from=date_from,
date_to=date_to
)
# Select specific metrics to optimize payload
query_body.select = [
"conversationId",
"startTime",
"agentId",
"tHandle",
"tAcw",
"tHold"
]
# 4. Execute Query
try:
conversations = fetch_all_conversation_details(api_client, query_body)
logger.info(f"Total conversations fetched: {len(conversations)}")
except Exception as e:
logger.error(f"Failed to fetch data: {e}")
sys.exit(1)
# 5. Process and Bucket Data
if conversations:
df_metrics = bucket_metrics_by_30_min(conversations)
if not df_metrics.empty:
logger.info("Aggregated Metrics by 30-Minute Interval:")
print(df_metrics.to_string(index=False))
# Optional: Save to CSV
output_file = "agent_utilization_30min.csv"
df_metrics.to_csv(output_file, index=False)
logger.info(f"Data exported to {output_file}")
else:
logger.warning("No data found for the specified period.")
else:
logger.warning("No conversations returned from API.")
def fetch_all_conversation_details(api_client: AnalyticsApi, query_body: dict) -> list:
"""
Fetches all conversation details pages for the given query.
"""
all_conversations = []
next_page_uri = None
while True:
try:
response = api_client.post_analytics_conversations_details_query(
body=query_body,
next_page_uri=next_page_uri
)
if response.entities:
all_conversations.extend(response.entities)
logger.info(f"Fetched {len(response.entities)} conversations. Total: {len(all_conversations)}")
next_page_uri = response.next_page_uri
if not next_page_uri:
break
except Exception as e:
logger.error(f"Error fetching analytics data: {e}")
raise e
return all_conversations
def bucket_metrics_by_30_min(conversations: list) -> pd.DataFrame:
"""
Aggregates conversation metrics into 30-minute time buckets.
"""
if not conversations:
return pd.DataFrame()
data = []
for conv in conversations:
if not conv.start_time or not conv.agent_id:
continue
t_handle = conv.t_handle or 0
t_acw = conv.t_acw or 0
t_hold = conv.t_hold or 0
start_dt = conv.start_time
# Floor to 30-minute bucket
bucket_minute = 0 if start_dt.minute < 30 else 30
bucket_start = start_dt.replace(minute=bucket_minute, second=0, microsecond=0)
data.append({
"interval_start": bucket_start,
"tHandle": t_handle,
"tAcw": t_acw,
"tHold": t_hold,
"conversation_id": conv.conversation_id
})
if not data:
return pd.DataFrame()
df = pd.DataFrame(data)
aggregated = df.groupby("interval_start").agg(
total_tHandle=("tHandle", "sum"),
total_tAcw=("tAcw", "sum"),
total_tHold=("tHold", "sum"),
conversation_count=("conversation_id", "count")
).reset_index()
return aggregated.sort_values("interval_start")
if __name__ == "__main__":
main()
Common Errors & Debugging
Error: 403 Forbidden
- What causes it: The Service Account lacks the
analytics:conversation:viewscope, or the user associated with the token does not have permission to view analytics for the queried agents. - How to fix it:
- Go to Genesys Cloud Admin > Security > OAuth Clients.
- Verify the client has
analytics:conversation:view. - Ensure the Service Account user has a role with “View Analytics” permissions.
Error: 429 Too Many Requests
- What causes it: Analytics queries are expensive. Querying large date ranges or high-volume teams can trigger rate limits.
- How to fix it: Implement exponential backoff. The SDK does not automatically retry 429s for analytics endpoints in all versions. You should catch the
PureCloudExceptionwith status 429 and wait before retrying.
import time
# Inside the fetch loop, wrap the API call:
try:
response = api_client.post_analytics_conversations_details_query(...)
except Exception as e:
if hasattr(e, 'status') and e.status == 429:
retry_after = e.headers.get('Retry-After', 5)
logger.warning(f"Rate limited. Waiting {retry_after} seconds.")
time.sleep(int(retry_after))
continue # Retry the loop iteration
else:
raise
Error: NoneType object has no attribute entities
- What causes it: The API returned an empty result or an error response that was not parsed correctly, or the
responseobject is null due to a network timeout. - How to fix it: Ensure you are checking
if response:before accessingresponse.entities. Also, verify that thedate_fromanddate_toare in the past. The Analytics API has a “freshness” delay (typically 15-30 minutes). Queryingnow()may return empty results. Always query at least 30 minutes into the past.
# Adjust end_date to be 30 minutes ago
end_date = datetime.utcnow() - timedelta(minutes=30)
Error: tHandle includes tHold and tAcw
- What causes it: Misunderstanding of metric definitions.
tHandleis the total time the agent was engaged. It is calculated astTalk + tHold + tAcw. - How to fix it: If you need to calculate “Pure Talk Time,” use
tHandle - tHold - tAcw. Do not sumtHandle,tHold, andtAcwtogether if you intend to report total time, as this will triple-count the hold and ACW periods.