POST /api/v2/flows/executions — launching an Architect flow programmatically from an external app
What You Will Build
- This tutorial demonstrates how to trigger a Genesys Cloud Architect flow from an external application using the
POST /api/v2/flows/executionsendpoint. - The implementation uses the Genesys Cloud Python SDK (
genesyscloud) to handle authentication, payload construction, and execution launch. - The code is written in Python 3.9+ and requires no UI interaction, relying entirely on API calls.
Prerequisites
- OAuth Client Type: A Confidential Client (Client Credentials Grant) or Authorization Code Grant. For server-to-server integrations, Client Credentials is standard.
- Required Scopes:
flow:execution:read(to verify flow status if needed)flow:execution:write(required to launch the execution)flow:read(optional, to validate flow ID existence beforehand)
- SDK Version:
genesyscloud>= 2.0.0 (Python). - Runtime: Python 3.9 or higher.
- Dependencies:
genesyscloudpython-dotenv(for secure credential management)
Install the dependencies:
pip install genesyscloud python-dotenv
Authentication Setup
Genesys Cloud uses OAuth 2.0. The Python SDK handles token acquisition, caching, and refresh automatically when configured correctly. You must store your client_id and client_secret securely. Never hardcode these values.
Create a .env file in your project root:
GENESYS_CLOUD_REGION=us-east-1
GENESYS_CLOUD_CLIENT_ID=your_client_id_here
GENESYS_CLOUD_CLIENT_SECRET=your_client_secret_here
Initialize the SDK client in your code:
import os
from dotenv import load_dotenv
from genesyscloud.platform.client import PlatformClient
from genesyscloud.platform.auth import OAuthClientCredentials
from genesyscloud.rest import Configuration
# Load environment variables
load_dotenv()
def get_genesys_client():
"""
Initializes and returns a configured Genesys Cloud PlatformClient.
"""
region = os.getenv("GENESYS_CLOUD_REGION")
client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
if not all([region, client_id, client_secret]):
raise ValueError("Missing required environment variables for Genesys Cloud authentication.")
# Configure the OAuth provider
oauth = OAuthClientCredentials(
client_id=client_id,
client_secret=client_secret,
region=region
)
# Initialize the platform client
client = PlatformClient(oauth=oauth)
return client
The PlatformClient will automatically fetch an access token on the first API call and refresh it when it expires. This removes the need for manual token management in your business logic.
Implementation
Step 1: Construct the Execution Request Payload
The POST /api/v2/flows/executions endpoint requires a JSON body containing the flowId and an optional inputs object. The inputs object maps variable names defined in your Architect flow to values provided by your external app.
If your Architect flow defines an input variable named customerEmail (type: String) and orderAmount (type: Number), your payload must match these keys exactly. Case sensitivity applies.
from genesyscloud.flows.models import FlowExecutionRequest
def create_execution_request(flow_id: str, inputs: dict) -> FlowExecutionRequest:
"""
Constructs a FlowExecutionRequest object for the SDK.
Args:
flow_id: The UUID of the Architect flow to execute.
inputs: A dictionary of key-value pairs matching the flow's input variables.
Returns:
FlowExecutionRequest: The request object ready for API submission.
"""
# The SDK model handles serialization to JSON.
# 'inputs' can be a dict or a custom InputSet object.
# Using a dict is simpler for dynamic inputs.
request_body = FlowExecutionRequest(
flow_id=flow_id,
inputs=inputs
)
return request_body
Critical Note on Data Types:
Genesys Cloud Architect is strongly typed. If your flow expects a Number for orderAmount, sending the string "100.00" may cause the flow to fail at the start node or result in unexpected behavior in downstream steps. Always ensure Python types match Architect types:
- Python
int/float→ ArchitectNumber - Python
str→ ArchitectString - Python
bool→ ArchitectBoolean - Python
list/dict→ ArchitectArray/Object
Step 2: Launch the Execution
The execution launch is asynchronous. The API returns immediately with an executionId. It does not wait for the flow to complete.
from genesyscloud.flows.api import FlowsApi
from genesyscloud.rest import ApiException
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def launch_flow_execution(client: PlatformClient, flow_id: str, inputs: dict) -> str:
"""
Launches an Architect flow execution.
Args:
client: The initialized PlatformClient.
flow_id: The UUID of the flow.
inputs: Dictionary of input variables.
Returns:
execution_id: The UUID of the newly created execution.
Raises:
ApiException: If the API request fails.
"""
flows_api = FlowsApi(client)
# Construct the request
request = create_execution_request(flow_id, inputs)
try:
# The API call
response = flows_api.post_flow_executions(body=request)
# The response body is a FlowExecution object
execution_id = response.id
logger.info(f"Flow execution launched successfully. Execution ID: {execution_id}")
return execution_id
except ApiException as e:
logger.error(f"Failed to launch flow execution: {e.status} - {e.reason}")
raise
Step 3: Handling Asynchronous Results and Polling
Because the execution is asynchronous, you must decide how to handle the result. There are two common patterns:
- Fire-and-Forget: You launch the flow and assume it works. You monitor failures via Genesys Cloud analytics or logs later.
- Polling: You poll the
GET /api/v2/flows/executions/{executionId}endpoint until the status iscompleted,failed, orcanceled.
For robust integrations, polling is recommended. The execution object contains a status field and a result field (if the flow produces an output).
import time
from genesyscloud.flows.models import FlowExecution
def poll_execution_status(client: PlatformClient, execution_id: str, timeout_seconds: int = 300, poll_interval: int = 5) -> FlowExecution:
"""
Polls the status of a flow execution until completion or timeout.
Args:
client: The initialized PlatformClient.
execution_id: The UUID of the execution to poll.
timeout_seconds: Maximum time to wait in seconds.
poll_interval: Seconds between each poll request.
Returns:
FlowExecution: The final state of the execution.
Raises:
TimeoutError: If the execution does not complete within the timeout.
ApiException: If the API request fails.
"""
flows_api = FlowsApi(client)
start_time = time.time()
while time.time() - start_time < timeout_seconds:
try:
# Fetch the current execution state
response = flows_api.get_flow_execution_by_id(execution_id=execution_id)
status = response.status
logger.debug(f"Execution {execution_id} status: {status}")
# Check if the execution has reached a terminal state
if status in ["completed", "failed", "canceled"]:
logger.info(f"Execution {execution_id} reached terminal state: {status}")
return response
# Wait before polling again
time.sleep(poll_interval)
except ApiException as e:
logger.error(f"Error polling execution status: {e}")
raise
raise TimeoutError(f"Execution {execution_id} did not complete within {timeout_seconds} seconds.")
Rate Limiting Consideration:
Polling too frequently can trigger 429 Too Many Requests errors. Genesys Cloud APIs enforce rate limits per client ID and per endpoint. A 5-second interval is generally safe for moderate volumes. If you are launching thousands of flows concurrently, implement exponential backoff on 429 responses.
Step 4: Extracting Output Variables
When a flow completes successfully, the result field in the FlowExecution object contains the output variables defined in the Architect flow. These outputs are mapped from the flow’s End node or any output variables explicitly defined.
def get_flow_outputs(execution: FlowExecution) -> dict:
"""
Extracts output variables from a completed flow execution.
Args:
execution: The FlowExecution object.
Returns:
dict: A dictionary of output variable names and their values.
Returns an empty dict if no outputs are available.
"""
if execution.status == "completed" and execution.result:
# The result is often a dictionary-like structure in the SDK
# depending on the SDK version. In newer versions, it may be
# a custom object or dict.
return execution.result
return {}
Complete Working Example
This script combines all steps into a single runnable module. It loads credentials, launches a flow, polls for completion, and prints the results.
#!/usr/bin/env python3
"""
Genesys Cloud Flow Execution Launcher
Launches an Architect flow and waits for completion.
"""
import os
import time
import logging
from dotenv import load_dotenv
from genesyscloud.platform.client import PlatformClient
from genesyscloud.platform.auth import OAuthClientCredentials
from genesyscloud.flows.api import FlowsApi
from genesyscloud.flows.models import FlowExecutionRequest
from genesyscloud.rest import ApiException
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def init_client():
"""Initialize the Genesys Cloud client."""
load_dotenv()
region = os.getenv("GENESYS_CLOUD_REGION")
client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
if not all([region, client_id, client_secret]):
raise ValueError("Missing environment variables: GENESYS_CLOUD_REGION, GENESYS_CLOUD_CLIENT_ID, GENESYS_CLOUD_CLIENT_SECRET")
oauth = OAuthClientCredentials(
client_id=client_id,
client_secret=client_secret,
region=region
)
return PlatformClient(oauth=oauth)
def launch_and_wait(client: PlatformClient, flow_id: str, inputs: dict, timeout: int = 120):
"""
Launches a flow and polls until completion.
"""
flows_api = FlowsApi(client)
# 1. Create Request
request = FlowExecutionRequest(flow_id=flow_id, inputs=inputs)
logger.info(f"Launching flow {flow_id} with inputs: {inputs}")
try:
# 2. Launch Execution
response = flows_api.post_flow_executions(body=request)
execution_id = response.id
logger.info(f"Execution started. ID: {execution_id}")
except ApiException as e:
logger.error(f"Failed to launch flow: {e.status} - {e.body}")
return None
# 3. Poll for Status
start_time = time.time()
poll_interval = 5
while time.time() - start_time < timeout:
try:
exec_response = flows_api.get_flow_execution_by_id(execution_id=execution_id)
status = exec_response.status
logger.debug(f"Current status: {status}")
if status in ["completed", "failed", "canceled"]:
logger.info(f"Execution finished with status: {status}")
if status == "completed" and exec_response.result:
logger.info(f"Flow Outputs: {exec_response.result}")
elif status == "failed":
logger.warning(f"Flow failed. Check Genesys Cloud logs for details.")
return exec_response
time.sleep(poll_interval)
except ApiException as e:
logger.error(f"Error polling status: {e}")
return None
logger.error(f"Timeout waiting for execution {execution_id}")
return None
def main():
# Replace with your actual Flow ID
FLOW_ID = "your-flow-uuid-here"
# Define inputs matching your Architect flow variables
# Example: Flow expects 'customerName' (String) and 'priority' (Number)
inputs = {
"customerName": "John Doe",
"priority": 1
}
try:
client = init_client()
result = launch_and_wait(client, FLOW_ID, inputs)
if result and result.status == "completed":
print("Success: Flow completed.")
print(f"Outputs: {result.result}")
else:
print("Failure: Flow did not complete successfully.")
except Exception as e:
logger.error(f"Critical error: {e}")
raise
if __name__ == "__main__":
main()
Common Errors & Debugging
Error: 401 Unauthorized
Cause: The OAuth token is invalid, expired, or the client credentials are incorrect.
Fix:
- Verify
GENESYS_CLOUD_CLIENT_IDandGENESYS_CLOUD_CLIENT_SECRETin your.envfile. - Ensure the client is active in the Genesys Cloud Admin Console.
- Check that the OAuth grant type matches your client configuration (e.g., Client Credentials).
Error: 403 Forbidden
Cause: The OAuth token lacks the required scopes.
Fix:
- In the Admin Console, navigate to Admin > Security > OAuth clients.
- Select your client.
- Ensure
flow:execution:writeis added to the scopes. - Important: You must regenerate the client secret or re-authorize the client for scope changes to take effect in some grant types. For Client Credentials, the new scope is effective immediately for new tokens, but the SDK may cache the old token. Force a refresh by restarting the application or clearing the token cache.
Error: 404 Not Found
Cause: The flowId provided does not exist, or the client does not have permission to access that specific flow.
Fix:
- Verify the Flow ID in the Architect UI. Copy it from the URL or the flow properties.
- Ensure the flow is published. Draft flows cannot be executed via API.
- Check organization permissions. If using a multi-org setup, ensure the client belongs to the correct organization.
Error: 400 Bad Request
Cause: The input payload is invalid. Common reasons:
- Missing required input variables.
- Incorrect data types (e.g., sending a string where a number is expected).
- Input variable names do not match the flow definition exactly (case-sensitive).
Fix:
- Review the Architect flow definition. Check the Inputs tab of the flow properties.
- Validate that your Python dictionary keys match the variable names.
- Validate data types. Use
type()in Python to confirm.
# Example of correct type mapping
inputs = {
"count": 10, # int -> Number
"name": "Test", # str -> String
"is_active": True # bool -> Boolean
}
Error: 429 Too Many Requests
Cause: You have exceeded the rate limit for the /api/v2/flows/executions endpoint.
Fix:
- Implement exponential backoff in your polling or launch logic.
- Reduce the frequency of concurrent launches.
- Check the
Retry-Afterheader in the response to determine how long to wait.
import time
def handle_rate_limit(response):
if response.status == 429:
retry_after = int(response.headers.get('Retry-After', 10))
logger.warning(f"Rate limited. Waiting {retry_after} seconds.")
time.sleep(retry_after)
return True
return False