Subscribing to Genesys Cloud Real-Time Events via WebSocket API with TypeScript

What You Will Build

  • A production-grade TypeScript WebSocket client that subscribes to Genesys Cloud real-time streaming events, validates payloads, handles connection resilience, processes high-throughput streams, and exposes a monitoring interface.
  • This tutorial uses the Genesys Cloud Analytics Events WebSocket API at wss://api.{region}.mypurecloud.com/api/v2/analytics/events.
  • The implementation is written in modern TypeScript using fetch for OAuth, ws for WebSocket communication, and zod for schema validation.

Prerequisites

  • OAuth client credentials for the Client Credentials grant (a confidential client with a client secret) and the scope analytics:events:read
  • Genesys Cloud API v2
  • Node.js 18+ runtime
  • External dependencies: npm install ws zod uuid @types/ws

Authentication Setup

The WebSocket streaming API requires a valid OAuth 2.0 Bearer token. You must obtain the token via the Client Credentials flow before initiating the WebSocket connection. The token is passed as a JSON authentication message immediately after the WebSocket handshake completes.

// Node.js 18+ ships a global fetch, so no HTTP client import is required.

interface OAuthConfig {
  baseUrl: string;
  clientId: string;
  clientSecret: string;
  scope: string;
}

interface OAuthTokenResponse {
  access_token: string;
  expires_in: number;
  token_type: string;
}

async function acquireOAuthToken(config: OAuthConfig): Promise<string> {
  const response = await fetch(`${config.baseUrl}/oauth/token`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Authorization': `Basic ${Buffer.from(`${config.clientId}:${config.clientSecret}`).toString('base64')}`
    },
    body: new URLSearchParams({
      grant_type: 'client_credentials',
      scope: config.scope
    })
  });

  if (!response.ok) {
    const errorBody = await response.text();
    throw new Error(`OAuth token acquisition failed with status ${response.status}: ${errorBody}`);
  }

  const data = await response.json() as OAuthTokenResponse;
  return data.access_token;
}

Expected response body:

{
  "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "Bearer",
  "expires_in": 7200
}

Required OAuth scope: analytics:events:read

Error handling for authentication:

  • HTTP 401 indicates invalid credentials or malformed basic auth header.
  • HTTP 403 indicates the client lacks the required scope.
  • HTTP 429 indicates rate limiting on the OAuth endpoint. Implement exponential backoff before retrying.
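
The retry guidance above can be sketched as a small helper. This is a minimal sketch, not part of any Genesys SDK: classifyTokenError, backoffDelayMs, and retryOn429 are hypothetical names, and the 1-second base delay, 30-second cap, and five-attempt limit are assumed defaults that you should tune for your environment.

```typescript
// Hypothetical helpers; names and default values are illustrative assumptions.
type TokenErrorAction = 'check_credentials' | 'check_scope' | 'retry_with_backoff' | 'fail';

// Map the HTTP statuses listed above to the action each one calls for.
function classifyTokenError(status: number): TokenErrorAction {
  switch (status) {
    case 401: return 'check_credentials';
    case 403: return 'check_scope';
    case 429: return 'retry_with_backoff';
    default: return 'fail';
  }
}

// Zero-based attempt: 0 -> base, 1 -> 2x base, 2 -> 4x base, capped at capMs.
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 30000): number {
  return Math.min(capMs, baseMs * Math.pow(2, attempt));
}

type AttemptResult<T> =
  | { kind: 'ok'; value: T }
  | { kind: 'http_error'; status: number };

// Runs attemptFn until it succeeds, fails with a non-retryable status,
// or has been rate limited maxAttempts times in a row.
async function retryOn429<T>(
  attemptFn: () => Promise<AttemptResult<T>>,
  maxAttempts = 5,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const result = await attemptFn();
    if (result.kind === 'ok') return result.value;
    if (classifyTokenError(result.status) !== 'retry_with_backoff') {
      throw new Error(`Token request failed with status ${result.status}`);
    }
    // No point sleeping after the final attempt.
    if (attempt < maxAttempts - 1) await sleep(backoffDelayMs(attempt));
  }
  throw new Error(`Token request still rate limited after ${maxAttempts} attempts`);
}
```

With these defaults the waits between attempts are 1 s, 2 s, 4 s, and 8 s. A 401 or 403 is surfaced immediately because retrying cannot fix bad credentials or a missing scope.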

Implementation

Step 1: WebSocket Connection & Authentication Handshake

Genesys Cloud streaming uses standard WebSocket transport. You establish the connection to the region-specific endpoint, then send a JSON authentication message. The server validates the token and responds with an auth_ack or an error message.

import WebSocket from 'ws';
import { v4 as uuidv4 } from 'uuid';

interface StreamConfig {
  region: string;
  accessToken: string;
  consumerGroup: string;
  maxSubscriptionsPerOrg: number;
  activeSubscriptionCount: number;
}

class GenesysEventSubscriber {
  private ws: WebSocket | null = null;
  private config: StreamConfig;
  private sessionId: string;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 10;
  private reconnectDelayMs = 1000;
  private isConnecting = false;
  private heartbeatInterval: NodeJS.Timeout | null = null;
  private lastHeartbeatTime = Date.now();

  constructor(config: StreamConfig) {
    this.config = config;
    this.sessionId = uuidv4();
    this.validateSubscriptionLimits();
  }

  private validateSubscriptionLimits(): void {
    if (this.config.activeSubscriptionCount >= this.config.maxSubscriptionsPerOrg) {
      throw new Error(
        `Subscription limit reached. Current: ${this.config.activeSubscriptionCount}, Max: ${this.config.maxSubscriptionsPerOrg}`
      );
    }
  }

  public async connect(): Promise<void> {
    if (this.isConnecting || this.ws?.readyState === WebSocket.OPEN) return;
    this.isConnecting = true;

    const wsUrl = `wss://api.${this.config.region}.mypurecloud.com/api/v2/analytics/events`;
    this.ws = new WebSocket(wsUrl);

    return new Promise((resolve, reject) => {
      this.ws!.on('open', () => {
        this.isConnecting = false;
        this.writeAuditLog('connection_opened', { sessionId: this.sessionId, url: wsUrl });
        this.authenticate();
        resolve();
      });

      this.ws!.on('error', (err) => {
        this.writeAuditLog('connection_error', { sessionId: this.sessionId, error: err.message });
        this.isConnecting = false;
        reject(err);
      });

      // Route every inbound frame to handleMessage (defined in Step 4).
      this.ws!.on('message', (data) => this.handleMessage(data));

      this.ws!.on('close', (code, reason) => {
        this.handleDisconnection(code, reason.toString());
      });
    });
  }

  private authenticate(): void {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;

    const authPayload = {
      type: 'auth',
      access_token: this.config.accessToken,
      session_id: this.sessionId
    };

    this.ws.send(JSON.stringify(authPayload));
  }

Expected server response after authentication:

{
  "type": "auth_ack",
  "session_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
}

Error handling for handshake:

  • The server sends {"type":"error","error":{"code":"401","message":"Invalid token"}} if authentication fails.
  • The server closes the connection with code 1008 if the token lacks required scopes.
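
Because the server answers the auth message asynchronously, frames sent before auth_ack arrives may be rejected. The sketch below shows one way to hold outbound frames until the handshake settles. HandshakeGate is a hypothetical helper, not a Genesys or ws API, and it assumes the auth_ack and error message shapes described in this tutorial.

```typescript
// Sketch only: HandshakeGate is a hypothetical helper, and the message shapes
// ("auth_ack", "error") are the ones described in this tutorial.
type HandshakeState = 'awaiting_auth' | 'ready' | 'failed';

class HandshakeGate {
  private state: HandshakeState = 'awaiting_auth';
  private pending: string[] = [];
  private failure = '';
  private readonly send: (frame: string) => void;

  constructor(send: (frame: string) => void) {
    this.send = send;
  }

  // Send immediately once authenticated; otherwise hold the frame until auth_ack arrives.
  public sendWhenReady(frame: string): void {
    if (this.state === 'failed') throw new Error(`Handshake failed: ${this.failure}`);
    if (this.state === 'ready') {
      this.send(frame);
    } else {
      this.pending.push(frame);
    }
  }

  // Feed inbound frames here until the handshake settles; returns the resulting state.
  public onMessage(raw: string): HandshakeState {
    if (this.state !== 'awaiting_auth') return this.state;
    let msg: any;
    try {
      msg = JSON.parse(raw);
    } catch {
      return this.state; // Not JSON: ignore and keep waiting.
    }
    if (msg === null || typeof msg !== 'object') return this.state;

    if (msg.type === 'auth_ack') {
      this.state = 'ready';
      const queued = this.pending;
      this.pending = [];
      for (const frame of queued) this.send(frame); // Flush in the order queued.
    } else if (msg.type === 'error') {
      this.state = 'failed';
      this.failure = `${msg.error?.code ?? 'unknown'}: ${msg.error?.message ?? 'no message'}`;
      this.pending = [];
    }
    return this.state;
  }
}
```

In the subscriber class, the gate would be constructed with (frame) => this.ws.send(frame), fed from the message handler, and used by subscribe() in place of a direct this.ws.send call.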

Step 2: Subscription Payload Construction & Validation

After authentication, you send a subscription message containing event types, optional filters, and a consumer group identifier. Genesys validates the payload against available event types and org limits.

  interface SubscriptionPayload {
    type: 'subscribe';
    subscription: {
      eventTypes: string[];
      filters?: Record<string, unknown>;
      consumerGroup: string;
    };
  }

  public subscribe(eventTypes: string[], filters?: Record<string, unknown>): void {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      throw new Error('WebSocket is not connected. Call connect() first.');
    }

    const availableEvents = ['conversation', 'interaction', 'routing', 'analytics', 'user'];
    const invalidTypes = eventTypes.filter(et => !availableEvents.includes(et));
    if (invalidTypes.length > 0) {
      throw new Error(`Invalid event types requested: ${invalidTypes.join(', ')}`);
    }

    const payload: SubscriptionPayload = {
      type: 'subscribe',
      subscription: {
        eventTypes,
        filters,
        consumerGroup: this.config.consumerGroup
      }
    };

    this.ws.send(JSON.stringify(payload));
    this.writeAuditLog('subscription_requested', { 
      sessionId: this.sessionId, 
      eventTypes, 
      consumerGroup: this.config.consumerGroup 
    });
  }

Expected server response:

{
  "type": "subscribe_ack",
  "consumer_group": "my-analytics-group",
  "event_types": ["conversation"]
}

Error handling for subscription:

  • {"type":"error","error":{"code":"403","message":"Consumer group limit reached"}} indicates quota exhaustion.
  • {"type":"error","error":{"code":"400","message":"Invalid filter syntax"}} indicates malformed filter expressions.
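
The replies listed above can be turned into a typed result so that calling code can branch on the outcome. This is a minimal sketch that assumes exactly the subscribe_ack and error shapes shown in this step; interpretSubscriptionReply and the SubscriptionReply type are hypothetical names introduced for illustration.

```typescript
// Sketch only: the reply shapes are the ones shown in this tutorial;
// the function and type names are hypothetical.
type SubscriptionReply =
  | { kind: 'confirmed'; missingEventTypes: string[] }
  | { kind: 'quota_exhausted'; message: string }
  | { kind: 'invalid_filter'; message: string }
  | { kind: 'unrecognized' };

function interpretSubscriptionReply(raw: string, requested: string[]): SubscriptionReply {
  let msg: any;
  try {
    msg = JSON.parse(raw);
  } catch {
    return { kind: 'unrecognized' };
  }
  if (msg === null || typeof msg !== 'object') return { kind: 'unrecognized' };

  if (msg.type === 'subscribe_ack' && Array.isArray(msg.event_types)) {
    // Report any requested event type that the server did not acknowledge.
    const confirmed = new Set<unknown>(msg.event_types);
    const missingEventTypes = requested.filter((t) => !confirmed.has(t));
    return { kind: 'confirmed', missingEventTypes };
  }

  if (msg.type === 'error' && msg.error !== null && typeof msg.error === 'object') {
    const message = String(msg.error.message ?? '');
    if (msg.error.code === '403') return { kind: 'quota_exhausted', message };
    if (msg.error.code === '400') return { kind: 'invalid_filter', message };
  }
  return { kind: 'unrecognized' };
}
```

A confirmed reply with a non-empty missingEventTypes list is worth logging, since it means the stream will be quieter than the caller expects.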

Step 3: Event Processing, Schema Validation, & Deduplication

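High-throughput streams need schema validation so that malformed payloads never reach downstream consumers, and deduplication so that a redelivered event is not processed twice. You use zod to validate incoming events against the expected event schema and a size-bounded Set of recently seen event IDs to filter duplicates; the bound keeps memory use from growing without limit.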

import { z } from 'zod';

const GenesysEventSchema = z.object({
  type: z.string(),
  id: z.string().uuid(),
  timestamp: z.string().datetime(),
  data: z.record(z.string(), z.unknown()),
  metadata: z.object({
    consumer_group: z.string(),
    sequence_number: z.number().optional()
  })
});

type ValidatedEvent = z.infer<typeof GenesysEventSchema>;

class EventProcessor {
  private processedIds = new Set<string>();
  private readonly maxDedupWindow = 10000;

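  public process(eventJson: string): ValidatedEvent | null {
    let parsed: unknown;
    try {
      parsed = JSON.parse(eventJson);
    } catch {
      // A payload that is not valid JSON is dropped instead of crashing the stream.
      console.error('Discarding payload that is not valid JSON.');
      return null;
    }

    const result = GenesysEventSchema.safeParse(parsed);
    if (!result.success) {
      // `issues` is available on ZodError in both zod 3 and zod 4.
      console.error('Schema validation failed:', result.error.issues);
      return null;
    }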

    const event = result.data;
    if (this.processedIds.has(event.id)) {
      return null;
    }

    this.processedIds.add(event.id);
    if (this.processedIds.size > this.maxDedupWindow) {
      const idsArray = Array.from(this.processedIds);
      this.processedIds = new Set(idsArray.slice(idsArray.length - this.maxDedupWindow));
    }

    return event;
  }
}

This processor rejects malformed payloads, skips duplicate events, and maintains a bounded memory footprint. You integrate this processor into the WebSocket message handler.
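
The processor trims its Set by copying it to an array each time the window overflows, which costs time proportional to the window size on every event once the limit is reached. A lighter variant, sketched here under the hypothetical name BoundedIdWindow, relies on the fact that a JavaScript Set iterates in insertion order, so the first value is always the oldest ID and can be evicted directly.

```typescript
// Sketch only: BoundedIdWindow is a hypothetical name, not part of the tutorial's classes.
class BoundedIdWindow {
  private readonly ids = new Set<string>();
  private readonly capacity: number;

  constructor(capacity: number) {
    if (!Number.isInteger(capacity) || capacity < 1) {
      throw new RangeError('capacity must be a positive integer');
    }
    this.capacity = capacity;
  }

  // Returns true the first time an ID is seen, false while a duplicate is still in the window.
  public admit(id: string): boolean {
    if (this.ids.has(id)) return false;
    this.ids.add(id);
    if (this.ids.size > this.capacity) {
      // Sets iterate in insertion order, so the first value is the oldest ID.
      const oldest = this.ids.values().next().value;
      if (oldest !== undefined) this.ids.delete(oldest);
    }
    return true;
  }

  public get size(): number {
    return this.ids.size;
  }
}
```

An ID that has been evicted is admitted again if it reappears, so the capacity should comfortably exceed the number of events that can arrive between a delivery and its possible redelivery.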

Step 4: Connection Lifecycle, Heartbeat, & Reconnection

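Genesys Cloud streaming connections require heartbeat monitoring. The client records the time of the last received message, checks it every 10 seconds, and terminates the socket when nothing has arrived for 30 seconds. Any close other than code 1000 or 1001 triggers reconnection with exponential backoff (1 s, 2 s, 4 s, and so on, for up to 10 attempts) to respect server rate limits.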

  private handleDisconnection(code: number, reason: string): void {
    this.writeAuditLog('connection_closed', { 
      sessionId: this.sessionId, 
      code, 
      reason,
      reconnectAttempts: this.reconnectAttempts
    });

    this.clearHeartbeat();
    this.ws = null;
    this.isConnecting = false;

    if (code === 1000 || code === 1001) return;

    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      const delay = this.reconnectDelayMs * Math.pow(2, this.reconnectAttempts);
      console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts + 1})`);
      setTimeout(() => {
        this.reconnectAttempts++;
        this.connect().then(() => {
          this.reconnectAttempts = 0;
          this.subscribe(['conversation']);
        }).catch(err => console.error('Reconnection failed:', err));
      }, delay);
    } else {
      console.error('Max reconnection attempts reached. Session terminated.');
    }
  }

  private startHeartbeat(): void {
    this.clearHeartbeat();
    this.heartbeatInterval = setInterval(() => {
      const elapsed = Date.now() - this.lastHeartbeatTime;
      if (elapsed > 30000) {
        console.warn('Heartbeat timeout detected. Forcing reconnection.');
        this.ws?.terminate();
      }
    }, 10000);
  }

  private clearHeartbeat(): void {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
  }

  public handleMessage(data: WebSocket.Data): void {
    this.lastHeartbeatTime = Date.now();

    let parsed: any;
    try {
      parsed = JSON.parse(data.toString());
    } catch {
      // A frame that is not valid JSON must not take down the message loop.
      console.warn('Ignoring frame that is not valid JSON.');
      return;
    }

    switch (parsed?.type) {
      case 'auth_ack':
        this.startHeartbeat();
        console.log('Authentication successful.');
        break;
      case 'subscribe_ack':
        console.log('Subscription confirmed.');
        break;
      case 'event':
        this.processStreamEvent(parsed.data);
        break;
      case 'error':
        console.error('Server error:', parsed.error);
        break;
      default:
        console.warn('Unknown message type:', parsed?.type);
    }
  }

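Heartbeat monitoring prevents zombie connections. After a successful reconnect, this example resubscribes to the conversation event type with the same consumer group; record the event types and filters you originally requested if you need to restore the full subscription.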
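
One way to keep the reconnect path in step with the original subscriptions is to record each request and replay the list after the socket reopens. The sketch below assumes a subscribe function with the same signature as the subscribe method in Step 2; SubscriptionRegistry and SubscribeFn are hypothetical names.

```typescript
// Hypothetical helper: remembers what was requested so a reconnect can replay it.
interface SubscriptionRequest {
  eventTypes: string[];
  filters?: Record<string, unknown>;
}

type SubscribeFn = (eventTypes: string[], filters?: Record<string, unknown>) => void;

class SubscriptionRegistry {
  private readonly requests: SubscriptionRequest[] = [];

  // Call alongside every subscribe(); inputs are copied so later mutation cannot leak in.
  public remember(eventTypes: string[], filters?: Record<string, unknown>): void {
    const request: SubscriptionRequest = { eventTypes: [...eventTypes] };
    if (filters !== undefined) request.filters = { ...filters };
    this.requests.push(request);
  }

  // Call after a successful reconnect; returns how many subscriptions were replayed.
  public replay(subscribe: SubscribeFn): number {
    for (const request of this.requests) {
      subscribe([...request.eventTypes], request.filters);
    }
    return this.requests.length;
  }
}
```

In handleDisconnection, the hard-coded this.subscribe(['conversation']) call could then become registry.replay((types, filters) => this.subscribe(types, filters)).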

Step 5: Metrics, Audit Logging, & External Pipeline Sync

You track subscription latency, event drop rates, and write structured audit logs for security governance. You also expose a streaming pipeline interface to synchronize data with external analytics engines.

interface StreamMetrics {
  totalEvents: number;
  droppedEvents: number;
  latencyMs: number[];
  lastProcessedTimestamp: string;
}

class GenesysEventSubscriber {
  // ... previous members ...
  private metrics: StreamMetrics = {
    totalEvents: 0,
    droppedEvents: 0,
    latencyMs: [],
    lastProcessedTimestamp: ''
  };
  private eventProcessor = new EventProcessor();
  private onEventCallback?: (event: ValidatedEvent, metrics: StreamMetrics) => void;

  public setEventConsumer(callback: (event: ValidatedEvent, metrics: StreamMetrics) => void): void {
    this.onEventCallback = callback;
  }

  private processStreamEvent(eventPayload: Record<string, unknown>): void {
    const event = this.eventProcessor.process(JSON.stringify(eventPayload));
    if (!event) {
      this.metrics.droppedEvents++;
      return;
    }

    this.metrics.totalEvents++;
    this.metrics.lastProcessedTimestamp = event.timestamp;
    
    const eventTime = new Date(event.timestamp).getTime();
    const receiveTime = Date.now();
    const latency = receiveTime - eventTime;
    this.metrics.latencyMs.push(latency);
    if (this.metrics.latencyMs.length > 1000) {
      this.metrics.latencyMs.shift();
    }

    this.syncToExternalPipeline(event);
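    // Log counters only; the full latency window would add up to 1000 numbers to every entry.
    this.writeAuditLog('event_processed', { 
      eventId: event.id, 
      latencyMs: latency, 
      totalEvents: this.metrics.totalEvents,
      droppedEvents: this.metrics.droppedEvents
    });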

    if (this.onEventCallback) {
      this.onEventCallback(event, { ...this.metrics });
    }
  }

  private async syncToExternalPipeline(event: ValidatedEvent): Promise<void> {
    try {
      const pipelinePayload = {
        stream: 'genesys_realtime',
        timestamp: new Date().toISOString(),
        event: event
      };
      
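      const response = await fetch('https://analytics.internal/api/v1/ingest', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(pipelinePayload)
      });
      // fetch resolves on HTTP error statuses, so check the status explicitly.
      if (!response.ok) {
        console.error(`Pipeline sync rejected with status ${response.status}`);
      }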
    } catch (err) {
      console.error('Pipeline sync failed:', err);
    }
  }

  private writeAuditLog(action: string, details: Record<string, unknown>): void {
    const auditEntry = {
      timestamp: new Date().toISOString(),
      sessionId: this.sessionId,
      action,
      details
    };
    console.log(JSON.stringify(auditEntry));
  }

  public getMetrics(): StreamMetrics {
    return { ...this.metrics };
  }
}

The metrics object tracks throughput and latency. The audit log captures every connection state change and processing event. The pipeline sync function demonstrates how to push validated events to an external system.
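
The raw latencyMs window is easier to monitor once it is reduced to a few numbers. The following sketch summarizes a window of samples and computes a drop rate from the two counters; summarizeLatency, dropRate, and the LatencySummary shape are hypothetical names, and the 95th percentile uses the nearest-rank method as one reasonable choice among several.

```typescript
// Sketch only: helper names and the LatencySummary shape are illustrative assumptions.
interface LatencySummary {
  count: number;
  minMs: number;
  maxMs: number;
  meanMs: number;
  p95Ms: number;
}

// Summarizes a window of latency samples; returns null when the window is empty.
function summarizeLatency(samples: readonly number[]): LatencySummary | null {
  if (samples.length === 0) return null;
  const sorted = [...samples].sort((a, b) => a - b); // Copy so the caller's array is untouched.
  const sum = sorted.reduce((acc, value) => acc + value, 0);
  // Nearest-rank percentile: the smallest sample with at least 95% of samples at or below it.
  const rank = Math.ceil(0.95 * sorted.length);
  return {
    count: sorted.length,
    minMs: sorted[0],
    maxMs: sorted[sorted.length - 1],
    meanMs: sum / sorted.length,
    p95Ms: sorted[rank - 1],
  };
}

// totalEvents counts only accepted events in this tutorial, so received = accepted + dropped.
function dropRate(totalEvents: number, droppedEvents: number): number {
  const received = totalEvents + droppedEvents;
  return received === 0 ? 0 : droppedEvents / received;
}
```

A caller could pass getMetrics().latencyMs to summarizeLatency on a timer and alert when p95Ms or the drop rate crosses a threshold. Latency computed this way also includes any clock skew between the event source and the consumer.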

Step 6: Exposing the Subscriber Interface

You expose the subscriber as a reusable module that handles initialization, event consumption, and graceful shutdown.

export async function createSubscriber(config: {
  region: string;
  clientId: string;
  clientSecret: string;
  consumerGroup: string;
  maxSubscriptions: number;
  activeSubscriptions: number;
}): Promise<GenesysEventSubscriber> {
  const baseUrl = `https://api.${config.region}.mypurecloud.com`;
  const token = await acquireOAuthToken({
    baseUrl,
    clientId: config.clientId,
    clientSecret: config.clientSecret,
    scope: 'analytics:events:read'
  });

  const subscriber = new GenesysEventSubscriber({
    region: config.region,
    accessToken: token,
    consumerGroup: config.consumerGroup,
    maxSubscriptionsPerOrg: config.maxSubscriptions,
    activeSubscriptionCount: config.activeSubscriptions
  });

  await subscriber.connect();
  subscriber.subscribe(['conversation', 'interaction']);
  
  return subscriber;
}

This factory function encapsulates authentication and initial subscription. You call createSubscriber once per deployment instance.

Complete Working Example

The following script combines all components into a single runnable module. Replace the placeholder credentials with your OAuth client values.

import WebSocket from 'ws';
import { z } from 'zod';
import { v4 as uuidv4 } from 'uuid';
// Node.js 18+ ships a global fetch, so no HTTP client import is required.

// --- OAuth & Types ---
interface OAuthConfig { baseUrl: string; clientId: string; clientSecret: string; scope: string; }
interface OAuthTokenResponse { access_token: string; expires_in: number; token_type: string; }

async function acquireOAuthToken(config: OAuthConfig): Promise<string> {
  const response = await fetch(`${config.baseUrl}/oauth/token`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded',
      'Authorization': `Basic ${Buffer.from(`${config.clientId}:${config.clientSecret}`).toString('base64')}`
    },
    body: new URLSearchParams({ grant_type: 'client_credentials', scope: config.scope })
  });
  if (!response.ok) throw new Error(`OAuth failed: ${response.status}`);
  return (await response.json() as OAuthTokenResponse).access_token;
}

const GenesysEventSchema = z.object({
  type: z.string(),
  id: z.string().uuid(),
  timestamp: z.string().datetime(),
  data: z.record(z.string(), z.unknown()),
  metadata: z.object({ consumer_group: z.string(), sequence_number: z.number().optional() })
});
type ValidatedEvent = z.infer<typeof GenesysEventSchema>;

interface StreamConfig {
  region: string; accessToken: string; consumerGroup: string;
  maxSubscriptionsPerOrg: number; activeSubscriptionCount: number;
}

interface StreamMetrics {
  totalEvents: number; droppedEvents: number; latencyMs: number[]; lastProcessedTimestamp: string;
}

// --- Core Subscriber ---
class GenesysEventSubscriber {
  private ws: WebSocket | null = null;
  private config: StreamConfig;
  private sessionId: string;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 10;
  private reconnectDelayMs = 1000;
  private isConnecting = false;
  private heartbeatInterval: NodeJS.Timeout | null = null;
  private lastHeartbeatTime = Date.now();
  private metrics: StreamMetrics = { totalEvents: 0, droppedEvents: 0, latencyMs: [], lastProcessedTimestamp: '' };
  private processedIds = new Set<string>();
  private onEventCallback?: (event: ValidatedEvent, metrics: StreamMetrics) => void;

  constructor(config: StreamConfig) {
    this.config = config;
    this.sessionId = uuidv4();
    if (config.activeSubscriptionCount >= config.maxSubscriptionsPerOrg) {
      throw new Error('Subscription limit reached.');
    }
  }

  public async connect(): Promise<void> {
    if (this.isConnecting || this.ws?.readyState === WebSocket.OPEN) return;
    this.isConnecting = true;
    const wsUrl = `wss://api.${this.config.region}.mypurecloud.com/api/v2/analytics/events`;
    this.ws = new WebSocket(wsUrl);
    return new Promise((resolve, reject) => {
      this.ws!.on('open', () => {
        this.isConnecting = false;
        this.writeAuditLog('connection_opened', { sessionId: this.sessionId, url: wsUrl });
        this.authenticate();
        resolve();
      });
      this.ws!.on('error', (err) => {
        this.writeAuditLog('connection_error', { sessionId: this.sessionId, error: err.message });
        this.isConnecting = false;
        reject(err);
      });
      this.ws!.on('message', (data) => this.handleMessage(data));
      this.ws!.on('close', (code, reason) => this.handleDisconnection(code, reason.toString()));
    });
  }

  private authenticate(): void {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
    this.ws.send(JSON.stringify({ type: 'auth', access_token: this.config.accessToken, session_id: this.sessionId }));
  }

  public subscribe(eventTypes: string[]): void {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) throw new Error('Not connected.');
    const validTypes = ['conversation', 'interaction', 'routing', 'analytics', 'user'];
    const invalid = eventTypes.filter(et => !validTypes.includes(et));
    if (invalid.length > 0) throw new Error(`Invalid event types: ${invalid.join(', ')}`);
    this.ws.send(JSON.stringify({
      type: 'subscribe',
      subscription: { eventTypes, consumerGroup: this.config.consumerGroup }
    }));
    this.writeAuditLog('subscription_requested', { sessionId: this.sessionId, eventTypes });
  }

  private handleMessage(data: WebSocket.Data): void {
    this.lastHeartbeatTime = Date.now();
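    let parsed: any;
    // A frame that is not valid JSON must not take down the message loop.
    try { parsed = JSON.parse(data.toString()); } catch { console.warn('Ignoring frame that is not valid JSON.'); return; }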
    switch (parsed.type) {
      case 'auth_ack':
        this.startHeartbeat();
        break;
      case 'subscribe_ack':
        console.log('Subscription confirmed.');
        break;
      case 'event':
        this.processStreamEvent(parsed.data);
        break;
      case 'error':
        console.error('Server error:', parsed.error);
        break;
    }
  }

  private processStreamEvent(payload: Record<string, unknown>): void {
    const result = GenesysEventSchema.safeParse(payload);
    if (!result.success) { this.metrics.droppedEvents++; return; }
    const event = result.data;
    if (this.processedIds.has(event.id)) { this.metrics.droppedEvents++; return; }
    this.processedIds.add(event.id);
    if (this.processedIds.size > 10000) {
      const arr = Array.from(this.processedIds);
      this.processedIds = new Set(arr.slice(arr.length - 10000));
    }
    this.metrics.totalEvents++;
    this.metrics.lastProcessedTimestamp = event.timestamp;
    const latency = Date.now() - new Date(event.timestamp).getTime();
    this.metrics.latencyMs.push(latency);
    if (this.metrics.latencyMs.length > 1000) this.metrics.latencyMs.shift();
    this.writeAuditLog('event_processed', { eventId: event.id, latencyMs: latency });
    this.syncToPipeline(event);
    if (this.onEventCallback) this.onEventCallback(event, { ...this.metrics });
  }

  private async syncToPipeline(event: ValidatedEvent): Promise<void> {
    try {
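      const response = await fetch('https://analytics.internal/api/v1/ingest', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ stream: 'genesys_realtime', timestamp: new Date().toISOString(), event })
      });
      // fetch resolves on HTTP error statuses, so check the status explicitly.
      if (!response.ok) console.error(`Pipeline sync rejected with status ${response.status}`);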
    } catch (err) { console.error('Pipeline sync failed:', err); }
  }

  private handleDisconnection(code: number, reason: string): void {
    this.writeAuditLog('connection_closed', { sessionId: this.sessionId, code, reason });
    this.clearHeartbeat();
    this.ws = null;
    this.isConnecting = false;
    if (code === 1000 || code === 1001) return;
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      const delay = this.reconnectDelayMs * Math.pow(2, this.reconnectAttempts);
      setTimeout(() => {
        this.reconnectAttempts++;
        this.connect()
          .then(() => { this.reconnectAttempts = 0; this.subscribe(['conversation']); })
          .catch((err) => console.error('Reconnection failed:', err)); // Avoid an unhandled rejection.
      }, delay);
    }
  }

  private startHeartbeat(): void {
    this.clearHeartbeat();
    this.heartbeatInterval = setInterval(() => {
      if (Date.now() - this.lastHeartbeatTime > 30000) {
        console.warn('Heartbeat timeout. Forcing reconnect.');
        this.ws?.terminate();
      }
    }, 10000);
  }

  private clearHeartbeat(): void {
    if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); this.heartbeatInterval = null; }
  }

  private writeAuditLog(action: string, details: Record<string, unknown>): void {
    console.log(JSON.stringify({ timestamp: new Date().toISOString(), sessionId: this.sessionId, action, details }));
  }

  public setEventConsumer(cb: (event: ValidatedEvent, metrics: StreamMetrics) => void): void { this.onEventCallback = cb; }
  public getMetrics(): StreamMetrics { return { ...this.metrics }; }
  public close(): void { this.ws?.close(1000, 'Client shutdown'); this.clearHeartbeat(); }
}

// --- Factory & Execution ---
export async function createSubscriber(config: {
  region: string; clientId: string; clientSecret: string;
  consumerGroup: string; maxSubscriptions: number; activeSubscriptions: number;
}): Promise<GenesysEventSubscriber> {
  const baseUrl = `https://api.${config.region}.mypurecloud.com`;
  const token = await acquireOAuthToken({ baseUrl, clientId: config.clientId, clientSecret: config.clientSecret, scope: 'analytics:events:read' });
  const subscriber = new GenesysEventSubscriber({
    region: config.region, accessToken: token, consumerGroup: config.consumerGroup,
    maxSubscriptionsPerOrg: config.maxSubscriptions, activeSubscriptionCount: config.activeSubscriptions
  });
  await subscriber.connect();
  subscriber.subscribe(['conversation', 'interaction']);
  return subscriber;
}

// Run locally
(async () => {
  const sub = await createSubscriber({
    region: 'us-east-1',
    clientId: 'YOUR_CLIENT_ID',
    clientSecret: 'YOUR_CLIENT_SECRET',
    consumerGroup: 'prod-analytics-group',
    maxSubscriptions: 10,
    activeSubscriptions: 2
  });

  sub.setEventConsumer((event, metrics) => {
    console.log(`[REAL-TIME] Event: ${event.id} | Latency: ${metrics.latencyMs[metrics.latencyMs.length - 1]}ms | Total: ${metrics.totalEvents}`);
  });

  process.on('SIGINT', () => { sub.close(); process.exit(0); });
})();

Common Errors & Debugging

Error: WebSocket Close 1008 / Policy Violation

  • What causes it: The server rejects the connection due to invalid OAuth scopes, expired tokens, or IP restrictions.
  • How to fix it: Verify the client credentials possess the analytics:events:read scope. Check your org security settings for IP allowlists.
  • Code showing the fix: Implement token refresh logic before subscription. Cache tokens and request new ones when expires_in approaches zero.
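
The token caching described above might look like the following. This is a sketch under stated assumptions: TokenCache, toCachedToken, and needsRefresh are hypothetical names, the 60-second refresh margin is an assumed default, and fetchToken stands in for any function that returns the token response shown in the Authentication Setup section.

```typescript
// Sketch only: TokenCache and its helpers are hypothetical names, and the
// 60-second refresh margin is an assumed default.
interface TokenResponse { access_token: string; expires_in: number; }
interface CachedToken { accessToken: string; expiresAtMs: number; }

// expires_in is in seconds, as in the token response shown earlier.
function toCachedToken(response: TokenResponse, nowMs: number): CachedToken {
  return { accessToken: response.access_token, expiresAtMs: nowMs + response.expires_in * 1000 };
}

// True when there is no token or it expires within marginMs.
function needsRefresh(token: CachedToken | null, nowMs: number, marginMs = 60000): boolean {
  return token === null || nowMs >= token.expiresAtMs - marginMs;
}

class TokenCache {
  private current: CachedToken | null = null;
  private inFlight: Promise<CachedToken> | null = null;
  private readonly fetchToken: () => Promise<TokenResponse>;
  private readonly now: () => number;

  constructor(fetchToken: () => Promise<TokenResponse>, now: () => number = () => Date.now()) {
    this.fetchToken = fetchToken;
    this.now = now;
  }

  // Returns a cached token while it is fresh, otherwise requests a new one.
  public async get(): Promise<string> {
    const cached = this.current;
    if (cached !== null && !needsRefresh(cached, this.now())) return cached.accessToken;

    // Share one request between concurrent callers.
    if (this.inFlight === null) {
      this.inFlight = this.fetchToken()
        .then((response) => {
          const fresh = toCachedToken(response, this.now());
          this.current = fresh;
          return fresh;
        })
        .finally(() => {
          this.inFlight = null;
        });
    }
    return (await this.inFlight).accessToken;
  }
}
```

In this tutorial's flow the access token is sent once in the auth message, so a refreshed token only takes effect on the next connection. Calling get() immediately before each connect or reconnect keeps an expired token from being presented.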

Error: Subscription Limit Exceeded (403)

  • What causes it: Your org has reached the maximum number of active subscriptions per consumer group or per tenant.
  • How to fix it: Close idle subscriptions via {"type":"unsubscribe","consumer_group":"..."} or increase your org limit through the admin console.
  • Code showing the fix: Track active subscriptions in your deployment orchestrator and rotate consumer groups when limits approach.

Error: Schema Validation Failures

  • What causes it: Genesys Cloud updates an event schema, or a payload omits or mistypes a field that your schema requires.
  • How to fix it: Use zod safe parsing with fallback handlers. Log failed payloads for inspection instead of crashing the stream.
  • Code showing the fix: The processStreamEvent method already implements safeParse and increments droppedEvents on failure.

Error: High Latency or Event Drops

  • What causes it: Network congestion, slow external pipeline sync, or blocking synchronous operations in the event consumer.
  • How to fix it: Offload pipeline synchronization to a message queue. Monitor latencyMs and droppedEvents metrics. Implement backpressure by pausing subscription flow when external systems lag.
  • Code showing the fix: The syncToPipeline method runs asynchronously. You can add a queue with a max size and drop events when the queue fills.
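
The bounded queue mentioned above can be sketched as follows. BoundedQueue and drainOnce are hypothetical names; the queue rejects new items when it is full and counts each rejection, which is one simple drop policy among several (dropping the oldest item is another).

```typescript
// Sketch only: BoundedQueue and drainOnce are hypothetical helpers.
class BoundedQueue<T> {
  private readonly items: T[] = [];
  private readonly capacity: number;
  private dropped = 0;

  constructor(capacity: number) {
    if (!Number.isInteger(capacity) || capacity < 1) {
      throw new RangeError('capacity must be a positive integer');
    }
    this.capacity = capacity;
  }

  // Returns false and counts a drop when the queue is already full.
  public offer(item: T): boolean {
    if (this.items.length >= this.capacity) {
      this.dropped++;
      return false;
    }
    this.items.push(item);
    return true;
  }

  // Removes and returns the oldest item, or undefined when the queue is empty.
  public poll(): T | undefined {
    return this.items.shift();
  }

  public get size(): number {
    return this.items.length;
  }

  public get droppedCount(): number {
    return this.dropped;
  }
}

// Delivers queued items one at a time in arrival order; returns how many were delivered.
async function drainOnce<T>(
  queue: BoundedQueue<T>,
  deliver: (item: T) => Promise<void>,
): Promise<number> {
  let delivered = 0;
  for (let item = queue.poll(); item !== undefined; item = queue.poll()) {
    await deliver(item);
    delivered++;
  }
  return delivered;
}
```

The event handler would call offer(event) instead of syncing directly, and a separate loop would call drainOnce with the pipeline request as deliver. Array.prototype.shift is linear in the queue length, which is acceptable for small capacities; a ring buffer would suit larger ones.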

Official References