Offload Cognigy.AI Intent Resolution to an External LLM Gateway with Context and Token Management

What You Will Build

  • A Python Flask microservice that intercepts user utterances from NICE Cognigy.AI, maintains a strict conversation context window, enforces token usage limits, routes structured prompts to an OpenAI-compatible LLM gateway, and returns classified intents back to Cognigy.
  • The service uses the Cognigy.AI REST API for session synchronization and the OpenAI-compatible chat/completions endpoint for intent classification. The code shown here calls only /api/v1/bot; /api/v1/dialog/sessions/{id}/messages is not used in the examples.
  • The implementation is written in Python 3.10+ using Flask, httpx, tiktoken, and pydantic.

Prerequisites

  • Cognigy.AI API key with Bot Execution, Dialog Management, and Session Variables permissions
  • LLM gateway endpoint supporting OpenAI-compatible chat completions with a valid API key
  • Python 3.10 or higher
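  • pip install "flask[async]" httpx tiktoken pydantic python-dotenv (the async extra is needed because the route handler is an async def view)
  • Environment variables: COGNIGY_ORG, COGNIGY_API_KEY, COGNIGY_BOT_ID, LLM_API_KEY, LLM_BASE_URL, LLM_MODEL, MAX_CONTEXT_TOKENS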

Authentication Setup

Cognigy.AI authenticates external services using API keys passed as Bearer tokens. The LLM gateway uses standard Bearer token authentication. Both clients require explicit retry logic for rate limits and transient failures.

import httpx
import os
from dotenv import load_dotenv

load_dotenv()

COGNIGY_API_KEY = os.getenv("COGNIGY_API_KEY")
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_BASE_URL = os.getenv("LLM_BASE_URL", "https://api.openai.com/v1")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o")

def create_cognigy_client() -> httpx.AsyncClient:
    transport = httpx.AsyncHTTPTransport(retries=3)
    return httpx.AsyncClient(
        base_url=f"https://{os.getenv('COGNIGY_ORG')}.cognigy.ai/api/v1",
        transport=transport,
        headers={"Authorization": f"Bearer {COGNIGY_API_KEY}", "Content-Type": "application/json"},
        timeout=httpx.Timeout(30.0, connect=10.0)
    )

def create_llm_client() -> httpx.AsyncClient:
    transport = httpx.AsyncHTTPTransport(retries=3)
    return httpx.AsyncClient(
        base_url=LLM_BASE_URL,
        transport=transport,
        headers={"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json"},
        timeout=httpx.Timeout(60.0, connect=10.0)
    )

The Cognigy client requires Bot Execution and Dialog Management permissions. The LLM client requires standard API access. Note that httpx.AsyncHTTPTransport(retries=3) only retries failed connection attempts. It does not retry 5xx or 429 responses, so response-level retries are handled explicitly in Step 1.
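Because both client factories read their credentials from the environment, a missing variable only shows up later as a malformed URL or a 401. The following is a minimal sketch of a startup check, assuming the variable names used in this guide; the helper names missing_env_vars and require_env are hypothetical and not part of any library.

```python
import os

# Variables the code in this guide reads without a default value.
REQUIRED_VARS = ("COGNIGY_ORG", "COGNIGY_API_KEY", "COGNIGY_BOT_ID", "LLM_API_KEY")

def missing_env_vars(env=None, required=REQUIRED_VARS):
    """Return the names of required variables that are unset or blank."""
    env = os.environ if env is None else env
    return [name for name in required if not (env.get(name) or "").strip()]

def require_env(env=None):
    """Raise early, with every missing name listed, instead of failing on the first request."""
    missing = missing_env_vars(env)
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
```

Calling require_env() right after load_dotenv() would surface configuration problems before the first Cognigy request arrives.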

Implementation

Step 1: Initialize HTTP Clients and Retry Logic

Rate limiting is the most common failure mode when offloading to external LLM gateways. The httpx transport-level retries only cover failed connection attempts, so 429 responses require explicit backoff logic. The following function wraps the LLM call with exponential backoff.

import asyncio
import httpx

async def call_llm_with_retry(client: httpx.AsyncClient, payload: dict, max_retries: int = 4) -> dict:
    for attempt in range(max_retries):
        try:
            response = await client.post("/chat/completions", json=payload)
            response.raise_for_status()
            return response.json()
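        except httpx.HTTPStatusError as exc:
            # Retry only on 429; any other HTTP error is raised immediately.
            if exc.response.status_code != 429 or attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            print(f"Rate limited. Retrying in {wait_time} seconds...")
            await asyncio.sleep(wait_time)
        except httpx.RequestError as exc:
            print(f"Network error on attempt {attempt + 1}: {exc}")
            if attempt == max_retries - 1:
                raise
            # Back off before retrying so a struggling gateway is not hammered.
            await asyncio.sleep(2 ** attempt)
    # Only reachable when max_retries < 1; avoids silently returning None.
    raise RuntimeError("max_retries must be at least 1")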

This function retries 429 Too Many Requests responses with exponential backoff and retries network failures until the final attempt. Any other HTTP error is re-raised immediately.
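A fixed 2 ** attempt schedule makes every throttled worker retry at the same instant. One common variation is capped exponential backoff with full jitter. The sketch below is an illustration only: the function name backoff_delay and the base and cap values are assumptions, not part of the service above.

```python
import random

def backoff_delay(attempt, base=1.0, cap=30.0, rng=random.random):
    """Return a delay in seconds for a zero-based retry attempt.

    The ceiling doubles with each attempt (base * 2**attempt) up to `cap`,
    and the actual delay is a random fraction of that ceiling (full jitter).
    """
    ceiling = min(cap, base * (2 ** attempt))
    return rng() * ceiling
```

The value could replace wait_time in the retry loop, for example await asyncio.sleep(backoff_delay(attempt)).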

Step 2: Build the Context Window and Token Budget Manager

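LLM context windows have hard limits. Maintaining a sliding window requires accurate token counting and deterministic trimming. The tiktoken library implements the byte-pair encodings used by OpenAI models, so its counts match OpenAI's for message content. For other models served through the gateway, treat the counts as estimates.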

import tiktoken
from typing import List, Dict, Optional

class ConversationContextManager:
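    def __init__(self, model_encoding: str, max_tokens: int):
        # model_encoding is a model name such as "gpt-4o", not an encoding name.
        try:
            self.encoder = tiktoken.encoding_for_model(model_encoding)
        except KeyError:
            # Model unknown to tiktoken (e.g. a non-OpenAI model): fall back to an approximation.
            self.encoder = tiktoken.get_encoding("cl100k_base")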
        self.max_tokens = max_tokens
        self.history: Dict[str, List[Dict[str, str]]] = {}

    def _count_tokens(self, messages: List[Dict[str, str]]) -> int:
        total = 0
        for msg in messages:
            total += len(self.encoder.encode(msg["content"]))
            total += 4  # Account for role and name metadata overhead
        return total

    def add_message(self, session_id: str, role: str, content: str) -> None:
        if session_id not in self.history:
            self.history[session_id] = []
        self.history[session_id].append({"role": role, "content": content})
        self._enforce_token_limit(session_id)

    def _enforce_token_limit(self, session_id: str) -> None:
        while self._count_tokens(self.history[session_id]) > self.max_tokens and len(self.history[session_id]) > 1:
            self.history[session_id].pop(0)

    def get_context(self, session_id: str) -> List[Dict[str, str]]:
        return self.history.get(session_id, [])

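The manager stores messages per session identifier. The _enforce_token_limit method removes the oldest messages until the token count no longer exceeds max_tokens or only one message remains, so a single oversized message is kept rather than dropped. The 4 tokens added per message approximate the role and formatting overhead that OpenAI counts toward the limit; the exact figure varies by model.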
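The trimming behavior is easier to see on a small example. The sketch below reproduces the same oldest-first loop as a standalone function, with a whitespace word count standing in for tiktoken so it runs without extra dependencies. The function name trim_history and the word-count stand-in are assumptions for illustration; real token counts will differ.

```python
def trim_history(messages, max_tokens, count=lambda text: len(text.split()), overhead=4):
    """Drop the oldest messages until the estimated total fits, keeping at least one.

    `count` is a stand-in tokenizer (whitespace words), not tiktoken.
    `overhead` mirrors the per-message allowance used by the manager.
    """
    trimmed = list(messages)  # work on a copy so the caller's list is untouched

    def total(msgs):
        return sum(count(m["content"]) + overhead for m in msgs)

    while total(trimmed) > max_tokens and len(trimmed) > 1:
        trimmed.pop(0)
    return trimmed


history = [
    {"role": "user", "content": "book a flight to Berlin"},   # 5 words + 4
    {"role": "assistant", "content": "booking_flight"},       # 1 word  + 4
    {"role": "user", "content": "actually cancel it"},        # 3 words + 4
]
recent = trim_history(history, max_tokens=15)
```

With a budget of 15 the first message is dropped and the two most recent ones remain. With a budget smaller than any single message, the newest message is still kept.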

Step 3: Implement the Intent Resolution Endpoint and Cognigy Sync

The Flask route receives the Cognigy.AI HTTP Action payload, extracts the session identifier and user message, updates the context window, calls the LLM gateway, parses the structured response, and synchronizes the result back to Cognigy.AI session variables.

import json
import os
from flask import Flask, request, jsonify
from pydantic import ValidationError

app = Flask(__name__)
cognigy_client = create_cognigy_client()
llm_client = create_llm_client()
context_manager = ConversationContextManager(
    model_encoding=os.getenv("LLM_MODEL", "gpt-4o"),
    max_tokens=int(os.getenv("MAX_CONTEXT_TOKENS", "4000"))
)

SYSTEM_PROMPT = """You are an intent classification engine. Analyze the user message and conversation history. 
Return ONLY a JSON object with this structure:
{"intent": "string", "confidence": float, "entities": []}
Valid intents: booking_flight, changing_reservation, canceling_reservation, general_inquiry, escalation_required"""

@app.route("/resolve-intent", methods=["POST"])
async def resolve_intent():
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({"error": "Missing JSON payload"}), 400

        session_id = payload.get("session", {}).get("id")
        user_message = payload.get("user", {}).get("message", "")

        if not session_id or not user_message:
            return jsonify({"error": "Missing session.id or user.message"}), 400

        context_manager.add_message(session_id, "user", user_message)
        context = context_manager.get_context(session_id)

        llm_messages = [{"role": "system", "content": SYSTEM_PROMPT}] + context
        llm_payload = {
            "model": os.getenv("LLM_MODEL", "gpt-4o"),
            "messages": llm_messages,
            "temperature": 0.0,
            "max_tokens": 150,
            "response_format": {"type": "json_object"}
        }

        llm_response = await call_llm_with_retry(llm_client, llm_payload)
        raw_content = llm_response["choices"][0]["message"]["content"]
        intent_data = json.loads(raw_content)

        context_manager.add_message(session_id, "assistant", json.dumps(intent_data))

        await sync_to_cognigy(session_id, intent_data)

        return jsonify({
            "status": "success",
            "intent": intent_data["intent"],
            "confidence": intent_data["confidence"],
            "entities": intent_data.get("entities", [])
        })

    except httpx.HTTPStatusError as exc:
        status = exc.response.status_code
        if status == 401:
            return jsonify({"error": "Authentication failed. Verify API keys."}), 401
        if status == 403:
            return jsonify({"error": "Forbidden. Check Cognigy permissions or LLM model access."}), 403
        return jsonify({"error": f"HTTP {status} from external service"}), 502
    except json.JSONDecodeError:
        return jsonify({"error": "LLM returned invalid JSON"}), 500
    except Exception as exc:
        return jsonify({"error": f"Internal processing failure: {str(exc)}"}), 500

async def sync_to_cognigy(session_id: str, intent_data: dict) -> None:
    bot_id = os.getenv("COGNIGY_BOT_ID")
    sync_payload = {
        "botId": bot_id,
        "sessionId": session_id,
        "variables": {
            "resolved_intent": intent_data["intent"],
            "intent_confidence": str(intent_data["confidence"]),
            "llm_processed": "true"
        }
    }
    try:
        response = await cognigy_client.post("/bot", json=sync_payload)
        response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        print(f"Cognigy sync failed: {exc.response.status_code} - {exc.response.text}")

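The endpoint requires the Bot Execution scope for the /bot POST call. The /resolve-intent route parses the LLM output as JSON and maps upstream failures to HTTP status codes: 401 and 403 pass through, other upstream HTTP errors become 502, and unparseable output becomes 500. The sync_to_cognigy function updates Cognigy session variables so downstream flow nodes can branch on resolved_intent; a failed sync is logged but does not fail the request. Because the route is an async def view, Flask must be installed with the async extra (pip install "flask[async]"). Flask runs each async view in its own event loop, so the module-level httpx.AsyncClient instances are shared across loops. If errors such as "Event loop is closed" appear, create the clients inside the handler with async with, or switch to the synchronous httpx.Client.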
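The route trusts whatever JSON the model returns, so a hallucinated intent name or a missing confidence field only fails later. A minimal validation sketch follows, using the intent list from SYSTEM_PROMPT. The function name parse_intent is hypothetical, and the 0 to 1 confidence range is an assumption, since the prompt does not state one.

```python
import json

# Intent names copied from SYSTEM_PROMPT.
VALID_INTENTS = {
    "booking_flight", "changing_reservation", "canceling_reservation",
    "general_inquiry", "escalation_required",
}

def parse_intent(raw_content):
    """Parse and validate the model output; raise ValueError on any problem."""
    data = json.loads(raw_content)  # json.JSONDecodeError is a ValueError subclass
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    intent = data.get("intent")
    if not isinstance(intent, str) or intent not in VALID_INTENTS:
        raise ValueError(f"unknown intent: {intent!r}")
    confidence = data.get("confidence")
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(confidence, bool) or not isinstance(confidence, (int, float)):
        raise ValueError("confidence must be a number")
    if not 0.0 <= confidence <= 1.0:  # assumed range, not stated in the prompt
        raise ValueError("confidence must be between 0 and 1")
    entities = data.get("entities", [])
    if not isinstance(entities, list):
        raise ValueError("entities must be a list")
    return {"intent": intent, "confidence": float(confidence), "entities": entities}
```

In the route, parse_intent(raw_content) could replace the bare json.loads call, with ValueError mapped to the same error response as invalid JSON.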

Complete Working Example

import asyncio
import httpx
import json
import os
from typing import Dict, List
from flask import Flask, request, jsonify
from dotenv import load_dotenv
import tiktoken

load_dotenv()

COGNIGY_ORG = os.getenv("COGNIGY_ORG")
COGNIGY_API_KEY = os.getenv("COGNIGY_API_KEY")
COGNIGY_BOT_ID = os.getenv("COGNIGY_BOT_ID")
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_BASE_URL = os.getenv("LLM_BASE_URL", "https://api.openai.com/v1")
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o")
MAX_CONTEXT_TOKENS = int(os.getenv("MAX_CONTEXT_TOKENS", "4000"))

SYSTEM_PROMPT = """You are an intent classification engine. Analyze the user message and conversation history. 
Return ONLY a JSON object with this structure:
{"intent": "string", "confidence": float, "entities": []}
Valid intents: booking_flight, changing_reservation, canceling_reservation, general_inquiry, escalation_required"""

class ConversationContextManager:
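    def __init__(self, model_encoding: str, max_tokens: int):
        # model_encoding is a model name such as "gpt-4o", not an encoding name.
        try:
            self.encoder = tiktoken.encoding_for_model(model_encoding)
        except KeyError:
            # Model unknown to tiktoken (e.g. a non-OpenAI model): fall back to an approximation.
            self.encoder = tiktoken.get_encoding("cl100k_base")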
        self.max_tokens = max_tokens
        self.history: Dict[str, List[Dict[str, str]]] = {}

    def _count_tokens(self, messages: List[Dict[str, str]]) -> int:
        total = 0
        for msg in messages:
            total += len(self.encoder.encode(msg["content"]))
            total += 4
        return total

    def add_message(self, session_id: str, role: str, content: str) -> None:
        if session_id not in self.history:
            self.history[session_id] = []
        self.history[session_id].append({"role": role, "content": content})
        self._enforce_token_limit(session_id)

    def _enforce_token_limit(self, session_id: str) -> None:
        while self._count_tokens(self.history[session_id]) > self.max_tokens and len(self.history[session_id]) > 1:
            self.history[session_id].pop(0)

    def get_context(self, session_id: str) -> List[Dict[str, str]]:
        return self.history.get(session_id, [])

def create_cognigy_client() -> httpx.AsyncClient:
    return httpx.AsyncClient(
        base_url=f"https://{COGNIGY_ORG}.cognigy.ai/api/v1",
        transport=httpx.AsyncHTTPTransport(retries=3),
        headers={"Authorization": f"Bearer {COGNIGY_API_KEY}", "Content-Type": "application/json"},
        timeout=httpx.Timeout(30.0, connect=10.0)
    )

def create_llm_client() -> httpx.AsyncClient:
    return httpx.AsyncClient(
        base_url=LLM_BASE_URL,
        transport=httpx.AsyncHTTPTransport(retries=3),
        headers={"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json"},
        timeout=httpx.Timeout(60.0, connect=10.0)
    )

async def call_llm_with_retry(client: httpx.AsyncClient, payload: dict, max_retries: int = 4) -> dict:
    for attempt in range(max_retries):
        try:
            response = await client.post("/chat/completions", json=payload)
            response.raise_for_status()
            return response.json()
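        except httpx.HTTPStatusError as exc:
            # Retry only on 429; any other HTTP error is raised immediately.
            if exc.response.status_code != 429 or attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)
        except httpx.RequestError:
            if attempt == max_retries - 1:
                raise
            # Back off before retrying a network failure.
            await asyncio.sleep(2 ** attempt)
    # Only reachable when max_retries < 1; avoids silently returning None.
    raise RuntimeError("max_retries must be at least 1")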

cognigy_client = create_cognigy_client()
llm_client = create_llm_client()
context_manager = ConversationContextManager(LLM_MODEL, MAX_CONTEXT_TOKENS)
app = Flask(__name__)

@app.route("/resolve-intent", methods=["POST"])
async def resolve_intent():
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({"error": "Missing JSON payload"}), 400

        session_id = payload.get("session", {}).get("id")
        user_message = payload.get("user", {}).get("message", "")

        if not session_id or not user_message:
            return jsonify({"error": "Missing session.id or user.message"}), 400

        context_manager.add_message(session_id, "user", user_message)
        context = context_manager.get_context(session_id)

        llm_messages = [{"role": "system", "content": SYSTEM_PROMPT}] + context
        llm_payload = {
            "model": LLM_MODEL,
            "messages": llm_messages,
            "temperature": 0.0,
            "max_tokens": 150,
            "response_format": {"type": "json_object"}
        }

        llm_response = await call_llm_with_retry(llm_client, llm_payload)
        raw_content = llm_response["choices"][0]["message"]["content"]
        intent_data = json.loads(raw_content)

        context_manager.add_message(session_id, "assistant", json.dumps(intent_data))

        sync_payload = {
            "botId": COGNIGY_BOT_ID,
            "sessionId": session_id,
            "variables": {
                "resolved_intent": intent_data["intent"],
                "intent_confidence": str(intent_data["confidence"]),
                "llm_processed": "true"
            }
        }
        try:
            response = await cognigy_client.post("/bot", json=sync_payload)
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            print(f"Cognigy sync failed: {exc.response.status_code}")

        return jsonify({
            "status": "success",
            "intent": intent_data["intent"],
            "confidence": intent_data["confidence"],
            "entities": intent_data.get("entities", [])
        })

    except httpx.HTTPStatusError as exc:
        status = exc.response.status_code
        if status == 401:
            return jsonify({"error": "Authentication failed. Verify API keys."}), 401
        if status == 403:
            return jsonify({"error": "Forbidden. Check permissions."}), 403
        return jsonify({"error": f"HTTP {status} from external service"}), 502
    except json.JSONDecodeError:
        return jsonify({"error": "LLM returned invalid JSON"}), 500
    except Exception as exc:
        return jsonify({"error": f"Internal processing failure: {str(exc)}"}), 500

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

Run the script with python app.py. The service listens on port 5000. Configure the Cognigy.AI HTTP Action node to POST to http://<host>:5000/resolve-intent with the request body set to {{payload}}.
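Before wiring up the Cognigy.AI node, the endpoint can be exercised from a script. The sketch below builds a request whose body has the shape the route reads (session.id and user.message), using only the standard library. The helper name build_resolve_request, the localhost URL, and the sample values are assumptions for illustration.

```python
import json
import urllib.request

def build_resolve_request(base_url, session_id, message):
    """Build a POST request with the body shape that /resolve-intent expects."""
    body = json.dumps({
        "session": {"id": session_id},
        "user": {"message": message},
    }).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/resolve-intent",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_resolve_request("http://localhost:5000", "demo-session-1", "I need to change my flight")
```

With the service running, urllib.request.urlopen(req, timeout=70) sends the request; the response body is the JSON object returned by the route.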

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: Missing, expired, or malformed API key in the Authorization header. Cognigy.AI rejects requests without valid Bearer tokens, and the LLM gateway returns 401 when its key is missing or invalid. A valid key without access to the requested model typically produces 403 or 404 instead, depending on the gateway.
  • Fix: Verify COGNIGY_API_KEY and LLM_API_KEY in the environment. Regenerate the Cognigy key if it was rotated. Confirm the LLM key is active for the gateway in LLM_BASE_URL.
  • Code check: Log a masked form of the key (for example, only the last four characters) rather than the full header value. Confirm the key does not contain leading or trailing whitespace.
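Checking a key without writing it to the logs can be done with two small helpers. This is a sketch under the assumption that showing the last four characters is acceptable in your environment; the names mask_secret and has_stray_whitespace are hypothetical.

```python
def mask_secret(value, visible=4):
    """Return the secret with all but the last `visible` characters replaced by '*'."""
    if not value:
        return "<unset>"
    if len(value) <= visible:
        return "*" * len(value)  # too short to reveal anything safely
    return "*" * (len(value) - visible) + value[-visible:]

def has_stray_whitespace(value):
    """True if the value starts or ends with whitespace (a common copy-paste error)."""
    return value != value.strip()
```

For example, print(mask_secret(os.getenv("LLM_API_KEY"))) confirms which key is loaded without exposing it.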

Error: 403 Forbidden

  • Cause: The Cognigy API key lacks Bot Execution or Session Variables permissions. The LLM gateway may restrict access to certain models for the provided key.
  • Fix: In the Cognigy.AI console, navigate to Settings, API Keys, and assign the required permissions. For the LLM gateway, verify model access tiers.
  • Code check: The sync function catches 403 and logs the response body. Parse the error message to identify the missing permission.

Error: 429 Too Many Requests

  • Cause: The LLM gateway enforces rate limits per minute or per token. High concurrency from Cognigy flow execution triggers throttling.
  • Fix: The retry logic implements exponential backoff. Increase max_retries if traffic spikes persist. Implement request queuing at the Flask level using Celery or RQ for production workloads.
  • Code check: Monitor the Retry-After header in 429 responses. Adjust backoff multipliers accordingly.
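The retry function in this guide does not read Retry-After. In HTTP the header carries either a number of seconds or an HTTP date, so a helper has to handle both. The sketch below is one way to do that with the standard library; the function name retry_after_seconds and the fallback parameter are assumptions, and whether a given gateway sends the header at all is not guaranteed.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

def retry_after_seconds(header_value, fallback, now=None):
    """Convert a Retry-After header into a delay in seconds.

    Returns `fallback` when the header is absent or cannot be parsed.
    """
    if not header_value:
        return fallback
    value = header_value.strip()
    if value.isdigit():  # delay-seconds form, e.g. "12"
        return float(value)
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return fallback
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())
```

Inside the 429 branch this could be used as retry_after_seconds(exc.response.headers.get("Retry-After"), fallback=2 ** attempt).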

Error: Context Window Overflow or Token Mismatch

  • Cause: The tiktoken encoder version does not match the LLM model version. Some providers count system prompts differently.
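  • Fix: Use tiktoken.encoding_for_model(LLM_MODEL) to load the correct tokenizer. Set MAX_CONTEXT_TOKENS at least 5 percent below the model's actual limit to absorb metadata overhead, and leave room for the system prompt and the 150-token completion, neither of which the context manager counts.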
  • Code check: Log the token count before and after trimming. Verify the encoder matches the exact model identifier (e.g., gpt-4o-2024-05-13).
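The budget arithmetic behind that fix can be written out explicitly. The sketch below derives a history budget from a model limit; the function name history_budget and the example figures (an 8000-token limit and a 120-token system prompt) are hypothetical, while the 150-token completion matches max_tokens in the request payload.

```python
def history_budget(model_limit, system_prompt_tokens, completion_tokens, safety_margin=0.05):
    """Tokens left for conversation history after fixed costs and a safety margin."""
    reserve = int(model_limit * safety_margin)  # headroom for per-message metadata
    budget = model_limit - system_prompt_tokens - completion_tokens - reserve
    if budget <= 0:
        raise ValueError("fixed costs exceed the model limit; nothing left for history")
    return budget

# Hypothetical figures: 8000-token model, 120-token system prompt, 150-token completion.
budget = history_budget(8000, 120, 150)
```

The result is a candidate value for MAX_CONTEXT_TOKENS; the system prompt length can be measured once at startup with the same encoder the context manager uses.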

Official References