Query Agent Utilization Metrics (tHandle, tAcw, tHold) by 30-Minute Intervals

Query Agent Utilization Metrics (tHandle, tAcw, tHold) by 30-Minute Intervals

What You Will Build

  • You will build a Python script that queries Genesys Cloud CX for granular agent interaction metrics, specifically time in handle, after-call work, and hold.
  • You will use the GET /api/v2/analytics/conversations/details/query endpoint with a 30-minute time bucket configuration.
  • You will use the Python requests library with manual OAuth2 token management to retrieve and parse the JSON response.

Prerequisites

  • OAuth Client Type: Confidential Client (Client Credentials Grant). Public clients cannot access the Analytics API.
  • Required OAuth Scopes:
    • analytics:conversations:view
    • analytics:agents:view
  • SDK/API Version: Genesys Cloud CX API v2.
  • Language/Runtime: Python 3.8+.
  • External Dependencies:
    • requests (standard HTTP library)
    • python-dotenv (for secure credential management)

Install dependencies via pip:

pip install requests python-dotenv

Authentication Setup

Genesys Cloud uses OAuth 2.0. For server-to-server integrations, the Client Credentials Grant is the standard. You must exchange your client ID and secret for an access token. This token expires after 3600 seconds (1 hour).

Create a .env file in your project root:

GENESYS_REGION=us-east-1
GENESYS_CLIENT_ID=your_client_id_here
GENESYS_CLIENT_SECRET=your_client_secret_here

The following Python class handles token acquisition and basic caching. It avoids the overhead of importing the full Genesys SDK for this specific analytics query, giving you precise control over the HTTP payload.

import os
import time
import requests
from dotenv import load_dotenv

load_dotenv()

class GenesysAuth:
    def __init__(self):
        self.region = os.getenv("GENESYS_REGION", "my.genesys.cloud")
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
        
        if not self.client_id or not self.client_secret:
            raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in .env")

        # Determine the base URL based on region
        if self.region == "us-east-1":
            self.base_url = "https://api.mypurecloud.com"
        elif self.region == "eu-west-1":
            self.base_url = "https://api.eu.pure.cloud"
        else:
            self.base_url = f"https://api.{self.region}.pure.cloud"

        self.access_token = None
        self.token_expiry = 0

    def get_access_token(self) -> str:
        """
        Returns a valid access token. If the current token is expired or missing,
        it fetches a new one from the OAuth endpoint.
        """
        # Check if token is still valid (add 60s buffer for clock skew)
        if self.access_token and time.time() < (self.token_expiry - 60):
            return self.access_token

        token_url = f"{self.base_url}/oauth/token"
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "analytics:conversations:view analytics:agents:view"
        }

        try:
            response = requests.post(token_url, data=data)
            response.raise_for_status()
            token_data = response.json()
            
            self.access_token = token_data["access_token"]
            self.token_expiry = time.time() + token_data["expires_in"]
            
            return self.access_token

        except requests.exceptions.HTTPError as e:
            if response.status_code == 401:
                raise Exception("OAuth 401: Invalid client credentials.") from e
            raise Exception(f"OAuth Error {response.status_code}: {response.text}") from e

    def get_headers(self) -> dict:
        """Returns headers required for API calls."""
        token = self.get_access_token()
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

Implementation

Step 1: Constructing the Analytics Query

The core of this tutorial is the query body sent to /api/v2/analytics/conversations/details/query. To get 30-minute intervals, you must set the timeBucket parameter to PT30M (ISO 8601 duration format).

You must also specify the groupBy field as agent to break down metrics by individual user. If you omit this, you get aggregate data for the entire organization, which is rarely useful for utilization analysis.

Define the query parameters. You need to specify a dateFrom and dateTo. For this example, we will query the last 24 hours.

import json
from datetime import datetime, timedelta, timezone

def build_query_body() -> dict:
    """
    Constructs the JSON body for the analytics query.
    """
    now = datetime.now(timezone.utc)
    yesterday = now - timedelta(hours=24)

    # Format dates as ISO 8601 strings
    date_to = now.isoformat()
    date_from = yesterday.isoformat()

    query_body = {
        "dateFrom": date_from,
        "dateTo": date_to,
        "timeBucket": "PT30M",  # 30-minute intervals
        "groupBy": ["agent"],   # Breakdown by agent
        "view": "default",
        "select": [
            "agent.name",
            "agent.id",
            "tHandle",
            "tAcw",
            "tHold",
            "interactions"
        ],
        "where": [
            {
                "path": "conversationType",
                "operator": "eq",
                "value": "voice"
            }
        ],
        "size": 1000  # Max records per page
    }
    
    return query_body

Critical Parameter Explanation:

  • timeBucket: Setting this to PT30M tells the analytics engine to bin results into 30-minute slots. Without this, you receive a single aggregate sum for the entire time range.
  • groupBy: Including agent here is mandatory. The API returns a groups array where each item represents a unique agent.
  • select: Explicitly selecting tHandle, tAcw, and tHold ensures these fields are populated in the metrics object. While default view includes many metrics, being explicit reduces payload size and clarifies intent.

Step 2: Executing the Query with Pagination

The Analytics API supports pagination. If you have many agents or many time buckets, the response may exceed the size limit. You must check the nextPage token in the response and loop until all data is retrieved.

Additionally, you must handle HTTP 429 (Too Many Requests). Genesys Cloud enforces strict rate limits on analytics endpoints. A naive script will fail if it hammers the API. The following implementation includes a basic exponential backoff retry mechanism.

import time
import requests

def fetch_utilization_data(auth: GenesysAuth) -> list:
    """
    Fetches agent utilization metrics with pagination and retry logic.
    Returns a list of all result groups.
    """
    endpoint = f"{auth.base_url}/api/v2/analytics/conversations/details/query"
    headers = auth.get_headers()
    body = build_query_body()
    
    all_results = []
    page_token = None
    max_retries = 3
    base_delay = 2  # seconds

    while True:
        # Add pagination token if it exists
        if page_token:
            body["nextPage"] = page_token
        
        attempt = 0
        while attempt < max_retries:
            try:
                response = requests.post(endpoint, headers=headers, json=body)
                
                # Handle Rate Limiting
                if response.status_code == 429:
                    retry_after = int(response.headers.get('Retry-After', base_delay * (2 ** attempt)))
                    print(f"Rate limited (429). Waiting {retry_after} seconds...")
                    time.sleep(retry_after)
                    attempt += 1
                    continue
                
                # Handle other errors
                response.raise_for_status()
                break # Success, exit retry loop
                
            except requests.exceptions.HTTPError as e:
                if response.status_code in [401, 403]:
                    raise Exception(f"Authentication/Authorization failed: {response.status_code}") from e
                attempt += 1
                if attempt == max_retries:
                    raise Exception(f"Failed to fetch data after {max_retries} retries: {response.text}") from e
                time.sleep(base_delay * (2 ** attempt))
        
        data = response.json()
        
        # Accumulate results
        if "groups" in data and data["groups"]:
            all_results.extend(data["groups"])
        
        # Check for next page
        page_token = data.get("nextPage")
        if not page_token:
            break
            
        print(f"Page fetched. Total groups so far: {len(all_results)}")
        # Small delay to be polite to the API between pages
        time.sleep(1)

    return all_results

Step 3: Processing and Calculating Utilization

The raw response provides seconds for tHandle, tAcw, and tHold. To calculate “utilization,” you typically compare active time against the total available time in the bucket.

However, the API does not directly return “available time” per agent per bucket because agents have different shifts, breaks, and statuses. A common proxy for utilization in a fixed time bucket is:

$$ \text{Utilization} = \frac{tHandle + tAcw + tHold}{\text{Bucket Duration in Seconds}} $$

For a 30-minute bucket, the denominator is 1800 seconds. Note that this metric can exceed 100% if an agent handles multiple concurrent interactions (rare in standard voice queues but possible in blended environments) or if the data aggregates multiple queues. More commonly, it represents the density of work.

def calculate_utilization_metrics(results: list) -> list:
    """
    Processes the raw API groups into a structured list of utilization records.
    """
    bucket_duration_seconds = 30 * 60  # 30 minutes in seconds
    processed_metrics = []

    for group in results:
        agent_id = group.get("agent", {}).get("id")
        agent_name = group.get("agent", {}).get("name", "Unknown")
        
        # The 'metrics' object contains the sums for this group
        metrics = group.get("metrics", {})
        
        # Extract seconds. Default to 0 if missing
        t_handle = metrics.get("tHandle", 0)
        t_acw = metrics.get("tAcw", 0)
        t_hold = metrics.get("tHold", 0)
        interactions = metrics.get("interactions", 0)
        
        # Calculate total active time
        total_active_seconds = t_handle + t_acw + t_hold
        
        # Calculate utilization percentage relative to the bucket size
        # This represents how "full" the 30-min block was for this agent
        utilization_pct = (total_active_seconds / bucket_duration_seconds) * 100
        
        record = {
            "agent_id": agent_id,
            "agent_name": agent_name,
            "t_handle_sec": t_handle,
            "t_acw_sec": t_acw,
            "t_hold_sec": t_hold,
            "total_active_sec": total_active_seconds,
            "interactions": interactions,
            "utilization_pct": round(utilization_pct, 2)
        }
        
        processed_metrics.append(record)

    return processed_metrics

Complete Working Example

Combine the authentication, query, and processing logic into a single runnable script.

import os
import sys
import json
import time
import requests
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

class GenesysAuth:
    def __init__(self):
        self.region = os.getenv("GENESYS_REGION", "us-east-1")
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
        
        if not self.client_id or not self.client_secret:
            raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in .env")

        if self.region == "us-east-1":
            self.base_url = "https://api.mypurecloud.com"
        elif self.region == "eu-west-1":
            self.base_url = "https://api.eu.pure.cloud"
        else:
            self.base_url = f"https://api.{self.region}.pure.cloud"

        self.access_token = None
        self.token_expiry = 0

    def get_access_token(self) -> str:
        if self.access_token and time.time() < (self.token_expiry - 60):
            return self.access_token

        token_url = f"{self.base_url}/oauth/token"
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "analytics:conversations:view analytics:agents:view"
        }

        try:
            response = requests.post(token_url, data=data)
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            self.token_expiry = time.time() + token_data["expires_in"]
            return self.access_token
        except requests.exceptions.HTTPError as e:
            raise Exception(f"OAuth Error: {e}") from e

    def get_headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_access_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

def build_query_body() -> dict:
    now = datetime.now(timezone.utc)
    yesterday = now - timedelta(hours=24)
    
    return {
        "dateFrom": yesterday.isoformat(),
        "dateTo": now.isoformat(),
        "timeBucket": "PT30M",
        "groupBy": ["agent"],
        "view": "default",
        "select": ["agent.name", "agent.id", "tHandle", "tAcw", "tHold", "interactions"],
        "where": [{"path": "conversationType", "operator": "eq", "value": "voice"}],
        "size": 1000
    }

def fetch_utilization_data(auth: GenesysAuth) -> list:
    endpoint = f"{auth.base_url}/api/v2/analytics/conversations/details/query"
    headers = auth.get_headers()
    body = build_query_body()
    
    all_results = []
    page_token = None
    max_retries = 3
    base_delay = 2

    while True:
        if page_token:
            body["nextPage"] = page_token
        
        attempt = 0
        while attempt < max_retries:
            try:
                response = requests.post(endpoint, headers=headers, json=body)
                
                if response.status_code == 429:
                    retry_after = int(response.headers.get('Retry-After', base_delay * (2 ** attempt)))
                    time.sleep(retry_after)
                    attempt += 1
                    continue
                
                response.raise_for_status()
                break
                
            except requests.exceptions.HTTPError as e:
                if response.status_code in [401, 403]:
                    raise e
                attempt += 1
                if attempt == max_retries:
                    raise e
                time.sleep(base_delay * (2 ** attempt))
        
        data = response.json()
        if "groups" in data and data["groups"]:
            all_results.extend(data["groups"])
        
        page_token = data.get("nextPage")
        if not page_token:
            break
            
        time.sleep(1)

    return all_results

def calculate_utilization_metrics(results: list) -> list:
    bucket_duration_seconds = 30 * 60
    processed_metrics = []

    for group in results:
        agent_id = group.get("agent", {}).get("id")
        agent_name = group.get("agent", {}).get("name", "Unknown")
        metrics = group.get("metrics", {})
        
        t_handle = metrics.get("tHandle", 0)
        t_acw = metrics.get("tAcw", 0)
        t_hold = metrics.get("tHold", 0)
        interactions = metrics.get("interactions", 0)
        
        total_active_seconds = t_handle + t_acw + t_hold
        utilization_pct = (total_active_seconds / bucket_duration_seconds) * 100
        
        processed_metrics.append({
            "agent_id": agent_id,
            "agent_name": agent_name,
            "t_handle_sec": t_handle,
            "t_acw_sec": t_acw,
            "t_hold_sec": t_hold,
            "interactions": interactions,
            "utilization_pct": round(utilization_pct, 2)
        })
    return processed_metrics

def main():
    try:
        auth = GenesysAuth()
        print("Fetching utilization data...")
        raw_data = fetch_utilization_data(auth)
        print(f"Retrieved {len(raw_data)} data points.")
        
        metrics = calculate_utilization_metrics(raw_data)
        
        # Sort by utilization descending to find busiest agents
        metrics.sort(key=lambda x: x["utilization_pct"], reverse=True)
        
        print("\nTop 5 Agent Utilization Buckets:")
        print("-" * 60)
        for m in metrics[:5]:
            print(f"Agent: {m['agent_name']}")
            print(f"  Handle: {m['t_handle_sec']}s | ACW: {m['t_acw_sec']}s | Hold: {m['t_hold_sec']}s")
            print(f"  Interactions: {m['interactions']}")
            print(f"  Utilization: {m['utilization_pct']}%")
            print("-" * 60)

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 403 Forbidden

Cause: The OAuth client does not have the analytics:conversations:view scope.
Fix:

  1. Go to Admin > Security > OAuth 2.0 Clients.
  2. Select your client.
  3. Navigate to the “Scopes” tab.
  4. Ensure analytics:conversations:view is checked.
  5. Regenerate the access token.

Error: 429 Too Many Requests

Cause: The analytics engine is under heavy load, or your script is sending requests faster than the allowed rate (typically 10-20 requests per second for analytics, but this varies by organization tier).
Fix:

  1. Implement the retry logic shown in fetch_utilization_data.
  2. Check the Retry-After header in the response.
  3. Increase the time.sleep() duration between pages.

Error: Empty Groups or Zero Metrics

Cause: The time range (dateFrom to dateTo) contains no voice conversations, or the where filter is too restrictive.
Fix:

  1. Verify that conversationType is set correctly. If you are querying chat, change "value": "voice" to "value": "chat".
  2. Widen the time range to ensure data exists.
  3. Check if the agents in the groupBy are currently active. Historical data for inactive agents may be sparse.

Error: tHandle is Null or Missing

Cause: The select array in the query body did not include tHandle.
Fix: Ensure the select array in build_query_body explicitly lists the metric names you require. The default view does not guarantee all metric fields are present in the JSON if they are not explicitly selected or if the data type does not support them (e.g., tAcw is not applicable for all chat interactions depending on configuration).

Official References