Aggregating NICE CXone Interaction Completion Events into InfluxDB Time-Series with Python
What You Will Build
- A Python consumer that connects to the NICE CXone Events API WebSocket, listens for interaction.completed payloads, and streams structured telemetry to InfluxDB.
- The application uses the CXone real-time events endpoint and the InfluxDB 2.x/3.x HTTP Write API.
- The implementation covers Python 3.10+ with httpx, websockets, and standard library concurrency primitives.
Prerequisites
- NICE CXone OAuth client credentials with view:interaction and read:interaction scopes
- CXone organization subdomain, exported as CXONE_ORG_DOMAIN (e.g., myorg for myorg.cxone.com; the code appends .cxone.com itself)
- InfluxDB 2.x/3.x instance with a configured bucket, organization, and API token
- Python 3.10+ runtime
- Dependencies: pip install httpx websockets
Authentication Setup
CXone uses the OAuth 2.0 client credentials flow. The token expires after a fixed window (typically 30 minutes), so the consumer must fetch a fresh token before opening the WebSocket connection and refresh it automatically when it receives a 401 Unauthorized response.
import os

import httpx

CXONE_TOKEN_URL = f"https://{os.getenv('CXONE_ORG_DOMAIN')}.cxone.com/oauth/token"
CXONE_CLIENT_ID = os.getenv("CXONE_CLIENT_ID")
CXONE_CLIENT_SECRET = os.getenv("CXONE_CLIENT_SECRET")

# Required OAuth scopes for event streaming
CXONE_SCOPES = "view:interaction read:interaction"

async def fetch_cxone_token() -> str:
    """
    HTTP POST /oauth/token
    Headers: Content-Type: application/x-www-form-urlencoded
    Body: grant_type=client_credentials&client_id={id}&client_secret={secret}&scope=view:interaction+read:interaction
    Response: {"access_token": "eyJhbG...", "token_type": "Bearer", "expires_in": 1800}
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            CXONE_TOKEN_URL,
            data={
                "grant_type": "client_credentials",
                "client_id": CXONE_CLIENT_ID,
                "client_secret": CXONE_CLIENT_SECRET,
                "scope": CXONE_SCOPES
            },
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        response.raise_for_status()
        payload = response.json()
        return payload["access_token"]
Token caching is handled in the main loop. The script tracks the token issuance time and requests a new token when the remaining lifetime drops below 120 seconds. This prevents mid-stream 401 disconnections.
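The caching rule described above can be sketched roughly as follows. This is a minimal illustration, not CXone's API or the tutorial's own implementation: the TokenCache class, its fetch callable, and the injectable clock are hypothetical names introduced here, and fetch is assumed to return the access token together with its expires_in value.

```python
import asyncio
import time
from typing import Awaitable, Callable


class TokenCache:
    """Caches a bearer token and refreshes it shortly before expiry (hypothetical helper)."""

    def __init__(
        self,
        fetch: Callable[[], Awaitable[tuple[str, float]]],
        refresh_margin: float = 120.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch            # assumed to return (access_token, expires_in_seconds)
        self._margin = refresh_margin  # refresh when fewer than this many seconds remain
        self._clock = clock
        self._token: str | None = None
        self._expires_at = 0.0

    async def get(self) -> str:
        # Refresh when nothing is cached or the remaining lifetime is below the margin
        if self._token is None or self._expires_at - self._clock() < self._margin:
            token, expires_in = await self._fetch()
            self._token = token
            self._expires_at = self._clock() + expires_in
        return self._token


async def _demo() -> list[str]:
    now = [0.0]
    calls = [0]

    async def fake_fetch() -> tuple[str, float]:
        # Stand-in for the real token request; no network access
        calls[0] += 1
        return f"token-{calls[0]}", 1800.0

    cache = TokenCache(fake_fetch, clock=lambda: now[0])
    seen = [await cache.get()]      # nothing cached yet, so this fetches
    now[0] = 1000.0
    seen.append(await cache.get())  # 800 s remain, so the cached token is reused
    now[0] = 1700.0
    seen.append(await cache.get())  # 100 s remain, below the 120 s margin, so it refetches
    return seen


demo_tokens = asyncio.run(_demo())
```

Injecting the clock keeps the refresh rule testable without waiting for a real token to age; in the consumer, the fetch callable would wrap the token request and pass through the expires_in field from the response.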
Implementation
Step 1: WebSocket Subscription & Event Consumption
CXone delivers real-time telemetry via WebSocket. The connection URI requires the bearer token as a query parameter. Upon connection, the client must send a subscription message specifying the event type. The server responds with a confirmation, then streams JSON payloads.
import asyncio
import json
import os

import websockets

CXONE_WS_URL = f"wss://{os.getenv('CXONE_ORG_DOMAIN')}.cxone.com/api/v2/events"

async def subscribe_to_events(token: str, event_queue: asyncio.Queue):
    uri = f"{CXONE_WS_URL}?access_token={token}"
    # Initial subscription message required by CXone Events API
    subscription_msg = json.dumps({"subscription": ["interaction.completed"]})
    while True:
        try:
            # HTTP Upgrade sequence handled automatically by websockets library
            async with websockets.connect(uri, ping_interval=20, ping_timeout=10) as ws:
                await ws.send(subscription_msg)
                print("Subscription request sent. Listening for interaction.completed events.")
                async for raw_msg in ws:
                    try:
                        event = json.loads(raw_msg)
                    except json.JSONDecodeError:
                        print("Warning: Malformed JSON from CXone WebSocket. Skipping.")
                        continue
                    # Enqueue for async processing to decouple network I/O from aggregation logic
                    await event_queue.put(event)
        except (websockets.exceptions.ConnectionClosed, OSError) as e:
            # Raised by the iteration or the connect call, not by json.loads
            print(f"WebSocket closed: {e}.")
        print("Reconnecting in 5 seconds.")
        await asyncio.sleep(5)
The websockets library handles ping/pong frames automatically. If the server closes the connection, the outer loop catches ConnectionClosed and reconnects after a short delay. Events already placed on the queue are unaffected by a reconnect, but this code does not replay events that the server emits while the client is disconnected.
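The decoupling between socket reads and aggregation can be tried out without a network connection. The sketch below is a toy under stated assumptions: fake_socket_reader stands in for the WebSocket loop, the "type" key in the sample messages is hypothetical, and the maxsize bound (which the tutorial code does not set) shows how a bounded queue would slow the reader down when aggregation falls behind.

```python
import asyncio
import json


async def fake_socket_reader(raw_messages: list[str], queue: asyncio.Queue) -> None:
    """Stand-in for the WebSocket loop: parse each frame and enqueue it."""
    for raw in raw_messages:
        try:
            event = json.loads(raw)
        except json.JSONDecodeError:
            continue  # skip malformed frames, as the consumer does
        await queue.put(event)  # waits here while the queue is full
    await queue.put(None)  # sentinel so the demo can terminate


async def aggregate(queue: asyncio.Queue) -> list[str]:
    """Stand-in for the aggregation side: drain the queue until the sentinel."""
    seen = []
    while (event := await queue.get()) is not None:
        seen.append(event["type"])
    return seen


async def _demo() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    raw = [
        '{"type": "interaction.completed"}',
        "not json",
        '{"type": "interaction.completed"}',
    ]
    _, seen = await asyncio.gather(fake_socket_reader(raw, queue), aggregate(queue))
    return seen


processed = asyncio.run(_demo())
```

Whether a bound is appropriate depends on how long the reader can afford to stall before the server considers the client unresponsive, which is not something this sketch can answer.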
Step 2: Hash Map Grouping & Percentile Calculation
Events arrive out of order and at variable rates. The consumer groups them by hour and campaign ID using a dictionary keyed on (hour_bucket, campaign_id). Each bucket maintains lists of handle times and wait times. Once a bucket's hour has elapsed, the system calculates percentiles and flushes the data.
from collections import defaultdict
from datetime import datetime, timezone
from statistics import quantiles

class MetricAggregator:
    def __init__(self):
        # Structure: {(hour_bucket, campaign_id): {"handle_times": [], "wait_times": [], "count": 0}}
        self.buckets = defaultdict(lambda: {"handle_times": [], "wait_times": [], "count": 0})
        self.flushed_keys = set()

    def add_event(self, event: dict):
        payload = event.get("payload", {})
        campaign_id = payload.get("campaignId")
        handle_time = payload.get("handleTime")
        wait_time = payload.get("waitTime")
        timestamp_str = event.get("timestamp") or payload.get("completionTime")
        if not all([campaign_id, handle_time is not None, wait_time is not None, timestamp_str]):
            return
        # Truncate to the UTC hour, e.g. "2024-05-20T14:00:00+00:00"
        dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")).astimezone(timezone.utc)
        hour_bucket = dt.replace(minute=0, second=0, microsecond=0).isoformat()
        key = (hour_bucket, campaign_id)
        if key in self.flushed_keys:
            # This hour was already exported; late events for it are dropped
            return
        self.buckets[key]["handle_times"].append(float(handle_time))
        self.buckets[key]["wait_times"].append(float(wait_time))
        self.buckets[key]["count"] += 1

    def calculate_percentiles(self, values: list[float], percentiles: list[int]) -> dict:
        if not values:
            return {p: 0.0 for p in percentiles}
        if len(values) == 1:
            # statistics.quantiles rejects a single data point on some Python versions
            return {p: values[0] for p in percentiles}
        # n must be an integer: n=100 yields 99 cut points, so cuts[p - 1] is the p-th percentile.
        # The inclusive method interpolates linearly between the closest observations.
        cuts = quantiles(values, n=100, method="inclusive")
        return {p: cuts[p - 1] for p in percentiles}

    def prepare_flush(self) -> list[tuple]:
        """Returns list of (key, metrics_dict) ready for InfluxDB export."""
        ready = []
        for key, data in self.buckets.items():
            if data["count"] == 0:
                continue
            p_handle = self.calculate_percentiles(data["handle_times"], [50, 90, 95])
            p_wait = self.calculate_percentiles(data["wait_times"], [50, 90, 95])
            ready.append((key, {"handle": p_handle, "wait": p_wait, "count": data["count"]}))
        return ready
The hash map avoids duplicate processing by tracking flushed keys: once a (hour, campaign) bucket has been exported, later events for that key are ignored. Percentile calculation uses statistics.quantiles with the inclusive method, which treats the smallest and largest observations as the 0th and 100th percentiles and interpolates linearly between neighbouring values. The aggregation happens in memory, keeping latency low.
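As a rough check of how the bucketing and the inclusive method behave, the sketch below truncates one timestamp to its hour and computes the three exported percentiles for a synthetic sample. The hour_key and percentiles helpers and the sample values are illustrative assumptions, not part of the CXone payload or the aggregator class.

```python
from datetime import datetime, timezone
from statistics import quantiles


def hour_key(timestamp: str, campaign_id: str) -> tuple[str, str]:
    """Truncates an ISO 8601 timestamp to its UTC hour and pairs it with the campaign (hypothetical helper)."""
    dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00")).astimezone(timezone.utc)
    return dt.replace(minute=0, second=0, microsecond=0).isoformat(), campaign_id


def percentiles(values: list[float], wanted: list[int]) -> dict[int, float]:
    """Returns the requested whole-number percentiles using the inclusive method."""
    if not values:
        return {p: 0.0 for p in wanted}
    if len(values) == 1:
        return {p: float(values[0]) for p in wanted}
    # n=100 gives 99 cut points; index p - 1 holds the p-th percentile
    cuts = quantiles(values, n=100, method="inclusive")
    return {p: cuts[p - 1] for p in wanted}


key = hour_key("2024-05-20T14:37:12Z", "camp_xyz")
# key == ("2024-05-20T14:00:00+00:00", "camp_xyz")

sample = [float(v) for v in range(101)]  # synthetic handle times 0.0 .. 100.0
result = percentiles(sample, [50, 90, 95])
# result == {50: 50.0, 90: 90.0, 95: 95.0}
```

With 101 evenly spaced points the inclusive cut points land exactly on observations, which makes the indexing easy to verify by eye; real handle times will generally fall between observations and be interpolated.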
Step 3: InfluxDB HTTP API Export
InfluxDB accepts time-series data via the HTTP Write API. The payload uses line protocol format. The client must handle 429 Too Many Requests with exponential backoff and retry failed batches.
import asyncio
import os
from datetime import datetime

import httpx

INFLUX_URL = os.getenv("INFLUX_URL")
INFLUX_TOKEN = os.getenv("INFLUX_TOKEN")
INFLUX_ORG = os.getenv("INFLUX_ORG")
INFLUX_BUCKET = os.getenv("INFLUX_BUCKET")

async def write_to_influxdb(flush_items: list[tuple], client: httpx.AsyncClient):
    # HTTP POST /api/v2/write?bucket={bucket}&org={org}&precision=s
    # Headers: Authorization: Token {token}, Content-Type: text/plain
    # Body: cxone_metrics,campaign_id=camp_xyz hour_bucket="2024-05-20T14:00:00+00:00",p50_handle=120.5,p90_handle=150.2,p95_wait=80.1,count=42i 1716213600
    url = f"{INFLUX_URL}/api/v2/write"
    params = {"bucket": INFLUX_BUCKET, "org": INFLUX_ORG, "precision": "s"}
    headers = {
        "Authorization": f"Token {INFLUX_TOKEN}",
        "Content-Type": "text/plain"
    }
    lines = []
    for (hour_bucket, campaign_id), metrics in flush_items:
        ts = int(datetime.fromisoformat(hour_bucket).timestamp())
        # Fields are comma-separated; spaces only separate tags, fields, and timestamp
        fields = ",".join([
            f'hour_bucket="{hour_bucket}"',
            f"p50_handle={metrics['handle'][50]}",
            f"p90_handle={metrics['handle'][90]}",
            f"p95_handle={metrics['handle'][95]}",
            f"p50_wait={metrics['wait'][50]}",
            f"p90_wait={metrics['wait'][90]}",
            f"p95_wait={metrics['wait'][95]}",
            f"count={metrics['count']}i",
        ])
        lines.append(f"cxone_metrics,campaign_id={campaign_id} {fields} {ts}")
    if not lines:
        return
    payload = "\n".join(lines)
    retries = 0
    max_retries = 3
    while retries < max_retries:
        try:
            response = await client.post(url, params=params, content=payload, headers=headers)
            if response.status_code == 204:
                print(f"Successfully exported {len(lines)} line protocol records to InfluxDB.")
                return
            elif response.status_code == 429:
                wait_time = 2 ** retries
                print(f"InfluxDB rate limited (429). Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
                retries += 1
            else:
                print(f"InfluxDB write failed with {response.status_code}: {response.text}")
                # raise_for_status() builds the HTTPStatusError with request and response attached
                response.raise_for_status()
                return
        except httpx.RequestError as e:
            print(f"Network error writing to InfluxDB: {e}. Retrying...")
            await asyncio.sleep(2 ** retries)
            retries += 1
    # Raise so callers do not mark the batch as exported
    raise RuntimeError("Failed to write to InfluxDB after maximum retries.")
Line protocol separates the measurement and tags, the comma-separated field set, and the timestamp with single spaces. String field values must be double-quoted and integers carry an i suffix. The retry loop handles transient network failures and rate limits. Production deployments should adjust max_retries and backoff multipliers based on InfluxDB cluster capacity.
Step 4: Checkpointing & Restart Resilience
Process restarts cause in-memory buckets to vanish. The consumer persists processed bucket keys and the last successful flush timestamp to a JSON file. On startup, it loads this state and filters out already-exported keys. Atomic file writes prevent corruption during crashes.
import json
import os
import tempfile

CHECKPOINT_FILE = "checkpoint.json"
EPOCH = "1970-01-01T00:00:00+00:00"

async def save_checkpoint(flushed_keys: set, last_flush_time: str):
    data = {
        # (hour_bucket, campaign_id) tuples are stored as JSON arrays
        "flushed_keys": [list(key) for key in flushed_keys],
        "last_flush_time": last_flush_time
    }
    # Atomic write pattern: write to temp file, flush to disk, then rename
    dir_name = os.path.dirname(os.path.abspath(CHECKPOINT_FILE))
    fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    try:
        with os.fdopen(fd, 'w') as tmp_file:
            json.dump(data, tmp_file)
            tmp_file.flush()
            os.fsync(tmp_file.fileno())
        os.replace(tmp_path, CHECKPOINT_FILE)
    except Exception:
        os.unlink(tmp_path)
        raise

async def load_checkpoint() -> tuple[set, str]:
    if not os.path.exists(CHECKPOINT_FILE):
        return set(), EPOCH
    try:
        with open(CHECKPOINT_FILE, 'r') as f:
            data = json.load(f)
        # JSON arrays load as lists, which are unhashable; convert back to tuples
        flushed_keys = {tuple(key) for key in data.get("flushed_keys", [])}
        return flushed_keys, data.get("last_flush_time", EPOCH)
    except (json.JSONDecodeError, OSError, TypeError, AttributeError) as e:
        print(f"Checkpoint corrupted or unreadable: {e}. Starting fresh.")
        return set(), EPOCH
The checkpoint file stores the exact keys that have been exported. When the consumer restarts, it populates flushed_keys from disk. New events matching those keys are ignored, preventing duplicate writes. The atomic replace pattern guarantees the checkpoint is either fully written or not written at all.
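One detail is worth illustrating: JSON has no tuple or set type, so (hour, campaign) keys come back from disk as two-element lists and have to be converted to tuples before they can go into a set. A minimal round-trip sketch, using the hypothetical helper names dump_keys and load_keys and a temporary directory rather than the real checkpoint path:

```python
import json
import os
import tempfile


def dump_keys(path: str, keys: set) -> None:
    """Writes the keys atomically: temp file in the same directory, fsync, then rename."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp_file:
            json.dump({"flushed_keys": sorted(keys)}, tmp_file)  # tuples become JSON arrays
            tmp_file.flush()
            os.fsync(tmp_file.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


def load_keys(path: str) -> set:
    """Reads the keys back, restoring each JSON array to a hashable tuple."""
    with open(path) as f:
        data = json.load(f)
    return {tuple(item) for item in data.get("flushed_keys", [])}


with tempfile.TemporaryDirectory() as workdir:
    checkpoint_path = os.path.join(workdir, "checkpoint.json")
    original = {
        ("2024-05-20T14:00:00+00:00", "camp_xyz"),
        ("2024-05-20T15:00:00+00:00", "camp_abc"),
    }
    dump_keys(checkpoint_path, original)
    restored = load_keys(checkpoint_path)
    leftover_tmp = [name for name in os.listdir(workdir) if name.endswith(".tmp")]
```

After the round trip, restored compares equal to original and no temporary file is left behind, which is the property the restart logic relies on.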
Complete Working Example
import asyncio
import httpx
import websockets
import json
import os
import tempfile
from collections import defaultdict
from datetime import datetime, timezone
from statistics import quantiles
# Configuration via environment variables
CXONE_ORG_DOMAIN = os.getenv("CXONE_ORG_DOMAIN")
CXONE_CLIENT_ID = os.getenv("CXONE_CLIENT_ID")
CXONE_CLIENT_SECRET = os.getenv("CXONE_CLIENT_SECRET")
INFLUX_URL = os.getenv("INFLUX_URL")
INFLUX_TOKEN = os.getenv("INFLUX_TOKEN")
INFLUX_ORG = os.getenv("INFLUX_ORG")
INFLUX_BUCKET = os.getenv("INFLUX_BUCKET")
CHECKPOINT_FILE = "checkpoint.json"
FLUSH_INTERVAL_SECONDS = 3600
CXONE_TOKEN_URL = f"https://{CXONE_ORG_DOMAIN}.cxone.com/oauth/token"
CXONE_WS_URL = f"wss://{CXONE_ORG_DOMAIN}.cxone.com/api/v2/events"
CXONE_SCOPES = "view:interaction read:interaction"
async def fetch_cxone_token() -> str:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            CXONE_TOKEN_URL,
            data={
                "grant_type": "client_credentials",
                "client_id": CXONE_CLIENT_ID,
                "client_secret": CXONE_CLIENT_SECRET,
                "scope": CXONE_SCOPES
            },
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        response.raise_for_status()
        return response.json()["access_token"]
class MetricAggregator:
    def __init__(self):
        self.buckets = defaultdict(lambda: {"handle_times": [], "wait_times": [], "count": 0})
        self.flushed_keys = set()

    def add_event(self, event: dict):
        payload = event.get("payload", {})
        campaign_id = payload.get("campaignId")
        handle_time = payload.get("handleTime")
        wait_time = payload.get("waitTime")
        timestamp_str = event.get("timestamp") or payload.get("completionTime")
        if not all([campaign_id, handle_time is not None, wait_time is not None, timestamp_str]):
            return
        # Truncate to the UTC hour
        dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")).astimezone(timezone.utc)
        hour_bucket = dt.replace(minute=0, second=0, microsecond=0).isoformat()
        key = (hour_bucket, campaign_id)
        if key in self.flushed_keys:
            return
        self.buckets[key]["handle_times"].append(float(handle_time))
        self.buckets[key]["wait_times"].append(float(wait_time))
        self.buckets[key]["count"] += 1

    def calculate_percentiles(self, values: list[float], percentiles: list[int]) -> dict:
        if not values:
            return {p: 0.0 for p in percentiles}
        if len(values) == 1:
            return {p: values[0] for p in percentiles}
        # n=100 yields 99 cut points; cuts[p - 1] is the p-th percentile
        cuts = quantiles(values, n=100, method="inclusive")
        return {p: cuts[p - 1] for p in percentiles}

    def prepare_flush(self) -> list[tuple]:
        ready = []
        for key, data in self.buckets.items():
            if data["count"] == 0:
                continue
            p_handle = self.calculate_percentiles(data["handle_times"], [50, 90, 95])
            p_wait = self.calculate_percentiles(data["wait_times"], [50, 90, 95])
            ready.append((key, {"handle": p_handle, "wait": p_wait, "count": data["count"]}))
        return ready
async def write_to_influxdb(flush_items: list[tuple], client: httpx.AsyncClient):
    url = f"{INFLUX_URL}/api/v2/write"
    params = {"bucket": INFLUX_BUCKET, "org": INFLUX_ORG, "precision": "s"}
    headers = {"Authorization": f"Token {INFLUX_TOKEN}", "Content-Type": "text/plain"}
    lines = []
    for (hour_bucket, campaign_id), metrics in flush_items:
        ts = int(datetime.fromisoformat(hour_bucket).timestamp())
        # Fields are comma-separated; spaces only separate tags, fields, and timestamp
        fields = ",".join([
            f'hour_bucket="{hour_bucket}"',
            f"p50_handle={metrics['handle'][50]}",
            f"p90_handle={metrics['handle'][90]}",
            f"p95_handle={metrics['handle'][95]}",
            f"p50_wait={metrics['wait'][50]}",
            f"p90_wait={metrics['wait'][90]}",
            f"p95_wait={metrics['wait'][95]}",
            f"count={metrics['count']}i",
        ])
        lines.append(f"cxone_metrics,campaign_id={campaign_id} {fields} {ts}")
    if not lines:
        return
    payload = "\n".join(lines)
    retries = 0
    max_retries = 3
    while retries < max_retries:
        try:
            response = await client.post(url, params=params, content=payload, headers=headers)
            if response.status_code == 204:
                print(f"Successfully exported {len(lines)} line protocol records to InfluxDB.")
                return
            elif response.status_code == 429:
                wait_time = 2 ** retries
                print(f"InfluxDB rate limited (429). Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
                retries += 1
            else:
                print(f"InfluxDB write failed with {response.status_code}: {response.text}")
                # raise_for_status() builds the HTTPStatusError with request and response attached
                response.raise_for_status()
                return
        except httpx.RequestError as e:
            print(f"Network error writing to InfluxDB: {e}. Retrying...")
            await asyncio.sleep(2 ** retries)
            retries += 1
    # Raise so callers do not mark the batch as exported
    raise RuntimeError("Failed to write to InfluxDB after maximum retries.")
async def save_checkpoint(flushed_keys: set, last_flush_time: str):
    # (hour_bucket, campaign_id) tuples are stored as JSON arrays
    data = {"flushed_keys": [list(key) for key in flushed_keys], "last_flush_time": last_flush_time}
    dir_name = os.path.dirname(os.path.abspath(CHECKPOINT_FILE))
    fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    try:
        with os.fdopen(fd, 'w') as tmp_file:
            json.dump(data, tmp_file)
            tmp_file.flush()
            os.fsync(tmp_file.fileno())
        os.replace(tmp_path, CHECKPOINT_FILE)
    except Exception:
        os.unlink(tmp_path)
        raise

async def load_checkpoint() -> tuple[set, str]:
    epoch = "1970-01-01T00:00:00+00:00"
    if not os.path.exists(CHECKPOINT_FILE):
        return set(), epoch
    try:
        with open(CHECKPOINT_FILE, 'r') as f:
            data = json.load(f)
        # JSON arrays load as lists, which are unhashable; convert back to tuples
        flushed_keys = {tuple(key) for key in data.get("flushed_keys", [])}
        return flushed_keys, data.get("last_flush_time", epoch)
    except (json.JSONDecodeError, OSError, TypeError, AttributeError) as e:
        print(f"Checkpoint corrupted or unreadable: {e}. Starting fresh.")
        return set(), epoch
async def main():
    print("Loading checkpoint...")
    flushed_keys, last_flush_time = await load_checkpoint()
    aggregator = MetricAggregator()
    aggregator.flushed_keys = flushed_keys
    token = await fetch_cxone_token()
    # Assumes the 1800 s lifetime shown in the token response; refresh 120 s early
    token_expiry = datetime.now(timezone.utc).timestamp() + 1680
    event_queue = asyncio.Queue()

    async def producer():
        nonlocal token, token_expiry
        while True:
            try:
                if datetime.now(timezone.utc).timestamp() > token_expiry:
                    print("Token expiring. Refreshing...")
                    token = await fetch_cxone_token()
                    token_expiry = datetime.now(timezone.utc).timestamp() + 1680
                async with websockets.connect(f"{CXONE_WS_URL}?access_token={token}", ping_interval=20) as ws:
                    await ws.send(json.dumps({"subscription": ["interaction.completed"]}))
                    async for raw_msg in ws:
                        try:
                            await event_queue.put(json.loads(raw_msg))
                        except json.JSONDecodeError:
                            continue
            except websockets.exceptions.ConnectionClosed as e:
                print(f"WebSocket closed: {e}.")
            except Exception as e:
                print(f"Connection or token refresh failed: {e}")
            # Pause before reconnecting
            await asyncio.sleep(5)

    async def consumer():
        while True:
            event = await event_queue.get()
            try:
                aggregator.add_event(event)
            except (ValueError, TypeError, AttributeError) as e:
                print(f"Skipping unparseable event: {e}")

    async def flusher(influx_client: httpx.AsyncClient):
        while True:
            await asyncio.sleep(FLUSH_INTERVAL_SECONDS)
            current_hour = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
            # Export only buckets whose hour has fully elapsed; open buckets keep accumulating
            flush_items = [
                item for item in aggregator.prepare_flush()
                if datetime.fromisoformat(item[0][0]) < current_hour
            ]
            if not flush_items:
                continue
            try:
                await write_to_influxdb(flush_items, influx_client)
            except Exception as e:
                print(f"Flush failed, keeping buckets for the next attempt: {e}")
                continue
            for key, _ in flush_items:
                aggregator.flushed_keys.add(key)
                aggregator.buckets.pop(key, None)
            current_time = datetime.now(timezone.utc).isoformat()
            await save_checkpoint(aggregator.flushed_keys, current_time)

    async with httpx.AsyncClient() as influx_client:
        await asyncio.gather(producer(), consumer(), flusher(influx_client))

if __name__ == "__main__":
    asyncio.run(main())
Common Errors & Debugging
Error: 401 Unauthorized on WebSocket Handshake
- What causes it: The bearer token has expired or the OAuth client lacks the view:interaction scope. CXone rejects the connection immediately.
- How to fix it: Verify the token generation response contains a valid JWT. Check the scope claim in the decoded token. Implement proactive token refresh 120 seconds before expiry as shown in the producer loop.
- Code showing the fix: The producer() coroutine tracks token_expiry and fetches a new token before it opens the next WebSocket connection.
Error: 429 Too Many Requests from InfluxDB
- What causes it: The consumer flushes too frequently or sends oversized batches. InfluxDB enforces write throughput limits per organization.
- How to fix it: Increase FLUSH_INTERVAL_SECONDS or implement batch size limits. The retry loop in write_to_influxdb already applies exponential backoff. Add an await asyncio.sleep() between batches if writing to a constrained cluster.
- Code showing the fix: The while retries < max_retries block checks response.status_code == 429 and sleeps for 2 ** retries seconds before retrying.
Error: Malformed Line Protocol or 400 Bad Request
- What causes it: Tag values containing commas, spaces, or equals signs, or string field values containing double quotes or backslashes, are not escaped. InfluxDB parsers reject unescaped special characters.
- How to fix it: In tag keys, tag values, and field keys, escape commas with \,, equals signs with \=, and spaces with a backslash followed by the space. In string field values, escape double quotes with \" and backslashes with \\. The example assumes campaign IDs and hour buckets are alphanumeric, but production code should sanitize strings before formatting.
- Code showing the fix: Replace raw string interpolation with helper functions that escape special characters. For tag values: value.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ "). For string field values: value.replace("\\", "\\\\").replace("\"", "\\\"").
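The escaping rules above can be collected into small helpers. The sketch below is one possible arrangement, not an official client: escape_tag, escape_string_field, and build_line are hypothetical names, the campaign name in the usage lines is made up, and the measurement name is assumed to contain no commas or spaces.

```python
def escape_tag(value: str) -> str:
    """Escapes a tag key, tag value, or field key: commas, equals signs, and spaces."""
    return value.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ")


def escape_string_field(value: str) -> str:
    """Escapes backslashes and double quotes, then wraps the value in double quotes."""
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def build_line(measurement: str, tags: dict[str, str], fields: dict[str, object], timestamp: int) -> str:
    """Formats one line protocol record: measurement,tags fields timestamp."""
    tag_part = "".join(f",{escape_tag(k)}={escape_tag(v)}" for k, v in sorted(tags.items()))
    field_parts = []
    for name, value in fields.items():
        if isinstance(value, bool):      # check bool first: bool is a subclass of int
            rendered = "true" if value else "false"
        elif isinstance(value, int):
            rendered = f"{value}i"       # integers carry the i suffix
        elif isinstance(value, float):
            rendered = repr(value)
        else:
            rendered = escape_string_field(str(value))
        field_parts.append(f"{escape_tag(name)}={rendered}")
    return f"{measurement}{tag_part} {','.join(field_parts)} {timestamp}"


escaped_campaign = escape_tag("Spring Sale, EU=1")
line = build_line(
    "cxone_metrics",
    {"campaign_id": "camp xyz"},
    {"p50_handle": 120.5, "count": 42, "hour_bucket": "2024-05-20T14:00:00+00:00"},
    1716213600,
)
print(line)
```

Keeping the two escape functions separate matters because the rules differ: quoting a tag value or backslash-escaping a comma inside a quoted string field would both change the stored data.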
Error: Checkpoint File Corruption on Crash
- What causes it: The process terminates during json.dump(), leaving a partially written file. Subsequent loads fail with JSONDecodeError.
- How to fix it: Use atomic file replacement. Write to a temporary file in the same directory, flush, then rename. The save_checkpoint function implements this pattern. If corruption occurs, the load_checkpoint function catches the exception and resets to an empty state, preventing infinite crash loops.