Implementing Seamless Bot-to-Agent Handoff in NICE CXone Web Messaging with Python

Implementing Seamless Bot-to-Agent Handoff in NICE CXone Web Messaging with Python

What You Will Build

  • The code establishes a persistent WebSocket connection to stream NICE CXone interaction events, detects bot handoff triggers, fetches conversation history, updates interaction context, and executes a queue transfer.
  • This uses the CXone Interaction API, Messaging API, and real-time WebSocket event stream.
  • The implementation uses Python 3.10+ with httpx and websockets.

Prerequisites

  • OAuth client type: Client Credentials flow
  • Required scopes: interactions:read, interactions:write, messaging:read, context:write
  • API version: CXone REST API v2, Interaction WebSocket v1
  • Runtime: Python 3.10 or higher
  • External dependencies: httpx>=0.25.0, websockets>=12.0, pydantic>=2.0

Authentication Setup

CXone server-to-server integrations require OAuth 2.0 Client Credentials. The following function handles token acquisition and implements a simple TTL-based cache to avoid unnecessary credential exchanges.

import httpx
import time
import asyncio
from typing import Optional

CXONE_BASE_URL = "https://platform.niceincontact.com"
CXONE_OAUTH_URL = f"{CXONE_BASE_URL}/oauth/token"

class CXoneAuthManager:
    def __init__(self, client_id: str, client_secret: str, scopes: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0

    async def get_token(self, client: httpx.AsyncClient) -> str:
        if self.access_token and time.time() < self.token_expiry - 60:
            return self.access_token

        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": self.scopes
        }

        response = await client.post(CXONE_OAUTH_URL, data=payload)
        response.raise_for_status()
        data = response.json()

        self.access_token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"]
        return self.access_token

The get_token method checks the stored token against a sixty-second safety buffer before requesting a new one. This prevents race conditions where a token expires mid-request. The required scopes for this tutorial are interactions:read interactions:write messaging:read context:write.

Implementation

Step 1: WebSocket Client & Reconnection Logic

CXone streams interaction state changes via a WebSocket endpoint. The connection requires an initial authentication message followed by a subscription to the interaction topic. Network instability requires exponential backoff with jitter to prevent thundering herd scenarios.

import websockets
import json
import random
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CXoneInteractionStream:
    def __init__(self, auth_manager: CXoneAuthManager, on_event: callable):
        self.auth_manager = auth_manager
        self.on_event = on_event
        self.ws_url = f"wss://platform.niceincontact.com/interaction/v1/events"
        self.backoff_base = 2.0
        self.max_backoff = 60.0

    async def connect_and_subscribe(self, client: httpx.AsyncClient):
        token = await self.auth_manager.get_token(client)
        async for ws in websockets.connect(self.ws_url, ping_interval=20, ping_timeout=10):
            try:
                # Authenticate connection
                auth_msg = json.dumps({"action": "authenticate", "token": token})
                await ws.send(auth_msg)
                await ws.recv()  # Expect success acknowledgment

                # Subscribe to interaction events
                sub_msg = json.dumps({"action": "subscribe", "topic": "interactions"})
                await ws.send(sub_msg)
                logger.info("Subscribed to CXone interaction stream.")

                # Listen for events
                async for raw_message in ws:
                    event = json.loads(raw_message)
                    await self.on_event(event)

            except websockets.exceptions.ConnectionClosed as e:
                logger.warning(f"WebSocket closed: {e.code} {e.reason}. Reconnecting...")
            except Exception as e:
                logger.error(f"Unexpected error in WebSocket loop: {e}")
            
            # Exponential backoff with jitter
            delay = min(self.backoff_base ** random.randint(1, 3), self.max_backoff)
            await asyncio.sleep(delay)

The websockets library handles ping/pong keep-alives automatically. The backoff logic uses a randomized exponent to distribute reconnection attempts across time. The on_event callback processes incoming interaction state updates.

Step 2: Monitor Bot Interaction States & Detect Triggers

The event stream delivers JSON payloads containing interaction metadata. You must filter for web messaging sessions, verify bot activity, and evaluate handoff triggers. The following handler checks for low confidence scores or explicit transfer requests.

class HandoffDetector:
    def __init__(self, http_client: httpx.AsyncClient):
        self.client = http_client

    async def evaluate_event(self, event: dict) -> bool:
        if event.get("type") != "webchat":
            return False

        interaction_id = event.get("id")
        state = event.get("state")
        
        # Monitor only active bot sessions
        if state not in ("bot_active", "awaiting_response"):
            return False

        # Fetch full interaction details to evaluate triggers
        response = await self.client.get(
            f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}",
            headers={"Authorization": f"Bearer {self.client.headers.get('Authorization')}"}
        )
        if response.status_code == 401:
            # Token expired mid-flight, refresh and retry
            raise Exception("Token expired. Refresh required.")
        response.raise_for_status()

        interaction = response.json()
        attributes = interaction.get("attributes", {})
        bot_confidence = attributes.get("bot_confidence_score")
        user_requested_transfer = attributes.get("user_requested_transfer")

        # Trigger condition: confidence below threshold or explicit request
        if bot_confidence is not None and bot_confidence < 0.35:
            logger.info(f"Trigger detected: Low confidence ({bot_confidence}) for {interaction_id}")
            return True
        if user_requested_transfer:
            logger.info(f"Trigger detected: Explicit transfer request for {interaction_id}")
            return True

        return False

The handler fetches the complete interaction object via GET /api/v2/interactions/{interactionId} to access custom attributes. The required scope is interactions:read. The handler returns True when a handoff condition is met, signaling the next step to execute.

Step 3: Preserve Chat History & Update Context

Before transferring, you must bundle the conversation history and update the interaction context with the handoff reason. CXone supports pagination for message retrieval. The following function fetches all messages, paginates through results, and patches the context.

async def fetch_and_bundle_history(
    client: httpx.AsyncClient, 
    interaction_id: str, 
    token: str
) -> list:
    headers = {"Authorization": f"Bearer {token}"}
    messages = []
    cursor = None
    limit = 100

    while True:
        params = {"limit": limit}
        if cursor:
            params["cursor"] = cursor

        response = await client.get(
            f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/messages",
            headers=headers,
            params=params
        )
        response.raise_for_status()
        data = response.json()
        messages.extend(data.get("entities", []))

        cursor = data.get("nextPageToken")
        if not cursor:
            break

    return messages

async def update_handoff_context(
    client: httpx.AsyncClient, 
    interaction_id: str, 
    token: str, 
    reason: str
) -> None:
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    payload = {
        "attributes": {
            "handoff_reason": reason,
            "handoff_timestamp": time.time(),
            "source": "bot_confidence_threshold"
        }
    }

    response = await client.patch(
        f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/context",
        headers=headers,
        json=payload
    )
    
    # Handle rate limiting with retry logic
    attempts = 0
    while response.status_code == 429 and attempts < 3:
        retry_after = float(response.headers.get("Retry-After", 2.0))
        logger.warning(f"Rate limited on context update. Waiting {retry_after}s")
        await asyncio.sleep(retry_after)
        response = await client.patch(
            f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/context",
            headers=headers,
            json=payload
        )
        attempts += 1

    response.raise_for_status()

The message fetch uses GET /api/v2/interactions/{interactionId}/messages with interactions:read and messaging:read scopes. Pagination relies on the nextPageToken field. The context update uses PATCH /api/v2/interactions/{interactionId}/context with the context:write scope. The retry loop explicitly handles 429 Too Many Requests by reading the Retry-After header.

Step 4: Execute Transfer to Agent Queue

The final step constructs the transfer payload and submits it to the CXone transfer endpoint. The payload includes the target queue, bundled history, and transfer metadata.

async def execute_transfer(
    client: httpx.AsyncClient,
    interaction_id: str,
    token: str,
    queue_id: str,
    history: list,
    reason: str
) -> dict:
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    # Bundle history into transfer payload
    # CXone accepts a condensed history array containing message type, author, and text
    condensed_history = [
        {
            "type": msg.get("type", "text"),
            "author": msg.get("author", "unknown"),
            "content": msg.get("content", ""),
            "timestamp": msg.get("timestamp", "")
        }
        for msg in history
    ]

    transfer_payload = {
        "target": {
            "type": "queue",
            "id": queue_id
        },
        "context": {
            "handoff_reason": reason,
            "preserved_history_count": len(condensed_history)
        },
        "history": condensed_history
    }

    response = await client.post(
        f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/transfer",
        headers=headers,
        json=transfer_payload
    )

    # Handle common transfer errors
    if response.status_code == 409:
        logger.warning(f"Interaction {interaction_id} is already transferred or completed.")
        return {"status": "skipped", "reason": "already_transferred"}
    if response.status_code == 429:
        retry_after = float(response.headers.get("Retry-After", 2.0))
        logger.warning(f"Rate limited on transfer. Waiting {retry_after}s")
        await asyncio.sleep(retry_after)
        response = await client.post(
            f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/transfer",
            headers=headers,
            json=transfer_payload
        )

    response.raise_for_status()
    return response.json()

The endpoint POST /api/v2/interactions/{interactionId}/transfer requires interactions:write. The payload structure matches CXone’s transfer schema. A 409 Conflict response indicates the interaction is already in a terminal or transferred state. The retry logic handles 429 responses before raising an exception.

Complete Working Example

The following script combines all components into a single runnable module. Replace the placeholder credentials and queue ID before execution.

import asyncio
import logging
import time
import httpx
import websockets
import json
import random

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

CXONE_BASE_URL = "https://platform.niceincontact.com"
CXONE_OAUTH_URL = f"{CXONE_BASE_URL}/oauth/token"
TARGET_QUEUE_ID = "your_target_queue_id_here"
BOT_HANDOFF_REASON = "low_confidence_threshold"

class CXoneAuthManager:
    def __init__(self, client_id: str, client_secret: str, scopes: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.access_token: str | None = None
        self.token_expiry: float = 0.0

    async def get_token(self, client: httpx.AsyncClient) -> str:
        if self.access_token and time.time() < self.token_expiry - 60:
            return self.access_token
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": self.scopes
        }
        response = await client.post(CXONE_OAUTH_URL, data=payload)
        response.raise_for_status()
        data = response.json()
        self.access_token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"]
        return self.access_token

async def fetch_and_bundle_history(client: httpx.AsyncClient, interaction_id: str, token: str) -> list:
    headers = {"Authorization": f"Bearer {token}"}
    messages = []
    cursor = None
    while True:
        params = {"limit": 100}
        if cursor:
            params["cursor"] = cursor
        response = await client.get(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/messages", headers=headers, params=params)
        response.raise_for_status()
        data = response.json()
        messages.extend(data.get("entities", []))
        cursor = data.get("nextPageToken")
        if not cursor:
            break
    return messages

async def update_handoff_context(client: httpx.AsyncClient, interaction_id: str, token: str, reason: str) -> None:
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    payload = {"attributes": {"handoff_reason": reason, "handoff_timestamp": time.time(), "source": "bot_confidence_threshold"}}
    response = await client.patch(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/context", headers=headers, json=payload)
    attempts = 0
    while response.status_code == 429 and attempts < 3:
        retry_after = float(response.headers.get("Retry-After", 2.0))
        logger.warning(f"Rate limited on context update. Waiting {retry_after}s")
        await asyncio.sleep(retry_after)
        response = await client.patch(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/context", headers=headers, json=payload)
        attempts += 1
    response.raise_for_status()

async def execute_transfer(client: httpx.AsyncClient, interaction_id: str, token: str, queue_id: str, history: list, reason: str) -> dict:
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    condensed_history = [{"type": m.get("type", "text"), "author": m.get("author", "unknown"), "content": m.get("content", ""), "timestamp": m.get("timestamp", "")} for m in history]
    transfer_payload = {"target": {"type": "queue", "id": queue_id}, "context": {"handoff_reason": reason, "preserved_history_count": len(condensed_history)}, "history": condensed_history}
    response = await client.post(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/transfer", headers=headers, json=transfer_payload)
    if response.status_code == 409:
        logger.warning(f"Interaction {interaction_id} is already transferred or completed.")
        return {"status": "skipped", "reason": "already_transferred"}
    if response.status_code == 429:
        retry_after = float(response.headers.get("Retry-After", 2.0))
        logger.warning(f"Rate limited on transfer. Waiting {retry_after}s")
        await asyncio.sleep(retry_after)
        response = await client.post(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}/transfer", headers=headers, json=transfer_payload)
    response.raise_for_status()
    return response.json()

async def handle_bot_handoff(event: dict, client: httpx.AsyncClient, auth: CXoneAuthManager) -> None:
    if event.get("type") != "webchat":
        return
    interaction_id = event.get("id")
    state = event.get("state")
    if state not in ("bot_active", "awaiting_response"):
        return

    token = await auth.get_token(client)
    headers = {"Authorization": f"Bearer {token}"}
    
    response = await client.get(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}", headers=headers)
    if response.status_code == 401:
        token = await auth.get_token(client)
        headers["Authorization"] = f"Bearer {token}"
        response = await client.get(f"{CXONE_BASE_URL}/api/v2/interactions/{interaction_id}", headers=headers)
    response.raise_for_status()
    
    interaction = response.json()
    attributes = interaction.get("attributes", {})
    bot_confidence = attributes.get("bot_confidence_score")
    user_requested = attributes.get("user_requested_transfer")

    if (bot_confidence is not None and bot_confidence < 0.35) or user_requested:
        logger.info(f"Handoff trigger confirmed for {interaction_id}")
        history = await fetch_and_bundle_history(client, interaction_id, token)
        await update_handoff_context(client, interaction_id, token, BOT_HANDOFF_REASON)
        result = await execute_transfer(client, interaction_id, token, TARGET_QUEUE_ID, history, BOT_HANDOFF_REASON)
        logger.info(f"Transfer result: {result}")

async def stream_interactions(auth: CXoneAuthManager, client: httpx.AsyncClient) -> None:
    ws_url = f"wss://platform.niceincontact.com/interaction/v1/events"
    backoff_base = 2.0
    max_backoff = 60.0

    async for ws in websockets.connect(ws_url, ping_interval=20, ping_timeout=10):
        try:
            token = await auth.get_token(client)
            await ws.send(json.dumps({"action": "authenticate", "token": token}))
            await ws.recv()
            await ws.send(json.dumps({"action": "subscribe", "topic": "interactions"}))
            logger.info("Subscribed to CXone interaction stream.")
            async for raw in ws:
                await handle_bot_handoff(json.loads(raw), client, auth)
        except websockets.exceptions.ConnectionClosed as e:
            logger.warning(f"WebSocket closed: {e.code} {e.reason}. Reconnecting...")
        except Exception as e:
            logger.error(f"WebSocket error: {e}")
        delay = min(backoff_base ** random.randint(1, 3), max_backoff)
        await asyncio.sleep(delay)

async def main():
    client_id = "YOUR_CLIENT_ID"
    client_secret = "YOUR_CLIENT_SECRET"
    scopes = "interactions:read interactions:write messaging:read context:write"
    
    auth = CXoneAuthManager(client_id, client_secret, scopes)
    async with httpx.AsyncClient(timeout=30.0) as client:
        await auth.get_token(client)
        await stream_interactions(auth, client)

if __name__ == "__main__":
    asyncio.run(main())

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The OAuth token expired during a long-running WebSocket session or between API calls.
  • Fix: Implement token TTL checking before every request. The CXoneAuthManager class handles this automatically. If a 401 occurs mid-operation, catch the exception, refresh the token, and retry the exact same request.
  • Code fix: Wrap API calls in a retry loop that checks for 401 and calls await auth.get_token(client) before repeating.

Error: 403 Forbidden

  • Cause: The OAuth client lacks the required scope for the specific endpoint.
  • Fix: Verify the scope parameter in the OAuth request matches the endpoint requirements. Transfer operations require interactions:write. Context updates require context:write. Message retrieval requires messaging:read.
  • Code fix: Update the scopes string in main() to include all necessary permissions. CXone rejects partial scope matches strictly.

Error: 429 Too Many Requests

  • Cause: CXone enforces rate limits per tenant and per endpoint. Bursting transfers or message fetches triggers throttling.
  • Fix: Read the Retry-After header from the response. Implement exponential backoff. The provided execute_transfer and update_handoff_context functions include explicit 429 handling loops.
  • Code fix: Never ignore 429. Always pause execution for the duration specified in Retry-After. Add jitter to avoid synchronized retries across multiple worker processes.

Error: WebSocket 1006 or 1011

  • Cause: Network instability, proxy interference, or CXone platform maintenance closes the connection abruptly.
  • Fix: The websockets library raises ConnectionClosed. The outer async for ws in websockets.connect(...) loop catches this and triggers the backoff logic. Ensure ping_interval and ping_timeout are configured to detect dead connections early.
  • Code fix: Keep ping_interval=20 and ping_timeout=10 in websockets.connect(). This forces the server to acknowledge liveness every twenty seconds and tears down dead connections within ten seconds of failure.

Official References