Shared Flow Composition: Calling Reusable Flows from Inbound Routing

What You Will Build

  • Create a reusable Genesys Cloud Flow that encapsulates common logic (such as language detection or authentication) and invoke it from multiple distinct inbound call flows using the Flow node.
  • This tutorial uses the Genesys Cloud Platform API v2 and the Python SDK (genesyscloud).
  • The programming language covered is Python 3.9+.

Prerequisites

  • OAuth Client Type: a client configured for the Client Credentials grant, the server-to-server grant used in Authentication Setup.
  • Required Scopes: flow:write, flow:read, routing:write, routing:read, organization:read.
  • SDK Version: genesyscloud v2.40.0 or later.
  • Runtime: Python 3.9+.
  • Dependencies:
    pip install genesyscloud
    

Authentication Setup

Genesys Cloud uses OAuth 2.0 for all API interactions. For server-to-server integrations, the Client Credentials Grant flow is standard. The Python SDK handles token caching and refreshing automatically when configured correctly.

from genesyscloud import PlatformClient
from genesyscloud import Configuration

def get_platform_client(client_id: str, client_secret: str, environment: str = "mypurecloud.com") -> PlatformClient:
    """
    Initializes and returns an authenticated Genesys Cloud PlatformClient.
    
    Args:
        client_id: The OAuth client ID.
        client_secret: The OAuth client secret.
        environment: The Genesys Cloud subdomain (e.g., mypurecloud.com, euw1.pure.cloud).
        
    Returns:
        An authenticated PlatformClient instance.
    """
    config = Configuration()
    config.host = f"https://{environment}"
    config.access_token = None  # Will be populated by the SDK
    
    # Configure OAuth credentials
    config.api_key['Authorization'] = client_secret
    config.api_key_prefix['Authorization'] = 'Basic'
    
    # Create the client
    client = PlatformClient(config)
    
    # Authenticate the client
    try:
        client.authenticate(client_id, client_secret)
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback
        raise RuntimeError(f"Authentication failed: {e}") from e
        
    return client
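
For orientation, the Client Credentials grant described above comes down to a single HTTP POST against the login host for your region. The sketch below only builds that request and does not send it. The helper name build_token_request is hypothetical, and the sketch assumes the token endpoint is https://login.<environment>/oauth/token; the SDK normally performs this step for you.

```python
import base64

def build_token_request(client_id: str, client_secret: str,
                        environment: str = "mypurecloud.com") -> dict:
    """Return the URL, headers, and form body of a client-credentials token request."""
    # HTTP Basic credentials are base64("client_id:client_secret")
    raw = f"{client_id}:{client_secret}".encode("utf-8")
    basic = base64.b64encode(raw).decode("ascii")
    return {
        # Assumed endpoint layout: the login host is "login." + environment
        "url": f"https://login.{environment}/oauth/token",
        "headers": {
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        "data": {"grant_type": "client_credentials"},
    }

request = build_token_request("my-id", "my-secret")
print(request["url"])  # prints: https://login.mypurecloud.com/oauth/token
```

Keeping the request construction separate from the network call makes the credentials handling easy to inspect without contacting the service.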

Implementation

Step 1: Define the Shared Module Flow

First, we create the “Shared Module.” This flow contains the logic that will be reused. For this example, we will create a flow that performs a simple IVR greeting and collects a digit. In a real-world scenario, this might be a language selection menu or a caller ID verification step.

We need to:

  1. Create the flow definition.
  2. Add a Say node (to speak the greeting).
  3. Add a Gather node (to collect user input).
  4. Define the flow’s start node.

from genesyscloud.rest import ApiException

def create_shared_flow(client: PlatformClient, org_id: str, shared_flow_name: str) -> str:
    """
    Creates a reusable shared flow with a Say and Gather node.
    
    Args:
        client: Authenticated PlatformClient.
        org_id: The organization ID.
        shared_flow_name: Name for the shared flow.
        
    Returns:
        The ID of the created flow.
    """
    
    # 1. Create the Flow Definition
    # We define the flow as an 'inbound' flow type, but it is designed to be called.
    flow_definition = {
        "name": shared_flow_name,
        "type": "inbound",
        "outbound": {},
        "inbound": {
            "enabled": True,
            "queue": {},
            "routing": {
                "type": "flow"
            }
        },
        "draft": {
            "startNode": "SayHello",
            "nodes": {},
            "edges": {}
        }
    }

    # 2. Define the Nodes
    # Node: SayHello
    say_node = {
        "id": "SayHello",
        "type": "Say",
        "properties": {
            "sayText": "Welcome to the shared module. Please press 1 for English.",
            "sayLanguage": "en-US",
            "sayGender": "female",
            "playBeepBeforeText": False
        },
        "onComplete": {
            "nextNode": "GatherInput",
            "onError": "Exit"
        },
        "onError": {
            "nextNode": "Exit"
        }
    }

    # Node: GatherInput
    gather_node = {
        "id": "GatherInput",
        "type": "Gather",
        "properties": {
            "promptText": "Please press 1.",
            "promptLanguage": "en-US",
            "promptGender": "female",
            "maxDigits": 1,
            "timeoutSeconds": 10,
            "interDigitTimeoutSeconds": 5
        },
        "onComplete": {
            "nextNode": "ProcessInput",
            "onError": "Exit"
        },
        "onError": {
            "nextNode": "Exit"
        }
    }

    # Node: ProcessInput (Logic to handle the digit)
    process_node = {
        "id": "ProcessInput",
        "type": "Set",
        "properties": {
            "setVars": [
                {
                    "key": "shared_module_result",
                    "value": {
                        "type": "string",
                        "value": "{GatherInput.digits}"
                    }
                }
            ]
        },
        "onComplete": {
            "nextNode": "EndFlow",
            "onError": "Exit"
        },
        "onError": {
            "nextNode": "Exit"
        }
    }

    # Node: EndFlow
    end_node = {
        "id": "EndFlow",
        "type": "End",
        "properties": {
            "endType": "normal"
        }
    }

    # Node: Exit (Error handler)
    exit_node = {
        "id": "Exit",
        "type": "End",
        "properties": {
            "endType": "normal"
        }
    }

    # Assemble the draft nodes and edges
    draft_nodes = {
        "SayHello": say_node,
        "GatherInput": gather_node,
        "ProcessInput": process_node,
        "EndFlow": end_node,
        "Exit": exit_node
    }

    # Transitions are declared on each node through its onComplete/onError
    # handlers, so the draft's "edges" object stays empty. It is still sent
    # so that the payload keeps the full structure.
    flow_definition["draft"]["nodes"] = draft_nodes
    
    try:
        # Create the flow
        response = client.flows_api.post_flow(
            body=flow_definition,
            x_genesis_restriction="shared" # Optional: marks it as shared
        )
        print(f"Created Shared Flow: {response.id}")
        return response.id
    except ApiException as e:
        print(f"Error creating shared flow: {e}")
        raise

Step 2: Create the Inbound Caller Flows

Now we create two separate inbound flows (Flow A and Flow B) that will call the shared flow. The key mechanism here is the Flow node type.

The Flow node allows one flow to invoke another. When the called flow ends, execution returns to the calling flow at the onComplete node of the Flow node.

Critical Concept: When Flow A calls Flow B, Flow B runs in the same conversation context. Variables set in Flow B (like shared_module_result above) are therefore visible to Flow A after Flow B completes, with no explicit mapping. Scoping is by name only, so the calling flow must read exactly the key that the shared flow writes, and that key should be unique enough not to collide with variables the calling flow uses for other purposes.

def create_inbound_flow(client: PlatformClient, flow_name: str, shared_flow_id: str) -> str:
    """
    Creates an inbound flow that calls the shared flow using a Flow node.
    
    Args:
        client: Authenticated PlatformClient.
        flow_name: Name for this inbound flow.
        shared_flow_id: The ID of the shared flow created in Step 1.
        
    Returns:
        The ID of the created inbound flow.
    """
    
    flow_definition = {
        "name": flow_name,
        "type": "inbound",
        "outbound": {},
        "inbound": {
            "enabled": True,
            "queue": {},
            "routing": {
                "type": "flow"
            }
        },
        "draft": {
            "startNode": "CallShared",
            "nodes": {},
            "edges": {}
        }
    }

    # Node: CallShared (The Flow Node)
    # This node invokes the shared flow
    call_shared_node = {
        "id": "CallShared",
        "type": "Flow",
        "properties": {
            "flowId": shared_flow_id,
            "flowName": None, # Can also reference by name if ID is unknown
            "timeoutSeconds": 300 # Max time for the shared flow to run
        },
        "onComplete": {
            "nextNode": "CheckResult",
            "onError": "HandleError"
        },
        "onError": {
            "nextNode": "HandleError"
        }
    }

    # Node: CheckResult
    # Logic to check what the shared flow did
    check_result_node = {
        "id": "CheckResult",
        "type": "If",
        "properties": {
            "conditions": [
                {
                    "expression": "{shared_module_result} == '1'",
                    "nextNode": "RouteToAgent"
                }
            ],
            "defaultNextNode": "PlayGoodbye"
        }
    }

    # Node: RouteToAgent
    route_node = {
        "id": "RouteToAgent",
        "type": "Route",
        "properties": {
            "queueId": "YOUR_QUEUE_ID_HERE", # Replace with actual Queue ID
            "skillRequirements": []
        },
        "onComplete": {
            "nextNode": "PlayGoodbye",
            "onError": "PlayGoodbye"
        },
        "onError": {
            "nextNode": "PlayGoodbye"
        }
    }

    # Node: PlayGoodbye
    goodbye_node = {
        "id": "PlayGoodbye",
        "type": "Say",
        "properties": {
            "sayText": "Thank you for calling.",
            "sayLanguage": "en-US",
            "sayGender": "female"
        },
        "onComplete": {
            "nextNode": "EndFlow",
            "onError": "EndFlow"
        },
        "onError": {
            "nextNode": "EndFlow"
        }
    }

    # Node: HandleError
    error_node = {
        "id": "HandleError",
        "type": "Say",
        "properties": {
            "sayText": "An error occurred in the shared module.",
            "sayLanguage": "en-US"
        },
        "onComplete": {
            "nextNode": "EndFlow",
            "onError": "EndFlow"
        },
        "onError": {
            "nextNode": "EndFlow"
        }
    }

    # Node: EndFlow
    end_node = {
        "id": "EndFlow",
        "type": "End",
        "properties": {
            "endType": "normal"
        }
    }

    # Assemble nodes
    draft_nodes = {
        "CallShared": call_shared_node,
        "CheckResult": check_result_node,
        "RouteToAgent": route_node,
        "PlayGoodbye": goodbye_node,
        "HandleError": error_node,
        "EndFlow": end_node
    }

    flow_definition["draft"]["nodes"] = draft_nodes

    try:
        response = client.flows_api.post_flow(body=flow_definition)
        print(f"Created Inbound Flow: {response.id}")
        return response.id
    except ApiException as e:
        print(f"Error creating inbound flow: {e}")
        raise
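
Because Flow A and Flow B differ only by name, the Flow node that performs the call can be produced by a small factory. The following is a sketch that mirrors the dictionary layout used in this tutorial; make_flow_node is a hypothetical helper and not part of the SDK.

```python
from typing import Any, Dict

def make_flow_node(node_id: str, flow_id: str, next_node: str,
                   error_node: str, timeout_seconds: int = 300) -> Dict[str, Any]:
    """Build a Flow node dict in the layout this tutorial uses."""
    return {
        "id": node_id,
        "type": "Flow",
        "properties": {"flowId": flow_id, "timeoutSeconds": timeout_seconds},
        # Execution resumes at next_node once the called flow ends
        "onComplete": {"nextNode": next_node, "onError": error_node},
        "onError": {"nextNode": error_node},
    }

node = make_flow_node("CallShared", "shared-flow-id", "CheckResult", "HandleError")
print(node["onComplete"]["nextNode"])  # prints: CheckResult
```

A factory like this keeps the success and error targets in one place, which reduces the chance of a caller flow referencing a node ID that does not exist.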

Step 3: Publishing the Flows

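Creating a flow only saves it as a draft. To make it active and callable, you must publish it; publishing creates a new version and sets it as the active version. The complete example below publishes the shared flow first, so that it is active before any caller flow that references it is published.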

def publish_flow(client: PlatformClient, flow_id: str) -> None:
    """
    Publishes a flow to make it active.
    
    Args:
        client: Authenticated PlatformClient.
        flow_id: The ID of the flow to publish.
    """
    try:
        # Publish the flow
        # The API returns a task ID for the async publish operation
        response = client.flows_api.post_flow_publish(flow_id=flow_id)
        
        print(f"Publish initiated for Flow {flow_id}. Task ID: {response.id}")
        
        # In a production script, you would poll the task status
        # For this tutorial, we assume success after a brief delay
        import time
        time.sleep(5) # Wait for Genesys Cloud to process
        
        # Verify by printing the flow's current version
        flow = client.flows_api.get_flow(flow_id=flow_id)
        print(f"Flow version: {flow.version}")
        
    except ApiException as e:
        print(f"Error publishing flow: {e}")
        raise
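
The fixed five-second sleep above can be replaced by polling with a deadline. The helper below is a generic sketch: wait_until is a hypothetical name, and the is_done callable is supplied by the caller, because how publish status is queried is not covered in this tutorial.

```python
import time
from typing import Callable

def wait_until(is_done: Callable[[], bool], timeout: float = 60.0,
               interval: float = 2.0) -> bool:
    """Call is_done() every `interval` seconds until it returns True or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if is_done():
            return True
        if time.monotonic() >= deadline:
            return False  # Gave up: the caller decides how to report the timeout
        time.sleep(interval)

# Stand-in check that succeeds on the third call
calls = {"n": 0}

def fake_check() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3

print(wait_until(fake_check, timeout=5.0, interval=0.01))  # prints: True
```

Returning a boolean instead of raising lets the publishing code choose between retrying, logging, or aborting when the deadline passes.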

Complete Working Example

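Below is the complete, runnable script. Set the GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, GENESYS_ORG_ID, and GENESYS_QUEUE_ID environment variables (plus GENESYS_ENVIRONMENT if your region is not mypurecloud.com), or replace the YOUR_CLIENT_ID, YOUR_CLIENT_SECRET, YOUR_ORG_ID, and YOUR_QUEUE_ID placeholder defaults with your actual values.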

import os
import sys
import time

from genesyscloud import Configuration, PlatformClient
from genesyscloud.rest import ApiException

# Configuration Constants
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID", "YOUR_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET", "YOUR_CLIENT_SECRET")
ENVIRONMENT = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
ORG_ID = os.getenv("GENESYS_ORG_ID", "YOUR_ORG_ID")
QUEUE_ID = os.getenv("GENESYS_QUEUE_ID", "YOUR_QUEUE_ID")

def get_platform_client() -> PlatformClient:
    config = Configuration()
    config.host = f"https://{ENVIRONMENT}"
    config.api_key['Authorization'] = CLIENT_SECRET
    config.api_key_prefix['Authorization'] = 'Basic'
    
    client = PlatformClient(config)
    try:
        client.authenticate(CLIENT_ID, CLIENT_SECRET)
    except Exception as e:
        raise RuntimeError(f"Authentication failed: {e}") from e
    return client

def create_shared_flow(client: PlatformClient) -> str:
    flow_definition = {
        "name": "Shared_Module_Language_Select",
        "type": "inbound",
        "outbound": {},
        "inbound": {
            "enabled": True,
            "queue": {},
            "routing": {
                "type": "flow"
            }
        },
        "draft": {
            "startNode": "SayHello",
            "nodes": {
                "SayHello": {
                    "id": "SayHello",
                    "type": "Say",
                    "properties": {
                        "sayText": "Welcome. Press 1 for English.",
                        "sayLanguage": "en-US",
                        "sayGender": "female"
                    },
                    "onComplete": {
                        "nextNode": "GatherInput",
                        "onError": "Exit"
                    },
                    "onError": {
                        "nextNode": "Exit"
                    }
                },
                "GatherInput": {
                    "id": "GatherInput",
                    "type": "Gather",
                    "properties": {
                        "promptText": "Press 1.",
                        "promptLanguage": "en-US",
                        "maxDigits": 1,
                        "timeoutSeconds": 10
                    },
                    "onComplete": {
                        "nextNode": "SetResult",
                        "onError": "Exit"
                    },
                    "onError": {
                        "nextNode": "Exit"
                    }
                },
                "SetResult": {
                    "id": "SetResult",
                    "type": "Set",
                    "properties": {
                        "setVars": [
                            {
                                "key": "shared_module_result",
                                "value": {
                                    "type": "string",
                                    "value": "{GatherInput.digits}"
                                }
                            }
                        ]
                    },
                    "onComplete": {
                        "nextNode": "EndFlow",
                        "onError": "Exit"
                    },
                    "onError": {
                        "nextNode": "Exit"
                    }
                },
                "EndFlow": {
                    "id": "EndFlow",
                    "type": "End",
                    "properties": {
                        "endType": "normal"
                    }
                },
                "Exit": {
                    "id": "Exit",
                    "type": "End",
                    "properties": {
                        "endType": "normal"
                    }
                }
            },
            "edges": {}
        }
    }

    try:
        response = client.flows_api.post_flow(body=flow_definition)
        print(f"Created Shared Flow ID: {response.id}")
        return response.id
    except ApiException as e:
        print(f"Error: {e}")
        sys.exit(1)

def create_inbound_flow(client: PlatformClient, shared_flow_id: str, flow_name: str) -> str:
    flow_definition = {
        "name": flow_name,
        "type": "inbound",
        "outbound": {},
        "inbound": {
            "enabled": True,
            "queue": {},
            "routing": {
                "type": "flow"
            }
        },
        "draft": {
            "startNode": "CallShared",
            "nodes": {
                "CallShared": {
                    "id": "CallShared",
                    "type": "Flow",
                    "properties": {
                        "flowId": shared_flow_id,
                        "timeoutSeconds": 300
                    },
                    "onComplete": {
                        "nextNode": "CheckResult",
                        "onError": "HandleError"
                    },
                    "onError": {
                        "nextNode": "HandleError"
                    }
                },
                "CheckResult": {
                    "id": "CheckResult",
                    "type": "If",
                    "properties": {
                        "conditions": [
                            {
                                "expression": "{shared_module_result} == '1'",
                                "nextNode": "RouteToAgent"
                            }
                        ],
                        "defaultNextNode": "PlayGoodbye"
                    }
                },
                "RouteToAgent": {
                    "id": "RouteToAgent",
                    "type": "Route",
                    "properties": {
                        "queueId": QUEUE_ID,
                        "skillRequirements": []
                    },
                    "onComplete": {
                        "nextNode": "PlayGoodbye",
                        "onError": "PlayGoodbye"
                    },
                    "onError": {
                        "nextNode": "PlayGoodbye"
                    }
                },
                "PlayGoodbye": {
                    "id": "PlayGoodbye",
                    "type": "Say",
                    "properties": {
                        "sayText": "Thank you.",
                        "sayLanguage": "en-US"
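                    },
                    "onComplete": {
                        "nextNode": "EndFlow",
                        "onError": "EndFlow"
                    },
                    "onError": {
                        "nextNode": "EndFlow"
                    }
                },
                "HandleError": {
                    "id": "HandleError",
                    "type": "Say",
                    "properties": {
                        "sayText": "Error occurred.",
                        "sayLanguage": "en-US"
                    },
                    "onComplete": {
                        "nextNode": "EndFlow",
                        "onError": "EndFlow"
                    },
                    "onError": {
                        "nextNode": "EndFlow"
                    }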
                },
                "EndFlow": {
                    "id": "EndFlow",
                    "type": "End",
                    "properties": {
                        "endType": "normal"
                    }
                }
            },
            "edges": {}
        }
    }

    try:
        response = client.flows_api.post_flow(body=flow_definition)
        print(f"Created Inbound Flow ID: {response.id}")
        return response.id
    except ApiException as e:
        print(f"Error: {e}")
        sys.exit(1)

def publish_flow(client: PlatformClient, flow_id: str) -> None:
    try:
        client.flows_api.post_flow_publish(flow_id=flow_id)
        print(f"Published Flow ID: {flow_id}")
        time.sleep(5)
    except ApiException as e:
        print(f"Error publishing: {e}")
        sys.exit(1)

def main():
    print("Initializing Genesys Cloud Client...")
    client = get_platform_client()
    
    print("Step 1: Creating Shared Flow...")
    shared_flow_id = create_shared_flow(client)
    
    print("Step 2: Publishing Shared Flow...")
    publish_flow(client, shared_flow_id)
    
    print("Step 3: Creating Inbound Flow A...")
    flow_a_id = create_inbound_flow(client, shared_flow_id, "Inbound_Flow_A")
    
    print("Step 4: Publishing Inbound Flow A...")
    publish_flow(client, flow_a_id)
    
    print("Step 5: Creating Inbound Flow B...")
    flow_b_id = create_inbound_flow(client, shared_flow_id, "Inbound_Flow_B")
    
    print("Step 6: Publishing Inbound Flow B...")
    publish_flow(client, flow_b_id)
    
    print("Success! All flows created and published.")
    print(f"Shared Flow ID: {shared_flow_id}")
    print(f"Inbound Flow A ID: {flow_a_id}")
    print(f"Inbound Flow B ID: {flow_b_id}")

if __name__ == "__main__":
    main()
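
The script falls back to placeholder strings when an environment variable is unset, so a misconfigured run only fails once the first API call is made. A guard along these lines could be called at the top of main() to fail earlier; missing_settings and PLACEHOLDERS are hypothetical names introduced for this sketch.

```python
import os
from typing import Dict, List, Mapping

# Environment variable -> placeholder default used by the script above
PLACEHOLDERS: Dict[str, str] = {
    "GENESYS_CLIENT_ID": "YOUR_CLIENT_ID",
    "GENESYS_CLIENT_SECRET": "YOUR_CLIENT_SECRET",
    "GENESYS_QUEUE_ID": "YOUR_QUEUE_ID",
}

def missing_settings(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return names of required variables that are unset, empty, or still placeholders."""
    return [name for name, placeholder in PLACEHOLDERS.items()
            if env.get(name, placeholder) in ("", placeholder)]

# Only the client ID is configured in this example mapping
print(missing_settings({"GENESYS_CLIENT_ID": "abc"}))
# prints: ['GENESYS_CLIENT_SECRET', 'GENESYS_QUEUE_ID']
```

Passing the mapping as a parameter keeps the check testable without touching the real process environment.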

Common Errors & Debugging

Error: 400 Bad Request - Invalid Flow Definition

What causes it: The JSON structure of the flow draft is malformed. Common causes include missing onComplete or onError handlers for nodes, or referencing a nextNode ID that does not exist in the nodes object.
How to fix it: Check that each key in the draft.nodes dictionary matches the id field of its node, and that startNode and every nextNode reference point to an ID that exists within the same flow. Use a JSON validator to check syntax.
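
These reference checks can be run locally before the draft is posted. The function below is a minimal sketch that assumes the node layout used in this tutorial (onComplete/onError handlers, plus conditions and defaultNextNode on If nodes); find_dangling_references is a hypothetical helper, not an SDK call.

```python
from typing import Any, Dict, List

def find_dangling_references(draft: Dict[str, Any]) -> List[str]:
    """Describe every reference to a node ID that is missing from draft['nodes']."""
    nodes = draft.get("nodes", {})
    problems: List[str] = []

    def check(source: str, target: Any) -> None:
        if target not in nodes:
            problems.append(f"{source} -> {target!r} does not exist")

    check("startNode", draft.get("startNode"))
    for key, node in nodes.items():
        if node.get("id") != key:
            problems.append(f"key {key!r} does not match id {node.get('id')!r}")
        # In this layout every value inside a handler is a node ID
        for handler in ("onComplete", "onError"):
            for field, target in node.get(handler, {}).items():
                check(f"{key}.{handler}.{field}", target)
        props = node.get("properties", {})
        for cond in props.get("conditions", []):
            check(f"{key}.conditions", cond.get("nextNode"))
        if "defaultNextNode" in props:
            check(f"{key}.defaultNextNode", props["defaultNextNode"])
    return problems

draft = {
    "startNode": "SayHello",
    "nodes": {
        "SayHello": {"id": "SayHello", "type": "Say",
                     "onComplete": {"nextNode": "GatherInput"}},
    },
}
for problem in find_dangling_references(draft):
    print(problem)
```

An empty list means every reference resolves; anything else pinpoints the node and handler to correct before calling the API.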

Error: 403 Forbidden - Insufficient Scopes

What causes it: The OAuth client does not have the flow:write or routing:write scopes.
How to fix it: In Genesys Cloud, go to Admin → Integrations → OAuth, edit your client, and add the missing scopes. Then re-run the script so that it obtains a new token.

Error: 429 Too Many Requests

What causes it: You are hitting the API rate limit. Genesys Cloud has strict rate limits on Flow creation and publishing.
How to fix it: Implement exponential backoff. In the Python SDK, you can wrap the API calls in a retry loop.

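import time

from genesyscloud.rest import ApiException

def retry_api_call(func, *args, max_retries=3, **kwargs):
    """Call func, retrying with exponential backoff on HTTP 429 responses."""
    for attempt in range(max_retries):
        try:
            return func(*args, **kwargs)
        except ApiException as e:
            # Re-raise anything other than a rate limit, and the final 429,
            # so the caller sees the real error without a pointless last sleep
            if e.status != 429 or attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt  # 1s, 2s, 4s, ...
            print(f"Rate limited. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
    raise ValueError("max_retries must be at least 1")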

# Usage:
# response = retry_api_call(client.flows_api.post_flow, body=flow_definition)
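
When several scripts hit the rate limit at the same moment, a fixed 2 ** attempt schedule makes them all retry in lockstep. One common variation, sketched here under the assumption that a capped "full jitter" delay suits your workload, randomizes each wait; backoff_delay is a hypothetical helper whose result could be used in place of the fixed wait time.

```python
import random
from typing import Callable

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  rand: Callable[[], float] = random.random) -> float:
    """Full-jitter delay: a random fraction of min(cap, base * 2**attempt) seconds."""
    ceiling = min(cap, base * (2 ** attempt))
    return ceiling * rand()

# With a fixed "random" value of 0.5, the delays grow and then stop at half the cap
for attempt in range(7):
    print(attempt, backoff_delay(attempt, rand=lambda: 0.5))
```

Injecting rand keeps the function deterministic under test while defaulting to real randomness in normal use.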

Error: Variable Not Found in Calling Flow

What causes it: The calling flow tries to access a variable set in the shared flow, but the variable name is incorrect or the shared flow did not complete successfully.
How to fix it: Ensure the Set node in the shared flow uses a unique and consistent key (e.g., shared_module_result). Verify that the Flow node in the calling flow has an onComplete handler that leads to a node checking this variable. Check the Flow logs in the Genesys Cloud Admin Console to trace the execution path.
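
A name mismatch between the two flows can be caught before publishing by comparing the keys the shared flow writes with the names the calling flow reads. This sketch assumes the draft layout used in this tutorial (Set nodes with setVars, If nodes with expression strings); variables_written and variables_read are hypothetical helpers.

```python
import re
from typing import Any, Dict, Set

def variables_written(draft: Dict[str, Any]) -> Set[str]:
    """Collect the keys assigned by Set nodes in a draft."""
    keys: Set[str] = set()
    for node in draft.get("nodes", {}).values():
        if node.get("type") == "Set":
            for entry in node.get("properties", {}).get("setVars", []):
                keys.add(entry["key"])
    return keys

def variables_read(draft: Dict[str, Any]) -> Set[str]:
    """Collect plain {name} placeholders used in If-node expressions."""
    names: Set[str] = set()
    for node in draft.get("nodes", {}).values():
        if node.get("type") == "If":
            for cond in node.get("properties", {}).get("conditions", []):
                # Dotted references such as {GatherInput.digits} are skipped on purpose
                names.update(re.findall(r"\{(\w+)\}", cond.get("expression", "")))
    return names

shared = {"nodes": {"SetResult": {"type": "Set", "properties": {
    "setVars": [{"key": "shared_module_result"}]}}}}
caller = {"nodes": {"CheckResult": {"type": "If", "properties": {
    "conditions": [{"expression": "{shared_module_resul} == '1'"}]}}}}

# Names the caller reads that the shared flow never writes
print(variables_read(caller) - variables_written(shared))
# prints: {'shared_module_resul'}
```

An empty set indicates that every variable the calling flow checks is written by the shared flow under the same name.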
