Trying to pipe CXone Analytics Notification events directly into a Kafka topic using a custom Node.js consumer. The setup works fine for steady state, but I’m seeing significant data loss right after a network blip or a 401 token refresh.
Here’s the flow:
- Connect to wss://api.mypurecloud.com/api/v2/analytics/events
- Subscribe to queue-stats and conversation-summary
- On message, push to Kafka producer
The problem is the initial burst. When the socket reconnects, Genesys sends a backlog of missed events. My consumer hits the Kafka broker too fast and the producer backpressure chokes the WebSocket read loop. I end up with a SIGKILL on the process or just silent drops because the socket buffer fills up.
Code snippet:
    ws.on('message', (data) => {
      const events = JSON.parse(data);
      // send() is not awaited, so nothing limits in-flight sends when Kafka is slow
      kafkaProducer.send({
        topic: 'cxone-analytics',
        messages: [{ value: JSON.stringify(events) }]
      }).catch(err => console.error('Kafka push failed', err));
    });
I can’t seem to find a way to pause the WebSocket stream while the producer catches up. Is there a backpressure mechanism I’m missing in the ws library, or should I be batching these events in memory before pushing to Kafka? The documentation doesn’t mention flow control for the analytics stream.
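For reference, here is a minimal sketch of the kind of flow control I have in mind. It assumes ws 8.x, where the socket exposes pause() and resume(), and it caps the number of unresolved Kafka sends. The helper name createBackpressureHandler and the maxInFlight value are hypothetical, not part of ws or any Kafka client. As I understand it, ws can still emit a few already-buffered messages after pause(), so the cap is soft rather than exact.

```javascript
// Sketch only: cap in-flight sends and pause the socket while at the cap.
// `socket` needs pause()/resume() (assumed: a ws 8.x WebSocket).
// `send` is any function returning a Promise (assumed: a Kafka producer call).
function createBackpressureHandler(socket, send, maxInFlight = 100) {
  let inFlight = 0;
  let paused = false;

  return function onMessage(data) {
    inFlight += 1;
    if (!paused && inFlight >= maxInFlight) {
      paused = true;
      socket.pause(); // stop emitting 'message' until sends drain
    }

    // The executor runs synchronously; a synchronous throw from send()
    // is converted into a rejection and handled by catch() below.
    return new Promise((resolve) => resolve(send(data)))
      .catch((err) => console.error('Kafka push failed', err))
      .finally(() => {
        inFlight -= 1;
        // Resume at half the cap to avoid rapid pause/resume flapping.
        if (paused && inFlight <= Math.floor(maxInFlight / 2)) {
          paused = false;
          socket.resume();
        }
      });
  };
}

// Hypothetical wiring, using the names from the snippet above:
// ws.on('message', createBackpressureHandler(ws, (data) =>
//   kafkaProducer.send({
//     topic: 'cxone-analytics',
//     messages: [{ value: JSON.stringify(JSON.parse(data)) }]
//   })));
```

This only bounds memory inside the Node process. Whether the server keeps buffering or drops events while the socket is paused is something I have not been able to confirm from the documentation.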