Analytics API Returning 413 Entity Too Large — How to Split a 90-Day Query

What You Will Build

  • A robust Python utility that automatically segments a large date range into smaller chunks to bypass the 413 Entity Too Large error.
  • This solution uses the Genesys Cloud CX Analytics API (/api/v2/analytics/conversations/details/query).
  • The code is written in Python 3.8+ using the official Genesys Cloud Python SDK (PureCloudPlatformClientV2). The requests library is only needed if you add raw HTTP fallbacks.

Prerequisites

  • OAuth Client Type: Confidential Client (Client Credentials grant).
  • Required Permission: analytics:conversationDetail:view, granted through a role assigned to the OAuth client. Client Credentials clients are authorized by roles rather than by scopes.
  • SDK Version: PureCloudPlatformClientV2 >= 140.0.0.
  • Language/Runtime: Python 3.8 or higher.
  • External Dependencies:
    • PureCloudPlatformClientV2
    • requests
    • pandas (optional, for data processing)

Authentication Setup

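Genesys Cloud uses OAuth 2.0. For server-to-server integrations, the Client Credentials grant is standard. The SDK requests the access token for you, but you must point it at the right region and initialize the client correctly. Tokens expire, so a long-running job should be prepared to authenticate again.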

import os
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.api_client import ApiClient

# Environment variables should contain your credentials
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
# Region domain, e.g. "mypurecloud.com" or "mypurecloud.ie"
ENVIRONMENT = os.getenv("GENESYS_ENV", "mypurecloud.com")

def get_purecloud_client() -> ApiClient:
    """
    Initializes and returns an authenticated PureCloud API client.
    """
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # API requests go to the "api." subdomain of the region domain
    PureCloudPlatformClientV2.configuration.host = f"https://api.{ENVIRONMENT}"

    # Client Credentials grant: fetches a token and returns the authenticated client
    api_client = ApiClient().get_client_credentials_token(CLIENT_ID, CLIENT_SECRET)
    return api_client
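
For readers who want to see what the Client Credentials grant looks like on the wire, for example when falling back to raw HTTP, the sketch below builds the token request with the standard library only. It is a minimal illustration, not the SDK's implementation: the function name build_token_request is hypothetical, and the login host (login.<region domain>) and the /oauth/token path are assumptions to check against your region. The request is built but not sent.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(client_id: str, client_secret: str,
                        environment: str = "mypurecloud.com") -> urllib.request.Request:
    """Build (but do not send) a Client Credentials token request."""
    # Assumption: the OAuth token endpoint lives on the "login." subdomain.
    url = f"https://login.{environment}/oauth/token"
    # Client ID and secret travel as HTTP Basic credentials.
    basic = base64.b64encode(f"{client_id}:{client_secret}".encode("utf-8")).decode("ascii")
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
```

Passing the returned object to urllib.request.urlopen would perform the call; the JSON response is expected to contain an access_token field.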

Implementation

Step 1: Understanding the 413 Limit

The Genesys Cloud Analytics API limits the size of the requests it accepts. When you query conversation details for a long period (e.g., 90 days), the request, meaning the JSON body with its interval, filters, and paging instructions, can exceed the server's maximum (often around 1 MB to 2 MB, depending on the endpoint) or its internal query complexity limit, and the server answers 413. A result set that is too large to return in one response is a different failure: it usually surfaces as a timeout or a 504, not a 413.

To solve this, we split the overall date range into smaller intervals (e.g., 7-day or 14-day chunks) and execute a separate API call for each chunk. Each call carries its chunk as one ISO 8601 interval string of the form start/end.
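
Because a 413 concerns the request, one quick diagnostic is to measure the serialized body before sending it. The sketch below is illustrative only: body_size_bytes is a hypothetical helper, the two dictionaries are made-up bodies, and the HTTP client you use may serialize slightly differently.

```python
import json


def body_size_bytes(body: dict) -> int:
    """Return the size of the JSON-encoded request body in bytes (UTF-8)."""
    return len(json.dumps(body, separators=(",", ":")).encode("utf-8"))


# A body that only carries an interval is tiny, whatever the range covers.
small = {"interval": "2023-10-01T00:00:00Z/2023-10-15T00:00:00Z"}
# A body with a very long list of values grows quickly.
large = dict(small, ids=[f"id-{n:06d}" for n in range(50_000)])

print(body_size_bytes(small), body_size_bytes(large))
```

Widening the interval alone does not lengthen the body, so if the measured size is already small, the limit being hit is more likely the query complexity limit than the raw byte count.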

Step 2: Defining the Query Structure

We need a standard query template. This example fetches conversation details for inbound voice interactions.

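from PureCloudPlatformClientV2 import (
    ConversationQuery,
    ConversationDetailQueryFilter,
    ConversationDetailQueryPredicate,
    SegmentDetailQueryFilter,
    SegmentDetailQueryPredicate,
    PagingSpec,
)

def _dimension_predicate(predicate_cls, dimension: str, value: str):
    """Builds a 'dimension matches value' predicate of the given SDK class."""
    predicate = predicate_cls()
    predicate.type = "dimension"
    predicate.dimension = dimension
    predicate.operator = "matches"
    predicate.value = value
    return predicate

def create_base_query(date_from: str, date_to: str, page_number: int = 1) -> ConversationQuery:
    """
    Creates a ConversationQuery object for a specific date range.
    
    Args:
        date_from: ISO 8601 start date string (e.g., "2023-10-01T00:00:00Z")
        date_to: ISO 8601 end date string (e.g., "2023-10-08T00:00:00Z")
        page_number: 1-based page of results to request.
        
    Returns:
        ConversationQuery object ready for API submission.
    """
    query = ConversationQuery()
    # The endpoint takes a single ISO 8601 interval ("start/end"), not separate from/to fields
    query.interval = f"{date_from}/{date_to}"
    
    # Filter for voice segments
    segment_filter = SegmentDetailQueryFilter()
    segment_filter.type = "and"
    segment_filter.predicates = [
        _dimension_predicate(SegmentDetailQueryPredicate, "mediaType", "voice")
    ]
    query.segment_filters = [segment_filter]
    
    # Filter for conversations that originated inbound
    conversation_filter = ConversationDetailQueryFilter()
    conversation_filter.type = "and"
    conversation_filter.predicates = [
        _dimension_predicate(ConversationDetailQueryPredicate, "originatingDirection", "inbound")
    ]
    query.conversation_filters = [conversation_filter]
    
    # This endpoint has no field selection; results are paged instead.
    # A page size of 100 is assumed to be the maximum; check the API reference.
    paging = PagingSpec()
    paging.page_size = 100
    paging.page_number = page_number
    query.paging = paging
    
    return query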
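
For a raw HTTP call, the same query can be written as a plain dictionary. The sketch below is a hedged illustration: build_query_body is a hypothetical helper, and the field names (interval, segmentFilters, conversationFilters, paging) and dimension names reflect the public API as understood here, so verify them against the API reference before relying on them.

```python
def build_query_body(date_from: str, date_to: str,
                     page_number: int = 1, page_size: int = 100) -> dict:
    """Build a JSON-serializable body for an inbound-voice details query."""
    return {
        # One ISO 8601 interval string: "start/end"
        "interval": f"{date_from}/{date_to}",
        # Segment-level filter: voice media only
        "segmentFilters": [{
            "type": "and",
            "predicates": [{
                "type": "dimension",
                "dimension": "mediaType",
                "operator": "matches",
                "value": "voice",
            }],
        }],
        # Conversation-level filter: conversations that started inbound
        "conversationFilters": [{
            "type": "and",
            "predicates": [{
                "type": "dimension",
                "dimension": "originatingDirection",
                "operator": "matches",
                "value": "inbound",
            }],
        }],
        # Results come back one page at a time
        "paging": {"pageSize": page_size, "pageNumber": page_number},
    }
```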

Step 3: Implementing the Chunking Logic

This is the core logic. Starting at the start date, we step forward chunk_days at a time, clamp the final chunk to the end date, and collect the resulting start/end pairs. The dates must be in UTC and properly formatted.

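from datetime import datetime, timedelta, timezone
from typing import List, Tuple

# typing.List/Tuple keep the annotation valid on Python 3.8 (list[...] needs 3.9+)
def split_date_range(start_date_str: str, end_date_str: str, chunk_days: int = 14) -> List[Tuple[str, str]]: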
    """
    Splits a date range into smaller chunks.
    
    Args:
        start_date_str: ISO 8601 start date.
        end_date_str: ISO 8601 end date.
        chunk_days: Number of days per chunk (default 14).
        
    Returns:
        List of tuples containing (chunk_start, chunk_end) ISO strings.
    """
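    # A non-positive chunk size would make the loop below run forever
    if chunk_days <= 0:
        raise ValueError("chunk_days must be a positive integer")

    # Parse input strings and normalize to UTC so the "Z" suffix added below is accurate.
    # Always pass an explicit offset or "Z": astimezone() treats naive values as local time.
    start_dt = datetime.fromisoformat(start_date_str.replace('Z', '+00:00')).astimezone(timezone.utc)
    end_dt = datetime.fromisoformat(end_date_str.replace('Z', '+00:00')).astimezone(timezone.utc)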
    
    chunks = []
    current_start = start_dt
    
    while current_start < end_dt:
        current_end = current_start + timedelta(days=chunk_days)
        
        # Ensure the end of the chunk does not exceed the original end date
        if current_end > end_dt:
            current_end = end_dt
            
        # Format back to ISO 8601 with Z suffix for Genesys API
        chunks.append((
            current_start.strftime("%Y-%m-%dT%H:%M:%SZ"),
            current_end.strftime("%Y-%m-%dT%H:%M:%SZ")
        ))
        
        # Move to next chunk
        current_start = current_end
        
    return chunks
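
A quick way to gain confidence in the chunking is to check that the chunks are contiguous and cover the whole range. The sketch below uses a hypothetical generator, iter_chunks, that works on datetime objects rather than strings; it mirrors the loop above but is a separate illustration, not a replacement for split_date_range.

```python
from datetime import datetime, timedelta, timezone


def iter_chunks(start: datetime, end: datetime, chunk_days: int = 14):
    """Yield (chunk_start, chunk_end) pairs that tile [start, end) without gaps."""
    if chunk_days <= 0:
        raise ValueError("chunk_days must be a positive integer")
    step = timedelta(days=chunk_days)
    current = start
    while current < end:
        nxt = min(current + step, end)  # clamp the last chunk to the end
        yield current, nxt
        current = nxt


start = datetime(2023, 10, 1, tzinfo=timezone.utc)
end = start + timedelta(days=90)
chunks = list(iter_chunks(start, end, chunk_days=14))

# 90 days in 14-day steps: six full chunks plus one 6-day remainder
print(len(chunks), chunks[-1][1] - chunks[-1][0])
```

Each chunk ends exactly where the next begins, so how the API treats the shared boundary instant decides whether a conversation at that instant is counted once or twice.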

Step 4: Executing Queries with Retry Logic

The Analytics API can be rate-limited (429) or temporarily unavailable (5xx). We implement a simple exponential backoff retry mechanism. We also catch the 413 error explicitly to log it, although our chunking strategy should prevent it.

import time
import logging

from PureCloudPlatformClientV2.rest import ApiException

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def query_analytics_with_retry(
    api_client: ApiClient,
    analytics_api,
    query_body,
    max_retries: int = 3
) -> list:
    """
    Executes an analytics query with retry logic for 429 and 5xx errors.
    
    Args:
        api_client: The PureCloud ApiClient instance (kept for call-site symmetry).
        analytics_api: The AnalyticsApi instance.
        query_body: The query object built by create_base_query.
        max_retries: Maximum number of attempts.
        
    Returns:
        List of conversation detail objects for the requested page.
    """
    for attempt in range(max_retries):
        try:
            # Post the query to Genesys Cloud
            response = analytics_api.post_analytics_conversations_details_query(body=query_body)
            
            # One call returns one page of results; `conversations` is None when empty
            return response.conversations or []
            
        except ApiException as e:
            # The SDK's ApiException exposes the HTTP status as `status`
            status_code = e.status
            
            if status_code == 413:
                logger.error("413 Entity Too Large encountered. Chunk size may need reduction.")
                raise  # Re-raise to halt execution as chunking failed
                
            elif status_code == 429:
                wait_time = 2 ** attempt
                logger.warning(f"429 Too Many Requests. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                
            elif status_code and 500 <= status_code < 600:
                wait_time = 2 ** attempt
                logger.warning(f"{status_code} Server Error. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                
            else:
                logger.error(f"Unexpected error: {e}")
                raise
                
    logger.error(f"Max retries ({max_retries}) exceeded.")
    raise RuntimeError("Max retries exceeded for analytics query.")
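
The fixed 2 ** attempt delay can be refined in two common ways: honoring a Retry-After value when the server supplies one, and adding random jitter so that several clients do not retry in lockstep. The sketch below is a minimal illustration under assumptions: backoff_delay is a hypothetical helper, and whether a given 429 response carries a Retry-After header (in seconds) should be confirmed for your environment.

```python
import random
from typing import Callable, Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 60.0,
                  rng: Callable[[], float] = random.random) -> float:
    """Return how many seconds to wait before retry number `attempt` (0-based)."""
    # Prefer the server's hint when it parses as a number of seconds.
    if retry_after is not None:
        try:
            return min(cap, max(0.0, float(retry_after)))
        except ValueError:
            pass  # unparseable hint: fall back to computed backoff
    # Exponential ceiling, capped, then "full jitter": a random point below it.
    ceiling = min(cap, base * (2 ** attempt))
    return ceiling * rng()
```

In the retry loop, time.sleep(backoff_delay(attempt, header_value)) would replace time.sleep(2 ** attempt), where header_value is whatever Retry-After value the caught error exposes, or None.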

Step 5: Orchestrating the Full Process

We combine the chunking, query creation, and execution into a single function that aggregates results.

from PureCloudPlatformClientV2 import AnalyticsApi

def fetch_conversations_for_range(
    api_client: ApiClient,
    start_date: str,
    end_date: str,
    chunk_days: int = 14
) -> list:
    """
    Fetches all conversations for a given date range by splitting into chunks.
    
    Args:
        api_client: Authenticated PureCloud ApiClient.
        start_date: Start date ISO string.
        end_date: End date ISO string.
        chunk_days: Days per chunk.
        
    Returns:
        A flat list of all conversation detail objects.
    """
    analytics_api = AnalyticsApi(api_client)
    all_conversations = []
    
    # Step 1: Split the date range
    chunks = split_date_range(start_date, end_date, chunk_days)
    logger.info(f"Split date range into {len(chunks)} chunks.")
    
    # Step 2: Iterate through chunks
    for i, (chunk_start, chunk_end) in enumerate(chunks):
        logger.info(f"Processing chunk {i+1}/{len(chunks)}: {chunk_start} to {chunk_end}")
        
        try:
            # Create the query for this specific chunk
            query_body = create_base_query(chunk_start, chunk_end)
            
            # Execute the query
            conversations = query_analytics_with_retry(
                api_client, 
                analytics_api, 
                query_body
            )
            
            # Aggregate results
            all_conversations.extend(conversations)
            logger.info(f"Retrieved {len(conversations)} conversations for this chunk.")
            
            # Optional: Small delay to be polite to the API
            time.sleep(1)
            
        except Exception as e:
            logger.error(f"Failed to process chunk {chunk_start} - {chunk_end}: {e}")
            # Depending on requirements, you might want to continue or break
            continue
            
    return all_conversations
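
The details endpoint is understood to return results one page at a time, so a chunk with many conversations may need several calls before it is complete. The sketch below shows the page loop in isolation. It is an assumption-laden illustration: fetch_all_pages and the fetch_page callable are hypothetical, and in this tutorial fetch_page would wrap a query for one chunk and one page number.

```python
from typing import Callable, List


def fetch_all_pages(fetch_page: Callable[[int], List[dict]],
                    page_size: int = 100, max_pages: int = 1000) -> List[dict]:
    """Call fetch_page(1), fetch_page(2), ... until a short page signals the end."""
    results: List[dict] = []
    for page_number in range(1, max_pages + 1):
        page = fetch_page(page_number) or []
        results.extend(page)
        # A page shorter than page_size means there is nothing left to fetch.
        if len(page) < page_size:
            return results
    raise RuntimeError(f"Stopped after {max_pages} pages; narrow the interval.")
```

The max_pages guard keeps a misbehaving callable from looping forever; smaller chunks also keep the number of pages per chunk low.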

Complete Working Example

This script combines all components. It assumes environment variables are set. It fetches 90 days of voice conversation details, splitting them into 14-day chunks.

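import os
import sys
import time
import logging
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

# Import Genesys Cloud SDK (PyPI package: PureCloudPlatformClientV2)
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2 import (
    AnalyticsApi,
    ConversationQuery,
    ConversationDetailQueryFilter,
    ConversationDetailQueryPredicate,
    SegmentDetailQueryFilter,
    SegmentDetailQueryPredicate,
    PagingSpec,
)
from PureCloudPlatformClientV2.api_client import ApiClient
from PureCloudPlatformClientV2.rest import ApiException

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def get_purecloud_client() -> ApiClient:
    """Initializes and returns an authenticated PureCloud API client."""
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENV", "mypurecloud.com")
    
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment variables.")
        
    # API requests go to the "api." subdomain of the region domain
    PureCloudPlatformClientV2.configuration.host = f"https://api.{environment}"
    # Client Credentials grant: fetches a token and returns the authenticated client
    return ApiClient().get_client_credentials_token(client_id, client_secret)

def split_date_range(start_date_str: str, end_date_str: str, chunk_days: int = 14) -> List[Tuple[str, str]]:
    """Splits a date range into smaller chunks."""
    if chunk_days <= 0:
        raise ValueError("chunk_days must be a positive integer")
    # Normalize to UTC so the "Z" suffix written below is accurate
    start_dt = datetime.fromisoformat(start_date_str.replace('Z', '+00:00')).astimezone(timezone.utc)
    end_dt = datetime.fromisoformat(end_date_str.replace('Z', '+00:00')).astimezone(timezone.utc)
    
    chunks = []
    current_start = start_dt
    
    while current_start < end_dt:
        current_end = current_start + timedelta(days=chunk_days)
        if current_end > end_dt:
            current_end = end_dt
            
        chunks.append((
            current_start.strftime("%Y-%m-%dT%H:%M:%SZ"),
            current_end.strftime("%Y-%m-%dT%H:%M:%SZ")
        ))
        current_start = current_end
        
    return chunks

def _dimension_predicate(predicate_cls, dimension: str, value: str):
    """Builds a 'dimension matches value' predicate of the given SDK class."""
    predicate = predicate_cls()
    predicate.type = "dimension"
    predicate.dimension = dimension
    predicate.operator = "matches"
    predicate.value = value
    return predicate

def create_base_query(date_from: str, date_to: str, page_number: int = 1) -> ConversationQuery:
    """Creates a ConversationQuery object for inbound voice conversations."""
    query = ConversationQuery()
    # The endpoint takes a single ISO 8601 interval ("start/end")
    query.interval = f"{date_from}/{date_to}"
    
    segment_filter = SegmentDetailQueryFilter()
    segment_filter.type = "and"
    segment_filter.predicates = [
        _dimension_predicate(SegmentDetailQueryPredicate, "mediaType", "voice")
    ]
    query.segment_filters = [segment_filter]
    
    conversation_filter = ConversationDetailQueryFilter()
    conversation_filter.type = "and"
    conversation_filter.predicates = [
        _dimension_predicate(ConversationDetailQueryPredicate, "originatingDirection", "inbound")
    ]
    query.conversation_filters = [conversation_filter]
    
    # Results are paged; 100 is assumed to be the maximum page size
    paging = PagingSpec()
    paging.page_size = 100
    paging.page_number = page_number
    query.paging = paging
    
    return query

def query_analytics_with_retry(api_client: ApiClient, analytics_api: AnalyticsApi, query_body: ConversationQuery, max_retries: int = 3) -> list:
    """Executes an analytics query with retry logic."""
    for attempt in range(max_retries):
        try:
            response = analytics_api.post_analytics_conversations_details_query(body=query_body)
            # One call returns one page; `conversations` is None when empty
            return response.conversations or []
        except ApiException as e:
            # The SDK's ApiException exposes the HTTP status as `status`
            status_code = e.status
            
            if status_code == 413:
                logger.error("413 Entity Too Large. Chunking strategy failed.")
                raise
            elif status_code == 429:
                time.sleep(2 ** attempt)
            elif status_code and 500 <= status_code < 600:
                time.sleep(2 ** attempt)
            else:
                logger.error(f"Unexpected error: {e}")
                raise
    raise RuntimeError("Max retries exceeded.")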

def main():
    try:
        # 1. Initialize Client
        logger.info("Initializing PureCloud Client...")
        api_client = get_purecloud_client()
        
        # 2. Define Date Range (Last 90 Days)
        end_date = datetime.now(timezone.utc)
        start_date = end_date - timedelta(days=90)
        
        start_str = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
        end_str = end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
        
        logger.info(f"Fetching data from {start_str} to {end_str}")
        
        # 3. Execute Chunked Fetch
        total_conversations = []
        chunks = split_date_range(start_str, end_str, chunk_days=14)
        analytics_api = AnalyticsApi(api_client)
        
        for i, (chunk_start, chunk_end) in enumerate(chunks):
            logger.info(f"Processing Chunk {i+1}/{len(chunks)}: {chunk_start} to {chunk_end}")
            
            query_body = create_base_query(chunk_start, chunk_end)
            
            try:
                conversations = query_analytics_with_retry(api_client, analytics_api, query_body)
                total_conversations.extend(conversations)
                logger.info(f"Chunk {i+1} complete. Retrieved {len(conversations)} records.")
            except Exception as e:
                logger.error(f"Error in Chunk {i+1}: {e}")
                continue
                
        logger.info(f"Total conversations retrieved: {len(total_conversations)}")
        
        # 4. Output Sample Data
        if total_conversations:
            print("\n--- Sample Conversation Data ---")
            for conv in total_conversations[:5]:
                print(f"ID: {conv.conversation_id}, Start: {conv.conversation_start}, End: {conv.conversation_end}")
                
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
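
The example above concatenates the chunks as they arrive. If the same conversation can be returned by two adjacent chunks, for instance when it falls on a shared boundary timestamp, a de-duplication pass by conversation ID keeps the totals honest. The sketch below is a hedged illustration: dedupe_by_id is a hypothetical helper, and the records are plain dictionaries with an assumed conversationId key. For SDK objects, pass a key function that reads the corresponding attribute.

```python
from typing import Callable, Hashable, Iterable, List, TypeVar

T = TypeVar("T")


def dedupe_by_id(records: Iterable[T],
                 key: Callable[[T], Hashable] = lambda r: r["conversationId"]) -> List[T]:
    """Return records with duplicate IDs removed, keeping first occurrences in order."""
    seen = set()
    unique: List[T] = []
    for record in records:
        record_id = key(record)
        if record_id in seen:
            continue  # already collected from an earlier chunk
        seen.add(record_id)
        unique.append(record)
    return unique
```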

Common Errors & Debugging

Error: 413 Entity Too Large

What causes it:
The request sent to /api/v2/analytics/conversations/details/query is larger than the server accepts. This often happens when:

  1. The date range is too large (e.g., 90+ days).
  2. Filter predicates carry long value lists, such as hundreds of queue, user, or conversation IDs.
  3. Complex filters with many nested conditions are used.

How to fix it:

  1. Reduce Chunk Size: Change chunk_days from 14 to 7 or even 3 in the split_date_range function.
  2. Trim Filter Values: Send long ID lists in batches, one batch per request, instead of in a single query.
  3. Simplify Filters: Ensure you are not using overly complex boolean logic in the filter.

Code Fix:

# Re-run the chunked fetch from Step 5 with a smaller chunk size
all_conversations = fetch_conversations_for_range(
    api_client,
    start_date,
    end_date,
    chunk_days=7  # was 14
)
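
Reducing the chunk size can also be automated: when a chunk is rejected as too large, split it in half and retry each half. The sketch below illustrates the idea with stand-ins only. TooLargeError represents an API error with HTTP status 413, and fetch is a hypothetical callable that queries one interval; neither is part of the SDK.

```python
from datetime import datetime, timedelta
from typing import Callable, List


class TooLargeError(Exception):
    """Stand-in for an API error with HTTP status 413."""


def fetch_adaptive(start: datetime, end: datetime,
                   fetch: Callable[[datetime, datetime], list],
                   min_span: timedelta = timedelta(hours=1)) -> List:
    """Fetch [start, end); on a 'too large' error, halve the interval and recurse."""
    try:
        return fetch(start, end)
    except TooLargeError:
        span = end - start
        if span <= min_span:
            raise  # cannot shrink further; the interval is not the problem
        middle = start + span / 2
        # Left half first so results stay in chronological order.
        return (fetch_adaptive(start, middle, fetch, min_span)
                + fetch_adaptive(middle, end, fetch, min_span))
```

The min_span floor matters: if even a one-hour interval is rejected, the cause is more likely the filters than the date range, and halving further would only add calls.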

Error: 429 Too Many Requests

What causes it:
You have exceeded the rate limit for the Analytics API. Genesys Cloud enforces strict rate limits per client ID.

How to fix it:

  1. Implement Exponential Backoff: The provided query_analytics_with_retry function already does this.
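  2. Add Delays Between Chunks: The time.sleep(1) in the chunk loop of fetch_conversations_for_range (Step 5) helps distribute load. The loop in the complete example omits it, so add it there if you see 429 responses.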
  3. Reduce Concurrency: Do not run multiple instances of this script simultaneously with the same Client ID.
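
A fixed sleep after every chunk waits even when the previous call was slow enough on its own. A small throttle that enforces a minimum spacing between calls is one alternative. The sketch below is illustrative: the Throttle class is hypothetical, the one-second spacing in the usage note is an arbitrary choice rather than a documented limit, and the clock and sleep functions are injectable so the behavior can be tested without waiting.

```python
import time
from typing import Callable, Optional


class Throttle:
    """Ensure at least `min_interval` seconds pass between successive calls to wait()."""

    def __init__(self, min_interval: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self._min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._last: Optional[float] = None  # time of the previous call, if any

    def wait(self) -> None:
        now = self._clock()
        if self._last is not None:
            remaining = self._min_interval - (now - self._last)
            if remaining > 0:
                self._sleep(remaining)  # only sleep for the time still owed
                now = self._clock()
        self._last = now
```

Creating throttle = Throttle(1.0) once and calling throttle.wait() before each API request would replace the unconditional time.sleep(1).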

Error: 401 Unauthorized

What causes it:
The OAuth token is invalid, expired, or the Client ID/Secret is incorrect.

How to fix it:

  1. Verify GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables.
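  2. Confirm that GENESYS_ENV matches the region of your organization; credentials are only valid in the region where the OAuth client was created.
  3. Check that get_purecloud_client() completed authentication before the first API call, and authenticate again if a long-running job outlives its token.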
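
For long-running jobs, an expired token can be handled by authenticating again once and repeating the call. The sketch below shows the pattern with stand-ins: UnauthorizedError represents an API error with HTTP status 401, and call and reauthenticate are hypothetical callables you would supply (for example, one chunk query and a function that fetches a fresh token).

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class UnauthorizedError(Exception):
    """Stand-in for an API error with HTTP status 401."""


def call_with_reauth(call: Callable[[], T], reauthenticate: Callable[[], None]) -> T:
    """Run call(); on a 401, re-authenticate once and try exactly one more time."""
    try:
        return call()
    except UnauthorizedError:
        reauthenticate()
        # A second 401 propagates: the credentials themselves are likely wrong.
        return call()
```

Retrying only once is deliberate: a repeated 401 after a fresh token points at bad credentials or the wrong region, which no amount of retrying will fix.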

Error: 403 Forbidden

What causes it:
The roles assigned to the OAuth Client lack the permission needed for conversation detail queries.

How to fix it:

  1. Go to Genesys Cloud Admin → Integrations → OAuth.
  2. Select your client.
  3. On the Roles tab, assign a role that includes the analytics:conversationDetail:view permission, and select the divisions it applies to.
  4. Save, then request a new token so the change is picked up.

Official References