Generating NICE CXone Agent Assist Knowledge Snippets with Python
What You Will Build
- This service listens to live CXone interactions via WebSocket, extracts key phrases using a lightweight transformer model, queries a local vector database for semantically similar knowledge base resolutions, ranks results using a hybrid BM25 and cosine similarity score, and injects formatted assist cards into the agent desktop.
- The implementation relies on the NICE CXone Interactions WebSocket, Assist API, and Knowledge API.
- The code is written in Python 3.10+ using websockets, sentence-transformers, chromadb, rank_bm25, and requests.
Prerequisites
- CXone OAuth 2.0 Client Credentials grant configured with a valid client ID and client secret
- Required scopes: interactions:read, assist:write, knowledge:read
- CXone API version: v2
- Python 3.10+ runtime
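- External dependencies: websockets>=14.0 (the additional_headers keyword used in Step 1 belongs to the asyncio client that became the default in 14.0), requests>=2.31.0, sentence-transformers>=2.2.0, chromadb>=0.4.0, rank-bm25>=0.2.2, scikit-learn>=1.3.0, transformers>=4.35.0, torch>=2.1.0, tenacity>=8.2.0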
Authentication Setup
CXone uses OAuth 2.0 Client Credentials for machine-to-machine authentication. You must request a token from the tenant-specific OAuth endpoint and cache it until expiration. The token must include the required scopes for interactions, assist, and knowledge operations.
import time
import requests
from typing import Optional

class CXoneAuth:
    def __init__(self, tenant: str, client_id: str, client_secret: str):
        self.tenant = tenant
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = f"https://{tenant}.cxone.com/oauth/token"
        self.access_token: Optional[str] = None
        self.expires_at: float = 0.0

    def get_token(self) -> str:
        # Reuse the cached token while it is still valid
        if self.access_token and time.time() < self.expires_at:
            return self.access_token
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "interactions:read assist:write knowledge:read"
        }
        response = requests.post(self.token_url, data=payload, timeout=10)
        response.raise_for_status()
        data = response.json()
        self.access_token = data["access_token"]
        # Refresh 300 seconds before the reported expiry
        self.expires_at = time.time() + (data.get("expires_in", 3600) - 300)
        return self.access_token

    def get_headers(self) -> dict:
        return {"Authorization": f"Bearer {self.get_token()}", "Content-Type": "application/json"}
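The caching rule in get_token can be exercised without network access. The following is a minimal sketch, assuming a hypothetical fetch callable and an injectable clock (neither is part of CXone or of the class above). It also clamps the safety margin so that a short-lived token never gets an expiry in the past.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches a token until shortly before it expires.

    `fetch` is a hypothetical callable returning (token, expires_in_seconds);
    `clock` is injectable so the behaviour can be tested without waiting.
    """

    def __init__(self, fetch: Callable[[], Tuple[str, float]],
                 margin: float = 300.0,
                 clock: Callable[[], float] = time.time):
        self._fetch = fetch
        self._margin = margin
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is not None and now < self._expires_at:
            return self._token
        token, expires_in = self._fetch()
        # Keep at least half the lifetime so the margin cannot push expiry into the past.
        lifetime = max(expires_in - self._margin, expires_in / 2)
        self._token = token
        self._expires_at = now + lifetime
        return token
```

With a 3600-second token and the default margin, the cached value is reused for 3300 seconds and then fetched again.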
Implementation
Step 1: Subscribe to Active Interaction WebSockets
CXone streams real-time interaction events through a WebSocket endpoint. You must authenticate the connection using the Bearer token in the header. The service filters for active interactions and extracts the interaction ID and transcript data.
import asyncio
import json
import websockets
from typing import Callable, Dict, Any

async def stream_interactions(auth: CXoneAuth, on_interaction: Callable[[Dict[str, Any]], None]):
    uri = f"wss://{auth.tenant}.cxone.com/api/v2/interactions/events"
    while True:
        try:
            # Build the headers on every (re)connect so an expired token is refreshed
            headers = {"Authorization": f"Bearer {auth.get_token()}", "Accept": "application/json"}
            # additional_headers is the keyword in websockets 14+; the legacy client used extra_headers
            async with websockets.connect(uri, additional_headers=headers, ping_interval=20, ping_timeout=10) as ws:
                print("WebSocket connected to CXone interactions stream")
                async for message in ws:
                    event = json.loads(message)
                    if event.get("type") == "interaction" and event.get("data", {}).get("state") == "active":
                        interaction_data = event["data"]
                        if "channels" in interaction_data:
                            # Extract transcript from voice or chat channel
                            transcript = ""
                            for channel in interaction_data["channels"]:
                                if "transcript" in channel:
                                    transcript += channel["transcript"] + " "
                            if transcript.strip():
                                on_interaction({
                                    "interaction_id": interaction_data["id"],
                                    "transcript": transcript.strip()
                                })
        except websockets.exceptions.ConnectionClosed as e:
            print(f"WebSocket disconnected: {e}. Reconnecting in 5 seconds...")
            await asyncio.sleep(5)
        except Exception as e:
            print(f"WebSocket error: {e}. Reconnecting in 5 seconds...")
            await asyncio.sleep(5)
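The event filtering inside the loop is easier to test when it lives in a pure function. The sketch below assumes the same event shape that stream_interactions reads (type, data.state, data.id, data.channels[].transcript); that shape is taken from the example above, not from a published schema, and extract_transcript is a hypothetical helper name.

```python
from typing import Any, Dict, Optional


def extract_transcript(event: Dict[str, Any]) -> Optional[Dict[str, str]]:
    """Return {"interaction_id", "transcript"} for an active interaction, else None.

    The field names mirror those read in stream_interactions and are an
    assumption about the payload.
    """
    if event.get("type") != "interaction":
        return None
    data = event.get("data") or {}
    if data.get("state") != "active" or "id" not in data:
        return None
    parts = [
        channel["transcript"].strip()
        for channel in (data.get("channels") or [])
        if isinstance(channel.get("transcript"), str) and channel["transcript"].strip()
    ]
    if not parts:
        return None
    return {"interaction_id": data["id"], "transcript": " ".join(parts)}
```

Inside the receive loop this would reduce the body to parsing the message, calling extract_transcript, and invoking on_interaction when the result is not None.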
Step 2: Extract Key Phrases Using a Lightweight Transformer
A lightweight transformer model extracts key phrases by analyzing attention weights across tokens. You load the model once, pass the transcript through the encoder, and select tokens with the highest attention scores as key phrases. This approach avoids heavy NLP pipelines while maintaining semantic relevance.
import torch
from transformers import AutoTokenizer, AutoModel
from typing import List

class KeyPhraseExtractor:
    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        self.model.eval()

    def extract(self, text: str, top_k: int = 5) -> List[str]:
        inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        with torch.no_grad():
            outputs = self.model(**inputs, output_attentions=True)
        # outputs.attentions holds one (batch, heads, seq, seq) tensor per layer.
        # Average across layers and heads to get a single (seq, seq) matrix.
        attention = torch.stack(outputs.attentions).mean(dim=(0, 2)).squeeze(0)
        # Score each token by the average attention it receives from all positions
        token_scores = attention.mean(dim=0).tolist()
        token_ids = inputs["input_ids"].squeeze(0).tolist()
        # Pair tokens with scores, ignoring padding and special tokens
        special_ids = set(self.tokenizer.all_special_ids)
        scores = [(tid, s) for tid, s in zip(token_ids, token_scores) if tid not in special_ids]
        # Sort by attention score and decode tokens
        scores.sort(key=lambda x: x[1], reverse=True)
        key_phrases = [self.tokenizer.decode([tid]).strip() for tid, _ in scores]
        # Remove duplicates while preserving order, then keep the top_k
        return list(dict.fromkeys(key_phrases))[:top_k]
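Because the extractor decodes individual token IDs, a WordPiece tokenizer can return fragments such as "##ing" instead of whole words. The sketch below merges continuation pieces back into words and sums their scores. It assumes the "##" continuation prefix used by BERT-style tokenizers; merge_wordpieces and top_words are hypothetical helper names.

```python
from typing import List, Tuple


def merge_wordpieces(tokens: List[str], scores: List[float]) -> List[Tuple[str, float]]:
    """Merge "##"-prefixed continuation pieces into the preceding word.

    The merged word receives the sum of its pieces' scores.
    """
    merged: List[Tuple[str, float]] = []
    for token, score in zip(tokens, scores):
        if token.startswith("##") and merged:
            word, total = merged[-1]
            merged[-1] = (word + token[2:], total + score)
        else:
            merged.append((token, score))
    return merged


def top_words(tokens: List[str], scores: List[float], top_k: int = 5) -> List[str]:
    """Return the top_k distinct merged words, highest score first."""
    ranked = sorted(merge_wordpieces(tokens, scores), key=lambda p: p[1], reverse=True)
    return list(dict.fromkeys(word for word, _ in ranked))[:top_k]
```

The token strings could come from the tokenizer's convert_ids_to_tokens method applied to the input IDs, with special tokens filtered out before merging.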
Step 3: Query Vector Database and Rank with Hybrid BM25 and Cosine
You store knowledge base articles in a vector database with precomputed embeddings. When a query arrives, you retrieve candidates using cosine similarity, compute BM25 scores against the extracted key phrases, and combine both scores using a weighted hybrid formula. This balances semantic relevance with lexical precision.
import chromadb
from rank_bm25 import BM25Okapi
from sentence_transformers import SentenceTransformer
from typing import List, Dict

class HybridRanker:
    def __init__(self, chroma_path: str = "./cxone_kb_cache"):
        # PersistentClient stores the collection on disk at chroma_path
        self.chroma = chromadb.PersistentClient(path=chroma_path)
        # Use cosine distance so that 1 - distance is a cosine similarity
        # (Chroma defaults to squared L2 distance)
        self.collection = self.chroma.get_or_create_collection(
            "kb_articles", metadata={"hnsw:space": "cosine"}
        )
        self.embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

    def query(self, key_phrases: List[str], transcript: str, top_k: int = 5) -> List[Dict]:
        count = self.collection.count()
        if count == 0:
            return []
        query_embedding = self.embedder.encode([transcript])
        # Retrieve candidates by cosine similarity
        results = self.collection.query(
            query_embeddings=query_embedding.tolist(),
            n_results=min(20, count),
            include=["documents", "metadatas", "distances"]
        )
        if not results["documents"][0]:
            return []
        documents = results["documents"][0]
        distances = results["distances"][0]
        metadatas = results["metadatas"][0]
        # Convert distances to cosine similarity (1 - distance)
        cosine_scores = [1.0 - d for d in distances]
        # Compute BM25 scores against key phrases (lowercased so both sides match)
        tokenized_docs = [doc.lower().split() for doc in documents]
        tokenized_query = [phrase.lower() for phrase in key_phrases]
        bm25 = BM25Okapi(tokenized_docs)
        bm25_scores = bm25.get_scores(tokenized_query)
        # Normalize BM25 scores to 0-1 range
        max_bm25 = max(bm25_scores) if max(bm25_scores) > 0 else 1.0
        normalized_bm25 = [s / max_bm25 for s in bm25_scores]
        # Hybrid scoring: 60% cosine, 40% BM25
        hybrid_scores = [0.6 * c + 0.4 * b for c, b in zip(cosine_scores, normalized_bm25)]
        # Rank and format results
        ranked = sorted(zip(documents, metadatas, hybrid_scores), key=lambda x: x[2], reverse=True)
        return [
            {"title": m.get("title", "Unknown"), "content": doc, "score": round(score, 4)}
            for doc, m, score in ranked[:top_k]
        ]
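The weighted formula can be isolated into a small function to see how the two signals interact. This is a sketch of the 60/40 blend described above with max-normalized BM25; hybrid_scores and the cosine_weight parameter are illustrative names, not part of any library.

```python
from typing import List, Sequence


def hybrid_scores(cosine: Sequence[float], bm25: Sequence[float],
                  cosine_weight: float = 0.6) -> List[float]:
    """Blend cosine similarities with max-normalized BM25 scores."""
    if len(cosine) != len(bm25):
        raise ValueError("cosine and bm25 must have the same length")
    top = max(bm25, default=0.0)
    scale = top if top > 0 else 1.0  # avoid dividing by zero when nothing matches
    return [
        cosine_weight * c + (1.0 - cosine_weight) * (b / scale)
        for c, b in zip(cosine, bm25)
    ]
```

For cosine scores [0.8, 0.5] and raw BM25 scores [0.0, 2.0], the second document overtakes the first (about 0.70 versus 0.48), which is the lexical correction the hybrid score is meant to provide.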
Step 4: Inject Formatted Cards via the Assist API
CXone accepts assist messages through a POST request to the interaction-specific endpoint. You format the ranked results into card objects and send them with the interaction ID. The API returns a 200 status on success. You must handle 429 responses with exponential backoff.
import requests
from typing import List, Dict
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception

def _is_retryable(exc: BaseException) -> bool:
    # Retry on rate limiting (429) and server errors; 401/403 will not succeed on retry
    if not isinstance(exc, requests.exceptions.HTTPError) or exc.response is None:
        return False
    return exc.response.status_code == 429 or exc.response.status_code >= 500

class AssistInjector:
    def __init__(self, auth: CXoneAuth):
        self.auth = auth

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10),
           retry=retry_if_exception(_is_retryable), reraise=True)
    def send_cards(self, interaction_id: str, results: List[Dict]) -> bool:
        base_url = f"https://{self.auth.tenant}.cxone.com/api/v2/interactions/{interaction_id}/assist"
        cards = []
        for i, item in enumerate(results, 1):
            cards.append({
                "type": "card",
                "content": {
                    "title": f"KB Result {i}",
                    "body": f"**{item['title']}**\n{item['content'][:200]}...",
                    "score": item["score"]
                }
            })
        payload = {"messages": cards}
        headers = self.auth.get_headers()
        response = requests.post(base_url, json=payload, headers=headers, timeout=15)
        response.raise_for_status()
        return response.status_code == 200
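Separating payload construction from the HTTP call makes the card format testable on its own. The sketch below builds the same {"messages": [...]} structure that send_cards posts; the card schema is copied from the example above and remains an assumption rather than a documented contract, and build_cards is a hypothetical helper. It appends an ellipsis only when the content was actually truncated.

```python
from typing import Any, Dict, List


def build_cards(results: List[Dict[str, Any]], max_body: int = 200) -> Dict[str, Any]:
    """Build the {"messages": [...]} payload used by send_cards.

    The card schema is taken from the example above and is an assumption.
    """
    cards = []
    for position, item in enumerate(results, start=1):
        content = item["content"]
        # Only append an ellipsis when the text was actually truncated.
        if len(content) <= max_body:
            snippet = content
        else:
            snippet = content[:max_body].rstrip() + "..."
        cards.append({
            "type": "card",
            "content": {
                "title": f"KB Result {position}",
                "body": f"**{item['title']}**\n{snippet}",
                "score": item["score"],
            },
        })
    return {"messages": cards}
```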
Step 5: Implement Cache Invalidation on Knowledge Base Updates
Knowledge base articles change frequently. You poll the CXone Knowledge API with a conditional If-Modified-Since header to detect modifications. A 304 response means nothing has changed; when the server returns a 200 or 206 status, you invalidate the local vector database and rebuild the cache with fresh embeddings.
import time
import threading
import requests
from typing import Optional

class KBCacheManager:
    def __init__(self, auth: CXoneAuth, ranker: HybridRanker, poll_interval: int = 300):
        self.auth = auth
        self.ranker = ranker
        self.poll_interval = poll_interval
        self.last_modified: Optional[str] = None
        # Daemon thread so polling never blocks interpreter shutdown
        self.thread = threading.Thread(target=self._poll_loop, daemon=True)
        self.thread.start()

    def _poll_loop(self):
        while True:
            try:
                self._check_for_updates()
            except Exception as e:
                print(f"KB cache polling error: {e}")
            time.sleep(self.poll_interval)

    def _check_for_updates(self):
        url = f"https://{self.auth.tenant}.cxone.com/api/v2/knowledge/articles"
        headers = self.auth.get_headers()
        if self.last_modified:
            headers["If-Modified-Since"] = self.last_modified
        response = requests.get(url, headers=headers, timeout=15)
        if response.status_code in (200, 206):
            self.last_modified = response.headers.get("Last-Modified")
            articles = response.json().get("items", [])
            self._rebuild_cache(articles)
            print(f"KB cache invalidated and rebuilt with {len(articles)} articles")
        elif response.status_code == 304:
            pass  # No changes
        else:
            response.raise_for_status()

    def _rebuild_cache(self, articles: list):
        # Chroma's delete needs ids or a filter, so look up the stored ids first
        existing_ids = self.ranker.collection.get(include=[])["ids"]
        if existing_ids:
            self.ranker.collection.delete(ids=existing_ids)
        if not articles:
            return
        texts = [a.get("content", "") for a in articles]
        embeddings = self.ranker.embedder.encode(texts, show_progress_bar=False)
        metadatas = [{"title": a.get("title", ""), "id": a.get("id", "")} for a in articles]
        ids = [f"kb_{i}" for i in range(len(articles))]
        self.ranker.collection.add(
            ids=ids,
            documents=texts,
            embeddings=embeddings.tolist(),
            metadatas=metadatas
        )
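If the endpoint does not honor If-Modified-Since, every poll returns 200 and triggers a full re-embedding. One possible safeguard, sketched here as an assumption rather than something the Knowledge API requires, is to fingerprint the article list and rebuild only when the digest changes. The function name articles_fingerprint is hypothetical, and the id, title, and content keys follow the fields read in _rebuild_cache.

```python
import hashlib
import json
from typing import Any, Dict, List


def articles_fingerprint(articles: List[Dict[str, Any]]) -> str:
    """Return a stable SHA-256 digest of article ids, titles, and content.

    Articles are sorted first, so the digest does not depend on response order.
    """
    canonical = sorted(
        (str(a.get("id") or ""), str(a.get("title") or ""), str(a.get("content") or ""))
        for a in articles
    )
    payload = json.dumps(canonical, ensure_ascii=False, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

The polling code could store the last digest and skip _rebuild_cache when a new response produces the same value.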
Complete Working Example
The following script combines all components into a single runnable service. Replace the placeholder credentials with your CXone tenant values.
import asyncio
import sys
import time
from typing import Dict, Any

# Import all classes from previous sections
# (In production, structure these across modules)

def handle_interaction(interaction: Dict[str, Any], extractor: KeyPhraseExtractor,
                       ranker: HybridRanker, injector: AssistInjector):
    print(f"Processing interaction {interaction['interaction_id']}")
    key_phrases = extractor.extract(interaction["transcript"], top_k=5)
    print(f"Key phrases: {key_phrases}")
    results = ranker.query(key_phrases, interaction["transcript"], top_k=3)
    if results:
        success = injector.send_cards(interaction["interaction_id"], results)
        print(f"Assist cards injected: {success}")

def main():
    tenant = "your-tenant"
    client_id = "your-client-id"
    client_secret = "your-client-secret"
    auth = CXoneAuth(tenant, client_id, client_secret)
    extractor = KeyPhraseExtractor()
    ranker = HybridRanker()
    injector = AssistInjector(auth)
    # Constructing the manager starts the background polling thread
    cache_manager = KBCacheManager(auth, ranker, poll_interval=600)
    # Give the first polling cycle a moment to populate an empty cache
    if ranker.collection.count() == 0:
        print("Initial KB cache is empty. Waiting for first polling cycle...")
        time.sleep(5)

    async def run():
        await stream_interactions(auth, lambda i: handle_interaction(i, extractor, ranker, injector))

    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        print("Service shutting down.")
        sys.exit(0)

if __name__ == "__main__":
    main()
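The handler above is synchronous: model inference and the HTTP call run on the event loop's thread, which can delay WebSocket keep-alive pings under load. A possible mitigation, sketched here with the standard library's asyncio.to_thread, is to hand each interaction to a worker thread. BackgroundDispatcher is a hypothetical helper, not part of the service as written.

```python
import asyncio
from typing import Any, Callable, Dict, Set


class BackgroundDispatcher:
    """Runs a blocking handler in worker threads without blocking the event loop."""

    def __init__(self, handler: Callable[[Dict[str, Any]], None]):
        self._handler = handler
        self._tasks: Set[asyncio.Task] = set()

    def submit(self, interaction: Dict[str, Any]) -> None:
        # Must be called from code running inside the event loop.
        task = asyncio.get_running_loop().create_task(
            asyncio.to_thread(self._handler, interaction)
        )
        self._tasks.add(task)  # keep a reference so the task is not garbage collected
        task.add_done_callback(self._tasks.discard)

    async def drain(self) -> None:
        """Wait for all in-flight handlers to finish."""
        if self._tasks:
            await asyncio.gather(*list(self._tasks), return_exceptions=True)
```

In this arrangement the dispatcher's submit method would be passed to stream_interactions as the on_interaction callback, wrapping the existing handle_interaction call. Handlers then run concurrently, so any shared state they touch needs to be thread-safe.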
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token has expired or the client credentials are invalid.
- Fix: Verify the client ID and secret match a registered CXone OAuth client. Ensure the token refresh logic checks expires_at before each request. The CXoneAuth class handles automatic refresh.
Error: 403 Forbidden
- Cause: The OAuth token lacks required scopes.
- Fix: Update the scope parameter in the token request to include interactions:read assist:write knowledge:read. Regenerate the token after modifying the client configuration in the CXone admin console.
Error: 429 Too Many Requests
- Cause: The Assist API enforces rate limits per tenant. Rapid interaction volumes can trigger throttling.
- Fix: Implement exponential backoff. The @retry decorator in AssistInjector handles this automatically. Increase the multiplier value if your tenant has stricter limits.
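When a throttled response carries a Retry-After header, waiting for the advertised interval is usually kinder to the API than a fixed schedule. Whether CXone sends this header is an assumption. The sketch below uses it when it holds a whole number of seconds and falls back to capped exponential backoff otherwise; retry_delay and its parameters are hypothetical names.

```python
from typing import Mapping


def retry_delay(headers: Mapping[str, str], attempt: int,
                base: float = 2.0, cap: float = 30.0) -> float:
    """Seconds to wait before retry number `attempt` (1-based).

    Uses Retry-After when it is a whole number of seconds; the HTTP-date form
    is not parsed here and falls back to exponential backoff.
    """
    value = headers.get("Retry-After", "").strip()
    if value.isdigit():
        return min(float(value), cap)
    return min(base * (2 ** (attempt - 1)), cap)
```

The headers mapping from a requests response is case-insensitive, so response.headers can be passed directly; a plain dict must use the exact key "Retry-After".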
Error: WebSocket Connection Closed Unexpectedly
- Cause: Network instability or token expiration during long-lived connections.
- Fix: The stream_interactions function includes a reconnection loop with ping/pong keep-alives. Ensure your firewall allows outbound WebSocket traffic on port 443.
Error: Dimensionality Mismatch in ChromaDB
- Cause: The vector database expects embeddings of a specific dimension (384 for all-MiniLM-L6-v2).
- Fix: Ensure the same transformer model generates embeddings during cache building and querying. Delete the ./cxone_kb_cache directory and restart if dimensions conflict.