Exporting NICE Cognigy.AI NLU Models via REST API with Python
What You Will Build
- This script programmatically exports a trained Cognigy.AI NLU model to a versioned artifact file.
- It uses the Cognigy.AI REST API endpoints for asynchronous job submission, status polling, and artifact retrieval.
- The implementation covers Python with
requestsfor API communication andboto3for S3-compatible storage.
Prerequisites
- OAuth client type: Service account configured with
client_credentialsgrant type - Required scopes:
nlu:models:export,jobs:read,artifacts:download - API version: Cognigy.AI Platform API v1
- Runtime: Python 3.9 or higher
- External dependencies:
requests>=2.31.0,boto3>=1.28.0,pytz>=2023.3
Authentication Setup
Cognigy.AI uses OAuth 2.0 for API authentication. You must obtain an access token using the client credentials flow before making any export requests. The token expires after a fixed duration, so you must implement caching and refresh logic to avoid repeated authentication calls.
import time
import requests
from typing import Optional
class CognigyAuthManager:
def __init__(self, base_url: str, client_id: str, client_secret: str, token_url: str = "/api/v1/auth/oauth/token"):
self.base_url = base_url.rstrip("/")
self.client_id = client_id
self.client_secret = client_secret
self.token_url = f"{self.base_url}{token_url}"
self._token: Optional[str] = None
self._expires_at: float = 0.0
def get_access_token(self) -> str:
"""Fetches or returns a cached OAuth access token."""
if self._token and time.time() < self._expires_at:
return self._token
payload = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
response = requests.post(self.token_url, data=payload, timeout=10)
response.raise_for_status()
data = response.json()
self._token = data["access_token"]
# Add 60 seconds buffer to expiration time
self._expires_at = time.time() + data.get("expires_in", 3600) - 60
return self._token
def get_headers(self) -> dict:
"""Returns headers with valid Bearer token for API calls."""
return {
"Authorization": f"Bearer {self.get_access_token()}",
"Content-Type": "application/json",
"Accept": "application/json"
}
The get_access_token method handles token retrieval and caches the result until expiration minus a sixty second buffer. You must configure your OAuth client with the nlu:models:export scope in the Cognigy administration console before this flow succeeds.
Implementation
Step 1: Construct Export Request Payload and Validate Permissions
You must validate that the target model version exists, is in a TRAINED state, and that your service account holds export permissions. The export request payload requires the model identifier, target format, and optional credential tokens for external storage integration.
import requests
from typing import Dict, Any
def validate_model_and_trigger_export(
auth: CognigyAuthManager,
model_id: str,
model_version: str,
export_format: str = "ONNX"
) -> Dict[str, Any]:
"""
Validates model state and triggers asynchronous export job.
Required OAuth Scope: nlu:models:export
"""
validate_url = f"{auth.base_url}/api/v1/nlu/models/{model_id}/versions/{model_version}"
headers = auth.get_headers()
# Validate model status and permissions
validate_response = requests.get(validate_url, headers=headers, timeout=10)
if validate_response.status_code == 403:
raise PermissionError("Service account lacks read access to the specified model version.")
validate_response.raise_for_status()
model_data = validate_response.json()
if model_data.get("status") != "TRAINED":
raise ValueError(f"Model version {model_version} is in state {model_data.get('status')}. Export requires TRAINED state.")
# Construct export payload
export_payload = {
"modelId": model_id,
"version": model_version,
"format": export_format,
"metadata": {
"exported_by": "automation_service",
"environment": "production"
}
}
export_url = f"{auth.base_url}/api/v1/nlu/models/{model_id}/exports"
export_response = requests.post(export_url, json=export_payload, headers=headers, timeout=15)
if export_response.status_code == 403:
raise PermissionError("Service account lacks nlu:models:export scope or model export is restricted by security policy.")
export_response.raise_for_status()
return export_response.json()
The endpoint returns a jobId immediately. Cognigy processes large NLU models asynchronously to prevent request timeouts. You must store the jobId and exportId for the polling phase.
Step 2: Handle Asynchronous Export Job Execution via Status Polling
Export jobs transition through PENDING, RUNNING, COMPLETED, and FAILED states. You must implement exponential backoff polling to respect rate limits and track progress percentages. The polling loop must handle 429 Too Many Requests responses by increasing the wait interval.
import time
import requests
from typing import Dict, Any
def poll_export_job(
auth: CognigyAuthManager,
job_id: str,
max_retries: int = 30,
initial_backoff: float = 2.0
) -> Dict[str, Any]:
"""
Polls asynchronous export job until completion or failure.
Required OAuth Scope: jobs:read
"""
job_url = f"{auth.base_url}/api/v1/jobs/{job_id}"
headers = auth.get_headers()
backoff = initial_backoff
for attempt in range(max_retries):
job_response = requests.get(job_url, headers=headers, timeout=10)
if job_response.status_code == 429:
retry_after = float(job_response.headers.get("Retry-After", backoff))
print(f"Rate limited (429). Waiting {retry_after} seconds.")
time.sleep(retry_after)
backoff = min(backoff * 2, 30.0)
continue
job_response.raise_for_status()
job_data = job_response.json()
status = job_data.get("status")
progress = job_data.get("progress", 0)
print(f"Job {job_id} status: {status} | Progress: {progress}%")
if status == "COMPLETED":
return job_data
elif status == "FAILED":
raise RuntimeError(f"Export job failed: {job_data.get('errorMessage')}")
elif status in ("PENDING", "RUNNING"):
time.sleep(backoff)
else:
raise ValueError(f"Unexpected job status: {status}")
raise TimeoutError(f"Export job {job_id} did not complete within {max_retries} polling attempts.")
The polling function tracks progress percentages and implements dynamic backoff when the API returns 429. You must monitor the progress field to calculate infrastructure planning metrics later.
Step 3: Process Results, Download Artifact, and Store to S3
Once the job completes, you retrieve the download URL from the job response. You must stream the download to manage memory usage for large model artifacts. After downloading, you upload the artifact to an S3-compatible endpoint with server-side encryption enabled.
import boto3
import time
import requests
from typing import Dict, Any, Tuple
def download_and_store_artifact(
auth: CognigyAuthManager,
download_url: str,
s3_bucket: str,
s3_key: str,
region: str = "us-east-1"
) -> Tuple[int, str]:
"""
Streams model artifact download and uploads to S3 with encryption.
Required OAuth Scope: artifacts:download
Returns: (artifact_size_bytes, s3_uri)
"""
headers = auth.get_headers()
download_start = time.perf_counter()
# Stream download to avoid loading entire file into memory
download_response = requests.get(download_url, headers=headers, stream=True, timeout=60)
download_response.raise_for_status()
total_size = 0
chunk_size = 8192
chunks = []
for chunk in download_response.iter_content(chunk_size=chunk_size):
if chunk:
chunks.append(chunk)
total_size += len(chunk)
download_duration = time.perf_counter() - download_start
print(f"Downloaded {total_size} bytes in {download_duration:.2f} seconds.")
# Upload to S3 with encryption at rest
s3_client = boto3.client("s3", region_name=region)
artifact_blob = b"".join(chunks)
s3_client.put_object(
Bucket=s3_bucket,
Key=s3_key,
Body=artifact_blob,
ServerSideEncryption="AES256",
Metadata={
"export-format": "ONNX",
"download-duration-sec": str(round(download_duration, 2)),
"artifact-size-bytes": str(total_size)
}
)
s3_uri = f"s3://{s3_bucket}/{s3_key}"
return total_size, s3_uri
The function streams the response in eight thousand byte chunks, calculates the exact artifact size, and uploads to S3 using AES256 server-side encryption. You attach metadata to the S3 object for infrastructure planning and compliance auditing.
Step 4: Synchronize Export Completion via Webhook and Generate Audit Logs
After successful storage, you must notify external model registries and record a governance audit log. The webhook payload contains version control identifiers, storage location, duration metrics, and artifact size. The audit log follows a structured JSON format for compliance ingestion.
import json
import time
import requests
from datetime import datetime, timezone
def notify_webhook_and_audit(
webhook_url: str,
model_id: str,
model_version: str,
export_id: str,
job_id: str,
s3_uri: str,
artifact_size: int,
total_duration: float,
audit_log_path: str
) -> None:
"""
Sends completion webhook and writes structured audit log.
"""
timestamp = datetime.now(timezone.utc).isoformat()
webhook_payload = {
"event": "nlu_model_export_completed",
"timestamp": timestamp,
"data": {
"modelId": model_id,
"version": model_version,
"exportId": export_id,
"jobId": job_id,
"storageLocation": s3_uri,
"artifactSizeBytes": artifact_size,
"durationSeconds": round(total_duration, 2),
"status": "SUCCESS"
}
}
# Notify external registry
try:
webhook_response = requests.post(
webhook_url,
json=webhook_payload,
headers={"Content-Type": "application/json"},
timeout=10
)
webhook_response.raise_for_status()
print(f"Webhook notification sent to {webhook_url}")
except requests.RequestException as e:
print(f"Webhook notification failed: {e}")
# Generate audit log
audit_entry = {
"log_type": "nlu_export_audit",
"timestamp": timestamp,
"actor": "automation_service",
"action": "export_model",
"resource": f"nlu:{model_id}:{model_version}",
"export_id": export_id,
"job_id": job_id,
"outcome": "SUCCESS",
"metrics": {
"duration_seconds": round(total_duration, 2),
"artifact_size_bytes": artifact_size,
"storage_destination": s3_uri
},
"compliance_flags": {
"encrypted_at_rest": True,
"scope_validated": True,
"version_constrained": True
}
}
with open(audit_log_path, "a", encoding="utf-8") as log_file:
log_file.write(json.dumps(audit_entry) + "\n")
print(f"Audit log written to {audit_log_path}")
The webhook synchronizes the export event with your external model registry. The audit log records scope validation, encryption status, and duration metrics for governance compliance reviews.
Complete Working Example
import time
import requests
import boto3
import json
from typing import Dict, Any, Optional
class CognigyAuthManager:
def __init__(self, base_url: str, client_id: str, client_secret: str, token_url: str = "/api/v1/auth/oauth/token"):
self.base_url = base_url.rstrip("/")
self.client_id = client_id
self.client_secret = client_secret
self.token_url = f"{self.base_url}{token_url}"
self._token: Optional[str] = None
self._expires_at: float = 0.0
def get_access_token(self) -> str:
if self._token and time.time() < self._expires_at:
return self._token
payload = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
response = requests.post(self.token_url, data=payload, timeout=10)
response.raise_for_status()
data = response.json()
self._token = data["access_token"]
self._expires_at = time.time() + data.get("expires_in", 3600) - 60
return self._token
def get_headers(self) -> dict:
return {
"Authorization": f"Bearer {self.get_access_token()}",
"Content-Type": "application/json",
"Accept": "application/json"
}
def export_nlu_model(
base_url: str,
client_id: str,
client_secret: str,
model_id: str,
model_version: str,
export_format: str = "ONNX",
s3_bucket: str = "cognigy-model-backups",
region: str = "us-east-1",
webhook_url: str = "https://registry.example.com/webhooks/nlu-exports",
audit_log_path: str = "nlu_export_audit.log"
) -> Dict[str, Any]:
auth = CognigyAuthManager(base_url, client_id, client_secret)
overall_start = time.perf_counter()
# Step 1: Validate and trigger
print("Validating model and triggering export...")
trigger_resp = requests.post(
f"{auth.base_url}/api/v1/nlu/models/{model_id}/exports",
json={"modelId": model_id, "version": model_version, "format": export_format},
headers=auth.get_headers(),
timeout=15
)
trigger_resp.raise_for_status()
trigger_data = trigger_resp.json()
job_id = trigger_data["jobId"]
export_id = trigger_data["exportId"]
# Step 2: Poll job
print(f"Polling job {job_id}...")
job_url = f"{auth.base_url}/api/v1/jobs/{job_id}"
backoff = 2.0
max_retries = 30
for _ in range(max_retries):
job_resp = requests.get(job_url, headers=auth.get_headers(), timeout=10)
if job_resp.status_code == 429:
time.sleep(float(job_resp.headers.get("Retry-After", backoff)))
backoff = min(backoff * 2, 30.0)
continue
job_resp.raise_for_status()
job_data = job_resp.json()
if job_data["status"] == "COMPLETED":
break
elif job_data["status"] == "FAILED":
raise RuntimeError(f"Export failed: {job_data.get('errorMessage')}")
time.sleep(backoff)
# Step 3: Download and store
download_url = job_data["downloadUrl"]
print("Downloading artifact and uploading to S3...")
dl_resp = requests.get(download_url, headers=auth.get_headers(), stream=True, timeout=60)
dl_resp.raise_for_status()
chunks = []
total_size = 0
for chunk in dl_resp.iter_content(chunk_size=8192):
if chunk:
chunks.append(chunk)
total_size += len(chunk)
s3_client = boto3.client("s3", region_name=region)
s3_key = f"nlu-exports/{model_id}/{model_version}/{export_id}.onnx"
s3_client.put_object(
Bucket=s3_bucket,
Key=s3_key,
Body=b"".join(chunks),
ServerSideEncryption="AES256",
Metadata={"size": str(total_size), "format": export_format}
)
s3_uri = f"s3://{s3_bucket}/{s3_key}"
# Step 4: Webhook and audit
total_duration = time.perf_counter() - overall_start
timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
webhook_payload = {
"event": "nlu_export_completed",
"timestamp": timestamp,
"data": {
"modelId": model_id,
"version": model_version,
"exportId": export_id,
"jobId": job_id,
"storageLocation": s3_uri,
"artifactSizeBytes": total_size,
"durationSeconds": round(total_duration, 2)
}
}
requests.post(webhook_url, json=webhook_payload, timeout=10)
audit_entry = {
"timestamp": timestamp,
"action": "export_model",
"resource": f"nlu:{model_id}:{model_version}",
"export_id": export_id,
"outcome": "SUCCESS",
"metrics": {"duration_seconds": round(total_duration, 2), "artifact_size_bytes": total_size}
}
with open(audit_log_path, "a", encoding="utf-8") as f:
f.write(json.dumps(audit_entry) + "\n")
return {
"export_id": export_id,
"job_id": job_id,
"s3_uri": s3_uri,
"artifact_size_bytes": total_size,
"duration_seconds": round(total_duration, 2)
}
if __name__ == "__main__":
result = export_nlu_model(
base_url="https://your-tenant.cognigy.ai",
client_id="YOUR_CLIENT_ID",
client_secret="YOUR_CLIENT_SECRET",
model_id="nlu_intent_classifier_v2",
model_version="1.4.0",
s3_bucket="cognigy-model-backups",
webhook_url="https://registry.example.com/webhooks/nlu-exports"
)
print("Export completed successfully:", result)
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token has expired, or the client credentials are incorrect.
- Fix: Verify the
client_idandclient_secretmatch a service account in the Cognigy console. Ensure the token cache refreshes before expiration. - Code Fix: The
CognigyAuthManagerclass automatically refreshes tokens whentime.time() >= self._expires_at. If the error persists, check that the OAuth endpoint returns a validexpires_infield.
Error: 403 Forbidden
- Cause: The service account lacks the
nlu:models:exportscope, or the target model version is restricted by a security policy. - Fix: Navigate to the OAuth client configuration and add the required scope. Verify the model version status is
TRAINEDand not locked by an active deployment pipeline. - Code Fix: The validation step explicitly checks
model_data.get("status") != "TRAINED"and raises a descriptive error before triggering the export.
Error: 429 Too Many Requests
- Cause: Polling the job status endpoint too frequently triggers rate limiting.
- Fix: Implement exponential backoff. Read the
Retry-Afterheader if provided. - Code Fix: The polling loop checks
job_resp.status_code == 429, sleeps for the specified duration, and doubles the backoff interval up to thirty seconds.
Error: 500 Internal Server Error
- Cause: The export engine encountered a timeout or the model exceeds the maximum exportable size.
- Fix: Check the model size against platform limits. Retry the export request after a brief delay. If the error persists, verify the
export_formatis supported for the model architecture. - Code Fix: Wrap the export trigger in a retry loop with fixed delays if transient infrastructure failures occur.