Token refresh logic — my access token expires mid-batch and the job fails
What You Will Build
- You will build a robust OAuth token management system that automatically refreshes access tokens before they expire or upon receiving a 401 Unauthorized response.
- This tutorial uses the Genesys Cloud CX REST API and the official Python SDK (genesyscloud).
- The implementation is written in Python 3.9+ using the requests library for low-level control and the SDK for high-level abstraction.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Grant).
- Required Scopes: analytics:report:read, conversation:voice:read (or any scope relevant to your batch job).
- SDK Version: genesyscloud >= 115.0.0.
- Language/Runtime: Python 3.9 or higher.
- External Dependencies: requests, genesyscloud, pyjwt (for optional token inspection), tenacity (for retry logic).
pip install requests genesyscloud tenacity pyjwt
Authentication Setup
The core failure mode described in the topic occurs when a long-running batch process holds a static access token. The examples here assume an access token lifetime of 3600 seconds (1 hour); the actual lifetime is configured on the OAuth client and reported in the expires_in field of the token response. If your batch job processes 10,000 records at a steady rate and takes 90 minutes, the token expires at minute 60, and the remaining third of the requests (about 3,300) fail with 401 Unauthorized.
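To make the arithmetic above concrete, here is a minimal sketch that estimates how many requests land after the token has expired. The function name and the steady-rate assumption are illustrative only; a real job rarely runs at a perfectly constant pace.

```python
def requests_failing_after_expiry(
    total_requests: int,
    job_seconds: float,
    token_lifetime_seconds: float,
) -> int:
    """Estimate how many requests are sent after the token expires.

    Assumes requests are spread evenly over the whole job (hypothetical model).
    """
    if job_seconds <= token_lifetime_seconds:
        # The job finishes before the token expires, so nothing fails.
        return 0
    # Requests completed while the token was still valid.
    completed = int(total_requests * token_lifetime_seconds / job_seconds)
    return total_requests - completed


# 10,000 records, 90-minute job, 60-minute token
print(requests_failing_after_expiry(10_000, 90 * 60, 3600))  # 3334
```

A job that fits inside one token lifetime returns 0, which is why the problem only shows up once batches grow.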
You must implement two mechanisms:
- Proactive Refresh: Check the token’s expiry timestamp (computed from the expires_in field of the token response) before making a request.
- Reactive Refresh: Catch 401 responses and trigger an immediate refresh.
Step 1: Building the Token Manager Class
You need a singleton-like class that manages the lifecycle of the token. This class will handle the initial fetch, caching, and the refresh logic.
import time
from typing import Any, Dict, Optional

import requests


class GenesysTokenManager:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        # OAuth tokens are issued by the login host, not the API host
        self.token_url = f"https://login.{environment}/oauth/token"
        # Internal state
        self.access_token: Optional[str] = None
        # Stored under a private name so it does not shadow the refresh_token() method
        self._refresh_token: Optional[str] = None
        self.token_expiry: float = 0.0

    def _get_token_from_request(self, payload: Dict[str, str]) -> Dict[str, Any]:
        """
        Performs the HTTP POST to the OAuth endpoint.
        """
        # requests form-encodes data=... and sets the Content-Type header;
        # the client credentials travel in an HTTP Basic Authorization header.
        response = requests.post(
            self.token_url,
            data=payload,
            auth=(self.client_id, self.client_secret),
            timeout=10,
        )
        if response.status_code != 200:
            raise RuntimeError(f"OAuth Token Error: {response.status_code} - {response.text}")
        return response.json()

    def _store_token(self, token_data: Dict[str, Any]) -> str:
        """
        Caches the token and converts the relative expires_in into an absolute timestamp.
        """
        self.access_token = token_data["access_token"]
        # Keep the previous refresh token unless the response supplies a new one
        self._refresh_token = token_data.get("refresh_token", self._refresh_token)
        self.token_expiry = time.time() + int(token_data.get("expires_in", 3600))
        return self.access_token

    def authenticate(self) -> str:
        """
        Initial authentication using Client Credentials Grant.
        """
        # Note: Client Credentials grant does not return a refresh_token.
        # We must re-authenticate with client_id/secret.
        self._refresh_token = None
        token_data = self._get_token_from_request({"grant_type": "client_credentials"})
        return self._store_token(token_data)

    def is_token_expired(self) -> bool:
        """
        Checks if the current token is expired or will expire within 60 seconds.
        The 60-second buffer prevents race conditions where the token expires
        during the request setup.
        """
        return time.time() >= (self.token_expiry - 60)

    def get_access_token(self) -> str:
        """
        Returns a valid access token. Triggers a refresh if expired.
        """
        if not self.access_token or self.is_token_expired():
            self.refresh_token()
        return self.access_token

    def refresh_token(self) -> str:
        """
        Refreshes the token. For Client Credentials, this is a re-authentication.
        For Authorization Code, this would use the stored refresh token.
        """
        if self._refresh_token:
            # Scenario: Authorization Code Grant (Human User)
            payload = {"grant_type": "refresh_token", "refresh_token": self._refresh_token}
        else:
            # Scenario: Client Credentials Grant (Machine User)
            payload = {"grant_type": "client_credentials"}
        return self._store_token(self._get_token_from_request(payload))
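If your batch job fans out across threads, two workers can notice an expired token at the same moment and both hit the OAuth endpoint. The sketch below shows one way to guard against that with threading.Lock. LockedTokenCache and its fetch callable are hypothetical names, not part of the class above; fetch stands in for whatever function returns a (token, expires_in) pair.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class LockedTokenCache:
    """Caches a token and lets only one thread at a time refresh it."""

    def __init__(self, fetch: Callable[[], Tuple[str, float]], buffer_seconds: float = 60.0):
        # fetch is a hypothetical callable returning (access_token, expires_in_seconds)
        self._fetch = fetch
        self._buffer = buffer_seconds
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expiry = 0.0

    def get(self) -> str:
        # The expiry check happens inside the lock, so a thread that waited
        # for the lock sees the token its predecessor just fetched.
        with self._lock:
            if self._token is None or time.time() >= self._expiry - self._buffer:
                token, expires_in = self._fetch()
                self._token = token
                self._expiry = time.time() + expires_in
            return self._token
```

Holding the lock during the fetch serializes refreshes, which is usually acceptable because a refresh is rare compared with ordinary requests.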
Step 2: Implementing Reactive Refresh with Retry Logic
Proactive checks are good, but network latency and server clock skew can cause a token to expire after the check but before the request reaches the Genesys Cloud API. The most reliable pattern is to catch the 401 Unauthorized response and retry the request with a fresh token.
The client below handles the retry manually: on a 401 it forces one refresh and resends the request once. The tenacity library listed in the prerequisites is optional here; it is better suited to exponential backoff on transient failures (such as 429 or 5xx responses) than to this single refresh-and-retry.
import logging

import requests
from requests.exceptions import HTTPError

logger = logging.getLogger(__name__)


class GenesysApiClient:
    def __init__(self, token_manager: GenesysTokenManager, environment: str = "mypurecloud.com"):
        self.token_manager = token_manager
        self.base_url = f"https://api.{environment}"
        self.session = requests.Session()

    def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """
        Core request method that injects the Auth header and handles one 401 retry.
        """
        # 1. Get a valid token (proactive check)
        token = self.token_manager.get_access_token()
        # Copy the caller's headers so the caller's dict is not mutated
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Authorization"] = f"Bearer {token}"
        headers.setdefault("Content-Type", "application/json")
        # Never wait forever on a stalled connection
        kwargs.setdefault("timeout", 30)
        url = f"{self.base_url}{endpoint}"
        try:
            response = self.session.request(method, url, headers=headers, **kwargs)
            # 2. Reactive Check: If 401, the token might have just expired.
            if response.status_code == 401:
                logger.warning("Received 401 Unauthorized. Refreshing token and retrying.")
                # Force a refresh; refresh_token() returns the new access token
                new_token = self.token_manager.refresh_token()
                headers["Authorization"] = f"Bearer {new_token}"
                # Retry the request once with the new token
                response = self.session.request(method, url, headers=headers, **kwargs)
            response.raise_for_status()
            return response
        except HTTPError as e:
            logger.error(f"HTTP Error: {e}")
            raise
        except requests.exceptions.RequestException as e:
            logger.error(f"Request Error: {e}")
            raise
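To see the refresh-then-retry control flow in isolation, here is a minimal sketch with the HTTP call and the refresh passed in as plain callables. send_with_refresh and its parameters are hypothetical names for illustration; send returns only a status code so the logic can be exercised without a network. The max_refreshes bound keeps a permanently rejected client from looping forever.

```python
from typing import Callable


def send_with_refresh(
    send: Callable[[str], int],
    get_token: Callable[[], str],
    refresh: Callable[[], str],
    max_refreshes: int = 1,
) -> int:
    """Send a request, refreshing the token on 401 at most max_refreshes times.

    send(token) performs the request and returns the HTTP status code.
    """
    token = get_token()
    status = send(token)
    refreshes = 0
    while status == 401 and refreshes < max_refreshes:
        # The server rejected the token: fetch a new one and resend.
        token = refresh()
        refreshes += 1
        status = send(token)
    # A 401 that survives the refresh is returned to the caller as-is.
    return status
```

If the status is still 401 after the refresh, the problem is usually the credentials or permissions rather than expiry, so retrying further would not help.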
Step 3: Processing Batch Results with Pagination
A common scenario for token expiration is paginated data retrieval. If you fetch 100 pages of data, and each page takes 2 seconds to process, the total time is 200 seconds. This is safe. However, if you are downloading large analytics reports or conversation transcripts, the processing time per item might be high.
You must ensure that the token check happens inside the loop, not just before it.
def fetch_all_conversations(api_client: GenesysApiClient, query_body: Dict[str, Any], page_size: int = 100):
    """
    Fetches all conversations matching a query, handling pagination and token refresh.
    """
    all_conversations = []
    page_number = 1
    # The endpoint for analytics conversation details
    endpoint = "/api/v2/analytics/conversations/details/query"
    while True:
        # This endpoint pages by number: the request carries a "paging" object
        # and the results come back under the "conversations" key.
        payload = {**query_body, "paging": {"pageSize": page_size, "pageNumber": page_number}}
        # Make the request. _make_request checks the token on every call
        # (so the check runs inside the loop) and handles 401 retries.
        response = api_client._make_request("POST", endpoint, json=payload)
        data = response.json()
        # Process the current page
        conversations = data.get("conversations") or []
        all_conversations.extend(conversations)
        # A short or empty page means there is nothing left to fetch
        if len(conversations) < page_size:
            break
        page_number += 1
        # Optional: Add a small delay to respect rate limits
        time.sleep(0.1)
    return all_conversations
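When per-item processing is slow, it can help to stream items instead of collecting every page first, so that each new page request (and its token check) happens only when the previous page has been consumed. The generator below is a minimal sketch of that idea. iter_items and fetch_page are hypothetical names; fetch_page takes a cursor (None for the first page) and returns the page's items plus the next cursor, or None when there are no more pages.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple

Page = Tuple[List[Dict[str, Any]], Optional[Any]]


def iter_items(fetch_page: Callable[[Optional[Any]], Page]) -> Iterator[Dict[str, Any]]:
    """Yield items one at a time, fetching the next page only when needed."""
    cursor: Optional[Any] = None
    while True:
        items, cursor = fetch_page(cursor)
        yield from items
        if cursor is None:
            # The last page has been delivered.
            return
```

In a real job, fetch_page would wrap the API client call, so a token that expires while you process page N is refreshed when page N+1 is requested.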
Complete Working Example
This script combines the Token Manager, the API Client, and a batch job simulation. It demonstrates how to structure a production-grade script that survives token expiration.
import os
import time
import logging
import requests
from typing import Any, Dict, List, Optional

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# --- Token Manager (From Step 1) ---
class GenesysTokenManager:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        # OAuth tokens are issued by the login host, not the API host
        self.token_url = f"https://login.{environment}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0

    def _fetch_token(self) -> Dict[str, Any]:
        # Client credentials travel in an HTTP Basic Authorization header;
        # requests form-encodes the body and sets the Content-Type.
        response = requests.post(
            self.token_url,
            data={"grant_type": "client_credentials"},
            auth=(self.client_id, self.client_secret),
            timeout=10,
        )
        if response.status_code != 200:
            raise RuntimeError(f"OAuth Error: {response.status_code} - {response.text}")
        return response.json()

    def _store_token(self, token_data: Dict[str, Any]) -> str:
        # Convert the relative expires_in into an absolute timestamp
        self.access_token = token_data["access_token"]
        self.token_expiry = time.time() + int(token_data.get("expires_in", 3600))
        return self.access_token

    def authenticate(self) -> str:
        return self._store_token(self._fetch_token())

    def is_token_expired(self) -> bool:
        # Expire 60 seconds before actual expiry to be safe
        return time.time() >= (self.token_expiry - 60)

    def get_access_token(self) -> str:
        if not self.access_token or self.is_token_expired():
            self.refresh_token()
        return self.access_token

    def refresh_token(self) -> str:
        # For Client Credentials, we re-authenticate
        token = self._store_token(self._fetch_token())
        logger.info("Token refreshed successfully.")
        return token


# --- API Client with Reactive Retry (From Step 2) ---
class GenesysApiClient:
    def __init__(self, token_manager: GenesysTokenManager, environment: str = "mypurecloud.com"):
        self.token_manager = token_manager
        self.base_url = f"https://api.{environment}"
        self.session = requests.Session()

    def get(self, endpoint: str, **kwargs) -> requests.Response:
        return self._make_request("GET", endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs) -> requests.Response:
        return self._make_request("POST", endpoint, **kwargs)

    def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        token = self.token_manager.get_access_token()
        # Copy the caller's headers so the caller's dict is not mutated
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Authorization"] = f"Bearer {token}"
        headers.setdefault("Content-Type", "application/json")
        kwargs.setdefault("timeout", 30)
        url = f"{self.base_url}{endpoint}"
        try:
            response = self.session.request(method, url, headers=headers, **kwargs)
            # Reactive Refresh Logic
            if response.status_code == 401:
                logger.warning("401 Received. Refreshing token and retrying request.")
                new_token = self.token_manager.refresh_token()
                headers["Authorization"] = f"Bearer {new_token}"
                response = self.session.request(method, url, headers=headers, **kwargs)
            response.raise_for_status()
            return response
        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP Error: {e.response.status_code} - {e.response.text}")
            raise
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {e}")
            raise


# --- Batch Job Simulation ---
def run_batch_job():
    # 1. Setup Credentials
    CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
    CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
    if not CLIENT_ID or not CLIENT_SECRET:
        raise ValueError("Missing GENESYS_CLIENT_ID or GENESYS_CLIENT_SECRET environment variables.")

    # 2. Initialize Managers
    token_mgr = GenesysTokenManager(CLIENT_ID, CLIENT_SECRET)
    api_client = GenesysApiClient(token_mgr)

    # 3. Initial Auth
    token_mgr.authenticate()
    logger.info("Initial authentication successful.")

    # 4. Simulate a Long-Running Batch Job
    # We will fetch users in batches. If this takes > 1 hour, it would fail without refresh logic.
    # We simulate processing time with time.sleep()
    total_users_to_fetch = 50  # Simulate 50 iterations
    logger.info(f"Starting batch job to fetch {total_users_to_fetch} users...")
    try:
        for i in range(total_users_to_fetch):
            # Simulate heavy processing that takes time
            # In a real scenario, this might be downloading analytics data
            time.sleep(2)
            # Fetch a single user to test the API connection.
            # User IDs are UUIDs, so page through the user list one entry
            # at a time instead of guessing numeric IDs.
            # Endpoint: /api/v2/users
            # Scope: user:read
            page_number = i + 1
            try:
                response = api_client.get(
                    "/api/v2/users",
                    params={"pageSize": 1, "pageNumber": page_number},
                )
                entities = response.json().get("entities", [])
                name = entities[0].get("name") if entities else None
                logger.info(f"Successfully fetched user {page_number}: {name}")
            except Exception as e:
                logger.error(f"Failed to fetch user {page_number}: {e}")
                # In a real batch, you might want to retry this specific item or log it
            # Check token status periodically for logging
            if i % 10 == 0:
                logger.info(f"Progress: {i+1}/{total_users_to_fetch}. Token expires in {token_mgr.token_expiry - time.time():.0f} seconds.")
        logger.info("Batch job completed successfully.")
    except Exception as e:
        logger.error(f"Batch job failed with error: {e}")
        raise


if __name__ == "__main__":
    run_batch_job()
Common Errors & Debugging
Error: 401 Unauthorized After Refresh
What causes it:
The token was refreshed, but the new token is still invalid. This usually happens because:
- The Client ID or Secret is incorrect.
- The OAuth client is disabled in the Genesys Cloud Admin Console.
- The requested scopes are not granted to the OAuth client.
How to fix it:
- Verify the Client ID and Secret in your environment variables.
- Log into the Genesys Cloud Admin Console. Navigate to Setup > Admin > Security > OAuth Clients. Ensure the client status is Enabled.
- Check the Scopes tab for the OAuth client. Ensure it has user:read (for the example above) or the specific scopes required by your API call.
Code showing the fix:
Add explicit scope validation during initialization.
from typing import List


def validate_scopes(required_scopes: List[str], available_scopes: List[str]):
    # Fail fast if any scope the job depends on has not been granted
    missing = set(required_scopes) - set(available_scopes)
    if missing:
        raise PermissionError(f"Missing required scopes: {sorted(missing)}")
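One way to obtain the list of available scopes is from the token response itself. OAuth 2.0 defines scope as a space-delimited string, but not every server or grant type includes it in the response, so treat the sketch below as an assumption to verify against your own token payload. scopes_from_token_response is a hypothetical helper, and validate_scopes is repeated so the example runs on its own.

```python
from typing import Any, Dict, List


def validate_scopes(required_scopes: List[str], available_scopes: List[str]) -> None:
    missing = set(required_scopes) - set(available_scopes)
    if missing:
        raise PermissionError(f"Missing required scopes: {sorted(missing)}")


def scopes_from_token_response(token_data: Dict[str, Any]) -> List[str]:
    """Split the space-delimited 'scope' field; returns [] if the field is absent."""
    return str(token_data.get("scope", "")).split()


# Hypothetical token payload used only for illustration
token_data = {"access_token": "abc", "expires_in": 3600, "scope": "user:read analytics:report:read"}
validate_scopes(["user:read"], scopes_from_token_response(token_data))
```

If the scope field is missing, the helper returns an empty list and validate_scopes raises, so in that case skip the check rather than treat it as a failure.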
Error: 403 Forbidden
What causes it:
The token is valid, but the client does not have permission to access the resource. This is different from 401. A 403 means “I know who you are, but you are not allowed to do this.”
How to fix it:
- Check the OAuth Client’s scopes again.
- Ensure the OAuth client has the necessary Role permissions. In Genesys Cloud, scopes are often tied to roles. If the OAuth client is mapped to a role that does not have “Read User” permissions, you will get a 403.
- Verify that the resource exists. For example, trying to fetch a user ID that does not exist might return 404, but trying to access a private queue without permission returns 403.
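Because a 403 means the token itself is fine, refreshing it will not help, and your client should not treat 401 and 403 the same way. The sketch below captures that decision as a small pure function. next_action and the action labels are hypothetical names chosen for illustration.

```python
def next_action(status_code: int, already_refreshed: bool = False) -> str:
    """Map an HTTP status code to what the batch client should do next."""
    if 200 <= status_code < 300:
        return "ok"
    if status_code == 401:
        # An expired token is worth one refresh; a second 401 points at credentials.
        return "fail" if already_refreshed else "refresh_and_retry"
    if status_code == 403:
        # The token is valid but lacks permission: a refresh changes nothing.
        return "fail"
    if status_code == 429:
        return "wait_and_retry"
    return "fail"
```

Keeping this logic in one place makes it easy to log a 403 as a configuration problem instead of burning retries on it.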
Error: 429 Too Many Requests
What causes it:
You are hitting the Genesys Cloud API rate limits. This is common in batch jobs that loop rapidly.
How to fix it:
- Implement exponential backoff.
- Respect the Retry-After header in the 429 response.
Code showing the fix:
Update the _make_request method to handle 429.
            # Reactive Refresh Logic
            if response.status_code == 401:
                # ... existing 401 logic ...
                pass
            elif response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 5))
                logger.warning(f"Rate limited. Waiting {retry_after} seconds.")
                time.sleep(retry_after)
                # Retry the request
                response = self.session.request(method, url, headers=headers, **kwargs)
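The two fixes above can be combined: honor Retry-After when the server sends it, and fall back to exponential backoff when it does not. HTTP allows Retry-After to be either a number of seconds or an HTTP date, so a plain int() conversion can raise on some responses. The sketch below handles both forms. retry_delay_seconds and its parameters are hypothetical names, and the base and cap values are arbitrary defaults.

```python
import email.utils
import time
from typing import Optional


def retry_delay_seconds(
    retry_after: Optional[str],
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    now: Optional[float] = None,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after:
        value = retry_after.strip()
        if value.isdigit():
            # Form 1: delay in seconds, e.g. "Retry-After: 7"
            return float(value)
        try:
            # Form 2: HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
            target = email.utils.parsedate_to_datetime(value)
            current = time.time() if now is None else now
            return max(target.timestamp() - current, 0.0)
        except (TypeError, ValueError):
            pass  # Unparseable header: fall through to backoff
    # No usable header: exponential backoff, capped
    return min(base * (2 ** attempt), cap)
```

In the request method you would pass response.headers.get("Retry-After") and the current attempt number, then sleep for the returned value before resending.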
Error: Token Expiry Calculation Error
What causes it:
The expires_in field from the OAuth response is relative to the time the token was issued. If you store expires_in directly instead of calculating time.time() + expires_in, your expiry check will be wrong.
How to fix it:
Ensure you calculate the absolute expiry timestamp.
# Correct
self.token_expiry = time.time() + int(token_data.get("expires_in", 3600))
# Incorrect
self.token_expiry = int(token_data.get("expires_in", 3600))
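A related pitfall is the clock itself: time.time() follows the system wall clock, which can jump when the clock is adjusted (for example by NTP), while time.monotonic() never goes backward. The sketch below tracks expiry against a monotonic clock and accepts an injectable clock so the logic can be tested without waiting. ExpiryTracker is a hypothetical helper, not part of the classes above.

```python
import time
from typing import Callable, Optional


class ExpiryTracker:
    """Tracks token expiry as an absolute deadline on a monotonic clock."""

    def __init__(self, clock: Callable[[], float] = time.monotonic, buffer_seconds: float = 60.0):
        self._clock = clock
        self._buffer = buffer_seconds
        self._deadline: Optional[float] = None  # None means no token recorded yet

    def record(self, expires_in: float) -> None:
        # Convert the relative expires_in into an absolute deadline right away.
        self._deadline = self._clock() + float(expires_in)

    def is_expired(self) -> bool:
        if self._deadline is None:
            return True
        # Treat the token as expired buffer_seconds early.
        return self._clock() >= self._deadline - self._buffer
```

Monotonic values are only meaningful within one process, so keep using wall-clock timestamps if you persist the expiry to disk or share it between processes.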