Automating Quality Scoring in NICE CXone with a Python Rule-Based Evaluation Service

What You Will Build

  • The service queries recently completed conversations, extracts their media transcripts, applies a configurable rule-based compliance rubric, calculates weighted composite scores, and pushes structured quality evaluations with itemized comments back to the CXone Quality Management module.
  • This implementation uses the NICE CXone Conversation Details API, Media Transcript API, and Quality Management Evaluation API.
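  • The tutorial targets Python 3.10+ and uses the httpx library for HTTP operations and tenacity for retries. Pydantic is listed as a dependency for request/response validation, although the code shown in this tutorial models its data with standard-library dataclasses.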

Prerequisites

  • OAuth client type: Confidential Client (Client Credentials Flow)
  • Required scopes: conversation:read, conversation:media:transcript:read, qm:evaluation:write, qm:form:read
  • API version: CXone REST API v2
  • Runtime: Python 3.10 or higher
  • External dependencies: httpx>=0.25.0, pydantic>=2.5.0, tenacity>=8.2.0

Authentication Setup

CXone requires OAuth 2.0 Client Credentials authentication. The token endpoint returns a short-lived access token that must be cached and refreshed before expiration. The following class handles token acquisition, expiration tracking, and basic retry logic for failed token requests.

import time
import httpx
from typing import Optional
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class CXoneAuth:
    def __init__(self, organization: str, client_id: str, client_secret: str, scopes: list[str]):
        self.base_url = f"https://{organization}.my.cxone.com"
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.token: Optional[str] = None
        self.expires_at: float = 0.0

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type(httpx.HTTPStatusError)
    )
    async def get_token(self) -> str:
        if self.token and time.time() < self.expires_at - 60:
            return self.token

        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.post(
                f"{self.base_url}/oauth/token",
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "scope": " ".join(self.scopes)
                }
            )
            response.raise_for_status()
            payload = response.json()
            self.token = payload["access_token"]
            self.expires_at = time.time() + payload["expires_in"]
            return self.token

Required OAuth Scope: conversation:read conversation:media:transcript:read qm:evaluation:write qm:form:read

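The token manager subtracts sixty seconds from the expiration timestamp so that a token is refreshed before it can expire mid-request. As written, the tenacity decorator makes up to three attempts and retries only on httpx.HTTPStatusError, which raise_for_status() raises for any 4xx or 5xx response. It therefore also retries non-transient failures such as invalid credentials, and it does not retry network timeouts or connection errors, which httpx raises as subclasses of httpx.TransportError. To cover those, pass (httpx.HTTPStatusError, httpx.TransportError) to retry_if_exception_type.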
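
The sixty-second safety margin can be illustrated in isolation. The following is a minimal sketch, not part of the service itself: CachedToken and its injectable clock are hypothetical names introduced here so the expiry check can be exercised without waiting for a real token to age.

```python
import time
from typing import Callable, Optional


class CachedToken:
    """Hypothetical helper: reports a token as stale `skew` seconds before it expires."""

    def __init__(self, skew: float = 60.0, clock: Callable[[], float] = time.time):
        self.skew = skew
        self.clock = clock  # injectable so the check can be tested without sleeping
        self.value: Optional[str] = None
        self.expires_at = 0.0

    def store(self, value: str, expires_in: float) -> None:
        self.value = value
        self.expires_at = self.clock() + expires_in

    def get(self) -> Optional[str]:
        # Mirrors the check in CXoneAuth.get_token: valid only while now < expires_at - skew.
        if self.value is not None and self.clock() < self.expires_at - self.skew:
            return self.value
        return None


# Simulated clock: advance time manually instead of sleeping.
now = [1000.0]
cache = CachedToken(clock=lambda: now[0])
cache.store("abc", expires_in=3600)

now[0] += 3500          # 100 seconds of lifetime left: still served from cache
still_valid = cache.get()

now[0] += 50            # 50 seconds left: inside the 60-second margin, treated as expired
needs_refresh = cache.get() is None
```

The token is still technically valid in the second case; the margin simply forces a refresh early so that a request started now does not arrive with an expired token.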

Implementation

Step 1: Fetch Conversations and Transcripts

CXone exposes conversation metadata through a query endpoint that supports pagination via next_page_token. Transcripts are retrieved separately per conversation ID. The following function handles pagination, filters for completed interactions, and extracts transcript text.

import asyncio
import logging
from dataclasses import dataclass
from typing import AsyncIterator

logger = logging.getLogger(__name__)

@dataclass
class TranscriptLine:
    speaker: str
    text: str
    timestamp: str

@dataclass
class ConversationData:
    conversation_id: str
    agent_id: str
    transcript: list[TranscriptLine]

async def fetch_conversations_and_transcripts(auth: CXoneAuth, page_size: int = 10) -> AsyncIterator[ConversationData]:
    query_body = {
        "query": "status:COMPLETED",
        "page_size": page_size,
        "sort_by": "end_time",
        "sort_order": "desc"
    }
    
    next_page_token: Optional[str] = None
    
    while True:
        if next_page_token:
            query_body["next_page_token"] = next_page_token
            
        async with httpx.AsyncClient(timeout=20.0) as client:
            token = await auth.get_token()
            response = await client.post(
                f"{auth.base_url}/api/v2/conversations/details/query",
                headers={"Authorization": f"Bearer {token}"},
                json=query_body
            )
            response.raise_for_status()
            data = response.json()
            
            conversations = data.get("conversations", [])
            if not conversations:
                break
                
            for conv in conversations:
                conv_id = conv["id"]
                agent_id = conv.get("agent_id")
                if not agent_id:
                    continue
                    
                transcript = await fetch_transcript(auth, conv_id, agent_id)
                if transcript:
                    yield ConversationData(
                        conversation_id=conv_id,
                        agent_id=agent_id,
                        transcript=transcript
                    )
            
            next_page_token = data.get("next_page_token")
            if not next_page_token:
                break

async def fetch_transcript(auth: CXoneAuth, conversation_id: str, agent_id: str) -> Optional[list[TranscriptLine]]:
    async with httpx.AsyncClient(timeout=15.0) as client:
        token = await auth.get_token()
        response = await client.get(
            f"{auth.base_url}/api/v2/conversations/{conversation_id}/media/transcripts",
            headers={"Authorization": f"Bearer {token}"}
        )
        
        if response.status_code == 404:
            logger.warning("Transcript not ready or unavailable for conversation %s", conversation_id)
            return None
        response.raise_for_status()
        
        transcript_data = response.json()
        lines = []
        for entry in transcript_data.get("transcript", []):
            lines.append(TranscriptLine(
                speaker=entry.get("speaker", "UNKNOWN"),
                text=entry.get("text", ""),
                timestamp=entry.get("timestamp", "")
            ))
        return lines

HTTP Request Example:

POST /api/v2/conversations/details/query HTTP/1.1
Host: yourorg.my.cxone.com
Authorization: Bearer <access_token>
Content-Type: application/json

{
  "query": "status:COMPLETED",
  "page_size": 10,
  "sort_by": "end_time",
  "sort_order": "desc"
}

HTTP Response Example:

{
  "conversations": [
    {
      "id": "conv-12345",
      "type": "voice",
      "status": "COMPLETED",
      "agent_id": "agent-67890",
      "start_time": "2024-05-10T14:30:00Z",
      "end_time": "2024-05-10T14:35:22Z"
    }
  ],
  "next_page_token": "eyJwYWdlIjoyfQ==",
  "total_results": 42
}

The pagination loop terminates when a page comes back empty or when next_page_token is null or absent. The transcript endpoint returns a 404 if transcription is still processing; the code logs a warning and skips those conversations.
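
The token-based pagination pattern can be tried without a CXone tenant. The sketch below is an illustration only: iter_conversations, fake_fetch, and FAKE_PAGES are hypothetical names, and the page shape simply copies the response example above.

```python
from typing import Callable, Iterator, Optional


def iter_conversations(fetch_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Yield conversations page by page until the server stops returning a token."""
    token: Optional[str] = None
    while True:
        page = fetch_page(token)
        conversations = page.get("conversations", [])
        if not conversations:
            break  # empty page: nothing more to read
        yield from conversations
        token = page.get("next_page_token")
        if not token:
            break  # no token: this was the last page


# Fake backend keyed by the token that requests each page (None requests the first page).
FAKE_PAGES = {
    None: {"conversations": [{"id": "conv-1"}, {"id": "conv-2"}], "next_page_token": "p2"},
    "p2": {"conversations": [{"id": "conv-3"}], "next_page_token": None},
}


def fake_fetch(token: Optional[str]) -> dict:
    return FAKE_PAGES[token]


ids = [conv["id"] for conv in iter_conversations(fake_fetch)]
```

Keeping the page-fetching callable separate from the loop makes the termination conditions easy to test with canned pages before pointing the loop at the real endpoint.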

Step 2: Rule-Based Parsing and Scoring

Quality rubrics require deterministic pattern matching against transcript text. The parser compiles regex patterns, evaluates each rule against the combined transcript, calculates weighted scores, and generates human-readable comments.

import re
from dataclasses import dataclass, field
from typing import Any

@dataclass
class RubricRule:
    name: str
    pattern: str
    weight: float
    case_insensitive: bool = True
    score_if_present: float = 100.0
    score_if_absent: float = 0.0
    pass_comment: str = "Requirement met"
    fail_comment: str = "Requirement missing"

@dataclass
class EvaluationResult:
    total_score: float = 0.0
    rule_scores: dict[str, dict[str, Any]] = field(default_factory=dict)

def compile_rubric(rules_config: dict[str, dict[str, Any]]) -> dict[str, RubricRule]:
    compiled = {}
    for name, cfg in rules_config.items():
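        # Compile once here so that an invalid pattern fails at load time, not mid-evaluation.
        flags = re.IGNORECASE if cfg.get("case_insensitive", True) else 0
        re.compile(cfg["pattern"], flags)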
        compiled[name] = RubricRule(
            name=name,
            pattern=cfg["pattern"],
            weight=cfg["weight"],
            case_insensitive=cfg.get("case_insensitive", True),
            score_if_present=cfg.get("score_if_present", 100.0),
            score_if_absent=cfg.get("score_if_absent", 0.0),
            pass_comment=cfg.get("pass_comment", "Requirement met"),
            fail_comment=cfg.get("fail_comment", "Requirement missing")
        )
    return compiled

def evaluate_transcript(transcript_lines: list[TranscriptLine], rules: dict[str, RubricRule]) -> EvaluationResult:
    full_text = " ".join(line.text for line in transcript_lines)
    result = EvaluationResult()
    
    weighted_total = 0.0
    total_weight = 0.0
    
    for rule_name, rule in rules.items():
        total_weight += rule.weight
        match_found = bool(re.search(rule.pattern, full_text, flags=re.IGNORECASE if rule.case_insensitive else 0))
        
        score = rule.score_if_present if match_found else rule.score_if_absent
        comment = rule.pass_comment if match_found else rule.fail_comment
        
        weighted_total += score * rule.weight
        
        result.rule_scores[rule_name] = {
            "score": score,
            "weight": rule.weight,
            "matched": match_found,
            "comment": comment
        }
        
    result.total_score = (weighted_total / total_weight) if total_weight > 0 else 0.0
    return result
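
Note that evaluate_transcript joins every line regardless of speaker, so a customer's words can satisfy a rule meant for the agent. One possible refinement is to filter lines by speaker before scoring. This is a sketch under an assumption: the speaker label "AGENT" is a guess, and lines_for_speaker is a hypothetical helper, so check the labels your transcripts actually contain.

```python
import re
from dataclasses import dataclass


@dataclass
class TranscriptLine:
    speaker: str
    text: str
    timestamp: str


def lines_for_speaker(lines: list[TranscriptLine], label: str = "AGENT") -> list[TranscriptLine]:
    """Keep only lines whose speaker matches `label` (case-insensitive). The label is an assumption."""
    return [line for line in lines if line.speaker.upper() == label.upper()]


GREETING = r"\b(hi|hello|welcome|good morning|good afternoon)\b"

transcript = [
    TranscriptLine("CUSTOMER", "Hello, I need help with my bill.", "00:00:01"),
    TranscriptLine("AGENT", "What is your account number?", "00:00:04"),
]

all_text = " ".join(line.text for line in transcript)
agent_text = " ".join(line.text for line in lines_for_speaker(transcript))

# The customer's "Hello" satisfies the greeting rule when all lines are joined...
matched_all = bool(re.search(GREETING, all_text, flags=re.IGNORECASE))
# ...but the agent never greeted, which the filtered text reflects.
matched_agent = bool(re.search(GREETING, agent_text, flags=re.IGNORECASE))
```

Passing the filtered list to evaluate_transcript instead of the full transcript would apply the same idea without changing the scoring function.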

Rubric Configuration Example:

RUBRIC_CONFIG = {
    "greeting": {
        "pattern": r"\b(hi|hello|welcome|good morning|good afternoon)\b",
        "weight": 0.2,
        "case_insensitive": True,
        "pass_comment": "Agent provided standard greeting",
        "fail_comment": "Missing required greeting"
    },
    "verification": {
        "pattern": r"\b(verify|confirm|last four|date of birth|account number)\b",
        "weight": 0.3,
        "case_insensitive": True,
        "pass_comment": "Identity verification requested",
        "fail_comment": "Verification step skipped"
    },
    "compliance_disclosure": {
        "pattern": r"\b(this call may be recorded|quality assurance|monitoring)\b",
        "weight": 0.3,
        "case_insensitive": True,
        "pass_comment": "Compliance recording notice delivered",
        "fail_comment": "Recording disclosure missing"
    },
    "closing_summary": {
        "pattern": r"\b(is there anything else|thank you for calling|have a great day)\b",
        "weight": 0.2,
        "case_insensitive": True,
        "pass_comment": "Proper closing executed",
        "fail_comment": "Closing protocol incomplete"
    }
}

The parser aggregates scores using weighted averaging. Each rule contributes to the final composite score proportional to its business weight. The output structure maps directly to CXone QM evaluation items.
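
To make the weighted average concrete, the sketch below recomputes a composite score by hand using the four weights from the rubric above. The composite function is a hypothetical standalone restatement of the arithmetic in evaluate_transcript, and the match outcomes are invented for illustration.

```python
def composite(weights: dict[str, float], matched: dict[str, bool],
              present: float = 100.0, absent: float = 0.0) -> float:
    """Weighted average of per-rule scores, normalized by the sum of weights."""
    total_weight = sum(weights.values())
    if total_weight <= 0:
        return 0.0
    weighted = sum((present if matched[name] else absent) * weight
                   for name, weight in weights.items())
    return weighted / total_weight


weights = {
    "greeting": 0.2,
    "verification": 0.3,
    "compliance_disclosure": 0.3,
    "closing_summary": 0.2,
}
# Invented outcome: the agent greeted and verified, but skipped disclosure and closing.
matched = {
    "greeting": True,
    "verification": True,
    "compliance_disclosure": False,
    "closing_summary": False,
}

# (100 * 0.2 + 100 * 0.3 + 0 * 0.3 + 0 * 0.2) / 1.0, which is about 50
score = round(composite(weights, matched), 2)

# Because of the normalization, weights do not need to sum to 1.
same_score = round(composite({k: v * 10 for k, v in weights.items()}, matched), 2)
```

The division by the total weight is what lets a rubric use any convenient scale for its weights, such as 2, 3, 3, 2 instead of 0.2, 0.3, 0.3, 0.2.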

Step 3: Submit Evaluations to Quality Management API

CXone QM expects evaluations structured around pre-defined forms, sections, and items. The following function transforms parser output into the required payload, handles 429 rate limits, and submits the evaluation.

import time
from typing import Optional

async def submit_evaluation(
    auth: CXoneAuth,
    conversation_id: str,
    agent_id: str,
    form_id: str,
    section_id: str,
    item_id: str,
    result: EvaluationResult
) -> dict[str, Any]:
    
    sections = []
    for rule_name, rule_data in result.rule_scores.items():
        sections.append({
            "section_id": section_id,
            "items": [
                {
                    "item_id": item_id,
                    "score": rule_data["score"],
                    "comment": f"[{rule_name.upper()}] {rule_data['comment']}"
                }
            ]
        })
    
    payload = {
        "form_id": form_id,
        "agent_id": agent_id,
        "conversation_id": conversation_id,
        "sections": sections,
        "overall_score": round(result.total_score, 2),
        "comment": f"Automated compliance evaluation. Composite score: {result.total_score:.2f}"
    }
    
    retries = 0
    max_retries = 3
    
    async with httpx.AsyncClient(timeout=20.0) as client:
        while retries <= max_retries:
            token = await auth.get_token()
            response = await client.post(
                f"{auth.base_url}/api/v2/qm/evaluations",
                headers={"Authorization": f"Bearer {token}"},
                json=payload
            )
            
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 5))
                logger.warning("Rate limited. Retrying in %d seconds.", retry_after)
                await asyncio.sleep(retry_after)
                retries += 1
                continue
                
            response.raise_for_status()
            return response.json()
            
    raise RuntimeError("Failed to submit evaluation after maximum retries")

HTTP Request Example:

POST /api/v2/qm/evaluations HTTP/1.1
Host: yourorg.my.cxone.com
Authorization: Bearer <access_token>
Content-Type: application/json

{
  "form_id": "form-abc123",
  "agent_id": "agent-67890",
  "conversation_id": "conv-12345",
  "sections": [
    {
      "section_id": "section-xyz789",
      "items": [
        {
          "item_id": "item-def456",
          "score": 100,
          "comment": "[GREETING] Agent provided standard greeting"
        }
      ]
    }
  ],
  "overall_score": 85.5,
  "comment": "Automated compliance evaluation. Composite score: 85.50"
}

HTTP Response Example:

{
  "id": "eval-998877",
  "form_id": "form-abc123",
  "agent_id": "agent-67890",
  "conversation_id": "conv-12345",
  "overall_score": 85.5,
  "status": "COMPLETED",
  "created_time": "2024-05-10T15:00:00Z"
}

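The submission loop handles 429 responses explicitly: it sleeps for the number of seconds given in the Retry-After header (five seconds if the header is absent) and then retries. The payload follows the form, section, and item structure described above. Note that the function reuses the single section_id and item_id it receives for every rule, so all rule scores are posted against the same item; a real form needs a distinct item ID for each rule. Form IDs, section IDs, and item IDs must be provisioned in the CXone admin console before execution.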
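
One way to give each rule its own form item is a lookup table from rule name to section and item IDs. The sketch below is an assumption-laden illustration: RULE_ITEM_MAP, build_sections, and every ID in it are hypothetical placeholders, and the output shape simply follows the request example above.

```python
from collections import defaultdict
from typing import Any

# Hypothetical mapping; replace the placeholder IDs with those of your own form.
RULE_ITEM_MAP = {
    "greeting": ("section-opening", "item-greeting"),
    "verification": ("section-compliance", "item-verification"),
    "compliance_disclosure": ("section-compliance", "item-disclosure"),
    "closing_summary": ("section-closing", "item-closing"),
}


def build_sections(rule_scores: dict[str, dict[str, Any]],
                   rule_item_map: dict[str, tuple[str, str]]) -> list[dict[str, Any]]:
    """Group rule results by section so each rule lands on its own item."""
    grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for rule_name, data in rule_scores.items():
        if rule_name not in rule_item_map:
            raise KeyError(f"No form item mapped for rule '{rule_name}'")
        section_id, item_id = rule_item_map[rule_name]
        grouped[section_id].append({
            "item_id": item_id,
            "score": data["score"],
            "comment": f"[{rule_name.upper()}] {data['comment']}",
        })
    return [{"section_id": sid, "items": items} for sid, items in grouped.items()]


rule_scores = {
    "greeting": {"score": 100.0, "comment": "Agent provided standard greeting"},
    "verification": {"score": 0.0, "comment": "Verification step skipped"},
    "compliance_disclosure": {"score": 100.0, "comment": "Compliance recording notice delivered"},
}
sections = build_sections(rule_scores, RULE_ITEM_MAP)
```

Raising on an unmapped rule is a deliberate choice in this sketch: silently dropping a rule would produce an evaluation whose overall score no longer matches its itemized scores.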

Complete Working Example

The following script combines authentication, transcript fetching, rule evaluation, and QM submission into a single runnable script. Replace placeholder credentials and QM identifiers before execution.

import asyncio
import logging
import sys

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

async def main():
    org = "yourorg"
    client_id = "YOUR_CLIENT_ID"
    client_secret = "YOUR_CLIENT_SECRET"
    
    form_id = "FORM_ID_FROM_CXONE"
    section_id = "SECTION_ID_FROM_CXONE"
    item_id = "ITEM_ID_FROM_CXONE"
    
    auth = CXoneAuth(
        organization=org,
        client_id=client_id,
        client_secret=client_secret,
        scopes=["conversation:read", "conversation:media:transcript:read", "qm:evaluation:write", "qm:form:read"]
    )
    
    rules = compile_rubric(RUBRIC_CONFIG)
    processed_count = 0
    
    async for conv_data in fetch_conversations_and_transcripts(auth, page_size=5):
        logger.info("Processing conversation %s for agent %s", conv_data.conversation_id, conv_data.agent_id)
        
        result = evaluate_transcript(conv_data.transcript, rules)
        logger.info("Conversation %s scored: %.2f", conv_data.conversation_id, result.total_score)
        
        try:
            eval_response = await submit_evaluation(
                auth=auth,
                conversation_id=conv_data.conversation_id,
                agent_id=conv_data.agent_id,
                form_id=form_id,
                section_id=section_id,
                item_id=item_id,
                result=result
            )
            logger.info("Evaluation submitted successfully: %s", eval_response["id"])
            processed_count += 1
        except httpx.HTTPStatusError as e:
            logger.error("Failed to submit evaluation for %s: %s", conv_data.conversation_id, e.response.text)
        except Exception as e:
            logger.error("Unexpected error processing %s: %s", conv_data.conversation_id, str(e))
            
        if processed_count >= 10:
            logger.info("Reached batch limit of 10 evaluations. Exiting.")
            break
            
    logger.info("Batch processing complete. Total evaluations submitted: %d", processed_count)

if __name__ == "__main__":
    asyncio.run(main())

The script processes conversations page by page, applies the rubric, and submits evaluations, waiting for the Retry-After interval whenever it is rate limited. The batch limiter stops the run after ten successful submissions, which prevents unbounded execution during initial deployment.

Common Errors & Debugging

Error: 401 Unauthorized

  • What causes it: Expired access token, invalid client credentials, or missing required OAuth scopes.
  • How to fix it: Verify the client ID and secret match a confidential client in CXone. Ensure the scope list includes conversation:read, conversation:media:transcript:read, qm:evaluation:write, and qm:form:read. The CXoneAuth class automatically refreshes tokens sixty seconds before expiration.
  • Code showing the fix:
if response.status_code == 401:
    auth.token = None
    auth.expires_at = 0.0
    token = await auth.get_token()
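
The fragment above resets the cached token but does not show the retry around it. A minimal sketch of the full pattern follows. FakeAuth, FakeResponse, fake_send, and send_with_reauth are hypothetical stand-ins so the flow can run without a network; FakeAuth only imitates the token and expires_at attributes of CXoneAuth.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class FakeResponse:
    status_code: int


class FakeAuth:
    """Hypothetical stand-in exposing the same attributes as CXoneAuth."""

    def __init__(self) -> None:
        self.token: Optional[str] = "stale-token"
        self.expires_at = float("inf")

    async def get_token(self) -> str:
        if self.token is None:
            self.token = "fresh-token"  # a real client would call the token endpoint here
        return self.token


async def send_with_reauth(auth, send: Callable[[str], Awaitable[FakeResponse]]) -> FakeResponse:
    """Send once; on 401, drop the cached token, fetch a new one, and resend exactly once."""
    response = await send(await auth.get_token())
    if response.status_code == 401:
        auth.token = None
        auth.expires_at = 0.0
        response = await send(await auth.get_token())
    return response


async def fake_send(token: str) -> FakeResponse:
    # Simulated server: rejects the stale token and accepts anything else.
    return FakeResponse(401 if token == "stale-token" else 200)


final = asyncio.run(send_with_reauth(FakeAuth(), fake_send))
```

Retrying only once matters: if the second attempt also returns 401, the cause is more likely bad credentials or missing scopes than an expired token, and looping would not help.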

Error: 429 Too Many Requests

  • What causes it: Exceeding CXone API rate limits (typically 50-100 requests per minute depending on tenant tier).
  • How to fix it: Respect the Retry-After header and cap the number of retries; the submit_evaluation function does both. It does not apply exponential backoff, so add that if the header is missing or unreliable. For high-volume workloads, add a request queue with concurrency limits.
  • Code showing the fix:
retry_after = int(response.headers.get("Retry-After", 5))
await asyncio.sleep(retry_after)
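
One caveat with int(...) here: the HTTP specification allows Retry-After to carry either a number of seconds or an HTTP date, and int() raises ValueError on the date form. Whether CXone ever sends the date form is not stated in this tutorial, so the following is a defensive sketch; parse_retry_after is a hypothetical helper built only on the standard library.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], default: float = 5.0,
                      now: Optional[datetime] = None) -> float:
    """Return the number of seconds to wait, accepting delta-seconds or an HTTP date."""
    if value is None:
        return default
    value = value.strip()
    if value.isdigit():
        return float(value)  # delta-seconds form, e.g. "30"
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return default  # unparseable header: fall back to the default wait
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    current = now or datetime.now(timezone.utc)
    # A date in the past means no wait is needed.
    return max(0.0, (when - current).total_seconds())


fixed_now = datetime(2015, 10, 21, 7, 27, 30, tzinfo=timezone.utc)
wait_seconds = parse_retry_after("7")
wait_from_date = parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now)
```

In submit_evaluation, the result could be passed straight to asyncio.sleep, which accepts a float.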

Error: 422 Unprocessable Entity

  • What causes it: Mismatched form, section, or item IDs in the QM payload, or invalid score ranges (must be 0-100).
  • How to fix it: Validate QM identifiers against GET /api/v2/qm/forms/{formId}. Ensure all scores fall within the configured item range. Check that the agent ID matches a valid user in the tenant.
  • Code showing the fix:
if not 0 <= rule_data["score"] <= 100:
    logger.warning("Score out of range for rule %s. Clamping to valid range.", rule_name)
    rule_data["score"] = max(0, min(100, rule_data["score"]))

Error: 404 Not Found on Transcript Endpoint

  • What causes it: Transcription is still processing, or the conversation type does not support transcript extraction.
  • How to fix it: Filter conversations by type (voice, chat, email) and verify transcript availability before polling. Implement a retry queue for conversations that return 404 initially.
  • Code showing the fix:
if response.status_code == 404:
    logger.info("Transcript pending for %s. Deferring to next cycle.", conversation_id)
    return None
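
The retry queue mentioned above could take many forms. The sketch below shows one simple shape under stated assumptions: drain_pending and its fetch callable are hypothetical, fetch returns None while a transcript is pending (as fetch_transcript does on a 404), and the loop retries immediately, whereas a real service would wait between cycles.

```python
from collections import deque
from typing import Callable, Optional


def drain_pending(pending_ids: list[str],
                  fetch: Callable[[str], Optional[list]],
                  max_attempts: int = 3) -> tuple[dict[str, list], list[str]]:
    """Retry pending conversations up to max_attempts each; return (ready, abandoned)."""
    queue = deque((cid, 1) for cid in pending_ids)
    ready: dict[str, list] = {}
    abandoned: list[str] = []
    while queue:
        cid, attempt = queue.popleft()
        transcript = fetch(cid)
        if transcript is not None:
            ready[cid] = transcript
        elif attempt < max_attempts:
            queue.append((cid, attempt + 1))  # defer to the back of the queue
        else:
            abandoned.append(cid)  # give up after max_attempts
    return ready, abandoned


# Simulated backend: conv-a becomes available on its second fetch, conv-b never does.
fetch_counts: dict[str, int] = {}


def fake_fetch(cid: str) -> Optional[list]:
    fetch_counts[cid] = fetch_counts.get(cid, 0) + 1
    if cid == "conv-a" and fetch_counts[cid] >= 2:
        return ["line 1"]
    return None


ready, abandoned = drain_pending(["conv-a", "conv-b"], fake_fetch)
```

Capping attempts keeps conversations that will never have a transcript, such as unsupported media types, from circulating in the queue indefinitely.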

Official References