Streaming Analytics Notification Events to a Kafka Topic via Node.js
What You Will Build
- You will build a Node.js application that subscribes to Genesys Cloud CX Analytics Notification events.
- The application uses the Genesys Cloud CX REST API and WebSocket streaming endpoints to receive real-time conversation analytics.
- The code demonstrates how to authenticate, establish a persistent WebSocket connection, parse incoming JSON payloads, and publish those events to a local Apache Kafka topic.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Grant).
- Required Scopes: analytics:conversation:view, analytics:conversation:read, websockets:subscribe.
- SDK Version: Genesys Cloud CX Node.js SDK v5.x or later.
- Language/Runtime: Node.js 18+ (for native fetch and modern async/await support).
- External Dependencies:
  - purecloud-platform-client-v2 (Genesys SDK)
  - kafka-node or kafkajs (Kafka client library)
  - axios (HTTP client used for the token request)
  - dotenv (for environment variable management)
  - ws (WebSocket client, listed explicitly so the application controls the connection)
Install dependencies:
npm install purecloud-platform-client-v2 kafka-node axios dotenv ws
Authentication Setup
Genesys Cloud CX uses the OAuth 2.0 Client Credentials grant for server-to-server communication. You must obtain an access token before subscribing to any analytics stream. Tokens expire: the lifetime is configured on the OAuth client and returned in the expires_in field of the token response, so a long-running application must request a new token before the current one lapses.
The following snippet retrieves a token by posting the client credentials to the Genesys Cloud token endpoint with the axios library.
const axios = require('axios');
const dotenv = require('dotenv');

dotenv.config();

const {
  GENESYS_CLIENT_ID,
  GENESYS_CLIENT_SECRET,
  GENESYS_ENVIRONMENT, // region domain, e.g., mypurecloud.com
  KAFKA_BROKER,
  KAFKA_TOPIC
} = process.env;

/**
 * Retrieves an OAuth2 access token from Genesys Cloud.
 * @returns {Promise<string>} The access token.
 */
async function getAccessToken() {
  // The token endpoint is served from the login host of the region.
  const url = `https://login.${GENESYS_ENVIRONMENT}/oauth/token`;
  const params = new URLSearchParams();
  params.append('grant_type', 'client_credentials');

  try {
    const response = await axios.post(url, params, {
      // Client ID and secret are sent as HTTP Basic credentials.
      auth: { username: GENESYS_CLIENT_ID, password: GENESYS_CLIENT_SECRET },
      headers: {
        'Content-Type': 'application/x-www-form-urlencoded'
      }
    });
    // axios rejects on non-2xx responses, so reaching this line means success.
    return response.data.access_token;
  } catch (error) {
    if (error.response) {
      console.error('Token Error:', error.response.data);
    } else {
      console.error('Token Error:', error.message);
    }
    throw error;
  }
}
Note on Scopes: With the Client Credentials grant, what a token may access is determined by the roles and permissions assigned to the OAuth client, not by anything requested at token time. Ensure your OAuth client in Genesys Cloud Admin has analytics:conversation:view explicitly granted. Without it, the WebSocket subscription will fail with a 403 Forbidden error.
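Because tokens expire, a long-running process benefits from caching the token and renewing it shortly before it lapses. The following is a minimal sketch of that idea, not part of the tutorial's code: createTokenCache, fetchToken, and skewMs are hypothetical names, and fetchToken is assumed to resolve to the full token response ({ access_token, expires_in }) rather than only the token string.

```javascript
// Hypothetical helper: caches an OAuth token and refreshes it before expiry.
// `fetchToken` is assumed to resolve to { access_token, expires_in } (seconds).
function createTokenCache(fetchToken, { skewMs = 60_000, now = Date.now } = {}) {
  let token = null;
  let expiresAt = 0;
  let inflight = null; // shared promise so concurrent callers trigger one fetch

  return async function getToken() {
    // Reuse the cached token while it is still valid, minus a safety margin.
    if (token && now() < expiresAt - skewMs) {
      return token;
    }
    if (!inflight) {
      inflight = fetchToken()
        .then((res) => {
          token = res.access_token;
          expiresAt = now() + res.expires_in * 1000;
          return token;
        })
        .finally(() => {
          inflight = null; // allow a new fetch after success or failure
        });
    }
    return inflight;
  };
}
```

A caller would build the cache once at startup and call the returned getToken() wherever a token is needed, instead of requesting a new token each time.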
Implementation
Step 1: Initialize Kafka Producer
Before connecting to Genesys, initialize the Kafka producer so that analytics events can be dispatched to the message broker as soon as they arrive. We use kafka-node for this example because its callback-based producer API keeps the code short.
const kafka = require('kafka-node');

let producer;

/**
 * Initializes the Kafka producer.
 */
function initKafka() {
  const client = new kafka.KafkaClient({ kafkaHost: KAFKA_BROKER });
  producer = new kafka.Producer(client);

  producer.on('ready', () => {
    console.log('Kafka producer is ready.');
  });

  producer.on('error', (error) => {
    console.error('Kafka producer error:', error);
  });
}

/**
 * Publishes a message to the specified Kafka topic.
 * @param {string} topic - The Kafka topic name.
 * @param {object} payload - The analytics event payload.
 */
function publishToKafka(topic, payload) {
  const messages = [JSON.stringify(payload)];
  const kafkaPayload = [
    {
      topic: topic,
      messages: messages,
      partition: 0 // Every message goes to partition 0; choose a partition per key to spread load
    }
  ];

  producer.send(kafkaPayload, (error, data) => {
    if (error) {
      console.error('Failed to send to Kafka:', error);
    }
    // Optional: log `data` here to confirm successful sends.
  });
}
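With partition fixed at 0, a topic that has several partitions receives all traffic on one of them. One way to spread load while keeping the events of a single conversation in order is to derive the partition from the conversation ID. The sketch below is illustrative only: choosePartition, buildKafkaPayload, and numPartitions are hypothetical names, and the hash is a simple 32-bit FNV-1a, not the murmur2 partitioner used by Kafka's Java client.

```javascript
// Hypothetical helper: map a conversation ID to a stable partition number.
// Uses 32-bit FNV-1a, which is NOT the murmur2 hash used by Kafka's Java client.
function choosePartition(conversationId, numPartitions) {
  if (!Number.isInteger(numPartitions) || numPartitions < 1) {
    throw new RangeError('numPartitions must be a positive integer');
  }
  let hash = 0x811c9dc5; // FNV offset basis
  for (const byte of Buffer.from(String(conversationId), 'utf8')) {
    hash ^= byte;
    hash = Math.imul(hash, 0x01000193) >>> 0; // multiply by FNV prime, keep unsigned
  }
  return hash % numPartitions;
}

// Builds the payload array shape that publishToKafka passes to producer.send().
function buildKafkaPayload(topic, event, numPartitions) {
  return [
    {
      topic,
      messages: [JSON.stringify(event)],
      partition: choosePartition(event.conversation, numPartitions),
    },
  ];
}
```

The same conversation ID always yields the same partition, so consumers reading one partition see that conversation's events in the order they were published.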
Step 2: Establish WebSocket Subscription
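Genesys Cloud CX delivers real-time events over WebSockets. Unlike REST endpoints, a WebSocket stays open, so events are pushed to the client as they occur. In the Genesys Cloud Notifications API, the WebSocket address is the connectUri returned when you create a notification channel (POST /api/v2/notifications/channels), and topics are added to that channel over REST. The snippet below instead connects to a fixed stream URL and subscribes by sending a payload over the socket, so confirm the endpoint path, the stream name (analytics:conversation:details), and the filter syntax against the current API documentation before relying on them.
The subscription payload carries a filters object that defines which conversations to monitor. This tutorial filters for active conversations.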
const WebSocket = require('ws');

let ws;
let reconnectTimer;
const RECONNECT_DELAY = 5000; // 5 seconds

/**
 * Subscribes to the Analytics Conversation Details stream.
 * @param {string} accessToken - Valid OAuth2 access token.
 */
async function subscribeToAnalytics(accessToken) {
  // Endpoint path as given in this tutorial; /api/v2 paths are served from the api host.
  const wsUrl = `wss://api.${GENESYS_ENVIRONMENT}/api/v2/analytics/conversations/details/stream`;

  // Close existing connection if open
  if (ws && ws.readyState === WebSocket.OPEN) {
    // A deliberate close must not trigger the reconnect logic below.
    ws.removeAllListeners('close');
    ws.close();
  }

  ws = new WebSocket(wsUrl, {
    headers: {
      'Authorization': `Bearer ${accessToken}`
    }
  });

  ws.on('open', () => {
    console.log('WebSocket connected to Genesys Analytics.');

    // Define the subscription payload
    const subscriptionPayload = {
      streams: ['analytics:conversation:details'],
      filters: {
        type: 'conversation',
        query: {
          // Filter for active conversations only
          where: 'status = "active"'
        }
      }
    };

    // Send the subscription request
    ws.send(JSON.stringify(subscriptionPayload));
    console.log('Subscription payload sent.');
  });

  ws.on('message', (data) => {
    try {
      const message = JSON.parse(data.toString());
      processAnalyticsEvent(message);
    } catch (error) {
      console.error('Error parsing WebSocket message:', error);
    }
  });

  ws.on('close', (code, reason) => {
    console.log(`WebSocket closed: ${code} - ${reason}`);
    // Attempt to reconnect after delay
    scheduleReconnect();
  });

  ws.on('error', (error) => {
    console.error('WebSocket error:', error);
    // 'close' usually follows 'error'; the timer guard prevents a double schedule.
    scheduleReconnect();
  });
}

function scheduleReconnect() {
  if (reconnectTimer) return; // a reconnect is already pending

  reconnectTimer = setTimeout(async () => {
    reconnectTimer = null;
    console.log('Attempting to reconnect...');
    try {
      const newToken = await getAccessToken();
      await subscribeToAnalytics(newToken);
    } catch (error) {
      console.error('Reconnection failed:', error);
      scheduleReconnect(); // Retry again
    }
  }, RECONNECT_DELAY);
}
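The fixed RECONNECT_DELAY retries at a constant rate, which can hammer the service during a longer outage. A common alternative is exponential backoff with jitter. The following sketch shows one way to compute such delays; backoffDelay, createReconnectPolicy, baseMs, and maxMs are hypothetical names and the default values are arbitrary assumptions.

```javascript
// Hypothetical helper: exponential backoff with "full jitter".
// attempt 0 -> up to baseMs, attempt 1 -> up to 2 * baseMs, ... capped at maxMs.
function backoffDelay(attempt, { baseMs = 1000, maxMs = 60_000, random = Math.random } = {}) {
  const ceiling = Math.min(maxMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}

// Tracks consecutive failures so the delay grows until a connection succeeds.
function createReconnectPolicy(options) {
  let attempt = 0;
  return {
    nextDelay() {
      return backoffDelay(attempt++, options);
    },
    reset() {
      attempt = 0; // intended to be called from the WebSocket 'open' handler
    },
  };
}
```

In scheduleReconnect, the constant passed to setTimeout could be replaced by policy.nextDelay(), with policy.reset() called once the socket reports 'open'.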
Step 3: Process and Forward Events
The incoming WebSocket messages contain detailed conversation analytics and can include start, update, and end events. The handler below checks each parsed message for a conversation ID and forwards valid events to Kafka.
/**
 * Processes incoming analytics events and forwards them to Kafka.
 * @param {object} event - The parsed WebSocket message.
 */
function processAnalyticsEvent(event) {
  // Genesys Analytics events typically contain:
  // - eventType: e.g., 'start', 'update', 'end'
  // - conversation: The conversation ID
  // - details: Specific metrics (duration, hold time, etc.)
  if (!event || !event.conversation) {
    console.warn('Received invalid event structure:', event);
    return;
  }

  console.log(`Received event type: ${event.eventType} for conversation: ${event.conversation}`);

  // Publish to Kafka
  publishToKafka(KAFKA_TOPIC, event);
}
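Downstream consumers often need to know when an event was received and which conversation it belongs to without parsing the whole payload. As an illustration, the sketch below validates an event and wraps it in a small envelope. toEnvelope and the envelope fields (receivedAt, payload) are hypothetical; the conversation and eventType fields follow the structure assumed in the handler above.

```javascript
// Hypothetical helper: validate an event and wrap it in an envelope for Kafka.
// The field names `conversation` and `eventType` follow the handler above.
function toEnvelope(event, receivedAt = new Date()) {
  if (event === null || typeof event !== 'object' || Array.isArray(event)) {
    return { ok: false, reason: 'event is not an object' };
  }
  if (typeof event.conversation !== 'string' || event.conversation === '') {
    return { ok: false, reason: 'missing conversation ID' };
  }
  return {
    ok: true,
    key: event.conversation, // usable as a Kafka message key
    value: JSON.stringify({
      receivedAt: receivedAt.toISOString(),
      eventType: event.eventType ?? 'unknown',
      payload: event,
    }),
  };
}
```

Returning a result object instead of throwing lets the caller decide whether to log, count, or reroute rejected events.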
Complete Working Example
The following script combines all components into a single runnable module. It requests a new token on each reconnect, re-establishes the WebSocket connection after a failure, and publishes events to Kafka.
const axios = require('axios');
const dotenv = require('dotenv');
const WebSocket = require('ws');
const kafka = require('kafka-node');

dotenv.config();

const {
  GENESYS_CLIENT_ID,
  GENESYS_CLIENT_SECRET,
  GENESYS_ENVIRONMENT,
  KAFKA_BROKER = 'localhost:9092',
  KAFKA_TOPIC = 'genesys-analytics'
} = process.env;

let producer;
let ws;
let reconnectTimer;
const RECONNECT_DELAY = 5000;

// --- Kafka Initialization ---
function initKafka() {
  const client = new kafka.KafkaClient({ kafkaHost: KAFKA_BROKER });
  producer = new kafka.Producer(client);

  producer.on('ready', () => {
    console.log('Kafka producer is ready.');
  });

  producer.on('error', (error) => {
    console.error('Kafka producer error:', error);
  });
}

function publishToKafka(topic, payload) {
  if (!producer) {
    console.error('Kafka producer not initialized.');
    return;
  }

  const messages = [JSON.stringify(payload)];
  const kafkaPayload = [
    {
      topic: topic,
      messages: messages,
      partition: 0
    }
  ];

  producer.send(kafkaPayload, (error, data) => {
    if (error) {
      console.error('Failed to send to Kafka:', error);
    }
  });
}

// --- Genesys Authentication ---
async function getAccessToken() {
  // GENESYS_ENVIRONMENT is the region domain (e.g., mypurecloud.com);
  // the token endpoint is served from its login host.
  const url = `https://login.${GENESYS_ENVIRONMENT}/oauth/token`;
  const params = new URLSearchParams();
  params.append('grant_type', 'client_credentials');

  try {
    const response = await axios.post(url, params, {
      // Client ID and secret are sent as HTTP Basic credentials.
      auth: { username: GENESYS_CLIENT_ID, password: GENESYS_CLIENT_SECRET },
      headers: {
        'Content-Type': 'application/x-www-form-urlencoded'
      }
    });
    // axios rejects on non-2xx responses, so reaching this line means success.
    return response.data.access_token;
  } catch (error) {
    console.error('Token Error:', error.response ? error.response.data : error.message);
    throw error;
  }
}

// --- WebSocket Subscription ---
async function subscribeToAnalytics(accessToken) {
  // Endpoint path as given in this tutorial; /api/v2 paths are served from the api host.
  const wsUrl = `wss://api.${GENESYS_ENVIRONMENT}/api/v2/analytics/conversations/details/stream`;

  if (ws && ws.readyState === WebSocket.OPEN) {
    // A deliberate close must not trigger the reconnect logic below.
    ws.removeAllListeners('close');
    ws.close();
  }

  ws = new WebSocket(wsUrl, {
    headers: {
      'Authorization': `Bearer ${accessToken}`
    }
  });

  ws.on('open', () => {
    console.log('WebSocket connected to Genesys Analytics.');
    const subscriptionPayload = {
      streams: ['analytics:conversation:details'],
      filters: {
        type: 'conversation',
        query: {
          where: 'status = "active"'
        }
      }
    };
    ws.send(JSON.stringify(subscriptionPayload));
    console.log('Subscription payload sent.');
  });

  ws.on('message', (data) => {
    try {
      const message = JSON.parse(data.toString());
      processAnalyticsEvent(message);
    } catch (error) {
      console.error('Error parsing WebSocket message:', error);
    }
  });

  ws.on('close', (code, reason) => {
    console.log(`WebSocket closed: ${code} - ${reason}`);
    scheduleReconnect();
  });

  ws.on('error', (error) => {
    console.error('WebSocket error:', error);
    // 'close' usually follows 'error'; the timer guard prevents a double schedule.
    scheduleReconnect();
  });
}

function scheduleReconnect() {
  if (reconnectTimer) return; // a reconnect is already pending

  reconnectTimer = setTimeout(async () => {
    reconnectTimer = null;
    console.log('Attempting to reconnect...');
    try {
      const newToken = await getAccessToken();
      await subscribeToAnalytics(newToken);
    } catch (error) {
      console.error('Reconnection failed:', error);
      scheduleReconnect();
    }
  }, RECONNECT_DELAY);
}

// --- Event Processing ---
function processAnalyticsEvent(event) {
  if (!event || !event.conversation) {
    console.warn('Received invalid event structure:', event);
    return;
  }

  console.log(`Received event type: ${event.eventType} for conversation: ${event.conversation}`);
  publishToKafka(KAFKA_TOPIC, event);
}

// --- Main Execution ---
async function main() {
  console.log('Starting Genesys Analytics to Kafka Streamer...');
  initKafka();

  try {
    const token = await getAccessToken();
    await subscribeToAnalytics(token);
  } catch (error) {
    console.error('Initial startup failed:', error);
    process.exit(1);
  }
}

main();
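The script above runs until the process is killed, which can leave the WebSocket and the Kafka connection open. A possible way to stop cleanly is sketched below. createShutdown and clearReconnect are hypothetical names; the sketch assumes ws is an EventEmitter with close(), as in the ws package, and that producer exposes close(callback), as kafka-node's Producer does.

```javascript
// Hypothetical helper: stop reconnecting, then close the socket and producer once.
// Assumes `ws` is an EventEmitter with close() and `producer` has close(callback).
function createShutdown({ ws, producer, clearReconnect = () => {} }) {
  let done = null; // memoised so repeated signals share one shutdown

  return function shutdown() {
    if (!done) {
      done = new Promise((resolve) => {
        clearReconnect(); // cancel any pending reconnect timer
        if (ws) {
          ws.removeAllListeners('close'); // a deliberate close must not reconnect
          ws.close(1000); // 1000 = normal closure
        }
        if (producer) {
          producer.close(() => resolve());
        } else {
          resolve();
        }
      });
    }
    return done;
  };
}

// Usage sketch (names taken from the script above):
// const shutdown = createShutdown({
//   ws,
//   producer,
//   clearReconnect: () => clearTimeout(reconnectTimer),
// });
// process.once('SIGINT', () => shutdown().then(() => process.exit(0)));
```

Because ws is reassigned on every reconnect, the shutdown function should be created at signal time, or be given a getter, so that it closes the current socket rather than a stale one.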
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token is invalid, expired, or missing.
- Fix: Ensure GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET are correct. Verify that the token retrieval function is called before the WebSocket connection is opened. Implement token refresh logic if the application runs for longer than the token lifetime.
Error: 403 Forbidden
- Cause: The OAuth client lacks the required scopes (analytics:conversation:view, websockets:subscribe).
- Fix: Go to the Genesys Admin Console > Security > OAuth Clients. Edit your client and add the missing scopes. Restart the application to obtain a new token with the updated scopes.
Error: WebSocket Connection Refused
- Cause: The WebSocket URL is incorrect or the environment variable GENESYS_ENVIRONMENT is malformed.
- Fix: Ensure GENESYS_ENVIRONMENT contains only the host name, without the https:// prefix or a trailing path. The code adds the wss:// scheme itself.
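Malformed settings are easier to catch at startup than from a failed connection. The sketch below shows one way to validate and normalise the environment variables used in this tutorial before anything connects. loadConfig and the returned property names are hypothetical; the defaults for the Kafka broker and topic mirror those in the complete example.

```javascript
// Hypothetical helper: validate and normalise settings read from process.env.
function loadConfig(env) {
  const required = ['GENESYS_CLIENT_ID', 'GENESYS_CLIENT_SECRET', 'GENESYS_ENVIRONMENT'];
  const missing = required.filter((name) => !env[name] || env[name].trim() === '');
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }

  // Accept values such as "https://example.com/" and keep only the host name.
  const host = env.GENESYS_ENVIRONMENT.trim()
    .replace(/^[a-z][a-z0-9+.-]*:\/\//i, '') // strip a scheme prefix
    .replace(/\/.*$/, ''); // strip any path
  if (!/^[a-z0-9.-]+$/i.test(host)) {
    throw new Error(`GENESYS_ENVIRONMENT is not a valid host name: "${host}"`);
  }

  return {
    clientId: env.GENESYS_CLIENT_ID,
    clientSecret: env.GENESYS_CLIENT_SECRET,
    environment: host.toLowerCase(),
    kafkaBroker: env.KAFKA_BROKER || 'localhost:9092',
    kafkaTopic: env.KAFKA_TOPIC || 'genesys-analytics',
  };
}
```

Calling loadConfig(process.env) as the first step of main() would turn a silent misconfiguration into an immediate, descriptive error.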
Error: Kafka Producer Not Ready
- Cause: The application attempts to publish messages before the Kafka producer has fully initialized.
- Fix: Use the producer.on('ready') event to confirm initialization before sending messages. In the provided code, the publishToKafka function only checks that the producer exists and does not buffer anything, so events that arrive before the producer is ready can be lost. Implement a queue or retry mechanism to hold them until then.
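The queue mentioned in the fix above could look like the following sketch, which holds messages in memory until the producer reports 'ready' and then flushes them in arrival order. createBufferedPublisher and maxBuffered are hypothetical names; the producer is assumed to expose on(event, handler) and send(payloads, callback), as kafka-node's Producer does.

```javascript
// Hypothetical helper: buffer messages until the producer reports 'ready'.
// `producer` is assumed to expose on(event, handler) and send(payloads, callback).
function createBufferedPublisher(producer, { maxBuffered = 10_000 } = {}) {
  let ready = false;
  const buffer = [];

  const send = (payloads) =>
    producer.send(payloads, (error) => {
      if (error) console.error('Failed to send to Kafka:', error);
    });

  producer.on('ready', () => {
    ready = true;
    while (buffer.length > 0) send(buffer.shift()); // flush in arrival order
  });

  // Returns true if the message was sent immediately, false if it was buffered.
  return function publish(topic, payload) {
    const payloads = [{ topic, messages: [JSON.stringify(payload)] }];
    if (ready) {
      send(payloads);
      return true;
    }
    if (buffer.length >= maxBuffered) {
      buffer.shift(); // drop the oldest message rather than grow without bound
    }
    buffer.push(payloads);
    return false;
  };
}
```

The bounded buffer trades completeness for memory safety: if the broker stays unreachable, the oldest events are discarded first. The publisher must be created right after the producer, before 'ready' fires, or the flush will never run.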