Handle Access Token Expiration Mid-Batch Without Data Loss

What You Will Build

  • A robust batch processing worker that automatically detects expired access tokens, refreshes them transparently, and retries failed API calls.
  • This implementation calls the Genesys Cloud CX REST API through the official Python SDK and uses httpx for granular control over the OAuth token request.
  • The programming language covered is Python 3.9+.

Prerequisites

  • OAuth Client Type: Confidential Client (Client Credentials Grant). This flow is standard for backend batch jobs where no user interaction occurs.
  • Required Scopes: analytics:query, analytics:export (depending on the specific endpoint used).
  • SDK Version: PureCloudPlatformClientV2 (the official Genesys Cloud Python SDK) v158.0.0 or higher.
  • Runtime: Python 3.9+.
  • Dependencies:
    • httpx: HTTP client used for the OAuth token request, with explicit timeout control.
    • PureCloudPlatformClientV2: The official Genesys Cloud Python SDK (imported as PureCloudPlatformClientV2).
    • pydantic: For data validation in the response processing step.

Install dependencies:

pip install httpx purecloudplatformclientv2 pydantic

Authentication Setup

The core problem in batch jobs is that an access token issued by the Client Credentials flow is only valid for a limited time, reported in seconds by the expires_in field of the token response (the duration is configurable per OAuth client). If the token lives for 1 hour and your batch job processes 50,000 records over 90 minutes, the token expires at minute 60 and the remaining requests fail with 401 Unauthorized.

You must implement a token manager that caches the token, checks its expiration time before every request, and fetches a new token only when necessary.
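To make the timing concrete, the sketch below simulates the scenario above: it walks a job clock forward and records when a check of the form "refresh if now >= expiry - buffer" would fetch a new token. The function name and its assumptions (one request every step_seconds, every token lasting exactly expires_in seconds) are illustrative only and not part of any Genesys API.

```python
from typing import List, Optional


def refresh_schedule(job_seconds: float, expires_in: float, buffer_seconds: float,
                     step_seconds: float = 1.0) -> List[float]:
    """Return the job times (seconds) at which a buffered expiry check triggers a refresh.

    Hypothetical helper: assumes one request every `step_seconds` and that
    every token is valid for exactly `expires_in` seconds.
    """
    refreshes: List[float] = []
    expiry: Optional[float] = None  # no token yet
    t = 0.0
    while t < job_seconds:
        # Same rule as the token manager: refresh once now >= expiry - buffer
        if expiry is None or t >= expiry - buffer_seconds:
            refreshes.append(t)
            expiry = t + expires_in
        t += step_seconds
    return refreshes


# 90-minute job, 1-hour tokens, 60-second buffer
print(refresh_schedule(90 * 60, 3600, 60))  # [0.0, 3540.0]
```

With a 60-second buffer, the second token is fetched at minute 59, one minute before the first token would have been rejected.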

Token Manager Implementation

We will build a TokenManager class that handles the OAuth2 Client Credentials Grant. It converts the expires_in value (a duration in seconds) into an absolute expiry timestamp and uses that timestamp to decide when the token must be refreshed.

import time
from typing import Any, Dict, Optional

import httpx


class TokenManager:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        # Genesys Cloud issues tokens from the login.<environment> host, not the API host
        self.token_url = f"https://login.{environment}/oauth/token"

        # State
        self.access_token: Optional[str] = None
        self.token_expiry: Optional[float] = None  # absolute time, in epoch seconds
        self.refresh_buffer_seconds = 60  # Refresh 60 seconds before actual expiry

    def _is_token_valid(self) -> bool:
        """Check if the current token is still valid with a safety buffer."""
        if self.access_token is None or self.token_expiry is None:
            return False
        return time.time() < (self.token_expiry - self.refresh_buffer_seconds)

    def get_access_token(self) -> str:
        """
        Returns a valid access token.
        Refreshes the token if it is missing, expired, or close to expiration.
        """
        if not self._is_token_valid():
            self._refresh_token()
        return self.access_token

    def _refresh_token(self) -> None:
        """Fetches a new token using the Client Credentials Grant."""
        # auth= sends the client ID and secret as an HTTP Basic Authorization header;
        # data= form-encodes the body as application/x-www-form-urlencoded.
        with httpx.Client(timeout=30.0) as client:
            response = client.post(
                self.token_url,
                data={"grant_type": "client_credentials"},
                auth=(self.client_id, self.client_secret),
            )

        if response.status_code != 200:
            raise RuntimeError(f"Failed to refresh token: {response.status_code} - {response.text}")

        data = response.json()
        self.access_token = data["access_token"]
        # expires_in is a duration in seconds; convert it to an absolute timestamp
        self.token_expiry = time.time() + data["expires_in"]

        print(f"Token refreshed. Expires in {data['expires_in']} seconds.")
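The TokenManager above is not safe to share between threads: two workers can both see an expired token and refresh at the same moment. A minimal sketch of one fix, assuming a hypothetical fetch_token callable that returns (access_token, expires_in) and an injectable clock so the logic can be exercised offline, guards the check-and-refresh with a threading.Lock and re-checks validity once the lock is held.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class ThreadSafeTokenCache:
    """Hypothetical thread-safe variant of TokenManager's caching logic."""

    def __init__(
        self,
        fetch_token: Callable[[], Tuple[str, float]],  # returns (access_token, expires_in)
        refresh_buffer_seconds: float = 60.0,
        clock: Callable[[], float] = time.time,
    ):
        self._fetch_token = fetch_token
        self._buffer = refresh_buffer_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expiry: float = 0.0

    def _valid(self) -> bool:
        return self._token is not None and self._clock() < self._expiry - self._buffer

    def get_access_token(self) -> str:
        # Fast path: no locking while the cached token is still valid
        if self._valid():
            return self._token
        with self._lock:
            # Re-check inside the lock: another thread may have refreshed already
            if not self._valid():
                token, expires_in = self._fetch_token()
                self._token = token
                self._expiry = self._clock() + expires_in
            return self._token
```

In a real worker, fetch_token would perform the httpx POST from _refresh_token and return data["access_token"] and data["expires_in"].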

Implementation

Step 1: Configure the HTTP Client with Automatic Retry and Token Injection

When making API calls, you cannot simply use a static Authorization: Bearer <token> header because the token changes. You need an HTTP client that dynamically injects the current valid token for every request and handles transient errors (like 429 Too Many Requests or 5xx Server Errors).

httpx supports event hooks and custom transports for intercepting requests, but here the API calls go through the Genesys SDK, so a wrapper method is simpler: it makes sure the SDK has a valid token before each attempt and retries transient failures.

import time
from typing import Any, Callable, Dict

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException


class GenesysBatchWorker:
    def __init__(self, token_manager: TokenManager, environment: str = "mypurecloud.com"):
        self.token_manager = token_manager
        self.environment = environment

        # The SDK reads the API host and bearer token from its module-level configuration.
        # API calls go to api.<environment>; tokens come from login.<environment>.
        PureCloudPlatformClientV2.configuration.host = f"https://api.{environment}"
        self.analytics_api = PureCloudPlatformClientV2.AnalyticsApi()

    def _inject_token(self) -> None:
        """Hands the SDK the current token; the TokenManager refreshes it when needed."""
        PureCloudPlatformClientV2.configuration.access_token = self.token_manager.get_access_token()

    def _make_request_with_retry(self, func: Callable[..., Any], *args, **kwargs) -> Any:
        """
        Calls an SDK method with a valid token on every attempt and retries
        429 rate limits and 5xx server errors with exponential backoff.
        """
        max_retries = 3
        base_delay = 2.0

        for attempt in range(max_retries):
            # Ensure the token is valid before every attempt, not just once per batch
            self._inject_token()
            try:
                return func(*args, **kwargs)
            except ApiException as e:
                status = e.status or 0
                if status == 429 or 500 <= status < 600:
                    # Transient error: back off and retry (no sleep after the final attempt)
                    if attempt < max_retries - 1:
                        wait_time = base_delay * (2 ** attempt)
                        print(f"Transient error ({status}). Retrying in {wait_time} seconds...")
                        time.sleep(wait_time)
                    continue
                # Other errors (401, 403, 400) should not be retried blindly here;
                # the caller decides how to handle a 401.
                raise

        raise RuntimeError("Max retries exceeded")
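The retry pattern can be exercised without network access by hiding the SDK call behind a plain callable. The sketch below is a hypothetical, SDK-free version: HttpError stands in for an exception carrying an HTTP status, the token is requested before every attempt, 429 and 5xx responses are retried with exponential backoff, and a 401 triggers one forced refresh (folding in the 401 fallback that the fetch loop performs). The sleep function is injectable so tests run instantly.

```python
import time
from typing import Any, Callable


class HttpError(Exception):
    """Hypothetical stand-in for an SDK exception that carries an HTTP status."""

    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status


def call_with_token_and_retry(
    func: Callable[[str], Any],
    get_token: Callable[[], str],
    force_refresh: Callable[[], None],
    max_retries: int = 3,
    base_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Fetch a token before every attempt; retry 429/5xx with backoff and a 401 once after refresh."""
    refreshed = False
    for attempt in range(max_retries):
        token = get_token()  # cached unless close to expiry
        try:
            return func(token)
        except HttpError as e:
            if e.status == 401 and not refreshed:
                force_refresh()  # token rejected early (e.g. revoked): refresh once
                refreshed = True
                continue
            if e.status == 429 or 500 <= e.status < 600:
                if attempt < max_retries - 1:
                    sleep(base_delay * (2 ** attempt))
                continue
            raise  # 400, 403, or a second 401: not worth retrying
    raise RuntimeError("Max retries exceeded")
```

Injecting the clock-dependent pieces (get_token, sleep) keeps the control flow testable while the production wrapper delegates to the SDK.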

Step 2: Construct the Analytics Query Payload

Batch jobs typically pull conversation details. The endpoint /api/v2/analytics/conversations/details/query returns results one page at a time; you select the page with a paging object (pageSize and pageNumber). You must construct a valid JSON payload with the interval, a stable sort order, and the paging settings.

Required Scope: analytics:query

    def build_query_payload(self, start_time: str, end_time: str, page_number: int = 1) -> Dict[str, Any]:
        """
        Builds the JSON payload for the conversation details query.

        Args:
            start_time: ISO 8601 start time in UTC (e.g., "2023-10-01T00:00:00Z")
            end_time: ISO 8601 end time in UTC (e.g., "2023-10-01T23:59:59Z")
            page_number: 1-based page to request
        """
        return {
            "interval": f"{start_time}/{end_time}",
            # A fixed sort order keeps page boundaries stable across requests
            "order": "asc",
            "orderBy": "conversationStart",
            "paging": {
                "pageSize": 100,  # Maximum page size for this endpoint
                "pageNumber": page_number,
            },
        }

Step 3: Process Results with Pagination and Token Refresh

This is the critical section. The loop must:

  1. Check token validity.
  2. Send the request.
  3. If the request fails with 401 despite the validity check (for example, because the token was revoked before its expiry time), force a refresh and retry the same page once.
  4. Advance pageNumber until a page comes back empty or all totalHits conversations have been read.
  5. Process the data chunk.

    def fetch_conversation_batch(self, start_time: str, end_time: str) -> list:
        """
        Fetches all conversations within the time range, handling pagination and token expiration.
        """
        all_conversations = []
        page_number = 1
        max_pages = 1000  # Safety break to prevent infinite loops
        retried_401 = False

        while page_number <= max_pages:
            print(f"Fetching page {page_number}...")
            payload = self.build_query_payload(start_time, end_time, page_number)

            try:
                # The SDK method for querying conversation details, wrapped in our retry logic
                response = self._make_request_with_retry(
                    self.analytics_api.post_analytics_conversations_details_query,
                    body=payload
                )
            except Exception as e:
                # Handle 401 explicitly in case the token became invalid mid-request
                if getattr(e, "status", None) == 401 and not retried_401:
                    print("Access token rejected mid-batch. Forcing refresh...")
                    self.token_manager._refresh_token()
                    retried_401 = True
                    continue  # Retry the same page; page_number is unchanged
                print(f"Unexpected error on page {page_number}: {e}")
                raise

            retried_401 = False

            # Extract data
            conversations = response.conversations or []
            if not conversations:
                print("No more conversations found.")
                break

            all_conversations.extend(conversations)

            # Stop once every matching conversation has been read
            if response.total_hits is not None and len(all_conversations) >= response.total_hits:
                print("End of results reached.")
                break

            page_number += 1

        print(f"Total conversations fetched: {len(all_conversations)}")
        return all_conversations
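The loop above keeps every page in memory, so a crash on page 40 loses pages 1 through 39. One way to avoid that data loss, sketched here with hypothetical helper names, is to persist progress after each page and resume from the checkpoint on restart. The checkpoint is written to a temporary file and swapped in with os.replace, so a crash mid-write never leaves a half-written checkpoint (the rename is atomic on POSIX systems). Resuming by page number assumes the result set for the interval is stable between runs.

```python
import json
import os


def load_checkpoint(path: str, interval: str) -> int:
    """Return the next page number to fetch (1 if there is no usable checkpoint)."""
    if not os.path.exists(path):
        return 1
    with open(path, "r", encoding="utf-8") as f:
        state = json.load(f)
    # Only resume if the checkpoint belongs to the same query interval
    if state.get("interval") != interval:
        return 1
    return int(state["next_page"])


def save_checkpoint(path: str, next_page: int, interval: str) -> None:
    """Record progress by writing a temp file, then renaming it over the old checkpoint."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump({"next_page": next_page, "interval": interval}, f)
        f.flush()
        os.fsync(f.fileno())  # make sure the bytes reach disk before the rename
    os.replace(tmp_path, path)
```

Order matters: write the page's records to durable storage first, then call save_checkpoint(path, page_number + 1, interval). A crash between the two steps re-fetches one page on restart, which gives at-least-once delivery (possible duplicates) rather than silent loss.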

Complete Working Example

This script combines the TokenManager and GenesysBatchWorker into a runnable module. Set the GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables, and adjust ENVIRONMENT and the time range, before running.

import os
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Optional

import httpx
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

# Configuration
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
ENVIRONMENT = "mypurecloud.com"


class TokenManager:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        # Tokens come from login.<environment>; API calls go to api.<environment>
        self.token_url = f"https://login.{environment}/oauth/token"

        self.access_token: Optional[str] = None
        self.token_expiry: Optional[float] = None  # absolute epoch seconds
        self.refresh_buffer_seconds = 60

    def _is_token_valid(self) -> bool:
        if self.access_token is None or self.token_expiry is None:
            return False
        return time.time() < (self.token_expiry - self.refresh_buffer_seconds)

    def get_access_token(self) -> str:
        if not self._is_token_valid():
            self._refresh_token()
        return self.access_token

    def _refresh_token(self) -> None:
        # Client credentials go in a Basic auth header; the body is form-encoded
        with httpx.Client(timeout=30.0) as client:
            response = client.post(
                self.token_url,
                data={"grant_type": "client_credentials"},
                auth=(self.client_id, self.client_secret),
            )
        if response.status_code != 200:
            raise RuntimeError(f"Token refresh failed: {response.status_code} - {response.text}")

        data = response.json()
        self.access_token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"]
        print(f"Token refreshed. Expires in {data['expires_in']} seconds.")


class GenesysBatchWorker:
    def __init__(self, token_manager: TokenManager, environment: str = "mypurecloud.com"):
        self.token_manager = token_manager
        PureCloudPlatformClientV2.configuration.host = f"https://api.{environment}"
        self.analytics_api = PureCloudPlatformClientV2.AnalyticsApi()

    def _make_request_with_retry(self, func: Callable[..., Any], *args, **kwargs) -> Any:
        max_retries = 3
        base_delay = 2.0

        for attempt in range(max_retries):
            # Hand the SDK a valid token before every attempt
            PureCloudPlatformClientV2.configuration.access_token = self.token_manager.get_access_token()
            try:
                return func(*args, **kwargs)
            except ApiException as e:
                status = e.status or 0
                if status == 429 or 500 <= status < 600:
                    if attempt < max_retries - 1:
                        wait_time = base_delay * (2 ** attempt)
                        print(f"Transient error ({status}). Retrying in {wait_time}s...")
                        time.sleep(wait_time)
                    continue
                raise  # 400, 401, 403: handled by the caller or surfaced
        raise RuntimeError("Max retries exceeded")

    def fetch_conversation_batch(self, start_time: str, end_time: str) -> list:
        all_conversations = []
        page_number = 1
        max_pages = 1000  # Safety break to prevent infinite loops
        retried_401 = False

        while page_number <= max_pages:
            print(f"Fetching page {page_number}...")
            payload = {
                "interval": f"{start_time}/{end_time}",
                "order": "asc",
                "orderBy": "conversationStart",
                "paging": {"pageSize": 100, "pageNumber": page_number},
            }

            try:
                response = self._make_request_with_retry(
                    self.analytics_api.post_analytics_conversations_details_query,
                    body=payload
                )
            except ApiException as e:
                if e.status == 401 and not retried_401:
                    print("401 Error: Forcing token refresh and retrying the same page...")
                    self.token_manager._refresh_token()
                    retried_401 = True
                    continue
                raise
            retried_401 = False

            conversations = response.conversations or []
            if not conversations:
                break

            all_conversations.extend(conversations)
            if response.total_hits is not None and len(all_conversations) >= response.total_hits:
                break
            page_number += 1

        return all_conversations


if __name__ == "__main__":
    if not CLIENT_ID or not CLIENT_SECRET:
        raise EnvironmentError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Initialize
    tm = TokenManager(CLIENT_ID, CLIENT_SECRET, ENVIRONMENT)
    worker = GenesysBatchWorker(tm, ENVIRONMENT)

    # Define Time Range (last 24 hours, in UTC)
    end = datetime.now(timezone.utc)
    start = end - timedelta(hours=24)
    start_str = start.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_str = end.strftime("%Y-%m-%dT%H:%M:%SZ")

    try:
        results = worker.fetch_conversation_batch(start_str, end_str)
        print(f"Success. Retrieved {len(results)} conversations.")
    except Exception as e:
        print(f"Job failed: {e}")

Common Errors & Debugging

Error: 401 Unauthorized (Mid-Batch)

  • Cause: The access token expired between the last check and the current request, or the SDK cached an old token.
  • Fix: Ensure your TokenManager checks time.time() < token_expiry - refresh_buffer_seconds before every request. The buffer (60 seconds in the example) prevents edge-case expirations where a token passes the check but expires in transit. If the SDK still raises a 401 (for example, because the token was revoked server-side), catch it, force a _refresh_token() call, and retry the same request once rather than indefinitely.

Error: 429 Too Many Requests

  • Cause: You are exceeding the Genesys Cloud rate limits (typically around 10 requests per second for analytics queries; limits vary by endpoint, so confirm them in the rate-limit documentation).
  • Fix: Implement exponential backoff. The _make_request_with_retry method in the example waits base_delay * (2 ** attempt) seconds, starting at 2 seconds and doubling after each failure. Do not retry immediately. Also, avoid spawning multiple threads that hit the same endpoint without rate-limiting each thread.
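A slightly more robust delay calculation caps the wait, adds jitter so parallel workers that failed together do not retry in lockstep, and prefers a server-supplied Retry-After value when one is present. This is a sketch under assumptions: backoff_delay is a hypothetical helper, it only understands the numeric-seconds form of Retry-After, and how you read that header depends on the exception object your HTTP client or SDK raises.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_delay: float = 2.0,
    max_delay: float = 60.0,
    retry_after: Optional[str] = None,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    # Prefer the server's hint when Retry-After holds a number of seconds
    if retry_after is not None:
        try:
            return min(max(float(retry_after), 0.0), max_delay)
        except ValueError:
            pass  # e.g. an HTTP-date value: fall back to the computed backoff
    ceiling = min(max_delay, base_delay * (2 ** attempt))
    generator = rng if rng is not None else random
    # "Equal jitter": always wait at least half the ceiling, plus a random share of the rest
    return ceiling / 2 + generator.uniform(0, ceiling / 2)
```

Keeping a floor of half the ceiling honors the "do not retry immediately" rule while still spreading retries out.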

Error: 400 Bad Request (Invalid Interval)

  • Cause: The interval format in the analytics query is incorrect. Genesys requires ISO 8601 format with Z suffix.
  • Fix: Verify your start_time and end_time strings. Example: "2023-10-01T00:00:00Z/2023-10-01T23:59:59Z". Do not include milliseconds unless necessary, and ensure the timezone is UTC.
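Building the interval from timezone-aware datetime objects avoids most of these mistakes. The sketch below uses hypothetical helper names: it formats timestamps in UTC with a Z suffix and no fractional seconds, rejects naive datetimes, and splits a long range into back-to-back chunks so each query stays small. Reusing each chunk's end as the next chunk's start assumes the API treats the interval end as exclusive; verify that against the Analytics API documentation before relying on it.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def to_genesys_timestamp(dt: datetime) -> str:
    """Format an aware datetime as UTC ISO 8601 with a Z suffix and no fractional seconds."""
    if dt.tzinfo is None:
        raise ValueError("Use timezone-aware datetimes to avoid silent local-time offsets")
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def split_interval(start: datetime, end: datetime, chunk: timedelta) -> List[str]:
    """Split [start, end) into consecutive 'start/end' interval strings of at most `chunk`."""
    if end <= start:
        raise ValueError("end must be after start")
    intervals: List[str] = []
    cursor = start
    while cursor < end:
        chunk_end = min(cursor + chunk, end)
        intervals.append(f"{to_genesys_timestamp(cursor)}/{to_genesys_timestamp(chunk_end)}")
        cursor = chunk_end
    return intervals


start = datetime(2023, 10, 1, tzinfo=timezone.utc)
end = datetime(2023, 10, 3, 12, tzinfo=timezone.utc)
for interval in split_interval(start, end, timedelta(days=1)):
    print(interval)
```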

Error: MemoryError (Large Batches)

  • Cause: Storing all conversation entities in a list (all_conversations) for a large date range (e.g., 30 days) can exhaust RAM.
  • Fix: Process data in chunks. Instead of all_conversations.extend(), write each page to a file (CSV/JSONL) or database immediately. Clear the local list after writing.
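Writing each page as JSON Lines keeps memory flat regardless of the date range. A minimal sketch, assuming a hypothetical write_page_jsonl helper that receives plain dictionaries (SDK model objects can typically be converted with their to_dict() method); default=str turns values such as datetimes into strings instead of failing.

```python
import json
from typing import Any, Dict, Iterable, TextIO


def write_page_jsonl(records: Iterable[Dict[str, Any]], out: TextIO) -> int:
    """Append one JSON object per line and return how many records were written."""
    count = 0
    for record in records:
        out.write(json.dumps(record, default=str) + "\n")
        count += 1
    out.flush()  # push this page to the OS before fetching the next one
    return count
```

In the fetch loop, you would open the output file once in append mode, call write_page_jsonl((c.to_dict() for c in conversations), f) for each page, and keep only a running count instead of all_conversations.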
