Parse Genesys Cloud v2 Analytics Conversation Aggregate JSON
What You Will Build
- You will build a Python script that queries the Genesys Cloud Analytics API for conversation aggregates and flattens the nested JSON response into a structured list of dictionaries.
- This tutorial uses the PureCloudPlatformClientV2 Python SDK and the underlying HTTP response structures.
- The programming language covered is Python 3.9+.
Prerequisites
- OAuth Client Type: A Public or Confidential Client with the scope analytics:conversation:view.
- SDK Version: genesys-cloud-purecloud-platform-client v123.0.0 or later.
- Runtime: Python 3.9+ with venv or conda.
- Dependencies:
  - genesys-cloud-purecloud-platform-client
  - pandas (optional, for final data manipulation)
  - python-dotenv (for secure credential management)
Authentication Setup
Genesys Cloud uses OAuth 2.0 for all API requests. For server-to-server integrations, you must use the Client Credentials flow. The SDK handles token caching and refresh automatically, but you must initialize the client correctly.
Create a .env file in your project root:
# .env
GENESYS_CLIENT_ID=your_client_id
GENESYS_CLIENT_SECRET=your_client_secret
GENESYS_ENVIRONMENT=us-east-1 # e.g., us-east-1, eu-west-1
Initialize the API client in your script:
import os
from dotenv import load_dotenv
from purecloudplatformclientv2 import (
    ApiClient,
    Configuration,
    AnalyticsApi
)

load_dotenv()


def get_analytics_api_instance() -> AnalyticsApi:
    """
    Configures and returns an authenticated AnalyticsApi instance.
    """
    # 1. Load configuration from environment variables
    configuration = Configuration()
    configuration.host = f"https://{os.getenv('GENESYS_ENVIRONMENT')}.mypurecloud.com"

    # 2. Set up the OAuth client credentials
    configuration.client_id = os.getenv('GENESYS_CLIENT_ID')
    configuration.client_secret = os.getenv('GENESYS_CLIENT_SECRET')

    # 3. Create the API client with automatic token management
    api_client = ApiClient(configuration=configuration)

    # 4. Instantiate the specific API module
    analytics_api = AnalyticsApi(api_client)
    return analytics_api
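The function above passes whatever os.getenv returns straight into the configuration, so a missing variable only surfaces later as a failed request. A minimal sketch of a fail-fast check is shown below. The helper name require_settings and the REQUIRED_KEYS tuple are hypothetical and not part of the SDK; the key names simply mirror the .env file above.

```python
from typing import Dict, Mapping

# Hypothetical helper (not part of the SDK): the keys mirror the .env file above.
REQUIRED_KEYS = ("GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET", "GENESYS_ENVIRONMENT")


def require_settings(env: Mapping[str, str]) -> Dict[str, str]:
    """Return the required settings, or raise if any is missing or blank."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key, "").strip()]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(missing)}")
    # Strip stray whitespace that often sneaks into .env values
    return {key: env[key].strip() for key in REQUIRED_KEYS}
```

Calling require_settings(os.environ) right after load_dotenv() would raise before any client object is built, which tends to give a clearer error than a rejected request.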
Implementation
Step 1: Constructing the Aggregate Query
The endpoint POST /api/v2/analytics/conversations/aggregates/query accepts a complex JSON body. The most common mistake is misaligning the filters, groups, and metrics structures.
You must define:
- timeFilter: The start and end time (ISO 8601).
- filters: Constraints on the data (e.g., queue.id, conversationType).
- groups: How to split the data (e.g., by queue.id).
- metrics: What numbers to calculate (e.g., conversationCount, handledDuration).
from purecloudplatformclientv2 import (
    ConversationAggregateQuery,
    ConversationTimeFilter,
    ConversationAggregateGroup,
    ConversationAggregateMetric
)
from datetime import datetime, timedelta, timezone


def build_query_body() -> ConversationAggregateQuery:
    """
    Constructs a query for the last 24 hours of voice conversations,
    grouped by Queue ID.
    """
    now = datetime.now(timezone.utc)
    start_time = now - timedelta(hours=24)

    # 1. Time Filter
    time_filter = ConversationTimeFilter(
        start_time=start_time.isoformat(),
        end_time=now.isoformat()
    )

    # 2. Filters: Only include Voice conversations
    # Note: The SDK allows direct object construction or dictionary mapping.
    # Using dictionaries is often clearer for complex nested filters.
    filters = {
        "conversationType": {
            "operation": "equals",
            "value": "voice"
        }
    }

    # 3. Groups: Split results by Queue ID
    groups = [
        ConversationAggregateGroup(
            field="queue.id",
            type="queue"
        )
    ]

    # 4. Metrics: Count of conversations and total handled duration
    metrics = [
        ConversationAggregateMetric(name="conversationCount"),
        ConversationAggregateMetric(name="handledDuration")
    ]

    # Assemble the final query object
    query = ConversationAggregateQuery(
        time_filter=time_filter,
        filters=filters,
        groups=groups,
        metrics=metrics,
        size=50  # Max 50 items per page for aggregates
    )
    return query
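When debugging, it can help to see the same request as a plain dictionary, independent of the SDK model classes. The sketch below mirrors the four parts described in this step using only the standard library. The function name build_raw_query is hypothetical, and the JSON key names (timeFilter, startTime, endTime, and so on) are assumptions inferred from this tutorial's description; check them against the API reference for your SDK version before relying on them.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


def build_raw_query(now: Optional[datetime] = None, hours: int = 24) -> Dict[str, Any]:
    """Build the query as a plain dict (key names are assumptions, see above)."""
    end = now or datetime.now(timezone.utc)
    start = end - timedelta(hours=hours)
    return {
        # 1. Time window, as ISO 8601 strings with an explicit UTC offset
        "timeFilter": {"startTime": start.isoformat(), "endTime": end.isoformat()},
        # 2. Only voice conversations
        "filters": {"conversationType": {"operation": "equals", "value": "voice"}},
        # 3. One result row per queue
        "groups": [{"field": "queue.id", "type": "queue"}],
        # 4. The numbers to calculate
        "metrics": [{"name": "conversationCount"}, {"name": "handledDuration"}],
        "size": 50,
    }
```

Passing a fixed now value makes the output deterministic, which is convenient when comparing the dictionary against what the SDK actually serializes.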
Step 2: Executing the Query and Handling Pagination
The Analytics API returns paginated results. You must check the nextPageToken in the response to fetch subsequent pages. If you ignore pagination, you will only analyze the first 50 records.
from purecloudplatformclientv2.rest import ApiException
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def fetch_all_aggregates(analytics_api: AnalyticsApi, query: ConversationAggregateQuery):
    """
    Fetches all pages of aggregate data.
    """
    all_results = []
    page_token = None
    page_number = 1

    while True:
        try:
            logger.info(f"Fetching page {page_number}...")
            # The SDK method expects the query object
            response = analytics_api.post_analytics_conversations_aggregates_query(
                body=query,
                page_token=page_token
            )

            # Append the current page's entities
            if response.entities:
                all_results.extend(response.entities)

            # Check for next page
            if response.next_page_token:
                page_token = response.next_page_token
                page_number += 1
            else:
                logger.info("No more pages found.")
                break

        except ApiException as e:
            logger.error(f"API Exception: {e.status} {e.reason}")
            if e.status == 429:
                logger.warning("Rate limited. Implement exponential backoff in production.")
            elif e.status in [401, 403]:
                logger.error("Authentication or Authorization failed. Check scopes.")
            raise

    return all_results
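The token-following loop above can be exercised without network access by separating the paging logic from the API call. The sketch below is one way to do that, using a generator and a fake page source. The names iter_entities, fetch_page, and FAKE_PAGES are hypothetical, and pages are modelled as plain dicts with entities and nextPageToken keys rather than SDK response objects.

```python
from typing import Any, Callable, Dict, Iterator, Optional

# A page is modelled as a plain dict with "entities" and an optional "nextPageToken".
Page = Dict[str, Any]


def iter_entities(fetch_page: Callable[[Optional[str]], Page], max_pages: int = 1000) -> Iterator[Any]:
    """Yield entities from every page, following nextPageToken until it is absent."""
    token: Optional[str] = None
    for _ in range(max_pages):
        page = fetch_page(token)
        yield from page.get("entities") or []
        token = page.get("nextPageToken")
        if not token:
            return
    # Guard against a server that keeps handing back a token forever
    raise RuntimeError(f"Stopped after {max_pages} pages; the token never ran out")


# Stand-in for the API call: three fake pages keyed by the token that requests them.
FAKE_PAGES: Dict[Optional[str], Page] = {
    None: {"entities": ["a", "b"], "nextPageToken": "p2"},
    "p2": {"entities": ["c"], "nextPageToken": "p3"},
    "p3": {"entities": ["d"]},
}

print(list(iter_entities(FAKE_PAGES.__getitem__)))  # ['a', 'b', 'c', 'd']
```

The max_pages cap is a design choice rather than an API requirement: it turns a paging bug into a visible error instead of an endless loop.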
Step 3: Parsing the Nested JSON Structure
The core of this tutorial is parsing the entities list. Each entity is a ConversationAggregateEntity. The structure is nested because the groups and metrics are separated.
Raw JSON Structure Example:
{
  "entities": [
    {
      "id": "queue-id-123",
      "groups": {
        "queue.id": "queue-id-123"
      },
      "metrics": {
        "conversationCount": { "value": 150 },
        "handledDuration": { "value": 45000 }
      }
    }
  ]
}
The Parsing Logic:
- Iterate through response.entities.
- Extract the group key (e.g., queue.id).
- Extract the metric values. Note that metrics are wrapped in an object with a value key.
- Flatten into a simple dictionary for database insertion or DataFrame creation.
from purecloudplatformclientv2 import ConversationAggregateEntity
from typing import List, Dict, Any


def parse_aggregate_entities(entities: List[ConversationAggregateEntity]) -> List[Dict[str, Any]]:
    """
    Flattens nested ConversationAggregateEntity objects into simple dictionaries.
    """
    parsed_data = []

    for entity in entities:
        # Initialize a record for this aggregate
        record = {}

        # 1. Extract Group Keys
        # The 'groups' attribute is a dictionary where keys are the group fields
        # and values are the actual group values (e.g., Queue ID, User ID).
        if entity.groups:
            for group_field, group_value in entity.groups.items():
                # Sanitize field names for use as dictionary keys
                safe_key = group_field.replace(".", "_")
                record[safe_key] = group_value

        # 2. Extract Metric Values
        # The 'metrics' attribute is a dictionary where keys are metric names
        # and values are objects containing 'value', 'min', 'max', etc.
        if entity.metrics:
            for metric_name, metric_obj in entity.metrics.items():
                # We primarily want the aggregate value
                # metric_obj is a ConversationAggregateMetricValue object
                if hasattr(metric_obj, 'value') and metric_obj.value is not None:
                    record[metric_name] = metric_obj.value
                else:
                    record[metric_name] = 0  # Handle null metrics gracefully

        # 3. Add Metadata (Optional)
        # The entity ID is usually the composite key of the groups
        record['aggregate_id'] = entity.id
        parsed_data.append(record)

    return parsed_data
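To sanity-check the flattening logic against the raw JSON example shown earlier, a dict-based variant can be run without the SDK. This is a sketch under the assumption that the payload has exactly the shape of that example; flatten_raw_entities is a hypothetical name, and the function works on parsed JSON rather than on SDK model objects.

```python
from typing import Any, Dict, List


def flatten_raw_entities(payload: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten the raw JSON shape shown above into one flat dict per entity."""
    rows = []
    for entity in payload.get("entities") or []:
        row: Dict[str, Any] = {}
        # Group keys: "queue.id" becomes "queue_id"
        for field, value in (entity.get("groups") or {}).items():
            row[field.replace(".", "_")] = value
        # Metric values: unwrap {"value": ...}, treating null as 0
        for name, metric in (entity.get("metrics") or {}).items():
            value = (metric or {}).get("value")
            row[name] = 0 if value is None else value
        row["aggregate_id"] = entity.get("id")
        rows.append(row)
    return rows


sample = {
    "entities": [
        {
            "id": "queue-id-123",
            "groups": {"queue.id": "queue-id-123"},
            "metrics": {
                "conversationCount": {"value": 150},
                "handledDuration": {"value": 45000},
            },
        }
    ]
}

print(flatten_raw_entities(sample))
# [{'queue_id': 'queue-id-123', 'conversationCount': 150, 'handledDuration': 45000, 'aggregate_id': 'queue-id-123'}]
```

Because each row is a flat dictionary with scalar values, the list can be handed directly to pandas.DataFrame or to a bulk database insert.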
Complete Working Example
This script combines authentication, query construction, pagination, and parsing into a single runnable file.
import os
import json
import logging
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
from purecloudplatformclientv2 import (
    ApiClient,
    Configuration,
    AnalyticsApi,
    ConversationAggregateQuery,
    ConversationTimeFilter,
    ConversationAggregateGroup,
    ConversationAggregateMetric
)
from purecloudplatformclientv2.rest import ApiException
from typing import List, Dict, Any

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def get_analytics_api_instance() -> AnalyticsApi:
    """Initializes the Genesys Cloud Analytics API client."""
    load_dotenv()
    configuration = Configuration()
    env = os.getenv('GENESYS_ENVIRONMENT', 'us-east-1')
    configuration.host = f"https://{env}.mypurecloud.com"
    configuration.client_id = os.getenv('GENESYS_CLIENT_ID')
    configuration.client_secret = os.getenv('GENESYS_CLIENT_SECRET')

    if not configuration.client_id or not configuration.client_secret:
        raise ValueError("Missing GENESYS_CLIENT_ID or GENESYS_CLIENT_SECRET in .env")

    api_client = ApiClient(configuration=configuration)
    return AnalyticsApi(api_client)


def build_query_body() -> ConversationAggregateQuery:
    """Builds the query for voice conversations grouped by Queue."""
    now = datetime.now(timezone.utc)
    start_time = now - timedelta(hours=24)

    time_filter = ConversationTimeFilter(
        start_time=start_time.isoformat(),
        end_time=now.isoformat()
    )
    filters = {
        "conversationType": {
            "operation": "equals",
            "value": "voice"
        }
    }
    groups = [
        ConversationAggregateGroup(field="queue.id", type="queue")
    ]
    metrics = [
        ConversationAggregateMetric(name="conversationCount"),
        ConversationAggregateMetric(name="handledDuration"),
        ConversationAggregateMetric(name="abandonCount")
    ]

    return ConversationAggregateQuery(
        time_filter=time_filter,
        filters=filters,
        groups=groups,
        metrics=metrics,
        size=50
    )


def fetch_all_aggregates(analytics_api: AnalyticsApi, query: ConversationAggregateQuery) -> List[Any]:
    """Fetches all pages of aggregate data."""
    all_entities = []
    page_token = None
    page_number = 1

    while True:
        try:
            logger.info(f"Fetching page {page_number}...")
            response = analytics_api.post_analytics_conversations_aggregates_query(
                body=query,
                page_token=page_token
            )

            if response.entities:
                all_entities.extend(response.entities)

            if response.next_page_token:
                page_token = response.next_page_token
                page_number += 1
            else:
                break

        except ApiException as e:
            logger.error(f"API Error: {e.status} {e.reason}")
            if e.body:
                logger.error(f"Response Body: {e.body}")
            raise

    return all_entities


def parse_aggregate_entities(entities: List[Any]) -> List[Dict[str, Any]]:
    """Flattens nested entity objects into simple dictionaries."""
    parsed_data = []

    for entity in entities:
        record = {}

        # Extract Groups
        if entity.groups:
            for group_field, group_value in entity.groups.items():
                record[group_field.replace(".", "_")] = group_value

        # Extract Metrics
        if entity.metrics:
            for metric_name, metric_obj in entity.metrics.items():
                # Read the 'value' attribute; a missing attribute or a null
                # value both fall back to 0, matching Step 3
                value = getattr(metric_obj, 'value', None)
                record[metric_name] = value if value is not None else 0

        parsed_data.append(record)

    return parsed_data


def main():
    try:
        logger.info("Initializing Analytics API...")
        analytics_api = get_analytics_api_instance()

        logger.info("Building Query...")
        query = build_query_body()

        logger.info("Fetching Data...")
        raw_entities = fetch_all_aggregates(analytics_api, query)

        logger.info(f"Fetched {len(raw_entities)} total entities. Parsing...")
        flattened_data = parse_aggregate_entities(raw_entities)

        # Output results
        logger.info("Results:")
        for row in flattened_data:
            print(json.dumps(row, indent=2))

        # Optional: Save to JSON file
        with open('analytics_aggregates.json', 'w', encoding='utf-8') as f:
            json.dump(flattened_data, f, indent=2)
        logger.info("Data saved to analytics_aggregates.json")

    except Exception as e:
        logger.error(f"Fatal error: {e}")
        raise


if __name__ == "__main__":
    main()
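The script above writes JSON, but flattened rows are also a natural fit for CSV when the destination is a spreadsheet or a bulk database load. The following is a minimal sketch using only the standard library. The function name rows_to_csv and the example rows are hypothetical, and missing metrics are written as empty cells, which is a choice made here rather than anything the API dictates.

```python
import csv
import io
from typing import Any, Dict, Iterable, List


def rows_to_csv(rows: Iterable[Dict[str, Any]]) -> str:
    """Serialise flattened rows to CSV text, using the union of all keys as columns."""
    rows = list(rows)
    columns: List[str] = []
    for row in rows:
        for key in row:
            if key not in columns:
                columns.append(key)  # keep first-seen order
    buffer = io.StringIO()
    # restval fills in cells for rows that lack a column
    writer = csv.DictWriter(buffer, fieldnames=columns, restval="", lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


example_rows = [
    {"queue_id": "queue-id-123", "conversationCount": 150, "handledDuration": 45000},
    {"queue_id": "queue-id-456", "conversationCount": 12},
]
print(rows_to_csv(example_rows))
```

Collecting the union of keys first matters because different groups may report different metrics, and csv.DictWriter raises an error for keys it was not told about.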
Common Errors & Debugging
Error: 400 Bad Request - “Invalid Query”
Cause: The timeFilter is malformed, or the groups field contains a value that does not exist in your Genesys Cloud instance (e.g., grouping by user.id when no users are associated with the filtered conversations).
Fix:
- Verify start_time and end_time are valid ISO 8601 strings with timezone info (e.g., 2023-10-27T12:00:00Z).
- Ensure the filters are not too restrictive. If you filter by a specific queue.id that had no conversations in the last 24 hours, the API returns an empty list, not an error. However, if you group by a field that is incompatible with the metric, you get a 400.
Debug Code:
# Print the raw JSON body before sending
import json
query_dict = analytics_api.api_client.sanitize_for_serialization(query)
print("Sending Query:", json.dumps(query_dict, indent=2))
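For the timestamp check in the first fix above, one option is to normalize every datetime through a single helper before it reaches the query. The sketch below rejects naive datetimes and always emits UTC with a trailing Z, matching the example format given. The helper name to_iso_utc is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def to_iso_utc(moment: datetime) -> str:
    """Format an aware datetime as ISO 8601 in UTC with a trailing 'Z'."""
    if moment.tzinfo is None or moment.utcoffset() is None:
        raise ValueError("Naive datetime: attach a timezone before building the query")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


# Already in UTC
print(to_iso_utc(datetime(2023, 10, 27, 12, 0, tzinfo=timezone.utc)))  # 2023-10-27T12:00:00Z

# A local time at UTC+2 is converted to the same instant in UTC
local = datetime(2023, 10, 27, 14, 0, tzinfo=timezone(timedelta(hours=2)))
print(to_iso_utc(local))  # 2023-10-27T12:00:00Z
```

Raising on naive datetimes is deliberate: silently assuming UTC tends to shift the query window by the local offset without any visible error.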
Error: 429 Too Many Requests
Cause: You are hitting the Analytics API rate limit. Analytics queries are expensive.
Fix: Implement exponential backoff. The SDK does not handle retries automatically for 429s.
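import time
import logging
from purecloudplatformclientv2.rest import ApiException

logger = logging.getLogger(__name__)


def fetch_with_retry(analytics_api, query, max_retries=3):
    """Runs the aggregates query, retrying with exponential backoff on HTTP 429."""
    for attempt in range(max_retries):
        try:
            return analytics_api.post_analytics_conversations_aggregates_query(
                body=query
            )
        except ApiException as e:
            # Re-raise anything that is not a rate limit, and the final 429,
            # so the caller sees the original error without a wasted sleep
            if e.status != 429 or attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt  # 1s, 2s, 4s, ...
            logger.warning(f"Rate limited. Waiting {wait_time} seconds...")
            time.sleep(wait_time)
    raise RuntimeError("max_retries must be at least 1")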
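A fixed doubling schedule can cause several clients to retry at the same instant. A common refinement is to add random jitter and cap the delay. The sketch below shows that idea in isolation: RateLimited is a hypothetical stand-in for a 429 error, call_with_backoff is a hypothetical name, and the sleep function is injectable so the behavior can be tested without waiting.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical stand-in for an HTTP 429 error raised by the API client."""


def call_with_backoff(
    call: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry `call` on RateLimited, sleeping a random time up to a doubling cap."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts - 1):
        try:
            return call()
        except RateLimited:
            cap = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, cap))  # "full jitter": anywhere in [0, cap]
    return call()  # final attempt: let any error propagate to the caller
```

To use this with the SDK, the callable would wrap the query and translate an ApiException with status 429 into RateLimited; that adapter is left out here because it depends on the client code above.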
Error: Metric Value is None
Cause: The metric exists in the response structure, but the value is null because no data matched the filter for that specific group.
Fix: Always check for None before processing numeric metrics.
if metric_obj.value is not None:
    record[metric_name] = metric_obj.value
else:
    record[metric_name] = 0
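Defaulting to 0 keeps the rows numeric, but it also means downstream arithmetic has to cope with zero counts. As an illustration, the sketch below derives an average handled duration per conversation from a flattened row and returns None when there is nothing to average. The function name average_handled_duration is hypothetical, and the result is in whatever unit handledDuration uses, which this tutorial does not specify.

```python
from typing import Any, Dict, Optional


def average_handled_duration(record: Dict[str, Any]) -> Optional[float]:
    """Average handled duration per conversation, or None when nothing was counted."""
    count = record.get("conversationCount") or 0
    total = record.get("handledDuration") or 0
    if count == 0:
        return None  # avoid dividing by a zero that only means "no data"
    return total / count


print(average_handled_duration({"conversationCount": 150, "handledDuration": 45000}))  # 300.0
print(average_handled_duration({"conversationCount": 0, "handledDuration": 0}))  # None
```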