Streaming NICE Cognigy.AI Agent Assist Insights with Python
What You Will Build
- A Python service that connects to the CXone Agent Assist WebSocket, buffers transcript fragments, queries the Knowledge Base API, formats prioritized assist cards, pushes them to the agent desktop, and logs acceptance feedback for model training.
- This implementation uses the NICE CXone v2 REST APIs, the Agent Assist WebSocket streaming endpoint, and the `httpx` and `websockets` libraries.
- The tutorial covers Python 3.9+ with `asyncio`, `httpx`, `websockets`, and `pydantic` for type-safe payload handling.
Prerequisites
- OAuth Client Credentials (Confidential client type)
- Required scopes: `agentassist:read`, `knowledge:read`, `agentassist:write`, `ai:training:write`
- NICE CXone API v2
- Python 3.9 or higher
- External dependencies: `websockets>=11.0`, `httpx>=0.24.0`, `pydantic>=2.0`
- Network access to `api.niceincontact.com` on port 443 (used for both HTTPS and WSS)
Authentication Setup
NICE CXone uses OAuth 2.0 Client Credentials flow for server-to-server integrations. The token endpoint issues a JWT that expires after one hour. Production services must cache the token and refresh it before expiration.
```python
from datetime import datetime, timedelta, timezone
from typing import Optional

import httpx


class OAuthManager:
    def __init__(self, tenant_url: str, client_id: str, client_secret: str):
        self.tenant_url = tenant_url.rstrip("/")
        self.client_id = client_id
        self.client_secret = client_secret
        self.token: Optional[str] = None
        self.expires_at: Optional[datetime] = None

    async def get_token(self) -> str:
        # Serve the cached token unless it expires within the next five minutes
        now = datetime.now(timezone.utc)
        if self.token and self.expires_at and now < self.expires_at - timedelta(minutes=5):
            return self.token
        url = f"{self.tenant_url}/oauth/token"
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                url,
                data={  # form-encoded client credentials grant
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret
                }
            )
            response.raise_for_status()
            payload = response.json()
        self.token = payload["access_token"]
        # expires_in (seconds) from the OAuth response sets the cache window
        self.expires_at = datetime.now(timezone.utc) + timedelta(seconds=payload["expires_in"])
        return self.token
```
The get_token method checks the cache first. If the token expires within five minutes, it fetches a new one. The expires_in field from the OAuth response determines the cache window. This prevents unnecessary POST requests and keeps the WebSocket handshake authenticated.
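If several coroutines call `get_token` at the same moment while the cached token is stale, each one issues its own POST to the token endpoint. A minimal sketch of a single-flight variant guards the refresh with an `asyncio.Lock` so only one request goes out and the other callers reuse its result. `SingleFlightTokenCache` and the injected `fetch` callable are hypothetical names: `fetch` stands in for the HTTP call above so the caching logic can be exercised without a network. The lock is created lazily inside the running event loop, which avoids loop-binding problems on Python 3.9.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Tuple

# Hypothetical token fetcher: returns (access_token, expires_in_seconds)
TokenFetcher = Callable[[], Awaitable[Tuple[str, int]]]


class SingleFlightTokenCache:
    def __init__(self, fetch: TokenFetcher, refresh_margin: float = 300.0):
        self._fetch = fetch
        self._margin = refresh_margin            # refresh this many seconds early
        self._token: Optional[str] = None
        self._expires_at = 0.0                   # deadline on the time.monotonic() clock
        self._lock: Optional[asyncio.Lock] = None

    def _valid(self) -> bool:
        return self._token is not None and time.monotonic() < self._expires_at - self._margin

    async def get_token(self) -> str:
        if self._valid():
            return self._token  # fast path: no locking while the token is fresh
        if self._lock is None:
            self._lock = asyncio.Lock()  # created inside the running loop
        async with self._lock:
            # Re-check: another coroutine may have refreshed while we waited
            if not self._valid():
                token, expires_in = await self._fetch()
                self._token = token
                self._expires_at = time.monotonic() + expires_in
            return self._token
```

Using `time.monotonic()` for the deadline also makes the cache immune to wall-clock adjustments on the host.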
Implementation
Step 1: WebSocket Connection & Message Routing
The Agent Assist WebSocket streams real-time events. Authentication is passed via the access_token query parameter. The connection must handle reconnection logic and route messages by event type.
```python
import asyncio
import json
import logging
from typing import Any, Callable, Dict

import websockets

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AgentAssistStream:
    # OAuthManager is the class from the Authentication Setup section
    def __init__(self, tenant_url: str, oauth: OAuthManager,
                 on_message: Callable[[str, Dict[str, Any]], None]):
        self.base_url = tenant_url.rstrip("/")
        self.oauth = oauth
        self.on_message = on_message
        self.ws_url = "wss://api.niceincontact.com/cxoneapi/v2/agentassist/stream"

    async def connect(self) -> None:
        while True:
            try:
                # Fetch the token on every attempt so a reconnect never reuses an expired one
                token = await self.oauth.get_token()
                uri = f"{self.ws_url}?access_token={token}"
                async with websockets.connect(uri, ping_interval=20, ping_timeout=10) as ws:
                    logger.info("WebSocket connected to Agent Assist stream")
                    async for message in ws:
                        payload = json.loads(message)
                        # Route by event type; the callback owns the business logic
                        self.on_message(payload.get("type", "UNKNOWN"), payload)
            except websockets.exceptions.ConnectionClosed as e:
                logger.warning(f"WebSocket disconnected: {e}. Reconnecting in 5s...")
                await asyncio.sleep(5)
            except Exception as e:
                logger.error(f"Unexpected error in WebSocket loop: {e}")
                await asyncio.sleep(10)
```
The connect method runs an infinite loop. If the connection drops because of network jitter or server rotation, it waits five seconds before reconnecting; any other exception triggers a ten-second pause. The on_message callback receives the event type and the parsed JSON. This separation keeps the transport layer decoupled from business logic.
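A fixed reconnect delay is simple, but when many service instances lose their connections at once (for example during a server rotation) they all reconnect in lockstep. A common alternative is capped exponential backoff with full jitter. The sketch below is a generic helper, not part of any CXone SDK; `backoff_delays` is a hypothetical name, and the `base` and `cap` values are assumptions to tune.

```python
import random
from typing import Iterator, Optional


def backoff_delays(base: float = 1.0, cap: float = 60.0,
                   rng: Optional[random.Random] = None) -> Iterator[float]:
    """Yield reconnect delays using capped exponential backoff with full jitter."""
    if rng is None:
        rng = random.Random()
    attempt = 0
    while True:
        ceiling = min(cap, base * (2 ** attempt))  # 1, 2, 4, ... seconds, capped at `cap`
        yield rng.uniform(0.0, ceiling)            # full jitter: any delay in [0, ceiling]
        attempt = min(attempt + 1, 30)             # stop growing the exponent far past the cap
```

In `connect()`, create the generator once, call `await asyncio.sleep(next(delays))` in each `except` branch, and recreate it after a successful handshake so the next outage starts again from the base delay.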
Step 2: Fragment Buffering & Dialog State Processing
Transcript fragments arrive rapidly during live conversations. Sending each fragment to the Knowledge Base API causes rate limiting and irrelevant results. A time-based buffer aggregates fragments until a natural pause occurs.
```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class TranscriptBuffer:
    fragments: List[str] = field(default_factory=list)
    first_fragment_time: float = 0.0   # arrival time of the oldest buffered fragment
    last_fragment_time: float = 0.0    # arrival time of the newest buffered fragment
    buffer_duration: float = 3.0       # silence that counts as a natural pause
    max_buffer_seconds: float = 10.0   # hard cap on how long text may accumulate

    def add_fragment(self, text: str, timestamp: float) -> None:
        if not self.fragments:
            self.first_fragment_time = timestamp
        self.fragments.append(text)
        self.last_fragment_time = timestamp

    def _drain(self) -> str:
        combined = " ".join(self.fragments).strip()
        self.fragments.clear()
        return combined

    def check_flush(self, current_time: float) -> Optional[str]:
        if not self.fragments:
            return None
        # Natural pause: no new fragment for buffer_duration seconds
        if current_time - self.last_fragment_time >= self.buffer_duration:
            return self._drain()
        # Continuous speech: flush anyway once the oldest fragment is too old
        if current_time - self.first_fragment_time >= self.max_buffer_seconds:
            return self._drain()
        return None
```
The buffer records when the oldest and newest fragments arrived. If three seconds pass without a new fragment, it flushes the accumulated text. If the speaker never pauses that long, the ten-second cap, measured from the oldest buffered fragment, forces a flush so queries stay timely and the buffer cannot grow without bound. The DIALOG_STATE event from the WebSocket typically signals turn boundaries. When state equals AGENT_SILENT or CUSTOMER_SILENT, the service should flush the buffer immediately instead of waiting for the pause timer.
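The buffer only flushes when something calls its flush check, so a pause is detected only if the service polls it. One way to drive it is a small background task that checks the buffer on a fixed interval. This is a sketch: `flush_ticker` is a hypothetical helper, and it takes the flush check and the downstream handler as callables so it works with any buffer exposing a `check_flush(current_time)` method stamped with `time.time()`.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


async def flush_ticker(
    check_flush: Callable[[float], Optional[str]],
    on_flush: Callable[[str], Awaitable[None]],
    stop: asyncio.Event,
    interval: float = 0.5,
) -> None:
    """Poll a transcript buffer every `interval` seconds until `stop` is set."""
    while not stop.is_set():
        text = check_flush(time.time())  # same clock the fragments are stamped with
        if text:
            await on_flush(text)
        try:
            # Sleep for one interval, but wake immediately if stop is set
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
```

Start it with `asyncio.create_task(flush_ticker(buffer.check_flush, handler, stop))`. Keep `interval` well below `buffer_duration` so a pause is detected promptly.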
Step 3: Knowledge Base Query & Assist Card Formatting
The Knowledge Base API returns search results with relevance scores. Pagination is supported via page and pageSize. The response must be transformed into assist cards with priority rankings and source citations.
```python
import asyncio
import logging
from typing import Any, Dict, List

import httpx

logger = logging.getLogger(__name__)

KB_SEARCH_URL = "https://api.niceincontact.com/api/v2/knowledge/articles/search"


async def request_with_retry(client: httpx.AsyncClient, method: str, url: str,
                             max_retries: int = 3, **kwargs: Any) -> httpx.Response:
    """Send a request, retrying on 429 with Retry-After or exponential backoff."""
    for attempt in range(max_retries):
        response = await client.request(method, url, **kwargs)
        if response.status_code != 429 or attempt == max_retries - 1:
            response.raise_for_status()  # a final 429 (or any 4xx/5xx) raises HTTPStatusError
            return response
        try:
            retry_after = float(response.headers.get("Retry-After", 2 ** attempt))
        except ValueError:  # non-numeric Retry-After: fall back to backoff
            retry_after = 2 ** attempt
        logger.warning(f"Rate limited (429). Retrying in {retry_after}s...")
        await asyncio.sleep(retry_after)
    raise ValueError("max_retries must be at least 1")


async def query_knowledge_base(client: httpx.AsyncClient, query_text: str) -> List[Dict[str, Any]]:
    params = {"q": query_text, "language": "en-US", "page": 1, "pageSize": 5}
    response = await request_with_retry(client, "GET", KB_SEARCH_URL, params=params)
    cards = []
    # Results arrive sorted by relevance, so list position becomes priority (1 = best)
    for idx, article in enumerate(response.json().get("entities", [])):
        cards.append({
            "id": article["id"],
            "title": article["title"],
            "priority": idx + 1,
            "score": article.get("score", 0.0),
            "citation": article.get("url", ""),
            "snippet": (article.get("body") or "")[:150],  # tolerate missing or null bodies
        })
    return cards
```
The request_with_retry helper handles 429 responses for any HTTP method, and the search call uses it. The CXone rate limiter returns a Retry-After header; the code respects that header or falls back to exponential backoff. The Knowledge Base search returns up to five articles per page. Priority is assigned by index because the API sorts by relevance score descending. Each card includes a citation URL for agent verification.
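HTTP allows `Retry-After` to carry either a number of seconds or an HTTP-date (RFC 9110). A minimal sketch that handles both forms, caps the wait, and falls back to exponential backoff when the header is missing or unparseable follows. `retry_delay` is a hypothetical helper, and the `base` and `cap` defaults are assumptions.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_delay(header_value: Optional[str], attempt: int, base: float = 1.0,
                cap: float = 30.0, now: Optional[datetime] = None) -> float:
    """Seconds to wait before retrying a 429, honoring Retry-After when present."""
    backoff = min(cap, base * (2 ** attempt))  # fallback: 1, 2, 4, ... seconds
    if header_value is None:
        return backoff
    value = header_value.strip()
    if value.isdigit():  # delay-seconds form, e.g. "5"
        return min(cap, float(value))
    try:  # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):  # unparseable header (error type differs by Python version)
        return backoff
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    if now is None:
        now = datetime.now(timezone.utc)
    return max(0.0, min(cap, (when - now).total_seconds()))
```

Inside a retry loop it would be used as `await asyncio.sleep(retry_delay(response.headers.get("Retry-After"), attempt))`.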
Step 4: Push to Desktop & Track Acceptance
Assist cards must be pushed to the agent desktop using the Assist API. The WebSocket also emits acceptance events. These events are logged and forwarded to the AI training endpoint.
```python
import asyncio
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List

import httpx

logger = logging.getLogger(__name__)


# OAuth Scope: agentassist:write
async def push_assist_insights(client: httpx.AsyncClient, insights: List[Dict[str, Any]]) -> None:
    url = "https://api.niceincontact.com/api/v2/agentassist/insights"
    payload = {
        "insights": insights,
        "displayType": "CARD",   # how the Agent Desktop renders each insight
        "autoDismiss": True
    }
    for attempt in range(3):
        response = await client.post(url, json=payload)
        if response.status_code == 429:
            await asyncio.sleep(2 ** attempt)  # back off 1 s, 2 s, 4 s
            continue
        response.raise_for_status()
        logger.info(f"Pushed {len(insights)} assist insights to desktop")
        return
    raise httpx.HTTPStatusError("Failed to push insights after retries",
                                request=response.request, response=response)


# OAuth Scope: ai:training:write
async def track_insight_acceptance(client: httpx.AsyncClient, event: Dict[str, Any]) -> None:
    training_url = "https://api.niceincontact.com/api/v2/ai/training/feedback"
    feedback_payload = {
        "interactionId": event.get("interactionId"),
        "insightId": event.get("insightId"),
        "action": event.get("action"),  # accept or reject, as reported by the WebSocket event
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    response = await client.post(training_url, json=feedback_payload)
    if response.status_code in (200, 201, 204):
        logger.info(f"Training feedback recorded: {feedback_payload['action']}")
    else:
        logger.error(f"Failed to log training feedback: {response.status_code} {response.text}")
```
The Assist API accepts a list of insights. The displayType field controls how the CXone Agent Desktop renders the payload. The training feedback endpoint accepts individual acceptance or rejection events. These events close the feedback loop for model training, so the model can adjust future ranking weights based on agent behavior.
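The feedback function forwards the event fields as-is, so a malformed event produces a payload with null IDs. A small builder can validate the event before anything is sent and stamp it with an explicit UTC offset. `build_feedback_payload` is a hypothetical helper, and treating all three fields as mandatory is an assumption; check the training endpoint's reference before relying on it.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# Assumption: the training endpoint needs all three fields to be non-empty
REQUIRED_FIELDS = ("interactionId", "insightId", "action")


def build_feedback_payload(event: Dict[str, Any],
                           now: Optional[datetime] = None) -> Optional[Dict[str, Any]]:
    """Return a feedback payload, or None if the event lacks a required field."""
    if any(not event.get(name) for name in REQUIRED_FIELDS):
        return None  # skip malformed events instead of POSTing nulls
    if now is None:
        now = datetime.now(timezone.utc)
    payload = {name: event[name] for name in REQUIRED_FIELDS}
    # ISO 8601 with an explicit UTC offset, e.g. 2024-05-01T12:00:00+00:00
    payload["timestamp"] = now.astimezone(timezone.utc).isoformat()
    return payload
```

Extra keys on the event are dropped, so only the fields the feedback call already sends reach the endpoint.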
Complete Working Example
The following script integrates all components. Replace the placeholder credentials before execution.
```python
import asyncio
import json
import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import httpx
import websockets

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

API_BASE = "https://api.niceincontact.com"
WS_URL = "wss://api.niceincontact.com/cxoneapi/v2/agentassist/stream"


class OAuthManager:
    def __init__(self, tenant_url: str, client_id: str, client_secret: str):
        self.tenant_url = tenant_url.rstrip("/")
        self.client_id = client_id
        self.client_secret = client_secret
        self.token: Optional[str] = None
        self.expires_at: Optional[datetime] = None

    async def get_token(self) -> str:
        # Reuse the cached token until five minutes before it expires
        now = datetime.now(timezone.utc)
        if self.token and self.expires_at and now < self.expires_at - timedelta(minutes=5):
            return self.token
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(f"{self.tenant_url}/oauth/token", data={
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret
            })
            response.raise_for_status()
            payload = response.json()
        self.token = payload["access_token"]
        self.expires_at = datetime.now(timezone.utc) + timedelta(seconds=payload["expires_in"])
        return self.token


class TranscriptBuffer:
    def __init__(self, buffer_duration: float = 3.0, max_duration: float = 10.0):
        self.fragments: List[str] = []
        self.first_time: float = 0.0
        self.last_time: float = 0.0
        self.buffer_duration = buffer_duration
        self.max_duration = max_duration

    def add(self, text: str, ts: float) -> None:
        if not self.fragments:
            self.first_time = ts
        self.fragments.append(text)
        self.last_time = ts

    def flush(self, current_ts: float, force: bool = False) -> Optional[str]:
        # Flush on a pause, on the hard cap, or unconditionally at a turn boundary
        if not self.fragments:
            return None
        paused = current_ts - self.last_time >= self.buffer_duration
        too_old = current_ts - self.first_time >= self.max_duration
        if force or paused or too_old:
            combined = " ".join(self.fragments).strip()
            self.fragments.clear()
            return combined
        return None


async def request_with_retry(client: httpx.AsyncClient, method: str, url: str,
                             max_retries: int = 3, **kwargs: Any) -> httpx.Response:
    # Retry on 429, honoring Retry-After (seconds) or falling back to exponential backoff
    for attempt in range(max_retries):
        resp = await client.request(method, url, **kwargs)
        if resp.status_code != 429 or attempt == max_retries - 1:
            resp.raise_for_status()
            return resp
        try:
            delay = float(resp.headers.get("Retry-After", 2 ** attempt))
        except ValueError:
            delay = 2 ** attempt
        logger.warning(f"Rate limited (429) on {url}. Retrying in {delay}s...")
        await asyncio.sleep(delay)
    raise ValueError("max_retries must be at least 1")


async def push_assist_insights(client: httpx.AsyncClient, insights: List[Dict[str, Any]]) -> None:
    payload = {"insights": insights, "displayType": "CARD", "autoDismiss": True}
    await request_with_retry(client, "POST", "/api/v2/agentassist/insights", json=payload)
    logger.info(f"Pushed {len(insights)} assist insights to desktop")


async def process_query(client: httpx.AsyncClient, query: str, interaction_id: str) -> None:
    logger.info(f"Querying KB for: {query[:50]}...")
    params = {"q": query, "language": "en-US", "page": 1, "pageSize": 5}
    resp = await request_with_retry(client, "GET", "/api/v2/knowledge/articles/search", params=params)
    insights = []
    for i, art in enumerate(resp.json().get("entities", [])):
        insights.append({
            "id": art["id"],
            "title": art["title"],
            "priority": i + 1,
            "score": art.get("score", 0.0),
            "citation": art.get("url", ""),
            "snippet": (art.get("body") or "")[:150],
            "interactionId": interaction_id
        })
    if insights:
        await push_assist_insights(client, insights)


async def track_insight_acceptance(client: httpx.AsyncClient, event: Dict[str, Any]) -> None:
    feedback = {
        "interactionId": event.get("interactionId"),
        "insightId": event.get("insightId"),
        "action": event.get("action"),
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    try:
        await request_with_retry(client, "POST", "/api/v2/ai/training/feedback", json=feedback)
        logger.info(f"Feedback logged: {feedback['action']}")
    except httpx.HTTPError as e:
        logger.error(f"Feedback log failed: {e}")


async def run_assist_service() -> None:
    oauth = OAuthManager(API_BASE, "YOUR_CLIENT_ID", "YOUR_CLIENT_SECRET")
    buffer = TranscriptBuffer()

    async def add_auth(request: httpx.Request) -> None:
        # Attach a current (cached) bearer token to every outgoing REST call
        request.headers["Authorization"] = f"Bearer {await oauth.get_token()}"

    async with httpx.AsyncClient(base_url=API_BASE, timeout=10.0,
                                 event_hooks={"request": [add_auth]}) as http_client:

        async def handle_message(event_type: str, payload: Dict[str, Any]) -> None:
            timestamp = time.time()
            if event_type == "TRANSCRIPT_FRAGMENT":
                buffer.add(payload.get("text", ""), timestamp)
            elif event_type == "DIALOG_STATE":
                if payload.get("state", "") in ("AGENT_SILENT", "CUSTOMER_SILENT", "TURN_END"):
                    query = buffer.flush(timestamp, force=True)  # turn boundary: flush now
                    if query:
                        try:
                            await process_query(http_client, query, payload.get("interactionId", ""))
                        except httpx.HTTPError as e:
                            logger.error(f"KB query or push failed: {e}")
            elif event_type in ("INSIGHT_ACCEPTED", "INSIGHT_REJECTED"):
                await track_insight_acceptance(http_client, payload)

        while True:
            try:
                token = await oauth.get_token()  # refresh before every (re)connect
                async with websockets.connect(f"{WS_URL}?access_token={token}",
                                              ping_interval=20, ping_timeout=10) as ws:
                    logger.info("Agent Assist stream active")
                    async for msg in ws:
                        data = json.loads(msg)
                        await handle_message(data.get("type", ""), data)
            except (websockets.exceptions.WebSocketException, OSError) as e:
                logger.warning(f"Stream error: {e}. Reconnecting in 5s...")
                await asyncio.sleep(5)


if __name__ == "__main__":
    asyncio.run(run_assist_service())
```
The script initializes OAuth, creates an HTTP client, and starts the WebSocket listener. The handle_message router dispatches each event to the buffer, the query processor, or the feedback logger. The process_query function fetches articles, assigns priority rankings by their position in the search results, and pushes them to the desktop.
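In this script, handle_message awaits the Knowledge Base query and the desktop push before the next WebSocket message is read, so one slow search delays every event behind it. One way to keep the reader responsive is to hand work to a bounded `asyncio.Queue` processed by background workers. The sketch is generic; `start_workers` and the zero-argument job shape are hypothetical, not CXone APIs.

```python
import asyncio
import logging
from typing import Awaitable, Callable, List

logger = logging.getLogger(__name__)

Job = Callable[[], Awaitable[None]]  # a zero-argument coroutine factory


def start_workers(queue: "asyncio.Queue[Job]", count: int = 2) -> List["asyncio.Task[None]"]:
    """Start `count` tasks that run queued jobs until they are cancelled."""
    async def worker(name: int) -> None:
        while True:
            job = await queue.get()
            try:
                await job()
            except Exception:  # one failed job must not kill the worker
                logger.exception("worker %d: job failed", name)
            finally:
                queue.task_done()

    return [asyncio.create_task(worker(i)) for i in range(count)]
```

In the reader, enqueue with `queue.put_nowait(functools.partial(process_query, http_client, query, interaction_id))`; on a bounded queue this raises `asyncio.QueueFull` when workers fall behind, which gives a natural place to drop stale queries.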
Common Errors & Debugging
Error: 401 Unauthorized on WebSocket Handshake
- What causes it: The access token expired, or the client lacks the `agentassist:read` scope. CXone validates the token at connection time.
- How to fix it: Refresh the token before initiating the WebSocket connection. Verify that the client credentials grant includes `agentassist:read`.
- Code showing the fix: The `OAuthManager` caches tokens and refreshes them five minutes before expiration. Call `await oauth.get_token()` immediately before `websockets.connect()`.
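When a handshake returns 401, it helps to confirm what the token actually contains. A JWT's payload segment is base64url-encoded JSON (RFC 7519), so the standard `exp` claim can be read locally. This sketch does not verify the signature and is for debugging only. `decode_jwt_claims` and `diagnose_token` are hypothetical helpers, and the `scope` claim name (space-delimited) is an assumption, since identity providers differ.

```python
import base64
import json
import time
from typing import Any, Dict


def decode_jwt_claims(token: str) -> Dict[str, Any]:
    """Decode a JWT payload WITHOUT verifying it (debugging only)."""
    payload_b64 = token.split(".")[1]
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)  # restore stripped padding
    return json.loads(base64.urlsafe_b64decode(padded))


def diagnose_token(token: str, required_scope: str = "agentassist:read") -> Dict[str, Any]:
    claims = decode_jwt_claims(token)
    scopes = str(claims.get("scope", "")).split()  # assumed: space-delimited "scope" claim
    exp = claims.get("exp")                        # standard expiry, seconds since the epoch
    return {
        "expired": exp is not None and time.time() >= exp,
        "has_scope": required_scope in scopes,
        "seconds_left": None if exp is None else int(exp - time.time()),
    }
```

Log only the diagnosis dictionary, never the token itself.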
Error: 429 Too Many Requests on Knowledge Base Search
- What causes it: The buffer flushes too frequently, or multiple interactions trigger parallel searches. CXone enforces per-tenant rate limits on search endpoints.
- How to fix it: Increase `buffer_duration` to reduce query volume, and implement exponential backoff on 429 responses.
- Code showing the fix: The HTTP retry helper checks for status 429, honors the `Retry-After` header, and sleeps before retrying; `push_assist_insights` retries with exponential backoff.
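Increasing `buffer_duration` reduces volume per interaction, but it does not stop many concurrent interactions from searching at once. A shared `asyncio.Semaphore` caps how many searches are in flight across the whole service. `ConcurrencyLimiter` is a hypothetical wrapper, and the limit used in any deployment is an assumption; CXone's actual per-tenant limits are not given here.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


class ConcurrencyLimiter:
    """Allow at most `limit` wrapped calls to run at the same time."""

    def __init__(self, limit: int):
        self._limit = limit
        self._sem: Optional[asyncio.Semaphore] = None  # created lazily inside the loop

    async def run(self, fn: Callable[[], Awaitable[T]]) -> T:
        if self._sem is None:
            self._sem = asyncio.Semaphore(self._limit)
        async with self._sem:  # waits here while `limit` calls are already running
            return await fn()
```

Share one instance across all interactions, for example `await limiter.run(lambda: query_knowledge_base(client, text))`, so the cap applies tenant-wide rather than per conversation.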
Error: WebSocket ConnectionClosed 1006 Abnormal Closure
- What causes it: Network instability, firewall dropping idle connections, or server-side rotation.
- How to fix it: Enable ping/pong keep-alives in the WebSocket client. Wrap the connection in a reconnect loop.
- Code showing the fix: `websockets.connect(uri, ping_interval=20, ping_timeout=10)` maintains liveness. The `while True` loop in `AgentAssistStream.connect()` catches `ConnectionClosed` and retries after a delay.
Error: 403 Forbidden on Assist API POST
- What causes it: The OAuth token lacks the `agentassist:write` scope, or the interaction ID does not belong to the authenticated client.
- How to fix it: Add `agentassist:write` to the client scope configuration, and ensure the `interactionId` matches an active session in CXone.
- Code showing the fix: Update the client configuration in the CXone Admin Portal under Security > OAuth Clients. In the complete example, each insight carries the exact `interactionId` received from the WebSocket payload before `push_assist_insights` sends it.