Routing Cognigy Bot Intents to NICE CXone Queues via Webhooks
What You Will Build
- A Python script that listens for webhook payloads from NICE Cognigy, extracts the detected intent, and routes the conversation to a specific NICE CXone queue.
- Integration uses the Cognigy Platform API for intent validation and the NICE CXone API v2 for queue assignment and conversation bridging.
- The tutorial covers Python 3.9+ with fastapi and httpx for async processing.
Prerequisites
NICE Cognigy Configuration
- A Cognigy Studio project with at least two defined Intents (e.g., sales_inquiry, support_ticket).
- A Webhook Node configured to send a POST request to your endpoint on onIntentMatch.
- The Webhook payload must include intentName, sessionId, and userInput.
NICE CXone Configuration
- A NICE CXone organization with at least two Queues created (e.g., “Sales Support”, “Technical Support”).
- An OAuth Client Credentials application created in the Developer Console.
- Required Scopes: conversation:write, conversation:read, routing:read (the queue lookup in Step 2 only reads routing data).
Development Environment
- Python 3.9 or higher.
- Dependencies: fastapi, uvicorn, httpx, pydantic.
Install dependencies:
pip install fastapi uvicorn httpx pydantic
Authentication Setup
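NICE CXone uses the OAuth 2.0 Client Credentials flow. Because access tokens expire (the lifetime is returned in the expires_in field of the token response), your application must handle token retrieval and caching. For this tutorial, we will implement a simple in-memory cache that fetches a new token once the cached one is within five minutes of expiry.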
Create a file named auth.py:
import time
from typing import Optional

import httpx


class CXoneAuth:
    def __init__(self, client_id: str, client_secret: str, environment: str = "us"):
        self.client_id = client_id
        self.client_secret = client_secret
        # Determine base URL based on environment.
        # Confirm these hosts for your tenant: api.mypurecloud.com and
        # api.euc1.pure.cloud are Genesys Cloud hostnames, so a CXone tenant
        # will likely need a different base URL.
        if environment == "us":
            self.base_url = "https://api.mypurecloud.com"
        elif environment == "eu":
            self.base_url = "https://api.euc1.pure.cloud"
        else:
            raise ValueError("Unsupported environment")
        self.token_url = f"{self.base_url}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0

    async def get_token(self) -> str:
        """
        Retrieves an OAuth token from CXone.
        Checks cache first. Refreshes if expired.
        """
        # Check if we have a valid token (add 5 minute buffer)
        if self.access_token and time.time() < (self.token_expiry - 300):
            return self.access_token

        # If token is expired or missing, fetch new one
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )

        if response.status_code != 200:
            raise Exception(f"Failed to get token: {response.text}")

        data = response.json()
        self.access_token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"]
        return self.access_token

    async def get_headers(self) -> dict:
        token = await self.get_token()
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }
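One gap in the cache above: if several webhook requests arrive while the token is missing or expired, each of them can call the token endpoint at the same time. The following is a minimal sketch of one way to avoid that, using an asyncio.Lock so only one coroutine refreshes. The TokenCache class and the fetch callable are hypothetical stand-ins, not part of any CXone SDK, and the fetch is faked so the example runs without network access.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Set, Tuple

# Hypothetical fetcher type: returns (access_token, expires_in_seconds).
TokenFetcher = Callable[[], Awaitable[Tuple[str, float]]]


class TokenCache:
    """Caches a token and lets only one coroutine refresh it at a time."""

    def __init__(self, fetch: TokenFetcher, buffer_seconds: float = 300.0):
        self._fetch = fetch
        self._buffer = buffer_seconds
        self._token: Optional[str] = None
        self._expiry = 0.0
        self._lock = asyncio.Lock()

    def _is_fresh(self) -> bool:
        return self._token is not None and time.time() < self._expiry - self._buffer

    async def get(self) -> str:
        if self._is_fresh():
            return self._token
        async with self._lock:
            # Re-check: another coroutine may have refreshed while we waited.
            if not self._is_fresh():
                token, expires_in = await self._fetch()
                self._token = token
                self._expiry = time.time() + expires_in
            return self._token


async def demo() -> Tuple[int, Set[str]]:
    calls = 0

    async def fake_fetch() -> Tuple[str, float]:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # simulate network latency
        return f"token-{calls}", 3600.0

    cache = TokenCache(fake_fetch)  # created inside the running event loop
    tokens = await asyncio.gather(*(cache.get() for _ in range(5)))
    return calls, set(tokens)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Five concurrent callers share a single fetch here. On Python 3.9, asyncio.Lock binds to an event loop when it is constructed, so it is safer to create the cache from code that already runs inside the server's loop (for example a startup hook) than at import time.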
Implementation
Step 1: Define the Webhook Payload Structure
Cognigy sends a JSON payload when a webhook is triggered. You must define the expected structure to parse it correctly. The critical fields are the intent name, carried in the intents list together with a confidence score (to determine the queue), and sessionId (to link the CXone conversation).
Create models.py:
from typing import List, Optional

from pydantic import BaseModel


class CognigyUserInput(BaseModel):
    text: str
    channel: str


class CognigyIntent(BaseModel):
    name: str
    confidence: float


class CognigyWebhookPayload(BaseModel):
    sessionId: str
    userInput: CognigyUserInput
    intents: List[CognigyIntent]
    # Additional fields might exist, but these are required for routing
    timestamp: Optional[int] = None
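For reference, a request body that would validate against these models might look like the sketch below. The values are invented for illustration, and the exact shape Cognigy sends depends on how the Webhook Node body is configured in your flow. The snippet uses only the standard library to show which top-level fields the router depends on; missing_fields is a hypothetical helper.

```python
import json

# Invented sample body; field names follow the models above.
SAMPLE_BODY = """
{
  "sessionId": "session-123",
  "userInput": {"text": "I want to buy a license", "channel": "webchat"},
  "intents": [
    {"name": "sales_inquiry", "confidence": 0.91},
    {"name": "support_ticket", "confidence": 0.12}
  ],
  "timestamp": 1700000000
}
"""

REQUIRED_FIELDS = ("sessionId", "userInput", "intents")


def missing_fields(payload: dict) -> list:
    """Return the required top-level fields absent from the payload."""
    return [name for name in REQUIRED_FIELDS if name not in payload]


payload = json.loads(SAMPLE_BODY)
print(missing_fields(payload))             # []
print(missing_fields({"sessionId": "x"}))  # ['userInput', 'intents']
```

In the FastAPI handler, pydantic performs this check automatically and rejects an incomplete body with a 422 response before your code runs.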
Step 2: Map Intents to CXone Queue IDs
You cannot route to a queue by name directly in the API; you need the Queue ID. Hardcoding IDs is fragile. A robust solution fetches the queue ID once and caches it, or looks it up dynamically. For this tutorial, we will implement a dynamic lookup function that caches results in memory.
Create queue_mapper.py:
import asyncio
import logging
from typing import Dict, Optional

import httpx

from auth import CXoneAuth

logger = logging.getLogger(__name__)


class QueueMapper:
    def __init__(self, auth_service: CXoneAuth):
        self.auth = auth_service
        self.queue_cache: Dict[str, str] = {}  # Intent Name -> Queue ID
        # Define your mapping configuration here
        self.intent_to_queue_name = {
            "sales_inquiry": "Sales Support",
            "support_ticket": "Technical Support",
            "general_info": "General Inquiry"
        }

    async def get_queue_id(self, intent_name: str) -> Optional[str]:
        """
        Retrieves the CXone Queue ID for a given Cognigy Intent.
        """
        # Check if we already know the queue ID for this intent
        if intent_name in self.queue_cache:
            return self.queue_cache[intent_name]

        queue_name = self.intent_to_queue_name.get(intent_name)
        if not queue_name:
            logger.warning(f"No queue mapping found for intent: {intent_name}")
            return None

        # Fetch from CXone API if not in cache
        queue_id = await self._fetch_queue_id_by_name(queue_name)
        if queue_id:
            self.queue_cache[intent_name] = queue_id
        return queue_id

    async def _fetch_queue_id_by_name(self, queue_name: str, retries_left: int = 3) -> Optional[str]:
        """
        Searches CXone for a queue by name.
        Endpoint: GET /api/v2/routing/queues
        Scope: routing:read
        """
        headers = await self.auth.get_headers()
        url = f"{self.auth.base_url}/api/v2/routing/queues"
        # CXone returns paginated results. We fetch the first page.
        # In production, implement pagination if you have >100 queues.
        params = {
            "pageSize": 100,
            "pageNumber": 1,
            "name": queue_name
        }
        async with httpx.AsyncClient() as client:
            response = await client.get(url, headers=headers, params=params)

        if response.status_code == 429:
            # Handle rate limiting with a bounded number of retries
            if retries_left <= 0:
                logger.error("Rate limit retries exhausted while fetching queues.")
                return None
            wait_time = int(response.headers.get("Retry-After", 5))
            logger.warning(f"Rate limited. Waiting {wait_time} seconds.")
            await asyncio.sleep(wait_time)
            return await self._fetch_queue_id_by_name(queue_name, retries_left - 1)

        if response.status_code != 200:
            logger.error(f"Failed to fetch queues: {response.text}")
            return None

        data = response.json()
        entities = data.get("entities", [])
        for queue in entities:
            if queue["name"] == queue_name:
                return queue["id"]

        logger.error(f"Queue '{queue_name}' not found in CXone.")
        return None
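The lookup above reads only the first page of results. A minimal sketch of paging through them follows, assuming the endpoint accepts pageNumber and pageSize as in the request above and that a page with fewer than pageSize entities is the last one. That stopping rule is an assumption, so check the paging metadata your API actually returns. The fetch_page callable is a hypothetical stand-in for the HTTP call, faked here so the example runs offline.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional

# Hypothetical page fetcher: (page_number, page_size) -> list of queue dicts.
PageFetcher = Callable[[int, int], Awaitable[List[Dict[str, str]]]]


async def find_queue_id(
    fetch_page: PageFetcher,
    queue_name: str,
    page_size: int = 100,
    max_pages: int = 50,
) -> Optional[str]:
    """Scan pages until the queue is found or the results run out."""
    for page_number in range(1, max_pages + 1):
        entities = await fetch_page(page_number, page_size)
        for queue in entities:
            if queue.get("name") == queue_name:
                return queue.get("id")
        if len(entities) < page_size:
            return None  # short page: assumed to be the last one
    return None


async def demo() -> List[Optional[str]]:
    queues = [{"id": f"q{i}", "name": f"Queue {i}"} for i in range(1, 6)]

    async def fake_fetch(page_number: int, page_size: int) -> List[Dict[str, str]]:
        start = (page_number - 1) * page_size
        return queues[start:start + page_size]

    found = await find_queue_id(fake_fetch, "Queue 5", page_size=2)
    missing = await find_queue_id(fake_fetch, "Nope", page_size=2)
    return [found, missing]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['q5', None]
```

The max_pages cap keeps a misbehaving endpoint from causing an endless loop.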
Step 3: Process the Webhook and Route the Conversation
This is the core logic. When Cognigy hits your endpoint, you must:
- Validate the payload.
- Determine the highest-confidence intent.
- Get the corresponding CXone Queue ID.
- Update the CXone Conversation to assign it to that queue.
Note: Cognigy does not automatically create a CXone conversation. You must have a mechanism to create the conversation first (e.g., via a CXone Webchat widget that triggers the Cognigy bot, or a separate API call). This tutorial assumes the CXone Conversation ID is passed from Cognigy via the sessionId or a custom variable. If you are bridging from a CXone Webchat, the sessionId in Cognigy often maps to the CXone conversationId.
If you are starting a new conversation from Cognigy, you would use POST /api/v2/conversations. Here, we assume the conversation exists and we are routing it.
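The intent-selection step listed above can be sketched in isolation. The min_confidence threshold is a hypothetical addition that the tutorial's handler does not enforce; it shows one way to avoid routing on a weak match. The Intent dataclass stands in for the pydantic model so the example needs only the standard library.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Intent:
    name: str
    confidence: float


def pick_dominant(intents: List[Intent], min_confidence: float = 0.0) -> Optional[Intent]:
    """Return the highest-confidence intent, or None if none qualifies."""
    if not intents:
        return None
    best = max(intents, key=lambda intent: intent.confidence)
    return best if best.confidence >= min_confidence else None


intents = [Intent("sales_inquiry", 0.91), Intent("support_ticket", 0.12)]
print(pick_dominant(intents))                       # Intent(name='sales_inquiry', confidence=0.91)
print(pick_dominant(intents, min_confidence=0.95))  # None
print(pick_dominant([]))                            # None
```

Returning None instead of raising lets the caller decide whether a missing or weak intent should map to a default queue or be rejected.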
Create main.py:
import asyncio
import logging

import httpx
from fastapi import FastAPI, HTTPException

from auth import CXoneAuth
from models import CognigyWebhookPayload
from queue_mapper import QueueMapper

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Cognigy to CXone Router")

# Initialize Services
# Replace with your actual credentials
CXONE_CLIENT_ID = "your_cxone_client_id"
CXONE_CLIENT_SECRET = "your_cxone_client_secret"
CXONE_ENV = "us"

auth_service = CXoneAuth(CXONE_CLIENT_ID, CXONE_CLIENT_SECRET, CXONE_ENV)
queue_mapper = QueueMapper(auth_service)


@app.post("/webhook/cognigy/route")
async def handle_cognigy_webhook(payload: CognigyWebhookPayload):
    """
    Receives intent data from Cognigy and routes the CXone conversation.
    """
    logger.info(f"Received webhook for session: {payload.sessionId}")

    # 1. Determine the dominant intent
    if not payload.intents:
        raise HTTPException(status_code=400, detail="No intents detected in payload")

    # Pick the intent with the highest confidence
    dominant_intent = max(payload.intents, key=lambda x: x.confidence)
    intent_name = dominant_intent.name
    confidence = dominant_intent.confidence
    logger.info(f"Dominant intent: {intent_name} (confidence: {confidence})")

    # 2. Get the CXone Queue ID
    queue_id = await queue_mapper.get_queue_id(intent_name)
    if not queue_id:
        logger.error(f"Could not resolve queue for intent: {intent_name}")
        # Return HTTP 200 to Cognigy to prevent retry loops, but log the error
        return {"status": "error", "message": "Queue resolution failed"}

    # 3. Route the Conversation in CXone
    # Assumption: payload.sessionId contains the CXone Conversation ID
    # If your setup differs, you may need to look up the conversation by external ID
    cxone_conversation_id = payload.sessionId
    success = await route_conversation_to_queue(cxone_conversation_id, queue_id)
    if success:
        logger.info(f"Successfully routed conversation {cxone_conversation_id} to queue {queue_id}")
        return {"status": "success", "queue_id": queue_id}
    else:
        logger.error(f"Failed to route conversation {cxone_conversation_id}")
        return {"status": "failed", "message": "CXone API update failed"}


async def route_conversation_to_queue(conversation_id: str, queue_id: str) -> bool:
    """
    Updates an existing CXone conversation to assign it to a specific queue.
    Endpoint: PATCH /api/v2/conversations/{conversationId}
    Scope: conversation:write
    """
    headers = await auth_service.get_headers()
    url = f"{auth_service.base_url}/api/v2/conversations/{conversation_id}"
    # The payload depends on the conversation type (web, voice, etc.)
    # For Web conversations, we update the 'routing' object
    payload = {
        "routing": {
            "queue": {
                "id": queue_id
            },
            "skillRequirements": []  # Optional: clear existing skill requirements
        }
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.patch(
                url,
                headers=headers,
                json=payload
            )
            if response.status_code == 429:
                wait_time = int(response.headers.get("Retry-After", 5))
                logger.warning(f"Rate limited on route. Waiting {wait_time}s.")
                await asyncio.sleep(wait_time)
                # Retry once
                response = await client.patch(url, headers=headers, json=payload)

            if response.status_code in [200, 204]:
                return True
            else:
                logger.error(f"Routing failed: {response.status_code} - {response.text}")
                return False
        except httpx.RequestError as e:
            logger.error(f"Network error during routing: {e}")
            return False
Step 4: Handling Edge Cases and Errors
The code above includes basic error handling. However, production systems must handle:
- Invalid Conversation IDs: If the sessionId from Cognigy does not match a CXone conversation, the PATCH will return 404.
- Queue Unavailability: If the queue is paused, the conversation might not be routed immediately.
- Rate Limiting: The 429 handler in route_conversation_to_queue implements a simple retry.
Add error logging for 404 and 403 responses:
# Inside route_conversation_to_queue, replace the status check with:
if response.status_code in (200, 204):
    return True
if response.status_code == 404:
    logger.error(f"Conversation {conversation_id} not found in CXone.")
elif response.status_code == 403:
    logger.error("Permission denied. Check OAuth scopes.")
else:
    logger.error(f"Routing failed: {response.status_code} - {response.text}")
return False
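The 429 handlers in this tutorial pass the Retry-After header straight to int(), which raises ValueError when the server sends an HTTP-date instead of a number of seconds (the HTTP specification allows both forms). A minimal, standard-library sketch of a more tolerant parser follows. The parse_retry_after name and its default and max_wait values are hypothetical choices for illustration.

```python
import math
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(
    value: Optional[str],
    default: float = 5.0,
    max_wait: float = 60.0,
    now: Optional[datetime] = None,
) -> float:
    """Convert a Retry-After header value into seconds to wait."""
    if not value:
        return default
    value = value.strip()
    try:
        seconds = float(value)  # delay-seconds form, e.g. "12"
    except ValueError:
        try:
            retry_at = parsedate_to_datetime(value)  # HTTP-date form
        except (TypeError, ValueError):
            return default  # unparseable header: fall back to the default
        if retry_at.tzinfo is None:
            retry_at = retry_at.replace(tzinfo=timezone.utc)
        current = now or datetime.now(timezone.utc)
        seconds = (retry_at - current).total_seconds()
    if math.isnan(seconds):
        return default
    # Clamp so a past date or a huge value cannot stall the handler.
    return min(max(seconds, 0.0), max_wait)
```

In the handlers, `wait_time = parse_retry_after(response.headers.get("Retry-After"))` would replace the int() call. The now parameter exists only to make the date branch testable.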
Complete Working Example
Combine the modules into a single runnable script for testing. Save this as app.py.
import logging
import time
from typing import Dict, List, Optional

import httpx
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# --- Models ---
class CognigyUserInput(BaseModel):
    text: str
    channel: str


class CognigyIntent(BaseModel):
    name: str
    confidence: float


class CognigyWebhookPayload(BaseModel):
    sessionId: str
    userInput: CognigyUserInput
    intents: List[CognigyIntent]
    timestamp: Optional[int] = None


# --- Authentication Service ---
class CXoneAuth:
    def __init__(self, client_id: str, client_secret: str, environment: str = "us"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = "https://api.mypurecloud.com" if environment == "us" else "https://api.euc1.pure.cloud"
        self.token_url = f"{self.base_url}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0

    async def get_token(self) -> str:
        if self.access_token and time.time() < (self.token_expiry - 300):
            return self.access_token
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.token_url,
                data={"grant_type": "client_credentials", "client_id": self.client_id, "client_secret": self.client_secret},
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )
        if response.status_code != 200:
            raise Exception(f"Auth failed: {response.text}")
        data = response.json()
        self.access_token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"]
        return self.access_token

    async def get_headers(self) -> dict:
        return {"Authorization": f"Bearer {await self.get_token()}", "Content-Type": "application/json"}


# --- Queue Mapper ---
class QueueMapper:
    def __init__(self, auth: CXoneAuth):
        self.auth = auth
        self.queue_cache: Dict[str, str] = {}
        self.intent_map = {"sales_inquiry": "Sales Support", "support_ticket": "Technical Support"}

    async def get_queue_id(self, intent_name: str) -> Optional[str]:
        if intent_name in self.queue_cache:
            return self.queue_cache[intent_name]
        queue_name = self.intent_map.get(intent_name)
        if not queue_name:
            return None
        q_id = await self._fetch_queue_id(queue_name)
        if q_id:
            self.queue_cache[intent_name] = q_id
        return q_id

    async def _fetch_queue_id(self, name: str) -> Optional[str]:
        headers = await self.auth.get_headers()
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"{self.auth.base_url}/api/v2/routing/queues", headers=headers, params={"name": name, "pageSize": 100})
        if resp.status_code != 200:
            return None
        for q in resp.json().get("entities", []):
            if q["name"] == name:
                return q["id"]
        return None


# --- FastAPI App ---
app = FastAPI()

# CONFIGURE THESE
AUTH = CXoneAuth(client_id="YOUR_CLIENT_ID", client_secret="YOUR_CLIENT_SECRET")
MAPPER = QueueMapper(AUTH)


@app.post("/webhook/cognigy/route")
async def route_intent(payload: CognigyWebhookPayload):
    if not payload.intents:
        raise HTTPException(400, "No intents")
    dominant = max(payload.intents, key=lambda x: x.confidence)
    queue_id = await MAPPER.get_queue_id(dominant.name)
    if not queue_id:
        return {"status": "error", "msg": "Queue not found"}

    # Route Conversation
    headers = await AUTH.get_headers()
    conv_id = payload.sessionId
    url = f"{AUTH.base_url}/api/v2/conversations/{conv_id}"
    async with httpx.AsyncClient() as client:
        resp = await client.patch(url, headers=headers, json={"routing": {"queue": {"id": queue_id}}})
    if resp.status_code in [200, 204]:
        return {"status": "success", "queue_id": queue_id}
    elif resp.status_code == 404:
        return {"status": "error", "msg": "Conversation not found"}
    else:
        return {"status": "error", "msg": resp.text}

# To run: uvicorn app:app --reload
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token is expired, invalid, or the client credentials are wrong.
- Fix: Verify client_id and client_secret in CXoneAuth. Ensure the token cache is refreshing. Check the logs for the exact error from /oauth/token.
Error: 403 Forbidden
- Cause: The OAuth Client lacks the required scopes.
- Fix: Go to the CXone Developer Console. Edit your client. Add conversation:write and routing:read. Re-generate the token.
Error: 404 Conversation Not Found
- Cause: The sessionId sent by Cognigy does not match the CXone conversationId.
- Fix: In Cognigy Studio, ensure you are passing the CXone Conversation ID into the Webhook payload. If you are using CXone Webchat, the conversationId is available in the session object. Map this to the sessionId field in your Cognigy Webhook configuration.
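A minimal sketch of resolving the conversation ID defensively, assuming your flow may send it in a custom field rather than in sessionId. The field name cxoneConversationId and the data wrapper are hypothetical; substitute whatever key your Cognigy Webhook Node actually emits. The helper prefers the explicit field and falls back to sessionId, matching the assumption made in Step 3.

```python
from typing import Any, Dict, Optional


def resolve_conversation_id(payload: Dict[str, Any]) -> Optional[str]:
    """Prefer an explicit conversation ID; fall back to sessionId."""
    custom = payload.get("data")
    if not isinstance(custom, dict):
        custom = {}
    # "cxoneConversationId" is a hypothetical custom field name.
    candidate = custom.get("cxoneConversationId") or payload.get("sessionId")
    if isinstance(candidate, str) and candidate.strip():
        return candidate.strip()
    return None


print(resolve_conversation_id({"sessionId": "abc", "data": {"cxoneConversationId": "conv-1"}}))  # conv-1
print(resolve_conversation_id({"sessionId": "abc"}))  # abc
print(resolve_conversation_id({"sessionId": "  "}))   # None
```

Returning None for a blank ID lets the handler reject the request up front instead of sending a PATCH that is certain to return 404.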
Error: 429 Too Many Requests
- Cause: Exceeding the CXone API rate limit (typically 100 requests per minute per client).
- Fix: The provided code includes a basic retry with Retry-After header parsing. For high-volume bots, implement a token bucket algorithm or queue the requests using a message broker like RabbitMQ or AWS SQS.
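A minimal token bucket sketch for the suggestion above. The rate and capacity values are placeholders rather than documented CXone limits, and the clock is injectable so the behaviour can be checked without sleeping. TokenBucket and try_acquire are hypothetical names.

```python
import time
from typing import Callable


class TokenBucket:
    """Allows bursts up to `capacity`, refilling at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: float, clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity
        self._last = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        """Take `cost` tokens if available; return False when the caller should wait."""
        now = self._clock()
        elapsed = now - self._last
        self._last = now
        # Refill in proportion to elapsed time, never above capacity.
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False


# Deterministic demo with a fake clock.
fake_now = [0.0]
bucket = TokenBucket(rate=1.0, capacity=2.0, clock=lambda: fake_now[0])
results = [bucket.try_acquire() for _ in range(3)]  # burst of 3 at t=0
fake_now[0] = 1.0                                   # one second later
results.append(bucket.try_acquire())
print(results)  # [True, True, False, True]
```

A handler would call try_acquire() before each CXone request and, on False, either delay briefly or hand the work to a queue.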