Splitting 90-Day Analytics Queries to Avoid 413 Entity Too Large Errors
What You Will Build
- A Python script that dynamically splits a 90-day date range into smaller, compliant chunks to query Genesys Cloud Conversation Analytics.
- An implementation using the genesys-cloud-sdk-python library that handles pagination, token refresh, and 413 error recovery.
- A robust data aggregation pattern that merges partial results into a single coherent dataset.
Prerequisites
- OAuth Client Type: Private key or Client Credentials flow.
- Required Scopes: analytics:conversation:view (for reading analytics data).
- SDK Version: genesys-cloud-sdk-python >= 170.0.0.
- Language/Runtime: Python 3.9+.
- External Dependencies:
  - genesys-cloud-sdk-python
  - pandas (for data manipulation; optional but recommended for this tutorial)
Authentication Setup
Genesys Cloud uses OAuth 2.0 for all API interactions. The SDK handles token acquisition and automatic refresh, but you must configure the client correctly. This tutorial uses the Client Credentials grant, which suits server-to-server integrations where no user is present to log in.
Install the SDK:
pip install genesys-cloud-sdk-python pandas
Initialize the client. The PureCloudPlatformClientV2 manages the session state.
import os
from purecloud_platform_client_v2 import PureCloudPlatformClientV2

# Load credentials from environment variables
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")


def get_platform_client() -> PureCloudPlatformClientV2:
    """
    Initializes and returns a configured Genesys Cloud platform client.
    """
    # Fail fast with a clear message instead of a confusing auth error later
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Create a new client instance
    client = PureCloudPlatformClientV2()

    # Configure the OAuth client
    # The SDK supports automatic token refresh when using client credentials
    client.set_oauth_client_credentials(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        environment=ENVIRONMENT
    )

    # Set the base URL explicitly if needed, though environment usually suffices
    client.set_base_url(f"https://{ENVIRONMENT}")
    return client
Implementation
Step 1: Define the Query Structure
The Genesys Cloud Analytics API (/api/v2/analytics/conversations/details/query) accepts a JSON body defining the time range, groupings, and metrics. A 413 Entity Too Large error occurs when the resulting dataset exceeds the server’s processing limits for a single request, or when the request body itself is overly complex. While the 90-day span is often too large for a single details query due to record counts, it is also common for summary queries to hit limits if you request too many groupings.
We will define a base query object that excludes the interval (time range), allowing us to inject different start and end dates dynamically.
def build_base_query() -> dict:
    """
    Builds a base analytics query structure.
    We exclude the interval here to inject it per chunk.
    """
    # Define the metrics you want to analyze.
    # This example requests conversation counts plus total hold, wait, and work time.
    query_body = {
        "interval": "",  # Placeholder, will be replaced
        "groupings": ["user"],  # Group by agent
        "metrics": [
            "conversation/count",
            "conversation/hold/total",
            "conversation/wait/total",
            "conversation/work/total"
        ],
        "filters": [
            {
                "field": "conversation/type",
                "operator": "in",
                "value": ["voice"]  # Only voice conversations
            }
        ]
    }
    return query_body
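Injecting the interval per chunk can be sketched as a small helper pair. This is only an illustration: format_interval and with_interval are hypothetical names, not SDK functions, and the sketch assumes that naive datetimes are already in UTC and that millisecond precision is acceptable to the API. It deep-copies the template so nested lists such as filters are never shared between requests.

```python
import copy
from datetime import datetime, timezone


def format_interval(start: datetime, end: datetime) -> str:
    """Render 'start/end' as ISO 8601 UTC timestamps with a trailing Z."""
    def to_utc_z(value: datetime) -> str:
        # Assumption: a naive datetime is already expressed in UTC.
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        value = value.astimezone(timezone.utc)
        millis = value.microsecond // 1000
        return value.strftime("%Y-%m-%dT%H:%M:%S.") + f"{millis:03d}Z"

    return f"{to_utc_z(start)}/{to_utc_z(end)}"


def with_interval(template: dict, start: datetime, end: datetime) -> dict:
    """Return a deep copy of the template with its interval filled in."""
    body = copy.deepcopy(template)
    body["interval"] = format_interval(start, end)
    return body
```

Converting to UTC before appending Z avoids malformed strings such as "+00:00Z", which a plain isoformat() call followed by "Z" would produce for timezone-aware datetimes.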
Step 2: Implement the Chunking Logic
The core strategy is to break the 90-day period into smaller intervals. Genesys Cloud analytics endpoints generally handle 7-day or 30-day intervals well for details, and up to 365 days for summaries. However, if you are hitting 413, you must reduce the window.
We will use a 7-day chunk size. This ensures each request is lightweight and reduces the risk of timeout or size limits.
from datetime import datetime, timedelta


def generate_date_chunks(start_date: datetime, end_date: datetime, chunk_days: int = 7) -> list[tuple[datetime, datetime]]:
    """
    Splits a date range into smaller chunks.

    Args:
        start_date: The beginning of the analysis period.
        end_date: The end of the analysis period.
        chunk_days: The number of days per chunk (e.g., 7).

    Returns:
        A list of tuples, each containing (chunk_start, chunk_end).
    """
    # A non-positive chunk size would never advance and loop forever
    if chunk_days <= 0:
        raise ValueError("chunk_days must be a positive integer.")

    chunks = []
    current_start = start_date
    while current_start < end_date:
        # The final chunk is truncated so it never runs past end_date
        chunk_end = min(current_start + timedelta(days=chunk_days), end_date)
        chunks.append((current_start, chunk_end))
        current_start = chunk_end
    return chunks
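As a quick check of the chunking arithmetic, the sketch below repeats the loop so that it runs on its own and applies it to a fixed 90-day window. The start date is arbitrary. Since 90 = 12 × 7 + 6, the result is 12 full weeks plus one shorter final chunk.

```python
from datetime import datetime, timedelta


def generate_date_chunks(start_date, end_date, chunk_days=7):
    # Same logic as the function above, repeated so this snippet is self-contained.
    chunks = []
    current_start = start_date
    while current_start < end_date:
        chunk_end = min(current_start + timedelta(days=chunk_days), end_date)
        chunks.append((current_start, chunk_end))
        current_start = chunk_end
    return chunks


start = datetime(2024, 1, 1)
end = start + timedelta(days=90)
chunks = generate_date_chunks(start, end, chunk_days=7)

print(len(chunks))                     # 13
print(chunks[-1][1] - chunks[-1][0])   # 6 days, 0:00:00

# Each chunk starts exactly where the previous one ended, so nothing is skipped.
contiguous = all(a[1] == b[0] for a, b in zip(chunks, chunks[1:]))
print(contiguous)                      # True
```

Because adjacent chunks share a boundary timestamp, whether a conversation that falls exactly on that boundary is returned by one chunk or by both depends on how the API treats the end of an interval. If duplicates appear, deduplicating on the conversation ID is one way to handle it.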
Step 3: Execute Queries with Retry Logic
The SDK provides an AnalyticsApi client. We will iterate through the chunks, execute the query, and handle errors explicitly: a 413 is logged and re-raised so the caller can shrink the window, and a 429 triggers a single retry after the delay the server requests. Chunking should prevent most 413s, but handling them explicitly makes a misjudged chunk size easy to diagnose.
from purecloud_platform_client_v2.rest import ApiException
import time
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def query_analytics_chunk(client: PureCloudPlatformClientV2, start: datetime, end: datetime,
                          query_template: dict, retry_on_429: bool = True) -> list:
    """
    Executes a single analytics query for a specific time chunk.

    Args:
        client: The initialized Genesys Cloud client.
        start: Start date of the chunk.
        end: End date of the chunk.
        query_template: The base query dictionary.
        retry_on_429: Whether to retry once if the request is rate limited.

    Returns:
        A list of result objects from the API.
    """
    # Clone the template and set the interval
    # Format: ISO 8601; datetimes are assumed to be naive UTC, hence the literal "Z"
    interval_str = f"{start.isoformat()}Z/{end.isoformat()}Z"
    request_body = query_template.copy()
    request_body["interval"] = interval_str
    api_instance = client.analytics_api
    try:
        # Call the API
        # Note: The SDK method name corresponds to POST /api/v2/analytics/conversations/details/query
        response = api_instance.post_analytics_conversations_details_query(
            body=request_body,
            async_req=False
        )
        logger.info(f"Successfully retrieved data for {start.date()} to {end.date()}")
        return response.entities
    except ApiException as e:
        if e.status == 413:
            logger.error(f"413 Entity Too Large for interval {interval_str}. Consider reducing chunk size.")
            raise
        elif e.status == 429 and retry_on_429:
            # Rate limit handling; headers may be missing, so fall back to 5 seconds
            retry_after = int((e.headers or {}).get('Retry-After', 5))
            logger.warning(f"Rate limited. Waiting {retry_after} seconds.")
            time.sleep(retry_after)
            # Retry exactly once; a second 429 is raised to the caller
            return query_analytics_chunk(client, start, end, query_template, retry_on_429=False)
        else:
            logger.error(f"API Error {e.status}: {e.body}")
            raise
Step 4: Aggregate Results
The API returns paginated results, so you must iterate through every page of each chunk to capture all data. The response from post_analytics_conversations_details_query carries an entities list and a next-page URI (nextPageUri in the JSON body, next_page_uri on the SDK response object; the exact response structure may vary slightly by SDK version).
Because query_analytics_chunk from Step 3 returns only response.entities, the next-page URI is discarded before the caller can use it. The revised function below keeps the full response object and follows next_page_uri until no pages remain. It replaces query_analytics_chunk in the rest of this tutorial.
def query_analytics_chunk_with_pagination(client: PureCloudPlatformClientV2, start: datetime, end: datetime,
                                          query_template: dict, max_rate_limit_retries: int = 5) -> list:
    """
    Executes a single analytics query for a specific time chunk, handling pagination.
    """
    all_entities = []
    # Datetimes are assumed to be naive UTC, hence the literal "Z" suffix
    interval_str = f"{start.isoformat()}Z/{end.isoformat()}Z"
    request_body = query_template.copy()
    request_body["interval"] = interval_str
    api_instance = client.analytics_api
    next_page_uri = None
    rate_limit_retries = 0
    while True:
        try:
            if next_page_uri:
                # Subsequent calls use the next page URI
                response = api_instance.get_analytics_conversations_details_query(
                    next_page_uri=next_page_uri,
                    async_req=False
                )
            else:
                # Initial call
                response = api_instance.post_analytics_conversations_details_query(
                    body=request_body,
                    async_req=False
                )
            # Extract entities
            if response.entities:
                all_entities.extend(response.entities)
            # Check for next page
            if response.next_page_uri:
                next_page_uri = response.next_page_uri
            else:
                break  # No more pages
        except ApiException as e:
            if e.status == 413:
                logger.error(f"413 Entity Too Large for interval {interval_str}.")
                raise
            elif e.status == 429 and rate_limit_retries < max_rate_limit_retries:
                # Bound the retries so a persistent rate limit cannot loop forever
                rate_limit_retries += 1
                retry_after = int((e.headers or {}).get('Retry-After', 5))
                logger.warning(f"Rate limited. Waiting {retry_after} seconds.")
                time.sleep(retry_after)
                continue  # Retry the current page
            else:
                logger.error(f"API Error {e.status}: {e.body}")
                raise
    return all_entities
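Once every chunk has been fetched, the partial results still need to be merged into one dataset. The sketch below shows one way to do that, under a loud assumption: each row is a flat dictionary holding a "user" key plus numeric metric values. That shape is hypothetical and chosen for illustration; real SDK response objects would first need to be flattened into it. merge_chunk_rows is likewise a made-up helper name.

```python
from collections import defaultdict


def merge_chunk_rows(chunk_results, key="user"):
    """Sum numeric metrics per group across all chunks.

    chunk_results is a list (one item per chunk) of lists of flat dict rows.
    The row shape is an assumption made for this sketch.
    """
    totals = defaultdict(lambda: defaultdict(float))
    for rows in chunk_results:
        for row in rows:
            group = row[key]
            for name, value in row.items():
                if name != key:
                    totals[group][name] += value
    # Convert the nested defaultdicts into plain dicts for easier downstream use.
    return {group: dict(metrics) for group, metrics in totals.items()}


week_1 = [{"user": "agent-a", "conversation/count": 2, "conversation/hold/total": 10}]
week_2 = [
    {"user": "agent-a", "conversation/count": 3, "conversation/hold/total": 5},
    {"user": "agent-b", "conversation/count": 1, "conversation/hold/total": 0},
]
merged = merge_chunk_rows([week_1, week_2])
print(merged["agent-a"]["conversation/count"])  # 5.0
```

Only additive metrics such as counts and totals can be summed this way. An average, for example average hold time, should be recomputed from the merged total and count rather than averaged across chunks, because chunks with few conversations would otherwise carry the same weight as busy ones.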
Complete Working Example
This script combines all components into a runnable module. It retrieves conversation analytics for the last 90 days, split into 7-day chunks.
import os
import time
import logging
from datetime import datetime, timedelta
from purecloud_platform_client_v2 import PureCloudPlatformClientV2
from purecloud_platform_client_v2.rest import ApiException

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def get_platform_client() -> PureCloudPlatformClientV2:
    CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
    CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
    ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")
    client = PureCloudPlatformClientV2()
    client.set_oauth_client_credentials(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        environment=ENVIRONMENT
    )
    return client


def build_base_query() -> dict:
    return {
        "interval": "",
        "groupings": ["user"],
        "metrics": [
            "conversation/count",
            "conversation/hold/total",
            "conversation/wait/total",
            "conversation/work/total"
        ],
        "filters": [
            {
                "field": "conversation/type",
                "operator": "in",
                "value": ["voice"]
            }
        ]
    }


def generate_date_chunks(start_date: datetime, end_date: datetime, chunk_days: int = 7) -> list[tuple[datetime, datetime]]:
    chunks = []
    current_start = start_date
    while current_start < end_date:
        chunk_end = min(current_start + timedelta(days=chunk_days), end_date)
        chunks.append((current_start, chunk_end))
        current_start = chunk_end
    return chunks


def query_analytics_chunk_with_pagination(client: PureCloudPlatformClientV2, start: datetime, end: datetime,
                                          query_template: dict, max_rate_limit_retries: int = 5) -> list:
    all_entities = []
    # Datetimes are assumed to be naive UTC, hence the literal "Z" suffix
    interval_str = f"{start.isoformat()}Z/{end.isoformat()}Z"
    request_body = query_template.copy()
    request_body["interval"] = interval_str
    api_instance = client.analytics_api
    next_page_uri = None
    rate_limit_retries = 0
    while True:
        try:
            if next_page_uri:
                response = api_instance.get_analytics_conversations_details_query(
                    next_page_uri=next_page_uri,
                    async_req=False
                )
            else:
                response = api_instance.post_analytics_conversations_details_query(
                    body=request_body,
                    async_req=False
                )
            if response.entities:
                all_entities.extend(response.entities)
            if response.next_page_uri:
                next_page_uri = response.next_page_uri
            else:
                break
        except ApiException as e:
            if e.status == 413:
                logger.error(f"413 Entity Too Large for interval {interval_str}.")
                raise
            elif e.status == 429 and rate_limit_retries < max_rate_limit_retries:
                # Bound the retries so a persistent rate limit cannot loop forever
                rate_limit_retries += 1
                retry_after = int((e.headers or {}).get('Retry-After', 5))
                logger.warning(f"Rate limited. Waiting {retry_after} seconds.")
                time.sleep(retry_after)
                continue
            else:
                logger.error(f"API Error {e.status}: {e.body}")
                raise
    return all_entities


def main():
    try:
        client = get_platform_client()
        logger.info("Client initialized successfully.")

        # Define date range: Last 90 days (naive UTC timestamps)
        end_date = datetime.utcnow()
        start_date = end_date - timedelta(days=90)
        logger.info(f"Querying analytics from {start_date.date()} to {end_date.date()}")

        # Generate chunks
        chunks = generate_date_chunks(start_date, end_date, chunk_days=7)
        logger.info(f"Splitting date range into {len(chunks)} chunks.")
        all_data = []

        # Process each chunk
        for i, (chunk_start, chunk_end) in enumerate(chunks):
            logger.info(f"Processing chunk {i+1}/{len(chunks)}: {chunk_start.date()} to {chunk_end.date()}")
            query_template = build_base_query()
            try:
                chunk_data = query_analytics_chunk_with_pagination(client, chunk_start, chunk_end, query_template)
                all_data.extend(chunk_data)
                logger.info(f"Retrieved {len(chunk_data)} records for this chunk.")
            except Exception as e:
                logger.error(f"Failed to process chunk {i+1}: {e}")
                # Skipping leaves a gap in the dataset; abort here instead if completeness matters
                continue

        logger.info(f"Query complete. Total records retrieved: {len(all_data)}")

        # Optional: Save to CSV or process further
        if all_data:
            import pandas as pd
            # Convert SDK model objects to dicts when possible so pandas can build columns.
            # The resulting structure depends on the exact metrics and groupings.
            rows = [e.to_dict() if hasattr(e, "to_dict") else e for e in all_data]
            df = pd.DataFrame(rows)
            df.to_csv("analytics_90_days.csv", index=False)
            logger.info("Data saved to analytics_90_days.csv")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        raise


if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 413 Entity Too Large
What causes it:
The server rejects the request because the expected payload size (either the request body or the resulting dataset) exceeds the configured limit. For Analytics APIs, this often happens when:
- The time interval is too large (e.g., >30 days for details queries).
- The number of groupings is excessive (e.g., grouping by user, skill, and queue simultaneously).
- The number of metrics requested is high.
How to fix it:
- Reduce Chunk Size: Change chunk_days in generate_date_chunks from 7 to 3 or 1.
- Simplify Groupings: Remove unnecessary groupings from query_template.
- Use Summary API: If you do not need individual conversation records, switch to /api/v2/analytics/conversations/summary/query.
Code Fix:
Adjust the chunk size in the main function:
# Change from 7 to 3 days
chunks = generate_date_chunks(start_date, end_date, chunk_days=3)
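Instead of picking a smaller fixed chunk size by hand, the window can be halved automatically whenever a chunk is rejected. The following is a minimal sketch of that idea, not part of the SDK: PayloadTooLarge is a hypothetical stand-in for an ApiException whose status is 413, and fetch is any callable that takes (start, end) and returns a list. In the tutorial's code, fetch could wrap query_analytics_chunk_with_pagination and translate a 413 into this exception.

```python
from datetime import datetime, timedelta


class PayloadTooLarge(Exception):
    """Hypothetical stand-in for an API exception whose status is 413."""


def fetch_with_split(fetch, start, end, min_span=timedelta(hours=1)):
    """Call fetch(start, end); on a 413-style failure, halve the window and retry.

    Recursion stops once the window is no larger than min_span, at which point
    the error is re-raised because a smaller time range is unlikely to help.
    """
    try:
        return fetch(start, end)
    except PayloadTooLarge:
        span = end - start
        if span <= min_span:
            raise
        midpoint = start + span / 2
        # Query each half separately and concatenate in chronological order.
        left = fetch_with_split(fetch, start, midpoint, min_span)
        right = fetch_with_split(fetch, midpoint, end, min_span)
        return left + right


def fake_fetch(start, end):
    # Simulated API: rejects any window longer than two days.
    if end - start > timedelta(days=2):
        raise PayloadTooLarge()
    return [(start, end)]


pieces = fetch_with_split(fake_fetch, datetime(2024, 1, 1), datetime(2024, 1, 8))
print(len(pieces))  # 4
```

In this simulated run, the 7-day window is split into two 3.5-day halves, each of which is split again into 1.75-day windows that succeed. The min_span floor matters: if a 413 is caused by the query shape (too many groupings or metrics) rather than the time range, halving would otherwise recurse without ever succeeding.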
Error: 401 Unauthorized
What causes it:
The OAuth token is invalid, expired, or the client credentials are incorrect.
How to fix it:
- Verify GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET are correct.
- Ensure the client has the analytics:conversation:view scope.
- Check whether the credentials (client secret or private key) have been revoked or rotated.
Error: 429 Too Many Requests
What causes it:
You are exceeding the Genesys Cloud API rate limits. Analytics queries are heavy and consume more quota than standard CRUD operations.
How to fix it:
- Implement exponential backoff.
- Reduce the frequency of calls by increasing the chunk size (if not hitting 413).
- Use the Retry-After header value as shown in the code.
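The first and last fixes can be combined: wait for the Retry-After value when the server supplies one, and fall back to exponential backoff with jitter when it does not. The sketch below illustrates this under stated assumptions. RateLimited is a hypothetical stand-in for an ApiException with status 429, and the base delay, cap, and attempt limit are illustrative defaults rather than values recommended by Genesys Cloud.

```python
import random
import time


class RateLimited(Exception):
    """Hypothetical stand-in for an API exception with status 429."""

    def __init__(self, retry_after=None):
        super().__init__("rate limited")
        self.retry_after = retry_after


def backoff_delay(attempt, base=1.0, cap=60.0, retry_after=None):
    """Return seconds to wait: the server's hint if present, else base * 2^attempt."""
    if retry_after is not None:
        return float(retry_after)
    return min(cap, base * (2 ** attempt))


def call_with_backoff(func, max_attempts=5, sleep=time.sleep, jitter=random.random):
    """Call func(), retrying on RateLimited up to max_attempts times in total."""
    for attempt in range(max_attempts):
        try:
            return func()
        except RateLimited as exc:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the error to the caller.
            delay = backoff_delay(attempt, retry_after=exc.retry_after)
            # Up to one second of jitter keeps parallel workers from retrying in lockstep.
            sleep(delay + jitter())
```

Passing sleep and jitter as parameters is a testing convenience: a test can substitute a recording function and a fixed jitter, so the retry schedule can be verified without actually waiting.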