Offloading Heavy Database Queries in NICE Cognigy Webhooks Using Python Celery
What You Will Build
- A FastAPI webhook endpoint that accepts a NICE Cognigy.AI trigger, dispatches a background task to Celery, and returns an immediate HTTP 202 Accepted response so the webhook call does not time out.
- A Celery worker that executes a heavy database aggregation query, then updates the Cognigy.AI conversation variables using the REST API upon completion.
- Python 3.10+ implementation using FastAPI, Celery, httpx, and Pydantic with production-grade error handling and OAuth2 token management.
Prerequisites
- NICE Cognigy.AI tenant with an OAuth2 client credentials grant configured.
- Required OAuth scope: cognigy:conversation:write.
- Python 3.10 or higher with a virtual environment.
- Redis server running on localhost:6379 (or update the broker URL).
- Dependencies: pip install fastapi uvicorn celery httpx pydantic redis
Authentication Setup
NICE Cognigy.AI uses OAuth2 client credentials flow for server-to-server API access. The token expires after one hour. The following implementation caches the token and refreshes it only when necessary.
import time
import httpx
from typing import Optional


class CognigyAuthClient:
    """Fetches and caches an OAuth2 client-credentials token."""

    def __init__(self, tenant: str, client_id: str, client_secret: str, scope: str = "cognigy:conversation:write"):
        self.base_url = f"https://{tenant}.cognigy.ai"
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self._token: Optional[str] = None
        self._expires_at: float = 0.0

    def get_token(self) -> str:
        # _expires_at already includes the 300-second safety margin,
        # so the margin must not be subtracted a second time here.
        if self._token and time.time() < self._expires_at:
            return self._token
        url = f"{self.base_url}/api/v1/oauth/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": self.scope
        }
        with httpx.Client(timeout=10.0) as client:
            response = client.post(url, data=payload)
            response.raise_for_status()
            token_data = response.json()
        self._token = token_data["access_token"]
        self._expires_at = time.time() + token_data["expires_in"] - 300
        return self._token
The token cache subtracts 300 seconds from the expiration window to provide a safety margin. The raise_for_status call converts HTTP 4xx and 5xx responses into httpx.HTTPStatusError exceptions, which you must catch in the calling layer.
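The caching rule can be exercised without a network call. The sketch below is an illustration only, not part of the client above: CachedToken, fetch, and clock are hypothetical names, and fake_fetch stands in for the POST to the token endpoint.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Caches a token and refetches it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        margin: float = 300.0,
        clock: Callable[[], float] = time.time,
    ) -> None:
        self._fetch = fetch      # returns (access_token, expires_in_seconds)
        self._margin = margin    # refresh this many seconds before expiry
        self._clock = clock      # injectable so the expiry logic can be tested
        self._token: Optional[str] = None
        self._refresh_at = 0.0

    def get(self) -> str:
        if self._token is not None and self._clock() < self._refresh_at:
            return self._token
        token, expires_in = self._fetch()
        self._token = token
        self._refresh_at = self._clock() + expires_in - self._margin
        return token


# Usage with a fake clock and a fake token endpoint.
now = [1000.0]
calls = []


def fake_fetch() -> Tuple[str, float]:
    calls.append(now[0])
    return f"token-{len(calls)}", 3600.0


cache = CachedToken(fake_fetch, clock=lambda: now[0])
first = cache.get()    # fetches a token
second = cache.get()   # served from the cache
now[0] += 3300.0       # reach the refresh point (3600 - 300 seconds)
third = cache.get()    # fetches again
```

Injecting the clock keeps the margin arithmetic in one place, which makes it harder to subtract the safety margin twice by accident.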
Implementation
Step 1: Webhook Handler and Task Dispatch
Cognigy.AI webhooks expect a fast response. If the webhook handler blocks on a heavy database query, Cognigy will time out and mark the intent as failed. The handler extracts the conversation identifier, dispatches the Celery task, and returns immediately.
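from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, Any

app = FastAPI()


class CognigyWebhookPayload(BaseModel):
    conversationId: str
    userId: str
    variables: Dict[str, Any]


# status_code=202 makes the endpoint answer with 202 Accepted instead of the default 200.
@app.post("/webhook/cognigy-trigger", status_code=202)
def handle_cognigy_trigger(payload: CognigyWebhookPayload) -> dict:
    # Pydantic already rejects a missing field; this guards against an empty string.
    if not payload.conversationId:
        raise HTTPException(status_code=400, detail="conversationId is required")
    # heavy_db_task is the Celery task defined in Step 2.
    # delay() returns an AsyncResult; its id attribute is the serializable task identifier.
    task = heavy_db_task.delay(
        conversation_id=payload.conversationId,
        query_params=payload.variables
    )
    return {
        "status": "accepted",
        "taskId": task.id,
        "message": "Database query dispatched to background worker"
    }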
Expected Cognigy.AI Request:
{
    "conversationId": "conv_8f4a2c1d",
    "userId": "usr_9b3e7f2a",
    "variables": {
        "customer_segment": "enterprise",
        "date_range": "2024-01-01_to_2024-12-31",
        "aggregation_type": "revenue"
    }
}
Expected Webhook Response:
{
    "status": "accepted",
    "taskId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "message": "Database query dispatched to background worker"
}
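FastAPI validates the payload structure through the Pydantic model, so a request with a missing field is rejected with HTTP 422 before the handler runs. The explicit check in the handler returns HTTP 400 when conversationId is present but empty. The delay method queues the task to Redis and returns immediately.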
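The dispatch-and-return pattern can be tried without Redis or Celery. The following is a minimal sketch under that assumption: a ThreadPoolExecutor stands in for the Celery queue, and handle_trigger, slow_query, and release are hypothetical names that do not appear in the tutorial code.

```python
import threading
import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, Tuple

executor = ThreadPoolExecutor(max_workers=2)  # stand-in for the Celery worker pool
release = threading.Event()                   # lets the demo decide when the "query" finishes


def slow_query(conversation_id: str) -> str:
    """Pretend database query that blocks until the demo releases it."""
    release.wait(timeout=5.0)
    return f"done:{conversation_id}"


def handle_trigger(payload: Dict[str, str]) -> Tuple[Dict[str, str], Future]:
    """Queue the slow work and build the response without waiting for it."""
    future = executor.submit(slow_query, payload["conversationId"])
    # The identifier here is generated locally; Celery would supply its own task id.
    response = {"status": "accepted", "taskId": str(uuid.uuid4())}
    return response, future


response, future = handle_trigger({"conversationId": "conv_8f4a2c1d"})
still_running = not future.done()   # the response exists before the query has finished
release.set()                       # now let the background work complete
result = future.result(timeout=5.0)
executor.shutdown()
```

The handler never touches the result of the slow work, which is what keeps its response time independent of the query duration.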
Step 2: Celery Worker and Database Execution
Celery handles the background execution. The worker receives the task parameters, executes the heavy database query, and captures the result. This example uses SQLite for runnability, but the structure matches production PostgreSQL or MySQL patterns.
import time
import sqlite3
import logging
from celery import Celery

logger = logging.getLogger(__name__)

celery_app = Celery(
    "cognigy_worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)


@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def heavy_db_task(self, conversation_id: str, query_params: dict) -> None:
    logger.info("Starting heavy DB task for conversation: %s", conversation_id)
    try:
        result = execute_heavy_database_query(query_params)
        logger.info("Query completed. Updating conversation variables.")
        # auth_client is a module-level CognigyAuthClient instance (created in the complete example).
        update_conversation_variables(conversation_id, result, auth_client)
    except PermissionError:
        # A scope misconfiguration will not fix itself, so do not retry.
        raise
    except Exception as exc:
        logger.error("Task failed for conversation %s: %s", conversation_id, exc)
        raise self.retry(exc=exc)


def execute_heavy_database_query(params: dict) -> str:
    conn = sqlite3.connect(":memory:")
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE transactions (id INTEGER PRIMARY KEY, amount REAL, segment TEXT)")
    cursor.executemany(
        "INSERT INTO transactions (amount, segment) VALUES (?, ?)",
        [(i * 1.5, "enterprise") for i in range(100000)]
    )
    conn.commit()
    segment = params.get("customer_segment", "enterprise")
    # Bind the segment as a parameter instead of formatting it into the SQL text.
    # COALESCE keeps the total numeric when no rows match.
    query = "SELECT COUNT(*), COALESCE(SUM(amount), 0) FROM transactions WHERE segment = ?"
    time.sleep(2.0)  # simulate a slow aggregation
    count, total = cursor.execute(query, (segment,)).fetchone()
    conn.close()
    return f"count:{int(count)}|total:{total:.2f}"
The bind=True parameter exposes the task instance for retry logic. The max_retries=3 and default_retry_delay=60 parameters prevent infinite retry loops. The database function simulates a heavy aggregation with time.sleep and a realistic query pattern. Replace the SQLite block with your production ORM or driver when deploying.
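The segment value originates in the webhook payload, so it is safer to bind it as a query parameter than to format it into the SQL text. The sketch below illustrates this with the standard sqlite3 module; aggregate_segment is a hypothetical helper, and the three sample rows are made up for the demonstration.

```python
import sqlite3
from typing import Tuple


def aggregate_segment(conn: sqlite3.Connection, segment: str) -> Tuple[int, float]:
    """Return (row count, total amount) for one segment using a bound parameter."""
    row = conn.execute(
        "SELECT COUNT(*), COALESCE(SUM(amount), 0.0) FROM transactions WHERE segment = ?",
        (segment,),
    ).fetchone()
    return int(row[0]), float(row[1])


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE transactions (id INTEGER PRIMARY KEY, amount REAL, segment TEXT)")
conn.executemany(
    "INSERT INTO transactions (amount, segment) VALUES (?, ?)",
    [(10.0, "enterprise"), (5.5, "enterprise"), (2.0, "smb")],
)

enterprise = aggregate_segment(conn, "enterprise")
# A hostile value is compared as a plain string, so it matches no rows.
hostile = aggregate_segment(conn, "enterprise' OR '1'='1")
conn.close()
```

The same placeholder approach carries over to PostgreSQL and MySQL drivers, although the placeholder syntax differs between drivers.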
Step 3: Updating Conversation Variables via REST API
Once the query completes, the worker must push the result back to Cognigy.AI. The following function makes the REST API call, applies exponential backoff for HTTP 429 rate limits and 5xx server errors, and raises on any other unexpected status code.
import httpx
import time
import logging

logger = logging.getLogger(__name__)


def update_conversation_variables(conversation_id: str, result_data: str, auth_client: CognigyAuthClient) -> None:
    url = f"{auth_client.base_url}/api/v1/conversations/{conversation_id}/variables"
    token = auth_client.get_token()
    payload = {
        "variables": [
            {"name": "db_query_result", "value": result_data},
            {"name": "db_query_timestamp", "value": str(int(time.time()))}
        ]
    }
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    max_retries = 4
    base_delay = 2.0
    # One client is reused across attempts so the connection can be kept alive.
    with httpx.Client(timeout=15.0) as client:
        for attempt in range(max_retries):
            backoff = base_delay * (2 ** attempt)
            response = client.patch(url, json=payload, headers=headers)
            if response.status_code == 200:
                logger.info("Successfully updated variables for conversation %s", conversation_id)
                return
            elif response.status_code == 401:
                logger.warning("Token rejected. Refreshing and retrying.")
                # Drop the cached token; otherwise get_token() would return the same one.
                auth_client._token = None
                token = auth_client.get_token()
                headers["Authorization"] = f"Bearer {token}"
                continue
            elif response.status_code == 403:
                logger.error("Forbidden. Check OAuth scope: cognigy:conversation:write")
                raise PermissionError("Missing required OAuth scope")
            elif response.status_code == 429:
                try:
                    retry_after = float(response.headers.get("Retry-After", backoff))
                except ValueError:
                    # Retry-After can also be an HTTP date; fall back to exponential backoff.
                    retry_after = backoff
                logger.warning("Rate limited. Retrying after %.2f seconds", retry_after)
                time.sleep(retry_after)
                continue
            elif response.status_code >= 500:
                logger.warning("Server error %d. Retrying in %.2f seconds", response.status_code, backoff)
                time.sleep(backoff)
                continue
            else:
                logger.error("Unexpected status %d: %s", response.status_code, response.text)
                raise httpx.HTTPStatusError(f"HTTP {response.status_code}", request=response.request, response=response)
    logger.error("Exhausted retries for conversation %s", conversation_id)
    raise ConnectionError("Failed to update conversation variables after retries")
The function handles HTTP 401 by refreshing the token and retrying immediately. HTTP 403 fails fast because it indicates a scope misconfiguration. HTTP 429 respects the Retry-After header or falls back to exponential backoff. HTTP 5xx errors trigger delayed retries. If all retries fail, the function raises an exception that propagates to heavy_db_task; the task logs it and asks Celery to retry.
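The delay rule described above can be isolated into a small pure function, which makes the schedule easy to inspect. This is a sketch rather than part of the tutorial code: backoff_delay is a hypothetical name, and it treats a Retry-After value that is not a number of seconds (for example an HTTP date) as absent instead of parsing it.

```python
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None, base_delay: float = 2.0) -> float:
    """Seconds to wait before the next attempt (attempt is zero-based)."""
    fallback = base_delay * (2 ** attempt)
    if retry_after is None:
        return fallback
    try:
        seconds = float(retry_after)
    except ValueError:
        # Retry-After may also be an HTTP date; this sketch falls back instead of parsing it.
        return fallback
    return max(seconds, 0.0)


# Delays for the four attempts when the server sends no Retry-After header.
schedule = [backoff_delay(attempt) for attempt in range(4)]
# A numeric header overrides the exponential schedule.
with_header = backoff_delay(1, "7")
# A date-style header falls back to the exponential schedule.
with_date = backoff_delay(2, "Wed, 21 Oct 2015 07:28:00 GMT")
```

With base_delay set to 2.0 and four attempts, the worst case without a Retry-After header is 2 + 4 + 8 + 16 = 30 seconds of sleeping inside a single task run.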
Complete Working Example
Save the following script as cognigy_worker.py. Run the FastAPI server with uvicorn cognigy_worker:app --host 0.0.0.0 --port 8000. Run the Celery worker separately with celery -A cognigy_worker.celery_app worker --loglevel=info.
import time
import sqlite3
import logging
import httpx
from typing import Dict, Any, Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery import Celery
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
# --- Configuration ---
COGNIGY_TENANT = "your-tenant"
COGNIGY_CLIENT_ID = "your-client-id"
COGNIGY_CLIENT_SECRET = "your-client-secret"

# --- OAuth Client ---
class CognigyAuthClient:
    """Fetches and caches an OAuth2 client-credentials token."""

    def __init__(self, tenant: str, client_id: str, client_secret: str, scope: str = "cognigy:conversation:write"):
        self.base_url = f"https://{tenant}.cognigy.ai"
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self._token: Optional[str] = None
        self._expires_at: float = 0.0

    def get_token(self) -> str:
        # _expires_at already includes the 300-second safety margin,
        # so the margin must not be subtracted a second time here.
        if self._token and time.time() < self._expires_at:
            return self._token
        url = f"{self.base_url}/api/v1/oauth/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": self.scope
        }
        with httpx.Client(timeout=10.0) as client:
            response = client.post(url, data=payload)
            response.raise_for_status()
            token_data = response.json()
        self._token = token_data["access_token"]
        self._expires_at = time.time() + token_data["expires_in"] - 300
        return self._token


auth_client = CognigyAuthClient(COGNIGY_TENANT, COGNIGY_CLIENT_ID, COGNIGY_CLIENT_SECRET)

# --- Celery Configuration ---
celery_app = Celery(
    "cognigy_worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)


@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def heavy_db_task(self, conversation_id: str, query_params: dict) -> None:
    logger.info("Starting heavy DB task for conversation: %s", conversation_id)
    try:
        result = execute_heavy_database_query(query_params)
        logger.info("Query completed. Updating conversation variables.")
        update_conversation_variables(conversation_id, result, auth_client)
    except PermissionError:
        # A scope misconfiguration will not fix itself, so do not retry.
        raise
    except Exception as exc:
        logger.error("Task failed for conversation %s: %s", conversation_id, exc)
        raise self.retry(exc=exc)


def execute_heavy_database_query(params: dict) -> str:
    conn = sqlite3.connect(":memory:")
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE transactions (id INTEGER PRIMARY KEY, amount REAL, segment TEXT)")
    cursor.executemany(
        "INSERT INTO transactions (amount, segment) VALUES (?, ?)",
        [(i * 1.5, "enterprise") for i in range(100000)]
    )
    conn.commit()
    segment = params.get("customer_segment", "enterprise")
    # Bind the segment as a parameter instead of formatting it into the SQL text.
    # COALESCE keeps the total numeric when no rows match.
    query = "SELECT COUNT(*), COALESCE(SUM(amount), 0) FROM transactions WHERE segment = ?"
    time.sleep(2.0)  # simulate a slow aggregation
    count, total = cursor.execute(query, (segment,)).fetchone()
    conn.close()
    return f"count:{int(count)}|total:{total:.2f}"


def update_conversation_variables(conversation_id: str, result_data: str, auth: CognigyAuthClient) -> None:
    url = f"{auth.base_url}/api/v1/conversations/{conversation_id}/variables"
    token = auth.get_token()
    payload = {
        "variables": [
            {"name": "db_query_result", "value": result_data},
            {"name": "db_query_timestamp", "value": str(int(time.time()))}
        ]
    }
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    max_retries = 4
    base_delay = 2.0
    # One client is reused across attempts so the connection can be kept alive.
    with httpx.Client(timeout=15.0) as client:
        for attempt in range(max_retries):
            backoff = base_delay * (2 ** attempt)
            response = client.patch(url, json=payload, headers=headers)
            if response.status_code == 200:
                logger.info("Successfully updated variables for conversation %s", conversation_id)
                return
            elif response.status_code == 401:
                logger.warning("Token rejected. Refreshing and retrying.")
                # Drop the cached token; otherwise get_token() would return the same one.
                auth._token = None
                token = auth.get_token()
                headers["Authorization"] = f"Bearer {token}"
                continue
            elif response.status_code == 403:
                logger.error("Forbidden. Check OAuth scope: cognigy:conversation:write")
                raise PermissionError("Missing required OAuth scope")
            elif response.status_code == 429:
                try:
                    retry_after = float(response.headers.get("Retry-After", backoff))
                except ValueError:
                    # Retry-After can also be an HTTP date; fall back to exponential backoff.
                    retry_after = backoff
                logger.warning("Rate limited. Retrying after %.2f seconds", retry_after)
                time.sleep(retry_after)
                continue
            elif response.status_code >= 500:
                logger.warning("Server error %d. Retrying in %.2f seconds", response.status_code, backoff)
                time.sleep(backoff)
                continue
            else:
                logger.error("Unexpected status %d: %s", response.status_code, response.text)
                raise httpx.HTTPStatusError(f"HTTP {response.status_code}", request=response.request, response=response)
    logger.error("Exhausted retries for conversation %s", conversation_id)
    raise ConnectionError("Failed to update conversation variables after retries")

# --- FastAPI Webhook ---
app = FastAPI()


class CognigyWebhookPayload(BaseModel):
    conversationId: str
    userId: str
    variables: Dict[str, Any]


# status_code=202 makes the endpoint answer with 202 Accepted instead of the default 200.
@app.post("/webhook/cognigy-trigger", status_code=202)
def handle_cognigy_trigger(payload: CognigyWebhookPayload) -> dict:
    # Pydantic already rejects a missing field; this guards against an empty string.
    if not payload.conversationId:
        raise HTTPException(status_code=400, detail="conversationId is required")
    # delay() returns an AsyncResult; its id attribute is the serializable task identifier.
    task = heavy_db_task.delay(
        conversation_id=payload.conversationId,
        query_params=payload.variables
    )
    return {
        "status": "accepted",
        "taskId": task.id,
        "message": "Database query dispatched to background worker"
    }
Deploy the FastAPI server behind a reverse proxy or load balancer. Configure your NICE Cognigy.AI webhook node to POST to https://your-domain.com/webhook/cognigy-trigger. Ensure the worker process runs continuously to consume tasks from the Redis broker.
Common Errors and Debugging
Error: HTTP 401 Unauthorized
- Cause: The OAuth2 token has expired or the client credentials are incorrect.
- Fix: Verify the client_id and client_secret match the Cognigy.AI tenant configuration. The token cache automatically refreshes, but initial authentication failures require credential validation.
- Code Fix: The get_token method does not catch httpx.HTTPStatusError; raise_for_status raises it and it propagates to the caller. Wrap the initial call in a try-except block if you need custom retry logic during startup.
Error: HTTP 403 Forbidden
- Cause: The OAuth2 client lacks the cognigy:conversation:write scope.
- Fix: Navigate to the Cognigy.AI developer console, edit the OAuth client, and add the cognigy:conversation:write scope. Restart the worker to invalidate the cached token and fetch a new one with updated permissions.
Error: HTTP 429 Too Many Requests
- Cause: Cognigy.AI enforces rate limits on variable updates, typically around 100 requests per minute per tenant.
- Fix: The implementation uses exponential backoff and respects the Retry-After header. If you are processing high volumes, throttle the task itself, for example with Celery's rate_limit task option (such as rate_limit="100/m"). Note that this limit applies per worker instance, not globally across workers.
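One way to stay under a per-minute limit is to gate each outgoing update through a sliding-window counter. The sketch below is a hedged illustration: SlidingWindowLimiter and wait_time are hypothetical names, the demo uses a limit of 2 so the behavior is easy to follow, and the limiter is local to one process, so several workers would need a shared store such as Redis to enforce a tenant-wide limit.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allows at most `limit` calls within any `window`-second span."""

    def __init__(self, limit: int, window: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._limit = limit
        self._window = window
        self._clock = clock                  # injectable so the logic can be tested
        self._calls: Deque[float] = deque()  # timestamps of recent allowed calls

    def wait_time(self) -> float:
        """Return 0.0 and record the call if allowed, else seconds until a slot frees up."""
        now = self._clock()
        # Forget calls that have left the window.
        while self._calls and now - self._calls[0] >= self._window:
            self._calls.popleft()
        if len(self._calls) < self._limit:
            self._calls.append(now)
            return 0.0
        return self._window - (now - self._calls[0])


# Demo with a fake clock: two calls are allowed per 60 seconds.
now = [0.0]
limiter = SlidingWindowLimiter(limit=2, window=60.0, clock=lambda: now[0])
first = limiter.wait_time()         # allowed
second = limiter.wait_time()        # allowed
now[0] = 10.0
blocked = limiter.wait_time()       # window is full; wait until the oldest call expires
now[0] = 60.0
after_window = limiter.wait_time()  # the old calls have left the window
```

A caller would sleep for the returned number of seconds and ask again before sending the PATCH request.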
Error: Celery Task Fails to Execute
- Cause: The worker process is not running, or the Redis broker connection is refused.
- Fix: Run celery -A cognigy_worker.celery_app worker --loglevel=info in a separate terminal. Verify Redis is accessible with redis-cli ping. Check worker logs for ConnectionRefusedError or ModuleNotFoundError.