Offload Cognigy.AI Intent Resolution to an External LLM Gateway with Context and Token Management
What You Will Build
- A Python Flask microservice that intercepts user utterances from NICE Cognigy.AI, maintains a strict conversation context window, enforces token usage limits, routes structured prompts to an OpenAI-compatible LLM gateway, and returns classified intents back to Cognigy.
- The service uses the Cognigy.AI REST API (/api/v1/bot and /api/v1/dialog/sessions/{id}/messages) for session synchronization and the OpenAI chat/completions endpoint for intent classification.
- The implementation is written in Python 3.10+ using Flask, httpx, tiktoken, and pydantic.
Prerequisites
- Cognigy.AI API key with Bot Execution, Dialog Management, and Session Variables permissions
- LLM gateway endpoint supporting OpenAI-compatible chat completions with a valid API key
- Python 3.10 or higher
- pip install "flask[async]" httpx tiktoken pydantic python-dotenv (the async extra is needed because the route in Step 3 is an async view)
- Environment variables: COGNIGY_ORG, COGNIGY_API_KEY, COGNIGY_BOT_ID, LLM_API_KEY, LLM_BASE_URL, LLM_MODEL, MAX_CONTEXT_TOKENS
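A missing variable otherwise surfaces late, for example as a base URL of https://None.cognigy.ai. A minimal startup check might look like the sketch below. It assumes the required variables are the four that the code reads without a default (COGNIGY_ORG, COGNIGY_API_KEY, LLM_API_KEY, and COGNIGY_BOT_ID, which the sync step reads); the helper name missing_env is illustrative and not part of any library.

```python
from typing import Iterable, List, Mapping

# Variables the service reads without a fallback default (an assumption
# based on the os.getenv calls in this guide).
REQUIRED_VARS = ("COGNIGY_ORG", "COGNIGY_API_KEY", "COGNIGY_BOT_ID", "LLM_API_KEY")


def missing_env(required: Iterable[str], environ: Mapping[str, str]) -> List[str]:
    """Return the names that are unset or blank after stripping whitespace."""
    return [name for name in required if not environ.get(name, "").strip()]


# Typical use at startup, after load_dotenv():
#   import os
#   missing = missing_env(REQUIRED_VARS, os.environ)
#   if missing:
#       raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
```

Passing the environment in as a mapping keeps the check easy to test without touching the real process environment.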
Authentication Setup
Cognigy.AI authenticates external services using API keys passed as Bearer tokens. The LLM gateway uses standard Bearer token authentication. Both clients require explicit retry logic for rate limits and transient failures.
import os

import httpx
from dotenv import load_dotenv

load_dotenv()

COGNIGY_API_KEY = os.getenv("COGNIGY_API_KEY")
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_BASE_URL = os.getenv("LLM_BASE_URL", "https://api.openai.com/v1")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o")


def create_cognigy_client() -> httpx.AsyncClient:
    # retries=3 applies to failed connection attempts only
    transport = httpx.AsyncHTTPTransport(retries=3)
    return httpx.AsyncClient(
        base_url=f"https://{os.getenv('COGNIGY_ORG')}.cognigy.ai/api/v1",
        transport=transport,
        headers={"Authorization": f"Bearer {COGNIGY_API_KEY}", "Content-Type": "application/json"},
        timeout=httpx.Timeout(30.0, connect=10.0),
    )


def create_llm_client() -> httpx.AsyncClient:
    transport = httpx.AsyncHTTPTransport(retries=3)
    return httpx.AsyncClient(
        base_url=LLM_BASE_URL,
        transport=transport,
        headers={"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json"},
        # LLM completions can be slow, so allow a longer read timeout
        timeout=httpx.Timeout(60.0, connect=10.0),
    )
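The Cognigy client requires Bot Execution and Dialog Management permissions. The LLM client requires standard API access. Both clients use httpx.AsyncHTTPTransport(retries=3), which retries only failed connection attempts such as connect errors and connect timeouts. It does not retry on 5xx or 429 status codes; Step 1 adds explicit backoff for 429, and 5xx responses are surfaced to the caller.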
Implementation
Step 1: Initialize HTTP Clients and Retry Logic
Rate limiting is a common failure mode when offloading to external LLM gateways. The httpx transport retries only connection failures, so 429 responses require explicit backoff logic. The following function wraps the LLM call with exponential backoff.
import asyncio

import httpx


async def call_llm_with_retry(client: httpx.AsyncClient, payload: dict, max_retries: int = 4) -> dict:
    for attempt in range(max_retries):
        try:
            response = await client.post("/chat/completions", json=payload)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as exc:
            # Only 429 is retried; every other HTTP error propagates immediately
            if exc.response.status_code == 429 and attempt < max_retries - 1:
                wait_time = 2 ** attempt  # 1, 2, 4, ... seconds
                print(f"Rate limited. Retrying in {wait_time} seconds...")
                await asyncio.sleep(wait_time)
            else:
                raise
        except httpx.RequestError as exc:
            # Network-level failure: retry without delay until attempts run out
            print(f"Network error on attempt {attempt + 1}: {exc}")
            if attempt == max_retries - 1:
                raise
    # Reached only when max_retries is 0 or negative
    raise ValueError("max_retries must be at least 1")
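This function catches 429 Too Many Requests and retries with exponential backoff (1, 2, then 4 seconds with the default max_retries of 4). Any other HTTP error is re-raised immediately. A network failure is retried without delay and re-raised after the final attempt.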
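The wait schedule is easier to reason about when it is separated from the HTTP call. The sketch below computes the delays that a loop like the one above sleeps for, and adds optional full jitter so that many workers throttled at the same moment do not retry in lockstep. The name backoff_delays, the cap, and the jitter option are illustrative additions, not part of the original function.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int = 4, base: float = 1.0, cap: float = 30.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Delays slept between attempts: base * 2**attempt, capped at `cap`.

    No delay follows the final attempt, so the list has max_retries - 1 items.
    Passing `rng` enables full jitter: each delay is drawn uniformly from
    [0, delay] instead of being used as-is.
    """
    delays = [min(cap, base * 2 ** attempt) for attempt in range(max(0, max_retries - 1))]
    if rng is not None:
        delays = [rng.uniform(0.0, d) for d in delays]
    return delays


print(backoff_delays())  # [1.0, 2.0, 4.0]
```

With jitter enabled, for example backoff_delays(rng=random.Random()), each worker picks a different point inside its window, which spreads retries out after a shared throttling event.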
Step 2: Build the Context Window and Token Budget Manager
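LLM context windows have hard limits. Maintaining a sliding window requires accurate token counting and deterministic trimming. The tiktoken library implements the BPE tokenizers used by OpenAI models, so content token counts are exact for those models; for other models behind the gateway the counts are only an estimate.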
import tiktoken
from typing import Dict, List


class ConversationContextManager:
    def __init__(self, model_encoding: str, max_tokens: int):
        # model_encoding is a model name such as "gpt-4o"
        try:
            self.encoder = tiktoken.encoding_for_model(model_encoding)
        except KeyError:
            # Model name unknown to tiktoken (common behind non-OpenAI gateways):
            # fall back to a general-purpose encoding and treat counts as estimates
            self.encoder = tiktoken.get_encoding("cl100k_base")
        self.max_tokens = max_tokens
        # session_id -> ordered list of {"role": ..., "content": ...}
        self.history: Dict[str, List[Dict[str, str]]] = {}

    def _count_tokens(self, messages: List[Dict[str, str]]) -> int:
        total = 0
        for msg in messages:
            total += len(self.encoder.encode(msg["content"]))
            total += 4  # Approximate per-message overhead for role and formatting
        return total

    def add_message(self, session_id: str, role: str, content: str) -> None:
        if session_id not in self.history:
            self.history[session_id] = []
        self.history[session_id].append({"role": role, "content": content})
        self._enforce_token_limit(session_id)

    def _enforce_token_limit(self, session_id: str) -> None:
        # Drop oldest messages first; always keep at least the newest one
        while self._count_tokens(self.history[session_id]) > self.max_tokens and len(self.history[session_id]) > 1:
            self.history[session_id].pop(0)

    def get_context(self, session_id: str) -> List[Dict[str, str]]:
        return self.history.get(session_id, [])
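The manager stores messages per session identifier in process memory. The _enforce_token_limit method removes the oldest messages until the token count is at or below max_tokens or only one message remains. The 4 tokens added per message approximate the role and formatting overhead that OpenAI counts toward the limit. The system prompt is prepended later, in Step 3, and is not counted against this budget. Session histories are never evicted, so a long-running deployment needs an expiry policy.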
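To see the trimming behavior without installing tiktoken, the following sketch applies the same drop-oldest rule with a pluggable counter. The whitespace word count is a stand-in assumption for a real tokenizer, and trim_to_budget is a hypothetical helper rather than part of the class above.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]


def word_count(text: str) -> int:
    """Stand-in tokenizer: counts whitespace-separated words, not real tokens."""
    return len(text.split())


def trim_to_budget(messages: List[Message], max_tokens: int,
                   count: Callable[[str], int] = word_count,
                   per_message_overhead: int = 4) -> List[Message]:
    """Drop the oldest messages until the total fits, always keeping the newest."""
    kept = list(messages)  # copy so the caller's list is not mutated

    def total() -> int:
        return sum(count(m["content"]) + per_message_overhead for m in kept)

    while total() > max_tokens and len(kept) > 1:
        kept.pop(0)
    return kept


history = [
    {"role": "user", "content": "I want to book a flight to Berlin"},
    {"role": "assistant", "content": '{"intent": "booking_flight"}'},
    {"role": "user", "content": "Actually cancel my reservation instead"},
]
# The three messages cost 12 + 6 + 9 = 27 units; a budget of 16 keeps the last two.
trimmed = trim_to_budget(history, max_tokens=16)
print([m["role"] for m in trimmed])  # ['assistant', 'user']
```

Swapping word_count for a function built on the tiktoken encoder reproduces the manager's behavior while keeping the trimming rule independently testable.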
Step 3: Implement the Intent Resolution Endpoint and Cognigy Sync
The Flask route receives the Cognigy.AI HTTP Action payload, extracts the session identifier and user message, updates the context window, calls the LLM gateway, parses the structured response, and synchronizes the result back to Cognigy.AI session variables.
import json
import os

import httpx
from flask import Flask, request, jsonify

# create_cognigy_client, create_llm_client, call_llm_with_retry and
# ConversationContextManager come from the previous sections.

app = Flask(__name__)
cognigy_client = create_cognigy_client()
llm_client = create_llm_client()
context_manager = ConversationContextManager(
    model_encoding=os.getenv("LLM_MODEL", "gpt-4o"),
    max_tokens=int(os.getenv("MAX_CONTEXT_TOKENS", "4000")),
)

SYSTEM_PROMPT = """You are an intent classification engine. Analyze the user message and conversation history.
Return ONLY a JSON object with this structure:
{"intent": "string", "confidence": float, "entities": []}
Valid intents: booking_flight, changing_reservation, canceling_reservation, general_inquiry, escalation_required"""


@app.route("/resolve-intent", methods=["POST"])
async def resolve_intent():
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({"error": "Missing JSON payload"}), 400

        session_id = payload.get("session", {}).get("id")
        user_message = payload.get("user", {}).get("message", "")
        if not session_id or not user_message:
            return jsonify({"error": "Missing session.id or user.message"}), 400

        # Record the utterance, then send the trimmed history to the LLM
        context_manager.add_message(session_id, "user", user_message)
        context = context_manager.get_context(session_id)
        llm_messages = [{"role": "system", "content": SYSTEM_PROMPT}] + context

        llm_payload = {
            "model": os.getenv("LLM_MODEL", "gpt-4o"),
            "messages": llm_messages,
            "temperature": 0.0,
            "max_tokens": 150,
            "response_format": {"type": "json_object"},
        }

        llm_response = await call_llm_with_retry(llm_client, llm_payload)
        raw_content = llm_response["choices"][0]["message"]["content"]
        intent_data = json.loads(raw_content)

        context_manager.add_message(session_id, "assistant", json.dumps(intent_data))
        await sync_to_cognigy(session_id, intent_data)

        return jsonify({
            "status": "success",
            "intent": intent_data["intent"],
            "confidence": intent_data["confidence"],
            "entities": intent_data.get("entities", []),
        })
    except httpx.HTTPStatusError as exc:
        status = exc.response.status_code
        if status == 401:
            return jsonify({"error": "Authentication failed. Verify API keys."}), 401
        if status == 403:
            return jsonify({"error": "Forbidden. Check Cognigy permissions or LLM model access."}), 403
        return jsonify({"error": f"HTTP {status} from external service"}), 502
    except json.JSONDecodeError:
        return jsonify({"error": "LLM returned invalid JSON"}), 500
    except Exception as exc:
        return jsonify({"error": f"Internal processing failure: {str(exc)}"}), 500


async def sync_to_cognigy(session_id: str, intent_data: dict) -> None:
    bot_id = os.getenv("COGNIGY_BOT_ID")
    sync_payload = {
        "botId": bot_id,
        "sessionId": session_id,
        "variables": {
            "resolved_intent": intent_data["intent"],
            "intent_confidence": str(intent_data["confidence"]),
            "llm_processed": "true",
        },
    }
    try:
        response = await cognigy_client.post("/bot", json=sync_payload)
        response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        # A failed sync is logged but does not fail the request
        print(f"Cognigy sync failed: {exc.response.status_code} - {exc.response.text}")
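The /bot POST call requires the Bot Execution permission. The /resolve-intent route parses the LLM output as JSON and maps upstream failures to HTTP status codes: 401 and 403 are passed through, other upstream HTTP errors become 502, and invalid JSON from the LLM becomes 500. The sync_to_cognigy function updates Cognigy session variables so downstream flow nodes can branch on resolved_intent; a failed sync is logged but does not fail the request.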
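The route trusts whatever keys the model returns, so a missing intent field or an out-of-range confidence only surfaces as a generic 500. One way to tighten this is to validate the parsed object before it is stored or synced. The sketch below uses only the standard library; IntentResult and parse_intent are hypothetical names, the allowed intents are copied from the system prompt, and the 0 to 1 confidence range is an assumption the prompt does not state. A pydantic model could express the same rules.

```python
import json
from dataclasses import dataclass, field
from typing import Any, List

VALID_INTENTS = {
    "booking_flight", "changing_reservation", "canceling_reservation",
    "general_inquiry", "escalation_required",
}


@dataclass
class IntentResult:
    intent: str
    confidence: float
    entities: List[Any] = field(default_factory=list)


def parse_intent(raw: str) -> IntentResult:
    """Parse and validate the LLM's JSON reply; raise ValueError on any problem."""
    data = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    intent = data.get("intent")
    if not isinstance(intent, str) or intent not in VALID_INTENTS:
        raise ValueError(f"unknown intent: {intent!r}")
    confidence = data.get("confidence")
    # bool is a subclass of int, so exclude it explicitly
    if isinstance(confidence, bool) or not isinstance(confidence, (int, float)):
        raise ValueError("confidence must be a number")
    if not 0.0 <= confidence <= 1.0:
        raise ValueError("confidence must be between 0 and 1")
    entities = data.get("entities", [])
    if not isinstance(entities, list):
        raise ValueError("entities must be a list")
    return IntentResult(intent, float(confidence), entities)
```

In the route, parse_intent(raw_content) could replace json.loads(raw_content); because json.JSONDecodeError subclasses ValueError, a single except ValueError branch would then cover both malformed JSON and schema violations.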
Complete Working Example
import asyncio
import json
import os
from typing import Dict, List

import httpx
import tiktoken
from dotenv import load_dotenv
from flask import Flask, request, jsonify

load_dotenv()

COGNIGY_ORG = os.getenv("COGNIGY_ORG")
COGNIGY_API_KEY = os.getenv("COGNIGY_API_KEY")
COGNIGY_BOT_ID = os.getenv("COGNIGY_BOT_ID")
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_BASE_URL = os.getenv("LLM_BASE_URL", "https://api.openai.com/v1")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o")
MAX_CONTEXT_TOKENS = int(os.getenv("MAX_CONTEXT_TOKENS", "4000"))

SYSTEM_PROMPT = """You are an intent classification engine. Analyze the user message and conversation history.
Return ONLY a JSON object with this structure:
{"intent": "string", "confidence": float, "entities": []}
Valid intents: booking_flight, changing_reservation, canceling_reservation, general_inquiry, escalation_required"""


class ConversationContextManager:
    def __init__(self, model_encoding: str, max_tokens: int):
        try:
            self.encoder = tiktoken.encoding_for_model(model_encoding)
        except KeyError:
            # Model name unknown to tiktoken: fall back to a general encoding
            self.encoder = tiktoken.get_encoding("cl100k_base")
        self.max_tokens = max_tokens
        self.history: Dict[str, List[Dict[str, str]]] = {}

    def _count_tokens(self, messages: List[Dict[str, str]]) -> int:
        total = 0
        for msg in messages:
            total += len(self.encoder.encode(msg["content"]))
            total += 4  # Approximate per-message overhead
        return total

    def add_message(self, session_id: str, role: str, content: str) -> None:
        if session_id not in self.history:
            self.history[session_id] = []
        self.history[session_id].append({"role": role, "content": content})
        self._enforce_token_limit(session_id)

    def _enforce_token_limit(self, session_id: str) -> None:
        # Drop oldest messages first; always keep at least the newest one
        while self._count_tokens(self.history[session_id]) > self.max_tokens and len(self.history[session_id]) > 1:
            self.history[session_id].pop(0)

    def get_context(self, session_id: str) -> List[Dict[str, str]]:
        return self.history.get(session_id, [])


def create_cognigy_client() -> httpx.AsyncClient:
    return httpx.AsyncClient(
        base_url=f"https://{COGNIGY_ORG}.cognigy.ai/api/v1",
        transport=httpx.AsyncHTTPTransport(retries=3),  # connection retries only
        headers={"Authorization": f"Bearer {COGNIGY_API_KEY}", "Content-Type": "application/json"},
        timeout=httpx.Timeout(30.0, connect=10.0),
    )


def create_llm_client() -> httpx.AsyncClient:
    return httpx.AsyncClient(
        base_url=LLM_BASE_URL,
        transport=httpx.AsyncHTTPTransport(retries=3),  # connection retries only
        headers={"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json"},
        timeout=httpx.Timeout(60.0, connect=10.0),
    )


async def call_llm_with_retry(client: httpx.AsyncClient, payload: dict, max_retries: int = 4) -> dict:
    for attempt in range(max_retries):
        try:
            response = await client.post("/chat/completions", json=payload)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as exc:
            # Only 429 is retried; other HTTP errors propagate immediately
            if exc.response.status_code == 429 and attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)
            else:
                raise
        except httpx.RequestError:
            if attempt == max_retries - 1:
                raise
    # Reached only when max_retries is 0 or negative
    raise ValueError("max_retries must be at least 1")


cognigy_client = create_cognigy_client()
llm_client = create_llm_client()
context_manager = ConversationContextManager(LLM_MODEL, MAX_CONTEXT_TOKENS)
app = Flask(__name__)


@app.route("/resolve-intent", methods=["POST"])
async def resolve_intent():
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({"error": "Missing JSON payload"}), 400

        session_id = payload.get("session", {}).get("id")
        user_message = payload.get("user", {}).get("message", "")
        if not session_id or not user_message:
            return jsonify({"error": "Missing session.id or user.message"}), 400

        context_manager.add_message(session_id, "user", user_message)
        context = context_manager.get_context(session_id)
        llm_messages = [{"role": "system", "content": SYSTEM_PROMPT}] + context

        llm_payload = {
            "model": LLM_MODEL,
            "messages": llm_messages,
            "temperature": 0.0,
            "max_tokens": 150,
            "response_format": {"type": "json_object"},
        }

        llm_response = await call_llm_with_retry(llm_client, llm_payload)
        raw_content = llm_response["choices"][0]["message"]["content"]
        intent_data = json.loads(raw_content)
        context_manager.add_message(session_id, "assistant", json.dumps(intent_data))

        sync_payload = {
            "botId": COGNIGY_BOT_ID,
            "sessionId": session_id,
            "variables": {
                "resolved_intent": intent_data["intent"],
                "intent_confidence": str(intent_data["confidence"]),
                "llm_processed": "true",
            },
        }
        try:
            response = await cognigy_client.post("/bot", json=sync_payload)
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            # A failed sync is logged but does not fail the request
            print(f"Cognigy sync failed: {exc.response.status_code} - {exc.response.text}")

        return jsonify({
            "status": "success",
            "intent": intent_data["intent"],
            "confidence": intent_data["confidence"],
            "entities": intent_data.get("entities", []),
        })
    except httpx.HTTPStatusError as exc:
        status = exc.response.status_code
        if status == 401:
            return jsonify({"error": "Authentication failed. Verify API keys."}), 401
        if status == 403:
            return jsonify({"error": "Forbidden. Check permissions."}), 403
        return jsonify({"error": f"HTTP {status} from external service"}), 502
    except json.JSONDecodeError:
        return jsonify({"error": "LLM returned invalid JSON"}), 500
    except Exception as exc:
        return jsonify({"error": f"Internal processing failure: {str(exc)}"}), 500


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
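Run the script with python app.py. The service listens on port 5000 using Flask's development server. Configure the Cognigy.AI HTTP Action node to POST to http://<host>:5000/resolve-intent with the request body set to {{payload}}. The body must contain session.id and user.message, which are the two fields the route reads.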
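Before wiring up the HTTP Action node, the endpoint can be exercised from a script. The following sketch builds the body shape the route reads (session.id and user.message) using only the standard library. The host, the session identifier, and the helper name are placeholders chosen for illustration.

```python
import json
import urllib.request


def build_resolve_intent_request(base_url: str, session_id: str, message: str) -> urllib.request.Request:
    """Build a POST request whose JSON body matches what /resolve-intent reads."""
    body = {"session": {"id": session_id}, "user": {"message": message}}
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/resolve-intent",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_resolve_intent_request(
    "http://localhost:5000", "demo-session-1", "I need to change my flight"
)

# To send it against a running service:
# with urllib.request.urlopen(req, timeout=70) as resp:
#     print(json.loads(resp.read()))
```

Reusing the same session identifier across several calls is a quick way to confirm that the context window accumulates and trims as expected.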
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: Missing or expired API key in the Authorization header. Cognigy.AI rejects requests without valid Bearer tokens. The LLM gateway may return 401 when the key lacks access to the specified model.
- Fix: Verify COGNIGY_API_KEY and LLM_API_KEY in the environment. Regenerate the Cognigy key if it was rotated. Ensure the LLM key has read access to the target model.
- Code check: Inspect the key before sending, logging only a masked form rather than the full header value. Confirm the key does not contain leading or trailing whitespace.
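Printing a full key into logs leaks the secret. A safer check, sketched here with a hypothetical describe_key helper, reports only the length, the last four characters, and whether stray whitespace surrounds the value.

```python
from typing import Optional


def describe_key(value: Optional[str]) -> str:
    """Summarize an API key for logs without revealing it."""
    if not value:
        return "missing"
    stripped = value.strip()
    whitespace = "yes" if stripped != value else "no"
    # Only show a tail for keys long enough that four characters reveal little
    tail = stripped[-4:] if len(stripped) > 8 else ""
    return f"length={len(stripped)} tail=...{tail} surrounding_whitespace={whitespace}"


# A key copied with a trailing newline is flagged without being printed in full
print(describe_key("sk-test-1234567890abcd\n"))
```

Calling describe_key on each configured key at startup gives enough information to spot a truncated or padded value.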
Error: 403 Forbidden
- Cause: The Cognigy API key lacks Bot Execution or Session Variables permissions. The LLM gateway may restrict access to certain models for the provided key.
- Fix: In the Cognigy.AI console, navigate to Settings, API Keys, and assign the required permissions. For the LLM gateway, verify model access tiers.
- Code check: The sync_to_cognigy function in Step 3 logs the status code and response body when Cognigy rejects the request. Read the error message to identify the missing permission.
Error: 429 Too Many Requests
- Cause: The LLM gateway enforces rate limits on requests or tokens per minute. High concurrency from Cognigy flow execution triggers throttling.
- Fix: The retry logic implements exponential backoff. Increase max_retries if traffic spikes persist. For production workloads, queue requests with Celery or RQ instead of retrying inline in the Flask worker.
- Code check: Monitor the Retry-After header in 429 responses. Adjust backoff multipliers accordingly.
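The retry function in Step 1 does not read Retry-After. A small helper can prefer the server's hint when it is present and fall back to exponential backoff otherwise. This sketch handles only the delay-in-seconds form of the header (an HTTP-date value falls back to backoff); retry_delay is a hypothetical name and the 60-second cap is an arbitrary assumption.

```python
from typing import Mapping


def retry_delay(headers: Mapping[str, str], attempt: int, cap: float = 60.0) -> float:
    """Seconds to wait before the next attempt after a 429 response."""
    fallback = min(cap, float(2 ** attempt))
    # Plain dicts are case-sensitive, so check both common spellings
    raw = headers.get("Retry-After") or headers.get("retry-after")
    if raw is None:
        return fallback
    try:
        seconds = float(raw.strip())
    except ValueError:
        return fallback  # HTTP-date form or malformed value
    if seconds < 0:
        return fallback
    return min(cap, seconds)
```

Inside call_llm_with_retry, the sleep could then become await asyncio.sleep(retry_delay(exc.response.headers, attempt)), since the headers object on an httpx response supports get.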
Error: Context Window Overflow or Token Mismatch
- Cause: The tiktoken encoding does not match the tokenizer of the LLM model. Some providers count system prompts differently.
- Fix: Use tiktoken.encoding_for_model(LLM_MODEL) to load the correct tokenizer. Set MAX_CONTEXT_TOKENS about 5 percent below the limit you intend to stay under, so that metadata overhead does not push the request over.
- Code check: Log the token count before and after trimming. Verify the encoder matches the exact model identifier (e.g., gpt-4o-2024-05-13).
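The budget arithmetic can be made explicit. The sketch below derives a history budget from a context limit by subtracting a safety margin, the completion allowance (max_tokens is 150 in the request built by the route), and the system prompt. The function name, the 5 percent default, and the example numbers are illustrative assumptions; substitute the real limit for the model in use.

```python
def history_budget(context_limit: int, completion_tokens: int,
                   system_prompt_tokens: int, margin_percent: int = 5) -> int:
    """Tokens left for conversation history after reserving everything else."""
    if not 0 <= margin_percent < 100:
        raise ValueError("margin_percent must be in [0, 100)")
    # Integer arithmetic avoids floating-point rounding surprises
    after_margin = context_limit * (100 - margin_percent) // 100
    usable = after_margin - completion_tokens - system_prompt_tokens
    if usable <= 0:
        raise ValueError("nothing left for history; lower the reservations")
    return usable


# Made-up numbers: an 8,000-token limit, 150 completion tokens,
# and a 60-token system prompt.
print(history_budget(8000, 150, 60))  # 7390
```

The result is a candidate value for MAX_CONTEXT_TOKENS, because the context manager counts only the conversation history.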