Handling Access Token Expiration in Batch Operations

What You Will Build

  • A robust utility module that automatically detects expired access tokens and retrieves fresh credentials before a Genesys Cloud API request fails.
  • A batch processing loop that queries conversation analytics data, handling pagination and mid-stream token rotation without losing state.
  • Python 3.9+ code using the official genesyscloud SDK with custom session management.

Prerequisites

  • OAuth Client Type: Client Credentials Grant (Recommended for server-to-server batch jobs).
  • Required Scopes: analytics:conversation:read, analytics:details:query.
  • SDK Version: genesyscloud Python SDK v3.0.0 or later.
  • Language/Runtime: Python 3.9+.
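  • External Dependencies: pip install genesyscloud requests. Note that the official Genesys Cloud Python SDK is published on PyPI as PureCloudPlatformClientV2; if that is the package you install, adjust the import paths in the code accordingly.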

Authentication Setup

The standard Genesys Cloud SDK (PureCloudPlatformClientV2) handles token caching internally, but its default behavior can be brittle in long-running batch scripts. If a token expires in the window between the SDK’s internal check and the HTTP request, or if the SDK’s retry logic does not suit your network environment, the request fails and an unhandled error ends the job.

To solve this, we will wrap the SDK client in a custom session manager. This manager intercepts the authorization header injection. It checks the token’s expiration time before every request. If the token is expired or close to expiring, it proactively fetches a new one.
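The check-then-refresh idea can be sketched independently of any SDK. The version below is a minimal illustration, not the SDK's own mechanism: TokenCache, fetch_token, and clock are hypothetical names, and fetch_token is assumed to return a (token, lifetime_in_seconds) pair. Injecting the clock makes the expiry logic testable without waiting.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches a bearer token and refreshes it shortly before it expires."""

    def __init__(
        self,
        fetch_token: Callable[[], Tuple[str, float]],
        buffer_seconds: float = 60.0,
        clock: Callable[[], float] = time.time,
    ) -> None:
        # fetch_token is a hypothetical callable returning (token, lifetime_seconds).
        self._fetch_token = fetch_token
        self._buffer = buffer_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return the cached token, fetching a new one inside the buffer window."""
        if self._token is None or self._clock() >= self._expires_at - self._buffer:
            token, lifetime = self._fetch_token()
            self._token = token
            # Store an absolute deadline so later checks are a single comparison.
            self._expires_at = self._clock() + lifetime
        return self._token
```

The buffer exists so that a token is never handed out when it could expire while the request is still in flight.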

The Custom Session Manager

This class extends the standard Configuration object provided by the SDK. It overrides the method that provides the access token to ensure it is always valid.

import time
import logging
from typing import Optional
from genesyscloud.platform.client import PureCloudPlatformClientV2
from genesyscloud.platform.client.configuration import Configuration
from genesyscloud.platform.client.api_exception import ApiException
import requests

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AutoRefreshingConfig(Configuration):
    """
    A Configuration subclass that automatically refreshes the access token 
    if it has expired or is about to expire.
    """
    def __init__(self, client_id: str, client_secret: str, env: str = "mypurecloud.com"):
        super().__init__()
        self.client_id = client_id
        self.client_secret = client_secret
        self.env = env
        # OAuth requests go to the login.<region domain> host, not the bare domain
        self.token_endpoint = f"https://login.{env}/oauth/token"
        
        # Internal cache for the token and its expiry
        self._access_token: Optional[str] = None
        self._token_expiry: float = 0.0
        
    def get_access_token(self) -> str:
        """
        Returns a valid access token. Refreshes if necessary.
        """
        # Check if we have a token and if it is still valid (with a 60-second buffer)
        if self._access_token and (time.time() < (self._token_expiry - 60)):
            return self._access_token

        logger.info("Access token is missing or expired. Refreshing...")
        self._refresh_token()
        return self._access_token

    def _refresh_token(self) -> None:
        """
        Performs the OAuth 2.0 Client Credentials grant to obtain a new token.
        """
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "analytics:conversation:read analytics:details:query"
        }
        
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }

        try:
            response = requests.post(
                self.token_endpoint, 
                data=payload, 
                headers=headers,
                timeout=10
            )
            response.raise_for_status()
            
            token_data = response.json()
            self._access_token = token_data["access_token"]
            
            # Token lifetime is configurable per OAuth client, so prefer the
            # server-reported expires_in; 3600 seconds is only a fallback.
            # Store the absolute expiry time.
            self._token_expiry = time.time() + float(token_data.get("expires_in", 3600))
            
            logger.info("Successfully refreshed access token.")
            
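        except requests.exceptions.HTTPError as e:
            # Read the status from the exception rather than the local `response` variable
            status = e.response.status_code if e.response is not None else None
            if status == 401:
                logger.error("Invalid Client ID or Secret. Check your configuration.")
            else:
                logger.error(f"Failed to refresh token: {e}")
            raise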
        except Exception as e:
            logger.error(f"Unexpected error during token refresh: {e}")
            raise

    def get_oauth_token(self) -> str:
        """
        Required by the SDK Configuration base class. 
        Delegates to our custom get_access_token.
        """
        return self.get_access_token()
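OAuth 2.0 (RFC 6749, section 2.3.1) lets a client authenticate either with form parameters, as in _refresh_token, or with an HTTP Basic Authorization header. Which form a given token endpoint accepts is something to verify against its documentation. If the form-body variant is rejected, a header-based request could be built along these lines. This is a sketch only: basic_auth_headers is a hypothetical helper, and it assumes the credentials contain no characters that need form-encoding.

```python
import base64
from typing import Dict


def basic_auth_headers(client_id: str, client_secret: str) -> Dict[str, str]:
    """Build token-request headers that carry the credentials as HTTP Basic auth."""
    # Basic auth is base64("<client_id>:<client_secret>") prefixed with "Basic ".
    raw = f"{client_id}:{client_secret}".encode("utf-8")
    encoded = base64.b64encode(raw).decode("ascii")
    return {
        "Authorization": f"Basic {encoded}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
```

With these headers, the request body would carry only grant_type=client_credentials.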

Initialization

You initialize the client using this custom configuration instead of the standard one.

def get_genesys_client(client_id: str, client_secret: str) -> PureCloudPlatformClientV2:
    """
    Creates and returns a PureCloudPlatformClientV2 instance with auto-refreshing auth.
    """
    config = AutoRefreshingConfig(
        client_id=client_id,
        client_secret=client_secret,
        env="mypurecloud.com" # Change to your specific environment if needed
    )
    
    return PureCloudPlatformClientV2(config)
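For a scheduled batch job, it is usually safer to read the client ID and secret from the environment than to hardcode them. The helper below is one possible sketch. The variable names GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET are assumptions chosen for illustration, not names the SDK looks for.

```python
import os
from typing import Mapping, Tuple


def load_credentials(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Read the OAuth client ID and secret from environment variables."""
    # The variable names are hypothetical; use whatever your deployment defines.
    try:
        return env["GENESYS_CLIENT_ID"], env["GENESYS_CLIENT_SECRET"]
    except KeyError as missing:
        raise RuntimeError(f"Missing environment variable: {missing.args[0]}") from None
```

The returned pair can be passed straight to get_genesys_client(client_id, client_secret).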

Implementation

Step 1: Define the Batch Query Parameters

We will query conversation analytics data. The endpoint /api/v2/analytics/conversations/details/query is the standard for retrieving detailed conversation records. This endpoint supports pagination via the nextPageToken field in the response.

The request body requires a specific JSON structure. Key fields include view, interval, dateFrom, dateTo, and size.

def build_query_body(date_from: str, date_to: str, page_size: int = 50) -> dict:
    """
    Constructs the JSON body for the analytics query.
    
    Args:
        date_from: ISO 8601 start date (e.g., "2023-01-01T00:00:00Z")
        date_to: ISO 8601 end date (e.g., "2023-01-02T00:00:00Z")
        page_size: Number of records per page (max 1000 for details)
    
    Returns:
        dict: The JSON payload for the API request.
    """
    return {
        "view": "default",
        "interval": "PT1H",
        "dateFrom": date_from,
        "dateTo": date_to,
        "size": page_size,
        "groupBy": ["channel.mediaType"],
        "select": [
            "conversationId",
            "channel.mediaType",
            "direction",
            "durationSeconds",
            "waitSeconds",
            "holdSeconds",
            "talkSeconds",
            "wrapupSeconds",
            "startTimestamp",
            "endTimestamp",
            "participants"
        ]
    }
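The date_from and date_to arguments must be ISO 8601 strings in the form shown in the docstring. A small helper keeps that formatting in one place. The sketch below uses only the standard library; to_iso_z and last_hours are hypothetical names introduced for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def to_iso_z(moment: datetime) -> str:
    """Format a timezone-aware datetime as UTC ISO 8601 with a 'Z' suffix."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def last_hours(hours: int, now: Optional[datetime] = None) -> Tuple[str, str]:
    """Return (date_from, date_to) covering the last `hours` hours."""
    # A single reference time keeps both ends of the range consistent.
    end = now or datetime.now(timezone.utc)
    return to_iso_z(end - timedelta(hours=hours)), to_iso_z(end)
```

For example, build_query_body(*last_hours(24)) would cover the previous day.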

Step 2: Core Logic with Pagination and Retry

The core loop iterates through pages. The critical part is carrying the nextPageToken from one response into the next request. If the access token expires while a page is being processed (unlikely, but possible if processing is slow) or before the next API call, our AutoRefreshingConfig will handle it transparently.

However, network issues can occur. We add a simple retry mechanism for transient errors (429 Too Many Requests, 5xx Server Errors).

import time

def fetch_analytics_batch(client: PureCloudPlatformClientV2, date_from: str, date_to: str) -> list:
    """
    Fetches all analytics records for the given date range, handling pagination and retries.
    
    Args:
        client: The initialized PureCloudPlatformClientV2 instance.
        date_from: Start date string.
        date_to: End date string.
        
    Returns:
        list: A list of all conversation detail objects.
    """
    all_records = []
    query_body = build_query_body(date_from, date_to, page_size=100)
    
    # The analytics API uses the 'api_analytics' module in the SDK
    analytics_api = client.analytics
    
    page_token = None
    max_retries = 3
    retry_count = 0

    while True:
        try:
            # The SDK call. Our custom config ensures the token is fresh.
            # We pass the nextPageToken if it exists.
            if page_token:
                response = analytics_api.post_analytics_conversations_details_query(
                    body=query_body,
                    next_page_token=page_token
                )
            else:
                response = analytics_api.post_analytics_conversations_details_query(
                    body=query_body
                )
            
            # Reset retry count on success
            retry_count = 0
            
            # Extract the entities (records) from the response
            if response.entities:
                all_records.extend(response.entities)
                logger.info(f"Fetched {len(response.entities)} records. Total so far: {len(all_records)}")
            
            # Check for next page
            if response.next_page_token:
                page_token = response.next_page_token
            else:
                logger.info("No more pages. Batch complete.")
                break
                
        except ApiException as e:
            status_code = e.status
            logger.warning(f"API Error: {status_code} - {e.reason}")
            
            # Handle Rate Limiting (429) and Server Errors (5xx)
            if status_code in [429, 500, 502, 503, 504]:
                retry_count += 1
                if retry_count > max_retries:
                    logger.error(f"Max retries exceeded after {max_retries} attempts.")
                    raise
                
                # Exponential backoff: 2^retry_count seconds
                wait_time = 2 ** retry_count
                logger.info(f"Retrying in {wait_time} seconds... (Attempt {retry_count}/{max_retries})")
                time.sleep(wait_time)
                continue
            else:
                # For other errors (400, 401, 403), do not retry
                logger.error(f"Non-retryable error: {status_code}")
                raise
                
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise

    return all_records
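Accumulating every record in a list works for modest date ranges, but a generator lets the caller process records as they arrive. The sketch below shows the same token-driven loop in isolation. fetch_page is a hypothetical callable that takes the previous page token (None for the first page) and returns a (records, next_token) pair; in this tutorial it would wrap the SDK call and its retry handling.

```python
from typing import Any, Callable, Iterator, List, Optional, Tuple

Page = Tuple[List[Any], Optional[str]]


def iter_records(fetch_page: Callable[[Optional[str]], Page]) -> Iterator[Any]:
    """Yield records page by page until the source stops returning a token."""
    token: Optional[str] = None
    while True:
        records, token = fetch_page(token)
        yield from records
        if not token:
            # No token means the last page has been consumed.
            return
```

Because the pagination state is a single variable, a retry wrapper around fetch_page can repeat a failed page without losing position.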

Step 3: Processing Results

Once the batch is fetched, you process the data. In this example, we simply count the records by media type. This demonstrates that the data structure is intact after the pagination loop.

def process_records(records: list) -> dict:
    """
    Processes the fetched records to generate a summary.
    """
    summary = {}
    
    for record in records:
        # Accessing nested properties safely
        channel_type = record.channel.media_type if record.channel and record.channel.media_type else "Unknown"
        
        if channel_type not in summary:
            summary[channel_type] = 0
        summary[channel_type] += 1
        
    return summary
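The same tally can be written with collections.Counter, which removes the manual key initialization. This is an equivalent sketch rather than a change in behavior. summarize_by_media_type is a hypothetical name, and the usage note relies on stand-in objects rather than real SDK models.

```python
from collections import Counter
from typing import Any, Dict, Iterable


def summarize_by_media_type(records: Iterable[Any]) -> Dict[str, int]:
    """Count records per media type, treating missing values as 'Unknown'."""

    def media_type(record: Any) -> str:
        # getattr with a default tolerates records that lack a channel.
        channel = getattr(record, "channel", None)
        return getattr(channel, "media_type", None) or "Unknown"

    return dict(Counter(media_type(record) for record in records))
```

Objects built with types.SimpleNamespace are enough to exercise the function without calling the API.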

Complete Working Example

This is the full, copy-pasteable script. Save it as batch_analytics.py.

import time
import logging
import sys
from typing import Optional

# Ensure you have installed the genesyscloud SDK
# pip install genesyscloud requests

from genesyscloud.platform.client import PureCloudPlatformClientV2
from genesyscloud.platform.client.configuration import Configuration
from genesyscloud.platform.client.api_exception import ApiException
import requests

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class AutoRefreshingConfig(Configuration):
    """
    Custom Configuration that handles token refresh automatically.
    """
    def __init__(self, client_id: str, client_secret: str, env: str = "mypurecloud.com"):
        super().__init__()
        self.client_id = client_id
        self.client_secret = client_secret
        self.env = env
        # OAuth requests go to the login.<region domain> host, not the bare domain
        self.token_endpoint = f"https://login.{env}/oauth/token"
        
        self._access_token: Optional[str] = None
        self._token_expiry: float = 0.0
        
    def get_access_token(self) -> str:
        """
        Returns a valid access token. Refreshes if necessary.
        """
        # Refresh if no token or if it expires within the next 60 seconds
        if self._access_token and (time.time() < (self._token_expiry - 60)):
            return self._access_token

        logger.info("Access token is missing or expiring soon. Refreshing...")
        self._refresh_token()
        return self._access_token

    def _refresh_token(self) -> None:
        """
        Performs the OAuth 2.0 Client Credentials grant.
        """
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "analytics:conversation:read analytics:details:query"
        }
        
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }

        try:
            response = requests.post(
                self.token_endpoint, 
                data=payload, 
                headers=headers,
                timeout=10
            )
            response.raise_for_status()
            
            token_data = response.json()
            self._access_token = token_data["access_token"]
            self._token_expiry = time.time() + token_data.get("expires_in", 3600)
            logger.info("Successfully refreshed access token.")
            
        except requests.exceptions.HTTPError as e:
            logger.error(f"Failed to refresh token: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error during token refresh: {e}")
            raise

    def get_oauth_token(self) -> str:
        """
        Required by the SDK Configuration base class.
        """
        return self.get_access_token()


def build_query_body(date_from: str, date_to: str, page_size: int = 100) -> dict:
    """
    Constructs the JSON body for the analytics query.
    """
    return {
        "view": "default",
        "interval": "PT1H",
        "dateFrom": date_from,
        "dateTo": date_to,
        "size": page_size,
        "groupBy": ["channel.mediaType"],
        "select": [
            "conversationId",
            "channel.mediaType",
            "direction",
            "durationSeconds",
            "startTimestamp",
            "endTimestamp"
        ]
    }


def fetch_analytics_batch(client: PureCloudPlatformClientV2, date_from: str, date_to: str) -> list:
    """
    Fetches all analytics records for the given date range.
    """
    all_records = []
    query_body = build_query_body(date_from, date_to, page_size=100)
    
    analytics_api = client.analytics
    
    page_token = None
    max_retries = 3
    retry_count = 0

    while True:
        try:
            if page_token:
                response = analytics_api.post_analytics_conversations_details_query(
                    body=query_body,
                    next_page_token=page_token
                )
            else:
                response = analytics_api.post_analytics_conversations_details_query(
                    body=query_body
                )
            
            retry_count = 0
            
            if response.entities:
                all_records.extend(response.entities)
                logger.info(f"Fetched {len(response.entities)} records. Total: {len(all_records)}")
            
            if response.next_page_token:
                page_token = response.next_page_token
            else:
                logger.info("No more pages. Batch complete.")
                break
                
        except ApiException as e:
            status_code = e.status
            logger.warning(f"API Error: {status_code} - {e.reason}")
            
            if status_code in [429, 500, 502, 503, 504]:
                retry_count += 1
                if retry_count > max_retries:
                    logger.error(f"Max retries exceeded after {max_retries} attempts.")
                    raise
                
                wait_time = 2 ** retry_count
                logger.info(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                continue
            else:
                logger.error(f"Non-retryable error: {status_code}")
                raise
                
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise

    return all_records


def main():
    # CONFIGURATION
    CLIENT_ID = "your_client_id_here"
    CLIENT_SECRET = "your_client_secret_here"
    
    # Date range for the query (ISO 8601, UTC)
    # Example: last 24 hours, computed from a single reference time
    from datetime import datetime, timedelta, timezone
    now = datetime.now(timezone.utc).replace(microsecond=0)
    end_time = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    start_time = (now - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
    
    print(f"Starting batch job for {start_time} to {end_time}")
    
    try:
        # Initialize client with auto-refreshing config
        config = AutoRefreshingConfig(CLIENT_ID, CLIENT_SECRET)
        client = PureCloudPlatformClientV2(config)
        
        # Fetch data
        records = fetch_analytics_batch(client, start_time, end_time)
        
        # Process data
        print(f"Total records fetched: {len(records)}")
        
        # Simple summary
        media_counts = {}
        for record in records:
            if record.channel and record.channel.media_type:
                mt = record.channel.media_type
                media_counts[mt] = media_counts.get(mt, 0) + 1
        
        print("Summary by Media Type:")
        for mt, count in media_counts.items():
            print(f"  {mt}: {count}")
            
    except Exception as e:
        logger.error(f"Job failed: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized during batch loop

Cause: The access token expired, and the SDK did not refresh it in time. This often happens if the SDK’s internal token cache is stale or if the script ran for longer than the token’s TTL (time to live). The TTL is configurable per OAuth client, so check the expires_in value returned with the token rather than assuming 1 hour.

Fix: Use the AutoRefreshingConfig class provided above. It checks the token validity before every request. Ensure your client_id and client_secret are correct. If the error persists, check if the OAuth application in Genesys Cloud has the analytics:conversation:read scope enabled.
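A proactive expiry check cannot catch a token that is revoked or invalidated early, so a reactive fallback is a reasonable complement: on the first 401, force one refresh and repeat the call once. The sketch below is generic. UnauthorizedError is a hypothetical stand-in for an SDK exception with status 401, and force_refresh stands for whatever resets the cached token.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class UnauthorizedError(Exception):
    """Hypothetical stand-in for an SDK exception carrying HTTP status 401."""


def call_with_reauth(request: Callable[[], T], force_refresh: Callable[[], None]) -> T:
    """Run `request`; on a 401, refresh the token once and try exactly once more."""
    try:
        return request()
    except UnauthorizedError:
        # A second 401 after a fresh token points at bad credentials or missing
        # permissions, so it is allowed to propagate instead of looping.
        force_refresh()
        return request()
```

Limiting the fallback to a single retry keeps a genuine configuration error from turning into an endless refresh loop.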

Error: 429 Too Many Requests

Cause: You are sending requests faster than Genesys Cloud allows. The Analytics API has strict rate limits.

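Fix: The code above includes exponential backoff for 429 errors. If you still hit this, add a pause between pages or increase the wait_time in the retry logic. Reducing page_size in build_query_body lowers the load of each request but increases the number of requests, so it rarely helps with request-count limits. Do not use multiple threads for the same API endpoint without careful rate-limit coordination.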
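Rate-limited HTTP responses often include a Retry-After header stating how long to wait. Whether your 429 responses carry it is an assumption to verify. When it is present, honoring it is usually better than guessing. The sketch below prefers the header and falls back to capped exponential backoff; backoff_delay and its parameters are hypothetical names.

```python
import random
from typing import Mapping, Optional


def backoff_delay(
    attempt: int,
    headers: Optional[Mapping[str, str]] = None,
    base: float = 2.0,
    cap: float = 60.0,
    jitter: float = 0.0,
) -> float:
    """Return seconds to wait before retry number `attempt` (1-based)."""
    # Header lookup is case-sensitive here; normalize keys first if needed.
    retry_after = (headers or {}).get("Retry-After")
    if retry_after is not None:
        try:
            return min(max(float(retry_after), 0.0), cap)
        except ValueError:
            pass  # HTTP-date form is not parsed in this sketch; use backoff instead
    delay = min(base ** attempt, cap)
    # Optional jitter spreads out retries from concurrent workers.
    return delay + random.uniform(0, jitter)
```

The cap keeps a long run of failures from producing multi-minute sleeps.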

Error: 400 Bad Request - “Invalid nextPageToken”

Cause: The nextPageToken is tied to the specific query parameters (dateFrom, dateTo, view, select, etc.). If you modify the query body between pages, the token becomes invalid.

Fix: Ensure the query_body variable remains constant throughout the while loop. Do not modify it based on partial results. The token is opaque and tied to the exact request signature.
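One way to catch accidental mutation is to fingerprint the query body before the loop and compare it on every iteration. The sketch below hashes a canonical JSON serialization; query_fingerprint is a hypothetical helper and assumes the body contains only JSON-serializable values.

```python
import hashlib
import json
from typing import Any, Dict


def query_fingerprint(body: Dict[str, Any]) -> str:
    """Return a stable hash of the query body, independent of key order."""
    # sort_keys makes the serialization canonical, so equal bodies hash equally.
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Computing the fingerprint once before the while loop and asserting that it is unchanged before each request turns a confusing 400 into an immediate, local failure.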

Error: ModuleNotFoundError: No module named 'genesyscloud'

Cause: The SDK is not installed.

Fix: Run pip install genesyscloud. Ensure you are using the same Python environment where you installed the package.

Official References