Token Refresh Logic: Handling Access Token Expiry Mid-Batch
What You Will Build
- A robust Python utility that automatically detects expired OAuth access tokens during long-running data processing jobs and refreshes them without interrupting the workflow.
- This tutorial uses the Genesys Cloud PureCloud Platform Client V2 SDK for API calls and the httpx library for the direct HTTP request to the token endpoint.
- The implementation covers Python 3.9+ with type hints, thread-safe token storage, and exponential backoff between retries.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Grant) or Resource Owner Password Credentials (ROPC). This guide assumes Client Credentials for server-to-server integrations, which is the standard for batch processing.
- Required Scopes: analytics:query, user:read, or any scope required by your batch job. The refresh mechanism itself requires no additional scopes beyond what the initial token possesses.
- SDK Version: genesys-cloud-purecloud-platform-client v130.0.0 or later.
- Language/Runtime: Python 3.9+. The code in this tutorial is synchronous; asyncio and aiolimiter are needed only if you add concurrency control on top of it.
- External Dependencies:
  - pip install genesys-cloud-purecloud-platform-client
  - pip install httpx (for the HTTP request to the token endpoint)
  - pip install tenacity (for declarative retry logic)
Authentication Setup
The core issue in batch processing is that OAuth access tokens from Genesys Cloud have a finite lifetime. The token response reports it in expires_in; this tutorial assumes 3600 seconds (1 hour). If your batch job processes 10,000 records and takes 90 minutes, the token expires partway through, and subsequent API calls fail with 401 Unauthorized.
The solution is not to hardcode a refresh interval but to refresh on demand. Before each call, check the cached expiry time and refresh if the token is expired or about to expire. If the call still fails with a 401, or with a 403 whose error message indicates token expiry, request a new token, cache it, and retry the original request. Cap the retries (the wrapper in Step 1 stops after three attempts) so that a genuinely invalid credential fails fast instead of looping.
Below is the foundational authentication wrapper.
import threading
import time
from typing import Any, Dict

import httpx
from genesyscloud.platform_client_v2.platform_client import PlatformClient


class GenesysAuthManager:
    """
    Manages OAuth token lifecycle for Genesys Cloud API calls.
    Implements lazy refresh and thread-safe token storage.
    """

    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        self.token_cache: Dict[str, Any] = {}
        self.token_expires_at: float = 0.0
        self._lock = threading.Lock()
        # Initialize the Genesys SDK Platform Client
        self.platform_client = PlatformClient()
        self._configure_sdk_client()

    def _configure_sdk_client(self):
        """Configures the SDK client to use our custom auth logic."""
        # We do not set credentials on the SDK here. The token is fetched
        # manually and injected afterwards (see inject_token_into_sdk).
        pass

    def get_access_token(self) -> str:
        """
        Returns a valid access token.
        If the current token is expired or close to expiration, it refreshes.
        """
        # Refresh if the token is expired or will expire in the next 60 seconds.
        # The buffer avoids a race where the token expires between this check
        # and the API call.
        if time.time() >= self.token_expires_at - 60:
            self._refresh_token()
        return self.token_cache.get("access_token", "")

    def _refresh_token(self):
        """
        Performs the OAuth2 Client Credentials Grant flow.
        """
        # Remember which token we saw before waiting for the lock.
        stale_token = self.token_cache.get("access_token")
        with self._lock:
            # If the cached token changed while we waited, another thread has
            # already refreshed it. Comparing tokens rather than timestamps
            # means a refresh forced by a 401 is never skipped.
            if self.token_cache.get("access_token") != stale_token:
                return
            # Genesys Cloud serves OAuth from the login.* host, not api.*
            token_url = f"https://login.{self.environment}/oauth/token"
            data = {
                "grant_type": "client_credentials",
                # Optional: Specify scopes. If omitted, defaults to client config.
                # "scope": "analytics:query user:read"
            }
            try:
                with httpx.Client(timeout=10.0) as client:
                    response = client.post(
                        token_url,
                        data=data,  # sent as application/x-www-form-urlencoded
                        auth=(self.client_id, self.client_secret),  # HTTP Basic
                        headers={"Accept": "application/json"},
                    )
                    response.raise_for_status()
                    token_data = response.json()
            except httpx.HTTPStatusError as e:
                raise RuntimeError(f"Failed to refresh token: {e.response.text}") from e
            except httpx.HTTPError as e:
                raise RuntimeError(f"Unexpected error during token refresh: {e}") from e
            expires_in = token_data.get("expires_in", 3600)
            self.token_cache = token_data
            self.token_expires_at = time.time() + expires_in

    def inject_token_into_sdk(self):
        """
        Injects the current valid token into the Genesys SDK client.
        """
        token = self.get_access_token()
        # The SDK allows setting the token directly on the auth manager
        self.platform_client.auth_manager.set_token(token)
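The expiry check with its 60-second buffer can be exercised in isolation, without calling Genesys Cloud. The following is a minimal sketch under stated assumptions: TokenCache, fetch_token, clock, and invalidate are hypothetical names introduced for illustration, and the fetch function is a stand-in for the real token request. Injecting the clock makes the buffer behavior easy to test.

```python
import threading
import time
from typing import Any, Callable, Dict


class TokenCache:
    """Hypothetical stand-in: caches a token and refreshes it shortly before expiry."""

    def __init__(
        self,
        fetch_token: Callable[[], Dict[str, Any]],
        clock: Callable[[], float] = time.time,
        buffer_seconds: float = 60.0,
    ) -> None:
        self._fetch_token = fetch_token
        self._clock = clock
        self._buffer = buffer_seconds
        self._lock = threading.Lock()
        self._token = ""
        self._expires_at = 0.0

    def _is_stale(self) -> bool:
        # Stale means expired, or due to expire within the buffer window.
        return self._clock() >= self._expires_at - self._buffer

    def get(self) -> str:
        if self._is_stale():
            with self._lock:
                # Re-check under the lock: another thread may have refreshed.
                if self._is_stale():
                    data = self._fetch_token()
                    self._token = data["access_token"]
                    self._expires_at = self._clock() + data.get("expires_in", 3600)
        return self._token

    def invalidate(self) -> None:
        """Force the next get() to fetch, e.g. after the server returned a 401."""
        with self._lock:
            self._expires_at = 0.0
```

With a fake clock, a token issued at t=0 with expires_in=3600 is reused at t=3000 and replaced at t=3545, because 3545 falls inside the 60-second buffer before expiry.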
Implementation
Step 1: Building the Retry Logic with 401 Detection
The most critical part of this tutorial is handling the 401 error. A naive retry loop will fail if it does not distinguish between a “Bad Request” (400) and an “Unauthorized” (401). You must only retry on 401 or specific 403 cases related to token validity.
We will use the tenacity library to handle retries declaratively. This keeps the business logic clean.
import logging
from typing import Any, Callable

import tenacity

logger = logging.getLogger(__name__)


def is_auth_error(exception: Exception) -> bool:
    """
    Predicate to determine if an exception is caused by an expired token.
    """
    # Read the HTTP status from whichever attribute the exception exposes.
    status_code = getattr(exception, "status_code", None) or getattr(exception, "status", None)
    # 401 Unauthorized is the primary indicator of token expiry
    if status_code == 401:
        return True
    # 403 Forbidden can sometimes indicate scope issues or token revocation.
    # Treat it as an auth error only if the response body mentions the token.
    if status_code == 403:
        body = getattr(exception, "body", None)
        return body is not None and "token" in str(body).lower()
    return False


@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(multiplier=1, min=2, max=10),
    retry=tenacity.retry_if_exception(is_auth_error),
    reraise=True,
)
def make_api_call_with_retry(
    auth_manager: GenesysAuthManager,
    api_func: Callable[..., Any],
    *args: Any,
    **kwargs: Any,
) -> Any:
    """
    Wrapper that ensures a valid token is present before calling the API,
    and retries if a 401 is received.
    """
    # 1. Ensure token is fresh before the call. On a retry this fetches a new
    #    token, because the failed attempt invalidated the cached one.
    auth_manager.inject_token_into_sdk()
    try:
        # 2. Execute the API call
        return api_func(*args, **kwargs)
    except Exception as e:
        # 3. If it is an auth error, mark the cached token as expired. The
        #    local expiry time may still look valid (for example, after a
        #    revocation), so resetting it forces a real refresh on retry.
        if is_auth_error(e):
            logger.warning("Detected authentication error. Invalidating cached token...")
            auth_manager.token_expires_at = 0.0
        raise  # tenacity retries only when is_auth_error(e) is true
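The control flow that tenacity provides here can also be written out by hand, which makes the rule "retry only on 401, fail fast on everything else" explicit. This is a dependency-free sketch, not the tenacity implementation: FakeApiError, call_with_token_retry, get_token, and invalidate_token are hypothetical names, and the sketch omits the backoff delay for brevity.

```python
from typing import Any, Callable, Optional


class FakeApiError(Exception):
    """Hypothetical stand-in for an SDK exception carrying an HTTP status."""

    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status


def call_with_token_retry(
    api_func: Callable[[str], Any],
    get_token: Callable[[], str],
    invalidate_token: Callable[[], None],
    max_attempts: int = 3,
) -> Any:
    """Call api_func with a token; on 401, invalidate the token and try again."""
    last_error: Optional[FakeApiError] = None
    for _ in range(max_attempts):
        token = get_token()  # returns a fresh token after invalidation
        try:
            return api_func(token)
        except FakeApiError as exc:
            if exc.status != 401:
                raise  # 400, 404, 500, ...: retrying with a new token will not help
            last_error = exc
            invalidate_token()
    assert last_error is not None
    raise last_error  # still unauthorized after max_attempts


# Usage: the first token is rejected, the second one works.
tokens = iter(["expired-token", "fresh-token"])
current = {"token": next(tokens)}


def demo_api(token: str) -> str:
    if token == "expired-token":
        raise FakeApiError(401)
    return f"ok with {token}"


result = call_with_token_retry(
    demo_api,
    get_token=lambda: current["token"],
    invalidate_token=lambda: current.update(token=next(tokens)),
)
print(result)
```

Bounding the attempts matters: if the credentials themselves are wrong, every new token is rejected too, and the loop must end with the original 401 rather than spin.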
Step 2: Core Logic - Processing Analytics Data
Now we apply this pattern to a real-world scenario: fetching conversation details. The /api/v2/analytics/conversations/details/query endpoint is heavy and often requires pagination. If the token expires between pages, the job fails without this logic.
from genesyscloud.platform_client_v2.api.analytics_api import AnalyticsApi
from genesyscloud.analytics.models import ConversationDetailsQuery


def fetch_conversation_details(auth_manager: GenesysAuthManager, query_params: Dict[str, Any]) -> list:
    """
    Fetches all conversation details matching the query, handling pagination and token refresh.
    """
    analytics_api = AnalyticsApi(auth_manager.platform_client)
    all_conversations = []
    next_page_token = None
    max_pages = 50  # Safety limit to prevent infinite loops

    for page_num in range(max_pages):
        # Construct the query object
        body = ConversationDetailsQuery(
            entity_ids=query_params.get("entityIds"),
            date_from=query_params.get("dateFrom"),
            date_to=query_params.get("dateTo"),
            size=250,  # Max page size for this endpoint
            page_token=next_page_token,
        )
        try:
            # Use the retry wrapper
            response = make_api_call_with_retry(
                auth_manager,
                analytics_api.post_analytics_conversations_details_query,
                body=body,
            )
            # Process results
            if response.entities:
                all_conversations.extend(response.entities)
                logger.info(f"Fetched page {page_num + 1}, total records so far: {len(all_conversations)}")
            # Check for pagination
            if response.next_page_token:
                next_page_token = response.next_page_token
            else:
                break
        except Exception as e:
            logger.error(f"Failed to fetch page {page_num + 1}: {str(e)}")
            # If retries are exhausted, tenacity will raise the final exception
            raise

    return all_conversations
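Because the retry wraps each page request rather than the whole loop, a token that expires between pages costs one repeated request, not a restart from page one. The sketch below illustrates that property with a fake page source. All names in it (fetch_all, retry_once_on_auth_error, the cursor values) are hypothetical, and PermissionError stands in for a 401 from the API. It also reports whether the max_pages safety limit cut the result short, which the loop above does silently.

```python
from typing import Any, Callable, List, Optional, Tuple

Page = Tuple[List[Any], Optional[str]]  # (items, next cursor or None)


def retry_once_on_auth_error(
    fetch_page: Callable[[Optional[str]], Page],
    refresh: Callable[[], None],
) -> Callable[[Optional[str]], Page]:
    """Wrap a page fetcher so an auth failure triggers one refresh and one retry."""

    def wrapped(cursor: Optional[str]) -> Page:
        try:
            return fetch_page(cursor)
        except PermissionError:
            refresh()
            return fetch_page(cursor)  # same cursor: resume, do not restart

    return wrapped


def fetch_all(
    fetch_page: Callable[[Optional[str]], Page],
    max_pages: int = 50,
) -> Tuple[List[Any], bool]:
    """Follow cursors until exhausted; the bool is False if max_pages cut us off."""
    items: List[Any] = []
    cursor: Optional[str] = None
    for _ in range(max_pages):
        page, cursor = fetch_page(cursor)
        items.extend(page)
        if cursor is None:
            return items, True
    return items, False
```

In a run where the token expires just before the second page, the fetcher is called with cursors None, "a", "a", "b": only the failed page is requested twice.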
Step 3: Processing Results and Error Handling
When processing the results, you must handle cases where the API returns partial data due to a timeout or network error. However, for token expiry, the retry logic in Step 1 ensures that the entire request is repeated.
It is important to note that tenacity retries the function call. If the API is idempotent (like GET or POST with unique IDs), this is safe. For non-idempotent operations (like creating a user), you must ensure the retry logic does not create duplicates. In this tutorial, we focus on read-only analytics, which is idempotent.
def process_batch_job(auth_manager: GenesysAuthManager):
    """
    Main entry point for the batch job.
    """
    # Example query parameters
    query_params = {
        "dateFrom": "2023-10-01T00:00:00.000Z",
        "dateTo": "2023-10-02T00:00:00.000Z",
        "entityIds": ["your_queue_id_here"],  # Replace with actual ID
    }
    try:
        conversations = fetch_conversation_details(auth_manager, query_params)
        print(f"Successfully processed {len(conversations)} conversations.")
        # Example: Write to local file or database
        # with open("conversations.json", "w") as f:
        #     json.dump(conversations, f)
    except Exception as e:
        logger.critical(f"Batch job failed after retries: {str(e)}")
        # Handle final failure (e.g., send alert)
Complete Working Example
Below is the full, copy-pasteable script. Replace the placeholder credentials with your Genesys Cloud OAuth client ID and secret.
import os
import time
import threading
import logging
import json
from typing import Any, Callable, Dict

# Install dependencies: pip install genesys-cloud-purecloud-platform-client httpx tenacity
from genesyscloud.platform_client_v2.platform_client import PlatformClient
from genesyscloud.platform_client_v2.api.analytics_api import AnalyticsApi
from genesyscloud.analytics.models import ConversationDetailsQuery
import httpx
import tenacity

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class GenesysAuthManager:
    """
    Manages OAuth token lifecycle for Genesys Cloud API calls.
    """

    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        self.token_cache: Dict[str, Any] = {}
        self.token_expires_at: float = 0.0
        self._lock = threading.Lock()
        self.platform_client = PlatformClient()

    def get_access_token(self) -> str:
        """
        Returns a valid access token.
        """
        # Refresh if expired or within 60 seconds of expiry
        if time.time() >= self.token_expires_at - 60:
            self._refresh_token()
        return self.token_cache.get("access_token", "")

    def _refresh_token(self):
        """
        Performs the OAuth2 Client Credentials Grant flow.
        """
        # Remember which token we saw before waiting for the lock.
        stale_token = self.token_cache.get("access_token")
        with self._lock:
            # If the token changed while we waited, another thread refreshed it.
            if self.token_cache.get("access_token") != stale_token:
                return
            # Genesys Cloud serves OAuth from the login.* host, not api.*
            token_url = f"https://login.{self.environment}/oauth/token"
            try:
                with httpx.Client(timeout=10.0) as client:
                    response = client.post(
                        token_url,
                        data={"grant_type": "client_credentials"},
                        auth=(self.client_id, self.client_secret),  # HTTP Basic
                        headers={"Accept": "application/json"},
                    )
                    response.raise_for_status()
                    token_data = response.json()
            except httpx.HTTPStatusError as e:
                raise RuntimeError(f"Failed to refresh token: {e.response.text}") from e
            except httpx.HTTPError as e:
                raise RuntimeError(f"Unexpected error during token refresh: {e}") from e
            expires_in = token_data.get("expires_in", 3600)
            self.token_cache = token_data
            self.token_expires_at = time.time() + expires_in
            logger.info("Token refreshed successfully.")

    def inject_token_into_sdk(self):
        """
        Injects the current valid token into the Genesys SDK client.
        """
        token = self.get_access_token()
        self.platform_client.auth_manager.set_token(token)


def is_auth_error(exception: Exception) -> bool:
    """
    Predicate to determine if an exception is caused by an expired token.
    """
    # Read the HTTP status from whichever attribute the exception exposes.
    status_code = getattr(exception, "status_code", None) or getattr(exception, "status", None)
    if status_code == 401:
        return True
    if status_code == 403:
        body = getattr(exception, "body", None)
        return body is not None and "token" in str(body).lower()
    return False


@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(multiplier=1, min=2, max=10),
    retry=tenacity.retry_if_exception(is_auth_error),
    reraise=True,
)
def make_api_call_with_retry(
    auth_manager: GenesysAuthManager,
    api_func: Callable[..., Any],
    *args: Any,
    **kwargs: Any,
) -> Any:
    """
    Wrapper that ensures a valid token is present before calling the API,
    and retries if a 401 is received.
    """
    auth_manager.inject_token_into_sdk()
    try:
        return api_func(*args, **kwargs)
    except Exception as e:
        if is_auth_error(e):
            logger.warning("Detected authentication error. Invalidating cached token...")
            # Force a real refresh on the next attempt, even if the local
            # expiry time still looks valid.
            auth_manager.token_expires_at = 0.0
        raise  # tenacity retries only when is_auth_error(e) is true


def fetch_conversation_details(auth_manager: GenesysAuthManager, query_params: Dict[str, Any]) -> list:
    """
    Fetches all conversation details matching the query, handling pagination and token refresh.
    """
    analytics_api = AnalyticsApi(auth_manager.platform_client)
    all_conversations = []
    next_page_token = None
    max_pages = 50  # Safety limit to prevent infinite loops

    for page_num in range(max_pages):
        body = ConversationDetailsQuery(
            entity_ids=query_params.get("entityIds"),
            date_from=query_params.get("dateFrom"),
            date_to=query_params.get("dateTo"),
            size=250,
            page_token=next_page_token,
        )
        try:
            response = make_api_call_with_retry(
                auth_manager,
                analytics_api.post_analytics_conversations_details_query,
                body=body,
            )
            if response.entities:
                all_conversations.extend(response.entities)
                logger.info(f"Fetched page {page_num + 1}, total records so far: {len(all_conversations)}")
            if response.next_page_token:
                next_page_token = response.next_page_token
            else:
                break
        except Exception as e:
            logger.error(f"Failed to fetch page {page_num + 1}: {str(e)}")
            raise

    return all_conversations


def main():
    # Configuration
    CLIENT_ID = os.getenv("GENESYS_CLIENT_ID", "your_client_id")
    CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET", "your_client_secret")
    ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")

    if CLIENT_ID == "your_client_id" or CLIENT_SECRET == "your_client_secret":
        print("Error: Please set GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables.")
        return

    # Initialize Auth Manager
    auth_manager = GenesysAuthManager(CLIENT_ID, CLIENT_SECRET, ENVIRONMENT)

    # Example Query
    query_params = {
        "dateFrom": "2023-10-01T00:00:00.000Z",
        "dateTo": "2023-10-02T00:00:00.000Z",
        "entityIds": ["your_queue_id_here"],
    }

    try:
        conversations = fetch_conversation_details(auth_manager, query_params)
        print(f"Successfully processed {len(conversations)} conversations.")
        # Optional: Save to file
        # with open("output.json", "w") as f:
        #     json.dump(conversations, f, default=str, indent=2)
    except Exception as e:
        logger.critical(f"Batch job failed: {str(e)}")


if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 401 Unauthorized
- What causes it: The access token has expired, been revoked, or was never valid.
- How to fix it: Ensure the GenesysAuthManager is initialized with correct client_id and client_secret. Verify that the OAuth client has the necessary scopes. The retry logic in this tutorial handles expiry automatically. If it persists, check that the client credentials have not been rotated in the Genesys Cloud admin console.
Error: 403 Forbidden
- What causes it: The token is valid but lacks the required scope (e.g., analytics:query), or the client ID does not have permission to access the specific resource (e.g., a queue in a different org).
- How to fix it: Verify the OAuth client’s scopes in the Genesys Cloud Admin console under Security > OAuth. Ensure the scope matches the API endpoint requirements.
Error: 429 Too Many Requests
- What causes it: You have exceeded the Genesys Cloud API rate limits. This is common in batch jobs.
- How to fix it: The tenacity configuration in this tutorial uses wait_exponential, but its predicate retries only authentication errors, so a 429 is raised immediately. To back off on rate limits, extend the predicate to treat 429 as retryable and, where the response includes a Retry-After header, wait at least that long. For sustained batch processing, implement a token bucket or leaky bucket algorithm to throttle requests per second. Consider using the Genesys Cloud Async API endpoints if available for your use case, as they are designed for high-volume data extraction.
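A token bucket of the kind suggested here can be sketched in a few lines. This is an illustrative, single-threaded version under stated assumptions: TokenBucket and its parameters are hypothetical names, the rate you choose must come from the limits that apply to your org, and a multi-threaded job would need a lock around acquire. The clock and sleep functions are injectable so the behavior can be checked without real waiting.

```python
import time
from typing import Callable


class TokenBucket:
    """Allows bursts up to `capacity`, then a sustained `rate` of calls per second."""

    def __init__(
        self,
        rate: float,
        capacity: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._sleep = sleep
        self._tokens = capacity  # start full so the first burst is not delayed
        self._last = clock()

    def acquire(self) -> None:
        """Block until one token is available, then consume it."""
        while True:
            now = self._clock()
            # Refill in proportion to elapsed time, never above capacity.
            self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
            self._last = now
            if self._tokens >= 1:
                self._tokens -= 1
                return
            # Sleep just long enough for the missing fraction of a token.
            self._sleep((1 - self._tokens) / self.rate)
```

Calling bucket.acquire() immediately before each API request keeps the job under the chosen rate; with rate=2 and capacity=2, the first two calls pass at once and the third waits half a second.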
Error: 500 Internal Server Error
- What causes it: A temporary issue on the Genesys Cloud server side.
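- How to fix it: Retry the request with exponential backoff. The tenacity configuration in this tutorial retries only authentication errors. To cover server errors as well, extend the retry argument, for example by combining retry_if_exception(is_auth_error) with a second predicate that matches 5xx responses; passing tenacity.retry_if_exception_type(Exception) would retry every failure, including 400-level client errors that can never succeed. However, be cautious with non-idempotent operations.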
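One way to extend the predicate is sketched below. It is a hedged illustration, not the SDK's API: is_transient_error and retry_delay are hypothetical helpers, and the sketch assumes the exception exposes the HTTP status as status or status_code and, optionally, response headers as a dict-like headers attribute. The set of retryable statuses is a common convention rather than something this tutorial prescribes.

```python
from typing import Optional

# Statuses that usually indicate a temporary condition worth retrying.
RETRYABLE_STATUSES = {429, 500, 502, 503, 504}


def http_status(exc: Exception) -> Optional[int]:
    """Read the HTTP status from whichever attribute the exception exposes."""
    return getattr(exc, "status", None) or getattr(exc, "status_code", None)


def is_transient_error(exc: Exception) -> bool:
    """True for rate limiting and server-side failures, False for client errors."""
    return http_status(exc) in RETRYABLE_STATUSES


def retry_delay(exc: Exception, attempt: int, base: float = 2.0, cap: float = 10.0) -> float:
    """Seconds to wait before retry number `attempt` (1-based).

    Prefers a numeric Retry-After header when present (assumed attribute:
    `exc.headers`); otherwise falls back to capped exponential backoff.
    """
    headers = getattr(exc, "headers", None) or {}
    retry_after = headers.get("Retry-After")
    if retry_after is not None:
        try:
            return max(0.0, float(retry_after))
        except ValueError:
            pass  # non-numeric (HTTP-date) form: fall back to backoff
    return min(cap, base * 2 ** (attempt - 1))
```

With tenacity, a predicate like this can be combined with the existing one as retry=tenacity.retry_if_exception(is_auth_error) | tenacity.retry_if_exception(is_transient_error).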