Reuse Logic Across Inbound Flows: Calling Shared Flows via the Genesys Cloud API

What You Will Build

  • This tutorial shows how to configure several distinct inbound call flows to invoke one shared Genesys Cloud flow by editing their definitions through the REST API.
  • You will use the Genesys Cloud Platform API v2, specifically the Flow Management and Routing endpoints.
  • The code examples are provided in Python using the genesyscloud SDK and raw requests for clarity.

Prerequisites

  • OAuth Client: A Genesys Cloud OAuth client with the flow:read, routing:strategy:read, and routing:strategy:write scopes.
  • SDK Version: genesyscloud Python SDK version 134.0.0 or later.
  • Runtime: Python 3.9+.
  • Dependencies:
    pip install genesyscloud requests
    
  • Genesys Cloud Organization: You must have at least one existing Shared Flow and two distinct Inbound Call Flows in your environment to test the integration.

Authentication Setup

Genesys Cloud uses OAuth 2.0 for authentication. For server-to-server integrations, the Client Credentials flow is the standard approach. You must store the access token securely and handle expiration.

import os
from genesyscloud.auth.api_client import ApiClient
from genesyscloud.auth.oauth2_client import OAuth2Client

def get_auth_client():
    """
    Initializes and returns an authenticated ApiClient.
    """
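    # Load credentials from environment variables
    client_id = os.environ.get("GENESYS_CLIENT_ID")
    client_secret = os.environ.get("GENESYS_CLIENT_SECRET")
    environment = os.environ.get("GENESYS_ENVIRONMENT", "mypurecloud.com")

    # Fail early with a clear message if a credential is missing
    if not client_id or not client_secret:
        raise RuntimeError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")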

    # Configure the OAuth2 client
    oauth_config = {
        "client_id": client_id,
        "client_secret": client_secret,
        "environment": environment
    }

    # Create the OAuth2 client
    oauth_client = OAuth2Client(oauth_config)

    # Initialize the API client with the OAuth client
    api_client = ApiClient(oauth_client)

    # Verify authentication
    if not api_client.is_authenticated:
        raise Exception("Failed to authenticate with Genesys Cloud.")

    return api_client

Required Scopes:

  • flow:read: To list and retrieve shared flows.
  • routing:strategy:read: To inspect existing inbound flows.
  • routing:strategy:write: To update inbound flows with the shared flow reference.
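
The requirement to handle token expiration, mentioned at the start of this section, can be sketched without any SDK. The following is a minimal illustration, not the SDK's own mechanism: TokenCache, fetch_token, and the 60-second safety margin are hypothetical names and values chosen for this example. fetch_token stands in for whatever call obtains a new token and is assumed to return the token string together with its lifetime in seconds.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches an access token and refreshes it shortly before it expires."""

    def __init__(
        self,
        fetch_token: Callable[[], Tuple[str, float]],
        margin_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        # fetch_token is a hypothetical callable returning (token, expires_in_seconds)
        self._fetch_token = fetch_token
        self._margin = margin_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Returns a cached token, fetching a new one if it is missing or near expiry."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            token, expires_in = self._fetch_token()
            self._token = token
            self._expires_at = now + expires_in
        return self._token
```

Injecting the clock keeps the refresh logic testable without waiting for a real token to expire.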

Implementation

Step 1: Identify the Shared Flow

Before you can reference a shared flow, you need its ID. Shared flows and inbound call flows are separate flow types. In this tutorial both are looked up through the same flow search call and told apart by the type field.

We will query the API to find a shared flow by name. This assumes you have named your shared flow uniquely, e.g., “Global_Exception_Handler”.

from genesyscloud.flow.api.flow_api import FlowApi
from genesyscloud.flow.model.flow_entity import FlowEntity

def find_shared_flow_id(api_client: ApiClient, flow_name: str) -> str:
    """
    Searches for a shared flow by name and returns its ID.
    """
    flow_api = FlowApi(api_client)

    # Parameters for the search
    # type: 'shared' filters for shared flows specifically
    # pageSize: 20 is sufficient for most search scenarios
    response = flow_api.post_flow_search(
        body={
            "type": "shared",
            "name": flow_name,
            "pageSize": 20
        }
    )

    if not response.entities:
        raise ValueError(f"No shared flow found with name: {flow_name}")

    # Return the ID of the first match
    return response.entities[0].id

# Usage
# shared_flow_id = find_shared_flow_id(api_client, "Global_Exception_Handler")

Expected Response:
The post_flow_search endpoint returns a paginated list of FlowEntity objects. Each entity contains the id, name, type, and version.

Error Handling:

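  • 401 Unauthorized: Check that your OAuth token is valid and has not expired.
  • 404 Not Found or an empty result: Ensure the flow name is exact and the flow is published. The function above raises ValueError when the search returns no entities.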
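
Returning the first search hit is only safe if the name is unique, as assumed above. A stricter selection can be sketched as follows. pick_exact_match is a hypothetical helper, and it assumes only that each result exposes name and id attributes, as the entities in this tutorial do.

```python
from types import SimpleNamespace


def pick_exact_match(entities, flow_name: str) -> str:
    """Returns the id of the single entity whose name equals flow_name exactly."""
    matches = [e for e in entities if e.name == flow_name]
    if not matches:
        raise ValueError(f"No flow named exactly {flow_name!r}")
    if len(matches) > 1:
        ids = ", ".join(m.id for m in matches)
        raise ValueError(f"Ambiguous name {flow_name!r}; matching ids: {ids}")
    return matches[0].id


# Stand-in search results for illustration only
results = [
    SimpleNamespace(id="a1", name="Global_Exception_Handler"),
    SimpleNamespace(id="b2", name="Global_Exception_Handler_v2"),
]
print(pick_exact_match(results, "Global_Exception_Handler"))  # prints a1
```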

Step 2: Locate the Inbound Call Flows

You need to identify the inbound call flows that will reference the shared flow. Inbound call flows are typically of type call or callback. We will retrieve all inbound call flows to demonstrate how to iterate over them.

def get_inbound_call_flows(api_client: ApiClient, page_size: int = 25) -> list:
    """
    Retrieves a list of inbound call flows.
    """
    # Inbound call flows are searched through FlowApi (imported in Step 1),
    # the same API used for shared flows; only the 'type' filter differs.
    flow_api = FlowApi(api_client)
    
    all_flows = []
    next_page_sequence_id = None
    
    while True:
        search_body = {
            "type": "call",
            "pageSize": page_size
        }
        
        if next_page_sequence_id:
            search_body["pageSequenceId"] = next_page_sequence_id
            
        response = flow_api.post_flow_search(body=search_body)
        
        if not response.entities:
            break
            
        all_flows.extend(response.entities)
        
        # Handle pagination (read and store the cursor under the same attribute name)
        if response.next_page_sequence_id:
            next_page_sequence_id = response.next_page_sequence_id
        else:
            break
            
    return all_flows

Key Detail:
The type parameter is critical. Inbound voice flows are call. Shared flows are shared. Chat flows are chat. Mixing these types in the search will return unintended results.
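
The pagination loop in Step 2 can be separated from the SDK call so that it is testable on its own. This sketch assumes, as the code above does, that each response carries entities and a next_page_sequence_id cursor. iter_pages and the search callable are hypothetical names for this example.

```python
from types import SimpleNamespace
from typing import Callable, Iterator, Optional


def iter_pages(search: Callable[[dict], object], base_body: dict) -> Iterator[object]:
    """Yields every entity across all pages returned by search(body)."""
    cursor: Optional[str] = None
    while True:
        body = dict(base_body)  # copy so the caller's dict is not mutated
        if cursor:
            body["pageSequenceId"] = cursor
        response = search(body)
        if not response.entities:
            return
        yield from response.entities
        cursor = getattr(response, "next_page_sequence_id", None)
        if not cursor:
            return


# Fake two-page backend used only to exercise the loop
def fake_search(body: dict):
    if body.get("pageSequenceId") == "p2":
        return SimpleNamespace(entities=["flow-c"], next_page_sequence_id=None)
    return SimpleNamespace(entities=["flow-a", "flow-b"], next_page_sequence_id="p2")


flows = list(iter_pages(fake_search, {"type": "call", "pageSize": 2}))
print(flows)  # prints ['flow-a', 'flow-b', 'flow-c']
```

Keeping the type filter in base_body means one search never mixes flow types, which is the point of the note above.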

Step 3: Update Inbound Flows to Call the Shared Flow

This is the core logic. You cannot “call” a shared flow at runtime via an API request from an external system in the same way you trigger a webhook. Instead, you configure the inbound flow to invoke the shared flow using the Subflow node.

Since the Genesys Cloud SDK does not provide a direct “add subflow node” method, we must:

  1. Retrieve the inbound flow definition.
  2. Modify the JSON structure to include a Subflow node.
  3. Update the flow definition.

Warning: Modifying flow definitions via API is complex. Flow definitions are JSON documents. You must ensure the JSON is valid and that node IDs are unique.

import json
import uuid
from genesyscloud.flow.api.flow_api import FlowApi

def add_subflow_to_inbound_flow(
    api_client: ApiClient, 
    inbound_flow_id: str, 
    shared_flow_id: str
) -> str:
    """
    Adds a Subflow node to an inbound flow that calls a specific shared flow.
    This is a simplified example. Production code should handle existing nodes
    and proper placement within the flow graph.
    """
    flow_api = FlowApi(api_client)

    # 1. Retrieve the current flow definition
    inbound_flow = flow_api.get_flow(
        flow_id=inbound_flow_id,
        expand=["definition"]
    )

    if not inbound_flow.definition:
        raise Exception(f"Inbound flow {inbound_flow_id} has no definition.")

    # 2. Parse the definition
    flow_def = json.loads(inbound_flow.definition)

    # 3. Create a new Subflow node
    subflow_node_id = str(uuid.uuid4())
    
    subflow_node = {
        "id": subflow_node_id,
        "type": "subflow",
        "version": 1,
        "configuration": {
            "flowId": shared_flow_id,
            "timeoutSeconds": 30,
            "onTimeout": "disconnect"
        },
        "outcomes": [
            {"type": "success", "label": "Success"},
            {"type": "failure", "label": "Failure"}
        ]
    }

    # 4. Add the node to the definition
    # Appending alone leaves the node unreachable: in a real flow you must also
    # point the 'start' node (or another existing node) at this new node.
    # This example only appends the node and assumes the link is made separately,
    # or that the flow has a specific 'subflow_trigger' node being replaced.
    
    if "nodes" not in flow_def:
        flow_def["nodes"] = []
        
    flow_def["nodes"].append(subflow_node)

    # 5. Update the flow
    # Note: You must provide the full definition back.
    update_body = {
        "definition": json.dumps(flow_def),
        "version": inbound_flow.version + 1  # Increment version
    }

    response = flow_api.put_flow(
        flow_id=inbound_flow_id,
        body=update_body
    )

    return response.id

# Usage
# add_subflow_to_inbound_flow(api_client, inbound_flow_id, shared_flow_id)

Critical Consideration:
The put_flow endpoint requires the entire flow definition. Partial updates are not supported for the flow structure itself. You must merge your changes carefully to avoid overwriting other nodes.

OAuth Scope:

  • routing:strategy:write is required to update flows.
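
As the Critical Consideration above notes, the whole definition is sent back on update, so it helps to build the new definition from a copy and to make the change idempotent. The sketch below assumes the node layout used in this tutorial (a top-level nodes list whose subflow nodes carry configuration.flowId). with_subflow_node is a hypothetical helper, not an SDK function.

```python
import copy
import uuid


def with_subflow_node(flow_def: dict, shared_flow_id: str) -> dict:
    """Returns a copy of flow_def that contains one subflow node for shared_flow_id."""
    new_def = copy.deepcopy(flow_def)  # never mutate the fetched definition
    nodes = new_def.setdefault("nodes", [])
    for node in nodes:
        same_type = node.get("type") == "subflow"
        same_flow = node.get("configuration", {}).get("flowId") == shared_flow_id
        if same_type and same_flow:
            return new_def  # already present: nothing to add
    nodes.append({
        "id": str(uuid.uuid4()),
        "type": "subflow",
        "configuration": {"flowId": shared_flow_id},
    })
    return new_def


original = {"nodes": [{"id": "n1", "type": "menu"}]}
once = with_subflow_node(original, "shared-123")
twice = with_subflow_node(once, "shared-123")
print(len(original["nodes"]), len(once["nodes"]), len(twice["nodes"]))  # prints 1 2 2
```

Running the helper twice leaves a single subflow node, so a batch job can be re-run safely after a partial failure.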

Step 4: Validate the Integration

After updating the flows, you should verify that the shared flow is correctly referenced.

def verify_subflow_integration(
    api_client: ApiClient, 
    inbound_flow_id: str, 
    expected_shared_flow_id: str
) -> bool:
    """
    Verifies that an inbound flow contains a subflow node referencing the shared flow.
    """
    flow_api = FlowApi(api_client)

    inbound_flow = flow_api.get_flow(
        flow_id=inbound_flow_id,
        expand=["definition"]
    )

    if not inbound_flow.definition:
        return False

    flow_def = json.loads(inbound_flow.definition)
    
    if "nodes" not in flow_def:
        return False

    for node in flow_def["nodes"]:
        if node.get("type") == "subflow":
            if node.get("configuration", {}).get("flowId") == expected_shared_flow_id:
                return True

    return False

Complete Working Example

Below is a complete, runnable script that ties all steps together. It assumes you have the GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, and GENESYS_ENVIRONMENT set in your environment.

import os
import sys
import json
import uuid
from genesyscloud.auth.api_client import ApiClient
from genesyscloud.auth.oauth2_client import OAuth2Client
from genesyscloud.flow.api.flow_api import FlowApi

def main():
    # 1. Authenticate
    try:
        api_client = get_auth_client()
    except Exception as e:
        print(f"Authentication failed: {e}")
        sys.exit(1)

    # 2. Configuration
    SHARED_FLOW_NAME = "Global_Exception_Handler"
    INBOUND_FLOW_NAME_PREFIX = "Sales_Line_" # Example prefix to filter inbound flows

    # 3. Find Shared Flow
    try:
        shared_flow_id = find_shared_flow_id(api_client, SHARED_FLOW_NAME)
        print(f"Found Shared Flow ID: {shared_flow_id}")
    except ValueError as e:
        print(f"Shared flow not found: {e}")
        sys.exit(1)

    # 4. Find Inbound Flows
    try:
        all_inbound_flows = get_inbound_call_flows(api_client)
        # Filter by name prefix for demonstration
        target_flows = [f for f in all_inbound_flows if f.name.startswith(INBOUND_FLOW_NAME_PREFIX)]
        
        if not target_flows:
            print(f"No inbound flows found with prefix '{INBOUND_FLOW_NAME_PREFIX}'")
            sys.exit(0)
            
        print(f"Found {len(target_flows)} inbound flows to update.")
    except Exception as e:
        print(f"Error retrieving inbound flows: {e}")
        sys.exit(1)

    # 5. Update Each Inbound Flow
    flow_api = FlowApi(api_client)
    
    for inbound_flow in target_flows:
        print(f"Processing flow: {inbound_flow.name} (ID: {inbound_flow.id})")
        
        try:
            # Check if already integrated
            if verify_subflow_integration(api_client, inbound_flow.id, shared_flow_id):
                print(f"  -> Already integrated. Skipping.")
                continue

            # Retrieve current definition
            inbound_flow_full = flow_api.get_flow(
                flow_id=inbound_flow.id,
                expand=["definition"]
            )
            
            if not inbound_flow_full.definition:
                print(f"  -> No definition found. Skipping.")
                continue

            flow_def = json.loads(inbound_flow_full.definition)
            
            # Create Subflow Node
            subflow_node_id = str(uuid.uuid4())
            subflow_node = {
                "id": subflow_node_id,
                "type": "subflow",
                "version": 1,
                "configuration": {
                    "flowId": shared_flow_id,
                    "timeoutSeconds": 30,
                    "onTimeout": "disconnect"
                },
                "outcomes": [
                    {"type": "success", "label": "Success"},
                    {"type": "failure", "label": "Failure"}
                ]
            }
            
            # Add to nodes
            if "nodes" not in flow_def:
                flow_def["nodes"] = []
            
            flow_def["nodes"].append(subflow_node)
            
            # Update Flow
            update_body = {
                "definition": json.dumps(flow_def),
                "version": inbound_flow_full.version + 1
            }
            
            flow_api.put_flow(
                flow_id=inbound_flow.id,
                body=update_body
            )
            
            print(f"  -> Successfully added subflow node.")
            
        except Exception as e:
            print(f"  -> Error updating flow {inbound_flow.name}: {e}")

    print("Batch update complete.")

def get_auth_client():
    client_id = os.environ.get("GENESYS_CLIENT_ID")
    client_secret = os.environ.get("GENESYS_CLIENT_SECRET")
    environment = os.environ.get("GENESYS_ENVIRONMENT", "mypurecloud.com")

    oauth_config = {
        "client_id": client_id,
        "client_secret": client_secret,
        "environment": environment
    }

    oauth_client = OAuth2Client(oauth_config)
    api_client = ApiClient(oauth_client)

    if not api_client.is_authenticated:
        raise Exception("Failed to authenticate with Genesys Cloud.")

    return api_client

def find_shared_flow_id(api_client: ApiClient, flow_name: str) -> str:
    flow_api = FlowApi(api_client)
    response = flow_api.post_flow_search(
        body={
            "type": "shared",
            "name": flow_name,
            "pageSize": 20
        }
    )

    if not response.entities:
        raise ValueError(f"No shared flow found with name: {flow_name}")

    return response.entities[0].id

def get_inbound_call_flows(api_client: ApiClient, page_size: int = 25) -> list:
    flow_api = FlowApi(api_client)
    all_flows = []
    next_page_sequence_id = None

    while True:
        search_body = {
            "type": "call",
            "pageSize": page_size
        }

        if next_page_sequence_id:
            search_body["pageSequenceId"] = next_page_sequence_id

        response = flow_api.post_flow_search(body=search_body)

        if not response.entities:
            break

        all_flows.extend(response.entities)

        if response.next_page_sequence_id:
            next_page_sequence_id = response.next_page_sequence_id
        else:
            break

    return all_flows

def verify_subflow_integration(
    api_client: ApiClient, 
    inbound_flow_id: str, 
    expected_shared_flow_id: str
) -> bool:
    flow_api = FlowApi(api_client)

    inbound_flow = flow_api.get_flow(
        flow_id=inbound_flow_id,
        expand=["definition"]
    )

    if not inbound_flow.definition:
        return False

    flow_def = json.loads(inbound_flow.definition)

    if "nodes" not in flow_def:
        return False

    for node in flow_def["nodes"]:
        if node.get("type") == "subflow":
            if node.get("configuration", {}).get("flowId") == expected_shared_flow_id:
                return True

    return False

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 409 Conflict

Cause: The flow version provided in the put_flow request does not match the current version in Genesys Cloud. This happens if another process updated the flow between your get and put calls.
Fix: Implement a retry mechanism with exponential backoff. Re-fetch the flow definition before retrying the update.

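import time
from genesyscloud.flow.api.flow_api import FlowApi

def update_flow_with_retry(api_client, flow_id, update_body, max_retries=3):
    # put_flow is a FlowApi method, not an ApiClient method
    flow_api = FlowApi(api_client)
    for attempt in range(max_retries):
        try:
            flow_api.put_flow(flow_id=flow_id, body=update_body)
            return True
        except Exception as e:
            if "409" in str(e) and attempt < max_retries - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                time.sleep(2 ** attempt)
                # Re-fetch definition here in production
                continue
            raise  # re-raise with the original traceback
    return False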
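
Resending the same body after a conflict may simply conflict again if the version has moved on. A variant that re-fetches before each attempt might look like the following. It is a sketch under stated assumptions: ConflictError, fetch, modify, and save are hypothetical stand-ins for the 409 error, get_flow, your edit logic, and put_flow.

```python
import time
from typing import Callable


class ConflictError(Exception):
    """Hypothetical stand-in for an HTTP 409 raised by the update call."""


def update_with_refetch(
    fetch: Callable[[], dict],
    modify: Callable[[dict], dict],
    save: Callable[[dict], None],
    max_retries: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Fetch, modify, save; on conflict, back off and start over from a fresh fetch.

    Returns the number of attempts used.
    """
    for attempt in range(max_retries):
        current = fetch()            # always work from the latest copy
        try:
            save(modify(current))
            return attempt + 1
        except ConflictError:
            if attempt == max_retries - 1:
                raise                # out of retries: surface the conflict
            sleep(2 ** attempt)      # exponential backoff: 1s, 2s, 4s, ...
    raise ValueError("max_retries must be at least 1")
```

Passing the three operations in as callables keeps the retry policy independent of any particular SDK call.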

Error: 400 Bad Request

Cause: The JSON definition provided in put_flow is invalid. This often occurs due to duplicate node IDs, missing required fields in nodes, or circular references.
Fix: Validate the JSON structure locally before sending. Use a JSON linter or schema validator. Ensure all node IDs are unique UUIDs.
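
A local pre-flight check for some of the problems listed above can be sketched as follows. It assumes the nodes list layout used in this tutorial and checks only for duplicate or missing id values and a missing type. find_node_problems is a hypothetical helper, and the real definition schema may require more.

```python
from collections import Counter
from typing import List


def find_node_problems(flow_def: dict) -> List[str]:
    """Returns human-readable problems found in flow_def['nodes'] (empty if none)."""
    problems = []
    nodes = flow_def.get("nodes", [])
    ids = [n.get("id") for n in nodes]
    for index, node in enumerate(nodes):
        if not node.get("id"):
            problems.append(f"node at index {index} has no id")
        if not node.get("type"):
            problems.append(f"node at index {index} has no type")
    for node_id, count in Counter(i for i in ids if i).items():
        if count > 1:
            problems.append(f"duplicate node id {node_id!r} appears {count} times")
    return problems


sample = {"nodes": [
    {"id": "a", "type": "subflow"},
    {"id": "a", "type": "menu"},
    {"type": "menu"},
]}
for problem in find_node_problems(sample):
    print(problem)
# prints:
# node at index 2 has no id
# duplicate node id 'a' appears 2 times
```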

Error: 429 Too Many Requests

Cause: You are hitting the Genesys Cloud API rate limit. This is common when iterating over many flows.
Fix: Implement rate limiting in your script. Genesys Cloud returns Retry-After headers in 429 responses. Parse this header and wait before retrying.

import time

def api_call_with_rate_limit(api_func, *args, max_attempts=5, **kwargs):
    # Bounded loop so a persistent 429 cannot retry forever
    for attempt in range(max_attempts):
        try:
            return api_func(*args, **kwargs)
        except Exception as e:
            if "429" in str(e) and attempt < max_attempts - 1:
                # Extract Retry-After if possible, else default to 1 second
                retry_after = 1
                # Logic to parse Retry-After header from response object would go here
                time.sleep(retry_after)
            else:
                raise  # re-raise with the original traceback
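
The placeholder for reading Retry-After can be filled in along these lines. The header may hold either a number of seconds or an HTTP date, so this sketch handles both. parse_retry_after is a hypothetical helper, and how you obtain the headers mapping from the SDK's exception object is left as an assumption.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Mapping, Optional


def parse_retry_after(
    headers: Mapping[str, str],
    default: float = 1.0,
    now: Optional[datetime] = None,
) -> float:
    """Returns the number of seconds to wait, falling back to default."""
    # Header names are case-insensitive, so normalize before the lookup
    lowered = {k.lower(): v for k, v in headers.items()}
    value = lowered.get("retry-after")
    if value is None:
        return default
    value = value.strip()
    try:
        return max(0.0, float(value))          # delta-seconds form
    except ValueError:
        pass
    try:
        target = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return default
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (target - now).total_seconds())


print(parse_retry_after({"Retry-After": "7"}))  # prints 7.0
print(parse_retry_after({}))                    # prints 1.0
```

The returned value can be passed straight to time.sleep in the retry loop above.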

Official References