How to Split a 90-Day Analytics Query to Avoid 413 Entity Too Large Errors
What You Will Build
- You will build a Python script that queries Genesys Cloud CX analytics for conversation metrics over a 90-day period without triggering HTTP 413 errors.
- You will use the Genesys Cloud CX Python SDK (the genesyscloud package) and the underlying requests library for low-level control when necessary.
- You will implement a chunking strategy that splits large date ranges into smaller, compliant intervals.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Grant).
- Required Scopes: analytics:conversation:read or analytics:detail:read, depending on the specific endpoint used. This tutorial uses analytics:conversation:read, which allows access to aggregated conversation metrics.
- SDK Version: genesyscloud Python SDK v2.0.0 or later.
- Language/Runtime: Python 3.9+ (the type hints in the code use built-in generics such as list[tuple[str, str]], which Python 3.8 rejects at function definition time).
- Dependencies: pip install genesyscloud python-dateutil requests
- Environment Variables: You must have GENESYS_ORGANIZATION_ID, GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, and GENESYS_REGION defined in your environment.
Authentication Setup
Genesys Cloud uses OAuth 2.0 for API authentication. The SDK handles token acquisition and refresh automatically, but you must initialize the client correctly.
import os
import logging

from genesyscloud.rest import Configuration
from genesyscloud.analytics.api import analytics_api

# Configure logging to see SDK debug info if needed
logging.basicConfig(level=logging.INFO)


def get_analytics_client() -> analytics_api.AnalyticsApi:
    """
    Initializes and returns an authenticated Analytics API client.
    """
    organization_id = os.getenv("GENESYS_ORGANIZATION_ID")
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    region = os.getenv("GENESYS_REGION", "us-east-1")

    if not all([organization_id, client_id, client_secret]):
        raise ValueError("Missing required environment variables for Genesys Cloud authentication.")

    # The SDK automatically handles OAuth token refresh
    configuration = Configuration(
        host=f"https://{region}.mypurecloud.com",
        organization_id=organization_id,
        client_id=client_id,
        client_secret=client_secret
    )
    return analytics_api.AnalyticsApi(configuration)
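For the low-level control mentioned earlier, the client-credentials exchange that the SDK performs can be sketched with plain HTTP. The following is a minimal sketch, not the SDK's implementation. It assumes the login host follows the pattern https://login.<domain>/oauth/token, and the helper name build_token_request and its domain parameter are hypothetical. It only builds the request, so it can be inspected without network access.

```python
import base64


def build_token_request(client_id: str, client_secret: str,
                        domain: str = "mypurecloud.com") -> dict:
    """Return the pieces of a client-credentials token request.

    Assumption: the login host follows the pattern login.<domain>.
    """
    # HTTP Basic credentials are base64("client_id:client_secret").
    raw = f"{client_id}:{client_secret}".encode("utf-8")
    basic = base64.b64encode(raw).decode("ascii")
    return {
        "url": f"https://login.{domain}/oauth/token",
        "headers": {
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        "data": {"grant_type": "client_credentials"},
    }


request_parts = build_token_request("my-client-id", "my-client-secret")
print(request_parts["url"])
```

Passing the result to requests.post(**request_parts, timeout=10) would send it. The JSON response is expected to contain an access_token, which then goes into an Authorization: Bearer header on API calls.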
Implementation
Step 1: Understanding the 413 Limit and Date Chunking
The Genesys Cloud Analytics API limits the size of the request it will process. When a query covers a long period (e.g., 90 days) at fine granularity (e.g., hourly or daily buckets), the request can exceed the server's maximum size, and the server responds with a 413 Entity Too Large error.
The solution is to split the 90-day range into smaller chunks (e.g., 14 days or 30 days) and query each chunk separately. Then, aggregate the results in your application.
We will define a helper function to split a date range into chunks of a specified number of days.
from datetime import timedelta

from dateutil import parser as date_parser


def split_date_range(start_date: str, end_date: str, chunk_days: int = 14) -> list[tuple[str, str]]:
    """
    Splits a date range into smaller chunks.

    Args:
        start_date: ISO 8601 start date string.
        end_date: ISO 8601 end date string.
        chunk_days: Number of days per chunk.

    Returns:
        A list of tuples, each containing (start_date_str, end_date_str) for a chunk.
    """
    # A zero or negative chunk size would never advance and loop forever.
    if chunk_days <= 0:
        raise ValueError("chunk_days must be a positive integer.")

    start = date_parser.isoparse(start_date)
    end = date_parser.isoparse(end_date)

    chunks = []
    current_start = start
    while current_start < end:
        # The last chunk is clipped so it never runs past the overall end date.
        current_end = min(current_start + timedelta(days=chunk_days), end)
        chunks.append((current_start.isoformat(), current_end.isoformat()))
        current_start = current_end
    return chunks
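To see what the chunking produces, here is a minimal, standard-library-only sketch of the same loop applied to a fixed 90-day window. It uses datetime.fromisoformat in place of dateutil so that it runs without extra dependencies; the name split_range_stdlib is hypothetical.

```python
from datetime import datetime, timedelta


def split_range_stdlib(start_date: str, end_date: str, chunk_days: int = 14):
    """Same chunking loop as split_date_range, using only the standard library."""
    if chunk_days <= 0:
        raise ValueError("chunk_days must be a positive integer")
    start = datetime.fromisoformat(start_date)
    end = datetime.fromisoformat(end_date)
    chunks = []
    current_start = start
    while current_start < end:
        current_end = min(current_start + timedelta(days=chunk_days), end)
        chunks.append((current_start.isoformat(), current_end.isoformat()))
        current_start = current_end
    return chunks


chunks = split_range_stdlib("2024-01-01T00:00:00+00:00", "2024-03-31T00:00:00+00:00")
for chunk_start, chunk_end in chunks:
    print(chunk_start, "->", chunk_end)

# Each chunk begins exactly where the previous one ended, so the chunks
# cover the range with no gaps and no overlap.
assert all(a[1] == b[0] for a, b in zip(chunks, chunks[1:]))
```

This yields seven chunks: six of 14 days and a final one of 6 days. Because adjacent chunks share a boundary timestamp, a conversation that starts exactly on a boundary is counted once only if the endpoint treats the end of the range as exclusive, which is worth confirming against the API documentation.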
Step 2: Constructing the Analytics Query Payload
The POST /api/v2/analytics/conversations/details/query endpoint requires a specific JSON structure. Key fields include dateRange, interval, groupBy, and metricFilters.
Critical Note: The interval field determines the granularity. A smaller interval (e.g., 1h) produces more data points and a larger response payload, which increases the risk of hitting response-side limits as well. The 413 error itself is typically raised on the request side, triggered by complex filtering or large date ranges in certain query types. This tutorial uses 1d (daily) intervals to keep the payload manageable while demonstrating the chunking logic.
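The effect of granularity is easy to quantify. The sketch below is a rough estimate under a simplifying assumption, namely one result row per time bucket per group. The function name estimate_rows and the figure of four channels are hypothetical.

```python
from datetime import timedelta


def estimate_rows(range_days: int, bucket: timedelta, groups: int = 1) -> int:
    """Estimate result rows as (number of time buckets) x (number of groups)."""
    buckets = timedelta(days=range_days) // bucket  # whole buckets in the range
    return buckets * groups


# 90 days, grouped by an assumed 4 channels:
print(estimate_rows(90, timedelta(hours=1), groups=4))  # hourly buckets
print(estimate_rows(90, timedelta(days=1), groups=4))   # daily buckets
# One 14-day chunk at daily granularity:
print(estimate_rows(14, timedelta(days=1), groups=4))
```

Under these assumptions, hourly buckets over 90 days give 8,640 rows, daily buckets give 360, and a single 14-day daily chunk gives 56.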
from genesyscloud.analytics.model import ConversationDetailsQuery


def build_query_payload(start_date: str, end_date: str) -> ConversationDetailsQuery:
    """
    Builds the ConversationDetailsQuery object for a specific date chunk.

    Args:
        start_date: Start date of the chunk.
        end_date: End date of the chunk.

    Returns:
        A ConversationDetailsQuery object.
    """
    # Define the query parameters
    query = ConversationDetailsQuery(
        date_range={
            "startDate": start_date,
            "endDate": end_date
        },
        interval="1d",  # Daily intervals
        group_by=["channel"],  # Group by channel (voice, chat, etc.)
        metric_filters=[
            {
                "metric": "conversationCount",
                "aggregation": "sum"
            }
        ],
        select=["conversationCount"]
    )
    return query
Step 3: Executing Chunked Queries and Aggregating Results
We will iterate through the chunks generated in Step 1, execute the API call for each chunk, and aggregate the results. We will also implement error handling for common HTTP errors.
import time

from genesyscloud.analytics.model import ConversationDetailsResponse


def query_analytics_in_chunks(client: analytics_api.AnalyticsApi, start_date: str, end_date: str, chunk_days: int = 14) -> dict:
    """
    Queries analytics data in chunks to avoid 413 errors.

    Args:
        client: Authenticated Analytics API client.
        start_date: Overall start date.
        end_date: Overall end date.
        chunk_days: Days per chunk.

    Returns:
        A dictionary aggregating the results from all chunks.
    """
    chunks = split_date_range(start_date, end_date, chunk_days)
    aggregated_data = {}

    for i, (chunk_start, chunk_end) in enumerate(chunks):
        print(f"Processing chunk {i+1}/{len(chunks)}: {chunk_start} to {chunk_end}")
        try:
            # Build the query for this chunk
            query_payload = build_query_payload(chunk_start, chunk_end)

            # Execute the API call
            # Note: The SDK method is post_analytics_conversations_details_query
            response: ConversationDetailsResponse = client.post_analytics_conversations_details_query(
                body=query_payload
            )

            # Process the response
            if response.entities:
                for entity in response.entities:
                    # Key: Channel name
                    # Value: Total conversation count
                    channel = entity.get("channel", "Unknown")
                    count = entity.get("conversationCount", 0)
                    aggregated_data[channel] = aggregated_data.get(channel, 0) + count

            # Optional: Add a small delay to respect rate limits
            time.sleep(0.5)
        except Exception as e:
            print(f"Error processing chunk {chunk_start} to {chunk_end}: {e}")
            # A skipped chunk means the totals undercount; in a production
            # system, retry the chunk or record it for later reprocessing.
            continue

    return aggregated_data
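Because the loop skips a failed chunk and continues, the returned totals can silently undercount. One way to make that visible, sketched here with plain dictionaries standing in for API responses, is to merge per-chunk counts with collections.Counter and record which chunks failed. The function merge_chunk_results and the shape of its input are assumptions made for illustration, not part of the SDK.

```python
from collections import Counter
from typing import Dict, List, Optional, Tuple

# (chunk_start, chunk_end) paired with per-channel counts, or None on failure.
ChunkResult = Tuple[Tuple[str, str], Optional[Dict[str, int]]]


def merge_chunk_results(results: List[ChunkResult]):
    """Sum per-channel counts and collect the chunks that returned nothing."""
    totals: Counter = Counter()
    failed = []
    for chunk, counts in results:
        if counts is None:
            failed.append(chunk)  # keep the range so it can be retried later
            continue
        totals.update(counts)  # Counter.update adds counts instead of replacing
    return dict(totals), failed


sample = [
    (("2024-01-01", "2024-01-15"), {"voice": 120, "chat": 30}),
    (("2024-01-15", "2024-01-29"), None),  # simulated failed chunk
    (("2024-01-29", "2024-02-12"), {"voice": 80, "email": 5}),
]
totals, failed = merge_chunk_results(sample)
print(totals)
print(failed)
```

Returning the failed ranges alongside the totals lets the caller decide whether to retry them or to report the result as partial.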
Complete Working Example
The following script combines all the previous steps into a single, runnable module. It authenticates, splits the date range, queries the API, and prints the aggregated results.
import os
import time
import logging
from datetime import datetime, timedelta, timezone

from dateutil import parser as date_parser
from genesyscloud.rest import Configuration
from genesyscloud.analytics.api import analytics_api
from genesyscloud.analytics.model import ConversationDetailsQuery, ConversationDetailsResponse

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def get_analytics_client() -> analytics_api.AnalyticsApi:
    """
    Initializes and returns an authenticated Analytics API client.
    """
    organization_id = os.getenv("GENESYS_ORGANIZATION_ID")
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    region = os.getenv("GENESYS_REGION", "us-east-1")

    if not all([organization_id, client_id, client_secret]):
        raise ValueError("Missing required environment variables for Genesys Cloud authentication.")

    configuration = Configuration(
        host=f"https://{region}.mypurecloud.com",
        organization_id=organization_id,
        client_id=client_id,
        client_secret=client_secret
    )
    return analytics_api.AnalyticsApi(configuration)


def split_date_range(start_date: str, end_date: str, chunk_days: int = 14) -> list[tuple[str, str]]:
    """
    Splits a date range into smaller chunks.
    """
    # A zero or negative chunk size would never advance and loop forever.
    if chunk_days <= 0:
        raise ValueError("chunk_days must be a positive integer.")

    start = date_parser.isoparse(start_date)
    end = date_parser.isoparse(end_date)

    chunks = []
    current_start = start
    while current_start < end:
        current_end = min(current_start + timedelta(days=chunk_days), end)
        chunks.append((current_start.isoformat(), current_end.isoformat()))
        current_start = current_end
    return chunks


def build_query_payload(start_date: str, end_date: str) -> ConversationDetailsQuery:
    """
    Builds the ConversationDetailsQuery object for a specific date chunk.
    """
    query = ConversationDetailsQuery(
        date_range={
            "startDate": start_date,
            "endDate": end_date
        },
        interval="1d",
        group_by=["channel"],
        metric_filters=[
            {
                "metric": "conversationCount",
                "aggregation": "sum"
            }
        ],
        select=["conversationCount"]
    )
    return query


def query_analytics_in_chunks(client: analytics_api.AnalyticsApi, start_date: str, end_date: str, chunk_days: int = 14) -> dict:
    """
    Queries analytics data in chunks to avoid 413 errors.
    """
    chunks = split_date_range(start_date, end_date, chunk_days)
    aggregated_data = {}

    for i, (chunk_start, chunk_end) in enumerate(chunks):
        logger.info(f"Processing chunk {i+1}/{len(chunks)}: {chunk_start} to {chunk_end}")
        try:
            query_payload = build_query_payload(chunk_start, chunk_end)
            response: ConversationDetailsResponse = client.post_analytics_conversations_details_query(
                body=query_payload
            )

            if response.entities:
                for entity in response.entities:
                    channel = entity.get("channel", "Unknown")
                    count = entity.get("conversationCount", 0)
                    aggregated_data[channel] = aggregated_data.get(channel, 0) + count

            # Respect rate limits
            time.sleep(0.5)
        except Exception as e:
            # A skipped chunk means the totals undercount for that period.
            logger.error(f"Error processing chunk {chunk_start} to {chunk_end}: {e}")
            continue

    return aggregated_data


if __name__ == "__main__":
    try:
        # Define the date range (90 days ago to now), as timezone-aware UTC
        end_date = datetime.now(timezone.utc)
        start_date = end_date - timedelta(days=90)
        start_date_str = start_date.isoformat()
        end_date_str = end_date.isoformat()
        logger.info(f"Starting analytics query from {start_date_str} to {end_date_str}")

        # Initialize client
        client = get_analytics_client()

        # Execute chunked query
        results = query_analytics_in_chunks(client, start_date_str, end_date_str, chunk_days=14)

        # Print results
        logger.info("Aggregated Results:")
        for channel, count in results.items():
            logger.info(f"  {channel}: {count}")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
Common Errors & Debugging
Error: HTTP 413 Entity Too Large
What causes it:
The server refuses the request because the payload size exceeds the configured limit. This often happens when querying large date ranges with complex filters or high-resolution intervals (e.g., 1m or 1h over 90 days).
How to fix it:
Reduce the date range per request. The chunking strategy implemented above solves this by splitting the 90-day range into 14-day chunks. If you still encounter 413 errors, reduce the chunk_days value further (e.g., to 7 days).
Code showing the fix:
# Reduce chunk size from 14 days to 7 days
results = query_analytics_in_chunks(client, start_date_str, end_date_str, chunk_days=7)
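Rather than picking a smaller chunk_days by hand, a chunk can be halved automatically whenever the server answers 413. The following is a minimal sketch of that idea, not the SDK's behavior: PayloadTooLarge is a hypothetical exception standing in for however your client surfaces a 413, and fake_query simulates a server that rejects ranges longer than 10 days.

```python
from datetime import datetime, timedelta


class PayloadTooLarge(Exception):
    """Hypothetical stand-in for an HTTP 413 raised by the API client."""


def query_with_halving(query_fn, start: datetime, end: datetime,
                       min_span: timedelta = timedelta(hours=1)) -> list:
    """Run query_fn(start, end); on a 413, split the range in half and recurse."""
    try:
        return [query_fn(start, end)]
    except PayloadTooLarge:
        span = end - start
        if span <= min_span:
            raise  # cannot shrink further; surface the error
        midpoint = start + span / 2
        return (query_with_halving(query_fn, start, midpoint, min_span)
                + query_with_halving(query_fn, midpoint, end, min_span))


def fake_query(start: datetime, end: datetime) -> int:
    """Simulated endpoint: rejects any range longer than 10 days."""
    if end - start > timedelta(days=10):
        raise PayloadTooLarge()
    return (end - start).days  # pretend result: number of days covered


results = query_with_halving(fake_query, datetime(2024, 1, 1), datetime(2024, 1, 29))
print(results)
```

In this simulation the 28-day range is split into 14-day halves, which are still rejected, and then into four 7-day quarters that succeed. The min_span guard stops the recursion if even a very small range is refused, which would point to a problem other than the date range.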
Error: HTTP 429 Too Many Requests
What causes it:
You are sending requests too frequently. Genesys Cloud APIs have rate limits. The Analytics API is particularly sensitive to bursty traffic.
How to fix it:
Implement exponential backoff or a fixed delay between requests. The example above includes a time.sleep(0.5) call. For production systems, consider using a retry library like tenacity.
Code showing the fix:
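import tenacity


# Note: this retries on any exception. In production, narrow
# retry_if_exception_type to the client's rate-limit error so that
# non-retryable failures (such as a 400) are not retried.
@tenacity.retry(
    stop=tenacity.stop_after_attempt(5),
    wait=tenacity.wait_exponential(multiplier=1, min=2, max=10),
    retry=tenacity.retry_if_exception_type(Exception)
)
def safe_api_call(client, query_payload):
    return client.post_analytics_conversations_details_query(body=query_payload)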
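If you prefer not to add a dependency, similar retry behavior can be approximated with the standard library. This is a minimal sketch under stated assumptions: RateLimited is a hypothetical exception carrying an optional retry_after value (in seconds) that your client would populate from the response's Retry-After header, and call_with_backoff is not an SDK function.

```python
import time
from typing import Callable, Optional


class RateLimited(Exception):
    """Hypothetical stand-in for an HTTP 429 raised by the API client."""

    def __init__(self, retry_after: Optional[float] = None):
        super().__init__("rate limited")
        self.retry_after = retry_after


def call_with_backoff(fn: Callable, max_attempts: int = 5, base: float = 2.0,
                      cap: float = 10.0, sleep: Callable = time.sleep):
    """Call fn(), retrying only on RateLimited with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimited as err:
            if attempt == max_attempts - 1:
                raise  # out of attempts
            # Prefer the server's hint; otherwise wait base * 2^attempt, capped.
            if err.retry_after is not None:
                delay = err.retry_after
            else:
                delay = min(cap, base * 2 ** attempt)
            sleep(delay)


# Demonstration with a function that fails twice, then succeeds.
calls = {"n": 0}
waits = []


def flaky():
    calls["n"] += 1
    if calls["n"] <= 2:
        raise RateLimited()
    return "ok"


outcome = call_with_backoff(flaky, sleep=waits.append)  # record waits instead of sleeping
print(outcome, waits)
```

Retrying only on the rate-limit error avoids re-sending requests that fail for reasons a retry cannot fix, such as a malformed payload. The injectable sleep parameter exists only so the behavior can be demonstrated and tested without real delays.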
Error: HTTP 400 Bad Request
What causes it:
The query payload is malformed. Common issues include invalid date formats, unsupported metrics, or invalid group-by fields.
How to fix it:
Validate the ConversationDetailsQuery object before sending. Ensure dates are in ISO 8601 format and that metrics and group-by fields are supported for the endpoint.
Code showing the fix:
from dateutil import parser as date_parser

# Validate date format before building the query
try:
    date_parser.isoparse(start_date)
    date_parser.isoparse(end_date)
except ValueError as err:
    raise ValueError("Invalid date format. Use ISO 8601.") from err
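Date parsing alone does not catch a reversed range or a timestamp without a timezone. A slightly broader pre-flight check might look like the sketch below. The function validate_chunk is hypothetical, and the requirement that timestamps carry a UTC offset is an assumption to confirm against the endpoint's documentation.

```python
from datetime import datetime


def validate_chunk(start_date: str, end_date: str) -> None:
    """Raise ValueError if the chunk boundaries are unusable."""
    try:
        start = datetime.fromisoformat(start_date)
        end = datetime.fromisoformat(end_date)
    except ValueError as err:
        raise ValueError("Invalid date format. Use ISO 8601.") from err
    # Assumption: the API expects timestamps with an explicit UTC offset.
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("Dates must include a timezone offset, e.g. +00:00.")
    if start >= end:
        raise ValueError("start_date must be earlier than end_date.")


validate_chunk("2024-01-01T00:00:00+00:00", "2024-01-15T00:00:00+00:00")  # passes

for bad in [("2024-01-15T00:00:00+00:00", "2024-01-01T00:00:00+00:00"),
            ("2024-01-01T00:00:00", "2024-01-15T00:00:00"),
            ("01/01/2024", "2024-01-15T00:00:00+00:00")]:
    try:
        validate_chunk(*bad)
    except ValueError as err:
        print(err)
```

Running a check like this on every chunk before calling the API turns a server-side 400 into a local error with a specific message.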