Building a Real-Time Translation Proxy for Cognigy.AI Agent Assist in TypeScript

What You Will Build

  • The proxy intercepts live transcript fragments over WebSocket, translates them with automatic language detection, caches overlapping phrases in a Trie, and pushes localized assist cards to the agent desktop.
  • This implementation uses the Cognigy.AI Real-Time API, the Cognigy Assist API, and a REST translation service (DeepL in the examples).
  • The code is written in TypeScript for Node.js and uses the ws, axios, and dotenv packages.

Prerequisites

  • Cognigy.AI tenant credentials with OAuth client credentials flow enabled. Required scopes: realtime:events:subscribe, agent:assist:write, agent:assist:read.
  • An API key for a REST translation service that detects the source language automatically (the examples use DeepL).
  • Node.js 18.0 or higher with TypeScript 5.0+.
  • External dependencies: npm install ws axios dotenv, plus npm install -D typescript @types/node @types/ws for type checking.

Authentication Setup

Cognigy.AI issues JWT access tokens through the OAuth client credentials flow. The proxy needs a token before it opens the WebSocket connection and before each Assist API call. Caching the token avoids a round trip to the token endpoint on every request and keeps latency down.

import axios from 'axios';

export interface TokenResponse {
  access_token: string;
  expires_in: number;
  token_type: string;
}

const AUTH_ENDPOINT = 'https://api.cognigy.ai/v1/oauth/token';

export class CognigyAuth {
  private token: string | null = null;
  private expiryTimestamp: number = 0;

  async getToken(): Promise<string> {
    if (this.token && Date.now() < this.expiryTimestamp) {
      return this.token;
    }

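    // Fail fast with a clear message instead of sending "undefined" as a credential
    const clientId = process.env.COGNIGY_CLIENT_ID;
    const clientSecret = process.env.COGNIGY_CLIENT_SECRET;
    if (!clientId || !clientSecret) {
      throw new Error('COGNIGY_CLIENT_ID and COGNIGY_CLIENT_SECRET must be set');
    }

    const formData = new URLSearchParams({
      grant_type: 'client_credentials',
      client_id: clientId,
      client_secret: clientSecret,
      scope: 'realtime:events:subscribe agent:assist:write agent:assist:read'
    });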

    const response = await axios.post<TokenResponse>(AUTH_ENDPOINT, formData, {
      headers: { 'Content-Type': 'application/x-www-form-urlencoded' }
    });

    this.token = response.data.access_token;
    // Subtract 5 seconds to account for clock skew and network latency
    this.expiryTimestamp = Date.now() + (response.data.expires_in * 1000) - 5000;
    return this.token;
  }
}

The token request is sent as application/x-www-form-urlencoded. expires_in is a lifetime in seconds, so the cache stores an absolute expiry (Date.now() + expires_in × 1000, minus a 5-second safety margin) and compares Date.now() against it on every call. Once the token has expired or falls inside that margin, getToken() runs the client credentials flow again.
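If several fragments arrive while the cached token is expired, each concurrent getToken() call above sends its own OAuth request. The following is a minimal sketch of a "single-flight" cache that lets all concurrent callers share one in-flight refresh. The `SingleFlightTokenCache` class and its `fetchToken` callback are hypothetical; in practice the callback would wrap the axios POST shown above.

```typescript
// Token plus lifetime in seconds, as returned by a hypothetical fetcher
// that wraps the OAuth POST request.
interface FetchedToken {
  accessToken: string;
  expiresInSec: number;
}

class SingleFlightTokenCache {
  private token: string | null = null;
  private expiresAt = 0;
  private inFlight: Promise<string> | null = null;

  constructor(
    private readonly fetchToken: () => Promise<FetchedToken>,
    private readonly skewMs = 5000, // refresh 5 s early, as CognigyAuth does
  ) {}

  async getToken(): Promise<string> {
    if (this.token && Date.now() < this.expiresAt) return this.token;
    // Join the refresh that is already running instead of starting another one
    if (this.inFlight) return this.inFlight;

    const pending = this.refresh();
    this.inFlight = pending;
    // Clear the shared promise once it settles (success or failure),
    // so the next caller can retry after a failed refresh
    const clear = () => {
      if (this.inFlight === pending) this.inFlight = null;
    };
    pending.then(clear, clear);
    return pending;
  }

  private async refresh(): Promise<string> {
    const { accessToken, expiresInSec } = await this.fetchToken();
    this.token = accessToken;
    this.expiresAt = Date.now() + expiresInSec * 1000 - this.skewMs;
    return accessToken;
  }
}
```

Two concurrent getToken() calls on a cold cache trigger exactly one fetch. CognigyAuth could delegate to such a cache without changing its public interface.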

Implementation

Step 1: WebSocket Connection & Transcript Interception

The Cognigy.AI Real-Time API streams events over wss://api.cognigy.ai/v1/realtime. You must authenticate the WebSocket upgrade request with a Bearer token. The proxy subscribes to TranscriptFragment events, which fire as the customer types or speaks.

import WebSocket from 'ws';
import { CognigyAuth } from './auth';

export interface TranscriptEvent {
  type: string;
  conversationId: string;
  agentId: string;
  fragment: {
    text: string;
    offset: number;
    length: number;
  };
  timestamp: string;
}

export class TranscriptInterceptor {
  private ws: WebSocket | null = null;
  private auth: CognigyAuth;
  private readonly WS_URL = 'wss://api.cognigy.ai/v1/realtime';

  constructor(auth: CognigyAuth) {
    this.auth = auth;
  }

  async connect(): Promise<void> {
    const token = await this.auth.getToken();
    this.ws = new WebSocket(this.WS_URL, {
      headers: { Authorization: `Bearer ${token}` }
    });

    this.ws.on('open', () => {
      // Subscribe to transcript events
      this.ws?.send(JSON.stringify({
        type: 'Subscribe',
        event: 'TranscriptFragment'
      }));
    });

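    this.ws.on('message', (data) => {
      let payload: TranscriptEvent;
      try {
        payload = JSON.parse(data.toString()) as TranscriptEvent;
      } catch {
        // Skip malformed frames; an exception thrown here would crash the process
        console.warn('Ignoring non-JSON WebSocket message');
        return;
      }
      if (payload.type === 'TranscriptFragment') {
        this.processFragment(payload);
      }
    });

    this.ws.on('close', () => {
      console.log('WebSocket disconnected. Reconnecting in 2 seconds...');
      this.scheduleReconnect(2000);
    });

    this.ws.on('error', (err) => {
      console.error('WebSocket error:', err.message);
    });
  }

  private scheduleReconnect(delayMs: number): void {
    setTimeout(() => {
      // connect() rejects if token acquisition fails; catch it so it does not
      // become an unhandled rejection, and schedule another attempt
      this.connect().catch((err) => {
        console.error('Reconnect failed:', (err as Error).message);
        this.scheduleReconnect(delayMs);
      });
    }, delayMs);
  }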

  private processFragment(event: TranscriptEvent): void {
    console.log(`Fragment received: ${event.fragment.text.substring(0, 50)}...`);
    // Handoff to translation pipeline
  }
}

On open, the client sends a Subscribe message. The message listener parses each incoming JSON frame and forwards TranscriptFragment events. If the server rejects the upgrade request with 401, the token is invalid or expired; ws reports this through the error event. The close handler reconnects after a fixed 2-second delay. Production systems should use exponential backoff for reconnection attempts.
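The fixed 2-second delay can be replaced with capped exponential backoff plus random jitter. With jitter, many proxy instances that reconnect after the same outage do not retry in lockstep. The following is a minimal sketch of the "full jitter" variant. The base and cap values are assumptions to tune, and `reconnectDelay` and `ReconnectScheduler` are hypothetical helpers, not part of ws.

```typescript
/**
 * Capped exponential backoff with full jitter:
 * delay = random(0, min(capMs, baseMs * 2^attempt)).
 * `random` is injectable so the schedule can be tested deterministically.
 */
function reconnectDelay(
  attempt: number,
  baseMs = 500,
  capMs = 30_000,
  random: () => number = Math.random,
): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}

class ReconnectScheduler {
  private attempt = 0;

  constructor(private readonly reconnect: () => Promise<void>) {}

  /** Call from the WebSocket 'close' handler. */
  schedule(): void {
    const delay = reconnectDelay(this.attempt++);
    setTimeout(() => {
      this.reconnect().catch((err) => {
        console.error('Reconnect failed:', (err as Error).message);
        this.schedule();
      });
    }, delay);
  }

  /** Call from the 'open' handler so the next outage starts with short delays again. */
  reset(): void {
    this.attempt = 0;
  }
}
```

In TranscriptInterceptor, the 'close' handler would call schedule() and the 'open' handler would call reset().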

Step 2: Language Detection & Translation Routing

Translation APIs vary in their detection capabilities. The DeepL API auto-detects source language when the source_lang parameter is omitted. The proxy routes text to the translation endpoint and implements retry logic for 429 Too Many Requests responses.

import axios, { AxiosError } from 'axios';

export interface TranslationResponse {
  translations: Array<{
    detected_source_language: string;
    text: string;
  }>;
}

export class TranslationRouter {
  private readonly API_URL = 'https://api-free.deepl.com/v2/translate';
  private readonly MAX_RETRIES = 3;

  async translate(text: string, targetLang: string): Promise<string> {
    const apiKey = process.env.DEEPL_API_KEY;
    if (!apiKey) {
      throw new Error('DEEPL_API_KEY must be set');
    }

    for (let attempt = 0; attempt < this.MAX_RETRIES; attempt++) {
      try {
        // source_lang is omitted so DeepL auto-detects the source language
        const formData = new URLSearchParams({ text, target_lang: targetLang });

        const response = await axios.post<TranslationResponse>(this.API_URL, formData, {
          headers: {
            // DeepL's documented authentication method is this header
            Authorization: `DeepL-Auth-Key ${apiKey}`,
            'Content-Type': 'application/x-www-form-urlencoded'
          }
        });

        const [result] = response.data.translations;
        console.debug(`Detected source language: ${result.detected_source_language}`);
        return result.text;
      } catch (error) {
        const axiosError = error as AxiosError;
        const status = axiosError.response?.status;

        if (status === 429) {
          // Out of retries: stop without sleeping again
          if (attempt === this.MAX_RETRIES - 1) break;
          const delay = Math.pow(2, attempt) * 1000;
          console.warn(`Rate limited. Retrying in ${delay}ms...`);
          await new Promise(resolve => setTimeout(resolve, delay));
          continue;
        }

        if (status === 400) {
          throw new Error(`Translation API returned 400: ${axiosError.message}`);
        }

        throw error;
      }
    }

    throw new Error('Translation failed after maximum retries');
  }
}

The request body omits source_lang so that DeepL detects the source language. The response reports the detected language in detected_source_language, which is useful for logging. The retry loop uses exponential backoff (2^attempt seconds) for 429 responses only. Other errors, including 400 Bad Request, are thrown immediately. The caller can then route to a fallback instead of retrying a request that cannot succeed.
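The same policy can be factored into a reusable helper so that other HTTP calls, such as the Assist push, can share it. The following is a minimal sketch. `withRetry` and its `isRetryable` predicate are hypothetical names. The helper waits baseDelayMs × 2^attempt between attempts and rethrows the last error as soon as attempts run out or an error is classified as permanent.

```typescript
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

/**
 * Runs `fn` up to `maxAttempts` times. After a failure that `isRetryable`
 * classifies as transient (e.g. HTTP 429), it waits baseDelayMs * 2^attempt
 * before trying again; any other failure is rethrown immediately.
 */
async function withRetry<T>(
  fn: () => Promise<T>,
  isRetryable: (err: unknown) => boolean,
  maxAttempts = 3,
  baseDelayMs = 1000,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      const lastAttempt = attempt >= maxAttempts - 1;
      if (lastAttempt || !isRetryable(err)) throw err;
      await sleep(baseDelayMs * 2 ** attempt);
    }
  }
}
```

With axios, a suitable predicate is `(err) => axios.isAxiosError(err) && err.response?.status === 429`.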

Step 3: Trie-Based Caching for Overlapping Phrases

Transcript fragments often overlap, for example when a later fragment repeats an earlier phrase and extends it. A standard hash map only finds exact matches. A Trie indexes cached phrases character by character, so a single O(L) walk over a fragment of length L finds the longest cached phrase that is a prefix of that fragment.

export interface TrieNode {
  translation?: string;
  [key: string]: TrieNode | string | undefined;
}

export class TranslationTrie {
  private root: TrieNode = {};
  private readonly MAX_DEPTH = 150; // Prevent runaway memory usage

  insert(phrase: string, translation: string): void {
    if (phrase.length > this.MAX_DEPTH) return;
    
    let node = this.root;
    for (const char of phrase.toLowerCase()) {
      if (!node[char]) {
        node[char] = {};
      }
      node = node[char] as TrieNode;
    }
    node.translation = translation;
  }

  search(fragment: string): string | null {
    let node = this.root;
    let longestMatch: string | null = null;
    const lowerFragment = fragment.toLowerCase();

    for (const char of lowerFragment) {
      if (!node[char]) break;
      node = node[char] as TrieNode;
      if (node.translation) {
        longestMatch = node.translation;
      }
    }
    return longestMatch;
  }
}

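insert() walks the phrase character by character, creating nodes as needed, and stores the translation on the final node. search() walks the fragment and remembers the last node that carries a translation, so it returns the translation of the longest cached phrase that is a prefix of the fragment. That phrase may cover only the start of the fragment, so a hit is not necessarily a translation of the whole fragment. MAX_DEPTH skips phrases longer than 150 characters. This bounds the depth of any single path, but not the total number of cached phrases.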
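Because search() returns only the longest cached prefix, a hit can cover just the beginning of a longer fragment. One way to exploit overlapping fragments fully is greedy segmentation: take the longest cached phrase at the current position, advance past it, and collect any uncached text so it can be sent in one translation call. The following is a minimal sketch, assuming matches should start and end on space-delimited word boundaries. The `SegmentingTrie` class, its Map-based nodes, and the `segment` method are hypothetical, not part of TranslationTrie.

```typescript
interface SegNode {
  children: Map<string, SegNode>;
  translation?: string;
}

interface Segment {
  source: string;
  translation?: string; // undefined: not cached, still needs translating
}

class SegmentingTrie {
  private root: SegNode = { children: new Map() };

  insert(phrase: string, translation: string): void {
    let node = this.root;
    for (let i = 0; i < phrase.length; i++) {
      const key = phrase.charAt(i).toLowerCase();
      let next = node.children.get(key);
      if (!next) {
        next = { children: new Map() };
        node.children.set(key, next);
      }
      node = next;
    }
    node.translation = translation;
  }

  // Longest cached phrase starting at `start` that also ends on a word boundary.
  private longestAt(text: string, start: number): { end: number; translation: string } | null {
    let node = this.root;
    let best: { end: number; translation: string } | null = null;
    for (let i = start; i < text.length; i++) {
      const next = node.children.get(text.charAt(i).toLowerCase());
      if (!next) break;
      node = next;
      const endsOnBoundary = i + 1 === text.length || text.charAt(i + 1) === ' ';
      if (next.translation !== undefined && endsOnBoundary) {
        best = { end: i + 1, translation: next.translation };
      }
    }
    return best;
  }

  // Greedy left-to-right split into cached and uncached spans.
  segment(text: string): Segment[] {
    const out: Segment[] = [];
    let pending = ''; // uncached characters since the last hit
    let i = 0;
    while (i < text.length) {
      const atWordStart = i === 0 || text.charAt(i - 1) === ' ';
      const hit = atWordStart ? this.longestAt(text, i) : null;
      if (hit) {
        if (pending) out.push({ source: pending });
        pending = '';
        out.push({ source: text.slice(i, hit.end), translation: hit.translation });
        i = hit.end;
      } else {
        pending += text.charAt(i);
        i++;
      }
    }
    if (pending) out.push({ source: pending });
    return out;
  }
}
```

With "hello" and "how are you" cached, segment('Hello there how are you') yields a cached "Hello", an uncached " there ", and a cached "how are you". Concatenating per-segment translations ignores word reordering across segment boundaries, so this suits short, formulaic phrases better than full sentences.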

Step 4: Assist API Injection & Fallback Routing

Translated text must reach the agent desktop via the Cognigy Assist API. The proxy pushes localized content as an assist card. If the translation API rejects the language or returns an error, the proxy routes to a fallback provider or returns the original text with a warning flag.

import axios from 'axios';
import { CognigyAuth } from './auth';

export interface AssistPayload {
  agentId: string;
  conversationId: string;
  type: 'translation' | 'fallback';
  content: string;
  metadata?: Record<string, unknown>;
}

export class AssistInjector {
  private readonly API_URL = 'https://api.cognigy.ai/v1/assist/push';
  private auth: CognigyAuth;

  constructor(auth: CognigyAuth) {
    this.auth = auth;
  }

  async push(
    agentId: string,
    conversationId: string,
    content: string,
    isFallback: boolean = false
  ): Promise<void> {
    const token = await this.auth.getToken();
    const payload: AssistPayload = {
      agentId,
      conversationId,
      type: isFallback ? 'fallback' : 'translation',
      content,
      metadata: { injectedAt: new Date().toISOString() }
    };

    try {
      await axios.post(this.API_URL, payload, {
        headers: {
          Authorization: `Bearer ${token}`,
          'Content-Type': 'application/json'
        }
      });
    } catch (error) {
      console.error('Assist API injection failed:', (error as Error).message);
      throw error;
    }
  }
}

The Assist API expects agentId and conversationId to route the card to the correct desktop session. The type field distinguishes successful translations from fallback routing. A 403 response indicates missing agent:assist:write scope. A 404 response indicates an invalid agent or conversation identifier.
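The complete example below implements only the last resort of this fallback policy: it returns the original text with a warning flag. The following is a minimal sketch of trying an ordered list of providers before giving up. The `Provider` and `TranslateFn` types and the `translateWithFallback` function are hypothetical. The `[UNTRANSLATED]` prefix mirrors the complete example.

```typescript
type TranslateFn = (text: string, targetLang: string) => Promise<string>;

interface Provider {
  name: string;
  translate: TranslateFn;
}

interface RoutedTranslation {
  content: string;
  isFallback: boolean; // true when every provider failed
  provider: string | null;
}

// Try each provider in order; the first success wins.
async function translateWithFallback(
  providers: Provider[],
  text: string,
  targetLang: string,
): Promise<RoutedTranslation> {
  for (const p of providers) {
    try {
      const content = await p.translate(text, targetLang);
      return { content, isFallback: false, provider: p.name };
    } catch (err) {
      console.warn(`Provider ${p.name} failed:`, (err as Error).message);
    }
  }
  // Every provider failed: pass the original text through, flagged for the agent
  return { content: `[UNTRANSLATED] ${text}`, isFallback: true, provider: null };
}
```

The result maps directly onto AssistInjector.push(agentId, conversationId, result.content, result.isFallback).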

Step 5: Latency & Error Monitoring

Real-time translation only helps the agent if cards arrive within a tight latency budget. The proxy measures pipeline duration from fragment receipt to Assist injection and counts errors across all requests. Breaking error rates down per conversation or per translation route would require keying these counters accordingly.

export class MetricsCollector {
  private latencies: number[] = [];
  private errorCount = 0;
  private totalRequests = 0;
  private readonly LATENCY_THRESHOLD_MS = 800;

  recordLatency(durationMs: number): void {
    this.latencies.push(durationMs);
    if (this.latencies.length > 1000) {
      this.latencies.shift(); // Keep rolling window
    }
    if (durationMs > this.LATENCY_THRESHOLD_MS) {
      console.warn(`High latency detected: ${durationMs.toFixed(2)}ms`);
    }
  }

  recordError(): void {
    this.errorCount++;
    this.totalRequests++;
  }

  recordSuccess(): void {
    this.totalRequests++;
  }

  getReport(): { avgLatency: number; errorRate: number; sampleSize: number } {
    const avgLatency = this.latencies.length > 0
      ? this.latencies.reduce((a, b) => a + b, 0) / this.latencies.length
      : 0;
    
    return {
      avgLatency: Number(avgLatency.toFixed(2)),
      errorRate: this.totalRequests > 0 ? this.errorCount / this.totalRequests : 0,
      sampleSize: this.latencies.length
    };
  }
}

The collector keeps a rolling window of the last 1,000 latency samples to compute a moving average. LATENCY_THRESHOLD_MS logs a warning whenever a single pipeline run exceeds 800 ms. The error rate is errorCount / totalRequests. The report can be exposed to Prometheus (for example, through the prom-client package) or forwarded to Datadog.
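An average can hide the slow tail that agents actually notice. The following is a minimal sketch of a nearest-rank percentile over the same rolling window; getReport() could include a p95 value alongside avgLatency. The `percentile` helper is not part of the original class.

```typescript
/**
 * Nearest-rank percentile: sort the samples ascending and return the value at
 * 1-based rank ceil(p * n / 100). Returns 0 for an empty window.
 */
function percentile(samples: readonly number[], p: number): number {
  if (samples.length === 0) return 0;
  if (p <= 0 || p > 100) throw new RangeError('p must be in (0, 100]');
  const sorted = samples.slice().sort((a, b) => a - b);
  // Multiply before dividing so integer inputs avoid floating-point rounding in the rank
  const rank = Math.ceil((p * sorted.length) / 100);
  return sorted[rank - 1];
}
```

For example, percentile(this.latencies, 95) inside getReport() returns the latency that 95% of the sampled pipeline runs did not exceed.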

Complete Working Example

import 'dotenv/config'; // Load .env into process.env before anything reads it

import { CognigyAuth } from './auth';
import { TranscriptInterceptor, type TranscriptEvent } from './interceptor';
import { TranslationRouter } from './router';
import { TranslationTrie } from './trie';
import { AssistInjector } from './injector';
import { MetricsCollector } from './metrics';

class TranslationProxy {
  private auth: CognigyAuth;
  private interceptor: TranscriptInterceptor;
  private router: TranslationRouter;
  private trie: TranslationTrie;
  private injector: AssistInjector;
  private metrics: MetricsCollector;

  constructor() {
    this.auth = new CognigyAuth();
    this.interceptor = new TranscriptInterceptor(this.auth);
    this.router = new TranslationRouter();
    this.trie = new TranslationTrie();
    this.injector = new AssistInjector(this.auth);
    this.metrics = new MetricsCollector();

    // Override the interceptor's processFragment hook so every fragment
    // it receives is handled by this proxy's pipeline
    (this.interceptor as any).processFragment = this.handleFragment.bind(this);
  }

  private async handleFragment(event: TranscriptEvent): Promise<void> {
    const startTime = performance.now();
    const { fragment, agentId, conversationId } = event;
    const text = fragment.text.trim();

    if (!text) return;

    try {
      // 1. Check the Trie cache. search() returns the longest cached prefix,
      //    so a hit may cover only the start of a longer fragment.
      const cached = this.trie.search(text);
      if (cached) {
        await this.injector.push(agentId, conversationId, cached);
        this.metrics.recordSuccess();
        this.metrics.recordLatency(performance.now() - startTime);
        return;
      }

      // 2. Translate, falling back to the original text on failure
      let translatedText: string;
      let isFallback = false;

      try {
        translatedText = await this.router.translate(text, 'EN');
      } catch (err) {
        console.warn('Primary translation failed, routing to fallback:', (err as Error).message);
        translatedText = `[UNTRANSLATED] ${text}`;
        isFallback = true;
      }

      // 3. Cache only real translations, so a transient outage is not
      //    served from the cache later, then inject the card
      if (!isFallback) {
        this.trie.insert(text, translatedText);
      }
      await this.injector.push(agentId, conversationId, translatedText, isFallback);

      // Count each fragment exactly once: fallbacks as errors, the rest as successes
      if (isFallback) {
        this.metrics.recordError();
      } else {
        this.metrics.recordSuccess();
      }
      this.metrics.recordLatency(performance.now() - startTime);
    } catch (err) {
      this.metrics.recordError();
      console.error('Pipeline failure:', (err as Error).message);
    }
  }

  start(): void {
    console.log('Starting Cognigy Translation Proxy...');
    this.interceptor.connect().catch((err) => {
      // The initial connection failed (for example, bad credentials)
      console.error('Failed to start proxy:', (err as Error).message);
      process.exit(1);
    });

    // Log a metrics report every 30 seconds
    setInterval(() => {
      console.log('Metrics:', this.metrics.getReport());
    }, 30000);
  }
}

new TranslationProxy().start();

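This entry point wires the components together, routes intercepted fragments into handleFragment(), and opens the WebSocket connection. Each fragment is looked up in the Trie before the translation API is called. If translation fails, the original text is pushed with the fallback flag set. Metrics are logged every 30 seconds. The imports assume that the code from each step lives in its own module (auth.ts, interceptor.ts, router.ts, trie.ts, injector.ts, metrics.ts). Set COGNIGY_CLIENT_ID, COGNIGY_CLIENT_SECRET, and DEEPL_API_KEY in a .env file or in the environment before running.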

Common Errors & Debugging

Error: 401 Unauthorized

  • What causes it: The access token has expired, or the OAuth client lacks the required scopes.
  • How to fix it: Verify COGNIGY_CLIENT_ID and COGNIGY_CLIENT_SECRET in your environment. Ensure the client credentials flow is enabled in the Cognigy tenant settings. Confirm that the scope string requests, and the tenant grants, realtime:events:subscribe, agent:assist:write, and agent:assist:read.
  • Code showing the fix: The CognigyAuth.getToken() method automatically refreshes expired tokens. If the error persists, log the raw OAuth response to verify scope approval.
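To see which scopes were actually granted, the token's payload can be decoded locally. The following is a minimal sketch. It assumes that the token is a standard three-part JWT and that the granted scopes appear in a space-separated `scope` claim; claim names vary by issuer, so check a real token. It decodes the payload only and does not verify the signature. `grantedScopes` and `missingScopes` are hypothetical helpers.

```typescript
// Decode the (unverified) payload of a JWT and list the scopes it grants.
function grantedScopes(jwt: string): string[] {
  const parts = jwt.split('.');
  if (parts.length !== 3) throw new Error('Not a three-part JWT');
  // JWT segments are base64url-encoded JSON; Node 18 decodes base64url natively
  const payload = JSON.parse(Buffer.from(parts[1], 'base64url').toString('utf8')) as {
    scope?: string; // assumed claim name
  };
  return payload.scope ? payload.scope.split(' ').filter(Boolean) : [];
}

// Required scopes that the token does not carry.
function missingScopes(jwt: string, required: string[]): string[] {
  const granted = new Set(grantedScopes(jwt));
  return required.filter((s) => !granted.has(s));
}
```

Logging missingScopes(await auth.getToken(), [...]) with the three scopes from the Prerequisites section shows immediately which one the tenant withheld.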

Error: 429 Too Many Requests

  • What causes it: The translation API enforces rate limits per API key or IP address.
  • How to fix it: The TranslationRouter implements exponential backoff. If failures continue, increase MAX_RETRIES or implement request queuing to batch fragments before translation.
  • Code showing the fix: The retry loop in translate() calculates delay = Math.pow(2, attempt) * 1000 and pauses execution before retrying.
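Request queuing pairs well with batching. DeepL's /v2/translate endpoint accepts the text parameter multiple times in one request (up to 50 texts per request, according to DeepL's API documentation) and returns the translations in the same order. The following is a minimal sketch of splitting queued fragments into batch request bodies. `buildBatchBodies` is a hypothetical helper, and the queue that feeds it is left out.

```typescript
import { URLSearchParams } from 'url';

// Split queued fragments into batches and build one form body per batch,
// repeating the `text` field once per fragment. translations[k] in each
// response corresponds to the k-th `text` field of that body.
function buildBatchBodies(
  fragments: string[],
  targetLang: string,
  maxPerRequest = 50,
): URLSearchParams[] {
  const bodies: URLSearchParams[] = [];
  for (let i = 0; i < fragments.length; i += maxPerRequest) {
    const body = new URLSearchParams({ target_lang: targetLang });
    for (const text of fragments.slice(i, i + maxPerRequest)) {
      body.append('text', text);
    }
    bodies.push(body);
  }
  return bodies;
}
```

Each body is posted to the same endpoint, with the same authentication, as translate(). Waiting to fill a batch adds latency, so a real queue would flush on a short timer as well as on size.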

Error: WebSocket Connection Refused or Immediate Close

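  • What causes it: A malformed Bearer token, or a TLS certificate that fails validation (for example, behind an intercepting corporate proxy).
  • How to fix it: Ensure the Authorization: Bearer <token> header is attached to the WebSocket upgrade request. For certificate errors, trust the proxy's CA certificate by pointing NODE_EXTRA_CA_CERTS at it. NODE_TLS_REJECT_UNAUTHORIZED=0 disables certificate verification for the whole process and belongs only in local development.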
  • Code showing the fix: The TranscriptInterceptor.connect() method attaches the token via the headers option in the WebSocket constructor.

Error: Trie Memory Growth

  • What causes it: Unbounded insertion of unique customer phrases without eviction.
  • How to fix it: MAX_DEPTH limits the length of each cached phrase (150 characters in the provided Trie) but not how many phrases are cached, so long-running processes also need eviction. Options include an LRU policy, or tracking node visit counts and periodically pruning branches that have not been visited.
  • Code showing the fix: Add a prune() method that traverses the Trie and removes nodes with zero access counters, or switch to a fixed-size cache wrapper around the Trie.
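The fixed-size cache wrapper can be sketched by tracking cached phrases in a Map, whose iteration order is insertion order, and deleting the least recently used phrase from the Trie once a capacity is exceeded. This requires a Trie that supports deletion, which the TranslationTrie above does not. The `LruTrieCache` class, its Map-based node layout, and its exact-match `get` are illustrative assumptions.

```typescript
interface LruNode {
  children: Map<string, LruNode>;
  translation?: string;
}

class LruTrieCache {
  private root: LruNode = { children: new Map() };
  // Phrases ordered least- to most-recently used (Map keeps insertion order)
  private recency = new Map<string, true>();

  constructor(private readonly capacity: number) {}

  get size(): number {
    return this.recency.size;
  }

  set(phrase: string, translation: string): void {
    const key = phrase.toLowerCase();
    let node = this.root;
    for (const ch of key) {
      let next = node.children.get(ch);
      if (!next) {
        next = { children: new Map() };
        node.children.set(ch, next);
      }
      node = next;
    }
    node.translation = translation;
    this.touch(key);
    // Evict least recently used phrases until back within capacity
    while (this.recency.size > this.capacity) {
      const oldest = this.recency.keys().next().value as string;
      this.recency.delete(oldest);
      this.remove(oldest);
    }
  }

  /** Exact-match lookup; a hit marks the phrase as recently used. */
  get(phrase: string): string | undefined {
    const key = phrase.toLowerCase();
    let node: LruNode | undefined = this.root;
    for (const ch of key) {
      node = node.children.get(ch);
      if (!node) return undefined;
    }
    if (node.translation === undefined) return undefined;
    this.touch(key);
    return node.translation;
  }

  private touch(key: string): void {
    this.recency.delete(key);
    this.recency.set(key, true);
  }

  /** Drop the phrase's translation and prune branches that no longer lead anywhere. */
  private remove(key: string): void {
    const path: Array<[LruNode, string]> = [];
    let node = this.root;
    for (const ch of key) {
      const next = node.children.get(ch);
      if (!next) return;
      path.push([node, ch]);
      node = next;
    }
    delete node.translation;
    // Walk back up, deleting nodes with no translation and no children
    for (let i = path.length - 1; i >= 0; i--) {
      const [parent, ch] = path[i];
      const child = parent.children.get(ch)!;
      if (child.translation !== undefined || child.children.size > 0) break;
      parent.children.delete(ch);
    }
  }
}
```

Pruning stops at the first node that still carries a translation or other children, so evicting "hello" leaves a cached "he" intact. A longest-prefix search like TranslationTrie.search() could be added over the same nodes.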

Official References