Querying Agent Utilization Metrics (tHandle, tAcw, tHold) by 30-Minute Intervals

What You Will Build

  • This tutorial shows how to retrieve per-agent utilization metrics (handle time, after-call work, and hold time) in 30-minute intervals.
  • The solution uses the Genesys Cloud CX Analytics Conversations API (/api/v2/analytics/conversations/details/query).
  • The implementation is in Python, using the genesys-cloud-sdk-python package.

Prerequisites

  • OAuth Client Type: Client Credentials grant (a confidential, server-to-server client).
  • Required Scopes:
    • analytics:conversation:view (Primary requirement for querying conversation data).
    • conversation:view (Optional, but often required if you need to resolve user IDs to names in post-processing).
  • SDK Version: genesys-cloud-sdk-python v10.0.0 or higher.
  • Language/Runtime: Python 3.8+.
  • External Dependencies:
    • genesys-cloud-sdk-python: The official Genesys Cloud SDK.
    • python-dotenv: For secure credential management.

Install the dependencies using pip:

pip install genesys-cloud-sdk-python python-dotenv

Authentication Setup

Genesys Cloud uses OAuth 2.0 for authentication. For server-to-server integrations, the Client Credentials Flow is the standard approach. The SDK handles token acquisition and refresh automatically when initialized correctly.

Create a .env file in your project root to store your credentials securely. Never hardcode these values.

GENESYS_CLIENT_ID=your_client_id_here
GENESYS_CLIENT_SECRET=your_client_secret_here
GENESYS_REGION=us-east-1
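
A missing or empty variable tends to surface later as an opaque authentication failure, so it can help to validate the environment before building the client. The following is a minimal sketch; the helper name read_credentials and the choice to raise RuntimeError are illustrative assumptions, not part of the SDK.

```python
import os
from typing import Mapping, Optional

REQUIRED_VARS = ("GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET")


def read_credentials(env: Optional[Mapping[str, str]] = None) -> dict:
    """Return the credential settings, raising if a required variable is missing or empty.

    `read_credentials` is a hypothetical helper for this tutorial, not an SDK function.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "client_id": env["GENESYS_CLIENT_ID"],
        "client_secret": env["GENESYS_CLIENT_SECRET"],
        # The region is optional; us-east-1 mirrors the default used in this tutorial.
        "region": env.get("GENESYS_REGION") or "us-east-1",
    }
```

Call read_credentials() after load_dotenv() so that values from the .env file are already in the process environment.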

Initialize the SDK client in your script. The PlatformClient acts as the facade for all API calls.

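import os

from dotenv import load_dotenv
from genesyscloud import configuration
from genesyscloud.platform.client import PlatformClient

# Load environment variables from .env
load_dotenv()

# Genesys Cloud API hosts are not built from the AWS region name,
# so map each region to its API host. Extend this for other regions.
API_HOSTS = {
    "us-east-1": "https://api.mypurecloud.com",
    "us-west-2": "https://api.usw2.pure.cloud",
    "eu-west-1": "https://api.mypurecloud.ie",
    "eu-central-1": "https://api.mypurecloud.de",
    "ap-southeast-2": "https://api.mypurecloud.com.au",
}

def get_platform_client():
    """
    Initializes and returns the Genesys Cloud Platform Client.
    """
    config = configuration.Configuration()
    config.client_id = os.getenv("GENESYS_CLIENT_ID")
    config.client_secret = os.getenv("GENESYS_CLIENT_SECRET")

    # Default to us-east-1 when GENESYS_REGION is unset.
    region = os.getenv("GENESYS_REGION", "us-east-1")
    if region not in API_HOSTS:
        raise ValueError(f"Unsupported GENESYS_REGION: {region}")
    config.base_path = API_HOSTS[region]

    return PlatformClient(config)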

Implementation

Step 1: Constructing the Analytics Query Body

The core of this tutorial is the query payload sent to the Analytics API. To get metrics broken down into 30-minute intervals, list interval in the groupBy array and add an interval object that sets timeUnit and intervalSize.

The critical fields for utilization are:

  • tHandle: Total handle time (talk + hold + after-call work).
  • tAcw: After-call work (wrap-up) time.
  • tHold: Hold time.

Grouping by user yields per-agent data; grouping by interval yields the time segmentation.

from genesyscloud.analytics.rest import AnalyticsApi

def build_query_body(start_time: str, end_time: str, user_ids: list = None) -> dict:
    """
    Constructs the JSON body for the analytics query.
    
    Args:
        start_time: ISO 8601 start time (e.g., '2023-10-01T00:00:00Z')
        end_time: ISO 8601 end time (e.g., '2023-10-01T23:59:59Z')
        user_ids: Optional list of user IDs to filter by. If None, queries all users.
    
    Returns:
        dict: The query payload.
    """
    
    # Define the metrics we want to aggregate
    metrics = [
        "tHandle",
        "tAcw",
        "tHold"
    ]
    
    # Define the group-by clauses
    group_by = [
        "user",
        "interval"
    ]
    
    # Configure the time interval
    # This is the key to getting 30-minute buckets
    interval_config = {
        "timeUnit": "minute",
        "intervalSize": 30
    }
    
    query_body = {
        "dateFrom": start_time,
        "dateTo": end_time,
        "metrics": metrics,
        "groupBy": group_by,
        "interval": interval_config
    }
    
    # Optional: Filter by specific agents
    if user_ids:
        query_body["filter"] = {
            "type": "and",
            "filters": [
                {
                    "type": "field",
                    "field": "userId",
                    "op": "in",
                    "values": user_ids
                }
            ]
        }
        
    return query_body

Why this structure?
The interval object inside the query body tells the analytics engine to bucket the raw conversation data into 30-minute windows starting from dateFrom. Without this, the API returns a single aggregated total for the entire dateFrom to dateTo range.
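
To see which windows to expect for a given range, the bucket boundaries can be computed locally. This is a sketch of the arithmetic only: it assumes buckets are anchored at dateFrom, as described above, and makes no API call. The function name bucket_starts is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import List

ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def bucket_starts(date_from: str, date_to: str, minutes: int = 30) -> List[str]:
    """List the start time of every bucket in the half-open range [date_from, date_to)."""
    start = datetime.strptime(date_from, ISO_FORMAT).replace(tzinfo=timezone.utc)
    end = datetime.strptime(date_to, ISO_FORMAT).replace(tzinfo=timezone.utc)
    step = timedelta(minutes=minutes)

    starts = []
    current = start
    while current < end:
        starts.append(current.strftime(ISO_FORMAT))
        current += step
    return starts


print(bucket_starts("2023-10-01T00:00:00Z", "2023-10-01T02:00:00Z"))
# ['2023-10-01T00:00:00Z', '2023-10-01T00:30:00Z', '2023-10-01T01:00:00Z', '2023-10-01T01:30:00Z']
```

A full day therefore produces 48 buckets per agent, which is a useful sanity check on the number of rows returned.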

Step 2: Executing the Query and Handling Pagination

The Analytics API returns paginated results. The nextPageUri field indicates if more data is available. You must loop through pages until nextPageUri is null.

Additionally, you must handle HTTP errors. Common errors include:

  • 401 Unauthorized: Invalid OAuth token.
  • 403 Forbidden: Missing analytics:conversation:view scope.
  • 400 Bad Request: Invalid date format or query structure.
  • 429 Too Many Requests: Rate limiting. The SDK handles basic retries, but you should implement exponential backoff for robustness.

post_analytics_conversations_details_query returns a PostAnalyticsConversationsDetailsQueryResponse holding the first page. If the dataset is large, the response also carries a nextPageUri. Fetch each later page with get_analytics_conversations_details_query_result, passing the query ID extracted from that URI, or use the SDK's paginator if your version provides one.

The handler below posts the query once and then follows nextPageUri until it is empty:

import re
import time
from genesyscloud.analytics.rest import ApiException

def fetch_all_pages(client: PlatformClient, query_body: dict) -> list:
    """
    Fetches analytics data, handling pagination and errors.

    Args:
        client: The initialized PlatformClient.
        query_body: The dictionary payload for the query.

    Returns:
        list: A list of all result objects from all pages.
    """
    analytics_api = AnalyticsApi(client)
    all_results = []

    try:
        # 1. Post the query to create the temporary result set
        current_response = analytics_api.post_analytics_conversations_details_query(body=query_body)

        while True:
            # 2. Collect entities from the current page
            if current_response.entities:
                all_results.extend(current_response.entities)

            # 3. Stop when there is no next page
            if not current_response.next_page_uri:
                break

            # 4. Extract the query result ID from next_page_uri
            # URI format: /api/v2/analytics/conversations/details/query/{id}/result
            match = re.search(
                r"/api/v2/analytics/conversations/details/query/([^/]+)/result",
                current_response.next_page_uri,
            )
            if not match:
                print(f"Could not parse next page URI: {current_response.next_page_uri}")
                break

            query_result_id = match.group(1)

            # 5. Pause briefly between pages to stay under the rate limit
            time.sleep(0.5)

            # 6. Fetch the next page using GET
            print(f"Fetching page with ID: {query_result_id}")
            current_response = analytics_api.get_analytics_conversations_details_query_result(query_result_id)

    except ApiException as e:
        print(f"API Exception: {e.status} {e.reason}")
        if e.body:
            print(f"Error Body: {e.body}")
        raise

    return all_results
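
The URI parsing inside the pagination loop is the part most likely to break silently, and it can be isolated and checked without network access. A minimal sketch, assuming the URI format given in the comment above; the helper name extract_query_id and the sample ID are hypothetical.

```python
import re
from typing import Optional

# Same path shape as the comment in the pagination handler; the ID segment
# stops at '/', '?' or '#' so that query strings are not captured.
RESULT_URI_PATTERN = re.compile(
    r"/api/v2/analytics/conversations/details/query/([^/?#]+)/result"
)


def extract_query_id(next_page_uri: Optional[str]) -> Optional[str]:
    """Return the query ID embedded in a next-page URI, or None if it cannot be found."""
    if not next_page_uri:
        return None
    match = RESULT_URI_PATTERN.search(next_page_uri)
    return match.group(1) if match else None


# "abc-123" is a made-up ID used only for illustration.
print(extract_query_id("/api/v2/analytics/conversations/details/query/abc-123/result"))
```

Because the pattern uses search rather than match, it accepts both relative paths and full URLs that contain the same path.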

Step 3: Processing and Formatting Results

The raw response contains a list of entity objects. Each entity represents a unique combination of user and interval. You can map the user IDs to names (optional but recommended) and format the metrics into a readable structure.

Note: Genesys Cloud reports time metrics in milliseconds. Convert them to minutes or hours for reporting.

from typing import Dict, List

MS_PER_MINUTE = 60_000

def process_results(results: list, user_map: Dict[str, str] = None) -> List[dict]:
    """
    Processes raw analytics entities into a clean list of utilization records.
    
    Args:
        results: List of entity objects from the API.
        user_map: Optional dictionary mapping userId to userName.
        
    Returns:
        List[dict]: Cleaned data records.
    """
    processed_data = []
    
    for entity in results:
        # Extract metrics (in milliseconds)
        t_handle_ms = entity.metrics.get("tHandle", {}).get("value", 0)
        t_acw_ms = entity.metrics.get("tAcw", {}).get("value", 0)
        t_hold_ms = entity.metrics.get("tHold", {}).get("value", 0)
        
        # Extract interval start time
        interval_start = entity.interval.get("timeFrom", "")
        
        # Extract the user ID and fall back to it when no name is known
        user_id = entity.user.get("id", "")
        user_name = user_id
        if user_map and user_id in user_map:
            user_name = user_map[user_id]
            
        # Convert milliseconds to minutes for readability
        record = {
            "user_id": user_id,
            "user_name": user_name,
            "interval_start": interval_start,
            "t_handle_minutes": round(t_handle_ms / MS_PER_MINUTE, 2),
            "t_acw_minutes": round(t_acw_ms / MS_PER_MINUTE, 2),
            "t_hold_minutes": round(t_hold_ms / MS_PER_MINUTE, 2)
        }
        
        processed_data.append(record)
        
    return processed_data

def get_user_map(client: PlatformClient, user_ids: list) -> Dict[str, str]:
    """
    Fetches user names for a list of user IDs.
    """
    users_api = client.users
    user_map = {}
    
    for user_id in user_ids:
        try:
            user = users_api.get_user(user_id, expand=["name"])
            user_map[user_id] = user.name
        except Exception:
            user_map[user_id] = user_id # Fallback to ID if fetch fails
            
    return user_map
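
Once the records are in this flat shape, per-agent totals for the whole query window are a simple roll-up. The sketch below assumes the record keys produced by process_results (user_id, t_handle_minutes, t_acw_minutes, t_hold_minutes); the function name total_by_agent is hypothetical.

```python
from collections import defaultdict
from typing import Dict, List

METRIC_KEYS = ("t_handle_minutes", "t_acw_minutes", "t_hold_minutes")


def total_by_agent(records: List[dict]) -> Dict[str, dict]:
    """Sum each metric across all intervals, keyed by user_id."""
    totals = defaultdict(lambda: {key: 0.0 for key in METRIC_KEYS})
    for record in records:
        agent_totals = totals[record["user_id"]]
        for key in METRIC_KEYS:
            # Treat a missing metric as zero rather than failing the whole roll-up.
            agent_totals[key] += record.get(key, 0.0)
    return {
        user_id: {key: round(value, 2) for key, value in metrics.items()}
        for user_id, metrics in totals.items()
    }


sample = [
    {"user_id": "u1", "t_handle_minutes": 1.5, "t_acw_minutes": 0.5, "t_hold_minutes": 0.25},
    {"user_id": "u1", "t_handle_minutes": 2.0, "t_acw_minutes": 0.25, "t_hold_minutes": 0.0},
    {"user_id": "u2", "t_handle_minutes": 3.0, "t_acw_minutes": 1.0, "t_hold_minutes": 0.5},
]
print(total_by_agent(sample))
```

Summing the already-rounded per-interval values accumulates small rounding differences; for exact totals, sum the raw values first and round once at the end.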

Complete Working Example

This script combines all steps into a single executable module. It queries the last 24 hours of data for all agents, groups them by 30-minute intervals, and prints the results.

import os
import re
import sys
import time
from datetime import datetime, timedelta, timezone

from dotenv import load_dotenv
from genesyscloud import configuration
from genesyscloud.analytics.rest import AnalyticsApi, ApiException
from genesyscloud.platform.client import PlatformClient

# Load environment variables
load_dotenv()

# Map each region to its API host; extend this for other regions
API_HOSTS = {
    "us-east-1": "https://api.mypurecloud.com",
    "us-west-2": "https://api.usw2.pure.cloud",
    "eu-west-1": "https://api.mypurecloud.ie",
}

RESULT_URI_PATTERN = r"/api/v2/analytics/conversations/details/query/([^/]+)/result"
MS_PER_MINUTE = 60_000

def init_client():
    config = configuration.Configuration()
    config.client_id = os.getenv("GENESYS_CLIENT_ID")
    config.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    
    # Raises KeyError if the region is not listed in API_HOSTS
    region = os.getenv("GENESYS_REGION", "us-east-1")
    config.base_path = API_HOSTS[region]
    
    return PlatformClient(config)

def get_query_body():
    # Define time range: Last 24 hours (timezone-aware UTC)
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=24)
    
    # Format as ISO 8601 with Z suffix for UTC
    start_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    
    query_body = {
        "dateFrom": start_str,
        "dateTo": end_str,
        "metrics": ["tHandle", "tAcw", "tHold"],
        "groupBy": ["user", "interval"],
        "interval": {
            "timeUnit": "minute",
            "intervalSize": 30
        }
    }
    return query_body

def main():
    print("Initializing Genesys Cloud Client...")
    client = init_client()
    
    print("Building Analytics Query...")
    query_body = get_query_body()
    
    print("Fetching Data (this may take a moment)...")
    analytics_api = AnalyticsApi(client)
    
    try:
        # Step 1: Post Query
        current_response = analytics_api.post_analytics_conversations_details_query(body=query_body)
        all_entities = []
        
        # Step 2: Handle Pagination
        while True:
            if current_response.entities:
                all_entities.extend(current_response.entities)
            
            if not current_response.next_page_uri:
                break
                
            # Extract ID from next_page_uri
            match = re.search(RESULT_URI_PATTERN, current_response.next_page_uri)
            if not match:
                print(f"Could not parse next page URI: {current_response.next_page_uri}")
                break
            
            query_id = match.group(1)
            print(f"Fetching next page ID: {query_id}")
            
            # Small delay to avoid rate limiting
            time.sleep(0.5)
            
            current_response = analytics_api.get_analytics_conversations_details_query_result(query_id)
            
        # Step 3: Process Data
        if not all_entities:
            print("No conversation data found for the selected time period.")
            return

        print(f"\nRetrieved {len(all_entities)} data points.")
        print("User ID | Interval Start | Handle (min) | ACW (min) | Hold (min)")
        print("-" * 70)
        
        for entity in all_entities:
            user_id = entity.user.get("id", "Unknown")
            interval_start = entity.interval.get("timeFrom", "Unknown")
            
            # Get metric values safely (reported in milliseconds)
            t_handle = entity.metrics.get("tHandle", {}).get("value", 0)
            t_acw = entity.metrics.get("tAcw", {}).get("value", 0)
            t_hold = entity.metrics.get("tHold", {}).get("value", 0)
            
            # Convert milliseconds to minutes
            h_min = round(t_handle / MS_PER_MINUTE, 2)
            a_min = round(t_acw / MS_PER_MINUTE, 2)
            ho_min = round(t_hold / MS_PER_MINUTE, 2)
            
            print(f"{user_id[:10]:<10} | {interval_start:<20} | {h_min:<12} | {a_min:<10} | {ho_min}")
            
    except ApiException as e:
        print(f"API Error: {e.status} {e.reason}")
        print(f"Response Body: {e.body}")
        sys.exit(1)
    except Exception as e:
        print(f"Unexpected Error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
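
The example starts its window at the current second, so if buckets are counted from dateFrom (as described in Step 1) they will begin at arbitrary offsets such as 14:47:12 instead of on the half hour. One option is to round the window down to a 30-minute boundary before building the query. This is a sketch under that assumption; floor_to_interval and last_24h_window are hypothetical helper names.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def floor_to_interval(moment: datetime, minutes: int = 30) -> datetime:
    """Round a datetime down to the previous multiple of `minutes` past the hour.

    Only meaningful when `minutes` divides 60 evenly (for example 5, 15, 30).
    """
    floored_minute = moment.minute - (moment.minute % minutes)
    return moment.replace(minute=floored_minute, second=0, microsecond=0)


def last_24h_window(now: Optional[datetime] = None) -> Tuple[str, str]:
    """Return (dateFrom, dateTo) strings for the 24 hours ending at the last half-hour boundary."""
    now = now or datetime.now(timezone.utc)
    end = floor_to_interval(now)
    start = end - timedelta(hours=24)
    return start.strftime(ISO_FORMAT), end.strftime(ISO_FORMAT)


print(last_24h_window())
```

The returned pair can be used for dateFrom and dateTo in get_query_body in place of the unaligned timestamps.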

Common Errors & Debugging

Error: 400 Bad Request - “Invalid interval configuration”

Cause: The intervalSize does not align with the timeUnit or the total query duration is too large for the engine to process in one go.

Fix: Ensure intervalSize is a positive integer. For 30-minute intervals, timeUnit must be minute and intervalSize must be 30. If querying a very large date range (e.g., 1 year), break it down into smaller chunks (e.g., monthly queries).

# Correct Configuration
"interval": {
    "timeUnit": "minute",
    "intervalSize": 30
}
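
For large date ranges, the chunking suggested in the fix above can be done with a small generator that yields consecutive sub-ranges, each of which becomes one query. This is an illustrative sketch; split_range is a hypothetical helper, and the 30-day default is an arbitrary choice rather than a documented API limit.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def split_range(
    start: datetime, end: datetime, max_days: int = 30
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (chunk_start, chunk_end) pairs covering [start, end)."""
    if max_days <= 0:
        raise ValueError("max_days must be positive")
    step = timedelta(days=max_days)
    chunk_start = start
    while chunk_start < end:
        # The final chunk is clipped so it never runs past the requested end.
        chunk_end = min(chunk_start + step, end)
        yield chunk_start, chunk_end
        chunk_start = chunk_end


for chunk_start, chunk_end in split_range(datetime(2023, 1, 1), datetime(2023, 3, 15)):
    print(chunk_start.isoformat(), "->", chunk_end.isoformat())
```

Each chunk ends exactly where the next begins, so no interval is queried twice and none is skipped.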

Error: 403 Forbidden - “Insufficient permissions”

Cause: The OAuth token lacks the analytics:conversation:view scope.

Fix: Check your Service Account or Client Credentials configuration in the Genesys Cloud Admin Console. Navigate to Admin > Security > OAuth Clients (or Service Accounts) and ensure the scope is checked. Regenerate the token after updating the scope.

Error: 429 Too Many Requests

Cause: You are hitting the rate limit for the Analytics API. Analytics queries are computationally expensive.

Fix: Implement exponential backoff. The provided code includes a time.sleep(0.5) between pages. For production systems, monitor the Retry-After header in 429 responses and delay accordingly.

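import time

# `response` is assumed to be a raw HTTP response object (for example, from the requests library)
if response.status_code == 429:
    # Retry-After is read as a number of seconds; default to 5 if the header is absent
    retry_after = int(response.headers.get("Retry-After", 5))
    print(f"Rate limited. Waiting {retry_after} seconds...")
    time.sleep(retry_after)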
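
The exponential backoff mentioned in the fix can be sketched as a generic retry wrapper. To stay independent of any particular SDK, the example uses a hypothetical RateLimited exception; in real code you would translate the SDK's 429 error into it (or catch the SDK exception directly). The names call_with_backoff and RateLimited, and the default delays, are assumptions for illustration.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical exception carrying the Retry-After value in seconds, if the server sent one."""

    def __init__(self, retry_after: Optional[float] = None):
        super().__init__("rate limited")
        self.retry_after = retry_after


def call_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on RateLimited with exponentially growing delays."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return func()
        except RateLimited as exc:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            # Prefer the server's hint; otherwise double the delay on each attempt.
            if exc.retry_after is not None:
                delay = exc.retry_after
            else:
                delay = base_delay * (2 ** attempt)
            # Cap the delay, then add up to 10% jitter so parallel workers do not retry in lockstep.
            delay = min(delay, max_delay) * (1 + random.uniform(0, 0.1))
            sleep(delay)
    raise RuntimeError("unreachable: the loop always returns or raises")
```

The sleep parameter exists so the wrapper can be exercised in tests without real waiting; production callers can leave it at the default.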

Error: Empty Results for Specific Users

Cause: The user did not have any conversations in the specified time range, or the user ID is incorrect.

Fix: Verify the user ID is valid. Check if the user was active in the system during the dateFrom to dateTo window. Analytics only returns data for users who participated in conversations.
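
When a user filter is in place, it helps to report exactly which requested users came back with no rows. A minimal sketch, assuming records shaped like the output of process_results (each with a user_id key); the function name users_without_data is hypothetical.

```python
from typing import Iterable, List


def users_without_data(requested_ids: Iterable[str], records: List[dict]) -> List[str]:
    """Return the requested user IDs that do not appear in any record, sorted for stable output."""
    seen = {record["user_id"] for record in records}
    return sorted(user_id for user_id in set(requested_ids) if user_id not in seen)


records = [{"user_id": "u1"}, {"user_id": "u3"}]
print(users_without_data(["u1", "u2", "u3", "u4"], records))
# ['u2', 'u4']
```

IDs in the returned list are the ones to check for typos or for inactivity during the query window.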

Official References