Streaming Genesys Cloud Analytics Notification Events to Kafka via Node.js
What You Will Build
- A Node.js service that subscribes to Genesys Cloud Analytics Notification events and forwards them to a specific Apache Kafka topic.
- This implementation uses the Genesys Cloud REST API for subscription management and the Node.js SDK for event consumption.
- The code is written in TypeScript/JavaScript using
axios,purecloud-platform-client-v2, andkafkajs.
Prerequisites
- OAuth Client Type: A Genesys Cloud OAuth client with
publicorconfidentialtype. - Required Scopes:
analytics:read,notification:subscribe. - SDK Version:
@genesyscloud/purecloud-platform-client-v2v2.0.0 or later. - Runtime: Node.js 18+ (for native
fetchsupport, thoughaxiosis used here for robustness). - External Dependencies:
axiosfor HTTP requests.@genesyscloud/purecloud-platform-client-v2for SDK operations.kafkajsfor Kafka producer integration.dotenvfor environment variable management.
Install dependencies with:
npm install axios @genesyscloud/purecloud-platform-client-v2 kafkajs dotenv
Authentication Setup
Genesys Cloud uses OAuth 2.0. For a background service, the Client Credentials flow is the standard. You must store your Client ID and Client Secret securely.
The following function handles token acquisition and basic caching. In production, implement an in-memory cache with TTL expiration rather than fetching a new token on every request.
// auth.js
const axios = require('axios');
const GENESYS_CLOUD_REGION = process.env.GENESYS_CLOUD_REGION || 'us-east-1';
const CLIENT_ID = process.env.GENESYS_CLOUD_CLIENT_ID;
const CLIENT_SECRET = process.env.GENESYS_CLOUD_CLIENT_SECRET;
const BASE_URL = `https://${GENESYS_CLOUD_REGION}.mypurecloud.com`;
const TOKEN_URL = `${BASE_URL}/oauth/token`;
let accessToken = null;
let tokenExpiry = 0;
async function getAccessToken() {
// Return cached token if valid
if (accessToken && Date.now() < tokenExpiry) {
return accessToken;
}
try {
const response = await axios.post(
TOKEN_URL,
new URLSearchParams({
grant_type: 'client_credentials',
client_id: CLIENT_ID,
client_secret: CLIENT_SECRET,
}),
{
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
},
}
);
accessToken = response.data.access_token;
// Expire 60 seconds before actual expiry to allow for race conditions
tokenExpiry = Date.now() + (response.data.expires_in - 60) * 1000;
return accessToken;
} catch (error) {
console.error('Failed to obtain OAuth token:', error.response?.data || error.message);
throw new Error('Authentication failed');
}
}
module.exports = { getAccessToken, BASE_URL };
Implementation
Step 1: Create the Analytics Notification Subscription
Before consuming events, you must register a subscription with the Genesys Cloud Notification API. This returns a subscriptionId and potentially a callbackUrl if you were using HTTP callbacks. Since we are using the SDK to poll/consume directly, we focus on the subscription creation.
We will create a subscription for analytics.conversation events. Note that the resourceTypes array determines which specific analytics data triggers the event.
// subscription.js
const axios = require('axios');
const { getAccessToken, BASE_URL } = require('./auth');
async function createAnalyticsSubscription() {
const token = await getAccessToken();
const SUBSCRIPTIONS_URL = `${BASE_URL}/api/v2/analytics/notifications/subscriptions`;
const payload = {
name: 'kafka-streaming-sub',
resourceTypes: [
'analytics.conversation.summary' // Triggers on completed conversations
],
filters: [
{
type: 'conversation',
filters: [
{
field: 'direction',
operator: 'eq',
values: ['inbound'] // Only inbound calls/chats
}
]
}
],
// Note: For direct SDK consumption, callbackUrl is often omitted or used for webhooks.
// The SDK method 'getNotificationsSubscriptionEvents' pulls from this subscription.
callbackUrl: null
};
try {
const response = await axios.post(SUBSCRIPTIONS_URL, payload, {
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json'
}
});
console.log('Subscription created:', response.data);
return response.data.id;
} catch (error) {
// 409 Conflict means subscription might already exist with same name/filters
if (error.response?.status === 409) {
console.warn('Subscription may already exist. Check existing subscriptions.');
return null;
}
throw error;
}
}
module.exports = { createAnalyticsSubscription };
OAuth Scope Required: analytics:read
Expected Response Body:
{
"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"name": "kafka-streaming-sub",
"resourceTypes": ["analytics.conversation.summary"],
"filters": [...],
"callbackUrl": null,
"status": "active"
}
Step 2: Initialize Kafka Producer
We need a robust Kafka producer that can handle asynchronous writes without blocking the event loop. We will use kafkajs.
// kafkaProducer.js
const { Kafka } = require('kafkajs');
const KAFKA_BROKERS = process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'];
const KAFKA_TOPIC = process.env.KAFKA_TOPIC || 'genesys-analytics-events';
const kafka = new Kafka({
clientId: 'genesys-consumer',
brokers: KAFKA_BROKERS
});
const producer = kafka.producer();
async function initKafka() {
console.log('Connecting to Kafka...');
await producer.connect();
console.log('Kafka producer connected.');
}
async function sendEventToKafka(eventData) {
try {
await producer.send({
topic: KAFKA_TOPIC,
messages: [
{
// Key can be used for partitioning strategy, e.g., conversation ID
key: eventData.conversationId || 'default',
value: JSON.stringify(eventData)
}
]
});
// Log success at debug level to avoid noise
// console.log('Event sent to Kafka:', eventData.conversationId);
} catch (error) {
console.error('Failed to send event to Kafka:', error.message);
// In production, consider implementing a retry mechanism or dead-letter queue here
}
}
module.exports = { initKafka, sendEventToKafka };
Step 3: Consume Events via Genesys Cloud SDK
The core logic involves using the Genesys Cloud Node.js SDK to poll for new notifications. The SDK provides a method getNotificationsSubscriptionEvents which returns a list of events since the last poll or based on a cursor.
Critical Note: The SDK’s notification consumer is long-polling by nature when used correctly, but the REST API endpoint /api/v2/analytics/notifications/events is typically polled. To avoid tight loops, we implement an exponential backoff or a fixed delay between polls.
// eventConsumer.js
const { PureCloudPlatformClientV2 } = require('@genesyscloud/purecloud-platform-client-v2');
const { getAccessToken } = require('./auth');
const { sendEventToKafka } = require('./kafkaProducer');
const platformClient = new PureCloudPlatformClientV2();
const analyticsApi = new PureCloudPlatformClientV2.AnalyticsApi();
const SUBSCRIPTION_ID = process.env.GENESYS_SUBSCRIPTION_ID; // Must be set from Step 1
const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '10000', 10); // 10 seconds default
let lastCursor = null;
async function startConsuming() {
if (!SUBSCRIPTION_ID) {
throw new Error('GENESYS_SUBSCRIPTION_ID is not set');
}
console.log(`Starting event consumer for subscription: ${SUBSCRIPTION_ID}`);
while (true) {
try {
const token = await getAccessToken();
platformClient.setAuthOptions({ token });
// Parameters for getNotificationsSubscriptionEvents
const queryParams = {
subscriptionId: SUBSCRIPTION_ID,
// cursor: lastCursor, // Optional: Use cursor for precise continuity
// count: 100 // Max events to fetch per call
};
// Fetch events
const response = await analyticsApi.getNotificationsSubscriptionEvents(queryParams);
if (response.body && response.body.events && response.body.events.length > 0) {
console.log(`Fetched ${response.body.events.length} events`);
// Update cursor for next poll to ensure no duplicates/misses
lastCursor = response.body.cursor;
// Process each event
for (const event of response.body.events) {
await processEvent(event);
}
} else {
// No events, wait for next poll
// console.log('No new events.');
}
} catch (error) {
console.error('Error polling for events:', error.response?.data || error.message);
// Handle specific HTTP errors
if (error.response?.status === 401) {
console.error('Token expired or invalid. Refreshing...');
// The next iteration will fetch a new token via getAccessToken()
} else if (error.response?.status === 429) {
console.warn('Rate limited. Waiting 60 seconds before retry.');
await new Promise(resolve => setTimeout(resolve, 60000));
}
}
// Wait before next poll
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS));
}
}
async function processEvent(event) {
// Event structure varies by resourceType.
// For analytics.conversation.summary, the data is in event.data
const eventData = event.data;
// Transform data if necessary before sending to Kafka
const kafkaPayload = {
eventType: event.resourceType,
timestamp: event.timestamp,
conversationId: eventData.conversationId,
summary: eventData.summary,
rawEvent: event // Include raw event for debugging if needed
};
// Send to Kafka
await sendEventToKafka(kafkaPayload);
}
module.exports = { startConsuming };
OAuth Scope Required: notification:subscribe
Expected Response Body (from SDK):
{
"events": [
{
"resourceType": "analytics.conversation.summary",
"timestamp": "2023-10-27T10:00:00.000Z",
"data": {
"conversationId": "12345678-1234-1234-1234-123456789012",
"summary": {
"totalHandleTime": 12000,
"totalTalkTime": 10000,
"totalHoldTime": 2000
}
}
}
],
"cursor": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9..."
}
Complete Working Example
Create a file named index.js to orchestrate the entire flow.
// index.js
require('dotenv').config();
const { createAnalyticsSubscription } = require('./subscription');
const { initKafka } = require('./kafkaProducer');
const { startConsuming } = require('./eventConsumer');
async function main() {
console.log('Initializing Genesys Cloud Analytics to Kafka Streamer...');
try {
// 1. Initialize Kafka Connection
await initKafka();
// 2. Create Subscription if not already done manually
// Note: In production, you likely create the subscription once via CLI or admin script.
// Here we attempt to create it to ensure it exists.
const subId = await createAnalyticsSubscription();
if (subId) {
console.log(`Using Subscription ID: ${subId}`);
// If you created it dynamically, you must set the env var or pass it to the consumer
// For this example, we assume the env var GENESYS_SUBSCRIPTION_ID is set to the known ID
}
// 3. Start Consuming Events
await startConsuming();
} catch (error) {
console.error('Fatal error in main process:', error);
process.exit(1);
}
}
// Handle graceful shutdown
process.on('SIGINT', () => {
console.log('Shutting down...');
process.exit(0);
});
main();
Create a .env file in the root directory:
# Genesys Cloud Credentials
GENESYS_CLOUD_CLIENT_ID=your_client_id
GENESYS_CLOUD_CLIENT_SECRET=your_client_secret
GENESYS_CLOUD_REGION=us-east-1
# Subscription ID (Create this once via Step 1 and paste the ID here)
GENESYS_SUBSCRIPTION_ID=a1b2c3d4-e5f6-7890-abcd-ef1234567890
# Kafka Configuration
KAFKA_BROKERS=localhost:9092
KAFKA_TOPIC=genesys-analytics-events
# Polling
POLL_INTERVAL_MS=10000
Run the application:
node index.js
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token has expired or was never successfully retrieved.
- Fix: Ensure
GENESYS_CLOUD_CLIENT_IDandGENESYS_CLOUD_CLIENT_SECRETare correct. Check that the client has theanalytics:readandnotification:subscribescopes assigned in the Genesys Cloud Admin Console under Platform > OAuth. - Code Fix: The
getAccessTokenfunction automatically retries. If it fails consistently, log the raw response body to check for specific error codes likeinvalid_client.
Error: 403 Forbidden
- Cause: The OAuth client lacks the necessary permissions, or the user associated with the client (if using user-based auth) does not have access to Analytics.
- Fix: Verify the OAuth Client Permissions in Genesys Cloud. Ensure the “Analytics” permission set includes “Read Analytics” and “Subscribe to Notifications”.
Error: 429 Too Many Requests
- Cause: Polling too frequently. Genesys Cloud enforces rate limits on the Notification API.
- Fix: Increase
POLL_INTERVAL_MS. Do not poll faster than 5 seconds. The code above implements a 60-second backoff on 429 errors.
Error: Kafka Producer Connection Timeout
- Cause: The Kafka broker is unreachable or the
KAFKA_BROKERSenvironment variable is incorrect. - Fix: Verify network connectivity to the Kafka brokers. Ensure the
kafkajsclient can resolve the hostnames.
Error: Event Data is Empty
- Cause: No conversations match the filters defined in the subscription.
- Fix: Check the
filtersincreateAnalyticsSubscription. Ensure you are generating traffic that matches the criteria (e.g., inbound calls).