Archiving NICE CXone Interaction Data with Python SDK
What You Will Build
- A Python script that polls the CXone Interactions API for closed interactions, extracts payloads and media metadata, transforms data to match archival schemas, compresses records, encrypts them, uploads to object storage with retry logic, updates interaction archival status, and generates a completion report.
- This tutorial uses the official nice-cxone-sdk Python package alongside boto3 for S3 storage and cryptography for client-side encryption.
- The implementation targets Python 3.9+ with type hints and explicit error handling. All calls in this tutorial are synchronous.
Prerequisites
- OAuth 2.0 Client Credentials grant configured in CXone Admin Console
- Required OAuth scopes: interactions:read, recordings:read, customfields:write
- Python packages: nice-cxone-sdk>=4.0.0, boto3>=1.28.0, cryptography>=41.0.0, httpx>=0.24.0
- Python 3.9 or higher
- AWS S3 bucket with programmatic access keys and server-side encryption enabled
- Custom field in CXone named archival_status (type: string) to track processing state
Authentication Setup
CXone uses OAuth 2.0 Client Credentials for server-to-server integrations. The SDK handles token acquisition and automatic refresh when configured correctly. You must pass the base API URL, client ID, and client secret to the ApiClient.
import os

import httpx
from nice_cxone_sdk import ApiClient, Configuration, OAuthClient
from nice_cxone_sdk.rest import ApiException


def initialize_cxone_client() -> ApiClient:
    """Configure and return an authenticated CXone API client."""
    config = Configuration(
        # The API base URL depends on your CXone region, so it is read from
        # the environment rather than hard-coded. A missing value raises KeyError.
        host=os.environ["CXONE_API_BASE"],
        client_id=os.getenv("CXONE_CLIENT_ID"),
        client_secret=os.getenv("CXONE_CLIENT_SECRET"),
        oauth_client_id=os.getenv("CXONE_CLIENT_ID"),
        oauth_client_secret=os.getenv("CXONE_CLIENT_SECRET"),
    )
    # The SDK requires an OAuth client for token management
    oauth_client = OAuthClient(
        client_id=config.client_id,
        client_secret=config.client_secret,
        host=config.host,
        http_client=httpx.Client(timeout=30.0),
    )
    api_client = ApiClient(configuration=config, oauth_client=oauth_client)
    return api_client
The OAuth token is cached in memory by the SDK. When the token expires, the OAuthClient automatically requests a new one using the client credentials flow. You do not need to implement manual refresh logic.
Implementation
Step 1: Polling Closed Interactions
CXone provides a query endpoint for interactions that supports filtering, pagination, and sorting. You must use POST /api/v2/interactions/query to retrieve closed interactions. The query body accepts a filter object, page size, and ordering parameters.
Required OAuth scope: interactions:read
import time

from nice_cxone_sdk.models import QueryInteractionRequest


def fetch_closed_interactions(api_client: ApiClient, page_size: int = 100) -> list:
    """Poll CXone for closed interactions with automatic pagination."""
    interactions_api = api_client.InteractionsApi()
    all_interactions = []
    # Query filter for closed status
    query_filter = {
        "type": "interaction",
        "filter": "status:closed",
        "orderBy": "updatedTime desc",
        "pageSize": page_size,
    }
    request_body = QueryInteractionRequest(**query_filter)
    while True:
        try:
            response = interactions_api.query_interactions(body=request_body)
            all_interactions.extend(response.data)
            # Stop when the response carries no cursor for a further page
            if not response.pagination or not response.pagination.next_page:
                break
            request_body.page_token = response.pagination.next_page
        except ApiException as e:
            if e.status == 429:
                # Rate limited: wait for the server's Retry-After hint
                # (default 5 seconds), then retry the same page.
                retry_after = int(e.headers.get("Retry-After", 5))
                time.sleep(retry_after)
                continue
            raise
    return all_interactions
The next_page token in the pagination object contains an encoded cursor. You must pass it back in subsequent requests to continue fetching. The SDK automatically serializes the QueryInteractionRequest to JSON and sets the Content-Type: application/json header.
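The 429 handler above converts Retry-After with int(), which assumes the header carries a number of seconds. HTTP also permits an HTTP-date in that header, so a more defensive parser might look like the sketch below. The helper name parse_retry_after, the 5-second default, and the 60-second cap are illustrative assumptions and not part of the SDK.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], default: float = 5.0,
                      cap: float = 60.0,
                      now: Optional[datetime] = None) -> float:
    """Return a sleep duration in seconds for a Retry-After header value.

    Hypothetical helper: accepts delta-seconds ("7") or an HTTP-date,
    falls back to `default` when the header is missing or unparsable,
    and clamps the result to the range [0, cap].
    """
    if value is None or not value.strip():
        return default
    text = value.strip()
    try:
        seconds = float(text)
    except ValueError:
        try:
            target = parsedate_to_datetime(text)
        except (TypeError, ValueError):
            # Older Python versions raise TypeError, newer ones ValueError.
            return default
        if target.tzinfo is None:
            # Assumption: a date without an offset is treated as UTC.
            target = target.replace(tzinfo=timezone.utc)
        current = now if now is not None else datetime.now(timezone.utc)
        seconds = (target - current).total_seconds()
    return max(0.0, min(seconds, cap))
```

Inside the except branch, time.sleep(parse_retry_after(e.headers.get("Retry-After"))) would then replace the int() conversion. The cap keeps a misbehaving header from stalling the poller for an unbounded time.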
Step 2: Extracting Payloads and Media Metadata
Each interaction payload contains core metadata, participant information, and references to associated media. You must fetch the full interaction details to extract media metadata. The interactions API returns a nested structure where media objects contain recording URLs, duration, and format.
Required OAuth scope: interactions:read, recordings:read
from typing import Optional


def extract_interaction_payload(interaction_id: str, api_client: ApiClient) -> Optional[dict]:
    """Fetch full interaction details and extract media metadata.

    Returns None when the interaction is missing (404) or inaccessible (403).
    """
    interactions_api = api_client.InteractionsApi()
    try:
        interaction = interactions_api.get_interaction_by_id(interaction_id)
    except ApiException as e:
        if e.status in (404, 403):
            return None
        raise
    # Transform to archival schema
    archival_record = {
        "interaction_id": interaction_id,
        "type": interaction.type,
        "status": interaction.status,
        "created_time": interaction.created_time.isoformat() if hasattr(interaction.created_time, 'isoformat') else str(interaction.created_time),
        "updated_time": interaction.updated_time.isoformat() if hasattr(interaction.updated_time, 'isoformat') else str(interaction.updated_time),
        "participants": [],
        "media_metadata": [],
    }
    # Extract participants
    if interaction.participants:
        for participant in interaction.participants:
            archival_record["participants"].append({
                "id": participant.id,
                "type": participant.type,
                "address": participant.address,
                "state": participant.state,
            })
    # Extract media metadata (recordings, transcripts, etc.)
    if interaction.media:
        for media_item in interaction.media:
            media_meta = {
                "media_id": media_item.id,
                "type": media_item.type,
                "duration_ms": media_item.duration_ms,
                "format": media_item.format,
                "playback_url": media_item.playback_url,
                "created_time": media_item.created_time.isoformat() if hasattr(media_item.created_time, 'isoformat') else str(media_item.created_time),
            }
            archival_record["media_metadata"].append(media_meta)
    return archival_record
The transformation step normalizes datetime objects to ISO 8601 strings, flattens nested participant arrays, and isolates media references. This structure matches standard archival schemas used in data lakes and compliance repositories.
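The extraction code repeats the same isoformat fallback for every timestamp. One way to centralize that normalization is a small helper like the sketch below. The name to_iso8601 is hypothetical, and treating naive datetimes as UTC is an assumption you should check against how your SDK version deserializes timestamps.

```python
from datetime import date, datetime, timezone
from typing import Any, Optional


def to_iso8601(value: Any) -> Optional[str]:
    """Normalize a timestamp-like value to an ISO 8601 string.

    Hypothetical helper. Assumption: naive datetimes are treated as UTC.
    None is passed through so it serializes as JSON null rather than "None".
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        return value.isoformat()
    if isinstance(value, date):
        return value.isoformat()
    # Anything else (for example an already formatted string) is kept as text.
    return str(value)
```

With this in place, each timestamp field in the archival record could be written as to_iso8601(interaction.created_time), which also avoids storing the literal string "None" for missing values.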
Step 3: Transforming, Compressing, and Encrypting Records
Archival systems require compressed and encrypted payloads to minimize storage costs and meet security requirements. You will compress JSON lines data using gzip, then encrypt the compressed stream using AES-256-GCM via the cryptography library.
import base64
import gzip
import json
import os

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC


def derive_encryption_key(password: str, salt: bytes) -> bytes:
    """Derive a 256-bit AES key from a password using PBKDF2."""
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=480000,
    )
    return kdf.derive(password.encode())


def compress_and_encrypt_records(records: list[dict], encryption_password: str) -> bytes:
    """Compress records to gzip JSON lines and encrypt with AES-256-GCM."""
    # Serialize to JSON lines
    json_lines = "".join(json.dumps(record, default=str) + "\n" for record in records)
    compressed_data = gzip.compress(json_lines.encode("utf-8"))
    # Encrypt with AES-GCM; a fresh salt and nonce are generated per call
    salt = os.urandom(16)
    key = derive_encryption_key(encryption_password, salt)
    aesgcm = AESGCM(key)
    nonce = os.urandom(12)
    encrypted_data = aesgcm.encrypt(nonce, compressed_data, None)
    # Package salt, nonce, and ciphertext for decryption later
    encrypted_package = {
        "salt": base64.b64encode(salt).decode("utf-8"),
        "nonce": base64.b64encode(nonce).decode("utf-8"),
        "ciphertext": base64.b64encode(encrypted_data).decode("utf-8"),
    }
    return json.dumps(encrypted_package).encode("utf-8")
AES-GCM provides authenticated encryption. The salt and nonce are stored alongside the ciphertext because they are required for decryption. You must never reuse a nonce with the same key. The gzip module operates in memory to avoid temporary disk I/O.
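Because the salt and nonce travel with the ciphertext, an archive can be restored from the JSON envelope and the password alone. The sketch below shows what the reverse operation might look like. The function name decrypt_and_decompress_records is hypothetical, and it assumes the envelope layout and PBKDF2 parameters (SHA-256, 480000 iterations, 32-byte key) used in the encryption step above.

```python
import base64
import gzip
import json

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC


def decrypt_and_decompress_records(package: bytes, encryption_password: str,
                                   iterations: int = 480_000) -> list:
    """Reverse the encryption step: JSON envelope -> list of record dicts."""
    envelope = json.loads(package.decode("utf-8"))
    salt = base64.b64decode(envelope["salt"])
    nonce = base64.b64decode(envelope["nonce"])
    ciphertext = base64.b64decode(envelope["ciphertext"])

    # Re-derive the key with the same PBKDF2 parameters used for encryption.
    kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32,
                     salt=salt, iterations=iterations)
    key = kdf.derive(encryption_password.encode())

    # decrypt() raises cryptography.exceptions.InvalidTag when the password
    # is wrong or the ciphertext has been modified.
    compressed = AESGCM(key).decrypt(nonce, ciphertext, None)
    json_lines = gzip.decompress(compressed).decode("utf-8")
    return [json.loads(line) for line in json_lines.split("\n") if line]
```

Running a round trip like this against a sample archive is a cheap way to confirm that the stored password and parameters can actually recover the data before the source interactions age out of CXone.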
Step 4: Uploading to Object Storage with Retry Logic
Object storage uploads can fail due to network timeouts, throttling, or partial writes. You must implement a retry mechanism that tracks failed records and reprocesses them before marking the batch as complete.
Required OAuth scope: None (S3 credentials used directly)
import time
import uuid
from typing import Tuple

import boto3


def upload_archive_to_s3(
    s3_client: boto3.client,
    bucket: str,
    key_prefix: str,
    encrypted_payload: bytes,
    max_retries: int = 3,
) -> Tuple[str, bool]:
    """Upload an encrypted archive to S3, retrying failed attempts with backoff."""
    file_key = f"{key_prefix}/archive_{uuid.uuid4().hex}.enc.gz"
    for attempt in range(max_retries):
        try:
            s3_client.put_object(
                Bucket=bucket,
                Key=file_key,
                Body=encrypted_payload,
                ServerSideEncryption="AES256",
                ContentType="application/octet-stream",
            )
            return file_key, True
        except Exception:
            # Wait 1s, 2s, ... between attempts; no sleep after the last one
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
    return file_key, False


def process_batch_with_retry(
    api_client: ApiClient,
    s3_client: boto3.client,
    s3_bucket: str,
    encryption_password: str,
    batch: list[str],
) -> dict:
    """Process a batch of interactions with retry logic for failed uploads."""
    results = {"success": [], "failed": [], "retried": []}
    # First pass
    for interaction_id in batch:
        payload = extract_interaction_payload(interaction_id, api_client)
        if not payload:
            results["failed"].append({"interaction_id": interaction_id, "reason": "fetch_failed"})
            continue
        encrypted = compress_and_encrypt_records([payload], encryption_password)
        s3_key, success = upload_archive_to_s3(s3_client, s3_bucket, "cxone/archives", encrypted)
        if success:
            results["success"].append({"interaction_id": interaction_id, "s3_key": s3_key})
        else:
            results["failed"].append({"interaction_id": interaction_id, "reason": "upload_failed"})
    # Second pass: retry everything that failed, whether at fetch or upload
    failed_interactions = [r["interaction_id"] for r in results["failed"]]
    for interaction_id in failed_interactions:
        payload = extract_interaction_payload(interaction_id, api_client)
        if not payload:
            continue
        encrypted = compress_and_encrypt_records([payload], encryption_password)
        s3_key, success = upload_archive_to_s3(s3_client, s3_bucket, "cxone/archives", encrypted)
        if success:
            results["retried"].append({"interaction_id": interaction_id, "s3_key": s3_key})
            # Recovered records no longer count as failures
            results["failed"] = [r for r in results["failed"] if r["interaction_id"] != interaction_id]
    return results
The retry logic separates successful uploads, permanent failures, and recovered records. You track S3 keys for audit trails. The exponential backoff in upload_archive_to_s3 prevents cascading failures during S3 throttling.
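When many workers back off on the same 1, 2, 4 second schedule, their retries can arrive at S3 in synchronized bursts. A common refinement is to randomize each delay ("full jitter"). The sketch below is one way to compute such a delay; the function name backoff_delay and the base and cap defaults are illustrative assumptions, not values taken from AWS or CXone documentation.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  rng: Optional[random.Random] = None) -> float:
    """Return a randomized delay in seconds for a zero-based retry attempt.

    Full jitter: choose uniformly between 0 and min(cap, base * 2**attempt).
    Passing a seeded random.Random makes the delays reproducible in tests.
    """
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    ceiling = min(cap, base * (2 ** attempt))
    source = rng if rng is not None else random
    return source.uniform(0.0, ceiling)
```

In upload_archive_to_s3, time.sleep(backoff_delay(attempt)) could stand in for time.sleep(2 ** attempt). The cap bounds the worst-case wait if max_retries is ever raised.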
Step 5: Updating Archival Status and Generating Reports
CXone interactions are immutable after closure, so the archival state is recorded in a custom field rather than on the interaction record itself. After processing, you generate a completion report containing success rates, S3 locations, and failure reasons.
Required OAuth scope: customfields:write
import os
import time

from nice_cxone_sdk.models import CustomFieldValue


def update_archival_status(api_client: ApiClient, interaction_id: str, status: str) -> bool:
    """Update the archival_status custom field on an interaction."""
    custom_fields_api = api_client.CustomFieldsApi()
    try:
        custom_field_value = CustomFieldValue(
            field_id=os.getenv("CXONE_ARCHIVAL_FIELD_ID"),
            value=status,
        )
        custom_fields_api.update_custom_field_value_by_id(
            interaction_id=interaction_id,
            body=custom_field_value,
        )
        return True
    except ApiException:
        return False


def generate_completion_report(results: dict, total_processed: int) -> dict:
    """Generate a structured archival completion report."""
    success_count = len(results["success"]) + len(results["retried"])
    failure_count = len(results["failed"])
    # Guard against division by zero when no interactions were processed
    success_rate = (success_count / total_processed) * 100 if total_processed else 0.0
    report = {
        "total_interactions": total_processed,
        "successful_archives": success_count,
        "failed_archives": failure_count,
        "retried_and_succeeded": len(results["retried"]),
        "success_rate": f"{success_rate:.2f}%",
        "archived_records": results["success"] + results["retried"],
        "failed_records": results["failed"],
        "generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }
    return report
The custom field update uses the CustomFieldsApi to set the archival_status value. Set the CXONE_ARCHIVAL_FIELD_ID environment variable to the custom field ID from your CXone instance. The report aggregates metrics for downstream monitoring systems.
Complete Working Example
import base64
import gzip
import json
import os
import time
import uuid
from typing import Optional, Tuple

import boto3
import httpx
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from nice_cxone_sdk import ApiClient, Configuration, OAuthClient
from nice_cxone_sdk.models import CustomFieldValue, QueryInteractionRequest
from nice_cxone_sdk.rest import ApiException


def initialize_cxone_client() -> ApiClient:
    config = Configuration(
        # Region-specific base URL; read from the environment, no default
        host=os.environ["CXONE_API_BASE"],
        client_id=os.getenv("CXONE_CLIENT_ID"),
        client_secret=os.getenv("CXONE_CLIENT_SECRET"),
        oauth_client_id=os.getenv("CXONE_CLIENT_ID"),
        oauth_client_secret=os.getenv("CXONE_CLIENT_SECRET"),
    )
    oauth_client = OAuthClient(
        client_id=config.client_id,
        client_secret=config.client_secret,
        host=config.host,
        http_client=httpx.Client(timeout=30.0),
    )
    return ApiClient(configuration=config, oauth_client=oauth_client)


def fetch_closed_interactions(api_client: ApiClient, page_size: int = 100) -> list:
    interactions_api = api_client.InteractionsApi()
    all_interactions = []
    query_filter = {
        "type": "interaction",
        "filter": "status:closed",
        "orderBy": "updatedTime desc",
        "pageSize": page_size,
    }
    request_body = QueryInteractionRequest(**query_filter)
    while True:
        try:
            response = interactions_api.query_interactions(body=request_body)
            all_interactions.extend(response.data)
            if not response.pagination or not response.pagination.next_page:
                break
            request_body.page_token = response.pagination.next_page
        except ApiException as e:
            if e.status == 429:
                # Rate limited: honor Retry-After, then retry the same page
                time.sleep(int(e.headers.get("Retry-After", 5)))
                continue
            raise
    return all_interactions


def extract_interaction_payload(interaction_id: str, api_client: ApiClient) -> Optional[dict]:
    interactions_api = api_client.InteractionsApi()
    try:
        interaction = interactions_api.get_interaction_by_id(interaction_id)
    except ApiException as e:
        if e.status in (404, 403):
            return None
        raise
    archival_record = {
        "interaction_id": interaction_id,
        "type": interaction.type,
        "status": interaction.status,
        "created_time": interaction.created_time.isoformat() if hasattr(interaction.created_time, 'isoformat') else str(interaction.created_time),
        "updated_time": interaction.updated_time.isoformat() if hasattr(interaction.updated_time, 'isoformat') else str(interaction.updated_time),
        "participants": [],
        "media_metadata": [],
    }
    if interaction.participants:
        for participant in interaction.participants:
            archival_record["participants"].append({
                "id": participant.id,
                "type": participant.type,
                "address": participant.address,
                "state": participant.state,
            })
    if interaction.media:
        for media_item in interaction.media:
            archival_record["media_metadata"].append({
                "media_id": media_item.id,
                "type": media_item.type,
                "duration_ms": media_item.duration_ms,
                "format": media_item.format,
                "playback_url": media_item.playback_url,
                "created_time": media_item.created_time.isoformat() if hasattr(media_item.created_time, 'isoformat') else str(media_item.created_time),
            })
    return archival_record


def derive_encryption_key(password: str, salt: bytes) -> bytes:
    kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=480000)
    return kdf.derive(password.encode())


def compress_and_encrypt_records(records: list[dict], encryption_password: str) -> bytes:
    json_lines = "".join(json.dumps(record, default=str) + "\n" for record in records)
    compressed_data = gzip.compress(json_lines.encode("utf-8"))
    salt = os.urandom(16)
    key = derive_encryption_key(encryption_password, salt)
    aesgcm = AESGCM(key)
    nonce = os.urandom(12)
    encrypted_data = aesgcm.encrypt(nonce, compressed_data, None)
    encrypted_package = {
        "salt": base64.b64encode(salt).decode("utf-8"),
        "nonce": base64.b64encode(nonce).decode("utf-8"),
        "ciphertext": base64.b64encode(encrypted_data).decode("utf-8"),
    }
    return json.dumps(encrypted_package).encode("utf-8")


def upload_archive_to_s3(s3_client: boto3.client, bucket: str, key_prefix: str, encrypted_payload: bytes, max_retries: int = 3) -> Tuple[str, bool]:
    file_key = f"{key_prefix}/archive_{uuid.uuid4().hex}.enc.gz"
    for attempt in range(max_retries):
        try:
            s3_client.put_object(Bucket=bucket, Key=file_key, Body=encrypted_payload, ServerSideEncryption="AES256", ContentType="application/octet-stream")
            return file_key, True
        except Exception:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
    return file_key, False


def update_archival_status(api_client: ApiClient, interaction_id: str, status: str) -> bool:
    custom_fields_api = api_client.CustomFieldsApi()
    try:
        custom_field_value = CustomFieldValue(field_id=os.getenv("CXONE_ARCHIVAL_FIELD_ID"), value=status)
        custom_fields_api.update_custom_field_value_by_id(interaction_id=interaction_id, body=custom_field_value)
        return True
    except ApiException:
        return False


def run_archival_pipeline():
    api_client = initialize_cxone_client()
    s3_client = boto3.client("s3", aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"))
    s3_bucket = os.getenv("S3_BUCKET_NAME")
    encryption_password = os.getenv("ARCHIVAL_ENCRYPTION_KEY")
    closed_interactions = fetch_closed_interactions(api_client, page_size=50)
    interaction_ids = [i.id for i in closed_interactions]
    results = {"success": [], "failed": [], "retried": []}
    for interaction_id in interaction_ids:
        payload = extract_interaction_payload(interaction_id, api_client)
        if not payload:
            results["failed"].append({"interaction_id": interaction_id, "reason": "fetch_failed"})
            continue
        encrypted = compress_and_encrypt_records([payload], encryption_password)
        s3_key, success = upload_archive_to_s3(s3_client, s3_bucket, "cxone/archives", encrypted)
        if success:
            results["success"].append({"interaction_id": interaction_id, "s3_key": s3_key})
        else:
            results["failed"].append({"interaction_id": interaction_id, "reason": "upload_failed"})
    # Retry failed uploads
    failed_ids = [r["interaction_id"] for r in results["failed"]]
    for interaction_id in failed_ids:
        payload = extract_interaction_payload(interaction_id, api_client)
        if not payload:
            continue
        encrypted = compress_and_encrypt_records([payload], encryption_password)
        s3_key, success = upload_archive_to_s3(s3_client, s3_bucket, "cxone/archives", encrypted)
        if success:
            results["retried"].append({"interaction_id": interaction_id, "s3_key": s3_key})
            results["failed"] = [r for r in results["failed"] if r["interaction_id"] != interaction_id]
    # Update status for successful and retried records
    for record in results["success"] + results["retried"]:
        update_archival_status(api_client, record["interaction_id"], "archived")
    success_count = len(results["success"]) + len(results["retried"])
    # Guard against division by zero when no closed interactions were found
    success_rate = (success_count / len(interaction_ids)) * 100 if interaction_ids else 0.0
    report = {
        "total_interactions": len(interaction_ids),
        "successful_archives": success_count,
        "failed_archives": len(results["failed"]),
        "retried_and_succeeded": len(results["retried"]),
        "success_rate": f"{success_rate:.2f}%",
        "archived_records": results["success"] + results["retried"],
        "failed_records": results["failed"],
        "generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }
    print(json.dumps(report, indent=2))


if __name__ == "__main__":
    run_archival_pipeline()
Common Errors & Debugging
Error: 401 Unauthorized or 403 Forbidden
- Cause: Missing or incorrect OAuth scopes, expired token, or client credentials lack permission to access interactions or custom fields.
- Fix: Verify that the OAuth client in CXone Admin Console includes interactions:read, recordings:read, and customfields:write. Ensure the client is not disabled. The SDK automatically refreshes tokens, but initial authentication fails if credentials are invalid.
- Code Fix: Validate environment variables before initialization. Add explicit scope verification during setup.
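The environment-variable validation mentioned above can be sketched as follows. The variable names are the ones read elsewhere in this tutorial; the function names missing_env_vars and require_env are hypothetical. AWS credentials are left out on the assumption that boto3 may resolve them from other sources such as a profile or an instance role.

```python
import os
from typing import Mapping, Optional

# Variables read by the tutorial code via os.getenv / os.environ.
REQUIRED_ENV_VARS = (
    "CXONE_API_BASE",
    "CXONE_CLIENT_ID",
    "CXONE_CLIENT_SECRET",
    "CXONE_ARCHIVAL_FIELD_ID",
    "S3_BUCKET_NAME",
    "ARCHIVAL_ENCRYPTION_KEY",
)


def missing_env_vars(env: Optional[Mapping[str, str]] = None) -> list:
    """Return the names of required variables that are unset or blank."""
    source = os.environ if env is None else env
    return [name for name in REQUIRED_ENV_VARS
            if not source.get(name, "").strip()]


def require_env() -> None:
    """Raise a single error listing every missing variable."""
    missing = missing_env_vars()
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
```

Calling require_env() at the top of run_archival_pipeline() turns a confusing 401 from a None client secret into an immediate, named configuration error.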
Error: 429 Too Many Requests
- Cause: CXone API rate limits are enforced per tenant and per endpoint. Polling interactions rapidly triggers throttling.
- Fix: Back off before retrying. The code above reads the Retry-After header and sleeps for that many seconds, falling back to 5 seconds when the header is absent. You must never poll faster than 10 requests per second for the interactions query endpoint.
- Code Fix: The fetch_closed_interactions function already handles 429 responses by sleeping for the Retry-After duration.
Error: 5xx Internal Server Error
- Cause: CXone backend transient failures or malformed query payloads.
- Fix: Retry the request with increasing delays. Do not retry more than three times. Log the request ID from the response headers for CXone support tickets.
- Code Fix: Wrap API calls in try-except blocks and implement a retry counter. The upload_archive_to_s3 function demonstrates the retry pattern.
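A retry counter for transient 5xx responses might look like the sketch below. TransientError is a stand-in exception defined here so the example runs on its own; in the pipeline you would catch the SDK's ApiException and inspect its status attribute instead. The name call_with_retries and the one-second base delay are assumptions.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Stand-in for an SDK exception that exposes an HTTP status code."""

    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status


def call_with_retries(func: Callable[[], T], max_retries: int = 3,
                      base_delay: float = 1.0,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func, retrying on 5xx statuses at most max_retries times.

    Delays grow as base_delay * 2**attempt. Non-5xx errors and the final
    failure are re-raised to the caller unchanged.
    """
    if max_retries < 0:
        raise ValueError("max_retries must be non-negative")
    attempt = 0
    while True:
        try:
            return func()
        except TransientError as exc:
            retryable = 500 <= exc.status < 600
            if not retryable or attempt == max_retries:
                raise
            sleep(base_delay * 2 ** attempt)
            attempt += 1
```

A call such as call_with_retries(lambda: interactions_api.get_interaction_by_id(interaction_id)) would apply the pattern to a single request once the except clause is adapted to ApiException. The injectable sleep argument keeps unit tests fast.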
Error: S3 Partial Upload or Timeout
- Cause: Network instability or large payloads exceeding default timeouts.
- Fix: Increase the boto3 client timeout values (the httpx timeout applies only to CXone calls). Consider multipart uploads for large payloads. The current implementation uses put_object, which works efficiently for compressed archives under 10 MB.
- Code Fix: Adjust the boto3 client configuration with config=botocore.config.Config(read_timeout=60, max_pool_connections=10).
Error: Custom Field Update Fails
- Cause: The custom field ID does not exist, the field is read-only, or the interaction is locked.
- Fix: Verify the custom field ID matches the archival_status field in CXone. Ensure the field allows programmatic updates. Interactions must be in a closed state before custom field updates are permitted.
- Code Fix: Catch ApiException status 400 or 403 and log the specific error message. Fall back to external tracking if the update fails.
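The external tracking fallback could be as simple as an append-only JSON lines file that a later run replays. The sketch below is one possible shape; the function names, the file layout, and the choice of a local file rather than a database or queue are all assumptions made for illustration.

```python
import json
import time
from pathlib import Path


def record_pending_status(path: Path, interaction_id: str,
                          status: str, error: str) -> None:
    """Append one status update that could not be written to CXone."""
    entry = {
        "interaction_id": interaction_id,
        "status": status,
        "error": error,
        "recorded_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }
    # Append mode keeps earlier entries; each entry is one JSON line.
    with path.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(entry) + "\n")


def load_pending_statuses(path: Path) -> list:
    """Read back all pending updates, or an empty list if none exist."""
    if not path.exists():
        return []
    with path.open("r", encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]
```

When update_archival_status returns False, the pipeline could call record_pending_status with the interaction ID and the error text, then retry the entries from load_pending_statuses on the next run.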