Streaming NICE CXone LLM Responses with Node.js via Server-Sent Events
What You Will Build
- A Node.js service that establishes a server-sent events connection to the NICE CXone Generative AI gateway, parses incremental token chunks, aggregates them into complete responses, and exposes a client-facing streaming endpoint.
- This tutorial uses the CXone REST API with Accept: text/event-stream headers and native Node.js fetch streaming capabilities.
- The implementation covers JavaScript/Node.js (v18+).
Prerequisites
- OAuth 2.0 Client Credentials grant with scope ai:generative:write
- CXone API version: v2
- Node.js runtime: v18.0.0 or later
- External dependencies: express, dotenv
Authentication Setup
CXone uses standard OAuth 2.0 client credentials flow. The token manager below caches the access token, validates expiration, implements exponential backoff for HTTP 429 rate limits, and handles token refresh transparently.
// Uses the global fetch built into Node.js v18+, so no HTTP client import is required.
/**
 * @typedef {Object} TokenResponse
 * @property {string} access_token
 * @property {number} expires_in
 * @property {string} token_type
 */
export class CXoneTokenManager {
  /**
   * @param {string} clientId
   * @param {string} clientSecret
   * @param {string} tenant
   */
  constructor(clientId, clientSecret, tenant) {
    this.clientId = clientId;
    this.clientSecret = clientSecret;
    this.baseUrl = `https://${tenant}.cxonecloud.com/api/oauth/token`;
    this.token = null;
    this.expiresAt = 0;
  }

  /**
   * @returns {Promise<string>}
   */
  async getToken() {
    const now = Date.now();
    // Reuse the cached token unless it expires within the next 60 seconds.
    if (this.token && now < this.expiresAt - 60000) {
      return this.token;
    }
    const body = new URLSearchParams({
      grant_type: 'client_credentials',
      client_id: this.clientId,
      client_secret: this.clientSecret,
      scope: 'ai:generative:write'
    });
    let retries = 0;
    const maxRetries = 3;
    while (retries <= maxRetries) {
      try {
        const response = await fetch(this.baseUrl, {
          method: 'POST',
          headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
          body: body
        });
        if (response.status === 429) {
          // Exponential backoff: 1 s, 2 s, 4 s, 8 s.
          const delay = Math.pow(2, retries) * 1000;
          await new Promise(resolve => setTimeout(resolve, delay));
          retries++;
          continue;
        }
        if (!response.ok) {
          const errorBody = await response.text();
          throw new Error(`OAuth token fetch failed with status ${response.status}: ${errorBody}`);
        }
        /** @type {TokenResponse} */
        const data = await response.json();
        this.token = data.access_token;
        this.expiresAt = now + (data.expires_in * 1000);
        return this.token;
      } catch (error) {
        if (retries === maxRetries) {
          throw new Error(`Failed to acquire CXone token after ${maxRetries} retries: ${error.message}`);
        }
        retries++;
      }
    }
    throw new Error('Token acquisition loop exited unexpectedly');
  }
}
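The caching rule inside getToken() can be isolated as a pure function, which makes the 60-second safety margin easy to reason about and test. The following is a minimal sketch; isTokenFresh and computeExpiry are hypothetical helper names used for illustration and are not part of the token manager above or of any CXone SDK.

```javascript
// Hypothetical helper mirroring the check in getToken(): a cached token is
// reused only if it is still valid at least `skewMs` milliseconds from `now`.
function isTokenFresh(token, expiresAt, now = Date.now(), skewMs = 60000) {
  return Boolean(token) && now < expiresAt - skewMs;
}

// Converts an OAuth `expires_in` value (seconds) into an absolute epoch-ms deadline.
function computeExpiry(issuedAtMs, expiresInSeconds) {
  return issuedAtMs + expiresInSeconds * 1000;
}

const issuedAt = 1000000;
const expiresAt = computeExpiry(issuedAt, 3600); // 4600000

console.log(isTokenFresh('abc', expiresAt, issuedAt));          // true
console.log(isTokenFresh('abc', expiresAt, expiresAt - 30000)); // false: inside the 60 s margin
console.log(isTokenFresh(null, expiresAt, issuedAt));           // false: nothing cached yet
```

Passing now explicitly keeps the function deterministic, so the margin can be verified without waiting for a real token to age.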
Implementation
Step 1: Dynamic System Prompt Injection & Request Construction
The CXone Generative AI endpoint accepts a structured message array. You must inject the system prompt dynamically based on session context before initiating the stream. The request body matches the CXone OpenAPI specification for generative completions.
/**
 * @typedef {Object} LLMRequest
 * @property {Array<{role: string, content: string}>} messages
 * @property {boolean} stream
 * @property {string} model
 * @property {number} max_tokens
 */
/**
 * Constructs the CXone LLM request payload with dynamic system prompt injection.
 * @param {Object} sessionContext - Session metadata containing user history and intent
 * @returns {LLMRequest}
 */
export function buildLLMRequest(sessionContext) {
  const systemPrompt = `You are an enterprise support agent. Current user intent: ${sessionContext.intent}. Previous interactions: ${sessionContext.history.join(' | ')}. Respond concisely and maintain professional tone.`;
  return {
    model: 'cxone-llm-v1',
    stream: true,
    max_tokens: 1024,
    messages: [
      { role: 'system', content: systemPrompt },
      { role: 'user', content: sessionContext.userMessage }
    ]
  };
}
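Because the session history is interpolated directly into the system prompt, it can be worth bounding its size before calling buildLLMRequest. The sketch below is one possible approach under assumed limits; clampHistory, maxItems, and maxCharsPerItem are hypothetical names and values, not CXone requirements.

```javascript
// Hypothetical helper: keeps only the most recent history entries, collapses
// whitespace, truncates each entry, and drops entries that end up empty.
function clampHistory(history, maxItems = 5, maxCharsPerItem = 200) {
  return history
    .slice(-maxItems) // most recent entries sit at the end of the array
    .map((entry) => String(entry).replace(/\s+/g, ' ').trim().slice(0, maxCharsPerItem))
    .filter((entry) => entry.length > 0);
}

const history = ['  reset   password ', '', 'billing question', 'a'.repeat(300)];
console.log(clampHistory(history, 3, 10)); // [ 'billing qu', 'aaaaaaaaaa' ]
```

The returned array can be passed as sessionContext.history, which keeps the system prompt within a predictable length regardless of how much history the caller supplies.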
Step 2: SSE Connection & Incremental Token Parsing
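Establish the streaming connection using fetch. The response body is a ReadableStream of bytes. Read each chunk, decode it, and parse standard SSE framing (data: {...}). Chunk boundaries do not necessarily align with SSE line boundaries, so a trailing partial line must be held back until the next chunk arrives. The parser extracts delta content and tracks connection state.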
/**
 * @typedef {'idle'|'streaming'|'interrupted'|'complete'} StreamState
 */
/**
 * Parses a raw SSE chunk and extracts token deltas and usage metrics.
 * An incomplete trailing line is kept in streamContext.buffer until the next chunk.
 * @param {string} chunk
 * @param {{ state: StreamState, tokens: string, usage: {prompt_tokens: number, completion_tokens: number}, buffer?: string }} streamContext
 */
export function parseSSEChunk(chunk, streamContext) {
  // Prepend the partial line left over from the previous chunk, because a
  // network read can end in the middle of an SSE line and its JSON payload.
  const lines = ((streamContext.buffer || '') + chunk).split('\n');
  // The last element is '' (chunk ended on a newline) or an incomplete line.
  streamContext.buffer = lines.pop();
  for (const rawLine of lines) {
    const line = rawLine.replace(/\r$/, ''); // tolerate CRLF line endings
    if (line.startsWith('data: ')) {
      const jsonStr = line.slice(6);
      if (jsonStr === '[DONE]') {
        streamContext.state = 'complete';
        continue;
      }
      try {
        const payload = JSON.parse(jsonStr);
        if (payload.choices?.[0]?.delta?.content) {
          streamContext.tokens += payload.choices[0].delta.content;
          streamContext.state = 'streaming';
        }
        if (payload.usage) {
          streamContext.usage.prompt_tokens = payload.usage.prompt_tokens || streamContext.usage.prompt_tokens;
          streamContext.usage.completion_tokens = payload.usage.completion_tokens || streamContext.usage.completion_tokens;
        }
        if (payload.error) {
          streamContext.state = 'interrupted';
          throw new Error(`LLM API Error: ${payload.error.message}`);
        }
      } catch (parseError) {
        if (parseError.message.includes('LLM API Error')) throw parseError;
        // Ignore malformed SSE control lines
      }
    }
  }
}
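Network reads can end in the middle of an SSE line, so it helps to see line buffering in isolation. The sketch below is a standalone illustration, not the tutorial's parser; createSSEDataReader is a hypothetical name, and the payload shape simply reuses the choices[0].delta.content structure that parseSSEChunk reads.

```javascript
// Hypothetical helper: returns a function that accepts decoded text chunks and
// yields only the payloads of complete `data:` lines, holding back any partial line.
function createSSEDataReader() {
  let buffer = '';
  return function read(chunk) {
    buffer += chunk;
    const lines = buffer.split('\n');
    buffer = lines.pop(); // '' or an incomplete trailing line
    return lines
      .map((line) => line.replace(/\r$/, '')) // tolerate CRLF line endings
      .filter((line) => line.startsWith('data:'))
      .map((line) => line.slice(5).trimStart());
  };
}

const read = createSSEDataReader();
// One JSON event split across two network chunks, followed by the terminator.
const first = read('data: {"choices":[{"delta":{"con');
const second = read('tent":"Hi"}}]}\n\ndata: [DONE]\n\n');

console.log(first);  // []
console.log(second); // [ '{"choices":[{"delta":{"content":"Hi"}}]}', '[DONE]' ]
console.log(JSON.parse(second[0]).choices[0].delta.content); // Hi
```

The first call returns nothing because the line has no terminating newline yet; the JSON is only handed to JSON.parse once the second chunk completes it.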
Step 3: Aggregation, Timeout, & Connection State Management
Wrap the stream reader in a timeout controller and connection state tracker. The AbortController handles incomplete responses, while the state machine manages interruptions and successful completion.
/**
 * Streams LLM tokens with timeout, state tracking, and aggregation.
 * @param {string} accessToken
 * @param {LLMRequest} payload
 * @param {number} timeoutMs
 * @param {Function} onToken - Callback fired for each aggregated token batch
 * @param {Function} onUsage - Callback fired when usage metrics arrive
 * @param {Function} onError - Callback fired on interruption or timeout
 */
export async function streamLLMResponse(accessToken, payload, timeoutMs, onToken, onUsage, onError) {
  const abortController = new AbortController();
  const signal = abortController.signal;
  let timeoutId;
  const streamContext = {
    state: 'idle',
    tokens: '',
    usage: { prompt_tokens: 0, completion_tokens: 0 }
  };
  try {
    const response = await fetch('https://your-tenant.cxonecloud.com/api/v2/ai/generative/completions', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${accessToken}`,
        'Accept': 'text/event-stream'
      },
      body: JSON.stringify(payload),
      signal: signal
    });
    if (response.status === 401 || response.status === 403) {
      throw new Error(`Unauthorized or forbidden: ${response.status}`);
    }
    if (response.status === 429) {
      throw new Error('Rate limit exceeded. Implement retry logic at the caller level.');
    }
    if (!response.ok) {
      throw new Error(`LLM request failed with status ${response.status}`);
    }
    if (!response.body) {
      throw new Error('Response body is null');
    }
    streamContext.state = 'streaming';
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    // Abort the stream if it has not finished within timeoutMs.
    timeoutId = setTimeout(() => {
      streamContext.state = 'interrupted';
      abortController.abort();
      onError(new Error('Streaming timeout exceeded'));
    }, timeoutMs);
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        streamContext.state = 'complete';
        break;
      }
      const chunk = decoder.decode(value, { stream: true });
      parseSSEChunk(chunk, streamContext);
      if (streamContext.state === 'interrupted') {
        break;
      }
      if (streamContext.tokens.length > 0) {
        onToken(streamContext.tokens);
      }
      if (streamContext.usage.completion_tokens > 0) {
        onUsage(streamContext.usage);
      }
    }
  } catch (error) {
    // An AbortError means the timeout handler already reported the failure.
    if (error.name !== 'AbortError') {
      onError(error);
    }
  } finally {
    clearTimeout(timeoutId);
  }
}
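The timer in streamLLMResponse is a total deadline for the stream. A common variant is an idle timeout that restarts whenever a chunk arrives, so long but healthy generations are not cut off. The sketch below illustrates that variant under stated assumptions; createIdleWatchdog and its injectable timers parameter are hypothetical and are not part of the function above.

```javascript
// Hypothetical helper: aborts `controller` when `arm()` has not been called
// again within `idleMs`. The `timers` argument exists only to make the logic
// testable; it defaults to the global timer functions.
function createIdleWatchdog(controller, idleMs, timers = { setTimeout, clearTimeout }) {
  let handle = null;
  const stop = () => {
    if (handle !== null) {
      timers.clearTimeout(handle);
      handle = null;
    }
  };
  const arm = () => {
    stop(); // restart the clock
    handle = timers.setTimeout(() => {
      controller.abort(new Error(`No data received for ${idleMs} ms`));
    }, idleMs);
  };
  return { arm, stop };
}

const controller = new AbortController();
const watchdog = createIdleWatchdog(controller, 50);
controller.signal.addEventListener('abort', () => {
  console.log(controller.signal.reason.message);
});
watchdog.arm();                       // start the clock before awaiting the first chunk
setTimeout(() => watchdog.arm(), 20); // simulate a chunk arriving, which restarts the clock
```

In a reader loop, arm() would be called before the first reader.read() and again after every chunk, and stop() once the stream completes or fails.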
Step 4: Express Streaming Endpoint for Client-Side Rendering
Expose the streaming capability via an HTTP endpoint. This endpoint sets the SSE response headers and forwards the aggregated text and usage metrics to the browser or downstream service as SSE events.
import express from 'express';
const app = express();
app.use(express.json());
/**
 * @param {CXoneTokenManager} tokenManager
 */
export function registerStreamingEndpoint(tokenManager) {
  app.get('/api/stream-llm', async (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();
    // Write an SSE event only while the response is still open. onError may
    // already have ended it, and writing after end() raises a stream error.
    // Express responses have no flush() method unless compression middleware
    // adds one, so events are sent with res.write() alone.
    const send = (data) => {
      if (!res.writableEnded) res.write(`data: ${data}\n\n`);
    };
    const finish = () => {
      if (!res.writableEnded) res.end();
    };
    try {
      const accessToken = await tokenManager.getToken();
      const payload = buildLLMRequest({
        intent: req.query.intent || 'general_support',
        history: req.query.history ? req.query.history.split(',') : [],
        userMessage: req.query.message || 'Hello'
      });
      const timeoutMs = parseInt(req.query.timeout || '30000', 10);
      await streamLLMResponse(
        accessToken,
        payload,
        timeoutMs,
        // onToken: receives the full text aggregated so far
        (fullText) => send(JSON.stringify({ delta: fullText })),
        // onUsage
        (usage) => send(JSON.stringify({ usage })),
        // onError
        (error) => {
          send(JSON.stringify({ error: error.message }));
          finish();
        }
      );
      send('[DONE]');
      finish();
    } catch (error) {
      send(JSON.stringify({ error: error.message }));
      finish();
    }
  });
}
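The endpoint writes each event by hand as a data: line followed by a blank line. A small formatter keeps that framing in one place and also covers payloads that contain newlines, which the SSE wire format requires to be split across several data: lines. This is a minimal sketch; formatSSE and its options are hypothetical and are not used by the code above.

```javascript
// Hypothetical helper: serializes one SSE event. Each line of the payload gets
// its own `data:` prefix, and a blank line terminates the event.
function formatSSE(data, { event, id } = {}) {
  const text = typeof data === 'string' ? data : JSON.stringify(data);
  let frame = '';
  if (event) frame += `event: ${event}\n`;
  if (id !== undefined) frame += `id: ${id}\n`;
  for (const line of text.split(/\r?\n/)) {
    frame += `data: ${line}\n`;
  }
  return frame + '\n';
}

// JSON.stringify is used here only to make the newlines visible in the output.
console.log(JSON.stringify(formatSSE({ delta: 'Hi' })));
// "data: {\"delta\":\"Hi\"}\n\n"
console.log(JSON.stringify(formatSSE('line one\nline two', { event: 'note' })));
// "event: note\ndata: line one\ndata: line two\n\n"
```

With such a helper, a handler could call res.write(formatSSE({ delta: fullText })) instead of assembling the frame inline.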
Complete Working Example
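The following entry point integrates authentication, request construction, SSE parsing, state management, and the Express endpoint into a runnable service. It imports the token manager from tokenManager.js and the Step 1 and Step 2 helpers from streamHandler.js, then inlines the reader loop from Step 3 in the route handler.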
import express from 'express';
import dotenv from 'dotenv';
import { CXoneTokenManager } from './tokenManager.js';
import { buildLLMRequest, parseSSEChunk } from './streamHandler.js';

dotenv.config();

const app = express();
const PORT = process.env.PORT || 3000;
const tokenManager = new CXoneTokenManager(
  process.env.CXONE_CLIENT_ID,
  process.env.CXONE_CLIENT_SECRET,
  process.env.CXONE_TENANT
);

// Route handler with the Step 3 reader loop inlined
app.get('/api/stream-llm', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders();
  try {
    const accessToken = await tokenManager.getToken();
    const payload = buildLLMRequest({
      intent: req.query.intent || 'general_support',
      history: req.query.history ? req.query.history.split(',') : [],
      userMessage: req.query.message || 'Explain server-sent events'
    });
    const timeoutMs = parseInt(req.query.timeout || '30000', 10);
    const streamContext = {
      state: 'idle',
      tokens: '',
      usage: { prompt_tokens: 0, completion_tokens: 0 }
    };
    const response = await fetch(`https://${process.env.CXONE_TENANT}.cxonecloud.com/api/v2/ai/generative/completions`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${accessToken}`,
        'Accept': 'text/event-stream'
      },
      body: JSON.stringify(payload)
    });
    if (!response.ok) {
      res.write(`data: ${JSON.stringify({ error: `HTTP ${response.status}` })}\n\n`);
      res.end();
      return;
    }
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      const chunk = decoder.decode(value, { stream: true });
      parseSSEChunk(chunk, streamContext);
      if (streamContext.state === 'interrupted') {
        res.write(`data: ${JSON.stringify({ error: 'Stream interrupted' })}\n\n`);
        break;
      }
      if (streamContext.tokens.length > 0) {
        res.write(`data: ${JSON.stringify({ delta: streamContext.tokens })}\n\n`);
      }
      if (streamContext.usage.completion_tokens > 0) {
        res.write(`data: ${JSON.stringify({ usage: streamContext.usage })}\n\n`);
      }
    }
    res.write('data: [DONE]\n\n');
    res.end();
  } catch (error) {
    res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
    res.end();
  }
});

app.listen(PORT, () => {
  console.log(`LLM streaming gateway running on port ${PORT}`);
});
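The service reads its credentials from environment variables, and a missing value otherwise only surfaces later as a failed token request. Validating the configuration at startup is one way to fail fast. The sketch below is illustrative; loadConfig is a hypothetical helper, and the variable names are the ones the example above already reads.

```javascript
// Hypothetical helper: validates the environment variables used by the service
// and returns them as a plain configuration object.
function loadConfig(env = process.env) {
  const required = ['CXONE_CLIENT_ID', 'CXONE_CLIENT_SECRET', 'CXONE_TENANT'];
  const missing = required.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }
  const port = Number.parseInt(env.PORT || '3000', 10);
  if (Number.isNaN(port)) {
    throw new Error(`PORT must be an integer, got "${env.PORT}"`);
  }
  return {
    clientId: env.CXONE_CLIENT_ID,
    clientSecret: env.CXONE_CLIENT_SECRET,
    tenant: env.CXONE_TENANT,
    port
  };
}

// 'acme' is a placeholder tenant used only for this demonstration.
try {
  loadConfig({ CXONE_TENANT: 'acme' });
} catch (error) {
  console.log(error.message);
  // Missing required environment variables: CXONE_CLIENT_ID, CXONE_CLIENT_SECRET
}
```

Calling loadConfig() once after dotenv.config() and passing its fields to the CXoneTokenManager constructor would replace the direct process.env reads.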
Common Errors & Debugging
Error: HTTP 401 Unauthorized
- Cause: The OAuth token expired, the client credentials are invalid, or the scope ai:generative:write is missing from the client configuration.
- Fix: Verify the tenant domain matches the token endpoint. Ensure the CXone admin console grants the ai:generative:write scope to the OAuth client. Implement token refresh before expiration as shown in the CXoneTokenManager.
- Code: The getToken() method checks this.expiresAt - 60000 to proactively refresh tokens before they expire.
Error: HTTP 429 Too Many Requests
- Cause: The CXone Generative AI gateway enforces rate limits per tenant or per OAuth client. Concurrent streaming requests exceed the quota.
- Fix: Implement exponential backoff with jitter. The token manager already retries 429 responses with exponential backoff, but without jitter. Apply the jittered pattern below to the LLM streaming request as well if you queue multiple concurrent streams.
- Code:
if (response.status === 429) {
  const delay = Math.pow(2, retries) * 1000 + Math.random() * 500;
  await new Promise(resolve => setTimeout(resolve, delay));
  retries++;
  continue;
}
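The snippet above adds up to 500 ms of random jitter on top of the exponential delay. Another common variant is "full jitter" with a cap, optionally combined with the standard HTTP Retry-After header. The sketch below shows that variant rather than the snippet's exact formula; backoffDelay and retryAfterMs are hypothetical helpers, and whether the CXone gateway sends Retry-After is an assumption that should be checked against real responses.

```javascript
// Hypothetical helper: "full jitter" backoff. The delay is drawn uniformly from
// [0, min(maxMs, baseMs * 2^attempt)), which spreads out concurrent retries.
// `random` is injectable so the result can be made deterministic in tests.
function backoffDelay(attempt, { baseMs = 1000, maxMs = 30000, random = Math.random } = {}) {
  const ceiling = Math.min(maxMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}

// Hypothetical helper: parses a Retry-After header expressed in seconds.
// Returns null when the header is absent or not a non-negative number
// (the HTTP-date form is deliberately not handled in this sketch).
function retryAfterMs(headerValue) {
  if (headerValue === null || headerValue === undefined || headerValue === '') return null;
  const seconds = Number(headerValue);
  return Number.isFinite(seconds) && seconds >= 0 ? seconds * 1000 : null;
}

const fixed = { random: () => 0.5 }; // deterministic stand-in for Math.random
console.log(backoffDelay(0, fixed));  // 500
console.log(backoffDelay(3, fixed));  // 4000
console.log(backoffDelay(10, fixed)); // 15000 (ceiling capped at maxMs)
console.log(retryAfterMs('5'));       // 5000
console.log(retryAfterMs(null));      // null
```

A retry loop could prefer retryAfterMs(response.headers.get('retry-after')) when it returns a number and fall back to backoffDelay(retries) otherwise.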
Error: Connection Reset or Stream Interruption
- Cause: Network instability, proxy timeouts, or CXone gateway dropping idle connections during long generation tasks.
- Fix: Use AbortController to enforce client-side timeouts. Track streamContext.state to detect interruptions. Implement a reconnection mechanism that resumes from the last aggregated token if the API supports conversation continuation.
- Code: The streamLLMResponse function wraps the reader loop with setTimeout and abortController.abort() to cleanly terminate hanging streams.
Error: Malformed JSON in SSE Data Lines
- Cause: CXone streams control events, empty lines, or partial JSON chunks when network buffers split payloads.
- Fix: Accumulate raw chunks before parsing. Use a buffer string to ensure complete JSON objects are parsed. The parseSSEChunk function safely wraps JSON.parse in a try-catch and ignores malformed control lines.
- Code:
try {
  const payload = JSON.parse(jsonStr);
  // process payload
} catch (parseError) {
  if (parseError.message.includes('LLM API Error')) throw parseError;
  // Ignore malformed SSE control lines
}