Increase Genesys Cloud Data Action Timeout for Slow Integrations
What You Will Build
- A Python script that updates the execution timeout configuration for specific Data Actions using the Genesys Cloud Platform API.
- This tutorial uses the Genesys Cloud CX API (
/api/v2/flows) to modify Flow resources directly. - The programming language covered is Python 3.10+ using the
requestslibrary.
Prerequisites
- OAuth Client: A Genesys Cloud OAuth client with the
admin:flowscope. Standard user tokens do not have permission to modify Flow definitions. - API Version: Genesys Cloud Platform API v2.
- Language/Runtime: Python 3.10 or higher.
- External Dependencies:
requests(for HTTP calls)pydantic(optional, for type validation in this example, but we will stick to standard dictionaries for simplicity and broader compatibility)
Required OAuth Scopes:
admin:flow(Required to update Flow definitions)view:flow(Required to read the current Flow definition)
Authentication Setup
Genesys Cloud uses OAuth 2.0 for authentication. For server-to-server integrations, the Client Credentials flow is the standard approach. You must obtain an access token before making any API calls.
The following function handles token acquisition and basic caching. In a production environment, you should implement a proper token cache that checks expiration before requesting a new token.
import requests
import json
import time
from typing import Optional
class GenesysAuth:
def __init__(self, client_id: str, client_secret: str, org_domain: str):
self.client_id = client_id
self.client_secret = client_secret
self.base_url = f"https://{org_domain}.mypurecloud.com"
self.token_url = f"{self.base_url}/oauth/token"
self.access_token: Optional[str] = None
self.token_expiry: float = 0
def get_token(self) -> str:
"""
Retrieves an OAuth access token using Client Credentials flow.
"""
# Check if we have a valid cached token
if self.access_token and time.time() < self.token_expiry:
return self.access_token
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
try:
response = requests.post(self.token_url, data=data)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
# Genesys tokens usually expire in 3600 seconds (1 hour).
# We subtract 60 seconds to ensure we refresh before hard expiry.
self.token_expiry = time.time() + (token_data.get("expires_in", 3600) - 60)
return self.access_token
except requests.exceptions.HTTPError as e:
raise Exception(f"Failed to obtain token: {e.response.text}")
except requests.exceptions.RequestException as e:
raise Exception(f"Network error during token request: {str(e)}")
def get_headers(self) -> dict:
"""
Returns headers with the Authorization Bearer token.
"""
return {
"Authorization": f"Bearer {self.get_token()}",
"Content-Type": "application/json"
}
Implementation
Step 1: Retrieve the Current Flow Definition
Data Action timeouts are not global settings; they are configured per Flow. To change the timeout, you must first retrieve the specific Flow JSON definition. The API does not provide a direct “update timeout” endpoint. You must read the entire Flow, modify the relevant object, and write it back.
Endpoint: GET /api/v2/flows/{flowId}
Scope: view:flow
class FlowManager:
def __init__(self, auth: GenesysAuth):
self.auth = auth
self.base_url = auth.base_url
def get_flow(self, flow_id: str) -> dict:
"""
Retrieves the full JSON definition of a Flow.
"""
endpoint = f"/api/v2/flows/{flow_id}"
headers = self.auth.get_headers()
url = f"{self.base_url}{endpoint}"
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if response.status_code == 404:
raise Exception(f"Flow {flow_id} not found.")
elif response.status_code == 403:
raise Exception("Forbidden: Check if your OAuth client has 'view:flow' scope.")
else:
raise Exception(f"Error retrieving flow: {e.response.text}")
Step 2: Locate and Update the Data Action Timeout
The Flow JSON structure is hierarchical. Data Actions are located within the steps array. Each step has a type. For Data Actions, the type is data-action.
The timeout setting is not always present in the JSON if it is using the default value. However, to explicitly set it to a higher value (e.g., 10 seconds instead of 3), you must add or update the timeout property within the settings object of that specific step.
Key Properties:
steps: Array of flow steps.type: Must be"data-action".settings.timeout: The timeout in seconds. The maximum allowed value is typically 30 seconds for standard Data Actions, though this can vary by contract. If you need longer, you must use a Script or External API step with different limits.
def update_data_action_timeout(self, flow_definition: dict, target_action_name: str, new_timeout_seconds: int) -> dict:
"""
Iterates through flow steps to find a Data Action by name
and updates its timeout setting.
"""
if "steps" not in flow_definition:
raise ValueError("Invalid flow definition: 'steps' key missing.")
updated = False
# Iterate through all steps in the flow
for step in flow_definition["steps"]:
# Check if this is a Data Action step
if step.get("type") == "data-action":
# Check if this is the specific action we want to change
# The name is usually in the 'name' field or sometimes inferred from the actionId
if step.get("name") == target_action_name:
# Ensure settings object exists
if "settings" not in step:
step["settings"] = {}
# Update the timeout
step["settings"]["timeout"] = new_timeout_seconds
updated = True
print(f"Updated timeout for '{target_action_name}' to {new_timeout_seconds}s")
break # Assuming unique names for this tutorial
if not updated:
raise ValueError(f"Data Action named '{target_action_name}' not found in Flow.")
return flow_definition
Note on Edge Cases:
If the Data Action is inside a sub-flow or a container, this simple iterator will not find it. This tutorial assumes a flat Flow structure for clarity. In complex flows, you would need a recursive function to traverse steps within steps (e.g., inside a case or loop structure).
Step 3: Publish the Updated Flow
Genesys Cloud Flows must be published to take effect. You cannot simply save the JSON; you must submit it to the publish endpoint. This operation is idempotent but requires the full Flow definition.
Endpoint: PUT /api/v2/flows/{flowId}/publish
Scope: admin:flow
Critical Requirement: The version field in the Flow definition must be incremented or handled correctly. When you retrieve a flow, it includes a version field. When you publish, you must ensure the version you are submitting is consistent with what the server expects. Usually, the API handles versioning automatically if you submit the exact object you retrieved, modified. However, if you modify the object significantly, you may need to handle ETag conflicts.
def publish_flow(self, flow_id: str, flow_definition: dict) -> dict:
"""
Publishes the updated Flow definition.
"""
endpoint = f"/api/v2/flows/{flow_id}/publish"
headers = self.auth.get_headers()
url = f"{self.base_url}{endpoint}"
# Remove fields that are read-only or auto-generated if necessary
# But usually, sending the full retrieved JSON back is safe
# Ensure the 'version' field is preserved from the GET request
try:
response = requests.put(url, headers=headers, json=flow_definition)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if response.status_code == 409:
raise Exception("Conflict: Version mismatch. Fetch the latest flow again and retry.")
elif response.status_code == 400:
raise Exception(f"Bad Request: Invalid flow definition. {e.response.text}")
else:
raise Exception(f"Error publishing flow: {e.response.text}")
Complete Working Example
This script combines authentication, retrieval, modification, and publication. Replace the placeholder credentials and Flow ID with your actual values.
import requests
import json
import time
import sys
# --- Configuration ---
CLIENT_ID = "your_client_id_here"
CLIENT_SECRET = "your_client_secret_here"
ORG_DOMAIN = "your_org_domain_here"
FLOW_ID = "your_flow_id_here"
TARGET_ACTION_NAME = "MySlowDataAction" # The exact name of the Data Action step in the Flow
NEW_TIMEOUT = 10 # Desired timeout in seconds
class GenesysAuth:
def __init__(self, client_id: str, client_secret: str, org_domain: str):
self.client_id = client_id
self.client_secret = client_secret
self.base_url = f"https://{org_domain}.mypurecloud.com"
self.token_url = f"{self.base_url}/oauth/token"
self.access_token = None
self.token_expiry = 0
def get_token(self) -> str:
if self.access_token and time.time() < self.token_expiry:
return self.access_token
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
try:
response = requests.post(self.token_url, data=data)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.token_expiry = time.time() + (token_data.get("expires_in", 3600) - 60)
return self.access_token
except requests.exceptions.RequestException as e:
raise Exception(f"Auth failed: {str(e)}")
def get_headers(self) -> dict:
return {
"Authorization": f"Bearer {self.get_token()}",
"Content-Type": "application/json"
}
class FlowManager:
def __init__(self, auth: GenesysAuth):
self.auth = auth
self.base_url = auth.base_url
def get_flow(self, flow_id: str) -> dict:
url = f"{self.base_url}/api/v2/flows/{flow_id}"
headers = self.auth.get_headers()
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
def update_data_action_timeout(self, flow_def: dict, action_name: str, timeout: int) -> dict:
if "steps" not in flow_def:
raise ValueError("Flow has no steps")
found = False
for step in flow_def["steps"]:
if step.get("type") == "data-action" and step.get("name") == action_name:
if "settings" not in step:
step["settings"] = {}
step["settings"]["timeout"] = timeout
found = True
break
if not found:
raise ValueError(f"Action '{action_name}' not found")
return flow_def
def publish_flow(self, flow_id: str, flow_def: dict) -> dict:
url = f"{self.base_url}/api/v2/flows/{flow_id}/publish"
headers = self.auth.get_headers()
# Remove the 'version' from the body if it causes conflicts,
# though typically PUT /publish accepts the version from the GET
# Some API versions require sending the ETag header instead.
# For simplicity, we send the JSON body.
response = requests.put(url, headers=headers, json=flow_def)
if response.status_code == 409:
print("Version conflict detected. The flow was modified by another user.")
sys.exit(1)
response.raise_for_status()
return response.json()
def main():
try:
# 1. Authenticate
auth = GenesysAuth(CLIENT_ID, CLIENT_SECRET, ORG_DOMAIN)
manager = FlowManager(auth)
print(f"Fetching Flow {FLOW_ID}...")
flow = manager.get_flow(FLOW_ID)
print(f"Current timeout for '{TARGET_ACTION_NAME}' will be updated to {NEW_TIMEOUT}s")
# 2. Modify
updated_flow = manager.update_data_action_timeout(flow, TARGET_ACTION_NAME, NEW_TIMEOUT)
# 3. Publish
print("Publishing updated flow...")
result = manager.publish_flow(FLOW_ID, updated_flow)
print("Success! Flow published with new timeout.")
print(f"Flow Version: {result.get('version')}")
except Exception as e:
print(f"Error: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()
Common Errors & Debugging
Error: 403 Forbidden
Cause: The OAuth client used in the script does not have the admin:flow scope.
Fix: Go to the Genesys Cloud Admin Console > Organization > OAuth Clients. Select your client and ensure admin:flow is checked in the Scopes section. Regenerate the token.
Error: 409 Conflict
Cause: You retrieved the Flow, modified it locally, and attempted to publish it, but another user or process published a new version of the Flow in the meantime. The version number in your local JSON no longer matches the server’s current version.
Fix: Implement a retry loop. If a 409 is received, fetch the Flow again (GET /api/v2/flows/{id}), re-apply your timeout modification to the new JSON, and try publishing again.
# Retry Logic Example
def publish_with_retry(self, flow_id: str, flow_def: dict, max_retries: int = 3) -> dict:
for attempt in range(max_retries):
url = f"{self.base_url}/api/v2/flows/{flow_id}/publish"
headers = self.auth.get_headers()
response = requests.put(url, headers=headers, json=flow_def)
if response.status_code != 409:
response.raise_for_status()
return response.json()
# If conflict, fetch fresh data and re-apply changes
print(f"Version conflict on attempt {attempt + 1}. Re-fetching...")
fresh_flow = self.get_flow(flow_id)
flow_def = self.update_data_action_timeout(fresh_flow, TARGET_ACTION_NAME, NEW_TIMEOUT)
raise Exception("Max retries exceeded for Flow publish.")
Error: Data Action Not Found
Cause: The name property of the step in the JSON does not match the TARGET_ACTION_NAME variable.
Fix: Print the flow["steps"] array to inspect the actual names. Note that the “Name” seen in the Flow Builder UI might differ from the internal name property if the step was renamed or imported. Use the name field from the JSON response.
Error: Timeout Value Rejected
Cause: The timeout value set is outside the allowed range (e.g., > 30 seconds for Data Actions).
Fix: Check your Genesys Cloud contract limits. If you require timeouts longer than 30 seconds, a Data Action is the wrong tool. You should use a Script step (which supports longer execution times) or an External API step with appropriate configuration.