Implementing WebSocket Reconnection Logic for the Notification API in Node.js

What You Will Build

  • A robust Node.js service that maintains a persistent WebSocket connection to the Genesys Cloud Notification API.
  • Automatic reconnection logic with exponential backoff and jitter to handle network blips and server-side resets.
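  • State management (retry counter, timers, and an intentional-close flag) that keeps connection transitions predictable. Events published while the client is disconnected are not replayed, so downstream processing must tolerate gaps and duplicates.
  • TypeScript implementation using the ws WebSocket library and axios for authentication.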

Prerequisites

  • OAuth Client Type: Confidential Client (Client Credentials Grant).
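  • Permissions: Client Credentials clients receive their permissions from the roles assigned to the OAuth client rather than from OAuth scopes. Assign roles that cover the events you subscribe to (for example, conversation events for conversation:all).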
  • SDK/API Version: Genesys Cloud REST API v2 (WebSocket endpoint is distinct from REST but uses the same auth model).
  • Language/Runtime: Node.js 18+ (LTS).
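  • External Dependencies:
    • axios for handling the OAuth token exchange.
    • ws as the WebSocket client (Node 18 has no built-in WebSocket client), plus @types/ws for its type definitions.
    • dotenv for managing environment variables.
    • typescript and ts-node for development (optional but recommended for type safety).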

Install dependencies via npm:

npm install axios dotenv ws
npm install -D typescript @types/node @types/ws ts-node

Authentication Setup

The Genesys Cloud WebSocket API does not accept basic authentication or API keys directly in the WebSocket handshake. It requires a valid OAuth Bearer token passed as a query parameter in the WebSocket URL.

The standard flow is:

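  1. Exchange client_id and client_secret for an access token at the OAuth token endpoint on your region's login host (https://login.mypurecloud.com/oauth/token for the mypurecloud.com environment), sending the credentials in an HTTP Basic Authorization header.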
  2. Append the token to the WebSocket URL: wss://api.mypurecloud.com/api/v2/notifications?access_token=YOUR_TOKEN.

Below is the authentication helper. This function returns a promise that resolves to the access token. In production, cache the token and refresh it before it expires, using the expires_in value from the response rather than assuming a fixed lifetime.

import axios from 'axios';
import dotenv from 'dotenv';

dotenv.config();

const ENVIRONMENT = process.env.GENESYS_ENVIRONMENT || 'mypurecloud.com';
const CLIENT_ID = process.env.GENESYS_CLIENT_ID!;
const CLIENT_SECRET = process.env.GENESYS_CLIENT_SECRET!;

interface TokenResponse {
  access_token: string;
  token_type: string;
  expires_in: number;
}

/**
 * Retrieves an OAuth access token from Genesys Cloud.
 * In production, cache this token and refresh before expires_in.
 */
async function getAccessToken(): Promise<string> {
  // Genesys Cloud issues OAuth tokens from the login host of your region.
  const url = `https://login.${ENVIRONMENT}/oauth/token`;

  // Client Credentials Grant: the client authenticates with HTTP Basic auth.
  const credentials = Buffer.from(`${CLIENT_ID}:${CLIENT_SECRET}`).toString('base64');
  const params = new URLSearchParams({ grant_type: 'client_credentials' });

  try {
    const response = await axios.post<TokenResponse>(url, params.toString(), {
      headers: {
        'Authorization': `Basic ${credentials}`,
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'application/json'
      }
    });

    if (!response.data.access_token) {
      throw new Error('No access_token in response');
    }

    return response.data.access_token;
  } catch (error) {
    if (axios.isAxiosError(error)) {
      console.error('Auth Error:', error.response?.data || error.message);
    } else {
      console.error('Auth Error:', error);
    }
    throw error;
  }
}
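
getAccessToken() requests a new token on every call, and connect() calls it on every reconnect attempt. A minimal caching sketch follows, assuming a fetcher that returns the token together with the expires_in value from the token response. TokenCache, FetchedToken, refreshMarginMs, and the injected clock are hypothetical names chosen for illustration; they are not part of axios or any Genesys SDK.

```typescript
/** Hypothetical result shape: the token plus its lifetime (expires_in, in seconds). */
interface FetchedToken {
  accessToken: string;
  expiresInSec: number;
}

/**
 * Caches an OAuth token and fetches a new one shortly before it expires.
 * Concurrent callers share a single in-flight token request.
 */
class TokenCache {
  private token: string | null = null;
  private expiresAtMs = 0;
  private inFlight: Promise<string> | null = null;

  constructor(
    private readonly fetchToken: () => Promise<FetchedToken>,
    private readonly refreshMarginMs: number = 5 * 60 * 1000, // refresh 5 minutes early
    private readonly now: () => number = Date.now,
  ) {}

  /** Returns the cached token, fetching a new one if it is missing or inside the refresh margin. */
  async get(): Promise<string> {
    if (this.token !== null && this.now() < this.expiresAtMs - this.refreshMarginMs) {
      return this.token;
    }
    if (this.inFlight === null) {
      this.inFlight = this.fetchAndStore();
    }
    const pending = this.inFlight;
    try {
      return await pending;
    } finally {
      // Clear only if no newer request has replaced this one.
      if (this.inFlight === pending) this.inFlight = null;
    }
  }

  /** Drops the cached token, e.g. after the server rejects it with a 401. */
  invalidate(): void {
    this.token = null;
    this.expiresAtMs = 0;
  }

  private async fetchAndStore(): Promise<string> {
    const { accessToken, expiresInSec } = await this.fetchToken();
    this.token = accessToken;
    this.expiresAtMs = this.now() + expiresInSec * 1000;
    return accessToken;
  }
}
```

To use it, wrap a variant of getAccessToken() that also returns response.data.expires_in, create one TokenCache per process, and call tokenCache.get() wherever the client currently calls getAccessToken(). Calling invalidate() after a rejected token makes the next attempt fetch a fresh one.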

Implementation

Step 1: Define the Reconnection Strategy

Network instability is inevitable. A naive ws.on('close', () => connect()) loop reconnects immediately after every failure: it hammers the server, can trigger rate limits, and burns through attempts during transient outages such as DNS failures. Exponential backoff with jitter avoids this.

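The formula for delay is:
delay = min(maxDelay, baseDelay * 2^attempt) + jitter
where jitter is a random value between 0 and jitterFactor * min(maxDelay, baseDelay * 2^attempt). Because jitter is added after the cap, a delay can exceed maxDelay by up to jitterFactor (10% with the settings below).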

This ensures that:

  1. Initial retries are fast (user experience).
  2. Repeated failures slow down (server protection).
  3. Random jitter prevents “thundering herd” scenarios if multiple clients reconnect simultaneously.

const CONFIG = {
  BASE_DELAY: 1000,       // 1 second
  MAX_DELAY: 30000,       // 30 seconds
  JITTER_FACTOR: 0.1,     // 10% of delay
  MAX_RETRIES: 100,       // Effectively infinite for this demo
  SUBSCRIPTIONS: [
    'conversation:all'    // Subscribe to all conversation events
  ]
};

/**
 * Calculates the delay before the next reconnection attempt.
 */
function getReconnectDelay(attempt: number): number {
  const exponentialDelay = CONFIG.BASE_DELAY * Math.pow(2, attempt);
  const cappedDelay = Math.min(exponentialDelay, CONFIG.MAX_DELAY);
  const jitter = cappedDelay * CONFIG.JITTER_FACTOR * Math.random();
  
  return Math.floor(cappedDelay + jitter);
}
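
To make the schedule concrete, here is the same calculation with the random source passed in as a parameter. backoffDelay and BackoffConfig are hypothetical names; the extra parameter exists only so the output can be checked deterministically. With jitter forced to zero, the delays double from BASE_DELAY until they reach MAX_DELAY.

```typescript
interface BackoffConfig {
  baseDelayMs: number;
  maxDelayMs: number;
  jitterFactor: number;
}

/**
 * Same formula as getReconnectDelay, with an injectable random source:
 * delay = min(maxDelay, baseDelay * 2^attempt) + capped * jitterFactor * random()
 */
function backoffDelay(
  attempt: number,
  cfg: BackoffConfig,
  random: () => number = Math.random,
): number {
  const capped = Math.min(cfg.baseDelayMs * Math.pow(2, attempt), cfg.maxDelayMs);
  return Math.floor(capped + capped * cfg.jitterFactor * random());
}

const cfg: BackoffConfig = { baseDelayMs: 1000, maxDelayMs: 30000, jitterFactor: 0.1 };

// Jitter forced to 0 gives the lower bound of each delay.
const minSchedule = [0, 1, 2, 3, 4, 5, 6].map((attempt) => backoffDelay(attempt, cfg, () => 0));
console.log(minSchedule.join(', ')); // 1000, 2000, 4000, 8000, 16000, 30000, 30000
```

With Math.random, each delay lands between that lower bound and 10% above it, so clients that failed at the same moment spread their retries out instead of reconnecting in lockstep.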

Step 2: Core WebSocket Manager Class

We will encapsulate the WebSocket logic in a class. This allows us to manage state (the socket, reconnectAttempt, timers, and the isIntentionalClose flag) and cleanly separate the connection logic from the message processing logic.

Key considerations:

  • Ping/Pong: Idle connections can be closed by the server or by proxies along the way, so the client sends a ping frame periodically.
  • Heartbeat: The server also sends heartbeat messages. They carry no event data, so the client filters them out before processing.
  • Subscription: We must send a subscription message immediately upon connection.

import WebSocket from 'ws'; // Node 18 has no built-in WebSocket client, so use the ws package

class GenesysNotificationClient {
  private ws: WebSocket | null = null;
  private reconnectAttempt = 0;
  private pingInterval: NodeJS.Timeout | null = null;
  private isIntentionalClose = false;
  private tokenRefreshTimeout: NodeJS.Timeout | null = null;

  constructor() {
    console.log('GenesysNotificationClient initialized.');
  }

  /**
   * Starts the connection loop.
   */
  async start() {
    console.log('Starting Genesys Notification Client...');
    await this.connect();
  }

  /**
   * Establishes the WebSocket connection.
   */
  private async connect() {
    // Never reconnect after stop(), and never open a second socket
    // while one is already open or still opening.
    if (this.isIntentionalClose) return;
    if (this.ws && (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING)) {
      return;
    }

    try {
      // 1. Get Token
      const token = await getAccessToken();

      // stop() may have been called while the token request was in flight.
      if (this.isIntentionalClose) return;

      // 2. Construct WebSocket URL
      const wsUrl = `wss://api.${ENVIRONMENT}/api/v2/notifications?access_token=${encodeURIComponent(token)}`;

      console.log(`Connecting to Genesys Cloud (Attempt ${this.reconnectAttempt + 1})...`);

      // 3. Create WebSocket
      this.ws = new WebSocket(wsUrl);

      // 4. Attach Event Listeners
      this.ws.on('open', () => this.onOpen());
      this.ws.on('message', (data) => this.onMessage(data));
      this.ws.on('close', (code, reason) => this.onClose(code, reason));
      this.ws.on('error', (error) => this.onError(error));

    } catch (error) {
      console.error('Failed to initialize connection:', error);
      this.scheduleReconnect();
    }
  }

  /**
   * Triggered when WebSocket opens.
   * Resets retry counter, starts ping interval, and sends subscriptions.
   */
  private onOpen() {
    console.log('WebSocket connected successfully.');
    this.reconnectAttempt = 0; // Reset on success
    // isIntentionalClose is deliberately not reset here: only stop() sets it,
    // and a socket that opens after stop() must not re-enable reconnects.

    // Start Ping Interval to keep connection alive
    this.startPingInterval();

    // Send Subscriptions
    this.sendSubscription();

    // Schedule a reconnect with a fresh token before the current one expires
    this.scheduleTokenRefresh();
  }

  /**
   * Handles incoming messages.
   * Genesys sends JSON strings.
   */
  private onMessage(data: WebSocket.Data) {
    try {
      const message = JSON.parse(data.toString());
      
      // Heartbeats carry no event data. This client does not reply to them
      // (keep-alive is handled by the ping interval), so just skip them.
      if (message.type === 'heartbeat') {
        return;
      }

      // Process actual notification events
      this.processNotification(message);

    } catch (error) {
      console.error('Error parsing message:', error);
    }
  }

  /**
   * Handles WebSocket close events.
   * Determines if reconnection is necessary.
   */
  private onClose(code: number, reason: Buffer) {
    console.log(`WebSocket closed. Code: ${code}, Reason: ${reason.toString()}`);
    
    this.stopPingInterval();
    this.ws = null;

    // If we closed intentionally (e.g., shutdown), do not reconnect
    if (this.isIntentionalClose) {
      console.log('Intentional close. Exiting.');
      return;
    }

    // Schedule reconnection
    this.scheduleReconnect();
  }

  /**
   * Handles WebSocket errors.
   */
  private onError(error: Error) {
    console.error('WebSocket error:', error.message);
    // The 'close' event will usually follow an error.
    // We do not schedule reconnect here to avoid double scheduling.
  }

  /**
   * Schedules the next reconnection attempt with backoff.
   */
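  private scheduleReconnect() {
    if (this.isIntentionalClose) return;

    // Give up after MAX_RETRIES consecutive failures (the counter resets in onOpen).
    if (this.reconnectAttempt >= CONFIG.MAX_RETRIES) {
      console.error(`Giving up after ${this.reconnectAttempt} failed attempts.`);
      return;
    }

    // Compute the delay before incrementing, so the first retry waits BASE_DELAY.
    const delay = getReconnectDelay(this.reconnectAttempt);
    this.reconnectAttempt++;

    console.log(`Reconnecting in ${delay}ms (retry ${this.reconnectAttempt})...`);

    setTimeout(() => {
      // stop() may have been called while this timer was pending.
      if (!this.isIntentionalClose) {
        void this.connect();
      }
    }, delay);
  }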

  /**
   * Sends the subscription message to Genesys.
   * This tells the server which events we want to receive.
   */
  private sendSubscription() {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;

    const subscriptionMessage = {
      type: 'subscription',
      subscriptions: CONFIG.SUBSCRIPTIONS
    };

    this.ws.send(JSON.stringify(subscriptionMessage));
    console.log('Subscriptions sent:', CONFIG.SUBSCRIPTIONS);
  }

  /**
   * Processes the actual notification payload.
   */
  private processNotification(notification: any) {
    // Example: Log the event type and ID
    const eventType = notification.eventType;
    const eventId = notification.id;
    
    console.log(`Received Event: [${eventType}] ID: ${eventId}`);
    
    // TODO: Implement your business logic here
    // e.g., Save to database, trigger webhook, update UI
  }

  /**
   * Starts a periodic ping to keep the WebSocket alive.
   * Idle connections may be closed by the server or by proxies in between,
   * so pinging every 20 seconds keeps traffic flowing.
   */
  private startPingInterval() {
    this.stopPingInterval(); // Clear existing if any
    
    this.pingInterval = setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        // Send a simple ping. Genesys responds with pong automatically.
        this.ws.ping();
      }
    }, 20000); // Ping every 20 seconds
  }

  private stopPingInterval() {
    if (this.pingInterval) {
      clearInterval(this.pingInterval);
      this.pingInterval = null;
    }
  }

  /**
   * Schedules a token refresh to prevent expiration during active session.
   */
  private scheduleTokenRefresh() {
    if (this.tokenRefreshTimeout) {
      clearTimeout(this.tokenRefreshTimeout);
    }

    // Refresh 10 minutes before expiry, assuming a 1-hour token. In practice,
    // derive this from the expires_in value of the token response.
    const refreshDelay = 50 * 60 * 1000;

    this.tokenRefreshTimeout = setTimeout(() => {
      console.log('Token nearing expiry; reconnecting with a fresh token...');
      // The URL of an open WebSocket cannot be changed, so close this socket.
      // onClose() then schedules a reconnect, and connect() fetches a new token
      // (retrying with backoff if the token request fails).
      this.forceReconnect();
    }, refreshDelay);
  }

  /**
   * Forces a reconnection to pick up a new token.
   */
  private forceReconnect() {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.close(1000, 'Token Refresh');
    }
  }

  /**
   * Gracefully shuts down the client.
   */
  stop() {
    this.isIntentionalClose = true;
    this.stopPingInterval();
    if (this.tokenRefreshTimeout) {
      clearTimeout(this.tokenRefreshTimeout);
    }
    if (this.ws) {
      this.ws.close(1000, 'Client Shutdown');
    }
    console.log('Client stopped.');
  }
}
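
The client sends pings but never checks that pongs come back. A half-open connection (for example, one silently dropped by a NAT device or proxy) can therefore stay in the OPEN state indefinitely without delivering events or firing close. Below is a minimal liveness check, assuming any pong or message counts as proof of life; LivenessTracker and the 45-second timeout are hypothetical choices, not Genesys requirements. The wiring comments use the ws library's 'pong' event and terminate() method, which destroys the socket and fires close, so onClose() schedules the reconnect as usual.

```typescript
/**
 * Tracks when the server last showed signs of life (a pong or any message).
 * Hypothetical helper: the client consults it on each ping tick.
 */
class LivenessTracker {
  private lastSeenMs: number;

  constructor(
    private readonly timeoutMs: number,
    private readonly now: () => number = Date.now,
  ) {
    this.lastSeenMs = this.now();
  }

  /** Record activity; call on every 'pong' event and every message. */
  markAlive(): void {
    this.lastSeenMs = this.now();
  }

  /** True when nothing has been heard from the server for longer than timeoutMs. */
  isStale(): boolean {
    return this.now() - this.lastSeenMs > this.timeoutMs;
  }
}

// Wiring in GenesysNotificationClient (comments only, since it needs a live ws socket):
//   onOpen():        this.liveness = new LivenessTracker(45000); // about 2 missed 20 s pings
//                    this.ws.on('pong', () => this.liveness.markAlive());
//   onMessage():     this.liveness.markAlive();
//   each ping tick:  if (this.liveness.isStale()) { this.ws.terminate(); return; }
//                    this.ws.ping();
```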

Step 3: Processing Results and Edge Cases

When receiving notifications, you must handle the specific structure of Genesys Cloud events. The notification object contains:

  • eventType: e.g., conversation:created, conversation:updated, conversation:ended.
  • id: The unique ID of the event.
  • resourceId: The ID of the entity (e.g., Conversation ID).
  • payload: The data associated with the event.
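
JSON.parse returns an untyped value, and processNotification() accepts any. A small type guard keeps malformed frames out of your business logic. This sketch assumes the four fields listed above and that eventType, id, and resourceId are strings; GenesysNotification and asNotification are hypothetical names, and the shape should be checked against the payloads you actually receive.

```typescript
/** Assumed notification shape, based on the fields listed above. */
interface GenesysNotification {
  eventType: string;
  id: string;
  resourceId: string;
  payload: unknown;
}

/** Returns a typed notification, or null if the parsed JSON has an unexpected shape. */
function asNotification(value: unknown): GenesysNotification | null {
  if (typeof value !== 'object' || value === null) return null;
  const { eventType, id, resourceId, payload } = value as Record<string, unknown>;
  if (typeof eventType !== 'string' || typeof id !== 'string' || typeof resourceId !== 'string') {
    return null;
  }
  return { eventType, id, resourceId, payload };
}
```

In onMessage(), after the heartbeat check, pass the parsed message through asNotification() and drop anything that returns null instead of forwarding it to processNotification().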

Edge Case: Duplicate Events
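Ordering and delivery are not guaranteed across reconnections. Events published while the socket is down are not replayed by this client, and after a network blip you might receive the same event twice. Your processing logic should be idempotent: use the id field to deduplicate events in your database or message queue, and if you need a complete picture after a reconnect, re-read the current state of the affected resources through the REST API.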
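
When there is no database constraint or queue to enforce uniqueness, a bounded in-memory set of recently seen id values catches the common case of an event redelivered shortly after a reconnect. RecentIds and its capacity are hypothetical; the set is lost on restart, so it complements rather than replaces durable deduplication.

```typescript
/** Remembers the most recent `capacity` IDs; the oldest are evicted first. */
class RecentIds {
  private readonly seen = new Set<string>();

  constructor(private readonly capacity: number) {}

  /** Returns true the first time an ID is seen, false for duplicates. */
  checkAndAdd(id: string): boolean {
    if (this.seen.has(id)) return false;
    this.seen.add(id);
    if (this.seen.size > this.capacity) {
      // A Set iterates in insertion order, so the first value is the oldest.
      const oldest = this.seen.values().next().value as string;
      this.seen.delete(oldest);
    }
    return true;
  }
}
```

In processNotification(), return early when recentIds.checkAndAdd(notification.id) is false, and size the capacity to cover a few minutes of your event volume.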

Edge Case: Large Payloads
Some conversation updates can be large. The ws library rejects messages larger than its maxPayload option, which defaults to 100 MiB; if you expect larger messages, raise the limit when constructing the socket with new WebSocket(url, { maxPayload: ... }).

Complete Working Example

Combine the authentication helper, configuration, and client class into a single runnable script (index.ts).

import dotenv from 'dotenv';
import axios from 'axios';
import WebSocket from 'ws';

// Load environment variables
dotenv.config();

// --- Configuration ---
const ENVIRONMENT = process.env.GENESYS_ENVIRONMENT || 'mypurecloud.com';
const CLIENT_ID = process.env.GENESYS_CLIENT_ID!;
const CLIENT_SECRET = process.env.GENESYS_CLIENT_SECRET!;

const CONFIG = {
  BASE_DELAY: 1000,
  MAX_DELAY: 30000,
  JITTER_FACTOR: 0.1,
  SUBSCRIPTIONS: ['conversation:all']
};

// --- Types ---
interface TokenResponse {
  access_token: string;
  token_type: string;
  expires_in: number;
}

// --- Authentication ---
async function getAccessToken(): Promise<string> {
  // Tokens come from the region's login host; client credentials use HTTP Basic auth.
  const url = `https://login.${ENVIRONMENT}/oauth/token`;
  const credentials = Buffer.from(`${CLIENT_ID}:${CLIENT_SECRET}`).toString('base64');
  const params = new URLSearchParams({ grant_type: 'client_credentials' });

  try {
    const response = await axios.post<TokenResponse>(url, params.toString(), {
      headers: {
        'Authorization': `Basic ${credentials}`,
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'application/json'
      }
    });
    if (!response.data.access_token) throw new Error('No access_token in response');
    return response.data.access_token;
  } catch (error) {
    if (axios.isAxiosError(error)) {
      console.error('Auth Error:', error.response?.data || error.message);
    } else {
      console.error('Auth Error:', error);
    }
    throw error;
  }
}

// --- Reconnection Logic ---
function getReconnectDelay(attempt: number): number {
  const exponentialDelay = CONFIG.BASE_DELAY * Math.pow(2, attempt);
  const cappedDelay = Math.min(exponentialDelay, CONFIG.MAX_DELAY);
  const jitter = cappedDelay * CONFIG.JITTER_FACTOR * Math.random();
  return Math.floor(cappedDelay + jitter);
}

// --- Client Class ---
class GenesysNotificationClient {
  private ws: WebSocket | null = null;
  private reconnectAttempt = 0;
  private pingInterval: NodeJS.Timeout | null = null;
  private isIntentionalClose = false;
  private tokenRefreshTimeout: NodeJS.Timeout | null = null;

  async start() {
    console.log('Starting Genesys Notification Client...');
    await this.connect();
  }

  private async connect() {
    // Never reconnect after stop(), and never open a second socket while one is open or opening.
    if (this.isIntentionalClose) return;
    if (this.ws && (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING)) return;

    try {
      const token = await getAccessToken();
      if (this.isIntentionalClose) return; // stop() was called while fetching the token
      const wsUrl = `wss://api.${ENVIRONMENT}/api/v2/notifications?access_token=${encodeURIComponent(token)}`;

      console.log(`Connecting to Genesys Cloud (Attempt ${this.reconnectAttempt + 1})...`);

      this.ws = new WebSocket(wsUrl);
      this.ws.on('open', () => this.onOpen());
      this.ws.on('message', (data) => this.onMessage(data));
      this.ws.on('close', (code, reason) => this.onClose(code, reason));
      this.ws.on('error', (error) => this.onError(error));

    } catch (error) {
      console.error('Failed to initialize connection:', error);
      this.scheduleReconnect();
    }
  }

  private onOpen() {
    console.log('WebSocket connected successfully.');
    this.reconnectAttempt = 0;
    // isIntentionalClose is not reset here: only stop() sets it, and it must stay set.
    this.startPingInterval();
    this.sendSubscription();
    this.scheduleTokenRefresh();
  }

  private onMessage(data: WebSocket.Data) {
    try {
      const message = JSON.parse(data.toString());
      if (message.type === 'heartbeat') return;
      this.processNotification(message);
    } catch (error) {
      console.error('Error parsing message:', error);
    }
  }

  private onClose(code: number, reason: Buffer) {
    console.log(`WebSocket closed. Code: ${code}, Reason: ${reason.toString()}`);
    this.stopPingInterval();
    this.ws = null;
    
    if (this.isIntentionalClose) {
      console.log('Intentional close. Exiting.');
      return;
    }
    this.scheduleReconnect();
  }

  private onError(error: Error) {
    console.error('WebSocket error:', error.message);
  }

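  private scheduleReconnect() {
    if (this.isIntentionalClose) return;
    // Compute the delay before incrementing, so the first retry waits BASE_DELAY.
    const delay = getReconnectDelay(this.reconnectAttempt);
    this.reconnectAttempt++;
    console.log(`Reconnecting in ${delay}ms (retry ${this.reconnectAttempt})...`);
    setTimeout(() => {
      if (!this.isIntentionalClose) void this.connect(); // skip if stop() ran meanwhile
    }, delay);
  }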

  private sendSubscription() {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
    const subscriptionMessage = {
      type: 'subscription',
      subscriptions: CONFIG.SUBSCRIPTIONS
    };
    this.ws.send(JSON.stringify(subscriptionMessage));
    console.log('Subscriptions sent:', CONFIG.SUBSCRIPTIONS);
  }

  private processNotification(notification: any) {
    const eventType = notification.eventType;
    const eventId = notification.id;
    console.log(`Received Event: [${eventType}] ID: ${eventId}`);
    // Implement business logic here
  }

  private startPingInterval() {
    this.stopPingInterval();
    this.pingInterval = setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.ws.ping();
      }
    }, 20000);
  }

  private stopPingInterval() {
    if (this.pingInterval) {
      clearInterval(this.pingInterval);
      this.pingInterval = null;
    }
  }

  private scheduleTokenRefresh() {
    if (this.tokenRefreshTimeout) clearTimeout(this.tokenRefreshTimeout);
    // 50 minutes assumes a 1-hour token; derive it from expires_in in practice.
    const refreshDelay = 50 * 60 * 1000;
    this.tokenRefreshTimeout = setTimeout(() => {
      console.log('Token nearing expiry; reconnecting with a fresh token...');
      // onClose() schedules the reconnect, and connect() fetches the new token.
      this.forceReconnect();
    }, refreshDelay);
  }

  private forceReconnect() {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.close(1000, 'Token Refresh');
    }
  }

  stop() {
    this.isIntentionalClose = true;
    this.stopPingInterval();
    if (this.tokenRefreshTimeout) clearTimeout(this.tokenRefreshTimeout);
    if (this.ws) this.ws.close(1000, 'Client Shutdown');
    console.log('Client stopped.');
  }
}

// --- Execution ---
const client = new GenesysNotificationClient();

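// Handle graceful shutdown: stop() closes the socket and clears timers, after
// which Node exits on its own. The unref'd timer forces an exit if something
// (such as a pending reconnect timer) keeps the process alive for more than 2 s.
process.on('SIGINT', () => {
  console.log('Shutting down...');
  client.stop();
  setTimeout(() => process.exit(0), 2000).unref();
});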

client.start().catch(console.error);

Common Errors & Debugging

Error: 401 Unauthorized on WebSocket Handshake

  • Cause: The access token in the URL is invalid, expired, or missing.
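  • Fix: Verify that getAccessToken() succeeds before the WebSocket is created and that client_id and client_secret are correct. Client Credentials clients get their permissions from the roles assigned to the OAuth client, so also check that those roles cover the events you subscribe to. With the ws library, a rejected handshake surfaces as an error event whose message includes the HTTP status (for example, Unexpected server response: 401).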

Error: WebSocket Closed with Code 1006

  • Cause: Abnormal closure. Code 1006 is never sent by the server; the client reports it when the connection ends without a closing handshake, for example after a network drop, firewall or proxy interference, or an idle timeout.
  • Fix: Ensure startPingInterval is running so the connection never sits idle. The reconnection logic with backoff recovers from these closures automatically.

Error: 429 Too Many Requests

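  • Cause: Reconnecting too quickly after a failure. Every call to connect() also requests a new OAuth token, so a tight reconnect loop hits the token endpoint as well.
  • Fix: Ensure getReconnectDelay applies exponential backoff, do not set BASE_DELAY too low, and cache the token between attempts instead of requesting a new one each time.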
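
If a 429 response carries a Retry-After header, waiting at least that long is more reliable than guessing. A sketch, assuming the header uses either of the two standard HTTP forms (a number of seconds or an HTTP date); check your own responses to confirm the header is present. parseRetryAfterMs and retryDelayMs are hypothetical helpers.

```typescript
/**
 * Converts a Retry-After header value to milliseconds.
 * Accepts delta-seconds ("120") or an HTTP date; returns null if absent or unparseable.
 */
function parseRetryAfterMs(header: string | undefined, nowMs: number = Date.now()): number | null {
  if (header === undefined) return null;
  const trimmed = header.trim();
  if (/^\d+$/.test(trimmed)) return Number(trimmed) * 1000;
  const dateMs = Date.parse(trimmed);
  if (Number.isNaN(dateMs)) return null;
  return Math.max(0, dateMs - nowMs);
}

/** Waits at least as long as the server asked, and never less than the backoff delay. */
function retryDelayMs(backoffMs: number, retryAfterHeader: string | undefined): number {
  const serverMs = parseRetryAfterMs(retryAfterHeader);
  return serverMs === null ? backoffMs : Math.max(backoffMs, serverMs);
}
```

With axios, the header is available as error.response?.headers['retry-after'] when error is an AxiosError; pass its string value together with getReconnectDelay(attempt) to retryDelayMs().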

Error: Missing Subscriptions

  • Cause: The subscription message was sent before the WebSocket was fully open, or the server rejected the subscription without closing the connection.
  • Fix: Always send subscriptions from the onOpen handler, after confirming this.ws.readyState === WebSocket.OPEN. Temporarily log every message received after connecting so you can see how the server responds to the subscription request, and check that the OAuth client's roles grant access to the subscribed events.

Official References