Analytics API Returning 413 Entity Too Large — How to Split a 90-Day Query
What You Will Build
- A Python utility that automatically segments a large date range into smaller chunks to avoid the 413 Entity Too Large error.
- The solution uses the Genesys Cloud CX Analytics API (/api/v2/analytics/conversations/details/query).
- The code targets Python 3.8+ and uses the official Genesys Cloud Python SDK (PureCloudPlatformClientV2), with the requests library for raw HTTP fallbacks.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Flow).
- Required Scopes: analytics:conversation:read (for conversation details) or analytics:interaction:read (for aggregated metrics).
- SDK Version: PureCloudPlatformClientV2 >= 140.0.0.
- Language/Runtime: Python 3.8 or higher.
- External Dependencies:
  - PureCloudPlatformClientV2
  - requests
  - pandas (optional, for data processing)
Authentication Setup
Genesys Cloud uses OAuth 2.0. For server-to-server integrations, the Client Credentials flow is standard. The SDK handles token refresh automatically, but you must initialize the client correctly.
import os

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2 import ApiClient

# Environment variables should contain your credentials
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
ENVIRONMENT = os.getenv("GENESYS_ENV", "mypurecloud.com")


def get_purecloud_client() -> ApiClient:
    """
    Initializes and returns an authenticated PureCloud API client.
    """
    # Point the SDK at the regional API host, e.g. https://api.mypurecloud.com
    PureCloudPlatformClientV2.configuration.host = f"https://api.{ENVIRONMENT}"
    # Exchange the client credentials for a token; the call returns the authenticated client
    api_client = ApiClient().get_client_credentials_token(CLIENT_ID, CLIENT_SECRET)
    return api_client
Implementation
Step 1: Understanding the 413 Limit
The Genesys Cloud Analytics API enforces limits on request size and query complexity. A 413 Entity Too Large response means the server rejected the request itself: either the JSON body (date range, filters, and any grouping instructions) exceeded the maximum allowed size, often around 1-2 MB depending on the endpoint, or the query hit an internal complexity limit. Separately, a query covering a long period (e.g., 90 days) can match more data than a single HTTP response can carry, which tends to surface as timeouts or 504 errors rather than 413.
To address both problems, split the overall date range into smaller intervals (e.g., 7-day or 14-day chunks) and execute a separate API call for each chunk.
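As a quick illustration of the trade-off, the number of requests grows as the chunk size shrinks. The sketch below only does the arithmetic; `chunk_count` is a hypothetical helper name, not part of the SDK.

```python
import math


def chunk_count(total_days: int, chunk_days: int) -> int:
    """Number of API calls needed to cover total_days in chunk_days-sized pieces."""
    if total_days < 0 or chunk_days <= 0:
        raise ValueError("total_days must be >= 0 and chunk_days must be > 0")
    # The last chunk may be shorter than chunk_days, hence the ceiling
    return math.ceil(total_days / chunk_days)


for size in (7, 14, 30):
    print(f"{size}-day chunks -> {chunk_count(90, size)} requests")
```

For a 90-day range this gives 13 requests at 7 days, 7 at 14 days, and 3 at 30 days, before any paging within a chunk.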
Step 2: Defining the Query Structure
We need a standard query template. This example fetches conversation details for inbound voice interactions.
from PureCloudPlatformClientV2.models import ConversationQuery


def create_base_query(date_from: str, date_to: str) -> ConversationQuery:
    """
    Creates a ConversationQuery object for a specific date range.

    Args:
        date_from: ISO 8601 start date string (e.g., "2023-10-01T00:00:00Z")
        date_to: ISO 8601 end date string (e.g., "2023-10-08T00:00:00Z")

    Returns:
        ConversationQuery object ready for API submission.
    """
    # Define the query body; the details endpoint takes one "start/end" interval string
    query = ConversationQuery()
    query.interval = f"{date_from}/{date_to}"
    # Filter for inbound voice conversations (plain dicts are serialized as-is)
    query.segment_filters = [{
        "type": "and",
        "predicates": [
            {"type": "dimension", "dimension": "mediaType", "operator": "matches", "value": "voice"},
            {"type": "dimension", "dimension": "direction", "operator": "matches", "value": "inbound"},
        ],
    }]
    # Return results oldest first so the ordering is predictable
    query.order = "asc"
    query.order_by = "conversationStart"
    # This query body has no field-selection or groupBy option: whole conversation
    # records come back, so keep the interval and filters narrow instead.
    # Check the attribute names above against the SDK version you have installed.
    return query
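Because 413 is about the request, it can help to measure how large the serialized body actually is. The sketch below uses plain dicts that mirror the JSON this endpoint receives; `build_body`, `body_size_bytes`, and the queue IDs are hypothetical, and the SDK's own serialization may differ slightly in whitespace, so treat the numbers as approximate.

```python
import json
from typing import Any, Dict, List


def body_size_bytes(body: Dict[str, Any]) -> int:
    """Size of the compact JSON encoding of a query body, in bytes."""
    return len(json.dumps(body, separators=(",", ":")).encode("utf-8"))


def build_body(interval: str, queue_ids: List[str]) -> Dict[str, Any]:
    """Hypothetical query body: an interval plus one 'or' filter over queue IDs."""
    predicates = [
        {"type": "dimension", "dimension": "queueId", "operator": "matches", "value": qid}
        for qid in queue_ids
    ]
    body: Dict[str, Any] = {"interval": interval}
    if predicates:
        body["segmentFilters"] = [{"type": "or", "predicates": predicates}]
    return body


interval = "2023-10-01T00:00:00Z/2023-10-15T00:00:00Z"
small = build_body(interval, ["queue-1"])
large = build_body(interval, [f"queue-{n}" for n in range(5000)])
print(body_size_bytes(small), body_size_bytes(large))
```

In this sketch the interval string has a fixed length, so the byte count is driven by the number of filter predicates rather than by how many days the interval spans.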
Step 3: Implementing the Chunking Logic
This is the core logic. Starting at the start date, we step forward chunk_days at a time, emit a (start, end) pair for each step, and clamp the final chunk to the end date. The dates must be in UTC and are formatted with a Z suffix.
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


def _parse_utc(value: str) -> datetime:
    """Parses an ISO 8601 string; naive values are assumed to already be UTC."""
    parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def split_date_range(start_date_str: str, end_date_str: str, chunk_days: int = 14) -> List[Tuple[str, str]]:
    """
    Splits a date range into smaller chunks.

    Args:
        start_date_str: ISO 8601 start date.
        end_date_str: ISO 8601 end date.
        chunk_days: Number of days per chunk (default 14).

    Returns:
        List of tuples containing (chunk_start, chunk_end) ISO strings.
    """
    if chunk_days <= 0:
        # A non-positive step would never advance and the loop would not end
        raise ValueError("chunk_days must be a positive integer")
    # Parse input strings to UTC datetime objects
    start_dt = _parse_utc(start_date_str)
    end_dt = _parse_utc(end_date_str)
    chunks = []
    current_start = start_dt
    while current_start < end_dt:
        # Ensure the end of the chunk does not exceed the original end date
        current_end = min(current_start + timedelta(days=chunk_days), end_dt)
        # Format back to ISO 8601 with Z suffix for Genesys API
        chunks.append((
            current_start.strftime("%Y-%m-%dT%H:%M:%SZ"),
            current_end.strftime("%Y-%m-%dT%H:%M:%SZ")
        ))
        # Move to next chunk
        current_start = current_end
    return chunks
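A quick way to sanity-check any chunking routine is to confirm that the chunks tile the range with no gaps or overlaps. The sketch below works on datetime objects rather than strings; `iter_chunks` and `tiles_exactly` are hypothetical helper names used only for this check.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, List, Tuple

Chunk = Tuple[datetime, datetime]


def iter_chunks(start: datetime, end: datetime, chunk_days: int) -> Iterator[Chunk]:
    """Yields consecutive (chunk_start, chunk_end) pairs covering [start, end)."""
    if chunk_days <= 0:
        raise ValueError("chunk_days must be a positive integer")
    step = timedelta(days=chunk_days)
    current = start
    while current < end:
        nxt = min(current + step, end)  # clamp the final chunk to the range end
        yield current, nxt
        current = nxt


def tiles_exactly(chunks: List[Chunk], start: datetime, end: datetime) -> bool:
    """True if the chunks cover start..end with no gaps and no overlaps."""
    if not chunks:
        return start >= end
    if chunks[0][0] != start or chunks[-1][1] != end:
        return False
    # Each chunk must begin exactly where the previous one ended
    return all(prev[1] == cur[0] for prev, cur in zip(chunks, chunks[1:]))


start = datetime(2023, 10, 1, tzinfo=timezone.utc)
end = start + timedelta(days=90)
chunks = list(iter_chunks(start, end, 14))
print(len(chunks), tiles_exactly(chunks, start, end))
```

Consecutive chunks share a boundary timestamp. Whether a conversation that starts exactly on a boundary can appear in two chunks depends on how the API treats the interval end, so deduplicating by conversation ID is a cheap safeguard if that matters for your totals.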
Step 4: Executing Queries with Retry Logic
The Analytics API can be rate-limited (429) or temporarily unavailable (5xx). We implement a simple exponential backoff retry mechanism. We also catch the 413 error explicitly to log it, although our chunking strategy should prevent it.
import logging
import time

from PureCloudPlatformClientV2 import ApiClient
from PureCloudPlatformClientV2.models import ConversationQuery
from PureCloudPlatformClientV2.rest import ApiException

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def query_analytics_with_retry(
    api_client: ApiClient,
    analytics_api,
    query_body: ConversationQuery,
    max_retries: int = 3
) -> list:
    """
    Executes an analytics query with retry logic for 429 and 5xx errors.

    Args:
        api_client: The PureCloud ApiClient instance.
        analytics_api: The AnalyticsApi instance.
        query_body: The ConversationQuery object.
        max_retries: Maximum number of attempts.

    Returns:
        List of conversation detail objects.
    """
    for attempt in range(max_retries):
        try:
            # Post the query to Genesys Cloud
            response = analytics_api.post_analytics_conversations_details_query(body=query_body)
            # The response holds one page of conversations (None when nothing matched)
            return response.conversations or []
        except ApiException as e:
            # The SDK's ApiException exposes the HTTP status code as `status`
            status_code = e.status
            if status_code == 413:
                logger.error("413 Entity Too Large encountered. Query body was too large. Chunk size may need reduction.")
                raise  # Re-raise to halt execution as chunking failed
            elif status_code == 429:
                wait_time = 2 ** attempt
                logger.warning(f"429 Too Many Requests. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            elif status_code and 500 <= status_code < 600:
                wait_time = 2 ** attempt
                logger.warning(f"5xx Server Error. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                logger.error(f"Unexpected error: {e}")
                raise
    logger.error(f"Max retries ({max_retries}) exceeded.")
    raise RuntimeError("Max retries exceeded for analytics query.")
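The fixed 2 ** attempt delay can be refined in two common ways: honoring a Retry-After value when the server supplies one, and adding jitter so parallel clients do not retry in lockstep. The sketch below is one way to compute such a delay. `backoff_delay` is a hypothetical helper, and whether your SDK version exposes a Retry-After header on the exception is an assumption to verify.

```python
import random
from typing import Callable, Optional


def backoff_delay(
    attempt: int,
    retry_after: Optional[str] = None,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Seconds to wait before the next try; `attempt` is 0-based."""
    if retry_after is not None:
        try:
            # Prefer the server's hint when it is a plain number of seconds
            return min(cap, max(0.0, float(retry_after)))
        except ValueError:
            pass  # not a number of seconds; fall back to exponential backoff
    ceiling = min(cap, base * (2 ** attempt))
    # "Full jitter": pick a random delay between 0 and the exponential ceiling
    return ceiling * rng()


for attempt in range(4):
    print(attempt, round(backoff_delay(attempt), 2))
```

The `cap` keeps late attempts from sleeping for minutes, and passing `rng` explicitly makes the function easy to test with a fixed value.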
Step 5: Orchestrating the Full Process
We combine the chunking, query creation, and execution into a single function that aggregates results.
import time

from PureCloudPlatformClientV2 import AnalyticsApi, ApiClient

PAGE_SIZE = 100  # assumed maximum page size for this endpoint; verify against the API docs


def fetch_conversations_for_range(
    api_client: ApiClient,
    start_date: str,
    end_date: str,
    chunk_days: int = 14
) -> list:
    """
    Fetches all conversations for a given date range by splitting into chunks.

    Args:
        api_client: Authenticated PureCloud ApiClient.
        start_date: Start date ISO string.
        end_date: End date ISO string.
        chunk_days: Days per chunk.

    Returns:
        A flat list of all conversation detail objects.
    """
    analytics_api = AnalyticsApi(api_client)
    all_conversations = []
    # Step 1: Split the date range
    chunks = split_date_range(start_date, end_date, chunk_days)
    logger.info(f"Split date range into {len(chunks)} chunks.")
    # Step 2: Iterate through chunks
    for i, (chunk_start, chunk_end) in enumerate(chunks):
        logger.info(f"Processing chunk {i+1}/{len(chunks)}: {chunk_start} to {chunk_end}")
        try:
            page_number = 1
            while True:
                # Create the query for this specific chunk and page
                query_body = create_base_query(chunk_start, chunk_end)
                query_body.paging = {"pageSize": PAGE_SIZE, "pageNumber": page_number}
                # Execute the query
                conversations = query_analytics_with_retry(
                    api_client,
                    analytics_api,
                    query_body
                )
                # Aggregate results
                all_conversations.extend(conversations)
                logger.info(f"Retrieved {len(conversations)} conversations (page {page_number}).")
                # A short page means this chunk is exhausted
                if len(conversations) < PAGE_SIZE:
                    break
                page_number += 1
                # Optional: Small delay to be polite to the API
                time.sleep(1)
        except Exception as e:
            logger.error(f"Failed to process chunk {chunk_start} - {chunk_end}: {e}")
            # Depending on requirements, you might want to continue or break
            continue
    return all_conversations
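Skipping a failed chunk with continue keeps the run going, but the caller then cannot tell a complete result from a partial one. A minimal sketch of one alternative is to return the failed chunks alongside the data so they can be retried or reported. `fetch_all_chunks`, `fake_fetch`, and the sample records are hypothetical and stand in for the real query function.

```python
from typing import Callable, List, Tuple

Chunk = Tuple[str, str]


def fetch_all_chunks(
    chunks: List[Chunk],
    fetch_chunk: Callable[[str, str], list],
) -> Tuple[list, List[Chunk]]:
    """Runs fetch_chunk for every chunk; returns (results, failed_chunks)."""
    results: list = []
    failed: List[Chunk] = []
    for chunk_start, chunk_end in chunks:
        try:
            results.extend(fetch_chunk(chunk_start, chunk_end))
        except Exception:
            # Record the chunk instead of silently dropping it
            failed.append((chunk_start, chunk_end))
    return results, failed


def fake_fetch(chunk_start: str, chunk_end: str) -> list:
    """Stand-in for the real API call: the second chunk always fails."""
    if chunk_start == "2023-10-15T00:00:00Z":
        raise RuntimeError("simulated 5xx")
    return [f"conversation starting {chunk_start}"]


demo_chunks = [
    ("2023-10-01T00:00:00Z", "2023-10-15T00:00:00Z"),
    ("2023-10-15T00:00:00Z", "2023-10-29T00:00:00Z"),
    ("2023-10-29T00:00:00Z", "2023-11-12T00:00:00Z"),
]
data, failed_chunks = fetch_all_chunks(demo_chunks, fake_fetch)
print(len(data), failed_chunks)
```

A caller can then decide whether an incomplete range is acceptable, or pass `failed_chunks` back in for another attempt.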
Complete Working Example
This script combines all components. It assumes environment variables are set. It fetches 90 days of voice conversation details, splitting them into 14-day chunks.
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

# Import Genesys Cloud SDK
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2 import AnalyticsApi, ApiClient
from PureCloudPlatformClientV2.models import ConversationQuery
from PureCloudPlatformClientV2.rest import ApiException

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

PAGE_SIZE = 100  # assumed maximum page size for this endpoint; verify against the API docs


def get_purecloud_client() -> ApiClient:
    """Initializes and returns an authenticated PureCloud API client."""
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENV", "mypurecloud.com")
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment variables.")
    # Point the SDK at the regional API host before creating the client
    PureCloudPlatformClientV2.configuration.host = f"https://api.{environment}"
    return ApiClient().get_client_credentials_token(client_id, client_secret)


def split_date_range(start_date_str: str, end_date_str: str, chunk_days: int = 14) -> List[Tuple[str, str]]:
    """Splits a date range into smaller chunks."""
    if chunk_days <= 0:
        raise ValueError("chunk_days must be a positive integer")
    start_dt = datetime.fromisoformat(start_date_str.replace('Z', '+00:00'))
    end_dt = datetime.fromisoformat(end_date_str.replace('Z', '+00:00'))
    chunks = []
    current_start = start_dt
    while current_start < end_dt:
        # Clamp the final chunk to the end of the range
        current_end = min(current_start + timedelta(days=chunk_days), end_dt)
        chunks.append((
            current_start.strftime("%Y-%m-%dT%H:%M:%SZ"),
            current_end.strftime("%Y-%m-%dT%H:%M:%SZ")
        ))
        current_start = current_end
    return chunks


def create_base_query(date_from: str, date_to: str, page_number: int = 1) -> ConversationQuery:
    """Creates a ConversationQuery object for one chunk and one page."""
    query = ConversationQuery()
    query.interval = f"{date_from}/{date_to}"
    # Inbound voice only; plain dicts are serialized as-is
    query.segment_filters = [{
        "type": "and",
        "predicates": [
            {"type": "dimension", "dimension": "mediaType", "operator": "matches", "value": "voice"},
            {"type": "dimension", "dimension": "direction", "operator": "matches", "value": "inbound"},
        ],
    }]
    query.order = "asc"
    query.order_by = "conversationStart"
    query.paging = {"pageSize": PAGE_SIZE, "pageNumber": page_number}
    return query


def query_analytics_with_retry(api_client: ApiClient, analytics_api: AnalyticsApi, query_body: ConversationQuery, max_retries: int = 3) -> list:
    """Executes an analytics query with retry logic."""
    for attempt in range(max_retries):
        try:
            response = analytics_api.post_analytics_conversations_details_query(body=query_body)
            # `conversations` is None when nothing matched
            return response.conversations or []
        except ApiException as e:
            # The SDK's ApiException exposes the HTTP status code as `status`
            if e.status == 413:
                logger.error("413 Entity Too Large. Chunking strategy failed.")
                raise
            elif e.status == 429 or (e.status and 500 <= e.status < 600):
                time.sleep(2 ** attempt)
            else:
                logger.error(f"Unexpected error: {e}")
                raise
    raise RuntimeError("Max retries exceeded.")


def main():
    try:
        # 1. Initialize Client
        logger.info("Initializing PureCloud Client...")
        api_client = get_purecloud_client()
        # 2. Define Date Range (Last 90 Days)
        end_date = datetime.now(timezone.utc)
        start_date = end_date - timedelta(days=90)
        start_str = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
        end_str = end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
        logger.info(f"Fetching data from {start_str} to {end_str}")
        # 3. Execute Chunked Fetch
        total_conversations = []
        chunks = split_date_range(start_str, end_str, chunk_days=14)
        analytics_api = AnalyticsApi(api_client)
        for i, (chunk_start, chunk_end) in enumerate(chunks):
            logger.info(f"Processing Chunk {i+1}/{len(chunks)}: {chunk_start} to {chunk_end}")
            try:
                page_number = 1
                chunk_total = 0
                while True:
                    query_body = create_base_query(chunk_start, chunk_end, page_number)
                    conversations = query_analytics_with_retry(api_client, analytics_api, query_body)
                    total_conversations.extend(conversations)
                    chunk_total += len(conversations)
                    # A short page means this chunk is exhausted
                    if len(conversations) < PAGE_SIZE:
                        break
                    page_number += 1
                    time.sleep(1)  # small delay between requests
                logger.info(f"Chunk {i+1} complete. Retrieved {chunk_total} records.")
            except Exception as e:
                logger.error(f"Error in Chunk {i+1}: {e}")
                continue
        logger.info(f"Total conversations retrieved: {len(total_conversations)}")
        # 4. Output Sample Data
        if total_conversations:
            print("\n--- Sample Conversation Data ---")
            for conv in total_conversations[:5]:
                print(f"ID: {conv.conversation_id}, Start: {conv.conversation_start}, End: {conv.conversation_end}")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 413 Entity Too Large
What causes it:
The request sent to /api/v2/analytics/conversations/details/query exceeds the server's maximum request size or complexity. This often happens when:
- The date range is too large (e.g., 90+ days).
- Filters carry long lists of values, such as hundreds of queue or user IDs.
- Complex filters with many nested conditions are used.
How to fix it:
- Reduce Chunk Size: Change chunk_days from 14 to 7 or even 3 when calling split_date_range.
- Trim the Query Body: Remove filter clauses and predicates you do not need. Only request what you need.
- Simplify Filters: Ensure you are not using overly complex boolean logic in the filter.
Code Fix:
# Use a smaller chunk size when splitting the range
chunks = split_date_range(start_date, end_date, chunk_days=7)
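If a fixed chunk size still triggers a 413 for some busy periods, one option is to halve only the offending range and retry each half, rather than shrinking every chunk. The sketch below shows that idea with a stand-in exception and a fake fetch function. `PayloadTooLarge`, `fetch_adaptive`, and `fake_fetch` are hypothetical names; in real code you would map the SDK's 413 error onto this path.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable


class PayloadTooLarge(Exception):
    """Stand-in for an SDK error carrying HTTP status 413 (hypothetical)."""


def fetch_adaptive(
    start: datetime,
    end: datetime,
    fetch: Callable[[datetime, datetime], list],
    min_span: timedelta = timedelta(hours=1),
) -> list:
    """Fetches [start, end); on a 413 it splits the range in half and recurses."""
    try:
        return list(fetch(start, end))
    except PayloadTooLarge:
        if end - start <= min_span:
            raise  # cannot split further; surface the error to the caller
        mid = start + (end - start) / 2
        return (fetch_adaptive(start, mid, fetch, min_span)
                + fetch_adaptive(mid, end, fetch, min_span))


def fake_fetch(start: datetime, end: datetime) -> list:
    """Pretends the server rejects any range longer than 10 days."""
    if end - start > timedelta(days=10):
        raise PayloadTooLarge()
    return [(start, end)]


range_start = datetime(2023, 10, 1, tzinfo=timezone.utc)
range_end = range_start + timedelta(days=90)
pieces = fetch_adaptive(range_start, range_end, fake_fetch)
print(len(pieces))
```

The `min_span` floor stops the recursion when splitting is no longer helping, which is the signal that the filters, not the date range, are what need trimming.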
Error: 429 Too Many Requests
What causes it:
You have exceeded the rate limit for the Analytics API. Genesys Cloud enforces strict rate limits per client ID.
How to fix it:
- Implement Exponential Backoff: The provided query_analytics_with_retry function already does this.
- Add Delays Between Chunks: The time.sleep(1) in the chunk loop of fetch_conversations_for_range helps distribute load.
- Reduce Concurrency: Do not run multiple instances of this script simultaneously with the same Client ID.
Error: 401 Unauthorized
What causes it:
The OAuth token is invalid, expired, or the Client ID/Secret is incorrect.
How to fix it:
- Verify the GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables.
- Ensure the OAuth Client has the analytics:conversation:read scope assigned in the Genesys Cloud Admin Console.
- Check that get_purecloud_client targets the region your organization lives in (GENESYS_ENV), since credentials are only valid in their own region.
Error: 403 Forbidden
What causes it:
The OAuth Client lacks the necessary scope permissions.
How to fix it:
- Go to Genesys Cloud Admin Console → Platform → OAuth Clients.
- Select your client.
- Ensure analytics:conversation:read is checked in the Scopes section.
- Save and wait for the change to propagate (usually immediate).