Robust Token Refresh: Handling Expiry During Batch API Operations

What You Will Build

  • A production-grade Python utility that executes a batch query against Genesys Cloud CX Analytics APIs without failing when an access token expires.
  • Token refresh logic implemented two ways: with the Genesys Cloud Python SDK (PureCloudPlatformClientV2) and with a raw HTTP fallback.
  • A complete Python script that handles pagination, retries, and seamless token rotation in a single execution flow.

Prerequisites

  • OAuth Client Type: Confidential Client (Client Credentials Grant). This is the standard for server-to-server integrations.
  • Required Permissions: A Client Credentials client is authorized through the roles assigned to it rather than through OAuth scopes. The example data fetch needs a role that grants analytics:conversationDetail:view. Adjust this for your specific API endpoint.
  • SDK: A current release of PureCloudPlatformClientV2 (pip install PureCloudPlatformClientV2).
  • Language/Runtime: Python 3.8+
  • External Dependencies: requests, used only by the raw HTTP fallback. The SDK performs its own HTTP handling internally.

Authentication Setup

The Genesys Cloud Python SDK (published on PyPI as PureCloudPlatformClientV2) manages the access token for you once it is configured. Understanding the underlying mechanism still matters when debugging batch failures. The SDK's ApiClient stores the token returned by get_client_credentials_token and attaches it to every request. The SDK documents transparent refresh for the Authorization Code grant, which issues a refresh token. The Client Credentials grant issues no refresh token, so a new access token has to be requested with the client ID and secret. Check how your SDK version handles a 401 Unauthorized under Client Credentials before relying on automatic recovery.

For batch operations, the risk is not only that the initial token has expired, but that the token expires between API calls inside a long-running loop. Explicit error handling ensures that your batch job neither crashes without a trace nor returns partial data when that happens.
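
The timing risk described above can be made concrete with a small freshness check. This is an illustrative sketch only: TokenState, needs_refresh, and the 60-second margin are hypothetical names and values, not part of the SDK. It assumes the token response reports an expires_in lifetime in seconds, as OAuth 2.0 token responses normally do.

```python
import time
from dataclasses import dataclass


@dataclass
class TokenState:
    """Hypothetical holder for a bearer token and its lifetime."""
    access_token: str
    issued_at: float   # time.monotonic() reading taken when the token was issued
    expires_in: float  # lifetime in seconds, as reported by the token endpoint

    def seconds_remaining(self, now=None):
        """Return how many seconds of validity are left (never negative)."""
        now = time.monotonic() if now is None else now
        return max(self.issued_at + self.expires_in - now, 0.0)

    def needs_refresh(self, margin=60.0, now=None):
        """True when the token is inside the safety margin or already expired."""
        return self.seconds_remaining(now) <= margin


# A token issued at t=0 with a one-hour lifetime.
token = TokenState(access_token="example", issued_at=0.0, expires_in=3600.0)
early = token.needs_refresh(now=10.0)    # plenty of validity left
late = token.needs_refresh(now=3550.0)   # inside the 60-second margin
```

Calling needs_refresh before each page request turns a mid-loop expiry into a planned refresh instead of a failed call.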

SDK Configuration

import os
import PureCloudPlatformClientV2

def get_platform_client():
    """
    Builds an authenticated ApiClient using the Client Credentials grant.
    The returned client attaches the access token to every request.
    """
    # Load credentials from environment variables
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Point the SDK at the API host for your region (set this before
    # creating the ApiClient)
    PureCloudPlatformClientV2.configuration.host = f"https://api.{environment}"

    # Request a token; get_client_credentials_token returns the ApiClient
    api_client = PureCloudPlatformClientV2.api_client.ApiClient()
    return api_client.get_client_credentials_token(client_id, client_secret)

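Key Insight: With the SDK you never set the Authorization header yourself; the ApiClient attaches the stored token to every request. Transparent refresh is documented for the Authorization Code grant and is controlled by the should_refresh_access_token setting. For the Client Credentials grant, confirm the behavior of your SDK version. If a 401 Unauthorized reaches your code, request a new token with get_client_credentials_token and retry the original request.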
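
The refresh-and-retry pattern can be illustrated without the SDK. The sketch below shows the general technique, not the SDK's internal code; Unauthorized, call_with_refresh, and the fake request and refresh functions are hypothetical names used only for this demonstration.

```python
class Unauthorized(Exception):
    """Stand-in for an HTTP 401 error raised by an API call."""


def call_with_refresh(request, refresh):
    """Run request(); on Unauthorized, call refresh() once and retry."""
    try:
        return request()
    except Unauthorized:
        refresh()          # obtain a new token
        return request()   # a second 401 propagates to the caller


# Simulated client whose first token has already expired.
state = {"token": "expired", "refreshes": 0}


def fake_request():
    if state["token"] == "expired":
        raise Unauthorized("401 Unauthorized")
    return {"status": 200, "token_used": state["token"]}


def fake_refresh():
    state["token"] = "fresh"
    state["refreshes"] += 1


result = call_with_refresh(fake_request, fake_refresh)
```

Retrying exactly once matters: if the second attempt also fails with a 401, the credentials themselves are likely wrong, and looping would only hide that.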

Implementation

Step 1: Defining the Batch Query Structure

We will use the Analytics Conversation Details Query API (POST /api/v2/analytics/conversations/details/query). This endpoint is well suited to demonstrating batch logic: it returns results one page at a time, and a large dataset can take long enough to page through that the token expires along the way.

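import PureCloudPlatformClientV2
from datetime import datetime, timedelta, timezone

def build_query_request(api_client):
    """
    Constructs a ConversationQuery covering the last 24 hours.
    """
    analytics_instance = PureCloudPlatformClientV2.AnalyticsApi(api_client)

    # Define the time range as an ISO 8601 interval: "start/end"
    now = datetime.now(timezone.utc)
    start_time = now - timedelta(hours=24)
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"

    # Filter for voice conversations only
    predicate = PureCloudPlatformClientV2.SegmentDetailQueryPredicate()
    predicate.type = "dimension"
    predicate.dimension = "mediaType"
    predicate.operator = "matches"
    predicate.value = "voice"
    segment_filter = PureCloudPlatformClientV2.SegmentDetailQueryFilter()
    segment_filter.type = "and"
    segment_filter.predicates = [predicate]

    # Paging is part of the request body, not a query parameter
    paging = PureCloudPlatformClientV2.PagingSpec()
    paging.page_size = 50
    paging.page_number = 1

    # Build the query body
    query_body = PureCloudPlatformClientV2.ConversationQuery()
    query_body.interval = f"{start_time.strftime(fmt)}/{now.strftime(fmt)}"
    query_body.segment_filters = [segment_filter]
    query_body.paging = paging

    return analytics_instance, query_body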

Step 2: Core Logic with Robust Pagination and Retry

The critical part of this tutorial is the loop. Each iteration requests the next page of the same query, so a long batch can outlive the token that started it.

Whether or not the SDK recovers from an expired token on its own, wrap the API call in a try-except block that catches the SDK's ApiException, log the failure, and decide per status code whether to retry or stop. This keeps the batch job from terminating without a trace.

import logging
from PureCloudPlatformClientV2.rest import ApiException

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def fetch_batch_data(api_client):
    """
    Fetches all conversation details that match the query.
    Pages through results by advancing paging.page_number in the request body.
    """
    analytics_instance, query_body = build_query_request(api_client)

    all_conversations = []

    logger.info("Starting batch fetch...")

    try:
        while True:
            page_number = query_body.paging.page_number

            # A token that expires mid-loop surfaces here as a 401
            # unless the SDK has already refreshed it.
            response = analytics_instance.post_analytics_conversations_details_query(query_body)

            if not response.conversations:
                logger.info("No more conversations found.")
                break

            # Process current page
            for conversation in response.conversations:
                # Example: Extracting ID, start time, and end time
                all_conversations.append({
                    "id": conversation.conversation_id,
                    "start_time": conversation.conversation_start,
                    "end_time": conversation.conversation_end
                })

            logger.info(f"Fetched page {page_number}, total items so far: {len(all_conversations)}")

            # The response carries no next-page link: request the next page
            # until an empty page comes back.
            query_body.paging.page_number = page_number + 1
            # Optional: Add a small sleep to respect rate limits if necessary
            # import time; time.sleep(0.5)

    except ApiException as e:
        # Handle specific API errors
        if e.status == 401:
            logger.error("Authentication failed. Token refresh may have failed or credentials are invalid.")
            raise
        elif e.status == 429:
            logger.warning("Rate limit exceeded. Consider implementing exponential backoff.")
            raise
        else:
            logger.error(f"API Error {e.status}: {e.body}")
            raise
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        raise

    return all_conversations

Why this works: The post_analytics_conversations_details_query method is synchronous, so each page is fully fetched before the next request is built. If the SDK refreshes the token, it does so inside the call, before an ApiException reaches your code, and the loop continues without interruption. If a 401 does reach the except block, the refresh did not happen or did not succeed, and the job stops with a logged error instead of silently returning partial data.
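
Because each call is synchronous, the control flow of the pagination loop can be checked in isolation from the API. The following is a minimal sketch under stated assumptions: fetch_all_pages and fake_fetch_page are hypothetical helpers, and the in-memory list stands in for the analytics endpoint. It shows the two stop conditions a batch loop needs, an empty page and a hard page cap.

```python
def fetch_all_pages(fetch_page, max_pages=500):
    """Collect items from fetch_page(page_number) until an empty page is returned."""
    items = []
    for page_number in range(1, max_pages + 1):
        page = fetch_page(page_number)
        if not page:
            break  # an empty (or missing) page marks the end of the data
        items.extend(page)
    return items


# Fake data source: five records served two per page.
RECORDS = ["c1", "c2", "c3", "c4", "c5"]
PAGE_SIZE = 2
calls = []


def fake_fetch_page(page_number):
    calls.append(page_number)
    start = (page_number - 1) * PAGE_SIZE
    return RECORDS[start:start + PAGE_SIZE]


collected = fetch_all_pages(fake_fetch_page)
capped = fetch_all_pages(fake_fetch_page, max_pages=1)
```

Passing the page fetcher in as a function keeps the loop testable and lets the same loop wrap either the SDK call or a raw HTTP call.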

Step 3: Raw HTTP Fallback (For Advanced Control)

In some enterprise environments, you may prefer not to use the SDK for batch operations, either because of its memory overhead or because you want granular control over retry logic. Below is a raw requests implementation that handles token refresh explicitly.

import logging
import time
import requests

logger = logging.getLogger(__name__)

class GenesysBatchClient:
    def __init__(self, client_id, client_secret, environment="mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        # Tokens come from the login host; API calls go to the api host
        self.login_url = f"https://login.{environment}"
        self.api_url = f"https://api.{environment}"
        self.access_token = None
        self.token_expiry = None

    def _get_token(self):
        """Fetches a new OAuth token."""
        response = requests.post(
            f"{self.login_url}/oauth/token",
            data={"grant_type": "client_credentials"},
            auth=(self.client_id, self.client_secret),  # HTTP Basic auth
            timeout=30
        )
        response.raise_for_status()
        token_data = response.json()
        self.access_token = token_data["access_token"]
        # Expire slightly before actual expiry to be safe
        self.token_expiry = time.time() + (token_data["expires_in"] - 60)
        return self.access_token

    def _ensure_token(self):
        """Ensures the current token is valid."""
        if self.access_token is None or time.time() >= self.token_expiry:
            logger.info("Token expired or missing. Refreshing...")
            self._get_token()

    def post_analytics_query(self, query_body, page=1, page_size=100):
        """
        Executes the analytics query with explicit token refresh logic.
        query_body is a dict; paging is merged into a copy of it.
        """
        self._ensure_token()

        url = f"{self.api_url}/api/v2/analytics/conversations/details/query"
        # Paging is part of the JSON body for this endpoint
        body = {**query_body, "paging": {"pageSize": page_size, "pageNumber": page}}

        def send():
            headers = {
                "Authorization": f"Bearer {self.access_token}",
                "Content-Type": "application/json"
            }
            return requests.post(url, json=body, headers=headers, timeout=60)

        response = send()

        # Handle 401 explicitly: refresh once, then retry once
        if response.status_code == 401:
            logger.warning("Received 401. Forcing token refresh and retry.")
            self._get_token()
            response = send()

        response.raise_for_status()
        return response.json()

    def fetch_all_conversations(self, query_body, max_retries=5):
        """
        Batch fetches all conversations with explicit pagination and token checks.
        """
        all_conversations = []
        page = 1
        retries = 0

        while True:
            logger.info(f"Fetching page {page}...")
            try:
                data = self.post_analytics_query(query_body, page=page)
            except requests.exceptions.HTTPError as e:
                # Retry the same page on 429, but only a bounded number of times
                if e.response.status_code == 429 and retries < max_retries:
                    retries += 1
                    logger.warning("Rate limited. Waiting 5 seconds...")
                    time.sleep(5)
                    continue
                raise

            retries = 0
            conversations = data.get("conversations") or []
            if not conversations:
                break

            all_conversations.extend(conversations)
            page += 1

        return all_conversations

Complete Working Example

Below is the full, copy-pasteable Python script using the SDK approach, which is recommended for most developers due to its built-in resilience.

import os
import time
import logging
from datetime import datetime, timedelta, timezone

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def get_platform_client():
    """Builds an authenticated ApiClient using the Client Credentials grant."""
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Set the API host before creating the ApiClient
    PureCloudPlatformClientV2.configuration.host = f"https://api.{environment}"
    api_client = PureCloudPlatformClientV2.api_client.ApiClient()
    return api_client.get_client_credentials_token(client_id, client_secret)

def build_query(start_time, end_time, page_size=100):
    """Builds a ConversationQuery for voice conversations in the given window."""
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"

    predicate = PureCloudPlatformClientV2.SegmentDetailQueryPredicate()
    predicate.type = "dimension"
    predicate.dimension = "mediaType"
    predicate.operator = "matches"
    predicate.value = "voice"
    segment_filter = PureCloudPlatformClientV2.SegmentDetailQueryFilter()
    segment_filter.type = "and"
    segment_filter.predicates = [predicate]

    paging = PureCloudPlatformClientV2.PagingSpec()
    paging.page_size = page_size  # Larger page size for efficiency
    paging.page_number = 1

    query_body = PureCloudPlatformClientV2.ConversationQuery()
    query_body.interval = f"{start_time.strftime(fmt)}/{end_time.strftime(fmt)}"
    query_body.segment_filters = [segment_filter]
    query_body.paging = paging
    return query_body

def run_batch_job():
    """
    Main function to execute the batch conversation fetch.
    Returns the fetched conversations, or an empty list if the job fails.
    """
    try:
        api_client = get_platform_client()
        logger.info("Platform client initialized.")

        analytics_instance = PureCloudPlatformClientV2.AnalyticsApi(api_client)

        # Define query parameters
        now = datetime.now(timezone.utc)
        start_time = now - timedelta(hours=24)
        query_body = build_query(start_time, now)

        all_conversations = []
        page = 1
        max_pages = 500  # Safety break to prevent infinite loops
        max_retries = 5  # Per-page cap on 401/429 retries
        retries = 0

        logger.info(f"Starting batch fetch from {start_time} to {now}")

        while page <= max_pages:
            query_body.paging.page_number = page
            try:
                response = analytics_instance.post_analytics_conversations_details_query(query_body)
            except ApiException as e:
                logger.error(f"API Error on page {page}: {e.status} - {e.reason}")
                retries += 1
                if e.status not in (401, 429) or retries > max_retries:
                    raise
                if e.status == 401:
                    # Token expired or was rejected: authenticate again
                    logger.warning("Re-authenticating and retrying the same page...")
                    api_client = get_platform_client()
                    analytics_instance = PureCloudPlatformClientV2.AnalyticsApi(api_client)
                else:
                    logger.warning("Rate limit hit. Retrying in 5 seconds...")
                    time.sleep(5)
                continue  # Retry same page

            retries = 0

            if not response.conversations:
                logger.info("No more conversations found.")
                break

            # Process data
            for conv in response.conversations:
                all_conversations.append({
                    "id": conv.conversation_id,
                    "start_time": conv.conversation_start,
                    "end_time": conv.conversation_end
                })

            logger.info(f"Page {page} processed. Total records: {len(all_conversations)}")
            page += 1

        logger.info(f"Batch job complete. Total conversations fetched: {len(all_conversations)}")
        return all_conversations

    except Exception as e:
        logger.error(f"Job failed: {str(e)}")
        return []

if __name__ == "__main__":
    conversations = run_batch_job()
    if conversations:
        print(f"Successfully fetched {len(conversations)} conversations.")
        # Example: Save to CSV or Database
        # import csv
        # with open('conversations.csv', 'w', newline='') as f:
        #     writer = csv.DictWriter(f, fieldnames=conversations[0].keys())
        #     writer.writeheader()
        #     writer.writerows(conversations)
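
The commented-out CSV export above can be turned into a small helper. This is a sketch with a hypothetical function name (conversations_to_csv) and made-up sample rows; it writes to any text stream, so an in-memory buffer is used here in place of a file.

```python
import csv
import io


def conversations_to_csv(conversations, stream):
    """Write a list of flat dicts to stream as CSV; return the number of rows written."""
    if not conversations:
        return 0
    # Column order follows the keys of the first record
    writer = csv.DictWriter(stream, fieldnames=list(conversations[0].keys()))
    writer.writeheader()
    writer.writerows(conversations)
    return len(conversations)


# Made-up sample rows shaped like the dicts built by the batch job.
sample = [
    {"id": "a1", "start_time": "2024-01-01T00:00:00Z"},
    {"id": "b2", "start_time": "2024-01-01T00:05:00Z"},
]
buffer = io.StringIO()
rows_written = conversations_to_csv(sample, buffer)
csv_text = buffer.getvalue()
```

To write a file instead, pass open('conversations.csv', 'w', newline='') as the stream, as in the commented example.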

Common Errors & Debugging

Error: 401 Unauthorized during batch loop

  • What causes it: The access token expired while the script was processing a large dataset, or the credentials are invalid.
  • How to fix it: If you use the SDK, upgrade to the latest release of PureCloudPlatformClientV2 and confirm whether it refreshes Client Credentials tokens for you; if it does not, request a new token and retry the failed page. If you use raw HTTP, make sure _ensure_token checks the expiry timestamp before every request.
  • Code Fix: If re-authentication also fails, verify that the client ID and client secret are set correctly and belong to the region you are calling.

Error: 429 Too Many Requests

  • What causes it: You are sending requests faster than Genesys Cloud allows. Rate limits are published by Genesys and can change, so consult the current rate-limit documentation for the exact figures instead of assuming a fixed requests-per-second value.
  • How to fix it: Implement exponential backoff. The raw HTTP example uses a simple time.sleep(5) on 429. For production, use a library like tenacity or implement a jittered backoff strategy, and retry only on 429 so that other errors fail fast.
  • Code Fix:
    from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential
    from PureCloudPlatformClientV2.rest import ApiException

    def is_rate_limited(exc):
        return isinstance(exc, ApiException) and exc.status == 429

    @retry(retry=retry_if_exception(is_rate_limited), stop=stop_after_attempt(5),
           wait=wait_exponential(multiplier=1, min=4, max=10), reraise=True)
    def safe_fetch_page(analytics_instance, query_body, page):
        query_body.paging.page_number = page
        return analytics_instance.post_analytics_conversations_details_query(query_body)
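
The jittered backoff strategy mentioned above can also be written with only the standard library. The sketch below is one common variant ("full jitter"), not a Genesys-specific recipe: backoff_delay and its parameters are hypothetical, and the retry_after argument assumes the server's hint has already been read from the response and is expressed in seconds.

```python
import random


def backoff_delay(attempt, retry_after=None, base=1.0, cap=60.0, rng=random.random):
    """Return seconds to wait before retry number `attempt` (0-based).

    If the server supplied a retry hint in seconds, use it as-is.
    Otherwise use full jitter: a random delay in [0, min(cap, base * 2**attempt)).
    """
    if retry_after is not None:
        return float(retry_after)
    ceiling = min(cap, base * (2 ** attempt))
    return rng() * ceiling


# Deterministic demonstration: replace the random source with a fixed 0.5.
delays = [backoff_delay(n, rng=lambda: 0.5) for n in range(4)]
capped = backoff_delay(10, rng=lambda: 1.0)
server_hint = backoff_delay(3, retry_after="7")
```

Randomizing the delay spreads retries from parallel workers over time, so they do not all hit the rate limit again at the same instant.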
    

Error: ApiException: 400 Bad Request

  • What causes it: The query body is malformed or the filter syntax is invalid.
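  • How to fix it: Validate the query body before sending it. Ensure the interval is two ISO 8601 timestamps separated by a slash (start/end), that the start precedes the end, and that every filter predicate uses a supported dimension and operator.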
  • Debugging Tip: Print the query_body.to_dict() before sending to verify the JSON structure.
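
A malformed time range is a frequent cause of 400 responses, so it can help to build the interval string in one place and validate it there. The analytics query body takes an interval of two ISO 8601 timestamps joined by a slash; the helper names below (to_api_timestamp, build_interval) are hypothetical, and the millisecond precision is a formatting choice made for this sketch.

```python
from datetime import datetime, timedelta, timezone


def to_api_timestamp(moment):
    """Format an aware datetime as UTC ISO 8601 with milliseconds and a Z suffix."""
    if moment.tzinfo is None:
        raise ValueError("datetime must be timezone-aware")
    utc = moment.astimezone(timezone.utc)
    return utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{utc.microsecond // 1000:03d}Z"


def build_interval(start, end):
    """Return 'start/end' after checking that start precedes end."""
    if start >= end:
        raise ValueError("interval start must be earlier than its end")
    return f"{to_api_timestamp(start)}/{to_api_timestamp(end)}"


end = datetime(2024, 1, 2, 12, 0, 0, tzinfo=timezone.utc)
interval = build_interval(end - timedelta(hours=24), end)
```

Rejecting naive datetimes up front avoids the silent local-time versus UTC mix-ups that produce a valid-looking but wrong interval.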

Official References