How to Parse the Nested JSON Structure of a Genesys Cloud Analytics Conversation Aggregate Query
What You Will Build
- You will build a Python script that queries the Genesys Cloud Analytics API for conversation aggregates and flattens the deeply nested JSON response into a processable list of metrics.
- This tutorial uses the Genesys Cloud v2 Analytics REST API and the official genesys-cloud-sdk-python.
- The programming language covered is Python 3.8+.
Prerequisites
- OAuth Client Type: Client Credentials grant (the code in this tutorial authorizes with a client ID and secret).
- Required Scopes: analytics:conversation:view is mandatory for reading analytics data.
- SDK Version: genesys-cloud-sdk-python >= 3.0.0.
- Runtime: Python 3.8 or higher.
- Dependencies:
  - genesys-cloud-sdk-python
  - python-dotenv (for secure credential management)
  - pandas (optional, for final data tabulation)
Authentication Setup
Genesys Cloud uses OAuth 2.0. The Python SDK handles token acquisition and refresh automatically when initialized with a service account client ID and secret. You must store these credentials in environment variables.
Create a .env file in your project root:
GENESYS_REGION="mypurecloud.com"
GENESYS_CLIENT_ID="your_client_id_here"
GENESYS_CLIENT_SECRET="your_client_secret_here"
Install the required packages:
pip install genesys-cloud-sdk-python python-dotenv pandas
Initialize the SDK client. The PlatformClient is the entry point for all API interactions.
import os
from dotenv import load_dotenv
from purecloudplatformclientv2 import PlatformClient

load_dotenv()


def get_platform_client() -> PlatformClient:
    """
    Initializes the Genesys Cloud Platform Client using environment variables.

    Returns:
        PlatformClient: An authenticated client instance.
    """
    client = PlatformClient()
    client.set_environment(os.getenv('GENESYS_REGION'))
    client.clientcredentials_authorize(
        client_id=os.getenv('GENESYS_CLIENT_ID'),
        client_secret=os.getenv('GENESYS_CLIENT_SECRET')
    )
    return client
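Because os.getenv returns None for a missing variable, a typo in the .env file tends to surface only later as an authentication failure. The following is a minimal sketch of an up-front check, using the variable names from the .env file above. The helper name require_env and the REQUIRED_VARS tuple are hypothetical and not part of the SDK.

```python
import os
from typing import Dict, Iterable, Mapping, Optional

# Names taken from the .env file in this tutorial.
REQUIRED_VARS = ("GENESYS_REGION", "GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET")


def require_env(
    names: Iterable[str] = REQUIRED_VARS,
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Return the requested settings, raising if any is missing or empty."""
    source = os.environ if env is None else env
    names = tuple(names)
    missing = [name for name in names if not source.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: source[name] for name in names}
```

Calling require_env() right after load_dotenv() makes a misconfigured environment fail with a message that names the missing variable, before any network call is attempted.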
Implementation
Step 1: Constructing the Aggregate Query
The POST /api/v2/analytics/conversations/aggregates/query endpoint requires a specific JSON body structure. This structure defines the time window, the grouping dimensions (such as users, queues, or skills), and the metrics you wish to retrieve.
Unlike detail queries, aggregate queries return summarized data. The response is not a flat list of records; it is a hierarchical structure containing groups and metrics.
from purecloudplatformclientv2.models import (
    ConversationAggregateQuery,
    ConversationAggregateInterval,
    ConversationAggregateMetric,
    ConversationAggregateFilter
)
from purecloudplatformclientv2.api import analytics_api
from datetime import datetime, timedelta, timezone


def build_aggregate_query(client: PlatformClient):
    """
    Constructs an aggregate query for the last 7 days, grouped by user.

    Args:
        client: Authenticated PlatformClient instance.

    Returns:
        tuple: The AnalyticsApi client and the ConversationAggregateQuery to send.
    """
    analytics = analytics_api.AnalyticsApi(client)

    # Define the time window: last 7 days, in UTC
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=7)

    # Define the interval: daily aggregation
    interval = ConversationAggregateInterval(
        type="day",
        value=1
    )

    # Define the metrics to retrieve
    metrics = [
        ConversationAggregateMetric(name="offerCount"),
        ConversationAggregateMetric(name="wrapupCount"),
        ConversationAggregateMetric(name="conversationCount")
    ]

    # Define the groups: we want data broken down by user.
    # This creates the nested structure we will parse later.
    groups = [
        ConversationAggregateFilter(
            type="user",
            id=None  # None means all users
        )
    ]

    # Assemble the full query
    query = ConversationAggregateQuery(
        start_time=start_time.isoformat(),
        end_time=end_time.isoformat(),
        interval=interval,
        metrics=metrics,
        groups=groups
    )
    return analytics, query
Critical Parameter Explanation:
- interval: Determines the granularity of the time buckets. If you set this to day, the response will contain one group per day. If you set it to hour, you get one group per hour.
- groups: This is the source of the nesting. If you add queue and user to groups, the JSON structure will nest users inside queues inside time intervals.
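To get a feel for how the interval choice affects the size of the response, the sketch below counts the time buckets a 7-day window produces at daily and hourly granularity. It is plain Python with no SDK calls. The helper name bucket_starts is hypothetical, and the fixed dates are chosen only to make the result reproducible.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def bucket_starts(start: datetime, end: datetime, step: timedelta) -> List[datetime]:
    """Return the start time of every bucket of width `step` in [start, end)."""
    starts = []
    current = start
    while current < end:
        starts.append(current)
        current += step
    return starts


end = datetime(2023, 10, 8, tzinfo=timezone.utc)
start = end - timedelta(days=7)

daily = bucket_starts(start, end, timedelta(days=1))
hourly = bucket_starts(start, end, timedelta(hours=1))
print(len(daily), len(hourly))  # 7 168
```

Each additional grouping multiplies the number of leaf records by the number of distinct entities at that level, so an hourly interval grouped by queue and user can grow large quickly.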
Step 2: Executing the Query and Handling Pagination
The Analytics API supports pagination via the pageSize parameter. To be sure you retrieve all data points, set the page size explicitly and keep requesting pages until the response no longer carries a page token.
def execute_aggregate_query(analytics: analytics_api.AnalyticsApi, query: ConversationAggregateQuery):
    """
    Executes the aggregate query and handles pagination.

    Args:
        analytics: The Analytics API client.
        query: The ConversationAggregateQuery object.

    Returns:
        list: A list of all ConversationAggregateResponse objects.
    """
    all_responses = []
    page_size = 100

    while True:
        try:
            # Execute the query for the current page
            response = analytics.post_analytics_conversations_aggregate_query(
                body=query,
                page_size=page_size
            )
        except Exception as e:
            # Stop on the first failure and return whatever was collected so far
            print(f"Error executing query: {e}")
            break

        # Append the current page to our collection
        all_responses.append(response)

        # A missing page token means there are no more pages
        next_token = getattr(response, 'page_token', None)
        if next_token is None:
            break

        # The SDK does not chain page tokens for POST queries automatically,
        # so copy the token from the response into the next request body.
        query.page_token = next_token

    return all_responses
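The token-following loop can be exercised without network access. The sketch below is a generic version of the same pattern with two safeguards added: a cap on the number of pages and a check for a repeated token. The function collect_pages, the fetch_page callback, and the FAKE_PAGES data are hypothetical stand-ins, not SDK calls.

```python
from typing import Callable, Dict, List, Optional


def collect_pages(
    fetch_page: Callable[[Optional[str]], Dict],
    max_pages: int = 1000,
) -> List[Dict]:
    """Call fetch_page(token) until a page arrives without a next token."""
    pages: List[Dict] = []
    token: Optional[str] = None
    seen_tokens = set()
    for _ in range(max_pages):
        page = fetch_page(token)
        pages.append(page)
        token = page.get("page_token")
        if token is None:
            return pages
        if token in seen_tokens:
            raise RuntimeError(f"Page token {token!r} repeated; aborting to avoid a loop")
        seen_tokens.add(token)
    raise RuntimeError("Exceeded max_pages without reaching the last page")


# Hypothetical in-memory "API" with three pages, used only to exercise the loop.
FAKE_PAGES = {
    None: {"groups": ["a"], "page_token": "t1"},
    "t1": {"groups": ["b"], "page_token": "t2"},
    "t2": {"groups": ["c"], "page_token": None},
}

pages = collect_pages(lambda token: FAKE_PAGES[token])
print([page["groups"][0] for page in pages])  # ['a', 'b', 'c']
```

Raising on a repeated token is a design choice: it turns a server or client bug that would otherwise loop forever into an immediate, visible error.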
Step 3: Parsing the Nested JSON Structure
This is the core complexity. The response from post_analytics_conversations_aggregate_query is a ConversationAggregateResponse object. This object contains a groups list. Each item in groups is a ConversationAggregateGroup object, which contains:
- entity: The value of the group (e.g., User ID).
- metrics: A dictionary of metric names to their aggregated values.
- groups: A nested list of sub-groups (if multiple groupings were requested).
If you grouped by Day and User, the structure looks like this conceptually:
- Root Response
  - Group: Day 1
    - Metric: Total Calls (for all users on Day 1)
    - Sub-Group: User A
      - Metric: User A’s calls on Day 1
    - Sub-Group: User B
      - Metric: User B’s calls on Day 1
  - Group: Day 2
    - …
You must recursively flatten this structure to get a usable dataset.
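Before working with SDK objects, it can help to see the recursion on plain dictionaries. The sketch below models the Day → User outline with the three fields listed earlier (entity, metrics, groups) and emits one row per leaf, carrying the entity of every ancestor. The sample data, the level_N key names, and the function name flatten are illustrative assumptions.

```python
from typing import Any, Dict, List

# Hypothetical sample mirroring the Day -> User outline.
sample_groups = [
    {
        "entity": "2023-10-01",
        "metrics": {"conversationCount": 5},
        "groups": [
            {"entity": "user-a", "metrics": {"conversationCount": 3}, "groups": []},
            {"entity": "user-b", "metrics": {"conversationCount": 2}, "groups": []},
        ],
    },
    {
        "entity": "2023-10-02",
        "metrics": {"conversationCount": 1},
        "groups": [
            {"entity": "user-a", "metrics": {"conversationCount": 1}, "groups": []},
        ],
    },
]


def flatten(groups: List[Dict[str, Any]], path: tuple = ()) -> List[Dict[str, Any]]:
    """Return one row per leaf group, carrying the entity of every ancestor."""
    rows: List[Dict[str, Any]] = []
    for group in groups:
        current = path + (group["entity"],)
        children = group.get("groups") or []
        if children:
            # Inner node: its own totals are skipped, only leaves become rows
            rows.extend(flatten(children, current))
        else:
            row = {f"level_{i}": entity for i, entity in enumerate(current)}
            row.update(group.get("metrics") or {})
            rows.append(row)
    return rows


for row in flatten(sample_groups):
    print(row)
```

Note that the per-day totals on the inner nodes are deliberately dropped here. If you need them, emit a row for inner nodes as well and mark it so it is not double-counted.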
from typing import List, Dict, Any, Optional


def flatten_aggregate_groups(
    groups: List[Any],
    current_path: Optional[Dict[str, str]] = None,
    result: Optional[List[Dict[str, Any]]] = None,
    depth: int = 0
) -> List[Dict[str, Any]]:
    """
    Recursively flattens the nested ConversationAggregateGroup structure.

    Args:
        groups: List of ConversationAggregateGroup objects.
        current_path: Dictionary tracking the hierarchy so far
            (e.g., {'level_0_id': '2023-10-01', 'level_1_id': '12345'}).
        result: Accumulator list for flattened records.
        depth: Nesting level of `groups`, used to build distinct key names.

    Returns:
        List[Dict[str, Any]]: A flat list of dictionaries containing metrics and grouping keys.
    """
    if result is None:
        result = []
    if current_path is None:
        current_path = {}

    for group in groups:
        # 'entity' is a string ID; 'name' may be present depending on the group type.
        entity_id = group.entity
        entity_name = getattr(group, 'name', None) or "Unknown"

        # The response object does not label the group type (user, queue, day, ...),
        # so this generic version keys each level by its depth. Without the depth
        # suffix, a child would overwrite its parent's keys and the parent's
        # identity would be lost from the record.
        new_path = current_path.copy()
        new_path[f'level_{depth}_id'] = entity_id
        new_path[f'level_{depth}_name'] = entity_name

        if group.groups:
            # Not a leaf: recurse into the sub-groups
            flatten_aggregate_groups(group.groups, new_path, result, depth + 1)
        else:
            # Leaf node: extract metrics and append the record
            metrics = {}
            for metric_name, metric_value in (group.metrics or {}).items():
                # Each metric value is an object with 'value', 'count', etc.
                metrics[metric_name] = metric_value.value if metric_value is not None else 0
            result.append({**new_path, **metrics})

    return result
def parse_aggregate_response(responses: List[Any]) -> List[Dict[str, Any]]:
    """
    Parses the list of API responses into a flat list of records.

    Args:
        responses: List of ConversationAggregateResponse objects.

    Returns:
        List[Dict[str, Any]]: Flattened data ready for pandas or database insertion.
    """
    all_records = []
    for response in responses:
        # The SDK response does not tag each group with its type (e.g. 'user'),
        # so the meaning of each level must be correlated with the 'groups'
        # definition in your query. In this example the top level is assumed to
        # be the Day interval and the second level the user.
        if response.groups:
            records = flatten_aggregate_groups(response.groups)
            all_records.extend(records)
    return all_records
Refining the Parser for Multiple Groupings:
The previous flatten_aggregate_groups function is generic. To make it production-ready, you should pass the group types from your query definition so you can label the columns correctly (e.g., user_id vs queue_id).
def parse_with_context(
    responses: List[Any],
    query_groups: List[ConversationAggregateFilter]
) -> List[Dict[str, Any]]:
    """
    Parses responses while mapping group keys to their types.

    Args:
        responses: List of API response objects.
        query_groups: The list of filters used in the query to determine key names.

    Returns:
        List[Dict[str, Any]]: Flattened records with named keys.
    """
    def recursive_parse(
        groups: List[Any],
        group_index: int,
        current_record: Dict[str, Any],
        result: List[Dict[str, Any]]
    ):
        if not groups:
            return

        # Determine the key name for this level
        if group_index < len(query_groups):
            group_type = query_groups[group_index].type
            key_name = f"{group_type}_id"
            name_key = f"{group_type}_name"
        else:
            key_name = f"group_{group_index}_id"
            name_key = f"group_{group_index}_name"

        for group in groups:
            # Update current record with this group's identity
            current_record[key_name] = group.entity
            current_record[name_key] = group.name if group.name else ""

            if not group.groups:
                # Leaf node: add metrics to a copy so that one sibling's
                # metrics never leak into the next sibling's record
                record = current_record.copy()
                for metric_name, metric_val in (group.metrics or {}).items():
                    record[metric_name] = metric_val.value if metric_val is not None else 0
                result.append(record)
            else:
                # Recurse into sub-groups
                recursive_parse(group.groups, group_index + 1, current_record, result)

        # Cleanup: remove the keys added at this level before returning to the parent
        del current_record[key_name]
        del current_record[name_key]

    all_records = []
    for response in responses:
        if response.groups:
            recursive_parse(response.groups, 0, {}, all_records)
    return all_records
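The parsers above operate on SDK objects that expose entity, metrics, and groups. If you inspect the raw JSON body instead, the layout may differ. One layout you may encounter is a results list in which each entry carries a group dictionary of dimension values and a data list of intervals, each holding a list of metrics with their statistics. The sketch below assumes that layout. The sample payload, the metric names nOffered and tHandle, and the helper name flatten_results are illustrative, so compare against a real response before relying on it.

```python
from typing import Any, Dict, List

# Illustrative payload only; all values are made up.
raw_response = {
    "results": [
        {
            "group": {"userId": "user-a", "mediaType": "voice"},
            "data": [
                {
                    "interval": "2023-10-01T00:00:00.000Z/2023-10-02T00:00:00.000Z",
                    "metrics": [
                        {"metric": "nOffered", "stats": {"count": 4}},
                        {"metric": "tHandle", "stats": {"count": 3, "sum": 540000}},
                    ],
                }
            ],
        }
    ]
}


def flatten_results(payload: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return one row per (group, interval) with a column per metric statistic."""
    rows: List[Dict[str, Any]] = []
    for result in payload.get("results") or []:
        for bucket in result.get("data") or []:
            # Start each row from the group's dimension values
            row: Dict[str, Any] = dict(result.get("group") or {})
            row["interval"] = bucket.get("interval")
            for metric in bucket.get("metrics") or []:
                for stat_name, stat_value in (metric.get("stats") or {}).items():
                    row[f"{metric['metric']}_{stat_name}"] = stat_value
            rows.append(row)
    return rows


rows = flatten_results(raw_response)
print(rows[0])
```

With this layout the nesting depth is fixed at three levels, so plain nested loops are enough and no recursion is needed.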
Step 4: Processing Results into a DataFrame
Once flattened, the data is easily converted into a Pandas DataFrame for analysis or export.
import pandas as pd


def process_results(records: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Converts the list of records into a Pandas DataFrame.

    Args:
        records: List of flattened dictionaries.

    Returns:
        pd.DataFrame: Structured data table.
    """
    if not records:
        return pd.DataFrame()

    df = pd.DataFrame(records)

    # Ensure numeric columns are numeric
    numeric_cols = ['offerCount', 'wrapupCount', 'conversationCount']
    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
    return df
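Since pandas is listed as optional, the flattened records can also be exported with the standard library alone. The sketch below writes a list of dictionaries to CSV text, using the union of all keys as the header so that records with differing metrics still line up. The helper name records_to_csv and the sample records are hypothetical.

```python
import csv
import io
from typing import Any, Dict, List


def records_to_csv(records: List[Dict[str, Any]]) -> str:
    """Serialize flattened records to CSV text, using the union of all keys as columns."""
    fieldnames: List[str] = []
    for record in records:
        for key in record:
            if key not in fieldnames:
                fieldnames.append(key)

    buffer = io.StringIO()
    # restval fills in columns that a given record does not have
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, restval="", lineterminator="\n")
    writer.writeheader()
    writer.writerows(records)
    return buffer.getvalue()


sample = [
    {"user_id": "user-a", "offerCount": 3},
    {"user_id": "user-b", "offerCount": 2, "wrapupCount": 1},
]
print(records_to_csv(sample))
```

To write a file instead of a string, open it with newline="" and pass the file object to csv.DictWriter in place of the StringIO buffer.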
Complete Working Example
This script combines all steps into a single executable module.
import os
import sys
from dotenv import load_dotenv
from purecloudplatformclientv2 import PlatformClient
from purecloudplatformclientv2.api import analytics_api
from purecloudplatformclientv2.models import (
    ConversationAggregateQuery,
    ConversationAggregateInterval,
    ConversationAggregateMetric,
    ConversationAggregateFilter
)
from datetime import datetime, timedelta, timezone
import pandas as pd
from typing import List, Dict, Any

# Load environment variables
load_dotenv()


def get_platform_client() -> PlatformClient:
    client = PlatformClient()
    client.set_environment(os.getenv('GENESYS_REGION'))
    client.clientcredentials_authorize(
        client_id=os.getenv('GENESYS_CLIENT_ID'),
        client_secret=os.getenv('GENESYS_CLIENT_SECRET')
    )
    return client


def build_query() -> tuple:
    # Time window: last 7 days, in UTC
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=7)

    interval = ConversationAggregateInterval(type="day", value=1)
    metrics = [
        ConversationAggregateMetric(name="offerCount"),
        ConversationAggregateMetric(name="wrapupCount")
    ]
    # Group by day first, then by user within each day
    groups = [
        ConversationAggregateFilter(type="day"),
        ConversationAggregateFilter(type="user")
    ]
    query = ConversationAggregateQuery(
        start_time=start_time.isoformat(),
        end_time=end_time.isoformat(),
        interval=interval,
        metrics=metrics,
        groups=groups
    )
    return analytics_api.AnalyticsApi(get_platform_client()), query
def parse_with_context(
    responses: List[Any],
    query_groups: List[ConversationAggregateFilter]
) -> List[Dict[str, Any]]:
    def recursive_parse(
        groups: List[Any],
        group_index: int,
        current_record: Dict[str, Any],
        result: List[Dict[str, Any]]
    ):
        if not groups:
            return

        # Name the keys for this level after the matching query group
        if group_index < len(query_groups):
            group_type = query_groups[group_index].type
            key_name = f"{group_type}_id"
            name_key = f"{group_type}_name"
        else:
            key_name = f"group_{group_index}_id"
            name_key = f"group_{group_index}_name"

        for group in groups:
            current_record[key_name] = group.entity
            current_record[name_key] = group.name if group.name else ""

            if not group.groups:
                # Leaf node: copy first so metrics do not leak between siblings
                record = current_record.copy()
                for metric_name, metric_val in (group.metrics or {}).items():
                    record[metric_name] = metric_val.value if metric_val is not None else 0
                result.append(record)
            else:
                recursive_parse(group.groups, group_index + 1, current_record, result)

        # Remove this level's keys before returning to the parent level
        del current_record[key_name]
        del current_record[name_key]

    all_records = []
    for response in responses:
        if response.groups:
            recursive_parse(response.groups, 0, {}, all_records)
    return all_records
def main():
    try:
        analytics, query = build_query()

        # Execute Query, following page tokens until none is returned
        all_responses = []
        page_size = 100
        query.page_size = page_size
        while True:
            response = analytics.post_analytics_conversations_aggregate_query(body=query)
            all_responses.append(response)
            if response.page_token is None:
                break
            query.page_token = response.page_token

        # Parse Nested JSON
        flat_records = parse_with_context(all_responses, query.groups)

        # Convert to DataFrame
        df = pd.DataFrame(flat_records)

        # Display Results
        print(f"Total records fetched: {len(df)}")
        if not df.empty:
            print(df.head(10))
            df.to_csv("analytics_aggregate_results.csv", index=False)
            print("Results saved to analytics_aggregate_results.csv")
        else:
            print("No data found for the specified criteria.")
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 401 Unauthorized
Cause: The OAuth token has expired, or the Client ID/Secret is incorrect.
Fix: Ensure your .env file is loaded correctly. The PlatformClient automatically refreshes tokens, but if the initial authorization fails, it throws an exception. Check the credentials in the Genesys Cloud Admin Console under Developers > OAuth Client Credentials.
Error: 403 Forbidden
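Cause: The OAuth client does not have the analytics:conversation:view scope, or the roles assigned to the client do not grant analytics view access.
Fix: Go to Developers > OAuth Client Credentials, select your client, and ensure analytics:conversation:view is checked in the Scopes section. For a Client Credentials client, also confirm that a role assigned to the client includes analytics view permission. Save the changes. Note that scope changes may take a few minutes to propagate.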
Error: Empty Response or Null Metrics
Cause: The query parameters do not match any conversations in the time window.
Fix: Verify the start_time and end_time. Ensure the interval type (day, hour) matches your expectations. Check if the users or queues specified actually had activity. If you filter by a specific User ID that has no conversations in that period, the metric values will be null or the group may be omitted entirely depending on the SDK version.
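A quick way to rule out a malformed time window is to parse the two timestamps back before sending the query. The sketch below assumes the values were produced with datetime.isoformat(), as in this tutorial. The helper name check_window is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def check_window(start_iso: str, end_iso: str) -> timedelta:
    """Parse two ISO-8601 timestamps and return the window length, raising on obvious mistakes."""
    start = datetime.fromisoformat(start_iso)
    end = datetime.fromisoformat(end_iso)
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("Timestamps should carry an explicit UTC offset")
    if end <= start:
        raise ValueError("end_time must be later than start_time")
    return end - start


end_time = datetime(2023, 10, 8, tzinfo=timezone.utc)
start_time = end_time - timedelta(days=7)
window = check_window(start_time.isoformat(), end_time.isoformat())
print(window)
```

A naive timestamp (one without an offset) is rejected here because a window built in local time can silently miss the conversations you expect to see.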
Error: RecursionError in Parser
Cause: The nesting depth exceeds Python’s recursion limit.
Fix: This is rare in Genesys Cloud analytics as nesting is usually shallow (e.g., Day → User → Skill). If you encounter this, switch the recursive_parse function to an iterative stack-based approach. For typical use cases, the default recursion limit (1000) is sufficient.
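For reference, a stack-based variant might look like the sketch below. It works on plain dictionaries with entity, metrics, and groups keys rather than SDK objects. The function name flatten_iterative and the level_N key names are hypothetical, and the artificially deep chain exists only to show that depth is no longer bounded by the recursion limit.

```python
from typing import Any, Dict, List, Tuple


def flatten_iterative(groups: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flatten nested groups with an explicit stack instead of recursion."""
    rows: List[Dict[str, Any]] = []
    # Each stack entry pairs a group with the entities of its ancestors.
    # Pushing in reverse keeps the output in the same order as the input.
    stack: List[Tuple[Dict[str, Any], tuple]] = [(g, ()) for g in reversed(groups)]
    while stack:
        group, ancestors = stack.pop()
        path = ancestors + (group["entity"],)
        children = group.get("groups") or []
        if children:
            stack.extend((child, path) for child in reversed(children))
        else:
            row = {f"level_{i}": entity for i, entity in enumerate(path)}
            row.update(group.get("metrics") or {})
            rows.append(row)
    return rows


# Build a chain 5,000 levels deep, far beyond the default recursion limit.
deep: Dict[str, Any] = {"entity": "leaf", "metrics": {"conversationCount": 1}, "groups": []}
for depth in range(4999, 0, -1):
    deep = {"entity": f"n{depth}", "metrics": {}, "groups": [deep]}

rows = flatten_iterative([deep])
print(len(rows), len(rows[0]))  # 1 5001
```

The single output row has 5,000 level keys plus one metric column, which a recursive walk at the default limit could not have produced.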