Automating CXone Outbound Campaign Rule Adjustments Based on Real-Time Agent Availability

What You Will Build

A Python script that queries CXone agent availability metrics every five minutes, calculates an optimal concurrent call limit, and updates outbound campaign rules via the Campaign API. This tutorial uses the CXone REST API and Celery for scheduled execution. The implementation is written in Python 3.9+ using synchronous HTTP calls optimized for Celery worker environments.

Prerequisites

  • CXone OAuth 2.0 client credentials with scopes: realtime:read, campaign:read, campaign:write
  • CXone API v2 endpoints
  • Python 3.9 or higher
  • External dependencies: celery, redis, requests, pydantic, python-dotenv
  • Running Redis instance for Celery broker and result backend
  • Access to a CXone tenant with at least one active outbound campaign

Authentication Setup

CXone uses the OAuth 2.0 Client Credentials grant type. The token endpoint issues short-lived access tokens that expire after 3600 seconds. You must implement token caching and automatic refresh logic to avoid unnecessary authentication calls during scheduled execution.

The following class handles token acquisition, caching, and expiration tracking. It raises explicit exceptions when the tenant credentials are invalid or the token endpoint returns an error.

import time
import requests
from typing import Optional

class CXoneAuthManager:
    def __init__(self, tenant_url: str, client_id: str, client_secret: str):
        self.tenant_url = tenant_url.rstrip("/")
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0
        self.token_endpoint = f"{self.tenant_url}/oauth/token"

    def get_access_token(self) -> str:
        if self.access_token and time.time() < self.token_expiry:
            return self.access_token

        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "realtime:read campaign:read campaign:write"
        }

        # A timeout keeps a stalled token endpoint from hanging the worker
        response = requests.post(self.token_endpoint, headers=headers, data=payload, timeout=10)
        response.raise_for_status()
        data = response.json()

        self.access_token = data["access_token"]
        self.token_expiry = time.time() + (data["expires_in"] - 30)
        return self.access_token

The scope parameter explicitly requests realtime:read for agent state queries and campaign:read plus campaign:write for campaign rule modifications. The - 30 buffer on token_expiry prevents edge-case authentication failures caused by clock drift between your worker and CXone identity servers.
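
The effect of the 30-second buffer can be checked offline with a simulated clock. The sketch below is not part of CXoneAuthManager; the CachedToken class, the fake_fetch function, and the injected clock are hypothetical names introduced only to illustrate the caching rule described above.

```python
import time
from typing import Callable, List, Optional, Tuple


class CachedToken:
    """Caches a token and refreshes it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, int]],  # returns (token, expires_in seconds)
        buffer_seconds: int = 30,
        clock: Callable[[], float] = time.time,
    ):
        self._fetch = fetch
        self._buffer = buffer_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._expiry = 0.0

    def get(self) -> str:
        # Reuse the cached token while the buffered expiry has not been reached.
        if self._token is not None and self._clock() < self._expiry:
            return self._token
        token, expires_in = self._fetch()
        self._token = token
        self._expiry = self._clock() + (expires_in - self._buffer)
        return token


# Simulated clock and token endpoint (both hypothetical stand-ins).
now = [1000.0]
calls: List[float] = []


def fake_fetch() -> Tuple[str, int]:
    calls.append(now[0])
    return f"token-{len(calls)}", 3600


cache = CachedToken(fake_fetch, clock=lambda: now[0])
first = cache.get()    # first call fetches a token
now[0] += 3569         # one second before the buffered expiry (3570 s)
second = cache.get()   # served from the cache
now[0] += 1            # buffered expiry reached
third = cache.get()    # triggers a second fetch
```

With a 3600-second lifetime, the cache treats the token as expired 3570 seconds after it was issued, so the refresh happens before the server-side expiry.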

Implementation

Step 1: Configure Celery and HTTP Client with Retry Logic

CXone enforces rate limits on realtime and campaign endpoints. A 429 response indicates that you have exceeded the allowed request rate. The HTTP client must wait before retrying, honoring the Retry-After header when it is present and backing off exponentially otherwise, so that the Celery worker complies with the limits instead of crashing.

Celery requires a broker connection and periodic task configuration. The following setup uses Redis as the broker and defines a five-minute execution interval.

import logging
import time
import requests
from celery import Celery
from typing import Any, Dict, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Celery configuration
celery_app = Celery(
    "cxone_campaign_optimizer",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1"
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    beat_schedule={
        "optimize-campaign-every-5-minutes": {
            "task": "cxone_campaign_optimizer.optimize_campaign_task",
            "schedule": 300,
        }
    }
)

class CXoneHttpClient:
    def __init__(self, tenant_url: str, auth_manager: CXoneAuthManager):
        self.tenant_url = tenant_url.rstrip("/")
        self.auth = auth_manager
        self.session = requests.Session()
        self.session.headers.update({"Content-Type": "application/json"})

    def _make_request(self, method: str, path: str, **kwargs) -> requests.Response:
        url = f"{self.tenant_url}{path}"
        token = self.auth.get_access_token()
        self.session.headers["Authorization"] = f"Bearer {token}"

        max_retries = 5
        for attempt in range(max_retries):
            try:
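                kwargs.setdefault("timeout", 30)  # do not let a stalled connection hang the worker
                response = self.session.request(method, url, **kwargs)

                if response.status_code == 429:
                    # Retry-After may be missing or an HTTP-date; use the exponential delay in both cases
                    header = response.headers.get("Retry-After", "")
                    retry_after = int(header) if header.isdigit() else 2 ** attempt
                    logger.warning(f"Rate limited on {path}. Retrying in {retry_after}s")
                    time.sleep(retry_after)
                    continue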
                
                response.raise_for_status()
                return response
            
            except requests.exceptions.HTTPError as e:
                if response.status_code in (401, 403):
                    logger.error(f"Authentication or authorization failed on {path}: {e}")
                    raise
                if response.status_code >= 500:
                    logger.warning(f"Server error on {path}. Retrying in {2 ** attempt}s")
                    time.sleep(2 ** attempt)
                    continue
                raise
        
        raise requests.exceptions.RetryError(f"Max retries exceeded for {path}")

    def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        response = self._make_request("GET", path, params=params)
        return response.json()

    def put(self, path: str, json_data: Dict[str, Any]) -> Dict[str, Any]:
        response = self._make_request("PUT", path, json=json_data)
        return response.json()

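The _make_request method intercepts 429 responses and waits for the number of seconds given in the Retry-After header, falling back to an exponential delay (2 ** attempt seconds) when the header is absent. Server errors (5xx) are retried with the same exponential delays. Authentication failures (401/403) abort immediately because retrying will not resolve missing scopes or invalid credentials, and any other 4xx error is re-raised to the caller. After five failed attempts the method raises RetryError.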
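
The client above waits for fixed exponential delays. If several workers hit the limit at the same moment, adding random jitter spreads their retries apart. The following is a minimal sketch of one way to compute such a delay, assuming the "full jitter" strategy and a Retry-After header that may carry either a number of seconds or an HTTP-date; the backoff_delay function and its base and cap parameters are hypothetical and not part of CXoneHttpClient.

```python
import random
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def backoff_delay(
    attempt: int,
    retry_after: Optional[str] = None,
    base: float = 1.0,
    cap: float = 60.0,
    now: Optional[datetime] = None,
) -> float:
    """Return the number of seconds to wait before the next retry."""
    if retry_after:
        value = retry_after.strip()
        if value.isdigit():
            # Retry-After given as delay-seconds
            return min(float(value), cap)
        try:
            target = parsedate_to_datetime(value)  # Retry-After given as an HTTP-date
        except (TypeError, ValueError):
            target = None  # unparseable header: fall through to jittered backoff
        if target is not None:
            if target.tzinfo is None:
                target = target.replace(tzinfo=timezone.utc)
            current = now or datetime.now(timezone.utc)
            return min(max((target - current).total_seconds(), 0.0), cap)

    # Full jitter: pick a random delay between 0 and the capped exponential ceiling.
    ceiling = min(cap, base * (2 ** attempt))
    return random.uniform(0.0, ceiling)


fixed_now = datetime(2015, 10, 21, 7, 27, 30, tzinfo=timezone.utc)
numeric = backoff_delay(0, "7")
capped = backoff_delay(3, "120")
dated = backoff_delay(0, "Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now)
```

A server-provided Retry-After value is used as-is (up to the cap) because the server knows when capacity returns; jitter applies only when the client has to guess.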

Step 2: Fetch Real-Time Agent Availability and Calculate Metrics

The CXone Realtime API returns agent states in paginated batches. Follow nextPageToken until the response no longer includes one. The calculation then counts the agents currently in the available state and applies a safety factor so the campaign does not place more calls than those agents can absorb during peak hours.

from typing import List, Dict, Any

class AgentMetricsCalculator:
    def __init__(self, client: CXoneHttpClient):
        self.client = client

    def fetch_all_agents(self) -> List[Dict[str, Any]]:
        all_agents: List[Dict[str, Any]] = []
        page_token = None
        page_size = 50

        while True:
            params = {"pageSize": page_size}
            if page_token:
                params["nextPageToken"] = page_token

            response = self.client.get("/api/v2/realtime/agents", params=params)
            agents = response.get("items", [])
            all_agents.extend(agents)

            next_token = response.get("nextPageToken")
            if not next_token:
                break
            page_token = next_token

        return all_agents

    def calculate_optimal_concurrent_calls(self, total_agents: int, available_agents: int) -> int:
        if total_agents == 0:
            return 0
        
        # Safety factor: never assign more calls than 80% of available agents
        # This prevents queue starvation during sudden availability drops
        safety_factor = 0.8
        optimal = int(available_agents * safety_factor)
        
        # Floor at 1 if any agents are available, otherwise 0
        return max(1, optimal) if available_agents > 0 else 0

The Realtime API requires the realtime:read scope. The endpoint returns agent objects containing state, userId, and queueId. You filter for state == "available" to determine active capacity. The safety factor accounts for call handling time variance and prevents the campaign from exhausting agent capacity before the next scheduled adjustment.
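
The arithmetic is easiest to see on a small sample. The sketch below reproduces the filter and the safety factor on hand-written agent records; the safe_limit function mirrors calculate_optimal_concurrent_calls, and the "busy" state name and the sample data are assumptions made for illustration.

```python
from typing import Any, Dict, List


def count_available(agents: List[Dict[str, Any]]) -> int:
    # Same exact-match filter as the tutorial: state == "available"
    return sum(1 for agent in agents if agent.get("state") == "available")


def safe_limit(available: int, safety_factor: float = 0.8) -> int:
    # Mirrors calculate_optimal_concurrent_calls: scale down, truncate, floor at 1
    if available <= 0:
        return 0
    return max(1, int(available * safety_factor))


# Hypothetical sample: 12 available agents and 5 in some other state
sample_agents = [{"state": "available"}] * 12 + [{"state": "busy"}] * 5

available = count_available(sample_agents)
limit = safe_limit(available)        # int(12 * 0.8) = int(9.6) = 9
single_agent_limit = safe_limit(1)   # int(0.8) = 0, raised to the floor of 1
no_agent_limit = safe_limit(0)       # no agents, no calls
```

Note that int() truncates rather than rounds, so the limit always errs on the low side, and that the floor of 1 means a single available agent receives a limit equal to 100% of capacity.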

Step 3: Update Campaign Rules via the Campaign API

CXone outbound campaigns store dialing parameters inside a rules object. The Campaign API requires the full campaign version number for optimistic concurrency control. You must read the current campaign configuration, modify the maxConcurrentCalls rule, and submit the updated payload via PUT /api/v2/campaigns/{id}.

class CampaignRuleUpdater:
    def __init__(self, client: CXoneHttpClient):
        self.client = client

    def update_concurrent_calls(self, campaign_id: str, new_limit: int) -> Dict[str, Any]:
        # Fetch current campaign to obtain version and existing rules
        campaign_data = self.client.get(f"/api/v2/campaigns/{campaign_id}")
        
        current_version = campaign_data["version"]
        current_rules = campaign_data.get("rules", {})
        
        # Preserve existing rules and only update the concurrent call limit
        updated_rules = {
            **current_rules,
            "maxConcurrentCalls": new_limit
        }

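        # Construct the update payload from the full campaign representation,
        # so that fields other than rules are sent back unchanged
        update_payload = {
            **campaign_data,
            "id": campaign_id,
            "version": current_version,
            "rules": updated_rules
        }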

        logger.info(f"Updating campaign {campaign_id} version {current_version} with maxConcurrentCalls={new_limit}")
        
        # PUT requires the full resource representation
        return self.client.put(f"/api/v2/campaigns/{campaign_id}", json_data=update_payload)

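The Campaign API enforces optimistic locking via the version field. If another process modifies the campaign between your read and write operations, CXone returns a 409 Conflict. The HTTP client does not retry a 409; it re-raises it as an HTTPError. That is the correct behavior, because resending the same stale version would fail again: resolving the conflict requires re-reading the campaign and reapplying the change at the application level. In the complete example below, the Celery task retry does this by re-running the whole read-modify-write cycle.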
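
Application-level conflict resolution can be sketched as a read-modify-write loop that re-reads the resource after each conflict. The example below runs against an in-memory stand-in rather than the real API: VersionConflict, FakeCampaignStore, update_with_retry, and the dialMode rule are all hypothetical names used only to demonstrate the pattern.

```python
import copy
from typing import Any, Callable, Dict


class VersionConflict(Exception):
    """Stands in for an HTTP 409 response in this sketch."""


class FakeCampaignStore:
    """In-memory campaign that simulates one concurrent edit by another process."""

    def __init__(self) -> None:
        self.doc = {"id": "c1", "version": 7,
                    "rules": {"maxConcurrentCalls": 5, "dialMode": "preview"}}
        self.interfere_once = True

    def read(self) -> Dict[str, Any]:
        return copy.deepcopy(self.doc)

    def write(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        if self.interfere_once:
            self.interfere_once = False
            self.doc["version"] += 1  # another writer got in first
        if payload["version"] != self.doc["version"]:
            raise VersionConflict("stale version")
        self.doc = {**payload, "version": payload["version"] + 1}
        return copy.deepcopy(self.doc)


def update_with_retry(
    read: Callable[[], Dict[str, Any]],
    write: Callable[[Dict[str, Any]], Dict[str, Any]],
    new_limit: int,
    max_attempts: int = 3,
) -> Dict[str, Any]:
    for _ in range(max_attempts):
        current = read()  # always start from the latest version
        rules = {**current.get("rules", {}), "maxConcurrentCalls": new_limit}
        try:
            return write({**current, "rules": rules})
        except VersionConflict:
            continue  # re-read and reapply instead of resending stale data
    raise VersionConflict(f"gave up after {max_attempts} attempts")


store = FakeCampaignStore()
result = update_with_retry(store.read, store.write, new_limit=12)
```

The first write fails because the stored version moved from 7 to 8; the second attempt re-reads version 8, reapplies the new limit, and succeeds without discarding the other rules.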

Complete Working Example

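The following script combines authentication, metrics calculation, and campaign updates into a single Celery task. It loads environment variables, initializes the clients, and runs one optimization pass per invocation. The imports assume that the classes and celery_app from the previous sections live in a module named cxone_campaign_optimizer; the worker must also import the module that defines the task so that the task is registered.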

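import logging
import os
from dotenv import load_dotenv
from cxone_campaign_optimizer import (
    CXoneAuthManager,
    CXoneHttpClient,
    AgentMetricsCalculator,
    CampaignRuleUpdater,
    celery_app
)

load_dotenv()
logger = logging.getLogger(__name__)  # the task below logs through this module-level logger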

TENANT_URL = os.getenv("CXONE_TENANT_URL", "https://api-us.nicecxone.com")
CLIENT_ID = os.getenv("CXONE_CLIENT_ID")
CLIENT_SECRET = os.getenv("CXONE_CLIENT_SECRET")
CAMPAIGN_ID = os.getenv("CXONE_CAMPAIGN_ID")

if not all([CLIENT_ID, CLIENT_SECRET, CAMPAIGN_ID]):
    raise ValueError("Missing required environment variables: CXONE_CLIENT_ID, CXONE_CLIENT_SECRET, CXONE_CAMPAIGN_ID")

auth_manager = CXoneAuthManager(TENANT_URL, CLIENT_ID, CLIENT_SECRET)
http_client = CXoneHttpClient(TENANT_URL, auth_manager)
metrics_calculator = AgentMetricsCalculator(http_client)
campaign_updater = CampaignRuleUpdater(http_client)

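# The explicit name must match the "task" entry in beat_schedule
@celery_app.task(
    bind=True,
    name="cxone_campaign_optimizer.optimize_campaign_task",
    max_retries=3,
    default_retry_delay=60,
)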
def optimize_campaign_task(self):
    try:
        agents = metrics_calculator.fetch_all_agents()
        
        available_count = sum(1 for agent in agents if agent.get("state") == "available")
        total_count = len(agents)
        
        optimal_limit = metrics_calculator.calculate_optimal_concurrent_calls(total_count, available_count)
        
        logger.info(f"Agent metrics: total={total_count}, available={available_count}, optimal_limit={optimal_limit}")
        
        campaign_updater.update_concurrent_calls(CAMPAIGN_ID, optimal_limit)
        logger.info(f"Campaign {CAMPAIGN_ID} successfully updated")
        
    except Exception as exc:
        logger.error(f"Optimization task failed: {exc}")
        raise self.retry(exc=exc)

Run the Celery worker and beat scheduler using the following commands:

celery -A cxone_campaign_optimizer worker --loglevel=info
celery -A cxone_campaign_optimizer beat --loglevel=info

The beat scheduler triggers optimize_campaign_task every 300 seconds. The worker fetches agent states, calculates the safe concurrent call limit, and applies the rule update. Token refresh occurs automatically on expiration. Rate limits are handled transparently by the HTTP client.

Common Errors & Debugging

Error: HTTP 401 Unauthorized

Cause: The OAuth client credentials are invalid, the tenant URL is incorrect, or the token has expired without successful refresh.
Fix: Verify CXONE_CLIENT_ID and CXONE_CLIENT_SECRET match the CXone developer console. Ensure the tenant URL matches your region (api-us.nicecxone.com, api-eu.nicecxone.com, etc.). Check that the scope parameter includes realtime:read and campaign:write.

# Debugging snippet
import requests
token_resp = requests.post(
    f"{TENANT_URL}/oauth/token",
    data={
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scope": "realtime:read campaign:read campaign:write"
    }
)
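# Print the raw body: error responses are not guaranteed to be JSON.
# On success the output contains a live access token, so keep it out of logs and tickets.
print(token_resp.status_code, token_resp.text)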

Error: HTTP 403 Forbidden

Cause: The OAuth client lacks required scopes, or the tenant enforces IP allowlisting that blocks your worker environment.
Fix: Add missing scopes in the CXone admin console under Security > OAuth Clients. If IP allowlisting is enabled, add your server's public IP to the allowlist. Verify the token payload contains the requested scopes using a JWT decoder.
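
A token's payload can also be inspected locally instead of pasting it into an online decoder. The sketch below uses only the standard library and assumes the access token is a three-part JWT whose payload carries a space-delimited "scope" claim; that claim name is an assumption, so check the decoded payload for the field your tenant actually uses. It does not verify the signature and is meant for debugging only.

```python
import base64
import json
from typing import Any, Dict, List


def decode_jwt_payload(token: str) -> Dict[str, Any]:
    """Decode the payload segment of a JWT without verifying the signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected a three-part JWT")
    segment = parts[1]
    padded = segment + "=" * (-len(segment) % 4)  # restore stripped base64 padding
    return json.loads(base64.urlsafe_b64decode(padded))


def missing_scopes(token: str, required: List[str]) -> List[str]:
    # Assumption: scopes are stored as a space-delimited string under "scope"
    granted = set(str(decode_jwt_payload(token).get("scope", "")).split())
    return [scope for scope in required if scope not in granted]


def _encode_segment(obj: Dict[str, Any]) -> str:
    raw = json.dumps(obj).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


# Hand-built sample token for demonstration; the signature segment is a placeholder.
sample_token = ".".join([
    _encode_segment({"alg": "none", "typ": "JWT"}),
    _encode_segment({"sub": "worker", "scope": "realtime:read campaign:read"}),
    "signature",
])

missing = missing_scopes(
    sample_token, ["realtime:read", "campaign:read", "campaign:write"]
)
```

Here the sample token lacks campaign:write, which is the situation that would let the agent query succeed while the campaign update returns 403.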

Error: HTTP 429 Too Many Requests

Cause: The worker exceeds CXone rate limits, typically by polling too frequently or making synchronous calls without backoff.
Fix: The provided CXoneHttpClient implements exponential backoff and reads the Retry-After header. If you still encounter cascading 429 errors, increase the Celery beat interval beyond 300 seconds or implement request coalescing.

Error: HTTP 400 Bad Request (Invalid Rule Payload)

Cause: The campaign update payload omits required fields like version or id, or the maxConcurrentCalls value falls outside tenant-enforced boundaries.
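Fix: Always include the current version in the PUT payload. Validate that maxConcurrentCalls does not exceed the total licensed agents and stays within the range your tenant accepts. Note that calculate_optimal_concurrent_calls returns 0 when no agents are available; if the tenant enforces a minimum of 1, as in the sample response below, skip the update or clamp the value in that case. CXone returns a detailed errors array in the response body that specifies the invalid field.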

{
  "errors": [
    {
      "code": "INVALID_VALUE",
      "message": "maxConcurrentCalls must be between 1 and 50",
      "path": "rules.maxConcurrentCalls"
    }
  ]
}
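
A response body of this shape can be turned into readable log lines, and the computed limit can be clamped before it is sent. The sketch below is illustrative only: summarize_errors and clamp_limit are hypothetical helpers, and the bounds 1 and 50 are taken from the sample message above rather than from any documented tenant limit.

```python
from typing import Any, Dict, List


def summarize_errors(body: Dict[str, Any]) -> List[str]:
    """Format each entry of the errors array as 'path: message (code)'."""
    return [
        f"{err.get('path', '?')}: {err.get('message', '')} ({err.get('code', '?')})"
        for err in body.get("errors", [])
    ]


def clamp_limit(value: int, lower: int, upper: int) -> int:
    """Force value into the inclusive range [lower, upper]."""
    return max(lower, min(upper, value))


sample_body = {
    "errors": [
        {
            "code": "INVALID_VALUE",
            "message": "maxConcurrentCalls must be between 1 and 50",
            "path": "rules.maxConcurrentCalls",
        }
    ]
}

lines = summarize_errors(sample_body)
too_low = clamp_limit(0, 1, 50)     # raised to the lower bound
too_high = clamp_limit(80, 1, 50)   # reduced to the upper bound
in_range = clamp_limit(12, 1, 50)   # unchanged
```

Clamping 0 up to 1 keeps the request valid but also keeps the campaign dialing with no available agents, so skipping the update may be the better choice in that case.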

Official References