Build a Real-Time Sentiment Scorer for Genesys Cloud Agent Assist Using Node.js WebSocket API
What You Will Build
A Node.js service that subscribes to live conversation events via the Genesys Cloud WebSocket API, scores sentiment and emotion in real time, validates payloads against NLP engine constraints, and exposes a reusable scorer class for Agent Assist integrations. This implementation uses the Genesys Cloud WebSocket API and OAuth 2.0 Client Credentials flow. The tutorial covers Node.js 18+ with the ws and axios libraries.
Prerequisites
- OAuth 2.0 Client Credentials flow with view:interaction and view:analytics scopes
- Genesys Cloud WebSocket API v2
- Node.js 18 or later
- External dependencies: ws, axios, uuid
- Active Speech Analytics license with real-time sentiment and emotion detection enabled
Authentication Setup
The Genesys Cloud platform requires a Bearer token for WebSocket authentication. The Client Credentials flow returns a short-lived token that must be refreshed before expiration. The following implementation caches the token, refreshes it 60 seconds before expiry, waits for the Retry-After interval on 429 responses, and maps 401/403 and 5xx failures to descriptive errors.
const axios = require('axios');

class GenesysAuth {
  constructor(domain, clientId, clientSecret) {
    // Normalize the domain: strip any scheme and trailing slash
    this.domain = domain.replace(/^https?:\/\//, '').replace(/\/$/, '');
    this.clientId = clientId;
    this.clientSecret = clientSecret;
    this.token = null;
    this.tokenExpiry = 0;
    this.baseAuthUrl = `https://${this.domain}/api/v2/oauth/token`;
  }

  async getToken(attempt = 0) {
    const now = Date.now();
    // Reuse the cached token until 60 seconds before it expires
    if (this.token && now < this.tokenExpiry - 60000) {
      return this.token;
    }
    const credentials = Buffer.from(`${this.clientId}:${this.clientSecret}`).toString('base64');
    const payload = new URLSearchParams({
      grant_type: 'client_credentials',
      scope: 'view:interaction view:analytics'
    });
    try {
      const response = await axios.post(this.baseAuthUrl, payload, {
        headers: {
          'Authorization': `Basic ${credentials}`,
          'Content-Type': 'application/x-www-form-urlencoded'
        },
        timeout: 10000
      });
      // axios rejects non-2xx responses by default; this guards against other 2xx codes
      if (response.status !== 200) {
        throw new Error(`OAuth token request failed with status ${response.status}`);
      }
      this.token = response.data.access_token;
      this.tokenExpiry = now + (response.data.expires_in * 1000);
      return this.token;
    } catch (error) {
      // Honor Retry-After on 429, but stop after three retries instead of recursing forever
      if (error.response?.status === 429 && attempt < 3) {
        const retryAfter = parseInt(error.response.headers['retry-after'] || '5', 10);
        console.warn(`OAuth 429 rate limit hit. Retrying in ${retryAfter}s`);
        await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
        return this.getToken(attempt + 1);
      }
      if (error.response?.status === 401 || error.response?.status === 403) {
        throw new Error('OAuth 401/403: Invalid client credentials or missing scopes');
      }
      if (error.response?.status >= 500) {
        throw new Error('OAuth 5xx: Genesys Cloud authentication service unavailable');
      }
      throw error;
    }
  }
}
OAuth Scope Requirement: view:interaction is required to subscribe to conversation events. view:analytics is required to access real-time sentiment and emotion data within the interaction payload.
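For rate-limited endpoints that do not send a Retry-After header, exponential backoff is a common fallback. The following is a minimal sketch of that technique as a standalone helper. The names `retryWithBackoff`, `maxAttempts`, `baseDelayMs`, and `sleep` are hypothetical and not part of any Genesys SDK, and the helper assumes errors carry an axios-style `response.status`.

```javascript
// Hypothetical helper: retries an async operation on HTTP 429 with exponential backoff.
// Assumes thrown errors expose an axios-style `error.response.status`.
async function retryWithBackoff(operation, { maxAttempts = 4, baseDelayMs = 500, sleep } = {}) {
  // `sleep` is injectable so the delay can be replaced in tests
  const wait = sleep || (ms => new Promise(resolve => setTimeout(resolve, ms)));
  for (let attempt = 1; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      const isRateLimited = error.response?.status === 429;
      // Rethrow anything that is not a rate limit, or when attempts are exhausted
      if (!isRateLimited || attempt >= maxAttempts) throw error;
      // Delay doubles on each attempt: base, 2x base, 4x base, ...
      await wait(baseDelayMs * 2 ** (attempt - 1));
    }
  }
}
```

Under these assumptions, a token request could be wrapped as `retryWithBackoff(() => axios.post(url, payload, options))`, leaving non-429 failures to propagate unchanged.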
Implementation
Step 1: WebSocket Connection and Atomic SUBSCRIBE Management
The WebSocket API uses a specific JSON protocol for subscriptions. Each subscription requires a unique message ID and a topic path. The platform enforces maximum analysis frequency limits per interaction. This step implements a subscription queue that skips interactions already subscribed, sends SUBSCRIBE messages one at a time, and waits 500 ms between sends.
const WebSocket = require('ws');
const { v4: uuidv4 } = require('uuid');

class WebSocketManager {
  constructor(domain) {
    this.domain = domain.replace(/^https?:\/\//, '').replace(/\/$/, '');
    this.wsUrl = `wss://${this.domain}/api/v2/websocket`;
    this.ws = null;
    this.subscribeQueue = [];
    this.isProcessingQueue = false;
    this.subscribedInteractions = new Set();
  }

  async connect(authToken) {
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(this.wsUrl, {
        headers: {
          'Authorization': `Bearer ${authToken}`,
          'User-Agent': 'Genesys-Sentiment-Scorer/1.0'
        }
      });
      this.ws.on('open', () => {
        console.log('WebSocket connection established');
        // Flush any subscriptions that were queued before the socket opened
        this.processQueue();
        resolve();
      });
      this.ws.on('error', (error) => {
        console.error('WebSocket connection error:', error.message);
        reject(error);
      });
      this.ws.on('close', (code, reason) => {
        console.warn(`WebSocket closed with code ${code}: ${reason}`);
      });
    });
  }

  async subscribeToInteraction(interactionId) {
    // Skip interactions that already have a SUBSCRIBE on the wire
    if (this.subscribedInteractions.has(interactionId)) {
      return;
    }
    const subscribePayload = {
      op: 'SUBSCRIBE',
      topic: `v2.interaction.${interactionId}`,
      id: uuidv4()
    };
    this.subscribeQueue.push(subscribePayload);
    if (!this.isProcessingQueue) {
      this.processQueue();
    }
  }

  async processQueue() {
    if (this.isProcessingQueue || this.subscribeQueue.length === 0) {
      return;
    }
    // Leave payloads queued until the socket is open; the 'open' handler flushes them
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      return;
    }
    this.isProcessingQueue = true;
    while (this.subscribeQueue.length > 0) {
      const payload = this.subscribeQueue.shift();
      try {
        this.ws.send(JSON.stringify(payload));
        // The interaction ID is the last dot-separated part of the topic
        this.subscribedInteractions.add(payload.topic.split('.').pop());
        console.log(`SUBSCRIBE sent for topic: ${payload.topic}`);
        // Enforce maximum analysis frequency limit (500ms delay between subscriptions)
        await new Promise(resolve => setTimeout(resolve, 500));
      } catch (error) {
        console.error(`Failed to send SUBSCRIBE for ${payload.topic}:`, error.message);
        // Put the payload back at the front so it is retried on the next flush
        this.subscribeQueue.unshift(payload);
        break;
      }
    }
    this.isProcessingQueue = false;
  }

  close() {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.close();
    }
  }
}
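A format check could run before a payload is queued, so that malformed interaction IDs never reach the socket. The sketch below assumes the `{ op, topic, id }` message shape used in this tutorial and assumes interaction IDs are UUID-shaped. `buildSubscribePayload` is a hypothetical helper, and the validation rule is an illustration rather than a published schema.

```javascript
const { randomUUID } = require('node:crypto');

// Hypothetical builder for the SUBSCRIBE message shape used in this tutorial.
// Assumption: interaction IDs are 36 characters of hex digits and dashes (UUID-shaped).
function buildSubscribePayload(interactionId) {
  if (typeof interactionId !== 'string' || !/^[0-9a-f-]{36}$/i.test(interactionId)) {
    throw new TypeError(`Invalid interaction ID: ${interactionId}`);
  }
  return {
    op: 'SUBSCRIBE',
    topic: `v2.interaction.${interactionId}`,
    id: randomUUID() // built into Node.js, so the uuid package is optional here
  };
}
```

Rejecting IDs that contain dots also keeps the `topic.split('.').pop()` lookup in the queue unambiguous.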
Step 2: Payload Validation and Scoring Logic
Genesys Cloud returns interaction events containing transcription segments, sentiment polarity, and emotion classifications. The NLP engine enforces constraints on analysis frequency, supported languages, and confidence thresholds. This step implements payload validation, a supported-language check, a per-interaction scoring throttle, a heuristic sarcasm flag, and a sliding context window.
class SentimentScorer {
  // Default to an empty config so the class also works when no overrides are passed
  constructor(config = {}) {
    // Classification uses only the positive and negative bounds; values in between are neutral
    this.polarityThresholds = config.polarityThresholds || {
      positive: 0.6,
      neutral: 0.3,
      negative: -0.3
    };
    this.emotionDirectives = config.emotionDirectives || ['joy', 'anger', 'fear', 'sadness', 'disgust', 'surprise'];
    this.supportedLanguages = config.supportedLanguages || ['en', 'es', 'fr', 'de', 'pt'];
    this.maxAnalysisFrequencyMs = config.maxAnalysisFrequencyMs || 2000;
    this.contextWindowMs = config.contextWindowMs || 30000;
    this.lastScoreTimestamps = new Map();
    this.contextWindows = new Map();
  }

  validatePayload(eventData) {
    if (!eventData || !eventData.segments) {
      throw new Error('Invalid interaction payload: missing segments array');
    }
    if (!eventData.language) {
      throw new Error('Invalid interaction payload: missing language identifier');
    }
    if (!this.supportedLanguages.includes(eventData.language)) {
      throw new Error(`Unsupported language for NLP engine: ${eventData.language}`);
    }
  }

  // Throttle: allow at most one score per interaction per maxAnalysisFrequencyMs
  shouldScore(interactionId, timestamp) {
    const lastScore = this.lastScoreTimestamps.get(interactionId) || 0;
    if (timestamp - lastScore < this.maxAnalysisFrequencyMs) {
      return false;
    }
    this.lastScoreTimestamps.set(interactionId, timestamp);
    return true;
  }

  // Drop segments older than contextWindowMs and return the surviving window
  getWindowedContext(interactionId, currentTimestamp) {
    const window = this.contextWindows.get(interactionId) || [];
    const cutoff = currentTimestamp - this.contextWindowMs;
    const filteredWindow = window.filter(seg => seg.timestamp > cutoff);
    this.contextWindows.set(interactionId, filteredWindow);
    return filteredWindow;
  }

  scoreSentiment(eventData, interactionId, timestamp) {
    this.validatePayload(eventData);
    if (!this.shouldScore(interactionId, timestamp)) {
      return null;
    }
    const contextWindow = this.getWindowedContext(interactionId, timestamp);
    const currentSegment = eventData.segments[eventData.segments.length - 1];
    // Stamp the segment with the event time so the window filter can age it out later
    contextWindow.push({ ...currentSegment, timestamp });
    const sentiment = eventData.sentiment || { polarity: 0, confidence: 0, label: 'neutral' };
    const emotion = eventData.emotion || { label: 'unknown', confidence: 0 };
    // Sarcasm detection check: low polarity confidence with high emotion intensity
    const isPotentialSarcasm = sentiment.confidence < 0.5 &&
      sentiment.polarity > 0.4 &&
      emotion.confidence > 0.8 &&
      (emotion.label === 'joy' || emotion.label === 'surprise');
    const polarityScore = this.calculatePolarityScore(sentiment.polarity);
    return {
      interactionId,
      timestamp,
      language: eventData.language,
      polarity: polarityScore,
      polarityRaw: sentiment.polarity,
      confidence: sentiment.confidence,
      emotion: this.emotionDirectives.includes(emotion.label) ? emotion.label : 'uncategorized',
      emotionConfidence: emotion.confidence,
      sarcasmFlag: isPotentialSarcasm,
      contextSegmentCount: contextWindow.length,
      scoredAt: Date.now()
    };
  }

  calculatePolarityScore(rawPolarity) {
    if (rawPolarity >= this.polarityThresholds.positive) return 'positive';
    if (rawPolarity <= this.polarityThresholds.negative) return 'negative';
    return 'neutral';
  }
}
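To see how a per-interaction frequency limit behaves, the throttle can be isolated from the scorer and fed a few timestamps. The sketch below is a standalone illustration. `createThrottle` is a hypothetical name, and the timestamps are invented.

```javascript
// Hypothetical factory: returns a function that admits at most one event
// per key within any minIntervalMs span.
function createThrottle(minIntervalMs) {
  const lastAccepted = new Map();
  return function shouldProcess(key, timestampMs) {
    const previous = lastAccepted.get(key);
    if (previous !== undefined && timestampMs - previous < minIntervalMs) {
      return false; // too soon after the last accepted event for this key
    }
    lastAccepted.set(key, timestampMs);
    return true;
  };
}

// Invented timestamps (ms) for one interaction with a 2000 ms minimum interval.
const shouldProcess = createThrottle(2000);
const decisions = [1000, 2500, 3200, 4000].map(t => shouldProcess('interaction-a', t));
console.log(decisions); // [ true, false, true, false ]
```

The interval is measured from the last accepted event, not the last received one, so a steady stream of rejected events cannot starve an interaction of scores.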
Step 3: Event Routing, Latency Tracking, and Audit Logging
Real-time scoring requires synchronization with external coaching platforms, latency monitoring, and governance-compliant audit trails. This step implements callback handlers, performance metrics collection, and structured audit log generation.
// Uses uuidv4 from the 'uuid' import shown in Step 1
class SentimentAuditLogger {
  constructor() {
    this.logs = [];
    this.metrics = {
      totalEvents: 0,
      scoredEvents: 0,
      skippedEvents: 0,
      averageLatencyMs: 0,
      latencySamples: []
    };
  }

  recordEvent(eventType, payload, processingTimeMs) {
    const logEntry = {
      id: uuidv4(),
      timestamp: Date.now(),
      eventType,
      interactionId: payload.interactionId,
      language: payload.language,
      polarity: payload.polarity,
      emotion: payload.emotion,
      sarcasmFlag: payload.sarcasmFlag,
      processingTimeMs,
      status: 'success'
    };
    this.logs.push(logEntry);
    this.metrics.totalEvents++;
    // A missing polarity means the event was throttled and not scored
    if (payload.polarity) {
      this.metrics.scoredEvents++;
      this.metrics.latencySamples.push(processingTimeMs);
      this.metrics.averageLatencyMs = this.metrics.latencySamples.reduce((a, b) => a + b, 0) / this.metrics.latencySamples.length;
    } else {
      this.metrics.skippedEvents++;
    }
  }

  getAuditReport() {
    return {
      metrics: this.metrics,
      recentLogs: this.logs.slice(-100)
    };
  }
}

class EventRouter {
  constructor(callbacks) {
    this.callbacks = callbacks || [];
  }

  addCallback(handler) {
    if (typeof handler !== 'function') {
      throw new Error('Callback handler must be a function');
    }
    this.callbacks.push(handler);
  }

  async routeEvent(scoringResult) {
    const routingPromises = this.callbacks.map(callback => {
      try {
        return callback(scoringResult);
      } catch (error) {
        // Convert a synchronous throw into a rejection so allSettled can report it
        console.error(`Callback execution failed: ${error.message}`);
        return Promise.reject(error);
      }
    });
    return Promise.allSettled(routingPromises);
  }
}
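The logger above keeps every latency sample and re-sums the array on each scored event, so both memory use and per-event cost grow with uptime. One alternative is an incremental mean that stores only a count and the current average. The sketch below is a minimal illustration. `RunningMean` is a hypothetical name, not part of the tutorial's classes.

```javascript
// Hypothetical helper: incremental mean, constant memory and constant time per sample.
class RunningMean {
  constructor() {
    this.count = 0;
    this.mean = 0;
  }

  add(value) {
    this.count += 1;
    // Shift the mean toward the new value by 1/count of the difference
    this.mean += (value - this.mean) / this.count;
    return this.mean;
  }
}

const latency = new RunningMean();
[10, 20, 30].forEach(ms => latency.add(ms));
console.log(latency.mean); // 20
```

The trade-off is that individual samples are no longer available, so percentiles would need a separate bounded buffer or a histogram.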
Complete Working Example
The following module integrates authentication, WebSocket management, scoring validation, and audit logging into a single exportable class. It is ready to run once you pass your domain, client ID, and client secret in the constructor config.
const axios = require('axios');
const WebSocket = require('ws');
const { v4: uuidv4 } = require('uuid');

class GenesysAuth {
  constructor(domain, clientId, clientSecret) {
    // Normalize the domain: strip any scheme and trailing slash
    this.domain = domain.replace(/^https?:\/\//, '').replace(/\/$/, '');
    this.clientId = clientId;
    this.clientSecret = clientSecret;
    this.token = null;
    this.tokenExpiry = 0;
    this.baseAuthUrl = `https://${this.domain}/api/v2/oauth/token`;
  }

  async getToken(attempt = 0) {
    const now = Date.now();
    // Reuse the cached token until 60 seconds before it expires
    if (this.token && now < this.tokenExpiry - 60000) {
      return this.token;
    }
    const credentials = Buffer.from(`${this.clientId}:${this.clientSecret}`).toString('base64');
    const payload = new URLSearchParams({
      grant_type: 'client_credentials',
      scope: 'view:interaction view:analytics'
    });
    try {
      const response = await axios.post(this.baseAuthUrl, payload, {
        headers: {
          'Authorization': `Basic ${credentials}`,
          'Content-Type': 'application/x-www-form-urlencoded'
        },
        timeout: 10000
      });
      if (response.status !== 200) {
        throw new Error(`OAuth token request failed with status ${response.status}`);
      }
      this.token = response.data.access_token;
      this.tokenExpiry = now + (response.data.expires_in * 1000);
      return this.token;
    } catch (error) {
      // Honor Retry-After on 429, but stop after three retries instead of recursing forever
      if (error.response?.status === 429 && attempt < 3) {
        const retryAfter = parseInt(error.response.headers['retry-after'] || '5', 10);
        console.warn(`OAuth 429 rate limit hit. Retrying in ${retryAfter}s`);
        await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
        return this.getToken(attempt + 1);
      }
      if (error.response?.status === 401 || error.response?.status === 403) {
        throw new Error('OAuth 401/403: Invalid client credentials or missing scopes');
      }
      if (error.response?.status >= 500) {
        throw new Error('OAuth 5xx: Genesys Cloud authentication service unavailable');
      }
      throw error;
    }
  }
}

class WebSocketManager {
  constructor(domain) {
    this.domain = domain.replace(/^https?:\/\//, '').replace(/\/$/, '');
    this.wsUrl = `wss://${this.domain}/api/v2/websocket`;
    this.ws = null;
    this.subscribeQueue = [];
    this.isProcessingQueue = false;
    this.subscribedInteractions = new Set();
    this.messageHandlers = [];
  }

  async connect(authToken) {
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(this.wsUrl, {
        headers: {
          'Authorization': `Bearer ${authToken}`,
          'User-Agent': 'Genesys-Sentiment-Scorer/1.0'
        }
      });
      this.ws.on('open', () => {
        console.log('WebSocket connection established');
        // Flush any subscriptions that were queued before the socket opened
        this.processQueue();
        resolve();
      });
      this.ws.on('message', (data) => {
        try {
          // Parse once, then fan the message out to every registered handler
          const message = JSON.parse(data.toString());
          this.messageHandlers.forEach(handler => handler(message));
        } catch (error) {
          console.error('Failed to parse WebSocket message:', error.message);
        }
      });
      this.ws.on('error', (error) => {
        console.error('WebSocket connection error:', error.message);
        reject(error);
      });
      this.ws.on('close', (code, reason) => {
        console.warn(`WebSocket closed with code ${code}: ${reason}`);
      });
    });
  }

  onMessage(handler) {
    this.messageHandlers.push(handler);
  }

  async subscribeToInteraction(interactionId) {
    if (this.subscribedInteractions.has(interactionId)) {
      return;
    }
    const subscribePayload = {
      op: 'SUBSCRIBE',
      topic: `v2.interaction.${interactionId}`,
      id: uuidv4()
    };
    this.subscribeQueue.push(subscribePayload);
    if (!this.isProcessingQueue) {
      this.processQueue();
    }
  }

  async processQueue() {
    if (this.isProcessingQueue || this.subscribeQueue.length === 0) {
      return;
    }
    // Leave payloads queued until the socket is open; the 'open' handler flushes them
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      return;
    }
    this.isProcessingQueue = true;
    while (this.subscribeQueue.length > 0) {
      const payload = this.subscribeQueue.shift();
      try {
        this.ws.send(JSON.stringify(payload));
        this.subscribedInteractions.add(payload.topic.split('.').pop());
        console.log(`SUBSCRIBE sent for topic: ${payload.topic}`);
        // Space out SUBSCRIBE operations by 500 ms
        await new Promise(resolve => setTimeout(resolve, 500));
      } catch (error) {
        console.error(`Failed to send SUBSCRIBE for ${payload.topic}:`, error.message);
        this.subscribeQueue.unshift(payload);
        break;
      }
    }
    this.isProcessingQueue = false;
  }

  close() {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.close();
    }
  }
}

class SentimentScorer {
  // Default to an empty config so the class also works when no overrides are passed
  constructor(config = {}) {
    this.polarityThresholds = config.polarityThresholds || { positive: 0.6, neutral: 0.3, negative: -0.3 };
    this.emotionDirectives = config.emotionDirectives || ['joy', 'anger', 'fear', 'sadness', 'disgust', 'surprise'];
    this.supportedLanguages = config.supportedLanguages || ['en', 'es', 'fr', 'de', 'pt'];
    this.maxAnalysisFrequencyMs = config.maxAnalysisFrequencyMs || 2000;
    this.contextWindowMs = config.contextWindowMs || 30000;
    this.lastScoreTimestamps = new Map();
    this.contextWindows = new Map();
  }

  validatePayload(eventData) {
    if (!eventData || !eventData.segments) throw new Error('Invalid interaction payload: missing segments array');
    if (!eventData.language) throw new Error('Invalid interaction payload: missing language identifier');
    if (!this.supportedLanguages.includes(eventData.language)) throw new Error(`Unsupported language for NLP engine: ${eventData.language}`);
  }

  shouldScore(interactionId, timestamp) {
    const lastScore = this.lastScoreTimestamps.get(interactionId) || 0;
    if (timestamp - lastScore < this.maxAnalysisFrequencyMs) return false;
    this.lastScoreTimestamps.set(interactionId, timestamp);
    return true;
  }

  getWindowedContext(interactionId, currentTimestamp) {
    const window = this.contextWindows.get(interactionId) || [];
    const cutoff = currentTimestamp - this.contextWindowMs;
    const filteredWindow = window.filter(seg => seg.timestamp > cutoff);
    this.contextWindows.set(interactionId, filteredWindow);
    return filteredWindow;
  }

  scoreSentiment(eventData, interactionId, timestamp) {
    this.validatePayload(eventData);
    if (!this.shouldScore(interactionId, timestamp)) return null;
    const contextWindow = this.getWindowedContext(interactionId, timestamp);
    const currentSegment = eventData.segments[eventData.segments.length - 1];
    // Stamp the segment with the event time so the window filter can age it out later
    contextWindow.push({ ...currentSegment, timestamp });
    const sentiment = eventData.sentiment || { polarity: 0, confidence: 0, label: 'neutral' };
    const emotion = eventData.emotion || { label: 'unknown', confidence: 0 };
    // Sarcasm heuristic: positive polarity with low confidence alongside a strong joy or surprise signal
    const isPotentialSarcasm = sentiment.confidence < 0.5 &&
      sentiment.polarity > 0.4 &&
      emotion.confidence > 0.8 &&
      (emotion.label === 'joy' || emotion.label === 'surprise');
    const polarityScore = sentiment.polarity >= this.polarityThresholds.positive ? 'positive' :
      sentiment.polarity <= this.polarityThresholds.negative ? 'negative' : 'neutral';
    return {
      interactionId,
      timestamp,
      language: eventData.language,
      polarity: polarityScore,
      polarityRaw: sentiment.polarity,
      confidence: sentiment.confidence,
      emotion: this.emotionDirectives.includes(emotion.label) ? emotion.label : 'uncategorized',
      emotionConfidence: emotion.confidence,
      sarcasmFlag: isPotentialSarcasm,
      contextSegmentCount: contextWindow.length,
      scoredAt: Date.now()
    };
  }
}

class SentimentAuditLogger {
  constructor() {
    this.logs = [];
    this.metrics = { totalEvents: 0, scoredEvents: 0, skippedEvents: 0, averageLatencyMs: 0, latencySamples: [] };
  }

  recordEvent(eventType, payload, processingTimeMs) {
    const logEntry = {
      id: uuidv4(), timestamp: Date.now(), eventType, interactionId: payload.interactionId,
      language: payload.language, polarity: payload.polarity, emotion: payload.emotion,
      sarcasmFlag: payload.sarcasmFlag, processingTimeMs, status: 'success'
    };
    this.logs.push(logEntry);
    this.metrics.totalEvents++;
    // A missing polarity means the event was throttled and not scored
    if (payload.polarity) {
      this.metrics.scoredEvents++;
      this.metrics.latencySamples.push(processingTimeMs);
      this.metrics.averageLatencyMs = this.metrics.latencySamples.reduce((a, b) => a + b, 0) / this.metrics.latencySamples.length;
    } else {
      this.metrics.skippedEvents++;
    }
  }

  getAuditReport() { return { metrics: this.metrics, recentLogs: this.logs.slice(-100) }; }
}

class EventRouter {
  constructor(callbacks) { this.callbacks = callbacks || []; }

  addCallback(handler) {
    if (typeof handler !== 'function') throw new Error('Callback handler must be a function');
    this.callbacks.push(handler);
  }

  async routeEvent(scoringResult) {
    // Promise.resolve().then(...) also captures synchronous throws and non-promise return values
    return Promise.allSettled(this.callbacks.map(callback =>
      Promise.resolve()
        .then(() => callback(scoringResult))
        .catch(e => { console.error(`Callback failed: ${e.message}`); throw e; })
    ));
  }
}

class AgentAssistSentimentScorer {
  constructor(config) {
    this.auth = new GenesysAuth(config.domain, config.clientId, config.clientSecret);
    this.wsManager = new WebSocketManager(config.domain);
    // Fall back to an empty object so the scorer defaults apply when scoringConfig is omitted
    this.scorer = new SentimentScorer(config.scoringConfig || {});
    this.logger = new SentimentAuditLogger();
    this.router = new EventRouter(config.externalCallbacks);
    this.isRunning = false;
  }

  async start() {
    console.log('Initializing Agent Assist Sentiment Scorer...');
    const token = await this.auth.getToken();
    await this.wsManager.connect(token);
    this.wsManager.onMessage(async (message) => {
      // Ignore anything that is not an interaction event
      if (message.type !== 'v2.interaction') return;
      const startMs = Date.now();
      try {
        const interactionId = message.data.id;
        const scoringResult = this.scorer.scoreSentiment(message.data, interactionId, Date.now());
        const processingTime = Date.now() - startMs;
        // Throttled events are still logged, with a null polarity
        this.logger.recordEvent('sentiment_score', scoringResult || { interactionId, polarity: null }, processingTime);
        if (scoringResult) {
          await this.router.routeEvent(scoringResult);
          console.log(`Scored interaction ${interactionId}: ${scoringResult.polarity} (${scoringResult.emotion})`);
        }
      } catch (error) {
        // Validation failures land here and are logged without stopping the scorer
        console.error(`Scoring pipeline error: ${error.message}`);
      }
    });
    this.isRunning = true;
    console.log('Sentiment scorer is running');
  }

  async stop() {
    this.isRunning = false;
    this.wsManager.close();
    console.log('Sentiment scorer stopped');
    return this.logger.getAuditReport();
  }

  getMetrics() { return this.logger.getAuditReport(); }
}

module.exports = { AgentAssistSentimentScorer };
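The exported class takes a plain config object. One way to build that object from environment variables is sketched below. The variable names `GENESYS_DOMAIN`, `GENESYS_CLIENT_ID`, and `GENESYS_CLIENT_SECRET`, the `loadConfig` helper, and the `./sentiment-scorer` filename are all assumptions for illustration.

```javascript
// Hypothetical config loader; the environment variable names are assumptions.
function loadConfig(env = process.env) {
  const required = ['GENESYS_DOMAIN', 'GENESYS_CLIENT_ID', 'GENESYS_CLIENT_SECRET'];
  const missing = required.filter(name => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }
  return {
    domain: env.GENESYS_DOMAIN,
    clientId: env.GENESYS_CLIENT_ID,
    clientSecret: env.GENESYS_CLIENT_SECRET,
    // Same values as the scorer defaults, listed here to show where overrides go
    scoringConfig: { maxAnalysisFrequencyMs: 2000, contextWindowMs: 30000 },
    // Example callback; a real one might forward the result to a coaching platform
    externalCallbacks: [async result => console.log('scored', result.interactionId)]
  };
}

// Usage (assumes the module above is saved as ./sentiment-scorer.js):
// const { AgentAssistSentimentScorer } = require('./sentiment-scorer');
// new AgentAssistSentimentScorer(loadConfig()).start().catch(console.error);
```

Failing fast on missing variables surfaces misconfiguration before any network call is made.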
Common Errors & Debugging
Error: WebSocket 401 Unauthorized
- What causes it: The Bearer token expired or lacks the view:interaction scope. The WebSocket API validates the token at connection time and does not support automatic token refresh.
- How to fix it: Implement token expiration tracking. Reconnect the WebSocket when the token expires. Ensure the OAuth client has both view:interaction and view:analytics scopes.
- Code showing the fix: The GenesysAuth.getToken() method caches the token and refreshes it 60 seconds before expiry. The AgentAssistSentimentScorer class must be updated to listen for WebSocket close events and reconnect with a fresh token.
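The reconnect step described above can be sketched as a small helper. It takes the token getter and the connect function as arguments, so it does not depend on the tutorial's classes. `reconnectWithFreshToken` and its options are hypothetical names, and `connect(token)` is assumed to resolve once the socket is open and to reject on failure.

```javascript
// Hypothetical reconnect helper with a linearly increasing delay between attempts.
// Assumes connect(token) resolves when the socket is open and rejects on failure.
async function reconnectWithFreshToken(getToken, connect, { maxAttempts = 5, baseDelayMs = 1000, sleep } = {}) {
  const wait = sleep || (ms => new Promise(resolve => setTimeout(resolve, ms)));
  let lastError;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const token = await getToken(); // returns a cached token if it is still valid
      await connect(token);
      return attempt; // number of attempts it took
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) await wait(baseDelayMs * attempt);
    }
  }
  throw lastError;
}
```

With the tutorial's classes, this could be called from the `close` handler as `reconnectWithFreshToken(() => auth.getToken(), token => wsManager.connect(token))`. Subscriptions would also need to be re-sent after reconnecting, because `subscribedInteractions` still contains the old IDs and `subscribeToInteraction()` would otherwise skip them.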
Error: NLP Engine Constraint Violation (429 Too Many Requests)
- What causes it: Exceeding the maximum analysis frequency limit per interaction or sending SUBSCRIBE operations faster than the platform allows.
- How to fix it: Enforce a minimum delay between subscription requests. Implement a scoring frequency filter that skips analysis if the last score for an interaction occurred within the threshold window.
- Code showing the fix: The SentimentScorer.shouldScore() method checks maxAnalysisFrequencyMs. The WebSocketManager.processQueue() method enforces a 500ms delay between SUBSCRIBE operations.
Error: Unsupported Language or Missing Segments
- What causes it: The interaction payload lacks transcription segments or uses a language not enabled in the Speech Analytics configuration.
- How to fix it: Validate the language field against the supportedLanguages array before scoring. Filter out interactions without segments or sentiment data.
- Code showing the fix: The SentimentScorer.validatePayload() method throws explicit errors for missing segments or unsupported languages. The event handler catches these errors and logs them without crashing the scorer.
Error: Callback Execution Timeout
- What causes it: External coaching platforms respond slowly or hang. Because the message handler awaits routeEvent(), a callback that never settles stalls routing for that event, and pending promises accumulate.
- How to fix it: Use Promise.allSettled to isolate callback failures. Implement request timeouts for external HTTP calls.
- Code showing the fix: The EventRouter.routeEvent() method uses Promise.allSettled and catches individual callback errors, ensuring one failing endpoint does not prevent the other callbacks from receiving the scoring result.
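Promise.allSettled isolates failures but still waits for every callback to settle, so a hung callback needs an explicit deadline. The sketch below races each callback against a timer. `withTimeout`, `routeWithTimeout`, and the 3000 ms default are hypothetical choices for illustration.

```javascript
// Hypothetical wrapper: rejects if `promise` does not settle within `ms` milliseconds.
// The underlying work is not cancelled; only the wait for it is abandoned.
function withTimeout(promise, ms, label = 'callback') {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms);
  });
  // Clear the timer either way so it does not keep the process alive
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Routes one scoring result to every callback, each with its own deadline.
async function routeWithTimeout(callbacks, scoringResult, ms = 3000) {
  return Promise.allSettled(
    callbacks.map(callback =>
      // Promise.resolve().then(...) turns synchronous throws into rejections
      withTimeout(Promise.resolve().then(() => callback(scoringResult)), ms)
    )
  );
}
```

For HTTP callbacks made with axios, setting the request `timeout` option as well would abort the request itself, which this wrapper alone does not do.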