Export Genesys Cloud Analytics to S3 with Python and Boto3
What You Will Build
- A Python script that queries Genesys Cloud Conversation Details, aggregates the data, and uploads the resulting CSV to an Amazon S3 bucket.
- This uses the Genesys Cloud Analytics API (/api/v2/analytics/conversations/details/query) and the AWS Boto3 SDK.
- The implementation covers Python 3.8+ with requests for API calls and boto3 for S3 operations.
Prerequisites
- Genesys Cloud OAuth Client: A machine-to-machine (Client Credentials) client whose assigned role grants the analytics:conversationDetail:view permission.
- AWS Credentials: An IAM user with s3:PutObject permissions on the target bucket.
- Python Environment: Python 3.8 or higher.
- Dependencies:
pip install requests boto3
Authentication Setup
Genesys Cloud uses OAuth 2.0 for API authentication. For server-side jobs, the Client Credentials flow is the standard approach. You must store your Client ID and Client Secret securely, preferably using environment variables or a secrets manager.
The following function retrieves an access token. It handles the basic error cases where the credentials are invalid or the endpoint is unreachable.
import os
import requests

GENESYS_DOMAIN = "api.mypurecloud.com"
# Tokens are issued by the login host for your region, not the API host.
OAUTH_URL = "https://login.mypurecloud.com/oauth/token"


def get_access_token() -> str:
    """
    Retrieves an OAuth 2.0 access token using Client Credentials flow.

    Returns:
        str: The access token string.

    Raises:
        ValueError: If the credential environment variables are missing.
        requests.exceptions.RequestException: If the token request fails.
    """
    client_id = os.environ.get("GENESYS_CLIENT_ID")
    client_secret = os.environ.get("GENESYS_CLIENT_SECRET")
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")

    try:
        # Send the client credentials with HTTP Basic authentication.
        response = requests.post(
            OAUTH_URL,
            data={"grant_type": "client_credentials"},
            auth=(client_id, client_secret),
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["access_token"]
    except requests.exceptions.RequestException as e:
        # Covers HTTP error statuses as well as connection failures and timeouts.
        print(f"Failed to retrieve token: {e}")
        raise
If your job runs multiple queries, cache the token and reuse it until it expires; the token response reports its lifetime in seconds in the expires_in field. For a single daily batch job, fetching a new token at the start is sufficient.
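The caching described above could look roughly like the following. This is a minimal sketch, not part of the original script: TokenCache is a hypothetical helper, and it assumes the token response is a dict with access_token and expires_in keys. The fetch function and the clock are injected so the refresh logic can be exercised without calling Genesys Cloud.

```python
import time
from typing import Any, Callable, Dict, Optional


class TokenCache:
    """Caches an access token and refreshes it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Dict[str, Any]],
        clock: Callable[[], float] = time.monotonic,
        safety_margin: float = 60.0,
    ) -> None:
        self._fetch = fetch            # returns the parsed token response (assumed shape)
        self._clock = clock            # injectable so tests can control time
        self._margin = safety_margin   # refresh this many seconds early
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Returns the cached token, fetching a new one when it is missing or stale."""
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            body = self._fetch()
            self._token = body["access_token"]
            self._expires_at = now + float(body.get("expires_in", 0)) - self._margin
        return self._token
```

In the script, fetch would be a function that posts to the token endpoint and returns response.json(); every API call would then ask the cache for a token instead of holding one for the whole run.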
Implementation
Step 1: Querying Conversation Details with Pagination
The Analytics API does not return all conversations in a single call, so you must page through the results. The POST /api/v2/analytics/conversations/details/query endpoint accepts a request body that defines the interval, optional filters, the sort order, and a paging object (pageSize and pageNumber).
The response returns each page in a conversations array together with a totalHits count. Increment pageNumber and repeat the POST until a page comes back empty.
import time
from typing import List, Dict, Any

PAGE_SIZE = 100
MAX_RETRIES = 5


def build_query_payload(date_str: str, page_number: int = 1) -> Dict[str, Any]:
    """
    Constructs the JSON payload for the analytics query.

    Args:
        date_str: The UTC date to query in YYYY-MM-DD format.
        page_number: The 1-based page of results to request.

    Returns:
        Dict containing the query parameters.
    """
    start_time = f"{date_str}T00:00:00.000Z"
    end_time = f"{date_str}T23:59:59.999Z"
    return {
        # The interval is a single ISO-8601 string in the form start/end.
        "interval": f"{start_time}/{end_time}",
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {"pageSize": PAGE_SIZE, "pageNumber": page_number},
    }


def fetch_all_conversations(token: str, date_str: str) -> List[Dict[str, Any]]:
    """
    Fetches all conversation details for a given date using pagination.

    Args:
        token: The OAuth access token.
        date_str: The date to query in YYYY-MM-DD format.

    Returns:
        List of conversation detail objects.
    """
    url = f"https://{GENESYS_DOMAIN}/api/v2/analytics/conversations/details/query"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    all_conversations: List[Dict[str, Any]] = []
    page_number = 1
    retries = 0
    print(f"Starting fetch for date: {date_str}")

    while True:
        payload = build_query_payload(date_str, page_number)
        try:
            response = requests.post(url, json=payload, headers=headers, timeout=60)
        except requests.exceptions.RequestException as e:
            print(f"Network error: {e}")
            raise

        if response.status_code == 429 and retries < MAX_RETRIES:
            # Honor Retry-After when present; otherwise back off exponentially.
            retry_after = response.headers.get("Retry-After", "")
            delay = float(retry_after) if retry_after.isdigit() else float(2 ** retries)
            retries += 1
            print(f"Rate limited. Waiting {delay:.0f} seconds...")
            time.sleep(delay)
            continue

        # Any other error status (or a 429 after MAX_RETRIES) raises HTTPError.
        response.raise_for_status()
        retries = 0

        page = response.json().get("conversations") or []
        if not page:
            break  # An empty page means there is nothing left to fetch.
        all_conversations.extend(page)
        print(f"Fetched {len(page)} records. Total: {len(all_conversations)}")
        page_number += 1

    return all_conversations
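The paging loop can also be separated from the HTTP call, which makes its stop condition easy to check in isolation. The following is a minimal sketch under the assumption that the endpoint pages with a 1-based pageNumber and returns each page under a conversations key; iter_pages and fake_fetch are hypothetical names used only for illustration, and the fake data stands in for real API responses.

```python
from typing import Any, Callable, Dict, Iterator, List


def iter_pages(
    fetch_page: Callable[[int], Dict[str, Any]],
    max_pages: int = 1000,
) -> Iterator[List[Dict[str, Any]]]:
    """Yields result pages until one comes back empty or max_pages is reached."""
    for page_number in range(1, max_pages + 1):
        page = fetch_page(page_number).get("conversations") or []
        if not page:
            return
        yield page


# Stand-in for the real HTTP call: three records split across two pages.
_FAKE_DATA = {
    1: [{"conversationId": "a"}, {"conversationId": "b"}],
    2: [{"conversationId": "c"}],
}


def fake_fetch(page_number: int) -> Dict[str, Any]:
    """Returns a canned response body for the requested page."""
    return {"conversations": _FAKE_DATA.get(page_number, [])}


records = [conv for page in iter_pages(fake_fetch) for conv in page]
print(len(records))
```

The max_pages guard is a design choice rather than an API requirement: it bounds the loop if the server keeps returning non-empty pages unexpectedly.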
Step 2: Processing and Aggregating Data
The raw response from Genesys Cloud is deeply nested: each record in the conversations list contains participants, each participant contains sessions, and each session contains segments and metrics. You need to flatten this data into a format suitable for CSV export.
This step transforms the complex JSON objects into a list of dictionaries, where each dictionary represents a row in the final CSV.
import csv
import io
from datetime import datetime
from typing import List, Dict, Any


def _parse_ts(value: Any) -> Any:
    """Parses an ISO-8601 timestamp such as 2024-01-01T00:00:00.000Z, or returns None."""
    try:
        return datetime.fromisoformat(value.replace("Z", "+00:00")) if value else None
    except ValueError:
        return None


def process_conversations(raw_conversations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Flattens and cleans conversation data for CSV export.

    Args:
        raw_conversations: List of conversation objects from the API.

    Returns:
        List of flattened dictionaries, one per conversation.
    """
    processed_data = []
    for conv in raw_conversations:
        queue_id = media_type = wrap_up_code = None
        metric_names = set()
        # Walk participants -> sessions -> segments, keeping the first value seen.
        for participant in conv.get("participants") or []:
            for session in participant.get("sessions") or []:
                media_type = media_type or session.get("mediaType")
                metric_names.update(m.get("name") for m in (session.get("metrics") or []))
                for segment in session.get("segments") or []:
                    queue_id = queue_id or segment.get("queueId")
                    wrap_up_code = wrap_up_code or segment.get("wrapUpCode")

        # Duration is derived from the conversation start and end timestamps.
        start = _parse_ts(conv.get("conversationStart"))
        end = _parse_ts(conv.get("conversationEnd"))
        duration_sec = (end - start).total_seconds() if start and end else 0.0

        row = {
            "conversationId": conv.get("conversationId"),
            "queueId": queue_id or "Unknown",
            "direction": conv.get("originatingDirection") or "Unknown",
            "mediaType": media_type or "Unknown",
            "wrapUpCode": wrap_up_code or "None",
            "durationSeconds": round(duration_sec, 2),
            # A conversation counts as handled/answered if the metric was emitted.
            "handled": int("tHandle" in metric_names),
            "answered": int("tAnswered" in metric_names),
            "startTime": conv.get("conversationStart"),
            "endTime": conv.get("conversationEnd"),
        }
        processed_data.append(row)
    return processed_data
def generate_csv_bytes(data: List[Dict[str, Any]]) -> bytes:
    """
    Converts a list of dictionaries to CSV bytes.

    Args:
        data: List of dictionaries to convert.

    Returns:
        Bytes object containing the CSV content.
    """
    if not data:
        return b""
    output = io.StringIO()
    # The header comes from the first row; every row is expected to share its keys.
    fieldnames = list(data[0].keys())
    writer = csv.DictWriter(output, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(data)
    return output.getvalue().encode("utf-8")
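Because generate_csv_bytes takes the header from the first row, csv.DictWriter raises a ValueError if a later row carries a key the first row lacks. One way around this, sketched below, is to build the header from the union of all row keys in first-seen order. The function name rows_to_csv and the sample rows are hypothetical and not part of the original script.

```python
import csv
import io
from typing import Any, Dict, List


def rows_to_csv(rows: List[Dict[str, Any]]) -> str:
    """Builds CSV text whose header is the union of all row keys, in first-seen order."""
    fieldnames: List[str] = []
    for row in rows:
        for key in row:
            if key not in fieldnames:
                fieldnames.append(key)

    output = io.StringIO(newline="")
    # restval fills in cells for keys that a given row does not have.
    writer = csv.DictWriter(output, fieldnames=fieldnames, restval="", lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return output.getvalue()


sample = [
    {"conversationId": "a", "mediaType": "voice"},
    {"conversationId": "b", "wrapUpCode": "x1"},
]
print(rows_to_csv(sample))
```

The trade-off is one extra pass over the rows before writing, which is negligible for a daily export held in memory.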
Step 3: Uploading to Amazon S3
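The final step uploads the generated CSV bytes to S3. Boto3 resolves AWS credentials through its default chain (environment variables, the shared credentials file, or an attached IAM role), so the script does not pass them explicitly. The script uses boto3.client('s3') to put the object.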
import boto3
from botocore.exceptions import ClientError


def upload_to_s3(bucket_name: str, key: str, data: bytes) -> bool:
    """
    Uploads data to an S3 bucket.

    Args:
        bucket_name: The name of the S3 bucket.
        key: The S3 object key (path).
        data: The bytes to upload.

    Returns:
        True if upload succeeds, False otherwise.
    """
    s3_client = boto3.client('s3')
    try:
        s3_client.put_object(
            Bucket=bucket_name,
            Key=key,
            Body=data,
            ContentType="text/csv"
        )
        print(f"Successfully uploaded to s3://{bucket_name}/{key}")
        return True
    except ClientError as e:
        print(f"Failed to upload to S3: {e}")
        return False
Complete Working Example
This is the full, copy-pasteable script. Save this as genesys_s3_export.py.
import os
import sys
import csv
import io
import time
import requests
import boto3
from datetime import datetime, timedelta
from typing import Dict, List, Any
from botocore.exceptions import ClientError

# Configuration
GENESYS_DOMAIN = "api.mypurecloud.com"
# Tokens are issued by the login host for your region, not the API host.
OAUTH_URL = "https://login.mypurecloud.com/oauth/token"
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "my-analytics-bucket")
PAGE_SIZE = 100
MAX_RETRIES = 5


def get_access_token() -> str:
    """Retrieves an OAuth 2.0 access token using Client Credentials flow."""
    client_id = os.environ.get("GENESYS_CLIENT_ID")
    client_secret = os.environ.get("GENESYS_CLIENT_SECRET")
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")

    try:
        # Send the client credentials with HTTP Basic authentication.
        response = requests.post(
            OAUTH_URL,
            data={"grant_type": "client_credentials"},
            auth=(client_id, client_secret),
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["access_token"]
    except requests.exceptions.RequestException as e:
        print(f"Failed to retrieve token: {e}")
        raise


def build_query_payload(date_str: str, page_number: int = 1) -> Dict[str, Any]:
    """Constructs the JSON payload for the analytics query."""
    start_time = f"{date_str}T00:00:00.000Z"
    end_time = f"{date_str}T23:59:59.999Z"
    return {
        # The interval is a single ISO-8601 string in the form start/end.
        "interval": f"{start_time}/{end_time}",
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {"pageSize": PAGE_SIZE, "pageNumber": page_number},
    }


def fetch_all_conversations(token: str, date_str: str) -> List[Dict[str, Any]]:
    """Fetches all conversation details for a given date using pagination."""
    url = f"https://{GENESYS_DOMAIN}/api/v2/analytics/conversations/details/query"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    all_conversations: List[Dict[str, Any]] = []
    page_number = 1
    retries = 0
    print(f"Starting fetch for date: {date_str}")

    while True:
        payload = build_query_payload(date_str, page_number)
        try:
            response = requests.post(url, json=payload, headers=headers, timeout=60)
        except requests.exceptions.RequestException as e:
            print(f"Network error: {e}")
            raise

        if response.status_code == 429 and retries < MAX_RETRIES:
            # Honor Retry-After when present; otherwise back off exponentially.
            retry_after = response.headers.get("Retry-After", "")
            delay = float(retry_after) if retry_after.isdigit() else float(2 ** retries)
            retries += 1
            print(f"Rate limited. Waiting {delay:.0f} seconds...")
            time.sleep(delay)
            continue

        # Any other error status (or a 429 after MAX_RETRIES) raises HTTPError.
        response.raise_for_status()
        retries = 0

        page = response.json().get("conversations") or []
        if not page:
            break  # An empty page means there is nothing left to fetch.
        all_conversations.extend(page)
        print(f"Fetched {len(page)} records. Total: {len(all_conversations)}")
        page_number += 1

    return all_conversations
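def _parse_ts(value: Any) -> Any:
    """Parses an ISO-8601 timestamp such as 2024-01-01T00:00:00.000Z, or returns None."""
    try:
        return datetime.fromisoformat(value.replace("Z", "+00:00")) if value else None
    except ValueError:
        return None


def process_conversations(raw_conversations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flattens and cleans conversation data for CSV export."""
    processed_data = []
    for conv in raw_conversations:
        queue_id = media_type = wrap_up_code = None
        metric_names = set()
        # Walk participants -> sessions -> segments, keeping the first value seen.
        for participant in conv.get("participants") or []:
            for session in participant.get("sessions") or []:
                media_type = media_type or session.get("mediaType")
                metric_names.update(m.get("name") for m in (session.get("metrics") or []))
                for segment in session.get("segments") or []:
                    queue_id = queue_id or segment.get("queueId")
                    wrap_up_code = wrap_up_code or segment.get("wrapUpCode")

        # Duration is derived from the conversation start and end timestamps.
        start = _parse_ts(conv.get("conversationStart"))
        end = _parse_ts(conv.get("conversationEnd"))
        duration_sec = (end - start).total_seconds() if start and end else 0.0

        row = {
            "conversationId": conv.get("conversationId"),
            "queueId": queue_id or "Unknown",
            "direction": conv.get("originatingDirection") or "Unknown",
            "mediaType": media_type or "Unknown",
            "wrapUpCode": wrap_up_code or "None",
            "durationSeconds": round(duration_sec, 2),
            "handled": int("tHandle" in metric_names),
            "answered": int("tAnswered" in metric_names),
            "startTime": conv.get("conversationStart"),
            "endTime": conv.get("conversationEnd"),
        }
        processed_data.append(row)
    return processed_data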
def generate_csv_bytes(data: List[Dict[str, Any]]) -> bytes:
    """Converts a list of dictionaries to CSV bytes."""
    if not data:
        return b""
    output = io.StringIO()
    fieldnames = list(data[0].keys())
    writer = csv.DictWriter(output, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(data)
    return output.getvalue().encode("utf-8")
def upload_to_s3(bucket_name: str, key: str, data: bytes) -> bool:
    """Uploads data to an S3 bucket."""
    s3_client = boto3.client('s3')
    try:
        s3_client.put_object(
            Bucket=bucket_name,
            Key=key,
            Body=data,
            ContentType="text/csv"
        )
        print(f"Successfully uploaded to s3://{bucket_name}/{key}")
        return True
    except ClientError as e:
        print(f"Failed to upload to S3: {e}")
        return False
def main():
    """Main execution function."""
    # Determine date (default to yesterday, in UTC)
    if len(sys.argv) > 1:
        date_str = sys.argv[1]
    else:
        date_str = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%d")
    print(f"Processing analytics for date: {date_str}")

    # Step 1: Authenticate
    try:
        token = get_access_token()
    except Exception as e:
        print(f"Authentication failed: {e}")
        sys.exit(1)

    # Step 2: Fetch Data
    try:
        conversations = fetch_all_conversations(token, date_str)
    except Exception as e:
        print(f"Data fetch failed: {e}")
        sys.exit(1)
    if not conversations:
        print("No conversations found for the specified date.")
        sys.exit(0)

    # Step 3: Process Data
    processed_data = process_conversations(conversations)
    csv_bytes = generate_csv_bytes(processed_data)

    # Step 4: Upload to S3
    s3_key = f"analytics/conversations/{date_str}.csv"
    success = upload_to_s3(S3_BUCKET_NAME, s3_key, csv_bytes)
    if success:
        print("Job completed successfully.")
    else:
        print("Job completed with errors.")
        sys.exit(1)


if __name__ == "__main__":
    main()
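The script passes the command-line date straight into the query, so a typo such as 2024/03/05 only surfaces as an API error. A small validation step could catch this earlier. The sketch below is one possible approach; parse_report_date is a hypothetical helper, and rejecting future dates is an assumption about what a daily export should allow.

```python
from datetime import date, datetime, timedelta, timezone
from typing import Optional


def parse_report_date(arg: Optional[str], today: Optional[date] = None) -> str:
    """Returns a validated YYYY-MM-DD string, defaulting to yesterday in UTC."""
    if today is None:
        today = datetime.now(timezone.utc).date()
    if arg is None:
        return (today - timedelta(days=1)).isoformat()
    try:
        parsed = datetime.strptime(arg, "%Y-%m-%d").date()
    except ValueError:
        raise ValueError(f"Expected a date in YYYY-MM-DD format, got {arg!r}") from None
    if parsed > today:
        raise ValueError(f"Report date {parsed} is in the future")
    return parsed.isoformat()


print(parse_report_date("2024-03-05", today=date(2024, 3, 10)))
```

In main(), the date could then be obtained with parse_report_date(sys.argv[1] if len(sys.argv) > 1 else None), exiting with an error message if it raises.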
Common Errors & Debugging
Error: 401 Unauthorized
Cause: The OAuth token is invalid, expired, or missing.
Fix: Verify that GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET are set correctly and that the token request goes to the login host for your region (for example login.mypurecloud.com). Check the token response for errors, and request a new token if the current one has expired.
Error: 403 Forbidden
Cause: The OAuth client lacks the permission required to read conversation detail data.
Fix: In the Genesys Cloud admin portal, open the OAuth client and check the roles assigned to it. Ensure that one of those roles grants the analytics:conversationDetail:view permission and covers the divisions you are querying.
Error: 429 Too Many Requests
Cause: You have exceeded the Genesys Cloud API rate limit.
Fix: Back off and retry, honoring the Retry-After header when the response includes one. For production, use exponential backoff with jitter and cap the number of retries so a persistent 429 cannot loop forever.
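Exponential backoff with jitter could be sketched as follows. This is an illustration rather than the script's own retry logic: RateLimited, backoff_delay, and call_with_backoff are hypothetical names, and the wrapped function is assumed to raise RateLimited when it sees a 429. The delay uses the "full jitter" variant, a random value between zero and the capped exponential bound.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Raised by the wrapped call when the server answers 429."""

    def __init__(self, retry_after: Optional[float] = None) -> None:
        super().__init__("rate limited")
        self.retry_after = retry_after  # seconds from Retry-After, if provided


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Full jitter: a random delay in [0, min(cap, base * 2**attempt))."""
    return rng() * min(cap, base * (2 ** attempt))


def call_with_backoff(
    func: Callable[[], T],
    max_retries: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Calls func, retrying on RateLimited up to max_retries times."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except RateLimited as exc:
            if attempt == max_retries:
                raise  # Give up and surface the error to the caller.
            # Prefer the server's hint; otherwise use the jittered delay.
            delay = exc.retry_after if exc.retry_after is not None else backoff_delay(attempt)
            sleep(delay)
    raise RuntimeError("unreachable")
```

Jitter spreads retries from concurrent jobs over time, so they do not all hit the API again at the same instant.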
Error: botocore.exceptions.ClientError: Access Denied
Cause: The AWS IAM user does not have permission to write to the S3 bucket.
Fix: Attach a custom policy that allows s3:PutObject on the target bucket's objects to the IAM user. This is preferable to the much broader AmazonS3FullAccess managed policy. Ensure the bucket name in S3_BUCKET_NAME is correct and that the bucket exists.
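A custom policy limited to s3:PutObject might look like the document generated below. This is a sketch for illustration: put_only_policy is a hypothetical helper, and the bucket name and key prefix are the example values used by the script, not requirements.

```python
import json
from typing import Any, Dict


def put_only_policy(bucket_name: str, prefix: str = "") -> Dict[str, Any]:
    """Builds an IAM policy document that allows s3:PutObject under one key prefix."""
    resource = f"arn:aws:s3:::{bucket_name}/{prefix}*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": ["s3:PutObject"], "Resource": [resource]}
        ],
    }


# Print the JSON so it can be pasted into the IAM console or a deployment template.
print(json.dumps(put_only_policy("my-analytics-bucket", "analytics/conversations/"), indent=2))
```

Scoping the resource to the export prefix means the job's credentials cannot overwrite objects elsewhere in the bucket.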
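Error: KeyError when reading the response
Cause: The code indexes the response for its result list, but the key is absent because the query returned no data or the response has a different shape than expected.
Fix: Read the result list with dict.get() and a default instead of indexing the response directly, so that a missing key yields an empty list. If the query returns no data, the script will exit gracefully.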