Build an LLM Prompt Chaining Middleware for NICE Cognigy.AI with Python

What You Will Build

This tutorial builds a Python middleware service that intercepts Cognigy.AI dialog intents, chains multiple LLM prompts using Jinja2 templates, caches reasoning steps in Redis with session-specific TTLs, and returns structured responses via webhook payloads. The service uses the Cognigy.AI Dialog API for state management and standard HTTP webhooks for intent interception. The implementation uses Python 3.10 with FastAPI, Redis, the OpenAI SDK, and explicit token-aware context truncation.

Prerequisites

  • A Cognigy.AI OAuth 2.0 client (Client Credentials grant) with the api.dialog.read, api.dialog.write, and api.session.read scopes
  • Access to the Cognigy.AI REST API v2
  • Python 3.10 or higher
  • A Redis server running locally or remotely
  • The Python dependencies: pip install fastapi uvicorn httpx redis jinja2 openai tiktoken pydantic python-dotenv

Authentication Setup

Cognigy.AI requires OAuth 2.0 Client Credentials for programmatic API access. The middleware must fetch a bearer token, cache it in memory, and refresh it before expiration. The following class handles token acquisition and automatic refresh.

import time
import httpx
import logging
from typing import Optional

logger = logging.getLogger(__name__)

class CognigyAuthManager:
    def __init__(self, tenant_domain: str, client_id: str, client_secret: str):
        self.base_url = f"https://{tenant_domain}.cognigy.ai/api/v2"
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0
        self.token_ttl_buffer: int = 60  # Refresh 60 seconds before expiry

    async def get_token(self) -> str:
        if self.access_token and time.time() < (self.token_expiry - self.token_ttl_buffer):
            return self.access_token

        url = f"{self.base_url}/oauth/token"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "api.dialog.read api.dialog.write api.session.read"
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(url, headers=headers, data=data)
            response.raise_for_status()
            payload = response.json()
            
            self.access_token = payload["access_token"]
            self.token_expiry = time.time() + payload["expires_in"]
            logger.info("Cognigy OAuth token refreshed successfully.")
            return self.access_token

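The token request is a standard OAuth 2.0 client credentials exchange. A successful response returns a JSON body containing access_token, expires_in (in seconds), and token_type. The middleware stores the token and converts expires_in into an absolute expiry timestamp. If the token endpoint returns HTTP 401 (or HTTP 400 with error set to invalid_client), the client credentials are invalid. Under RFC 6749, a scope the client may not request is reported as HTTP 400 with error set to invalid_scope. An HTTP 403 from the Dialog API later usually means the issued token lacks a required scope.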
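CognigyAuthManager has one gap under concurrent webhooks. When the token nears expiry, every in-flight request can call the token endpoint at the same moment. The sketch below keeps the same refresh rule but serializes the refresh behind an asyncio.Lock and re-checks the cache after acquiring it. LockedTokenCache and the fetch callable (assumed to return the access_token and expires_in values) are hypothetical names. In the middleware, fetch would wrap the POST to the token endpoint shown above.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional

# Hypothetical fetcher interface: returns (access_token, expires_in_seconds).
TokenFetcher = Callable[[], Awaitable[tuple[str, int]]]


class LockedTokenCache:
    """Caches a bearer token and lets only one coroutine refresh it at a time."""

    def __init__(self, fetch: TokenFetcher, ttl_buffer: int = 60,
                 clock: Callable[[], float] = time.time):
        self._fetch = fetch
        self._ttl_buffer = ttl_buffer  # refresh this many seconds before expiry
        self._clock = clock            # injectable so expiry can be tested
        self._token: Optional[str] = None
        self._expiry = 0.0
        self._lock = asyncio.Lock()

    def _is_fresh(self) -> bool:
        # Same rule as CognigyAuthManager.get_token.
        return self._token is not None and self._clock() < self._expiry - self._ttl_buffer

    async def get_token(self) -> str:
        if self._is_fresh():
            return self._token
        async with self._lock:
            # Re-check: another coroutine may have refreshed while we waited.
            if self._is_fresh():
                return self._token
            token, expires_in = await self._fetch()
            self._token = token
            self._expiry = self._clock() + expires_in
            return token
```

With this design, ten concurrent get_token calls on a cold cache produce a single token request. The other nine wait on the lock and reuse its result. The injectable clock exists only to make expiry testable.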

Implementation

Step 1: Verify Webhook Signature and Intercept Intent

Cognigy.AI signs outgoing webhook payloads with an HMAC-SHA256 digest to prevent spoofing. The middleware must verify the signature before processing the intent. Cognigy passes the signature in the X-Cognigy-Signature header.

import hmac
import hashlib
import logging
import os
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel, ValidationError

logger = logging.getLogger(__name__)
WEBHOOK_SECRET = os.environ["COGNIGY_WEBHOOK_SECRET"]  # shared secret configured for the webhook

app = FastAPI()

class CognigyWebhookPayload(BaseModel):
    sessionId: str
    userId: str
    intent: dict
    dialogState: dict
    variables: dict

def verify_signature(payload_bytes: bytes, signature: str, secret: str) -> bool:
    expected = hmac.new(secret.encode(), payload_bytes, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest raises TypeError for non-ASCII str input
    return hmac.compare_digest(expected.encode(), signature.encode())

@app.post("/webhook/intercept")
async def intercept_intent(request: Request):
    signature = request.headers.get("X-Cognigy-Signature")
    if not signature:
        raise HTTPException(status_code=400, detail="Missing X-Cognigy-Signature header")

    # Verify the raw bytes before parsing: the HMAC covers the exact body that was sent
    body = await request.body()
    if not verify_signature(body, signature, WEBHOOK_SECRET):
        raise HTTPException(status_code=403, detail="Invalid webhook signature")

    try:
        payload = CognigyWebhookPayload.model_validate_json(body)
    except ValidationError:
        raise HTTPException(status_code=422, detail="Malformed webhook payload")
    logger.info(f"Intercepted intent: {payload.intent.get('name')} for session {payload.sessionId}")
    return payload

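The Cognigy webhook request carries a snapshot of the dialog: sessionId, userId, intent, dialogState, and variables, matching CognigyWebhookPayload. Verification runs on the raw body bytes before any JSON parsing, because the HMAC is computed over the exact bytes sent. If verification fails, the middleware returns HTTP 403 immediately, which rejects spoofed or tampered payloads.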
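Because the HMAC covers the exact bytes received, any change to the body, even a harmless-looking one, invalidates the signature. The standard-library sketch below signs a sample body the way verify_signature expects (a hex-encoded HMAC-SHA256 digest). It then shows that re-serializing the same JSON, or editing one field, breaks verification. The secret and payload are made-up test values. This version compares bytes rather than str, because hmac.compare_digest raises TypeError on non-ASCII strings and the header value is attacker-controlled.

```python
import hashlib
import hmac
import json


def sign(payload_bytes: bytes, secret: str) -> str:
    # Hex-encoded HMAC-SHA256 over the raw body, matching verify_signature.
    return hmac.new(secret.encode(), payload_bytes, hashlib.sha256).hexdigest()


def verify_signature(payload_bytes: bytes, signature: str, secret: str) -> bool:
    # Compare as bytes so a non-ASCII header value returns False instead of raising.
    return hmac.compare_digest(sign(payload_bytes, secret).encode(), signature.encode())


secret = "test-webhook-secret"  # made-up value for the example
raw_body = (
    b'{"sessionId": "s-123", "userId": "u-1", "intent": {"name": "order_status", '
    b'"text": "Where is my order?"}, "dialogState": {}, "variables": {}}'
)
signature = sign(raw_body, secret)

# Same JSON, different bytes: compact separators drop the spaces.
reencoded = json.dumps(json.loads(raw_body), separators=(",", ":")).encode()
tampered = raw_body.replace(b"order_status", b"refund_request")

print(verify_signature(raw_body, signature, secret))   # True
print(verify_signature(reencoded, signature, secret))  # False
print(verify_signature(tampered, signature, secret))   # False
```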

Step 2: Retrieve Dialog State and Cache Context

The middleware fetches the authoritative dialog state from the Cognigy.AI Dialog API to ensure consistency. It then caches intermediate reasoning steps in Redis using a session-specific key with a TTL matching the dialog timeout.

import redis
import json

async def fetch_dialog_state(auth: CognigyAuthManager, session_id: str) -> dict:
    url = f"{auth.base_url}/dialog/sessions/{session_id}"
    token = await auth.get_token()
    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}

    async with httpx.AsyncClient() as client:
        response = await client.get(url, headers=headers)
        if response.status_code == 404:
            raise HTTPException(status_code=404, detail="Dialog session not found")
        response.raise_for_status()
        return response.json()

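async def cache_reasoning_step(redis_client: redis.Redis, session_id: str, step_name: str, data: dict, ttl: int = 3600):
    key = f"cognigy:reasoning:{session_id}:{step_name}"
    # redis.Redis is a synchronous client, so this call briefly blocks the event loop.
    # redis.asyncio.Redis (awaiting setex) is the non-blocking alternative.
    redis_client.setex(key, ttl, json.dumps(data))
    logger.debug(f"Cached reasoning step '{step_name}' for session {session_id}")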

The Dialog API endpoint /api/v2/dialog/sessions/{sessionId} returns the complete session graph, including active nodes, variable values, and turn history. The Redis cache uses a structured key pattern cognigy:reasoning:{sessionId}:{stepName}. The TTL defaults to one hour but should align with your Cognigy session timeout configuration.
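When debugging a session, it helps to read the cached steps back. The sketch below assumes a client created with decode_responses=True, as in the complete example. It collects every key under one session's prefix with redis-py's scan_iter and get. InMemoryRedis is a hypothetical stand-in that implements only setex, get, and scan_iter so the example runs without a server. In practice, pass a real redis.Redis client.

```python
import fnmatch
import json
import time


class InMemoryRedis:
    """Hypothetical test double for the three redis-py calls used here."""

    def __init__(self):
        self._data = {}  # key -> (value, absolute expiry in monotonic seconds)

    def setex(self, name, time_seconds, value):
        self._data[name] = (value, time.monotonic() + time_seconds)
        return True

    def get(self, name):
        item = self._data.get(name)
        if item is None or item[1] <= time.monotonic():
            self._data.pop(name, None)  # lazily drop expired keys
            return None
        return item[0]

    def scan_iter(self, match="*"):
        for key in list(self._data):
            if fnmatch.fnmatchcase(key, match):
                yield key


def load_reasoning_steps(redis_client, session_id):
    """Return {step_name: data} for every cached step of one session."""
    prefix = f"cognigy:reasoning:{session_id}:"
    steps = {}
    for key in redis_client.scan_iter(match=prefix + "*"):
        raw = redis_client.get(key)  # None if the key expired mid-scan
        if raw is not None:
            steps[key[len(prefix):]] = json.loads(raw)
    return steps
```

SCAN-based iteration avoids blocking Redis the way KEYS can on a large keyspace. Session IDs containing glob characters such as *, ?, or [ would need escaping before being used in the match pattern.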

Step 3: Execute Prompt Chain with Context Truncation

Prompt chaining makes sequential LLM calls in which each step consumes the output of the previous one. The middleware renders prompts with Jinja2 templates and counts tokens with tiktoken. To prevent context overflow, it drops the oldest conversation turns until the message content fits within MAX_CONTEXT_TOKENS. The count covers message content only, so keep that budget somewhat below the model's actual context limit to leave room for per-message formatting overhead.

import jinja2
import redis
import tiktoken
from openai import AsyncOpenAI

ENCODER = tiktoken.encoding_for_model("gpt-4o")
MAX_CONTEXT_TOKENS = 12000  # budget for message content; keep headroom below the model limit

# Templates are compiled once at import time and reused for every request
CLARIFICATION_TEMPLATE = jinja2.Template(
    "Analyze intent: {{ message }}. Return JSON with confidence and required parameters."
)
RETRIEVAL_TEMPLATE = jinja2.Template("Extract entities from: {{ text }}. Return JSON list.")

def truncate_context(messages: list[dict], max_tokens: int) -> list[dict]:
    """Drop the oldest messages after the system prompt until the content fits max_tokens."""
    messages = list(messages)  # don't mutate the caller's list
    counts = [len(ENCODER.encode(msg["content"])) for msg in messages]
    total = sum(counts)
    # Keep index 0 (system prompt) and the last message (current user turn)
    while total > max_tokens and len(messages) > 2:
        messages.pop(1)  # remove the oldest remaining history message
        total -= counts.pop(1)
    return messages

async def execute_prompt_chain(
    openai_client: AsyncOpenAI,
    redis_client: redis.Redis,
    session_id: str,
    user_message: str,
    history: list[dict]
) -> dict:
    chain_results = {}

    # Step 1: Intent clarification, with the truncated conversation history as context
    clarif_prompt = CLARIFICATION_TEMPLATE.render(message=user_message)
    messages = [{"role": "system", "content": "You are a precise reasoning engine."}]
    messages.extend(history)
    messages.append({"role": "user", "content": clarif_prompt})
    messages = truncate_context(messages, MAX_CONTEXT_TOKENS)

    resp1 = await openai_client.chat.completions.create(
        model="gpt-4o", messages=messages, temperature=0.1
    )
    clarif_output = resp1.choices[0].message.content
    chain_results["clarification"] = clarif_output
    await cache_reasoning_step(redis_client, session_id, "clarification",
                               {"output": clarif_output, "tokens": resp1.usage.total_tokens})

    # Step 2: Knowledge retrieval simulation, consuming the Step 1 output
    retrieval_prompt = RETRIEVAL_TEMPLATE.render(text=clarif_output)
    resp2 = await openai_client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": retrieval_prompt}], temperature=0.0
    )
    retrieval_output = resp2.choices[0].message.content
    chain_results["retrieval"] = retrieval_output
    await cache_reasoning_step(redis_client, session_id, "retrieval",
                               {"output": retrieval_output, "tokens": resp2.usage.total_tokens})

    return chain_results

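The truncation function preserves the system prompt and the current turn while dropping the oldest history messages one at a time. Each chain step records its token usage in Redis for budget monitoring. The OpenAI SDK sends the HTTP request to the LLM provider and, by default, retries HTTP 429 (rate limit) responses twice with exponential backoff before raising openai.RateLimitError. Tune this with the max_retries option on the client, and handle the exception for the case where the retries are exhausted.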
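The truncation rule can be checked without calling a model. The sketch below takes the token counter as a parameter so it runs without tiktoken. A whitespace word count stands in for len(ENCODER.encode(text)) purely for illustration, so the numbers are not real token counts. It also counts each message once instead of re-encoding the whole list on every iteration, and it copies the list so the caller's history is left intact.

```python
from typing import Callable


def truncate_context(messages: list[dict], max_tokens: int,
                     count_tokens: Callable[[str], int]) -> list[dict]:
    """Drop the oldest messages after the system prompt until content fits max_tokens."""
    kept = list(messages)  # work on a copy
    counts = [count_tokens(m["content"]) for m in kept]
    total = sum(counts)
    # Always keep index 0 (system prompt) and the last message (current user turn).
    while total > max_tokens and len(kept) > 2:
        kept.pop(1)
        total -= counts.pop(1)
    return kept


def word_count(text: str) -> int:
    # Stand-in for len(ENCODER.encode(text)); real token counts differ.
    return len(text.split())


history = [
    {"role": "system", "content": "You are a precise reasoning engine."},  # 6 words
    {"role": "user", "content": "I ordered a blue jacket last week."},      # 7
    {"role": "assistant", "content": "Thanks, which order number?"},        # 4
    {"role": "user", "content": "Order 4411."},                             # 2
    {"role": "assistant", "content": "It shipped on Monday."},              # 4
    {"role": "user", "content": "When will it arrive?"},                    # 4
]
trimmed = truncate_context(history, max_tokens=16, count_tokens=word_count)
print([m["content"] for m in trimmed])
# ['You are a precise reasoning engine.', 'Order 4411.', 'It shipped on Monday.', 'When will it arrive?']
```

Because the function drops one message at a time, a kept window can start with an assistant reply whose question was dropped. Dropping user/assistant pairs together avoids that if it matters for your prompts.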

Step 4: Update Dialog State and Route Response

After the prompt chain completes, the middleware updates the Cognigy dialog variables via the Dialog API and constructs the webhook response payload that Cognigy expects.

async def update_dialog_variables(auth: CognigyAuthManager, session_id: str, variables: dict):
    url = f"{auth.base_url}/dialog/sessions/{session_id}/variables"
    token = await auth.get_token()
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    
    async with httpx.AsyncClient() as client:
        response = await client.put(url, headers=headers, json=variables)
        response.raise_for_status()
        return response.json()

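import os
from functools import lru_cache
from fastapi import Depends

openai_client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

@lru_cache
def get_auth() -> CognigyAuthManager:
    # One shared instance, so the cached OAuth token is reused across requests
    return CognigyAuthManager(
        os.environ["COGNIGY_TENANT"], os.environ["COGNIGY_CLIENT_ID"], os.environ["COGNIGY_CLIENT_SECRET"]
    )

@lru_cache
def get_redis() -> redis.Redis:
    return redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379/0"), decode_responses=True)

@app.post("/webhook/intercept", status_code=200)
async def intercept_intent(
    request: Request,
    auth: CognigyAuthManager = Depends(get_auth),
    redis_client: redis.Redis = Depends(get_redis),
):
    # This handler replaces the Step 1 version.
    # ... signature verification and payload parsing from Step 1 ...

    try:
        dialog_state = await fetch_dialog_state(auth, payload.sessionId)
        history = dialog_state.get("conversationHistory", [])
        user_message = payload.intent.get("text", "")

        chain_results = await execute_prompt_chain(openai_client, redis_client, payload.sessionId, user_message, history)

        # Update Cognigy variables with chain results
        await update_dialog_variables(auth, payload.sessionId, {"llm_reasoning": chain_results})

        # Return Cognigy-compatible webhook response
        return {
            "response": {
                "text": "Processing complete.",
                "variables": chain_results,
                "nextNode": "process_llm_output"
            }
        }
    except HTTPException:
        raise  # e.g. the 404 from fetch_dialog_state; don't convert it into a 500
    except httpx.HTTPStatusError as e:
        logger.error(f"Dialog API error: {e.response.status_code} - {e.response.text}")
        raise HTTPException(status_code=e.response.status_code, detail="Dialog API communication failed")
    except Exception:
        logger.exception("Unexpected middleware failure")
        raise HTTPException(status_code=500, detail="Internal processing error")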

The Cognigy webhook response expects a JSON object containing response.text, response.variables, and optionally response.nextNode. The middleware writes the chain results to dialog variables so subsequent Cognigy nodes can consume them. The PUT request to /api/v2/dialog/sessions/{sessionId}/variables merges the provided key-value pairs into the active session.
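The chain steps ask the model for JSON, but each completion arrives as a string, and models sometimes wrap it in a Markdown code fence. If downstream Cognigy nodes should read individual fields rather than a blob of text, one option is to parse each step's output before returning it. parse_llm_json and build_webhook_response are hypothetical helpers. The response keys (text, variables, nextNode) are the ones this tutorial uses.

```python
import json
import re
from typing import Any, Optional

_TICKS = "`" * 3  # a Markdown code fence marker
# Matches an optional fenced wrapper (with or without a "json" tag) around the whole text.
_FENCE = re.compile(r"^\s*" + _TICKS + r"(?:json)?\s*(.*?)\s*" + _TICKS + r"\s*$", re.DOTALL)


def parse_llm_json(text: str) -> Any:
    """Parse a completion as JSON, tolerating a code fence; fall back to the raw string."""
    match = _FENCE.match(text)
    candidate = match.group(1) if match else text
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return text  # keep the original text so nothing is lost


def build_webhook_response(chain_results: dict,
                           next_node: Optional[str] = "process_llm_output",
                           text: str = "Processing complete.") -> dict:
    response = {
        "text": text,
        "variables": {step: parse_llm_json(out) for step, out in chain_results.items()},
    }
    if next_node:
        response["nextNode"] = next_node  # nextNode is optional in the response
    return {"response": response}
```

Falling back to the raw string, rather than raising, keeps the webhook responsive when a model ignores the JSON instruction. A later Cognigy node can check whether a variable is an object or plain text.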

Complete Working Example

import os
import time
import hmac
import hashlib
import logging
import json
import httpx
import redis
import jinja2
import tiktoken
from fastapi import FastAPI, Request, HTTPException, Depends
from pydantic import BaseModel
from openai import AsyncOpenAI

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration
TENANT_DOMAIN = os.getenv("COGNIGY_TENANT")
CLIENT_ID = os.getenv("COGNIGY_CLIENT_ID")
CLIENT_SECRET = os.getenv("COGNIGY_CLIENT_SECRET")
WEBHOOK_SECRET = os.getenv("COGNIGY_WEBHOOK_SECRET")
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

app = FastAPI()
openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
redis_client = redis.from_url(REDIS_URL, decode_responses=True)
ENCODER = tiktoken.encoding_for_model("gpt-4o")
MAX_CONTEXT_TOKENS = 12000

class CognigyAuthManager:
    def __init__(self, tenant_domain: str, client_id: str, client_secret: str):
        self.base_url = f"https://{tenant_domain}.cognigy.ai/api/v2"
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None
        self.token_expiry = 0.0
        self.token_ttl_buffer = 60

    async def get_token(self) -> str:
        if self.access_token and time.time() < (self.token_expiry - self.token_ttl_buffer):
            return self.access_token
        url = f"{self.base_url}/oauth/token"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "api.dialog.read api.dialog.write api.session.read"
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(url, headers=headers, data=data)
            response.raise_for_status()
            payload = response.json()
            self.access_token = payload["access_token"]
            self.token_expiry = time.time() + payload["expires_in"]
            return self.access_token

auth = CognigyAuthManager(TENANT_DOMAIN, CLIENT_ID, CLIENT_SECRET)

class CognigyWebhookPayload(BaseModel):
    sessionId: str
    userId: str
    intent: dict
    dialogState: dict
    variables: dict

def verify_signature(payload_bytes: bytes, signature: str, secret: str) -> bool:
    expected = hmac.new(secret.encode(), payload_bytes, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest raises TypeError for non-ASCII str input
    return hmac.compare_digest(expected.encode(), signature.encode())

def truncate_context(messages: list[dict], max_tokens: int) -> list[dict]:
    messages = list(messages)  # don't mutate the caller's list
    counts = [len(ENCODER.encode(msg["content"])) for msg in messages]
    total = sum(counts)
    while total > max_tokens and len(messages) > 2:
        messages.pop(1)  # keep the system prompt (index 0) and the current turn (last)
        total -= counts.pop(1)
    return messages

async def cache_reasoning_step(session_id: str, step_name: str, data: dict, ttl: int = 3600):
    key = f"cognigy:reasoning:{session_id}:{step_name}"
    redis_client.setex(key, ttl, json.dumps(data))

async def execute_prompt_chain(session_id: str, user_message: str, history: list[dict]) -> dict:
    chain_results = {}

    # Step 1: clarify intent, with the truncated conversation history as context
    clarif_template = jinja2.Template("Analyze intent: {{ message }}. Return JSON with confidence and required parameters.")
    messages = [{"role": "system", "content": "You are a precise reasoning engine."}]
    messages.extend(history)
    messages.append({"role": "user", "content": clarif_template.render(message=user_message)})
    messages = truncate_context(messages, MAX_CONTEXT_TOKENS)
    resp1 = await openai_client.chat.completions.create(model="gpt-4o", messages=messages, temperature=0.1)
    chain_results["clarification"] = resp1.choices[0].message.content
    await cache_reasoning_step(session_id, "clarification", {"output": chain_results["clarification"], "tokens": resp1.usage.total_tokens})

    # Step 2: extract entities from the Step 1 output
    retrieval_template = jinja2.Template("Extract entities from: {{ text }}. Return JSON list.")
    resp2 = await openai_client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": retrieval_template.render(text=chain_results["clarification"])}], temperature=0.0
    )
    chain_results["retrieval"] = resp2.choices[0].message.content
    await cache_reasoning_step(session_id, "retrieval", {"output": chain_results["retrieval"], "tokens": resp2.usage.total_tokens})

    return chain_results

async def update_dialog_variables(session_id: str, variables: dict):
    url = f"{auth.base_url}/dialog/sessions/{session_id}/variables"
    token = await auth.get_token()
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    async with httpx.AsyncClient() as client:
        response = await client.put(url, headers=headers, json=variables)
        response.raise_for_status()

@app.post("/webhook/intercept", status_code=200)
async def intercept_intent(request: Request):
    signature = request.headers.get("X-Cognigy-Signature")
    if not signature:
        raise HTTPException(status_code=400, detail="Missing X-Cognigy-Signature header")
    body = await request.body()
    if not verify_signature(body, signature, WEBHOOK_SECRET):
        raise HTTPException(status_code=403, detail="Invalid webhook signature")
    
    payload = CognigyWebhookPayload.model_validate_json(body)
    try:
        # Simulate history fetch for brevity
        history = [{"role": "user", "content": "Previous turn"}]
        chain_results = await execute_prompt_chain(payload.sessionId, payload.intent.get("text", ""), history)
        await update_dialog_variables(payload.sessionId, {"llm_reasoning": chain_results})
        return {
            "response": {
                "text": "Processing complete.",
                "variables": chain_results,
                "nextNode": "process_llm_output"
            }
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Upstream API error: {e.response.status_code} - {e.response.text}")
        raise HTTPException(status_code=e.response.status_code, detail="API communication failed")
    except Exception:
        logger.exception("Middleware failure")
        raise HTTPException(status_code=500, detail="Internal processing error")

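Save the complete example as main.py and run the service with uvicorn main:app --host 0.0.0.0 --port 8000. Then configure Cognigy Studio to send dialog intents to https://your-middleware-url/webhook/intercept, using the same webhook secret as COGNIGY_WEBHOOK_SECRET.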
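To exercise the endpoint before wiring up Cognigy Studio, you can send a locally signed request. The sketch below uses only the standard library to build a POST whose X-Cognigy-Signature header is computed the way verify_signature expects. The URL, secret, and payload are placeholders, and build_signed_request is a hypothetical helper. The key detail is that the body is serialized once, and those exact bytes are both signed and sent.

```python
import hashlib
import hmac
import json
import urllib.request


def build_signed_request(url: str, payload: dict, secret: str) -> urllib.request.Request:
    """Serialize payload once, sign those exact bytes, and attach the signature header."""
    body = json.dumps(payload).encode()
    signature = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "X-Cognigy-Signature": signature},
    )


sample = {
    "sessionId": "test-session-1",
    "userId": "test-user",
    "intent": {"name": "order_status", "text": "Where is my order?"},
    "dialogState": {},
    "variables": {},
}
req = build_signed_request("http://localhost:8000/webhook/intercept", sample, "your-webhook-secret")
# With the service running and COGNIGY_WEBHOOK_SECRET set to the same value:
# with urllib.request.urlopen(req, timeout=60) as resp:
#     print(resp.status, resp.read().decode())
```

A request signed with a different secret should come back as HTTP 403. A correctly signed request passes verification and then runs the prompt chain and the Cognigy variable update, so use a sessionId that exists in your tenant if you want the full round trip to succeed.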

Common Errors & Debugging

Error: HTTP 401 Unauthorized

  • Cause: Invalid OAuth client credentials or expired token not refreshed in time.
  • Fix: Verify COGNIGY_CLIENT_ID and COGNIGY_CLIENT_SECRET match the Cognigy Security configuration. Ensure the token buffer in CognigyAuthManager accounts for clock drift between your server and the Cognigy auth provider.
  • Code fix: The get_token method automatically refreshes when time.time() >= token_expiry - token_ttl_buffer. Increase token_ttl_buffer to 120 seconds if network latency causes premature expiry.

Error: HTTP 403 Forbidden

  • Cause: Missing OAuth scopes or invalid webhook signature.
  • Fix: Confirm the client has api.dialog.read, api.dialog.write, and api.session.read scopes assigned. Verify WEBHOOK_SECRET matches the secret configured in Cognigy Studio webhook settings.
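  • Code fix: During development only, log the expected and received HMAC values to spot mismatches such as a secret with stray whitespace or a body that was re-serialized before hashing. Keep the comparison in hmac.compare_digest; a plain == comparison can leak timing information.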

Error: HTTP 429 Too Many Requests

  • Cause: Exceeding Cognigy API rate limits or LLM provider rate limits.
  • Fix: Implement exponential backoff with jitter. The following wrapper handles retry logic for 429 responses.
  • Code fix:
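import asyncio
import random
import httpx
import openai

def is_rate_limited(exc: Exception) -> bool:
    # The OpenAI SDK raises openai.RateLimitError; httpx raises HTTPStatusError via raise_for_status()
    if isinstance(exc, openai.RateLimitError):
        return True
    return isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 429

async def retry_on_429(func, *args, max_retries=3, base_delay=1.0, **kwargs):
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except (httpx.HTTPStatusError, openai.RateLimitError) as e:
            if not is_rate_limited(e) or attempt == max_retries - 1:
                raise
            # Exponential backoff (base_delay * 1, 2, 4, ...) plus up to 1s of random jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            logger.warning(f"Rate limited (429). Retrying in {delay:.2f}s")
            await asyncio.sleep(delay)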

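Wrap Cognigy API calls with retry_on_429, for example await retry_on_429(fetch_dialog_state, auth, session_id). The OpenAI SDK behaves differently: it raises openai.RateLimitError rather than httpx.HTTPStatusError and already retries 429 responses twice by default (set max_retries on AsyncOpenAI to change this). If you also wrap openai_client.chat.completions.create, make sure the wrapper catches openai.RateLimitError.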

Error: Context Window Overflow

  • Cause: Conversation history exceeds MAX_CONTEXT_TOKENS despite truncation logic.
  • Fix: Adjust MAX_CONTEXT_TOKENS to match your LLM model limit. Ensure the truncation function preserves the system prompt and critical recent turns.
  • Code fix: The truncate_context function removes turns from index 1 onward. If overflow persists, reduce MAX_CONTEXT_TOKENS or implement a sliding window that summarizes older turns before removal.
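The summarize-then-drop variant mentioned above can be sketched as follows. Instead of discarding the oldest turns, it replaces them with a single summary message. The summarize callable is an assumption: in practice it could be another chat completion call, while the test here uses a trivial stand-in. count_tokens plays the role of len(ENCODER.encode(text)). compact_history is a hypothetical name, not part of any library.

```python
from typing import Callable


def compact_history(
    messages: list[dict],
    max_tokens: int,
    count_tokens: Callable[[str], int],
    summarize: Callable[[list[dict]], str],
    keep_recent: int = 4,
) -> list[dict]:
    """Replace older turns with one summary message when the window is over budget.

    messages[0] is assumed to be the system prompt. If nothing is older than the
    last keep_recent messages, the list is returned unchanged and the caller
    should fall back to plain truncation.
    """
    if keep_recent < 1:
        raise ValueError("keep_recent must be at least 1")

    def total(msgs: list[dict]) -> int:
        return sum(count_tokens(m["content"]) for m in msgs)

    if total(messages) <= max_tokens or len(messages) <= keep_recent + 1:
        return list(messages)

    system, older, recent = messages[0], messages[1:-keep_recent], messages[-keep_recent:]
    summary = {"role": "system", "content": "Summary of earlier conversation: " + summarize(older)}
    compacted = [system, summary, *recent]

    # Still too long? Drop the oldest recent turns, always keeping the system
    # prompt, the summary, and the final (current) message.
    while total(compacted) > max_tokens and len(compacted) > 3:
        compacted.pop(2)
    return compacted
```

Summarizing costs an extra model call per compaction, so it pays off mainly in long sessions where facts from early turns, such as an order number, still matter later.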

Official References