Robust Token Refresh: Handling Expiry During Batch API Operations
What You Will Build
- A production-grade Python utility that executes a batch query against Genesys Cloud CX Analytics APIs without failing when an access token expires.
- Implementation of automatic token refresh logic using the
genesys-cloud-pythonSDK and raw HTTP fallbacks. - A complete Python script that handles pagination, retries, and seamless token rotation in a single execution flow.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Grant). This is the standard for server-to-server integrations.
- Required Scopes:
analytics:conversation:read(for the example data fetch). Adjust based on your specific API endpoint. - SDK Version:
genesys-cloud-pythonv8.0.0 or higher. - Language/Runtime: Python 3.8+
- External Dependencies:
requests(for underlying HTTP handling if bypassing SDK, though the SDK handles this internally, we will demonstrate the SDK approach and a raw HTTP fallback for clarity).
Authentication Setup
The Genesys Cloud Python SDK (genesys-cloud-python) handles OAuth token management internally when configured correctly. However, understanding the underlying mechanism is critical when debugging batch failures. The SDK uses a PlatformClient instance that maintains a token cache. When a 401 Unauthorized response is received, the SDK attempts to refresh the token automatically if the client credentials are provided.
For batch operations, the risk is not just the initial token expiry, but the token expiring between API calls within a long-running loop. The SDK’s default behavior is sufficient for most cases, but explicit error handling ensures your batch job does not crash silently.
SDK Configuration
import os
from purecloud_platform_client_v2 import Configuration, ApiClient, PlatformClient
def get_platform_client():
"""
Initializes the Genesys Cloud Platform Client with OAuth configuration.
The SDK will automatically refresh tokens upon 401 responses.
"""
# Load credentials from environment variables
client_id = os.getenv("GENESYS_CLIENT_ID")
client_secret = os.getenv("GENESYS_CLIENT_SECRET")
environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
if not client_id or not client_secret:
raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")
# Configure the SDK
configuration = Configuration(
environment=environment,
client_id=client_id,
client_secret=client_secret
)
# Create the API Client
api_client = ApiClient(configuration=configuration)
# Create the Platform Client
platform_client = PlatformClient(api_client=api_client)
return platform_client
Key Insight: The PlatformClient does not require you to manually call a refresh method. It intercepts HTTP responses. If it receives a 401 Unauthorized, it triggers the OAuth refresh flow in the background before retrying the original request. This is transparent to your business logic.
Implementation
Step 1: Defining the Batch Query Structure
We will use the Analytics Conversation Details Query API (/api/v2/analytics/conversations/details/query). This endpoint is ideal for demonstrating batch logic because it returns paginated results and can take significant time to process, increasing the likelihood of token expiry during large datasets.
from purecloud_platform_client_v2.models import ConversationDetailsQueryRequest
from purecloud_platform_client_v2.api import analytics_api
from datetime import datetime, timedelta
def build_query_request(platform_client):
"""
Constructs a ConversationDetailsQueryRequest for the last 24 hours.
"""
analytics_instance = analytics_api.AnalyticsApi(platform_client.api_client)
# Define time range
now = datetime.utcnow()
start_time = now - timedelta(hours=24)
# Build the query body
query_body = ConversationDetailsQueryRequest(
start=start_time.isoformat() + "Z",
end=now.isoformat() + "Z",
size=50, # Page size
view="default",
filter="type:voice" # Filter for voice conversations only
)
return analytics_instance, query_body
Step 2: Core Logic with Robust Pagination and Retry
The critical part of this tutorial is the loop. We must ensure that if the token expires during the next_page call, the SDK handles it, or we handle the exception gracefully.
While the SDK handles 401s automatically, it is best practice to wrap API calls in a try-except block to catch specific SDK exceptions and log them. This prevents the batch job from terminating unexpectedly.
import logging
from purecloud_platform_client_v2.exceptions import ApiException
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def fetch_batch_data(platform_client):
"""
Fetches all conversation details based on the query.
Handles pagination and potential token refreshes automatically via SDK.
"""
analytics_instance, query_body = build_query_request(platform_client)
all_conversations = []
page_number = 1
has_more_pages = True
logger.info("Starting batch fetch...")
try:
while has_more_pages:
# The SDK automatically handles 401 refresh here.
# If the token expires mid-loop, the next call triggers a refresh.
response = analytics_instance.post_analytics_conversations_details_query(
body=query_body,
page=page_number
)
if response.conversations is None or len(response.conversations) == 0:
logger.info("No more conversations found.")
break
# Process current page
for conversation in response.conversations:
# Example: Extracting ID and start time
all_conversations.append({
"id": conversation.id,
"start_time": conversation.start_time,
"duration": conversation.duration
})
logger.info(f"Fetched page {page_number}, total items so far: {len(all_conversations)}")
# Check if there are more pages
if response.next_page is not None:
page_number += 1
# Optional: Add a small sleep to respect rate limits if necessary
# import time; time.sleep(0.5)
else:
has_more_pages = False
except ApiException as e:
# Handle specific API errors
if e.status == 401:
logger.error("Authentication failed. Token refresh may have failed or credentials are invalid.")
raise
elif e.status == 429:
logger.warning("Rate limit exceeded. Consider implementing exponential backoff.")
raise
else:
logger.error(f"API Error {e.status}: {e.body}")
raise
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
raise
return all_conversations
Why this works: The post_analytics_conversations_details_query method is synchronous. When the SDK receives a 401, it pauses the current call, fetches a new token using the stored client_secret, and retries the request. This happens before the ApiException is raised. Therefore, your loop continues seamlessly.
Step 3: Raw HTTP Fallback (For Advanced Control)
In some enterprise environments, you may prefer not to use the SDK for batch operations due to memory overhead or desire for granular control over retry logic. Below is a raw requests implementation that explicitly handles token refresh.
import requests
import time
from datetime import datetime
class GenesysBatchClient:
def __init__(self, client_id, client_secret, environment="mypurecloud.com"):
self.client_id = client_id
self.client_secret = client_secret
self.base_url = f"https://{environment}"
self.access_token = None
self.token_expiry = None
def _get_token(self):
"""Fetches a new OAuth token."""
url = f"{self.base_url}/oauth/token"
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
response = requests.post(url, data=data)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
# Expire slightly before actual expiry to be safe
self.token_expiry = time.time() + (token_data["expires_in"] - 60)
return self.access_token
def _ensure_token(self):
"""Ensures the current token is valid."""
if self.access_token is None or time.time() >= self.token_expiry:
logger.info("Token expired or missing. Refreshing...")
self._get_token()
def post_analytics_query(self, query_body, page=1):
"""
Executes the analytics query with explicit token refresh logic.
"""
self._ensure_token()
url = f"{self.base_url}/api/v2/analytics/conversations/details/query"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
}
params = {"page": page}
response = requests.post(url, json=query_body, headers=headers, params=params)
# Handle 401 explicitly for raw HTTP
if response.status_code == 401:
logger.warning("Received 401. Forcing token refresh and retry.")
self._get_token()
headers["Authorization"] = f"Bearer {self.access_token}"
response = requests.post(url, json=query_body, headers=headers, params=params)
response.raise_for_status()
response.raise_for_status()
return response.json()
def fetch_all_conversations(self, query_body):
"""
Batch fetches all conversations with explicit pagination and token checks.
"""
all_conversations = []
page = 1
while True:
logger.info(f"Fetching page {page}...")
try:
data = self.post_analytics_query(query_body, page=page)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
logger.warning("Rate limited. Waiting 5 seconds...")
time.sleep(5)
continue
raise
conversations = data.get("conversations", [])
if not conversations:
break
all_conversations.extend(conversations)
if data.get("next_page") is None:
break
page += 1
return all_conversations
Complete Working Example
Below is the full, copy-pasteable Python script using the SDK approach, which is recommended for most developers due to its built-in resilience.
import os
import logging
from purecloud_platform_client_v2 import Configuration, ApiClient, PlatformClient
from purecloud_platform_client_v2.api import analytics_api
from purecloud_platform_client_v2.models import ConversationDetailsQueryRequest
from purecloud_platform_client_v2.exceptions import ApiException
from datetime import datetime, timedelta
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def get_platform_client():
"""Initializes the Genesys Cloud Platform Client."""
client_id = os.getenv("GENESYS_CLIENT_ID")
client_secret = os.getenv("GENESYS_CLIENT_SECRET")
environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
if not client_id or not client_secret:
raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")
configuration = Configuration(
environment=environment,
client_id=client_id,
client_secret=client_secret
)
api_client = ApiClient(configuration=configuration)
platform_client = PlatformClient(api_client=api_client)
return platform_client
def run_batch_job():
"""
Main function to execute the batch conversation fetch.
"""
try:
platform_client = get_platform_client()
logger.info("Platform client initialized.")
analytics_instance = analytics_api.AnalyticsApi(platform_client.api_client)
# Define query parameters
now = datetime.utcnow()
start_time = now - timedelta(hours=24)
query_body = ConversationDetailsQueryRequest(
start=start_time.isoformat() + "Z",
end=now.isoformat() + "Z",
size=100, # Larger page size for efficiency
view="default",
filter="type:voice"
)
all_conversations = []
page = 1
max_pages = 500 # Safety break to prevent infinite loops
logger.info(f"Starting batch fetch from {start_time} to {now}")
while page <= max_pages:
try:
# SDK automatically handles token refresh on 401
response = analytics_instance.post_analytics_conversations_details_query(
body=query_body,
page=page
)
if response.conversations is None or len(response.conversations) == 0:
logger.info("No more conversations found.")
break
# Process data
for conv in response.conversations:
all_conversations.append({
"id": conv.id,
"start_time": conv.start_time,
"duration_ms": conv.duration
})
logger.info(f"Page {page} processed. Total records: {len(all_conversations)}")
# Check for next page
if response.next_page is not None:
page += 1
else:
logger.info("End of pages reached.")
break
except ApiException as e:
logger.error(f"API Error on page {page}: {e.status} - {e.reason}")
if e.status == 401:
logger.error("Authentication failed. Check credentials.")
return
elif e.status == 429:
logger.warning("Rate limit hit. Retrying in 5 seconds...")
import time
time.sleep(5)
continue # Retry same page
else:
raise
logger.info(f"Batch job complete. Total conversations fetched: {len(all_conversations)}")
return all_conversations
except Exception as e:
logger.error(f"Job failed: {str(e)}")
return []
if __name__ == "__main__":
conversations = run_batch_job()
if conversations:
print(f"Successfully fetched {len(conversations)} conversations.")
# Example: Save to CSV or Database
# import csv
# with open('conversations.csv', 'w', newline='') as f:
# writer = csv.DictWriter(f, fieldnames=conversations[0].keys())
# writer.writeheader()
# writer.writerows(conversations)
Common Errors & Debugging
Error: 401 Unauthorized during batch loop
- What causes it: The access token expired while the script was processing a large dataset.
- How to fix it: If using the SDK, this should be handled automatically. If you still see this error, ensure you are using the latest version of
genesys-cloud-python. If using raw HTTP, ensure your_ensure_tokenlogic checks the expiry timestamp before every request. - Code Fix: The SDK example above relies on internal retry. If it fails, verify that
client_secretis correctly passed to theConfiguration.
Error: 429 Too Many Requests
- What causes it: You are sending requests faster than Genesys Cloud allows (typically 10-20 requests per second per client ID for analytics).
- How to fix it: Implement exponential backoff. The raw HTTP example includes a simple
time.sleep(5)on 429. For production, use a library liketenacityor implement a jittered backoff strategy. - Code Fix:
from tenacity import retry, stop_after_attempt, wait_exponential from purecloud_platform_client_v2.exceptions import ApiException @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=10)) def safe_fetch_page(analytics_instance, query_body, page): return analytics_instance.post_analytics_conversations_details_query(body=query_body, page=page)
Error: ApiException: 400 Bad Request
- What causes it: The query body is malformed or the filter syntax is invalid.
- How to fix it: Validate the
ConversationDetailsQueryRequestobject. Ensure start/end times are in ISO 8601 format with ‘Z’ suffix. - Debugging Tip: Print the
query_body.to_dict()before sending to verify the JSON structure.