Analytics API returning 413 Entity Too Large — how to split a query that spans 90 days
What You Will Build
- A Python script that successfully retrieves conversation detail analytics for a 90-day period by programmatically splitting the date range into smaller chunks.
- This tutorial uses the Genesys Cloud CX Analytics API (/api/v2/analytics/conversations/details/query) and the official Genesys Cloud Python SDK.
- The code is written in Python 3.9+ and handles pagination, rate limiting, and payload size constraints.
Prerequisites
- OAuth Client Type: Service Account or Confidential Client.
- Required Scopes: analytics:conversation:view and conversation:view.
- SDK Version: genesyscloud Python SDK (version 10.0.0 or higher).
- Runtime Requirements: Python 3.9 or higher.
- External Dependencies:
  - genesyscloud: The official Genesys Cloud SDK.
  - tqdm: For progress bar visualization (optional but recommended for long queries).
  - pandas: For efficient data aggregation (optional).
Install the dependencies using pip:
pip install genesyscloud tqdm pandas
Authentication Setup
The Genesys Cloud Python SDK handles OAuth token management automatically when configured correctly. You must provide your client ID, client secret, and environment (e.g., us-east-1). The SDK caches the access token and refreshes it automatically before expiration.
Do not attempt to manually manage tokens unless you are building a custom auth server. Use the SDK’s login method or the environment-based configuration.
from genesyscloud.auth import oauth_client

# Configure the OAuth client
oauth_client.set_default(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
    environment="us-east-1"  # Change to your deployment region
)
Implementation
Step 1: Define the Query Splitting Logic
The core issue with the 413 error is that the JSON body of the POST request exceeds the server’s maximum payload size. This often happens when querying a large date range with high granularity (e.g., 1-minute intervals) or including many filters.
To solve this, we must split the 90-day range into smaller sub-ranges. A safe chunk size for conversations/details/query is typically 7 to 14 days, depending on the volume of conversations. We will use 7-day chunks to ensure stability.
We need a function that takes a start date and an end date and returns a list of sub-ranges.
from datetime import datetime, timedelta
from typing import List, Tuple

def split_date_range(start_date: datetime, end_date: datetime, chunk_days: int = 7) -> List[Tuple[datetime, datetime]]:
    """
    Splits a large date range into smaller chunks to avoid 413 Entity Too Large errors.

    Args:
        start_date: The beginning of the analytics period.
        end_date: The end of the analytics period.
        chunk_days: The number of days per chunk. Default is 7.

    Returns:
        A list of tuples, where each tuple contains (chunk_start, chunk_end).
    """
    ranges = []
    current_start = start_date
    while current_start < end_date:
        # The final chunk is clamped to end_date, so it may be shorter than chunk_days
        current_end = min(current_start + timedelta(days=chunk_days), end_date)
        ranges.append((current_start, current_end))
        current_start = current_end
    return ranges
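As a quick check of the splitting logic, the sketch below runs the same function over a fixed 90-day window. The dates are arbitrary and chosen only for illustration.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def split_date_range(start_date: datetime, end_date: datetime,
                     chunk_days: int = 7) -> List[Tuple[datetime, datetime]]:
    """Same logic as above: contiguous chunks of at most chunk_days days."""
    ranges = []
    current_start = start_date
    while current_start < end_date:
        current_end = min(current_start + timedelta(days=chunk_days), end_date)
        ranges.append((current_start, current_end))
        current_start = current_end
    return ranges


# Arbitrary example window: 90 days starting 1 January 2024.
start = datetime(2024, 1, 1)
end = start + timedelta(days=90)
chunks = split_date_range(start, end)

print(len(chunks))                            # 13: twelve full weeks plus a 6-day remainder
print(chunks[0])                              # first chunk begins at `start`
print((chunks[-1][1] - chunks[-1][0]).days)   # 6
```

Each chunk ends exactly where the next one begins. Whether a conversation sitting on a boundary is returned by both neighbouring chunks depends on how the API treats interval bounds, which this sketch does not assume either way.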
Step 2: Construct the Analytics Query Body
The conversations/details/query endpoint requires a specific JSON structure. We must define the dateRange, groupBy, and view parameters.
Note: The view parameter determines which fields are returned. default is lightweight. full includes more fields but increases payload size. Use default or summary when possible to reduce memory usage.
We will create a function that generates the query body for a specific date chunk.
def create_query_body(start_date: datetime, end_date: datetime) -> dict:
    """
    Constructs the query body for the analytics API.

    Args:
        start_date: Start of the chunk (assumed to be in UTC).
        end_date: End of the chunk (assumed to be in UTC).

    Returns:
        A dictionary representing the request body.
    """
    # Format dates as ISO 8601; the trailing "Z" marks the values as UTC
    start_iso = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_iso = end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
    query_body = {
        "dateRange": {
            "startDate": start_iso,
            "endDate": end_iso
        },
        "groupBy": ["mediaType"],  # Group by media type (voice, chat, etc.)
        "view": "default",  # Use 'default' to minimize payload size
        "select": [
            "conversationId",
            "mediaType",
            "startTime",
            "endTime",
            "duration",
            "wrapUpCode"
        ],
        "filters": {
            "type": "and",
            "clauses": [
                {
                    "dimension": "mediaType",
                    "operator": "eq",
                    "value": ["voice", "webchat"]  # Only include specific media types if needed
                }
            ]
        }
    }
    return query_body
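Since the 413 is attributed to the size of the request body, it can help to measure a body before sending it. The following is a minimal sketch using only the standard library. The ASSUMED_MAX_BYTES threshold is a made-up value for illustration, not a documented limit, and the SDK may serialize the body slightly differently from json.dumps.

```python
import json


def payload_size_bytes(query_body: dict) -> int:
    """Return the size of the body once serialized as compact UTF-8 JSON."""
    return len(json.dumps(query_body, separators=(",", ":")).encode("utf-8"))


# Hypothetical limit used only for this illustration; not a documented value.
ASSUMED_MAX_BYTES = 10_000

example_body = {
    "dateRange": {
        "startDate": "2024-01-01T00:00:00Z",
        "endDate": "2024-01-08T00:00:00Z",
    },
    "view": "default",
    "select": ["conversationId", "mediaType"],
}

size = payload_size_bytes(example_body)
print(f"Request body is {size} bytes")
if size > ASSUMED_MAX_BYTES:
    print("Body exceeds the assumed limit; consider trimming filters or fields.")
```

Logging this number next to each chunk makes it easier to tell whether a 413 correlates with the body you send or with something else in the request.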
Step 3: Execute the Query with Pagination and Retry Logic
The Genesys Cloud SDK provides a convenient method query_conversation_details which handles pagination automatically via the continuation_token. However, for precise control over error handling and progress tracking, we will implement a manual loop.
We will use the genesyscloud.analytics.analytics_api client. We must handle:
- 429 Too Many Requests: Implement exponential backoff.
- 413 Entity Too Large: This should be avoided by splitting, but if it occurs, we will catch it and log the error.
- Pagination: Use the continuation_token returned in the response to fetch the next page.
import time
import logging
from genesyscloud.analytics import analytics_api
from genesyscloud.rest import exceptions

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def fetch_analytics_chunk(analytics_client: analytics_api, query_body: dict, chunk_index: int) -> list:
    """
    Fetches analytics data for a single date chunk, handling pagination and retries.

    Args:
        analytics_client: The initialized Genesys Cloud Analytics API client.
        query_body: The query payload for this chunk.
        chunk_index: Index of the chunk for logging purposes.

    Returns:
        A list of conversation detail records.
    """
    all_records = []
    continuation_token = None
    max_retries = 3
    retries = 0  # Consecutive 429 responses for the current page
    body = dict(query_body)  # Shallow copy so the caller's dict is not mutated
    while True:
        try:
            # Add continuation token if present
            if continuation_token:
                body["continuationToken"] = continuation_token
            # Execute the query
            response = analytics_client.post_analytics_conversations_details_query(
                body=body,
                async_req=False
            )
            retries = 0  # Reset the retry counter after a successful call
            # Check if response has data
            if response.entity and response.entity.conversations:
                all_records.extend(response.entity.conversations)
                logger.info(f"Chunk {chunk_index}: Fetched {len(response.entity.conversations)} records. Total so far: {len(all_records)}")
            # Check for more pages
            if response.entity and response.entity.nextPageToken:
                continuation_token = response.entity.nextPageToken
            else:
                break
        except exceptions.ApiException as e:
            status_code = e.status
            if status_code == 429 and retries < max_retries:
                # Rate limit hit - wait and retry, doubling the wait each time
                retries += 1
                wait_time = 2 ** retries  # Exponential backoff: 2, 4, 8 seconds
                logger.warning(f"Chunk {chunk_index}: Rate limit hit (429). Waiting {wait_time} seconds (retry {retries}/{max_retries}).")
                time.sleep(wait_time)
                continue
            elif status_code == 413:
                # Payload too large - this should not happen if chunks are small enough
                logger.error(f"Chunk {chunk_index}: Payload too large (413). Reduce chunk size or select fewer fields.")
                raise
            else:
                # Other errors, or 429 after the retry budget is spent - re-raise
                logger.error(f"Chunk {chunk_index}: API Error {status_code}: {e.body}")
                raise
        except Exception as e:
            logger.error(f"Chunk {chunk_index}: Unexpected error: {str(e)}")
            raise
    return all_records
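The retry pattern can also be separated from the API call. Below is a minimal, SDK-independent sketch of exponential backoff with a bounded number of retries. RateLimited and call_with_backoff are hypothetical names introduced for this illustration (RateLimited stands in for a 429 error); the small random jitter is an addition that is not part of the code above.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical stand-in for an HTTP 429 error raised by an API client."""


def call_with_backoff(func: Callable[[], T], max_retries: int = 3,
                      base_delay: float = 2.0,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func, retrying on RateLimited with exponentially growing delays."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except RateLimited:
            if attempt == max_retries:
                raise  # retry budget exhausted; surface the error
            # Delay doubles per attempt: base, 2*base, 4*base, ... plus jitter.
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            sleep(delay)
    raise RuntimeError("max_retries must be >= 0")
```

Passing the sleep function as a parameter keeps the helper testable without real waiting. The delay grows with the attempt number, so repeated 429 responses back off progressively instead of waiting a fixed interval.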
Step 4: Orchestrate the Full 90-Day Query
Now we combine the splitting logic, query construction, and execution into a main function. This function will iterate through each 7-day chunk, fetch the data, and aggregate the results.
from tqdm import tqdm
import pandas as pd

def fetch_90_day_analytics(start_date: datetime, end_date: datetime) -> pd.DataFrame:
    """
    Fetches conversation analytics for a 90-day period by splitting into chunks.

    Args:
        start_date: The start of the 90-day period.
        end_date: The end of the 90-day period.

    Returns:
        A pandas DataFrame containing all conversation records.
    """
    # Initialize the Analytics API client
    analytics_client = analytics_api.AnalyticsApi()
    # Split the date range
    chunks = split_date_range(start_date, end_date, chunk_days=7)
    logger.info(f"Splitting {(end_date - start_date).days} days into {len(chunks)} chunks.")
    all_data = []
    # Iterate through each chunk with a progress bar
    for i, (chunk_start, chunk_end) in enumerate(tqdm(chunks, desc="Processing Chunks")):
        query_body = create_query_body(chunk_start, chunk_end)
        try:
            records = fetch_analytics_chunk(analytics_client, query_body, chunk_index=i)
            all_data.extend(records)
        except Exception as e:
            # A skipped chunk leaves a gap in the results, so log the cause
            logger.error(f"Failed to fetch chunk {i} ({chunk_start} to {chunk_end}): {e}. Skipping.")
            continue
    # Convert to DataFrame for easier analysis
    if all_data:
        df = pd.json_normalize(all_data)
        logger.info(f"Successfully fetched {len(df)} records.")
        return df
    else:
        logger.warning("No records found.")
        return pd.DataFrame()
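Because consecutive chunks share a boundary timestamp, the aggregated list may contain the same conversation twice if the API returns boundary conversations for both chunks (not confirmed here). A minimal de-duplication sketch follows. It assumes each record is a plain dict with a "conversationId" key, so SDK model objects would need converting to dicts first. dedupe_by_conversation_id is a hypothetical helper name.

```python
from typing import Iterable, List


def dedupe_by_conversation_id(records: Iterable[dict]) -> List[dict]:
    """Keep the first record seen per conversationId, preserving input order.

    Records without a conversationId are kept as-is.
    """
    seen = set()
    unique = []
    for record in records:
        key = record.get("conversationId")
        if key is not None:
            if key in seen:
                continue  # already collected from an earlier chunk
            seen.add(key)
        unique.append(record)
    return unique


# Illustrative data only: "b" appears in two neighbouring chunks.
chunk_one = [{"conversationId": "a"}, {"conversationId": "b"}]
chunk_two = [{"conversationId": "b"}, {"conversationId": "c"}]
merged = dedupe_by_conversation_id(chunk_one + chunk_two)
print([r["conversationId"] for r in merged])  # ['a', 'b', 'c']
```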
Complete Working Example
Below is the complete, runnable script. Save this as fetch_analytics.py. Ensure you have set your GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables or update the configuration section directly.
import os
import time
import logging
from datetime import datetime, timedelta
from typing import List, Tuple

import pandas as pd
from tqdm import tqdm

# Genesys Cloud SDK Imports
from genesyscloud.auth import oauth_client
from genesyscloud.analytics import analytics_api
from genesyscloud.rest import exceptions

# Configure Logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def split_date_range(start_date: datetime, end_date: datetime, chunk_days: int = 7) -> List[Tuple[datetime, datetime]]:
    """Splits a large date range into smaller chunks."""
    ranges = []
    current_start = start_date
    while current_start < end_date:
        current_end = min(current_start + timedelta(days=chunk_days), end_date)
        ranges.append((current_start, current_end))
        current_start = current_end
    return ranges
def create_query_body(start_date: datetime, end_date: datetime) -> dict:
    """Constructs the query body for the analytics API."""
    start_iso = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_iso = end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
    query_body = {
        "dateRange": {
            "startDate": start_iso,
            "endDate": end_iso
        },
        "groupBy": ["mediaType"],
        "view": "default",
        "select": [
            "conversationId",
            "mediaType",
            "startTime",
            "endTime",
            "duration",
            "wrapUpCode"
        ],
        "filters": {
            "type": "and",
            "clauses": []  # Add specific filters here if needed
        }
    }
    return query_body
def fetch_analytics_chunk(analytics_client: analytics_api, query_body: dict, chunk_index: int) -> list:
    """Fetches analytics data for a single date chunk, handling pagination and retries."""
    all_records = []
    continuation_token = None
    max_retries = 3
    retries = 0  # Consecutive 429 responses for the current page
    body = dict(query_body)  # Shallow copy so the caller's dict is not mutated
    while True:
        try:
            if continuation_token:
                body["continuationToken"] = continuation_token
            response = analytics_client.post_analytics_conversations_details_query(
                body=body,
                async_req=False
            )
            retries = 0  # Reset the retry counter after a successful call
            if response.entity and response.entity.conversations:
                all_records.extend(response.entity.conversations)
                logger.info(f"Chunk {chunk_index}: Fetched {len(response.entity.conversations)} records.")
            if response.entity and response.entity.nextPageToken:
                continuation_token = response.entity.nextPageToken
            else:
                break
        except exceptions.ApiException as e:
            status_code = e.status
            if status_code == 429 and retries < max_retries:
                retries += 1
                wait_time = 2 ** retries  # Exponential backoff: 2, 4, 8 seconds
                logger.warning(f"Chunk {chunk_index}: Rate limit hit (429). Waiting {wait_time} seconds.")
                time.sleep(wait_time)
                continue
            elif status_code == 413:
                logger.error(f"Chunk {chunk_index}: Payload too large (413). Reduce chunk size.")
                raise
            else:
                # Other errors, or 429 after the retry budget is spent
                logger.error(f"Chunk {chunk_index}: API Error {status_code}: {e.body}")
                raise
        except Exception as e:
            logger.error(f"Chunk {chunk_index}: Unexpected error: {str(e)}")
            raise
    return all_records
def main():
    # 1. Authentication
    # Use environment variables for security
    client_id = os.getenv("GENESYS_CLIENT_ID", "YOUR_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET", "YOUR_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "us-east-1")
    oauth_client.set_default(
        client_id=client_id,
        client_secret=client_secret,
        environment=environment
    )

    # 2. Define Date Range (Last 90 Days)
    end_date = datetime.utcnow()
    start_date = end_date - timedelta(days=90)
    logger.info(f"Fetching analytics from {start_date} to {end_date}")

    # 3. Initialize API Client
    analytics_client = analytics_api.AnalyticsApi()

    # 4. Split Date Range
    chunks = split_date_range(start_date, end_date, chunk_days=7)
    logger.info(f"Splitting date range into {len(chunks)} chunks of up to 7 days.")
    all_data = []

    # 5. Fetch Data Chunk by Chunk
    for i, (chunk_start, chunk_end) in enumerate(tqdm(chunks, desc="Processing Chunks")):
        query_body = create_query_body(chunk_start, chunk_end)
        try:
            records = fetch_analytics_chunk(analytics_client, query_body, chunk_index=i)
            all_data.extend(records)
        except Exception as e:
            # A skipped chunk leaves a gap in the output, so log the cause
            logger.error(f"Failed to fetch chunk {i} ({chunk_start} to {chunk_end}): {e}. Skipping.")
            continue

    # 6. Process Results
    if all_data:
        df = pd.json_normalize(all_data)
        logger.info(f"Successfully fetched {len(df)} records.")
        # Example: Save to CSV
        output_filename = f"analytics_{start_date.strftime('%Y%m%d')}_to_{end_date.strftime('%Y%m%d')}.csv"
        df.to_csv(output_filename, index=False)
        logger.info(f"Data saved to {output_filename}")
    else:
        logger.warning("No records found.")

if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 413 Entity Too Large
- Cause: The JSON body sent to the API exceeds the server’s maximum payload limit. This is common when querying large date ranges with high granularity or many selected fields.
- Fix: Reduce the chunk_days parameter in split_date_range. Try reducing it from 7 to 3 or 2 days. Also, review the select list in create_query_body and remove unnecessary fields. Use view: "default" instead of view: "full".
- Code Fix: Modify the call: split_date_range(start_date, end_date, chunk_days=3).
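Instead of picking a smaller chunk size by hand, the range can be halved automatically whenever a chunk fails with a 413. The sketch below illustrates the idea without the SDK. PayloadTooLarge is a hypothetical stand-in for the 413 error, and fetch is any caller-supplied function that retrieves records for a start and end time. Both names are assumptions made for this example.

```python
from datetime import datetime, timedelta
from typing import Callable, List


class PayloadTooLarge(Exception):
    """Hypothetical stand-in for an HTTP 413 error raised by an API client."""


def fetch_adaptive(start: datetime, end: datetime,
                   fetch: Callable[[datetime, datetime], List[dict]],
                   min_span: timedelta = timedelta(hours=1)) -> List[dict]:
    """Fetch a range; on PayloadTooLarge, halve it and fetch each half."""
    try:
        return fetch(start, end)
    except PayloadTooLarge:
        span = end - start
        if span <= min_span:
            raise  # cannot split further; surface the error to the caller
        midpoint = start + span / 2
        return (fetch_adaptive(start, midpoint, fetch, min_span)
                + fetch_adaptive(midpoint, end, fetch, min_span))
```

The min_span floor stops the recursion from splitting indefinitely when the error is not actually related to the size of the range.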
Error: 429 Too Many Requests
- Cause: You have exceeded the API rate limit. The Genesys Cloud Analytics API has strict rate limits, especially for detailed queries.
- Fix: Implement exponential backoff. The provided code includes a basic retry mechanism. If you continue to hit 429s, increase the wait_time in the retry logic or add a delay between chunks.
- Code Fix: Increase wait_time in fetch_analytics_chunk or add time.sleep(1) between chunks in the main loop.
Error: 401 Unauthorized
- Cause: The OAuth token is invalid or expired.
- Fix: Ensure your client_id and client_secret are correct. Verify that the OAuth client has the analytics:conversation:view scope. The SDK handles token refresh, but if you are using a custom auth flow, ensure the token is not expired.
- Code Fix: Check environment variables and scopes in the Genesys Cloud Admin Portal.
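A quick way to rule out missing credentials is to check the environment before any API call is made. This is a small sketch using the variable names from the complete example. missing_env_vars is a hypothetical helper, not part of the SDK.

```python
import os
from typing import Iterable, List, Mapping


def missing_env_vars(names: Iterable[str],
                     env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names that are unset or empty in env."""
    return [name for name in names if not env.get(name)]


REQUIRED = ["GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET"]
missing = missing_env_vars(REQUIRED)
if missing:
    print(f"Missing credentials: {', '.join(missing)}")
```

Failing early like this avoids sending the placeholder defaults ("YOUR_CLIENT_ID") to the token endpoint, which would otherwise surface as a less obvious 401.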
Error: 403 Forbidden
- Cause: The OAuth client lacks the necessary permissions.
- Fix: Verify that the service account has the analytics:conversation:view and conversation:view permissions, granted through a role assigned in the Genesys Cloud Admin Portal.
- Code Fix: Assign the correct roles to the user associated with the OAuth client.