Splitting Large Analytics Queries to Avoid 413 Entity Too Large Errors
What You Will Build
- You will build a Python script that retrieves conversation detail data from Genesys Cloud CX for a 90-day period without triggering a 413 Entity Too Large error.
- This tutorial uses the Genesys Cloud CX Python SDK (genesyscloud) and the /api/v2/analytics/conversations/details/query endpoint.
- The code is written in Python 3.9+ using asyncio and httpx for underlying HTTP handling via the SDK.
Prerequisites
- OAuth Client Type: Service Account or Client Credentials Flow.
- Required Scopes: analytics:conversation:read and analytics:report:read.
- SDK Version: genesyscloud >= 140.0.0 (ensure you use the latest stable release).
- Language/Runtime: Python 3.9 or higher.
- Dependencies:
  - genesyscloud
  - pandas (optional, for data aggregation)
  - python-dotenv (for secure credential management)
Install the dependencies using pip:
pip install genesyscloud pandas python-dotenv
Authentication Setup
Genesys Cloud uses OAuth 2.0 for authentication. The SDK handles token acquisition and refresh automatically when you initialize the platform client. You must configure the client with your organization domain, client ID, and client secret.
Create a .env file in your project root with the following variables:
GENESYS_CLOUD_DOMAIN="your-org.mygen.com"
GENESYS_CLOUD_CLIENT_ID="your-client-id"
GENESYS_CLOUD_CLIENT_SECRET="your-client-secret"
Initialize the authentication in your code:
import os

from dotenv import load_dotenv
from purecloudplatformclientv2 import PlatformClient

load_dotenv()

def get_platform_client() -> PlatformClient:
    """
    Initializes and returns a configured Genesys Cloud Platform Client.
    Uses Client Credentials flow for service accounts.
    """
    domain = os.getenv("GENESYS_CLOUD_DOMAIN")
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
    if not all([domain, client_id, client_secret]):
        raise ValueError("Missing required environment variables for Genesys Cloud authentication.")
    # Initialize the platform client
    platform_client = PlatformClient()
    # Configure OAuth with client credentials flow
    oauth_client = platform_client.oauth_client
    oauth_client.client_id = client_id
    oauth_client.client_secret = client_secret
    oauth_client.domain = domain
    return platform_client
Implementation
The core issue with the 413 Entity Too Large error in the Analytics API is the size of the JSON payload sent in the POST body. The /api/v2/analytics/conversations/details/query endpoint accepts a complex JSON body containing filters, groupings, and metrics. When you request data for a long duration (e.g., 90 days), the resulting query object can exceed the server’s maximum request body size limit, especially if you include many filters or groupings.
The solution is to split the date range into smaller chunks (e.g., 7-day or 14-day intervals), execute separate queries for each chunk, and then aggregate the results.
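Before tuning chunk sizes, it can help to measure how large a request body actually is once serialized. The sketch below is a minimal illustration using only the standard library. The limit `ASSUMED_MAX_BODY_BYTES` and the field names in `example_body` are placeholders chosen for this example, not values documented for the Analytics API.

```python
import json

# Hypothetical limit for illustration only; the server's real maximum
# request size is not stated in this tutorial.
ASSUMED_MAX_BODY_BYTES = 1_000_000


def body_size_bytes(body: dict) -> int:
    """Return the size of the compact JSON encoding of body, in UTF-8 bytes."""
    return len(json.dumps(body, separators=(",", ":")).encode("utf-8"))


def is_within_limit(body: dict, limit: int = ASSUMED_MAX_BODY_BYTES) -> bool:
    """Return True if the encoded body fits within the given limit."""
    return body_size_bytes(body) <= limit


# Example body with placeholder field names and a long list of fake user IDs.
example_body = {
    "dateFrom": "2024-01-01T00:00:00.000Z",
    "dateTo": "2024-01-14T23:59:59.999Z",
    "userIds": [f"user-{i:05d}" for i in range(1000)],
}

print(body_size_bytes(example_body), is_within_limit(example_body))
```

Logging this number for each request makes it easier to tell whether a 413 comes from the filters or from something else in the body.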
Step 1: Define the Query Chunking Logic
First, create a function that splits a start and end date into smaller intervals. This ensures each individual API call has a manageable payload size.
from datetime import date, timedelta
from typing import List, Tuple

def split_date_range(start_date: str, end_date: str, chunk_days: int = 14) -> List[Tuple[str, str]]:
    """
    Splits a date range into smaller chunks to avoid 413 errors.

    Both ends of each chunk are inclusive, and consecutive chunks never
    share a day, so no day is queried twice.

    Args:
        start_date: Start date in ISO format (YYYY-MM-DD)
        end_date: End date in ISO format (YYYY-MM-DD)
        chunk_days: Number of days per chunk (default 14)
    Returns:
        List of tuples, each containing (chunk_start, chunk_end) in ISO format
    """
    start = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    chunks = []
    current_start = start
    while current_start <= end:
        # A chunk covers chunk_days calendar days, counting its first day
        chunk_end = min(current_start + timedelta(days=chunk_days - 1), end)
        chunks.append((current_start.isoformat(), chunk_end.isoformat()))
        # The next chunk begins the day after this one ends
        current_start = chunk_end + timedelta(days=1)
    return chunks
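A quick way to sanity-check any chunking function is to confirm that its chunks are contiguous, never share a day, and together cover the whole range. The sketch below does this with a small stand-alone helper, `inclusive_chunks`, which is a hypothetical name introduced only for this illustration. It treats both ends of a chunk as whole, inclusive days.

```python
from datetime import date, timedelta
from typing import Iterator, Tuple


def inclusive_chunks(start: date, end: date, chunk_days: int) -> Iterator[Tuple[date, date]]:
    """Yield (first_day, last_day) pairs covering start..end with no shared days."""
    if chunk_days < 1:
        raise ValueError("chunk_days must be at least 1")
    current = start
    while current <= end:
        last = min(current + timedelta(days=chunk_days - 1), end)
        yield current, last
        current = last + timedelta(days=1)


# 2024-01-01 through 2024-03-30 is 90 calendar days (31 + 29 + 30).
chunks = list(inclusive_chunks(date(2024, 1, 1), date(2024, 3, 30), 14))

# Each chunk must start exactly one day after the previous chunk ends.
for (_, prev_last), (next_first, _) in zip(chunks, chunks[1:]):
    assert next_first == prev_last + timedelta(days=1)

total_days = sum((last - first).days + 1 for first, last in chunks)
print(len(chunks), total_days)  # prints: 7 90
```

Six full 14-day chunks cover 84 days, and a seventh chunk picks up the remaining 6.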
Step 2: Construct the Analytics Query Payload
The Analytics API requires a specific JSON structure for the query body. You must define the metrics, filters, and groupings. For this example, we will retrieve conversation details grouped by user.
from typing import List, Optional

from purecloudplatformclientv2.models import QueryConversationDetailRequest, ConversationDetailFilter, ConversationDetailGroupBy

def create_query_payload(start_date: str, end_date: str, user_ids: Optional[List[str]] = None) -> QueryConversationDetailRequest:
    """
    Creates a QueryConversationDetailRequest object for a specific date range.

    Args:
        start_date: Start date in ISO format
        end_date: End date in ISO format
        user_ids: Optional list of user IDs to filter by
    Returns:
        Configured QueryConversationDetailRequest object
    """
    # Initialize the request object
    query_request = QueryConversationDetailRequest()
    # Set the date range (whole UTC days, both ends inclusive)
    query_request.date_from = start_date + "T00:00:00.000Z"
    query_request.date_to = end_date + "T23:59:59.999Z"
    # Define filters
    filters = ConversationDetailFilter()
    # Filter by conversation type (e.g., voice)
    filters.conversation_type = ["voice"]
    # Optional: Filter by specific users
    if user_ids:
        filters.user_ids = user_ids
    query_request.filter = filters
    # Define groupings
    group_by = ConversationDetailGroupBy()
    group_by.user = True  # Group results by user
    query_request.group_by = group_by
    # Define metrics (optional, defaults are often sufficient for details)
    # metrics = ConversationDetailMetrics()
    # metrics.handle_time = True
    # query_request.metrics = metrics
    return query_request
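Appending time suffixes to date strings works for whole UTC days, but building the boundaries from timezone-aware datetime objects makes the intent explicit and catches malformed dates early. The following is a minimal sketch under that assumption. `day_bounds_utc` is a hypothetical helper, and the millisecond format simply mirrors the strings used in this tutorial.

```python
from datetime import date, datetime, time, timezone
from typing import Tuple


def _to_utc_string(moment: datetime) -> str:
    """Format an aware UTC datetime as e.g. 2024-01-01T00:00:00.000Z."""
    return moment.isoformat(timespec="milliseconds").replace("+00:00", "Z")


def day_bounds_utc(first_day: date, last_day: date) -> Tuple[str, str]:
    """Return (start, end) timestamps spanning whole UTC days, both inclusive."""
    if last_day < first_day:
        raise ValueError("last_day must not be earlier than first_day")
    start = datetime.combine(first_day, time.min, tzinfo=timezone.utc)
    # 23:59:59.999 is the last millisecond of the final day.
    end = datetime.combine(last_day, time(23, 59, 59, 999000), tzinfo=timezone.utc)
    return _to_utc_string(start), _to_utc_string(end)


bounds = day_bounds_utc(date(2024, 1, 1), date(2024, 1, 14))
print(bounds)
```

Passing an invalid date such as February 30 fails when the `date` object is constructed, rather than producing a malformed timestamp string.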
Step 3: Execute Queries and Handle Pagination
The Analytics API returns paginated results. You must handle the nextPageToken to retrieve all data for each chunk. Additionally, you must implement retry logic for 429 Too Many Requests errors, which are common when making multiple sequential API calls.
import asyncio
import logging
from typing import List, Optional

from purecloudplatformclientv2 import AnalyticsApi
from purecloudplatformclientv2.rest import ApiException

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def fetch_chunk_data(analytics_api: AnalyticsApi, start_date: str, end_date: str, user_ids: Optional[List[str]] = None) -> dict:
    """
    Fetches analytics data for a single date chunk with pagination and retry logic.

    Args:
        analytics_api: The AnalyticsApi instance
        start_date: Start date in ISO format
        end_date: End date in ISO format
        user_ids: Optional list of user IDs
    Returns:
        Dictionary containing the aggregated results for this chunk
    """
    query_request = create_query_payload(start_date, end_date, user_ids)
    # all_results and next_page_token persist across retries, so a retry
    # resumes at the page that failed instead of starting over
    all_results = []
    next_page_token = None
    max_retries = 3
    retry_delay = 2  # seconds
    for attempt in range(max_retries):
        try:
            while True:
                # Execute the query
                if next_page_token:
                    response = analytics_api.post_analytics_conversations_details_query(
                        body=query_request,
                        page_token=next_page_token
                    )
                else:
                    response = analytics_api.post_analytics_conversations_details_query(
                        body=query_request
                    )
                # Accumulate results
                if response.entities:
                    all_results.extend(response.entities)
                # Check for pagination
                if response.page_token:
                    next_page_token = response.page_token
                else:
                    break
            return {
                "start_date": start_date,
                "end_date": end_date,
                "entities": all_results,
                "total_count": response.total_count
            }
        except ApiException as e:
            if e.status == 429:
                logger.warning(f"Rate limited (429). Retrying in {retry_delay} seconds...")
                # Use asyncio.sleep so the event loop is not blocked while waiting
                await asyncio.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
                continue
            elif e.status == 413:
                logger.error("413 Entity Too Large. Chunk size may still be too large. Reduce chunk_days.")
                raise
            else:
                logger.error(f"API Error: {e.status} - {e.reason}")
                raise
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            raise
    raise RuntimeError("Max retries exceeded for 429 errors.")
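The retry pattern above can also be factored into a small reusable helper. The sketch below shows exponential backoff with a little random jitter, which helps keep several clients from retrying at the same instant. It is independent of the SDK: `RateLimited` is a hypothetical stand-in for a 429 error, and `call_with_backoff` is a name introduced only for this illustration.

```python
import random
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical stand-in for an HTTP 429 error raised by a client."""


def call_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on RateLimited with exponential backoff plus jitter."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except RateLimited:
            if attempt == max_retries:
                raise  # Out of retries: surface the error to the caller
            # Delay doubles each attempt: base, 2x base, 4x base, ...
            delay = base_delay * (2 ** attempt)
            # Add up to 25% random jitter on top of the delay
            sleep(delay * random.uniform(1.0, 1.25))
    raise AssertionError("unreachable")


# Demo: a function that is rate limited twice and then succeeds.
attempts = {"count": 0}


def flaky() -> str:
    attempts["count"] += 1
    if attempts["count"] <= 2:
        raise RateLimited()
    return "ok"


waits: List[float] = []
# Record the delays instead of actually sleeping so the demo runs instantly.
result = call_with_backoff(flaky, sleep=waits.append)
print(result, len(waits))
```

Injecting the `sleep` function keeps the helper easy to test. In asynchronous code the same structure applies with `await asyncio.sleep(...)` in place of the blocking call.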
Step 4: Orchestrate the Full 90-Day Query
Combine the chunking logic and the fetch function to process the entire date range.
import asyncio

async def fetch_90_day_analytics(platform_client: PlatformClient, start_date: str, end_date: str, user_ids: List[str] = None) -> list:
    """
    Fetches analytics data for a 90-day period by splitting into chunks.

    Args:
        platform_client: The initialized PlatformClient
        start_date: Start date in ISO format (YYYY-MM-DD)
        end_date: End date in ISO format (YYYY-MM-DD)
        user_ids: Optional list of user IDs
    Returns:
        List of all conversation detail entities
    """
    # Get the Analytics API instance
    analytics_api = AnalyticsApi(platform_client)
    # Split the date range into chunks
    chunks = split_date_range(start_date, end_date, chunk_days=14)
    logger.info(f"Splitting date range into {len(chunks)} chunks.")
    all_entities = []
    for i, (chunk_start, chunk_end) in enumerate(chunks):
        logger.info(f"Processing chunk {i+1}/{len(chunks)}: {chunk_start} to {chunk_end}")
        try:
            chunk_data = await fetch_chunk_data(analytics_api, chunk_start, chunk_end, user_ids)
            all_entities.extend(chunk_data["entities"])
            logger.info(f"Retrieved {len(chunk_data['entities'])} entities for this chunk.")
            # Optional: Add a small delay between chunks to be respectful of rate limits.
            # asyncio.sleep yields to the event loop instead of blocking it.
            await asyncio.sleep(1)
        except Exception as e:
            logger.error(f"Failed to process chunk {chunk_start} to {chunk_end}: {str(e)}")
            raise
    logger.info(f"Total entities retrieved: {len(all_entities)}")
    return all_entities
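When results from several chunks are combined, the same record can appear more than once, for example if a conversation spans a chunk boundary. The sketch below shows one way to merge chunk results while keeping only the first copy of each record. It assumes each record has been converted to a dictionary with a unique ID field. The key name `conversationId` and the helper `merge_unique` are assumptions made for this illustration.

```python
from typing import Dict, Iterable, List


def merge_unique(chunks: Iterable[List[dict]], key: str = "conversationId") -> List[dict]:
    """Merge per-chunk record lists, keeping the first record seen for each key."""
    seen: Dict[str, dict] = {}
    for chunk in chunks:
        for record in chunk:
            # setdefault stores the record only if this key has not appeared yet
            seen.setdefault(record[key], record)
    # Dictionaries preserve insertion order, so first-seen order is kept
    return list(seen.values())


# Fake data: conversation "b" shows up in both chunks.
chunk_one = [{"conversationId": "a", "chunk": 1}, {"conversationId": "b", "chunk": 1}]
chunk_two = [{"conversationId": "b", "chunk": 2}, {"conversationId": "c", "chunk": 2}]

merged = merge_unique([chunk_one, chunk_two])
print([record["conversationId"] for record in merged])  # prints: ['a', 'b', 'c']
```

Whether duplicates actually occur depends on how the API assigns conversations to a date range, so treat this as a safeguard rather than a required step.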
Complete Working Example
Here is the full, copy-pasteable script. Save this as gen_analytics_split.py.
import os
import asyncio
import logging
from datetime import datetime, timedelta
from typing import List

from dotenv import load_dotenv
from purecloudplatformclientv2 import PlatformClient, AnalyticsApi
from purecloudplatformclientv2.rest import ApiException
from purecloudplatformclientv2.models import QueryConversationDetailRequest, ConversationDetailFilter, ConversationDetailGroupBy

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def load_env():
    """Loads environment variables from .env file."""
    load_dotenv()
    return {
        "domain": os.getenv("GENESYS_CLOUD_DOMAIN"),
        "client_id": os.getenv("GENESYS_CLOUD_CLIENT_ID"),
        "client_secret": os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
    }

def get_platform_client() -> PlatformClient:
    """Initializes the Genesys Cloud Platform Client."""
    env_vars = load_env()
    if not all(env_vars.values()):
        raise ValueError("Missing required environment variables.")
    platform_client = PlatformClient()
    oauth_client = platform_client.oauth_client
    oauth_client.client_id = env_vars["client_id"]
    oauth_client.client_secret = env_vars["client_secret"]
    oauth_client.domain = env_vars["domain"]
    return platform_client

def split_date_range(start_date: str, end_date: str, chunk_days: int = 14) -> List[tuple]:
    """Splits a date range into smaller chunks with inclusive, non-overlapping ends."""
    start = datetime.fromisoformat(start_date)
    end = datetime.fromisoformat(end_date)
    chunks = []
    current_start = start
    while current_start <= end:
        # A chunk covers chunk_days calendar days, counting its first day
        chunk_end = min(current_start + timedelta(days=chunk_days - 1), end)
        chunks.append((current_start.strftime("%Y-%m-%d"), chunk_end.strftime("%Y-%m-%d")))
        # The next chunk begins the day after this one ends, so no day is queried twice
        current_start = chunk_end + timedelta(days=1)
    return chunks

def create_query_payload(start_date: str, end_date: str, user_ids: List[str] = None) -> QueryConversationDetailRequest:
    """Creates the analytics query request object."""
    query_request = QueryConversationDetailRequest()
    query_request.date_from = start_date + "T00:00:00.000Z"
    query_request.date_to = end_date + "T23:59:59.999Z"
    filters = ConversationDetailFilter()
    filters.conversation_type = ["voice"]
    if user_ids:
        filters.user_ids = user_ids
    query_request.filter = filters
    group_by = ConversationDetailGroupBy()
    group_by.user = True
    query_request.group_by = group_by
    return query_request

async def fetch_chunk_data(analytics_api: AnalyticsApi, start_date: str, end_date: str, user_ids: List[str] = None) -> dict:
    """Fetches data for a single chunk with pagination and retry."""
    query_request = create_query_payload(start_date, end_date, user_ids)
    all_results = []
    next_page_token = None
    max_retries = 3
    retry_delay = 2
    for attempt in range(max_retries):
        try:
            while True:
                if next_page_token:
                    response = analytics_api.post_analytics_conversations_details_query(
                        body=query_request,
                        page_token=next_page_token
                    )
                else:
                    response = analytics_api.post_analytics_conversations_details_query(
                        body=query_request
                    )
                if response.entities:
                    all_results.extend(response.entities)
                if response.page_token:
                    next_page_token = response.page_token
                else:
                    break
            return {
                "start_date": start_date,
                "end_date": end_date,
                "entities": all_results,
                "total_count": response.total_count
            }
        except ApiException as e:
            if e.status == 429:
                logger.warning(f"Rate limited (429). Retrying in {retry_delay}s...")
                await asyncio.sleep(retry_delay)
                retry_delay *= 2
                continue
            elif e.status == 413:
                logger.error("413 Entity Too Large. Reduce chunk_days.")
                raise
            else:
                logger.error(f"API Error: {e.status} - {e.reason}")
                raise
    # Without this, exhausting the retries would silently return None
    raise RuntimeError("Max retries exceeded for 429 errors.")

async def main():
    """Main execution function."""
    # Define date range (90 days back from today)
    end_date = datetime.now().strftime("%Y-%m-%d")
    start_date = (datetime.now() - timedelta(days=90)).strftime("%Y-%m-%d")
    logger.info(f"Starting analytics fetch from {start_date} to {end_date}")
    platform_client = get_platform_client()
    analytics_api = AnalyticsApi(platform_client)
    chunks = split_date_range(start_date, end_date, chunk_days=14)
    logger.info(f"Created {len(chunks)} chunks.")
    all_entities = []
    for i, (chunk_start, chunk_end) in enumerate(chunks):
        logger.info(f"Processing chunk {i+1}/{len(chunks)}: {chunk_start} to {chunk_end}")
        try:
            chunk_data = await fetch_chunk_data(analytics_api, chunk_start, chunk_end)
            all_entities.extend(chunk_data["entities"])
            logger.info(f"Chunk complete. Total entities so far: {len(all_entities)}")
            await asyncio.sleep(1)  # Respect rate limits
        except Exception as e:
            # Stop at the first failed chunk; results gathered so far are still saved below
            logger.error(f"Error in chunk {chunk_start}: {str(e)}")
            break
    logger.info(f"Fetch complete. Total entities: {len(all_entities)}")
    # Example: Save to JSON
    import json
    with open("analytics_results.json", "w") as f:
        json.dump([entity.to_dict() for entity in all_entities], f, indent=2, default=str)
    logger.info("Results saved to analytics_results.json")

if __name__ == "__main__":
    asyncio.run(main())
Common Errors & Debugging
Error: 413 Entity Too Large
- Cause: The JSON body sent to the API exceeds the server’s maximum request size. This often happens with large date ranges, many filters, or complex groupings.
- Fix: Reduce the chunk_days parameter in split_date_range. Start with 7 days if 14 days fails. Also, review the filters and group_by objects to ensure they are not unnecessarily large.
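The "start with 7 days if 14 days fails" advice can be automated by halving the chunk size whenever a request is rejected as too large. The sketch below illustrates the idea without the SDK. `PayloadTooLarge` is a hypothetical stand-in for a 413 error, and `fetch_with_shrinking_chunks` is a name introduced only for this example.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


class PayloadTooLarge(Exception):
    """Hypothetical stand-in for an HTTP 413 error raised by a client."""


def fetch_with_shrinking_chunks(
    fetch: Callable[[int], T],
    chunk_days: int = 14,
    min_days: int = 1,
) -> T:
    """Call fetch(days), halving days each time PayloadTooLarge is raised."""
    days = chunk_days
    while True:
        try:
            return fetch(days)
        except PayloadTooLarge:
            if days <= min_days:
                raise  # Cannot shrink further: let the caller handle it
            days = max(min_days, days // 2)


# Demo: a fake fetch that rejects any chunk longer than 5 days.
tried: List[int] = []


def fake_fetch(days: int) -> str:
    tried.append(days)
    if days > 5:
        raise PayloadTooLarge()
    return f"fetched with {days}-day chunks"


outcome = fetch_with_shrinking_chunks(fake_fetch)
print(tried, outcome)  # prints: [14, 7, 3] fetched with 3-day chunks
```

In a real script, `fetch` would rerun the chunked query with the given chunk size, so the halving only happens when the server actually rejects a request.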
Error: 429 Too Many Requests
- Cause: You have exceeded the rate limit for the Analytics API. Limits are typically enforced as a number of requests per minute.
- Fix: Implement exponential backoff (as shown in the code). Increase the retry_delay and add asyncio.sleep() between chunks. If the error persists, reduce the concurrency if you are using multiple threads or async tasks.
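If chunks are fetched concurrently rather than one after another, a semaphore is one simple way to cap how many requests are in flight at once. The sketch below uses only asyncio. `run_limited` and the fake worker are hypothetical names for illustration, and the limit of 2 is an arbitrary example value, not a documented API limit.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def run_limited(
    items: Iterable[T],
    worker: Callable[[T], Awaitable[R]],
    limit: int = 2,
) -> List[R]:
    """Run worker on every item, with at most `limit` workers active at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: T) -> R:
        # Only `limit` coroutines can hold the semaphore at the same time
        async with semaphore:
            return await worker(item)

    # gather returns results in the same order as the input items
    return await asyncio.gather(*(guarded(item) for item in items))


async def demo() -> Tuple[List[int], int]:
    active = 0
    peak = 0

    async def fake_fetch(chunk_id: int) -> int:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)  # Track the highest concurrency observed
        await asyncio.sleep(0.01)  # Simulate network latency
        active -= 1
        return chunk_id * 10

    results = await run_limited(range(6), fake_fetch, limit=2)
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)
```

Lowering `limit` trades throughput for fewer 429 responses. A limit of 1 is equivalent to processing chunks sequentially.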
Error: 401 Unauthorized
- Cause: Invalid or expired OAuth token.
- Fix: Ensure your GENESYS_CLOUD_CLIENT_ID and GENESYS_CLOUD_CLIENT_SECRET are correct. Check that the service account has the analytics:conversation:read scope assigned.
Error: 403 Forbidden
- Cause: The service account lacks permission to access the requested data.
- Fix: Verify the service account has the analytics:conversation:read scope. Also, check if the user IDs you are filtering by are accessible to the service account.