Batch Process Cognigy.AI Utterance Exports with a Go Worker Pool and NER Extraction
What You Will Build
- A Go application that triggers a Cognigy.AI utterance export, polls for completion, downloads the CSV payload, and processes it through a concurrent worker pool.
- Each worker applies regex-based Named Entity Recognition (NER) to extract structured data from raw utterances, appends the results to the dataset, and uploads the enriched CSV back to the training API with version control tags.
- The implementation uses the NICE CXone Bot API, the standard library packages net/http, encoding/csv, and regexp, and golang.org/x/sync/errgroup for production-grade concurrency.
Prerequisites
- OAuth 2.0 Client Credentials grant configured in CXone with scopes: bot:export:read, bot:import:write
- CXone API base URL (e.g., https://{region}.nicecxone.com) and a valid Bot ID
- Go 1.21 or later
- External dependency: golang.org/x/sync/errgroup
- Standard library packages: bytes, context, encoding/csv, encoding/json, fmt, io, math/rand, mime/multipart, net/http, net/url, os, regexp, strings, sync, time
Authentication Setup
CXone uses standard OAuth 2.0 Client Credentials flow. Tokens expire after one hour. You must cache the token and refresh it before expiration to avoid 401 interruptions during batch processing.
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"time"
)
type OAuthConfig struct {
BaseURL string
ClientID string
ClientSecret string
}
type OAuthToken struct {
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
ExpiresAt time.Time
}
type TokenCache struct {
mu sync.RWMutex
token *OAuthToken
}
func (c *TokenCache) Get(ctx context.Context, cfg OAuthConfig) (string, error) {
c.mu.RLock()
if c.token != nil && time.Now().Before(c.token.ExpiresAt) {
accessToken := c.token.AccessToken
c.mu.RUnlock()
return accessToken, nil
}
c.mu.RUnlock()
c.mu.Lock()
defer c.mu.Unlock()
if c.token != nil && time.Now().Before(c.token.ExpiresAt) {
return c.token.AccessToken, nil
}
token, err := fetchToken(ctx, cfg)
if err != nil {
return "", fmt.Errorf("oauth token fetch failed: %w", err)
}
c.token = token
return token.AccessToken, nil
}
func fetchToken(ctx context.Context, cfg OAuthConfig) (*OAuthToken, error) {
// url.Values percent-encodes each field, so secrets or scopes containing
// spaces, "&", or "=" survive the form encoding intact.
form := url.Values{}
form.Set("client_id", cfg.ClientID)
form.Set("client_secret", cfg.ClientSecret)
form.Set("grant_type", "client_credentials")
form.Set("scope", "bot:export:read bot:import:write")
req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.BaseURL+"/oauth/token", strings.NewReader(form.Encode()))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("oauth failed with status %d", resp.StatusCode)
}
var token OAuthToken
if err := json.NewDecoder(resp.Body).Decode(&token); err != nil {
return nil, err
}
token.ExpiresAt = time.Now().Add(time.Duration(token.ExpiresIn) * time.Second)
return &token, nil
}
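The cache uses a double-checked locking pattern to prevent redundant token requests under concurrent load. The time.Now().Before(c.token.ExpiresAt) check compares against the exact expiry time, so the code as written has no safety margin: a token can pass the check and expire while the request is in flight. To refresh tokens before they expire rather than after a request fails, subtract a small buffer (for example, 60 seconds) when you compute ExpiresAt.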
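One way to make the refresh-before-expiry behavior explicit is a safety margin applied at comparison time. The following is a minimal sketch, not part of the original code: the helper needsRefresh, the constant refreshMargin, its 60-second value, and the fixed demoNow timestamp are all assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// refreshMargin is an assumed safety window; tune it to your request latency.
const refreshMargin = 60 * time.Second

// demoNow is a fixed reference time so the example is deterministic.
var demoNow = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

// needsRefresh reports whether a token expiring at expiresAt should be
// replaced now, treating the last refreshMargin of its lifetime as expired.
func needsRefresh(expiresAt, now time.Time) bool {
	return !now.Before(expiresAt.Add(-refreshMargin))
}

func main() {
	// Ten minutes of lifetime left: keep using the cached token.
	fmt.Println(needsRefresh(demoNow.Add(10*time.Minute), demoNow))
	// Thirty seconds left: inside the margin, so fetch a new token.
	fmt.Println(needsRefresh(demoNow.Add(30*time.Second), demoNow))
}
```

In TokenCache.Get, a check like this could replace both time.Now().Before(c.token.ExpiresAt) comparisons, with the result negated.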
Implementation
Step 1: Trigger Export and Poll for Completion
The CXone export endpoint is asynchronous. You submit a request, receive a job ID, and poll until the status changes to completed. The response includes a signed downloadUrl for the CSV payload.
type ExportJob struct {
ID string `json:"id"`
Status string `json:"status"`
DownloadURL string `json:"downloadUrl"`
}
func triggerExport(ctx context.Context, baseURL, botID, token string) (string, error) {
payload := map[string]string{"name": "batch_ner_export", "format": "csv"}
body, err := json.Marshal(payload)
if err != nil {
return "", err
}
url := fmt.Sprintf("%s/api/v2/bots/%s/exports", baseURL, botID)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return "", err
}
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
return "", fmt.Errorf("export trigger failed: %d", resp.StatusCode)
}
var job ExportJob
if err := json.NewDecoder(resp.Body).Decode(&job); err != nil {
return "", err
}
// Poll until completed
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return "", ctx.Err()
case <-ticker.C:
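pollReq, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/api/v2/bots/%s/exports/%s", baseURL, botID, job.ID), nil)
if err != nil {
return "", err
}
pollReq.Header.Set("Authorization", "Bearer "+token)
pollResp, err := http.DefaultClient.Do(pollReq)
if err != nil {
continue // transient network error: retry on the next tick
}
var updated ExportJob
decodeErr := json.NewDecoder(pollResp.Body).Decode(&updated)
// Close inside the loop; a defer here would accumulate open bodies until the function returns.
pollResp.Body.Close()
if pollResp.StatusCode != http.StatusOK {
return "", fmt.Errorf("export poll failed: %d", pollResp.StatusCode)
}
if decodeErr != nil {
return "", fmt.Errorf("decode export status: %w", decodeErr)
}
if updated.Status == "completed" {
return updated.DownloadURL, nil
}
if updated.Status == "failed" {
return "", fmt.Errorf("export job failed")
}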
}
}
}
The polling loop respects context cancellation. CXone export jobs can take several minutes depending on dataset size. The 5-second interval balances API load with responsiveness. You must handle downloadUrl expiration; signed URLs typically remain valid for 15 minutes.
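The ticker-plus-select pattern used for polling can be factored into a reusable helper. The sketch below is an illustration only: pollUntil and simulatePoll are hypothetical names, and the fake job that completes on the third check stands in for the real export status call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pollUntil calls check on every tick until it reports done, returns an
// error, or ctx is cancelled. It is a hypothetical helper, not a CXone API.
func pollUntil(ctx context.Context, interval time.Duration, check func() (bool, error)) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			done, err := check()
			if err != nil {
				return err
			}
			if done {
				return nil
			}
		}
	}
}

// simulatePoll runs pollUntil against a fake job that completes on the
// third status check and returns how many checks were made.
func simulatePoll(timeout time.Duration) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	calls := 0
	err := pollUntil(ctx, time.Millisecond, func() (bool, error) {
		calls++
		return calls >= 3, nil
	})
	return calls, err
}

func main() {
	calls, err := simulatePoll(time.Second)
	fmt.Println(calls, err)

	// A deadline that has already passed stops the loop before any check runs.
	_, err = simulatePoll(0)
	fmt.Println(errors.Is(err, context.DeadlineExceeded))
}
```

Keeping the status check behind a function value makes the loop easy to test without a network connection.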
Step 2: Download Segments and Initialize Worker Pool
Large utterance exports exceed single-request memory limits. You download the CSV, parse headers, and split rows into fixed-size chunks. The worker pool consumes these chunks concurrently.
type UtteranceRow struct {
Headers []string
Values []string
}
func downloadAndChunkCSV(ctx context.Context, downloadURL string, chunkSize int) (chan []UtteranceRow, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, downloadURL, nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
resp.Body.Close() // release the connection on every early-return path
return nil, fmt.Errorf("csv download failed: %d", resp.StatusCode)
}
reader := csv.NewReader(resp.Body)
headers, err := reader.Read()
if err != nil {
resp.Body.Close()
return nil, err
}
output := make(chan []UtteranceRow, 10)
go func() {
defer close(output)
defer resp.Body.Close()
var chunk []UtteranceRow
for {
select {
case <-ctx.Done():
return
default:
record, err := reader.Read()
if err == io.EOF {
if len(chunk) > 0 {
output <- chunk
}
return
}
if err != nil {
return
}
row := UtteranceRow{Headers: headers, Values: record}
chunk = append(chunk, row)
if len(chunk) >= chunkSize {
output <- chunk
chunk = nil
}
}
}
}()
return output, nil
}
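The channel-based design decouples download I/O from CPU-bound regex processing. The chunkSize parameter controls memory pressure; values between 500 and 2000 rows per chunk are a reasonable starting point for standard CXone export sizes, and you should measure before settling on one. Note that the reader goroutine stops silently when it hits a malformed record, so a corrupted download produces a shorter dataset rather than an error. Log or surface that error in production code.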
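The chunking logic can be studied in isolation from the HTTP download. Here is a minimal synchronous sketch that returns parse errors instead of dropping them; chunkRecords, chunkSample, and the sampleCSV data are hypothetical and exist only for this illustration.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"strings"
)

// chunkRecords reads a header row and then groups the remaining records into
// slices of at most size rows. It returns the header and the chunks.
func chunkRecords(r io.Reader, size int) ([]string, [][][]string, error) {
	if size <= 0 {
		return nil, nil, fmt.Errorf("chunk size must be positive, got %d", size)
	}
	reader := csv.NewReader(r)
	header, err := reader.Read()
	if err != nil {
		return nil, nil, fmt.Errorf("read header: %w", err)
	}
	var chunks [][][]string
	var current [][]string
	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			// Surface parse errors instead of silently truncating the data.
			return nil, nil, fmt.Errorf("read record: %w", err)
		}
		current = append(current, record)
		if len(current) == size {
			chunks = append(chunks, current)
			current = nil
		}
	}
	if len(current) > 0 {
		chunks = append(chunks, current) // final partial chunk
	}
	return header, chunks, nil
}

// sampleCSV is made-up data for illustration only.
const sampleCSV = "utterance,intent\nhi,greet\nbye,leave\nthanks,thank\nhelp,support\nagent,handoff\n"

// chunkSample chunks the sample data with the given chunk size.
func chunkSample(size int) ([]string, [][][]string, error) {
	return chunkRecords(strings.NewReader(sampleCSV), size)
}

func main() {
	header, chunks, err := chunkSample(2)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(header, len(chunks))
	for i, c := range chunks {
		fmt.Printf("chunk %d: %d rows\n", i, len(c))
	}
}
```

With five data rows and a chunk size of two, the last chunk holds the single leftover row.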
Step 3: Execute NER Regex Extraction Concurrently
NER extraction in this context means identifying structured data patterns (dates, phone numbers, emails, ticket IDs) and appending them as new columns. You compile regex patterns once and reuse them across workers to avoid recompilation overhead.
var (
dateRegex = regexp.MustCompile(`\b\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4}\b`)
phoneRegex = regexp.MustCompile(`\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b`)
emailRegex = regexp.MustCompile(`[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}`)
ticketRegex = regexp.MustCompile(`\bTICKET-\d{4,}\b`)
)
func processChunk(ctx context.Context, chunk []UtteranceRow) ([]UtteranceRow, error) {
var errGroup errgroup.Group
results := make([][]UtteranceRow, len(chunk))
for i, row := range chunk {
i := i
row := row
errGroup.Go(func() error {
// Locate utterance column index (case-insensitive)
utteranceIdx := -1
for idx, h := range row.Headers {
if strings.EqualFold(h, "utterance") || strings.EqualFold(h, "text") {
utteranceIdx = idx
break
}
}
if utteranceIdx == -1 || utteranceIdx >= len(row.Values) {
return fmt.Errorf("missing utterance column in row %d", i)
}
text := row.Values[utteranceIdx]
dates := dateRegex.FindAllString(text, -1)
phones := phoneRegex.FindAllString(text, -1)
emails := emailRegex.FindAllString(text, -1)
tickets := ticketRegex.FindAllString(text, -1)
// Append new entity columns
row.Headers = append(row.Headers, "ner_dates", "ner_phones", "ner_emails", "ner_tickets")
row.Values = append(row.Values,
strings.Join(dates, ";"),
strings.Join(phones, ";"),
strings.Join(emails, ";"),
strings.Join(tickets, ";"),
)
results[i] = []UtteranceRow{row}
return nil
})
}
if err := errGroup.Wait(); err != nil {
return nil, err
}
// Flatten results
var flattened []UtteranceRow
for _, r := range results {
flattened = append(flattened, r...)
}
return flattened, nil
}
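errGroup.Wait returns the first error reported by any row, and processChunk then discards the whole chunk. Because the group is a zero-value errgroup.Group rather than one created with errgroup.WithContext, goroutines that are already running finish their work instead of being cancelled. You must locate the utterance column dynamically because CXone export formats may vary slightly across bot versions. The regex patterns are compiled at package initialization to eliminate per-call overhead.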
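processChunk starts one goroutine per row, which is simple but unbounded. A fixed-size worker pool is a common alternative. The sketch below uses only the standard library (sync.WaitGroup and a job channel) rather than errgroup, and the names extractEmails and emailPattern are hypothetical. If you prefer to stay with errgroup, its SetLimit method offers a similar bound.

```go
package main

import (
	"fmt"
	"regexp"
	"sync"
)

// emailPattern mirrors the email expression used in the step above.
var emailPattern = regexp.MustCompile(`[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}`)

// extractEmails fans texts out to a fixed number of workers and returns the
// matches for each input at the same index, so output order stays stable.
func extractEmails(texts []string, workers int) [][]string {
	if workers < 1 {
		workers = 1
	}
	results := make([][]string, len(texts))
	jobs := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range jobs {
				// Each index is written by exactly one worker, so no lock is needed.
				results[i] = emailPattern.FindAllString(texts[i], -1)
			}
		}()
	}
	for i := range texts {
		jobs <- i
	}
	close(jobs) // lets the workers' range loops end
	wg.Wait()
	return results
}

func main() {
	texts := []string{
		"mail me at ana@example.com",
		"no contact details here",
		"cc bob@example.org and eve@example.net",
	}
	for i, found := range extractEmails(texts, 2) {
		fmt.Println(i, found)
	}
}
```

A compiled *regexp.Regexp is safe for concurrent use, so the workers can share one pattern.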
Step 4: Upload Enriched Dataset with Version Tags
CXone accepts enriched training data via multipart form upload. The import creates a new training version. You embed version control tags directly in the name field and attach metadata for traceability.
func uploadEnrichedData(ctx context.Context, baseURL, botID, token, versionTag string, rows []UtteranceRow) error {
var buf bytes.Buffer
writer := multipart.NewWriter(&buf)
// Write metadata
if err := writer.WriteField("name", fmt.Sprintf("enriched_ner_%s_%s", versionTag, time.Now().Format("20060102_150405"))); err != nil {
return err
}
if err := writer.WriteField("description", fmt.Sprintf("Automated NER enrichment batch %s", versionTag)); err != nil {
return err
}
// Write CSV file
part, err := writer.CreateFormFile("file", "enriched_utterances.csv")
if err != nil {
return err
}
w := csv.NewWriter(part)
if len(rows) == 0 {
return fmt.Errorf("no rows to upload")
}
if err := w.Write(rows[0].Headers); err != nil {
return err
}
for _, r := range rows {
if err := w.Write(r.Values); err != nil {
return err
}
}
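w.Flush()
// Flush does not return an error; csv.Writer reports write failures through Error.
if err := w.Error(); err != nil {
return err
}
// Close writes the terminating multipart boundary, so its error matters.
if err := writer.Close(); err != nil {
return err
}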
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/api/v2/bots/%s/imports", baseURL, botID), &buf)
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("Content-Type", writer.FormDataContentType())
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("import failed: %d %s", resp.StatusCode, string(body))
}
return nil
}
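The multipart writer buffers the entire enriched dataset in memory before transmission. For datasets exceeding 50 MB, you should stream directly to the request body using io.Pipe. The name field combines the version tag with a YYYYMMDD_HHMMSS timestamp (Go layout 20060102_150405). This fixed-width layout is a compact form rather than strict ISO 8601, but it sorts chronologically as plain text among names that share the same version tag.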
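The io.Pipe approach mentioned above can be sketched as follows. This is an illustration under stated assumptions: streamMultipartCSV and roundTrip are hypothetical helpers, the field and file names simply mirror the upload step, and roundTrip parses the stream locally instead of sending it to CXone.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"mime"
	"mime/multipart"
)

// streamMultipartCSV returns a reader that yields a multipart body containing
// rows as a CSV file part, plus the Content-Type header value to send with it.
// A goroutine produces the body on demand, so it is never fully buffered.
func streamMultipartCSV(field, filename string, rows [][]string) (io.ReadCloser, string) {
	pr, pw := io.Pipe()
	mw := multipart.NewWriter(pw)
	contentType := mw.FormDataContentType()
	go func() {
		part, err := mw.CreateFormFile(field, filename)
		if err != nil {
			pw.CloseWithError(err)
			return
		}
		cw := csv.NewWriter(part)
		if err := cw.WriteAll(rows); err != nil { // WriteAll flushes before returning
			pw.CloseWithError(err)
			return
		}
		// Closing the multipart writer emits the final boundary. Passing its
		// result to CloseWithError signals EOF on success or the error otherwise.
		pw.CloseWithError(mw.Close())
	}()
	return pr, contentType
}

// roundTrip streams rows through the pipe and parses them back, standing in
// for the HTTP server that would normally consume the body.
func roundTrip(rows [][]string) ([][]string, error) {
	body, contentType := streamMultipartCSV("file", "enriched_utterances.csv", rows)
	defer body.Close() // unblocks the writer goroutine if we stop reading early
	_, params, err := mime.ParseMediaType(contentType)
	if err != nil {
		return nil, err
	}
	part, err := multipart.NewReader(body, params["boundary"]).NextPart()
	if err != nil {
		return nil, err
	}
	return csv.NewReader(part).ReadAll()
}

func main() {
	rows := [][]string{
		{"utterance", "ner_emails"},
		{"mail ana@example.com", "ana@example.com"},
	}
	got, err := roundTrip(rows)
	fmt.Println(got, err)
}
```

In an upload function, the returned reader would be passed as the body to http.NewRequestWithContext and the returned string set as the Content-Type header. Because the reader is an io.ReadCloser, the HTTP client closes it when the request ends, which releases the writer goroutine.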
Complete Working Example
package main
import (
"bytes"
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"net/url"
"os"
"regexp"
"strings"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
type OAuthConfig struct {
BaseURL string
ClientID string
ClientSecret string
}
type OAuthToken struct {
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
ExpiresAt time.Time
}
type TokenCache struct {
mu sync.RWMutex
token *OAuthToken
}
func (c *TokenCache) Get(ctx context.Context, cfg OAuthConfig) (string, error) {
c.mu.RLock()
if c.token != nil && time.Now().Before(c.token.ExpiresAt) {
accessToken := c.token.AccessToken
c.mu.RUnlock()
return accessToken, nil
}
c.mu.RUnlock()
c.mu.Lock()
defer c.mu.Unlock()
if c.token != nil && time.Now().Before(c.token.ExpiresAt) {
return c.token.AccessToken, nil
}
token, err := fetchToken(ctx, cfg)
if err != nil {
return "", fmt.Errorf("oauth token fetch failed: %w", err)
}
c.token = token
return token.AccessToken, nil
}
func fetchToken(ctx context.Context, cfg OAuthConfig) (*OAuthToken, error) {
// url.Values percent-encodes each field, so secrets or scopes containing
// spaces, "&", or "=" survive the form encoding intact.
form := url.Values{}
form.Set("client_id", cfg.ClientID)
form.Set("client_secret", cfg.ClientSecret)
form.Set("grant_type", "client_credentials")
form.Set("scope", "bot:export:read bot:import:write")
req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.BaseURL+"/oauth/token", strings.NewReader(form.Encode()))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("oauth failed with status %d", resp.StatusCode)
}
var token OAuthToken
if err := json.NewDecoder(resp.Body).Decode(&token); err != nil {
return nil, err
}
token.ExpiresAt = time.Now().Add(time.Duration(token.ExpiresIn) * time.Second)
return &token, nil
}
type ExportJob struct {
ID string `json:"id"`
Status string `json:"status"`
DownloadURL string `json:"downloadUrl"`
}
type UtteranceRow struct {
Headers []string
Values []string
}
var (
dateRegex = regexp.MustCompile(`\b\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4}\b`)
phoneRegex = regexp.MustCompile(`\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b`)
emailRegex = regexp.MustCompile(`[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}`)
ticketRegex = regexp.MustCompile(`\bTICKET-\d{4,}\b`)
)
func triggerExport(ctx context.Context, baseURL, botID, token string) (string, error) {
payload := map[string]string{"name": "batch_ner_export", "format": "csv"}
body, err := json.Marshal(payload)
if err != nil {
return "", err
}
url := fmt.Sprintf("%s/api/v2/bots/%s/exports", baseURL, botID)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return "", err
}
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
return "", fmt.Errorf("export trigger failed: %d", resp.StatusCode)
}
var job ExportJob
if err := json.NewDecoder(resp.Body).Decode(&job); err != nil {
return "", err
}
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return "", ctx.Err()
case <-ticker.C:
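pollReq, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/api/v2/bots/%s/exports/%s", baseURL, botID, job.ID), nil)
if err != nil {
return "", err
}
pollReq.Header.Set("Authorization", "Bearer "+token)
pollResp, err := http.DefaultClient.Do(pollReq)
if err != nil {
continue // transient network error: retry on the next tick
}
var updated ExportJob
decodeErr := json.NewDecoder(pollResp.Body).Decode(&updated)
// Close inside the loop; a defer here would accumulate open bodies until the function returns.
pollResp.Body.Close()
if pollResp.StatusCode != http.StatusOK {
return "", fmt.Errorf("export poll failed: %d", pollResp.StatusCode)
}
if decodeErr != nil {
return "", fmt.Errorf("decode export status: %w", decodeErr)
}
if updated.Status == "completed" {
return updated.DownloadURL, nil
}
if updated.Status == "failed" {
return "", fmt.Errorf("export job failed")
}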
}
}
}
func downloadAndChunkCSV(ctx context.Context, downloadURL string, chunkSize int) (chan []UtteranceRow, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, downloadURL, nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
resp.Body.Close() // release the connection on every early-return path
return nil, fmt.Errorf("csv download failed: %d", resp.StatusCode)
}
reader := csv.NewReader(resp.Body)
headers, err := reader.Read()
if err != nil {
resp.Body.Close()
return nil, err
}
output := make(chan []UtteranceRow, 10)
go func() {
defer close(output)
defer resp.Body.Close()
var chunk []UtteranceRow
for {
select {
case <-ctx.Done():
return
default:
record, err := reader.Read()
if err == io.EOF {
if len(chunk) > 0 {
output <- chunk
}
return
}
if err != nil {
return
}
row := UtteranceRow{Headers: headers, Values: record}
chunk = append(chunk, row)
if len(chunk) >= chunkSize {
output <- chunk
chunk = nil
}
}
}
}()
return output, nil
}
func processChunk(ctx context.Context, chunk []UtteranceRow) ([]UtteranceRow, error) {
var errGroup errgroup.Group
results := make([][]UtteranceRow, len(chunk))
for i, row := range chunk {
i := i
row := row
errGroup.Go(func() error {
utteranceIdx := -1
for idx, h := range row.Headers {
if strings.EqualFold(h, "utterance") || strings.EqualFold(h, "text") {
utteranceIdx = idx
break
}
}
if utteranceIdx == -1 || utteranceIdx >= len(row.Values) {
return fmt.Errorf("missing utterance column in row %d", i)
}
text := row.Values[utteranceIdx]
dates := dateRegex.FindAllString(text, -1)
phones := phoneRegex.FindAllString(text, -1)
emails := emailRegex.FindAllString(text, -1)
tickets := ticketRegex.FindAllString(text, -1)
row.Headers = append(row.Headers, "ner_dates", "ner_phones", "ner_emails", "ner_tickets")
row.Values = append(row.Values,
strings.Join(dates, ";"),
strings.Join(phones, ";"),
strings.Join(emails, ";"),
strings.Join(tickets, ";"),
)
results[i] = []UtteranceRow{row}
return nil
})
}
if err := errGroup.Wait(); err != nil {
return nil, err
}
var flattened []UtteranceRow
for _, r := range results {
flattened = append(flattened, r...)
}
return flattened, nil
}
func uploadEnrichedData(ctx context.Context, baseURL, botID, token, versionTag string, rows []UtteranceRow) error {
var buf bytes.Buffer
writer := multipart.NewWriter(&buf)
if err := writer.WriteField("name", fmt.Sprintf("enriched_ner_%s_%s", versionTag, time.Now().Format("20060102_150405"))); err != nil {
return err
}
if err := writer.WriteField("description", fmt.Sprintf("Automated NER enrichment batch %s", versionTag)); err != nil {
return err
}
part, err := writer.CreateFormFile("file", "enriched_utterances.csv")
if err != nil {
return err
}
w := csv.NewWriter(part)
if len(rows) == 0 {
return fmt.Errorf("no rows to upload")
}
if err := w.Write(rows[0].Headers); err != nil {
return err
}
for _, r := range rows {
if err := w.Write(r.Values); err != nil {
return err
}
}
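w.Flush()
// Flush does not return an error; csv.Writer reports write failures through Error.
if err := w.Error(); err != nil {
return err
}
// Close writes the terminating multipart boundary, so its error matters.
if err := writer.Close(); err != nil {
return err
}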
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/api/v2/bots/%s/imports", baseURL, botID), &buf)
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("Content-Type", writer.FormDataContentType())
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("import failed: %d %s", resp.StatusCode, string(body))
}
return nil
}
func main() {
ctx := context.Background()
cfg := OAuthConfig{
BaseURL: os.Getenv("CXONE_BASE_URL"),
ClientID: os.Getenv("CXONE_CLIENT_ID"),
ClientSecret: os.Getenv("CXONE_CLIENT_SECRET"),
}
botID := os.Getenv("CXONE_BOT_ID")
versionTag := os.Getenv("VERSION_TAG")
if cfg.BaseURL == "" || cfg.ClientID == "" || cfg.ClientSecret == "" || botID == "" {
fmt.Println("Missing required environment variables")
os.Exit(1)
}
cache := &TokenCache{}
token, err := cache.Get(ctx, cfg)
if err != nil {
fmt.Printf("Authentication failed: %v\n", err)
os.Exit(1)
}
downloadURL, err := triggerExport(ctx, cfg.BaseURL, botID, token)
if err != nil {
fmt.Printf("Export failed: %v\n", err)
os.Exit(1)
}
chunks, err := downloadAndChunkCSV(ctx, downloadURL, 1000)
if err != nil {
fmt.Printf("Download failed: %v\n", err)
os.Exit(1)
}
var allRows []UtteranceRow
for chunk := range chunks {
processed, err := processChunk(ctx, chunk)
if err != nil {
fmt.Printf("Processing failed: %v\n", err)
os.Exit(1)
}
allRows = append(allRows, processed...)
}
if err := uploadEnrichedData(ctx, cfg.BaseURL, botID, token, versionTag, allRows); err != nil {
fmt.Printf("Upload failed: %v\n", err)
os.Exit(1)
}
fmt.Println("Batch NER enrichment completed successfully")
}
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token expired during the export polling phase or CSV download.
- Fix: Implement token refresh before each API call. The TokenCache.Get method checks expiration and fetches a new token automatically. Ensure your client credentials have not been rotated in CXone.
- Code Fix: Replace static token usage with token, err := cache.Get(ctx, cfg) before every HTTP request.
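Rather than calling the cache by hand before each request, the token lookup can live in an http.RoundTripper so every request made through one client picks up a fresh token. The following sketch is an assumption-laden illustration: tokenTransport, roundTripFunc, and observedAuthHeader are hypothetical names, and the fake base transport replaces the network so the example runs offline.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
)

// tokenTransport asks source for a token on every request and attaches it
// as a Bearer credential before delegating to base.
type tokenTransport struct {
	base   http.RoundTripper
	source func(ctx context.Context) (string, error)
}

func (t *tokenTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	token, err := t.source(req.Context())
	if err != nil {
		return nil, err
	}
	// RoundTrippers must not modify the caller's request, so work on a clone.
	clone := req.Clone(req.Context())
	clone.Header.Set("Authorization", "Bearer "+token)
	return t.base.RoundTrip(clone)
}

// roundTripFunc adapts a plain function to http.RoundTripper for the demo.
type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// observedAuthHeader sends one request through tokenTransport using a fake
// base transport and returns the Authorization header that transport saw.
func observedAuthHeader(token string) (string, error) {
	var seen string
	client := &http.Client{Transport: &tokenTransport{
		base: roundTripFunc(func(r *http.Request) (*http.Response, error) {
			seen = r.Header.Get("Authorization")
			return &http.Response{
				StatusCode: http.StatusOK,
				Header:     make(http.Header),
				Body:       http.NoBody,
				Request:    r,
			}, nil
		}),
		source: func(context.Context) (string, error) { return token, nil },
	}}
	resp, err := client.Get("https://example.invalid/api")
	if err != nil {
		return "", err
	}
	resp.Body.Close()
	return seen, nil
}

func main() {
	header, err := observedAuthHeader("demo-token")
	fmt.Println(header, err)
}
```

In the batch program, source could be a closure over cache.Get(ctx, cfg) and base could be http.DefaultTransport.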
Error: 429 Too Many Requests
- Cause: CXone enforces strict rate limits on export polling and import endpoints. Concurrent requests from multiple workers or rapid polling trigger throttling.
- Fix: Add exponential backoff with jitter to retry logic. The polling loop uses a fixed 5-second interval, which is safe. For upload retries, wrap the HTTP call in a retry function.
- Code Fix:
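// retryOnRateLimit retries fn on rate-limit errors. It needs math/rand for rand.Float64.
func retryOnRateLimit(ctx context.Context, fn func() error) error {
const retries = 3
for i := 0; i < retries; i++ {
err := fn()
if err == nil {
return nil
}
// Retry only when the error text carries a 429 status code.
if !strings.Contains(err.Error(), "429") {
return err
}
// Exponential backoff (1s, 2s, 4s) scaled by a random factor in [1, 2).
// Multiply as floats first: converting 1+rand.Float64() directly to
// time.Duration truncates it to 1 and discards the jitter.
backoff := time.Duration(float64(time.Second) * float64(int64(1)<<i) * (1 + rand.Float64()))
select {
case <-ctx.Done():
return ctx.Err() // stop waiting as soon as the caller cancels
case <-time.After(backoff):
}
}
return fmt.Errorf("max retries exceeded")
}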
Error: 400 Bad Request (Import Validation)
- Cause: The enriched CSV contains mismatched column counts, missing required fields (intent, utterance), or unsupported characters in entity columns.
- Fix: Validate row length consistency before upload. CXone requires the original columns to remain intact. The regex extraction appends columns, which is safe. The semicolon separators in extracted arrays do not conflict with the comma delimiter, so they need no special handling.
- Code Fix: The csv.Writer automatically quotes any field that contains a comma, a double quote, or a line break. Verify that the row.Values length matches the row.Headers length in every processed row.
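The length check described above can be sketched as a small pre-upload validation step. The helper validateRows is hypothetical, and the required column names passed to it are taken from the cause listed above rather than from any confirmed CXone schema.

```go
package main

import "fmt"

// validateRows checks that each required column name appears in headers
// (exact match) and that every row has exactly len(headers) values.
func validateRows(headers []string, rows [][]string, required ...string) error {
	present := make(map[string]bool, len(headers))
	for _, h := range headers {
		present[h] = true
	}
	for _, name := range required {
		if !present[name] {
			return fmt.Errorf("missing required column %q", name)
		}
	}
	for i, row := range rows {
		if len(row) != len(headers) {
			return fmt.Errorf("row %d has %d values, want %d", i, len(row), len(headers))
		}
	}
	return nil
}

func main() {
	headers := []string{"utterance", "intent", "ner_emails"}
	rows := [][]string{
		{"mail ana@example.com", "contact", "ana@example.com"},
		{"hello"}, // too short: would be rejected before upload
	}
	fmt.Println(validateRows(headers, rows, "intent", "utterance"))
}
```

Running a check like this before building the multipart body turns a remote 400 response into a local error that names the offending row.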
Error: Context Deadline Exceeded
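- Cause: A context deadline or an http.Client Timeout elapsed before a large export, download, or polling loop finished. Note that http.DefaultClient sets no timeout of its own, so without a deadline on the context the polling loop can run indefinitely.
- Fix: Attach a context with a reasonable timeout (e.g., 10 minutes for export, 5 minutes for download) and propagate it through all goroutines. Create the group with errgroup.WithContext so that the first failure cancels the shared context; workers still need to check ctx in order to stop early.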
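Per-phase deadlines can be derived from one parent context with context.WithTimeout. The sketch below illustrates the idea only: runPhase, waitOrCancel, and phaseTimedOut are hypothetical helpers, and the millisecond budgets are shortened stand-ins for the multi-minute values suggested above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runPhase runs fn with a deadline derived from parent, so cancelling parent
// or exceeding timeout stops the phase. The name and shape are illustrative.
func runPhase(parent context.Context, name string, timeout time.Duration, fn func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel() // always release the timer, even on success
	if err := fn(ctx); err != nil {
		return fmt.Errorf("%s phase: %w", name, err)
	}
	return nil
}

// waitOrCancel stands in for real work: it finishes after d unless ctx ends first.
func waitOrCancel(d time.Duration) func(ctx context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// phaseTimedOut reports whether work lasting workMs milliseconds exceeds a
// phase budget of budgetMs milliseconds.
func phaseTimedOut(budgetMs, workMs int) bool {
	budget := time.Duration(budgetMs) * time.Millisecond
	work := waitOrCancel(time.Duration(workMs) * time.Millisecond)
	err := runPhase(context.Background(), "export", budget, work)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(phaseTimedOut(500, 1))  // work fits inside the budget
	fmt.Println(phaseTimedOut(5, 2000)) // budget runs out first
}
```

Wrapping the error with %w keeps errors.Is(err, context.DeadlineExceeded) working, so callers can tell a timeout apart from other failures.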