Querying Genesys Cloud Analytics: Aggregating Conversations by Queue and Media Type

What You Will Build

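  • A Python script that queries the Genesys Cloud Analytics API to retrieve conversation metrics aggregated by queue ID and media type.
  • The solution uses the official Genesys Cloud Python SDK (published on PyPI as PureCloudPlatformClientV2) and the /api/v2/analytics/conversations/aggregates/query endpoint. The similarly named /api/v2/analytics/conversations/details/query endpoint returns individual conversation records and does not support grouping.
  • The tutorial targets Python 3.9+ and demonstrates OAuth 2.0 authentication, query construction, and result parsing.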

Prerequisites

  • OAuth Client Type: Client Credentials, the grant type intended for server-to-server integrations that run without a signed-in user.
  • Required Permissions: A Client Credentials client is authorized through the roles assigned to it rather than through scopes. Assign a role that includes analytics:conversationAggregate:view.
  • SDK Version: a recent release of PureCloudPlatformClientV2.
  • Runtime: Python 3.9 or higher.
  • Dependencies:
    • PureCloudPlatformClientV2
    • python-dotenv (for loading credentials from a .env file)

Install the dependencies using pip:

pip install PureCloudPlatformClientV2 python-dotenv

Authentication Setup

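Genesys Cloud uses OAuth 2.0. With the Client Credentials grant, the Python SDK requests the access token for you once you supply the client ID and client secret. A single token is enough for a short-lived script; a long-running process should be prepared to request a new token when the current one expires. Store the client secret securely and never hardcode it in source code.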

Create a .env file in your project root:

GENESYS_CLOUD_REGION=us_east_1 # SDK region name; adjust to your region
GENESYS_CLOUD_CLIENT_ID=your-client-id-here
GENESYS_CLOUD_CLIENT_SECRET=your-client-secret-here

Initialize the SDK client in your script:

import os

import PureCloudPlatformClientV2
from dotenv import load_dotenv

# Load environment variables from .env
load_dotenv()

def init_client():
    """
    Authenticates with the Client Credentials grant and returns an ApiClient.
    Raises an exception if configuration is missing or authentication fails.
    """
    region_name = os.getenv("GENESYS_CLOUD_REGION")
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")

    if not all([region_name, client_id, client_secret]):
        raise ValueError("Missing required environment variables.")

    # Point the SDK at the API host for your region.
    # SDK region names use underscores (e.g., us_east_1).
    region = PureCloudPlatformClientV2.PureCloudRegionHosts[region_name.replace("-", "_")]
    PureCloudPlatformClientV2.configuration.host = region.get_api_host()

    # Request an access token; the returned ApiClient sends it with every call
    api_client = PureCloudPlatformClientV2.api_client.ApiClient()
    return api_client.get_client_credentials_token(client_id, client_secret)

if __name__ == "__main__":
    try:
        client = init_client()
        print("Authentication successful.")
    except Exception as e:
        print(f"Authentication failed: {e}")

Implementation

Step 1: Constructing the Query Body

The aggregates endpoint expects a JSON body that defines the aggregation. In the SDK, this body is the ConversationAggregationQuery model. To group by queue and media type, set its groupBy array.

Key parameters:

  • groupBy: An array of dimension names. Use "queueId" for queue-level aggregation and "mediaType" for media type (voice, chat, etc.).
  • interval: Required. The query window, written as two ISO 8601 timestamps separated by a slash (start/end).
  • granularity: Optional. If omitted, the API returns a single aggregated result for the entire interval. If provided (e.g., "PT1H"), it breaks results down into time buckets. For this tutorial, we omit it to get a single snapshot.
  • metrics: The metrics you want to retrieve. Common conversation metrics include "tHandle", "tAbandon", "nOffered", and "tAnswered".

Here is the construction of the query body:

from PureCloudPlatformClientV2 import ConversationAggregationQuery

def build_query_request(start_time: str, end_time: str) -> ConversationAggregationQuery:
    """
    Constructs the ConversationAggregationQuery object.

    Args:
        start_time: ISO 8601 start time (e.g., "2023-10-01T00:00:00Z")
        end_time: ISO 8601 end time (e.g., "2023-10-02T00:00:00Z")

    Returns:
        ConversationAggregationQuery object ready for the API call.
    """
    # SDK models are created empty and populated through attributes
    query = ConversationAggregationQuery()
    query.interval = f"{start_time}/{end_time}"
    query.group_by = ["queueId", "mediaType"]

    # Request only the metrics you need; each one adds to the response size.
    query.metrics = [
        "tHandle",    # handle time (count and sum)
        "tAbandon",   # abandoned conversations
        "nOffered",   # conversations offered to the queue
        "tAnswered",  # answered conversations
    ]

    return query
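
When debugging a rejected query, it can help to look at the request as the JSON that the endpoint receives. The following is a minimal sketch that builds an equivalent body as a plain dictionary using only the standard library. The helper name build_query_body is hypothetical, and the field and metric names (interval, groupBy, metrics, queueId, tHandle, and so on) reflect the request schema of /api/v2/analytics/conversations/aggregates/query as understood here, so confirm them against the API reference before relying on them.

```python
import json
from datetime import datetime, timedelta, timezone


def build_query_body(start: datetime, end: datetime) -> dict:
    """Builds an aggregates query body as a plain dict (hypothetical helper)."""
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("start and end must be timezone-aware")
    if start >= end:
        raise ValueError("start must be earlier than end")

    # Render both ends of the window in UTC with a trailing "Z"
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    start_utc = start.astimezone(timezone.utc).strftime(fmt)
    end_utc = end.astimezone(timezone.utc).strftime(fmt)

    return {
        "interval": f"{start_utc}/{end_utc}",
        "groupBy": ["queueId", "mediaType"],
        "metrics": ["tHandle", "tAbandon", "nOffered", "tAnswered"],
    }


if __name__ == "__main__":
    end = datetime(2023, 10, 2, tzinfo=timezone.utc)
    body = build_query_body(end - timedelta(days=1), end)
    print(json.dumps(body, indent=2))
```

Printing the body this way makes it easy to compare against a request that works in an API explorer or a REST client.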

Step 2: Executing the Query

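The post_analytics_conversations_aggregates_query method of AnalyticsApi executes the query. Unlike the conversation detail query, an aggregate query is not paginated: the response contains one entry per group, so there is no page token to follow. Grouping by queue and media type can still produce many entries if you have many queues.

The SDK returns a ConversationAggregateQueryResponse object. Its results list holds one item per queue and media type combination, and each item contains:

  • group: A dictionary of the dimension values for this group, keyed by queueId and mediaType.
  • data: A list of time buckets. Without granularity there is a single bucket covering the whole interval.
  • data[].metrics: One entry per requested metric, each with a stats object carrying fields such as count and sum.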

from PureCloudPlatformClientV2 import AnalyticsApi, ConversationAggregationQuery
from PureCloudPlatformClientV2.rest import ApiException

def fetch_aggregated_data(api_client, request: ConversationAggregationQuery) -> list:
    """
    Runs the aggregate query and returns the per-group results.

    Args:
        api_client: The authenticated ApiClient instance.
        request: The ConversationAggregationQuery object.

    Returns:
        A list of ConversationAggregateDataContainer objects (empty if no data).
    """
    analytics_api = AnalyticsApi(api_client)

    try:
        response = analytics_api.post_analytics_conversations_aggregates_query(request)
    except ApiException as e:
        print(f"API Error: {e.status} - {e.reason}")
        if e.status == 401:
            print("Check your OAuth token or credentials.")
        elif e.status == 403:
            print("Check the roles on your OAuth client. You need 'analytics:conversationAggregate:view'.")
        elif e.status == 429:
            print("Rate limited. Implement exponential backoff.")
        raise

    # results is empty or None when no conversations matched the query
    return response.results or []

Step 3: Processing the Results

Each item in the results list is a ConversationAggregateDataContainer that holds the aggregated metrics for one group. Because we grouped by queueId and mediaType, each item represents a unique combination of a queue and a media type.

Key fields:

  • group["queueId"]: The ID of the queue. The aggregate response carries IDs only; resolving queue names requires a separate Routing API lookup.
  • group["mediaType"]: The string identifier for the media type (e.g., "voice", "chat", "email").
  • tHandle: stats.sum is the total handle time in milliseconds, and stats.count is the number of handled conversations.
  • tAbandon: stats.count is the number of abandoned conversations.

def _stat(stats: dict, metric: str, field: str) -> float:
    """Returns one statistic of a metric, or 0 if the metric is absent."""
    summary = stats.get(metric)
    return (getattr(summary, field, None) or 0) if summary else 0

def process_results(results: list):
    """
    Iterates through the per-group results and prints a formatted summary.

    Args:
        results: List of ConversationAggregateDataContainer objects.
    """
    print(f"{'Queue ID':<36} | {'Media Type':<10} | {'Answered':<10} | {'Abandoned':<10} | {'Avg Handle (ms)':<15}")
    print("-" * 93)

    for container in results:
        group = container.group or {}
        queue_id = group.get("queueId", "Unknown Queue")
        media_type = group.get("mediaType", "Unknown")

        # Index metrics by name. Without granularity there is one bucket per group.
        stats = {
            m.metric: m.stats
            for bucket in (container.data or [])
            for m in (bucket.metrics or [])
        }

        answered = int(_stat(stats, "tAnswered", "count"))
        abandoned = int(_stat(stats, "tAbandon", "count"))
        handled = _stat(stats, "tHandle", "count")
        total_handle = _stat(stats, "tHandle", "sum")

        # Average handle time only if at least one conversation was handled
        avg_handle = (total_handle / handled) if handled > 0 else 0

        print(f"{queue_id:<36} | {media_type:<10} | {answered:<10} | {abandoned:<10} | {avg_handle:<15.2f}")
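
To exercise the parsing logic without calling the API, the same traversal can be run against a hand-written sample. The sketch below assumes the JSON shape of an aggregates response (a results list whose items carry a group dictionary and a data list of buckets, each with metrics and stats). The flatten_results helper and every value in SAMPLE are invented for illustration and are not real query output.

```python
def flatten_results(response: dict) -> list:
    """Turns a raw aggregates response (as a dict) into one flat row per group."""
    rows = []
    for container in response.get("results") or []:
        group = container.get("group") or {}
        row = {"queueId": group.get("queueId"), "mediaType": group.get("mediaType")}
        # Sum count and sum across buckets so the row covers the whole interval
        for bucket in container.get("data") or []:
            for metric in bucket.get("metrics") or []:
                stats = metric.get("stats") or {}
                name = metric["metric"]
                for field in ("count", "sum"):
                    key = f"{name}.{field}"
                    row[key] = row.get(key, 0) + (stats.get(field) or 0)
        rows.append(row)
    return rows


# Invented sample data in the assumed response shape
SAMPLE = {
    "results": [
        {
            "group": {"queueId": "queue-1", "mediaType": "voice"},
            "data": [
                {
                    "interval": "2023-10-01T00:00:00.000Z/2023-10-02T00:00:00.000Z",
                    "metrics": [
                        {"metric": "tHandle", "stats": {"count": 4, "sum": 480000}},
                        {"metric": "tAbandon", "stats": {"count": 1, "sum": 9000}},
                    ],
                }
            ],
        }
    ]
}

if __name__ == "__main__":
    for row in flatten_results(SAMPLE):
        handled = row.get("tHandle.count", 0)
        avg = row.get("tHandle.sum", 0) / handled if handled else 0
        print(row["queueId"], row["mediaType"], handled, f"{avg:.0f} ms")
```

Flat rows like these are also convenient to write to CSV or load into a data frame.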

Complete Working Example

Below is the full, runnable script. Replace the placeholder values in the .env file with your actual credentials.

import os
import sys
from datetime import datetime, timedelta, timezone

# Import Genesys Cloud SDK components
import PureCloudPlatformClientV2
from dotenv import load_dotenv
from PureCloudPlatformClientV2.rest import ApiException

def load_config() -> dict:
    """Loads environment variables from the .env file."""
    load_dotenv()
    config = {
        "region": os.getenv("GENESYS_CLOUD_REGION"),
        "client_id": os.getenv("GENESYS_CLOUD_CLIENT_ID"),
        "client_secret": os.getenv("GENESYS_CLOUD_CLIENT_SECRET"),
    }

    for key, value in config.items():
        if not value:
            raise ValueError(f"Missing required config: {key}")

    return config

def init_client(config: dict):
    """Authenticates with Client Credentials and returns the ApiClient."""
    # SDK region names use underscores (e.g., us_east_1)
    region = PureCloudPlatformClientV2.PureCloudRegionHosts[config["region"].replace("-", "_")]
    PureCloudPlatformClientV2.configuration.host = region.get_api_host()

    api_client = PureCloudPlatformClientV2.api_client.ApiClient()
    return api_client.get_client_credentials_token(config["client_id"], config["client_secret"])

def build_query():
    """Builds the aggregate query for the last 7 days."""
    now = datetime.now(timezone.utc)
    start_time = now - timedelta(days=7)

    # Format to ISO 8601 (UTC)
    start_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_str = now.strftime("%Y-%m-%dT%H:%M:%SZ")

    query = PureCloudPlatformClientV2.ConversationAggregationQuery()
    query.interval = f"{start_str}/{end_str}"
    query.group_by = ["queueId", "mediaType"]
    query.metrics = ["tHandle", "tAbandon", "nOffered", "tAnswered"]
    return query

def fetch_data(api_client, request) -> list:
    """Runs the aggregate query. The response is not paginated."""
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)

    try:
        response = analytics_api.post_analytics_conversations_aggregates_query(request)
    except ApiException as e:
        print(f"Error fetching data: {e.status} - {e.reason}")
        if e.status == 403:
            print("Ensure the OAuth client has a role with 'analytics:conversationAggregate:view'.")
        sys.exit(1)

    return response.results or []

def stat(stats: dict, metric: str, field: str) -> float:
    """Returns one statistic of a metric, or 0 if the metric is absent."""
    summary = stats.get(metric)
    return (getattr(summary, field, None) or 0) if summary else 0

def print_results(results: list):
    """Prints formatted results."""
    if not results:
        print("No data found for the selected interval.")
        return

    print(f"{'Queue ID':<36} | {'Media Type':<10} | {'Answered':<10} | {'Abandoned':<10} | {'Avg Handle (ms)':<15}")
    print("-" * 93)

    for container in results:
        group = container.group or {}
        queue_id = group.get("queueId", "Unassigned")
        media_type = group.get("mediaType", "Unknown")

        # Index metrics by name. Without granularity there is one bucket per group.
        stats = {
            m.metric: m.stats
            for bucket in (container.data or [])
            for m in (bucket.metrics or [])
        }

        answered = int(stat(stats, "tAnswered", "count"))
        abandoned = int(stat(stats, "tAbandon", "count"))
        handled = stat(stats, "tHandle", "count")
        total_handle = stat(stats, "tHandle", "sum")

        avg_handle = (total_handle / handled) if handled > 0 else 0

        print(f"{queue_id:<36} | {media_type:<10} | {answered:<10} | {abandoned:<10} | {avg_handle:<15.2f}")

if __name__ == "__main__":
    try:
        config = load_config()
        client = init_client(config)
        print("Authentication successful.")

        request = build_query()
        print("Querying analytics for last 7 days...")

        results = fetch_data(client, request)

        print_results(results)

    except Exception as e:
        print(f"Fatal error: {e}")
        sys.exit(1)

Common Errors & Debugging

Error: 403 Forbidden

Cause: The OAuth client does not have the required permission.
Fix: Permissions for a Client Credentials client come from its roles. In Genesys Cloud Admin, open your OAuth client, assign it a role that includes analytics:conversationAggregate:view, and then request a new token so the change takes effect.
Code Check:

# Permissions are granted by the roles on the OAuth client, not by the code itself.
# To confirm which client and organization the token belongs to, inspect the token.
tokens_api = PureCloudPlatformClientV2.TokensApi(api_client)
print(tokens_api.get_tokens_me())  # Check that the expected OAuth client is listed

Error: 429 Too Many Requests

Cause: You have exceeded the API rate limit. Analytics queries are expensive, so repeated queries in a tight loop reach the limit quickly.
Fix: Implement exponential backoff rather than relying on the SDK to retry the query for you.
Code Fix:

import time

from PureCloudPlatformClientV2 import AnalyticsApi
from PureCloudPlatformClientV2.rest import ApiException

def fetch_with_retry(api_client, request, max_retries=3):
    analytics_api = AnalyticsApi(api_client)
    for attempt in range(max_retries):
        try:
            return analytics_api.post_analytics_conversations_aggregates_query(request)
        except ApiException as e:
            if e.status != 429:
                raise
            wait_time = 2 ** attempt  # 1s, 2s, 4s, ...
            print(f"Rate limited. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
    raise RuntimeError("Max retries exceeded")
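
A common refinement, which goes beyond the fix above, is to cap the delay and add random jitter so that several clients do not retry in lockstep. The sketch below only computes the delay and makes no API calls; backoff_delay and its parameters are hypothetical names chosen for illustration.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0, rng=random) -> float:
    """Returns a "full jitter" delay in seconds for a zero-based retry attempt.

    The delay is drawn uniformly from [0, min(cap, base * 2**attempt)].
    """
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0, ceiling)


if __name__ == "__main__":
    for attempt in range(6):
        print(f"attempt {attempt}: sleep up to {min(30.0, 2 ** attempt):.0f}s, "
              f"chosen {backoff_delay(attempt):.2f}s")
```

In a retry loop, time.sleep(backoff_delay(attempt)) would take the place of the fixed power-of-two wait.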

Error: Empty Results

Cause: The interval is too short, it is malformed, or no conversations occurred in the expected queues during that window.
Fix: Verify the interval string. It must be two ISO 8601 timestamps separated by a slash, each with a timezone designator (for example, Z for UTC), and the start must precede the end. Also confirm that the queues you expect to see are active and handled conversations in that window.
Debugging:

# Print the timeframe being used
print(f"Interval: {request.interval}")
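
Printing the string only helps if you spot the mistake by eye. As a minimal sketch, the check below parses a start/end string and rejects the usual problems (missing slash, missing timezone, reversed order) before any request is sent. The check_interval helper is a hypothetical name, and the check is a local sanity test rather than the API's own validation.

```python
from datetime import datetime


def _parse_timestamp(ts: str) -> datetime:
    """Parses an ISO 8601 timestamp and requires a timezone designator."""
    original = ts
    # datetime.fromisoformat accepts a trailing "Z" only from Python 3.11 on
    if ts.endswith("Z"):
        ts = ts[:-1] + "+00:00"
    parsed = datetime.fromisoformat(ts)
    if parsed.tzinfo is None:
        raise ValueError(f"timestamp has no timezone designator: {original}")
    return parsed


def check_interval(interval: str) -> tuple:
    """Returns (start, end) as datetimes, or raises ValueError if malformed."""
    start_str, sep, end_str = interval.partition("/")
    if not sep or not start_str or not end_str:
        raise ValueError("interval must look like <start>/<end>")
    start, end = _parse_timestamp(start_str), _parse_timestamp(end_str)
    if start >= end:
        raise ValueError("interval start must be earlier than its end")
    return start, end


if __name__ == "__main__":
    start, end = check_interval("2023-10-01T00:00:00Z/2023-10-02T00:00:00Z")
    print(f"Window length: {end - start}")
```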

Official References