Constructing an Analytics API Aggregation Query: Grouping by Queue and Media Type

What You Will Build

  • This tutorial demonstrates how to construct an analytics query that aggregates conversation metrics and groups the results by queue ID and media type.
  • The implementation uses the Genesys Cloud Analytics API endpoint /api/v2/analytics/conversations/aggregates/query. (The similarly named /api/v2/analytics/conversations/details/query endpoint returns individual conversation records and does not support grouping.)
  • The code examples are in Python, using raw HTTP requests via httpx and the official Genesys Cloud Platform API SDK (PureCloudPlatformClientV2).

Prerequisites

  • OAuth Client Type: An OAuth client using the Client Credentials grant. Client Credentials clients do not use scopes; instead, assign the client a role that includes the following permission:
    • analytics:conversationAggregate:view
  • SDK Version: A recent release of PureCloudPlatformClientV2 (only needed for the SDK example in Step 3).
  • Language/Runtime: Python 3.9+.
  • External Dependencies:
    • pip install PureCloudPlatformClientV2
    • pip install httpx
    • pip install python-dotenv (for secure credential management)

Authentication Setup

Genesys Cloud APIs require a valid OAuth 2.0 Bearer token. For backend integrations and analytics queries, the Client Credentials flow is the standard approach. This flow exchanges a Client ID and Client Secret for an access token without user interaction.

The following Python function handles token acquisition. In production, cache the token and reuse it until shortly before it expires, rather than requesting a new one for every API call. The token response includes an expires_in field (in seconds); use that value instead of hard-coding a lifetime.

import httpx
import os
from dotenv import load_dotenv

load_dotenv()

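# Configuration
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
# Region domain of your organization, e.g., "mypurecloud.com" or "mypurecloud.ie"
ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")

# Authentication is served from the login.* host; API calls go to the api.* host
OAUTH_URL = f"https://login.{ENVIRONMENT}/oauth/token"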

async def get_access_token() -> str:
    """
    Retrieves an OAuth2 access token using Client Credentials flow.
    
    Returns:
        str: The access token string.
    
    Raises:
        httpx.HTTPStatusError: If authentication fails (401, 403).
    """
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment variables.")

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                OAUTH_URL,
                auth=(CLIENT_ID, CLIENT_SECRET),
                # Client Credentials clients get their permissions from roles, so no scope is sent
                data={"grant_type": "client_credentials"}
            )
            response.raise_for_status()
            token_data = response.json()
            return token_data["access_token"]
        except httpx.HTTPStatusError as e:
            print(f"Authentication failed with status {e.response.status_code}: {e.response.text}")
            raise
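
The caching advice given before the function can be sketched as follows. This is a minimal illustration and not part of any SDK: the TokenCache class, its fetch callable, and the 60-second safety margin are hypothetical names and values chosen for this example. It assumes the token response carries access_token and expires_in fields, as OAuth 2.0 token responses normally do.

```python
import time
from typing import Callable, Optional


class TokenCache:
    """Caches an OAuth token and refreshes it shortly before expiry (hypothetical helper)."""

    def __init__(self, fetch: Callable[[], dict], margin_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch            # returns the parsed token response as a dict
        self._margin = margin_seconds  # refresh this many seconds before expiry
        self._clock = clock            # injectable so the cache can be tested
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            data = self._fetch()
            self._token = data["access_token"]
            # Assumption: fall back to one hour if the server omits expires_in
            lifetime = float(data.get("expires_in", 3600))
            self._expires_at = now + lifetime - self._margin
        return self._token
```

The fetch callable here is synchronous to keep the sketch short. Since get_access_token is a coroutine, an async variant would make get a coroutine as well and await the fetch call.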

Implementation

Step 1: Defining the Query Structure

The core of this tutorial is the request body sent to /api/v2/analytics/conversations/aggregates/query. The body is a JSON object that defines the time window, the grouping dimensions, the metrics, and an optional filter.

To group by queue and media type, list both dimensions in the groupBy array. You must also name the metrics to aggregate in the metrics array (e.g., nOffered, tHandle, tAbandon, tTalk). Each metric comes back with statistics such as count and sum rather than as a precomputed average; time-based metrics (the t* family) are reported in milliseconds.

Critical Parameter Explanation:

  • interval: Defines the analysis window as a single ISO 8601 interval string in the form "start/end", e.g., "2024-01-01T00:00:00.000Z/2024-01-08T00:00:00.000Z".
  • granularity: Defines the time bucket size as an ISO 8601 duration. Omit it to get one total per group for the whole interval; use "PT1H" for hourly breakdowns.
  • groupBy: An array of dimension names. Valid values include "queueId", "mediaType", "wrapUpCode", etc.
  • filter: An optional object that restricts the data, built from dimension predicates such as "queueId matches a given ID".

Here is the construction of the query payload:

from datetime import datetime, timedelta, timezone
from typing import Optional

def build_analytics_query(queue_ids: Optional[list[str]] = None) -> dict:
    """
    Constructs the JSON payload for the aggregates query.
    
    Args:
        queue_ids: Optional list of queue IDs to filter. If None, all queues are included.
        
    Returns:
        dict: The query payload.
    """
    # Define time window: last 7 days, in UTC
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=7)

    # The API expects one ISO 8601 interval string: "start/end"
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    interval = f"{start_time.strftime(fmt)}/{end_time.strftime(fmt)}"

    # Base query structure. No "granularity" is set, so each group
    # is aggregated into a single bucket covering the whole interval.
    query_payload = {
        "interval": interval,
        "groupBy": [
            "queueId",
            "mediaType"
        ],
        "metrics": [
            "nOffered",   # conversations offered to the queue
            "tHandle",    # handle time; its count is the number handled
            "tAbandon",   # abandon time; its count is the number abandoned
            "tTalk",      # talk time
            "tHeld",      # hold time
            "tWait"       # wait time
        ]
    }

    # Add queue filter if specific queues are requested.
    # An "or" filter matches conversations from any of the listed queues.
    if queue_ids:
        query_payload["filter"] = {
            "type": "or",
            "predicates": [
                {
                    "type": "dimension",
                    "dimension": "queueId",
                    "operator": "matches",
                    "value": queue_id
                }
                for queue_id in queue_ids
            ]
        }

    return query_payload
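
Time-window mistakes are a common source of empty or rejected queries, so it can help to validate the window before building the payload. The helper below is a sketch under stated assumptions: format_interval is a hypothetical name, and it assumes the endpoint accepts the window as a single ISO 8601 "start/end" string expressed in UTC.

```python
from datetime import datetime, timedelta, timezone


def format_interval(start: datetime, end: datetime) -> str:
    """Return 'start/end' in UTC with millisecond precision (hypothetical helper)."""
    # Naive datetimes carry no offset, so converting them to UTC would be a guess
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("start and end must be timezone-aware")
    if start >= end:
        raise ValueError("start must be earlier than end")
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    start_utc = start.astimezone(timezone.utc)
    end_utc = end.astimezone(timezone.utc)
    return f"{start_utc.strftime(fmt)}/{end_utc.strftime(fmt)}"


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    print(format_interval(now - timedelta(days=7), now))
```

Rejecting naive datetimes is a deliberate choice here: it forces the caller to state the timezone instead of silently treating local time as UTC.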

Step 2: Executing the Query with Raw HTTP

While SDKs are convenient, understanding the raw HTTP request is essential for debugging. The following function sends the query constructed in Step 1.

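Note on Rate Limiting: Genesys Cloud enforces rate limits on API requests; check the platform documentation for the limits that apply to your organization. If you receive a 429 Too Many Requests, you must wait before retrying. The code below includes a basic retry mechanism for 429s that honors the Retry-After header when it is present and falls back to exponential backoff otherwise.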

import asyncio

# API calls go to the api.* host of your region (ENVIRONMENT is e.g. "mypurecloud.com")
ANALYTICS_URL = f"https://api.{ENVIRONMENT}/api/v2/analytics/conversations/aggregates/query"

async def fetch_analytics_data(query_payload: dict, token: str, max_retries: int = 3) -> dict:
    """
    Sends the analytics query to Genesys Cloud.
    
    Args:
        query_payload: The JSON body for the query.
        token: The OAuth access token.
        max_retries: Number of retries for 429 errors.
        
    Returns:
        dict: The parsed JSON response.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    async with httpx.AsyncClient(timeout=30.0) as client:
        for attempt in range(max_retries):
            try:
                response = await client.post(
                    ANALYTICS_URL,
                    headers=headers,
                    json=query_payload
                )
                
                if response.status_code == 200:
                    return response.json()
                elif response.status_code == 429:
                    # Retry-After header is usually present
                    wait_time = int(response.headers.get("Retry-After", 2 ** attempt))
                    print(f"Rate limited. Waiting {wait_time} seconds before retry {attempt + 1}/{max_retries}")
                    await asyncio.sleep(wait_time)
                    continue
                else:
                    # Handle other errors
                    print(f"Request failed with status {response.status_code}: {response.text}")
                    raise httpx.HTTPStatusError(f"HTTP {response.status_code}", request=response.request, response=response)
                    
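            except httpx.RequestError as e:
                # Network-level failure (DNS, connection reset, timeout).
                # Re-raise so the caller sees the real cause instead of a
                # misleading "max retries exceeded" error.
                print(f"Network error: {e}")
                raise
                
        # Only reached when every attempt ended in a 429
        raise RuntimeError("Max retries exceeded for analytics query.")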
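
The 429 branch in the function above parses Retry-After with int(), which raises ValueError if the header carries an HTTP-date or a fractional value. A more defensive delay calculation might look like the sketch below. The function name retry_delay, the 60-second cap, and the 10% jitter are illustrative choices, not values taken from Genesys Cloud documentation.

```python
import random
from typing import Mapping


def retry_delay(headers: Mapping[str, str], attempt: int,
                base: float = 1.0, cap: float = 60.0) -> float:
    """Seconds to wait before the next retry (hypothetical helper)."""
    # Note: a plain dict lookup is case-sensitive; httpx response headers are not
    raw = headers.get("Retry-After")
    if raw is not None:
        try:
            # Honor the server's hint, clamped to the range [0, cap]
            return min(max(float(raw), 0.0), cap)
        except ValueError:
            pass  # e.g. an HTTP-date value; fall back to exponential backoff
    backoff = min(base * (2 ** attempt), cap)
    # Add up to 10% jitter so parallel clients do not retry in lockstep
    return backoff + random.uniform(0, backoff * 0.1)
```

Inside the retry loop, the call would be wait_time = retry_delay(response.headers, attempt), followed by await asyncio.sleep(wait_time).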

Step 3: Processing Results with the SDK

The raw JSON response from the Analytics API is nested. The Genesys Cloud Python SDK (PureCloudPlatformClientV2) provides model classes that map to this structure, so you can use attribute access instead of dictionary lookups.

The response object contains a results list. Each item in this list represents a unique combination of the groupBy dimensions (queue ID + media type). Its group attribute holds the dimension values, and its data attribute holds one entry per time bucket, each with a metrics list of statistics.

from datetime import datetime, timedelta, timezone

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

def process_analytics_with_sdk(token: str) -> list[dict]:
    """
    Uses the official SDK to fetch and parse analytics data.
    
    Args:
        token: OAuth access token.
        
    Returns:
        list[dict]: A simplified list of analytics records.
    """
    # Configure the SDK: regional API host plus the token obtained earlier.
    # Check attribute and method names against the SDK reference for your version.
    PureCloudPlatformClientV2.configuration.host = f"https://api.{ENVIRONMENT}"
    PureCloudPlatformClientV2.configuration.access_token = token
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi()

    # Define time window: last 7 days, as one "start/end" interval string
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=7)
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"

    # Create the query object using the SDK model
    query = PureCloudPlatformClientV2.ConversationAggregationQuery()
    query.interval = f"{start_time.strftime(fmt)}/{end_time.strftime(fmt)}"
    query.group_by = ["queueId", "mediaType"]
    query.metrics = ["tHandle", "tAbandon", "tTalk"]

    try:
        # Execute the query
        api_response = analytics_api.post_analytics_conversations_aggregates_query(query)
    except ApiException as e:
        print(f"SDK Error: {e}")
        raise

    simplified_results = []

    for container in api_response.results or []:
        # group maps dimension names to values for this result row
        group = container.group or {}

        # Index the statistics by metric name. With no granularity set,
        # data holds a single bucket covering the whole interval.
        stats = {}
        for bucket in container.data or []:
            for metric in bucket.metrics or []:
                stats[metric.metric] = metric.stats

        handle = stats.get("tHandle")
        abandon = stats.get("tAbandon")
        talk = stats.get("tTalk")
        talk_count = (talk.count or 0) if talk else 0
        talk_sum_ms = (talk.sum or 0) if talk else 0

        simplified_results.append({
            "queue_id": group.get("queueId"),
            "media_type": group.get("mediaType"),
            "total_handled": (handle.count or 0) if handle else 0,
            "total_abandoned": (abandon.count or 0) if abandon else 0,
            # Times are reported in milliseconds; average = sum / count
            "avg_talk_time_seconds": (talk_sum_ms / talk_count / 1000) if talk_count else 0.0
        })

    return simplified_results
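
For the raw HTTP path, the same extraction can be sketched on plain dictionaries. The sample response below is made-up data, and its shape (a results list whose items hold a group object and a data list of metrics with stats) is an assumption about the aggregates endpoint that should be verified against a real response. The name flatten_aggregates is hypothetical.

```python
def flatten_aggregates(response: dict) -> list[dict]:
    """Turn a nested aggregates response into one flat row per group and time bucket."""
    rows = []
    for container in response.get("results", []):
        group = container.get("group", {})
        for bucket in container.get("data", []):
            row = {
                "queue_id": group.get("queueId"),
                "media_type": group.get("mediaType"),
                "interval": bucket.get("interval"),
            }
            for metric in bucket.get("metrics", []):
                stats = metric.get("stats", {})
                name = metric["metric"]
                # Keep count and sum so callers can derive averages themselves
                row[f"{name}_count"] = stats.get("count", 0)
                row[f"{name}_sum"] = stats.get("sum", 0)
            rows.append(row)
    return rows


# Hypothetical sample data, for illustration only
SAMPLE_RESPONSE = {
    "results": [
        {
            "group": {"queueId": "queue-1", "mediaType": "voice"},
            "data": [
                {
                    "interval": "2024-01-01T00:00:00.000Z/2024-01-08T00:00:00.000Z",
                    "metrics": [
                        {"metric": "tTalk", "stats": {"count": 4, "sum": 120000}},
                        {"metric": "tHandle", "stats": {"count": 4, "sum": 200000}},
                    ],
                }
            ],
        }
    ]
}

if __name__ == "__main__":
    for row in flatten_aggregates(SAMPLE_RESPONSE):
        print(row)
```

Flat rows like these are convenient to hand to csv.DictWriter or a DataFrame constructor.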

Complete Working Example

The following script combines authentication, query construction, and result processing into a single runnable module. It uses the raw HTTP approach for maximum transparency but can be adapted to the SDK pattern shown above.

import asyncio
import httpx
import os
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv

load_dotenv()

# --- Configuration ---
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
# Region domain of your organization, e.g., "mypurecloud.com"
ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")

OAUTH_URL = f"https://login.{ENVIRONMENT}/oauth/token"
ANALYTICS_URL = f"https://api.{ENVIRONMENT}/api/v2/analytics/conversations/aggregates/query"

# --- Authentication ---

async def get_access_token() -> str:
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("Missing Credentials")

    async with httpx.AsyncClient() as client:
        response = await client.post(
            OAUTH_URL,
            auth=(CLIENT_ID, CLIENT_SECRET),
            data={"grant_type": "client_credentials"}
        )
        response.raise_for_status()
        return response.json()["access_token"]

# --- Query Construction ---

def build_query() -> dict:
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=7)
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    
    return {
        # One ISO 8601 "start/end" string; no granularity means one bucket per group
        "interval": f"{start_time.strftime(fmt)}/{end_time.strftime(fmt)}",
        "groupBy": [
            "queueId",
            "mediaType"
        ],
        "metrics": [
            "tHandle",
            "tAbandon",
            "tTalk",
            "tHeld"
        ]
    }

# --- Execution ---

async def run_analytics_report():
    try:
        # 1. Get Token
        print("Authenticating...")
        token = await get_access_token()
        
        # 2. Build Query
        query_payload = build_query()
        print(f"Query Interval: {query_payload['interval']}")
        print(f"Group By: {query_payload['groupBy']}")
        
        # 3. Execute Query
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
        
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                ANALYTICS_URL,
                headers=headers,
                json=query_payload
            )
            
            if response.status_code == 200:
                data = response.json()
                
                # 4. Process Results
                print("\n--- Analytics Results ---")
                print(f"{'Queue ID':<35} | {'Media Type':<10} | {'Handled':<8} | {'Abandoned':<10} | {'Avg Talk (s)':<12}")
                print("-" * 87)
                
                results = data.get("results") or []
                if results:
                    for item in results:
                        group = item.get("group", {})
                        queue_id = group.get("queueId", "N/A")
                        media_type = group.get("mediaType", "N/A")
                        
                        # Index stats by metric name; a single bucket covers the interval
                        stats = {}
                        for bucket in item.get("data", []):
                            for m in bucket.get("metrics", []):
                                stats[m["metric"]] = m.get("stats", {})
                        
                        handled = stats.get("tHandle", {}).get("count", 0)
                        abandoned = stats.get("tAbandon", {}).get("count", 0)
                        
                        # Times are in milliseconds; average = sum / count
                        talk = stats.get("tTalk", {})
                        talk_count = talk.get("count", 0)
                        avg_talk = talk.get("sum", 0) / talk_count / 1000 if talk_count else 0.0
                        
                        print(f"{queue_id:<35} | {media_type:<10} | {handled:<8} | {abandoned:<10} | {avg_talk:<12.2f}")
                else:
                    print("No results found for the specified criteria.")
                    
            else:
                print(f"API Error {response.status_code}: {response.text}")
                
    except Exception as e:
        print(f"Fatal Error: {e}")

if __name__ == "__main__":
    asyncio.run(run_analytics_report())

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The OAuth token is invalid, expired, or missing from the request.
  • Fix: Ensure your client_id and client_secret are correct and that the Authorization header is sent as "Bearer <token>". Confirm that the token was requested from the login host of the same region as the API host. Tokens expire; in long-running scripts, request a new token once the expires_in period from the token response has elapsed.

Error: 403 Forbidden

  • Cause: The OAuth client lacks permission to view analytics data for the specified queues or the organization.
  • Fix: In Genesys Cloud Admin, open the OAuth client and review the roles assigned to it. Ensure at least one role includes the analytics:conversationAggregate:view permission and that the role's divisions cover the queues you are querying.

Error: 429 Too Many Requests

  • Cause: You have exceeded the rate limit for the Analytics API. Genesys Cloud enforces strict limits on analytics queries to protect database performance.
  • Fix: Implement exponential backoff. Check the Retry-After header in the response. Do not fire multiple analytics queries in parallel without staggering them. Cache results if the data does not need to be real-time.
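
The advice about not firing queries in parallel can be sketched with a semaphore plus staggered start times. This is an illustrative pattern only: run_staggered is a hypothetical helper, and the concurrency limit and spacing are placeholder values to tune against the limits that apply to your organization.

```python
import asyncio
from typing import Awaitable, Callable, Sequence, TypeVar

T = TypeVar("T")


async def run_staggered(jobs: Sequence[Callable[[], Awaitable[T]]],
                        max_concurrent: int = 2,
                        spacing_seconds: float = 0.5) -> list:
    """Run async jobs with bounded concurrency and staggered starts (hypothetical helper)."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(index: int, job: Callable[[], Awaitable[T]]) -> T:
        # Delay each job's start so requests do not leave in one burst
        await asyncio.sleep(index * spacing_seconds)
        # Never let more than max_concurrent jobs run at the same time
        async with semaphore:
            return await job()

    # gather returns results in the same order as the jobs were given
    return await asyncio.gather(*(guarded(i, job) for i, job in enumerate(jobs)))
```

Each job is a zero-argument callable returning a coroutine, for example lambda: fetch_analytics_data(payload, token), so that the request is only created once its turn comes.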

Error: Empty Results

  • Cause: The time window is too narrow, or the filters are too restrictive.
  • Fix: Verify the start and end times of the query window and make sure they are expressed in UTC. If filtering by queueId, confirm that the queue IDs exist and had activity during the specified period. Check that mediaType matches actual data (e.g., "voice", "chat", "callback").

Official References