Splitting Large Analytics Queries to Avoid 413 Entity Too Large Errors

What You Will Build

  • You will build a Python script that retrieves conversation detail data from Genesys Cloud CX for a 90-day period without triggering a 413 Entity Too Large error.
  • This tutorial uses the Genesys Cloud CX Python SDK (genesyscloud) and the /api/v2/analytics/conversations/details/query endpoint.
  • The code is written for Python 3.9+ and uses asyncio to orchestrate the chunked requests made through the SDK.

Prerequisites

  • OAuth Client Type: Client Credentials grant (used for a service account).
  • Required Scopes: analytics:conversation:read and analytics:report:read.
  • SDK Version: genesyscloud >= 140.0.0 (ensure you use the latest stable release).
  • Language/Runtime: Python 3.9 or higher.
  • Dependencies:
    • genesyscloud
    • pandas (optional, for data aggregation)
    • python-dotenv (for secure credential management)

Install the dependencies using pip:

pip install genesyscloud pandas python-dotenv

Authentication Setup

Genesys Cloud uses OAuth 2.0 for authentication. The SDK handles token acquisition and refresh automatically when you initialize the platform client. You must configure the client with your organization domain, client ID, and client secret.

Create a .env file in your project root with the following variables:

GENESYS_CLOUD_DOMAIN="your-org.mygen.com"
GENESYS_CLOUD_CLIENT_ID="your-client-id"
GENESYS_CLOUD_CLIENT_SECRET="your-client-secret"

Initialize the authentication in your code:

import os
from dotenv import load_dotenv
from purecloudplatformclientv2 import PlatformClient

load_dotenv()

def get_platform_client() -> PlatformClient:
    """
    Initializes and returns a configured Genesys Cloud Platform Client.
    Uses Client Credentials flow for service accounts.
    """
    domain = os.getenv("GENESYS_CLOUD_DOMAIN")
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")

    if not all([domain, client_id, client_secret]):
        raise ValueError("Missing required environment variables for Genesys Cloud authentication.")

    # Initialize the platform client
    platform_client = PlatformClient()

    # Configure OAuth with client credentials flow
    oauth_client = platform_client.oauth_client
    oauth_client.client_id = client_id
    oauth_client.client_secret = client_secret
    oauth_client.domain = domain

    return platform_client

Implementation

The core issue with the 413 Entity Too Large error in the Analytics API is the size of the JSON payload sent in the POST body. The /api/v2/analytics/conversations/details/query endpoint accepts a complex JSON body containing filters, groupings, and metrics. When you request data for a long duration (e.g., 90 days), the resulting query object can exceed the server’s maximum request body size limit, especially if you include many filters or groupings.

The solution is to split the date range into smaller chunks (e.g., 7-day or 14-day intervals), execute separate queries for each chunk, and then aggregate the results.
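Before sending a query, it can help to check how large the serialized body actually is. The sketch below measures the UTF-8 size of a JSON payload. The limit constant and the example field names are assumptions for illustration only; they are not documented Genesys Cloud values.

```python
import json

# Hypothetical limit for illustration; check your own environment for the real value.
ASSUMED_MAX_BODY_BYTES = 1_000_000


def body_size_bytes(payload: dict) -> int:
    """Return the size of the payload when serialized as compact UTF-8 JSON."""
    return len(json.dumps(payload, separators=(",", ":")).encode("utf-8"))


def fits_limit(payload: dict, limit: int = ASSUMED_MAX_BODY_BYTES) -> bool:
    """True if the serialized payload is no larger than the given limit."""
    return body_size_bytes(payload) <= limit


# Example body with hypothetical field names and 1,000 user IDs in a filter.
example = {
    "date_from": "2024-01-01T00:00:00.000Z",
    "date_to": "2024-01-14T23:59:59.999Z",
    "user_ids": [f"user-{i}" for i in range(1000)],
}
print(body_size_bytes(example), fits_limit(example))
```

Logging this number for each chunk makes it easier to tell whether a 413 comes from the filters or from something else in the request.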

Step 1: Define the Query Chunking Logic

First, create a function that splits a start and end date into smaller intervals. This ensures each individual API call has a manageable payload size.

from datetime import datetime, timedelta
from typing import List, Tuple

def split_date_range(start_date: str, end_date: str, chunk_days: int = 14) -> List[Tuple[str, str]]:
    """
    Splits a date range into smaller chunks to avoid 413 errors.
    
    Args:
        start_date: Start date in ISO format (YYYY-MM-DD)
        end_date: End date in ISO format (YYYY-MM-DD)
        chunk_days: Number of days per chunk (default 14)
        
    Returns:
        List of tuples, each containing (chunk_start, chunk_end) in ISO format
    """
    start = datetime.fromisoformat(start_date)
    end = datetime.fromisoformat(end_date)
    
    chunks = []
    current_start = start
    
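    while current_start <= end:
        # chunk_end is inclusive (the payload later extends it to 23:59:59.999),
        # so the next chunk must start on the following day to avoid overlap.
        chunk_end = min(current_start + timedelta(days=chunk_days - 1), end)
        chunks.append((current_start.isoformat()[:10], chunk_end.isoformat()[:10]))
        current_start = chunk_end + timedelta(days=1)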
        
    return chunks
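Because each chunk's end date is later expanded to cover the whole day (through 23:59:59.999), a boundary date shared by two chunks would be queried twice. The following is a minimal sketch of a sanity check, assuming chunks are (start, end) tuples of YYYY-MM-DD strings with inclusive end dates. The helper name is hypothetical.

```python
from datetime import date, timedelta
from typing import List, Tuple


def chunks_are_contiguous(chunks: List[Tuple[str, str]]) -> bool:
    """True if every chunk is well ordered and starts the day after the previous one ends."""
    # Each chunk must have start <= end.
    if any(date.fromisoformat(s) > date.fromisoformat(e) for s, e in chunks):
        return False
    # Each chunk must begin exactly one day after the previous chunk's inclusive end.
    for (_, prev_end), (next_start, _) in zip(chunks, chunks[1:]):
        expected = date.fromisoformat(prev_end) + timedelta(days=1)
        if date.fromisoformat(next_start) != expected:
            return False
    return True


good = [("2024-01-01", "2024-01-14"), ("2024-01-15", "2024-01-28")]
overlapping = [("2024-01-01", "2024-01-15"), ("2024-01-15", "2024-01-29")]
print(chunks_are_contiguous(good), chunks_are_contiguous(overlapping))
```

Running a check like this on the output of the chunking function before issuing any requests catches both gaps and duplicated boundary days.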

Step 2: Construct the Analytics Query Payload

The Analytics API requires a specific JSON structure for the query body. You must define the metrics, filters, and groupings. For this example, we will retrieve conversation details grouped by user.

from purecloudplatformclientv2.models import QueryConversationDetailRequest, ConversationDetailFilter, ConversationDetailGroupBy

def create_query_payload(start_date: str, end_date: str, user_ids: List[str] = None) -> QueryConversationDetailRequest:
    """
    Creates a QueryConversationDetailRequest object for a specific date range.
    
    Args:
        start_date: Start date in ISO format
        end_date: End date in ISO format
        user_ids: Optional list of user IDs to filter by
        
    Returns:
        Configured QueryConversationDetailRequest object
    """
    # Initialize the request object
    query_request = QueryConversationDetailRequest()
    
    # Set the date range
    query_request.date_from = start_date + "T00:00:00.000Z"
    query_request.date_to = end_date + "T23:59:59.999Z"
    
    # Define filters
    filters = ConversationDetailFilter()
    
    # Filter by conversation type (e.g., voice)
    filters.conversation_type = ["voice"]
    
    # Optional: Filter by specific users
    if user_ids:
        filters.user_ids = user_ids
    
    query_request.filter = filters
    
    # Define groupings
    group_by = ConversationDetailGroupBy()
    group_by.user = True  # Group results by user
    
    query_request.group_by = group_by
    
    # Define metrics (optional, defaults are often sufficient for details)
    # metrics = ConversationDetailMetrics()
    # metrics.handle_time = True
    # query_request.metrics = metrics
    
    return query_request

Step 3: Execute Queries and Handle Pagination

The Analytics API returns paginated results. For each chunk, keep requesting pages with the page token from the previous response until no token is returned. You should also retry on 429 Too Many Requests errors, which are common when making many sequential API calls.

import asyncio
import logging
from purecloudplatformclientv2 import AnalyticsApi
from purecloudplatformclientv2.rest import ApiException

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def fetch_chunk_data(analytics_api: AnalyticsApi, start_date: str, end_date: str, user_ids: List[str] = None) -> dict:
    """
    Fetches analytics data for a single date chunk with pagination and retry logic.

    Args:
        analytics_api: The AnalyticsApi instance
        start_date: Start date in ISO format
        end_date: End date in ISO format
        user_ids: Optional list of user IDs

    Returns:
        Dictionary containing the aggregated results for this chunk
    """
    query_request = create_query_payload(start_date, end_date, user_ids)

    all_results = []
    next_page_token = None
    max_retries = 3
    retry_delay = 2  # seconds

    for attempt in range(max_retries):
        try:
            # Pages already collected and the current page token survive a retry,
            # so a 429 in the middle of pagination resumes where it stopped.
            while True:
                # Execute the query
                if next_page_token:
                    response = analytics_api.post_analytics_conversations_details_query(
                        body=query_request,
                        page_token=next_page_token
                    )
                else:
                    response = analytics_api.post_analytics_conversations_details_query(
                        body=query_request
                    )

                # Accumulate results
                if response.entities:
                    all_results.extend(response.entities)

                # Check for pagination
                if response.page_token:
                    next_page_token = response.page_token
                else:
                    break

            return {
                "start_date": start_date,
                "end_date": end_date,
                "entities": all_results,
                "total_count": response.total_count
            }

        except ApiException as e:
            if e.status == 429:
                logger.warning(f"Rate limited (429) on attempt {attempt + 1}/{max_retries}. Retrying in {retry_delay} seconds...")
                # Non-blocking sleep so the event loop is not stalled
                await asyncio.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
                continue
            elif e.status == 413:
                logger.error("413 Entity Too Large. Chunk size may still be too large. Reduce chunk_days.")
                raise
            else:
                logger.error(f"API Error: {e.status} - {e.reason}")
                raise
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise

    # Every attempt was rate limited
    raise RuntimeError("Max retries exceeded for 429 errors.")

Step 4: Orchestrate the Full 90-Day Query

Combine the chunking logic and the fetch function to process the entire date range.

async def fetch_90_day_analytics(platform_client: PlatformClient, start_date: str, end_date: str, user_ids: List[str] = None) -> list:
    """
    Fetches analytics data for a 90-day period by splitting into chunks.

    Args:
        platform_client: The initialized PlatformClient
        start_date: Start date in ISO format (YYYY-MM-DD)
        end_date: End date in ISO format (YYYY-MM-DD)
        user_ids: Optional list of user IDs

    Returns:
        List of all conversation detail entities
    """
    # Get the Analytics API instance
    analytics_api = AnalyticsApi(platform_client)

    # Split the date range into chunks
    chunks = split_date_range(start_date, end_date, chunk_days=14)

    logger.info(f"Splitting date range into {len(chunks)} chunks.")

    all_entities = []

    for i, (chunk_start, chunk_end) in enumerate(chunks):
        logger.info(f"Processing chunk {i+1}/{len(chunks)}: {chunk_start} to {chunk_end}")

        try:
            chunk_data = await fetch_chunk_data(analytics_api, chunk_start, chunk_end, user_ids)
            all_entities.extend(chunk_data["entities"])
            logger.info(f"Retrieved {len(chunk_data['entities'])} entities for this chunk.")

            # Optional: pause briefly between chunks to stay within rate limits
            await asyncio.sleep(1)

        except Exception as e:
            logger.error(f"Failed to process chunk {chunk_start} to {chunk_end}: {e}")
            raise

    logger.info(f"Total entities retrieved: {len(all_entities)}")
    return all_entities
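When results from several chunks are merged, a conversation that spans a chunk boundary may be returned by both neighbouring chunks. The sketch below removes such duplicates while keeping the first occurrence. It assumes each entity has been converted to a dict, and the key name conversation_id is an assumption about the entity shape rather than a confirmed SDK field.

```python
from typing import Dict, Iterable, List


def dedupe_by_id(entities: Iterable[dict], key: str = "conversation_id") -> List[dict]:
    """Return entities with duplicate IDs removed, preserving first-seen order."""
    seen: Dict[str, dict] = {}
    for entity in entities:
        # setdefault keeps the first entity stored for each ID and ignores later ones.
        seen.setdefault(entity[key], entity)
    return list(seen.values())


merged = [
    {"conversation_id": "a", "chunk": 1},
    {"conversation_id": "b", "chunk": 1},
    {"conversation_id": "b", "chunk": 2},  # same conversation seen again in the next chunk
    {"conversation_id": "c", "chunk": 2},
]
print([e["conversation_id"] for e in dedupe_by_id(merged)])
```

Keeping the first occurrence is an arbitrary choice; if later chunks carry more complete data, overwrite the stored entry instead.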

Complete Working Example

Here is the full, copy-pasteable script. Save this as gen_analytics_split.py.

import os
import asyncio
import logging
from datetime import datetime, timedelta
from typing import List

from dotenv import load_dotenv
from purecloudplatformclientv2 import PlatformClient, AnalyticsApi
from purecloudplatformclientv2.rest import ApiException
from purecloudplatformclientv2.models import QueryConversationDetailRequest, ConversationDetailFilter, ConversationDetailGroupBy

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def load_env():
    """Loads environment variables from .env file."""
    load_dotenv()
    return {
        "domain": os.getenv("GENESYS_CLOUD_DOMAIN"),
        "client_id": os.getenv("GENESYS_CLOUD_CLIENT_ID"),
        "client_secret": os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
    }

def get_platform_client() -> PlatformClient:
    """Initializes the Genesys Cloud Platform Client."""
    env_vars = load_env()
    
    if not all(env_vars.values()):
        raise ValueError("Missing required environment variables.")
        
    platform_client = PlatformClient()
    oauth_client = platform_client.oauth_client
    oauth_client.client_id = env_vars["client_id"]
    oauth_client.client_secret = env_vars["client_secret"]
    oauth_client.domain = env_vars["domain"]
    
    return platform_client

def split_date_range(start_date: str, end_date: str, chunk_days: int = 14) -> List[tuple]:
    """Splits a date range into smaller chunks."""
    start = datetime.fromisoformat(start_date)
    end = datetime.fromisoformat(end_date)
    chunks = []
    current_start = start
    
    while current_start <= end:
        # chunk_end is inclusive, so the next chunk starts on the following day
        chunk_end = min(current_start + timedelta(days=chunk_days - 1), end)
        chunks.append((current_start.isoformat()[:10], chunk_end.isoformat()[:10]))
        current_start = chunk_end + timedelta(days=1)
    return chunks

def create_query_payload(start_date: str, end_date: str, user_ids: List[str] = None) -> QueryConversationDetailRequest:
    """Creates the analytics query request object."""
    query_request = QueryConversationDetailRequest()
    query_request.date_from = start_date + "T00:00:00.000Z"
    query_request.date_to = end_date + "T23:59:59.999Z"
    
    filters = ConversationDetailFilter()
    filters.conversation_type = ["voice"]
    if user_ids:
        filters.user_ids = user_ids
    query_request.filter = filters
    
    group_by = ConversationDetailGroupBy()
    group_by.user = True
    query_request.group_by = group_by
    
    return query_request

async def fetch_chunk_data(analytics_api: AnalyticsApi, start_date: str, end_date: str, user_ids: List[str] = None) -> dict:
    """Fetches data for a single chunk with pagination and retry."""
    query_request = create_query_payload(start_date, end_date, user_ids)
    all_results = []
    next_page_token = None
    max_retries = 3
    retry_delay = 2
    
    for attempt in range(max_retries):
        try:
            while True:
                if next_page_token:
                    response = analytics_api.post_analytics_conversations_details_query(
                        body=query_request,
                        page_token=next_page_token
                    )
                else:
                    response = analytics_api.post_analytics_conversations_details_query(
                        body=query_request
                    )
                
                if response.entities:
                    all_results.extend(response.entities)
                
                if response.page_token:
                    next_page_token = response.page_token
                else:
                    break
            
            return {
                "start_date": start_date,
                "end_date": end_date,
                "entities": all_results,
                "total_count": response.total_count
            }
            
        except ApiException as e:
            if e.status == 429:
                logger.warning(f"Rate limited (429). Retrying in {retry_delay}s...")
                await asyncio.sleep(retry_delay)
                retry_delay *= 2
                continue
            elif e.status == 413:
                logger.error("413 Entity Too Large. Reduce chunk_days.")
                raise e
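            else:
                logger.error(f"API Error: {e.status} - {e.reason}")
                raise e

    # Every attempt was rate limited; fail loudly instead of returning None
    raise RuntimeError("Max retries exceeded for 429 errors.")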

async def main():
    """Main execution function."""
    # Define date range (90 days back from today)
    end_date = datetime.now().strftime("%Y-%m-%d")
    start_date = (datetime.now() - timedelta(days=90)).strftime("%Y-%m-%d")
    
    logger.info(f"Starting analytics fetch from {start_date} to {end_date}")
    
    platform_client = get_platform_client()
    analytics_api = AnalyticsApi(platform_client)
    
    chunks = split_date_range(start_date, end_date, chunk_days=14)
    logger.info(f"Created {len(chunks)} chunks.")
    
    all_entities = []
    
    for i, (chunk_start, chunk_end) in enumerate(chunks):
        logger.info(f"Processing chunk {i+1}/{len(chunks)}: {chunk_start} to {chunk_end}")
        try:
            chunk_data = await fetch_chunk_data(analytics_api, chunk_start, chunk_end)
            all_entities.extend(chunk_data["entities"])
            logger.info(f"Chunk complete. Total entities so far: {len(all_entities)}")
            await asyncio.sleep(1)  # Respect rate limits
        except Exception as e:
            logger.error(f"Error in chunk {chunk_start}: {str(e)}")
            break
            
    logger.info(f"Fetch complete. Total entities: {len(all_entities)}")
    
    # Example: Save to JSON
    import json
    with open("analytics_results.json", "w") as f:
        json.dump([entity.to_dict() for entity in all_entities], f, indent=2, default=str)
    
    logger.info("Results saved to analytics_results.json")

if __name__ == "__main__":
    asyncio.run(main())

Common Errors & Debugging

Error: 413 Entity Too Large

  • Cause: The JSON body sent to the API exceeds the server’s maximum request size. This often happens with large date ranges, many filters, or complex groupings.
  • Fix: Reduce the chunk_days parameter in split_date_range. Start with 7 days if 14 days fails. Also, review the filters and group_by objects to ensure they are not unnecessarily large.
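Rather than picking a smaller chunk_days by hand, the range can be halved automatically whenever a request is rejected as too large. This is a minimal sketch of that idea. PayloadTooLarge and fake_fetch are hypothetical stand-ins for the SDK's 413 exception and the real fetch function, and end dates are treated as inclusive.

```python
from datetime import date, timedelta
from typing import Callable, List


class PayloadTooLarge(Exception):
    """Hypothetical stand-in for an API exception with HTTP status 413."""


def fetch_with_halving(start: date, end: date,
                       fetch: Callable[[date, date], List]) -> List:
    """Fetch [start, end]; on PayloadTooLarge, split the range in two and fetch each half."""
    try:
        return fetch(start, end)
    except PayloadTooLarge:
        if start >= end:
            raise  # a single day cannot be split any further
        mid = start + timedelta(days=(end - start).days // 2)
        first = fetch_with_halving(start, mid, fetch)
        second = fetch_with_halving(mid + timedelta(days=1), end, fetch)
        return first + second


def fake_fetch(start: date, end: date) -> List[date]:
    """Simulated API call that rejects any range longer than 7 days."""
    days = (end - start).days + 1
    if days > 7:
        raise PayloadTooLarge()
    return [start + timedelta(days=i) for i in range(days)]


result = fetch_with_halving(date(2024, 1, 1), date(2024, 1, 30), fake_fetch)
print(len(result))
```

The recursion stops at a single day, so a query that still fails at that size points to the filters rather than the date range.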

Error: 429 Too Many Requests

  • Cause: You have exceeded the rate limit for the Analytics API. Rate limits are typically enforced as a maximum number of requests per minute.
  • Fix: Implement exponential backoff (as shown in the code). Increase the retry_delay and add asyncio.sleep() between chunks. If the error persists, reduce the concurrency if you are using multiple threads/async tasks.
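If chunks are fetched concurrently, one way to reduce pressure on the rate limit is to cap the number of requests in flight with a semaphore. The following is a minimal sketch; run_limited, the worker callable, and the default of two concurrent tasks are assumptions for illustration, not SDK features.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def run_limited(chunks: Sequence[Any],
                      worker: Callable[[Any], Awaitable[Any]],
                      max_concurrent: int = 2) -> List[Any]:
    """Run worker(chunk) for every chunk with at most max_concurrent running at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(chunk: Any) -> Any:
        # Only max_concurrent coroutines can be inside this block at the same time.
        async with semaphore:
            return await worker(chunk)

    # gather returns results in the same order as the input chunks.
    return await asyncio.gather(*(guarded(c) for c in chunks))


async def demo_worker(chunk: str) -> str:
    """Simulated fetch that just waits briefly and echoes the chunk."""
    await asyncio.sleep(0.01)
    return f"done:{chunk}"


if __name__ == "__main__":
    print(asyncio.run(run_limited(["c1", "c2", "c3", "c4"], demo_worker)))
```

Lowering max_concurrent to 1 makes the calls effectively sequential, which is the safest setting while diagnosing persistent 429 responses.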

Error: 401 Unauthorized

  • Cause: Invalid credentials or an expired OAuth token.
  • Fix: Ensure your GENESYS_CLOUD_CLIENT_ID and GENESYS_CLOUD_CLIENT_SECRET are correct and that GENESYS_CLOUD_DOMAIN matches your organization. A missing scope such as analytics:conversation:read normally surfaces as 403 Forbidden rather than 401.

Error: 403 Forbidden

  • Cause: The service account lacks permission to access the requested data.
  • Fix: Verify the service account has the analytics:conversation:read scope. Also, check if the user IDs you are filtering by are accessible to the service account.

Official References