Building Queue and Media Type Analytics Aggregations with Genesys Cloud

What You Will Build

  • You will construct a Python script that queries the Genesys Cloud Analytics API for aggregated conversation metrics.
  • The query groups results by queue and media type so you can compare performance across channels.
  • The implementation uses the Genesys Cloud Python SDK (the PureCloudPlatformClientV2 package) with explicit error handling and retries on rate limits.

Prerequisites

  • OAuth Client: A Genesys Cloud OAuth client that uses the Client Credentials grant, assigned a role with the following permissions:
    • analytics:conversationAggregate:view
    • routing:queue:view (only needed if you look up queue names)
  • SDK Version: PureCloudPlatformClientV2 >= 126.0.0
  • Language/Runtime: Python 3.8+
  • External Dependencies:
    • PureCloudPlatformClientV2 (the Genesys Cloud Python SDK)
    • python-dotenv (for credential management)

Install the dependencies using pip:

pip install PureCloudPlatformClientV2 python-dotenv

Authentication Setup

Genesys Cloud uses OAuth 2.0 for authentication. For server-side scripts, the Python SDK requests an access token through the Client Credentials grant when given a client ID and client secret. Never hardcode credentials; use environment variables or a secure vault.

Create a .env file in your project root:

GENESYS_CLOUD_REGION=us-east-1
GENESYS_CLOUD_CLIENT_ID=your_client_id
GENESYS_CLOUD_CLIENT_SECRET=your_client_secret

Initialize the SDK client in your code:

import os

import PureCloudPlatformClientV2
from dotenv import load_dotenv
from PureCloudPlatformClientV2.rest import ApiException

# Load environment variables from .env
load_dotenv()

def get_platform_api():
    """
    Authenticates with the Client Credentials grant and returns the SDK ApiClient.
    Raises ValueError if a variable is missing or the region is unknown.
    The SDK raises its own exception if the token request is rejected.
    """
    region = os.getenv("GENESYS_CLOUD_REGION")
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")

    if not all([region, client_id, client_secret]):
        raise ValueError("Missing required environment variables: GENESYS_CLOUD_REGION, GENESYS_CLOUD_CLIENT_ID, GENESYS_CLOUD_CLIENT_SECRET")

    # Map a region name such as "us-east-1" to the SDK enum member us_east_1
    try:
        region_host = PureCloudPlatformClientV2.PureCloudRegionHosts[region.replace("-", "_")]
    except KeyError:
        raise ValueError(f"Unsupported region: {region}") from None

    # Point the SDK at the regional API host, then request a token
    PureCloudPlatformClientV2.configuration.host = region_host.get_api_host()
    api_client = PureCloudPlatformClientV2.api_client.ApiClient()
    return api_client.get_client_credentials_token(client_id, client_secret)

Implementation

Step 1: Constructing the Analytics Query Body

The core of this tutorial is the POST /api/v2/analytics/conversations/aggregates/query endpoint. It accepts a JSON body that defines the time interval, optional filters, the dimensions to group by, and the metrics to calculate. The similarly named /api/v2/analytics/conversations/details/query endpoint returns individual conversation records and does not aggregate.

To group by queue and media type, list queueId and mediaType in the groupBy array. You must also name the metrics to aggregate, such as handle time (tHandle), talk time (tTalk), or wait time (tWait).

import PureCloudPlatformClientV2

def build_analytics_query(start_time: str, end_time: str) -> PureCloudPlatformClientV2.ConversationAggregationQuery:
    """
    Constructs the query body for the aggregates endpoint.

    Args:
        start_time: ISO 8601 start time (e.g., "2023-10-01T00:00:00.000Z")
        end_time: ISO 8601 end time (e.g., "2023-10-02T00:00:00.000Z")

    Returns:
        ConversationAggregationQuery object
    """
    # SDK models are created empty and populated through attributes
    query = PureCloudPlatformClientV2.ConversationAggregationQuery()

    # The API takes a single ISO 8601 interval string: "<start>/<end>"
    query.interval = f"{start_time}/{end_time}"

    # Dimensions to slice by; each result carries one queueId/mediaType pair
    query.group_by = ["queueId", "mediaType"]

    # t* metrics are durations in milliseconds, n* metrics are counts
    # tAcw is after-call work (wrap-up) time; nOffered counts offered interactions
    query.metrics = ["tHandle", "tTalk", "tWait", "tAcw", "nOffered"]

    return query

Critical Parameter Explanation:

  • interval: Two UTC ISO 8601 timestamps joined by a slash. The API does not accept relative dates like “last week”, so you must calculate these timestamps in your code.
  • metrics: The list of aggregations. If a metric does not apply to a media type (e.g., tTalk for email) or has no data, it is typically absent from that group's results rather than causing an error, so treat a missing metric as zero.
  • group_by: The dimensions by which data is sliced. Each added dimension multiplies the number of result rows and the size of the response payload. Limit groupings to what is strictly necessary.
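The API accepts only absolute timestamps, so a relative range such as "the last 7 days" has to be converted in code first. The sketch below shows one way to do that with the standard library. The helper name utc_range and its parameters are illustrative and not part of the SDK.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

def utc_range(days: int, now: Optional[datetime] = None) -> Tuple[str, str]:
    """Return (start, end) ISO 8601 UTC strings covering the last `days` days."""
    # Accept an injected, timezone-aware `now` so the function is easy to test
    end = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    start = end - timedelta(days=days)
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    return start.strftime(fmt), end.strftime(fmt)

start_time, end_time = utc_range(7, now=datetime(2023, 10, 8, tzinfo=timezone.utc))
print(start_time, end_time)
# 2023-10-01T00:00:00.000Z 2023-10-08T00:00:00.000Z
```

The two strings can be passed straight to build_analytics_query.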

Step 2: Executing the Query and Handling Rate Limits

The aggregates query returns every group for the requested interval in a single response, so there is no nextUri to follow and no page loop to write. What you do need to handle is rate limiting: when the API responds with HTTP 429, wait and retry with exponential backoff.

import time

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

def fetch_analytics_data(platform, query_request) -> list:
    """
    Runs the aggregates query, retrying with exponential backoff on HTTP 429.

    Args:
        platform: The authenticated ApiClient returned by get_platform_api().
        query_request: The ConversationAggregationQuery to send.

    Returns:
        A list of result containers, one per queueId/mediaType combination.
    """
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(platform)
    max_retries = 3

    for attempt in range(max_retries + 1):
        try:
            response = analytics_api.post_analytics_conversations_aggregates_query(query_request)
            # results is empty or None when nothing matched the interval
            return response.results or []

        except ApiException as e:
            if e.status == 429:
                # Rate limit hit
                if attempt == max_retries:
                    raise Exception(f"Max retries exceeded for rate limiting. Error: {e.body}") from e

                # Exponential backoff: 2, 4, 8 seconds
                backoff_time = 2 ** (attempt + 1)
                print(f"Rate limit hit. Retrying in {backoff_time} seconds...")
                time.sleep(backoff_time)
            elif e.status == 400:
                # Bad Request, likely an invalid query structure
                raise Exception(f"Bad Request: {e.body}") from e
            elif e.status in (401, 403):
                # Authentication/Authorization failure
                raise Exception(f"Auth Error: {e.status} - {e.body}") from e
            else:
                raise

Note on Pagination:
The aggregates response has no paging fields. If you later need per-conversation records instead of aggregates, the details query endpoint pages through a paging object (pageSize and pageNumber) in the request body rather than a nextUri.

Step 3: Processing and Formatting Results

The raw response contains a list of result containers. Each container has a group dictionary with the grouped dimensions (queueId and mediaType) and a data list with one entry per time bucket, each holding the metric statistics. The response carries queue IDs only; resolving them to names requires a separate Routing API lookup. You must map these containers to a usable format, such as a list of dictionaries or a Pandas DataFrame.

from typing import Any, Dict, List

# API metric name -> (output key, statistic to read from the metric's stats)
METRIC_FIELDS = {
    "tHandle": ("total_handle_time_ms", "sum"),
    "tTalk": ("talk_time_ms", "sum"),
    "tWait": ("wait_time_ms", "sum"),
    "tAcw": ("wrap_time_ms", "sum"),
    "nOffered": ("total_interactions", "count"),
}

def process_analytics_results(summaries: List[Any]) -> List[Dict[str, Any]]:
    """
    Converts raw aggregate result containers into a simplified list of dictionaries.

    Args:
        summaries: List of result containers from the API.

    Returns:
        List of dictionaries with flattened keys.
    """
    processed_data = []

    for summary in summaries:
        # Extract the grouped dimensions
        group = summary.group or {}
        record = {
            "queue_id": group.get("queueId", "Unknown"),
            "media_type": group.get("mediaType", "Unknown"),
        }

        # Start every metric at zero so metrics missing from the response stay 0
        record.update({key: 0 for key, _ in METRIC_FIELDS.values()})

        # Add up each metric across time buckets (one bucket when no granularity is set)
        # Note: time-based metric values are in milliseconds
        for bucket in summary.data or []:
            for metric in bucket.metrics or []:
                if metric.metric in METRIC_FIELDS and metric.stats:
                    key, stat = METRIC_FIELDS[metric.metric]
                    record[key] += getattr(metric.stats, stat) or 0

        processed_data.append(record)

    return processed_data
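Once the records are flat dictionaries, further rollups need only the standard library. As an illustration, the sketch below totals interactions and handle time per media type and derives an average handle time in seconds. The sample records are made up for the example, and summarize_by_media_type is a hypothetical helper name.

```python
from collections import defaultdict
from typing import Any, Dict, List

def summarize_by_media_type(records: List[Dict[str, Any]]) -> Dict[str, Dict[str, float]]:
    """Roll flattened queue/media-type records up to one row per media type."""
    totals = defaultdict(lambda: {"interactions": 0, "handle_ms": 0})
    for r in records:
        t = totals[r["media_type"]]
        t["interactions"] += r["total_interactions"]
        t["handle_ms"] += r["total_handle_time_ms"]

    summary = {}
    for media_type, t in totals.items():
        # Guard against division by zero for media types with no interactions
        avg_s = t["handle_ms"] / t["interactions"] / 1000 if t["interactions"] else 0.0
        summary[media_type] = {"interactions": t["interactions"], "avg_handle_s": round(avg_s, 1)}
    return summary

# Made-up sample data in the shape produced by process_analytics_results
sample = [
    {"queue_id": "q1", "media_type": "voice", "total_interactions": 10, "total_handle_time_ms": 3_000_000},
    {"queue_id": "q2", "media_type": "voice", "total_interactions": 5, "total_handle_time_ms": 1_500_000},
    {"queue_id": "q1", "media_type": "email", "total_interactions": 0, "total_handle_time_ms": 0},
]
print(summarize_by_media_type(sample))
# {'voice': {'interactions': 15, 'avg_handle_s': 300.0}, 'email': {'interactions': 0, 'avg_handle_s': 0.0}}
```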

Complete Working Example

This script combines all previous steps into a runnable module. It fetches analytics data for the last 24 hours, groups by queue and media type, and prints the results.

import os
import sys
import time
from datetime import datetime, timedelta, timezone

import PureCloudPlatformClientV2
from dotenv import load_dotenv
from PureCloudPlatformClientV2.rest import ApiException

# Load environment variables from .env
load_dotenv()

# API metric name -> (output key, statistic to read from the metric's stats)
METRIC_FIELDS = {
    "tHandle": ("total_handle_time_ms", "sum"),
    "tTalk": ("talk_time_ms", "sum"),
    "tWait": ("wait_time_ms", "sum"),
    "nOffered": ("total_interactions", "count"),
}

def get_platform_api():
    region = os.getenv("GENESYS_CLOUD_REGION")
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")

    if not all([region, client_id, client_secret]):
        raise ValueError("Missing required environment variables")

    # Map a region name such as "us-east-1" to the SDK enum member us_east_1
    try:
        region_host = PureCloudPlatformClientV2.PureCloudRegionHosts[region.replace("-", "_")]
    except KeyError:
        raise ValueError(f"Unsupported region: {region}") from None

    PureCloudPlatformClientV2.configuration.host = region_host.get_api_host()
    api_client = PureCloudPlatformClientV2.api_client.ApiClient()
    return api_client.get_client_credentials_token(client_id, client_secret)

def build_analytics_query(start_time: str, end_time: str):
    query = PureCloudPlatformClientV2.ConversationAggregationQuery()
    query.interval = f"{start_time}/{end_time}"
    query.group_by = ["queueId", "mediaType"]
    query.metrics = list(METRIC_FIELDS)
    return query

def fetch_analytics_data(platform, query_request):
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(platform)
    max_retries = 3

    for attempt in range(max_retries + 1):
        try:
            response = analytics_api.post_analytics_conversations_aggregates_query(query_request)
            return response.results or []
        except ApiException as e:
            # Only rate limiting is retried; every other error propagates
            if e.status != 429:
                raise
            if attempt == max_retries:
                raise Exception(f"Max retries exceeded. Error: {e.body}") from e
            backoff_time = 2 ** (attempt + 1)
            print(f"Rate limit hit. Retrying in {backoff_time}s...")
            time.sleep(backoff_time)

def process_analytics_results(summaries):
    processed_data = []
    for summary in summaries:
        group = summary.group or {}
        record = {
            "queue_id": group.get("queueId", "Unknown"),
            "media_type": group.get("mediaType", "Unknown"),
        }
        record.update({key: 0 for key, _ in METRIC_FIELDS.values()})

        # Add up each metric across the time buckets in this group
        for bucket in summary.data or []:
            for metric in bucket.metrics or []:
                if metric.metric in METRIC_FIELDS and metric.stats:
                    key, stat = METRIC_FIELDS[metric.metric]
                    record[key] += getattr(metric.stats, stat) or 0
        processed_data.append(record)
    return processed_data

def main():
    try:
        # 1. Authenticate
        platform = get_platform_api()

        # 2. Define Time Range (Last 24 Hours)
        now = datetime.now(timezone.utc)
        yesterday = now - timedelta(days=1)
        start_time = yesterday.strftime("%Y-%m-%dT%H:%M:%S.000Z")
        end_time = now.strftime("%Y-%m-%dT%H:%M:%S.000Z")

        print(f"Querying analytics from {start_time} to {end_time}")

        # 3. Build Query
        query_request = build_analytics_query(start_time, end_time)

        # 4. Fetch Data
        summaries = fetch_analytics_data(platform, query_request)
        print(f"Fetched {len(summaries)} result groups.")

        # 5. Process Results
        results = process_analytics_results(summaries)

        # 6. Output Results (queue IDs are 36-character UUIDs)
        print(f"{'Queue ID':<36} | {'Media Type':<10} | {'Interactions':<12} | {'Avg Handle Time (ms)':<20}")
        print("-" * 87)
        for row in results:
            # Average per offered interaction; 0 when nothing was offered
            avg_handle = 0
            if row['total_interactions'] > 0:
                avg_handle = row['total_handle_time_ms'] / row['total_interactions']

            print(f"{row['queue_id']:<36} | {row['media_type']:<10} | {row['total_interactions']:<12} | {avg_handle:<20.2f}")

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: Invalid client ID/secret or expired token.
  • Fix: Verify credentials in your .env file. Ensure the OAuth application is active in Genesys Cloud Admin.
  • Code Check: The SDK's get_client_credentials_token call raises an exception if the token cannot be acquired. Ensure you are catching ApiException around it.

Error: 403 Forbidden

  • Cause: The role assigned to the OAuth client lacks a required permission.
  • Fix: Grant analytics:conversationAggregate:view to a role assigned to the OAuth client in Genesys Cloud Admin.
  • Note: Changes to roles may take a few minutes to propagate.

Error: 429 Too Many Requests

  • Cause: Hitting the API rate limit.
  • Fix: Implement exponential backoff. The example code retries up to three times, waiting 2, 4, and then 8 seconds.
  • Prevention: Avoid running queries in tight loops. Cache results if possible.
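The backoff pattern can also be factored into a reusable helper so every API call shares the same retry policy. The following is a minimal sketch, not an SDK feature: call_with_backoff and FakeRateLimit are hypothetical names, and the helper assumes the raised exception exposes the HTTP code as a status attribute, as the SDK's ApiException does.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def call_with_backoff(func: Callable[[], T], max_retries: int = 3,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func(), retrying on exceptions whose `status` attribute is 429."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception as exc:
            # Non-rate-limit errors, and the final failed attempt, propagate unchanged
            if getattr(exc, "status", None) != 429 or attempt == max_retries:
                raise
            sleep(2 ** (attempt + 1))  # waits 2, 4, 8, ... seconds
    raise ValueError("max_retries must be >= 0")

# Demonstration with a stand-in exception and a recorded (not real) sleep
class FakeRateLimit(Exception):
    status = 429

attempts = []

def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise FakeRateLimit()
    return "ok"

delays = []
result = call_with_backoff(flaky, sleep=delays.append)
print(result, delays)
# ok [2, 4]
```

In a real script, the SDK query call would be wrapped in a lambda and passed as func, with the default time.sleep left in place.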

Error: 400 Bad Request

  • Cause: Invalid query structure, such as unsupported metrics or groupings.
  • Fix: Verify that metric names and grouping names match the official API documentation. Ensure the start and end timestamps are valid ISO 8601 strings and that the start precedes the end.
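A malformed or reversed time range can be caught locally before the request is sent. The check below is a sketch using only the standard library. validate_range is a hypothetical helper, and it assumes timestamps in the "YYYY-MM-DDTHH:MM:SS.mmmZ" form used throughout this tutorial.

```python
from datetime import datetime

def validate_range(start_time: str, end_time: str) -> None:
    """Raise ValueError if either timestamp is malformed or start is not before end."""
    fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
    start = datetime.strptime(start_time, fmt)  # strptime raises ValueError on bad input
    end = datetime.strptime(end_time, fmt)
    if start >= end:
        raise ValueError(f"start {start_time} must be earlier than end {end_time}")

# A valid range passes silently
validate_range("2023-10-01T00:00:00.000Z", "2023-10-02T00:00:00.000Z")

# A reversed range is rejected before any API call is made
try:
    validate_range("2023-10-02T00:00:00.000Z", "2023-10-01T00:00:00.000Z")
except ValueError as err:
    print(f"Rejected: {err}")
```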

Error: Empty Results

  • Cause: No conversations match the filter criteria or time range.
  • Fix: Verify the time range contains data. Ensure the queues specified (if filtered) exist and had activity.

Official References