Implementing a Daily Analytics Export Job for Genesys Cloud to S3
What You Will Build
- A Python script that queries Genesys Cloud analytics data for a specific date range and exports the results to an Amazon S3 bucket.
- The solution uses the Genesys Cloud Python SDK (PureCloudPlatformClientV2) and the AWS SDK for Python (boto3).
- The tutorial covers Python 3.9+ with type hints, synchronous API calls, and error handling for production environments.
Prerequisites
- OAuth Client: A Genesys Cloud OAuth client with the following scopes:
  - analytics:conversation:read (required for querying conversation details)
  - analytics:report:read (optional, if using report definitions)
- AWS Credentials: An IAM user or role with s3:PutObject permissions on the target bucket.
- SDK Versions:
  - PureCloudPlatformClientV2 (the PyPI name of the Genesys Cloud Python SDK, matching the imports used below)
  - boto3 >= 1.28.0
- Python Runtime: Python 3.9 or later.
- Dependencies:
pip install PureCloudPlatformClientV2 boto3 pandas
Authentication Setup
Genesys Cloud uses OAuth 2.0 for API authentication. The Python SDK handles token management internally when configured with client credentials. For production jobs, avoid hardcoding secrets. Use environment variables or a secrets manager.
The following code initializes the Genesys Cloud API client. It assumes you have set the environment variables GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, and GENESYS_REGION.
import os

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.api_client import ApiClient


def get_genesys_api_client() -> ApiClient:
    """
    Initializes and returns an authenticated Genesys Cloud API client.
    """
    # Load credentials from the environment
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    region = os.getenv("GENESYS_REGION", "us-east-1")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Point the SDK at the regional API host.
    # "us-east-1" maps to the enum member PureCloudRegionHosts.us_east_1.
    region_host = PureCloudPlatformClientV2.PureCloudRegionHosts[region.replace("-", "_")]
    PureCloudPlatformClientV2.configuration.host = region_host.get_api_host()

    # Request an access token with the client credentials grant.
    # The call returns the authenticated ApiClient.
    return ApiClient().get_client_credentials_token(client_id, client_secret)
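The environment checks in the function above can also be collected in one place, so that a misconfigured job reports every missing variable at once. The following is a minimal sketch using only the standard library; `ExportSettings` and `load_settings` are hypothetical names introduced for illustration, and the variable names are the ones this tutorial already uses.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class ExportSettings:
    """Configuration the export job needs (hypothetical helper type)."""
    client_id: str
    client_secret: str
    region: str
    bucket_name: str


def load_settings(env: Mapping[str, str]) -> ExportSettings:
    """Build ExportSettings from a mapping such as os.environ."""
    required = ("GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET", "AWS_S3_BUCKET_NAME")
    missing = [name for name in required if not env.get(name)]
    if missing:
        # Report every missing variable at once instead of failing one at a time.
        raise ValueError("Missing environment variables: " + ", ".join(missing))
    return ExportSettings(
        client_id=env["GENESYS_CLIENT_ID"],
        client_secret=env["GENESYS_CLIENT_SECRET"],
        region=env.get("GENESYS_REGION", "us-east-1"),  # same default as above
        bucket_name=env["AWS_S3_BUCKET_NAME"],
    )
```

Calling `load_settings(os.environ)` at the start of the job would fail fast, before any network request is made. Passing the mapping as a parameter also makes the check easy to unit test with a plain dictionary.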
Implementation
Step 1: Define the Analytics Query
The Genesys Cloud Analytics API uses a query-based model. You must construct an AnalyticsConversationDetailsQuery object, which defines the date range, the metrics to retrieve, and the grouping dimensions.
For a daily export, we typically want a summary of conversations by hour or by skill. This example retrieves conversation details grouped by hour for a specific date.
Required Scope: analytics:conversation:read
from PureCloudPlatformClientV2 import (
    AnalyticsConversationDetailsQuery,
    AnalyticsConversationDetailsQueryDateRange,
    AnalyticsConversationDetailsQueryGroupBy,
    AnalyticsConversationDetailsQueryMetrics
)
from datetime import datetime, timedelta
from typing import List


def build_daily_query(target_date: datetime) -> AnalyticsConversationDetailsQuery:
    """
    Constructs an analytics query for a single day.

    Args:
        target_date: The date for which to retrieve analytics (naive, interpreted as UTC).

    Returns:
        Configured AnalyticsConversationDetailsQuery object.
    """
    # Define the date range: start of day to end of day
    start_dt = target_date.replace(hour=0, minute=0, second=0, microsecond=0)
    end_dt = target_date.replace(hour=23, minute=59, second=59, microsecond=0)

    # isoformat() on a naive datetime has no offset, so "Z" marks the value as UTC
    date_range = AnalyticsConversationDetailsQueryDateRange(
        start=start_dt.isoformat() + "Z",
        end=end_dt.isoformat() + "Z"
    )

    # Define metrics to retrieve
    metrics = AnalyticsConversationDetailsQueryMetrics(
        total_handle_time=True,
        talk_time=True,
        wait_time=True,
        after_call_work_time=True,
        wrap_up_time=True,
        total_conversations=True
    )

    # Define grouping dimensions
    group_by = AnalyticsConversationDetailsQueryGroupBy(
        hour=True
    )

    # Build the query object
    query = AnalyticsConversationDetailsQuery(
        date_range=date_range,
        metrics=metrics,
        group_by=group_by
    )
    return query
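A range that ends at 23:59:59 leaves out the final second of the day. One common alternative is a half-open range that runs from midnight to the following midnight. The sketch below computes such bounds with the standard library only. The helper names are hypothetical, and the `start/end` interval string is an assumption about the format the query accepts, so check it against the API reference before relying on it.

```python
from datetime import date, datetime, timedelta, timezone
from typing import Tuple


def utc_day_bounds(day: date) -> Tuple[datetime, datetime]:
    """Return [start, end) for one UTC calendar day as timezone-aware datetimes."""
    start = datetime(day.year, day.month, day.day, tzinfo=timezone.utc)
    return start, start + timedelta(days=1)


def to_iso_z(moment: datetime) -> str:
    """Format an aware datetime as UTC with millisecond precision and a Z suffix."""
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")


def day_interval(day: date) -> str:
    """Join the day bounds into an ISO-8601 style 'start/end' interval string."""
    start, end = utc_day_bounds(day)
    return f"{to_iso_z(start)}/{to_iso_z(end)}"


print(day_interval(date(2024, 2, 29)))
# prints 2024-02-29T00:00:00.000Z/2024-03-01T00:00:00.000Z
```

Working with timezone-aware datetimes avoids appending "Z" by hand, which would produce an invalid timestamp if the input already carried an offset.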
Step 2: Execute the Query and Handle Pagination
The get_analytics_conversations_details_query method returns a ConversationDetailsQueryResponse, which contains a list of ConversationDetailsQueryResponseGroup objects. For large datasets the API may paginate results; this example reads a single response, which should be enough for one day of hourly groups, and flattens the groups into a list of dictionaries suitable for pandas.
Error Handling: We catch ApiException to handle 4xx and 5xx errors. The handler logs 429 (Too Many Requests) separately so that retry logic can be added around the call, flags 401 and 403 as credential or scope problems, and then re-raises the exception.
import logging

from PureCloudPlatformClientV2 import AnalyticsApi
from PureCloudPlatformClientV2.rest import ApiException

logger = logging.getLogger(__name__)


def fetch_analytics_data(api_client: ApiClient, query: AnalyticsConversationDetailsQuery) -> List[dict]:
    """
    Executes the analytics query and returns flattened data.

    Args:
        api_client: Authenticated Genesys API Client.
        query: The configured analytics query.

    Returns:
        A list of dictionaries representing the analytics data.
    """
    analytics_api = AnalyticsApi(api_client)
    data_rows = []
    try:
        # Execute the query
        # Note: The SDK method is get_analytics_conversations_details_query
        response = analytics_api.get_analytics_conversations_details_query(body=query)

        # Check if response groups exist
        if response.groups:
            for group in response.groups:
                # Each group contains intervals (hours in this case)
                if group.intervals:
                    for interval in group.intervals:
                        row = {}
                        # Extract dimension values; compare with None so that hour 0 is kept
                        if interval.hour is not None:
                            row['hour'] = interval.hour
                        # Extract metrics
                        if interval.metrics:
                            row['total_handle_time'] = interval.metrics.total_handle_time
                            row['talk_time'] = interval.metrics.talk_time
                            row['wait_time'] = interval.metrics.wait_time
                            row['after_call_work_time'] = interval.metrics.after_call_work_time
                            row['wrap_up_time'] = interval.metrics.wrap_up_time
                            row['total_conversations'] = interval.metrics.total_conversations
                        data_rows.append(row)
        logger.info(f"Retrieved {len(data_rows)} data rows.")
    except ApiException as e:
        logger.error(f"API Exception: {e.status} {e.reason}")
        if e.status == 429:
            logger.warning("Rate limited. Implement backoff strategy.")
        elif e.status in (401, 403):
            logger.error("Authentication or Authorization failed. Check OAuth scopes.")
        # Always re-raise so the caller can decide whether to retry or abort
        raise
    return data_rows
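If a day's results do span several pages, the usual pattern is to keep requesting pages until one comes back short. The sketch below shows that loop in isolation. `fetch_page` is a hypothetical callable standing in for whatever SDK call returns one page of rows, and page numbering starting at 1 is an assumption.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


def collect_all_pages(
    fetch_page: Callable[[int, int], List[Row]],
    page_size: int = 100,
    max_pages: int = 1000,
) -> List[Row]:
    """Call fetch_page(page_number, page_size) until a short page signals the end."""
    rows: List[Row] = []
    for page_number in range(1, max_pages + 1):
        page = fetch_page(page_number, page_size)
        rows.extend(page)
        if len(page) < page_size:
            # A page with fewer rows than requested is the last one.
            return rows
    # Guard against an endpoint that never returns a short page.
    raise RuntimeError(f"Stopped after {max_pages} pages; results may be incomplete.")
```

The `max_pages` guard keeps a misbehaving endpoint from looping forever, and raising instead of returning partial rows prevents an incomplete export from being uploaded silently.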
Step 3: Process Results and Convert to CSV
To write to S3, we convert the list of dictionaries into a pandas DataFrame and serialize it to a CSV string. The whole CSV is held in memory, which is fine for a daily summary of this size, and pandas keeps the column formatting consistent.
import io

import pandas as pd


def convert_to_csv(data_rows: List[dict]) -> str:
    """
    Converts a list of dictionaries to a CSV string.

    Args:
        data_rows: List of data dictionaries.

    Returns:
        CSV formatted string.
    """
    if not data_rows:
        raise ValueError("No data to convert to CSV.")

    df = pd.DataFrame(data_rows)

    # Convert to CSV string without index
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    csv_content = csv_buffer.getvalue()
    return csv_content
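For a small hourly summary, the standard library's csv module can produce the same kind of output without pandas, and a fixed column list keeps the header stable even when a row is missing a metric. This is a sketch of that alternative, not the method used in the rest of this tutorial; `rows_to_csv` and `COLUMNS` are hypothetical names.

```python
import csv
import io
from typing import Dict, List, Sequence

# Column order for the export, taken from the metrics queried in Step 1.
COLUMNS = (
    "hour",
    "total_handle_time",
    "talk_time",
    "wait_time",
    "after_call_work_time",
    "wrap_up_time",
    "total_conversations",
)


def rows_to_csv(rows: List[Dict[str, object]], columns: Sequence[str] = COLUMNS) -> str:
    """Serialize rows to CSV with a fixed header; missing values become empty cells."""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer,
        fieldnames=list(columns),
        restval="",             # value written for keys a row does not have
        extrasaction="ignore",  # silently drop keys that are not in columns
        lineterminator="\n",
    )
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()
```

Because the header comes from `columns` and not from the data, two consecutive daily files always share the same layout, which simplifies loading them downstream.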
Step 4: Upload to Amazon S3
We use boto3 to upload the CSV content to S3. The filename will include the date to ensure unique keys for daily exports.
Required AWS Permissions: s3:PutObject
import boto3
from botocore.exceptions import ClientError


def upload_to_s3(csv_content: str, bucket_name: str, date_str: str) -> bool:
    """
    Uploads CSV content to an S3 bucket.

    Args:
        csv_content: The CSV string to upload.
        bucket_name: The target S3 bucket name.
        date_str: The date string for the filename.

    Returns:
        True if upload successful, False otherwise.
    """
    s3_client = boto3.client('s3')

    # Define the S3 key (filename)
    s3_key = f"analytics/conversations/{date_str}_conversation_details.csv"

    try:
        s3_client.put_object(
            Bucket=bucket_name,
            Key=s3_key,
            Body=csv_content.encode('utf-8'),
            ContentType='text/csv'
        )
        logger.info(f"Successfully uploaded to s3://{bucket_name}/{s3_key}")
        return True
    except ClientError as e:
        logger.error(f"Error uploading to S3: {e.response['Error']['Message']}")
        return False
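The upload step is easier to test when the key construction is a separate function and the S3 client is passed in as an argument. The sketch below illustrates that arrangement with hypothetical helper names. It uses the same `put_object` arguments as the function above, and any object that exposes `put_object` will work, so a stub can stand in for `boto3.client('s3')` in tests.

```python
from datetime import date
from typing import Any


def build_s3_key(day: date, prefix: str = "analytics/conversations") -> str:
    """Build the object key for one day's export, e.g. <prefix>/2024-03-10_conversation_details.csv."""
    return f"{prefix.strip('/')}/{day.isoformat()}_conversation_details.csv"


def upload_csv(s3_client: Any, bucket_name: str, key: str, csv_content: str) -> None:
    """Upload CSV text with the given client; exceptions propagate to the caller."""
    s3_client.put_object(
        Bucket=bucket_name,
        Key=key,
        Body=csv_content.encode("utf-8"),
        ContentType="text/csv",
    )


class RecordingClient:
    """Test double that records put_object calls instead of contacting AWS."""

    def __init__(self) -> None:
        self.calls = []

    def put_object(self, **kwargs: Any) -> None:
        self.calls.append(kwargs)
```

Because the key depends only on the date, re-running the job for the same day overwrites the earlier object, which keeps reruns idempotent.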
Complete Working Example
The following script combines all components into a single executable module. It accepts a date argument or defaults to yesterday.
#!/usr/bin/env python3
import io
import logging
import os
import sys
from datetime import datetime, timedelta, timezone
from typing import List, Optional

# Third-party imports
import boto3
import pandas as pd
import PureCloudPlatformClientV2
from botocore.exceptions import ClientError
from PureCloudPlatformClientV2 import (
    AnalyticsApi,
    AnalyticsConversationDetailsQuery,
    AnalyticsConversationDetailsQueryDateRange,
    AnalyticsConversationDetailsQueryGroupBy,
    AnalyticsConversationDetailsQueryMetrics
)
from PureCloudPlatformClientV2.api_client import ApiClient
from PureCloudPlatformClientV2.rest import ApiException

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def get_genesys_api_client() -> ApiClient:
    """Initializes Genesys Cloud API Client."""
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    region = os.getenv("GENESYS_REGION", "us-east-1")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Point the SDK at the regional API host ("us-east-1" -> PureCloudRegionHosts.us_east_1)
    region_host = PureCloudPlatformClientV2.PureCloudRegionHosts[region.replace("-", "_")]
    PureCloudPlatformClientV2.configuration.host = region_host.get_api_host()

    # Client credentials grant; the call returns the authenticated ApiClient
    return ApiClient().get_client_credentials_token(client_id, client_secret)


def build_daily_query(target_date: datetime) -> AnalyticsConversationDetailsQuery:
    """Constructs an analytics query for a single day."""
    start_dt = target_date.replace(hour=0, minute=0, second=0, microsecond=0)
    end_dt = target_date.replace(hour=23, minute=59, second=59, microsecond=0)

    date_range = AnalyticsConversationDetailsQueryDateRange(
        start=start_dt.isoformat() + "Z",
        end=end_dt.isoformat() + "Z"
    )
    metrics = AnalyticsConversationDetailsQueryMetrics(
        total_handle_time=True,
        talk_time=True,
        wait_time=True,
        after_call_work_time=True,
        wrap_up_time=True,
        total_conversations=True
    )
    group_by = AnalyticsConversationDetailsQueryGroupBy(
        hour=True
    )
    query = AnalyticsConversationDetailsQuery(
        date_range=date_range,
        metrics=metrics,
        group_by=group_by
    )
    return query


def fetch_analytics_data(api_client: ApiClient, query: AnalyticsConversationDetailsQuery) -> List[dict]:
    """Executes the analytics query and returns flattened data."""
    analytics_api = AnalyticsApi(api_client)
    data_rows = []
    try:
        response = analytics_api.get_analytics_conversations_details_query(body=query)
        if response.groups:
            for group in response.groups:
                if group.intervals:
                    for interval in group.intervals:
                        row = {}
                        # Compare with None so that hour 0 is kept
                        if interval.hour is not None:
                            row['hour'] = interval.hour
                        if interval.metrics:
                            row['total_handle_time'] = interval.metrics.total_handle_time
                            row['talk_time'] = interval.metrics.talk_time
                            row['wait_time'] = interval.metrics.wait_time
                            row['after_call_work_time'] = interval.metrics.after_call_work_time
                            row['wrap_up_time'] = interval.metrics.wrap_up_time
                            row['total_conversations'] = interval.metrics.total_conversations
                        data_rows.append(row)
        logger.info(f"Retrieved {len(data_rows)} data rows.")
    except ApiException as e:
        logger.error(f"API Exception: {e.status} {e.reason}")
        raise
    return data_rows


def convert_to_csv(data_rows: List[dict]) -> str:
    """Converts a list of dictionaries to a CSV string."""
    if not data_rows:
        raise ValueError("No data to convert to CSV.")
    df = pd.DataFrame(data_rows)
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    return csv_buffer.getvalue()


def upload_to_s3(csv_content: str, bucket_name: str, date_str: str) -> bool:
    """Uploads CSV content to an S3 bucket."""
    s3_client = boto3.client('s3')
    s3_key = f"analytics/conversations/{date_str}_conversation_details.csv"
    try:
        s3_client.put_object(
            Bucket=bucket_name,
            Key=s3_key,
            Body=csv_content.encode('utf-8'),
            ContentType='text/csv'
        )
        logger.info(f"Successfully uploaded to s3://{bucket_name}/{s3_key}")
        return True
    except ClientError as e:
        logger.error(f"Error uploading to S3: {e.response['Error']['Message']}")
        return False


def main(target_date_str: Optional[str] = None) -> None:
    """Main execution function."""
    # Determine target date (defaults to yesterday, UTC)
    if target_date_str:
        target_date = datetime.strptime(target_date_str, "%Y-%m-%d")
    else:
        # Keep the datetime naive, because build_daily_query appends "Z" itself
        target_date = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(days=1)

    date_str = target_date.strftime("%Y-%m-%d")
    bucket_name = os.getenv("AWS_S3_BUCKET_NAME")
    if not bucket_name:
        raise ValueError("AWS_S3_BUCKET_NAME must be set.")

    logger.info(f"Starting analytics export for {date_str}")
    try:
        # 1. Authenticate
        api_client = get_genesys_api_client()

        # 2. Build Query
        query = build_daily_query(target_date)

        # 3. Fetch Data
        data_rows = fetch_analytics_data(api_client, query)
        if not data_rows:
            logger.warning("No data retrieved for this date.")
            return

        # 4. Convert to CSV
        csv_content = convert_to_csv(data_rows)

        # 5. Upload to S3
        success = upload_to_s3(csv_content, bucket_name, date_str)
        if success:
            logger.info("Export job completed successfully.")
        else:
            logger.error("Export job failed during upload.")
            sys.exit(1)
    except Exception as e:
        logger.exception(f"Job failed with error: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    # Allow passing date as command line argument
    date_arg = sys.argv[1] if len(sys.argv) > 1 else None
    main(date_arg)
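The script above reads sys.argv directly, so a malformed date surfaces as a strptime traceback. A small argparse wrapper gives a usage message instead and makes the "default to yesterday" rule explicit. The following is a sketch under the assumption that dates are interpreted in UTC; `parse_target_date` is a hypothetical helper, not part of the script above.

```python
import argparse
from datetime import date, datetime, timedelta, timezone
from typing import List, Optional


def parse_target_date(argv: List[str], today: Optional[date] = None) -> date:
    """Return the day to export: the YYYY-MM-DD argument, or yesterday (UTC) if omitted."""
    parser = argparse.ArgumentParser(description="Export one day of analytics data.")
    parser.add_argument(
        "date",
        nargs="?",                  # the argument is optional
        type=date.fromisoformat,    # a ValueError here becomes an argparse usage error
        help="Day to export as YYYY-MM-DD (default: yesterday, UTC).",
    )
    args = parser.parse_args(argv)
    if args.date is not None:
        return args.date
    # `today` can be injected in tests; otherwise use the current UTC date.
    today = today or datetime.now(timezone.utc).date()
    return today - timedelta(days=1)
```

In the script, this could be called as `parse_target_date(sys.argv[1:])`. An invalid value makes argparse print a usage message and exit with status 2, which a scheduler can distinguish from a failed export.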
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: Invalid OAuth client ID or secret, or expired token.
- Fix: Verify GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET in your environment. Ensure the client is active in Genesys Cloud Admin.
- Code Check: Ensure get_genesys_api_client() has authenticated the client before any API request is made.
Error: 403 Forbidden
- Cause: The OAuth client lacks the required scope analytics:conversation:read.
- Fix: Go to Genesys Cloud Admin > Security > OAuth Clients. Edit your client and add the analytics:conversation:read scope. Save and regenerate credentials if necessary.
Error: 429 Too Many Requests
- Cause: Exceeding Genesys Cloud API rate limits.
- Fix: Implement exponential backoff. The example above logs the warning. In production, wrap the API call in a retry loop with increasing delays.
import time


def fetch_with_retry(api_client, query, max_retries=3):
    for attempt in range(max_retries):
        try:
            return fetch_analytics_data(api_client, query)
        except ApiException as e:
            if e.status == 429:
                wait_time = 2 ** attempt
                logger.warning(f"Rate limited. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                raise
    raise Exception("Max retries exceeded")
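The retry loop above waits for fixed powers of two. When several jobs are rate limited at the same moment, adding random jitter and a delay cap spreads their retries out. The sketch below shows one way to do that without depending on the SDK. `HttpStatusError` is a hypothetical stand-in for an exception that carries an HTTP status (in the export job, that role is played by ApiException), and the set of retryable statuses is an assumption.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Statuses treated as transient in this sketch (an assumption, adjust as needed).
RETRYABLE_STATUSES = frozenset({429, 500, 502, 503, 504})


class HttpStatusError(Exception):
    """Hypothetical stand-in for an SDK exception that exposes an HTTP status."""

    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status


def call_with_backoff(
    operation: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run operation, retrying transient failures with capped exponential backoff and jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 0
    while True:
        attempt += 1
        try:
            return operation()
        except HttpStatusError as exc:
            # Give up on non-transient errors, or once the attempts are used up.
            if exc.status not in RETRYABLE_STATUSES or attempt >= max_attempts:
                raise
            # Full jitter: wait a random time up to the capped exponential delay.
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, ceiling))
```

The `sleep` parameter exists so that tests can pass a recording function instead of actually waiting. On the final attempt the original exception is re-raised, so the caller still sees the status that caused the failure.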
Error: 500 Internal Server Error
- Cause: Temporary Genesys Cloud service issue.
- Fix: Retry the request after a short delay. If persistent, contact Genesys Cloud Support.