Automating Genesys Cloud Analytics Exports to AWS S3 with Python
What You Will Build
- This tutorial builds a Python script that queries Genesys Cloud conversation analytics and uploads the resulting JSON data to an Amazon S3 bucket.
- This integration uses the Genesys Cloud REST API for analytics queries and the boto3 SDK for AWS S3 operations.
- The code is written in Python 3.9+ and uses the requests library for HTTP interactions and boto3 for cloud storage.
Prerequisites
Genesys Cloud Configuration
- OAuth 2.0 Client: You need a Genesys Cloud OAuth 2.0 Client ID and Secret.
- Required Permissions: With the Client Credentials grant, access is governed by the roles assigned to the OAuth client rather than by scopes. Assign the client a role that includes the analytics:conversationDetail:view permission.
- Environment URL: Identify your Genesys Cloud environment URL (e.g., https://api.mypurecloud.com).
AWS Configuration
- AWS Account: An active AWS account with S3 enabled.
- IAM User: An IAM user with programmatic access and a policy allowing s3:PutObject to the target bucket.
- Credentials: Access Key ID and Secret Access Key.
- Bucket: A pre-existing S3 bucket with a designated prefix/path for the exports.
Development Environment
- Python: Version 3.9 or higher.
- Dependencies: Install the required libraries using pip.
    pip install requests boto3 python-dotenv
- Environment Variables: Create a .env file in your project root to store secrets securely.
    GENESYS_CLIENT_ID=your_client_id
    GENESYS_CLIENT_SECRET=your_client_secret
    GENESYS_REGION=us-east-1
    AWS_ACCESS_KEY_ID=your_aws_access_key
    AWS_SECRET_ACCESS_KEY=your_aws_secret_key
    AWS_DEFAULT_REGION=us-east-1
    S3_BUCKET_NAME=your-analytics-bucket
    S3_PREFIX=genexports/daily
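A missing variable otherwise only surfaces later as an authentication or upload failure, so it can help to check the configuration before doing any work. The following is a minimal sketch: missing_env_vars and REQUIRED_VARS are hypothetical names that are not part of the tutorial script, and the list of variables assumes the .env layout shown above.

```python
import os

# Assumed to be the variables the script cannot run without;
# adjust the tuple if you rename or add settings.
REQUIRED_VARS = (
    "GENESYS_CLIENT_ID",
    "GENESYS_CLIENT_SECRET",
    "S3_BUCKET_NAME",
)


def missing_env_vars(required=REQUIRED_VARS, environ=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in required if not env.get(name)]
```

Calling missing_env_vars() after load_dotenv() and aborting when the returned list is non-empty gives a clearer error than a failed token request.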
Authentication Setup
Genesys Cloud uses the OAuth 2.0 Client Credentials flow for server-to-server integrations. We will create a helper class that acquires a token and caches it in memory until shortly before it expires. Even though this script runs only once a day, the cache avoids repeated calls to the token endpoint when a single run issues several queries.
import os
import time
import requests
from dotenv import load_dotenv

load_dotenv()

# Genesys Cloud hosts are named by domain, not by AWS region.
# Extend this map if your org is hosted in another region.
GENESYS_DOMAINS = {
    "us-east-1": "mypurecloud.com",
    "us-west-2": "usw2.pure.cloud",
    "eu-west-1": "mypurecloud.ie",
    "eu-central-1": "mypurecloud.de",
    "ap-southeast-2": "mypurecloud.com.au",
}


class GenesysAuth:
    def __init__(self):
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
        self.region = os.getenv("GENESYS_REGION", "us-east-1")
        # Raises KeyError for a region that is not in the map above
        self.domain = GENESYS_DOMAINS[self.region]
        # Tokens are issued by the login host, not the API host
        self.token_url = f"https://login.{self.domain}/oauth/token"
        self.access_token = None
        self.token_expiry = 0

    def get_token(self) -> str:
        # Reuse the cached token until 60 seconds before it expires
        if self.access_token and time.time() < self.token_expiry - 60:
            return self.access_token

        try:
            # Client ID and secret are sent as HTTP Basic credentials
            response = requests.post(
                self.token_url,
                data={"grant_type": "client_credentials"},
                auth=(self.client_id, self.client_secret),
                timeout=10,
            )
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            self.token_expiry = time.time() + token_data["expires_in"]
            return self.access_token
        except requests.exceptions.HTTPError as e:
            print(f"OAuth Error: {e.response.status_code} - {e.response.text}")
            raise
        except requests.exceptions.RequestException as e:
            print(f"Network Error during OAuth: {e}")
            raise

Implementation
Step 1: Query Genesys Cloud Analytics
We will use the conversation detail query endpoint. The request body specifies a date interval, optional filters, a sort order, and paging; the response contains the matching conversations with their nested participants, sessions, and segments.
Endpoint: POST /api/v2/analytics/conversations/details/query
Required Permission: analytics:conversationDetail:view
We will retrieve data for the previous UTC day so that the day is complete by the time the job runs.

from datetime import datetime, timedelta, timezone
import json


class GenesysAnalyticsExporter:
    PAGE_SIZE = 100  # Page size (the endpoint caps this at 100)

    def __init__(self, auth: GenesysAuth):
        self.auth = auth
        self.base_url = f"https://api.{auth.domain}"
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def get_previous_day_dates(self) -> tuple:
        """
        Returns the start and end timestamps for the previous UTC day in ISO 8601 format.
        """
        # Midnight today (UTC) is the end of the window; one day earlier is the start
        today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        start_dt = today - timedelta(days=1)
        fmt = "%Y-%m-%dT%H:%M:%S.000Z"
        return start_dt.strftime(fmt), today.strftime(fmt)

    def build_query_payload(self, start_time: str, end_time: str, page_number: int = 1) -> dict:
        """
        Constructs the query payload for the analytics API.
        """
        return {
            # The date range is a single "start/end" interval string
            "interval": f"{start_time}/{end_time}",
            "order": "asc",
            "orderBy": "conversationStart",
            "paging": {"pageSize": self.PAGE_SIZE, "pageNumber": page_number},
            # Keep conversations that have at least one segment of these media types
            "segmentFilters": [
                {
                    "type": "or",
                    "predicates": [
                        {
                            "type": "dimension",
                            "dimension": "mediaType",
                            "operator": "matches",
                            "value": media_type,
                        }
                        for media_type in ("voice", "chat", "email", "message")
                    ],
                }
            ],
        }

    def fetch_analytics_data(self) -> list:
        """
        Fetches all pages of analytics data for the previous day.
        """
        start_time, end_time = self.get_previous_day_dates()
        endpoint = f"{self.base_url}/api/v2/analytics/conversations/details/query"
        all_records = []
        page_number = 1

        try:
            while True:
                payload = self.build_query_payload(start_time, end_time, page_number)
                # get_token() returns the cached token and refreshes it when needed
                self.headers["Authorization"] = f"Bearer {self.auth.get_token()}"
                response = requests.post(endpoint, json=payload, headers=self.headers, timeout=30)

                # Handle Rate Limiting (429): wait, then retry the same page
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    print(f"Rate limited. Waiting {retry_after} seconds...")
                    time.sleep(retry_after)
                    continue

                response.raise_for_status()
                data = response.json()

                # The "conversations" key is absent when a page has no results
                conversations = data.get("conversations", [])
                all_records.extend(conversations)

                # A short page means there is nothing left to fetch
                if len(conversations) < self.PAGE_SIZE:
                    break
                page_number += 1
                # Small delay to be polite to the API
                time.sleep(1)

            print(f"Successfully fetched {len(all_records)} records.")
            return all_records
        except requests.exceptions.HTTPError as e:
            print(f"HTTP Error: {e.response.status_code}")
            print(f"Response: {e.response.text}")
            raise
        except Exception as e:
            print(f"Error fetching analytics: {e}")
            raise
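The previous-day window is the part of this step that is easiest to get wrong around month and year boundaries, so it is worth isolating. The following is a minimal sketch that takes the reference time as a parameter, which makes the window testable; previous_day_interval is a hypothetical helper name, and the millisecond ".000Z" format is an assumption about what you want to send to the API.

```python
from datetime import datetime, timedelta, timezone


def previous_day_interval(now=None):
    """Return (start, end) ISO 8601 strings covering the previous UTC day.

    `now` should be a timezone-aware datetime; it defaults to the current time.
    """
    now = now or datetime.now(timezone.utc)
    # Truncate to midnight UTC: this is the end of the window
    today = now.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    start = today - timedelta(days=1)
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    return start.strftime(fmt), today.strftime(fmt)


# A run early on 1 March 2024 exports 29 February (a leap day)
start, end = previous_day_interval(datetime(2024, 3, 1, 5, 30, tzinfo=timezone.utc))
print(start, end)
```

Using midnight-to-midnight bounds avoids the gap that a 23:59:59.999 end time leaves at the very end of the day.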
Step 2: Process and Format Data
Raw analytics data from Genesys Cloud can be nested and complex. For S3 storage, we want flat, consistent JSON lines or a structured JSON array. We will clean the data slightly to ensure it is serializable and remove unnecessary null fields if desired, though keeping the structure intact is often better for downstream analytics tools like Athena.
    # These methods belong to the GenesysAnalyticsExporter class.

    def clean_record(self, record: dict) -> dict:
        """
        Optional: Clean or transform individual records.
        Currently, this returns the record unchanged.
        """
        # Example: Flatten participant info if needed for simpler CSV export later
        # For S3 JSON, we keep the structure as is for flexibility.
        return record

    def process_records(self, raw_records: list) -> list:
        """
        Process raw records from Genesys Cloud.
        """
        cleaned_records = [self.clean_record(r) for r in raw_records]
        return cleaned_records
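If you do want flat JSON Lines output, the transformation could look like the sketch below. It is an illustration under assumptions rather than the tutorial's method: flatten_conversation and to_json_lines are hypothetical helpers, and the field names (conversationId, conversationStart, conversationEnd, participants, sessions, mediaType) are assumed to match the records your query returns, so check them against a real response first.

```python
import json


def flatten_conversation(record):
    """Reduce one nested conversation record to a flat, one-level dict."""
    participants = record.get("participants", [])
    # Collect the distinct media types seen across all participant sessions
    media_types = sorted(
        {
            session.get("mediaType")
            for participant in participants
            for session in participant.get("sessions", [])
            if session.get("mediaType")
        }
    )
    return {
        "conversation_id": record.get("conversationId"),
        "start": record.get("conversationStart"),
        "end": record.get("conversationEnd"),
        "participant_count": len(participants),
        "media_types": ",".join(media_types),
    }


def to_json_lines(records):
    """Serialize records as JSON Lines: one compact JSON object per line."""
    return "\n".join(
        json.dumps(flatten_conversation(r), sort_keys=True) for r in records
    )
```

One object per line lets query engines read the file without loading a single large array, at the cost of dropping the nested detail that is not copied into the flat dict.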
Step 3: Upload to AWS S3
We use boto3 to upload the processed data. We will save the file with a timestamp in the filename to ensure uniqueness and enable partitioning by date in S3.
import boto3
from botocore.exceptions import ClientError
from datetime import datetime, timezone


class S3Uploader:
    def __init__(self):
        self.bucket_name = os.getenv("S3_BUCKET_NAME")
        self.prefix = os.getenv("S3_PREFIX", "exports")
        # Credentials and region are read from the AWS_* environment variables
        self.s3_client = boto3.client('s3')

    def upload_json_data(self, data: list, filename: str) -> bool:
        """
        Uploads a list of dictionaries as a JSON file to S3.
        """
        # Create the S3 key (path)
        # Format: <prefix>/YYYY-MM-DD/<filename>, where the date is the run date (UTC)
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        s3_key = f"{self.prefix}/{date_str}/{filename}"

        # Convert data to a JSON string; default=str covers non-serializable values
        json_payload = json.dumps(data, indent=2, default=str)

        try:
            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=s3_key,
                Body=json_payload,
                ContentType='application/json'
            )
            print(f"Successfully uploaded to s3://{self.bucket_name}/{s3_key}")
            return True
        except ClientError as e:
            print(f"Error uploading to S3: {e}")
            raise
        except Exception as e:
            print(f"Unexpected error during S3 upload: {e}")
            raise
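The key layout decides how easily the exports can be partitioned later. As a hedged variation on the layout above, the sketch below builds a Hive-style dt=YYYY-MM-DD folder from the date the data covers rather than the date the job ran; build_s3_key is a hypothetical helper and the dt= naming is an assumption, not something the tutorial prescribes.

```python
from datetime import date


def build_s3_key(prefix, export_date, filename):
    """Build a key such as 'genexports/daily/dt=2024-03-01/conversations.json'."""
    # Strip stray slashes so the key never contains '//' or a leading '/'
    clean_prefix = prefix.strip("/")
    partition = f"dt={export_date.isoformat()}"
    parts = [part for part in (clean_prefix, partition, filename) if part]
    return "/".join(parts)


print(build_s3_key("genexports/daily/", date(2024, 3, 1), "conversations.json"))
```

Keying on the covered date means a job that is re-run a day late still writes to the folder of the day it exported.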
Complete Working Example
Below is the full, consolidated script. Save this as genesys_s3_exporter.py. Ensure your .env file is in the same directory.
import os
import time
import json
import requests
import boto3
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
from botocore.exceptions import ClientError

# Load environment variables
load_dotenv()

# Genesys Cloud hosts are named by domain, not by AWS region.
# Extend this map if your org is hosted in another region.
GENESYS_DOMAINS = {
    "us-east-1": "mypurecloud.com",
    "us-west-2": "usw2.pure.cloud",
    "eu-west-1": "mypurecloud.ie",
    "eu-central-1": "mypurecloud.de",
    "ap-southeast-2": "mypurecloud.com.au",
}


class GenesysAuth:
    def __init__(self):
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
        self.region = os.getenv("GENESYS_REGION", "us-east-1")
        # Raises KeyError for a region that is not in the map above
        self.domain = GENESYS_DOMAINS[self.region]
        # Tokens are issued by the login host, not the API host
        self.token_url = f"https://login.{self.domain}/oauth/token"
        self.access_token = None
        self.token_expiry = 0

    def get_token(self) -> str:
        # Reuse the cached token until 60 seconds before it expires
        if self.access_token and time.time() < self.token_expiry - 60:
            return self.access_token
        try:
            # Client ID and secret are sent as HTTP Basic credentials
            response = requests.post(
                self.token_url,
                data={"grant_type": "client_credentials"},
                auth=(self.client_id, self.client_secret),
                timeout=10,
            )
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            self.token_expiry = time.time() + token_data["expires_in"]
            return self.access_token
        except requests.exceptions.HTTPError as e:
            raise RuntimeError(f"OAuth Error: {e.response.status_code} - {e.response.text}") from e
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"Network Error during OAuth: {e}") from e


class GenesysAnalyticsExporter:
    PAGE_SIZE = 100  # Page size (the endpoint caps this at 100)

    def __init__(self, auth: GenesysAuth):
        self.auth = auth
        self.base_url = f"https://api.{auth.domain}"
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def get_previous_day_dates(self) -> tuple:
        # Midnight today (UTC) is the end of the window; one day earlier is the start
        today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        start_dt = today - timedelta(days=1)
        fmt = "%Y-%m-%dT%H:%M:%S.000Z"
        return start_dt.strftime(fmt), today.strftime(fmt)

    def build_query_payload(self, start_time: str, end_time: str, page_number: int = 1) -> dict:
        return {
            # The date range is a single "start/end" interval string
            "interval": f"{start_time}/{end_time}",
            "order": "asc",
            "orderBy": "conversationStart",
            "paging": {"pageSize": self.PAGE_SIZE, "pageNumber": page_number},
            # Keep conversations that have at least one segment of these media types
            "segmentFilters": [
                {
                    "type": "or",
                    "predicates": [
                        {
                            "type": "dimension",
                            "dimension": "mediaType",
                            "operator": "matches",
                            "value": media_type,
                        }
                        for media_type in ("voice", "chat", "email", "message")
                    ],
                }
            ],
        }

    def fetch_analytics_data(self) -> list:
        start_time, end_time = self.get_previous_day_dates()
        endpoint = f"{self.base_url}/api/v2/analytics/conversations/details/query"
        all_records = []
        page_number = 1
        try:
            while True:
                payload = self.build_query_payload(start_time, end_time, page_number)
                # get_token() returns the cached token and refreshes it when needed
                self.headers["Authorization"] = f"Bearer {self.auth.get_token()}"
                response = requests.post(endpoint, json=payload, headers=self.headers, timeout=30)
                # On 429, wait as instructed and retry the same page
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    time.sleep(retry_after)
                    continue
                response.raise_for_status()
                data = response.json()
                # The "conversations" key is absent when a page has no results
                conversations = data.get("conversations", [])
                all_records.extend(conversations)
                # A short page means there is nothing left to fetch
                if len(conversations) < self.PAGE_SIZE:
                    break
                page_number += 1
                time.sleep(1)
            return all_records
        except requests.exceptions.HTTPError as e:
            raise RuntimeError(f"HTTP Error: {e.response.status_code} - {e.response.text}") from e


class S3Uploader:
    def __init__(self):
        self.bucket_name = os.getenv("S3_BUCKET_NAME")
        self.prefix = os.getenv("S3_PREFIX", "exports")
        # Credentials and region are read from the AWS_* environment variables
        self.s3_client = boto3.client('s3')

    def upload_json_data(self, data: list, filename: str) -> str:
        # Key format: <prefix>/YYYY-MM-DD/<filename>, where the date is the run date (UTC)
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        s3_key = f"{self.prefix}/{date_str}/{filename}"
        json_payload = json.dumps(data, indent=2, default=str)
        try:
            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=s3_key,
                Body=json_payload,
                ContentType='application/json'
            )
            return s3_key
        except ClientError as e:
            raise RuntimeError(f"Error uploading to S3: {e}") from e


def main():
    print("Starting Genesys Cloud Analytics Export Job...")
    try:
        # 1. Initialize Authentication
        auth = GenesysAuth()
        auth.get_token()
        print("Authentication successful.")

        # 2. Initialize Exporter and Fetch Data
        exporter = GenesysAnalyticsExporter(auth)
        records = exporter.fetch_analytics_data()
        if not records:
            print("No records found for the previous day.")
            return
        print(f"Fetched {len(records)} records.")

        # 3. Initialize S3 Uploader and Upload
        uploader = S3Uploader()
        filename = f"conversations_{datetime.now(timezone.utc).strftime('%Y%m%d')}.json"
        s3_key = uploader.upload_json_data(records, filename)
        print(f"Job completed. Data stored at s3://{uploader.bucket_name}/{s3_key}")
    except Exception as e:
        print(f"Job failed: {e}")
        # In a production environment, send this error to a monitoring service (e.g., PagerDuty, Slack)
        # Exit non-zero so schedulers such as cron can detect the failure
        raise SystemExit(1)


if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 401 Unauthorized
Cause: The OAuth token is invalid, expired, or the client credentials are incorrect.
Fix:
- Verify that GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET in your .env file match the Genesys Cloud admin console.
- Confirm that GENESYS_REGION matches the region your org is hosted in; a token issued in one region is rejected in another.
- Check the get_token method logs to see if the initial token request failed.
Error: 403 Forbidden
Cause: The OAuth client lacks the necessary permissions for the specific analytics query.
Fix:
- Navigate to Admin > Integrations > OAuth in Genesys Cloud.
- Select your client and open the Roles tab.
- Ensure the client is assigned a role that includes the analytics:conversationDetail:view permission.
- If using a custom role, ensure the role assigned to the client has permission to view analytics data for the queues/media types queried.
Error: 429 Too Many Requests
Cause: You have exceeded the Genesys Cloud API rate limits.
Fix:
- The provided code includes a basic retry mechanism for 429 errors.
- If you are still hitting limits, increase the time.sleep duration in the loop.
- For larger datasets, consider the asynchronous conversation details jobs endpoint (/api/v2/analytics/conversations/details/jobs) instead of paging through the synchronous query.
- Check the Retry-After header in the response to determine the exact wait time.
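The retry advice above can be sketched as a small wrapper that honors Retry-After when the server sends it and otherwise falls back to exponential backoff with a bounded number of attempts. This is an illustration under assumptions, not the script's own logic: retry_delay and call_with_retries are hypothetical helpers, send is any zero-argument callable returning an object with status_code and headers (for example a lambda wrapping requests.post), and the base and cap values are arbitrary.

```python
import time


def retry_delay(attempt, retry_after=None, base=1.0, cap=60.0):
    """Seconds to wait before the next try; `attempt` is 0-based."""
    if retry_after is not None:
        try:
            # Prefer the wait time the server asked for
            return float(retry_after)
        except ValueError:
            pass  # Not a number of seconds; fall back to backoff
    # Exponential backoff: base, 2*base, 4*base, ... capped at `cap`
    return min(cap, base * (2 ** attempt))


def call_with_retries(send, max_attempts=5, sleep=time.sleep):
    """Call send() until it returns a non-429 response or attempts run out."""
    for attempt in range(max_attempts):
        response = send()
        if response.status_code != 429:
            return response
        if attempt < max_attempts - 1:
            sleep(retry_delay(attempt, response.headers.get("Retry-After")))
    raise RuntimeError(f"Still rate limited after {max_attempts} attempts")
```

Bounding the attempts keeps a persistent rate limit from turning the job into an endless loop, and passing sleep as a parameter makes the wrapper easy to exercise without real delays.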
Error: S3 Access Denied
Cause: The AWS IAM user does not have permission to write to the specified bucket.
Fix:
- Verify the IAM policy attached to the user includes s3:PutObject.
- Ensure the bucket name in S3_BUCKET_NAME is correct and exists.
- Check if the bucket has a block public access setting that might interfere (though this usually affects read access, not write from IAM users).
- Ensure the AWS_DEFAULT_REGION matches the region where the S3 bucket is located.
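When checking the IAM policy, it helps to compare it against a known-minimal one. The sketch below generates a policy document that allows only s3:PutObject under the export prefix; put_only_policy is a hypothetical helper, and the bucket and prefix values are the placeholders from the .env example, so substitute your own.

```python
import json


def put_only_policy(bucket, prefix):
    """Return an IAM policy document allowing s3:PutObject under one prefix."""
    # Object-level actions need an object ARN (bucket/key pattern), not the bucket ARN
    resource = f"arn:aws:s3:::{bucket}/{prefix.strip('/')}/*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": resource,
            }
        ],
    }


print(json.dumps(put_only_policy("your-analytics-bucket", "genexports/daily"), indent=2))
```

A common cause of Access Denied is a Resource that names only the bucket (arn:aws:s3:::bucket) without the /* object pattern.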