How to query agent utilization metrics broken down by 30-minute intervals

What You Will Build

  • A Python script that retrieves agent-level conversation metrics, specifically handle time (tHandle), after-call work (tAcw), and hold time (named tHeld in the aggregates API).
  • The script uses the Genesys Cloud CX Analytics API (/api/v2/analytics/conversations/aggregates/query), which returns pre-aggregated metrics instead of individual conversation records.
  • The query requests a 30-minute granularity, and the script reshapes the response into per-agent time buckets for utilization reporting.

Prerequisites

  • OAuth Client: A Genesys Cloud OAuth client that uses the Client Credentials grant.
  • Required Permissions: Client Credentials clients are authorized through roles rather than scopes. Assign the client a role that includes analytics:conversationAggregate:view.
  • SDK Version: PureCloudPlatformClientV2 (the Genesys Cloud Python SDK), version 140.0.0 or later.
  • Runtime: Python 3.8+.
  • Dependencies:
    • PureCloudPlatformClientV2
    • python-dotenv (for loading credentials from a .env file)
    • pandas (optional, for further analysis of the results)

Authentication Setup

Genesys Cloud APIs require a valid OAuth 2.0 access token. The Client Credentials flow is the standard for server-to-server integrations. Store your CLIENT_ID and CLIENT_SECRET securely, and set the API host for your org's region (for example, https://api.mypurecloud.com for US East).

The SDK attaches the access token to the ApiClient instance returned at login. Reuse that instance for every API call rather than authenticating again for each request.

import os

import PureCloudPlatformClientV2
from dotenv import load_dotenv

# Load environment variables from a local .env file
load_dotenv()

def get_platform_client() -> PureCloudPlatformClientV2.api_client.ApiClient:
    """
    Authenticates with the Client Credentials grant and returns an ApiClient.
    """
    # GENESYS_API_HOST is this script's own variable name; it defaults to the US East host
    api_host = os.getenv("GENESYS_API_HOST", "https://api.mypurecloud.com")
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")

    if not all([client_id, client_secret]):
        raise EnvironmentError("Missing required environment variables: GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET")

    # Point the SDK at the API host for your org's region
    PureCloudPlatformClientV2.configuration.host = api_host

    # Requests a token and stores it on the returned ApiClient
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        client_id, client_secret
    )

Implementation

Step 1: Construct the Analytics Query

The Analytics API in Genesys Cloud is query-driven. An aggregates query has four core components:

  1. Interval: The time window for the data, written as an ISO 8601 start/end pair.
  2. Granularity: The size of each time bucket within the interval (PT30M for 30 minutes).
  3. Group By: The dimensions to slice the data by (userId for per-agent results).
  4. Metrics: Which metrics to return (tHandle, tAcw, and tHeld).

An optional filter restricts the query to specific users or queues.

For this tutorial, we will query the last 24 hours of data, grouped by agent and split into 30-minute buckets.

Critical Note: The same durations can be derived from the conversation details endpoint (/api/v2/analytics/conversations/details/query), but that means fetching every conversation and summing the durations yourself. Use the details endpoint only when you need granular per-conversation proof. The aggregates endpoint returns the totals for each agent and bucket directly, which is far more efficient, so this tutorial uses it.

from PureCloudPlatformClientV2 import ConversationAggregationQuery

def build_query_request(start_time: str, end_time: str) -> ConversationAggregationQuery:
    """
    Builds the request body for the conversation aggregates query.

    Args:
        start_time: ISO 8601 start time (e.g., "2023-10-27T00:00:00.000Z")
        end_time: ISO 8601 end time (e.g., "2023-10-28T00:00:00.000Z")

    Returns:
        ConversationAggregationQuery object
    """
    query = ConversationAggregationQuery()

    # The interval is a single "start/end" string
    query.interval = f"{start_time}/{end_time}"

    # Split the interval into 30-minute buckets
    query.granularity = "PT30M"

    # One result group per agent
    query.group_by = ["userId"]

    # Handle time, after-call work, and hold time
    query.metrics = ["tHandle", "tAcw", "tHeld"]

    # No filter is set, so all agents are returned. In production, filter by
    # user ID or queue ID to reduce payload size.
    return query
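
The start_time and end_time arguments for build_query_request can be derived from the current time. The sketch below is one way to do that, assuming you want the window to end on the most recent half-hour boundary in UTC so that buckets line up with :00 and :30. The helper name last_24h_window is hypothetical.

```python
from datetime import datetime, timedelta, timezone

def last_24h_window(now=None):
    """Return (start, end) ISO 8601 strings for the 24 hours ending at the last half-hour boundary."""
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    # Round down to :00 or :30
    end = now.replace(minute=30 if now.minute >= 30 else 0, second=0, microsecond=0)
    start = end - timedelta(hours=24)
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    return start.strftime(fmt), end.strftime(fmt)

start_str, end_str = last_24h_window(datetime(2023, 10, 28, 14, 47, 12, tzinfo=timezone.utc))
print(start_str, end_str)  # 2023-10-27T14:30:00.000Z 2023-10-28T14:30:00.000Z
```

Called without an argument, the helper uses the current UTC time.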

Step 2: Execute the Query

The SDK method for this endpoint is AnalyticsApi.post_analytics_conversations_aggregates_query, which takes the ConversationAggregationQuery as its body.

Important: The aggregates query is not paginated. The response carries every result group in its results list, so there is no page loop to write. To keep the response small, shorten the interval or add a filter.

from PureCloudPlatformClientV2 import AnalyticsApi
from PureCloudPlatformClientV2.rest import ApiException

def fetch_analytics_data(client, request) -> list:
    """
    Runs the aggregates query and returns the result groups as plain dictionaries.

    Args:
        client: The authenticated ApiClient instance.
        request: The constructed ConversationAggregationQuery.

    Returns:
        A list of result groups (one per agent), or an empty list on failure.
    """
    analytics_api = AnalyticsApi(client)

    try:
        response = analytics_api.post_analytics_conversations_aggregates_query(request)
    except ApiException as e:
        print(f"Error fetching analytics data: {e.status} {e.reason}")
        return []

    # results is None when nothing matched the query
    return [result.to_dict() for result in (response.results or [])]

Note that fetch_analytics_data returns an empty list when the request fails, so an empty result can mean either that no data matched or that an error occurred. Check the printed error message to tell the two apart.

Step 3: Process Results into 30-Minute Buckets

The response contains one result group per agent. Each group carries a group dictionary holding the userId, and a data list with one entry per 30-minute bucket. Each entry has an interval string (an ISO 8601 "start/end" pair) and a metrics list whose items hold the metric name and a stats object. The sum field of stats is the total duration in milliseconds.

We need to:

  1. Iterate through the result groups and their buckets.
  2. Extract the sums for tHandle, tAcw, and tHeld.
  3. Map these to the agent ID and the time bucket, converting milliseconds to seconds.
  4. Handle missing data (if an agent had no conversations in a 30-minute block, that block will not appear in the result).

from datetime import datetime

METRIC_NAMES = ("tHandle", "tAcw", "tHeld")

def process_analytics_entities(entities: list) -> dict:
    """
    Processes aggregate result groups into a dictionary keyed by Agent ID and Time Bucket.

    Args:
        entities: List of result groups as plain dictionaries (response.results converted with to_dict()).

    Returns:
        Dictionary: { agent_id: { time_bucket: { tHandle: float, tAcw: float, tHeld: float } } }
        All durations are in seconds.
    """
    processed_data = {}

    for entity in entities:
        # Extract Agent ID from the group dictionary
        agent_id = (entity.get("group") or {}).get("userId")
        if not agent_id:
            continue

        for bucket in entity.get("data") or []:
            # The interval is "start/end"; the start identifies the bucket
            interval_start = str(bucket.get("interval") or "").split("/")[0]
            try:
                dt = datetime.fromisoformat(interval_start.replace("Z", "+00:00"))
            except ValueError:
                continue
            bucket_key = dt.strftime("%Y-%m-%d %H:%M")

            # Start from zero so that metrics absent from a bucket are reported as 0
            totals = processed_data.setdefault(agent_id, {}).setdefault(
                bucket_key, {name: 0.0 for name in METRIC_NAMES}
            )

            for metric in bucket.get("metrics") or []:
                name = metric.get("metric")
                if name in totals:
                    # stats.sum is in milliseconds; convert to seconds
                    total_ms = (metric.get("stats") or {}).get("sum") or 0
                    totals[name] += total_ms / 1000.0

    return processed_data
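
Because empty buckets are absent from the result, a report that needs a row for every 30-minute slot has to add them back. The following is a minimal sketch, assuming the nested dictionary shape returned by process_analytics_entities and bucket keys in the "%Y-%m-%d %H:%M" format. The name fill_missing_buckets and its parameters are hypothetical.

```python
from datetime import datetime, timedelta

BUCKET_FORMAT = "%Y-%m-%d %H:%M"

def fill_missing_buckets(processed, start, end, metric_names, step_minutes=30):
    """Return a copy of processed where every agent has an entry for every bucket in [start, end)."""
    # Build the full list of bucket keys for the window
    keys = []
    current = start
    while current < end:
        keys.append(current.strftime(BUCKET_FORMAT))
        current += timedelta(minutes=step_minutes)

    filled = {}
    for agent_id, buckets in processed.items():
        filled[agent_id] = {
            # Keep existing values; use zeros where the agent had no activity
            key: dict(buckets.get(key, {name: 0.0 for name in metric_names}))
            for key in keys
        }
    return filled

sample = {"agent-1": {"2023-10-27 09:30": {"tHandle": 420.0, "tAcw": 35.0}}}
filled = fill_missing_buckets(
    sample,
    start=datetime(2023, 10, 27, 9, 0),
    end=datetime(2023, 10, 27, 10, 30),
    metric_names=("tHandle", "tAcw"),
)
for key, metrics in filled["agent-1"].items():
    print(key, metrics)
```

Buckets that fall outside the start and end arguments are dropped from the copy, so pass the same window that was used for the query.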

Complete Working Example

This script combines authentication, query construction, query execution, and data processing in a single file, including the process_analytics_entities function from Step 3.

import os
from datetime import datetime, timedelta, timezone

import PureCloudPlatformClientV2
from dotenv import load_dotenv
from PureCloudPlatformClientV2.rest import ApiException

load_dotenv()

METRIC_NAMES = ("tHandle", "tAcw", "tHeld")

def get_platform_client():
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    if not all([client_id, client_secret]):
        raise EnvironmentError("Missing required environment variables.")

    # GENESYS_API_HOST is this script's own variable name; it defaults to the US East host
    PureCloudPlatformClientV2.configuration.host = os.getenv(
        "GENESYS_API_HOST", "https://api.mypurecloud.com"
    )
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        client_id, client_secret
    )

def build_query_request(start_time: str, end_time: str):
    query = PureCloudPlatformClientV2.ConversationAggregationQuery()
    query.interval = f"{start_time}/{end_time}"
    query.granularity = "PT30M"
    query.group_by = ["userId"]
    query.metrics = list(METRIC_NAMES)
    return query

def fetch_all_entities(client, request) -> list:
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(client)
    try:
        response = analytics_api.post_analytics_conversations_aggregates_query(request)
    except ApiException as e:
        print(f"Error: {e.status} {e.reason}")
        return []
    # The aggregates query is not paginated; results holds every group
    return [result.to_dict() for result in (response.results or [])]

def process_analytics_entities(entities: list) -> dict:
    processed_data = {}
    for entity in entities:
        agent_id = (entity.get("group") or {}).get("userId")
        if not agent_id:
            continue
        for bucket in entity.get("data") or []:
            # The interval is "start/end"; the start identifies the bucket
            interval_start = str(bucket.get("interval") or "").split("/")[0]
            try:
                dt = datetime.fromisoformat(interval_start.replace("Z", "+00:00"))
            except ValueError:
                continue
            totals = processed_data.setdefault(agent_id, {}).setdefault(
                dt.strftime("%Y-%m-%d %H:%M"), {name: 0.0 for name in METRIC_NAMES}
            )
            for metric in bucket.get("metrics") or []:
                if metric.get("metric") in totals:
                    # stats.sum is in milliseconds; convert to seconds
                    total_ms = (metric.get("stats") or {}).get("sum") or 0
                    totals[metric["metric"]] += total_ms / 1000.0
    return processed_data

def main():
    client = get_platform_client()

    # Time range: the 24 hours ending at the most recent half-hour boundary (UTC)
    now = datetime.now(timezone.utc)
    end_time = now.replace(minute=30 if now.minute >= 30 else 0, second=0, microsecond=0)
    start_time = end_time - timedelta(hours=24)

    # Format as ISO 8601
    start_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    end_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")

    # Build Request
    request = build_query_request(start_str, end_str)

    # Fetch Data
    entities = fetch_all_entities(client, request)
    print(f"Fetched {len(entities)} result groups.")

    # Process Data
    processed = process_analytics_entities(entities)

    # Output Sample
    for agent_id, buckets in processed.items():
        print(f"\nAgent ID: {agent_id}")
        for bucket, metrics in sorted(buckets.items()):
            print(f"  Time: {bucket} | Handle: {metrics['tHandle']:.2f}s | ACW: {metrics['tAcw']:.2f}s | Hold: {metrics['tHeld']:.2f}s")

if __name__ == "__main__":
    main()
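
To turn the bucket totals into a utilization figure, one simple approach is to divide handle time by the bucket length. The sketch below assumes that definition, which is a reporting convention and not a Genesys Cloud metric, and assumes the durations are already in seconds. The helper name utilization_by_bucket is hypothetical.

```python
def utilization_by_bucket(buckets, bucket_seconds=1800):
    """Map each bucket key to handle time as a percentage of the bucket length."""
    return {
        key: round(100.0 * metrics.get("tHandle", 0.0) / bucket_seconds, 1)
        for key, metrics in buckets.items()
    }

# Example input: one agent's buckets, durations in seconds
agent_buckets = {
    "2023-10-27 09:00": {"tHandle": 900.0, "tAcw": 60.0},
    "2023-10-27 09:30": {"tHandle": 1350.0, "tAcw": 120.0},
}
for key, pct in sorted(utilization_by_bucket(agent_buckets).items()):
    print(f"{key}: {pct}%")
# 2023-10-27 09:00: 50.0%
# 2023-10-27 09:30: 75.0%
```

Values above 100% are possible when an agent handles several interactions at the same time, so treat the figure as an indicator, not a strict ratio.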

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: Invalid CLIENT_ID or CLIENT_SECRET, or the OAuth token has expired.
  • Fix: Verify your credentials in the .env file. Ensure the OAuth client is active in the Genesys Cloud Admin Console. Restart the script to generate a fresh token.

Error: 403 Forbidden

  • Cause: The role assigned to the OAuth client lacks the analytics:conversationAggregate:view permission.
  • Fix: Open your OAuth client in the Genesys Cloud Admin console and assign it a role that includes analytics:conversationAggregate:view. Save, then request a new token.

Error: 429 Too Many Requests

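  • Cause: You exceeded the API rate limit. The Analytics API has specific rate limits per tenant.
  • Fix: Wait for the delay given in the Retry-After response header before retrying, and fall back to exponential backoff when the header is absent. Add a time.sleep() between requests if you are polling frequently.

import time
from PureCloudPlatformClientV2.rest import ApiException

def query_with_retry(analytics_api, query, max_attempts=5):
    """Runs the aggregates query, waiting and retrying when rate limited."""
    for attempt in range(max_attempts):
        try:
            return analytics_api.post_analytics_conversations_aggregates_query(query)
        except ApiException as e:
            if e.status != 429 or attempt == max_attempts - 1:
                raise
            # Fall back to exponential backoff if Retry-After is absent
            retry_after = int((e.headers or {}).get("Retry-After", 2 ** attempt))
            print(f"Rate limited. Waiting {retry_after} seconds.")
            time.sleep(retry_after)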
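
The exponential backoff recommended for 429 responses can also be sketched generically, independent of the SDK. In the example below, call_with_backoff, its parameters, and the RateLimited exception are hypothetical stand-ins; in a real script the retryable condition would be a 429 response.

```python
import random
import time

class RateLimited(Exception):
    """Stand-in for whatever error your HTTP layer raises on a 429."""

def call_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Call fn(), retrying on RateLimited with exponentially growing, jittered delays."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimited:
            if attempt == max_attempts - 1:
                raise
            # Delay doubles each attempt: 1s, 2s, 4s, ... capped at max_delay
            delay = min(max_delay, base_delay * (2 ** attempt))
            # Full jitter: wait a random time up to the computed delay
            sleep(random.uniform(0, delay))

# Simulated call that is rate limited twice, then succeeds
attempts = {"count": 0}

def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RateLimited()
    return "ok"

result = call_with_backoff(flaky, sleep=lambda seconds: None)
print(result, attempts["count"])  # ok 3
```

The sleep parameter exists only so the example can run without waiting; leave it at its default in real use.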

Error: Empty Results

  • Cause: No conversations occurred in the specified time window, or the group_by filters are too restrictive.
  • Fix: Verify the start_time and end_time. Ensure there was actual call volume in your org during that period. Check if the user IDs in the response match your expectations.
