Fine-Tuning NICE Cognigy.AI NLU Models via Training API with Python SDK
What You Will Build
This tutorial builds a Python module that constructs annotated NLU training datasets, validates them against token limits, triggers asynchronous GPU-accelerated retraining, monitors per-epoch metrics, evaluates classification performance with a precision-recall curve, uploads evaluation metrics to S3, and writes a governance audit log. The code calls the Cognigy.AI Training API over httpx, uses pydantic for validation, scikit-learn and matplotlib for evaluation, and boto3 for artifact upload. It targets Python 3.9+.
Prerequisites
- OAuth2 client credentials configured in Cognigy.AI with scopes: nlu:write, training:execute, models:read, exports:write, datasets:upload
- Cognigy.AI API v1 endpoint base URL
- Python 3.9+ runtime
- Dependencies: httpx>=0.24.0, pandas>=2.0.0, scikit-learn>=1.3.0, boto3>=1.28.0, numpy>=1.24.0, matplotlib>=3.7.0, pydantic>=2.0.0
Authentication Setup
Cognigy.AI supports the OAuth2 client credentials flow for service-to-service authentication. The following implementation caches the token behind a lock and fetches a new one 60 seconds before the cached token expires.
import threading
import time
from typing import Optional

import httpx
from pydantic import BaseModel, Field


class OAuthToken(BaseModel):
    access_token: str
    token_type: str
    expires_in: int
    # default_factory stamps each token when it is created. A bare
    # time.time() default would be evaluated once, at class definition.
    issued_at: float = Field(default_factory=time.time)


class CognigyAuthClient:
    def __init__(self, client_id: str, client_secret: str, token_url: str, scopes: list[str]):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.scopes = scopes
        self._token: Optional[OAuthToken] = None
        self._lock = threading.Lock()

    def _fetch_token(self) -> OAuthToken:
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": " ".join(self.scopes),
        }
        response = httpx.post(self.token_url, data=payload)
        response.raise_for_status()
        data = response.json()
        return OAuthToken(
            access_token=data["access_token"],
            token_type=data["token_type"],
            expires_in=data["expires_in"],
        )

    def get_token(self) -> OAuthToken:
        # The lock keeps concurrent callers from fetching duplicate tokens.
        with self._lock:
            if self._token is None or time.time() >= (self._token.issued_at + self._token.expires_in - 60):
                self._token = self._fetch_token()
            return self._token
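The refresh rule inside get_token can be exercised without any network call. The following is a minimal sketch that isolates the same comparison in a pure function; needs_refresh and REFRESH_MARGIN_SECONDS are hypothetical names introduced here for illustration and are not part of the module above.

```python
from typing import Optional

REFRESH_MARGIN_SECONDS = 60  # same safety margin that get_token uses


def needs_refresh(issued_at: Optional[float], expires_in: int, now: float,
                  margin: int = REFRESH_MARGIN_SECONDS) -> bool:
    """Return True when no token is cached or it is within `margin` seconds of expiry."""
    if issued_at is None:
        return True
    return now >= issued_at + expires_in - margin


# A token issued at t=1000 with a 3600 s lifetime is refreshed from t=4540 onward.
print(needs_refresh(1000.0, 3600, now=4539.0))  # False
print(needs_refresh(1000.0, 3600, now=4540.0))  # True
print(needs_refresh(None, 3600, now=0.0))       # True
```

Passing the clock value in as an argument keeps the rule testable with fixed timestamps.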
Implementation
Step 1: Construct and Validate Dataset Payloads
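Training datasets require annotated intent utterances, entity annotations, and cross-validation fold assignments. Cognigy.AI rejects payloads exceeding token limits or containing unsupported language model configurations. Before upload, the models below check an approximate token count (whitespace-separated words, which undercounts subword tokens), the presence of start, end, and type on every entity, and the dataset language. Fold assignments are carried through to the payload but are not checked here.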
from typing import Any, Dict, List

import pandas as pd
from pydantic import BaseModel, field_validator


class TrainingSample(BaseModel):
    text: str
    intent: str
    entities: List[Dict[str, Any]]
    fold: int

    @field_validator("text")
    @classmethod
    def check_token_length(cls, v: str) -> str:
        # Whitespace word count is only a rough proxy for model tokens.
        estimated_tokens = len(v.split())
        if estimated_tokens > 512:
            raise ValueError("Text exceeds maximum token length of 512.")
        return v

    @field_validator("entities")
    @classmethod
    def validate_entity_schema(cls, v: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Checks that the required keys exist; offsets are not range-checked.
        for entity in v:
            if "start" not in entity or "end" not in entity or "type" not in entity:
                raise ValueError("Entity must contain start, end, and type fields.")
        return v


class TrainingDataset(BaseModel):
    samples: List[TrainingSample]
    language: str
    model_version: str

    @field_validator("language")
    @classmethod
    def check_language_compatibility(cls, v: str) -> str:
        supported = ["en-US", "de-DE", "fr-FR", "es-ES"]
        if v not in supported:
            raise ValueError(f"Language {v} is not in the compatibility matrix.")
        return v


def build_dataset_df(samples: List[TrainingSample]) -> pd.DataFrame:
    # One row per sample, with columns text, intent, entities, fold.
    records = []
    for s in samples:
        records.append({
            "text": s.text,
            "intent": s.intent,
            "entities": s.entities,
            "fold": s.fold,
        })
    return pd.DataFrame(records)
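The models above accept any integer as a fold, so an empty or out-of-range fold would only surface once the platform tries to cross-validate. A minimal sketch of a fold distribution check follows. The helper name check_fold_distribution, the assumption that folds are numbered 0 to n_folds - 1, and the min_per_fold parameter are all illustrative choices, not documented Cognigy.AI requirements.

```python
from collections import Counter
from typing import Dict, Iterable


def check_fold_distribution(folds: Iterable[int], n_folds: int, min_per_fold: int = 1) -> Dict[int, int]:
    """Count samples per fold and raise if a fold is out of range or too small."""
    counts = Counter(folds)
    out_of_range = sorted(f for f in counts if not 0 <= f < n_folds)
    if out_of_range:
        raise ValueError(f"Fold indices outside 0..{n_folds - 1}: {out_of_range}")
    sparse = [f for f in range(n_folds) if counts.get(f, 0) < min_per_fold]
    if sparse:
        raise ValueError(f"Folds with fewer than {min_per_fold} samples: {sparse}")
    return {f: counts[f] for f in range(n_folds)}


print(check_fold_distribution([0, 1, 2, 0, 1, 2], n_folds=3))  # {0: 2, 1: 2, 2: 2}
```

With TrainingSample objects, the input would be the fold attribute of each sample, for example `[s.fold for s in samples]`.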
Step 2: Upload Dataset and Trigger Asynchronous Training Job
The dataset is serialized to JSON and posted to the Cognigy.AI dataset upload endpoint. After validation, the training job is initiated with GPU resource allocation parameters and early stopping configuration. The API returns a job identifier for polling.
import time
from typing import Any, Dict

import httpx


class CognigyNLUClient:
    BASE_URL = "https://api.cognigy.ai"
    MAX_RETRIES = 3
    RETRY_DELAY = 2  # seconds

    def __init__(self, auth: CognigyAuthClient):
        self.auth = auth
        self.client = httpx.Client(base_url=self.BASE_URL, timeout=30.0)

    def _request(self, method: str, path: str, **kwargs) -> httpx.Response:
        headers = kwargs.pop("headers", {})
        headers["Authorization"] = f"Bearer {self.auth.get_token().access_token}"
        headers["Content-Type"] = "application/json"
        response = None
        for attempt in range(self.MAX_RETRIES):
            response = self.client.request(method, path, headers=headers, **kwargs)
            if response.status_code == 429:
                # Honor Retry-After when it is a number of seconds;
                # fall back to the fixed delay otherwise.
                retry_after = response.headers.get("Retry-After", "")
                time.sleep(int(retry_after) if retry_after.isdigit() else self.RETRY_DELAY)
                continue
            if response.status_code >= 500:
                # Linear backoff: RETRY_DELAY, 2 * RETRY_DELAY, ...
                time.sleep(self.RETRY_DELAY * (attempt + 1))
                continue
            return response
        # Retries exhausted: the last response is a 429 or 5xx, so this
        # raises httpx.HTTPStatusError.
        response.raise_for_status()
        return response

    def upload_dataset(self, dataset: TrainingDataset, project_id: str) -> str:
        payload = {
            "projectId": project_id,
            "language": dataset.language,
            "modelVersion": dataset.model_version,
            "samples": dataset.model_dump()["samples"],
        }
        response = self._request("POST", "/api/v1/ai/nlu/datasets/upload", json=payload)
        response.raise_for_status()
        return response.json()["datasetId"]

    def start_training_job(self, dataset_id: str, project_id: str, gpu_tier: str = "A10G-1", early_stopping_patience: int = 3) -> Dict[str, Any]:
        payload = {
            "datasetId": dataset_id,
            "projectId": project_id,
            "config": {
                "gpuTier": gpu_tier,
                "earlyStopping": {
                    "patience": early_stopping_patience,
                    "metric": "val_loss",
                },
                "crossValidationFolds": 5,
            },
        }
        response = self._request("POST", "/api/v1/ai/nlu/training/jobs", json=payload)
        response.raise_for_status()
        return response.json()
Step 3: Monitor Training, Track Loss, and Handle Early Stopping
Training jobs run asynchronously. The polling loop retrieves epoch duration, validation loss, and GPU utilization metrics. The loop respects early stopping signals from the platform and stops polling when the job transitions to a terminal state.
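import time
from typing import Any, Dict, List, Optional

TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "EARLY_STOPPED"}


class TrainingMonitor:
    def __init__(self, client: CognigyNLUClient, job_id: str):
        self.client = client
        self.job_id = job_id
        self.epoch_metrics: List[Dict[str, Any]] = []
        self.final_state: Optional[str] = None

    def poll_until_complete(self, interval: int = 10) -> List[Dict[str, Any]]:
        while True:
            response = self.client._request("GET", f"/api/v1/ai/nlu/training/jobs/{self.job_id}/status")
            response.raise_for_status()
            status_data = response.json()
            metrics = status_data.get("currentEpochMetrics", {})
            # Record before the terminal check so the last epoch is kept, and
            # skip a payload identical to the previous poll (same epoch seen twice).
            if metrics and (not self.epoch_metrics or metrics != self.epoch_metrics[-1]):
                self.epoch_metrics.append(metrics)
            if status_data["state"] in TERMINAL_STATES:
                # Callers can inspect final_state to tell success from failure.
                self.final_state = status_data["state"]
                break
            time.sleep(interval)
        return self.epoch_metrics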
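The training job is configured with a patience value and the val_loss metric, and the stopping decision itself is made on the platform. To make the parameter concrete, here is a minimal sketch of the conventional patience rule, which stops once the validation loss has failed to improve for patience consecutive epochs. The function early_stop_epoch and the min_delta parameter are illustrative assumptions; the exact rule Cognigy.AI applies may differ.

```python
from typing import List, Optional


def early_stop_epoch(val_losses: List[float], patience: int, min_delta: float = 0.0) -> Optional[int]:
    """Return the 0-based epoch at which training would stop, or None if it never does."""
    best = float("inf")
    epochs_without_improvement = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best - min_delta:
            # New best loss: reset the counter.
            best = loss
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                return epoch
    return None


losses = [0.90, 0.70, 0.65, 0.66, 0.67, 0.68, 0.60]
print(early_stop_epoch(losses, patience=3))  # 5
print(early_stop_epoch(losses, patience=4))  # None
```

With patience=3 the run stops at epoch 5 and never reaches the better loss at epoch 6, which is the trade-off a small patience value makes.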
Step 4: Evaluate Model and Generate Precision-Recall Curves
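After training completes, the evaluation endpoint returns prediction probabilities and ground truth labels. The evaluation logic computes a confusion matrix, plots a precision-recall curve for the question "is the top prediction correct at this confidence?", and selects the confidence threshold whose precision is closest to a target (0.95 by default), which limits how many wrong predictions are accepted.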
from typing import Any, Dict, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import auc, confusion_matrix, precision_recall_curve


class NLUEvaluator:
    @staticmethod
    def fetch_evaluation_results(client: CognigyNLUClient, job_id: str) -> pd.DataFrame:
        response = client._request("GET", f"/api/v1/ai/nlu/training/jobs/{job_id}/evaluation")
        response.raise_for_status()
        data = response.json()["results"]
        return pd.DataFrame(data)

    @staticmethod
    def optimize_threshold(df: pd.DataFrame, target_precision: float = 0.95) -> Tuple[float, np.ndarray, np.ndarray, np.ndarray]:
        # Positive class = "the top prediction was correct"; score = its confidence.
        y_true = (df["predicted_intent"] == df["actual_intent"]).astype(int)
        y_scores = df["max_probability"].values
        precision, recall, thresholds = precision_recall_curve(y_true, y_scores)
        # precision has one more entry than thresholds (a final 1.0 with no
        # threshold), so search only the entries that map to a threshold.
        optimal_idx = int(np.argmin(np.abs(precision[:-1] - target_precision)))
        return float(thresholds[optimal_idx]), precision, recall, thresholds

    @staticmethod
    def generate_metrics(df: pd.DataFrame) -> Dict[str, Any]:
        y_true = df["actual_intent"]
        y_pred = df["predicted_intent"]
        # Include predicted-only labels so no prediction is dropped from the matrix.
        labels = sorted(set(y_true) | set(y_pred))
        cm = confusion_matrix(y_true, y_pred, labels=labels)
        threshold, precision, recall, thresholds = NLUEvaluator.optimize_threshold(df)
        pr_auc = float(auc(recall, precision))
        # thresholds is sorted ascending, so this recovers the chosen index.
        idx = int(np.searchsorted(thresholds, threshold))
        plt.figure(figsize=(6, 4))
        plt.plot(recall, precision, label=f"PR Curve (AUC = {pr_auc:.2f})")
        # The threshold is a confidence score, not a precision value, so mark
        # its operating point on the curve instead of drawing a horizontal line.
        plt.scatter([recall[idx]], [precision[idx]], color="r", label=f"Threshold = {threshold:.2f}")
        plt.xlabel("Recall")
        plt.ylabel("Precision")
        plt.title("Intent Classification Precision-Recall Curve")
        plt.legend()
        plt.tight_layout()
        plt.savefig("pr_curve.png")
        plt.close()
        return {
            "labels": labels,
            "confusion_matrix": cm.tolist(),
            "optimal_threshold": threshold,
            "pr_auc": pr_auc,
        }
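Choosing the threshold whose precision is closest to the target can return an operating point slightly below that target. An alternative rule is to take the lowest threshold whose precision meets the target, which accepts as many predictions as possible. The sketch below uses only the standard library so the arithmetic is visible; the function name and the sample scores are made up for illustration.

```python
from typing import List, Optional, Tuple


def lowest_threshold_meeting_precision(
    scores: List[float], correct: List[bool], target_precision: float
) -> Optional[Tuple[float, float, float]]:
    """Return (threshold, precision, coverage) for the lowest qualifying threshold.

    A prediction is accepted when its score >= threshold. Precision is the share
    of accepted predictions that are correct; coverage is the share accepted.
    Returns None when no threshold reaches the target.
    """
    total = len(scores)
    for threshold in sorted(set(scores)):
        accepted = [c for s, c in zip(scores, correct) if s >= threshold]
        precision = sum(accepted) / len(accepted)
        if precision >= target_precision:
            return threshold, precision, len(accepted) / total
    return None


scores = [0.95, 0.90, 0.80, 0.60, 0.55, 0.40]
correct = [True, True, True, False, True, False]
print(lowest_threshold_meeting_precision(scores, correct, 0.9))  # (0.8, 1.0, 0.5)
```

In this toy data, a 0.9 precision target is first met at threshold 0.8, where three of six predictions are accepted and all three are correct.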
Step 5: Sync Artifacts to S3 and Generate Audit Logs
Evaluation metrics and the audit log are serialized to JSON and uploaded to an S3-compatible bucket. The audit log records the job and dataset identifiers, total training duration, GPU allocation, final validation loss, the evaluation metrics (including the selected threshold), and a SHA-256 hash of those metrics for AI governance.
import hashlib
import json
from datetime import datetime, timezone
from typing import Any, Dict, List

import boto3


class MLOpsSync:
    def __init__(self, s3_endpoint: str, bucket: str, region: str, aws_key: str, aws_secret: str):
        self.s3_client = boto3.client(
            "s3",
            endpoint_url=s3_endpoint,
            aws_access_key_id=aws_key,
            aws_secret_access_key=aws_secret,
            region_name=region,
        )
        self.bucket = bucket

    def upload_artifact(self, key: str, data: bytes, content_type: str = "application/json") -> str:
        self.s3_client.put_object(Bucket=self.bucket, Key=key, Body=data, ContentType=content_type)
        return f"s3://{self.bucket}/{key}"

    def generate_audit_log(self, job_id: str, dataset_id: str, metrics: Dict[str, Any], epoch_metrics: List[Dict[str, Any]]) -> str:
        # Use one timestamp for both the log body and the object key.
        now = datetime.now(timezone.utc)
        log = {
            "timestamp": now.isoformat(),
            "jobId": job_id,
            "datasetId": dataset_id,
            "trainingDurationSeconds": sum(e.get("durationSec", 0) for e in epoch_metrics),
            "gpuAllocation": epoch_metrics[0].get("gpuTier", "unknown") if epoch_metrics else "unknown",
            "finalValLoss": epoch_metrics[-1].get("valLoss", 0) if epoch_metrics else 0,
            "evaluationMetrics": metrics,
            # sort_keys makes the hash independent of dict ordering.
            "governanceHash": hashlib.sha256(json.dumps(metrics, sort_keys=True).encode()).hexdigest(),
        }
        log_bytes = json.dumps(log, indent=2).encode()
        log_path = f"audit/nlu_training_{job_id}_{now.strftime('%Y%m%d_%H%M%S')}.json"
        return self.upload_artifact(log_path, log_bytes)
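The governanceHash in generate_audit_log covers only the evaluation metrics. If the audit record should also identify the exact training data, a content hash of the samples can be computed before upload. The following is a minimal sketch; dataset_fingerprint is a hypothetical helper, and hashing sorted canonical JSON is one reasonable choice rather than a Cognigy.AI convention.

```python
import hashlib
import json
from typing import Any, Dict, List


def dataset_fingerprint(samples: List[Dict[str, Any]]) -> str:
    """Return a SHA-256 hex digest that ignores sample order and dict key order."""
    # Canonical form: sorted keys, compact separators, one JSON document per sample.
    canonical = sorted(
        json.dumps(s, sort_keys=True, ensure_ascii=False, separators=(",", ":"))
        for s in samples
    )
    digest = hashlib.sha256()
    for line in canonical:
        digest.update(line.encode("utf-8"))
        digest.update(b"\n")
    return digest.hexdigest()


a = {"text": "My account is locked", "intent": "account_locked", "fold": 2}
b = {"text": "I want to reset my password", "intent": "reset_password", "fold": 0}
print(dataset_fingerprint([a, b]) == dataset_fingerprint([b, a]))  # True
```

With the pydantic models from Step 1, the input could be `dataset.model_dump()["samples"]`, and the digest could be stored under an extra key in the audit log.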
Complete Working Example
The following script combines authentication, dataset construction, validation, training orchestration, evaluation, artifact synchronization, and audit logging into a single executable module. Replace the placeholder credentials and endpoints with your environment values.
import json

# Assumes the classes from the previous sections are defined in (or imported
# into) this module.


def run_fine_tuning_pipeline():
    # Configuration
    COGNIGY_BASE = "https://api.cognigy.ai"
    AUTH_URL = f"{COGNIGY_BASE}/oauth/token"
    S3_ENDPOINT = "https://s3.amazonaws.com"
    BUCKET_NAME = "mlops-nlu-artifacts"
    REGION = "us-east-1"
    AWS_KEY = "YOUR_AWS_ACCESS_KEY"
    AWS_SECRET = "YOUR_AWS_SECRET_KEY"
    CLIENT_ID = "YOUR_COGNIGY_CLIENT_ID"
    CLIENT_SECRET = "YOUR_COGNIGY_CLIENT_SECRET"
    PROJECT_ID = "YOUR_PROJECT_ID"

    # Initialize clients
    auth = CognigyAuthClient(CLIENT_ID, CLIENT_SECRET, AUTH_URL, scopes=["nlu:write", "training:execute", "models:read", "exports:write", "datasets:upload"])
    nlu_client = CognigyNLUClient(auth)
    s3_sync = MLOpsSync(S3_ENDPOINT, BUCKET_NAME, REGION, AWS_KEY, AWS_SECRET)

    # Step 1: Construct and validate dataset
    # Each "end" offset equals the length of its text (27, 28, and 20 characters).
    samples = [
        TrainingSample(text="I want to reset my password", intent="reset_password", entities=[{"start": 0, "end": 27, "type": "intent"}], fold=0),
        TrainingSample(text="Can you help me with billing", intent="billing_inquiry", entities=[{"start": 0, "end": 28, "type": "intent"}], fold=1),
        TrainingSample(text="My account is locked", intent="account_locked", entities=[{"start": 0, "end": 20, "type": "intent"}], fold=2),
    ]
    dataset = TrainingDataset(samples=samples, language="en-US", model_version="v2.1")
    print(f"Dataset validation passed. Uploading {len(samples)} samples.")
    dataset_id = nlu_client.upload_dataset(dataset, PROJECT_ID)
    print(f"Dataset uploaded: {dataset_id}")

    # Step 2: Start training job
    job_response = nlu_client.start_training_job(dataset_id, PROJECT_ID, gpu_tier="A10G-1", early_stopping_patience=3)
    job_id = job_response["jobId"]
    print(f"Training job started: {job_id}")

    # Step 3: Monitor training
    monitor = TrainingMonitor(nlu_client, job_id)
    print("Polling training status...")
    epoch_metrics = monitor.poll_until_complete(interval=10)
    print(f"Training finished. Recorded {len(epoch_metrics)} epoch metric snapshots.")

    # Step 4: Evaluate model
    eval_df = NLUEvaluator.fetch_evaluation_results(nlu_client, job_id)
    metrics = NLUEvaluator.generate_metrics(eval_df)
    print(f"Evaluation complete. Optimal threshold: {metrics['optimal_threshold']:.3f}")

    # Step 5: Sync artifacts and audit
    metrics_path = s3_sync.upload_artifact(f"models/{job_id}_metrics.json", json.dumps(metrics).encode())
    audit_path = s3_sync.generate_audit_log(job_id, dataset_id, metrics, epoch_metrics)
    print(f"Metrics synced to: {metrics_path}")
    print(f"Audit log generated: {audit_path}")
    return metrics, audit_path


if __name__ == "__main__":
    run_fine_tuning_pipeline()
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: Expired OAuth token or incorrect client credentials.
- Fix: Verify client_id and client_secret in the Cognigy.AI developer console. Ensure the token cache refreshes before expiration. The CognigyAuthClient implementation automatically refreshes tokens 60 seconds before expiry.
- Code Fix: The _fetch_token method already handles token acquisition. Ensure the scopes list matches the registered client permissions.
Error: 403 Forbidden
- Cause: Missing OAuth scopes or insufficient project-level permissions.
- Fix: Add nlu:write, training:execute, and datasets:upload to the OAuth client configuration. Verify the service account has editor access to the target project.
- Code Fix: Update the scopes list in the CognigyAuthClient initialization.
Error: 400 Bad Request (Schema Validation)
- Cause: Text exceeds 512 tokens, entity boundaries are malformed, or language is unsupported.
- Fix: Validate inputs before serialization. The TrainingSample pydantic model enforces token limits and entity schema requirements.
- Code Fix: Review the field_validator methods in TrainingSample. Adjust text truncation logic if upstream data sources produce longer sequences.
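Malformed entity boundaries are not caught by a check for the presence of start, end, and type alone. A minimal sketch of a range check follows, assuming that start and end are character offsets into the text with end exclusive. That offset convention and the helper name find_bad_spans are assumptions made for illustration; confirm the convention against the platform's schema before relying on it.

```python
from typing import Any, Dict, List


def find_bad_spans(text: str, entities: List[Dict[str, Any]]) -> List[str]:
    """Return a description of every entity whose span does not fit inside text."""
    problems = []
    for i, ent in enumerate(entities):
        start, end = ent.get("start"), ent.get("end")
        if not isinstance(start, int) or not isinstance(end, int):
            problems.append(f"entity {i}: start and end must be integers")
        elif not 0 <= start < end <= len(text):
            problems.append(
                f"entity {i}: span [{start}, {end}) does not fit text of length {len(text)}"
            )
    return problems


text = "My account is locked"  # 20 characters
print(find_bad_spans(text, [{"start": 0, "end": 20, "type": "state"}]))  # []
print(len(find_bad_spans(text, [{"start": 0, "end": 21, "type": "state"}])))  # 1
```

Returning a list of problems instead of raising on the first one makes it easier to report every bad annotation in a dataset in a single pass.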
Error: 429 Too Many Requests
- Cause: Rate limiting on dataset upload or status polling endpoints.
- Fix: Back off between requests. The _request method already retries 429 responses up to MAX_RETRIES times, waiting for the Retry-After header value or a fixed delay.
- Code Fix: Increase RETRY_DELAY or lengthen the polling interval in poll_until_complete.
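When the server sends no Retry-After header, a fixed delay can keep hitting the limit at the same rate. Exponential backoff with a cap is a common alternative. The sketch below only computes the delay schedule; backoff_delays and its parameters are illustrative names, and wiring the schedule into _request is left to the reader.

```python
import random
from typing import List, Optional


def backoff_delays(attempts: int, base: float = 2.0, factor: float = 2.0,
                   cap: float = 60.0, rng: Optional[random.Random] = None) -> List[float]:
    """Return one delay in seconds per retry attempt: base * factor**attempt, capped.

    When rng is given, each delay is drawn uniformly from [0, delay] ("full
    jitter") so that many clients do not retry at the same instant.
    """
    delays = []
    for attempt in range(attempts):
        delay = min(cap, base * factor ** attempt)
        if rng is not None:
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays


print(backoff_delays(6))  # [2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

Passing a seeded random.Random as rng gives a jittered schedule that stays reproducible in tests.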
Error: 503 Service Unavailable (GPU Queue)
- Cause: GPU compute resources are exhausted in the region.
- Fix: Poll the job status endpoint for queue position. Switch to a lower-tier GPU or schedule jobs during off-peak hours.
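- Code Fix: The _request method retries 5xx responses up to MAX_RETRIES times with a linearly increasing delay, then raises the error to the caller, so a long GPU queue can still abort the polling loop. Add a queue position logger if Cognigy.AI returns queuePosition in the status payload.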