Configuring and Routing External LLM Requests Through NICE Cognigy.AI LLM Gateway in Python
What You Will Build
- A production-ready Python module that registers external model endpoints with automatic API key rotation, injects dynamic variables into prompt templates, streams inference responses over WebSocket, implements fallback logic for provider failures, validates outputs against safety filters and JSON schemas, tracks token usage and latency per dialog session, and exposes a gateway health check endpoint.
- The code uses the NICE Cognigy.AI LLM Gateway REST API and WebSocket streaming interface directly via requests and websockets.
- The implementation is written in Python 3.10+ with strict type hints and production-grade error handling.
Prerequisites
- A NICE Cognigy.AI organization with the LLM Gateway feature enabled
- An API key with the following permissions: llm-gateway:manage, llm-inference:execute, llm-inference:read
- Python 3.10 or higher
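- External dependencies: requests>=2.31, websockets>=14.0, pydantic>=2.5, structlog>=23.0. The streaming code passes additional_headers to websockets.connect, which the default client accepts from websockets 14.0 onward; the older default client expects extra_headers instead.
- An external LLM provider account (OpenAI, Anthropic, or Azure AI) with at least two API keys for rotation testing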
Authentication Setup
Cognigy.AI authenticates gateway configuration and inference requests using a Bearer token derived from your organization API key. The token does not expire, but you must implement credential rotation logic on your side if your security policy requires it. The following setup establishes the base client configuration and validates connectivity before any inference occurs.
import os
import requests
from typing import Optional


class CognigyGatewayClient:
    def __init__(self, org_id: str, api_key: str, base_url: Optional[str] = None):
        self.org_id = org_id
        self.api_key = api_key
        self.base_url = base_url or f"https://{org_id}.cognigy.ai"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def validate_credentials(self) -> bool:
        """Verify API key permissions before proceeding."""
        try:
            response = requests.get(
                f"{self.base_url}/api/v1/llm-gateway/health",
                headers=self.headers,
                timeout=10
            )
            response.raise_for_status()
            return response.status_code == 200
        except requests.exceptions.RequestException as exc:
            raise RuntimeError(f"Authentication failed: {exc}") from exc
The validate_credentials method performs a synchronous health check against the gateway endpoint. A 401 Unauthorized response indicates an invalid or revoked API key. A 403 Forbidden response indicates missing scopes. The method raises a RuntimeError to fail fast during initialization.
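To make the difference between these failure modes easier to surface in logs, here is a minimal sketch of a status-to-diagnosis helper. The function name describe_auth_failure and the message strings are illustrative assumptions, not part of the gateway API; only the 401 and 403 meanings come from the description above.

```python
def describe_auth_failure(status_code: int) -> str:
    """Map a health-check status code to a short diagnosis (hypothetical helper)."""
    if 200 <= status_code < 300:
        return "ok"
    if status_code == 401:
        return "invalid or revoked API key"
    if status_code == 403:
        return "API key is missing required scopes"
    # Anything else is not an authentication problem as described above.
    return f"unexpected status {status_code}"


print(describe_auth_failure(403))
```

A caller could pass exc.response.status_code from a caught requests.exceptions.HTTPError to this helper before raising the RuntimeError.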
Implementation
Step 1: Register External Provider with API Key Rotation
The LLM Gateway supports multiple API keys per provider to prevent downtime during credential rotation. You register the provider with a list of keys. The gateway automatically cycles through valid keys and marks failed keys as unhealthy until they recover.
import time  # used by the streaming code in Step 3
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


# Method of CognigyGatewayClient.
def register_provider_with_rotation(self, provider_name: str, api_keys: list[str], model_id: str) -> dict:
    """Register an external LLM provider with automatic key rotation."""
    payload = {
        "name": provider_name,
        "type": "OPENAI",
        "apiKeyRotation": {
            "enabled": True,
            "keys": api_keys,
            "rotationStrategy": "ROUND_ROBIN_ON_FAILURE"
        },
        "defaultModel": model_id,
        "timeoutMs": 15000,
        "retryOnRateLimit": True
    }
    session = requests.Session()
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        # urllib3 does not retry POST by default, so opt in explicitly.
        allowed_methods=["POST"]
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("https://", adapter)
    try:
        response = session.post(
            f"{self.base_url}/api/v1/llm-gateway/providers",
            headers=self.headers,
            json=payload,
            timeout=15
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as exc:
        # Read the status from the exception rather than a local that may be unbound.
        if exc.response is not None and exc.response.status_code == 409:
            return {"status": "exists", "message": "Provider already registered"}
        raise
The endpoint requires the llm-gateway:manage scope. With the ROUND_ROBIN_ON_FAILURE strategy, if one key hits a 429 or 401, the gateway routes the next request to the next key in the array. The retry adapter is separate from key rotation: it retries the registration call itself when the gateway answers with a transient 429 or 5xx response. Because urllib3 does not retry POST requests by default, the Retry configuration lists POST in allowed_methods.
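The rotation itself happens inside the gateway, so you never implement it yourself. Still, a small local model can help when reasoning about which key serves the next request. The sketch below is an assumption about how a round-robin-on-failure policy could behave, not the gateway's actual implementation; the KeyRing class is hypothetical and does not model key recovery.

```python
from dataclasses import dataclass, field


@dataclass
class KeyRing:
    """Hypothetical local model of round-robin-on-failure key selection."""
    keys: list[str]
    index: int = 0
    unhealthy: set[str] = field(default_factory=set)

    def current(self) -> str:
        return self.keys[self.index]

    def report_failure(self, status_code: int) -> str:
        """Advance to the next key only when the failure is key-related."""
        if status_code in (401, 429):
            self.unhealthy.add(self.current())
            self.index = (self.index + 1) % len(self.keys)
        return self.current()


ring = KeyRing(keys=["key-a", "key-b"])
ring.report_failure(500)  # server error, not key-related: stays on key-a
ring.report_failure(429)  # rate limited: moves on to key-b
print(ring.current())
```

The point of the model is that only key-related statuses move the cursor; a 5xx from the provider says nothing about the key and should not burn through the list.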
Step 2: Construct Prompt Templates with Dynamic Variable Injection
Cognigy.AI evaluates prompt templates server-side before forwarding them to the external model. Templates use double-brace syntax for variable injection. You pass the template string and a variables object in the inference request. The gateway resolves variables at runtime and enforces template length limits.
# Method of CognigyGatewayClient.
def build_inference_request(self, template: str, variables: dict, session_id: str) -> dict:
    """Construct a gateway inference payload with template injection."""
    return {
        "sessionId": session_id,
        "template": template,
        "variables": variables,
        "modelId": "gpt-4-1106-preview",
        "streaming": True,
        "fallbackModels": ["claude-3-haiku-20240307"],
        "safetyFilters": ["profanity", "pii", "self_harm"],
        "responseFormat": {
            "type": "json_schema",
            "schema": {
                "type": "object",
                "properties": {
                    "intent": {"type": "string"},
                    "confidence": {"type": "number", "minimum": 0, "maximum": 1},
                    "entities": {"type": "array", "items": {"type": "string"}}
                },
                "required": ["intent", "confidence", "entities"]
            }
        }
    }
The template field accepts strings like "Extract the user intent from: {{user_message}}. Respond strictly in JSON.". The variables dictionary supplies the values, for example {"user_message": "I want to cancel my subscription"}. The gateway resolves {{user_message}} before sending the prompt to the external model. The responseFormat block enforces JSON schema validation on the output. If the external model returns malformed JSON, the gateway returns a 422 Validation Failed error.
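Because substitution happens server-side, a typo in a variable name only shows up at request time. A local preview can catch that earlier. The following is a minimal sketch of double-brace substitution for testing purposes; render_template is a hypothetical helper, and details such as tolerating whitespace inside the braces are assumptions that may not match the gateway's own resolver.

```python
import re

# Matches {{name}} with optional whitespace inside the braces (an assumption).
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_template(template: str, variables: dict[str, str]) -> str:
    """Replace {{name}} placeholders locally; raise if a variable is missing."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in variables:
            raise KeyError(f"missing template variable: {name}")
        return str(variables[name])

    return _PLACEHOLDER.sub(substitute, template)


preview = render_template(
    "Extract the user intent from: {{user_message}}. Respond strictly in JSON.",
    {"user_message": "I want to cancel my subscription"},
)
print(preview)
```

Running this in a unit test before deployment surfaces missing variables as a KeyError instead of a failed inference call.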
Step 3: Handle Streaming Inference via WebSocket
Streaming inference requires a persistent WebSocket connection. The gateway pushes incremental tokens as they arrive. You must handle connection lifecycle, chunk accumulation, and final metadata extraction.
import asyncio
import json
import time
import websockets
from dataclasses import dataclass
from typing import AsyncGenerator, Optional


@dataclass
class InferenceResult:
    full_response: str = ""
    prompt_tokens: int = 0
    completion_tokens: int = 0
    latency_ms: int = 0
    model_used: str = ""
    error: Optional[str] = None


# Method of CognigyGatewayClient.
async def stream_inference(self, payload: dict) -> AsyncGenerator[InferenceResult, None]:
    """Establish WebSocket connection and stream inference chunks."""
    ws_url = f"wss://{self.org_id}.cognigy.ai/ws/v1/llm-gateway/stream"
    start_time = time.time()
    # The same result object is yielded each time and mutated as chunks arrive.
    result = InferenceResult()
    try:
        async with websockets.connect(ws_url, additional_headers={"Authorization": f"Bearer {self.api_key}"}) as ws:
            await ws.send(json.dumps(payload))
            while True:
                # Bound the wait for each frame so a stalled stream cannot hang forever.
                message = await asyncio.wait_for(ws.recv(), timeout=30.0)
                chunk = json.loads(message)
                if chunk.get("type") == "error":
                    result.error = chunk.get("message", "Unknown streaming error")
                    yield result
                    return
                if chunk.get("type") == "content":
                    result.full_response += chunk.get("delta", "")
                    yield result
                if chunk.get("type") == "complete":
                    result.prompt_tokens = chunk.get("usage", {}).get("prompt_tokens", 0)
                    result.completion_tokens = chunk.get("usage", {}).get("completion_tokens", 0)
                    result.latency_ms = int((time.time() - start_time) * 1000)
                    result.model_used = chunk.get("model", payload.get("modelId", "unknown"))
                    yield result
                    return
    except websockets.exceptions.ConnectionClosed as exc:
        result.error = f"WebSocket closed unexpectedly: {exc}"
        yield result
    except asyncio.TimeoutError:
        result.error = "Inference timed out after 30 seconds"
        yield result
The WebSocket endpoint requires the llm-inference:execute scope. The gateway emits three frame types: content (incremental tokens), complete (final metadata and usage), and error (provider failure or validation error). The asyncio.wait_for wrapper enforces a hard timeout to prevent hung connections. The InferenceResult dataclass accumulates state across yields.
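The frame handling can be separated from the network code, which makes it testable without a live connection. The sketch below folds a list of frames into an accumulated state. The frame shapes (type, delta, usage, message) are taken from the streaming code above; StreamState and apply_frame are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamState:
    text: str = ""
    prompt_tokens: int = 0
    completion_tokens: int = 0
    error: Optional[str] = None
    done: bool = False


def apply_frame(state: StreamState, frame: dict) -> StreamState:
    """Fold one gateway frame into the accumulated state."""
    kind = frame.get("type")
    if kind == "content":
        state.text += frame.get("delta", "")
    elif kind == "complete":
        usage = frame.get("usage", {})
        state.prompt_tokens = usage.get("prompt_tokens", 0)
        state.completion_tokens = usage.get("completion_tokens", 0)
        state.done = True
    elif kind == "error":
        state.error = frame.get("message", "Unknown streaming error")
        state.done = True
    return state


# Simulated frames standing in for messages received over the WebSocket.
frames = [
    {"type": "content", "delta": '{"intent": '},
    {"type": "content", "delta": '"cancel"}'},
    {"type": "complete", "usage": {"prompt_tokens": 12, "completion_tokens": 5}},
]
state = StreamState()
for frame in frames:
    state = apply_frame(state, frame)
print(state.text, state.prompt_tokens, state.completion_tokens)
```

Keeping the reducer pure means the receive loop only has to decode JSON, call apply_frame, and stop when done is set.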
Step 4: Implement Fallback Logic, Safety Validation, and Cost Tracking
The gateway handles fallback routing automatically when you specify fallbackModels in the payload. If the primary model times out or returns a 429, the gateway retries with the next model in the list. You must validate the final output against your schema and log cost metrics per session.
from pydantic import BaseModel, ValidationError
import structlog

logger = structlog.get_logger()


class IntentResponse(BaseModel):
    intent: str
    confidence: float
    entities: list[str]


# Method of CognigyGatewayClient.
def validate_and_log_result(self, result: InferenceResult, session_id: str) -> Optional[IntentResponse]:
    """Validate LLM output against schema and log performance metrics."""
    if result.error:
        logger.warning("inference_failed", session_id=session_id, error=result.error)
        return None
    try:
        parsed = IntentResponse.model_validate_json(result.full_response)
    except ValidationError as exc:
        logger.error("schema_validation_failed", session_id=session_id, raw_output=result.full_response)
        return None
    total_cost = (result.prompt_tokens * 0.000001) + (result.completion_tokens * 0.000002)
    logger.info(
        "inference_completed",
        session_id=session_id,
        model=result.model_used,
        latency_ms=result.latency_ms,
        prompt_tokens=result.prompt_tokens,
        completion_tokens=result.completion_tokens,
        estimated_cost_usd=total_cost
    )
    return parsed
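The validate_and_log_result method uses Pydantic to check the output against the same fields as the JSON schema defined in Step 2. The IntentResponse model as written does not repeat the 0 to 1 bound on confidence, so add a field constraint if you need that check on the client side as well. If the external model returns a string that does not match the model, the method logs the failure and returns None. The cost calculation uses placeholder rates (0.000001 USD per prompt token and 0.000002 USD per completion token) as an example, not any provider's actual pricing. You must adjust the multipliers to match your actual provider rates. The structlog library emits structured logs that integrate with observability platforms like Datadog or Splunk.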
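The method above logs each request on its own. To track usage per dialog session, the numbers need to be aggregated by session ID. Here is one possible sketch using only the standard library; SessionUsage, record_usage, estimate_cost, and the rate constants are hypothetical, and the rates are the same placeholders used above rather than real pricing.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class SessionUsage:
    """Running totals for one dialog session."""
    requests: int = 0
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_latency_ms: int = 0

    @property
    def average_latency_ms(self) -> float:
        return self.total_latency_ms / self.requests if self.requests else 0.0


# Placeholder USD-per-token rates (prompt, completion); not real provider pricing.
PLACEHOLDER_RATES = (0.000001, 0.000002)

usage_by_session: defaultdict[str, SessionUsage] = defaultdict(SessionUsage)


def record_usage(session_id: str, prompt_tokens: int, completion_tokens: int, latency_ms: int) -> SessionUsage:
    """Add one inference call to the running totals of its session."""
    usage = usage_by_session[session_id]
    usage.requests += 1
    usage.prompt_tokens += prompt_tokens
    usage.completion_tokens += completion_tokens
    usage.total_latency_ms += latency_ms
    return usage


def estimate_cost(usage: SessionUsage, rates: tuple[float, float] = PLACEHOLDER_RATES) -> float:
    prompt_rate, completion_rate = rates
    return usage.prompt_tokens * prompt_rate + usage.completion_tokens * completion_rate


record_usage("dialog-9f8a7b6c", 400, 100, 820)
record_usage("dialog-9f8a7b6c", 600, 400, 1180)
session_total = usage_by_session["dialog-9f8a7b6c"]
print(session_total.average_latency_ms, estimate_cost(session_total))
```

In a long-running service the in-memory dictionary would need eviction or an external store; that is outside the scope of this sketch.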
Complete Working Example
import asyncio
import os
import requests
import websockets
import json
import time
from typing import Optional
from dataclasses import dataclass, field
from pydantic import BaseModel, ValidationError
import structlog

structlog.configure(
    processors=[
        structlog.processors.JSONRenderer()
    ],
    logger_factory=structlog.PrintLoggerFactory()
)
logger = structlog.get_logger()


@dataclass
class InferenceResult:
    full_response: str = field(default_factory=str)
    prompt_tokens: int = 0
    completion_tokens: int = 0
    latency_ms: int = 0
    model_used: str = ""
    error: Optional[str] = None


class IntentResponse(BaseModel):
    intent: str
    confidence: float
    entities: list[str]


class CognigyLLMGateway:
    def __init__(self, org_id: str, api_key: str, base_url: Optional[str] = None):
        self.org_id = org_id
        self.api_key = api_key
        self.base_url = base_url or f"https://{org_id}.cognigy.ai"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def check_health(self) -> bool:
        try:
            resp = requests.get(f"{self.base_url}/api/v1/llm-gateway/health", headers=self.headers, timeout=10)
            resp.raise_for_status()
            return True
        except requests.exceptions.RequestException as exc:
            logger.error("health_check_failed", error=str(exc))
            return False

    def register_provider(self, provider_name: str, api_keys: list[str], model_id: str) -> dict:
        payload = {
            "name": provider_name,
            "type": "OPENAI",
            "apiKeyRotation": {"enabled": True, "keys": api_keys, "rotationStrategy": "ROUND_ROBIN_ON_FAILURE"},
            "defaultModel": model_id,
            "timeoutMs": 15000,
            "retryOnRateLimit": True
        }
        try:
            resp = requests.post(f"{self.base_url}/api/v1/llm-gateway/providers", headers=self.headers, json=payload, timeout=15)
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.HTTPError as exc:
            if exc.response.status_code == 409:
                return {"status": "exists"}
            raise

    async def stream_inference(self, template: str, variables: dict, session_id: str) -> InferenceResult:
        payload = {
            "sessionId": session_id,
            "template": template,
            "variables": variables,
            "modelId": "gpt-4-1106-preview",
            "streaming": True,
            "fallbackModels": ["claude-3-haiku-20240307"],
            "safetyFilters": ["profanity", "pii", "self_harm"],
            "responseFormat": {
                "type": "json_schema",
                "schema": {
                    "type": "object",
                    "properties": {
                        "intent": {"type": "string"},
                        "confidence": {"type": "number", "minimum": 0, "maximum": 1},
                        "entities": {"type": "array", "items": {"type": "string"}}
                    },
                    "required": ["intent", "confidence", "entities"]
                }
            }
        }
        ws_url = f"wss://{self.org_id}.cognigy.ai/ws/v1/llm-gateway/stream"
        start_time = time.time()
        result = InferenceResult()
        try:
            async with websockets.connect(ws_url, additional_headers={"Authorization": f"Bearer {self.api_key}"}) as ws:
                await ws.send(json.dumps(payload))
                while True:
                    message = await asyncio.wait_for(ws.recv(), timeout=30.0)
                    chunk = json.loads(message)
                    if chunk.get("type") == "error":
                        result.error = chunk.get("message", "Streaming error")
                        return result
                    if chunk.get("type") == "content":
                        result.full_response += chunk.get("delta", "")
                    if chunk.get("type") == "complete":
                        result.prompt_tokens = chunk.get("usage", {}).get("prompt_tokens", 0)
                        result.completion_tokens = chunk.get("usage", {}).get("completion_tokens", 0)
                        result.latency_ms = int((time.time() - start_time) * 1000)
                        result.model_used = chunk.get("model", "unknown")
                        return result
        except websockets.exceptions.ConnectionClosed as exc:
            result.error = f"WebSocket closed: {exc}"
            return result
        except asyncio.TimeoutError:
            result.error = "Inference timed out"
            return result

    def validate_and_log(self, result: InferenceResult, session_id: str) -> Optional[IntentResponse]:
        if result.error:
            logger.warning("inference_failed", session_id=session_id, error=result.error)
            return None
        try:
            parsed = IntentResponse.model_validate_json(result.full_response)
        except ValidationError:
            logger.error("schema_validation_failed", session_id=session_id, raw=result.full_response)
            return None
        cost = (result.prompt_tokens * 0.000001) + (result.completion_tokens * 0.000002)
        logger.info("inference_completed", session_id=session_id, model=result.model_used,
                    latency_ms=result.latency_ms, tokens=result.prompt_tokens + result.completion_tokens, cost=cost)
        return parsed


async def main():
    org = os.getenv("COGNIGY_ORG_ID")
    key = os.getenv("COGNIGY_API_KEY")
    # Fail fast on missing configuration instead of building a URL like https://None.cognigy.ai.
    if not org or not key:
        raise RuntimeError("Set COGNIGY_ORG_ID and COGNIGY_API_KEY before running")
    gateway = CognigyLLMGateway(org, key)
    if not gateway.check_health():
        raise RuntimeError("Gateway health check failed")
    # "sk-key1" and "sk-key2" are placeholders; load real provider keys from a secret store.
    gateway.register_provider("production-openai", ["sk-key1", "sk-key2"], "gpt-4-1106-preview")
    template = "Extract intent and entities from: {{user_input}}. Return JSON only."
    variables = {"user_input": "I need to reset my password and update my billing address"}
    session = "dialog-9f8a7b6c"
    result = await gateway.stream_inference(template, variables, session)
    validated = gateway.validate_and_log(result, session)
    if validated:
        print(f"Intent: {validated.intent} | Confidence: {validated.confidence}")
    else:
        print("Inference failed validation or encountered an error.")


if __name__ == "__main__":
    asyncio.run(main())
Common Errors & Debugging
Error: 401 Unauthorized on WebSocket Connection
- Cause: The API key is invalid, revoked, or lacks the llm-inference:execute permission.
- Fix: Verify the key in the Cognigy.AI admin console. Ensure the Authorization header matches exactly Bearer <KEY>. Regenerate the key if it was rotated externally.
- Code adjustment: Add explicit scope validation during initialization by calling /api/v1/llm-gateway/health and checking the X-Permissions response header.
Error: 429 Too Many Requests on Provider Registration
- Cause: The gateway rate-limits configuration endpoints to prevent accidental overwrites.
- Fix: Implement exponential backoff. The retry adapter in Step 1 handles this automatically. If you encounter persistent 429s, reduce the frequency of provider updates and cache the registration response.
- Code adjustment: Increase backoff_factor in the Retry configuration to 2 and set total=5.
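To get a feel for how long a client waits under exponential backoff, the delays can be computed ahead of time. The sketch below is a generic doubling schedule with a cap; backoff_delays is a hypothetical helper, and the exact schedule urllib3's Retry applies may differ in detail, so treat the numbers as an approximation of the idea rather than the library's behavior.

```python
def backoff_delays(base_seconds: float, attempts: int, cap_seconds: float = 60.0) -> list[float]:
    """Return a doubling delay schedule, one entry per retry, capped at cap_seconds."""
    return [min(cap_seconds, base_seconds * (2 ** attempt)) for attempt in range(attempts)]


# Five retries starting from a 2 second base delay.
schedule = backoff_delays(2.0, 5)
print(schedule, "total wait:", sum(schedule))
```

Summing the schedule shows the worst-case time a registration call can block, which is worth checking against any startup deadline.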
Error: WebSocket Frame Timeout or ConnectionClosed
- Cause: The external model exceeded the gateway timeout threshold or the network dropped the persistent connection.
- Fix: Increase timeoutMs in the provider configuration. Verify firewall rules allow outbound WebSocket traffic on port 443. The asyncio.wait_for wrapper in Step 3 enforces a client-side timeout to prevent indefinite hangs.
- Code adjustment: Wrap the WebSocket connection in a retry loop that reconnects up to three times before raising a fatal error.
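The reconnect loop suggested above could look roughly like the following. This is a sketch under assumptions: with_reconnect is a hypothetical wrapper, and it expects a coroutine function that raises on connection loss, whereas the streaming code in this tutorial records the failure in result.error instead. To use it with websockets, pass the exception types your stream function raises, for example websockets.exceptions.ConnectionClosed, through retry_on.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_reconnect(
    attempt_fn: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    delay_seconds: float = 1.0,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, asyncio.TimeoutError),
) -> T:
    """Call attempt_fn until it succeeds or max_attempts is exhausted."""
    last_exc = None
    for attempt in range(1, max_attempts + 1):
        try:
            return await attempt_fn()
        except retry_on as exc:
            last_exc = exc
            if attempt < max_attempts:
                # Linear pause between attempts; swap in exponential backoff if preferred.
                await asyncio.sleep(delay_seconds * attempt)
    raise RuntimeError(f"gave up after {max_attempts} attempts") from last_exc


async def demo() -> str:
    calls = {"count": 0}

    async def flaky_stream() -> str:
        # Simulated stream that drops twice before succeeding.
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("connection dropped")
        return "stream finished"

    return await with_reconnect(flaky_stream, delay_seconds=0)


print(asyncio.run(demo()))
```

Note that a reconnect restarts the inference from the beginning, so any partial text accumulated before the drop should be discarded rather than appended to.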
Error: JSON Schema Validation Failed
- Cause: The external model returned unstructured text instead of the requested JSON format, or the schema definition contains invalid JSON Schema syntax.
- Fix: Add explicit system instructions in the template: "Respond strictly in valid JSON matching the provided schema. Do not include markdown formatting." Validate the schema against a JSON Schema linter before deployment.
- Code adjustment: Use pydantic's model_validate_json with strict=True to reject non-conforming outputs immediately.
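Models sometimes wrap otherwise valid JSON in a markdown code fence despite instructions. As a defensive step before schema validation, the wrapper can be removed client-side. The following is a minimal sketch using only the standard library; extract_json is a hypothetical helper, and whether this cleanup is appropriate depends on how strictly you want to treat non-conforming output.

```python
import json

# Built programmatically so the fence marker does not appear literally in this listing.
FENCE = "`" * 3


def extract_json(raw: str) -> dict:
    """Parse a JSON object from model output, tolerating a surrounding markdown fence."""
    text = raw.strip()
    if len(text) >= 2 * len(FENCE) and text.startswith(FENCE) and text.endswith(FENCE):
        body = text[len(FENCE):-len(FENCE)]
        # Drop an optional language tag such as "json" on the opening fence line.
        first_line, _, rest = body.partition("\n")
        tag = first_line.strip()
        text = rest if tag == "" or tag.isalpha() else body
    parsed = json.loads(text)
    if not isinstance(parsed, dict):
        raise ValueError("expected a JSON object")
    return parsed


wrapped = FENCE + 'json\n{"intent": "cancel", "confidence": 0.9, "entities": []}\n' + FENCE
print(extract_json(wrapped)["intent"])
```

The cleaned text can then be passed to the Pydantic model; output that is still not valid JSON raises a ValueError (json.JSONDecodeError is a subclass) and can be logged as a schema failure.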