Constructing Analytics Aggregation Queries for Queue and Media Type in Genesys Cloud

Constructing Analytics Aggregation Queries for Queue and Media Type in Genesys Cloud

What You Will Build

  • You will build a Python script that queries the Genesys Cloud Analytics API to retrieve aggregated conversation metrics.
  • The query will group results by queue and mediaType to identify performance trends across different communication channels.
  • The tutorial uses the Genesys Cloud Python SDK (genesys-cloud-sdk) and the requests library for raw HTTP validation.

Prerequisites

  • OAuth Client Type: Confidential Client (Client Credentials Grant).
  • Required Scopes:
    • analytics:query:read (Required for accessing aggregated analytics data).
    • analytics:conversations:view (Optional, if you need to drill down into specific conversation details later).
  • SDK Version: Genesys Cloud Python SDK >= 160.0.0.
  • Runtime: Python 3.8 or higher.
  • External Dependencies:
    • genesys-cloud-sdk
    • requests (for debugging raw payloads)
    • pydantic (included with SDK, used for model validation)

Authentication Setup

Genesys Cloud uses OAuth 2.0 Client Credentials flow for server-to-server API access. You must obtain an access token before making any analytics queries. The SDK handles token caching and refresh automatically, but understanding the underlying flow is critical for debugging 401 errors.

Step 1: Install the SDK

Install the official Genesys Cloud Python SDK via pip.

pip install genesys-cloud-sdk requests

Step 2: Initialize the Platform Client

The PureCloudPlatformClientV2 is the entry point for all SDK interactions. It manages the OAuth token lifecycle.

import os
from purecloudplatformclientv2 import PlatformClient

def get_platform_client():
    """
    Initializes and returns a configured PureCloudPlatformClientV2 instance.
    """
    # These environment variables must be set in your execution environment
    client_id = os.environ.get('GENESYS_CLIENT_ID')
    client_secret = os.environ.get('GENESYS_CLIENT_SECRET')
    environment = os.environ.get('GENESYS_ENVIRONMENT', 'mypurecloud.com')

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # The SDK automatically handles token acquisition and refresh
    platform_client = PlatformClient(client_id, client_secret, environment)
    
    return platform_client

# Initialize the client
platform_client = get_platform_client()

Note: The SDK caches the token in memory. If your script runs for a long duration, the SDK will automatically refresh the token before it expires. If you are running this in a short-lived container (like AWS Lambda), the token will be fetched on every invocation.

Implementation

The core of this tutorial is constructing the QueryBody for the POST /api/v2/analytics/conversations/details/query endpoint. This endpoint allows for complex aggregations using a JSON-based query language.

Step 1: Constructing the Query Body

The Analytics API uses a specific structure for aggregation queries. You must define:

  1. interval: The time range for the query.
  2. view: The data view (e.g., default, custom).
  3. groupBy: The dimensions to aggregate by (queue, mediaType).
  4. metrics: The specific metrics to calculate (e.g., conversationCount, handledCount).

Defining the Time Interval

Analytics queries require a start and end time. The API accepts ISO 8601 formatted strings.

from datetime import datetime, timedelta

def get_time_interval(days_back=7):
    """
    Returns a start and end time for the query.
    """
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=days_back)
    
    # Format as ISO 8601 with timezone
    start_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    end_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    
    return start_str, end_str

start_time, end_time = get_time_interval(days_back=7)
print(f"Querying from {start_time} to {end_time}")

Building the Aggregation Query

We will use the SDK’s data models to build the query. This ensures type safety and proper JSON serialization.

from purecloudplatformclientv2 import (
    QueryBody,
    MetricFilter,
    GroupBy,
    View
)

def build_analytics_query(start_time: str, end_time: str):
    """
    Constructs the QueryBody for the Analytics API.
    """
    # 1. Define the metrics we want to aggregate
    # We want the total number of conversations and the number of handled conversations
    metrics = [
        "conversationCount",
        "handledCount",
        "abandonedCount",
        "averageHandleTime"
    ]

    # 2. Define the groupBy dimensions
    # We group by 'queue' and 'mediaType'
    group_by = [
        GroupBy(name="queue"),
        GroupBy(name="mediaType")
    ]

    # 3. Define the view
    # 'default' is the standard view for conversation analytics
    view_name = "default"

    # 4. Construct the QueryBody
    # The SDK allows us to build this object programmatically
    query_body = QueryBody(
        interval=f"{start_time}/{end_time}",
        view=view_name,
        group_by=group_by,
        metrics=metrics
    )

    return query_body

query_body = build_analytics_query(start_time, end_time)

Important: The interval field in QueryBody expects a string in the format start/end. The SDK does not automatically concatenate the datetime strings, so you must provide the full interval string.

Step 2: Executing the Query

Now that the query body is constructed, we execute it using the AnalyticsApi.

from purecloudplatformclientv2 import AnalyticsApi, ApiException
import json

def execute_analytics_query(platform_client: PlatformClient, query_body: QueryBody):
    """
    Executes the analytics query and returns the response.
    """
    analytics_api = AnalyticsApi(platform_client)
    
    try:
        # The endpoint is POST /api/v2/anversations/details/query
        # The SDK maps this to analytics_api.post_analytics_conversations_details_query
        response = analytics_api.post_analytics_conversations_details_query(body=query_body)
        
        return response
    except ApiException as e:
        print(f"Exception when calling AnalyticsApi->post_analytics_conversations_details_query: {e}\n")
        # Print the response body for debugging
        print(f"Response Body: {e.body}")
        raise

# Execute the query
response = execute_analytics_query(platform_client, query_body)

Step 3: Processing the Results

The response from the Analytics API is a complex JSON structure. The results array contains the aggregated data. Each item in the results array represents a unique combination of queue and mediaType.

Understanding the Response Structure

A typical response looks like this:

{
  "results": [
    {
      "groupByValues": [
        {
          "name": "queue",
          "values": [
            {
              "id": "queue-id-1",
              "name": "Sales Queue"
            }
          ]
        },
        {
          "name": "mediaType",
          "values": [
            {
              "id": "voice",
              "name": "Voice"
            }
          ]
        }
      ],
      "metrics": {
        "conversationCount": {
          "value": 150
        },
        "handledCount": {
          "value": 140
        },
        "abandonedCount": {
          "value": 10
        },
        "averageHandleTime": {
          "value": 180.5
        }
      }
    }
  ],
  "nextPageUri": null
}

Parsing the Results

We will write a function to parse this response and output a clean table of data.

def process_analytics_results(response):
    """
    Parses the analytics response and prints a summary table.
    """
    if not response or not response.results:
        print("No results found.")
        return

    print(f"{'Queue Name':<20} | {'Media Type':<10} | {'Conversations':<12} | {'Handled':<10} | {'Abandoned':<10} | {'Avg Handle (s)':<12}")
    print("-" * 80)

    for result in response.results:
        # Extract groupBy values
        queue_name = "Unknown"
        media_type = "Unknown"
        
        for group in result.group_by_values:
            if group.name == "queue" and group.values:
                queue_name = group.values[0].name
            elif group.name == "mediaType" and group.values:
                media_type = group.values[0].name
        
        # Extract metrics
        metrics = result.metrics
        conversation_count = metrics.get("conversationCount", {}).get("value", 0)
        handled_count = metrics.get("handledCount", {}).get("value", 0)
        abandoned_count = metrics.get("abandonedCount", {}).get("value", 0)
        avg_handle_time = metrics.get("averageHandleTime", {}).get("value", 0)
        
        # Format and print
        print(f"{queue_name:<20} | {media_type:<10} | {conversation_count:<12} | {handled_count:<10} | {abandoned_count:<10} | {avg_handle_time:<12.2f}")

# Process the results
process_analytics_results(response)

Complete Working Example

Below is the complete, copy-pasteable Python script. Save this as analytics_query.py.

import os
import sys
from datetime import datetime, timedelta
from purecloudplatformclientv2 import (
    PlatformClient,
    AnalyticsApi,
    QueryBody,
    GroupBy,
    ApiException
)

def get_platform_client():
    """
    Initializes and returns a configured PureCloudPlatformClientV2 instance.
    """
    client_id = os.environ.get('GENESYS_CLIENT_ID')
    client_secret = os.environ.get('GENESYS_CLIENT_SECRET')
    environment = os.environ.get('GENESYS_ENVIRONMENT', 'mypurecloud.com')

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    platform_client = PlatformClient(client_id, client_secret, environment)
    return platform_client

def get_time_interval(days_back=7):
    """
    Returns a start and end time for the query in ISO 8601 format.
    """
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=days_back)
    
    start_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    end_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    
    return start_str, end_str

def build_analytics_query(start_time: str, end_time: str):
    """
    Constructs the QueryBody for the Analytics API.
    Groups by queue and mediaType.
    """
    metrics = [
        "conversationCount",
        "handledCount",
        "abandonedCount",
        "averageHandleTime"
    ]

    group_by = [
        GroupBy(name="queue"),
        GroupBy(name="mediaType")
    ]

    view_name = "default"

    query_body = QueryBody(
        interval=f"{start_time}/{end_time}",
        view=view_name,
        group_by=group_by,
        metrics=metrics
    )

    return query_body

def execute_analytics_query(platform_client: PlatformClient, query_body: QueryBody):
    """
    Executes the analytics query and returns the response.
    """
    analytics_api = AnalyticsApi(platform_client)
    
    try:
        response = analytics_api.post_analytics_conversations_details_query(body=query_body)
        return response
    except ApiException as e:
        print(f"Exception when calling AnalyticsApi->post_analytics_conversations_details_query: {e}\n")
        print(f"Response Body: {e.body}")
        raise

def process_analytics_results(response):
    """
    Parses the analytics response and prints a summary table.
    """
    if not response or not response.results:
        print("No results found.")
        return

    print(f"{'Queue Name':<20} | {'Media Type':<10} | {'Conversations':<12} | {'Handled':<10} | {'Abandoned':<10} | {'Avg Handle (s)':<12}")
    print("-" * 80)

    for result in response.results:
        queue_name = "Unknown"
        media_type = "Unknown"
        
        if result.group_by_values:
            for group in result.group_by_values:
                if group.name == "queue" and group.values:
                    queue_name = group.values[0].name
                elif group.name == "mediaType" and group.values:
                    media_type = group.values[0].name
        
        metrics = result.metrics
        conversation_count = metrics.get("conversationCount", {}).get("value", 0)
        handled_count = metrics.get("handledCount", {}).get("value", 0)
        abandoned_count = metrics.get("abandonedCount", {}).get("value", 0)
        avg_handle_time = metrics.get("averageHandleTime", {}).get("value", 0)
        
        print(f"{queue_name:<20} | {media_type:<10} | {conversation_count:<12} | {handled_count:<10} | {abandoned_count:<10} | {avg_handle_time:<12.2f}")

def main():
    try:
        # 1. Initialize Client
        platform_client = get_platform_client()
        
        # 2. Define Time Interval
        start_time, end_time = get_time_interval(days_back=7)
        print(f"Querying analytics data from {start_time} to {end_time}")
        
        # 3. Build Query
        query_body = build_analytics_query(start_time, end_time)
        
        # 4. Execute Query
        response = execute_analytics_query(platform_client, query_body)
        
        # 5. Process Results
        process_analytics_results(response)
        
    except Exception as e:
        print(f"An error occurred: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized

Cause: The OAuth token is invalid, expired, or the client credentials are incorrect.

Fix:

  1. Verify that GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET are set correctly.
  2. Ensure the client has the analytics:query:read scope. You can check this in the Genesys Cloud Admin Console under Developers > OAuth Clients.
  3. If using the SDK, ensure you are not sharing the PlatformClient instance across multiple threads without proper locking, as the token cache is not thread-safe by default.

Error: 400 Bad Request - Invalid Interval

Cause: The interval string is malformed or the time range is too large.

Fix:

  1. Ensure the interval format is start/end (e.g., 2023-10-01T00:00:00.000Z/2023-10-08T00:00:00.000Z).
  2. The maximum time range for a single query is 90 days. If you need more data, you must paginate using multiple queries.
  3. Ensure the start time is before the end time.

Error: 429 Too Many Requests

Cause: You have exceeded the rate limit for the Analytics API.

Fix:

  1. Implement exponential backoff and retry logic.
  2. The Analytics API has a lower rate limit than other APIs. Do not fire queries in a tight loop.
  3. Use the Retry-After header in the response to determine how long to wait.
import time

def execute_with_retry(platform_client, query_body, max_retries=3):
    analytics_api = AnalyticsApi(platform_client)
    for attempt in range(max_retries):
        try:
            return analytics_api.post_analytics_conversations_details_query(body=query_body)
        except ApiException as e:
            if e.status == 429:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited. Waiting {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                raise
    raise Exception("Max retries exceeded")

Error: Empty Results

Cause: No conversations match the query criteria.

Fix:

  1. Verify that the time interval contains data. Try expanding the date range.
  2. Ensure that the queues you are querying have active conversations.
  3. Check if the view is correct. The default view is usually sufficient, but custom views may have different data availability.

Official References