Automating Daily Genesys Cloud Analytics Exports to S3 with Python
What You Will Build
- A Python script that queries Genesys Cloud Conversation Analytics for a specific date range and streams the results directly to an Amazon S3 bucket.
- This solution uses the Genesys Cloud PureCloud Platform Client V2 SDK for data retrieval and boto3 for S3 storage operations.
- The implementation is written in Python 3.9+ and handles pagination, rate limiting, and streaming to minimize memory usage.
Prerequisites
- Genesys Cloud OAuth Client: A Client Credentials client ID and client secret whose assigned role grants the analytics:conversationDetail:view permission, which the conversation detail query endpoint checks.
- AWS Credentials: An IAM user or role with s3:PutObject permissions on the target bucket.
- Python Environment: Python 3.9 or higher.
- Dependencies:
  - PureCloudPlatformClientV2: The official Genesys Cloud Python SDK.
  - boto3: The AWS SDK for Python.
  - pandas: For efficient DataFrame manipulation and CSV serialization.
  - No separate OAuth library is needed; the SDK performs the Client Credentials token request itself.
pip install PureCloudPlatformClientV2 boto3 pandas
Authentication Setup
Genesys Cloud APIs require OAuth 2.0 authentication. The Python SDK, imported as PureCloudPlatformClientV2, handles the token request for you. Set the API host for your region, then either supply an access token you already hold or exchange a client ID and client secret for one.
import os
import PureCloudPlatformClientV2

def init_genesys_client():
    """
    Initializes and returns a configured Genesys Cloud ApiClient.
    """
    # The API host is region-specific; api.mypurecloud.com is the US East host
    PureCloudPlatformClientV2.configuration.host = 'https://api.mypurecloud.com'
    # Option A: reuse an existing access token from the environment
    PureCloudPlatformClientV2.configuration.access_token = os.getenv('GENESYS_ACCESS_TOKEN')
    api_client = PureCloudPlatformClientV2.api_client.ApiClient()
    # Option B: Client Credentials flow, preferred for unattended jobs:
    # api_client = PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
    #     os.getenv('GENESYS_CLIENT_ID'),
    #     os.getenv('GENESYS_CLIENT_SECRET')
    # )
    return api_client
Note: For production jobs that run unattended, use the Client Credentials flow (get_client_credentials_token). If you supply an existing access token instead, make sure it will not expire mid-run. A Client Credentials token has no refresh token: once it expires the API answers 401, so a long-running export should request a new token and retry the failed call.
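The retry-on-expiry idea in the note can be sketched without touching the SDK at all. The helper below is a hypothetical illustration, not part of the Genesys SDK: `call_with_reauth`, `call`, and `reauthenticate` are names invented here, and the only assumption about the exception is that it exposes an integer `status` attribute.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_reauth(
    call: Callable[[], T],
    reauthenticate: Callable[[], None],
    max_reauth: int = 1,
) -> T:
    """Run call(); on a 401, log in again and retry, up to max_reauth times."""
    reauth_count = 0
    while True:
        try:
            return call()
        except Exception as exc:
            # Assumption: the API exception carries the HTTP status as `.status`.
            if getattr(exc, "status", None) != 401 or reauth_count >= max_reauth:
                raise
            reauth_count += 1
            reauthenticate()
```

In a real job, `call` would wrap one API request and `reauthenticate` would request a fresh Client Credentials token. Capping the number of re-logins keeps a revoked client from looping forever.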
Implementation
Step 1: Define the Analytics Query Parameters
To export conversation data, you must construct a ConversationQuery. This object defines the time range, any filters, and how the results are paged.
The endpoint POST /api/v2/analytics/conversations/details/query returns one detailed record per conversation. Only interval is required; paging, order, and orderBy control how you walk through the result set. The endpoint has no groupBy or select fields: every record carries the full participant, session, and segment detail, and you pick the columns you need when you flatten the data.
from datetime import datetime, timedelta, timezone
import PureCloudPlatformClientV2

def build_query_params():
    """
    Constructs the body for the analytics query.
    Targets the previous full UTC day.
    """
    # Date range: yesterday 00:00:00 UTC up to (not including) today 00:00:00 UTC
    end_time = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    start_time = end_time - timedelta(days=1)
    # Format ISO 8601 strings with an explicit Z (UTC) suffix
    fmt = '%Y-%m-%dT%H:%M:%S.000Z'
    start_str = start_time.strftime(fmt)
    end_str = end_time.strftime(fmt)
    # SDK models take no constructor arguments; set attributes after creation
    paging = PureCloudPlatformClientV2.PagingSpec()
    paging.page_size = 100
    paging.page_number = 1
    query_body = PureCloudPlatformClientV2.ConversationQuery()
    query_body.interval = f"{start_str}/{end_str}"
    query_body.order = 'asc'
    query_body.order_by = 'conversationStart'
    query_body.paging = paging
    return query_body
Critical Parameter Explanation:
- interval: Must be in ISO 8601 format, written as start/end. The end date is exclusive.
- paging: pageSize is the number of conversations per response (100 is the documented maximum at the time of writing) and pageNumber starts at 1.
- order and orderBy: Fix the sort order so that consecutive pages neither overlap nor skip records.
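Because the end of the interval is exclusive, a full calendar day runs from midnight to the following midnight. The sketch below shows that arithmetic for an arbitrary date so a job can be re-run for a past day; `utc_day_interval` is a hypothetical helper name, and the millisecond-precision `Z` format is one ISO 8601 spelling chosen for illustration.

```python
from datetime import date, datetime, timedelta, timezone


def utc_day_interval(day: date) -> str:
    """Return 'start/end' covering one UTC calendar day, end exclusive."""
    start = datetime(day.year, day.month, day.day, tzinfo=timezone.utc)
    end = start + timedelta(days=1)  # next midnight, not 23:59:59
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    return f"{start.strftime(fmt)}/{end.strftime(fmt)}"


if __name__ == "__main__":
    print(utc_day_interval(date(2024, 2, 29)))
    # 2024-02-29T00:00:00.000Z/2024-03-01T00:00:00.000Z
```

Using the next midnight as the end, rather than 23:59:59, avoids dropping conversations that start in the final second of the day.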
Step 2: Execute the Query and Handle Pagination
The Genesys Cloud Analytics API does not return all results in a single call. The detail query is paged by number: each response holds at most pageSize conversations, and you request pageNumber 1, 2, 3, and so on until a response comes back with no conversations. The response does not contain a nextPageUri link to follow.
Additionally, the API enforces strict rate limits. If you receive a 429 Too Many Requests response, you must wait before retrying. Depending on the SDK version, 429 responses may not be retried automatically, so explicit handling is recommended. The function below fetches a single page and handles the 429 case.
import time
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

def fetch_conversation_data(api_client, query_body, max_retries: int = 5):
    """
    Fetches the page of conversation data selected by query_body.paging,
    retrying with exponential backoff when the API answers 429.
    Returns a list of conversation records (empty if the page has none).
    """
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)
    retry_delay = 1  # Exponential backoff base, in seconds
    for attempt in range(max_retries + 1):
        try:
            response = analytics_api.post_analytics_conversations_details_query(query_body)
            # `conversations` may be None rather than [] when nothing matched
            return response.conversations or []
        except ApiException as e:
            # Anything other than a rate limit, or the last failed attempt, is fatal
            if e.status != 429 or attempt == max_retries:
                raise
            print(f"Rate limited. Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
            retry_delay = min(retry_delay * 2, 60)
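A slightly more careful backoff prefers the server's own hint when one is present. The sketch below is independent of the SDK: `backoff_delay` and `call_with_backoff` are hypothetical names, and it assumes only that the raised exception exposes `status` and, optionally, a dict-like `headers` containing `Retry-After` in seconds.

```python
import time
from typing import Any, Callable, Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 60.0) -> float:
    """Seconds to wait before retrying; attempt is 0-based."""
    if retry_after is not None:
        try:
            return float(retry_after)  # trust the server's hint when it parses
        except (TypeError, ValueError):
            pass
    return min(base * (2 ** attempt), cap)


def call_with_backoff(call: Callable[[], Any], max_retries: int = 5,
                      sleep: Callable[[float], None] = time.sleep) -> Any:
    """Run call(), retrying on status 429 up to max_retries times."""
    for attempt in range(max_retries + 1):
        try:
            return call()
        except Exception as exc:
            if getattr(exc, "status", None) != 429 or attempt == max_retries:
                raise
            headers = getattr(exc, "headers", None) or {}
            sleep(backoff_delay(attempt, headers.get("Retry-After")))
```

Passing `sleep` as a parameter keeps the helper easy to test, since a test can record the requested delays instead of actually waiting.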
Pagination in Production:
Each call to post_analytics_conversations_details_query sends one POST request and returns one page. The SDK does not chain pages for you: you must advance paging.page_number yourself and stop at the first page that contains no conversations.
Here is the robust pagination logic:
import time
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

def fetch_all_pages(api_client, query_body, max_retries: int = 5):
    """
    Robust pagination handler for Genesys Cloud Analytics.
    Walks page_number upward until an empty page is returned.
    """
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)
    all_records = []
    retries = 0
    while True:
        try:
            response = analytics_api.post_analytics_conversations_details_query(query_body)
        except ApiException as e:
            if e.status == 429 and retries < max_retries:
                # Back off 1, 2, 4, ... seconds (capped at 60), then repeat the same page
                time.sleep(min(2 ** retries, 60))
                retries += 1
                continue
            raise
        retries = 0  # The page succeeded, so reset the backoff
        conversations = response.conversations or []
        if not conversations:
            break  # An empty page means every record has been read
        all_records.extend(conversations)
        query_body.paging.page_number += 1
        time.sleep(0.5)  # Rate limit courtesy
    return all_records
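If holding every record in a list is a concern, the same page-walking loop can be written as a generator that yields records as each page arrives. This is a sketch under stated assumptions: `iter_records` is a hypothetical name, and `fetch_page` stands in for whatever function returns the list of records for a given page number (an empty list or None once the data runs out).

```python
from typing import Callable, Iterator, List, Optional


def iter_records(
    fetch_page: Callable[[int], Optional[List[dict]]],
    start_page: int = 1,
) -> Iterator[dict]:
    """Yield records page by page until fetch_page returns an empty page."""
    page_number = start_page
    while True:
        page = fetch_page(page_number)
        if not page:  # None or [] both mean there is nothing left
            return
        yield from page
        page_number += 1


if __name__ == "__main__":
    # Stand-in data source: two pages of fake records, then nothing.
    fake_pages = {1: [{"id": "a"}, {"id": "b"}], 2: [{"id": "c"}]}
    for record in iter_records(lambda n: fake_pages.get(n, [])):
        print(record["id"])
```

Only one page is resident at a time, so a consumer that writes each record out immediately keeps memory use roughly constant regardless of the day's volume.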
Step 3: Process and Stream to S3
The function below converts the list of records into a DataFrame with pandas, serializes it to CSV in an in-memory buffer, and uploads that buffer to S3 with boto3. This avoids temporary files, but the whole day's data is held in memory at once; for very large datasets, write and upload the data in chunks instead.
import boto3
import pandas as pd
import io

def upload_to_s3(records: list, bucket_name: str, file_key: str):
    """
    Converts a list of Genesys Cloud conversation records to a CSV and uploads to S3.
    """
    if not records:
        print("No records to upload.")
        return

    # Genesys Cloud analytics items are nested objects.
    # For a simple export, we want a flat table.
    def flatten_dict(d, parent_key='', sep='_'):
        """Flattens nested dicts into one level, joining keys with sep."""
        items = []
        for k, v in d.items():
            new_key = f"{parent_key}{sep}{k}" if parent_key else k
            if isinstance(v, dict):
                items.extend(flatten_dict(v, new_key, sep=sep).items())
            elif isinstance(v, list):
                # Lists (for example, a conversation's participants) are kept
                # as their string representation in a single column
                items.append((new_key, str(v)))
            else:
                items.append((new_key, v))
        return dict(items)

    # SDK model objects expose to_dict(); plain dicts are used as-is
    flat_records = [flatten_dict(item.to_dict() if hasattr(item, 'to_dict') else item) for item in records]
    # Create DataFrame
    df = pd.DataFrame(flat_records)
    # Convert to CSV in memory
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    csv_data = csv_buffer.getvalue()
    # Upload to S3
    s3_client = boto3.client('s3')
    try:
        s3_client.put_object(
            Bucket=bucket_name,
            Key=file_key,
            Body=csv_data.encode('utf-8')
        )
        print(f"Successfully uploaded {len(records)} records to s3://{bucket_name}/{file_key}")
    except Exception as e:
        print(f"Failed to upload to S3: {e}")
        raise
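For exports too large to serialize in memory, one option is to write rows to a temporary file as they arrive and let boto3 upload the file. The sketch below assumes the rows are already flat dicts and that the column names are known up front; `write_rows_to_tempfile` and `upload_csv` are hypothetical helpers, while `upload_file(Filename, Bucket, Key)` is the standard boto3 S3 client method.

```python
import csv
import os
import tempfile
from typing import Iterable, List


def write_rows_to_tempfile(rows: Iterable[dict], fieldnames: List[str]) -> str:
    """Write rows one at a time to a temporary CSV file and return its path."""
    with tempfile.NamedTemporaryFile(
        "w", newline="", suffix=".csv", delete=False, encoding="utf-8"
    ) as fh:
        # Missing keys become empty cells; unexpected keys are dropped
        writer = csv.DictWriter(fh, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        for row in rows:
            writer.writerow(row)
        return fh.name


def upload_csv(s3_client, rows: Iterable[dict], fieldnames: List[str],
               bucket: str, key: str) -> None:
    """Spool rows to disk, upload the file, and always remove the temp file."""
    path = write_rows_to_tempfile(rows, fieldnames)
    try:
        s3_client.upload_file(path, bucket, key)
    finally:
        os.remove(path)
```

Called as `upload_csv(boto3.client("s3"), rows, columns, bucket, key)`, this trades the pandas convenience for bounded memory, since `rows` can be any iterator, including a generator that pages through the API.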
Complete Working Example
Below is the complete, runnable script. It combines authentication, query construction, pagination handling, and S3 upload.
import io
import os
import time
from datetime import datetime, timedelta, timezone

import boto3
import pandas as pd
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

PAGE_SIZE = 100   # Conversations per request
MAX_RETRIES = 5   # Consecutive 429 responses tolerated per page

def init_genesys_client():
    """Initializes the Genesys Cloud API client."""
    # The API host is region-specific; api.mypurecloud.com is the US East host
    PureCloudPlatformClientV2.configuration.host = 'https://api.mypurecloud.com'
    # Use Client Credentials for unattended jobs
    client_id = os.getenv('GENESYS_CLIENT_ID')
    client_secret = os.getenv('GENESYS_CLIENT_SECRET')
    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        client_id, client_secret
    )

def build_query():
    """Builds the analytics query for the previous full UTC day."""
    end_time = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    start_time = end_time - timedelta(days=1)
    fmt = '%Y-%m-%dT%H:%M:%S.000Z'
    paging = PureCloudPlatformClientV2.PagingSpec()
    paging.page_size = PAGE_SIZE
    paging.page_number = 1
    query = PureCloudPlatformClientV2.ConversationQuery()
    query.interval = f"{start_time.strftime(fmt)}/{end_time.strftime(fmt)}"
    query.order = 'asc'
    query.order_by = 'conversationStart'
    query.paging = paging
    return query

def fetch_all_conversations(api_client, query_body):
    """Fetches all pages of conversation data."""
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)
    all_records = []
    retries = 0
    while True:
        try:
            response = analytics_api.post_analytics_conversations_details_query(query_body)
        except ApiException as e:
            if e.status == 429 and retries < MAX_RETRIES:
                # Exponential backoff, then repeat the same page
                delay = min(2 ** retries, 60)
                print(f"Rate limited. Retrying in {delay} seconds...")
                time.sleep(delay)
                retries += 1
                continue
            raise
        retries = 0
        conversations = response.conversations or []
        if not conversations:
            break  # An empty page means every record has been read
        all_records.extend(conversations)
        query_body.paging.page_number += 1
        time.sleep(0.5)  # Courtesy delay
    return all_records

def flatten_dict(d, parent_key='', sep='_'):
    """Flattens nested dictionaries."""
    items = []
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        elif isinstance(v, list):
            items.append((new_key, str(v)))
        else:
            items.append((new_key, v))
    return dict(items)

def upload_to_s3(records, bucket, key):
    """Converts records to CSV and uploads to S3."""
    if not records:
        print("No records to upload.")
        return
    # Convert SDK objects to dicts
    dict_records = [rec.to_dict() if hasattr(rec, 'to_dict') else rec for rec in records]
    flat_records = [flatten_dict(r) for r in dict_records]
    df = pd.DataFrame(flat_records)
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    s3_client = boto3.client('s3')
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=csv_buffer.getvalue().encode('utf-8')
    )
    print(f"Uploaded to s3://{bucket}/{key}")

def main():
    # Configuration
    s3_bucket = os.getenv('S3_BUCKET_NAME')
    s3_key_prefix = os.getenv('S3_KEY_PREFIX', 'analytics/daily/')
    if not s3_bucket:
        raise ValueError("S3_BUCKET_NAME must be set.")
    # Initialize Client
    api_client = init_genesys_client()
    # Build Query
    query_body = build_query()
    # Fetch Data
    print("Fetching conversation data...")
    records = fetch_all_conversations(api_client, query_body)
    print(f"Fetched {len(records)} records.")
    # Upload to S3 (the key carries the run date; the data covers the previous UTC day)
    today_str = datetime.now(timezone.utc).strftime('%Y-%m-%d')
    file_key = f"{s3_key_prefix}{today_str}_conversations.csv"
    upload_to_s3(records, s3_bucket, file_key)

if __name__ == '__main__':
    main()
Common Errors & Debugging
Error: 403 Forbidden
- Cause: The OAuth client lacks the required permission.
- Fix: Ensure the role assigned to the Genesys Cloud OAuth client includes analytics:conversationDetail:view. Check the client's roles in the Genesys Cloud Admin UI under Integrations > OAuth.
Error: 429 Too Many Requests
- Cause: You are querying analytics too frequently or the query is too large.
- Fix: Implement exponential backoff. Reduce the interval size if possible. The code above retries with exponential backoff when it is rate limited.
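Error: KeyError 'items' (or a TypeError when iterating over a None result list)
- Cause: The response did not contain the list the code expected, either because the API returned an error payload or because no conversations matched the query.
- Fix: Treat a missing or None result list as empty before iterating over it, and ensure the query body is valid.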
Error: boto3 ClientError: AccessDenied
- Cause: The AWS IAM user/role does not have s3:PutObject permission on the target bucket.
- Fix: Update the IAM policy to allow s3:PutObject for the specific bucket and key prefix.
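A least-privilege policy for the fix above can be generated rather than hand-written. The sketch below builds a standard IAM policy document as a Python dict; `put_object_policy` is a hypothetical helper, and the bucket name and prefix in the usage example are placeholders.

```python
import json


def put_object_policy(bucket: str, key_prefix: str) -> dict:
    """Build an IAM policy allowing s3:PutObject under one key prefix only."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "s3:PutObject",
                # Object ARNs take the form arn:aws:s3:::bucket/key
                "Resource": f"arn:aws:s3:::{bucket}/{key_prefix}*",
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder bucket and prefix; substitute your own values.
    print(json.dumps(put_object_policy("my-export-bucket", "analytics/daily/"), indent=2))
```

The printed JSON can be pasted into the IAM console or attached with your infrastructure tooling. Scoping the resource to the key prefix keeps the export role from writing elsewhere in the bucket.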