Increase Genesys Cloud Data Action Timeout for Slow API Calls

What You Will Build

  • You will configure a Genesys Cloud CX Data Action to execute an external API call that exceeds the default 3-second timeout.
  • You will use the Genesys Cloud CX Platform API (specifically the analytics and flows endpoints) to retrieve and update flow configurations.
  • You will implement this in Python using the genesys-cloud SDK and the requests library for direct API calls where SDK granularity is limited.

Prerequisites

  • OAuth Client Type: Private Key (JWT) or Client Credentials.
  • Required Scopes:
    • flow:flow:read
    • flow:flow:write
    • analytics:detail:view (for debugging timeouts)
  • SDK Version: genesys-cloud Python SDK version 3.0.0 or later.
  • Language/Runtime: Python 3.8+
  • External Dependencies:
    • genesys-cloud
    • requests
    • pyjwt (for token generation if not using a helper library)

Authentication Setup

With a private key client, you sign a short-lived JWT (JSON Web Token) assertion and exchange it for an access token before making any API calls. The following class generates the assertion, exchanges it, and caches the access token until shortly before it expires.

import jwt
import time
import requests
from typing import Optional

class GenesysAuth:
    def __init__(self, org_id: str, client_id: str, private_key: str, api_host: str = "api.mypurecloud.com"):
        self.org_id = org_id
        self.client_id = client_id
        self.private_key = private_key
        self.api_host = api_host
        self.token_url = f"https://{api_host}/oauth/token"
        self._token: Optional[str] = None
        self._token_expiry: float = 0

    def get_token(self) -> str:
        """
        Returns a valid JWT token. Generates a new one if the current one is expired or missing.
        """
        if self._token and time.time() < self._token_expiry:
            return self._token

        # Create JWT payload
        now = int(time.time())
        payload = {
            "iss": self.client_id,
            "sub": self.client_id,
            "aud": self.token_url,
            "iat": now,
            "exp": now + 300,  # Token valid for 5 minutes for signing purposes
            "jti": str(time.time())
        }

        # Sign with private key
        token = jwt.encode(payload, self.private_key, algorithm="RS256")

        # Exchange JWT for access token
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }
        data = {
            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
            "assertion": token
        }

        # Set an explicit timeout so a stalled token request cannot hang forever
        response = requests.post(self.token_url, headers=headers, data=data, timeout=10)
        response.raise_for_status()

        token_data = response.json()
        self._token = token_data["access_token"]
        self._token_expiry = time.time() + token_data["expires_in"] - 10 # Buffer 10s

        return self._token

    def get_headers(self) -> dict:
        """
        Returns headers ready for API requests.
        """
        return {
            "Authorization": f"Bearer {self.get_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
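
The prerequisites also allow a Client Credentials client, which the class above does not cover. As a minimal sketch, assuming the standard OAuth 2.0 client credentials grant with HTTP Basic authentication, the token request could be built as follows. The helper name build_client_credentials_request and the login.mypurecloud.com host are assumptions for illustration; substitute the login host for your own region.

```python
import base64
from typing import Dict, Tuple

def build_client_credentials_request(
    client_id: str,
    client_secret: str,
    login_host: str = "login.mypurecloud.com",  # assumed host; varies by region
) -> Tuple[str, Dict[str, str], Dict[str, str]]:
    """Return (url, headers, form_data) for a client credentials token request."""
    # HTTP Basic credentials are base64("client_id:client_secret")
    raw = f"{client_id}:{client_secret}".encode("utf-8")
    basic = base64.b64encode(raw).decode("ascii")
    url = f"https://{login_host}/oauth/token"
    headers = {
        "Authorization": f"Basic {basic}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    form_data = {"grant_type": "client_credentials"}
    return url, headers, form_data

url, headers, form_data = build_client_credentials_request("my-id", "my-secret")
# Pass these to requests.post(url, headers=headers, data=form_data, timeout=10)
```

Building the request separately from sending it keeps the credential handling easy to inspect and test without network access.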

Implementation

Step 1: Identify the Flow and Data Action

Before modifying the timeout, you must identify the specific Flow ID and the Data Action definition within that flow. Genesys Cloud Flows are JSON-like structures represented by the SDK as Flow objects.

The Data Action timeout is not a top-level flow setting. It is embedded within the actions array of the flow definition, specifically in the data object of a DataAction.

from genesyscloud import FlowApi, PlatformClient

def get_flow_with_data_actions(client: PlatformClient, flow_id: str) -> dict:
    """
    Retrieves a specific flow and extracts its data actions.
    
    Args:
        client: Initialized PlatformClient
        flow_id: The UUID of the flow
        
    Returns:
        A dictionary containing the flow ID and a list of data action definitions.
    """
    flow_api = FlowApi(client)
    
    try:
        # Fetch the flow. expand='dataActions' ensures we get the detailed action configs
        flow_response = flow_api.get_flow(flow_id=flow_id, expand=["dataActions"])
        
        if not flow_response.body:
            raise ValueError("Flow body is empty. Check Flow ID.")
            
        flow_data = flow_response.body.to_dict()
        
        # Extract data actions from the flow structure
        # Note: Genesys flows can be complex. Data actions are often in 'actions' or nested in 'tasks'.
        data_actions = []
        
        if 'actions' in flow_data:
            for action in flow_data['actions']:
                if action.get('type') == 'data':
                    data_actions.append(action)
                    
        return {
            "flow_id": flow_id,
            "flow_name": flow_data.get('name'),
            "data_actions": data_actions
        }
        
    except Exception as e:
        print(f"Error fetching flow {flow_id}: {str(e)}")
        raise

# Example Usage
# auth = GenesysAuth(org_id="your_org", client_id="your_client", private_key="your_key")
# client = PlatformClient()
# client.set_access_token(auth.get_token())
# flow_info = get_flow_with_data_actions(client, "your-flow-id")
# print(flow_info['data_actions'])
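
The comment in the function above notes that data actions may be nested (for example under 'tasks') rather than sitting in a top-level actions list. A hedged sketch of a recursive search follows. The helper name find_data_actions is hypothetical, and the sample dictionary only assumes the type == 'data' convention used in this guide, not a confirmed flow schema.

```python
from typing import Any, Dict, List

def find_data_actions(node: Any) -> List[Dict[str, Any]]:
    """Collect every dict with type == 'data', at any nesting depth."""
    found: List[Dict[str, Any]] = []
    if isinstance(node, dict):
        if node.get("type") == "data":
            found.append(node)
        for value in node.values():
            found.extend(find_data_actions(value))
    elif isinstance(node, list):
        for item in node:
            found.extend(find_data_actions(item))
    return found

# Illustrative structure only; real flows may be shaped differently
sample_flow = {
    "name": "Demo",
    "actions": [{"id": "a1", "type": "data", "data": {"timeout": 3000}}],
    "tasks": [{"actions": [{"id": "a2", "type": "data", "data": {"timeout": 3000}},
                           {"id": "a3", "type": "transfer"}]}],
}
print([a["id"] for a in find_data_actions(sample_flow)])  # → ['a1', 'a2']
```

The returned dictionaries are references into the original structure, so changing a timeout on one of them also changes the flow dictionary you later send back.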

Expected Response Structure for a Data Action:
The data_actions list will contain dictionaries similar to this:

{
  "id": "data-action-uuid-123",
  "name": "GetCustomerProfile",
  "type": "data",
  "data": {
    "url": "https://api.example.com/customer/{queue_id}",
    "method": "GET",
    "timeout": 3000, 
    "headers": [],
    "body": ""
  }
}

Critical Parameter:

  • timeout: The timeout in milliseconds. The default is 3000 (3 seconds). Raise it to 5000 or higher, depending on how slowly your external API responds.
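
Because the value is in milliseconds, a seconds-versus-milliseconds mix-up is an easy mistake. A small sketch of a guard is shown below. validate_timeout_ms is a hypothetical helper, and the 30000 ms ceiling is an assumption that you should confirm for your own org.

```python
ASSUMED_MAX_TIMEOUT_MS = 30000  # assumption; confirm the real limit for your org

def validate_timeout_ms(value: int) -> int:
    """Return value unchanged if it looks like a sane millisecond timeout."""
    # bool is a subclass of int, so reject it explicitly
    if isinstance(value, bool) or not isinstance(value, int):
        raise TypeError("timeout must be an integer number of milliseconds")
    if value < 1000:
        # Values like 5 or 10 are almost certainly seconds, not milliseconds
        raise ValueError(f"{value} looks like seconds; expected milliseconds")
    if value > ASSUMED_MAX_TIMEOUT_MS:
        raise ValueError(
            f"{value}ms exceeds the assumed maximum of {ASSUMED_MAX_TIMEOUT_MS}ms"
        )
    return value

print(validate_timeout_ms(5000))  # → 5000
```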

Step 2: Update the Data Action Timeout

Flows cannot be partially updated: you cannot PATCH a single field. Instead, retrieve the entire flow, modify the JSON structure, and PUT the entire flow back. Send the ETag from the GET response in an If-Match header so that you do not overwrite concurrent changes.

import requests
from genesyscloud import FlowApi, PlatformClient

def update_data_action_timeout(client: PlatformClient, flow_id: str, action_id: str, new_timeout_ms: int) -> bool:
    """
    Updates the timeout for a specific data action within a flow.
    
    Args:
        client: Initialized PlatformClient
        flow_id: The UUID of the flow
        action_id: The UUID of the specific data action to update
        new_timeout_ms: The new timeout in milliseconds (e.g., 5000 for 5s)
        
    Returns:
        True if successful, False otherwise
    """
    flow_api = FlowApi(client)
    
    try:
        # 1. Get the current flow with ETag support
        # We need the ETag to ensure we don't overwrite changes made by others
        flow_response = flow_api.get_flow(flow_id=flow_id, expand=["dataActions"])
        current_flow = flow_response.body
        etag = flow_response.etag
        
        if not etag:
            raise ValueError("No ETag returned. Cannot update flow safely.")
            
        flow_dict = current_flow.to_dict()
        
        # 2. Locate and update the specific data action
        actions = flow_dict.get('actions', [])
        updated = False
        
        for action in actions:
            if action.get('id') == action_id and action.get('type') == 'data':
                if 'data' in action:
                    action['data']['timeout'] = new_timeout_ms
                    updated = True
                    print(f"Updated timeout for action '{action.get('name')}' to {new_timeout_ms}ms")
                break
        
        if not updated:
            raise ValueError(f"Data action with ID {action_id} not found in flow.")
            
        # 3. Prepare the update payload
        # Rebuilding a typed SDK object from flow_dict can drop fields in complex
        # nested structures, so this step sends the dictionary with the requests
        # library to keep the JSON exactly as modified.
        
        headers = {
            "Authorization": f"Bearer {client.get_access_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json",
            "If-Match": etag # Crucial for optimistic locking
        }
        
        url = f"https://api.mypurecloud.com/api/v2/flows/{flow_id}"
        
        response = requests.put(url, json=flow_dict, headers=headers)
        
        if response.status_code == 200:
            print("Flow updated successfully.")
            return True
        elif response.status_code == 412:
            print("Precondition Failed: The flow has been modified by another user since you retrieved it.")
            return False
        else:
            print(f"Error updating flow: {response.status_code} - {response.text}")
            return False
            
    except Exception as e:
        print(f"Exception during update: {str(e)}")
        return False

Step 3: Verify the Change and Handle Rate Limits

After updating the flow, re-fetch it to confirm that the new timeout was saved. If you are updating many flows, you may also hit rate limits (HTTP 429). The following wrapper retries a failed update with exponential backoff and jitter.

import time
import random

def update_flow_with_retry(client: PlatformClient, flow_id: str, action_id: str, new_timeout_ms: int, max_retries: int = 3) -> bool:
    """
    Retries update_data_action_timeout with exponential backoff and jitter.

    update_data_action_timeout returns only True or False, so this wrapper
    cannot tell a 429 from a 412. It treats every False as retryable. Each
    retry re-fetches the flow and its ETag, which also resolves 412 conflicts.
    """
    for attempt in range(max_retries):
        if update_data_action_timeout(client, flow_id, action_id, new_timeout_ms):
            return True

        if attempt < max_retries - 1:
            # Exponential backoff with jitter: roughly 1s, 2s, 4s, ...
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed. Waiting {wait_time:.2f} seconds before retry...")
            time.sleep(wait_time)

    return False
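
The wrapper above retries the update but does not confirm the result. A minimal verification sketch follows: it re-reads the flow and compares the stored timeout with the expected value. verify_timeout and the fetch_flow callable are hypothetical. In practice fetch_flow would wrap the GET call from Step 1 and return the flow as a dictionary.

```python
from typing import Any, Callable, Dict

def verify_timeout(
    fetch_flow: Callable[[str], Dict[str, Any]],
    flow_id: str,
    action_id: str,
    expected_ms: int,
) -> bool:
    """Re-fetch the flow and check that the data action holds expected_ms."""
    flow = fetch_flow(flow_id)
    for action in flow.get("actions", []):
        if action.get("id") == action_id and action.get("type") == "data":
            return action.get("data", {}).get("timeout") == expected_ms
    return False  # action not present in the fetched flow

# Stand-in for a real GET request, used here only to exercise the check
def fake_fetch(flow_id: str) -> Dict[str, Any]:
    return {"id": flow_id, "actions": [
        {"id": "a1", "type": "data", "data": {"timeout": 5000}}]}

print(verify_timeout(fake_fetch, "flow-1", "a1", 5000))  # → True
```

Passing the fetch function as a parameter keeps the check independent of the SDK or HTTP client you use.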

Complete Working Example

This script combines authentication, flow retrieval, and the timeout update into a single runnable module. It calls the API with requests directly rather than through the SDK.

import os
import jwt
import time
import requests
from typing import Optional

# --- Authentication Class ---
class GenesysAuth:
    def __init__(self, org_id: str, client_id: str, private_key: str, api_host: str = "api.mypurecloud.com"):
        self.org_id = org_id
        self.client_id = client_id
        self.private_key = private_key
        self.api_host = api_host
        self.token_url = f"https://{api_host}/oauth/token"
        self._token: Optional[str] = None
        self._token_expiry: float = 0

    def get_token(self) -> str:
        if self._token and time.time() < self._token_expiry:
            return self._token

        now = int(time.time())
        payload = {
            "iss": self.client_id,
            "sub": self.client_id,
            "aud": self.token_url,
            "iat": now,
            "exp": now + 300,
            "jti": str(time.time())
        }

        token = jwt.encode(payload, self.private_key, algorithm="RS256")

        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
            "assertion": token
        }

        response = requests.post(self.token_url, headers=headers, data=data, timeout=10)
        response.raise_for_status()

        token_data = response.json()
        self._token = token_data["access_token"]
        self._token_expiry = time.time() + token_data["expires_in"] - 10

        return self._token

# --- Core Logic ---

def update_data_action_timeout(flow_id: str, action_id: str, new_timeout_ms: int, auth: GenesysAuth) -> bool:
    """
    Updates the timeout for a specific data action within a flow.
    """
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    
    # 1. Get the current flow with ETag
    get_url = f"https://{auth.api_host}/api/v2/flows/{flow_id}?expand=dataActions"
    get_resp = requests.get(get_url, headers=headers, timeout=30)
    get_resp.raise_for_status()
    
    etag = get_resp.headers.get("ETag")
    if not etag:
        raise ValueError("No ETag returned. Cannot update flow safely.")
        
    flow_data = get_resp.json()
    
    # 2. Locate and update the specific data action
    actions = flow_data.get('actions', [])
    updated = False
    
    for action in actions:
        if action.get('id') == action_id and action.get('type') == 'data':
            if 'data' in action:
                old_timeout = action['data'].get('timeout', 3000)
                action['data']['timeout'] = new_timeout_ms
                updated = True
                print(f"Updated timeout for action '{action.get('name')}' from {old_timeout}ms to {new_timeout_ms}ms")
            break
    
    if not updated:
        raise ValueError(f"Data action with ID {action_id} not found in flow {flow_id}.")
        
    # 3. Prepare the update payload
    put_url = f"https://{auth.api_host}/api/v2/flows/{flow_id}"
    put_headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json",
        "Accept": "application/json",
        "If-Match": etag
    }
    
    # 4. Send the updated flow
    response = requests.put(put_url, json=flow_data, headers=put_headers, timeout=30)
    
    if response.status_code == 200:
        print("Flow updated successfully.")
        return True
    elif response.status_code == 412:
        print("Precondition Failed: The flow has been modified by another user since you retrieved it.")
        return False
    elif response.status_code == 429:
        print("Rate limited. Please retry later.")
        return False
    else:
        print(f"Error updating flow: {response.status_code} - {response.text}")
        return False

def main():
    # Configuration
    ORG_ID = os.getenv("GENESYS_ORG_ID")
    CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
    PRIVATE_KEY = os.getenv("GENESYS_PRIVATE_KEY")
    FLOW_ID = os.getenv("FLOW_ID")
    ACTION_ID = os.getenv("ACTION_ID")
    NEW_TIMEOUT = 5000 # 5 seconds
    
    if not all([ORG_ID, CLIENT_ID, PRIVATE_KEY, FLOW_ID, ACTION_ID]):
        print("Missing environment variables. Please set GENESYS_ORG_ID, GENESYS_CLIENT_ID, GENESYS_PRIVATE_KEY, FLOW_ID, ACTION_ID")
        return

    # Initialize Auth
    auth = GenesysAuth(ORG_ID, CLIENT_ID, PRIVATE_KEY)
    
    try:
        # Execute Update
        success = update_data_action_timeout(FLOW_ID, ACTION_ID, NEW_TIMEOUT, auth)
        
        if success:
            print("Operation completed successfully.")
        else:
            print("Operation failed.")
            
    except Exception as e:
        print(f"Critical error: {str(e)}")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 412 Precondition Failed

  • What causes it: The If-Match header contains an ETag that no longer matches the current state of the flow. This happens if another user or process modified the flow between your GET and PUT requests.
  • How to fix it: Implement a retry loop that re-fetches the flow, re-applies your change, and sends the PUT again with the new ETag.
  • Code showing the fix: See the update_flow_with_retry function in Step 3.
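
The re-fetch, re-apply, and re-send cycle can also be sketched independently of any HTTP client. In the example below, get_flow and put_flow are hypothetical callables standing in for the GET and PUT requests, and InMemoryFlowStore is a test double that mimics If-Match behaviour. None of these names come from the Genesys Cloud SDK.

```python
import copy
from typing import Any, Callable, Dict, Tuple

Flow = Dict[str, Any]

def update_with_etag_retry(
    get_flow: Callable[[], Tuple[Flow, str]],
    put_flow: Callable[[Flow, str], int],
    apply_change: Callable[[Flow], None],
    max_attempts: int = 3,
) -> bool:
    """Repeat GET -> modify -> PUT until the PUT is accepted or attempts run out."""
    for _ in range(max_attempts):
        flow, etag = get_flow()      # fresh copy and fresh ETag each time
        apply_change(flow)           # re-apply the edit to the fresh copy
        status = put_flow(flow, etag)
        if status == 200:
            return True
        if status != 412:            # only a stale ETag is retried here
            return False
    return False

class InMemoryFlowStore:
    """Test double: rejects a PUT whose ETag is stale, as If-Match would."""
    def __init__(self) -> None:
        self.flow: Flow = {"actions": [{"id": "a1", "data": {"timeout": 3000}}]}
        self.version = 1
        self.conflicts_to_simulate = 1

    def get(self) -> Tuple[Flow, str]:
        return copy.deepcopy(self.flow), str(self.version)

    def put(self, flow: Flow, etag: str) -> int:
        if self.conflicts_to_simulate > 0:
            self.conflicts_to_simulate -= 1
            self.version += 1        # simulate someone else saving first
        if etag != str(self.version):
            return 412
        self.flow, self.version = flow, self.version + 1
        return 200

def set_timeout(flow: Flow) -> None:
    flow["actions"][0]["data"]["timeout"] = 5000

store = InMemoryFlowStore()
print(update_with_etag_retry(store.get, store.put, set_timeout))  # → True
print(store.flow["actions"][0]["data"]["timeout"])                # → 5000
```

The first PUT in this run is rejected with 412 because the simulated version changed, and the second attempt succeeds with the refreshed ETag.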

Error: 400 Bad Request - Validation Error

  • What causes it: The JSON structure sent in the PUT request is invalid. This often happens if the SDK's to_dict() method omits required fields or if you modified a read-only field such as id or version.
  • How to fix it: Ensure you are sending the entire flow object retrieved from the GET request, with only the timeout field modified. Do not strip out nested objects like conditions or outcomes unless you are certain they are empty and optional.
  • Debugging Tip: Log the flow_data JSON before sending the PUT request to verify its structure matches the Genesys Cloud Flow API specification.
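
One way to confirm that only the timeout changed is to diff the payload against the copy you fetched. The sketch below uses a hypothetical helper, changed_paths, that walks two JSON-like structures and reports every path whose value differs. The sample flow is illustrative only.

```python
import copy
from typing import Any, List

def changed_paths(before: Any, after: Any, path: str = "") -> List[str]:
    """Return the paths at which two JSON-like values differ."""
    if isinstance(before, dict) and isinstance(after, dict):
        diffs: List[str] = []
        for key in sorted(set(before) | set(after)):
            child = f"{path}.{key}" if path else str(key)
            if key not in before or key not in after:
                diffs.append(child)  # key added or removed
            else:
                diffs.extend(changed_paths(before[key], after[key], child))
        return diffs
    if isinstance(before, list) and isinstance(after, list) and len(before) == len(after):
        diffs = []
        for i, (b, a) in enumerate(zip(before, after)):
            diffs.extend(changed_paths(b, a, f"{path}[{i}]"))
        return diffs
    # Scalars, mismatched types, or lists of different length
    return [] if before == after else [path]

original = {"name": "Demo",
            "actions": [{"id": "a1", "data": {"timeout": 3000, "method": "GET"}}]}
modified = copy.deepcopy(original)
modified["actions"][0]["data"]["timeout"] = 5000
print(changed_paths(original, modified))  # → ['actions[0].data.timeout']
```

If the list contains anything other than the timeout path, inspect those fields before sending the PUT.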

Error: 403 Forbidden

  • What causes it: The OAuth token lacks the flow:flow:write scope.
  • How to fix it: Regenerate your OAuth token with the correct scopes. Ensure your client credentials have the necessary permissions in the Genesys Cloud Admin Console under Administration > Security > Client Credentials.

Error: Timeout Still Exceeds New Limit

  • What causes it: The external API is taking longer than the new timeout value (e.g., 5 seconds).
  • How to fix it: Increase the timeout further (e.g., to 10000ms). Note that Genesys Cloud imposes a maximum timeout value (typically 30 seconds for data actions). If your API takes longer, consider using asynchronous processing with a webhook callback instead of a synchronous data action.
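
Rather than guessing a new value, you can size the timeout from measured latencies of the external API. The following is a rough sketch, assuming you have already collected response times in milliseconds. The name recommend_timeout_ms, the 95th-percentile choice, the 1.5x headroom factor, and the 30000 ms cap are all illustrative assumptions, not Genesys Cloud recommendations.

```python
import math
from typing import Optional, Sequence

def recommend_timeout_ms(
    latencies_ms: Sequence[float],
    headroom: float = 1.5,   # assumed safety factor
    cap_ms: int = 30000,     # assumed platform ceiling; confirm for your org
) -> Optional[int]:
    """Suggest a timeout from the 95th-percentile latency.

    Returns None when the suggestion would exceed cap_ms.
    """
    if not latencies_ms:
        raise ValueError("need at least one latency sample")
    ordered = sorted(latencies_ms)
    # Nearest-rank 95th percentile
    rank = max(1, math.ceil(0.95 * len(ordered)))
    p95 = ordered[rank - 1]
    # Add headroom, then round up to the next 500 ms step
    suggested = math.ceil(p95 * headroom / 500) * 500
    return suggested if suggested <= cap_ms else None

samples = [2100, 2400, 2600, 3100, 3300, 3900, 4200, 4800, 5200, 6100]
print(recommend_timeout_ms(samples))  # → 9500
```

A None result means the measured latency does not fit under the assumed cap, which is the case where asynchronous processing is the better option.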

Official References