Export Genesys Cloud Analytics to S3 Using Python and Boto3
What You Will Build
- A Python script that queries Genesys Cloud CX for daily conversation details, paginates through the results, and uploads the data as JSON Lines to an Amazon S3 bucket.
- This implementation uses the Genesys Cloud analytics/conversations/details/query API endpoint and the boto3 SDK for S3 operations.
- The tutorial covers Python 3.9+ with requests for HTTP handling and boto3 for cloud storage.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Grant).
- Required Permission: the role assigned to the OAuth client must include analytics:conversationDetail:view. With the Client Credentials Grant, access is governed by the client's roles rather than by OAuth scopes.
- SDK/API Version: Genesys Cloud API v2 (REST). No specific SDK wrapper is required; raw HTTP requests keep the pagination logic explicit.
- Language/Runtime: Python 3.9 or later.
- External Dependencies:
  - requests (for HTTP calls)
  - boto3 (for AWS S3 interaction)
  - python-dotenv (for secure credential management)
Authentication Setup
Genesys Cloud uses OAuth 2.0 for authentication. For server-to-server integrations like data exports, the Client Credentials Grant is the standard flow. This flow exchanges your client ID and client secret for an access token.
You must store your credentials securely. Using environment variables is the recommended practice.
# .env file
GENESYS_CLOUD_REGION=us-east-1
GENESYS_CLOUD_CLIENT_ID=your-client-id
GENESYS_CLOUD_CLIENT_SECRET=your-client-secret
AWS_ACCESS_KEY_ID=your-aws-access-key
AWS_SECRET_ACCESS_KEY=your-aws-secret-key
AWS_REGION=us-east-1
S3_BUCKET_NAME=your-analytics-bucket
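As a minimal sketch of checking this configuration up front, the helper below verifies that the expected variables are present before the job starts, so a missing credential fails fast rather than midway through an export. The name require_env is hypothetical (it is not part of any library), and the variable names are taken from the .env file above.

```python
import os
from typing import Mapping, Optional, Sequence

# Variables the export job cannot run without (names from the .env file above)
REQUIRED_VARS = (
    "GENESYS_CLOUD_CLIENT_ID",
    "GENESYS_CLOUD_CLIENT_SECRET",
    "S3_BUCKET_NAME",
)


def require_env(names: Sequence[str] = REQUIRED_VARS,
                environ: Optional[Mapping[str, str]] = None) -> dict:
    """Return the requested variables, raising if any is missing or empty."""
    env = os.environ if environ is None else environ
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in names}
```

Calling require_env() once at startup, after the .env file has been loaded, reports every missing variable in a single error message.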
The following function handles token retrieval. It returns the access token and its lifetime in seconds (expires_in). In a production job, reuse a token until it is close to expiry instead of requesting a new one for every call.
import os
import requests
import time
from datetime import datetime, timezone

# Map AWS region names to Genesys Cloud domains. Extend this for other regions.
GENESYS_DOMAINS = {
    "us-east-1": "mypurecloud.com",
    "us-west-2": "usw2.pure.cloud",
    "eu-west-1": "mypurecloud.ie",
    "eu-central-1": "mypurecloud.de",
}

def get_genesys_access_token() -> tuple[str, int]:
    """
    Retrieves an OAuth2 access token from Genesys Cloud.

    Returns:
        tuple: (access_token, expires_in_seconds)
    """
    region = os.getenv("GENESYS_CLOUD_REGION", "us-east-1")
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")

    # Resolve the login host from the region
    domain = GENESYS_DOMAINS.get(region)
    if domain is None:
        raise ValueError(f"Unsupported Genesys Cloud region: {region}")
    token_url = f"https://login.{domain}/oauth/token"

    # The client ID and secret are sent as HTTP Basic credentials.
    # Permissions come from the roles assigned to the OAuth client.
    response = requests.post(
        token_url,
        data={"grant_type": "client_credentials"},
        auth=(client_id, client_secret),
        timeout=30,
    )
    response.raise_for_status()

    token_data = response.json()
    access_token = token_data["access_token"]
    expires_in = token_data["expires_in"]
    return access_token, expires_in
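The advice about reusing a valid token can be sketched as a small cache. This is an illustrative helper, not part of requests or any Genesys SDK: the TokenCache class, its margin_seconds parameter, and the injectable clock are assumptions made for this example. It wraps any function that returns (token, expires_in_seconds) and only calls it again when the cached token is about to expire.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches an access token and refetches it shortly before it expires."""

    def __init__(self,
                 fetch: Callable[[], Tuple[str, int]],
                 margin_seconds: int = 60,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch            # returns (token, expires_in_seconds)
        self._margin = margin_seconds  # refresh this long before expiry
        self._clock = clock            # injectable so the logic can be tested
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return a token that is valid for at least margin_seconds more."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = now + expires_in
        return self._token
```

A job would create the cache once, for example TokenCache(get_genesys_access_token), and call get() before each API request instead of holding a single token for the whole run.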
Implementation
Step 1: Define the Analytics Query and Pagination Logic
The analytics/conversations/details/query endpoint does not return all conversations in a single request. Each response contains one page of results, and you select the page with paging.pageNumber in the request body. Request successive pages until a response contains no conversations.
The query body defines the time window and the filters to apply. For a daily export, you typically query a 24-hour period, such as the previous UTC day. The window is passed as a single ISO 8601 interval in the form start/end.
import json
from typing import Generator

PAGE_SIZE = 100  # Check the API reference for the current maximum page size

def build_query_body(start_time: str, end_time: str, page_number: int = 1) -> dict:
    """
    Constructs the query body for the analytics API.

    Args:
        start_time: ISO 8601 start timestamp (e.g., '2023-10-27T00:00:00Z')
        end_time: ISO 8601 end timestamp (e.g., '2023-10-28T00:00:00Z')
        page_number: 1-based page to request.

    Returns:
        dict: The JSON payload for the POST request.
    """
    return {
        "interval": f"{start_time}/{end_time}",
        # A fixed sort order keeps page boundaries stable between requests
        "order": "asc",
        "orderBy": "conversationStart",
        "segmentFilters": [
            {
                "type": "or",
                "predicates": [
                    # Adjust media types as needed
                    {"type": "dimension", "dimension": "mediaType", "operator": "matches", "value": "voice"},
                    {"type": "dimension", "dimension": "mediaType", "operator": "matches", "value": "chat"},
                ],
            }
        ],
        "paging": {"pageSize": PAGE_SIZE, "pageNumber": page_number},
    }
The pagination loop handles the core logic. It sends the query for page 1, yields the results, and then requests the next page number until the API returns an empty page.
def fetch_conversations_generator(access_token: str, start_time: str, end_time: str) -> Generator[dict, None, None]:
    """
    Generator that yields individual conversation objects from Genesys Cloud.

    Args:
        access_token: Valid OAuth2 access token.
        start_time: Start of the query window.
        end_time: End of the query window.

    Yields:
        dict: A single conversation object.
    """
    region = os.getenv("GENESYS_CLOUD_REGION", "us-east-1")
    # The API host is api.<domain>, where the domain depends on the region
    domains = {
        "us-east-1": "mypurecloud.com",
        "us-west-2": "usw2.pure.cloud",
        "eu-west-1": "mypurecloud.ie",
        "eu-central-1": "mypurecloud.de",
    }
    if region not in domains:
        raise ValueError(f"Unsupported Genesys Cloud region: {region}")
    url = f"https://api.{domains[region]}/api/v2/analytics/conversations/details/query"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    page_number = 1
    while True:
        query_body = build_query_body(start_time, end_time, page_number)
        response = requests.post(url, json=query_body, headers=headers, timeout=60)
        if response.status_code == 429:
            # Rate limited: wait as long as the API asks, then retry the same page
            retry_after = int(response.headers.get("Retry-After", 60))
            print(f"Rate limited. Waiting {retry_after} seconds...")
            time.sleep(retry_after)
            continue
        response.raise_for_status()

        conversations = response.json().get("conversations") or []
        if not conversations:
            break  # An empty page means there is no more data
        yield from conversations

        page_number += 1
        time.sleep(0.5)  # Polite delay to avoid hitting rate limits
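The 24-hour query window described in this step can be computed with a small helper. The following is a sketch under stated assumptions: previous_utc_day is a hypothetical name, the export is assumed to cover the previous calendar day in UTC, and the window is treated as running from one midnight to the next so that the final second of the day is not dropped.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def previous_utc_day(now: Optional[datetime] = None) -> Tuple[str, str]:
    """Return (start, end) ISO 8601 timestamps covering the previous UTC day."""
    now = now or datetime.now(timezone.utc)
    # Normalize to UTC first so a local-time input still maps to the right day
    today_midnight = now.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    start = today_midnight - timedelta(days=1)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return start.strftime(fmt), today_midnight.strftime(fmt)
```

Passing an explicit now value makes the helper easy to test and lets you re-run the export for a past date.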
Step 2: Stream Data to Amazon S3
Holding a very large dataset in memory before uploading it can cause MemoryError exceptions. For simplicity, this tutorial writes JSON Lines to an in-memory io.BytesIO buffer and uploads it with a single put_object call, which is adequate for typical daily volumes. For very large datasets (millions of records), write to a temporary local file and use boto3's managed transfer methods (upload_file or upload_fileobj), which switch to multipart uploads automatically for large objects.
This step defines the upload function, which creates the S3 client and writes the object.
import boto3
import io
import json

def upload_to_s3(bucket_name: str, key: str, data_bytes: bytes) -> bool:
    """
    Uploads bytes to an S3 bucket.

    Args:
        bucket_name: The name of the S3 bucket.
        key: The object key (file path) in S3.
        data_bytes: The binary data to upload.

    Returns:
        bool: True if successful, False otherwise.
    """
    s3_client = boto3.client('s3', region_name=os.getenv("AWS_REGION"))
    try:
        s3_client.put_object(
            Bucket=bucket_name,
            Key=key,
            Body=data_bytes,
            ContentType='application/json'
        )
        print(f"Successfully uploaded to s3://{bucket_name}/{key}")
        return True
    except Exception as e:
        print(f"Error uploading to S3: {e}")
        return False
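For datasets too large to buffer in memory, the temporary-file approach mentioned above might look like the following sketch. The function name upload_jsonl_via_tempfile is hypothetical. The s3_client argument is assumed to be a boto3 S3 client (for example boto3.client('s3')); it is passed in rather than created inside the function so the logic can be exercised without AWS access.

```python
import json
import tempfile
from typing import Any, Iterable


def upload_jsonl_via_tempfile(s3_client: Any, bucket: str, key: str,
                              records: Iterable[dict]) -> int:
    """Spool records to a temporary file as JSON Lines, then upload the file.

    Returns the number of records written.
    """
    count = 0
    with tempfile.TemporaryFile() as tmp:  # binary mode, removed on close
        for record in records:
            line = json.dumps(record, default=str) + "\n"
            tmp.write(line.encode("utf-8"))
            count += 1
        tmp.seek(0)  # Rewind so the upload reads from the start
        # upload_fileobj reads the file in chunks instead of loading it whole
        s3_client.upload_fileobj(tmp, bucket, key)
    return count
```

Because records can be any iterable, the generator from Step 1 can be passed in directly, so only one record at a time is held in memory.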
Step 3: Orchestrate the Export Job
Combine the authentication, fetching, and uploading logic into a single execution flow. This function calculates the date range for the previous day, fetches the data, converts it to JSON Lines format (which is efficient for analytics), and uploads it.
from datetime import datetime, timedelta, timezone

def run_daily_export() -> None:
    """
    Main function to orchestrate the daily analytics export.
    """
    # 1. Authenticate
    print("Authenticating with Genesys Cloud...")
    access_token, expires_in = get_genesys_access_token()
    print(f"Token acquired. Expires in {expires_in} seconds.")

    # 2. Define Date Range (previous UTC day, midnight to midnight)
    # Ending at the next midnight avoids dropping the final second of the day.
    today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    yesterday = today - timedelta(days=1)
    start_time = yesterday.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_time = today.strftime("%Y-%m-%dT%H:%M:%SZ")
    print(f"Querying conversations from {start_time} to {end_time}")

    # 3. Prepare S3 Destination
    bucket_name = os.getenv("S3_BUCKET_NAME")
    date_str = yesterday.strftime("%Y-%m-%d")
    s3_key = f"analytics/conversations/{date_str}.jsonl"

    # 4. Fetch and Buffer
    # We use io.BytesIO to buffer the data in memory before uploading.
    # For extremely large datasets, consider writing to a temp file instead.
    buffer = io.BytesIO()
    record_count = 0
    try:
        for conversation in fetch_conversations_generator(access_token, start_time, end_time):
            # Write each conversation as one JSON document per line
            json_line = json.dumps(conversation, default=str) + "\n"
            buffer.write(json_line.encode('utf-8'))
            record_count += 1
            # Log progress every 1000 records
            if record_count % 1000 == 0:
                print(f"Processed {record_count} records...")
    except requests.exceptions.RequestException as e:
        print(f"HTTP Error during fetch: {e}")
        # Nothing is uploaded on failure, so a partial file never reaches S3
        if record_count > 0:
            print(f"Partial data retrieved: {record_count} records.")
        return

    print(f"Total records fetched: {record_count}")
    if record_count == 0:
        print("No conversations found for the specified date range.")
        return

    # 5. Upload to S3
    print(f"Uploading {record_count} records to S3...")
    success = upload_to_s3(bucket_name, s3_key, buffer.getvalue())
    if success:
        print("Export job completed successfully.")
    else:
        print("Export job failed during S3 upload.")
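JSON Lines stores one JSON document per line, which lets downstream tools read the export record by record instead of parsing one large array. As a small sketch of the format, the two helpers below serialize records and read them back; to_jsonl and iter_jsonl are illustrative names, not functions from any library.

```python
import json
from typing import Iterable, Iterator


def to_jsonl(records: Iterable[dict]) -> bytes:
    """Serialize records as UTF-8 JSON Lines (one JSON object per line)."""
    return "".join(json.dumps(r, default=str) + "\n" for r in records).encode("utf-8")


def iter_jsonl(data: bytes) -> Iterator[dict]:
    """Yield one parsed record per non-empty line."""
    for line in data.decode("utf-8").split("\n"):
        if line.strip():
            yield json.loads(line)
```

A consumer that downloads the object from S3 can pass its bytes to iter_jsonl and process conversations one at a time.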
Complete Working Example
The following script combines all components into a single, runnable file. Save this as export_analytics.py.
import io
import json
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from typing import Generator

import boto3
import requests
# Install dependencies via pip:
# pip install requests boto3 python-dotenv
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Map AWS region names to Genesys Cloud domains. Extend this for other regions.
GENESYS_DOMAINS = {
    "us-east-1": "mypurecloud.com",
    "us-west-2": "usw2.pure.cloud",
    "eu-west-1": "mypurecloud.ie",
    "eu-central-1": "mypurecloud.de",
}
PAGE_SIZE = 100  # Check the API reference for the current maximum page size

def get_genesys_domain() -> str:
    """
    Resolves the Genesys Cloud domain for the configured region.
    """
    region = os.getenv("GENESYS_CLOUD_REGION", "us-east-1")
    domain = GENESYS_DOMAINS.get(region)
    if domain is None:
        raise ValueError(f"Unsupported Genesys Cloud region: {region}")
    return domain

def get_genesys_access_token() -> tuple[str, int]:
    """
    Retrieves an OAuth2 access token from Genesys Cloud.
    """
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLOUD_CLIENT_ID and GENESYS_CLOUD_CLIENT_SECRET must be set.")

    token_url = f"https://login.{get_genesys_domain()}/oauth/token"
    try:
        # The client ID and secret are sent as HTTP Basic credentials
        response = requests.post(
            token_url,
            data={"grant_type": "client_credentials"},
            auth=(client_id, client_secret),
            timeout=30,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Failed to get access token: {e}") from e

    token_data = response.json()
    return token_data["access_token"], token_data["expires_in"]

def build_query_body(start_time: str, end_time: str, page_number: int = 1) -> dict:
    """
    Constructs the query body for the analytics API.
    """
    return {
        "interval": f"{start_time}/{end_time}",
        # A fixed sort order keeps page boundaries stable between requests
        "order": "asc",
        "orderBy": "conversationStart",
        "segmentFilters": [
            {
                "type": "or",
                "predicates": [
                    {"type": "dimension", "dimension": "mediaType", "operator": "matches", "value": "voice"},
                    {"type": "dimension", "dimension": "mediaType", "operator": "matches", "value": "chat"},
                ],
            }
        ],
        "paging": {"pageSize": PAGE_SIZE, "pageNumber": page_number},
    }

def fetch_conversations_generator(access_token: str, start_time: str, end_time: str) -> Generator[dict, None, None]:
    """
    Generator that yields individual conversation objects from Genesys Cloud.
    """
    url = f"https://api.{get_genesys_domain()}/api/v2/analytics/conversations/details/query"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    page_number = 1
    while True:
        query_body = build_query_body(start_time, end_time, page_number)
        response = requests.post(url, json=query_body, headers=headers, timeout=60)
        if response.status_code == 429:
            # Rate limited: wait as long as the API asks, then retry the same page
            retry_after = int(response.headers.get("Retry-After", 60))
            print(f"Rate limited. Waiting {retry_after} seconds...")
            time.sleep(retry_after)
            continue
        response.raise_for_status()

        conversations = response.json().get("conversations") or []
        if not conversations:
            break  # An empty page means there is no more data
        yield from conversations

        page_number += 1
        time.sleep(0.5)  # Polite delay

def upload_to_s3(bucket_name: str, key: str, data_bytes: bytes) -> bool:
    """
    Uploads bytes to an S3 bucket.
    """
    s3_client = boto3.client('s3', region_name=os.getenv("AWS_REGION"))
    try:
        s3_client.put_object(
            Bucket=bucket_name,
            Key=key,
            Body=data_bytes,
            ContentType='application/json'
        )
        return True
    except Exception as e:
        print(f"Error uploading to S3: {e}")
        return False

def run_daily_export() -> None:
    """
    Main function to orchestrate the daily analytics export.
    """
    print("Starting Genesys Cloud Analytics Export...")
    bucket_name = os.getenv("S3_BUCKET_NAME")
    if not bucket_name:
        raise ValueError("S3_BUCKET_NAME environment variable is not set.")

    try:
        access_token, expires_in = get_genesys_access_token()
        print(f"Token acquired. Expires in {expires_in} seconds.")
    except Exception as e:
        print(f"Authentication failed: {e}")
        sys.exit(1)

    # Previous UTC day, midnight to midnight
    today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    yesterday = today - timedelta(days=1)
    start_time = yesterday.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_time = today.strftime("%Y-%m-%dT%H:%M:%SZ")
    print(f"Querying conversations from {start_time} to {end_time}")

    date_str = yesterday.strftime("%Y-%m-%d")
    s3_key = f"analytics/conversations/{date_str}.jsonl"

    buffer = io.BytesIO()
    record_count = 0
    try:
        for conversation in fetch_conversations_generator(access_token, start_time, end_time):
            json_line = json.dumps(conversation, default=str) + "\n"
            buffer.write(json_line.encode('utf-8'))
            record_count += 1
            if record_count % 1000 == 0:
                print(f"Processed {record_count} records...")
    except requests.exceptions.RequestException as e:
        print(f"HTTP Error during fetch: {e}")
        if record_count > 0:
            print(f"Partial data retrieved: {record_count} records.")
        return

    print(f"Total records fetched: {record_count}")
    if record_count == 0:
        print("No conversations found for the specified date range.")
        return

    print(f"Uploading {record_count} records to S3...")
    success = upload_to_s3(bucket_name, s3_key, buffer.getvalue())
    if success:
        print("Export job completed successfully.")
    else:
        print("Export job failed during S3 upload.")

if __name__ == "__main__":
    run_daily_export()
Common Errors & Debugging
Error: 401 Unauthorized
- What causes it: The OAuth token is expired, invalid, or missing.
- How to fix it: Ensure your client ID and secret are correct, and check that the token was retrieved successfully before making API calls. If the script runs for a long time, the token may expire mid-process. Implement token refresh logic if the job duration can exceed the token lifetime, which is configurable per OAuth client and reported as expires_in in the token response.
- Code Fix: Add a check for token expiration in the generator loop if the job runs longer than expires_in.
Error: 403 Forbidden
- What causes it: The OAuth client does not have the permission required by the endpoint.
- How to fix it: In the Genesys Cloud Admin portal, open the OAuth client and assign it a role that includes the analytics:conversationDetail:view permission.
- Code Fix: No code change is required. For the Client Credentials Grant, the permission comes from the client's role, not from a parameter in the token request.
Error: 429 Too Many Requests
- What causes it: You have exceeded the API rate limit.
- How to fix it: Implement exponential backoff. The provided code includes a basic retry mechanism that sleeps for the duration specified in the Retry-After header.
- Code Fix: The fetch_conversations_generator function already includes this logic. Ensure you do not remove the time.sleep calls.
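Exponential backoff can be sketched as a function that computes how long to wait before each retry. This is an illustrative helper rather than part of requests: backoff_delay and its parameters are assumptions for this example. It prefers a numeric Retry-After value when one is supplied and otherwise doubles the delay on each attempt up to a cap, with optional random jitter so that parallel jobs do not retry in lockstep.

```python
import random
from typing import Optional


def backoff_delay(attempt: int,
                  retry_after: Optional[str] = None,
                  base: float = 1.0,
                  cap: float = 60.0,
                  jitter: bool = True) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            return max(0.0, float(retry_after))
        except ValueError:
            pass  # Retry-After can also be an HTTP date; fall back to backoff
    delay = min(cap, base * (2 ** attempt))
    if jitter:
        delay = random.uniform(0, delay)  # pick a random wait up to the computed delay
    return delay
```

In the generator, the sleep on a 429 response could then become time.sleep(backoff_delay(attempt, response.headers.get("Retry-After"))), where attempt counts consecutive rate-limited responses.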
Error: 500 Internal Server Error
- What causes it: A transient error on the Genesys Cloud side.
- How to fix it: Retry the request after a short delay.
- Code Fix: Wrap the HTTP calls made with requests in a retry decorator or loop for 5xx errors.
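A retry loop for 5xx responses might look like the sketch below. The name call_with_retries is hypothetical, and the function takes a zero-argument send callable so it works with any request; the only assumption about the returned object is that it has a status_code attribute, as requests responses do.

```python
import time
from typing import Any, Callable


def call_with_retries(send: Callable[[], Any],
                      max_attempts: int = 4,
                      base_delay: float = 1.0,
                      sleep: Callable[[float], None] = time.sleep) -> Any:
    """Call send() until it returns a non-5xx response or attempts run out.

    The last response is always returned, so the caller can still call
    raise_for_status() on it.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        response = send()
        if response.status_code < 500 or attempt == max_attempts - 1:
            return response
        sleep(base_delay * (2 ** attempt))  # wait 1s, 2s, 4s, ... between tries
```

For example, call_with_retries(lambda: requests.post(url, json=query_body, headers=headers, timeout=60)) retries the query up to four times before handing back the final response.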