Streaming Genesys Cloud Analytics Events to Kafka via Node.js

Streaming Genesys Cloud Analytics Events to Kafka via Node.js

What You Will Build

  • You will build a Node.js service that consumes real-time analytics notification events from Genesys Cloud CX and forwards them to a specific Apache Kafka topic.
  • This tutorial uses the Genesys Cloud CX REST API for authentication and the WebSocket API for event streaming.
  • The implementation covers Node.js with ws for WebSockets and kafkajs for Kafka producer operations.

Prerequisites

  • OAuth Client: A Genesys Cloud CX OAuth Client with the analytics:conversation:read scope.
  • SDK/API Version: Genesys Cloud CX API v2.
  • Language/Runtime: Node.js v18 or later.
  • Dependencies:
    • ws: For WebSocket connections.
    • kafkajs: For producing messages to Kafka.
    • dotenv: For managing environment variables.

Install dependencies:

npm install ws kafkajs dotenv

Authentication Setup

Genesys Cloud CX uses OAuth 2.0 for authentication. For WebSocket connections, you must obtain an access token and include it in the WebSocket handshake URL. The token expires after 1 hour, so your application must implement a refresh strategy.

The following code demonstrates how to acquire and refresh an OAuth token using the Genesys Cloud CX REST API.

import dotenv from 'dotenv';
dotenv.config();

const { GENESYS_CLOUD_CLIENT_ID, GENESYS_CLOUD_CLIENT_SECRET, GENESYS_CLOUD_REGION } = process.env;

const getAccessToken = async () => {
  const url = `https://${GENESYS_CLOUD_REGION || 'mypurecloud.com'}/oauth/token`;
  const body = new URLSearchParams({
    grant_type: 'client_credentials',
    client_id: GENESYS_CLOUD_CLIENT_ID,
    client_secret: GENESYS_CLOUD_CLIENT_SECRET,
    scope: 'analytics:conversation:read'
  });

  try {
    const response = await fetch(url, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/x-www-form-urlencoded'
      },
      body: body
    });

    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(`OAuth request failed with status ${response.status}: ${errorText}`);
    }

    const data = await response.json();
    return {
      accessToken: data.access_token,
      expiresIn: data.expires_in,
      expiresAt: Date.now() + (data.expires_in * 1000)
    };
  } catch (error) {
    console.error('Failed to acquire OAuth token:', error.message);
    throw error;
  }
};

Implementation

Step 1: Initialize Kafka Producer

Before handling events, you must initialize the Kafka producer. The producer must be ready to send messages before the WebSocket connection starts.

import { Kafka } from 'kafkajs';

const { KAFKA_BROKER, KAFKA_TOPIC } = process.env;

const kafka = new Kafka({
  clientId: 'genesys-analytics-consumer',
  brokers: [KAFKA_BROKER || 'localhost:9092']
});

const producer = kafka.producer();

const connectProducer = async () => {
  try {
    await producer.connect();
    console.log('Kafka producer connected successfully.');
  } catch (error) {
    console.error('Failed to connect to Kafka:', error.message);
    throw error;
  }
};

Step 2: Establish WebSocket Connection for Analytics Events

Genesys Cloud CX provides a WebSocket endpoint for real-time analytics events. The endpoint is wss://{region}.mypurecloud.com/api/v2/analytics/events. You must pass the access token as a query parameter.

The analytics:conversation:read scope is required. The WebSocket connection will emit conversation events. You must handle connection drops and re-establish the connection with a fresh token if necessary.

import WebSocket from 'ws';

let currentToken = null;
let ws = null;
let reconnectTimeout = null;

const connectWebSocket = async () => {
  if (!currentToken || currentToken.expiresAt < Date.now() + 60000) {
    try {
      currentToken = await getAccessToken();
      console.log('New OAuth token acquired.');
    } catch (error) {
      console.error('Failed to acquire token for WebSocket. Retrying in 5 seconds...');
      reconnectTimeout = setTimeout(connectWebSocket, 5000);
      return;
    }
  }

  const wsUrl = `wss://${GENESYS_CLOUD_REGION || 'mypurecloud.com'}/api/v2/analytics/events?access_token=${currentToken.accessToken}`;
  
  if (ws && ws.readyState === WebSocket.OPEN) {
    ws.close();
  }

  ws = new WebSocket(wsUrl);

  ws.on('open', () => {
    console.log('WebSocket connection established.');
    // Clear any pending reconnect timeouts
    if (reconnectTimeout) {
      clearTimeout(reconnectTimeout);
      reconnectTimeout = null;
    }
  });

  ws.on('message', (data) => {
    try {
      const event = JSON.parse(data.toString());
      handleAnalyticsEvent(event);
    } catch (error) {
      console.error('Failed to parse WebSocket message:', error.message);
    }
  });

  ws.on('close', (code, reason) => {
    console.log(`WebSocket closed with code ${code}: ${reason.toString()}`);
    if (code !== 1000 && code !== 4001) { // 1000 is normal closure, 4001 is often token expiry
      console.log('Reconnecting WebSocket in 5 seconds...');
      reconnectTimeout = setTimeout(connectWebSocket, 5000);
    } else if (code === 4001) {
      // Token expired, fetch new token and reconnect
      console.log('Token expired. Refreshing and reconnecting...');
      currentToken = null; // Force token refresh
      reconnectTimeout = setTimeout(connectWebSocket, 1000);
    }
  });

  ws.on('error', (error) => {
    console.error('WebSocket error:', error.message);
  });
};

Step 3: Process and Forward Events to Kafka

The handleAnalyticsEvent function processes the incoming analytics event. You must filter for relevant event types (e.g., conversation) and forward the payload to Kafka. The Kafka message value should be a JSON string.

const handleAnalyticsEvent = async (event) => {
  // Genesys Cloud analytics events have a 'type' field.
  // We are interested in 'conversation' events for this tutorial.
  if (event.type !== 'conversation') {
    return;
  }

  const kafkaMessage = {
    topic: KAFKA_TOPIC || 'genesys-analytics-events',
    messages: [
      {
        key: event.conversationId, // Use conversation ID as key for partitioning
        value: JSON.stringify(event)
      }
    ]
  };

  try {
    await producer.send(kafkaMessage);
    console.log(`Sent event ${event.conversationId} to Kafka.`);
  } catch (error) {
    console.error('Failed to send message to Kafka:', error.message);
  }
};

Complete Working Example

The following code combines all steps into a single runnable script. It initializes the Kafka producer, starts the WebSocket connection, and handles graceful shutdown.

import dotenv from 'dotenv';
import { Kafka } from 'kafkajs';
import WebSocket from 'ws';

dotenv.config();

const {
  GENESYS_CLOUD_CLIENT_ID,
  GENESYS_CLOUD_CLIENT_SECRET,
  GENESYS_CLOUD_REGION,
  KAFKA_BROKER,
  KAFKA_TOPIC
} = process.env;

if (!GENESYS_CLOUD_CLIENT_ID || !GENESYS_CLOUD_CLIENT_SECRET) {
  throw new Error('GENESYS_CLOUD_CLIENT_ID and GENESYS_CLOUD_CLIENT_SECRET must be set.');
}

// Kafka Setup
const kafka = new Kafka({
  clientId: 'genesys-analytics-consumer',
  brokers: [KAFKA_BROKER || 'localhost:9092']
});

const producer = kafka.producer();

// OAuth Token Management
let currentToken = null;

const getAccessToken = async () => {
  const url = `https://${GENESYS_CLOUD_REGION || 'mypurecloud.com'}/oauth/token`;
  const body = new URLSearchParams({
    grant_type: 'client_credentials',
    client_id: GENESYS_CLOUD_CLIENT_ID,
    client_secret: GENESYS_CLOUD_CLIENT_SECRET,
    scope: 'analytics:conversation:read'
  });

  const response = await fetch(url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/x-www-form-urlencoded'
    },
    body: body
  });

  if (!response.ok) {
    const errorText = await response.text();
    throw new Error(`OAuth request failed with status ${response.status}: ${errorText}`);
  }

  const data = await response.json();
  return {
    accessToken: data.access_token,
    expiresIn: data.expires_in,
    expiresAt: Date.now() + (data.expires_in * 1000)
  };
};

// WebSocket Management
let ws = null;
let reconnectTimeout = null;

const handleAnalyticsEvent = async (event) => {
  if (event.type !== 'conversation') {
    return;
  }

  const kafkaMessage = {
    topic: KAFKA_TOPIC || 'genesys-analytics-events',
    messages: [
      {
        key: event.conversationId,
        value: JSON.stringify(event)
      }
    ]
  };

  try {
    await producer.send(kafkaMessage);
  } catch (error) {
    console.error('Failed to send message to Kafka:', error.message);
  }
};

const connectWebSocket = async () => {
  if (!currentToken || currentToken.expiresAt < Date.now() + 60000) {
    try {
      currentToken = await getAccessToken();
    } catch (error) {
      console.error('Failed to acquire token for WebSocket. Retrying in 5 seconds...');
      reconnectTimeout = setTimeout(connectWebSocket, 5000);
      return;
    }
  }

  const wsUrl = `wss://${GENESYS_CLOUD_REGION || 'mypurecloud.com'}/api/v2/analytics/events?access_token=${currentToken.accessToken}`;
  
  if (ws && ws.readyState === WebSocket.OPEN) {
    ws.close();
  }

  ws = new WebSocket(wsUrl);

  ws.on('open', () => {
    console.log('WebSocket connection established.');
    if (reconnectTimeout) {
      clearTimeout(reconnectTimeout);
      reconnectTimeout = null;
    }
  });

  ws.on('message', (data) => {
    try {
      const event = JSON.parse(data.toString());
      handleAnalyticsEvent(event);
    } catch (error) {
      console.error('Failed to parse WebSocket message:', error.message);
    }
  });

  ws.on('close', (code, reason) => {
    console.log(`WebSocket closed with code ${code}: ${reason.toString()}`);
    if (code !== 1000 && code !== 4001) {
      console.log('Reconnecting WebSocket in 5 seconds...');
      reconnectTimeout = setTimeout(connectWebSocket, 5000);
    } else if (code === 4001) {
      console.log('Token expired. Refreshing and reconnecting...');
      currentToken = null;
      reconnectTimeout = setTimeout(connectWebSocket, 1000);
    }
  });

  ws.on('error', (error) => {
    console.error('WebSocket error:', error.message);
  });
};

// Main Execution
const main = async () => {
  try {
    await producer.connect();
    console.log('Kafka producer connected.');
    await connectWebSocket();
    console.log('Service started.');
  } catch (error) {
    console.error('Failed to start service:', error.message);
    process.exit(1);
  }
};

// Graceful Shutdown
process.on('SIGTERM', async () => {
  console.log('Shutting down...');
  if (ws) ws.close();
  await producer.disconnect();
  process.exit(0);
});

main();

Common Errors & Debugging

Error: 401 Unauthorized on WebSocket Connection

This error occurs when the access token is invalid or expired. The WebSocket API returns a 4001 close code for expired tokens.

Fix: Ensure your OAuth token refresh logic is correct. The code above checks expiresAt and refreshes the token before it expires. If you see 401 errors, verify that the client_id and client_secret are correct and that the analytics:conversation:read scope is granted to the OAuth client.

Error: Kafka Producer Timeout

This error occurs when the Kafka broker is unreachable or the topic does not exist.

Fix: Verify that the Kafka broker is running and accessible from your Node.js application. Ensure that the topic genesys-analytics-events exists in your Kafka cluster. You can create the topic manually using the Kafka CLI:

kafka-topics.sh --create --topic genesys-analytics-events --bootstrap-server localhost:9092

Error: WebSocket Connection Refused

This error occurs when the WebSocket URL is incorrect or the region is not specified.

Fix: Ensure that the GENESYS_CLOUD_REGION environment variable is set correctly. For example, us-east-1.mypurecloud.com or mypurecloud.com for the default region. The WebSocket URL must start with wss://.

Official References