Streaming Analytics Notification Events to a Kafka Topic via a Node.js Consumer
What You Will Build
- A Node.js service that subscribes to Genesys Cloud CX Analytics Notification events via the WebSocket API.
- The service parses incoming JSON payloads and publishes them to a specific Apache Kafka topic.
- The implementation uses the purecloud-platform-client-v2 SDK for authentication and the kafkajs library for message production.
Prerequisites
- OAuth Client Type: Public or Confidential client with the analytics:notifications:subscribe scope.
- SDK Version: @genesyscloud/purecloud-platform-client-v2 (v2.50.0+).
- Language/Runtime: Node.js 18+ (LTS).
- External Dependencies:
  - @genesyscloud/purecloud-platform-client-v2
  - kafkajs
  - ws (the WebSocket client imported in Step 2)
  - dotenv (for environment variable management)
- Kafka Cluster: A running Kafka instance accessible from your Node.js environment.
Authentication Setup
Genesys Cloud uses OAuth 2.0 for all API access. For WebSocket subscriptions, you must obtain a bearer token with the correct scopes. The token is passed during the WebSocket handshake.
Install the required packages:
npm install @genesyscloud/purecloud-platform-client-v2 kafkajs ws dotenv
Create a .env file to store sensitive credentials. Never commit this file to version control.
GENESYS_CLIENT_ID=your_client_id
GENESYS_CLIENT_SECRET=your_client_secret
GENESYS_ENVIRONMENT=api.mypurecloud.com
KAFKA_BROKER_LIST=localhost:9092
KAFKA_TOPIC=genesys-analytics-events
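A missing or blank variable in .env tends to surface later as a confusing connection error, so it can help to check the configuration once at startup. The following is a minimal sketch under that assumption; the helper names findMissingEnvVars and assertEnvConfigured are hypothetical and not part of any library used in this tutorial.

```javascript
// Hypothetical startup check; the variable names come from the .env file above.
const REQUIRED_VARS = [
  'GENESYS_CLIENT_ID',
  'GENESYS_CLIENT_SECRET',
  'GENESYS_ENVIRONMENT',
  'KAFKA_BROKER_LIST',
  'KAFKA_TOPIC',
];

// Returns the names of required variables that are undefined or blank.
function findMissingEnvVars(env, required = REQUIRED_VARS) {
  return required.filter(
    (name) => env[name] === undefined || String(env[name]).trim() === ''
  );
}

// Throws a single error that lists every missing variable.
function assertEnvConfigured(env) {
  const missing = findMissingEnvVars(env);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }
}
```

One way to use it is to call assertEnvConfigured(process.env) immediately after dotenv.config(), so the process exits early with a readable message.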
The following code initializes the Genesys Cloud platform client and retrieves an OAuth token. This token is required for the WebSocket connection.
import { Configuration, PlatformClient } from '@genesyscloud/purecloud-platform-client-v2';
import dotenv from 'dotenv';
dotenv.config();
// Initialize Genesys Cloud Configuration
const configuration = Configuration.defaultConfiguration();
configuration.clientId = process.env.GENESYS_CLIENT_ID;
configuration.clientSecret = process.env.GENESYS_CLIENT_SECRET;
configuration.environment = process.env.GENESYS_ENVIRONMENT;
// Initialize Platform Client
const platformClient = new PlatformClient();
/**
* Retrieves an OAuth2 bearer token from Genesys Cloud.
* @returns {Promise<string>} The bearer token string.
*/
async function getAccessToken() {
try {
// Request token with the specific scope for analytics notifications
const tokenResponse = await platformClient.authClient.requestToken(
'client_credentials',
{ scope: 'analytics:notifications:subscribe' }
);
return tokenResponse.access_token;
} catch (error) {
console.error('Failed to acquire OAuth token:', error.message);
throw error;
}
}
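OAuth 2.0 token responses normally include an expires_in field (lifetime in seconds) next to access_token. Assuming the token response in this tutorial exposes that field, the refresh time can be derived from it rather than hard-coded. The sketch below is illustrative only: computeRefreshDelayMs is a hypothetical helper, and the 10% safety margin and 60-second floor are arbitrary choices.

```javascript
// Hypothetical helper: decides how long to wait before requesting a new token.
// expiresInSeconds is assumed to come from the token response's expires_in field.
function computeRefreshDelayMs(expiresInSeconds, marginRatio = 0.1, minDelaySeconds = 60) {
  if (!Number.isFinite(expiresInSeconds) || expiresInSeconds <= 0) {
    throw new RangeError('expiresInSeconds must be a positive number');
  }
  // Refresh early by a fraction of the lifetime, but not sooner than the floor.
  const margin = expiresInSeconds * marginRatio;
  const delaySeconds = Math.max(expiresInSeconds - margin, minDelaySeconds);
  // Never schedule the refresh after the token has already expired.
  return Math.round(Math.min(delaySeconds, expiresInSeconds) * 1000);
}
```

With a 3600-second token this yields a refresh after 54 minutes; for very short-lived tokens the delay is capped at the token lifetime itself.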
Implementation
Step 1: Configure Kafka Producer
Before connecting to Genesys Cloud, initialize the Kafka producer. This ensures the destination is ready to receive messages.
import { Kafka } from 'kafkajs';
const kafka = new Kafka({
clientId: 'genesys-analytics-consumer',
brokers: process.env.KAFKA_BROKER_LIST.split(',')
});
const producer = kafka.producer();
/**
* Connects to the Kafka cluster.
*/
async function connectKafkaProducer() {
try {
await producer.connect();
console.log('Connected to Kafka broker.');
} catch (error) {
console.error('Failed to connect to Kafka:', error.message);
throw error;
}
}
/**
* Publishes a message to the specified Kafka topic.
* @param {string} topic - The Kafka topic name.
* @param {string} key - The message key (e.g., organization ID or event ID).
* @param {string} value - The JSON stringified message payload.
*/
async function publishToKafka(topic, key, value) {
try {
await producer.send({
topic: topic,
messages: [
{
key: key,
value: value
}
]
});
} catch (error) {
console.error('Failed to publish message to Kafka:', error.message);
// In production, implement retry logic or dead-letter queue handling here.
// Rethrow so the caller does not log a successful publish for a failed send.
throw error;
}
}
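The comment in publishToKafka mentions retry logic and dead-letter handling. One possible shape for that is sketched below. It is an assumption-laden illustration, not the tutorial's method: sendWithRetry, sendFn, and deadLetterFn are hypothetical names, and the send function is injected so the sketch does not depend on any particular Kafka client call.

```javascript
// Hypothetical retry wrapper with exponential backoff and an optional
// dead-letter fallback. sendFn and deadLetterFn are supplied by the caller.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function sendWithRetry(sendFn, message, options = {}) {
  const { retries = 3, baseDelayMs = 200, deadLetterFn = null } = options;
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt += 1) {
    try {
      return await sendFn(message);
    } catch (error) {
      lastError = error;
      if (attempt < retries) {
        // Wait baseDelayMs, then 2x, 4x, ... before the next attempt.
        await sleep(baseDelayMs * 2 ** attempt);
      }
    }
  }
  if (deadLetterFn) {
    // All attempts failed: hand the message to the dead-letter handler.
    await deadLetterFn(message, lastError);
    return undefined;
  }
  throw lastError;
}
```

In this tutorial's terms, sendFn could wrap the existing call, for example (m) => producer.send({ topic, messages: [m] }), while deadLetterFn might publish to a separate topic or write to a local file.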
Step 2: Establish WebSocket Subscription
The Genesys Cloud Analytics Notification API uses WebSocket for real-time streaming. You must construct the WebSocket URL including the environment and the bearer token.
The endpoint path is /api/v2/analytics/notifications.
import WebSocket from 'ws';
/**
* Subscribes to Genesys Cloud Analytics Notifications via WebSocket.
* @param {string} token - The OAuth2 bearer token.
*/
async function subscribeToAnalyticsNotifications(token) {
const environment = process.env.GENESYS_ENVIRONMENT;
// Construct the WebSocket URL with the token as a query parameter
const wsUrl = `wss://${environment}/api/v2/analytics/notifications?token=${token}`;
const ws = new WebSocket(wsUrl);
ws.on('open', () => {
console.log('WebSocket connection established with Genesys Cloud.');
});
ws.on('message', async (data) => {
try {
const message = JSON.parse(data.toString());
await processAnalyticsEvent(message);
} catch (error) {
console.error('Error processing WebSocket message:', error.message);
}
});
ws.on('error', (error) => {
console.error('WebSocket error:', error.message);
});
ws.on('close', (code, reason) => {
console.log(`WebSocket closed with code ${code}: ${reason}`);
// Implement reconnection logic here if necessary
});
}
/**
* Processes the incoming analytics event and publishes it to Kafka.
* @param {Object} event - The parsed JSON event from Genesys Cloud.
*/
async function processAnalyticsEvent(event) {
if (!event || !event.id) {
console.warn('Received malformed event, skipping.');
return;
}
const topic = process.env.KAFKA_TOPIC;
const key = event.id; // Use event ID as the Kafka message key for partitioning
const value = JSON.stringify(event);
await publishToKafka(topic, key, value);
console.log(`Published event ${key} to Kafka topic ${topic}.`);
}
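Because the token travels in the query string, two small precautions may be worth considering: URL-encoding the token when building the address, and masking it before the URL is written to logs. The sketch below uses only the built-in URL class; buildNotificationsUrl and redactToken are hypothetical helper names, and the path and token parameter simply mirror the ones used above.

```javascript
// Hypothetical helpers around the WebSocket URL used in this tutorial.
function buildNotificationsUrl(environment, token) {
  if (!environment || !token) {
    throw new Error('environment and token are required');
  }
  const url = new URL(`wss://${environment}/api/v2/analytics/notifications`);
  // searchParams.set percent-encodes characters that are unsafe in a query string.
  url.searchParams.set('token', token);
  return url.toString();
}

// Returns a copy of the URL that is safe to log.
function redactToken(wsUrl) {
  const url = new URL(wsUrl);
  if (url.searchParams.has('token')) {
    url.searchParams.set('token', 'REDACTED');
  }
  return url.toString();
}
```

For example, console.log(redactToken(wsUrl)) records which endpoint was contacted without exposing the bearer token.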
Step 3: Handle Token Refresh and Reconnection
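OAuth tokens expire. This tutorial assumes a lifetime of about one hour, but the actual duration depends on how the OAuth client is configured. A robust consumer must re-authenticate before the token expires, and it must also reconnect when the WebSocket connection drops, for example because of network issues.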
The following logic wraps the subscription process with a refresh mechanism.
/**
* Main execution function that manages the lifecycle of the consumer.
*/
async function main() {
try {
await connectKafkaProducer();
let token = await getAccessToken();
let subscriptionActive = true;
// Start the initial subscription
subscribeToAnalyticsNotifications(token).catch(err => {
console.error('Subscription failed:', err.message);
subscriptionActive = false;
});
// Refresh the token before it expires. Assuming a 3600-second token
// lifetime, refresh every 50 minutes (3,000,000 ms).
const TOKEN_REFRESH_INTERVAL_MS = 50 * 60 * 1000;
const tokenRefreshInterval = setInterval(async () => {
try {
token = await getAccessToken();
console.log('OAuth token refreshed.');
// The open WebSocket was authenticated with the old token. This tutorial
// does not keep a reference to the socket, so it cannot close and reopen
// it here. A production app would track the WebSocket instance and
// reconnect with the refreshed token at this point.
if (subscriptionActive) {
// Close the previous WebSocket and start a new subscription here.
}
} catch (error) {
console.error('Failed to refresh token:', error.message);
}
}, TOKEN_REFRESH_INTERVAL_MS);
// Keep the process alive
process.on('SIGINT', () => {
console.log('Shutting down...');
clearInterval(tokenRefreshInterval);
producer.disconnect().then(() => process.exit(0));
});
} catch (error) {
console.error('Fatal error in main execution:', error.message);
process.exit(1);
}
}
// Start the application
main().catch(console.error);
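For dropped connections, a common approach is to reconnect after a delay that grows with each failed attempt, with some randomness so that many clients do not retry in lockstep. The following sketch shows one such delay calculation. The function name reconnectDelayMs and its defaults (1-second base, 30-second cap) are illustrative assumptions, not values prescribed by Genesys Cloud.

```javascript
// Hypothetical backoff helper: exponential growth with a cap and "equal jitter"
// (half of the delay is fixed, the other half is random).
function reconnectDelayMs(attempt, options = {}) {
  const { baseMs = 1000, maxMs = 30000, random = Math.random } = options;
  const exponential = Math.min(maxMs, baseMs * 2 ** attempt);
  const half = exponential / 2;
  return Math.floor(half + random() * half);
}
```

A close handler could then schedule the next attempt with setTimeout(reconnect, reconnectDelayMs(attempt)), incrementing attempt on each failure and resetting it to 0 once the 'open' event fires. The reconnect function here is a placeholder for whatever re-subscription routine the application defines.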
Complete Working Example
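The following script combines all components into a single runnable file. Save this as index.js. Because the script uses ES module import syntax, either set "type": "module" in package.json or name the file index.mjs.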
import { Configuration, PlatformClient } from '@genesyscloud/purecloud-platform-client-v2';
import { Kafka } from 'kafkajs';
import WebSocket from 'ws';
import dotenv from 'dotenv';
dotenv.config();
// --- Configuration ---
const configuration = Configuration.defaultConfiguration();
configuration.clientId = process.env.GENESYS_CLIENT_ID;
configuration.clientSecret = process.env.GENESYS_CLIENT_SECRET;
configuration.environment = process.env.GENESYS_ENVIRONMENT;
const platformClient = new PlatformClient();
const kafka = new Kafka({
clientId: 'genesys-analytics-consumer',
brokers: process.env.KAFKA_BROKER_LIST.split(',')
});
const producer = kafka.producer();
// --- Kafka Functions ---
async function connectKafkaProducer() {
try {
await producer.connect();
console.log('Connected to Kafka broker.');
} catch (error) {
console.error('Failed to connect to Kafka:', error.message);
throw error;
}
}
async function publishToKafka(topic, key, value) {
try {
await producer.send({
topic: topic,
messages: [
{
key: key,
value: value
}
]
});
} catch (error) {
console.error('Failed to publish message to Kafka:', error.message);
// Rethrow so the caller does not log a successful publish for a failed send.
throw error;
}
}
// --- Genesys Cloud Functions ---
async function getAccessToken() {
try {
const tokenResponse = await platformClient.authClient.requestToken(
'client_credentials',
{ scope: 'analytics:notifications:subscribe' }
);
return tokenResponse.access_token;
} catch (error) {
console.error('Failed to acquire OAuth token:', error.message);
throw error;
}
}
async function processAnalyticsEvent(event) {
if (!event || !event.id) {
console.warn('Received malformed event, skipping.');
return;
}
const topic = process.env.KAFKA_TOPIC;
const key = event.id;
const value = JSON.stringify(event);
await publishToKafka(topic, key, value);
console.log(`Published event ${key} to Kafka topic ${topic}.`);
}
async function subscribeToAnalyticsNotifications(token) {
const environment = process.env.GENESYS_ENVIRONMENT;
const wsUrl = `wss://${environment}/api/v2/analytics/notifications?token=${token}`;
const ws = new WebSocket(wsUrl);
ws.on('open', () => {
console.log('WebSocket connection established with Genesys Cloud.');
});
ws.on('message', async (data) => {
try {
const message = JSON.parse(data.toString());
await processAnalyticsEvent(message);
} catch (error) {
console.error('Error processing WebSocket message:', error.message);
}
});
ws.on('error', (error) => {
console.error('WebSocket error:', error.message);
});
ws.on('close', (code, reason) => {
console.log(`WebSocket closed with code ${code}: ${reason}`);
});
}
// --- Main Execution ---
async function main() {
try {
await connectKafkaProducer();
let token = await getAccessToken();
// Start subscription
subscribeToAnalyticsNotifications(token);
// Token refresh interval (every 50 minutes)
const tokenRefreshInterval = setInterval(async () => {
try {
token = await getAccessToken();
console.log('OAuth token refreshed.');
// In a production app, you would close the old WS and open a new one here
// For this example, we assume the WS will handle reconnection or the app restarts
} catch (error) {
console.error('Failed to refresh token:', error.message);
}
}, 3000000); // 50 minutes
process.on('SIGINT', () => {
console.log('Shutting down...');
clearInterval(tokenRefreshInterval);
producer.disconnect().then(() => process.exit(0));
});
} catch (error) {
console.error('Fatal error in main execution:', error.message);
process.exit(1);
}
}
main().catch(console.error);
Common Errors & Debugging
Error: 401 Unauthorized on WebSocket Handshake
- What causes it: The OAuth token is expired, invalid, or missing the analytics:notifications:subscribe scope.
- How to fix it: Verify the client credentials in .env. Ensure the scope is explicitly requested in requestToken. Check that the token has not expired (tokens last ~1 hour).
- Code showing the fix:
// Ensure scope is correct
const tokenResponse = await platformClient.authClient.requestToken(
  'client_credentials',
  { scope: 'analytics:notifications:subscribe' } // Critical scope
);
Error: 403 Forbidden
- What causes it: The OAuth client does not have the necessary permissions assigned in the Genesys Cloud admin console, or the user associated with the client lacks access to analytics data.
- How to fix it: Log in to the Genesys Cloud admin console. Navigate to Admin > Security > OAuth Clients. Edit your client and ensure the Analytics permissions are granted. Specifically, check “Read Analytics Data” or similar permissions depending on your org configuration.
- Code showing the fix: No code change required. This is an administrative configuration fix.
Error: Kafka Connection Refused
- What causes it: The KAFKA_BROKER_LIST is incorrect, or the Kafka broker is not running.
- How to fix it: Verify the broker address and port. Ensure the Kafka service is running and accessible from the Node.js host.
- Code showing the fix:
// Verify broker list format
const brokers = process.env.KAFKA_BROKER_LIST.split(',');
// Example: ['localhost:9092']
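A plain split(',') accepts stray spaces, trailing commas, and entries without a port, all of which can lead to connection failures that are hard to trace. The sketch below shows a stricter parse. parseBrokerList is a hypothetical helper, and the host:port pattern is a simplification that does not cover bracketed IPv6 addresses.

```javascript
// Hypothetical parser for a comma-separated "host:port" broker list.
function parseBrokerList(raw) {
  if (typeof raw !== 'string' || raw.trim() === '') {
    throw new Error('KAFKA_BROKER_LIST is empty or not set');
  }
  // Trim whitespace and drop empty entries such as those from a trailing comma.
  const brokers = raw
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
  if (brokers.length === 0) {
    throw new Error('KAFKA_BROKER_LIST contains no broker addresses');
  }
  for (const broker of brokers) {
    const match = /^([^\s:]+):(\d{1,5})$/.exec(broker);
    const port = match ? Number(match[2]) : NaN;
    if (!match || port < 1 || port > 65535) {
      throw new Error(`Invalid broker address: "${broker}" (expected host:port)`);
    }
  }
  return brokers;
}
```

The result can be passed as the brokers option when constructing the Kafka client, in place of the bare split(',') call.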
Error: WebSocket Reconnect Loop
- What causes it: The token refresh logic is not properly closing the old WebSocket connection before establishing a new one, or the network is unstable.
- How to fix it: Implement a proper lifecycle manager for the WebSocket. Close the existing ws instance before creating a new one with the refreshed token.
- Code showing the fix:
let currentWs = null;
async function reconnectWithNewToken(token) {
  if (currentWs) {
    currentWs.close();
  }
  // Same URL format as in Step 2
  const wsUrl = `wss://${process.env.GENESYS_ENVIRONMENT}/api/v2/analytics/notifications?token=${token}`;
  // ... establish new ws ...
  currentWs = new WebSocket(wsUrl);
}
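The same idea can be wrapped in a small manager so that only one socket is ever active. The sketch below is one possible arrangement, not a definitive implementation: createSocketManager is a hypothetical name, and the socket factory is injected so the logic can be exercised without a real connection. In the tutorial's setup the factory would create a ws WebSocket and attach the message, error, and close handlers.

```javascript
// Hypothetical lifecycle manager: guarantees the previous socket is closed
// before a new one is created. createSocket(token) must return an object
// that has a close() method.
function createSocketManager(createSocket) {
  let current = null;
  return {
    // Closes any existing socket, then opens a new one with the given token.
    connect(token) {
      if (current) {
        current.close();
      }
      current = createSocket(token);
      return current;
    },
    // Closes the active socket, if any, for a clean shutdown.
    close() {
      if (current) {
        current.close();
        current = null;
      }
    },
    // The socket currently tracked by the manager, or null.
    get active() {
      return current;
    },
  };
}
```

The token refresh timer can then call manager.connect(newToken), and the SIGINT handler can call manager.close() before disconnecting the Kafka producer.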