Generating NICE CXone Outbound Dialer Performance Reports with Python

Generating NICE CXone Outbound Dialer Performance Reports with Python

What You Will Build

  • A Python scheduler that queries the NICE CXone Campaign API for hourly dialer metrics, calculates answer rate and connect rate using pandas aggregations, compares performance against historical baselines to detect anomalies, exports formatted reports to PDF using ReportLab, and distributes results to stakeholders via email with executive summaries and drill-down links.
  • This tutorial uses the CXone REST Reporting API and OAuth 2.0 Client Credentials flow.
  • The implementation is written in Python 3.9+ with requests, pandas, reportlab, schedule, and standard library email modules.

Prerequisites

  • OAuth Client Type: Confidential client (Server-to-Server)
  • Required Scopes: reporting:read, campaigns:read
  • API Version: CXone REST API v1
  • Runtime: Python 3.9 or higher
  • Dependencies:
    pip install requests pandas reportlab schedule python-dateutil
    

Authentication Setup

NICE CXone uses standard OAuth 2.0 Client Credentials flow. The token endpoint requires your environment base URL, client ID, client secret, and explicit scopes. Tokens expire after 3600 seconds. You must cache the token and refresh it before expiration to avoid unnecessary authentication overhead and prevent 401 interruptions during batch reporting.

The following class handles token acquisition, caching, and automatic refresh. It includes retry logic for 429 rate limits and explicit error handling for 401 and 403 responses.

import time
import requests
from typing import Optional, Dict, Any

class CXoneAuthClient:
    def __init__(self, base_url: str, client_id: str, client_secret: str, scopes: str = "reporting:read campaigns:read"):
        self.base_url = base_url.rstrip("/")
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.token: Optional[str] = None
        self.token_expiry: float = 0.0

    def _request_token(self) -> Dict[str, Any]:
        url = f"{self.base_url}/oauth/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": self.scopes
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        
        # Retry logic for 429 rate limits
        max_retries = 3
        for attempt in range(max_retries):
            response = requests.post(url, data=payload, headers=headers)
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 5))
                time.sleep(retry_after)
                continue
            response.raise_for_status()
            return response.json()
        
        raise requests.exceptions.HTTPError("Exceeded retry limit for 429 on token request")

    def get_token(self) -> str:
        if self.token and time.time() < self.token_expiry - 60:
            return self.token
            
        data = self._request_token()
        self.token = data.get("access_token")
        self.token_expiry = time.time() + data.get("expires_in", 3600)
        
        if not self.token:
            raise ValueError("OAuth response missing access_token")
            
        return self.token

Implementation

Step 1: Querying Campaign Dialer Metrics

The CXone Reporting API operates asynchronously. You POST a query definition to /api/v1/reporting/queries, receive a queryId, and then poll /api/v1/reporting/queries/{queryId}/results until the status completes. The query payload defines metrics, dimensions, filters, and time ranges. You must handle pagination using limit and offset parameters until hasMore returns false.

import pandas as pd
from datetime import datetime, timezone

class CXoneMetricsFetcher:
    def __init__(self, auth: CXoneAuthClient):
        self.auth = auth
        self.base_url = auth.base_url

    def _poll_results(self, query_id: str, limit: int = 100) -> list:
        all_data = []
        offset = 0
        max_retries = 3
        
        while True:
            url = f"{self.base_url}/api/v1/reporting/queries/{query_id}/results"
            params = {"limit": limit, "offset": offset}
            headers = {"Authorization": f"Bearer {self.auth.get_token()}"}
            
            for attempt in range(max_retries):
                response = requests.get(url, headers=headers, params=params)
                if response.status_code == 429:
                    time.sleep(int(response.headers.get("Retry-After", 5)))
                    continue
                if response.status_code == 401:
                    self.auth.token = None  # Force refresh
                    continue
                response.raise_for_status()
                break
            else:
                raise requests.exceptions.HTTPError("Failed to fetch query results after retries")
                
            result = response.json()
            all_data.extend(result.get("data", []))
            
            if not result.get("hasMore", False):
                break
            offset += limit
            time.sleep(0.5)  # Rate limit mitigation
            
        return all_data

    def fetch_hourly_campaign_metrics(self, campaign_id: str, start_date: str, end_date: str) -> pd.DataFrame:
        query_payload = {
            "reportType": "campaign",
            "metrics": ["calls_attempted", "calls_connected", "calls_answered"],
            "dimensions": ["campaignId", "hour"],
            "filters": [
                {"dimension": "campaignId", "operator": "eq", "values": [campaign_id]}
            ],
            "timeRange": {
                "from": start_date,
                "to": end_date
            }
        }
        
        url = f"{self.base_url}/api/v1/reporting/queries"
        headers = {"Authorization": f"Bearer {self.auth.get_token()}", "Content-Type": "application/json"}
        
        response = requests.post(url, json=query_payload, headers=headers)
        response.raise_for_status()
        query_id = response.json().get("queryId")
        
        raw_data = self._poll_results(query_id)
        df = pd.DataFrame(raw_data)
        
        if df.empty:
            return pd.DataFrame(columns=["campaignId", "hour", "calls_attempted", "calls_connected", "calls_answered"])
            
        df["hour"] = pd.to_datetime(df["hour"], utc=True)
        return df[["campaignId", "hour", "calls_attempted", "calls_connected", "calls_answered"]]

Step 2: Calculating KPIs and Detecting Anomalies

Raw metric counts do not provide actionable insights. You must calculate answer rate (answered / attempted) and connect rate (connected / attempted). Pandas vectorized operations handle this efficiently. For anomaly detection, you compare hourly KPIs against a rolling historical baseline. A z-score threshold of 2.0 identifies statistically significant deviations. This approach isolates dialer configuration changes, carrier failures, or script issues from normal variance.

def calculate_kpis_and_anomalies(df: pd.DataFrame, z_threshold: float = 2.0) -> pd.DataFrame:
    if df.empty:
        return df
        
    df = df.copy()
    
    # Prevent division by zero
    df["answer_rate"] = df["calls_answered"] / df["calls_attempted"].replace(0, float("nan"))
    df["connect_rate"] = df["calls_connected"] / df["calls_attempted"].replace(0, float("nan"))
    
    # Rolling baseline calculation (30-day window, hourly granularity)
    # Sort by hour to ensure temporal order
    df = df.sort_values("hour")
    
    rolling_window = 720  # 30 days * 24 hours
    df["answer_rate_mean"] = df["answer_rate"].rolling(window=rolling_window, min_periods=100).mean()
    df["answer_rate_std"] = df["answer_rate"].rolling(window=rolling_window, min_periods=100).std()
    
    df["connect_rate_mean"] = df["connect_rate"].rolling(window=rolling_window, min_periods=100).mean()
    df["connect_rate_std"] = df["connect_rate"].rolling(window=rolling_window, min_periods=100).std()
    
    # Z-score calculation for anomaly detection
    df["answer_rate_z"] = (df["answer_rate"] - df["answer_rate_mean"]) / df["answer_rate_std"].replace(0, float("nan"))
    df["connect_rate_z"] = (df["connect_rate"] - df["connect_rate_mean"]) / df["connect_rate_std"].replace(0, float("nan"))
    
    df["is_anomaly"] = (df["answer_rate_z"].abs() > z_threshold) | (df["connect_rate_z"].abs() > z_threshold)
    
    return df

Step 3: Generating PDF Reports with ReportLab

Stakeholders require structured, printable output. ReportLab provides deterministic PDF generation without browser rendering dependencies. You construct a table with key metrics, highlight anomalies, and append an executive summary. The table style enforces consistent formatting, and the PDF includes drill-down URLs pointing to the CXone analytics dashboard.

from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet

def generate_pdf_report(df: pd.DataFrame, output_path: str, campaign_name: str, cxone_base_url: str) -> None:
    doc = SimpleDocTemplate(output_path, pagesize=letter, topMargin=30, bottomMargin=30)
    styles = getSampleStyleSheet()
    elements = []
    
    title = Paragraph(f"Dialer Performance Report: {campaign_name}", styles["Title"])
    elements.append(title)
    elements.append(Spacer(1, 15))
    
    # Executive Summary
    total_attempted = int(df["calls_attempted"].sum())
    total_answered = int(df["calls_answered"].sum())
    total_connected = int(df["calls_connected"].sum())
    overall_answer_rate = total_answered / total_attempted if total_attempted > 0 else 0
    anomalies_count = int(df["is_anomaly"].sum())
    
    summary_text = (
        f"Report covers {len(df)} hourly intervals. "
        f"Total Attempts: {total_attempted:,}. "
        f"Overall Answer Rate: {overall_answer_rate:.2%}. "
        f"Anomalous Hours Detected: {anomalies_count}. "
        f"Drill-down: {cxone_base_url}/analytics/campaigns"
    )
    elements.append(Paragraph(summary_text, styles["Normal"]))
    elements.append(Spacer(1, 20))
    
    # Table Data
    table_data = [["Hour", "Attempted", "Answered", "Connected", "Answer Rate", "Connect Rate", "Status"]]
    
    for _, row in df.iterrows():
        status = "ANOMALY" if row["is_anomaly"] else "Normal"
        table_data.append([
            row["hour"].strftime("%Y-%m-%d %H:%M"),
            int(row["calls_attempted"]),
            int(row["calls_answered"]),
            int(row["calls_connected"]),
            f"{row['answer_rate']:.2%}",
            f"{row['connect_rate']:.2%}",
            status
        ])
        
    table = Table(table_data, colWidths=[120, 80, 80, 80, 80, 80, 80])
    style = TableStyle([
        ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#2E2E2E")),
        ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
        ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
        ("FONTSIZE", (0, 0), (-1, 0), 10),
        ("ALIGN", (0, 0), (-1, -1), "CENTER"),
        ("FONTNAME", (0, 1), (-1, -1), "Helvetica"),
        ("FONTSIZE", (0, 1), (-1, -1), 9),
        ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.HexColor("#F8F9FA"), colors.white]),
        ("GRID", (0, 0), (-1, -1), 0.5, colors.grey),
        ("TOPPADDING", (0, 0), (-1, -1), 6),
        ("BOTTOMPADDING", (0, 0), (-1, -1), 6),
    ])
    
    # Highlight anomalies in red
    for i, row in enumerate(df.itertuples(), start=1):
        if row.is_anomaly:
            style.add("BACKGROUND", (0, i), (-1, i), colors.HexColor("#FFE6E6"))
            
    table.setStyle(style)
    elements.append(table)
    
    doc.build(elements)

Step 4: Scheduling and Email Distribution

You integrate the scheduler using the schedule library. The email distribution uses standard library smtplib and email modules to send the PDF attachment with a plain-text executive summary. This avoids third-party email service dependencies and maintains full control over SMTP authentication and message formatting.

import smtplib
import schedule
import time as time_module
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders

def send_report_email(pdf_path: str, recipients: list, smtp_config: dict, summary: str) -> None:
    msg = MIMEMultipart()
    msg["From"] = smtp_config["sender"]
    msg["To"] = ", ".join(recipients)
    msg["Subject"] = "CXone Dialer Performance Report"
    
    msg.attach(MIMEText(summary, "plain"))
    
    with open(pdf_path, "rb") as attachment:
        part = MIMEBase("application", "octet-stream")
        part.set_payload(attachment.read())
        encoders.encode_base64(part)
        part.add_header("Content-Disposition", f"attachment; filename= {pdf_path.split('/')[-1]}")
        msg.attach(part)
        
    with smtplib.SMTP(smtp_config["host"], smtp_config["port"]) as server:
        server.starttls()
        server.login(smtp_config["username"], smtp_config["password"])
        server.sendmail(smtp_config["sender"], recipients, msg.as_string())

def run_daily_report(campaign_id: str, campaign_name: str, cxone_base_url: str, smtp_config: dict, recipients: list) -> None:
    auth = CXoneAuthClient(cxone_base_url, smtp_config["client_id"], smtp_config["client_secret"])
    fetcher = CXoneMetricsFetcher(auth)
    
    # Fetch last 24 hours
    end_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    start_time = (datetime.now(timezone.utc) - pd.Timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
    
    df = fetcher.fetch_hourly_campaign_metrics(campaign_id, start_time, end_time)
    df = calculate_kpis_and_anomalies(df)
    
    pdf_path = f"reports/{campaign_name}_report_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M')}.pdf"
    generate_pdf_report(df, pdf_path, campaign_name, cxone_base_url)
    
    total_anomalies = int(df["is_anomaly"].sum())
    overall_rate = (df["calls_answered"].sum() / df["calls_attempted"].sum()) if df["calls_attempted"].sum() > 0 else 0
    summary = f"Dialer Report for {campaign_name}. Overall Answer Rate: {overall_rate:.2%}. Anomalies Detected: {total_anomalies}. PDF attached for detailed hourly breakdown."
    
    send_report_email(pdf_path, recipients, smtp_config, summary)
    print(f"Report generated and emailed successfully at {datetime.now(timezone.utc)}")

Complete Working Example

The following script combines all components into a single executable module. You must replace the credential placeholders with your CXone environment values. The scheduler runs daily at 08:00 UTC.

import os
import pandas as pd
from datetime import datetime, timezone
import schedule
import time
import requests

# Import classes and functions defined in previous sections
# In production, place these in separate modules: auth.py, fetcher.py, analytics.py, report.py, scheduler.py

def main():
    # Configuration
    CXONE_BASE_URL = "https://api.nicecxone.com"
    CLIENT_ID = os.getenv("CXONE_CLIENT_ID")
    CLIENT_SECRET = os.getenv("CXONE_CLIENT_SECRET")
    CAMPAIGN_ID = os.getenv("CXONE_CAMPAIGN_ID")
    CAMPAIGN_NAME = "Enterprise_Outbound_2024"
    
    SMTP_CONFIG = {
        "host": os.getenv("SMTP_HOST", "smtp.company.com"),
        "port": int(os.getenv("SMTP_PORT", 587)),
        "username": os.getenv("SMTP_USER"),
        "password": os.getenv("SMTP_PASS"),
        "sender": os.getenv("REPORT_SENDER"),
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET
    }
    
    RECIPIENTS = [email.strip() for email in os.getenv("REPORT_RECIPIENTS", "manager@company.com,ops@company.com").split(",")]
    
    # Validate required environment variables
    missing = [k for k in ["CXONE_CLIENT_ID", "CXONE_CLIENT_SECRET", "CXONE_CAMPAIGN_ID", "SMTP_USER", "SMTP_PASS", "REPORT_SENDER"] if not os.getenv(k)]
    if missing:
        raise ValueError(f"Missing required environment variables: {', '.join(missing)}")
        
    # Create reports directory
    os.makedirs("reports", exist_ok=True)
    
    # Schedule daily execution at 08:00 UTC
    schedule.every().day.at("08:00").do(run_daily_report, CAMPAIGN_ID, CAMPAIGN_NAME, CXONE_BASE_URL, SMTP_CONFIG, RECIPIENTS)
    
    print(f"Scheduler initialized. Next run at 08:00 UTC. Campaign: {CAMPAIGN_NAME}")
    
    # Keep the script running
    while True:
        schedule.run_pending()
        time.sleep(60)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: Expired access token, missing reporting:read scope, or incorrect client credentials.
  • Fix: Verify the OAuth client has both reporting:read and campaigns:read scopes assigned in the CXone admin console. Ensure the CXoneAuthClient refreshes the token before expiration. Check that the base URL matches your environment exactly.

Error: 403 Forbidden

  • Cause: The OAuth client lacks permissions to access the specific campaign or reporting data.
  • Fix: Assign the client to a user group with Campaign Read and Reporting Read permissions. Verify the campaignId in the filter matches an active campaign in your environment.

Error: 429 Too Many Requests

  • Cause: Exceeding CXone API rate limits during polling or pagination.
  • Fix: The implementation includes exponential backoff and Retry-After header parsing. If cascading 429s occur, increase the sleep interval between pagination requests or reduce the query time range to split large datasets into smaller batches.

Error: 500 Internal Server Error

  • Cause: Malformed query payload or unsupported metric names in the environment.
  • Fix: Validate the JSON payload against the CXone Reporting API schema. Metric names vary by environment configuration. Use the CXone API Explorer to verify available metrics for your specific deployment. Ensure reportType matches campaign.

Error: Pandas DivisionByZeroWarning

  • Cause: Hours with zero attempted calls cause NaN rates.
  • Fix: The calculate_kpis_and_anomalies function replaces zero attempts with NaN before division. Ensure downstream code handles NaN values gracefully during aggregation or export.

Official References