Handling NICE CXone Voice Call Transfers with Python
What You Will Build
- A Python service that executes blind and consult transfers to CXone queues while enforcing skill-based routing and agent availability checks.
- The implementation uses the NICE CXone Voice API, Routing API, Analytics API, and User Presence API.
- The tutorial covers Python 3.9+ with
requests,pydantic, and structured logging for production call control workflows.
Prerequisites
- OAuth Client Type: Machine-to-Machine (Client Credentials) with grant type
client_credentials - Required Scopes:
voice:call:control,voice:queue:read,routing:skill:read,analytics:conversation:read,user:presence:read,voice:campaign:read - API Version: CXone REST API v2
- Runtime: Python 3.9 or higher
- Dependencies:
requests>=2.31.0,pydantic>=2.5.0,pydantic-settings>=2.1.0
Install dependencies before proceeding:
pip install requests pydantic pydantic-settings
Authentication Setup
CXone uses OAuth 2.0 Client Credentials flow. You must cache tokens and implement automatic refresh to avoid 401 interruptions during long-running call control processes. The following class handles token acquisition, storage, and retry logic for 429 rate limits.
import time
import logging
import requests
from typing import Optional
from pydantic import BaseModel, SecretStr
from pydantic_settings import BaseSettings
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("cxone_transfer")
class CXoneSettings(BaseSettings):
environment: str # e.g., "us-02" or "eu-01"
client_id: str
client_secret: SecretStr
base_url: str = ""
def model_post_init(self, __context) -> None:
self.base_url = f"https://{self.environment}.nicecv.com"
class TokenManager:
def __init__(self, settings: CXoneSettings):
self.settings = settings
self.token_url = f"{self.settings.base_url}/api/v2/oauth/token"
self.access_token: Optional[str] = None
self.expires_at: float = 0.0
def _get_headers(self) -> dict:
return {
"Authorization": f"Basic {requests.auth.HTTPBasicAuth(self.settings.client_id, self.settings.client_secret.get_secret_value()).encode()}",
"Content-Type": "application/x-www-form-urlencoded"
}
def get_token(self) -> str:
if self.access_token and time.time() < self.expires_at:
return self.access_token
payload = "grant_type=client_credentials"
response = requests.post(self.token_url, headers=self._get_headers(), data=payload)
response.raise_for_status()
data = response.json()
self.access_token = data["access_token"]
self.expires_at = time.time() + (data["expires_in"] - 60) # Refresh 60s early
logger.info("OAuth token refreshed successfully")
return self.access_token
def make_request(self, method: str, path: str, **kwargs) -> requests.Response:
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"Bearer {self.get_token()}"
headers["Content-Type"] = "application/json"
kwargs["headers"] = headers
max_retries = 3
for attempt in range(max_retries):
response = requests.request(method, f"{self.settings.base_url}{path}", **kwargs)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
logger.warning(f"Rate limited on {path}. Retrying in {retry_after}s (attempt {attempt + 1})")
time.sleep(retry_after)
continue
response.raise_for_status()
return response
raise RuntimeError(f"Max retries exceeded for {path}")
Implementation
Step 1: Construct Transfer Request Payloads with Target Queue and Skill Parameters
CXone transfer endpoints require explicit target definitions. You must specify the transfer type, target type, target identifier, and optional skill routing. The following Pydantic model enforces payload structure and prevents malformed requests.
from enum import Enum
from pydantic import BaseModel, Field
class TransferType(str, Enum):
BLIND = "BLIND"
CONSULT = "CONSULT"
class TargetType(str, Enum):
QUEUE = "QUEUE"
EXTENSION = "EXTENSION"
SKILL = "SKILL"
NUMBER = "NUMBER"
class TransferPayload(BaseModel):
transferType: TransferType
targetType: TargetType
targetId: str
skillId: Optional[str] = None
priority: int = Field(default=1, ge=1, le=5)
callbackNumber: Optional[str] = None
You construct payloads by instantiating the model. The targetId resolves to a CXone queue UUID, extension, or skill identifier. The skillId parameter enables skill-based routing within the target queue.
def build_transfer_payload(
transfer_type: TransferType,
target_type: TargetType,
target_id: str,
skill_id: Optional[str] = None
) -> dict:
payload = TransferPayload(
transferType=transfer_type,
targetType=target_type,
targetId=target_id,
skillId=skill_id,
priority=2
)
return payload.model_dump(exclude_none=True)
Expected Response: The Voice API returns 202 Accepted with a JSON body containing the transfer request ID and initial state.
Step 2: Validate Transfer Eligibility Based on Agent Availability and Campaign Rules
Before executing a transfer, you must verify that the target queue contains available agents matching the required skill. You also validate that the queue belongs to an active campaign. The following function queries the Queue Agents endpoint and filters by status and skill.
def validate_transfer_eligibility(
token_mgr: TokenManager,
queue_id: str,
skill_id: Optional[str] = None
) -> tuple[bool, str]:
# Check campaign association and active status
campaign_response = token_mgr.make_request("GET", f"/api/v2/voice/campaigns?queueId={queue_id}")
campaigns = campaign_response.json()
if not campaigns:
return False, "Queue is not associated with any active campaign"
active_campaign = next((c for c in campaigns if c.get("status") == "ACTIVE"), None)
if not active_campaign:
return False, "Campaign is not active"
# Check agent availability
agents_response = token_mgr.make_request("GET", f"/api/v2/voice/queues/{queue_id}/agents")
agents = agents_response.json()
available_agents = []
for agent in agents:
if agent.get("status") != "AVAILABLE":
continue
if skill_id:
agent_skills = agent.get("skills", [])
if not any(s.get("skillId") == skill_id for s in agent_skills):
continue
available_agents.append(agent.get("userId"))
if not available_agents:
return False, f"No available agents with required skill in queue {queue_id}"
return True, f"Eligible. {len(available_agents)} agent(s) available."
Error Handling: The function returns a boolean and a descriptive message. If the API returns 403, the token manager raises an exception caught by the caller. If the queue has zero agents, the system routes to fallback logic later.
Step 3: Manage Call Control State Transitions via the Voice API
CXone transfers are asynchronous. You must poll the call state to track transitions from WAITING to IN_TRANSFER to CONNECTED or FAILED. The following function executes the transfer and monitors state changes.
def execute_transfer(
token_mgr: TokenManager,
call_id: str,
payload: dict,
poll_interval: int = 3,
timeout: int = 30
) -> dict:
# Initiate transfer
transfer_response = token_mgr.make_request("POST", f"/api/v2/voice/calls/{call_id}/transfer", json=payload)
transfer_data = transfer_response.json()
logger.info(f"Transfer initiated for call {call_id}. Request ID: {transfer_data.get('id')}")
# Poll call state
start_time = time.time()
while time.time() - start_time < timeout:
state_response = token_mgr.make_request("GET", f"/api/v2/voice/calls/{call_id}")
call_data = state_response.json()
current_state = call_data.get("state")
if current_state in ("CONNECTED", "FAILED", "DISCONNECTED"):
logger.info(f"Call {call_id} reached final state: {current_state}")
return call_data
time.sleep(poll_interval)
logger.warning(f"Transfer monitoring timed out for call {call_id}")
return token_mgr.make_request("GET", f"/api/v2/voice/calls/{call_id}").json()
OAuth Scope: voice:call:control is required for POST /api/v2/voice/calls/{callId}/transfer and GET /api/v2/voice/calls/{callId}.
Step 4: Handle Transfer Failures with Fallback Routing Logic
When a transfer fails due to queue overflow, skill mismatch, or network timeout, you must route the call to a predefined fallback destination. The following function implements a retry chain with voicemail fallback.
def handle_transfer_failure(
token_mgr: TokenManager,
call_id: str,
fallback_queue_id: str,
voicemail_extension: str
) -> dict:
# Attempt fallback queue
fallback_payload = build_transfer_payload(TransferType.BLIND, TargetType.QUEUE, fallback_queue_id)
try:
is_eligible, msg = validate_transfer_eligibility(token_mgr, fallback_queue_id)
if is_eligible:
logger.info(f"Routing to fallback queue {fallback_queue_id}")
return execute_transfer(token_mgr, call_id, fallback_payload)
except Exception as e:
logger.error(f"Fallback queue validation failed: {e}")
# Route to voicemail
logger.info(f"Routing call {call_id} to voicemail at {voicemail_extension}")
vm_payload = build_transfer_payload(TransferType.BLIND, TargetType.EXTENSION, voicemail_extension)
return execute_transfer(token_mgr, call_id, vm_payload)
Edge Case Handling: If both primary and fallback queues are unavailable, the system automatically bridges to a voicemail extension. The try/except block ensures transient API errors do not drop the call.
Step 5: Log Transfer Metrics, Sync Presence, and Generate Audit Trails
Workforce analytics require precise transfer timestamps, agent presence states, and audit records. The following function queries the Analytics API, synchronizes agent presence, and writes structured audit logs.
def log_transfer_metrics_and_audit(
token_mgr: TokenManager,
call_id: str,
target_queue_id: str,
agent_id: Optional[str] = None
) -> dict:
# Query analytics for transfer duration and success rate
analytics_query = {
"from": "2023-01-01T00:00:00.000Z",
"to": "2023-12-31T23:59:59.999Z",
"size": 1000,
"aggregations": {
"transferMetrics": {
"metrics": [
{"name": "transferSuccess", "type": "count"},
{"name": "transferDuration", "type": "average"}
],
"filters": [{"type": "call", "field": "callId", "values": [call_id]}]
}
}
}
analytics_resp = token_mgr.make_request("POST", "/api/v2/analytics/conversations/details/query", json=analytics_query)
metrics = analytics_resp.json()
# Synchronize presence for the receiving agent
presence_state = "UNKNOWN"
if agent_id:
try:
presence_resp = token_mgr.make_request("GET", f"/api/v2/users/{agent_id}/presence")
presence_data = presence_resp.json()
presence_state = presence_data.get("status", "UNKNOWN")
except Exception as e:
logger.warning(f"Presence sync failed for agent {agent_id}: {e}")
# Generate audit trail
audit_record = {
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()),
"callId": call_id,
"targetQueueId": target_queue_id,
"agentId": agent_id,
"agentPresence": presence_state,
"analyticsSnapshot": metrics,
"auditAction": "TRANSFER_COMPLETED"
}
logger.info(f"Audit trail logged: {audit_record}")
return audit_record
OAuth Scope: analytics:conversation:read and user:presence:read are required. The analytics query uses real CXone aggregation structure. The audit record captures presence state at the exact moment of handoff.
Step 6: Expose a Transfer Simulator for IVR Testing
IVR developers require a dry-run environment to validate transfer logic without consuming live call capacity. The following simulator accepts mock IVR inputs and executes the validation chain against cached or stubbed responses.
class TransferSimulator:
def __init__(self, token_mgr: TokenManager):
self.token_mgr = token_mgr
self.mock_calls = {}
def register_mock_call(self, call_id: str, initial_state: str = "WAITING") -> None:
self.mock_calls[call_id] = {"id": call_id, "state": initial_state, "history": []}
def simulate_transfer(
self,
call_id: str,
queue_id: str,
skill_id: Optional[str] = None,
force_failure: bool = False
) -> dict:
if call_id not in self.mock_calls:
raise ValueError(f"Call {call_id} not registered in simulator")
is_eligible, reason = validate_transfer_eligibility(self.token_mgr, queue_id, skill_id)
if force_failure or not is_eligible:
self.mock_calls[call_id]["state"] = "FAILED"
self.mock_calls[call_id]["history"].append({"action": "TRANSFER_FAILED", "reason": reason})
return {"status": "FAILED", "reason": reason, "callState": "FAILED"}
# Simulate successful state transition
self.mock_calls[call_id]["state"] = "CONNECTED"
self.mock_calls[call_id]["history"].append({"action": "TRANSFER_SUCCESS", "queue": queue_id})
return {"status": "SUCCESS", "callState": "CONNECTED", "queueId": queue_id}
def get_call_state(self, call_id: str) -> dict:
return self.mock_calls.get(call_id, {"error": "Call not found"})
The simulator bypasses POST /api/v2/voice/calls/{callId}/transfer but reuses the exact eligibility validation logic. IVR test scripts can inject force_failure=True to verify fallback routing without live agents.
Complete Working Example
The following script combines authentication, payload construction, eligibility validation, transfer execution, fallback routing, metrics logging, and simulation into a single runnable module. Replace placeholder credentials with your CXone environment values.
import os
import time
import logging
import requests
from typing import Optional
from pydantic import BaseModel, SecretStr, Field
from pydantic_settings import BaseSettings
from enum import Enum
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("cxone_transfer")
class CXoneSettings(BaseSettings):
environment: str
client_id: str
client_secret: SecretStr
base_url: str = ""
def model_post_init(self, __context) -> None:
self.base_url = f"https://{self.environment}.nicecv.com"
class TokenManager:
def __init__(self, settings: CXoneSettings):
self.settings = settings
self.token_url = f"{self.settings.base_url}/api/v2/oauth/token"
self.access_token: Optional[str] = None
self.expires_at: float = 0.0
def _get_headers(self) -> dict:
return {
"Authorization": f"Basic {requests.auth.HTTPBasicAuth(self.settings.client_id, self.settings.client_secret.get_secret_value()).encode()}",
"Content-Type": "application/x-www-form-urlencoded"
}
def get_token(self) -> str:
if self.access_token and time.time() < self.expires_at:
return self.access_token
payload = "grant_type=client_credentials"
response = requests.post(self.token_url, headers=self._get_headers(), data=payload)
response.raise_for_status()
data = response.json()
self.access_token = data["access_token"]
self.expires_at = time.time() + (data["expires_in"] - 60)
return self.access_token
def make_request(self, method: str, path: str, **kwargs) -> requests.Response:
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"Bearer {self.get_token()}"
headers["Content-Type"] = "application/json"
kwargs["headers"] = headers
max_retries = 3
for attempt in range(max_retries):
response = requests.request(method, f"{self.settings.base_url}{path}", **kwargs)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
time.sleep(retry_after)
continue
response.raise_for_status()
return response
raise RuntimeError(f"Max retries exceeded for {path}")
class TransferType(str, Enum):
BLIND = "BLIND"
CONSULT = "CONSULT"
class TargetType(str, Enum):
QUEUE = "QUEUE"
EXTENSION = "EXTENSION"
SKILL = "SKILL"
NUMBER = "NUMBER"
class TransferPayload(BaseModel):
transferType: TransferType
targetType: TargetType
targetId: str
skillId: Optional[str] = None
priority: int = Field(default=1, ge=1, le=5)
def build_transfer_payload(transfer_type: TransferType, target_type: TargetType, target_id: str, skill_id: Optional[str] = None) -> dict:
return TransferPayload(transferType=transfer_type, targetType=target_type, targetId=target_id, skillId=skill_id, priority=2).model_dump(exclude_none=True)
def validate_transfer_eligibility(token_mgr: TokenManager, queue_id: str, skill_id: Optional[str] = None) -> tuple[bool, str]:
campaign_response = token_mgr.make_request("GET", f"/api/v2/voice/campaigns?queueId={queue_id}")
campaigns = campaign_response.json()
if not campaigns:
return False, "Queue is not associated with any active campaign"
active_campaign = next((c for c in campaigns if c.get("status") == "ACTIVE"), None)
if not active_campaign:
return False, "Campaign is not active"
agents_response = token_mgr.make_request("GET", f"/api/v2/voice/queues/{queue_id}/agents")
agents = agents_response.json()
available_agents = []
for agent in agents:
if agent.get("status") != "AVAILABLE":
continue
if skill_id:
agent_skills = agent.get("skills", [])
if not any(s.get("skillId") == skill_id for s in agent_skills):
continue
available_agents.append(agent.get("userId"))
if not available_agents:
return False, f"No available agents with required skill in queue {queue_id}"
return True, f"Eligible. {len(available_agents)} agent(s) available."
def execute_transfer(token_mgr: TokenManager, call_id: str, payload: dict, poll_interval: int = 3, timeout: int = 30) -> dict:
transfer_response = token_mgr.make_request("POST", f"/api/v2/voice/calls/{call_id}/transfer", json=payload)
logger.info(f"Transfer initiated for call {call_id}")
start_time = time.time()
while time.time() - start_time < timeout:
state_response = token_mgr.make_request("GET", f"/api/v2/voice/calls/{call_id}")
current_state = state_response.json().get("state")
if current_state in ("CONNECTED", "FAILED", "DISCONNECTED"):
return state_response.json()
time.sleep(poll_interval)
return token_mgr.make_request("GET", f"/api/v2/voice/calls/{call_id}").json()
def handle_transfer_failure(token_mgr: TokenManager, call_id: str, fallback_queue_id: str, voicemail_extension: str) -> dict:
fallback_payload = build_transfer_payload(TransferType.BLIND, TargetType.QUEUE, fallback_queue_id)
try:
is_eligible, _ = validate_transfer_eligibility(token_mgr, fallback_queue_id)
if is_eligible:
return execute_transfer(token_mgr, call_id, fallback_payload)
except Exception as e:
logger.error(f"Fallback validation failed: {e}")
vm_payload = build_transfer_payload(TransferType.BLIND, TargetType.EXTENSION, voicemail_extension)
return execute_transfer(token_mgr, call_id, vm_payload)
def log_transfer_metrics_and_audit(token_mgr: TokenManager, call_id: str, target_queue_id: str, agent_id: Optional[str] = None) -> dict:
analytics_query = {
"from": "2023-01-01T00:00:00.000Z",
"to": "2023-12-31T23:59:59.999Z",
"size": 1000,
"aggregations": {
"transferMetrics": {
"metrics": [
{"name": "transferSuccess", "type": "count"},
{"name": "transferDuration", "type": "average"}
],
"filters": [{"type": "call", "field": "callId", "values": [call_id]}]
}
}
}
metrics = token_mgr.make_request("POST", "/api/v2/analytics/conversations/details/query", json=analytics_query).json()
presence_state = "UNKNOWN"
if agent_id:
try:
presence_data = token_mgr.make_request("GET", f"/api/v2/users/{agent_id}/presence").json()
presence_state = presence_data.get("status", "UNKNOWN")
except Exception as e:
logger.warning(f"Presence sync failed: {e}")
audit_record = {
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()),
"callId": call_id,
"targetQueueId": target_queue_id,
"agentId": agent_id,
"agentPresence": presence_state,
"analyticsSnapshot": metrics,
"auditAction": "TRANSFER_COMPLETED"
}
logger.info(f"Audit trail logged: {audit_record}")
return audit_record
def run_production_workflow():
settings = CXoneSettings(
environment=os.getenv("CXONE_ENV", "us-02"),
client_id=os.getenv("CXONE_CLIENT_ID", "your_client_id"),
client_secret=os.getenv("CXONE_CLIENT_SECRET", "your_client_secret")
)
token_mgr = TokenManager(settings)
call_id = "call_uuid_12345"
target_queue = "queue_uuid_67890"
target_skill = "skill_uuid_11223"
fallback_queue = "queue_uuid_44556"
voicemail_ext = "999"
is_eligible, reason = validate_transfer_eligibility(token_mgr, target_queue, target_skill)
if not is_eligible:
logger.warning(f"Primary queue ineligible: {reason}")
result = handle_transfer_failure(token_mgr, call_id, fallback_queue, voicemail_ext)
else:
payload = build_transfer_payload(TransferType.BLIND, TargetType.QUEUE, target_queue, target_skill)
result = execute_transfer(token_mgr, call_id, payload)
final_state = result.get("state")
if final_state == "CONNECTED":
log_transfer_metrics_and_audit(token_mgr, call_id, target_queue, result.get("connectedTo"))
else:
logger.error(f"Transfer did not connect. Final state: {final_state}")
if __name__ == "__main__":
run_production_workflow()
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: Expired OAuth token or invalid client credentials.
- Fix: Verify
client_idandclient_secretmatch the CXone security profile. Ensure theTokenManagerrefreshes tokens before expiration. Check that the security profile includesvoice:call:control. - Code Fix: The
get_token()method automatically refreshes tokens 60 seconds before expiration. If 401 persists, rotate credentials in the CXone admin console.
Error: 403 Forbidden
- Cause: Missing OAuth scope or insufficient security profile permissions for the target queue.
- Fix: Add
voice:queue:read,routing:skill:read, andanalytics:conversation:readto the OAuth client scope list. Assign the application to a security profile with “Voice Call Control” and “Queue Read” permissions. - Code Fix: Validate scope presence before initialization:
required_scopes = ["voice:call:control", "voice:queue:read", "routing:skill:read"]
# Verify via CXone Security Profile UI or GET /api/v2/security/profiles
Error: 400 Bad Request
- Cause: Malformed transfer payload or invalid UUID format for
targetId/skillId. - Fix: Use Pydantic validation to enforce UUID patterns. Ensure
targetTypematches the actual CXone resource type. CXone rejects transfers to queues withstatus: INACTIVE. - Code Fix: Add UUID validation to
TransferPayload:
from uuid import UUID
class TransferPayload(BaseModel):
targetId: UUID
skillId: Optional[UUID] = None
Error: 429 Too Many Requests
- Cause: Exceeding CXone rate limits on Voice or Analytics endpoints.
- Fix: Implement exponential backoff. The
TokenManager.make_request()method includes automatic retry logic withRetry-Afterheader parsing. - Code Fix: Adjust
max_retriesandpoll_intervalbased on environment capacity. Reduce concurrent transfer polling threads.
Error: 500 Internal Server Error
- Cause: CXone backend routing engine timeout or campaign rule conflict.
- Fix: Verify campaign rules do not contain circular routing references. Check queue skill requirements against actual agent skill assignments. Retry after 30 seconds.
- Code Fix: Wrap
execute_transferin a circuit breaker pattern for production deployments.