Querying Genesys Cloud Analytics by Queue and Media Type

What You Will Build

  • A Python script that constructs a valid aggregation query to retrieve conversation metrics grouped by queue ID and media type.
  • This tutorial uses the Genesys Cloud analytics/conversations/aggregates/query endpoint, which is the endpoint that supports grouping and metrics. The analytics/conversations/details/query endpoint returns individual conversation records and has no grouping options.
  • The implementation is written in Python 3.9+ and calls the REST API directly with the httpx library for low-level HTTP control.

Prerequisites

  • OAuth Client Type: an OAuth client configured for the Client Credentials grant.
  • Required Permission: the role assigned to the OAuth client must include analytics:conversationAggregate:view, which conversation aggregate queries require.
  • Runtime: Python 3.9 or higher.
  • Dependencies:
    • httpx: A modern HTTP client library, used here for every request.
    • Optional: the official Python SDK, published on PyPI as PureCloudPlatformClientV2. The code in this tutorial does not use it.

Install the dependencies:

pip install httpx

Authentication Setup

Genesys Cloud APIs require a valid access token obtained via OAuth 2.0. For backend integrations, the Client Credentials grant is the standard flow. Create an OAuth client with that grant type in Genesys Cloud Admin under Integrations > OAuth, and assign it a role that includes the analytics:conversationAggregate:view permission. Client Credentials clients are authorized through roles rather than scopes.

The following code demonstrates how to obtain a token using httpx. In production, you should cache this token and refresh it before expiration. The expires_in field of the token response gives the token lifetime in seconds.

import os

import httpx

# Configuration from environment variables.
# GENESYS_REGION holds the base domain of your Genesys Cloud region, for example
# mypurecloud.com (us-east-1), usw2.pure.cloud (us-west-2), or mypurecloud.ie (eu-west-1).
CLIENT_ID = os.environ.get("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.environ.get("GENESYS_CLIENT_SECRET")
REGION = os.environ.get("GENESYS_REGION", "mypurecloud.com")

def get_access_token() -> str:
    """
    Retrieves an OAuth 2.0 access token from Genesys Cloud.
    """
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Tokens are issued by the login host, not the API host.
    url = f"https://login.{REGION}/oauth/token"

    try:
        # The client ID and secret travel as HTTP Basic credentials.
        # httpx form-encodes `data` and sets the Content-Type header itself.
        response = httpx.post(
            url,
            data={"grant_type": "client_credentials"},
            auth=(CLIENT_ID, CLIENT_SECRET),
            timeout=30.0,
        )
        response.raise_for_status()
        token_data = response.json()
        return token_data["access_token"]
    except httpx.HTTPStatusError as e:
        print(f"Authentication failed: {e.response.status_code} - {e.response.text}")
        raise
    except Exception as e:
        print(f"Unexpected error during authentication: {e}")
        raise

# Retrieve token
token = get_access_token()
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}
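
The caching advice above can be sketched as a small wrapper. This is a minimal illustration, not part of the tutorial's script: TokenCache, its fetch callable, and the refresh_margin parameter are hypothetical names, and fetch is assumed to return the token together with its lifetime in seconds (for example the expires_in value of the token response).

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches an access token and refetches it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        refresh_margin: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch            # hypothetical: returns (token, lifetime_seconds)
        self._margin = refresh_margin  # refresh this many seconds before expiry
        self._clock = clock            # injectable so the cache can be tested
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Returns the cached token, fetching a new one when it is missing or stale."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            token, lifetime = self._fetch()
            self._token = token
            self._expires_at = now + lifetime
        return self._token
```

Callers would ask the cache for a token before each request instead of holding one for the life of the process. A monotonic clock is used so that wall-clock adjustments do not affect expiry checks.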

Implementation

Step 1: Constructing the Aggregation Query Body

The analytics/conversations/aggregates/query endpoint accepts a JSON body defining the date range, bucket size, metrics, and grouping dimensions. The critical component for this tutorial is the groupBy array, which lists the dimensions to group by as plain strings.

  • Queue Grouping: queueId. Results are keyed by queue ID; queue names are not returned.
  • Media Type Grouping: mediaType (voice, chat, email, and so on).

The following code constructs the query payload. Note that the date range is a single interval string in ISO 8601 format (start/end) and that granularity sets the bucket size.

import json
from datetime import datetime, timedelta, timezone

def build_query_payload(start_time: datetime, end_time: datetime) -> dict:
    """
    Constructs the JSON payload for the analytics aggregation query.
    Groups by Queue ID and Media Type.
    Both arguments should be timezone-aware datetimes.
    """
    # Define the metrics to retrieve.
    # nOffered counts conversations offered to the queue. tHandle is handle time,
    # and its count is the number of handled conversations.
    metrics = ["nOffered", "tHandle"]

    # Render both ends of the range as UTC with a trailing "Z".
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    start = start_time.astimezone(timezone.utc).strftime(fmt)
    end = end_time.astimezone(timezone.utc).strftime(fmt)

    payload = {
        "interval": f"{start}/{end}",
        "granularity": "P1D",  # Daily buckets (ISO 8601 Duration format)
        "groupBy": ["queueId", "mediaType"],
        "metrics": metrics
    }

    return payload

# Example usage: Last 7 days
end_dt = datetime.now(timezone.utc)
start_dt = end_dt - timedelta(days=7)
query_body = build_query_payload(start_dt, end_dt)

print("Query Payload:")
print(json.dumps(query_body, indent=2))
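
With daily buckets it can help to start and end the range on day boundaries, so that each bucket covers a whole day. The helper below is a sketch under that assumption; last_full_days is a hypothetical name, and it expects a timezone-aware datetime.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def last_full_days(days: int, now: datetime) -> Tuple[datetime, datetime]:
    """Returns (start, end) covering the last `days` complete UTC days before `now`."""
    # Truncate to the most recent UTC midnight so the range ends on a day boundary.
    end = now.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return end - timedelta(days=days), end
```

The returned pair can be passed to build_query_payload in place of the raw current time.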

Expected Response Structure Preview:
When sent to the aggregates endpoint, the query returns a JSON object containing a results array. Each item in results represents a unique combination of queue and media type, and its data array holds one entry per time interval. Counts appear under stats.count; timing metrics such as tHandle also report sum, min, and max in milliseconds (omitted here for brevity). The values below are illustrative.

{
  "results": [
    {
      "group": {
        "queueId": "12345678-1234-1234-1234-123456789012",
        "mediaType": "voice"
      },
      "data": [
        {
          "interval": "2023-10-25T00:00:00.000Z/2023-10-26T00:00:00.000Z",
          "metrics": [
            {
              "metric": "nOffered",
              "stats": {
                "count": 150
              }
            },
            {
              "metric": "tHandle",
              "stats": {
                "count": 145
              }
            }
          ]
        }
      ]
    },
    {
      "group": {
        "queueId": "12345678-1234-1234-1234-123456789012",
        "mediaType": "chat"
      },
      "data": [
        {
          "interval": "2023-10-25T00:00:00.000Z/2023-10-26T00:00:00.000Z",
          "metrics": [
            {
              "metric": "nOffered",
              "stats": {
                "count": 30
              }
            },
            {
              "metric": "tHandle",
              "stats": {
                "count": 30
              }
            }
          ]
        }
      ]
    }
  ]
}

Step 2: Executing the Query with Retry Logic

Analytics queries can be resource-intensive. Genesys Cloud may return a 429 Too Many Requests if you exceed rate limits, or a 503 Service Unavailable if the analytics engine is under heavy load. Production code must implement exponential backoff for retries.

We will use httpx to send the raw request to demonstrate full control over headers and retries, rather than relying solely on the SDK’s internal retry mechanism, which can sometimes obscure specific error details.

import httpx
import time
import random

def query_analytics(base_url: str, headers: dict, payload: dict) -> dict:
    """
    Executes the analytics query with exponential backoff retry logic.
    """
    url = f"{base_url}/api/v2/analytics/conversations/aggregates/query"
    
    max_retries = 3
    base_delay = 2.0  # seconds

    for attempt in range(max_retries):
        try:
            response = httpx.post(url, headers=headers, json=payload, timeout=60.0)
            
            # Handle Rate Limiting
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", base_delay * (2 ** attempt)))
                print(f"Rate limited. Retrying in {retry_after} seconds...")
                time.sleep(retry_after)
                continue
            
            # Handle Service Unavailable
            if response.status_code == 503:
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Service unavailable. Retrying in {delay:.2f} seconds...")
                time.sleep(delay)
                continue

            response.raise_for_status()
            return response.json()

        except httpx.HTTPStatusError as e:
            if e.response.status_code in [401, 403]:
                print(f"Authentication/Authorization error: {e.response.status_code}")
                raise
            elif e.response.status_code == 422:
                print(f"Unprocessable Entity. Check query payload syntax: {e.response.text}")
                raise
            else:
                print(f"HTTP Error: {e.response.status_code} - {e.response.text}")
                raise
        except httpx.RequestError as e:
            print(f"Network error: {e}")
            time.sleep(base_delay * (2 ** attempt))

    raise Exception("Max retries exceeded for analytics query.")

# Execute the query
# GENESYS_REGION holds the base domain of your region, for example mypurecloud.com (us-east-1).
region = os.environ.get("GENESYS_REGION", "mypurecloud.com")
base_url = f"https://api.{region}"

try:
    result = query_analytics(base_url, headers, query_body)
    print("Query successful.")
except Exception as e:
    print(f"Failed to retrieve analytics data: {e}")
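
HTTP allows the Retry-After header to carry either a number of seconds or an HTTP date, and calling int() on a date would raise. The sketch below handles both forms and falls back to exponential backoff with jitter. retry_delay and its max_delay cap are hypothetical additions, and nothing here asserts which form Genesys Cloud actually sends.

```python
import random
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_delay(
    retry_after: Optional[str],
    attempt: int,
    base_delay: float = 2.0,
    max_delay: float = 60.0,
    now: Optional[datetime] = None,
) -> float:
    """Picks a wait time: honour Retry-After when usable, else back off with jitter."""
    if retry_after:
        value = retry_after.strip()
        if value.isdigit():
            # Delta-seconds form, e.g. "5".
            return min(float(value), max_delay)
        try:
            # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
            when = parsedate_to_datetime(value)
        except (TypeError, ValueError):
            when = None
        if when is not None:
            if when.tzinfo is None:
                when = when.replace(tzinfo=timezone.utc)
            current = now or datetime.now(timezone.utc)
            return min(max((when - current).total_seconds(), 0.0), max_delay)
    # No usable header: exponential backoff plus up to one second of jitter.
    return min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
```

Inside the retry loop, the result would replace the inline delay computation, for example time.sleep(retry_delay(response.headers.get("Retry-After"), attempt)).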

Step 3: Processing and Flattening Results

The response from analytics/conversations/aggregates/query is nested. The results array contains one entry per unique combination of the grouping dimensions (queue and media type). Inside each entry, group holds the dimension values and data holds one item per time interval, each with a list of metrics.

To make this data useful for reporting or database insertion, we should flatten it into a list of dictionaries, where each dictionary represents a single row of data (Interval, Queue ID, Media Type, Metric Values). The response carries queue IDs only, so look up queue names separately if your report needs them.

def flatten_analytics_data(response: dict) -> list:
    """
    Flattens the nested analytics response into a list of flat records.
    """
    flat_records = []

    for result in response.get("results", []):
        group = result.get("group", {})

        for bucket in result.get("data", []):
            # Index the metric list by metric name for easy lookup.
            stats_by_metric = {
                m.get("metric"): m.get("stats", {})
                for m in bucket.get("metrics", [])
            }

            # Extract metric values safely
            offered = stats_by_metric.get("nOffered", {}).get("count", 0)
            handled = stats_by_metric.get("tHandle", {}).get("count", 0)

            record = {
                "interval": bucket.get("interval"),
                "queue_id": group.get("queueId"),
                "media_type": group.get("mediaType"),
                "offered": offered,
                "handled": handled
            }
            flat_records.append(record)

    return flat_records

# Process the result from Step 2
if 'result' in locals():
    flat_data = flatten_analytics_data(result)
    for row in flat_data[:5]:  # Print first 5 rows
        print(row)
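
Once the records are flat, exporting them for reporting is straightforward. The following is a minimal sketch using only the standard library; records_to_csv is a hypothetical helper, and it assumes every record has the same keys as the first one.

```python
import csv
import io
from typing import Dict, List


def records_to_csv(records: List[Dict]) -> str:
    """Serializes flat records to CSV text, using the first record's keys as columns."""
    if not records:
        return ""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer, fieldnames=list(records[0].keys()), lineterminator="\n"
    )
    writer.writeheader()
    writer.writerows(records)
    return buffer.getvalue()
```

The returned string can be written to a file or handed to a bulk-load tool; the same flat rows also map directly onto database insert parameters.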

Complete Working Example

The following script combines authentication, query construction, execution with retries, and result flattening into a single runnable module.

import os
import httpx
import time
import random
import json
from datetime import datetime, timedelta, timezone

# --- Configuration ---
CLIENT_ID = os.environ.get("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.environ.get("GENESYS_CLIENT_SECRET")
# Base domain of your Genesys Cloud region, for example mypurecloud.com (us-east-1),
# usw2.pure.cloud (us-west-2), or mypurecloud.ie (eu-west-1).
REGION = os.environ.get("GENESYS_REGION", "mypurecloud.com")

if not CLIENT_ID or not CLIENT_SECRET:
    raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")

LOGIN_URL = f"https://login.{REGION}"  # issues OAuth tokens
BASE_URL = f"https://api.{REGION}"     # serves the Platform API
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

def get_access_token() -> str:
    """Retrieves an OAuth 2.0 access token."""
    url = f"{LOGIN_URL}/oauth/token"
    # The client ID and secret are sent as HTTP Basic credentials.
    response = httpx.post(
        url,
        data={"grant_type": "client_credentials"},
        auth=(CLIENT_ID, CLIENT_SECRET),
        timeout=30.0,
    )
    response.raise_for_status()
    return response.json()["access_token"]

def build_query_payload(start_time: datetime, end_time: datetime) -> dict:
    """Constructs the analytics aggregation query payload."""
    start = start_time.astimezone(timezone.utc).strftime(TIME_FORMAT)
    end = end_time.astimezone(timezone.utc).strftime(TIME_FORMAT)
    return {
        "interval": f"{start}/{end}",
        "granularity": "P1D",
        "groupBy": ["queueId", "mediaType"],
        "metrics": ["nOffered", "tHandle"]
    }

def execute_query(headers: dict, payload: dict) -> dict:
    """Executes the query with exponential backoff."""
    url = f"{BASE_URL}/api/v2/analytics/conversations/aggregates/query"
    max_retries = 3
    base_delay = 2.0

    for attempt in range(max_retries):
        try:
            response = httpx.post(url, headers=headers, json=payload, timeout=60.0)

            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", base_delay * (2 ** attempt)))
                time.sleep(retry_after)
                continue

            if response.status_code == 503:
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                time.sleep(delay)
                continue

            response.raise_for_status()
            return response.json()

        except httpx.HTTPStatusError as e:
            # Client-side errors will not succeed on retry, so surface them immediately.
            if e.response.status_code in [400, 401, 403, 422]:
                raise
            print(f"HTTP Error {e.response.status_code}: {e.response.text}")
            time.sleep(base_delay * (2 ** attempt))

        except httpx.RequestError as e:
            print(f"Network error: {e}")
            time.sleep(base_delay * (2 ** attempt))

    raise Exception("Max retries exceeded.")

def flatten_data(response: dict) -> list:
    """Flattens nested analytics response."""
    flat_records = []
    for result in response.get("results", []):
        group = result.get("group", {})
        for bucket in result.get("data", []):
            # Index the metric list by metric name for easy lookup.
            stats = {m.get("metric"): m.get("stats", {}) for m in bucket.get("metrics", [])}

            flat_records.append({
                "interval": bucket.get("interval"),
                "queue_id": group.get("queueId"),
                "media_type": group.get("mediaType"),
                "offered": stats.get("nOffered", {}).get("count", 0),
                "handled": stats.get("tHandle", {}).get("count", 0)
            })
    return flat_records

if __name__ == "__main__":
    print("Starting Analytics Query...")

    # 1. Authenticate
    token = get_access_token()
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    # 2. Build Query
    end_dt = datetime.now(timezone.utc)
    start_dt = end_dt - timedelta(days=7)
    payload = build_query_payload(start_dt, end_dt)
    print(f"Querying data from {start_dt.isoformat()} to {end_dt.isoformat()}")

    # 3. Execute
    try:
        result = execute_query(headers, payload)

        # 4. Process
        flat_data = flatten_data(result)
        print(f"Retrieved {len(flat_data)} records.")

        # Output sample
        for record in flat_data[:3]:
            print(json.dumps(record, indent=2))

    except Exception as e:
        print(f"Error: {e}")

Common Errors & Debugging

Error: 422 Unprocessable Entity

Cause: The query payload contains invalid syntax, unsupported metrics, or invalid grouping dimensions.
Fix:

  1. Verify that the metrics array contains only metric names that are valid for the aggregates endpoint (for example nOffered, tHandle, tTalk).
  2. Ensure interval is a strict ISO 8601 interval of the form start/end (e.g., 2023-10-01T00:00:00Z/2023-10-08T00:00:00Z).
  3. Check that groupBy uses valid dimension names (queueId, mediaType, userId, etc.).

# Debugging Tip: Print the payload before sending
print(json.dumps(payload, indent=2))
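
Alongside the payload, the error body usually says what was rejected. The sketch below condenses a failed response into one log line. summarize_error is a hypothetical helper, and the code and message field names are an assumption about the error body; the function falls back to the raw text when they are absent.

```python
import json


def summarize_error(status_code: int, body_text: str) -> str:
    """Builds a one-line summary from an error response body."""
    try:
        body = json.loads(body_text)
    except ValueError:
        body = None
    if isinstance(body, dict):
        # Assumed field names; skipped silently when not present.
        parts = [str(body[key]) for key in ("code", "message") if body.get(key)]
        if parts:
            return f"HTTP {status_code}: " + " - ".join(parts)
    # Not JSON, or no recognised fields: show a truncated copy of the raw body.
    return f"HTTP {status_code}: {body_text.strip()[:200]}"
```

In the exception handler this could be called as summarize_error(e.response.status_code, e.response.text).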

Error: 403 Forbidden

Cause: The OAuth client behind the token lacks the required permission.
Fix: Ensure the role assigned to the OAuth client includes the analytics:conversationAggregate:view permission. Roles are assigned to the client in Genesys Cloud Admin under Integrations > OAuth.

Error: 429 Too Many Requests

Cause: Rate limit exceeded. The Analytics API has strict rate limits, especially for heavy aggregation queries.
Fix: Implement exponential backoff as shown in the execute_query function. Monitor the Retry-After header in the response.

Error: Empty data Array

Cause: No conversations match the criteria.
Fix:

  1. Verify the date range contains actual conversation data.
  2. Ensure the bucket size is compatible with the date range. A daily (P1D) bucket on a 1-hour range may return empty results if no daily bucket is fully formed.
  3. Check that any queues named in a query filter exist and are active.
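
To check the second point before sending a query, you can count how many complete daily buckets a range contains. This is an illustrative sketch only: full_daily_buckets is a hypothetical helper, it assumes timezone-aware datetimes, and it treats a bucket as one whole UTC day.

```python
from datetime import datetime, timedelta, timezone


def full_daily_buckets(start: datetime, end: datetime) -> int:
    """Counts the complete UTC days that fit between start and end."""
    start = start.astimezone(timezone.utc)
    end = end.astimezone(timezone.utc)
    # Find the first UTC midnight at or after the start of the range.
    first_midnight = start.replace(hour=0, minute=0, second=0, microsecond=0)
    if first_midnight < start:
        first_midnight += timedelta(days=1)
    if end <= first_midnight:
        return 0
    return (end - first_midnight) // timedelta(days=1)
```

A result of 0 means the range does not cover even one whole day, which is a hint to widen the range or choose a smaller bucket size.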

Official References