Reuse Shared Flows in Genesys Cloud CX via the REST API
What You Will Build
- You will create a Python script that programmatically creates a shared Flow and references it from multiple distinct inbound Call Flows using the Genesys Cloud CX API.
- This tutorial uses the Genesys Cloud CX Platform API v2 (/api/v2/flows and /api/v2/flows/versions).
- The programming language covered is Python 3.9+ using the requests library.
Prerequisites
- OAuth Client Type: An OAuth client that uses the Client Credentials grant (see Authentication Setup).
- Required Scopes: flow:read, flow:write, routing:flow:read, routing:flow:write.
- API Version: Genesys Cloud CX Platform API v2.
- Language/Runtime: Python 3.9 or higher.
- External Dependencies:
  - requests: For HTTP communication.
  - python-dotenv: For managing environment variables securely.
Install dependencies:
pip install requests python-dotenv
Authentication Setup
Genesys Cloud CX uses OAuth 2.0 for authentication. For server-side scripts, the Client Credentials flow is the standard approach. You must generate a bearer token with the required scopes before making any API calls.
Create a file named .env in your project root:
GENESYS_REGION=us-east-1
GENESYS_CLIENT_ID=your_client_id_here
GENESYS_CLIENT_SECRET=your_client_secret_here
GENESYS_QUEUE_ID=your_queue_id_here
Create a helper script auth.py to handle token retrieval and caching:
import os
import time

import requests
from dotenv import load_dotenv

load_dotenv()

# Genesys Cloud domain for each supported AWS region
REGION_DOMAINS = {
    "us-east-1": "mypurecloud.com",
    "eu-west-1": "mypurecloud.ie",
    "ap-southeast-2": "mypurecloud.com.au",
}


def get_domain(region: str) -> str:
    """Returns the Genesys Cloud domain for the region."""
    try:
        return REGION_DOMAINS[region]
    except KeyError:
        raise ValueError(f"Unsupported region: {region}") from None


def get_auth_url(region: str) -> str:
    """Returns the OAuth login host for the region."""
    return f"https://login.{get_domain(region)}"


def get_api_url(region: str) -> str:
    """Returns the Platform API host for the region."""
    return f"https://api.{get_domain(region)}"


def get_access_token(client_id: str, client_secret: str, region: str) -> str:
    """
    Retrieves an OAuth2 access token using Client Credentials flow.
    Implements basic caching logic to avoid unnecessary token requests.
    """
    # Check for cached token
    cache_file = ".token_cache"
    if os.path.exists(cache_file):
        with open(cache_file, "r") as f:
            data = f.read().split("|")
        if len(data) == 2:
            token, expiry_str = data
            expiry = float(expiry_str)
            if time.time() < expiry:
                return token

    # Request new token; the client ID and secret are sent as HTTP Basic credentials
    auth_url = f"{get_auth_url(region)}/oauth/token"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    data = {
        "grant_type": "client_credentials",
        "scope": "flow:read flow:write routing:flow:read routing:flow:write"
    }
    response = requests.post(
        auth_url, headers=headers, data=data, auth=(client_id, client_secret)
    )
    response.raise_for_status()
    token_data = response.json()
    access_token = token_data["access_token"]
    expires_in = token_data["expires_in"]

    # Cache the token (subtract 60 seconds for safety margin)
    expiry_time = int(time.time() + (expires_in - 60))
    with open(cache_file, "w") as f:
        f.write(f"{access_token}|{expiry_time}")
    return access_token


# Configuration
REGION = os.getenv("GENESYS_REGION", "us-east-1")
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
if not CLIENT_ID or not CLIENT_SECRET:
    raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in .env")

ACCESS_TOKEN = get_access_token(CLIENT_ID, CLIENT_SECRET, REGION)
# The API host already includes the scheme, so it is used as-is
API_BASE_URL = get_api_url(REGION)
HEADERS = {
    "Authorization": f"Bearer {ACCESS_TOKEN}",
    "Content-Type": "application/json"
}
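The caching logic in auth.py comes down to one question: is the stored expiry still in the future? The sketch below isolates that check as a pure function so it can be tested without any network call. The name parse_cached_token is hypothetical, and it assumes the "token|expiry" layout that auth.py writes to .token_cache.

```python
import time
from typing import Optional


def parse_cached_token(cache_text: str, now: Optional[float] = None) -> Optional[str]:
    """Return the cached token if it is still valid, otherwise None.

    Hypothetical helper: expects the "token|expiry" format written by auth.py.
    """
    now = time.time() if now is None else now
    # Split on the last "|" so the expiry is always the final field
    token, sep, expiry_str = cache_text.strip().rpartition("|")
    if not sep or not token:
        return None  # Malformed cache entry
    try:
        expiry = float(expiry_str)  # Accepts whole and fractional timestamps
    except ValueError:
        return None
    return token if now < expiry else None
```

Passing now explicitly keeps the function deterministic in tests; in normal use it can be omitted so the current time is used.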
Implementation
Step 1: Create the Shared Flow
In this tutorial, a “Shared Flow” is a standard Call Flow that is designed to be invoked by other flows. Its entry point is a Start node, and it typically ends with a Transfer or Hangup node. A final Transfer node can hand control back to the parent flow or on to further logic, which is what makes the flow reusable.
We will create a simple shared flow that plays a greeting.
import requests

# API_BASE_URL and HEADERS come from auth.py (see "Authentication Setup")
from auth import API_BASE_URL, HEADERS


def create_shared_flow(flow_name: str, flow_description: str) -> dict:
    """
    Creates a new Call Flow in Genesys Cloud.
    Returns the flow object containing the ID and initial version.
    """
    url = f"{API_BASE_URL}/api/v2/flows"
    # Define the flow structure
    # Note: The 'type' must be 'CALL' for inbound call flows
    flow_payload = {
        "name": flow_name,
        "description": flow_description,
        "type": "CALL",
        "enabled": False,  # Start disabled to allow editing
        "outboundCall": {
            "enabled": False
        },
        "nodes": {
            "Start": {
                "id": "Start",
                "type": "Start",
                "properties": {
                    "name": "Start"
                },
                "edges": {
                    "default": {
                        "targetId": "PlayGreeting",
                        "conditionType": "default"
                    }
                }
            },
            "PlayGreeting": {
                "id": "PlayGreeting",
                "type": "Play",
                "properties": {
                    "name": "Play Shared Greeting",
                    "media": {
                        "mediaType": "audio",
                        "mediaId": None,  # Replace with a valid Media ID if you have one
                        "mediaText": "Hello, this is a shared greeting from a reusable flow."
                    },
                    "playType": "text",
                    "playTts": True
                },
                "edges": {
                    "default": {
                        "targetId": "EndFlow",
                        "conditionType": "default"
                    }
                }
            },
            "EndFlow": {
                "id": "EndFlow",
                "type": "Hangup",
                "properties": {
                    "name": "End Shared Flow",
                    "hangupType": "normal"
                }
            }
        },
        "startNodeId": "Start"
    }
    response = requests.post(url, headers=HEADERS, json=flow_payload)
    response.raise_for_status()
    flow_data = response.json()
    print(f"Created Shared Flow ID: {flow_data['id']}")
    return flow_data
Key Technical Details:
- The type field is set to "CALL".
- The nodes object defines the visual logic. Each node has a unique id.
- The edges object defines the connections between nodes. conditionType: "default" is used for unconditional paths.
- The startNodeId must match the ID of the Start node.
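The structural rules listed above can be checked locally before a payload is sent. The following is a minimal sketch, not part of the Genesys Cloud API: validate_flow_graph is a hypothetical helper, and it assumes the nodes / edges / startNodeId layout used in this tutorial's payloads.

```python
def validate_flow_graph(flow: dict) -> list:
    """Return a list of human-readable problems found in a flow payload.

    Hypothetical helper; assumes the payload layout used in this tutorial.
    """
    problems = []
    nodes = flow.get("nodes", {})
    start_id = flow.get("startNodeId")

    # startNodeId must name an existing node of type "Start"
    if start_id not in nodes:
        problems.append(f"startNodeId {start_id!r} is not a key in nodes")
    elif nodes[start_id].get("type") != "Start":
        problems.append(f"startNodeId {start_id!r} does not point to a Start node")

    for key, node in nodes.items():
        # Each node's id should match the key it is stored under
        if node.get("id") != key:
            problems.append(f"node key {key!r} does not match its id {node.get('id')!r}")
        # Every edge must target a node that exists
        for edge_name, edge in node.get("edges", {}).items():
            target = edge.get("targetId")
            if target not in nodes:
                problems.append(f"edge {key}.{edge_name} targets unknown node {target!r}")
    return problems
```

An empty list means no problems were found by these checks. Calling the helper on flow_payload just before requests.post gives an early, readable failure instead of a rejected request.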
Step 2: Create the Parent Inbound Call Flows
Now we create two distinct inbound call flows that will invoke the shared flow. To invoke another flow, you use the Transfer node with transferType: "flow". This creates a sub-flow invocation.
def create_parent_flow(flow_name: str, shared_flow_id: str, queue_id: str) -> dict:
    """
    Creates an inbound Call Flow that transfers to a shared flow.
    """
    url = f"{API_BASE_URL}/api/v2/flows"
    flow_payload = {
        "name": flow_name,
        "description": f"Parent flow invoking shared flow {shared_flow_id}",
        "type": "CALL",
        "enabled": False,
        "outboundCall": {
            "enabled": False
        },
        "nodes": {
            "Start": {
                "id": "Start",
                "type": "Start",
                "properties": {
                    "name": "Start Parent Flow"
                },
                "edges": {
                    "default": {
                        "targetId": "InvokeShared",
                        "conditionType": "default"
                    }
                }
            },
            "InvokeShared": {
                "id": "InvokeShared",
                "type": "Transfer",
                "properties": {
                    "name": "Invoke Shared Greeting",
                    "transferType": "flow",  # Critical: This makes it a sub-flow invocation
                    "flowId": shared_flow_id,  # Reference the shared flow ID
                    "flowVersion": None  # Optional: Specify version if needed
                },
                "edges": {
                    "default": {
                        "targetId": "QueueAgent",
                        "conditionType": "default"
                    },
                    "error": {
                        "targetId": "ErrorHandling",
                        "conditionType": "error"
                    }
                }
            },
            "QueueAgent": {
                "id": "QueueAgent",
                "type": "Queue",
                "properties": {
                    "name": "Queue to Agent",
                    "queueId": queue_id,  # Replace with a valid Queue ID
                    "queueLabel": "General Support",
                    "timeout": 60000
                },
                "edges": {
                    "default": {
                        "targetId": "EndFlow",
                        "conditionType": "default"
                    }
                }
            },
            "ErrorHandling": {
                "id": "ErrorHandling",
                "type": "Hangup",
                "properties": {
                    "name": "Handle Error",
                    "hangupType": "normal"
                }
            },
            "EndFlow": {
                "id": "EndFlow",
                "type": "Hangup",
                "properties": {
                    "name": "End Parent Flow",
                    "hangupType": "normal"
                }
            }
        },
        "startNodeId": "Start"
    }
    response = requests.post(url, headers=HEADERS, json=flow_payload)
    response.raise_for_status()
    flow_data = response.json()
    print(f"Created Parent Flow ID: {flow_data['id']}")
    return flow_data
Key Technical Details:
- The Transfer node uses transferType: "flow". This is the mechanism for module reuse.
- The flowId property in the InvokeShared node points to the ID of the flow created in Step 1.
- If the shared flow completes successfully, execution continues to the default edge of the Transfer node (in this case, QueueAgent).
- If the shared flow fails or throws an error, execution follows the error edge.
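Because the link between parent and shared flow lives entirely in the Transfer node's properties, it can be useful to list those links from a payload. A small sketch follows; find_flow_references is a hypothetical helper that assumes the node layout shown in this tutorial.

```python
def find_flow_references(flow: dict) -> dict:
    """Map node ID -> referenced flow ID for each Transfer node with transferType "flow".

    Hypothetical helper; assumes the payload layout used in this tutorial.
    """
    references = {}
    for node_id, node in flow.get("nodes", {}).items():
        props = node.get("properties", {})
        # Only Transfer nodes configured as flow transfers count as references
        if node.get("type") == "Transfer" and props.get("transferType") == "flow":
            references[node_id] = props.get("flowId")
    return references
```

For a parent payload built as in Step 2, the result would contain a single entry keyed by InvokeShared. A missing flowId shows up as None, which makes an incomplete reference easy to spot.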
Step 3: Verify the Flow Structure
After creating the flows, it is good practice to retrieve them and verify the structure, especially the flowId reference in the parent flows.
def get_flow_details(flow_id: str) -> dict:
    """
    Retrieves the detailed structure of a specific flow.
    """
    url = f"{API_BASE_URL}/api/v2/flows/{flow_id}"
    response = requests.get(url, headers=HEADERS)
    response.raise_for_status()
    return response.json()


def verify_integration(shared_flow_id: str, parent_flow_ids: list[str]):
    """
    Verifies that the parent flows correctly reference the shared flow.
    """
    print(f"\nVerifying Shared Flow: {shared_flow_id}")
    shared_flow = get_flow_details(shared_flow_id)
    print(f" Name: {shared_flow['name']}")
    print(f" Version: {shared_flow['versions'][0]['versionNumber']}")
    for pf_id in parent_flow_ids:
        print(f"\nVerifying Parent Flow: {pf_id}")
        parent_flow = get_flow_details(pf_id)
        # Check the InvokeShared node
        invoke_node = parent_flow['nodes'].get('InvokeShared')
        if invoke_node:
            transfer_type = invoke_node['properties'].get('transferType')
            referenced_flow_id = invoke_node['properties'].get('flowId')
            if transfer_type == "flow" and referenced_flow_id == shared_flow_id:
                print(f" [OK] Correctly references Shared Flow {shared_flow_id}")
            else:
                print(f" [ERROR] Incorrect reference. Found: {referenced_flow_id}")
        else:
            print(" [ERROR] InvokeShared node not found")
Complete Working Example
Combine the above functions into a single executable script main.py.
import os
import sys
import time

import requests
from dotenv import load_dotenv

# Import authentication helper
# Ensure auth.py is in the same directory and contains the code from "Authentication Setup"
from auth import API_BASE_URL, HEADERS


# Flow creation functions from Steps 1, 2, and 3
def create_shared_flow(flow_name: str, flow_description: str) -> dict:
    url = f"{API_BASE_URL}/api/v2/flows"
    flow_payload = {
        "name": flow_name,
        "description": flow_description,
        "type": "CALL",
        "enabled": False,
        "outboundCall": {"enabled": False},
        "nodes": {
            "Start": {
                "id": "Start",
                "type": "Start",
                "properties": {"name": "Start"},
                "edges": {"default": {"targetId": "PlayGreeting", "conditionType": "default"}}
            },
            "PlayGreeting": {
                "id": "PlayGreeting",
                "type": "Play",
                "properties": {
                    "name": "Play Shared Greeting",
                    "media": {"mediaType": "audio", "mediaId": None, "mediaText": "Hello, this is a shared greeting."},
                    "playType": "text",
                    "playTts": True
                },
                "edges": {"default": {"targetId": "EndFlow", "conditionType": "default"}}
            },
            "EndFlow": {
                "id": "EndFlow",
                "type": "Hangup",
                "properties": {"name": "End Shared Flow", "hangupType": "normal"}
            }
        },
        "startNodeId": "Start"
    }
    response = requests.post(url, headers=HEADERS, json=flow_payload)
    response.raise_for_status()
    return response.json()


def create_parent_flow(flow_name: str, shared_flow_id: str, queue_id: str) -> dict:
    url = f"{API_BASE_URL}/api/v2/flows"
    flow_payload = {
        "name": flow_name,
        "description": f"Parent flow invoking shared flow {shared_flow_id}",
        "type": "CALL",
        "enabled": False,
        "outboundCall": {"enabled": False},
        "nodes": {
            "Start": {
                "id": "Start",
                "type": "Start",
                "properties": {"name": "Start Parent Flow"},
                "edges": {"default": {"targetId": "InvokeShared", "conditionType": "default"}}
            },
            "InvokeShared": {
                "id": "InvokeShared",
                "type": "Transfer",
                "properties": {
                    "name": "Invoke Shared Greeting",
                    "transferType": "flow",
                    "flowId": shared_flow_id
                },
                "edges": {
                    "default": {"targetId": "QueueAgent", "conditionType": "default"},
                    "error": {"targetId": "ErrorHandling", "conditionType": "error"}
                }
            },
            "QueueAgent": {
                "id": "QueueAgent",
                "type": "Queue",
                "properties": {
                    "name": "Queue to Agent",
                    "queueId": queue_id,
                    "queueLabel": "General Support",
                    "timeout": 60000
                },
                "edges": {"default": {"targetId": "EndFlow", "conditionType": "default"}}
            },
            "ErrorHandling": {
                "id": "ErrorHandling",
                "type": "Hangup",
                "properties": {"name": "Handle Error", "hangupType": "normal"}
            },
            "EndFlow": {
                "id": "EndFlow",
                "type": "Hangup",
                "properties": {"name": "End Parent Flow", "hangupType": "normal"}
            }
        },
        "startNodeId": "Start"
    }
    response = requests.post(url, headers=HEADERS, json=flow_payload)
    response.raise_for_status()
    return response.json()


def get_flow_details(flow_id: str) -> dict:
    url = f"{API_BASE_URL}/api/v2/flows/{flow_id}"
    response = requests.get(url, headers=HEADERS)
    response.raise_for_status()
    return response.json()


def main():
    load_dotenv()
    # Configuration
    SHARED_FLOW_NAME = "Shared Greeting Module"
    PARENT_FLOW_1_NAME = "Inbound Sales Flow"
    PARENT_FLOW_2_NAME = "Inbound Support Flow"
    # You must provide a valid Queue ID. Replace this with a real Queue ID from your org.
    # To find a Queue ID, you can query: GET /api/v2/routing/queues
    QUEUE_ID = os.getenv("GENESYS_QUEUE_ID")
    if not QUEUE_ID:
        print("Error: GENESYS_QUEUE_ID is not set in .env")
        sys.exit(1)

    print("Starting Flow Creation Process...")
    try:
        # Step 1: Create Shared Flow
        print("1. Creating Shared Flow...")
        shared_flow = create_shared_flow(SHARED_FLOW_NAME, "Reusable greeting module")
        shared_flow_id = shared_flow['id']
        print(f" Shared Flow ID: {shared_flow_id}")
        # Small delay to ensure propagation
        time.sleep(2)

        # Step 2: Create Parent Flows
        print("2. Creating Parent Flow 1...")
        parent_flow_1 = create_parent_flow(PARENT_FLOW_1_NAME, shared_flow_id, QUEUE_ID)
        parent_flow_1_id = parent_flow_1['id']
        print(f" Parent Flow 1 ID: {parent_flow_1_id}")
        print("3. Creating Parent Flow 2...")
        parent_flow_2 = create_parent_flow(PARENT_FLOW_2_NAME, shared_flow_id, QUEUE_ID)
        parent_flow_2_id = parent_flow_2['id']
        print(f" Parent Flow 2 ID: {parent_flow_2_id}")

        # Step 3: Verify
        print("4. Verifying Integration...")
        parent_ids = [parent_flow_1_id, parent_flow_2_id]
        # Verify Shared Flow
        shared_details = get_flow_details(shared_flow_id)
        print(f" Shared Flow '{shared_details['name']}' exists and is valid.")
        # Verify Parent Flows
        for pf_id in parent_ids:
            pf_details = get_flow_details(pf_id)
            invoke_node = pf_details['nodes'].get('InvokeShared')
            if invoke_node and invoke_node['properties'].get('flowId') == shared_flow_id:
                print(f" Parent Flow '{pf_details['name']}' correctly references Shared Flow.")
            else:
                print(f" ERROR: Parent Flow '{pf_details['name']}' does not reference Shared Flow correctly.")
        print("\nSuccess: All flows created and linked.")
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 400 Bad Request - Invalid Flow Structure
- Cause: The nodes object is malformed. Common issues include missing edges for a node, mismatched targetId references, or the startNodeId not pointing to a Start node.
- Fix: Ensure every node (except Hangup/Transfer endpoints if they are final) has an edges object. Verify that all targetId values in edges correspond to existing node IDs in the nodes object.
Error: 403 Forbidden - Insufficient Scopes
- Cause: The OAuth token does not have flow:write or routing:flow:write scopes.
- Fix: Update your OAuth client configuration in the Genesys Cloud Admin portal. Add the required scopes and regenerate the client secret. Clear the .token_cache file to force a new token request with the updated scopes.
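Clearing the cache can be scripted so it is not forgotten after a configuration change. This is a minimal sketch: clear_token_cache is a hypothetical helper, and the default path assumes the .token_cache file name used in auth.py.

```python
from pathlib import Path


def clear_token_cache(cache_file: str = ".token_cache") -> bool:
    """Delete the cached token file. Returns True if a file was removed.

    Hypothetical helper; the default path matches the cache file used in auth.py.
    """
    path = Path(cache_file)
    if path.exists():
        path.unlink()
        return True
    return False  # Nothing to clear
```

The next call to get_access_token then finds no cache file and requests a fresh token.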
Error: 429 Too Many Requests
- Cause: Genesys Cloud enforces rate limits. Creating multiple flows in quick succession can trigger this.
- Fix: Implement exponential backoff. In the create_shared_flow and create_parent_flow functions, wrap the requests.post call in a retry loop.
import time

import requests


def post_with_retry(url, headers, json_payload, max_retries=3):
    for attempt in range(max_retries):
        response = requests.post(url, headers=headers, json=json_payload)
        if response.status_code == 429:
            # Prefer the server's Retry-After hint; otherwise back off exponentially
            retry_after = int(response.headers.get('Retry-After', 2 ** attempt))
            print(f"Rate limited. Retrying after {retry_after} seconds...")
            time.sleep(retry_after)
            continue
        response.raise_for_status()
        return response
    raise Exception("Max retries exceeded")
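The retry loop converts Retry-After with int(), which raises ValueError if the header is not a plain number (HTTP also allows a date in that header). The sketch below separates the wait-time decision into a pure function that tolerates that case and caps the delay. The name retry_delay and the 30-second cap are assumptions for illustration, not values from the Genesys Cloud documentation.

```python
from typing import Optional


def retry_delay(retry_after: Optional[str], attempt: int, cap: float = 30.0) -> float:
    """Pick a wait time in seconds for a rate-limited request.

    Hypothetical helper: honors a numeric Retry-After value, otherwise
    falls back to exponential backoff (2 ** attempt), capped at `cap`.
    """
    if retry_after is not None:
        try:
            return min(float(retry_after), cap)
        except ValueError:
            pass  # Not a number (for example an HTTP date); fall back to backoff
    return min(float(2 ** attempt), cap)
```

Inside post_with_retry, the call would look like time.sleep(retry_delay(response.headers.get('Retry-After'), attempt)).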
Error: Flow Does Not Execute as Expected
- Cause: The transferType in the Transfer node is not set to "flow". If it is set to "queue" or "agent", it will not invoke the shared flow.
- Fix: Verify the properties of the Transfer node in the parent flow JSON. Ensure "transferType": "flow" and "flowId": "<shared_flow_id>" are present.