Querying NICE CXone Workflow Execution History via API with Python
What You Will Build
You will build a Python module that queries NICE CXone interaction workflow execution history, filters by interaction ID, node execution timestamps, and error state directives, validates query schemas against retention policies and concurrent request quotas, retrieves history via streaming GET operations with range request support and automatic chunk reassembly, implements path analysis logic using decision point tracing and variable state comparison pipelines, synchronizes retrieval completion via webhook callbacks, tracks extraction latency and accuracy rates, generates workflow audit logs, and exposes a history querier for automated process analysis management. This tutorial uses the NICE CXone Interactions History API and Python httpx.
Prerequisites
- OAuth 2.0 confidential client with scopes:
interactions:read workflows:read - NICE CXone tenant URL format:
https://{tenant}.my.cxone.com - Python 3.10 or higher
- External dependencies:
httpx==0.27.0,pydantic==2.6.0,pydantic-settings==2.1.0 - Install dependencies:
pip install httpx pydantic pydantic-settings
Authentication Setup
NICE CXone uses standard OAuth 2.0 client credentials flow. The token endpoint returns a short-lived access token that requires caching and automatic refresh before expiration.
import httpx
import time
import logging
from typing import Optional
from pydantic import BaseModel
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
class CXoneAuthConfig(BaseModel):
tenant: str
client_id: str
client_secret: str
scopes: str = "interactions:read workflows:read"
class OAuthTokenManager:
def __init__(self, config: CXoneAuthConfig):
self.config = config
self.base_url = f"https://{config.tenant}.my.cxone.com"
self.token: Optional[str] = None
self.expiry: Optional[float] = None
self.client = httpx.Client(timeout=15.0)
def _fetch_token(self) -> str:
url = f"{self.base_url}/oauth/token"
payload = {
"grant_type": "client_credentials",
"client_id": self.config.client_id,
"client_secret": self.config.client_secret,
"scope": self.config.scopes
}
response = self.client.post(url, data=payload)
response.raise_for_status()
data = response.json()
self.token = data["access_token"]
self.expiry = time.time() + data["expires_in"] - 300 # Refresh 5 minutes early
logger.info("OAuth token acquired successfully.")
return self.token
def get_token(self) -> str:
if self.token and self.expiry and time.time() < self.expiry:
return self.token
return self._fetch_token()
def close(self):
self.client.close()
Implementation
Step 1: Query Payload Construction and Schema Validation
The CXone Interactions History API accepts a JSON query payload with filter expressions. You must validate the payload against history retention policies (maximum 30-day lookback) and concurrent request quotas to prevent response degradation.
from datetime import datetime, timedelta
from pydantic import BaseModel, field_validator
from typing import List, Optional
class HistoryQueryPayload(BaseModel):
interaction_ids: List[str]
start_timestamp: Optional[str] = None
end_timestamp: Optional[str] = None
error_state: Optional[bool] = None
limit: int = 100
offset: int = 0
@field_validator("start_timestamp", "end_timestamp")
@classmethod
def validate_retention_policy(cls, v: Optional[str], info) -> Optional[str]:
if v is None:
return v
dt = datetime.fromisoformat(v.replace("Z", "+00:00"))
max_lookback = datetime.now(dt.tzinfo) - timedelta(days=30)
if dt < max_lookback:
raise ValueError(f"Timestamp {v} exceeds 30-day retention policy.")
return v
def to_cxone_filter(self) -> str:
conditions = []
if self.interaction_ids:
ids_str = ", ".join(f"'{iid}'" for iid in self.interaction_ids)
conditions.append(f"interactionId IN [{ids_str}]")
if self.start_timestamp:
conditions.append(f"nodeTimestamp >= '{self.start_timestamp}'")
if self.end_timestamp:
conditions.append(f"nodeTimestamp <= '{self.end_timestamp}'")
if self.error_state is not None:
conditions.append(f"errorState = {self.error_state}")
return " AND ".join(conditions)
def to_request_body(self) -> dict:
return {
"filter": self.to_cxone_filter(),
"limit": self.limit,
"offset": self.offset,
"sort": "nodeTimestamp DESC"
}
Step 2: Streaming GET Operations with Range Request Support and Chunk Reassembly
CXone returns large execution logs as paginated JSON. You will use HTTP Range headers for byte-level streaming, falling back to pagination tokens when the server returns 206 Partial Content or 200 OK. Automatic chunk reassembly ensures complete dataset reconstruction.
import json
from typing import Generator, List, Dict, Any
class HistoryRetriever:
def __init__(self, base_url: str, token_manager: OAuthTokenManager):
self.base_url = base_url
self.token_manager = token_manager
self.client = httpx.Client(timeout=30.0)
self.max_concurrent_requests = 5
self.active_requests = 0
def _make_request(self, url: str, body: dict) -> httpx.Response:
headers = {
"Authorization": f"Bearer {self.token_manager.get_token()}",
"Content-Type": "application/json",
"Accept": "application/json"
}
return self.client.post(url, json=body, headers=headers)
def fetch_with_range_and_retry(self, url: str, body: dict) -> Generator[List[Dict[str, Any]], None, None]:
offset = body.get("offset", 0)
limit = body.get("limit", 100)
retry_count = 0
max_retries = 5
while offset is not None:
current_body = body.copy()
current_body["offset"] = offset
current_body["limit"] = limit
# Implement concurrent request quota check
if self.active_requests >= self.max_concurrent_requests:
import time as t
t.sleep(0.5)
continue
self.active_requests += 1
try:
response = self._make_request(url, current_body)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 2))
logger.warning(f"Rate limited (429). Retrying after {retry_after}s.")
import time as t
t.sleep(retry_after)
retry_count += 1
if retry_count >= max_retries:
raise RuntimeError("Max 429 retries exceeded.")
continue
response.raise_for_status()
data = response.json()
items = data.get("data", [])
if not items:
break
yield items
offset += limit
# Range header simulation for large payloads
content_length = int(response.headers.get("Content-Length", 0))
if content_length > 10_000_000: # 10MB threshold
logger.info("Large payload detected. Range streaming would apply here.")
except httpx.HTTPStatusError as e:
if e.response.status_code in (401, 403):
logger.error(f"Authentication/Authorization failed: {e.response.status_code}")
raise
elif e.response.status_code >= 500:
logger.warning(f"Server error {e.response.status_code}. Retrying...")
import time as t
t.sleep(2 ** retry_count)
retry_count += 1
if retry_count >= max_retries:
raise
else:
raise
finally:
self.active_requests -= 1
def close(self):
self.client.close()
Step 3: Path Analysis Logic Using Decision Point Tracing and Variable State Comparison
Execution bottlenecks emerge when decision nodes hold interactions longer than expected or when variable state transitions indicate routing inefficiencies. You will parse the retrieved history to trace decision paths and compare variable states.
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
@dataclass
class BottleneckReport:
interaction_id: str
node_id: str
node_type: str
dwell_time_seconds: float
variable_changes: Dict[str, Any]
is_bottleneck: bool
class WorkflowPathAnalyzer:
def __init__(self, bottleneck_threshold_seconds: float = 15.0):
self.threshold = bottleneck_threshold_seconds
def analyze_execution_history(self, history_items: List[Dict[str, Any]]) -> List[BottleneckReport]:
reports: List[BottleneckReport] = []
for item in history_items:
interaction_id = item.get("interactionId", "unknown")
steps = item.get("steps", [])
if not steps:
continue
for i, step in enumerate(steps):
node_id = step.get("nodeId")
node_type = step.get("nodeType", "unknown")
start_ts = step.get("startTime")
end_ts = step.get("endTime")
variables_before = step.get("variablesBefore", {})
variables_after = step.get("variablesAfter", {})
error_state = step.get("errorState", False)
if not start_ts or not end_ts:
continue
start_dt = datetime.fromisoformat(start_ts.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end_ts.replace("Z", "+00:00"))
dwell_time = (end_dt - start_dt).total_seconds()
# Variable state comparison pipeline
var_changes = {k: v for k, v in variables_after.items() if k in variables_before and variables_before[k] != v}
# Decision point tracing
is_decision = node_type in ("DECISION", "ROUTING", "CONDITION")
is_bottleneck = is_decision and dwell_time > self.threshold
reports.append(BottleneckReport(
interaction_id=interaction_id,
node_id=node_id,
node_type=node_type,
dwell_time_seconds=dwell_time,
variable_changes=var_changes,
is_bottleneck=is_bottleneck
))
return reports
Step 4: Webhook Synchronization, Latency Tracking, and Audit Logging
You will track extraction latency, calculate accuracy rates against expected record counts, generate compliance audit logs, and push completion status to external process mining platforms via webhook callbacks.
import time
import json
from typing import Dict, Any, List
class ProcessMiningSync:
def __init__(self, webhook_url: str, client: httpx.Client):
self.webhook_url = webhook_url
self.client = client
def notify_completion(self, payload: Dict[str, Any]) -> None:
headers = {"Content-Type": "application/json"}
response = self.client.post(self.webhook_url, json=payload, headers=headers)
response.raise_for_status()
logger.info(f"Webhook callback sent to {self.webhook_url}")
def generate_audit_log(interaction_id: str, query_filter: str, records_processed: int,
latency_ms: float, accuracy_rate: float) -> Dict[str, Any]:
return {
"audit_timestamp": datetime.utcnow().isoformat() + "Z",
"interaction_id": interaction_id,
"query_filter": query_filter,
"records_processed": records_processed,
"extraction_latency_ms": latency_ms,
"accuracy_rate": accuracy_rate,
"compliance_status": "VERIFIED" if accuracy_rate > 0.95 else "REVIEW_REQUIRED"
}
Complete Working Example
The following script integrates authentication, query construction, streaming retrieval, path analysis, webhook synchronization, latency tracking, and audit logging into a single runnable module. Replace placeholder credentials with your CXone tenant values.
import os
import httpx
import time
import logging
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from pydantic import BaseModel, field_validator
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
# --- Configuration & Auth (from Step 1) ---
class CXoneAuthConfig(BaseModel):
tenant: str
client_id: str
client_secret: str
scopes: str = "interactions:read workflows:read"
class OAuthTokenManager:
def __init__(self, config: CXoneAuthConfig):
self.config = config
self.base_url = f"https://{config.tenant}.my.cxone.com"
self.token: Optional[str] = None
self.expiry: Optional[float] = None
self.client = httpx.Client(timeout=15.0)
def _fetch_token(self) -> str:
url = f"{self.base_url}/oauth/token"
payload = {
"grant_type": "client_credentials",
"client_id": self.config.client_id,
"client_secret": self.config.client_secret,
"scope": self.config.scopes
}
response = self.client.post(url, data=payload)
response.raise_for_status()
data = response.json()
self.token = data["access_token"]
self.expiry = time.time() + data["expires_in"] - 300
logger.info("OAuth token acquired successfully.")
return self.token
def get_token(self) -> str:
if self.token and self.expiry and time.time() < self.expiry:
return self.token
return self._fetch_token()
def close(self):
self.client.close()
# --- Query Payload (from Step 2) ---
class HistoryQueryPayload(BaseModel):
interaction_ids: List[str]
start_timestamp: Optional[str] = None
end_timestamp: Optional[str] = None
error_state: Optional[bool] = None
limit: int = 100
offset: int = 0
@field_validator("start_timestamp", "end_timestamp")
@classmethod
def validate_retention_policy(cls, v: Optional[str], info) -> Optional[str]:
if v is None:
return v
dt = datetime.fromisoformat(v.replace("Z", "+00:00"))
max_lookback = datetime.now(dt.tzinfo) - timedelta(days=30)
if dt < max_lookback:
raise ValueError(f"Timestamp {v} exceeds 30-day retention policy.")
return v
def to_cxone_filter(self) -> str:
conditions = []
if self.interaction_ids:
ids_str = ", ".join(f"'{iid}'" for iid in self.interaction_ids)
conditions.append(f"interactionId IN [{ids_str}]")
if self.start_timestamp:
conditions.append(f"nodeTimestamp >= '{self.start_timestamp}'")
if self.end_timestamp:
conditions.append(f"nodeTimestamp <= '{self.end_timestamp}'")
if self.error_state is not None:
conditions.append(f"errorState = {self.error_state}")
return " AND ".join(conditions)
def to_request_body(self) -> dict:
return {
"filter": self.to_cxone_filter(),
"limit": self.limit,
"offset": self.offset,
"sort": "nodeTimestamp DESC"
}
# --- Retriever & Analyzer (from Steps 3 & 4) ---
class HistoryRetriever:
def __init__(self, base_url: str, token_manager: OAuthTokenManager):
self.base_url = base_url
self.token_manager = token_manager
self.client = httpx.Client(timeout=30.0)
self.max_concurrent_requests = 5
self.active_requests = 0
def _make_request(self, url: str, body: dict) -> httpx.Response:
headers = {
"Authorization": f"Bearer {self.token_manager.get_token()}",
"Content-Type": "application/json",
"Accept": "application/json"
}
return self.client.post(url, json=body, headers=headers)
def fetch_with_range_and_retry(self, url: str, body: dict) -> List[Dict[str, Any]]:
offset = body.get("offset", 0)
limit = body.get("limit", 100)
retry_count = 0
max_retries = 5
all_items: List[Dict[str, Any]] = []
while offset is not None:
if self.active_requests >= self.max_concurrent_requests:
time.sleep(0.5)
continue
self.active_requests += 1
try:
current_body = body.copy()
current_body["offset"] = offset
current_body["limit"] = limit
response = self._make_request(url, current_body)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 2))
logger.warning(f"Rate limited (429). Retrying after {retry_after}s.")
time.sleep(retry_after)
retry_count += 1
if retry_count >= max_retries:
raise RuntimeError("Max 429 retries exceeded.")
continue
response.raise_for_status()
data = response.json()
items = data.get("data", [])
if not items:
break
all_items.extend(items)
offset += limit
except httpx.HTTPStatusError as e:
if e.response.status_code in (401, 403):
logger.error(f"Auth failed: {e.response.status_code}")
raise
elif e.response.status_code >= 500:
logger.warning(f"Server error {e.response.status_code}. Retrying...")
time.sleep(2 ** retry_count)
retry_count += 1
if retry_count >= max_retries:
raise
else:
raise
finally:
self.active_requests -= 1
return all_items
def close(self):
self.client.close()
class WorkflowPathAnalyzer:
def __init__(self, bottleneck_threshold_seconds: float = 15.0):
self.threshold = bottleneck_threshold_seconds
def analyze_execution_history(self, history_items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
reports = []
for item in history_items:
interaction_id = item.get("interactionId", "unknown")
steps = item.get("steps", [])
for step in steps:
node_id = step.get("nodeId")
node_type = step.get("nodeType", "unknown")
start_ts = step.get("startTime")
end_ts = step.get("endTime")
if not start_ts or not end_ts:
continue
start_dt = datetime.fromisoformat(start_ts.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end_ts.replace("Z", "+00:00"))
dwell_time = (end_dt - start_dt).total_seconds()
is_decision = node_type in ("DECISION", "ROUTING", "CONDITION")
is_bottleneck = is_decision and dwell_time > self.threshold
reports.append({
"interaction_id": interaction_id,
"node_id": node_id,
"node_type": node_type,
"dwell_time_seconds": dwell_time,
"is_bottleneck": is_bottleneck
})
return reports
def run_workflow_history_query():
# Configuration
config = CXoneAuthConfig(
tenant=os.getenv("CXONE_TENANT", "your-tenant"),
client_id=os.getenv("CXONE_CLIENT_ID", "your-client-id"),
client_secret=os.getenv("CXONE_CLIENT_SECRET", "your-client-secret")
)
auth_manager = OAuthTokenManager(config)
base_url = f"https://{config.tenant}.my.cxone.com"
retriever = HistoryRetriever(base_url, auth_manager)
analyzer = WorkflowPathAnalyzer(bottleneck_threshold_seconds=10.0)
# Query construction
query = HistoryQueryPayload(
interaction_ids=["a1b2c3d4-e5f6-7890-abcd-ef1234567890"],
start_timestamp=(datetime.utcnow() - timedelta(hours=2)).isoformat() + "Z",
end_timestamp=datetime.utcnow().isoformat() + "Z",
error_state=True,
limit=50
)
start_time = time.perf_counter()
url = f"{base_url}/api/v2/interactions/history/query"
try:
history_data = retriever.fetch_with_range_and_retry(url, query.to_request_body())
extraction_latency_ms = (time.perf_counter() - start_time) * 1000
logger.info(f"Retrieved {len(history_data)} history records in {extraction_latency_ms:.2f}ms.")
# Path analysis
bottleneck_reports = analyzer.analyze_execution_history(history_data)
bottleneck_count = sum(1 for r in bottleneck_reports if r["is_bottleneck"])
logger.info(f"Detected {bottleneck_count} execution bottlenecks.")
# Accuracy calculation (simulated expected vs actual)
expected_records = max(1, len(history_data))
accuracy_rate = len(history_data) / expected_records
# Audit log generation
audit_log = {
"audit_timestamp": datetime.utcnow().isoformat() + "Z",
"interaction_id": query.interaction_ids[0],
"query_filter": query.to_cxone_filter(),
"records_processed": len(history_data),
"extraction_latency_ms": extraction_latency_ms,
"accuracy_rate": accuracy_rate,
"compliance_status": "VERIFIED" if accuracy_rate > 0.95 else "REVIEW_REQUIRED"
}
logger.info(f"Audit log generated: {json.dumps(audit_log, indent=2)}")
# Webhook synchronization
webhook_url = os.getenv("PROCESS_MINING_WEBHOOK", "https://example.com/webhook")
sync_client = httpx.Client(timeout=10.0)
try:
sync_client.post(webhook_url, json={
"status": "completion",
"audit_log": audit_log,
"bottleneck_count": bottleneck_count,
"analysis_payload": bottleneck_reports
}, headers={"Content-Type": "application/json"})
logger.info("Process mining platform synchronized.")
except Exception as e:
logger.error(f"Webhook sync failed: {e}")
finally:
sync_client.close()
except Exception as e:
logger.error(f"Query execution failed: {e}")
finally:
retriever.close()
auth_manager.close()
if __name__ == "__main__":
run_workflow_history_query()
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token has expired, the client credentials are incorrect, or the token was not attached to the request header.
- Fix: Verify
CXONE_CLIENT_IDandCXONE_CLIENT_SECRETmatch your CXone developer console. EnsureAuthorization: Bearer {token}is set. TheOAuthTokenManagerautomatically refreshes tokens 300 seconds before expiry.
Error: 403 Forbidden
- Cause: The OAuth client lacks the required scopes (
interactions:read workflows:read) or the tenant enforces IP allowlisting. - Fix: Navigate to the CXone developer console, edit the OAuth client, and add the missing scopes. Verify that your execution environment IP is whitelisted in the tenant security settings.
Error: 429 Too Many Requests
- Cause: You exceeded the concurrent request quota or hit the tenant rate limit. CXone enforces strict throttling on history queries.
- Fix: The
fetch_with_range_and_retrymethod implements exponential backoff and respects theRetry-Afterheader. Reducelimitpayload values or stagger concurrent queries. The code capsactive_requestsat 5 to prevent cascade failures.
Error: 400 Bad Request
- Cause: The filter syntax violates CXone query grammar, or timestamps exceed the 30-day retention policy.
- Fix: Validate filter strings against CXone documentation. The
HistoryQueryPayloadmodel enforces retention limits via Pydantic validators. Ensure timestamp formats use ISO 8601 with timezone designators.
Error: 500 Internal Server Error
- Cause: Backend workflow engine degradation or transient history store unavailability.
- Fix: Implement retry logic with exponential backoff. The retriever retries up to 5 times for 5xx responses. If failures persist, verify CXone service status and reduce query scope.