Aggregating NICE CXone Interaction Completion Events into InfluxDB Time-Series with Python

What You Will Build

  • A Python consumer that connects to the NICE CXone Events API WebSocket, listens for interaction.completed payloads, and streams structured telemetry to InfluxDB.
  • The application uses the CXone real-time events endpoint and the InfluxDB 2.x/3.x HTTP Write API.
  • The implementation covers Python 3.10+ with httpx, websockets, and standard library concurrency primitives.

Prerequisites

  • NICE CXone OAuth client credentials with view:interaction and read:interaction scopes
  • CXone organization subdomain (e.g., myorg for myorg.cxone.com), exported as CXONE_ORG_DOMAIN; the code appends .cxone.com itself
  • InfluxDB 2.x/3.x instance with a configured bucket, organization, and API token
  • Python 3.10+ runtime
  • Dependencies: pip install httpx websockets

Authentication Setup

CXone uses OAuth 2.0 client credentials flow. The token expires after a fixed window (typically 30 minutes), so the consumer must fetch a fresh token before WebSocket connection and handle 401 Unauthorized responses by refreshing automatically.

import httpx
import os
import time
from datetime import datetime, timezone

CXONE_TOKEN_URL = f"https://{os.getenv('CXONE_ORG_DOMAIN')}.cxone.com/oauth/token"
CXONE_CLIENT_ID = os.getenv("CXONE_CLIENT_ID")
CXONE_CLIENT_SECRET = os.getenv("CXONE_CLIENT_SECRET")

# Required OAuth scopes for event streaming
CXONE_SCOPES = "view:interaction read:interaction"

async def fetch_cxone_token() -> str:
    """
    HTTP POST /oauth/token
    Headers: Content-Type: application/x-www-form-urlencoded
    Body: grant_type=client_credentials&client_id={id}&client_secret={secret}&scope=view:interaction+read:interaction
    Response: {"access_token": "eyJhbG...", "token_type": "Bearer", "expires_in": 1800}
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            CXONE_TOKEN_URL,
            data={
                "grant_type": "client_credentials",
                "client_id": CXONE_CLIENT_ID,
                "client_secret": CXONE_CLIENT_SECRET,
                "scope": CXONE_SCOPES
            },
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        response.raise_for_status()
        payload = response.json()
        return payload["access_token"]

Token caching is handled in the main loop. The script tracks the token issuance time and requests a new token when the remaining lifetime drops below 120 seconds. This prevents mid-stream 401 disconnections.
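
The refresh rule described above can be sketched separately from the network call. The class below is a minimal illustration rather than part of the original script: TokenCache, store, needs_refresh, and the injectable clock parameter are hypothetical names, and the 1800-second lifetime is taken from the sample token response shown earlier.

```python
import time
from typing import Callable, Optional


class TokenCache:
    """Tracks when a token expires and reports when it should be refreshed."""

    def __init__(self, refresh_margin: float = 120.0,
                 clock: Callable[[], float] = time.monotonic):
        self.refresh_margin = refresh_margin
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def store(self, token: str, expires_in: float) -> None:
        # expires_in comes from the token response (1800 in the sample above)
        self._token = token
        self._expires_at = self._clock() + expires_in

    def needs_refresh(self) -> bool:
        # True when nothing is cached or fewer than refresh_margin seconds remain
        if self._token is None:
            return True
        return self._expires_at - self._clock() < self.refresh_margin

    @property
    def token(self) -> Optional[str]:
        return self._token
```

A caller would check needs_refresh() before each connection attempt and call store() with the access_token and expires_in values from a fresh token response.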

Implementation

Step 1: WebSocket Subscription & Event Consumption

CXone delivers real-time telemetry via WebSocket. The connection URI requires the bearer token as a query parameter. Upon connection, the client must send a subscription message specifying the event type. The server responds with a confirmation, then streams JSON payloads.

import websockets
import json
import asyncio

CXONE_WS_URL = f"wss://{os.getenv('CXONE_ORG_DOMAIN')}.cxone.com/api/v2/events"

async def subscribe_to_events(token: str, event_queue: asyncio.Queue):
    uri = f"{CXONE_WS_URL}?access_token={token}"
    # HTTP Upgrade sequence handled automatically by websockets library
    # Initial subscription message required by CXone Events API
    subscription_msg = json.dumps({"subscription": ["interaction.completed"]})

    try:
        async with websockets.connect(uri, ping_interval=20, ping_timeout=10) as ws:
            await ws.send(subscription_msg)
            print("Subscription sent. Listening for interaction.completed events.")

            async for raw_msg in ws:
                try:
                    event = json.loads(raw_msg)
                except json.JSONDecodeError:
                    print("Warning: Malformed JSON from CXone WebSocket. Skipping.")
                    continue
                # Enqueue for async processing to decouple network I/O from aggregation logic
                await event_queue.put(event)
    except websockets.exceptions.ConnectionClosed as e:
        # Raised by the message iterator on an abnormal close, so it must be
        # caught outside the loop body; the caller reconnects after the pause
        print(f"WebSocket closed: {e}. Reconnecting in 5 seconds.")
        await asyncio.sleep(5)

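The websockets library handles ping/pong frames automatically. When the connection closes abnormally, the function catches ConnectionClosed, waits five seconds, and returns so that a surrounding loop (see the complete example) can reconnect. Events already placed on the queue survive the reconnect; events emitted while the socket is down are missed unless the server replays them.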
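
A fixed five-second pause suits brief outages, but reconnect loops are often paired with capped exponential backoff. The generator below is a sketch of that idea; reconnect_delays, base, and cap are illustrative names and values, not CXone requirements.

```python
from itertools import islice
from typing import Iterator


def reconnect_delays(base: float = 1.0, cap: float = 60.0) -> Iterator[float]:
    """Yield base, 2*base, 4*base, ... seconds, never exceeding cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * 2, cap)


# Show the first eight delays for the default settings
print(list(islice(reconnect_delays(), 8)))
```

A caller would create a fresh generator after each successful connection so that the delay starts again from base.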

Step 2: Hash Map Grouping & Percentile Calculation

Events arrive out of order and at variable rates. The consumer groups them by hour and campaign ID using a dictionary keyed on (hour_bucket, campaign_id). Each bucket maintains lists of handle times and wait times. Once a bucket's hour has elapsed, the consumer calculates percentiles and flushes the data.

from collections import defaultdict
from statistics import quantiles
from datetime import datetime, timezone

class MetricAggregator:
    def __init__(self):
        # Structure: {(hour_bucket, campaign_id): {"handle_times": [], "wait_times": [], "count": 0}}
        self.buckets = defaultdict(lambda: {"handle_times": [], "wait_times": [], "count": 0})
        self.flushed_keys = set()
        
    def add_event(self, event: dict):
        payload = event.get("payload", {})
        campaign_id = payload.get("campaignId")
        handle_time = payload.get("handleTime")
        wait_time = payload.get("waitTime")
        timestamp_str = event.get("timestamp") or payload.get("completionTime")
        
        if not all([campaign_id, handle_time is not None, wait_time is not None, timestamp_str]):
            return
            
        # Truncate to hourly bucket: "2024-05-20T14:00:00Z"
        dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
        hour_bucket = dt.replace(minute=0, second=0, microsecond=0).isoformat()
        
        key = (hour_bucket, campaign_id)
        if key in self.flushed_keys:
            return
            
        self.buckets[key]["handle_times"].append(float(handle_time))
        self.buckets[key]["wait_times"].append(float(wait_time))
        self.buckets[key]["count"] += 1
        
    def calculate_percentiles(self, values: list[float], percentiles: list[int]) -> dict:
        if not values:
            return {p: 0.0 for p in percentiles}
        if len(values) == 1:
            # statistics.quantiles needs at least two data points on older Python versions
            return {p: values[0] for p in percentiles}
        # n must be an integer: n=100 yields 99 linearly interpolated cut points,
        # and percentile p sits at index p - 1
        cuts = quantiles(values, n=100, method="inclusive")
        return {p: cuts[p - 1] for p in percentiles}
        
    def prepare_flush(self) -> list[tuple]:
        """Returns list of (key, metrics_dict) ready for InfluxDB export."""
        ready = []
        for key, data in self.buckets.items():
            if data["count"] == 0:
                continue
            p_handle = self.calculate_percentiles(data["handle_times"], [50, 90, 95])
            p_wait = self.calculate_percentiles(data["wait_times"], [50, 90, 95])
            ready.append((key, {"handle": p_handle, "wait": p_wait, "count": data["count"]}))
        return ready

The hash map avoids duplicate processing by tracking flushed keys. Percentile calculation uses statistics.quantiles with the inclusive method, which treats the collected samples as the full population, so the smallest and largest values act as the 0th and 100th percentiles. The aggregation happens in memory, keeping latency low.
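
To make the bucketing and percentile steps concrete, here is a small standalone sketch. The helper names hour_bucket and percentile_cuts and the sample data are illustrative assumptions; only datetime.fromisoformat and statistics.quantiles come from the standard library.

```python
from datetime import datetime
from statistics import quantiles


def hour_bucket(timestamp: str) -> str:
    """Truncate an ISO 8601 timestamp to the start of its hour."""
    dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    return dt.replace(minute=0, second=0, microsecond=0).isoformat()


def percentile_cuts(values: list[float], wanted: list[int]) -> dict[int, float]:
    """Return the requested percentiles using the inclusive method."""
    # n=100 produces 99 cut points; the p-th percentile sits at index p - 1
    cuts = quantiles(values, n=100, method="inclusive")
    return {p: cuts[p - 1] for p in wanted}


# Illustrative data: handle times of 0, 1, ..., 100 seconds
sample = [float(v) for v in range(101)]
print(hour_bucket("2024-05-20T14:37:12Z"))
print(percentile_cuts(sample, [50, 90, 95]))
```

With 101 evenly spaced samples, each cut point lands exactly on a data value, which makes the result easy to check by hand.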

Step 3: InfluxDB HTTP API Export

InfluxDB accepts time-series data via the HTTP Write API. The payload uses line protocol format. The client must handle 429 Too Many Requests with exponential backoff and retry failed batches.

import asyncio
import httpx
import os
from datetime import datetime

INFLUX_URL = os.getenv("INFLUX_URL")
INFLUX_TOKEN = os.getenv("INFLUX_TOKEN")
INFLUX_ORG = os.getenv("INFLUX_ORG")
INFLUX_BUCKET = os.getenv("INFLUX_BUCKET")

async def write_to_influxdb(flush_items: list[tuple], client: httpx.AsyncClient):
    # HTTP POST /api/v2/write?bucket={bucket}&org={org}&precision=s
    # Headers: Authorization: Token {token}, Content-Type: text/plain
    # Body: cxone_metrics,campaign_id=camp_xyz hour_bucket="2024-05-20T14:00:00+00:00",p50_handle=120.5,p90_handle=150.2,p95_wait=80.1,count=42i 1716213600
    url = f"{INFLUX_URL}/api/v2/write?bucket={INFLUX_BUCKET}&org={INFLUX_ORG}&precision=s"
    headers = {
        "Authorization": f"Token {INFLUX_TOKEN}",
        "Content-Type": "text/plain"
    }
    
    lines = []
    for (hour_bucket, campaign_id), metrics in flush_items:
        ts = int(datetime.fromisoformat(hour_bucket).timestamp())
        # Line protocol: measurement,tags <comma-separated fields> timestamp
        fields = ",".join([
            f"hour_bucket=\"{hour_bucket}\"",
            f"p50_handle={metrics['handle'][50]},p90_handle={metrics['handle'][90]},p95_handle={metrics['handle'][95]}",
            f"p50_wait={metrics['wait'][50]},p90_wait={metrics['wait'][90]},p95_wait={metrics['wait'][95]}",
            f"count={metrics['count']}i",
        ])
        lines.append(f"cxone_metrics,campaign_id={campaign_id} {fields} {ts}")
        
    if not lines:
        return
        
    payload = "\n".join(lines)
    retries = 0
    max_retries = 3
    
    while retries < max_retries:
        try:
            response = await client.post(url, content=payload, headers=headers)
            if response.status_code == 204:
                print(f"Successfully exported {len(lines)} line protocol records to InfluxDB.")
                return
            elif response.status_code == 429:
                wait_time = 2 ** retries
                print(f"InfluxDB rate limited (429). Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
                retries += 1
            else:
                print(f"InfluxDB write failed with {response.status_code}: {response.text}")
                # Let httpx build the exception so request and response are attached
                response.raise_for_status()
                raise RuntimeError(f"Unexpected InfluxDB status {response.status_code}")
        except httpx.RequestError as e:
            print(f"Network error writing to InfluxDB: {e}. Retrying...")
            await asyncio.sleep(2 ** retries)
            retries += 1
            
    # Raise instead of returning so callers do not treat the batch as written
    raise RuntimeError("Failed to write to InfluxDB after maximum retries.")

The line protocol format separates the measurement and tags, the field set, and the timestamp with single spaces, and joins the individual fields with commas. String fields must be quoted and integers need an i suffix. The retry loop handles transient network failures and rate limits. Production deployments should adjust max_retries and backoff multipliers based on InfluxDB cluster capacity.
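
The layout rules can be shown with a small formatter. This is a sketch under the assumption that tag and field values contain no characters that need escaping; build_line is a hypothetical helper, not part of any InfluxDB client library.

```python
def build_line(measurement: str, tags: dict[str, str],
               fields: dict[str, object], timestamp: int) -> str:
    """Assemble one line: measurement,tags<space>fields<space>timestamp."""
    tag_part = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
    field_parts = []
    for key, value in fields.items():
        if isinstance(value, bool):
            # bool is checked first because it is a subclass of int
            field_parts.append(f"{key}={'true' if value else 'false'}")
        elif isinstance(value, int):
            field_parts.append(f"{key}={value}i")      # integer suffix
        elif isinstance(value, float):
            field_parts.append(f"{key}={value}")
        else:
            field_parts.append(f'{key}="{value}"')     # quoted string field
    return f"{measurement},{tag_part} {','.join(field_parts)} {timestamp}"


print(build_line(
    "cxone_metrics",
    {"campaign_id": "camp_xyz"},
    {"p50_handle": 120.5, "count": 42},
    1716213600,
))
```

Keeping the formatting in one function makes it easier to add escaping later without touching the retry logic.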

Step 4: Checkpointing & Restart Resilience

Process restarts cause in-memory buckets to vanish. The consumer persists processed bucket keys and the last successful flush timestamp to a JSON file. On startup, it loads this state and filters out already-exported keys. Atomic file writes prevent corruption during crashes.

import json
import tempfile
import os

CHECKPOINT_FILE = "checkpoint.json"

async def save_checkpoint(flushed_keys: set, last_flush_time: str):
    data = {
        "flushed_keys": list(flushed_keys),
        "last_flush_time": last_flush_time
    }
    # Atomic write pattern: write to temp file, then rename
    dir_name = os.path.dirname(os.path.abspath(CHECKPOINT_FILE))
    fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    try:
        with os.fdopen(fd, 'w') as tmp_file:
            json.dump(data, tmp_file)
        os.replace(tmp_path, CHECKPOINT_FILE)
    except Exception as e:
        os.unlink(tmp_path)
        raise e
        
async def load_checkpoint() -> tuple[set, str]:
    if not os.path.exists(CHECKPOINT_FILE):
        return set(), "1970-01-01T00:00:00+00:00"
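    try:
        with open(CHECKPOINT_FILE, 'r') as f:
            data = json.load(f)
        # JSON stores the (hour_bucket, campaign_id) tuples as lists;
        # convert them back so they are hashable and match aggregator keys
        keys = {tuple(k) for k in data.get("flushed_keys", [])}
        return keys, data.get("last_flush_time", "1970-01-01T00:00:00+00:00")
    except (json.JSONDecodeError, IOError, TypeError) as e:
        print(f"Checkpoint corrupted or unreadable: {e}. Starting fresh.")
        return set(), "1970-01-01T00:00:00+00:00"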

The checkpoint file stores the exact keys that have been exported. When the consumer restarts, it populates flushed_keys from disk. New events matching those keys are ignored, preventing duplicate writes. The atomic replace pattern guarantees the checkpoint is either fully written or not written at all.
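
One detail matters when the keys are tuples: JSON has no tuple type, so they are written as arrays and come back as lists, which cannot be placed in a set. The round trip below is a minimal sketch of the conversion; encode_keys and decode_keys are illustrative names.

```python
import json


def encode_keys(flushed_keys: set[tuple[str, str]]) -> str:
    """Serialize (hour_bucket, campaign_id) tuples; JSON stores them as arrays."""
    return json.dumps({"flushed_keys": sorted(flushed_keys)})


def decode_keys(raw: str) -> set[tuple[str, str]]:
    """Rebuild hashable tuples from the lists that json.loads returns."""
    data = json.loads(raw)
    return {tuple(item) for item in data.get("flushed_keys", [])}


keys = {("2024-05-20T14:00:00+00:00", "camp_xyz"),
        ("2024-05-20T15:00:00+00:00", "camp_abc")}
restored = decode_keys(encode_keys(keys))
print(restored == keys)
```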

Complete Working Example

import asyncio
import httpx
import websockets
import json
import os
import tempfile
from collections import defaultdict
from datetime import datetime, timezone
from statistics import quantiles

# Configuration via environment variables
CXONE_ORG_DOMAIN = os.getenv("CXONE_ORG_DOMAIN")
CXONE_CLIENT_ID = os.getenv("CXONE_CLIENT_ID")
CXONE_CLIENT_SECRET = os.getenv("CXONE_CLIENT_SECRET")
INFLUX_URL = os.getenv("INFLUX_URL")
INFLUX_TOKEN = os.getenv("INFLUX_TOKEN")
INFLUX_ORG = os.getenv("INFLUX_ORG")
INFLUX_BUCKET = os.getenv("INFLUX_BUCKET")
CHECKPOINT_FILE = "checkpoint.json"
FLUSH_INTERVAL_SECONDS = 3600

CXONE_TOKEN_URL = f"https://{CXONE_ORG_DOMAIN}.cxone.com/oauth/token"
CXONE_WS_URL = f"wss://{CXONE_ORG_DOMAIN}.cxone.com/api/v2/events"
CXONE_SCOPES = "view:interaction read:interaction"

async def fetch_cxone_token() -> str:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            CXONE_TOKEN_URL,
            data={
                "grant_type": "client_credentials",
                "client_id": CXONE_CLIENT_ID,
                "client_secret": CXONE_CLIENT_SECRET,
                "scope": CXONE_SCOPES
            },
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        response.raise_for_status()
        return response.json()["access_token"]

class MetricAggregator:
    def __init__(self):
        self.buckets = defaultdict(lambda: {"handle_times": [], "wait_times": [], "count": 0})
        self.flushed_keys = set()
        
    def add_event(self, event: dict):
        payload = event.get("payload", {})
        campaign_id = payload.get("campaignId")
        handle_time = payload.get("handleTime")
        wait_time = payload.get("waitTime")
        timestamp_str = event.get("timestamp") or payload.get("completionTime")
        
        if not all([campaign_id, handle_time is not None, wait_time is not None, timestamp_str]):
            return
            
        dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
        hour_bucket = dt.replace(minute=0, second=0, microsecond=0).isoformat()
        key = (hour_bucket, campaign_id)
        
        if key in self.flushed_keys:
            return
            
        self.buckets[key]["handle_times"].append(float(handle_time))
        self.buckets[key]["wait_times"].append(float(wait_time))
        self.buckets[key]["count"] += 1
        
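    def calculate_percentiles(self, values: list[float], percentiles: list[int]) -> dict:
        if not values:
            return {p: 0.0 for p in percentiles}
        if len(values) == 1:
            # statistics.quantiles needs at least two data points on older Python versions
            return {p: values[0] for p in percentiles}
        # n=100 yields 99 cut points; percentile p sits at index p - 1
        cuts = quantiles(values, n=100, method="inclusive")
        return {p: cuts[p - 1] for p in percentiles}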
        
    def prepare_flush(self) -> list[tuple]:
        ready = []
        for key, data in self.buckets.items():
            if data["count"] == 0:
                continue
            p_handle = self.calculate_percentiles(data["handle_times"], [50, 90, 95])
            p_wait = self.calculate_percentiles(data["wait_times"], [50, 90, 95])
            ready.append((key, {"handle": p_handle, "wait": p_wait, "count": data["count"]}))
        return ready

async def write_to_influxdb(flush_items: list[tuple], client: httpx.AsyncClient):
    url = f"{INFLUX_URL}/api/v2/write?bucket={INFLUX_BUCKET}&org={INFLUX_ORG}&precision=s"
    headers = {"Authorization": f"Token {INFLUX_TOKEN}", "Content-Type": "text/plain"}
    
    lines = []
    for (hour_bucket, campaign_id), metrics in flush_items:
        ts = int(datetime.fromisoformat(hour_bucket).timestamp())
        # Line protocol: measurement,tags <comma-separated fields> timestamp
        fields = ",".join([
            f"hour_bucket=\"{hour_bucket}\"",
            f"p50_handle={metrics['handle'][50]},p90_handle={metrics['handle'][90]},p95_handle={metrics['handle'][95]}",
            f"p50_wait={metrics['wait'][50]},p90_wait={metrics['wait'][90]},p95_wait={metrics['wait'][95]}",
            f"count={metrics['count']}i",
        ])
        lines.append(f"cxone_metrics,campaign_id={campaign_id} {fields} {ts}")
        
    if not lines:
        return
        
    payload = "\n".join(lines)
    retries = 0
    max_retries = 3
    
    while retries < max_retries:
        try:
            response = await client.post(url, content=payload, headers=headers)
            if response.status_code == 204:
                print(f"Successfully exported {len(lines)} line protocol records to InfluxDB.")
                return
            elif response.status_code == 429:
                wait_time = 2 ** retries
                print(f"InfluxDB rate limited (429). Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
                retries += 1
            else:
                print(f"InfluxDB write failed with {response.status_code}: {response.text}")
                # Let httpx build the exception so request and response are attached
                response.raise_for_status()
                raise RuntimeError(f"Unexpected InfluxDB status {response.status_code}")
        except httpx.RequestError as e:
            print(f"Network error writing to InfluxDB: {e}. Retrying...")
            await asyncio.sleep(2 ** retries)
            retries += 1
            
    # Raise instead of returning so callers do not treat the batch as written
    raise RuntimeError("Failed to write to InfluxDB after maximum retries.")

async def save_checkpoint(flushed_keys: set, last_flush_time: str):
    data = {"flushed_keys": list(flushed_keys), "last_flush_time": last_flush_time}
    dir_name = os.path.dirname(os.path.abspath(CHECKPOINT_FILE))
    fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    try:
        with os.fdopen(fd, 'w') as tmp_file:
            json.dump(data, tmp_file)
        os.replace(tmp_path, CHECKPOINT_FILE)
    except Exception as e:
        os.unlink(tmp_path)
        raise e
        
async def load_checkpoint() -> tuple[set, str]:
    if not os.path.exists(CHECKPOINT_FILE):
        return set(), "1970-01-01T00:00:00+00:00"
    try:
        with open(CHECKPOINT_FILE, 'r') as f:
            data = json.load(f)
        # JSON stores the key tuples as lists; convert them back so they are hashable
        keys = {tuple(k) for k in data.get("flushed_keys", [])}
        return keys, data.get("last_flush_time", "1970-01-01T00:00:00+00:00")
    except (json.JSONDecodeError, IOError, TypeError) as e:
        print(f"Checkpoint corrupted or unreadable: {e}. Starting fresh.")
        return set(), "1970-01-01T00:00:00+00:00"

async def main():
    print("Loading checkpoint...")
    flushed_keys, last_flush_time = await load_checkpoint()
    aggregator = MetricAggregator()
    aggregator.flushed_keys = flushed_keys
    
    token = await fetch_cxone_token()
    # Refresh 120 seconds before the 1800-second token lifetime ends
    token_expiry = datetime.now(timezone.utc).timestamp() + 1680
    
    event_queue = asyncio.Queue()
    
    async def producer():
        # nonlocal is required to rebind the token state owned by main()
        nonlocal token, token_expiry
        while True:
            try:
                if datetime.now(timezone.utc).timestamp() > token_expiry:
                    print("Token expiring. Refreshing...")
                    token = await fetch_cxone_token()
                    token_expiry = datetime.now(timezone.utc).timestamp() + 1680
            except Exception as e:
                print(f"Token refresh failed: {e}")
                await asyncio.sleep(10)
                continue
                
            try:
                async with websockets.connect(f"{CXONE_WS_URL}?access_token={token}", ping_interval=20) as ws:
                    await ws.send(json.dumps({"subscription": ["interaction.completed"]}))
                    async for raw_msg in ws:
                        try:
                            await event_queue.put(json.loads(raw_msg))
                        except json.JSONDecodeError:
                            continue
            except (websockets.exceptions.WebSocketException, OSError) as e:
                # Covers rejected handshakes and abnormal closes; force a fresh
                # token on the next attempt in case the failure was a 401
                print(f"WebSocket error: {e}. Reconnecting in 5 seconds.")
                token_expiry = 0
            await asyncio.sleep(5)
                        
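    async def consumer(influx_client: httpx.AsyncClient):
        while True:
            event = await event_queue.get()
            aggregator.add_event(event)
            
            # A bucket is due once FLUSH_INTERVAL_SECONDS have passed since its
            # hour began, so it is not flushed while events are still arriving
            cutoff = datetime.now(timezone.utc).timestamp() - FLUSH_INTERVAL_SECONDS
            due = {k for k in aggregator.buckets if datetime.fromisoformat(k[0]).timestamp() <= cutoff}
            if not due:
                continue
            flush_items = [item for item in aggregator.prepare_flush() if item[0] in due]
            try:
                await write_to_influxdb(flush_items, influx_client)
            except Exception as e:
                print(f"Flush failed; keeping buckets for the next attempt: {e}")
                continue
            for key in due:
                aggregator.flushed_keys.add(key)
                del aggregator.buckets[key]
            current_time = datetime.now(timezone.utc).isoformat()
            await save_checkpoint(aggregator.flushed_keys, current_time)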
                
    async with httpx.AsyncClient() as influx_client:
        await asyncio.gather(producer(), consumer(influx_client))

if __name__ == "__main__":
    asyncio.run(main())

Common Errors & Debugging

Error: 401 Unauthorized on WebSocket Handshake

  • What causes it: The bearer token has expired or the OAuth client lacks the view:interaction scope. CXone rejects the connection immediately.
  • How to fix it: Verify the token generation response contains a valid JWT. Check the scope claim in the decoded token. Implement proactive token refresh 120 seconds before expiry as shown in the producer loop.
  • Code showing the fix: The producer() coroutine checks token_expiry before each connection attempt and fetches a new token when the current one is inside the refresh margin.

Error: 429 Too Many Requests from InfluxDB

  • What causes it: The consumer flushes too frequently or sends oversized batches. InfluxDB enforces write throughput limits per organization.
  • How to fix it: Increase FLUSH_INTERVAL_SECONDS or implement batch size limits. The retry loop in write_to_influxdb already applies exponential backoff. Add an asyncio.sleep call between batches if writing to a constrained cluster.
  • Code showing the fix: The while retries < max_retries block checks response.status_code == 429 and sleeps for 2 ** retries seconds before retrying.

Error: Malformed Line Protocol or 400 Bad Request

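  • What causes it: Tag values containing commas, equals signs, or spaces, or string fields containing double quotes or backslashes, are not escaped. Fields separated by spaces instead of commas trigger the same error. InfluxDB parsers reject unescaped special characters.
  • How to fix it: In tag keys, tag values, and field keys, escape commas with \,, equals signs with \=, and spaces with a backslash. In string field values, escape double quotes with \" and backslashes with \\. The example assumes campaign IDs contain none of these characters, but production code should escape strings before formatting.
  • Code showing the fix: Replace raw string interpolation with helper functions that escape special characters, for example value.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ") for tag values and value.replace("\\", "\\\\").replace("\"", "\\\"") for string field values.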
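
The escaping rules above can be sketched as two small helpers. The function names escape_tag and escape_string_field are illustrative; the character sets follow the InfluxDB line protocol rules for tag values and string field values.

```python
def escape_tag(value: str) -> str:
    """Escape commas, equals signs, and spaces for tag keys and tag values."""
    for ch in (",", "=", " "):
        value = value.replace(ch, "\\" + ch)
    return value


def escape_string_field(value: str) -> str:
    """Escape backslashes first, then double quotes, for string field values."""
    return value.replace("\\", "\\\\").replace('"', '\\"')


print(escape_tag("sales, east=1"))
print(escape_string_field('say "hi"'))
```

Backslashes are handled before quotes in escape_string_field so that the backslashes introduced by quote escaping are not doubled.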

Error: Checkpoint File Corruption on Crash

  • What causes it: The process terminates during json.dump(), leaving a partially written file. Subsequent loads fail with JSONDecodeError.
  • How to fix it: Use atomic file replacement. Write to a temporary file in the same directory, flush, then rename. The save_checkpoint function implements this pattern. If corruption occurs, the load_checkpoint function catches the exception and resets to an empty state, preventing infinite crash loops.

Official References