Increase Genesys Cloud Data Action Timeout for Slow Integrations

What You Will Build

  • A Python script that updates the execution timeout configuration for specific Data Actions using the Genesys Cloud Platform API.
  • This tutorial uses the Genesys Cloud CX API (/api/v2/flows) to modify Flow resources directly.
  • The programming language covered is Python 3.10+ using the requests library.

Prerequisites

  • OAuth Client: A Genesys Cloud OAuth client with the admin:flow scope. Standard user tokens do not have permission to modify Flow definitions.
  • API Version: Genesys Cloud Platform API v2.
  • Language/Runtime: Python 3.10 or higher.
  • External Dependencies:
    • requests (for HTTP calls)
    • pydantic (optional, for type validation; this tutorial uses plain dictionaries for simplicity and broader compatibility)

Required OAuth Scopes:

  • admin:flow (Required to update Flow definitions)
  • view:flow (Required to read the current Flow definition)

Authentication Setup

Genesys Cloud uses OAuth 2.0 for authentication. For server-to-server integrations, the Client Credentials flow is the standard approach. You must obtain an access token before making any API calls.

The following class acquires a token and caches it in memory, refreshing it shortly before it expires. In a production environment, you should replace this with a more robust token cache.

import time
from typing import Optional

import requests

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, org_domain: str):
        # org_domain is the region domain of your org, e.g. "mypurecloud.com"
        # or "mypurecloud.ie". The API and login services are served from the
        # "api." and "login." subdomains of that region domain.
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = f"https://api.{org_domain}"
        self.token_url = f"https://login.{org_domain}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0

    def get_token(self) -> str:
        """
        Retrieves an OAuth access token using Client Credentials flow.
        """
        # Check if we have a valid cached token
        if self.access_token and time.time() < self.token_expiry:
            return self.access_token

        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }

        try:
            # Bound the request so a stalled login service cannot hang the script
            response = requests.post(self.token_url, data=data, timeout=10)
            response.raise_for_status()

            token_data = response.json()
            self.access_token = token_data["access_token"]
            # Genesys tokens usually expire in 3600 seconds (1 hour).
            # We subtract 60 seconds to ensure we refresh before hard expiry.
            self.token_expiry = time.time() + (token_data.get("expires_in", 3600) - 60)

            return self.access_token

        except requests.exceptions.HTTPError as e:
            raise Exception(f"Failed to obtain token: {e.response.text}") from e
        except requests.exceptions.RequestException as e:
            raise Exception(f"Network error during token request: {e}") from e

    def get_headers(self) -> dict:
        """
        Returns headers with the Authorization Bearer token.
        """
        return {
            "Authorization": f"Bearer {self.get_token()}",
            "Content-Type": "application/json"
        }
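
The caching rule inside get_token can be pulled out into two small pure functions, which makes it easy to check without calling the login service. This is only a sketch: the names compute_expiry, token_is_fresh, and REFRESH_MARGIN_SECONDS are illustrative and are not part of any Genesys SDK.

```python
import time

# Hypothetical constant: the same 60-second safety margin used in get_token
REFRESH_MARGIN_SECONDS = 60


def compute_expiry(now: float, expires_in: int = 3600,
                   margin: int = REFRESH_MARGIN_SECONDS) -> float:
    """Return the timestamp after which the cached token should be refreshed."""
    return now + (expires_in - margin)


def token_is_fresh(access_token, token_expiry: float, now: float) -> bool:
    """True only when a token exists and the refresh deadline has not passed."""
    return bool(access_token) and now < token_expiry


if __name__ == "__main__":
    issued_at = time.time()
    expiry = compute_expiry(issued_at, expires_in=3600)
    # Ten seconds after issue the token is still inside the refresh window
    print(token_is_fresh("abc", expiry, issued_at + 10))
    # Ten seconds before hard expiry the margin has already been crossed
    print(token_is_fresh("abc", expiry, issued_at + 3590))
```

Passing the current time in as an argument, instead of calling time.time() inside the helper, keeps the check deterministic.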

Implementation

Step 1: Retrieve the Current Flow Definition

Data Action timeouts are not global settings; they are configured per Flow. To change the timeout, you must first retrieve the specific Flow JSON definition. The API does not provide a direct “update timeout” endpoint. You must read the entire Flow, modify the relevant object, and write it back.

Endpoint: GET /api/v2/flows/{flowId}
Scope: view:flow

class FlowManager:
    def __init__(self, auth: GenesysAuth):
        self.auth = auth
        self.base_url = auth.base_url

    def get_flow(self, flow_id: str) -> dict:
        """
        Retrieves the full JSON definition of a Flow.
        """
        endpoint = f"/api/v2/flows/{flow_id}"
        headers = self.auth.get_headers()
        url = f"{self.base_url}{endpoint}"

        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            # Read the status from the exception so the handler does not
            # depend on a local variable that may be unset
            status = e.response.status_code
            if status == 404:
                raise Exception(f"Flow {flow_id} not found.") from e
            elif status == 403:
                raise Exception("Forbidden: Check if your OAuth client has 'view:flow' scope.") from e
            else:
                raise Exception(f"Error retrieving flow: {e.response.text}") from e

Step 2: Locate and Update the Data Action Timeout

The Flow JSON structure is hierarchical. Data Actions are located within the steps array. Each step has a type. For Data Actions, the type is data-action.

The timeout setting is not always present in the JSON if it is using the default value. However, to explicitly set it to a higher value (e.g., 10 seconds instead of 3), you must add or update the timeout property within the settings object of that specific step.

Key Properties:

  • steps: Array of flow steps.
  • type: Must be "data-action".
  • settings.timeout: The timeout in seconds. The maximum allowed value is typically 30 seconds for standard Data Actions, though this can vary by contract. If you need longer, you must use a Script or External API step with different limits.

Add the following method to the FlowManager class from Step 1:

    def update_data_action_timeout(self, flow_definition: dict, target_action_name: str, new_timeout_seconds: int) -> dict:
        """
        Iterates through flow steps to find a Data Action by name
        and updates its timeout setting.
        """
        if "steps" not in flow_definition:
            raise ValueError("Invalid flow definition: 'steps' key missing.")

        updated = False
        
        # Iterate through all steps in the flow
        for step in flow_definition["steps"]:
            # Check if this is a Data Action step
            if step.get("type") == "data-action":
                # Check if this is the specific action we want to change
                # The name is usually in the 'name' field or sometimes inferred from the actionId
                if step.get("name") == target_action_name:
                    # Ensure settings object exists
                    if "settings" not in step:
                        step["settings"] = {}
                    
                    # Update the timeout
                    step["settings"]["timeout"] = new_timeout_seconds
                    updated = True
                    print(f"Updated timeout for '{target_action_name}' to {new_timeout_seconds}s")
                    break # Assuming unique names for this tutorial
        
        if not updated:
            raise ValueError(f"Data Action named '{target_action_name}' not found in Flow.")

        return flow_definition

Note on Edge Cases:
If the Data Action is inside a sub-flow or a container, this simple iterator will not find it. This tutorial assumes a flat Flow structure for clarity. In complex flows, you would need a recursive function to traverse steps within steps (e.g., inside a case or loop structure).
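
A recursive traversal along those lines might look like the sketch below. It makes no claim about the real Flow schema: rather than assuming which keys hold nested steps, it walks every dictionary and list in the definition and treats any dictionary whose type is "data-action" as a candidate. The function names, and the "cases" key in the usage example, are hypothetical.

```python
from typing import Any, Iterator


def iter_data_action_steps(node: Any) -> Iterator[dict]:
    """Yield every dict with type == "data-action" found anywhere under node."""
    if isinstance(node, dict):
        if node.get("type") == "data-action":
            yield node
        # Descend into all values, since the nesting key is not known in advance
        for value in node.values():
            yield from iter_data_action_steps(value)
    elif isinstance(node, list):
        for item in node:
            yield from iter_data_action_steps(item)


def set_timeout_recursive(flow_definition: dict, target_action_name: str,
                          new_timeout_seconds: int) -> int:
    """Set settings.timeout on every matching Data Action; return the match count."""
    updated = 0
    for step in iter_data_action_steps(flow_definition):
        if step.get("name") == target_action_name:
            step.setdefault("settings", {})["timeout"] = new_timeout_seconds
            updated += 1
    return updated


if __name__ == "__main__":
    # Hypothetical nested layout, used only to exercise the traversal
    flow = {"steps": [
        {"type": "decision", "name": "Check", "cases": [
            {"steps": [{"type": "data-action", "name": "MySlowDataAction"}]},
        ]},
    ]}
    print(set_timeout_recursive(flow, "MySlowDataAction", 10))
    print(flow["steps"][0]["cases"][0]["steps"][0]["settings"])
```

Returning a count instead of stopping at the first match lets the caller decide whether zero matches or several matches should be treated as an error.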

Step 3: Publish the Updated Flow

Genesys Cloud Flows must be published to take effect. You cannot simply save the JSON; you must submit it to the publish endpoint. This operation is idempotent but requires the full Flow definition.

Endpoint: PUT /api/v2/flows/{flowId}/publish
Scope: admin:flow

Critical Requirement: The version field in the Flow definition must be handled correctly. When you retrieve a Flow, the response includes a version field, and the version you submit on publish must be consistent with what the server expects. Usually this works automatically if you submit the exact object you retrieved with only your modification applied. If the Flow changed on the server in the meantime, expect a conflict response; fetch the Flow again and re-apply your change.

    def publish_flow(self, flow_id: str, flow_definition: dict) -> dict:
        """
        Publishes the updated Flow definition.
        """
        endpoint = f"/api/v2/flows/{flow_id}/publish"
        headers = self.auth.get_headers()
        url = f"{self.base_url}{endpoint}"

        # Remove fields that are read-only or auto-generated if necessary
        # But usually, sending the full retrieved JSON back is safe
        # Ensure the 'version' field is preserved from the GET request
        
        try:
            response = requests.put(url, headers=headers, json=flow_definition, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            # Read the status from the exception rather than a local variable
            status = e.response.status_code
            if status == 409:
                raise Exception("Conflict: Version mismatch. Fetch the latest flow again and retry.") from e
            elif status == 400:
                raise Exception(f"Bad Request: Invalid flow definition. {e.response.text}") from e
            else:
                raise Exception(f"Error publishing flow: {e.response.text}") from e

Complete Working Example

This script combines authentication, retrieval, modification, and publication. Replace the placeholder credentials and Flow ID with your actual values.

import sys
import time

import requests

# --- Configuration ---
CLIENT_ID = "your_client_id_here"
CLIENT_SECRET = "your_client_secret_here"
ORG_DOMAIN = "mypurecloud.com" # Region domain of your org, e.g. "mypurecloud.com" or "mypurecloud.ie"
FLOW_ID = "your_flow_id_here"
TARGET_ACTION_NAME = "MySlowDataAction" # The exact name of the Data Action step in the Flow
NEW_TIMEOUT = 10 # Desired timeout in seconds

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, org_domain: str):
        self.client_id = client_id
        self.client_secret = client_secret
        # The API and login services use the "api." and "login." subdomains of the region domain
        self.base_url = f"https://api.{org_domain}"
        self.token_url = f"https://login.{org_domain}/oauth/token"
        self.access_token = None
        self.token_expiry = 0

    def get_token(self) -> str:
        if self.access_token and time.time() < self.token_expiry:
            return self.access_token

        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }

        try:
            response = requests.post(self.token_url, data=data)
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            self.token_expiry = time.time() + (token_data.get("expires_in", 3600) - 60)
            return self.access_token
        except requests.exceptions.RequestException as e:
            raise Exception(f"Auth failed: {str(e)}")

    def get_headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_token()}",
            "Content-Type": "application/json"
        }

class FlowManager:
    def __init__(self, auth: GenesysAuth):
        self.auth = auth
        self.base_url = auth.base_url

    def get_flow(self, flow_id: str) -> dict:
        url = f"{self.base_url}/api/v2/flows/{flow_id}"
        headers = self.auth.get_headers()
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        return response.json()

    def update_data_action_timeout(self, flow_def: dict, action_name: str, timeout: int) -> dict:
        if "steps" not in flow_def:
            raise ValueError("Flow has no steps")
        
        found = False
        for step in flow_def["steps"]:
            if step.get("type") == "data-action" and step.get("name") == action_name:
                if "settings" not in step:
                    step["settings"] = {}
                step["settings"]["timeout"] = timeout
                found = True
                break
        
        if not found:
            raise ValueError(f"Action '{action_name}' not found")
        return flow_def

    def publish_flow(self, flow_id: str, flow_def: dict) -> dict:
        url = f"{self.base_url}/api/v2/flows/{flow_id}/publish"
        headers = self.auth.get_headers()
        
        # Send the retrieved definition back with its 'version' field intact
        # so the server can detect concurrent changes.
        response = requests.put(url, headers=headers, json=flow_def, timeout=30)

        if response.status_code == 409:
            # Raise instead of exiting here so main() handles all failures in one place
            raise Exception("Version conflict detected. The flow was modified by another user.")

        response.raise_for_status()
        return response.json()

def main():
    try:
        # 1. Authenticate
        auth = GenesysAuth(CLIENT_ID, CLIENT_SECRET, ORG_DOMAIN)
        manager = FlowManager(auth)
        
        print(f"Fetching Flow {FLOW_ID}...")
        flow = manager.get_flow(FLOW_ID)
        
        print(f"Setting timeout for '{TARGET_ACTION_NAME}' to {NEW_TIMEOUT}s")
        
        # 2. Modify
        updated_flow = manager.update_data_action_timeout(flow, TARGET_ACTION_NAME, NEW_TIMEOUT)
        
        # 3. Publish
        print("Publishing updated flow...")
        result = manager.publish_flow(FLOW_ID, updated_flow)
        
        print("Success! Flow published with new timeout.")
        print(f"Flow Version: {result.get('version')}")

    except Exception as e:
        print(f"Error: {str(e)}")
        sys.exit(1)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 403 Forbidden

Cause: The OAuth client used in the script is missing a required scope: view:flow for reading the Flow, or admin:flow for publishing it.
Fix: Go to the Genesys Cloud Admin Console > Organization > OAuth Clients. Select your client and ensure both scopes are checked in the Scopes section. Then request a new token.

Error: 409 Conflict

Cause: You retrieved the Flow, modified it locally, and attempted to publish it, but another user or process published a new version of the Flow in the meantime. The version number in your local JSON no longer matches the server’s current version.
Fix: Implement a retry loop. If a 409 is received, fetch the Flow again (GET /api/v2/flows/{id}), re-apply your timeout modification to the new JSON, and try publishing again.

# Retry Logic Example (add this method to the FlowManager class)
def publish_with_retry(self, flow_id: str, flow_def: dict, action_name: str,
                       timeout: int, max_retries: int = 3) -> dict:
    url = f"{self.base_url}/api/v2/flows/{flow_id}/publish"
    for attempt in range(max_retries):
        headers = self.auth.get_headers()
        response = requests.put(url, headers=headers, json=flow_def)

        if response.status_code != 409:
            response.raise_for_status()
            return response.json()

        # On conflict, fetch fresh data and re-apply the change.
        # The action name and timeout are parameters, not module globals.
        print(f"Version conflict on attempt {attempt + 1}. Re-fetching...")
        fresh_flow = self.get_flow(flow_id)
        flow_def = self.update_data_action_timeout(fresh_flow, action_name, timeout)

    raise Exception("Max retries exceeded for Flow publish.")

Error: Data Action Not Found

Cause: The name property of the step in the JSON does not match the TARGET_ACTION_NAME variable.
Fix: Print the flow["steps"] array to inspect the actual names. Note that the “Name” seen in the Flow Builder UI might differ from the internal name property if the step was renamed or imported. Use the name field from the JSON response.
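
One way to do that inspection is sketched below. It lists the names of the top-level Data Action steps and uses the standard library's difflib to suggest near matches for a mistyped name. The helper names are illustrative, and the sketch assumes the flat steps layout used throughout this tutorial.

```python
import difflib


def list_data_action_names(flow_definition: dict) -> list[str]:
    """Collect the 'name' of every top-level step whose type is 'data-action'."""
    names = []
    for step in flow_definition.get("steps", []):
        if step.get("type") == "data-action":
            names.append(step.get("name", "<unnamed>"))
    return names


def suggest_action_names(target: str, flow_definition: dict) -> list[str]:
    """Return up to three step names that closely resemble the target name."""
    candidates = list_data_action_names(flow_definition)
    return difflib.get_close_matches(target, candidates, n=3, cutoff=0.6)


if __name__ == "__main__":
    # Hypothetical flow used only to demonstrate the helpers
    flow = {"steps": [
        {"type": "data-action", "name": "My Slow Data Action"},
        {"type": "data-action", "name": "Lookup"},
        {"type": "transfer", "name": "ToQueue"},
    ]}
    print(list_data_action_names(flow))
    print(suggest_action_names("MySlowDataAction", flow))
```

The suggestions could be included in the ValueError message raised by update_data_action_timeout so that a naming mismatch is visible without a separate debugging run.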

Error: Timeout Value Rejected

Cause: The timeout value set is outside the allowed range (e.g., > 30 seconds for Data Actions).
Fix: Check your Genesys Cloud contract limits. If you require timeouts longer than 30 seconds, a Data Action is the wrong tool. You should use a Script step (which supports longer execution times) or an External API step with appropriate configuration.
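
A local check before publishing can catch an out-of-range value without a round trip to the API. The sketch below assumes the 30-second ceiling mentioned above and a lower bound of 1 second; both are assumptions and should be adjusted to your organization's actual limits. The function and constant names are hypothetical.

```python
# Assumption: 30 seconds is the ceiling quoted in this tutorial; it may differ per org
MAX_DATA_ACTION_TIMEOUT_SECONDS = 30


def validate_timeout(timeout_seconds: int,
                     maximum: int = MAX_DATA_ACTION_TIMEOUT_SECONDS) -> int:
    """Return the timeout unchanged if it is a whole number of seconds in range."""
    # bool is a subclass of int in Python, so reject it explicitly
    if isinstance(timeout_seconds, bool) or not isinstance(timeout_seconds, int):
        raise TypeError("timeout must be an integer number of seconds")
    # Assumption: a timeout below 1 second is never meaningful
    if not 1 <= timeout_seconds <= maximum:
        raise ValueError(
            f"timeout {timeout_seconds}s is outside the allowed range 1..{maximum}s"
        )
    return timeout_seconds


if __name__ == "__main__":
    print(validate_timeout(10))
    try:
        validate_timeout(45)
    except ValueError as exc:
        print(f"Rejected: {exc}")
```

Calling validate_timeout(NEW_TIMEOUT) at the start of main() would stop the script before it fetches or modifies the Flow.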

Official References