Generating Genesys Cloud Custom Analytics Reports with Python
What You Will Build
A production Python script that dynamically constructs Analytics API queries, fetches paginated conversation details, merges the sorted pages with a merge sort combine step, applies custom business aggregations, exports the results to CSV and JSON, runs on a schedule (in-process or from cron), and distributes the files through time-limited S3 presigned URLs. This tutorial uses the Genesys Cloud Analytics API with the Python requests and boto3 libraries, and targets Python 3.9+.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials flow)
- Required Scopes: analytics:read, conversation:read
- SDK/API Version: Genesys Cloud REST API v2, Python requests 2.31+, boto3 1.28+
- Runtime Requirements: Python 3.9 or higher, AWS CLI configured for S3 access
- External Dependencies: requests, boto3, schedule, pandas (optional; this guide uses the standard library for aggregation to maintain transparency)
Authentication Setup
Genesys Cloud uses the OAuth 2.0 Client Credentials flow. Each token response reports its lifetime in expires_in (3,600 seconds in this tutorial's setup; the duration is configured per OAuth client). You must implement token caching and automatic refresh to avoid authentication failures during long-running pagination loops.

import time
import requests
from typing import Optional, Dict, Any

# Hosts for the mypurecloud.com region; adjust both for your region
GENESYS_API_BASE = "https://api.mypurecloud.com"
# OAuth tokens are issued by the login host, not the API host
GENESYS_LOGIN_BASE = "https://login.mypurecloud.com"
OAUTH_TOKEN_URL = f"{GENESYS_LOGIN_BASE}/oauth/token"


class GenesysAuthManager:
    def __init__(self, client_id: str, client_secret: str, scope: str = "analytics:read"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.token: Optional[str] = None
        self.token_expiry: float = 0.0

    def _fetch_token(self) -> None:
        payload = {"grant_type": "client_credentials", "scope": self.scope}
        # The client ID and secret are sent as HTTP Basic credentials
        response = requests.post(
            OAUTH_TOKEN_URL,
            data=payload,
            auth=(self.client_id, self.client_secret),
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
        self.token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"] - 60  # Refresh 60s early

    def get_headers(self) -> Dict[str, str]:
        # Fetch a token on first use, or once the cached one is about to expire
        if not self.token or time.time() >= self.token_expiry:
            self._fetch_token()
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

The _fetch_token method posts to /oauth/token with grant_type: client_credentials. The get_headers method checks the expiration timestamp and refreshes the token 60 seconds before it expires. This prevents 401 Unauthorized errors during batch processing. The Genesys Cloud Python SDK (PureCloudPlatformClientV2) offers an equivalent through ApiClient().get_client_credentials_token(client_id, client_secret), but handling the requests directly provides full visibility into the HTTP cycle.
Implementation
Step 1: Define Dynamic Query Schemas and Complex Filters
The request body in this tutorial carries query, interval, groupBy, metrics, and filter fields. You will construct the body dynamically from runtime parameters. Each filter expression uses the same structure: type, path, value, and operator.

def build_analytics_query(
    start_time: str,
    end_time: str,
    media_type: str = "voice",
    queue_ids: Optional[list[str]] = None,
    custom_filter: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    base_query = {
        "query": {
            "type": "conversation",
            "timeRange": {"start": start_time, "end": end_time}
        },
        "interval": "PT1H",
        "groupBy": ["conversation.media.type", "routing.queue.id"],
        "metrics": [
            "conversation.duration.seconds",
            "agent.talk.seconds",
            "agent.hold.seconds",
            "wrapup.code"
        ]
    }
    # The media type filter is always present
    filters = [
        {
            "type": "simple",
            "path": "conversation.media.type",
            "value": media_type,
            "operator": "eq"
        }
    ]
    # Restrict to specific queues only when the caller supplies them
    if queue_ids:
        filters.append({
            "type": "simple",
            "path": "routing.queue.id",
            "value": queue_ids,
            "operator": "in"
        })
    if custom_filter:
        filters.append(custom_filter)
    base_query["filter"] = filters
    return base_query

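This function constructs the request body sent to POST /api/v2/analytics/conversations/details/query. The filter array supports operators like eq, in, gt, lt, and contains. The groupBy array determines the granularity of the returned records. You must ensure start_time and end_time are ISO 8601 formatted strings, and you should confirm field names, filter paths, and operators against the API reference for this endpoint, because a malformed body is rejected with 400 Bad Request.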
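The ISO 8601 requirement can be checked before a request is sent. The sketch below is one possible approach using only the standard library; the helper names to_iso8601_utc, parse_iso8601, and validate_window are hypothetical and are not part of the tutorial's pipeline.

```python
from datetime import datetime, timezone


def to_iso8601_utc(moment: datetime) -> str:
    """Format a timezone-aware datetime as an ISO 8601 UTC string ending in Z."""
    if moment.tzinfo is None:
        raise ValueError("naive datetime; attach a timezone first")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def parse_iso8601(value: str) -> datetime:
    """Parse an ISO 8601 string with an offset; raise ValueError otherwise."""
    # datetime.fromisoformat on Python 3.9 and 3.10 rejects a trailing "Z",
    # so rewrite it as an explicit UTC offset first.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        raise ValueError(f"{value!r} has no UTC offset")
    return parsed


def validate_window(start_time: str, end_time: str) -> None:
    """Raise ValueError unless both strings parse and start precedes end."""
    start, end = parse_iso8601(start_time), parse_iso8601(end_time)
    if start >= end:
        raise ValueError(f"start {start_time} is not before end {end_time}")


# Passes silently for the window used in this tutorial.
validate_window("2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z")
```

Calling validate_window at the top of build_analytics_query would surface a bad window as a local ValueError rather than as a remote error response.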
Step 2: Execute Queries and Merge Paginated Results with Merge Sort
Genesys Cloud paginates large datasets using nextPageToken. When fetching millions of records, you receive multiple pages. Each page is independently sorted by timestamp, but combining them naively destroys chronological order. You will implement a merge sort algorithm to combine paginated chunks efficiently.

import time
import requests
from typing import List, Dict, Any


def fetch_paginated_details(auth: GenesysAuthManager, query_body: Dict[str, Any]) -> List[Dict[str, Any]]:
    url = f"{GENESYS_API_BASE}/api/v2/analytics/conversations/details/query"
    all_pages: List[List[Dict[str, Any]]] = []
    page_token = None
    max_retries = 3
    while True:
        headers = auth.get_headers()
        payload = query_body.copy()
        if page_token:
            payload["pageToken"] = page_token
        for attempt in range(max_retries):
            try:
                response = requests.post(url, json=payload, headers=headers, timeout=30)
                if response.status_code == 429:
                    # Honor Retry-After, scaled up on each successive attempt
                    retry_after = int(response.headers.get("Retry-After", 5))
                    time.sleep(retry_after * (attempt + 1))
                    continue
                response.raise_for_status()
                break
            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    raise RuntimeError(f"Failed to fetch analytics page: {e}") from e
                time.sleep(2 ** attempt)
        else:
            # Every attempt was rate limited; do not parse a 429 body as data
            raise RuntimeError("Failed to fetch analytics page: still rate limited after retries")
        data = response.json()
        if "results" not in data or not data["results"]:
            break
        # Sort each page by timestamp before merging
        sorted_page = sorted(data["results"], key=lambda x: x.get("timestamp", ""))
        all_pages.append(sorted_page)
        page_token = data.get("nextPageToken")
        if not page_token:
            break
    return merge_sorted_pages(all_pages)


def merge_sorted_pages(pages: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    if not pages:
        return []
    if len(pages) == 1:
        return pages[0]
    # Merge each half of the page list recursively, then combine the two results
    mid = len(pages) // 2
    left = merge_sorted_pages(pages[:mid])
    right = merge_sorted_pages(pages[mid:])
    merged = []
    i = j = 0
    while i < len(left) and j < len(right):
        left_ts = left[i].get("timestamp", "")
        right_ts = right[j].get("timestamp", "")
        if left_ts <= right_ts:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    # One side is exhausted; the remainder of the other is already sorted
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged

The fetch_paginated_details function loops until the response no longer carries a nextPageToken. On 429 Too Many Requests it waits for the Retry-After interval, scaled by the attempt number; on other request failures it backs off exponentially. Each page is sorted by timestamp before being passed to merge_sorted_pages. The combine step merges two sorted lists in O(n) time, so merging k pages holding n records in total costs O(n log k), and the final dataset stays in chronological order without re-sorting the entire collection. All pages are held in memory until the merge completes, so size the reporting window accordingly.
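As an alternative to the hand-written combine step, the standard library's heapq.merge performs the same k-way merge of pre-sorted inputs. The sketch below is a swapped-in technique, not the tutorial's implementation; the function name merge_pages_with_heap and the sample records are illustrative, and it assumes every timestamp uses the same ISO 8601 format so that string comparison matches chronological order.

```python
import heapq
from typing import Any, Dict, Iterable, List


def merge_pages_with_heap(pages: Iterable[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Merge pages that are each already sorted by "timestamp" into one sorted list."""
    # heapq.merge is a lazy k-way merge; for equal keys it yields items from
    # earlier pages first, so the result is stable.
    return list(heapq.merge(*pages, key=lambda record: record.get("timestamp", "")))


# Two made-up pages, each sorted on its own but interleaved with the other.
pages = [
    [{"timestamp": "2024-01-01T00:05:00Z"}, {"timestamp": "2024-01-01T00:30:00Z"}],
    [{"timestamp": "2024-01-01T00:10:00Z"}, {"timestamp": "2024-01-01T00:20:00Z"}],
]
merged = merge_pages_with_heap(pages)
print([record["timestamp"][11:16] for record in merged])
# ['00:05', '00:10', '00:20', '00:30']
```

Because heapq.merge returns an iterator, dropping the list() call would let downstream code consume records one at a time instead of materializing the merged list.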
Step 3: Apply Custom Aggregations and Transform Outputs
Raw conversation details contain granular metric values. You will group records by dimensions, apply custom business logic (average duration, talk and hold ratios, and a custom efficiency score), and export the results to CSV and JSON.

import csv
import json
from collections import defaultdict
from typing import Any, Dict, List, Tuple


def aggregate_conversation_data(
    records: List[Dict[str, Any]],
    group_by_keys: List[str]
) -> Dict[Tuple, Dict[str, Any]]:
    groups = defaultdict(lambda: {
        "count": 0,
        "total_duration": 0.0,
        "total_talk": 0.0,
        "total_hold": 0.0,
        "wrapup_codes": []
    })
    # First pass: accumulate raw totals per group
    for record in records:
        key = tuple(record.get(k) for k in group_by_keys)
        group = groups[key]
        metrics = record.get("metrics", {})
        group["count"] += 1
        group["total_duration"] += metrics.get("conversation.duration.seconds", 0)
        group["total_talk"] += metrics.get("agent.talk.seconds", 0)
        group["total_hold"] += metrics.get("agent.hold.seconds", 0)
        wc = metrics.get("wrapup.code")
        if wc:
            group["wrapup_codes"].append(wc)
    # Second pass: derive averages, ratios, and the custom score
    aggregated = {}
    for key, data in groups.items():
        count = data["count"]
        if count == 0:
            continue
        avg_duration = data["total_duration"] / count
        talk_ratio = data["total_talk"] / data["total_duration"] if data["total_duration"] > 0 else 0
        hold_ratio = data["total_hold"] / data["total_duration"] if data["total_duration"] > 0 else 0
        # Custom metric: Efficiency Score (100 - hold_ratio * 50)
        efficiency_score = max(0, 100 - (hold_ratio * 50))
        aggregated[key] = {
            "conversation_count": count,
            "avg_duration_seconds": round(avg_duration, 2),
            "talk_ratio": round(talk_ratio, 4),
            "hold_ratio": round(hold_ratio, 4),
            "efficiency_score": round(efficiency_score, 2),
            # Sorted so repeated runs produce identical output
            "unique_wrapup_codes": sorted(set(data["wrapup_codes"]))
        }
    return aggregated


def export_to_csv(data: Dict[Tuple, Dict[str, Any]], filepath: str) -> None:
    if not data:
        return
    # dict.keys() is a view, so convert it to a list before concatenating
    headers = list(next(iter(data.values())).keys())
    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["group_key"] + headers)
        writer.writeheader()
        for key, metrics in data.items():
            row = {"group_key": str(key)}
            row.update(metrics)
            writer.writerow(row)


def export_to_json(data: Dict[Tuple, Dict[str, Any]], filepath: str) -> None:
    # JSON object keys must be strings, so stringify the tuple keys
    serializable = {str(k): v for k, v in data.items()}
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2, default=str)

The aggregate_conversation_data function groups records by the specified dimensions. It calculates standard metrics and applies a custom efficiency_score formula. The export functions serialize the aggregated dictionary to CSV and JSON. The CSV writer handles tuple keys by converting them to strings. The JSON exporter ensures all keys are strings for valid JSON compliance.
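To make the efficiency_score arithmetic concrete, the following sketch isolates the formula from the aggregation loop. The helper name efficiency_score_for and the sample totals are illustrative assumptions, not part of the pipeline.

```python
def efficiency_score_for(total_hold: float, total_duration: float) -> float:
    """Hypothetical helper mirroring the tutorial's formula: 100 - hold_ratio * 50."""
    # Guard against a zero duration the same way the aggregation does.
    hold_ratio = total_hold / total_duration if total_duration > 0 else 0.0
    return round(max(0.0, 100 - hold_ratio * 50), 2)


# Made-up totals: 900 s of conversation time with varying hold time.
print(efficiency_score_for(90, 900))   # hold_ratio 0.1 -> 95.0
print(efficiency_score_for(450, 900))  # hold_ratio 0.5 -> 75.0
print(efficiency_score_for(0, 0))      # no duration    -> 100.0
```

As long as hold time does not exceed total duration, the score stays between 50 and 100; the max(0, ...) clamp only takes effect when the hold total is more than twice the duration, which would indicate inconsistent input data.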
Step 4: Schedule Execution and Distribute via S3 Presigned URLs
You will wrap the pipeline in a cron-compatible scheduler and upload the results to S3. Presigned URLs provide time-limited access control without exposing IAM credentials.

import time
from datetime import datetime, timedelta, timezone

import boto3
import schedule
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ClientError

S3_BUCKET = "genesys-analytics-reports"
S3_REGION = "us-east-1"


def upload_to_s3(file_path: str, s3_key: str) -> str:
    s3_client = boto3.client("s3", region_name=S3_REGION)
    try:
        s3_client.upload_file(file_path, S3_BUCKET, s3_key)
    except (ClientError, S3UploadFailedError) as e:
        # upload_file reports failed transfers as S3UploadFailedError
        raise RuntimeError(f"S3 upload failed for {file_path}: {e}") from e
    # Presigned URL with a 24-hour expiry; anyone holding the URL can download until then
    url = s3_client.generate_presigned_url(
        "get_object",
        Params={"Bucket": S3_BUCKET, "Key": s3_key},
        ExpiresIn=86400,
        HttpMethod="GET"
    )
    return url


def run_report_pipeline(auth: GenesysAuthManager, start_time: str, end_time: str) -> Dict[str, str]:
    # The queue ID below is a placeholder; substitute a real queue ID
    query = build_analytics_query(start_time, end_time, queue_ids=["8a2b3c4d-5e6f-7g8h-9i0j-k1l2m3n4o5p6"])
    records = fetch_paginated_details(auth, query)
    aggregated = aggregate_conversation_data(records, ["conversation.media.type", "routing.queue.id"])
    csv_path = "report_output.csv"
    json_path = "report_output.json"
    export_to_csv(aggregated, csv_path)
    export_to_json(aggregated, json_path)
    csv_url = upload_to_s3(csv_path, f"reports/{start_time.replace(':', '-')}/{csv_path}")
    json_url = upload_to_s3(json_path, f"reports/{start_time.replace(':', '-')}/{json_path}")
    return {"csv_url": csv_url, "json_url": json_url}


def cron_scheduler(auth: GenesysAuthManager) -> None:
    def run_for_previous_day() -> None:
        # Compute the window at run time so each run covers the previous UTC day
        end = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        start = end - timedelta(days=1)
        fmt = "%Y-%m-%dT%H:%M:%SZ"
        run_report_pipeline(auth, start.strftime(fmt), end.strftime(fmt))

    # Runs daily at 02:00 in the host's local time zone
    schedule.every().day.at("02:00").do(run_for_previous_day)
    print("Scheduler started. Press Ctrl+C to exit.")
    while True:
        schedule.run_pending()
        time.sleep(60)

The upload_to_s3 function uses boto3 to transfer files and generate presigned URLs. The ExpiresIn=86400 parameter restricts access to twenty-four hours. AWS IAM policies must grant s3:PutObject and s3:GetObject permissions to the execution role. The cron_scheduler function uses the schedule library to trigger the pipeline daily. You can also invoke run_report_pipeline directly from a system crontab by wrapping it in a simple CLI entry point.
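The CLI entry point mentioned above could look like the following minimal sketch. The names parse_args, main, and fake_pipeline are hypothetical; main takes the pipeline as a callable so the sketch runs on its own, and in the real script you would pass a wrapper around run_report_pipeline instead of the stand-in.

```python
import argparse
from typing import Callable, Dict, List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the reporting window from the command line."""
    parser = argparse.ArgumentParser(description="Run the analytics report once.")
    parser.add_argument("--start", required=True, help="ISO 8601 start, e.g. 2024-01-01T00:00:00Z")
    parser.add_argument("--end", required=True, help="ISO 8601 end, e.g. 2024-01-02T00:00:00Z")
    return parser.parse_args(argv)


def main(argv: Optional[List[str]], run_pipeline: Callable[[str, str], Dict[str, str]]) -> int:
    """Run one report, print the resulting URLs, and return a process exit code."""
    args = parse_args(argv)
    urls = run_pipeline(args.start, args.end)
    for name, url in urls.items():
        print(f"{name}: {url}")
    return 0


# Stand-in pipeline so the sketch is self-contained. In the real script, pass
# lambda start, end: run_report_pipeline(auth, start, end) instead.
def fake_pipeline(start: str, end: str) -> Dict[str, str]:
    return {"csv_url": f"https://example.invalid/{start}_{end}.csv"}


exit_code = main(["--start", "2024-01-01T00:00:00Z", "--end", "2024-01-02T00:00:00Z"], fake_pipeline)
```

With main(None, ...) under an if __name__ == "__main__" guard, argparse reads sys.argv, and a crontab entry can then invoke the script with --start and --end computed by the shell. Cron evaluates its schedule in the host's time zone, so pick the hour accordingly.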
Complete Working Example
#!/usr/bin/env python3
import time
import requests
import csv
import json
import boto3
from typing import Optional, Dict, Any, List, Tuple
from collections import defaultdict

# Hosts for the mypurecloud.com region; adjust both for your region
GENESYS_API_BASE = "https://api.mypurecloud.com"
# OAuth tokens are issued by the login host, not the API host
OAUTH_TOKEN_URL = "https://login.mypurecloud.com/oauth/token"
S3_BUCKET = "genesys-analytics-reports"
S3_REGION = "us-east-1"


class GenesysAuthManager:
    def __init__(self, client_id: str, client_secret: str, scope: str = "analytics:read"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.token: Optional[str] = None
        self.token_expiry: float = 0.0

    def _fetch_token(self) -> None:
        payload = {"grant_type": "client_credentials", "scope": self.scope}
        # The client ID and secret are sent as HTTP Basic credentials
        response = requests.post(
            OAUTH_TOKEN_URL,
            data=payload,
            auth=(self.client_id, self.client_secret),
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
        self.token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"] - 60  # Refresh 60s early

    def get_headers(self) -> Dict[str, str]:
        if not self.token or time.time() >= self.token_expiry:
            self._fetch_token()
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }


def build_analytics_query(start_time: str, end_time: str, media_type: str = "voice", queue_ids: Optional[list[str]] = None) -> Dict[str, Any]:
    return {
        "query": {"type": "conversation", "timeRange": {"start": start_time, "end": end_time}},
        "interval": "PT1H",
        "groupBy": ["conversation.media.type", "routing.queue.id"],
        "metrics": ["conversation.duration.seconds", "agent.talk.seconds", "agent.hold.seconds", "wrapup.code"],
        "filter": [
            {"type": "simple", "path": "conversation.media.type", "value": media_type, "operator": "eq"},
            # Add the queue filter only when queue IDs were supplied
            *([{"type": "simple", "path": "routing.queue.id", "value": queue_ids, "operator": "in"}] if queue_ids else [])
        ]
    }


def merge_sorted_pages(pages: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    if not pages:
        return []
    if len(pages) == 1:
        return pages[0]
    # Merge each half of the page list recursively, then combine the two results
    mid = len(pages) // 2
    left = merge_sorted_pages(pages[:mid])
    right = merge_sorted_pages(pages[mid:])
    merged, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        if left[i].get("timestamp", "") <= right[j].get("timestamp", ""):
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    return merged + left[i:] + right[j:]


def fetch_paginated_details(auth: GenesysAuthManager, query_body: Dict[str, Any]) -> List[Dict[str, Any]]:
    url = f"{GENESYS_API_BASE}/api/v2/analytics/conversations/details/query"
    all_pages: List[List[Dict[str, Any]]] = []
    page_token = None
    max_retries = 3
    while True:
        headers = auth.get_headers()
        payload = query_body.copy()
        if page_token:
            payload["pageToken"] = page_token
        for attempt in range(max_retries):
            try:
                response = requests.post(url, json=payload, headers=headers, timeout=30)
                if response.status_code == 429:
                    time.sleep(int(response.headers.get("Retry-After", 5)) * (attempt + 1))
                    continue
                response.raise_for_status()
                break
            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    raise RuntimeError(f"Failed to fetch analytics page: {e}") from e
                time.sleep(2 ** attempt)
        else:
            # Every attempt was rate limited; do not parse a 429 body as data
            raise RuntimeError("Failed to fetch analytics page: still rate limited after retries")
        data = response.json()
        if "results" not in data or not data["results"]:
            break
        all_pages.append(sorted(data["results"], key=lambda x: x.get("timestamp", "")))
        page_token = data.get("nextPageToken")
        if not page_token:
            break
    return merge_sorted_pages(all_pages)


def aggregate_conversation_data(records: List[Dict[str, Any]], group_by_keys: List[str]) -> Dict[Tuple, Dict[str, Any]]:
    groups = defaultdict(lambda: {"count": 0, "total_duration": 0.0, "total_talk": 0.0, "total_hold": 0.0, "wrapup_codes": []})
    for record in records:
        key = tuple(record.get(k) for k in group_by_keys)
        g = groups[key]
        g["count"] += 1
        metrics = record.get("metrics", {})
        g["total_duration"] += metrics.get("conversation.duration.seconds", 0)
        g["total_talk"] += metrics.get("agent.talk.seconds", 0)
        g["total_hold"] += metrics.get("agent.hold.seconds", 0)
        wc = metrics.get("wrapup.code")
        if wc:
            g["wrapup_codes"].append(wc)
    aggregated = {}
    for key, data in groups.items():
        count = data["count"]
        if count == 0:
            continue
        avg_dur = data["total_duration"] / count
        talk_r = data["total_talk"] / data["total_duration"] if data["total_duration"] > 0 else 0
        hold_r = data["total_hold"] / data["total_duration"] if data["total_duration"] > 0 else 0
        aggregated[key] = {
            "conversation_count": count,
            "avg_duration_seconds": round(avg_dur, 2),
            "talk_ratio": round(talk_r, 4),
            "hold_ratio": round(hold_r, 4),
            "efficiency_score": round(max(0, 100 - (hold_r * 50)), 2),
            "unique_wrapup_codes": list(set(data["wrapup_codes"]))
        }
    return aggregated


def export_to_csv(data: Dict[Tuple, Dict[str, Any]], filepath: str) -> None:
    if not data:
        return
    # dict.keys() is a view, so convert it to a list before concatenating
    headers = list(next(iter(data.values())).keys())
    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["group_key"] + headers)
        writer.writeheader()
        for key, metrics in data.items():
            writer.writerow({"group_key": str(key), **metrics})


def export_to_json(data: Dict[Tuple, Dict[str, Any]], filepath: str) -> None:
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump({str(k): v for k, v in data.items()}, f, indent=2, default=str)


def upload_to_s3(file_path: str, s3_key: str) -> str:
    s3_client = boto3.client("s3", region_name=S3_REGION)
    s3_client.upload_file(file_path, S3_BUCKET, s3_key)
    # Presigned GET URL that expires after 24 hours
    return s3_client.generate_presigned_url(
        "get_object",
        Params={"Bucket": S3_BUCKET, "Key": s3_key},
        ExpiresIn=86400
    )


def run_report_pipeline(auth: GenesysAuthManager, start_time: str, end_time: str) -> Dict[str, str]:
    # The queue ID below is a placeholder; substitute a real queue ID
    query = build_analytics_query(start_time, end_time, queue_ids=["8a2b3c4d-5e6f-7g8h-9i0j-k1l2m3n4o5p6"])
    records = fetch_paginated_details(auth, query)
    aggregated = aggregate_conversation_data(records, ["conversation.media.type", "routing.queue.id"])
    export_to_csv(aggregated, "report_output.csv")
    export_to_json(aggregated, "report_output.json")
    return {
        "csv_url": upload_to_s3("report_output.csv", f"reports/{start_time.replace(':', '-')}/report_output.csv"),
        "json_url": upload_to_s3("report_output.json", f"reports/{start_time.replace(':', '-')}/report_output.json")
    }


if __name__ == "__main__":
    auth = GenesysAuthManager(client_id="YOUR_CLIENT_ID", client_secret="YOUR_CLIENT_SECRET")
    results = run_report_pipeline(auth, "2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z")
    print("Report URLs:", results)

Common Errors & Debugging
Error: 401 Unauthorized
- Cause: Expired OAuth token or invalid client credentials.
- Fix: Verify that client_id and client_secret match a registered confidential client. Ensure the GenesysAuthManager refreshes tokens before expiration. Check that the token request includes grant_type: client_credentials.
- Code Fix: The get_headers method already refreshes the token before it expires. Add logging in _fetch_token to verify that response.json()["access_token"] is populated.
Error: 429 Too Many Requests
- Cause: Exceeding Genesys Cloud rate limits (typically 5 requests per second per client for Analytics API).
- Fix: Implement exponential backoff. The fetch_paginated_details function reads Retry-After headers and sleeps accordingly. Never retry immediately on 429.
- Code Fix: Increase max_retries or add jitter to sleep intervals if cascading failures occur across microservices.
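One way to add jitter is sketched below. It uses the common "full jitter" scheme on top of any Retry-After value the server supplied; the function name backoff_delay and its base and cap defaults are assumptions for illustration, not values taken from Genesys Cloud documentation.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    retry_after: float = 0.0,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return the seconds to sleep before retry number `attempt` (0-based)."""
    rng = rng or random.Random()
    # Exponential ceiling, capped so late attempts do not sleep unboundedly.
    ceiling = min(cap, base * (2 ** attempt))
    # Full jitter: a uniform draw in [0, ceiling] spreads out clients that
    # were throttled at the same moment.
    jitter = rng.uniform(0, ceiling)
    # Always wait at least as long as the server asked for.
    return retry_after + jitter


# Example: third attempt after a response with Retry-After: 5
delay = backoff_delay(attempt=2, retry_after=5)
print(f"sleeping for {delay:.2f} seconds")
```

In fetch_paginated_details, time.sleep(backoff_delay(attempt, retry_after)) would replace the fixed multiple of Retry-After.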
Error: 400 Bad Request (Invalid Filter Syntax)
- Cause: Malformed filter array or unsupported path/operator combination.
- Fix: Validate filter paths against the Genesys Cloud Analytics Field Reference. Ensure type is "simple" or "compound". Verify operator matches the data type (e.g., in requires an array value).
- Code Fix: Print query_body before sending. Read response.text to inspect Genesys error messages, which specify the exact invalid field.
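Some of these mistakes can be caught locally before the request is sent. The sketch below checks only the "simple" filter shape used in this tutorial; the name find_filter_problems and the operator set are assumptions drawn from the operators this guide mentions, so extend them from the API reference as needed.

```python
from typing import Any, Dict, List

# Operators named in this tutorial; treat the set as an assumption.
KNOWN_OPERATORS = {"eq", "in", "gt", "lt", "contains"}
REQUIRED_KEYS = {"type", "path", "value", "operator"}


def find_filter_problems(filters: List[Dict[str, Any]]) -> List[str]:
    """Return human-readable problems for "simple" filters; empty means none found."""
    problems: List[str] = []
    for index, item in enumerate(filters):
        if item.get("type") != "simple":
            continue  # compound filters are out of scope for this sketch
        missing = REQUIRED_KEYS - item.keys()
        if missing:
            problems.append(f"filter[{index}] is missing {sorted(missing)}")
            continue
        if item["operator"] not in KNOWN_OPERATORS:
            problems.append(f"filter[{index}] uses unknown operator {item['operator']!r}")
        elif item["operator"] == "in" and not isinstance(item["value"], list):
            problems.append(f"filter[{index}] uses 'in' with a non-list value")
    return problems


# The second filter passes a bare string to "in", which the check flags.
sample = [
    {"type": "simple", "path": "conversation.media.type", "value": "voice", "operator": "eq"},
    {"type": "simple", "path": "routing.queue.id", "value": "queue-1", "operator": "in"},
]
for problem in find_filter_problems(sample):
    print(problem)
```

Running this check on query_body["filter"] before the POST turns a remote 400 into a local message that names the offending filter index.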
Error: 500 Internal Server Error
- Cause: Backend query timeout or unsupported metric/dimension combination.
- Fix: Reduce interval granularity or narrow the timeRange. Large date ranges with high-cardinality groupBy arrays trigger backend timeouts.
- Code Fix: Implement a fallback to split the query into smaller hourly chunks if 500 persists after three retries.
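The hourly-chunk fallback could start from a helper like the one below, which splits a reporting window into consecutive sub-windows. The name split_window is hypothetical, and wiring it into the retry logic is left to the caller.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple


def split_window(
    start: datetime,
    end: datetime,
    step: timedelta = timedelta(hours=1),
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (chunk_start, chunk_end) pairs that cover [start, end)."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    cursor = start
    while cursor < end:
        # The final chunk is shortened so it never runs past the window end.
        upper = min(cursor + step, end)
        yield cursor, upper
        cursor = upper


window_start = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
window_end = datetime(2024, 1, 1, 2, 30, tzinfo=timezone.utc)
for chunk_start, chunk_end in split_window(window_start, window_end):
    print(chunk_start.strftime("%H:%M"), "to", chunk_end.strftime("%H:%M"))
# 00:00 to 01:00
# 01:00 to 02:00
# 02:00 to 02:30
```

Each pair can be formatted as ISO 8601 strings and passed to build_analytics_query, and the per-chunk results concatenated in order, since the chunks are already chronological and do not overlap.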