Batch Import Genesys Cloud Outbound Contacts from S3 with Node.js

Batch Import Genesys Cloud Outbound Contacts from S3 with Node.js

What You Will Build

A Node.js script that reads a CSV file from AWS S3, validates and transforms contact records into the Genesys Cloud Contact API schema, imports them in concurrent batches with semaphore-based concurrency control, retries only failed entries, adds successful contacts to a target contact list, and outputs a structured summary report with categorized errors. This tutorial uses the Genesys Cloud REST API directly via axios in Node.js 18+.

Prerequisites

  • Genesys Cloud API credentials with outbound:contact:write and outbound:contactlist:write OAuth scopes
  • Node.js 18 or later
  • AWS credentials with s3:GetObject permissions
  • External dependencies: npm install axios @aws-sdk/client-s3 papaparse libphonenumber-js

Authentication Setup

Genesys Cloud uses OAuth 2.0 client credentials flow for server-to-server integrations. You must request a token before making any outbound API calls. The token expires after 3600 seconds, so production code should cache and refresh tokens automatically.

const axios = require('axios');

const GENESYS_BASE_URL = 'https://api.mypurecloud.com';
const OAUTH_URL = 'https://api.mypurecloud.com/login/oauth/token';

async function getAccessToken(clientId, clientSecret) {
  const response = await axios.post(OAUTH_URL, null, {
    params: {
      grant_type: 'client_credentials',
      scope: 'outbound:contact:write outbound:contactlist:write'
    },
    auth: { username: clientId, password: clientSecret },
    headers: { 'Content-Type': 'application/x-www-form-urlencoded' }
  });

  if (!response.data.access_token) {
    throw new Error('OAuth token response missing access_token');
  }

  return response.data.access_token;
}

OAuth Scopes Required: outbound:contact:write, outbound:contactlist:write
HTTP Method: POST
Path: /login/oauth/token
Response Example:

{
  "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "Bearer",
  "expires_in": 3600,
  "scope": "outbound:contact:write outbound:contactlist:write"
}

Store the token in memory or a secure cache. Refresh it when expires_in approaches zero or when you receive a 401 Unauthorized response.

Implementation

Step 1: Read CSV from S3 and Parse Records

The script fetches the CSV object from S3, streams it to papaparse, and returns an array of raw row objects. Streaming prevents memory exhaustion for large files.

const { S3Client, GetObjectCommand } = require('@aws-sdk/client-s3');
const papaparse = require('papaparse');

async function fetchCsvFromS3(bucket, key) {
  const s3 = new S3Client({ region: process.env.AWS_REGION });
  const command = new GetObjectCommand({ Bucket: bucket, Key: key });
  const response = await s3.send(command);

  return new Promise((resolve, reject) => {
    papaparse.parse(response.Body, {
      header: true,
      skipEmptyLines: true,
      complete: (results) => resolve(results.data),
      error: (err) => reject(err)
    });
  });
}

Expected Output: Array of objects matching CSV headers. Example row: { phone: '+12125551234', firstName: 'John', lastName: 'Doe', email: 'john@example.com', timezone: 'EST' }

Step 2: Transform Records and Validate Data

Genesys Cloud requires strict formatting. Phone numbers must be E.164 compliant. Timezones must be IANA identifiers. This step maps CSV fields to the Contact API schema and filters invalid records before they reach the API.

const { parsePhoneNumberFromString } = require('libphonenumber-js');

const TIMEZONE_MAP = {
  'EST': 'America/New_York',
  'CST': 'America/Chicago',
  'MST': 'America/Denver',
  'PST': 'America/Los_Angeles',
  'UTC': 'UTC'
};

function transformAndValidate(rawRecord) {
  const phone = parsePhoneNumberFromString(rawRecord.phone, 'US');
  
  if (!phone || !phone.isValid()) {
    return { error: 'INVALID_PHONE', record: rawRecord };
  }

  const ianaTimezone = TIMEZONE_MAP[rawRecord.timezone] || 'America/New_York';

  return {
    phone: phone.number,
    firstName: rawRecord.firstName || '',
    lastName: rawRecord.lastName || '',
    email: rawRecord.email || '',
    city: rawRecord.city || '',
    state: rawRecord.state || '',
    country: rawRecord.country || 'US',
    timezone: ianaTimezone,
    address1: rawRecord.address1 || '',
    address2: rawRecord.address2 || '',
    postalCode: rawRecord.postalCode || ''
  };
}

Validation Logic: libphonenumber-js normalizes international formats to E.164. Invalid records are tagged with error: 'INVALID_PHONE' and excluded from the import queue. Missing timezones default to America/New_York.

Step 3: Semaphore Concurrency Control and Batch Import

Genesys Cloud limits the import endpoint to 1000 contacts per request. To maximize throughput without triggering rate limits, this implementation uses a custom async semaphore that processes multiple batches concurrently. Each batch POSTs to /api/v2/outbound/contacts/import.

class AsyncSemaphore {
  constructor(maxConcurrency) {
    this.maxConcurrency = maxConcurrency;
    this.pending = 0;
    this.queue = [];
  }

  acquire() {
    if (this.pending < this.maxConcurrency) {
      this.pending++;
      return Promise.resolve();
    }
    return new Promise((resolve) => this.queue.push(resolve));
  }

  release() {
    this.pending--;
    if (this.queue.length > 0) {
      const next = this.queue.shift();
      this.pending++;
      next();
    }
  }
}

async function importBatch(batch, accessToken, semaphore) {
  await semaphore.acquire();
  try {
    const response = await axios.post(
      `${GENESYS_BASE_URL}/api/v2/outbound/contacts/import`,
      batch,
      {
        headers: {
          Authorization: `Bearer ${accessToken}`,
          'Content-Type': 'application/json'
        },
        timeout: 30000
      }
    );
    return response.data;
  } catch (error) {
    if (error.response?.status === 429) {
      const retryAfter = error.response.headers['retry-after'] * 1000 || 5000;
      await new Promise(r => setTimeout(r, retryAfter));
      return importBatch(batch, accessToken, semaphore);
    }
    throw error;
  } finally {
    semaphore.release();
  }
}

HTTP Method: POST
Path: /api/v2/outbound/contacts/import
Request Body: Array of contact objects (max 1000)
Response Example:

{
  "id": "import-task-uuid",
  "status": "success",
  "results": [
    { "id": "contact-uuid-1", "error": null },
    { "id": null, "error": "Duplicate phone number" }
  ]
}

The semaphore ensures no more than maxConcurrency requests are in flight simultaneously. The 429 handler reads the Retry-After header and recursively retries the exact same batch.

Step 4: Process Results, Retry Failures, and Update Contact List

After all batches complete, the script aggregates results. It separates successful contact IDs from failed records. Failed records are retried once. Upon final completion, successful IDs are added to the target contact list via the Campaign API.

async function addContactsToList(contactListId, contactIds, accessToken) {
  await axios.post(
    `${GENESYS_BASE_URL}/api/v2/outbound/contactlists/${contactListId}/contacts`,
    { contactIds },
    {
      headers: {
        Authorization: `Bearer ${accessToken}`,
        'Content-Type': 'application/json'
      }
    }
  );
}

async function processImportResults(importResults, failedBatches, accessToken, semaphore, contactListId) {
  const successfulIds = [];
  const categorizedErrors = {
    VALIDATION: [],
    RATE_LIMIT: [],
    SERVER_ERROR: [],
    UNKNOWN: []
  };

  // Extract successes and categorize failures
  for (const batchResult of importResults) {
    for (const item of batchResult.results) {
      if (item.id) {
        successfulIds.push(item.id);
      } else {
        categorizedErrors.VALIDATION.push(item.error);
      }
    }
  }

  // Retry failed batches once
  if (failedBatches.length > 0) {
    const retryPromises = failedBatches.map(batch => importBatch(batch, accessToken, semaphore));
    const retryResults = await Promise.allSettled(retryPromises);
    
    for (const result of retryResults) {
      if (result.status === 'fulfilled') {
        for (const item of result.value.results) {
          if (item.id) successfulIds.push(item.id);
          else categorizedErrors.VALIDATION.push(item.error);
        }
      } else {
        categorizedErrors.UNKNOWN.push(result.reason.message);
      }
    }
  }

  // Update contact list membership
  if (successfulIds.length > 0) {
    await addContactsToList(contactListId, successfulIds, accessToken);
  }

  return { successfulIds, categorizedErrors };
}

HTTP Method: POST
Path: /api/v2/outbound/contactlists/{contactListId}/contacts
Request Body: { "contactIds": ["uuid-1", "uuid-2"] }
OAuth Scope: outbound:contactlist:write

The Promise.allSettled call ensures the script continues even if retry batches fail. The contact list update happens only after all retries complete.

Step 5: Generate Import Summary Report

The final step formats the aggregated data into a structured report. This report categorizes errors, counts successes, and exports to JSON for downstream CI/CD or logging pipelines.

function generateSummaryReport(successfulIds, categorizedErrors, totalRecords) {
  const report = {
    timestamp: new Date().toISOString(),
    totalProcessed: totalRecords,
    successfullyImported: successfulIds.length,
    failedImports: totalRecords - successfulIds.length,
    errorBreakdown: {
      validation: categorizedErrors.VALIDATION.length,
      rateLimit: categorizedErrors.RATE_LIMIT.length,
      serverError: categorizedErrors.SERVER_ERROR.length,
      unknown: categorizedErrors.UNKNOWN.length
    },
    errorSamples: {
      validation: categorizedErrors.VALIDATION.slice(0, 5),
      serverError: categorizedErrors.SERVER_ERROR.slice(0, 5),
      unknown: categorizedErrors.UNKNOWN.slice(0, 5)
    },
    importedContactIds: successfulIds
  };

  console.log(JSON.stringify(report, null, 2));
  return report;
}

Output Format: JSON object containing counts, categorized error samples, and the full array of imported contact IDs. This structure allows automated validation in deployment pipelines.

Complete Working Example

const axios = require('axios');
const { S3Client, GetObjectCommand } = require('@aws-sdk/client-s3');
const papaparse = require('papaparse');
const { parsePhoneNumberFromString } = require('libphonenumber-js');

const GENESYS_BASE_URL = 'https://api.mypurecloud.com';
const OAUTH_URL = 'https://api.mypurecloud.com/login/oauth/token';

class AsyncSemaphore {
  constructor(maxConcurrency) {
    this.maxConcurrency = maxConcurrency;
    this.pending = 0;
    this.queue = [];
  }
  acquire() {
    if (this.pending < this.maxConcurrency) {
      this.pending++;
      return Promise.resolve();
    }
    return new Promise((resolve) => this.queue.push(resolve));
  }
  release() {
    this.pending--;
    if (this.queue.length > 0) {
      const next = this.queue.shift();
      this.pending++;
      next();
    }
  }
}

async function getAccessToken(clientId, clientSecret) {
  const response = await axios.post(OAUTH_URL, null, {
    params: { grant_type: 'client_credentials', scope: 'outbound:contact:write outbound:contactlist:write' },
    auth: { username: clientId, password: clientSecret },
    headers: { 'Content-Type': 'application/x-www-form-urlencoded' }
  });
  return response.data.access_token;
}

async function fetchCsvFromS3(bucket, key) {
  const s3 = new S3Client({ region: process.env.AWS_REGION });
  const command = new GetObjectCommand({ Bucket: bucket, Key: key });
  const response = await s3.send(command);
  return new Promise((resolve, reject) => {
    papaparse.parse(response.Body, {
      header: true,
      skipEmptyLines: true,
      complete: (results) => resolve(results.data),
      error: (err) => reject(err)
    });
  });
}

const TIMEZONE_MAP = { 'EST': 'America/New_York', 'CST': 'America/Chicago', 'MST': 'America/Denver', 'PST': 'America/Los_Angeles', 'UTC': 'UTC' };

function transformAndValidate(rawRecord) {
  const phone = parsePhoneNumberFromString(rawRecord.phone, 'US');
  if (!phone || !phone.isValid()) return { error: 'INVALID_PHONE', record: rawRecord };
  return {
    phone: phone.number,
    firstName: rawRecord.firstName || '',
    lastName: rawRecord.lastName || '',
    email: rawRecord.email || '',
    city: rawRecord.city || '',
    state: rawRecord.state || '',
    country: rawRecord.country || 'US',
    timezone: TIMEZONE_MAP[rawRecord.timezone] || 'America/New_York',
    address1: rawRecord.address1 || '',
    address2: rawRecord.address2 || '',
    postalCode: rawRecord.postalCode || ''
  };
}

async function importBatch(batch, accessToken, semaphore) {
  await semaphore.acquire();
  try {
    const response = await axios.post(
      `${GENESYS_BASE_URL}/api/v2/outbound/contacts/import`,
      batch,
      { headers: { Authorization: `Bearer ${accessToken}`, 'Content-Type': 'application/json' }, timeout: 30000 }
    );
    return response.data;
  } catch (error) {
    if (error.response?.status === 429) {
      const retryAfter = error.response.headers['retry-after'] * 1000 || 5000;
      await new Promise(r => setTimeout(r, retryAfter));
      return importBatch(batch, accessToken, semaphore);
    }
    throw error;
  } finally {
    semaphore.release();
  }
}

async function addContactsToList(contactListId, contactIds, accessToken) {
  await axios.post(
    `${GENESYS_BASE_URL}/api/v2/outbound/contactlists/${contactListId}/contacts`,
    { contactIds },
    { headers: { Authorization: `Bearer ${accessToken}`, 'Content-Type': 'application/json' } }
  );
}

async function runImport() {
  const clientId = process.env.GENESYS_CLIENT_ID;
  const clientSecret = process.env.GENESYS_CLIENT_SECRET;
  const contactListId = process.env.CONTACT_LIST_ID;
  const s3Bucket = process.env.S3_BUCKET;
  const s3Key = process.env.S3_KEY;
  const MAX_CONCURRENCY = 3;
  const BATCH_SIZE = 500;

  const accessToken = await getAccessToken(clientId, clientSecret);
  const rawRecords = await fetchCsvFromS3(s3Bucket, s3Key);
  
  const validRecords = [];
  const validationErrors = [];
  for (const row of rawRecords) {
    const transformed = transformAndValidate(row);
    if (transformed.error) validationErrors.push(transformed);
    else validRecords.push(transformed);
  }

  const batches = [];
  for (let i = 0; i < validRecords.length; i += BATCH_SIZE) {
    batches.push(validRecords.slice(i, i + BATCH_SIZE));
  }

  const semaphore = new AsyncSemaphore(MAX_CONCURRENCY);
  const failedBatches = [];
  const importResults = [];

  const batchPromises = batches.map(batch => 
    importBatch(batch, accessToken, semaphore).catch(err => {
      failedBatches.push(batch);
      throw err;
    })
  );

  const settledResults = await Promise.allSettled(batchPromises);
  for (const result of settledResults) {
    if (result.status === 'fulfilled') importResults.push(result.value);
  }

  const finalResults = await processImportResults(importResults, failedBatches, accessToken, semaphore, contactListId);
  generateSummaryReport(finalResults.successfulIds, finalResults.categorizedErrors, rawRecords.length);
}

async function processImportResults(importResults, failedBatches, accessToken, semaphore, contactListId) {
  const successfulIds = [];
  const categorizedErrors = { VALIDATION: [], RATE_LIMIT: [], SERVER_ERROR: [], UNKNOWN: [] };

  for (const batchResult of importResults) {
    for (const item of batchResult.results) {
      if (item.id) successfulIds.push(item.id);
      else categorizedErrors.VALIDATION.push(item.error);
    }
  }

  if (failedBatches.length > 0) {
    const retryPromises = failedBatches.map(batch => importBatch(batch, accessToken, semaphore));
    const retryResults = await Promise.allSettled(retryPromises);
    for (const result of retryResults) {
      if (result.status === 'fulfilled') {
        for (const item of result.value.results) {
          if (item.id) successfulIds.push(item.id);
          else categorizedErrors.VALIDATION.push(item.error);
        }
      } else {
        categorizedErrors.UNKNOWN.push(result.reason.message);
      }
    }
  }

  if (successfulIds.length > 0) {
    await addContactsToList(contactListId, successfulIds, accessToken);
  }

  return { successfulIds, categorizedErrors };
}

function generateSummaryReport(successfulIds, categorizedErrors, totalRecords) {
  const report = {
    timestamp: new Date().toISOString(),
    totalProcessed: totalRecords,
    successfullyImported: successfulIds.length,
    failedImports: totalRecords - successfulIds.length,
    errorBreakdown: {
      validation: categorizedErrors.VALIDATION.length,
      rateLimit: categorizedErrors.RATE_LIMIT.length,
      serverError: categorizedErrors.SERVER_ERROR.length,
      unknown: categorizedErrors.UNKNOWN.length
    },
    errorSamples: {
      validation: categorizedErrors.VALIDATION.slice(0, 5),
      serverError: categorizedErrors.SERVER_ERROR.slice(0, 5),
      unknown: categorizedErrors.UNKNOWN.slice(0, 5)
    },
    importedContactIds: successfulIds
  };
  console.log(JSON.stringify(report, null, 2));
  return report;
}

runImport().catch(console.error);

Environment Variables Required: GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, CONTACT_LIST_ID, S3_BUCKET, S3_KEY, AWS_REGION
Execution: node import-contacts.js
Dependencies: npm install axios @aws-sdk/client-s3 papaparse libphonenumber-js

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: Expired OAuth token or invalid client credentials.
  • Fix: Implement token caching with expiry tracking. Refresh the token before the first batch request. Verify client_id and client_secret match the API credentials in Genesys Cloud Admin.
  • Code Fix: Wrap API calls in a retry function that catches 401, calls getAccessToken(), and retries the request.

Error: 400 Bad Request

  • Cause: Invalid phone format, missing required fields, or timezone mismatch.
  • Fix: Ensure all phone numbers pass E.164 validation before batching. Map timezones to IANA format. Verify the request body matches the Contact schema exactly.
  • Code Fix: The transformAndValidate function already filters invalid phones. Log the error field from batchResult.results to identify schema mismatches.

Error: 429 Too Many Requests

  • Cause: Exceeding Genesys Cloud rate limits (typically 10 requests per second for outbound APIs).
  • Fix: The semaphore limits concurrent requests. The importBatch function reads the Retry-After header and waits before retrying. Reduce MAX_CONCURRENCY if 429s persist.
  • Code Fix: Monitor categorizedErrors.RATE_LIMIT in the summary report. If populated, lower MAX_CONCURRENCY to 1 or 2.

Error: 500 Internal Server Error

  • Cause: Temporary Genesys Cloud backend failure or payload size limits.
  • Fix: Retry with exponential backoff. Reduce BATCH_SIZE to 250 if payload serialization fails.
  • Code Fix: The Promise.allSettled wrapper captures 5xx failures in UNKNOWN. Implement a second retry loop with longer delays if server errors dominate.

Official References