Extending NICE Cognigy.AI Entity Recognition with a Custom spaCy Pipeline and Webhook Integration
What You Will Build
- A Python FastAPI service that receives raw user input from a Cognigy webhook, runs it through a custom-trained spaCy NER model, maps extracted entities to dialog variables, and pushes the results to the active conversation via the Cognigy REST API.
- This tutorial uses the Cognigy.AI Webhook trigger, the Cognigy /api/v2/conversations/{conversationId}/variables endpoint, and the spacy library.
- The implementation is written in Python 3.10 with fastapi, uvicorn, httpx, and spacy.
Prerequisites
- Cognigy API Key with conversation:write and conversation:read permissions. Cognigy uses API Key authentication for server-to-server integration rather than standard OAuth 2.0 flows. The API key functions as a Bearer token for all outbound REST calls.
- spacy v3.7+ (en_core_web_sm v3.7+ is optional: the custom pipeline in Step 1 starts from spacy.blank("en") and does not load it)
- Python 3.10 runtime environment
- External dependencies: fastapi, uvicorn, httpx, pydantic, pydantic-settings, python-dotenv
- Access to a CDN provider (AWS CloudFront, Cloudflare, or Azure CDN) for hosting the serialized model archive
- Pagination is not required for the endpoints used in this workflow, as both the webhook payload and the variable update endpoint operate on single-request payloads.
Authentication Setup
Cognigy authenticates server-to-server API calls using an API key generated in the Cognigy Studio settings under API Keys. The key is passed as a Bearer token in the Authorization header. The webhook endpoint in this tutorial does not authenticate incoming calls from Cognigy; only the outbound REST call that updates conversation state uses the API key.
Create a configuration module to manage credentials and environment variables:
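# config.py
# BaseSettings lives in the pydantic-settings package for pydantic v2
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class CognigySettings(BaseSettings):
    # Values come from the process environment first, then from .env.
    # extra="ignore" tolerates unrelated keys in .env; allowing the "model_" prefix
    # avoids a pydantic warning for the model_cache_dir field.
    model_config = SettingsConfigDict(env_file=".env", extra="ignore", protected_namespaces=("settings_",))
    # validation_alias maps each field to the environment variable name used in this tutorial
    tenant_url: str = Field("https://your-tenant.cognigy.ai", validation_alias="COGNIGY_TENANT_URL")
    api_key: str = Field("", validation_alias="COGNIGY_API_KEY")
    cdn_model_url: str = Field("https://cdn.example.com/models/domain_ner_v1.tar.gz", validation_alias="CDN_MODEL_URL")
    model_cache_dir: str = Field("/tmp/cognigy_spacy_model", validation_alias="MODEL_CACHE_DIR")
settings = CognigySettings()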
Store your credentials in a .env file. Never commit API keys to version control. The webhook server will load these values at startup and inject the Bearer token into every outbound HTTP request.
Implementation
Step 1: Train and Export the spaCy NER Pipeline
Standard Cognigy entity recognition relies on pattern matching and built-in NLP models. Domain-specific terminology requires a custom NER pipeline. You will train a minimal spaCy model on your terminology, export it to disk, and compress it for CDN distribution.
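Spelling out character offsets by hand, as the training script below does, is easy to get wrong by one position, and spaCy ignores entities whose offsets do not line up with tokens. As an optional sketch (make_annotations is a hypothetical helper, not a spaCy API), the offsets can be derived from the entity substrings with str.find instead:

```python
def make_annotations(text: str, spans: list[tuple[str, str]]) -> tuple[str, dict]:
    """Build a spaCy v3 training tuple (text, {"entities": [(start, end, label)]}).

    `spans` lists (substring, label) pairs in the order they occur in `text`;
    each substring is searched for after the end of the previous match.
    """
    entities = []
    cursor = 0
    for substring, label in spans:
        start = text.find(substring, cursor)
        if start == -1:
            raise ValueError(f"{substring!r} not found in {text!r} after position {cursor}")
        end = start + len(substring)  # end offset is exclusive
        entities.append((start, end, label))
        cursor = end
    return text, {"entities": entities}


TRAIN_DATA = [
    make_annotations("The XR-500 module threw a ERR_404 fault.",
                     [("XR-500", "DOMAIN_PRODUCT"), ("ERR_404", "DOMAIN_ERROR_CODE")]),
    make_annotations("Check the XR-500 logs for ERR_503.",
                     [("XR-500", "DOMAIN_PRODUCT"), ("ERR_503", "DOMAIN_ERROR_CODE")]),
    make_annotations("The XR-500 returned ERR_500 during initialization.",
                     [("XR-500", "DOMAIN_PRODUCT"), ("ERR_500", "DOMAIN_ERROR_CODE")]),
]
```

In the first sentence, for example, XR-500 spans characters 4 to 10. The helper only guarantees that each offset pair matches its substring; spaCy still needs both offsets to fall on token boundaries.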
# train_model.py
import os
import random
import shutil
import tarfile
import spacy
from spacy.training import Example
def train_domain_ner(output_dir: str = "custom_domain_ner", n_iter: int = 15) -> str:
    # Seed once, before training, so runs are reproducible
    spacy.util.fix_random_seed(1)
    nlp = spacy.blank("en")
    ner = nlp.add_pipe("ner")
    # Register custom entity labels for your domain
    ner.add_label("DOMAIN_PRODUCT")
    ner.add_label("DOMAIN_ERROR_CODE")
    # Training data follows spaCy v3 format: (text, {"entities": [(start, end, label)]})
    # Offsets are character positions (end exclusive) and must fall on token boundaries
    TRAIN_DATA = [
        ("The XR-500 module threw a ERR_404 fault.", {"entities": [(4, 10, "DOMAIN_PRODUCT"), (26, 33, "DOMAIN_ERROR_CODE")]}),
        ("Check the XR-500 logs for ERR_503.", {"entities": [(10, 16, "DOMAIN_PRODUCT"), (26, 33, "DOMAIN_ERROR_CODE")]}),
        ("The XR-500 returned ERR_500 during initialization.", {"entities": [(4, 10, "DOMAIN_PRODUCT"), (20, 27, "DOMAIN_ERROR_CODE")]}),
    ]
    examples = [Example.from_dict(nlp.make_doc(text), annotations) for text, annotations in TRAIN_DATA]
    # initialize() replaces the deprecated begin_training() in spaCy v3
    optimizer = nlp.initialize(lambda: examples)
    print("Training NER pipeline...")
    for i in range(n_iter):
        random.shuffle(examples)
        losses = {}
        for example in examples:
            nlp.update([example], drop=0.5, sgd=optimizer, losses=losses)
        print(f"Epoch {i} losses: {losses}")
    # Export trained pipeline to disk, replacing any previous export
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    nlp.to_disk(output_dir)
    print(f"Model exported to {output_dir}")
    # Compress for CDN upload; the archive's top-level directory is the basename of output_dir
    archive_name = f"{output_dir}.tar.gz"
    with tarfile.open(archive_name, "w:gz") as tar:
        tar.add(output_dir, arcname=os.path.basename(output_dir))
    print(f"Compressed archive created: {archive_name}")
    return archive_name
if __name__ == "__main__":
    train_domain_ner()
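Run this script locally, upload the resulting .tar.gz file to your CDN, and record its public HTTPS URL. Set that URL as CDN_MODEL_URL; the webhook server fetches the model from it on initialization. Three training sentences are enough to exercise the pipeline end to end but not to produce a reliable model, so substitute a representative annotated dataset before relying on the predictions.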
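Because the loader in Step 2 expects the archive to unpack into a custom_domain_ner directory, a quick check before uploading can catch a mismatched layout early. The sketch below uses only the standard library; the helper names are hypothetical. It confirms that the archive has a single top-level directory containing the config.cfg and meta.json files that nlp.to_disk() writes:

```python
import tarfile


def archive_top_level_names(archive_path: str) -> set[str]:
    """Return the top-level entry names of a .tar.gz archive."""
    with tarfile.open(archive_path, "r:gz") as tar:
        return {member.name.split("/", 1)[0] for member in tar.getmembers()}


def check_model_archive(archive_path: str, expected_dir: str = "custom_domain_ner") -> None:
    """Raise ValueError unless the archive matches the layout the loader expects."""
    top_level = archive_top_level_names(archive_path)
    if top_level != {expected_dir}:
        raise ValueError(f"expected one top-level directory {expected_dir!r}, found {sorted(top_level)}")
    with tarfile.open(archive_path, "r:gz") as tar:
        names = set(tar.getnames())
    # nlp.to_disk() writes config.cfg and meta.json at the pipeline root
    for required in ("config.cfg", "meta.json"):
        if f"{expected_dir}/{required}" not in names:
            raise ValueError(f"{expected_dir}/{required} is missing from the archive")
```

Running check_model_archive("custom_domain_ner.tar.gz") after the training script returns silently when the layout is as expected.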
Step 2: Deploy the Model to a CDN and Implement Lazy Loading
Loading a spaCy model consumes significant memory. You will implement a concurrency-safe lazy loader that fetches the model from the CDN only once, caches it in memory, and logs and re-raises download or extraction failures so that a later request can retry the load.
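# model_loader.py
import asyncio
import os
import shutil
import tarfile
import tempfile
import httpx
import spacy
from config import settings
# asyncio.Lock, not threading.Lock: a threading lock held across an await
# blocks the whole event loop when a second request tries to acquire it
model_lock = asyncio.Lock()
spacy_nlp: spacy.Language | None = None
def _extract_and_load(archive_path: str, temp_dir: str) -> spacy.Language:
    with tarfile.open(archive_path, "r:gz") as tar:
        # The "data" filter (Python 3.12+, backported to 3.10.12+) rejects path traversal and links
        if hasattr(tarfile, "data_filter"):
            tar.extractall(path=temp_dir, filter="data")
        else:
            tar.extractall(path=temp_dir)
    # spacy.load reads the pipeline into memory, so the files can be deleted afterwards
    return spacy.load(os.path.join(temp_dir, "custom_domain_ner"))
async def load_model_from_cdn() -> spacy.Language:
    global spacy_nlp
    async with model_lock:
        # Re-check inside the lock: another request may have finished loading meanwhile
        if spacy_nlp is not None:
            return spacy_nlp
        temp_dir = tempfile.mkdtemp(prefix="cognigy_spacy_")
        archive_path = os.path.join(temp_dir, "model.tar.gz")
        try:
            async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
                response = await client.get(settings.cdn_model_url)
                response.raise_for_status()
            with open(archive_path, "wb") as f:
                f.write(response.content)
            # Extraction and spacy.load are blocking; run them in a worker thread
            spacy_nlp = await asyncio.to_thread(_extract_and_load, archive_path, temp_dir)
            print("spaCy model loaded successfully from CDN.")
            return spacy_nlp
        except httpx.HTTPStatusError as e:
            print(f"Failed to download model: {e.response.status_code} {e.response.text}")
            raise
        except Exception as e:
            print(f"Model extraction or loading failed: {e}")
            raise
        finally:
            # Clean up temporary files after loading (or after a failure)
            shutil.rmtree(temp_dir, ignore_errors=True)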
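The loader uses httpx for async HTTP requests, which aligns with FastAPI’s event loop. The lock ensures that concurrent webhook requests arriving before the model is ready trigger only one download. It must be an asyncio.Lock rather than a threading.Lock: holding a threading lock across an await blocks the entire event loop as soon as a second request tries to acquire it, so the first request can never resume.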
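The check, lock, re-check pattern the loader relies on can be isolated into a small, testable unit. The sketch below is a hypothetical AsyncLazy helper (not a FastAPI or spaCy API) that awaits an async factory at most once under concurrency; if the factory raises, nothing is cached and the next call retries:

```python
import asyncio
from typing import Awaitable, Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class AsyncLazy(Generic[T]):
    """Hypothetical helper: await an async factory at most once, even when many
    coroutines ask for the value concurrently."""

    def __init__(self, factory: Callable[[], Awaitable[T]]) -> None:
        self._factory = factory
        self._value: Optional[T] = None
        self._loaded = False
        # asyncio.Lock suspends waiting coroutines instead of blocking the event loop
        self._lock = asyncio.Lock()

    async def get(self) -> T:
        if self._loaded:  # fast path once the value exists: no locking
            return self._value  # type: ignore[return-value]
        async with self._lock:
            # Re-check: another coroutine may have finished loading while we waited
            if not self._loaded:
                # If the factory raises, _loaded stays False and the next call retries
                self._value = await self._factory()
                self._loaded = True
        return self._value  # type: ignore[return-value]
```

Wrapped around a download-and-load coroutine, for example spacy_model = AsyncLazy(some_loader), the webhook could call nlp = await spacy_model.get() instead of reading a module-level global.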
Step 3: Build the Cognigy Webhook Endpoint
Cognigy triggers webhooks by sending a POST request to your public URL. The payload contains the user text, conversation identifiers, and existing variables. You will create a FastAPI endpoint that accepts this payload, validates required fields, and prepares the text for NER processing.
# webhook_server.py
from typing import Any
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel, Field, ValidationError
import model_loader
app = FastAPI(title="Cognigy Custom NER Webhook")
class CognigyWebhookPayload(BaseModel):
    text: str = Field(min_length=1)
    conversationId: str = Field(min_length=1)
    dialogId: str = ""
    variables: dict[str, Any] = {}
TARGET_LABELS = {"DOMAIN_PRODUCT", "DOMAIN_ERROR_CODE"}
@app.post("/cognigy-webhook")
async def cognigy_webhook(request: Request):
    # Read the cached pipeline from model_loader; a "global spacy_nlp" declaration here
    # would refer to this module's namespace, where the name does not exist
    nlp = model_loader.spacy_nlp
    if nlp is None:
        # Initialize model on first request
        nlp = await model_loader.load_model_from_cdn()
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON payload")
    # Validate required fields; answer 400 instead of FastAPI's default 422
    try:
        payload = CognigyWebhookPayload.model_validate(body)
    except ValidationError:
        raise HTTPException(status_code=400, detail="Missing or invalid text/conversationId in webhook payload")
    # Process text with spaCy
    doc = nlp(payload.text)
    # Extract domain-specific entities and map them to Cognigy variable names;
    # if a label occurs more than once, the last match wins
    detected_vars = {}
    raw_entities = []
    for ent in doc.ents:
        if ent.label_ in TARGET_LABELS:
            detected_vars[f"ner_{ent.label_}_detected"] = ent.text
            raw_entities.append({"text": ent.text, "label": ent.label_})
    return {
        "status": "success",
        "dialogId": payload.dialogId,
        "variables": detected_vars,
        "rawEntities": raw_entities,
    }
The endpoint returns a JSON structure compatible with Cognigy’s webhook handler, which parses the variables object and injects it into the dialog context. To make the values persist in conversation state independently of that response, this tutorial also pushes them explicitly through the REST API, which you will implement in the next step.
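The endpoint stores one string per label, so when an utterance mentions two error codes the second overwrites the first. If your flow needs every match, one option is to collect the values per label. The helper below is hypothetical and works on plain (text, label) pairs, so it runs without a model:

```python
from collections import defaultdict
from typing import Iterable

TARGET_LABELS = frozenset({"DOMAIN_PRODUCT", "DOMAIN_ERROR_CODE"})


def entities_to_variables(
    entities: Iterable[tuple[str, str]], target_labels: frozenset = TARGET_LABELS
) -> dict[str, list[str]]:
    """Map (text, label) pairs to ner_<LABEL>_detected variables.

    Every distinct match is kept, in order of appearance, instead of letting a
    later entity with the same label overwrite an earlier one.
    """
    grouped: dict[str, list[str]] = defaultdict(list)
    for text, label in entities:
        if label in target_labels and text not in grouped[label]:
            grouped[label].append(text)
    return {f"ner_{label}_detected": values for label, values in grouped.items()}
```

With spaCy you would pass ((ent.text, ent.label_) for ent in doc.ents). This changes each variable's value from a string to a list, so flow logic that reads ner_DOMAIN_ERROR_CODE_detected must expect a list.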
Step 4: Parse Text and Update Conversation State via REST API
You will modify the webhook to push the detected entities directly to the conversation state using the Cognigy REST API. This ensures variables persist across dialog turns and are available to other integrations. The implementation includes retry logic for 429 rate limits and explicit handling for 401, 403, and 5xx errors.
# api_client.py
import asyncio
import httpx
from config import settings
class CognigyAPIError(Exception):
    """Raised for non-retryable failures or when retries are exhausted."""
async def update_cognigy_conversation_state(conversation_id: str, variables: dict) -> dict:
    """
    Updates conversation variables via Cognigy REST API.
    Endpoint: PATCH /api/v2/conversations/{conversationId}/variables
    Required Permission: conversation:write
    """
    url = f"{settings.tenant_url}/api/v2/conversations/{conversation_id}/variables"
    payload = {"variables": [{"key": k, "value": v} for k, v in variables.items()]}
    headers = {
        "Authorization": f"Bearer {settings.api_key}",
        "Content-Type": "application/json",
    }
    max_retries = 3
    # One client for all attempts reuses the connection
    async with httpx.AsyncClient(timeout=15.0) as client:
        for attempt in range(max_retries):
            try:
                response = await client.patch(url, json=payload, headers=headers)
            except httpx.TimeoutException:
                print(f"Request timed out (attempt {attempt + 1}/{max_retries}). Retrying...")
                await asyncio.sleep(2 ** attempt)
                continue
            if response.is_success:
                return response.json() if response.content else {"status": "updated"}
            # 401 and 403 are not retryable: fail immediately instead of looping
            if response.status_code == 401:
                raise CognigyAPIError("Authentication failed. Verify COGNIGY_API_KEY is valid.")
            if response.status_code == 403:
                raise CognigyAPIError("Permission denied. API key requires conversation:write scope.")
            if response.status_code == 429:
                # Retry-After may be seconds or an HTTP-date; fall back to exponential backoff
                retry_after = response.headers.get("Retry-After", "")
                delay = float(retry_after) if retry_after.isdigit() else 2 ** attempt
                print(f"Rate limited (429). Retrying after {delay}s...")
                await asyncio.sleep(delay)
                continue
            if response.status_code >= 500:
                print(f"Server error ({response.status_code}). Retrying...")
                await asyncio.sleep(2 ** attempt)
                continue
            # Any other status is a client error that retrying will not fix
            raise CognigyAPIError(f"Unexpected response {response.status_code}: {response.text}")
    raise CognigyAPIError("Max retries exceeded for Cognigy state update. Service may be degraded.")
Integrate this client into the webhook endpoint. The full flow now parses the text, extracts entities, pushes them to the REST API, and returns a success payload to Cognigy.
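Retry-After may carry either a number of seconds or an HTTP-date (RFC 9110), so parsing it as an integer alone is fragile. A more defensive sketch, using a hypothetical retry_delay helper and only the standard library, handles both forms, falls back to exponential backoff, and caps the wait:

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_delay(retry_after: Optional[str], attempt: int, cap: float = 30.0,
                now: Optional[datetime] = None) -> float:
    """Seconds to wait before the next attempt (0-based `attempt`)."""
    fallback = float(2 ** attempt)  # exponential backoff: 1, 2, 4, ... seconds
    if retry_after is None:
        return min(fallback, cap)
    value = retry_after.strip()
    if value.isdigit():  # delay-seconds form, e.g. "3"
        return min(float(value), cap)
    try:  # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return min(fallback, cap)
    if when.tzinfo is None:  # HTTP-dates are GMT; treat naive results as UTC
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means "retry now"
    return min(max((when - now).total_seconds(), 0.0), cap)
```

In the retry loop this would read await asyncio.sleep(retry_delay(response.headers.get("Retry-After"), attempt)). The cap keeps a misbehaving header from stalling a webhook for minutes.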
Complete Working Example
Combine the modules into a single deployable FastAPI application. This script is ready to run after setting the environment variables.
# main.py
import asyncio
import os
import shutil
import tarfile
import tempfile
import httpx
import spacy
from fastapi import FastAPI, HTTPException, Request
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
# Configuration: environment variables first, then .env
class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")
    tenant_url: str = Field("https://your-tenant.cognigy.ai", validation_alias="COGNIGY_TENANT_URL")
    api_key: str = Field("", validation_alias="COGNIGY_API_KEY")
    cdn_model_url: str = Field("https://cdn.example.com/models/domain_ner_v1.tar.gz", validation_alias="CDN_MODEL_URL")
settings = Settings()
TARGET_LABELS = {"DOMAIN_PRODUCT", "DOMAIN_ERROR_CODE"}
# Model Loader
# asyncio.Lock: a threading.Lock held across an await would block the event loop
model_lock = asyncio.Lock()
spacy_nlp: spacy.Language | None = None
def _extract_and_load(archive_path: str, temp_dir: str) -> spacy.Language:
    with tarfile.open(archive_path, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            tar.extractall(path=temp_dir, filter="data")  # rejects path traversal and links
        else:
            tar.extractall(path=temp_dir)
    return spacy.load(os.path.join(temp_dir, "custom_domain_ner"))
async def load_model_from_cdn() -> spacy.Language:
    global spacy_nlp
    async with model_lock:
        if spacy_nlp is not None:
            return spacy_nlp
        temp_dir = tempfile.mkdtemp(prefix="cognigy_spacy_")
        archive_path = os.path.join(temp_dir, "model.tar.gz")
        try:
            async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
                response = await client.get(settings.cdn_model_url)
                response.raise_for_status()
            with open(archive_path, "wb") as f:
                f.write(response.content)
            # Blocking extraction and loading run in a worker thread
            spacy_nlp = await asyncio.to_thread(_extract_and_load, archive_path, temp_dir)
            print("spaCy model loaded successfully from CDN.")
            return spacy_nlp
        except Exception as e:
            print(f"Model load failed: {e}")
            raise
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)
# REST API Client
class CognigyAPIError(Exception):
    """Non-retryable failure, or retries exhausted."""
async def update_cognigy_state(conversation_id: str, variables: dict) -> dict:
    url = f"{settings.tenant_url}/api/v2/conversations/{conversation_id}/variables"
    payload = {"variables": [{"key": k, "value": v} for k, v in variables.items()]}
    headers = {"Authorization": f"Bearer {settings.api_key}", "Content-Type": "application/json"}
    max_retries = 3
    async with httpx.AsyncClient(timeout=15.0) as client:
        for attempt in range(max_retries):
            try:
                response = await client.patch(url, json=payload, headers=headers)
            except httpx.TimeoutException:
                await asyncio.sleep(2 ** attempt)
                continue
            if response.is_success:
                return response.json() if response.content else {"status": "updated"}
            # Authentication and permission errors are raised immediately, never retried
            if response.status_code == 401:
                raise CognigyAPIError("Authentication failed. Verify API key.")
            if response.status_code == 403:
                raise CognigyAPIError("Permission denied. Requires conversation:write.")
            if response.status_code == 429:
                retry_after = response.headers.get("Retry-After", "")
                await asyncio.sleep(float(retry_after) if retry_after.isdigit() else 2 ** attempt)
                continue
            if response.status_code >= 500:
                await asyncio.sleep(2 ** attempt)
                continue
            raise CognigyAPIError(f"Unexpected response {response.status_code}: {response.text}")
    raise CognigyAPIError("Max retries exceeded for state update.")
# Webhook Server
app = FastAPI(title="Cognigy Custom NER Webhook")
@app.post("/cognigy-webhook")
async def cognigy_webhook(request: Request):
    # Initialize model on first request; later requests reuse the cached pipeline
    nlp = spacy_nlp if spacy_nlp is not None else await load_model_from_cdn()
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON payload")
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Webhook payload must be a JSON object")
    user_text = body.get("text")
    conversation_id = body.get("conversationId")
    if not conversation_id or not isinstance(user_text, str) or not user_text:
        raise HTTPException(status_code=400, detail="Missing conversationId or text")
    dialog_id = body.get("dialogId", "")
    doc = nlp(user_text)
    # Map target entities to Cognigy variables; a repeated label keeps the last match
    detected_vars = {}
    for ent in doc.ents:
        if ent.label_ in TARGET_LABELS:
            detected_vars[f"ner_{ent.label_}_detected"] = ent.text
    if detected_vars:
        try:
            await update_cognigy_state(conversation_id, detected_vars)
        except (CognigyAPIError, httpx.HTTPError) as e:
            # Report upstream failures as 502 rather than an unhandled 500
            raise HTTPException(status_code=502, detail=f"Cognigy state update failed: {e}")
    return {
        "status": "success",
        "dialogId": dialog_id,
        "variables": detected_vars,
    }
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Run the application with python main.py. Expose port 8000 to the internet using ngrok, AWS ALB, or your cloud provider’s load balancer, then configure the public URL of the /cognigy-webhook route as the webhook URL in Cognigy Studio.
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The COGNIGY_API_KEY environment variable is missing, malformed, or the key has been revoked. Cognigy rejects Bearer tokens that do not match a valid API key.
- Fix: Regenerate the API key in Cognigy Studio. Verify the .env file contains the exact string without trailing whitespace. Confirm the Authorization header uses the Bearer prefix.
- Code Check: Ensure headers = {"Authorization": f"Bearer {settings.api_key}"} matches exactly.
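To automate that code check during local debugging, a hypothetical check_auth_header helper can inspect the headers dict the client builds. It reports the usual culprits: a missing header, a wrong scheme, an empty key, or stray whitespace carried over from the .env file:

```python
def check_auth_header(headers: dict[str, str]) -> list[str]:
    """Return human-readable problems with an outbound Authorization header.

    An empty list means no obvious problem was found; it does not prove
    that Cognigy will accept the key.
    """
    value = headers.get("Authorization")
    if value is None:
        return ["Authorization header is missing"]
    problems = []
    # Expected shape: "Bearer <api key>"
    scheme, _, token = value.partition(" ")
    if scheme != "Bearer":
        problems.append(f"expected 'Bearer' scheme, got {scheme!r}")
    if not token:
        problems.append("API key is empty (is COGNIGY_API_KEY set?)")
    elif token != token.strip():
        problems.append("API key has leading or trailing whitespace")
    return problems
```

For example, print(check_auth_header({"Authorization": f"Bearer {settings.api_key}"})) at startup surfaces a blank or padded key before the first 401.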
Error: 403 Forbidden
- Cause: The API key lacks the conversation:write permission. Cognigy enforces role-based access control on state mutation endpoints.
- Fix: Navigate to Cognigy Studio > Settings > API Keys. Edit the key and assign the conversation:write and conversation:read scopes. Save and redeploy.
- Code Check: The endpoint /api/v2/conversations/{conversationId}/variables requires explicit write permissions. Read-only keys will fail.
Error: 429 Too Many Requests
- Cause: Cognigy enforces rate limits on REST API calls. High-volume bot deployments can trigger cascading 429 responses when multiple conversations trigger webhooks simultaneously.
- Fix: The implementation includes exponential backoff with Retry-After header parsing. If errors persist, implement a message queue (RabbitMQ, AWS SQS) to batch variable updates instead of calling the API synchronously from the webhook.
- Code Check: Verify the retry loop respects Retry-After and caps at max_retries = 3.
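Before introducing RabbitMQ or SQS, the batching idea can be sketched in-process. A hypothetical VariableUpdateBuffer merges pending updates per conversation, so each flush sends at most one PATCH per conversation. This only coalesces within a single worker process; a shared queue is still needed to batch across processes or instances:

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

# Signature of the coroutine that performs one PATCH, e.g. update_cognigy_state
SendFn = Callable[[str, dict[str, Any]], Awaitable[Any]]


class VariableUpdateBuffer:
    """Hypothetical in-process batching buffer: merges pending variable updates
    per conversation so that each flush issues at most one request per conversation."""

    def __init__(self, send: SendFn) -> None:
        self._send = send
        self._pending: dict[str, dict[str, Any]] = defaultdict(dict)
        self._lock = asyncio.Lock()

    async def add(self, conversation_id: str, variables: dict[str, Any]) -> None:
        async with self._lock:
            # A later value for the same key replaces the earlier one
            self._pending[conversation_id].update(variables)

    async def flush(self) -> int:
        """Send everything queued so far; return the number of requests made."""
        async with self._lock:
            batch, self._pending = self._pending, defaultdict(dict)
        for conversation_id, variables in batch.items():
            await self._send(conversation_id, variables)
        return len(batch)
```

In the webhook, await buffer.add(conversation_id, detected_vars) would replace the direct API call, with a background task calling await buffer.flush() on an interval. The trade-off is latency: variables reach conversation state only at the next flush. A production version would also need to re-queue updates whose send fails.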
Error: Model Loading Fails or Returns Empty Entities
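- Cause: The CDN URL is inaccessible, the .tar.gz archive is corrupted or does not unpack into a custom_domain_ner directory, the pipeline was saved with a spaCy version that does not match the runtime, or the training offsets do not align with token boundaries (spaCy warns that misaligned entities will be ignored during training).
- Fix: Validate the CDN URL in a browser. Ensure the spacy version at runtime matches the version used for training; en_core_web_sm is not involved, because the pipeline starts from spacy.blank("en"). spacy validate only checks pipeline packages installed in the environment, so for a pipeline loaded from a directory, compare the spacy_version field in its meta.json with spacy.__version__. To check training offsets, spacy.training.offsets_to_biluo_tags(nlp.make_doc(text), entities) marks misaligned tokens with "-".
- Code Check: Add print(doc.ents) during local testing to verify the NER pipeline detects spans before deployment.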
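Because spacy validate does not inspect a pipeline loaded from a directory, the version comparison can be scripted. A minimal sketch (the helper name is hypothetical) reads the identifying fields spaCy writes to meta.json, including the spacy_version range the pipeline was saved with:

```python
import json
from pathlib import Path


def pipeline_version_info(model_dir: str) -> dict[str, str]:
    """Read identifying fields from the meta.json that nlp.to_disk() writes."""
    meta = json.loads((Path(model_dir) / "meta.json").read_text(encoding="utf-8"))
    return {
        "name": meta.get("name", ""),
        "version": meta.get("version", ""),
        # Version range of spaCy the pipeline was saved with (a specifier string)
        "spacy_version": meta.get("spacy_version", ""),
    }
```

The returned range can then be checked against the runtime with the packaging library, for example SpecifierSet(info["spacy_version"]).contains(spacy.__version__) from packaging.specifiers.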