Implementing Robust Token Refresh Logic for Long-Running Batch Jobs

What You Will Build

  • A production-grade Python utility that automatically handles OAuth2 access token expiration during long-running data export or import operations.
  • A retry mechanism that intercepts 401 Unauthorized responses, refreshes the token using the client credentials flow, and resumes the batch job without data loss.
  • The implementation uses the requests library with a custom session wrapper and demonstrates the pattern using Genesys Cloud CX and NICE CXone APIs.

Prerequisites

  • OAuth Client Type: Confidential Client (Client Credentials Grant). This grant does not issue a refresh token; when the access token expires, the client re-authenticates with its client ID and secret to obtain a new one.
  • Required Scopes: analytics:conversation:read, user:read, or any scope relevant to your batch operation.
  • SDK/API Version: Genesys Cloud API v2, NICE CXone API v2.
  • Language/Runtime: Python 3.9+.
  • External Dependencies: requests>=2.31.0, pydantic>=2.0 (for configuration validation).
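
The configuration validation mentioned above can be sketched as follows. This is a minimal illustration that uses a standard-library dataclass as a stand-in for a pydantic model, so it runs without extra packages. The names ExportConfig and load_config are hypothetical; the environment variable names are the ones used by the complete script later in this guide.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class ExportConfig:
    """Settings the batch job needs (hypothetical helper, not part of any SDK)."""
    env_url: str
    client_id: str
    client_secret: str
    scope: str


def load_config(env: Mapping[str, str] = os.environ) -> ExportConfig:
    """Build the config from environment variables, failing fast on gaps."""
    required = {
        "env_url": "GENESYS_ENV_URL",
        "client_id": "GENESYS_CLIENT_ID",
        "client_secret": "GENESYS_CLIENT_SECRET",
        "scope": "GENESYS_SCOPE",
    }
    values = {field: env.get(var, "").strip() for field, var in required.items()}

    # Report every missing variable at once instead of one per run.
    missing = [required[field] for field, value in values.items() if not value]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(missing)}")

    if not values["env_url"].startswith("https://"):
        raise ValueError("GENESYS_ENV_URL must be an https:// URL")

    values["env_url"] = values["env_url"].rstrip("/")
    return ExportConfig(**values)
```

Failing at startup is cheaper than discovering a placeholder credential after the first token request of a multi-hour job.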

Authentication Setup

The core problem in batch processing is that OAuth2 access tokens expire. This guide assumes a lifetime of 2 hours (7200 seconds); the actual lifetime is reported in the expires_in field of the token response. If your batch job processes 10,000 records and takes 3 hours, the token will expire mid-stream. The naive approach is to fetch a new token for every request, which adds unnecessary latency. A better approach is to cache the token and refresh it only when it is nearing expiration or when the API returns a 401 Unauthorized.
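
To make the arithmetic concrete, the sketch below estimates how many refreshes a job needs when each token is replaced a fixed number of seconds before it expires. The function name refreshes_needed and the 300-second default buffer are illustrative assumptions, and the 7200-second lifetime is the figure assumed in this guide.

```python
import math


def refreshes_needed(job_seconds: float, expires_in: float, buffer: float = 300.0) -> int:
    """Estimate token refreshes for a job, excluding the initial token fetch."""
    usable = expires_in - buffer  # seconds each token is actually used
    if usable <= 0:
        raise ValueError("buffer must be smaller than the token lifetime")
    # The first token covers the first usable window; every further
    # window (full or partial) costs one additional fetch.
    return max(0, math.ceil(job_seconds / usable) - 1)


three_hour_job = refreshes_needed(job_seconds=3 * 3600, expires_in=7200)
```

For the 3-hour job above, one refresh is enough, compared with thousands of token calls if a token were fetched per request.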

We will implement a TokenManager class that holds the client credentials, the current access token, and its expiry time. It exposes a get_valid_token() method that returns the cached token, fetching a new one first if no token is cached or the cached one has expired.

import time
import requests
from typing import Optional, Dict, Any

class TokenManager:
    """
    Manages OAuth2 token lifecycle for Genesys Cloud CX or NICE CXone.
    Supports Client Credentials Grant.
    """
    def __init__(self, env_url: str, client_id: str, client_secret: str, scope: str):
        self.env_url = env_url.rstrip('/')
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0
        self.token_type: str = "Bearer"

    def _get_token_endpoint(self) -> str:
        """
        Returns the OAuth token endpoint based on the environment URL.
        Genesys Cloud issues tokens from the login host (login.<region>),
        not the API host (api.<region>), at /oauth/token.
        NICE CXone: this guide assumes /oauth2/token; confirm the path for your tenant.
        """
        cxone_markers = ("niceincontact", "nice.incontact", "nicecxone")
        if any(marker in self.env_url for marker in cxone_markers):
            return f"{self.env_url}/oauth2/token"
        # Genesys Cloud (also the default): swap the api. host prefix for login.
        login_url = self.env_url.replace("://api.", "://login.", 1)
        return f"{login_url}/oauth/token"

    def _fetch_token(self) -> Dict[str, Any]:
        """
        Fetches a new access token using Client Credentials Grant.
        """
        token_url = self._get_token_endpoint()
        payload = {
            "grant_type": "client_credentials",
            "scope": self.scope
        }

        # Send the client ID and secret via HTTP Basic auth, which RFC 6749
        # requires authorization servers to support. requests sets the
        # form-encoded Content-Type automatically when data= is a dict.
        response = requests.post(
            token_url,
            data=payload,
            auth=(self.client_id, self.client_secret),
            timeout=30,  # never let a token call hang the batch job
        )

        if response.status_code != 200:
            raise RuntimeError(f"Failed to fetch token: {response.status_code} - {response.text}")

        data = response.json()
        self.access_token = data["access_token"]  # fail loudly if the field is missing
        self.token_type = data.get("token_type", "Bearer")
        # expires_in is in seconds; fall back to 7200 if the server omits it.
        expires_in = float(data.get("expires_in", 7200))
        # Refresh 5 minutes early; for very short-lived tokens, refresh at half the lifetime.
        self.token_expiry = time.time() + max(expires_in - 300, expires_in / 2)

        return data

    def get_valid_token(self) -> str:
        """
        Returns the current access token. If expired or None, fetches a new one.
        """
        if not self.access_token or time.time() >= self.token_expiry:
            self._fetch_token()
        return self.access_token
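
The caching rule in get_valid_token() can be checked without any network call by injecting the clock and the fetch function. The sketch below is a stand-in for that rule only, not the TokenManager itself; the class name CachedToken and its parameters are hypothetical.

```python
from typing import Callable, Optional, Tuple


class CachedToken:
    """Caches a token and refetches it once the buffered expiry is reached."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],  # returns (token, expires_in seconds)
        clock: Callable[[], float],              # returns the current time in seconds
        buffer: float = 300.0,
    ) -> None:
        self._fetch = fetch
        self._clock = clock
        self._buffer = buffer
        self._token: Optional[str] = None
        self._expiry = 0.0
        self.fetch_count = 0

    def get(self) -> str:
        if self._token is None or self._clock() >= self._expiry:
            self._token, expires_in = self._fetch()
            self._expiry = self._clock() + expires_in - self._buffer
            self.fetch_count += 1
        return self._token


# Drive the cache with a fake clock and a fake token source.
now = [0.0]
tokens = iter(["token-1", "token-2"])
cache = CachedToken(fetch=lambda: (next(tokens), 7200.0), clock=lambda: now[0])

first = cache.get()        # nothing cached yet, so this fetches
now[0] = 6899.0
still_first = cache.get()  # one second before the buffered expiry: cached
now[0] = 6900.0
second = cache.get()       # buffered expiry reached: fetches again
```

Injecting the clock keeps the test instant; waiting two real hours to observe a refresh is not practical.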

Implementation

Step 1: Creating a Resilient HTTP Session

We need a mechanism to intercept HTTP requests, attach the authentication header, and handle 401 errors. We will subclass requests.Session to override the request method. This allows us to transparently refresh the token and retry the request without changing the calling code.

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class AuthenticatedSession(requests.Session):
    """
    A requests Session that automatically handles token refresh on 401 errors.
    """
    def __init__(self, token_manager: TokenManager, max_retries: int = 3):
        super().__init__()
        self.token_manager = token_manager
        self.max_retries = max_retries
        
        # Configure retry strategy for non-auth errors (e.g., 503, 429)
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.mount("https://", adapter)
        self.mount("http://", adapter)

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        """
        Overrides the request method to inject auth header and handle 401 retries.
        """
        # Copy the caller's headers so their dict is never mutated (also handles headers=None)
        headers = dict(kwargs.pop("headers", None) or {})
        kwargs.setdefault("timeout", 60)  # avoid hanging forever on a stalled connection

        # Ensure we have a valid token before sending
        token = self.token_manager.get_valid_token()
        headers["Authorization"] = f"{self.token_manager.token_type} {token}"

        # First attempt
        response = super().request(method, url, headers=headers, **kwargs)

        # If 401 Unauthorized, force one token refresh and retry once
        if response.status_code == 401:
            # The cached token may have been revoked or expired early on the server side
            self.token_manager._fetch_token()
            headers["Authorization"] = (
                f"{self.token_manager.token_type} {self.token_manager.access_token}"
            )
            response = super().request(method, url, headers=headers, **kwargs)

        # Raise HTTPError for any remaining 4xx/5xx, including a second 401
        response.raise_for_status()

        return response
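
The refresh-once rule is easier to see in isolation. The sketch below strips out HTTP entirely and uses plain callables; send_with_refresh and the fake server are hypothetical names used only for illustration.

```python
from typing import Callable, List


def send_with_refresh(
    send: Callable[[str], int],         # sends the request with a token, returns the status
    get_token: Callable[[], str],       # returns the cached token
    force_refresh: Callable[[], str],   # fetches and returns a brand-new token
) -> int:
    """Send once; on 401, refresh the token and retry exactly one time."""
    status = send(get_token())
    if status == 401:
        status = send(force_refresh())
    return status


# Fake server that accepts only the token "fresh".
calls: List[str] = []


def fake_send(token: str) -> int:
    calls.append(token)
    return 200 if token == "fresh" else 401


recovered = send_with_refresh(fake_send, lambda: "stale", lambda: "fresh")
rejected = send_with_refresh(fake_send, lambda: "stale", lambda: "also-bad")
```

Capping the retry at one attempt matters: if a freshly issued token is still rejected, further refreshes would only loop against bad credentials.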

Step 2: Implementing the Batch Job with Pagination

Now we apply this session to a real-world scenario: exporting conversation details from Genesys Cloud. The /api/v2/analytics/conversations/details/query endpoint is paginated. If the job runs long, the token will expire. Our AuthenticatedSession will handle this transparently.

We must handle pagination correctly. The conversation details query pages through the request body: each POST carries a paging object with pageSize and pageNumber, and the matching records come back under the conversations key. We request successive page numbers until a page comes back empty.

from typing import Dict, Any, Generator

def export_conversations(session: AuthenticatedSession, query_body: Dict[str, Any]) -> Generator[Dict[str, Any], None, None]:
    """
    Fetches all conversations matching the query, handling pagination and token refresh.
    
    Args:
        session: An AuthenticatedSession instance.
        query_body: The JSON payload for the analytics query.
        
    Yields:
        Individual conversation detail objects.
    """
    url = f"{session.token_manager.env_url}/api/v2/analytics/conversations/details/query"
    page_number = 1
    
    while True:
        # Copy the query so the caller's dict is not mutated, then set the page to fetch
        body = dict(query_body)
        paging = dict(query_body.get("paging") or {})
        paging.setdefault("pageSize", 100)
        paging["pageNumber"] = page_number
        body["paging"] = paging
        
        # If the token expires between pages, the session refreshes it and retries once.
        # 429 and 5xx responses are handled by the session's retry strategy.
        response = session.post(url, json=body)
        data = response.json()
        
        # An empty or missing page means there is nothing left to export
        conversations = data.get("conversations") or []
        if not conversations:
            break
            
        for conversation in conversations:
            yield conversation
            
        page_number += 1
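
A pagination loop can be exercised without calling the API by passing in the page-fetching function. This sketch assumes page-number paging in which an empty page marks the end of the data; iter_records and the fake pages are hypothetical.

```python
from typing import Any, Callable, Dict, Iterator, List


def iter_records(
    fetch_page: Callable[[int], List[Dict[str, Any]]],
    start_page: int = 1,
) -> Iterator[Dict[str, Any]]:
    """Yield records page by page until fetch_page returns an empty list."""
    page = start_page
    while True:
        records = fetch_page(page)
        if not records:
            return
        yield from records
        page += 1


# Two fake pages stand in for API responses; page 3 does not exist.
fake_pages = {1: [{"id": "a"}, {"id": "b"}], 2: [{"id": "c"}]}
ids = [record["id"] for record in iter_records(lambda page: fake_pages.get(page, []))]
```

Because the generator only asks for the next page when the consumer needs it, a slow consumer stretches the job over time, which is exactly when token expiry becomes a concern.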

Step 3: Processing Results and Error Handling

In a production environment, you must handle cases where the API returns a 429 (Too Many Requests) or 5xx errors. The Retry strategy mounted on AuthenticatedSession already retries these with backoff; once the retries are exhausted, requests raises requests.exceptions.RetryError rather than HTTPError. A 401 is retried only once, after a forced token refresh. If the second attempt also returns 401, it indicates a fundamental authentication issue (wrong client ID/secret or scope), and we should stop and alert.

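We also need to ensure that if the job fails partway through, we can resume. This can be done by persisting the last processed ID or cursor after each page and restarting from it. The function below keeps results in memory only and returns whatever was collected before a failure.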

def run_batch_export(
    env_url: str, 
    client_id: str, 
    client_secret: str, 
    scope: str, 
    query_params: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """
    Main function to run the batch export with robust error handling.
    """
    # 1. Initialize Token Manager
    token_mgr = TokenManager(env_url, client_id, client_secret, scope)
    
    # 2. Initialize Session
    session = AuthenticatedSession(token_mgr, max_retries=3)
    
    all_conversations = []
    
    try:
        # 3. Execute Export
        for conv in export_conversations(session, query_params):
            all_conversations.append(conv)
            
            # Optional: Log progress every 100 records
            if len(all_conversations) % 100 == 0:
                print(f"Processed {len(all_conversations)} conversations...")
                
        print(f"Export complete. Total records: {len(all_conversations)}")
        
    except requests.exceptions.RetryError as e:
        # Raised when the Retry strategy runs out of attempts on 429/5xx responses
        print(f"CRITICAL: Retries exhausted (rate limit or server errors). Consider reducing query frequency. {e}")
        
    except requests.exceptions.HTTPError as e:
        # Handle specific HTTP errors
        if e.response.status_code == 401:
            print("CRITICAL: Authentication failed even after token refresh. Check credentials and scopes.")
        elif e.response.status_code == 403:
            print("CRITICAL: Forbidden. Check OAuth scopes.")
        else:
            print(f"HTTP Error: {e}")
            
    except Exception as e:
        print(f"Unexpected error: {e}")
        
    return all_conversations
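
Resuming after a failure, as described above, needs a small persisted checkpoint. The sketch below stores the next page number in a JSON file and writes it atomically; the file layout and the names load_checkpoint and save_checkpoint are assumptions for illustration.

```python
import json
import os
from pathlib import Path


def load_checkpoint(path: Path) -> int:
    """Return the next page to fetch, or 1 when no usable checkpoint exists."""
    try:
        return int(json.loads(path.read_text())["next_page"])
    except (FileNotFoundError, KeyError, ValueError):
        # Missing file, missing key, or corrupt JSON: start from the beginning.
        return 1


def save_checkpoint(path: Path, next_page: int) -> None:
    """Write to a temp file, then rename, so a crash never leaves a half-written file."""
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(json.dumps({"next_page": next_page}))
    os.replace(tmp, path)
```

A job would call save_checkpoint after each page has been written to durable storage, and load_checkpoint once at startup. Saving only after the page's records are safely stored means a crash can repeat a page but never skip one.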

Complete Working Example

Below is the full, copy-pasteable script. It uses requests and handles the entire lifecycle of a batch job, including token refresh.

#!/usr/bin/env python3
"""
Genesys Cloud CX / NICE CXone Batch Export with Auto-Refresh Token Logic

This script demonstrates how to handle OAuth2 token expiration during long-running
batch operations. It uses the Client Credentials Grant flow.

Prerequisites:
pip install requests
"""

import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Optional, Dict, Any, List, Generator
import os

# --- Configuration ---
# Replace these with your actual credentials
ENV_URL = os.getenv("GENESYS_ENV_URL", "https://api.mypurecloud.com")
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID", "your_client_id")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET", "your_client_secret")
SCOPE = os.getenv("GENESYS_SCOPE", "analytics:conversation:read")

# --- Token Manager ---

class TokenManager:
    def __init__(self, env_url: str, client_id: str, client_secret: str, scope: str):
        self.env_url = env_url.rstrip('/')
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0
        self.token_type: str = "Bearer"

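    def _get_token_endpoint(self) -> str:
        # NICE CXone: this guide assumes /oauth2/token; confirm the path for your tenant.
        cxone_markers = ("niceincontact", "nice.incontact", "nicecxone")
        if any(marker in self.env_url for marker in cxone_markers):
            return f"{self.env_url}/oauth2/token"
        # Genesys Cloud (also the default) issues tokens from login.<region>, not api.<region>.
        login_url = self.env_url.replace("://api.", "://login.", 1)
        return f"{login_url}/oauth/token"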

    def _fetch_token(self) -> Dict[str, Any]:
        token_url = self._get_token_endpoint()
        payload = {
            "grant_type": "client_credentials",
            "scope": self.scope
        }
        
        # Client ID and secret go in an HTTP Basic auth header (RFC 6749, section 2.3.1).
        response = requests.post(
            token_url,
            data=payload,
            auth=(self.client_id, self.client_secret),
            timeout=30,
        )
        
        if response.status_code != 200:
            raise RuntimeError(f"Failed to fetch token: {response.status_code} - {response.text}")
        
        data = response.json()
        self.access_token = data["access_token"]
        self.token_type = data.get("token_type", "Bearer")
        expires_in = float(data.get("expires_in", 7200))
        # 5 min buffer; for very short-lived tokens, refresh at half the lifetime
        self.token_expiry = time.time() + max(expires_in - 300, expires_in / 2)
        
        return data

    def get_valid_token(self) -> str:
        if not self.access_token or time.time() >= self.token_expiry:
            self._fetch_token()
        return self.access_token

# --- Authenticated Session ---

class AuthenticatedSession(requests.Session):
    def __init__(self, token_manager: TokenManager, max_retries: int = 3):
        super().__init__()
        self.token_manager = token_manager
        self.max_retries = max_retries
        
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.mount("https://", adapter)
        self.mount("http://", adapter)

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        # Copy the caller's headers so their dict is never mutated (also handles headers=None)
        headers = dict(kwargs.pop("headers", None) or {})
        kwargs.setdefault("timeout", 60)
        
        token = self.token_manager.get_valid_token()
        headers["Authorization"] = f"{self.token_manager.token_type} {token}"
        
        response = super().request(method, url, headers=headers, **kwargs)
        
        if response.status_code == 401:
            # Force one refresh and retry once
            self.token_manager._fetch_token()
            headers["Authorization"] = (
                f"{self.token_manager.token_type} {self.token_manager.access_token}"
            )
            response = super().request(method, url, headers=headers, **kwargs)
        
        # Raise HTTPError for any remaining 4xx/5xx, including a second 401
        response.raise_for_status()
            
        return response

# --- Batch Logic ---

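def export_conversations(session: AuthenticatedSession, query_body: Dict[str, Any]) -> Generator[Dict[str, Any], None, None]:
    url = f"{session.token_manager.env_url}/api/v2/analytics/conversations/details/query"
    page_number = 1
    
    while True:
        # The endpoint pages through the request body, so set the page on a copy of the query
        body = dict(query_body)
        paging = dict(query_body.get("paging") or {})
        paging.setdefault("pageSize", 100)
        paging["pageNumber"] = page_number
        body["paging"] = paging
        
        response = session.post(url, json=body)
        data = response.json()
        
        # An empty or missing page means there is nothing left to export
        conversations = data.get("conversations") or []
        if not conversations:
            break
            
        for conversation in conversations:
            yield conversation
            
        page_number += 1

def main():
    # Define the query
    # interval is "start/end" in ISO-8601; this example covers one fixed 24-hour window
    query_body = {
        "interval": "2023-10-01T00:00:00.000Z/2023-10-02T00:00:00.000Z",
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {"pageSize": 50, "pageNumber": 1},
        "segmentFilters": [
            {
                "type": "and",
                "predicates": [
                    {"type": "dimension", "dimension": "mediaType", "operator": "matches", "value": "voice"}
                ]
            }
        ]
    }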

    token_mgr = TokenManager(ENV_URL, CLIENT_ID, CLIENT_SECRET, SCOPE)
    session = AuthenticatedSession(token_mgr, max_retries=3)
    
    all_conversations = []
    
    try:
        for conv in export_conversations(session, query_body):
            all_conversations.append(conv)
            if len(all_conversations) % 100 == 0:
                print(f"Processed {len(all_conversations)} conversations...")
                
        print(f"Export complete. Total records: {len(all_conversations)}")
        
    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized After Refresh

  • Cause: The client ID or secret is incorrect, or the OAuth application does not have the required scope. Alternatively, the token was revoked on the server side.
  • Fix: Verify the client credentials in the Genesys Cloud Admin Console or CXone Admin Center. Ensure the scope analytics:conversation:read is assigned to the OAuth app.
  • Code Fix: The AuthenticatedSession catches this after one retry. If it persists, the requests.exceptions.HTTPError will be raised with status 401. Check your logs for the exact error message from the _fetch_token method.

Error: 429 Too Many Requests

  • Cause: You are exceeding the API rate limits. Genesys Cloud has specific limits for analytics queries.
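  • Fix: Back off and retry. The HTTPAdapter with the Retry strategy in AuthenticatedSession does this automatically: it retries up to max_retries times with exponential backoff and, by default, honors a Retry-After header on 429 responses. When the retries run out, requests raises requests.exceptions.RetryError.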
  • Code Fix: Increase max_retries in AuthenticatedSession if your job is very large, but be aware that this increases total execution time.
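
For jobs that manage their own waiting between pages, the delay calculation can be sketched as below. This is a generic exponential backoff that prefers a numeric Retry-After value when one is supplied; it is not urllib3's internal schedule, and the name retry_delay and its defaults are assumptions.

```python
from typing import Optional


def retry_delay(
    attempt: int,                       # 1 for the first retry, 2 for the second, ...
    retry_after: Optional[str] = None,  # raw Retry-After header value, if any
    base: float = 1.0,
    cap: float = 60.0,
) -> float:
    """Return the number of seconds to wait before the given retry attempt."""
    if retry_after is not None:
        try:
            # Numeric form: the server says how many seconds to wait.
            return min(cap, max(0.0, float(retry_after)))
        except ValueError:
            pass  # HTTP-date form is not handled in this sketch; fall back below
    # Exponential backoff: base, 2*base, 4*base, ... limited by cap.
    return min(cap, base * (2 ** (attempt - 1)))
```

The cap keeps a long run of failures from producing waits that exceed the token lifetime itself.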

Error: Token Expiry Mid-Pagination

  • Cause: The token expired between fetching page N and page N+1.
  • Fix: The AuthenticatedSession intercepts the 401 on the request for page N+1, refreshes the token, and retries that request. This is transparent to the export_conversations generator.
  • Code Fix: Ensure you are using the AuthenticatedSession for all requests, including pagination. Do not switch back to a standard requests.Session.

Official References