Query Agent Utilization Metrics by 30-Minute Intervals via Genesys Cloud API

What You Will Build

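  • A Python script that retrieves agent handle time (tHandle), after-call work time (tAcw), and hold time (tHeld) aggregated into 30-minute intervals for a specific date range.
  • This tutorial uses the Genesys Cloud Platform API analytics/conversations/aggregates/query endpoint (POST /api/v2/analytics/conversations/aggregates/query), which returns metric statistics per interval. The analytics/conversations/details/query endpoint returns individual conversation records and does not aggregate by interval.
  • The code is written for Python 3.9+ using the official Genesys Cloud Python SDK (PureCloudPlatformClientV2), with pandas for shaping the results.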

Prerequisites

OAuth Configuration

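  • Grant Type: Client Credentials is recommended for unattended server-side scripts, and the code in this tutorial assumes it. Interactive applications can use the Authorization Code grant (optionally with PKCE) instead.
  • Required Permissions: a Client Credentials client has no user context, so its access comes from the roles assigned to the OAuth client rather than from scopes.
    • analytics:conversationAggregate:view (Required for the conversation aggregates query)
    • Permission to view users (Optional, only if you need to resolve user IDs to names)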

Environment Setup

  • Python Version: 3.9 or higher.
  • Dependencies:
    • PureCloudPlatformClientV2: The official Genesys Cloud Python SDK.
    • requests: For raw HTTP calls if SDK limitations arise.
    • pandas: For easy data manipulation of the results (optional but recommended).

Install dependencies via pip:

pip install PureCloudPlatformClientV2 pandas requests

Authentication Setup

Genesys Cloud uses OAuth 2.0. For a background script, the Client Credentials flow is the most robust method. You will need the client_id and client_secret of an OAuth client created in the Genesys Cloud Admin UI under Integrations > OAuth.

The SDK requests the access token for you when you call get_client_credentials_token. Tokens expire, so a long-running process should be prepared to request a new one.

import os
import PureCloudPlatformClientV2

# Load credentials from environment variables for security
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
# Region domain of your org, e.g. "mypurecloud.com" or "mypurecloud.ie"
ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")

def get_auth_client():
    """
    Returns an ApiClient authenticated with the Client Credentials grant.
    """
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment variables.")

    # Point the SDK at the API host for your region (note the "api." prefix)
    PureCloudPlatformClientV2.configuration.host = f"https://api.{ENVIRONMENT}"

    # Request a token; the returned ApiClient sends it on every later call
    api_client = PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        CLIENT_ID, CLIENT_SECRET
    )
    return api_client

Implementation

Step 1: Define the Query Payload

The analytics/conversations/aggregates/query endpoint accepts a JSON body that defines the scope of the query. To get metrics in 30-minute buckets, set the granularity field and select the metrics you need.

Key parameters:

  • interval: the overall date range as an ISO 8601 interval in the form "start/end", for example "2023-10-01T00:00:00.000Z/2023-10-02T00:00:00.000Z".
  • granularity: "PT30M" (ISO 8601 duration for 30 minutes; splits the interval into buckets).
  • groupBy: ["userId"] (Returns one result group per agent, each with one entry per bucket).
  • metrics: ["tHandle", "tAcw", "tHeld"] (The specific metrics; hold time is named tHeld).
  • filter: optional predicates, for example on the userId dimension, that restrict the query to specific agents.

from typing import List, Optional

def build_query_payload(start_date_str: str, end_date_str: str, user_ids: Optional[List[str]] = None) -> dict:
    """
    Constructs the JSON body for the conversation aggregates query.

    Args:
        start_date_str: ISO 8601 start date (e.g., "2023-10-01T00:00:00.000Z")
        end_date_str: ISO 8601 end date (e.g., "2023-10-02T00:00:00.000Z")
        user_ids: Optional list of user IDs to filter. If None, returns all agents.

    Returns:
        dict: The query payload.
    """
    payload = {
        # Overall date range, written as "start/end"
        "interval": f"{start_date_str}/{end_date_str}",
        # Bucket size within the interval
        "granularity": "PT30M",
        # One result group per agent; each group holds one entry per bucket
        "groupBy": ["userId"],
        # Hold time is exposed as tHeld; tHandle already includes hold and ACW
        "metrics": ["tHandle", "tAcw", "tHeld"],
    }

    # Optional: restrict the query to specific agents (any of the given IDs)
    if user_ids:
        payload["filter"] = {
            "type": "or",
            "predicates": [
                {"type": "dimension", "dimension": "userId", "operator": "matches", "value": uid}
                for uid in user_ids
            ],
        }

    return payload
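
As a sanity check on how many rows to expect per agent, the sketch below lists the 30-minute buckets that a date range divides into. The helper name thirty_minute_buckets is hypothetical and not part of the SDK; it only uses the standard library.

```python
from datetime import datetime, timedelta, timezone

def thirty_minute_buckets(start: datetime, end: datetime):
    """Return (bucket_start, bucket_end) pairs covering [start, end)."""
    if end <= start:
        raise ValueError("end must be after start")
    step = timedelta(minutes=30)
    buckets = []
    cursor = start
    while cursor < end:
        # The final bucket is clipped if the range is not a multiple of 30 minutes
        buckets.append((cursor, min(cursor + step, end)))
        cursor += step
    return buckets

day_start = datetime(2023, 10, 1, tzinfo=timezone.utc)
buckets = thirty_minute_buckets(day_start, day_start + timedelta(days=1))
print(len(buckets))               # 48 buckets in one day
print(buckets[0][1].isoformat())  # 2023-10-01T00:30:00+00:00
```

A full day therefore yields at most 48 rows per agent; agents with no activity in a bucket may simply have no row for it.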

Step 2: Execute the Query with Retry Logic

The analytics API can answer with 429 (rate limited) responses, so the call needs retry handling. The aggregates query is not paginated: one request returns every result group and bucket for the interval, so very large date ranges are handled by issuing several queries over smaller ranges rather than by following page links. The SDK exposes the endpoint as post_analytics_conversations_aggregates_query.

We will implement a retry mechanism with exponential backoff for 429 errors.

import time
import logging
from PureCloudPlatformClientV2 import AnalyticsApi
from PureCloudPlatformClientV2.rest import ApiException

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def fetch_utilization_data(api_client, payload: dict, max_retries: int = 5):
    """
    Runs the aggregates query, retrying on 429 with exponential backoff.

    Args:
        api_client: The initialized ApiClient.
        payload: The query payload dictionary.
        max_retries: Number of retries on 429 error.

    Returns:
        list: Result groups as plain dicts (empty if nothing matched).
    """
    analytics_api = AnalyticsApi(api_client)

    for attempt in range(max_retries + 1):
        try:
            response = analytics_api.post_analytics_conversations_aggregates_query(payload)

            # to_dict() turns the SDK model objects into nested plain dicts
            results = response.to_dict().get("results") or []
            logger.info(f"Fetched {len(results)} result groups.")
            return results

        except ApiException as e:
            # Anything other than rate limiting is not retried
            if e.status != 429:
                logger.error(f"API Error: {e}")
                raise
            if attempt == max_retries:
                logger.error(f"Max retries exceeded for 429. Last error: {e}")
                raise
            wait_time = 2 ** (attempt + 1)  # Exponential backoff: 2, 4, 8, ...
            logger.warning(f"Rate limited (429). Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
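
The retry logic above can also be factored into a small reusable helper. The following is a minimal sketch, not SDK functionality: call_with_backoff and RateLimited are hypothetical names, and the helper assumes the raised exception exposes an HTTP status attribute. It adds random jitter so that several scripts retrying at once do not all wake at the same moment.

```python
import random
import time

class RateLimited(Exception):
    """Hypothetical stand-in for an SDK error that carries an HTTP status."""
    def __init__(self, status=429):
        super().__init__(f"HTTP {status}")
        self.status = status

def call_with_backoff(func, max_retries=5, base_delay=1.0, sleep=time.sleep):
    """Call func(); on a 429-style error wait base_delay * 2**attempt plus jitter, then retry."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception as exc:
            retryable = getattr(exc, "status", None) == 429
            if not retryable or attempt == max_retries:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            sleep(delay)

# Usage: a fake call that is rate limited twice, then succeeds
attempts = {"n": 0}

def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RateLimited()
    return "ok"

waits = []
result = call_with_backoff(flaky, sleep=waits.append)  # record waits instead of sleeping
print(result, len(waits))  # ok 2
```

Passing the sleep function as a parameter keeps the helper easy to test, because a test can record the requested delays without actually waiting.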

Step 3: Process and Aggregate Results

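The raw response from Genesys Cloud is a nested JSON structure. Each entry in results has a group (here, the userId) and a data list with one element per 30-minute bucket. Each element carries the bucket's interval and a metrics list whose stats hold sum, count, min, and max. Time metrics are reported in milliseconds. We need to flatten this data into a usable format, such as a Pandas DataFrame, to calculate utilization.

tHandle already includes hold time and after-call work, so adding tAcw and tHeld to it would count that time twice. Utilization for one bucket is therefore calculated from tHandle alone:
$$ \text{Utilization} = \frac{\text{tHandle}}{\text{Interval Duration}} $$

For a 30-minute interval (1,800,000 milliseconds, or 1800 seconds), this gives the percentage of the interval spent handling interactions. tAcw and tHeld remain useful as a breakdown of where the handle time went.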
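
A short worked example of the arithmetic may help. This sketch assumes metric values arrive in milliseconds and that tHandle already covers hold and after-call work, so it is used on its own; utilization_pct is an illustrative name, not an SDK function.

```python
INTERVAL_MS = 30 * 60 * 1000  # 1,800,000 ms in a 30-minute bucket

def utilization_pct(t_handle_ms: float, interval_ms: int = INTERVAL_MS) -> float:
    """Percentage of the bucket covered by handle time, capped at 100."""
    if interval_ms <= 0:
        raise ValueError("interval_ms must be positive")
    return min(t_handle_ms / interval_ms * 100, 100.0)

print(utilization_pct(900_000))    # 50.0 (15 of 30 minutes)
print(utilization_pct(2_400_000))  # 100.0 (capped)
```

The cap matters because a bucket can be credited with more handle time than its own length, for example when an agent works several interactions at once.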

import pandas as pd

INTERVAL_SECONDS = 30 * 60  # one 30-minute bucket

def process_utilization_data(results: list) -> pd.DataFrame:
    """
    Flattens aggregates query results into one row per agent per 30-minute bucket.

    Args:
        results: Result groups as plain dicts (call to_dict() on SDK model objects first).

    Returns:
        pd.DataFrame: Columns for timestamp, user ID, metrics in seconds, and utilization %.
    """
    data_rows = []

    for result in results:
        # 'group' holds the groupBy dimensions, here only userId.
        # The response carries IDs only; names need a separate user lookup.
        user_id = (result.get('group') or {}).get('userId')

        for bucket in result.get('data') or []:
            # Each bucket's interval is "start/end"; the start identifies the bucket
            interval_str = bucket.get('interval')
            if not interval_str:
                continue
            bucket_start = pd.to_datetime(interval_str.split('/')[0], utc=True)

            # Map metric name -> summed seconds (the API reports milliseconds)
            secs = {
                m.get('metric'): ((m.get('stats') or {}).get('sum') or 0) / 1000
                for m in bucket.get('metrics') or []
            }
            t_handle = secs.get('tHandle', 0.0)
            t_acw = secs.get('tAcw', 0.0)
            t_hold = secs.get('tHeld', 0.0)

            # tHandle already includes hold and ACW, so it is the total on its own
            total_active_seconds = t_handle

            # A bucket can be credited with more time than its length
            # (e.g. concurrent interactions), so cap at 100%
            utilization_pct = min(total_active_seconds / INTERVAL_SECONDS * 100, 100.0)

            data_rows.append({
                'timestamp': bucket_start,
                'user_id': user_id,
                't_handle_sec': t_handle,
                't_acw_sec': t_acw,
                't_hold_sec': t_hold,
                'total_active_sec': total_active_seconds,
                'utilization_pct': utilization_pct
            })

    if not data_rows:
        return pd.DataFrame()

    df = pd.DataFrame(data_rows)

    # Sort by user and time
    return df.sort_values(by=['user_id', 'timestamp']).reset_index(drop=True)
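
Once the data is in per-bucket rows, a daily figure per agent is a simple roll-up. The sketch below uses only the standard library and assumes rows shaped as (user_id, bucket_start, handled_seconds) tuples; that shape and the name daily_handle_minutes are illustrative, not something the API returns directly.

```python
from collections import defaultdict

def daily_handle_minutes(rows):
    """Sum handled seconds per (user_id, date) and convert to minutes."""
    totals = defaultdict(float)
    for user_id, bucket_start, handled_seconds in rows:
        day = bucket_start[:10]  # "YYYY-MM-DD" prefix of an ISO 8601 timestamp
        totals[(user_id, day)] += handled_seconds
    return {key: round(seconds / 60, 1) for key, seconds in totals.items()}

rows = [
    ("agent-a", "2023-10-01T09:00:00Z", 600),
    ("agent-a", "2023-10-01T09:30:00Z", 900),
    ("agent-b", "2023-10-01T09:00:00Z", 300),
]
summary = daily_handle_minutes(rows)
print(summary[("agent-a", "2023-10-01")])  # 25.0
```

With a DataFrame, the same roll-up is a groupby on the user ID and the date part of the timestamp.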

Complete Working Example

This script combines all steps into a single executable module. It authenticates, builds the query, fetches the data, processes it, and outputs a summary CSV.

import os
import sys
import time
import logging
from datetime import datetime, timedelta, timezone

import pandas as pd
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

INTERVAL_SECONDS = 30 * 60  # one 30-minute bucket


class GenesysUtilizationFetcher:
    def __init__(self, client_id, client_secret, environment="mypurecloud.com"):
        self.env = environment
        self.client_id = client_id
        self.client_secret = client_secret
        self.analytics_api = None

    def initialize(self):
        """Authenticates with Client Credentials and creates the Analytics API wrapper."""
        try:
            PureCloudPlatformClientV2.configuration.host = f"https://api.{self.env}"
            api_client = PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
                self.client_id, self.client_secret
            )
            self.analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)
            logger.info("API Client initialized successfully.")
        except Exception as e:
            logger.error(f"Failed to initialize API client: {e}")
            raise

    def fetch_data(self, start_date_iso, end_date_iso, user_ids=None, max_retries=5):
        """Runs the aggregates query for the date range, retrying on 429."""
        payload = {
            "interval": f"{start_date_iso}/{end_date_iso}",
            "granularity": "PT30M",
            "groupBy": ["userId"],
            "metrics": ["tHandle", "tAcw", "tHeld"],
        }

        if user_ids:
            payload["filter"] = {
                "type": "or",
                "predicates": [
                    {"type": "dimension", "dimension": "userId", "operator": "matches", "value": uid}
                    for uid in user_ids
                ],
            }

        for attempt in range(max_retries + 1):
            try:
                response = self.analytics_api.post_analytics_conversations_aggregates_query(payload)
                # The aggregates query is not paginated; one response holds all groups
                results = response.to_dict().get("results") or []
                logger.info(f"Retrieved {len(results)} result groups.")
                return results
            except ApiException as e:
                # Retry only on rate limiting, and only while attempts remain
                if e.status != 429 or attempt == max_retries:
                    raise
                wait = 2 ** (attempt + 1)
                logger.warning(f"Rate limited (429). Waiting {wait}s...")
                time.sleep(wait)

    def process_and_save(self, results, output_csv="utilization_report.csv"):
        """Processes results and saves to CSV. Always returns a DataFrame."""
        df = self._process_results(results)
        if df.empty:
            logger.warning("No data retrieved.")
            return df

        df.to_csv(output_csv, index=False)
        logger.info(f"Data saved to {output_csv}")
        return df

    @staticmethod
    def _process_results(results):
        """Helper to flatten result groups into one row per agent per bucket."""
        rows = []
        for result in results:
            user_id = (result.get('group') or {}).get('userId')
            for bucket in result.get('data') or []:
                interval_str = bucket.get('interval')
                if not interval_str:
                    continue

                # Metric sums arrive in milliseconds; convert to seconds
                secs = {
                    m.get('metric'): ((m.get('stats') or {}).get('sum') or 0) / 1000
                    for m in bucket.get('metrics') or []
                }
                # tHandle already includes hold and ACW, so it is the total
                total_active = secs.get('tHandle', 0.0)

                rows.append({
                    'timestamp': interval_str.split('/')[0],
                    'user_id': user_id,
                    't_handle_sec': total_active,
                    't_acw_sec': secs.get('tAcw', 0.0),
                    't_hold_sec': secs.get('tHeld', 0.0),
                    'total_active_sec': total_active,
                    'utilization_pct': min(total_active / INTERVAL_SECONDS * 100, 100.0)
                })

        return pd.DataFrame(rows)

if __name__ == "__main__":
    # Configuration
    CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
    CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
    ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")

    if not CLIENT_ID or not CLIENT_SECRET:
        print("Error: Set GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables.")
        sys.exit(1)

    # Date range: all of yesterday (UTC), from midnight up to the following midnight
    today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    start_date = (today - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    end_date = today.strftime("%Y-%m-%dT%H:%M:%S.000Z")

    print(f"Fetching data from {start_date} to {end_date}")

    try:
        fetcher = GenesysUtilizationFetcher(CLIENT_ID, CLIENT_SECRET, ENVIRONMENT)
        fetcher.initialize()

        # Fetch data (No user filter = all agents)
        results = fetcher.fetch_data(start_date, end_date)

        # Process and save
        df = fetcher.process_and_save(results, "agent_utilization_30m.csv")

        if not df.empty:
            print("\nSample Data:")
            print(df.head())
            print(f"\nTotal records: {len(df)}")

    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)

Common Errors & Debugging

Error: 403 Forbidden

  • Cause: The roles assigned to the OAuth client do not include the analytics:conversationAggregate:view permission.
  • Fix: In the Genesys Cloud Admin UI, open the OAuth client, assign a role that grants the permission, and then request a new token.
  • Code Check: Ensure you are passing the client_id and client_secret of that client, and that the API host matches your org's region.

Error: 429 Too Many Requests

  • Cause: Genesys Cloud rate-limits API requests, and scripts that issue many analytics queries in quick succession are a common way to hit the limit.
  • Fix: Implement exponential backoff. The provided code includes a retry loop that doubles the wait after each 429 response. If the issue persists, space the requests out or run fewer queries.

Error: Empty Results

  • Cause:
    1. No conversations occurred in the specified date range.
    2. The start and end of the date range are in the future.
    3. The filter or groupBy combination excludes every conversation, for example a user ID that handled nothing in the range.
  • Fix:
    • Verify dates are in ISO 8601 format with ‘Z’ suffix and that the start precedes the end.
    • Check the metric names. Hold time is tHeld, not tHold; an unrecognized metric name is typically rejected with a 400 error rather than returned as an empty result.
    • Run the query without the user filter to confirm that data exists for the range, then narrow it down.
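
The date checks above can be automated before the query is sent. This is a minimal standard-library sketch; validate_range is a hypothetical helper, and the rules it enforces (a 'Z' suffix, start before end, start not in the future) are the ones listed in this section rather than an exhaustive statement of what the API accepts.

```python
from datetime import datetime, timezone
from typing import Optional

def validate_range(start_str: str, end_str: str, now: Optional[datetime] = None) -> None:
    """Raise ValueError if the range is malformed, inverted, or starts in the future."""
    now = now or datetime.now(timezone.utc)
    parsed = []
    for label, value in (("start", start_str), ("end", end_str)):
        if not value.endswith("Z"):
            raise ValueError(f"{label} must be UTC with a 'Z' suffix: {value!r}")
        # fromisoformat() on Python 3.9/3.10 rejects 'Z', so swap in an explicit offset
        parsed.append(datetime.fromisoformat(value[:-1] + "+00:00"))
    start, end = parsed
    if end <= start:
        raise ValueError("end must be after start")
    if start > now:
        raise ValueError("start is in the future, so no data can exist yet")

fixed_now = datetime(2023, 10, 5, tzinfo=timezone.utc)
validate_range("2023-10-01T00:00:00Z", "2023-10-02T00:00:00Z", now=fixed_now)  # passes silently
try:
    validate_range("2023-10-06T00:00:00Z", "2023-10-07T00:00:00Z", now=fixed_now)
except ValueError as err:
    print(err)  # start is in the future, so no data can exist yet
```

The now parameter exists only so the check can be tested against a fixed clock; in normal use it can be left out.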

Error: Metric Values are Zero

  • Cause: The agent was not logged in or handled no interactions that were credited to the interval. Metrics without data are generally absent from the response, so zeros usually come from the default values filled in during processing.
  • Fix: This is expected behavior. Filter out rows where total_active_sec is 0 if you only want active periods.
