Curating NICE CXone Conversational Training Data with Python
What You Will Build
A production-grade Python module that extracts bot interaction logs from NICE CXone, filters conversations by user satisfaction ratings, isolates user utterances and bot responses, removes near-duplicate phrases using cosine similarity on sentence embeddings, and exports a structured CSV dataset with metadata tags. The script uses the CXone Interactions API v2 and implements incremental synchronization via timestamp tracking. Python 3.9+ and the requests library are used throughout.
Prerequisites
- OAuth Client Type: Confidential client (Client Credentials Grant)
- Required Scopes: interactions:read, analytics:read
- API Version: CXone REST API v2 (/api/v2/interactions)
- Runtime: Python 3.9 or higher
- Dependencies: requests, sentence-transformers, numpy, torch (required by the embedding model)
- Installation: pip install requests sentence-transformers numpy torch
Authentication Setup
NICE CXone uses OAuth 2.0 for API authentication. The Client Credentials flow is appropriate for server-to-server data extraction. The token endpoint resides at https://{instance}.cxone.com/oauth/token. You must cache the access token and handle expiration gracefully to avoid unnecessary token requests during pagination loops.
import time
from typing import Optional

import requests


class CXoneAuthClient:
    """Fetches and caches an OAuth 2.0 client-credentials token."""

    def __init__(self, instance: str, client_id: str, client_secret: str):
        self.instance = instance
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = f"https://{instance}.cxone.com/oauth/token"
        self._token: Optional[str] = None
        self._token_expiry: float = 0.0

    def get_access_token(self) -> str:
        # Reuse the cached token until 60 seconds before it expires.
        if self._token and time.time() < self._token_expiry - 60:
            return self._token
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "interactions:read analytics:read"
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        # A timeout keeps a stalled token request from hanging the whole run.
        response = requests.post(self.token_url, data=payload, headers=headers, timeout=30)
        response.raise_for_status()
        token_data = response.json()
        self._token = token_data["access_token"]
        self._token_expiry = time.time() + token_data["expires_in"]
        return self._token

    def build_headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_access_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
The get_access_token method checks the cached token against its expiration timestamp. It subtracts sixty seconds as a safety buffer to prevent mid-request authentication failures. The build_headers method attaches the token and standard JSON headers to every subsequent API call.
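The caching rule can be checked in isolation. The sketch below restates the expiry test as a standalone function; token_is_fresh and its buffer_seconds parameter are hypothetical names introduced for illustration and are not part of any CXone SDK.

```python
import time
from typing import Optional


def token_is_fresh(token: Optional[str], expiry: float, now: Optional[float] = None,
                   buffer_seconds: float = 60.0) -> bool:
    """Return True when a cached token exists and is outside the safety buffer."""
    current = time.time() if now is None else now
    return bool(token) and current < expiry - buffer_seconds


# A token that expires at t=3600 is reused at t=3000 but refreshed at t=3550,
# because 3550 falls inside the 60-second buffer.
print(token_is_fresh("abc", expiry=3600.0, now=3000.0))  # True
print(token_is_fresh("abc", expiry=3600.0, now=3550.0))  # False
```

Passing the clock value as an argument makes the rule easy to unit test without waiting for a real token to expire.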
Implementation
Step 1: Initialize Client and Fetch Interactions with Pagination
The CXone Interactions API returns paginated results. You must loop through pages until the returned count reaches zero or matches the total count. The API enforces rate limits, so you must implement exponential backoff for HTTP 429 responses. The following code establishes a session, applies retry logic, and fetches interactions incrementally.
import time
from typing import Any, Dict, List, Optional

import requests


class CXoneInteractionFetcher:
    def __init__(self, auth_client: CXoneAuthClient, instance: str):
        self.auth = auth_client
        self.base_url = f"https://{instance}.cxone.com/api/v2/interactions"
        self.session = requests.Session()
        self.session.headers.update(self.auth.build_headers())

    def _request_with_retry(self, url: str, params: dict, max_retries: int = 5) -> requests.Response:
        for attempt in range(max_retries):
            response = self.session.get(url, params=params, timeout=30)
            if response.status_code == 429:
                # Honor Retry-After when present; otherwise back off exponentially.
                retry_after = float(response.headers.get("Retry-After", 2 ** attempt))
                time.sleep(retry_after)
                continue
            elif response.status_code == 401:
                # Drop the cached token so build_headers() requests a fresh one
                # instead of resending the token the server just rejected.
                self.auth._token = None
                self.session.headers.update(self.auth.build_headers())
                continue
            elif response.status_code >= 500:
                time.sleep(2 ** attempt)
                continue
            return response
        raise RuntimeError("Max retries exceeded for API request")

    def fetch_interactions(self, last_sync_timestamp: Optional[str] = None, page_size: int = 200) -> List[Dict[str, Any]]:
        all_interactions = []
        page = 1
        params = {
            "pageSize": page_size,
            "page": page,
            "sort": "timestamp desc",
            "returnCount": "true"
        }
        if last_sync_timestamp:
            params["filter"] = f"timestamp gt '{last_sync_timestamp}'"
        while True:
            response = self._request_with_retry(self.base_url, params)
            response.raise_for_status()
            data = response.json()
            interactions = data.get("interactions", [])
            all_interactions.extend(interactions)
            # Stop on an empty page, or once the reported total has been collected.
            # A missing totalCount must not end the loop after the first page.
            total_count = data.get("totalCount")
            if not interactions:
                break
            if total_count is not None and len(all_interactions) >= int(total_count):
                break
            page += 1
            params["page"] = page
            time.sleep(0.5)  # Polite rate limiting
        return all_interactions
Expected HTTP Request:
GET /api/v2/interactions?pageSize=200&page=1&sort=timestamp desc&returnCount=true&filter=timestamp gt '2023-10-01T00:00:00.000Z' HTTP/1.1
Host: your-instance.cxone.com
Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
Accept: application/json
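The request line above is shown unencoded for readability. On the wire, the spaces, quotes, and colons in the query string are percent-encoded, which requests handles when you pass a params dictionary. The following sketch uses only the standard library to show roughly what the encoded query looks like; the parameter names and values are copied from the example request.

```python
from urllib.parse import urlencode

params = {
    "pageSize": 200,
    "page": 1,
    "sort": "timestamp desc",
    "returnCount": "true",
    "filter": "timestamp gt '2023-10-01T00:00:00.000Z'",
}

# urlencode percent-encodes reserved characters and turns spaces into '+'.
query = urlencode(params)
print(query)
# pageSize=200&page=1&sort=timestamp+desc&returnCount=true&filter=timestamp+gt+%272023-10-01T00%3A00%3A00.000Z%27
```

Building the filter as a plain string and leaving the encoding to the HTTP library avoids double-encoding mistakes.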
Expected Response Snippet:
{
  "interactions": [
    {
      "id": "int_8f3a2b1c",
      "timestamp": "2023-10-05T14:22:10.000Z",
      "botId": "bot_k9x2m",
      "rating": 5,
      "messages": [
        {"from": "user", "text": "I need to reset my password", "timestamp": "2023-10-05T14:22:10.000Z"},
        {"from": "bot", "text": "I can help with that. Please verify your email address.", "timestamp": "2023-10-05T14:22:11.000Z"}
      ]
    }
  ],
  "totalCount": 1450
}
The filter=timestamp gt '{last_sync_timestamp}' parameter enables incremental synchronization. The returnCount=true parameter provides the total dataset size, allowing the loop to terminate as soon as every record has been collected. The retry logic backs off exponentially on 429 and 5xx responses (honoring the Retry-After header on 429) and refreshes the Authorization header on 401.
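To see what the exponential schedule looks like over a full retry cycle, the sketch below computes the waits up front. The 2 ** attempt growth mirrors the fetcher; the cap and jitter parameters are additions assumed here for illustration, and backoff_delays is a hypothetical helper name.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int = 5, base: float = 1.0, cap: float = 30.0,
                   jitter: float = 0.0, rng: Optional[random.Random] = None) -> List[float]:
    """Return the wait in seconds before each retry: base * 2**attempt, capped, plus jitter."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = min(cap, base * (2 ** attempt))
        # Random jitter (0 by default) spreads out clients that retry at the same moment.
        delays.append(delay + rng.uniform(0, jitter))
    return delays


print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

With the defaults, five failed attempts add up to 31 seconds of waiting, which is worth keeping in mind when sizing max_retries for a scheduled job.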
Step 2: Filter by Rating and Extract Conversational Turns
Raw CXone interactions contain metadata, system events, and multi-turn exchanges. You must isolate high-quality training data by filtering on the rating field and pairing user utterances with the immediate bot response. This step constructs a flat list of conversational turns.
from typing import Any, Dict, List


def extract_turns(interactions: List[Dict[str, Any]], min_rating: int = 4) -> List[Dict[str, Any]]:
    structured_turns = []
    for interaction in interactions:
        rating = interaction.get("rating")
        if rating is None or rating < min_rating:
            continue
        messages = interaction.get("messages", [])
        user_utterance = None
        for msg in messages:
            sender = msg.get("from", "")
            # "or" guards against an explicit null text value in the payload.
            text = (msg.get("text") or "").strip()
            if not text or sender not in ("user", "bot"):
                continue
            if sender == "user":
                user_utterance = text
            elif sender == "bot" and user_utterance:
                structured_turns.append({
                    "interaction_id": interaction.get("id"),
                    "timestamp": interaction.get("timestamp"),
                    "bot_id": interaction.get("botId"),
                    "rating": rating,
                    "user_utterance": user_utterance,
                    "bot_response": text
                })
                # Reset so the next bot message is not paired with the same utterance.
                user_utterance = None
    return structured_turns
The function iterates through each interaction and skips records lacking a rating or falling below the threshold. It tracks the most recent user message and pairs it with the subsequent bot message. This pairing strategy ensures the dataset contains direct conversational exchanges rather than fragmented logs. The user_utterance variable resets after each successful pair to prevent cross-turn contamination.
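The edge cases of this pairing rule are easiest to see on a small conversation. The sketch below is a condensed restatement of the same loop that returns plain (user, bot) tuples; pair_turns and the sample messages are made up for illustration.

```python
from typing import Dict, List, Tuple


def pair_turns(messages: List[Dict[str, str]]) -> List[Tuple[str, str]]:
    """Pair each bot reply with the most recent unanswered user message."""
    pairs = []
    pending_user = None
    for msg in messages:
        sender = msg.get("from", "")
        text = (msg.get("text") or "").strip()
        if not text or sender not in ("user", "bot"):
            continue
        if sender == "user":
            pending_user = text  # a later user message overwrites an unanswered one
        elif pending_user:
            pairs.append((pending_user, text))
            pending_user = None
    return pairs


sample = [
    {"from": "bot", "text": "Hello! How can I help?"},        # no user message yet: skipped
    {"from": "user", "text": "hi"},
    {"from": "user", "text": "I need to reset my password"},  # replaces "hi"
    {"from": "system", "text": "agent joined"},               # ignored sender
    {"from": "bot", "text": "Please verify your email address."},
    {"from": "bot", "text": "Anything else?"},                # nothing pending: skipped
]
print(pair_turns(sample))
# [('I need to reset my password', 'Please verify your email address.')]
```

Bot greetings, follow-up bot messages, and all but the last of several consecutive user messages are dropped, so the number of exported turns is usually well below the raw message count.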
Step 3: Deduplicate Utterances Using Cosine Similarity
Training datasets require distinct examples. Near-duplicate phrases waste compute resources and bias model training. You will generate sentence embeddings using a lightweight transformer model, compute pairwise cosine similarity, and filter out redundant entries.
from typing import Any, Dict, List

import numpy as np
from sentence_transformers import SentenceTransformer


def deduplicate_turns(turns: List[Dict[str, Any]], similarity_threshold: float = 0.85, model_name: str = "all-MiniLM-L6-v2") -> List[Dict[str, Any]]:
    if not turns:
        return []
    user_texts = [t["user_utterance"] for t in turns]
    model = SentenceTransformer(model_name)
    embeddings = model.encode(user_texts, convert_to_numpy=True, show_progress_bar=False)
    # Scale each row to unit length so the dot product equals cosine similarity.
    normalized_embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    similarity_matrix = np.dot(normalized_embeddings, normalized_embeddings.T)
    keep_mask = np.ones(len(turns), dtype=bool)
    for i in range(len(turns)):
        if not keep_mask[i]:
            continue
        for j in range(i + 1, len(turns)):
            # Skip turns already dropped so their annotation is not overwritten.
            if keep_mask[j] and similarity_matrix[i, j] >= similarity_threshold:
                keep_mask[j] = False
                turns[j]["similarity_score"] = round(float(similarity_matrix[i, j]), 4)
                turns[j]["dedup_group"] = i
    filtered_turns = [t for t, keep in zip(turns, keep_mask) if keep]
    # enumerate() replaces list.index(), which is quadratic and returns the
    # position of the first *equal* dict rather than this exact one.
    for idx, t in enumerate(filtered_turns):
        t["dedup_group"] = idx
        t["similarity_score"] = 1.0
    return filtered_turns
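The function loads the all-MiniLM-L6-v2 model, which balances speed and accuracy for semantic similarity. It normalizes embeddings to unit length, so the dot product of two rows equals their cosine similarity. The nested loop walks the turns in order and clears keep_mask for any later turn whose similarity to a retained turn meets or exceeds the threshold, so the first occurrence of each near-duplicate cluster survives. Each retained turn is tagged with a dedup_group index and a similarity_score of 1.0 for traceability. The similarity matrix holds n × n values, so memory grows quadratically with the number of turns; process very large exports in batches.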
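The greedy keep-first rule does not depend on the embedding model, so it can be illustrated with hand-written vectors. The sketch below uses pure Python and toy 2-D vectors standing in for real sentence embeddings; cosine and greedy_keep are hypothetical helper names, not library functions.

```python
import math
from typing import List, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two non-zero vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def greedy_keep(vectors: Sequence[Sequence[float]], threshold: float = 0.85) -> List[int]:
    """Return indices of vectors that are not near-duplicates of an earlier kept vector."""
    kept: List[int] = []
    for i, vec in enumerate(vectors):
        if all(cosine(vec, vectors[k]) < threshold for k in kept):
            kept.append(i)
    return kept


toy = [
    [1.0, 0.0],   # kept: first of its cluster
    [0.9, 0.1],   # cosine to index 0 is about 0.99: dropped
    [0.0, 1.0],   # orthogonal to index 0: kept
    [0.1, 0.95],  # cosine to index 2 is about 0.99: dropped
]
print(greedy_keep(toy))  # [0, 2]
```

Because the first member of each cluster always wins, the input order matters; sorting turns by rating or recency before deduplication is one way to control which example survives.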
Step 4: Export Cleaned Data and Persist Sync State
The final step writes the structured records to a CSV file and saves the latest timestamp to a state file. This enables incremental synchronization on subsequent runs.
import csv
import json
from typing import List, Dict, Any, Optional


def export_and_sync_state(
    turns: List[Dict[str, Any]],
    output_csv: str,
    state_file: str,
    last_sync_ts: Optional[str] = None
) -> str:
    if not turns:
        return last_sync_ts or ""
    fieldnames = [
        "interaction_id", "timestamp", "bot_id", "rating",
        "user_utterance", "bot_response", "similarity_score", "dedup_group"
    ]
    with open(output_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(turns)
    latest_timestamp = max(t["timestamp"] for t in turns)
    state_data = {
        "last_sync_timestamp": latest_timestamp,
        "records_exported": len(turns),
        "export_path": output_csv
    }
    with open(state_file, "w", encoding="utf-8") as f:
        json.dump(state_data, f, indent=2)
    return latest_timestamp
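The function writes a standard CSV with explicit column headers. The file is opened in write mode, so each run replaces the previous export; use a per-run filename if you need to keep earlier batches. It calculates the maximum timestamp from the current batch and persists it to a JSON state file. Because the maximum is taken over the exported turns only, interactions that were fetched but filtered out do not advance the sync point. The returned timestamp feeds directly into the filter=timestamp gt '{...}' parameter on the next execution cycle.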
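Taking max() over timestamp strings works when every value shares the same format and UTC offset, as in the example response. If that cannot be guaranteed, parsing before comparing is safer. The sketch below is one way to do it with the standard library; parse_api_timestamp and latest_timestamp are hypothetical helpers, and treating naive values as UTC is an assumption.

```python
from datetime import datetime, timezone
from typing import Iterable


def parse_api_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    # datetime.fromisoformat() rejects 'Z' before Python 3.11, so normalize it first.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    # Assumption: values without an offset are already in UTC.
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


def latest_timestamp(values: Iterable[str]) -> str:
    """Return the original string of the chronologically latest timestamp."""
    return max(values, key=parse_api_timestamp)


stamps = [
    "2023-10-05T14:22:10.000Z",
    "2023-10-05T16:00:00+02:00",  # 14:00 UTC, but sorts last as a plain string
    "2023-10-05T14:30:00.000Z",
]
print(latest_timestamp(stamps))  # 2023-10-05T14:30:00.000Z
```

Returning the original string rather than a reformatted one keeps the state file byte-for-byte identical to what the API produced.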
Complete Working Example
import requests
import time
import json
import csv
import numpy as np
from typing import List, Dict, Any, Optional
from sentence_transformers import SentenceTransformer


class CXoneAuthClient:
    def __init__(self, instance: str, client_id: str, client_secret: str):
        self.instance = instance
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = f"https://{instance}.cxone.com/oauth/token"
        self._token: Optional[str] = None
        self._token_expiry: float = 0.0

    def get_access_token(self) -> str:
        # Reuse the cached token until 60 seconds before it expires.
        if self._token and time.time() < self._token_expiry - 60:
            return self._token
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "interactions:read analytics:read"
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        response = requests.post(self.token_url, data=payload, headers=headers, timeout=30)
        response.raise_for_status()
        token_data = response.json()
        self._token = token_data["access_token"]
        self._token_expiry = time.time() + token_data["expires_in"]
        return self._token

    def build_headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_access_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }


class CXoneInteractionFetcher:
    def __init__(self, auth_client: CXoneAuthClient, instance: str):
        self.auth = auth_client
        self.base_url = f"https://{instance}.cxone.com/api/v2/interactions"
        self.session = requests.Session()
        self.session.headers.update(self.auth.build_headers())

    def _request_with_retry(self, url: str, params: dict, max_retries: int = 5) -> requests.Response:
        for attempt in range(max_retries):
            response = self.session.get(url, params=params, timeout=30)
            if response.status_code == 429:
                retry_after = float(response.headers.get("Retry-After", 2 ** attempt))
                time.sleep(retry_after)
                continue
            elif response.status_code == 401:
                # Drop the cached token so build_headers() requests a fresh one.
                self.auth._token = None
                self.session.headers.update(self.auth.build_headers())
                continue
            elif response.status_code >= 500:
                time.sleep(2 ** attempt)
                continue
            return response
        raise RuntimeError("Max retries exceeded for API request")

    def fetch_interactions(self, last_sync_timestamp: Optional[str] = None, page_size: int = 200) -> List[Dict[str, Any]]:
        all_interactions = []
        page = 1
        params = {"pageSize": page_size, "page": page, "sort": "timestamp desc", "returnCount": "true"}
        if last_sync_timestamp:
            params["filter"] = f"timestamp gt '{last_sync_timestamp}'"
        while True:
            response = self._request_with_retry(self.base_url, params)
            response.raise_for_status()
            data = response.json()
            interactions = data.get("interactions", [])
            all_interactions.extend(interactions)
            # Stop on an empty page, or once the reported total has been collected.
            total_count = data.get("totalCount")
            if not interactions:
                break
            if total_count is not None and len(all_interactions) >= int(total_count):
                break
            page += 1
            params["page"] = page
            time.sleep(0.5)
        return all_interactions


def extract_turns(interactions: List[Dict[str, Any]], min_rating: int = 4) -> List[Dict[str, Any]]:
    structured_turns = []
    for interaction in interactions:
        rating = interaction.get("rating")
        if rating is None or rating < min_rating:
            continue
        messages = interaction.get("messages", [])
        user_utterance = None
        for msg in messages:
            sender = msg.get("from", "")
            # "or" guards against an explicit null text value in the payload.
            text = (msg.get("text") or "").strip()
            if not text or sender not in ("user", "bot"):
                continue
            if sender == "user":
                user_utterance = text
            elif sender == "bot" and user_utterance:
                structured_turns.append({
                    "interaction_id": interaction.get("id"),
                    "timestamp": interaction.get("timestamp"),
                    "bot_id": interaction.get("botId"),
                    "rating": rating,
                    "user_utterance": user_utterance,
                    "bot_response": text
                })
                user_utterance = None
    return structured_turns


def deduplicate_turns(turns: List[Dict[str, Any]], similarity_threshold: float = 0.85) -> List[Dict[str, Any]]:
    if not turns:
        return []
    user_texts = [t["user_utterance"] for t in turns]
    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode(user_texts, convert_to_numpy=True, show_progress_bar=False)
    # Unit-length rows make the dot product equal to cosine similarity.
    normalized_embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    similarity_matrix = np.dot(normalized_embeddings, normalized_embeddings.T)
    keep_mask = np.ones(len(turns), dtype=bool)
    for i in range(len(turns)):
        if not keep_mask[i]:
            continue
        for j in range(i + 1, len(turns)):
            # Skip turns already dropped so their annotation is not overwritten.
            if keep_mask[j] and similarity_matrix[i, j] >= similarity_threshold:
                keep_mask[j] = False
                turns[j]["similarity_score"] = round(float(similarity_matrix[i, j]), 4)
                turns[j]["dedup_group"] = i
    filtered_turns = [t for t, keep in zip(turns, keep_mask) if keep]
    for idx, t in enumerate(filtered_turns):
        t["dedup_group"] = idx
        t["similarity_score"] = 1.0
    return filtered_turns


def export_and_sync_state(turns: List[Dict[str, Any]], output_csv: str, state_file: str) -> Optional[str]:
    if not turns:
        return None
    fieldnames = ["interaction_id", "timestamp", "bot_id", "rating", "user_utterance", "bot_response", "similarity_score", "dedup_group"]
    with open(output_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(turns)
    latest_timestamp = max(t["timestamp"] for t in turns)
    state_data = {"last_sync_timestamp": latest_timestamp, "records_exported": len(turns), "export_path": output_csv}
    with open(state_file, "w", encoding="utf-8") as f:
        json.dump(state_data, f, indent=2)
    return latest_timestamp


def load_state(state_file: str) -> Optional[str]:
    try:
        with open(state_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        return data.get("last_sync_timestamp")
    except FileNotFoundError:
        return None


if __name__ == "__main__":
    INSTANCE = "your-instance"
    CLIENT_ID = "your-client-id"
    CLIENT_SECRET = "your-client-secret"
    STATE_FILE = "sync_state.json"
    OUTPUT_CSV = "cxone_training_data.csv"

    auth = CXoneAuthClient(INSTANCE, CLIENT_ID, CLIENT_SECRET)
    fetcher = CXoneInteractionFetcher(auth, INSTANCE)
    last_ts = load_state(STATE_FILE)
    print(f"Fetching interactions since: {last_ts or 'beginning of time'}")
    interactions = fetcher.fetch_interactions(last_sync_timestamp=last_ts)
    turns = extract_turns(interactions, min_rating=4)
    clean_turns = deduplicate_turns(turns, similarity_threshold=0.85)
    new_ts = export_and_sync_state(clean_turns, OUTPUT_CSV, STATE_FILE)
    print(f"Exported {len(clean_turns)} records. Next sync target: {new_ts}")
Common Errors & Debugging
Error: 401 Unauthorized
Cause: The OAuth token expired during a long pagination loop or the client credentials are incorrect.
Fix: The retry logic automatically refreshes headers on 401 responses. Verify your client_id and client_secret match the CXone administration console exactly. Ensure the scope string includes interactions:read.
Error: 403 Forbidden
Cause: The OAuth client lacks the required scope or the user role assigned to the client does not have read permissions on interactions.
Fix: Navigate to the CXone OAuth client configuration and add interactions:read to the allowed scopes. Verify the associated user role includes “View Interaction Logs” or equivalent analytics permissions.
Error: 429 Too Many Requests
Cause: CXone enforces rate limits per tenant. Tight pagination loops without delays can exhaust the quota and keep triggering 429 responses.
Fix: The _request_with_retry method implements exponential backoff and respects the Retry-After header. Do not remove the time.sleep(0.5) between pages. If you require higher throughput, request an API rate limit increase from NICE support.
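HTTP allows Retry-After to carry either a number of seconds or an HTTP-date, and float() fails on the date form. Whether CXone ever sends the date form is not confirmed here, so the sketch below is a defensive parser under that assumption; retry_after_seconds is a hypothetical helper name.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(header_value: Optional[str], fallback: float,
                        now: Optional[datetime] = None) -> float:
    """Convert a Retry-After header (delta-seconds or HTTP-date) into a wait in seconds."""
    if not header_value:
        return fallback
    try:
        return max(0.0, float(header_value))
    except ValueError:
        pass  # not a number, so try the HTTP-date form next
    try:
        target = parsedate_to_datetime(header_value)
    except (TypeError, ValueError):
        return fallback  # unparseable header: use the caller's backoff value
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)
    current = now or datetime.now(timezone.utc)
    return max(0.0, (target - current).total_seconds())


print(retry_after_seconds("120", fallback=2.0))  # 120.0
print(retry_after_seconds(None, fallback=2.0))   # 2.0
```

Inside the retry loop, the fallback argument would be the exponential value 2 ** attempt, so a missing or malformed header degrades to the existing behavior.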
Error: Dimension Mismatch in Cosine Similarity
Cause: Each sentence-transformers model outputs embeddings of a fixed dimension (384 for all-MiniLM-L6-v2). A shape error in np.dot arises only when you compare vectors produced by different models, for example embeddings cached from an earlier run with another model.
Fix: Normalization along axis=1 makes each vector unit length but does not change its dimension, so it cannot repair a mismatch. Encode every utterance in a comparison with the same model, and discard or recompute cached embeddings whenever you change the model name.
Error: Filter Syntax Rejection
Cause: CXone uses OData-style filtering. Incorrect timestamp formatting breaks the query.
Fix: Always use ISO 8601 format with Z suffix: 2023-10-05T14:22:10.000Z. The state file preserves exact API timestamps, preventing format drift during incremental syncs.
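When the starting timestamp comes from your own code rather than the state file (for example, a first run that should only look back a fixed window), it has to be rendered in that same format. The sketch below is one way to produce it with the standard library; to_api_timestamp is a hypothetical helper name.

```python
from datetime import datetime, timezone


def to_api_timestamp(moment: datetime) -> str:
    """Format a datetime as ISO 8601 UTC with millisecond precision and a 'Z' suffix."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    utc = moment.astimezone(timezone.utc)
    # strftime has no millisecond directive, so append the truncated microseconds by hand.
    return utc.strftime("%Y-%m-%dT%H:%M:%S") + f".{utc.microsecond // 1000:03d}Z"


print(to_api_timestamp(datetime(2023, 10, 5, 14, 22, 10, tzinfo=timezone.utc)))
# 2023-10-05T14:22:10.000Z
```

Rejecting naive datetimes forces the caller to state the timezone explicitly, which avoids silently shifting the sync window by the local UTC offset.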