Parsing Genesys Cloud v2 Analytics Conversation Aggregate Events
What You Will Build
- You will build a Python application that queries the Genesys Cloud Analytics API for conversation aggregates and parses the deeply nested JSON response to extract specific metrics such as wrap-up time and hold time.
- This tutorial uses the Genesys Cloud v2 Analytics API endpoint /api/v2/analytics/conversations/summary/query via the official genesyscloud Python SDK.
- The programming language covered is Python 3.10+.
Prerequisites
- OAuth Client: You need a Genesys Cloud OAuth application with the agent or confidential client type.
- Required Scopes: The client must be granted the analytics:conversation:read scope. Without this, the API will return a 403 Forbidden error.
- SDK Version: genesyscloud >= 135.0.0 (The SDK structure remains consistent across recent minor versions).
- Runtime: Python 3.10 or higher.
- Dependencies:
  - genesyscloud: The official Genesys Cloud Python SDK.
  - python-dotenv: For secure management of environment variables.
Install the dependencies using pip:
pip install genesyscloud python-dotenv
Authentication Setup
The Genesys Cloud SDK handles the OAuth 2.0 Client Credentials flow automatically when initialized. You must provide your client_id, client_secret, and the environment (e.g., mypurecloud.com).
Create a file named .env in your project root:
GENESYS_CLIENT_ID=your_client_id_here
GENESYS_CLIENT_SECRET=your_client_secret_here
GENESYS_ENVIRONMENT=mypurecloud.com
Initialize the SDK in your Python script. The PlatformClient is the entry point for all API interactions.
import os

from dotenv import load_dotenv
from purecloud_platform_client import PlatformClient

load_dotenv()


def get_platform_client():
    """
    Initializes and returns the Genesys Cloud PlatformClient.
    Raises an exception if credentials are missing.
    """
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")

    if not client_id or not client_secret:
        raise ValueError("Missing GENESYS_CLIENT_ID or GENESYS_CLIENT_SECRET in environment variables.")

    platform_client = PlatformClient(client_id, client_secret, environment)

    # Optional: Configure retry logic for 429 Rate Limits
    # The SDK handles retries internally, but you can configure the max retries.
    # platform_client.set_configuration_option("max_retries", 5)
    return platform_client
Implementation
Step 1: Constructing the Aggregate Query
The Analytics API does not return raw logs. It returns aggregated data based on a query body. The response structure is a nested dictionary containing entities, divisions, and potentially groups.
You must define a ConversationSummaryQuery object. This object specifies the date range, the metrics to retrieve, and the grouping strategy.
Critical Note on Grouping: If you do not specify a group, the API returns a single aggregate bucket for the entire date range. If you group by conversationType, the response splits into multiple entities. This tutorial focuses on a flat aggregate for simplicity, but the parsing logic scales to grouped data.
import datetime

from purecloud_platform_client import PlatformClient
from purecloud_platform_client.rest import ApiException
from purecloud_platform_client.models import ConversationSummaryQuery, DateRange, ConversationSummaryQueryFilter


def build_aggregate_query(platform_client: PlatformClient):
    """
    Builds a ConversationSummaryQuery for the last 24 hours.
    Returns the Analytics API instance together with the query.
    """
    api_instance = platform_client.analytics_api

    # Define the date range (last 24 hours).
    # The API requires ISO 8601 format strings; a timezone-aware "now"
    # avoids datetime.utcnow(), which is deprecated since Python 3.12.
    end_time = datetime.datetime.now(datetime.timezone.utc)
    start_time = end_time - datetime.timedelta(hours=24)
    date_range = DateRange(
        start=start_time.isoformat().replace("+00:00", "Z"),
        end=end_time.isoformat().replace("+00:00", "Z")
    )

    # Define the filter that restricts which conversations are aggregated
    query_filter = ConversationSummaryQueryFilter(
        date_range=date_range,
        # Optional: Filter by specific conversation type (e.g., 'voice', 'chat')
        # conversation_type="voice"
    )

    # Construct the main query object
    query = ConversationSummaryQuery(
        filter=query_filter,
        # Specify the metrics to include in the response
        # Common metrics: wrapupTime, holdTime, talkTime, waitTime
        metrics=[
            "wrapupTime",
            "holdTime",
            "talkTime",
            "waitTime",
            "conversationCount"
        ]
    )
    return api_instance, query
Step 2: Executing the Query and Handling Errors
Execute the query using post_analytics_conversations_summary_query. This endpoint accepts the query body and returns a ConversationSummaryQueryResponse.
You must handle ApiException to catch 401 (Unauthorized), 403 (Forbidden/Scope Issue), and 429 (Rate Limit) errors.
def execute_query(platform_client: PlatformClient):
    """
    Executes the aggregate query and returns the response object.
    """
    api_instance, query = build_aggregate_query(platform_client)
    try:
        # Execute the POST request
        response = api_instance.post_analytics_conversations_summary_query(
            body=query
        )
        return response
    except ApiException as e:
        print(f"Exception when calling AnalyticsApi->post_analytics_conversations_summary_query: {e}\n")
        # Chain the original exception so the full traceback is preserved
        if e.status == 401:
            raise RuntimeError("Authentication failed. Check Client ID and Secret.") from e
        elif e.status == 403:
            raise RuntimeError("Forbidden. Ensure the OAuth client has 'analytics:conversation:read' scope.") from e
        elif e.status == 429:
            raise RuntimeError("Rate Limited. Wait before retrying.") from e
        else:
            raise
Step 3: Parsing the Nested JSON Structure
This is the most complex part. The ConversationSummaryQueryResponse object contains a property called entities. This is a list of ConversationSummaryEntity objects.
Inside each ConversationSummaryEntity, you will find:
- id: The identifier for the bucket (e.g., “all” if no grouping is used).
- metrics: A dictionary of metric names to ConversationSummaryMetric objects.
- division: (Optional) If grouped by division.
Each ConversationSummaryMetric object contains:
- value: The raw numeric value (e.g., seconds).
- unit: The unit of measurement (e.g., “s”, “ms”).
- label: A human-readable label.
Parsing Strategy:
- Access response.entities.
- Iterate through the list.
- Access the metrics dictionary within the entity.
- Extract the value from specific metric keys.
def parse_aggregate_response(response):
    """
    Parses the ConversationSummaryQueryResponse to extract specific metrics.
    Returns a dictionary of metrics with their values and units.
    """
    if not response or not response.entities:
        print("No entities found in response. Check date range and filters.")
        return {}

    parsed_data = {}
    # The entities list may contain multiple items if grouped by time or type
    for entity in response.entities:
        entity_id = entity.id
        print(f"Processing Entity ID: {entity_id}")
        entity_metrics = {}
        # The metrics attribute is a dict: { "wrapupTime": ConversationSummaryMetric, ... }
        if entity.metrics:
            for metric_name, metric_obj in entity.metrics.items():
                # metric_obj is a ConversationSummaryMetric object
                value = metric_obj.value
                unit = metric_obj.unit
                # Store in a clean structure
                entity_metrics[metric_name] = {
                    "value": value,
                    "unit": unit
                }
        parsed_data[entity_id] = entity_metrics
    return parsed_data
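To see the shape of the parsed result without calling the API, the same traversal can be run against stand-in objects. This is a minimal sketch: the SimpleNamespace instances below are hypothetical placeholders that only mimic the id, metrics, value, and unit attributes described above, and parse_entities is an illustrative condensed variant of the parsing function, not an SDK call. Two entities are used to show how a grouped response would be handled.

```python
from types import SimpleNamespace


def parse_entities(entities):
    """Collect {entity_id: {metric_name: {"value": ..., "unit": ...}}}."""
    parsed = {}
    for entity in entities or []:
        # Treat a missing or None metrics attribute as an empty dict.
        metrics = getattr(entity, "metrics", None) or {}
        parsed[entity.id] = {
            name: {"value": metric.value, "unit": metric.unit}
            for name, metric in metrics.items()
        }
    return parsed


# Hypothetical stand-ins that mimic the attribute layout described above.
voice = SimpleNamespace(
    id="voice",
    metrics={
        "holdTime": SimpleNamespace(value=120.0, unit="s"),
        "talkTime": SimpleNamespace(value=900.0, unit="s"),
    },
)
chat = SimpleNamespace(id="chat", metrics=None)  # an entity with no metrics

result = parse_entities([voice, chat])
print(result["voice"]["holdTime"])  # {'value': 120.0, 'unit': 's'}
print(result["chat"])               # {}
```

Each entity becomes one top-level key, so a flat aggregate and a grouped aggregate produce the same structure with a different number of keys.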
Step 4: Processing and Formatting Results
Raw seconds are difficult to read. A helper function to convert seconds to minutes or hours improves usability.
def format_time(value: float, unit: str) -> str:
    """
    Converts a raw metric value to a human-readable string.
    Seconds ("s") are shown as minutes; milliseconds ("ms") as seconds.
    """
    if value is None:
        return "N/A"
    if unit == "s":
        minutes = value / 60
        return f"{minutes:.2f} minutes"
    elif unit == "ms":
        seconds_val = value / 1000
        return f"{seconds_val:.2f} seconds"
    else:
        return f"{value} {unit}"


def display_results(parsed_data: dict):
    """
    Prints the parsed metrics in a readable format.
    """
    for entity_id, metrics in parsed_data.items():
        print(f"\n--- Entity: {entity_id} ---")
        # Check for specific metrics
        if "wrapupTime" in metrics:
            wu = metrics["wrapupTime"]
            print(f"Wrap-up Time: {format_time(wu['value'], wu['unit'])}")
        if "holdTime" in metrics:
            ht = metrics["holdTime"]
            print(f"Hold Time: {format_time(ht['value'], ht['unit'])}")
        if "talkTime" in metrics:
            tt = metrics["talkTime"]
            print(f"Talk Time: {format_time(tt['value'], tt['unit'])}")
        if "waitTime" in metrics:
            wt = metrics["waitTime"]
            print(f"Wait Time: {format_time(wt['value'], wt['unit'])}")
        if "conversationCount" in metrics:
            cc = metrics["conversationCount"]
            print(f"Conversation Count: {cc['value']}")
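For large aggregates, fractional minutes can still be hard to scan. As an alternative formatting choice, the sketch below renders a duration as H:MM:SS. The helper name format_duration is hypothetical, and it assumes the unit is either "s" or "ms" as in the examples above; any other unit is passed through unchanged.

```python
def format_duration(value, unit):
    """Render a metric value as H:MM:SS, assuming unit is "s" or "ms"."""
    if value is None:
        return "N/A"
    if unit == "ms":
        total_seconds = value / 1000
    elif unit == "s":
        total_seconds = value
    else:
        # Unknown unit: return the raw value rather than guessing.
        return f"{value} {unit}"
    total = int(round(total_seconds))
    hours, remainder = divmod(total, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours}:{minutes:02d}:{seconds:02d}"


print(format_duration(3725, "s"))     # 1:02:05
print(format_duration(90000, "ms"))   # 0:01:30
print(format_duration(None, "s"))     # N/A
```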
Complete Working Example
The following script combines all steps into a single runnable module. Save this as parse_genesys_analytics.py.
import os
import sys
import datetime

from dotenv import load_dotenv
from purecloud_platform_client import PlatformClient
from purecloud_platform_client.rest import ApiException
from purecloud_platform_client.models import ConversationSummaryQuery, DateRange, ConversationSummaryQueryFilter

# Load environment variables
load_dotenv()


def get_platform_client():
    """Initializes the Genesys Cloud PlatformClient."""
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    if not client_id or not client_secret:
        raise ValueError("Missing GENESYS_CLIENT_ID or GENESYS_CLIENT_SECRET in environment variables.")
    return PlatformClient(client_id, client_secret, environment)


def build_aggregate_query():
    """Builds a ConversationSummaryQuery for the last 24 hours."""
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12
    end_time = datetime.datetime.now(datetime.timezone.utc)
    start_time = end_time - datetime.timedelta(hours=24)
    date_range = DateRange(
        start=start_time.isoformat().replace("+00:00", "Z"),
        end=end_time.isoformat().replace("+00:00", "Z")
    )
    query_filter = ConversationSummaryQueryFilter(
        date_range=date_range
    )
    # Request specific metrics
    metrics_to_query = [
        "wrapupTime",
        "holdTime",
        "talkTime",
        "waitTime",
        "conversationCount"
    ]
    query = ConversationSummaryQuery(
        filter=query_filter,
        metrics=metrics_to_query
    )
    return query


def execute_and_parse():
    """Main execution flow."""
    try:
        # 1. Authenticate
        platform_client = get_platform_client()
        analytics_api = platform_client.analytics_api

        # 2. Build Query
        query = build_aggregate_query()

        # 3. Execute Query
        print("Executing Analytics Query...")
        response = analytics_api.post_analytics_conversations_summary_query(body=query)

        # 4. Parse Response
        if not response.entities:
            print("No data returned for the specified date range.")
            return

        print("Parsing nested JSON structure...")
        # Iterate through entities
        for entity in response.entities:
            print(f"\n[Entity ID: {entity.id}]")
            # Access the metrics dictionary
            if entity.metrics:
                for metric_name, metric_obj in entity.metrics.items():
                    # metric_obj is a ConversationSummaryMetric instance
                    value = metric_obj.value
                    unit = metric_obj.unit
                    # Handle None values
                    if value is None:
                        print(f"  {metric_name}: N/A")
                    else:
                        # Format for readability
                        if unit == "s":
                            formatted_val = f"{value/60:.2f} min"
                        elif unit == "ms":
                            formatted_val = f"{value/1000:.2f} sec"
                        else:
                            formatted_val = f"{value} {unit}"
                        print(f"  {metric_name}: {formatted_val}")
            else:
                print("  No metrics found in entity.")
    except ApiException as e:
        print(f"API Exception: {e.status} - {e.reason}")
        if e.body:
            print(f"Response Body: {e.body}")
        sys.exit(1)
    except Exception as e:
        print(f"Unexpected Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    execute_and_parse()
Common Errors & Debugging
Error: 403 Forbidden
Cause: The OAuth client does not have the required scope.
Fix: Log in to the Genesys Cloud Admin portal. Navigate to Developers > OAuth Applications. Select your client. Go to the Scopes tab. Ensure analytics:conversation:read is checked. Save and regenerate the token if necessary.
Error: 401 Unauthorized
Cause: Invalid client_id or client_secret, or the token has expired.
Fix: Verify the .env file contains the correct credentials. The SDK handles token refresh automatically, but if the client credentials are revoked, you must update them.
Error: 429 Too Many Requests
Cause: You have exceeded the rate limit for the Analytics API. Analytics queries are computationally expensive.
Fix: Implement exponential backoff. The Genesys Cloud Python SDK includes a default retry mechanism. You can increase the max retries by configuring the client:
platform_client.set_configuration_option("max_retries", 5)
platform_client.set_configuration_option("retry_backoff_factor", 2)
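If you prefer to control retries in your own code, exponential backoff can be sketched independently of the SDK. The example below is a generic illustration, not the SDK's retry mechanism: call_with_backoff and RateLimited are hypothetical names, and the only assumption about the error is that it exposes a status attribute equal to 429. The sleep function is injectable so the demo runs without actually waiting.

```python
import random
import time


class RateLimited(Exception):
    """Hypothetical stand-in for an API error carrying HTTP status 429."""
    status = 429


def call_with_backoff(func, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call func(), retrying on status 429 with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception as exc:
            is_rate_limit = getattr(exc, "status", None) == 429
            # Re-raise anything that is not a 429, or the final failed attempt.
            if not is_rate_limit or attempt == max_attempts - 1:
                raise
            # Delay doubles each attempt: base, 2*base, 4*base, ... plus jitter.
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay / 2)
            sleep(delay)


attempts = {"count": 0}


def flaky():
    """Fails with a 429 twice, then succeeds."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RateLimited()
    return "ok"


delays = []  # record the delays instead of sleeping
print(call_with_backoff(flaky, sleep=delays.append))  # ok
print(len(delays))                                    # 2
```

In real use, func would wrap the analytics query call and the default time.sleep would be left in place.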
Error: KeyError on Metrics
Cause: The requested metric is not available for the filtered data. For example, holdTime may be null or missing if no holds occurred in the date range or if the conversation type does not support holds.
Fix: Always check if the metric key exists in the entity.metrics dictionary before accessing it. Use .get() or if metric_name in entity.metrics: checks.
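The defensive lookup can be wrapped in a small helper. This sketch operates on the plain dictionary produced in Step 3 (metric name mapped to a dict with "value" and "unit"); the helper name metric_value is hypothetical. It treats a missing key and a None value the same way, returning a caller-supplied default.

```python
def metric_value(metrics, name, default=None):
    """Return metrics[name]["value"], or default if the key is missing or the value is None."""
    entry = metrics.get(name) or {}
    value = entry.get("value")
    return default if value is None else value


entity_metrics = {
    "talkTime": {"value": 900.0, "unit": "s"},
    "holdTime": {"value": None, "unit": "s"},  # present but null
}

print(metric_value(entity_metrics, "talkTime"))         # 900.0
print(metric_value(entity_metrics, "holdTime", 0.0))    # 0.0
print(metric_value(entity_metrics, "wrapupTime", 0.0))  # 0.0
```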
Error: Response Contains Empty Entities
Cause: The date range is in the future, or the filter excludes all conversations.
Fix: Verify the start and end ISO 8601 strings. Ensure the filter does not have conflicting constraints (e.g., filtering for a user ID that does not exist).
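A quick sanity check on the two timestamps can catch the most common causes before the query is sent. The following is a minimal sketch using only the standard library; check_interval is a hypothetical helper, and it assumes both strings carry a "Z" suffix or an explicit UTC offset.

```python
import datetime


def check_interval(start_iso, end_iso, now=None):
    """Return a list of problems found with an ISO 8601 start/end pair."""
    now = now or datetime.datetime.now(datetime.timezone.utc)
    # fromisoformat() on Python 3.10 does not accept a trailing "Z".
    start = datetime.datetime.fromisoformat(start_iso.replace("Z", "+00:00"))
    end = datetime.datetime.fromisoformat(end_iso.replace("Z", "+00:00"))
    problems = []
    if start >= end:
        problems.append("start is not before end")
    if start > now:
        problems.append("start is in the future")
    return problems


fixed_now = datetime.datetime(2024, 1, 3, tzinfo=datetime.timezone.utc)
print(check_interval("2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z", now=fixed_now))
# []
print(check_interval("2030-01-01T00:00:00Z", "2030-01-02T00:00:00Z", now=fixed_now))
# ['start is in the future']
```

An empty list means neither problem was detected; it does not guarantee that the filter matches any conversations.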