Training Cognigy.AI Intent Models via REST API with Python
What You Will Build
A Python module that constructs annotated training payloads, validates intent schema constraints, uploads versioned datasets with diff comparison, triggers asynchronous model training, polls for job completion, retrieves confusion matrices, synchronizes data via webhooks, logs audit trails, and exposes a QA evaluation endpoint. This tutorial uses the Cognigy.AI v2 REST API and Python 3.9+ with the requests library.
Prerequisites
- Cognigy.AI API key or OAuth2 client credentials with scopes `cognigy:projects:read`, `cognigy:projects:write`, `cognigy:datasets:upload`, and `cognigy:models:train` (the code in this tutorial uses the OAuth2 client-credentials flow)
- Cognigy.AI API version v2
- Python 3.9 or higher
- Dependencies: `requests>=2.31.0`, `pydantic>=2.0.0`, `urllib3` (installed with `requests`), `hashlib` (standard library), `logging` (standard library)
Authentication Setup
Cognigy.AI v2 endpoints require a Bearer token in the Authorization header. The following setup handles token acquisition and caching with automatic refresh logic.
```python
import os
import time
from typing import Optional

import requests


class CognigyAuth:
    """Fetches an OAuth2 client-credentials token and caches it until shortly before expiry."""

    def __init__(self, client_id: str, client_secret: str, token_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self._token: Optional[str] = None
        self._expires_at: float = 0.0

    def get_token(self) -> str:
        # Reuse the cached token while it is still inside its validity window.
        if self._token and time.time() < self._expires_at:
            return self._token
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "cognigy:projects:read cognigy:projects:write cognigy:datasets:upload cognigy:models:train",
        }
        response = requests.post(self.token_url, data=payload, timeout=10)
        response.raise_for_status()
        data = response.json()
        self._token = data["access_token"]
        # Refresh 60 seconds early so a token never expires mid-request.
        self._expires_at = time.time() + data.get("expires_in", 3600) - 60
        return self._token
```
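The caching rule in `get_token` reuses the token until 60 seconds before `expires_in` elapses. You can check that rule without network access. The sketch below uses a hypothetical class, `CachedToken`, that mirrors the logic but takes the fetch function and the clock as parameters. It is not part of any Cognigy SDK.

```python
import time
from typing import Any, Callable, Dict, Optional


class CachedToken:
    """Hypothetical, network-free mirror of CognigyAuth.get_token's caching rule."""

    def __init__(
        self,
        fetch: Callable[[], Dict[str, Any]],   # returns {"access_token": ..., "expires_in": ...}
        clock: Callable[[], float] = time.time,
        margin: float = 60.0,                  # refresh this many seconds before expiry
    ) -> None:
        self._fetch = fetch
        self._clock = clock
        self._margin = margin
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        # Serve the cached token while it is inside its validity window.
        if self._token and self._clock() < self._expires_at:
            return self._token
        data = self._fetch()
        self._token = str(data["access_token"])
        # Same rule as CognigyAuth: default lifetime 3600 s, minus the safety margin.
        self._expires_at = self._clock() + float(data.get("expires_in", 3600)) - self._margin
        return self._token
```

Take a 120-second token fetched at t = 1000 s. It is reused until t = 1060 s and fetched again from that point on.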
Implementation
Step 1: Construct Training Data Payloads and Validate Schema Constraints
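Training data requires strict schema validation so that annotation errors are caught before they reach the model. Entity spans within an utterance must not overlap and must cover exactly the characters of the annotated value. Intent labels must be unique within the dataset. Disambiguation rules decide between intents with similar scores, and they require explicit confidence thresholds.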
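```python
import hashlib
import logging
from typing import Any, Dict, List

from pydantic import BaseModel, ValidationError, ValidationInfo, field_validator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class EntityAnnotation(BaseModel):
    start: int  # zero-based offset of the first character
    end: int    # exclusive offset, so text[start:end] == value
    label: str
    value: str


class Utterance(BaseModel):
    text: str
    intent: str
    entities: List[EntityAnnotation] = []

    @field_validator("entities")
    @classmethod
    def validate_entity_spans(
        cls, v: List[EntityAnnotation], info: ValidationInfo
    ) -> List[EntityAnnotation]:
        # "text" is declared before "entities", so it is available in info.data
        # unless "text" itself failed validation.
        text = info.data.get("text")
        for entity in v:
            if entity.start < 0 or entity.start >= entity.end:
                raise ValueError("Entity start must be non-negative and less than end")
            if text is not None and text[entity.start:entity.end] != entity.value:
                raise ValueError(
                    f"Span {entity.start}:{entity.end} does not match value {entity.value!r}"
                )
        sorted_entities = sorted(v, key=lambda e: e.start)
        for current, following in zip(sorted_entities, sorted_entities[1:]):
            if current.end > following.start:
                raise ValueError("Overlapping entity spans detected")
        return v


class TrainingDataset(BaseModel):
    version: str
    intents: Dict[str, List[Utterance]]

    @field_validator("intents")
    @classmethod
    def validate_unique_labels(
        cls, v: Dict[str, List[Utterance]]
    ) -> Dict[str, List[Utterance]]:
        # Dict keys are unique by construction, so look for labels that differ
        # only by case or whitespace (e.g. "book_flight" vs "Book_Flight").
        normalized = [label.strip().lower() for label in v]
        if len(set(normalized)) != len(normalized):
            raise ValueError("Duplicate intent labels found in dataset")
        for label, utterances in v.items():
            if not utterances:
                raise ValueError(f"Intent {label!r} has no utterances")
            if any(u.intent != label for u in utterances):
                raise ValueError(f"Utterance grouped under {label!r} has a different intent")
        return v


def build_training_payload(
    utterances: List[Dict[str, Any]], version: str = "1.0.0"
) -> Dict[str, Any]:
    # Group raw utterances by their intent label.
    grouped: Dict[str, List[Dict[str, Any]]] = {}
    for item in utterances:
        grouped.setdefault(item["intent"], []).append(item)
    try:
        dataset = TrainingDataset(
            version=version,
            intents={k: [Utterance(**u) for u in v] for k, v in grouped.items()},
        )
        logger.info("Schema validation passed. Unique intents: %d", len(dataset.intents))
        return dataset.model_dump()
    except ValidationError as e:
        logger.error("Schema validation failed: %s", e)
        raise
```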
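Hand-counted offsets are a common source of span errors. A small helper can derive `start` and `end` from the entity value with `str.find`. `make_entity` is a hypothetical convenience function, not part of the Cognigy.AI API. It annotates the first match by default; pass `occurrence` to pick a later one.

```python
from typing import Any, Dict


def make_entity(text: str, value: str, label: str, occurrence: int = 1) -> Dict[str, Any]:
    """Build an entity dict whose zero-based, end-exclusive span covers `value`.

    `occurrence` selects which match in `text` to annotate (1 = first).
    """
    if occurrence < 1:
        raise ValueError("occurrence must be >= 1")
    start = -1
    for _ in range(occurrence):
        # Search just past the previous match for the next one.
        start = text.find(value, start + 1)
        if start == -1:
            raise ValueError(f"{value!r} (occurrence {occurrence}) not found in {text!r}")
    return {"start": start, "end": start + len(value), "label": label, "value": value}
```

For example, `make_entity("I want to book a flight to London", "London", "destination")` yields `start=27` and `end=33`, because `text[27:33] == "London"`.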
Step 2: Implement Incremental Model Updates with Versioned Dataset Uploads
Incremental updates prevent redundant training cycles. The system computes a SHA-256 hash of the canonicalized payload and compares it against the hash of the last uploaded version. It uploads only when the two differ. The API expects a JSON payload with explicit version tracking.
```python
import hashlib
import json
from typing import Any, Dict

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class CognigyDatasetManager:
    def __init__(self, auth: CognigyAuth, project_id: str, base_url: str):
        self.auth = auth
        self.project_id = project_id
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        # Retries idempotent methods (GET, PUT, ...) on 429/5xx with exponential
        # backoff; urllib3 does not retry POST requests by default.
        retry_strategy = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
        self.session.mount("https://", HTTPAdapter(max_retries=retry_strategy))

    def _get_headers(self) -> Dict[str, str]:
        return {
            "Authorization": f"Bearer {self.auth.get_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

    def compute_payload_hash(self, payload: Dict[str, Any]) -> str:
        # Canonical JSON (sorted keys, no whitespace) gives a stable digest.
        normalized = json.dumps(payload, sort_keys=True, separators=(",", ":"))
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def upload_dataset(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        current_hash = self.compute_payload_hash(payload)
        logger.info("Dataset hash: %s", current_hash)
        # Compare against the hash of the last uploaded version. This assumes the
        # server computes it with the same canonical JSON encoding.
        last_hash_url = f"{self.base_url}/api/v2/projects/{self.project_id}/datasets/latest/hash"
        try:
            resp = self.session.get(last_hash_url, headers=self._get_headers(), timeout=10)
            resp.raise_for_status()
            if resp.json().get("hash", "") == current_hash:
                logger.info("No changes detected. Skipping upload.")
                return {"status": "skipped", "reason": "identical_hash"}
        except requests.exceptions.HTTPError as e:
            if e.response is not None and e.response.status_code == 404:
                logger.info("No previous dataset found. Proceeding with full upload.")
            else:
                raise
        upload_url = f"{self.base_url}/api/v2/projects/{self.project_id}/datasets/upload"
        response = self.session.post(
            upload_url,
            headers=self._get_headers(),
            json=payload,
            timeout=30,
        )
        response.raise_for_status()
        logger.info("Dataset uploaded successfully. Version: %s", payload.get("version"))
        return response.json()
```
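The whole-payload hash only tells you whether anything changed. To see which intents changed, hash each intent's utterances separately and compare the two sets of digests. The sketch below assumes payloads shaped like the output of `build_training_payload` (`{"version": ..., "intents": {label: [utterances]}}`). `diff_intents` is a hypothetical helper, not an API call.

```python
import hashlib
import json
from typing import Any, Dict, List


def _digest(obj: Any) -> str:
    # Same canonical encoding as compute_payload_hash: sorted keys, no whitespace.
    encoded = json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


def diff_intents(old: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, List[str]]:
    """Classify intent labels as added, removed, or changed between two payloads."""
    old_hashes = {label: _digest(utts) for label, utts in old.get("intents", {}).items()}
    new_hashes = {label: _digest(utts) for label, utts in new.get("intents", {}).items()}
    return {
        "added": sorted(new_hashes.keys() - old_hashes.keys()),
        "removed": sorted(old_hashes.keys() - new_hashes.keys()),
        "changed": sorted(
            label
            for label in old_hashes.keys() & new_hashes.keys()
            if old_hashes[label] != new_hashes[label]
        ),
    }
```

Lists are ordered, so reordering the utterances within an intent also counts as a change. The same is true for `compute_payload_hash`.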
Step 3: Handle Asynchronous Model Training Jobs with Progress Tracking
Model training runs asynchronously. The client must poll the status endpoint with exponential backoff and handle 429 rate limits gracefully. Progress tracking extracts percentage completion and job state.
```python
import time
from typing import Any, Dict


class CognigyModelTrainer:
    def __init__(self, dataset_manager: CognigyDatasetManager):
        self.manager = dataset_manager
        self.session = dataset_manager.session
        self.base_url = dataset_manager.base_url
        self.project_id = dataset_manager.project_id

    def trigger_training(self) -> str:
        train_url = f"{self.base_url}/api/v2/projects/{self.project_id}/models/train"
        response = self.session.post(
            train_url,
            headers=self.manager._get_headers(),
            json={"mode": "incremental", "optimize_for": "accuracy"},
            timeout=15,
        )
        response.raise_for_status()
        job_id = response.json()["jobId"]
        logger.info("Training job initiated. Job ID: %s", job_id)
        return job_id

    def poll_training_status(self, job_id: str, max_polls: int = 60) -> Dict[str, Any]:
        status_url = f"{self.base_url}/api/v2/projects/{self.project_id}/models/train/status/{job_id}"
        backoff = 2.0
        for _ in range(max_polls):
            response = self.session.get(status_url, headers=self.manager._get_headers(), timeout=10)
            if response.status_code == 429:
                # Retry-After is usually a number of seconds; fall back to the
                # current backoff if it is missing or not numeric.
                try:
                    retry_after = float(response.headers.get("Retry-After", backoff))
                except ValueError:
                    retry_after = backoff
                logger.warning("Rate limited (429). Waiting %.2f seconds.", retry_after)
                time.sleep(retry_after)
                continue
            response.raise_for_status()
            data = response.json()
            state = data.get("status", "UNKNOWN")
            progress = data.get("progress") or 0
            logger.info("Training progress: %d%% | State: %s", progress, state)
            if state in ("COMPLETED", "SUCCESS"):
                logger.info("Training completed successfully.")
                return data
            if state in ("FAILED", "ERROR"):
                raise RuntimeError(f"Training failed: {data.get('error', 'Unknown error')}")
            # Exponential backoff: grow the delay by 1.5x per poll, capped at 30 seconds.
            time.sleep(backoff)
            backoff = min(backoff * 1.5, 30.0)
        raise TimeoutError("Training job did not complete within maximum polling attempts.")
```
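It helps to know the worst-case wait before `poll_training_status` raises `TimeoutError`. The arithmetic below replays the loop's schedule: start at 2 s, multiply by 1.5 after each poll, and cap at 30 s. It ignores request latency and any extra waits triggered by 429 responses. `backoff_schedule` is an illustrative helper, not part of the module.

```python
from typing import List


def backoff_schedule(initial: float = 2.0, factor: float = 1.5,
                     cap: float = 30.0, polls: int = 60) -> List[float]:
    """Sleep durations poll_training_status uses if the job never finishes."""
    delays: List[float] = []
    backoff = initial
    for _ in range(polls):
        delays.append(backoff)               # time.sleep(backoff) after each non-terminal poll
        backoff = min(backoff * factor, cap)
    return delays


schedule = backoff_schedule()
total_seconds = sum(schedule)
print(schedule[:8])    # [2.0, 3.0, 4.5, 6.75, 10.125, 15.1875, 22.78125, 30.0]
print(total_seconds)   # 1654.34375
```

The first seven delays sum to 64.34375 s, and the remaining 53 polls wait 30 s each. With the defaults, the loop therefore sleeps for about 1,654 seconds (roughly 27.6 minutes) before giving up. For longer training runs, raise `max_polls` or the cap.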
Step 4: Synchronize Training Data and Monitor Accuracy Metrics
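Training data from external NLU platforms is synchronized by posting it to a webhook ingestion endpoint. The `CognigyMetricsAndSync` class wraps those records in a payload for the Cognigy.AI webhook endpoint. It also retrieves confusion matrices and accuracy metrics for performance tuning, and it writes JSON audit records for governance.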
```python
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List


class CognigyMetricsAndSync:
    def __init__(self, dataset_manager: CognigyDatasetManager):
        self.manager = dataset_manager
        self.session = dataset_manager.session
        self.base_url = dataset_manager.base_url
        self.project_id = dataset_manager.project_id
        # Dedicated audit logger writing one JSON record per line. Replace the
        # StreamHandler with a FileHandler or log shipper as governance requires.
        self.audit_log = logging.getLogger("audit")
        if not self.audit_log.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter("%(message)s"))
            self.audit_log.addHandler(handler)
        self.audit_log.setLevel(logging.INFO)
        self.audit_log.propagate = False  # avoid duplicate lines via the root logger

    def _log_audit(self, action: str, details: Dict[str, Any]) -> None:
        record = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "project_id": self.project_id,
            "action": action,
            "details": details,
        }
        self.audit_log.info("AUDIT: %s", json.dumps(record))

    def sync_via_webhook(self, external_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        webhook_url = f"{self.base_url}/api/v2/projects/{self.project_id}/webhooks/ingest"
        payload = {
            "source": "external_nlu",
            "format": "cognigy_v2",
            "records": external_data,
        }
        self._log_audit("WEBHOOK_INGEST", {"record_count": len(external_data)})
        response = self.session.post(
            webhook_url,
            headers=self.manager._get_headers(),
            json=payload,
            timeout=20,
        )
        response.raise_for_status()
        return response.json()

    def get_confusion_matrix(self) -> Dict[str, Any]:
        metrics_url = f"{self.base_url}/api/v2/projects/{self.project_id}/models/metrics"
        params = {"type": "confusion_matrix", "granularity": "intent"}
        response = self.session.get(
            metrics_url,
            headers=self.manager._get_headers(),
            params=params,
            timeout=15,
        )
        response.raise_for_status()
        data = response.json()
        self._log_audit("METRICS_FETCH", {"metrics_type": "confusion_matrix"})
        return data

    def get_accuracy_metrics(self) -> Dict[str, Any]:
        metrics_url = f"{self.base_url}/api/v2/projects/{self.project_id}/models/metrics"
        params = {"type": "accuracy", "split": "validation"}
        response = self.session.get(
            metrics_url,
            headers=self.manager._get_headers(),
            params=params,
            timeout=15,
        )
        response.raise_for_status()
        data = response.json()
        self._log_audit("METRICS_FETCH", {"metrics_type": "accuracy"})
        return data
```
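The tutorial does not document the response format of the metrics endpoint. Assume, hypothetically, that the confusion matrix arrives as a list of `labels` plus a square `matrix`, with true intents as rows and predicted intents as columns. Under that assumption, you can derive per-intent precision and recall and the most-confused intent pairs as follows:

```python
from typing import Dict, List, Tuple


def per_intent_scores(labels: List[str], matrix: List[List[int]]) -> Dict[str, Dict[str, float]]:
    """Precision and recall per intent; rows = true intent, columns = predicted intent."""
    scores: Dict[str, Dict[str, float]] = {}
    for i, label in enumerate(labels):
        true_positives = matrix[i][i]
        predicted = sum(row[i] for row in matrix)   # column sum: everything predicted as label
        actual = sum(matrix[i])                      # row sum: everything truly labeled
        scores[label] = {
            "precision": true_positives / predicted if predicted else 0.0,
            "recall": true_positives / actual if actual else 0.0,
        }
    return scores


def top_confusions(labels: List[str], matrix: List[List[int]], k: int = 3) -> List[Tuple[str, str, int]]:
    """Largest off-diagonal cells as (true_intent, predicted_intent, count)."""
    pairs = [
        (labels[i], labels[j], matrix[i][j])
        for i in range(len(labels))
        for j in range(len(labels))
        if i != j and matrix[i][j] > 0
    ]
    return sorted(pairs, key=lambda p: p[2], reverse=True)[:k]
```

Consider `[[8, 2], [1, 9]]` over `["book_flight", "cancel_reservation"]`. Here `book_flight` has recall 8/10 and precision 8/9, and the most frequent confusion is `book_flight` mispredicted as `cancel_reservation`. Low-recall intents usually need more varied training utterances.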
Step 5: Expose an Intent Evaluation Tool for QA Validation
QA validation requires a synchronous endpoint that returns predicted intent, confidence score, and extracted entities. The tool accepts raw utterances and returns structured predictions for review.
```python
class CognigyQAEvaluator:
    def __init__(self, dataset_manager: CognigyDatasetManager):
        self.manager = dataset_manager
        self.session = dataset_manager.session
        self.base_url = dataset_manager.base_url
        self.project_id = dataset_manager.project_id

    def evaluate_utterance(self, text: str) -> Dict[str, Any]:
        eval_url = f"{self.base_url}/api/v2/projects/{self.project_id}/models/evaluate"
        payload = {
            "input": text,
            "return_entities": True,
            "return_confidence": True,
            "return_alternatives": 3,
        }
        response = self.session.post(
            eval_url,
            headers=self.manager._get_headers(),
            json=payload,
            timeout=10,
        )
        response.raise_for_status()
        return response.json()
```
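Step 1 notes that disambiguation needs explicit confidence thresholds. One common rule accepts the top intent only if its confidence clears a minimum and beats every alternative by a margin. Otherwise, the utterance is flagged for a clarifying question or human review. The sketch below applies that rule to an evaluation result. The response fields (`intent`, `confidence`, and an `alternatives` list of the same shape) and the threshold values are assumptions, not documented Cognigy.AI fields.

```python
from typing import Any, Dict, List


def disambiguate(
    prediction: Dict[str, Any],
    min_confidence: float = 0.7,   # hypothetical threshold: below this, reject
    min_margin: float = 0.15,      # hypothetical gap required over each alternative
) -> Dict[str, Any]:
    """Classify a prediction as "accept", "ambiguous", or "reject".

    Assumes hypothetical response fields: "intent", "confidence", and
    "alternatives" = [{"intent": ..., "confidence": ...}, ...].
    """
    top_intent = prediction.get("intent")
    top_score = float(prediction.get("confidence", 0.0))
    # Ignore the top intent if the alternatives list repeats it.
    alternatives: List[Dict[str, Any]] = [
        a for a in prediction.get("alternatives", []) if a.get("intent") != top_intent
    ]

    if top_score < min_confidence:
        return {"decision": "reject", "intent": top_intent}

    # Alternatives scoring within the margin of the top intent make it ambiguous.
    close = [
        a["intent"]
        for a in alternatives
        if top_score - float(a.get("confidence", 0.0)) < min_margin
    ]
    if close:
        return {"decision": "ambiguous", "intent": top_intent, "candidates": [top_intent, *close]}
    return {"decision": "accept", "intent": top_intent}
```

Usage, under the same assumptions about the response shape: `disambiguate(evaluator.evaluate_utterance("Book me a ticket to Paris next week"))`. Tune both thresholds against the confusion matrix. Intents that are frequently confused with each other are the ones that benefit from a wider margin.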
Complete Working Example
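The following `main()` function wires all components together. Place it in the same module as the classes above, because it relies on them and on the module-level `logger`. Set the environment variables to valid credentials before running it.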
```python
import json
import os
import sys
from typing import Any, Dict, List

import requests


def main() -> None:
    # Configuration
    CLIENT_ID = os.getenv("COGNIGY_CLIENT_ID")
    CLIENT_SECRET = os.getenv("COGNIGY_CLIENT_SECRET")
    TOKEN_URL = os.getenv("COGNIGY_TOKEN_URL", "https://auth.cognigy.ai/oauth/token")
    BASE_URL = os.getenv("COGNIGY_BASE_URL", "https://api.cognigy.ai")
    PROJECT_ID = os.getenv("COGNIGY_PROJECT_ID")
    if not all([CLIENT_ID, CLIENT_SECRET, PROJECT_ID]):
        raise ValueError("Missing required environment variables.")

    # Initialize components
    auth = CognigyAuth(CLIENT_ID, CLIENT_SECRET, TOKEN_URL)
    dataset_mgr = CognigyDatasetManager(auth, PROJECT_ID, BASE_URL)
    trainer = CognigyModelTrainer(dataset_mgr)
    metrics_sync = CognigyMetricsAndSync(dataset_mgr)
    evaluator = CognigyQAEvaluator(dataset_mgr)

    # Sample training data: zero-based offsets with an exclusive end,
    # so text[start:end] == value (e.g. "London" is text[27:33]).
    sample_utterances: List[Dict[str, Any]] = [
        {
            "text": "I want to book a flight to London",
            "intent": "book_flight",
            "entities": [
                {"start": 27, "end": 33, "label": "destination", "value": "London"}
            ],
        },
        {
            "text": "Cancel my reservation for tomorrow",
            "intent": "cancel_reservation",
            "entities": [
                {"start": 26, "end": 34, "label": "date", "value": "tomorrow"}
            ],
        },
    ]

    try:
        # Step 1: Validate and build payload
        payload = build_training_payload(sample_utterances)
        # Step 2: Upload dataset (skipped when its hash matches the last upload)
        upload_result = dataset_mgr.upload_dataset(payload)
        # Step 3: Retrain only when a new dataset version was uploaded
        if upload_result.get("status") == "skipped":
            print("Dataset unchanged. Skipping training.")
        else:
            job_id = trainer.trigger_training()
            trainer.poll_training_status(job_id)
        # Step 4: Retrieve metrics
        confusion = metrics_sync.get_confusion_matrix()
        accuracy = metrics_sync.get_accuracy_metrics()
        print("Confusion Matrix:", json.dumps(confusion, indent=2))
        print("Accuracy Metrics:", json.dumps(accuracy, indent=2))
        # Step 5: QA evaluation
        test_text = "Book me a ticket to Paris next week"
        eval_result = evaluator.evaluate_utterance(test_text)
        print("QA Evaluation:", json.dumps(eval_result, indent=2))
    except requests.exceptions.HTTPError as e:
        logger.error("HTTP Error: %s | Response: %s", e.response.status_code, e.response.text)
        sys.exit(1)
    except Exception as e:
        logger.error("Execution failed: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    main()
```
Common Errors & Debugging
Error: 401 Unauthorized
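- Cause: Expired or revoked token, invalid client credentials, or a missing `Authorization` header.
- Fix: Verify that `COGNIGY_CLIENT_ID` and `COGNIGY_CLIENT_SECRET` match the registered application, and that the token endpoint returns a valid `access_token`. The `CognigyAuth` class refreshes the token 60 seconds before `expires_in` elapses. However, it cannot detect a token that the server revoked earlier, and that case surfaces as a 401 on the API call. A network timeout during token acquisition raises `requests.exceptions.Timeout` rather than a 401; the token request already sets `timeout=10`.
- Code Fix: Wrap `auth.get_token()` in a retry block with explicit 401 detection. On a 401, discard the cached token and retry the request once.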
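You can sketch the 401 retry as a wrapper that sends a request and, on a 401, clears the cached token and retries exactly once. `send_with_reauth` and its callable parameters are hypothetical. With `CognigyAuth`, `reset_token` could set `auth._token = None`, which makes the next `get_token()` call fetch a fresh token.

```python
from typing import Callable, Protocol


class HasStatus(Protocol):
    status_code: int


def send_with_reauth(
    send: Callable[[str], HasStatus],
    get_token: Callable[[], str],
    reset_token: Callable[[], None],
) -> HasStatus:
    """Call send(token); on a 401, drop the cached token and retry once."""
    response = send(get_token())
    if response.status_code == 401:
        reset_token()                  # force the next get_token() to hit the token endpoint
        response = send(get_token())   # a second 401 is returned to the caller unchanged
    return response
```

With `requests`, `send` could be `lambda tok: session.get(url, headers={"Authorization": f"Bearer {tok}"}, timeout=10)`. Retrying only once keeps bad credentials from turning into a request loop.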
Error: 400 Bad Request (Schema Violation)
- Cause: Overlapping entity spans, duplicate intent labels, or malformed JSON structure.
- Fix: The `TrainingDataset` Pydantic model catches span and label errors before transmission, so review its validation error messages. Ensure that entity `start` and `end` indices match the exact character positions in `text`. Cognigy.AI uses zero-based indexing. With an exclusive `end` (as in Python slicing), `text[start:end]` must equal `value`.
- Code Fix: Log `ValidationError` details and map them to the original utterance indices for precise correction.
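To map errors back to their source, validate utterances one at a time so that each error carries the index of the raw record that caused it. `collect_validation_errors` is a hypothetical helper. It accepts any Pydantic v2 validation callable, such as `Utterance.model_validate` from Step 1.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple

from pydantic import ValidationError


def collect_validation_errors(
    records: Sequence[Any], validate: Callable[[Any], Any]
) -> List[Tuple[int, List[Dict[str, str]]]]:
    """Validate each record; return (index, [{"loc": ..., "msg": ...}]) for failures."""
    failures: List[Tuple[int, List[Dict[str, str]]]] = []
    for index, record in enumerate(records):
        try:
            validate(record)
        except ValidationError as exc:
            # Flatten Pydantic's error location tuple into a dotted path.
            details = [
                {"loc": ".".join(str(part) for part in err["loc"]), "msg": err["msg"]}
                for err in exc.errors()
            ]
            failures.append((index, details))
    return failures
```

Usage: `for index, errors in collect_validation_errors(sample_utterances, Utterance.model_validate): logger.error("Utterance %d: %s", index, errors)`. Each log line then points at the exact entry in the source list.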
Error: 429 Too Many Requests
- Cause: Rate limit exceeded on dataset upload or training status polling.
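- Fix: The `Retry` strategy mounted in `CognigyDatasetManager` retries GET requests on 429 and 5xx responses with exponential backoff and honors the `Retry-After` header. urllib3 does not retry POST requests by default. When urllib3 exhausts its retries, `requests` raises `requests.exceptions.RetryError` instead of returning the 429 response. `CognigyModelTrainer` also handles 429 explicitly in its polling loop by parsing `Retry-After`. Ensure your polling interval respects the platform limit, and do not parallelize training triggers.
- Code Fix: Increase `total` on the `Retry` object passed to `HTTPAdapter(max_retries=...)`, and adjust the `backoff` multiplier in the polling loop.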
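`Retry-After` may carry either a number of seconds or an HTTP date (RFC 9110), but the polling loop only parses the numeric form. The hypothetical `parse_retry_after` helper below covers both forms, using the standard library's `email.utils.parsedate_to_datetime`.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], default: float,
                      now: Optional[datetime] = None) -> float:
    """Seconds to wait according to a Retry-After header, or `default` if unusable."""
    if not value:
        return default
    try:
        return max(0.0, float(value))              # delay-seconds form, e.g. "120"
    except ValueError:
        pass
    try:
        retry_at = parsedate_to_datetime(value)    # HTTP-date form
    except (TypeError, ValueError, IndexError):
        return default
    if retry_at.tzinfo is None:                    # treat naive dates as UTC
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    current = now or datetime.now(timezone.utc)
    return max(0.0, (retry_at - current).total_seconds())
```

In the polling loop, `retry_after = parse_retry_after(response.headers.get("Retry-After"), backoff)` could replace the `float(...)` conversion.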
Error: 500 Internal Server Error (Training Failure)
- Cause: Incompatible dataset format, corrupted version history, or backend resource exhaustion.
- Fix: When the job reports `FAILED` or `ERROR`, `poll_training_status` raises a `RuntimeError` that includes the job's `error` field, so inspect that message. Revert to a known-good dataset version, and verify that entity labels match the registered taxonomy in the Cognigy.AI console.
- Code Fix: Capture the full response body on 5xx errors and trigger an alert. Use the `version` field in payloads to roll back if necessary.
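Rolling back by `version` requires the client to remember which versions trained successfully. The sketch below is a hypothetical client-side registry: in-memory here, though in practice you would persist it. It stores each uploaded payload by version and returns the most recent known-good one.

```python
from typing import Any, Dict, List, Optional, Set


class DatasetVersionRegistry:
    """Hypothetical client-side history of uploaded payloads and their training outcome."""

    def __init__(self) -> None:
        self._payloads: Dict[str, Dict[str, Any]] = {}
        self._order: List[str] = []   # upload order, oldest first
        self._good: Set[str] = set()

    def record_upload(self, payload: Dict[str, Any]) -> None:
        version = payload["version"]
        if version not in self._payloads:
            self._order.append(version)
        self._payloads[version] = payload

    def mark_good(self, version: str) -> None:
        # Call after poll_training_status returns successfully for this version.
        self._good.add(version)

    def last_good(self, exclude: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Most recently uploaded payload that trained successfully, skipping `exclude`."""
        for version in reversed(self._order):
            if version in self._good and version != exclude:
                return self._payloads[version]
        return None
```

Suppose training of version `1.1.0` fails. Then `registry.last_good(exclude="1.1.0")` returns the previous good payload. You can pass it to `upload_dataset` and retrain. Its hash differs from the failed upload's, so the upload is not skipped.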