Streaming Analytics Notification Events to a Kafka Topic via a Node.js Consumer

What You Will Build

  • A Node.js service that subscribes to Genesys Cloud CX Analytics Notification events via the WebSocket API.
  • The service parses incoming JSON payloads and publishes them to a specific Apache Kafka topic.
  • The implementation uses the purecloud-platform-client-v2 SDK for authentication and the kafkajs library for message production.

Prerequisites

  • OAuth Client Type: Public or Confidential client with the analytics:notifications:subscribe scope.
  • SDK Version: @genesyscloud/purecloud-platform-client-v2 (v2.50.0+).
  • Language/Runtime: Node.js 18+ (LTS).
  • External Dependencies:
    • @genesyscloud/purecloud-platform-client-v2
    • kafkajs
    • ws (WebSocket client imported in Step 2)
    • dotenv (for environment variable management)
  • Kafka Cluster: A running Kafka instance accessible from your Node.js environment.

Authentication Setup

Genesys Cloud uses OAuth 2.0 for all API access. For WebSocket subscriptions, you must obtain a bearer token with the correct scopes. The token is passed during the WebSocket handshake.

Install the required packages:

npm install @genesyscloud/purecloud-platform-client-v2 kafkajs ws dotenv

Create a .env file to store sensitive credentials. Never commit this file to version control.

GENESYS_CLIENT_ID=your_client_id
GENESYS_CLIENT_SECRET=your_client_secret
GENESYS_ENVIRONMENT=api.mypurecloud.com
KAFKA_BROKER_LIST=localhost:9092
KAFKA_TOPIC=genesys-analytics-events
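
A missing or blank variable in this file tends to surface later as a confusing authentication or connection error. The following is a minimal sketch of a startup check, assuming the five variable names shown above; findMissingEnvVars and assertEnv are hypothetical helper names, not part of any SDK.

```javascript
// Names of the variables defined in the .env file above.
const REQUIRED_ENV_VARS = [
  'GENESYS_CLIENT_ID',
  'GENESYS_CLIENT_SECRET',
  'GENESYS_ENVIRONMENT',
  'KAFKA_BROKER_LIST',
  'KAFKA_TOPIC'
];

/**
 * Hypothetical helper: returns the names of required variables that are
 * missing or blank in the given environment object.
 */
function findMissingEnvVars(env, required = REQUIRED_ENV_VARS) {
  return required.filter((name) => {
    const value = env[name];
    return typeof value !== 'string' || value.trim() === '';
  });
}

/**
 * Hypothetical helper: throws one descriptive error listing everything
 * that is missing, instead of failing on the first variable.
 */
function assertEnv(env) {
  const missing = findMissingEnvVars(env);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }
}
```

Calling assertEnv(process.env) immediately after dotenv.config() would stop the service before it attempts any network connection.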

The following code initializes the Genesys Cloud platform client and retrieves an OAuth token. This token is required for the WebSocket connection.

import { Configuration, PlatformClient } from '@genesyscloud/purecloud-platform-client-v2';
import dotenv from 'dotenv';

dotenv.config();

// Initialize Genesys Cloud Configuration
const configuration = Configuration.defaultConfiguration();
configuration.clientId = process.env.GENESYS_CLIENT_ID;
configuration.clientSecret = process.env.GENESYS_CLIENT_SECRET;
configuration.environment = process.env.GENESYS_ENVIRONMENT;

// Initialize Platform Client
const platformClient = new PlatformClient();

/**
 * Retrieves an OAuth2 bearer token from Genesys Cloud.
 * @returns {Promise<string>} The bearer token string.
 */
async function getAccessToken() {
  try {
    // Request token with the specific scope for analytics notifications
    const tokenResponse = await platformClient.authClient.requestToken(
      'client_credentials',
      { scope: 'analytics:notifications:subscribe' }
    );
    return tokenResponse.access_token;
  } catch (error) {
    console.error('Failed to acquire OAuth token:', error.message);
    throw error;
  }
}

Implementation

Step 1: Configure Kafka Producer

Before connecting to Genesys Cloud, initialize the Kafka producer. This ensures the destination is ready to receive messages.

import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'genesys-analytics-consumer',
  brokers: process.env.KAFKA_BROKER_LIST.split(',')
});

const producer = kafka.producer();

/**
 * Connects to the Kafka cluster.
 */
async function connectKafkaProducer() {
  try {
    await producer.connect();
    console.log('Connected to Kafka broker.');
  } catch (error) {
    console.error('Failed to connect to Kafka:', error.message);
    throw error;
  }
}

/**
 * Publishes a message to the specified Kafka topic.
 * @param {string} topic - The Kafka topic name.
 * @param {string} key - The message key (e.g., organization ID or event ID).
 * @param {string} value - The JSON stringified message payload.
 */
async function publishToKafka(topic, key, value) {
  try {
    await producer.send({
      topic: topic,
      messages: [
        {
          key: key,
          value: value
        }
      ]
    });
  } catch (error) {
    console.error('Failed to publish message to Kafka:', error.message);
    // In production, implement retry logic or dead-letter queue handling here.
    // Rethrow so callers do not report a failed publish as a success.
    throw error;
  }
}
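
The comment in publishToKafka mentions retry logic. The following is a minimal sketch of one option, an application-level retry wrapper with exponential backoff. The names withRetry and defaultSleep and the default values are hypothetical; kafkajs also has its own retry settings on the client, which this sketch does not replace.

```javascript
// Resolves after the given number of milliseconds.
function defaultSleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Hypothetical helper: runs an async operation and retries it on failure.
 * The wait doubles after each failed attempt: baseDelayMs, 2x, 4x, ...
 * The sleep function is injectable so the wrapper can be tested quickly.
 */
async function withRetry(operation, { retries = 3, baseDelayMs = 200, sleep = defaultSleep } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await operation(attempt);
    } catch (error) {
      lastError = error;
      if (attempt < retries) {
        await sleep(baseDelayMs * 2 ** attempt);
      }
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError;
}
```

Under these assumptions, the send call could be wrapped as withRetry(() => producer.send({ topic, messages: [{ key, value }] })). A message that still fails after the final attempt is the natural candidate for a dead-letter topic.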

Step 2: Establish WebSocket Subscription

The Genesys Cloud Analytics Notification API uses WebSocket for real-time streaming. You must construct the WebSocket URL including the environment and the bearer token.

The endpoint path is /api/v2/analytics/notifications.

import WebSocket from 'ws';

/**
 * Subscribes to Genesys Cloud Analytics Notifications via WebSocket.
 * @param {string} token - The OAuth2 bearer token.
 */
async function subscribeToAnalyticsNotifications(token) {
  const environment = process.env.GENESYS_ENVIRONMENT;
  // Construct the WebSocket URL with the token as a query parameter
  const wsUrl = `wss://${environment}/api/v2/analytics/notifications?token=${token}`;
  
  const ws = new WebSocket(wsUrl);

  ws.on('open', () => {
    console.log('WebSocket connection established with Genesys Cloud.');
  });

  ws.on('message', async (data) => {
    try {
      const message = JSON.parse(data.toString());
      await processAnalyticsEvent(message);
    } catch (error) {
      console.error('Error processing WebSocket message:', error.message);
    }
  });

  ws.on('error', (error) => {
    console.error('WebSocket error:', error.message);
  });

  ws.on('close', (code, reason) => {
    console.log(`WebSocket closed with code ${code}: ${reason}`);
    // Implement reconnection logic here if necessary
  });
}

/**
 * Processes the incoming analytics event and publishes it to Kafka.
 * @param {Object} event - The parsed JSON event from Genesys Cloud.
 */
async function processAnalyticsEvent(event) {
  if (!event || !event.id) {
    console.warn('Received malformed event, skipping.');
    return;
  }

  const topic = process.env.KAFKA_TOPIC;
  const key = event.id; // Use event ID as the Kafka message key for partitioning
  const value = JSON.stringify(event);

  await publishToKafka(topic, key, value);
  console.log(`Published event ${key} to Kafka topic ${topic}.`);
}
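
subscribeToAnalyticsNotifications interpolates the token directly into the URL string. If the token ever contains characters that are reserved in a query string, the URL would be malformed. A minimal sketch of a safer construction using the built-in URL class follows; buildNotificationsUrl is a hypothetical helper name, and the endpoint path and token query parameter are taken from the text above.

```javascript
/**
 * Hypothetical helper: builds the WebSocket URL and lets the URL class
 * percent-encode the token instead of concatenating it by hand.
 */
function buildNotificationsUrl(environment, token) {
  const url = new URL(`wss://${environment}/api/v2/analytics/notifications`);
  url.searchParams.set('token', token);
  return url.toString();
}
```

The result can be passed to new WebSocket(...) in place of the template string.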

Step 3: Handle Token Refresh and Reconnection

OAuth tokens expire, typically after one hour. A robust consumer must detect token expiration and re-authenticate. The WebSocket connection may also drop due to network issues.
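
Rather than hard-coding a refresh period, the delay can be derived from the token lifetime. The sketch below assumes the token response exposes the standard OAuth 2.0 expires_in field (in seconds); whether the SDK response includes it is an assumption, and refreshDelayMs and the 10-minute margin are hypothetical choices.

```javascript
/**
 * Hypothetical helper: returns how long to wait (in ms) before refreshing.
 * Refreshes safetyMarginSeconds before expiry, but never earlier than
 * half of the token lifetime, so short-lived tokens are not refreshed
 * in a tight loop.
 */
function refreshDelayMs(expiresInSeconds, safetyMarginSeconds = 600) {
  if (!Number.isFinite(expiresInSeconds) || expiresInSeconds <= 0) {
    throw new RangeError('expiresInSeconds must be a positive number');
  }
  const delaySeconds = Math.max(
    expiresInSeconds - safetyMarginSeconds,
    expiresInSeconds / 2
  );
  return Math.round(delaySeconds * 1000);
}
```

For a 3600-second token this gives 3,000,000 ms, which matches a 50-minute refresh period.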

The following logic wraps the subscription process with a refresh mechanism.

/**
 * Main execution function that manages the lifecycle of the consumer.
 */
async function main() {
  try {
    await connectKafkaProducer();
    
    let token = await getAccessToken();
    let subscriptionActive = true;

    // Start the initial subscription
    subscribeToAnalyticsNotifications(token).catch(err => {
      console.error('Subscription failed:', err.message);
      subscriptionActive = false;
    });

    // Refresh the token before it expires. Genesys Cloud tokens typically
    // last 3600 seconds, so refresh every 50 minutes (3,000,000 ms).
    const tokenRefreshInterval = setInterval(async () => {
      try {
        token = await getAccessToken();
        console.log('OAuth token refreshed.');

        // This tutorial does not keep a reference to the WebSocket instance,
        // so the refreshed token is not applied to the open connection.
        // A production app should hold the instance, close it, and call
        // subscribeToAnalyticsNotifications(token) again at this point.
        if (subscriptionActive) {
          // Close the previous WebSocket and start a new subscription here.
        }
      } catch (error) {
        console.error('Failed to refresh token:', error.message);
      }
    }, 3000000); // 50 minutes

    // Shut down gracefully on Ctrl+C: stop the refresh timer and disconnect Kafka
    process.on('SIGINT', () => {
      console.log('Shutting down...');
      clearInterval(tokenRefreshInterval);
      producer.disconnect().then(() => process.exit(0));
    });

  } catch (error) {
    console.error('Fatal error in main execution:', error.message);
    process.exit(1);
  }
}

// Start the application
main().catch(console.error);
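
Step 3 leaves the close-and-reconnect step as a placeholder. One way to fill it in is a small manager that keeps at most one socket open at a time. The sketch below is an assumption about how this could be structured, not the tutorial's own implementation: createSocketManager is a hypothetical name, and the socket factory is injected so the manager does not depend on a particular WebSocket library.

```javascript
/**
 * Hypothetical lifecycle manager that keeps at most one socket open.
 * createSocket(token) must return an object with a close() method.
 * With the ws package it could be a function that builds the URL,
 * creates the WebSocket, and attaches the event handlers.
 */
function createSocketManager(createSocket) {
  let current = null;

  return {
    // Opens a socket for the given token, closing any previous one first.
    connect(token) {
      if (current) {
        current.close();
      }
      current = createSocket(token);
      return current;
    },

    // Closes the active socket, if any. Intended for shutdown.
    close() {
      if (current) {
        current.close();
        current = null;
      }
    },

    // The currently tracked socket, or null when nothing is open.
    get active() {
      return current;
    }
  };
}
```

With this in place, the refresh timer could call manager.connect(token) after each successful getAccessToken(), and the SIGINT handler could call manager.close(). Events that arrive between closing the old socket and opening the new one may be missed, so this sketch trades completeness for simplicity.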

Complete Working Example

The following script combines all components into a single runnable file. Save this as index.js.

import { Configuration, PlatformClient } from '@genesyscloud/purecloud-platform-client-v2';
import { Kafka } from 'kafkajs';
import WebSocket from 'ws';
import dotenv from 'dotenv';

dotenv.config();

// --- Configuration ---
const configuration = Configuration.defaultConfiguration();
configuration.clientId = process.env.GENESYS_CLIENT_ID;
configuration.clientSecret = process.env.GENESYS_CLIENT_SECRET;
configuration.environment = process.env.GENESYS_ENVIRONMENT;

const platformClient = new PlatformClient();

const kafka = new Kafka({
  clientId: 'genesys-analytics-consumer',
  brokers: process.env.KAFKA_BROKER_LIST.split(',')
});

const producer = kafka.producer();

// --- Kafka Functions ---

async function connectKafkaProducer() {
  try {
    await producer.connect();
    console.log('Connected to Kafka broker.');
  } catch (error) {
    console.error('Failed to connect to Kafka:', error.message);
    throw error;
  }
}

async function publishToKafka(topic, key, value) {
  try {
    await producer.send({
      topic: topic,
      messages: [
        {
          key: key,
          value: value
        }
      ]
    });
  } catch (error) {
    console.error('Failed to publish message to Kafka:', error.message);
    // Rethrow so callers do not report a failed publish as a success.
    throw error;
  }
}

// --- Genesys Cloud Functions ---

async function getAccessToken() {
  try {
    const tokenResponse = await platformClient.authClient.requestToken(
      'client_credentials',
      { scope: 'analytics:notifications:subscribe' }
    );
    return tokenResponse.access_token;
  } catch (error) {
    console.error('Failed to acquire OAuth token:', error.message);
    throw error;
  }
}

async function processAnalyticsEvent(event) {
  if (!event || !event.id) {
    console.warn('Received malformed event, skipping.');
    return;
  }

  const topic = process.env.KAFKA_TOPIC;
  const key = event.id;
  const value = JSON.stringify(event);

  await publishToKafka(topic, key, value);
  console.log(`Published event ${key} to Kafka topic ${topic}.`);
}

async function subscribeToAnalyticsNotifications(token) {
  const environment = process.env.GENESYS_ENVIRONMENT;
  const wsUrl = `wss://${environment}/api/v2/analytics/notifications?token=${token}`;
  
  const ws = new WebSocket(wsUrl);

  ws.on('open', () => {
    console.log('WebSocket connection established with Genesys Cloud.');
  });

  ws.on('message', async (data) => {
    try {
      const message = JSON.parse(data.toString());
      await processAnalyticsEvent(message);
    } catch (error) {
      console.error('Error processing WebSocket message:', error.message);
    }
  });

  ws.on('error', (error) => {
    console.error('WebSocket error:', error.message);
  });

  ws.on('close', (code, reason) => {
    console.log(`WebSocket closed with code ${code}: ${reason}`);
  });
}

// --- Main Execution ---

async function main() {
  try {
    await connectKafkaProducer();
    
    let token = await getAccessToken();
    
    // Start subscription
    subscribeToAnalyticsNotifications(token);

    // Token refresh interval (every 50 minutes)
    const tokenRefreshInterval = setInterval(async () => {
      try {
        token = await getAccessToken();
        console.log('OAuth token refreshed.');
        // In a production app, you would close the old WS and open a new one here
        // For this example, we assume the WS will handle reconnection or the app restarts
      } catch (error) {
        console.error('Failed to refresh token:', error.message);
      }
    }, 3000000); // 50 minutes

    process.on('SIGINT', () => {
      console.log('Shutting down...');
      clearInterval(tokenRefreshInterval);
      producer.disconnect().then(() => process.exit(0));
    });

  } catch (error) {
    console.error('Fatal error in main execution:', error.message);
    process.exit(1);
  }
}

main().catch(console.error);

Common Errors & Debugging

Error: 401 Unauthorized on WebSocket Handshake

  • What causes it: The OAuth token is expired, invalid, or missing the analytics:notifications:subscribe scope.
  • How to fix it: Verify the client credentials in .env. Ensure the scope is explicitly requested in requestToken. Check that the token has not expired (tokens last ~1 hour).
  • Code showing the fix:
    // Ensure scope is correct
    const tokenResponse = await platformClient.authClient.requestToken(
      'client_credentials',
      { scope: 'analytics:notifications:subscribe' } // Critical scope
    );
    

Error: 403 Forbidden

  • What causes it: The OAuth client does not have the necessary permissions assigned in the Genesys Cloud admin console, or the user associated with the client lacks access to analytics data.
  • How to fix it: Log in to the Genesys Cloud admin console and navigate to Admin > Integrations > OAuth. Edit your client and confirm that the Analytics permissions are granted. The exact permission name (for example, “Read Analytics Data”) depends on your org configuration.
  • Code showing the fix: No code change required. This is an administrative configuration fix.

Error: Kafka Connection Refused

  • What causes it: The KAFKA_BROKER_LIST is incorrect, or the Kafka broker is not running.
  • How to fix it: Verify the broker address and port. Ensure the Kafka service is running and accessible from the Node.js host.
  • Code showing the fix:
    // Verify broker list format
    const brokers = process.env.KAFKA_BROKER_LIST.split(',');
    // Example: ['localhost:9092']
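
A stray space or a missing port in KAFKA_BROKER_LIST is easy to overlook when splitting on commas alone. The following is a minimal sketch of a stricter parser, assuming each entry has the form host:port; parseBrokerList is a hypothetical helper name.

```javascript
/**
 * Hypothetical helper: splits a comma-separated broker list, trims each
 * entry, and checks that every entry looks like host:port.
 */
function parseBrokerList(raw) {
  if (typeof raw !== 'string' || raw.trim() === '') {
    throw new Error('KAFKA_BROKER_LIST is empty or undefined');
  }

  const brokers = raw
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry !== '');

  if (brokers.length === 0) {
    throw new Error('KAFKA_BROKER_LIST contains no broker addresses');
  }

  for (const broker of brokers) {
    const match = /^(.+):(\d+)$/.exec(broker);
    const port = match ? Number(match[2]) : NaN;
    if (!match || port < 1 || port > 65535) {
      throw new Error(`Invalid broker address "${broker}"; expected host:port`);
    }
  }

  return brokers;
}
```

The returned array can be passed as the brokers option in place of the plain split(',') call.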
    

Error: WebSocket Reconnect Loop

  • What causes it: The token refresh logic is not properly closing the old WebSocket connection before establishing a new one, or the network is unstable.
  • How to fix it: Implement a proper lifecycle manager for the WebSocket. Close the existing ws instance before creating a new one with the refreshed token.
  • Code showing the fix:
    let currentWs = null;
    
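    async function reconnectWithNewToken(token) {
      if (currentWs) {
        currentWs.close();
      }
      // Rebuild the URL so the new connection uses the refreshed token
      const wsUrl = `wss://${process.env.GENESYS_ENVIRONMENT}/api/v2/analytics/notifications?token=${token}`;
      currentWs = new WebSocket(wsUrl);
      // Re-attach the 'message', 'error', and 'close' handlers here
    }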
    

Official References