Splitting Genesys Cloud Analytics Queries to Avoid 413 Entity Too Large Errors
What You Will Build
- A Python utility that programmatically splits a 90-day analytics query into smaller, manageable chunks to prevent 413 Entity Too Large errors.
- This tutorial uses the Genesys Cloud CX Analytics API (/api/v2/analytics/conversations/details/query).
- The programming language covered is Python 3.9+ using the Genesys Cloud Python SDK (PureCloudPlatformClientV2).
Prerequisites
- OAuth Client Type: A Machine-to-Machine (M2M) OAuth client is recommended for server-side analytics retrieval.
- Required Permissions: The role assigned to the OAuth client must include analytics:conversationDetail:view to query conversation details.
- SDK Version: PureCloudPlatformClientV2 version 140.0.0 or later.
- Runtime Requirements: Python 3.9 or higher.
- External Dependencies:
  - pip install PureCloudPlatformClientV2
  - pip install python-dotenv (for secure credential management)
Authentication Setup
Genesys Cloud uses OAuth 2.0 for authentication. For analytics queries, which are often executed in batch or scheduled contexts, the Client Credentials Grant flow is the standard approach. This flow exchanges your OAuth client ID and secret for an access token.
The SDK performs this exchange for you when you call get_client_credentials_token with your credentials. However, understanding the underlying mechanism helps when debugging 401 Unauthorized errors.
import os

import PureCloudPlatformClientV2
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()


def get_auth_configuration():
    """
    Creates and returns an authenticated API client instance.
    """
    # Retrieve credentials from environment variables
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    base_url = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment variables.")

    # Point the SDK at the API host for your region
    PureCloudPlatformClientV2.configuration.host = base_url

    # Exchange the client ID and secret for an access token;
    # the returned client carries that token on every request
    api_client = PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        client_id, client_secret
    )
    return api_client
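To make the underlying mechanism concrete, the sketch below builds (but does not send) the HTTP request that a Client Credentials Grant comes down to. It assumes the login host that pairs with api.mypurecloud.com, which is login.mypurecloud.com; other regions use a different host. build_token_request is a hypothetical helper written for this illustration, not part of the SDK.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(client_id: str, client_secret: str,
                        login_host: str = "https://login.mypurecloud.com") -> urllib.request.Request:
    """Builds the OAuth client-credentials token request without sending it."""
    # The client ID and secret travel as HTTP Basic credentials
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    basic = base64.b64encode(credentials).decode("ascii")
    # The form body only names the grant type
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode("ascii")
    return urllib.request.Request(
        url=f"{login_host}/oauth/token",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )


# Placeholder credentials, for illustration only
request = build_token_request("my-client-id", "my-client-secret")
print(request.get_method(), request.full_url)
```

If this request fails when sent on its own, the problem lies with the credentials or the region rather than with the analytics query.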
Implementation
Step 1: Understanding the 413 Entity Too Large Constraint
The Genesys Cloud Analytics API limits the size of each request. When querying conversation details, the request body includes filters, paging settings, and the time range. If the time range is too broad (e.g., 90 days) and the filters are not sufficiently restrictive, the payload or the internal processing requirement can exceed the server's limit, triggering a 413 Entity Too Large response.
Reducing filter complexity alone is not enough; the time range itself must be split into smaller segments. A segment size of 7 to 14 days is a reasonable starting point for most environments. For this tutorial, we will split a 90-day window into seven chunks of at most 13 days each: six 13-day chunks followed by one 12-day chunk.
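The arithmetic behind that split can be checked with a few lines of Python. This is only a sketch; chunk_sizes is a hypothetical helper used for illustration and does not appear in the rest of the tutorial.

```python
def chunk_sizes(total_days: int, chunk_days: int) -> list:
    """Returns the length in days of each chunk, in order."""
    if total_days < 0 or chunk_days <= 0:
        raise ValueError("total_days must be >= 0 and chunk_days must be > 0")
    # Full-size chunks first, then whatever is left over
    full_chunks, remainder = divmod(total_days, chunk_days)
    sizes = [chunk_days] * full_chunks
    if remainder:
        sizes.append(remainder)
    return sizes


sizes = chunk_sizes(90, 13)
print(len(sizes), sizes)  # 7 [13, 13, 13, 13, 13, 13, 12]
```

A 90-day window does not divide evenly by 13, so the last chunk is shorter than the others.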
Step 2: Defining the Query Structure
Before splitting the time range, we must define the core query structure. This structure remains constant across all chunks; only the interval field changes.
from datetime import datetime, timezone

from PureCloudPlatformClientV2 import ConversationQuery


def to_utc_iso(value: datetime) -> str:
    """Formats a timezone-aware datetime as an ISO 8601 UTC string ending in Z."""
    return value.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"


def create_base_query_body(start_date: datetime, end_date: datetime) -> ConversationQuery:
    """
    Creates a ConversationQuery with fixed filters and a dynamic date range.
    Args:
        start_date: The start of the time window (timezone-aware, UTC).
        end_date: The end of the time window (timezone-aware, UTC).
    Returns:
        ConversationQuery: The query body object.
    """
    query_body = ConversationQuery()

    # Set the time range as a single "start/end" interval string
    query_body.interval = f"{to_utc_iso(start_date)}/{to_utc_iso(end_date)}"

    # Return results in a stable order so that paging is predictable
    query_body.order = "asc"
    query_body.order_by = "conversationStart"

    # Add common filters to reduce data volume
    # Example: only retrieve voice conversations that have a wrap-up code
    query_body.segment_filters = [
        {
            "type": "and",
            "predicates": [
                {
                    "type": "dimension",
                    "dimension": "mediaType",
                    "operator": "matches",
                    "value": "voice"
                },
                {
                    "type": "dimension",
                    "dimension": "wrapUpCode",
                    "operator": "exists"
                }
            ]
        }
    ]

    # Limit the number of records returned per page to manage memory.
    # The details query accepts a page size of up to 100 records.
    # Note: This is a limit on the *result*, not the request body size,
    # but keeping it reasonable helps with overall processing.
    query_body.paging = {"pageSize": 100, "pageNumber": 1}
    return query_body
Step 3: Implementing the Time Chunking Logic
The core logic involves calculating the start and end dates for each chunk. Pass timezone-aware UTC datetimes so that the boundaries sent to the Genesys Cloud API are unambiguous.
def generate_time_chunks(start_date: datetime, end_date: datetime, chunk_days: int = 13):
    """
    Yields (start_date, end_date) tuples covering the full range.
    Args:
        start_date: The overall start date (UTC).
        end_date: The overall end date (UTC).
        chunk_days: The number of days per chunk.
    Yields:
        Tuple[datetime, datetime]: Start and end dates for each chunk.
    """
    if chunk_days <= 0:
        # A non-positive chunk size would never advance and loop forever
        raise ValueError("chunk_days must be a positive integer")

    current_start = start_date
    while current_start < end_date:
        current_end = current_start + timedelta(days=chunk_days)
        # Clamp the final chunk to the overall end date
        if current_end > end_date:
            current_end = end_date
        yield (current_start, current_end)
        current_start = current_end
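As a quick sanity check, the sketch below runs a compact, standalone copy of the chunk generator over a 90-day window and confirms that the chunks are contiguous and end exactly at the overall end date. The start date is an arbitrary example value.

```python
from datetime import datetime, timedelta, timezone


def generate_time_chunks(start_date, end_date, chunk_days=13):
    """Yields consecutive (start, end) windows of at most chunk_days days."""
    current_start = start_date
    while current_start < end_date:
        current_end = min(current_start + timedelta(days=chunk_days), end_date)
        yield current_start, current_end
        current_start = current_end


start = datetime(2023, 1, 1, tzinfo=timezone.utc)
end = start + timedelta(days=90)
chunks = list(generate_time_chunks(start, end))

# Every chunk starts exactly where the previous one ended
contiguous = all(prev[1] == nxt[0] for prev, nxt in zip(chunks, chunks[1:]))
print(len(chunks), contiguous, chunks[-1][1] == end)  # 7 True True
```

Because neighbouring chunks share a boundary timestamp, a conversation that starts exactly on a boundary could be returned by two chunks if the API treats both ends of the interval as inclusive. That behaviour is an assumption worth verifying against your own data.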
Step 4: Executing the Query with Pagination and Retry Logic
The Analytics API returns paginated results. We must handle pagination within each chunk. Additionally, we implement retry logic for 429 Too Many Requests and 5xx server errors.
import logging
import time

from PureCloudPlatformClientV2 import AnalyticsApi, ConversationQuery
from PureCloudPlatformClientV2.rest import ApiException

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

PAGE_SIZE = 100   # Records requested per page
MAX_RETRIES = 3   # Retries allowed per page for server errors


def execute_chunk_query(api_instance: AnalyticsApi, query_body: ConversationQuery, chunk_index: int):
    """
    Executes a single analytics query chunk with pagination and retry logic.
    Args:
        api_instance: The AnalyticsApi instance.
        query_body: The query body for this chunk.
        chunk_index: The index of the chunk for logging purposes.
    Returns:
        list: The conversation detail records returned by the SDK.
    """
    all_results = []
    page_number = 1
    retries_left = MAX_RETRIES

    while True:
        try:
            # Pagination is page-number based: request the current page
            query_body.paging = {"pageSize": PAGE_SIZE, "pageNumber": page_number}

            # Execute the API call
            logger.info(f"Chunk {chunk_index}: Executing query (page {page_number})...")
            response = api_instance.post_analytics_conversations_details_query(query_body)

            # Collect results
            conversations = response.conversations or []
            all_results.extend(conversations)
            logger.info(f"Chunk {chunk_index}: Retrieved {len(conversations)} records.")

            # A short page means there is nothing left to fetch
            if len(conversations) < PAGE_SIZE:
                break
            page_number += 1
            # Each page gets a fresh retry budget
            retries_left = MAX_RETRIES
        except ApiException as e:
            status_code = e.status
            if status_code == 429:
                # Handle Rate Limiting
                retry_after = int((e.headers or {}).get("Retry-After", 5))
                logger.warning(f"Chunk {chunk_index}: Rate limited. Retrying after {retry_after} seconds.")
                time.sleep(retry_after)
                continue
            elif 500 <= status_code < 600:
                # Handle Server Errors with exponential backoff
                if retries_left > 0:
                    wait_time = 2 ** (MAX_RETRIES - retries_left)
                    logger.warning(f"Chunk {chunk_index}: Server error {status_code}. Retrying in {wait_time} seconds.")
                    time.sleep(wait_time)
                    retries_left -= 1
                    continue
                else:
                    logger.error(f"Chunk {chunk_index}: Max retries exceeded for server error.")
                    raise
            else:
                # Handle other errors (400, 401, 403, 413)
                logger.error(f"Chunk {chunk_index}: API Error {status_code}: {e.body}")
                raise

        # Small delay between pages to be polite to the API
        time.sleep(0.5)

    return all_results
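The retry pattern used above can also be isolated and exercised without calling the API. The sketch below is a generic helper under stated assumptions: call_with_retries and FakeServerError are hypothetical names for this illustration, and the sleep function is injectable so the backoff schedule can be observed without waiting.

```python
import time


def call_with_retries(func, is_retryable, max_retries=3, base=1.0, cap=30.0, sleep=time.sleep):
    """Calls func(), retrying retryable failures with exponential backoff."""
    attempt = 0
    while True:
        try:
            return func()
        except Exception as error:
            # Give up on non-retryable errors or when the budget is spent
            if attempt >= max_retries or not is_retryable(error):
                raise
            # Wait base, 2*base, 4*base, ... seconds, never more than cap
            sleep(min(cap, base * (2 ** attempt)))
            attempt += 1


class FakeServerError(Exception):
    """Stand-in for an API exception carrying an HTTP status."""
    status = 503


calls = {"count": 0}
waits = []


def flaky():
    # Fails twice, then succeeds
    calls["count"] += 1
    if calls["count"] < 3:
        raise FakeServerError()
    return "ok"


result = call_with_retries(
    flaky,
    is_retryable=lambda error: getattr(error, "status", 0) >= 500,
    sleep=waits.append,  # record the waits instead of sleeping
)
print(result, calls["count"], waits)  # ok 3 [1.0, 2.0]
```

Keeping the retry policy in one helper makes the retry budget and the backoff schedule easy to test and adjust.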
Step 5: Orchestrating the Full Process
We combine the chunk generation, query creation, and execution into a single function. This function takes the overall start and end dates, splits them into chunks, and aggregates the results.
def fetch_analytics_data(start_date_str: str, end_date_str: str):
    """
    Fetches analytics data for a given date range by splitting it into chunks.
    Args:
        start_date_str: Start date in ISO format (e.g., "2023-01-01T00:00:00Z").
        end_date_str: End date in ISO format (e.g., "2023-03-31T23:59:59Z").
    Returns:
        list: A list of all conversation detail records.
    """
    # Parse dates (fromisoformat in Python 3.9 does not accept a trailing "Z")
    start_date = datetime.fromisoformat(start_date_str.replace("Z", "+00:00"))
    end_date = datetime.fromisoformat(end_date_str.replace("Z", "+00:00"))

    # Initialize API client
    api_client = get_auth_configuration()
    analytics_api = AnalyticsApi(api_client)

    all_conversations = []
    chunk_index = 0

    # Generate chunks
    for chunk_start, chunk_end in generate_time_chunks(start_date, end_date):
        chunk_index += 1
        logger.info(f"Processing Chunk {chunk_index}: {chunk_start.isoformat()} to {chunk_end.isoformat()}")

        # Create query body for this chunk
        query_body = create_base_query_body(chunk_start, chunk_end)

        # Execute query
        try:
            chunk_results = execute_chunk_query(analytics_api, query_body, chunk_index)
            all_conversations.extend(chunk_results)
        except Exception as e:
            logger.error(f"Failed to process Chunk {chunk_index}: {e}")
            # Decide whether to continue or break based on your requirements
            break

        # Small delay between chunks to avoid sustained high load
        time.sleep(1)

    logger.info(f"Total records retrieved: {len(all_conversations)}")
    return all_conversations
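When results from several chunks are aggregated, it can be worth guarding against the same conversation appearing twice, for example when it falls on a chunk boundary. The sketch below assumes each record is a plain dict with a conversationId key; the SDK returns model objects instead, so the lookup would need adapting. merge_chunks is a hypothetical helper.

```python
def merge_chunks(chunks, key="conversationId"):
    """Merges per-chunk record lists, keeping the first copy of each conversation."""
    seen = set()
    merged = []
    for records in chunks:
        for record in records:
            record_id = record[key]
            # Skip records already collected from an earlier chunk
            if record_id in seen:
                continue
            seen.add(record_id)
            merged.append(record)
    return merged


chunk_one = [{"conversationId": "a"}, {"conversationId": "b"}]
chunk_two = [{"conversationId": "b"}, {"conversationId": "c"}]
merged = merge_chunks([chunk_one, chunk_two])
print([record["conversationId"] for record in merged])  # ['a', 'b', 'c']
```

The order of first appearance is preserved, so chronological ordering across chunks survives the merge.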
Complete Working Example
The following script is a complete, runnable example. It uses environment variables for credentials and fetches analytics data for the last 90 days.
#!/usr/bin/env python3
"""
Genesys Cloud Analytics Query Splitter
This script demonstrates how to avoid 413 Entity Too Large errors by splitting
a large date range into smaller chunks when querying the Analytics API.
"""
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone

import PureCloudPlatformClientV2
from dotenv import load_dotenv
from PureCloudPlatformClientV2 import AnalyticsApi, ConversationQuery
from PureCloudPlatformClientV2.rest import ApiException

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

PAGE_SIZE = 100   # Records requested per page
MAX_RETRIES = 3   # Retries allowed per page for server errors


def get_auth_configuration():
    """Creates and returns an authenticated API client instance."""
    load_dotenv()
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    base_url = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment variables.")
    PureCloudPlatformClientV2.configuration.host = base_url
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        client_id, client_secret
    )


def to_utc_iso(value: datetime) -> str:
    """Formats a timezone-aware datetime as an ISO 8601 UTC string ending in Z."""
    return value.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"


def create_base_query_body(start_date: datetime, end_date: datetime) -> ConversationQuery:
    """Creates a ConversationQuery with fixed filters and a dynamic date range."""
    query_body = ConversationQuery()
    query_body.interval = f"{to_utc_iso(start_date)}/{to_utc_iso(end_date)}"
    query_body.order = "asc"
    query_body.order_by = "conversationStart"
    query_body.segment_filters = [
        {
            "type": "and",
            "predicates": [
                {
                    "type": "dimension",
                    "dimension": "mediaType",
                    "operator": "matches",
                    "value": "voice"
                }
            ]
        }
    ]
    query_body.paging = {"pageSize": PAGE_SIZE, "pageNumber": 1}
    return query_body


def generate_time_chunks(start_date: datetime, end_date: datetime, chunk_days: int = 13):
    """Yields (start_date, end_date) tuples covering the full range."""
    if chunk_days <= 0:
        raise ValueError("chunk_days must be a positive integer")
    current_start = start_date
    while current_start < end_date:
        current_end = current_start + timedelta(days=chunk_days)
        if current_end > end_date:
            current_end = end_date
        yield (current_start, current_end)
        current_start = current_end


def execute_chunk_query(api_instance: AnalyticsApi, query_body: ConversationQuery, chunk_index: int):
    """Executes a single analytics query chunk with pagination and retry logic."""
    all_results = []
    page_number = 1
    retries_left = MAX_RETRIES
    while True:
        try:
            query_body.paging = {"pageSize": PAGE_SIZE, "pageNumber": page_number}
            logger.info(f"Chunk {chunk_index}: Executing query (page {page_number})...")
            response = api_instance.post_analytics_conversations_details_query(query_body)
            conversations = response.conversations or []
            all_results.extend(conversations)
            logger.info(f"Chunk {chunk_index}: Retrieved {len(conversations)} records.")
            # A short page means there is nothing left to fetch
            if len(conversations) < PAGE_SIZE:
                break
            page_number += 1
            retries_left = MAX_RETRIES
        except ApiException as e:
            status_code = e.status
            if status_code == 429:
                retry_after = int((e.headers or {}).get("Retry-After", 5))
                logger.warning(f"Chunk {chunk_index}: Rate limited. Retrying after {retry_after} seconds.")
                time.sleep(retry_after)
                continue
            elif 500 <= status_code < 600:
                if retries_left > 0:
                    wait_time = 2 ** (MAX_RETRIES - retries_left)
                    logger.warning(f"Chunk {chunk_index}: Server error {status_code}. Retrying in {wait_time} seconds.")
                    time.sleep(wait_time)
                    retries_left -= 1
                    continue
                else:
                    logger.error(f"Chunk {chunk_index}: Max retries exceeded for server error.")
                    raise
            else:
                logger.error(f"Chunk {chunk_index}: API Error {status_code}: {e.body}")
                raise
        time.sleep(0.5)
    return all_results


def fetch_analytics_data(start_date_str: str, end_date_str: str):
    """Fetches analytics data for a given date range by splitting it into chunks."""
    start_date = datetime.fromisoformat(start_date_str.replace("Z", "+00:00"))
    end_date = datetime.fromisoformat(end_date_str.replace("Z", "+00:00"))
    api_client = get_auth_configuration()
    analytics_api = AnalyticsApi(api_client)
    all_conversations = []
    chunk_index = 0
    for chunk_start, chunk_end in generate_time_chunks(start_date, end_date):
        chunk_index += 1
        logger.info(f"Processing Chunk {chunk_index}: {chunk_start.isoformat()} to {chunk_end.isoformat()}")
        query_body = create_base_query_body(chunk_start, chunk_end)
        try:
            chunk_results = execute_chunk_query(analytics_api, query_body, chunk_index)
            all_conversations.extend(chunk_results)
        except Exception as e:
            logger.error(f"Failed to process Chunk {chunk_index}: {e}")
            break
        time.sleep(1)
    logger.info(f"Total records retrieved: {len(all_conversations)}")
    return all_conversations


if __name__ == "__main__":
    # Define the date range (last 90 days)
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=90)
    start_date_str = start_date.isoformat().replace("+00:00", "Z")
    end_date_str = end_date.isoformat().replace("+00:00", "Z")
    try:
        data = fetch_analytics_data(start_date_str, end_date_str)
        # Process the data as needed
        print(f"Successfully retrieved {len(data)} conversations.")
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        sys.exit(1)
Common Errors & Debugging
Error: 413 Entity Too Large
- What causes it: The request body exceeds the server's maximum allowed size. This often happens when the date range is too large or the filter is too complex.
- How to fix it: Reduce the date range per query. The chunking strategy demonstrated in this tutorial is the primary fix. Also make sure the request body carries no filters or fields you do not need.
- Code showing the fix: The generate_time_chunks function splits the 90-day range into segments of at most 13 days, keeping each request small.
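When diagnosing a 413, it can help to measure how large the serialized request body actually is and which part of the query drives that size. The sketch below is illustrative only: request_body_size and build_body are hypothetical helpers, the queue IDs are made up, and the body is a plain dict shaped like a details query rather than an SDK object.

```python
import json


def request_body_size(body: dict) -> int:
    """Returns the size in bytes of the body once serialized as compact JSON."""
    return len(json.dumps(body, separators=(",", ":")).encode("utf-8"))


def build_body(queue_ids):
    """Builds a details-query style body that filters on the given queue IDs."""
    return {
        "interval": "2023-01-01T00:00:00.000Z/2023-01-14T00:00:00.000Z",
        "segmentFilters": [{
            "type": "or",
            "predicates": [
                {"type": "dimension", "dimension": "queueId", "operator": "matches", "value": queue_id}
                for queue_id in queue_ids
            ],
        }],
    }


# Compare a single-predicate filter with a very long predicate list
small = request_body_size(build_body(["queue-1"]))
large = request_body_size(build_body([f"queue-{n}" for n in range(5000)]))
print(small, large)
```

Logging this figure next to each failed request shows whether a particular filter list, rather than the chunk length, is pushing the body over the limit.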
Error: 429 Too Many Requests
- What causes it: You have exceeded the rate limit for the Analytics API.
- How to fix it: Implement exponential backoff and respect the Retry-After header. The execute_chunk_query function includes logic to handle this.
- Code showing the fix:
if status_code == 429:
    retry_after = int(e.headers.get("Retry-After", 5))
    time.sleep(retry_after)
    continue
Error: 401 Unauthorized
- What causes it: The OAuth token is invalid or expired.
- How to fix it: Ensure your client ID and secret are correct and that GENESYS_BASE_URL points at the region where the OAuth client was created. If the credentials are wrong, the token request itself fails. Tokens also expire, so a long-running job that starts returning 401 may simply need a new token.
- Code showing the fix: Check the get_auth_configuration function to ensure client_id and client_secret are loaded correctly from environment variables.
Error: 400 Bad Request
- What causes it: The query body is malformed. This can happen if the date format is incorrect or if the filter syntax is invalid.
- How to fix it: Validate the query body before sending it. Ensure every timestamp is in ISO 8601 format in UTC with a "Z" suffix (for example 2023-01-01T00:00:00.000Z).
- Code showing the fix: The create_base_query_body function is the single place where timestamps are formatted, so inspect its output first.
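One formatting pitfall worth checking for: calling isoformat() on a timezone-aware datetime already includes the +00:00 offset, so appending "Z" to it produces a malformed timestamp. The sketch below contrasts that with a small formatter; to_utc_iso is a hypothetical helper name used for this illustration.

```python
from datetime import datetime, timedelta, timezone


def to_utc_iso(value: datetime) -> str:
    """Formats a timezone-aware datetime as an ISO 8601 UTC string ending in Z."""
    if value.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Convert to UTC, keep millisecond precision, then add the Z suffix
    return value.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"


aware = datetime(2023, 1, 1, tzinfo=timezone.utc)
malformed = aware.isoformat() + "Z"
print(malformed)          # 2023-01-01T00:00:00+00:00Z (offset and Z together)
print(to_utc_iso(aware))  # 2023-01-01T00:00:00.000Z

# A datetime in another offset is converted to UTC first
shifted = datetime(2023, 1, 1, 2, 0, tzinfo=timezone(timedelta(hours=2)))
print(to_utc_iso(shifted))  # 2023-01-01T00:00:00.000Z
```

Rejecting naive datetimes up front avoids silently interpreting them in the machine's local timezone.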