Parse Genesys Cloud v2 Analytics Conversation Aggregate JSON

What You Will Build

  • You will build a Python script that queries the Genesys Cloud Analytics API for conversation aggregates and flattens the nested JSON response into a structured list of dictionaries.
  • This tutorial uses the PureCloudPlatformClientV2 Python SDK and the underlying HTTP response structures.
  • The programming language covered is Python 3.9+.

Prerequisites

  • OAuth Client: A Client Credentials client whose assigned role grants the analytics:conversationAggregate:view permission.
  • SDK Version: PureCloudPlatformClientV2 v123.0.0 or later.
  • Runtime: Python 3.9+ with venv or conda.
  • Dependencies:
    • PureCloudPlatformClientV2
    • pandas (optional, for final data manipulation)
    • python-dotenv (for secure credential management)

Authentication Setup

Genesys Cloud uses OAuth 2.0 for all API requests. For server-to-server integrations, you must use the Client Credentials flow. The SDK handles token caching and refresh automatically, but you must initialize the client correctly.
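
For orientation, the Client Credentials exchange that the SDK performs on your behalf amounts to one form-encoded POST carrying a Basic authorization header. The sketch below only builds that request and sends nothing. The helper name build_token_request is hypothetical, and the default login host (the US East login domain) is an assumption you should adjust for your region.

```python
import base64
from urllib.parse import urlencode


def build_token_request(client_id: str, client_secret: str,
                        login_host: str = "https://login.mypurecloud.com"):
    """Return (url, headers, body) for an OAuth 2.0 Client Credentials request.

    Hypothetical helper for illustration; the SDK normally does this for you.
    """
    # HTTP Basic credentials are base64("client_id:client_secret")
    raw = f"{client_id}:{client_secret}".encode("utf-8")
    basic = base64.b64encode(raw).decode("ascii")
    headers = {
        "Authorization": f"Basic {basic}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    body = urlencode({"grant_type": "client_credentials"})
    return f"{login_host}/oauth/token", headers, body


url, headers, body = build_token_request("my-id", "my-secret")
print(url)   # https://login.mypurecloud.com/oauth/token
print(body)  # grant_type=client_credentials
```

Seeing the request spelled out helps when debugging a 401: the usual culprits are a wrong regional host or a secret copied with stray whitespace.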

Create a .env file in your project root:

# .env
GENESYS_CLIENT_ID=your_client_id
GENESYS_CLIENT_SECRET=your_client_secret
GENESYS_ENVIRONMENT=us-east-1 # e.g., us-east-1, eu-west-1

Initialize the API client in your script:

import os
from dotenv import load_dotenv
from PureCloudPlatformClientV2 import ApiClient, AnalyticsApi

load_dotenv()

# API hosts differ per region; extend this map for the region you use.
API_HOSTS = {
    "us-east-1": "https://api.mypurecloud.com",
    "eu-west-1": "https://api.mypurecloud.ie",
}

def get_analytics_api_instance() -> AnalyticsApi:
    """
    Configures and returns an authenticated AnalyticsApi instance.
    """
    # 1. Resolve the regional API host from the environment
    region = os.getenv('GENESYS_ENVIRONMENT', 'us-east-1')
    if region not in API_HOSTS:
        raise ValueError(f"Unsupported GENESYS_ENVIRONMENT: {region!r}")

    # 2. Read the OAuth client credentials
    client_id = os.getenv('GENESYS_CLIENT_ID')
    client_secret = os.getenv('GENESYS_CLIENT_SECRET')
    if not client_id or not client_secret:
        raise ValueError("Missing GENESYS_CLIENT_ID or GENESYS_CLIENT_SECRET")

    # 3. Exchange the credentials for a token (Client Credentials grant)
    api_client = ApiClient(host=API_HOSTS[region]).get_client_credentials_token(
        client_id, client_secret
    )

    # 4. Instantiate the specific API module
    return AnalyticsApi(api_client)

Implementation

Step 1: Constructing the Aggregate Query

The endpoint POST /api/v2/analytics/conversations/aggregates/query accepts a complex JSON body. The most common mistake is misaligning the filters, groups, and metrics structures.

You must define:

  1. timeFilter: The start and end time (ISO 8601).
  2. filters: Constraints on the data (e.g., queue.id, conversationType).
  3. groups: How to split the data (e.g., by queue.id).
  4. metrics: What numbers to calculate (e.g., conversationCount, handledDuration).

from PureCloudPlatformClientV2 import (
    ConversationAggregateQuery,
    ConversationTimeFilter,
    ConversationAggregateGroup,
    ConversationAggregateMetric
)
from datetime import datetime, timedelta, timezone

def build_query_body() -> ConversationAggregateQuery:
    """
    Constructs a query for the last 24 hours of voice conversations,
    grouped by Queue ID.
    """
    now = datetime.now(timezone.utc)
    start_time = now - timedelta(hours=24)

    # 1. Time Filter
    time_filter = ConversationTimeFilter(
        start_time=start_time.isoformat(),
        end_time=now.isoformat()
    )

    # 2. Filters: Only include Voice conversations
    # Note: The SDK allows direct object construction or dictionary mapping.
    # Using dictionaries is often clearer for complex nested filters.
    filters = {
        "conversationType": {
            "operation": "equals",
            "value": "voice"
        }
    }

    # 3. Groups: Split results by Queue ID
    groups = [
        ConversationAggregateGroup(
            field="queue.id",
            type="queue"
        )
    ]

    # 4. Metrics: Count of conversations and total handled duration
    metrics = [
        ConversationAggregateMetric(name="conversationCount"),
        ConversationAggregateMetric(name="handledDuration")
    ]

    # Assemble the final query object
    query = ConversationAggregateQuery(
        time_filter=time_filter,
        filters=filters,
        groups=groups,
        metrics=metrics,
        size=50  # Max 50 items per page for aggregates
    )

    return query
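
The time window is the part of the query most likely to be malformed, so it can help to compute it in isolation. The following is a minimal sketch using only the standard library; the helper name trailing_window and its now parameter (used to make the result reproducible) are assumptions for illustration and not part of the SDK.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def trailing_window(hours: int = 24,
                    now: Optional[datetime] = None) -> Tuple[str, str]:
    """Return (start, end) ISO 8601 strings for the trailing window in UTC."""
    if now is None:
        now = datetime.now(timezone.utc)
    if now.tzinfo is None:
        # Naive datetimes would serialize without an offset
        raise ValueError("now must be timezone-aware")
    end = now.astimezone(timezone.utc)
    start = end - timedelta(hours=hours)
    return start.isoformat(), end.isoformat()


fixed = datetime(2023, 10, 27, 12, 0, tzinfo=timezone.utc)
start, end = trailing_window(24, now=fixed)
print(start)  # 2023-10-26T12:00:00+00:00
print(end)    # 2023-10-27T12:00:00+00:00
```

Rejecting naive datetimes up front keeps an offset-less timestamp from ever reaching the request body.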

Step 2: Executing the Query and Handling Pagination

The Analytics API returns paginated results. You must check the nextPageToken in the response to fetch subsequent pages. If you ignore pagination, you will only analyze the first 50 records.

from PureCloudPlatformClientV2.rest import ApiException
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def fetch_all_aggregates(analytics_api: AnalyticsApi, query: ConversationAggregateQuery):
    """
    Fetches all pages of aggregate data.
    """
    all_results = []
    page_token = None
    page_number = 1

    while True:
        try:
            logger.info(f"Fetching page {page_number}...")
            
            # The SDK method expects the query object
            response = analytics_api.post_analytics_conversations_aggregates_query(
                body=query,
                page_token=page_token
            )

            # Append the current page's entities
            if response.entities:
                all_results.extend(response.entities)
            
            # Check for next page
            if response.next_page_token:
                page_token = response.next_page_token
                page_number += 1
            else:
                logger.info("No more pages found.")
                break

        except ApiException as e:
            logger.error(f"API Exception: {e.status} {e.reason}")
            if e.status == 429:
                logger.warning("Rate limited. Implement exponential backoff in production.")
            elif e.status in [401, 403]:
                logger.error("Authentication or Authorization failed. Check scopes.")
            raise

    return all_results
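
The token-following loop can be exercised without network access by separating it from the API call. The sketch below assumes a hypothetical fetch_page callable that takes a token and returns a pair of (items, next_token); the max_pages guard is an added safety measure, not something the API requires.

```python
from typing import Callable, List, Optional, Tuple

Page = Tuple[List[dict], Optional[str]]


def collect_pages(fetch_page: Callable[[Optional[str]], Page],
                  max_pages: int = 1000) -> List[dict]:
    """Follow next-page tokens until exhausted, guarding against endless loops."""
    items: List[dict] = []
    token: Optional[str] = None
    for _ in range(max_pages):
        page_items, token = fetch_page(token)
        items.extend(page_items)
        if not token:
            return items
    raise RuntimeError(f"Stopped after {max_pages} pages; token never cleared")


# Fake two-page backend standing in for the API call
FAKE_PAGES = {
    None: ([{"id": "a"}, {"id": "b"}], "t2"),
    "t2": ([{"id": "c"}], None),
}

rows = collect_pages(lambda token: FAKE_PAGES[token])
print([r["id"] for r in rows])  # ['a', 'b', 'c']
```

In a real script, fetch_page would wrap the SDK call and return the page's entities together with its next-page token.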

Step 3: Parsing the Nested JSON Structure

The core of this tutorial is parsing the entities list. Each entity is a ConversationAggregateEntity. The structure is nested because the groups and metrics are separated.

Raw JSON Structure Example:

{
  "entities": [
    {
      "id": "queue-id-123",
      "groups": {
        "queue.id": "queue-id-123"
      },
      "metrics": {
        "conversationCount": { "value": 150 },
        "handledDuration": { "value": 45000 }
      }
    }
  ]
}

The Parsing Logic:

  1. Iterate through response.entities.
  2. Extract the group key (e.g., queue.id).
  3. Extract the metric values. Note that metrics are wrapped in an object with a value key.
  4. Flatten into a simple dictionary for database insertion or DataFrame creation.

from PureCloudPlatformClientV2 import ConversationAggregateEntity
from typing import List, Dict, Any

def parse_aggregate_entities(entities: List[ConversationAggregateEntity]) -> List[Dict[str, Any]]:
    """
    Flattens nested ConversationAggregateEntity objects into simple dictionaries.
    """
    parsed_data = []

    for entity in entities:
        # Initialize a record for this aggregate
        record = {}

        # 1. Extract Group Keys
        # The 'groups' attribute is a dictionary where keys are the group fields
        # and values are the actual group values (e.g., Queue ID, User ID).
        if entity.groups:
            for group_field, group_value in entity.groups.items():
                # Sanitize field names for use as dictionary keys
                safe_key = group_field.replace(".", "_")
                record[safe_key] = group_value
        
        # 2. Extract Metric Values
        # The 'metrics' attribute is a dictionary where keys are metric names
        # and values are objects containing 'value', 'min', 'max', etc.
        if entity.metrics:
            for metric_name, metric_obj in entity.metrics.items():
                # We primarily want the aggregate value
                # metric_obj is a ConversationAggregateMetricValue object
                if hasattr(metric_obj, 'value') and metric_obj.value is not None:
                    record[metric_name] = metric_obj.value
                else:
                    record[metric_name] = 0 # Handle null metrics gracefully

        # 3. Add Metadata (Optional)
        # The entity ID is usually the composite key of the groups
        record['aggregate_id'] = entity.id

        parsed_data.append(record)

    return parsed_data
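
The same flattening can be tried against raw JSON shaped like the example above, with no SDK objects involved. This is a sketch under the assumption that the payload matches that example. The function name flatten_payload is hypothetical, and the sample sets handledDuration to null on purpose to exercise the null-handling branch.

```python
import json
from typing import Any, Dict, List

SAMPLE = """
{
  "entities": [
    {
      "id": "queue-id-123",
      "groups": {"queue.id": "queue-id-123"},
      "metrics": {
        "conversationCount": {"value": 150},
        "handledDuration": {"value": null}
      }
    }
  ]
}
"""


def flatten_payload(payload: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten each entity's groups and metrics into one flat dictionary."""
    rows = []
    for entity in payload.get("entities") or []:
        row = {"aggregate_id": entity.get("id")}
        # Group fields such as "queue.id" become "queue_id"
        for field, value in (entity.get("groups") or {}).items():
            row[field.replace(".", "_")] = value
        # Unwrap {"value": ...}; a missing or null value becomes 0
        for name, metric in (entity.get("metrics") or {}).items():
            value = (metric or {}).get("value")
            row[name] = 0 if value is None else value
        rows.append(row)
    return rows


rows = flatten_payload(json.loads(SAMPLE))
print(rows)
# [{'aggregate_id': 'queue-id-123', 'queue_id': 'queue-id-123', 'conversationCount': 150, 'handledDuration': 0}]
```

Working on plain dictionaries like this is convenient for unit tests, because a saved response can be replayed without credentials.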

Complete Working Example

This script combines authentication, query construction, pagination, and parsing into a single runnable file.

import os
import json
import logging
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
from PureCloudPlatformClientV2 import (
    ApiClient,
    Configuration,
    AnalyticsApi,
    ConversationAggregateQuery,
    ConversationTimeFilter,
    ConversationAggregateGroup,
    ConversationAggregateMetric
)
from PureCloudPlatformClientV2.rest import ApiException
from typing import List, Dict, Any

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def get_analytics_api_instance() -> AnalyticsApi:
    """Initializes the Genesys Cloud Analytics API client."""
    load_dotenv()

    # API hosts differ per region; extend this map for the region you use.
    api_hosts = {
        "us-east-1": "https://api.mypurecloud.com",
        "eu-west-1": "https://api.mypurecloud.ie",
    }
    env = os.getenv('GENESYS_ENVIRONMENT', 'us-east-1')
    if env not in api_hosts:
        raise ValueError(f"Unsupported GENESYS_ENVIRONMENT: {env!r}")

    client_id = os.getenv('GENESYS_CLIENT_ID')
    client_secret = os.getenv('GENESYS_CLIENT_SECRET')
    if not client_id or not client_secret:
        raise ValueError("Missing GENESYS_CLIENT_ID or GENESYS_CLIENT_SECRET in .env")

    # Client Credentials grant: exchange the id/secret for an access token
    api_client = ApiClient(host=api_hosts[env]).get_client_credentials_token(
        client_id, client_secret
    )
    return AnalyticsApi(api_client)

def build_query_body() -> ConversationAggregateQuery:
    """Builds the query for voice conversations grouped by Queue."""
    now = datetime.now(timezone.utc)
    start_time = now - timedelta(hours=24)

    time_filter = ConversationTimeFilter(
        start_time=start_time.isoformat(),
        end_time=now.isoformat()
    )

    filters = {
        "conversationType": {
            "operation": "equals",
            "value": "voice"
        }
    }

    groups = [
        ConversationAggregateGroup(field="queue.id", type="queue")
    ]

    metrics = [
        ConversationAggregateMetric(name="conversationCount"),
        ConversationAggregateMetric(name="handledDuration"),
        ConversationAggregateMetric(name="abandonCount")
    ]

    return ConversationAggregateQuery(
        time_filter=time_filter,
        filters=filters,
        groups=groups,
        metrics=metrics,
        size=50
    )

def fetch_all_aggregates(analytics_api: AnalyticsApi, query: ConversationAggregateQuery) -> List[Any]:
    """Fetches all pages of aggregate data."""
    all_entities = []
    page_token = None
    page_number = 1

    while True:
        try:
            logger.info(f"Fetching page {page_number}...")
            response = analytics_api.post_analytics_conversations_aggregates_query(
                body=query,
                page_token=page_token
            )

            if response.entities:
                all_entities.extend(response.entities)
            
            if response.next_page_token:
                page_token = response.next_page_token
                page_number += 1
            else:
                break

        except ApiException as e:
            logger.error(f"API Error: {e.status} {e.reason}")
            if e.body:
                logger.error(f"Response Body: {e.body}")
            raise

    return all_entities

def parse_aggregate_entities(entities: List[Any]) -> List[Dict[str, Any]]:
    """Flattens nested entity objects into simple dictionaries."""
    parsed_data = []

    for entity in entities:
        record = {}

        # Extract Groups
        if entity.groups:
            for group_field, group_value in entity.groups.items():
                record[group_field.replace(".", "_")] = group_value
        
        # Extract Metrics
        if entity.metrics:
            for metric_name, metric_obj in entity.metrics.items():
                # Read 'value' from the metric object; treat missing or null as 0
                value = getattr(metric_obj, 'value', None)
                record[metric_name] = value if value is not None else 0

        parsed_data.append(record)

    return parsed_data

def main():
    try:
        logger.info("Initializing Analytics API...")
        analytics_api = get_analytics_api_instance()

        logger.info("Building Query...")
        query = build_query_body()

        logger.info("Fetching Data...")
        raw_entities = fetch_all_aggregates(analytics_api, query)

        logger.info(f"Fetched {len(raw_entities)} total entities. Parsing...")
        flattened_data = parse_aggregate_entities(raw_entities)

        # Output results
        logger.info("Results:")
        for row in flattened_data:
            print(json.dumps(row, indent=2))

        # Optional: Save to JSON file
        with open('analytics_aggregates.json', 'w') as f:
            json.dump(flattened_data, f, indent=2)
        logger.info("Data saved to analytics_aggregates.json")

    except Exception as e:
        logger.error(f"Fatal error: {e}")
        raise

if __name__ == "__main__":
    main()
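
If the flattened rows are headed for a spreadsheet rather than a JSON file, the standard library csv module is enough. This is an optional sketch and not part of the script above. The helper name rows_to_csv is hypothetical, and it assumes rows may carry different metric keys, so it builds the header from the union of keys.

```python
import csv
import io
from typing import Any, Dict, Iterable, List


def rows_to_csv(rows: Iterable[Dict[str, Any]]) -> str:
    """Serialize flattened rows to CSV text, tolerating missing keys."""
    rows = list(rows)
    # Union of keys in first-seen order, so rows missing a metric still fit
    fieldnames: List[str] = []
    for row in rows:
        for key in row:
            if key not in fieldnames:
                fieldnames.append(key)
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames,
                            restval="", lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


sample = [
    {"queue_id": "q1", "conversationCount": 150, "handledDuration": 45000},
    {"queue_id": "q2", "conversationCount": 12},
]
print(rows_to_csv(sample))
```

Missing cells are written as empty strings (restval=""), which most spreadsheet tools read as blanks rather than zeros.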

Common Errors & Debugging

Error: 400 Bad Request - “Invalid Query”

Cause: The timeFilter is malformed, or the query pairs a group field with a metric it is incompatible with.

Fix:

  1. Verify start_time and end_time are valid ISO 8601 strings with timezone info (e.g., 2023-10-27T12:00:00Z).
  2. Check that each groups field is compatible with the requested metrics; an incompatible pairing returns a 400. Overly restrictive filters are not the cause: filtering by a queue.id that had no conversations in the last 24 hours returns an empty list, not an error.
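
The first check can be automated before the request is sent. The sketch below is one way to do it with the standard library; the function name is_tz_aware_iso8601 is hypothetical. Because datetime.fromisoformat accepts a trailing Z only from Python 3.11 onward, the helper normalizes it to an explicit offset first.

```python
from datetime import datetime


def is_tz_aware_iso8601(value: str) -> bool:
    """True if value parses as ISO 8601 and carries a UTC offset."""
    # Normalize a trailing 'Z' so the check also works on Python 3.9 and 3.10
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(value)
    except ValueError:
        return False
    return parsed.utcoffset() is not None


print(is_tz_aware_iso8601("2023-10-27T12:00:00Z"))  # True
print(is_tz_aware_iso8601("2023-10-27T12:00:00"))   # False
print(is_tz_aware_iso8601("27/10/2023 12:00"))      # False
```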

Debug Code:

# Print the raw JSON body before sending
import json
query_dict = analytics_api.api_client.sanitize_for_serialization(query)
print("Sending Query:", json.dumps(query_dict, indent=2))

Error: 429 Too Many Requests

Cause: You are hitting the Analytics API rate limit. Analytics queries are expensive.

Fix: Implement exponential backoff. The SDK does not handle retries automatically for 429s.

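import logging
import time

from PureCloudPlatformClientV2.rest import ApiException

logger = logging.getLogger(__name__)

def fetch_with_retry(analytics_api, query, max_retries=3):
    """Retries the query on HTTP 429, doubling the wait after each attempt."""
    for attempt in range(max_retries):
        try:
            return analytics_api.post_analytics_conversations_aggregates_query(
                body=query
            )
        except ApiException as e:
            # Re-raise anything that is not a rate limit, and the final failure
            if e.status != 429 or attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            logger.warning(f"Rate limited. Waiting {wait_time} seconds...")
            time.sleep(wait_time)
    raise RuntimeError("Max retries exceeded")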
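
The wait schedule itself can be computed and inspected separately from the API call. The following sketch adds two common refinements that the retry function above does not use: an upper cap on the delay and optional random jitter. The function name backoff_delays and the default base and cap values are assumptions chosen for illustration, not limits published by Genesys.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Delays in seconds for capped exponential backoff, with optional jitter."""
    delays = []
    for attempt in range(max_retries):
        ceiling = min(cap, base * (2 ** attempt))
        # Full jitter: pick uniformly in [0, ceiling] to de-synchronize clients
        delays.append(rng.uniform(0, ceiling) if rng else ceiling)
    return delays


print(backoff_delays(6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

Passing rng=random.Random() spreads retries from several workers over time, so they do not all hit the API again at the same instant.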

Error: Metric Value is None

Cause: The metric exists in the response structure, but the value is null because no data matched the filter for that specific group.

Fix: Always check for None before processing numeric metrics.

if metric_obj.value is not None:
    record[metric_name] = metric_obj.value
else:
    record[metric_name] = 0
