Resolve 413 Entity Too Large Errors in Genesys Cloud Analytics by Splitting Date Ranges
What You Will Build
- You will build a Python script that retrieves detailed conversation analytics data for a 90-day period.
- You will use the Genesys Cloud Platform Client SDK (genesys-cloud-sdk) to handle authentication and API calls.
- You will implement a date-range splitting strategy to bypass the 413 Entity Too Large error caused by oversized query payloads.
Prerequisites
- OAuth Client: A Genesys Cloud OAuth client with the analytics:conversation:read scope.
- SDK Version: genesys-cloud-sdk version 135.0.0 or higher.
- Runtime: Python 3.8+.
- Dependencies: genesys-cloud-sdk, python-dateutil.
Install the dependencies via pip:
pip install genesys-cloud-sdk python-dateutil
Authentication Setup
Genesys Cloud uses OAuth 2.0 for authentication. The SDK handles the token acquisition and refresh logic internally when you initialize the PlatformClient. You must provide your client ID, client secret, and the environment URL (e.g., https://api.mypurecloud.com or https://api.euw2.pure.cloud).
import os

from purecloud_platform_client import PlatformClient


def get_platform_client() -> PlatformClient:
    """
    Initializes and returns a configured PlatformClient instance.
    """
    # In production, load these from environment variables or a secure vault
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    env_url = os.getenv("GENESYS_ENV_URL", "https://api.mypurecloud.com")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")

    platform_client = PlatformClient()
    platform_client.set_environment(env_url)
    platform_client.login_client_credentials(client_id, client_secret)
    return platform_client
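For context, the client-credentials grant that the SDK performs on your behalf can be sketched with the standard library alone. The snippet below only builds the token request and does not send it. The helper name build_token_request is hypothetical, and the login host https://login.mypurecloud.com is an assumption that matches the default API host used above; other regions use a different login host.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(client_id: str, client_secret: str,
                        login_host: str = "https://login.mypurecloud.com") -> urllib.request.Request:
    """Build (but do not send) an OAuth 2.0 client-credentials token request."""
    # HTTP Basic credentials are "client_id:client_secret", base64-encoded.
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    basic = base64.b64encode(credentials).decode("ascii")
    # The grant type travels as a form-encoded request body.
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode("ascii")
    return urllib.request.Request(
        url=f"{login_host}/oauth/token",
        data=body,
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        method="POST",
    )


request = build_token_request("my-client-id", "my-secret")
print(request.full_url)      # https://login.mypurecloud.com/oauth/token
print(request.get_method())  # POST
```

Seeing the raw request can help when debugging authentication failures, because it separates credential problems from SDK configuration problems.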
Implementation
Step 1: Understand the 413 Error and the Query Payload
The Genesys Cloud Analytics API endpoint /api/v2/analytics/conversations/details/query accepts a QueryConversationDetailsRequest body. This body can contain complex filters, such as selectBy clauses, groupBy buckets, and metrics.
A 413 Entity Too Large error occurs when the JSON payload exceeds the server’s configured limit. This often happens when:
- The date range is excessively large, causing the server to attempt to process too many potential data points.
- The query includes a massive number of selectBy entries (e.g., filtering by thousands of specific queue IDs or user IDs).
- The groupBy configuration creates a combinatorial explosion of buckets.
For this tutorial, we focus on the most common cause: a wide date range combined with detailed metrics. The solution is to split the 90-day range into smaller chunks (e.g., 14-day intervals) and aggregate the results client-side.
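Before sending a query, it can be useful to measure how large the serialized body actually is. The following is a minimal sketch: the body shape built by build_body is hypothetical and only serves to show how a long selectBy list inflates the payload, and the server's actual size limit is not stated in this tutorial.

```python
import json


def payload_size_bytes(body: dict) -> int:
    """Return the size in bytes of the body serialized as compact UTF-8 JSON."""
    return len(json.dumps(body, separators=(",", ":")).encode("utf-8"))


def build_body(queue_ids: list) -> dict:
    # Hypothetical body shape, used only to illustrate size growth.
    return {
        "dateFrom": "2024-01-01T00:00:00Z",
        "dateTo": "2024-03-31T00:00:00Z",
        "selectBy": [{"type": "queue", "id": queue_id} for queue_id in queue_ids],
    }


small = build_body(["queue-0001"])
large = build_body([f"queue-{i:04d}" for i in range(5000)])

print(payload_size_bytes(small), payload_size_bytes(large))
```

Logging this number next to each request makes it easier to tell whether a 413 is tied to the request body or to something else in the query.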
Step 2: Define the Date Splitting Logic
We need a helper function to split a start and end date into smaller intervals. We will use 14 days as the chunk size, which is generally safe from 413 errors while minimizing the number of API calls.
from datetime import timedelta
from typing import List, Tuple

from dateutil import parser


def split_date_range(start_date_str: str, end_date_str: str, days_per_chunk: int = 14) -> List[Tuple[str, str]]:
    """
    Splits a date range into smaller chunks of `days_per_chunk`.

    Args:
        start_date_str: ISO format start date string.
        end_date_str: ISO format end date string.
        days_per_chunk: Number of days for each chunk (must be positive).

    Returns:
        A list of tuples, each containing a (start, end) date string pair.
    """
    if days_per_chunk <= 0:
        # A non-positive step would never advance and would loop forever
        raise ValueError("days_per_chunk must be a positive integer.")

    start_date = parser.isoparse(start_date_str)
    end_date = parser.isoparse(end_date_str)

    chunks = []
    current_start = start_date
    # The loop ends once current_start reaches end_date, because min()
    # clamps the final chunk to end_date
    while current_start < end_date:
        current_end = min(current_start + timedelta(days=days_per_chunk), end_date)
        # isoformat() keeps the UTC offset when the parsed input carried one
        chunks.append((current_start.isoformat(), current_end.isoformat()))
        current_start = current_end
    return chunks
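To sanity-check the chunk arithmetic, the same splitting idea can be exercised with the standard library only. This is a sketch working on datetime objects rather than strings; chunk_bounds is a hypothetical helper, not part of the SDK.

```python
from datetime import datetime, timedelta, timezone


def chunk_bounds(start: datetime, end: datetime, days: int = 14):
    """Yield consecutive (chunk_start, chunk_end) pairs covering start..end."""
    if days <= 0:
        raise ValueError("days must be positive")
    step = timedelta(days=days)
    cursor = start
    while cursor < end:
        upper = min(cursor + step, end)  # clamp the last chunk to the end
        yield cursor, upper
        cursor = upper


end = datetime(2024, 3, 31, tzinfo=timezone.utc)
start = end - timedelta(days=90)
chunks = list(chunk_bounds(start, end))

print(len(chunks))                    # 7
print(chunks[-1][1] - chunks[-1][0])  # 6 days, 0:00:00
```

A 90-day range yields six full 14-day chunks plus one 6-day remainder. Each chunk ends exactly where the next begins, so whether the boundary timestamp belongs to one chunk or both depends on how the API treats the end of a range.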
Step 3: Construct the Query Payload
We need to build a QueryConversationDetailsRequest object. This object defines what data we want. We will request metrics for all conversations in a specific queue, bucketed by hour and grouped by user.
Required Scope: analytics:conversation:read
from purecloud_platform_client.models import QueryConversationDetailsRequest, SelectBy


def create_query_request(queue_id: str, start_date: str, end_date: str) -> QueryConversationDetailsRequest:
    """
    Creates a QueryConversationDetailsRequest for a specific date range.
    """
    # Initialize the request object
    request = QueryConversationDetailsRequest()

    # Set the date range for this specific chunk
    request.date_from = start_date
    request.date_to = end_date

    # Define the granularity (e.g., hourly buckets)
    request.granularity = "hour"

    # Define the metrics we want to aggregate
    request.metrics = ["handle-time", "wrap-up-time", "hold-time"]

    # Define the view (summary view is standard for aggregation)
    request.view = "summary"

    # Define the selectBy clause (filtering by queue)
    select_by = SelectBy()
    select_by.type_ = "queue"  # Note: type_ avoids shadowing Python's built-in type()
    select_by.id = queue_id
    request.select_by = [select_by]

    # Optional: Group by user to see individual agent performance
    # This increases payload size slightly but is usually safe
    request.group_by = ["user"]
    return request
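Some query bodies express the date range as a single ISO 8601 interval string of the form start/end instead of separate from and to fields. Whether your SDK version expects that form is an assumption to verify against the API reference. The sketch below shows one way to build such a string; to_interval is a hypothetical helper.

```python
from datetime import datetime, timezone


def to_interval(start: datetime, end: datetime) -> str:
    """Format two timezone-aware datetimes as an ISO 8601 'start/end' interval in UTC."""
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("start and end must be timezone-aware")
    if start >= end:
        raise ValueError("start must be before end")
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    # Convert both bounds to UTC so the trailing "Z" is accurate.
    start_utc = start.astimezone(timezone.utc).strftime(fmt)
    end_utc = end.astimezone(timezone.utc).strftime(fmt)
    return f"{start_utc}/{end_utc}"


interval = to_interval(
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 1, 15, tzinfo=timezone.utc),
)
print(interval)  # 2024-01-01T00:00:00Z/2024-01-15T00:00:00Z
```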
Step 4: Execute the Query with Pagination and Error Handling
The Analytics API returns paginated results, so we must loop through all pages for each date chunk. We also handle 413 errors by logging a clearer message before re-raising them, and handle 429 (Rate Limit) errors by waiting for the interval given in the Retry-After header before retrying.
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def fetch_chunk_data(platform_client: PlatformClient, queue_id: str, start_date: str, end_date: str) -> list:
    """
    Fetches analytics data for a single date chunk.
    Handles pagination and basic error retries.
    """
    api_instance = platform_client.analytics
    request_body = create_query_request(queue_id, start_date, end_date)
    all_results = []
    page = 1
    max_retries = 3  # Retry budget for non-429 errors, shared across all pages of this chunk

    while True:
        try:
            # Call the API
            response = api_instance.post_analytics_conversations_details_query(
                body=request_body,
                page=page,
                page_size=100  # Standard page size
            )
            # Append results (entities may be None or empty)
            if response.entities:
                all_results.extend(response.entities)
            # Check if there are more pages
            if response.page * response.page_size >= response.total:
                break
            page += 1
        except Exception as e:
            status = getattr(e, 'status', None)
            # Handle 429 Too Many Requests
            if status == 429:
                headers = getattr(e, 'headers', None) or {}
                retry_after = int(headers.get('Retry-After', 5))
                logger.warning(f"Rate limited. Waiting {retry_after} seconds...")
                time.sleep(retry_after)
                continue
            # Handle 413 Entity Too Large (Should not happen with split ranges, but good to log)
            if status == 413:
                logger.error(f"Payload still too large for range {start_date} to {end_date}. Consider smaller chunks.")
                raise
            # Handle other errors
            logger.error(f"Error fetching data for range {start_date} - {end_date}: {e}")
            if max_retries > 0:
                max_retries -= 1
                time.sleep(1)
                continue
            # Bare raise preserves the original traceback
            raise
    return all_results
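The pagination loop can also be expressed without relying on a total count, by stopping as soon as a page comes back shorter than the requested size. The sketch below uses a fake in-memory fetcher in place of the API; iterate_pages and fake_fetch are hypothetical names for illustration.

```python
from typing import Callable, Iterator, List


def iterate_pages(fetch_page: Callable[[int], List[dict]], page_size: int) -> Iterator[dict]:
    """Yield items from 1-based pages until a short (or empty) page is returned."""
    page = 1
    while True:
        items = fetch_page(page)
        yield from items
        if len(items) < page_size:
            break  # a short page means there is nothing left to fetch
        page += 1


# Stand-in for an API call: 250 fake records served 100 at a time.
records = [{"id": i} for i in range(250)]


def fake_fetch(page: int, page_size: int = 100) -> List[dict]:
    begin = (page - 1) * page_size
    return records[begin:begin + page_size]


fetched = list(iterate_pages(fake_fetch, page_size=100))
print(len(fetched))  # 250
```

The trade-off is one extra request when the total is an exact multiple of the page size, in exchange for not depending on the response reporting an accurate total.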
Step 5: Aggregate Results Client-Side
Because the query is split, the results arrive in fragments that must be merged. A user who handled conversations in several chunks appears once per chunk, so their entries must be combined rather than treated as separate users. Since the chunks do not overlap, summing each user's metrics across all chunks does not double-count.
from collections import defaultdict


def aggregate_results(all_chunks_results: list) -> dict:
    """
    Aggregates results from multiple date chunks.
    Sums up metrics for each unique user.
    """
    aggregated_data = defaultdict(lambda: {
        "handle-time": 0,
        "wrap-up-time": 0,
        "hold-time": 0,
        "conversation-count": 0
    })

    for chunk_result in all_chunks_results:
        # chunk_result is a list of entities from one date chunk
        for entity in chunk_result:
            # Identify the user (assuming group_by=["user"])
            user_id = None
            if hasattr(entity, 'user') and entity.user:
                user_id = entity.user.id
            if not user_id:
                # Handle system conversations or unassigned if necessary
                user_id = "unassigned"

            # Sum the metrics
            if hasattr(entity, 'metrics'):
                metrics = entity.metrics
                for metric_name in ["handle-time", "wrap-up-time", "hold-time"]:
                    if metric_name in metrics and metrics[metric_name].value:
                        aggregated_data[user_id][metric_name] += metrics[metric_name].value

            # Count conversations
            aggregated_data[user_id]["conversation-count"] += 1
    return dict(aggregated_data)
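The merging step can be illustrated with plain dictionaries, independent of the SDK's entity objects. This is a minimal sketch; merge_chunk_totals and the sample values are made up for illustration.

```python
from collections import Counter


def merge_chunk_totals(chunk_totals: list) -> dict:
    """Sum per-user metric totals across a list of per-chunk dictionaries."""
    merged = {}
    for totals in chunk_totals:
        for user_id, metrics in totals.items():
            # Counter.update() adds values for matching keys instead of replacing them.
            merged.setdefault(user_id, Counter()).update(metrics)
    return {user_id: dict(counter) for user_id, counter in merged.items()}


chunk_a = {"user-1": {"handle-time": 120, "hold-time": 10}}
chunk_b = {
    "user-1": {"handle-time": 60, "hold-time": 5},
    "user-2": {"handle-time": 30, "hold-time": 0},
}

merged = merge_chunk_totals([chunk_a, chunk_b])
print(merged["user-1"])  # {'handle-time': 180, 'hold-time': 15}
```

Totals and counts can be added this way. Averages and percentiles cannot, so any average should be recomputed from the summed totals and counts rather than by averaging per-chunk averages.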
Complete Working Example
This script ties all the components together. It retrieves 90 days of data for a specific queue, splits it into 14-day chunks, fetches each chunk, and aggregates the results.
import logging
import os
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

from dateutil import parser
from purecloud_platform_client import PlatformClient
from purecloud_platform_client.models import QueryConversationDetailsRequest, SelectBy

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def get_platform_client() -> PlatformClient:
    """Initializes and returns a configured PlatformClient instance."""
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    env_url = os.getenv("GENESYS_ENV_URL", "https://api.mypurecloud.com")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")

    platform_client = PlatformClient()
    platform_client.set_environment(env_url)
    platform_client.login_client_credentials(client_id, client_secret)
    return platform_client


def split_date_range(start_date_str: str, end_date_str: str, days_per_chunk: int = 14) -> List[Tuple[str, str]]:
    """Splits a date range into smaller chunks."""
    if days_per_chunk <= 0:
        # A non-positive step would never advance and would loop forever
        raise ValueError("days_per_chunk must be a positive integer.")

    start_date = parser.isoparse(start_date_str)
    end_date = parser.isoparse(end_date_str)

    chunks = []
    current_start = start_date
    # min() clamps the final chunk to end_date, which ends the loop
    while current_start < end_date:
        current_end = min(current_start + timedelta(days=days_per_chunk), end_date)
        chunks.append((current_start.isoformat(), current_end.isoformat()))
        current_start = current_end
    return chunks


def create_query_request(queue_id: str, start_date: str, end_date: str) -> QueryConversationDetailsRequest:
    """Creates a QueryConversationDetailsRequest for a specific date range."""
    request = QueryConversationDetailsRequest()
    request.date_from = start_date
    request.date_to = end_date
    request.granularity = "hour"
    request.metrics = ["handle-time", "wrap-up-time", "hold-time"]
    request.view = "summary"

    select_by = SelectBy()
    select_by.type_ = "queue"
    select_by.id = queue_id
    request.select_by = [select_by]

    request.group_by = ["user"]
    return request


def fetch_chunk_data(platform_client: PlatformClient, queue_id: str, start_date: str, end_date: str) -> list:
    """Fetches analytics data for a single date chunk with pagination and error handling."""
    api_instance = platform_client.analytics
    request_body = create_query_request(queue_id, start_date, end_date)
    all_results = []
    page = 1
    max_retries = 3  # Retry budget for non-429 errors, shared across all pages of this chunk

    while True:
        try:
            response = api_instance.post_analytics_conversations_details_query(
                body=request_body,
                page=page,
                page_size=100
            )
            if response.entities:
                all_results.extend(response.entities)
            if response.page * response.page_size >= response.total:
                break
            page += 1
        except Exception as e:
            status = getattr(e, 'status', None)
            if status == 429:
                headers = getattr(e, 'headers', None) or {}
                retry_after = int(headers.get('Retry-After', 5))
                logger.warning(f"Rate limited. Waiting {retry_after} seconds...")
                time.sleep(retry_after)
                continue
            if status == 413:
                logger.error(f"Payload still too large for range {start_date} to {end_date}.")
                raise
            logger.error(f"Error fetching data for range {start_date} - {end_date}: {e}")
            if max_retries > 0:
                max_retries -= 1
                time.sleep(1)
                continue
            # Bare raise preserves the original traceback
            raise
    return all_results


def aggregate_results(all_chunks_results: list) -> dict:
    """Aggregates results from multiple date chunks."""
    aggregated_data = defaultdict(lambda: {
        "handle-time": 0,
        "wrap-up-time": 0,
        "hold-time": 0,
        "conversation-count": 0
    })

    for chunk_result in all_chunks_results:
        for entity in chunk_result:
            user_id = None
            if hasattr(entity, 'user') and entity.user:
                user_id = entity.user.id
            if not user_id:
                user_id = "unassigned"

            if hasattr(entity, 'metrics'):
                metrics = entity.metrics
                for metric_name in ["handle-time", "wrap-up-time", "hold-time"]:
                    if metric_name in metrics and metrics[metric_name].value:
                        aggregated_data[user_id][metric_name] += metrics[metric_name].value

            aggregated_data[user_id]["conversation-count"] += 1
    return dict(aggregated_data)


def main():
    # Configuration
    QUEUE_ID = os.getenv("GENESYS_QUEUE_ID", "your-queue-id-here")
    DAYS_BACK = 90

    # Calculate date range; timezone-aware UTC so isoformat() includes the offset
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=DAYS_BACK)
    start_date_str = start_date.isoformat()
    end_date_str = end_date.isoformat()
    logger.info(f"Starting analytics retrieval for Queue {QUEUE_ID} from {start_date_str} to {end_date_str}")

    try:
        # 1. Authenticate
        platform_client = get_platform_client()

        # 2. Split date range
        date_chunks = split_date_range(start_date_str, end_date_str, days_per_chunk=14)
        logger.info(f"Split date range into {len(date_chunks)} chunks.")

        # 3. Fetch data for each chunk
        all_chunk_results = []
        for i, (chunk_start, chunk_end) in enumerate(date_chunks):
            logger.info(f"Fetching chunk {i+1}/{len(date_chunks)}: {chunk_start} to {chunk_end}")
            chunk_results = fetch_chunk_data(platform_client, QUEUE_ID, chunk_start, chunk_end)
            all_chunk_results.append(chunk_results)
            logger.info(f"Chunk {i+1} complete. Retrieved {len(chunk_results)} entities.")

        # 4. Aggregate results
        logger.info("Aggregating results...")
        final_data = aggregate_results(all_chunk_results)

        # 5. Output results
        logger.info("Final Aggregated Data (Top 5 users by conversation count):")
        sorted_users = sorted(final_data.items(), key=lambda x: x[1]["conversation-count"], reverse=True)
        for user_id, metrics in sorted_users[:5]:
            print(f"User ID: {user_id}")
            print(f"  Conversations: {metrics['conversation-count']}")
            print(f"  Total Handle Time: {metrics['handle-time']}")
            print(f"  Total Hold Time: {metrics['hold-time']}")
            print("-" * 20)
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        raise


if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 413 Entity Too Large
What causes it:
The JSON payload sent to /api/v2/analytics/conversations/details/query exceeds the server’s maximum request body size. This is common when querying large date ranges with detailed groupBy configurations or many selectBy filters.
How to fix it:
- Split the Date Range: As shown in the tutorial, reduce the date_from and date_to interval. Start with 7-day or 14-day chunks.
- Reduce groupBy Complexity: If you are grouping by multiple dimensions (e.g., user, queue, channel), the number of buckets can explode. Remove unnecessary group-by fields.
- Limit selectBy Entries: If filtering by a list of IDs, ensure the list is not excessively long. If it is, consider splitting the query by ID batches as well.
Code Fix:
Adjust the days_per_chunk parameter in split_date_range to a smaller value (e.g., 7 instead of 14).
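Instead of tuning the chunk size by hand, the range can be narrowed automatically whenever a 413 comes back. The sketch below halves the failing range and retries each half. ApiError is a hypothetical stand-in for the SDK's exception type (only its status attribute is assumed), and fake_fetch simulates an API that rejects ranges longer than 7 days.

```python
from datetime import datetime, timedelta, timezone


class ApiError(Exception):
    """Hypothetical stand-in for the SDK exception; only `status` matters here."""

    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status


def fetch_adaptive(fetch, start: datetime, end: datetime,
                   min_span: timedelta = timedelta(days=1)) -> list:
    """Call fetch(start, end); on a 413, split the range in half and recurse."""
    try:
        return fetch(start, end)
    except ApiError as exc:
        span = end - start
        # Give up on non-413 errors or when the range cannot shrink further.
        if exc.status != 413 or span <= min_span:
            raise
        middle = start + span / 2
        return (fetch_adaptive(fetch, start, middle, min_span)
                + fetch_adaptive(fetch, middle, end, min_span))


def fake_fetch(start: datetime, end: datetime) -> list:
    # Simulated API: ranges longer than 7 days are rejected with a 413.
    if end - start > timedelta(days=7):
        raise ApiError(413)
    return [f"{start.date()}..{end.date()}"]


range_start = datetime(2024, 1, 1, tzinfo=timezone.utc)
result = fetch_adaptive(fake_fetch, range_start, range_start + timedelta(days=14))
print(result)  # ['2024-01-01..2024-01-08', '2024-01-08..2024-01-15']
```

The min_span guard keeps the recursion from shrinking forever when the 413 is caused by something other than the date range, such as a very long selectBy list.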
Error: 429 Too Many Requests
What causes it:
You are exceeding the rate limit for the Analytics API. This is common when looping through many date chunks or pages rapidly.
How to fix it:
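Implement exponential backoff and respect the Retry-After header. The provided code retries 429 responses after waiting for the Retry-After interval (defaulting to 5 seconds), but it does not increase the delay between repeated attempts.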
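A delay calculation that combines both ideas might look like the following. This is a sketch, not the SDK's behavior: backoff_delay, its default base of 1 second, and its 60-second cap are assumptions chosen for illustration.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[float] = None,
                  base: float = 1.0, cap: float = 60.0) -> float:
    """Return the seconds to wait before retry number `attempt` (0-based)."""
    # A server-provided Retry-After value takes precedence over our own estimate.
    if retry_after is not None:
        return max(0.0, float(retry_after))
    # Exponential growth (base, 2*base, 4*base, ...) limited by the cap.
    ceiling = min(cap, base * (2 ** attempt))
    # Full jitter: pick a random delay up to the ceiling so that parallel
    # clients do not all retry at the same moment.
    return random.uniform(0, ceiling)


for attempt in range(5):
    print(attempt, round(backoff_delay(attempt), 2))

print(backoff_delay(3, retry_after=7))  # 7.0
```

In the fetch loop, the returned value would replace the fixed sleep, with the attempt counter reset after each successful page.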
Error: 400 Bad Request
What causes it:
The query parameters are invalid. Common issues include:
- date_from is after date_to.
- Invalid metric names.
- granularity does not match the date range (e.g., using "day" granularity for a range larger than 365 days is not allowed in some contexts, though "hour" is generally safer for smaller ranges).
How to fix it:
Check the response.message in the exception. Ensure your date strings are in valid ISO 8601 format with timezone indicators.
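Two of these causes, a reversed range and a missing timezone indicator, can be caught locally before the request is sent. The following is a minimal sketch using only the standard library; parse_aware and validate_range are hypothetical helper names.

```python
from datetime import datetime


def parse_aware(value: str) -> datetime:
    """Parse an ISO 8601 timestamp and require an explicit UTC offset."""
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on,
    # so normalize it to "+00:00" first.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        raise ValueError(f"timestamp has no timezone: {value!r}")
    return parsed


def validate_range(date_from: str, date_to: str) -> None:
    """Raise ValueError if the range is reversed, empty, or lacks timezones."""
    if parse_aware(date_from) >= parse_aware(date_to):
        raise ValueError("date_from must be earlier than date_to")


validate_range("2024-01-01T00:00:00Z", "2024-01-15T00:00:00+00:00")  # passes silently

try:
    validate_range("2024-01-15T00:00:00Z", "2024-01-01T00:00:00Z")
except ValueError as error:
    print(error)  # date_from must be earlier than date_to
```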