Constructing a Genesys Cloud Analytics Aggregation Query by Queue and Media Type

Constructing a Genesys Cloud Analytics Aggregation Query by Queue and Media Type

What You Will Build

  • One sentence: You will build a script that queries the Genesys Cloud Analytics API to retrieve conversation volume metrics aggregated by specific queues and media types (voice, chat, email).
  • One sentence: This tutorial uses the Genesys Cloud analytics/conversations/details/query endpoint via the Python SDK.
  • One sentence: The code is written in Python 3.9+ using the genesyscloud SDK and httpx for underlying request handling.

Prerequisites

  • OAuth Client Type: Service Account (Client Credentials Grant) or Resource Owner Password Credentials.
  • Required Scopes: analytics:conversation:read, analytics:report:read.
  • SDK Version: genesyscloud Python SDK >= 140.0.0.
  • Runtime Requirements: Python 3.9 or higher.
  • Dependencies: pip install genesyscloud requests (the SDK relies on requests for HTTP transport).

Authentication Setup

Genesys Cloud APIs require OAuth 2.0 authentication. For backend integrations and analytics queries, the Client Credentials Grant is the standard pattern. This flow exchanges a Client ID and Client Secret for an access token.

The Python SDK handles token management automatically if you configure the PlatformClient correctly. However, understanding the underlying token lifecycle is critical for debugging 401 Unauthorized errors.

import os
from purecloudplatformclientv2 import PlatformClient

# Environment variables must be set before running
# GENESYS_CLOUD_REGION: e.g., 'mypurecloud.com', 'usw2.pure.cloud', 'au02.pure.cloud'
# GENESYS_CLOUD_CLIENT_ID: Your OAuth Client ID
# GENESYS_CLOUD_CLIENT_SECRET: Your OAuth Client Secret

def get_platform_client() -> PlatformClient:
    """
    Initializes and returns an authenticated PlatformClient instance.
    """
    region = os.getenv("GENESYS_CLOUD_REGION")
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")

    if not all([region, client_id, client_secret]):
        raise ValueError("Missing required environment variables: REGION, CLIENT_ID, CLIENT_SECRET")

    # The PlatformClient automatically handles token acquisition and refresh
    # when using the client credentials flow.
    platform_client = PlatformClient(
        region=region,
        client_id=client_id,
        client_secret=client_secret
    )

    return platform_client

Implementation

Step 1: Define the Analytics Query Body

The core of this tutorial is constructing the body parameter for the post_analytics_conversations_details_query method. The Genesys Cloud Analytics API uses a declarative query structure. You define what you want (metrics), how you want it grouped (dimensions), and when it occurred (date range).

To group by Queue and Media Type, you must include queue.id and mediaType in the groupBy array.

from purecloudplatformclientv2 import PostAnalyticsConversationsDetailsQueryRequestBody
from purecloudplatformclientv2 import ConversationDetailsQuery

def build_query_body() -> PostAnalyticsConversationsDetailsQueryRequestBody:
    """
    Constructs the request body for the Analytics API.
    
    Returns:
        PostAnalyticsConversationsDetailsQueryRequestBody: The configured query object.
    """
    # Define the date range (ISO 8601 format)
    # Example: Last 7 days
    from datetime import datetime, timedelta
    end_time = datetime.utcnow().isoformat()
    start_time = (datetime.utcnow() - timedelta(days=7)).isoformat()

    # Define the metrics you want to aggregate
    # 'count' is the number of conversations
    # 'talkTime' is the total time agents spent talking (in seconds)
    metrics = ["count", "talkTime"]

    # Define the dimensions to group by
    # 'queue.id' groups results by Queue ID
    # 'mediaType' groups results by Voice, Chat, Email, etc.
    group_by = ["queue.id", "mediaType"]

    # Define filters to narrow down the data
    # Here we filter for active queues only. 
    # You can add more filters like 'queue.name' if needed.
    filters = {
        "queue.id": {
            "type": "in",
            "values": [] # We will populate this in Step 2
        }
    }

    # Construct the query object
    query = ConversationDetailsQuery(
        date_from=start_time,
        date_to=end_time,
        group_by=group_by,
        metrics=metrics,
        filters=filters
    )

    # Wrap in the request body class
    request_body = PostAnalyticsConversationsDetailsQueryRequestBody(
        query=query
    )

    return request_body

Step 2: Resolve Queue IDs

The groupBy dimension queue.id returns results keyed by the internal Queue ID (e.g., a1b2c3d4-e5f6-7890-1234-567890abcdef). To make the output human-readable, you often need to map these IDs to Queue Names. While the Analytics API can return some display names in certain contexts, it is robust practice to fetch the list of queues separately if you need precise naming control, or simply handle the ID mapping in your post-processing logic.

For this tutorial, we will assume you want to query all queues. If you need to filter by specific queues, you must first retrieve their IDs.

from purecloudplatformclientv2 import QueueApi
from purecloudplatformclientv2 import PaginationConfiguration

def get_all_queue_ids(platform_client: PlatformClient) -> list:
    """
    Retrieves a list of all active Queue IDs.
    
    Args:
        platform_client: Authenticated PlatformClient instance.
        
    Returns:
        list: A list of Queue ID strings.
    """
    queue_api = QueueApi(platform_client)
    queue_ids = []
    
    # Pagination configuration for fetching queues
    pagination_config = PaginationConfiguration(
        page_size=100,
        max_pages=10 # Adjust based on expected number of queues
    )

    try:
        # Fetch all queues
        for page in queue_api.get_queues(pagination_config=pagination_config):
            for queue in page.entities:
                # Filter for active queues only
                if queue.status == "ACTIVE":
                    queue_ids.append(queue.id)
    except Exception as e:
        print(f"Error fetching queues: {e}")
        raise

    return queue_ids

Step 3: Execute the Query and Handle Pagination

The post_analytics_conversations_details_query endpoint returns a ConversationDetailsQueryResponse. This response contains a partitions array. Each partition represents a subset of the aggregated data. If the result set is large, Genesys Cloud splits it into multiple partitions. You must iterate through all partitions to get the complete dataset.

from purecloudplatformclientv2 import AnalyticsConversationsApi
from purecloudplatformclientv2 import ConversationDetailsQueryResponse

def execute_analytics_query(
    platform_client: PlatformClient, 
    request_body: PostAnalyticsConversationsDetailsQueryRequestBody
) -> list:
    """
    Executes the analytics query and flattens the results from all partitions.
    
    Args:
        platform_client: Authenticated PlatformClient instance.
        request_body: The constructed query body.
        
    Returns:
        list: A list of dictionaries containing the aggregated metrics.
    """
    analytics_api = AnalyticsConversationsApi(platform_client)
    all_results = []

    try:
        # Execute the query
        # The API returns a response with a 'partitions' field
        response: ConversationDetailsQueryResponse = analytics_api.post_analytics_conversations_details_query(
            body=request_body
        )

        # Check if partitions exist
        if response.partitions:
            for partition in response.partitions:
                # Each partition contains 'rows' which are the aggregated data points
                if partition.rows:
                    for row in partition.rows:
                        # Convert the SDK object to a dictionary for easier handling
                        # The 'group' field contains the values for the groupBy dimensions
                        # The 'metrics' field contains the values for the requested metrics
                        
                        result_entry = {
                            "queue_id": row.group.get("queue.id"),
                            "media_type": row.group.get("mediaType"),
                            "conversation_count": row.metrics.get("count"),
                            "total_talk_time_seconds": row.metrics.get("talkTime")
                        }
                        all_results.append(result_entry)
        else:
            print("No partitions returned. Check date range and filters.")

    except Exception as e:
        # Handle specific API errors
        if hasattr(e, 'status') and e.status == 429:
            print("Rate limited (429). Implement exponential backoff.")
        elif hasattr(e, 'status') and e.status == 400:
            print(f"Bad Request (400). Check query syntax: {e.body}")
        else:
            print(f"Error executing query: {e}")
            raise

    return all_results

Complete Working Example

This script combines authentication, queue resolution, query construction, and execution. It outputs a CSV-formatted string of the results.

import os
import csv
import io
from purecloudplatformclientv2 import PlatformClient
from purecloudplatformclientv2 import PostAnalyticsConversationsDetailsQueryRequestBody
from purecloudplatformclientv2 import ConversationDetailsQuery
from purecloudplatformclientv2 import AnalyticsConversationsApi
from purecloudplatformclientv2 import QueueApi
from purecloudplatformclientv2 import PaginationConfiguration
from datetime import datetime, timedelta

def main():
    # 1. Authenticate
    region = os.getenv("GENESYS_CLOUD_REGION")
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")

    if not all([region, client_id, client_secret]):
        raise ValueError("Set GENESYS_CLOUD_REGION, CLIENT_ID, CLIENT_SECRET in environment.")

    platform_client = PlatformClient(
        region=region,
        client_id=client_id,
        client_secret=client_secret
    )

    # 2. Get Queue IDs (Optional: Filter specific queues here)
    queue_api = QueueApi(platform_client)
    queue_ids = []
    pagination_config = PaginationConfiguration(page_size=100, max_pages=5)
    
    try:
        for page in queue_api.get_queues(pagination_config=pagination_config):
            for queue in page.entities:
                if queue.status == "ACTIVE":
                    queue_ids.append(queue.id)
    except Exception as e:
        print(f"Failed to fetch queues: {e}")
        return

    if not queue_ids:
        print("No active queues found.")
        return

    # 3. Build Query
    end_time = datetime.utcnow().isoformat()
    start_time = (datetime.utcnow() - timedelta(days=7)).isoformat()

    query = ConversationDetailsQuery(
        date_from=start_time,
        date_to=end_time,
        group_by=["queue.id", "mediaType"],
        metrics=["count", "talkTime"],
        filters={
            "queue.id": {
                "type": "in",
                "values": queue_ids
            }
        }
    )

    request_body = PostAnalyticsConversationsDetailsQueryRequestBody(query=query)

    # 4. Execute Query
    analytics_api = AnalyticsConversationsApi(platform_client)
    
    try:
        response = analytics_api.post_analytics_conversations_details_query(body=request_body)
        
        # 5. Process Results
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["Queue ID", "Media Type", "Conversation Count", "Total Talk Time (s)"])
        
        if response.partitions:
            for partition in response.partitions:
                if partition.rows:
                    for row in partition.rows:
                        queue_id = row.group.get("queue.id", "N/A")
                        media_type = row.group.get("mediaType", "N/A")
                        count = row.metrics.get("count", 0)
                        talk_time = row.metrics.get("talkTime", 0)
                        
                        writer.writerow([queue_id, media_type, count, talk_time])
        
        print(output.getvalue())
        
    except Exception as e:
        print(f"Error: {e}")
        if hasattr(e, 'body'):
            print(f"Response Body: {e.body}")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 400 Bad Request - Invalid Group By Dimension

What causes it:
The groupBy array contains a dimension that is not supported for the requested metrics, or the syntax is incorrect. For example, trying to group by agent.id and queue.id simultaneously in certain metric combinations might be invalid if the metric does not support that granularity.

How to fix it:
Verify that the dimensions listed in groupBy are compatible with the metrics array. queue.id and mediaType are generally safe together. Ensure you are using the correct field names (queue.id, not queueId).

Code showing the fix:

# Incorrect
group_by = ["queueId", "mediaType"]

# Correct
group_by = ["queue.id", "mediaType"]

Error: 429 Too Many Requests

What causes it:
Genesys Cloud APIs enforce rate limits. Analytics queries can be resource-intensive. If you run this script in a loop or during peak hours, you may hit the limit.

How to fix it:
Implement exponential backoff. The Python SDK does not automatically retry 429s for all endpoints. You must catch the exception and retry with a delay.

Code showing the fix:

import time

def execute_with_retry(platform_client, request_body, max_retries=3):
    analytics_api = AnalyticsConversationsApi(platform_client)
    
    for attempt in range(max_retries):
        try:
            return analytics_api.post_analytics_conversations_details_query(body=request_body)
        except Exception as e:
            if hasattr(e, 'status') and e.status == 429:
                wait_time = 2 ** attempt  # Exponential backoff: 2s, 4s, 8s
                print(f"Rate limited. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                raise
    raise Exception("Max retries exceeded")

Error: Empty Results

What causes it:

  1. The date range is in the future.
  2. The filters are too restrictive (e.g., filtering by a queue ID that does not exist).
  3. No conversations occurred in the selected queues during the time period.

How to fix it:

  1. Check date_from and date_to. Ensure date_from is before date_to.
  2. Remove filters temporarily to see if data appears.
  3. Verify that the queue IDs retrieved in Step 2 are active and have received traffic.

Code showing the fix:

# Debugging: Print the query JSON to verify syntax
import json
print(json.dumps(request_body.to_dict(), indent=2))

Official References