Curating NICE CXone Conversational Training Data with Python

What You Will Build

A production-grade Python module that extracts bot interaction logs from NICE CXone, filters conversations by user satisfaction ratings, isolates user utterances and bot responses, removes near-duplicate phrases using cosine similarity on sentence embeddings, and exports a structured CSV dataset with metadata tags. The script uses the CXone Interactions API v2 and implements incremental synchronization via timestamp tracking. Python 3.9+ and the requests library are used throughout.

Prerequisites

  • OAuth Client Type: Confidential client (Client Credentials Grant)
  • Required Scopes: interactions:read, analytics:read
  • API Version: CXone REST API v2 (/api/v2/interactions)
  • Runtime: Python 3.9 or higher
  • Dependencies: requests, sentence-transformers, numpy, torch (required by the embedding model)
  • Installation: pip install requests sentence-transformers numpy torch

Authentication Setup

NICE CXone uses OAuth 2.0 for API authentication. The Client Credentials flow is appropriate for server-to-server data extraction. The token endpoint resides at https://{instance}.cxone.com/oauth/token. You must cache the access token and handle expiration gracefully to avoid unnecessary token requests during pagination loops.

import requests
import time
import json
from typing import Optional

class CXoneAuthClient:
    def __init__(self, instance: str, client_id: str, client_secret: str):
        self.instance = instance
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = f"https://{instance}.cxone.com/oauth/token"
        self._token: Optional[str] = None
        self._token_expiry: float = 0.0

    def get_access_token(self) -> str:
        if self._token and time.time() < self._token_expiry - 60:
            return self._token

        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "interactions:read analytics:read"
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        
        response = requests.post(self.token_url, data=payload, headers=headers)
        response.raise_for_status()
        
        token_data = response.json()
        self._token = token_data["access_token"]
        self._token_expiry = time.time() + token_data["expires_in"]
        return self._token

    def build_headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_access_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

The get_access_token method checks the cached token against its expiration timestamp. It subtracts sixty seconds as a safety buffer to prevent mid-request authentication failures. The build_headers method attaches the token and standard JSON headers to every subsequent API call.
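The caching rule described above can be isolated from the HTTP call, which makes it easier to reason about and to test. The following is a minimal sketch, not part of the module itself: TokenCache, its fetch callback, and the invalidate method are hypothetical names introduced for illustration, and the clock is injectable only so the expiry logic can be exercised without waiting.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches a bearer token and refetches it shortly before it expires."""

    def __init__(self, fetch: Callable[[], Tuple[str, float]],
                 clock: Callable[[], float] = time.time,
                 buffer_seconds: float = 60.0):
        self._fetch = fetch            # returns (access_token, expires_in_seconds)
        self._clock = clock            # injectable for testing
        self._buffer = buffer_seconds  # refresh this long before real expiry
        self._token: Optional[str] = None
        self._expiry = 0.0

    def get(self) -> str:
        # Refetch when there is no token or it is inside the safety buffer.
        if self._token is None or self._clock() >= self._expiry - self._buffer:
            token, expires_in = self._fetch()
            self._token = token
            self._expiry = self._clock() + expires_in
        return self._token

    def invalidate(self) -> None:
        """Drop the cached token so the next get() fetches a new one."""
        self._token = None
```

An explicit invalidate step is useful when the server rejects a token that the local clock still considers valid, because the expiry check alone would keep returning the same token.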

Implementation

Step 1: Initialize Client and Fetch Interactions with Pagination

The CXone Interactions API returns paginated results. You must loop through pages until the returned count reaches zero or matches the total count. The API enforces rate limits, so you must implement exponential backoff for HTTP 429 responses. The following code establishes a session, applies retry logic, and fetches interactions incrementally.

import requests
import time
from typing import List, Dict, Any, Optional

class CXoneInteractionFetcher:
    def __init__(self, auth_client: CXoneAuthClient, instance: str):
        self.auth = auth_client
        self.base_url = f"https://{instance}.cxone.com/api/v2/interactions"
        self.session = requests.Session()
        self.session.headers.update(self.auth.build_headers())

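    def _request_with_retry(self, url: str, params: dict, max_retries: int = 5) -> requests.Response:
        for attempt in range(max_retries):
            response = self.session.get(url, params=params)

            if response.status_code == 429:
                # Retry-After may be an HTTP date instead of seconds; fall back to backoff.
                try:
                    retry_after = float(response.headers.get("Retry-After", 2 ** attempt))
                except ValueError:
                    retry_after = float(2 ** attempt)
                time.sleep(retry_after)
                continue
            elif response.status_code == 401:
                # Discard the cached token first; otherwise build_headers() would
                # reuse the token the server just rejected.
                self.auth._token = None
                self.session.headers.update(self.auth.build_headers())
                continue
            elif response.status_code >= 500:
                time.sleep(2 ** attempt)
                continue

            return response

        raise RuntimeError(f"Max retries ({max_retries}) exceeded for {url}")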

    def fetch_interactions(self, last_sync_timestamp: Optional[str] = None, page_size: int = 200) -> List[Dict[str, Any]]:
        all_interactions = []
        page = 1
        params = {
            "pageSize": page_size,
            "page": page,
            "sort": "timestamp desc",
            "returnCount": "true"
        }
        
        if last_sync_timestamp:
            params["filter"] = f"timestamp gt '{last_sync_timestamp}'"
            
        while True:
            response = self._request_with_retry(self.base_url, params)
            response.raise_for_status()
            
            data = response.json()
            interactions = data.get("interactions", [])
            all_interactions.extend(interactions)
            
            total_count = int(data.get("totalCount", 0))
            if len(all_interactions) >= total_count or len(interactions) == 0:
                break
                
            page += 1
            params["page"] = page
            time.sleep(0.5)  # Polite rate limiting
            
        return all_interactions

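Expected HTTP Request (shown unencoded for readability; requests percent-encodes the spaces and quotes in the query string):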

GET /api/v2/interactions?pageSize=200&page=1&sort=timestamp desc&returnCount=true&filter=timestamp gt '2023-10-01T00:00:00.000Z' HTTP/1.1
Host: your-instance.cxone.com
Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
Accept: application/json

Expected Response Snippet:

{
  "interactions": [
    {
      "id": "int_8f3a2b1c",
      "timestamp": "2023-10-05T14:22:10.000Z",
      "botId": "bot_k9x2m",
      "rating": 5,
      "messages": [
        {"from": "user", "text": "I need to reset my password", "timestamp": "2023-10-05T14:22:10.000Z"},
        {"from": "bot", "text": "I can help with that. Please verify your email address.", "timestamp": "2023-10-05T14:22:11.000Z"}
      ]
    }
  ],
  "totalCount": 1450
}

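The filter=timestamp gt '{last_sync_timestamp}' parameter enables incremental synchronization by restricting results to interactions newer than the last sync. The returnCount=true parameter adds totalCount to the response, so the loop can stop as soon as it has collected that many records instead of requesting an extra empty page. The retry logic backs off exponentially on 429 and 5xx responses, honoring the Retry-After header when the server sends one, and rebuilds the Authorization header on 401 responses.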
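The delay calculation behind the backoff can be written as a small pure function. This is a sketch under assumptions: backoff_delay, its base and cap parameters, and the 60-second ceiling are illustrative choices rather than CXone requirements. It accepts both forms of Retry-After that HTTP allows, a number of seconds or an HTTP date.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 60.0,
                  now: Optional[datetime] = None) -> float:
    """Return the seconds to wait before retry number `attempt` (0-based)."""
    if retry_after:
        value = retry_after.strip()
        # Form 1: delay in seconds, e.g. "5".
        try:
            return min(max(float(value), 0.0), cap)
        except ValueError:
            pass
        # Form 2: HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
        try:
            when = parsedate_to_datetime(value)
        except (TypeError, ValueError):
            when = None
        if when is not None:
            if when.tzinfo is None:
                when = when.replace(tzinfo=timezone.utc)
            current = now or datetime.now(timezone.utc)
            return min(max((when - current).total_seconds(), 0.0), cap)
    # No usable header: exponential backoff, capped.
    return min(base * (2 ** attempt), cap)


print(backoff_delay(3))        # 8.0
print(backoff_delay(2, "5"))   # 5.0
```

Capping the delay keeps a misbehaving or very large Retry-After value from stalling the job indefinitely; adjust the cap to whatever your scheduler tolerates.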

Step 2: Filter by Rating and Extract Conversational Turns

Raw CXone interactions contain metadata, system events, and multi-turn exchanges. You must isolate high-quality training data by filtering on the rating field and pairing user utterances with the immediate bot response. This step constructs a flat list of conversational turns.

from typing import List, Dict, Any

def extract_turns(interactions: List[Dict[str, Any]], min_rating: int = 4) -> List[Dict[str, Any]]:
    structured_turns = []
    
    for interaction in interactions:
        rating = interaction.get("rating")
        if rating is None or rating < min_rating:
            continue
            
        messages = interaction.get("messages", [])
        user_utterance = None
        
        for msg in messages:
            sender = msg.get("from", "")
            text = (msg.get("text") or "").strip()  # tolerate a null "text" field
            
            if not text or sender not in ("user", "bot"):
                continue
                
            if sender == "user":
                user_utterance = text
            elif sender == "bot" and user_utterance:
                structured_turns.append({
                    "interaction_id": interaction.get("id"),
                    "timestamp": interaction.get("timestamp"),
                    "bot_id": interaction.get("botId"),
                    "rating": rating,
                    "user_utterance": user_utterance,
                    "bot_response": text
                })
                user_utterance = None
                
    return structured_turns

The function iterates through each interaction and skips records that lack a rating or fall below the threshold. It tracks the most recent user message and pairs it with the next bot message. Two consequences follow from this strategy: when a user sends several messages in a row, only the last one is paired, and a bot message with no preceding user message (such as an opening greeting) is dropped. The user_utterance variable resets after each successful pair so that one user message is never paired with two bot responses.
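A small self-contained example makes the pairing behavior concrete. The pair_turns helper below is a hypothetical, stripped-down version of the same loop (no rating filter, no metadata), and the sample messages are invented for illustration.

```python
from typing import Dict, List, Tuple


def pair_turns(messages: List[Dict[str, str]]) -> List[Tuple[str, str]]:
    """Pair each bot reply with the most recent unanswered user message."""
    pairs = []
    pending_user = None
    for msg in messages:
        sender = msg.get("from", "")
        text = (msg.get("text") or "").strip()
        if not text or sender not in ("user", "bot"):
            continue  # skip empty messages and system events
        if sender == "user":
            pending_user = text  # a later user message replaces an unanswered one
        elif pending_user:
            pairs.append((pending_user, text))
            pending_user = None  # each user message is paired at most once
    return pairs


sample = [
    {"from": "bot", "text": "Hi, how can I help?"},
    {"from": "user", "text": "hello"},
    {"from": "user", "text": "I need to reset my password"},
    {"from": "system", "text": "agent joined"},
    {"from": "bot", "text": "Please verify your email address."},
    {"from": "bot", "text": "Anything else?"},
]

print(pair_turns(sample))
# [('I need to reset my password', 'Please verify your email address.')]
```

The greeting, the first user message, the system event, and the trailing bot message all drop out, leaving a single clean exchange.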

Step 3: Deduplicate Utterances Using Cosine Similarity

Training datasets require distinct examples. Near-duplicate phrases waste compute resources and bias model training. You will generate sentence embeddings using a lightweight transformer model, compute pairwise cosine similarity, and filter out redundant entries.

import numpy as np
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Any

def deduplicate_turns(turns: List[Dict[str, Any]], similarity_threshold: float = 0.85, model_name: str = "all-MiniLM-L6-v2") -> List[Dict[str, Any]]:
    if not turns:
        return []
        
    user_texts = [t["user_utterance"] for t in turns]
    model = SentenceTransformer(model_name)
    embeddings = model.encode(user_texts, convert_to_numpy=True, show_progress_bar=False)
    
    normalized_embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    similarity_matrix = np.dot(normalized_embeddings, normalized_embeddings.T)
    
    keep_mask = np.ones(len(turns), dtype=bool)
    for i in range(len(turns)):
        if not keep_mask[i]:
            continue
        for j in range(i + 1, len(turns)):
            if similarity_matrix[i, j] >= similarity_threshold:
                keep_mask[j] = False
                turns[j]["similarity_score"] = round(float(similarity_matrix[i, j]), 4)
                turns[j]["dedup_group"] = i
                
    filtered_turns = [t for t, keep in zip(turns, keep_mask) if keep]
    # enumerate avoids the quadratic, equality-based lookup of list.index
    for idx, t in enumerate(filtered_turns):
        t["dedup_group"] = idx
        t["similarity_score"] = 1.0
        
    return filtered_turns

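The function loads the all-MiniLM-L6-v2 model, which balances speed and accuracy for semantic similarity. It normalizes embeddings to unit length, so the dot product of any two rows is their cosine similarity. The nested loop keeps the first occurrence of each phrase and sets keep_mask to False for any later turn whose similarity to it meets or exceeds the threshold. Each surviving turn receives a dedup_group index and a similarity_score of 1.0 for traceability. The full similarity matrix has one entry per pair of turns, so memory and run time grow quadratically with the batch size.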
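The greedy keep-first strategy can be checked on toy vectors without downloading a model. The sketch below assumes the embeddings are already computed and non-zero; greedy_dedup_indices is a hypothetical helper that returns the indices of the rows that survive.

```python
import numpy as np


def greedy_dedup_indices(embeddings: np.ndarray, threshold: float = 0.85) -> list:
    """Return indices of rows kept by first-occurrence-wins deduplication."""
    # Unit-length rows make the dot product equal to cosine similarity.
    unit = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    sims = unit @ unit.T
    keep = np.ones(len(unit), dtype=bool)
    positions = np.arange(len(unit))
    for i in range(len(unit)):
        if not keep[i]:
            continue  # row i was already dropped as a duplicate
        # Drop every later row that is too similar to the kept row i.
        keep &= ~((positions > i) & (sims[i] >= threshold))
    return np.flatnonzero(keep).tolist()


vectors = np.array([
    [1.0, 0.0],    # kept
    [0.99, 0.05],  # cosine ~0.999 with row 0, dropped
    [0.0, 1.0],    # kept
    [0.6, 0.8],    # cosine 0.6 with row 0 and 0.8 with row 2, kept
])

print(greedy_dedup_indices(vectors))
# [0, 2, 3]
```

Because the first occurrence always wins, the result depends on input order; sorting turns by rating or timestamp before deduplicating determines which variant of a phrase is retained.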

Step 4: Export Cleaned Data and Persist Sync State

The final step writes the structured records to a CSV file and saves the latest timestamp to a state file. This enables incremental synchronization on subsequent runs.

import csv
import json
from typing import List, Dict, Any, Optional

def export_and_sync_state(
    turns: List[Dict[str, Any]], 
    output_csv: str, 
    state_file: str, 
    last_sync_ts: Optional[str] = None
) -> str:
    if not turns:
        return last_sync_ts or ""
        
    fieldnames = [
        "interaction_id", "timestamp", "bot_id", "rating",
        "user_utterance", "bot_response", "similarity_score", "dedup_group"
    ]
    
    with open(output_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(turns)
        
    latest_timestamp = max(t["timestamp"] for t in turns)
    
    state_data = {
        "last_sync_timestamp": latest_timestamp,
        "records_exported": len(turns),
        "export_path": output_csv
    }
    
    with open(state_file, "w", encoding="utf-8") as f:
        json.dump(state_data, f, indent=2)
        
    return latest_timestamp

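The function writes a standard CSV with explicit column headers. It calculates the maximum timestamp from the current batch and persists it to a JSON state file; taking max over the raw strings works because ISO 8601 timestamps in the same UTC format sort lexicographically. The returned timestamp feeds directly into the filter=timestamp gt '{...}' parameter on the next execution cycle. Note that the CSV is opened in write mode, so each run replaces the previous export. Use append mode or a per-run filename if you need to accumulate incremental batches.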
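Comparing timestamps as strings is only safe while every value uses the same format and offset. If that cannot be guaranteed, one option is to compare parsed datetimes instead. This is a sketch that assumes every value carries either a Z suffix or an explicit UTC offset; parse_iso_utc and latest_timestamp are hypothetical helper names.

```python
from datetime import datetime
from typing import Iterable


def parse_iso_utc(value: str) -> datetime:
    """Parse an ISO 8601 timestamp that ends in 'Z' or an explicit offset."""
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on,
    # so rewrite it as an explicit offset for Python 3.9 and 3.10.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def latest_timestamp(values: Iterable[str]) -> str:
    """Return the original string of the chronologically latest timestamp."""
    return max(values, key=parse_iso_utc)


stamps = [
    "2023-10-05T14:22:10.000Z",
    "2023-10-05T14:22:10.500Z",
    "2023-10-05T16:00:00+02:00",  # 14:00 UTC, earlier than the others
]

print(latest_timestamp(stamps))
# 2023-10-05T14:22:10.500Z
```

A plain max(stamps) would pick the third entry because "16" sorts after "14", even though it is the earliest instant. Returning the original string rather than a reformatted one preserves the exact value the API produced.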

Complete Working Example

import requests
import time
import json
import csv
import numpy as np
from typing import List, Dict, Any, Optional
from sentence_transformers import SentenceTransformer

class CXoneAuthClient:
    def __init__(self, instance: str, client_id: str, client_secret: str):
        self.instance = instance
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = f"https://{instance}.cxone.com/oauth/token"
        self._token: Optional[str] = None
        self._token_expiry: float = 0.0

    def get_access_token(self) -> str:
        if self._token and time.time() < self._token_expiry - 60:
            return self._token
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "interactions:read analytics:read"
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        response = requests.post(self.token_url, data=payload, headers=headers)
        response.raise_for_status()
        token_data = response.json()
        self._token = token_data["access_token"]
        self._token_expiry = time.time() + token_data["expires_in"]
        return self._token

    def build_headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_access_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

class CXoneInteractionFetcher:
    def __init__(self, auth_client: CXoneAuthClient, instance: str):
        self.auth = auth_client
        self.base_url = f"https://{instance}.cxone.com/api/v2/interactions"
        self.session = requests.Session()
        self.session.headers.update(self.auth.build_headers())

    def _request_with_retry(self, url: str, params: dict, max_retries: int = 5) -> requests.Response:
        for attempt in range(max_retries):
            response = self.session.get(url, params=params)
            if response.status_code == 429:
                # Retry-After may be an HTTP date instead of seconds; fall back to backoff.
                try:
                    retry_after = float(response.headers.get("Retry-After", 2 ** attempt))
                except ValueError:
                    retry_after = float(2 ** attempt)
                time.sleep(retry_after)
                continue
            elif response.status_code == 401:
                # Discard the cached token so build_headers() requests a new one.
                self.auth._token = None
                self.session.headers.update(self.auth.build_headers())
                continue
            elif response.status_code >= 500:
                time.sleep(2 ** attempt)
                continue
            return response
        raise RuntimeError(f"Max retries ({max_retries}) exceeded for {url}")

    def fetch_interactions(self, last_sync_timestamp: Optional[str] = None, page_size: int = 200) -> List[Dict[str, Any]]:
        all_interactions = []
        page = 1
        params = {"pageSize": page_size, "page": page, "sort": "timestamp desc", "returnCount": "true"}
        if last_sync_timestamp:
            params["filter"] = f"timestamp gt '{last_sync_timestamp}'"
        while True:
            response = self._request_with_retry(self.base_url, params)
            response.raise_for_status()
            data = response.json()
            interactions = data.get("interactions", [])
            all_interactions.extend(interactions)
            total_count = int(data.get("totalCount", 0))
            if len(all_interactions) >= total_count or len(interactions) == 0:
                break
            page += 1
            params["page"] = page
            time.sleep(0.5)
        return all_interactions

def extract_turns(interactions: List[Dict[str, Any]], min_rating: int = 4) -> List[Dict[str, Any]]:
    structured_turns = []
    for interaction in interactions:
        rating = interaction.get("rating")
        if rating is None or rating < min_rating:
            continue
        messages = interaction.get("messages", [])
        user_utterance = None
        for msg in messages:
            sender = msg.get("from", "")
            text = (msg.get("text") or "").strip()  # tolerate a null "text" field
            if not text or sender not in ("user", "bot"):
                continue
            if sender == "user":
                user_utterance = text
            elif sender == "bot" and user_utterance:
                structured_turns.append({
                    "interaction_id": interaction.get("id"),
                    "timestamp": interaction.get("timestamp"),
                    "bot_id": interaction.get("botId"),
                    "rating": rating,
                    "user_utterance": user_utterance,
                    "bot_response": text
                })
                user_utterance = None
    return structured_turns

def deduplicate_turns(turns: List[Dict[str, Any]], similarity_threshold: float = 0.85) -> List[Dict[str, Any]]:
    if not turns:
        return []
    user_texts = [t["user_utterance"] for t in turns]
    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode(user_texts, convert_to_numpy=True, show_progress_bar=False)
    normalized_embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    similarity_matrix = np.dot(normalized_embeddings, normalized_embeddings.T)
    keep_mask = np.ones(len(turns), dtype=bool)
    for i in range(len(turns)):
        if not keep_mask[i]:
            continue
        for j in range(i + 1, len(turns)):
            if similarity_matrix[i, j] >= similarity_threshold:
                keep_mask[j] = False
                turns[j]["similarity_score"] = round(float(similarity_matrix[i, j]), 4)
                turns[j]["dedup_group"] = i
    filtered_turns = [t for t, keep in zip(turns, keep_mask) if keep]
    for idx, t in enumerate(filtered_turns):
        t["dedup_group"] = idx
        t["similarity_score"] = 1.0
    return filtered_turns

def export_and_sync_state(turns: List[Dict[str, Any]], output_csv: str, state_file: str) -> Optional[str]:
    if not turns:
        return None
    fieldnames = ["interaction_id", "timestamp", "bot_id", "rating", "user_utterance", "bot_response", "similarity_score", "dedup_group"]
    with open(output_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(turns)
    latest_timestamp = max(t["timestamp"] for t in turns)
    state_data = {"last_sync_timestamp": latest_timestamp, "records_exported": len(turns), "export_path": output_csv}
    with open(state_file, "w", encoding="utf-8") as f:
        json.dump(state_data, f, indent=2)
    return latest_timestamp

def load_state(state_file: str) -> Optional[str]:
    try:
        with open(state_file, "r", encoding="utf-8") as f:
            data = json.load(f)
            return data.get("last_sync_timestamp")
    except FileNotFoundError:
        return None

if __name__ == "__main__":
    INSTANCE = "your-instance"
    CLIENT_ID = "your-client-id"
    CLIENT_SECRET = "your-client-secret"
    STATE_FILE = "sync_state.json"
    OUTPUT_CSV = "cxone_training_data.csv"
    
    auth = CXoneAuthClient(INSTANCE, CLIENT_ID, CLIENT_SECRET)
    fetcher = CXoneInteractionFetcher(auth, INSTANCE)
    
    last_ts = load_state(STATE_FILE)
    print(f"Fetching interactions since: {last_ts or 'beginning of time'}")
    
    interactions = fetcher.fetch_interactions(last_sync_timestamp=last_ts)
    turns = extract_turns(interactions, min_rating=4)
    clean_turns = deduplicate_turns(turns, similarity_threshold=0.85)
    
    # Keep the previous sync point when nothing new was exported.
    new_ts = export_and_sync_state(clean_turns, OUTPUT_CSV, STATE_FILE) or last_ts
    print(f"Exported {len(clean_turns)} records. Next sync target: {new_ts}")

Common Errors & Debugging

Error: 401 Unauthorized

Cause: The OAuth token expired during a long pagination loop or the client credentials are incorrect.
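Fix: The retry logic rebuilds the Authorization header on 401 responses. This only helps if the cached token is discarded first, because get_access_token otherwise returns the same token until its recorded expiry. Verify that your client_id and client_secret match the CXone administration console exactly, and ensure the scope string includes interactions:read.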

Error: 403 Forbidden

Cause: The OAuth client lacks the required scope or the user role assigned to the client does not have read permissions on interactions.
Fix: Navigate to the CXone OAuth client configuration and add interactions:read to the allowed scopes. Verify the associated user role includes “View Interaction Logs” or equivalent analytics permissions.

Error: 429 Too Many Requests

Cause: CXone enforces strict rate limits per tenant. Pagination loops without delays can exhaust the limit quickly and trigger repeated 429 responses.
Fix: The _request_with_retry method implements exponential backoff and respects the Retry-After header. Do not remove the time.sleep(0.5) between pages. If you require higher throughput, request an API rate limit increase from NICE support.

Error: Dimension Mismatch in Cosine Similarity

Cause: Each sentence-transformers model outputs embeddings of a fixed dimension. Within a single run every vector has the same dimension, so a mismatch only arises when you combine embeddings produced by different models, for example vectors cached from an earlier run with a different model.
Fix: Normalizing along axis=1 scales each vector to unit length but does not reconcile different dimensions. Keep the model name consistent across runs, and re-encode any cached embeddings when you change models.
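A shape check before normalizing can surface a mixed-model batch early, with a clearer message than a failed matrix product. The following is a minimal sketch: safe_normalize and its expected_dim parameter are hypothetical, and the zero-vector guard is an extra precaution against division by zero, not something the code above performs.

```python
import numpy as np
from typing import Optional


def safe_normalize(embeddings: np.ndarray, expected_dim: Optional[int] = None) -> np.ndarray:
    """Validate the embedding matrix shape, then scale rows to unit length."""
    arr = np.asarray(embeddings, dtype=np.float64)
    if arr.ndim != 2:
        raise ValueError(f"expected a 2-D array, got shape {arr.shape}")
    if expected_dim is not None and arr.shape[1] != expected_dim:
        raise ValueError(f"expected dimension {expected_dim}, got {arr.shape[1]}")
    norms = np.linalg.norm(arr, axis=1, keepdims=True)
    # Zero vectors stay zero instead of turning into NaN.
    return arr / np.where(norms == 0.0, 1.0, norms)


print(safe_normalize(np.array([[3.0, 4.0], [0.0, 0.0]])))
```

For all-MiniLM-L6-v2 the expected dimension would be 384; pass whatever dimension the model you use actually produces.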

Error: Filter Syntax Rejection

Cause: CXone uses OData-style filtering. Incorrect timestamp formatting breaks the query.
Fix: Always use ISO 8601 format with Z suffix: 2023-10-05T14:22:10.000Z. The state file preserves exact API timestamps, preventing format drift during incremental syncs.
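When the starting timestamp comes from your own code rather than from the state file, formatting it explicitly avoids the drift described above. This sketch assumes the millisecond-precision, Z-suffixed format shown in the example; to_cxone_timestamp and build_timestamp_filter are hypothetical helper names.

```python
from datetime import datetime, timezone


def to_cxone_timestamp(dt: datetime) -> str:
    """Format an aware datetime as YYYY-MM-DDTHH:MM:SS.mmmZ in UTC."""
    if dt.tzinfo is None:
        raise ValueError("naive datetime: attach a timezone before formatting")
    utc = dt.astimezone(timezone.utc)
    # strftime has no millisecond directive, so derive it from microseconds.
    return utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{utc.microsecond // 1000:03d}Z"


def build_timestamp_filter(dt: datetime) -> str:
    """Build the filter expression used for incremental sync."""
    return f"timestamp gt '{to_cxone_timestamp(dt)}'"


start = datetime(2023, 10, 5, 14, 22, 10, tzinfo=timezone.utc)
print(build_timestamp_filter(start))
# timestamp gt '2023-10-05T14:22:10.000Z'
```

Rejecting naive datetimes is a deliberate choice: silently assuming local time or UTC is a common source of off-by-hours gaps in incremental syncs.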

Official References