Streaming Genesys Cloud Analytics Notification Events to Kafka via Node.js

Streaming Genesys Cloud Analytics Notification Events to Kafka via Node.js

What You Will Build

  • A Node.js service that subscribes to Genesys Cloud Analytics Notification events and forwards them to a specific Apache Kafka topic.
  • This implementation uses the Genesys Cloud REST API for subscription management and the Node.js SDK for event consumption.
  • The code is written in TypeScript/JavaScript using axios, purecloud-platform-client-v2, and kafkajs.

Prerequisites

  • OAuth Client Type: A Genesys Cloud OAuth client with public or confidential type.
  • Required Scopes: analytics:read, notification:subscribe.
  • SDK Version: @genesyscloud/purecloud-platform-client-v2 v2.0.0 or later.
  • Runtime: Node.js 18+ (for native fetch support, though axios is used here for robustness).
  • External Dependencies:
    • axios for HTTP requests.
    • @genesyscloud/purecloud-platform-client-v2 for SDK operations.
    • kafkajs for Kafka producer integration.
    • dotenv for environment variable management.

Install dependencies with:

npm install axios @genesyscloud/purecloud-platform-client-v2 kafkajs dotenv

Authentication Setup

Genesys Cloud uses OAuth 2.0. For a background service, the Client Credentials flow is the standard. You must store your Client ID and Client Secret securely.

The following function handles token acquisition and basic caching. In production, implement an in-memory cache with TTL expiration rather than fetching a new token on every request.

// auth.js
const axios = require('axios');

const GENESYS_CLOUD_REGION = process.env.GENESYS_CLOUD_REGION || 'us-east-1';
const CLIENT_ID = process.env.GENESYS_CLOUD_CLIENT_ID;
const CLIENT_SECRET = process.env.GENESYS_CLOUD_CLIENT_SECRET;

const BASE_URL = `https://${GENESYS_CLOUD_REGION}.mypurecloud.com`;
const TOKEN_URL = `${BASE_URL}/oauth/token`;

let accessToken = null;
let tokenExpiry = 0;

async function getAccessToken() {
  // Return cached token if valid
  if (accessToken && Date.now() < tokenExpiry) {
    return accessToken;
  }

  try {
    const response = await axios.post(
      TOKEN_URL,
      new URLSearchParams({
        grant_type: 'client_credentials',
        client_id: CLIENT_ID,
        client_secret: CLIENT_SECRET,
      }),
      {
        headers: {
          'Content-Type': 'application/x-www-form-urlencoded',
        },
      }
    );

    accessToken = response.data.access_token;
    // Expire 60 seconds before actual expiry to allow for race conditions
    tokenExpiry = Date.now() + (response.data.expires_in - 60) * 1000;
    
    return accessToken;
  } catch (error) {
    console.error('Failed to obtain OAuth token:', error.response?.data || error.message);
    throw new Error('Authentication failed');
  }
}

module.exports = { getAccessToken, BASE_URL };

Implementation

Step 1: Create the Analytics Notification Subscription

Before consuming events, you must register a subscription with the Genesys Cloud Notification API. This returns a subscriptionId and potentially a callbackUrl if you were using HTTP callbacks. Since we are using the SDK to poll/consume directly, we focus on the subscription creation.

We will create a subscription for analytics.conversation events. Note that the resourceTypes array determines which specific analytics data triggers the event.

// subscription.js
const axios = require('axios');
const { getAccessToken, BASE_URL } = require('./auth');

async function createAnalyticsSubscription() {
  const token = await getAccessToken();
  const SUBSCRIPTIONS_URL = `${BASE_URL}/api/v2/analytics/notifications/subscriptions`;

  const payload = {
    name: 'kafka-streaming-sub',
    resourceTypes: [
      'analytics.conversation.summary' // Triggers on completed conversations
    ],
    filters: [
      {
        type: 'conversation',
        filters: [
          {
            field: 'direction',
            operator: 'eq',
            values: ['inbound'] // Only inbound calls/chats
          }
        ]
      }
    ],
    // Note: For direct SDK consumption, callbackUrl is often omitted or used for webhooks.
    // The SDK method 'getNotificationsSubscriptionEvents' pulls from this subscription.
    callbackUrl: null 
  };

  try {
    const response = await axios.post(SUBSCRIPTIONS_URL, payload, {
      headers: {
        'Authorization': `Bearer ${token}`,
        'Content-Type': 'application/json'
      }
    });

    console.log('Subscription created:', response.data);
    return response.data.id;
  } catch (error) {
    // 409 Conflict means subscription might already exist with same name/filters
    if (error.response?.status === 409) {
      console.warn('Subscription may already exist. Check existing subscriptions.');
      return null;
    }
    throw error;
  }
}

module.exports = { createAnalyticsSubscription };

OAuth Scope Required: analytics:read

Expected Response Body:

{
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "name": "kafka-streaming-sub",
  "resourceTypes": ["analytics.conversation.summary"],
  "filters": [...],
  "callbackUrl": null,
  "status": "active"
}

Step 2: Initialize Kafka Producer

We need a robust Kafka producer that can handle asynchronous writes without blocking the event loop. We will use kafkajs.

// kafkaProducer.js
const { Kafka } = require('kafkajs');

const KAFKA_BROKERS = process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'];
const KAFKA_TOPIC = process.env.KAFKA_TOPIC || 'genesys-analytics-events';

const kafka = new Kafka({
  clientId: 'genesys-consumer',
  brokers: KAFKA_BROKERS
});

const producer = kafka.producer();

async function initKafka() {
  console.log('Connecting to Kafka...');
  await producer.connect();
  console.log('Kafka producer connected.');
}

async function sendEventToKafka(eventData) {
  try {
    await producer.send({
      topic: KAFKA_TOPIC,
      messages: [
        {
          // Key can be used for partitioning strategy, e.g., conversation ID
          key: eventData.conversationId || 'default',
          value: JSON.stringify(eventData)
        }
      ]
    });
    // Log success at debug level to avoid noise
    // console.log('Event sent to Kafka:', eventData.conversationId);
  } catch (error) {
    console.error('Failed to send event to Kafka:', error.message);
    // In production, consider implementing a retry mechanism or dead-letter queue here
  }
}

module.exports = { initKafka, sendEventToKafka };

Step 3: Consume Events via Genesys Cloud SDK

The core logic involves using the Genesys Cloud Node.js SDK to poll for new notifications. The SDK provides a method getNotificationsSubscriptionEvents which returns a list of events since the last poll or based on a cursor.

Critical Note: The SDK’s notification consumer is long-polling by nature when used correctly, but the REST API endpoint /api/v2/analytics/notifications/events is typically polled. To avoid tight loops, we implement an exponential backoff or a fixed delay between polls.

// eventConsumer.js
const { PureCloudPlatformClientV2 } = require('@genesyscloud/purecloud-platform-client-v2');
const { getAccessToken } = require('./auth');
const { sendEventToKafka } = require('./kafkaProducer');

const platformClient = new PureCloudPlatformClientV2();
const analyticsApi = new PureCloudPlatformClientV2.AnalyticsApi();

const SUBSCRIPTION_ID = process.env.GENESYS_SUBSCRIPTION_ID; // Must be set from Step 1
const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '10000', 10); // 10 seconds default

let lastCursor = null;

async function startConsuming() {
  if (!SUBSCRIPTION_ID) {
    throw new Error('GENESYS_SUBSCRIPTION_ID is not set');
  }

  console.log(`Starting event consumer for subscription: ${SUBSCRIPTION_ID}`);

  while (true) {
    try {
      const token = await getAccessToken();
      platformClient.setAuthOptions({ token });

      // Parameters for getNotificationsSubscriptionEvents
      const queryParams = {
        subscriptionId: SUBSCRIPTION_ID,
        // cursor: lastCursor, // Optional: Use cursor for precise continuity
        // count: 100 // Max events to fetch per call
      };

      // Fetch events
      const response = await analyticsApi.getNotificationsSubscriptionEvents(queryParams);
      
      if (response.body && response.body.events && response.body.events.length > 0) {
        console.log(`Fetched ${response.body.events.length} events`);
        
        // Update cursor for next poll to ensure no duplicates/misses
        lastCursor = response.body.cursor;

        // Process each event
        for (const event of response.body.events) {
          await processEvent(event);
        }
      } else {
        // No events, wait for next poll
        // console.log('No new events.');
      }

    } catch (error) {
      console.error('Error polling for events:', error.response?.data || error.message);
      
      // Handle specific HTTP errors
      if (error.response?.status === 401) {
        console.error('Token expired or invalid. Refreshing...');
        // The next iteration will fetch a new token via getAccessToken()
      } else if (error.response?.status === 429) {
        console.warn('Rate limited. Waiting 60 seconds before retry.');
        await new Promise(resolve => setTimeout(resolve, 60000));
      }
    }

    // Wait before next poll
    await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS));
  }
}

async function processEvent(event) {
  // Event structure varies by resourceType. 
  // For analytics.conversation.summary, the data is in event.data
  const eventData = event.data;

  // Transform data if necessary before sending to Kafka
  const kafkaPayload = {
    eventType: event.resourceType,
    timestamp: event.timestamp,
    conversationId: eventData.conversationId,
    summary: eventData.summary,
    rawEvent: event // Include raw event for debugging if needed
  };

  // Send to Kafka
  await sendEventToKafka(kafkaPayload);
}

module.exports = { startConsuming };

OAuth Scope Required: notification:subscribe

Expected Response Body (from SDK):

{
  "events": [
    {
      "resourceType": "analytics.conversation.summary",
      "timestamp": "2023-10-27T10:00:00.000Z",
      "data": {
        "conversationId": "12345678-1234-1234-1234-123456789012",
        "summary": {
          "totalHandleTime": 12000,
          "totalTalkTime": 10000,
          "totalHoldTime": 2000
        }
      }
    }
  ],
  "cursor": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9..."
}

Complete Working Example

Create a file named index.js to orchestrate the entire flow.

// index.js
require('dotenv').config();

const { createAnalyticsSubscription } = require('./subscription');
const { initKafka } = require('./kafkaProducer');
const { startConsuming } = require('./eventConsumer');

async function main() {
  console.log('Initializing Genesys Cloud Analytics to Kafka Streamer...');

  try {
    // 1. Initialize Kafka Connection
    await initKafka();

    // 2. Create Subscription if not already done manually
    // Note: In production, you likely create the subscription once via CLI or admin script.
    // Here we attempt to create it to ensure it exists.
    const subId = await createAnalyticsSubscription();
    if (subId) {
      console.log(`Using Subscription ID: ${subId}`);
      // If you created it dynamically, you must set the env var or pass it to the consumer
      // For this example, we assume the env var GENESYS_SUBSCRIPTION_ID is set to the known ID
    }

    // 3. Start Consuming Events
    await startConsuming();

  } catch (error) {
    console.error('Fatal error in main process:', error);
    process.exit(1);
  }
}

// Handle graceful shutdown
process.on('SIGINT', () => {
  console.log('Shutting down...');
  process.exit(0);
});

main();

Create a .env file in the root directory:

# Genesys Cloud Credentials
GENESYS_CLOUD_CLIENT_ID=your_client_id
GENESYS_CLOUD_CLIENT_SECRET=your_client_secret
GENESYS_CLOUD_REGION=us-east-1

# Subscription ID (Create this once via Step 1 and paste the ID here)
GENESYS_SUBSCRIPTION_ID=a1b2c3d4-e5f6-7890-abcd-ef1234567890

# Kafka Configuration
KAFKA_BROKERS=localhost:9092
KAFKA_TOPIC=genesys-analytics-events

# Polling
POLL_INTERVAL_MS=10000

Run the application:

node index.js

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The OAuth token has expired or was never successfully retrieved.
  • Fix: Ensure GENESYS_CLOUD_CLIENT_ID and GENESYS_CLOUD_CLIENT_SECRET are correct. Check that the client has the analytics:read and notification:subscribe scopes assigned in the Genesys Cloud Admin Console under Platform > OAuth.
  • Code Fix: The getAccessToken function automatically retries. If it fails consistently, log the raw response body to check for specific error codes like invalid_client.

Error: 403 Forbidden

  • Cause: The OAuth client lacks the necessary permissions, or the user associated with the client (if using user-based auth) does not have access to Analytics.
  • Fix: Verify the OAuth Client Permissions in Genesys Cloud. Ensure the “Analytics” permission set includes “Read Analytics” and “Subscribe to Notifications”.

Error: 429 Too Many Requests

  • Cause: Polling too frequently. Genesys Cloud enforces rate limits on the Notification API.
  • Fix: Increase POLL_INTERVAL_MS. Do not poll faster than 5 seconds. The code above implements a 60-second backoff on 429 errors.

Error: Kafka Producer Connection Timeout

  • Cause: The Kafka broker is unreachable or the KAFKA_BROKERS environment variable is incorrect.
  • Fix: Verify network connectivity to the Kafka brokers. Ensure the kafkajs client can resolve the hostnames.

Error: Event Data is Empty

  • Cause: No conversations match the filters defined in the subscription.
  • Fix: Check the filters in createAnalyticsSubscription. Ensure you are generating traffic that matches the criteria (e.g., inbound calls).

Official References