Token Refresh Logic — Handling Access Token Expiration During Batch Operations
What You Will Build
- A robust HTTP client wrapper in Python that automatically detects expired access tokens (HTTP 401) and refreshes them before retrying the original request.
- A batch processing script that queries Genesys Cloud CX Analytics API for conversation details, ensuring the job completes even if the 1-hour token lifespan is exceeded.
- This tutorial uses Python with the requests library. The official Genesys Cloud Python SDK is discussed only for comparison.
Prerequisites
- OAuth Client Type: Confidential client using the Client Credentials Grant, the standard choice for server-to-server batch jobs.
- Required Permission: analytics:conversationDetail:view for the example endpoint. A Client Credentials client is authorized through the roles assigned to it, so grant the permission on one of those roles.
- SDK (optional): the official Genesys Cloud Python SDK, published on PyPI as PureCloudPlatformClientV2. The code in this tutorial does not import it.
- Runtime: Python 3.8+.
- Dependencies: requests, python-dotenv (for loading credentials from a local .env file).
Install dependencies:

```bash
pip install requests python-dotenv
```
Authentication Setup
Genesys Cloud access tokens are short-lived. This tutorial assumes a one-hour lifetime, and the code reads the actual value from the expires_in field of the token response. In a batch job that processes thousands of records or iterates through large datasets, a single token can expire before the job finishes. The standard pattern is to intercept the 401 Unauthorized response, fetch a new token, update the client state, and retry the failed request.
We will use the Client Credentials flow, which needs no user interaction and is the usual choice for automated jobs.
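The intercept, refresh, and retry sequence described above can be sketched without any HTTP code. In this minimal sketch, `send`, `get_token`, and `invalidate` are hypothetical callables standing in for the real request, the token cache lookup, and the cache reset. None of them come from a library, and the "server" is a stub that accepts only one token.

```python
from typing import Callable


def call_with_refresh(
    send: Callable[[str], int],
    get_token: Callable[[], str],
    invalidate: Callable[[], None],
) -> int:
    """Call send(token); on a 401, drop the cached token and retry exactly once."""
    status = send(get_token())
    if status == 401:
        invalidate()                # forget the rejected token
        status = send(get_token())  # get_token() now has to obtain a fresh one
    return status


# Stand-in token source and server for the demonstration (no network involved).
tokens = iter(["stale-token", "fresh-token"])
cache = {"token": None}
attempts = []


def get_token() -> str:
    if cache["token"] is None:
        cache["token"] = next(tokens)
    return cache["token"]


def invalidate() -> None:
    cache["token"] = None


def send(token: str) -> int:
    attempts.append(token)
    return 200 if token == "fresh-token" else 401


final_status = call_with_refresh(send, get_token, invalidate)
```

The retry is deliberately limited to one attempt: a second 401 with a freshly issued token points at the credentials rather than at expiry, so looping would only hide the problem.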
Step 1: Implement the Token Provider
First, we create a class responsible for managing the lifecycle of the OAuth token. This class handles the initial fetch and the refresh logic.

```python
import logging
import time
from typing import Any, Dict, Optional

import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class GenesysTokenManager:
    def __init__(self, client_id: str, client_secret: str, region: str = "us-east-1"):
        self.client_id = client_id
        self.client_secret = client_secret
        # Genesys Cloud hosts are named by domain rather than by AWS region:
        # us-east-1 is served from mypurecloud.com. Any other value is treated
        # as the domain itself (for example "mypurecloud.ie" or "usw2.pure.cloud").
        domain = "mypurecloud.com" if region == "us-east-1" else region
        # Tokens are issued by the login host, not the API host.
        self.oauth_endpoint = f"https://login.{domain}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry_time: float = 0
        self._lock = None  # In a multi-threaded scenario, use threading.Lock()

    def _fetch_token(self) -> Dict[str, Any]:
        """
        Requests a new OAuth token from Genesys Cloud.
        Raises requests.exceptions.HTTPError on failure.
        """
        response = requests.post(
            self.oauth_endpoint,
            data={"grant_type": "client_credentials"},
            # The client ID and secret are sent as HTTP Basic credentials.
            auth=(self.client_id, self.client_secret),
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=30,
        )
        if response.status_code != 200:
            raise requests.exceptions.HTTPError(
                f"Token fetch failed with status {response.status_code}: {response.text}",
                response=response,
            )
        return response.json()

    def get_access_token(self) -> str:
        """
        Returns a valid access token.
        If the current token is expired or close to expiring, it fetches a new one.
        """
        # Check if token is expired or will expire in the next 5 minutes (300 seconds)
        if not self.access_token or time.time() >= (self.token_expiry_time - 300):
            logger.info("Access token expired or near expiry. Fetching new token...")
            try:
                token_data = self._fetch_token()
                self.access_token = token_data["access_token"]
                # expires_in is in seconds
                self.token_expiry_time = time.time() + token_data["expires_in"]
                logger.info("New access token acquired.")
            except requests.exceptions.HTTPError as e:
                logger.error(f"Failed to refresh token: {e}")
                raise
            except Exception as e:
                logger.error(f"Unexpected error during token refresh: {e}")
                raise
        return self.access_token
```
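The token manager leaves its lock as a placeholder for multi-threaded use. One way to fill that in is sketched below, under stated assumptions: `ThreadSafeTokenCache` is a hypothetical class name, and `fetch` is a hypothetical callable that returns `(access_token, expires_in_seconds)` and would wrap the POST to the OAuth endpoint in a real job. The demonstration uses a fake fetcher, so nothing touches the network.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class ThreadSafeTokenCache:
    """Caches a token and refreshes it under a lock."""

    def __init__(self, fetch: Callable[[], Tuple[str, float]], skew: float = 300.0,
                 clock: Callable[[], float] = time.time):
        self._fetch = fetch
        self._skew = skew      # refresh this many seconds before expiry
        self._clock = clock
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expiry = 0.0

    def get(self) -> str:
        # Callers serialize on the lock, so only the first thread that finds
        # the token missing or near expiry performs the fetch.
        with self._lock:
            if self._token is None or self._clock() >= self._expiry - self._skew:
                self._token, expires_in = self._fetch()
                self._expiry = self._clock() + expires_in
            return self._token

    def invalidate(self, failed_token: str) -> None:
        # Clear the cache only if it still holds the token that was rejected,
        # so a late 401 cannot discard a token another thread just fetched.
        with self._lock:
            if self._token == failed_token:
                self._token = None


fetch_calls = []


def fake_fetch() -> Tuple[str, float]:
    fetch_calls.append(1)
    return f"token-{len(fetch_calls)}", 3600.0


cache = ThreadSafeTokenCache(fake_fetch)
seen = []
threads = [threading.Thread(target=lambda: seen.append(cache.get())) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Passing the rejected token to `invalidate` is a design choice rather than a requirement: it keeps several threads that all received a 401 for the same expired token from triggering one refresh each.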
Step 2: Create the Retry-Enabled HTTP Client
Instead of relying solely on the SDK for low-level retries, we build a wrapper around requests.Session. This allows us to inject the token header dynamically and handle the 401 retry logic explicitly. This pattern is crucial because the Genesys Python SDK does not automatically retry 401 errors with token refresh; it returns the error to the caller.

```python
from typing import Dict

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# GenesysTokenManager and logger are defined in Step 1.


class GenesysHttpClient:
    def __init__(self, token_manager: GenesysTokenManager):
        self.token_manager = token_manager
        self.session = requests.Session()
        # Configure standard retries for rate limiting and server errors (429, 5xx)
        # Note: We do NOT retry 401 here, as it requires a token refresh first.
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)

    def _build_headers(self) -> Dict[str, str]:
        """
        Returns headers with the current valid access token.
        """
        token = self.token_manager.get_access_token()
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        """
        Executes an HTTP request with automatic token refresh on 401.
        """
        # Keep caller-supplied headers so they are applied to the retry as well
        extra_headers = kwargs.pop("headers", None) or {}
        try:
            headers = {**self._build_headers(), **extra_headers}
            response = self.session.request(method, url, headers=headers, **kwargs)
            # If we get a 401, the token is likely invalid/expired.
            # We force a refresh and retry ONCE.
            if response.status_code == 401:
                logger.warning("Received 401 Unauthorized. Refreshing token and retrying...")
                # Force an immediate refresh by discarding the cached token
                self.token_manager.access_token = None
                # Rebuild the headers with the new token
                headers = {**self._build_headers(), **extra_headers}
                response = self.session.request(method, url, headers=headers, **kwargs)
                # If we STILL get a 401 after refresh, raise an error
                if response.status_code == 401:
                    raise requests.exceptions.HTTPError(
                        "Persistent 401 Unauthorized after token refresh. Check credentials and permissions.",
                        response=response,
                    )
            response.raise_for_status()
            return response
        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP Error: {e}")
            raise
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Connection Error: {e}")
            raise
```
Step 3: Implement the Batch Processing Logic
Now we apply this client to a real Genesys Cloud API endpoint. We will query /api/v2/analytics/conversations/details/query, which is commonly used for batch reporting. The endpoint is paged through the paging object in the request body (pageSize and pageNumber): every page is a POST carrying the same query with pageNumber incremented, and the matching records come back in the conversations array.
Required Permission: analytics:conversationDetail:view

```python
from typing import Any, Dict, List

import requests

# GenesysHttpClient and logger are defined in the previous steps.


class ConversationBatchProcessor:
    def __init__(self, client: GenesysHttpClient, region: str = "us-east-1"):
        self.client = client
        # us-east-1 is served from mypurecloud.com. Any other value is treated
        # as the Genesys Cloud domain itself (for example "mypurecloud.ie").
        domain = "mypurecloud.com" if region == "us-east-1" else region
        self.base_url = f"https://api.{domain}"

    def query_conversations(self, body: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Queries conversation details one page at a time.
        Every page goes through GenesysHttpClient, so a token that expires
        between pages is refreshed automatically.
        """
        url = f"{self.base_url}/api/v2/analytics/conversations/details/query"
        all_results: List[Dict[str, Any]] = []
        page_size = body.get("paging", {}).get("pageSize", 100)
        page_number = body.get("paging", {}).get("pageNumber", 1)
        while True:
            # Build a fresh body per page so the caller's dict is not mutated
            page_body = {**body, "paging": {"pageSize": page_size, "pageNumber": page_number}}
            logger.info(f"Fetching page {page_number} from: {url}")
            try:
                response = self.client.request(
                    "POST",
                    url,
                    json=page_body,
                    timeout=30  # Set a reasonable timeout for large queries
                )
            except requests.exceptions.RequestException as e:
                # 429 and 5xx responses were already retried inside GenesysHttpClient.
                # Anything that reaches this point (400, 403, exhausted retries) stops
                # the loop, and the records collected so far are returned.
                logger.error(f"Failed to fetch page {page_number}: {e}")
                break
            conversations = response.json().get("conversations") or []
            all_results.extend(conversations)
            logger.info(f"Fetched {len(conversations)} records. Total: {len(all_results)}")
            # A page shorter than pageSize is the last one
            if len(conversations) < page_size:
                break
            page_number += 1
        return all_results
```
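The paging loop can also be written as a generator, which lets the caller process records as they arrive instead of holding the whole result in memory. This is a minimal sketch, assuming the endpoint accepts a `paging` object with `pageSize` and `pageNumber` in the POST body and returns records under `conversations` (check both against the API reference). `iter_conversations` and `post_page` are hypothetical names, and the HTTP call is replaced by a stub.

```python
from typing import Any, Callable, Dict, Iterator, List


def iter_conversations(
    post_page: Callable[[Dict[str, Any]], Dict[str, Any]],
    query: Dict[str, Any],
    page_size: int = 100,
) -> Iterator[Dict[str, Any]]:
    """Yield records page by page until a short (or empty) page is returned."""
    page_number = 1
    while True:
        # A new body is built for each page; the caller's query is never mutated.
        body = {**query, "paging": {"pageSize": page_size, "pageNumber": page_number}}
        records: List[Dict[str, Any]] = post_page(body).get("conversations") or []
        yield from records
        if len(records) < page_size:
            return
        page_number += 1


# Stand-in for the HTTP call: serves five fake records, page_size at a time.
FAKE_RECORDS = [{"conversationId": f"c{i}"} for i in range(5)]
requested_pages = []


def fake_post_page(body: Dict[str, Any]) -> Dict[str, Any]:
    size = body["paging"]["pageSize"]
    number = body["paging"]["pageNumber"]
    requested_pages.append(number)
    start = (number - 1) * size
    return {"conversations": FAKE_RECORDS[start:start + size]}


query = {"interval": "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z"}
ids = [c["conversationId"] for c in iter_conversations(fake_post_page, query, page_size=2)]
```

In the tutorial's setting, `post_page` would be a small function that calls `GenesysHttpClient.request("POST", url, json=body)` and returns `response.json()`, so the token refresh still happens inside the client.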
Complete Working Example
Below is the full, copy-pasteable script. It combines the token manager, the HTTP client, and the batch processor. It queries for conversations from the last 24 hours.
File: batch_token_refresh.py

```python
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import requests
from dotenv import load_dotenv
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------------
# 1. Token Manager
# -----------------------------------------------------------------------------
class GenesysTokenManager:
    def __init__(self, client_id: str, client_secret: str, region: str = "us-east-1"):
        self.client_id = client_id
        self.client_secret = client_secret
        # us-east-1 is served from mypurecloud.com. Any other value is treated
        # as the Genesys Cloud domain itself (for example "mypurecloud.ie").
        domain = "mypurecloud.com" if region == "us-east-1" else region
        # Tokens are issued by the login host, not the API host
        self.oauth_endpoint = f"https://login.{domain}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry_time: float = 0

    def _fetch_token(self) -> Dict[str, Any]:
        response = requests.post(
            self.oauth_endpoint,
            data={"grant_type": "client_credentials"},
            # The client ID and secret are sent as HTTP Basic credentials
            auth=(self.client_id, self.client_secret),
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=30,
        )
        if response.status_code != 200:
            raise requests.exceptions.HTTPError(
                f"Token fetch failed with status {response.status_code}: {response.text}",
                response=response,
            )
        return response.json()

    def get_access_token(self) -> str:
        # Refresh if expired or within 5 minutes of expiry
        if not self.access_token or time.time() >= (self.token_expiry_time - 300):
            logger.info("Token expired or near expiry. Refreshing...")
            token_data = self._fetch_token()
            self.access_token = token_data["access_token"]
            self.token_expiry_time = time.time() + token_data["expires_in"]
            logger.info("New token acquired.")
        return self.access_token


# -----------------------------------------------------------------------------
# 2. HTTP Client with Retry & Token Refresh
# -----------------------------------------------------------------------------
class GenesysHttpClient:
    def __init__(self, token_manager: GenesysTokenManager):
        self.token_manager = token_manager
        self.session = requests.Session()
        # Retry on 429 (Rate Limit) and 5xx (Server Errors)
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)

    def _build_headers(self) -> Dict[str, str]:
        token = self.token_manager.get_access_token()
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        # Keep caller-supplied headers so they are applied to the retry as well
        extra_headers = kwargs.pop("headers", None) or {}
        try:
            headers = {**self._build_headers(), **extra_headers}
            response = self.session.request(method, url, headers=headers, **kwargs)
            # Handle 401 Unauthorized (Token Expired)
            if response.status_code == 401:
                logger.warning("401 Unauthorized. Refreshing token and retrying once...")
                # Force refresh
                self.token_manager.access_token = None
                headers = {**self._build_headers(), **extra_headers}
                # Retry the request
                response = self.session.request(method, url, headers=headers, **kwargs)
                if response.status_code == 401:
                    raise requests.exceptions.HTTPError(
                        "Persistent 401 after refresh. Check credentials and permissions.",
                        response=response,
                    )
            response.raise_for_status()
            return response
        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP Error: {e}")
            raise
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Connection Error: {e}")
            raise


# -----------------------------------------------------------------------------
# 3. Batch Processor
# -----------------------------------------------------------------------------
class ConversationBatchProcessor:
    def __init__(self, client: GenesysHttpClient, region: str = "us-east-1"):
        self.client = client
        domain = "mypurecloud.com" if region == "us-east-1" else region
        self.base_url = f"https://api.{domain}"

    def query_conversations(self, query_body: Dict[str, Any]) -> List[Dict[str, Any]]:
        url = f"{self.base_url}/api/v2/analytics/conversations/details/query"
        all_results: List[Dict[str, Any]] = []
        page_size = query_body.get("paging", {}).get("pageSize", 100)
        page_number = query_body.get("paging", {}).get("pageNumber", 1)
        while True:
            # Every page is a POST with the same query and the next pageNumber
            body = {**query_body, "paging": {"pageSize": page_size, "pageNumber": page_number}}
            logger.info(f"Requesting page {page_number}: {url}")
            try:
                response = self.client.request("POST", url, json=body, timeout=30)
            except requests.exceptions.RequestException as e:
                # Stop on the first unrecoverable error and return what was collected
                logger.error(f"Query failed on page {page_number}: {e}")
                break
            conversations = response.json().get("conversations") or []
            all_results.extend(conversations)
            logger.info(f"Page {page_number} complete. Total records: {len(all_results)}")
            # A page shorter than pageSize is the last one
            if len(conversations) < page_size:
                break
            page_number += 1
        return all_results


# -----------------------------------------------------------------------------
# 4. Main Execution
# -----------------------------------------------------------------------------
def main():
    # Load variables from a local .env file, if present, then read the credentials
    load_dotenv()
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    region = os.getenv("GENESYS_REGION", "us-east-1")
    if not client_id or not client_secret:
        logger.error("Missing GENESYS_CLIENT_ID or GENESYS_CLIENT_SECRET environment variables.")
        sys.exit(1)

    # Initialize components
    token_manager = GenesysTokenManager(client_id, client_secret, region)
    http_client = GenesysHttpClient(token_manager)
    processor = ConversationBatchProcessor(http_client, region)

    # Define Query Body: last 24 hours, 100 records per page
    end = datetime.now(timezone.utc)
    start = end - timedelta(hours=24)
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    query_body = {
        "interval": f"{start.strftime(fmt)}/{end.strftime(fmt)}",
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {"pageSize": 100, "pageNumber": 1}
    }

    try:
        logger.info("Starting batch conversation query...")
        results = processor.query_conversations(query_body)
        if results:
            logger.info(f"Successfully retrieved {len(results)} conversations.")
            # Log the first result for verification
            logger.info(f"Sample record: {json.dumps(results[0], indent=2)}")
        else:
            logger.info("No conversations found in the specified period.")
    except Exception as e:
        logger.error(f"Job failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
```
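The "last 24 hours" window is easier to test when the time range is built by a small function that accepts the current time as a parameter. A minimal sketch, assuming the query takes its time range as an ISO-8601 `start/end` interval in UTC; `last_hours_interval` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def last_hours_interval(hours: int, now: Optional[datetime] = None) -> str:
    """Return 'start/end' in UTC with the milliseconds fixed at .000."""
    end = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    start = end - timedelta(hours=hours)
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    return f"{start.strftime(fmt)}/{end.strftime(fmt)}"


# A fixed "now" makes the result reproducible.
fixed_now = datetime(2024, 3, 10, 12, 30, 0, tzinfo=timezone.utc)
interval = last_hours_interval(24, now=fixed_now)
```

Computing the interval once, before the first page is requested, keeps every page of the same job aligned to the same window even if the job runs for a long time.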
Common Errors & Debugging
Error: 401 Unauthorized After Refresh
- Cause: The client credentials are invalid, the OAuth client is not authorized for the request, or the token endpoint is unreachable. A role that lacks the analytics:conversationDetail:view permission normally produces a 403 Forbidden rather than a 401.
- Fix: Verify the Client ID and Secret in the Genesys Cloud Admin Console. Ensure the OAuth Client has a role that grants the required permission. Check network connectivity to the login and API hosts for your region.
- Code Fix: The script above raises an HTTPError with a specific message if the second attempt fails. Add detailed logging to _fetch_token to inspect the raw response body from the OAuth endpoint.
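When inspecting the raw body of a failed token request, it helps to reduce it to one log line. The sketch below assumes the error body, when it is JSON, uses the `error` and `error_description` fields defined by RFC 6749. The server may use different fields, in which case the raw text is reported instead. `describe_token_failure` is a hypothetical helper name.

```python
import json
from typing import Optional


def describe_token_failure(status_code: int, body_text: Optional[str]) -> str:
    """Build a one-line, log-friendly summary of a failed token request."""
    detail = (body_text or "").strip()
    try:
        parsed = json.loads(detail)
    except ValueError:
        parsed = None  # not JSON (for example an HTML error page or an empty body)
    if isinstance(parsed, dict):
        # Field names follow RFC 6749; this is an assumption about the server.
        code = parsed.get("error")
        description = parsed.get("error_description")
        if code:
            detail = f"{code}: {description}" if description else str(code)
    return f"token request failed with HTTP {status_code} ({detail or 'empty body'})"


summary = describe_token_failure(
    400, '{"error": "invalid_client", "error_description": "bad secret"}'
)
```

The summary never includes the client secret, because it is built only from the response, not from the request.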
Error: 429 Too Many Requests
- Cause: The batch job is sending requests faster than Genesys Cloud allows. Analytics queries are heavy and have strict rate limits.
- Fix: The GenesysHttpClient includes a Retry strategy for 429s with exponential backoff. If failures persist, increase the backoff_factor or add a manual time.sleep() between pages.
- Code Fix: Adjust the Retry configuration in GenesysHttpClient.__init__:

```python
retry_strategy = Retry(
    total=5,
    backoff_factor=2,  # A larger factor lengthens the exponential delay between retries
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["GET", "POST"]
)
```
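For the manual pause between pages, the delay can be taken from the server instead of being hard-coded. A minimal sketch, assuming the 429 response carries a Retry-After header (confirm this against the responses you actually receive); `retry_after_seconds` is a hypothetical helper name. It handles both forms the HTTP specification allows for that header: a number of seconds and an HTTP date.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(header_value: Optional[str], default: float = 5.0,
                        now: Optional[datetime] = None) -> float:
    """Translate a Retry-After header value into a non-negative delay in seconds."""
    if not header_value:
        return default
    value = header_value.strip()
    if value.isdigit():
        return float(value)  # delta-seconds form, e.g. "30"
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return default  # unparseable value: fall back to the default delay
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    current = now or datetime.now(timezone.utc)
    return max(0.0, (when - current).total_seconds())


delay = retry_after_seconds("30")
```

A caller would pass `response.headers.get("Retry-After")` and hand the result to `time.sleep()` before requesting the next page.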
Error: 400 Bad Request on Pagination
- Cause: The request for a later page does not match what the endpoint expects, for example a different HTTP method than the first page or a body that no longer validates.
- Fix: For /api/v2/analytics/conversations/details/query, request every page with a POST that carries the same query and an incremented paging.pageNumber, and keep paging.pageSize within the limit given in the API reference.
- Code Fix: Log the request body and the response text for the failing page. The response body of a 400 names the field that failed validation.