Invoking Genesys Cloud Custom Code via API with Python
What You Will Build
- A production-ready Python module that triggers Genesys Cloud custom code functions, polls for asynchronous results, caches outputs, tracks execution metrics, and maintains structured audit logs.
- Uses the Genesys Cloud REST API and the
genesyscloud-pythonSDK. - Written in Python 3.9+ with
httpx,cachetools, and standard library modules.
Prerequisites
- OAuth Client: Service Account or JWT with
customcode:executeandcustomcode:viewscopes. - SDK:
genesyscloud-pythonversion 110.0.0 or higher. - Runtime: Python 3.9+
- External Dependencies:
pip install genesyscloud-python httpx cachetools - Network: Outbound HTTPS access to
api.mypurecloud.comor your environment domain.
Authentication Setup
Genesys Cloud OAuth2 handles token issuance and refresh automatically when using the SDK configuration object. You must provide your environment domain, client ID, and client secret. The SDK caches the access token and refreshes it transparently before expiration.
from genesyscloud.platform_client import PlatformClient
from genesyscloud.auth import OAuthClientCredentialsClient
def build_platform_client(environment: str, client_id: str, client_secret: str) -> PlatformClient:
"""Constructs a configured PlatformClient with OAuth2 credentials."""
platform = PlatformClient()
platform.set_base_url(f"https://{environment}")
oauth = OAuthClientCredentialsClient(
platform,
client_id=client_id,
client_secret=client_secret,
scope="customcode:execute customcode:view"
)
platform.set_auth_client(oauth)
return platform
The scope parameter explicitly requests customcode:execute for invocation and customcode:view for definition validation. The SDK manages token lifecycle automatically. You do not need manual refresh logic.
Implementation
Step 1: Construct Invocation Payload and Validate Permissions
Before invoking custom code, you must retrieve the definition to verify sandbox constraints and validate input parameter mappings. Genesys Cloud custom code does not accept raw function signatures in the payload. You reference the deployed code by its customCodeId and map inputs to the keys expected by the script.
import httpx
from typing import Any, Dict, Optional
from genesyscloud.platform_client.rest import ApiException
from genesyscloud.custom_code_api import CustomCodeApi
class CustomCodeInvoker:
def __init__(self, platform: PlatformClient):
self.custom_code_api = CustomCodeApi(platform)
self.http_client = httpx.Client(base_url=platform.get_base_url(), timeout=30.0)
def fetch_definition(self, custom_code_id: str) -> Dict[str, Any]:
"""Retrieves custom code definition to validate sandbox constraints."""
try:
response = self.custom_code_api.get_customcode_by_id(custom_code_id)
return response.to_dict()
except ApiException as e:
raise RuntimeError(f"Failed to fetch custom code definition: {e.status} {e.body}") from e
def validate_inputs(self, definition: Dict[str, Any], inputs: Dict[str, Any]) -> None:
"""Validates input keys against expected parameters and sandbox limits."""
allowed_inputs = definition.get("inputs", [])
for key in inputs:
if key not in allowed_inputs:
raise ValueError(f"Input parameter '{key}' is not defined in custom code schema.")
sandbox = definition.get("sandbox", {})
if sandbox.get("timeout", 30) < 30:
raise PermissionError("Custom code sandbox timeout is restricted below minimum threshold.")
HTTP Request/Response Cycle
GET /api/v2/customcode/{customCodeId} HTTP/1.1
Host: api.mypurecloud.com
Authorization: Bearer eyJhbGciOiJSUzI1NiIs...
Accept: application/json
HTTP/1.1 200 OK
Content-Type: application/json
{
"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"name": "DataEnrichmentFunction",
"inputs": ["customerId", "sessionId"],
"sandbox": {
"timeout": 60,
"memory": 256,
"allowedPackages": ["requests", "json"]
},
"status": "active"
}
Step 2: Execute and Handle Asynchronous Polling with Timeout Management
Custom code execution is asynchronous. The POST endpoint returns an execution ID immediately. You must poll the execution endpoint until the status transitions to completed, failed, or timed_out. The implementation below includes exponential backoff for 429 rate limits and a hard timeout boundary.
import time
import uuid
from datetime import datetime, timezone
from genesyscloud.custom_code_api import CustomCodeApi
def execute_with_polling(
self,
custom_code_id: str,
inputs: Dict[str, Any],
timeout_seconds: int = 120
) -> Dict[str, Any]:
"""Invokes custom code and polls for completion with timeout and 429 retry logic."""
execution_id = self._post_execution(custom_code_id, inputs)
start_time = time.time()
poll_interval = 1.0
while time.time() - start_time < timeout_seconds:
try:
result = self.custom_code_api.get_customcode_executions_by_executionid(execution_id)
status = result.status
if status in ("completed", "failed", "timed_out"):
return {"executionId": execution_id, "status": status, "result": result.result, "error": result.error}
time.sleep(poll_interval)
poll_interval = min(poll_interval * 1.5, 8.0)
except ApiException as e:
if e.status == 429:
retry_after = float(e.headers.get("Retry-After", poll_interval * 2))
time.sleep(retry_after)
continue
raise RuntimeError(f"Polling failed: {e.status} {e.body}") from e
raise TimeoutError(f"Custom code execution exceeded {timeout_seconds} second limit.")
def _post_execution(self, custom_code_id: str, inputs: Dict[str, Any]) -> str:
"""Sends the initial execution request and returns the execution ID."""
try:
body = {"customCodeId": custom_code_id, "inputs": inputs, "timeout": 30}
response = self.custom_code_api.post_customcode_executions(body=body)
return response.id
except ApiException as e:
raise RuntimeError(f"Execution POST failed: {e.status} {e.body}") from e
Step 3: Cache Results, Track Metrics, and Generate Audit Logs
Repeated calls with identical inputs waste compute resources. A TTL cache stores results locally. Metrics track latency and success rates. Audit logs record every invocation for compliance.
import hashlib
import json
import logging
from cachetools import TTLCache
from datetime import datetime, timezone
def __init__(self, platform: PlatformClient, cache_ttl: int = 300):
super().__init__(platform)
self.cache = TTLCache(maxsize=1024, ttl=cache_ttl)
self.metrics = {"total_calls": 0, "cache_hits": 0, "avg_latency_ms": 0.0}
self.audit_logger = logging.getLogger("customcode.audit")
self.audit_logger.setLevel(logging.INFO)
handler = logging.FileHandler("customcode_audit.log")
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
self.audit_logger.addHandler(handler)
def _get_cache_key(self, custom_code_id: str, inputs: Dict[str, Any]) -> str:
payload = json.dumps({"id": custom_code_id, "inputs": inputs}, sort_keys=True)
return hashlib.sha256(payload.encode()).hexdigest()
def invoke_cached(
self,
custom_code_id: str,
inputs: Dict[str, Any],
timeout_seconds: int = 120
) -> Dict[str, Any]:
cache_key = self._get_cache_key(custom_code_id, inputs)
if cache_key in self.cache:
self.metrics["cache_hits"] += 1
self.metrics["total_calls"] += 1
return self.cache[cache_key]
self.metrics["total_calls"] += 1
start = time.time()
try:
definition = self.fetch_definition(custom_code_id)
self.validate_inputs(definition, inputs)
result = self.execute_with_polling(custom_code_id, inputs, timeout_seconds)
latency_ms = (time.time() - start) * 1000
self.metrics["avg_latency_ms"] = (
(self.metrics["avg_latency_ms"] * (self.metrics["total_calls"] - 1) + latency_ms)
/ self.metrics["total_calls"]
)
self.cache[cache_key] = result
self._write_audit_log(custom_code_id, inputs, result, latency_ms, status="success")
return result
except Exception as e:
latency_ms = (time.time() - start) * 1000
error_detail = {
"type": type(e).__name__,
"message": str(e),
"traceback": "".join(traceback.format_exception(type(e), e, e.__traceback__))
}
self._write_audit_log(custom_code_id, inputs, error_detail, latency_ms, status="error")
raise
def _write_audit_log(
self,
custom_code_id: str,
inputs: Dict[str, Any],
payload: Any,
latency_ms: float,
status: str
) -> None:
log_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"customCodeId": custom_code_id,
"inputs": inputs,
"latencyMs": round(latency_ms, 2),
"status": status,
"payload": payload
}
self.audit_logger.info(json.dumps(log_entry))
Step 4: Expose a Custom Code Sandbox for Unit Testing
You cannot rely on the production environment for unit tests. This sandbox class mimics the execution lifecycle, validates input schemas, and simulates asynchronous status transitions without network calls.
import threading
from enum import Enum
from typing import Callable, Optional
class ExecutionStatus(Enum):
QUEUED = "queued"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class LocalCustomCodeSandbox:
"""Offline sandbox for unit testing custom code invocation logic."""
def __init__(self):
self.executions: Dict[str, Dict[str, Any]] = {}
self.mock_function: Optional[Callable] = None
def register_function(self, func: Callable) -> None:
self.mock_function = func
def simulate_post(self, custom_code_id: str, inputs: Dict[str, Any]) -> str:
execution_id = str(uuid.uuid4())
self.executions[execution_id] = {
"id": execution_id,
"status": ExecutionStatus.QUEUED.value,
"result": None,
"error": None
}
def _run():
time.sleep(0.5)
self.executions[execution_id]["status"] = ExecutionStatus.RUNNING.value
try:
result = self.mock_function(**inputs) if self.mock_function else inputs
self.executions[execution_id]["status"] = ExecutionStatus.COMPLETED.value
self.executions[execution_id]["result"] = result
except Exception as e:
self.executions[execution_id]["status"] = ExecutionStatus.FAILED.value
self.executions[execution_id]["error"] = str(e)
threading.Thread(target=_run, daemon=True).start()
return execution_id
def simulate_get(self, execution_id: str) -> Dict[str, Any]:
if execution_id not in self.executions:
raise KeyError("Execution not found")
return self.executions[execution_id]
Complete Working Example
The following script integrates authentication, invocation, caching, metrics, audit logging, and sandbox testing into a single runnable module. Replace the credentials and custom code ID before execution.
import sys
import traceback
import time
import uuid
import json
import logging
from typing import Any, Dict, Optional
from cachetools import TTLCache
from genesyscloud.platform_client import PlatformClient
from genesyscloud.auth import OAuthClientCredentialsClient
from genesyscloud.custom_code_api import CustomCodeApi
from genesyscloud.platform_client.rest import ApiException
class CustomCodeInvoker:
def __init__(self, platform: PlatformClient, cache_ttl: int = 300):
self.custom_code_api = CustomCodeApi(platform)
self.cache = TTLCache(maxsize=1024, ttl=cache_ttl)
self.metrics = {"total_calls": 0, "cache_hits": 0, "avg_latency_ms": 0.0}
self.audit_logger = logging.getLogger("customcode.audit")
self.audit_logger.setLevel(logging.INFO)
handler = logging.FileHandler("customcode_audit.log")
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
self.audit_logger.addHandler(handler)
def fetch_definition(self, custom_code_id: str) -> Dict[str, Any]:
try:
response = self.custom_code_api.get_customcode_by_id(custom_code_id)
return response.to_dict()
except ApiException as e:
raise RuntimeError(f"Failed to fetch custom code definition: {e.status} {e.body}") from e
def validate_inputs(self, definition: Dict[str, Any], inputs: Dict[str, Any]) -> None:
allowed_inputs = definition.get("inputs", [])
for key in inputs:
if key not in allowed_inputs:
raise ValueError(f"Input parameter '{key}' is not defined in custom code schema.")
sandbox = definition.get("sandbox", {})
if sandbox.get("timeout", 30) < 30:
raise PermissionError("Custom code sandbox timeout is restricted below minimum threshold.")
def execute_with_polling(self, custom_code_id: str, inputs: Dict[str, Any], timeout_seconds: int = 120) -> Dict[str, Any]:
execution_id = self._post_execution(custom_code_id, inputs)
start_time = time.time()
poll_interval = 1.0
while time.time() - start_time < timeout_seconds:
try:
result = self.custom_code_api.get_customcode_executions_by_executionid(execution_id)
status = result.status
if status in ("completed", "failed", "timed_out"):
return {"executionId": execution_id, "status": status, "result": result.result, "error": result.error}
time.sleep(poll_interval)
poll_interval = min(poll_interval * 1.5, 8.0)
except ApiException as e:
if e.status == 429:
retry_after = float(e.headers.get("Retry-After", poll_interval * 2))
time.sleep(retry_after)
continue
raise RuntimeError(f"Polling failed: {e.status} {e.body}") from e
raise TimeoutError(f"Custom code execution exceeded {timeout_seconds} second limit.")
def _post_execution(self, custom_code_id: str, inputs: Dict[str, Any]) -> str:
try:
body = {"customCodeId": custom_code_id, "inputs": inputs, "timeout": 30}
response = self.custom_code_api.post_customcode_executions(body=body)
return response.id
except ApiException as e:
raise RuntimeError(f"Execution POST failed: {e.status} {e.body}") from e
def _get_cache_key(self, custom_code_id: str, inputs: Dict[str, Any]) -> str:
payload = json.dumps({"id": custom_code_id, "inputs": inputs}, sort_keys=True)
return hashlib.sha256(payload.encode()).hexdigest()
def invoke_cached(self, custom_code_id: str, inputs: Dict[str, Any], timeout_seconds: int = 120) -> Dict[str, Any]:
import hashlib
cache_key = self._get_cache_key(custom_code_id, inputs)
if cache_key in self.cache:
self.metrics["cache_hits"] += 1
self.metrics["total_calls"] += 1
return self.cache[cache_key]
self.metrics["total_calls"] += 1
start = time.time()
try:
definition = self.fetch_definition(custom_code_id)
self.validate_inputs(definition, inputs)
result = self.execute_with_polling(custom_code_id, inputs, timeout_seconds)
latency_ms = (time.time() - start) * 1000
self.metrics["avg_latency_ms"] = (
(self.metrics["avg_latency_ms"] * (self.metrics["total_calls"] - 1) + latency_ms)
/ self.metrics["total_calls"]
)
self.cache[cache_key] = result
self._write_audit_log(custom_code_id, inputs, result, latency_ms, status="success")
return result
except Exception as e:
latency_ms = (time.time() - start) * 1000
error_detail = {
"type": type(e).__name__,
"message": str(e),
"traceback": "".join(traceback.format_exception(type(e), e, e.__traceback__))
}
self._write_audit_log(custom_code_id, inputs, error_detail, latency_ms, status="error")
raise
def _write_audit_log(self, custom_code_id: str, inputs: Dict[str, Any], payload: Any, latency_ms: float, status: str) -> None:
log_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"customCodeId": custom_code_id,
"inputs": inputs,
"latencyMs": round(latency_ms, 2),
"status": status,
"payload": payload
}
self.audit_logger.info(json.dumps(log_entry))
def build_platform_client(environment: str, client_id: str, client_secret: str) -> PlatformClient:
platform = PlatformClient()
platform.set_base_url(f"https://{environment}")
oauth = OAuthClientCredentialsClient(
platform,
client_id=client_id,
client_secret=client_secret,
scope="customcode:execute customcode:view"
)
platform.set_auth_client(oauth)
return platform
if __name__ == "__main__":
ENV = "api.mypurecloud.com"
CLIENT_ID = "YOUR_CLIENT_ID"
CLIENT_SECRET = "YOUR_CLIENT_SECRET"
CUSTOM_CODE_ID = "YOUR_CUSTOM_CODE_ID"
platform = build_platform_client(ENV, CLIENT_ID, CLIENT_SECRET)
invoker = CustomCodeInvoker(platform)
try:
result = invoker.invoke_cached(CUSTOM_CODE_ID, {"customerId": "10024", "sessionId": "abc-123"})
print(json.dumps(result, indent=2))
print(f"Metrics: {invoker.metrics}")
except Exception as e:
print(f"Invocation failed: {e}", file=sys.stderr)
sys.exit(1)
Common Errors & Debugging
Error: 401 Unauthorized or 403 Forbidden
- Cause: Missing
customcode:executescope, expired token, or incorrect environment domain. - Fix: Verify the OAuth client credentials match the environment. Ensure the scope string includes both
customcode:executeandcustomcode:view. The SDK refreshes tokens automatically, but initial credential errors will fail immediately. - Code Fix:
# Verify scope configuration
oauth = OAuthClientCredentialsClient(platform, client_id=client_id, client_secret=client_secret, scope="customcode:execute customcode:view")
Error: 429 Too Many Requests
- Cause: Polling frequency exceeds Genesys Cloud rate limits. Custom code execution polling triggers per-second quotas.
- Fix: Implement exponential backoff. The provided
execute_with_pollingmethod reads theRetry-Afterheader and scales the interval up to 8 seconds. - Code Fix:
except ApiException as e:
if e.status == 429:
retry_after = float(e.headers.get("Retry-After", poll_interval * 2))
time.sleep(retry_after)
continue
Error: 504 Gateway Timeout or timed_out Status
- Cause: Custom code execution exceeded the sandbox timeout limit or blocked on an external network call.
- Fix: Increase the
timeoutfield in the POST payload or optimize the custom code script. Genesys Cloud enforces hard sandbox limits that cannot be bypassed client-side. - Code Fix:
body = {"customCodeId": custom_code_id, "inputs": inputs, "timeout": 60}
Error: Input Parameter Mismatch
- Cause: The
inputsdictionary contains keys not defined in the custom code schema. - Fix: Run
validate_inputsbefore invocation. Genesys Cloud rejects payloads with undefined keys at the API layer. - Code Fix:
allowed_inputs = definition.get("inputs", [])
for key in inputs:
if key not in allowed_inputs:
raise ValueError(f"Input parameter '{key}' is not defined in custom code schema.")