How to query agent utilization metrics (tHandle, tAcw, tHold) broken down by 30-minute intervals

What You Will Build

  • A script that retrieves historical conversation metrics for a specific agent or group of agents within a defined time window, bucketed into 30-minute intervals.
  • This tutorial uses the Genesys Cloud CX Analytics Conversations API (/api/v2/analytics/conversations/details/query) to fetch interaction data.
  • The implementation is in Python. The official Genesys Cloud Python SDK is published on PyPI as PureCloudPlatformClientV2, so confirm that the package and module names used below (genesys-cloud-sdk, platform_sdk) match the SDK you actually install.

Prerequisites

  • OAuth Client Type: Client Credentials grant. This is the appropriate grant for an unattended, service-to-service script such as this one.
  • Required permissions (granted through a role assigned to the OAuth client):
    • analytics:conversationDetail:view (required to query conversation details)
    • directory:user:view (optional, if you need to resolve user IDs from names)
  • SDK Version: genesys-cloud-sdk v1.0.0 or later.
  • Language/Runtime: Python 3.9+ (the examples use built-in generic annotations such as list[str]).
  • External Dependencies:
    • genesys-cloud-sdk: The official SDK.
    • pandas: For data manipulation and time-bucketing logic (optional but recommended for interval calculation).
    • python-dotenv: To manage environment variables securely.

Install dependencies:

pip install genesys-cloud-sdk pandas python-dotenv

Authentication Setup

Genesys Cloud uses OAuth 2.0. The Python SDK handles the token acquisition and refresh automatically when initialized with client credentials. You must provide your Client ID and Client Secret.

Create a .env file in your project root:

GENESYS_CLIENT_ID=your_client_id_here
GENESYS_CLIENT_SECRET=your_client_secret_here
GENESYS_REGION=us-east-1

Initialize the SDK client:

import os
from dotenv import load_dotenv
from platform_sdk.client import PlatformClient
from platform_sdk.auth import OAuthClientCredentials

# Load environment variables
load_dotenv()

def get_platform_client() -> PlatformClient:
    """
    Initializes and returns the Genesys Cloud Platform Client.
    Uses Client Credentials flow for service-to-service authentication.
    """
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    region = os.getenv("GENESYS_REGION", "us-east-1")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment.")

    # Configure the OAuth provider
    oauth_provider = OAuthClientCredentials(
        client_id=client_id,
        client_secret=client_secret,
        region=region
    )

    # Initialize the platform client
    platform_client = PlatformClient(oauth_provider)
    
    # Optional smoke test: request a user ID that cannot exist. A "not found"
    # error means the token was accepted; a 401 or 403 means authentication
    # or permissions are broken.
    try:
        platform_client.users.get_user_by_id("0")
    except Exception as e:
        if "401" in str(e) or "403" in str(e):
            raise RuntimeError(f"Authentication failed: {e}") from e
        print("Authentication succeeded (lookup of the placeholder user failed as expected).")

    return platform_client
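
For reference, the Client Credentials exchange that an SDK performs on your behalf can be sketched with the standard library alone. The function below only builds the token request and does not send it. The name build_token_request is hypothetical, and the default login host (login.mypurecloud.com) is the us-east-1 value; other regions use different hosts, so treat it as an assumption to adjust.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(client_id: str, client_secret: str,
                        login_host: str = "login.mypurecloud.com") -> urllib.request.Request:
    """Build (but do not send) an OAuth 2.0 Client Credentials token request."""
    # The client ID and secret travel as HTTP Basic credentials
    basic = base64.b64encode(f"{client_id}:{client_secret}".encode("utf-8")).decode("ascii")
    # The grant type goes in a form-encoded body
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode("ascii")
    return urllib.request.Request(
        url=f"https://{login_host}/oauth/token",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )


req = build_token_request("my-id", "my-secret")
print(req.get_method(), req.full_url)
```

Sending the request (for example with urllib.request.urlopen) returns a JSON document whose access_token value goes into the Authorization: Bearer header of later API calls.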

Implementation

Step 1: Construct the Analytics Query Payload

The core of this tutorial is the POST /api/v2/analytics/conversations/details/query endpoint. Unlike simple list endpoints, it accepts a JSON body that defines the time window and the filter criteria.

To get tHandle (total handle time), tAcw (after-call work), and tHold (hold time) per interval, the payload below requests those metrics together with a 30-minute interval.

Key Parameters:

  • view: Set to conversations.
  • interval: Set to PT30M (an ISO 8601 duration) for 30-minute buckets, so the API returns one aggregated row per bucket. If you need per-conversation timestamps to build custom buckets, work from the individual conversation records instead; for utilization metrics, server-side aggregation is more efficient.
  • filter: Use entities to narrow the query to specific agents or groups.
  • metrics: Specify tHandle, tAcw, and tHold.

Note: In the Genesys Cloud API reference, time bucketing (granularity), grouping (groupBy), and metric selection are parameters of the aggregates query (POST /api/v2/analytics/conversations/aggregates/query), while the details query returns individual conversation records. Check the payload fields against the reference for the endpoint you call.

import datetime

def build_query_body(agent_ids: list[str], start_time: datetime.datetime, end_time: datetime.datetime) -> dict:
    """
    Constructs the payload for the analytics conversation details query.

    Args:
        agent_ids: List of user IDs (strings) to filter by.
        start_time: Start of the reporting window (timezone-aware, UTC).
        end_time: End of the reporting window (timezone-aware, UTC).

    Returns:
        dict: The JSON-compatible payload for the API call.
    """
    # The payload is built as a plain dict so that it mirrors the raw JSON body
    # and does not depend on which request models a given SDK version provides.
    payload = {
        "view": "conversations",
        "interval": "PT30M", # 30-minute intervals
        "dateFrom": start_time.isoformat(),
        "dateTo": end_time.isoformat(),
        "filter": {
            "entities": [
                {
                    "ids": agent_ids,
                    "type": "users" # Filter by User (Agent) IDs
                }
            ]
        },
        "metrics": [
            "tHandle",
            "tAcw",
            "tHold",
            "count" # Total number of conversations in the bucket
        ],
        "groupings": [] # No additional groupings; we are aggregating by time interval only
    }

    return payload
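
Because the report is bucketed in 30-minute steps, it helps if the reporting window itself starts and ends on a bucket boundary; otherwise the first and last buckets cover only part of an interval. The following is a minimal sketch of that alignment, assuming timezone-aware datetimes. The helper names floor_to_bucket and ceil_to_bucket are illustrative and not part of any SDK.

```python
from datetime import datetime, timedelta, timezone

BUCKET = timedelta(minutes=30)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def floor_to_bucket(ts: datetime, bucket: timedelta = BUCKET) -> datetime:
    """Round a timezone-aware datetime down to the start of its bucket, in UTC."""
    ts = ts.astimezone(timezone.utc)
    # The remainder is how far ts sits past the previous bucket boundary
    return ts - ((ts - EPOCH) % bucket)


def ceil_to_bucket(ts: datetime, bucket: timedelta = BUCKET) -> datetime:
    """Round a timezone-aware datetime up to the next bucket boundary, in UTC."""
    floored = floor_to_bucket(ts, bucket)
    return floored if floored == ts else floored + bucket


start = floor_to_bucket(datetime(2024, 1, 15, 9, 17, 42, tzinfo=timezone.utc))
end = ceil_to_bucket(datetime(2024, 1, 15, 17, 5, tzinfo=timezone.utc))
print(start.isoformat(), end.isoformat())
```

The aligned values can then be passed to build_query_body as start_time and end_time.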

Step 2: Execute the Query and Handle Pagination

The Analytics API returns a maximum of 1000 intervals per request. If your date range is large (e.g., last 30 days with 30-minute intervals), you will exceed this limit. You must implement pagination using the nextPageUrl provided in the response.

import requests
import time

def fetch_conversation_analytics(platform_client: PlatformClient, payload: dict) -> list[dict]:
    """
    Fetches analytics data, handling pagination automatically.
    
    Args:
        platform_client: The initialized PlatformClient.
        payload: The query payload constructed in Step 1.
    
    Returns:
        list[dict]: A list of all interval buckets returned by the API.
    """
    all_intervals = []
    next_page_url = platform_client.base_url + "/api/v2/analytics/conversations/details/query"
    page_count = 0
    retries = 0
    max_retries = 5  # Consecutive failures (429 or network) tolerated per page
    max_pages = 100  # Safety limit in case the API keeps returning a next page

    while next_page_url and page_count < max_pages:
        print(f"Fetching page {page_count + 1}...")

        # Ask the client for a token on every request so a stale token is never reused
        token = platform_client.oauth_provider.get_access_token()
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

        try:
            response = requests.post(next_page_url, json=payload, headers=headers, timeout=30)
        except requests.exceptions.RequestException as e:
            retries += 1
            if retries > max_retries:
                raise
            print(f"Network error: {e}. Retrying in 5 seconds...")
            time.sleep(5)
            continue

        if response.status_code == 429:
            # Rate limited. Wait for the period the server asks for, then retry.
            retries += 1
            if retries > max_retries:
                raise Exception("Rate limited too many times in a row.")
            retry_after = int(response.headers.get("Retry-After", 5))
            print(f"Rate limited. Waiting {retry_after} seconds...")
            time.sleep(retry_after)
            continue
        elif response.status_code == 401:
            raise Exception("Authentication failed. Token may be expired.")
        elif response.status_code == 403:
            raise Exception("Forbidden. Check the permissions granted to the OAuth client.")
        elif response.status_code >= 400:
            raise Exception(f"API Error {response.status_code}: {response.text}")

        # The page succeeded: reset the retry counter and collect its intervals
        retries = 0
        page_count += 1
        data = response.json()
        all_intervals.extend(data.get("intervals") or [])
        next_page_url = data.get("nextPageUrl")

    if next_page_url:
        print("Safety break: Too many pages.")

    return all_intervals
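
An alternative to following many pages is to split a long date range into windows that each stay under the per-request limit. The sketch below takes the 1000-interval figure from the text above as its default; split_into_windows is a hypothetical helper name, and the limit should be checked against the API reference before relying on it.

```python
from datetime import datetime, timedelta, timezone


def split_into_windows(start: datetime, end: datetime,
                       bucket: timedelta = timedelta(minutes=30),
                       max_buckets: int = 1000):
    """Yield (window_start, window_end) pairs covering at most max_buckets buckets each."""
    step = bucket * max_buckets
    cursor = start
    while cursor < end:
        window_end = min(cursor + step, end)
        yield cursor, window_end
        cursor = window_end


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = start + timedelta(days=30)  # 30 days = 1440 thirty-minute buckets
windows = list(split_into_windows(start, end))
for window_start, window_end in windows:
    print(window_start.isoformat(), "->", window_end.isoformat())
```

Each window can then be passed to build_query_body and fetched separately, and the resulting interval lists concatenated.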

Step 3: Process Results and Calculate Utilization

The API returns data in buckets. Each bucket contains the sum of each metric for that 30-minute window. To calculate utilization, compare the handle time in a bucket against the total time available in it (30 minutes = 1800 seconds). Genesys Cloud reports duration metrics such as tHandle in milliseconds, so convert them to seconds before dividing.

Note on Logic:

  • tHandle is the total handle time: talk + hold + after-call work.
  • tAcw is after-call work (wrap-up) time. It is already included in tHandle.
  • tHold is hold time. It is also already included in tHandle. The Genesys Cloud metric list names the hold metric tHeld, so check the exact metric name against the API reference before querying.

Because tHandle already contains hold and after-call work time, adding tAcw or tHold to it double counts. We therefore use tHandle alone as the "busy" metric and report tHold and tAcw only as breakdowns.

For this tutorial, we will calculate Agent Occupancy for each 30-minute bucket:
$$ \text{Occupancy (\%)} = \frac{\sum tHandle \text{ (seconds)}}{\text{Bucket Duration (seconds)}} \times 100 $$

import pandas as pd

BUCKET_SECONDS = 30 * 60  # Length of one reporting interval

def process_utilization_data(intervals: list[dict]) -> pd.DataFrame:
    """
    Converts the raw API intervals into a DataFrame with calculated utilization metrics.

    This version parses the aggregate view (no groupings): each interval holds one
    set of metrics summed across all selected agents. For a per-agent breakdown,
    add "users" to "groupings" in Step 1 and use the grouped parser instead.

    Args:
        intervals: List of interval dictionaries from the API.

    Returns:
        pd.DataFrame: Processed data with occupancy percentages.
    """
    records = []

    for interval in intervals:
        # The interval key identifies the bucket (ISO format)
        timestamp = interval.get("interval")
        if not timestamp:
            continue
        metrics = interval.get("metrics", {})

        # Duration metrics arrive in milliseconds; convert to seconds, treating null as 0
        t_handle = (metrics.get("tHandle") or 0) / 1000
        t_acw = (metrics.get("tAcw") or 0) / 1000
        t_hold = (metrics.get("tHold") or 0) / 1000
        count = metrics.get("count") or 0

        # Occupancy: handle time as a percentage of the 30-minute bucket.
        # With several agents summed into one bucket this can exceed 100.
        occupancy = t_handle / BUCKET_SECONDS * 100

        records.append({
            "interval_start": timestamp,
            "conversation_count": count,
            "t_handle_seconds": t_handle,
            "t_handle_minutes": round(t_handle / 60, 2),
            "t_hold_seconds": t_hold,
            "t_hold_minutes": round(t_hold / 60, 2),
            "t_acw_seconds": t_acw,
            "t_acw_minutes": round(t_acw / 60, 2),
            "occupancy_percent": round(occupancy, 2)
        })

    return pd.DataFrame(records)
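
As a quick arithmetic check of the occupancy formula, the sketch below works two buckets by hand. It assumes handle time is supplied in milliseconds and, for a team-level bucket, that available time scales with the number of agents; the function name occupancy_percent and its agents parameter are illustrative.

```python
BUCKET_SECONDS = 30 * 60  # 1800 seconds per 30-minute bucket


def occupancy_percent(t_handle_ms: float, agents: int = 1) -> float:
    """Occupancy for one 30-minute bucket, given summed handle time in milliseconds."""
    available_seconds = BUCKET_SECONDS * agents
    return round((t_handle_ms / 1000) / available_seconds * 100, 2)


# One agent who handled interactions for 21 minutes (1,260,000 ms) in the bucket
single = occupancy_percent(1_260_000)

# Three agents whose handle time sums to 45 minutes (2,700,000 ms) in the bucket
team = occupancy_percent(2_700_000, agents=3)

print(single, team)
```

The first case is 1260 s / 1800 s = 70%; the second is 2700 s / 5400 s = 50%. Dividing a team total by a single agent's 1800 seconds would instead report 150%.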

Revised Step 1: Query for Per-Agent Utilization

To get utilization for each agent, you must add "groupings": ["users"] to the payload. This changes the response structure significantly. The API returns a list of entities (users), and each user has their own intervals array.

Update build_query_body:

def build_query_body_grouped(agent_ids: list[str], start_time: datetime.datetime, end_time: datetime.datetime) -> dict:
    payload = {
        "view": "conversations",
        "interval": "PT30M",
        "dateFrom": start_time.isoformat(),
        "dateTo": end_time.isoformat(),
        "filter": {
            "entities": [
                {
                    "ids": agent_ids,
                    "type": "users"
                }
            ]
        },
        "metrics": [
            "tHandle",
            "tAcw",
            "tHold",
            "count"
        ],
        "groupings": ["users"] # Crucial for per-agent breakdown
    }
    return payload

Revised Step 3: Process Grouped Results

def process_grouped_utilization_data(intervals: list[dict]) -> pd.DataFrame:
    """
    Processes intervals where data is grouped by user.
    """
    records = []
    
    # The top-level 'intervals' in the response contains the aggregated time buckets.
    # Inside each bucket, there is an 'entities' array containing the users.
    for interval in intervals:
        interval_start = interval.get("interval")
        entities = interval.get("entities", [])
        
        for entity in entities:
            user_id = entity.get("id")
            user_name = entity.get("name")
            metrics = entity.get("metrics", {})
            
            # Duration metrics arrive in milliseconds; convert to seconds, treating null as 0
            t_handle = (metrics.get("tHandle") or 0) / 1000
            t_acw = (metrics.get("tAcw") or 0) / 1000
            t_hold = (metrics.get("tHold") or 0) / 1000
            count = metrics.get("count") or 0
            
            # Occupancy: handle time as a percentage of the 30-minute (1800-second) bucket
            occupancy = t_handle / 1800 * 100
            
            records.append({
                "interval_start": interval_start,
                "user_id": user_id,
                "user_name": user_name,
                "conversation_count": count,
                "t_handle_seconds": t_handle,
                "t_handle_minutes": round(t_handle / 60, 2),
                "t_hold_seconds": t_hold,
                "t_hold_minutes": round(t_hold / 60, 2),
                "t_acw_seconds": t_acw,
                "t_acw_minutes": round(t_acw / 60, 2),
                "occupancy_percent": round(occupancy, 2)
            })

    df = pd.DataFrame(records)
    return df
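
If the numbers come from the aggregates query (POST /api/v2/analytics/conversations/aggregates/query) rather than the structure parsed above, the response is organized differently: a results list with one entry per group, each holding a data list of intervals whose metrics carry a stats object. The sketch below flattens that shape. The sample response is made up for illustration, flatten_aggregate_results is a hypothetical helper, and the hold metric name tHeld is an assumption to verify against the API reference.

```python
def flatten_aggregate_results(response: dict, bucket_seconds: int = 1800) -> list[dict]:
    """Flatten an aggregates-style response into one row per user per interval."""
    rows = []
    for result in response.get("results", []):
        user_id = result.get("group", {}).get("userId")
        for bucket in result.get("data", []):
            # Each metric entry looks like {"metric": "tHandle", "stats": {"sum": ..., "count": ...}}
            sums = {m["metric"]: m.get("stats", {}).get("sum", 0) for m in bucket.get("metrics", [])}
            handle_s = sums.get("tHandle", 0) / 1000  # milliseconds to seconds
            rows.append({
                # The interval is "start/end"; keep the start as the bucket label
                "interval_start": bucket["interval"].split("/")[0],
                "user_id": user_id,
                "t_handle_seconds": handle_s,
                "t_hold_seconds": sums.get("tHeld", 0) / 1000,
                "t_acw_seconds": sums.get("tAcw", 0) / 1000,
                "occupancy_percent": round(handle_s / bucket_seconds * 100, 2),
            })
    return rows


sample = {
    "results": [{
        "group": {"userId": "3f2504e0-4f89-11d3-9a0c-0305e82c3301"},
        "data": [{
            "interval": "2024-01-15T09:00:00.000Z/2024-01-15T09:30:00.000Z",
            "metrics": [
                {"metric": "tHandle", "stats": {"sum": 1260000, "count": 4}},
                {"metric": "tHeld", "stats": {"sum": 60000, "count": 1}},
                {"metric": "tAcw", "stats": {"sum": 120000, "count": 4}},
            ],
        }],
    }]
}

rows = flatten_aggregate_results(sample)
print(rows[0])
```

The resulting list of dicts can be passed straight to pd.DataFrame(rows).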

Complete Working Example

This script ties everything together. It authenticates, queries the last 24 hours for a specific agent, and prints a summary of their 30-minute utilization buckets.

import os
import datetime
from dotenv import load_dotenv
from platform_sdk.client import PlatformClient
from platform_sdk.auth import OAuthClientCredentials
import requests
import time
import pandas as pd

# --- Authentication ---
def get_platform_client() -> PlatformClient:
    load_dotenv()
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    region = os.getenv("GENESYS_REGION", "us-east-1")
    
    oauth_provider = OAuthClientCredentials(client_id=client_id, client_secret=client_secret, region=region)
    return PlatformClient(oauth_provider)

# --- Query Construction ---
def build_query_body(agent_id: str, start_time: datetime.datetime, end_time: datetime.datetime) -> dict:
    return {
        "view": "conversations",
        "interval": "PT30M",
        "dateFrom": start_time.isoformat(),
        "dateTo": end_time.isoformat(),
        "filter": {
            "entities": [{"ids": [agent_id], "type": "users"}]
        },
        "metrics": ["tHandle", "tAcw", "tHold", "count"],
        "groupings": ["users"]
    }

# --- Data Fetching ---
def fetch_analytics(platform_client: PlatformClient, payload: dict) -> list[dict]:
    all_intervals = []
    next_page_url = platform_client.base_url + "/api/v2/analytics/conversations/details/query"
    
    while next_page_url:
        token = platform_client.oauth_provider.get_access_token()
        headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
        
        response = requests.post(next_page_url, json=payload, headers=headers)
        
        if response.status_code == 429:
            time.sleep(int(response.headers.get("Retry-After", 5)))
            continue
        elif response.status_code != 200:
            raise Exception(f"API Error {response.status_code}: {response.text}")
            
        data = response.json()
        if data.get("intervals"):
            all_intervals.extend(data["intervals"])
            
        next_page_url = data.get("nextPageUrl")
        
    return all_intervals

# --- Processing ---
def process_data(intervals: list[dict]) -> pd.DataFrame:
    records = []
    for interval in intervals:
        interval_start = interval.get("interval")
        for entity in interval.get("entities", []):
            metrics = entity.get("metrics", {})
            # Duration metrics arrive in milliseconds; convert to minutes, treating null as 0
            handle_min = (metrics.get("tHandle") or 0) / 60000
            hold_min = (metrics.get("tHold") or 0) / 60000
            acw_min = (metrics.get("tAcw") or 0) / 60000
            
            records.append({
                "Time": interval_start,
                "Agent": entity.get("name"),
                "Handle (min)": round(handle_min, 2),
                "Hold (min)": round(hold_min, 2),
                "ACW (min)": round(acw_min, 2),
                "Occupancy %": round(handle_min / 30 * 100, 2)
            })
    return pd.DataFrame(records)

# --- Main Execution ---
if __name__ == "__main__":
    # Configuration
    AGENT_ID = "YOUR_AGENT_ID_HERE" # Replace with a valid User ID
    # Use timezone-aware UTC datetimes so isoformat() includes the UTC offset
    END_TIME = datetime.datetime.now(datetime.timezone.utc)
    START_TIME = END_TIME - datetime.timedelta(hours=24) # Last 24 hours
    
    if AGENT_ID == "YOUR_AGENT_ID_HERE":
        raise SystemExit("Please set AGENT_ID in the script.")

    try:
        print("Authenticating...")
        client = get_platform_client()
        
        payload = build_query_body(AGENT_ID, START_TIME, END_TIME)
        print(f"Querying analytics for {AGENT_ID} from {START_TIME} to {END_TIME}...")
        
        intervals = fetch_analytics(client, payload)
        print(f"Fetched {len(intervals)} time buckets.")
        
        df = process_data(intervals)
        
        if not df.empty:
            print("\n--- Agent Utilization Report (30-min Intervals) ---")
            print(df.to_string(index=False))
            
            # Optional: Save to CSV
            df.to_csv("agent_utilization_report.csv", index=False)
            print("\nReport saved to agent_utilization_report.csv")
        else:
            print("No data found for the specified agent and time range.")

    except Exception as e:
        print(f"Error: {e}")

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: Invalid Client ID/Secret, or the OAuth token has expired.
  • Fix: Ensure your .env file contains correct credentials. The SDK handles refresh, but if you are using a raw HTTP request outside the SDK, ensure you are fetching a fresh token.

Error: 403 Forbidden

  • Cause: The OAuth client lacks the permission needed to query conversation analytics.
  • Fix: Go to Genesys Cloud Admin > Integrations > OAuth, open your client, and make sure a role that includes analytics:conversationDetail:view is assigned to it. Regenerating the client secret is not required.

Error: 429 Too Many Requests

  • Cause: You have exceeded the rate limit for the Analytics API.
  • Fix: Wait before retrying. The code examples above read the Retry-After header and sleep for that many seconds; add exponential backoff if requests keep being rejected.
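
A minimal sketch of exponential backoff that still honors Retry-After when the server sends it. The function name backoff_delay and its parameters are illustrative, and the base and cap values are arbitrary starting points rather than recommended settings.

```python
import random
from typing import Callable, Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 60.0,
                  jitter: Callable[[], float] = random.random) -> float:
    """Return how many seconds to sleep before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # Prefer the server's own instruction when it is a number of seconds
            return max(0.0, float(retry_after))
        except ValueError:
            pass  # Retry-After can also be an HTTP date; fall back to backoff
    # Exponential growth (base, 2*base, 4*base, ...) limited to `cap` seconds
    ceiling = min(cap, base * (2 ** attempt))
    # "Full jitter": pick a random point in [0, ceiling) so clients do not retry in lockstep
    return ceiling * jitter()


for attempt in range(4):
    print(attempt, round(backoff_delay(attempt), 2))
```

Inside a fetch loop, the call would look like time.sleep(backoff_delay(retries, response.headers.get("Retry-After"))).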

Error: Empty Response or Null Metrics

  • Cause: The time range does not contain any conversations for the specified agent, or the agent ID is invalid.
  • Fix: Verify the AGENT_ID is a valid User ID (not a name). Check the dateFrom and dateTo format (ISO 8601 UTC). Ensure the agent actually had conversations in that window.
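
Both checks can be done before any request is sent. The sketch below assumes Genesys Cloud user IDs are UUIDs in canonical hyphenated form and that the time window is passed as Python datetimes; is_valid_user_id and is_utc are hypothetical helper names.

```python
import uuid
from datetime import datetime, timedelta, timezone


def is_valid_user_id(value: str) -> bool:
    """True if value is a canonical hyphenated UUID string (not a name or placeholder)."""
    try:
        return str(uuid.UUID(value)) == value.lower()
    except (ValueError, AttributeError, TypeError):
        return False


def is_utc(ts: datetime) -> bool:
    """True if ts is timezone-aware and its offset from UTC is zero."""
    return ts.tzinfo is not None and ts.utcoffset() == timedelta(0)


print(is_valid_user_id("3f2504e0-4f89-11d3-9a0c-0305e82c3301"))
print(is_valid_user_id("Jane Doe"))
print(is_utc(datetime(2024, 1, 15, 9, 0, tzinfo=timezone.utc)))
print(is_utc(datetime(2024, 1, 15, 9, 0)))
```

A naive datetime (the last case) produces an ISO string with no offset, which is a common source of windows that are shifted by the local UTC offset.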

Error: Metrics Not Summing Correctly

  • Cause: Confusion between tHandle and its components.
  • Fix: tHandle already includes talk, hold, and after-call work time, so adding tHold or tAcw to tHandle double counts. Summing the components yourself can also differ from tHandle, for example when an interaction has time (such as outbound dialing) that belongs to none of the three. For utilization, use tHandle on its own as the "busy" time.