Building a Custom Interval Report Using the Analytics Conversations Aggregates Query

Building a Custom Interval Report Using the Analytics Conversations Aggregates Query

What You Will Build

  • You will build a script that queries Genesys Cloud CX for historical conversation data, aggregates it by time interval, and extracts specific metrics like average handle time and conversation counts.
  • This tutorial uses the Genesys Cloud CX Analytics API (/api/v2/analytics/conversations/aggregates/query).
  • The implementation is provided in Python using the official genesys-cloud-purecloud-platform-client SDK.

Prerequisites

  • OAuth Client Type: A Client Credentials flow OAuth client with the analytics:conversation:read scope.
  • SDK Version: genesys-cloud-purecloud-platform-client >= 184.0.0.
  • Language/Runtime: Python 3.9+.
  • External Dependencies:
    • genesys-cloud-purecloud-platform-client
    • pandas (for optional data manipulation, though not strictly required for the API call).

Install the SDK using pip:

pip install genesys-cloud-purecloud-platform-client pandas

Authentication Setup

The Genesys Cloud CX SDK handles OAuth token retrieval and refresh automatically when configured correctly. You must provide your Client ID, Client Secret, and the OAuth URL for your environment (e.g., https://api.mypurecloud.com for US East).

Initialize the platform client in your script. This object will be reused for all subsequent API calls.

from purecloud_platform_client import PlatformClient
import os

def get_platform_client() -> PlatformClient:
    """
    Initializes and returns a configured PlatformClient instance.
    """
    client = PlatformClient()
    
    # Configuration
    client.config.client_id = os.getenv("GENESYS_CLIENT_ID")
    client.config.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    client.config.base_url = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
    
    # Verify configuration
    if not client.config.client_id or not client.config.client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")
        
    return client

This setup ensures that every time an API call is made, the SDK checks the validity of the access token. If the token is expired, the SDK automatically refreshes it using the client credentials flow. This removes the need for manual token management in your business logic.

Implementation

Step 1: Construct the Query Body

The Analytics Aggregates API requires a specific JSON structure to define what data to retrieve. This structure consists of four main components:

  1. dateRange: Defines the start and end times for the data window.
  2. interval: Defines how the data is grouped (e.g., PT1H for hourly, P1D for daily).
  3. view: Specifies which metrics to calculate (e.g., conversations, handle-time).
  4. groupBy: Defines the dimensions for grouping (e.g., time, queue).

You must construct this body as a dictionary or a dataclass object compatible with the SDK.

from purecloud_platform_client.models import AnalyticsQueryRequest, DateRange, View, Metric, GroupBy

def build_aggregate_query(start_time: str, end_time: str) -> AnalyticsQueryRequest:
    """
    Constructs the request body for the analytics aggregates query.
    
    Args:
        start_time: ISO 8601 start time (e.g., "2023-10-01T00:00:00Z")
        end_time: ISO 8601 end time (e.g., "2023-10-02T00:00:00Z")
    
    Returns:
        AnalyticsQueryRequest object
    """
    # Define the date range
    date_range = DateRange(
        start=start_time,
        end=end_time
    )
    
    # Define the view and metrics
    # We want to calculate the count of conversations and the average handle time
    view = View(
        name="conversations",
        metrics=[
            Metric(name="count"),
            Metric(name="handle-time")
        ]
    )
    
    # Define grouping by time interval
    # PT1H means 1 hour intervals
    group_by = GroupBy(
        by="time",
        interval="PT1H"
    )
    
    # Assemble the request
    query_request = AnalyticsQueryRequest(
        date_range=date_range,
        view=view,
        group_by=group_by
    )
    
    return query_request

The interval field uses ISO 8601 duration format. Common values are PT1H (1 hour), PT30M (30 minutes), and P1D (1 day). Ensure the interval is compatible with your date range. For example, querying a 10-minute range with a 1-day interval will return no data or a single empty bucket.

Step 2: Execute the Query and Handle Pagination

The Analytics API does not support traditional cursor-based pagination for aggregate queries in the same way list endpoints do. Instead, it returns all requested buckets in a single response, provided the result set is within the API limits. However, large date ranges or fine-grained intervals can exceed the maximum response size.

The SDK method post_analytics_conversations_aggregates_query sends the constructed request body. You must handle potential errors such as 400 (Bad Request) for invalid date formats or 429 (Too Many Requests) for excessive polling.

from purecloud_platform_client.rest import ApiException

def execute_aggregate_query(client: PlatformClient, query_request: AnalyticsQueryRequest):
    """
    Executes the analytics query and returns the response.
    
    Args:
        client: The PlatformClient instance
        query_request: The constructed AnalyticsQueryRequest
    
    Returns:
        AnalyticsAggregateResponse object
    """
    try:
        # The SDK method corresponds to POST /api/v2/analytics/conversations/aggregates/query
        response = client.analytics.post_analytics_conversations_aggregates_query(
            body=query_request
        )
        return response
    except ApiException as e:
        print(f"Exception when calling AnalyticsApi->post_analytics_conversations_aggregates_query: {e}\n")
        if e.status == 400:
            print("Bad Request: Check your date range, interval, or metric names.")
        elif e.status == 403:
            print("Forbidden: Ensure your OAuth client has the 'analytics:conversation:read' scope.")
        elif e.status == 429:
            print("Rate Limited: Implement exponential backoff and retry.")
        elif e.status == 500:
            print("Internal Server Error: Genesys Cloud service issue. Retry later.")
        raise e

Note that this endpoint requires the analytics:conversation:read scope. If your OAuth client lacks this scope, the API returns a 403 Forbidden error. The error message in the response body usually contains a message field explaining the missing scope.

Step 3: Process the Result Buckets

The response contains a buckets array. Each bucket represents a time interval (or group) and contains the calculated metrics for that period. You must iterate through these buckets to extract the values.

The metrics field within each bucket is a dictionary where the keys are the metric names you requested (e.g., “count”, “handle-time”) and the values are the aggregated results.

def process_aggregate_response(response):
    """
    Parses the analytics response and extracts metrics into a list of dictionaries.
    
    Args:
        response: The AnalyticsAggregateResponse object
    
    Returns:
        List of dictionaries containing interval and metric values
    """
    results = []
    
    if not response or not response.buckets:
        print("No data returned for the specified query.")
        return results
        
    for bucket in response.buckets:
        # The 'interval' field in the bucket represents the start time of this interval
        interval_start = bucket.interval if bucket.interval else "Unknown"
        
        # Extract metrics
        count = 0
        handle_time = 0.0
        
        if bucket.metrics:
            if "count" in bucket.metrics:
                count = bucket.metrics["count"]
            if "handle-time" in bucket.metrics:
                # Handle time is returned in seconds, often as a float
                handle_time = bucket.metrics["handle-time"]
                
        results.append({
            "interval_start": interval_start,
            "conversation_count": count,
            "avg_handle_time_seconds": handle_time
        })
        
    return results

The handle-time metric returns the total handle time for all conversations in that bucket, not the average. To get the average, you must divide the handle-time by the count. If the count is zero, avoid division by zero errors.

Complete Working Example

This script combines all previous steps into a runnable module. It sets up authentication, builds the query for the last 24 hours, executes it, and prints the results.

import os
import sys
from datetime import datetime, timedelta
from purecloud_platform_client import PlatformClient
from purecloud_platform_client.rest import ApiException
from purecloud_platform_client.models import AnalyticsQueryRequest, DateRange, View, Metric, GroupBy

def get_platform_client() -> PlatformClient:
    client = PlatformClient()
    client.config.client_id = os.getenv("GENESYS_CLIENT_ID")
    client.config.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    client.config.base_url = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
    if not client.config.client_id or not client.config.client_secret:
        raise ValueError("Environment variables GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET are required.")
    return client

def build_aggregate_query(start_time: str, end_time: str) -> AnalyticsQueryRequest:
    date_range = DateRange(start=start_time, end=end_time)
    view = View(
        name="conversations",
        metrics=[Metric(name="count"), Metric(name="handle-time")]
    )
    group_by = GroupBy(by="time", interval="PT1H")
    return AnalyticsQueryRequest(date_range=date_range, view=view, group_by=group_by)

def execute_aggregate_query(client: PlatformClient, query_request: AnalyticsQueryRequest):
    try:
        return client.analytics.post_analytics_conversations_aggregates_query(body=query_request)
    except ApiException as e:
        print(f"API Error: {e.status} - {e.reason}")
        raise e

def process_aggregate_response(response):
    results = []
    if not response or not response.buckets:
        return results
        
    for bucket in response.buckets:
        interval_start = bucket.interval
        count = bucket.metrics.get("count", 0) if bucket.metrics else 0
        total_handle_time = bucket.metrics.get("handle-time", 0.0) if bucket.metrics else 0.0
        
        avg_handle_time = (total_handle_time / count) if count > 0 else 0.0
        
        results.append({
            "interval_start": interval_start,
            "conversation_count": count,
            "avg_handle_time_seconds": avg_handle_time
        })
    return results

def main():
    # Define time range: Last 24 hours
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=24)
    
    # Format to ISO 8601
    start_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    
    print(f"Querying analytics from {start_str} to {end_str}")
    
    # Initialize client
    client = get_platform_client()
    
    # Build query
    query_request = build_aggregate_query(start_str, end_str)
    
    # Execute query
    try:
        response = execute_aggregate_query(client, query_request)
    except Exception as e:
        print("Failed to execute query.")
        sys.exit(1)
        
    # Process results
    results = process_aggregate_response(response)
    
    # Output results
    print(f"{'Interval Start':<25} | {'Count':<10} | {'Avg Handle Time (s)':<20}")
    print("-" * 60)
    for row in results:
        print(f"{row['interval_start']:<25} | {row['conversation_count']:<10} | {row['avg_handle_time_seconds']:<20.2f}")

if __name__ == "__main__":
    main()

To run this script, export your credentials in the terminal:

export GENESYS_CLIENT_ID="your_client_id"
export GENESYS_CLIENT_SECRET="your_client_secret"
python analytics_report.py

Common Errors & Debugging

Error: 400 Bad Request

Cause: The date range is invalid, the interval format is incorrect, or the metric name does not exist for the selected view.

Fix: Verify that start_time is before end_time. Ensure the interval uses ISO 8601 duration format (e.g., PT1H not 1H). Check the Genesys Cloud API documentation for valid metric names under the conversations view.

# Correct interval format
group_by = GroupBy(by="time", interval="PT1H")

# Incorrect interval format
# group_by = GroupBy(by="time", interval="1 hour")

Error: 403 Forbidden

Cause: The OAuth client lacks the required scope.

Fix: Ensure the OAuth client has the analytics:conversation:read scope. You can verify scopes in the Genesys Cloud Admin Console under Platform > OAuth Clients.

Error: 429 Too Many Requests

Cause: You have exceeded the rate limit for the Analytics API.

Fix: Implement exponential backoff. Do not poll the API more than once every few seconds. The Analytics API is designed for batch queries, not real-time streaming.

import time

def execute_with_retry(client, query_request, max_retries=3):
    for attempt in range(max_retries):
        try:
            return execute_aggregate_query(client, query_request)
        except ApiException as e:
            if e.status == 429:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                raise e
    raise Exception("Max retries exceeded")

Error: Empty Buckets

Cause: No conversations occurred during the specified time range, or the time range is in the future.

Fix: Verify the date range covers a period with activity. Ensure the timezone is handled correctly if your organization operates in multiple time zones. The API uses UTC for all timestamps.

Official References