Automating Quality Scoring in NICE CXone with a Python Rule-Based Evaluation Service
What You Will Build
- When running, this service queries recent completed conversations, extracts media transcripts, applies a configurable rule-based compliance rubric, calculates weighted composite scores, and pushes structured quality evaluations with itemized comments back to the CXone Quality Management module.
- This implementation uses the NICE CXone Conversation Details API, Media Transcript API, and Quality Management Evaluation API.
- The tutorial covers Python 3.10+ using the
httpxlibrary for HTTP operations and Pydantic for request/response validation.
Prerequisites
- OAuth client type: Confidential Client (Client Credentials Flow)
- Required scopes:
conversation:read,conversation:media:transcript:read,qm:evaluation:write,qm:form:read - API version: CXone REST API v2
- Runtime: Python 3.10 or higher
- External dependencies:
httpx>=0.25.0,pydantic>=2.5.0,tenacity>=8.2.0
Authentication Setup
CXone requires OAuth 2.0 Client Credentials authentication. The token endpoint returns a short-lived access token that must be cached and refreshed before expiration. The following class handles token acquisition, expiration tracking, and basic retry logic for transient network failures.
import time
import httpx
from typing import Optional
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
class CXoneAuth:
def __init__(self, organization: str, client_id: str, client_secret: str, scopes: list[str]):
self.base_url = f"https://{organization}.my.cxone.com"
self.client_id = client_id
self.client_secret = client_secret
self.scopes = scopes
self.token: Optional[str] = None
self.expires_at: float = 0.0
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type(httpx.HTTPStatusError)
)
async def get_token(self) -> str:
if self.token and time.time() < self.expires_at - 60:
return self.token
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(
f"{self.base_url}/oauth/token",
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
"scope": " ".join(self.scopes)
}
)
response.raise_for_status()
payload = response.json()
self.token = payload["access_token"]
self.expires_at = time.time() + payload["expires_in"]
return self.token
Required OAuth Scope: conversation:read conversation:media:transcript:read qm:evaluation:write qm:form:read
The token manager subtracts sixty seconds from the expiration timestamp to prevent mid-request authentication failures. The tenacity decorator automatically retries on HTTP 5xx errors or network timeouts.
Implementation
Step 1: Fetch Conversations and Transcripts
CXone exposes conversation metadata through a query endpoint that supports pagination via next_page_token. Transcripts are retrieved separately per conversation ID. The following function handles pagination, filters for completed interactions, and extracts transcript text.
import asyncio
import logging
from dataclasses import dataclass
from typing import AsyncIterator
logger = logging.getLogger(__name__)
@dataclass
class TranscriptLine:
speaker: str
text: str
timestamp: str
@dataclass
class ConversationData:
conversation_id: str
agent_id: str
transcript: list[TranscriptLine]
async def fetch_conversations_and_transcripts(auth: CXoneAuth, page_size: int = 10) -> AsyncIterator[ConversationData]:
query_body = {
"query": "status:COMPLETED",
"page_size": page_size,
"sort_by": "end_time",
"sort_order": "desc"
}
next_page_token: Optional[str] = None
while True:
if next_page_token:
query_body["next_page_token"] = next_page_token
async with httpx.AsyncClient(timeout=20.0) as client:
token = await auth.get_token()
response = await client.post(
f"{auth.base_url}/api/v2/conversations/details/query",
headers={"Authorization": f"Bearer {token}"},
json=query_body
)
response.raise_for_status()
data = response.json()
conversations = data.get("conversations", [])
if not conversations:
break
for conv in conversations:
conv_id = conv["id"]
agent_id = conv.get("agent_id")
if not agent_id:
continue
transcript = await fetch_transcript(auth, conv_id, agent_id)
if transcript:
yield ConversationData(
conversation_id=conv_id,
agent_id=agent_id,
transcript=transcript
)
next_page_token = data.get("next_page_token")
if not next_page_token:
break
async def fetch_transcript(auth: CXoneAuth, conversation_id: str, agent_id: str) -> Optional[list[TranscriptLine]]:
async with httpx.AsyncClient(timeout=15.0) as client:
token = await auth.get_token()
response = await client.get(
f"{auth.base_url}/api/v2/conversations/{conversation_id}/media/transcripts",
headers={"Authorization": f"Bearer {token}"}
)
if response.status_code == 404:
logger.warning("Transcript not ready or unavailable for conversation %s", conversation_id)
return None
response.raise_for_status()
transcript_data = response.json()
lines = []
for entry in transcript_data.get("transcript", []):
lines.append(TranscriptLine(
speaker=entry.get("speaker", "UNKNOWN"),
text=entry.get("text", ""),
timestamp=entry.get("timestamp", "")
))
return lines
HTTP Request Example:
POST /api/v2/conversations/details/query HTTP/1.1
Host: yourorg.my.cxone.com
Authorization: Bearer <access_token>
Content-Type: application/json
{
"query": "status:COMPLETED",
"page_size": 10,
"sort_by": "end_time",
"sort_order": "desc"
}
HTTP Response Example:
{
"conversations": [
{
"id": "conv-12345",
"type": "voice",
"status": "COMPLETED",
"agent_id": "agent-67890",
"start_time": "2024-05-10T14:30:00Z",
"end_time": "2024-05-10T14:35:22Z"
}
],
"next_page_token": "eyJwYWdlIjoyfQ==",
"total_results": 42
}
The pagination loop terminates when next_page_token is null. The transcript endpoint returns a 404 if transcription is still processing. The code gracefully skips incomplete transcripts.
Step 2: Rule-Based Parsing and Scoring
Quality rubrics require deterministic pattern matching against transcript text. The parser compiles regex patterns, evaluates each rule against the combined transcript, calculates weighted scores, and generates human-readable comments.
import re
from dataclasses import dataclass, field
from typing import Any
@dataclass
class RubricRule:
name: str
pattern: str
weight: float
case_insensitive: bool = True
score_if_present: float = 100.0
score_if_absent: float = 0.0
pass_comment: str = "Requirement met"
fail_comment: str = "Requirement missing"
@dataclass
class EvaluationResult:
total_score: float = 0.0
rule_scores: dict[str, dict[str, Any]] = field(default_factory=dict)
def compile_rubric(rules_config: dict[str, dict[str, Any]]) -> dict[str, RubricRule]:
compiled = {}
for name, cfg in rules_config.items():
flags = re.IGNORECASE if cfg.get("case_insensitive", True) else 0
compiled[name] = RubricRule(
name=name,
pattern=cfg["pattern"],
weight=cfg["weight"],
case_insensitive=cfg.get("case_insensitive", True),
score_if_present=cfg.get("score_if_present", 100.0),
score_if_absent=cfg.get("score_if_absent", 0.0),
pass_comment=cfg.get("pass_comment", "Requirement met"),
fail_comment=cfg.get("fail_comment", "Requirement missing")
)
return compiled
def evaluate_transcript(transcript_lines: list[TranscriptLine], rules: dict[str, RubricRule]) -> EvaluationResult:
full_text = " ".join(line.text for line in transcript_lines)
result = EvaluationResult()
weighted_total = 0.0
total_weight = 0.0
for rule_name, rule in rules.items():
total_weight += rule.weight
match_found = bool(re.search(rule.pattern, full_text, flags=re.IGNORECASE if rule.case_insensitive else 0))
score = rule.score_if_present if match_found else rule.score_if_absent
comment = rule.pass_comment if match_found else rule.fail_comment
weighted_total += score * rule.weight
result.rule_scores[rule_name] = {
"score": score,
"weight": rule.weight,
"matched": match_found,
"comment": comment
}
result.total_score = (weighted_total / total_weight) if total_weight > 0 else 0.0
return result
Rubric Configuration Example:
RUBRIC_CONFIG = {
"greeting": {
"pattern": r"\b(hi|hello|welcome|good morning|good afternoon)\b",
"weight": 0.2,
"case_insensitive": True,
"pass_comment": "Agent provided standard greeting",
"fail_comment": "Missing required greeting"
},
"verification": {
"pattern": r"\b(verify|confirm|last four|date of birth|account number)\b",
"weight": 0.3,
"case_insensitive": True,
"pass_comment": "Identity verification requested",
"fail_comment": "Verification step skipped"
},
"compliance_disclosure": {
"pattern": r"\b(this call may be recorded|quality assurance|monitoring)\b",
"weight": 0.3,
"case_insensitive": True,
"pass_comment": "Compliance recording notice delivered",
"fail_comment": "Recording disclosure missing"
},
"closing_summary": {
"pattern": r"\b(is there anything else|thank you for calling|have a great day)\b",
"weight": 0.2,
"case_insensitive": True,
"pass_comment": "Proper closing executed",
"fail_comment": "Closing protocol incomplete"
}
}
The parser aggregates scores using weighted averaging. Each rule contributes to the final composite score proportional to its business weight. The output structure maps directly to CXone QM evaluation items.
Step 3: Submit Evaluations to Quality Management API
CXone QM expects evaluations structured around pre-defined forms, sections, and items. The following function transforms parser output into the required payload, handles 429 rate limits, and submits the evaluation.
import time
from typing import Optional
async def submit_evaluation(
auth: CXoneAuth,
conversation_id: str,
agent_id: str,
form_id: str,
section_id: str,
item_id: str,
result: EvaluationResult
) -> dict[str, Any]:
sections = []
for rule_name, rule_data in result.rule_scores.items():
sections.append({
"section_id": section_id,
"items": [
{
"item_id": item_id,
"score": rule_data["score"],
"comment": f"[{rule_name.upper()}] {rule_data['comment']}"
}
]
})
payload = {
"form_id": form_id,
"agent_id": agent_id,
"conversation_id": conversation_id,
"sections": sections,
"overall_score": round(result.total_score, 2),
"comment": f"Automated compliance evaluation. Composite score: {result.total_score:.2f}"
}
retries = 0
max_retries = 3
async with httpx.AsyncClient(timeout=20.0) as client:
while retries <= max_retries:
token = await auth.get_token()
response = await client.post(
f"{auth.base_url}/api/v2/qm/evaluations",
headers={"Authorization": f"Bearer {token}"},
json=payload
)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
logger.warning("Rate limited. Retrying in %d seconds.", retry_after)
await asyncio.sleep(retry_after)
retries += 1
continue
response.raise_for_status()
return response.json()
raise RuntimeError("Failed to submit evaluation after maximum retries")
HTTP Request Example:
POST /api/v2/qm/evaluations HTTP/1.1
Host: yourorg.my.cxone.com
Authorization: Bearer <access_token>
Content-Type: application/json
{
"form_id": "form-abc123",
"agent_id": "agent-67890",
"conversation_id": "conv-12345",
"sections": [
{
"section_id": "section-xyz789",
"items": [
{
"item_id": "item-def456",
"score": 100,
"comment": "[GREETING] Agent provided standard greeting"
}
]
}
],
"overall_score": 85.5,
"comment": "Automated compliance evaluation. Composite score: 85.50"
}
HTTP Response Example:
{
"id": "eval-998877",
"form_id": "form-abc123",
"agent_id": "agent-67890",
"conversation_id": "conv-12345",
"overall_score": 85.5,
"status": "COMPLETED",
"created_time": "2024-05-10T15:00:00Z"
}
The submission loop implements explicit 429 handling by reading the Retry-After header. The payload structure matches the CXone QM schema exactly. Form IDs, section IDs, and item IDs must be provisioned in the CXone admin console before execution.
Complete Working Example
The following script combines authentication, transcript fetching, rule evaluation, and QM submission into a production-ready service. Replace placeholder credentials and QM identifiers before execution.
import asyncio
import logging
import sys
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
async def main():
org = "yourorg"
client_id = "YOUR_CLIENT_ID"
client_secret = "YOUR_CLIENT_SECRET"
form_id = "FORM_ID_FROM_CXONE"
section_id = "SECTION_ID_FROM_CXONE"
item_id = "ITEM_ID_FROM_CXONE"
auth = CXoneAuth(
organization=org,
client_id=client_id,
client_secret=client_secret,
scopes=["conversation:read", "conversation:media:transcript:read", "qm:evaluation:write", "qm:form:read"]
)
rules = compile_rubric(RUBRIC_CONFIG)
processed_count = 0
async for conv_data in fetch_conversations_and_transcripts(auth, page_size=5):
logger.info("Processing conversation %s for agent %s", conv_data.conversation_id, conv_data.agent_id)
result = evaluate_transcript(conv_data.transcript, rules)
logger.info("Conversation %s scored: %.2f", conv_data.conversation_id, result.total_score)
try:
eval_response = await submit_evaluation(
auth=auth,
conversation_id=conv_data.conversation_id,
agent_id=conv_data.agent_id,
form_id=form_id,
section_id=section_id,
item_id=item_id,
result=result
)
logger.info("Evaluation submitted successfully: %s", eval_response["id"])
processed_count += 1
except httpx.HTTPStatusError as e:
logger.error("Failed to submit evaluation for %s: %s", conv_data.conversation_id, e.response.text)
except Exception as e:
logger.error("Unexpected error processing %s: %s", conv_data.conversation_id, str(e))
if processed_count >= 10:
logger.info("Reached batch limit of 10 evaluations. Exiting.")
break
logger.info("Batch processing complete. Total evaluations submitted: %d", processed_count)
if __name__ == "__main__":
asyncio.run(main())
The script processes conversations in batches, applies the rubric, and submits evaluations with exponential backoff on rate limits. The batch limiter prevents unbounded execution during initial deployment.
Common Errors & Debugging
Error: 401 Unauthorized
- What causes it: Expired access token, invalid client credentials, or missing required OAuth scopes.
- How to fix it: Verify the client ID and secret match a confidential client in CXone. Ensure the scope list includes
conversation:read,conversation:media:transcript:read, andqm:evaluation:write. TheCXoneAuthclass automatically refreshes tokens sixty seconds before expiration. - Code showing the fix:
if response.status_code == 401:
auth.token = None
auth.expires_at = 0.0
token = await auth.get_token()
Error: 429 Too Many Requests
- What causes it: Exceeding CXone API rate limits (typically 50-100 requests per minute depending on tenant tier).
- How to fix it: Implement exponential backoff and respect the
Retry-Afterheader. Thesubmit_evaluationfunction handles this automatically. For high-volume workloads, add a request queue with concurrency limits. - Code showing the fix:
retry_after = int(response.headers.get("Retry-After", 5))
await asyncio.sleep(retry_after)
Error: 422 Unprocessable Entity
- What causes it: Mismatched form, section, or item IDs in the QM payload, or invalid score ranges (must be 0-100).
- How to fix it: Validate QM identifiers against
GET /api/v2/qm/forms/{formId}. Ensure all scores fall within the configured item range. Check that the agent ID matches a valid user in the tenant. - Code showing the fix:
if not 0 <= rule_data["score"] <= 100:
logger.warning("Score out of range for rule %s. Clamping to valid range.", rule_name)
rule_data["score"] = max(0, min(100, rule_data["score"]))
Error: 404 Not Found on Transcript Endpoint
- What causes it: Transcription is still processing, or the conversation type does not support transcript extraction.
- How to fix it: Filter conversations by
type(voice, chat, email) and verify transcript availability before polling. Implement a retry queue for conversations that return 404 initially. - Code showing the fix:
if response.status_code == 404:
logger.info("Transcript pending for %s. Deferring to next cycle.", conversation_id)
return None