Token Refresh Logic — Handling Access Token Expiration During Batch Operations

What You Will Build

  • A robust HTTP client wrapper in Python that automatically detects expired access tokens (HTTP 401) and refreshes them before retrying the original request.
  • A batch processing script that queries Genesys Cloud CX Analytics API for conversation details, ensuring the job completes even if the 1-hour token lifespan is exceeded.
  • This tutorial covers Python using the requests library and the official Genesys Cloud Python SDK.

Prerequisites

  • OAuth Client Type: Confidential Client (Client Credentials Grant) or Resource Owner Password Credentials (ROPC) if applicable, though Client Credentials is standard for server-to-server batch jobs.
  • Required Scopes: analytics:conversation:read for the example endpoint.
  • SDK Version: Genesys Cloud Python SDK v2.14.0+ (genesys-cloud-sdk).
  • Runtime: Python 3.8+.
  • Dependencies: requests, genesys-cloud-sdk, python-dotenv (for secure credential management).

Install dependencies:

pip install requests genesys-cloud-sdk python-dotenv

Authentication Setup

Genesys Cloud access tokens expire after one hour by default. A batch job that processes thousands of records or pages through a large dataset can easily run longer than that, so it cannot rely on a single token. The standard pattern is to intercept the 401 Unauthorized response, fetch a new token, update the client state, and retry the failed request.

We will use the Client Credentials flow, which needs no user interaction and is the usual choice for automated jobs.

Step 1: Implement the Token Provider

First, we create a class responsible for managing the lifecycle of the OAuth token. This class handles the initial fetch and the refresh logic.

import requests
import time
import logging
from typing import Optional, Dict, Any

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GenesysTokenManager:
    def __init__(self, client_id: str, client_secret: str, region: str = "us-east-1"):
        self.client_id = client_id
        self.client_secret = client_secret
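        # `region` is the Genesys Cloud domain for your org, e.g. "mypurecloud.ie"
        # or "usw2.pure.cloud". The default "us-east-1" is mapped to US East's
        # domain, "mypurecloud.com". Tokens are issued by the login host,
        # not the API host.
        domain = "mypurecloud.com" if region == "us-east-1" else region
        self.oauth_endpoint = f"https://login.{domain}/oauth/token"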
        self.access_token: Optional[str] = None
        self.token_expiry_time: float = 0
        self._lock = None  # In a multi-threaded scenario, use threading.Lock()

    def _fetch_token(self) -> Dict[str, Any]:
        """
        Requests a new OAuth token from Genesys Cloud.
        Raises requests.exceptions.HTTPError on failure.
        """
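        # Client credentials travel in an HTTP Basic Authorization header;
        # the form body carries only the grant type.
        payload = {"grant_type": "client_credentials"}

        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }

        response = requests.post(
            self.oauth_endpoint,
            data=payload,
            headers=headers,
            auth=(self.client_id, self.client_secret),
            timeout=30  # Never let a hung token request stall the batch job
        )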
        
        if response.status_code != 200:
            raise requests.exceptions.HTTPError(
                f"Token fetch failed with status {response.status_code}: {response.text}"
            )
        
        return response.json()

    def get_access_token(self) -> str:
        """
        Returns a valid access token.
        If the current token is expired or close to expiring, it fetches a new one.
        """
        # Check if token is expired or will expire in the next 5 minutes (300 seconds)
        if not self.access_token or time.time() >= (self.token_expiry_time - 300):
            logger.info("Access token expired or near expiry. Fetching new token...")
            try:
                token_data = self._fetch_token()
                self.access_token = token_data["access_token"]
                # expires_in is in seconds
                self.token_expiry_time = time.time() + token_data["expires_in"]
                logger.info("New access token acquired.")
            except requests.exceptions.HTTPError as e:
                logger.error(f"Failed to refresh token: {e}")
                raise
            except Exception as e:
                logger.error(f"Unexpected error during token refresh: {e}")
                raise
        
        return self.access_token
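
The `_lock` placeholder in the constructor hints at multi-threaded use. A minimal sketch of how a lock could guard the refresh so that concurrent workers trigger only one token request. The fetch step is injected as a plain callable so the example runs offline; `ThreadSafeTokenCache` and `fake_fetch` are hypothetical names for illustration, not part of the classes above:

```python
import threading
import time
from typing import Any, Callable, Dict, Optional


class ThreadSafeTokenCache:
    """Caches a token and refreshes it under a lock (illustrative sketch)."""

    def __init__(self, fetch: Callable[[], Dict[str, Any]], skew_seconds: float = 300.0):
        self._fetch = fetch          # hypothetical callable returning the token JSON
        self._skew = skew_seconds    # refresh this many seconds before expiry
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        # Every caller takes the lock, so only one thread can refresh at a time
        # and the others reuse the token it stored.
        with self._lock:
            if self._token is None or time.time() >= self._expires_at - self._skew:
                data = self._fetch()
                self._token = data["access_token"]
                self._expires_at = time.time() + data["expires_in"]
            return self._token

    def invalidate(self) -> None:
        """Drops the cached token, e.g. after the API answered 401."""
        with self._lock:
            self._token = None


calls = []

def fake_fetch() -> Dict[str, Any]:
    # Stand-in for the real token request; counts how often it is called.
    calls.append(1)
    return {"access_token": f"token-{len(calls)}", "expires_in": 3600}

cache = ThreadSafeTokenCache(fake_fetch)
threads = [threading.Thread(target=cache.get) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(calls), cache.get())
```

Taking the lock on every call keeps the sketch simple; the cost is negligible next to a network round trip.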

Step 2: Create the Retry-Enabled HTTP Client

Instead of relying solely on the SDK for low-level retries, we build a wrapper around requests.Session. This allows us to inject the token header dynamically and handle the 401 retry logic explicitly. This pattern is crucial because the Genesys Python SDK does not automatically retry 401 errors with token refresh; it returns the error to the caller.

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class GenesysHttpClient:
    def __init__(self, token_manager: GenesysTokenManager):
        self.token_manager = token_manager
        self.session = requests.Session()
        
        # Configure standard retries for network issues (5xx, 429)
        # Note: We do NOT retry 401 here, as it requires a token refresh first.
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)

    def _build_headers(self) -> Dict[str, str]:
        """
        Returns headers with the current valid access token.
        """
        token = self.token_manager.get_access_token()
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        """
        Executes an HTTP request with automatic token refresh on 401.
        """
        # Pull caller-supplied headers out of kwargs so they can be re-applied
        # on the retry as well as on the first attempt.
        extra_headers = kwargs.pop("headers", None) or {}
        headers = {**self._build_headers(), **extra_headers}

        try:
            response = self.session.request(method, url, headers=headers, **kwargs)
            
            # If we get a 401, the token is likely invalid/expired.
            # We force a refresh and retry ONCE.
            if response.status_code == 401:
                logger.warning("Received 401 Unauthorized. Refreshing token and retrying...")
                # Clearing the cached token makes get_access_token() fetch a new one
                self.token_manager.access_token = None
                
                # Rebuild headers with the new token, keeping the caller's extras
                headers = {**self._build_headers(), **extra_headers}
                response = self.session.request(method, url, headers=headers, **kwargs)
                
                # If we STILL get a 401 after refresh, raise an error
                if response.status_code == 401:
                    raise requests.exceptions.HTTPError(
                        "Persistent 401 Unauthorized after token refresh. Check scopes/permissions.",
                        response=response
                    )
            
            response.raise_for_status()
            return response

        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP Error: {e}")
            raise
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Connection Error: {e}")
            raise
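
The 401 path is awkward to exercise against a live org because it only triggers once a token has aged out. A small sketch of the same refresh-and-retry-once logic with the transport and token source passed in as plain callables, so it can be run offline. `call_with_refresh`, `FakeResponse`, and the stand-in functions are hypothetical names used only for this illustration:

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class FakeResponse:
    status_code: int


def call_with_refresh(
    send: Callable[[str], FakeResponse],
    get_token: Callable[[], str],
    invalidate: Callable[[], None],
) -> FakeResponse:
    """Sends once; on 401, drops the cached token and retries exactly once."""
    response = send(get_token())
    if response.status_code == 401:
        invalidate()
        response = send(get_token())
        if response.status_code == 401:
            raise PermissionError("Still unauthorized after token refresh")
    return response


# --- Offline stand-ins for the token endpoint and the API ---
state = {"token": "stale", "issued": 0}
seen: List[str] = []

def get_token() -> str:
    # Issues a new token only when the cached one has been cleared.
    if state["token"] is None:
        state["issued"] += 1
        state["token"] = f"fresh-{state['issued']}"
    return state["token"]

def invalidate() -> None:
    state["token"] = None

def send(token: str) -> FakeResponse:
    seen.append(token)
    # The fake API rejects the stale token and accepts any fresh one.
    return FakeResponse(401 if token == "stale" else 200)

result = call_with_refresh(send, get_token, invalidate)
print(result.status_code, seen)
```

Limiting the retry to a single attempt matters: a second 401 with a brand-new token points at credentials or permissions, and looping would only hammer the token endpoint.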

Step 3: Implement the Batch Processing Logic

Now we apply this client to a real Genesys Cloud API endpoint. We will query /api/v2/analytics/conversations/details/query, which is commonly used for batch reporting. Paging is controlled from the request: every page is a POST of the same query body with a different paging.pageNumber, and each response returns that page's records under conversations together with a totalHits count.

Required Scope: analytics:conversation:read

import requests
from typing import List, Dict, Any

class ConversationBatchProcessor:
    def __init__(self, client: GenesysHttpClient, region: str = "us-east-1"):
        self.client = client
        # US East (us-east-1) is served from api.mypurecloud.com; for other
        # regions pass the Genesys Cloud domain, e.g. "mypurecloud.ie".
        domain = "mypurecloud.com" if region == "us-east-1" else region
        self.base_url = f"https://api.{domain}"

    def query_conversations(self, body: Dict[str, Any], page_size: int = 100) -> List[Dict[str, Any]]:
        """
        Queries conversation details one page at a time.
        Token refresh happens inside self.client, so it works across page boundaries.
        """
        endpoint = f"{self.base_url}/api/v2/analytics/conversations/details/query"
        all_results: List[Dict[str, Any]] = []
        page_number = 1

        while True:
            logger.info(f"Fetching page {page_number} from: {endpoint}")

            # Every page is a POST of the same query; only the paging block changes.
            page_body = {**body, "paging": {"pageSize": page_size, "pageNumber": page_number}}

            try:
                response = self.client.request(
                    "POST",
                    endpoint,
                    json=page_body,
                    timeout=30  # Set a reasonable timeout for large queries
                )
            except requests.exceptions.RequestException as e:
                # 429 and 5xx were already retried inside GenesysHttpClient.
                # Anything that reaches this point (400, 403, exhausted retries,
                # network failure) ends the run with the pages collected so far.
                logger.error(f"Failed to fetch page {page_number}: {e}")
                break

            data = response.json()
            # A page with no matches may omit the "conversations" key entirely.
            conversations = data.get("conversations") or []
            all_results.extend(conversations)
            logger.info(f"Fetched {len(conversations)} records. Total: {len(all_results)}")

            # A short or empty page means there is nothing left to fetch.
            if len(conversations) < page_size:
                break
            page_number += 1

        return all_results
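
The query body for this endpoint identifies its time window with an ISO-8601 interval string of the form start/end. A minimal sketch of building one for a trailing window. `build_interval` is a hypothetical helper, and the fixed `.000Z` UTC format is an assumption about what the API accepts that should be checked against the API reference:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_interval(hours: int, now: Optional[datetime] = None) -> str:
    """Returns 'start/end' in UTC covering the last `hours` hours."""
    end = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    start = end - timedelta(hours=hours)
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"  # seconds precision, milliseconds fixed at 000
    return f"{start.strftime(fmt)}/{end.strftime(fmt)}"


# A fixed clock makes the result reproducible; omit `now` in real use.
fixed_now = datetime(2024, 3, 2, 12, 0, 0, tzinfo=timezone.utc)
query_body = {
    "interval": build_interval(24, now=fixed_now),
    "order": "asc",
    "orderBy": "conversationStart",
}
print(query_body["interval"])
# → 2024-03-01T12:00:00.000Z/2024-03-02T12:00:00.000Z
```

Computing the interval once, before the first page is requested, keeps the window stable while the job pages through results.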

Complete Working Example

Below is the full, copy-pasteable script. It combines the token manager, the HTTP client, and the batch processor. It queries for conversations from the last 24 hours.

File: batch_token_refresh.py

import json
import os
import sys
import time
import logging
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Optional, Dict, Any, List

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# -----------------------------------------------------------------------------
# 1. Token Manager
# -----------------------------------------------------------------------------
class GenesysTokenManager:
    def __init__(self, client_id: str, client_secret: str, region: str = "us-east-1"):
        self.client_id = client_id
        self.client_secret = client_secret
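        # `region` is the Genesys Cloud domain for your org (e.g. "mypurecloud.ie");
        # the default "us-east-1" maps to US East's domain, "mypurecloud.com".
        # Tokens are issued by the login host, not the API host.
        domain = "mypurecloud.com" if region == "us-east-1" else region
        self.oauth_endpoint = f"https://login.{domain}/oauth/token"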
        self.access_token: Optional[str] = None
        self.token_expiry_time: float = 0

    def _fetch_token(self) -> Dict[str, Any]:
        # Credentials go in the HTTP Basic header; the body carries the grant type
        payload = {"grant_type": "client_credentials"}
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        
        response = requests.post(
            self.oauth_endpoint,
            data=payload,
            headers=headers,
            auth=(self.client_id, self.client_secret),
            timeout=30
        )
        
        if response.status_code != 200:
            raise requests.exceptions.HTTPError(
                f"Token fetch failed with status {response.status_code}: {response.text}"
            )
        return response.json()

    def get_access_token(self) -> str:
        # Refresh if expired or within 5 minutes of expiry
        if not self.access_token or time.time() >= (self.token_expiry_time - 300):
            logger.info("Token expired or near expiry. Refreshing...")
            token_data = self._fetch_token()
            self.access_token = token_data["access_token"]
            self.token_expiry_time = time.time() + token_data["expires_in"]
            logger.info("New token acquired.")
        return self.access_token

# -----------------------------------------------------------------------------
# 2. HTTP Client with Retry & Token Refresh
# -----------------------------------------------------------------------------
class GenesysHttpClient:
    def __init__(self, token_manager: GenesysTokenManager):
        self.token_manager = token_manager
        self.session = requests.Session()
        
        # Retry on 429 (Rate Limit) and 5xx (Server Errors)
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)

    def _build_headers(self) -> Dict[str, str]:
        token = self.token_manager.get_access_token()
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        # Keep caller-supplied headers so they survive the retry below
        extra_headers = kwargs.pop("headers", None) or {}
        headers = {**self._build_headers(), **extra_headers}

        try:
            response = self.session.request(method, url, headers=headers, **kwargs)
            
            # Handle 401 Unauthorized (Token Expired)
            if response.status_code == 401:
                logger.warning("401 Unauthorized. Refreshing token and retrying once...")
                # Force refresh
                self.token_manager.access_token = None
                headers = {**self._build_headers(), **extra_headers}
                
                # Retry the request
                response = self.session.request(method, url, headers=headers, **kwargs)
                
                if response.status_code == 401:
                    raise requests.exceptions.HTTPError(
                        "Persistent 401 after refresh. Check credentials and scopes.",
                        response=response
                    )
            
            response.raise_for_status()
            return response

        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP Error: {e}")
            raise
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Connection Error: {e}")
            raise

# -----------------------------------------------------------------------------
# 3. Batch Processor
# -----------------------------------------------------------------------------
class ConversationBatchProcessor:
    def __init__(self, client: GenesysHttpClient, region: str = "us-east-1"):
        self.client = client
        # US East (us-east-1) is served from api.mypurecloud.com; for other
        # regions pass the Genesys Cloud domain, e.g. "mypurecloud.ie".
        domain = "mypurecloud.com" if region == "us-east-1" else region
        self.base_url = f"https://api.{domain}"

    def query_conversations(self, query_body: Dict[str, Any], page_size: int = 100) -> List[Dict[str, Any]]:
        endpoint = f"{self.base_url}/api/v2/analytics/conversations/details/query"
        all_results: List[Dict[str, Any]] = []
        page_number = 1

        while True:
            logger.info(f"Requesting page {page_number}: {endpoint}")

            # Every page is a POST of the same query; only the paging block changes
            page_body = {**query_body, "paging": {"pageSize": page_size, "pageNumber": page_number}}

            try:
                response = self.client.request("POST", endpoint, json=page_body, timeout=30)
            except requests.exceptions.RequestException as e:
                # Retries for 429/5xx are already exhausted at this point
                logger.error(f"Query failed on page {page_number}: {e}")
                break

            data = response.json()
            # A page with no matches may omit the "conversations" key entirely
            conversations = data.get("conversations") or []
            all_results.extend(conversations)
            logger.info(f"Page {page_number} complete. Total records: {len(all_results)}")

            # A short or empty page means there is nothing left to fetch
            if len(conversations) < page_size:
                break
            page_number += 1

        return all_results

# -----------------------------------------------------------------------------
# 4. Main Execution
# -----------------------------------------------------------------------------
def main():
    # Load credentials from environment variables
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    region = os.getenv("GENESYS_REGION", "us-east-1")

    if not client_id or not client_secret:
        logger.error("Missing GENESYS_CLIENT_ID or GENESYS_CLIENT_SECRET environment variables.")
        sys.exit(1)

    # Initialize components
    token_manager = GenesysTokenManager(client_id, client_secret, region)
    http_client = GenesysHttpClient(token_manager)
    processor = ConversationBatchProcessor(http_client, region)

    # Define Query Body: last 24 hours as an ISO-8601 interval, oldest first.
    # The paging block is added per request by query_conversations.
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    now = time.time()
    date_from = time.strftime(fmt, time.gmtime(now - 24 * 3600))
    date_to = time.strftime(fmt, time.gmtime(now))
    query_body = {
        "interval": f"{date_from}/{date_to}",
        "order": "asc",
        "orderBy": "conversationStart"
    }

    try:
        logger.info("Starting batch conversation query...")
        results = processor.query_conversations(query_body)
        
        if results:
            logger.info(f"Successfully retrieved {len(results)} conversations.")
            # Log the first record for verification (shown only at DEBUG level)
            logger.debug(f"Sample record: {json.dumps(results[0], indent=2)}")
        else:
            logger.info("No conversations found in the specified period.")
            
    except Exception as e:
        logger.error(f"Job failed: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized After Refresh

  • Cause: The client credentials are invalid, or the OAuth client lacks the access this endpoint needs (analytics:conversation:read in this example). An unreachable token endpoint is a different failure: it surfaces as a connection error, not a 401.
  • Fix: Verify the Client ID and Secret in the Genesys Cloud Admin Console. Ensure the OAuth Client has the correct scopes assigned. If the refresh itself fails, confirm that oauth_endpoint points at the login host for your region (login.mypurecloud.com for US East).
  • Code Fix: The script above raises an HTTPError with a specific message if the second attempt fails. Add detailed logging to _fetch_token to inspect the raw response body from the OAuth endpoint.

Error: 429 Too Many Requests

  • Cause: The batch job is sending requests faster than Genesys Cloud allows. Analytics queries are heavy and have strict rate limits.
  • Fix: The GenesysHttpClient includes a Retry strategy for 429s with exponential backoff. If failures persist, increase the backoff_factor or add a manual time.sleep() between pages.
  • Code Fix: Adjust the Retry configuration in GenesysHttpClient.__init__:
    retry_strategy = Retry(
        total=5,
        backoff_factor=2, # Delay doubles each retry: roughly backoff_factor * 2 ** (retry_number - 1) seconds
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "POST"]
    )
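
To see what a given backoff_factor costs in wall-clock time, the delays can be tabulated before running the job. A rough sketch, assuming the exponential formula backoff_factor * 2 ** (n - 1) capped at 120 seconds. The exact schedule depends on the installed urllib3 version (some releases skip the sleep before the first retry), and a Retry-After header from the server takes precedence when present. `backoff_delays` is a hypothetical helper, not a urllib3 function:

```python
from typing import List


def backoff_delays(backoff_factor: float, retries: int, cap: float = 120.0) -> List[float]:
    """Delay before each retry: factor * 2**(n-1), limited to `cap` seconds."""
    return [min(cap, backoff_factor * 2 ** (n - 1)) for n in range(1, retries + 1)]


# Compare the tutorial's default settings with the more patient variant.
for factor, total in [(1, 3), (2, 5)]:
    delays = backoff_delays(factor, total)
    print(f"backoff_factor={factor}, total={total}: {delays} -> up to {sum(delays)}s waiting")
```

If the worst-case wait per request is longer than the job can tolerate, a fixed pause between pages is often the gentler option.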
    

Error: 400 Bad Request on Pagination

  • Cause: The conversation details query endpoint pages through the POST body, not through a follow-up URL. A request that omits the required interval, asks for a pageSize above 100, or switches to GET for later pages is rejected.
  • Fix: Send the same POST body for every page and change only paging.pageNumber, starting at 1. Stop when a page comes back with fewer records than pageSize.
  • Code Fix: Log response.text when a 400 is returned; the error body usually describes which part of the request failed validation.
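
When sizing a job, the number of page requests can be estimated from the totalHits value reported in the first response. A short sketch, assuming totalHits counts every matching conversation and that 100 is the page size in use; `page_count` is a hypothetical helper:

```python
import math


def page_count(total_hits: int, page_size: int = 100) -> int:
    """Number of pages needed to read `total_hits` records."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    return math.ceil(total_hits / page_size)


for hits in (0, 100, 101, 2350):
    print(hits, "->", page_count(hits))
```

Multiplying the page count by the typical response time gives a quick check on whether the run will cross the one-hour token boundary.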

Official References