Implementing Seamless Bot-to-Agent Handoff in NICE CXone Web Messaging with Python
What You Will Build
- The code establishes a persistent WebSocket connection to stream NICE CXone interaction events, detects bot handoff triggers, fetches conversation history, updates interaction context, and executes a queue transfer.
- This uses the CXone Interaction API, Messaging API, and real-time WebSocket event stream.
- The implementation uses Python 3.10+ with httpx and websockets.
Prerequisites
- OAuth client type: Client Credentials flow
- Required scopes: interactions:read, interactions:write, messaging:read, context:write
- API version: CXone REST API v2, Interaction WebSocket v1
- Runtime: Python 3.10 or higher
- External dependencies: httpx>=0.25.0, websockets>=12.0, pydantic>=2.0
Authentication Setup
CXone server-to-server integrations require OAuth 2.0 Client Credentials. The following class handles token acquisition and implements a simple TTL-based cache to avoid unnecessary credential exchanges.
import httpx
import time
import asyncio
from typing import Optional

CXONE_BASE_URL = "https://platform.niceincontact.com"
CXONE_OAUTH_URL = f"{CXONE_BASE_URL}/oauth/token"

class CXoneAuthManager:
    def __init__(self, client_id: str, client_secret: str, scopes: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0

    async def get_token(self, client: httpx.AsyncClient) -> str:
        if self.access_token and time.time() < self.token_expiry - 60:
            return self.access_token
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": self.scopes
        }
        response = await client.post(CXONE_OAUTH_URL, data=payload)
        response.raise_for_status()
        data = response.json()
        self.access_token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"]
        return self.access_token
The get_token method reuses the cached token until sixty seconds before its expiry and only then requests a new one. The buffer reduces the chance that a token expires while a request is in flight. The scope string is space-delimited; this tutorial uses interactions:read interactions:write messaging:read context:write.
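When several coroutines request a token at the same moment, a TTL check alone lets each of them trigger its own credential exchange. The following is a minimal sketch of one way to serialize refreshes with an asyncio.Lock. The class name LockedTokenCache, the TokenFetcher callable, and the invalidate method are hypothetical and not part of the tutorial code; the fetcher is assumed to return the token together with its lifetime in seconds.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Tuple

# Hypothetical fetcher type: returns (access_token, expires_in_seconds).
TokenFetcher = Callable[[], Awaitable[Tuple[str, float]]]


class LockedTokenCache:
    """TTL token cache that lets only one coroutine refresh at a time."""

    def __init__(self, fetch: TokenFetcher, safety_buffer: float = 60.0):
        self._fetch = fetch
        self._buffer = safety_buffer
        self._token: Optional[str] = None
        self._expiry = 0.0
        self._lock = asyncio.Lock()

    def _is_fresh(self) -> bool:
        return self._token is not None and time.time() < self._expiry - self._buffer

    async def get(self) -> str:
        if self._is_fresh():
            return self._token
        async with self._lock:
            # Re-check: another coroutine may have refreshed while we waited.
            if not self._is_fresh():
                token, expires_in = await self._fetch()
                self._token = token
                self._expiry = time.time() + expires_in
            return self._token

    def invalidate(self) -> None:
        """Drop the cached token, for example after the API answers 401."""
        self._token = None
        self._expiry = 0.0
```

The second freshness check inside the lock is what keeps waiting coroutines from repeating the exchange once the first one has finished.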
Implementation
Step 1: WebSocket Client & Reconnection Logic
CXone streams interaction state changes via a WebSocket endpoint. The connection requires an initial authentication message followed by a subscription to the interaction topic. Network instability requires exponential backoff with jitter to prevent thundering herd scenarios.
import websockets
import json
import random
import logging
from typing import Awaitable, Callable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CXoneInteractionStream:
    def __init__(self, auth_manager: CXoneAuthManager, on_event: Callable[[dict], Awaitable[None]]):
        self.auth_manager = auth_manager
        self.on_event = on_event
        self.ws_url = "wss://platform.niceincontact.com/interaction/v1/events"
        self.backoff_base = 2.0
        self.max_backoff = 60.0

    async def connect_and_subscribe(self, client: httpx.AsyncClient):
        async for ws in websockets.connect(self.ws_url, ping_interval=20, ping_timeout=10):
            try:
                # Fetch the token inside the loop so every reconnect authenticates with a valid token
                token = await self.auth_manager.get_token(client)
                # Authenticate connection
                auth_msg = json.dumps({"action": "authenticate", "token": token})
                await ws.send(auth_msg)
                await ws.recv()  # Expect success acknowledgment
                # Subscribe to interaction events
                sub_msg = json.dumps({"action": "subscribe", "topic": "interactions"})
                await ws.send(sub_msg)
                logger.info("Subscribed to CXone interaction stream.")
                # Listen for events
                async for raw_message in ws:
                    event = json.loads(raw_message)
                    await self.on_event(event)
            except websockets.exceptions.ConnectionClosed as e:
                logger.warning(f"WebSocket closed: {e.code} {e.reason}. Reconnecting...")
            except Exception as e:
                logger.error(f"Unexpected error in WebSocket loop: {e}")
            # Randomized delay before the next connection attempt
            delay = min(self.backoff_base ** random.randint(1, 3), self.max_backoff)
            await asyncio.sleep(delay)
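The websockets library handles ping/pong keep-alives automatically. After each dropped connection, the code sleeps for 2, 4, or 8 seconds (backoff_base raised to a random exponent between 1 and 3, capped at max_backoff) so that many clients do not reconnect at the same instant. The delay does not grow with consecutive failures. The on_event callback processes incoming interaction state updates.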
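If the delay should also grow with the number of consecutive failures, one common variant is "full jitter" backoff. The sketch below is an illustration only, not the tutorial's method: backoff_delay and its parameters are hypothetical names, and the caller is assumed to track the attempt counter and reset it after a successful subscription.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 2.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Full-jitter backoff: a uniform delay in [0, min(cap, base ** attempt)]."""
    source = rng if rng is not None else random
    # Clamp the exponent so very large attempt counts cannot overflow a float.
    exponent = min(max(attempt, 0), 32)
    ceiling = min(cap, base ** exponent)
    return source.uniform(0.0, ceiling)
```

A reconnect loop would call await asyncio.sleep(backoff_delay(attempt)) and increment attempt on each failure.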
Step 2: Monitor Bot Interaction States & Detect Triggers
The event stream delivers JSON payloads containing interaction metadata. You must filter for web messaging sessions, verify bot activity, and evaluate handoff triggers. The following handler checks for low confidence scores or explicit transfer requests.
class HandoffDetector:
    def __init__(self, http_client: httpx.AsyncClient, auth_manager: CXoneAuthManager):
        self.client = http_client
        self.auth_manager = auth_manager

    async def evaluate_event(self, event: dict) -> bool:
        if event.get("type") != "webchat":
            return False
        interaction_id = event.get("id")
        state = event.get("state")
        # Monitor only active bot sessions
        if state not in ("bot_active", "awaiting_response"):
            return False
        # Fetch full interaction details to evaluate triggers.
        # The token comes from the auth manager; the client itself carries no Authorization header.
        token = await self.auth_manager.get_token(self.client)
        response = await self.client.get(
            f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}",
            headers={"Authorization": f"Bearer {token}"}
        )
        if response.status_code == 401:
            # Token rejected mid-flight; the caller must refresh and retry
            raise RuntimeError("Token expired. Refresh required.")
        response.raise_for_status()
        interaction = response.json()
        attributes = interaction.get("attributes", {})
        bot_confidence = attributes.get("bot_confidence_score")
        user_requested_transfer = attributes.get("user_requested_transfer")
        # Trigger condition: confidence below threshold or explicit request
        if bot_confidence is not None and bot_confidence < 0.35:
            logger.info(f"Trigger detected: Low confidence ({bot_confidence}) for {interaction_id}")
            return True
        if user_requested_transfer:
            logger.info(f"Trigger detected: Explicit transfer request for {interaction_id}")
            return True
        return False
The handler fetches the complete interaction object via GET /api/v2/interactions/{interactionId} to access custom attributes. The required scope is interactions:read. The handler returns True when a handoff condition is met, signaling the next step to execute.
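The trigger rules can be kept separate from the HTTP call, which makes them easy to unit test. The following sketch assumes the same attribute names as the handler above; the function name handoff_reason, the returned reason strings, and the handling of string-valued flags are hypothetical choices rather than CXone behavior.

```python
from typing import Any, Mapping, Optional


def handoff_reason(attributes: Mapping[str, Any], threshold: float = 0.35) -> Optional[str]:
    """Return why a handoff should happen, or None if no trigger applies."""
    score = attributes.get("bot_confidence_score")
    # Accept only real numbers; bool is excluded because it subclasses int.
    if isinstance(score, (int, float)) and not isinstance(score, bool):
        if score < threshold:
            return "low_confidence"
    flag = attributes.get("user_requested_transfer")
    # Assumption: the flag may arrive as a boolean or as the string "true".
    if flag is True or (isinstance(flag, str) and flag.strip().lower() == "true"):
        return "explicit_request"
    return None
```

Returning the reason rather than a bare boolean lets the caller pass an accurate value to the context update.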
Step 3: Preserve Chat History & Update Context
Before transferring, you must bundle the conversation history and update the interaction context with the handoff reason. CXone supports pagination for message retrieval. The following two functions fetch all messages page by page and patch the context.
async def fetch_and_bundle_history(
    client: httpx.AsyncClient,
    interaction_id: str,
    token: str
) -> list:
    headers = {"Authorization": f"Bearer {token}"}
    messages = []
    cursor = None
    limit = 100
    while True:
        params = {"limit": limit}
        if cursor:
            params["cursor"] = cursor
        response = await client.get(
            f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/messages",
            headers=headers,
            params=params
        )
        response.raise_for_status()
        data = response.json()
        messages.extend(data.get("entities", []))
        cursor = data.get("nextPageToken")
        if not cursor:
            break
    return messages

async def update_handoff_context(
    client: httpx.AsyncClient,
    interaction_id: str,
    token: str,
    reason: str
) -> None:
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    payload = {
        "attributes": {
            "handoff_reason": reason,
            "handoff_timestamp": time.time(),
            "source": "bot_confidence_threshold"
        }
    }
    response = await client.patch(
        f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/context",
        headers=headers,
        json=payload
    )
    # Handle rate limiting with retry logic
    attempts = 0
    while response.status_code == 429 and attempts < 3:
        retry_after = float(response.headers.get("Retry-After", 2.0))
        logger.warning(f"Rate limited on context update. Waiting {retry_after}s")
        await asyncio.sleep(retry_after)
        response = await client.patch(
            f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/context",
            headers=headers,
            json=payload
        )
        attempts += 1
    response.raise_for_status()
The message fetch uses GET /api/v2/interactions/{interactionId}/messages with interactions:read and messaging:read scopes. Pagination relies on the nextPageToken field. The context update uses PATCH /api/v2/interactions/{interactionId}/context with the context:write scope. The retry loop explicitly handles 429 Too Many Requests by reading the Retry-After header.
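A token-based pagination loop runs forever if the server keeps returning the same token. The sketch below shows the same loop with two guards, a page limit and a repeated-token check. It assumes the entities and nextPageToken field names used above; collect_all_pages and the PageFetcher callable are hypothetical, with the fetcher standing in for the HTTP request.

```python
from typing import Any, Awaitable, Callable, Dict, List, Optional

# Hypothetical fetcher type: takes the cursor (None for the first page)
# and returns the decoded JSON body of that page.
PageFetcher = Callable[[Optional[str]], Awaitable[Dict[str, Any]]]


async def collect_all_pages(fetch_page: PageFetcher, max_pages: int = 1000) -> List[Any]:
    """Follow nextPageToken until it is absent, with loop protection."""
    items: List[Any] = []
    cursor: Optional[str] = None
    seen_cursors = set()
    for _ in range(max_pages):
        page = await fetch_page(cursor)
        items.extend(page.get("entities", []))
        cursor = page.get("nextPageToken")
        if not cursor:
            return items
        if cursor in seen_cursors:
            raise RuntimeError("pagination cursor repeated; aborting")
        seen_cursors.add(cursor)
    raise RuntimeError("max_pages exceeded before the last page was reached")
```

In the tutorial's setting, fetch_page would wrap the client.get call and return response.json().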
Step 4: Execute Transfer to Agent Queue
The final step constructs the transfer payload and submits it to the CXone transfer endpoint. The payload includes the target queue, bundled history, and transfer metadata.
async def execute_transfer(
    client: httpx.AsyncClient,
    interaction_id: str,
    token: str,
    queue_id: str,
    history: list,
    reason: str
) -> dict:
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    # Bundle history into transfer payload
    # CXone accepts a condensed history array containing message type, author, and text
    condensed_history = [
        {
            "type": msg.get("type", "text"),
            "author": msg.get("author", "unknown"),
            "content": msg.get("content", ""),
            "timestamp": msg.get("timestamp", "")
        }
        for msg in history
    ]
    transfer_payload = {
        "target": {
            "type": "queue",
            "id": queue_id
        },
        "context": {
            "handoff_reason": reason,
            "preserved_history_count": len(condensed_history)
        },
        "history": condensed_history
    }
    response = await client.post(
        f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/transfer",
        headers=headers,
        json=transfer_payload
    )
    # Handle common transfer errors
    if response.status_code == 409:
        logger.warning(f"Interaction {interaction_id} is already transferred or completed.")
        return {"status": "skipped", "reason": "already_transferred"}
    if response.status_code == 429:
        retry_after = float(response.headers.get("Retry-After", 2.0))
        logger.warning(f"Rate limited on transfer. Waiting {retry_after}s")
        await asyncio.sleep(retry_after)
        response = await client.post(
            f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/transfer",
            headers=headers,
            json=transfer_payload
        )
    response.raise_for_status()
    return response.json()
The endpoint POST /api/v2/interactions/{interactionId}/transfer requires interactions:write. The payload structure matches CXone’s transfer schema. A 409 Conflict response indicates the interaction is already in a terminal or transferred state, so the function returns a skipped status instead of raising. A 429 response is retried once after the Retry-After delay; any error status that remains is raised by raise_for_status.
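The event stream may deliver several events for one interaction while a trigger condition still holds, in which case the handler would attempt the same transfer more than once and rely on the 409 branch to absorb the duplicates. A small in-memory guard can skip those attempts before any request is sent. The following is a sketch under that assumption; HandoffGuard, try_claim, and release are hypothetical names, and the guard only covers a single process.

```python
import time
from typing import Dict, Optional


class HandoffGuard:
    """Remembers interactions whose handoff has started recently."""

    def __init__(self, ttl_seconds: float = 300.0):
        self._ttl = ttl_seconds
        self._claimed: Dict[str, float] = {}

    def try_claim(self, interaction_id: str, now: Optional[float] = None) -> bool:
        """Return True if the caller may start the handoff, False if one is already recorded."""
        current = time.monotonic() if now is None else now
        # Forget claims older than the TTL so the map does not grow without bound.
        expired = [key for key, ts in self._claimed.items() if current - ts >= self._ttl]
        for key in expired:
            del self._claimed[key]
        if interaction_id in self._claimed:
            return False
        self._claimed[interaction_id] = current
        return True

    def release(self, interaction_id: str) -> None:
        """Allow a later retry, for example after the transfer request failed."""
        self._claimed.pop(interaction_id, None)
```

A handler would call try_claim before fetching history and release in an exception handler if the transfer fails.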
Complete Working Example
The following script combines all components into a single runnable module. Replace the placeholder credentials and queue ID before execution.
import asyncio
import logging
import time
import httpx
import websockets
import json
import random
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
CXONE_BASE_URL = "https://platform.niceincontact.com"
CXONE_OAUTH_URL = f"{CXONE_BASE_URL}/oauth/token"
TARGET_QUEUE_ID = "your_target_queue_id_here"
BOT_HANDOFF_REASON = "low_confidence_threshold"
class CXoneAuthManager:
    def __init__(self, client_id: str, client_secret: str, scopes: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.access_token: str | None = None
        self.token_expiry: float = 0.0

    async def get_token(self, client: httpx.AsyncClient) -> str:
        if self.access_token and time.time() < self.token_expiry - 60:
            return self.access_token
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": self.scopes
        }
        response = await client.post(CXONE_OAUTH_URL, data=payload)
        response.raise_for_status()
        data = response.json()
        self.access_token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"]
        return self.access_token

async def fetch_and_bundle_history(client: httpx.AsyncClient, interaction_id: str, token: str) -> list:
    headers = {"Authorization": f"Bearer {token}"}
    messages = []
    cursor = None
    while True:
        params = {"limit": 100}
        if cursor:
            params["cursor"] = cursor
        response = await client.get(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/messages", headers=headers, params=params)
        response.raise_for_status()
        data = response.json()
        messages.extend(data.get("entities", []))
        cursor = data.get("nextPageToken")
        if not cursor:
            break
    return messages

async def update_handoff_context(client: httpx.AsyncClient, interaction_id: str, token: str, reason: str) -> None:
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    payload = {"attributes": {"handoff_reason": reason, "handoff_timestamp": time.time(), "source": "bot_confidence_threshold"}}
    response = await client.patch(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/context", headers=headers, json=payload)
    attempts = 0
    while response.status_code == 429 and attempts < 3:
        retry_after = float(response.headers.get("Retry-After", 2.0))
        logger.warning(f"Rate limited on context update. Waiting {retry_after}s")
        await asyncio.sleep(retry_after)
        response = await client.patch(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/context", headers=headers, json=payload)
        attempts += 1
    response.raise_for_status()

async def execute_transfer(client: httpx.AsyncClient, interaction_id: str, token: str, queue_id: str, history: list, reason: str) -> dict:
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    condensed_history = [{"type": m.get("type", "text"), "author": m.get("author", "unknown"), "content": m.get("content", ""), "timestamp": m.get("timestamp", "")} for m in history]
    transfer_payload = {"target": {"type": "queue", "id": queue_id}, "context": {"handoff_reason": reason, "preserved_history_count": len(condensed_history)}, "history": condensed_history}
    response = await client.post(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/transfer", headers=headers, json=transfer_payload)
    if response.status_code == 409:
        logger.warning(f"Interaction {interaction_id} is already transferred or completed.")
        return {"status": "skipped", "reason": "already_transferred"}
    if response.status_code == 429:
        retry_after = float(response.headers.get("Retry-After", 2.0))
        logger.warning(f"Rate limited on transfer. Waiting {retry_after}s")
        await asyncio.sleep(retry_after)
        response = await client.post(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/transfer", headers=headers, json=transfer_payload)
    response.raise_for_status()
    return response.json()

async def handle_bot_handoff(event: dict, client: httpx.AsyncClient, auth: CXoneAuthManager) -> None:
    if event.get("type") != "webchat":
        return
    interaction_id = event.get("id")
    state = event.get("state")
    if state not in ("bot_active", "awaiting_response"):
        return
    token = await auth.get_token(client)
    headers = {"Authorization": f"Bearer {token}"}
    response = await client.get(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}", headers=headers)
    if response.status_code == 401:
        # Drop the cached token first; otherwise get_token returns the same rejected token
        auth.access_token = None
        token = await auth.get_token(client)
        headers["Authorization"] = f"Bearer {token}"
        response = await client.get(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}", headers=headers)
    response.raise_for_status()
    interaction = response.json()
    attributes = interaction.get("attributes", {})
    bot_confidence = attributes.get("bot_confidence_score")
    user_requested = attributes.get("user_requested_transfer")
    if (bot_confidence is not None and bot_confidence < 0.35) or user_requested:
        logger.info(f"Handoff trigger confirmed for {interaction_id}")
        history = await fetch_and_bundle_history(client, interaction_id, token)
        await update_handoff_context(client, interaction_id, token, BOT_HANDOFF_REASON)
        result = await execute_transfer(client, interaction_id, token, TARGET_QUEUE_ID, history, BOT_HANDOFF_REASON)
        logger.info(f"Transfer result: {result}")

async def stream_interactions(auth: CXoneAuthManager, client: httpx.AsyncClient) -> None:
    ws_url = "wss://platform.niceincontact.com/interaction/v1/events"
    backoff_base = 2.0
    max_backoff = 60.0
    async for ws in websockets.connect(ws_url, ping_interval=20, ping_timeout=10):
        try:
            token = await auth.get_token(client)
            await ws.send(json.dumps({"action": "authenticate", "token": token}))
            await ws.recv()
            await ws.send(json.dumps({"action": "subscribe", "topic": "interactions"}))
            logger.info("Subscribed to CXone interaction stream.")
            async for raw in ws:
                await handle_bot_handoff(json.loads(raw), client, auth)
        except websockets.exceptions.ConnectionClosed as e:
            logger.warning(f"WebSocket closed: {e.code} {e.reason}. Reconnecting...")
        except Exception as e:
            logger.error(f"WebSocket error: {e}")
        delay = min(backoff_base ** random.randint(1, 3), max_backoff)
        await asyncio.sleep(delay)

async def main():
    client_id = "YOUR_CLIENT_ID"
    client_secret = "YOUR_CLIENT_SECRET"
    scopes = "interactions:read interactions:write messaging:read context:write"
    auth = CXoneAuthManager(client_id, client_secret, scopes)
    async with httpx.AsyncClient(timeout=30.0) as client:
        await auth.get_token(client)
        await stream_interactions(auth, client)

if __name__ == "__main__":
    asyncio.run(main())
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token expired during a long-running WebSocket session or between API calls.
- Fix: Implement token TTL checking before every request. The CXoneAuthManager class handles this automatically. If a 401 still occurs mid-operation, refresh the token and retry the same request.
- Code fix: Wrap API calls in a retry loop that checks for 401 and calls await auth.get_token(client) before repeating. Clear the cached token first; while its TTL is still valid, get_token returns the same rejected token.
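The retry described in the code fix can be sketched as a small wrapper. It is deliberately independent of httpx: send, get_token, and invalidate are hypothetical callables supplied by the caller, and the only assumption about the response object is that it exposes a status_code attribute.

```python
from typing import Any, Awaitable, Callable


async def call_with_token_refresh(
    send: Callable[[str], Awaitable[Any]],
    get_token: Callable[[], Awaitable[str]],
    invalidate: Callable[[], None],
    max_refreshes: int = 1,
) -> Any:
    """Send a request; on 401, discard the cached token, fetch a new one, and resend."""
    token = await get_token()
    response = await send(token)
    refreshes = 0
    while response.status_code == 401 and refreshes < max_refreshes:
        invalidate()  # force get_token to perform a real credential exchange
        token = await get_token()
        response = await send(token)
        refreshes += 1
    return response
```

With the tutorial's classes, send could be a closure around client.get that builds the Authorization header from the token it receives, and invalidate could set auth.access_token to None.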
Error: 403 Forbidden
- Cause: The OAuth client lacks the required scope for the specific endpoint.
- Fix: Verify that the scope parameter in the OAuth request matches the endpoint requirements. Transfer operations require interactions:write. Context updates require context:write. Message retrieval requires messaging:read.
- Code fix: Update the scopes string in main() to include all necessary permissions. A token that lacks any scope an endpoint requires is rejected.
Error: 429 Too Many Requests
- Cause: CXone enforces rate limits per tenant and per endpoint. Bursting transfers or message fetches triggers throttling.
- Fix: Read the Retry-After header from the response. Implement exponential backoff. The provided functions include explicit 429 handling: update_handoff_context retries up to three times, and execute_transfer retries once.
- Code fix: Never ignore 429. Always pause execution for the duration specified in Retry-After. Add jitter to avoid synchronized retries across multiple worker processes.
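The HTTP specification allows Retry-After to carry either a number of seconds or an HTTP date, and float() fails on the date form. The sketch below parses both forms and adds jitter on top. Whether CXone ever sends the date form is not confirmed here; retry_after_seconds, with_jitter, and their defaults are hypothetical helpers.

```python
import random
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(
    header_value: Optional[str],
    default: float = 2.0,
    cap: float = 300.0,
    now: Optional[datetime] = None,
) -> float:
    """Convert a Retry-After header (delta-seconds or HTTP date) to a delay in seconds."""
    if header_value is None:
        return default
    value = header_value.strip()
    try:
        return min(cap, max(0.0, float(value)))
    except ValueError:
        pass  # not a number; try the HTTP-date form next
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    current = now if now is not None else datetime.now(timezone.utc)
    return min(cap, max(0.0, (when - current).total_seconds()))


def with_jitter(delay: float, spread: float = 0.25, rng: Optional[random.Random] = None) -> float:
    """Add up to spread * delay of random extra wait so workers do not retry in lockstep."""
    source = rng if rng is not None else random
    return delay + source.uniform(0.0, delay * spread)
```

A retry loop would then sleep for with_jitter(retry_after_seconds(response.headers.get("Retry-After"))).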
Error: WebSocket 1006 or 1011
- Cause: Network instability, proxy interference, or CXone platform maintenance closes the connection abruptly.
- Fix: The websockets library raises ConnectionClosed. The try/except inside the async for ws in websockets.connect(...) loop catches it and applies the backoff delay, after which the loop opens a new connection. Ensure ping_interval and ping_timeout are configured to detect dead connections early.
- Code fix: Keep ping_interval=20 and ping_timeout=10 in websockets.connect(). The client then sends a ping every twenty seconds and closes the connection if no pong arrives within ten seconds.