Routing Cognigy Bot Intents to NICE CXone Queues via Webhooks

Routing Cognigy Bot Intents to NICE CXone Queues via Webhooks

What You Will Build

  • A Python script that listens for webhook payloads from NICE Cognigy, extracts the detected intent, and routes the conversation to a specific NICE CXone queue.
  • Integration uses the Cognigy Platform API for intent validation and the NICE CXone API v2 for queue assignment and conversation bridging.
  • The tutorial covers Python 3.9+ with fastapi and httpx for async processing.

Prerequisites

NICE Cognigy Configuration

  • A Cognigy Studio project with at least two defined Intents (e.g., sales_inquiry, support_ticket).
  • A Webhook Node configured to send a POST request to your endpoint on onIntentMatch.
  • The Webhook payload must include intentName, sessionId, and userInput.

NICE CXone Configuration

  • A NICE CXone organization with at least two Queues created (e.g., “Sales Support”, “Technical Support”).
  • An OAuth Client Credentials application created in the Developer Console.
  • Required Scopes: conversation:write, conversation:read, routing:write.

Development Environment

  • Python 3.9 or higher.
  • Dependencies: fastapi, uvicorn, httpx, pydantic.

Install dependencies:

pip install fastapi uvicorn httpx pydantic

Authentication Setup

NICE CXone uses OAuth 2.0 Client Credentials flow. Because the token expires after an hour, your application must handle token retrieval and caching. For this tutorial, we will implement a simple in-memory cache with a refresh timer.

Create a file named auth.py:

import httpx
import time
from typing import Optional

class CXoneAuth:
    def __init__(self, client_id: str, client_secret: str, environment: str = "us"):
        self.client_id = client_id
        self.client_secret = client_secret
        # Determine base URL based on environment
        if environment == "us":
            self.base_url = "https://api.mypurecloud.com"
        elif environment == "eu":
            self.base_url = "https://api.euc1.pure.cloud"
        else:
            raise ValueError("Unsupported environment")
        
        self.token_url = f"{self.base_url}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0

    async def get_token(self) -> str:
        """
        Retrieves an OAuth token from CXone.
        Checks cache first. Refreshes if expired.
        """
        # Check if we have a valid token (add 5 minute buffer)
        if self.access_token and time.time() < (self.token_expiry - 300):
            return self.access_token

        # If token is expired or missing, fetch new one
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )
            
            if response.status_code != 200:
                raise Exception(f"Failed to get token: {response.text}")
            
            data = response.json()
            self.access_token = data["access_token"]
            self.token_expiry = time.time() + data["expires_in"]
            
            return self.access_token

    async def get_headers(self) -> dict:
        token = await self.get_token()
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

Implementation

Step 1: Define the Webhook Payload Structure

Cognigy sends a JSON payload when a webhook is triggered. You must define the expected structure to parse it correctly. The critical fields are intentName (to determine the queue) and sessionId (to link the CXone conversation).

Create models.py:

from pydantic import BaseModel
from typing import List, Optional

class CognigyUserInput(BaseModel):
    text: str
    channel: str

class CognigyIntent(BaseModel):
    name: str
    confidence: float

class CognigyWebhookPayload(BaseModel):
    sessionId: str
    userInput: CognigyUserInput
    intents: List[CognigyIntent]
    # Additional fields might exist, but these are required for routing
    timestamp: Optional[int] = None

Step 2: Map Intents to CXone Queue IDs

You cannot route to a queue by name directly in the API; you need the Queue ID. Hardcoding IDs is fragile. A robust solution fetches the queue ID once and caches it, or looks it up dynamically. For this tutorial, we will implement a dynamic lookup function that caches results in memory.

Create queue_mapper.py:

import httpx
import logging
from typing import Dict, Optional

logger = logging.getLogger(__name__)

class QueueMapper:
    def __init__(self, auth_service: 'CXoneAuth'):
        self.auth = auth_service
        self.queue_cache: Dict[str, str] = {} # Intent Name -> Queue ID
        # Define your mapping configuration here
        self.intent_to_queue_name = {
            "sales_inquiry": "Sales Support",
            "support_ticket": "Technical Support",
            "general_info": "General Inquiry"
        }

    async def get_queue_id(self, intent_name: str) -> Optional[str]:
        """
        Retrieves the CXone Queue ID for a given Cognigy Intent.
        """
        # Check if we already know the queue ID for this intent
        if intent_name in self.queue_cache:
            return self.queue_cache[intent_name]

        queue_name = self.intent_to_queue_name.get(intent_name)
        if not queue_name:
            logger.warning(f"No queue mapping found for intent: {intent_name}")
            return None

        # Fetch from CXone API if not in cache
        queue_id = await self._fetch_queue_id_by_name(queue_name)
        if queue_id:
            self.queue_cache[intent_name] = queue_id
        
        return queue_id

    async def _fetch_queue_id_by_name(self, queue_name: str) -> Optional[str]:
        """
        Searches CXone for a queue by name.
        Endpoint: GET /api/v2/routing/queues
        Scope: routing:read
        """
        headers = await self.auth.get_headers()
        url = f"{self.auth.base_url}/api/v2/routing/queues"
        
        # CXone returns paginated results. We fetch the first page.
        # In production, implement pagination if you have >100 queues.
        params = {
            "pageSize": 100,
            "pageNumber": 1,
            "name": queue_name
        }

        async with httpx.AsyncClient() as client:
            response = await client.get(url, headers=headers, params=params)
            
            if response.status_code == 429:
                # Handle Rate Limiting
                wait_time = int(response.headers.get("Retry-After", 5))
                logger.warning(f"Rate limited. Waiting {wait_time} seconds.")
                import asyncio
                await asyncio.sleep(wait_time)
                return await self._fetch_queue_id_by_name(queue_name)

            if response.status_code != 200:
                logger.error(f"Failed to fetch queues: {response.text}")
                return None

            data = response.json()
            entities = data.get("entities", [])
            
            for queue in entities:
                if queue["name"] == queue_name:
                    return queue["id"]
            
            logger.error(f"Queue '{queue_name}' not found in CXone.")
            return None

Step 3: Process the Webhook and Route the Conversation

This is the core logic. When Cognigy hits your endpoint, you must:

  1. Validate the payload.
  2. Determine the highest-confidence intent.
  3. Get the corresponding CXone Queue ID.
  4. Update the CXone Conversation to assign it to that queue.

Note: Cognigy does not automatically create a CXone conversation. You must have a mechanism to create the conversation first (e.g., via a CXone Webchat widget that triggers the Cognigy bot, or a separate API call). This tutorial assumes the CXone Conversation ID is passed from Cognigy via the sessionId or a custom variable. If you are bridging from a CXone Webchat, the sessionId in Cognigy often maps to the CXone conversationId.

If you are starting a new conversation from Cognigy, you would use POST /api/v2/conversations. Here, we assume the conversation exists and we are routing it.

Create main.py:

from fastapi import FastAPI, HTTPException, Request
import httpx
import logging
from models import CognigyWebhookPayload
from auth import CXoneAuth
from queue_mapper import QueueMapper

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Cognigy to CXone Router")

# Initialize Services
# Replace with your actual credentials
CXONE_CLIENT_ID = "your_cxone_client_id"
CXONE_CLIENT_SECRET = "your_cxone_client_secret"
CXONE_ENV = "us"

auth_service = CXoneAuth(CXONE_CLIENT_ID, CXONE_CLIENT_SECRET, CXONE_ENV)
queue_mapper = QueueMapper(auth_service)

@app.post("/webhook/cognigy/route")
async def handle_cognigy_webhook(payload: CognigyWebhookPayload):
    """
    Receives intent data from Cognigy and routes the CXone conversation.
    """
    logger.info(f"Received webhook for session: {payload.sessionId}")
    
    # 1. Determine the dominant intent
    if not payload.intents:
        raise HTTPException(status_code=400, detail="No intents detected in payload")
    
    # Sort by confidence descending
    dominant_intent = max(payload.intents, key=lambda x: x.confidence)
    intent_name = dominant_intent.name
    confidence = dominant_intent.confidence

    logger.info(f"Dominant intent: {intent_name} (confidence: {confidence})")

    # 2. Get the CXone Queue ID
    queue_id = await queue_mapper.get_queue_id(intent_name)
    
    if not queue_id:
        logger.error(f"Could not resolve queue for intent: {intent_name}")
        # Return success to Cognigy to prevent retry loops, but log error
        return {"status": "error", "message": "Queue resolution failed"}

    # 3. Route the Conversation in CXone
    # Assumption: payload.sessionId contains the CXone Conversation ID
    # If your setup differs, you may need to look up the conversation by external ID
    cxone_conversation_id = payload.sessionId
    
    success = await route_conversation_to_queue(cxone_conversation_id, queue_id)
    
    if success:
        logger.info(f"Successfully routed conversation {cxone_conversation_id} to queue {queue_id}")
        return {"status": "success", "queue_id": queue_id}
    else:
        logger.error(f"Failed to route conversation {cxone_conversation_id}")
        return {"status": "failed", "message": "CXone API update failed"}

async def route_conversation_to_queue(conversation_id: str, queue_id: str) -> bool:
    """
    Updates an existing CXone conversation to assign it to a specific queue.
    Endpoint: PATCH /api/v2/conversations/{conversationId}
    Scope: conversation:write
    """
    headers = await auth_service.get_headers()
    url = f"{auth_service.base_url}/api/v2/conversations/{conversation_id}"
    
    # The payload depends on the conversation type (web, voice, etc.)
    # For Web conversations, we update the 'routing' object
    payload = {
        "routing": {
            "queue": {
                "id": queue_id
            },
            "skillRequirements": [] # Optional: clear existing skill requirements
        }
    }

    async with httpx.AsyncClient() as client:
        try:
            response = await client.patch(
                url,
                headers=headers,
                json=payload
            )
            
            if response.status_code == 429:
                wait_time = int(response.headers.get("Retry-After", 5))
                logger.warning(f"Rate limited on route. Waiting {wait_time}s.")
                import asyncio
                await asyncio.sleep(wait_time)
                # Retry once
                response = await client.patch(url, headers=headers, json=payload)

            if response.status_code in [200, 204]:
                return True
            else:
                logger.error(f"Routing failed: {response.status_code} - {response.text}")
                return False
                
        except httpx.RequestError as e:
            logger.error(f"Network error during routing: {e}")
            return False

Step 4: Handling Edge Cases and Errors

The code above includes basic error handling. However, production systems must handle:

  1. Invalid Conversation IDs: If the sessionId from Cognigy does not match a CXone conversation, the PATCH will return 404.
  2. Queue Unavailability: If the queue is paused, the conversation might not be routed immediately.
  3. Rate Limiting: The 429 handler in route_conversation_to_queue implements a simple retry.

Add error logging for 404s:

# Inside route_conversation_to_queue, replace the status check with:
if response.status_code == 404:
    logger.error(f"Conversation {conversation_id} not found in CXone.")
    return False
elif response.status_code == 403:
    logger.error(f"Permission denied. Check OAuth scopes.")
    return False

Complete Working Example

Combine the modules into a single runnable script for testing. Save this as app.py.

import asyncio
import httpx
import time
import logging
from typing import Optional, List, Dict
from pydantic import BaseModel

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Models ---
class CognigyUserInput(BaseModel):
    text: str
    channel: str

class CognigyIntent(BaseModel):
    name: str
    confidence: float

class CognigyWebhookPayload(BaseModel):
    sessionId: str
    userInput: CognigyUserInput
    intents: List[CognigyIntent]
    timestamp: Optional[int] = None

# --- Authentication Service ---
class CXoneAuth:
    def __init__(self, client_id: str, client_secret: str, environment: str = "us"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = "https://api.mypurecloud.com" if environment == "us" else "https://api.euc1.pure.cloud"
        self.token_url = f"{self.base_url}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0

    async def get_token(self) -> str:
        if self.access_token and time.time() < (self.token_expiry - 300):
            return self.access_token
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.token_url,
                data={"grant_type": "client_credentials", "client_id": self.client_id, "client_secret": self.client_secret},
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )
            if response.status_code != 200:
                raise Exception(f"Auth failed: {response.text}")
            data = response.json()
            self.access_token = data["access_token"]
            self.token_expiry = time.time() + data["expires_in"]
            return self.access_token

    async def get_headers(self) -> dict:
        return {"Authorization": f"Bearer {await self.get_token()}", "Content-Type": "application/json"}

# --- Queue Mapper ---
class QueueMapper:
    def __init__(self, auth: CXoneAuth):
        self.auth = auth
        self.queue_cache: Dict[str, str] = {}
        self.intent_map = {"sales_inquiry": "Sales Support", "support_ticket": "Technical Support"}

    async def get_queue_id(self, intent_name: str) -> Optional[str]:
        if intent_name in self.queue_cache:
            return self.queue_cache[intent_name]
        queue_name = self.intent_map.get(intent_name)
        if not queue_name:
            return None
        q_id = await self._fetch_queue_id(queue_name)
        if q_id:
            self.queue_cache[intent_name] = q_id
        return q_id

    async def _fetch_queue_id(self, name: str) -> Optional[str]:
        headers = await self.auth.get_headers()
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"{self.auth.base_url}/api/v2/routing/queues", headers=headers, params={"name": name, "pageSize": 100})
            if resp.status_code != 200:
                return None
            for q in resp.json().get("entities", []):
                if q["name"] == name:
                    return q["id"]
        return None

# --- FastAPI App ---
from fastapi import FastAPI, HTTPException

app = FastAPI()

# CONFIGURE THESE
AUTH = CXoneAuth(client_id="YOUR_CLIENT_ID", client_secret="YOUR_CLIENT_SECRET")
MAPPER = QueueMapper(AUTH)

@app.post("/webhook/cognigy/route")
async def route_intent(payload: CognigyWebhookPayload):
    if not payload.intents:
        raise HTTPException(400, "No intents")
    
    dominant = max(payload.intents, key=lambda x: x.confidence)
    queue_id = await MAPPER.get_queue_id(dominant.name)
    
    if not queue_id:
        return {"status": "error", "msg": "Queue not found"}
    
    # Route Conversation
    headers = await AUTH.get_headers()
    conv_id = payload.sessionId
    url = f"{AUTH.base_url}/api/v2/conversations/{conv_id}"
    
    async with httpx.AsyncClient() as client:
        resp = await client.patch(url, headers=headers, json={"routing": {"queue": {"id": queue_id}}})
        
        if resp.status_code in [200, 204]:
            return {"status": "success", "queue_id": queue_id}
        elif resp.status_code == 404:
            return {"status": "error", "msg": "Conversation not found"}
        else:
            return {"status": "error", "msg": resp.text}

# To run: uvicorn app:app --reload

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The OAuth token is expired, invalid, or the client credentials are wrong.
  • Fix: Verify client_id and client_secret in CXoneAuth. Ensure the token cache is refreshing. Check the logs for the exact error from /oauth/token.

Error: 403 Forbidden

  • Cause: The OAuth Client lacks the required scopes.
  • Fix: Go to the CXone Developer Console. Edit your client. Add conversation:write and routing:read. Re-generate the token.

Error: 404 Conversation Not Found

  • Cause: The sessionId sent by Cognigy does not match the CXone conversationId.
  • Fix: In Cognigy Studio, ensure you are passing the CXone Conversation ID into the Webhook payload. If you are using CXone Webchat, the conversationId is available in the session object. Map this to the sessionId field in your Cognigy Webhook configuration.

Error: 429 Too Many Requests

  • Cause: Exceeding the CXone API rate limit (typically 100 requests per minute per client).
  • Fix: The provided code includes a basic retry with Retry-After header parsing. For high-volume bots, implement a token bucket algorithm or queue the requests using a message broker like RabbitMQ or AWS SQS.

Official References