Build an LLM Prompt Chaining Middleware for NICE Cognigy.AI with Python
What You Will Build
This tutorial builds a Python middleware service that intercepts Cognigy.AI dialog intents, chains multiple LLM prompts using Jinja2 templates, caches reasoning steps in Redis under session-specific keys with a TTL, and returns structured responses via webhook payloads. The service uses the Cognigy.AI Dialog API for state management and standard HTTP webhooks for intent interception. The implementation uses Python 3.10 with FastAPI, Redis, the OpenAI SDK, and explicit token-aware context truncation.
Prerequisites
- Cognigy.AI OAuth 2.0 Client Credentials client with api.dialog.read, api.dialog.write, and api.session.read scopes
- Cognigy.AI REST API v2
- Python 3.10 or higher
- Redis server running locally or remotely
pip install fastapi uvicorn httpx redis jinja2 openai tiktoken pydantic python-dotenv
Authentication Setup
Cognigy.AI requires OAuth 2.0 Client Credentials for programmatic API access. The middleware must fetch a bearer token, cache it in memory, and refresh it before expiration. The following class handles token acquisition and automatic refresh.
import time
import httpx
import logging
from typing import Optional

logger = logging.getLogger(__name__)

class CognigyAuthManager:
    def __init__(self, tenant_domain: str, client_id: str, client_secret: str):
        self.base_url = f"https://{tenant_domain}.cognigy.ai/api/v2"
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0
        self.token_ttl_buffer: int = 60  # Refresh 60 seconds before expiry

    async def get_token(self) -> str:
        if self.access_token and time.time() < (self.token_expiry - self.token_ttl_buffer):
            return self.access_token
        url = f"{self.base_url}/oauth/token"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "api.dialog.read api.dialog.write api.session.read",
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(url, headers=headers, data=data)
            response.raise_for_status()
            payload = response.json()
        self.access_token = payload["access_token"]
        self.token_expiry = time.time() + payload["expires_in"]
        logger.info("Cognigy OAuth token refreshed successfully.")
        return self.access_token
Token acquisition follows the standard OAuth 2.0 client credentials flow. A successful response returns a JSON body containing access_token, expires_in (a lifetime in seconds), and token_type. The middleware stores the token and converts expires_in into an absolute expiry timestamp. If the token endpoint returns HTTP 401, the client credentials are invalid. If a later Dialog API call returns HTTP 403, the token lacks the required scopes.
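To see the refresh rule in isolation, the sketch below replaces the HTTP call with an injectable fetch coroutine and an injectable clock. TokenCache, fetch, and clock are hypothetical names introduced for illustration; buffer_seconds mirrors token_ttl_buffer. The tutorial's CognigyAuthManager has no lock, so two concurrent webhook requests arriving near expiry can both refresh the token; the asyncio.Lock here is an addition, not part of the original class.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Tuple


class TokenCache:
    """Hypothetical, dependency-free version of CognigyAuthManager's refresh logic."""

    def __init__(
        self,
        fetch: Callable[[], Awaitable[Tuple[str, float]]],
        clock: Callable[[], float],
        buffer_seconds: float = 60,
    ):
        self._fetch = fetch            # returns (access_token, expires_in_seconds)
        self._clock = clock            # e.g. time.time in production
        self._buffer = buffer_seconds  # mirrors token_ttl_buffer
        self._token: Optional[str] = None
        self._expiry = 0.0
        self._lock = asyncio.Lock()    # only one refresh runs at a time

    async def get_token(self) -> str:
        async with self._lock:
            # Reuse the cached token until we enter the buffer window before expiry
            if self._token and self._clock() < self._expiry - self._buffer:
                return self._token
            token, expires_in = await self._fetch()
            self._token = token
            self._expiry = self._clock() + expires_in
            return token
```

With a 3600-second token fetched at t=1000, the cache keeps returning it until t=4540 (expiry minus the 60-second buffer) and fetches a new one on the first call after that.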
Implementation
Step 1: Verify Webhook Signature and Intercept Intent
Cognigy.AI signs outgoing webhook payloads with an HMAC-SHA256 digest to prevent spoofing. The middleware must verify the signature before processing the intent. Cognigy passes the signature in the X-Cognigy-Signature header.
import hmac
import hashlib
import logging
import os
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel

logger = logging.getLogger(__name__)
# Fail fast if the secret is unset: an empty HMAC key would accept forged signatures
WEBHOOK_SECRET = os.environ["COGNIGY_WEBHOOK_SECRET"]

app = FastAPI()

class CognigyWebhookPayload(BaseModel):
    sessionId: str
    userId: str
    intent: dict
    dialogState: dict
    variables: dict

def verify_signature(payload_bytes: bytes, signature: str, secret: str) -> bool:
    # Hex-encoded HMAC-SHA256 over the raw request body
    expected = hmac.new(secret.encode(), payload_bytes, hashlib.sha256).hexdigest()
    # Constant-time comparison prevents timing attacks
    return hmac.compare_digest(expected, signature)

@app.post("/webhook/intercept")
async def intercept_intent(request: Request):
    signature = request.headers.get("X-Cognigy-Signature")
    if not signature:
        raise HTTPException(status_code=400, detail="Missing X-Cognigy-Signature header")
    # Verify the raw bytes before parsing; re-serialized JSON would not match the digest
    body = await request.body()
    if not verify_signature(body, signature, WEBHOOK_SECRET):
        raise HTTPException(status_code=403, detail="Invalid webhook signature")
    payload = CognigyWebhookPayload.model_validate_json(body)
    logger.info(f"Intercepted intent: {payload.intent.get('name')} for session {payload.sessionId}")
    return payload
The Cognigy webhook request contains the full dialog snapshot. A valid request includes sessionId, userId, intent, dialogState, and variables. If the signature header is missing, the middleware returns HTTP 400; if verification fails, it returns HTTP 403 immediately, before the body is parsed. This prevents unauthorized payload injection.
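For local testing you need to produce the same digest Cognigy would send. The sketch below assumes, as this tutorial does, that the X-Cognigy-Signature header carries a bare lowercase hex HMAC-SHA256 of the raw request body; sign_body and the sample payload are hypothetical. It also shows why verification must run on the exact bytes received rather than on re-serialized JSON.

```python
import hashlib
import hmac
import json


def sign_body(body: bytes, secret: str) -> str:
    # Hex-encoded HMAC-SHA256 over the exact bytes that will be sent
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


def verify_signature(payload_bytes: bytes, signature: str, secret: str) -> bool:
    expected = sign_body(payload_bytes, secret)
    return hmac.compare_digest(expected, signature)


secret = "test-secret"
body = json.dumps({
    "sessionId": "s-1",
    "userId": "u-1",
    "intent": {"name": "order_status", "text": "Where is my order?"},
    "dialogState": {},
    "variables": {},
}).encode()
signature = sign_body(body, secret)

# Same data, different whitespace: the bytes change, so the digest no longer matches
reserialized = json.dumps(json.loads(body), separators=(",", ":")).encode()
```

To exercise the running service, POST body unchanged with the header X-Cognigy-Signature set to signature.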
Step 2: Retrieve Dialog State and Cache Context
The middleware fetches the authoritative dialog state from the Cognigy.AI Dialog API to ensure consistency. It then caches intermediate reasoning steps in Redis using a session-specific key with a TTL matching the dialog timeout.
import json
import httpx
import redis
from fastapi import HTTPException

async def fetch_dialog_state(auth: CognigyAuthManager, session_id: str) -> dict:
    url = f"{auth.base_url}/dialog/sessions/{session_id}"
    token = await auth.get_token()
    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
    async with httpx.AsyncClient() as client:
        response = await client.get(url, headers=headers)
        if response.status_code == 404:
            raise HTTPException(status_code=404, detail="Dialog session not found")
        response.raise_for_status()
        return response.json()

def cache_reasoning_step(redis_client: redis.Redis, session_id: str, step_name: str, data: dict, ttl: int = 3600) -> None:
    # redis.Redis is a synchronous client, so this is a plain function that callers
    # invoke without await (use redis.asyncio for a fully asynchronous variant)
    key = f"cognigy:reasoning:{session_id}:{step_name}"
    redis_client.setex(key, ttl, json.dumps(data))
    logger.debug(f"Cached reasoning step '{step_name}' for session {session_id}")
The Dialog API endpoint /api/v2/dialog/sessions/{sessionId} returns the complete session graph, including active nodes, variable values, and turn history. The Redis cache uses a structured key pattern cognigy:reasoning:{sessionId}:{stepName}. The TTL defaults to one hour but should align with your Cognigy session timeout configuration.
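Reading the cached steps back uses the same key pattern. The sketch below works against any client exposing redis-py's get(key), which returns str or None when the client is created with decode_responses=True. load_reasoning_steps, reasoning_key, and the in-memory FakeRedis are hypothetical; FakeRedis exists only so the example runs without a server.

```python
import json
import time
from typing import Optional


def reasoning_key(session_id: str, step_name: str) -> str:
    return f"cognigy:reasoning:{session_id}:{step_name}"


def load_reasoning_steps(client, session_id: str, step_names: list[str]) -> dict:
    """Return {step_name: decoded_dict} for steps still cached; expired or missing steps are skipped."""
    results = {}
    for name in step_names:
        raw = client.get(reasoning_key(session_id, name))
        if raw is not None:
            results[name] = json.loads(raw)
    return results


class FakeRedis:
    """Minimal in-memory stand-in for setex/get with expiry (illustration only)."""

    def __init__(self, clock=time.monotonic):
        self._data: dict[str, tuple[str, float]] = {}
        self._clock = clock

    def setex(self, key: str, ttl: int, value: str) -> None:
        self._data[key] = (value, self._clock() + ttl)

    def get(self, key: str) -> Optional[str]:
        item = self._data.get(key)
        if item is None or self._clock() >= item[1]:
            self._data.pop(key, None)
            return None
        return item[0]
```

Against a real server, redis-py's mget can fetch all step keys for a session in a single round trip instead of one get per step.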
Step 3: Execute Prompt Chain with Context Truncation
Prompt chaining requires sequential LLM calls where each step consumes the output of the previous step. The middleware uses Jinja2 for template rendering and tiktoken for token counting. Context window truncation prevents token overflow by dropping the oldest conversation turns until the prompt fits within MAX_CONTEXT_TOKENS.
import jinja2
import redis
import tiktoken
from openai import AsyncOpenAI

ENCODER = tiktoken.encoding_for_model("gpt-4o")
MAX_CONTEXT_TOKENS = 12000

def truncate_context(messages: list[dict], max_tokens: int) -> list[dict]:
    # Counts content tokens only; per-message formatting overhead is not included
    token_counts = [len(ENCODER.encode(msg["content"])) for msg in messages]
    current_tokens = sum(token_counts)
    while current_tokens > max_tokens and len(messages) > 2:
        # Drop the oldest message after the system prompt at index 0
        messages.pop(1)
        current_tokens -= token_counts.pop(1)
    return messages

async def execute_prompt_chain(
    openai_client: AsyncOpenAI,
    redis_client: redis.Redis,
    session_id: str,
    user_message: str,
    history: list[dict],
) -> dict:
    chain_results = {}

    # Step 1: Intent Clarification, with the truncated conversation history as context
    clarif_template = jinja2.Template("Analyze intent: {{ message }}. Return JSON with confidence and required parameters.")
    clarif_prompt = clarif_template.render(message=user_message)
    current_messages = [{"role": "system", "content": "You are a precise reasoning engine."}]
    current_messages.extend(history)
    current_messages.append({"role": "user", "content": clarif_prompt})
    current_messages = truncate_context(current_messages, MAX_CONTEXT_TOKENS)
    resp1 = await openai_client.chat.completions.create(
        model="gpt-4o", messages=current_messages, temperature=0.1
    )
    clarif_output = resp1.choices[0].message.content
    chain_results["clarification"] = clarif_output
    cache_reasoning_step(redis_client, session_id, "clarification", {"output": clarif_output, "tokens": resp1.usage.total_tokens})

    # Step 2: Knowledge Retrieval Simulation, consuming the Step 1 output
    retrieval_template = jinja2.Template("Extract entities from: {{ text }}. Return JSON list.")
    retrieval_prompt = retrieval_template.render(text=clarif_output)
    resp2 = await openai_client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": retrieval_prompt}], temperature=0.0
    )
    retrieval_output = resp2.choices[0].message.content
    chain_results["retrieval"] = retrieval_output
    cache_reasoning_step(redis_client, session_id, "retrieval", {"output": retrieval_output, "tokens": resp2.usage.total_tokens})
    return chain_results
The truncation function preserves the system prompt and the most recent exchanges while dropping older turns one message at a time. Each chain step records its token usage in Redis for budget monitoring. The OpenAI SDK handles the HTTP request to the LLM provider. If the provider returns HTTP 429, the middleware should retry with backoff before failing.
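Stripped of the OpenAI and Redis specifics, the chain is a fold: each step renders a template from the previous step's output, calls the model, and records the result. The sketch below makes that explicit with an injectable llm coroutine and a record callback (both hypothetical stand-ins for the OpenAI call and cache_reasoning_step), and it swaps Jinja2 for str.format so it runs on the standard library alone.

```python
from typing import Awaitable, Callable

Step = tuple[str, str]  # (step_name, template with a single {input} placeholder)


async def run_chain(
    steps: list[Step],
    initial_input: str,
    llm: Callable[[str], Awaitable[str]],
    record: Callable[[str, str], None],
) -> dict[str, str]:
    results: dict[str, str] = {}
    current = initial_input
    for name, template in steps:
        prompt = template.format(input=current)
        output = await llm(prompt)
        record(name, output)   # e.g. cache the step in Redis
        results[name] = output
        current = output       # the next step consumes this step's output
    return results


STEPS: list[Step] = [
    ("clarification", "Analyze intent: {input}. Return JSON with confidence and required parameters."),
    ("retrieval", "Extract entities from: {input}. Return JSON list."),
]
```

Keeping the steps as data makes it cheap to add, reorder, or A/B-test stages without touching the control flow.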
Step 4: Update Dialog State and Route Response
After the prompt chain completes, the middleware updates the Cognigy dialog variables via the Dialog API and constructs the webhook response payload that Cognigy expects.
import httpx
from fastapi import Request, HTTPException

async def update_dialog_variables(auth: CognigyAuthManager, session_id: str, variables: dict) -> dict:
    url = f"{auth.base_url}/dialog/sessions/{session_id}/variables"
    token = await auth.get_token()
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    async with httpx.AsyncClient() as client:
        response = await client.put(url, headers=headers, json=variables)
        response.raise_for_status()
        return response.json()

@app.post("/webhook/intercept", status_code=200)
async def intercept_intent(request: Request):
    # ... signature verification and payload parsing from Step 1 ...
    # auth, redis_client, and openai_client are the module-level instances
    # created in the complete example below
    try:
        dialog_state = await fetch_dialog_state(auth, payload.sessionId)
        history = dialog_state.get("conversationHistory", [])
        user_message = payload.intent.get("text", "")
        chain_results = await execute_prompt_chain(openai_client, redis_client, payload.sessionId, user_message, history)
        # Update Cognigy variables with chain results
        await update_dialog_variables(auth, payload.sessionId, {"llm_reasoning": chain_results})
        # Return Cognigy-compatible webhook response
        return {
            "response": {
                "text": "Processing complete.",
                "variables": chain_results,
                "nextNode": "process_llm_output",
            }
        }
    except HTTPException:
        # Pass deliberate HTTP errors (such as the 404 from fetch_dialog_state) through unchanged
        raise
    except httpx.HTTPStatusError as e:
        logger.error(f"Dialog API error: {e.response.status_code} - {e.response.text}")
        raise HTTPException(status_code=e.response.status_code, detail="Dialog API communication failed") from e
    except Exception as e:
        logger.exception("Unexpected middleware failure")
        raise HTTPException(status_code=500, detail="Internal processing error") from e
The Cognigy webhook response expects a JSON object containing response.text, response.variables, and optionally response.nextNode. The middleware writes the chain results to dialog variables so subsequent Cognigy nodes can consume them. The PUT request to /api/v2/dialog/sessions/{sessionId}/variables merges the provided key-value pairs into the active session.
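The chain prompts ask the model for JSON, but message.content is a plain string, so downstream Cognigy nodes would receive JSON-encoded text rather than objects. A hedged sketch of a normalizing step follows. parse_llm_json and build_webhook_response are hypothetical helpers; stripping a surrounding Markdown code fence is an assumption about how models sometimes wrap JSON; the response shape is the one this tutorial describes.

```python
import json
import re
from typing import Any, Optional

FENCE = "`" * 3  # three backticks, i.e. a Markdown code fence
_FENCED_JSON = re.compile("^" + FENCE + r"(?:json)?\s*(.*?)\s*" + FENCE + "$", re.DOTALL)


def parse_llm_json(text: Optional[str]) -> Any:
    """Return the parsed value if text is (possibly fenced) JSON, else text unchanged."""
    if text is None:
        return None
    candidate = text.strip()
    match = _FENCED_JSON.match(candidate)
    if match:
        candidate = match.group(1)
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return text


def build_webhook_response(
    chain_results: dict[str, Optional[str]],
    next_node: str = "process_llm_output",
) -> dict:
    return {
        "response": {
            "text": "Processing complete.",
            "variables": {name: parse_llm_json(out) for name, out in chain_results.items()},
            "nextNode": next_node,
        }
    }
```

Falling back to the raw string keeps the webhook from failing when the model ignores the JSON instruction; a stricter design could reject the turn instead.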
Complete Working Example
import os
import time
import hmac
import hashlib
import logging
import json
import httpx
import redis
import jinja2
import tiktoken
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from openai import AsyncOpenAI

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration
TENANT_DOMAIN = os.getenv("COGNIGY_TENANT")
CLIENT_ID = os.getenv("COGNIGY_CLIENT_ID")
CLIENT_SECRET = os.getenv("COGNIGY_CLIENT_SECRET")
WEBHOOK_SECRET = os.getenv("COGNIGY_WEBHOOK_SECRET")
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

app = FastAPI()
openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
redis_client = redis.from_url(REDIS_URL, decode_responses=True)
ENCODER = tiktoken.encoding_for_model("gpt-4o")
MAX_CONTEXT_TOKENS = 12000

class CognigyAuthManager:
    def __init__(self, tenant_domain: str, client_id: str, client_secret: str):
        self.base_url = f"https://{tenant_domain}.cognigy.ai/api/v2"
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None
        self.token_expiry = 0.0
        self.token_ttl_buffer = 60  # Refresh 60 seconds before expiry

    async def get_token(self) -> str:
        if self.access_token and time.time() < (self.token_expiry - self.token_ttl_buffer):
            return self.access_token
        url = f"{self.base_url}/oauth/token"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "api.dialog.read api.dialog.write api.session.read",
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(url, headers=headers, data=data)
            response.raise_for_status()
            payload = response.json()
        self.access_token = payload["access_token"]
        self.token_expiry = time.time() + payload["expires_in"]
        return self.access_token

auth = CognigyAuthManager(TENANT_DOMAIN, CLIENT_ID, CLIENT_SECRET)

class CognigyWebhookPayload(BaseModel):
    sessionId: str
    userId: str
    intent: dict
    dialogState: dict
    variables: dict

def verify_signature(payload_bytes: bytes, signature: str, secret: str) -> bool:
    expected = hmac.new(secret.encode(), payload_bytes, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)

def truncate_context(messages: list[dict], max_tokens: int) -> list[dict]:
    # Keep the system prompt at index 0 and drop the oldest messages after it
    token_counts = [len(ENCODER.encode(msg["content"])) for msg in messages]
    current_tokens = sum(token_counts)
    while current_tokens > max_tokens and len(messages) > 2:
        messages.pop(1)
        current_tokens -= token_counts.pop(1)
    return messages

def cache_reasoning_step(session_id: str, step_name: str, data: dict, ttl: int = 3600) -> None:
    # Plain function: redis_client is synchronous, so there is nothing to await
    key = f"cognigy:reasoning:{session_id}:{step_name}"
    redis_client.setex(key, ttl, json.dumps(data))

async def execute_prompt_chain(session_id: str, user_message: str, history: list[dict]) -> dict:
    chain_results = {}
    # Step 1: intent clarification, with the truncated history as context
    clarif_template = jinja2.Template("Analyze intent: {{ message }}. Return JSON with confidence and required parameters.")
    current_messages = [{"role": "system", "content": "You are a precise reasoning engine."}]
    current_messages.extend(history)
    current_messages.append({"role": "user", "content": clarif_template.render(message=user_message)})
    current_messages = truncate_context(current_messages, MAX_CONTEXT_TOKENS)
    resp1 = await openai_client.chat.completions.create(
        model="gpt-4o", messages=current_messages, temperature=0.1
    )
    chain_results["clarification"] = resp1.choices[0].message.content
    cache_reasoning_step(session_id, "clarification", {"output": chain_results["clarification"], "tokens": resp1.usage.total_tokens})
    # Step 2: entity extraction from the Step 1 output
    retrieval_template = jinja2.Template("Extract entities from: {{ text }}. Return JSON list.")
    resp2 = await openai_client.chat.completions.create(
        model="gpt-4o", messages=[{"role": "user", "content": retrieval_template.render(text=chain_results["clarification"])}], temperature=0.0
    )
    chain_results["retrieval"] = resp2.choices[0].message.content
    cache_reasoning_step(session_id, "retrieval", {"output": chain_results["retrieval"], "tokens": resp2.usage.total_tokens})
    return chain_results

async def update_dialog_variables(session_id: str, variables: dict) -> None:
    url = f"{auth.base_url}/dialog/sessions/{session_id}/variables"
    token = await auth.get_token()
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    async with httpx.AsyncClient() as client:
        response = await client.put(url, headers=headers, json=variables)
        response.raise_for_status()

@app.post("/webhook/intercept", status_code=200)
async def intercept_intent(request: Request):
    signature = request.headers.get("X-Cognigy-Signature")
    if not signature:
        raise HTTPException(status_code=400, detail="Missing X-Cognigy-Signature header")
    body = await request.body()
    if not verify_signature(body, signature, WEBHOOK_SECRET):
        raise HTTPException(status_code=403, detail="Invalid webhook signature")
    payload = CognigyWebhookPayload.model_validate_json(body)
    try:
        # Simulate history fetch for brevity (fetch_dialog_state from Step 2 replaces this)
        history = [{"role": "user", "content": "Previous turn"}]
        chain_results = await execute_prompt_chain(payload.sessionId, payload.intent.get("text", ""), history)
        await update_dialog_variables(payload.sessionId, {"llm_reasoning": chain_results})
        return {
            "response": {
                "text": "Processing complete.",
                "variables": chain_results,
                "nextNode": "process_llm_output",
            }
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        raise HTTPException(status_code=e.response.status_code, detail="API communication failed") from e
    except Exception as e:
        logger.exception("Middleware failure")
        raise HTTPException(status_code=500, detail="Internal processing error") from e
Save the file as main.py and run the service with uvicorn main:app --host 0.0.0.0 --port 8000. Then configure Cognigy Studio to send dialog intents to https://your-middleware-url/webhook/intercept.
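The complete example reads its settings with os.getenv, so a missing COGNIGY_WEBHOOK_SECRET only surfaces on the first request, as an AttributeError when verify_signature calls .encode() on None. A small fail-fast loader moves that failure to startup. Settings, REQUIRED_VARS, and load_settings are hypothetical names; the variable names and the Redis default are the ones the example already uses.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED_VARS = (
    "COGNIGY_TENANT",
    "COGNIGY_CLIENT_ID",
    "COGNIGY_CLIENT_SECRET",
    "COGNIGY_WEBHOOK_SECRET",
    "OPENAI_API_KEY",
)


@dataclass(frozen=True)
class Settings:
    tenant: str
    client_id: str
    client_secret: str
    webhook_secret: str
    openai_api_key: str
    redis_url: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    # Treat unset and empty alike: an empty webhook secret is still a valid HMAC key
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing required environment variables: " + ", ".join(missing))
    return Settings(
        tenant=env["COGNIGY_TENANT"],
        client_id=env["COGNIGY_CLIENT_ID"],
        client_secret=env["COGNIGY_CLIENT_SECRET"],
        webhook_secret=env["COGNIGY_WEBHOOK_SECRET"],
        openai_api_key=env["OPENAI_API_KEY"],
        redis_url=env.get("REDIS_URL", "redis://localhost:6379/0"),
    )
```

Since python-dotenv is already in the install list, calling load_dotenv() from the dotenv package before load_settings() lets you keep these values in a local .env file during development.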
Common Errors & Debugging
Error: HTTP 401 Unauthorized
- Cause: Invalid OAuth client credentials, or an expired token that was not refreshed in time.
- Fix: Verify that COGNIGY_CLIENT_ID and COGNIGY_CLIENT_SECRET match the Cognigy Security configuration. Ensure the token buffer in CognigyAuthManager accounts for clock drift between your server and the Cognigy auth provider.
- Code fix: The get_token method refreshes automatically once time.time() >= token_expiry - token_ttl_buffer. Increase token_ttl_buffer to 120 seconds if network latency causes tokens to expire before they are refreshed.
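Clock drift is not the only way a cached token goes stale; it can also be revoked or rotated before expires_in elapses. A common complement to the buffer is to drop the cached token and retry exactly once when an API call returns 401. The sketch below uses hypothetical names (call_with_token_refresh, invalidate), and UnauthorizedError is a placeholder for an httpx.HTTPStatusError whose response status is 401.

```python
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class UnauthorizedError(Exception):
    """Placeholder for an httpx.HTTPStatusError whose response.status_code is 401."""


async def call_with_token_refresh(
    call: Callable[[str], Awaitable[T]],
    get_token: Callable[[], Awaitable[str]],
    invalidate: Callable[[], None],
) -> T:
    try:
        return await call(await get_token())
    except UnauthorizedError:
        # Drop the cached token so get_token() hits the token endpoint, then retry once
        invalidate()
        return await call(await get_token())
```

With CognigyAuthManager, invalidate can simply set auth.access_token = None, and the except clause would catch httpx.HTTPStatusError and re-raise unless e.response.status_code == 401. A second 401 propagates, which usually means the credentials themselves are wrong.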
Error: HTTP 403 Forbidden
- Cause: Missing OAuth scopes or an invalid webhook signature.
- Fix: Confirm the client has the api.dialog.read, api.dialog.write, and api.session.read scopes assigned. Verify that WEBHOOK_SECRET matches the secret configured in the Cognigy Studio webhook settings.
- Code fix: Log the expected and received HMAC values during development to identify encoding mismatches. Keep hmac.compare_digest for the comparison itself to prevent timing attacks.
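When a signature keeps failing, the usual culprits are encoding differences: hex versus base64, a scheme prefix such as sha256=, or a trailing newline added in transit. The hypothetical helper below reports which variant, if any, matches a received header. This tutorial's verify_signature only expects bare lowercase hex, so the other candidates are assumptions to test against, not documented Cognigy formats. Use it only in development.

```python
import base64
import hashlib
import hmac
from typing import Optional


def diagnose_signature(body: bytes, received: str, secret: str) -> Optional[str]:
    """Return a description of the matching signature variant, or None."""
    key = secret.encode()
    digest = hmac.new(key, body, hashlib.sha256).digest()
    value = received.strip()
    prefixed = value.lower().startswith("sha256=")
    if prefixed:
        value = value[len("sha256="):]
    suffix = " with sha256= prefix" if prefixed else ""
    # Hex is case-insensitive; base64 is not
    if hmac.compare_digest(digest.hex(), value.lower()):
        return "hex" + suffix
    if hmac.compare_digest(base64.b64encode(digest).decode(), value):
        return "base64" + suffix
    # Was the body signed before a trailing newline was appended?
    if body.endswith(b"\n"):
        trimmed = hmac.new(key, body.rstrip(b"\n"), hashlib.sha256).hexdigest()
        if hmac.compare_digest(trimmed, value.lower()):
            return "hex over body without trailing newline"
    return None
```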
Error: HTTP 429 Too Many Requests
- Cause: Exceeding Cognigy API rate limits or LLM provider rate limits.
- Fix: Implement exponential backoff with jitter. The following wrapper handles retry logic for 429 responses.
- Code fix:
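import asyncio
import random
import httpx
import openai

async def retry_on_429(func, *args, max_retries=3, base_delay=1.0, **kwargs):
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except (httpx.HTTPStatusError, openai.RateLimitError) as e:
            # Cognigy calls raise httpx.HTTPStatusError via raise_for_status();
            # the OpenAI SDK raises openai.RateLimitError. Both expose e.response.
            if e.response.status_code != 429 or attempt == max_retries - 1:
                raise
            # Exponential backoff plus random jitter to avoid synchronized retries
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            logger.warning(f"Rate limited (429). Retrying in {delay:.2f}s")
            await asyncio.sleep(delay)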
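Apply retry_on_429 to openai_client.chat.completions.create and to the Cognigy API calls, for example await retry_on_429(openai_client.chat.completions.create, model="gpt-4o", messages=messages). The OpenAI SDK already retries 429 responses twice by default with a short backoff, so this wrapper adds retries on top of those.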
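Because the backoff sleeps for real, the wrapper is awkward to unit-test. One option, sketched below with hypothetical names, is to inject the sleep function, the jitter source, and a predicate that decides whether an exception is retryable; a test can then record the delays instead of waiting. In production, is_retryable would check for httpx.HTTPStatusError with status 429 or openai.RateLimitError.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    is_retryable: Callable[[BaseException], bool],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
    jitter: Callable[[float, float], float] = random.uniform,
) -> T:
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception as exc:
            if not is_retryable(exc) or attempt == max_retries - 1:
                raise
            await sleep(base_delay * (2 ** attempt) + jitter(0, base_delay))
    # Only reached when max_retries < 1
    raise ValueError("max_retries must be at least 1")
```

Wrap calls with arguments in a lambda or functools.partial, since func takes no arguments here.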
Error: Context Window Overflow
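- Cause: The prompt exceeds the model's context window despite the truncation logic, for example because the system prompt plus the latest message alone are too long, or because truncate_context counts content tokens only and ignores per-message formatting overhead.
- Fix: Set MAX_CONTEXT_TOKENS below your model's context limit, leaving headroom for that overhead and for the completion. Ensure the truncation function preserves the system prompt and critical recent turns.
- Code fix: The truncate_context function removes messages from index 1 onward. If overflow persists, reduce MAX_CONTEXT_TOKENS or implement a sliding window that summarizes older turns before removal.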
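A sketch of the summarize-then-drop variant: instead of discarding the oldest turns, fold them into a single summary message. The token counter and summarizer are injected (hypothetical count_tokens and summarize callables), so in the middleware you could pass lambda s: len(ENCODER.encode(s)) and an LLM call; the test uses word counts and a trivial summarizer. Carrying the summary as a second system-role message is a design choice, not part of the tutorial.

```python
from typing import Awaitable, Callable

Message = dict[str, str]


async def compact_history(
    messages: list[Message],
    max_tokens: int,
    count_tokens: Callable[[str], int],
    summarize: Callable[[list[Message]], Awaitable[str]],
    summary_reserve: int = 200,
) -> list[Message]:
    """Keep messages[0] (system prompt) and the newest turns; fold older turns into a summary."""

    def total(msgs: list[Message]) -> int:
        return sum(count_tokens(m["content"]) for m in msgs)

    if len(messages) <= 2 or total(messages) <= max_tokens:
        return messages

    system, turns = messages[0], list(messages[1:])
    budget = max_tokens - summary_reserve - count_tokens(system["content"])
    older: list[Message] = []
    # Move the oldest turns out until the rest fits; the latest message always stays
    while len(turns) > 1 and total(turns) > budget:
        older.append(turns.pop(0))

    summary = await summarize(older)
    compacted = [
        system,
        {"role": "system", "content": f"Summary of earlier conversation: {summary}"},
        *turns,
    ]
    # If the summary overshoots its reserve, drop further old turns (unsummarized)
    while len(compacted) > 3 and total(compacted) > max_tokens:
        compacted.pop(2)
    return compacted
```

Unlike truncate_context, this version does not mutate its input list, and it costs one extra LLM call only when the history actually overflows.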