Query Agent Utilization Metrics (tHandle, tAcw, tHold) in 30-Minute Intervals

Query Agent Utilization Metrics (tHandle, tAcw, tHold) in 30-Minute Intervals

What You Will Build

  • A Python script that retrieves granular agent performance metrics, specifically tHandle, tAcw, and tHold, segmented by 30-minute time intervals.
  • This tutorial utilizes the Genesys Cloud CX Analytics API (/api/v2/analytics/conversations/details/query) to fetch raw conversation detail data.
  • The implementation covers Python using the official genesys-cloud-purecloud-platform-client SDK.

Prerequisites

  • OAuth Client Type: Service Account with appropriate permissions.
  • Required Scopes:
    • analytics:conversation:view (Required for querying conversation details)
    • analytics:dashboard:view (Optional, if validating against dashboard data)
  • SDK Version: genesys-cloud-purecloud-platform-client v5.0.0 or later.
  • Language/Runtime: Python 3.8+
  • External Dependencies:
    • genesys-cloud-purecloud-platform-client
    • pandas (for data manipulation and interval bucketing)
    • pytz (for timezone handling)

Authentication Setup

Genesys Cloud uses OAuth 2.0 for authentication. For server-side integrations, a Service Account with Client Credentials flow is the standard approach. The SDK handles token acquisition and refresh automatically when initialized correctly.

from purecloudplatformclientv2 import (
    ApiClient,
    Configuration,
    PureCloudAuthFlow,
    AnalyticsApi
)

def get_analytics_client(client_id: str, client_secret: str, env_name: str = "mypurecloud.com") -> AnalyticsApi:
    """
    Initializes the Genesys Cloud Analytics API client.
    
    Args:
        client_id: OAuth Client ID
        client_secret: OAuth Client Secret
        env_name: Genesys Cloud environment domain (default: mypurecloud.com)
        
    Returns:
        Configured AnalyticsApi instance
    """
    configuration = Configuration()
    configuration.host = f"https://api.{env_name}"
    
    # Configure OAuth credentials
    configuration.client_id = client_id
    configuration.client_secret = client_secret
    
    # Use the PureCloudAuthFlow for automatic token management
    auth_flow = PureCloudAuthFlow(configuration)
    auth_flow.get_access_token()
    
    # Create the API client
    api_client = ApiClient(configuration)
    return AnalyticsApi(api_client)

Implementation

Step 1: Constructing the Analytics Query

The Analytics API does not return pre-bucketed 30-minute intervals for agent-specific metrics like tHandle or tAcw in a single summary call. Instead, you must query conversationDetails to get the raw timestamps and metric values for each interaction, then aggregate them client-side.

We define a ConversationDetailsQuery body. Key parameters include:

  • interval: Set to PT30M (ISO 8601 duration) to request data in 30-minute chunks. Note: While the API supports this for summary reports, for conversationDetails, the interval parameter dictates the pagination window size, not the grouping of metrics. We will use a large date range and process the results.
  • groupBy: We do not group by time here; we fetch individual conversation records.
  • select: We select specific metrics to reduce payload size: tHandle, tAcw, tHold, startTime, agentId.
from purecloudplatformclientv2 import ConversationDetailsQuery

def build_query_body(start_date: str, end_date: str, agent_ids: list = None) -> dict:
    """
    Constructs the request body for the analytics query.
    
    Args:
        start_date: ISO 8601 start timestamp (e.g., "2023-10-01T00:00:00Z")
        end_date: ISO 8601 end timestamp (e.g., "2023-10-01T23:59:59Z")
        agent_ids: Optional list of Agent IDs to filter. If None, fetches all.
        
    Returns:
        Dictionary representing the ConversationDetailsQuery body
    """
    query_body = ConversationDetailsQuery(
        interval="PT30M", # Suggests a 30-minute window for pagination chunks
        date_from=start_date,
        date_to=end_date
    )
    
    # Define the metrics to retrieve
    query_body.select = [
        "conversationId",
        "startTime",
        "endTime",
        "tHandle",
        "tAcw",
        "tHold",
        "agentId"
    ]
    
    # Optional: Filter by specific agents
    if agent_ids:
        query_body.filter = {
            "agentId": {"in": agent_ids}
        }
        
    return query_body

Step 2: Executing the Query and Handling Pagination

The Analytics API returns paginated results. For high-volume environments, a single day of data for a large team may exceed the default page size (typically 1000 records). We must implement a loop to consume all pages until nextPageUri is null.

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def fetch_all_conversation_details(api_client: AnalyticsApi, query_body: dict) -> list:
    """
    Fetches all conversation details pages for the given query.
    
    Args:
        api_client: The AnalyticsApi instance
        query_body: The ConversationDetailsQuery dictionary
        
    Returns:
        List of ConversationDetail objects
    """
    all_conversations = []
    next_page_uri = None
    
    while True:
        try:
            # The API call
            response = api_client.post_analytics_conversations_details_query(
                body=query_body,
                next_page_uri=next_page_uri
            )
            
            # Append results
            if response.entities:
                all_conversations.extend(response.entities)
                logger.info(f"Fetched {len(response.entities)} conversations. Total: {len(all_conversations)}")
            
            # Check for next page
            next_page_uri = response.next_page_uri
            
            if not next_page_uri:
                break
                
        except Exception as e:
            logger.error(f"Error fetching analytics data: {e}")
            raise e
            
    return all_conversations

Step 3: Processing Results and Bucketing into 30-Minute Intervals

The raw API response provides startTime for each conversation. To calculate utilization metrics by 30-minute intervals, we must:

  1. Parse the startTime of each conversation.
  2. Determine which 30-minute bucket the startTime falls into.
  3. Sum tHandle, tAcw, and tHold for that bucket.

Note: tHandle (Total Handle Time) includes talk time, hold time, and wrap-up time in many contexts, but Genesys defines it specifically as Talk + Hold + Wrap (ACW). Therefore, tHandle = tTalk + tHold + tAcw. To avoid double-counting, we will sum tHold and tAcw separately as requested, and use tHandle as the total engagement metric.

import pandas as pd
from datetime import datetime, timedelta

def bucket_metrics_by_30_min(conversations: list) -> pd.DataFrame:
    """
    Aggregates conversation metrics into 30-minute time buckets.
    
    Args:
        conversations: List of ConversationDetail objects from the API
        
    Returns:
        Pandas DataFrame with columns: interval_start, total_tHandle, total_tAcw, total_tHold, conversation_count
    """
    if not conversations:
        return pd.DataFrame()
        
    # Convert to DataFrame for easier manipulation
    data = []
    for conv in conversations:
        # Skip conversations without agent or start time
        if not conv.start_time or not conv.agent_id:
            continue
            
        # Extract metrics (handle potential None values)
        t_handle = conv.t_handle or 0
        t_acw = conv.t_acw or 0
        t_hold = conv.t_hold or 0
        
        # Parse start time
        start_dt = conv.start_time
        
        # Calculate the 30-minute bucket start
        # Floor the minute to the nearest 30 (0 or 30)
        bucket_minute = 0 if start_dt.minute < 30 else 30
        bucket_start = start_dt.replace(minute=bucket_minute, second=0, microsecond=0)
        
        data.append({
            "interval_start": bucket_start,
            "tHandle": t_handle,
            "tAcw": t_acw,
            "tHold": t_hold,
            "conversation_id": conv.conversation_id
        })
        
    if not data:
        return pd.DataFrame()
        
    df = pd.DataFrame(data)
    
    # Group by interval_start and sum the metrics
    aggregated = df.groupby("interval_start").agg(
        total_tHandle=("tHandle", "sum"),
        total_tAcw=("tAcw", "sum"),
        total_tHold=("tHold", "sum"),
        conversation_count=("conversation_id", "count")
    ).reset_index()
    
    # Sort by time
    aggregated = aggregated.sort_values("interval_start")
    
    return aggregated

Complete Working Example

This script combines authentication, querying, and processing into a single runnable module. Replace the placeholder credentials with your Service Account details.

import os
import sys
import logging
from datetime import datetime, timedelta

from purecloudplatformclientv2 import (
    ApiClient,
    Configuration,
    PureCloudAuthFlow,
    AnalyticsApi,
    ConversationDetailsQuery
)
import pandas as pd

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    # 1. Configuration
    CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
    CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
    ENV_NAME = os.getenv("GENESYS_ENV", "mypurecloud.com")
    
    if not CLIENT_ID or not CLIENT_SECRET:
        logger.error("Missing GENESYS_CLIENT_ID or GENESYS_CLIENT_SECRET environment variables.")
        sys.exit(1)
        
    # 2. Initialize Client
    try:
        configuration = Configuration()
        configuration.host = f"https://api.{ENV_NAME}"
        configuration.client_id = CLIENT_ID
        configuration.client_secret = CLIENT_SECRET
        
        auth_flow = PureCloudAuthFlow(configuration)
        auth_flow.get_access_token()
        
        api_client = AnalyticsApi(ApiClient(configuration))
        logger.info("Successfully authenticated with Genesys Cloud.")
    except Exception as e:
        logger.error(f"Authentication failed: {e}")
        sys.exit(1)

    # 3. Define Query Parameters
    # Query data for the last 24 hours
    end_date = datetime.utcnow()
    start_date = end_date - timedelta(hours=24)
    
    # Format as ISO 8601 with Z suffix for UTC
    date_from = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
    date_to = end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
    
    logger.info(f"Querying data from {date_from} to {date_to}")
    
    # Build the query body
    query_body = ConversationDetailsQuery(
        interval="PT30M",
        date_from=date_from,
        date_to=date_to
    )
    
    # Select specific metrics to optimize payload
    query_body.select = [
        "conversationId",
        "startTime",
        "agentId",
        "tHandle",
        "tAcw",
        "tHold"
    ]
    
    # 4. Execute Query
    try:
        conversations = fetch_all_conversation_details(api_client, query_body)
        logger.info(f"Total conversations fetched: {len(conversations)}")
    except Exception as e:
        logger.error(f"Failed to fetch data: {e}")
        sys.exit(1)

    # 5. Process and Bucket Data
    if conversations:
        df_metrics = bucket_metrics_by_30_min(conversations)
        
        if not df_metrics.empty:
            logger.info("Aggregated Metrics by 30-Minute Interval:")
            print(df_metrics.to_string(index=False))
            
            # Optional: Save to CSV
            output_file = "agent_utilization_30min.csv"
            df_metrics.to_csv(output_file, index=False)
            logger.info(f"Data exported to {output_file}")
        else:
            logger.warning("No data found for the specified period.")
    else:
        logger.warning("No conversations returned from API.")

def fetch_all_conversation_details(api_client: AnalyticsApi, query_body: dict) -> list:
    """
    Fetches all conversation details pages for the given query.
    """
    all_conversations = []
    next_page_uri = None
    
    while True:
        try:
            response = api_client.post_analytics_conversations_details_query(
                body=query_body,
                next_page_uri=next_page_uri
            )
            
            if response.entities:
                all_conversations.extend(response.entities)
                logger.info(f"Fetched {len(response.entities)} conversations. Total: {len(all_conversations)}")
            
            next_page_uri = response.next_page_uri
            
            if not next_page_uri:
                break
                
        except Exception as e:
            logger.error(f"Error fetching analytics data: {e}")
            raise e
            
    return all_conversations

def bucket_metrics_by_30_min(conversations: list) -> pd.DataFrame:
    """
    Aggregates conversation metrics into 30-minute time buckets.
    """
    if not conversations:
        return pd.DataFrame()
        
    data = []
    for conv in conversations:
        if not conv.start_time or not conv.agent_id:
            continue
            
        t_handle = conv.t_handle or 0
        t_acw = conv.t_acw or 0
        t_hold = conv.t_hold or 0
        
        start_dt = conv.start_time
        
        # Floor to 30-minute bucket
        bucket_minute = 0 if start_dt.minute < 30 else 30
        bucket_start = start_dt.replace(minute=bucket_minute, second=0, microsecond=0)
        
        data.append({
            "interval_start": bucket_start,
            "tHandle": t_handle,
            "tAcw": t_acw,
            "tHold": t_hold,
            "conversation_id": conv.conversation_id
        })
        
    if not data:
        return pd.DataFrame()
        
    df = pd.DataFrame(data)
    
    aggregated = df.groupby("interval_start").agg(
        total_tHandle=("tHandle", "sum"),
        total_tAcw=("tAcw", "sum"),
        total_tHold=("tHold", "sum"),
        conversation_count=("conversation_id", "count")
    ).reset_index()
    
    return aggregated.sort_values("interval_start")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 403 Forbidden

  • What causes it: The Service Account lacks the analytics:conversation:view scope, or the user associated with the token does not have permission to view analytics for the queried agents.
  • How to fix it:
    1. Go to Genesys Cloud Admin > Security > OAuth Clients.
    2. Verify the client has analytics:conversation:view.
    3. Ensure the Service Account user has a role with “View Analytics” permissions.

Error: 429 Too Many Requests

  • What causes it: Analytics queries are expensive. Querying large date ranges or high-volume teams can trigger rate limits.
  • How to fix it: Implement exponential backoff. The SDK does not automatically retry 429s for analytics endpoints in all versions. You should catch the PureCloudException with status 429 and wait before retrying.
import time

# Inside the fetch loop, wrap the API call:
try:
    response = api_client.post_analytics_conversations_details_query(...)
except Exception as e:
    if hasattr(e, 'status') and e.status == 429:
        retry_after = e.headers.get('Retry-After', 5)
        logger.warning(f"Rate limited. Waiting {retry_after} seconds.")
        time.sleep(int(retry_after))
        continue # Retry the loop iteration
    else:
        raise

Error: NoneType object has no attribute entities

  • What causes it: The API returned an empty result or an error response that was not parsed correctly, or the response object is null due to a network timeout.
  • How to fix it: Ensure you are checking if response: before accessing response.entities. Also, verify that the date_from and date_to are in the past. The Analytics API has a “freshness” delay (typically 15-30 minutes). Querying now() may return empty results. Always query at least 30 minutes into the past.
# Adjust end_date to be 30 minutes ago
end_date = datetime.utcnow() - timedelta(minutes=30)

Error: tHandle includes tHold and tAcw

  • What causes it: Misunderstanding of metric definitions. tHandle is the total time the agent was engaged. It is calculated as tTalk + tHold + tAcw.
  • How to fix it: If you need to calculate “Pure Talk Time,” use tHandle - tHold - tAcw. Do not sum tHandle, tHold, and tAcw together if you intend to report total time, as this will triple-count the hold and ACW periods.

Official References