Managing Prompt Budgets in NICE Cognigy.AI LLM Gateway with Python
What You Will Build
- A Python service that tracks token consumption per dialog session, truncates prompts dynamically, swaps system instructions when budgets are exceeded, caches frequent intents to bypass inference, handles rate limits with adaptive backoff, aggregates cost metrics, sanitizes inputs for prompt injection, and exposes a FastAPI endpoint for financial tracking.
- This tutorial uses the NICE Cognigy.AI LLM Gateway REST API.
- The implementation is written in Python 3.10+ using
httpxandfastapi.
Prerequisites
- Cognigy.AI tenant credentials with OAuth client credentials or service account access
- Required scopes:
llm.gateway:invoke,llm.gateway:metrics:read - Python 3.10 or newer
- External dependencies:
httpx,fastapi,uvicorn,pydantic,aiofiles,tiktoken - Install dependencies:
pip install httpx fastapi uvicorn pydantic aiofiles tiktoken
Authentication Setup
Cognigy.AI uses JWT bearer tokens for API authentication. The token must be refreshed before expiration. The following class handles token acquisition and caching.
import httpx
import time
from typing import Optional
class CognigyAuth:
def __init__(self, tenant_url: str, client_id: str, client_secret: str, scope: str = "llm.gateway:invoke llm.gateway:metrics:read"):
self.tenant_url = tenant_url.rstrip("/")
self.client_id = client_id
self.client_secret = client_secret
self.scope = scope
self._token: Optional[str] = None
self._expires_at: float = 0.0
async def get_token(self) -> str:
if self._token and time.time() < self._expires_at - 60:
return self._token
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
f"{self.tenant_url}/api/v1/auth/token",
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
"scope": self.scope
}
)
response.raise_for_status()
payload = response.json()
self._token = payload["access_token"]
self._expires_at = time.time() + payload["expires_in"]
return self._token
Implementation
Step 1: Gateway Client with Adaptive Backoff and Input Sanitization
The LLM Gateway endpoint /api/v1/llm/gateway/chat accepts conversation payloads. Rate limits (HTTP 429) require adaptive exponential backoff with jitter. Input sanitization prevents prompt injection by stripping or flagging malicious patterns before transmission.
import asyncio
import random
import re
import httpx
from typing import Dict, Any, List
PROMPT_INJECTION_PATTERNS = [
r"(?i)ignore\s+previous\s+instructions",
r"(?i)system\s+prompt",
r"(?i)override\s+security",
r"(?i)developer\s+mode"
]
def sanitize_user_input(text: str) -> str:
cleaned = text
for pattern in PROMPT_INJECTION_PATTERNS:
cleaned = re.sub(pattern, "[FILTERED]", cleaned, flags=re.IGNORECASE)
return cleaned.strip()
class LLMGatewayClient:
def __init__(self, auth: CognigyAuth, max_retries: int = 5):
self.auth = auth
self.max_retries = max_retries
self.base_url = f"{auth.tenant_url}/api/v1/llm/gateway/chat"
async def invoke(self, payload: Dict[str, Any]) -> Dict[str, Any]:
token = await self.auth.get_token()
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
for attempt in range(self.max_retries):
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(self.base_url, headers=headers, json=payload)
if response.status_code == 200:
return response.json()
elif response.status_code == 429:
retry_after = float(response.headers.get("Retry-After", 2 ** attempt + random.uniform(0, 1)))
await asyncio.sleep(retry_after)
elif response.status_code in (401, 403):
raise PermissionError(f"Authentication or authorization failed: {response.status_code}")
else:
response.raise_for_status()
raise RuntimeError("Max retries exceeded for 429 rate limit")
Step 2: Session Budget Tracking and Dynamic Prompt Truncation
Each dialog session maintains a token budget. The tiktoken library counts tokens accurately for OpenAI-compatible models. When the remaining budget falls below a threshold, the conversation history is truncated from the oldest messages first until the request fits.
import tiktoken
from typing import List, Dict, Any, Optional
class SessionBudgetManager:
def __init__(self, budget_per_session: int = 4096, lightweight_threshold: int = 1024):
self.budget_per_session = budget_per_session
self.lightweight_threshold = lightweight_threshold
self.sessions: Dict[str, Dict[str, Any]] = {}
self.encoder = tiktoken.encoding_for_model("gpt-4")
def _count_tokens(self, messages: List[Dict[str, str]]) -> int:
tokens = 0
for msg in messages:
tokens += len(self.encoder.encode(msg.get("content", "")))
return tokens
def track_usage(self, session_id: str, prompt_tokens: int, completion_tokens: int) -> None:
if session_id not in self.sessions:
self.sessions[session_id] = {"used": 0, "lightweight_mode": False}
self.sessions[session_id]["used"] += prompt_tokens + completion_tokens
def get_remaining_budget(self, session_id: str) -> int:
if session_id not in self.sessions:
self.sessions[session_id] = {"used": 0, "lightweight_mode": False}
return max(0, self.budget_per_session - self.sessions[session_id]["used"])
def truncate_history(self, session_id: str, history: List[Dict[str, str]], system_prompt: str) -> List[Dict[str, str]]:
remaining = self.get_remaining_budget(session_id)
system_tokens = len(self.encoder.encode(system_prompt))
available_for_history = remaining - system_tokens - 500
if self._count_tokens(history) <= available_for_history:
return history
truncated = history
while self._count_tokens(truncated) > available_for_history and len(truncated) > 1:
truncated.pop(0)
return truncated
Step 3: Intent Caching and System Instruction Swapping
Frequent intents are cached to bypass LLM inference entirely. When a session breaches the lightweight threshold, the system instruction is swapped to a concise variant. The cache uses a simple in-memory dictionary with TTL simulation.
import time
from typing import Dict, Any, Optional, Tuple
class IntentCache:
def __init__(self, ttl_seconds: int = 3600):
self.cache: Dict[str, Tuple[str, float]] = {}
self.ttl = ttl_seconds
def get(self, user_input: str) -> Optional[str]:
normalized = user_input.strip().lower()
if normalized in self.cache:
intent, timestamp = self.cache[normalized]
if time.time() - timestamp < self.ttl:
return intent
del self.cache[normalized]
return None
def set(self, user_input: str, intent: str) -> None:
self.cache[user_input.strip().lower()] = (intent, time.time())
class SystemInstructionManager:
def __init__(self):
self.standard = "You are a professional customer support agent. Provide detailed, accurate, and empathetic responses. Follow all safety guidelines."
self.lightweight = "Answer concisely. Prioritize accuracy. Keep responses under 50 words."
def get_instruction(self, use_lightweight: bool) -> str:
return self.lightweight if use_lightweight else self.standard
Step 4: Metrics Aggregation and Cost Allocation
The gateway returns token usage per request. Metrics are aggregated per session and per tenant. Cost allocation applies a configurable price per 1000 tokens. The metrics endpoint /api/v1/llm/gateway/metrics supports pagination for historical data.
import httpx
from typing import Dict, Any, List
class MetricsAggregator:
def __init__(self, auth: CognigyAuth, cost_per_1k_tokens: float = 0.01):
self.auth = auth
self.cost_per_1k = cost_per_1k_tokens
self.session_costs: Dict[str, float] = {}
self.tenant_total_cost: float = 0.0
async def fetch_historical_metrics(self, page: int = 1, size: int = 100) -> List[Dict[str, Any]]:
token = await self.auth.get_token()
headers = {"Authorization": f"Bearer {token}"}
params = {"page": page, "size": size}
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.get(
f"{self.auth.tenant_url}/api/v1/llm/gateway/metrics",
headers=headers,
params=params
)
response.raise_for_status()
return response.json().get("items", [])
def allocate_cost(self, session_id: str, total_tokens: int) -> float:
cost = (total_tokens / 1000.0) * self.cost_per_1k
self.session_costs[session_id] = self.session_costs.get(session_id, 0.0) + cost
self.tenant_total_cost += cost
return cost
Complete Working Example
The following FastAPI application integrates all components. It exposes a /chat endpoint that processes requests with budget management, sanitization, caching, and adaptive backoff. A /budget/status endpoint provides financial tracking.
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import asyncio
# Import classes from previous steps
# CognigyAuth, LLMGatewayClient, SessionBudgetManager, IntentCache, SystemInstructionManager, MetricsAggregator
# For brevity in this tutorial, assume they are defined in the same file or imported.
app = FastAPI(title="Cognigy.AI LLM Budget Manager")
# Initialize components
# Replace with actual tenant credentials
auth = CognigyAuth(tenant_url="https://your-tenant.cognigy.ai", client_id="YOUR_CLIENT_ID", client_secret="YOUR_CLIENT_SECRET")
gateway = LLMGatewayClient(auth)
budget_mgr = SessionBudgetManager(budget_per_session=4096, lightweight_threshold=1024)
intent_cache = IntentCache()
sys_mgr = SystemInstructionManager()
metrics_agg = MetricsAggregator(auth, cost_per_1k_tokens=0.012)
class ChatRequest(BaseModel):
session_id: str
messages: List[Dict[str, str]]
model: str = "gpt-4"
class ChatResponse(BaseModel):
session_id: str
response: str
prompt_tokens: int
completion_tokens: int
cost: float
lightweight_mode: bool
cached: bool = False
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(req: ChatRequest):
sanitized_input = sanitize_user_input(req.messages[-1]["content"])
cached_intent = intent_cache.get(sanitized_input)
if cached_intent:
return ChatResponse(
session_id=req.session_id,
response=cached_intent,
prompt_tokens=0,
completion_tokens=0,
cost=0.0,
lightweight_mode=False,
cached=True
)
remaining = budget_mgr.get_remaining_budget(req.session_id)
use_lightweight = remaining <= budget_mgr.lightweight_threshold
system_prompt = sys_mgr.get_instruction(use_lightweight)
truncated_history = budget_mgr.truncate_history(req.session_id, req.messages, system_prompt)
final_messages = [{"role": "system", "content": system_prompt}] + truncated_history
payload = {
"model": req.model,
"messages": final_messages,
"temperature": 0.7,
"max_tokens": 512
}
try:
result = await gateway.invoke(payload)
except Exception as e:
raise HTTPException(status_code=502, detail=f"Gateway error: {str(e)}")
prompt_tokens = result.get("usage", {}).get("prompt_tokens", 0)
completion_tokens = result.get("usage", {}).get("completion_tokens", 0)
response_text = result.get("choices", [{}])[0].get("message", {}).get("content", "")
budget_mgr.track_usage(req.session_id, prompt_tokens, completion_tokens)
cost = metrics_agg.allocate_cost(req.session_id, prompt_tokens + completion_tokens)
intent_cache.set(sanitized_input, response_text)
return ChatResponse(
session_id=req.session_id,
response=response_text,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
cost=cost,
lightweight_mode=use_lightweight
)
class BudgetStatus(BaseModel):
tenant_total_cost: float
session_costs: Dict[str, float]
active_sessions: int
@app.get("/budget/status", response_model=BudgetStatus)
async def budget_status():
return BudgetStatus(
tenant_total_cost=metrics_agg.tenant_total_cost,
session_costs=metrics_agg.session_costs,
active_sessions=len(budget_mgr.sessions)
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
Common Errors & Debugging
Error: 401 Unauthorized
- What causes it: The JWT token has expired or the client credentials are invalid.
- How to fix it: Ensure the
CognigyAuthclass refreshes the token before expiration. Verify that thescopeparameter includesllm.gateway:invoke. - Code showing the fix: The
get_tokenmethod already checkstime.time() < self._expires_at - 60and refreshes automatically. If credentials are wrong, updateclient_idandclient_secret.
Error: 403 Forbidden
- What causes it: The OAuth token lacks the required scope for the requested endpoint.
- How to fix it: Add
llm.gateway:metrics:readto the scope string during authentication. Verify tenant role permissions in the Cognigy console. - Code showing the fix: Update
scope="llm.gateway:invoke llm.gateway:metrics:read"in theCognigyAuthconstructor.
Error: 429 Too Many Requests
- What causes it: The tenant has exceeded the configured RPM or TPM limits for the LLM Gateway.
- How to fix it: The
LLMGatewayClient.invokemethod implements adaptive exponential backoff with jitter. Ensuremax_retriesis sufficient for your workload. - Code showing the fix: The retry loop calculates
retry_afterusing2 ** attempt + random.uniform(0, 1)and respects theRetry-Afterheader if provided.
Error: 500 Internal Server Error from Gateway
- What causes it: Malformed payload or unsupported model identifier.
- How to fix it: Validate the
modelfield against Cognigy.AI supported models. Ensure message roles are strictlysystem,user, orassistant. - Code showing the fix: Add payload validation before
gateway.invoke():
for msg in final_messages:
if msg["role"] not in ("system", "user", "assistant"):
raise ValueError(f"Invalid message role: {msg['role']}")