Launching a Genesys Cloud Architect Flow from an External App
What You Will Build
- This tutorial demonstrates how to programmatically trigger an Architect flow execution using the Genesys Cloud REST API.
- It uses the
POST /api/v2/flows/executionsendpoint to inject data into a running flow. - The implementation is provided in Python using the
requestslibrary and the officialgenesys-cloud-sdkfor context.
Prerequisites
- OAuth Client Type: A Genesys Cloud OAuth Client Application with the
publicorconfidentialgrant type. - Required Scopes: The client must possess the
flow:execution:writescope. Read-only scopes will result in a 403 Forbidden error. - SDK Version: Genesys Cloud Python SDK v2.3.0+ (though this tutorial primarily uses raw HTTP for transparency, the SDK concepts apply).
- Language/Runtime: Python 3.8+.
- External Dependencies:
requests: For HTTP communication.python-dotenv: For secure credential management (optional but recommended).pydantic: For payload validation (optional).
Authentication Setup
Genesys Cloud APIs are secured via OAuth 2.0. You must exchange your client credentials for an access token before making any API calls. The token expires in 3600 seconds (1 hour), so production systems must implement token caching and refresh logic.
The following code establishes a secure authentication handler. It uses the client_credentials grant flow, which is standard for server-to-server integrations.
import requests
import time
import os
from typing import Optional, Dict
class GenesysAuthenticator:
def __init__(self, client_id: str, client_secret: str, region: str = "us-east-1"):
self.client_id = client_id
self.client_secret = client_secret
# Map region to the correct Auth server base URL
region_map = {
"us-east-1": "https://api.mypurecloud.com",
"us-east-2": "https://api.mypurecloud.com",
"us-west-2": "https://api.mypurecloud.com",
"eu-west-1": "https://api.eu.pure.cloud.com",
"ap-southeast-2": "https://api.au.pure.cloud.com",
}
self.auth_url = f"{region_map.get(region, 'https://api.mypurecloud.com')}/oauth/token"
self.access_token: Optional[str] = None
self.token_expiry: float = 0
def get_token(self) -> str:
"""
Retrieves an OAuth access token.
Returns a cached token if valid, otherwise fetches a new one.
"""
# If we have a token and it is not expired, return it
if self.access_token and time.time() < self.token_expiry:
return self.access_token
# Prepare payload for client_credentials grant
payload = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
try:
response = requests.post(self.auth_url, data=payload, headers=headers)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if response.status_code == 401:
raise Exception("Authentication failed: Invalid Client ID or Secret.") from e
raise Exception(f"Failed to obtain token: {response.text}") from e
data = response.json()
self.access_token = data["access_token"]
# Cache token with a 5-minute buffer to prevent edge-case expiration during use
self.token_expiry = time.time() + (data["expires_in"] - 300)
return self.access_token
Implementation
Step 1: Constructing the Execution Payload
To launch a flow, you must send a JSON body to the execution endpoint. The structure of this body depends entirely on how the Architect flow was designed. Specifically, the flow must contain a Start Event that is configured to accept external triggers.
There are two common patterns for external triggers:
- Generic Start Event: The flow listens for any execution start. You can pass arbitrary data.
- Specific Start Event (e.g., HTTP Request, Web Chat): The flow expects specific fields defined in the “Data” tab of the Start Event configuration.
If your flow uses a generic Start Event, the payload is flexible. If it uses a specific Start Event, the keys in your JSON must match the variable names defined in Architect.
Here is a helper function to structure the payload. Note that executionContext is optional but useful for debugging.
from typing import Any, Dict, List, Optional
def build_execution_payload(
flow_id: str,
data: Dict[str, Any],
execution_context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Constructs the JSON payload for POST /api/v2/flows/executions.
Args:
flow_id: The UUID of the Architect flow to trigger.
data: A dictionary of key-value pairs. These keys must match the
variables defined in the Architect Start Event.
execution_context: Optional metadata to help identify the source of the trigger.
Returns:
A dictionary ready to be JSON-serialized and sent to the API.
"""
payload: Dict[str, Any] = {
"flowId": flow_id,
"data": data
}
# Add execution context if provided. This is often used to pass
# external IDs (e.g., CRM Case ID) for traceability.
if execution_context:
payload["executionContext"] = execution_context
return payload
Step 2: Executing the Flow
This step performs the actual HTTP POST request. It requires the flow:execution:write scope. The endpoint does not return the result of the flow execution immediately; it returns an executionId. This ID is crucial for subsequent polling or webhook tracking.
class GenesysFlowExecutor:
def __init__(self, authenticator: GenesysAuthenticator, region: str = "us-east-1"):
self.authenticator = authenticator
region_map = {
"us-east-1": "https://api.mypurecloud.com",
"us-east-2": "https://api.mypurecloud.com",
"us-west-2": "https://api.mypurecloud.com",
"eu-west-1": "https://api.eu.pure.cloud.com",
"ap-southeast-2": "https://api.au.pure.cloud.com",
}
self.base_url = f"{region_map.get(region, 'https://api.mypurecloud.com')}/api/v2"
def trigger_flow(
self,
flow_id: str,
input_data: Dict[str, Any],
execution_context: Optional[Dict[str, Any]] = None
) -> str:
"""
Triggers an Architect flow execution.
Args:
flow_id: The UUID of the flow.
input_data: Data to inject into the flow's Start Event.
execution_context: Optional metadata.
Returns:
The executionId string.
Raises:
Exception: If the API call fails.
"""
# 1. Get a valid token
token = self.authenticator.get_token()
# 2. Build the payload
payload = build_execution_payload(flow_id, input_data, execution_context)
# 3. Set up headers
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {token}"
}
# 4. Define the endpoint
url = f"{self.base_url}/flows/executions"
try:
response = requests.post(url, json=payload, headers=headers)
# 5. Handle HTTP Errors
if response.status_code == 401:
raise Exception("Unauthorized: Token is invalid or expired.")
elif response.status_code == 403:
raise Exception("Forbidden: Client lacks 'flow:execution:write' scope.")
elif response.status_code == 404:
raise Exception(f"Flow not found: {flow_id}")
elif response.status_code == 429:
raise Exception("Rate Limit Exceeded. Back off and retry.")
elif response.status_code >= 500:
raise Exception(f"Server Error: {response.status_code}")
elif response.status_code != 201:
raise Exception(f"Unexpected status: {response.status_code} - {response.text}")
# 6. Parse Response
result = response.json()
return result["executionId"]
except requests.exceptions.RequestException as e:
raise Exception(f"Network error during flow execution: {str(e)}") from e
Step 3: Validating the Execution (Optional but Recommended)
The POST request is asynchronous. A 201 Created response means the request was accepted, not that the flow succeeded. To verify the flow ran correctly, you can poll the GET /api/v2/flows/executions/{executionId} endpoint. This requires the flow:execution:read scope.
def get_execution_status(self, execution_id: str) -> Dict[str, Any]:
"""
Polls the status of a flow execution.
Args:
execution_id: The ID returned from trigger_flow.
Returns:
The full execution object including status and data.
"""
token = self.authenticator.get_token()
url = f"{self.base_url}/flows/executions/{execution_id}"
headers = {
"Authorization": f"Bearer {token}"
}
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
Complete Working Example
The following script combines authentication, payload construction, and execution into a single runnable module. Replace the placeholder credentials with your actual Genesys Cloud OAuth Client details.
import os
import json
import time
from typing import Dict, Any
# --- Imports from previous sections ---
# In a real project, these would be in separate modules or imported from a package.
class GenesysAuthenticator:
def __init__(self, client_id: str, client_secret: str, region: str = "us-east-1"):
self.client_id = client_id
self.client_secret = client_secret
region_map = {
"us-east-1": "https://api.mypurecloud.com",
"us-east-2": "https://api.mypurecloud.com",
"us-west-2": "https://api.mypurecloud.com",
"eu-west-1": "https://api.eu.pure.cloud.com",
"ap-southeast-2": "https://api.au.pure.cloud.com",
}
self.auth_url = f"{region_map.get(region, 'https://api.mypurecloud.com')}/oauth/token"
self.access_token: str | None = None
self.token_expiry: float = 0
def get_token(self) -> str:
if self.access_token and time.time() < self.token_expiry:
return self.access_token
payload = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
try:
response = requests.post(self.auth_url, data=payload, headers=headers)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if response.status_code == 401:
raise Exception("Authentication failed: Invalid Client ID or Secret.") from e
raise Exception(f"Failed to obtain token: {response.text}") from e
data = response.json()
self.access_token = data["access_token"]
self.token_expiry = time.time() + (data["expires_in"] - 300)
return self.access_token
def build_execution_payload(flow_id: str, data: Dict[str, Any], execution_context: Dict[str, Any] | None = None) -> Dict[str, Any]:
payload = {"flowId": flow_id, "data": data}
if execution_context:
payload["executionContext"] = execution_context
return payload
class GenesysFlowExecutor:
def __init__(self, authenticator: GenesysAuthenticator, region: str = "us-east-1"):
self.authenticator = authenticator
region_map = {
"us-east-1": "https://api.mypurecloud.com",
"us-east-2": "https://api.mypurecloud.com",
"us-west-2": "https://api.mypurecloud.com",
"eu-west-1": "https://api.eu.pure.cloud.com",
"ap-southeast-2": "https://api.au.pure.cloud.com",
}
self.base_url = f"{region_map.get(region, 'https://api.mypurecloud.com')}/api/v2"
def trigger_flow(self, flow_id: str, input_data: Dict[str, Any], execution_context: Dict[str, Any] | None = None) -> str:
token = self.authenticator.get_token()
payload = build_execution_payload(flow_id, input_data, execution_context)
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {token}"
}
url = f"{self.base_url}/flows/executions"
try:
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 403:
raise Exception("Forbidden: Check if 'flow:execution:write' scope is granted.")
response.raise_for_status()
result = response.json()
return result["executionId"]
except requests.exceptions.HTTPError as e:
print(f"HTTP Error: {response.status_code} - {response.text}")
raise e
def get_execution_status(self, execution_id: str) -> Dict[str, Any]:
token = self.authenticator.get_token()
url = f"{self.base_url}/flows/executions/{execution_id}"
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
# --- Main Execution Block ---
if __name__ == "__main__":
# Configuration
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID", "YOUR_CLIENT_ID_HERE")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET", "YOUR_CLIENT_SECRET_HERE")
REGION = "us-east-1"
FLOW_ID = os.getenv("GENESYS_FLOW_ID", "YOUR_FLOW_UUID_HERE")
# Initialize Components
auth = GenesysAuthenticator(CLIENT_ID, CLIENT_SECRET, REGION)
executor = GenesysFlowExecutor(auth, REGION)
# Define Input Data
# These keys MUST match the variables defined in your Architect Flow's Start Event
flow_input = {
"customerName": "Jane Doe",
"orderId": "ORD-998877",
"priority": "high",
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
}
# Define Execution Context (Optional)
ctx = {
"source": "external-crm-sync",
"requestId": "req-12345"
}
try:
print("Triggering flow...")
exec_id = executor.trigger_flow(FLOW_ID, flow_input, ctx)
print(f"Flow triggered successfully. Execution ID: {exec_id}")
# Optional: Wait and check status
print("Waiting 2 seconds to check status...")
time.sleep(2)
status = executor.get_execution_status(exec_id)
print(f"Current Status: {status.get('status', 'Unknown')}")
print(f"Flow ID: {status.get('flowId', 'Unknown')}")
except Exception as e:
print(f"Error: {str(e)}")
Common Errors & Debugging
Error: 403 Forbidden
What causes it:
The OAuth client used to generate the token does not have the flow:execution:write scope. This is the most common error for new integrations.
How to fix it:
- Log into the Genesys Cloud Admin Console.
- Navigate to Admin > Platform > OAuth Client Applications.
- Select your client application.
- Click Scopes.
- Search for
flow:execution:writeand check the box. - Save the changes.
- Generate a new access token (old tokens do not inherit new scopes).
Code showing the fix:
No code change is required. The fix is administrative. Ensure your GenesysAuthenticator is re-initialized or the token is refreshed after the scope change.
Error: 400 Bad Request - “Flow start event data mismatch”
What causes it:
The JSON keys in your data payload do not match the variable names defined in the Architect Flow’s Start Event. Genesys Cloud is strict about this. If the Start Event expects a variable named customerId and you send customer_id, the execution may fail or the variable will be null.
How to fix it:
- Open the Architect Flow in the Genesys Cloud UI.
- Select the Start Event.
- Look at the Data tab.
- Verify the exact spelling and case of each variable name.
- Update your
flow_inputdictionary in the code to match exactly.
Error: 429 Too Many Requests
What causes it:
You have exceeded the rate limit for the POST /api/v2/flows/executions endpoint. Genesys Cloud enforces rate limits per client application and per organization.
How to fix it:
Implement exponential backoff. Do not retry immediately.
Code showing the fix:
import time
def trigger_flow_with_retry(executor, flow_id, input_data, max_retries=3):
for attempt in range(max_retries):
try:
return executor.trigger_flow(flow_id, input_data)
except Exception as e:
if "429" in str(e) or "Rate Limit" in str(e):
wait_time = 2 ** attempt # 1s, 2s, 4s
print(f"Rate limited. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
raise e
raise Exception("Max retries exceeded for flow execution.")