Managing Prompt Budgets in NICE Cognigy.AI LLM Gateway with Python

What You Will Build

  • A Python service that tracks token consumption per dialog session, truncates prompts dynamically, swaps system instructions when budgets are exceeded, caches frequent intents to bypass inference, handles rate limits with adaptive backoff, aggregates cost metrics, sanitizes inputs for prompt injection, and exposes a FastAPI endpoint for financial tracking.
  • This tutorial uses the NICE Cognigy.AI LLM Gateway REST API.
  • The implementation is written in Python 3.10+ using httpx and fastapi.

Prerequisites

  • Cognigy.AI tenant credentials with OAuth client credentials or service account access
  • Required scopes: llm.gateway:invoke, llm.gateway:metrics:read
  • Python 3.10 or newer
  • External dependencies: httpx, fastapi, uvicorn, pydantic, aiofiles, tiktoken
  • Install dependencies: pip install httpx fastapi uvicorn pydantic aiofiles tiktoken

Authentication Setup

Cognigy.AI uses JWT bearer tokens for API authentication. The token must be refreshed before expiration. The following class handles token acquisition and caching.

import httpx
import time
from typing import Optional

class CognigyAuth:
    def __init__(self, tenant_url: str, client_id: str, client_secret: str, scope: str = "llm.gateway:invoke llm.gateway:metrics:read"):
        self.tenant_url = tenant_url.rstrip("/")
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self._token: Optional[str] = None
        self._expires_at: float = 0.0

    async def get_token(self) -> str:
        if self._token and time.time() < self._expires_at - 60:
            return self._token

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{self.tenant_url}/api/v1/auth/token",
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "scope": self.scope
                }
            )
            response.raise_for_status()
            payload = response.json()
            self._token = payload["access_token"]
            self._expires_at = time.time() + payload["expires_in"]
            return self._token
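
The refresh condition inside get_token can be checked in isolation. The sketch below uses a hypothetical helper, needs_refresh, which is not part of CognigyAuth; it only mirrors the same comparison with the 60-second safety margin so the behavior is easy to reason about.

```python
import time
from typing import Optional

REFRESH_MARGIN_SECONDS = 60  # same safety margin as in CognigyAuth.get_token


def needs_refresh(token: Optional[str], expires_at: float,
                  now: Optional[float] = None) -> bool:
    """Return True when there is no token or it expires within the margin."""
    current = time.time() if now is None else now
    return (not token) or current >= expires_at - REFRESH_MARGIN_SECONDS


# A token that expires at t=1000 is reused at t=900 and refreshed at t=950.
print(needs_refresh("abc", expires_at=1000.0, now=900.0))  # False
print(needs_refresh("abc", expires_at=1000.0, now=950.0))  # True
```

Refreshing slightly early avoids sending a token that expires while the request is in flight.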

Implementation

Step 1: Gateway Client with Adaptive Backoff and Input Sanitization

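The LLM Gateway endpoint /api/v1/llm/gateway/chat accepts conversation payloads. When the gateway responds with HTTP 429, the client retries with exponential backoff plus random jitter, or waits for the interval given in the Retry-After header when one is present. Input sanitization replaces known prompt-injection phrases with [FILTERED] before transmission; a fixed pattern list reduces exposure but does not catch every attack, so treat it as a first line of defense.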

import asyncio
import random
import re
import httpx
from typing import Dict, Any, List

PROMPT_INJECTION_PATTERNS = [
    r"(?i)ignore\s+previous\s+instructions",
    r"(?i)system\s+prompt",
    r"(?i)override\s+security",
    r"(?i)developer\s+mode"
]

def sanitize_user_input(text: str) -> str:
    cleaned = text
    for pattern in PROMPT_INJECTION_PATTERNS:
        cleaned = re.sub(pattern, "[FILTERED]", cleaned, flags=re.IGNORECASE)
    return cleaned.strip()

class LLMGatewayClient:
    def __init__(self, auth: CognigyAuth, max_retries: int = 5):
        self.auth = auth
        self.max_retries = max_retries
        self.base_url = f"{auth.tenant_url}/api/v1/llm/gateway/chat"

    async def invoke(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        token = await self.auth.get_token()
        headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
        
        for attempt in range(self.max_retries):
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(self.base_url, headers=headers, json=payload)
                
                if response.status_code == 200:
                    return response.json()
                elif response.status_code == 429:
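                    # Prefer the server's Retry-After (seconds); otherwise back off exponentially with jitter.
                    fallback = 2 ** attempt + random.uniform(0, 1)
                    try:
                        retry_after = float(response.headers.get("Retry-After", fallback))
                    except ValueError:
                        retry_after = fallback  # Retry-After may also be an HTTP-date
                    await asyncio.sleep(retry_after)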
                elif response.status_code in (401, 403):
                    raise PermissionError(f"Authentication or authorization failed: {response.status_code}")
                else:
                    response.raise_for_status()
        raise RuntimeError("Max retries exceeded for 429 rate limit")
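
The delay calculation can be separated from the HTTP call to make it testable. The following is a minimal sketch, not the gateway's prescribed policy: backoff_delay, base, and cap are hypothetical names, and the 30-second cap is an assumption added here to keep waits bounded.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 30.0) -> float:
    """Seconds to wait before the next retry (attempt counts from 0)."""
    if retry_after is not None:
        try:
            # Retry-After given in seconds: honor it, but never exceed the cap.
            return min(cap, max(0.0, float(retry_after)))
        except ValueError:
            pass  # for example an HTTP-date value; fall back to computed backoff
    # Exponential growth plus up to one second of jitter, bounded by the cap.
    return min(cap, base * (2 ** attempt) + random.uniform(0.0, 1.0))


for attempt in range(5):
    print(attempt, round(backoff_delay(attempt), 2))
```

The jitter spreads retries from concurrent sessions so they do not all hit the gateway at the same instant after a rate-limit window resets.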

Step 2: Session Budget Tracking and Dynamic Prompt Truncation

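Each dialog session maintains a token budget. The tiktoken library counts tokens for OpenAI models; for other models served through the gateway the count is only an approximation. When the conversation history does not fit in the remaining budget, after subtracting the system prompt and a 500-token reserve, the oldest messages are dropped first until the request fits. At least one message is always kept.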

import tiktoken
from typing import List, Dict, Any, Optional

class SessionBudgetManager:
    def __init__(self, budget_per_session: int = 4096, lightweight_threshold: int = 1024):
        self.budget_per_session = budget_per_session
        self.lightweight_threshold = lightweight_threshold
        self.sessions: Dict[str, Dict[str, Any]] = {}
        self.encoder = tiktoken.encoding_for_model("gpt-4")

    def _count_tokens(self, messages: List[Dict[str, str]]) -> int:
        tokens = 0
        for msg in messages:
            tokens += len(self.encoder.encode(msg.get("content", "")))
        return tokens

    def track_usage(self, session_id: str, prompt_tokens: int, completion_tokens: int) -> None:
        if session_id not in self.sessions:
            self.sessions[session_id] = {"used": 0, "lightweight_mode": False}
        self.sessions[session_id]["used"] += prompt_tokens + completion_tokens

    def get_remaining_budget(self, session_id: str) -> int:
        if session_id not in self.sessions:
            self.sessions[session_id] = {"used": 0, "lightweight_mode": False}
        return max(0, self.budget_per_session - self.sessions[session_id]["used"])

    def truncate_history(self, session_id: str, history: List[Dict[str, str]], system_prompt: str) -> List[Dict[str, str]]:
        remaining = self.get_remaining_budget(session_id)
        system_tokens = len(self.encoder.encode(system_prompt))
        available_for_history = remaining - system_tokens - 500

        if self._count_tokens(history) <= available_for_history:
            return history

        truncated = list(history)  # copy so the caller's list is not mutated
        while self._count_tokens(truncated) > available_for_history and len(truncated) > 1:
            truncated.pop(0)
        return truncated
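
To see the oldest-first truncation on concrete data, here is a small sketch under stated assumptions: truncate_oldest_first and word_count are hypothetical names, and word counting stands in for tiktoken so the example runs without extra dependencies. Real token counts will differ.

```python
from typing import Callable, Dict, List


def truncate_oldest_first(history: List[Dict[str, str]], max_tokens: int,
                          count: Callable[[str], int]) -> List[Dict[str, str]]:
    """Drop the oldest messages until the rest fits; always keep the newest."""
    kept = list(history)  # copy so the caller's list is left untouched
    total = sum(count(m.get("content", "")) for m in kept)
    while total > max_tokens and len(kept) > 1:
        total -= count(kept.pop(0).get("content", ""))
    return kept


def word_count(text: str) -> int:
    # Stand-in tokenizer for illustration only.
    return len(text.split())


history = [
    {"role": "user", "content": "my order has not arrived yet"},     # 6 words
    {"role": "assistant", "content": "can you share the order id"},  # 6 words
    {"role": "user", "content": "it is 12345"},                      # 3 words
]
trimmed = truncate_oldest_first(history, max_tokens=10, count=word_count)
print([m["content"] for m in trimmed])
```

With a limit of 10 the first message is dropped (15 words become 9), and the two most recent turns are sent.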

Step 3: Intent Caching and System Instruction Swapping

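Responses to frequent inputs are cached so that repeated requests bypass LLM inference entirely. When a session's remaining budget falls to or below the lightweight threshold, the system instruction is swapped to a concise variant. The cache is an in-memory dictionary keyed by the normalized (trimmed, lowercased) user input; entries expire after a TTL and are evicted lazily on the next read.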

import time
from typing import Dict, Any, Optional, Tuple

class IntentCache:
    def __init__(self, ttl_seconds: int = 3600):
        self.cache: Dict[str, Tuple[str, float]] = {}
        self.ttl = ttl_seconds

    def get(self, user_input: str) -> Optional[str]:
        normalized = user_input.strip().lower()
        if normalized in self.cache:
            intent, timestamp = self.cache[normalized]
            if time.time() - timestamp < self.ttl:
                return intent
            del self.cache[normalized]
        return None

    def set(self, user_input: str, intent: str) -> None:
        self.cache[user_input.strip().lower()] = (intent, time.time())

class SystemInstructionManager:
    def __init__(self):
        self.standard = "You are a professional customer support agent. Provide detailed, accurate, and empathetic responses. Follow all safety guidelines."
        self.lightweight = "Answer concisely. Prioritize accuracy. Keep responses under 50 words."

    def get_instruction(self, use_lightweight: bool) -> str:
        return self.lightweight if use_lightweight else self.standard
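
The expiry behavior is easiest to verify with a controllable clock. The sketch below is a hypothetical variant (TTLCache with an injectable clock parameter, neither of which appears in the tutorial's IntentCache) that follows the same normalization and lazy-eviction logic.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class TTLCache:
    """Minimal TTL cache; the clock is injectable so expiry can be tested."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.time):
        self.ttl = ttl_seconds
        self.clock = clock
        self.store: Dict[str, Tuple[str, float]] = {}

    def set(self, key: str, value: str) -> None:
        self.store[key.strip().lower()] = (value, self.clock())

    def get(self, key: str) -> Optional[str]:
        normalized = key.strip().lower()
        entry = self.store.get(normalized)
        if entry is None:
            return None
        value, stored_at = entry
        if self.clock() - stored_at < self.ttl:
            return value
        del self.store[normalized]  # expired: evict lazily on read
        return None


now = [0.0]  # mutable fake clock
cache = TTLCache(ttl_seconds=60, clock=lambda: now[0])
cache.set("  Reset Password ", "Use the reset link on the login page.")
hit = cache.get("reset password")   # normalized key matches
now[0] = 61.0
miss = cache.get("reset password")  # entry is older than the TTL
print(hit, miss)
```

Because eviction happens only on read, entries that are never requested again stay in memory; a long-running service may want a periodic sweep or a size bound.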

Step 4: Metrics Aggregation and Cost Allocation

The gateway returns token usage per request. Metrics are aggregated per session and per tenant. Cost allocation applies a configurable price per 1000 tokens. The metrics endpoint /api/v1/llm/gateway/metrics supports pagination for historical data.

import httpx
from typing import Dict, Any, List

class MetricsAggregator:
    def __init__(self, auth: CognigyAuth, cost_per_1k_tokens: float = 0.01):
        self.auth = auth
        self.cost_per_1k = cost_per_1k_tokens
        self.session_costs: Dict[str, float] = {}
        self.tenant_total_cost: float = 0.0

    async def fetch_historical_metrics(self, page: int = 1, size: int = 100) -> List[Dict[str, Any]]:
        token = await self.auth.get_token()
        headers = {"Authorization": f"Bearer {token}"}
        params = {"page": page, "size": size}
        
        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get(
                f"{self.auth.tenant_url}/api/v1/llm/gateway/metrics",
                headers=headers,
                params=params
            )
            response.raise_for_status()
            return response.json().get("items", [])

    def allocate_cost(self, session_id: str, total_tokens: int) -> float:
        cost = (total_tokens / 1000.0) * self.cost_per_1k
        self.session_costs[session_id] = self.session_costs.get(session_id, 0.0) + cost
        self.tenant_total_cost += cost
        return cost
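
As a worked example of the cost formula, the sketch below applies the same arithmetic as allocate_cost to two requests in one session. The token counts are made up for illustration, token_cost is a hypothetical standalone function, and 0.012 is the example rate used later in this tutorial, not a published price.

```python
def token_cost(total_tokens: int, cost_per_1k: float) -> float:
    """Cost of a request at a flat price per 1000 tokens."""
    return (total_tokens / 1000.0) * cost_per_1k


# Two requests in one session: (prompt_tokens, completion_tokens).
requests = [(850, 150), (1200, 300)]
session_cost = sum(token_cost(p + c, 0.012) for p, c in requests)
print(round(session_cost, 4))  # 1000 tokens cost 0.012, 1500 cost 0.018
```

A single flat rate treats prompt and completion tokens alike; if your provider prices them differently, track two rates instead.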

Complete Working Example

The following FastAPI application integrates all components. It exposes a /chat endpoint that processes requests with budget management, sanitization, caching, and adaptive backoff. A /budget/status endpoint provides financial tracking.

import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import asyncio

# Import classes from previous steps
# CognigyAuth, LLMGatewayClient, SessionBudgetManager, IntentCache, SystemInstructionManager, MetricsAggregator
# For brevity in this tutorial, assume they are defined in the same file or imported.

app = FastAPI(title="Cognigy.AI LLM Budget Manager")

# Initialize components
# Replace with actual tenant credentials
auth = CognigyAuth(tenant_url="https://your-tenant.cognigy.ai", client_id="YOUR_CLIENT_ID", client_secret="YOUR_CLIENT_SECRET")
gateway = LLMGatewayClient(auth)
budget_mgr = SessionBudgetManager(budget_per_session=4096, lightweight_threshold=1024)
intent_cache = IntentCache()
sys_mgr = SystemInstructionManager()
metrics_agg = MetricsAggregator(auth, cost_per_1k_tokens=0.012)

class ChatRequest(BaseModel):
    session_id: str
    messages: List[Dict[str, str]]
    model: str = "gpt-4"

class ChatResponse(BaseModel):
    session_id: str
    response: str
    prompt_tokens: int
    completion_tokens: int
    cost: float
    lightweight_mode: bool
    cached: bool = False

@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(req: ChatRequest):
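    if not req.messages:
        raise HTTPException(status_code=400, detail="messages must not be empty")
    # Sanitize the latest turn and write it back so the gateway receives the filtered text.
    sanitized_input = sanitize_user_input(req.messages[-1].get("content", ""))
    req.messages[-1]["content"] = sanitized_input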
    
    cached_intent = intent_cache.get(sanitized_input)
    if cached_intent:
        return ChatResponse(
            session_id=req.session_id,
            response=cached_intent,
            prompt_tokens=0,
            completion_tokens=0,
            cost=0.0,
            lightweight_mode=False,
            cached=True
        )

    remaining = budget_mgr.get_remaining_budget(req.session_id)
    use_lightweight = remaining <= budget_mgr.lightweight_threshold
    system_prompt = sys_mgr.get_instruction(use_lightweight)
    
    truncated_history = budget_mgr.truncate_history(req.session_id, req.messages, system_prompt)
    final_messages = [{"role": "system", "content": system_prompt}] + truncated_history

    payload = {
        "model": req.model,
        "messages": final_messages,
        "temperature": 0.7,
        "max_tokens": 512
    }

    try:
        result = await gateway.invoke(payload)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"Gateway error: {str(e)}")

    prompt_tokens = result.get("usage", {}).get("prompt_tokens", 0)
    completion_tokens = result.get("usage", {}).get("completion_tokens", 0)
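    # Assumes an OpenAI-style response body; guard against an empty "choices" list.
    choices = result.get("choices") or [{}]
    response_text = choices[0].get("message", {}).get("content") or ""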

    budget_mgr.track_usage(req.session_id, prompt_tokens, completion_tokens)
    cost = metrics_agg.allocate_cost(req.session_id, prompt_tokens + completion_tokens)
    
    if response_text:  # do not cache empty responses
        intent_cache.set(sanitized_input, response_text)

    return ChatResponse(
        session_id=req.session_id,
        response=response_text,
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        cost=cost,
        lightweight_mode=use_lightweight
    )

class BudgetStatus(BaseModel):
    tenant_total_cost: float
    session_costs: Dict[str, float]
    active_sessions: int

@app.get("/budget/status", response_model=BudgetStatus)
async def budget_status():
    return BudgetStatus(
        tenant_total_cost=metrics_agg.tenant_total_cost,
        session_costs=metrics_agg.session_costs,
        active_sessions=len(budget_mgr.sessions)
    )

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Common Errors & Debugging

Error: 401 Unauthorized

  • What causes it: The JWT token has expired or the client credentials are invalid.
  • How to fix it: Ensure the CognigyAuth class refreshes the token before expiration. Verify that the scope parameter includes llm.gateway:invoke.
  • Code showing the fix: The get_token method already checks time.time() < self._expires_at - 60 and refreshes automatically. If credentials are wrong, update client_id and client_secret.

Error: 403 Forbidden

  • What causes it: The OAuth token lacks the required scope for the requested endpoint.
  • How to fix it: Add llm.gateway:metrics:read to the scope string during authentication. Verify tenant role permissions in the Cognigy console.
  • Code showing the fix: Update scope="llm.gateway:invoke llm.gateway:metrics:read" in the CognigyAuth constructor.

Error: 429 Too Many Requests

  • What causes it: The tenant has exceeded the configured RPM or TPM limits for the LLM Gateway.
  • How to fix it: The LLMGatewayClient.invoke method implements adaptive exponential backoff with jitter. Ensure max_retries is sufficient for your workload.
  • Code showing the fix: The retry loop calculates retry_after using 2 ** attempt + random.uniform(0, 1) and respects the Retry-After header if provided.

Error: 500 Internal Server Error from Gateway

  • What causes it: Malformed payload or unsupported model identifier.
  • How to fix it: Validate the model field against Cognigy.AI supported models. Ensure message roles are strictly system, user, or assistant.
  • Code showing the fix: Add payload validation before gateway.invoke():
for msg in final_messages:
    if msg["role"] not in ("system", "user", "assistant"):
        raise ValueError(f"Invalid message role: {msg['role']}")

Official References