Constructing an Analytics API Aggregation Query by Queue and Media Type

Constructing an Analytics API Aggregation Query by Queue and Media Type

What You Will Build

  • You will build a script that queries the Genesys Cloud Analytics API to retrieve conversation metrics grouped by queue and media type.
  • This uses the /api/v2/analytics/conversations/details/query endpoint via the Python SDK.
  • The implementation covers Python with the genesyscloud SDK.

Prerequisites

  • OAuth Client Type: Confidential client (Client Credentials Grant) or Public client with PKCE. For server-side scripts, Client Credentials is standard.
  • Required Scopes: analytics:conversation:view is mandatory. If you need user-specific details, add user:view.
  • SDK Version: Genesys Cloud Python SDK version 153.0.0 or higher.
  • Language/Runtime: Python 3.8+.
  • External Dependencies: genesys-cloud, python-dotenv for credential management.

Install the dependencies:

pip install genesys-cloud python-dotenv

Authentication Setup

The Genesys Cloud SDK handles the OAuth token lifecycle automatically when initialized with client credentials. You must set the region parameter correctly. For the US East region (the default), the hostname is api.mypurecloud.com. For EU, use api.eu.purecloud.com.

Create a .env file in your project root:

GENESYS_CLOUD_REGION=us_east
GENESYS_CLOUD_CLIENT_ID=your_client_id
GENESYS_CLOUD_CLIENT_SECRET=your_client_secret

Initialize the SDK in your script:

import os
from dotenv import load_dotenv
from purecloud_platform_client_v2 import PlatformClient

load_dotenv()

def get_platform_client() -> PlatformClient:
    """
    Initializes and returns a configured Genesys Cloud PlatformClient.
    """
    client = PlatformClient()
    
    # Configure OAuth
    client.set_oauth_client_credentials(
        os.getenv("GENESYS_CLOUD_CLIENT_ID"),
        os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
    )
    
    # Set region explicitly to avoid defaulting errors if env var is missing
    region = os.getenv("GENESYS_CLOUD_REGION", "us_east")
    if region == "eu":
        client.set_base_url("https://api.eu.purecloud.com")
    elif region == "ap":
        client.set_base_url("https://api.ap.purecloud.com")
    # us_east defaults to https://api.mypurecloud.com
    
    return client

client = get_platform_client()
analytics_api = client.analytics_api

Implementation

Step 1: Define the Date Range and Query Parameters

The Analytics API requires a strict ISO 8601 date range. The to date must be in the future relative to the from date. For aggregated data, you often want to pull data from “yesterday” to avoid partial day data, or use a fixed historical window.

We also need to define the groupBy fields. The prompt requires grouping by queue and mediaType.

from datetime import datetime, timedelta
from purecloud_platform_client_v2.model import AnalyticsQuery

def build_analytics_query() -> AnalyticsQuery:
    """
    Constructs the AnalyticsQuery object with specific grouping and filtering.
    """
    # Define date range: Last 7 days up to now
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=7)
    
    # Format to ISO 8601 with timezone offset (UTC)
    from_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    to_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    
    # Initialize the query object
    query = AnalyticsQuery(
        date_from=from_str,
        date_to=to_str,
        group_by=["queue", "mediaType"],
        # Select specific metrics to reduce payload size and improve performance
        select=[
            "queue/name",
            "queue/id",
            "mediaType",
            "conversationCount",
            "wrapUpTime",
            "talkTime",
            "holdTime"
        ]
    )
    
    return query

Key Parameter Explanation:

  • group_by: This is the core of the aggregation. By including queue and mediaType, the API returns one row for every unique combination of Queue ID and Media Type (e.g., “Sales Queue” + “voice”, “Sales Queue” + “chat”).
  • select: You cannot select *. You must list specific metrics. queue/name pulls the human-readable name. conversationCount is a standard metric. talkTime and holdTime are useful for calculating efficiency.
  • date_from / date_to: Must be inclusive. The API calculates metrics for conversations that occurred within this window.

Step 2: Execute the Query and Handle Pagination

The Analytics API returns paginated results. The default page size is often small (e.g., 50-100 records). To get all data, you must iterate through pages using the nextPage token.

from purecloud_platform_client_v2.rest import ApiException
from typing import List, Dict, Any

def fetch_analytics_data(client: PlatformClient, query: AnalyticsQuery) -> List[Dict[str, Any]]:
    """
    Executes the analytics query and handles pagination.
    Returns a flat list of aggregated data rows.
    """
    all_results = []
    next_page_token = None
    page_count = 0
    
    # Maximum pages to prevent infinite loops in case of API errors
    max_pages = 50 
    
    while page_count < max_pages:
        try:
            # The API call: post_analytics_conversations_details_query
            # Parameters:
            # - body: The AnalyticsQuery object
            # - size: Records per page (max 1000 for details, but aggregation limits may vary)
            # - next_page: Token from previous response
            
            response = analytics_api.post_analytics_conversations_details_query(
                body=query,
                size=100,
                next_page=next_page_token
            )
            
            # Check if response has entities
            if response.entities:
                all_results.extend(response.entities)
                print(f"Fetched page {page_count + 1} with {len(response.entities)} records.")
            
            # Check for next page
            if response.next_page:
                next_page_token = response.next_page
                page_count += 1
            else:
                # No more pages
                break
                
        except ApiException as e:
            print(f"API Exception: {e.status} {e.reason}")
            print(f"Body: {e.body}")
            if e.status == 429:
                print("Rate limited. Waiting 5 seconds before retry...")
                import time
                time.sleep(5)
                continue
            elif e.status == 400:
                raise ValueError(f"Bad Request. Check date format and group_by fields. {e.body}")
            else:
                raise e
        except Exception as e:
            print(f"Unexpected error: {e}")
            raise e
            
    return all_results

Error Handling Notes:

  • 400 Bad Request: Often caused by invalid date_from/date_to formats or requesting metrics that do not exist for the specified group_by fields. For example, you cannot group by user and select queue/name in a single aggregation without flattening logic, but queue and mediaType is a safe combination.
  • 401 Unauthorized: Ensure the OAuth token is valid and the client has the analytics:conversation:view scope.
  • 429 Too Many Requests: The Analytics API has strict rate limits (typically 10 requests per second for detail queries). If you hit this, implement exponential backoff. The example above uses a simple 5-second sleep for brevity.

Step 3: Process and Flatten the Results

The response structure from Genesys Cloud Analytics is hierarchical. When you group by queue and mediaType, the entities list contains objects where the grouping keys are nested.

A typical entity looks like this:

{
  "queue": {
    "id": "12345",
    "name": "Support Queue"
  },
  "mediaType": "voice",
  "conversationCount": 150,
  "talkTime": "PT1200S",
  "wrapUpTime": "PT300S"
}

You need to flatten this for consumption by a spreadsheet or database.

import pandas as pd
from datetime import timedelta

def process_results(results: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Flattens the hierarchical API response into a flat DataFrame.
    Converts ISO duration strings (PT...S) to seconds.
    """
    if not results:
        return pd.DataFrame()
    
    flat_data = []
    
    for item in results:
        # Extract nested queue info safely
        queue_info = item.get("queue", {})
        queue_id = queue_info.get("id", "Unknown")
        queue_name = queue_info.get("name", "Unknown")
        
        # Extract media type
        media_type = item.get("mediaType", "Unknown")
        
        # Extract metrics
        conversation_count = item.get("conversationCount", 0)
        
        # Parse ISO 8601 Duration strings (e.g., "PT1200S")
        talk_time_sec = parse_duration_to_seconds(item.get("talkTime", "PT0S"))
        hold_time_sec = parse_duration_to_seconds(item.get("holdTime", "PT0S"))
        wrap_up_time_sec = parse_duration_to_seconds(item.get("wrapUpTime", "PT0S"))
        
        flat_data.append({
            "Queue ID": queue_id,
            "Queue Name": queue_name,
            "Media Type": media_type,
            "Conversation Count": conversation_count,
            "Talk Time (seconds)": talk_time_sec,
            "Hold Time (seconds)": hold_time_sec,
            "Wrap Up Time (seconds)": wrap_up_time_sec
        })
        
    df = pd.DataFrame(flat_data)
    
    # Optional: Calculate average talk time per conversation
    if not df.empty:
        df["Avg Talk Time (sec)"] = df.apply(
            lambda row: row["Talk Time (seconds)"] / row["Conversation Count"] if row["Conversation Count"] > 0 else 0,
            axis=1
        )
        
    return df

def parse_duration_to_seconds(duration_str: str) -> float:
    """
    Converts ISO 8601 duration string (e.g., PT1H30M20S) to total seconds.
    """
    if not duration_str or duration_str == "PT0S":
        return 0.0
    
    try:
        # Simple regex parser for common Genesys duration formats
        import re
        # Pattern matches PT[H]h[M]m[S]s
        match = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", duration_str)
        if match:
            hours = int(match.group(1)) if match.group(1) else 0
            minutes = int(match.group(2)) if match.group(2) else 0
            seconds = int(match.group(3)) if match.group(3) else 0
            return (hours * 3600) + (minutes * 60) + seconds
        else:
            return 0.0
    except Exception:
        return 0.0

Complete Working Example

This script combines all steps into a single executable module. Save this as analytics_queue_media_query.py.

import os
import sys
import pandas as pd
import re
from datetime import datetime, timedelta
from dotenv import load_dotenv

# Import Genesys Cloud SDK
from purecloud_platform_client_v2 import PlatformClient
from purecloud_platform_client_v2.model import AnalyticsQuery
from purecloud_platform_client_v2.rest import ApiException

def load_env():
    load_dotenv()
    if not all([os.getenv("GENESYS_CLOUD_CLIENT_ID"), os.getenv("GENESYS_CLOUD_CLIENT_SECRET")]):
        raise EnvironmentError("Missing GENESYS_CLOUD_CLIENT_ID or GENESYS_CLOUD_CLIENT_SECRET in .env")

def get_platform_client() -> PlatformClient:
    client = PlatformClient()
    client.set_oauth_client_credentials(
        os.getenv("GENESYS_CLOUD_CLIENT_ID"),
        os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
    )
    region = os.getenv("GENESYS_CLOUD_REGION", "us_east")
    if region == "eu":
        client.set_base_url("https://api.eu.purecloud.com")
    elif region == "ap":
        client.set_base_url("https://api.ap.purecloud.com")
    return client

def build_analytics_query() -> AnalyticsQuery:
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=7)
    
    from_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    to_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    
    return AnalyticsQuery(
        date_from=from_str,
        date_to=to_str,
        group_by=["queue", "mediaType"],
        select=[
            "queue/name",
            "queue/id",
            "mediaType",
            "conversationCount",
            "talkTime",
            "holdTime",
            "wrapUpTime"
        ]
    )

def parse_duration_to_seconds(duration_str: str) -> float:
    if not duration_str or duration_str == "PT0S":
        return 0.0
    try:
        match = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", duration_str)
        if match:
            hours = int(match.group(1)) if match.group(1) else 0
            minutes = int(match.group(2)) if match.group(2) else 0
            seconds = int(match.group(3)) if match.group(3) else 0
            return (hours * 3600) + (minutes * 60) + seconds
    except Exception:
        pass
    return 0.0

def fetch_and_process_analytics():
    load_env()
    client = get_platform_client()
    analytics_api = client.analytics_api
    
    query = build_analytics_query()
    
    all_results = []
    next_page_token = None
    page_count = 0
    max_pages = 50
    
    print("Starting Analytics Query...")
    
    while page_count < max_pages:
        try:
            response = analytics_api.post_analytics_conversations_details_query(
                body=query,
                size=100,
                next_page=next_page_token
            )
            
            if response.entities:
                all_results.extend(response.entities)
                print(f"Page {page_count + 1}: Retrieved {len(response.entities)} records.")
            
            if response.next_page:
                next_page_token = response.next_page
                page_count += 1
            else:
                break
                
        except ApiException as e:
            print(f"API Error {e.status}: {e.reason}")
            if e.status == 429:
                import time
                print("Rate limited. Retrying in 5s...")
                time.sleep(5)
                continue
            raise e
            
    if not all_results:
        print("No data returned.")
        return

    # Flatten Data
    flat_data = []
    for item in all_results:
        queue_info = item.get("queue", {})
        flat_data.append({
            "Queue ID": queue_info.get("id", "Unknown"),
            "Queue Name": queue_info.get("name", "Unknown"),
            "Media Type": item.get("mediaType", "Unknown"),
            "Conversation Count": item.get("conversationCount", 0),
            "Talk Time (sec)": parse_duration_to_seconds(item.get("talkTime", "PT0S")),
            "Hold Time (sec)": parse_duration_to_seconds(item.get("holdTime", "PT0S")),
            "Wrap Up Time (sec)": parse_duration_to_seconds(item.get("wrapUpTime", "PT0S"))
        })

    df = pd.DataFrame(flat_data)
    
    # Calculate Averages
    if not df.empty:
        df["Avg Talk Time (sec)"] = df.apply(
            lambda row: row["Talk Time (sec)"] / row["Conversation Count"] if row["Conversation Count"] > 0 else 0,
            axis=1
        )
        
        # Sort by Queue Name and Media Type
        df = df.sort_values(by=["Queue Name", "Media Type"])
        
        # Export to CSV
        output_filename = "analytics_queue_media_report.csv"
        df.to_csv(output_filename, index=False)
        print(f"Report saved to {output_filename}")
        print(df.head(10))
    else:
        print("No valid records to process.")

if __name__ == "__main__":
    try:
        fetch_and_process_analytics()
    except Exception as e:
        print(f"Fatal Error: {e}")
        sys.exit(1)

Common Errors & Debugging

Error: 400 Bad Request - “Invalid group by field”

Cause: The group_by field specified does not exist for the selected metrics or the query type. For example, trying to group by user and queue simultaneously in a single aggregation query can sometimes fail if the metrics requested are not compatible with that level of granularity.

Fix: Verify the group_by fields against the Genesys Cloud Analytics API documentation. Ensure that queue and mediaType are valid. If you need user-level data, you may need to run a separate query grouped by user and then join the data in your application logic.

Error: 401 Unauthorized - “Insufficient permissions”

Cause: The OAuth token does not have the analytics:conversation:view scope.

Fix: Go to the Genesys Cloud Admin Console → Platform → OAuth. Edit your client application. Add analytics:conversation:view to the Scopes list. Save the changes. Note: Existing tokens may need to be refreshed.

Error: 429 Too Many Requests

Cause: You are exceeding the rate limit for the Analytics API. The limit is typically 10 requests per second for detail queries.

Fix: Implement exponential backoff. In the provided code, a simple 5-second sleep is used. For production systems, use a library like tenacity or backoff to handle retries gracefully.

from tenacity import retry, wait_exponential, stop_after_attempt

@retry(wait=wait_exponential(multiplier=1, min=4, max=10), stop=stop_after_attempt(5))
def safe_api_call():
    # API call here
    pass

Error: Empty Result Set

Cause: The date range is too narrow, or there is no data for the specific queues/media types in that window.

Fix: Expand the date_from window. Ensure that the queues selected (if you filter by queue ID) actually had conversations in that period. Check the select list to ensure you are requesting metrics that exist for mediaType (e.g., talkTime exists for voice, but not for SMS).

Official References