Token refresh logic — my access token expires mid-batch and the job fails
What You Will Build
A production-grade batch processor that automatically handles OAuth token expiration without interrupting long-running API calls. This tutorial uses the Genesys Cloud CX REST API and Python. The code implements proactive expiration tracking, reactive 401 fallback handling, exponential backoff for rate limits, and stateful pagination.
Prerequisites
- OAuth 2.0 Client Credentials grant type configured in Genesys Cloud
- Required scopes: oauth:client_credentials for token issuance, user:read for batch data retrieval
- Genesys Cloud CX API v2
- Python 3.9 or higher
- External dependencies: requests>=2.31.0
- Base environment URL format: https://{your-org}.mypurecloud.com
Authentication Setup
The Client Credentials flow exchanges a client identifier and secret for a bearer token. The API returns an access_token and an expires_in value measured in seconds. You must store both values and calculate the absolute expiration timestamp. Because expires_in is relative to the moment the token was issued, a value that is never converted to an absolute datetime tells you nothing about how much lifetime remains later in a long batch run.
The following code demonstrates the initial token acquisition. It parses the JWT payload directly to extract the exp claim, which avoids a dependency on third-party JWT libraries. This approach assumes the access token is a JWT; if your tokens are opaque, derive the expiration from expires_in instead. The exp claim reflects the server's clock, so the refresh margin described later must absorb any skew between your host and the server.
import base64
import json
from datetime import datetime, timedelta, timezone

import requests

# Values as given in this tutorial. Genesys Cloud commonly serves the token
# endpoint from a regional login host, so verify both URLs for your org.
BASE_URL = "https://myorg.mypurecloud.com"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
REQUIRED_SCOPE = "user:read"


def fetch_initial_token() -> dict:
    """
    Exchanges client credentials for an OAuth2 access token.
    Required scope: oauth:client_credentials (implicit in client registration)
    """
    url = f"{BASE_URL}/api/v2/oauth/token"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "application/json"
    }
    payload = {
        "grant_type": "client_credentials",
        "scope": REQUIRED_SCOPE
    }
    # Send the client ID and secret via HTTP Basic authentication
    response = requests.post(
        url,
        headers=headers,
        data=payload,
        auth=(CLIENT_ID, CLIENT_SECRET),
        timeout=30,
    )
    response.raise_for_status()
    token_data = response.json()
    access_token = token_data["access_token"]
    expires_in = token_data["expires_in"]
    try:
        # Decode JWT payload to extract absolute expiration timestamp
        payload_segment = access_token.split(".")[1]
        # Restore the base64 padding that JWTs strip
        payload_segment += "=" * (-len(payload_segment) % 4)
        decoded_payload = json.loads(base64.urlsafe_b64decode(payload_segment))
        expires_at = datetime.fromtimestamp(decoded_payload["exp"], tz=timezone.utc)
    except (IndexError, KeyError, TypeError, ValueError):
        # Token is not a decodable JWT: fall back to the relative lifetime
        expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in)
    return {
        "token": access_token,
        "expires_at": expires_at,
        "scope": REQUIRED_SCOPE
    }
HTTP Request Cycle for Token Acquisition
POST /api/v2/oauth/token HTTP/1.1
Host: myorg.mypurecloud.com
Authorization: Basic <base64 of client_id:client_secret>
Content-Type: application/x-www-form-urlencoded
Accept: application/json

grant_type=client_credentials&scope=user:read
Realistic HTTP Response
HTTP/1.1 200 OK
Content-Type: application/json
X-Request-Id: a1b2c3d4-e5f6-7890-abcd-ef1234567890

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwic2NvcGUiOiJ1c2VyOnJlYWQiLCJleHAiOjE3MTU2NzIwMDB9.signature",
  "token_type": "Bearer",
  "expires_in": 3600,
  "scope": "user:read"
}
The token manager must cache this response and serve it until expiration approaches. You should refresh the token before it expires to avoid mid-request authentication failures. A safety margin of 120 seconds is a reasonable default for a token with a one-hour lifetime. This margin accounts for network latency, server-side validation overhead, and clock skew between your application server and the Genesys Cloud infrastructure.
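The margin check described above can be sketched as a small pure function. The name needs_refresh and its parameters are illustrative assumptions, not part of any Genesys Cloud SDK; the 120-second default mirrors the margin used in this tutorial.

```python
from datetime import datetime, timedelta, timezone


def needs_refresh(expires_at: datetime, now: datetime,
                  margin: timedelta = timedelta(seconds=120)) -> bool:
    """Return True once `now` is inside the safety margin before `expires_at`."""
    return now + margin >= expires_at


# Example: a token issued at 12:00 UTC with expires_in = 3600 seconds
issued_at = datetime(2024, 5, 14, 12, 0, 0, tzinfo=timezone.utc)
expires_at = issued_at + timedelta(seconds=3600)

# 57 minutes in: 180 seconds of lifetime remain, outside the 120-second margin
early = needs_refresh(expires_at, issued_at + timedelta(minutes=57))
# 58 minutes 30 seconds in: 90 seconds remain, inside the margin
late = needs_refresh(expires_at, issued_at + timedelta(minutes=58, seconds=30))
print(early, late)  # False True
```

Passing the current time as an argument, instead of reading the clock inside the function, keeps the check easy to test.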
Implementation
Step 1: Build a Thread-Safe Token Manager
Batch processors often run in multi-threaded environments. Your token manager must prevent concurrent refresh calls. If two threads detect expiration simultaneously, both will request new tokens, causing redundant network calls and extra load on the token endpoint. A Python threading.Lock solves this problem by serializing the expiration check and the refresh. A processor built on an asyncio event loop would need an asyncio.Lock instead, because a threading lock blocks the loop while it waits.
import base64
import json
import threading
import time
from datetime import datetime, timedelta, timezone
from typing import Optional

import requests


class TokenManager:
    def __init__(self, base_url: str, client_id: str, client_secret: str, scope: str):
        self.base_url = base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.token_cache: dict = {}
        self.lock = threading.Lock()
        self.refresh_margin = timedelta(seconds=120)

    def _decode_jwt_exp(self, token: str) -> Optional[datetime]:
        """Returns the exp claim as a UTC datetime, or None if the token is not a decodable JWT."""
        try:
            payload_segment = token.split(".")[1]
            # Restore the base64 padding that JWTs strip
            payload_segment += "=" * (-len(payload_segment) % 4)
            decoded = json.loads(base64.urlsafe_b64decode(payload_segment))
            return datetime.fromtimestamp(decoded["exp"], tz=timezone.utc)
        except (IndexError, KeyError, TypeError, ValueError):
            return None

    def _refresh_token(self, max_attempts: int = 5) -> dict:
        url = f"{self.base_url}/api/v2/oauth/token"
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json"
        }
        payload = {
            "grant_type": "client_credentials",
            "scope": self.scope
        }
        # A bounded loop replaces unbounded recursion on repeated 429 responses
        for _ in range(max_attempts):
            # Send the client ID and secret via HTTP Basic authentication
            response = requests.post(
                url,
                headers=headers,
                data=payload,
                auth=(self.client_id, self.client_secret),
                timeout=30,
            )
            if response.status_code == 401:
                raise RuntimeError("Invalid client credentials. Verify CLIENT_ID and CLIENT_SECRET.")
            if response.status_code == 429:
                # Assumes Retry-After is expressed in seconds
                time.sleep(int(response.headers.get("Retry-After", 5)))
                continue
            response.raise_for_status()
            data = response.json()
            expires_at = self._decode_jwt_exp(data["access_token"])
            if expires_at is None:
                # Opaque token: fall back to the relative lifetime
                expires_at = datetime.now(timezone.utc) + timedelta(seconds=data["expires_in"])
            return {"token": data["access_token"], "expires_at": expires_at}
        raise RuntimeError(f"Token endpoint still rate limited after {max_attempts} attempts.")

    def get_valid_token(self) -> str:
        with self.lock:
            # Read the clock after acquiring the lock so the check is not stale
            now = datetime.now(timezone.utc)
            if not self.token_cache or now + self.refresh_margin >= self.token_cache["expires_at"]:
                self.token_cache = self._refresh_token()
            return self.token_cache["token"]
This manager guarantees that only one thread executes _refresh_token at any given moment. It checks the absolute expiration time against the current UTC time plus the safety margin. If the token falls within the margin, it issues a fresh token. The lock ensures cache consistency across concurrent batch workers.
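The effect of the lock can be observed without calling a real token endpoint. The sketch below uses a hypothetical CountingCache class with a simulated slow refresh; it is a stand-in for TokenManager, not a copy of it, and only illustrates that threads waiting on the lock reuse the value stored by the first thread.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Optional


class CountingCache:
    """Hypothetical stand-in for TokenManager that counts refresh calls."""

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.value: Optional[str] = None
        self.refresh_calls = 0

    def _refresh(self) -> str:
        # Simulate a slow token endpoint so that the worker threads overlap
        time.sleep(0.05)
        self.refresh_calls += 1
        return f"token-{self.refresh_calls}"

    def get(self) -> str:
        # The check and the refresh run under the same lock, so a thread that
        # waited for the lock sees the value the first thread stored
        with self.lock:
            if self.value is None:
                self.value = self._refresh()
            return self.value


cache = CountingCache()
with ThreadPoolExecutor(max_workers=8) as pool:
    tokens = list(pool.map(lambda _: cache.get(), range(8)))

print(cache.refresh_calls, set(tokens))  # 1 {'token-1'}
```

If the emptiness check were performed outside the lock, several threads could pass it before the first refresh completed, and each would issue its own request.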
Step 2: Implement Proactive Expiration Checks
Proactive checks prevent most 401 errors. You call get_valid_token() immediately before constructing the HTTP headers for each batch request. This pattern shifts authentication overhead from the request lifecycle to the preparation phase. In the common case the API receives a valid token on the first attempt, which avoids retry loops and reduces latency.
def build_request_headers(token_manager: TokenManager) -> dict:
    token = token_manager.get_valid_token()
    return {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }
HTTP Request Cycle for Batch Data Retrieval
GET /api/v2/users?pageSize=100 HTTP/1.1
Host: myorg.mypurecloud.com
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
Accept: application/json
Realistic HTTP Response
HTTP/1.1 200 OK
Content-Type: application/json
X-Request-Id: b2c3d4e5-f6a7-8901-bcde-fg2345678901
Link: </api/v2/users?pageSize=100&pageToken=eyJwYWdlIjoyfQ>; rel="next"

{
  "pageSize": 100,
  "pageNumber": 1,
  "total": 2450,
  "first": "/api/v2/users?pageSize=100",
  "last": "/api/v2/users?pageSize=100&pageToken=eyJwYWdlIjoxNX0",
  "next": "/api/v2/users?pageSize=100&pageToken=eyJwYWdlIjoyfQ",
  "entities": [
    {
      "id": "a1b2c3d4-1111-2222-3333-444444444444",
      "name": "Jane Smith",
      "email": "[email protected]",
      "active": true
    }
  ]
}
The Link header and the next property in the response body provide pagination state. You must extract the next URL or pageToken to continue the batch. Storing pagination state separately from token state prevents data loss when refreshing credentials.
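Resolving the next page reference can be sketched as follows. The helper name resolve_next_url is hypothetical, and the next field name is taken from the sample response above, so check the schema of the endpoint you actually call. Because the sample shows a relative path, the sketch joins it with the base URL before it is used for another request.

```python
from typing import Optional
from urllib.parse import urljoin


def resolve_next_url(base_url: str, body: dict) -> Optional[str]:
    """Return an absolute URL for the next page, or None when the batch is done."""
    next_ref = body.get("next")  # field name assumed from the sample response
    if not next_ref:
        return None
    # urljoin resolves relative paths and leaves absolute URLs untouched
    return urljoin(base_url, next_ref)


base = "https://myorg.mypurecloud.com"
page = {"next": "/api/v2/users?pageSize=100&pageToken=eyJwYWdlIjoyfQ"}
print(resolve_next_url(base, page))
print(resolve_next_url(base, {"entities": []}))
```

Keeping this state in a plain string means a token refresh between pages has no effect on where the batch resumes.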
Step 3: Add Reactive 401 Fallback and 429 Retry Logic
Proactive checks cover most scenarios, but server-side token revocation, clock desynchronization, or a request that stalls in transit past the token's expiry can still produce 401 responses. Your batch processor must catch 401 errors, force an immediate token refresh, and retry the failed request exactly once. A second 401 after a fresh token indicates a scope mismatch or invalid credentials, which requires human intervention.
Rate limiting returns 429 Too Many Requests. Genesys Cloud enforces tiered rate limits per tenant and per endpoint. You must implement exponential backoff with jitter. Backoff without jitter creates thundering herd problems when multiple workers resume simultaneously. Jitter spreads retry attempts across time, preventing cascading failures.
import random
import time
from typing import Optional

import requests


def execute_api_request(
    token_manager: TokenManager,
    method: str,
    url: str,
    headers: Optional[dict] = None,
    payload: Optional[dict] = None,
    max_retries: int = 3
) -> requests.Response:
    current_retries = 0
    backoff_base = 2
    refreshed_after_401 = False
    last_status: Optional[int] = None
    while current_retries <= max_retries:
        # Inject fresh token before each attempt
        request_headers = {**(headers or {}), "Authorization": f"Bearer {token_manager.get_valid_token()}"}
        try:
            response = requests.request(
                method.upper(), url, headers=request_headers, json=payload, timeout=30
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            # Transient network failure: back off and retry
            print(f"Network error on attempt {current_retries + 1}: {e}")
            if current_retries >= max_retries:
                raise
            time.sleep(backoff_base ** current_retries + random.uniform(0, 1))
            current_retries += 1
            continue
        last_status = response.status_code
        # Success (any 2xx status)
        if 200 <= response.status_code < 300:
            return response
        # Reactive 401 handling: refresh token once and retry
        if response.status_code == 401:
            if refreshed_after_401:
                raise RuntimeError("401 persisted after token refresh. Check credentials and scopes.")
            with token_manager.lock:
                token_manager.token_cache = {}  # Force cache invalidation
            refreshed_after_401 = True
            continue
        # 429 Rate Limit handling: exponential backoff with jitter
        if response.status_code == 429:
            # Assumes Retry-After is expressed in seconds
            retry_after = int(response.headers.get("Retry-After", backoff_base ** current_retries))
            jitter = random.uniform(0, 0.5 * retry_after)
            time.sleep(retry_after + jitter)
            current_retries += 1
            continue
        # 5xx Server errors: retry with backoff
        if response.status_code >= 500:
            wait_time = backoff_base ** current_retries + random.uniform(0, 1)
            time.sleep(wait_time)
            current_retries += 1
            continue
        # 4xx Client errors (except 401/429): fail immediately, outside any
        # except clause so the HTTPError is not mistaken for a network error
        response.raise_for_status()
        # Any other non-error status (for example 3xx) goes back to the caller
        return response
    raise RuntimeError(f"Request failed after {max_retries} retries. Last status: {last_status}")
This function centralizes retry logic. It forces token cache invalidation on 401, applies exponential backoff with jitter on 429, and handles transient network failures. The max_retries parameter prevents infinite loops. You call this function for every paginated request in the batch processor.
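The delay calculation used for 429 responses can be isolated into a pure function, which makes the backoff schedule easy to inspect. This is a minimal sketch: the name backoff_delay, the 60-second cap, and the injectable rng parameter are assumptions added for illustration, while the 0 to 50 percent jitter range follows the retry logic above.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[float] = None,
                  base: float = 2.0, cap: float = 60.0,
                  rng: Optional[random.Random] = None) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    rng = rng or random.Random()
    # Prefer the server's hint; otherwise grow exponentially up to the cap
    delay = retry_after if retry_after is not None else min(cap, base ** attempt)
    # Add up to 50 percent jitter so parallel workers do not retry in lockstep
    return delay + rng.uniform(0, 0.5 * delay)


seeded = random.Random(0)
for attempt in range(4):
    print(attempt, round(backoff_delay(attempt, rng=seeded), 2))
```

Without a Retry-After hint the base delays are 1, 2, 4, and 8 seconds, each stretched by a random amount of up to half its own length.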
Step 4: Process Paginated Batch Data
Pagination state must persist across token refreshes. You store the next URL or pageToken in a loop variable. When the API returns an empty or missing next field, the batch completes; the total field is used only for progress reporting. The following processor combines token management, retry logic, and pagination state into a single execution loop.
from urllib.parse import urljoin


def process_user_batch(token_manager: TokenManager) -> list[dict]:
    all_users: list[dict] = []
    next_url = f"{token_manager.base_url}/api/v2/users?pageSize=100"
    while next_url:
        try:
            response = execute_api_request(
                token_manager=token_manager,
                method="GET",
                url=next_url
            )
            data = response.json()
            # Append current page entities
            entities = data.get("entities", [])
            all_users.extend(entities)
            # Update pagination state. The sample response returns a relative
            # path, so resolve it against the base URL before the next request.
            next_path = data.get("next")
            next_url = urljoin(token_manager.base_url, next_path) if next_path else None
            processed_count = len(all_users)
            total_count = data.get("total", 0)
            print(f"Processed {processed_count} of {total_count} users.")
        except Exception as e:
            print(f"Batch processing failed: {e}")
            raise
    return all_users
The loop terminates when next_url becomes None or an empty string. Because pagination state lives in next_url and not in the token manager, a token refresh between pages does not lose the batch position. Note that the processor accumulates every page in all_users, so the full result set is held in memory; for very large directories, hand each page to a downstream consumer instead of extending the list.
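To stream results instead of accumulating them in a list, the pagination loop can be written as a generator. In this sketch, iter_entities and fetch_page are hypothetical names: fetch_page stands in for a call to execute_api_request followed by response.json(), and the stubbed pages exist only to show the control flow.

```python
from typing import Callable, Iterator, Optional


def iter_entities(first_url: str,
                  fetch_page: Callable[[str], dict]) -> Iterator[dict]:
    """Yield entities page by page; `fetch_page` returns the decoded JSON body."""
    next_url: Optional[str] = first_url
    while next_url:
        page = fetch_page(next_url)
        # Hand each entity to the caller before requesting the next page
        yield from page.get("entities", [])
        next_url = page.get("next")


# Stubbed two-page API used only to demonstrate the control flow
fake_pages = {
    "/users?page=1": {"entities": [{"id": "a"}, {"id": "b"}], "next": "/users?page=2"},
    "/users?page=2": {"entities": [{"id": "c"}], "next": None},
}

ids = [user["id"] for user in iter_entities("/users?page=1", fake_pages.__getitem__)]
print(ids)  # ['a', 'b', 'c']
```

With this shape, only one page of entities is resident at a time, and a consumer that stops iterating early also stops further API calls.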
Complete Working Example
The following script combines all components into a single runnable module. Replace the placeholder credentials and environment URL before execution.
import base64
import json
import random
import threading
import time
from datetime import datetime, timedelta, timezone
from typing import Optional
from urllib.parse import urljoin

import requests

# Values as given in this tutorial. Genesys Cloud commonly serves the token
# endpoint from a regional login host, so verify both URLs for your org.
BASE_URL = "https://myorg.mypurecloud.com"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
REQUIRED_SCOPE = "user:read"


class TokenManager:
    def __init__(self, base_url: str, client_id: str, client_secret: str, scope: str):
        self.base_url = base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.token_cache: dict = {}
        self.lock = threading.Lock()
        self.refresh_margin = timedelta(seconds=120)

    def _decode_jwt_exp(self, token: str) -> Optional[datetime]:
        """Returns the exp claim as a UTC datetime, or None if the token is not a decodable JWT."""
        try:
            payload_segment = token.split(".")[1]
            # Restore the base64 padding that JWTs strip
            payload_segment += "=" * (-len(payload_segment) % 4)
            decoded = json.loads(base64.urlsafe_b64decode(payload_segment))
            return datetime.fromtimestamp(decoded["exp"], tz=timezone.utc)
        except (IndexError, KeyError, TypeError, ValueError):
            return None

    def _refresh_token(self, max_attempts: int = 5) -> dict:
        url = f"{self.base_url}/api/v2/oauth/token"
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json"
        }
        payload = {
            "grant_type": "client_credentials",
            "scope": self.scope
        }
        # A bounded loop replaces unbounded recursion on repeated 429 responses
        for _ in range(max_attempts):
            # Send the client ID and secret via HTTP Basic authentication
            response = requests.post(
                url,
                headers=headers,
                data=payload,
                auth=(self.client_id, self.client_secret),
                timeout=30,
            )
            if response.status_code == 401:
                raise RuntimeError("Invalid client credentials. Verify CLIENT_ID and CLIENT_SECRET.")
            if response.status_code == 429:
                # Assumes Retry-After is expressed in seconds
                time.sleep(int(response.headers.get("Retry-After", 5)))
                continue
            response.raise_for_status()
            data = response.json()
            expires_at = self._decode_jwt_exp(data["access_token"])
            if expires_at is None:
                # Opaque token: fall back to the relative lifetime
                expires_at = datetime.now(timezone.utc) + timedelta(seconds=data["expires_in"])
            return {"token": data["access_token"], "expires_at": expires_at}
        raise RuntimeError(f"Token endpoint still rate limited after {max_attempts} attempts.")

    def get_valid_token(self) -> str:
        with self.lock:
            # Read the clock after acquiring the lock so the check is not stale
            now = datetime.now(timezone.utc)
            if not self.token_cache or now + self.refresh_margin >= self.token_cache["expires_at"]:
                self.token_cache = self._refresh_token()
            return self.token_cache["token"]


def execute_api_request(
    token_manager: TokenManager,
    method: str,
    url: str,
    headers: Optional[dict] = None,
    payload: Optional[dict] = None,
    max_retries: int = 3
) -> requests.Response:
    current_retries = 0
    backoff_base = 2
    refreshed_after_401 = False
    last_status: Optional[int] = None
    while current_retries <= max_retries:
        # Inject fresh token before each attempt
        request_headers = {**(headers or {}), "Authorization": f"Bearer {token_manager.get_valid_token()}"}
        try:
            response = requests.request(
                method.upper(), url, headers=request_headers, json=payload, timeout=30
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            # Transient network failure: back off and retry
            print(f"Network error on attempt {current_retries + 1}: {e}")
            if current_retries >= max_retries:
                raise
            time.sleep(backoff_base ** current_retries + random.uniform(0, 1))
            current_retries += 1
            continue
        last_status = response.status_code
        # Success (any 2xx status)
        if 200 <= response.status_code < 300:
            return response
        # Reactive 401 handling: refresh token once and retry
        if response.status_code == 401:
            if refreshed_after_401:
                raise RuntimeError("401 persisted after token refresh. Check credentials and scopes.")
            with token_manager.lock:
                token_manager.token_cache = {}  # Force cache invalidation
            refreshed_after_401 = True
            continue
        # 429 Rate Limit handling: exponential backoff with jitter
        if response.status_code == 429:
            # Assumes Retry-After is expressed in seconds
            retry_after = int(response.headers.get("Retry-After", backoff_base ** current_retries))
            jitter = random.uniform(0, 0.5 * retry_after)
            time.sleep(retry_after + jitter)
            current_retries += 1
            continue
        # 5xx Server errors: retry with backoff
        if response.status_code >= 500:
            wait_time = backoff_base ** current_retries + random.uniform(0, 1)
            time.sleep(wait_time)
            current_retries += 1
            continue
        # 4xx Client errors (except 401/429): fail immediately
        response.raise_for_status()
        # Any other non-error status (for example 3xx) goes back to the caller
        return response
    raise RuntimeError(f"Request failed after {max_retries} retries. Last status: {last_status}")


def process_user_batch(token_manager: TokenManager) -> list[dict]:
    all_users: list[dict] = []
    next_url = f"{token_manager.base_url}/api/v2/users?pageSize=100"
    while next_url:
        try:
            response = execute_api_request(
                token_manager=token_manager,
                method="GET",
                url=next_url
            )
            data = response.json()
            entities = data.get("entities", [])
            all_users.extend(entities)
            # The sample response returns a relative path, so resolve it
            # against the base URL before the next request
            next_path = data.get("next")
            next_url = urljoin(token_manager.base_url, next_path) if next_path else None
            processed_count = len(all_users)
            total_count = data.get("total", 0)
            print(f"Processed {processed_count} of {total_count} users.")
        except Exception as e:
            print(f"Batch processing failed: {e}")
            raise
    return all_users


if __name__ == "__main__":
    manager = TokenManager(BASE_URL, CLIENT_ID, CLIENT_SECRET, REQUIRED_SCOPE)
    users = process_user_batch(manager)
    print(f"Batch complete. Total users retrieved: {len(users)}")
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The access token expired, was revoked, or lacks the required scope. Client credentials may be incorrect.
- Fix: Verify CLIENT_ID and CLIENT_SECRET match a registered OAuth client. Ensure the client has oauth:client_credentials enabled. Check that the requested scope matches the endpoint requirement. The reactive 401 handler in execute_api_request automatically refreshes the token and retries once. If the error persists after refresh, the credentials are invalid or the scope is missing.
- Code showing the fix: The token_manager.token_cache = {} line forces immediate cache invalidation. The subsequent loop iteration calls get_valid_token(), which triggers _refresh_token() and issues a new bearer token.
Error: 403 Forbidden
- Cause: The OAuth client lacks the required scope for the target endpoint. Scopes are enforced at the API gateway level.
- Fix: Navigate to the Genesys Cloud admin console, open the OAuth client configuration, and add the missing scope. For user listing, user:read is mandatory. For analytics queries, analytics:read is required. Scope changes propagate within 60 seconds. Restart the batch job after updating scopes.
- Code showing the fix: Update the REQUIRED_SCOPE variable to match the endpoint documentation. The token manager automatically requests the updated scope during _refresh_token().
Error: 429 Too Many Requests
- Cause: The tenant exceeded the rate limit for the specific endpoint or global API quota. Genesys Cloud returns a Retry-After header indicating the cooldown period.
- Fix: Implement exponential backoff with jitter. The provided execute_api_request function reads Retry-After, applies a random jitter between 0 and 50 percent of the base delay, and sleeps before retrying. This pattern prevents synchronized retry storms across multiple workers.
- Code showing the fix: The 429 handling block calculates retry_after from headers, applies random.uniform(0, 0.5 * retry_after) for jitter, and sleeps. The loop increments current_retries and resumes the request cycle.
Error: 503 Service Unavailable
- Cause: Genesys Cloud infrastructure is undergoing maintenance or experiencing transient overload. The API returns 503 to shed load.
- Fix: Retry with exponential backoff. The 5xx handler in execute_api_request applies backoff_base ** current_retries plus random jitter. Most 503 errors resolve within three retry attempts. If the error persists beyond five minutes, check the Genesys Cloud status page for scheduled maintenance windows.
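The handling rules in this section can be summarized as a small dispatch function. This is an illustrative sketch only: classify_status and the strategy labels it returns are hypothetical names, chosen to mirror the four error cases described above.

```python
def classify_status(status_code: int) -> str:
    """Map an HTTP status code to the handling strategy described in this section."""
    if 200 <= status_code < 300:
        return "success"
    if status_code == 401:
        return "refresh_token_and_retry_once"
    if status_code == 429:
        return "backoff_with_jitter"
    if status_code >= 500:
        return "backoff_and_retry"
    # For example 403: needs a configuration change, not a retry
    return "fail_fast"


for code in (200, 401, 403, 429, 503):
    print(code, classify_status(code))
```

Separating classification from the retry loop makes it straightforward to unit test each rule without issuing HTTP requests.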