Handling Token Refresh Logic — Preventing Batch Job Failures

What You Will Build

  • A robust HTTP client wrapper in Python that automatically refreshes Genesys Cloud CX OAuth access tokens when they expire during long-running batch operations.
  • The solution uses the requests library with session hooks to intercept 401 Unauthorized responses and trigger a silent token refresh before retrying the original request.
  • The programming language covered is Python 3.8+.

Prerequisites

  • OAuth Client Type: Public or Confidential Client registered in Genesys Cloud CX under Admin > Integrations > OAuth.
  • Required Scopes: the scopes for the resources you call (e.g., users, conversations:readonly, analytics:readonly). Client Credentials clients are granted roles instead of scopes.
  • SDK/API Version: Genesys Cloud CX API v2.
  • Language/Runtime: Python 3.8 or higher.
  • External Dependencies:
    • requests: For HTTP interactions.
    • requests-oauthlib: For OAuth2 session management (optional, but we will build a custom hook for finer control over batch retries).
    • pyjwt: For decoding token expiration claims locally (optional optimization).

Authentication Setup

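Before addressing batch failures, you must establish a baseline authentication flow. Genesys Cloud uses standard OAuth 2.0. The access token lifetime is set by the Token Duration configured on the OAuth client; this tutorial assumes a one-hour duration. Grants that issue a refresh token (such as Authorization Code) let you obtain a new access token without user interaction, and the refresh token typically outlives the access token. The Client Credentials grant issues no refresh token, so a job using it must request a new access token instead.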

If your batch job runs for more than 60 minutes, or if you spawn multiple threads that consume tokens simultaneously, relying on a single static token at the start of the script will cause failures.
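
One way to avoid both problems is to track the token's expiry and renew it under a lock, so that only one thread performs the renewal. The sketch below is a minimal illustration, not part of the classes built later: TokenCache, fetch, and margin_seconds are hypothetical names, and fetch is assumed to return the access token together with the expires_in value from the token response.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Thread-safe holder that renews a token shortly before it expires."""

    def __init__(self,
                 fetch: Callable[[], Tuple[str, float]],
                 margin_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch            # hypothetical: returns (access_token, expires_in)
        self._margin = margin_seconds  # renew this many seconds before expiry
        self._clock = clock            # injectable so tests can control time
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        # The lock ensures that concurrent callers trigger a single renewal
        with self._lock:
            if self._token is None or self._clock() >= self._expires_at - self._margin:
                token, expires_in = self._fetch()
                self._token = token
                self._expires_at = self._clock() + expires_in
            return self._token
```

Worker threads would call cache.get() before each request instead of reading a token captured at startup. The 401 handling built in the Implementation section is still useful as a fallback, because a token can be revoked before its expiry time.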

The following code demonstrates how to obtain the initial token pair.

import os

import requests

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        # environment is the region's base domain, e.g. "mypurecloud.com" or "usw2.pure.cloud"
        self.client_id = client_id
        self.client_secret = client_secret
        # API calls go to the api.* host; tokens are issued by the login.* host
        self.base_url = f"https://api.{environment}"
        self.token_endpoint = f"https://login.{environment}/oauth/token"
        self.access_token = None
        self.refresh_token = None

    def get_initial_tokens(self) -> dict:
        """
        Performs the initial OAuth2 Client Credentials grant, the usual choice
        for batch jobs that run as a service account.
        """
        # A Client Credentials client takes its permissions from the roles assigned
        # to it in Genesys Cloud, so the request carries no scope parameter.
        payload = {
            "grant_type": "client_credentials"
        }
        
        # Basic Auth header for Client Credentials Grant
        auth_header = (self.client_id, self.client_secret)
        
        response = requests.post(
            self.token_endpoint,
            data=payload,
            auth=auth_header,
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        
        if response.status_code != 200:
            raise Exception(f"Failed to obtain initial tokens: {response.text}")
            
        token_data = response.json()
        self.access_token = token_data["access_token"]
        # Client Credentials responses contain no refresh token, so use .get()
        self.refresh_token = token_data.get("refresh_token")
        
        return token_data

    def refresh_access_token(self) -> str:
        """
        Uses the refresh token to obtain a new access token.
        This is the critical method called when a 401 error occurs.
        """
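        if not self.refresh_token:
            # The Client Credentials grant issues no refresh token, so request
            # a fresh access token with the original grant instead.
            self.get_initial_tokens()
            return self.access_token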

        payload = {
            "grant_type": "refresh_token",
            "refresh_token": self.refresh_token
        }
        
        # Client credentials are still required for refresh in many grant types
        auth_header = (self.client_id, self.client_secret)
        
        response = requests.post(
            self.token_endpoint,
            data=payload,
            auth=auth_header,
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        
        if response.status_code != 200:
            raise Exception(f"Token refresh failed: {response.text}")
            
        token_data = response.json()
        self.access_token = token_data["access_token"]
        # Some grants return a new refresh token, update if present
        if "refresh_token" in token_data:
            self.refresh_token = token_data["refresh_token"]
            
        return self.access_token

Implementation

Step 1: Building the Auto-Refreshing Session

The core problem is that requests does not natively retry with new headers after a 401. You must wrap the session or use hooks. We will use the response hook feature of the requests.Session object. This hook allows you to intercept the response before the calling code sees it. If the status is 401, we trigger the refresh, update the session headers, and retry the original request.

Critical Note: You must ensure that the retry logic does not create an infinite loop if the refresh token itself is invalid. We will limit retries to one attempt per 401.
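
The retry-once rule can be stated independently of requests. The sketch below is a simplified model with hypothetical callables (send returns a status code for a given token, refresh returns a new token); it shows only the control flow that keeps a bad refresh token from causing a loop.

```python
from typing import Callable


def send_with_single_refresh(send: Callable[[str], int],
                             refresh: Callable[[], str],
                             token: str) -> int:
    """Call send(token); on a 401, refresh once and resend.

    Returns the final status code. A second 401 is returned to the caller
    rather than triggering another refresh.
    """
    status = send(token)
    if status != 401:
        return status
    new_token = refresh()   # may raise; the caller then sees the failure
    return send(new_token)  # no further retry, even if this is also a 401
```

The session hook below applies the same idea by marking the retried request with a header and skipping any request that already carries the mark.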

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class GenesysSession(requests.Session):
    def __init__(self, auth_handler: GenesysAuth):
        super().__init__()
        self.auth_handler = auth_handler
        self.headers.update({
            "Content-Type": "application/json",
            "Accept": "application/json"
        })
        
        # Attach the hook to every request made by this session
        self.hooks['response'].append(self._handle_401)
        
        # Configure standard retries for 429 (Rate Limit) and 5xx (Server Error)
        # This is separate from the 401 logic
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            # Note: retrying non-idempotent methods after a 5xx can repeat a write
            allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "PATCH", "DELETE"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.mount("http://", adapter)
        self.mount("https://", adapter)

    def _handle_401(self, response, *args, **kwargs):
        """
        Hook that intercepts 401 Unauthorized responses.
        If a 401 is detected, it refreshes the token and retries the request once.
        """
        # Only process if the request was not already retried for 401
        if response.status_code == 401 and not response.request.headers.get('X-401-Retried'):
            try:
                print("Access token expired. Refreshing...")
                new_token = self.auth_handler.refresh_access_token()
                
                # Update the session header for future requests
                self.headers['Authorization'] = f"Bearer {new_token}"
                
                # Update the current request's header
                response.request.headers['Authorization'] = f"Bearer {new_token}"
                response.request.headers['X-401-Retried'] = 'true'
                
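                # Drain and close the 401 response so its connection returns to the pool
                response.content
                response.close()

                # Send the request again using the same session
                # This returns the new response, which replaces the old one
                return self.send(response.request, **kwargs)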
                
            except Exception as e:
                print(f"Failed to refresh token: {e}")
                # If refresh fails, let the original 401 response pass through
                # so the caller can handle the fatal error
                return response
        return response

    def request(self, method, url, **kwargs):
        # Inject the current access token unless the caller supplied one;
        # copy the headers so the caller's dict is not mutated (and None is tolerated)
        headers = dict(kwargs.get('headers') or {})
        headers.setdefault('Authorization', f"Bearer {self.auth_handler.access_token}")
        kwargs['headers'] = headers
        
        return super().request(method, url, **kwargs)

Step 2: Executing a Long-Running Batch Job

Now that we have a session that heals itself, we can write the batch logic. In this example, we will retrieve a large set of users and then perform an update on each. This simulates a workload that exceeds the 1-hour token lifetime.

We will use the GET /api/v2/users endpoint to fetch users and PATCH /api/v2/users/{userId} to update them. The update body must carry the user's current version number, which the GET response returns for each user.

OAuth Scope Required: users

import time
import uuid

def batch_update_users(session: GenesysSession, environment: str):
    """
    Fetches all users and updates a custom attribute on each.
    Simulates a long-running process.
    """
    # API calls go to the api.* host of the region, e.g. api.mypurecloud.com
    base_url = f"https://api.{environment}"
    users_endpoint = f"{base_url}/api/v2/users"
    
    all_users = []
    
    # Step 1: Fetch all users with pagination
    page = 1
    while True:
        print(f"Fetching user page {page}...")
        response = session.get(
            users_endpoint,
            params={
                "pageNumber": page,  # the Users API paginates with pageNumber
                "pageSize": 100
            }
        )
        
        if response.status_code != 200:
            print(f"Error fetching users: {response.status_code} - {response.text}")
            break
            
        data = response.json()
        entities = data.get("entities", [])
        all_users.extend(entities)
        
        if len(entities) < 100:
            break
            
        page += 1
        # Small delay to be respectful of rate limits, though the session handles 429s
        time.sleep(0.5)

    print(f"Total users fetched: {len(all_users)}")

    # Step 2: Update each user
    # We will simulate a long delay to force token expiration in a test environment
    # In production, the API calls themselves might take time, or the volume might be high.
    
    updated_count = 0
    for user in all_users:
        user_id = user["id"]
        
        # Prepare update payload
        # PATCH requires the user's current version; the attribute below is an
        # illustrative placeholder, so substitute a field your org actually uses
        update_payload = {
            "version": user["version"],
            "customAttributes": {
                "batch_processed_by_python": str(uuid.uuid4())
            }
        }
        
        patch_endpoint = f"{users_endpoint}/{user_id}"
        
        try:
            response = session.patch(
                patch_endpoint,
                json=update_payload
            )
            
            # A successful update returns 200 with the updated user
            if response.status_code in (200, 204):
                updated_count += 1
            else:
                print(f"Failed to update user {user_id}: {response.status_code}")
                
        except requests.exceptions.RequestException as e:
            print(f"Network error updating user {user_id}: {e}")

    print(f"Successfully updated {updated_count} users.")
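
The page-by-page loop in batch_update_users can also be written as a generator, which lets the update phase start before every page has been fetched. This is a minimal sketch under the same stopping rule (a page shorter than the page size is the last one); iter_entities and fetch_page are hypothetical names, and fetch_page stands in for the session.get call plus the extraction of "entities".

```python
from typing import Callable, Dict, Iterator, List


def iter_entities(fetch_page: Callable[[int], List[Dict]],
                  page_size: int = 100) -> Iterator[Dict]:
    """Yield entities page by page until a short (or empty) page is returned."""
    page = 1
    while True:
        entities = fetch_page(page)
        yield from entities
        # A page with fewer items than page_size is the final page
        if len(entities) < page_size:
            return
        page += 1
```

With this helper, the update loop could iterate over iter_entities(...) directly instead of building all_users first, at the cost of interleaving reads and writes.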

Step 3: Simulating Token Expiration (Testing Strategy)

To verify this logic without waiting for a token to expire, force a 401 mid-run. One option is to revoke the current token by calling DELETE /api/v2/tokens/me; another is to configure a short Token Duration on a test OAuth client. For local tests, overwrite auth.access_token with an invalid string partway through the loop (deleting the attribute would raise AttributeError instead of producing a 401), or wrap GenesysAuth so that it hands out a known-bad token after a set number of calls.
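
The same check can be rehearsed without any network access. The sketch below uses an in-memory stand-in for the token and API endpoints; FakeTokenServer and call_with_refresh are hypothetical test helpers, not Genesys Cloud APIs, and they model only the behavior that matters here: an expired token yields 401 until a new one is issued.

```python
from typing import Dict, Optional


class FakeTokenServer:
    """In-memory stand-in for the token and API endpoints (tests only)."""

    def __init__(self) -> None:
        self.issued = 0
        self.valid_token: Optional[str] = None

    def issue_token(self) -> str:
        self.issued += 1
        self.valid_token = f"token-{self.issued}"
        return self.valid_token

    def expire_current_token(self) -> None:
        # Simulates the access token reaching the end of its lifetime
        self.valid_token = None

    def call_api(self, token: Optional[str]) -> int:
        return 200 if token is not None and token == self.valid_token else 401


def call_with_refresh(server: FakeTokenServer, state: Dict) -> int:
    """Call the fake API, obtaining a new token once if the first call returns 401."""
    status = server.call_api(state["token"])
    if status == 401:
        state["token"] = server.issue_token()
        state["refreshes"] += 1
        status = server.call_api(state["token"])
    return status
```

A test would issue a token, call expire_current_token() between two API calls, and assert that the second call still returns 200 with exactly one refresh recorded.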

For production readiness, ensure your logging captures the refresh event.
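
As one possible approach, the print calls in the hook could be replaced with the standard logging module. The sketch below wraps any refresh callable; the logger name and the refresh_with_logging helper are assumptions made for illustration, and the token value itself is deliberately never logged.

```python
import logging
import time
from typing import Callable

logger = logging.getLogger("genesys_batch")  # hypothetical logger name


def refresh_with_logging(refresh: Callable[[], str]) -> str:
    """Run a refresh callable and log the outcome, never the token itself."""
    started = time.monotonic()
    try:
        token = refresh()
    except Exception:
        # logger.exception records the traceback at ERROR level
        logger.exception("token refresh failed")
        raise
    logger.info("token refreshed in %.3fs", time.monotonic() - started)
    return token
```

Inside the hook, the call would look like refresh_with_logging(self.auth_handler.refresh_access_token), with logging.basicConfig(level=logging.INFO) or an equivalent configuration set up once at startup.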

# Example of how to initialize and run the job
def main():
    # Load credentials from environment variables for security
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    # Region base domain, e.g. "mypurecloud.com", "mypurecloud.ie" or "usw2.pure.cloud"
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    
    if not client_id or not client_secret:
        raise ValueError("Missing environment variables for GENESYS_CLIENT_ID or GENESYS_CLIENT_SECRET")

    # 1. Initialize Auth
    auth = GenesysAuth(client_id, client_secret, environment)
    try:
        auth.get_initial_tokens()
    except Exception as e:
        print(f"Initial auth failed: {e}")
        return

    # 2. Initialize Session with the auto-refresh hook
    session = GenesysSession(auth)

    # 3. Run the batch job
    try:
        batch_update_users(session, environment)
    except Exception as e:
        print(f"Batch job failed: {e}")

if __name__ == "__main__":
    main()

Complete Working Example

Below is the consolidated, copy-pasteable script. Save this as genesys_batch_runner.py. Ensure you have installed requests via pip install requests.

import os
import time
import uuid
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class GenesysAuth:
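    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        # environment is the region's base domain, e.g. "mypurecloud.com" or "usw2.pure.cloud"
        self.client_id = client_id
        self.client_secret = client_secret
        # API calls go to the api.* host; tokens are issued by the login.* host
        self.base_url = f"https://api.{environment}"
        self.token_endpoint = f"https://login.{environment}/oauth/token"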
        self.access_token = None
        self.refresh_token = None

    def get_initial_tokens(self) -> dict:
        # A Client Credentials client takes its permissions from its assigned roles,
        # so the request carries no scope parameter.
        payload = {
            "grant_type": "client_credentials"
        }
        auth_header = (self.client_id, self.client_secret)
        
        response = requests.post(
            self.token_endpoint,
            data=payload,
            auth=auth_header,
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        
        if response.status_code != 200:
            raise Exception(f"Failed to obtain initial tokens: {response.text}")
            
        token_data = response.json()
        self.access_token = token_data["access_token"]
        # Client Credentials responses contain no refresh token, so use .get()
        self.refresh_token = token_data.get("refresh_token")
        return token_data

    def refresh_access_token(self) -> str:
        if not self.refresh_token:
            # No refresh token (Client Credentials): request a fresh access token instead
            self.get_initial_tokens()
            return self.access_token

        payload = {
            "grant_type": "refresh_token",
            "refresh_token": self.refresh_token
        }
        auth_header = (self.client_id, self.client_secret)
        
        response = requests.post(
            self.token_endpoint,
            data=payload,
            auth=auth_header,
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        
        if response.status_code != 200:
            raise Exception(f"Token refresh failed: {response.text}")
            
        token_data = response.json()
        self.access_token = token_data["access_token"]
        if "refresh_token" in token_data:
            self.refresh_token = token_data["refresh_token"]
        return self.access_token

class GenesysSession(requests.Session):
    def __init__(self, auth_handler: GenesysAuth):
        super().__init__()
        self.auth_handler = auth_handler
        self.headers.update({
            "Content-Type": "application/json",
            "Accept": "application/json"
        })
        
        self.hooks['response'].append(self._handle_401)
        
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            # Note: retrying non-idempotent methods after a 5xx can repeat a write
            allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "PATCH", "DELETE"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.mount("http://", adapter)
        self.mount("https://", adapter)

    def _handle_401(self, response, *args, **kwargs):
        if response.status_code == 401 and not response.request.headers.get('X-401-Retried'):
            try:
                print("[INFO] Access token expired. Refreshing token...")
                new_token = self.auth_handler.refresh_access_token()
                
                self.headers['Authorization'] = f"Bearer {new_token}"
                response.request.headers['Authorization'] = f"Bearer {new_token}"
                response.request.headers['X-401-Retried'] = 'true'
                
                response.content  # drain the 401 body so the connection is released
                response.close()
                return self.send(response.request, **kwargs)
            except Exception as e:
                print(f"[ERROR] Failed to refresh token: {e}")
                return response
        return response

    def request(self, method, url, **kwargs):
        # Copy the headers so the caller's dict is not mutated (and None is tolerated)
        headers = dict(kwargs.get('headers') or {})
        headers.setdefault('Authorization', f"Bearer {self.auth_handler.access_token}")
        kwargs['headers'] = headers
        return super().request(method, url, **kwargs)

def run_batch_job():
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    # Region base domain, e.g. "mypurecloud.com", "mypurecloud.ie" or "usw2.pure.cloud"
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    
    if not client_id or not client_secret:
        raise ValueError("Missing environment variables for GENESYS_CLIENT_ID or GENESYS_CLIENT_SECRET")

    auth = GenesysAuth(client_id, client_secret, environment)
    auth.get_initial_tokens()
    
    session = GenesysSession(auth)
    base_url = f"https://api.{environment}"
    users_endpoint = f"{base_url}/api/v2/users"
    
    # Fetch first 100 users
    response = session.get(users_endpoint, params={"pageNumber": 1, "pageSize": 100})
    if response.status_code != 200:
        print(f"Failed to fetch users: {response.text}")
        return
        
    users = response.json().get("entities", [])
    print(f"Found {len(users)} users to process.")
    
    # Process users
    for i, user in enumerate(users):
        print(f"Processing user {i+1}/{len(users)}: {user['name']}")
        
        # Simulate work that might take time
        time.sleep(0.2)
        
        # Update a custom attribute; PATCH needs the user's current version.
        # The attribute name is an illustrative placeholder.
        update_payload = {
            "version": user["version"],
            "customAttributes": {
                "last_processed_by_script": str(uuid.uuid4())
            }
        }
        
        patch_resp = session.patch(
            f"{users_endpoint}/{user['id']}",
            json=update_payload
        )
        
        if patch_resp.status_code not in [200, 204]:
            print(f"Warning: Failed to update user {user['id']}. Status: {patch_resp.status_code}")

    print("Batch job completed successfully.")

if __name__ == "__main__":
    run_batch_job()

Common Errors & Debugging

Error: 401 Unauthorized (Persistent)

What causes it: The refresh token itself has expired or been revoked, no refresh token was ever issued, or the client credentials used in the refresh request are incorrect.
How to fix it: Check that a refresh token exists and is still valid. The Client Credentials grant does not issue refresh tokens, and neither does the Implicit grant; only grants such as Authorization Code return one. With Client Credentials, GenesysAuth must call get_initial_tokens() again when a 401 occurs rather than sending a refresh_token request. If the 401 persists after a new token has been issued, verify the client ID, client secret, and region.
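
The choice between the two flows can be reduced to how the token request body is built. The helper below is a hypothetical illustration (token_request_payload is not part of the classes above): it uses the refresh flow when a refresh token exists and otherwise repeats the Client Credentials grant.

```python
from typing import Dict, Optional


def token_request_payload(refresh_token: Optional[str]) -> Dict[str, str]:
    """Build the form body for the token endpoint based on what is available."""
    if refresh_token:
        # A refresh token was issued earlier, so the refresh flow applies
        return {"grant_type": "refresh_token", "refresh_token": refresh_token}
    # No refresh token: repeat the Client Credentials grant
    return {"grant_type": "client_credentials"}
```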

Error: 429 Too Many Requests

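What causes it: You are exceeding the API rate limits that Genesys Cloud applies per token or OAuth client; the exact values are published in the platform's rate limit documentation.
How to fix it: The HTTPAdapter with the Retry strategy in the GenesysSession class retries 429 responses up to three times with exponential backoff, and urllib3 honors a Retry-After header when the server sends one. Once the retries are exhausted, requests raises a RetryError. If you still see failures, reduce the concurrency of your batch job or increase the backoff_factor.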
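
If you handle 429 responses yourself instead of relying on the adapter, the wait time can be derived as follows. This is a simplified sketch, not urllib3's exact algorithm: retry_delay is a hypothetical helper that prefers a numeric Retry-After value and otherwise falls back to capped exponential backoff. It ignores the HTTP-date form of Retry-After.

```python
from typing import Mapping


def retry_delay(headers: Mapping[str, str], attempt: int,
                backoff_factor: float = 1.0, max_delay: float = 60.0) -> float:
    """Seconds to wait before retry number `attempt` (1-based).

    Pass a case-insensitive mapping such as response.headers from requests;
    a plain dict must use the exact key "Retry-After".
    """
    value = headers.get("Retry-After")
    if value is not None and value.strip().isdigit():
        return min(float(value), max_delay)
    # Exponential backoff: factor, 2*factor, 4*factor, ... capped at max_delay
    return min(backoff_factor * (2 ** (attempt - 1)), max_delay)
```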

Error: 403 Forbidden

What causes it: The OAuth token does not carry the scope that the endpoint requires, or the roles assigned to the client or user do not grant the needed permission.
How to fix it: Verify the scopes configured for the OAuth client or, for Client Credentials, the roles assigned to it. For example, updating users requires the users scope, and reading analytics requires analytics:readonly.
