Automating Genesys Cloud Analytics Exports to AWS S3 with Python

What You Will Build

  • This tutorial builds a Python script that queries Genesys Cloud conversation analytics and uploads the resulting JSON data to an Amazon S3 bucket.
  • This integration uses the Genesys Cloud REST API for analytics queries and the boto3 SDK for AWS S3 operations.
  • The code is written in Python 3.9+ and utilizes the requests library for HTTP interactions and boto3 for cloud storage.

Prerequisites

Genesys Cloud Configuration

  • OAuth 2.0 Client: You need a Genesys Cloud OAuth 2.0 Client ID and Secret.
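  • Required Permissions: Client Credentials grants are authorized through roles rather than scopes. Assign the OAuth client a role that includes the analytics:conversationDetail:view permission (Analytics > Conversation Detail > View).
  • Environment URL: Identify your Genesys Cloud environment URL (e.g., https://api.mypurecloud.com for us-east-1). Other regions use their own domain, such as mypurecloud.ie or usw2.pure.cloud.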

AWS Configuration

  • AWS Account: An active AWS account with S3 enabled.
  • IAM User: An IAM user with programmatic access and a policy allowing s3:PutObject to the target bucket.
  • Credentials: Access Key ID and Secret Access Key.
  • Bucket: A pre-existing S3 bucket with a designated prefix/path for the exports.

Development Environment

  • Python: Version 3.9 or higher.
  • Dependencies: Install the required libraries using pip.
    pip install requests boto3 python-dotenv
    
  • Environment Variables: Create a .env file in your project root to store secrets securely.
    GENESYS_CLIENT_ID=your_client_id
    GENESYS_CLIENT_SECRET=your_client_secret
    GENESYS_REGION=us-east-1
    AWS_ACCESS_KEY_ID=your_aws_access_key
    AWS_SECRET_ACCESS_KEY=your_aws_secret_key
    AWS_DEFAULT_REGION=us-east-1
    S3_BUCKET_NAME=your-analytics-bucket
    S3_PREFIX=genexports/daily
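
A missing variable in the .env file otherwise surfaces later as a confusing authentication or upload failure. The following is a minimal sketch of a startup check, assuming the variable names listed above; REQUIRED_VARS, missing_settings, and require_settings are hypothetical names introduced here for illustration. The AWS keys are left out on purpose because boto3 can also pick up credentials from other sources.

```python
import os

# Variables the exporter cannot run without (names taken from the .env file above)
REQUIRED_VARS = (
    "GENESYS_CLIENT_ID",
    "GENESYS_CLIENT_SECRET",
    "S3_BUCKET_NAME",
)


def missing_settings(env=os.environ) -> list:
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


def require_settings(env=os.environ) -> None:
    """Raise early with a clear message if any required variable is missing."""
    missing = missing_settings(env)
    if missing:
        raise RuntimeError("Missing required settings: " + ", ".join(missing))
```

Calling require_settings() right after load_dotenv() would stop the job before any network request is made.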
    

Authentication Setup

Genesys Cloud uses OAuth 2.0 Client Credentials flow for server-to-server integrations. We will create a helper function to acquire and cache tokens. While this script runs daily, implementing a simple in-memory cache with a refresh mechanism is best practice to avoid hitting the token endpoint unnecessarily if the script runs multiple queries.

import os
import time
import requests
from dotenv import load_dotenv

load_dotenv()

class GenesysAuth:
    def __init__(self):
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
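        self.region = os.getenv("GENESYS_REGION", "us-east-1")
        # us-east-1 is served from mypurecloud.com; any other value is used as the
        # region's domain as-is (e.g. mypurecloud.ie, usw2.pure.cloud).
        domain = "mypurecloud.com" if self.region == "us-east-1" else self.region
        # Tokens are issued by the login host, not the API host
        self.token_url = f"https://login.{domain}/oauth/token"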
        self.access_token = None
        self.token_expiry = 0

    def get_token(self) -> str:
        # Check if token is still valid (buffer of 60 seconds)
        if self.access_token and time.time() < self.token_expiry - 60:
            return self.access_token

        # Request new token; the client ID and secret are sent as HTTP Basic credentials
        payload = {"grant_type": "client_credentials"}

        try:
            response = requests.post(
                self.token_url,
                data=payload,
                auth=(self.client_id, self.client_secret),
                timeout=10
            )
            response.raise_for_status()
            
            token_data = response.json()
            self.access_token = token_data["access_token"]
            self.token_expiry = time.time() + token_data["expires_in"]
            
            return self.access_token
        except requests.exceptions.HTTPError as e:
            print(f"OAuth Error: {e.response.status_code} - {e.response.text}")
            raise
        except requests.exceptions.RequestException as e:
            print(f"Network Error during OAuth: {e}")
            raise

Implementation

Step 1: Query Genesys Cloud Analytics

We will use the conversation details query endpoint. It accepts a query with a time interval and optional filters, and returns the matching conversation records one page at a time.

Endpoint: POST /api/v2/analytics/conversations/details/query

Required Permission: analytics:conversationDetail:view

The request body defines the query parameters. We will retrieve data for the previous day to ensure the data is finalized.

from datetime import datetime, timedelta
import json

class GenesysAnalyticsExporter:
    PAGE_SIZE = 100  # The details query caps pageSize at 100

    def __init__(self, auth: GenesysAuth):
        self.auth = auth
        # us-east-1 is served from mypurecloud.com; any other value is used as the
        # region's domain as-is (e.g. mypurecloud.ie, usw2.pure.cloud).
        domain = "mypurecloud.com" if auth.region == "us-east-1" else auth.region
        self.base_url = f"https://api.{domain}"
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def get_previous_day_dates(self) -> tuple:
        """
        Returns the start and end timestamps for the previous UTC day in ISO 8601 format.
        """
        today = datetime.utcnow().date()
        previous_day = today - timedelta(days=1)

        # Midnight to midnight, so nothing at the very end of the day is missed
        return f"{previous_day.isoformat()}T00:00:00.000Z", f"{today.isoformat()}T00:00:00.000Z"

    def build_query_payload(self, start_time: str, end_time: str, page_number: int = 1) -> dict:
        """
        Constructs the query payload for the analytics API.
        """
        # SMS and web messaging conversations are reported under the "message" media type
        media_types = ["voice", "chat", "email", "message"]
        return {
            # The interval is a single "start/end" string
            "interval": f"{start_time}/{end_time}",
            "order": "asc",
            "orderBy": "conversationStart",
            "segmentFilters": [
                {
                    "type": "or",
                    "predicates": [
                        {
                            "type": "dimension",
                            "dimension": "mediaType",
                            "operator": "matches",
                            "value": media_type
                        }
                        for media_type in media_types
                    ]
                }
            ],
            "paging": {"pageSize": self.PAGE_SIZE, "pageNumber": page_number}
        }

    def fetch_analytics_data(self) -> list:
        """
        Fetches all pages of analytics data for the previous day.
        """
        start_time, end_time = self.get_previous_day_dates()
        endpoint = f"{self.base_url}/api/v2/analytics/conversations/details/query"

        all_records = []
        page_number = 1

        try:
            while True:
                # Pagination works by re-posting the query with the next page number
                payload = self.build_query_payload(start_time, end_time, page_number)

                # get_token() returns the cached token until it nears expiry
                self.headers["Authorization"] = f"Bearer {self.auth.get_token()}"
                response = requests.post(endpoint, json=payload, headers=self.headers, timeout=30)

                # Handle Rate Limiting (429)
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    print(f"Rate limited. Waiting {retry_after} seconds...")
                    time.sleep(retry_after)
                    continue

                response.raise_for_status()

                data = response.json()

                # Extract records; the key is absent when a page has no results
                conversations = data.get("conversations", [])
                all_records.extend(conversations)

                # A short page means this was the last one
                if len(conversations) < self.PAGE_SIZE:
                    break

                page_number += 1
                # Small delay to be polite to the API
                time.sleep(1)

            print(f"Successfully fetched {len(all_records)} records.")
            return all_records

        except requests.exceptions.HTTPError as e:
            print(f"HTTP Error: {e.response.status_code}")
            print(f"Response: {e.response.text}")
            raise
        except Exception as e:
            print(f"Error fetching analytics: {e}")
            raise

Step 2: Process and Format Data

Raw analytics data from Genesys Cloud can be nested and complex. For S3 storage, we want flat, consistent JSON lines or a structured JSON array. We will clean the data slightly to ensure it is serializable and remove unnecessary null fields if desired, though keeping the structure intact is often better for downstream analytics tools like Athena.

    def clean_record(self, record: dict) -> dict:
        """
        Optional: Clean or transform individual records.
        Currently, this just ensures deep dictionaries are handled if needed.
        """
        # Example: Flatten participant info if needed for simpler CSV export later
        # For S3 JSON, we keep the structure as is for flexibility.
        return record

    def process_records(self, raw_records: list) -> list:
        """
        Process raw records from Genesys Cloud.
        """
        cleaned_records = [self.clean_record(r) for r in raw_records]
        return cleaned_records
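
Query engines such as Athena generally expect one JSON object per line rather than a single pretty-printed array, so the flattening mentioned above can be paired with newline-delimited output. The following is a minimal sketch of that idea, not part of the original script: flatten and to_json_lines are hypothetical helpers, and the field names in the usage example are illustrative only.

```python
import json


def flatten(record: dict, parent: str = "", sep: str = "_") -> dict:
    """Collapse nested dictionaries into one level; lists are kept as-is."""
    flat = {}
    for key, value in record.items():
        name = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


def to_json_lines(records: list) -> str:
    """Serialize records as newline-delimited JSON, one object per line."""
    return "\n".join(json.dumps(flatten(r), default=str) for r in records)


# Illustrative record; real conversation records carry many more fields
sample = {"conversationId": "c1", "queue": {"name": "Support"}}
print(to_json_lines([sample]))
```

The resulting string can be passed as the Body of an S3 upload in place of the JSON array, at the cost of losing the original nesting.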

Step 3: Upload to AWS S3

We use boto3 to upload the processed data. The object key includes the run date, which groups exports by date in S3. Note that the key is unique per day only: a second run on the same day overwrites the earlier object.

import boto3
from botocore.exceptions import ClientError

class S3Uploader:
    def __init__(self):
        self.bucket_name = os.getenv("S3_BUCKET_NAME")
        self.prefix = os.getenv("S3_PREFIX", "exports")
        self.s3_client = boto3.client('s3')

    def upload_json_data(self, data: list, filename: str) -> bool:
        """
        Uploads a list of dictionaries as a JSON file to S3.
        """
        # Create the S3 key (path)
        # Format: exports/daily/YYYY-MM-DD/conversations.json
        date_str = datetime.utcnow().strftime("%Y-%m-%d")
        s3_key = f"{self.prefix}/{date_str}/{filename}"
        
        # Convert data to JSON string
        json_payload = json.dumps(data, indent=2, default=str)
        
        try:
            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=s3_key,
                Body=json_payload,
                ContentType='application/json'
            )
            print(f"Successfully uploaded to s3://{self.bucket_name}/{s3_key}")
            return True
        except ClientError as e:
            print(f"Error uploading to S3: {e}")
            raise
        except Exception as e:
            print(f"Unexpected error during S3 upload: {e}")
            raise
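
If the exports are later queried by date, it can help to name the folder after the day the data covers rather than the day the job ran. The following is a minimal sketch under that assumption; build_partitioned_key and previous_day are hypothetical helpers, and the dt=YYYY-MM-DD folder layout is a common partitioning convention rather than something the original script uses.

```python
from datetime import date, timedelta


def previous_day(today: date) -> date:
    """Return the calendar day before the given date."""
    return today - timedelta(days=1)


def build_partitioned_key(prefix: str, data_date: date, filename: str) -> str:
    """Build a key such as genexports/daily/dt=2024-01-15/conversations.json."""
    clean_prefix = prefix.strip("/")  # Avoid doubled or leading slashes in the key
    return f"{clean_prefix}/dt={data_date.isoformat()}/{filename}"


# Example: key for the data of the day before 16 January 2024
key = build_partitioned_key("genexports/daily", previous_day(date(2024, 1, 16)), "conversations.json")
print(key)
```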

Complete Working Example

Below is the full, consolidated script. Save this as genesys_s3_exporter.py. Ensure your .env file is in the same directory.

import os
import time
import json
import requests
import boto3
from datetime import datetime, timedelta
from dotenv import load_dotenv
from botocore.exceptions import ClientError

# Load environment variables
load_dotenv()

class GenesysAuth:
    def __init__(self):
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
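        self.region = os.getenv("GENESYS_REGION", "us-east-1")
        # us-east-1 is served from mypurecloud.com; any other value is used as the
        # region's domain as-is (e.g. mypurecloud.ie, usw2.pure.cloud).
        domain = "mypurecloud.com" if self.region == "us-east-1" else self.region
        # Tokens are issued by the login host, not the API host
        self.token_url = f"https://login.{domain}/oauth/token"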
        self.access_token = None
        self.token_expiry = 0

    def get_token(self) -> str:
        if self.access_token and time.time() < self.token_expiry - 60:
            return self.access_token

        payload = {"grant_type": "client_credentials"}

        try:
            # The client ID and secret are sent as HTTP Basic credentials
            response = requests.post(
                self.token_url,
                data=payload,
                auth=(self.client_id, self.client_secret),
                timeout=10
            )
            response.raise_for_status()
            
            token_data = response.json()
            self.access_token = token_data["access_token"]
            self.token_expiry = time.time() + token_data["expires_in"]
            
            return self.access_token
        except requests.exceptions.HTTPError as e:
            # "from e" keeps the original traceback attached for debugging
            raise RuntimeError(f"OAuth Error: {e.response.status_code} - {e.response.text}") from e
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"Network Error during OAuth: {e}") from e

class GenesysAnalyticsExporter:
    PAGE_SIZE = 100  # The details query caps pageSize at 100

    def __init__(self, auth: GenesysAuth):
        self.auth = auth
        # us-east-1 is served from mypurecloud.com; any other value is used as the
        # region's domain as-is (e.g. mypurecloud.ie, usw2.pure.cloud).
        domain = "mypurecloud.com" if auth.region == "us-east-1" else auth.region
        self.base_url = f"https://api.{domain}"
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def get_previous_day_dates(self) -> tuple:
        # Midnight to midnight (UTC), so nothing at the very end of the day is missed
        today = datetime.utcnow().date()
        previous_day = today - timedelta(days=1)
        return f"{previous_day.isoformat()}T00:00:00.000Z", f"{today.isoformat()}T00:00:00.000Z"

    def build_query_payload(self, start_time: str, end_time: str, page_number: int = 1) -> dict:
        # SMS and web messaging conversations are reported under the "message" media type
        media_types = ["voice", "chat", "email", "message"]
        return {
            # The interval is a single "start/end" string
            "interval": f"{start_time}/{end_time}",
            "order": "asc",
            "orderBy": "conversationStart",
            "segmentFilters": [
                {
                    "type": "or",
                    "predicates": [
                        {
                            "type": "dimension",
                            "dimension": "mediaType",
                            "operator": "matches",
                            "value": media_type
                        }
                        for media_type in media_types
                    ]
                }
            ],
            "paging": {"pageSize": self.PAGE_SIZE, "pageNumber": page_number}
        }

    def fetch_analytics_data(self) -> list:
        start_time, end_time = self.get_previous_day_dates()
        endpoint = f"{self.base_url}/api/v2/analytics/conversations/details/query"

        all_records = []
        page_number = 1

        try:
            while True:
                # Pagination works by re-posting the query with the next page number
                payload = self.build_query_payload(start_time, end_time, page_number)
                self.headers["Authorization"] = f"Bearer {self.auth.get_token()}"
                response = requests.post(endpoint, json=payload, headers=self.headers, timeout=30)

                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    time.sleep(retry_after)
                    continue

                response.raise_for_status()

                data = response.json()
                # The key is absent when a page has no results
                conversations = data.get("conversations", [])
                all_records.extend(conversations)

                # A short page means this was the last one
                if len(conversations) < self.PAGE_SIZE:
                    break
                page_number += 1
                time.sleep(1)

            return all_records

        except requests.exceptions.HTTPError as e:
            raise Exception(f"HTTP Error: {e.response.status_code} - {e.response.text}")

class S3Uploader:
    def __init__(self):
        self.bucket_name = os.getenv("S3_BUCKET_NAME")
        self.prefix = os.getenv("S3_PREFIX", "exports")
        self.s3_client = boto3.client('s3')

    def upload_json_data(self, data: list, filename: str) -> str:
        date_str = datetime.utcnow().strftime("%Y-%m-%d")
        s3_key = f"{self.prefix}/{date_str}/{filename}"
        json_payload = json.dumps(data, indent=2, default=str)
        
        try:
            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=s3_key,
                Body=json_payload,
                ContentType='application/json'
            )
            return s3_key
        except ClientError as e:
            raise Exception(f"Error uploading to S3: {e}")

def main():
    print("Starting Genesys Cloud Analytics Export Job...")
    
    try:
        # 1. Initialize Authentication
        auth = GenesysAuth()
        token = auth.get_token()
        print("Authentication successful.")

        # 2. Initialize Exporter and Fetch Data
        exporter = GenesysAnalyticsExporter(auth)
        records = exporter.fetch_analytics_data()
        
        if not records:
            print("No records found for the previous day.")
            return

        print(f"Fetched {len(records)} records.")

        # 3. Initialize S3 Uploader and Upload
        uploader = S3Uploader()
        filename = f"conversations_{datetime.utcnow().strftime('%Y%m%d')}.json"
        s3_key = uploader.upload_json_data(records, filename)
        
        print(f"Job completed. Data stored at s3://{uploader.bucket_name}/{s3_key}")

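    except Exception as e:
        print(f"Job failed: {e}")
        # In a production environment, send this error to a monitoring service (e.g., PagerDuty, Slack)
        # Exit with a non-zero status so schedulers such as cron can detect the failure
        raise SystemExit(1)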

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized

Cause: The OAuth token is invalid, expired, or the client credentials are incorrect.

Fix:

  1. Verify that GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET in your .env file match the Genesys Cloud admin console.
  2. Confirm that GENESYS_REGION points at the region where the OAuth client was created; a token issued in one region is rejected by another.
  3. Check the get_token method logs to see if the initial token request failed.

Error: 403 Forbidden

Cause: The OAuth client lacks the necessary permissions for the specific analytics query.

Fix:

  1. Navigate to Admin > Integrations > OAuth in Genesys Cloud.
  2. Select your client and open the Roles tab. Client Credentials clients are authorized by roles, not scopes.
  3. Ensure an assigned role includes the analytics:conversationDetail:view permission.
  4. If using a custom role, ensure it is granted for the divisions that contain the queues and users being queried.

Error: 429 Too Many Requests

Cause: You have exceeded the Genesys Cloud API rate limits.

Fix:

  1. The provided code includes a basic retry mechanism for 429 errors.
  2. If you are still hitting limits, increase the time.sleep duration in the loop.
  3. For larger datasets, consider the asynchronous conversation details jobs endpoint (POST /api/v2/analytics/conversations/details/jobs) instead of paging through the synchronous query.
  4. Check the Retry-After header in the response to determine the exact wait time.
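
The retry loop in the script waits indefinitely if the API keeps answering 429. The following is a minimal sketch of a bounded alternative that honors Retry-After when present and falls back to exponential backoff otherwise. call_with_retry is a hypothetical helper, and the sketch assumes Retry-After carries a number of seconds rather than an HTTP date.

```python
import time


def call_with_retry(send, max_attempts: int = 5, base_delay: float = 1.0, sleep=time.sleep):
    """Call send() until it returns a non-429 response or attempts run out.

    send is any zero-argument callable returning an object with
    .status_code and .headers, such as a requests.Response.
    """
    for attempt in range(1, max_attempts + 1):
        response = send()
        if response.status_code != 429:
            return response
        if attempt == max_attempts:
            break
        # Prefer the server's hint; otherwise double the wait on each attempt
        header = response.headers.get("Retry-After")
        delay = float(header) if header else base_delay * 2 ** (attempt - 1)
        sleep(delay)
    raise RuntimeError(f"Still rate limited after {max_attempts} attempts")
```

Inside the fetch loop this could be used as call_with_retry(lambda: requests.post(endpoint, json=payload, headers=headers, timeout=30)). The sleep parameter exists only so the waiting can be replaced in tests.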

Error: S3 Access Denied

Cause: The AWS IAM user does not have permission to write to the specified bucket.

Fix:

  1. Verify the IAM policy attached to the user includes s3:PutObject.
  2. Ensure the bucket name in S3_BUCKET_NAME is correct and exists.
  3. Check if the bucket has a block public access setting that might interfere (though this usually affects read access, not write from IAM users).
  4. Ensure the AWS_DEFAULT_REGION matches the region where the S3 bucket is located.
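
A frequent cause of Access Denied is a policy whose Resource names the bucket itself rather than the objects inside it. The following is a minimal sketch that prints a policy document limited to s3:PutObject under the export prefix; put_object_policy is a hypothetical helper, and the bucket and prefix values are the placeholders from the .env file above.

```python
import json


def put_object_policy(bucket: str, prefix: str) -> dict:
    """Build an IAM policy document allowing uploads under one prefix only."""
    clean_prefix = prefix.strip("/")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:PutObject"],
                # Object-level actions need an object ARN, hence the trailing /*
                "Resource": f"arn:aws:s3:::{bucket}/{clean_prefix}/*",
            }
        ],
    }


policy = put_object_policy("your-analytics-bucket", "genexports/daily")
print(json.dumps(policy, indent=2))
```

The printed JSON can be compared against the policy attached to the IAM user.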

Official References