Securing NICE Cognigy.AI LLM Gateway Requests with TypeScript

What You Will Build

  • A production-grade TypeScript middleware that intercepts LLM gateway requests, validates JWT tokens against a custom claims store, and enforces tenant isolation.
  • An intelligent routing layer that scores prompt complexity and directs traffic to optimized model endpoints.
  • A streaming response handler with explicit backpressure management, LRU completion caching, and structured token consumption logging.

Prerequisites

  • Node.js 18.x or 20.x LTS runtime
  • TypeScript 5.1+ with strict mode enabled
  • express@4.18+, undici@5.28+, jsonwebtoken@9.0+, lru-cache@10.0+, pino@8.17+
  • Cognigy.AI LLM Proxy API access with llm:proxy:write and llm:models:read scopes
  • A custom JWT signing key and claims store (Redis, PostgreSQL, or in-memory for development)

Authentication Setup

The gateway verifies a JWT before any LLM routing occurs. The middleware takes the Bearer token from the Authorization header and verifies its signature; jsonwebtoken also rejects tokens whose exp claim has passed. It then confirms that the token's tenant claim exists in a custom claims store.

import { Request, Response, NextFunction } from 'express';
import jwt, { type JwtPayload } from 'jsonwebtoken';

// Custom claims store interface
export interface TenantClaim {
  tenantId: string;
  allowedModels: string[];
  rateLimitRpm: number;
}

// In-memory claims store (replace with Redis/PostgreSQL in production)
const claimsStore = new Map<string, TenantClaim>();
claimsStore.set('tenant-alpha', {
  tenantId: 'tenant-alpha',
  allowedModels: ['fast-v1', 'reasoning-v2'],
  rateLimitRpm: 60
});

// Refuse to start in production without a real secret: anyone who knows the
// development fallback could mint valid tokens.
function resolveJwtSecret(): string {
  const secret = process.env.JWT_SECRET;
  if (secret) return secret;
  if (process.env.NODE_ENV === 'production') {
    throw new Error('JWT_SECRET must be set in production');
  }
  return 'development-secret-change-me';
}

const JWT_SECRET = resolveJwtSecret();

export function validateJwtMiddleware(req: Request, res: Response, next: NextFunction): void {
  const authHeader = req.headers.authorization;
  if (!authHeader || !authHeader.startsWith('Bearer ')) {
    res.status(401).json({ error: 'Missing or malformed Authorization header' });
    return;
  }

  const token = authHeader.slice('Bearer '.length).trim();
  try {
    // Pin the algorithm so a token cannot choose its own (e.g. "none").
    // jwt.verify also throws TokenExpiredError when the exp claim is in the past.
    const decoded = jwt.verify(token, JWT_SECRET, { algorithms: ['HS256'] }) as JwtPayload;

    if (typeof decoded.sub !== 'string' || typeof decoded.tenant !== 'string') {
      res.status(401).json({ error: 'Token is missing the sub or tenant claim' });
      return;
    }

    const tenantClaim = claimsStore.get(decoded.tenant);
    if (!tenantClaim) {
      res.status(403).json({ error: 'Tenant claims not found in authorization store' });
      return;
    }

    // Attach validated context to the request for downstream handlers
    (req as any).tenantContext = {
      tenantId: decoded.tenant,
      userId: decoded.sub,
      allowedModels: tenantClaim.allowedModels,
      rateLimitRpm: tenantClaim.rateLimitRpm
    };

    next();
  } catch (err) {
    // TokenExpiredError extends JsonWebTokenError, so it must be checked first
    if (err instanceof jwt.TokenExpiredError) {
      res.status(401).json({ error: 'JWT token expired' });
    } else if (err instanceof jwt.JsonWebTokenError) {
      res.status(401).json({ error: 'Invalid JWT signature or malformed token' });
    } else {
      res.status(500).json({ error: 'Internal authentication failure' });
    }
  }
}
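To test the middleware locally, you need a signed token for the `<YOUR_JWT>` placeholder in the curl example. The sketch below mints an HS256 token with Node's built-in crypto module. The token carries the `sub` and `tenant` claims that validateJwtMiddleware reads. The helper name `mintDevToken` and the one-hour default lifetime are hypothetical. In production, tokens should come from your identity provider rather than from a script like this.

```typescript
import { createHmac } from 'crypto';

// Base64url without padding, as JWS (RFC 7515) requires
function base64url(input: string): string {
  return Buffer.from(input, 'utf8').toString('base64url');
}

// Hypothetical helper for local testing only: mints an HS256 JWT with the
// `sub` and `tenant` claims expected by the gateway's auth middleware.
export function mintDevToken(
  secret: string,
  userId: string,
  tenantId: string,
  ttlSeconds = 3600,
  nowSeconds = Math.floor(Date.now() / 1000)
): string {
  const header = base64url(JSON.stringify({ alg: 'HS256', typ: 'JWT' }));
  const payload = base64url(
    JSON.stringify({ sub: userId, tenant: tenantId, iat: nowSeconds, exp: nowSeconds + ttlSeconds })
  );
  // Signature = HMAC-SHA256 over "<header>.<payload>", base64url-encoded
  const signature = createHmac('sha256', secret).update(`${header}.${payload}`).digest('base64url');
  return `${header}.${payload}.${signature}`;
}

// Example (uses the same secret as the gateway's development fallback):
// console.log(mintDevToken(process.env.JWT_SECRET ?? 'development-secret-change-me', 'user-1', 'tenant-alpha'));
```

Because the output is a standard compact JWS, `jwt.verify(token, secret, { algorithms: ['HS256'] })` accepts it.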

Implementation

Step 1: Tenant Context Injection and Complexity-Based Routing

After authentication, the gateway injects tenant-specific headers and routes the request based on prompt complexity. The complexity score combines prompt length, the number of tool definitions, and the requested max_tokens. Requests scoring above 12 go to the reasoning model if the tenant may use it. All other requests go to the fast inference model.

import { Request } from 'express';

export interface LLMRequestPayload {
  model: string;
  messages: Array<{ role: string; content: string }>;
  tools?: Array<{ type: string; function: { name: string; parameters: Record<string, unknown> } }>;
  max_tokens?: number;
  stream?: boolean;
}

export function calculateComplexityScore(payload: LLMRequestPayload): number {
  let score = 0;
  
  // Message length contributes to base complexity
  // Guard against a missing messages array so a malformed body cannot throw here
  const totalChars = (payload.messages ?? []).reduce((sum, msg) => sum + (msg.content?.length || 0), 0);
  score += Math.min(Math.floor(totalChars / 500), 10);
  
  // Tool calls increase reasoning requirements
  if (payload.tools && payload.tools.length > 0) {
    score += payload.tools.length * 3;
  }
  
  // High token requests indicate complex generation
  if (payload.max_tokens && payload.max_tokens > 1024) {
    score += 5;
  }

  return score;
}

export function routeAndInjectContext(req: Request): { targetUrl: string; headers: Record<string, string>; score: number } {
  const payload = req.body as LLMRequestPayload;
  const context = (req as any).tenantContext as { tenantId: string; allowedModels: string[] };
  const score = calculateComplexityScore(payload);

  // Select model based on score and tenant permissions
  let targetModel = payload.model;
  if (score > 12 && context.allowedModels.includes('reasoning-v2')) {
    targetModel = 'reasoning-v2';
  } else if (context.allowedModels.includes('fast-v1')) {
    targetModel = 'fast-v1';
  }

  // Tenant isolation: never forward a model the tenant is not entitled to.
  // Like the overrides above, fall back silently to the tenant's first allowed model
  // (this assumes the claims store never holds an empty allowedModels list).
  if (!context.allowedModels.includes(targetModel)) {
    targetModel = context.allowedModels[0];
  }

  // Encode the model name so it cannot alter the upstream URL path
  const targetUrl = `https://llm-proxy.cognigy.ai/api/v2/llm/completions/${encodeURIComponent(targetModel)}`;

  const headers: Record<string, string> = {
    'Content-Type': 'application/json',
    'X-Cognigy-Tenant-Id': context.tenantId,
    'X-Cognigy-Request-Id': (req.headers['x-request-id'] as string | undefined) || `req-${Date.now()}`,
    'X-Complexity-Score': score.toString(),
    'Authorization': `Bearer ${process.env.COGNIGY_API_KEY}`
  };

  return { targetUrl, headers, score };
}
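The snippet below makes the threshold concrete with two sample payloads. It mirrors the Step 1 scoring rules:

  • one point per 500 characters of message content, capped at 10
  • three points per tool definition
  • five points when max_tokens exceeds 1024

The helper `scoreOf` is a hypothetical standalone copy of calculateComplexityScore, so the snippet runs on its own.

```typescript
interface SamplePayload {
  messages: Array<{ role: string; content: string }>;
  tools?: unknown[];
  max_tokens?: number;
}

// Mirrors calculateComplexityScore from Step 1 so this snippet is self-contained
function scoreOf(p: SamplePayload): number {
  const chars = p.messages.reduce((sum, m) => sum + m.content.length, 0);
  let score = Math.min(Math.floor(chars / 500), 10); // 1 point per 500 chars, capped at 10
  score += (p.tools?.length ?? 0) * 3;               // 3 points per tool definition
  if ((p.max_tokens ?? 0) > 1024) score += 5;       // long generations
  return score;
}

const REASONING_THRESHOLD = 12; // Step 1 picks reasoning-v2 only when score > 12

// Short chat turn: 120 chars -> 0, no tools -> 0, 256 tokens -> 0; total 0
const shortChat: SamplePayload = {
  messages: [{ role: 'user', content: 'x'.repeat(120) }],
  max_tokens: 256
};

// Agentic request: 3,000 chars -> 6, two tools -> 6, 2048 tokens -> 5; total 17
const agentic: SamplePayload = {
  messages: [{ role: 'user', content: 'x'.repeat(3000) }],
  tools: [{}, {}],
  max_tokens: 2048
};

for (const [name, p] of [['shortChat', shortChat], ['agentic', agentic]] as const) {
  const s = scoreOf(p);
  console.log(name, s, s > REASONING_THRESHOLD ? 'reasoning-v2' : 'fast-v1');
}
```

Because the character term is capped at 10, prompt length alone can never cross the threshold. A request needs at least one tool definition or a max_tokens above 1024 to reach the reasoning model.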

Step 2: Response Streaming with Backpressure Handling

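When stream is true, LLM responses arrive as Server-Sent Events (SSE). The gateway must pipe the upstream stream to the downstream client while respecting Node.js stream backpressure. Node.js does not drop chunks when a client reads slowly. If the gateway ignores the return value of res.write() and never waits for the drain event, the unsent data instead piles up in memory.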

import { Request, Response } from 'express';
import { fetch } from 'undici';
import { Readable } from 'stream';
import { pipeline } from 'stream/promises';

export async function streamLLMResponse(
  _req: Request,
  res: Response,
  targetUrl: string,
  headers: Record<string, string>,
  payload: unknown
): Promise<void> {
  // Express's Request has no AbortSignal, so derive one: if the response closes
  // before it has been fully written, the client has disconnected.
  const controller = new AbortController();
  res.on('close', () => {
    if (!res.writableEnded) controller.abort();
  });

  try {
    const upstreamRes = await fetch(targetUrl, {
      method: 'POST',
      headers,
      body: JSON.stringify(payload),
      signal: controller.signal // propagate client disconnects upstream
    });

    if (!upstreamRes.ok || !upstreamRes.body) {
      // Nothing has been written yet, so a JSON error response is still possible
      res.status(upstreamRes.ok ? 502 : upstreamRes.status).json({ error: await upstreamRes.text() });
      return;
    }

    // Commit SSE headers only after the upstream has accepted the request
    res.status(200);
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    // pipeline() pauses the upstream whenever res.write() returns false, resumes it on
    // 'drain', ends res when the upstream finishes, and destroys both streams on error.
    // The cast bridges undici's and Node's web-stream type declarations.
    await pipeline(Readable.fromWeb(upstreamRes.body as any), res);
  } catch (err) {
    if (controller.signal.aborted) return; // client is gone; there is no one to respond to
    if (res.headersSent) {
      res.destroy(err as Error); // mid-stream failure: the status line was already sent
    } else {
      res.status(502).json({ error: 'Upstream LLM proxy failed', details: (err as Error).message });
    }
  }
}
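Sometimes the gateway has to write chunks itself instead of handing the whole stream to Node's piping machinery, for example to inspect or rewrite SSE events before forwarding them. The same backpressure contract still applies: await 'drain' whenever write() returns false. The helper below is a minimal sketch of that pattern. `writeAllWithBackpressure` and the deliberately slow demo sink are hypothetical names used for illustration.

```typescript
import { Writable } from 'stream';
import { finished } from 'stream/promises';
import { once } from 'events';

// Hypothetical helper: writes every chunk, waiting for 'drain' whenever write()
// signals that the destination's buffer is full. Returns how often it had to wait.
export async function writeAllWithBackpressure(
  chunks: Iterable<string | Buffer> | AsyncIterable<string | Buffer>,
  dest: Writable
): Promise<number> {
  let waits = 0;
  for await (const chunk of chunks) {
    if (!dest.write(chunk)) {
      waits++;
      await once(dest, 'drain'); // rejects if dest emits 'error' first
    }
  }
  dest.end();
  await finished(dest); // resolves once all buffered data has been flushed
  return waits;
}

// Demo sink: a tiny highWaterMark and a slow write callback, like a slow client
export function slowSink(received: string[], delayMs = 5): Writable {
  return new Writable({
    highWaterMark: 4, // bytes; any realistic SSE event exceeds this
    write(chunk: Buffer, _encoding, callback) {
      received.push(chunk.toString('utf8'));
      setTimeout(callback, delayMs);
    }
  });
}
```

In the gateway, `dest` would be the Express `res`. The write loop never holds more than one unacknowledged chunk above the high-water mark, however slowly the client reads.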

Step 3: LRU Caching for Frequent Completions

Non-streaming completions benefit from caching. The gateway uses an LRU strategy keyed on a hash of the prompt, model, and tenant. Cache entries expire after 300 seconds to balance freshness and latency.

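import { Request, Response } from 'express';
import { fetch } from 'undici';
import { LRUCache } from 'lru-cache';
import { createHash } from 'crypto';

export interface CacheEntry {
  data: Record<string, unknown>;
  timestamp: number;
  tokenUsage: { prompt: number; completion: number; total: number };
}

const cache = new LRUCache<string, CacheEntry>({
  max: 1000,
  ttl: 300_000, // 5 minutes
  // Keep the TTL absolute: with updateAgeOnGet: true, frequently read entries would never expire
  updateAgeOnGet: false
});

function generateCacheKey(tenantId: string, payload: any): string {
  // Include every field that changes the completion; tenantId keeps tenants' entries apart
  const raw = JSON.stringify({
    tenantId,
    model: payload.model,
    messages: payload.messages,
    tools: payload.tools ?? null,
    max_tokens: payload.max_tokens ?? null
  });
  // Use the full SHA-256 digest; truncating it raises the chance of cross-request collisions
  return createHash('sha256').update(raw).digest('hex');
}

export async function getCachedOrFetch(
  req: Request,
  res: Response,
  targetUrl: string,
  headers: Record<string, string>,
  payload: any
): Promise<void> {
  const context = (req as any).tenantContext as { tenantId: string };
  const cacheKey = generateCacheKey(context.tenantId, payload);

  const cached = cache.get(cacheKey);
  if (cached) {
    (res as any).cachedHit = true; // read by the Step 4 logger
    res.status(200).json({
      ...cached.data,
      cached: true,
      cacheAge: Date.now() - cached.timestamp
    });
    return;
  }

  try {
    const upstreamRes = await fetch(targetUrl, {
      method: 'POST',
      headers,
      body: JSON.stringify(payload)
    });

    if (!upstreamRes.ok) {
      // Only successful completions are cached
      res.status(upstreamRes.status).json({ error: await upstreamRes.text() });
      return;
    }

    const data = (await upstreamRes.json()) as Record<string, any>;
    cache.set(cacheKey, {
      data,
      timestamp: Date.now(),
      tokenUsage: data.usage ?? { prompt: 0, completion: 0, total: 0 }
    });

    res.status(200).json({ ...data, cached: false });
  } catch (err) {
    // Network failures would otherwise become unhandled rejections in an async handler
    res.status(502).json({ error: 'Upstream LLM proxy failed', details: (err as Error).message });
  }
}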
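The following dependency-free sketch shows what "LRU with a TTL" means in this step. It relies on the fact that a JavaScript Map iterates keys in insertion order. Re-inserting a key on read marks it most recently used, and the first key in the Map is the eviction candidate. A read refreshes recency but not expiry, which matches lru-cache's default `updateAgeOnGet: false`. `TinyLruTtl` is a hypothetical, illustrative class; the lru-cache package remains the better choice in production.

```typescript
export class TinyLruTtl<K, V> {
  private readonly entries = new Map<K, { value: V; expiresAt: number }>();

  constructor(
    private readonly max: number,
    private readonly ttlMs: number,
    private readonly now: () => number = Date.now // injectable clock for testing
  ) {}

  get(key: K): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key); // expired entries are dropped lazily on read
      return undefined;
    }
    // Re-insert to mark as most recently used; expiresAt is NOT extended
    this.entries.delete(key);
    this.entries.set(key, entry);
    return entry.value;
  }

  set(key: K, value: V): void {
    this.entries.delete(key);
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
    // Evict least recently used entries: the oldest keys in Map iteration order
    while (this.entries.size > this.max) {
      const oldest = this.entries.keys().next().value as K;
      this.entries.delete(oldest);
    }
  }

  get size(): number {
    return this.entries.size;
  }
}
```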

Step 4: Structured Logging for Token Consumption

Token consumption monitoring requires structured JSON logs. The gateway wraps res.end() so that pino writes one log entry per request. Each entry includes:

  • the tenant and user IDs
  • the model requested by the client
  • the complexity score
  • the HTTP status
  • the total request duration
  • token usage, when the response body reports it

import pino from 'pino';
import { Request, Response, NextFunction } from 'express';

const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  formatters: {
    level: (label) => ({ level: label.toUpperCase() }),
    log: (obj) => obj
  }
});

export function tokenUsageLogger(req: Request, res: Response, next: NextFunction): void {
  const start = Date.now();
  const originalEnd = res.end;

  // Wrap res.end so the final body (if any) can be inspected before it is sent
  res.end = function (this: Response, ...args: any[]) {
    const chunk = args[0];
    const duration = Date.now() - start;
    const context = (req as any).tenantContext as { tenantId?: string; userId?: string } | undefined;
    const score = ((req as any).complexityScore as number | undefined) ?? 0;

    // res.json() may pass res.end() either a string or a Buffer, so handle both.
    // Streaming responses send their data through res.write(), so usage stays zero here.
    let tokenUsage = { prompt: 0, completion: 0, total: 0 };
    if (res.statusCode === 200 && chunk) {
      try {
        const text = Buffer.isBuffer(chunk) ? chunk.toString('utf8') : typeof chunk === 'string' ? chunk : '';
        const body = text ? JSON.parse(text) : undefined;
        if (body?.usage) tokenUsage = body.usage;
      } catch {
        // Body was not JSON; keep zero usage
      }
    }

    logger.info({
      event: 'llm_gateway_request',
      tenantId: context?.tenantId,
      userId: context?.userId,
      model: (req.body as any)?.model, // model requested by the client
      complexityScore: score,
      statusCode: res.statusCode,
      durationMs: duration,
      tokenUsage,
      cached: (res as any).cachedHit || false
    });

    // Restore the original end and forward every argument (chunk, encoding, callback)
    res.end = originalEnd;
    return (originalEnd as any).apply(this, args);
  } as Response['end'];

  next();
}
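The logger above only parses the body passed to res.end(). For streamed responses the data travels through res.write(), so usage is logged as zero. One way to recover usage is to scan SSE events as they pass through. The sketch below assumes the upstream emits OpenAI-style events:

  • `data: {json}` lines separated by blank lines
  • a final `data: [DONE]`
  • a `usage` object with `prompt_tokens`, `completion_tokens`, and `total_tokens` on one of the last chunks

This guide does not confirm the Cognigy.AI proxy's actual event format, so treat the field names as assumptions. `SseUsageScanner` is a hypothetical name.

```typescript
import { StringDecoder } from 'string_decoder';

export interface TokenUsage {
  prompt: number;
  completion: number;
  total: number;
}

// Incrementally parses SSE text and remembers the last `usage` object seen.
// ASSUMPTION: OpenAI-style `data: {...}` events; adjust field names to your upstream.
export class SseUsageScanner {
  private readonly decoder = new StringDecoder('utf8'); // handles UTF-8 split across chunks
  private buffer = '';
  usage: TokenUsage | null = null;

  push(chunk: Buffer | string): void {
    this.buffer += typeof chunk === 'string' ? chunk : this.decoder.write(chunk);
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop() ?? ''; // keep an incomplete trailing line for the next chunk
    for (const raw of lines) {
      const line = raw.trim(); // also strips a trailing \r from CRLF streams
      if (!line.startsWith('data:')) continue; // skips blank lines and ":" comments
      const data = line.slice('data:'.length).trim();
      if (data === '[DONE]') continue;
      try {
        const u = JSON.parse(data)?.usage;
        if (u) {
          const prompt = u.prompt_tokens ?? 0;
          const completion = u.completion_tokens ?? 0;
          this.usage = { prompt, completion, total: u.total_tokens ?? prompt + completion };
        }
      } catch {
        // Not JSON (e.g. a keep-alive payload); ignore
      }
    }
  }
}
```

To use it, one option is to call `scanner.push(chunk)` from a Transform stream placed between the upstream body and `res`, then log `scanner.usage` once the stream settles.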

Step 5: Request Routing and 429 Retry Logic

The final handler ties routing, caching, and streaming together; authentication and logging run as middleware in the complete example below. For streaming requests, the gateway retries 429 Too Many Requests responses from the upstream Cognigy.AI proxy with exponential backoff.

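import { Request, Response } from 'express';
// Alias undici's Response so it does not clash with Express's Response type
import { fetch, type RequestInit, type Response as UpstreamResponse } from 'undici';
import { Readable } from 'stream';
import { pipeline } from 'stream/promises';
// routeAndInjectContext and LLMRequestPayload (Step 1) and getCachedOrFetch (Step 3)
// must also be imported from wherever you placed those modules.

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function fetchWithRetry(url: string, options: RequestInit, maxRetries = 3): Promise<UpstreamResponse> {
  let lastError: Error | null = null;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const res = await fetch(url, options);

      if (res.status === 429 && attempt < maxRetries) {
        // Honor a numeric Retry-After; otherwise back off exponentially (1 s, 2 s, 4 s, ...)
        const retryAfter = Number.parseInt(res.headers.get('Retry-After') ?? '', 10);
        const waitTime = Number.isNaN(retryAfter) ? 2 ** attempt * 1000 : retryAfter * 1000;
        await res.body?.cancel(); // release the connection before retrying
        await sleep(waitTime);
        continue;
      }

      return res;
    } catch (err) {
      lastError = err as Error;
      if (options.signal?.aborted) break; // the client disconnected; do not retry
      if (attempt < maxRetries) await sleep(2 ** attempt * 500);
    }
  }

  throw lastError ?? new Error('Max retries exceeded for upstream LLM proxy');
}

export async function handleLLMGatewayRequest(req: Request, res: Response): Promise<void> {
  // Express 4 does not catch rejected promises, so every failure is handled here
  try {
    const { targetUrl, headers, score } = routeAndInjectContext(req);
    (req as any).complexityScore = score;
    const payload = req.body as LLMRequestPayload;

    if (!payload.stream) {
      await getCachedOrFetch(req, res, targetUrl, headers, payload);
      return;
    }

    // Express's Request has no AbortSignal; abort upstream work if the client disconnects
    const controller = new AbortController();
    res.on('close', () => {
      if (!res.writableEnded) controller.abort();
    });

    // Fetch once (with retries) and stream this same response; do not fetch a second time
    const upstreamRes = await fetchWithRetry(targetUrl, {
      method: 'POST',
      headers,
      body: JSON.stringify(payload),
      signal: controller.signal
    });

    if (!upstreamRes.ok || !upstreamRes.body) {
      res.status(upstreamRes.ok ? 502 : upstreamRes.status).json({ error: await upstreamRes.text() });
      return;
    }

    // Same backpressure-safe piping as Step 2, applied to the already-fetched response
    res.status(200);
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.flushHeaders();
    await pipeline(Readable.fromWeb(upstreamRes.body as any), res);
  } catch (err) {
    if (res.headersSent) {
      res.destroy(err as Error); // mid-stream failure or client disconnect
    } else {
      res.status(500).json({ error: 'Gateway request failure', details: (err as Error).message });
    }
  }
}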
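fetchWithRetry above parses Retry-After only as a number of seconds. HTTP also allows the header to carry an HTTP-date. In addition, clients commonly add random jitter so that many throttled callers do not retry in lockstep. The function below is a sketch of a delay calculation that covers both. It uses "full jitter", a uniform random delay between zero and the exponential ceiling. The name `computeRetryDelayMs`, the 500 ms base, and the 30 s cap are hypothetical choices, not values taken from Cognigy.AI.

```typescript
// Returns how long to wait before retry number `attempt` (0-based).
// The cap also bounds server-provided delays so one request never blocks longer than capMs.
export function computeRetryDelayMs(
  attempt: number,
  retryAfterHeader: string | null,
  options: { baseMs?: number; capMs?: number; now?: () => number; random?: () => number } = {}
): number {
  const { baseMs = 500, capMs = 30_000, now = Date.now, random = Math.random } = options;

  if (retryAfterHeader) {
    const value = retryAfterHeader.trim();
    // Form 1: delay in seconds, e.g. "Retry-After: 120"
    if (/^\d+$/.test(value)) {
      return Math.min(Number(value) * 1000, capMs);
    }
    // Form 2: HTTP-date, e.g. "Retry-After: Wed, 21 Oct 2015 07:28:00 GMT"
    const dateMs = Date.parse(value);
    if (!Number.isNaN(dateMs)) {
      return Math.min(Math.max(dateMs - now(), 0), capMs);
    }
  }

  // No usable header: full jitter, uniform in [0, min(cap, base * 2^attempt))
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}
```

To adopt it, one option is to replace the waitTime expression in fetchWithRetry with `computeRetryDelayMs(attempt, res.headers.get('Retry-After'))`.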

Complete Working Example

import express from 'express';
import { validateJwtMiddleware } from './auth';
import { tokenUsageLogger } from './logging';
import { handleLLMGatewayRequest } from './router';

const app = express();
app.use(express.json({ limit: '10mb' }));

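// Health check is registered before authentication so load-balancer probes need no JWT
app.get('/health', (_req, res) => {
  res.status(200).json({ status: 'healthy', timestamp: new Date().toISOString() });
});

// Middleware chain for every route registered after this point
app.use(validateJwtMiddleware);
app.use(tokenUsageLogger);

// LLM Gateway endpoint
app.post('/api/v2/llm/gateway', handleLLMGatewayRequest);

// Error handler (body-parser errors such as invalid JSON carry a 4xx status)
app.use((err: Error & { status?: number }, _req: express.Request, res: express.Response, _next: express.NextFunction) => {
  const status = err.status ?? 500;
  res.status(status).json({ error: status < 500 ? 'Invalid request' : 'Internal gateway error', message: err.message });
});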

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Cognigy.AI LLM Gateway running on port ${PORT}`);
});

Set JWT_SECRET and COGNIGY_API_KEY in the environment, then start the gateway with npx ts-node index.ts. Send a request with a valid JWT and LLM payload:

curl -X POST http://localhost:3000/api/v2/llm/gateway \
  -H "Authorization: Bearer <YOUR_JWT>" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "fast-v1",
    "messages": [
      {"role": "system", "content": "You are a technical assistant."},
      {"role": "user", "content": "Explain Node.js stream backpressure."}
    ],
    "max_tokens": 256,
    "stream": false
  }'

Common Errors & Debugging

Error: 401 Unauthorized - Invalid JWT signature or malformed token

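  • What causes it: One of the following:
    • the token is malformed
    • it was signed with a key other than JWT_SECRET
    • it uses an algorithm other than HS256
    • it has expired, unless the middleware checks jwt.TokenExpiredError before JsonWebTokenError

    A missing header or a non-Bearer scheme produces the "Missing or malformed Authorization header" message instead.
  • How to fix it: Decode the token without verifying it, for example with jwt.decode(token, { complete: true }). Check its alg header and its sub, tenant, and exp claims. If the signing key changed, rotate JWT_SECRET and reissue tokens.
  • Code showing the fix:
// Pin the algorithm, and verify the audience if your issuer sets an aud claim
// ('cognigy-llm-gateway' is an example value; use the audience your tokens actually carry)
const decoded = jwt.verify(token, JWT_SECRET, { algorithms: ['HS256'], audience: 'cognigy-llm-gateway' });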
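To inspect a token locally instead of pasting it into an online debugger, decode its header and claims with Node's Buffer. That is enough to spot a wrong alg, a missing tenant claim, or an exp in the past. The sketch below uses a hypothetical helper, `inspectJwt`. It does not check the signature, so never use its output for authorization decisions.

```typescript
export interface JwtInspection {
  header: Record<string, unknown>;
  claims: Record<string, unknown>;
  expired: boolean | null; // null when the token has no numeric exp claim
}

// Decodes (does NOT verify) a compact JWS so you can inspect alg, tenant, exp, etc.
export function inspectJwt(token: string, nowSeconds = Math.floor(Date.now() / 1000)): JwtInspection {
  const parts = token.split('.');
  if (parts.length !== 3) {
    throw new Error(`Expected 3 dot-separated segments, got ${parts.length}`);
  }
  const decode = (segment: string) => JSON.parse(Buffer.from(segment, 'base64url').toString('utf8'));
  const header = decode(parts[0]);
  const claims = decode(parts[1]);
  // jsonwebtoken treats a token as expired once now >= exp
  const expired = typeof claims.exp === 'number' ? claims.exp <= nowSeconds : null;
  return { header, claims, expired };
}
```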

Error: 429 Too Many Requests - Upstream rate limit exceeded

  • What causes it: The Cognigy.AI LLM Proxy enforces tenant-level RPM limits, and burst traffic triggers 429 responses.
  • How to fix it: fetchWithRetry retries 429 responses with exponential backoff and honors a numeric Retry-After header. Increase the maxRetries parameter, use a larger backoff base, or queue requests on the client side.
  • Code showing the fix:
// Use a larger exponential base (3 instead of 2) for the fallback delay when no Retry-After is sent
const fallbackWaitMs = Math.pow(3, attempt) * 1000;
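A complementary fix is to enforce each tenant's own rateLimitRpm claim before calling upstream. The middleware attaches this claim to the tenant context, but nothing enforces it. Enforcing it keeps one tenant's burst from using up the shared upstream quota. The class below is a minimal sketch of an in-memory sliding-window limiter keyed by tenant ID. `SlidingWindowLimiter` is a hypothetical name. Because the state lives in process memory, it only works for a single gateway instance; multiple instances would need a shared store such as Redis.

```typescript
// In-memory sliding-window limiter: at most `limit` requests per tenant in any window.
export class SlidingWindowLimiter {
  private readonly hits = new Map<string, number[]>();

  constructor(
    private readonly windowMs = 60_000, // one minute, matching an RPM limit
    private readonly now: () => number = Date.now
  ) {}

  // Returns 0 if the request may proceed, otherwise the milliseconds to wait.
  tryAcquire(tenantId: string, limit: number): number {
    if (limit <= 0) return this.windowMs;
    const t = this.now();
    const recent = (this.hits.get(tenantId) ?? []).filter((ts) => ts > t - this.windowMs);
    if (recent.length >= limit) {
      this.hits.set(tenantId, recent);
      return recent[0] + this.windowMs - t; // when the oldest hit leaves the window
    }
    recent.push(t);
    this.hits.set(tenantId, recent);
    return 0;
  }
}

// Hypothetical wiring in a middleware placed after validateJwtMiddleware:
// const wait = limiter.tryAcquire(ctx.tenantId, ctx.rateLimitRpm);
// if (wait > 0) { res.setHeader('Retry-After', String(Math.ceil(wait / 1000))); res.status(429).json({ error: 'Tenant rate limit exceeded' }); return; }
```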

Error: 502 Bad Gateway - Upstream LLM proxy failed

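  • What causes it: A network error or timeout while reaching the upstream proxy, or an upstream failure that surfaces as an exception rather than an HTTP error status. Upstream HTTP errors are passed through with the upstream's own status code instead. An example is a rejection for a prompt that exceeds the model's context window.
  • How to fix it: Search the Cognigy.AI logs for the X-Cognigy-Request-Id value the gateway sent. To avoid upstream rejections for oversized prompts, check before forwarding that the prompt plus max_tokens fits the target model's context window.
  • Code showing the fix:
// Pre-flight context check. Characters are not tokens: ~4 characters per token is only a rough
// heuristic for English text, and 8192 is an example window; use your model's real limit.
const totalChars = payload.messages.reduce((sum, msg) => sum + (msg.content?.length || 0), 0);
const estimatedPromptTokens = Math.ceil(totalChars / 4);
if (estimatedPromptTokens + (payload.max_tokens || 0) > 8192) {
  res.status(400).json({ error: 'Request exceeds model context window limit' });
  return;
}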

Error: Memory growth or stalled streams with slow clients

  • What causes it: The downstream client reads chunks more slowly than the upstream emits them. Node.js does not drop data in this case. If the return value of res.write() is ignored, writes keep succeeding and the data accumulates in the response's memory buffer.
  • How to fix it: Connect the upstream body to res with stream.pipeline(). It pauses the source whenever res.write() returns false, resumes on drain, and destroys both streams on error. Ensure no third-party middleware intercepts the stream without preserving backpressure semantics.
  • Code showing the fix:
// pipeline() from 'stream/promises' handles pause/resume on drain and ends res when done
await pipeline(Readable.fromWeb(upstreamRes.body as any), res);

Official References