Token refresh logic — my access token expires mid-batch and the job fails

What You Will Build

A production-grade batch processor that automatically handles OAuth token expiration without interrupting long-running API calls. This tutorial uses the Genesys Cloud CX REST API and Python. The code implements proactive expiration tracking, reactive 401 fallback handling, exponential backoff for rate limits, and stateful pagination.

Prerequisites

  • OAuth 2.0 Client Credentials grant type configured in Genesys Cloud
  • Required scopes: oauth:client_credentials for token issuance, user:read for batch data retrieval
  • Genesys Cloud CX API v2
  • Python 3.10 or higher (the type hints use the dict | None union syntax, which fails at import time on 3.9)
  • External dependencies: requests>=2.31.0
  • Base environment URL format: https://{your-org}.mypurecloud.com

Authentication Setup

The Client Credentials flow exchanges a client identifier and secret for a bearer token. The API returns an access_token and an expires_in value measured in seconds. You must store both values and calculate the absolute expiration timestamp. If you keep only expires_in without recording when the token was issued, you cannot tell how much lifetime remains partway through a long batch run.
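As a minimal sketch of that conversion, the relative lifetime can be pinned to the moment the token response arrived. The helper name absolute_expiry is hypothetical and not part of any Genesys SDK.

```python
from datetime import datetime, timedelta, timezone

def absolute_expiry(expires_in: int, issued_at: datetime) -> datetime:
    """Convert a relative expires_in (seconds) into an absolute UTC expiry."""
    return issued_at.astimezone(timezone.utc) + timedelta(seconds=expires_in)

# Capture issued_at as close to the token response as possible.
issued_at = datetime(2024, 5, 14, 6, 0, 0, tzinfo=timezone.utc)
expires_at = absolute_expiry(3600, issued_at)
print(expires_at.isoformat())  # 2024-05-14T07:00:00+00:00
```

Storing the absolute value means every later check is a plain datetime comparison, no matter how long the batch has been running.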

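The following code demonstrates the initial token acquisition. It parses the JWT payload directly to extract the exp claim, which avoids a dependency on third-party JWT libraries. This assumes the access token is a JWT, and the decode does not verify the signature, so treat the claim as a scheduling hint only. Because exp is an absolute server-side timestamp, comparing it with your local clock is still subject to clock skew; the refresh margin described later absorbs that difference.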

import requests
import base64
import json
from datetime import datetime, timezone

BASE_URL = "https://myorg.mypurecloud.com"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
REQUIRED_SCOPE = "user:read"

def fetch_initial_token() -> dict:
    """
    Exchanges client credentials for an OAuth2 access token.
    Required scope: oauth:client_credentials (implicit in client registration)
    """
    url = f"{BASE_URL}/api/v2/oauth/token"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "application/json"
    }
    payload = {
        "grant_type": "client_credentials",
        "scope": REQUIRED_SCOPE
    }
    
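    # Client credentials are sent with HTTP Basic authentication
    response = requests.post(
        url, headers=headers, data=payload,
        auth=(CLIENT_ID, CLIENT_SECRET), timeout=30
    )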
    response.raise_for_status()
    
    token_data = response.json()
    access_token = token_data["access_token"]
    expires_in = token_data["expires_in"]
    
    # Decode JWT payload to extract absolute expiration timestamp
    parts = access_token.split(".")
    payload_segment = parts[1]
    # Add padding if missing
    padding = 4 - (len(payload_segment) % 4)
    if padding != 4:
        payload_segment += "=" * padding
    decoded_payload = json.loads(base64.urlsafe_b64decode(payload_segment))
    expires_at = datetime.fromtimestamp(decoded_payload["exp"], tz=timezone.utc)
    
    return {
        "token": access_token,
        "expires_at": expires_at,
        "scope": REQUIRED_SCOPE
    }
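The decode above raises an exception if the token is opaque rather than a JWT. A defensive variant, sketched here under the assumption that you would then fall back to expires_in, returns None instead. The function name decode_exp_or_none is illustrative.

```python
import base64
import json
from datetime import datetime, timezone
from typing import Optional

def decode_exp_or_none(access_token: str) -> Optional[datetime]:
    """Return the exp claim as a UTC datetime, or None if the token is not a decodable JWT."""
    parts = access_token.split(".")
    if len(parts) != 3:
        return None  # opaque token, not header.payload.signature
    segment = parts[1] + "=" * (-len(parts[1]) % 4)  # restore stripped base64 padding
    try:
        claims = json.loads(base64.urlsafe_b64decode(segment))
        return datetime.fromtimestamp(claims["exp"], tz=timezone.utc)
    except (ValueError, KeyError, TypeError, OverflowError, OSError):
        return None  # malformed payload or missing/invalid exp claim

# Build an unsigned sample token purely for demonstration.
sample_payload = base64.urlsafe_b64encode(json.dumps({"exp": 1715672000}).encode()).rstrip(b"=").decode()
print(decode_exp_or_none(f"header.{sample_payload}.signature"))
print(decode_exp_or_none("opaque-token-value"))
```

When the function returns None, the caller can compute the expiry from expires_in and the time the response was received.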

HTTP Request Cycle for Token Acquisition

POST /api/v2/oauth/token HTTP/1.1
Host: myorg.mypurecloud.com
Authorization: Basic <base64(client_id:client_secret)>
Content-Type: application/x-www-form-urlencoded
Accept: application/json

grant_type=client_credentials&scope=user:read

Realistic HTTP Response

HTTP/1.1 200 OK
Content-Type: application/json
X-Request-Id: a1b2c3d4-e5f6-7890-abcd-ef1234567890

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwic2NvcGUiOiJ1c2VyOnJlYWQiLCJleHAiOjE3MTU2NzIwMDB9.signature",
  "token_type": "Bearer",
  "expires_in": 3600,
  "scope": "user:read"
}

The token manager must cache this response and serve it until expiration approaches. You should refresh the token before it expires to avoid mid-request authentication failures. A safety margin of 120 seconds is a reasonable starting point; tune it to your longest request duration. This margin accounts for network latency, server-side validation overhead, and clock skew between your application server and the Genesys Cloud infrastructure.
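The margin check itself is a single comparison. A minimal sketch, where needs_refresh and REFRESH_MARGIN are illustrative names:

```python
from datetime import datetime, timedelta, timezone

REFRESH_MARGIN = timedelta(seconds=120)

def needs_refresh(expires_at: datetime, now: datetime, margin: timedelta = REFRESH_MARGIN) -> bool:
    """True once the token is inside the safety margin before its expiry."""
    return now + margin >= expires_at

expires_at = datetime(2024, 5, 14, 7, 0, 0, tzinfo=timezone.utc)
print(needs_refresh(expires_at, datetime(2024, 5, 14, 6, 57, 0, tzinfo=timezone.utc)))   # False: 180 s left
print(needs_refresh(expires_at, datetime(2024, 5, 14, 6, 58, 30, tzinfo=timezone.utc)))  # True: 90 s left
```

Passing now as an argument keeps the check deterministic and easy to unit test.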

Implementation

Step 1: Build a Thread-Safe Token Manager

Batch processors often run in multi-threaded environments or async event loops. Your token manager must prevent concurrent refresh calls. If two threads detect expiration simultaneously, both will request new tokens, causing redundant network calls and potential scope mismatches. Python threading locks solve this problem efficiently.

import base64
import json
import threading
import time
from datetime import datetime, timezone, timedelta

import requests

class TokenManager:
    def __init__(self, base_url: str, client_id: str, client_secret: str, scope: str):
        self.base_url = base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.token_cache: dict = {}
        self.lock = threading.Lock()
        self.refresh_margin = timedelta(seconds=120)
        
    def _decode_jwt_exp(self, token: str) -> datetime:
        parts = token.split(".")
        payload_segment = parts[1]
        padding = 4 - (len(payload_segment) % 4)
        if padding != 4:
            payload_segment += "=" * padding
        decoded = json.loads(base64.urlsafe_b64decode(payload_segment))
        return datetime.fromtimestamp(decoded["exp"], tz=timezone.utc)
        
    def _refresh_token(self) -> dict:
        url = f"{self.base_url}/api/v2/oauth/token"
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json"
        }
        payload = {
            "grant_type": "client_credentials",
            "scope": self.scope
        }
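        # Client credentials are sent with HTTP Basic authentication
        response = requests.post(
            url, headers=headers, data=payload,
            auth=(self.client_id, self.client_secret), timeout=30
        )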
        
        if response.status_code == 401:
            raise RuntimeError("Invalid client credentials. Verify CLIENT_ID and CLIENT_SECRET.")
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 5))
            time.sleep(retry_after)
            return self._refresh_token()
            
        response.raise_for_status()
        data = response.json()
        return {
            "token": data["access_token"],
            "expires_at": self._decode_jwt_exp(data["access_token"])
        }
        
    def get_valid_token(self) -> str:
        # Read the clock and the cache while holding the lock so that a
        # concurrent invalidation cannot empty the cache between check and return.
        with self.lock:
            now = datetime.now(timezone.utc)
            if not self.token_cache or now + self.refresh_margin >= self.token_cache["expires_at"]:
                self.token_cache = self._refresh_token()
            return self.token_cache["token"]

This manager guarantees that only one thread executes _refresh_token at any given moment. It checks the absolute expiration time against the current UTC time plus the safety margin. If the token falls within the margin, it issues a fresh token. The lock ensures cache consistency across concurrent batch workers.
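To see the single-refresh behavior without calling a real API, the sketch below swaps the network call for a counting stub. LockedTokenCache and fake_fetch are hypothetical names used only for this demonstration.

```python
import threading
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional, Tuple

class LockedTokenCache:
    """Caches one token and serializes refreshes behind a lock."""

    def __init__(self, fetch: Callable[[], Tuple[str, datetime]], margin: timedelta):
        self._fetch = fetch
        self._margin = margin
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expires_at: Optional[datetime] = None

    def get(self) -> str:
        with self._lock:
            now = datetime.now(timezone.utc)
            if self._token is None or now + self._margin >= self._expires_at:
                self._token, self._expires_at = self._fetch()
            return self._token

fetch_calls = 0

def fake_fetch() -> Tuple[str, datetime]:
    # Stand-in for the token request; only ever runs while the cache lock is held.
    global fetch_calls
    fetch_calls += 1
    return f"token-{fetch_calls}", datetime.now(timezone.utc) + timedelta(hours=1)

cache = LockedTokenCache(fake_fetch, margin=timedelta(seconds=120))
results = []
threads = [threading.Thread(target=lambda: results.append(cache.get())) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(fetch_calls, set(results))
```

All eight workers receive the same token and the stub runs once, because the first thread to take the lock populates the cache before the others check it.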

Step 2: Implement Proactive Expiration Checks

Proactive checks prevent most 401 errors. You call get_valid_token() immediately before constructing the HTTP headers for each batch request. This pattern shifts authentication overhead from the request lifecycle to the preparation phase. In the common case the API receives a valid token on the first attempt, which avoids retry loops and reduces latency.

def build_request_headers(token_manager: TokenManager) -> dict:
    token = token_manager.get_valid_token()
    return {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }

HTTP Request Cycle for Batch Data Retrieval

GET /api/v2/users?pageSize=100 HTTP/1.1
Host: myorg.mypurecloud.com
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
Accept: application/json

Realistic HTTP Response

HTTP/1.1 200 OK
Content-Type: application/json
X-Request-Id: b2c3d4e5-f6a7-8901-bcde-f02345678901
Link: </api/v2/users?pageSize=100&pageToken=eyJwYWdlIjoxfQ>; rel="next"

{
  "pageSize": 100,
  "pageNumber": 1,
  "total": 2450,
  "first": "/api/v2/users?pageSize=100",
  "last": "/api/v2/users?pageSize=100&pageToken=eyJwYWdlIjoxNX0",
  "next": "/api/v2/users?pageSize=100&pageToken=eyJwYWdlIjoyfQ",
  "entities": [
    {
      "id": "a1b2c3d4-1111-2222-3333-444444444444",
      "name": "Jane Smith",
      "email": "jane.smith@example.com",
      "active": true
    }
  ]
}

The Link header and the next property in the response body provide pagination state. You must extract the next URL or pageToken to continue the batch. Storing pagination state separately from token state prevents data loss when refreshing credentials.
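The next value in the sample response is a relative path, so it has to be resolved against the base URL before it can be requested. A sketch of both extraction options using only the standard library follows; resolve_next_url and extract_page_token are illustrative helper names, and the response shape is assumed to match the sample above.

```python
from typing import Optional
from urllib.parse import parse_qs, urljoin, urlparse

def resolve_next_url(base_url: str, page: dict) -> Optional[str]:
    """Turn the relative "next" path of a page into an absolute URL, or None on the last page."""
    next_path = page.get("next")
    if not next_path:
        return None
    return urljoin(base_url, next_path)

def extract_page_token(next_url: str) -> Optional[str]:
    """Pull the pageToken query parameter out of a next URL, if present."""
    values = parse_qs(urlparse(next_url).query).get("pageToken")
    return values[0] if values else None

page = {"next": "/api/v2/users?pageSize=100&pageToken=eyJwYWdlIjoyfQ"}
next_url = resolve_next_url("https://myorg.mypurecloud.com", page)
print(next_url)
print(extract_page_token(next_url))
```

Persisting either value (for example to a file or a job table) lets a restarted batch resume from the last completed page.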

Step 3: Add Reactive 401 Fallback and 429 Retry Logic

Proactive checks cover most scenarios, but network timeouts, server-side token revocation, or clock desynchronization can still produce 401 responses. Your batch processor must catch 401 errors, force an immediate token refresh, and retry the failed request exactly once. Retrying more than once after a 401 indicates a scope mismatch or invalid credentials, which requires human intervention.

Rate limiting returns 429 Too Many Requests. Genesys Cloud enforces tiered rate limits per tenant and per endpoint. You must implement exponential backoff with jitter. Linear backoff creates thundering herd problems when multiple workers resume simultaneously. Jitter spreads retry attempts across time, preventing cascading failures.

import random

def execute_api_request(
    token_manager: TokenManager,
    method: str,
    url: str,
    headers: dict | None = None,
    payload: dict | None = None,
    max_retries: int = 3
) -> requests.Response:
    backoff_base = 2
    auth_retry_used = False  # a 401 triggers at most one forced refresh
    last_error = "no response received"

    for attempt in range(max_retries + 1):
        # Inject fresh token before each attempt
        request_headers = {**(headers or {}), "Authorization": f"Bearer {token_manager.get_valid_token()}"}

        try:
            response = requests.request(
                method.upper(), url, headers=request_headers,
                json=payload if method.upper() != "GET" else None, timeout=30
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            # Transient network failure: back off and retry, or give up on the last attempt
            print(f"Network error on attempt {attempt + 1}: {e}")
            last_error = str(e)
            if attempt == max_retries:
                raise
            time.sleep(backoff_base ** attempt)
            continue

        # Success: any non-error status, not only 200
        if response.ok:
            return response
        last_error = f"HTTP {response.status_code}"

        # Reactive 401 handling: refresh token once and retry
        if response.status_code == 401 and not auth_retry_used:
            auth_retry_used = True
            with token_manager.lock:
                token_manager.token_cache = {}  # Force cache invalidation
            continue

        # 429 Rate Limit handling: honor Retry-After, else exponential backoff, plus jitter
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", backoff_base ** attempt))
            jitter = random.uniform(0, 0.5 * retry_after)
            time.sleep(retry_after + jitter)
            continue

        # 5xx Server errors: retry with backoff
        if response.status_code >= 500:
            time.sleep(backoff_base ** attempt + random.uniform(0, 1))
            continue

        # Other 4xx errors, including a second 401: fail immediately.
        # This call sits outside the try block so the HTTPError is not retried.
        response.raise_for_status()

    raise RuntimeError(f"Request failed after {max_retries} retries. Last error: {last_error}")

This function centralizes retry logic. It forces token cache invalidation on 401, applies exponential backoff with jitter on 429, and handles transient network failures. The max_retries parameter prevents infinite loops. You call this function for every paginated request in the batch processor.
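The delay calculation can be isolated into a small pure function, which makes the backoff curve easy to inspect and test. This is a sketch, not the tutorial's exact logic: the cap parameter and the backoff_delay name are assumptions added for illustration.

```python
import random
from typing import Optional

def backoff_delay(
    attempt: int,
    base: float = 2.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Exponential delay (capped) plus up to 50 percent random jitter."""
    source = rng or random
    exponential = min(cap, base ** attempt)
    return exponential + source.uniform(0, 0.5 * exponential)

rng = random.Random(0)  # seeded only so the demonstration is repeatable
for attempt in range(5):
    print(attempt, round(backoff_delay(attempt, rng=rng), 2))
```

Each delay falls between the exponential value and 1.5 times that value, so workers that fail together do not all retry at the same instant.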

Step 4: Process Paginated Batch Data

Pagination state must persist across token refreshes. You store the next URL or pageToken in a loop variable. The batch completes when the API returns an empty next field. The following processor combines token management, retry logic, and pagination state into a single execution loop.

def process_user_batch(token_manager: TokenManager) -> list[dict]:
    all_users: list[dict] = []
    next_url = f"{token_manager.base_url}/api/v2/users?pageSize=100"
    
    while next_url:
        try:
            response = execute_api_request(
                token_manager=token_manager,
                method="GET",
                url=next_url
            )
            data = response.json()
            
            # Append current page entities
            entities = data.get("entities", [])
            all_users.extend(entities)
            
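            # Update pagination state; "next" is a relative path, so resolve it against the base URL
            next_path = data.get("next")
            next_url = f"{token_manager.base_url}{next_path}" if next_path else None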
            processed_count = len(all_users)
            total_count = data.get("total", 0)
            print(f"Processed {processed_count} of {total_count} users.")
            
        except Exception as e:
            print(f"Batch processing failed: {e}")
            raise
            
    return all_users

The loop terminates when next_url becomes None or an empty string. Because pagination state lives in next_url rather than in the token, a refresh between pages does not lose your position. Note that process_user_batch accumulates every page in all_users and returns the full list, so the entire dataset is held in memory. For very large result sets, convert the function into a generator that yields entities as each page arrives.
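One way to stream results to downstream systems without holding the full dataset in memory is a generator. The sketch below takes the page-fetching step as an injected function so it runs without network access; iter_entities and the fake two-page data are illustrative, and in practice fetch_page would wrap execute_api_request and return response.json().

```python
from typing import Callable, Iterator, Optional

def iter_entities(fetch_page: Callable[[str], dict], first_url: str) -> Iterator[dict]:
    """Yield entities page by page, following the "next" field until it is empty."""
    next_url: Optional[str] = first_url
    while next_url:
        page = fetch_page(next_url)
        yield from page.get("entities", [])
        next_url = page.get("next") or None

# Fake two-page API used only for this demonstration.
fake_pages = {
    "/users?page=1": {"entities": [{"id": "u1"}, {"id": "u2"}], "next": "/users?page=2"},
    "/users?page=2": {"entities": [{"id": "u3"}], "next": None},
}
ids = [user["id"] for user in iter_entities(fake_pages.__getitem__, "/users?page=1")]
print(ids)  # ['u1', 'u2', 'u3']
```

The caller controls memory use: iterating with a for loop processes one entity at a time, while wrapping the call in list() reproduces the accumulate-everything behavior.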

Complete Working Example

The following script combines all components into a single runnable module. Replace the placeholder credentials and environment URL before execution.

import requests
import base64
import json
import time
import random
import threading
from datetime import datetime, timezone, timedelta

BASE_URL = "https://myorg.mypurecloud.com"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
REQUIRED_SCOPE = "user:read"

class TokenManager:
    def __init__(self, base_url: str, client_id: str, client_secret: str, scope: str):
        self.base_url = base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.token_cache: dict = {}
        self.lock = threading.Lock()
        self.refresh_margin = timedelta(seconds=120)
        
    def _decode_jwt_exp(self, token: str) -> datetime:
        parts = token.split(".")
        payload_segment = parts[1]
        padding = 4 - (len(payload_segment) % 4)
        if padding != 4:
            payload_segment += "=" * padding
        decoded = json.loads(base64.urlsafe_b64decode(payload_segment))
        return datetime.fromtimestamp(decoded["exp"], tz=timezone.utc)
        
    def _refresh_token(self) -> dict:
        url = f"{self.base_url}/api/v2/oauth/token"
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json"
        }
        payload = {
            "grant_type": "client_credentials",
            "scope": self.scope
        }
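        # Client credentials are sent with HTTP Basic authentication
        response = requests.post(
            url, headers=headers, data=payload,
            auth=(self.client_id, self.client_secret), timeout=30
        )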
        
        if response.status_code == 401:
            raise RuntimeError("Invalid client credentials. Verify CLIENT_ID and CLIENT_SECRET.")
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 5))
            time.sleep(retry_after)
            return self._refresh_token()
            
        response.raise_for_status()
        data = response.json()
        return {
            "token": data["access_token"],
            "expires_at": self._decode_jwt_exp(data["access_token"])
        }
        
    def get_valid_token(self) -> str:
        # Read the clock and the cache while holding the lock so that a
        # concurrent invalidation cannot empty the cache between check and return.
        with self.lock:
            now = datetime.now(timezone.utc)
            if not self.token_cache or now + self.refresh_margin >= self.token_cache["expires_at"]:
                self.token_cache = self._refresh_token()
            return self.token_cache["token"]

def execute_api_request(
    token_manager: TokenManager,
    method: str,
    url: str,
    headers: dict | None = None,
    payload: dict | None = None,
    max_retries: int = 3
) -> requests.Response:
    backoff_base = 2
    auth_retry_used = False  # a 401 triggers at most one forced refresh
    last_error = "no response received"

    for attempt in range(max_retries + 1):
        request_headers = {**(headers or {}), "Authorization": f"Bearer {token_manager.get_valid_token()}"}

        try:
            response = requests.request(
                method.upper(), url, headers=request_headers,
                json=payload if method.upper() != "GET" else None, timeout=30
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            # Transient network failure: back off and retry, or give up on the last attempt
            print(f"Network error on attempt {attempt + 1}: {e}")
            last_error = str(e)
            if attempt == max_retries:
                raise
            time.sleep(backoff_base ** attempt)
            continue

        # Success: any non-error status, not only 200
        if response.ok:
            return response
        last_error = f"HTTP {response.status_code}"

        # Reactive 401 handling: refresh token once and retry
        if response.status_code == 401 and not auth_retry_used:
            auth_retry_used = True
            with token_manager.lock:
                token_manager.token_cache = {}
            continue

        # 429: honor Retry-After, else exponential backoff, plus jitter
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", backoff_base ** attempt))
            jitter = random.uniform(0, 0.5 * retry_after)
            time.sleep(retry_after + jitter)
            continue

        # 5xx: retry with backoff
        if response.status_code >= 500:
            time.sleep(backoff_base ** attempt + random.uniform(0, 1))
            continue

        # Other 4xx errors, including a second 401: fail immediately
        response.raise_for_status()

    raise RuntimeError(f"Request failed after {max_retries} retries. Last error: {last_error}")

def process_user_batch(token_manager: TokenManager) -> list[dict]:
    all_users: list[dict] = []
    next_url = f"{token_manager.base_url}/api/v2/users?pageSize=100"
    
    while next_url:
        try:
            response = execute_api_request(
                token_manager=token_manager,
                method="GET",
                url=next_url
            )
            data = response.json()
            
            entities = data.get("entities", [])
            all_users.extend(entities)
            
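            # "next" is a relative path, so resolve it against the base URL
            next_path = data.get("next")
            next_url = f"{token_manager.base_url}{next_path}" if next_path else None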
            processed_count = len(all_users)
            total_count = data.get("total", 0)
            print(f"Processed {processed_count} of {total_count} users.")
            
        except Exception as e:
            print(f"Batch processing failed: {e}")
            raise
            
    return all_users

if __name__ == "__main__":
    manager = TokenManager(BASE_URL, CLIENT_ID, CLIENT_SECRET, REQUIRED_SCOPE)
    users = process_user_batch(manager)
    print(f"Batch complete. Total users retrieved: {len(users)}")

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The access token expired, was revoked, or lacks the required scope. Client credentials may be incorrect.
  • Fix: Verify CLIENT_ID and CLIENT_SECRET match a registered OAuth client. Ensure the client has oauth:client_credentials enabled. Check that the requested scope matches the endpoint requirement. The reactive 401 handler in execute_api_request automatically refreshes the token and retries once. If the error persists after refresh, the credentials are invalid or the scope is missing.
  • Code showing the fix: The token_manager.token_cache = {} line forces immediate cache invalidation. The subsequent loop iteration calls get_valid_token(), which triggers _refresh_token() and issues a new bearer token.

Error: 403 Forbidden

  • Cause: The OAuth client lacks the required scope for the target endpoint. Scopes are enforced at the API gateway level.
  • Fix: Navigate to the Genesys Cloud admin console, open the OAuth client configuration, and add the missing scope. For user listing, user:read is mandatory. For analytics queries, analytics:read is required. Scope changes propagate within 60 seconds. Restart the batch job after updating scopes.
  • Code showing the fix: Update the REQUIRED_SCOPE variable to match the endpoint documentation. The token manager automatically requests the updated scope during _refresh_token().

Error: 429 Too Many Requests

  • Cause: The tenant exceeded the rate limit for the specific endpoint or global API quota. Genesys Cloud returns a Retry-After header indicating the cooldown period.
  • Fix: Implement exponential backoff with jitter. The provided execute_api_request function reads Retry-After, applies a random jitter between 0 and 50 percent of the base delay, and sleeps before retrying. This pattern prevents synchronized retry storms across multiple workers.
  • Code showing the fix: The 429 handling block calculates retry_after from headers, applies random.uniform(0, 0.5 * retry_after) for jitter, and sleeps. The loop increments current_retries and resumes the request cycle.
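HTTP allows Retry-After to carry either a number of seconds or an HTTP date, and int() fails on the date form. A tolerant parser could look like the following sketch; parse_retry_after and its default value are assumptions for illustration, and which form Genesys Cloud sends is not confirmed here.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

def parse_retry_after(value: str, now: datetime, default: float = 5.0) -> float:
    """Return the wait in seconds for a Retry-After header in either allowed form."""
    value = value.strip()
    if value.isdigit():
        return float(value)  # delay-seconds form, e.g. "120"
    try:
        retry_at = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return default  # unparseable header: fall back to a fixed delay
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    return max(0.0, (retry_at - now).total_seconds())

now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(parse_retry_after("120", now))
print(parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now))
print(parse_retry_after("not-a-valid-value", now))
```

The returned value can replace the int(...) conversion before jitter is added.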

Error: 503 Service Unavailable

  • Cause: Genesys Cloud infrastructure is undergoing maintenance or experiencing transient overload. The API returns 503 to shed load.
  • Fix: Retry with exponential backoff. The 5xx handler in execute_api_request applies backoff_base ** current_retries plus random jitter. Most 503 errors resolve within three retry attempts. If the error persists beyond five minutes, check the Genesys Cloud status page for scheduled maintenance windows.

Official References

Make sure you implement a robust token refresh mechanism. The documentation states: “Access tokens are valid for one hour.” Relying on a static token for a 15-minute batch process is risky if the token was issued just before the job started, or if there are network hiccups causing silent failures.

In Python, I usually wrap the API client in a context manager that checks the token expiration. If the remaining time is less than five minutes, it triggers a refresh via the oauthClient.refreshToken() method. Here is a simplified FastAPI-style utility I use:

from datetime import datetime, timedelta

def ensure_valid_token(platform_client):
    expires_at = platform_client.oauth_client.get_token_expiry()
    if expires_at and expires_at < datetime.now() + timedelta(minutes=5):
        platform_client.oauth_client.refresh_token()
    return platform_client.oauth_client.access_token

# Usage in loop
for conv_id in batch:
    ensure_valid_token(gc_client)
    transcript = gc_client.analytics.get_analytics_conversations_details(...)

The .NET SDK likely has similar introspection methods on the OAuthClient object. Check the AccessToken.ExpiresIn property before each API call.

This looks like a solid approach, but you should also handle the 401 close code explicitly in your WebSocket handler to prevent reconnection loops during token rotation. Note: Ensure your token refresh completes successfully before initiating the next request to avoid race conditions.

Check your Terraform state drift before blaming the SDK logic. Error: 403 Forbidden. Message: Token refresh failed or scope mismatch.

The suggestion above about handling 401s is correct, but if your infrastructure is defined via CX-as-Code, a drifted client secret in Terraform state will cause immediate 403s on refresh attempts, even if the .NET code is perfect. I ran into this when switching from client_credentials to authorization_code. My Terraform plan showed no changes, but the actual Genesys Cloud application credentials had been rotated manually by an admin.

Here is how I verify and fix this in my pipeline:

  • Force refresh the provider state: Run terraform refresh to ensure the local state matches the remote Genesys Cloud API reality. If the genesyscloud_oauth_client resource shows a diff, apply it immediately.
  • Verify OAuth Scopes: Ensure the client credentials used in your Azure Function have the analytics:conversation:read scope. The default scope often misses this.
  • Check Token Expiry in Code: In C#, do not rely on the SDK to auto-refresh silently without catching exceptions. Wrap your batch loop in a try-catch that specifically looks for UnauthorizedException.

try {
    var conversations = await client.AnalyticsApi.PostAnalyticsConversationsDetailsQuery(query);
} catch (UnauthorizedException ex) {
    Console.WriteLine("Token expired or invalid. Refreshing...");
    // Trigger your refresh logic here
    throw; // Or retry after refresh
}

I usually solve this by ensuring the endpoint and method align with the Analytics API contract, as the 403 suggests a mismatch in resource identification or query structure. The suggestion above correctly identifies the need for a refresh mechanism, but it fails if the underlying credentials in Terraform are stale. Always check your terraform plan output for any hidden diffs in genesyscloud_oauth_client resources before deploying your .NET function.

You must isolate the credential rotation from the request pipeline to prevent race conditions in long-running batches. The suggestion above regarding 401 handling is correct, but in Java-based orchestration with PureCloudPlatformClientV2, we typically implement an interceptor that checks token expiry proactively. If the remaining validity is under 300 seconds, we trigger a refresh via the oauthClient before the next API call. This avoids the 401 entirely and keeps the MuleSoft flow clean.

// Check expiry before calling Genesys Cloud APIs
if (platformClient.getAuthClient().getAccessToken().getExpiresIn() < 300) {
    platformClient.getAuthClient().refreshToken();
}
// Proceed with API call

Relying on post-failure retries introduces latency spikes. Your .NET implementation should mirror this proactive check. Ensure your Azure Function holds the conversation:read scope explicitly, as silent scope degradation can cause refresh failures that look like network errors. Verify the client secret in your key vault matches the one registered in Genesys Cloud.