Exporting Genesys Cloud Conversation Analytics to S3 with Python

What You Will Build

  • A Python script that queries the Genesys Cloud CX Analytics API for daily conversation metrics and uploads the resulting JSON data to an Amazon S3 bucket.
  • The solution uses the Genesys Cloud Python SDK for authentication and data retrieval, and boto3 for S3 storage operations.
  • This tutorial covers Python 3.9+ using the Genesys Cloud Python SDK (PureCloudPlatformClientV2) and boto3.

Prerequisites

  • OAuth Client: A Genesys Cloud OAuth 2.0 client that uses the Client Credentials grant. Client Credentials clients get their permissions from assigned roles, not scopes. Assign a role that includes:
    • analytics:conversationAggregate:view
  • AWS Credentials: An IAM user or role with s3:PutObject permissions on the target bucket.
  • SDK Versions:
    • PureCloudPlatformClientV2 >= 135.0.0
    • boto3 >= 1.28.0
  • Runtime: Python 3.9 or higher.
  • External Dependencies: Install via pip:
    pip install PureCloudPlatformClientV2 boto3
    

Authentication Setup

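Genesys Cloud uses OAuth 2.0. For server-to-server integrations like this export job, the Client Credentials grant is the standard choice. The Python SDK exchanges the client ID and secret for an access token through ApiClient.get_client_credentials_token. The Client Credentials grant does not issue a refresh token, so a process that outlives the token's lifetime must request a new one. A daily job that runs once and exits only needs the token it fetches at startup.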

Read credentials from environment variables so that secrets are never hardcoded in the script.

import os

# Required Environment Variables
GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
GENESYS_REGION = os.getenv("GENESYS_REGION", "mypurecloud.com")  # Genesys Cloud domain, e.g. mypurecloud.com or mypurecloud.ie

AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
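The SDK needs an API host such as https://api.mypurecloud.com. Genesys Cloud region domains do not match AWS region names, which makes this setting easy to get wrong. The hypothetical helper below sketches one way to accept either form. Its lookup table is a partial list that you should check against Genesys Cloud's published region list before relying on it.

```python
# Partial, hypothetical lookup table: AWS-style region name -> Genesys Cloud domain.
# Verify entries against Genesys Cloud's published region list before relying on them.
GENESYS_DOMAINS = {
    "us-east-1": "mypurecloud.com",
    "us-west-2": "usw2.pure.cloud",
    "ca-central-1": "cac1.pure.cloud",
    "eu-west-1": "mypurecloud.ie",
    "eu-west-2": "euw2.pure.cloud",
    "eu-central-1": "mypurecloud.de",
    "ap-southeast-2": "mypurecloud.com.au",
    "ap-northeast-1": "mypurecloud.jp",
}


def api_host_for(region_or_domain: str) -> str:
    """Return the https API host for an AWS-style region name or a bare Genesys domain."""
    value = region_or_domain.strip().lower()
    # Fall back to treating the input as a domain if it is not a known region name
    domain = GENESYS_DOMAINS.get(value, value)
    if "." not in domain:
        raise ValueError(f"Unknown Genesys Cloud region: {region_or_domain!r}")
    return f"https://api.{domain}"


print(api_host_for("us-east-1"))       # https://api.mypurecloud.com
print(api_host_for("mypurecloud.ie"))  # https://api.mypurecloud.ie
```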

Initialize the Genesys Cloud API client. First point the SDK at your region's API host, then exchange the client credentials for an access token.

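import PureCloudPlatformClientV2

def get_genesys_client() -> PureCloudPlatformClientV2.api_client.ApiClient:
    """
    Authenticates with the Client Credentials grant and returns an ApiClient
    that can be passed to API classes such as PureCloudPlatformClientV2.AnalyticsApi.
    """
    if not GENESYS_CLIENT_ID or not GENESYS_CLIENT_SECRET:
        raise EnvironmentError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Point the SDK at your org's region before requesting a token,
    # e.g. https://api.mypurecloud.com or https://api.mypurecloud.ie
    PureCloudPlatformClientV2.configuration.host = f"https://api.{GENESYS_REGION}"

    # Exchange the client ID and secret for an access token (no refresh token is issued)
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET
    )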
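The standard-library sketch below shows roughly what a Client Credentials token request looks like on the wire. It only builds the request and never sends it. It assumes the token endpoint is https://login.<domain>/oauth/token with HTTP Basic client authentication and a grant_type=client_credentials form body, which is the generic OAuth 2.0 shape. The helper name is hypothetical.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(client_id: str, client_secret: str, domain: str) -> urllib.request.Request:
    """Build (but do not send) an OAuth 2.0 Client Credentials token request."""
    # Assumed endpoint layout: the login host mirrors the API domain
    url = f"https://login.{domain}/oauth/token"
    # Client authentication via HTTP Basic: base64("client_id:client_secret")
    credentials = base64.b64encode(f"{client_id}:{client_secret}".encode("utf-8")).decode("ascii")
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Basic {credentials}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )


req = build_token_request("my-client-id", "my-secret", "mypurecloud.com")
print(req.full_url)  # https://login.mypurecloud.com/oauth/token
# Sending it (urllib.request.urlopen(req)) should return JSON with access_token and expires_in.
```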

Implementation

Step 1: Query Conversation Analytics Data

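The core of this job is retrieving aggregated conversation metrics. We will use the POST /api/v2/analytics/conversations/aggregates/query endpoint. It returns metrics aggregated over a time interval and can group and filter them by dimensions such as media type or queue. The similarly named details/query endpoint returns individual conversation records instead.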

Required Permission: analytics:conversationAggregate:view (granted to the OAuth client through a role)

The query body specifies an ISO 8601 interval, a granularity, optional groupBy dimensions, and the metrics to return. For a daily export, a single P1D bucket of aggregated metrics is far smaller than the per-conversation records that the details endpoint would return.

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException
from datetime import datetime, timedelta, timezone

# Example metrics from the conversation aggregates metric set; adjust to your reporting needs
DAILY_METRICS = ["nOffered", "tAnswered", "tAbandon", "tHandle", "tWait", "tTalk", "tAcw"]

def fetch_daily_analytics(client: PureCloudPlatformClientV2.api_client.ApiClient, export_date: datetime) -> dict:
    """
    Fetches aggregated conversation metrics for one UTC day.
    
    Args:
        client: An authenticated PureCloudPlatformClientV2 ApiClient.
        export_date: Any datetime on the UTC day to export.
        
    Returns:
        A dictionary containing the aggregates response data.
    """
    # Define the time range: midnight UTC to midnight UTC of the next day
    start_time = export_date.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc)
    end_time = start_time + timedelta(days=1)

    # ISO 8601 interval in "start/end" form
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    interval = f"{start_time.strftime(fmt)}/{end_time.strftime(fmt)}"

    # Define the query body
    query_body = {
        "interval": interval,
        "granularity": "P1D",      # one bucket covering the whole day
        "groupBy": ["mediaType"],  # one result per media type (voice, chat, email, ...)
        "metrics": DAILY_METRICS
    }

    try:
        analytics_api = PureCloudPlatformClientV2.AnalyticsApi(client)
        # API Call: POST /api/v2/analytics/conversations/aggregates/query
        response = analytics_api.post_analytics_conversations_aggregates_query(query_body)

        # The SDK returns a model object; to_dict() converts it for serialization
        return response.to_dict()

    except ApiException as e:
        print(f"Exception when calling AnalyticsApi->post_analytics_conversations_aggregates_query: {e}")
        raise
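Analytics queries express the time range as an ISO 8601 interval string of the form start/end. You can test that boundary handling on its own by moving it into a small helper. The hypothetical function below builds the string for one UTC calendar day and assumes the input date is meant as a UTC day.

```python
from datetime import date, datetime, time, timedelta, timezone


def utc_day_interval(day: date) -> str:
    """Return an ISO 8601 interval covering one UTC calendar day."""
    start = datetime.combine(day, time.min, tzinfo=timezone.utc)
    end = start + timedelta(days=1)
    # Millisecond precision with a literal Z suffix for UTC
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    return f"{start.strftime(fmt)}/{end.strftime(fmt)}"


print(utc_day_interval(date(2024, 2, 28)))
# 2024-02-28T00:00:00.000Z/2024-02-29T00:00:00.000Z
```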

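Expected Response Structure:
The response contains a results array with one entry per group (here, one per media type). Each entry's data array holds one item per granularity bucket, and each metric carries a stats object. Count metrics (n*) report a count. Time metrics (t*) report values in milliseconds, with count and sum among their statistics. The example below is trimmed and its values are illustrative. The SDK's to_dict() returns Python attribute names rather than the camelCase JSON field names; for the fields used in this guide, the two are identical.

{
  "results": [
    {
      "group": {
        "mediaType": "voice"
      },
      "data": [
        {
          "interval": "2024-05-01T00:00:00.000Z/2024-05-02T00:00:00.000Z",
          "metrics": [
            {
              "metric": "nOffered",
              "stats": {
                "count": 1250
              }
            },
            {
              "metric": "tAbandon",
              "stats": {
                "count": 12,
                "sum": 540000
              }
            },
            {
              "metric": "tHandle",
              "stats": {
                "count": 1180,
                "sum": 45000000
              }
            }
          ]
        }
      ]
    }
  ]
}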
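Time-based metrics such as tHandle report a total (sum, in milliseconds) and the number of observations (count), so you can derive an average by dividing the two. Below is a minimal sketch, assuming each metric carries a stats object with sum and count fields as in the conversation aggregates response. The helper name is hypothetical.

```python
from typing import Optional


def average_seconds(stats: dict) -> Optional[float]:
    """Average of a time-based aggregate metric in seconds, from its sum (ms) and count."""
    total_ms = stats.get("sum")
    count = stats.get("count")
    if not count or total_ms is None:
        return None  # no observations in this bucket
    return total_ms / count / 1000.0


# Illustrative values: 45,000,000 ms of handle time across 1,200 handled conversations
print(average_seconds({"sum": 45_000_000, "count": 1200}))  # 37.5
```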

Step 2: Handle Pagination and Errors

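The conversation aggregates endpoint returns every group for the requested interval in a single response, so a daily summary does not need pagination. The details endpoint (POST /api/v2/analytics/conversations/details/query) works differently. It returns individual conversation records and pages through them with a paging object that contains pageSize and pageNumber.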
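If you later need per-conversation records from the details endpoint, you have to walk through the result pages. The sketch below assumes three things: the request body's paging object takes pageSize and a 1-based pageNumber, each response lists records under conversations, and a short page marks the end. fetch_page is a hypothetical callable that stands in for the SDK call, so the loop can be tested offline.

```python
from typing import Callable, Iterator


def iter_conversations(
    fetch_page: Callable[[dict], dict], base_body: dict, page_size: int = 100
) -> Iterator[dict]:
    """Yield conversation records page by page until a short or empty page is returned."""
    page_number = 1
    while True:
        # Copy the base body so the caller's dict is never mutated
        body = {**base_body, "paging": {"pageSize": page_size, "pageNumber": page_number}}
        conversations = fetch_page(body).get("conversations") or []
        yield from conversations
        if len(conversations) < page_size:
            return  # last page reached
        page_number += 1


# Offline usage with a fake fetcher that serves 250 records
records = [{"conversationId": str(i)} for i in range(250)]


def fake_fetch(body: dict) -> dict:
    paging = body["paging"]
    start = (paging["pageNumber"] - 1) * paging["pageSize"]
    return {"conversations": records[start:start + paging["pageSize"]]}


interval = "2024-05-01T00:00:00.000Z/2024-05-02T00:00:00.000Z"
print(sum(1 for _ in iter_conversations(fake_fetch, {"interval": interval})))  # 250
```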

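Genesys Cloud also enforces API rate limits and responds with 429 Too Many Requests when a client exceeds them. A production job should retry these responses with exponential backoff and use the server's Retry-After value when one is provided.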

import time
import random

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

def query_aggregates_with_retry(client: PureCloudPlatformClientV2.api_client.ApiClient, query_body: dict,
                                max_retries: int = 5, base_delay: float = 1.0) -> dict:
    """
    Runs a conversation aggregates query, retrying 429 responses with exponential backoff.
    """
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(client)

    for attempt in range(max_retries + 1):
        try:
            response = analytics_api.post_analytics_conversations_aggregates_query(query_body)
            return response.to_dict()

        except ApiException as e:
            # Only 429 is retried; other errors (400, 401, 403, ...) and the final attempt are raised
            if e.status != 429 or attempt == max_retries:
                raise

            # Prefer the server's Retry-After hint (in seconds) when it is present
            retry_after = (e.headers or {}).get("Retry-After")
            try:
                delay = float(retry_after)
            except (TypeError, ValueError):
                # Exponential backoff with jitter: roughly 1s, 2s, 4s, 8s, ...
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)

            print(f"Rate limited (429). Retrying in {delay:.2f} seconds...")
            time.sleep(delay)
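You can move the delay calculation into a small pure function and test the retry policy without calling the API. This hypothetical variant also caps the delay and honors a numeric Retry-After value when the server sends one. The 60-second cap is an arbitrary assumption, and the optional jitter argument exists only to make the output deterministic in tests.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    retry_after: Optional[str] = None,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: Optional[float] = None,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based) after a 429."""
    # A numeric Retry-After header (seconds) takes precedence when present
    if retry_after is not None:
        try:
            return min(max(float(retry_after), 0.0), max_delay)
        except ValueError:
            pass  # e.g. an HTTP-date value; fall back to exponential backoff
    if jitter is None:
        jitter = random.uniform(0, 1)
    # Exponential growth (1s, 2s, 4s, ...) plus jitter, capped at max_delay
    return min(base_delay * (2 ** attempt) + jitter, max_delay)


print([backoff_delay(n, jitter=0.0) for n in range(8)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```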

Step 3: Process and Serialize Results

The aggregates response nests statistics under groups, granularity buckets, and metrics. For S3 storage, we flatten it into one record per group, keyed by metric name, and serialize the result as JSON.

from datetime import datetime, timezone

def process_analytics_data(raw_response: dict) -> dict:
    """
    Flattens an aggregates response (as returned by to_dict()) for storage.
    """
    # Each entry in "results" is one group (here, one media type)
    results = raw_response.get("results") or []
    if not results:
        return {"status": "empty", "message": "No data found for the requested period."}

    groups = []
    for result in results:
        flat_metrics = {}
        # With P1D granularity over a single day there is one bucket per group
        for bucket in result.get("data") or []:
            for metric in bucket.get("metrics") or []:
                stats = metric.get("stats") or {}
                # Keep only populated statistics (e.g. count, sum); to_dict() reports unset fields as None
                flat_metrics[metric["metric"]] = {k: v for k, v in stats.items() if v is not None}
        groups.append({"group": result.get("group") or {}, "metrics": flat_metrics})

    return {
        "export_timestamp": datetime.now(timezone.utc).isoformat(),
        "groups": groups
    }
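process_analytics_data emits only strings and numbers. If you also archive a raw to_dict() result, though, SDK models can contain datetime objects (for example, timestamps on per-conversation records), and json.dumps rejects those by default. Here is a minimal sketch of a default hook that writes them as ISO 8601 strings. The helper name is hypothetical.

```python
import json
from datetime import date, datetime, timezone


def json_default(value):
    """json.dumps fallback: render dates and datetimes as ISO 8601 strings."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    # Any other unsupported type is still an error, as json.dumps would normally report
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


record = {"conversation_start": datetime(2024, 5, 1, 9, 30, tzinfo=timezone.utc)}
print(json.dumps(record, default=json_default))
# {"conversation_start": "2024-05-01T09:30:00+00:00"}
```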

Step 4: Upload to Amazon S3

We use boto3 to upload the processed JSON. We will structure the S3 key to include the date for easy retrieval and lifecycle management.

Required AWS Permission: s3:PutObject

import json

import boto3
from botocore.exceptions import BotoCoreError, ClientError

def upload_to_s3(data: dict, bucket: str, key: str) -> bool:
    """
    Uploads JSON data to an S3 bucket.
    
    Args:
        data: The dictionary to serialize and upload.
        bucket: The S3 bucket name.
        key: The S3 object key (path).
        
    Returns:
        True if successful, False otherwise.
    """
    # boto3 resolves credentials on its own: AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY
    # environment variables, the shared credentials file, or an attached IAM role
    s3_client = boto3.client('s3', region_name=AWS_REGION)
    
    try:
        # Serialize to JSON; put_object accepts bytes directly, no file-like wrapper needed
        json_bytes = json.dumps(data, indent=2).encode('utf-8')
        
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json_bytes,
            ContentType='application/json',
            ServerSideEncryption='AES256'  # SSE-S3 encryption at rest
        )
        
        print(f"Successfully uploaded to s3://{bucket}/{key}")
        return True
        
    except (BotoCoreError, ClientError) as e:
        print(f"Failed to upload to S3: {e}")
        return False
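The key layout is a design choice. The hypothetical helper below produces the analytics/daily/YYYY-MM-DD.json keys used in this guide. It can optionally produce a Hive-style year=/month=/day= prefix layout instead, which query engines such as Amazon Athena can treat as partitions. The metrics.json filename in that layout is an assumption, and the second layout is an alternative rather than a requirement.

```python
from datetime import date


def daily_export_key(day: date, prefix: str = "analytics/daily", hive_style: bool = False) -> str:
    """Build an S3 object key for one day's export."""
    prefix = prefix.strip("/")
    if hive_style:
        # Partition-friendly layout: prefix/year=YYYY/month=MM/day=DD/metrics.json
        return f"{prefix}/year={day:%Y}/month={day:%m}/day={day:%d}/metrics.json"
    # Flat layout used by the export script: prefix/YYYY-MM-DD.json
    return f"{prefix}/{day:%Y-%m-%d}.json"


print(daily_export_key(date(2024, 5, 1)))
# analytics/daily/2024-05-01.json
print(daily_export_key(date(2024, 5, 1), hive_style=True))
# analytics/daily/year=2024/month=05/day=01/metrics.json
```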

Complete Working Example

This script combines all steps into a single executable module. It accepts a date from command line arguments or defaults to yesterday.

#!/usr/bin/env python3
"""
Genesys Cloud Daily Analytics Export to S3
------------------------------------------
Exports conversation aggregate metrics for a given UTC date to Amazon S3.
"""

import os
import sys
import json
import time
import random
from datetime import datetime, timedelta, timezone

import boto3
from botocore.exceptions import BotoCoreError, ClientError
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

# --- Configuration ---
GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
GENESYS_REGION = os.getenv("GENESYS_REGION", "mypurecloud.com")  # Genesys Cloud domain

AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")

# Example conversation aggregate metrics; adjust to your reporting needs
DAILY_METRICS = ["nOffered", "tAnswered", "tAbandon", "tHandle", "tWait", "tTalk", "tAcw"]

# --- Helper Functions ---

def get_genesys_client() -> PureCloudPlatformClientV2.api_client.ApiClient:
    if not GENESYS_CLIENT_ID or not GENESYS_CLIENT_SECRET:
        raise EnvironmentError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Point the SDK at your org's region, then request a Client Credentials token
    PureCloudPlatformClientV2.configuration.host = f"https://api.{GENESYS_REGION}"
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET
    )

def query_aggregates_with_retry(client, query_body: dict, max_retries: int = 5, base_delay: float = 1.0) -> dict:
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(client)

    for attempt in range(max_retries + 1):
        try:
            response = analytics_api.post_analytics_conversations_aggregates_query(query_body)
            return response.to_dict()
        except ApiException as e:
            # Retry only 429; re-raise other errors and the final failed attempt
            if e.status != 429 or attempt == max_retries:
                raise
            retry_after = (e.headers or {}).get("Retry-After")
            try:
                delay = float(retry_after)
            except (TypeError, ValueError):
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited (429). Retrying in {delay:.2f} seconds...")
            time.sleep(delay)

def process_analytics_data(raw_response: dict) -> dict:
    results = raw_response.get("results") or []
    if not results:
        return {"status": "empty", "message": "No data found."}

    groups = []
    for result in results:
        flat_metrics = {}
        for bucket in result.get("data") or []:
            for metric in bucket.get("metrics") or []:
                stats = metric.get("stats") or {}
                flat_metrics[metric["metric"]] = {k: v for k, v in stats.items() if v is not None}
        groups.append({"group": result.get("group") or {}, "metrics": flat_metrics})

    return {
        "export_timestamp": datetime.now(timezone.utc).isoformat(),
        "groups": groups
    }

def upload_to_s3(data: dict, bucket: str, key: str) -> bool:
    # Credentials come from boto3's default chain (env vars, shared config, or IAM role)
    s3_client = boto3.client('s3', region_name=AWS_REGION)

    try:
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps(data, indent=2).encode('utf-8'),
            ContentType='application/json',
            ServerSideEncryption='AES256'
        )
        return True
    except (BotoCoreError, ClientError) as e:
        print(f"S3 Upload Error: {e}")
        return False

def main():
    if not S3_BUCKET_NAME:
        print("S3_BUCKET_NAME must be set.")
        sys.exit(1)

    # Determine export date (default: yesterday, UTC)
    if len(sys.argv) > 1:
        try:
            export_date = datetime.strptime(sys.argv[1], "%Y-%m-%d").replace(tzinfo=timezone.utc)
        except ValueError:
            print("Invalid date format. Use YYYY-MM-DD.")
            sys.exit(2)
    else:
        export_date = datetime.now(timezone.utc) - timedelta(days=1)

    print(f"Exporting analytics for date: {export_date:%Y-%m-%d}")

    try:
        # 1. Initialize client (authentication errors are reported below)
        client = get_genesys_client()

        # 2. Build query: one P1D bucket for the UTC day, split by media type
        start_time = export_date.replace(hour=0, minute=0, second=0, microsecond=0)
        end_time = start_time + timedelta(days=1)
        fmt = "%Y-%m-%dT%H:%M:%S.000Z"

        query_body = {
            "interval": f"{start_time.strftime(fmt)}/{end_time.strftime(fmt)}",
            "granularity": "P1D",
            "groupBy": ["mediaType"],
            "metrics": DAILY_METRICS
        }

        # 3. Fetch data
        print("Fetching data from Genesys Cloud...")
        raw_data = query_aggregates_with_retry(client, query_body)
        print(f"Retrieved {len(raw_data.get('results') or [])} group(s).")

        # 4. Process data
        processed_data = process_analytics_data(raw_data)

        # 5. Upload to S3
        # Key format: analytics/daily/YYYY-MM-DD.json
        s3_key = f"analytics/daily/{export_date:%Y-%m-%d}.json"

        if upload_to_s3(processed_data, S3_BUCKET_NAME, s3_key):
            print("Export job completed successfully.")
        else:
            print("Export job failed during S3 upload.")
            sys.exit(1)

    except Exception as e:
        print(f"Fatal error during export: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
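The script exports one date per run. To backfill a range, run it once per day, one day at a time. The minimal sketch below generates the dates and prints the corresponding commands without running them. It assumes the script is saved as export_analytics.py, which is a hypothetical filename.

```python
from datetime import date, timedelta
from typing import List


def backfill_dates(start: date, end: date) -> List[date]:
    """Every calendar date from start to end, inclusive."""
    if end < start:
        raise ValueError("end must not be before start")
    return [start + timedelta(days=n) for n in range((end - start).days + 1)]


for day in backfill_dates(date(2024, 2, 27), date(2024, 3, 1)):
    # Run these one at a time (e.g. via subprocess.run) to stay within API rate limits
    print(f"python3 export_analytics.py {day:%Y-%m-%d}")
```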

Common Errors & Debugging

Error: 401 Unauthorized

Cause: Invalid Client ID, Client Secret, or expired token.
Fix: Verify your environment variables and confirm that the OAuth client is active in Genesys Cloud. Also check that GENESYS_REGION matches the region where your organization is hosted, because the client credentials are only valid in the region where the OAuth client was created.

Error: 403 Forbidden

Cause: The OAuth client's roles lack the required permission, or the IAM user or role lacks S3 permissions.
Fix:

  1. In Genesys Cloud Admin, open the OAuth client's settings and, on its Roles tab, confirm that it has a role granting analytics:conversationAggregate:view.
  2. In AWS IAM, ensure the user/role has a policy allowing s3:PutObject on the specific bucket.

Error: 429 Too Many Requests

Cause: You exceeded the Genesys Cloud API rate limit.
Fix: The provided code retries with exponential backoff. If you still hit the limit, spread requests out (for example, run backfills one day at a time) or run the job less often. If your use case genuinely needs more throughput, ask Genesys Cloud Support whether the relevant limit can be raised.

Error: S3 ClientError: Access Denied

Cause: AWS credentials are invalid or the bucket policy denies access.
Fix: Check your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. Ensure the region matches the bucket’s region.

Official References