Building a Custom Interval Report Using the Genesys Cloud Analytics Aggregates API

What You Will Build

You will build a Python script that queries the Genesys Cloud Analytics API for conversation metrics aggregated into fixed time intervals. The script authenticates with OAuth 2.0 client credentials, builds a query body from a date range, a bucket size, group-by dimensions, and metric selections, and retrieves the complete dataset for the reporting period. The examples use the official Python SDK; the same request can also be sent over raw HTTP.

Prerequisites

  • Genesys Cloud Account: An organization in which you can create OAuth clients and assign roles to them.
  • OAuth Client Credentials: The client ID and secret of an OAuth client that uses the Client Credentials grant.
  • Required Permissions: A client credentials token carries the permissions of the roles assigned to the OAuth client, not OAuth scopes. The assigned role needs:
    • Analytics > Conversation Aggregate > View (analytics:conversationAggregate:view), required for querying conversation aggregates
  • Python Environment: Python 3.8 or higher.
  • Dependencies:
    • PureCloudPlatformClientV2 (Official Genesys Cloud Python SDK)
    • requests (For raw HTTP examples)
    • pydantic (Optional, for data validation)

Install the dependencies using pip:

pip install PureCloudPlatformClientV2 requests

Authentication Setup

Genesys Cloud uses OAuth 2.0 for authentication. For backend scripts and integrations that run without a signed-in user, the Client Credentials grant is the standard approach. It requires an OAuth client created with that grant type.

Creating a Client Credentials OAuth Client

  1. Log in to Genesys Cloud and open Admin.
  2. Under Integrations, click OAuth.
  3. Click Add Client.
  4. Enter an app name and, under Grant Types, select Client Credentials.
  5. On the Roles tab, assign a role that includes the Analytics > Conversation Aggregate > View permission.
  6. Click Save.
  7. Copy the Client ID and Client Secret. Store them securely and do not commit them to version control.

Implementing Authentication in Python

The Genesys Cloud Python SDK requests the access token for you: ApiClient.get_client_credentials_token() exchanges the client ID and secret for a token and stores it on the client. Understanding the underlying token request is still useful for debugging.

Below is the setup for the SDK client.

import os
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.api_client import ApiClient

# API hosts, keyed by the AWS region of the Genesys Cloud organization
API_HOSTS = {
    "us-east-1": "https://api.mypurecloud.com",
    "us-east-2": "https://api.use2.us-gov-pure.cloud",
    "us-west-2": "https://api.usw2.pure.cloud",
    "eu-west-1": "https://api.mypurecloud.ie",
    "ap-southeast-2": "https://api.mypurecloud.com.au",
    "ap-northeast-1": "https://api.mypurecloud.jp"
}

def get_platform_client(client_id: str, client_secret: str, env: str = "us-east-1") -> ApiClient:
    """
    Initializes and returns an authenticated ApiClient instance.
    
    Args:
        client_id: The OAuth Client ID.
        client_secret: The OAuth Client Secret.
        env: The Genesys Cloud environment (e.g., us-east-1, eu-west-1).
    
    Returns:
        ApiClient: A client that holds an access token and is ready to make API calls.
    """
    # Fail loudly on an unknown region instead of silently using the wrong host
    if env not in API_HOSTS:
        raise ValueError(f"Unknown environment: {env}")
    
    # Point the SDK at the API host of the region before creating the client
    PureCloudPlatformClientV2.configuration.host = API_HOSTS[env]
    
    # Exchange the client ID and secret for an access token.
    # The token is stored on the returned ApiClient.
    api_client = ApiClient().get_client_credentials_token(client_id, client_secret)
    
    return api_client

# Example usage
# CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
# CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
# client = get_platform_client(CLIENT_ID, CLIENT_SECRET)

The token is requested once, when get_client_credentials_token() is called, and is then attached to every API call made through the returned client. Tokens expire, so a long-running process should be prepared to request a new one when a call fails with 401.
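For debugging, it helps to see the token request that is made on your behalf. The sketch below only assembles that request, assuming the standard client credentials exchange against the login host of your region (login.mypurecloud.com for us-east-1). build_token_request is a hypothetical helper, not part of the SDK, and nothing is sent over the network.

```python
import base64


def build_token_request(client_id: str, client_secret: str,
                        login_host: str = "https://login.mypurecloud.com") -> dict:
    """Assemble the parts of a client credentials token request.

    Hypothetical helper for illustration. login_host is assumed to be the
    login host that matches the region of your organization.
    """
    # HTTP Basic credentials: base64("client_id:client_secret")
    raw = f"{client_id}:{client_secret}".encode("utf-8")
    basic = base64.b64encode(raw).decode("ascii")
    return {
        "url": f"{login_host}/oauth/token",
        "headers": {
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        "data": {"grant_type": "client_credentials"},
    }


request_parts = build_token_request("my-client-id", "my-client-secret")
print(request_parts["url"])
```

Passing these parts to requests.post(url, headers=..., data=...) should return JSON containing an access_token, which is then sent as "Authorization: Bearer <token>" on API calls.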

Implementation

Step 1: Constructing the Analytics Query Body

The core of the Analytics Aggregates API is the request body. This body defines what data you want, when it occurred, and how it should be grouped.

The endpoint is POST /api/v2/analytics/conversations/aggregates/query.

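Key components of the request body:

  1. interval: Defines the start and end of the reporting period as a single ISO 8601 interval string (start/end).
  2. granularity: Defines the size of each time bucket as an ISO 8601 duration (e.g., PT1H for hourly, P1D for daily).
  3. groupBy: Defines how to slice the data (e.g., by queueId, mediaType, or wrapUpCode).
  4. metrics: Defines which metrics to retrieve (e.g., nOffered, tHandle, tAbandon, oServiceLevel).
  5. filter: Restricts the query to matching conversations (e.g., those of a single queue).

Metric names carry a prefix: n for counts and t for timers. A timer such as tHandle reports both how often it occurred (count) and the total duration in milliseconds (sum), so one metric yields both the handled count and the handle time.

Below is the request body as a Python dict. It has the same shape as the JSON that the SDK's ConversationAggregationQuery model serializes to.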

# This is the JSON body expressed as a Python dict.
# In code, you can pass this dict to the SDK or build the equivalent SDK model.

query_body = {
    # Reporting period as an ISO 8601 interval: start/end
    "interval": "2023-10-01T00:00:00.000Z/2023-10-02T00:00:00.000Z",
    "granularity": "PT1H",  # ISO 8601 duration: one bucket per hour
    "groupBy": [
        "queueId"
    ],
    "metrics": [
        "nOffered",       # conversations offered to the queue
        "tHandle",        # count = handled conversations, sum = handle time (ms)
        "tAbandon",       # count = abandoned conversations
        "tAcw",           # after-call work (wrap-up) time
        "oServiceLevel"   # service level for the queue
    ],
    "filter": {
        "type": "and",
        "predicates": [
            {
                "type": "dimension",
                "dimension": "queueId",
                "operator": "matches",
                "value": "12345678-1234-1234-1234-123456789012" # Replace with actual Queue ID
            }
        ]
    }
}

Note on Granularity: The granularity string must be a valid ISO 8601 duration. Common values:

  • PT1H: 1 Hour
  • PT15M: 15 Minutes
  • P1D: 1 Day
  • P1W: 1 Week

Note on Grouping: You can group by multiple dimensions. For example, ["queueId", "mediaType"] returns data for every combination of queue and media type.
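To see what a duration such as PT1H means for the size of the result, the sketch below lists the bucket start times that a reporting period breaks into. It is plain date arithmetic with the standard library, not an API call. bucket_starts is a hypothetical helper, and the step is passed as a timedelta rather than parsed from the ISO 8601 string.

```python
from datetime import datetime, timedelta, timezone


def bucket_starts(start: datetime, end: datetime, step: timedelta) -> list:
    """Return the start time of every bucket of length `step` in [start, end)."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    starts = []
    current = start
    while current < end:
        starts.append(current)
        current += step
    return starts


day_start = datetime(2023, 10, 1, tzinfo=timezone.utc)
day_end = day_start + timedelta(days=1)

hourly = bucket_starts(day_start, day_end, timedelta(hours=1))
quarter_hourly = bucket_starts(day_start, day_end, timedelta(minutes=15))
print(len(hourly), len(quarter_hourly))  # 24 96
```

Grouping multiplies these numbers: one day at 15-minute buckets across 10 queues can produce up to 960 rows.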

Step 2: Executing the Query with the Python SDK

Now we implement the actual API call using the AnalyticsApi.

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.api_client import ApiClient
from PureCloudPlatformClientV2.rest import ApiException

def get_conversation_aggregates(
    platform_client: ApiClient,
    queue_id: str,
    start_date: str,
    end_date: str,
    granularity: str = "PT1H"
) -> dict:
    """
    Queries the Analytics API for conversation aggregates for a specific queue.
    
    Args:
        platform_client: The authenticated ApiClient.
        queue_id: The ID of the queue to query.
        start_date: ISO 8601 start of the period (e.g., "2023-10-01T00:00:00.000Z").
        end_date: ISO 8601 end of the period (e.g., "2023-10-02T00:00:00.000Z").
        granularity: ISO 8601 duration of each time bucket (default "PT1H").
    
    Returns:
        dict: The response converted to a dictionary.
    """
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(platform_client)
    
    # A plain dict is serialized as-is, so the body uses the JSON field names
    query_body = {
        "interval": f"{start_date}/{end_date}",
        "granularity": granularity,
        "groupBy": ["queueId"],
        "metrics": [
            "nOffered",
            "tHandle",
            "tAbandon",
            "tAcw",
            "oServiceLevel"
        ],
        "filter": {
            "type": "and",
            "predicates": [
                {
                    "type": "dimension",
                    "dimension": "queueId",
                    "operator": "matches",
                    "value": queue_id
                }
            ]
        }
    }
    
    try:
        # Execute the query. The method takes only the body.
        response = analytics_api.post_analytics_conversations_aggregates_query(query_body)
        
        return response.to_dict()
        
    except ApiException as e:
        print(f"Exception when calling AnalyticsApi->post_analytics_conversations_aggregates_query: {e}\n")
        if e.status == 401:
            print("Authentication failed. Check your Client ID and Secret.")
        elif e.status == 403:
            print("Forbidden. Check that the OAuth client's role includes analytics:conversationAggregate:view.")
        elif e.status == 429:
            print("Rate limited. Wait and retry.")
        # Re-raise with the original traceback
        raise
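The same query can be sent without the SDK. The sketch below builds, but does not send, the raw HTTP request with the standard library so that it can be inspected offline. build_aggregates_request is a hypothetical helper; the access token is assumed to have been obtained already, and the body shown is only a placeholder for the query body from Step 1.

```python
import json
import urllib.request


def build_aggregates_request(api_host: str, access_token: str, body: dict) -> urllib.request.Request:
    """Build a POST request for the conversation aggregates query endpoint."""
    return urllib.request.Request(
        url=f"{api_host}/api/v2/analytics/conversations/aggregates/query",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


placeholder_body = {"placeholder": True}  # use the query body from Step 1 here
request = build_aggregates_request(
    "https://api.mypurecloud.com",
    "example-token",  # placeholder, not a real token
    placeholder_body,
)
print(request.get_method(), request.full_url)
```

urllib.request.urlopen(request) would send it. With the requests library, the equivalent call is requests.post(url, headers=headers, json=body).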

Step 3: Retrieving the Complete Dataset

The aggregates query endpoint does not page. It takes no page size or page ID, and the response carries no nextPageId: a single call returns every group and every time bucket that matches the query. The size of the response is therefore driven by the length of the reporting period, the granularity, and the number of group-by combinations. For a long period, split the range into consecutive windows, run one query per window, and concatenate the results.

The function below reuses get_conversation_aggregates from Step 2. The seven-day default window is an arbitrary choice; pick a size that keeps each response manageable.

from datetime import datetime, timedelta

TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.000Z"

def get_all_aggregates(
    platform_client,
    queue_id: str,
    start: datetime,
    end: datetime,
    granularity: str = "PT1H",
    window: timedelta = timedelta(days=7)
) -> list:
    """
    Retrieves conversation aggregates for a long period, one window at a time.
    
    Args:
        platform_client: The authenticated client.
        queue_id: The ID of the queue.
        start: Start of the reporting period (UTC).
        end: End of the reporting period (UTC).
        granularity: ISO 8601 duration of each time bucket.
        window: Maximum length of the period covered by one request.
    
    Returns:
        list: The result containers from all windows.
    """
    all_results = []
    window_start = start
    
    while window_start < end:
        # The last window is cut short so it never runs past the end
        window_end = min(window_start + window, end)
        
        response = get_conversation_aggregates(
            platform_client,
            queue_id,
            window_start.strftime(TIME_FORMAT),
            window_end.strftime(TIME_FORMAT),
            granularity
        )
        
        # "results" is empty or None when the window has no data
        all_results.extend(response.get("results") or [])
        
        window_start = window_end
        
    return all_results

Each window produces its own result container per queue, so the same queue appears once per window in the returned list.

Step 4: Processing the Results

The response contains a results list. Each entry is a container for one combination of group-by values and holds the time buckets for that combination.

Key fields:

  • group: A dictionary mapping each group-by dimension to its value (e.g., {"queueId": "..."}). Queues are identified by ID, not by name.
  • data: A list of time buckets.
  • interval: Within each bucket, the start and end of the bucket as an ISO 8601 interval.
  • metrics: Within each bucket, a list with one entry per metric. Each entry has the metric name and a stats object. For nOffered, stats.count is the number of conversations. For timers such as tHandle, stats.count is the number of occurrences and stats.sum is the total duration in milliseconds. For oServiceLevel, stats.ratio is the fraction of conversations answered within the target.

def process_aggregates(results: list) -> list:
    """
    Processes the raw result containers into a cleaner format.
    
    Args:
        results: The "results" list of the response, as dictionaries.
    
    Returns:
        list: A list of dictionaries, one per queue and time bucket.
    """
    cleaned_data = []
    
    for container in results:
        queue_id = (container.get("group") or {}).get("queueId", "Unknown")
        
        for bucket in container.get("data") or []:
            # Index the stats by metric name for easy lookup
            stats = {m["metric"]: m.get("stats") or {} for m in bucket.get("metrics") or []}
            
            offer_count = stats.get("nOffered", {}).get("count") or 0
            handled_count = stats.get("tHandle", {}).get("count") or 0
            abandoned_count = stats.get("tAbandon", {}).get("count") or 0
            service_level = (stats.get("oServiceLevel", {}).get("ratio") or 0) * 100
            
            # Average handle time is per handled conversation, not per offer
            handle_time_ms = stats.get("tHandle", {}).get("sum") or 0
            avg_handle_time = handle_time_ms / 1000 / handled_count if handled_count > 0 else 0
            
            cleaned_data.append({
                "date": (bucket.get("interval") or "").split("/")[0],
                "queue_id": queue_id,
                "offer_count": offer_count,
                "handled_count": handled_count,
                "abandoned_count": abandoned_count,
                "service_level_percent": service_level,
                "avg_handle_time_seconds": avg_handle_time
            })
        
    return cleaned_data
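Once the rows are flattened, an interval report usually needs totals for the whole period. The sketch below sums the per-interval rows and derives an abandon rate. summarize_rows is a hypothetical helper, the sample rows are made up for illustration, and the field names are assumed to match the dictionaries returned by process_aggregates.

```python
def summarize_rows(rows: list) -> dict:
    """Sum the per-interval counts and derive the abandon rate in percent."""
    offered = sum(row.get("offer_count", 0) for row in rows)
    handled = sum(row.get("handled_count", 0) for row in rows)
    abandoned = sum(row.get("abandoned_count", 0) for row in rows)
    # Guard against a period with no offers at all
    abandon_rate = abandoned / offered * 100 if offered else 0.0
    return {
        "offer_count": offered,
        "handled_count": handled,
        "abandoned_count": abandoned,
        "abandon_rate_percent": abandon_rate,
    }


sample_rows = [
    {"offer_count": 12, "handled_count": 10, "abandoned_count": 2},
    {"offer_count": 8, "handled_count": 8, "abandoned_count": 0},
    {"offer_count": 0, "handled_count": 0, "abandoned_count": 0},
]
summary = summarize_rows(sample_rows)
print(summary)
```

Rates are computed from the summed counts rather than by averaging per-interval percentages, so quiet intervals do not distort the total.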

Complete Working Example

This script ties everything together. It authenticates, queries the analytics API for the last 24 hours, processes the data, and prints the results.

import os
import sys
from datetime import datetime, timedelta, timezone

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

# Configuration
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
API_HOST = "https://api.mypurecloud.com" # us-east-1; change to the API host of your region
QUEUE_ID = "YOUR_QUEUE_ID_HERE" # Replace with a valid Queue ID
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.000Z"

def stat(stats: dict, metric: str, field: str):
    """Returns one statistic of one metric, or 0 when it is absent."""
    summary = stats.get(metric)
    return getattr(summary, field, None) or 0

def main():
    if not CLIENT_ID or not CLIENT_SECRET:
        print("Error: GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")
        sys.exit(1)
        
    if QUEUE_ID == "YOUR_QUEUE_ID_HERE":
        print("Error: Please replace QUEUE_ID with a valid Queue ID from your Genesys Cloud instance.")
        sys.exit(1)

    # 1. Point the SDK at the API host of your region
    PureCloudPlatformClientV2.configuration.host = API_HOST

    # 2. Define Date Range (the last 24 full hours, in UTC)
    end_date = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
    start_date = end_date - timedelta(days=1)
    
    start_date_str = start_date.strftime(TIME_FORMAT)
    end_date_str = end_date.strftime(TIME_FORMAT)

    print(f"Querying analytics for Queue ID: {QUEUE_ID}")
    print(f"Date Range: {start_date_str} to {end_date_str}")

    # 3. Construct Query
    query_body = {
        "interval": f"{start_date_str}/{end_date_str}",
        "granularity": "PT1H", # Hourly buckets
        "groupBy": ["queueId"],
        "metrics": [
            "nOffered",
            "tHandle",
            "tAbandon",
            "oServiceLevel"
        ],
        "filter": {
            "type": "and",
            "predicates": [
                {
                    "type": "dimension",
                    "dimension": "queueId",
                    "operator": "matches",
                    "value": QUEUE_ID
                }
            ]
        }
    }

    # 4. Authenticate and Execute Query
    try:
        api_client = PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
            CLIENT_ID, CLIENT_SECRET
        )
        analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)
        response = analytics_api.post_analytics_conversations_aggregates_query(query_body)
    except ApiException as e:
        print(f"API Error: {e.status} - {e.body}")
        sys.exit(1)

    # 5. Process Results: flatten the buckets of every result container
    buckets = [
        bucket
        for container in (response.results or [])
        for bucket in (container.data or [])
    ]
    if not buckets:
        print("No data found for the specified queue and date range.")
        return

    print(f"\nRetrieved {len(buckets)} intervals.")
    print("-" * 80)
    print(f"{'Date':<26} {'Offers':<10} {'Handled':<10} {'Abandoned':<10} {'SL%':<10} {'Avg HLT(s)':<10}")
    print("-" * 80)

    for bucket in buckets:
        # Index the stats by metric name for easy lookup
        stats = {m.metric: m.stats for m in (bucket.metrics or [])}
        
        offer_count = stat(stats, "nOffered", "count")
        handled_count = stat(stats, "tHandle", "count")
        abandoned_count = stat(stats, "tAbandon", "count")
        service_level = stat(stats, "oServiceLevel", "ratio") * 100
        handle_time_ms = stat(stats, "tHandle", "sum")
        
        # Average handle time is per handled conversation; timers are in milliseconds
        avg_handle_time = handle_time_ms / 1000 / handled_count if handled_count > 0 else 0
        
        bucket_start = bucket.interval.split("/")[0]
        print(f"{bucket_start:<26} {offer_count:<10} {handled_count:<10} {abandoned_count:<10} {service_level:<10.2f} {avg_handle_time:<10.2f}")

    print("-" * 80)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized

Cause: The OAuth token is invalid, expired, or missing.
Fix:

  1. Verify that the CLIENT_ID and CLIENT_SECRET are correct.
  2. Ensure the OAuth client exists in Genesys Cloud Admin and uses the Client Credentials grant.
  3. Check that the API host matches the region of your organization. A token issued in one region is not valid in another.
  4. If the script is long-running, request a new token; the previous one may have expired.

Error: 403 Forbidden

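Cause: The OAuth client lacks the required permissions. With the Client Credentials grant, permissions come from the roles assigned to the client.
Fix:

  1. Go to Admin > Integrations > OAuth.
  2. Edit your client.
  3. On the Roles tab, assign a role that includes the Analytics > Conversation Aggregate > View permission.
  4. Save the changes, then request a new token so that the updated roles are picked up.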

Error: 400 Bad Request

Cause: The query body is malformed or contains invalid parameters.
Fix:

  1. Check the duration string that sets the bucket size. It must be a valid ISO 8601 duration (e.g., PT1H, not 1H).
  2. Verify that the start of the date range is before its end and that both are formatted as ISO 8601 timestamps.
  3. Ensure that the queue ID in the filter is a valid UUID.
  4. Check the API response body for specific error messages. The Genesys Cloud API often provides detailed error paths.
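Some of these mistakes can be caught before the request is sent. The sketch below checks the duration string and the order of the dates locally. validate_query_inputs is a hypothetical helper, and the regular expression is a simplified pattern that covers common durations such as PT15M, PT1H, P1D, and P1W rather than the full ISO 8601 grammar.

```python
import re
from datetime import datetime, timezone

# Simplified ISO 8601 duration pattern: weeks, or days and/or a time part.
# The lookaheads reject a bare "P" and a "T" with nothing after it.
_DURATION = re.compile(
    r"^P(?=.)(?:\d+W|(?:\d+D)?(?:T(?=\d)(?:\d+H)?(?:\d+M)?(?:\d+S)?)?)$"
)


def validate_query_inputs(start: datetime, end: datetime, duration: str) -> list:
    """Return a list of problems; an empty list means the inputs look sane."""
    problems = []
    if not _DURATION.match(duration):
        problems.append(f"'{duration}' is not an ISO 8601 duration such as PT1H")
    if start >= end:
        problems.append("the start of the period must be before its end")
    return problems


period_start = datetime(2023, 10, 1, tzinfo=timezone.utc)
period_end = datetime(2023, 10, 2, tzinfo=timezone.utc)

print(validate_query_inputs(period_start, period_end, "PT1H"))
print(validate_query_inputs(period_end, period_start, "1H"))
```

The first call reports no problems; the second reports both the malformed duration and the reversed dates.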

Error: 429 Too Many Requests

Cause: You have exceeded the rate limit for the Analytics API.
Fix:

  1. Implement exponential backoff in your retry logic.
  2. Reduce the frequency of queries.
  3. Note that Analytics queries are computationally expensive. Do not poll this API in a tight loop.
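Exponential backoff can be sketched as a small wrapper around any call. The version below assumes that the raised exception exposes the HTTP status as a status attribute, as the ApiException handling in this tutorial does. call_with_backoff and its parameters are hypothetical names, and the sleep function is injectable so the logic can be exercised without waiting.

```python
import random
import time


def call_with_backoff(call, max_attempts: int = 5, base_delay: float = 1.0, sleep=time.sleep):
    """Run call(); on an error with status 429, wait and retry.

    The wait doubles on each attempt (base_delay * 2**attempt) plus a small
    random jitter. Any other error, or a 429 on the last attempt, is re-raised.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception as exc:
            is_rate_limit = getattr(exc, "status", None) == 429
            if not is_rate_limit or attempt == max_attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.1))


class FakeRateLimitError(Exception):
    """Stand-in for an API exception, used only in this demonstration."""
    status = 429


attempts = []

def flaky_call():
    # Fails with 429 twice, then succeeds
    attempts.append(1)
    if len(attempts) < 3:
        raise FakeRateLimitError()
    return "ok"


delays = []
result = call_with_backoff(flaky_call, sleep=delays.append)
print(result, len(delays))
```

In the script, the wrapped call would be a lambda around the aggregates query. If the response includes a Retry-After header, honoring it is preferable to a computed delay.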

Error: Empty Results

Cause: No conversations occurred in the specified queue during the date range.
Fix:

  1. Verify the QUEUE_ID is correct.
  2. Check the date range. Ensure it covers a period with actual activity.
  3. Try broadening the date range to confirm data exists.

Official References