Subscribing to NICE CXone Analytics Events with TypeScript

Subscribing to NICE CXone Analytics Events with TypeScript

What You Will Build

  • Build a TypeScript service that configures NICE CXone analytics event subscriptions, connects to the real-time WebSocket stream, parses and filters payloads, aggregates metrics in memory, persists snapshots to SQLite, triggers threshold alerts, and includes a local event simulator for UI testing.
  • Uses the NICE CXone Analytics REST API and Real-Time WebSocket API.
  • Covers TypeScript with Node.js, axios, ws, and better-sqlite3.

Prerequisites

  • OAuth 2.0 Client Credentials grant type registered in the NICE CXone Admin Portal
  • Required OAuth scopes: analytics:read, realtime:read, events:subscribe
  • NICE CXone API v2
  • Node.js 18+ and TypeScript 5+
  • External dependencies: npm install axios ws better-sqlite3 typescript @types/node @types/ws @types/better-sqlite3

Authentication Setup

NICE CXone uses standard OAuth 2.0 client credentials flow. The access token is required for both the REST configuration endpoint and the WebSocket handshake. The token expires after thirty minutes, so the service must cache it and request a new one before expiration.

import axios, { AxiosError } from 'axios';

interface CxoneCredentials {
  clientId: string;
  clientSecret: string;
  region: string;
}

interface CxoneTokenResponse {
  access_token: string;
  expires_in: number;
  token_type: string;
  scope: string;
}

class CxoneAuth {
  private token: string | null = null;
  private expiresAt: number | null = null;
  private credentials: CxoneCredentials;

  constructor(credentials: CxoneCredentials) {
    this.credentials = credentials;
  }

  async getAccessToken(): Promise<string> {
    if (this.token && this.expiresAt && Date.now() < this.expiresAt) {
      return this.token;
    }

    const baseUrl = `https://api-${this.credentials.region}.cxone.com`;
    try {
      const response = await axios.post<CxoneTokenResponse>(
        `${baseUrl}/oauth/token`,
        new URLSearchParams({
          grant_type: 'client_credentials',
          client_id: this.credentials.clientId,
          client_secret: this.credentials.clientSecret,
          scope: 'analytics:read realtime:read events:subscribe',
        }),
        {
          headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
          timeout: 10000,
        }
      );

      this.token = response.data.access_token;
      this.expiresAt = Date.now() + (response.data.expires_in * 1000) - 60000; // Refresh 1 minute early
      return this.token;
    } catch (error) {
      if (error instanceof AxiosError) {
        if (error.response?.status === 401) {
          throw new Error('OAuth 401: Invalid client credentials or missing scope.');
        }
        if (error.response?.status === 403) {
          throw new Error('OAuth 403: Client lacks required analytics or realtime scopes.');
        }
        if (error.response?.status === 429) {
          throw new Error('OAuth 429: Rate limit exceeded. Back off and retry.');
        }
      }
      throw error;
    }
  }
}

Implementation

Step 1: Configure Event Subscription via Analytics API

Before establishing the WebSocket stream, you must register the subscription configuration through the Analytics API. This step defines the event types, filters, and delivery preferences. The endpoint returns a subscription identifier that you attach to the WebSocket connection.

Required Scope: analytics:read, events:subscribe

import axios, { AxiosError } from 'axios';

interface SubscriptionConfig {
  name: string;
  eventType: string;
  filters: Record<string, any>;
  fields: string[];
}

interface SubscriptionResponse {
  id: string;
  status: string;
  createdAt: string;
}

async function configureSubscription(
  baseUrl: string,
  token: string,
  config: SubscriptionConfig
): Promise<string> {
  try {
    const response = await axios.post<SubscriptionResponse>(
      `${baseUrl}/api/v2/analytics/eventsub`,
      {
        subscriptionName: config.name,
        eventType: config.eventType,
        filters: config.filters,
        requestedFields: config.fields,
      },
      {
        headers: {
          'Authorization': `Bearer ${token}`,
          'Content-Type': 'application/json',
        },
        timeout: 8000,
      }
    );

    console.log('Subscription configured:', response.data.id);
    return response.data.id;
  } catch (error) {
    if (error instanceof AxiosError) {
      if (error.response?.status === 401) {
        throw new Error('Analytics API 401: Token expired or invalid.');
      }
      if (error.response?.status === 403) {
        throw new Error('Analytics API 403: Insufficient permissions for event subscription.');
      }
      if (error.response?.status === 422) {
        throw new Error(`Analytics API 422: Invalid payload structure. ${error.response.data.message}`);
      }
    }
    throw error;
  }
}

Step 2: Establish WebSocket Connection with Automatic Reconnection

NICE CXone delivers analytics events over a persistent WebSocket connection. The handshake requires the Bearer token in the Authorization header. You must implement exponential backoff to handle network partitions, server-side resets, and 429 rate-limit cascades.

import WebSocket from 'ws';

interface CxoneEvent {
  eventType: string;
  timestamp: string;
  interactionId: string;
  agentId: string;
  metrics: {
    handleTime: number;
    waitTime: number;
    state: string;
    queueTime: number;
  };
  metadata: Record<string, any>;
}

class CxoneEventStream {
  private ws: WebSocket | null = null;
  private subscriptionId: string;
  private reconnectAttempts: number = 0;
  private maxReconnectAttempts: number = 10;
  private onEvent: (event: CxoneEvent) => void;

  constructor(subscriptionId: string, onEvent: (event: CxoneEvent) => void) {
    this.subscriptionId = subscriptionId;
    this.onEvent = onEvent;
  }

  connect(token: string, region: string): void {
    const url = `wss://api-${region}.cxone.com/api/v2/events`;
    
    this.ws = new WebSocket(url, {
      headers: { Authorization: `Bearer ${token}` },
      handshakeTimeout: 5000,
    });

    this.ws.on('open', () => {
      console.log('WebSocket connected. Sending subscription message.');
      this.reconnectAttempts = 0;
      const subscribeMessage = JSON.stringify({
        type: 'subscribe',
        subscriptionId: this.subscriptionId,
        events: ['interaction', 'agent', 'queue'],
        format: 'json',
      });
      this.ws?.send(subscribeMessage);
    });

    this.ws.on('message', (data: WebSocket.Data) => {
      try {
        const parsed = JSON.parse(data.toString()) as CxoneEvent;
        this.onEvent(parsed);
      } catch (err) {
        console.error('Failed to parse WebSocket message:', err);
      }
    });

    this.ws.on('error', (error) => {
      console.error('WebSocket error:', error.message);
      this.handleReconnect(token, region);
    });

    this.ws.on('close', (code, reason) => {
      console.log(`WebSocket closed with code ${code}: ${reason.toString()}`);
      this.handleReconnect(token, region);
    });
  }

  private handleReconnect(token: string, region: string): void {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max reconnection attempts reached. Giving up.');
      return;
    }

    this.reconnectAttempts++;
    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts - 1), 30000);
    console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);

    setTimeout(() => this.connect(token, region), delay);
  }

  close(): void {
    this.ws?.close(1000, 'Client shutdown');
  }
}

Step 3: Parse, Filter, and Aggregate Streaming Payloads

The WebSocket stream delivers every event matching the subscription. You must apply client-side filtering to isolate relevant interaction types and aggregate metrics in memory for dashboard rendering. The aggregation window resets every sixty seconds.

interface AggregatedMetrics {
  totalInteractions: number;
  avgHandleTime: number;
  avgWaitTime: number;
  activeAgents: Set<string>;
  lastUpdated: number;
}

class MetricAggregator {
  private metrics: AggregatedMetrics = {
    totalInteractions: 0,
    avgHandleTime: 0,
    avgWaitTime: 0,
    activeAgents: new Set(),
    lastUpdated: Date.now(),
  };
  private onAggregate: (m: AggregatedMetrics) => void;
  private eventBuffer: CxoneEvent[] = [];
  private bufferInterval: NodeJS.Timeout | null = null;

  constructor(onAggregate: (m: AggregatedMetrics) => void) {
    this.onAggregate = onAggregate;
    this.startBufferFlush();
  }

  ingest(event: CxoneEvent): void {
    // Client-side filtering: only process active interaction events
    if (event.eventType !== 'interaction' || event.metrics.state === 'idle') {
      return;
    }

    this.eventBuffer.push(event);
    this.metrics.activeAgents.add(event.agentId);
  }

  private startBufferFlush(): void {
    this.bufferInterval = setInterval(() => this.flushBuffer(), 60000);
  }

  private flushBuffer(): void {
    if (this.eventBuffer.length === 0) return;

    const handleTimes = this.eventBuffer.map(e => e.metrics.handleTime);
    const waitTimes = this.eventBuffer.map(e => e.metrics.waitTime);

    this.metrics.totalInteractions += this.eventBuffer.length;
    this.metrics.avgHandleTime = handleTimes.length > 0 
      ? handleTimes.reduce((a, b) => a + b, 0) / handleTimes.length 
      : 0;
    this.metrics.avgWaitTime = waitTimes.length > 0 
      ? waitTimes.reduce((a, b) => a + b, 0) / waitTimes.length 
      : 0;
    this.metrics.lastUpdated = Date.now();

    this.onAggregate({ ...this.metrics, activeAgents: new Set(this.metrics.activeAgents) });
    this.eventBuffer = [];
  }

  stop(): void {
    if (this.bufferInterval) clearInterval(this.bufferInterval);
  }
}

Step 4: Persist Snapshots and Generate Alerts

Real-time aggregation requires historical persistence for trend analysis. You will write snapshot records to a local SQLite database. The service also evaluates metric thresholds and emits alert notifications when values exceed operational limits.

import Database from 'better-sqlite3';

interface AlertPayload {
  timestamp: string;
  metric: string;
  value: number;
  threshold: number;
  severity: 'warning' | 'critical';
}

class DataPersistence {
  private db: Database.Database;

  constructor(dbPath: string) {
    this.db = new Database(dbPath);
    this.initializeSchema();
  }

  private initializeSchema(): void {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS metric_snapshots (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL,
        total_interactions INTEGER NOT NULL,
        avg_handle_time REAL NOT NULL,
        avg_wait_time REAL NOT NULL,
        active_agents INTEGER NOT NULL,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP
      );
    `);
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS alerts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL,
        metric TEXT NOT NULL,
        value REAL NOT NULL,
        threshold REAL NOT NULL,
        severity TEXT NOT NULL
      );
    `);
  }

  saveSnapshot(metrics: AggregatedMetrics): void {
    const stmt = this.db.prepare(`
      INSERT INTO metric_snapshots (timestamp, total_interactions, avg_handle_time, avg_wait_time, active_agents)
      VALUES (?, ?, ?, ?, ?)
    `);
    stmt.run(
      new Date(metrics.lastUpdated).toISOString(),
      metrics.totalInteractions,
      metrics.avgHandleTime,
      metrics.avgWaitTime,
      metrics.activeAgents.size
    );
  }

  checkThresholds(metrics: AggregatedMetrics, thresholds: { handleTime: number; waitTime: number }): AlertPayload[] {
    const alerts: AlertPayload[] = [];
    const now = new Date().toISOString();

    if (metrics.avgHandleTime > thresholds.handleTime) {
      const alert: AlertPayload = {
        timestamp: now,
        metric: 'avg_handle_time',
        value: metrics.avgHandleTime,
        threshold: thresholds.handleTime,
        severity: metrics.avgHandleTime > thresholds.handleTime * 1.5 ? 'critical' : 'warning',
      };
      this.persistAlert(alert);
      alerts.push(alert);
    }

    if (metrics.avgWaitTime > thresholds.waitTime) {
      const alert: AlertPayload = {
        timestamp: now,
        metric: 'avg_wait_time',
        value: metrics.avgWaitTime,
        threshold: thresholds.waitTime,
        severity: metrics.avgWaitTime > thresholds.waitTime * 1.5 ? 'critical' : 'warning',
      };
      this.persistAlert(alert);
      alerts.push(alert);
    }

    return alerts;
  }

  private persistAlert(alert: AlertPayload): void {
    const stmt = this.db.prepare(`
      INSERT INTO alerts (timestamp, metric, value, threshold, severity)
      VALUES (?, ?, ?, ?, ?)
    `);
    stmt.run(alert.timestamp, alert.metric, alert.value, alert.threshold, alert.severity);
  }

  close(): void {
    this.db.close();
  }
}

Complete Working Example

The following module combines authentication, subscription configuration, WebSocket streaming, filtering, aggregation, persistence, alerting, and a local event simulator into a single runnable service.

import CxoneAuth from './auth';
import { configureSubscription } from './subscription';
import { CxoneEventStream, CxoneEvent } from './stream';
import { MetricAggregator, AggregatedMetrics } from './aggregator';
import { DataPersistence, AlertPayload } from './persistence';

interface ServiceConfig {
  credentials: { clientId: string; clientSecret: string; region: string };
  subscriptionName: string;
  thresholds: { handleTime: number; waitTime: number };
  dbPath: string;
}

class CxoneAnalyticsService {
  private auth: CxoneAuth;
  private stream: CxoneEventStream | null = null;
  private aggregator: MetricAggregator;
  private persistence: DataPersistence;
  private config: ServiceConfig;

  constructor(config: ServiceConfig) {
    this.config = config;
    this.auth = new CxoneAuth(config.credentials);
    this.persistence = new DataPersistence(config.dbPath);
    this.aggregator = new MetricAggregator(async (metrics: AggregatedMetrics) => {
      await this.persistence.saveSnapshot(metrics);
      const alerts = this.persistence.checkThresholds(metrics, config.thresholds);
      if (alerts.length > 0) {
        console.log('ALERTS TRIGGERED:', JSON.stringify(alerts, null, 2));
      }
    });
  }

  async start(): Promise<void> {
    console.log('Initializing NICE CXone Analytics Service...');
    const token = await this.auth.getAccessToken();
    const baseUrl = `https://api-${this.config.credentials.region}.cxone.com`;

    const subscriptionId = await configureSubscription(baseUrl, token, {
      name: this.config.subscriptionName,
      eventType: 'interaction',
      filters: { status: 'active' },
      fields: ['interactionId', 'agentId', 'metrics.handleTime', 'metrics.waitTime', 'metrics.state'],
    });

    this.stream = new CxoneEventStream(subscriptionId, (event: CxoneEvent) => {
      this.aggregator.ingest(event);
    });

    this.stream.connect(token, this.config.credentials.region);
    console.log('Service running. Press Ctrl+C to stop.');
  }

  stop(): void {
    this.stream?.close();
    this.aggregator.stop();
    this.persistence.close();
    process.exit(0);
  }
}

// Event Simulator for UI Testing
function startSimulator(aggregator: MetricAggregator): NodeJS.Timeout {
  let counter = 0;
  return setInterval(() => {
    const mockEvent: CxoneEvent = {
      eventType: 'interaction',
      timestamp: new Date().toISOString(),
      interactionId: `sim-${counter++}`,
      agentId: `agent-${Math.floor(Math.random() * 5) + 1}`,
      metrics: {
        handleTime: Math.floor(Math.random() * 300) + 60,
        waitTime: Math.floor(Math.random() * 120),
        state: Math.random() > 0.2 ? 'active' : 'idle',
        queueTime: Math.floor(Math.random() * 30),
      },
      metadata: { simulator: true },
    };
    aggregator.ingest(mockEvent);
    console.log('Simulator emitted:', mockEvent.interactionId);
  }, 2000);
}

// Entry Point
(async () => {
  const config: ServiceConfig = {
    credentials: {
      clientId: process.env.CXONE_CLIENT_ID || '',
      clientSecret: process.env.CXONE_CLIENT_SECRET || '',
      region: process.env.CXONE_REGION || 'us-1',
    },
    subscriptionName: 'realtime-dashboard-sub',
    thresholds: { handleTime: 240, waitTime: 90 },
    dbPath: './cxone_analytics.db',
  };

  const service = new CxoneAnalyticsService(config);
  await service.start();

  // Uncomment to run simulator instead of live stream
  // const simInterval = startSimulator(service.aggregator);
  // setTimeout(() => { clearInterval(simInterval); service.stop(); }, 60000);

  process.on('SIGINT', () => service.stop());
})();

Common Errors & Debugging

Error: 401 Unauthorized on WebSocket Handshake

  • Cause: The OAuth token expired, was malformed, or the region endpoint does not match the token issuer.
  • Fix: Verify the token expiration timestamp. Ensure the Authorization: Bearer <token> header is passed exactly during the WebSocket upgrade. Implement token refresh logic before the thirty-minute expiry.
  • Code Fix: The CxoneAuth class caches the token and subtracts sixty seconds from the expiry window to force a proactive refresh.

Error: 403 Forbidden on Analytics Eventsub

  • Cause: The OAuth client lacks the events:subscribe or analytics:read scope, or the tenant restricts programmatic subscription creation.
  • Fix: Navigate to the CXone Admin Portal, locate the OAuth Client, and append the missing scopes to the allowed list. Reauthorize the client credentials.
  • Code Fix: The configureSubscription function explicitly catches 403 and throws a descriptive message. Validate scope configuration before deployment.

Error: WebSocket Close Code 1006 or 1011

  • Cause: Network partition, server-side load balancing reset, or malformed subscription payload causing an abnormal closure.
  • Fix: Implement exponential backoff reconnection. Validate the JSON subscription message structure matches the CXone Real-Time API schema.
  • Code Fix: The handleReconnect method applies Math.min(1000 * Math.pow(2, attempt - 1), 30000) backoff and resets the attempt counter on successful reconnection.

Error: SQLite Database is Locked

  • Cause: Concurrent write operations without proper transaction handling or an open connection in another process.
  • Fix: Use better-sqlite3 synchronous mode for single-threaded Node.js execution. Ensure close() is called exactly once during shutdown.
  • Code Fix: The DataPersistence class uses prepared statements and synchronous writes, which eliminates locking in single-process environments.

Official References