Constructing Analytics Aggregation Queries Grouped by Queue and Media Type

What You Will Build

  • You will build a Python script that queries the Genesys Cloud Analytics API to retrieve conversation metrics aggregated by queue and media type.
  • This tutorial uses the Genesys Cloud analytics domain, specifically the POST /api/v2/analytics/conversations/details/query endpoint.
  • The implementation is written in Python using the requests library for HTTP handling and json for payload construction.

Prerequisites

  • OAuth Client Type: Service Account (Client Credentials Grant) or User Agent (Authorization Code Grant). Service Account is recommended for background aggregation jobs.
  • Required Scopes:
    • analytics:conversation:view (Required to query conversation details)
    • analytics:report:view (Optional, if you plan to save this as a saved report later, but not required for the query itself)
  • SDK/Library Version: This tutorial uses raw HTTP via requests to expose the exact JSON structure required by the API. If you use the official Python SDK (PureCloudPlatformClientV2) instead, the underlying JSON structure remains identical.
  • Runtime Requirements: Python 3.8+
  • External Dependencies:
    • requests (for HTTP calls)
    • python-dateutil (optional, for parsing ISO dates; the standard library datetime module is sufficient for this example)

Authentication Setup

Genesys Cloud APIs require a valid Bearer token in the Authorization header. For automated scripts, the Client Credentials Grant is the standard pattern. You must obtain a token from the OAuth2 endpoint before making any API calls.

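The following class handles token retrieval and basic caching. The token is kept until the lifetime reported in the expires_in field of the token response has elapsed, so the script does not request a new token on every API call. In a production environment, you should also refresh slightly before expiry to avoid sending a token that expires mid-request.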

import requests
import json
import time
from typing import Optional

class GenesysClient:
    def __init__(self, client_id: str, client_secret: str, org_id: str, base_url: str = "https://api.mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.org_id = org_id
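        self.base_url = base_url
        # The OAuth token endpoint is served from the login host, not the API host.
        self.token_url = f"{base_url.replace('://api.', '://login.', 1)}/oauth/token"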
        self._access_token: Optional[str] = None
        self._token_expiry: float = 0.0

    def _get_token(self) -> str:
        """
        Retrieves an OAuth2 access token using Client Credentials Grant.
        Returns:
            str: The access token.
        """
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }
        
        # _get_headers() only calls this method when the cached token is missing
        # or past self._token_expiry, so no extra validity check is needed here.

        response = requests.post(self.token_url, data=data, timeout=30)
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            if response.status_code == 401:
                raise ValueError("Invalid Client ID or Secret. Check your credentials.") from e
            raise

        token_data = response.json()
        self._access_token = token_data["access_token"]
        self._token_expiry = time.time() + token_data["expires_in"]
        return self._access_token

    def _get_headers(self) -> dict:
        """
        Returns the headers required for all Genesys Cloud API calls.
        """
        if not self._access_token or time.time() >= self._token_expiry:
            self._get_token()
            
        return {
            "Authorization": f"Bearer {self._access_token}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
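
The early-refresh idea can be sketched separately from the HTTP call. The block below is a minimal illustration, not part of the tutorial's client: TokenCache, fetch_token, refresh_margin, and clock are hypothetical names, and fetch_token is assumed to return a dictionary with access_token and expires_in keys, as the token response handled above does.

```python
import time
from typing import Callable, Dict, Optional


class TokenCache:
    """Caches a bearer token and refreshes it shortly before it expires."""

    def __init__(self, fetch_token: Callable[[], Dict],
                 refresh_margin: float = 60.0,
                 clock: Callable[[], float] = time.time):
        # fetch_token is assumed to return {"access_token": ..., "expires_in": ...}
        self._fetch_token = fetch_token
        self._refresh_margin = refresh_margin
        self._clock = clock  # injectable so the cache can be tested without waiting
        self._token: Optional[str] = None
        self._expires_at: float = 0.0

    def get(self) -> str:
        """Return the cached token, fetching a new one only when needed."""
        stale = self._clock() >= self._expires_at - self._refresh_margin
        if self._token is None or stale:
            data = self._fetch_token()
            self._token = data["access_token"]
            self._expires_at = self._clock() + float(data["expires_in"])
        return self._token
```

Passing the clock in as a parameter keeps the expiry logic testable; in a script you would leave it at its default and pass a function that performs the token request.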

Implementation

Step 1: Constructing the Analytics Query Payload

The core of this tutorial is constructing the correct JSON payload for the POST /api/v2/analytics/conversations/details/query endpoint. This endpoint uses a complex nested structure to define time ranges, groupings, and metrics.

To group by Queue and Media Type, you must define these entities in the groupBy array. Genesys Cloud analytics allows grouping by multiple dimensions, but the order matters for the resulting JSON structure. The first item in groupBy becomes the primary key, and subsequent items become nested keys.

Critical Parameters:

  1. dateFrom / dateTo: Must be in ISO 8601 format.
  2. groupBy: An array of strings. Use "queue" and "mediaType".
  3. metrics: An array of metric IDs. For a basic volume check, use conversationsHandled. For wait times, use waitTime.
  4. interval: Defines the time resolution. Use "P1D" for daily, "PT1H" for hourly, or "PT15M" for 15-minute intervals.

def build_aggregation_query(date_from: str, date_to: str, interval: str = "P1D") -> dict:
    """
    Constructs the analytics query payload.
    
    Args:
        date_from: Start date in ISO 8601 format (e.g., "2023-10-01T00:00:00.000Z")
        date_to: End date in ISO 8601 format (e.g., "2023-10-02T00:00:00.000Z")
        interval: Time interval string. Default is P1D (Daily).
        
    Returns:
        dict: The JSON payload ready for POST request.
    """
    payload = {
        "dateFrom": date_from,
        "dateTo": date_to,
        "interval": interval,
        "groupBy": [
            "queue",
            "mediaType"
        ],
        "metrics": [
            "conversationsHandled",
            "waitTime",
            "handleTime"
        ],
        "filter": {
            "type": "and",
            # Optional: add clauses here to restrict the query to specific
            # queues or media types. Example clause:
            # {
            #     "type": "in",
            #     "field": "queueId",
            #     "values": ["queue-id-1", "queue-id-2"]
            # }
            "clauses": []
        }
    }
    return payload
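
The date strings and the optional filter are the parts of the payload most likely to change between runs. The following sketch shows one way to generate them; utc_range_last_days and queue_filter are hypothetical helpers, and the clause shape is copied from the commented example in the payload above rather than from a separate specification. The optional now argument is assumed to be a timezone-aware datetime.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple


def utc_range_last_days(days: int, now: Optional[datetime] = None) -> Tuple[str, str]:
    """Return (date_from, date_to) as ISO 8601 UTC strings covering the last `days` days."""
    end = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    start = end - timedelta(days=days)
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    return start.strftime(fmt), end.strftime(fmt)


def queue_filter(queue_ids: List[str]) -> Dict:
    """Build a filter that keeps only the given queues; an empty list means no restriction."""
    clauses = []
    if queue_ids:
        clauses.append({"type": "in", "field": "queueId", "values": list(queue_ids)})
    return {"type": "and", "clauses": clauses}
```

With these in place, a call such as build_aggregation_query(*utc_range_last_days(1)) produces the base payload, and assigning queue_filter([...]) to its "filter" key narrows it to the queues of interest.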

Step 2: Executing the Query and Handling Pagination

The Analytics API does not return all data in a single response if the dataset is large. It uses pagination via the nextPageToken field. You must loop through responses until nextPageToken is null or empty.

Additionally, the Analytics API is subject to rate limiting. A 429 status code requires an exponential backoff strategy.

import time

def fetch_analytics_data(client: GenesysClient, query_payload: dict) -> list:
    """
    Executes the analytics query and handles pagination.
    
    Args:
        client: The GenesysClient instance.
        query_payload: The dictionary payload from build_aggregation_query.
        
    Returns:
        list: A list of all response objects from the API.
    """
    url = f"{client.base_url}/api/v2/analytics/conversations/details/query"
    headers = client._get_headers()
    
    all_results = []
    page_token = None
    max_retries = 3
    
    while True:
        # Prepare request data
        data = query_payload.copy()

        # Add pagination token if it exists
        if page_token:
            data["pageToken"] = page_token

        retries = 0
        while retries < max_retries:
            try:
                response = requests.post(url, headers=headers, json=data, timeout=60)
            except requests.exceptions.RequestException as e:
                # Network-level failure: back off, then retry
                retries += 1
                if retries >= max_retries:
                    raise RuntimeError(f"Failed to fetch analytics data after {max_retries} retries: {e}") from e
                time.sleep(2 ** retries)
                continue

            if response.status_code == 200:
                result = response.json()
                all_results.append(result)
                page_token = result.get("nextPageToken")
                break  # Page fetched; leave the retry loop

            elif response.status_code == 429:
                # Rate limit hit
                retries += 1
                wait_time = 2 ** retries
                print(f"Rate limited. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)

            elif response.status_code == 401:
                # Token expired: force a refresh and retry
                client._access_token = None
                headers = client._get_headers()
                retries += 1

            else:
                # 403 and other client errors will not succeed on retry
                response.raise_for_status()
                raise RuntimeError(f"Unexpected status code: {response.status_code}")
        else:
            # Retry budget exhausted without a successful response
            raise RuntimeError(f"Giving up after {max_retries} retries (last status: {response.status_code})")

        # Stop when there is no next page
        if not page_token:
            break

        # Small delay to be respectful of rate limits between pages
        time.sleep(0.5)

    return all_results
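
The pagination loop can also be separated from the HTTP and retry logic, which makes it easier to test. The sketch below assumes the same nextPageToken and pageToken fields described in this step; iter_pages, post_page, and max_pages are hypothetical names, and post_page stands in for whatever function sends one request and returns the parsed JSON.

```python
from typing import Callable, Dict, Iterator, Optional


def iter_pages(post_page: Callable[[Dict], Dict], query_payload: Dict,
               max_pages: int = 1000) -> Iterator[Dict]:
    """Yield response pages until nextPageToken is missing or empty."""
    page_token: Optional[str] = None
    for _ in range(max_pages):
        data = dict(query_payload)  # shallow copy so the caller's payload is untouched
        if page_token:
            data["pageToken"] = page_token
        result = post_page(data)
        yield result
        page_token = result.get("nextPageToken")
        if not page_token:
            return
    # Guard against a server that keeps returning a token forever
    raise RuntimeError(f"Stopped after {max_pages} pages; the page token may be repeating")
```

Because this is a generator, each page can be flattened and written out as it arrives instead of holding every response in memory. The max_pages guard is a design choice for this sketch, not something the API requires.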

Step 3: Processing the Nested GroupBy Structure

When you group by multiple fields (queue and mediaType), the response structure is nested. The partitions object in the response will have keys representing the first group-by field (Queue IDs). Each Queue ID key will contain another object with keys representing the second group-by field (Media Types).

Response Structure Example:

{
  "partitions": {
    "queue-id-123": {
      "voice": {
        "metrics": {
          "conversationsHandled": { "value": 150 }
        }
      },
      "chat": {
        "metrics": {
          "conversationsHandled": { "value": 45 }
        }
      }
    },
    "queue-id-456": {
      "voice": {
        "metrics": {
          "conversationsHandled": { "value": 200 }
        }
      }
    }
  }
}

The following function flattens this nested structure into a list of dictionaries, which is easier to export to CSV or load into a DataFrame.

def flatten_analytics_results(all_results: list) -> list:
    """
    Flattens the nested partitions from the analytics API response.
    
    Args:
        all_results: List of response dictionaries from fetch_analytics_data.
        
    Returns:
        list: List of dictionaries with flat keys: queueId, mediaType, metricName, value, interval.
    """
    flat_data = []
    
    for result in all_results:
        partitions = result.get("partitions", {})
        
        # Iterate over Queue IDs (First GroupBy)
        for queue_id, queue_data in partitions.items():
            # queue_data is a dict of mediaType -> metrics
            
            # Iterate over Media Types (Second GroupBy)
            for media_type, media_data in queue_data.items():
                metrics = media_data.get("metrics", {})
                
                # Iterate over Metrics
                for metric_name, metric_data in metrics.items():
                    value = metric_data.get("value", 0)
                    
                    flat_data.append({
                        "queueId": queue_id,
                        "mediaType": media_type,
                        "metricName": metric_name,
                        "value": value,
                        "interval": result.get("interval", "unknown")
                    })
                    
    return flat_data
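
For CSV export it is often more convenient to have one row per queue and media type, with one column per metric. The following sketch pivots the flat records produced above and serializes them with the standard library; pivot_rows and to_csv are hypothetical helpers, and the input is assumed to carry the queueId, mediaType, metricName, value, and interval keys that flatten_analytics_results emits.

```python
import csv
import io
from typing import Dict, List


def pivot_rows(flat_data: List[Dict]) -> List[Dict]:
    """Turn one-row-per-metric records into one row per (interval, queueId, mediaType)."""
    wide: Dict[tuple, Dict] = {}
    for row in flat_data:
        key = (row.get("interval", "unknown"), row["queueId"], row["mediaType"])
        record = wide.setdefault(
            key, {"interval": key[0], "queueId": key[1], "mediaType": key[2]}
        )
        record[row["metricName"]] = row["value"]
    return list(wide.values())


def to_csv(rows: List[Dict]) -> str:
    """Serialize rows to CSV text; metrics missing from a row are left blank."""
    fixed = ["interval", "queueId", "mediaType"]
    metric_cols = sorted({k for r in rows for k in r} - set(fixed))
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fixed + metric_cols,
                            restval="", lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()
```

Sorting the metric columns keeps the header stable across runs even when some partitions lack a metric.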

Complete Working Example

The following script combines all previous steps into a single runnable file. Replace the placeholder credentials with your actual Genesys Cloud Service Account details.

import requests
import json
import time
from typing import Optional, List, Dict

# ==========================
# Configuration
# ==========================
CLIENT_ID = "your-client-id-here"
CLIENT_SECRET = "your-client-secret-here"
ORG_ID = "your-org-id-here"
BASE_URL = "https://api.mypurecloud.com"

# Date range for the query (ISO 8601)
# Example: Last 24 hours
from datetime import datetime, timedelta, timezone
end_date = datetime.now(timezone.utc)  # datetime.utcnow() is deprecated as of Python 3.12
start_date = end_date - timedelta(days=1)

DATE_FROM = start_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
DATE_TO = end_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")

# ==========================
# Genesys Client Class
# ==========================
class GenesysClient:
    def __init__(self, client_id: str, client_secret: str, org_id: str, base_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.org_id = org_id
        self.base_url = base_url
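        # The OAuth token endpoint is served from the login host, not the API host.
        self.token_url = f"{base_url.replace('://api.', '://login.', 1)}/oauth/token"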
        self._access_token: Optional[str] = None
        self._token_expiry: float = 0.0

    def _get_token(self) -> str:
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }
        response = requests.post(self.token_url, data=data)
        response.raise_for_status()
        token_data = response.json()
        self._access_token = token_data["access_token"]
        self._token_expiry = time.time() + token_data["expires_in"]
        return self._access_token

    def _get_headers(self) -> dict:
        if not self._access_token or time.time() >= self._token_expiry:
            self._get_token()
        return {
            "Authorization": f"Bearer {self._access_token}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

# ==========================
# Logic Functions
# ==========================

def build_aggregation_query(date_from: str, date_to: str, interval: str = "P1D") -> dict:
    return {
        "dateFrom": date_from,
        "dateTo": date_to,
        "interval": interval,
        "groupBy": [
            "queue",
            "mediaType"
        ],
        "metrics": [
            "conversationsHandled",
            "waitTime",
            "handleTime"
        ],
        "filter": {
            "type": "and",
            "clauses": []
        }
    }

def fetch_analytics_data(client: GenesysClient, query_payload: dict) -> list:
    url = f"{client.base_url}/api/v2/analytics/conversations/details/query"
    headers = client._get_headers()
    all_results = []
    page_token = None
    max_retries = 3

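    while True:
        data = query_payload.copy()
        if page_token:
            data["pageToken"] = page_token

        retries = 0
        while retries < max_retries:
            try:
                response = requests.post(url, headers=headers, json=data, timeout=60)
            except requests.exceptions.RequestException as e:
                # Network-level failure: back off, then retry
                retries += 1
                if retries >= max_retries:
                    raise RuntimeError(f"Failed to fetch analytics data: {e}") from e
                time.sleep(2 ** retries)
                continue

            if response.status_code == 200:
                result = response.json()
                all_results.append(result)
                page_token = result.get("nextPageToken")
                break  # Page fetched; leave the retry loop

            elif response.status_code == 429:
                retries += 1
                wait_time = 2 ** retries
                print(f"Rate limited. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)

            elif response.status_code == 401:
                # Token expired: force a refresh and retry
                client._access_token = None
                headers = client._get_headers()
                retries += 1

            else:
                # 403 and other client errors will not succeed on retry
                response.raise_for_status()
                raise RuntimeError(f"Unexpected status code: {response.status_code}")
        else:
            # Retry budget exhausted without a successful response
            raise RuntimeError(f"Giving up after {max_retries} retries (last status: {response.status_code})")

        if not page_token:
            break  # No more pages
        time.sleep(0.5)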
    return all_results

def flatten_analytics_results(all_results: list) -> list:
    flat_data = []
    for result in all_results:
        partitions = result.get("partitions", {})
        for queue_id, queue_data in partitions.items():
            for media_type, media_data in queue_data.items():
                metrics = media_data.get("metrics", {})
                for metric_name, metric_data in metrics.items():
                    value = metric_data.get("value", 0)
                    flat_data.append({
                        "queueId": queue_id,
                        "mediaType": media_type,
                        "metricName": metric_name,
                        "value": value,
                        "interval": result.get("interval", "unknown")
                    })
    return flat_data

# ==========================
# Main Execution
# ==========================

if __name__ == "__main__":
    print("Initializing Genesys Client...")
    client = GenesysClient(CLIENT_ID, CLIENT_SECRET, ORG_ID, BASE_URL)
    
    print("Building Query...")
    query = build_aggregation_query(DATE_FROM, DATE_TO, interval="P1D")
    
    print(f"Fetching data from {DATE_FROM} to {DATE_TO}...")
    try:
        raw_results = fetch_analytics_data(client, query)
        print(f"Received {len(raw_results)} page(s) of data.")
        
        print("Flattening results...")
        flat_data = flatten_analytics_results(raw_results)
        
        print(f"Total rows extracted: {len(flat_data)}")
        
        # Output sample data
        if flat_data:
            print("\nSample Data (First 5 rows):")
            print(json.dumps(flat_data[:5], indent=2))
        else:
            print("No data found for the specified criteria.")
            
    except Exception as e:
        print(f"Error: {e}")

Common Errors & Debugging

Error: 401 Unauthorized

Cause: The OAuth token is invalid, expired, or the Client ID/Secret is incorrect.
Fix: Ensure your CLIENT_ID and CLIENT_SECRET are correct. Check that the Service Account has not been disabled. In the code, the _get_headers method refreshes an expired token by calling _get_token, but if the credentials themselves are wrong, the token endpoint will reject the request.

Error: 403 Forbidden

Cause: The Service Account lacks the required scope analytics:conversation:view.
Fix: Go to the Genesys Cloud Admin Console → Platform → Integrations → [Your Integration]. Edit the OAuth Scopes and ensure analytics:conversation:view is checked. Save the integration and regenerate the Client Secret if prompted.

Error: 429 Too Many Requests

Cause: You have exceeded the rate limit for the Analytics API. Analytics queries are computationally expensive.
Fix: The code includes a retry mechanism with exponential backoff. If you still see 429s, increase the time.sleep duration between requests or reduce the frequency of your queries. Do not parallelize analytics queries aggressively.
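
If fixed exponential delays are not enough, the wait can be capped, randomized, and tied to a server hint. The sketch below is one possible shape for that calculation: backoff_delay and its parameters are hypothetical, and it assumes the 429 response may carry the standard HTTP Retry-After header with a number of seconds, which you should confirm against real responses before relying on it.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 60.0, jitter: float = 0.0) -> float:
    """Seconds to wait before retry number `attempt` (1-based)."""
    if retry_after is not None:
        try:
            # Prefer the server's hint when it is a plain number of seconds
            return min(cap, max(0.0, float(retry_after)))
        except ValueError:
            pass  # Header held a date or junk; fall back to exponential backoff
    delay = min(cap, base * (2 ** attempt))
    # Optional random jitter spreads out clients that were throttled together
    return delay + random.uniform(0.0, jitter)
```

In the fetch loop this would replace wait_time = 2 ** retries with something like backoff_delay(retries, response.headers.get("Retry-After"), jitter=0.5). With the defaults, the first three delays are 2, 4, and 8 seconds, matching the original schedule.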

Error: Empty Partitions

Cause: The query returned successfully, but partitions is empty.
Fix:

  1. Check the dateFrom and dateTo range. Ensure it is not in the future.
  2. Ensure the interval is appropriate for the date range. A 1-hour range queried with P1D (daily) can still return aggregated data, but partitions will be empty if no conversations occurred in that window.
  3. Verify that the queues you expect to see actually had conversations in that media type during that time.
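
The first check in the list above can be automated before the request is sent. This is a minimal sketch with hypothetical helper names (parse_iso_utc, check_date_range); it assumes timestamps in the format used throughout this tutorial, ending in Z or an explicit UTC offset.

```python
from datetime import datetime, timezone
from typing import List, Optional


def parse_iso_utc(value: str) -> datetime:
    """Parse timestamps such as 2023-10-01T00:00:00.000Z into aware datetimes."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def check_date_range(date_from: str, date_to: str,
                     now: Optional[datetime] = None) -> List[str]:
    """Return human-readable problems with the range; an empty list means none were found."""
    now = now or datetime.now(timezone.utc)
    start, end = parse_iso_utc(date_from), parse_iso_utc(date_to)
    problems = []
    if start >= end:
        problems.append("dateFrom is not earlier than dateTo")
    if start > now:
        problems.append("dateFrom is in the future")
    if end > now:
        problems.append("dateTo is in the future")
    return problems
```

Calling check_date_range(DATE_FROM, DATE_TO) before building the query and printing any returned messages separates a bad range from a genuinely quiet period.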

Error: KeyError in Flatten Function

Cause: The API response structure changed, or the metric requested does not exist for a specific partition.
Fix: The flatten function uses .get() methods with default values (e.g., media_data.get("metrics", {})). If you encounter a KeyError, inspect the raw result JSON. If a queue has no data for a specific media type, that media type key might be omitted entirely from the nested object. The current code handles this by iterating over existing keys only.

Official References