Training Cognigy.AI Intent Models via REST API with Python

Training Cognigy.AI Intent Models via REST API with Python

What You Will Build

A Python module that constructs annotated training payloads, validates intent schema constraints, uploads versioned datasets with diff comparison, triggers asynchronous model training, polls for job completion, retrieves confusion matrices, synchronizes data via webhooks, logs audit trails, and exposes a QA evaluation endpoint. This tutorial uses the Cognigy.AI v2 REST API and Python 3.9+ with the requests library.

Prerequisites

  • Cognigy.AI API key or OAuth2 client credentials with scopes cognigy:projects:read, cognigy:projects:write, cognigy:datasets:upload, cognigy:models:train
  • Cognigy.AI API version v2
  • Python 3.9 or higher
  • Dependencies: requests>=2.31.0, pydantic>=2.0.0, httpx>=0.24.0, hashlib (standard library), logging (standard library)

Authentication Setup

Cognigy.AI v2 endpoints require a Bearer token in the Authorization header. The following setup handles token acquisition and caching with automatic refresh logic.

import os
import time
import requests
from typing import Optional

class CognigyAuth:
    def __init__(self, client_id: str, client_secret: str, token_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self._token: Optional[str] = None
        self._expires_at: float = 0.0

    def get_token(self) -> str:
        if self._token and time.time() < self._expires_at:
            return self._token

        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "cognigy:projects:read cognigy:projects:write cognigy:datasets:upload cognigy:models:train"
        }
        response = requests.post(self.token_url, data=payload, timeout=10)
        response.raise_for_status()
        data = response.json()
        self._token = data["access_token"]
        self._expires_at = time.time() + data.get("expires_in", 3600) - 60
        return self._token

Implementation

Step 1: Construct Training Data Payloads and Validate Schema Constraints

Training data requires strict schema validation to prevent model corruption. Each utterance must contain non-overlapping entity spans, and intent labels must be unique within the dataset. Disambiguation rules require explicit confidence thresholds.

import hashlib
import logging
from typing import List, Dict, Any
from pydantic import BaseModel, field_validator, ValidationError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class EntityAnnotation(BaseModel):
    start: int
    end: int
    label: str
    value: str

class Utterance(BaseModel):
    text: str
    intent: str
    entities: List[EntityAnnotation] = []

    @field_validator("entities")
    @classmethod
    def validate_entity_spans(cls, v: List[EntityAnnotation], info: Any) -> List[EntityAnnotation]:
        for entity in v:
            if entity.start >= entity.end:
                raise ValueError("Entity start must be less than end")
        sorted_entities = sorted(v, key=lambda e: e.start)
        for i in range(len(sorted_entities) - 1):
            if sorted_entities[i].end > sorted_entities[i + 1].start:
                raise ValueError("Overlapping entity spans detected")
        return v

class TrainingDataset(BaseModel):
    version: str
    intents: Dict[str, List[Utterance]]

    @field_validator("intents")
    @classmethod
    def validate_unique_labels(cls, v: Dict[str, List[Utterance]]) -> Dict[str, List[Utterance]]:
        label_set = set(v.keys())
        if len(label_set) != len(v):
            raise ValueError("Duplicate intent labels found in dataset")
        return v

def build_training_payload(utterances: List[Dict[str, Any]]) -> Dict[str, Any]:
    grouped: Dict[str, List[Dict[str, Any]]] = {}
    for item in utterances:
        intent = item["intent"]
        if intent not in grouped:
            grouped[intent] = []
        grouped[intent].append(item)
    
    try:
        dataset = TrainingDataset(
            version="1.0.0",
            intents={k: [Utterance(**u) for u in v] for k, v in grouped.items()}
        )
        logger.info("Schema validation passed. Unique intents: %d", len(dataset.intents))
        return dataset.model_dump()
    except ValidationError as e:
        logger.error("Schema validation failed: %s", str(e))
        raise

Step 2: Implement Incremental Model Updates with Versioned Dataset Uploads

Incremental updates prevent redundant training cycles. The system computes a SHA-256 hash of the payload, compares it against the last uploaded version, and only uploads when differences exist. The API expects a JSON payload with explicit version tracking.

import json
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class CognigyDatasetManager:
    def __init__(self, auth: CognigyAuth, project_id: str, base_url: str):
        self.auth = auth
        self.project_id = project_id
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        retry_strategy = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
        self.session.mount("https://", HTTPAdapter(max_retries=retry_strategy))

    def _get_headers(self) -> Dict[str, str]:
        return {
            "Authorization": f"Bearer {self.auth.get_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def compute_payload_hash(self, payload: Dict[str, Any]) -> str:
        normalized = json.dumps(payload, sort_keys=True, separators=(",", ":"))
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def upload_dataset(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        current_hash = self.compute_payload_hash(payload)
        logger.info("Dataset hash: %s", current_hash)

        # Simulate diff comparison against last uploaded version
        last_hash_url = f"{self.base_url}/api/v2/projects/{self.project_id}/datasets/latest/hash"
        try:
            resp = self.session.get(last_hash_url, headers=self._get_headers(), timeout=10)
            resp.raise_for_status()
            last_hash = resp.json().get("hash", "")
            if current_hash == last_hash:
                logger.info("No changes detected. Skipping upload.")
                return {"status": "skipped", "reason": "identical_hash"}
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                logger.info("No previous dataset found. Proceeding with full upload.")
            else:
                raise

        upload_url = f"{self.base_url}/api/v2/projects/{self.project_id}/datasets/upload"
        response = self.session.post(
            upload_url,
            headers=self._get_headers(),
            json=payload,
            timeout=30
        )
        response.raise_for_status()
        logger.info("Dataset uploaded successfully. Version: %s", payload.get("version"))
        return response.json()

Step 3: Handle Asynchronous Model Training Jobs with Progress Tracking

Model training runs asynchronously. The client must poll the status endpoint with exponential backoff and handle 429 rate limits gracefully. Progress tracking extracts percentage completion and job state.

import time

class CognigyModelTrainer:
    def __init__(self, dataset_manager: CognigyDatasetManager):
        self.manager = dataset_manager
        self.session = dataset_manager.session
        self.base_url = dataset_manager.base_url
        self.project_id = dataset_manager.project_id

    def trigger_training(self) -> str:
        train_url = f"{self.base_url}/api/v2/projects/{self.project_id}/models/train"
        response = self.session.post(
            train_url,
            headers=self.manager._get_headers(),
            json={"mode": "incremental", "optimize_for": "accuracy"},
            timeout=15
        )
        response.raise_for_status()
        job_id = response.json()["jobId"]
        logger.info("Training job initiated. Job ID: %s", job_id)
        return job_id

    def poll_training_status(self, job_id: str, max_polls: int = 60) -> Dict[str, Any]:
        status_url = f"{self.base_url}/api/v2/projects/{self.project_id}/models/train/status/{job_id}"
        backoff = 2.0

        for attempt in range(max_polls):
            response = self.session.get(status_url, headers=self.manager._get_headers(), timeout=10)
            
            if response.status_code == 429:
                retry_after = float(response.headers.get("Retry-After", backoff))
                logger.warning("Rate limited (429). Waiting %.2f seconds.", retry_after)
                time.sleep(retry_after)
                continue
            
            response.raise_for_status()
            data = response.json()
            state = data.get("status", "UNKNOWN")
            progress = data.get("progress", 0)
            
            logger.info("Training progress: %d%% | State: %s", progress, state)
            
            if state in ["COMPLETED", "SUCCESS"]:
                logger.info("Training completed successfully.")
                return data
            elif state in ["FAILED", "ERROR"]:
                raise RuntimeError(f"Training failed: {data.get('error', 'Unknown error')}")
            
            time.sleep(backoff)
            backoff = min(backoff * 1.5, 30.0)
        
        raise TimeoutError("Training job did not complete within maximum polling attempts.")

Step 4: Synchronize Training Data and Monitor Accuracy Metrics

External NLU platforms sync via webhook ingestion. The system structures the payload for Cognigy.AI webhook endpoints and retrieves confusion matrices for performance tuning. Audit logging captures every action for governance.

import logging
import json
from datetime import datetime

class CognigyMetricsAndSync:
    def __init__(self, dataset_manager: CognigyDatasetManager):
        self.manager = dataset_manager
        self.session = dataset_manager.session
        self.base_url = dataset_manager.base_url
        self.project_id = dataset_manager.project_id
        self.audit_log = logging.getLogger("audit")
        self.audit_log.handlers = self.manager.session.adapters["https://"].max_retries  # Placeholder for stream handler

    def _log_audit(self, action: str, details: Dict[str, Any]) -> None:
        record = {
            "timestamp": datetime.utcnow().isoformat(),
            "project_id": self.project_id,
            "action": action,
            "details": details
        }
        logger.info("AUDIT: %s", json.dumps(record))

    def sync_via_webhook(self, external_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        webhook_url = f"{self.base_url}/api/v2/projects/{self.project_id}/webhooks/ingest"
        payload = {
            "source": "external_nlu",
            "format": "cognigy_v2",
            "records": external_data
        }
        self._log_audit("WEBHOOK_INGEST", {"record_count": len(external_data)})
        
        response = self.session.post(
            webhook_url,
            headers=self.manager._get_headers(),
            json=payload,
            timeout=20
        )
        response.raise_for_status()
        return response.json()

    def get_confusion_matrix(self) -> Dict[str, Any]:
        metrics_url = f"{self.base_url}/api/v2/projects/{self.project_id}/models/metrics"
        params = {"type": "confusion_matrix", "granularity": "intent"}
        response = self.session.get(
            metrics_url,
            headers=self.manager._get_headers(),
            params=params,
            timeout=15
        )
        response.raise_for_status()
        data = response.json()
        self._log_audit("METRICS_FETCH", {"metrics_type": "confusion_matrix"})
        return data

    def get_accuracy_metrics(self) -> Dict[str, Any]:
        metrics_url = f"{self.base_url}/api/v2/projects/{self.project_id}/models/metrics"
        params = {"type": "accuracy", "split": "validation"}
        response = self.session.get(
            metrics_url,
            headers=self.manager._get_headers(),
            params=params,
            timeout=15
        )
        response.raise_for_status()
        return response.json()

Step 5: Expose an Intent Evaluation Tool for QA Validation

QA validation requires a synchronous endpoint that returns predicted intent, confidence score, and extracted entities. The tool accepts raw utterances and returns structured predictions for review.

class CognigyQAEvaluator:
    def __init__(self, dataset_manager: CognigyDatasetManager):
        self.manager = dataset_manager
        self.session = dataset_manager.session
        self.base_url = dataset_manager.base_url
        self.project_id = dataset_manager.project_id

    def evaluate_utterance(self, text: str) -> Dict[str, Any]:
        eval_url = f"{self.base_url}/api/v2/projects/{self.project_id}/models/evaluate"
        payload = {
            "input": text,
            "return_entities": True,
            "return_confidence": True,
            "return_alternatives": 3
        }
        response = self.session.post(
            eval_url,
            headers=self.manager._get_headers(),
            json=payload,
            timeout=10
        )
        response.raise_for_status()
        return response.json()

Complete Working Example

The following script combines all components into a runnable module. Replace environment variables with valid credentials before execution.

import os
import sys
import json
from typing import List, Dict, Any

def main() -> None:
    # Configuration
    CLIENT_ID = os.getenv("COGNIGY_CLIENT_ID")
    CLIENT_SECRET = os.getenv("COGNIGY_CLIENT_SECRET")
    TOKEN_URL = os.getenv("COGNIGY_TOKEN_URL", "https://auth.cognigy.ai/oauth/token")
    BASE_URL = os.getenv("COGNIGY_BASE_URL", "https://api.cognigy.ai")
    PROJECT_ID = os.getenv("COGNIGY_PROJECT_ID")

    if not all([CLIENT_ID, CLIENT_SECRET, PROJECT_ID]):
        raise ValueError("Missing required environment variables.")

    # Initialize components
    auth = CognigyAuth(CLIENT_ID, CLIENT_SECRET, TOKEN_URL)
    dataset_mgr = CognigyDatasetManager(auth, PROJECT_ID, BASE_URL)
    trainer = CognigyModelTrainer(dataset_mgr)
    metrics_sync = CognigyMetricsAndSync(dataset_mgr)
    evaluator = CognigyQAEvaluator(dataset_mgr)

    # Sample training data
    sample_utterances: List[Dict[str, Any]] = [
        {
            "text": "I want to book a flight to London",
            "intent": "book_flight",
            "entities": [
                {"start": 29, "end": 35, "label": "destination", "value": "London"}
            ]
        },
        {
            "text": "Cancel my reservation for tomorrow",
            "intent": "cancel_reservation",
            "entities": [
                {"start": 28, "end": 36, "label": "date", "value": "tomorrow"}
            ]
        }
    ]

    try:
        # Step 1: Validate and build payload
        payload = build_training_payload(sample_utterances)
        
        # Step 2: Upload dataset
        upload_result = dataset_mgr.upload_dataset(payload)
        if upload_result.get("status") == "skipped":
            print("Dataset unchanged. Proceeding to training.")
        
        # Step 3: Trigger and poll training
        job_id = trainer.trigger_training()
        training_result = trainer.poll_training_status(job_id)
        
        # Step 4: Retrieve metrics
        confusion = metrics_sync.get_confusion_matrix()
        accuracy = metrics_sync.get_accuracy_metrics()
        print("Confusion Matrix:", json.dumps(confusion, indent=2))
        print("Accuracy Metrics:", json.dumps(accuracy, indent=2))
        
        # Step 5: QA Evaluation
        test_text = "Book me a ticket to Paris next week"
        eval_result = evaluator.evaluate_utterance(test_text)
        print("QA Evaluation:", json.dumps(eval_result, indent=2))

    except requests.exceptions.HTTPError as e:
        logger.error("HTTP Error: %s | Response: %s", e.response.status_code, e.response.text)
        sys.exit(1)
    except Exception as e:
        logger.error("Execution failed: %s", str(e))
        sys.exit(1)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: Expired token, invalid client credentials, or missing Authorization header.
  • Fix: Verify COGNIGY_CLIENT_ID and COGNIGY_CLIENT_SECRET match the registered application. Ensure the token endpoint returns a valid access_token. The CognigyAuth class handles refresh, but network timeouts during token acquisition will propagate as 401. Add explicit timeout handling in the token request.
  • Code Fix: Wrap auth.get_token() in a retry block with explicit 401 detection.

Error: 400 Bad Request (Schema Violation)

  • Cause: Overlapping entity spans, duplicate intent labels, or malformed JSON structure.
  • Fix: The TrainingDataset Pydantic model catches these before transmission. Review the validation error messages. Ensure entity start and end indices match the exact character positions in text. Cognigy.AI uses zero-based indexing.
  • Code Fix: Log ValidationError details and map them to original utterance indices for precise correction.

Error: 429 Too Many Requests

  • Cause: Rate limit exceeded on dataset upload or training status polling.
  • Fix: The CognigyDatasetManager and CognigyModelTrainer include exponential backoff and Retry-After header parsing. Ensure your polling interval respects the platform limit. Do not parallelize training triggers.
  • Code Fix: Increase max_retries in HTTPAdapter and adjust backoff multiplier in the polling loop.

Error: 500 Internal Server Error (Training Failure)

  • Cause: Incompatible dataset format, corrupted version history, or backend resource exhaustion.
  • Fix: Check the poll_training_status response for error details. Revert to a known good dataset version. Verify that entity labels match the registered taxonomy in the Cognigy.AI console.
  • Code Fix: Capture the full response body on 5xx errors and trigger an alert. Use the version field in payloads to rollback if necessary.

Official References