Handle Access Token Expiration Mid-Batch Without Data Loss
What You Will Build
- A robust batch processing worker that automatically detects expired access tokens, refreshes them transparently, and retries failed API calls.
- This implementation uses the Genesys Cloud CX REST API with the official Python SDK and httpx for granular control over the OAuth flow.
- The programming language covered is Python 3.9+.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Grant). This flow is standard for backend batch jobs where no user interaction occurs.
- Required Scopes: analytics:query, analytics:export (depending on the specific endpoint used).
- SDK Version: PureCloudPlatformClientV2 v158.0.0 or higher.
- Runtime: Python 3.9+.
- Dependencies:
  - httpx: For HTTP requests with explicit timeout control.
  - PureCloudPlatformClientV2: The official Genesys Cloud Python SDK.
  - pydantic: For data validation in the response processing step.
Install dependencies:
pip install httpx purecloudplatformclientv2 pydantic
Authentication Setup
The core problem in batch jobs is that a Client Credentials access token has a limited lifetime, which the token response reports in its expires_in field. If the token lasts 1 hour and your batch job processes 50,000 records over 90 minutes, the token expires at minute 60, causing the remaining requests to fail with 401 Unauthorized.
You must implement a token manager that caches the token, checks its expiration time before every request, and fetches a new token only when necessary.
Token Manager Implementation
We will build a TokenManager class that handles the OAuth2 client credentials grant. It converts the expires_in value (a lifetime in seconds) into an absolute expiry timestamp and uses that timestamp to decide when the token must be replaced.
import time
import httpx
from typing import Optional, Dict, Any


class TokenManager:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        # Genesys Cloud serves OAuth from the regional login host
        self.token_url = f"https://login.{environment}/oauth/token"
        # State
        self.access_token: Optional[str] = None
        self.token_expiry: Optional[float] = None
        self.refresh_buffer_seconds = 60  # Refresh 60 seconds before actual expiry

    def _is_token_valid(self) -> bool:
        """Check if the current token is still valid with a safety buffer."""
        if self.access_token is None or self.token_expiry is None:
            return False
        return time.time() < (self.token_expiry - self.refresh_buffer_seconds)

    def get_access_token(self) -> str:
        """
        Returns a valid access token.
        Refreshes the token if it is expired or close to expiration.
        """
        if self._is_token_valid():
            return self.access_token
        # Token is invalid or expired; refresh it
        self._refresh_token()
        return self.access_token

    def _refresh_token(self) -> None:
        """Fetches a new token using Client Credentials Grant."""
        payload = {"grant_type": "client_credentials"}
        # Use httpx for robust connection handling
        with httpx.Client(timeout=30.0) as client:
            response = client.post(
                self.token_url,
                data=payload,  # data= form-encodes the dict and sets the Content-Type header
                auth=(self.client_id, self.client_secret),  # sent as HTTP Basic credentials
            )
        if response.status_code != 200:
            raise RuntimeError(f"Failed to refresh token: {response.status_code} - {response.text}")
        data = response.json()
        self.access_token = data["access_token"]
        # expires_in is in seconds from now
        self.token_expiry = time.time() + data["expires_in"]
        print(f"Token refreshed. Expires in {data['expires_in']} seconds.")
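The caching rule used by TokenManager can be exercised without any network access. The following is a minimal sketch, not part of the original worker: TokenCache, fake_fetch, and the injectable clock are hypothetical names introduced for illustration, and the lock is an added assumption for the case where several threads share one token.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches a token until shortly before it expires; safe to share across threads."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],  # hypothetical: returns (token, expires_in seconds)
        buffer_seconds: float = 60.0,
        clock: Callable[[], float] = time.time,
    ):
        self._fetch = fetch
        self._buffer = buffer_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expiry = 0.0

    def get(self) -> str:
        with self._lock:  # only one thread refreshes at a time
            if self._token is None or self._clock() >= self._expiry - self._buffer:
                token, expires_in = self._fetch()
                self._token = token
                # Convert the relative lifetime into an absolute expiry time
                self._expiry = self._clock() + expires_in
            return self._token


# Demonstration with a fake clock and a fake token endpoint.
now = [0.0]
calls = []


def fake_fetch() -> Tuple[str, float]:
    calls.append(now[0])
    return f"token-{len(calls)}", 3600.0


cache = TokenCache(fake_fetch, buffer_seconds=60.0, clock=lambda: now[0])
first = cache.get()   # no token yet, so this fetches
second = cache.get()  # still fresh, served from the cache
now[0] = 3540.0       # exactly at expiry minus buffer
third = cache.get()   # inside the buffer window, so this fetches again
```

Injecting the clock keeps the check deterministic in tests; in production the default time.time is used, as in TokenManager.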
Implementation
Step 1: Configure the HTTP Client with Automatic Retry and Token Injection
When making API calls, you cannot simply use a static Authorization: Bearer <token> header because the token changes. You need an HTTP client that dynamically injects the current valid token for every request and handles transient errors (like 429 Too Many Requests or 5xx Server Errors).
We will use httpx because it allows us to use Event Hooks or Transports to intercept requests. For simplicity and clarity, we will create a wrapper method that ensures the token is fresh before sending the request.
import time
from typing import Any

from PureCloudPlatformClientV2 import AnalyticsApi, ApiClient


class GenesysBatchWorker:
    def __init__(self, token_manager: TokenManager, environment: str = "mypurecloud.com"):
        self.token_manager = token_manager
        self.environment = environment
        # Keep a handle on the ApiClient so the bearer token can be updated before each call
        self.api_client = self._create_api_client()
        self.analytics_api = AnalyticsApi(api_client=self.api_client)

    def _create_api_client(self) -> ApiClient:
        """
        Creates an ApiClient pointed at the regional API host.
        Assumption to verify against your installed SDK version: ApiClient accepts
        a host argument and reads its access_token attribute when it builds the
        Authorization header.
        """
        return ApiClient(host=f"https://api.{self.environment}")

    def _make_request_with_retry(self, func, *args, **kwargs) -> Any:
        """
        Wrapper to handle 429 rate limits and 5xx server errors with exponential backoff.
        """
        max_retries = 3
        base_delay = 2.0
        for attempt in range(max_retries):
            try:
                # Ensure token is valid before every attempt
                self.api_client.access_token = self.token_manager.get_access_token()
                return func(*args, **kwargs)
            except Exception as e:
                # SDK API exceptions expose the HTTP status code as .status
                error_code = getattr(e, 'status', None)
                if error_code == 429:
                    # Rate Limited: Wait and retry
                    wait_time = base_delay * (2 ** attempt)
                    print(f"Rate limited (429). Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                elif error_code and 500 <= error_code < 600:
                    # Server Error: Wait and retry
                    wait_time = base_delay * (2 ** attempt)
                    print(f"Server error ({error_code}). Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                else:
                    # Other errors (401, 403, 400) should not be retried blindly
                    raise
        raise RuntimeError("Max retries exceeded")
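The wait times in the retry wrapper follow a doubling schedule. The sketch below isolates that calculation so it can be tested on its own. It is an illustration under stated assumptions rather than part of the worker: backoff_delay is a hypothetical helper, the max_delay cap and jitter are optional additions, and retry_after assumes the caller has already read a Retry-After header (in seconds) from the failed response.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_delay: float = 2.0,
    max_delay: float = 60.0,
    retry_after: Optional[str] = None,
    jitter: float = 0.0,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # Prefer the server's own hint when it is a plain number of seconds
            return min(float(retry_after), max_delay)
        except ValueError:
            pass  # not a number of seconds; fall back to exponential backoff
    delay = min(base_delay * (2 ** attempt), max_delay)
    # Optional random jitter spreads out retries from parallel workers
    return delay + random.uniform(0, jitter)


schedule = [backoff_delay(attempt) for attempt in range(3)]
hinted = backoff_delay(0, retry_after="7")
capped = backoff_delay(10)
fallback = backoff_delay(1, retry_after="soon")
```

With the defaults, schedule matches the 2, 4, and 8 second waits used by _make_request_with_retry.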
Step 2: Construct the Analytics Query Payload
Batch jobs typically pull conversation details. The endpoint /api/v2/analytics/conversations/details/query is paginated by page number. You must construct a valid JSON payload with the correct interval, ordering, and paging settings. The response contains full conversation objects, so there is no field selection in the request.
Required Scope: analytics:query
    def build_query_payload(self, start_time: str, end_time: str) -> Dict[str, Any]:
        """
        Builds the JSON payload for the Analytics API.
        Args:
            start_time: ISO 8601 start time (e.g., "2023-10-01T00:00:00Z")
            end_time: ISO 8601 end time (e.g., "2023-10-01T23:59:59Z")
        """
        return {
            "interval": f"{start_time}/{end_time}",
            "order": "asc",
            "orderBy": "conversationStart",
            "paging": {
                "pageSize": 100,  # Max page size for this endpoint
                "pageNumber": 1   # 1-based; will be updated in the loop
            }
        }
Step 3: Process Results with Pagination and Token Refresh
This is the critical section. The loop must:
- Check token validity.
- Send the request.
- If the request fails with 401, force a refresh and retry the same page. The TokenManager should normally have refreshed the token already, but the server can still reject a token that the local clock considers valid.
- Handle pagination by incrementing paging.pageNumber until a page comes back empty.
- Process the data chunk.
    def fetch_conversation_batch(self, start_time: str, end_time: str) -> list:
        """
        Fetches all conversations within the time range, handling pagination and token expiration.
        """
        all_conversations = []
        payload = self.build_query_payload(start_time, end_time)
        page_number = 1
        max_pages = 1000  # Safety break to prevent infinite loops
        max_auth_retries = 2  # Consecutive 401s tolerated for a single page
        auth_retries = 0
        while page_number <= max_pages:
            print(f"Fetching page {page_number}...")
            # Update payload with current page number
            payload["paging"]["pageNumber"] = page_number
            try:
                # The SDK method for querying conversations
                # We wrap it in our retry logic
                response = self._make_request_with_retry(
                    self.analytics_api.post_analytics_conversations_details_query,
                    body=payload
                )
            except Exception as e:
                # Handle 401 explicitly in case the token was rejected server-side
                error_status = getattr(e, 'status', None)
                if error_status == 401 and auth_retries < max_auth_retries:
                    print("Access token expired mid-request. Forcing refresh...")
                    auth_retries += 1
                    self.token_manager._refresh_token()
                    # page_number is unchanged, so the same page is retried
                    continue
                print(f"Unexpected error on page {page_number}: {e}")
                raise
            auth_retries = 0
            # Extract data
            conversations = response.conversations
            if not conversations:
                print("End of results reached.")
                break
            all_conversations.extend(conversations)
            page_number += 1
        print(f"Total conversations fetched: {len(all_conversations)}")
        return all_conversations
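The property that matters for avoiding data loss is that a 401 never advances the page cursor. The following sketch demonstrates that behaviour against a fake API. It is a simplified stand-in, not the worker itself: Unauthorized, fetch_all_pages, fake_fetch_page, and fake_refresh are hypothetical names, and the fake API simulates a single token expiry just before page 2.

```python
from typing import Callable, Dict, List


class Unauthorized(Exception):
    """Stand-in for an SDK exception whose status is 401."""
    status = 401


def fetch_all_pages(
    fetch_page: Callable[[int], List[dict]],
    refresh_token: Callable[[], None],
    max_auth_retries: int = 2,
) -> List[dict]:
    results: List[dict] = []
    page_number = 1
    auth_retries = 0
    while True:
        try:
            page = fetch_page(page_number)
        except Unauthorized:
            if auth_retries >= max_auth_retries:
                raise  # repeated 401s point to bad credentials, not expiry
            auth_retries += 1
            refresh_token()
            continue  # page_number unchanged: the same page is requested again
        auth_retries = 0
        if not page:
            return results
        results.extend(page)
        page_number += 1


# Fake API: two pages of data, then an empty page.
pages: Dict[int, List[dict]] = {1: [{"id": "a"}, {"id": "b"}], 2: [{"id": "c"}], 3: []}
state = {"token_ok": True, "expired_once": False, "refreshes": 0}


def fake_fetch_page(number: int) -> List[dict]:
    # Simulate the token expiring once, just before page 2 is served
    if number == 2 and not state["expired_once"]:
        state["expired_once"] = True
        state["token_ok"] = False
    if not state["token_ok"]:
        raise Unauthorized()
    return pages[number]


def fake_refresh() -> None:
    state["token_ok"] = True
    state["refreshes"] += 1


records = fetch_all_pages(fake_fetch_page, fake_refresh)
ids = [record["id"] for record in records]
```

Capping consecutive 401s keeps a revoked or misconfigured client from looping until the page limit is reached.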
Complete Working Example
This script combines the TokenManager and GenesysBatchWorker into a runnable module. Set the GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables and adjust the time range before running.
import os
import time
from typing import Any, Dict, Optional

import httpx
from PureCloudPlatformClientV2 import AnalyticsApi, ApiClient

# Configuration
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
ENVIRONMENT = "mypurecloud.com"


class TokenManager:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        # OAuth is served from the regional login host
        self.token_url = f"https://login.{environment}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: Optional[float] = None
        self.refresh_buffer_seconds = 60

    def _is_token_valid(self) -> bool:
        if self.access_token is None or self.token_expiry is None:
            return False
        return time.time() < (self.token_expiry - self.refresh_buffer_seconds)

    def get_access_token(self) -> str:
        if self._is_token_valid():
            return self.access_token
        self._refresh_token()
        return self.access_token

    def _refresh_token(self) -> None:
        payload = {"grant_type": "client_credentials"}
        with httpx.Client(timeout=30.0) as client:
            # data= form-encodes the body; auth= sends the client credentials as HTTP Basic
            response = client.post(
                self.token_url,
                data=payload,
                auth=(self.client_id, self.client_secret),
            )
        if response.status_code != 200:
            raise RuntimeError(f"Token refresh failed: {response.status_code} - {response.text}")
        data = response.json()
        self.access_token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"]
        print(f"Token refreshed. Expires in {data['expires_in']} seconds.")


class GenesysBatchWorker:
    def __init__(self, token_manager: TokenManager, environment: str = "mypurecloud.com"):
        self.token_manager = token_manager
        self.environment = environment
        self.api_client = self._create_api_client()
        self.analytics_api = AnalyticsApi(api_client=self.api_client)

    def _create_api_client(self) -> ApiClient:
        # Assumption to verify against your SDK version: ApiClient accepts a host
        # argument and reads its access_token attribute for the Authorization header.
        return ApiClient(host=f"https://api.{self.environment}")

    def _make_request_with_retry(self, func, *args, **kwargs) -> Any:
        max_retries = 3
        base_delay = 2.0
        for attempt in range(max_retries):
            try:
                # Inject a fresh token before every attempt
                self.api_client.access_token = self.token_manager.get_access_token()
                return func(*args, **kwargs)
            except Exception as e:
                error_code = getattr(e, 'status', None)
                if error_code == 429:
                    wait_time = base_delay * (2 ** attempt)
                    print(f"Rate limited (429). Retrying in {wait_time}s...")
                    time.sleep(wait_time)
                    continue
                elif error_code and 500 <= error_code < 600:
                    wait_time = base_delay * (2 ** attempt)
                    print(f"Server error ({error_code}). Retrying in {wait_time}s...")
                    time.sleep(wait_time)
                    continue
                else:
                    raise
        raise RuntimeError("Max retries exceeded")

    def fetch_conversation_batch(self, start_time: str, end_time: str) -> list:
        all_conversations = []
        payload: Dict[str, Any] = {
            "interval": f"{start_time}/{end_time}",
            "order": "asc",
            "orderBy": "conversationStart",
            "paging": {"pageSize": 100, "pageNumber": 1},
        }
        page_number = 1
        max_pages = 1000
        max_auth_retries = 2  # Consecutive 401s tolerated for a single page
        auth_retries = 0
        while page_number <= max_pages:
            print(f"Fetching page {page_number}...")
            payload["paging"]["pageNumber"] = page_number
            try:
                response = self._make_request_with_retry(
                    self.analytics_api.post_analytics_conversations_details_query,
                    body=payload
                )
            except Exception as e:
                error_status = getattr(e, 'status', None)
                if error_status == 401 and auth_retries < max_auth_retries:
                    print("401 Error: Forcing token refresh...")
                    auth_retries += 1
                    self.token_manager._refresh_token()
                    continue  # Retry the same page with the new token
                raise
            auth_retries = 0
            conversations = response.conversations
            if not conversations:
                break
            all_conversations.extend(conversations)
            page_number += 1
        return all_conversations
if __name__ == "__main__":
    from datetime import datetime, timedelta, timezone

    if not CLIENT_ID or not CLIENT_SECRET:
        raise EnvironmentError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")
    # Initialize
    tm = TokenManager(CLIENT_ID, CLIENT_SECRET, ENVIRONMENT)
    worker = GenesysBatchWorker(tm, ENVIRONMENT)
    # Define Time Range (Last 24 hours), using a timezone-aware UTC timestamp
    end = datetime.now(timezone.utc)
    start = end - timedelta(hours=24)
    start_str = start.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_str = end.strftime("%Y-%m-%dT%H:%M:%SZ")
    try:
        results = worker.fetch_conversation_batch(start_str, end_str)
        print(f"Success. Retrieved {len(results)} conversations.")
    except Exception as e:
        print(f"Job failed: {e}")
Common Errors & Debugging
Error: 401 Unauthorized (Mid-Batch)
- Cause: The access token expired between the last check and the current request, or the SDK cached an old token.
- Fix: Ensure your TokenManager checks time.time() < expiry - buffer before every request. The refresh_buffer_seconds setting (60s in the example) prevents edge-case expirations. If the SDK still throws 401, catch it and force a _refresh_token() call immediately before retrying the same request.
Error: 429 Too Many Requests
- Cause: You are exceeding the Genesys Cloud rate limits. The limit that applies depends on the endpoint and the OAuth client, so check the platform rate-limit documentation for the current values.
- Fix: Implement exponential backoff. The _make_request_with_retry method in the example waits 2 * (2^attempt) seconds, that is 2, 4, and then 8 seconds. Do not retry immediately. Also, ensure you are not spawning multiple threads hitting the same endpoint without rate-limiting each thread.
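When several threads share one endpoint, a shared throttle keeps their combined request rate bounded. The sketch below is one possible approach under stated assumptions: Throttle is a hypothetical helper, min_interval is a value you would derive from the limit that applies to your client, and the clock and sleep functions are injectable only so the behaviour can be checked without real waiting.

```python
import threading
import time
from typing import Callable, List


class Throttle:
    """Spaces calls at least `min_interval` seconds apart across all threads."""

    def __init__(
        self,
        min_interval: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ):
        self._min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._lock = threading.Lock()
        self._next_allowed = 0.0

    def wait(self) -> None:
        with self._lock:
            now = self._clock()
            delay = max(0.0, self._next_allowed - now)
            # Reserve the next slot before releasing the lock
            self._next_allowed = max(now, self._next_allowed) + self._min_interval
        if delay > 0:
            self._sleep(delay)  # sleep outside the lock so other threads can reserve slots


# Demonstration with a frozen clock and a recording sleep function.
sleeps: List[float] = []
throttle = Throttle(min_interval=0.5, clock=lambda: 100.0, sleep=sleeps.append)
for _ in range(3):
    throttle.wait()
```

Each worker thread would call throttle.wait() immediately before issuing a request, in addition to the backoff applied after a 429.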
Error: 400 Bad Request (Invalid Interval)
- Cause: The interval format in the analytics query is incorrect. Genesys requires ISO 8601 format with Z suffix.
- Fix: Verify your start_time and end_time strings. Example: "2023-10-01T00:00:00Z/2023-10-01T23:59:59Z". Do not include milliseconds unless necessary, and ensure the timezone is UTC.
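Building the interval from timezone-aware datetime objects avoids most formatting mistakes. The helper below is a small sketch: format_interval is a hypothetical name, and it assumes whole-second precision is sufficient for the query.

```python
from datetime import datetime, timedelta, timezone


def format_interval(start: datetime, end: datetime) -> str:
    """Return 'start/end' as ISO 8601 UTC timestamps with a Z suffix."""
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("start and end must be timezone-aware")
    if end <= start:
        raise ValueError("end must be after start")
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    # Convert to UTC first so the Z suffix is truthful
    start_utc = start.astimezone(timezone.utc)
    end_utc = end.astimezone(timezone.utc)
    return f"{start_utc.strftime(fmt)}/{end_utc.strftime(fmt)}"


interval = format_interval(
    datetime(2023, 10, 1, 0, 0, 0, tzinfo=timezone.utc),
    datetime(2023, 10, 1, 23, 59, 59, tzinfo=timezone.utc),
)

# Local times with a UTC-4 offset are shifted to UTC before formatting.
utc_minus_4 = timezone(timedelta(hours=-4))
shifted = format_interval(
    datetime(2023, 10, 1, 0, 0, 0, tzinfo=utc_minus_4),
    datetime(2023, 10, 1, 12, 0, 0, tzinfo=utc_minus_4),
)
```

Rejecting naive datetimes up front turns a silent timezone error into an immediate ValueError on the client side.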
Error: MemoryError (Large Batches)
- Cause: Storing all conversation entities in a list (all_conversations) for a large date range (e.g., 30 days) can exhaust RAM.
- Fix: Process data in chunks. Instead of all_conversations.extend(), write each page to a file (CSV/JSONL) or database immediately. Clear the local list after writing.
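Writing each page to disk as it arrives keeps memory use proportional to one page rather than the whole date range. The following is a minimal sketch: write_pages_to_jsonl and fake_pages are hypothetical names, and it assumes each record is already a plain dict. SDK model objects would need to be converted first, for example with a to_dict() method if your SDK version provides one.

```python
import json
import os
import tempfile
from typing import Iterable, Iterator, List


def write_pages_to_jsonl(pages: Iterable[List[dict]], path: str) -> int:
    """Append every record of every page to `path`, one JSON object per line."""
    written = 0
    with open(path, "a", encoding="utf-8") as handle:
        for page in pages:
            for record in page:
                handle.write(json.dumps(record) + "\n")
                written += 1
            handle.flush()  # push each completed page out of Python's buffer
    return written


def fake_pages() -> Iterator[List[dict]]:
    # Stand-in for the paginated API: yields one page at a time
    yield [{"conversationId": "c1"}, {"conversationId": "c2"}]
    yield [{"conversationId": "c3"}]


out_path = os.path.join(tempfile.mkdtemp(), "conversations.jsonl")
count = write_pages_to_jsonl(fake_pages(), out_path)

with open(out_path, encoding="utf-8") as handle:
    lines = handle.read().splitlines()
```

Because pages is consumed lazily, only one page is held in memory at a time, and a job that fails partway still leaves the completed pages on disk.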