Splitting 90-Day Analytics Queries to Avoid 413 Entity Too Large Errors

What You Will Build

  • A Python script that splits a 90-day date range into smaller chunks so that each Genesys Cloud conversation analytics query stays within server limits.
  • An implementation using the Genesys Cloud Python SDK (PureCloudPlatformClientV2) that handles pagination, retries rate-limited (429) requests, and reports 413 errors together with the interval that triggered them.
  • A data aggregation pattern that merges partial results into a single dataset.

Prerequisites

  • OAuth Client Type: Client Credentials grant.
  • Required Permissions: analytics:conversationDetail:view, granted through a role assigned to the OAuth client (client credentials grants use roles rather than scopes).
  • SDK Version: PureCloudPlatformClientV2 >= 170.0.0.
  • Language/Runtime: Python 3.9+.
  • External Dependencies:
    • PureCloudPlatformClientV2 (the Genesys Cloud Python SDK)
    • pandas (optional, used here to export the results)

Authentication Setup

Genesys Cloud uses OAuth 2.0 for all API interactions. For server-to-server integrations the standard choice is the Client Credentials grant: the SDK exchanges the client ID and secret for an access token and sends it with every request. Client credentials grants do not issue refresh tokens, so a process that runs longer than the token lifetime has to request a new token.

Install the SDK:

pip install PureCloudPlatformClientV2 pandas

Initialize the client. The SDK's ApiClient holds the access token, and API classes such as AnalyticsApi are constructed from it.

import os
import PureCloudPlatformClientV2

# Load credentials from environment variables
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")

def get_platform_client() -> PureCloudPlatformClientV2.api_client.ApiClient:
    """
    Authenticates with the Client Credentials grant and returns an ApiClient
    that carries the access token.
    """
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Point the SDK at your region's API host (e.g. https://api.mypurecloud.com)
    PureCloudPlatformClientV2.configuration.host = f"https://api.{ENVIRONMENT}"

    # Exchange the client ID and secret for an access token.
    # No refresh token is issued; call this again when the token expires.
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        CLIENT_ID, CLIENT_SECRET
    )

Implementation

Step 1: Define the Query Structure

The Genesys Cloud conversation details endpoint (/api/v2/analytics/conversations/details/query) accepts a JSON body that defines the time interval, filters, ordering, and paging, and it returns individual conversation records. Groupings and metrics are not part of a details query; they belong to the aggregates endpoint (/api/v2/analytics/conversations/aggregates/query). Strictly, HTTP 413 means the request content exceeded what the server will accept, but with analytics queries it is best read as a sign that a single request asks for too much. A 90-day span is often too large for one details query because of the number of conversations it matches, and aggregate queries can hit limits when they group by too many dimensions.

We will define a base query object that leaves the interval as a placeholder, so each chunk can inject its own start and end dates.

def build_base_query(page_size: int = 100) -> dict:
    """
    Builds a base conversation details query.
    The interval is left empty here and injected per chunk.
    """
    query_body = {
        "interval": "",  # Placeholder, replaced per chunk
        "order": "asc",
        "orderBy": "conversationStart",
        # Page through results; keep pageSize within the documented maximum
        "paging": {"pageSize": page_size, "pageNumber": 1},
        "segmentFilters": [
            {
                "type": "and",
                "predicates": [
                    # Only voice conversations
                    {"type": "dimension", "dimension": "mediaType",
                     "operator": "matches", "value": "voice"}
                ]
            }
        ]
    }
    return query_body
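
Each chunk needs its own copy of the template. Because the template nests dictionaries and lists, a shallow dict.copy() would share those inner objects between chunks, so changing a nested field such as a paging object for one chunk would change it for every copy. The sketch below deep-copies the template and writes the interval as a start/end ISO 8601 pair with a literal Z. The helper name make_chunk_query is hypothetical (it is not part of the SDK), and the sketch assumes the datetimes are already in UTC.

```python
import copy
from datetime import datetime, timezone

# Hypothetical helper (not part of the SDK): builds an independent body for one chunk.
def make_chunk_query(template: dict, start: datetime, end: datetime, page_number: int = 1) -> dict:
    # Assumes start and end are UTC; the trailing "Z" marks UTC explicitly.
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    body = copy.deepcopy(template)  # nested dicts/lists are not shared with the template
    body["interval"] = f"{start.strftime(fmt)}/{end.strftime(fmt)}"
    body.setdefault("paging", {})["pageNumber"] = page_number
    return body

template = {
    "interval": "",
    "paging": {"pageSize": 100, "pageNumber": 1},
    "segmentFilters": [],
}
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 8, tzinfo=timezone.utc)
body = make_chunk_query(template, start, end, page_number=2)
print(body["interval"])                 # 2024-01-01T00:00:00.000Z/2024-01-08T00:00:00.000Z
print(template["paging"]["pageNumber"])  # 1 (the template is untouched)
```

Formatting with strftime instead of isoformat() also avoids producing a doubled offset such as "+00:00Z" when the datetimes are timezone-aware.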

Step 2: Implement the Chunking Logic

The core strategy is to break the 90-day period into smaller intervals. How large an interval a single request can handle depends on the endpoint and on your organization's conversation volume: details queries are far more sensitive to interval length than aggregate queries, which can usually cover longer spans. If you are hitting 413, reduce the window.

We will use a 7-day chunk size. This keeps each request small and reduces the risk of timeouts and size limits.

from datetime import datetime, timedelta

def generate_date_chunks(start_date: datetime, end_date: datetime, chunk_days: int = 7) -> list[tuple[datetime, datetime]]:
    """
    Splits a date range into smaller chunks.
    
    Args:
        start_date: The beginning of the analysis period.
        end_date: The end of the analysis period.
        chunk_days: The number of days per chunk (e.g., 7).
        
    Returns:
        A list of tuples, each containing (chunk_start, chunk_end).
    """
    chunks = []
    current_start = start_date
    
    while current_start < end_date:
        chunk_end = min(current_start + timedelta(days=chunk_days), end_date)
        chunks.append((current_start, chunk_end))
        current_start = chunk_end
        
    return chunks
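
As a quick check of the chunking logic: a 90-day range with the default 7-day chunks is 12 full weeks plus a 6-day remainder, so it yields 13 chunks, and each chunk starts exactly where the previous one ended, so no time is skipped. The function is repeated here so the snippet runs on its own.

```python
from datetime import datetime, timedelta, timezone

def generate_date_chunks(start_date, end_date, chunk_days=7):
    chunks = []
    current_start = start_date
    while current_start < end_date:
        chunk_end = min(current_start + timedelta(days=chunk_days), end_date)
        chunks.append((current_start, chunk_end))
        current_start = chunk_end
    return chunks

end = datetime(2024, 4, 1, tzinfo=timezone.utc)
start = end - timedelta(days=90)
chunks = generate_date_chunks(start, end)

print(len(chunks))                    # 13
print(chunks[-1][1] - chunks[-1][0])  # 6 days, 0:00:00
# Contiguity: every chunk starts where the previous one ended
assert all(a[1] == b[0] for a, b in zip(chunks, chunks[1:]))
```

Because adjacent chunks share a boundary instant, a conversation that starts exactly on a boundary could be returned by both chunks if the API treats both ends of the interval as inclusive. De-duplicating by conversation ID when merging guards against that.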

Step 3: Execute Queries with Retry Logic

The SDK provides an AnalyticsApi class. We will iterate through the chunks, execute the query, and handle 413 errors explicitly. Chunking should prevent most 413s, but the function still retries rate-limited (429) requests a bounded number of times, since those are transient.

import copy
import logging
import time
from datetime import datetime

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def query_analytics_chunk(client: PureCloudPlatformClientV2.api_client.ApiClient, start: datetime, end: datetime, query_template: dict, max_retries: int = 3) -> list:
    """
    Executes a single analytics query for a specific time chunk.

    Args:
        client: The authenticated ApiClient.
        start: Start of the chunk (UTC).
        end: End of the chunk (UTC).
        query_template: The base query dictionary.
        max_retries: How many times to retry a rate-limited (429) request.

    Returns:
        The conversations on the first page of results.
    """
    # ISO 8601 interval in UTC; strftime avoids a doubled offset such as "+00:00Z"
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    interval_str = f"{start.strftime(fmt)}/{end.strftime(fmt)}"

    # Deep copy so nested structures in the template are never shared between chunks
    request_body = copy.deepcopy(query_template)
    request_body["interval"] = interval_str

    api_instance = PureCloudPlatformClientV2.AnalyticsApi(client)

    for attempt in range(max_retries + 1):
        try:
            # POST /api/v2/analytics/conversations/details/query
            response = api_instance.post_analytics_conversations_details_query(request_body)
            logger.info(f"Successfully retrieved data for {start.date()} to {end.date()}")
            return response.conversations or []
        except ApiException as e:
            if e.status == 413:
                logger.error(f"413 Entity Too Large for interval {interval_str}. Consider reducing chunk size.")
                raise
            if e.status == 429 and attempt < max_retries:
                # Honor the server's Retry-After header, defaulting to 5 seconds
                retry_after = int((e.headers or {}).get("Retry-After", 5))
                logger.warning(f"Rate limited. Waiting {retry_after} seconds.")
                time.sleep(retry_after)
                continue
            logger.error(f"API Error {e.status}: {e.body}")
            raise
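
A fixed chunk size can be made adaptive: when a chunk is rejected with 413, split that chunk in half and query each half, recursing until the pieces succeed or reach a minimum width. The sketch below is a generic version of that idea, not an SDK feature. fetch stands in for a function such as query_analytics_chunk, and TooLargeError is a hypothetical stand-in for an ApiException whose status is 413; adapt the except clause to your real exception handling.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable

class TooLargeError(Exception):
    """Hypothetical stand-in for an ApiException with status 413."""

# Hypothetical helper: queries [start, end), bisecting the span on 413-style errors.
def fetch_with_bisection(
    fetch: Callable[[datetime, datetime], list],
    start: datetime,
    end: datetime,
    min_span: timedelta = timedelta(hours=1),
) -> list:
    try:
        return fetch(start, end)
    except TooLargeError:
        if end - start <= min_span:
            raise  # Cannot split further; surface the error
        mid = start + (end - start) / 2
        return (fetch_with_bisection(fetch, start, mid, min_span)
                + fetch_with_bisection(fetch, mid, end, min_span))

# Simulated API: rejects any span longer than 2 days, returns one record per call
calls = []
def fake_fetch(s, e):
    if e - s > timedelta(days=2):
        raise TooLargeError()
    calls.append((s, e))
    return [f"{s:%m-%d}..{e:%m-%d}"]

start = datetime(2024, 1, 1, tzinfo=timezone.utc)
results = fetch_with_bisection(fake_fetch, start, start + timedelta(days=7))
print(len(results))  # 4 (7 days -> 3.5 -> 1.75-day pieces)
```

Bisection pays off when 413s are occasional. If most chunks fail, lowering chunk_days is cheaper, because every bisection step costs one rejected request.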

Step 4: Aggregate Results

The details query is paginated. The page to return is selected by the paging object in the request body (pageSize and pageNumber), and the response (an AnalyticsConversationQueryResponse) contains a conversations list and a totalHits count. To capture all data for a chunk, request successive page numbers until a page comes back with fewer than pageSize conversations.

Because the page number lives in the request body, pagination is easiest to handle inside the chunk function itself, by resending the same body with an incremented pageNumber. The revised function replaces query_analytics_chunk from Step 3:

import copy
import time
from datetime import datetime

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

def query_analytics_chunk_with_pagination(client: PureCloudPlatformClientV2.api_client.ApiClient, start: datetime, end: datetime, query_template: dict, max_retries: int = 3) -> list:
    """
    Executes the details query for one time chunk, requesting successive
    pages until the results are exhausted.
    """
    all_conversations = []

    # ISO 8601 interval in UTC
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    interval_str = f"{start.strftime(fmt)}/{end.strftime(fmt)}"
    request_body = copy.deepcopy(query_template)
    request_body["interval"] = interval_str
    paging = request_body.setdefault("paging", {"pageSize": 100})
    page_size = paging["pageSize"]
    page_number = 1
    retries = 0

    api_instance = PureCloudPlatformClientV2.AnalyticsApi(client)

    while True:
        paging["pageNumber"] = page_number
        try:
            response = api_instance.post_analytics_conversations_details_query(request_body)
        except ApiException as e:
            if e.status == 413:
                logger.error(f"413 Entity Too Large for interval {interval_str}.")
                raise
            if e.status == 429 and retries < max_retries:
                retries += 1
                retry_after = int((e.headers or {}).get("Retry-After", 5))
                logger.warning(f"Rate limited. Waiting {retry_after} seconds.")
                time.sleep(retry_after)
                continue  # Retry the same page
            logger.error(f"API Error {e.status}: {e.body}")
            raise

        retries = 0
        page = response.conversations or []
        all_conversations.extend(page)

        # A short (or empty) page means there are no more results
        if len(page) < page_size:
            break
        page_number += 1

    return all_conversations
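
Once every chunk has been fetched, the per-chunk lists still need to become one dataset. A minimal sketch, assuming each record has been converted to a dict (for example with the SDK models' to_dict()) and carries a conversation_id key: keep the first occurrence of each ID so that a conversation returned by two adjacent chunks is counted once. merge_chunks is a hypothetical helper name.

```python
from typing import Iterable

# Hypothetical helper: flattens per-chunk results, dropping duplicates by `key`.
def merge_chunks(chunk_results: Iterable[list[dict]], key: str = "conversation_id") -> list[dict]:
    seen = set()
    merged = []
    for chunk in chunk_results:
        for record in chunk:
            record_id = record.get(key)
            if record_id is not None:
                if record_id in seen:
                    continue  # Already captured by an earlier chunk
                seen.add(record_id)
            merged.append(record)  # Records without an ID are kept as-is
    return merged

chunk_a = [{"conversation_id": "c1", "conversation_start": "2024-01-01T09:00:00Z"},
           {"conversation_id": "c2", "conversation_start": "2024-01-07T23:59:59Z"}]
chunk_b = [{"conversation_id": "c2", "conversation_start": "2024-01-07T23:59:59Z"},
           {"conversation_id": "c3", "conversation_start": "2024-01-08T10:00:00Z"}]

merged = merge_chunks([chunk_a, chunk_b])
print([r["conversation_id"] for r in merged])  # ['c1', 'c2', 'c3']
```

If chunks are processed in chronological order and each query orders results by conversation start, the merged list is already chronological, so no extra sort is needed.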

Complete Working Example

This script combines all components into a runnable module. It retrieves conversation analytics for the last 90 days, split into 7-day chunks.

import copy
import logging
import os
import time
from datetime import datetime, timedelta, timezone

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# ISO 8601 UTC timestamp format used in analytics intervals
INTERVAL_FMT = "%Y-%m-%dT%H:%M:%S.000Z"

def get_platform_client() -> PureCloudPlatformClientV2.api_client.ApiClient:
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Region API host, e.g. https://api.mypurecloud.com
    PureCloudPlatformClientV2.configuration.host = f"https://api.{environment}"
    # Client Credentials grant; returns an ApiClient that carries the token
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        client_id, client_secret
    )

def build_base_query(page_size: int = 100) -> dict:
    return {
        "interval": "",  # Replaced per chunk
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {"pageSize": page_size, "pageNumber": 1},
        "segmentFilters": [
            {
                "type": "and",
                "predicates": [
                    {"type": "dimension", "dimension": "mediaType",
                     "operator": "matches", "value": "voice"}
                ]
            }
        ]
    }

def generate_date_chunks(start_date: datetime, end_date: datetime, chunk_days: int = 7) -> list[tuple[datetime, datetime]]:
    chunks = []
    current_start = start_date

    while current_start < end_date:
        chunk_end = min(current_start + timedelta(days=chunk_days), end_date)
        chunks.append((current_start, chunk_end))
        current_start = chunk_end

    return chunks

def query_analytics_chunk_with_pagination(client: PureCloudPlatformClientV2.api_client.ApiClient, start: datetime, end: datetime, query_template: dict, max_retries: int = 3) -> list:
    all_conversations = []

    interval_str = f"{start.strftime(INTERVAL_FMT)}/{end.strftime(INTERVAL_FMT)}"
    request_body = copy.deepcopy(query_template)
    request_body["interval"] = interval_str
    paging = request_body["paging"]
    page_size = paging["pageSize"]
    page_number = 1
    retries = 0

    api_instance = PureCloudPlatformClientV2.AnalyticsApi(client)

    while True:
        paging["pageNumber"] = page_number
        try:
            response = api_instance.post_analytics_conversations_details_query(request_body)
        except ApiException as e:
            if e.status == 413:
                logger.error(f"413 Entity Too Large for interval {interval_str}.")
                raise
            if e.status == 429 and retries < max_retries:
                retries += 1
                retry_after = int((e.headers or {}).get("Retry-After", 5))
                logger.warning(f"Rate limited. Waiting {retry_after} seconds.")
                time.sleep(retry_after)
                continue  # Retry the same page
            logger.error(f"API Error {e.status}: {e.body}")
            raise

        retries = 0
        page = response.conversations or []
        all_conversations.extend(page)

        # A short or empty page means the chunk is exhausted
        if len(page) < page_size:
            break
        page_number += 1

    return all_conversations

def main():
    try:
        client = get_platform_client()
        logger.info("Client initialized successfully.")

        # Define date range: last 90 days, in UTC
        end_date = datetime.now(timezone.utc)
        start_date = end_date - timedelta(days=90)

        logger.info(f"Querying analytics from {start_date.date()} to {end_date.date()}")

        # Generate chunks
        chunks = generate_date_chunks(start_date, end_date, chunk_days=7)
        logger.info(f"Splitting date range into {len(chunks)} chunks.")

        all_data = []
        seen_ids = set()
        query_template = build_base_query()

        # Process each chunk
        for i, (chunk_start, chunk_end) in enumerate(chunks):
            logger.info(f"Processing chunk {i+1}/{len(chunks)}: {chunk_start.date()} to {chunk_end.date()}")

            try:
                chunk_data = query_analytics_chunk_with_pagination(client, chunk_start, chunk_end, query_template)
            except Exception as e:
                logger.error(f"Failed to process chunk {i+1}: {e}")
                # Skipping leaves a gap in the dataset; abort instead if completeness matters
                continue

            # Drop conversations already returned by an earlier chunk
            for conversation in chunk_data:
                if conversation.conversation_id not in seen_ids:
                    seen_ids.add(conversation.conversation_id)
                    all_data.append(conversation)
            logger.info(f"Retrieved {len(chunk_data)} records for this chunk.")

        logger.info(f"Query complete. Total records retrieved: {len(all_data)}")

        # Optional: save to CSV
        if all_data:
            import pandas as pd
            # SDK models are objects: convert them to dicts and flatten nested dicts.
            # Nested lists such as participants stay in a single column.
            df = pd.json_normalize([c.to_dict() for c in all_data])
            df.to_csv("analytics_90_days.csv", index=False)
            logger.info("Data saved to analytics_90_days.csv")

    except Exception as e:
        logger.error(f"Fatal error: {e}")
        raise

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 413 Entity Too Large

What causes it:
The server rejects the request as too large. Strictly, HTTP 413 refers to the size of the request content, but with analytics queries it generally means a single request is asking for more than the server will process. This often happens when:

  1. The time interval is too large for your data volume (for example, several weeks of conversation details in a busy organization).
  2. An aggregate query groups by too many dimensions at once (for example, user, queue, and media type simultaneously).
  3. An aggregate query requests a large number of metrics.

How to fix it:

  1. Reduce Chunk Size: Change chunk_days in generate_date_chunks from 7 to 3 or 1.
  2. Narrow the Query: Tighten the filters, and for aggregate queries remove unnecessary groupings and metrics.
  3. Use the Aggregates API: If you do not need individual conversation records, switch to /api/v2/analytics/conversations/aggregates/query.

Code Fix:
Adjust the chunk size in the main function:

# Change from 7 to 3 days
chunks = generate_date_chunks(start_date, end_date, chunk_days=3)

Error: 401 Unauthorized

What causes it:
The OAuth token is invalid, expired, or the client credentials are incorrect.

How to fix it:

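  1. Verify GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET are correct and belong to the organization and region set in GENESYS_ENVIRONMENT.
  2. Check whether the access token expired during a long run; if so, request a new token.
  3. Check that the OAuth client has not been deleted or had its secret changed.

A missing permission such as analytics:conversationDetail:view normally produces 403 Forbidden rather than 401.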
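
For the expired-token case, one approach is to track when the token expires and fetch a new one shortly before it lapses. The sketch below is SDK-agnostic: TokenCache is a hypothetical class, fetch_token is a hypothetical callable returning (access_token, expires_in_seconds) (standard OAuth token responses carry an expires_in field), and the clock is injectable so the logic can be tested without waiting.

```python
import time
from typing import Callable, Optional, Tuple

class TokenCache:
    """Hypothetical helper: caches a token and re-fetches it shortly before expiry."""

    def __init__(self, fetch_token: Callable[[], Tuple[str, int]],
                 margin_seconds: int = 300,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch_token = fetch_token  # hypothetical: returns (token, expires_in)
        self._margin = margin_seconds    # refresh this many seconds before expiry
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        # Fetch on first use, or once we are inside the refresh margin
        if self._token is None or self._clock() >= self._expires_at - self._margin:
            token, expires_in = self._fetch_token()
            self._token = token
            self._expires_at = self._clock() + expires_in
        return self._token

# Simulated clock and token endpoint
now = [0.0]
issued = []
def fake_fetch():
    issued.append(f"token-{len(issued) + 1}")
    return issued[-1], 3600

cache = TokenCache(fake_fetch, margin_seconds=300, clock=lambda: now[0])
print(cache.get())  # token-1
now[0] = 3000       # more than 5 minutes before expiry
print(cache.get())  # token-1
now[0] = 3400       # inside the refresh margin
print(cache.get())  # token-2
```

With the SDK, the fetch step would re-run the client credentials exchange and rebuild the API objects from the new client.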

Error: 429 Too Many Requests

What causes it:
You are exceeding the Genesys Cloud API rate limits. Analytics queries are comparatively expensive, and a chunked extraction sends many of them in quick succession.

How to fix it:

  1. Implement exponential backoff.
  2. Reduce the frequency of calls by increasing the chunk size (if not hitting 413).
  3. Use the Retry-After header value as shown in the code.
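
A sketch of the exponential backoff suggested above, assuming you want to honor Retry-After when the server sends it and otherwise double the delay on each attempt, with a little random jitter so parallel workers do not retry in lockstep. RateLimited and call_with_backoff are hypothetical names; in the tutorial's code the equivalent check is an ApiException with status 429.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

class RateLimited(Exception):
    """Hypothetical stand-in for an ApiException with status 429."""
    def __init__(self, retry_after: Optional[float] = None):
        super().__init__("429 Too Many Requests")
        self.retry_after = retry_after

# Hypothetical helper: retries fn on RateLimited with exponential backoff.
def call_with_backoff(fn: Callable[[], T], max_retries: int = 5, base_delay: float = 1.0,
                      max_delay: float = 60.0, sleep: Callable[[float], None] = time.sleep) -> T:
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except RateLimited as e:
            if attempt == max_retries:
                raise  # Out of retries; let the caller decide
            if e.retry_after is not None:
                delay = e.retry_after  # Server-specified wait takes priority
            else:
                # 1, 2, 4, ... seconds, capped, plus up to 10% random jitter
                delay = min(base_delay * 2 ** attempt, max_delay)
                delay += random.uniform(0, delay * 0.1)
            sleep(delay)
    raise AssertionError("unreachable")

# Simulated endpoint: rate-limited twice, then succeeds
attempts = []
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RateLimited()
    return "ok"

delays = []
result = call_with_backoff(flaky, sleep=delays.append)
print(result)  # ok
```

Injecting sleep keeps the function testable; in production, leave it as time.sleep.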

Official References