Ingesting Genesys Cloud Analytics Interaction Data into Snowflake with Node.js

What You Will Build

  • This pipeline extracts call conversation details from Genesys Cloud, flattens the nested JSON into CSV, stages the files in S3 under date-based prefixes, loads them into Snowflake with the official Node.js driver, and compares row counts against the API totals to detect dropped records.
  • The implementation relies on the Genesys Cloud REST API (POST /api/v2/analytics/conversations/details/query) and the snowflake-sdk package for database operations.
  • All code is written in modern Node.js using async/await, axios for HTTP requests, and @aws-sdk/client-s3 for storage operations.

Prerequisites

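  • Genesys Cloud OAuth client using the Client Credentials grant, with a role assigned that includes the Analytics > Conversation Detail > View permission (analytics:conversationDetail:view)
  • Snowflake account with a dedicated warehouse, database, and schema, and a role with USAGE on all three, CREATE TABLE on the schema, SELECT, INSERT, and DELETE on the target table, and USAGE on a storage integration that can read the S3 bucket
  • AWS S3 bucket; the pipeline's IAM identity needs s3:PutObject, and the IAM role behind the Snowflake storage integration needs s3:GetObject, s3:GetObjectVersion, and s3:ListBucket
  • Node.js 18 or later
  • Required npm packages: snowflake-sdk, @aws-sdk/client-s3, axios, dotenv, uuid, dayjs (the examples use ES module import syntax, so set "type": "module" in package.json)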

Authentication Setup

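Genesys Cloud uses the OAuth2 Client Credentials grant for service-to-service authentication. Access tokens expire after the number of seconds returned in expires_in (the lifetime is configurable on the OAuth client), so the pipeline caches the token and requests a new one shortly before it expires. getGenesysToken handles token acquisition. genesysFetch wraps API calls: it retries with exponential backoff on 429 Too Many Requests responses, which occur when the API rate limit is exceeded, and with a fresh token on 401 Unauthorized.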

import axios from 'axios';
import dotenv from 'dotenv';
dotenv.config();

// Genesys Cloud is region-specific: mypurecloud.com (US East), mypurecloud.ie, mypurecloud.de, etc.
const GENESYS_REGION = process.env.GENESYS_REGION || 'mypurecloud.com';
const GENESYS_BASE_URL = `https://api.${GENESYS_REGION}`;
// Tokens are issued by the login host, not the API host
const OAUTH_ENDPOINT = `https://login.${GENESYS_REGION}/oauth/token`;

let cachedToken = null;
let tokenExpiry = 0;

async function getGenesysToken() {
  const now = Date.now();
  if (cachedToken && now < tokenExpiry) {
    return cachedToken;
  }

  // Client Credentials: the client ID and secret travel in an HTTP Basic header.
  // Permissions come from the roles assigned to the OAuth client, not from a scope parameter.
  const basicAuth = Buffer.from(
    `${process.env.GENESYS_CLIENT_ID}:${process.env.GENESYS_CLIENT_SECRET}`
  ).toString('base64');

  const response = await axios.post(
    OAUTH_ENDPOINT,
    new URLSearchParams({ grant_type: 'client_credentials' }),
    {
      headers: {
        'Authorization': `Basic ${basicAuth}`,
        'Content-Type': 'application/x-www-form-urlencoded',
      },
    }
  );

  cachedToken = response.data.access_token;
  tokenExpiry = now + (response.data.expires_in * 1000) - 60000; // Refresh 60s early
  return cachedToken;
}

// Sends an authenticated request (GET by default, or options.method with options.data),
// retrying on 429 (rate limit) and 401 (expired or revoked token).
async function genesysFetch(url, options = {}) {
  const maxRetries = 5;

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      const token = await getGenesysToken();
      return await axios({
        url,
        method: options.method || 'GET',
        data: options.data,
        params: options.params,
        headers: {
          'Authorization': `Bearer ${token}`,
          'Content-Type': 'application/json',
          ...options.headers,
        },
      });
    } catch (error) {
      const status = error.response?.status;
      if (status === 429) {
        // Honor Retry-After (seconds) when present; otherwise back off 1s, 2s, 4s, ...
        const retryAfter = Number(error.response.headers['retry-after']) || Math.pow(2, attempt);
        console.log(`Rate limited (429). Retrying in ${retryAfter} seconds...`);
        await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
      } else if (status === 401) {
        cachedToken = null; // Force a token refresh on the next attempt
      } else {
        throw error;
      }
    }
  }
  throw new Error('Max retries exceeded for Genesys Cloud API request.');
}
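genesysFetch treats Retry-After as a number of seconds. HTTP also allows that header to carry a date, and backoff without jitter makes parallel workers retry in lockstep. The function below is a hypothetical, more defensive delay calculation (retryDelayMs is not part of any library); the 60-second cap and the "equal jitter" strategy (half fixed, half random) are assumptions, not Genesys requirements.

```javascript
// Hypothetical helper: how long to wait (ms) before retrying a throttled request.
// retryAfter may be delta-seconds ("5") or an HTTP-date; otherwise fall back to
// exponential backoff (1s, 2s, 4s, ...) capped at maxMs, with equal jitter.
function retryDelayMs(retryAfter, attempt, { nowMs = Date.now(), maxMs = 60000, random = Math.random } = {}) {
  if (retryAfter !== undefined && retryAfter !== null && String(retryAfter).trim() !== '') {
    const seconds = Number(retryAfter);
    if (Number.isFinite(seconds) && seconds >= 0) {
      return Math.min(seconds * 1000, maxMs);
    }
    const dateMs = Date.parse(retryAfter);
    if (!Number.isNaN(dateMs)) {
      return Math.min(Math.max(dateMs - nowMs, 0), maxMs);
    }
  }
  const base = Math.min(1000 * 2 ** attempt, maxMs);
  // Equal jitter: half the delay is fixed, the other half random, so workers spread out
  return Math.round(base / 2 + random() * (base / 2));
}
```

Inside the 429 branch, the wait could become retryDelayMs(error.response.headers['retry-after'], attempt) milliseconds instead of the fixed calculation.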

Implementation

Step 1: Query the Interaction Analytics API with Pagination

The POST /api/v2/analytics/conversations/details/query endpoint returns conversation-level detail records. The request body takes an ISO-8601 interval ("start/end"), optional segment filters, and a paging object with a pageSize of at most 100 and a pageNumber starting at 1. The response contains a conversations array and a totalHits count, so the pipeline requests successive page numbers until a page comes back short or empty. The OAuth client needs the analytics:conversationDetail:view permission.

// startDate and endDate are ISO-8601 timestamps; the end of the interval is exclusive
async function fetchAllInteractions(startDate, endDate) {
  const queryEndpoint = `${GENESYS_BASE_URL}/api/v2/analytics/conversations/details/query`;
  const pageSize = 100;
  let allConversations = [];
  let totalHits = 0;

  for (let pageNumber = 1; ; pageNumber++) {
    const requestBody = {
      // e.g. '2024-10-15T00:00:00.000Z/2024-10-16T00:00:00.000Z'
      interval: `${startDate}/${endDate}`,
      order: 'asc',
      orderBy: 'conversationStart',
      paging: { pageSize, pageNumber },
      // Keep only voice (call) conversations
      segmentFilters: [
        {
          type: 'and',
          predicates: [
            { type: 'dimension', dimension: 'mediaType', operator: 'matches', value: 'voice' }
          ]
        }
      ]
    };

    // For production runs, add 429 retry handling here (the same approach genesysFetch uses)
    const response = await axios.post(queryEndpoint, requestBody, {
      headers: {
        'Authorization': `Bearer ${await getGenesysToken()}`,
        'Content-Type': 'application/json',
      },
    });

    const conversations = response.data.conversations || [];
    totalHits = response.data.totalHits ?? totalHits;
    allConversations = allConversations.concat(conversations);

    console.log(`Fetched page ${pageNumber}: ${conversations.length} conversations. Total so far: ${allConversations.length} of ${totalHits}`);

    // A short or empty page means there are no more results
    if (conversations.length < pageSize) break;
  }

  return { records: allConversations, totalCount: totalHits };
}
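fetchAllInteractions needs a start and an end instant; passing the same calendar date for both describes an empty range. A minimal sketch of a hypothetical helper, buildUtcDayRange, that returns ISO-8601 timestamps covering one whole UTC day, with the end exclusive:

```javascript
// Hypothetical helper: ISO-8601 start/end for one UTC calendar day (end exclusive).
function buildUtcDayRange(dateStr) {
  if (!/^\d{4}-\d{2}-\d{2}$/.test(dateStr)) {
    throw new Error(`Expected YYYY-MM-DD, got ${dateStr}`);
  }
  const start = new Date(`${dateStr}T00:00:00.000Z`);
  // Reject invalid or rolled-over dates such as 2024-02-30
  if (Number.isNaN(start.getTime()) || start.toISOString().slice(0, 10) !== dateStr) {
    throw new Error(`Invalid date: ${dateStr}`);
  }
  const end = new Date(start.getTime() + 24 * 60 * 60 * 1000); // UTC days are 24 h in JS time
  return { startDate: start.toISOString(), endDate: end.toISOString() };
}
```

For example, const { startDate, endDate } = buildUtcDayRange('2024-10-15') followed by await fetchAllInteractions(startDate, endDate) requests exactly that UTC day.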

Abridged response structure (most fields omitted):

{
  "conversations": [
    {
      "conversationId": "1a2b3c4d-5e6f-7890-abcd-ef1234567890",
      "conversationStart": "2024-10-15T14:30:00.000Z",
      "conversationEnd": "2024-10-15T14:32:25.000Z",
      "originatingDirection": "inbound",
      "participants": [
        {
          "purpose": "agent",
          "sessions": [
            {
              "mediaType": "voice",
              "segments": [
                { "segmentType": "interact", "queueId": "<queue-id>" },
                { "segmentType": "wrapup", "wrapUpCode": "<wrap-up-code-id>" }
              ]
            }
          ]
        }
      ]
    }
  ],
  "totalHits": 12540
}
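The stopping rule is the easiest part of pagination to get wrong. The sketch below separates it from HTTP so it can be tested on its own; hasMorePages and collectAllPages are hypothetical names, and fetchPage is assumed to resolve to an object with conversations and totalHits properties.

```javascript
// Hypothetical: decide whether another page is needed. Stop on a short or empty page,
// or once the collected count reaches totalHits (when the API reports it).
function hasMorePages(batchLength, pageSize, collected, totalHits) {
  if (batchLength < pageSize) return false;
  if (typeof totalHits === 'number' && totalHits > 0 && collected >= totalHits) return false;
  return true;
}

// Hypothetical page-number loop around any fetchPage(pageNumber) function.
async function collectAllPages(fetchPage, pageSize = 100) {
  const all = [];
  let totalHits;
  for (let pageNumber = 1; ; pageNumber++) {
    const page = await fetchPage(pageNumber);
    const batch = page.conversations || [];
    totalHits = page.totalHits ?? totalHits;
    all.push(...batch);
    if (!hasMorePages(batch.length, pageSize, all.length, totalHits)) break;
  }
  return { conversations: all, totalHits };
}
```

The totalHits check saves one empty request when the total is an exact multiple of the page size.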

Step 2: Transform JSON to Columnar Format

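Genesys Cloud returns deeply nested JSON: each conversation contains participants, each participant contains sessions, and each session contains segments. The transformation flattens each conversation into one CSV row, takes the wrap-up code from the agent participant's segments and the first queue ID it finds, and records the extract date. CSV gives a fixed, positional column layout that maps directly onto the target table; the trade-off is that new API fields are ignored until you add columns for them.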

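// Quote present values (doubling embedded quotes). Missing values stay empty and unquoted,
// so EMPTY_FIELD_AS_NULL loads them as NULL instead of failing to cast '' into NUMBER/TIMESTAMP columns.
function toCsvCell(value) {
  if (value === null || value === undefined || value === '') return '';
  return `"${String(value).replace(/"/g, '""')}"`;
}

// Flattens each conversation into one row.
// extractDate (YYYY-MM-DD) defaults to the UTC date of conversationStart.
function transformToCsv(conversations, extractDate) {
  if (!conversations || conversations.length === 0) return '';

  const headers = [
    'conversationId', 'startTime', 'direction', 'duration',
    'wrapupCode', 'queueId', 'participantRole', 'extractDate'
  ];

  const segmentsOf = participant =>
    (participant.sessions || []).flatMap(session => session.segments || []);

  const rows = conversations.map(conv => {
    const participants = conv.participants || [];
    // Prefer the agent participant; fall back to the first participant
    const agent = participants.find(p => p.purpose === 'agent') || participants[0] || {};
    const wrapup = segmentsOf(agent).filter(seg => seg.wrapUpCode).pop(); // last wrap-up wins
    const queued = participants.flatMap(segmentsOf).find(seg => seg.queueId);
    const duration = conv.conversationStart && conv.conversationEnd
      ? Date.parse(conv.conversationEnd) - Date.parse(conv.conversationStart) // milliseconds
      : null; // conversation still in progress

    return [
      conv.conversationId,
      conv.conversationStart,
      conv.originatingDirection,
      duration,
      wrapup?.wrapUpCode,
      queued?.queueId,
      agent.purpose,
      extractDate ?? conv.conversationStart?.slice(0, 10)
    ];
  });

  return [headers, ...rows].map(row => row.map(toCsvCell).join(',')).join('\n');
}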
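Flattening to a fixed set of columns means any field the mapping does not read is dropped silently. A hypothetical check like findUnmappedFields can log top-level conversation fields the CSV ignores, so new API fields get noticed. The default set of mapped names is an assumption based on the conversation-level fields a details-query transformation would read; adjust it to your own mapping.

```javascript
// Hypothetical drift check. ASSUMPTION: these are the top-level fields the CSV mapping uses.
const MAPPED_FIELDS = new Set([
  'conversationId', 'conversationStart', 'conversationEnd', 'originatingDirection', 'participants',
]);

// Returns the sorted, de-duplicated top-level keys that the mapping does not cover.
function findUnmappedFields(conversations, mapped = MAPPED_FIELDS) {
  const unmapped = new Set();
  for (const conv of conversations) {
    for (const key of Object.keys(conv)) {
      if (!mapped.has(key)) unmapped.add(key);
    }
  }
  return [...unmapped].sort();
}
```

Logging the result once per run is usually enough; the list only changes when the API or the query changes.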

Step 3: Stage Data in S3 with Partition Keys

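Staging files under date-based prefixes keeps each day's extract separate, so a COPY INTO can target exactly one day's files and S3 lifecycle rules can expire old extracts. (Query-time pruning in Snowflake relies on the table's own micro-partitions, not on the S3 layout.) The pipeline uploads each CSV to s3://bucket/genesys-analytics/year=YYYY/month=MM/day=DD/filename.csv using the modular AWS SDK for JavaScript v3 (@aws-sdk/client-s3).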

import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
import dayjs from 'dayjs';
import { v4 as uuidv4 } from 'uuid';

// With no explicit credentials, the client uses the default provider chain:
// AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY env vars, shared config files, or an attached IAM role.
const s3Client = new S3Client({ region: process.env.AWS_REGION });

async function stageInS3(csvContent, interactionDate) {
  const date = dayjs(interactionDate);
  // Zero-padded partition values, e.g. genesys-analytics/year=2024/month=10/day=05
  const partitionKey = `genesys-analytics/year=${date.format('YYYY')}/month=${date.format('MM')}/day=${date.format('DD')}`;
  const fileName = `${partitionKey}/genesys_calls_${uuidv4()}.csv`;
  const bucket = process.env.S3_BUCKET_NAME;

  const command = new PutObjectCommand({
    Bucket: bucket,
    Key: fileName,
    Body: csvContent,
    ContentType: 'text/csv',
    ServerSideEncryption: 'AES256',
  });

  await s3Client.send(command);
  return `s3://${bucket}/${fileName}`;
}
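Date-based prefixes pay off at load time: COPY INTO can point at one day's prefix and load every file staged for that day. A small sketch, assuming the genesys-analytics root prefix and YYYY-MM-DD input; buildDayLocation is a hypothetical helper.

```javascript
// Hypothetical helper: S3 location for one day's staged files, e.g.
// s3://my-bucket/genesys-analytics/year=2024/month=10/day=05/
function buildDayLocation(bucket, dateStr, root = 'genesys-analytics') {
  const match = /^(\d{4})-(\d{2})-(\d{2})$/.exec(dateStr);
  if (!match) throw new Error(`Expected YYYY-MM-DD, got ${dateStr}`);
  const [, year, month, day] = match; // already zero-padded by the input format
  return `s3://${bucket}/${root}/year=${year}/month=${month}/day=${day}/`;
}
```

The trailing slash matters: without it, a prefix such as day=1 would also match day=10 through day=19 if the keys were not zero-padded.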

Step 4: Execute COPY INTO Using the Snowflake SDK

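The snowflake-sdk driver opens a single connection through createConnection and connect (use createPool if you need pooling); close it with destroy when the run finishes. COPY INTO loads the staged CSV from S3 into the table. Because the bucket is private, the statement authenticates through a storage integration, named here by the SNOWFLAKE_STORAGE_INTEGRATION environment variable. The FILE_FORMAT clause handles quoting, delimiters, and the header row. The execute callback receives the error, the statement, and the result rows as its three arguments.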

import snowflake from 'snowflake-sdk';

function getSnowflakeConnection() {
  return new Promise((resolve, reject) => {
    const conn = snowflake.createConnection({
      account: process.env.SNOWFLAKE_ACCOUNT, // account identifier, e.g. orgname-accountname (not a URL)
      username: process.env.SNOWFLAKE_USER,
      password: process.env.SNOWFLAKE_PASSWORD,
      warehouse: process.env.SNOWFLAKE_WAREHOUSE,
      database: process.env.SNOWFLAKE_DATABASE,
      schema: process.env.SNOWFLAKE_SCHEMA,
      role: process.env.SNOWFLAKE_ROLE,
    });

    conn.connect((err, connected) => {
      if (err) {
        console.error('Unable to connect to Snowflake:', err.message);
        reject(err);
      } else {
        console.log('Successfully connected to Snowflake.');
        resolve(connected);
      }
    });
  });
}

// Promise wrapper around conn.execute; resolves with the result rows,
// which arrive as the third argument of the complete callback.
function executeSql(conn, sqlText, binds) {
  return new Promise((resolve, reject) => {
    conn.execute({
      sqlText,
      binds,
      complete: (err, stmt, rows) => (err ? reject(err) : resolve(rows)),
    });
  });
}

async function loadIntoSnowflake(conn, s3Path) {
  await executeSql(conn, `
    CREATE TABLE IF NOT EXISTS GENESYS_INTERACTIONS (
      CONVERSATION_ID STRING,
      START_TIME TIMESTAMP_TZ,
      DIRECTION STRING,
      DURATION NUMBER,
      WRAPUP_CODE STRING,
      QUEUE_ID STRING,
      PARTICIPANT_ROLE STRING,
      EXTRACT_DATE DATE
    )
  `);

  // STORAGE_INTEGRATION names an existing integration whose IAM role can read the bucket
  const copySql = `
    COPY INTO GENESYS_INTERACTIONS
    FROM '${s3Path}'
    STORAGE_INTEGRATION = ${process.env.SNOWFLAKE_STORAGE_INTEGRATION}
    FILE_FORMAT = (
      TYPE = CSV
      FIELD_DELIMITER = ','
      RECORD_DELIMITER = '\\n'
      SKIP_HEADER = 1
      FIELD_OPTIONALLY_ENCLOSED_BY = '"'
      EMPTY_FIELD_AS_NULL = TRUE
    )
    ON_ERROR = 'CONTINUE'
  `;

  // One result row per file (status, rows_parsed, rows_loaded, errors_seen, first_error, ...)
  return executeSql(conn, copySql);
}
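COPY INTO returns one result row per file, with columns such as file, status, rows_parsed, rows_loaded, errors_seen, and first_error. A hypothetical summarizeCopyResult can total those rows so the pipeline logs rejected rows right away instead of discovering them during validation. Key names are normalized to lowercase because the exact casing the driver reports is an assumption here.

```javascript
// Hypothetical helper: totals the per-file rows returned by COPY INTO.
function summarizeCopyResult(rows) {
  const summary = { files: 0, rowsParsed: 0, rowsLoaded: 0, errorsSeen: 0, firstErrors: [] };
  for (const row of rows || []) {
    // ASSUMPTION: key casing may vary, so compare lowercase names
    const r = Object.fromEntries(Object.entries(row).map(([key, value]) => [key.toLowerCase(), value]));
    if (!r.file) continue; // skip rows without a file column, such as a status-only row
    summary.files += 1;
    summary.rowsParsed += Number(r.rows_parsed) || 0;
    summary.rowsLoaded += Number(r.rows_loaded) || 0;
    summary.errorsSeen += Number(r.errors_seen) || 0;
    if (r.first_error) summary.firstErrors.push(`${r.file}: ${r.first_error}`);
  }
  return summary;
}
```

With ON_ERROR = 'CONTINUE', a non-zero errorsSeen is the earliest signal that rows were skipped.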

Step 5: Validate Row Counts Against API Totals

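Data integrity verification compares the number of conversations Genesys Cloud reported for the day (totalHits) with the rows loaded into Snowflake for the same EXTRACT_DATE. Counting the whole table would include rows from earlier runs, so the query is scoped to one day. Because each run stages a new file, re-running a day without first deleting its rows doubles the count. A mismatch points to incomplete pagination, a failed S3 upload, or rows rejected by COPY INTO.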

async function validateRowCount(conn, expectedCount, extractDate) {
  // Count only the day being validated; earlier loads live in the same table
  const countSql = 'SELECT COUNT(*) AS TOTAL_ROWS FROM GENESYS_INTERACTIONS WHERE EXTRACT_DATE = ?';

  const rows = await new Promise((resolve, reject) => {
    conn.execute({
      sqlText: countSql,
      binds: [extractDate], // 'YYYY-MM-DD'
      // The result rows arrive as the third callback argument, not on the statement
      complete: (err, stmt, resultRows) => (err ? reject(err) : resolve(resultRows)),
    });
  });

  const actualCount = Number(rows[0].TOTAL_ROWS);
  if (actualCount !== expectedCount) {
    throw new Error(`Row count mismatch for ${extractDate}: Expected ${expectedCount}, Found ${actualCount} in Snowflake.`);
  }
  console.log(`Validation passed. ${actualCount} rows confirmed.`);
  return true;
}
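When the counts disagree, the numbers themselves usually hint at the cause. Below is a hypothetical helper that maps the expected count (totalHits), the number of conversations actually fetched, the rows loaded for the day, and errors_seen from the COPY result to a likely explanation. The order of the checks is a judgment call, not an exhaustive diagnosis.

```javascript
// Hypothetical helper: turns the pipeline's counts into a likely explanation.
function diagnoseCountMismatch({ expected, fetched, loaded, errorsSeen = 0 }) {
  if (loaded === expected && fetched === expected) return 'ok';
  if (fetched < expected) {
    return 'incomplete pagination: fewer conversations fetched than totalHits';
  }
  if (loaded > expected) {
    return 'duplicate rows: the day was probably loaded more than once';
  }
  if (errorsSeen > 0) {
    return `COPY INTO rejected rows (errors_seen = ${errorsSeen})`;
  }
  return 'rows missing without COPY errors: check the S3 upload and the COPY source path';
}
```

Including the returned string in the thrown error message makes failed runs easier to triage from logs alone.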

Complete Working Example

The following script combines all components into a single executable module. Set the environment variables (for example in a .env file read by dotenv) before running.

// Run as an ES module (set "type": "module" in package.json).
// Environment: GENESYS_REGION (optional, defaults to mypurecloud.com), GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET,
// AWS_REGION, S3_BUCKET_NAME, SNOWFLAKE_ACCOUNT, SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_WAREHOUSE,
// SNOWFLAKE_DATABASE, SNOWFLAKE_SCHEMA, SNOWFLAKE_ROLE, SNOWFLAKE_STORAGE_INTEGRATION.
import 'dotenv/config'; // loads .env before the code below reads process.env

import axios from 'axios';
import dayjs from 'dayjs';
import { v4 as uuidv4 } from 'uuid';
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
import snowflake from 'snowflake-sdk';

const GENESYS_REGION = process.env.GENESYS_REGION || 'mypurecloud.com';
const GENESYS_BASE_URL = `https://api.${GENESYS_REGION}`;
const OAUTH_ENDPOINT = `https://login.${GENESYS_REGION}/oauth/token`;
const PAGE_SIZE = 100;
let cachedToken = null;
let tokenExpiry = 0;

async function getGenesysToken() {
  const now = Date.now();
  if (cachedToken && now < tokenExpiry) return cachedToken;

  // Client Credentials: the client ID and secret travel in an HTTP Basic header
  const basicAuth = Buffer.from(`${process.env.GENESYS_CLIENT_ID}:${process.env.GENESYS_CLIENT_SECRET}`).toString('base64');
  const response = await axios.post(OAUTH_ENDPOINT, new URLSearchParams({ grant_type: 'client_credentials' }), {
    headers: { 'Authorization': `Basic ${basicAuth}`, 'Content-Type': 'application/x-www-form-urlencoded' },
  });

  cachedToken = response.data.access_token;
  tokenExpiry = now + (response.data.expires_in * 1000) - 60000; // refresh 60s early
  return cachedToken;
}

// Authenticated request; retries on 429 (rate limit) and 401 (expired token)
async function genesysFetch(url, options = {}) {
  for (let attempt = 0; attempt < 5; attempt++) {
    try {
      return await axios({
        url,
        method: options.method || 'GET',
        data: options.data,
        headers: { 'Authorization': `Bearer ${await getGenesysToken()}`, 'Content-Type': 'application/json', ...options.headers },
      });
    } catch (error) {
      const status = error.response?.status;
      if (status === 429) {
        const retryAfter = Number(error.response.headers['retry-after']) || Math.pow(2, attempt);
        await new Promise(res => setTimeout(res, retryAfter * 1000));
      } else if (status === 401) {
        cachedToken = null;
      } else {
        throw error;
      }
    }
  }
  throw new Error('Max retries exceeded.');
}

// interval: ISO-8601 'start/end' (end exclusive)
async function fetchAllInteractions(interval) {
  const queryEndpoint = `${GENESYS_BASE_URL}/api/v2/analytics/conversations/details/query`;
  let conversations = [];
  let totalHits = 0;

  for (let pageNumber = 1; ; pageNumber++) {
    const response = await genesysFetch(queryEndpoint, {
      method: 'POST',
      data: {
        interval,
        order: 'asc',
        orderBy: 'conversationStart',
        paging: { pageSize: PAGE_SIZE, pageNumber },
        segmentFilters: [{ type: 'and', predicates: [{ type: 'dimension', dimension: 'mediaType', operator: 'matches', value: 'voice' }] }],
      },
    });

    const page = response.data.conversations || [];
    totalHits = response.data.totalHits ?? totalHits;
    conversations = conversations.concat(page);
    if (page.length < PAGE_SIZE) break; // short or empty page: no more results
  }

  return { records: conversations, totalCount: totalHits };
}

// Missing values stay empty and unquoted so EMPTY_FIELD_AS_NULL loads them as NULL
const toCsvCell = v => (v === null || v === undefined || v === '' ? '' : `"${String(v).replace(/"/g, '""')}"`);
const segmentsOf = p => (p.sessions || []).flatMap(s => s.segments || []);

function transformToCsv(conversations, extractDate) {
  if (!conversations?.length) return '';
  const headers = ['conversationId', 'startTime', 'direction', 'duration', 'wrapupCode', 'queueId', 'participantRole', 'extractDate'];
  const rows = conversations.map(conv => {
    const participants = conv.participants || [];
    const agent = participants.find(p => p.purpose === 'agent') || participants[0] || {};
    const wrapup = segmentsOf(agent).filter(seg => seg.wrapUpCode).pop();
    const queued = participants.flatMap(segmentsOf).find(seg => seg.queueId);
    const duration = conv.conversationStart && conv.conversationEnd
      ? Date.parse(conv.conversationEnd) - Date.parse(conv.conversationStart) : null;
    return [conv.conversationId, conv.conversationStart, conv.originatingDirection, duration,
      wrapup?.wrapUpCode, queued?.queueId, agent.purpose, extractDate ?? conv.conversationStart?.slice(0, 10)];
  });
  return [headers, ...rows].map(row => row.map(toCsvCell).join(',')).join('\n');
}

// No explicit credentials: the default AWS provider chain (env vars, shared config, IAM role) is used
const s3Client = new S3Client({ region: process.env.AWS_REGION });

async function stageInS3(csvContent, interactionDate) {
  const date = dayjs(interactionDate);
  const key = `genesys-analytics/year=${date.format('YYYY')}/month=${date.format('MM')}/day=${date.format('DD')}/genesys_calls_${uuidv4()}.csv`;
  await s3Client.send(new PutObjectCommand({ Bucket: process.env.S3_BUCKET_NAME, Key: key, Body: csvContent, ContentType: 'text/csv', ServerSideEncryption: 'AES256' }));
  return `s3://${process.env.S3_BUCKET_NAME}/${key}`;
}

// Resolves with the result rows (third argument of the complete callback)
const executeSql = (conn, sqlText, binds) => new Promise((res, rej) =>
  conn.execute({ sqlText, binds, complete: (err, stmt, rows) => (err ? rej(err) : res(rows)) }));

async function runPipeline() {
  const targetDate = dayjs().subtract(1, 'day').format('YYYY-MM-DD');
  const nextDate = dayjs(targetDate).add(1, 'day').format('YYYY-MM-DD');
  const interval = `${targetDate}T00:00:00.000Z/${nextDate}T00:00:00.000Z`; // one full UTC day
  console.log(`Starting ETL for ${targetDate}`);

  const { records, totalCount } = await fetchAllInteractions(interval);
  console.log(`Extracted ${records.length} of ${totalCount} conversations.`);
  if (records.length === 0 && totalCount === 0) {
    console.log('No conversations found; nothing to load.');
    return;
  }

  const s3Path = await stageInS3(transformToCsv(records, targetDate), targetDate);
  console.log(`Staged to ${s3Path}`);

  const conn = await new Promise((res, rej) => {
    snowflake.createConnection({
      account: process.env.SNOWFLAKE_ACCOUNT, username: process.env.SNOWFLAKE_USER,
      password: process.env.SNOWFLAKE_PASSWORD, warehouse: process.env.SNOWFLAKE_WAREHOUSE,
      database: process.env.SNOWFLAKE_DATABASE, schema: process.env.SNOWFLAKE_SCHEMA, role: process.env.SNOWFLAKE_ROLE,
    }).connect((err, c) => (err ? rej(err) : res(c)));
  });

  try {
    await executeSql(conn, `CREATE TABLE IF NOT EXISTS GENESYS_INTERACTIONS (CONVERSATION_ID STRING, START_TIME TIMESTAMP_TZ, DIRECTION STRING, DURATION NUMBER, WRAPUP_CODE STRING, QUEUE_ID STRING, PARTICIPANT_ROLE STRING, EXTRACT_DATE DATE)`);
    // Remove any earlier load of this day so reruns do not duplicate rows
    await executeSql(conn, 'DELETE FROM GENESYS_INTERACTIONS WHERE EXTRACT_DATE = ?', [targetDate]);
    await executeSql(conn, `COPY INTO GENESYS_INTERACTIONS FROM '${s3Path}' STORAGE_INTEGRATION = ${process.env.SNOWFLAKE_STORAGE_INTEGRATION} FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"' EMPTY_FIELD_AS_NULL = TRUE) ON_ERROR = 'CONTINUE'`);

    const countRows = await executeSql(conn, 'SELECT COUNT(*) AS TOTAL FROM GENESYS_INTERACTIONS WHERE EXTRACT_DATE = ?', [targetDate]);
    const snowflakeCount = Number(countRows[0].TOTAL);
    if (snowflakeCount !== totalCount) {
      throw new Error(`Integrity check failed for ${targetDate}. Expected ${totalCount}, Snowflake has ${snowflakeCount}.`);
    }
    console.log(`Pipeline complete. ${snowflakeCount} rows validated.`);
  } finally {
    conn.destroy(err => { if (err) console.error('Failed to close Snowflake connection:', err.message); });
  }
}

runPipeline().catch(err => {
  console.error(err);
  process.exitCode = 1;
});

Common Errors & Debugging

Error: 401 Unauthorized or 403 Forbidden

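  • Cause: A 401 means the credentials or token were rejected: the client ID or secret is wrong, the secret was rotated, or the token came from a different Genesys Cloud region than the API host. A 403 means the token is valid but the OAuth client's roles lack the analytics:conversationDetail:view permission required by the details query endpoint.
  • Fix: In the Genesys Cloud admin console, confirm the OAuth client uses the Client Credentials grant and has a role containing Analytics > Conversation Detail > View. If the client secret was rotated, update GENESYS_CLIENT_SECRET. Make sure the login and API hosts match your organization's region.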
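Requesting a token in isolation separates credential problems from permission problems. A minimal sketch, assuming Node 18+ (global fetch) and a region host such as mypurecloud.com; checkGenesysCredentials and basicAuthHeader are hypothetical helpers.

```javascript
// Hypothetical helper: HTTP Basic header for the Client Credentials token request.
function basicAuthHeader(clientId, clientSecret) {
  return 'Basic ' + Buffer.from(`${clientId}:${clientSecret}`).toString('base64');
}

// Hypothetical check: request a token and report only the HTTP status.
async function checkGenesysCredentials(region, clientId, clientSecret) {
  const response = await fetch(`https://login.${region}/oauth/token`, {
    method: 'POST',
    headers: {
      Authorization: basicAuthHeader(clientId, clientSecret),
      'Content-Type': 'application/x-www-form-urlencoded',
    },
    body: new URLSearchParams({ grant_type: 'client_credentials' }),
  });
  // 200: credentials work, so a later 403 from the API points to missing role permissions.
  // Any other status: the client ID, secret, or region host is likely wrong.
  return { status: response.status, ok: response.ok };
}
```

For example, await checkGenesysCredentials('mypurecloud.com', process.env.GENESYS_CLIENT_ID, process.env.GENESYS_CLIENT_SECRET) returns { status: 200, ok: true } when the credentials are accepted.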

Error: 429 Too Many Requests

  • Cause: The Genesys Cloud API enforces strict rate limits per OAuth client. Rapid pagination or concurrent pipeline runs trigger throttling.
  • Fix: The retry logic in genesysFetch implements exponential backoff. If failures persist, reduce the query date range to fewer days per execution, or implement a queue-based scheduler to space out requests.
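One way to space out requests is a minimum-interval limiter that hands each caller the next free time slot. The class below is a hypothetical sketch; the interval you pass (for example 250 ms) is an assumption to tune against your organization's actual limits.

```javascript
// Hypothetical limiter: guarantees at least minIntervalMs between request start times.
class MinIntervalLimiter {
  constructor(minIntervalMs) {
    this.minIntervalMs = minIntervalMs;
    this.nextFreeAt = 0; // earliest time (ms) the next request may start
  }

  // Reserves the next slot and returns how long the caller must wait before sending.
  reserve(nowMs = Date.now()) {
    const startAt = Math.max(nowMs, this.nextFreeAt);
    this.nextFreeAt = startAt + this.minIntervalMs;
    return startAt - nowMs;
  }

  async wait() {
    const delay = this.reserve();
    if (delay > 0) await new Promise(resolve => setTimeout(resolve, delay));
  }
}
```

Calling await limiter.wait() before each API request is enough. Because reserve runs synchronously, concurrent callers each receive a distinct slot instead of racing for the same one.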

Error: Snowflake 000606 or 390100

  • Cause: 000606 means no active warehouse is selected for the session: the configured warehouse does not exist, or the role lacks USAGE on it. 390100 means the username or password is incorrect. An incorrect account identifier usually surfaces as a connection or hostname error rather than a SQL error code.
  • Fix: Set account to the account identifier (for example orgname-accountname), not the full URL. Confirm the warehouse exists and that the role has USAGE on it; a suspended warehouse resumes automatically when AUTO_RESUME is enabled, otherwise run ALTER WAREHOUSE <name> RESUME. Confirm the role has CREATE TABLE on the schema.
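A pre-flight check of the connection settings catches many of these failures before the driver connects. A hypothetical sketch that checks the environment variables the pipeline reads; the URL test is a heuristic, not an exhaustive validation of account identifiers.

```javascript
// Hypothetical pre-flight check over the Snowflake variables the pipeline reads.
const REQUIRED_SNOWFLAKE_VARS = [
  'SNOWFLAKE_ACCOUNT', 'SNOWFLAKE_USER', 'SNOWFLAKE_PASSWORD', 'SNOWFLAKE_WAREHOUSE',
  'SNOWFLAKE_DATABASE', 'SNOWFLAKE_SCHEMA', 'SNOWFLAKE_ROLE',
];

function findConfigProblems(env = process.env) {
  const problems = REQUIRED_SNOWFLAKE_VARS
    .filter(name => !env[name])
    .map(name => `${name} is not set`);

  const account = env.SNOWFLAKE_ACCOUNT || '';
  // Heuristic: the driver wants the account identifier, not a URL or full hostname
  if (/^https?:\/\//i.test(account) || /snowflakecomputing\.com/i.test(account)) {
    problems.push('SNOWFLAKE_ACCOUNT looks like a URL; use the account identifier (e.g. orgname-accountname)');
  }
  return problems;
}
```

Running it at startup and exiting when the list is non-empty turns a vague connection failure into a specific message.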

Error: S3 AccessDenied or NoSuchBucket

  • Cause: IAM credentials lack s3:PutObject permissions, or the bucket name in environment variables contains a typo. Cross-region requests may also fail if the S3 client region does not match the bucket region.
  • Fix: Grant the pipeline's IAM identity s3:PutObject on the target prefix, and grant the IAM role used by the Snowflake storage integration s3:GetObject, s3:GetObjectVersion, and s3:ListBucket. Verify that AWS_REGION matches the bucket's region.
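The read permissions for the storage integration's role can be expressed as an IAM policy document. The sketch below builds one as a plain object; the bucket and prefix are placeholders, and buildSnowflakeReadPolicy is a hypothetical helper.

```javascript
// Hypothetical helper: IAM policy letting Snowflake's integration role read staged files.
function buildSnowflakeReadPolicy(bucket, prefix) {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        // Read the staged objects under the prefix
        Effect: 'Allow',
        Action: ['s3:GetObject', 's3:GetObjectVersion'],
        Resource: `arn:aws:s3:::${bucket}/${prefix}/*`,
      },
      {
        // List the bucket, restricted to keys under the prefix
        Effect: 'Allow',
        Action: 's3:ListBucket',
        Resource: `arn:aws:s3:::${bucket}`,
        Condition: { StringLike: { 's3:prefix': [`${prefix}/*`] } },
      },
    ],
  };
}
```

JSON.stringify(buildSnowflakeReadPolicy('my-bucket', 'genesys-analytics'), null, 2) produces a document that can be pasted into the IAM console.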

Error: Row Count Mismatch During Validation

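  • Cause: With ON_ERROR = 'CONTINUE', COPY INTO skips malformed rows without failing the statement; the skipped rows show up only in the errors_seen column of the COPY result. CSV quoting issues, empty strings in NUMBER or TIMESTAMP columns, incomplete pagination, or a second load of the same day can also make the counts diverge.
  • Fix: Query TABLE(INFORMATION_SCHEMA.COPY_HISTORY(TABLE_NAME => 'GENESYS_INTERACTIONS', START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP()))) for per-file error counts and the first error message, or run SELECT * FROM TABLE(VALIDATE(GENESYS_INTERACTIONS, JOB_ID => '_last')) in the same session to list the rejected rows. Then adjust the FILE_FORMAT or the transformation so null values and special characters are handled correctly.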
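To find which conversations went missing or were loaded twice, compare the conversation IDs from the extract with those in Snowflake for the same day. A hypothetical reconcileIds helper:

```javascript
// Hypothetical helper: IDs absent from Snowflake, and IDs loaded more than once.
function reconcileIds(extractedIds, loadedIds) {
  const loadedCounts = new Map();
  for (const id of loadedIds) {
    loadedCounts.set(id, (loadedCounts.get(id) || 0) + 1);
  }
  const missing = [...new Set(extractedIds)].filter(id => !loadedCounts.has(id));
  const duplicated = [...loadedCounts].filter(([, count]) => count > 1).map(([id]) => id);
  return { missing, duplicated };
}
```

extractedIds could come from records.map(r => r.conversationId), and loadedIds from SELECT CONVERSATION_ID FROM GENESYS_INTERACTIONS WHERE EXTRACT_DATE = ? for the same day.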

Official References