Extending NICE Cognigy.AI Entity Recognition with a Custom spaCy Pipeline and Webhook Integration

What You Will Build

  • A Python FastAPI service that receives raw user input from a Cognigy webhook, runs it through a custom-trained spaCy NER model, maps extracted entities to dialog variables, and pushes the results to the active conversation via the Cognigy REST API.
  • This tutorial uses the Cognigy.AI Webhook trigger, the Cognigy /api/v2/conversations/{conversationId}/variables endpoint, and the spacy library.
  • The implementation is written in Python 3.10 with fastapi, uvicorn, httpx, and spacy.

Prerequisites

  • Cognigy API Key with conversation:write and conversation:read permissions. Cognigy uses API Key authentication for server-to-server integration rather than standard OAuth 2.0 flows. The API key functions as a Bearer token for all outbound REST calls.
  • spacy v3.7+ (the pipeline is trained from spacy.blank("en"), so no pretrained package such as en_core_web_sm is needed)
  • Python 3.10 runtime environment
  • External dependencies: fastapi, uvicorn, httpx, pydantic v2, pydantic-settings (which provides BaseSettings in pydantic v2), python-dotenv
  • Access to a CDN provider (AWS CloudFront, Cloudflare, or Azure CDN) for hosting the serialized model archive
  • Pagination is not required for the endpoints used in this workflow, as both the webhook payload and the variable update endpoint operate on single-request payloads.
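Before installing the rest of the stack, you can confirm the runtime with a short standard-library check. This is a minimal sketch: the module names are the import names of the packages listed above (python-dotenv imports as dotenv), the 3.10 floor comes from the prerequisites, and missing_prerequisites is a hypothetical helper name.

```python
import importlib.util
import sys

# Import names for the dependencies listed above (python-dotenv imports as "dotenv").
REQUIRED_MODULES = ("fastapi", "uvicorn", "httpx", "pydantic", "dotenv", "spacy")


def missing_prerequisites(modules=REQUIRED_MODULES, min_python=(3, 10)):
    """Return a list of problems; an empty list means the environment looks ready."""
    problems = []
    if sys.version_info[:2] < min_python:
        problems.append(
            f"Python {min_python[0]}.{min_python[1]}+ required, found {sys.version.split()[0]}"
        )
    for name in modules:
        # find_spec locates a top-level module without importing it.
        if importlib.util.find_spec(name) is None:
            problems.append(f"Missing module: {name}")
    return problems


if __name__ == "__main__":
    issues = missing_prerequisites()
    print("\n".join(issues) if issues else "All prerequisites found.")
```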

Authentication Setup

Cognigy authenticates server-to-server API calls using an API Key generated in the Cognigy Studio settings under API Keys. The key is passed as a Bearer token in the Authorization header. In this tutorial, the webhook endpoint does not authenticate incoming requests from Cognigy; only the outbound REST call that updates conversation state carries the API key.

Create a configuration module to manage credentials and environment variables:

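# config.py
# Requires pydantic v2 plus the separate pydantic-settings package
# (BaseSettings moved out of pydantic itself in v2).
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

class CognigySettings(BaseSettings):
    # Values come from the process environment first, then from .env, then the defaults.
    # extra="ignore" tolerates unrelated keys in .env; protected_namespaces=() silences
    # pydantic's warning about the "model_" prefix on model_cache_dir.
    model_config = SettingsConfigDict(env_file=".env", extra="ignore", protected_namespaces=())

    # validation_alias maps each field to the environment variable name used in this tutorial.
    tenant_url: str = Field("https://your-tenant.cognigy.ai", validation_alias="COGNIGY_TENANT_URL")
    api_key: str = Field("", validation_alias="COGNIGY_API_KEY")
    cdn_model_url: str = Field("https://cdn.example.com/models/domain_ner_v1.tar.gz", validation_alias="CDN_MODEL_URL")
    model_cache_dir: str = Field("/tmp/cognigy_spacy_model", validation_alias="MODEL_CACHE_DIR")

settings = CognigySettings()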

Store your credentials in a .env file. Never commit API keys to version control. The webhook server will load these values at startup and inject the Bearer token into every outbound HTTP request.
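The header injection can be isolated in one small helper that fails fast when the key is missing, instead of sending a Bearer header with an empty token and discovering the problem as a 401 at runtime. A minimal standard-library sketch: build_auth_headers is a hypothetical name, it reads the process environment (for example after python-dotenv's load_dotenv() has populated it), and the COGNIGY_API_KEY variable name matches the configuration module.

```python
import os
from typing import Mapping, Optional


def build_auth_headers(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build outbound headers from COGNIGY_API_KEY (hypothetical helper)."""
    source = os.environ if env is None else env
    # Strip stray whitespace and newlines, a common copy-paste error in .env files.
    api_key = source.get("COGNIGY_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError("COGNIGY_API_KEY is not set; refusing to send unauthenticated requests.")
    return {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
```

Passing an explicit mapping keeps the helper easy to test without touching the real environment.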

Implementation

Step 1: Train and Export the spaCy NER Pipeline

Standard Cognigy entity recognition relies on pattern matching and built-in NLP models. Domain-specific terminology requires a custom NER pipeline. You will train a minimal spaCy model on your terminology, export it to disk, and compress it for CDN distribution.

# train_model.py
import os
import random
import shutil
import tarfile

import spacy
from spacy.training import Example

def train_domain_ner(output_dir: str = "custom_domain_ner", n_epochs: int = 15) -> str:
    # Fix the random seed once, before training, so runs are reproducible.
    spacy.util.fix_random_seed(1)

    nlp = spacy.blank("en")
    ner = nlp.add_pipe("ner")

    # Register custom entity labels for your domain
    ner.add_label("DOMAIN_PRODUCT")
    ner.add_label("DOMAIN_ERROR_CODE")

    # spaCy v3 format: (text, {"entities": [(start_char, end_char, label)]}).
    # Offsets are 0-based and end-exclusive, so text[start:end] must equal the entity text.
    TRAIN_DATA = [
        ("The XR-500 module threw an ERR_404 fault.", {"entities": [(4, 10, "DOMAIN_PRODUCT"), (27, 34, "DOMAIN_ERROR_CODE")]}),
        ("Check the XR-500 logs for ERR_503.", {"entities": [(10, 16, "DOMAIN_PRODUCT"), (26, 33, "DOMAIN_ERROR_CODE")]}),
        ("The XR-500 returned ERR_500 during initialization.", {"entities": [(4, 10, "DOMAIN_PRODUCT"), (20, 27, "DOMAIN_ERROR_CODE")]}),
    ]
    examples = [Example.from_dict(nlp.make_doc(text), ann) for text, ann in TRAIN_DATA]

    # initialize() replaces the deprecated begin_training() in spaCy v3
    optimizer = nlp.initialize(get_examples=lambda: examples)
    print("Training NER pipeline...")
    for epoch in range(n_epochs):
        random.shuffle(examples)
        losses = {}
        for example in examples:
            nlp.update([example], drop=0.5, sgd=optimizer, losses=losses)
        print(f"Epoch {epoch} losses: {losses}")

    # Export trained pipeline to disk, replacing any previous export
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    nlp.to_disk(output_dir)
    print(f"Model exported to {output_dir}")

    # Compress for CDN upload; the top-level folder inside the archive keeps the
    # directory name (custom_domain_ner) that the loader expects after extraction.
    archive_name = f"{output_dir}.tar.gz"
    with tarfile.open(archive_name, "w:gz") as tar:
        tar.add(output_dir, arcname=os.path.basename(output_dir))
    print(f"Compressed archive created: {archive_name}")
    return archive_name

if __name__ == "__main__":
    train_domain_ner()

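Run this script locally, then upload the resulting custom_domain_ner.tar.gz archive to your CDN and set its public HTTPS URL as CDN_MODEL_URL. Keep the top-level custom_domain_ner folder inside the archive, because the webhook server fetches the archive from this URL on initialization and loads the model from that directory.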
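Hand-counted character offsets are an easy place to make mistakes: if text[start:end] does not cover the intended entity, spaCy warns and skips that annotation, and the model quietly learns less. One way to avoid counting by hand is to derive the offsets from the entity strings. A minimal sketch; make_training_example is a hypothetical helper, and it assumes the entities are listed in the order they appear in the text. It guarantees that offsets match the strings, but whether they align with token boundaries still depends on the tokenizer.

```python
from typing import List, Tuple


def make_training_example(text: str, spans: List[Tuple[str, str]]) -> Tuple[str, dict]:
    """Build a spaCy v3 (text, {"entities": [...]}) tuple from (substring, label) pairs."""
    entities = []
    cursor = 0
    for substring, label in spans:
        # Search from the end of the previous match so repeated strings map to successive occurrences.
        start = text.find(substring, cursor)
        if start == -1:
            raise ValueError(f"{substring!r} not found in {text!r} after position {cursor}")
        end = start + len(substring)
        entities.append((start, end, label))
        cursor = end
    return text, {"entities": entities}


TRAIN_DATA = [
    make_training_example("The XR-500 module threw an ERR_404 fault.",
                          [("XR-500", "DOMAIN_PRODUCT"), ("ERR_404", "DOMAIN_ERROR_CODE")]),
    make_training_example("Check the XR-500 logs for ERR_503.",
                          [("XR-500", "DOMAIN_PRODUCT"), ("ERR_503", "DOMAIN_ERROR_CODE")]),
]
```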

Step 2: Deploy the Model to a CDN and Implement Lazy Loading

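Loading a spaCy pipeline takes time and memory, so the server should do it once and reuse the result. You will implement a lazy loader that fetches the model from the CDN on the first request, caches it in memory, stays safe when several requests arrive at the same time, and logs and re-raises download or extraction failures so that a later request can retry.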
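Because the loader holds its lock while awaiting the download, the lock must be an asyncio.Lock: a threading.Lock held by a suspended coroutine blocks the whole event loop as soon as a second request tries to acquire it. The underlying pattern is double-checked lazy initialization. A generic sketch of it with the download and spaCy details stripped out (AsyncLazy is a hypothetical helper, not a library class):

```python
import asyncio
from typing import Awaitable, Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class AsyncLazy(Generic[T]):
    """Create a value once on first use, even under concurrent callers (hypothetical helper)."""

    def __init__(self, factory: Callable[[], Awaitable[T]]) -> None:
        self._factory = factory
        self._value: Optional[T] = None
        self._lock: Optional[asyncio.Lock] = None

    async def get(self) -> T:
        if self._value is not None:  # fast path: already loaded
            return self._value
        if self._lock is None:
            # Created on first use inside the running loop; no await separates the
            # check from the assignment, so all concurrent callers share one lock.
            self._lock = asyncio.Lock()
        async with self._lock:
            # Re-check after acquiring: another caller may have loaded it while we waited.
            if self._value is None:
                # If the factory raises, _value stays None and the next call retries.
                self._value = await self._factory()
            return self._value
```

Ten simultaneous get() calls therefore run the factory once; the other nine wait on the lock and then return the cached value.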

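# model_loader.py
import asyncio
import os
import shutil
import tarfile
import tempfile

import httpx
import spacy

from config import settings

# asyncio.Lock, not threading.Lock: the lock is held across awaits, and a threading
# lock held by a suspended coroutine would block the event loop for everyone else.
model_lock = asyncio.Lock()
spacy_nlp: spacy.Language | None = None

async def load_model_from_cdn() -> spacy.Language:
    global spacy_nlp
    if spacy_nlp is not None:
        return spacy_nlp  # fast path: model already cached in memory

    async with model_lock:
        # Re-check: another request may have finished loading while we waited.
        if spacy_nlp is not None:
            return spacy_nlp

        temp_dir = tempfile.mkdtemp(prefix="cognigy_spacy_")
        archive_path = os.path.join(temp_dir, "model.tar.gz")

        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.get(settings.cdn_model_url)
                response.raise_for_status()
            with open(archive_path, "wb") as f:
                f.write(response.content)

            with tarfile.open(archive_path, "r:gz") as tar:
                tar.extractall(path=temp_dir)

            # spacy.load reads the whole pipeline into memory, so the temporary
            # directory can be deleted afterwards.
            model_path = os.path.join(temp_dir, "custom_domain_ner")
            spacy_nlp = spacy.load(model_path)
            print("spaCy model loaded successfully from CDN.")
            return spacy_nlp

        except httpx.HTTPStatusError as e:
            print(f"Failed to download model: {e.response.status_code} {e.response.text}")
            raise
        except Exception as e:
            print(f"Model extraction or loading failed: {e}")
            raise
        finally:
            # Clean up temporary files whether or not loading succeeded
            shutil.rmtree(temp_dir, ignore_errors=True)

The loader uses httpx for async HTTP requests, which fits FastAPI's event loop. The asyncio.Lock, together with the second None check inside it, ensures that when several webhook requests arrive before the model is ready, only the first one downloads it; the others wait and then reuse the cached pipeline. If loading fails, spacy_nlp stays None and the next request retries. A threading.Lock would be wrong here: it would be held across await, and a second coroutine trying to acquire it would block the event loop thread while the first is suspended, deadlocking the server.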
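tar.extractall trusts the member paths stored in the archive, so a tampered archive on the CDN could write outside temp_dir, for example through a member named ../../etc/cron.d/job. Python 3.12+ (and recent security releases of earlier versions) accept filter="data" to reject such members; the sketch below adds an explicit check that works either way. safe_extract is a hypothetical helper that could replace the tar.extractall call in the loader.

```python
import os
import tarfile


def safe_extract(tar: tarfile.TarFile, dest: str) -> None:
    """Extract tar into dest, refusing members that would land outside it (hypothetical helper)."""
    dest_root = os.path.realpath(dest)
    for member in tar.getmembers():
        # Links could point outside dest even when their own path is inside it.
        if member.issym() or member.islnk():
            raise ValueError(f"Refusing link in model archive: {member.name}")
        target = os.path.realpath(os.path.join(dest_root, member.name))
        if os.path.commonpath([dest_root, target]) != dest_root:
            raise ValueError(f"Refusing path outside destination: {member.name}")
    if hasattr(tarfile, "data_filter"):
        # Runtimes with extraction filters: apply the stricter built-in "data" filter too.
        tar.extractall(path=dest_root, filter="data")
    else:
        tar.extractall(path=dest_root)
```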

Step 3: Build the Cognigy Webhook Endpoint

Cognigy triggers webhooks by sending a POST request to your public URL. The payload contains the user text, conversation identifiers, and existing variables. You will create a FastAPI endpoint that accepts this payload, validates required fields, and prepares the text for NER processing.

# webhook_server.py
from typing import Any

from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel, Field, ValidationError

from model_loader import load_model_from_cdn

app = FastAPI(title="Cognigy Custom NER Webhook")

# Webhook payload shape assumed throughout this tutorial
class CognigyWebhookPayload(BaseModel):
    text: str = Field(min_length=1)
    conversationId: str = Field(min_length=1)
    dialogId: str = ""
    variables: dict[str, Any] = {}

TARGET_LABELS = {"DOMAIN_PRODUCT", "DOMAIN_ERROR_CODE"}

@app.post("/cognigy-webhook")
async def cognigy_webhook(request: Request):
    # Returns the cached pipeline, loading it from the CDN on the first request
    nlp = await load_model_from_cdn()

    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON payload")

    # Validate required fields (non-empty text and conversationId)
    try:
        payload = CognigyWebhookPayload.model_validate(body)
    except ValidationError as exc:
        raise HTTPException(status_code=400, detail=f"Invalid webhook payload: {exc}")

    # Process text with spaCy
    doc = nlp(payload.text)

    # Extract domain-specific entities and map them to Cognigy variable names.
    # If a label occurs more than once, the last match wins.
    matched = [ent for ent in doc.ents if ent.label_ in TARGET_LABELS]
    detected_vars = {f"ner_{ent.label_}_detected": ent.text for ent in matched}

    return {
        "status": "success",
        "dialogId": payload.dialogId,
        "variables": detected_vars,
        "rawEntities": [{"text": ent.text, "label": ent.label_} for ent in matched],
    }

The endpoint returns the detected entities as a variables object, plus the raw spans for debugging. Depending on how the webhook is wired into your Flow, Cognigy can read these values from the response, but this tutorial also writes them to the conversation state explicitly through the REST API, which you will implement in the next step.
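The mapping loop keeps only the last entity per label, so an utterance such as "The XR-500 threw ERR_404 and then ERR_503" stores just ERR_503. If your Flow needs every match, one option is to also collect values into lists. A minimal sketch that works on (text, label) pairs, such as [(ent.text, ent.label_) for ent in doc.ents], so it can be tested without a model; map_entities and the ner_..._all key are hypothetical naming choices.

```python
from typing import Dict, Iterable, List, Tuple

TARGET_LABELS = {"DOMAIN_PRODUCT", "DOMAIN_ERROR_CODE"}


def map_entities(entities: Iterable[Tuple[str, str]]) -> Dict[str, object]:
    """Map (text, label) pairs to Cognigy-style variables, keeping every match."""
    grouped: Dict[str, List[str]] = {}
    for text, label in entities:
        if label in TARGET_LABELS:
            grouped.setdefault(label, []).append(text)
    variables: Dict[str, object] = {}
    for label, values in grouped.items():
        variables[f"ner_{label}_detected"] = values[-1]  # same as the tutorial: last match wins
        variables[f"ner_{label}_all"] = values           # hypothetical key: every match, in order
    return variables
```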

Step 4: Parse Text and Update Conversation State via REST API

You will modify the webhook to push the detected entities directly to the conversation state using the Cognigy REST API. This ensures variables persist across dialog turns and are available to other integrations. The implementation includes retry logic for 429 rate limits and explicit handling for 401, 403, and 5xx errors.
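The status-code side of that retry policy can be summarized as a small decision function: 200 and 204 end the loop, 429 and 5xx are transient and retried with backoff, and 401, 403, and other errors fail immediately because retrying cannot fix a bad key, a missing permission, or a malformed request. A sketch of that classification as a pure function (classify_status and Action are hypothetical names), which keeps the policy testable apart from the HTTP client:

```python
from enum import Enum


class Action(Enum):
    SUCCEED = "succeed"
    RETRY = "retry"
    FAIL = "fail"


def classify_status(status_code: int) -> Action:
    """Decide what the update loop should do with a response status (policy from this tutorial)."""
    if status_code in (200, 204):
        return Action.SUCCEED
    if status_code == 429 or status_code >= 500:
        return Action.RETRY  # transient: rate limit or server-side failure
    return Action.FAIL  # 401, 403, and any other status are not retried
```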

# api_client.py
import asyncio

import httpx

from config import settings

class CognigyAPIError(Exception):
    """Raised when the Cognigy REST API rejects or repeatedly fails a request."""

def _retry_after_seconds(response: httpx.Response, attempt: int) -> float:
    # Honor a numeric Retry-After header; otherwise fall back to exponential backoff.
    try:
        return float(response.headers["Retry-After"])
    except (KeyError, ValueError):
        return float(2 ** attempt)

async def update_cognigy_conversation_state(conversation_id: str, variables: dict) -> dict:
    """
    Updates conversation variables via Cognigy REST API.
    Endpoint: PATCH /api/v2/conversations/{conversationId}/variables
    Required Permission: conversation:write
    """
    url = f"{settings.tenant_url}/api/v2/conversations/{conversation_id}/variables"
    payload = {"variables": [{"key": k, "value": v} for k, v in variables.items()]}
    headers = {
        "Authorization": f"Bearer {settings.api_key}",
        "Content-Type": "application/json",
    }

    max_retries = 3
    # One client for all attempts so the connection can be reused.
    async with httpx.AsyncClient(timeout=15.0) as client:
        for attempt in range(max_retries):
            is_last = attempt == max_retries - 1
            try:
                response = await client.patch(url, json=payload, headers=headers)
            except httpx.TimeoutException:
                print("Request timed out. Retrying...")
                if not is_last:
                    await asyncio.sleep(2 ** attempt)
                continue

            if response.status_code in (200, 204):
                return response.json() if response.content else {"status": "updated"}

            # Authentication and permission errors cannot succeed on retry.
            if response.status_code == 401:
                raise CognigyAPIError("Authentication failed. Verify COGNIGY_API_KEY is valid.")
            if response.status_code == 403:
                raise CognigyAPIError("Permission denied. API key requires conversation:write scope.")

            if response.status_code == 429 or response.status_code >= 500:
                if response.status_code == 429:
                    delay = _retry_after_seconds(response, attempt)
                else:
                    delay = 2 ** attempt
                print(f"Cognigy returned {response.status_code}. Retrying in {delay}s...")
                if not is_last:
                    await asyncio.sleep(delay)
                continue

            # Any other status: surface the error immediately.
            raise CognigyAPIError(f"Unexpected status {response.status_code}: {response.text}")

    raise CognigyAPIError("Max retries exceeded for Cognigy state update. Service may be degraded.")

Integrate this client into the webhook endpoint. The full flow now parses the text, extracts entities, pushes them to the REST API, and returns a success payload to Cognigy.
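Retry-After can be sent either as a number of seconds or as an HTTP-date (RFC 9110), and calling int() on the date form raises ValueError. A sketch of a delay calculation that accepts both forms, falls back to capped exponential backoff, and never returns a negative delay; retry_delay and its base and cap defaults are hypothetical choices, not Cognigy-documented values.

```python
import email.utils
from datetime import datetime, timezone
from typing import Optional


def retry_delay(retry_after: Optional[str], attempt: int, base: float = 1.0,
                cap: float = 30.0, now: Optional[datetime] = None) -> float:
    """Seconds to wait before the next attempt (hypothetical helper)."""
    # Capped exponential backoff: base, 2*base, 4*base, ... up to cap.
    fallback = min(cap, base * (2 ** attempt))
    if not retry_after:
        return fallback
    value = retry_after.strip()
    if value.isdigit():
        # delay-seconds form, e.g. "Retry-After: 5"
        return min(cap, float(value))
    try:
        # HTTP-date form, e.g. "Retry-After: Wed, 21 Oct 2015 07:28:00 GMT"
        when = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return fallback
    if when.tzinfo is None:
        # A date without a usable offset (e.g. "-0000") comes back naive; treat it as UTC.
        when = when.replace(tzinfo=timezone.utc)
    current = now if now is not None else datetime.now(timezone.utc)
    return min(cap, max(0.0, (when - current).total_seconds()))
```

In the client, the 429 branch could then sleep for retry_delay(response.headers.get("Retry-After"), attempt).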

Complete Working Example

Combine the modules into a single deployable FastAPI application. This script is ready to run after setting the environment variables.

# main.py
# Dependencies: fastapi, uvicorn, httpx, spacy, pydantic v2, pydantic-settings
import asyncio
import os
import shutil
import tarfile
import tempfile

import httpx
import spacy
from fastapi import FastAPI, HTTPException, Request
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

# Configuration: environment variables first, then .env, then defaults
class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")

    tenant_url: str = Field("https://your-tenant.cognigy.ai", validation_alias="COGNIGY_TENANT_URL")
    api_key: str = Field("", validation_alias="COGNIGY_API_KEY")
    cdn_model_url: str = Field("https://cdn.example.com/models/domain_ner_v1.tar.gz", validation_alias="CDN_MODEL_URL")

settings = Settings()

# Model Loader: asyncio.Lock (not threading.Lock) because it is held across awaits
model_lock = asyncio.Lock()
spacy_nlp: spacy.Language | None = None

async def load_model_from_cdn() -> spacy.Language:
    global spacy_nlp
    if spacy_nlp is not None:
        return spacy_nlp
    async with model_lock:
        if spacy_nlp is not None:  # another request finished loading while we waited
            return spacy_nlp
        temp_dir = tempfile.mkdtemp(prefix="cognigy_spacy_")
        archive_path = os.path.join(temp_dir, "model.tar.gz")
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.get(settings.cdn_model_url)
                response.raise_for_status()
            with open(archive_path, "wb") as f:
                f.write(response.content)
            with tarfile.open(archive_path, "r:gz") as tar:
                tar.extractall(path=temp_dir)
            spacy_nlp = spacy.load(os.path.join(temp_dir, "custom_domain_ner"))
            print("spaCy model loaded successfully from CDN.")
            return spacy_nlp
        except Exception as e:
            print(f"Model load failed: {e}")
            raise
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

# REST API Client
class CognigyAPIError(Exception):
    """Raised when Cognigy rejects or repeatedly fails a state update."""

async def update_cognigy_state(conversation_id: str, variables: dict) -> dict:
    url = f"{settings.tenant_url}/api/v2/conversations/{conversation_id}/variables"
    payload = {"variables": [{"key": k, "value": v} for k, v in variables.items()]}
    headers = {"Authorization": f"Bearer {settings.api_key}", "Content-Type": "application/json"}
    max_retries = 3
    async with httpx.AsyncClient(timeout=15.0) as client:
        for attempt in range(max_retries):
            backoff = 2 ** attempt
            try:
                response = await client.patch(url, json=payload, headers=headers)
            except httpx.TimeoutException:
                await asyncio.sleep(backoff)
                continue
            if response.status_code in (200, 204):
                return response.json() if response.content else {"status": "updated"}
            # 401/403 cannot succeed on retry, so fail immediately
            if response.status_code == 401:
                raise CognigyAPIError("Authentication failed. Verify API key.")
            if response.status_code == 403:
                raise CognigyAPIError("Permission denied. Requires conversation:write.")
            if response.status_code == 429:
                try:
                    backoff = float(response.headers.get("Retry-After", backoff))
                except ValueError:  # HTTP-date form; keep exponential backoff
                    pass
                await asyncio.sleep(backoff)
                continue
            if response.status_code >= 500:
                await asyncio.sleep(backoff)
                continue
            raise CognigyAPIError(f"Unexpected status {response.status_code}: {response.text}")
    raise CognigyAPIError("Max retries exceeded for state update.")

# Webhook Server
app = FastAPI(title="Cognigy Custom NER Webhook")
TARGET_LABELS = {"DOMAIN_PRODUCT", "DOMAIN_ERROR_CODE"}

@app.post("/cognigy-webhook")
async def cognigy_webhook(request: Request):
    nlp = await load_model_from_cdn()  # cached after the first successful load

    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON payload")

    if not isinstance(body, dict) or not body.get("conversationId") or not body.get("text"):
        raise HTTPException(status_code=400, detail="Missing conversationId or text")

    user_text = body["text"]
    conversation_id = body["conversationId"]
    dialog_id = body.get("dialogId", "")

    doc = nlp(user_text)
    detected_vars = {}
    for ent in doc.ents:
        if ent.label_ in TARGET_LABELS:
            # Last match wins if a label appears more than once
            detected_vars[f"ner_{ent.label_}_detected"] = ent.text

    if detected_vars:
        try:
            await update_cognigy_state(conversation_id, detected_vars)
        except (CognigyAPIError, httpx.HTTPError) as e:
            # Report upstream failures as 502 instead of an unhandled 500
            raise HTTPException(status_code=502, detail=str(e))

    return {
        "status": "success",
        "dialogId": dialog_id,
        "variables": detected_vars,
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

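Run the application with python main.py (or uvicorn main:app --host 0.0.0.0 --port 8000). Expose port 8000 to the internet through ngrok, an AWS ALB, or your cloud provider's load balancer, then enter the public URL of the /cognigy-webhook route as the webhook URL in Cognigy Studio.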
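Before wiring the URL into a Flow, you can send a sample payload to the running server. A standard-library sketch that builds the POST request with urllib; build_webhook_request is a hypothetical helper, the field names follow the payload shape this tutorial assumes, and the conversation ID you pass is a placeholder.

```python
import json
import urllib.request


def build_webhook_request(base_url: str, text: str, conversation_id: str,
                          dialog_id: str = "") -> urllib.request.Request:
    """Build a POST to /cognigy-webhook with the payload shape assumed in this tutorial."""
    body = json.dumps({
        "text": text,
        "conversationId": conversation_id,
        "dialogId": dialog_id,
        "variables": {},
    }).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/cognigy-webhook",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

To send it, call urllib.request.urlopen(build_webhook_request("http://localhost:8000", "The XR-500 threw ERR_404.", "test-conversation-id"), timeout=60) and read the JSON body. The first call can be slow because it downloads the model, and if entities are found the server also attempts the REST update, which fails for a conversation ID your tenant does not know.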

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The COGNIGY_API_KEY environment variable is missing, malformed, or the key has been revoked. Cognigy rejects Bearer tokens that do not match a valid API key.
  • Fix: Regenerate the API key in Cognigy Studio. Verify the .env file contains the exact string without trailing whitespace. Confirm the Authorization header uses the Bearer prefix.
  • Code Check: Ensure headers = {"Authorization": f"Bearer {settings.api_key}"} matches exactly.

Error: 403 Forbidden

  • Cause: The API key lacks the conversation:write permission. Cognigy enforces role-based access control on state mutation endpoints.
  • Fix: Navigate to Cognigy Studio > Settings > API Keys. Edit the key and assign the conversation:write and conversation:read scopes. Save and redeploy.
  • Code Check: The endpoint /api/v2/conversations/{conversationId}/variables requires explicit write permissions. Read-only keys will fail.

Error: 429 Too Many Requests

  • Cause: Cognigy enforces rate limits on REST API calls. High-volume bot deployments can trigger cascading 429 responses when multiple conversations trigger webhooks simultaneously.
  • Fix: The implementation includes exponential backoff with Retry-After header parsing. If errors persist, implement a message queue (RabbitMQ, AWS SQS) to batch variable updates instead of calling the API synchronously from the webhook.
  • Code Check: Verify the retry loop respects Retry-After and caps at max_retries = 3.
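If you move updates onto a queue, batching helps most when several updates for the same conversation are pending at once, because they can be merged into a single PATCH. A minimal sketch of that merge step, assuming later values should overwrite earlier ones for the same key; coalesce_updates is hypothetical, and the queue consumer that drains messages and calls the update client is not shown.

```python
from typing import Dict, Iterable, Tuple


def coalesce_updates(updates: Iterable[Tuple[str, Dict[str, object]]]) -> Dict[str, Dict[str, object]]:
    """Merge queued (conversation_id, variables) updates into one payload per conversation."""
    merged: Dict[str, Dict[str, object]] = {}
    for conversation_id, variables in updates:
        # Later updates win for the same variable key, matching arrival order.
        merged.setdefault(conversation_id, {}).update(variables)
    return merged
```

Three queued updates for two conversations then become two REST calls instead of three.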

Error: Model Loading Fails or Returns Empty Entities

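  • Cause: The CDN URL is inaccessible, the .tar.gz archive is corrupted, the character offsets in the training data do not line up with the entity text (spaCy warns about misaligned entities and skips them during training), or the model was trained with a different spaCy version than the one installed at runtime.
  • Fix: Validate the CDN URL in a browser. Confirm that text[start:end] equals each annotated entity in TRAIN_DATA. Ensure the runtime spaCy version satisfies the spacy_version range recorded in the exported model's meta.json.
  • Code Check: Add print(doc.ents) during local testing to verify the NER pipeline detects spans before deployment.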
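nlp.to_disk writes a meta.json whose spacy_version field records the compatible version range (for example >=3.7.2,<3.8.0), so a small standard-library check can compare it with the installed version before deployment. This sketch understands only simple comparison clauses joined by commas; packaging.specifiers.SpecifierSet is the more robust choice if the packaging library is available. satisfies and check_model_version are hypothetical names.

```python
import json
import operator
from pathlib import Path
from typing import Tuple

# Two-character operators first so ">=" is not mistaken for ">".
_OPERATORS = ((">=", operator.ge), ("<=", operator.le), ("==", operator.eq),
              (">", operator.gt), ("<", operator.lt))


def _version_tuple(version: str) -> Tuple[int, ...]:
    """Parse the leading numeric parts of a version, e.g. "3.7.2" -> (3, 7, 2), padded to 3."""
    parts = []
    for piece in version.strip().split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    return tuple((parts + [0, 0, 0])[:3])


def satisfies(installed: str, spec: str) -> bool:
    """Check an installed version against a spec such as ">=3.7.2,<3.8.0"."""
    current = _version_tuple(installed)
    for clause in filter(None, (c.strip() for c in spec.split(","))):
        for symbol, compare in _OPERATORS:
            if clause.startswith(symbol):
                if not compare(current, _version_tuple(clause[len(symbol):])):
                    return False
                break
        else:
            raise ValueError(f"Unsupported version clause: {clause!r}")
    return True


def check_model_version(model_dir: str, installed: str) -> bool:
    """Read spacy_version from the exported meta.json and compare it with the installed spaCy."""
    meta = json.loads((Path(model_dir) / "meta.json").read_text(encoding="utf-8"))
    return satisfies(installed, meta["spacy_version"])
```

For example, check_model_version("custom_domain_ner", spacy.__version__) returns False when the runtime falls outside the range the model was exported with.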

Official References