Sanitizing NICE Cognigy.AI User Inputs for LLM Injection with Python

Sanitizing NICE Cognigy.AI User Inputs for LLM Injection with Python

What You Will Build

A Python FastAPI webhook that intercepts Cognigy.AI user utterances, scans text for prompt injection patterns using a threat detection model, redacts malicious sequences while preserving intent structure, appends safety flags to the Cognigy context object, routes high-risk sessions to a fallback human agent flow via the Dialog API, and logs injection attempts for threat intelligence analysis. This tutorial uses the Cognigy.AI REST API and FastAPI. The programming language is Python.

Prerequisites

  • Cognigy.AI OAuth2 Client Credentials with cognigy:session:read, cognigy:session:write, cognigy:context:write scopes
  • Cognigy.AI API v2
  • Python 3.9 or higher
  • fastapi, uvicorn, requests, pydantic, python-dotenv, urllib3
  • Cognigy.AI Flow configured to trigger an External node at the start of the dialog, passing the full context payload to this webhook

Authentication Setup

Cognigy.AI supports OAuth2 Client Credentials for server-to-server integrations. You must exchange your client ID and secret for an access token before calling the Dialog API. The token expires after fifteen minutes, so the implementation includes automatic caching and refresh logic.

import os
import time
import requests
from typing import Optional

class CognigyAuthManager:
    def __init__(self, client_id: str, client_secret: str, base_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url.rstrip("/")
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0

    def get_token(self) -> str:
        if self.access_token and time.time() < self.token_expiry - 30:
            return self.access_token

        url = f"{self.base_url}/oauth/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "cognigy:session:read cognigy:session:write cognigy:context:write"
        }

        response = requests.post(url, data=payload, timeout=10)
        response.raise_for_status()

        token_data = response.json()
        self.access_token = token_data["access_token"]
        self.token_expiry = time.time() + token_data["expires_in"]
        return self.access_token

The get_token method checks the cached token and requests a new one only when necessary. This prevents unnecessary network calls and respects rate limits.

Implementation

Step 1: Webhook Endpoint and Context Interception

The Cognigy.AI External node sends a POST request containing the session context, dialog state, and user input. You must parse this payload, extract the utterance, and pass it to the detection engine. The endpoint returns a modified context payload that Cognigy.AI will merge back into the active session.

from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import json

app = FastAPI()

class CognigyPayload(BaseModel):
    context: dict
    dialog: dict
    user: dict
    session_id: str

@app.post("/cognigy/intercept")
async def intercept_utterance(payload: CognigyPayload):
    original_input = payload.user.get("input", "")
    if not original_input:
        raise HTTPException(status_code=400, detail="Missing user input")

    # Detection and redaction will be implemented in Step 2
    # Context flagging and routing will be implemented in Step 3
    # Logging will be implemented in Step 4

    return {
        "context": payload.context,
        "dialog": payload.dialog,
        "user": payload.user,
        "session_id": payload.session_id
    }

The endpoint validates the payload structure and extracts the raw utterance. Cognigy.AI expects the response to mirror the incoming structure so it can continue execution.

Step 2: Injection Detection and Redaction Engine

Prompt injection attacks typically contain directive overrides, system role impersonation, or encoded payloads. You will scan the utterance using a pattern-based threat detection model that calculates a risk score. The engine redacts matched sequences while preserving the surrounding intent structure for downstream NLU or LLM processing.

import re
from typing import Tuple, List, Dict

INJECTION_PATTERNS = [
    (r"(?i)(ignore|override|bypass|disregard)\s+(previous|all|system)\s*(instructions|rules|prompts)", 0.9),
    (r"(?i)(act|pretend|simulate)\s+(as|like)\s+(system|admin|developer)", 0.8),
    (r"(?i)(base64|hex|rot13)\s*(decode|encode|reveal)", 0.7),
    (r"(?i)(\b\d{4,}\b\s*[,;]\s*\w{4,}\s*){3,}", 0.6),
    (r"(?i)(\b(?:tell|show|print|output)\s+(?:your|the)\s*(?:prompt|system|initial)\s*(message|text|content))", 0.95),
]

def detect_and_redact(utterance: str) -> Tuple[str, float, List[Dict]]:
    risk_score = 0.0
    matched_patterns = []
    redacted_text = utterance

    for pattern, weight in INJECTION_PATTERNS:
        matches = list(re.finditer(pattern, utterance))
        if matches:
            risk_score = min(1.0, risk_score + weight)
            matched_patterns.extend([
                {"pattern": pattern, "match": m.group(0), "weight": weight}
                for m in matches
            ])
            redacted_text = re.sub(pattern, "[INJECTION_REDACTED]", redacted_text, flags=re.IGNORECASE)

    # Normalize score based on pattern density
    if len(matched_patterns) > 2:
        risk_score = min(1.0, risk_score + 0.15)

    return redacted_text, risk_score, matched_patterns

The function returns the sanitized text, a normalized risk score between zero and one, and a list of matched patterns. The redaction replaces malicious directives with a neutral placeholder so the remaining conversational intent remains parseable.

Step 3: Context Flagging and Dialog API Routing

When the risk score exceeds the threshold, you must update the Cognigy.AI session context with safety flags and trigger a routing action. The Dialog API provides endpoints for context mutation and flow redirection. You will use requests with exponential backoff for 429 responses.

import time
import requests
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

def create_api_client(auth_manager: CognigyAuthManager, base_url: str) -> requests.Session:
    session = requests.Session()
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["POST", "GET"]
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    session.headers.update({
        "Authorization": f"Bearer {auth_manager.get_token()}",
        "Content-Type": "application/json"
    })
    return session

def route_to_fallback(api_client: requests.Session, session_id: str, base_url: str, risk_score: float) -> None:
    # Update context with safety flags
    context_url = f"{base_url}/api/v2/sessions/{session_id}/context"
    context_payload = {
        "safety": {
            "injection_detected": True,
            "risk_score": risk_score,
            "route_to_human": True,
            "redaction_applied": True
        }
    }

    try:
        ctx_resp = api_client.post(context_url, json=context_payload)
        ctx_resp.raise_for_status()
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 401:
            raise Exception("Authentication failed. Check OAuth client credentials.")
        elif e.response.status_code == 403:
            raise Exception("Insufficient permissions. Verify cognigy:context:write scope.")
        elif e.response.status_code == 429:
            raise Exception("Rate limit exceeded. Backoff strategy applied.")
        else:
            raise Exception(f"Context update failed: {e.response.status_code}")

    # Trigger fallback flow action
    action_url = f"{base_url}/api/v2/sessions/{session_id}/actions"
    action_payload = {
        "action": "navigate",
        "target": "fallback_human_agent_flow",
        "reason": "high_injection_risk"
    }

    try:
        act_resp = api_client.post(action_url, json=action_payload)
        act_resp.raise_for_status()
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            raise Exception("Session not found. Verify session_id validity.")
        elif e.response.status_code == 429:
            raise Exception("Rate limit exceeded during routing.")
        else:
            raise Exception(f"Routing action failed: {e.response.status_code}")

The routing function updates the session context with structured safety metadata and triggers a navigation action to a predefined fallback flow. The retry strategy handles transient 429 and 5xx responses automatically.

Step 4: Threat Intelligence Logging

You must log every interception attempt with structured fields for downstream analysis. The logger records the original input, redacted input, risk score, matched patterns, and routing decision.

import json
import logging
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("cognigy_injection_filter")

def log_threat_event(
    session_id: str,
    original: str,
    redacted: str,
    risk_score: float,
    patterns: List[Dict],
    routed: bool
) -> None:
    event = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "session_id": session_id,
        "original_input": original,
        "redacted_input": redacted,
        "risk_score": risk_score,
        "patterns_matched": patterns,
        "action_taken": "routed_to_human" if routed else "passed_to_nlu",
        "event_type": "injection_scan"
    }
    logger.info(json.dumps(event, ensure_ascii=False))

The logger outputs JSON lines that integrate directly with SIEM platforms, ELK stacks, or cloud logging services. You can route this output to a file, stdout, or a message queue by modifying the handler.

Step 5: FastAPI Integration and Error Handling

You will combine the components into a single endpoint that orchestrates detection, redaction, context updates, routing, and logging. The endpoint includes explicit error handling for network failures and API errors.

@app.post("/cognigy/intercept")
async def intercept_utterance(payload: CognigyPayload):
    original_input = payload.user.get("input", "")
    if not original_input:
        raise HTTPException(status_code=400, detail="Missing user input")

    redacted_input, risk_score, matched_patterns = detect_and_redact(original_input)
    routed = False

    api_base = os.getenv("COGNIGY_API_BASE")
    if not api_base:
        raise HTTPException(status_code=500, detail="COGNIGY_API_BASE environment variable not set")

    if risk_score >= 0.75:
        try:
            auth = CognigyAuthManager(
                client_id=os.getenv("COGNIGY_CLIENT_ID"),
                client_secret=os.getenv("COGNIGY_CLIENT_SECRET"),
                base_url=api_base
            )
            api_client = create_api_client(auth, api_base)
            route_to_fallback(api_client, payload.session_id, api_base, risk_score)
            routed = True
        except Exception as e:
            logger.error(f"Routing failed for session {payload.session_id}: {str(e)}")
            # Fallback: still return redacted input to prevent raw injection propagation
            routed = False

    log_threat_event(
        session_id=payload.session_id,
        original=original_input,
        redacted=redacted_input,
        risk_score=risk_score,
        patterns=matched_patterns,
        routed=routed
    )

    # Update user input in payload for downstream processing
    payload.user["input"] = redacted_input
    payload.context["safety"] = {
        "injection_detected": risk_score >= 0.75,
        "risk_score": risk_score,
        "patterns_matched": len(matched_patterns)
    }

    return {
        "context": payload.context,
        "dialog": payload.dialog,
        "user": payload.user,
        "session_id": payload.session_id
    }

The endpoint processes the utterance, applies the detection engine, routes high-risk sessions, logs the event, and returns the sanitized payload. Cognigy.AI receives the modified input and context, allowing the dialog to continue safely.

Complete Working Example

import os
import time
import json
import logging
import re
import requests
from typing import Optional, Tuple, List, Dict
from datetime import datetime, timezone
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

# --- Configuration & Logging ---
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("cognigy_injection_filter")

# --- Authentication ---
class CognigyAuthManager:
    def __init__(self, client_id: str, client_secret: str, base_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url.rstrip("/")
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0

    def get_token(self) -> str:
        if self.access_token and time.time() < self.token_expiry - 30:
            return self.access_token

        url = f"{self.base_url}/oauth/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "cognigy:session:read cognigy:session:write cognigy:context:write"
        }

        response = requests.post(url, data=payload, timeout=10)
        response.raise_for_status()

        token_data = response.json()
        self.access_token = token_data["access_token"]
        self.token_expiry = time.time() + token_data["expires_in"]
        return self.access_token

# --- Detection Engine ---
INJECTION_PATTERNS = [
    (r"(?i)(ignore|override|bypass|disregard)\s+(previous|all|system)\s*(instructions|rules|prompts)", 0.9),
    (r"(?i)(act|pretend|simulate)\s+(as|like)\s+(system|admin|developer)", 0.8),
    (r"(?i)(base64|hex|rot13)\s*(decode|encode|reveal)", 0.7),
    (r"(?i)(\b\d{4,}\b\s*[,;]\s*\w{4,}\s*){3,}", 0.6),
    (r"(?i)(\b(?:tell|show|print|output)\s+(?:your|the)\s*(?:prompt|system|initial)\s*(message|text|content))", 0.95),
]

def detect_and_redact(utterance: str) -> Tuple[str, float, List[Dict]]:
    risk_score = 0.0
    matched_patterns = []
    redacted_text = utterance

    for pattern, weight in INJECTION_PATTERNS:
        matches = list(re.finditer(pattern, utterance))
        if matches:
            risk_score = min(1.0, risk_score + weight)
            matched_patterns.extend([
                {"pattern": pattern, "match": m.group(0), "weight": weight}
                for m in matches
            ])
            redacted_text = re.sub(pattern, "[INJECTION_REDACTED]", redacted_text, flags=re.IGNORECASE)

    if len(matched_patterns) > 2:
        risk_score = min(1.0, risk_score + 0.15)

    return redacted_text, risk_score, matched_patterns

# --- API Client & Routing ---
def create_api_client(auth_manager: CognigyAuthManager, base_url: str) -> requests.Session:
    session = requests.Session()
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["POST", "GET"]
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    session.headers.update({
        "Authorization": f"Bearer {auth_manager.get_token()}",
        "Content-Type": "application/json"
    })
    return session

def route_to_fallback(api_client: requests.Session, session_id: str, base_url: str, risk_score: float) -> None:
    context_url = f"{base_url}/api/v2/sessions/{session_id}/context"
    context_payload = {
        "safety": {
            "injection_detected": True,
            "risk_score": risk_score,
            "route_to_human": True,
            "redaction_applied": True
        }
    }

    try:
        ctx_resp = api_client.post(context_url, json=context_payload)
        ctx_resp.raise_for_status()
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 401:
            raise Exception("Authentication failed. Check OAuth client credentials.")
        elif e.response.status_code == 403:
            raise Exception("Insufficient permissions. Verify cognigy:context:write scope.")
        elif e.response.status_code == 429:
            raise Exception("Rate limit exceeded. Backoff strategy applied.")
        else:
            raise Exception(f"Context update failed: {e.response.status_code}")

    action_url = f"{base_url}/api/v2/sessions/{session_id}/actions"
    action_payload = {
        "action": "navigate",
        "target": "fallback_human_agent_flow",
        "reason": "high_injection_risk"
    }

    try:
        act_resp = api_client.post(action_url, json=action_payload)
        act_resp.raise_for_status()
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            raise Exception("Session not found. Verify session_id validity.")
        elif e.response.status_code == 429:
            raise Exception("Rate limit exceeded during routing.")
        else:
            raise Exception(f"Routing action failed: {e.response.status_code}")

# --- Logging ---
def log_threat_event(
    session_id: str,
    original: str,
    redacted: str,
    risk_score: float,
    patterns: List[Dict],
    routed: bool
) -> None:
    event = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "session_id": session_id,
        "original_input": original,
        "redacted_input": redacted,
        "risk_score": risk_score,
        "patterns_matched": patterns,
        "action_taken": "routed_to_human" if routed else "passed_to_nlu",
        "event_type": "injection_scan"
    }
    logger.info(json.dumps(event, ensure_ascii=False))

# --- FastAPI Application ---
app = FastAPI()

class CognigyPayload(BaseModel):
    context: dict
    dialog: dict
    user: dict
    session_id: str

@app.post("/cognigy/intercept")
async def intercept_utterance(payload: CognigyPayload):
    original_input = payload.user.get("input", "")
    if not original_input:
        raise HTTPException(status_code=400, detail="Missing user input")

    redacted_input, risk_score, matched_patterns = detect_and_redact(original_input)
    routed = False

    api_base = os.getenv("COGNIGY_API_BASE")
    if not api_base:
        raise HTTPException(status_code=500, detail="COGNIGY_API_BASE environment variable not set")

    if risk_score >= 0.75:
        try:
            auth = CognigyAuthManager(
                client_id=os.getenv("COGNIGY_CLIENT_ID"),
                client_secret=os.getenv("COGNIGY_CLIENT_SECRET"),
                base_url=api_base
            )
            api_client = create_api_client(auth, api_base)
            route_to_fallback(api_client, payload.session_id, api_base, risk_score)
            routed = True
        except Exception as e:
            logger.error(f"Routing failed for session {payload.session_id}: {str(e)}")
            routed = False

    log_threat_event(
        session_id=payload.session_id,
        original=original_input,
        redacted=redacted_input,
        risk_score=risk_score,
        patterns=matched_patterns,
        routed=routed
    )

    payload.user["input"] = redacted_input
    payload.context["safety"] = {
        "injection_detected": risk_score >= 0.75,
        "risk_score": risk_score,
        "patterns_matched": len(matched_patterns)
    }

    return {
        "context": payload.context,
        "dialog": payload.dialog,
        "user": payload.user,
        "session_id": payload.session_id
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Save the script as cognigy_injection_filter.py. Set the environment variables COGNIGY_API_BASE, COGNIGY_CLIENT_ID, and COGNIGY_CLIENT_SECRET. Run the server with uvicorn cognigy_injection_filter:app --reload. Configure the Cognigy.AI External node to POST to http://<host>:8000/cognigy/intercept.

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The OAuth2 token request failed due to invalid client credentials or missing scope.
  • Fix: Verify COGNIGY_CLIENT_ID and COGNIGY_CLIENT_SECRET match the registered OAuth application. Ensure the client has the cognigy:session:write and cognigy:context:write permissions enabled in the Cognigy.AI admin console.
  • Code Fix: The CognigyAuthManager.get_token() method calls response.raise_for_status(). Inspect the response body for exact error codes returned by the OAuth server.

Error: 403 Forbidden

  • Cause: The access token lacks the required scopes, or the API key permissions are restricted.
  • Fix: Navigate to the Cognigy.AI integration settings and confirm the OAuth client is granted cognigy:session:read, cognigy:session:write, and cognigy:context:write. Reissue the token after scope changes.
  • Code Fix: The route_to_fallback function catches 403 responses and raises a descriptive exception. Log the exception payload to verify scope mismatches.

Error: 429 Too Many Requests

  • Cause: The Dialog API enforces rate limits per tenant or per OAuth client.
  • Fix: The create_api_client function configures urllib3.util.retry.Retry with status_forcelist=[429]. The backoff factor starts at one second and doubles on each retry. If the error persists, reduce the webhook invocation frequency or implement a request queue.
  • Code Fix: Monitor the Retry-After header in 429 responses. The retry strategy automatically respects standard exponential backoff.

Error: 502/503 Bad Gateway or Service Unavailable

  • Cause: Cognigy.AI backend services are temporarily unavailable or undergoing maintenance.
  • Fix: The retry strategy includes 502 and 503 in the force list. The webhook returns the redacted input regardless of routing failure to prevent raw injection propagation. Implement circuit breaker logic in production if failures exceed a threshold.

Official References