Fine-Tuning NICE Cognigy.AI NLU Models via Training API with Python SDK

What You Will Build

This tutorial provides a complete Python module that constructs annotated NLU training datasets, validates them against token limits and schema rules, triggers asynchronous GPU-accelerated model retraining, monitors per-epoch training metrics, evaluates classification performance with precision-recall curves, syncs artifacts to S3, and generates governance audit logs. The code uses the Cognigy.AI Training API with httpx for transport, pydantic for validation, scikit-learn for evaluation, and boto3 for artifact synchronization. It targets Python 3.9 or later.

Prerequisites

  • OAuth2 client credentials configured in Cognigy.AI with scopes: nlu:write, training:execute, models:read, exports:write, datasets:upload
  • Cognigy.AI API v1 endpoint base URL
  • Python 3.9+ runtime
  • Dependencies: httpx>=0.24.0, pandas>=2.0.0, scikit-learn>=1.3.0, boto3>=1.28.0, numpy>=1.24.0, matplotlib>=3.7.0, pydantic>=2.0.0

Authentication Setup

Cognigy.AI supports the OAuth2 client credentials flow for service-to-service authentication. The following implementation caches the token behind a lock and fetches a new one 60 seconds before the cached token expires.

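import time
import threading
from typing import Optional

import httpx
from pydantic import BaseModel, Field

class OAuthToken(BaseModel):
    access_token: str
    token_type: str
    expires_in: int
    # default_factory stamps each token when it is created. A plain
    # `= time.time()` default is evaluated once, at class definition,
    # which would make every token look as old as the process.
    issued_at: float = Field(default_factory=time.time)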

class CognigyAuthClient:
    def __init__(self, client_id: str, client_secret: str, token_url: str, scopes: list[str]):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.scopes = scopes
        self._token: Optional[OAuthToken] = None
        self._lock = threading.Lock()

    def _fetch_token(self) -> OAuthToken:
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": " ".join(self.scopes)
        }
        response = httpx.post(self.token_url, data=payload)
        response.raise_for_status()
        data = response.json()
        return OAuthToken(
            access_token=data["access_token"],
            token_type=data["token_type"],
            expires_in=data["expires_in"]
        )

    def get_token(self) -> OAuthToken:
        with self._lock:
            if self._token is None or time.time() >= (self._token.issued_at + self._token.expires_in - 60):
                self._token = self._fetch_token()
            return self._token
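
The refresh condition inside get_token can also be written as a pure function, which makes the 60-second safety margin easy to unit test without a live token endpoint. This is a minimal sketch; the name token_is_stale and its skew parameter are illustrative and not part of any Cognigy.AI SDK.

```python
def token_is_stale(issued_at: float, expires_in: int, now: float, skew: float = 60.0) -> bool:
    """Return True when a cached token should be replaced.

    The token is treated as stale once `now` reaches the expiry time
    minus `skew` seconds, so requests in flight do not carry a token
    that expires before the server processes them.
    """
    return now >= issued_at + expires_in - skew


# A token issued at t=1000 with a one-hour lifetime goes stale at t=4540.
print(token_is_stale(1000.0, 3600, now=4539.0))
print(token_is_stale(1000.0, 3600, now=4540.0))
```

Passing the clock value in as an argument, rather than calling time.time() inside the function, is what keeps the check deterministic in tests.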

Implementation

Step 1: Construct and Validate Dataset Payloads

Training datasets require annotated intent utterances, entity extraction rules, and cross-validation fold assignments. Cognigy.AI rejects payloads exceeding token limits or containing unsupported language model configurations. The validation step below checks an estimated sequence length (a whitespace word count, which only approximates the model's tokenizer), the presence of the start, end, and type fields on each entity, and the language code before upload.

import pandas as pd
from pydantic import BaseModel, field_validator
from typing import List, Dict, Any

class TrainingSample(BaseModel):
    text: str
    intent: str
    entities: List[Dict[str, Any]]
    fold: int

    @field_validator("text")
    @classmethod
    def check_token_length(cls, v: str) -> str:
        estimated_tokens = len(v.split())
        if estimated_tokens > 512:
            raise ValueError("Text exceeds maximum token length of 512.")
        return v

    @field_validator("entities")
    @classmethod
    def validate_entity_schema(cls, v: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        for entity in v:
            if "start" not in entity or "end" not in entity or "type" not in entity:
                raise ValueError("Entity must contain start, end, and type fields.")
        return v

class TrainingDataset(BaseModel):
    samples: List[TrainingSample]
    language: str
    model_version: str

    @field_validator("language")
    @classmethod
    def check_language_compatibility(cls, v: str) -> str:
        supported = ["en-US", "de-DE", "fr-FR", "es-ES"]
        if v not in supported:
            raise ValueError(f"Language {v} is not in the compatibility matrix.")
        return v

def build_dataset_df(samples: List[TrainingSample]) -> pd.DataFrame:
    records = []
    for s in samples:
        records.append({
            "text": s.text,
            "intent": s.intent,
            "entities": s.entities,
            "fold": s.fold
        })
    return pd.DataFrame(records)
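
The models above do not check how samples are spread across folds. A minimal sketch of such a check follows, assuming folds are numbered from 0 and that a fold with no samples, or one more than twice the size of the smallest, should be rejected. The function name, the n_folds default, and the max_ratio rule are illustrative choices, not platform requirements.

```python
from collections import Counter
from typing import Dict, List


def check_fold_distribution(folds: List[int], n_folds: int = 5, max_ratio: float = 2.0) -> Dict[int, int]:
    """Validate fold assignments and return the sample count per fold."""
    counts = Counter(folds)

    # Fold indices must fall in [0, n_folds).
    out_of_range = sorted(f for f in counts if not 0 <= f < n_folds)
    if out_of_range:
        raise ValueError(f"Fold indices out of range: {out_of_range}")

    # Every fold needs at least one sample to serve as a validation split.
    missing = [f for f in range(n_folds) if counts[f] == 0]
    if missing:
        raise ValueError(f"Folds with no samples: {missing}")

    # Reject heavily skewed assignments (assumed rule, tune as needed).
    largest, smallest = max(counts.values()), min(counts.values())
    if largest > max_ratio * smallest:
        raise ValueError(f"Fold sizes are unbalanced: {dict(counts)}")

    return {f: counts[f] for f in range(n_folds)}
```

Called as check_fold_distribution([s.fold for s in samples]), this would reject the three-sample dataset in the complete example, which assigns folds 0 to 2 while the training job requests five cross-validation folds.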

Step 2: Upload Dataset and Trigger Asynchronous Training Job

The dataset is serialized to JSON and posted to the Cognigy.AI dataset upload endpoint. After validation, the training job is initiated with GPU resource allocation parameters and early stopping configuration. The API returns a job identifier for polling.

import httpx
import json
import time
from typing import Dict, Any

class CognigyNLUClient:
    BASE_URL = "https://api.cognigy.ai"
    MAX_RETRIES = 3
    RETRY_DELAY = 2

    def __init__(self, auth: CognigyAuthClient):
        self.auth = auth
        self.client = httpx.Client(base_url=self.BASE_URL, timeout=30.0)

    def _request(self, method: str, path: str, **kwargs) -> httpx.Response:
        extra_headers = kwargs.pop("headers", {})
        last_exception = None
        for attempt in range(self.MAX_RETRIES):
            # Rebuild headers on every attempt so a refreshed token is used on retry.
            headers = {
                **extra_headers,
                "Authorization": f"Bearer {self.auth.get_token().access_token}",
                "Content-Type": "application/json",
            }
            response = self.client.request(method, path, headers=headers, **kwargs)
            if response.status_code == 429 or response.status_code >= 500:
                # Record the failure for both cases; previously a run of 429s
                # left last_exception as None and `raise None` failed with TypeError.
                last_exception = httpx.HTTPStatusError(
                    f"Retryable error {response.status_code}",
                    request=response.request,
                    response=response,
                )
                retry_after = response.headers.get("Retry-After", "")
                if response.status_code == 429 and retry_after.isdigit():
                    delay = int(retry_after)
                else:
                    delay = self.RETRY_DELAY * (attempt + 1)
                time.sleep(delay)
                continue
            return response
        raise last_exception

    def upload_dataset(self, dataset: TrainingDataset, project_id: str) -> str:
        payload = {
            "projectId": project_id,
            "language": dataset.language,
            "modelVersion": dataset.model_version,
            "samples": dataset.model_dump()["samples"]
        }
        response = self._request("POST", "/api/v1/ai/nlu/datasets/upload", json=payload)
        response.raise_for_status()
        return response.json()["datasetId"]

    def start_training_job(self, dataset_id: str, project_id: str, gpu_tier: str = "A10G-1", early_stopping_patience: int = 3) -> Dict[str, Any]:
        payload = {
            "datasetId": dataset_id,
            "projectId": project_id,
            "config": {
                "gpuTier": gpu_tier,
                "earlyStopping": {
                    "patience": early_stopping_patience,
                    "metric": "val_loss"
                },
                "crossValidationFolds": 5
            }
        }
        response = self._request("POST", "/api/v1/ai/nlu/training/jobs", json=payload)
        response.raise_for_status()
        return response.json()

Step 3: Monitor Training, Track Loss, and Handle Early Stopping

Training jobs run asynchronously. The polling loop retrieves epoch duration, validation loss, and GPU utilization metrics. The loop respects early stopping signals from the platform and stops polling when the job transitions to a terminal state.

import time
from typing import List, Dict, Any

class TrainingMonitor:
    def __init__(self, client: CognigyNLUClient, job_id: str):
        self.client = client
        self.job_id = job_id
        self.epoch_metrics: List[Dict[str, Any]] = []

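    def poll_until_complete(self, interval: int = 10) -> List[Dict[str, Any]]:
        while True:
            response = self.client._request("GET", f"/api/v1/ai/nlu/training/jobs/{self.job_id}/status")
            response.raise_for_status()
            status_data = response.json()

            # Record metrics before checking the state so the final epoch is kept,
            # and skip snapshots identical to the last one (polls can outpace epochs).
            metrics = status_data.get("currentEpochMetrics", {})
            if metrics and (not self.epoch_metrics or metrics != self.epoch_metrics[-1]):
                self.epoch_metrics.append(metrics)

            state = status_data["state"]
            if state in ("FAILED", "CANCELLED"):
                # Surface unsuccessful jobs instead of passing them on to evaluation.
                raise RuntimeError(f"Training job {self.job_id} ended in state {state}.")
            if state in ("COMPLETED", "EARLY_STOPPED"):
                break

            time.sleep(interval)
        return self.epoch_metrics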
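
To make the early-stopping patience setting concrete, the sketch below applies the conventional definition (stop once the validation loss has failed to improve for `patience` consecutive epochs) to a list of recorded losses. It illustrates the concept only; how the platform itself decides to stop is not specified in this tutorial, and the function names and min_delta parameter are illustrative.

```python
from typing import List


def epochs_without_improvement(val_losses: List[float], min_delta: float = 0.0) -> int:
    """Count the epochs since the best validation loss last improved."""
    best = float("inf")
    stale = 0
    for loss in val_losses:
        if loss < best - min_delta:
            best = loss   # new best: reset the counter
            stale = 0
        else:
            stale += 1    # no improvement this epoch
    return stale


def would_stop_early(val_losses: List[float], patience: int = 3) -> bool:
    """Return True when the loss has stalled for at least `patience` epochs."""
    return epochs_without_improvement(val_losses) >= patience


print(would_stop_early([0.9, 0.7, 0.6, 0.61, 0.62, 0.60], patience=3))
```

With the monitor above, the input could be built as [m["valLoss"] for m in epoch_metrics if "valLoss" in m], assuming the status payload reports the loss under the valLoss key that the audit log in Step 5 reads.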

Step 4: Evaluate Model and Generate Precision-Recall Curves

After training completes, the evaluation endpoint returns prediction probabilities and ground truth labels. The evaluation logic computes a confusion matrix, generates a precision-recall curve, and selects the confidence threshold whose precision is closest to a target value (0.95 by default) in order to limit false positives.

from typing import Any, Dict

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, precision_recall_curve, auc

class NLUEvaluator:
    @staticmethod
    def fetch_evaluation_results(client: CognigyNLUClient, job_id: str) -> pd.DataFrame:
        response = client._request("GET", f"/api/v1/ai/nlu/training/jobs/{job_id}/evaluation")
        response.raise_for_status()
        data = response.json()["results"]
        return pd.DataFrame(data)

    @staticmethod
    def optimize_threshold(df: pd.DataFrame, target_precision: float = 0.95) -> tuple[float, np.ndarray, np.ndarray, np.ndarray]:
        # A prediction counts as positive when the top-scoring intent is the correct one.
        y_true = (df["predicted_intent"] == df["actual_intent"]).astype(int)
        y_scores = df["max_probability"].values

        precision, recall, thresholds = precision_recall_curve(y_true, y_scores)

        # precision has one more element than thresholds (the final point has no
        # threshold), so fall back to 0.5 if the closest match is that last point.
        optimal_idx = int(np.argmin(np.abs(precision - target_precision)))
        optimal_threshold = float(thresholds[optimal_idx]) if optimal_idx < len(thresholds) else 0.5

        return optimal_threshold, precision, recall, thresholds

    @staticmethod
    def generate_metrics(df: pd.DataFrame) -> Dict[str, Any]:
        y_true = df["actual_intent"]
        y_pred = df["predicted_intent"]
        cm = confusion_matrix(y_true, y_pred, labels=df["actual_intent"].unique())
        
        threshold, precision, recall, thresholds = NLUEvaluator.optimize_threshold(df)
        
        plt.figure(figsize=(6, 4))
        plt.plot(recall, precision, label=f"PR Curve (AUC = {auc(recall, precision):.2f})")
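        # Mark the operating point. The threshold is a score cutoff, not a precision
        # value, so drawing it as a horizontal line on this plot would be misleading.
        idx = int(np.argmin(np.abs(thresholds - threshold)))
        plt.scatter(recall[idx], precision[idx], color="r", label=f"Threshold = {threshold:.2f}")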
        plt.xlabel("Recall")
        plt.ylabel("Precision")
        plt.title("Intent Classification Precision-Recall Curve")
        plt.legend()
        plt.tight_layout()
        plt.savefig("pr_curve.png")
        plt.close()
        
        return {
            "confusion_matrix": cm.tolist(),
            "optimal_threshold": threshold,
            "pr_auc": auc(recall, precision)
        }
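
The argmin rule in optimize_threshold picks the cutoff whose precision is closest to the target, which can land slightly below it. An alternative, sketched here in plain Python under the same definition of a positive (the top prediction is correct), is to take the smallest cutoff whose precision meets the target, which keeps as many predictions as possible while satisfying the constraint. The function name is illustrative.

```python
from typing import List, Optional


def lowest_threshold_meeting_precision(
    correct: List[int],
    scores: List[float],
    target_precision: float = 0.95,
) -> Optional[float]:
    """Return the smallest score cutoff with precision >= target, or None.

    `correct[i]` is 1 when prediction i matched the true intent, and
    `scores[i]` is the confidence of that prediction.
    """
    for cutoff in sorted(set(scores)):
        # Predictions at or above the cutoff are accepted; the rest are rejected.
        accepted = [c for c, s in zip(correct, scores) if s >= cutoff]
        precision = sum(accepted) / len(accepted)  # never empty: cutoff is a score
        if precision >= target_precision:
            return cutoff
    return None


correct = [1, 0, 1, 1, 0, 1]
scores = [0.9, 0.4, 0.8, 0.7, 0.6, 0.95]
print(lowest_threshold_meeting_precision(correct, scores, target_precision=1.0))
```

The loop is quadratic in the number of predictions, which is fine for evaluation sets of a few thousand rows; for larger sets the arrays returned by precision_recall_curve can be scanned instead.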

Step 5: Sync Artifacts to S3 and Generate Audit Logs

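Evaluation metrics and audit logs are uploaded to an S3-compatible bucket; model artifacts and training logs can be pushed with the same upload_artifact helper. The audit log generator records the job and dataset identifiers, training duration, GPU allocation, final validation loss, the evaluation metrics (including the selected threshold), and a SHA-256 hash of those metrics for AI governance.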

import boto3
import hashlib
import json
from datetime import datetime, timezone
from typing import Any, Dict, List

class MLOpsSync:
    def __init__(self, s3_endpoint: str, bucket: str, region: str, aws_key: str, aws_secret: str):
        self.s3_client = boto3.client(
            "s3",
            endpoint_url=s3_endpoint,
            aws_access_key_id=aws_key,
            aws_secret_access_key=aws_secret,
            region_name=region
        )
        self.bucket = bucket

    def upload_artifact(self, key: str, data: bytes, content_type: str = "application/json") -> str:
        self.s3_client.put_object(Bucket=self.bucket, Key=key, Body=data, ContentType=content_type)
        return f"s3://{self.bucket}/{key}"

    def generate_audit_log(self, job_id: str, dataset_id: str, metrics: Dict[str, Any], epoch_metrics: List[Dict[str, Any]]) -> str:
        log = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "jobId": job_id,
            "datasetId": dataset_id,
            "trainingDurationSeconds": sum(e.get("durationSec", 0) for e in epoch_metrics),
            "gpuAllocation": epoch_metrics[0].get("gpuTier", "unknown") if epoch_metrics else "unknown",
            "finalValLoss": epoch_metrics[-1].get("valLoss", 0) if epoch_metrics else 0,
            "evaluationMetrics": metrics,
            "governanceHash": hashlib.sha256(json.dumps(metrics, sort_keys=True).encode()).hexdigest()
        }
        log_bytes = json.dumps(log, indent=2).encode()
        log_path = f"audit/nlu_training_{job_id}_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
        self.upload_artifact(log_path, log_bytes)
        return f"s3://{self.bucket}/{log_path}"
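
The audit log above hashes the evaluation metrics but not the training data. If a dataset hash is also wanted, one option is a content fingerprint that does not depend on sample order or dictionary key order. The sketch below assumes samples are plain JSON-serializable dicts; the function name and the suggested datasetHash key are illustrative.

```python
import hashlib
import json
from typing import Any, Dict, List


def dataset_fingerprint(samples: List[Dict[str, Any]]) -> str:
    """Return a SHA-256 hex digest that is stable under reordering."""
    # Serialize each sample with sorted keys, then sort the serialized rows
    # so the same set of samples always produces the same byte stream.
    canonical = sorted(
        json.dumps(sample, sort_keys=True, ensure_ascii=False) for sample in samples
    )
    return hashlib.sha256("\n".join(canonical).encode("utf-8")).hexdigest()
```

It could be called as dataset_fingerprint(dataset.model_dump()["samples"]) and stored in the log under a key such as datasetHash.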

Complete Working Example

The following script combines authentication, dataset construction, validation, training orchestration, evaluation, artifact synchronization, and audit logging into a single executable module. Replace the placeholder credentials and endpoints with your environment values.

# Assumes the classes from the previous sections are defined in, or imported into, this module.
import json

def run_fine_tuning_pipeline():
    # Configuration
    COGNIGY_BASE = "https://api.cognigy.ai"
    AUTH_URL = f"{COGNIGY_BASE}/oauth/token"
    S3_ENDPOINT = "https://s3.amazonaws.com"
    BUCKET_NAME = "mlops-nlu-artifacts"
    REGION = "us-east-1"
    AWS_KEY = "YOUR_AWS_ACCESS_KEY"
    AWS_SECRET = "YOUR_AWS_SECRET_KEY"
    CLIENT_ID = "YOUR_COGNIGY_CLIENT_ID"
    CLIENT_SECRET = "YOUR_COGNIGY_CLIENT_SECRET"
    PROJECT_ID = "YOUR_PROJECT_ID"

    # Initialize clients
    auth = CognigyAuthClient(CLIENT_ID, CLIENT_SECRET, AUTH_URL, scopes=["nlu:write", "training:execute", "models:read", "exports:write", "datasets:upload"])
    nlu_client = CognigyNLUClient(auth)
    s3_sync = MLOpsSync(S3_ENDPOINT, BUCKET_NAME, REGION, AWS_KEY, AWS_SECRET)

    # Step 1: Construct and validate dataset
    samples = [
        TrainingSample(text="I want to reset my password", intent="reset_password", entities=[{"start": 0, "end": 27, "type": "intent"}], fold=0),
        TrainingSample(text="Can you help me with billing", intent="billing_inquiry", entities=[{"start": 0, "end": 28, "type": "intent"}], fold=1),
        TrainingSample(text="My account is locked", intent="account_locked", entities=[{"start": 0, "end": 20, "type": "intent"}], fold=2),
    ]
    dataset = TrainingDataset(samples=samples, language="en-US", model_version="v2.1")
    
    print(f"Dataset validation passed. Uploading {len(samples)} samples.")
    dataset_id = nlu_client.upload_dataset(dataset, PROJECT_ID)
    print(f"Dataset uploaded: {dataset_id}")

    # Step 2: Start training job
    job_response = nlu_client.start_training_job(dataset_id, PROJECT_ID, gpu_tier="A10G-1", early_stopping_patience=3)
    job_id = job_response["jobId"]
    print(f"Training job started: {job_id}")

    # Step 3: Monitor training
    monitor = TrainingMonitor(nlu_client, job_id)
    print("Polling training status...")
    epoch_metrics = monitor.poll_until_complete(interval=10)
    print(f"Training completed. Processed {len(epoch_metrics)} epochs.")

    # Step 4: Evaluate model
    eval_df = NLUEvaluator.fetch_evaluation_results(nlu_client, job_id)
    metrics = NLUEvaluator.generate_metrics(eval_df)
    print(f"Evaluation complete. Optimal threshold: {metrics['optimal_threshold']:.3f}")

    # Step 5: Sync artifacts and audit
    metrics_path = s3_sync.upload_artifact(f"models/{job_id}_metrics.json", json.dumps(metrics).encode())
    audit_path = s3_sync.generate_audit_log(job_id, dataset_id, metrics, epoch_metrics)
    
    print(f"Metrics synced to: {metrics_path}")
    print(f"Audit log generated: {audit_path}")
    return metrics, audit_path

if __name__ == "__main__":
    run_fine_tuning_pipeline()
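
Rather than editing placeholder strings in the source, credentials can be read from the environment. The sketch below fails fast when a value is missing. AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are the variable names boto3 itself recognizes; the COGNIGY_* names are hypothetical and chosen only for this example.

```python
import os
from typing import Dict, Mapping

REQUIRED_VARS = (
    "COGNIGY_CLIENT_ID",       # hypothetical name
    "COGNIGY_CLIENT_SECRET",   # hypothetical name
    "COGNIGY_PROJECT_ID",      # hypothetical name
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
)


def load_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Collect required settings, raising if any is unset or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Inside run_fine_tuning_pipeline, the constants could then be assigned from the returned dict, for example CLIENT_ID = settings["COGNIGY_CLIENT_ID"].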

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: Expired OAuth token or incorrect client credentials.
  • Fix: Verify client_id and client_secret in the Cognigy.AI developer console. Ensure the token cache refreshes before expiration. The CognigyAuthClient implementation automatically refreshes tokens 60 seconds before expiry.
  • Code Fix: The _fetch_token method already handles token acquisition. Ensure the scopes list matches the registered client permissions.

Error: 403 Forbidden

  • Cause: Missing OAuth scopes or insufficient project-level permissions.
  • Fix: Add nlu:write, training:execute, and datasets:upload to the OAuth client configuration. Verify the service account has editor access to the target project.
  • Code Fix: Update the scopes list in CognigyAuthClient initialization.

Error: 400 Bad Request (Schema Validation)

  • Cause: Text exceeds 512 tokens, entity boundaries are malformed, or language is unsupported.
  • Fix: Validate inputs before serialization. The TrainingSample pydantic model enforces token limits and entity schema requirements.
  • Code Fix: Review the field_validator methods in TrainingSample. Adjust text truncation logic if upstream data sources produce longer sequences.
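
As a starting point for the truncation logic mentioned above, the sketch below cuts text to the same whitespace-based token estimate that TrainingSample uses. The function name is illustrative. Two caveats apply: runs of whitespace in truncated text are collapsed to single spaces, and any entity offsets that pointed past the cut become invalid and must be dropped or recomputed.

```python
def truncate_to_token_limit(text: str, max_tokens: int = 512) -> str:
    """Trim text to at most `max_tokens` whitespace-separated tokens."""
    tokens = text.split()
    if len(tokens) <= max_tokens:
        return text  # already within the limit; leave untouched
    return " ".join(tokens[:max_tokens])


long_text = " ".join(["word"] * 600)
print(len(truncate_to_token_limit(long_text).split()))
```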

Error: 429 Too Many Requests

  • Cause: Rate limiting on dataset upload or status polling endpoints.
  • Fix: Implement exponential backoff. The _request method already retries 429 responses using the Retry-After header or a fixed delay.
  • Code Fix: Increase RETRY_DELAY or adjust polling interval in poll_until_complete.
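
The _request method waits a fixed or linearly growing delay between retries. A minimal sketch of an exponential schedule with optional jitter follows; it honors a numeric Retry-After value when one is present and otherwise doubles the delay on each attempt up to a cap. The function name and the base and cap defaults are illustrative. Retry-After may also be sent as an HTTP date, which this sketch ignores in favor of the computed delay.

```python
import random
from typing import Optional


def retry_delay(
    attempt: int,
    retry_after: Optional[str] = None,
    base: float = 2.0,
    cap: float = 60.0,
    jitter: bool = True,
) -> float:
    """Return the number of seconds to wait before retry number `attempt` (0-based)."""
    # Prefer the server's hint when it is a plain number of seconds.
    if retry_after is not None and retry_after.strip().isdigit():
        return float(retry_after.strip())

    # Exponential growth: base, 2*base, 4*base, ... limited by cap.
    delay = min(cap, base * (2 ** attempt))

    # Full jitter spreads out clients that were throttled at the same moment.
    return random.uniform(0, delay) if jitter else delay
```

Inside the retry loop this would replace the sleep argument, for example time.sleep(retry_delay(attempt, response.headers.get("Retry-After"))).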

Error: 503 Service Unavailable (GPU Queue)

  • Cause: GPU compute resources are exhausted in the region.
  • Fix: Poll the job status endpoint for queue position. Switch to a lower-tier GPU or schedule jobs during off-peak hours.
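  • Code Fix: The _request method retries 5xx responses, including 503, with a linearly increasing delay, so the training monitor inherits that behavior on every status poll. Add a queue position logger if Cognigy.AI returns queuePosition in the status payload.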

Official References