Managing NICE Cognigy External Service Timeouts via REST API with Python
What You Will Build
A Python timeout manager that configures external service timeouts in Cognigy, validates payloads against platform execution limits, implements a circuit breaker with state tracking and automatic recovery, handles graceful degradation using cached responses, synchronizes timeout metrics to external dashboards via webhooks, tracks frequency and recovery latency, generates audit logs, and exposes a unified interface for automated bot reliability protection. This tutorial uses the Cognigy Cloud REST API and Python with httpx, pydantic, and standard library modules.
Prerequisites
- OAuth2 client credentials grant type configured in Cognigy Cloud
- Required scopes: externalService:write, bot:read, metrics:write, audit:write
- Cognigy API version: v1 REST endpoints
- Python 3.10 or higher
- External dependencies: httpx>=0.27.0, pydantic>=2.5.0, pybreaker>=1.0.2
- Install dependencies: pip install httpx pydantic pybreaker
Authentication Setup
Cognigy uses a standard OAuth2 client credentials flow. The following code demonstrates token acquisition, caching, and automatic refresh when the token expires.
import httpx
import time
import json
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class CognigyAuth:
    def __init__(self, tenant: str, client_id: str, client_secret: str, scopes: list[str]):
        self.base_url = f"https://api.{tenant}.cognigy.ai"
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self._token: Optional[str] = None
        self._expires_at: float = 0.0
        self.client = httpx.Client(timeout=httpx.Timeout(30.0))

    def _request_token(self) -> dict:
        auth_url = f"{self.base_url}/auth/oauth/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": " ".join(self.scopes)
        }
        response = self.client.post(auth_url, data=payload)
        response.raise_for_status()
        return response.json()

    def get_token(self) -> str:
        # Reuse the cached token until 60 seconds before it expires.
        if self._token and time.time() < self._expires_at - 60:
            return self._token
        logger.info("Requesting new OAuth2 token")
        token_data = self._request_token()
        self._token = token_data["access_token"]
        self._expires_at = time.time() + token_data["expires_in"]
        return self._token

    def close(self):
        self.client.close()
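The caching rule in get_token() can be checked without a network call. The sketch below is a minimal stand-in, not part of the Cognigy API: CachedToken, fake_fetch, and the injectable clock are hypothetical names introduced only to show how the 60-second buffer decides between reuse and refresh.

```python
import time
from typing import Callable, Optional


class CachedToken:
    """Caches a token and refreshes it shortly before it expires (illustrative helper)."""

    def __init__(self, fetch: Callable[[], dict], buffer_s: float = 60.0,
                 clock: Callable[[], float] = time.time):
        self._fetch = fetch        # must return {"access_token": ..., "expires_in": ...}
        self._buffer_s = buffer_s
        self._clock = clock        # injectable so the demo can control time
        self._token: Optional[str] = None
        self._expires_at = 0.0
        self.fetch_count = 0

    def get(self) -> str:
        now = self._clock()
        if self._token is not None and now < self._expires_at - self._buffer_s:
            return self._token
        data = self._fetch()
        self.fetch_count += 1
        self._token = data["access_token"]
        self._expires_at = now + float(data["expires_in"])
        return self._token


# Demo with a fake clock and a fake token endpoint (no network involved).
fake_now = [1000.0]
issued: list[str] = []


def fake_fetch() -> dict:
    issued.append(f"token-{len(issued) + 1}")
    return {"access_token": issued[-1], "expires_in": 3600}


cache = CachedToken(fake_fetch, clock=lambda: fake_now[0])
first = cache.get()      # nothing cached yet, so this fetches
fake_now[0] += 3000      # 3000 s elapsed, well outside the 60 s buffer
second = cache.get()     # served from the cache
fake_now[0] += 550       # 3550 s elapsed, inside the 60 s buffer
third = cache.get()      # refreshed
```

Injecting the clock keeps the example deterministic; the tutorial's CognigyAuth reads time.time() directly, which behaves the same way in production.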
Implementation
Step 1: Construct and Validate Timeout Configuration Payloads
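Cognigy enforces strict execution limits for external service calls. The maximum allowed timeout is 30 seconds, and downstream SLAs typically require a minimum of 1 second. The following Pydantic model enforces these bounds before submission: max_wait_time_ms must fall between 1,000 and 30,000 ms, sla_threshold_ms between 500 and 25,000 ms, and fallback_template must be a JSON object encoded as a string.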
import json
from pydantic import BaseModel, Field, field_validator


class TimeoutConfig(BaseModel):
    endpoint_id: str
    max_wait_time_ms: int = Field(ge=1000, le=30000)
    fallback_template: str
    sla_threshold_ms: int = Field(ge=500, le=25000)

    @field_validator("fallback_template")
    @classmethod
    def validate_fallback_structure(cls, v: str) -> str:
        # The template must be a JSON object, not an array or a bare scalar.
        if not v.startswith("{") or not v.endswith("}"):
            raise ValueError("Fallback template must be valid JSON")
        try:
            json.loads(v)
        except json.JSONDecodeError:
            raise ValueError("Fallback template contains invalid JSON")
        return v


def build_timeout_payload(config: TimeoutConfig) -> dict:
    return {
        "endpointId": config.endpoint_id,
        "timeoutConfiguration": {
            "maxWaitTimeMs": config.max_wait_time_ms,
            "retryAttempts": 2,
            "backoffMs": 500
        },
        "fallbackConfiguration": {
            "template": config.fallback_template,
            "routeTo": "notification_queue"
        },
        "slaConfiguration": {
            "thresholdMs": config.sla_threshold_ms,
            "alertEnabled": True
        }
    }
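The numeric bounds can also be expressed in plain Python, which is handy for a quick pre-check or a unit test. This is a sketch that mirrors the Field limits above; check_timeout_bounds is a hypothetical helper, and the final cross-field rule (SLA threshold below the hard timeout) is an assumption that the Pydantic model does not enforce.

```python
def check_timeout_bounds(max_wait_time_ms: int, sla_threshold_ms: int) -> list[str]:
    """Return a list of human-readable problems; an empty list means the values pass."""
    errors: list[str] = []
    if not 1000 <= max_wait_time_ms <= 30000:
        errors.append("max_wait_time_ms must be between 1000 and 30000")
    if not 500 <= sla_threshold_ms <= 25000:
        errors.append("sla_threshold_ms must be between 500 and 25000")
    # Assumption, not part of TimeoutConfig: the SLA alert should fire
    # before the hard timeout is reached.
    if sla_threshold_ms >= max_wait_time_ms:
        errors.append("sla_threshold_ms should be lower than max_wait_time_ms")
    return errors


ok = check_timeout_bounds(15000, 10000)        # the values used later in this tutorial
too_long = check_timeout_bounds(45000, 10000)  # exceeds the 30-second limit
inverted = check_timeout_bounds(5000, 8000)    # SLA threshold above the timeout
```

Returning a list instead of raising on the first problem lets a caller report every violation at once.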
Step 2: Implement Circuit Breaker with State Tracking and Automatic Recovery
The circuit breaker prevents cascading failures when an external dependency degrades. This implementation tracks state transitions, measures recovery latency, and automatically attempts recovery after a configurable window.
import threading
import time
from datetime import datetime, timezone
from typing import Optional
from pybreaker import CircuitBreaker  # used when the manager is assembled below


class RecoveryTracker:
    def __init__(self):
        self._lock = threading.Lock()
        self.failure_count: int = 0
        self.recovery_attempts: int = 0
        self.last_failure_time: Optional[float] = None
        self.recovery_latency_ms: Optional[float] = None
        self.state_history: list[dict] = []

    def record_failure(self):
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            self._log_state("OPEN", self.failure_count)

    def record_recovery_attempt(self) -> float:
        # Returns the start time so the caller can pass it to record_recovery_success().
        with self._lock:
            self.recovery_attempts += 1
            return time.time()

    def record_recovery_success(self, start_time: float):
        with self._lock:
            latency = (time.time() - start_time) * 1000
            self.recovery_latency_ms = latency
            self._log_state("CLOSED", 0, latency)

    def _log_state(self, state: str, failures: int, latency: Optional[float] = None):
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "state": state,
            "failure_count": failures,
            "recovery_latency_ms": latency
        }
        self.state_history.append(entry)
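To make the state transitions concrete, here is a deliberately small, standard-library sketch of the CLOSED, OPEN, and HALF_OPEN cycle. TinyBreaker is a hypothetical teaching aid and not how pybreaker is implemented; the tutorial itself relies on pybreaker's CircuitBreaker.

```python
import time
from typing import Callable


class TinyBreaker:
    """Minimal circuit breaker: trips after fail_max consecutive failures."""

    def __init__(self, fail_max: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self._clock = clock
        self.state = "CLOSED"
        self._failures = 0
        self._opened_at = 0.0

    def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if self._clock() - self._opened_at < self.reset_timeout:
                raise RuntimeError("circuit open")
            self.state = "HALF_OPEN"  # reset window elapsed: allow one trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit.
            if self.state == "HALF_OPEN" or self._failures >= self.fail_max:
                self.state = "OPEN"
                self._opened_at = self._clock()
            raise
        self._failures = 0
        self.state = "CLOSED"
        return result


# Demo with a fake clock so the 30-second window can be skipped instantly.
now = [0.0]
breaker = TinyBreaker(fail_max=3, reset_timeout=30.0, clock=lambda: now[0])


def failing():
    raise TimeoutError("upstream timed out")


states = []
for _ in range(3):
    try:
        breaker.call(failing)
    except TimeoutError:
        pass
    states.append(breaker.state)

now[0] = 31.0                           # past the reset window
recovered = breaker.call(lambda: "ok")  # trial call succeeds and closes the circuit
```

While the circuit is open and the window has not elapsed, call() fails fast without touching the dependency, which is the property that prevents cascading failures.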
Step 3: Graceful Degradation with Cached Response Injection
When the circuit opens, the system injects cached fallback responses and routes user notifications to a secondary queue. This maintains bot functionality during outages.
import copy


class DegradationManager:
    def __init__(self, cache_ttl_seconds: int = 300):
        self._cache: dict[str, dict] = {}
        self._ttl = cache_ttl_seconds
        self._cache_time: dict[str, float] = {}
        self.notification_queue: list[dict] = []

    def cache_response(self, endpoint_id: str, response: dict):
        self._cache[endpoint_id] = copy.deepcopy(response)
        self._cache_time[endpoint_id] = time.time()

    def get_fallback(self, endpoint_id: str, fallback_template: str) -> dict:
        # Prefer the last good response while it is younger than the TTL.
        if endpoint_id in self._cache and time.time() - self._cache_time[endpoint_id] < self._ttl:
            return self._cache[endpoint_id]
        template = json.loads(fallback_template)
        # setdefault avoids a KeyError when the template has no "metadata" key.
        metadata = template.setdefault("metadata", {})
        metadata["degraded"] = True
        metadata["timestamp"] = datetime.now(timezone.utc).isoformat()
        self.notification_queue.append({
            "endpointId": endpoint_id,
            "action": "fallback_injected",
            "template": template,
            "timestamp": datetime.now(timezone.utc).isoformat()
        })
        return template

    def flush_notifications(self) -> list[dict]:
        batch = copy.deepcopy(self.notification_queue)
        self.notification_queue.clear()
        return batch
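The decision between a cached response and the static template comes down to one TTL comparison. The following standard-library sketch isolates that decision; pick_fallback and the cache entry layout are hypothetical and exist only for illustration.

```python
import copy
import json
import time
from typing import Callable


def pick_fallback(cache: dict, endpoint_id: str, template_json: str,
                  ttl_s: float = 300.0,
                  clock: Callable[[], float] = time.time) -> dict:
    """Return the cached response if it is fresh, otherwise the degraded template."""
    entry = cache.get(endpoint_id)
    if entry is not None and clock() - entry["stored_at"] < ttl_s:
        # Copy so callers cannot mutate the cached value.
        return copy.deepcopy(entry["response"])
    template = json.loads(template_json)
    # setdefault tolerates templates that omit the "metadata" object.
    template.setdefault("metadata", {})["degraded"] = True
    return template


now = [0.0]
cache = {"svc": {"response": {"temp": 21}, "stored_at": 0.0}}
tpl = '{"message": "Service temporarily unavailable."}'

fresh = pick_fallback(cache, "svc", tpl, clock=lambda: now[0])  # within the TTL
now[0] = 301.0
stale = pick_fallback(cache, "svc", tpl, clock=lambda: now[0])  # TTL exceeded
```

A shorter TTL favors accuracy over availability; the 300-second default matches the DegradationManager above.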
Step 4: Synchronize Metrics, Track Latency, and Generate Audit Logs
Timeout frequency and recovery latency must sync with external reliability dashboards. Audit logs provide governance compliance. The following class handles webhook delivery, metric aggregation, and structured logging.
import logging
from logging.handlers import RotatingFileHandler


class MetricsAndAuditManager:
    def __init__(self, webhook_url: str, audit_log_path: str = "timeout_audit.log"):
        self.webhook_url = webhook_url
        self.client = httpx.Client(timeout=httpx.Timeout(15.0))
        self._setup_audit_logger(audit_log_path)
        self.timeout_events: list[dict] = []

    def _setup_audit_logger(self, path: str):
        self.audit_logger = logging.getLogger("timeout_audit")
        self.audit_logger.setLevel(logging.INFO)
        handler = RotatingFileHandler(path, maxBytes=5_000_000, backupCount=5)
        formatter = logging.Formatter("%(asctime)s | %(message)s")
        handler.setFormatter(formatter)
        self.audit_logger.addHandler(handler)

    def log_audit_event(self, event_type: str, details: dict):
        payload = {
            "eventType": event_type,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "details": details
        }
        self.audit_logger.info(json.dumps(payload))
        self.timeout_events.append(payload)

    def sync_to_dashboard(self, token: str) -> bool:
        if not self.timeout_events:
            return False
        batch = copy.deepcopy(self.timeout_events)
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }
        try:
            response = self.client.post(
                f"{self.webhook_url}/metrics/timeout-sync",
                json={"events": batch},
                headers=headers
            )
        except httpx.HTTPError as e:
            # Transport failure (timeout, connection error): keep events for the next sync.
            logger.error(f"Dashboard sync exception: {e}")
            return False
        if response.status_code in (200, 201, 204):
            # Drop only the events that were actually delivered.
            del self.timeout_events[:len(batch)]
            return True
        logger.warning(f"Dashboard sync failed with status {response.status_code}")
        return False

    def close(self):
        self.client.close()
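Whether events survive a failed delivery is the main design decision in a sync routine. The sketch below shows one way to keep undelivered events queued, using only the standard library; EventBuffer and fake_send are hypothetical names, and the sender is a stand-in for the webhook POST.

```python
from typing import Callable


class EventBuffer:
    """Buffers events and removes them only after the sender confirms delivery."""

    def __init__(self, send: Callable[[list[dict]], bool]):
        self._send = send
        self._pending: list[dict] = []

    def add(self, event: dict) -> None:
        self._pending.append(event)

    def flush(self) -> bool:
        if not self._pending:
            return False
        batch = list(self._pending)
        try:
            delivered = self._send(batch)
        except Exception:
            delivered = False  # treat any sender error as "not delivered"
        if delivered:
            # Remove only what was sent; events added meanwhile stay queued.
            del self._pending[:len(batch)]
        return delivered

    def __len__(self) -> int:
        return len(self._pending)


# Fake sender: the first delivery fails, the second succeeds.
outcomes = iter([False, True])
received: list[dict] = []


def fake_send(batch: list[dict]) -> bool:
    ok = next(outcomes)
    if ok:
        received.extend(batch)
    return ok


buf = EventBuffer(fake_send)
buf.add({"eventType": "timeout_triggered"})
buf.add({"eventType": "service_succeeded"})
first_try = buf.flush()   # delivery fails, events stay queued
kept = len(buf)
second_try = buf.flush()  # delivery succeeds, queue is emptied
```

The trade-off is that the queue grows while the dashboard is unreachable, so a production version would likely cap its size.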
Complete Working Example
The following script integrates all components into a single timeout manager. It demonstrates configuration validation, circuit breaker execution, graceful degradation, metric synchronization, and audit logging. Replace the placeholder credentials with your Cognigy tenant values.
import httpx
import time
import json
import logging
import copy
import threading
from typing import Optional
from datetime import datetime, timezone
from pydantic import BaseModel, Field, field_validator
from pybreaker import CircuitBreaker
from logging.handlers import RotatingFileHandler

logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
logger = logging.getLogger(__name__)

# --- Authentication ---
class CognigyAuth:
    def __init__(self, tenant: str, client_id: str, client_secret: str, scopes: list[str]):
        self.base_url = f"https://api.{tenant}.cognigy.ai"
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self._token: Optional[str] = None
        self._expires_at: float = 0.0
        self.client = httpx.Client(timeout=httpx.Timeout(30.0))

    def _request_token(self) -> dict:
        auth_url = f"{self.base_url}/auth/oauth/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": " ".join(self.scopes)
        }
        response = self.client.post(auth_url, data=payload)
        response.raise_for_status()
        return response.json()

    def get_token(self) -> str:
        if self._token and time.time() < self._expires_at - 60:
            return self._token
        logger.info("Requesting new OAuth2 token")
        token_data = self._request_token()
        self._token = token_data["access_token"]
        self._expires_at = time.time() + token_data["expires_in"]
        return self._token

    def close(self):
        self.client.close()

# --- Payload Validation ---
class TimeoutConfig(BaseModel):
    endpoint_id: str
    max_wait_time_ms: int = Field(ge=1000, le=30000)
    fallback_template: str
    sla_threshold_ms: int = Field(ge=500, le=25000)

    @field_validator("fallback_template")
    @classmethod
    def validate_fallback_structure(cls, v: str) -> str:
        if not v.startswith("{") or not v.endswith("}"):
            raise ValueError("Fallback template must be valid JSON")
        try:
            json.loads(v)
        except json.JSONDecodeError:
            raise ValueError("Fallback template contains invalid JSON")
        return v


def build_timeout_payload(config: TimeoutConfig) -> dict:
    return {
        "endpointId": config.endpoint_id,
        "timeoutConfiguration": {
            "maxWaitTimeMs": config.max_wait_time_ms,
            "retryAttempts": 2,
            "backoffMs": 500
        },
        "fallbackConfiguration": {
            "template": config.fallback_template,
            "routeTo": "notification_queue"
        },
        "slaConfiguration": {
            "thresholdMs": config.sla_threshold_ms,
            "alertEnabled": True
        }
    }

# --- Circuit Breaker & Recovery ---
class RecoveryTracker:
    def __init__(self):
        self._lock = threading.Lock()
        self.failure_count: int = 0
        self.recovery_attempts: int = 0
        self.last_failure_time: Optional[float] = None
        self.recovery_latency_ms: Optional[float] = None
        self.state_history: list[dict] = []

    def record_failure(self):
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            self._log_state("OPEN", self.failure_count)

    def record_recovery_attempt(self) -> float:
        with self._lock:
            self.recovery_attempts += 1
            return time.time()

    def record_recovery_success(self, start_time: float):
        with self._lock:
            latency = (time.time() - start_time) * 1000
            self.recovery_latency_ms = latency
            self._log_state("CLOSED", 0, latency)

    def _log_state(self, state: str, failures: int, latency: Optional[float] = None):
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "state": state,
            "failure_count": failures,
            "recovery_latency_ms": latency
        }
        self.state_history.append(entry)

# --- Degradation ---
class DegradationManager:
    def __init__(self, cache_ttl_seconds: int = 300):
        self._cache: dict[str, dict] = {}
        self._ttl = cache_ttl_seconds
        self._cache_time: dict[str, float] = {}
        self.notification_queue: list[dict] = []

    def cache_response(self, endpoint_id: str, response: dict):
        self._cache[endpoint_id] = copy.deepcopy(response)
        self._cache_time[endpoint_id] = time.time()

    def get_fallback(self, endpoint_id: str, fallback_template: str) -> dict:
        if endpoint_id in self._cache and time.time() - self._cache_time[endpoint_id] < self._ttl:
            return self._cache[endpoint_id]
        template = json.loads(fallback_template)
        # setdefault avoids a KeyError when the template has no "metadata" key.
        metadata = template.setdefault("metadata", {})
        metadata["degraded"] = True
        metadata["timestamp"] = datetime.now(timezone.utc).isoformat()
        self.notification_queue.append({
            "endpointId": endpoint_id,
            "action": "fallback_injected",
            "template": template,
            "timestamp": datetime.now(timezone.utc).isoformat()
        })
        return template

    def flush_notifications(self) -> list[dict]:
        batch = copy.deepcopy(self.notification_queue)
        self.notification_queue.clear()
        return batch

# --- Metrics & Audit ---
class MetricsAndAuditManager:
    def __init__(self, webhook_url: str, audit_log_path: str = "timeout_audit.log"):
        self.webhook_url = webhook_url
        self.client = httpx.Client(timeout=httpx.Timeout(15.0))
        self._setup_audit_logger(audit_log_path)
        self.timeout_events: list[dict] = []

    def _setup_audit_logger(self, path: str):
        self.audit_logger = logging.getLogger("timeout_audit")
        self.audit_logger.setLevel(logging.INFO)
        handler = RotatingFileHandler(path, maxBytes=5_000_000, backupCount=5)
        formatter = logging.Formatter("%(asctime)s | %(message)s")
        handler.setFormatter(formatter)
        self.audit_logger.addHandler(handler)

    def log_audit_event(self, event_type: str, details: dict):
        payload = {
            "eventType": event_type,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "details": details
        }
        self.audit_logger.info(json.dumps(payload))
        self.timeout_events.append(payload)

    def sync_to_dashboard(self, token: str) -> bool:
        if not self.timeout_events:
            return False
        batch = copy.deepcopy(self.timeout_events)
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }
        try:
            response = self.client.post(
                f"{self.webhook_url}/metrics/timeout-sync",
                json={"events": batch},
                headers=headers
            )
        except httpx.HTTPError as e:
            # Transport failure (timeout, connection error): keep events for the next sync.
            logger.error(f"Dashboard sync exception: {e}")
            return False
        if response.status_code in (200, 201, 204):
            # Drop only the events that were actually delivered.
            del self.timeout_events[:len(batch)]
            return True
        logger.warning(f"Dashboard sync failed: {response.status_code}")
        return False

    def close(self):
        self.client.close()

# --- Timeout Manager ---
class CognigyTimeoutManager:
    def __init__(self, auth: CognigyAuth, webhook_url: str):
        self.auth = auth
        self.base_url = auth.base_url
        self.metrics = MetricsAndAuditManager(webhook_url)
        self.degradation = DegradationManager()
        self.recovery = RecoveryTracker()
        self.breaker = CircuitBreaker(
            fail_max=3,
            reset_timeout=30,
            name="cognigy_external_service"
        )
        self.client = httpx.Client(timeout=httpx.Timeout(30.0))

    def configure_timeout(self, config: TimeoutConfig) -> dict:
        payload = build_timeout_payload(config)
        token = self.auth.get_token()
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }
        endpoint = f"{self.base_url}/api/v1/externalServices/{config.endpoint_id}/configuration"
        for attempt in range(3):
            try:
                response = self.client.put(endpoint, json=payload, headers=headers)
                if response.status_code == 429:
                    # Honor retry-after when present; otherwise wait 2 seconds.
                    retry_after = int(response.headers.get("retry-after", 2))
                    logger.warning(f"Rate limited. Retrying in {retry_after}s")
                    time.sleep(retry_after)
                    continue
                response.raise_for_status()
                self.metrics.log_audit_event("timeout_configured", {
                    "endpointId": config.endpoint_id,
                    "maxWaitTimeMs": config.max_wait_time_ms,
                    "slaThresholdMs": config.sla_threshold_ms
                })
                return response.json()
            except httpx.HTTPStatusError as e:
                if e.response.status_code in (401, 403):
                    self.metrics.log_audit_event("auth_failure", {"status": e.response.status_code})
                    raise
                if e.response.status_code != 429:
                    raise
        raise RuntimeError("Max retry attempts exceeded for timeout configuration")

    def invoke_external_service(self, endpoint_id: str, fallback_template: str) -> dict:
        try:
            # CircuitBreaker.call() runs the function under breaker protection.
            # Calling the breaker object directly is the decorator form and
            # does not accept the function's arguments.
            return self.breaker.call(self._call_service, endpoint_id)
        except Exception as e:
            self.recovery.record_failure()
            self.metrics.log_audit_event("timeout_triggered", {
                "endpointId": endpoint_id,
                "error": str(e),
                "state": "OPEN"
            })
            return self.degradation.get_fallback(endpoint_id, fallback_template)

    def _call_service(self, endpoint_id: str) -> dict:
        token = self.auth.get_token()
        headers = {"Authorization": f"Bearer {token}"}
        endpoint = f"{self.base_url}/api/v1/externalServices/{endpoint_id}/invoke"
        response = self.client.post(endpoint, json={}, headers=headers)
        response.raise_for_status()
        result = response.json()
        self.degradation.cache_response(endpoint_id, result)
        self.recovery.record_recovery_success(self.recovery.record_recovery_attempt())
        self.metrics.log_audit_event("service_succeeded", {"endpointId": endpoint_id})
        return result

    def sync_metrics(self) -> bool:
        token = self.auth.get_token()
        return self.metrics.sync_to_dashboard(token)

    def close(self):
        self.client.close()
        self.auth.close()
        self.metrics.close()

# --- Execution ---
if __name__ == "__main__":
    auth = CognigyAuth(
        tenant="your-tenant",
        client_id="your-client-id",
        client_secret="your-client-secret",
        scopes=["externalService:write", "bot:read", "metrics:write", "audit:write"]
    )
    manager = CognigyTimeoutManager(auth, webhook_url="https://monitoring.example.com")
    config = TimeoutConfig(
        endpoint_id="ext_svc_weather_01",
        max_wait_time_ms=15000,
        fallback_template='{"message": "Service temporarily unavailable. Please try again.", "metadata": {}}',
        sla_threshold_ms=10000
    )
    try:
        result = manager.configure_timeout(config)
        logger.info(f"Configuration applied: {result}")
        response = manager.invoke_external_service("ext_svc_weather_01", config.fallback_template)
        logger.info(f"Service response: {response}")
        manager.sync_metrics()
    except Exception as e:
        logger.error(f"Execution failed: {str(e)}")
    finally:
        manager.close()
Common Errors & Debugging
Error: 401 Unauthorized
- What causes it: The OAuth2 token expired, the client credentials are invalid, or the requested scopes do not match the configured grant type.
- How to fix it: Verify the client_id and client_secret in Cognigy Cloud. Ensure the token refresh logic subtracts a buffer (60 seconds) before expiration. Check that the scopes list matches the application registration.
- Code showing the fix: The CognigyAuth.get_token() method already implements a 60-second buffer. If authentication fails during API calls, the configure_timeout method catches the 401, logs an auth_failure audit event, and re-raises immediately to prevent silent degradation.
Error: 403 Forbidden
- What causes it: The OAuth client lacks the required scope (externalService:write or metrics:write), or the tenant enforces role-based access control that blocks programmatic writes.
- How to fix it: Navigate to the Cognigy Cloud security settings and attach the missing scopes to the OAuth client. Verify that the service account has the External Service Admin role.
- Code showing the fix: The configure_timeout method logs 403 responses to the audit trail and raises the exception. Add a scope validation step before initialization:
REQUIRED_SCOPES = {"externalService:write", "metrics:write", "audit:write"}
if not REQUIRED_SCOPES.issubset(set(scopes)):
    raise ValueError("Missing required OAuth scopes")
Error: 429 Too Many Requests
- What causes it: Cognigy enforces rate limits per tenant and per endpoint. Rapid configuration updates or metric sync calls trigger throttling.
- How to fix it: Implement exponential backoff. Read the retry-after header when present.
- Code showing the fix: The configure_timeout method includes a retry loop with retry-after parsing. It makes up to three attempts and waits a fixed 2 seconds when the header is absent, so it does not yet back off exponentially. For metric synchronization, batch events and sync at fixed intervals rather than on every timeout event.
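The exponential backoff recommended above could be sketched as follows. backoff_delay is a hypothetical helper that is not part of the tutorial's manager; the base delay and cap are illustrative assumptions, and jitter is left out to keep the results deterministic.

```python
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base_s: float = 1.0, cap_s: float = 30.0) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # Prefer the server's hint when it is a plain number of seconds.
            return min(cap_s, max(0.0, float(retry_after)))
        except ValueError:
            pass  # the header may also be an HTTP date; fall back to backoff
    # Double the delay on every attempt, up to the cap.
    return min(cap_s, base_s * (2 ** attempt))


delays = [backoff_delay(n) for n in range(4)]    # no header: 1, 2, 4, 8 seconds
capped = backoff_delay(10)                       # limited by cap_s
hinted = backoff_delay(1, retry_after="5")       # numeric header wins
dated = backoff_delay(2, retry_after="Wed, 21 Oct 2015 07:28:00 GMT")
```

In the retry loop, time.sleep(backoff_delay(attempt, response.headers.get("retry-after"))) would replace the fixed wait; adding a small random jitter helps when many clients retry at once.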
Error: 5xx Server Error
- What causes it: Cognigy platform maintenance, downstream dependency failure, or transient load balancer errors.
- How to fix it: Implement circuit breaker reset windows. Once reset_timeout seconds have elapsed, the pybreaker library moves the breaker from OPEN to HALF_OPEN on the next call and lets that call through as a trial. Verify that your fallback templates contain all required fields for downstream bot logic.
- Code showing the fix: The CognigyTimeoutManager initializes CircuitBreaker(fail_max=3, reset_timeout=30). When the breaker opens, invoke_external_service catches the exception and routes to degradation.get_fallback().