Querying Genesys Cloud Analytics: Grouping Conversations by Queue and Media Type

Querying Genesys Cloud Analytics: Grouping Conversations by Queue and Media Type

What You Will Build

  • A script that queries the Genesys Cloud Analytics API to retrieve aggregated conversation metrics.
  • The query groups results by queue.id and mediaType to identify volume distribution across channels.
  • The implementation uses Python with the requests library for direct HTTP interaction.

Prerequisites

  • OAuth Client: A Genesys Cloud OAuth 2.0 client with the following scopes:
    • analytics:query:read
    • analytics:conversation:view
  • SDK/Library: Python 3.8+ with requests and python-dateutil.
  • Environment Variables:
    • GENESYS_REGION: The API region (e.g., mypurecloud.com, usw2.pure.cloud).
    • GENESYS_CLIENT_ID: Your OAuth Client ID.
    • GENESYS_CLIENT_SECRET: Your OAuth Client Secret.

Authentication Setup

Genesys Cloud uses OAuth 2.0 Client Credentials grant for server-to-server integrations. You must obtain an access token before making any API calls. The token expires after 3600 seconds, so production code should implement token caching or refresh logic. For this tutorial, we will fetch a fresh token at runtime.

import requests
import os
from datetime import datetime, timezone
import json

def get_access_token() -> str:
    """
    Authenticates with Genesys Cloud OAuth and returns an access token.
    """
    region = os.getenv("GENESYS_REGION", "mypurecloud.com")
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Construct the OAuth token endpoint
    token_url = f"https://{region}/oauth/token"
    
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret
    }

    response = requests.post(token_url, headers=headers, data=data)
    
    if response.status_code != 200:
        raise Exception(f"Authentication failed: {response.status_code} - {response.text}")
    
    token_data = response.json()
    return token_data["access_token"]

# Fetch token
access_token = get_access_token()
print("Authentication successful.")

Implementation

Step 1: Constructing the Analytics Query Body

The core of this tutorial is constructing the JSON payload for POST /api/v2/analytics/conversations/details/query. This endpoint allows complex aggregations. To group by queue and media type, we must define groupBys and specify the metrics we want to aggregate.

Key parameters:

  • dateFrom / dateTo: The time window for the query. Use ISO 8601 format.
  • groupBys: An array of strings defining the dimensions to group by. We use queue.id and mediaType.
  • metrics: The values to calculate (e.g., conversationCount, handledCount).
  • filter: Optional filters to narrow down the data (e.g., specific queues or media types).
def build_analytics_query() -> dict:
    """
    Constructs the analytics query body for grouping by queue and media type.
    """
    # Define the time range: Last 24 hours
    now = datetime.now(timezone.utc)
    date_to = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    date_from = (now - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
    
    # Required: Import timedelta
    from datetime import timedelta

    query_body = {
        "dateFrom": date_from,
        "dateTo": date_to,
        "interval": "P1D",  # Aggregate by day, though we are grouping by dimension
        "groupBys": [
                "queue.id",
                "mediaType"
            ],
        "metrics": [
            "conversationCount",
            "handledCount",
            "abandonedCount",
            "averageHandleTime"
        ],
        "filter": {
            "type": "and",
            "clauses": [
                {
                    "type": "dimension",
                    "dimension": "mediaType",
                    "operator": "eq",
                    "value": ["voice", "chat"]  # Optional: Limit to voice and chat
                }
            ]
        },
        "pageSize": 100,
        "pageToken": None
    }
    
    return query_body

Step 2: Executing the Query and Handling Pagination

The Analytics API returns paginated results. You must check the nextPageToken in the response to fetch subsequent pages until all data is retrieved. If the query returns no results, the entities array will be empty.

import time

def fetch_analytics_data(access_token: str, query_body: dict) -> list:
    """
    Sends the analytics query to Genesys Cloud and handles pagination.
    """
    region = os.getenv("GENESYS_REGION", "mypurecloud.com")
    api_url = f"https://{region}/api/v2/analytics/conversations/details/query"
    
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    
    all_results = []
    page_token = None
    max_retries = 3

    while True:
        # Update pageToken in body if it exists
        if page_token:
            query_body["pageToken"] = page_token
        else:
            query_body["pageToken"] = None

        for attempt in range(max_retries):
            try:
                response = requests.post(api_url, headers=headers, json=query_body)
                
                # Handle Rate Limiting (429)
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 1))
                    print(f"Rate limited. Waiting {retry_after} seconds...")
                    time.sleep(retry_after)
                    continue
                
                # Handle Authentication/Authorization Errors
                if response.status_code in [401, 403]:
                    raise Exception(f"Auth Error: {response.status_code} - {response.text}")
                
                # Handle Server Errors
                if response.status_code >= 500:
                    raise Exception(f"Server Error: {response.status_code} - {response.text}")

                # Success
                break
                
            except requests.exceptions.RequestException as e:
                print(f"Request failed: {e}")
                raise

        data = response.json()
        
        # Append entities from this page
        if "entities" in data and len(data["entities"]) > 0:
            all_results.extend(data["entities"])
        
        # Check for next page
        page_token = data.get("nextPageToken")
        if not page_token:
            break
            
        # Small delay to respect rate limits between pages
        time.sleep(1)

    return all_results

Step 3: Processing and Formatting Results

The response contains an array of entities. Each entity represents a unique combination of the groupBys dimensions (Queue ID and Media Type). You need to map the queue IDs to human-readable names for useful reporting.

First, you may want to fetch the queue names to resolve the IDs.

def get_queue_names(access_token: str, queue_ids: set) -> dict:
    """
    Fetches queue details to map IDs to names.
    """
    region = os.getenv("GENESYS_REGION", "mypurecloud.com")
    api_url = f"https://{region}/api/v2/queues"
    
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    
    queue_map = {}
    page_token = None
    
    while True:
        params = {
            "pageSize": 100,
            "pageToken": page_token if page_token else ""
        }
        
        response = requests.get(api_url, headers=headers, params=params)
        if response.status_code != 200:
            raise Exception(f"Failed to fetch queues: {response.status_code}")
        
        data = response.json()
        for queue in data.get("entities", []):
            if queue["id"] in queue_ids:
                queue_map[queue["id"]] = queue["name"]
        
        page_token = data.get("nextPageToken")
        if not page_token:
            break
            
    return queue_map

Now, combine the analytics data with the queue names.

def process_results(analytics_data: list, queue_map: dict) -> list:
    """
    Formats the raw analytics entities into a readable structure.
    """
    formatted_results = []
    
    for entity in analytics_data:
        # Extract dimensions
        queue_id = entity.get("dimensions", {}).get("queue.id", {}).get("value", "Unknown")
        media_type = entity.get("dimensions", {}).get("mediaType", {}).get("value", "Unknown")
        
        # Resolve queue name
        queue_name = queue_map.get(queue_id, queue_id)
        
        # Extract metrics
        metrics = entity.get("metrics", {})
        
        formatted_results.append({
            "Queue Name": queue_name,
            "Queue ID": queue_id,
            "Media Type": media_type,
            "Conversation Count": metrics.get("conversationCount", {}).get("value", 0),
            "Handled Count": metrics.get("handledCount", {}).get("value", 0),
            "Abandoned Count": metrics.get("abandonedCount", {}).get("value", 0),
            "Average Handle Time (seconds)": metrics.get("averageHandleTime", {}).get("value", 0)
        })
        
    return formatted_results

Complete Working Example

Below is the complete, runnable script. Save this as analytics_queue_media.py.

import requests
import os
import json
import time
from datetime import datetime, timezone, timedelta

def get_access_token() -> str:
    """
    Authenticates with Genesys Cloud OAuth and returns an access token.
    """
    region = os.getenv("GENESYS_REGION", "mypurecloud.com")
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    token_url = f"https://{region}/oauth/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret
    }

    response = requests.post(token_url, headers=headers, data=data)
    if response.status_code != 200:
        raise Exception(f"Authentication failed: {response.status_code} - {response.text}")
    
    return response.json()["access_token"]

def get_queue_names(access_token: str, queue_ids: set) -> dict:
    """
    Fetches queue details to map IDs to names.
    """
    region = os.getenv("GENESYS_REGION", "mypurecloud.com")
    api_url = f"https://{region}/api/v2/queues"
    headers = {"Authorization": f"Bearer {access_token}"}
    queue_map = {}
    page_token = None
    
    while True:
        params = {"pageSize": 100, "pageToken": page_token if page_token else ""}
        response = requests.get(api_url, headers=headers, params=params)
        if response.status_code != 200:
            raise Exception(f"Failed to fetch queues: {response.status_code}")
        
        data = response.json()
        for queue in data.get("entities", []):
            if queue["id"] in queue_ids:
                queue_map[queue["id"]] = queue["name"]
        
        page_token = data.get("nextPageToken")
        if not page_token:
            break
    return queue_map

def build_analytics_query() -> dict:
    """
    Constructs the analytics query body.
    """
    now = datetime.now(timezone.utc)
    date_to = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    date_from = (now - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")

    return {
        "dateFrom": date_from,
        "dateTo": date_to,
        "interval": "P1D",
        "groupBys": ["queue.id", "mediaType"],
        "metrics": [
            "conversationCount",
            "handledCount",
            "abandonedCount",
            "averageHandleTime"
        ],
        "filter": {
            "type": "and",
            "clauses": [
                {
                    "type": "dimension",
                    "dimension": "mediaType",
                    "operator": "eq",
                    "value": ["voice", "chat"]
                }
            ]
        },
        "pageSize": 100,
        "pageToken": None
    }

def fetch_analytics_data(access_token: str, query_body: dict) -> list:
    """
    Sends the analytics query and handles pagination.
    """
    region = os.getenv("GENESYS_REGION", "mypurecloud.com")
    api_url = f"https://{region}/api/v2/analytics/conversations/details/query"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    
    all_results = []
    page_token = None
    max_retries = 3

    while True:
        if page_token:
            query_body["pageToken"] = page_token
        else:
            query_body["pageToken"] = None

        for attempt in range(max_retries):
            try:
                response = requests.post(api_url, headers=headers, json=query_body)
                
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 1))
                    time.sleep(retry_after)
                    continue
                
                if response.status_code not in [200, 207]:
                    raise Exception(f"API Error: {response.status_code} - {response.text}")
                
                break
            except requests.exceptions.RequestException as e:
                print(f"Request failed: {e}")
                raise

        data = response.json()
        if "entities" in data and len(data["entities"]) > 0:
            all_results.extend(data["entities"])
        
        page_token = data.get("nextPageToken")
        if not page_token:
            break
        time.sleep(1)

    return all_results

def process_results(analytics_data: list, queue_map: dict) -> list:
    """
    Formats the raw analytics entities.
    """
    formatted_results = []
    for entity in analytics_data:
        dimensions = entity.get("dimensions", {})
        queue_id = dimensions.get("queue.id", {}).get("value", "Unknown")
        media_type = dimensions.get("mediaType", {}).get("value", "Unknown")
        queue_name = queue_map.get(queue_id, queue_id)
        
        metrics = entity.get("metrics", {})
        
        formatted_results.append({
            "Queue Name": queue_name,
            "Queue ID": queue_id,
            "Media Type": media_type,
            "Conversation Count": metrics.get("conversationCount", {}).get("value", 0),
            "Handled Count": metrics.get("handledCount", {}).get("value", 0),
            "Abandoned Count": metrics.get("abandonedCount", {}).get("value", 0),
            "Average Handle Time (seconds)": metrics.get("averageHandleTime", {}).get("value", 0)
        })
    return formatted_results

if __name__ == "__main__":
    try:
        # 1. Authenticate
        token = get_access_token()
        
        # 2. Build Query
        query = build_analytics_query()
        
        # 3. Fetch Data
        print("Fetching analytics data...")
        raw_data = fetch_analytics_data(token, query)
        
        # 4. Extract Queue IDs to resolve names
        queue_ids = set()
        for entity in raw_data:
            q_id = entity.get("dimensions", {}).get("queue.id", {}).get("value")
            if q_id:
                queue_ids.add(q_id)
        
        # 5. Resolve Queue Names
        if queue_ids:
            queue_map = get_queue_names(token, queue_ids)
        else:
            queue_map = {}
            
        # 6. Process and Print Results
        results = process_results(raw_data, queue_map)
        
        if not results:
            print("No data found for the specified criteria.")
        else:
            print("\n--- Analytics Report: Queue & Media Type ---")
            print(json.dumps(results, indent=2))
            
    except Exception as e:
        print(f"Error: {e}")

Common Errors & Debugging

Error: 400 Bad Request - Invalid Filter

Cause: The filter clause uses an incorrect operator or dimension name. For example, using mediaType instead of mediaType (case-sensitive) or using eq with a single value when the API expects an array.

Fix: Ensure the value in the filter is an array, even if it contains only one item.

# Correct
"value": ["voice"]

# Incorrect
"value": "voice"

Error: 429 Too Many Requests

Cause: The Analytics API has strict rate limits, especially for complex aggregation queries. Sending requests too quickly triggers a 429.

Fix: Implement exponential backoff or respect the Retry-After header. The code above includes a basic retry loop with Retry-After handling.

Error: Empty Results

Cause: The time window (dateFrom/dateTo) might not contain any conversations, or the filter is too restrictive (e.g., filtering for a queue ID that does not exist or has no activity).

Fix:

  1. Verify the time range is recent (last 24 hours).
  2. Remove the filter clause temporarily to ensure data exists.
  3. Check that the OAuth client has permissions to view the specific queues.

Error: 403 Forbidden

Cause: The OAuth client lacks the required scopes.

Fix: Ensure the client has analytics:query:read and analytics:conversation:view scopes assigned in the Genesys Cloud Admin portal under Users > OAuth Clients.

Official References