Exporting Genesys Cloud Conversation Analytics to S3 with Python
What You Will Build
- A Python script that queries the Genesys Cloud CX Analytics API for daily conversation metrics and uploads the resulting JSON data to an Amazon S3 bucket.
- The solution uses the Genesys Cloud Python SDK for authentication and data retrieval, and boto3 for S3 storage operations.
- This tutorial covers Python 3.9+ using the genesyscloud_python SDK and boto3.
Prerequisites
- OAuth Client: A Genesys Cloud OAuth 2.0 client (Service Account or Client Credentials) with the following scopes:
  - analytics:conversation:read
  - analytics:report:read
- AWS Credentials: An IAM user or role with s3:PutObject permissions on the target bucket.
- SDK Versions:
  - genesyscloud_python >= 135.0.0
  - boto3 >= 1.28.0
- Runtime: Python 3.9 or higher.
- External Dependencies: Install via pip:
pip install genesyscloud_python boto3
Authentication Setup
Genesys Cloud uses OAuth 2.0. For server-to-server integrations like this export job, the Client Credentials Grant flow is standard. The Python SDK handles token acquisition and refresh automatically if you configure the PureCloudPlatformClientV2 correctly.
You must set environment variables for your credentials to avoid hardcoding secrets.
import os
# Required Environment Variables
GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
GENESYS_REGION = os.getenv("GENESYS_REGION", "mypurecloud.com")  # Genesys Cloud domain, e.g., mypurecloud.com
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
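Because every later step depends on these variables, it can help to fail fast when one is missing. The sketch below assumes a hypothetical helper, `require_env`, which is not part of the Genesys SDK or boto3. It reports every missing name at once instead of stopping at the first.

```python
import os
from typing import Dict, Iterable, Mapping


def require_env(names: Iterable[str], environ: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the requested variables, or raise listing every missing one.

    Hypothetical helper for illustration; not part of the Genesys SDK or boto3.
    """
    values = {name: environ.get(name, "") for name in names}
    missing = sorted(name for name, value in values.items() if not value)
    if missing:
        raise EnvironmentError("Missing environment variables: " + ", ".join(missing))
    return values


# Example with an explicit mapping so the check is easy to try out
sample_env = {"GENESYS_CLIENT_ID": "abc", "GENESYS_CLIENT_SECRET": "xyz"}
config = require_env(["GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET"], sample_env)
print(sorted(config))
```

In the export job, the same call without the second argument would read from the real process environment.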
Initialize the Genesys Cloud platform client. The SDK will cache the access token and refresh it before expiration.
from purecloudplatform.client import PureCloudPlatformClientV2


def get_genesys_client() -> PureCloudPlatformClientV2:
    """
    Initializes the Genesys Cloud API client with Client Credentials.
    """
    if not GENESYS_CLIENT_ID or not GENESYS_CLIENT_SECRET:
        raise EnvironmentError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    client = PureCloudPlatformClientV2()
    client.set_environment(GENESYS_REGION)
    # The SDK handles token caching automatically
    return client
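To make the idea of caching a token and refreshing it before expiration concrete, here is a minimal, generic sketch. It is not the SDK's implementation: `CachedToken`, the `fetch` callable, and the 60-second refresh margin are all assumptions chosen for illustration.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Caches a token and refetches it shortly before it expires.

    Illustrative only: `fetch` is a hypothetical callable returning
    (token, lifetime_in_seconds); this is not the SDK's own code.
    """

    def __init__(self, fetch: Callable[[], Tuple[str, float]],
                 refresh_margin: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch
        self._margin = refresh_margin
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        # Refetch when there is no token yet or it is inside the refresh margin
        if self._token is None or self._clock() >= self._expires_at - self._margin:
            self._token, lifetime = self._fetch()
            self._expires_at = self._clock() + lifetime
        return self._token


# Exercise the cache with a fake clock and a fake token endpoint
calls = []
now = [0.0]


def fake_fetch() -> Tuple[str, float]:
    calls.append(now[0])
    return f"token-{len(calls)}", 3600.0


cache = CachedToken(fake_fetch, refresh_margin=60.0, clock=lambda: now[0])
first = cache.get()    # no token yet, so it fetches
now[0] = 1000.0
second = cache.get()   # well before expiry, served from the cache
now[0] = 3545.0
third = cache.get()    # inside the 60 s margin, so it refetches
```

Injecting the clock keeps the example deterministic; a real job would rely on the default monotonic clock.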
Implementation
Step 1: Query Conversation Analytics Data
The core of this job is retrieving conversation data. We will use the POST /api/v2/analytics/conversations/details/query endpoint. This endpoint allows for complex filtering by time range and view type.
Required Scope: analytics:conversation:read
We need to define a query body. For a daily export, we typically want a “Summary” view to get aggregated metrics rather than individual conversation transcripts, which would be massive.
from purecloudplatform.client.rest import ApiException
from datetime import datetime, timedelta
import json


def fetch_daily_analytics(client: PureCloudPlatformClientV2, export_date: datetime) -> dict:
    """
    Fetches aggregated conversation analytics for a specific date.

    Args:
        client: The initialized Genesys Cloud client.
        export_date: The datetime for the start of the day to export (treated as UTC).

    Returns:
        A dictionary containing the analytics response data.
    """
    # Define the time range: start of the day to start of the next day
    start_time = export_date.replace(hour=0, minute=0, second=0, microsecond=0)
    end_time = start_time + timedelta(days=1)

    # Format ISO 8601 strings; the "Z" suffix marks the naive datetimes as UTC
    start_iso = start_time.isoformat() + "Z"
    end_iso = end_time.isoformat() + "Z"

    # Define the query body
    query_body = {
        "view": "summary",
        "interval": "P1D",  # 1 Day interval
        "dateFrom": start_iso,
        "dateTo": end_iso,
        "metrics": [
            "conversationCount",
            "abandonedCount",
            "handleTime",
            "waitTime",
            "resolvedCount"
        ]
    }

    try:
        # API Call: POST /api/v2/analytics/conversations/details/query
        response = client.analytics_api.post_analytics_conversations_details_query(body=query_body)
        # The SDK returns a purecloudplatform.client.models object.
        # We convert it to a dictionary for easier serialization.
        return response.to_dict()
    except ApiException as e:
        print(f"Exception when calling AnalyticsApi->post_analytics_conversations_details_query: {e}")
        raise
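The function above appends "Z" to a naive datetime, which is only correct when `export_date` already represents UTC. As a hedged alternative, the sketch below builds the same one-day window from timezone-aware values; `utc_day_window` is a hypothetical name, not something from the SDK.

```python
from datetime import date, datetime, timedelta, timezone
from typing import Tuple


def utc_day_window(day: date) -> Tuple[str, str]:
    """Return (start, end) ISO 8601 strings for one UTC calendar day.

    The end is exclusive: midnight at the start of the following day.
    """
    start = datetime(day.year, day.month, day.day, tzinfo=timezone.utc)
    end = start + timedelta(days=1)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return start.strftime(fmt), end.strftime(fmt)


start_iso, end_iso = utc_day_window(date(2024, 2, 29))
print(start_iso, end_iso)
```

Using the start of the next day as an exclusive end avoids gaps or overlaps between consecutive daily exports, including across month boundaries.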
Expected Response Structure:
The response contains a partitions array. For a summary view, this array contains the aggregated metrics for the requested interval.
{
  "partitionName": "summary",
  "partitions": [
    {
      "partitionName": "summary",
      "groups": [
        {
          "key": "",
          "metrics": {
            "conversationCount": {
              "value": 1250,
              "unit": "count"
            },
            "abandonedCount": {
              "value": 12,
              "unit": "count"
            },
            "handleTime": {
              "value": 45000000,
              "unit": "millisecond"
            }
          }
        }
      ]
    }
  ]
}
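As a worked example of reading this structure, the sketch below walks the sample response and derives a few figures from it. It assumes that `handleTime` is a total over the interval (the sample does not say so), and the derived names such as `abandon_rate_pct` are illustrative rather than API fields.

```python
sample = {
    "partitions": [
        {
            "partitionName": "summary",
            "groups": [
                {
                    "key": "",
                    "metrics": {
                        "conversationCount": {"value": 1250, "unit": "count"},
                        "abandonedCount": {"value": 12, "unit": "count"},
                        "handleTime": {"value": 45000000, "unit": "millisecond"},
                    },
                }
            ],
        }
    ]
}

# Navigate partitions -> groups -> metrics, then read each "value"
metrics = sample["partitions"][0]["groups"][0]["metrics"]
conversations = metrics["conversationCount"]["value"]
abandoned = metrics["abandonedCount"]["value"]
handle_ms = metrics["handleTime"]["value"]

# Derived figures (assumes handleTime is a total, in milliseconds)
abandon_rate_pct = 100 * abandoned / conversations
total_handle_hours = handle_ms / 3_600_000
avg_handle_seconds = handle_ms / 1000 / conversations

print(f"{abandon_rate_pct:.2f}% abandoned")
print(f"{total_handle_hours:.1f} h handled")
print(f"{avg_handle_seconds:.0f} s average handle time")
```

With the sample values this works out to 0.96% abandoned, 12.5 hours of handle time, and 36 seconds per conversation on average.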
Step 2: Handle Pagination and Errors
The post_analytics_conversations_details_query method supports pagination via the pageSize and cursor parameters. While a daily summary usually fits in one page, transactional views or high-volume queues may require pagination.
We must also handle 429 Too Many Requests responses. Genesys Cloud enforces rate limits, so a production job should retry throttled requests with exponential backoff.
import time
import random


def fetch_all_pages(client: PureCloudPlatformClientV2, query_body: dict) -> list:
    """
    Fetches all pages of analytics data with retry logic for 429 errors.
    """
    all_partitions = []
    cursor = None
    max_retries = 5
    base_delay = 1.0

    # Work on a copy so the caller's query body is not mutated
    body = dict(query_body)
    # Add page size if not specified
    body.setdefault("pageSize", 500)

    while True:
        # Add the cursor if not on the first page
        if cursor:
            body["cursor"] = cursor

        retries = 0
        while retries < max_retries:
            try:
                response = client.analytics_api.post_analytics_conversations_details_query(body=body)
                data = response.to_dict()

                # Collect partitions
                if data.get("partitions"):
                    all_partitions.extend(data["partitions"])

                # Check for next page
                cursor = data.get("nextPageCursor")
                if not cursor:
                    return all_partitions

                # Break retry loop to proceed to next page
                break
            except ApiException as e:
                if e.status == 429:
                    # Exponential backoff with jitter
                    delay = base_delay * (2 ** retries) + random.uniform(0, 1)
                    print(f"Rate limited (429). Retrying in {delay:.2f} seconds...")
                    time.sleep(delay)
                    retries += 1
                else:
                    # Non-429 error, raise immediately
                    raise

        # The retry loop ended without a successful response
        if retries == max_retries:
            raise RuntimeError("Max retries exceeded for 429 Too Many Requests")
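The cursor loop can be easier to follow in isolation. The sketch below separates it from the retry logic and runs it against a fake three-page backend; `iter_pages`, `fetch_page`, and `FAKE_PAGES` are hypothetical names used only for this illustration.

```python
from typing import Callable, Dict, Iterator, List, Optional


def iter_pages(fetch_page: Callable[[Optional[str]], Dict]) -> Iterator[Dict]:
    """Yield pages until the response carries no next-page cursor.

    `fetch_page` is a hypothetical callable that takes the current cursor
    (None for the first page) and returns one response dictionary.
    """
    cursor = None
    while True:
        page = fetch_page(cursor)
        yield page
        cursor = page.get("nextPageCursor")
        if not cursor:
            return


# Fake three-page backend used only to exercise the loop
FAKE_PAGES = {
    None: {"partitions": ["a"], "nextPageCursor": "c1"},
    "c1": {"partitions": ["b"], "nextPageCursor": "c2"},
    "c2": {"partitions": ["c"]},
}

collected: List[str] = []
for page in iter_pages(lambda cursor: FAKE_PAGES[cursor]):
    collected.extend(page.get("partitions", []))
print(collected)
```

Keeping pagination in a generator like this lets the retry policy wrap a single page request without also having to track the cursor.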
Step 3: Process and Serialize Results
Raw API responses are nested. For S3 storage, we want a clean JSON structure. We will flatten the metrics slightly and ensure the output is valid JSON.
def process_analytics_data(raw_partitions: list) -> dict:
    """
    Cleans and structures the raw API response for storage.
    """
    # We expect a list of partition objects.
    # Usually, for 'summary' view, we just want the metrics from the first group.
    if not raw_partitions:
        return {"status": "empty", "message": "No data found for the requested period."}

    # Take the first partition (summary)
    summary_partition = raw_partitions[0]

    # Extract metrics from the first group
    groups = summary_partition.get("groups", [])
    if not groups:
        return {"status": "empty", "message": "No groups in partition."}

    metrics_group = groups[0]
    metrics = metrics_group.get("metrics", {})

    # Flatten metrics for easier reading in downstream systems
    flat_metrics = {}
    for metric_name, metric_data in metrics.items():
        if metric_data:
            flat_metrics[metric_name] = metric_data.get("value")

    return {
        # "Z" marks the timestamp as UTC, matching the query's date strings
        "export_timestamp": datetime.utcnow().isoformat() + "Z",
        "data": flat_metrics
    }
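One way to check that the output is valid JSON is a serialize-then-parse round trip. The sketch below also supplies a `default` handler for date values, on the assumption (not confirmed by the text above) that a converted response might contain `datetime` objects; `json_default` is a hypothetical helper name.

```python
import json
from datetime import date, datetime, timezone


def json_default(value):
    """Fallback for json.dumps: render dates as ISO 8601, reject anything else."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    raise TypeError(f"Cannot serialize {type(value).__name__}")


record = {
    "export_timestamp": datetime(2024, 1, 15, 6, 0, tzinfo=timezone.utc),
    "data": {"conversationCount": 1250, "waitTime": None},
}

encoded = json.dumps(record, default=json_default, sort_keys=True)
decoded = json.loads(encoded)  # the round trip proves the output parses
print(decoded["export_timestamp"])
```

Raising `TypeError` for unknown types keeps unexpected values from being written silently in a lossy form.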
Step 4: Upload to Amazon S3
We use boto3 to upload the processed JSON. We will structure the S3 key to include the date for easy retrieval and lifecycle management.
Required AWS Permission: s3:PutObject
import boto3
import io


def upload_to_s3(data: dict, bucket: str, key: str) -> bool:
    """
    Uploads JSON data to an S3 bucket.

    Args:
        data: The dictionary to serialize and upload.
        bucket: The S3 bucket name.
        key: The S3 object key (path).

    Returns:
        True if successful, False otherwise.
    """
    s3_client = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION
    )

    try:
        # Serialize to JSON
        json_bytes = json.dumps(data, indent=2).encode('utf-8')

        # Create a file-like object from the bytes
        file_obj = io.BytesIO(json_bytes)

        # Upload
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=file_obj,
            ContentType='application/json',
            ServerSideEncryption='AES256'
        )
        print(f"Successfully uploaded to s3://{bucket}/{key}")
        return True
    except Exception as e:
        print(f"Failed to upload to S3: {e}")
        return False
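The date-based key layout can be sketched as a small helper. `build_s3_key` is a hypothetical function, and the default prefix mirrors the `analytics/daily/YYYY-MM-DD.json` format used in the complete example.

```python
from datetime import date


def build_s3_key(day: date, prefix: str = "analytics/daily") -> str:
    """Build a date-based object key such as analytics/daily/2024-01-15.json."""
    return f"{prefix.strip('/')}/{day.isoformat()}.json"


keys = [build_s3_key(date(2024, 1, d)) for d in (2, 15, 1)]
print(sorted(keys))
```

Because ISO 8601 dates sort lexicographically in chronological order, a sorted listing of this prefix comes back in date order, and an S3 lifecycle rule can target the whole prefix.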
Complete Working Example
This script combines all steps into a single executable module. It accepts a date from command line arguments or defaults to yesterday.
#!/usr/bin/env python3
"""
Genesys Cloud Daily Analytics Export to S3
------------------------------------------
Exports conversation summary metrics for a given date to Amazon S3.
"""
import os
import sys
import json
import io
import time
import random
from datetime import datetime, timedelta
import boto3
from purecloudplatform.client import PureCloudPlatformClientV2
from purecloudplatform.client.rest import ApiException
# --- Configuration ---
GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
GENESYS_REGION = os.getenv("GENESYS_REGION", "mypurecloud.com")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
# --- Helper Functions ---
def get_genesys_client() -> PureCloudPlatformClientV2:
    if not GENESYS_CLIENT_ID or not GENESYS_CLIENT_SECRET:
        raise EnvironmentError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")
    client = PureCloudPlatformClientV2()
    client.set_environment(GENESYS_REGION)
    return client

def fetch_all_pages(client: PureCloudPlatformClientV2, query_body: dict) -> list:
    all_partitions = []
    cursor = None
    max_retries = 5
    base_delay = 1.0

    # Work on a copy so the caller's query body is not mutated
    body = dict(query_body)
    body.setdefault("pageSize", 500)

    while True:
        if cursor:
            body["cursor"] = cursor

        retries = 0
        while retries < max_retries:
            try:
                response = client.analytics_api.post_analytics_conversations_details_query(body=body)
                data = response.to_dict()

                if data.get("partitions"):
                    all_partitions.extend(data["partitions"])

                cursor = data.get("nextPageCursor")
                if not cursor:
                    return all_partitions
                break
            except ApiException as e:
                if e.status == 429:
                    # Exponential backoff with jitter
                    delay = base_delay * (2 ** retries) + random.uniform(0, 1)
                    print(f"Rate limited (429). Retrying in {delay:.2f} seconds...")
                    time.sleep(delay)
                    retries += 1
                else:
                    raise

        # The retry loop ended without a successful response
        if retries == max_retries:
            raise RuntimeError("Max retries exceeded for 429 Too Many Requests")

def process_analytics_data(raw_partitions: list) -> dict:
    if not raw_partitions:
        return {"status": "empty", "message": "No data found."}

    summary_partition = raw_partitions[0]
    groups = summary_partition.get("groups", [])
    if not groups:
        return {"status": "empty", "message": "No groups in partition."}

    metrics = groups[0].get("metrics", {})
    flat_metrics = {}
    for metric_name, metric_data in metrics.items():
        if metric_data:
            flat_metrics[metric_name] = metric_data.get("value")

    return {
        # "Z" marks the timestamp as UTC, matching the query's date strings
        "export_timestamp": datetime.utcnow().isoformat() + "Z",
        "data": flat_metrics
    }

def upload_to_s3(data: dict, bucket: str, key: str) -> bool:
    s3_client = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION
    )
    try:
        json_bytes = json.dumps(data, indent=2).encode('utf-8')
        file_obj = io.BytesIO(json_bytes)
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=file_obj,
            ContentType='application/json',
            ServerSideEncryption='AES256'
        )
        return True
    except Exception as e:
        print(f"S3 Upload Error: {e}")
        return False

def main():
    # Fail before querying if there is nowhere to upload the result
    if not S3_BUCKET_NAME:
        print("S3_BUCKET_NAME must be set.")
        sys.exit(1)

    # Determine export date (default: yesterday)
    if len(sys.argv) > 1:
        try:
            export_date = datetime.strptime(sys.argv[1], "%Y-%m-%d")
        except ValueError:
            print("Invalid date format. Use YYYY-MM-DD. Defaulting to yesterday.")
            export_date = datetime.utcnow() - timedelta(days=1)
    else:
        export_date = datetime.utcnow() - timedelta(days=1)

    print(f"Exporting analytics for date: {export_date.strftime('%Y-%m-%d')}")

    # 1. Initialize Client
    client = get_genesys_client()

    # 2. Build Query
    start_time = export_date.replace(hour=0, minute=0, second=0, microsecond=0)
    end_time = start_time + timedelta(days=1)

    query_body = {
        "view": "summary",
        "interval": "P1D",
        "dateFrom": start_time.isoformat() + "Z",
        "dateTo": end_time.isoformat() + "Z",
        "metrics": [
            "conversationCount",
            "abandonedCount",
            "handleTime",
            "waitTime",
            "resolvedCount",
            "wrapUpTime",
            "talkTime"
        ]
    }

    # 3. Fetch Data
    try:
        print("Fetching data from Genesys Cloud...")
        raw_data = fetch_all_pages(client, query_body)
        print(f"Retrieved {len(raw_data)} partition(s).")

        # 4. Process Data
        processed_data = process_analytics_data(raw_data)

        # 5. Upload to S3
        # Key format: analytics/daily/YYYY-MM-DD.json
        s3_key = f"analytics/daily/{export_date.strftime('%Y-%m-%d')}.json"

        if upload_to_s3(processed_data, S3_BUCKET_NAME, s3_key):
            print("Export job completed successfully.")
        else:
            print("Export job failed during S3 upload.")
            sys.exit(1)
    except Exception as e:
        print(f"Fatal error during export: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
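The script above falls back to yesterday when the date argument is malformed. For a scheduled job, rejecting bad input outright may be safer, since a typo would otherwise export the wrong day without failing. The following is a sketch of that alternative using argparse; `parse_day` and `parse_args` are hypothetical names and not part of the script above.

```python
import argparse
from datetime import date, datetime, timedelta, timezone
from typing import List, Optional


def parse_day(text: str) -> date:
    """argparse type function: accept only YYYY-MM-DD."""
    try:
        return datetime.strptime(text, "%Y-%m-%d").date()
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid date {text!r}, expected YYYY-MM-DD")


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the optional date argument; default to yesterday in UTC."""
    yesterday = datetime.now(timezone.utc).date() - timedelta(days=1)
    parser = argparse.ArgumentParser(description="Export daily analytics to S3")
    parser.add_argument("date", nargs="?", type=parse_day, default=yesterday,
                        help="UTC day to export (default: yesterday)")
    return parser.parse_args(argv)


args = parse_args(["2024-01-15"])
print(args.date)
```

With this approach an invalid date makes argparse print a usage message and exit with status 2, which a scheduler can detect as a failure.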
Common Errors & Debugging
Error: 401 Unauthorized
Cause: Invalid Client ID, Client Secret, or expired token.
Fix: Verify your environment variables. Ensure the OAuth client is active in the Genesys Cloud Admin Console. Check that the client has the analytics:conversation:read scope assigned.
Error: 403 Forbidden
Cause: The OAuth client lacks the required scope, or the IAM user lacks S3 permissions.
Fix:
- In Genesys Cloud Admin, go to Setup > Administration > OAuth Clients. Edit your client and ensure analytics:conversation:read is checked.
- In AWS IAM, ensure the user/role has a policy allowing s3:PutObject on the specific bucket.
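For the AWS side, a policy scoped to the export prefix might look like the document generated below. This is a sketch under assumptions: the bucket name is a placeholder, the prefix matches the key format used in the complete example, and `put_object_policy` is a hypothetical helper.

```python
import json


def put_object_policy(bucket: str, prefix: str = "analytics/daily/") -> dict:
    """Build an IAM policy document allowing s3:PutObject under one prefix."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket}/{prefix}*",
            }
        ],
    }


policy = put_object_policy("example-analytics-bucket")  # placeholder bucket name
print(json.dumps(policy, indent=2))
```

Limiting the resource to the prefix rather than the whole bucket keeps the export job from overwriting unrelated objects.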
Error: 429 Too Many Requests
Cause: You exceeded the Genesys Cloud API rate limit.
Fix: The provided code includes exponential backoff. If you still hit this limit, reduce the frequency of your job or request a rate limit increase from Genesys Cloud Support. Do not increase the pageSize arbitrarily, as this can increase processing time on the server side and still count against rate limits.
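To see how long the backoff in `fetch_all_pages` can wait, the sketch below computes the delay schedule for its defaults (`max_retries = 5`, `base_delay = 1.0`), leaving out the random jitter. `backoff_delays` is a hypothetical helper written for this illustration.

```python
def backoff_delays(max_retries: int = 5, base_delay: float = 1.0) -> list:
    """Delays before each retry, excluding the random jitter term."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]


delays = backoff_delays()
min_total = sum(delays)                # jitter adds 0 s per retry at best
max_total = sum(delays) + len(delays)  # jitter adds up to 1 s per retry
print(delays, min_total, max_total)
```

With those defaults, a page that is throttled on every attempt waits between 31 and 36 seconds in total before the job gives up, which is worth keeping in mind when setting a scheduler timeout.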
Error: S3 ClientError: Access Denied
Cause: AWS credentials are invalid or the bucket policy denies access.
Fix: Check your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. Ensure the region matches the bucket’s region.