Constructing Analytics Queries Grouped by Queue and Media Type in Genesys Cloud

What You Will Build

  • A Python script that constructs and executes an analytics aggregation query against the Genesys Cloud API.
  • The query groups conversation metrics by queue ID and media type to produce a matrix of volume and handle time data.
  • The implementation uses the Genesys Cloud Python SDK (PureCloudPlatformClientV2) and handles rate limiting and date range formatting.

Prerequisites

  • OAuth Client Type: An OAuth client that uses the Client Credentials grant.
  • Required Permission: analytics:conversationAggregate:view, granted through a role assigned to the OAuth client. Client Credentials clients are authorized by their roles rather than by OAuth scopes.
  • SDK Version: PureCloudPlatformClientV2 >= 170.0.0.
  • Runtime: Python 3.8 or higher.
  • Dependencies: Install the SDK via pip:
    pip install PureCloudPlatformClientV2
    

Authentication Setup

Genesys Cloud uses OAuth 2.0 for all API access. For service integrations, the Client Credentials flow is the standard approach. This flow exchanges a Client ID and Client Secret for an access token.

The following code initializes the Genesys Cloud SDK client. The SDK stores the access token on the returned client and attaches it to each subsequent request.

import logging
import os

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

# Configure logging to see SDK debug info if needed
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def get_purecloud_client() -> PureCloudPlatformClientV2.api_client.ApiClient:
    """
    Initializes and returns an authenticated ApiClient.
    Uses environment variables for credentials.
    """
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
    # The SDK names regions with underscores, e.g., us_east_1, eu_west_1
    region_name = os.getenv("GENESYS_CLOUD_REGION", "us-east-1").replace("-", "_")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLOUD_CLIENT_ID and GENESYS_CLOUD_CLIENT_SECRET environment variables are required.")

    # Point the SDK at the API host for the organization's region
    region = PureCloudPlatformClientV2.PureCloudRegionHosts[region_name]
    PureCloudPlatformClientV2.configuration.host = region.get_api_host()

    # Exchange the client ID and secret for an access token (Client Credentials grant)
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        client_id, client_secret
    )

if __name__ == "__main__":
    try:
        client = get_purecloud_client()
        print("Authentication successful. Client initialized.")
    except Exception as e:
        logger.error(f"Failed to initialize client: {e}")
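
For context, the token exchange that the Client Credentials flow performs can be sketched with the standard library alone. This is a minimal illustration, not the SDK's internals: the helper name build_token_request is hypothetical, and the default login host is an assumption that applies to the us-east-1 region only. The function builds the request without sending it.

```python
import base64
import urllib.parse
import urllib.request

def build_token_request(client_id: str, client_secret: str,
                        login_host: str = "https://login.mypurecloud.com") -> urllib.request.Request:
    """Builds (but does not send) a Client Credentials token request.

    Assumption: login_host is the login host of the us-east-1 region;
    other regions use a different host.
    """
    # The client ID and secret travel as HTTP Basic credentials
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    headers = {
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    # The form body only names the grant type
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode("ascii")
    return urllib.request.Request(
        f"{login_host}/oauth/token", data=body, headers=headers, method="POST"
    )
```

Passing the returned object to urllib.request.urlopen would perform the exchange; the JSON response carries the access token that the SDK otherwise manages for you.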

Implementation

Step 1: Constructing the DataQuery Object

The aggregates endpoint accepts a ConversationAggregationQuery. The query specifies the interval (the time range, written as start/end), granularity (the size of each time bucket), group_by (dimensions), and metrics (measures).

To group by queue and media type, list two dimensions in group_by: queueId and mediaType.

from PureCloudPlatformClientV2 import ConversationAggregationQuery

def build_query_request(start_date: str, end_date: str) -> ConversationAggregationQuery:
    """
    Constructs a ConversationAggregationQuery that groups by queue and media type.
    
    Args:
        start_date: ISO 8601 string (e.g., "2023-10-01T00:00:00Z")
        end_date: ISO 8601 string (e.g., "2023-10-02T00:00:00Z")
    """
    query = ConversationAggregationQuery()

    # 1. Define the time range as a single ISO 8601 interval string: start/end
    query.interval = f"{start_date}/{end_date}"

    # 2. Define the bucket size as an ISO 8601 duration.
    # PT1H is hourly. Other common values: PT30M (30 minutes), P1D (daily).
    query.granularity = "PT1H"

    # 3. Define the dimensions to group by.
    # queueId yields one group per queue (the value is the queue's UUID).
    # mediaType distinguishes between voice, chat, email, etc.
    query.group_by = ["queueId", "mediaType"]

    # 4. Define the metrics (measures).
    # nOffered counts conversations offered to the queue; tHandle is handle time in milliseconds.
    query.metrics = ["nOffered", "tHandle"]

    # 5. Optional: filter by specific queues if needed.
    # For this example, we retrieve all queues.
    query.filter = None

    return query
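
For readers who want to see what travels over the wire, the sketch below expresses the same query as a plain dictionary of the kind that would be POSTed as JSON to /api/v2/analytics/conversations/aggregates/query. The helper name build_aggregate_body and its queue_ids parameter are hypothetical; the field names reflect my understanding of the REST request body and should be checked against the API reference. It also illustrates the optional queue filter mentioned above.

```python
def build_aggregate_body(start_date: str, end_date: str,
                         granularity: str = "PT1H", queue_ids=None) -> dict:
    """Returns an aggregation query body grouped by queue and media type."""
    body = {
        "interval": f"{start_date}/{end_date}",  # ISO 8601 interval: start/end
        "granularity": granularity,              # ISO 8601 duration per bucket
        "groupBy": ["queueId", "mediaType"],
        "metrics": ["nOffered", "tHandle"],
    }
    if queue_ids:
        # Match any of the given queues: one dimension predicate per queue ID
        body["filter"] = {
            "type": "or",
            "predicates": [
                {"type": "dimension", "dimension": "queueId",
                 "operator": "matches", "value": queue_id}
                for queue_id in queue_ids
            ],
        }
    return body
```

Leaving queue_ids empty omits the filter entirely, which is the safest starting point when a query returns no data.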

Step 2: Executing the Query and Handling Pagination

The AnalyticsApi.post_analytics_conversations_aggregates_query method returns the complete aggregation in a single response, so no pagination loop is needed. Each entry in response.results holds one group (a queue and media type combination) together with its per-interval data.

The API enforces rate limits. A 429 Too Many Requests response should be retried with exponential backoff.

import time

from PureCloudPlatformClientV2 import AnalyticsApi
from PureCloudPlatformClientV2.rest import ApiException

def fetch_analytics_data(client, request) -> list:
    """
    Executes the aggregation query and retries when rate limited.
    
    Args:
        client: The authenticated ApiClient.
        request: The ConversationAggregationQuery object.
        
    Returns:
        A list of result groups, one per queue and media type combination.
    """
    analytics_api = AnalyticsApi(client)
    max_retries = 3
    retry_count = 0

    while True:
        try:
            response = analytics_api.post_analytics_conversations_aggregates_query(request)

            # results can be None when no conversations match the query
            results = response.results or []
            logger.info(f"Retrieved {len(results)} result groups.")
            return results

        except ApiException as e:
            logger.error(f"API Exception: {e.status} {e.reason}")

            # Handle Rate Limiting (429)
            if e.status == 429:
                retry_count += 1
                if retry_count > max_retries:
                    raise RuntimeError("Max retries exceeded for 429 Too Many Requests.") from e

                # Exponential backoff: 2^retry_count seconds
                backoff_time = 2 ** retry_count
                logger.warning(f"Rate limited. Retrying in {backoff_time} seconds...")
                time.sleep(backoff_time)
                continue

            # Handle other errors (401, 403, 500)
            if e.status in (401, 403):
                logger.error("Authentication or authorization failed. Check the client's credentials and roles.")
            else:
                logger.error(f"Unexpected error: {e.body}")
            raise
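
The backoff logic can also be factored into a reusable helper. The sketch below is SDK-independent: RateLimited is a hypothetical stand-in for an API error with status 429, and call_with_backoff is an illustrative name. It adds random jitter, a common refinement that the code above does not use, so that several clients retrying at once do not synchronize.

```python
import random
import time

class RateLimited(Exception):
    """Hypothetical stand-in for an API error with HTTP status 429."""

def call_with_backoff(func, max_retries=3, base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Calls func(), retrying on RateLimited with exponential backoff and jitter."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except RateLimited:
            if attempt == max_retries:
                raise  # Retries exhausted; surface the error to the caller
            # The delay ceiling doubles on every attempt: 1s, 2s, 4s, ... capped at max_delay
            ceiling = min(max_delay, base_delay * 2 ** attempt)
            # Full jitter: sleep a random amount between 0 and the ceiling
            sleep(random.uniform(0, ceiling))
```

The sleep parameter exists so that tests can substitute a recorder for time.sleep and run without waiting.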

Step 3: Processing and Formatting Results

The raw response from the Analytics API contains nested objects. Each entry in results contains a group (the dimension values) and data (one bucket per interval, each with a list of metrics). We need to flatten this structure for consumption by a dashboard or database.

def format_results(rows: list) -> list:
    """
    Flattens the aggregation results into a list of dictionaries.
    
    Args:
        rows: List of result groups from the aggregates query response.
        
    Returns:
        List of dictionaries with keys: queue_id, media_type, interval,
        conversation_count, total_handle_time_seconds
    """
    formatted_data = []
    
    for row in rows:
        # group maps each dimension name to its value.
        # queueId is the queue's UUID; resolving the queue name requires a separate lookup.
        group = row.group or {}
        queue_id = group.get("queueId")
        media_type = group.get("mediaType")

        # data holds one bucket per interval of the chosen granularity
        for bucket in row.data or []:
            conversation_count = 0
            total_handle_time = 0.0

            for metric in bucket.metrics or []:
                if metric.metric == "nOffered":
                    conversation_count = metric.stats.count or 0
                elif metric.metric == "tHandle":
                    # Handle time is returned in milliseconds
                    total_handle_time = (metric.stats.sum or 0) / 1000.0 # Convert to seconds

            formatted_data.append({
                "queue_id": queue_id,
                "media_type": media_type,
                "interval": bucket.interval,
                "conversation_count": conversation_count,
                "total_handle_time_seconds": total_handle_time
            })
        
    return formatted_data
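
Once the rows are flat, the queue by media type matrix can be assembled with a small pivot. The following is a sketch under the assumption that each row is a dictionary with the keys queue_id, media_type, conversation_count, and total_handle_time_seconds, as produced above; the function name pivot_by_queue_and_media is hypothetical. Rows that share a queue and media type are summed.

```python
from collections import defaultdict

def pivot_by_queue_and_media(rows: list) -> dict:
    """Builds {queue_id: {media_type: totals}} from flattened rows."""
    def empty_cell():
        return {"conversation_count": 0, "total_handle_time_seconds": 0.0}

    matrix = defaultdict(lambda: defaultdict(empty_cell))
    for row in rows:
        # Accumulate into the cell for this queue and media type
        cell = matrix[row["queue_id"]][row["media_type"]]
        cell["conversation_count"] += row["conversation_count"]
        cell["total_handle_time_seconds"] += row["total_handle_time_seconds"]

    # Convert nested defaultdicts to plain dicts so the result serializes cleanly
    return {queue_id: dict(by_media) for queue_id, by_media in matrix.items()}
```

The result can be passed directly to json.dumps or loaded into a table where queues are rows and media types are columns.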

Complete Working Example

The following script combines all previous steps into a single executable module. It defines the date range, builds the query, executes it with retry handling for rate limits, and prints the results as JSON.

import json
import logging
import os
import time
from datetime import datetime, timedelta, timezone

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2 import AnalyticsApi, ConversationAggregationQuery
from PureCloudPlatformClientV2.rest import ApiException

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def get_purecloud_client():
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
    # Default to us-east-1 if not set; the SDK names regions with underscores
    region_name = os.getenv("GENESYS_CLOUD_REGION", "us-east-1").replace("-", "_")

    if not client_id or not client_secret:
        raise ValueError("Environment variables GENESYS_CLOUD_CLIENT_ID and GENESYS_CLOUD_CLIENT_SECRET are required.")

    region = PureCloudPlatformClientV2.PureCloudRegionHosts[region_name]
    PureCloudPlatformClientV2.configuration.host = region.get_api_host()
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        client_id, client_secret
    )

def build_query_request(start_date: str, end_date: str) -> ConversationAggregationQuery:
    query = ConversationAggregationQuery()
    query.interval = f"{start_date}/{end_date}"  # ISO 8601 interval: start/end
    query.granularity = "PT1H"                   # Hourly buckets
    query.group_by = ["queueId", "mediaType"]    # Dimensions
    query.metrics = ["nOffered", "tHandle"]      # Volume and handle time
    return query

def execute_query(client, request: ConversationAggregationQuery) -> list:
    analytics_api = AnalyticsApi(client)
    max_retries = 5
    retry_count = 0

    while True:
        try:
            response = analytics_api.post_analytics_conversations_aggregates_query(request)
            # results can be None when no conversations match the query
            results = response.results or []
            logger.info(f"Query complete. Result groups: {len(results)}")
            return results

        except ApiException as e:
            if e.status != 429:
                logger.error(f"Error during query execution: {e.status} {e.reason}")
                raise
            retry_count += 1
            if retry_count > max_retries:
                raise RuntimeError("Max retries exceeded due to rate limiting.") from e
            wait_time = 2 ** retry_count
            logger.warning(f"Rate limited (429). Waiting {wait_time}s...")
            time.sleep(wait_time)

def format_and_print_results(rows: list):
    if not rows:
        print("No data found for the specified range and filters.")
        return

    output_data = []
    for row in rows:
        group = row.group or {}

        # One bucket per hourly interval
        for bucket in row.data or []:
            conv_count = 0
            handle_time_ms = 0

            for m in bucket.metrics or []:
                if m.metric == "nOffered":
                    conv_count = m.stats.count or 0
                elif m.metric == "tHandle":
                    handle_time_ms = m.stats.sum or 0

            output_data.append({
                "queue_id": group.get("queueId"),
                "media_type": group.get("mediaType"),
                "interval": bucket.interval,
                "conversation_count": conv_count,
                "total_handle_time_seconds": round(handle_time_ms / 1000, 2)
            })

    print(json.dumps(output_data, indent=2))

if __name__ == "__main__":
    try:
        # 1. Initialize Client
        client = get_purecloud_client()
        
        # 2. Define Date Range (Last 24 Hours)
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(days=1)
        
        # Format as ISO 8601 with Z suffix
        start_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
        end_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
        
        logger.info(f"Querying analytics from {start_str} to {end_str}")

        # 3. Build Query
        request = build_query_request(start_str, end_str)

        # 4. Execute Query
        rows = execute_query(client, request)

        # 5. Format and Output
        format_and_print_results(rows)

    except Exception as e:
        logger.error(f"Fatal error: {e}")
        raise
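
Because the query buckets data by hour, it can help to align the date range to whole hours so that the first and last buckets are not partial. The helper below is a sketch of that idea, not part of the script above; the name hour_aligned_range and its now parameter (which makes the function testable) are hypothetical.

```python
from datetime import datetime, timedelta, timezone

def hour_aligned_range(hours: int = 24, now: datetime = None) -> tuple:
    """Returns (start, end) ISO 8601 strings covering the last `hours` full hours in UTC."""
    now = now or datetime.now(timezone.utc)
    # Floor the end of the range to the start of the current hour
    end = now.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)
    start = end - timedelta(hours=hours)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return start.strftime(fmt), end.strftime(fmt)
```

For example, a call made at 14:37:12 UTC on 2023-10-02 returns ("2023-10-01T14:00:00Z", "2023-10-02T14:00:00Z"), and the two strings can be passed straight to build_query_request.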

Common Errors & Debugging

Error: 401 Unauthorized or 403 Forbidden

Cause: The credentials are invalid or belong to a different region (401), or the OAuth client lacks the required permission (403).
Fix: Ensure your environment variables are correct, including GENESYS_CLOUD_REGION. Verify that a role granting the analytics:conversationAggregate:view permission is assigned to the OAuth client in the Genesys Cloud Admin UI under Integrations > OAuth.

Debug tip: The status code narrows the cause. A 401 points at the client ID, client secret, or region. A 403 means authentication succeeded but the client's role does not grant the permission.

Error: 429 Too Many Requests

Cause: The Analytics API has strict rate limits. Aggregation queries are resource-intensive.
Fix: Implement exponential backoff, as shown in the execute_query function. Do not fire multiple concurrent analytics queries. Serialize your requests.

Error: Empty Results

Cause: The date range might have no data, or the filters are too restrictive.
Fix:

  1. Check the date range. Ensure the start of the range is before the end.
  2. Verify that the queues selected actually had conversations in that period.
  3. If filtering by specific queue IDs, ensure the IDs are valid UUIDs.

To debug empty results, remove the filter first and re-run the query with only the interval, the two group-by dimensions, and a single metric.
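
The first and third checks can be automated before the query is sent. The sketch below assumes timestamps in the "%Y-%m-%dT%H:%M:%SZ" format used in this article; the function name validate_query_inputs is hypothetical.

```python
import uuid
from datetime import datetime

def validate_query_inputs(start_date: str, end_date: str, queue_ids=()) -> list:
    """Returns a list of human-readable problems; an empty list means the inputs look sane."""
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    problems = []

    # Check 1: the start of the range must be strictly before the end
    if datetime.strptime(start_date, fmt) >= datetime.strptime(end_date, fmt):
        problems.append("start_date must be before end_date")

    # Check 3: every queue ID must parse as a UUID
    for queue_id in queue_ids:
        try:
            uuid.UUID(queue_id)
        except ValueError:
            problems.append(f"not a valid UUID: {queue_id}")

    return problems
```

Calling it with a reversed range or a malformed queue ID returns one message per problem, which can be logged before any API call is made.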

Error: TypeError: 'NoneType' object is not iterable

Cause: The results attribute of the response can be None when no conversations match the query.
Fix: Default to an empty list (response.results or []) before iterating, and apply the same guard to each group's data and each bucket's metrics.

Official References