Building a Queue and Media Type Analytics Aggregation Query


What You Will Build

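  • You will write a script that retrieves historical conversation metrics aggregated by queue and media type.
  • This tutorial uses the Genesys Cloud CX Analytics API aggregates endpoint (/api/v2/analytics/conversations/aggregates/query). The details endpoint (/api/v2/analytics/conversations/details/query) returns individual conversation records and does not group them.
  • The implementation is provided in Python using the official Genesys Cloud SDK, which is published on PyPI as PureCloudPlatformClientV2.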

Prerequisites

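  • OAuth Client Type: Client Credentials grant.
  • Required Permissions: a role that includes analytics:conversationAggregate:view, assigned to the OAuth client. With the Client Credentials grant, access is determined by the client's roles rather than by OAuth scopes.
  • SDK Version: PureCloudPlatformClientV2 (the Genesys Cloud Python SDK) version 137.0.0 or later.
  • Runtime: Python 3.8 or later.
  • Dependencies:
    • PureCloudPlatformClientV2
    • python-dotenv (for loading secrets from a .env file)

Install the dependencies via pip:

pip install PureCloudPlatformClientV2 python-dotenv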

Authentication Setup

The Genesys Cloud SDK requests an access token using the Client Credentials grant and attaches it to subsequent requests. You must provide the client ID, the client secret, and the API host for your region (for example, https://api.mypurecloud.com for US East).

Create a .env file in your project root:

GENESYS_CLIENT_ID=your_client_id_here
GENESYS_CLIENT_SECRET=your_client_secret_here
GENESYS_ENV_URL=https://api.mypurecloud.com

Initialize the SDK in your code. This step requests an access token and returns an API client that reuses it for subsequent requests.

import os
from dotenv import load_dotenv
import PureCloudPlatformClientV2

# Load environment variables from the .env file
load_dotenv()

def get_authenticated_analytics_client():
    """
    Configures and returns an authenticated AnalyticsApi client.
    """
    # Load credentials from environment
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    env_url = os.getenv("GENESYS_ENV_URL")

    if not all([client_id, client_secret, env_url]):
        raise ValueError("Missing required environment variables.")

    # Point the SDK at the API host for your region
    PureCloudPlatformClientV2.configuration.host = env_url

    # Request a token with the Client Credentials grant
    api_client = PureCloudPlatformClientV2.api_client.ApiClient() \
        .get_client_credentials_token(client_id, client_secret)

    # Initialize the Analytics API wrapper
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)

    return analytics_api
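
When a variable is missing, it helps to report which one. The following is a minimal sketch that assumes the three variable names from the .env file above; missing_env_vars is a hypothetical helper for illustration and is not part of the SDK.

```python
import os
from typing import List, Mapping, Optional

# Variable names taken from the .env file in this tutorial
REQUIRED_VARS = ("GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET", "GENESYS_ENV_URL")

def missing_env_vars(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names of required variables that are unset or blank."""
    source = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not (source.get(name) or "").strip()]

# Example with an explicit mapping instead of the real environment
example = {"GENESYS_CLIENT_ID": "abc", "GENESYS_ENV_URL": " "}
print(missing_env_vars(example))
# ['GENESYS_CLIENT_SECRET', 'GENESYS_ENV_URL']
```

Calling missing_env_vars() with no argument checks the real environment, so the result can be placed directly in the ValueError message.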

Implementation

Step 1: Define the Aggregation Query Body

The core of this tutorial is constructing the body parameter for the post_analytics_conversations_aggregates_query method. The body is a query object that defines the time window (interval), the optional bucket size (granularity), the metrics to retrieve, and the grouping dimensions (groupBy).

To group by queue and media type, list the queueId and mediaType dimensions in the groupBy array. The interval is required and is written as a single ISO 8601 string of the form start/end. If you omit granularity, the API returns one bucket that covers the whole interval.

def build_query_body(start_time: str, end_time: str) -> dict:
    """
    Constructs the request body for the aggregates query.

    Args:
        start_time: ISO 8601 formatted start time (e.g., '2023-10-01T00:00:00Z')
        end_time: ISO 8601 formatted end time (e.g., '2023-10-02T00:00:00Z')

    Returns:
        A dictionary representing the query body.
    """
    # Using a dictionary here for clarity, though the SDK also supports model objects
    query_body = {
        # ISO 8601 interval: start and end separated by a slash
        "interval": f"{start_time}/{end_time}",
        # One bucket per day. Omit granularity to get a single total for the interval.
        "granularity": "P1D",
        # Grouping dimensions: this is the critical part for the tutorial
        "groupBy": ["queueId", "mediaType"],
        # Metrics to aggregate: offered count plus answer, abandon, handle, and wait timings
        "metrics": ["nOffered", "tAnswered", "tAbandon", "tHandle", "tWait"],
        "filter": {
            "type": "and",
            "predicates": [
                {
                    "type": "dimension",
                    "dimension": "mediaType",
                    "operator": "matches",
                    "value": "voice"  # Optional: filter only voice if desired
                }
            ]
        }
    }

    return query_body

Key Parameter Explanation:

  • interval: Defines the time range as a single ISO 8601 interval string of the form start/end.
  • granularity: Determines the size of the returned buckets, as an ISO 8601 duration. If you set granularity to P1D, you get one bucket per day per queue per media type. If you want a single total for the entire range, omit granularity; it then defaults to the length of the interval.
  • metrics: Names the metrics to compute. nOffered counts conversations offered to the queue. tAnswered, tAbandon, tHandle, and tWait are timing metrics: their count gives the number of occurrences and their sum gives the total duration in milliseconds.
  • groupBy: This array tells the engine how to slice the data. queueId groups by queue ID. mediaType groups by voice, chat, email, etc.
  • filter: You can pre-filter the data before aggregation. In the example above, we filter for voice only. Remove the filter block to include all media types.
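
The interval string is easy to get wrong when it is assembled by hand. The following is a minimal sketch of a formatter, assuming the interval is sent as a single start/end string in UTC; iso_interval is a hypothetical helper name, not an SDK function.

```python
from datetime import datetime, timedelta, timezone

def iso_interval(start: datetime, end: datetime) -> str:
    """Format two timezone-aware datetimes as 'start/end' in UTC."""
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("start and end must be timezone-aware")
    if start >= end:
        raise ValueError("start must be earlier than end")
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    # Convert both ends to UTC before formatting so the trailing Z is accurate
    return "/".join(d.astimezone(timezone.utc).strftime(fmt) for d in (start, end))

start = datetime(2023, 10, 1, tzinfo=timezone.utc)
print(iso_interval(start, start + timedelta(days=1)))
# 2023-10-01T00:00:00Z/2023-10-02T00:00:00Z
```

Rejecting naive datetimes up front avoids silently sending local time labelled as UTC.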

Step 2: Execute the Query and Handle Rate Limits

The aggregates endpoint returns every group in a single response, so there is no page cursor to follow. You should still implement retry logic for 429 Too Many Requests errors, which can occur with analytics queries because of their heavy computational load.

import time
from typing import Any, Dict, List

from PureCloudPlatformClientV2.rest import ApiException

def fetch_analytics_data(analytics_api, query_body: Dict[str, Any]) -> List[Any]:
    """
    Executes the aggregates query, retrying on 429 responses.

    Args:
        analytics_api: The authenticated AnalyticsApi client.
        query_body: The query definition.

    Returns:
        A list of result objects, one per queue and media type combination.
    """
    max_retries = 3
    retry_delay = 2  # base delay in seconds

    for attempt in range(1, max_retries + 1):
        try:
            # The SDK serializes the dictionary and posts it to
            # /api/v2/analytics/conversations/aggregates/query
            response = analytics_api.post_analytics_conversations_aggregates_query(query_body)
            # Treat a missing results list as "no data"
            return response.results or []
        except ApiException as e:
            # Retry only on 429 Too Many Requests; re-raise everything else,
            # and re-raise the 429 itself once the retries are used up
            if e.status != 429 or attempt == max_retries:
                raise
            wait = retry_delay * attempt
            print(f"Rate limited (429). Retrying in {wait} seconds...")
            time.sleep(wait)

    return []

Error Handling Note:
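The SDK raises an exception for HTTP error responses, and the exception's status attribute carries the HTTP status code. A 400 error usually indicates a malformed query body (for example, an invalid ISO date or an unsupported metric name). A 401 indicates a missing or expired token. A 403 indicates that the OAuth client lacks the required permission.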

Step 3: Process and Flatten the Results

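The aggregates response is hierarchical. It contains a results list with one entry per combination of grouping values. Each entry has a group dictionary that maps a dimension name (queueId, mediaType) to its value, and a data list with one element per granularity bucket. Each bucket has an interval string and a metrics list, and every metric carries its name and a stats object with fields such as count and sum.

The response identifies queues by ID only. To display queue names, look them up separately through the Routing API.

You need to flatten this structure into a usable format, such as a list of dictionaries or a pandas DataFrame.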

def flatten_analytics_results(results: List[Any]) -> List[Dict[str, Any]]:
    """
    Flattens the hierarchical aggregates response into a list of flat dictionaries.

    Args:
        results: The list of result objects from the API response.

    Returns:
        A list of dictionaries with keys: interval, queueId, mediaType,
        plus <metric>_count and <metric>_sum for each returned metric.
    """
    flattened = []

    for result in results:
        # group maps each groupBy dimension to its value for this result
        group = result.group or {}

        # data holds one bucket per granularity step
        for bucket in result.data or []:
            row = {
                "interval": bucket.interval,
                "queueId": group.get("queueId"),
                "mediaType": group.get("mediaType"),
            }

            for metric in bucket.metrics or []:
                stats = metric.stats
                # count: number of occurrences
                # sum: total value (milliseconds for timing metrics)
                row[f"{metric.metric}_count"] = stats.count
                row[f"{metric.metric}_sum"] = stats.sum

            flattened.append(row)

    return flattened

Complete Working Example

This script combines all steps into a runnable module. It fetches voice conversation metrics for the last 7 days, grouped by queue and media type.

import sys
from datetime import datetime, timedelta, timezone

# The helper functions defined above must be in scope.
# In a real module, these would be imported from a separate file:
# from analytics_utils import get_authenticated_analytics_client, build_query_body, fetch_analytics_data, flatten_analytics_results

def main():
    # 1. Setup Authentication
    try:
        analytics_api = get_authenticated_analytics_client()
    except Exception as e:
        print(f"Authentication failed: {e}")
        sys.exit(1)

    # 2. Define Time Window (Last 7 Days)
    # Use timezone-aware UTC datetimes; datetime.utcnow() is deprecated since Python 3.12
    now = datetime.now(timezone.utc).replace(microsecond=0)
    end_time = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    start_time = (now - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ")

    # 3. Build Query
    query_body = build_query_body(start_time, end_time)
    
    print(f"Executing analytics query for {start_time} to {end_time}...")

    # 4. Fetch Data
    try:
        raw_results = fetch_analytics_data(analytics_api, query_body)
    except Exception as e:
        print(f"Query execution failed: {e}")
        sys.exit(1)

    # 5. Process Results
    if not raw_results:
        print("No data found for the specified criteria.")
        return

    flat_data = flatten_analytics_results(raw_results)

    # 6. Output Results
    print(f"Retrieved {len(flat_data)} aggregation records.")
    header = f"{'Queue ID':<36} | {'Media Type':<10} | {'Answered':<10} | {'Abandoned':<10}"
    print("-" * len(header))
    print(header)
    print("-" * len(header))

    for row in flat_data:
        # Queues are identified by ID in the flattened rows
        queue_id = row.get("queueId") or "Unknown"
        media_type = row.get("mediaType") or "Unknown"
        # A metric is absent from a bucket when nothing was recorded for it
        answered = row.get("tAnswered_count") or 0
        abandoned = row.get("tAbandon_count") or 0

        print(f"{queue_id:<36} | {media_type:<10} | {answered:<10} | {abandoned:<10}")

if __name__ == "__main__":
    main()
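
Printing works for a quick check, but flat rows are usually more useful in a file. The following is a minimal sketch that serializes a list of flat dictionaries to CSV text with the standard library; rows_to_csv is a hypothetical helper, and it assumes every row is a plain dictionary.

```python
import csv
import io
from typing import Any, Dict, List

def rows_to_csv(rows: List[Dict[str, Any]]) -> str:
    """Serialize flat rows to CSV text; columns are the union of all keys in first-seen order."""
    fieldnames: List[str] = []
    for row in rows:
        for key in row:
            if key not in fieldnames:
                fieldnames.append(key)

    buffer = io.StringIO()
    # restval fills columns that a given row does not have
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, restval="", lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()

print(rows_to_csv([{"queueId": "q1", "nOffered_count": 5}]), end="")
# queueId,nOffered_count
# q1,5
```

Collecting the union of keys matters here because a bucket may lack a metric that other buckets contain.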

Common Errors & Debugging

Error: 400 Bad Request - Invalid Interval or Granularity

Cause: The interval must be an ISO 8601 interval of the form start/end, and the granularity must be a valid ISO 8601 duration string. Common mistakes include using “1 day” instead of “P1D” or “24 hours” instead of “PT24H”.
Fix: Ensure the granularity string follows ISO 8601 duration format.

  • Daily: P1D
  • Hourly: PT1H
  • Weekly: P1W
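
A quick local check can catch a malformed duration before the request is sent. The following is a simplified sketch that accepts only whole-number ISO 8601 durations; looks_like_iso_duration is a hypothetical helper, and passing this check does not guarantee that the API accepts the value.

```python
import re

# Either a week form (P1W) or a date part optionally followed by a time part (P1DT12H).
# Simplification: whole numbers only, no decimal fractions.
_DURATION = re.compile(
    r"P(?!$)(?:\d+W|(?:\d+Y)?(?:\d+M)?(?:\d+D)?(?:T(?=\d)(?:\d+H)?(?:\d+M)?(?:\d+S)?)?)"
)

def looks_like_iso_duration(value: str) -> bool:
    """Return True if value has the shape of an ISO 8601 duration."""
    return _DURATION.fullmatch(value) is not None

for candidate in ("P1D", "PT1H", "P1W", "1 day", "24 hours"):
    print(candidate, looks_like_iso_duration(candidate))
```

The first three candidates pass and the last two fail, which matches the mistakes described above.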

Error: 403 Forbidden - Insufficient Scope

Cause: The OAuth client does not have the analytics:conversationAggregate:view permission.
Fix: In Genesys Cloud Admin, assign a role that includes this permission to the OAuth client, then request a new token.

Error: 429 Too Many Requests

Cause: Analytics queries are resource-intensive. The API limits the number of concurrent queries or the frequency of queries for a single client.
Fix: Back off between retries. The code above includes a basic retry loop whose delay grows linearly. For production systems, use exponential backoff, for example through a retry library such as tenacity or backoff.
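
If adding a dependency is not an option, exponential backoff can be written by hand. The following is a minimal sketch that assumes the raised exception exposes the HTTP code as a status attribute, as the retry loop above does; call_with_backoff and its parameters are hypothetical names.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def call_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying with exponential backoff when the error status is 429."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as exc:
            # Give up on non-429 errors and on the final attempt
            if getattr(exc, "status", None) != 429 or attempt == max_attempts:
                raise
            # The delay doubles on each attempt (1s, 2s, 4s, ...) up to max_delay.
            # A random factor in [0.5, 1.0] spreads out clients that retry together.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(delay * random.uniform(0.5, 1.0))
    raise ValueError("max_attempts must be at least 1")
```

The query call can then be wrapped in a zero-argument function or lambda and passed as func. The sleep parameter exists so that tests can substitute a recorder instead of waiting.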

Error: Empty Results

Cause: No conversations match the filter criteria within the time window.
Fix:

  1. Verify the time window contains data.
  2. Check the filter. If you filter on mediaType = voice but the queue only handles chat, results will be empty.
  3. Confirm the queues exist and were active during the period.
