Export Genesys Cloud Analytics to S3 Using Python and Boto3

What You Will Build

  • A Python script that queries Genesys Cloud CX for the previous day's conversation details, pages through the results, and writes them to an Amazon S3 bucket as JSON Lines.
  • This implementation uses the Genesys Cloud POST /api/v2/analytics/conversations/details/query endpoint and the boto3 SDK for S3 operations.
  • The tutorial covers Python 3.9+ with requests for HTTP handling and boto3 for cloud storage.

Prerequisites

  • OAuth Client Type: Confidential Client (Client Credentials Grant).
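  • Required Permissions: a role assigned to the OAuth client must include analytics:conversationDetail:view. Client Credentials grants are authorized through the client's roles, not through OAuth scopes.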
  • SDK/API Version: Genesys Cloud API v2 (REST). No specific SDK wrapper is required; raw HTTP requests provide better control over streaming pagination.
  • Language/Runtime: Python 3.9 or later.
  • External Dependencies:
    • requests (for HTTP calls)
    • boto3 (for AWS S3 interaction)
    • python-dotenv (for loading credentials from a .env file)

Authentication Setup

Genesys Cloud uses OAuth 2.0 for authentication. For server-to-server integrations like data exports, the Client Credentials Grant is the standard flow. This flow exchanges your client ID and client secret for an access token.
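
In a Client Credentials exchange, the client ID and secret are commonly sent as an HTTP Basic Authorization header rather than as form fields. The sketch below shows how that header value is formed. The function name and the credentials are placeholders for illustration; in practice, passing auth=(client_id, client_secret) to requests produces the same header.

```python
import base64


def basic_auth_header(client_id: str, client_secret: str) -> str:
    """Build the HTTP Basic Authorization header value for an ID/secret pair."""
    raw = f"{client_id}:{client_secret}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


# Placeholder credentials, for illustration only
header_value = basic_auth_header("my-client-id", "my-client-secret")
print(header_value)
```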

You must store your credentials securely. Using environment variables is the recommended practice.

# .env file
GENESYS_CLOUD_REGION=us-east-1
GENESYS_CLOUD_CLIENT_ID=your-client-id
GENESYS_CLOUD_CLIENT_SECRET=your-client-secret
AWS_ACCESS_KEY_ID=your-aws-access-key
AWS_SECRET_ACCESS_KEY=your-aws-secret-key
AWS_REGION=us-east-1
S3_BUCKET_NAME=your-analytics-bucket

The following function handles the token retrieval. It returns the access token and its lifetime in seconds (expires_in). In a production job, check whether the existing token is still valid before requesting a new one to avoid unnecessary API calls.

import os
import requests
import time
from datetime import datetime, timezone

def get_genesys_access_token() -> tuple[str, int]:
    """
    Retrieves an OAuth2 access token from Genesys Cloud.
    
    Returns:
        tuple: (access_token, expires_in_seconds)
    """
    region = os.getenv("GENESYS_CLOUD_REGION", "us-east-1")
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")

    # Map the AWS region to its Genesys Cloud domain (extend for other regions)
    domains = {
        "us-east-1": "mypurecloud.com",
        "us-east-2": "use2.us-gov-pure.cloud",  # FedRAMP region
        "us-west-2": "usw2.pure.cloud",
        "eu-west-1": "mypurecloud.ie",
    }
    if region not in domains:
        raise ValueError(f"Unsupported Genesys Cloud region: {region}")
    login_url = f"https://login.{domains[region]}"

    token_url = f"{login_url}/oauth/token"

    # Client credentials grants take no scope parameter; access comes from
    # the roles assigned to the OAuth client.
    payload = {"grant_type": "client_credentials"}

    # requests builds the HTTP Basic header from the client ID and secret
    # and sets the form-encoded Content-Type for the data dict.
    response = requests.post(
        token_url,
        data=payload,
        auth=(client_id, client_secret),
        timeout=30,
    )
    response.raise_for_status()

    token_data = response.json()
    access_token = token_data["access_token"]
    expires_in = token_data["expires_in"]

    return access_token, expires_in
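
The advice above about reusing a valid token can be sketched as a small cache. This is a minimal illustration, not part of the original script: TokenCache, its margin_seconds parameter, and the injectable clock are hypothetical names. It assumes the fetch function returns (access_token, expires_in_seconds), as get_genesys_access_token does.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches a token and fetches a new one shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, int]],
        margin_seconds: int = 60,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._margin = margin_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return the cached token, refreshing it inside the safety margin."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = now + expires_in
        return self._token
```

A job would create one cache, for example TokenCache(get_genesys_access_token), and call get() before each request instead of holding a single token for the whole run.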

Implementation

Step 1: Define the Analytics Query and Pagination Logic

The analytics/conversations/details/query endpoint does not return all conversations in a single request. Each response contains one page of results, selected by the paging.pageNumber field in the request body, along with a totalHits count. To read the full result set, increment pageNumber and repeat the request until a response contains no conversations.

The query body defines the time window (interval), optional filters, and paging. For a daily export, you typically query the previous calendar day in UTC.

import json
from typing import Generator

def build_query_body(start_time: str, end_time: str) -> dict:
    """
    Constructs the query body for the analytics API.
    
    Args:
        start_time: ISO 8601 start timestamp (e.g., '2023-10-27T00:00:00Z')
        end_time: ISO 8601 end timestamp (e.g., '2023-10-28T00:00:00Z')
    
    Returns:
        dict: The JSON payload for the POST request.
    """
    return {
        # The interval is a single "start/end" ISO 8601 string
        "interval": f"{start_time}/{end_time}",
        "order": "asc",
        "orderBy": "conversationStart",
        # Keep conversations with at least one voice or chat segment;
        # adjust the media types as needed
        "segmentFilters": [
            {
                "type": "or",
                "predicates": [
                    {"type": "dimension", "dimension": "mediaType", "operator": "matches", "value": "voice"},
                    {"type": "dimension", "dimension": "mediaType", "operator": "matches", "value": "chat"}
                ]
            }
        ],
        # Page numbers start at 1; check the API reference for the page size limit
        "paging": {"pageSize": 100, "pageNumber": 1}
    }

The pagination loop handles the core logic. It posts the query with pageNumber set to 1, yields the conversations in the response, and repeats with the next page number until a page comes back empty.

def fetch_conversations_generator(access_token: str, start_time: str, end_time: str) -> Generator[dict, None, None]:
    """
    Generator that yields individual conversation objects from Genesys Cloud.
    
    Args:
        access_token: Valid OAuth2 access token.
        start_time: Start of the query window.
        end_time: End of the query window.
    
    Yields:
        dict: A single conversation object.
    """
    region = os.getenv("GENESYS_CLOUD_REGION", "us-east-1")
    # API hosts follow the api.{domain} pattern for each region
    domains = {
        "us-east-1": "mypurecloud.com",
        "us-east-2": "use2.us-gov-pure.cloud",  # FedRAMP region
        "us-west-2": "usw2.pure.cloud",
        "eu-west-1": "mypurecloud.ie",
    }
    if region not in domains:
        raise ValueError(f"Unsupported Genesys Cloud region: {region}")
    url = f"https://api.{domains[region]}/api/v2/analytics/conversations/details/query"

    query_body = build_query_body(start_time, end_time)
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    page_number = 1

    while True:
        # Select the page to fetch; this overrides any paging already in the body
        query_body["paging"] = {"pageSize": 100, "pageNumber": page_number}
        response = requests.post(url, json=query_body, headers=headers, timeout=60)

        if response.status_code == 429:
            # Rate limited: wait as instructed, then retry the same page
            retry_after = int(response.headers.get("Retry-After", 60))
            print(f"Rate limited. Waiting {retry_after} seconds...")
            time.sleep(retry_after)
            continue

        response.raise_for_status()

        conversations = response.json().get("conversations") or []
        if not conversations:
            break  # An empty page means every result has been read

        yield from conversations

        page_number += 1
        time.sleep(0.5)  # Polite delay to avoid hitting rate limits
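
Pagination logic is easier to verify when it is separated from the HTTP call. The following sketch assumes pages are selected by a 1-based page number and that an empty page marks the end of the results. iterate_pages and fake_fetch_page are hypothetical helpers, and the records are made up for the demonstration.

```python
from typing import Callable, Iterator, List


def iterate_pages(fetch_page: Callable[[int], List[dict]], first_page: int = 1) -> Iterator[dict]:
    """Yield items page by page until fetch_page returns an empty list."""
    page_number = first_page
    while True:
        items = fetch_page(page_number)
        if not items:
            return
        yield from items
        page_number += 1


# Stand-in for the HTTP call: five fake records served two per page
RECORDS = [{"conversationId": f"c{i}"} for i in range(5)]


def fake_fetch_page(page_number: int, page_size: int = 2) -> List[dict]:
    start = (page_number - 1) * page_size
    return RECORDS[start:start + page_size]


ids = [record["conversationId"] for record in iterate_pages(fake_fetch_page)]
print(ids)
```

Swapping fake_fetch_page for a function that posts the query with the given page number would give the same loop against the real API, with retry handling kept inside that function.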

Step 2: Stream Data to Amazon S3

Holding a large export in memory before uploading it can raise a MemoryError. This tutorial accumulates JSON Lines in an io.BytesIO buffer and uploads the result with a single put_object call, which is simple and adequate for a day of data at moderate volume. For very large datasets (millions of records), write to a temporary local file instead and upload it with boto3's upload_file or upload_fileobj, which switch to multipart uploads automatically for large objects.

This step initializes the S3 client and prepares the upload function.

import boto3
import io
import json

def upload_to_s3(bucket_name: str, key: str, data_bytes: bytes) -> bool:
    """
    Uploads bytes to an S3 bucket.
    
    Args:
        bucket_name: The name of the S3 bucket.
        key: The object key (file path) in S3.
        data_bytes: The binary data to upload.
    
    Returns:
        bool: True if successful, False otherwise.
    """
    s3_client = boto3.client('s3', region_name=os.getenv("AWS_REGION"))
    
    try:
        s3_client.put_object(
            Bucket=bucket_name,
            Key=key,
            Body=data_bytes,
            ContentType='application/json'
        )
        print(f"Successfully uploaded to s3://{bucket_name}/{key}")
        return True
    except Exception as e:
        print(f"Error uploading to S3: {e}")
        return False
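
For exports too large to hold in memory, the temporary-file alternative mentioned above might look like the sketch below. write_jsonl_tempfile is a hypothetical helper, and the records are fabricated for the demonstration. It writes each record as one JSON line to an anonymous temporary file and rewinds it so the file can be read from the start.

```python
import json
import tempfile
from typing import IO, Iterable, Tuple


def write_jsonl_tempfile(records: Iterable[dict]) -> Tuple[IO[bytes], int]:
    """Write records as JSON Lines to a temp file; return (file, record_count)."""
    fileobj = tempfile.TemporaryFile()  # Binary mode; removed when closed
    count = 0
    for record in records:
        line = json.dumps(record, default=str) + "\n"
        fileobj.write(line.encode("utf-8"))
        count += 1
    fileobj.seek(0)  # Rewind so the next reader starts at the first byte
    return fileobj, count


# Demo with three made-up records
fileobj, count = write_jsonl_tempfile({"conversationId": f"c{i}"} for i in range(3))
lines = fileobj.read().decode("utf-8").splitlines()
fileobj.close()
print(count, lines[0])
```

In the export job, the rewound file object could be passed to s3_client.upload_fileobj(fileobj, bucket_name, key) in place of the put_object call, so the full payload never has to exist as a single bytes object.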

Step 3: Orchestrate the Export Job

Combine the authentication, fetching, and uploading logic into a single execution flow. This function calculates the date range for the previous day, fetches the data, converts it to JSON Lines format (which is efficient for analytics), and uploads it.

from datetime import datetime, timedelta, timezone

def run_daily_export() -> None:
    """
    Main function to orchestrate the daily analytics export.
    """
    # 1. Authenticate
    print("Authenticating with Genesys Cloud...")
    access_token, expires_in = get_genesys_access_token()
    print(f"Token acquired. Expires in {expires_in} seconds.")

    # 2. Define Date Range (Previous Day)
    now = datetime.now(timezone.utc)
    yesterday = now - timedelta(days=1)
    
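    # The window runs from midnight yesterday to midnight today (UTC);
    # ending at 23:59:59 would miss conversations in the day's final second.
    day_start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
    start_time = day_start.isoformat().replace('+00:00', 'Z')
    end_time = (day_start + timedelta(days=1)).isoformat().replace('+00:00', 'Z')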
    
    print(f"Querying conversations from {start_time} to {end_time}")

    # 3. Prepare S3 Destination
    bucket_name = os.getenv("S3_BUCKET_NAME")
    date_str = yesterday.strftime("%Y-%m-%d")
    s3_key = f"analytics/conversations/{date_str}.jsonl"

    # 4. Fetch and Buffer
    # We use io.BytesIO to hold the data in memory until the upload.
    # For extremely large datasets, consider writing to a temp file instead.
    buffer = io.BytesIO()
    
    record_count = 0
    
    try:
        for conversation in fetch_conversations_generator(access_token, start_time, end_time):
            # Convert each conversation dict to a JSON string
            json_line = json.dumps(conversation, default=str) + "\n"
            buffer.write(json_line.encode('utf-8'))
            record_count += 1
            
            # Log progress every 1000 records
            if record_count % 1000 == 0:
                print(f"Processed {record_count} records...")
                
    except requests.exceptions.HTTPError as e:
        print(f"HTTP Error during fetch: {e}")
        # Log partial data if needed
        if record_count > 0:
            print(f"Partial data retrieved: {record_count} records.")
        return

    print(f"Total records fetched: {record_count}")

    if record_count == 0:
        print("No conversations found for the specified date range.")
        return

    # 5. Upload to S3
    print(f"Uploading {record_count} records to S3...")
    buffer.seek(0) # Reset buffer pointer to start
    success = upload_to_s3(bucket_name, s3_key, buffer.read())
    
    if success:
        print("Export job completed successfully.")
    else:
        print("Export job failed during S3 upload.")
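
The date-range calculation is easier to test when it takes the current time as an argument. The sketch below is one way to do that; previous_day_window is a hypothetical helper, and it assumes the window should span midnight to midnight in UTC.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def previous_day_window(now: datetime) -> Tuple[str, str]:
    """Return (start, end) ISO 8601 UTC strings covering the day before `now`."""
    today_midnight = now.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    start = today_midnight - timedelta(days=1)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return start.strftime(fmt), today_midnight.strftime(fmt)


start, end = previous_day_window(datetime(2023, 10, 28, 9, 30, tzinfo=timezone.utc))
print(start, end)  # 2023-10-27T00:00:00Z 2023-10-28T00:00:00Z
```

Passing a fixed datetime, as in the last lines, makes it possible to check month and year boundaries without waiting for them to occur.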

Complete Working Example

The following script combines all components into a single, runnable file. Save this as export_analytics.py.

import os
import requests
import boto3
import io
import json
import time
from datetime import datetime, timedelta, timezone
from typing import Generator
import sys

# Install dependencies via pip:
# pip install requests boto3 python-dotenv
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

def get_genesys_access_token() -> tuple[str, int]:
    """
    Retrieves an OAuth2 access token from Genesys Cloud.
    """
    region = os.getenv("GENESYS_CLOUD_REGION", "us-east-1")
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLOUD_CLIENT_ID and GENESYS_CLOUD_CLIENT_SECRET must be set.")

    # Map the AWS region to its Genesys Cloud domain (extend for other regions)
    domains = {
        "us-east-1": "mypurecloud.com",
        "us-east-2": "use2.us-gov-pure.cloud",  # FedRAMP region
        "us-west-2": "usw2.pure.cloud",
        "eu-west-1": "mypurecloud.ie",
    }
    if region not in domains:
        raise ValueError(f"Unsupported Genesys Cloud region: {region}")
    login_url = f"https://login.{domains[region]}"

    token_url = f"{login_url}/oauth/token"

    # Client credentials grants take no scope parameter; access comes from
    # the roles assigned to the OAuth client.
    payload = {"grant_type": "client_credentials"}

    try:
        # requests builds the HTTP Basic header from the client ID and secret
        response = requests.post(
            token_url,
            data=payload,
            auth=(client_id, client_secret),
            timeout=30,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Failed to get access token: {e}") from e

    token_data = response.json()
    return token_data["access_token"], token_data["expires_in"]

def build_query_body(start_time: str, end_time: str) -> dict:
    """
    Constructs the query body for the analytics API.
    """
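    return {
        # The interval is a single "start/end" ISO 8601 string
        "interval": f"{start_time}/{end_time}",
        "order": "asc",
        "orderBy": "conversationStart",
        # Keep conversations with at least one voice or chat segment
        "segmentFilters": [
            {
                "type": "or",
                "predicates": [
                    {"type": "dimension", "dimension": "mediaType", "operator": "matches", "value": "voice"},
                    {"type": "dimension", "dimension": "mediaType", "operator": "matches", "value": "chat"}
                ]
            }
        ],
        # Page numbers start at 1; the fetch loop advances pageNumber
        "paging": {"pageSize": 100, "pageNumber": 1}
    }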

def fetch_conversations_generator(access_token: str, start_time: str, end_time: str) -> Generator[dict, None, None]:
    """
    Generator that yields individual conversation objects from Genesys Cloud.
    """
    region = os.getenv("GENESYS_CLOUD_REGION", "us-east-1")
    # API hosts follow the api.{domain} pattern for each region
    domains = {
        "us-east-1": "mypurecloud.com",
        "us-east-2": "use2.us-gov-pure.cloud",  # FedRAMP region
        "us-west-2": "usw2.pure.cloud",
        "eu-west-1": "mypurecloud.ie",
    }
    if region not in domains:
        raise ValueError(f"Unsupported Genesys Cloud region: {region}")
    url = f"https://api.{domains[region]}/api/v2/analytics/conversations/details/query"

    query_body = build_query_body(start_time, end_time)
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    page_number = 1

    while True:
        # Select the page to fetch; this overrides any paging already in the body
        query_body["paging"] = {"pageSize": 100, "pageNumber": page_number}
        response = requests.post(url, json=query_body, headers=headers, timeout=60)

        if response.status_code == 429:
            # Rate limited: wait as instructed, then retry the same page
            retry_after = int(response.headers.get("Retry-After", 60))
            print(f"Rate limited. Waiting {retry_after} seconds...")
            time.sleep(retry_after)
            continue

        response.raise_for_status()

        conversations = response.json().get("conversations") or []
        if not conversations:
            break  # An empty page means every result has been read

        yield from conversations

        page_number += 1
        time.sleep(0.5)  # Polite delay

def upload_to_s3(bucket_name: str, key: str, data_bytes: bytes) -> bool:
    """
    Uploads bytes to an S3 bucket.
    """
    s3_client = boto3.client('s3', region_name=os.getenv("AWS_REGION"))
    
    try:
        s3_client.put_object(
            Bucket=bucket_name,
            Key=key,
            Body=data_bytes,
            ContentType='application/json'
        )
        return True
    except Exception as e:
        print(f"Error uploading to S3: {e}")
        return False

def run_daily_export() -> None:
    """
    Main function to orchestrate the daily analytics export.
    """
    print("Starting Genesys Cloud Analytics Export...")
    
    try:
        access_token, expires_in = get_genesys_access_token()
        print(f"Token acquired. Expires in {expires_in} seconds.")
    except Exception as e:
        print(f"Authentication failed: {e}")
        sys.exit(1)

    now = datetime.now(timezone.utc)
    yesterday = now - timedelta(days=1)
    
    # The window runs from midnight yesterday to midnight today (UTC)
    day_start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
    start_time = day_start.isoformat().replace('+00:00', 'Z')
    end_time = (day_start + timedelta(days=1)).isoformat().replace('+00:00', 'Z')
    
    print(f"Querying conversations from {start_time} to {end_time}")

    bucket_name = os.getenv("S3_BUCKET_NAME")
    if not bucket_name:
        raise ValueError("S3_BUCKET_NAME environment variable is not set.")

    date_str = yesterday.strftime("%Y-%m-%d")
    s3_key = f"analytics/conversations/{date_str}.jsonl"

    buffer = io.BytesIO()
    record_count = 0
    
    try:
        for conversation in fetch_conversations_generator(access_token, start_time, end_time):
            json_line = json.dumps(conversation, default=str) + "\n"
            buffer.write(json_line.encode('utf-8'))
            record_count += 1
            
            if record_count % 1000 == 0:
                print(f"Processed {record_count} records...")
                
    except requests.exceptions.HTTPError as e:
        print(f"HTTP Error during fetch: {e}")
        if record_count > 0:
            print(f"Partial data retrieved: {record_count} records.")
        return

    print(f"Total records fetched: {record_count}")

    if record_count == 0:
        print("No conversations found for the specified date range.")
        return

    print(f"Uploading {record_count} records to S3...")
    buffer.seek(0)
    success = upload_to_s3(bucket_name, s3_key, buffer.read())
    
    if success:
        print("Export job completed successfully.")
    else:
        print("Export job failed during S3 upload.")

if __name__ == "__main__":
    run_daily_export()

Common Errors & Debugging

Error: 401 Unauthorized

  • What causes it: The OAuth token is expired, invalid, or missing.
  • How to fix it: Ensure your client ID and secret are correct, and check that the token was retrieved successfully before making API calls. If the script runs for a long time, the token may expire mid-process. The lifetime is reported in expires_in and is configurable per OAuth client, so implement token refresh logic if the job can run longer than that value.
  • Code Fix: Add a check for token expiration in the generator loop if the job runs longer than expires_in.

Error: 403 Forbidden

  • What causes it: The OAuth client lacks the permission the endpoint requires.
  • How to fix it: In Genesys Cloud Admin, open the OAuth client and verify that one of its assigned roles includes the analytics:conversationDetail:view permission.
  • Code Fix: None is needed in the script. A Client Credentials token takes its permissions from the client's roles, so changing the token request does not resolve a 403.

Error: 429 Too Many Requests

  • What causes it: You have exceeded the API rate limit.
  • How to fix it: Wait for the number of seconds given in the Retry-After header before retrying, as the provided code does. If the header is missing, fall back to exponential backoff.
  • Code Fix: The fetch_conversations_generator function already includes this logic. Ensure you do not remove the time.sleep calls.
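
The wait-time decision described above can be sketched as a pure function. This is an illustration rather than the script's own logic: retry_delay and its base, cap, and jitter parameters are hypothetical, and it assumes Retry-After carries a number of seconds.

```python
import random
from typing import Optional


def retry_delay(
    attempt: int,
    retry_after: Optional[str] = None,
    base: float = 1.0,
    cap: float = 60.0,
    jitter: bool = True,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            return max(0.0, float(retry_after))
        except ValueError:
            pass  # Header was not a number of seconds; fall back to backoff
    delay = min(cap, base * (2 ** attempt))
    # Full jitter spreads retries out when many clients back off together
    return random.uniform(0, delay) if jitter else delay


print(retry_delay(0, retry_after="7"))
print([retry_delay(n, jitter=False) for n in range(8)])
```

The server's Retry-After value always wins when present; the doubling schedule, capped at 60 seconds here, only applies when the header is absent or unparseable.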

Error: 500 Internal Server Error

  • What causes it: A transient error on the Genesys Cloud side.
  • How to fix it: Retry the request after a short delay.
  • Code Fix: Wrap the requests.post and requests.get calls in a retry decorator or loop for 5xx errors.
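
A retry loop for 5xx responses might look like the sketch below. send_with_retries is a hypothetical helper, and FakeResponse stands in for requests.Response so the example runs without network access. The sleep function is injectable so the demo does not actually wait.

```python
import time
from typing import Any, Callable


class FakeResponse:
    """Minimal stand-in for requests.Response, used only for the demo."""

    def __init__(self, status_code: int) -> None:
        self.status_code = status_code


def send_with_retries(
    send: Callable[[], Any],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call send() until the status is below 500 or attempts run out."""
    for attempt in range(max_attempts):
        response = send()
        if response.status_code < 500 or attempt == max_attempts - 1:
            return response
        sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ... between tries


# Demo: two transient failures followed by a success, with sleeping recorded
outcomes = iter([FakeResponse(500), FakeResponse(503), FakeResponse(200)])
waits = []
result = send_with_retries(lambda: next(outcomes), sleep=waits.append)
print(result.status_code, waits)
```

With requests, send could be a lambda wrapping the requests.post call. The last response is returned even if it is still a 5xx, so the caller's raise_for_status check continues to apply.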
