Implementing Robust Token Refresh Logic for Long-Running Batch Jobs
What You Will Build
- A production-grade Python utility that automatically handles OAuth2 access token expiration during long-running data export or import operations.
- A retry mechanism that intercepts 401 Unauthorized responses, refreshes the token using the client credentials flow, and resumes the batch job without data loss.
- The implementation uses the requests library with a custom session wrapper and demonstrates the pattern using Genesys Cloud CX and NICE CXone APIs.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Grant). This grant does not normally issue a refresh token; the job re-authenticates with the client ID and secret whenever it needs a new access token.
- Required Scopes: analytics:conversation:read, user:read, or any scope relevant to your batch operation.
- SDK/API Version: Genesys Cloud API v2, NICE CXone API v2.
- Language/Runtime: Python 3.9+.
- External Dependencies: requests>=2.31.0, pydantic>=2.0 (for configuration validation).
Authentication Setup
The core problem in batch processing is that OAuth2 access tokens are short-lived. This guide assumes a lifetime of 2 hours (7200 seconds); the actual value is set by the provider and reported in the expires_in field of the token response. If your batch job processes 10,000 records and takes 3 hours, the token will expire mid-stream. The naive approach is to fetch a new token on every request, which adds unnecessary latency. The better approach is to cache the token and refresh it only when it is nearing expiration or when the API returns a 401 Unauthorized.
We will implement a TokenManager class that holds the client credentials and the current access token. It exposes a get_valid_token() method that returns the cached token, first fetching a new one if the cache is empty or the token has reached its refresh threshold.
import time
import requests
from typing import Optional, Dict, Any


class TokenManager:
    """
    Manages OAuth2 token lifecycle for Genesys Cloud CX or NICE CXone.
    Supports Client Credentials Grant.
    """

    def __init__(self, env_url: str, client_id: str, client_secret: str, scope: str):
        self.env_url = env_url.rstrip('/')
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0
        self.token_type: str = "Bearer"

    def _get_token_endpoint(self) -> str:
        """
        Returns the OAuth token endpoint based on the environment URL.
        Genesys Cloud serves /oauth/token from the login.<region> host,
        so an api.<region> base URL is rewritten to login.<region>.
        NICE CXone is assumed to use /oauth2/token on the given host;
        confirm the path against your tenant's documentation.
        """
        cxone_markers = ("niceincontact", "nice.incontact", "nicecxone")
        if any(marker in self.env_url for marker in cxone_markers):
            return f"{self.env_url}/oauth2/token"
        # Default fallback: treat the environment as Genesys Cloud
        login_url = self.env_url.replace("://api.", "://login.", 1)
        return f"{login_url}/oauth/token"

    def _fetch_token(self) -> Dict[str, Any]:
        """
        Fetches a new access token using Client Credentials Grant.
        """
        token_url = self._get_token_endpoint()
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": self.scope
        }
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }
        # A timeout keeps a stalled token request from hanging the whole job
        response = requests.post(token_url, data=payload, headers=headers, timeout=30)
        if response.status_code != 200:
            raise RuntimeError(f"Failed to fetch token: {response.status_code} - {response.text}")
        data = response.json()
        if not data.get("access_token"):
            raise RuntimeError("Token response did not contain an access_token")
        self.access_token = data["access_token"]
        self.token_type = data.get("token_type", "Bearer")
        # expires_in is the token lifetime in seconds; fall back to 7200 if it is missing
        expires_in = float(data.get("expires_in", 7200))
        self.token_expiry = time.time() + expires_in - 300  # Refresh 5 minutes early
        return data

    def get_valid_token(self) -> str:
        """
        Returns the current access token. If expired or None, fetches a new one.
        """
        if not self.access_token or time.time() >= self.token_expiry:
            self._fetch_token()
        return self.access_token
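The caching rule can be exercised without any network access. The following is a minimal sketch rather than part of the TokenManager: CachedToken, fake_fetch, and the injected clock are hypothetical names introduced only for illustration, and the fetch callable stands in for the real token request.

```python
import time
from typing import Callable, List, Optional, Tuple


class CachedToken:
    """Hypothetical helper that mirrors the cache-until-near-expiry rule."""

    def __init__(self, fetch: Callable[[], Tuple[str, float]],
                 buffer_seconds: float = 300.0,
                 clock: Callable[[], float] = time.time):
        self._fetch = fetch            # returns (access_token, expires_in_seconds)
        self._buffer = buffer_seconds  # refresh this many seconds before expiry
        self._clock = clock            # injectable so the example can fake time
        self._token: Optional[str] = None
        self._refresh_at = 0.0
        self.fetch_count = 0

    def get(self) -> str:
        # Fetch only when nothing is cached or the refresh threshold has passed
        if self._token is None or self._clock() >= self._refresh_at:
            token, expires_in = self._fetch()
            self.fetch_count += 1
            self._token = token
            self._refresh_at = self._clock() + expires_in - self._buffer
        return self._token


# Simulated clock and token endpoint (assumed 7200-second lifetime)
now = [0.0]
issued: List[str] = []


def fake_fetch() -> Tuple[str, float]:
    issued.append(f"token-{len(issued) + 1}")
    return issued[-1], 7200.0


cache = CachedToken(fake_fetch, clock=lambda: now[0])
first = cache.get()    # cache is empty, so this fetches
now[0] = 6000.0
second = cache.get()   # before the threshold (7200 - 300 = 6900), served from cache
now[0] = 6900.0
third = cache.get()    # threshold reached, so this fetches again
```

Injecting the clock keeps the check deterministic, which makes the buffer arithmetic easy to verify before the job ever talks to a real token endpoint.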
Implementation
Step 1: Creating a Resilient HTTP Session
We need a mechanism to intercept HTTP requests, attach the authentication header, and handle 401 errors. We will subclass requests.Session to override the request method. This allows us to transparently refresh the token and retry the request without changing the calling code.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class AuthenticatedSession(requests.Session):
    """
    A requests Session that automatically handles token refresh on 401 errors.
    """

    def __init__(self, token_manager: TokenManager, max_retries: int = 3):
        super().__init__()
        self.token_manager = token_manager
        self.max_retries = max_retries
        # Configure retry strategy for non-auth errors (e.g., 503, 429)
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"],
            # Hand back the last response once retries are exhausted instead of
            # raising RetryError, so callers receive an HTTPError with a status code
            raise_on_status=False,
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.mount("https://", adapter)
        self.mount("http://", adapter)

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        """
        Overrides the request method to inject auth header and handle 401 retries.
        """
        # Ensure we have a valid token before sending
        token = self.token_manager.get_valid_token()
        # Copy the caller's headers (which may be None) so their dict is not mutated
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Authorization"] = f"{self.token_manager.token_type} {token}"
        # First attempt
        response = super().request(method, url, headers=headers, **kwargs)
        # If 401 Unauthorized, force one token refresh and retry once.
        # This covers tokens that were revoked or expired earlier than expected.
        if response.status_code == 401:
            self.token_manager._fetch_token()
            new_token = self.token_manager.get_valid_token()
            headers["Authorization"] = f"{self.token_manager.token_type} {new_token}"
            # Retry the request with the new token
            response = super().request(method, url, headers=headers, **kwargs)
        # Raise HTTPError for any remaining 4xx/5xx, including a second 401
        # or a 429 that survived the adapter-level retries
        response.raise_for_status()
        return response
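The refresh-and-retry-once rule is easier to reason about in isolation. Below is a minimal sketch that replaces the HTTP layer with fakes; send_with_refresh, FakeResponse, and the fake_* helpers are hypothetical names used only for this illustration and are not part of requests.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class FakeResponse:
    status_code: int


def send_with_refresh(send: Callable[[str], FakeResponse],
                      get_token: Callable[[], str],
                      force_refresh: Callable[[], str]) -> FakeResponse:
    """Send once; on a 401, refresh the token and retry exactly one time."""
    response = send(get_token())
    if response.status_code == 401:
        response = send(force_refresh())
    return response


# Scenario 1: the cached token is stale and the refreshed token is accepted
state = {"token": "token-1"}
calls: List[str] = []


def fake_send(token: str) -> FakeResponse:
    calls.append(token)
    return FakeResponse(200 if token == "token-2" else 401)


def fake_refresh() -> str:
    state["token"] = "token-2"
    return state["token"]


recovered = send_with_refresh(fake_send, lambda: state["token"], fake_refresh)

# Scenario 2: the credentials themselves are bad, so the refresh does not help
rejected_calls: List[str] = []


def always_reject(token: str) -> FakeResponse:
    rejected_calls.append(token)
    return FakeResponse(401)


still_unauthorized = send_with_refresh(always_reject, lambda: "bad-1", lambda: "bad-2")
```

Capping the retry at one attempt matters: in the second scenario the helper stops after two calls and surfaces the 401 instead of looping against an endpoint that will never accept the credentials.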
Step 2: Implementing the Batch Job with Pagination
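Now we apply this session to a real-world scenario: exporting conversation details from Genesys Cloud through the /api/v2/analytics/conversations/details/query endpoint. The results are paginated, so a long-running job can outlive its token. Our AuthenticatedSession handles that case transparently.
We must handle pagination correctly. The example below assumes the response body carries the records under results and a nextPage value that points at the following page. Check the paging contract of the endpoint you call before relying on this, since some endpoints page through fields in the request body instead.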
from typing import Dict, Any, Generator


def export_conversations(session: AuthenticatedSession, query_body: Dict[str, Any]) -> Generator[Dict[str, Any], None, None]:
    """
    Fetches all conversations matching the query, handling pagination and token refresh.

    Args:
        session: An AuthenticatedSession instance.
        query_body: The JSON payload for the analytics query.

    Yields:
        Individual conversation detail objects.
    """
    url = f"{session.token_manager.env_url}/api/v2/analytics/conversations/details/query"
    # Initial request
    response = session.post(url, json=query_body)
    while True:
        data = response.json()
        # Process results
        results = data.get("results", [])
        for result in results:
            yield result
        # Check for pagination
        next_page = data.get("nextPage")
        if not next_page:
            break
        # Fetch next page.
        # Assumption: nextPage is an absolute URL that can be requested directly.
        # If your endpoint returns a cursor or page number instead, re-issue the
        # query with that value rather than calling GET on it.
        response = session.get(next_page)
        # If the next page request fails due to token expiry, the session handles it.
        # If it fails for other reasons (e.g., 429), the retry strategy handles it.
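The pagination loop can be checked against canned pages before pointing it at a live API. This is a minimal sketch under the same assumption as the text (a results list plus a nextPage value); iterate_pages, FAKE_PAGES, and fake_fetch_page are hypothetical names for illustration only.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional


def iterate_pages(fetch_page: Callable[[Optional[str]], Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield every record, following nextPage until it is missing or empty."""
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor)
        yield from page.get("results", [])
        cursor = page.get("nextPage")
        if not cursor:
            break


# Canned pages standing in for API responses
FAKE_PAGES: Dict[Optional[str], Dict[str, Any]] = {
    None: {"results": [{"id": "a"}, {"id": "b"}], "nextPage": "page-2"},
    "page-2": {"results": [{"id": "c"}], "nextPage": "page-3"},
    "page-3": {"results": [], "nextPage": None},
}
requested: List[Optional[str]] = []


def fake_fetch_page(cursor: Optional[str]) -> Dict[str, Any]:
    requested.append(cursor)
    return FAKE_PAGES[cursor]


ids = [item["id"] for item in iterate_pages(fake_fetch_page)]
```

Because the function is a generator, each page is requested only after the consumer has drained the previous one, so a slow consumer stretches the job's duration and makes token expiry between pages more likely.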
Step 3: Processing Results and Error Handling
In a production environment, you must handle cases where the API returns a 429 (Too Many Requests) or 5xx errors. Our AuthenticatedSession already retries 429s. However, for 401s, we only retry once. If the second attempt also fails with 401, it indicates a fundamental authentication issue (wrong client ID/secret or scope), and we should stop and alert.
We also need to ensure that if the job fails partway through, we can resume. This is done by tracking the last processed ID or cursor.
def run_batch_export(
    env_url: str,
    client_id: str,
    client_secret: str,
    scope: str,
    query_params: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """
    Main function to run the batch export with robust error handling.
    """
    # 1. Initialize Token Manager
    token_mgr = TokenManager(env_url, client_id, client_secret, scope)
    # 2. Initialize Session
    session = AuthenticatedSession(token_mgr, max_retries=3)
    all_conversations = []
    try:
        # 3. Execute Export
        for conv in export_conversations(session, query_params):
            all_conversations.append(conv)
            # Optional: Log progress every 100 records
            if len(all_conversations) % 100 == 0:
                print(f"Processed {len(all_conversations)} conversations...")
        print(f"Export complete. Total records: {len(all_conversations)}")
    except requests.exceptions.HTTPError as e:
        # Handle specific HTTP errors
        if e.response.status_code == 401:
            print("CRITICAL: Authentication failed even after token refresh. Check credentials and scopes.")
        elif e.response.status_code == 403:
            print("CRITICAL: Forbidden. Check OAuth scopes.")
        elif e.response.status_code == 429:
            print("CRITICAL: Rate limit exceeded after retries. Consider reducing query frequency.")
        else:
            print(f"HTTP Error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    # Records collected before a failure are still returned
    return all_conversations
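Resuming after a partial failure needs a persisted marker, which run_batch_export does not write. The sketch below shows one way to track the last processed ID in a small JSON file. All names here (load_checkpoint, save_checkpoint, resume_after, the checkpoint file name) are hypothetical, and it assumes that records carry an id field and come back in the same order on every run.

```python
import json
import os
import tempfile
from typing import Dict, Iterable, Iterator, Optional


def load_checkpoint(path: str) -> Optional[str]:
    """Return the last processed ID, or None if no checkpoint exists yet."""
    if not os.path.exists(path):
        return None
    with open(path, "r", encoding="utf-8") as fh:
        return json.load(fh).get("last_id")


def save_checkpoint(path: str, last_id: str) -> None:
    """Write to a temp file, then rename, so a crash cannot leave a half-written file."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump({"last_id": last_id}, fh)
    os.replace(tmp_path, path)


def resume_after(records: Iterable[Dict[str, str]], last_id: Optional[str]) -> Iterator[Dict[str, str]]:
    """Skip records up to and including last_id; yield everything if it is None."""
    skipping = last_id is not None
    for record in records:
        if skipping:
            if record["id"] == last_id:
                skipping = False
            continue
        yield record


records = [{"id": "c1"}, {"id": "c2"}, {"id": "c3"}, {"id": "c4"}]
checkpoint_path = os.path.join(tempfile.mkdtemp(), "export.checkpoint.json")

# First run: process two records, then stop as if the job had crashed
for record in resume_after(records, load_checkpoint(checkpoint_path)):
    save_checkpoint(checkpoint_path, record["id"])
    if record["id"] == "c2":
        break

# Second run: picks up after the checkpointed record
resumed_ids = [r["id"] for r in resume_after(records, load_checkpoint(checkpoint_path))]
```

Skipping by ID still re-fetches the earlier pages. If the API exposes a cursor or page number, persisting that value instead avoids the repeated requests. Note also that if the checkpointed ID never reappears, this sketch yields nothing, so a production version should detect that case and fail loudly.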
Complete Working Example
Below is the full, copy-pasteable script. It uses requests and handles the entire lifecycle of a batch job, including token refresh.
#!/usr/bin/env python3
"""
Genesys Cloud CX / NICE CXone Batch Export with Auto-Refresh Token Logic
This script demonstrates how to handle OAuth2 token expiration during long-running
batch operations. It uses the Client Credentials Grant flow.
Prerequisites:
pip install requests pydantic
"""
import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Optional, Dict, Any, List, Generator
import os
# --- Configuration ---
# Replace these with your actual credentials
ENV_URL = os.getenv("GENESYS_ENV_URL", "https://api.mypurecloud.com")
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID", "your_client_id")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET", "your_client_secret")
SCOPE = os.getenv("GENESYS_SCOPE", "analytics:conversation:read")
# --- Token Manager ---
class TokenManager:
    def __init__(self, env_url: str, client_id: str, client_secret: str, scope: str):
        self.env_url = env_url.rstrip('/')
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0
        self.token_type: str = "Bearer"

    def _get_token_endpoint(self) -> str:
        # NICE CXone is assumed to use /oauth2/token on the given host;
        # confirm the path against your tenant's documentation.
        cxone_markers = ("niceincontact", "nice.incontact", "nicecxone")
        if any(marker in self.env_url for marker in cxone_markers):
            return f"{self.env_url}/oauth2/token"
        # Default: Genesys Cloud, which serves /oauth/token from login.<region>
        login_url = self.env_url.replace("://api.", "://login.", 1)
        return f"{login_url}/oauth/token"

    def _fetch_token(self) -> Dict[str, Any]:
        token_url = self._get_token_endpoint()
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": self.scope
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        # A timeout keeps a stalled token request from hanging the whole job
        response = requests.post(token_url, data=payload, headers=headers, timeout=30)
        if response.status_code != 200:
            raise RuntimeError(f"Failed to fetch token: {response.status_code} - {response.text}")
        data = response.json()
        if not data.get("access_token"):
            raise RuntimeError("Token response did not contain an access_token")
        self.access_token = data["access_token"]
        self.token_type = data.get("token_type", "Bearer")
        expires_in = float(data.get("expires_in", 7200))
        self.token_expiry = time.time() + expires_in - 300  # 5 min buffer
        return data

    def get_valid_token(self) -> str:
        if not self.access_token or time.time() >= self.token_expiry:
            self._fetch_token()
        return self.access_token
# --- Authenticated Session ---
class AuthenticatedSession(requests.Session):
    def __init__(self, token_manager: TokenManager, max_retries: int = 3):
        super().__init__()
        self.token_manager = token_manager
        self.max_retries = max_retries
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"],
            # Return the last response when retries run out instead of raising RetryError
            raise_on_status=False,
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.mount("https://", adapter)
        self.mount("http://", adapter)

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        token = self.token_manager.get_valid_token()
        # Copy the caller's headers (which may be None) so their dict is not mutated
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Authorization"] = f"{self.token_manager.token_type} {token}"
        response = super().request(method, url, headers=headers, **kwargs)
        if response.status_code == 401:
            # Force one refresh and retry once
            self.token_manager._fetch_token()
            new_token = self.token_manager.get_valid_token()
            headers["Authorization"] = f"{self.token_manager.token_type} {new_token}"
            response = super().request(method, url, headers=headers, **kwargs)
        # Raise HTTPError for any remaining 4xx/5xx, including an exhausted 429
        response.raise_for_status()
        return response
# --- Batch Logic ---
def export_conversations(session: AuthenticatedSession, query_body: Dict[str, Any]) -> Generator[Dict[str, Any], None, None]:
    url = f"{session.token_manager.env_url}/api/v2/analytics/conversations/details/query"
    response = session.post(url, json=query_body)
    while True:
        data = response.json()
        results = data.get("results", [])
        for result in results:
            yield result
        next_page = data.get("nextPage")
        if not next_page:
            break
        response = session.get(next_page)
def main():
    # Define query parameters.
    # Illustrative payload covering a fixed 24-hour window. Adjust the dates and
    # confirm the field names against the endpoint's request schema before running.
    query_body = {
        "dateFrom": "2023-10-01T00:00:00.000Z",
        "dateTo": "2023-10-02T00:00:00.000Z",
        "size": 50,
        "query": "type:voice",
        "groupBy": [],
        "select": ["id", "type", "startTime", "endTime"]
    }
    token_mgr = TokenManager(ENV_URL, CLIENT_ID, CLIENT_SECRET, SCOPE)
    session = AuthenticatedSession(token_mgr, max_retries=3)
    all_conversations = []
    try:
        for conv in export_conversations(session, query_body):
            all_conversations.append(conv)
            if len(all_conversations) % 100 == 0:
                print(f"Processed {len(all_conversations)} conversations...")
        print(f"Export complete. Total records: {len(all_conversations)}")
    except Exception as e:
        print(f"Error occurred: {e}")


if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 401 Unauthorized After Refresh
- Cause: The client ID or secret is incorrect, or the OAuth application does not have the required scope. Alternatively, the token was revoked on the server side.
- Fix: Verify the client credentials in the Genesys Cloud Admin Console or CXone Admin Center. Ensure the scope analytics:conversation:read is assigned to the OAuth app.
- Code Fix: The AuthenticatedSession refreshes the token and retries once. If the 401 persists, requests.exceptions.HTTPError is raised with status 401. Check your logs for the exact error message from the _fetch_token method.
Error: 429 Too Many Requests
- Cause: You are exceeding the API rate limits. Genesys Cloud has specific limits for analytics queries.
- Fix: Implement exponential backoff. The HTTPAdapter with the Retry strategy in AuthenticatedSession handles this automatically by retrying up to max_retries times with backoff.
- Code Fix: Increase max_retries in AuthenticatedSession if your job is very large, but be aware that this increases total execution time.
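To estimate how much waiting a larger max_retries can add, it helps to see how exponential backoff grows. The sketch below is a generic illustration and is not the algorithm urllib3's Retry uses internally; backoff_ceiling and backoff_delay are hypothetical helpers, and the 60-second cap is an assumed value.

```python
import random
from typing import Optional


def backoff_ceiling(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Upper bound in seconds for the wait before retry number `attempt` (1-based)."""
    return min(cap, base * (2 ** (attempt - 1)))


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  retry_after: Optional[float] = None,
                  rng: Optional[random.Random] = None) -> float:
    """Pick a wait time: honor a server-provided Retry-After, else use jittered backoff."""
    if retry_after is not None:
        return max(0.0, retry_after)
    rng = rng or random.Random()
    # Full jitter: a random wait between 0 and the exponential ceiling, so
    # parallel workers do not all retry at the same instant
    return rng.uniform(0.0, backoff_ceiling(attempt, base, cap))


ceilings = [backoff_ceiling(n) for n in range(1, 8)]
server_directed = backoff_delay(3, retry_after=7.0)
jittered = backoff_delay(3, rng=random.Random(0))
```

The ceiling doubles on each attempt until it reaches the cap, so each additional retry adds progressively more worst-case waiting to the job.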
Error: Token Expiry Mid-Pagination
- Cause: The token expired between fetching page N and page N+1.
- Fix: The AuthenticatedSession intercepts the 401 on the nextPage GET request, refreshes the token, and retries the GET. This is transparent to the export_conversations generator.
- Code Fix: Ensure you are using the AuthenticatedSession for all requests, including pagination. Do not switch back to a standard requests.Session.