Automating Cognigy.AI NLU Model Training via REST API with Python

What You Will Build

This tutorial builds a Python automation pipeline that triggers NLU model training, validates dataset quality, polls asynchronous jobs, evaluates performance metrics, promotes the optimal model version, and generates governance audit logs. The implementation uses the Cognigy.AI REST API v1 and Python 3.10.

Prerequisites

  • Cognigy.AI tenant with API access enabled
  • OAuth2 client credentials (client_id, client_secret) with model:read, model:write, training:read, training:write scopes
  • Python 3.10+
  • requests, pydantic, and python-dotenv installed via pip (uuid, logging, and json ship with the standard library)

Authentication Setup

Cognigy.AI supports the OAuth2 client credentials flow for service-to-service authentication. The token endpoint returns a Bearer token along with its lifetime in seconds (expires_in). Cache the token and refresh it before it expires; the class below refreshes 300 seconds early so that long training jobs are not interrupted by 401 errors.

import os
import time
import requests
from typing import Optional

class CognigyAuth:
    def __init__(self, tenant: str, client_id: str, client_secret: str):
        self.tenant = tenant
        self.client_id = client_id
        self.client_secret = client_secret
        self.token: Optional[str] = None
        self.token_expiry: float = 0.0
        self.base_url = f"https://{tenant}.cognigy.ai/api/v1"

    def get_token(self) -> str:
        if self.token and time.time() < self.token_expiry - 300:
            return self.token

        url = f"{self.base_url}/auth/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "model:read model:write training:read training:write"
        }

        response = requests.post(url, data=payload, timeout=15)
        response.raise_for_status()
        data = response.json()

        self.token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"]
        return self.token

    def headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
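To check the refresh-margin rule without network access, here is a minimal sketch of the same caching logic with the HTTP call replaced by an injected `fetch` function and an injectable clock. `CachedToken`, `fetch`, and `clock` are hypothetical names for this illustration, not part of any Cognigy.AI SDK; `fetch` is assumed to return a standard OAuth2 token response with `access_token` and `expires_in`.

```python
import time
from typing import Callable, Optional


class CachedToken:
    """Caches a bearer token and refreshes it `margin` seconds before it expires.

    `fetch` stands in for the POST to the token endpoint (hypothetical) and must
    return a dict with "access_token" and "expires_in" (seconds).
    """

    def __init__(self, fetch: Callable[[], dict], margin: float = 300,
                 clock: Callable[[], float] = time.time):
        self.fetch = fetch
        self.margin = margin
        self.clock = clock
        self.token: Optional[str] = None
        self.expiry = 0.0

    def get(self) -> str:
        now = self.clock()
        # Reuse the cached token while we are still outside the safety margin
        if self.token and now < self.expiry - self.margin:
            return self.token
        data = self.fetch()
        self.token = data["access_token"]
        self.expiry = now + data["expires_in"]
        return self.token
```

Injecting the clock lets a test jump past the refresh boundary instantly instead of waiting an hour for a real token to age.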

Implementation

Step 1: Construct Training Payloads with Dataset References and Hyperparameters

The training request requires explicit dataset identifiers, a validation split ratio, and hyperparameter tuning values. Cognigy.AI expects these in a structured JSON body. You must reference datasets that are already annotated and published to your tenant.

from pydantic import BaseModel, Field
from typing import List

class TrainingConfig(BaseModel):
    dataset_ids: List[str]
    validation_split: float = Field(0.2, ge=0.1, le=0.5)
    epochs: int = Field(50, ge=10, le=500)
    batch_size: int = Field(32, ge=8, le=128)
    learning_rate: float = Field(0.001, gt=0.0, le=0.1)
    dropout: float = Field(0.3, ge=0.0, le=0.8)
    early_stopping_patience: int = Field(5, ge=1, le=20)

    def to_payload(self) -> dict:
        return {
            "datasetIds": self.dataset_ids,
            "validationSplit": self.validation_split,
            "hyperparameters": {
                "epochs": self.epochs,
                "batchSize": self.batch_size,
                "learningRate": self.learning_rate,
                "dropout": self.dropout,
                "earlyStoppingPatience": self.early_stopping_patience
            }
        }
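If you want to inspect the request body or run the bounds checks without installing pydantic, the same fields can be mirrored with a standard-library dataclass. This is a sketch: `PlainTrainingConfig` is a hypothetical name, and the bounds are copied from the `Field(...)` declarations above rather than from any published Cognigy.AI limit.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass
class PlainTrainingConfig:
    """Stdlib-only mirror of TrainingConfig: same fields, bounds, and payload shape."""
    dataset_ids: List[str]
    validation_split: float = 0.2
    epochs: int = 50
    batch_size: int = 32
    learning_rate: float = 0.001
    dropout: float = 0.3
    early_stopping_patience: int = 5

    def __post_init__(self) -> None:
        # (value, low, high, low_inclusive), copied from the Field(...) bounds
        checks = {
            "validation_split": (self.validation_split, 0.1, 0.5, True),
            "epochs": (self.epochs, 10, 500, True),
            "batch_size": (self.batch_size, 8, 128, True),
            "learning_rate": (self.learning_rate, 0.0, 0.1, False),  # gt=0.0
            "dropout": (self.dropout, 0.0, 0.8, True),
            "early_stopping_patience": (self.early_stopping_patience, 1, 20, True),
        }
        for name, (value, low, high, low_inclusive) in checks.items():
            above_low = value >= low if low_inclusive else value > low
            if not (above_low and value <= high):
                raise ValueError(f"{name}={value} violates the bounds declared in TrainingConfig")

    def to_payload(self) -> dict:
        return {
            "datasetIds": self.dataset_ids,
            "validationSplit": self.validation_split,
            "hyperparameters": {
                "epochs": self.epochs,
                "batchSize": self.batch_size,
                "learningRate": self.learning_rate,
                "dropout": self.dropout,
                "earlyStoppingPatience": self.early_stopping_patience,
            },
        }


print(json.dumps(PlainTrainingConfig(dataset_ids=["ds_intent_001"]).to_payload(), indent=2))
```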

Step 2: Validate Inputs Against Class Imbalance and Annotation Quality Thresholds

Training quality degrades when intent distributions are heavily skewed or when annotation quality falls below acceptable limits, so query dataset statistics before submitting the training job. This step computes the imbalance ratio (the largest intent count divided by the smallest) and checks each dataset's average annotation score.

import logging

logger = logging.getLogger(__name__)

class DatasetValidator:
    def __init__(self, auth: CognigyAuth):
        self.auth = auth

    def fetch_dataset_stats(self, dataset_id: str) -> dict:
        url = f"{self.auth.base_url}/datasets/{dataset_id}/stats"
        response = requests.get(url, headers=self.auth.headers(), timeout=15)
        response.raise_for_status()
        return response.json()

    def validate_datasets(self, dataset_ids: List[str], max_imbalance_ratio: float = 5.0, min_annotation_score: float = 0.85) -> bool:
        for ds_id in dataset_ids:
            stats = self.fetch_dataset_stats(ds_id)
            intent_counts = stats.get("intentDistribution", {})
            
            if not intent_counts:
                raise ValueError(f"Dataset {ds_id} contains no annotated intents.")
                
            counts = list(intent_counts.values())
            imbalance_ratio = max(counts) / min(counts) if min(counts) > 0 else float('inf')
            
            if imbalance_ratio > max_imbalance_ratio:
                logger.warning(f"Dataset {ds_id} class imbalance ratio {imbalance_ratio:.2f} exceeds threshold {max_imbalance_ratio}.")
                return False
                
            avg_score = stats.get("averageAnnotationScore", 0.0)
            if avg_score < min_annotation_score:
                logger.warning(f"Dataset {ds_id} annotation quality {avg_score:.2f} falls below threshold {min_annotation_score}.")
                return False
                
        return True
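The threshold logic is easier to unit-test when it is separated from the HTTP call. Below is a sketch of a pure function over one stats payload that collects every violation instead of stopping at the first. `dataset_issues` is a hypothetical helper, and `intentDistribution` / `averageAnnotationScore` are the field names this tutorial assumes the stats endpoint returns.

```python
from typing import Dict, List


def dataset_issues(stats: dict, max_imbalance_ratio: float = 5.0,
                   min_annotation_score: float = 0.85) -> List[str]:
    """Return human-readable threshold violations for one dataset's stats payload."""
    counts: Dict[str, int] = stats.get("intentDistribution", {})
    if not counts:
        return ["no annotated intents"]
    issues: List[str] = []
    smallest = min(counts.values())
    ratio = max(counts.values()) / smallest if smallest > 0 else float("inf")
    if ratio > max_imbalance_ratio:
        # Name the rarest intent so annotators know where to add examples
        rarest = min(counts, key=counts.get)
        issues.append(f"imbalance ratio {ratio:.2f} > {max_imbalance_ratio} (rarest intent: {rarest})")
    score = stats.get("averageAnnotationScore", 0.0)
    if score < min_annotation_score:
        issues.append(f"annotation score {score:.2f} < {min_annotation_score}")
    return issues


sample = {"intentDistribution": {"greeting": 120, "billing": 40, "cancel": 30},
          "averageAnnotationScore": 0.91}
print(dataset_issues(sample))  # [] because 120 / 30 = 4.0 is within the 5.0 limit
```

Returning a list rather than a boolean lets the pipeline report all problems in one run, which saves a round of fix-and-resubmit.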

Step 3: Trigger Asynchronous Training and Poll Job Status

Training jobs run asynchronously on Cognigy.AI infrastructure. POST the payload, capture the returned trainingId, and poll the status endpoint until the job reaches a terminal state (COMPLETED, FAILED, or CANCELLED). This implementation backs off exponentially on 429 rate-limit responses and logs the resource utilization metrics the status endpoint reports.

import time

class TrainingOrchestrator:
    def __init__(self, auth: CognigyAuth, model_id: str):
        self.auth = auth
        self.model_id = model_id
        self.training_id: Optional[str] = None

    def start_training(self, config: TrainingConfig) -> str:
        url = f"{self.auth.base_url}/models/{self.model_id}/train"
        payload = config.to_payload()
        
        response = requests.post(url, json=payload, headers=self.auth.headers(), timeout=30)
        response.raise_for_status()
        data = response.json()
        self.training_id = data["trainingId"]
        logger.info(f"Training job initiated: {self.training_id}")
        return self.training_id

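    def poll_status(self, polling_interval: int = 15, max_backoff: int = 30,
                    max_rate_limit_retries: int = 10, max_wait_seconds: float = 4 * 3600) -> dict:
        if not self.training_id:
            raise RuntimeError("No active training job. Call start_training first.")

        url = f"{self.auth.base_url}/models/{self.model_id}/training/{self.training_id}/status"
        backoff = 1
        rate_limit_hits = 0
        # Give up eventually instead of polling forever; tune to your typical job duration
        deadline = time.monotonic() + max_wait_seconds

        while time.monotonic() < deadline:
            try:
                # headers() is rebuilt on every call so the token refreshes during long jobs
                response = requests.get(url, headers=self.auth.headers(), timeout=15)
            except requests.exceptions.RequestException as e:
                logger.error(f"Polling error: {e}")
                raise

            if response.status_code == 429:
                rate_limit_hits += 1
                if rate_limit_hits > max_rate_limit_retries:
                    raise RuntimeError("Rate-limit retry budget exhausted while polling.")
                # Honor a numeric Retry-After header; otherwise back off exponentially up to max_backoff
                retry_after = response.headers.get("Retry-After", "")
                wait = int(retry_after) if retry_after.isdigit() else backoff
                logger.warning(f"Rate limited. Retrying in {wait}s.")
                time.sleep(wait)
                backoff = min(backoff * 2, max_backoff)
                continue

            response.raise_for_status()
            backoff, rate_limit_hits = 1, 0  # reset after a successful response
            data = response.json()

            status = data.get("status")
            progress = data.get("progress", 0)
            resources = data.get("resourceUtilization", {})

            logger.info(f"Training status: {status} | Progress: {progress}% | CPU: {resources.get('cpu', 'N/A')}% | Memory: {resources.get('memory', 'N/A')}%")

            if status in ("COMPLETED", "FAILED", "CANCELLED"):
                return data

            time.sleep(polling_interval)

        raise TimeoutError(f"Training {self.training_id} did not finish within {max_wait_seconds} seconds.")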
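To test polling and backoff behavior without a live tenant, the loop can be separated from HTTP. This sketch takes an injected `fetch` callable returning `(status_code, body, retry_after)` and an injected `sleep`; `poll_until_terminal`, `fetch`, and `sleep` are hypothetical names for illustration. Only numeric Retry-After values are treated as seconds; an HTTP-date value falls back to the backoff value.

```python
from typing import Callable, Optional, Tuple

TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED"}

# (HTTP status code, parsed JSON body, raw Retry-After header or None)
StatusResult = Tuple[int, dict, Optional[str]]


def poll_until_terminal(fetch: Callable[[], StatusResult],
                        sleep: Callable[[float], None],
                        polling_interval: float = 15,
                        max_backoff: float = 30,
                        max_attempts: int = 100) -> dict:
    """Poll until the job reaches a terminal state, backing off on HTTP 429."""
    backoff = 1.0
    for _ in range(max_attempts):
        status_code, body, retry_after = fetch()
        if status_code == 429:
            # Numeric Retry-After wins; otherwise use the current backoff value
            wait = float(retry_after) if retry_after and retry_after.isdigit() else backoff
            sleep(wait)
            backoff = min(backoff * 2, max_backoff)
            continue
        if status_code >= 400:
            raise RuntimeError(f"Status endpoint returned HTTP {status_code}")
        backoff = 1.0  # reset once the server accepts requests again
        if body.get("status") in TERMINAL_STATES:
            return body
        sleep(polling_interval)
    raise TimeoutError("Job did not reach a terminal state within max_attempts polls")
```

In production, `fetch` would wrap `requests.get(...)` and `sleep` would be `time.sleep`; in tests, a list's `append` method records the waits.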

Step 4: Evaluate Metrics and Promote the Optimal Model Version

After training completes, retrieve the evaluation metrics to determine whether the new version meets production standards. This step checks the F1 score and precision against configurable thresholds and logs recall for reference. If both metrics pass, the pipeline promotes the model version to production.

class ModelEvaluator:
    def __init__(self, auth: CognigyAuth, model_id: str):
        self.auth = auth
        self.model_id = model_id

    def fetch_metrics(self, training_id: str) -> dict:
        url = f"{self.auth.base_url}/models/{self.model_id}/training/{training_id}/metrics"
        response = requests.get(url, headers=self.auth.headers(), timeout=15)
        response.raise_for_status()
        return response.json()

    def evaluate_and_promote(self, training_id: str, min_f1: float = 0.85, min_precision: float = 0.80) -> bool:
        metrics = self.fetch_metrics(training_id)
        f1_score = metrics.get("f1Score", 0.0)
        precision = metrics.get("precision", 0.0)
        recall = metrics.get("recall", 0.0)
        
        logger.info(f"Metrics - F1: {f1_score:.3f}, Precision: {precision:.3f}, Recall: {recall:.3f}")
        
        if f1_score < min_f1 or precision < min_precision:
            logger.warning(f"Model failed evaluation thresholds. F1: {f1_score}, Precision: {precision}")
            return False
            
        version_id = metrics.get("versionId")
        if not version_id:
            raise ValueError("No version ID returned in metrics payload.")
            
        promote_url = f"{self.auth.base_url}/models/{self.model_id}/versions/{version_id}/promote"
        response = requests.post(promote_url, headers=self.auth.headers(), timeout=15)
        response.raise_for_status()
        
        logger.info(f"Version {version_id} successfully promoted to production.")
        return True
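The step heading speaks of promoting the optimal version, while evaluate_and_promote judges a single training run. If you train several candidates (for example, with different hyperparameters), a hypothetical helper like the one below could choose which versionId to pass to the promote endpoint. The metric field names follow this tutorial's metrics payload, and the recall tie-break is a design choice of this sketch, not Cognigy.AI behavior.

```python
from typing import Iterable, Optional


def select_version_to_promote(candidates: Iterable[dict], min_f1: float = 0.85,
                              min_precision: float = 0.80) -> Optional[str]:
    """Return the versionId with the highest F1 among candidates that clear both thresholds."""
    passing = [
        m for m in candidates
        if m.get("versionId")
        and m.get("f1Score", 0.0) >= min_f1
        and m.get("precision", 0.0) >= min_precision
    ]
    if not passing:
        return None
    # Highest F1 wins; recall breaks ties between equally good versions
    best = max(passing, key=lambda m: (m.get("f1Score", 0.0), m.get("recall", 0.0)))
    return best["versionId"]


runs = [
    {"versionId": "v1", "f1Score": 0.88, "precision": 0.90, "recall": 0.86},
    {"versionId": "v2", "f1Score": 0.91, "precision": 0.78, "recall": 0.95},
    {"versionId": "v3", "f1Score": 0.90, "precision": 0.85, "recall": 0.92},
]
print(select_version_to_promote(runs))  # v3: v2 has the best F1 but fails the precision floor
```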

Step 5: Sync Artifacts via Webhook and Generate Governance Audit Logs

Continuous NLU improvement requires external version control and compliance tracking. This step posts a training-completed event (model, training, and version IDs plus the evaluation metrics) to a webhook endpoint for model registry synchronization, then appends a structured JSON audit record containing the duration, convergence history, and promotion decision.

import json
import uuid
from datetime import datetime, timezone

class AuditAndSync:
    def __init__(self, auth: CognigyAuth, webhook_url: str):
        self.auth = auth
        self.webhook_url = webhook_url

    def sync_to_registry(self, training_id: str, model_id: str, version_id: str, metrics: dict) -> None:
        payload = {
            "event": "model_training_completed",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "modelId": model_id,
            "trainingId": training_id,
            "versionId": version_id,
            "metrics": metrics,
            "source": "cognigy_ai_automation"
        }
        
        response = requests.post(
            self.webhook_url,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=15
        )
        response.raise_for_status()
        logger.info(f"Artifact synced to external registry: {version_id}")

    def write_audit_log(self, model_id: str, training_id: str, start_time: float, end_time: float, status: str, metrics: dict, promoted: bool) -> None:
        duration_seconds = end_time - start_time
        log_entry = {
            "auditId": str(uuid.uuid4()),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "modelId": model_id,
            "trainingId": training_id,
            "status": status,
            "durationSeconds": round(duration_seconds, 2),
            "metrics": metrics,
            "promotedToProduction": promoted,
            "convergenceData": metrics.get("convergenceHistory", []),
            "governanceTags": ["automated", "api_triggered", "threshold_validated"]
        }
        
        log_file = "cognigy_training_audit.log"
        with open(log_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(log_entry, default=str) + "\n")
            
        logger.info(f"Audit log written to {log_file} for training {training_id}")
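Because each audit record is written as one JSON object per line, the file can be read back with nothing more than the json module. A minimal sketch for a periodic governance summary; `summarize_audit_log` is a hypothetical helper that relies only on the `status` and `promotedToProduction` fields written above.

```python
import json
from typing import Dict


def summarize_audit_log(path: str = "cognigy_training_audit.log") -> Dict[str, int]:
    """Count runs, promotions, and non-COMPLETED outcomes in the JSON Lines audit file."""
    summary = {"runs": 0, "promoted": 0, "not_completed": 0}
    with open(path, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines
            entry = json.loads(line)
            summary["runs"] += 1
            if entry.get("promotedToProduction"):
                summary["promoted"] += 1
            if entry.get("status") != "COMPLETED":
                summary["not_completed"] += 1
    return summary
```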

Complete Working Example

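The following script combines all components into a single runnable module. Before running it, set COGNIGY_TENANT, COGNIGY_CLIENT_ID, COGNIGY_CLIENT_SECRET, and COGNIGY_MODEL_ID. WEBHOOK_URL and DATASET_IDS (a comma-separated list) are optional and fall back to placeholder values.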

import os
import uuid
import logging
import time
import requests
from typing import List, Optional
from pydantic import BaseModel, Field
from datetime import datetime, timezone
import json

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

class CognigyAuth:
    def __init__(self, tenant: str, client_id: str, client_secret: str):
        self.tenant = tenant
        self.client_id = client_id
        self.client_secret = client_secret
        self.token: Optional[str] = None
        self.token_expiry: float = 0.0
        self.base_url = f"https://{tenant}.cognigy.ai/api/v1"

    def get_token(self) -> str:
        if self.token and time.time() < self.token_expiry - 300:
            return self.token
        url = f"{self.base_url}/auth/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "model:read model:write training:read training:write"
        }
        response = requests.post(url, data=payload, timeout=15)
        response.raise_for_status()
        data = response.json()
        self.token = data["access_token"]
        self.token_expiry = time.time() + data["expires_in"]
        return self.token

    def headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

class TrainingConfig(BaseModel):
    dataset_ids: List[str]
    validation_split: float = Field(0.2, ge=0.1, le=0.5)
    epochs: int = Field(50, ge=10, le=500)
    batch_size: int = Field(32, ge=8, le=128)
    learning_rate: float = Field(0.001, gt=0.0, le=0.1)
    dropout: float = Field(0.3, ge=0.0, le=0.8)
    early_stopping_patience: int = Field(5, ge=1, le=20)

    def to_payload(self) -> dict:
        return {
            "datasetIds": self.dataset_ids,
            "validationSplit": self.validation_split,
            "hyperparameters": {
                "epochs": self.epochs,
                "batchSize": self.batch_size,
                "learningRate": self.learning_rate,
                "dropout": self.dropout,
                "earlyStoppingPatience": self.early_stopping_patience
            }
        }

class CognigyNLUTrainer:
    def __init__(self, tenant: str, client_id: str, client_secret: str, model_id: str, webhook_url: str):
        self.auth = CognigyAuth(tenant, client_id, client_secret)
        self.model_id = model_id
        self.webhook_url = webhook_url
        self.training_id: Optional[str] = None

    def validate_datasets(self, dataset_ids: List[str]) -> bool:
        for ds_id in dataset_ids:
            url = f"{self.auth.base_url}/datasets/{ds_id}/stats"
            response = requests.get(url, headers=self.auth.headers(), timeout=15)
            response.raise_for_status()
            stats = response.json()
            intent_counts = stats.get("intentDistribution", {})
            if not intent_counts:
                raise ValueError(f"Dataset {ds_id} contains no annotated intents.")
            counts = list(intent_counts.values())
            imbalance_ratio = max(counts) / min(counts) if min(counts) > 0 else float('inf')
            if imbalance_ratio > 5.0:
                logger.warning(f"Dataset {ds_id} imbalance ratio {imbalance_ratio:.2f} exceeds threshold.")
                return False
            avg_score = stats.get("averageAnnotationScore", 0.0)
            if avg_score < 0.85:
                logger.warning(f"Dataset {ds_id} annotation quality {avg_score:.2f} below threshold.")
                return False
        return True

    def run_training_pipeline(self, config: TrainingConfig) -> dict:
        if not self.validate_datasets(config.dataset_ids):
            raise ValueError("Dataset validation failed. Aborting training.")

        start_time = time.time()
        logger.info("Triggering training job...")
        train_url = f"{self.auth.base_url}/models/{self.model_id}/train"
        response = requests.post(train_url, json=config.to_payload(), headers=self.auth.headers(), timeout=30)
        response.raise_for_status()
        self.training_id = response.json()["trainingId"]
        logger.info(f"Training initiated: {self.training_id}")

        status_data = self._poll_training_status()
        end_time = time.time()
        status = status_data.get("status")

        if status != "COMPLETED":
            logger.error(f"Training ended with status: {status}")
            self._write_audit_log(start_time, end_time, status, {}, False)
            return {"status": status, "trainingId": self.training_id}

        metrics = self._fetch_metrics()
        promoted = self._evaluate_and_promote(metrics)
        self._sync_to_registry(metrics)
        self._write_audit_log(start_time, end_time, status, metrics, promoted)

        return {
            "status": status,
            "trainingId": self.training_id,
            "metrics": metrics,
            "promoted": promoted,
            "durationSeconds": round(end_time - start_time, 2)
        }

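    def _poll_training_status(self, max_wait_seconds: float = 4 * 3600, max_rate_limit_retries: int = 10) -> dict:
        url = f"{self.auth.base_url}/models/{self.model_id}/training/{self.training_id}/status"
        backoff = 1
        rate_limit_hits = 0
        # Stop polling after max_wait_seconds; adjust to your typical training duration
        deadline = time.monotonic() + max_wait_seconds
        while time.monotonic() < deadline:
            response = requests.get(url, headers=self.auth.headers(), timeout=15)
            if response.status_code == 429:
                rate_limit_hits += 1
                if rate_limit_hits > max_rate_limit_retries:
                    raise RuntimeError("Rate-limit retry budget exhausted while polling.")
                # Honor a numeric Retry-After header; otherwise use exponential backoff capped at 30s
                retry_after = response.headers.get("Retry-After", "")
                wait = int(retry_after) if retry_after.isdigit() else backoff
                logger.warning(f"Rate limited. Retrying in {wait}s.")
                time.sleep(wait)
                backoff = min(backoff * 2, 30)
                continue
            response.raise_for_status()
            backoff, rate_limit_hits = 1, 0  # reset after a successful response
            data = response.json()
            logger.info(f"Progress: {data.get('progress', 0)}% | Status: {data.get('status')}")
            if data.get("status") in ("COMPLETED", "FAILED", "CANCELLED"):
                return data
            time.sleep(15)
        raise TimeoutError(f"Training {self.training_id} did not finish within {max_wait_seconds} seconds.")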

    def _fetch_metrics(self) -> dict:
        url = f"{self.auth.base_url}/models/{self.model_id}/training/{self.training_id}/metrics"
        response = requests.get(url, headers=self.auth.headers(), timeout=15)
        response.raise_for_status()
        return response.json()

    def _evaluate_and_promote(self, metrics: dict) -> bool:
        f1 = metrics.get("f1Score", 0.0)
        precision = metrics.get("precision", 0.0)
        if f1 < 0.85 or precision < 0.80:
            logger.warning(f"Metrics failed thresholds. F1: {f1}, Precision: {precision}")
            return False
        version_id = metrics.get("versionId")
        if not version_id:
            raise ValueError("Missing versionId in metrics.")
        promote_url = f"{self.auth.base_url}/models/{self.model_id}/versions/{version_id}/promote"
        response = requests.post(promote_url, headers=self.auth.headers(), timeout=15)
        response.raise_for_status()
        logger.info(f"Version {version_id} promoted to production.")
        return True

    def _sync_to_registry(self, metrics: dict) -> None:
        payload = {
            "event": "model_training_completed",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "modelId": self.model_id,
            "trainingId": self.training_id,
            "versionId": metrics.get("versionId"),
            "metrics": metrics,
            "source": "cognigy_ai_automation"
        }
        response = requests.post(self.webhook_url, json=payload, headers={"Content-Type": "application/json"}, timeout=15)
        response.raise_for_status()
        logger.info("Artifact synced to external registry.")

    def _write_audit_log(self, start_time: float, end_time: float, status: str, metrics: dict, promoted: bool) -> None:
        log_entry = {
            "auditId": str(uuid.uuid4()),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "modelId": self.model_id,
            "trainingId": self.training_id,
            "status": status,
            "durationSeconds": round(end_time - start_time, 2),
            "metrics": metrics,
            "promotedToProduction": promoted,
            "convergenceData": metrics.get("convergenceHistory", []),
            "governanceTags": ["automated", "api_triggered", "threshold_validated"]
        }
        with open("cognigy_training_audit.log", "a", encoding="utf-8") as f:
            f.write(json.dumps(log_entry, default=str) + "\n")
        logger.info("Audit log written.")

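if __name__ == "__main__":
    # Fail fast instead of building URLs such as https://None.cognigy.ai
    required_vars = ["COGNIGY_TENANT", "COGNIGY_CLIENT_ID", "COGNIGY_CLIENT_SECRET", "COGNIGY_MODEL_ID"]
    missing = [name for name in required_vars if not os.getenv(name)]
    if missing:
        raise SystemExit(f"Missing required environment variables: {', '.join(missing)}")

    tenant = os.environ["COGNIGY_TENANT"]
    client_id = os.environ["COGNIGY_CLIENT_ID"]
    client_secret = os.environ["COGNIGY_CLIENT_SECRET"]
    model_id = os.environ["COGNIGY_MODEL_ID"]
    webhook_url = os.getenv("WEBHOOK_URL", "https://hooks.example.com/cognigy-sync")
    # Comma-separated list; surrounding whitespace and empty entries are ignored
    dataset_ids = [d.strip() for d in os.getenv("DATASET_IDS", "ds_intent_001,ds_intent_002").split(",") if d.strip()]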

    config = TrainingConfig(
        dataset_ids=dataset_ids,
        validation_split=0.2,
        epochs=50,
        batch_size=32,
        learning_rate=0.001,
        dropout=0.3
    )

    trainer = CognigyNLUTrainer(tenant, client_id, client_secret, model_id, webhook_url)
    result = trainer.run_training_pipeline(config)
    logger.info(f"Pipeline complete: {result}")

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The OAuth2 token expired during polling or the client credentials lack required scopes.
  • Fix: Ensure the token refresh logic checks expiration before each request. Verify the scope parameter includes training:read and training:write.
  • Code Fix: The CognigyAuth.get_token() method automatically refreshes when time.time() >= self.token_expiry - 300. Add explicit scope validation if your tenant enforces strict role binding.
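One way to add the scope validation mentioned above is to compare the token response against the scopes the pipeline needs, right after get_token() parses it. This is a sketch: `missing_scopes` is a hypothetical helper, and whether your tenant echoes a `scope` field is an assumption to verify. The fallback follows RFC 6749 section 5.1, which lets a server omit `scope` when it granted exactly what was requested.

```python
from typing import Iterable, Set

REQUIRED_SCOPES = {"model:read", "model:write", "training:read", "training:write"}


def missing_scopes(token_response: dict, requested: str,
                   required: Iterable[str] = REQUIRED_SCOPES) -> Set[str]:
    """Return required scopes that the token response did not grant.

    If "scope" is absent, assume the requested scopes were granted unchanged.
    """
    granted = set(token_response.get("scope", requested).split())
    return set(required) - granted
```

Raising on a non-empty result inside get_token() would surface a misconfigured client before the first training request fails.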

Error: 400 Bad Request (Validation Failure)

  • Cause: Dataset IDs do not exist, validation split exceeds bounds, or hyperparameters fall outside allowed ranges.
  • Fix: Validate dataset_ids against your tenant inventory. Ensure validation_split stays between 0.1 and 0.5. Verify hyperparameter constraints match Cognigy.AI limits.
  • Code Fix: The TrainingConfig Pydantic model enforces bounds. Catch pydantic.ValidationError and log the exact field violation before calling the API.
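A sketch of catching `pydantic.ValidationError` and logging each field violation before any request is sent. The model here is a trimmed copy of the tutorial's TrainingConfig with the same bounds, and `build_config` is a hypothetical helper; `ValidationError.errors()` returns a list of dicts with `loc` and `msg` keys in both pydantic v1 and v2.

```python
import logging
from typing import List, Optional

from pydantic import BaseModel, Field, ValidationError

logger = logging.getLogger(__name__)


class TrainingConfig(BaseModel):
    # Trimmed copy of the tutorial's TrainingConfig; same bounds as Step 1
    dataset_ids: List[str]
    validation_split: float = Field(0.2, ge=0.1, le=0.5)
    epochs: int = Field(50, ge=10, le=500)


def build_config(**kwargs) -> Optional[TrainingConfig]:
    """Return a validated config, or None after logging every field violation."""
    try:
        return TrainingConfig(**kwargs)
    except ValidationError as exc:
        for err in exc.errors():
            field = ".".join(str(part) for part in err["loc"])
            logger.error("Invalid training config field %s: %s", field, err["msg"])
        return None
```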

Error: 429 Too Many Requests

  • Cause: Polling frequency exceeds tenant rate limits or concurrent training jobs saturate the quota.
  • Fix: Implement exponential backoff with a maximum cap. Respect the Retry-After header when present.
  • Code Fix: The _poll_training_status method includes backoff logic that doubles the wait time up to 30 seconds. Never poll faster than every 10 seconds for status endpoints.

Error: 500 Internal Server Error (Training Engine Failure)

  • Cause: Backend training infrastructure failure or corrupted dataset payload.
  • Fix: Check the status endpoint for detailed error messages. Verify dataset integrity by re-exporting and re-importing annotations. Retry with a reduced batch size.
  • Code Fix: Capture the full response body on 5xx errors. Log the payload and dataset IDs for support tickets. Implement a retry wrapper for the initial POST if the error is transient.
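A retry wrapper for the initial POST might look like the sketch below. `send_with_retry` and `ResponseLike` are hypothetical names; `send` is any zero-argument callable returning an object with `status_code` and `text`, which a `requests.Response` provides. The wrapper retries only on 5xx, doubles the delay each time, and returns the last response so the caller can still call `raise_for_status()`.

```python
import logging
import time
from typing import Callable, Protocol

logger = logging.getLogger(__name__)


class ResponseLike(Protocol):
    status_code: int
    text: str


def send_with_retry(send: Callable[[], ResponseLike], attempts: int = 3,
                    base_delay: float = 2.0,
                    sleep: Callable[[float], None] = time.sleep) -> ResponseLike:
    """Call `send` until it returns a non-5xx response or attempts run out."""
    response = send()
    for attempt in range(1, attempts):
        if response.status_code < 500:
            break
        # Keep the body for support tickets; truncate to avoid flooding the log
        logger.error("Attempt %d/%d got HTTP %d: %s", attempt, attempts,
                     response.status_code, response.text[:500])
        sleep(base_delay * 2 ** (attempt - 1))
        response = send()
    return response
```

With requests, `send` could be `lambda: requests.post(train_url, json=payload, headers=auth.headers(), timeout=30)`. A 5xx on the training POST does not prove that no job was created, so it may be worth checking for a newly started training job before retrying to avoid launching a duplicate.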

Official References