Retrieving NICE CXone Outbound Call Outcome Records via API with Python
What You Will Build
- The code retrieves outbound call outcome records from NICE CXone, applies disposition mapping, calculates conversion rates, and synchronizes results with an external CRM via webhook.
- This uses the NICE CXone
/api/v2/outbound/callsREST API with OAuth 2.0 client credentials authentication. - The tutorial covers Python 3.9+ using the
requestsandpydanticlibraries.
Prerequisites
- OAuth client type: Confidential Client (Client Credentials Grant)
- Required scopes:
outbound:call:read,outbound:campaign:read - API version: NICE CXone API v2 (REST)
- Language/runtime: Python 3.9+
- External dependencies:
requests,pydantic,python-dotenv
Authentication Setup
NICE CXone uses OAuth 2.0 client credentials flow for server-to-server authentication. You must cache the access token and refresh it before expiration to avoid 401 Unauthorized errors during long-running retrieval jobs.
import requests
import time
import logging
from typing import Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CxoneAuthManager:
def __init__(self, client_id: str, client_secret: str, base_url: str = "https://login.nicecxone.com"):
self.client_id = client_id
self.client_secret = client_secret
self.auth_url = f"{base_url}/oauth/token"
self._token: Optional[str] = None
self._expires_at: float = 0.0
def get_token(self) -> str:
if self._token and time.time() < self._expires_at - 60:
return self._token
payload = {
"grant_type": "client_credentials",
"scope": "outbound:call:read outbound:campaign:read"
}
response = requests.post(self.auth_url, data=payload, auth=(self.client_id, self.client_secret))
response.raise_for_status()
data = response.json()
self._token = data["access_token"]
self._expires_at = time.time() + data["expires_in"]
return self._token
def get_headers(self) -> dict:
return {
"Authorization": f"Bearer {self.get_token()}",
"Content-Type": "application/json"
}
The scope parameter must include outbound:call:read to access call outcome records. The token caches automatically and refreshes sixty seconds before expiration to prevent mid-stream authentication failures.
Implementation
Step 1: Payload Construction and Schema Validation
You must construct the retrieval payload with campaign ID filters, outcome type matrices, and timestamp boundaries. NICE CXone enforces record retention policies (typically ninety days for standard outbound logs) and limits concurrent download jobs. Validation prevents resource exhaustion before network requests begin.
from pydantic import BaseModel, validator
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class OutboundRetrievalPayload(BaseModel):
campaign_id: str
start_datetime: datetime
end_datetime: datetime
outcome_codes: list[str]
limit: int = 1000
@validator("start_datetime")
def validate_retention_policy(cls, v, values):
max_lookback = datetime.utcnow() - timedelta(days=90)
if v < max_lookback:
raise ValueError(f"Start date {v} exceeds ninety-day retention policy. Use {max_lookback} or later.")
return v
@validator("end_datetime")
def validate_time_boundary(cls, v, values):
if v > datetime.utcnow():
raise ValueError("End date cannot be in the future.")
return v
@validator("limit")
def validate_concurrent_quota(cls, v):
if v > 5000:
raise ValueError("Limit exceeds concurrent download quota. Maximum allowed is 5000 per request.")
return v
def build_query_params(payload: OutboundRetrievalPayload) -> dict:
return {
"campaignId": payload.campaign_id,
"startDateTime": payload.start_datetime.isoformat(),
"endDateTime": payload.end_datetime.isoformat(),
"dispositionCode": ",".join(payload.outcome_codes),
"limit": payload.limit
}
The pydantic model enforces retention boundaries and quota limits. The dispositionCode parameter accepts a comma-separated matrix of outcome codes. Passing invalid boundaries returns a 400 Bad Request from the CXone API, so client-side validation reduces network waste.
Step 2: Streaming GET Operations with Pagination and Chunk Reassembly
The CXone outbound calls endpoint returns paginated JSON. You must handle streaming retrieval, automatic chunk reassembly, and rate limit backoff. The endpoint supports nextPageToken for pagination. HTTP Range headers are not natively supported for this JSON endpoint, so chunk reassembly uses token-based pagination with incremental buffer flushing.
import requests
import time
import json
import logging
from typing import Generator, Any
logger = logging.getLogger(__name__)
class CallOutcomeStreamer:
def __init__(self, auth: CxoneAuthManager, base_api_url: str = "https://api.nicecxone.com"):
self.auth = auth
self.base_url = f"{base_api_url}/api/v2/outbound/calls"
self.session = requests.Session()
def stream_outcomes(self, params: dict) -> Generator[Any, None, None]:
current_params = params.copy()
retry_count = 0
max_retries = 5
while True:
retry_count += 1
try:
headers = self.auth.get_headers()
response = self.session.get(
self.base_url,
params=current_params,
headers=headers,
stream=True,
timeout=30
)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 2 ** retry_count))
logger.warning(f"Rate limited. Retrying in {retry_after}s")
time.sleep(retry_after)
continue
response.raise_for_status()
# Chunk reassembly for large JSON payloads
raw_data = b""
for chunk in response.iter_content(chunk_size=8192):
if chunk:
raw_data += chunk
data = json.loads(raw_data)
yield data.get("calls", [])
next_token = data.get("nextPageToken")
if not next_token:
break
current_params["nextPageToken"] = next_token
retry_count = 0
except requests.exceptions.HTTPError as e:
if e.response.status_code in [401, 403]:
logger.error(f"Authentication/Authorization failed: {e.response.status_code}")
raise
elif e.response.status_code == 429:
continue
else:
logger.error(f"HTTP error during retrieval: {e}")
raise
except requests.exceptions.RequestException as e:
logger.error(f"Network error during streaming: {e}")
raise
# Required OAuth scope: outbound:call:read
# Expected response structure:
# {
# "calls": [
# {
# "id": "call-uuid-123",
# "campaignId": "camp-uuid-456",
# "dispositionCode": "CONVERTED",
# "startTime": "2023-10-01T10:00:00Z",
# "endTime": "2023-10-01T10:05:00Z",
# "direction": "OUTBOUND"
# }
# ],
# "nextPageToken": "eyJwYWdlIjoxfQ=="
# }
The stream=True parameter prevents memory exhaustion during large dataset retrieval. The iter_content loop reassembles chunks into a complete JSON object before yielding. The 429 handler implements exponential backoff. The nextPageToken drives pagination until exhaustion.
Step 3: Outcome Analysis Logic and Conversion Rate Pipeline
Raw call records require disposition code mapping and conversion rate calculation. You must structure the pipeline to transform raw API responses into actionable reporting data.
from typing import Dict, List, Any
import logging
logger = logging.getLogger(__name__)
DISPOSITION_MAP = {
"CONVERTED": "success",
"APPOINTMENT_SET": "success",
"INTERESTED": "warm",
"CALLBACK_REQUESTED": "warm",
"NOT_INTERESTED": "cold",
"BUSY": "neutral",
"NO_ANSWER": "neutral",
"DISCONNECTED": "failure",
"WRONG_NUMBER": "failure"
}
class OutcomeAnalyzer:
def __init__(self):
self.total_calls: int = 0
self.conversion_count: int = 0
self.category_counts: Dict[str, int] = {
"success": 0, "warm": 0, "cold": 0, "neutral": 0, "failure": 0
}
def process_record(self, call: Dict[str, Any]) -> Dict[str, Any]:
self.total_calls += 1
disposition = call.get("dispositionCode", "UNKNOWN")
category = DISPOSITION_MAP.get(disposition, "neutral")
self.category_counts[category] += 1
if category == "success":
self.conversion_count += 1
return {
"id": call.get("id"),
"disposition_code": disposition,
"mapped_category": category,
"start_time": call.get("startTime"),
"end_time": call.get("endTime"),
"duration_seconds": self._calculate_duration(call.get("startTime"), call.get("endTime"))
}
def _calculate_duration(self, start: str, end: str) -> int:
try:
from datetime import datetime
s = datetime.fromisoformat(start.replace("Z", "+00:00"))
e = datetime.fromisoformat(end.replace("Z", "+00:00"))
return int((e - s).total_seconds())
except Exception:
return 0
def get_conversion_rate(self) -> float:
if self.total_calls == 0:
return 0.0
return (self.conversion_count / self.total_calls) * 100.0
def get_summary(self) -> Dict[str, Any]:
return {
"total_calls": self.total_calls,
"conversion_rate_percent": round(self.get_conversion_rate(), 2),
"category_distribution": self.category_counts,
"performance_trend": "positive" if self.get_conversion_rate() > 15.0 else "negative"
}
The analyzer maintains state across chunks. The DISPOSITION_MAP translates CXone codes into business categories. The conversion rate pipeline calculates success percentages against the total call volume. The summary method structures the output for downstream reporting systems.
Step 4: Webhook Callbacks, Metrics, and Audit Logging
You must synchronize retrieval completion with external CRM systems, track extraction latency, calculate download success rates, and generate compliance audit logs.
import requests
import time
import json
import logging
from typing import Dict, Any
logger = logging.getLogger(__name__)
class RetrievalCoordinator:
def __init__(self, auth: CxoneAuthManager, webhook_url: str):
self.auth = auth
self.webhook_url = webhook_url
self.start_time: float = 0.0
self.successful_chunks: int = 0
self.failed_chunks: int = 0
self.processed_records: List[Dict[str, Any]] = []
def run_retrieval_pipeline(self, payload: OutboundRetrievalPayload) -> Dict[str, Any]:
self.start_time = time.time()
analyzer = OutcomeAnalyzer()
streamer = CallOutcomeStreamer(self.auth)
params = build_query_params(payload)
audit_log = {
"job_id": f"job-{int(self.start_time)}",
"campaign_id": payload.campaign_id,
"start_time": payload.start_datetime.isoformat(),
"end_time": payload.end_datetime.isoformat(),
"initiated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"status": "running",
"records_processed": 0,
"compliance_hash": ""
}
try:
for chunk in streamer.stream_outcomes(params):
self.successful_chunks += 1
for record in chunk:
analyzed = analyzer.process_record(record)
self.processed_records.append(analyzed)
audit_log["records_processed"] += len(chunk)
logger.info(f"Processed chunk. Total records: {audit_log['records_processed']}")
audit_log["status"] = "completed"
audit_log["completed_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
# Generate compliance audit hash
import hashlib
payload_str = json.dumps(self.processed_records, sort_keys=True)
audit_log["compliance_hash"] = hashlib.sha256(payload_str.encode()).hexdigest()
metrics = {
"extraction_latency_seconds": round(time.time() - self.start_time, 2),
"download_success_rate": round(
(self.successful_chunks / (self.successful_chunks + self.failed_chunks)) * 100 if (self.successful_chunks + self.failed_chunks) > 0 else 100.0, 2
),
"total_records": len(self.processed_records),
"conversion_rate": analyzer.get_conversion_rate(),
"summary": analyzer.get_summary()
}
# Synchronize with external CRM via webhook
self._notify_crm(metrics, audit_log)
return {
"metrics": metrics,
"audit_log": audit_log,
"records": self.processed_records
}
except Exception as e:
audit_log["status"] = "failed"
audit_log["error"] = str(e)
self.failed_chunks += 1
raise
def _notify_crm(self, metrics: Dict[str, Any], audit_log: Dict[str, Any]) -> None:
try:
headers = self.auth.get_headers()
response = requests.post(
self.webhook_url,
json={"metrics": metrics, "audit": audit_log},
headers=headers,
timeout=10
)
response.raise_for_status()
logger.info("CRM webhook synchronized successfully")
except requests.exceptions.RequestException as e:
logger.error(f"CRM webhook failed: {e}")
The coordinator orchestrates streaming, analysis, metrics tracking, and webhook delivery. The extraction_latency_seconds field measures total pipeline duration. The download_success_rate tracks chunk-level reliability. The compliance_hash provides cryptographic verification for audit trails. The webhook POST delivers structured metrics to the external CRM.
Complete Working Example
import os
import time
import logging
import requests
import json
import hashlib
from typing import Optional, Dict, List, Any, Generator
from datetime import datetime, timedelta
from pydantic import BaseModel, validator
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
# Authentication Manager
class CxoneAuthManager:
def __init__(self, client_id: str, client_secret: str, base_url: str = "https://login.nicecxone.com"):
self.client_id = client_id
self.client_secret = client_secret
self.auth_url = f"{base_url}/oauth/token"
self._token: Optional[str] = None
self._expires_at: float = 0.0
def get_token(self) -> str:
if self._token and time.time() < self._expires_at - 60:
return self._token
payload = {"grant_type": "client_credentials", "scope": "outbound:call:read outbound:campaign:read"}
response = requests.post(self.auth_url, data=payload, auth=(self.client_id, self.client_secret))
response.raise_for_status()
data = response.json()
self._token = data["access_token"]
self._expires_at = time.time() + data["expires_in"]
return self._token
def get_headers(self) -> dict:
return {"Authorization": f"Bearer {self.get_token()}", "Content-Type": "application/json"}
# Payload Schema
class OutboundRetrievalPayload(BaseModel):
campaign_id: str
start_datetime: datetime
end_datetime: datetime
outcome_codes: list[str]
limit: int = 1000
@validator("start_datetime")
def validate_retention_policy(cls, v):
max_lookback = datetime.utcnow() - timedelta(days=90)
if v < max_lookback:
raise ValueError(f"Start date {v} exceeds ninety-day retention policy.")
return v
@validator("end_datetime")
def validate_time_boundary(cls, v):
if v > datetime.utcnow():
raise ValueError("End date cannot be in the future.")
return v
@validator("limit")
def validate_concurrent_quota(cls, v):
if v > 5000:
raise ValueError("Limit exceeds concurrent download quota.")
return v
def build_query_params(payload: OutboundRetrievalPayload) -> dict:
return {
"campaignId": payload.campaign_id,
"startDateTime": payload.start_datetime.isoformat(),
"endDateTime": payload.end_datetime.isoformat(),
"dispositionCode": ",".join(payload.outcome_codes),
"limit": payload.limit
}
# Streaming Retriever
class CallOutcomeStreamer:
def __init__(self, auth: CxoneAuthManager, base_api_url: str = "https://api.nicecxone.com"):
self.auth = auth
self.base_url = f"{base_api_url}/api/v2/outbound/calls"
self.session = requests.Session()
def stream_outcomes(self, params: dict) -> Generator[Any, None, None]:
current_params = params.copy()
retry_count = 0
while True:
retry_count += 1
try:
headers = self.auth.get_headers()
response = self.session.get(self.base_url, params=current_params, headers=headers, stream=True, timeout=30)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 2 ** retry_count))
logger.warning(f"Rate limited. Retrying in {retry_after}s")
time.sleep(retry_after)
continue
response.raise_for_status()
raw_data = b""
for chunk in response.iter_content(chunk_size=8192):
if chunk:
raw_data += chunk
data = json.loads(raw_data)
yield data.get("calls", [])
next_token = data.get("nextPageToken")
if not next_token:
break
current_params["nextPageToken"] = next_token
retry_count = 0
except requests.exceptions.HTTPError as e:
if e.response.status_code in [401, 403]:
logger.error(f"Auth failed: {e.response.status_code}")
raise
elif e.response.status_code == 429:
continue
else:
raise
except requests.exceptions.RequestException as e:
logger.error(f"Network error: {e}")
raise
# Analyzer
DISPOSITION_MAP = {
"CONVERTED": "success", "APPOINTMENT_SET": "success", "INTERESTED": "warm",
"CALLBACK_REQUESTED": "warm", "NOT_INTERESTED": "cold", "BUSY": "neutral",
"NO_ANSWER": "neutral", "DISCONNECTED": "failure", "WRONG_NUMBER": "failure"
}
class OutcomeAnalyzer:
def __init__(self):
self.total_calls = 0
self.conversion_count = 0
self.category_counts = {"success": 0, "warm": 0, "cold": 0, "neutral": 0, "failure": 0}
def process_record(self, call: Dict[str, Any]) -> Dict[str, Any]:
self.total_calls += 1
disposition = call.get("dispositionCode", "UNKNOWN")
category = DISPOSITION_MAP.get(disposition, "neutral")
self.category_counts[category] += 1
if category == "success":
self.conversion_count += 1
return {
"id": call.get("id"), "disposition_code": disposition, "mapped_category": category,
"start_time": call.get("startTime"), "end_time": call.get("endTime")
}
def get_conversion_rate(self) -> float:
return (self.conversion_count / self.total_calls) * 100.0 if self.total_calls > 0 else 0.0
def get_summary(self) -> Dict[str, Any]:
return {
"total_calls": self.total_calls, "conversion_rate_percent": round(self.get_conversion_rate(), 2),
"category_distribution": self.category_counts, "performance_trend": "positive" if self.get_conversion_rate() > 15.0 else "negative"
}
# Coordinator
class CxoneOutboundOutcomeRetriever:
def __init__(self, client_id: str, client_secret: str, webhook_url: str):
self.auth = CxoneAuthManager(client_id, client_secret)
self.webhook_url = webhook_url
self.start_time = 0.0
self.successful_chunks = 0
self.failed_chunks = 0
self.processed_records = []
def run(self, payload: OutboundRetrievalPayload) -> Dict[str, Any]:
self.start_time = time.time()
analyzer = OutcomeAnalyzer()
streamer = CallOutcomeStreamer(self.auth)
params = build_query_params(payload)
audit_log = {
"job_id": f"job-{int(self.start_time)}", "campaign_id": payload.campaign_id,
"start_time": payload.start_datetime.isoformat(), "end_time": payload.end_datetime.isoformat(),
"initiated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "status": "running",
"records_processed": 0, "compliance_hash": ""
}
try:
for chunk in streamer.stream_outcomes(params):
self.successful_chunks += 1
for record in chunk:
analyzed = analyzer.process_record(record)
self.processed_records.append(analyzed)
audit_log["records_processed"] += len(chunk)
audit_log["status"] = "completed"
audit_log["completed_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
payload_str = json.dumps(self.processed_records, sort_keys=True)
audit_log["compliance_hash"] = hashlib.sha256(payload_str.encode()).hexdigest()
metrics = {
"extraction_latency_seconds": round(time.time() - self.start_time, 2),
"download_success_rate": round((self.successful_chunks / (self.successful_chunks + self.failed_chunks)) * 100 if (self.successful_chunks + self.failed_chunks) > 0 else 100.0, 2),
"total_records": len(self.processed_records), "conversion_rate": analyzer.get_conversion_rate(),
"summary": analyzer.get_summary()
}
self._notify_crm(metrics, audit_log)
return {"metrics": metrics, "audit_log": audit_log, "records": self.processed_records}
except Exception as e:
audit_log["status"] = "failed"
audit_log["error"] = str(e)
self.failed_chunks += 1
raise
def _notify_crm(self, metrics: Dict[str, Any], audit_log: Dict[str, Any]) -> None:
try:
headers = self.auth.get_headers()
response = requests.post(self.webhook_url, json={"metrics": metrics, "audit": audit_log}, headers=headers, timeout=10)
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(f"CRM webhook failed: {e}")
if __name__ == "__main__":
CLIENT_ID = os.getenv("CXONE_CLIENT_ID")
CLIENT_SECRET = os.getenv("CXONE_CLIENT_SECRET")
WEBHOOK_URL = os.getenv("CRM_WEBHOOK_URL", "https://webhook.site/test")
payload = OutboundRetrievalPayload(
campaign_id="your-campaign-uuid-here",
start_datetime=datetime.utcnow() - timedelta(days=7),
end_datetime=datetime.utcnow(),
outcome_codes=["CONVERTED", "APPOINTMENT_SET", "NOT_INTERESTED", "BUSY"],
limit=1000
)
retriever = CxoneOutboundOutcomeRetriever(CLIENT_ID, CLIENT_SECRET, WEBHOOK_URL)
result = retriever.run(payload)
print(json.dumps(result, indent=2))
Common Errors & Debugging
Error: 401 Unauthorized
- What causes it: Expired access token, invalid client credentials, or missing
outbound:call:readscope. - How to fix it: Verify the client secret matches the CXone admin console. Ensure the token refresh logic executes before expiration. Check that the OAuth request includes the correct scopes.
- Code showing the fix: The
CxoneAuthManagerclass caches tokens and refreshes them sixty seconds before expiration. Theget_headersmethod always fetches a valid token before attaching it to the request.
Error: 429 Too Many Requests
- What causes it: Exceeding CXone rate limits (typically 100 requests per minute for outbound endpoints) or concurrent download quotas.
- How to fix it: Implement exponential backoff with
Retry-Afterheader parsing. Reduce thelimitparameter to decrease request frequency. Stagger retrieval jobs across campaigns. - Code showing the fix: The
stream_outcomesmethod checksresponse.status_code == 429, extractsRetry-After, sleeps, and retries. TheOutboundRetrievalPayloadvalidator enforces a maximum limit of 5000 to prevent quota violations.
Error: 400 Bad Request
- What causes it: Invalid timestamp boundaries, malformed
dispositionCodematrix, or exceeding retention policy limits. - How to fix it: Validate dates against the ninety-day retention window. Ensure disposition codes match CXone enum values. Format timestamps as ISO 8601 with timezone offsets.
- Code showing the fix: The
pydanticvalidators reject dates outside the retention window and future end dates. Thebuild_query_paramsfunction serializes dates correctly and joins disposition codes with commas.
Error: 5xx Server Error
- What causes it: CXone backend overload, temporary service degradation, or internal routing failures.
- How to fix it: Retry with jittered delays. Monitor CXone status pages. Log the full response body for support tickets.
- Code showing the fix: The
requests.exceptions.HTTPErrorhandler catches non-429 server errors and re-raises them after logging. The streaming loop resetsretry_counton success to prevent unnecessary backoff on transient issues.