Export Genesys Cloud Analytics to S3 with Python and Boto3

What You Will Build

  • A Python script that queries Genesys Cloud conversation details, flattens each record into a CSV row, and uploads the resulting CSV to an Amazon S3 bucket.
  • The script uses the Genesys Cloud Analytics API (POST /api/v2/analytics/conversations/details/query) and the AWS Boto3 SDK.
  • The implementation targets Python 3.8+, with requests for API calls and boto3 for S3 operations.

Prerequisites

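  • Genesys Cloud OAuth Client: A Client Credentials (machine-to-machine) client. Client Credentials clients are authorized through the roles assigned to them rather than through scopes, so assign a role that includes the analytics:conversationDetail:view permission.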
  • AWS Credentials: An IAM user with s3:PutObject permissions on the target bucket.
  • Python Environment: Python 3.8 or higher.
  • Dependencies:
    pip install requests boto3

Authentication Setup

Genesys Cloud uses OAuth 2.0 for API authentication. For server-side jobs, the Client Credentials flow is the standard approach. You must store your Client ID and Client Secret securely, preferably using environment variables or a secrets manager.

The following function retrieves an access token. Token requests go to the region's login host (login.mypurecloud.com for US East), not the API host. The function raises an error if the credentials are missing or rejected, or if the endpoint is unreachable.

import os
import requests

# Hosts for the US East region; other regions use a different domain.
GENESYS_DOMAIN = "api.mypurecloud.com"
GENESYS_LOGIN_DOMAIN = "login.mypurecloud.com"
OAUTH_URL = f"https://{GENESYS_LOGIN_DOMAIN}/oauth/token"

def get_access_token() -> str:
    """
    Retrieves an OAuth 2.0 access token using Client Credentials flow.
    
    Returns:
        str: The access token string.
        
    Raises:
        ValueError: If the credential environment variables are not set.
        requests.exceptions.RequestException: If the token request fails.
    """
    client_id = os.environ.get("GENESYS_CLIENT_ID")
    client_secret = os.environ.get("GENESYS_CLIENT_SECRET")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")

    try:
        # The client ID and secret are sent as HTTP Basic credentials.
        response = requests.post(
            OAUTH_URL,
            data={"grant_type": "client_credentials"},
            auth=(client_id, client_secret),
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["access_token"]
    except requests.exceptions.RequestException as e:
        # Covers HTTP error statuses as well as connection failures and timeouts.
        print(f"Failed to retrieve token: {e}")
        raise

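If your job runs several queries, cache the token and reuse it instead of requesting a new one for each call. The token response includes an expires_in field giving the lifetime in seconds; read it from the response rather than hardcoding a value, because the lifetime depends on how the OAuth client is configured. For a single daily batch job, fetching a new token at the start is sufficient.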
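
One way to reuse a token across queries is a small in-memory cache that refreshes shortly before expiry. The sketch below assumes the token response is a JSON object with access_token and expires_in (in seconds), as is standard for OAuth 2.0. TokenCache, its fetch callable, and margin_seconds are illustrative names, not part of any Genesys SDK; the fetch callable stands in for a function that returns the parsed token response.

```python
import time
from typing import Any, Callable, Dict, Optional


class TokenCache:
    """Caches an access token and refetches it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Dict[str, Any]],
        margin_seconds: int = 60,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch            # hypothetical: returns the token response as a dict
        self._margin = margin_seconds  # refresh this many seconds early
        self._clock = clock            # injectable so the cache can be tested without waiting
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            body = self._fetch()
            self._token = body["access_token"]
            # A missing expires_in is treated as 0, which forces a refetch on the next call.
            lifetime = int(body.get("expires_in", 0))
            self._expires_at = now + max(0, lifetime - self._margin)
        return self._token
```

A job would create one TokenCache at startup and call get() before each request, so a long run picks up a fresh token without tracking expiry by hand.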

Implementation

Step 1: Querying Conversation Details with Pagination

The Analytics API does not return all conversations in a single call, so the query must be paginated. The POST /api/v2/analytics/conversations/details/query endpoint accepts a request body that defines the time interval, optional filters, the sort order, and a paging object (pageSize and pageNumber).

Each response carries one page of results in its conversations array. To read the full result set, repeat the same POST with an incremented pageNumber until a page comes back empty or shorter than pageSize.

import time
from datetime import datetime, timedelta
from typing import List, Dict, Any

# Page size for the details query (100 is the documented maximum at the time of writing).
PAGE_SIZE = 100

def build_query_payload(date_str: str, page_number: int = 1) -> Dict[str, Any]:
    """
    Constructs the JSON payload for one page of the analytics query.
    
    Args:
        date_str: The date to query in YYYY-MM-DD format.
        page_number: The 1-based page to request.
        
    Returns:
        Dict containing the query parameters.
        
    Raises:
        ValueError: If date_str is not a valid YYYY-MM-DD date.
    """
    day_start = datetime.strptime(date_str, "%Y-%m-%d")
    day_end = day_start + timedelta(days=1)
    # The interval is an ISO-8601 "start/end" pair covering one UTC day.
    interval = f"{day_start:%Y-%m-%d}T00:00:00.000Z/{day_end:%Y-%m-%d}T00:00:00.000Z"

    return {
        "interval": interval,
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {"pageSize": PAGE_SIZE, "pageNumber": page_number}
    }

def fetch_all_conversations(token: str, date_str: str) -> List[Dict[str, Any]]:
    """
    Fetches all conversation details for a given date using pagination.
    
    Args:
        token: The OAuth access token.
        date_str: The date to query in YYYY-MM-DD format.
        
    Returns:
        List of conversation detail objects.
    """
    url = f"https://{GENESYS_DOMAIN}/api/v2/analytics/conversations/details/query"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    
    all_conversations: List[Dict[str, Any]] = []
    page_number = 1
    
    print(f"Starting fetch for date: {date_str}")

    while True:
        payload = build_query_payload(date_str, page_number)
        try:
            # Every page is requested with the same POST; only pageNumber changes.
            response = requests.post(url, json=payload, headers=headers, timeout=60)
            if response.status_code == 429:
                print("Rate limited. Waiting 1 second...")
                time.sleep(1)
                continue  # Retry the same page
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            print(f"HTTP Error: {e}")
            raise
        except requests.exceptions.RequestException as e:
            print(f"Network error: {e}")
            raise

        # The conversations key is absent when nothing matches the query.
        page = response.json().get("conversations") or []
        all_conversations.extend(page)
        print(f"Fetched {len(page)} records. Total: {len(all_conversations)}")

        # A short (or empty) page means there is nothing left to read.
        if len(page) < PAGE_SIZE:
            break
        page_number += 1

    return all_conversations

Step 2: Processing and Aggregating Data

The raw response from Genesys Cloud contains nested objects and metadata. You need to flatten this data into a format suitable for CSV export. The conversations list in the response contains individual conversation records.

This step transforms the complex JSON objects into a list of dictionaries, where each dictionary represents a row in the final CSV.

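import csv
import io
from datetime import datetime
from typing import List, Dict, Any

def _duration_seconds(start: Any, end: Any) -> float:
    """Returns the seconds between two ISO-8601 UTC timestamps, or 0.0 if either is missing."""
    if not start or not end:
        return 0.0
    # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11.
    started = datetime.fromisoformat(start.replace("Z", "+00:00"))
    ended = datetime.fromisoformat(end.replace("Z", "+00:00"))
    return (ended - started).total_seconds()

def process_conversations(raw_conversations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Flattens and cleans conversation data for CSV export.
    
    Args:
        raw_conversations: List of conversation objects from the API.
        
    Returns:
        List of flattened dictionaries.
    """
    processed_data = []
    
    for conv in raw_conversations:
        media_type = "Unknown"
        queue_id = "Unknown"
        wrap_up_code = "None"
        metric_names = set()
        
        # Media type, queue, wrap-up code, and metrics are nested under
        # participants -> sessions -> segments / metrics.
        for participant in conv.get("participants", []):
            for session in participant.get("sessions", []):
                if media_type == "Unknown":
                    media_type = session.get("mediaType") or "Unknown"
                for segment in session.get("segments", []):
                    if queue_id == "Unknown":
                        queue_id = segment.get("queueId") or "Unknown"
                    # Keep the last wrap-up code seen in the conversation
                    wrap_up_code = segment.get("wrapUpCode") or wrap_up_code
                for metric in session.get("metrics", []):
                    metric_names.add(metric.get("name"))
        
        start_time = conv.get("conversationStart")
        end_time = conv.get("conversationEnd")  # Absent while a conversation is still in progress
        
        row = {
            "conversationId": conv.get("conversationId"),
            "queueId": queue_id,
            "mediaType": media_type,
            "wrapUpCode": wrap_up_code,
            "durationSeconds": round(_duration_seconds(start_time, end_time), 2),
            # 1 if the conversation emitted the metric at least once, else 0
            "handled": int("tHandle" in metric_names),
            "answered": int("tAnswered" in metric_names),
            "startTime": start_time,
            "endTime": end_time
        }
        processed_data.append(row)
        
    return processed_data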

def generate_csv_bytes(data: List[Dict[str, Any]]) -> bytes:
    """
    Converts a list of dictionaries to CSV bytes.
    
    Args:
        data: List of dictionaries to convert.
        
    Returns:
        Bytes object containing the CSV content.
    """
    if not data:
        return b""
        
    output = io.StringIO()
    fieldnames = list(data[0].keys())
    writer = csv.DictWriter(output, fieldnames=fieldnames)
    
    writer.writeheader()
    writer.writerows(data)
    
    return output.getvalue().encode("utf-8")
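
Taking the header from the first row works as long as every row has the same keys; csv.DictWriter raises ValueError if a later row contains a key that is not in fieldnames. As a hedged alternative, the sketch below builds the header from the union of keys in first-seen order and leaves missing cells empty. The function name rows_to_csv_bytes is illustrative.

```python
import csv
import io
from typing import Any, Dict, List


def rows_to_csv_bytes(rows: List[Dict[str, Any]]) -> bytes:
    """Serializes rows to CSV, tolerating rows with differing keys."""
    if not rows:
        return b""

    # Collect column names in the order they first appear.
    fieldnames: List[str] = []
    for row in rows:
        for key in row:
            if key not in fieldnames:
                fieldnames.append(key)

    buffer = io.StringIO()
    # restval fills in cells for keys a row does not have.
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, restval="")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue().encode("utf-8")


sample = [
    {"conversationId": "c-001", "mediaType": "voice"},
    {"conversationId": "c-002", "mediaType": "chat", "wrapUpCode": "w-9"},
]
print(rows_to_csv_bytes(sample).decode("utf-8"))
```

The sample IDs are made up for illustration. The first data row ends with an empty wrapUpCode cell because only the second row supplies that key.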

Step 3: Uploading to Amazon S3

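The final step uploads the generated CSV bytes to S3. Boto3 resolves AWS credentials through its default chain (environment variables, the shared credentials file, or an attached IAM role), so the code never handles keys directly. The script calls put_object on a boto3.client('s3') client.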

import boto3
from botocore.exceptions import ClientError
from typing import Optional

def upload_to_s3(bucket_name: str, key: str, data: bytes) -> bool:
    """
    Uploads data to an S3 bucket.
    
    Args:
        bucket_name: The name of the S3 bucket.
        key: The S3 object key (path).
        data: The bytes to upload.
        
    Returns:
        True if upload succeeds, False otherwise.
    """
    s3_client = boto3.client('s3')
    
    try:
        s3_client.put_object(
            Bucket=bucket_name,
            Key=key,
            Body=data,
            ContentType="text/csv"
        )
        print(f"Successfully uploaded to s3://{bucket_name}/{key}")
        return True
    except ClientError as e:
        print(f"Failed to upload to S3: {e}")
        return False

Complete Working Example

This is the full, copy-pasteable script. Save this as genesys_s3_export.py.

import os
import sys
import csv
import io
import time
import requests
import boto3
from datetime import datetime, timedelta
from typing import Dict, List, Any
from botocore.exceptions import ClientError

# Configuration (hosts shown are for the US East region; other regions use a different domain)
GENESYS_DOMAIN = "api.mypurecloud.com"
GENESYS_LOGIN_DOMAIN = "login.mypurecloud.com"
OAUTH_URL = f"https://{GENESYS_LOGIN_DOMAIN}/oauth/token"
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "my-analytics-bucket")
# Page size for the details query (100 is the documented maximum at the time of writing)
PAGE_SIZE = 100

def get_access_token() -> str:
    """Retrieves an OAuth 2.0 access token using Client Credentials flow."""
    client_id = os.environ.get("GENESYS_CLIENT_ID")
    client_secret = os.environ.get("GENESYS_CLIENT_SECRET")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")

    try:
        # The client ID and secret are sent as HTTP Basic credentials.
        response = requests.post(
            OAUTH_URL,
            data={"grant_type": "client_credentials"},
            auth=(client_id, client_secret),
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["access_token"]
    except requests.exceptions.RequestException as e:
        print(f"Failed to retrieve token: {e}")
        raise

def build_query_payload(date_str: str, page_number: int = 1) -> Dict[str, Any]:
    """Constructs the JSON payload for one page of the analytics query."""
    day_start = datetime.strptime(date_str, "%Y-%m-%d")
    day_end = day_start + timedelta(days=1)
    # The interval is an ISO-8601 "start/end" pair covering one UTC day.
    interval = f"{day_start:%Y-%m-%d}T00:00:00.000Z/{day_end:%Y-%m-%d}T00:00:00.000Z"

    return {
        "interval": interval,
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {"pageSize": PAGE_SIZE, "pageNumber": page_number}
    }

def fetch_all_conversations(token: str, date_str: str) -> List[Dict[str, Any]]:
    """Fetches all conversation details for a given date using pagination."""
    url = f"https://{GENESYS_DOMAIN}/api/v2/analytics/conversations/details/query"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    
    all_conversations: List[Dict[str, Any]] = []
    page_number = 1
    
    print(f"Starting fetch for date: {date_str}")

    while True:
        payload = build_query_payload(date_str, page_number)
        try:
            # Every page is requested with the same POST; only pageNumber changes.
            response = requests.post(url, json=payload, headers=headers, timeout=60)
            if response.status_code == 429:
                print("Rate limited. Waiting 1 second...")
                time.sleep(1)
                continue  # Retry the same page
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            print(f"HTTP Error: {e}")
            raise
        except requests.exceptions.RequestException as e:
            print(f"Network error: {e}")
            raise

        # The conversations key is absent when nothing matches the query.
        page = response.json().get("conversations") or []
        all_conversations.extend(page)
        print(f"Fetched {len(page)} records. Total: {len(all_conversations)}")

        # A short (or empty) page means there is nothing left to read.
        if len(page) < PAGE_SIZE:
            break
        page_number += 1

    return all_conversations

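def _duration_seconds(start: Any, end: Any) -> float:
    """Returns the seconds between two ISO-8601 UTC timestamps, or 0.0 if either is missing."""
    if not start or not end:
        return 0.0
    # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11.
    started = datetime.fromisoformat(start.replace("Z", "+00:00"))
    ended = datetime.fromisoformat(end.replace("Z", "+00:00"))
    return (ended - started).total_seconds()

def process_conversations(raw_conversations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flattens and cleans conversation data for CSV export."""
    processed_data = []
    
    for conv in raw_conversations:
        media_type = "Unknown"
        queue_id = "Unknown"
        wrap_up_code = "None"
        metric_names = set()
        
        # Media type, queue, wrap-up code, and metrics are nested under
        # participants -> sessions -> segments / metrics.
        for participant in conv.get("participants", []):
            for session in participant.get("sessions", []):
                if media_type == "Unknown":
                    media_type = session.get("mediaType") or "Unknown"
                for segment in session.get("segments", []):
                    if queue_id == "Unknown":
                        queue_id = segment.get("queueId") or "Unknown"
                    wrap_up_code = segment.get("wrapUpCode") or wrap_up_code
                for metric in session.get("metrics", []):
                    metric_names.add(metric.get("name"))
        
        start_time = conv.get("conversationStart")
        end_time = conv.get("conversationEnd")  # Absent while a conversation is still in progress
        
        row = {
            "conversationId": conv.get("conversationId"),
            "queueId": queue_id,
            "mediaType": media_type,
            "wrapUpCode": wrap_up_code,
            "durationSeconds": round(_duration_seconds(start_time, end_time), 2),
            "handled": int("tHandle" in metric_names),
            "answered": int("tAnswered" in metric_names),
            "startTime": start_time,
            "endTime": end_time
        }
        processed_data.append(row)
        
    return processed_data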

def generate_csv_bytes(data: List[Dict[str, Any]]) -> bytes:
    """Converts a list of dictionaries to CSV bytes."""
    if not data:
        return b""
        
    output = io.StringIO()
    fieldnames = list(data[0].keys())
    writer = csv.DictWriter(output, fieldnames=fieldnames)
    
    writer.writeheader()
    writer.writerows(data)
    
    return output.getvalue().encode("utf-8")

def upload_to_s3(bucket_name: str, key: str, data: bytes) -> bool:
    """Uploads data to an S3 bucket."""
    s3_client = boto3.client('s3')
    
    try:
        s3_client.put_object(
            Bucket=bucket_name,
            Key=key,
            Body=data,
            ContentType="text/csv"
        )
        print(f"Successfully uploaded to s3://{bucket_name}/{key}")
        return True
    except ClientError as e:
        print(f"Failed to upload to S3: {e}")
        return False

def main():
    """Main execution function."""
    # Determine date (default to yesterday)
    if len(sys.argv) > 1:
        date_str = sys.argv[1]
    else:
        date_str = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%d")
    
    print(f"Processing analytics for date: {date_str}")
    
    # Step 1: Authenticate
    try:
        token = get_access_token()
    except Exception as e:
        print(f"Authentication failed: {e}")
        sys.exit(1)
    
    # Step 2: Fetch Data
    try:
        conversations = fetch_all_conversations(token, date_str)
    except Exception as e:
        print(f"Data fetch failed: {e}")
        sys.exit(1)
    
    if not conversations:
        print("No conversations found for the specified date.")
        sys.exit(0)
    
    # Step 3: Process Data
    processed_data = process_conversations(conversations)
    csv_bytes = generate_csv_bytes(processed_data)
    
    # Step 4: Upload to S3
    s3_key = f"analytics/conversations/{date_str}.csv"
    success = upload_to_s3(S3_BUCKET_NAME, s3_key, csv_bytes)
    
    if success:
        print("Job completed successfully.")
    else:
        print("Job completed with errors.")
        sys.exit(1)

if __name__ == "__main__":
    main()
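
The default date in main() relies on datetime.utcnow(), which is deprecated as of Python 3.12. A timezone-aware equivalent might look like the sketch below; yesterday_utc is an illustrative helper name, and the optional now argument exists only so the function can be exercised with a fixed clock.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def yesterday_utc(now: Optional[datetime] = None) -> str:
    """Returns yesterday's date in UTC as YYYY-MM-DD."""
    # A naive datetime passed as `now` is interpreted as local time by astimezone().
    current = now or datetime.now(timezone.utc)
    previous_day = current.astimezone(timezone.utc) - timedelta(days=1)
    return previous_day.strftime("%Y-%m-%d")


print(yesterday_utc(datetime(2024, 3, 1, 0, 30, tzinfo=timezone.utc)))  # 2024-02-29
```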

Common Errors & Debugging

Error: 401 Unauthorized

Cause: The OAuth token is invalid, expired, or missing.
Fix: Verify that GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET are set correctly and that the token request targets the login host for your region. Check the token response body for an error description.

Error: 403 Forbidden

Cause: The OAuth client lacks the permission needed to read conversation detail data.
Fix: In Genesys Cloud Admin, open the OAuth client and check its assigned roles. A Client Credentials client has no associated user, so its permissions come from those roles; make sure one of them grants analytics:conversationDetail:view.

Error: 429 Too Many Requests

Cause: You have exceeded the Genesys Cloud API rate limit.
Fix: Implement exponential backoff. The code above includes a basic 1-second sleep on 429 errors. For production, increase the delay and use a jitter strategy.
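
A minimal sketch of exponential backoff with full jitter follows. It is deliberately independent of requests: the RateLimited exception and the request_fn callable are hypothetical stand-ins for whatever your code raises and calls when the API answers 429, and the delay values are illustrative defaults rather than Genesys recommendations.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical signal raised by request_fn when the API returns HTTP 429."""

    def __init__(self, retry_after: Optional[float] = None) -> None:
        super().__init__("rate limited")
        self.retry_after = retry_after  # seconds suggested by the server, if known


def call_with_backoff(
    request_fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Calls request_fn, retrying on RateLimited with exponential backoff and jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(max_attempts):
        try:
            return request_fn()
        except RateLimited as exc:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the error to the caller
            # The ceiling doubles each attempt: 1s, 2s, 4s, ... capped at max_delay.
            ceiling = min(max_delay, base_delay * (2 ** attempt))
            # Full jitter: pick a random delay between 0 and the ceiling.
            delay = random.uniform(0, ceiling)
            if exc.retry_after is not None:
                delay = max(delay, exc.retry_after)
            sleep(delay)

    raise RuntimeError("unreachable")
```

Bounding the number of attempts matters as much as the delay: an unbounded retry loop can hang a batch job indefinitely if the rate limit never clears.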

Error: botocore.exceptions.ClientError: Access Denied

Cause: The AWS IAM user does not have permission to write to the S3 bucket.
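Fix: Attach a custom policy that grants s3:PutObject on the target bucket to the IAM user. The AmazonS3FullAccess managed policy also works but grants far more than this job needs. Ensure the bucket name in S3_BUCKET_NAME is correct and that the bucket exists.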
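
A custom policy limited to s3:PutObject on the export prefix can be quite small. The sketch below builds one as a Python dictionary and prints it as JSON for pasting into IAM; put_only_policy is an illustrative helper, and the bucket name and prefix are the defaults used in this guide, so substitute your own.

```python
import json
from typing import Any, Dict


def put_only_policy(bucket_name: str, prefix: str = "") -> Dict[str, Any]:
    """Builds an IAM policy document allowing only s3:PutObject under a prefix."""
    # Object-level actions apply to object ARNs, hence the trailing wildcard.
    resource = f"arn:aws:s3:::{bucket_name}/{prefix}*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:PutObject"],
                "Resource": [resource],
            }
        ],
    }


policy = put_only_policy("my-analytics-bucket", "analytics/conversations/")
print(json.dumps(policy, indent=2))
```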

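Error: KeyError when reading the query response

Cause: The query matched no conversations, so the results list is absent from the response body, or the response structure is not what the code expects.
Fix: Check for the key (or use dict.get() with a default) before reading the results list. The code above includes this check, so a query that returns no data makes the script exit gracefully instead of raising.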
