Automating Cognigy.AI NLU Model Training via REST API with Python
What You Will Build
This tutorial builds a Python automation pipeline that validates dataset quality, triggers NLU model training, polls the asynchronous training job, evaluates performance metrics, promotes the new model version when it meets the evaluation thresholds, and writes governance audit logs. The implementation targets the Cognigy.AI REST API v1 and Python 3.10.
Prerequisites
- Cognigy.AI tenant with API access enabled
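- OAuth2 client credentials (`client_id`, `client_secret`) with the `model:read`, `model:write`, `training:read`, and `training:write` scopes
- Python 3.10+
- Third-party packages `requests`, `pydantic`, and `python-dotenv`; `uuid` and `logging` ship with the Python standard library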
Authentication Setup
Cognigy.AI supports the OAuth2 client credentials flow for service-to-service authentication. The token endpoint returns a Bearer token together with its lifetime in seconds (`expires_in`). Cache the token and refresh it before it expires so that long training jobs are not interrupted by 401 responses; the class below refreshes the token once it is within five minutes (300 seconds) of expiry.
```python
import os
import time
import requests
from typing import Optional


class CognigyAuth:
    def __init__(self, tenant: str, client_id: str, client_secret: str):
        self.tenant = tenant
        self.client_id = client_id
        self.client_secret = client_secret
        self.token: Optional[str] = None
        self.token_expiry: float = 0.0
        self.base_url = f"https://{tenant}.cognigy.ai/api/v1"

    def get_token(self) -> str:
        # Reuse the cached token until it is within 300 s of expiry
        if self.token and time.time() < self.token_expiry - 300:
            return self.token
        url = f"{self.base_url}/auth/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "model:read model:write training:read training:write"
        }
        # OAuth2 token requests are form-encoded, hence data= rather than json=
        response = requests.post(url, data=payload, timeout=15)
        response.raise_for_status()
        data = response.json()
        self.token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"]
        return self.token

    def headers(self) -> dict:
        # get_token() runs on every call, so expiry is checked before each request
        return {
            "Authorization": f"Bearer {self.get_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
```
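To see how the 300-second refresh margin behaves without calling the API, the sketch below isolates the caching rule from `get_token()`. The `TokenCache` class, the `fetch` callable (assumed to return `(access_token, expires_in)`), and the injectable clock are hypothetical, introduced only so the logic can run offline.

```python
from typing import Callable, Optional, Tuple


class TokenCache:
    """Hypothetical offline model of the caching rule in CognigyAuth.get_token()."""

    def __init__(self, fetch: Callable[[], Tuple[str, float]],
                 clock: Callable[[], float], margin: float = 300.0):
        self.fetch = fetch      # stands in for the POST to the token endpoint
        self.clock = clock      # stands in for time.time()
        self.margin = margin
        self.token: Optional[str] = None
        self.expiry = 0.0

    def get(self) -> str:
        # Reuse the cached token while it is still outside the refresh margin
        if self.token and self.clock() < self.expiry - self.margin:
            return self.token
        token, expires_in = self.fetch()
        self.token = token
        self.expiry = self.clock() + expires_in
        return token


# Usage: a fake clock and a fake token endpoint that issues 1-hour tokens
now = [0.0]
issued = []


def fake_fetch() -> Tuple[str, float]:
    issued.append(f"token-{len(issued) + 1}")
    return issued[-1], 3600.0


cache = TokenCache(fake_fetch, clock=lambda: now[0])
print(cache.get())   # token-1 (first call fetches)
now[0] = 3299.0
print(cache.get())   # token-1 (still more than 300 s before expiry)
now[0] = 3300.0
print(cache.get())   # token-2 (inside the margin, so it refreshes)
```

One consequence of this rule: if a tenant issues tokens with `expires_in` of 300 seconds or less, every call fetches a new token, so the margin should stay well below the token lifetime.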
Implementation
Step 1: Construct Training Payloads with Dataset References and Hyperparameters
The training request requires explicit dataset identifiers, a validation split ratio, and hyperparameter tuning values. Cognigy.AI expects these in a structured JSON body. You must reference datasets that are already annotated and published to your tenant.
```python
from typing import List

from pydantic import BaseModel, Field


class TrainingConfig(BaseModel):
    # Pydantic enforces these bounds when the config is constructed
    dataset_ids: List[str]
    validation_split: float = Field(0.2, ge=0.1, le=0.5)
    epochs: int = Field(50, ge=10, le=500)
    batch_size: int = Field(32, ge=8, le=128)
    learning_rate: float = Field(0.001, gt=0.0, le=0.1)
    dropout: float = Field(0.3, ge=0.0, le=0.8)
    early_stopping_patience: int = Field(5, ge=1, le=20)

    def to_payload(self) -> dict:
        # Map snake_case fields to the camelCase keys of the request body
        return {
            "datasetIds": self.dataset_ids,
            "validationSplit": self.validation_split,
            "hyperparameters": {
                "epochs": self.epochs,
                "batchSize": self.batch_size,
                "learningRate": self.learning_rate,
                "dropout": self.dropout,
                "earlyStoppingPatience": self.early_stopping_patience
            }
        }
```
Step 2: Validate Inputs Against Class Imbalance and Annotation Quality Thresholds
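Training quality degrades when the intent distribution is heavily skewed or when annotation confidence is low, so query the dataset statistics before submitting the training job. This step computes the imbalance ratio (the example count of the largest intent divided by that of the smallest) and checks the average annotation score. With the defaults, a dataset fails when the ratio exceeds 5.0 or the average score falls below 0.85.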
```python
import logging
from typing import List

import requests

logger = logging.getLogger(__name__)


class DatasetValidator:
    def __init__(self, auth: CognigyAuth):
        self.auth = auth

    def fetch_dataset_stats(self, dataset_id: str) -> dict:
        url = f"{self.auth.base_url}/datasets/{dataset_id}/stats"
        response = requests.get(url, headers=self.auth.headers(), timeout=15)
        response.raise_for_status()
        return response.json()

    def validate_datasets(self, dataset_ids: List[str], max_imbalance_ratio: float = 5.0, min_annotation_score: float = 0.85) -> bool:
        for ds_id in dataset_ids:
            stats = self.fetch_dataset_stats(ds_id)
            intent_counts = stats.get("intentDistribution", {})
            if not intent_counts:
                raise ValueError(f"Dataset {ds_id} contains no annotated intents.")
            # Largest intent count divided by smallest; an empty intent makes it unbounded
            counts = list(intent_counts.values())
            imbalance_ratio = max(counts) / min(counts) if min(counts) > 0 else float('inf')
            if imbalance_ratio > max_imbalance_ratio:
                logger.warning(f"Dataset {ds_id} class imbalance ratio {imbalance_ratio:.2f} exceeds threshold {max_imbalance_ratio}.")
                return False
            avg_score = stats.get("averageAnnotationScore", 0.0)
            if avg_score < min_annotation_score:
                logger.warning(f"Dataset {ds_id} annotation quality {avg_score:.2f} falls below threshold {min_annotation_score}.")
                return False
        return True
```
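A worked example of the imbalance check helps when choosing `max_imbalance_ratio`. The sketch below reproduces the ratio calculation from `validate_datasets` as a standalone function and applies it to a hypothetical `intentDistribution` payload; the intent names and counts are invented for illustration.

```python
def imbalance_ratio(intent_counts: dict[str, int]) -> float:
    """Largest intent count divided by the smallest, as in validate_datasets()."""
    if not intent_counts:
        raise ValueError("No annotated intents.")
    counts = list(intent_counts.values())
    smallest = min(counts)
    # An intent with zero examples makes the ratio unbounded
    return max(counts) / smallest if smallest > 0 else float("inf")


# Hypothetical distribution: 120 greeting, 60 order-status, 20 cancel examples
sample = {"greeting": 120, "order_status": 60, "cancel_order": 20}
ratio = imbalance_ratio(sample)
print(f"{ratio:.2f}")   # 6.00
print(ratio > 5.0)      # True: this dataset fails the default threshold

# Annotating 5 more cancel examples gives 120 / 25 = 4.8, which passes
sample["cancel_order"] = 25
print(f"{imbalance_ratio(sample):.2f}")   # 4.80
```

The ratio only looks at the two extremes, so a single sparse intent is enough to fail a dataset; that is usually the intent worth annotating next.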
Step 3: Trigger Asynchronous Training and Poll Job Status
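Training jobs run asynchronously on Cognigy.AI infrastructure. POST the payload, capture the returned `trainingId`, and poll the status endpoint until the job reaches a terminal state (`COMPLETED`, `FAILED`, or `CANCELLED`). On 429 rate-limit responses the implementation waits for the `Retry-After` interval when the server sends one and otherwise backs off exponentially; it also logs the resource utilization figures that the status endpoint reports.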
```python
import time


class TrainingOrchestrator:
    def __init__(self, auth: CognigyAuth, model_id: str):
        self.auth = auth
        self.model_id = model_id
        self.training_id: Optional[str] = None

    def start_training(self, config: TrainingConfig) -> str:
        url = f"{self.auth.base_url}/models/{self.model_id}/train"
        payload = config.to_payload()
        response = requests.post(url, json=payload, headers=self.auth.headers(), timeout=30)
        response.raise_for_status()
        data = response.json()
        self.training_id = data["trainingId"]
        logger.info(f"Training job initiated: {self.training_id}")
        return self.training_id

    def poll_status(self, polling_interval: int = 15, max_retries: int = 10, max_backoff: int = 30) -> dict:
        if not self.training_id:
            raise RuntimeError("No active training job. Call start_training first.")
        url = f"{self.auth.base_url}/models/{self.model_id}/training/{self.training_id}/status"
        backoff = 1
        rate_limited = 0  # consecutive 429 responses
        while True:
            try:
                response = requests.get(url, headers=self.auth.headers(), timeout=15)
                if response.status_code == 429:
                    rate_limited += 1
                    if rate_limited > max_retries:
                        response.raise_for_status()  # give up: raises HTTPError
                    # Prefer the server's Retry-After seconds; otherwise use exponential backoff
                    retry_after = response.headers.get("Retry-After", "")
                    delay = int(retry_after) if retry_after.isdigit() else backoff
                    logger.warning(f"Rate limited. Retrying in {delay}s.")
                    time.sleep(delay)
                    backoff = min(backoff * 2, max_backoff)
                    continue
                rate_limited, backoff = 0, 1  # reset after any non-429 response
                response.raise_for_status()
                data = response.json()
                status = data.get("status")
                progress = data.get("progress", 0)
                resources = data.get("resourceUtilization", {})
                logger.info(f"Training status: {status} | Progress: {progress}% | CPU: {resources.get('cpu', 'N/A')}% | Memory: {resources.get('memory', 'N/A')}%")
                if status in ("COMPLETED", "FAILED", "CANCELLED"):
                    return data
                time.sleep(polling_interval)
            except requests.exceptions.RequestException as e:
                logger.error(f"Polling error: {e}")
                raise
```
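The `Retry-After` header may carry either a number of seconds or an HTTP date (RFC 9110), and the polling loop only interprets the numeric form. The helper below is a sketch that handles both forms and falls back to capped exponential backoff; the name `retry_delay` and its defaults (1-second base, 30-second cap) are assumptions chosen to match the loop.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_delay(retry_after: Optional[str], attempt: int,
                base: float = 1.0, cap: float = 30.0,
                now: Optional[datetime] = None) -> float:
    """Seconds to wait after a 429 response (hypothetical helper).

    `attempt` counts consecutive 429 responses, starting at 0. A parseable
    Retry-After value wins; otherwise the delay doubles per attempt up to `cap`.
    """
    if retry_after:
        value = retry_after.strip()
        if value.isdigit():
            return float(value)  # delay-seconds form, e.g. "7"
        try:
            when = parsedate_to_datetime(value)  # HTTP-date form
        except (TypeError, ValueError):
            when = None  # unparseable header: fall through to backoff
        if when is not None:
            if when.tzinfo is None:
                when = when.replace(tzinfo=timezone.utc)  # "-0000" dates parse as naive
            current = now if now is not None else datetime.now(timezone.utc)
            return max(0.0, (when - current).total_seconds())
    # Cap the exponent as well, so a long 429 streak cannot overflow the float
    return min(cap, base * 2 ** min(attempt, 32))


print(retry_delay("7", attempt=0))     # 7.0
print(retry_delay(None, attempt=3))    # 8.0
print(retry_delay(None, attempt=10))   # 30.0 (capped)
fixed_now = datetime(2015, 10, 21, 7, 27, 30, tzinfo=timezone.utc)
print(retry_delay("Wed, 21 Oct 2015 07:28:00 GMT", attempt=0, now=fixed_now))  # 30.0
```

Inside `poll_status`, the 429 branch would call `time.sleep(retry_delay(response.headers.get("Retry-After"), rate_limited - 1))`, keeping the existing reset of the counter after any non-429 response.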
Step 4: Evaluate Metrics and Promote the Optimal Model Version
After training completes, retrieve the evaluation metrics to decide whether the new version meets production standards. This step checks the F1 score and precision against minimum thresholds and logs recall for reference. If both checks pass, the pipeline promotes the model version to production.
```python
class ModelEvaluator:
    def __init__(self, auth: CognigyAuth, model_id: str):
        self.auth = auth
        self.model_id = model_id

    def fetch_metrics(self, training_id: str) -> dict:
        url = f"{self.auth.base_url}/models/{self.model_id}/training/{training_id}/metrics"
        response = requests.get(url, headers=self.auth.headers(), timeout=15)
        response.raise_for_status()
        return response.json()

    def evaluate_and_promote(self, training_id: str, min_f1: float = 0.85, min_precision: float = 0.80) -> bool:
        metrics = self.fetch_metrics(training_id)
        f1_score = metrics.get("f1Score", 0.0)
        precision = metrics.get("precision", 0.0)
        recall = metrics.get("recall", 0.0)
        logger.info(f"Metrics - F1: {f1_score:.3f}, Precision: {precision:.3f}, Recall: {recall:.3f}")
        # Only F1 and precision gate promotion; recall is logged for reference
        if f1_score < min_f1 or precision < min_precision:
            logger.warning(f"Model failed evaluation thresholds. F1: {f1_score}, Precision: {precision}")
            return False
        version_id = metrics.get("versionId")
        if not version_id:
            raise ValueError("No version ID returned in metrics payload.")
        promote_url = f"{self.auth.base_url}/models/{self.model_id}/versions/{version_id}/promote"
        response = requests.post(promote_url, headers=self.auth.headers(), timeout=15)
        response.raise_for_status()
        logger.info(f"Version {version_id} successfully promoted to production.")
        return True
```
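F1 is the harmonic mean of precision and recall, F1 = 2PR / (P + R), so a model can clear the precision floor and still fail the F1 gate when recall is weak. The sketch below applies the same default thresholds as `evaluate_and_promote` (F1 ≥ 0.85, precision ≥ 0.80) to hypothetical precision and recall values. Computing F1 locally is only a cross-check; the pipeline itself reads `f1Score` from the metrics response.

```python
def f1_score(precision: float, recall: float) -> float:
    """Harmonic mean of precision and recall (0.0 when both are zero)."""
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)


def passes_gate(precision: float, recall: float,
                min_f1: float = 0.85, min_precision: float = 0.80) -> bool:
    """Same promotion rule as evaluate_and_promote(), with F1 derived locally."""
    return f1_score(precision, recall) >= min_f1 and precision >= min_precision


# Hypothetical runs: high precision with weak recall fails on F1
print(round(f1_score(0.95, 0.70), 3))   # 0.806
print(passes_gate(0.95, 0.70))          # False
print(round(f1_score(0.88, 0.86), 3))   # 0.87
print(passes_gate(0.88, 0.86))          # True
```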
Step 5: Sync Artifacts via Webhook and Generate Governance Audit Logs
Continuous NLU improvement requires external version control and compliance tracking. This step posts the training metadata and metrics to a webhook endpoint for model registry synchronization and appends a structured audit log entry containing the job duration, convergence data, and the promotion decision.
```python
import json
import uuid
from datetime import datetime, timezone


class AuditAndSync:
    def __init__(self, auth: CognigyAuth, webhook_url: str):
        self.auth = auth
        self.webhook_url = webhook_url

    def sync_to_registry(self, training_id: str, model_id: str, version_id: str, metrics: dict) -> None:
        payload = {
            "event": "model_training_completed",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "modelId": model_id,
            "trainingId": training_id,
            "versionId": version_id,
            "metrics": metrics,
            "source": "cognigy_ai_automation"
        }
        response = requests.post(
            self.webhook_url,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=15
        )
        response.raise_for_status()
        logger.info(f"Artifact synced to external registry: {version_id}")

    def write_audit_log(self, model_id: str, training_id: str, start_time: float, end_time: float, status: str, metrics: dict, promoted: bool) -> None:
        duration_seconds = end_time - start_time
        log_entry = {
            "auditId": str(uuid.uuid4()),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "modelId": model_id,
            "trainingId": training_id,
            "status": status,
            "durationSeconds": round(duration_seconds, 2),
            "metrics": metrics,
            "promotedToProduction": promoted,
            "convergenceData": metrics.get("convergenceHistory", []),
            "governanceTags": ["automated", "api_triggered", "threshold_validated"]
        }
        # Append one JSON object per line so the log stays machine-readable
        log_file = "cognigy_training_audit.log"
        with open(log_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(log_entry, default=str) + "\n")
        logger.info(f"Audit log written to {log_file} for training {training_id}")
```
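Because `write_audit_log` appends one JSON object per line (the JSON Lines convention), the audit file can be summarized with the standard library alone. The reader below is a sketch: the summary fields are illustrative choices, and it writes two hypothetical entries to a temporary file so that it runs on its own.

```python
import json
import tempfile
from pathlib import Path


def summarize_audit_log(path: Path) -> dict:
    """Count runs, promotions, and non-completed runs in a JSON Lines audit log."""
    runs = promoted = not_completed = 0
    total_duration = 0.0
    with path.open(encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines
            entry = json.loads(line)
            runs += 1
            promoted += bool(entry.get("promotedToProduction"))
            not_completed += entry.get("status") != "COMPLETED"
            total_duration += entry.get("durationSeconds", 0.0)
    return {
        "runs": runs,
        "promoted": promoted,
        "notCompleted": not_completed,
        "avgDurationSeconds": round(total_duration / runs, 2) if runs else 0.0,
    }


# Two hypothetical entries with the fields write_audit_log() produces
entries = [
    {"trainingId": "tr_1", "status": "COMPLETED", "durationSeconds": 610.5, "promotedToProduction": True},
    {"trainingId": "tr_2", "status": "FAILED", "durationSeconds": 95.5, "promotedToProduction": False},
]
with tempfile.TemporaryDirectory() as tmp:
    log_path = Path(tmp) / "cognigy_training_audit.log"
    log_path.write_text("".join(json.dumps(e) + "\n" for e in entries), encoding="utf-8")
    summary = summarize_audit_log(log_path)

print(summary)  # {'runs': 2, 'promoted': 1, 'notCompleted': 1, 'avgDurationSeconds': 353.0}
```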
Complete Working Example
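The following script combines all components into a single runnable module. Before running it, set `COGNIGY_TENANT`, `COGNIGY_CLIENT_ID`, `COGNIGY_CLIENT_SECRET`, and `COGNIGY_MODEL_ID`. `WEBHOOK_URL` and `DATASET_IDS` (comma-separated) are optional and fall back to the placeholder defaults in the script.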
```python
import json
import logging
import os
import time
import uuid
from datetime import datetime, timezone
from typing import List, Optional

import requests
from pydantic import BaseModel, Field

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


class CognigyAuth:
    def __init__(self, tenant: str, client_id: str, client_secret: str):
        self.tenant = tenant
        self.client_id = client_id
        self.client_secret = client_secret
        self.token: Optional[str] = None
        self.token_expiry: float = 0.0
        self.base_url = f"https://{tenant}.cognigy.ai/api/v1"

    def get_token(self) -> str:
        # Reuse the cached token until it is within 300 s of expiry
        if self.token and time.time() < self.token_expiry - 300:
            return self.token
        url = f"{self.base_url}/auth/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "model:read model:write training:read training:write"
        }
        response = requests.post(url, data=payload, timeout=15)
        response.raise_for_status()
        data = response.json()
        self.token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"]
        return self.token

    def headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }


class TrainingConfig(BaseModel):
    dataset_ids: List[str]
    validation_split: float = Field(0.2, ge=0.1, le=0.5)
    epochs: int = Field(50, ge=10, le=500)
    batch_size: int = Field(32, ge=8, le=128)
    learning_rate: float = Field(0.001, gt=0.0, le=0.1)
    dropout: float = Field(0.3, ge=0.0, le=0.8)
    early_stopping_patience: int = Field(5, ge=1, le=20)

    def to_payload(self) -> dict:
        return {
            "datasetIds": self.dataset_ids,
            "validationSplit": self.validation_split,
            "hyperparameters": {
                "epochs": self.epochs,
                "batchSize": self.batch_size,
                "learningRate": self.learning_rate,
                "dropout": self.dropout,
                "earlyStoppingPatience": self.early_stopping_patience
            }
        }


class CognigyNLUTrainer:
    def __init__(self, tenant: str, client_id: str, client_secret: str, model_id: str, webhook_url: str):
        self.auth = CognigyAuth(tenant, client_id, client_secret)
        self.model_id = model_id
        self.webhook_url = webhook_url
        self.training_id: Optional[str] = None

    def validate_datasets(self, dataset_ids: List[str]) -> bool:
        # Same checks as Step 2: imbalance ratio <= 5.0 and annotation score >= 0.85
        for ds_id in dataset_ids:
            url = f"{self.auth.base_url}/datasets/{ds_id}/stats"
            response = requests.get(url, headers=self.auth.headers(), timeout=15)
            response.raise_for_status()
            stats = response.json()
            intent_counts = stats.get("intentDistribution", {})
            if not intent_counts:
                raise ValueError(f"Dataset {ds_id} contains no annotated intents.")
            counts = list(intent_counts.values())
            imbalance_ratio = max(counts) / min(counts) if min(counts) > 0 else float('inf')
            if imbalance_ratio > 5.0:
                logger.warning(f"Dataset {ds_id} imbalance ratio {imbalance_ratio:.2f} exceeds threshold.")
                return False
            avg_score = stats.get("averageAnnotationScore", 0.0)
            if avg_score < 0.85:
                logger.warning(f"Dataset {ds_id} annotation quality {avg_score:.2f} below threshold.")
                return False
        return True

    def run_training_pipeline(self, config: TrainingConfig) -> dict:
        if not self.validate_datasets(config.dataset_ids):
            raise ValueError("Dataset validation failed. Aborting training.")
        start_time = time.time()
        logger.info("Triggering training job...")
        train_url = f"{self.auth.base_url}/models/{self.model_id}/train"
        response = requests.post(train_url, json=config.to_payload(), headers=self.auth.headers(), timeout=30)
        response.raise_for_status()
        self.training_id = response.json()["trainingId"]
        logger.info(f"Training initiated: {self.training_id}")
        status_data = self._poll_training_status()
        end_time = time.time()
        status = status_data.get("status")
        if status != "COMPLETED":
            logger.error(f"Training ended with status: {status}")
            self._write_audit_log(start_time, end_time, status, {}, False)
            return {"status": status, "trainingId": self.training_id}
        metrics = self._fetch_metrics()
        promoted = self._evaluate_and_promote(metrics)
        try:
            self._sync_to_registry(metrics)
        except requests.exceptions.RequestException as e:
            # A registry outage must not prevent the audit record from being written
            logger.error(f"Registry sync failed: {e}")
        self._write_audit_log(start_time, end_time, status, metrics, promoted)
        return {
            "status": status,
            "trainingId": self.training_id,
            "metrics": metrics,
            "promoted": promoted,
            "durationSeconds": round(end_time - start_time, 2)
        }

    def _poll_training_status(self, max_retries: int = 10) -> dict:
        url = f"{self.auth.base_url}/models/{self.model_id}/training/{self.training_id}/status"
        backoff = 1
        rate_limited = 0  # consecutive 429 responses
        while True:
            response = requests.get(url, headers=self.auth.headers(), timeout=15)
            if response.status_code == 429:
                rate_limited += 1
                if rate_limited > max_retries:
                    response.raise_for_status()  # give up: raises HTTPError
                retry_after = response.headers.get("Retry-After", "")
                delay = int(retry_after) if retry_after.isdigit() else backoff
                logger.warning(f"Rate limited. Retrying in {delay}s.")
                time.sleep(delay)
                backoff = min(backoff * 2, 30)
                continue
            rate_limited, backoff = 0, 1
            response.raise_for_status()
            data = response.json()
            logger.info(f"Progress: {data.get('progress', 0)}% | Status: {data.get('status')}")
            if data.get("status") in ("COMPLETED", "FAILED", "CANCELLED"):
                return data
            time.sleep(15)

    def _fetch_metrics(self) -> dict:
        url = f"{self.auth.base_url}/models/{self.model_id}/training/{self.training_id}/metrics"
        response = requests.get(url, headers=self.auth.headers(), timeout=15)
        response.raise_for_status()
        return response.json()

    def _evaluate_and_promote(self, metrics: dict) -> bool:
        f1 = metrics.get("f1Score", 0.0)
        precision = metrics.get("precision", 0.0)
        if f1 < 0.85 or precision < 0.80:
            logger.warning(f"Metrics failed thresholds. F1: {f1}, Precision: {precision}")
            return False
        version_id = metrics.get("versionId")
        if not version_id:
            raise ValueError("Missing versionId in metrics.")
        promote_url = f"{self.auth.base_url}/models/{self.model_id}/versions/{version_id}/promote"
        response = requests.post(promote_url, headers=self.auth.headers(), timeout=15)
        response.raise_for_status()
        logger.info(f"Version {version_id} promoted to production.")
        return True

    def _sync_to_registry(self, metrics: dict) -> None:
        payload = {
            "event": "model_training_completed",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "modelId": self.model_id,
            "trainingId": self.training_id,
            "versionId": metrics.get("versionId"),
            "metrics": metrics,
            "source": "cognigy_ai_automation"
        }
        response = requests.post(self.webhook_url, json=payload, headers={"Content-Type": "application/json"}, timeout=15)
        response.raise_for_status()
        logger.info("Artifact synced to external registry.")

    def _write_audit_log(self, start_time: float, end_time: float, status: str, metrics: dict, promoted: bool) -> None:
        log_entry = {
            "auditId": str(uuid.uuid4()),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "modelId": self.model_id,
            "trainingId": self.training_id,
            "status": status,
            "durationSeconds": round(end_time - start_time, 2),
            "metrics": metrics,
            "promotedToProduction": promoted,
            "convergenceData": metrics.get("convergenceHistory", []),
            "governanceTags": ["automated", "api_triggered", "threshold_validated"]
        }
        with open("cognigy_training_audit.log", "a", encoding="utf-8") as f:
            f.write(json.dumps(log_entry, default=str) + "\n")
        logger.info("Audit log written.")


if __name__ == "__main__":
    # Fail fast instead of building URLs like https://None.cognigy.ai
    required = ["COGNIGY_TENANT", "COGNIGY_CLIENT_ID", "COGNIGY_CLIENT_SECRET", "COGNIGY_MODEL_ID"]
    missing = [name for name in required if not os.getenv(name)]
    if missing:
        raise SystemExit(f"Missing required environment variables: {', '.join(missing)}")
    tenant = os.environ["COGNIGY_TENANT"]
    client_id = os.environ["COGNIGY_CLIENT_ID"]
    client_secret = os.environ["COGNIGY_CLIENT_SECRET"]
    model_id = os.environ["COGNIGY_MODEL_ID"]
    webhook_url = os.getenv("WEBHOOK_URL", "https://hooks.example.com/cognigy-sync")
    raw_ids = os.getenv("DATASET_IDS", "ds_intent_001,ds_intent_002")
    dataset_ids = [ds.strip() for ds in raw_ids.split(",") if ds.strip()]
    config = TrainingConfig(
        dataset_ids=dataset_ids,
        validation_split=0.2,
        epochs=50,
        batch_size=32,
        learning_rate=0.001,
        dropout=0.3,
        early_stopping_patience=5
    )
    trainer = CognigyNLUTrainer(tenant, client_id, client_secret, model_id, webhook_url)
    result = trainer.run_training_pipeline(config)
    logger.info(f"Pipeline complete: {result}")
```
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth2 token expired during polling, or the client credentials lack the required scopes.
- Fix: Ensure the token refresh logic checks expiration before each request. Verify that the `scope` parameter includes `training:read` and `training:write`.
- Code Fix: The `CognigyAuth.get_token()` method refreshes the token automatically once `time.time() >= self.token_expiry - 300`. Add explicit scope validation if your tenant enforces strict role binding.
Error: 400 Bad Request (Validation Failure)
- Cause: The dataset IDs do not exist, the validation split is out of bounds, or hyperparameters fall outside the allowed ranges.
- Fix: Check `dataset_ids` against your tenant's dataset inventory. Keep `validation_split` between 0.1 and 0.5. Confirm that the hyperparameter bounds in `TrainingConfig` match the limits Cognigy.AI enforces.
- Code Fix: The `TrainingConfig` Pydantic model enforces bounds at construction time. Catch `pydantic.ValidationError` and log the exact field violation before calling the API.
Error: 429 Too Many Requests
- Cause: Polling frequency exceeds the tenant rate limit, or concurrent training jobs exhaust the quota.
- Fix: Use exponential backoff with a maximum cap, and honor the `Retry-After` header when present.
- Code Fix: When no `Retry-After` header is sent, the `_poll_training_status` method doubles the wait time up to 30 seconds. Never poll status endpoints more often than every 10 seconds.
Error: 500 Internal Server Error (Training Engine Failure)
- Cause: A backend training infrastructure failure or a corrupted dataset payload.
- Fix: Check the `status` endpoint for detailed error messages. Verify dataset integrity by re-exporting and re-importing the annotations. Retry with a reduced batch size.
- Code Fix: Capture the full response body on 5xx errors, and log the payload and dataset IDs for support tickets. Implement a retry wrapper for the initial POST if the error is transient.
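The retry wrapper suggested for transient 5xx errors can be kept independent of `requests` by passing in the call to retry. This is a sketch under two assumptions: the callable returns an object with a `status_code` attribute, and resubmitting is safe. For the training POST that second point needs care, because a request that failed with a 5xx may still have started a job, so confirm that no training job was created before retrying. `with_retries` and the set of transient status codes are illustrative choices.

```python
import time
from typing import Any, Callable

TRANSIENT_STATUS = {500, 502, 503, 504}


def with_retries(send: Callable[[], Any], attempts: int = 3, base_delay: float = 2.0,
                 sleep: Callable[[float], None] = time.sleep) -> Any:
    """Call `send` until it returns a non-transient status or attempts run out.

    Returns the last response either way, so the caller can still call
    raise_for_status() and log the body for a support ticket.
    """
    response = None
    for attempt in range(attempts):
        response = send()
        if response.status_code not in TRANSIENT_STATUS:
            return response
        if attempt < attempts - 1:
            sleep(base_delay * (2 ** attempt))  # 2 s, then 4 s, ... between tries
    return response


# Usage with a fake response sequence instead of a live endpoint
class FakeResponse:
    def __init__(self, status_code: int):
        self.status_code = status_code


codes = iter([503, 502, 202])
delays = []
result = with_retries(lambda: FakeResponse(next(codes)), sleep=delays.append)
print(result.status_code, delays)  # 202 [2.0, 4.0]
```

With `requests`, the call would be `with_retries(lambda: requests.post(train_url, json=config.to_payload(), headers=self.auth.headers(), timeout=30))`; wrapping it in a lambda re-evaluates the headers, and therefore the token check, on every attempt.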