Balancing NICE CXone Routing Queues with Node.js Using Least-Connections and BullMQ

What You Will Build

  • A Node.js service that polls CXone routing queue metrics, calculates optimal split percentages using a least-connections algorithm, and applies configuration updates with ETag conflict resolution.
  • Uses the CXone Routing API (/api/v2/routing/queues/{queueId} and /api/v2/routing/queues/{queueId}/metrics) with OAuth 2.0 client credentials.
  • Built with Node.js using axios, bullmq, and standard library modules.

Prerequisites

  • CXone OAuth Client ID and Secret with scopes: routing:queue:read, routing:queue:write
  • CXone API base URL (typically https://api.cxone.com or environment-specific)
  • Node.js 18+
  • Redis instance running on localhost:6379 (required for BullMQ)
  • Dependencies: npm install axios bullmq dotenv
  • Environment variables in .env:
    • CXONE_BASE_URL
    • CXONE_CLIENT_ID
    • CXONE_CLIENT_SECRET
    • QUEUE_IDS (comma-separated list of CXone queue IDs to balance)
    • DASHBOARD_WEBHOOK_URL (optional, for exporting decisions)
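Reading these variables once at startup and failing fast makes a missing secret obvious before any API call is attempted. This is a minimal sketch that assumes the variable names listed above. loadConfig is a hypothetical helper, and CXONE_BASE_URL falls back to the same default the later code uses:

```javascript
// Hypothetical startup helper: validates the .env variables listed above and
// throws one error that names every missing required variable.
function loadConfig(env = process.env) {
  const required = ['CXONE_CLIENT_ID', 'CXONE_CLIENT_SECRET', 'QUEUE_IDS'];
  const missing = required.filter(name => !env[name] || !env[name].trim());
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }

  return {
    // Same default as the code below; trailing slashes are dropped so that
    // `${baseUrl}${path}` never produces a double slash.
    baseUrl: (env.CXONE_BASE_URL || 'https://api.cxone.com').replace(/\/+$/, ''),
    clientId: env.CXONE_CLIENT_ID,
    clientSecret: env.CXONE_CLIENT_SECRET,
    // Trim each ID so "q1, q2" yields ["q1", "q2"], not ["q1", " q2"].
    queueIds: env.QUEUE_IDS.split(',').map(id => id.trim()).filter(Boolean),
    dashboardWebhookUrl: env.DASHBOARD_WEBHOOK_URL || null
  };
}
```

Call it once after require('dotenv').config(), and let the thrown error stop the process.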

Authentication Setup

CXone uses OAuth 2.0 client credentials flow. The service must cache the access token and refresh it before expiration. The token response includes an expires_in field measured in seconds. The code below implements a TTL cache with a sixty-second safety buffer to prevent edge-case expiration during active polling.

const axios = require('axios');
require('dotenv').config();

const CXONE_BASE = process.env.CXONE_BASE_URL || 'https://api.cxone.com';
const CLIENT_ID = process.env.CXONE_CLIENT_ID;
const CLIENT_SECRET = process.env.CXONE_CLIENT_SECRET;

let accessToken = null;
let tokenExpiry = 0;

async function getOAuthToken() {
  if (accessToken && Date.now() < tokenExpiry) {
    return accessToken;
  }

  const payload = new URLSearchParams();
  payload.append('grant_type', 'client_credentials');
  payload.append('client_id', CLIENT_ID);
  payload.append('client_secret', CLIENT_SECRET);

  const response = await axios.post(`${CXONE_BASE}/oauth/token`, payload, {
    headers: { 'Content-Type': 'application/x-www-form-urlencoded' }
  });

  accessToken = response.data.access_token;
  tokenExpiry = Date.now() + (response.data.expires_in * 1000) - 60000;
  return accessToken;
}

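async function cxoneRequest(method, path, body = null, extraHeaders = {}) {
  const token = await getOAuthToken();
  // axios.request takes method, URL, body, and headers in one config object.
  // axios.get(url, config) takes config as its second argument, so
  // axios[method](url, body, config) would drop the headers on GET requests.
  return axios.request({
    method,
    url: `${CXONE_BASE}${path}`,
    data: body ?? undefined,
    headers: {
      'Authorization': `Bearer ${token}`,
      'Content-Type': 'application/json',
      'Accept': 'application/json',
      ...extraHeaders // e.g. { 'If-Match': etag } for conditional updates
    }
  });
}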
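The module-level cache above works for a single sequential caller. On a cold cache, however, fetchQueueMetrics fires several requests at once, and each one can POST to /oauth/token. The sketch below keeps the same sixty-second-buffer TTL cache and also shares one in-flight refresh between concurrent callers. createTokenCache is a hypothetical helper, and the token fetcher and clock are injected so it can be tested without a network:

```javascript
// Hypothetical helper: a TTL token cache with the same 60-second safety
// buffer as getOAuthToken, plus de-duplication of concurrent refreshes.
// fetchToken must resolve to an OAuth token response: { access_token, expires_in }.
function createTokenCache(fetchToken, { bufferMs = 60000, now = Date.now } = {}) {
  let token = null;
  let expiresAt = 0;
  let inFlight = null; // promise for a refresh that is already running

  return function getToken() {
    if (token && now() < expiresAt) return Promise.resolve(token);
    if (!inFlight) {
      inFlight = Promise.resolve()
        .then(fetchToken)
        .then(({ access_token, expires_in }) => {
          token = access_token;
          expiresAt = now() + expires_in * 1000 - bufferMs;
          return token;
        })
        // Clear the shared promise so the next expiry (or a retry after a
        // failed refresh) starts a new request.
        .finally(() => { inFlight = null; });
    }
    return inFlight;
  };
}
```

In the service, fetchToken would wrap the axios.post to the /oauth/token endpoint shown above and return response.data.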

Implementation

Step 1: Poll Queue Metrics for Real-Time Load

The CXone Routing API exposes current queue statistics at /api/v2/routing/queues/{queueId}/metrics. This endpoint returns waiting, in-progress, and abandoned counts. The least-connections calculation needs the total active connections (in_progress + waiting) for each queue. The code fetches metrics for all target queues concurrently, tags each result with its queue ID, and drops any queue whose request failed, so one unreachable queue does not block the whole cycle.

async function fetchQueueMetrics(queueIds) {
  const metricsRequests = queueIds.map(id => 
    cxoneRequest('get', `/api/v2/routing/queues/${id}/metrics`)
      .then(res => ({ id, metrics: res.data }))
      .catch(err => {
        console.error(`Failed to fetch metrics for ${id}: ${err.message}`);
        return { id, metrics: null, error: err.message };
      })
  );

  const results = await Promise.all(metricsRequests);
  return results.filter(r => r.metrics !== null);
}

Required OAuth scope: routing:queue:read

Expected response body:

{
  "waiting": 8,
  "in_progress": 24,
  "abandoned": 2,
  "total": 34
}
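Promise.all in fetchQueueMetrics starts every request at the same moment, so a long QUEUE_IDS list turns into a burst against the tenant quota. The sketch below is a small concurrency limiter that could replace the bare queueIds.map. mapWithLimit and the limit value are hypothetical and are not part of axios or the CXone API:

```javascript
// Hypothetical helper: like Promise.all(items.map(fn)), but at most `limit`
// calls to fn are in flight at any moment. Results keep the input order.
async function mapWithLimit(items, limit, fn) {
  const results = new Array(items.length);
  let next = 0; // index of the next item to start; safe because JS is single-threaded

  async function worker() {
    while (next < items.length) {
      const index = next++;
      results[index] = await fn(items[index], index);
    }
  }

  const workerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: workerCount }, worker));
  return results;
}
```

Inside fetchQueueMetrics, you would pass the same per-queue callback (the request plus its .then and .catch) as fn, with a limit such as 4. fn must catch its own errors, just as the original callback does, so that one failure does not reject the whole batch.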

Step 2: Calculate Load Distribution Using Least-Connections

Classic least-connections sends each new contact to whichever queue currently has the fewest active sessions. A routing split, however, is expressed as percentages rather than per-contact decisions, so this service approximates least-connections with inverse-load weighting. A queue with c active connections (in_progress + waiting) gets weight 1 / (c + 1). Lightly loaded queues therefore receive a larger share, and the + 1 avoids dividing by zero for an idle queue. Every queue first receives a minimum floor, which keeps even the busiest queue from being drained of traffic entirely. The remaining points are shared in proportion to the weights, and largest-remainder rounding makes the integer percentages sum to exactly one hundred.

function calculateSplitPercentages(queueMetrics, minPercentage = 10) {
  const n = queueMetrics.length;
  if (n === 0) return [];
  if (minPercentage * n > 100) {
    throw new Error(`A ${minPercentage}% floor cannot be met for ${n} queues`);
  }

  // Inverse load weight for each queue: fewer active connections -> larger weight
  const weights = queueMetrics.map(q => {
    const activeConnections = (q.metrics.in_progress || 0) + (q.metrics.waiting || 0);
    return { id: q.id, weight: 1 / (activeConnections + 1) };
  });

  const totalWeight = weights.reduce((sum, w) => sum + w.weight, 0);

  // Reserve the floor for every queue, then share the remaining points in
  // proportion to weight, rounding each exact share down for now
  const distributable = 100 - minPercentage * n;
  const shares = weights.map(w => {
    const exact = (w.weight / totalWeight) * distributable;
    return {
      id: w.id,
      percentage: minPercentage + Math.floor(exact),
      fraction: exact - Math.floor(exact)
    };
  });

  // Largest-remainder rounding: give the points lost to Math.floor back to
  // the queues with the largest fractional parts, so the total is exactly 100
  let leftover = 100 - shares.reduce((sum, s) => sum + s.percentage, 0);
  [...shares]
    .sort((a, b) => b.fraction - a.fraction)
    .forEach(s => {
      if (leftover > 0) {
        s.percentage += 1;
        leftover -= 1;
      }
    });

  return shares.map(({ id, percentage }) => ({ id, percentage }));
}
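A bad plan is applied to live routing, so a cheap check before enqueueing any update is worthwhile. For example, raising two 5% queues to a 10% floor without taking points from the other queues turns a 90/5/5 plan into 90/10/10, which totals 110. The sketch below is a hypothetical validateSplits guard that reports every problem it finds:

```javascript
// Hypothetical guard: returns a list of problems with a split plan, or an
// empty array if every entry is an integer >= minPercentage, queue IDs are
// unique, and the percentages add up to exactly 100.
function validateSplits(splits, minPercentage = 10) {
  const problems = [];
  const seen = new Set();
  let total = 0;

  for (const { id, percentage } of splits) {
    if (seen.has(id)) problems.push(`duplicate queue ${id}`);
    seen.add(id);
    if (!Number.isInteger(percentage)) {
      problems.push(`${id}: ${percentage} is not an integer`);
    } else if (percentage < minPercentage) {
      problems.push(`${id}: ${percentage} is below the ${minPercentage}% floor`);
    }
    total += percentage;
  }

  if (total !== 100) problems.push(`percentages sum to ${total}, not 100`);
  return problems;
}
```

In the polling loop, you would compute the splits, call validateSplits, and skip the cycle with a logged error whenever the returned array is non-empty.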

Step 3: Apply Updates with ETag Conflict Resolution

CXone queue configuration lives at /api/v2/routing/queues/{queueId}. The API supports optimistic concurrency control via ETag headers. The service must fetch the current queue configuration, extract the ETag, modify the routing.split array, and send the update with If-Match: <etag>. If another process modified the queue between fetch and update, the API returns 412 Precondition Failed.

async function updateQueueSplit(queueId, newPercentage, currentEtag) {
  // Fetch current queue configuration to preserve existing structure
  const queueResponse = await cxoneRequest('get', `/api/v2/routing/queues/${queueId}`);
  const queueConfig = queueResponse.data;
  const serverEtag = queueResponse.headers['etag'];

  // Conflict resolution: abort if ETag changed since last read
  if (currentEtag && serverEtag !== currentEtag) {
    throw new Error(`ETag mismatch for queue ${queueId}. Expected ${currentEtag}, got ${serverEtag}`);
  }

  // Ensure routing.split exists
  if (!queueConfig.routing) queueConfig.routing = {};
  if (!Array.isArray(queueConfig.routing.split)) queueConfig.routing.split = [];

  // Update or create the split entry for this queue
  const splitIndex = queueConfig.routing.split.findIndex(s => s.destination?.id === queueId);
  if (splitIndex >= 0) {
    queueConfig.routing.split[splitIndex].percentage = newPercentage;
  } else {
    queueConfig.routing.split.push({
      destination: { id: queueId, type: 'queue' },
      percentage: newPercentage
    });
  }

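  // Apply PUT with If-Match, passed through cxoneRequest's extraHeaders
  // parameter; the server answers 412 if the queue changed after the GET above
  const updateResponse = await cxoneRequest('put', `/api/v2/routing/queues/${queueId}`, queueConfig, {
    'If-Match': serverEtag
  });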

  return {
    queueId,
    newPercentage,
    updatedEtag: updateResponse.headers['etag'],
    success: true
  };
}

Required OAuth scope: routing:queue:write
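The If-Match contract is easier to see in isolation. The sketch below is a hypothetical in-memory stand-in, not the CXone API. It derives a strong ETag from the stored body and accepts a write only when If-Match equals the current ETag. Otherwise it answers 412, which is how the queue endpoint is described as behaving above:

```javascript
const { createHash } = require('node:crypto');

// Deep copy for JSON-shaped bodies, so callers never share state with the store.
const clone = value => JSON.parse(JSON.stringify(value));

// Hypothetical in-memory resource that mimics HTTP ETag/If-Match semantics.
// It illustrates the protocol only; it does not talk to CXone.
class EtagResource {
  constructor(initialBody) {
    this.body = clone(initialBody);
  }

  // Strong ETag derived from the serialized body (quoted, as HTTP requires).
  currentEtag() {
    return `"${createHash('sha1').update(JSON.stringify(this.body)).digest('hex')}"`;
  }

  get() {
    return { status: 200, etag: this.currentEtag(), body: clone(this.body) };
  }

  put(newBody, ifMatch) {
    // Reject the write if the caller's copy is stale: 412 Precondition Failed.
    if (ifMatch !== this.currentEtag()) return { status: 412 };
    this.body = clone(newBody);
    return { status: 200, etag: this.currentEtag() };
  }
}
```

Two readers that GET the same version both hold the same ETag. Whichever PUTs first wins, and the second receives 412 until it reads the new version.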

Step 4: Handle API Quota Exhaustion with BullMQ and Export Decisions

CXone enforces API quotas per tenant. When the service receives a 429 Too Many Requests response, it should defer the update rather than fail immediately. BullMQ supplies a Redis-backed job queue, so pending updates are persisted outside the Node.js process. The worker processes each update job, catches 429 responses, and retries in-process with exponential backoff (2, 4, 8, ... seconds). Completed decisions are appended to a local log file and, if DASHBOARD_WEBHOOK_URL is set, POSTed to a dashboard webhook.

const { Queue, Worker } = require('bullmq');
const fs = require('fs');
const path = require('path');

const updateQueue = new Queue('cxone_queue_updates', { connection: { host: 'localhost', port: 6379 } });

async function processQueueUpdate(job) {
  const { queueId, newPercentage, currentEtag } = job.data;
  let attempts = 0;
  const maxAttempts = 5;

  while (attempts < maxAttempts) {
    try {
      const result = await updateQueueSplit(queueId, newPercentage, currentEtag);
      
      // Export decision to monitoring dashboard
      const decisionLog = {
        timestamp: new Date().toISOString(),
        queueId,
        newPercentage,
        previousEtag: currentEtag,
        updatedEtag: result.updatedEtag,
        status: 'SUCCESS'
      };
      
      appendDecisionLog(decisionLog);
      await postToDashboard(decisionLog);
      
      return result;
    } catch (error) {
      // axios attaches the HTTP response to the error for non-2xx statuses
      const status = error.response?.status;

      if (status === 429) {
        attempts++;
        if (attempts >= maxAttempts) break; // no point sleeping after the final attempt
        const delay = Math.pow(2, attempts) * 1000;
        console.warn(`429 Quota exceeded for ${queueId}. Retrying in ${delay}ms (attempt ${attempts}/${maxAttempts})`);
        await new Promise(resolve => setTimeout(resolve, delay));
        continue;
      }

      if (status === 412) {
        // Terminal for this job: throwing fails the job instead of reporting
        // success, and the next polling cycle reads fresh state
        console.error(`412 Conflict for ${queueId}. Another process modified the queue.`);
        throw new Error(`Concurrency conflict for queue ${queueId}`);
      }

      throw error;
    }
  }
  throw new Error(`Max retry attempts reached for queue ${queueId}`);
}

function appendDecisionLog(decision) {
  // JSON Lines (one object per line): each decision is a single append,
  // instead of re-reading and rewriting an ever-growing JSON array
  const logPath = path.join(__dirname, 'load_balancing_decisions.jsonl');
  fs.appendFileSync(logPath, JSON.stringify(decision) + '\n');
}

async function postToDashboard(decision) {
  const webhookUrl = process.env.DASHBOARD_WEBHOOK_URL;
  if (!webhookUrl) return;
  
  try {
    await axios.post(webhookUrl, decision, { timeout: 5000 });
  } catch (err) {
    console.warn(`Dashboard export failed: ${err.message}`);
  }
}
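The worker's fixed 2^n-second schedule ignores any hint from the server, and it makes every deferred job wake up at the same moment. The sketch below is an alternative delay calculation that swaps in two common techniques. It honors a standard HTTP Retry-After header (either delay-seconds or an HTTP-date, per RFC 9110) when one is present. Otherwise it uses capped exponential backoff with full jitter. Whether CXone sends Retry-After on 429 responses is an assumption to verify, and computeRetryDelay and its defaults are hypothetical:

```javascript
// Hypothetical helper: milliseconds to wait before retrying after a 429.
// attempt: 1 for the first retry, 2 for the second, and so on.
// retryAfter: the raw Retry-After header value, or undefined if absent.
function computeRetryDelay(attempt, retryAfter, {
  baseMs = 1000,
  capMs = 32000,
  now = Date.now,
  random = Math.random
} = {}) {
  if (typeof retryAfter === 'string' && retryAfter.trim() !== '') {
    const value = retryAfter.trim();
    // Form 1: delay-seconds, e.g. "5"
    if (/^\d+$/.test(value)) return Number(value) * 1000;
    // Form 2: HTTP-date, e.g. "Wed, 21 Oct 2015 07:28:10 GMT"
    const at = Date.parse(value);
    if (!Number.isNaN(at)) return Math.max(at - now(), 0);
  }
  // No usable hint: full jitter, uniform in [0, min(cap, base * 2^attempt)).
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}
```

In processQueueUpdate, the header would come from error.response.headers['retry-after'] (axios lowercases header names), and attempts would be passed as the attempt argument.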

Complete Working Example

The following script combines authentication, metric polling, the split calculation, BullMQ job enqueueing, and worker processing. Fill in the .env file with your own values before running it.

const axios = require('axios');
const { Queue, Worker } = require('bullmq');
const fs = require('fs');
const path = require('path');
require('dotenv').config();

const CXONE_BASE = process.env.CXONE_BASE_URL || 'https://api.cxone.com';
const CLIENT_ID = process.env.CXONE_CLIENT_ID;
const CLIENT_SECRET = process.env.CXONE_CLIENT_SECRET;
const QUEUE_IDS = (process.env.QUEUE_IDS || '').split(',').map(id => id.trim()).filter(Boolean);

let accessToken = null;
let tokenExpiry = 0;

async function getOAuthToken() {
  if (accessToken && Date.now() < tokenExpiry) return accessToken;
  const payload = new URLSearchParams();
  payload.append('grant_type', 'client_credentials');
  payload.append('client_id', CLIENT_ID);
  payload.append('client_secret', CLIENT_SECRET);
  
  const response = await axios.post(`${CXONE_BASE}/oauth/token`, payload, {
    headers: { 'Content-Type': 'application/x-www-form-urlencoded' }
  });
  
  accessToken = response.data.access_token;
  tokenExpiry = Date.now() + (response.data.expires_in * 1000) - 60000;
  return accessToken;
}

async function cxoneRequest(method, path, body = null, extraHeaders = {}) {
  const token = await getOAuthToken();
  // One config object works for every method. axios.get(url, config) takes
  // config second, so axios[method](url, body, config) drops headers on GET.
  return axios.request({
    method,
    url: `${CXONE_BASE}${path}`,
    data: body ?? undefined,
    headers: {
      'Authorization': `Bearer ${token}`,
      'Content-Type': 'application/json',
      'Accept': 'application/json',
      ...extraHeaders
    }
  });
}

async function fetchQueueMetrics(queueIds) {
  const results = await Promise.all(queueIds.map(async (id) => {
    try {
      const res = await cxoneRequest('get', `/api/v2/routing/queues/${id}/metrics`);
      return { id, metrics: res.data };
    } catch (err) {
      console.error(`Metrics fetch failed for ${id}: ${err.message}`);
      return null;
    }
  }));
  return results.filter(Boolean);
}

function calculateSplitPercentages(queueMetrics, minPercentage = 10) {
  const n = queueMetrics.length;
  if (n === 0) return [];
  if (minPercentage * n > 100) throw new Error(`A ${minPercentage}% floor cannot be met for ${n} queues`);

  const weights = queueMetrics.map(q => {
    const active = (q.metrics.in_progress || 0) + (q.metrics.waiting || 0);
    return { id: q.id, weight: 1 / (active + 1) };
  });
  const totalWeight = weights.reduce((sum, w) => sum + w.weight, 0);

  // Floor for every queue; remaining points shared by weight (rounded down)
  const distributable = 100 - minPercentage * n;
  const shares = weights.map(w => {
    const exact = (w.weight / totalWeight) * distributable;
    return { id: w.id, percentage: minPercentage + Math.floor(exact), fraction: exact - Math.floor(exact) };
  });

  // Largest-remainder rounding so the percentages sum to exactly 100
  let leftover = 100 - shares.reduce((sum, s) => sum + s.percentage, 0);
  [...shares].sort((a, b) => b.fraction - a.fraction).forEach(s => {
    if (leftover > 0) { s.percentage += 1; leftover -= 1; }
  });

  return shares.map(({ id, percentage }) => ({ id, percentage }));
}

async function updateQueueSplit(queueId, newPercentage, currentEtag) {
  const queueResponse = await cxoneRequest('get', `/api/v2/routing/queues/${queueId}`);
  const queueConfig = queueResponse.data;
  const serverEtag = queueResponse.headers['etag'];
  
  if (currentEtag && serverEtag !== currentEtag) {
    throw new Error(`ETag mismatch for ${queueId}`);
  }
  
  if (!queueConfig.routing) queueConfig.routing = {};
  if (!Array.isArray(queueConfig.routing.split)) queueConfig.routing.split = [];
  
  const idx = queueConfig.routing.split.findIndex(s => s.destination?.id === queueId);
  if (idx >= 0) {
    queueConfig.routing.split[idx].percentage = newPercentage;
  } else {
    queueConfig.routing.split.push({ destination: { id: queueId, type: 'queue' }, percentage: newPercentage });
  }
  
  const updateResponse = await cxoneRequest('put', `/api/v2/routing/queues/${queueId}`, queueConfig, {
    'If-Match': serverEtag
  });
  
  return { queueId, newPercentage, updatedEtag: updateResponse.headers['etag'] };
}

const updateJobQueue = new Queue('cxone_queue_updates', { connection: { host: 'localhost', port: 6379 } });

async function processQueueUpdate(job) {
  const { queueId, newPercentage, currentEtag } = job.data;
  let attempts = 0;
  
  while (attempts < 5) {
    try {
      const result = await updateQueueSplit(queueId, newPercentage, currentEtag);
      const decision = { timestamp: new Date().toISOString(), queueId, newPercentage, status: 'SUCCESS' };
      fs.appendFileSync('decisions.log', JSON.stringify(decision) + '\n');
      return result;
    } catch (error) {
      const status = error.response?.status;
      if (status === 429) {
        attempts++;
        await new Promise(r => setTimeout(r, Math.pow(2, attempts) * 1000));
        continue;
      }
      if (status === 412) throw new Error('Concurrency conflict detected');
      throw error;
    }
  }
  throw new Error('Max retries exceeded');
}

const worker = new Worker('cxone_queue_updates', processQueueUpdate, { connection: { host: 'localhost', port: 6379 } });

async function runBalancer() {
  console.log('Starting CXone Queue Balancer...');
  
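  setInterval(async () => {
    if (QUEUE_IDS.length === 0) return;

    // An async setInterval callback has no caller to catch its rejections, so
    // handle errors here (e.g. Redis unavailable) instead of crashing the process
    try {
      const metrics = await fetchQueueMetrics(QUEUE_IDS);
      if (metrics.length === 0) return;

      const splits = calculateSplitPercentages(metrics);

      for (const split of splits) {
        // 429 retries already happen inside processQueueUpdate; leaving BullMQ's
        // attempts at its default of 1 avoids stacking a second retry layer
        // on top (up to 5 x 5 = 25 tries per job) and keeps 412 terminal.
        await updateJobQueue.add('update_split', {
          queueId: split.id,
          newPercentage: split.percentage,
          currentEtag: null
        }, { removeOnComplete: true });
      }

      console.log(`Evaluated splits at ${new Date().toISOString()}`);
    } catch (err) {
      console.error(`Balancing cycle failed: ${err.message}`);
    }
  }, 15000);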
}

runBalancer().catch(console.error);
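setInterval does not wait for an async callback. If one evaluation cycle takes longer than 15 seconds (slow metrics calls, Redis latency), the next cycle starts on top of it and can enqueue a second set of updates for the same queues. The sketch below is a hypothetical scheduleNonOverlapping helper that chains setTimeout instead, so a new cycle starts only after the previous one settles:

```javascript
// Hypothetical helper: runs `task` roughly every `intervalMs`, but never starts
// a new run until the previous one has settled. Returns a stop() function.
function scheduleNonOverlapping(task, intervalMs, onError = console.error) {
  let timer = null;
  let stopped = false;

  async function tick() {
    try {
      await task();
    } catch (err) {
      onError(err); // keep the schedule alive after a failed cycle
    }
    // Schedule the next run only after this one has finished.
    if (!stopped) timer = setTimeout(tick, intervalMs);
  }

  timer = setTimeout(tick, intervalMs);
  return function stop() {
    stopped = true;
    clearTimeout(timer);
  };
}
```

runBalancer could move the body of its interval callback into a function (for example, a hypothetical evaluateOnce) and call scheduleNonOverlapping(evaluateOnce, 15000), keeping the returned stop function for shutdown.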

Common Errors & Debugging

Error: 401 Unauthorized

  • What causes it: The OAuth token has expired, the client credentials are incorrect, or the token endpoint returned an invalid payload.
  • How to fix it: Verify CXONE_CLIENT_ID and CXONE_CLIENT_SECRET in the environment. Ensure the token cache refreshes before expires_in elapses. The provided code subtracts sixty seconds from the expiry timestamp to prevent mid-request expiration.
  • Code showing the fix: The getOAuthToken function checks Date.now() < tokenExpiry and forces a fresh POST to /oauth/token when the threshold is crossed.
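The expiry buffer covers tokens that age out, but a token can also be rejected early (for example, after the client secret is rotated). One common pattern is to clear the cached token and retry exactly once on 401, so a genuinely bad credential still fails fast. It is sketched here as a hypothetical withAuthRetry wrapper:

```javascript
// Hypothetical wrapper: runs send(); on an HTTP 401 it clears the cached token
// and runs send() exactly once more. Any other error, or a second 401, propagates.
async function withAuthRetry(send, invalidateToken) {
  try {
    return await send();
  } catch (err) {
    if (err?.response?.status !== 401) throw err;
    invalidateToken();
    return send();
  }
}
```

With the module-level cache from the Authentication Setup section, invalidateToken would be () => { accessToken = null; tokenExpiry = 0; }, and send would be a closure around the cxoneRequest call.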

Error: 403 Forbidden

  • What causes it: The OAuth client lacks the required scopes. Queue metrics require routing:queue:read. Queue configuration updates require routing:queue:write.
  • How to fix it: Navigate to the CXone Admin Console, locate the OAuth client, and add both scopes to its configuration. Then restart the Node.js service: until its next refresh, the running process keeps a cached token that still carries the old scopes.
  • Code showing the fix: No code change is required, because scopes are granted at the CXone tenant level. Without a restart, the service picks up the new scopes automatically at the next token refresh.
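You can confirm what was actually granted by inspecting the token response. RFC 6749 allows an authorization server to include a space-delimited scope string in the token response. Whether CXone includes it is an assumption, and when the field is absent the response carries no scope information. The hypothetical missingScopes helper below therefore returns null in that case:

```javascript
// Hypothetical diagnostic: compares the `scope` field of an OAuth 2.0 token
// response (space-delimited, per RFC 6749) with the scopes this service needs.
// Returns the missing scopes, or null when the response has no scope field.
function missingScopes(tokenResponse, required = ['routing:queue:read', 'routing:queue:write']) {
  if (typeof tokenResponse?.scope !== 'string') return null;
  const granted = new Set(tokenResponse.scope.split(/\s+/).filter(Boolean));
  return required.filter(scope => !granted.has(scope));
}
```

Calling it with response.data inside getOAuthToken and logging a warning when it returns a non-empty array surfaces a 403 cause at startup instead of on the first write.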

Error: 412 Precondition Failed

  • What causes it: The ETag header sent in If-Match does not match the current server version. Another admin or automation modified the queue between the GET and PUT requests.
  • How to fix it: Do not resend the same PUT. Treat 412 as a terminal failure for that job and log the conflict. The next polling cycle then fetches the latest configuration, recalculates the split, and submits a fresh update with the new ETag.
  • Code showing the fix: updateQueueSplit does not catch the error, so axios rejects the PUT with error.response.status === 412. processQueueUpdate rethrows it as a concurrency-conflict error, which marks the job as failed and leaves the next polling cycle to fetch fresh state.

Error: 429 Too Many Requests

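  • What causes it: The API quota has been exhausted. CXone enforces rate limits per client and per tenant, and request volume grows with the number of queues. Each cycle of the complete example issues one metrics GET per queue plus a GET and a PUT for every queue it updates, so N queues can cost up to 3N requests every 15 seconds.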
  • How to fix it: Defer updates using BullMQ with exponential backoff. The worker sleeps for 2^n seconds between retries. Reduce polling frequency if quota exhaustion persists.
  • Code showing the fix: The processQueueUpdate worker contains a while (attempts < 5) loop that catches status === 429, increments the attempt counter, and delays execution before retrying.
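Beyond backoff, the cheapest request is the one never sent. The balancer enqueues a GET and a PUT for every queue on every cycle, even when the new plan matches the last one. The sketch below is a hypothetical planChanged check that skips a cycle unless some queue moved by at least minDelta points. It decides for the whole plan at once, because updating only the queues that moved could leave the stored percentages summing to something other than 100:

```javascript
// Hypothetical check: returns true if the new plan should be applied.
// Plans are arrays of { id, percentage }. The decision covers the whole plan so
// that applied percentages always come from a single plan that sums to 100.
function planChanged(previousPlan, nextPlan, minDelta = 2) {
  if (!previousPlan || previousPlan.length !== nextPlan.length) return true;
  const before = new Map(previousPlan.map(s => [s.id, s.percentage]));
  return nextPlan.some(s => {
    const old = before.get(s.id);
    return old === undefined || Math.abs(s.percentage - old) >= minDelta;
  });
}
```

To use it, keep the last successfully applied plan in memory and enqueue jobs only when planChanged(lastPlan, splits) returns true.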

Official References