Querying Genesys Cloud Analytics Aggregation Data Grouped by Queue and Media Type

What You Will Build

This tutorial shows how to build an Analytics API aggregation query that returns conversation metrics grouped by queue and media type. You will write a Python script that uses the official Genesys Cloud SDK to authenticate, define the query body, execute the request with basic rate-limit handling, and parse the response into a structured format.

Prerequisites

To follow this tutorial, you need the following:

  • OAuth Application: A Genesys Cloud OAuth client that uses the Client Credentials grant and is assigned a role with the analytics:conversationAggregate:view permission. For Client Credentials clients, access is governed by the roles assigned to the client.
  • SDK Version: The Genesys Cloud Platform API Client SDK for Python (PureCloudPlatformClientV2). Install the latest stable release.
  • Runtime: Python 3.8 or higher.
  • Dependencies: The PureCloudPlatformClientV2 package and python-dotenv for loading credentials from a .env file.

Install the required packages using pip:

pip install PureCloudPlatformClientV2 python-dotenv

Authentication Setup

Genesys Cloud uses OAuth 2.0 for authentication. This tutorial uses the Client Credentials grant: the Python SDK requests the access token for you once you supply a Client ID and Client Secret. Store both values outside your source code. Using environment variables is the standard practice for production code.

Create a .env file in your project root with the following content:

GENESYS_CLIENT_ID=your_client_id_here
GENESYS_CLIENT_SECRET=your_client_secret_here
GENESYS_ENVIRONMENT=us-east-1

The GENESYS_ENVIRONMENT variable selects the regional API host. Common values include us-east-1, eu-west-1, and ap-southeast-2.
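As a rough illustration of how a region name maps to an API endpoint, the sketch below resolves three well-known regions to their API hosts. The helper name resolve_api_host and the lookup table are assumptions made for this example. The table is deliberately partial, and the SDK ships its own complete region list.

```python
# Hypothetical helper: a partial region-to-host table, for illustration only.
API_HOSTS = {
    "us-east-1": "https://api.mypurecloud.com",
    "eu-west-1": "https://api.mypurecloud.ie",
    "ap-southeast-2": "https://api.mypurecloud.com.au",
}


def resolve_api_host(region: str) -> str:
    """Return the API base URL for a region, or raise ValueError if it is unknown."""
    key = region.strip().lower()
    if key not in API_HOSTS:
        known = ", ".join(sorted(API_HOSTS))
        raise ValueError(f"Unknown region '{region}'. Known regions: {known}")
    return API_HOSTS[key]
```

Failing fast on an unrecognized region surfaces a typo in the .env file at startup rather than as a confusing connection error on the first request.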

The following code initializes an authenticated API client. The SDK stores the access token on the returned client and attaches it to each request. The Client Credentials grant does not issue refresh tokens, so request a new token when the current one expires.

import os

import PureCloudPlatformClientV2
from dotenv import load_dotenv

def get_platform_client():
    """
    Initializes and returns an authenticated ApiClient instance.
    """
    load_dotenv()
    
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "us-east-1")
    
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment variables.")
    
    # PureCloudRegionHosts members use underscores (e.g., us_east_1),
    # so convert the hyphenated value from the environment variable
    try:
        region = PureCloudPlatformClientV2.PureCloudRegionHosts[environment.replace("-", "_")]
    except KeyError:
        raise ValueError(f"Invalid environment: {environment}")
    
    # Point the SDK at the regional API host
    PureCloudPlatformClientV2.configuration.host = region.get_api_host()
    
    # Request a token with the Client Credentials grant; returns the authenticated ApiClient
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(client_id, client_secret)

# Initialize the client
client = get_platform_client()

Implementation

The core of this tutorial is constructing the ConversationAggregationQuery. This object defines which metrics you want, how they are grouped, and the time range. The aggregates endpoint (/api/v2/analytics/conversations/aggregates/query) is powerful but strict. If the grouping dimensions or metric names are not recognized, or if the time range is invalid, the API returns a 400 Bad Request.

Step 1: Constructing the Query Body

The ConversationAggregationQuery requires an interval and accepts a groupBy array. To group by queue and media type, use the dimension names queueId and mediaType. The interval is an ISO 8601 start/end pair (for example, 2023-10-01T00:00:00.000Z/2023-11-01T00:00:00.000Z) and defines the overall time range. The granularity is an ISO 8601 duration that splits that range into buckets; omit it if you only want totals for the whole range. We will use P1D (one day) to get daily aggregates. The metrics array names the metrics to return; this tutorial requests nOffered, tHandle, tAcw, and tAbandon.

The filter object scopes the data to specific queues, users, or other dimensions. We will optionally filter by queue ID. The time range itself is set by the interval, and a narrow range keeps the query performant.

import PureCloudPlatformClientV2

def build_analytics_query(start_date: str, end_date: str, queue_ids: list = None):
    """
    Builds the ConversationAggregationQuery for grouping by queue and media type.
    
    Args:
        start_date: ISO 8601 start of the range (e.g., "2023-10-01T00:00:00.000Z")
        end_date: ISO 8601 end of the range (e.g., "2023-11-01T00:00:00.000Z")
        queue_ids: Optional list of Queue IDs to filter. If None, all queues are included.
    
    Returns:
        ConversationAggregationQuery instance
    """
    query_body = PureCloudPlatformClientV2.ConversationAggregationQuery()
    
    # The interval is a single "start/end" string covering the whole range
    query_body.interval = f"{start_date}/{end_date}"
    
    # P1D means one bucket per day. Other options: PT1H (hour), PT15M (15 minutes)
    query_body.granularity = "P1D"
    
    # Define the grouping dimensions and the metrics to return
    query_body.group_by = ["queueId", "mediaType"]
    query_body.metrics = ["nOffered", "tHandle", "tAcw", "tAbandon"]
    
    # Optional: match any of the given queues with an "or" filter
    if queue_ids:
        predicates = []
        for queue_id in queue_ids:
            predicate = PureCloudPlatformClientV2.ConversationAggregateQueryPredicate()
            predicate.type = "dimension"
            predicate.dimension = "queueId"
            predicate.operator = "matches"
            predicate.value = queue_id
            predicates.append(predicate)
        
        query_filter = PureCloudPlatformClientV2.ConversationAggregateQueryFilter()
        query_filter.type = "or"
        query_filter.predicates = predicates
        query_body.filter = query_filter
    
    return query_body
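For reference, the JSON body accepted by POST /api/v2/analytics/conversations/aggregates/query can be sketched as a plain dictionary, which is handy for logging or for testing the request shape without the SDK. The helper name build_query_dict and the choice of two metrics are assumptions made for this example, so check field names against the API reference for your organization before relying on them.

```python
import json
from typing import List, Optional


def build_query_dict(start: str, end: str, queue_ids: Optional[List[str]] = None) -> dict:
    """Sketch of an aggregation query body as a plain dictionary (hypothetical helper)."""
    body = {
        "interval": f"{start}/{end}",         # overall time range as "start/end"
        "granularity": "P1D",                 # one bucket per day
        "groupBy": ["queueId", "mediaType"],  # grouping dimensions
        "metrics": ["nOffered", "tHandle"],   # metrics chosen for illustration
    }
    if queue_ids:
        # Match any of the listed queues
        body["filter"] = {
            "type": "or",
            "predicates": [
                {"type": "dimension", "dimension": "queueId", "operator": "matches", "value": qid}
                for qid in queue_ids
            ],
        }
    return body


print(json.dumps(
    build_query_dict("2023-10-01T00:00:00.000Z", "2023-11-01T00:00:00.000Z", ["queue-id-1"]),
    indent=2,
))
```

Printing the body this way makes it easy to compare what you intend to send with the error details returned on a 400 response.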

Step 2: Executing the Query

The aggregates endpoint returns the complete result set in a single response. Unlike the conversation details query, it is not paginated, so no page loop is needed. The SDK method post_analytics_conversations_aggregates_query sends the POST request.

Note that the API enforces rate limits. If you receive a 429 Too Many Requests response, you must wait before retrying. Production code should implement exponential backoff. For this tutorial, we use a fixed wait with a capped number of retries and include error handling for common HTTP errors.

import time

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

def fetch_analytics_data(client, query_body, max_retries: int = 5) -> list:
    """
    Executes the aggregation query and returns the result containers.
    
    Args:
        client: Authenticated ApiClient
        query_body: The ConversationAggregationQuery to execute
        max_retries: Maximum number of attempts when rate limited
    
    Returns:
        List of result containers, one per queue and media type combination
    """
    api_instance = PureCloudPlatformClientV2.AnalyticsApi(client)
    
    print("Starting analytics query...")
    
    for attempt in range(max_retries):
        try:
            # Execute the query; the full result set arrives in one response
            response = api_instance.post_analytics_conversations_aggregates_query(query_body)
            results = response.results or []
            print(f"Retrieved {len(results)} result groups.")
            return results
            
        except ApiException as e:
            if e.status == 429:
                # Retry a bounded number of times instead of looping forever
                print("Rate limit exceeded. Waiting 5 seconds before retrying...")
                time.sleep(5)
            elif e.status == 400:
                print(f"Bad Request: {e.body}")
                raise ValueError(f"Invalid query body. Check the interval format, grouping dimensions, and metrics. Error: {e.body}") from e
            else:
                print(f"API Exception: {e.status} - {e.reason}")
                raise
                
    raise RuntimeError("Rate limit retries exhausted.")

Step 3: Processing and Structuring the Results

The raw response from the API contains nested objects. Each entry in response.results represents a single combination of queue and media type. Its group dictionary holds the grouping values (queueId and mediaType), and its data list holds one entry per granularity bucket. Each bucket has an interval string and a list of metrics, and every metric carries a stats object with fields such as count and sum.

You need to extract the relevant fields: group["queueId"], group["mediaType"], and the stats for tHandle, tAcw, nOffered, and tAbandon. Time metrics (those prefixed with t) are reported in milliseconds. The response identifies queues by ID only, so look up queue names separately through the Routing API if you need them.

from typing import List, Dict, Any

def _stat(stats_by_metric: Dict[str, Any], metric: str, field: str) -> float:
    """Returns one field of a metric's stats object, or 0 if it is missing."""
    stats = stats_by_metric.get(metric)
    value = getattr(stats, field, None) if stats else None
    return value or 0

def process_analytics_results(results: List[Any]) -> List[Dict[str, Any]]:
    """
    Parses the raw aggregation results into a clean list of dictionaries.
    
    Args:
        results: List of result containers from the aggregates query
    
    Returns:
        List of dictionaries with flattened keys, one per group and interval
    """
    processed_data = []
    
    for container in results:
        # Extract the grouping values
        group = container.group or {}
        queue_id = group.get("queueId")
        media_type = group.get("mediaType", "Unknown")
        
        for bucket in container.data or []:
            # Index this bucket's metrics by name for easy lookup
            stats_by_metric = {m.metric: m.stats for m in (bucket.metrics or [])}
            
            # The bucket interval is a "start/end" string
            interval_start, _, interval_end = (bucket.interval or "").partition("/")
            
            # Construct the output dictionary (time metrics converted from ms to seconds)
            record = {
                "queue_id": queue_id,
                "media_type": media_type,
                "handle_time_seconds": _stat(stats_by_metric, "tHandle", "sum") / 1000,
                "wrap_up_time_seconds": _stat(stats_by_metric, "tAcw", "sum") / 1000,
                "total_conversations": _stat(stats_by_metric, "nOffered", "count"),
                "abandoned_conversations": _stat(stats_by_metric, "tAbandon", "count"),
                "interval_start": interval_start,
                "interval_end": interval_end
            }
            
            processed_data.append(record)
        
    return processed_data
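Because the query returns one row per queue, media type, and day, a common follow-up is to roll the daily rows up into a single total per queue and media type. The sketch below does this with the standard library only. The function name summarize_by_queue_and_media and the avg_handle_time_seconds field are assumptions made for this example, and the average is a rough figure (total handle time divided by the conversation count in the records).

```python
from collections import defaultdict
from typing import Any, Dict, List


def summarize_by_queue_and_media(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Roll daily records up into one total per (queue_id, media_type). Hypothetical helper."""
    totals = defaultdict(lambda: {"handle_time_seconds": 0.0, "total_conversations": 0})

    for row in records:
        key = (row["queue_id"], row["media_type"])
        totals[key]["handle_time_seconds"] += row.get("handle_time_seconds") or 0
        totals[key]["total_conversations"] += row.get("total_conversations") or 0

    summary = []
    # Sort on string forms so a missing (None) queue_id does not break the ordering
    for (queue_id, media_type), total in sorted(
        totals.items(), key=lambda item: (str(item[0][0]), str(item[0][1]))
    ):
        count = total["total_conversations"]
        summary.append({
            "queue_id": queue_id,
            "media_type": media_type,
            "handle_time_seconds": total["handle_time_seconds"],
            "total_conversations": count,
            # Guard against division by zero for groups with no conversations
            "avg_handle_time_seconds": total["handle_time_seconds"] / count if count else 0.0,
        })
    return summary
```

Call it with the list returned by process_analytics_results to get one summary row per group.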

Complete Working Example

The following script combines all the previous steps into a single, runnable module. It authenticates, builds the query, fetches the results, processes them, and prints the final output.

Replace the placeholder dates in the main function with your desired date range.

import os
import sys
import time
from typing import List, Dict, Any

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException
from dotenv import load_dotenv

def get_platform_client():
    load_dotenv()
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "us-east-1")
    
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment variables.")
    
    # Region enum members use underscores (e.g., us_east_1)
    try:
        region = PureCloudPlatformClientV2.PureCloudRegionHosts[environment.replace("-", "_")]
    except KeyError:
        raise ValueError(f"Invalid environment: {environment}")
    
    PureCloudPlatformClientV2.configuration.host = region.get_api_host()
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(client_id, client_secret)

def build_analytics_query(start_date: str, end_date: str, queue_ids: list = None):
    query_body = PureCloudPlatformClientV2.ConversationAggregationQuery()
    query_body.interval = f"{start_date}/{end_date}"
    query_body.granularity = "P1D"
    query_body.group_by = ["queueId", "mediaType"]
    query_body.metrics = ["nOffered", "tHandle", "tAcw", "tAbandon"]
    
    if queue_ids:
        predicates = []
        for queue_id in queue_ids:
            predicate = PureCloudPlatformClientV2.ConversationAggregateQueryPredicate()
            predicate.type = "dimension"
            predicate.dimension = "queueId"
            predicate.operator = "matches"
            predicate.value = queue_id
            predicates.append(predicate)
        
        query_filter = PureCloudPlatformClientV2.ConversationAggregateQueryFilter()
        query_filter.type = "or"
        query_filter.predicates = predicates
        query_body.filter = query_filter
    
    return query_body

def fetch_analytics_data(client, query_body, max_retries: int = 5) -> list:
    api_instance = PureCloudPlatformClientV2.AnalyticsApi(client)
    
    for attempt in range(max_retries):
        try:
            response = api_instance.post_analytics_conversations_aggregates_query(query_body)
            results = response.results or []
            print(f"Retrieved {len(results)} result groups.")
            return results
        except ApiException as e:
            if e.status == 429:
                print("Rate limit exceeded. Waiting 5 seconds before retrying...")
                time.sleep(5)
            else:
                raise
                
    raise RuntimeError("Rate limit retries exhausted.")

def _stat(stats_by_metric: Dict[str, Any], metric: str, field: str) -> float:
    stats = stats_by_metric.get(metric)
    value = getattr(stats, field, None) if stats else None
    return value or 0

def process_analytics_results(results: List[Any]) -> List[Dict[str, Any]]:
    processed_data = []
    for container in results:
        group = container.group or {}
        queue_id = group.get("queueId")
        media_type = group.get("mediaType", "Unknown")
        
        for bucket in container.data or []:
            stats_by_metric = {m.metric: m.stats for m in (bucket.metrics or [])}
            interval_start, _, interval_end = (bucket.interval or "").partition("/")
            
            # Time metrics are reported in milliseconds; convert to seconds
            record = {
                "queue_id": queue_id,
                "media_type": media_type,
                "handle_time_seconds": _stat(stats_by_metric, "tHandle", "sum") / 1000,
                "wrap_up_time_seconds": _stat(stats_by_metric, "tAcw", "sum") / 1000,
                "total_conversations": _stat(stats_by_metric, "nOffered", "count"),
                "abandoned_conversations": _stat(stats_by_metric, "tAbandon", "count"),
                "interval_start": interval_start,
                "interval_end": interval_end
            }
            processed_data.append(record)
    return processed_data

def main():
    try:
        # 1. Authenticate
        client = get_platform_client()
        print("Authentication successful.")
        
        # 2. Define Date Range
        # Use ISO 8601 format
        start_date = "2023-10-01T00:00:00.000Z"
        end_date = "2023-11-01T00:00:00.000Z"
        
        # 3. Build Query
        # Optional: Pass specific queue IDs to filter
        # queue_ids = ["queue-id-1", "queue-id-2"]
        query_body = build_analytics_query(start_date, end_date)
        
        # 4. Fetch Data
        raw_results = fetch_analytics_data(client, query_body)
        
        # 5. Process Results
        final_data = process_analytics_results(raw_results)
        
        # 6. Output
        print("\n--- Final Analytics Data ---")
        for row in final_data:
            print(row)
            
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 400 Bad Request

Cause: The query body is malformed. Common reasons include:

  • Invalid interval format. The API requires two ISO 8601 timestamps with a timezone, separated by a slash.
  • Invalid granularity. The granularity must be a valid ISO 8601 duration (e.g., P1D, PT1H).
  • Unrecognized groupBy dimensions or metric names, or a filter with missing required fields.

Fix: Validate the ISO 8601 strings. Ensure the start date is before the end date. Check the API documentation for valid granularity values, dimensions, and metrics.

# Correct ISO 8601 format
start_date = "2023-10-01T00:00:00.000Z"
# Incorrect format (missing timezone)
# start_date = "2023-10-01T00:00:00"
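The date checks described above can be sketched as a small validator that runs before the request is sent. The function names parse_utc and validate_date_range are assumptions made for this example, and the sketch only accepts the UTC "Z" form with fractional seconds used throughout this tutorial, which is stricter than ISO 8601 in general.

```python
from datetime import datetime, timezone

# Assumed format: the UTC "Z" style used in this tutorial's examples
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def parse_utc(value: str) -> datetime:
    """Parse a timestamp such as 2023-10-01T00:00:00.000Z into an aware datetime."""
    return datetime.strptime(value, TIMESTAMP_FORMAT).replace(tzinfo=timezone.utc)


def validate_date_range(start_date: str, end_date: str) -> None:
    """Raise ValueError if either timestamp is malformed or the range is not increasing."""
    try:
        start = parse_utc(start_date)
        end = parse_utc(end_date)
    except ValueError as exc:
        raise ValueError(
            f"Timestamps must look like 2023-10-01T00:00:00.000Z: {exc}"
        ) from exc
    if start >= end:
        raise ValueError("start_date must be earlier than end_date")
```

Running this check locally catches the most common causes of a 400 response without spending an API call.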

Error: 403 Forbidden

Cause: The OAuth application does not have the required scope.

Fix: Ensure the OAuth client is assigned a role that includes the analytics:conversationAggregate:view permission. In Genesys Cloud Admin, open the client under Integrations > OAuth, assign the role on the Roles tab, and save. Then request a new access token so the change takes effect. You do not need to regenerate the client secret.

Error: 429 Too Many Requests

Cause: You are exceeding the API rate limits. The Analytics API has strict rate limits, especially for aggregation queries which are computationally expensive.

Fix: Implement exponential backoff. The code example above includes a simple 5-second wait. In production, implement a retry mechanism with increasing delays (e.g., 1s, 2s, 4s, 8s).

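import random
import time

from PureCloudPlatformClientV2.rest import ApiException

def retry_with_backoff(func, *args, max_retries=5, base_delay=1):
    """Calls func(*args), retrying on HTTP 429 with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return func(*args)
        except ApiException as e:
            if e.status != 429:
                raise
            if attempt == max_retries - 1:
                break  # Do not sleep after the final attempt
            # Double the delay on each attempt and add jitter to spread out retries
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited. Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
    raise RuntimeError("Max retries exceeded")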

Error: Empty Results

Cause: No conversations occurred in the specified date range, or the filter is too restrictive.

Fix: Check the date range. Ensure the queues exist and had activity. Try removing the queue_ids filter to see if data returns for all queues.

Official References