Token Refresh Logic: Handling Access Token Expiry Mid-Batch

Token Refresh Logic: Handling Access Token Expiry Mid-Batch

What You Will Build

  • A robust Python utility that automatically detects expired OAuth access tokens during long-running data processing jobs and refreshes them without interrupting the workflow.
  • This tutorial uses the Genesys Cloud PureCloud Platform Client V2 SDK and the underlying requests library for direct HTTP interaction.
  • The implementation covers Python 3.9+ with type hints, async/await patterns for concurrent API calls, and exponential backoff for rate limiting.

Prerequisites

  • OAuth Client Type: Confidential Client (Client Credentials Grant) or Resource Owner Password Credentials (ROPC). This guide assumes Client Credentials for server-to-server integrations, which is the standard for batch processing.
  • Required Scopes: analytics:query, user:read, or any scope required by your batch job. The refresh mechanism itself requires no additional scopes beyond what the initial token possesses.
  • SDK Version: genesys-cloud-purecloud-platform-client v130.0.0 or later.
  • Language/Runtime: Python 3.9+ (requires asyncio and aiolimiter for concurrency control).
  • External Dependencies:
    • pip install genesys-cloud-purecloud-platform-client
    • pip install httpx (for robust async HTTP handling)
    • pip install tenacity (for declarative retry logic)

Authentication Setup

The core issue in batch processing is that OAuth access tokens from Genesys Cloud typically expire after 3600 seconds (1 hour). If your batch job processes 10,000 records and takes 45 minutes, the token will expire halfway through, causing subsequent API calls to fail with 401 Unauthorized.

The solution is not to hardcode a refresh interval, but to implement a lazy refresh pattern. You attempt the API call. If it fails with a 401 or 403 (specifically when the error message indicates token expiry), you trigger a refresh, cache the new token, and retry the original request exactly once.

Below is the foundational authentication wrapper.

import os
import time
import threading
from typing import Optional, Dict, Any
from genesyscloud.platform_client_v2.platform_client import PlatformClient
from genesyscloud.platform_client_v2.api_client import ApiClient
import httpx

class GenesysAuthManager:
    """
    Manages OAuth token lifecycle for Genesys Cloud API calls.
    Implements lazy refresh and thread-safe token storage.
    """
    
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        self.token_cache: Dict[str, Any] = {}
        self.token_expires_at: float = 0.0
        self._lock = threading.Lock()
        
        # Initialize the Genesys SDK Platform Client
        self.platform_client = PlatformClient()
        self._configure_sdk_client()

    def _configure_sdk_client(self):
        """Configures the SDK client to use our custom auth logic."""
        # We do not set credentials directly here. Instead, we intercept 
        # requests or manage the token manually and inject it.
        # For this tutorial, we will use the SDK's ability to set a token 
        # programmatically after retrieval.
        pass

    def get_access_token(self) -> str:
        """
        Returns a valid access token.
        If the current token is expired or close to expiration, it refreshes.
        """
        current_time = time.time()
        
        # Check if token is expired or will expire in the next 60 seconds
        # We add a buffer to avoid race conditions where the token expires 
        # between the check and the API call.
        if current_time >= self.token_expires_at - 60:
            self._refresh_token()
            
        return self.token_cache.get("access_token", "")

    def _refresh_token(self):
        """
        Performs the OAuth2 Client Credentials Grant flow.
        """
        with self._lock:
            # Double-check pattern to prevent duplicate refreshes in concurrent threads
            if time.time() < self.token_expires_at - 60:
                return

            token_url = f"https://api.{self.environment}/oauth/token"
            headers = {
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": "application/json"
            }
            data = {
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                # Optional: Specify scopes. If omitted, defaults to client config.
                # "scope": "analytics:query user:read" 
            }

            try:
                with httpx.Client(timeout=10.0) as client:
                    response = client.post(token_url, headers=headers, data=data)
                    response.raise_for_status()
                    
                    token_data = response.json()
                    expires_in = token_data.get("expires_in", 3600)
                    
                    self.token_cache = token_data
                    self.token_expires_at = time.time() + expires_in
                    
            except httpx.HTTPStatusError as e:
                raise Exception(f"Failed to refresh token: {e.response.text}") from e
            except Exception as e:
                raise Exception(f"Unexpected error during token refresh: {str(e)}") from e

    def inject_token_into_sdk(self):
        """
        Injects the current valid token into the Genesys SDK client.
        """
        token = self.get_access_token()
        # The SDK allows setting the token directly on the auth manager
        self.platform_client.auth_manager.set_token(token)

Implementation

Step 1: Building the Retry Logic with 401 Detection

The most critical part of this tutorial is handling the 401 error. A naive retry loop will fail if it does not distinguish between a “Bad Request” (400) and an “Unauthorized” (401). You must only retry on 401 or specific 403 cases related to token validity.

We will use the tenacity library to handle retries declaratively. This keeps the business logic clean.

import tenacity
import logging

logger = logging.getLogger(__name__)

def is_auth_error(exception: Exception) -> bool:
    """
    Predicate to determine if an exception is caused by an expired token.
    """
    # Check for Genesys SDK specific exceptions
    if hasattr(exception, 'status_code'):
        status_code = exception.status_code
        # 401 Unauthorized is the primary indicator of token expiry
        if status_code == 401:
            return True
        # 403 Forbidden can sometimes indicate scope issues or token revocation
        if status_code == 403:
            # Inspect the response body if possible
            if hasattr(exception, 'body') and 'token' in str(exception.body).lower():
                return True
    return False

@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(multiplier=1, min=2, max=10),
    retry=tenacity.retry_if_exception(is_auth_error),
    reraise=True
)
def make_api_call_with_retry(auth_manager: GenesysAuthManager, api_func: callable, *args, **kwargs):
    """
    Wrapper that ensures a valid token is present before calling the API,
    and retries if a 401 is received.
    """
    # 1. Ensure token is fresh before the call
    auth_manager.inject_token_into_sdk()
    
    try:
        # 2. Execute the API call
        return api_func(*args, **kwargs)
    except Exception as e:
        # 3. If it is an auth error, trigger an immediate refresh
        if is_auth_error(e):
            logger.warning("Detected authentication error. Refreshing token...")
            auth_manager._refresh_token()
            auth_manager.inject_token_into_sdk()
            raise  # Re-raise to trigger tenacity retry
        else:
            raise

Step 2: Core Logic - Processing Analytics Data

Now we apply this pattern to a real-world scenario: fetching conversation details. The /api/v2/analytics/conversations/details/query endpoint is heavy and often requires pagination. If the token expires between pages, the job fails without this logic.

from genesyscloud.platform_client_v2.api.analytics_api import AnalyticsApi
from genesyscloud.analytics.models import ConversationDetailsQuery

def fetch_conversation_details(auth_manager: GenesysAuthManager, query_params: Dict[str, Any]) -> list:
    """
    Fetches all conversation details matching the query, handling pagination and token refresh.
    """
    analytics_api = AnalyticsApi(auth_manager.platform_client)
    all_conversations = []
    next_page_token = None
    
    max_pages = 50  # Safety limit to prevent infinite loops
    
    for page_num in range(max_pages):
        # Construct the query object
        body = ConversationDetailsQuery(
            entity_ids=query_params.get("entityIds"),
            date_from=query_params.get("dateFrom"),
            date_to=query_params.get("dateTo"),
            size=250,  # Max page size for this endpoint
            page_token=next_page_token
        )
        
        try:
            # Use the retry wrapper
            response = make_api_call_with_retry(
                auth_manager, 
                analytics_api.post_analytics_conversations_details_query,
                body=body
            )
            
            # Process results
            if response.entities:
                all_conversations.extend(response.entities)
                logger.info(f"Fetched page {page_num + 1}, total records so far: {len(all_conversations)}")
            
            # Check for pagination
            if response.next_page_token:
                next_page_token = response.next_page_token
            else:
                break
                
        except Exception as e:
            logger.error(f"Failed to fetch page {page_num + 1}: {str(e)}")
            # If retries are exhausted, tenacity will raise the final exception
            raise

    return all_conversations

Step 3: Processing Results and Error Handling

When processing the results, you must handle cases where the API returns partial data due to a timeout or network error. However, for token expiry, the retry logic in Step 1 ensures that the entire request is repeated.

It is important to note that tenacity retries the function call. If the API is idempotent (like GET or POST with unique IDs), this is safe. For non-idempotent operations (like creating a user), you must ensure the retry logic does not create duplicates. In this tutorial, we focus on read-only analytics, which is idempotent.

def process_batch_job(auth_manager: GenesysAuthManager):
    """
    Main entry point for the batch job.
    """
    # Example query parameters
    query_params = {
        "dateFrom": "2023-10-01T00:00:00.000Z",
        "dateTo": "2023-10-02T00:00:00.000Z",
        "entityIds": ["your_queue_id_here"] # Replace with actual ID
    }
    
    try:
        conversations = fetch_conversation_details(auth_manager, query_params)
        print(f"Successfully processed {len(conversations)} conversations.")
        
        # Example: Write to local file or database
        # with open("conversations.json", "w") as f:
        #     json.dump(conversations, f)
        
    except Exception as e:
        logger.critical(f"Batch job failed after retries: {str(e)}")
        # Handle final failure (e.g., send alert)

Complete Working Example

Below is the full, copy-pasteable script. Replace the placeholder credentials with your Genesys Cloud OAuth client ID and secret.

import os
import time
import threading
import logging
import json
from typing import Optional, Dict, Any

# Install dependencies: pip install genesys-cloud-purecloud-platform-client httpx tenacity
from genesyscloud.platform_client_v2.platform_client import PlatformClient
from genesyscloud.platform_client_v2.api.analytics_api import AnalyticsApi
from genesyscloud.analytics.models import ConversationDetailsQuery
import httpx
import tenacity

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class GenesysAuthManager:
    """
    Manages OAuth token lifecycle for Genesys Cloud API calls.
    """
    
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        self.token_cache: Dict[str, Any] = {}
        self.token_expires_at: float = 0.0
        self._lock = threading.Lock()
        
        self.platform_client = PlatformClient()

    def get_access_token(self) -> str:
        """
        Returns a valid access token.
        """
        current_time = time.time()
        
        # Refresh if expired or within 60 seconds of expiry
        if current_time >= self.token_expires_at - 60:
            self._refresh_token()
            
        return self.token_cache.get("access_token", "")

    def _refresh_token(self):
        """
        Performs the OAuth2 Client Credentials Grant flow.
        """
        with self._lock:
            # Double-check pattern
            if time.time() < self.token_expires_at - 60:
                return

            token_url = f"https://api.{self.environment}/oauth/token"
            headers = {
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": "application/json"
            }
            data = {
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret
            }

            try:
                with httpx.Client(timeout=10.0) as client:
                    response = client.post(token_url, headers=headers, data=data)
                    response.raise_for_status()
                    
                    token_data = response.json()
                    expires_in = token_data.get("expires_in", 3600)
                    
                    self.token_cache = token_data
                    self.token_expires_at = time.time() + expires_in
                    logger.info("Token refreshed successfully.")
                    
            except httpx.HTTPStatusError as e:
                raise Exception(f"Failed to refresh token: {e.response.text}") from e
            except Exception as e:
                raise Exception(f"Unexpected error during token refresh: {str(e)}") from e

    def inject_token_into_sdk(self):
        """
        Injects the current valid token into the Genesys SDK client.
        """
        token = self.get_access_token()
        self.platform_client.auth_manager.set_token(token)


def is_auth_error(exception: Exception) -> bool:
    """
    Predicate to determine if an exception is caused by an expired token.
    """
    if hasattr(exception, 'status_code'):
        status_code = exception.status_code
        if status_code == 401:
            return True
        if status_code == 403:
            if hasattr(exception, 'body') and 'token' in str(exception.body).lower():
                return True
    return False

@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(multiplier=1, min=2, max=10),
    retry=tenacity.retry_if_exception(is_auth_error),
    reraise=True
)
def make_api_call_with_retry(auth_manager: GenesysAuthManager, api_func: callable, *args, **kwargs):
    """
    Wrapper that ensures a valid token is present before calling the API,
    and retries if a 401 is received.
    """
    auth_manager.inject_token_into_sdk()
    
    try:
        return api_func(*args, **kwargs)
    except Exception as e:
        if is_auth_error(e):
            logger.warning("Detected authentication error. Refreshing token...")
            auth_manager._refresh_token()
            auth_manager.inject_token_into_sdk()
            raise  # Re-raise to trigger tenacity retry
        else:
            raise


def fetch_conversation_details(auth_manager: GenesysAuthManager, query_params: Dict[str, Any]) -> list:
    """
    Fetches all conversation details matching the query, handling pagination and token refresh.
    """
    analytics_api = AnalyticsApi(auth_manager.platform_client)
    all_conversations = []
    next_page_token = None
    
    max_pages = 50 
    
    for page_num in range(max_pages):
        body = ConversationDetailsQuery(
            entity_ids=query_params.get("entityIds"),
            date_from=query_params.get("dateFrom"),
            date_to=query_params.get("dateTo"),
            size=250,
            page_token=next_page_token
        )
        
        try:
            response = make_api_call_with_retry(
                auth_manager, 
                analytics_api.post_analytics_conversations_details_query,
                body=body
            )
            
            if response.entities:
                all_conversations.extend(response.entities)
                logger.info(f"Fetched page {page_num + 1}, total records so far: {len(all_conversations)}")
            
            if response.next_page_token:
                next_page_token = response.next_page_token
            else:
                break
                
        except Exception as e:
            logger.error(f"Failed to fetch page {page_num + 1}: {str(e)}")
            raise

    return all_conversations


def main():
    # Configuration
    CLIENT_ID = os.getenv("GENESYS_CLIENT_ID", "your_client_id")
    CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET", "your_client_secret")
    ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    
    if CLIENT_ID == "your_client_id":
        print("Error: Please set GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables.")
        return

    # Initialize Auth Manager
    auth_manager = GenesysAuthManager(CLIENT_ID, CLIENT_SECRET, ENVIRONMENT)
    
    # Example Query
    query_params = {
        "dateFrom": "2023-10-01T00:00:00.000Z",
        "dateTo": "2023-10-02T00:00:00.000Z",
        "entityIds": ["your_queue_id_here"] 
    }
    
    try:
        conversations = fetch_conversation_details(auth_manager, query_params)
        print(f"Successfully processed {len(conversations)} conversations.")
        
        # Optional: Save to file
        # with open("output.json", "w") as f:
        #     json.dump(conversations, f, default=str, indent=2)
        
    except Exception as e:
        logger.critical(f"Batch job failed: {str(e)}")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized

  • What causes it: The access token has expired, been revoked, or was never valid.
  • How to fix it: Ensure the GenesysAuthManager is initialized with correct client_id and client_secret. Verify that the OAuth client has the necessary scopes. The retry logic in this tutorial handles expiry automatically. If it persists, check that the client credentials have not been rotated in the Genesys Cloud admin console.

Error: 403 Forbidden

  • What causes it: The token is valid but lacks the required scope (e.g., analytics:query), or the client ID does not have permission to access the specific resource (e.g., a queue in a different org).
  • How to fix it: Verify the OAuth client’s scopes in the Genesys Cloud Admin console under Security > OAuth. Ensure the scope matches the API endpoint requirements.

Error: 429 Too Many Requests

  • What causes it: You have exceeded the Genesys Cloud API rate limits. This is common in batch jobs.
  • How to fix it: The tenacity library in this tutorial uses wait_exponential. This helps mitigate rate limits by slowing down retries. For sustained batch processing, implement a token bucket or leaky bucket algorithm to throttle requests per second. Consider using the Genesys Cloud Async API endpoints if available for your use case, as they are designed for high-volume data extraction.

Error: 500 Internal Server Error

  • What causes it: A temporary issue on the Genesys Cloud server side.
  • How to fix it: Retry the request with exponential backoff. The tenacity configuration in this tutorial handles generic retries for non-auth errors if you modify the retry_if_exception predicate to include tenacity.retry_if_exception_type(Exception). However, be cautious with non-idempotent operations.

Official References