Streaming Genesys Cloud Analytics Notification Events to a Kafka Topic via Node.js
What You Will Build
- A Node.js service that subscribes to Genesys Cloud Real-Time Analytics notifications, processes incoming JSON payloads, and produces them to a specific Apache Kafka topic.
- This integration uses the Genesys Cloud Real-Time Analytics API (WebSocket-based) and the kafkajs library for Kafka production.
- The tutorial covers JavaScript (Node.js) with strict TypeScript-like typing patterns using JSDoc for clarity.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Grant).
- Required Scopes: analytics:realtime:read is mandatory for subscribing to real-time analytics events. If you need specific conversation details later, conversation:read may be required, but for raw analytics streams, analytics:realtime:read suffices.
- SDK/API Version: Genesys Cloud Real-Time Analytics API (v2).
- Language/Runtime: Node.js 18+ (for modern async/await patterns; the examples use the ws package as the WebSocket client rather than a built-in one).
- External Dependencies:
  - @genesyscloud/realtime (or a manual WebSocket implementation using the ws package, as shown below, for lower-level control).
  - kafkajs (for Kafka integration).
  - dotenv (for environment variable management).
Authentication Setup
Genesys Cloud Real-Time Analytics does not use standard HTTP Bearer tokens in the WebSocket handshake. Instead, it uses a specific WebSocket URL pattern that includes an OAuth access token as a query parameter. This token must be obtained via the standard Client Credentials flow.
We will create a helper class to manage the token lifecycle. Genesys tokens expire after a fixed lifetime, which the token response reports in seconds in its expires_in field. A long-running WebSocket connection typically does not need a token refresh while it stays open; a fresh token is only needed when the connection drops and reconnects after the cached token has expired. If your application manages multiple connections or reconnects frequently, token caching is essential.
// auth.js
const https = require('https');

class GenesysAuth {
  constructor(clientId, clientSecret, baseUrl = 'https://api.mypurecloud.com') {
    this.clientId = clientId;
    this.clientSecret = clientSecret;
    this.baseUrl = baseUrl;
    this.token = null;
    this.tokenExpiry = 0;
  }

  async getToken() {
    // Return cached token if valid
    if (this.token && Date.now() < this.tokenExpiry) {
      return this.token;
    }

    // Genesys Cloud issues OAuth tokens from the login.<region> host, not api.<region>
    const url = `${this.baseUrl.replace('://api.', '://login.')}/oauth/token`;
    const payload = 'grant_type=client_credentials&scope=analytics:realtime:read';
    const options = {
      method: 'POST',
      headers: {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Content-Length': Buffer.byteLength(payload),
        'Authorization': `Basic ${Buffer.from(`${this.clientId}:${this.clientSecret}`).toString('base64')}`
      }
    };

    let tokenData;
    try {
      // Collect the whole response body before parsing it
      const { statusCode, body } = await new Promise((resolve, reject) => {
        const req = https.request(url, options, (res) => {
          let data = '';
          res.setEncoding('utf8');
          res.on('data', (chunk) => { data += chunk; });
          res.on('end', () => resolve({ statusCode: res.statusCode, body: data }));
          res.on('error', reject);
        });
        req.on('error', reject);
        req.write(payload);
        req.end();
      });
      if (statusCode < 200 || statusCode >= 300) {
        throw new Error(`HTTP ${statusCode}: ${body}`);
      }
      tokenData = JSON.parse(body);
    } catch (error) {
      throw new Error(`Failed to acquire OAuth token: ${error.message}`);
    }

    if (!tokenData.access_token) {
      throw new Error('No access_token in OAuth response');
    }
    this.token = tokenData.access_token;
    // expires_in is in seconds; convert to milliseconds and subtract a 60-second buffer
    this.tokenExpiry = Date.now() + ((tokenData.expires_in || 3600) - 60) * 1000;
    return this.token;
  }
}

module.exports = GenesysAuth;
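The getToken method caches the token, but callers that arrive while the cache is empty (for example, several connections reconnecting at once) would each issue their own token request. The sketch below shows one way to coalesce such calls into a single request. TokenCache and the fetchToken callback are hypothetical names, not part of any Genesys SDK; fetchToken is assumed to resolve to an object with access_token and expires_in fields.

```javascript
// Sketch only: TokenCache and fetchToken are illustrative names.
// fetchToken is assumed to be an async function resolving to
// { access_token: string, expires_in: number /* seconds */ }.
class TokenCache {
  constructor(fetchToken, { skewSeconds = 60, now = Date.now } = {}) {
    this.fetchToken = fetchToken;
    this.skewMs = skewSeconds * 1000; // refresh slightly before real expiry
    this.now = now;                   // injectable clock, useful in tests
    this.token = null;
    this.expiresAt = 0;
    this.inFlight = null;             // promise of a refresh in progress
  }

  get() {
    // Serve from cache while the token is still considered fresh.
    if (this.token && this.now() < this.expiresAt) {
      return Promise.resolve(this.token);
    }
    // Start at most one refresh; later callers share the same promise.
    if (!this.inFlight) {
      this.inFlight = Promise.resolve(this.fetchToken())
        .then((res) => {
          this.token = res.access_token;
          this.expiresAt = this.now() + res.expires_in * 1000 - this.skewMs;
          return this.token;
        })
        .finally(() => {
          this.inFlight = null; // allow a new refresh after success or failure
        });
    }
    return this.inFlight;
  }
}
```

With this shape, every reconnect path can call cache.get() without coordinating with the others; a failed refresh rejects all waiting callers and the next call starts a new attempt.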
Implementation
Step 1: Establishing the Real-Time Analytics WebSocket Connection
The Genesys Cloud Real-Time Analytics API exposes a WebSocket endpoint. Unlike REST APIs, you do not send a request and get a response. You establish a persistent connection and send subscription commands.
The base WebSocket URL is wss://api.mypurecloud.com/api/v2/analytics/realtime. You must append ?access_token=YOUR_TOKEN.
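As a minimal sketch of that URL pattern, the helper below builds the connection URL from a base URL and a token, and a second helper produces a variant that is safe to log. The endpoint path is the one quoted in this tutorial; buildWsUrl and redactToken are hypothetical helper names introduced here for illustration.

```javascript
// Sketch only: helper names are illustrative; the path is the one
// quoted in this tutorial.
const REALTIME_PATH = '/api/v2/analytics/realtime';

function buildWsUrl(baseUrl, token) {
  // Accept an https:// base (as typically stored in .env) and switch it
  // to the matching WebSocket scheme; wss:// inputs pass through unchanged.
  const wsBase = baseUrl.replace(/^http/i, 'ws');
  const url = new URL(REALTIME_PATH, wsBase);
  // searchParams.set percent-encodes the token where needed.
  url.searchParams.set('access_token', token);
  return url.toString();
}

function redactToken(wsUrl) {
  // The token is a credential, so mask it before the URL reaches a log.
  const url = new URL(wsUrl);
  if (url.searchParams.has('access_token')) {
    url.searchParams.set('access_token', 'REDACTED');
  }
  return url.toString();
}
```

Logging redactToken(wsUrl) instead of the raw URL keeps the access token out of log files and terminal history.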
Once connected, you send a JSON message to subscribe to specific metrics. For this tutorial, we will subscribe to conversation events, which provide a stream of state changes for active conversations.
Before opening the connection, we define the Kafka side of the pipeline: a thin wrapper around the kafkajs producer that the stream class in Step 2 uses to publish each event.
// kafka-producer.js
const { Kafka } = require('kafkajs');

class KafkaProducer {
  constructor(kafkaConfig) {
    this.kafka = new Kafka(kafkaConfig);
    this.producer = this.kafka.producer();
  }

  async connect() {
    await this.producer.connect();
    console.log('Connected to Kafka');
  }

  async send(topic, messages) {
    // messages is an array of { key, value }
    try {
      await this.producer.send({
        topic,
        messages
      });
    } catch (error) {
      console.error('Kafka send failed:', error);
      // In production, you might want to implement a retry queue here
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }
}

module.exports = KafkaProducer;
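The comment in send mentions a retry queue. One possible shape for it, sketched under the assumption that losing the oldest events is acceptable when Kafka stays down for a long time, is a bounded in-memory buffer. RetryBuffer is a hypothetical class, not part of kafkajs.

```javascript
// Sketch only: RetryBuffer is an illustrative helper, not a kafkajs API.
// It holds messages whose send failed and drops the oldest ones once
// the configured capacity is exceeded.
class RetryBuffer {
  constructor(capacity = 1000) {
    if (!Number.isInteger(capacity) || capacity < 1) {
      throw new RangeError('capacity must be a positive integer');
    }
    this.capacity = capacity;
    this.items = [];
    this.dropped = 0; // count of messages discarded due to overflow
  }

  push(message) {
    this.items.push(message);
    if (this.items.length > this.capacity) {
      this.items.shift(); // discard the oldest message
      this.dropped += 1;
    }
  }

  // Remove and return everything currently buffered, oldest first.
  drain() {
    const batch = this.items;
    this.items = [];
    return batch;
  }

  get size() {
    return this.items.length;
  }
}
```

In the catch block of send, the failed messages could be pushed into such a buffer, and a timer could periodically call drain() and pass the batch back to send. Note that retried messages may arrive after newer ones for the same key, so consumers that depend on strict ordering would need to handle that.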
Step 2: Subscribing to Analytics Events
After the WebSocket connection is established, Genesys expects a JSON payload to define the subscription. The structure is strict. You must specify the type of event (e.g., conversation, queue, user) and the filters to narrow down the data.
For this example, we will subscribe to all conversation events. This includes events like conversation:start, conversation:update, and conversation:end.
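A small builder can keep the subscription payload consistent across call sites. The sketch below follows the payload shape described in this tutorial (a type plus a filters object); buildSubscription is a hypothetical helper, and restricting type to the three examples named above is an assumption rather than a documented limit.

```javascript
// Sketch only: the payload shape follows this tutorial's description.
// The allowed type list is an assumption based on the examples named
// in the text, not an exhaustive list from the API.
const KNOWN_TYPES = ['conversation', 'queue', 'user'];

function buildSubscription(type, filters = {}) {
  if (!KNOWN_TYPES.includes(type)) {
    throw new TypeError(`Unknown subscription type: ${type}`);
  }
  // Drop filters that carry no constraint so they are not sent at all.
  const cleaned = {};
  for (const [name, value] of Object.entries(filters)) {
    if (value === undefined || value === null) continue;
    if (Array.isArray(value) && value.length === 0) continue;
    cleaned[name] = value;
  }
  return JSON.stringify({ type, filters: cleaned });
}
```

For example, buildSubscription('conversation', { queueIds: ['YOUR_QUEUE_ID'] }) yields a string that can be passed directly to ws.send.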
// subscription-manager.js
const WebSocket = require('ws');
const GenesysAuth = require('./auth');
const KafkaProducer = require('./kafka-producer');
require('dotenv').config();

class AnalyticsStream {
  constructor() {
    this.baseUrl = process.env.GENESYS_BASE_URL || 'https://api.mypurecloud.com';
    this.auth = new GenesysAuth(
      process.env.GENESYS_CLIENT_ID,
      process.env.GENESYS_CLIENT_SECRET,
      this.baseUrl
    );
    this.kafkaProducer = new KafkaProducer({
      clientId: 'genesys-analytics-consumer',
      brokers: [process.env.KAFKA_BROKER || 'localhost:9092']
    });
    this.ws = null;
    this.reconnectTimer = null;
    this.stopped = false; // set by stop() so a deliberate close does not reconnect
  }

  async start() {
    await this.kafkaProducer.connect();
    // Await so a token failure surfaces to the caller instead of being unhandled
    await this.connectWebSocket();
  }

  async connectWebSocket() {
    const token = await this.auth.getToken();
    // GENESYS_BASE_URL is an https:// URL; the WebSocket needs the wss:// scheme
    const endpoint = `${this.baseUrl.replace(/^http/, 'ws')}/api/v2/analytics/realtime`;
    const wsUrl = `${endpoint}?access_token=${encodeURIComponent(token)}`;
    // Log the endpoint only; the full URL contains the access token
    console.log(`Connecting to WebSocket: ${endpoint}`);
    this.ws = new WebSocket(wsUrl);

    this.ws.on('open', () => {
      console.log('WebSocket connection established');
      this.subscribeToEvents();
    });

    this.ws.on('message', (data) => {
      this.handleMessage(data);
    });

    this.ws.on('error', (err) => {
      console.error('WebSocket error:', err);
      this.scheduleReconnect();
    });

    this.ws.on('close', (code, reason) => {
      console.log(`WebSocket closed: ${code} ${reason}`);
      // Genesys often closes connections for maintenance or if idle. Reconnect.
      this.scheduleReconnect();
    });
  }

  subscribeToEvents() {
    // Subscribe to conversation events
    const subscription = {
      type: 'conversation',
      filters: {
        // Optional: filter by queue ID, user ID, etc.
        // queueIds: ['YOUR_QUEUE_ID'],
      }
    };
    // Send the subscription command
    this.ws.send(JSON.stringify(subscription));
    console.log('Sent subscription command for conversation events');
  }

  handleMessage(data) {
    try {
      // ws delivers messages as Buffers; convert before parsing
      const message = JSON.parse(data.toString());
      // Genesys sends various message types. We care about 'event' type.
      if (message.type === 'event') {
        this.processEvent(message);
      } else if (message.type === 'error') {
        console.error('Genesys Error:', message);
      } else {
        // Other types like 'info' or 'ping' can be ignored or logged
        console.log('Non-event message:', message.type);
      }
    } catch (error) {
      console.error('Failed to parse message:', error);
    }
  }

  processEvent(eventMessage) {
    // The eventMessage contains the actual analytics data in the 'data' field
    const eventData = eventMessage.data;
    if (!eventData) {
      console.warn('Event message without data field; skipping');
      return;
    }
    // Prepare the message for Kafka
    // We use the conversationId as the key to ensure partitioning by conversation
    const kafkaMessage = {
      key: eventData.conversationId,
      value: JSON.stringify({
        eventType: eventMessage.eventType,
        timestamp: eventMessage.timestamp,
        data: eventData
      })
    };
    // Send to Kafka (errors are caught and logged inside KafkaProducer.send)
    this.kafkaProducer.send('genesys-analytics-events', [kafkaMessage]);
  }

  scheduleReconnect() {
    // Skip if shutting down or if a reconnect is already pending
    if (this.stopped || this.reconnectTimer) return;
    console.log('Scheduling reconnect in 5 seconds...');
    this.reconnectTimer = setTimeout(async () => {
      this.reconnectTimer = null;
      try {
        await this.connectWebSocket();
      } catch (error) {
        // For example, the token request failed; try again later
        console.error('Reconnect failed:', error);
        this.scheduleReconnect();
      }
    }, 5000);
  }

  async stop() {
    this.stopped = true;
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.ws) {
      this.ws.close();
    }
    await this.kafkaProducer.disconnect();
  }
}

module.exports = AnalyticsStream;
Step 3: Processing and Producing Results
The handleMessage function receives raw JSON from Genesys. The structure of these messages is consistent. A typical event message looks like this:
{
  "type": "event",
  "eventType": "conversation:update",
  "timestamp": "2023-10-27T10:00:00.000Z",
  "data": {
    "conversationId": "12345678-abcd-efgh-ijkl-123456789012",
    "type": "voice",
    "state": "active",
    "participants": [
      {
        "id": "user-id-1",
        "role": "agent",
        "state": "connected"
      },
      {
        "id": "customer-id-1",
        "role": "customer",
        "state": "connected"
      }
    ],
    "metrics": {
      "waitTime": 1000,
      "talkTime": 5000
    }
  }
}
In the processEvent method, we extract the data field, which contains the payload. We wrap this in a new object that includes the eventType and timestamp from the outer message envelope. This is crucial because the data object alone might not always contain the global timestamp or the specific event type that triggered the update.
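The wrapping step can also be written as a standalone function, which makes it easy to unit test without a WebSocket or a Kafka broker. This is a sketch based on the envelope shown above; toKafkaMessage is a hypothetical name, and returning null for malformed input is a design choice made here for illustration.

```javascript
// Sketch only: mirrors the wrapping done in processEvent, assuming the
// envelope fields shown in this tutorial (eventType, timestamp, data).
function toKafkaMessage(eventMessage) {
  const data = eventMessage && eventMessage.data;
  // Without a data object there is nothing useful to forward.
  if (!data || typeof data !== 'object') {
    return null;
  }
  return {
    // A missing conversationId results in an unkeyed message.
    key: data.conversationId ? String(data.conversationId) : null,
    value: JSON.stringify({
      eventType: eventMessage.eventType,
      timestamp: eventMessage.timestamp,
      data,
    }),
  };
}
```

A caller would skip the send when the function returns null, so malformed envelopes never reach the topic.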
We send this wrapped object to Kafka. Using conversationId as the Kafka key ensures that all events related to a single conversation are routed to the same partition in Kafka. This is vital for downstream consumers that need to reconstruct the conversation timeline in order.
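To illustrate why a stable key keeps a conversation on one partition, the toy function below maps a key to a partition by hashing it and taking the remainder. This is deliberately not the algorithm kafkajs uses (its partitioners are based on murmur2); toyPartition is a hypothetical name and only demonstrates the principle that the same key and partition count always give the same result.

```javascript
// Illustration only: a simplified stand-in for a key-based partitioner.
// Real Kafka clients use a different hash; the principle is the same.
function toyPartition(key, numPartitions) {
  let hash = 0;
  for (const ch of String(key)) {
    // Simple polynomial rolling hash, kept within unsigned 32 bits.
    hash = (hash * 31 + ch.codePointAt(0)) >>> 0;
  }
  return hash % numPartitions;
}

// The same conversationId always maps to the same partition...
const first = toyPartition('conversation-1', 6);
const again = toyPartition('conversation-1', 6);
console.log(first === again);
```

The mapping holds only while the partition count stays the same; adding partitions to the topic changes where keys land, which can split one conversation's history across two partitions.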
Complete Working Example
Below is the complete index.js file that ties everything together.
// index.js
const AnalyticsStream = require('./subscription-manager');

async function main() {
  const stream = new AnalyticsStream();

  // Handle graceful shutdown
  const shutdown = async () => {
    console.log('Shutting down...');
    await stream.stop();
    process.exit(0);
  };
  process.on('SIGINT', shutdown);
  process.on('SIGTERM', shutdown);

  try {
    await stream.start();
    console.log('Analytics stream is running. Waiting for events...');
  } catch (error) {
    console.error('Failed to start analytics stream:', error);
    process.exit(1);
  }
}

main();
To run this, create a .env file with the following variables:
GENESYS_CLIENT_ID=your_client_id
GENESYS_CLIENT_SECRET=your_client_secret
GENESYS_BASE_URL=https://api.mypurecloud.com
KAFKA_BROKER=localhost:9092
Install dependencies:
npm install ws kafkajs dotenv
Run the application:
node index.js
Common Errors & Debugging
Error: 401 Unauthorized on WebSocket Connection
- Cause: The OAuth token in the query parameter is invalid, expired, or missing the analytics:realtime:read scope.
- Fix: Verify your client credentials. Ensure the token request includes the correct scope. Check the getToken method in auth.js to confirm it retrieves a token successfully before the token is passed to the WebSocket URL.
Error: 403 Forbidden on WebSocket Connection
- Cause: The OAuth client does not have permission to access real-time analytics.
- Fix: Check role assignments in the Genesys Cloud Admin portal. With the Client Credentials grant used in this tutorial, roles are assigned to the OAuth client itself (Admin > Integrations > OAuth); for user-based grants, check Users > Your User > Roles instead. Ensure the client or user has the Real-Time Analytics role or a custom role with the analytics:realtime:read permission.
Error: WebSocket Connection Closed Unexpectedly
- Cause: Genesys Cloud may close WebSocket connections for maintenance, idle timeouts, or if the message queue backs up.
- Fix: The provided code includes a scheduleReconnect method that retries after a fixed 5-second delay. Ensure this logic is robust. In production, consider implementing an exponential backoff strategy for reconnection attempts to avoid overwhelming the Genesys API during outages.
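One way to compute such a delay is exponential backoff with jitter, sketched below. backoffDelay and its option names are hypothetical, and the base and cap values are placeholders to tune for your environment.

```javascript
// Sketch only: exponential backoff with "equal jitter".
// attempt is 0 for the first retry, 1 for the second, and so on.
function backoffDelay(attempt, { baseMs = 1000, maxMs = 60000, random = Math.random } = {}) {
  // Double the ceiling on every attempt, but never exceed maxMs.
  const ceiling = Math.min(maxMs, baseMs * 2 ** attempt);
  // Keep half the ceiling as a guaranteed wait and randomize the rest,
  // so many clients do not reconnect at the same instant.
  const half = ceiling / 2;
  return Math.floor(half + random() * half);
}
```

In scheduleReconnect, the fixed 5000 could be replaced with backoffDelay(attempt), incrementing a counter on each failed attempt and resetting it to 0 in the 'open' handler.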
Error: Kafka Producer Send Failed
- Cause: The Kafka broker is unreachable, or the topic does not exist and auto-creation is disabled.
- Fix: Verify that your Kafka broker is running and accessible from the Node.js application. Ensure the topic genesys-analytics-events exists or that auto.create.topics.enable=true is set in your Kafka broker configuration.