Train and Validate Custom Cognigy Intent Models via NLU API with Python
What You Will Build
This tutorial builds a Python module that constructs, validates, and submits utterance datasets to the Cognigy NLU API for custom intent training. The code polls the asynchronous training job with a client-side timeout, evaluates model performance using precision-recall curves and confusion matrices, logs metrics and the model export to MLflow, and produces a governance audit record. The implementation uses Python 3.9+ with requests, pandas, numpy, scikit-learn, and mlflow.
Prerequisites
- OAuth 2.0 Client Credentials grant configured in the Cognigy Console
- Required scopes: nlu:train, nlu:evaluate, job:read, model:export, intent:write
- Python 3.9 or higher
- External dependencies: requests, scikit-learn, mlflow, pandas, numpy, pydantic
- Active Cognigy tenant with NLU API access enabled
Authentication Setup
Cognigy uses standard OAuth 2.0 bearer tokens. You must cache the token and handle expiration before issuing training or evaluation requests. The following function implements token retrieval with automatic retry logic for transient 429 responses.
import time

import requests

BASE_URL = "https://api.cognigy.com"
OAUTH_TOKEN_URL = f"{BASE_URL}/oauth2/token"

def get_access_token(client_id: str, client_secret: str, scopes: list[str]) -> str:
    """Retrieve and return a valid Cognigy OAuth bearer token."""
    payload = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": " ".join(scopes)
    }
    # Up to three attempts; only 429 responses are retried
    for attempt in range(3):
        # The OAuth 2.0 client credentials grant (RFC 6749) sends form-encoded
        # fields, so pass data= rather than json=
        response = requests.post(OAUTH_TOKEN_URL, data=payload, timeout=30)
        if response.status_code == 429:
            # Assumes Retry-After carries a number of seconds
            retry_after = int(response.headers.get("Retry-After", 5))
            time.sleep(retry_after)
            continue
        response.raise_for_status()
        return response.json()["access_token"]
    raise RuntimeError("Failed to acquire OAuth token after retries")
The token endpoint returns a JSON payload containing access_token, expires_in, and token_type. You must attach the token to subsequent requests using the Authorization: Bearer <token> header. Cache the token in memory and refresh it when expires_in approaches zero to avoid 401 interruptions during long training jobs.
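The caching advice above can be sketched as follows. This is a minimal illustration, not part of the Cognigy API: TokenCache, the fetch callable, and the 60-second refresh margin are hypothetical, and fetch is assumed to return the parsed token response containing access_token and expires_in.

```python
import time
from typing import Callable, Dict, Optional


class TokenCache:
    """Hypothetical in-memory cache that refreshes a token shortly before it expires."""

    def __init__(self, fetch: Callable[[], Dict], margin_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch            # returns {"access_token": ..., "expires_in": ...}
        self._margin = margin_seconds  # refresh this many seconds before expiry
        self._clock = clock            # injectable so the cache can be tested without waiting
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return the cached token, fetching a new one if it is missing or near expiry."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            data = self._fetch()
            self._token = data["access_token"]
            self._expires_at = now + float(data["expires_in"])
        return self._token
```

Note that get_access_token as written returns only the token string, so using it with a cache like this would require a variant that returns the full JSON response.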
Implementation
Step 1: Construct and Validate Training Payloads
Cognigy expects utterance data structured with explicit intent labels, language tags, and character encoding compliance. Validate class balance before submission to prevent model skew. The following function reads a CSV dataset with utterance and intent columns, normalizes each utterance to NFC, enforces a minimum class balance ratio (smallest class count divided by largest), and makes a stratified 80/20 train/validation split. This is a single hold-out split rather than k-fold cross-validation.
import unicodedata
from typing import Dict, List, Tuple

import pandas as pd

# Zero-width space and BOM are removed; ZWJ/ZWNJ are kept because they
# carry meaning in some scripts and emoji sequences
ZERO_WIDTH = dict.fromkeys(map(ord, "\u200b\ufeff"))

def validate_and_split_dataset(
    csv_path: str,
    min_balance_ratio: float = 0.7
) -> Tuple[List[Dict], List[Dict]]:
    """Load utterances, validate encoding and class balance, then split for training and validation."""
    # read_csv raises UnicodeDecodeError if the file is not valid UTF-8
    df = pd.read_csv(csv_path, encoding="utf-8")
    # Normalize to NFC and strip zero-width characters
    for idx, row in df.iterrows():
        text = row["utterance"]
        if not isinstance(text, str):
            raise ValueError(f"Non-string utterance at row {idx}")
        normalized = unicodedata.normalize("NFC", text).translate(ZERO_WIDTH)
        # U+FFFD marks bytes that an earlier tool failed to decode
        if "\ufffd" in normalized:
            raise ValueError(f"Invalid character encoding at row {idx}")
        df.at[idx, "utterance"] = normalized.strip()
    # Check class balance: smallest class count divided by largest
    intent_counts = df["intent"].value_counts()
    max_count = intent_counts.max()
    min_count = intent_counts.min()
    ratio = min_count / max_count
    if ratio < min_balance_ratio:
        raise ValueError(
            f"Class balance ratio {ratio:.2f} is below threshold {min_balance_ratio}. "
            f"Intent distribution: {intent_counts.to_dict()}"
        )
    # Stratified 80/20 hold-out split per intent
    train_set = []
    val_set = []
    for intent, group in df.groupby("intent"):
        sampled = group.sample(frac=1, random_state=42)
        split_idx = int(len(sampled) * 0.8)
        train_set.extend([
            {"utterance": row["utterance"], "intent": intent, "language": "en"}
            for _, row in sampled.iloc[:split_idx].iterrows()
        ])
        val_set.extend([
            {"utterance": row["utterance"], "intent": intent, "language": "en"}
            for _, row in sampled.iloc[split_idx:].iterrows()
        ])
    return train_set, val_set
The payload structure matches the Cognigy NLU training schema. Each object contains utterance, intent, and language. The validation step prevents silent model degradation caused by imbalanced classes or malformed unicode sequences.
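To make the balance check concrete, here is a small sketch that computes the same min/max ratio on an in-memory list of labels using only the standard library. The helper name balance_ratio and the sample labels are illustrative and do not come from the tutorial's dataset.

```python
from collections import Counter
from typing import Iterable


def balance_ratio(intents: Iterable[str]) -> float:
    """Return the smallest class count divided by the largest class count."""
    counts = Counter(intents)
    if not counts:
        raise ValueError("No intents provided")
    return min(counts.values()) / max(counts.values())


# Hypothetical label distribution: 10 / 8 / 5 examples per intent
labels = ["greeting"] * 10 + ["order_status"] * 8 + ["cancel"] * 5
ratio = balance_ratio(labels)
print(f"{ratio:.2f}")  # 5 / 10 = 0.50, below the default 0.7 threshold
```

With the default min_balance_ratio of 0.7, a dataset like this one would be rejected until the smallest class is grown or the largest is reduced.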
Step 2: Async Training Execution with Job Polling
Training runs asynchronously. You submit the payload to the training endpoint, receive a job identifier, and poll the job status until completion. The following function polls at a fixed interval and stops waiting with a TimeoutError once training exceeds a time threshold; it does not cancel the job on the server.
import time
from typing import Dict, List

import requests

TRAIN_ENDPOINT = f"{BASE_URL}/api/v1/nlu/intents/train"
JOB_STATUS_ENDPOINT = f"{BASE_URL}/api/v1/nlu/jobs"

def submit_and_poll_training(
    token: str,
    train_data: List[Dict],
    max_duration_seconds: int = 1800
) -> str:
    """Submit training payload and poll job status until done or timed out."""
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    payload = {
        "trainingData": train_data,
        "modelVersion": "custom-v1",
        "overwriteExisting": True
    }
    response = requests.post(TRAIN_ENDPOINT, json=payload, headers=headers)
    response.raise_for_status()
    job_id = response.json()["jobId"]
    start_time = time.time()
    while True:
        elapsed = time.time() - start_time
        if elapsed > max_duration_seconds:
            # Only polling stops here; the server-side job keeps running
            raise TimeoutError(
                f"Training exceeded {max_duration_seconds} seconds. "
                f"Stopped polling job {job_id}; the job was not cancelled."
            )
        status_resp = requests.get(f"{JOB_STATUS_ENDPOINT}/{job_id}", headers=headers)
        status_resp.raise_for_status()
        status_data = status_resp.json()
        status = status_data["status"]
        if status == "completed":
            return status_data["modelId"]
        elif status == "failed":
            raise RuntimeError(f"Job {job_id} failed: {status_data.get('error', 'Unknown error')}")
        elif status in ("running", "queued"):
            time.sleep(10)
        else:
            raise ValueError(f"Unexpected job status: {status}")
The training endpoint returns a 202 Accepted response with a jobId. The polling loop checks the job status every 10 seconds. If the job runs longer than max_duration_seconds, the function raises a timeout exception. You can integrate a cancellation call to /api/v1/nlu/jobs/{jobId}/cancel if your deployment requires hard termination.
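One way to add the cancellation step is to separate the polling loop from the HTTP calls, as in the sketch below. The names poll_job, get_status, and cancel are hypothetical. In this tutorial's setting, get_status would wrap the GET on the job status endpoint and cancel would call the /cancel path mentioned above; the HTTP method and response shape of that cancel call are assumptions to verify against your tenant's API reference.

```python
import time
from typing import Callable, Dict


def poll_job(get_status: Callable[[], Dict], cancel: Callable[[], None],
             max_duration_seconds: float, interval_seconds: float = 10.0,
             sleep: Callable[[float], None] = time.sleep,
             clock: Callable[[], float] = time.monotonic) -> Dict:
    """Poll until the job completes or fails; request cancellation if time runs out."""
    deadline = clock() + max_duration_seconds
    while True:
        data = get_status()
        if data["status"] == "completed":
            return data
        if data["status"] == "failed":
            raise RuntimeError(data.get("error", "Unknown error"))
        if clock() >= deadline:
            cancel()  # best-effort hard termination before giving up
            raise TimeoutError("Training exceeded the time budget; cancellation requested")
        sleep(interval_seconds)
```

Injecting sleep and clock keeps the loop testable without real waiting, and time.monotonic avoids surprises if the system clock is adjusted during a long job.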
Step 3: Model Evaluation Logic
After training completes, you must evaluate the model against the validation split. Cognigy provides an evaluation endpoint that returns predicted intents and confidence scores. You use scikit-learn to compute precision-recall curves and confusion matrices.
import numpy as np
from sklearn.metrics import precision_recall_curve, confusion_matrix, classification_report

EVALUATE_ENDPOINT = f"{BASE_URL}/api/v1/nlu/models/evaluate"

def evaluate_model(
    token: str,
    model_id: str,
    val_data: List[Dict]
) -> Dict:
    """Evaluate trained model and compute classification metrics."""
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    eval_payload = {
        "modelId": model_id,
        "testData": val_data
    }
    response = requests.post(EVALUATE_ENDPOINT, json=eval_payload, headers=headers)
    response.raise_for_status()
    predictions = response.json()["results"]
    # Labels are paired by position, so the result count must match the input
    if len(predictions) != len(val_data):
        raise ValueError("Prediction count does not match validation set size")
    true_labels = [item["intent"] for item in val_data]
    pred_labels = [pred["predictedIntent"] for pred in predictions]
    confidences = [pred["confidence"] for pred in predictions]
    # Map labels to integers for sklearn
    unique_labels = sorted(set(true_labels + pred_labels))
    label_map = {label: idx for idx, label in enumerate(unique_labels)}
    y_true = [label_map[l] for l in true_labels]
    y_pred = [label_map[l] for l in pred_labels]
    # Confusion matrix
    cm = confusion_matrix(y_true, y_pred, labels=list(label_map.values()))
    # Precision-recall per class (one-vs-rest): the score is the confidence
    # when that class was predicted and 0 otherwise
    pr_curves = {}
    for cls in range(len(unique_labels)):
        y_true_binary = np.array([1 if y == cls else 0 for y in y_true])
        y_score = np.array([c if y_pred[i] == cls else 0 for i, c in enumerate(confidences)])
        precision, recall, _ = precision_recall_curve(y_true_binary, y_score)
        # Convert arrays to lists so the result is JSON-serializable
        pr_curves[unique_labels[cls]] = {"precision": precision.tolist(), "recall": recall.tolist()}
    report = classification_report(y_true, y_pred, target_names=unique_labels, output_dict=True)
    return {
        "confusion_matrix": cm.tolist(),
        "pr_curves": pr_curves,
        "classification_report": report,
        "overall_accuracy": float(np.mean(np.array(y_true) == np.array(y_pred)))
    }
The evaluation endpoint requires the nlu:evaluate scope. The response contains an array of prediction objects. The function maps string labels to integer indices, computes the confusion matrix, and generates per-class precision-recall curves. You can serialize the metrics to JSON or render the curves using matplotlib in a notebook environment.
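If any metric values are NumPy arrays or NumPy scalars, json.dumps can fail on them. A small helper such as the hypothetical to_jsonable below converts anything exposing a tolist() method into plain Python types before serialization. It relies only on duck typing, so it is a sketch rather than an exhaustive converter.

```python
import json
from typing import Any


def to_jsonable(value: Any) -> Any:
    """Recursively convert array-like values (anything with .tolist()) to plain Python types."""
    if hasattr(value, "tolist"):
        return value.tolist()
    if isinstance(value, dict):
        return {str(k): to_jsonable(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_jsonable(v) for v in value]
    return value
```

Usage would look like json.dumps(to_jsonable(metrics), indent=2), where metrics is the dictionary returned by evaluate_model.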
Step 4: MLOps Synchronization, Audit Logging, and Vocabulary Exposure
You must track training duration, validation accuracy, and model artifacts for reproducibility. The following function exports the model, logs metrics to MLflow, generates an audit record, and registers the intent for bot vocabulary expansion.
import time
from datetime import datetime, timezone
from typing import Dict, Optional

import mlflow
import requests

EXPORT_ENDPOINT = f"{BASE_URL}/api/v1/nlu/models"
INTENT_REGISTER_ENDPOINT = f"{BASE_URL}/api/v1/intents/register"

def sync_and_audit(
    token: str,
    model_id: str,
    metrics: Dict,
    training_start_time: float,
    mlflow_tracking_uri: Optional[str] = None
) -> Dict:
    """Export model, log to MLflow, generate audit trail, and register intent."""
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    # 1. Export model artifact
    export_resp = requests.get(f"{EXPORT_ENDPOINT}/{model_id}/export", headers=headers)
    export_resp.raise_for_status()
    model_artifact = export_resp.json()["artifactUrl"]
    # 2. Log to MLflow
    if mlflow_tracking_uri:
        mlflow.set_tracking_uri(mlflow_tracking_uri)
        with mlflow.start_run(run_name="cognigy_intent_training") as run:
            mlflow.log_param("model_id", model_id)
            mlflow.log_metric("training_duration_seconds", time.time() - training_start_time)
            mlflow.log_metric("validation_accuracy", metrics["overall_accuracy"])
            mlflow.log_dict(metrics["classification_report"], "classification_report.json")
            # log_artifact expects a local file path, not a URL, so record the
            # export reference; download the file first to store the binary
            mlflow.log_dict({"artifactUrl": model_artifact}, "model_export/artifact_reference.json")
            mlflow_id = run.info.run_id
    else:
        mlflow_id = "local_only"
    # 3. Audit log for AI governance
    audit_record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "model_id": model_id,
        "training_duration": time.time() - training_start_time,
        "validation_accuracy": metrics["overall_accuracy"],
        "mlflow_run_id": mlflow_id,
        "artifact_url": model_artifact,
        "governance_status": "approved" if metrics["overall_accuracy"] > 0.85 else "review_required"
    }
    # 4. Register intent for bot vocabulary expansion
    register_payload = {
        "modelId": model_id,
        "intentName": "custom_expanded_intent",
        "vocabularySource": "nlu_export",
        "active": True
    }
    register_resp = requests.post(INTENT_REGISTER_ENDPOINT, json=register_payload, headers=headers)
    # Surface registration failures instead of silently ignoring them
    register_resp.raise_for_status()
    return audit_record
The export endpoint returns a signed URL or base64-encoded artifact. You log the training duration, validation accuracy, and classification report to MLflow. The audit record captures governance metadata including accuracy thresholds and MLflow run identifiers. The final registration call exposes the trained intent to the Cognigy bot runtime for vocabulary expansion.
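The audit record is returned to the caller but not persisted anywhere. As one possible approach, the sketch below appends each record to a JSON Lines file and isolates the accuracy rule so the threshold can be changed in one place. The function names and the file layout are assumptions for illustration, not a Cognigy or MLflow convention.

```python
import json
from pathlib import Path
from typing import Dict


def governance_status(accuracy: float, threshold: float = 0.85) -> str:
    """Mirror the tutorial's rule: strictly above the threshold is approved."""
    return "approved" if accuracy > threshold else "review_required"


def append_audit_record(record: Dict, path: str) -> None:
    """Append one audit record as a single JSON line (append-only log)."""
    with Path(path).open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(record, sort_keys=True) + "\n")
```

An append-only file keeps earlier records intact, which suits audit use; a database or object store with write-once semantics would serve the same purpose in production.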
Complete Working Example
The following script combines all components into a single executable module. Replace the credential placeholders with your tenant values.
import json
import time
import unicodedata
from datetime import datetime, timezone
from typing import Dict, List, Tuple

import mlflow
import numpy as np
import pandas as pd
import requests
from sklearn.metrics import confusion_matrix, classification_report

BASE_URL = "https://api.cognigy.com"
OAUTH_TOKEN_URL = f"{BASE_URL}/oauth2/token"
TRAIN_ENDPOINT = f"{BASE_URL}/api/v1/nlu/intents/train"
JOB_STATUS_ENDPOINT = f"{BASE_URL}/api/v1/nlu/jobs"
EVALUATE_ENDPOINT = f"{BASE_URL}/api/v1/nlu/models/evaluate"
EXPORT_ENDPOINT = f"{BASE_URL}/api/v1/nlu/models"
INTENT_REGISTER_ENDPOINT = f"{BASE_URL}/api/v1/intents/register"

def get_access_token(client_id: str, client_secret: str, scopes: list) -> str:
    payload = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": " ".join(scopes),
    }
    for attempt in range(3):
        # Form-encoded body, as the OAuth 2.0 client credentials grant specifies
        resp = requests.post(OAUTH_TOKEN_URL, data=payload, timeout=30)
        if resp.status_code == 429:
            time.sleep(int(resp.headers.get("Retry-After", 5)))
            continue
        resp.raise_for_status()
        return resp.json()["access_token"]
    raise RuntimeError("OAuth token acquisition failed")

def validate_and_split_dataset(csv_path: str, min_balance_ratio: float = 0.7) -> Tuple[List[Dict], List[Dict]]:
    # read_csv raises UnicodeDecodeError if the file is not valid UTF-8
    df = pd.read_csv(csv_path, encoding="utf-8")
    for idx, row in df.iterrows():
        text = row["utterance"]
        if not isinstance(text, str):
            raise ValueError(f"Non-string utterance at row {idx}")
        # Normalize to NFC and drop zero-width space and BOM characters
        normalized = unicodedata.normalize("NFC", text).replace("\u200b", "").replace("\ufeff", "")
        # U+FFFD marks bytes that an earlier tool failed to decode
        if "\ufffd" in normalized:
            raise ValueError(f"Invalid encoding at row {idx}")
        df.at[idx, "utterance"] = normalized.strip()
    intent_counts = df["intent"].value_counts()
    ratio = intent_counts.min() / intent_counts.max()
    if ratio < min_balance_ratio:
        raise ValueError(f"Class balance ratio {ratio:.2f} below threshold. Distribution: {intent_counts.to_dict()}")
    # Stratified 80/20 hold-out split per intent
    train_set, val_set = [], []
    for intent, group in df.groupby("intent"):
        sampled = group.sample(frac=1, random_state=42)
        split_idx = int(len(sampled) * 0.8)
        train_set.extend([
            {"utterance": r["utterance"], "intent": intent, "language": "en"}
            for _, r in sampled.iloc[:split_idx].iterrows()
        ])
        val_set.extend([
            {"utterance": r["utterance"], "intent": intent, "language": "en"}
            for _, r in sampled.iloc[split_idx:].iterrows()
        ])
    return train_set, val_set

def submit_and_poll_training(token: str, train_data: List[Dict], max_duration_seconds: int = 1800) -> str:
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    payload = {"trainingData": train_data, "modelVersion": "custom-v1", "overwriteExisting": True}
    resp = requests.post(TRAIN_ENDPOINT, json=payload, headers=headers)
    resp.raise_for_status()
    job_id = resp.json()["jobId"]
    start_time = time.time()
    while True:
        if time.time() - start_time > max_duration_seconds:
            raise TimeoutError(f"Training timeout for job {job_id}")
        status_resp = requests.get(f"{JOB_STATUS_ENDPOINT}/{job_id}", headers=headers)
        status_resp.raise_for_status()
        status_data = status_resp.json()
        status = status_data["status"]
        if status == "completed":
            return status_data["modelId"]
        elif status == "failed":
            raise RuntimeError(f"Job failed: {status_data.get('error')}")
        # Any other status is treated as still in progress
        time.sleep(10)

def evaluate_model(token: str, model_id: str, val_data: List[Dict]) -> Dict:
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    resp = requests.post(EVALUATE_ENDPOINT, json={"modelId": model_id, "testData": val_data}, headers=headers)
    resp.raise_for_status()
    predictions = resp.json()["results"]
    # Labels are paired by position, so the result count must match the input
    if len(predictions) != len(val_data):
        raise ValueError("Prediction count does not match validation set size")
    true_labels = [item["intent"] for item in val_data]
    pred_labels = [p["predictedIntent"] for p in predictions]
    unique = sorted(set(true_labels + pred_labels))
    label_map = {l: i for i, l in enumerate(unique)}
    y_true = [label_map[l] for l in true_labels]
    y_pred = [label_map[l] for l in pred_labels]
    cm = confusion_matrix(y_true, y_pred, labels=list(label_map.values()))
    report = classification_report(y_true, y_pred, target_names=unique, output_dict=True)
    # Cast to a plain float so the value serializes cleanly
    accuracy = float(np.mean(np.array(y_true) == np.array(y_pred)))
    return {"confusion_matrix": cm.tolist(), "classification_report": report, "overall_accuracy": accuracy}

def sync_and_audit(token: str, model_id: str, metrics: Dict, training_start: float, mlflow_uri: str = None) -> Dict:
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    export_resp = requests.get(f"{EXPORT_ENDPOINT}/{model_id}/export", headers=headers)
    export_resp.raise_for_status()
    artifact_url = export_resp.json()["artifactUrl"]
    if mlflow_uri:
        mlflow.set_tracking_uri(mlflow_uri)
        with mlflow.start_run(run_name="cognigy_intent_training") as run:
            mlflow.log_param("model_id", model_id)
            mlflow.log_metric("training_duration_seconds", time.time() - training_start)
            mlflow.log_metric("validation_accuracy", metrics["overall_accuracy"])
            mlflow.log_dict(metrics["classification_report"], "report.json")
            # log_artifact expects a local file path, so record the URL reference instead
            mlflow.log_dict({"artifactUrl": artifact_url}, "export/artifact_reference.json")
            mlflow_id = run.info.run_id
    else:
        mlflow_id = "local"
    audit = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "model_id": model_id,
        "training_duration": time.time() - training_start,
        "validation_accuracy": metrics["overall_accuracy"],
        "mlflow_run_id": mlflow_id,
        "artifact_url": artifact_url,
        "governance_status": "approved" if metrics["overall_accuracy"] > 0.85 else "review_required"
    }
    register_payload = {
        "modelId": model_id,
        "intentName": "custom_expanded_intent",
        "vocabularySource": "nlu_export",
        "active": True,
    }
    register_resp = requests.post(INTENT_REGISTER_ENDPOINT, json=register_payload, headers=headers)
    # Surface registration failures instead of silently ignoring them
    register_resp.raise_for_status()
    return audit

if __name__ == "__main__":
    CLIENT_ID = "your_client_id"
    CLIENT_SECRET = "your_client_secret"
    CSV_PATH = "utterances.csv"
    MLFLOW_URI = "http://localhost:5000"
    token = get_access_token(CLIENT_ID, CLIENT_SECRET, ["nlu:train", "nlu:evaluate", "job:read", "model:export", "intent:write"])
    train_data, val_data = validate_and_split_dataset(CSV_PATH)
    training_start = time.time()
    model_id = submit_and_poll_training(token, train_data)
    metrics = evaluate_model(token, model_id, val_data)
    audit = sync_and_audit(token, model_id, metrics, training_start, MLFLOW_URI)
    print("Training complete.")
    print(f"Model ID: {model_id}")
    print(f"Validation Accuracy: {metrics['overall_accuracy']:.4f}")
    print(f"Audit Record: {json.dumps(audit, indent=2)}")
Common Errors & Debugging
Error: 400 Bad Request (Class Balance or Encoding Violation)
- Cause: The training payload contains imbalanced intent distributions or invalid unicode sequences that fail Cognigy validation rules.
- Fix: Adjust min_balance_ratio in validate_and_split_dataset or resample the CSV. Ensure all utterances are normalized to NFC form before submission.
- Code showing the fix: The validation function raises a descriptive ValueError with the exact ratio and distribution, allowing you to correct the dataset before the API call.
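Resampling can be as simple as downsampling the larger classes until the ratio check passes. The sketch below uses only the standard library and works on a list of row dictionaries with an intent key; downsample_to_ratio is a hypothetical helper, and dropping data is only one option (collecting more examples for small classes is often preferable).

```python
import random
from collections import defaultdict
from typing import Dict, List


def downsample_to_ratio(rows: List[Dict], min_balance_ratio: float = 0.7,
                        seed: int = 42) -> List[Dict]:
    """Randomly drop rows from large classes until min/max count >= min_balance_ratio."""
    by_intent = defaultdict(list)
    for row in rows:
        by_intent[row["intent"]].append(row)
    smallest = min(len(group) for group in by_intent.values())
    cap = int(smallest / min_balance_ratio)  # largest class size that still passes
    rng = random.Random(seed)                # seeded for reproducible sampling
    balanced = []
    for group in by_intent.values():
        balanced.extend(rng.sample(group, min(len(group), cap)))
    return balanced
```

For example, with classes of 10 and 5 rows and a 0.7 threshold, the cap is 7, so the larger class is reduced to 7 rows and the ratio becomes 5/7.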
Error: 401 Unauthorized or 403 Forbidden
- Cause: Expired bearer token or missing OAuth scopes.
- Fix: Regenerate the token using get_access_token. Verify that your client credentials include nlu:train, nlu:evaluate, job:read, model:export, and intent:write.
- Code showing the fix: get_access_token calls raise_for_status, so rejected credentials surface as an HTTP error immediately, and it raises a RuntimeError if the token endpoint is still rate-limited after three attempts.
Error: 429 Too Many Requests
- Cause: Exceeding the Cognigy API rate limits during job polling or bulk submissions.
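- Fix: Respect the Retry-After header. Increase the polling interval to 15 seconds if you encounter cascading rate limits.
- Code showing the fix: get_access_token reads Retry-After (defaulting to 5 seconds) before retrying. The polling loop in submit_and_poll_training uses a fixed 10-second interval and does not handle 429 itself, so a rate-limited status request is raised by raise_for_status.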
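A retry delay that honors Retry-After and otherwise falls back to exponential backoff could look like the sketch below. The function name retry_delay and the base and cap defaults are illustrative choices, and only the integer-seconds form of Retry-After is handled; an HTTP-date value falls through to the backoff branch.

```python
import random
from typing import Mapping, Optional


def retry_delay(attempt: int, headers: Mapping[str, str], base: float = 1.0,
                cap: float = 60.0, rng: Optional[random.Random] = None) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    retry_after = headers.get("Retry-After")
    if retry_after is not None and retry_after.strip().isdigit():
        # The server stated an explicit wait in seconds; use it as-is
        return float(retry_after)
    # Otherwise use exponential backoff with full jitter, bounded by `cap`
    ceiling = min(cap, base * (2 ** attempt))
    return (rng or random).uniform(0, ceiling)
```

In the polling loop, this would replace the fixed sleep when a status request returns 429, for instance time.sleep(retry_delay(attempt, status_resp.headers)). The headers object on a requests response is case-insensitive, whereas a plain dict passed to this helper is not.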
Error: 500 Internal Server Error or Job Failure
- Cause: Backend training infrastructure overload or malformed payload structure.
- Fix: Validate that the JSON structure matches the Cognigy schema exactly. Check the job status endpoint for detailed error messages. Restart the training job after a 30-second cooldown.
- Code showing the fix: The submit_and_poll_training function captures the error field from the job status response and raises a RuntimeError with the backend message for direct debugging.