Building a Daily Genesys Cloud Analytics Export Job to Amazon S3
What You Will Build
- A Python script that queries Genesys Cloud Conversation Details analytics for the previous day and uploads the results to an Amazon S3 bucket as a JSON Lines file.
- This implementation uses the Genesys Cloud Python SDK (PureCloudPlatformClientV2) for API interaction and boto3 for S3 operations.
- The tutorial targets Python 3.8+ and covers paging through large result sets and error handling.
Prerequisites
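- Genesys Cloud OAuth Client: An OAuth client that uses the Client Credentials grant type.
- Required Permissions: Client Credentials clients get their access from roles. Assign the client a role that includes analytics:conversationDetail:view, which the conversation details query requires. If you later expand to aggregate (summary) data, the role also needs analytics:conversationAggregate:view.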
- Genesys Cloud Python SDK: The PureCloudPlatformClientV2 package, version 138.0.0 or higher.
- Amazon Web Services (AWS) Account: An existing S3 bucket and IAM credentials (an access key ID and secret access key, or an attached IAM role) with s3:PutObject permission on the bucket, plus s3:ListBucket for the bucket check in Step 2.
- Python Runtime: Python 3.8 or newer.
- External Dependencies:
pip install PureCloudPlatformClientV2 boto3
Authentication Setup
Genesys Cloud uses OAuth 2.0. For server-side jobs like this, the Client Credentials Grant flow is the standard. This flow exchanges your client ID and secret for an access token without user interaction.
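The Genesys Cloud Python SDK's get_client_credentials_token() method requests an access token and attaches it to the API client it returns. Because this job authenticates at the start of each daily run, it always works with a fresh token and does not need to handle refresh. You must set the API host for your organization's region (e.g., mypurecloud.com or usw2.pure.cloud) before authenticating.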
import os
import PureCloudPlatformClientV2
def initialize_platform_client():
    """
    Returns a Genesys Cloud ApiClient authenticated with the Client Credentials grant.
    Uses environment variables for security.
    """
    # Load credentials from environment variables
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    # Region domain only, e.g. "mypurecloud.com" or "usw2.pure.cloud"
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment variables.")
    # Point the SDK at your region's API host before creating the client
    PureCloudPlatformClientV2.configuration.host = f"https://api.{environment}"
    # Exchange the client ID and secret for an access token. The returned ApiClient
    # sends that token with every request made through the SDK's API classes.
    api_client = PureCloudPlatformClientV2.api_client.ApiClient()
    return api_client.get_client_credentials_token(client_id, client_secret)
# Initialize the client globally or in the main function
pc = initialize_platform_client()
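Under the hood, the Client Credentials grant is a single HTTP POST to the region's login host. The sketch below builds that request with only the standard library so you can see what is exchanged; build_token_request is a hypothetical helper that constructs the request without sending it, and it assumes the token endpoint is https://login.<environment>/oauth/token.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(client_id: str, client_secret: str, environment: str) -> urllib.request.Request:
    """Builds (but does not send) a Client Credentials token request."""
    # HTTP Basic authentication: base64 of "client_id:client_secret"
    basic = base64.b64encode(f"{client_id}:{client_secret}".encode("utf-8")).decode("ascii")
    # Form-encoded body that names the grant type
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode("ascii")
    return urllib.request.Request(
        url=f"https://login.{environment}/oauth/token",
        data=body,
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        method="POST",
    )


req = build_token_request("my-client-id", "my-client-secret", "mypurecloud.com")
print(req.get_method(), req.full_url)  # POST https://login.mypurecloud.com/oauth/token
# Sending it is a network call (not executed here):
#     import json
#     with urllib.request.urlopen(req) as resp:
#         access_token = json.load(resp)["access_token"]
```

The access_token from the response is then sent as an Authorization: Bearer header on API calls, which is the part the SDK's ApiClient handles for you.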
Implementation
Step 1: Querying Genesys Cloud Conversation Details
The POST /api/v2/analytics/conversations/details/query endpoint is the primary source for granular conversation data. It supports complex filtering and returns paginated results. We will call it through the Python SDK's AnalyticsApi class.
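Key Parameters:
- interval: The reporting window as a single ISO 8601 interval string, <start>/<end>, e.g. 2024-05-01T00:00:00Z/2024-05-02T00:00:00Z.
- segmentFilters / conversationFilters: Optional filters, e.g. restricting results to one queue.
- paging: pageSize (at most 100 records per page for this endpoint) and pageNumber (starting at 1).
- order / orderBy: Sort order, e.g. ascending by conversationStart.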
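The details query takes its window as one ISO 8601 interval string and pages with paging.pageSize and paging.pageNumber. The interval is easy to get wrong: isoformat() on a timezone-aware UTC datetime already ends in +00:00, so appending a literal Z yields an invalid timestamp. This sketch builds the interval and a query body as a plain dict; to_interval, build_details_query, and the 100-record page cap are assumptions for illustration, not SDK functions.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

MAX_PAGE_SIZE = 100  # assumed maximum pageSize for the synchronous details query


def to_interval(start: datetime, end: datetime) -> str:
    """Formats a window as the ISO 8601 '<start>/<end>' string used by the details query."""
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("use timezone-aware datetimes so the window is unambiguous")
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    # Convert to UTC first so the literal 'Z' suffix is truthful
    return f"{start.astimezone(timezone.utc).strftime(fmt)}/{end.astimezone(timezone.utc).strftime(fmt)}"


def build_details_query(interval: str, page_number: int, page_size: int = MAX_PAGE_SIZE,
                        segment_filters: Optional[list] = None) -> dict:
    """Builds a details-query body as a plain dict with the REST API's camelCase keys."""
    if not 1 <= page_size <= MAX_PAGE_SIZE:
        raise ValueError(f"page_size must be between 1 and {MAX_PAGE_SIZE}")
    body = {
        "interval": interval,
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {"pageSize": page_size, "pageNumber": page_number},
    }
    if segment_filters:
        body["segmentFilters"] = segment_filters
    return body


start = datetime(2024, 5, 1, tzinfo=timezone.utc)
end = start + timedelta(days=1)
print(start.isoformat() + "Z")  # 2024-05-01T00:00:00+00:00Z  (not a valid timestamp)
print(to_interval(start, end))  # 2024-05-01T00:00:00Z/2024-05-02T00:00:00Z
```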
from datetime import datetime, timezone
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException
PAGE_SIZE = 100  # Maximum page size for the synchronous details query
def get_conversation_details(pc, start_date: datetime, end_date: datetime):
    """
    Retrieves conversation details from Genesys Cloud.
    Args:
        pc: The authenticated ApiClient returned by initialize_platform_client().
        start_date: Start of the analytics window (timezone-aware).
        end_date: End of the analytics window (timezone-aware).
    Returns:
        A list of conversation detail records (SDK model objects).
    """
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(pc)
    # The interval is one ISO 8601 string, "<start>/<end>". strftime is used because
    # isoformat() on an aware datetime already ends in "+00:00", so appending "Z" is invalid.
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    interval = (
        f"{start_date.astimezone(timezone.utc).strftime(fmt)}/"
        f"{end_date.astimezone(timezone.utc).strftime(fmt)}"
    )
    all_records = []
    page_number = 1
    try:
        while True:
            print(f"Fetching page {page_number}...")
            query_body = {
                "interval": interval,
                "order": "asc",
                "orderBy": "conversationStart",
                # Add filters here if needed, e.g.
                # "segmentFilters": [{"type": "and", "predicates": [{"dimension": "queueId", "value": "<queue-id>"}]}],
                "paging": {"pageSize": PAGE_SIZE, "pageNumber": page_number},
            }
            # The SDK serializes this dict as the JSON request body
            response = analytics_api.post_analytics_conversations_details_query(query_body)
            # conversations is None when a page has no results
            conversations = response.conversations or []
            all_records.extend(conversations)
            # A page shorter than PAGE_SIZE is the last one
            if len(conversations) < PAGE_SIZE:
                break
            page_number += 1
    except ApiException as e:
        print(f"Genesys Cloud API Error: {e.status} - {e.reason}")
        # Handle specific status codes
        if e.status == 429:
            print("Rate limited. Consider implementing exponential backoff.")
        elif e.status == 400:
            print("Bad Request. Check your query parameters.")
        raise
    print(f"Total records fetched: {len(all_records)}")
    return all_records
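The details query is paged by page number, and the loop stops when a page comes back shorter than the page size. That stop rule can be pulled out and tested without network access. A minimal sketch, assuming fetch_page is a hypothetical callable you supply that returns one page of records for a given page number; the max_pages guard is an added safety net, not an API limit.

```python
from typing import Callable, List, Optional, Sequence


def fetch_all_pages(
    fetch_page: Callable[[int], Optional[Sequence]],
    page_size: int = 100,
    max_pages: int = 10_000,
) -> List:
    """Calls fetch_page(1), fetch_page(2), ... until a page has fewer than page_size items."""
    results: List = []
    for page_number in range(1, max_pages + 1):
        page = fetch_page(page_number) or []  # treat None like an empty page
        results.extend(page)
        if len(page) < page_size:
            return results  # a short (or empty) page is the last one
    # Guard against a loop that never ends (e.g. a fetch that ignores page_number)
    raise RuntimeError(f"stopped after {max_pages} pages; narrow the interval or add filters")


# Usage with a fake data source standing in for the API
data = list(range(250))
calls = []


def fake_fetch(page_number: int) -> list:
    calls.append(page_number)
    start = (page_number - 1) * 100
    return data[start:start + 100]


print(len(fetch_all_pages(fake_fetch)), calls)  # 250 [1, 2, 3]
```

In the job, fetch_page would call post_analytics_conversations_details_query with the page number placed in paging.pageNumber and return response.conversations. Note that when the total is an exact multiple of the page size, one extra request returns an empty page.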
Step 2: Serializing and Streaming to S3
Building one large JSON document in memory for a full day of conversations can exhaust memory and raise MemoryError. Instead, we serialize each record as one line of JSON Lines (.jsonl) and write it to a temporary file on disk. JSON Lines suits analytics data because each line is a complete JSON object, so downstream tools such as Amazon Athena and AWS Glue can parse the file line by line.
We then upload the file with boto3's upload_file, which switches to multipart uploads automatically for large files. Note that the records fetched in Step 1 are still held in a Python list; only the serialized copy goes to disk instead of memory.
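The JSON Lines format described above is one json.dumps call per record plus a newline. This sketch writes made-up sample records (real detail records have many more fields) to an in-memory buffer and reads them back one line at a time; write_jsonl is a hypothetical helper.

```python
import io
import json
from datetime import datetime, timezone
from typing import IO, Iterable


def write_jsonl(records: Iterable[dict], fh: IO[str]) -> int:
    """Writes one JSON object per line to fh and returns the number of lines written."""
    count = 0
    for record in records:
        # default=str stringifies values json cannot encode natively, such as datetime
        fh.write(json.dumps(record, default=str) + "\n")
        count += 1
    return count


# Made-up sample records
sample = [
    {"conversationId": "conv-1", "conversationStart": datetime(2024, 5, 1, 9, 30, tzinfo=timezone.utc)},
    {"conversationId": "conv-2"},
]
buf = io.StringIO()
write_jsonl(sample, buf)
# Each line is a complete JSON document, so a reader can parse lines independently
rows = [json.loads(line) for line in buf.getvalue().splitlines()]
print(rows[0]["conversationStart"])  # 2024-05-01 09:30:00+00:00
```

Because default=str uses str(), datetimes come out with a space separator rather than ISO 8601's T; if downstream tools need strict ISO timestamps, convert them with isoformat() before writing.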
import json
import os
import tempfile
import boto3
from botocore.exceptions import ClientError
def upload_to_s3(records: list, bucket_name: str, file_key: str):
    """
    Writes conversation records to a temporary JSON Lines file and uploads it to S3.
    Args:
        records: List of conversation detail objects (SDK models or dicts).
        bucket_name: The target S3 bucket name.
        file_key: The S3 object key (path/filename).
    """
    # Initialize S3 client (credentials come from the environment or an IAM role)
    s3_client = boto3.client("s3")
    # Check that the bucket exists (optional, but good for debugging)
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        if error_code == "404":
            raise ValueError(f"Bucket {bucket_name} does not exist.") from e
        raise
    # delete=False keeps the file on disk after it is closed so upload_file can read it
    with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".jsonl", encoding="utf-8") as tmp_file:
        temp_path = tmp_file.name
        # Write each record as one JSON line
        for record in records:
            try:
                # SDK models provide to_dict(), which also converts nested models;
                # plain dicts are written as-is
                record_dict = record.to_dict() if hasattr(record, "to_dict") else record
                # default=str converts datetime values to strings
                tmp_file.write(json.dumps(record_dict, default=str) + "\n")
            except (TypeError, ValueError) as e:
                print(f"Error serializing record: {e}")
                continue
    # The with block has closed (and flushed) the file; upload it, then clean up
    try:
        print(f"Uploading {temp_path} to s3://{bucket_name}/{file_key}...")
        s3_client.upload_file(temp_path, bucket_name, file_key)
        print(f"Upload complete: s3://{bucket_name}/{file_key}")
    except Exception as e:
        print(f"S3 Upload Error: {e}")
        raise
    finally:
        # Clean up the temporary file
        if os.path.exists(temp_path):
            os.remove(temp_path)
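Because the full list of records is built before anything is written, peak memory still grows with the day's volume. One way to keep it flat is to write each page to the file as soon as it arrives. A sketch, assuming pages is any iterable of record lists; the fake_pages generator below is a hypothetical stand-in for one that yields response.conversations on each iteration of the paging loop.

```python
import json
import os
import tempfile
from typing import Iterable, Sequence, Tuple


def pages_to_jsonl_file(pages: Iterable[Sequence]) -> Tuple[str, int]:
    """Writes records page by page into a temporary .jsonl file; returns (path, record_count).

    Only the current page is referenced while writing, so memory use tracks the page
    size rather than the whole day. The caller must delete the file when done.
    """
    count = 0
    with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".jsonl", encoding="utf-8") as fh:
        for page in pages:
            for record in page:
                # SDK models expose to_dict(); plain dicts are written as-is
                data = record.to_dict() if hasattr(record, "to_dict") else record
                fh.write(json.dumps(data, default=str) + "\n")
                count += 1
    return fh.name, count


def fake_pages():
    # Hypothetical stand-in for a generator that yields one API page at a time
    yield [{"id": 1}, {"id": 2}]
    yield [{"id": 3}]


path, n = pages_to_jsonl_file(fake_pages())
try:
    with open(path, encoding="utf-8") as f:
        print(n, f.read().splitlines())  # 3 ['{"id": 1}', '{"id": 2}', '{"id": 3}']
finally:
    os.remove(path)
```

The returned path can be passed to s3_client.upload_file(path, bucket_name, file_key) and removed afterwards, exactly as the temporary file is handled in upload_to_s3.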
Step 3: Orchestrating the Daily Job
This step combines the query and upload logic into a single executable function. It calculates the previous UTC day, from midnight to midnight, so each run captures one complete day of data without overlapping the current day.
import os
from datetime import datetime, timedelta, timezone
def run_daily_export():
    """
    Main function to orchestrate the daily analytics export.
    Uses pc, the authenticated client created in Authentication Setup.
    """
    # Define the date range for the previous UTC day: [yesterday 00:00, today 00:00)
    now = datetime.now(timezone.utc)
    end_date = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start_date = end_date - timedelta(days=1)
    # Configuration
    BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "my-analytics-bucket")
    # Generate the S3 key with date stamp
    date_str = start_date.strftime("%Y-%m-%d")
    file_key = f"analytics/conversations/details/{date_str}.jsonl"
    print(f"Starting daily export for date range: {start_date.isoformat()} to {end_date.isoformat()}")
    try:
        # Step 1: Fetch data from Genesys Cloud
        records = get_conversation_details(pc, start_date, end_date)
        if not records:
            print("No records found for the specified date range.")
            return
        # Step 2: Upload to S3
        upload_to_s3(records, BUCKET_NAME, file_key)
    except Exception as e:
        print(f"Job failed: {e}")
        # Here you could add logic to send an alert (e.g., via Slack, PagerDuty, or Email)
        raise
if __name__ == "__main__":
    run_daily_export()
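Moving the window calculation into a function that accepts the current time makes it testable and lets you backfill a specific day after a failed run. A sketch; previous_utc_day and the backfill usage are hypothetical additions, and now should be timezone-aware (a naive datetime would be treated as local time by astimezone).

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def previous_utc_day(now: Optional[datetime] = None) -> Tuple[datetime, datetime, str]:
    """Returns (start, end, s3_key) for the full UTC day before `now` (default: current time)."""
    now = now or datetime.now(timezone.utc)
    # Midnight UTC of the current day is the exclusive end of the window
    end = now.astimezone(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    start = end - timedelta(days=1)
    # Same key layout as run_daily_export
    key = f"analytics/conversations/details/{start:%Y-%m-%d}.jsonl"
    return start, end, key


# Backfill example: a run at 05:30 UTC on 1 March 2024 exports 29 February (a leap day)
start, end, key = previous_utc_day(datetime(2024, 3, 1, 5, 30, tzinfo=timezone.utc))
print(start.isoformat(), end.isoformat(), key)
# 2024-02-29T00:00:00+00:00 2024-03-01T00:00:00+00:00 analytics/conversations/details/2024-02-29.jsonl
```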
Complete Working Example
Below is the full, copy-pasteable script. Save this as genesys_s3_export.py.
Requirements:
- Install dependencies:
pip install PureCloudPlatformClientV2 boto3
- Set the following environment variables:
  - GENESYS_CLIENT_ID
  - GENESYS_CLIENT_SECRET
  - GENESYS_ENVIRONMENT (default: mypurecloud.com)
  - AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION (or an IAM role that boto3 can use)
  - S3_BUCKET_NAME
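A preflight check that fails fast on missing configuration saves a wasted authentication round trip. This is a hypothetical helper, not part of the script below. It treats S3_BUCKET_NAME as required because the script's default, my-analytics-bucket, is a placeholder, and it skips the AWS variables because boto3 can also find credentials through an IAM role or a shared credentials file.

```python
import os
from typing import List, Mapping, Optional, Sequence

# Variables with no usable default. GENESYS_ENVIRONMENT defaults to mypurecloud.com,
# and boto3 has other credential sources, so those are not treated as required here.
REQUIRED_VARS = ("GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET", "S3_BUCKET_NAME")


def missing_env_vars(env: Optional[Mapping[str, str]] = None,
                     required: Sequence[str] = REQUIRED_VARS) -> List[str]:
    """Returns the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


# Example: an explicit mapping instead of the real environment
missing = missing_env_vars({"GENESYS_CLIENT_ID": "abc", "S3_BUCKET_NAME": ""})
print(missing)  # ['GENESYS_CLIENT_SECRET', 'S3_BUCKET_NAME']
```

At the top of run_daily_export, a non-empty result could be turned into an error such as raise SystemExit(f"Missing environment variables: {missing}").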
import json
import os
import tempfile
from datetime import datetime, timedelta, timezone
import boto3
import PureCloudPlatformClientV2
from botocore.exceptions import ClientError
from PureCloudPlatformClientV2.rest import ApiException
PAGE_SIZE = 100  # Maximum page size for the synchronous details query
def initialize_platform_client():
    """
    Returns a Genesys Cloud ApiClient authenticated with the Client Credentials grant.
    """
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")
    # Set the regional API host before creating the ApiClient
    PureCloudPlatformClientV2.configuration.host = f"https://api.{environment}"
    api_client = PureCloudPlatformClientV2.api_client.ApiClient()
    return api_client.get_client_credentials_token(client_id, client_secret)
def get_conversation_details(pc, start_date: datetime, end_date: datetime):
    """
    Retrieves conversation details from Genesys Cloud with pagination.
    """
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(pc)
    # ISO 8601 interval "<start>/<end>" in UTC
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    interval = (
        f"{start_date.astimezone(timezone.utc).strftime(fmt)}/"
        f"{end_date.astimezone(timezone.utc).strftime(fmt)}"
    )
    all_records = []
    page_number = 1
    try:
        while True:
            print(f"Fetching page {page_number}...")
            query_body = {
                "interval": interval,
                "order": "asc",
                "orderBy": "conversationStart",
                "paging": {"pageSize": PAGE_SIZE, "pageNumber": page_number},
            }
            response = analytics_api.post_analytics_conversations_details_query(query_body)
            conversations = response.conversations or []
            all_records.extend(conversations)
            # A page shorter than PAGE_SIZE is the last one
            if len(conversations) < PAGE_SIZE:
                break
            page_number += 1
    except ApiException as e:
        print(f"Genesys Cloud API Error: {e.status} - {e.reason}")
        if e.status == 429:
            print("Rate limited. Implement backoff logic.")
        raise
    print(f"Total records fetched: {len(all_records)}")
    return all_records
def upload_to_s3(records: list, bucket_name: str, file_key: str):
    """
    Writes conversation records to a temporary JSON Lines file and uploads it to S3.
    """
    s3_client = boto3.client("s3")
    # Verify bucket access
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as e:
        raise ValueError(f"Cannot access bucket {bucket_name}: {e}") from e
    with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".jsonl", encoding="utf-8") as tmp_file:
        temp_path = tmp_file.name
        for record in records:
            try:
                # SDK models provide to_dict(); plain dicts are written as-is
                record_dict = record.to_dict() if hasattr(record, "to_dict") else record
                tmp_file.write(json.dumps(record_dict, default=str) + "\n")
            except (TypeError, ValueError) as e:
                print(f"Serialization error: {e}")
    # The file is closed here, so everything written is on disk before the upload
    try:
        print(f"Uploading to s3://{bucket_name}/{file_key}...")
        s3_client.upload_file(temp_path, bucket_name, file_key)
        print("Upload complete.")
    except Exception as e:
        print(f"S3 Upload Error: {e}")
        raise
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)
def run_daily_export():
    """
    Main orchestration function: exports the previous UTC day.
    """
    pc = initialize_platform_client()
    now = datetime.now(timezone.utc)
    end_date = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start_date = end_date - timedelta(days=1)
    BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "my-analytics-bucket")
    date_str = start_date.strftime("%Y-%m-%d")
    file_key = f"analytics/conversations/details/{date_str}.jsonl"
    print(f"Exporting data for {start_date.date()} to {end_date.date()}")
    try:
        records = get_conversation_details(pc, start_date, end_date)
        if not records:
            print("No records found.")
            return
        upload_to_s3(records, BUCKET_NAME, file_key)
    except Exception as e:
        print(f"Job failed: {e}")
        raise
if __name__ == "__main__":
    run_daily_export()
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: Invalid client ID or client secret, credentials used against the wrong region, or an expired token.
- Fix: Verify that GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET are correct and that the OAuth client uses the Client Credentials grant type. Confirm that GENESYS_ENVIRONMENT matches the region where the OAuth client was created. If the initial token request fails, no later API call can succeed. Review the client's settings under Admin > Integrations > OAuth in Genesys Cloud.
Error: 403 Forbidden
- Cause: The role assigned to the OAuth client lacks the analytics:conversationDetail:view permission.
- Fix: Under Admin > Integrations > OAuth, select your client, open its Roles tab, and assign a role that includes analytics:conversationDetail:view for the divisions you need. Save, then restart the script so it requests a new token.
Error: 429 Too Many Requests
- Cause: Genesys Cloud rate-limits API requests, and analytics queries count against those limits. Limits are generally expressed as requests per minute, so a fast pagination loop can exceed them.
- Fix: Implement exponential backoff. In the get_conversation_details function, catch the ApiException with status 429, wait for an increasing delay (e.g., 2 ** attempt * 0.5 seconds, capped at 60 seconds), and retry the same page.
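import time
# Inside the pagination loop, replace the single API call with a retry loop.
# Assumes analytics_api, query_body, and ApiException from get_conversation_details.
max_attempts = 5
for attempt in range(max_attempts):
    try:
        response = analytics_api.post_analytics_conversations_details_query(query_body)
        break  # Success: continue processing this page
    except ApiException as e:
        if e.status != 429 or attempt == max_attempts - 1:
            raise
        wait_time = min(60, 2 ** attempt * 0.5)  # 0.5s, 1s, 2s, ... capped at 60 seconds
        print(f"Rate limited. Waiting {wait_time} seconds...")
        time.sleep(wait_time)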
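The retry loop above is tied to one call site. A reusable variant wraps any call and adds "full jitter" (a random wait between zero and the backoff delay) so parallel jobs do not retry in lockstep. This is a sketch: call_with_backoff and its parameters are hypothetical, and is_rate_limited is where you plug in the SDK check, for example lambda e: isinstance(e, ApiException) and e.status == 429.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    call: Callable[[], T],
    is_rate_limited: Callable[[Exception], bool],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Runs call(); on a rate-limit error waits up to base_delay * 2**attempt and retries."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception as exc:
            # Re-raise anything that is not a rate limit, and the final failed attempt
            if not is_rate_limited(exc) or attempt == max_attempts - 1:
                raise
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Full jitter: pick a random wait in [0, delay]
            sleep(random.uniform(0, delay))
    raise RuntimeError("unreachable")  # every loop iteration returns or raises
```

Usage inside the paging loop: response = call_with_backoff(lambda: analytics_api.post_analytics_conversations_details_query(query_body), lambda e: isinstance(e, ApiException) and e.status == 429). The sleep parameter exists so tests can record delays instead of waiting.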
Error: AttributeError: 'NoneType' object has no attribute 'to_dict'
- Cause: The code calls to_dict() on a value the SDK returned as None, for example response.conversations on a page with no results, or a None entry in a list.
- Fix: Guard list access with response.conversations or [], and skip None records before serializing. Serialize SDK models with their to_dict() method. Calling json.dumps(record, default=str) directly on a model object does not produce nested JSON, because default=str turns the whole object into a single string.
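The guards described in this fix fit in one small function. A sketch, assuming SDK models expose to_dict() as swagger-generated Python models do (those return snake_case keys); to_jsonable is a hypothetical helper and FakeConversation is a made-up stand-in for an SDK model.

```python
import json
from datetime import datetime, timezone
from typing import Any, Optional


def to_jsonable(record: Any) -> Optional[dict]:
    """Returns a dict ready for json.dumps, or None for records that should be skipped."""
    if record is None:
        return None
    if hasattr(record, "to_dict"):
        # Generated models convert nested models recursively
        return record.to_dict()
    if isinstance(record, dict):
        return record
    raise TypeError(f"unsupported record type: {type(record).__name__}")


class FakeConversation:
    """Made-up stand-in for an SDK model object."""
    def to_dict(self) -> dict:
        return {"conversation_id": "conv-1",
                "conversation_start": datetime(2024, 5, 1, tzinfo=timezone.utc)}


records = [FakeConversation(), None, {"conversation_id": "conv-2"}]
lines = [json.dumps(d, default=str) for d in map(to_jsonable, records) if d is not None]
print(lines[0])  # {"conversation_id": "conv-1", "conversation_start": "2024-05-01 00:00:00+00:00"}
# Without to_dict(), default=str turns the whole object into a single string value
collapsed = json.loads(json.dumps(FakeConversation(), default=str))
print(type(collapsed).__name__)  # str
```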
Error: ClientError: An error occurred (403) when calling the HeadBucket operation
- Cause: AWS IAM permissions are insufficient.
- Fix: Ensure the IAM user or role has s3:ListBucket and s3:PutObject permissions for the target bucket.
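A common cause of this error is attaching both actions to the same resource: HeadBucket is authorized by s3:ListBucket on the bucket ARN, while s3:PutObject applies to object ARNs. This sketch generates a least-privilege policy document for the export job; build_export_policy is a hypothetical helper and the prefix mirrors the script's key layout. If the bucket uses SSE-KMS with a customer managed key, the role may also need KMS permissions, which this sketch does not cover.

```python
import json


def build_export_policy(bucket_name: str, prefix: str = "analytics/conversations/details/") -> dict:
    """Returns a least-privilege IAM policy document for the export job."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # HeadBucket is authorized by s3:ListBucket on the bucket ARN itself
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": f"arn:aws:s3:::{bucket_name}",
            },
            {
                # PutObject (including multipart upload parts) is authorized on object
                # ARNs, so this resource needs the key prefix and a wildcard
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket_name}/{prefix}*",
            },
        ],
    }


print(json.dumps(build_export_policy("my-analytics-bucket"), indent=2))
```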