Routing NICE CXone Social Media Interactions with Python
What You Will Build
A Python service that ingests social media interactions via CXone webhooks, routes them to skill-based queues using sentiment analysis, manages platform authentication tokens with automatic rotation, handles delivery failures with exponential backoff, synchronizes thread history asynchronously, logs interaction latency for SLA monitoring, and exposes a simulator for channel testing. This tutorial uses the NICE CXone REST API and the nice-cxone-platform-client Python SDK. The implementation covers Python 3.9+ with fastapi, httpx, and asyncio.
Prerequisites
- CXone OAuth 2.0 client credentials grant configured in your CXone organization
- Required OAuth scopes: webhook:write, task:write, routing:write, social:read
- Python 3.9 or later
- Dependencies: pip install nice-cxone-platform-client httpx fastapi uvicorn textblob pydantic
- A valid social platform access token (e.g., Facebook Graph API or X API) for the token rotation demonstration
- External ticketing system endpoint (simulated in this tutorial)
Authentication Setup
CXone uses the OAuth 2.0 client credentials flow. You must obtain an access token before initializing the SDK. The token expires after two hours and requires rotation.
import httpx
from cxone_platform_client import Configuration, ApiClient, PlatformClient
from cxone_platform_client.rest import ApiException

CXONE_ENV = "https://api.us-east-1.my.site.com"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"

async def get_cxone_token() -> str:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{CXONE_ENV}/oauth/token",
            data={"grant_type": "client_credentials"},
            auth=(CLIENT_ID, CLIENT_SECRET),
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        response.raise_for_status()
        return response.json()["access_token"]

async def initialize_cxone_client() -> PlatformClient:
    token = await get_cxone_token()
    configuration = Configuration()
    configuration.host = CXONE_ENV
    configuration.access_token = token
    api_client = ApiClient(configuration)
    return PlatformClient(api_client)
The initialize_cxone_client function returns a fully configured PlatformClient instance. You will use this client for webhook registration and task routing. Store the token in memory or a secure cache and refresh it before expiration.
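One way to keep the token in memory and refresh it before expiration is a small cache that refetches once a safety margin is reached. The sketch below is an illustration rather than part of the SDK: the CachedToken class, its margin_s parameter, and the fetch callable (which would wrap get_cxone_token and return the token with its lifetime in seconds) are all assumptions.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


class CachedToken:
    """Caches a bearer token and refetches it shortly before expiry.

    `fetch` is a hypothetical coroutine returning (token, lifetime_seconds);
    in this tutorial it would wrap get_cxone_token().
    """

    def __init__(self, fetch: Callable[[], Awaitable[tuple]],
                 margin_s: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch
        self._margin_s = margin_s
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0
        self._lock = asyncio.Lock()

    async def get(self) -> str:
        async with self._lock:
            # Refresh when nothing is cached or the safety margin is reached.
            if self._token is None or self._clock() >= self._expires_at - self._margin_s:
                self._token, lifetime_s = await self._fetch()
                self._expires_at = self._clock() + lifetime_s
            return self._token
```

With a two-hour lifetime and the default five-minute margin, the cache would call fetch again roughly 115 minutes after the previous fetch. The clock parameter exists only so the behavior can be tested without waiting.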
Implementation
Step 1: Register Webhook and Parse Social Payloads
You subscribe to social interaction events by creating a webhook via the CXone Webhooks API. The endpoint path is /api/v2/webhooks. This step registers the webhook and defines the payload parser.
from pydantic import BaseModel
from typing import List, Optional

class SocialMediaAttachment(BaseModel):
    url: str
    media_type: str
    size_bytes: int

class SocialMessage(BaseModel):
    text: str
    from_id: str
    timestamp: str

class SocialProfile(BaseModel):
    platform: str
    user_id: str
    display_name: str
    avatar_url: Optional[str] = None

class CXoneSocialPayload(BaseModel):
    contact_id: str
    thread_id: str
    social_profile: SocialProfile
    messages: List[SocialMessage]
    media_attachments: List[SocialMediaAttachment] = []
    event_timestamp: str

async def register_social_webhook(platform_client: PlatformClient, webhook_url: str):
    """Registers a webhook for social.interaction.created events.
    Required scope: webhook:write
    """
    from cxone_platform_client import WebhooksApi
    webhooks_api = WebhooksApi(platform_client)
    webhook_definition = {
        "name": "social-interaction-ingest",
        "url": webhook_url,
        "enabled": True,
        "event": "social.interaction.created",
        "headers": {"X-CXone-Webhook": "true"}
    }
    try:
        result = webhooks_api.post_webhooks(body=webhook_definition)
        print(f"Webhook registered: {result.id}")
        return result.id
    except ApiException as e:
        if e.status == 409:
            print("Webhook already exists. Skipping registration.")
        else:
            raise
The CXoneSocialPayload model maps directly to CXone social interaction webhook structures. When the webhook fires, you parse the JSON body into this model to extract user profiles, message history, and media attachments.
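To make the shape of that body concrete, here is a sketch of a JSON document whose field names mirror the CXoneSocialPayload model. The values are invented for illustration, and the summarize helper and REQUIRED_KEYS set are hypothetical; it uses only the standard library so it can be run without the SDK or Pydantic installed.

```python
import json

# Illustrative body only: the field names mirror CXoneSocialPayload,
# the values are invented for this sketch.
SAMPLE_BODY = """
{
  "contact_id": "social_0001",
  "thread_id": "thread_0001",
  "social_profile": {
    "platform": "facebook",
    "user_id": "fb_user_98765",
    "display_name": "Test Customer"
  },
  "messages": [
    {"text": "Where is my order?", "from_id": "fb_user_98765",
     "timestamp": "2024-01-01T12:00:00"},
    {"text": "It has been two weeks.", "from_id": "fb_user_98765",
     "timestamp": "2024-01-01T12:05:00"}
  ],
  "event_timestamp": "2024-01-01T12:05:01"
}
"""

REQUIRED_KEYS = {"contact_id", "thread_id", "social_profile", "messages", "event_timestamp"}


def summarize(body: str) -> dict:
    """Checks the required top-level keys and pulls out the routing inputs."""
    data = json.loads(body)
    missing = REQUIRED_KEYS - data.keys()
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    return {
        "platform": data["social_profile"]["platform"],
        "latest_text": data["messages"][-1]["text"],
        "attachment_count": len(data.get("media_attachments", [])),
    }
```

The last element of messages is the one the routing step analyzes, and media_attachments may be absent because the model gives it an empty default.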
Step 2: Platform Authentication Token Management with Rotation
Social platforms require access tokens to be rotated periodically. Facebook tokens expire after sixty days, while X API tokens may require explicit refresh. This manager handles automatic rotation.
import asyncio
from datetime import datetime, timedelta

import httpx

class SocialTokenManager:
    def __init__(self, refresh_endpoint: str, initial_token: str):
        self.refresh_endpoint = refresh_endpoint
        self.token = initial_token
        self.expires_at = datetime.utcnow() + timedelta(days=59)
        self.lock = asyncio.Lock()

    async def get_valid_token(self) -> str:
        # Rotate when the token is within one hour of expiry.
        if datetime.utcnow() + timedelta(hours=1) >= self.expires_at:
            await self._rotate_token()
        return self.token

    async def _rotate_token(self):
        async with self.lock:
            # Re-check under the lock: another caller may have rotated already.
            if datetime.utcnow() + timedelta(hours=1) >= self.expires_at:
                async with httpx.AsyncClient() as client:
                    response = await client.post(
                        self.refresh_endpoint,
                        json={"grant_type": "token_rotation", "current_token": self.token}
                    )
                    response.raise_for_status()
                    data = response.json()
                    self.token = data["access_token"]
                    self.expires_at = datetime.fromisoformat(data["expires_at"])
                    print("Social platform token rotated successfully.")
The SocialTokenManager checks expiry before every external call. It uses an asyncio.Lock to prevent concurrent rotation requests. Replace refresh_endpoint with your social platform provider token endpoint.
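The check-lock-check pattern used here can be demonstrated in isolation. In the sketch below, RotatingValue is a hypothetical stand-in for SocialTokenManager in which the network call is replaced by a short sleep; twenty concurrent callers should result in a single rotation.

```python
import asyncio


class RotatingValue:
    """Stand-in for SocialTokenManager; the refresh call is simulated."""

    def __init__(self):
        self.version = 0
        self.expired = True
        self.rotations = 0
        self.lock = asyncio.Lock()

    async def get(self) -> int:
        if self.expired:                # cheap check, no lock taken
            await self._rotate()
        return self.version

    async def _rotate(self) -> None:
        async with self.lock:
            if not self.expired:        # another caller already rotated
                return
            await asyncio.sleep(0.01)   # simulated refresh round trip
            self.version += 1
            self.rotations += 1
            self.expired = False


async def demo() -> tuple:
    manager = RotatingValue()
    results = await asyncio.gather(*(manager.get() for _ in range(20)))
    return manager.rotations, set(results)
```

Without the second check inside the lock, every caller that queued up while the first rotation was in flight would rotate again after acquiring the lock.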
Step 3: Sentiment Analysis and Skill-Based Task Routing
You route interactions to CXone skill-based queues by creating a task via /api/v2/routing/tasks. The routing decision depends on sentiment analysis of the latest message.
from textblob import TextBlob

def analyze_sentiment(text: str) -> tuple:
    """Returns the polarity score and the recommended routing skills."""
    blob = TextBlob(text)
    polarity = blob.sentiment.polarity
    if polarity < -0.3:
        return polarity, ["social-escalation", "sentiment-negative"]
    elif polarity > 0.3:
        return polarity, ["social-support", "sentiment-positive"]
    else:
        return polarity, ["social-support", "sentiment-neutral"]

async def route_social_interaction(platform_client: PlatformClient, payload: CXoneSocialPayload, social_token: str):
    """Creates a CXone task routed by sentiment.
    Required scopes: task:write, routing:write, social:read
    """
    from cxone_platform_client import TasksApi
    tasks_api = TasksApi(platform_client)
    latest_message = payload.messages[-1].text
    polarity, routing_skills = analyze_sentiment(latest_message)
    task_definition = {
        "type": "social",
        "priority": 5,
        "routingData": {
            "routingType": "skills",
            "routingSkills": routing_skills
        },
        "socialData": {
            "contactId": payload.contact_id,
            "socialProfile": {
                "id": payload.social_profile.user_id,
                "name": payload.social_profile.display_name,
                "platform": payload.social_profile.platform
            },
            "messages": [
                {"text": msg.text, "from": msg.from_id, "timestamp": msg.timestamp}
                for msg in payload.messages
            ],
            "mediaAttachments": [
                {"url": att.url, "mediaType": att.media_type, "size": att.size_bytes}
                for att in payload.media_attachments
            ]
        },
        "customAttributes": {
            "sentiment_polarity": str(polarity),
            "platform_token_version": social_token[:8]
        }
    }
    try:
        task = tasks_api.post_routing_tasks(body=task_definition)
        print(f"Task created: {task.id} with skills {routing_skills}")
        return task.id
    except ApiException as e:
        if e.status == 429:
            print("Rate limited. Implement retry logic.")
            raise
        elif e.status in [500, 502, 503]:
            print("CXone backend error. Implement retry logic.")
            raise
        else:
            raise
The routingData object dictates how CXone matches the task to agents. The socialData object preserves the full thread context. Sentiment polarity drives the skill assignment.
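Because the thresholds decide which queue a customer lands in, it can help to separate the threshold logic from the TextBlob call so the boundaries are testable on their own. The sketch below assumes the same cutoffs and skill names as analyze_sentiment; the function name skills_for_polarity and the two constants are hypothetical.

```python
from typing import List

NEGATIVE_THRESHOLD = -0.3
POSITIVE_THRESHOLD = 0.3


def skills_for_polarity(polarity: float) -> List[str]:
    """Maps a polarity in [-1.0, 1.0] to the skill names used in this tutorial."""
    if polarity < NEGATIVE_THRESHOLD:
        return ["social-escalation", "sentiment-negative"]
    if polarity > POSITIVE_THRESHOLD:
        return ["social-support", "sentiment-positive"]
    return ["social-support", "sentiment-neutral"]
```

Both comparisons are strict, so a polarity of exactly -0.3 or 0.3 is treated as neutral and stays in the social-support queue.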
Step 4: Delivery Failure Handling with Exponential Backoff
Network instability or API rate limits require robust retry logic. This function implements exponential backoff with jitter and a retry queue.
import asyncio
import random
import collections

retry_queue = collections.deque()

async def retry_with_backoff(func, *args, max_retries=3, base_delay=2.0, **kwargs):
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except httpx.HTTPStatusError as e:
            status = e.response.status_code
            if status == 429:
                retry_after = int(e.response.headers.get("retry-after", base_delay * (2 ** attempt)))
                print(f"429 Rate Limited. Retrying after {retry_after}s (attempt {attempt+1})")
                await asyncio.sleep(retry_after)
                continue
            elif 500 <= status < 600:
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Server error {status}. Retrying after {delay:.2f}s (attempt {attempt+1})")
                await asyncio.sleep(delay)
                continue
            else:
                raise
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"Max retries reached. Moving to retry queue: {e}")
                retry_queue.append({"func": func, "args": args, "kwargs": kwargs})
                return None
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(delay)
    return None
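The retry logic handles 429 and 5xx responses raised as httpx.HTTPStatusError and honors the Retry-After header when present. Any other exception, including the SDK's ApiException raised by route_social_interaction, takes the generic branch: it is retried with the same backoff, and after the final attempt the call moves to a deque for asynchronous processing later. Note that an HTTP error on the final attempt returns None without being queued.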
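Two details of the delay calculation are worth isolating. HTTP allows Retry-After to carry either a number of seconds or an HTTP-date, and int() would reject the date form; an uncapped exponential delay also grows quickly. The sketch below shows one way to handle both using only the standard library. The helper names and the max_delay cap are assumptions, not part of the tutorial's retry_with_backoff.

```python
import random
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def backoff_delay(attempt: int, base_delay: float = 2.0, max_delay: float = 60.0) -> float:
    """Exponential delay for a zero-based attempt, capped, plus up to 1 s of jitter."""
    return min(base_delay * (2 ** attempt), max_delay) + random.uniform(0, 1)


def retry_after_seconds(value: Optional[str], fallback: float,
                        now: Optional[datetime] = None) -> float:
    """Interprets a Retry-After header given as seconds or as an HTTP-date."""
    if not value:
        return fallback
    try:
        return max(0.0, float(value))
    except ValueError:
        pass
    try:
        target = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return fallback  # unparseable header: use the computed backoff
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (target - now).total_seconds())
```

A caller would pass backoff_delay(attempt) as the fallback, so a missing or malformed header degrades to the normal exponential schedule.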
Step 5: Async Batch Synchronization and Latency Logging
You synchronize historical thread data with external ticketing systems using batch jobs. This step processes queued failures and logs SLA latency.
import time
import logging
import asyncio

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("social_routing")

async def process_retry_queue():
    """Drains the retry queue and reattempts failed routing operations."""
    while retry_queue:
        item = retry_queue.popleft()
        func = item["func"]
        args = item["args"]
        kwargs = item["kwargs"]
        await retry_with_backoff(func, *args, max_retries=2, base_delay=1.0, **kwargs)
        await asyncio.sleep(0.5)

async def sync_thread_to_ticketing(thread_id: str, payload: CXoneSocialPayload):
    """Simulates async batch sync to external ticketing system."""
    batch_payload = {
        "thread_id": thread_id,
        "contact_id": payload.contact_id,
        "messages": [m.text for m in payload.messages],
        "attachments": [a.url for a in payload.media_attachments],
        "sync_timestamp": datetime.utcnow().isoformat()
    }
    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post(
                "https://ticketing.example.com/api/v1/sync",
                json=batch_payload,
                timeout=10.0
            )
            resp.raise_for_status()
            logger.info(f"Thread {thread_id} synchronized to ticketing system.")
        except httpx.HTTPError as e:
            logger.error(f"Sync failed for {thread_id}: {e}")

def log_interaction_latency(operation: str, start_time: float):
    latency_ms = (time.perf_counter() - start_time) * 1000
    logger.info(f"SLA Latency [{operation}]: {latency_ms:.2f}ms")
    if latency_ms > 2500:
        logger.warning(f"SLA breach detected for {operation}: {latency_ms:.2f}ms")
The process_retry_queue function runs periodically. The sync_thread_to_ticketing function batches messages and attachments. Latency logging uses time.perf_counter() for high-resolution timing.
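If you want the timing recorded even when the wrapped operation raises, a context manager is one option. The sketch below is an alternative to calling log_interaction_latency by hand, not a replacement mandated by the tutorial; the measure and breached helpers are hypothetical, and the 2500 ms threshold is taken from the function above.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator

SLA_THRESHOLD_MS = 2500.0  # same threshold as log_interaction_latency


@contextmanager
def measure(operation: str, results: Dict[str, float]) -> Iterator[None]:
    """Records elapsed milliseconds for `operation`, even if the body raises."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[operation] = (time.perf_counter() - start) * 1000


def breached(latency_ms: float, threshold_ms: float = SLA_THRESHOLD_MS) -> bool:
    """True when the latency is strictly above the SLA threshold."""
    return latency_ms > threshold_ms
```

In the webhook handler shown later, the failure path returns before log_interaction_latency runs, so failed requests are never timed; wrapping the body in measure would capture them as well.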
Step 6: Social Media Simulator for Channel Testing
You expose a simulator endpoint to generate mock CXone social payloads and verify the routing pipeline without live social traffic.
from fastapi import FastAPI
import uuid
from datetime import datetime

app = FastAPI()

@app.post("/simulator/send")
async def simulate_social_interaction():
    """Generates a mock CXone social payload and POSTs it to the webhook."""
    mock_payload = CXoneSocialPayload(
        contact_id=f"social_{uuid.uuid4().hex[:12]}",
        thread_id=f"thread_{uuid.uuid4().hex[:12]}",
        social_profile=SocialProfile(
            platform="facebook",
            user_id="fb_user_98765",
            display_name="Test Customer",
            avatar_url="https://example.com/avatar.png"
        ),
        messages=[
            SocialMessage(
                text="This is a test message with neutral sentiment.",
                from_id="fb_user_98765",
                timestamp=datetime.utcnow().isoformat()
            )
        ],
        media_attachments=[
            SocialMediaAttachment(
                url="https://example.com/image.png",
                media_type="image/png",
                size_bytes=24500
            )
        ],
        event_timestamp=datetime.utcnow().isoformat()
    )
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "http://localhost:8000/webhook/social",
            json=mock_payload.dict(),
            headers={"Content-Type": "application/json"}
        )
        resp.raise_for_status()
    return {"status": "simulated", "payload": mock_payload.dict()}
The simulator creates a valid CXoneSocialPayload instance and POSTs it to your local webhook endpoint. You can modify the message text to test different sentiment routing paths.
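To exercise more than one routing path, you could generate one mock body per sentiment class. The sketch below builds plain dicts with the same fields as CXoneSocialPayload. The build_mock_payload helper and the sample texts are hypothetical, and the labels describe the outcome you would hope for rather than a guaranteed TextBlob score, so check the polarity each text actually produces.

```python
import uuid
from datetime import datetime, timezone

# Hypothetical sample texts; the labels are the outcomes you would hope
# to see, not guaranteed TextBlob results.
SAMPLE_TEXTS = {
    "negative": "This is the worst service I have ever had. Terrible.",
    "neutral": "My order number is 12345.",
    "positive": "Great support, I love how fast you replied!",
}


def build_mock_payload(text: str, platform: str = "facebook") -> dict:
    """Returns a dict with the same fields as CXoneSocialPayload."""
    now = datetime.now(timezone.utc).isoformat()
    user_id = "fb_user_98765"
    return {
        "contact_id": f"social_{uuid.uuid4().hex[:12]}",
        "thread_id": f"thread_{uuid.uuid4().hex[:12]}",
        "social_profile": {"platform": platform, "user_id": user_id,
                           "display_name": "Test Customer", "avatar_url": None},
        "messages": [{"text": text, "from_id": user_id, "timestamp": now}],
        "media_attachments": [],
        "event_timestamp": now,
    }


payloads = {label: build_mock_payload(text) for label, text in SAMPLE_TEXTS.items()}
```

Each dict can be posted to /webhook/social as JSON, and the skills logged by the routing step show which branch the text took.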
Complete Working Example
Combine all components into a single runnable FastAPI application. Run with uvicorn main:app --port 8000.
import asyncio
import time
from datetime import datetime, timedelta
from typing import List, Optional
import collections
import random
import logging
import httpx
import uvicorn
from fastapi import FastAPI, Request
from pydantic import BaseModel
from textblob import TextBlob
from cxone_platform_client import Configuration, ApiClient, PlatformClient
from cxone_platform_client.rest import ApiException
from cxone_platform_client import WebhooksApi, TasksApi
# Configuration
CXONE_ENV = "https://api.us-east-1.my.site.com"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
WEBHOOK_URL = "http://localhost:8000/webhook/social"
SOCIAL_TOKEN_REFRESH_URL = "https://social-platform.example.com/oauth/rotate"
INITIAL_SOCIAL_TOKEN = "your_initial_social_token"
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("social_routing")
# Models
class SocialMediaAttachment(BaseModel):
    url: str
    media_type: str
    size_bytes: int

class SocialMessage(BaseModel):
    text: str
    from_id: str
    timestamp: str

class SocialProfile(BaseModel):
    platform: str
    user_id: str
    display_name: str
    avatar_url: Optional[str] = None

class CXoneSocialPayload(BaseModel):
    contact_id: str
    thread_id: str
    social_profile: SocialProfile
    messages: List[SocialMessage]
    media_attachments: List[SocialMediaAttachment] = []
    event_timestamp: str

# Token Manager
class SocialTokenManager:
    def __init__(self, refresh_endpoint: str, initial_token: str):
        self.refresh_endpoint = refresh_endpoint
        self.token = initial_token
        self.expires_at = datetime.utcnow() + timedelta(days=59)
        self.lock = asyncio.Lock()

    async def get_valid_token(self) -> str:
        if datetime.utcnow() + timedelta(hours=1) >= self.expires_at:
            await self._rotate_token()
        return self.token

    async def _rotate_token(self):
        async with self.lock:
            if datetime.utcnow() + timedelta(hours=1) >= self.expires_at:
                async with httpx.AsyncClient() as client:
                    response = await client.post(
                        self.refresh_endpoint,
                        json={"grant_type": "token_rotation", "current_token": self.token}
                    )
                    response.raise_for_status()
                    data = response.json()
                    self.token = data["access_token"]
                    self.expires_at = datetime.fromisoformat(data["expires_at"])
                    logger.info("Social platform token rotated successfully.")
# Retry Logic
retry_queue = collections.deque()

async def retry_with_backoff(func, *args, max_retries=3, base_delay=2.0, **kwargs):
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except httpx.HTTPStatusError as e:
            status = e.response.status_code
            if status == 429:
                retry_after = int(e.response.headers.get("retry-after", base_delay * (2 ** attempt)))
                logger.warning(f"429 Rate Limited. Retrying after {retry_after}s (attempt {attempt+1})")
                await asyncio.sleep(retry_after)
                continue
            elif 500 <= status < 600:
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                logger.warning(f"Server error {status}. Retrying after {delay:.2f}s (attempt {attempt+1})")
                await asyncio.sleep(delay)
                continue
            else:
                raise
        except Exception as e:
            if attempt == max_retries - 1:
                logger.error(f"Max retries reached. Moving to retry queue: {e}")
                retry_queue.append({"func": func, "args": args, "kwargs": kwargs})
                return None
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(delay)
    return None

# CXone Client
async def initialize_cxone_client() -> PlatformClient:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{CXONE_ENV}/oauth/token",
            data={"grant_type": "client_credentials"},
            auth=(CLIENT_ID, CLIENT_SECRET),
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        response.raise_for_status()
        token = response.json()["access_token"]
    configuration = Configuration()
    configuration.host = CXONE_ENV
    configuration.access_token = token
    api_client = ApiClient(configuration)
    return PlatformClient(api_client)
# Routing Logic
def analyze_sentiment(text: str) -> tuple:
    blob = TextBlob(text)
    polarity = blob.sentiment.polarity
    if polarity < -0.3:
        return polarity, ["social-escalation", "sentiment-negative"]
    elif polarity > 0.3:
        return polarity, ["social-support", "sentiment-positive"]
    else:
        return polarity, ["social-support", "sentiment-neutral"]

async def route_social_interaction(platform_client: PlatformClient, payload: CXoneSocialPayload, social_token: str):
    tasks_api = TasksApi(platform_client)
    latest_message = payload.messages[-1].text
    polarity, routing_skills = analyze_sentiment(latest_message)
    task_definition = {
        "type": "social",
        "priority": 5,
        "routingData": {"routingType": "skills", "routingSkills": routing_skills},
        "socialData": {
            "contactId": payload.contact_id,
            "socialProfile": {
                "id": payload.social_profile.user_id,
                "name": payload.social_profile.display_name,
                "platform": payload.social_profile.platform
            },
            "messages": [{"text": m.text, "from": m.from_id, "timestamp": m.timestamp} for m in payload.messages],
            "mediaAttachments": [{"url": a.url, "mediaType": a.media_type, "size": a.size_bytes} for a in payload.media_attachments]
        },
        "customAttributes": {"sentiment_polarity": str(polarity), "platform_token_version": social_token[:8]}
    }
    try:
        task = tasks_api.post_routing_tasks(body=task_definition)
        logger.info(f"Task created: {task.id} with skills {routing_skills}")
        return task.id
    except ApiException as e:
        if e.status in [429, 500, 502, 503]:
            logger.warning(f"Routing API error {e.status}. Queuing for retry.")
            raise
        else:
            raise

async def sync_thread_to_ticketing(thread_id: str, payload: CXoneSocialPayload):
    batch_payload = {
        "thread_id": thread_id,
        "contact_id": payload.contact_id,
        "messages": [m.text for m in payload.messages],
        "attachments": [a.url for a in payload.media_attachments],
        "sync_timestamp": datetime.utcnow().isoformat()
    }
    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post("https://ticketing.example.com/api/v1/sync", json=batch_payload, timeout=10.0)
            resp.raise_for_status()
            logger.info(f"Thread {thread_id} synchronized to ticketing system.")
        except httpx.HTTPError as e:
            logger.error(f"Sync failed for {thread_id}: {e}")
# FastAPI App
app = FastAPI()
cxone_client = None
token_manager = None

@app.on_event("startup")
async def startup_event():
    global cxone_client, token_manager
    cxone_client = await initialize_cxone_client()
    token_manager = SocialTokenManager(SOCIAL_TOKEN_REFRESH_URL, INITIAL_SOCIAL_TOKEN)
    asyncio.create_task(periodic_retry_processor())

async def periodic_retry_processor():
    while True:
        await asyncio.sleep(10)
        if retry_queue:
            await process_retry_queue()

async def process_retry_queue():
    while retry_queue:
        item = retry_queue.popleft()
        func = item["func"]
        args = item["args"]
        kwargs = item["kwargs"]
        await retry_with_backoff(func, *args, max_retries=2, base_delay=1.0, **kwargs)
        await asyncio.sleep(0.5)
@app.post("/webhook/social")
async def handle_social_webhook(request: Request):
    from fastapi.responses import JSONResponse
    start_time = time.perf_counter()
    payload = CXoneSocialPayload(**await request.json())
    try:
        social_token = await token_manager.get_valid_token()
        await retry_with_backoff(route_social_interaction, cxone_client, payload, social_token)
        await sync_thread_to_ticketing(payload.thread_id, payload)
    except Exception as e:
        logger.error(f"Webhook processing failed: {e}")
        # Returning a (body, 500) tuple would be serialized as a JSON array with status 200.
        return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
    log_interaction_latency("social_routing", start_time)
    return {"status": "accepted"}
@app.post("/simulator/send")
async def simulate_social_interaction():
    import uuid
    mock_payload = CXoneSocialPayload(
        contact_id=f"social_{uuid.uuid4().hex[:12]}",
        thread_id=f"thread_{uuid.uuid4().hex[:12]}",
        social_profile=SocialProfile(platform="facebook", user_id="fb_user_98765", display_name="Test Customer", avatar_url="https://example.com/avatar.png"),
        messages=[SocialMessage(text="This is a test message with neutral sentiment.", from_id="fb_user_98765", timestamp=datetime.utcnow().isoformat())],
        media_attachments=[SocialMediaAttachment(url="https://example.com/image.png", media_type="image/png", size_bytes=24500)],
        event_timestamp=datetime.utcnow().isoformat()
    )
    async with httpx.AsyncClient() as client:
        resp = await client.post(WEBHOOK_URL, json=mock_payload.dict(), headers={"Content-Type": "application/json"})
        resp.raise_for_status()
    return {"status": "simulated", "payload": mock_payload.dict()}

def log_interaction_latency(operation: str, start_time: float):
    latency_ms = (time.perf_counter() - start_time) * 1000
    logger.info(f"SLA Latency [{operation}]: {latency_ms:.2f}ms")
    if latency_ms > 2500:
        logger.warning(f"SLA breach detected for {operation}: {latency_ms:.2f}ms")

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
Common Errors & Debugging
Error: 401 Unauthorized on CXone API Calls
This occurs when the OAuth access token expires or was never obtained. The initialize_cxone_client function fetches a fresh token at startup. Implement a background task to refresh the token every sixty minutes. Verify that your client credentials have the task:write and routing:write scopes assigned in the CXone admin console.
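The background refresh could look roughly like the loop below. This is a sketch under assumptions: refresh_periodically is a hypothetical helper, and the refresh callable is expected to fetch a new token and rebuild the client (for example by calling initialize_cxone_client and reassigning cxone_client). The stop event lets the loop exit promptly at shutdown.

```python
import asyncio
from typing import Awaitable, Callable


async def refresh_periodically(refresh: Callable[[], Awaitable[None]],
                               interval_s: float,
                               stop: asyncio.Event) -> None:
    """Calls `refresh` immediately and then every `interval_s` seconds until `stop` is set."""
    while not stop.is_set():
        try:
            await refresh()
        except Exception as exc:  # keep the loop alive; the next tick retries
            print(f"token refresh failed: {exc}")
        try:
            # Sleep for the interval, but wake immediately on shutdown.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass
```

In the startup handler you would schedule it with asyncio.create_task(refresh_periodically(refresh, 3600, stop)) and call stop.set() from the shutdown handler.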
Error: 403 Forbidden on Webhook Registration
The client lacks the webhook:write scope. Navigate to your CXone organization settings, locate the API client, and append webhook:write to the allowed scopes. Restart the application to re-authenticate.
Error: 429 Too Many Requests on Task Routing
CXone enforces rate limits per tenant and per API endpoint. The retry_with_backoff function parses the Retry-After header and applies exponential backoff with jitter. If you consistently hit 429 errors, batch your task creation requests or implement a client-side rate limiter that caps requests at fifty per minute.
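A client-side cap of fifty requests per minute can be approximated with a sliding window over recent call timestamps. The class below is a minimal sketch, not a CXone feature; SlidingWindowLimiter and its method names are hypothetical, and the clock parameter is there only to make the behavior testable.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allows at most `max_calls` within any rolling `window_s` seconds."""

    def __init__(self, max_calls: int = 50, window_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_calls = max_calls
        self.window_s = window_s
        self._clock = clock
        self._stamps: Deque[float] = deque()

    def wait_time(self) -> float:
        """Seconds until the next call is allowed; 0.0 means go ahead now."""
        now = self._clock()
        # Drop timestamps that have left the window.
        while self._stamps and now - self._stamps[0] >= self.window_s:
            self._stamps.popleft()
        if len(self._stamps) < self.max_calls:
            return 0.0
        return self.window_s - (now - self._stamps[0])

    def try_acquire(self) -> bool:
        """Records a call and returns True if the budget allows it."""
        if self.wait_time() > 0.0:
            return False
        self._stamps.append(self._clock())
        return True
```

Before each task creation, a caller could loop on try_acquire and await asyncio.sleep(limiter.wait_time()) whenever it returns False.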
Error: 502 Bad Gateway or 503 Service Unavailable
These indicate CXone backend degradation. The retry logic catches these status codes and schedules automatic retries. Monitor CXone status pages and implement circuit breaker patterns for prolonged outages. Ensure your base_delay parameter scales appropriately for your SLA requirements.
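A circuit breaker stops sending requests after repeated failures and probes again after a cool-down. The sketch below shows the basic state handling under assumed defaults; the CircuitBreaker class, its thresholds, and its method names are hypothetical and would need tuning for your traffic.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Opens after `failure_threshold` consecutive failures and lets calls
    through again once `reset_timeout_s` has passed; a failed trial re-opens it."""

    def __init__(self, failure_threshold: int = 5, reset_timeout_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True  # closed: traffic flows normally
        # Open: only allow trial calls after the cool-down period.
        return self._clock() - self._opened_at >= self.reset_timeout_s

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()  # open, or re-open after a failed trial
```

The routing call would check allow_request first, queue the interaction when it returns False, and report the outcome with record_success or record_failure.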
Error: Pydantic ValidationError on Webhook Payload
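Pydantic ignores unknown fields by default, so a ValidationError usually means a required field is missing, null, or of the wrong type. Mark fields that CXone may omit or send as null as Optional with a default value. If you have configured the model to forbid extra fields, set it back to ignore (model_config = {"extra": "ignore"} in Pydantic v2). Validate incoming JSON against the official CXone social webhook schema before parsing.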