Query Agent Utilization Metrics (tHandle, tAcw, tHold) by 30-Minute Intervals in Genesys Cloud

What You Will Build

  • You will build a Python script that retrieves three agent interaction metrics, handle time (tHandle), after-call work time (tAcw), and hold time (tHold), aggregated into 30-minute intervals.
  • You will use the Genesys Cloud CX Analytics Conversations Details Query API (/api/v2/analytics/conversations/details/query).
  • You will use Python 3.9+ with the requests library. The script calls the REST API directly, so the official Python SDK is optional.

Prerequisites

  • OAuth Client: You need a Genesys Cloud OAuth client that uses the Client Credentials grant. Clients of this type are authorized through the roles assigned to them rather than through scopes, so assign the client a role that includes permission to view conversation analytics.
  • SDK (optional): The official Python SDK is published on PyPI as PureCloudPlatformClientV2. None of the code in this guide imports it.
  • Runtime: Python 3.9 or higher.
  • Dependencies:
    • requests
    • pandas (used in Step 3 and the complete example to tabulate results)

Install the dependencies:

pip install requests pandas

Authentication Setup

Genesys Cloud uses OAuth 2.0. For server-to-server integrations, use the Client Credentials Grant. You must cache the access token and handle expiration. The following class handles token acquisition and refresh logic.

import time
import requests
from typing import Optional, Dict

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        self.token_url = f"https://login.{environment}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0
        self._headers: Dict[str, str] = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json"
        }

    def _get_token(self) -> str:
        """Fetches a new OAuth access token."""
        # The client ID and secret travel in an HTTP Basic Authorization header,
        # which requests builds from the `auth` tuple; only the grant type is form data.
        response = requests.post(
            self.token_url,
            data={"grant_type": "client_credentials"},
            auth=(self.client_id, self.client_secret),
            headers=self._headers,
            timeout=30,
        )
        response.raise_for_status()

        data = response.json()
        self.access_token = data["access_token"]
        # expires_in is in seconds; refresh 60 seconds early so a token
        # never expires in the middle of a request
        self.token_expiry = time.time() + data["expires_in"] - 60
        return self.access_token

    def get_token(self) -> str:
        """Returns a valid access token, requesting a new one if necessary."""
        if self.access_token and time.time() < self.token_expiry:
            return self.access_token
        return self._get_token()

    def get_authorization_header(self) -> Dict[str, str]:
        """Returns the Authorization header dict."""
        token = self.get_token()
        return {"Authorization": f"Bearer {token}"}
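
The caching rule used by get_token can be checked without touching the network. The following is a minimal sketch, assuming a hypothetical CachedToken helper with an injected fetch function and clock (neither is part of any Genesys library) and a 60-second safety margin chosen for illustration:

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Caches a token until shortly before it expires (illustrative helper)."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        clock: Callable[[], float] = time.time,
        margin_sec: float = 60.0,
    ):
        self._fetch = fetch          # returns (token, expires_in_seconds)
        self._clock = clock          # injectable so tests can control time
        self._margin = margin_sec    # refresh this many seconds before expiry
        self._token: Optional[str] = None
        self._expiry = 0.0

    def get(self) -> str:
        # Fetch when nothing is cached or the token is inside the safety margin
        if self._token is None or self._clock() >= self._expiry - self._margin:
            token, expires_in = self._fetch()
            self._token = token
            self._expiry = self._clock() + expires_in
        return self._token


# Simulated token endpoint and clock (no HTTP calls are made)
now = [1000.0]
calls = []

def fake_fetch() -> Tuple[str, float]:
    calls.append(now[0])
    return f"token-{len(calls)}", 3600.0

cache = CachedToken(fake_fetch, clock=lambda: now[0])
first = cache.get()      # nothing cached yet, so this fetches
now[0] += 100
second = cache.get()     # still valid, so the cached token is reused
now[0] += 3500
third = cache.get()      # past the expiry margin, so this fetches again
```

Injecting the clock keeps the expiry logic testable; in the real class the same rule is applied with time.time().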

Implementation

Step 1: Constructing the Analytics Query

The analytics/conversations/details/query endpoint requires a specific JSON body structure. To get 30-minute intervals, you must set the interval field to PT30M (ISO 8601 duration). To break down metrics by agent, you must include groupBy: ["agentId"].

The metrics tHandle, tAcw, and tHold are available in the summary object of the response. Note that tHandle is the total handle time for an interaction (talk, hold, and after-call work combined, not the full duration of the conversation), tAcw is after-call work, and tHold is time spent on hold.

from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

def _to_utc_string(value: datetime) -> str:
    """Formats a datetime as an ISO 8601 UTC string. Naive values are assumed to be UTC."""
    if value.tzinfo is not None:
        value = value.astimezone(timezone.utc)
    return value.strftime("%Y-%m-%dT%H:%M:%SZ")

def build_analytics_query(
    start_time: datetime,
    end_time: datetime,
    queue_ids: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Constructs the JSON body for the analytics query.

    Args:
        start_time: Start of the reporting period.
        end_time: End of the reporting period.
        queue_ids: Optional list of queue IDs to filter by.

    Returns:
        Dictionary representing the API request body.

    Raises:
        ValueError: If start_time is not before end_time.
    """

    # Format times as ISO 8601 strings in UTC
    start_str = _to_utc_string(start_time)
    end_str = _to_utc_string(end_time)

    # Both strings share one fixed-width format, so string order equals time order
    if start_str >= end_str:
        raise ValueError("start_time must be before end_time")

    query_body: Dict[str, Any] = {
        "dateFrom": start_str,
        "dateTo": end_str,
        "interval": "PT30M",  # 30-minute intervals
        "groupBy": ["agentId"],  # Break down by agent
        "view": "agent",  # Use the 'agent' view for agent-specific metrics
        "select": [
            "tHandle",
            "tAcw",
            "tHold",
            "agentId",
            "agentName"
        ],
        "filters": []
    }

    # Optional: Filter by specific queues
    if queue_ids:
        query_body["filters"].append({
            "type": "queue",
            "ids": queue_ids
        })

    return query_body
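
For comparison, Genesys Cloud also exposes a conversation aggregates query at /api/v2/analytics/conversations/aggregates/query, which buckets metrics by a granularity. The sketch below builds a body for that endpoint. It rests on assumptions that should be confirmed against the API reference before use: the field names interval, granularity, groupBy, metrics, and filter, the userId grouping dimension, and the hold metric being named tHeld rather than tHold. The function name build_aggregates_query is hypothetical.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


def build_aggregates_query(
    start: datetime,
    end: datetime,
    queue_ids: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Builds an aggregates-style body (assumed shape, see the note above)."""
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("start and end must be timezone-aware")
    if start >= end:
        raise ValueError("start must be before end")

    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    start_str = start.astimezone(timezone.utc).strftime(fmt)
    end_str = end.astimezone(timezone.utc).strftime(fmt)

    body: Dict[str, Any] = {
        # Assumed: the date range is a single "start/end" string
        "interval": f"{start_str}/{end_str}",
        # Assumed: bucket size is an ISO 8601 duration
        "granularity": "PT30M",
        "groupBy": ["userId"],
        "metrics": ["tHandle", "tAcw", "tHeld"],
    }

    if queue_ids:
        # Assumed: dimension predicates combined with "or" match any listed queue
        body["filter"] = {
            "type": "or",
            "predicates": [
                {
                    "type": "dimension",
                    "dimension": "queueId",
                    "operator": "matches",
                    "value": queue_id,
                }
                for queue_id in queue_ids
            ],
        }
    return body


example_body = build_aggregates_query(
    datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc),
    datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc),
    queue_ids=["queue-a", "queue-b"],
)
```

If the details endpoint rejects the interval or groupBy fields with a 400 response, this shape is a reasonable alternative to try.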

Step 2: Executing the Query and Handling Pagination

Genesys Cloud analytics queries can return large datasets. The API supports pagination via the pageToken field in the response. You must loop until pageToken is null. Additionally, you must handle HTTP 429 (Too Many Requests) by implementing exponential backoff.

import time
from typing import List, Dict, Any

import requests

def fetch_analytics_data(
    auth: GenesysAuth,
    query_body: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """
    Executes the analytics query and handles pagination and rate limiting.

    Args:
        auth: GenesysAuth instance.
        query_body: The constructed query dictionary.

    Returns:
        List of dictionaries containing the aggregated metrics.

    Raises:
        RuntimeError: If every retry attempt is rate limited.
        requests.exceptions.HTTPError: For any other HTTP error.
    """
    base_url = f"https://api.{auth.environment}/api/v2/analytics/conversations/details/query"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    all_results = []
    page_token = None
    max_retries = 5

    while True:
        # Copy the body so the caller's dictionary is never modified
        request_body = query_body.copy()
        if page_token:
            request_body["pageToken"] = page_token

        # Attempt the request, retrying only on 429
        for attempt in range(max_retries):
            headers.update(auth.get_authorization_header())
            response = requests.post(base_url, headers=headers, json=request_body, timeout=30)

            if response.status_code != 429:
                break  # Not rate limited; stop retrying

            # Prefer the server's Retry-After value; otherwise back off 2^attempt seconds
            retry_after = response.headers.get("Retry-After", "")
            wait_time = int(retry_after) if retry_after.isdigit() else 2 ** attempt
            print(f"Rate limited (429). Waiting {wait_time} seconds...")
            time.sleep(wait_time)
        else:
            # The loop finished without a break, so every attempt returned 429
            raise RuntimeError("Max retries exceeded for 429 error.")

        # Surface any other HTTP error (400, 401, 403, 5xx) to the caller
        response.raise_for_status()
        data = response.json()

        # Extract results
        if data.get("summary"):
            all_results.extend(data["summary"])

        # Check for next page
        page_token = data.get("pageToken")
        if not page_token:
            break

        print(f"Processed page. Fetching next page token: {page_token[:10]}...")
        # Small delay to be polite to the API even if not rate limited
        time.sleep(0.5)

    return all_results
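
The wait-time decision in the retry loop can be separated into a pure function, which makes it easy to test. This is a minimal sketch, assuming a hypothetical helper named backoff_delay. The 60-second cap and the optional jitter are illustrative choices, not values documented by Genesys.

```python
import random
from typing import Callable, Optional


def backoff_delay(
    attempt: int,
    retry_after: Optional[str] = None,
    base: float = 1.0,
    cap: float = 60.0,
    jitter: float = 0.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Returns how many seconds to wait before retry number `attempt` (0-based)."""
    # A numeric Retry-After header from the server takes priority
    if retry_after is not None:
        try:
            return max(0.0, float(retry_after))
        except ValueError:
            pass  # Not a number (for example an HTTP date); fall back to backoff

    # Exponential backoff: base * 2^attempt, never more than `cap`
    delay = min(cap, base * (2 ** attempt))

    # Optional jitter adds up to `jitter * delay` extra seconds to spread out retries
    return delay + rng() * jitter * delay


delays = [backoff_delay(n) for n in range(5)]
```

With the defaults, delays holds the same 1, 2, 4, 8, 16 second sequence that the loop above produces when no Retry-After header is present.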

Step 3: Processing and Formatting Results

The response from analytics/conversations/details/query returns a list of summary objects. Each object contains the metrics for a specific interval and agent combination. The metrics are in milliseconds. You must convert them to minutes or seconds for readability.

Key fields in the response object:

  • dateFrom: Start of the 30-minute interval.
  • dateTo: End of the 30-minute interval.
  • agentId: ID of the agent.
  • agentName: Name of the agent.
  • tHandle: Total handle time in milliseconds.
  • tAcw: After-call work time in milliseconds.
  • tHold: Hold time in milliseconds.

import pandas as pd
from typing import List, Dict, Any

def process_metrics(raw_data: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Converts raw API response into a structured DataFrame.
    
    Args:
        raw_data: List of summary objects from the API.
        
    Returns:
        Pandas DataFrame with cleaned metrics.
    """
    if not raw_data:
        return pd.DataFrame()

    # Convert milliseconds to seconds for easier reading
    processed_data = []
    
    for item in raw_data:
        processed_item = {
            "interval_start": item["dateFrom"],
            "interval_end": item["dateTo"],
            "agent_id": item["agentId"],
            "agent_name": item.get("agentName", "Unknown"),
            # Convert ms to seconds
            "handle_time_sec": item.get("tHandle", 0) / 1000,
            "acw_time_sec": item.get("tAcw", 0) / 1000,
            "hold_time_sec": item.get("tHold", 0) / 1000
        }
        processed_data.append(processed_item)

    df = pd.DataFrame(processed_data)
    
    # Sort by interval start time and agent name
    df = df.sort_values(by=["interval_start", "agent_name"]).reset_index(drop=True)
    
    return df
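
Raw durations are easier to read when shown as minutes and seconds, or as a share of the 30-minute interval. The helpers below are a small sketch with hypothetical names (format_mmss and interval_share). They use only the standard library and make no API calls.

```python
def format_mmss(milliseconds: int) -> str:
    """Formats a millisecond duration as MM:SS, dropping fractional seconds."""
    total_seconds = milliseconds // 1000
    minutes, seconds = divmod(total_seconds, 60)
    return f"{minutes:02d}:{seconds:02d}"


def interval_share(seconds: float, interval_minutes: int = 30) -> float:
    """Returns `seconds` as a percentage of the interval length, to one decimal."""
    interval_seconds = interval_minutes * 60
    return round(100.0 * seconds / interval_seconds, 1)


handle_ms = 754_000                              # 12 minutes 34 seconds
handle_label = format_mmss(handle_ms)            # "12:34"
handle_share = interval_share(handle_ms / 1000)  # 41.9
```

A share above 100 is possible if the API attributes a long interaction entirely to one interval, so treat the percentage as an indicator, not an exact occupancy figure.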

Complete Working Example

This script combines all previous steps into a single runnable module. It reads credentials from the GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables, so set both before running it.

import os
import time
import requests
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any

# --- Authentication Class ---
class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        self.token_url = f"https://login.{environment}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0
        self._headers: Dict[str, str] = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json"
        }

    def _get_token(self) -> str:
        # Credentials go in an HTTP Basic Authorization header built from `auth`
        response = requests.post(
            self.token_url,
            data={"grant_type": "client_credentials"},
            auth=(self.client_id, self.client_secret),
            headers=self._headers,
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
        self.access_token = data["access_token"]
        # Refresh 60 seconds early so a token never expires mid-request
        self.token_expiry = time.time() + data["expires_in"] - 60
        return self.access_token

    def get_token(self) -> str:
        if self.access_token and time.time() < self.token_expiry:
            return self.access_token
        return self._get_token()

    def get_authorization_header(self) -> Dict[str, str]:
        token = self.get_token()
        return {"Authorization": f"Bearer {token}"}

# --- Query Builder ---
def build_analytics_query(
    start_time: datetime, 
    end_time: datetime, 
    queue_ids: list = None
) -> Dict[str, Any]:
    start_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    
    query_body = {
        "dateFrom": start_str,
        "dateTo": end_str,
        "interval": "PT30M",
        "groupBy": ["agentId"],
        "view": "agent",
        "select": [
            "tHandle",
            "tAcw",
            "tHold",
            "agentId",
            "agentName"
        ],
        "filters": []
    }

    if queue_ids:
        query_body["filters"].append({
            "type": "queue",
            "ids": queue_ids
        })

    return query_body

# --- Data Fetcher ---
def fetch_analytics_data(
    auth: GenesysAuth, 
    query_body: Dict[str, Any]
) -> List[Dict[str, Any]]:
    base_url = f"https://api.{auth.environment}/api/v2/analytics/conversations/details/query"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    
    all_results = []
    page_token = None
    max_retries = 5
    
    while True:
        request_body = query_body.copy()
        if page_token:
            request_body["pageToken"] = page_token

        for attempt in range(max_retries):
            headers.update(auth.get_authorization_header())
            response = requests.post(base_url, headers=headers, json=request_body, timeout=30)

            if response.status_code != 429:
                break  # Not rate limited; stop retrying

            wait_time = 2 ** attempt
            print(f"Rate limited (429). Waiting {wait_time} seconds...")
            time.sleep(wait_time)
        else:
            # Every attempt returned 429
            raise RuntimeError("Max retries exceeded for 429 error.")

        # Surface any other HTTP error to the caller
        response.raise_for_status()

        data = response.json()
        
        if "summary" in data and data["summary"]:
            all_results.extend(data["summary"])
        
        page_token = data.get("pageToken")
        if not page_token:
            break
            
        time.sleep(0.5)

    return all_results

# --- Data Processor ---
def process_metrics(raw_data: List[Dict[str, Any]]) -> pd.DataFrame:
    if not raw_data:
        return pd.DataFrame()

    processed_data = []
    
    for item in raw_data:
        processed_item = {
            "interval_start": item["dateFrom"],
            "interval_end": item["dateTo"],
            "agent_id": item["agentId"],
            "agent_name": item.get("agentName", "Unknown"),
            "handle_time_sec": item.get("tHandle", 0) / 1000,
            "acw_time_sec": item.get("tAcw", 0) / 1000,
            "hold_time_sec": item.get("tHold", 0) / 1000
        }
        processed_data.append(processed_item)

    df = pd.DataFrame(processed_data)
    df = df.sort_values(by=["interval_start", "agent_name"]).reset_index(drop=True)
    return df

# --- Main Execution ---
if __name__ == "__main__":
    # Configuration
    CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
    CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
    
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("Please set GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables.")

    # Initialize Auth
    auth = GenesysAuth(CLIENT_ID, CLIENT_SECRET)
    
    # Define Time Range (Last 24 hours)
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=24)
    
    # Build Query
    query_body = build_analytics_query(start_time, end_time)
    
    print("Fetching analytics data...")
    raw_data = fetch_analytics_data(auth, query_body)
    print(f"Fetched {len(raw_data)} records.")
    
    # Process Data
    df = process_metrics(raw_data)
    
    # Display Results
    if not df.empty:
        print("\n--- Agent Utilization Metrics (Last 24 Hours, 30-Min Intervals) ---")
        print(df.to_string(index=False))
        
        # Example: Calculate average handle time per agent
        avg_metrics = df.groupby("agent_name").agg({
            "handle_time_sec": "mean",
            "acw_time_sec": "mean",
            "hold_time_sec": "mean"
        }).round(2)
        
        print("\n--- Average Metrics Per Agent ---")
        print(avg_metrics)
    else:
        print("No data found for the specified period.")
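
The main block uses the current time as the end of the range, so the first and last buckets are usually partial. One way to get whole buckets is to snap the range to 30-minute boundaries first. The sketch below assumes two hypothetical helpers, floor_to_interval and last_24h_window. It also uses timezone-aware datetimes, since datetime.utcnow() is deprecated as of Python 3.12.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def floor_to_interval(moment: datetime, minutes: int = 30) -> datetime:
    """Rounds a timezone-aware datetime down to the previous interval boundary (UTC).

    `minutes` must divide 60 evenly (for example 5, 15, or 30).
    """
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    if 60 % minutes != 0:
        raise ValueError("minutes must divide 60 evenly")

    moment = moment.astimezone(timezone.utc)
    # Remove everything past the most recent boundary
    excess = timedelta(
        minutes=moment.minute % minutes,
        seconds=moment.second,
        microseconds=moment.microsecond,
    )
    return moment - excess


def last_24h_window(now: datetime) -> Tuple[datetime, datetime]:
    """Returns (start, end) covering the 24 hours before the latest boundary."""
    end = floor_to_interval(now)
    return end - timedelta(hours=24), end


sample_now = datetime(2024, 3, 5, 14, 47, 12, tzinfo=timezone.utc)
window_start, window_end = last_24h_window(sample_now)
bucket_count = (window_end - window_start) // timedelta(minutes=30)
```

In the script, last_24h_window(datetime.now(timezone.utc)) would supply start_time and end_time for the query builder.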

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The OAuth token is invalid, expired, or missing.
  • Fix: Ensure CLIENT_ID and CLIENT_SECRET are correct and that the environment (for example, mypurecloud.com) matches the region of your organization. Verify the token is being refreshed before each request.
  • Code Fix: The GenesysAuth class requests a new token when the cached one expires, but only if you call auth.get_authorization_header() for every request.

Error: 403 Forbidden

  • Cause: The token is valid, but the OAuth client is not permitted to view analytics. With the Client Credentials grant there is no user behind the token, so access comes from the roles assigned to the client itself.
  • Fix: In the Genesys Cloud Admin console, assign the OAuth client a role that includes permission to view conversation analytics, then request a new token so the change takes effect.

Error: 400 Bad Request

  • Cause: The query body is malformed. Common issues include invalid ISO 8601 dates, invalid interval format, or missing required fields.
  • Fix: Verify dateFrom is before dateTo. Ensure interval is PT30M (not 30m or 30 minutes). Ensure groupBy is a list of strings.
  • Debugging: Print the query_body JSON before sending it to the API to validate structure.
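
The checks listed under 400 Bad Request can be run locally before sending the request. This is a minimal sketch, assuming a hypothetical validate_query_body helper and the body layout produced by build_analytics_query in Step 1. It tests only the three conditions named above and is not a full schema validation.

```python
import re
from datetime import datetime
from typing import Any, Dict, List

# ISO 8601 time durations such as PT30M, PT1H, or PT1H30M
_DURATION_RE = re.compile(r"^PT(?=\d)(\d+H)?(\d+M)?(\d+S)?$")
_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def validate_query_body(body: Dict[str, Any]) -> List[str]:
    """Returns a list of human-readable problems; an empty list means no issue found."""
    problems: List[str] = []

    # dateFrom and dateTo must parse and be in chronological order
    try:
        start = datetime.strptime(body["dateFrom"], _TIMESTAMP_FORMAT)
        end = datetime.strptime(body["dateTo"], _TIMESTAMP_FORMAT)
        if start >= end:
            problems.append("dateFrom must be before dateTo")
    except (KeyError, TypeError, ValueError):
        problems.append("dateFrom and dateTo must be ISO 8601 UTC timestamps")

    # interval must be an ISO 8601 duration such as PT30M
    interval = body.get("interval")
    if not isinstance(interval, str) or not _DURATION_RE.match(interval):
        problems.append("interval must be an ISO 8601 duration such as PT30M")

    # groupBy must be a list of strings
    group_by = body.get("groupBy")
    if not isinstance(group_by, list) or not all(isinstance(g, str) for g in group_by):
        problems.append("groupBy must be a list of strings")

    return problems


good_body = {
    "dateFrom": "2024-01-01T00:00:00Z",
    "dateTo": "2024-01-02T00:00:00Z",
    "interval": "PT30M",
    "groupBy": ["agentId"],
}
bad_body = {
    "dateFrom": "2024-01-02T00:00:00Z",
    "dateTo": "2024-01-01T00:00:00Z",
    "interval": "30m",
    "groupBy": "agentId",
}
good_problems = validate_query_body(good_body)
bad_problems = validate_query_body(bad_body)
```

Here good_problems is empty, while bad_problems contains one message for each of the three mistakes in bad_body.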

Error: 429 Too Many Requests

  • Cause: You have exceeded the API rate limits. Analytics queries are expensive and have lower rate limits than CRUD operations.
  • Fix: Implement exponential backoff. The provided code includes a retry loop with 2 ** attempt seconds wait time. Reduce the frequency of queries if you are polling continuously.

Error: Empty Results

  • Cause: No conversations occurred in the specified time range, or the filters are too restrictive.
  • Fix: Verify the time range. If you filtered by queue_ids, ensure those queues had activity. Check if view: "agent" is appropriate; sometimes view: "default" is needed if agent-specific data is not aggregated.
