Offloading Heavy Database Queries in NICE Cognigy Webhooks Using Python Celery

What You Will Build

  • A FastAPI webhook endpoint that accepts a NICE Cognigy.AI trigger, dispatches a background task to Celery, and returns an immediate HTTP 202 Accepted response to prevent timeout.
  • A Celery worker that executes a heavy database aggregation query, then updates the Cognigy.AI conversation variables using the REST API upon completion.
  • Python 3.10+ implementation using FastAPI, Celery, httpx, and Pydantic with production-grade error handling and OAuth2 token management.

Prerequisites

  • NICE Cognigy.AI tenant with an OAuth2 client credentials grant configured.
  • Required OAuth scope: cognigy:conversation:write.
  • Python 3.10 or higher with a virtual environment.
  • Redis server running on localhost:6379 (or update the broker URL).
  • Dependencies: pip install fastapi uvicorn celery httpx pydantic redis

Authentication Setup

NICE Cognigy.AI uses OAuth2 client credentials flow for server-to-server API access. The token expires after one hour. The following implementation caches the token and refreshes it only when necessary.

import time
import httpx
from typing import Optional

class CognigyAuthClient:
    def __init__(self, tenant: str, client_id: str, client_secret: str, scope: str = "cognigy:conversation:write"):
        self.base_url = f"https://{tenant}.cognigy.ai"
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self._token: Optional[str] = None
        self._expires_at: float = 0.0

    def get_token(self) -> str:
        # _expires_at already includes the 300-second safety margin (applied below).
        if self._token and time.time() < self._expires_at:
            return self._token

        url = f"{self.base_url}/api/v1/oauth/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": self.scope
        }

        with httpx.Client(timeout=10.0) as client:
            response = client.post(url, data=payload)
            response.raise_for_status()

            token_data = response.json()
            self._token = token_data["access_token"]
            self._expires_at = time.time() + token_data["expires_in"] - 300
            return self._token

The token cache subtracts 300 seconds from the expiration window to provide a safety margin. The raise_for_status call converts HTTP 4xx and 5xx responses into httpx.HTTPStatusError exceptions, which you must catch in the calling layer.
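The caching rule can be checked in isolation. The following is a minimal sketch, not part of the client above: ExpiringTokenCache, its fetch callback, and the injectable clock are hypothetical names introduced here so the refresh boundary can be exercised without waiting an hour or calling a real token endpoint.

```python
import time
from typing import Callable, Optional, Tuple


class ExpiringTokenCache:
    """Caches a token and refetches it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],  # returns (token, lifetime in seconds)
        margin: float = 300.0,
        clock: Callable[[], float] = time.time,
    ):
        self._fetch = fetch
        self._margin = margin
        self._clock = clock
        self._token: Optional[str] = None
        self._refresh_at = 0.0

    def get(self) -> str:
        if self._token is not None and self._clock() < self._refresh_at:
            return self._token
        token, lifetime = self._fetch()
        self._token = token
        # The safety margin is applied once, when the deadline is stored.
        self._refresh_at = self._clock() + lifetime - self._margin
        return token


# Demonstration with a fake clock and a fake token endpoint.
now = [0.0]
calls = []


def fake_fetch() -> Tuple[str, float]:
    calls.append(now[0])
    return f"token-{len(calls)}", 3600.0


cache = ExpiringTokenCache(fake_fetch, margin=300.0, clock=lambda: now[0])
first = cache.get()    # no token yet, so this fetches
now[0] = 3299.0
second = cache.get()   # one second before the refresh deadline: cached
now[0] = 3300.0
third = cache.get()    # deadline reached: fetches again
```

With a one-hour lifetime and a 300-second margin, the cached token is reused for 3300 seconds and replaced on the first call after that.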

Implementation

Step 1: Webhook Handler and Task Dispatch

Cognigy.AI webhooks expect a fast response. If the webhook handler blocks on a heavy database query, Cognigy will time out and mark the intent as failed. The handler extracts the conversation identifier, dispatches the Celery task, and returns immediately.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, Any

app = FastAPI()

class CognigyWebhookPayload(BaseModel):
    conversationId: str
    userId: str
    variables: Dict[str, Any]

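@app.post("/webhook/cognigy-trigger", status_code=202)
def handle_cognigy_trigger(payload: CognigyWebhookPayload) -> dict:
    # Pydantic rejects a missing conversationId with 422; this catches an empty string.
    if not payload.conversationId:
        raise HTTPException(status_code=400, detail="conversationId is required")

    # heavy_db_task is the Celery task defined in Step 2.
    # delay() returns an AsyncResult; only its string id is JSON-serializable.
    task = heavy_db_task.delay(
        conversation_id=payload.conversationId,
        query_params=payload.variables
    )

    return {
        "status": "accepted",
        "taskId": task.id,
        "message": "Database query dispatched to background worker"
    }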

Expected Cognigy.AI Request:

{
  "conversationId": "conv_8f4a2c1d",
  "userId": "usr_9b3e7f2a",
  "variables": {
    "customer_segment": "enterprise",
    "date_range": "2024-01-01_to_2024-12-31",
    "aggregation_type": "revenue"
  }
}

Expected Webhook Response:

{
  "status": "accepted",
  "taskId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "message": "Database query dispatched to background worker"
}

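FastAPI validates the payload structure against the Pydantic model before the handler runs, so a request that omits conversationId is rejected with HTTP 422. The explicit check in the handler covers the remaining case of an empty string and returns HTTP 400. The delay method queues the task to Redis and returns an AsyncResult immediately, without waiting for the worker.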
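The variables object from the sample request reaches the worker unchanged as query_params, so a malformed value would only surface after the task has been queued. A minimal sketch of checking one field before dispatch follows. It assumes date_range always uses the YYYY-MM-DD_to_YYYY-MM-DD shape shown in the sample request; parse_date_range is a hypothetical helper, not part of the handler above.

```python
from datetime import date
from typing import Tuple


def parse_date_range(value: str) -> Tuple[date, date]:
    """Split 'YYYY-MM-DD_to_YYYY-MM-DD' into (start, end) dates."""
    start_text, separator, end_text = value.partition("_to_")
    if not separator:
        raise ValueError(f"expected '<start>_to_<end>', got {value!r}")
    # date.fromisoformat raises ValueError on anything that is not a valid date.
    start = date.fromisoformat(start_text)
    end = date.fromisoformat(end_text)
    if start > end:
        raise ValueError("start date is after end date")
    return start, end


start, end = parse_date_range("2024-01-01_to_2024-12-31")
```

Calling a helper like this inside the handler and converting ValueError into an HTTP 400 would reject bad input before it consumes a worker slot and three task-level retries.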

Step 2: Celery Worker and Database Execution

Celery handles the background execution. The worker receives the task parameters, executes the heavy database query, and captures the result. This example uses SQLite for runnability, but the structure matches production PostgreSQL or MySQL patterns.

import time
import sqlite3
import logging
from celery import Celery

logger = logging.getLogger(__name__)

celery_app = Celery(
    "cognigy_worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def heavy_db_task(self, conversation_id: str, query_params: dict) -> None:
    logger.info("Starting heavy DB task for conversation: %s", conversation_id)
    
    try:
        result = execute_heavy_database_query(query_params)
        logger.info("Query completed. Updating conversation variables.")
        # auth_client is the shared CognigyAuthClient instance created at module level.
        update_conversation_variables(conversation_id, result, auth_client)
    except Exception as exc:
        logger.error("Task failed for conversation %s: %s", conversation_id, exc)
        raise self.retry(exc=exc)

def execute_heavy_database_query(params: dict) -> str:
    conn = sqlite3.connect(":memory:")
    cursor = conn.cursor()
    
    cursor.execute("CREATE TABLE transactions (id INTEGER PRIMARY KEY, amount REAL, segment TEXT)")
    cursor.executemany(
        "INSERT INTO transactions (amount, segment) VALUES (?, ?)",
        [(i * 1.5, "enterprise") for i in range(100000)]
    )
    conn.commit()
    
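    segment = params.get("customer_segment", "enterprise")
    # Bind the segment as a parameter; it originates from the webhook payload.
    query = "SELECT COUNT(*), SUM(amount) FROM transactions WHERE segment = ?"
    
    time.sleep(2.0)  # Simulates a long-running aggregation.
    
    count, total = cursor.execute(query, (segment,)).fetchone()
    conn.close()
    # SUM returns NULL (None) when no rows match, so fall back to 0.
    return f"count:{int(count)}|total:{(total or 0.0):.2f}"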

The bind=True parameter exposes the task instance for retry logic. The max_retries=3 and default_retry_delay=60 parameters prevent infinite retry loops. The database function simulates a heavy aggregation with time.sleep and a realistic query pattern. Replace the SQLite block with your production ORM or driver when deploying.
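Because customer_segment arrives from the webhook payload, the way it reaches the SQL statement matters. The sketch below uses a tiny in-memory table (the rows are illustrative, not from the tutorial's dataset) to show that a bound parameter treats a hostile value as a plain string rather than as SQL. The aggregate function is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE transactions (id INTEGER PRIMARY KEY, amount REAL, segment TEXT)")
conn.executemany(
    "INSERT INTO transactions (amount, segment) VALUES (?, ?)",
    [(10.0, "enterprise"), (20.0, "enterprise"), (5.0, "smb")],
)


def aggregate(segment: str):
    """Return (row count, total amount) for one segment."""
    row = conn.execute(
        # The ? placeholder keeps the value out of the SQL text entirely.
        # COALESCE turns the NULL that SUM yields for zero rows into 0.0.
        "SELECT COUNT(*), COALESCE(SUM(amount), 0.0) FROM transactions WHERE segment = ?",
        (segment,),
    ).fetchone()
    return row[0], row[1]


normal = aggregate("enterprise")       # matches the two enterprise rows
hostile = aggregate("x' OR '1'='1")    # matches nothing: compared as a literal string
```

The same placeholder approach carries over to PostgreSQL and MySQL drivers, although the placeholder syntax differs by driver (for example %s instead of ?).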

Step 3: Updating Conversation Variables via REST API

Once the query completes, the worker must push the result back to Cognigy.AI. The following function handles the REST API call, implements exponential backoff for HTTP 429 rate limits, and branches on the response status code.

import httpx
import time
import logging

logger = logging.getLogger(__name__)

def update_conversation_variables(conversation_id: str, result_data: str, auth_client: CognigyAuthClient) -> None:
    url = f"{auth_client.base_url}/api/v1/conversations/{conversation_id}/variables"
    token = auth_client.get_token()
    
    payload = {
        "variables": [
            {"name": "db_query_result", "value": result_data},
            {"name": "db_query_timestamp", "value": str(int(time.time()))}
        ]
    }
    
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    
    max_retries = 4
    base_delay = 2.0
    
    for attempt in range(max_retries):
        with httpx.Client(timeout=15.0) as client:
            response = client.patch(url, json=payload, headers=headers)
            
            if response.status_code == 200:
                logger.info("Successfully updated variables for conversation %s", conversation_id)
                return
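            elif response.status_code == 401:
                logger.warning("Token rejected. Refreshing and retrying.")
                # Clear the cached token so get_token() requests a new one
                # instead of returning the token that was just rejected.
                auth_client._token = None
                token = auth_client.get_token()
                headers["Authorization"] = f"Bearer {token}"
                continue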
            elif response.status_code == 403:
                logger.error("Forbidden. Check OAuth scope: cognigy:conversation:write")
                raise PermissionError("Missing required OAuth scope")
            elif response.status_code == 429:
                retry_after = float(response.headers.get("Retry-After", base_delay * (2 ** attempt)))
                logger.warning("Rate limited. Retrying after %.2f seconds", retry_after)
                time.sleep(retry_after)
                continue
            elif response.status_code >= 500:
                logger.warning("Server error %d. Retrying in %.2f seconds", response.status_code, base_delay * (2 ** attempt))
                time.sleep(base_delay * (2 ** attempt))
                continue
            else:
                logger.error("Unexpected status %d: %s", response.status_code, response.text)
                raise httpx.HTTPStatusError(f"HTTP {response.status_code}", request=response.request, response=response)
                
    logger.error("Exhausted retries for conversation %s", conversation_id)
    raise ConnectionError("Failed to update conversation variables after retries")

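The function handles HTTP 401 by refreshing the token and retrying immediately. HTTP 403 fails fast because it indicates a scope misconfiguration. HTTP 429 respects the Retry-After header or falls back to exponential backoff. HTTP 5xx errors trigger delayed retries. If all attempts fail, the function raises an exception; heavy_db_task catches it, logs it, and schedules a task-level retry through self.retry. Because that handler catches Exception broadly, the PermissionError raised for HTTP 403 is also retried at the task level. Narrow the except clause if a scope misconfiguration should fail the task outright.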
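The wait times used for HTTP 429 and 5xx responses can be worked out separately from the HTTP call. The following sketch reproduces the base_delay * (2 ** attempt) schedule and the Retry-After preference. The retry_delay function and its max_delay cap are assumptions added for illustration; the function above has no cap.

```python
from typing import Optional


def retry_delay(
    attempt: int,
    retry_after: Optional[str] = None,
    base_delay: float = 2.0,
    max_delay: float = 60.0,
) -> float:
    """Return the seconds to wait before the next attempt (attempt starts at 0)."""
    delay = base_delay * (2 ** attempt)
    if retry_after is not None:
        try:
            # Retry-After may also be an HTTP date. This sketch handles only
            # the numeric form and keeps the backoff value otherwise.
            delay = max(0.0, float(retry_after))
        except ValueError:
            pass
    return min(delay, max_delay)


schedule = [retry_delay(n) for n in range(4)]
from_header = retry_delay(0, "7")
from_date_header = retry_delay(1, "Wed, 21 Oct 2015 07:28:00 GMT")
capped = retry_delay(10)
```

With max_retries = 4 and base_delay = 2.0, the fallback schedule is 2, 4, 8, and 16 seconds, so a worker can spend up to 30 seconds sleeping inside one task attempt before the task-level retry takes over.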

Complete Working Example

Save the following script as cognigy_worker.py. Run the FastAPI server with uvicorn cognigy_worker:app --host 0.0.0.0 --port 8000. Run the Celery worker separately with celery -A cognigy_worker.celery_app worker --loglevel=info.

import time
import sqlite3
import logging
import httpx
from typing import Dict, Any, Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery import Celery

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

# --- Configuration ---
COGNIGY_TENANT = "your-tenant"
COGNIGY_CLIENT_ID = "your-client-id"
COGNIGY_CLIENT_SECRET = "your-client-secret"

# --- OAuth Client ---
class CognigyAuthClient:
    def __init__(self, tenant: str, client_id: str, client_secret: str, scope: str = "cognigy:conversation:write"):
        self.base_url = f"https://{tenant}.cognigy.ai"
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self._token: Optional[str] = None
        self._expires_at: float = 0.0

    def get_token(self) -> str:
        # _expires_at already includes the 300-second safety margin (applied below).
        if self._token and time.time() < self._expires_at:
            return self._token

        url = f"{self.base_url}/api/v1/oauth/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": self.scope
        }

        with httpx.Client(timeout=10.0) as client:
            response = client.post(url, data=payload)
            response.raise_for_status()

            token_data = response.json()
            self._token = token_data["access_token"]
            self._expires_at = time.time() + token_data["expires_in"] - 300
            return self._token

auth_client = CognigyAuthClient(COGNIGY_TENANT, COGNIGY_CLIENT_ID, COGNIGY_CLIENT_SECRET)

# --- Celery Configuration ---
celery_app = Celery(
    "cognigy_worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def heavy_db_task(self, conversation_id: str, query_params: dict) -> None:
    logger.info("Starting heavy DB task for conversation: %s", conversation_id)
    
    try:
        result = execute_heavy_database_query(query_params)
        logger.info("Query completed. Updating conversation variables.")
        update_conversation_variables(conversation_id, result, auth_client)
    except Exception as exc:
        logger.error("Task failed for conversation %s: %s", conversation_id, exc)
        raise self.retry(exc=exc)

def execute_heavy_database_query(params: dict) -> str:
    conn = sqlite3.connect(":memory:")
    cursor = conn.cursor()
    
    cursor.execute("CREATE TABLE transactions (id INTEGER PRIMARY KEY, amount REAL, segment TEXT)")
    cursor.executemany(
        "INSERT INTO transactions (amount, segment) VALUES (?, ?)",
        [(i * 1.5, "enterprise") for i in range(100000)]
    )
    conn.commit()
    
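    segment = params.get("customer_segment", "enterprise")
    # Bind the segment as a parameter; it originates from the webhook payload.
    query = "SELECT COUNT(*), SUM(amount) FROM transactions WHERE segment = ?"
    
    time.sleep(2.0)  # Simulates a long-running aggregation.
    
    count, total = cursor.execute(query, (segment,)).fetchone()
    conn.close()
    # SUM returns NULL (None) when no rows match, so fall back to 0.
    return f"count:{int(count)}|total:{(total or 0.0):.2f}"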

def update_conversation_variables(conversation_id: str, result_data: str, auth: CognigyAuthClient) -> None:
    url = f"{auth.base_url}/api/v1/conversations/{conversation_id}/variables"
    token = auth.get_token()
    
    payload = {
        "variables": [
            {"name": "db_query_result", "value": result_data},
            {"name": "db_query_timestamp", "value": str(int(time.time()))}
        ]
    }
    
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    
    max_retries = 4
    base_delay = 2.0
    
    for attempt in range(max_retries):
        with httpx.Client(timeout=15.0) as client:
            response = client.patch(url, json=payload, headers=headers)
            
            if response.status_code == 200:
                logger.info("Successfully updated variables for conversation %s", conversation_id)
                return
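            elif response.status_code == 401:
                logger.warning("Token rejected. Refreshing and retrying.")
                # Clear the cached token so get_token() requests a new one
                # instead of returning the token that was just rejected.
                auth._token = None
                token = auth.get_token()
                headers["Authorization"] = f"Bearer {token}"
                continue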
            elif response.status_code == 403:
                logger.error("Forbidden. Check OAuth scope: cognigy:conversation:write")
                raise PermissionError("Missing required OAuth scope")
            elif response.status_code == 429:
                retry_after = float(response.headers.get("Retry-After", base_delay * (2 ** attempt)))
                logger.warning("Rate limited. Retrying after %.2f seconds", retry_after)
                time.sleep(retry_after)
                continue
            elif response.status_code >= 500:
                logger.warning("Server error %d. Retrying in %.2f seconds", response.status_code, base_delay * (2 ** attempt))
                time.sleep(base_delay * (2 ** attempt))
                continue
            else:
                logger.error("Unexpected status %d: %s", response.status_code, response.text)
                raise httpx.HTTPStatusError(f"HTTP {response.status_code}", request=response.request, response=response)
                
    logger.error("Exhausted retries for conversation %s", conversation_id)
    raise ConnectionError("Failed to update conversation variables after retries")

# --- FastAPI Webhook ---
app = FastAPI()

class CognigyWebhookPayload(BaseModel):
    conversationId: str
    userId: str
    variables: Dict[str, Any]

@app.post("/webhook/cognigy-trigger", status_code=202)
def handle_cognigy_trigger(payload: CognigyWebhookPayload) -> dict:
    # Pydantic rejects a missing conversationId with 422; this catches an empty string.
    if not payload.conversationId:
        raise HTTPException(status_code=400, detail="conversationId is required")

    # delay() returns an AsyncResult; only its string id is JSON-serializable.
    task = heavy_db_task.delay(
        conversation_id=payload.conversationId,
        query_params=payload.variables
    )

    return {
        "status": "accepted",
        "taskId": task.id,
        "message": "Database query dispatched to background worker"
    }

Deploy the FastAPI server behind a reverse proxy or load balancer. Configure your NICE Cognigy.AI webhook node to POST to https://your-domain.com/webhook/cognigy-trigger. Ensure the worker process runs continuously to consume tasks from the Redis broker.

Common Errors and Debugging

Error: HTTP 401 Unauthorized

  • Cause: The OAuth2 token has expired or the client credentials are incorrect.
  • Fix: Verify the client_id and client_secret match the Cognigy.AI tenant configuration. The token cache automatically refreshes, but initial authentication failures require credential validation.
  • Code Fix: The get_token method does not catch httpx.HTTPStatusError; raise_for_status raises it and the exception propagates to the caller. Wrap the initial call in a try-except block if you need custom retry logic during startup.

Error: HTTP 403 Forbidden

  • Cause: The OAuth2 client lacks the cognigy:conversation:write scope.
  • Fix: Navigate to the Cognigy.AI developer console, edit the OAuth client, and add the cognigy:conversation:write scope. Restart the worker to invalidate the cached token and fetch a new one with updated permissions.

Error: HTTP 429 Too Many Requests

  • Cause: Cognigy.AI enforces rate limits on variable updates, typically around 100 requests per minute per tenant.
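  • Fix: The implementation uses exponential backoff and respects the Retry-After header. If you are processing high volumes, set the rate_limit option on the task (for example, rate_limit="100/m" in the @celery_app.task decorator) to cap how many tasks each worker starts per minute. Celery applies this limit per worker instance, not globally, so divide the budget across workers.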
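Celery's task-level rate_limit option controls how often tasks start. To space out the outgoing PATCH calls themselves inside a single worker process, a small client-side throttle is one option. The following is a minimal sketch under that assumption; MinIntervalThrottle is a hypothetical class, and it does not coordinate across multiple worker processes.

```python
import time
from typing import Callable


class MinIntervalThrottle:
    """Spaces calls at least 60 / per_minute seconds apart."""

    def __init__(
        self,
        per_minute: int,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ):
        self._interval = 60.0 / per_minute
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = 0.0

    def wait(self) -> float:
        """Block until the next call is allowed and return the time slept."""
        now = self._clock()
        delay = max(0.0, self._next_allowed - now)
        if delay:
            self._sleep(delay)
        # Schedule the following slot one interval after this call's slot.
        self._next_allowed = max(now, self._next_allowed) + self._interval
        return delay


# Demonstration with a fake clock so nothing actually sleeps.
fake_now = [100.0]


def fake_sleep(seconds: float) -> None:
    fake_now[0] += seconds


throttle = MinIntervalThrottle(100, clock=lambda: fake_now[0], sleep=fake_sleep)
waits = [throttle.wait() for _ in range(3)]
```

In the worker, calling throttle.wait() immediately before client.patch would hold each process to the configured spacing. The 429 handling remains necessary because other processes share the same tenant limit.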

Error: Celery Task Fails to Execute

  • Cause: The worker process is not running, or the Redis broker connection is refused.
  • Fix: Run celery -A cognigy_worker.celery_app worker --loglevel=info in a separate terminal. Verify Redis is accessible with redis-cli ping. Check worker logs for ConnectionRefusedError or ModuleNotFoundError.
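The redis-cli ping check can also be run from Python, for example as a preflight step before starting the worker. The sketch below sends an inline PING over a plain socket and looks for the +PONG reply. The redis_is_reachable function is a hypothetical helper, and it assumes a Redis instance without authentication or TLS; a server that requires a password replies with an error, which this check reports as unreachable.

```python
import socket


def redis_is_reachable(host: str = "localhost", port: int = 6379, timeout: float = 2.0) -> bool:
    """Send an inline PING and report whether the server answers +PONG."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"PING\r\n")
            reply = sock.recv(64)
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False
    return reply.startswith(b"+PONG")
```

A False result for the broker host and port points to the same problem that surfaces as ConnectionRefusedError in the worker logs.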
