Injecting Neo4j Knowledge Graph Results into NICE CXone Agent Assist with Python FastAPI
What You Will Build
- A FastAPI microservice that accepts agent search queries, executes Cypher traversals against a Neo4j knowledge graph, and pushes structured troubleshooting articles directly into the NICE CXone agent workspace.
- This implementation uses the NICE CXone Agent Assist REST API (/api/v2/agentassist/interactions/{interactionId}/content) and the official Neo4j async Python driver.
- The code is written in Python 3.10+ using FastAPI, httpx, Pydantic, and neo4j.
Prerequisites
- OAuth client type: Confidential client (Client Credentials)
- Required scopes: agentassist:interactions:write, agentassist:read
- SDK/API version: CXone REST API v2, Neo4j Python Driver 5.x, FastAPI 0.104+
- Language/runtime requirements: Python 3.10 or higher with async/await support
- External dependencies: fastapi==0.104.1, uvicorn==0.24.0, httpx==0.25.2, pydantic==2.5.0, neo4j==5.13.0
Authentication Setup
NICE CXone uses OAuth 2.0 Client Credentials flow for server-to-server API access. The token endpoint resides at https://api.{region}.my.site.nice-incontact.com/oauth2/token. You must exchange your client credentials for an access token before calling the Agent Assist endpoints. The token expires after the duration specified in the expires_in field, typically 3600 seconds.
The following class handles token acquisition, in-memory caching, and automatic refresh when the cache expires. It uses httpx for non-blocking network calls, which aligns with FastAPI async routing.
import httpx
import time
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class CXoneAuthManager:
    def __init__(self, base_url: str, client_id: str, client_secret: str, scopes: list[str]):
        self.base_url = base_url.rstrip("/")
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.token: Optional[str] = None
        self.expiry: float = 0.0

    async def get_token(self) -> str:
        # Serve the cached token while it is still inside its validity window.
        if self.token and time.time() < self.expiry:
            return self.token
        logger.info("Fetching new CXone OAuth token")
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{self.base_url}/oauth2/token",
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "scope": " ".join(self.scopes)
                }
            )
            response.raise_for_status()
            data = response.json()
            self.token = data["access_token"]
            # Refresh 60 seconds early so a token never expires mid-request.
            self.expiry = time.time() + data["expires_in"] - 60
            logger.info("OAuth token refreshed successfully")
            return self.token
The request cycle for token acquisition follows standard OAuth 2.0 specifications. The client sends a POST request with form-encoded data. The response contains the bearer token and expiration metadata.
Request:
POST /oauth2/token HTTP/1.1
Host: api.us-east-1.my.site.nice-incontact.com
Content-Type: application/x-www-form-urlencoded

grant_type=client_credentials&client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&scope=agentassist:interactions:write%20agentassist:read
Response:
{
  "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "Bearer",
  "expires_in": 3600,
  "scope": "agentassist:interactions:write agentassist:read"
}
Implementation
Step 1: Initialize OAuth and Database Connections
Stateful resources such as database drivers and authentication managers should be created once and shared across requests. This example keeps the Neo4j driver and CXone auth manager in module-level variables and initializes them at startup to avoid repeated connection overhead during request processing.
from typing import Optional

from fastapi import FastAPI
from neo4j import AsyncDriver, AsyncGraphDatabase

app = FastAPI()

# Global state initialized on startup
neo4j_driver: Optional[AsyncDriver] = None
cxone_auth: Optional[CXoneAuthManager] = None


@app.on_event("startup")
async def startup_event():
    global neo4j_driver, cxone_auth
    # AsyncGraphDatabase.driver() returns an AsyncDriver that owns the connection pool.
    # The credentials below are placeholders; load real values from your environment.
    neo4j_driver = AsyncGraphDatabase.driver(
        "bolt://neo4j.internal:7687",
        auth=("neo4j", "graph_password")
    )
    cxone_auth = CXoneAuthManager(
        base_url="https://api.us-east-1.my.site.nice-incontact.com",
        client_id="your_client_id",
        client_secret="your_client_secret",
        scopes=["agentassist:interactions:write", "agentassist:read"]
    )
    logger.info("FastAPI service initialized with Neo4j and CXone connections")
The Neo4j async driver establishes a connection pool to the Bolt protocol endpoint. FastAPI manages the lifecycle of the application, so you register the driver in the startup event. You must verify that the Neo4j instance allows inbound Bolt connections from your FastAPI host.
Step 2: Execute Graph Traversal and Format Payload
When an agent submits a query, the service must traverse the knowledge graph to find relevant troubleshooting articles. The Cypher query uses variable-length relationships to capture direct matches and second-degree connections, which improves recall for ambiguous agent searches. You limit results to five items to prevent workspace clutter and respect CXone content size constraints.
from pydantic import BaseModel
from typing import List, Dict, Any
from neo4j import AsyncDriver


class QueryRequest(BaseModel):
    interaction_id: str
    query: str
    agent_id: str


async def fetch_knowledge_articles(driver: AsyncDriver, search_query: str) -> List[Dict[str, Any]]:
    cypher = """
    MATCH (q:Query {keyword: $query})-[:RELATED_TO|SIMILAR_TO|RESOLVES*1..2]->(a:Article)
    WHERE a.status = 'published'
    RETURN a.title AS title, a.url AS url, a.summary AS description, a.priority AS priority
    ORDER BY a.priority DESC
    LIMIT 5
    """
    async with driver.session() as session:
        # Pass parameters as a dict: the first argument of run() is itself named
        # "query", so a query=... keyword argument would collide with it.
        result = await session.run(cypher, {"query": search_query})
        records = await result.data()
        return records
The Cypher pattern [:RELATED_TO|SIMILAR_TO|RESOLVES*1..2] traverses up to two hops away from the query node. The WHERE a.status = 'published' clause filters out draft or deprecated articles. The ORDER BY a.priority DESC ensures high-impact troubleshooting steps appear first in the agent workspace.
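To make the hop limit concrete, the sketch below imitates the 1..2 bound on a small in-memory graph. It is an illustration only: the graph contents and the reachable_articles helper are hypothetical, and a breadth-first search over visited nodes merely approximates how Neo4j matches variable-length paths.

```python
from collections import deque
from typing import Dict, List, Set, Tuple

# Hypothetical toy graph: node -> list of (relationship_type, neighbour).
GRAPH: Dict[str, List[Tuple[str, str]]] = {
    "q:vpn": [("RELATED_TO", "a:reset-vpn"), ("SIMILAR_TO", "q:tunnel")],
    "q:tunnel": [("RESOLVES", "a:tunnel-mtu"), ("SIMILAR_TO", "q:ipsec")],
    "q:ipsec": [("RESOLVES", "a:ipsec-keys")],
}
ALLOWED = {"RELATED_TO", "SIMILAR_TO", "RESOLVES"}


def reachable_articles(start: str, max_hops: int = 2) -> Set[str]:
    """Return article nodes reachable in 1..max_hops hops over ALLOWED relationships."""
    found: Set[str] = set()
    queue = deque([(start, 0)])
    seen = {start}
    while queue:
        node, depth = queue.popleft()
        if depth == max_hops:
            continue  # do not expand beyond the hop limit
        for rel_type, neighbour in GRAPH.get(node, []):
            if rel_type not in ALLOWED:
                continue
            if neighbour.startswith("a:"):
                found.add(neighbour)
            if neighbour not in seen:
                seen.add(neighbour)
                queue.append((neighbour, depth + 1))
    return found
```

Starting from "q:vpn" with the default limit, the article three hops away ("a:ipsec-keys") is excluded; raising max_hops to 3 brings it in, which mirrors the recall versus noise trade-off of widening the range in the Cypher pattern.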
You format the Neo4j records into the exact JSON schema expected by CXone Agent Assist. The CXone endpoint requires a content array where each object contains title, url, description, and type. You map the graph properties directly to these fields.
def format_cxone_content(articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    content_items = []
    for article in articles:
        content_items.append({
            "title": article["title"],
            "url": article["url"],
            "description": article["description"],
            "type": "article",
            "displayType": "card",
            "metadata": {
                "source": "neo4j_knowledge_graph",
                "priority": article["priority"],
                # Placeholder timestamp; the Cypher query does not return one.
                "lastUpdated": "2024-01-15T08:30:00Z"
            }
        })
    return content_items
The displayType: "card" parameter instructs the CXone workspace to render the content as an expandable card rather than a plain text block. The metadata object is ignored by the CXone frontend but provides auditability for your backend systems.
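The formatter above indexes every property directly, so an article node without a title, url, or summary would either raise KeyError or push a null field. A more defensive variant might look like the sketch below. The function name format_cxone_content_safe and the choice to skip incomplete records are assumptions made for illustration, not CXone requirements.

```python
from typing import Any, Dict, List


def format_cxone_content_safe(articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Build content items, skipping records that lack a title or URL."""
    items: List[Dict[str, Any]] = []
    for article in articles:
        title = article.get("title")
        url = article.get("url")
        if not title or not url:
            continue  # an item the agent cannot read or open is not worth pushing
        items.append({
            "title": title,
            "url": url,
            "description": article.get("description") or "",  # null summary becomes ""
            "type": "article",
            "displayType": "card",
            "metadata": {
                "source": "neo4j_knowledge_graph",
                "priority": article.get("priority"),
            },
        })
    return items
```

Whether to skip, log, or reject incomplete records depends on how strictly your knowledge graph enforces these properties.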
Step 3: Inject Content into CXone Agent Workspace
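The final step pushes the formatted payload to the CXone Agent Assist API. You must handle rate limiting (HTTP 429) because CXone enforces strict quotas on Agent Assist content pushes. The retry logic reads the Retry-After header and multiplies the wait by a random jitter factor so that concurrent requests do not retry in lockstep. It also retries on HTTP 401, which only helps if the cached token is discarded and the Authorization header is rebuilt before the next attempt.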
import asyncio
import random


async def push_to_agent_assist(auth: CXoneAuthManager, interaction_id: str, content_items: List[Dict[str, Any]]) -> Dict[str, Any]:
    url = f"{auth.base_url}/api/v2/agentassist/interactions/{interaction_id}/content"
    payload = {"content": content_items}
    max_retries = 3
    # One client for all attempts so retries reuse the same connection.
    async with httpx.AsyncClient(timeout=15.0) as client:
        for attempt in range(max_retries):
            # Rebuild the headers on every attempt so a refreshed token is actually sent.
            token = await auth.get_token()
            headers = {
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
                "Accept": "application/json"
            }
            response = await client.post(url, headers=headers, json=payload)
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 2))
                jitter = random.uniform(0.5, 1.5)
                wait_time = retry_after * jitter
                logger.warning(f"Rate limited (429). Waiting {wait_time:.2f}s before retry {attempt + 1}")
                await asyncio.sleep(wait_time)
                continue
            if response.status_code == 401:
                logger.warning("Token rejected (401). Refreshing and retrying.")
                # Clear the cache; otherwise get_token() returns the same rejected token.
                auth.token = None
                continue
            response.raise_for_status()
            return response.json()
    # Attach the real request so callers can inspect both sides of the failed exchange.
    raise httpx.HTTPStatusError("Persistent rate limit or authentication failure", request=response.request, response=response)
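The retry loop above waits for the Retry-After value scaled by jitter, so the delay does not grow between attempts. If you want it to grow, one option is exponential backoff with full jitter added on top of the server hint. The helper below is a sketch under that assumption: compute_wait and its base and cap parameters are hypothetical names, and it treats Retry-After as a number of seconds, ignoring the HTTP-date form.

```python
import random
from typing import Optional


def compute_wait(attempt: int, retry_after: Optional[str],
                 base: float = 1.0, cap: float = 30.0) -> float:
    """Seconds to sleep before retry number `attempt` (0-based)."""
    # Exponential component: base, 2*base, 4*base, ... capped at `cap`.
    backoff = min(cap, base * (2 ** attempt))
    # Full jitter: pick a random point in [0, backoff] to spread clients out.
    jittered = random.uniform(0.0, backoff)
    # Use the server hint as a floor when it is a plain number of seconds.
    server_hint = 0.0
    if retry_after is not None:
        try:
            server_hint = max(0.0, float(retry_after))
        except ValueError:
            server_hint = 0.0  # e.g. an HTTP-date value; this sketch ignores it
    return server_hint + jittered
```

Inside the loop this would replace the wait_time calculation, for example await asyncio.sleep(compute_wait(attempt, response.headers.get("Retry-After"))).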
The HTTP cycle for content injection follows this pattern:
Request:
POST /api/v2/agentassist/interactions/INT-987654321/content HTTP/1.1
Host: api.us-east-1.my.site.nice-incontact.com
Authorization: Bearer eyJhbGciOiJSUzI1NiIs...
Content-Type: application/json
Accept: application/json

{
  "content": [
    {
      "title": "Resetting VoIP Gateway Credentials",
      "url": "https://kb.internal/article/vp-1024",
      "description": "Step-by-step guide to clearing cached credentials and re-provisioning SIP trunks.",
      "type": "article",
      "displayType": "card",
      "metadata": {
        "source": "neo4j_knowledge_graph",
        "priority": 9,
        "lastUpdated": "2024-01-15T08:30:00Z"
      }
    }
  ]
}
Response:
{
  "interactionId": "INT-987654321",
  "contentId": "CONT-44556677",
  "status": "delivered",
  "timestamp": "2024-01-20T14:22:10Z"
}
The interactionId must match an active CXone interaction. If the interaction has ended or does not exist, CXone returns HTTP 404. You must validate the interaction state before pushing content in production environments.
Complete Working Example
The following script combines all components into a single deployable FastAPI application. You only need to replace the configuration constants with your environment values.
import httpx
import time
import asyncio
import random
import logging
from typing import Optional, List, Dict, Any

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from neo4j import AsyncDriver, AsyncGraphDatabase

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class CXoneAuthManager:
    def __init__(self, base_url: str, client_id: str, client_secret: str, scopes: list[str]):
        self.base_url = base_url.rstrip("/")
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.token: Optional[str] = None
        self.expiry: float = 0.0

    async def get_token(self) -> str:
        # Serve the cached token while it is still inside its validity window.
        if self.token and time.time() < self.expiry:
            return self.token
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{self.base_url}/oauth2/token",
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "scope": " ".join(self.scopes)
                }
            )
            response.raise_for_status()
            data = response.json()
            self.token = data["access_token"]
            # Refresh 60 seconds early so a token never expires mid-request.
            self.expiry = time.time() + data["expires_in"] - 60
            return self.token


class QueryRequest(BaseModel):
    interaction_id: str
    query: str
    agent_id: str


app = FastAPI()
neo4j_driver: Optional[AsyncDriver] = None
cxone_auth: Optional[CXoneAuthManager] = None


@app.on_event("startup")
async def startup_event():
    global neo4j_driver, cxone_auth
    # Placeholder connection details; replace with your environment values.
    neo4j_driver = AsyncGraphDatabase.driver("bolt://neo4j.internal:7687", auth=("neo4j", "graph_password"))
    cxone_auth = CXoneAuthManager(
        base_url="https://api.us-east-1.my.site.nice-incontact.com",
        client_id="your_client_id",
        client_secret="your_client_secret",
        scopes=["agentassist:interactions:write", "agentassist:read"]
    )


async def fetch_knowledge_articles(driver: AsyncDriver, search_query: str) -> List[Dict[str, Any]]:
    cypher = """
    MATCH (q:Query {keyword: $query})-[:RELATED_TO|SIMILAR_TO|RESOLVES*1..2]->(a:Article)
    WHERE a.status = 'published'
    RETURN a.title AS title, a.url AS url, a.summary AS description, a.priority AS priority
    ORDER BY a.priority DESC
    LIMIT 5
    """
    async with driver.session() as session:
        # Pass parameters as a dict: the first argument of run() is itself named
        # "query", so a query=... keyword argument would collide with it.
        result = await session.run(cypher, {"query": search_query})
        return await result.data()


def format_cxone_content(articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    return [{
        "title": a["title"],
        "url": a["url"],
        "description": a["description"],
        "type": "article",
        "displayType": "card",
        "metadata": {"source": "neo4j_knowledge_graph", "priority": a["priority"]}
    } for a in articles]


async def push_to_agent_assist(auth: CXoneAuthManager, interaction_id: str, content_items: List[Dict[str, Any]]) -> Dict[str, Any]:
    url = f"{auth.base_url}/api/v2/agentassist/interactions/{interaction_id}/content"
    payload = {"content": content_items}
    max_retries = 3
    # One client for all attempts so retries reuse the same connection.
    async with httpx.AsyncClient(timeout=15.0) as client:
        for attempt in range(max_retries):
            # Rebuild the headers on every attempt so a refreshed token is actually sent.
            token = await auth.get_token()
            headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Accept": "application/json"}
            response = await client.post(url, headers=headers, json=payload)
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 2))
                await asyncio.sleep(retry_after * random.uniform(0.5, 1.5))
                continue
            if response.status_code == 401:
                # Clear the cache; otherwise get_token() returns the same rejected token.
                auth.token = None
                continue
            response.raise_for_status()
            return response.json()
    # Attach the real request so callers can inspect both sides of the failed exchange.
    raise httpx.HTTPStatusError("Persistent rate limit or authentication failure", request=response.request, response=response)


@app.post("/agent-assist/query")
async def handle_agent_query(req: QueryRequest):
    try:
        articles = await fetch_knowledge_articles(neo4j_driver, req.query)
        if not articles:
            return {"status": "no_results", "message": "No related articles found in knowledge graph"}
        content_items = format_cxone_content(articles)
        result = await push_to_agent_assist(cxone_auth, req.interaction_id, content_items)
        return {"status": "success", "cxone_response": result}
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 403:
            raise HTTPException(status_code=403, detail="Missing agentassist:interactions:write scope")
        elif e.response.status_code == 404:
            raise HTTPException(status_code=404, detail="CXone interaction not found or inactive")
        raise HTTPException(status_code=e.response.status_code, detail=str(e))
    except Exception as e:
        logger.error(f"Internal processing error: {str(e)}")
        raise HTTPException(status_code=500, detail="Knowledge graph traversal failed")
Run the service with uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4. The application maintains a connection pool to Neo4j and caches the CXone token in memory. Each worker process maintains its own token cache, which aligns with standard deployment patterns for async Python services.
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The access token expired during request processing or the client credentials are invalid.
- Fix: Verify that client_id and client_secret match a valid CXone OAuth client. Ensure the token refresh logic runs before every API call. The provided CXoneAuthManager automatically refreshes when time.time() >= self.expiry.
- Code showing the fix: The retry loop in push_to_agent_assist catches 401, calls await auth.get_token(), and retries the request. The retry only helps if the cached token is cleared first and the Authorization header is rebuilt; otherwise get_token() returns the same rejected token.
Error: 403 Forbidden
- Cause: The OAuth client lacks the agentassist:interactions:write scope, or the token was issued with insufficient permissions.
- Fix: Log into the CXone admin console, navigate to Developer > OAuth, and add agentassist:interactions:write to the client scope list. Re-authorize the client credentials.
- Code showing the fix: The handle_agent_query handler explicitly checks for 403 and raises a FastAPI HTTPException with a descriptive message. You can also log the scope claim from the token to verify authorization boundaries.
Error: 429 Too Many Requests
- Cause: CXone enforces rate limits on Agent Assist content injection, typically measured in requests per minute per OAuth client.
- Fix: Honor the Retry-After header and back off before retrying. The provided code reads Retry-After, multiplies it by a random jitter factor, and sleeps before retrying. You must also limit or batch content pushes if your application processes multiple interactions simultaneously.
- Code showing the fix: The max_retries loop in push_to_agent_assist handles 429 responses natively. You can adjust max_retries based on your deployment scale.
Error: Neo4j ConstraintViolation or Missing Index
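- Cause: The Cypher query looks up a Query node by its keyword property. Without a uniqueness constraint or index on that property, Neo4j scans every Query node on each request.
- Fix: Create a uniqueness constraint on the Query label: CREATE CONSTRAINT query_keyword_unique FOR (q:Query) REQUIRE q.keyword IS UNIQUE. The constraint is backed by an index, so the lookup for incoming agent queries becomes an index seek instead of a label scan. If existing Query nodes share a keyword, the statement fails until the duplicates are resolved.
- Code showing the fix: Run the constraint creation statement in Neo4j Browser before deploying the FastAPI service. If the database is unreachable, the driver raises neo4j.exceptions.ServiceUnavailable, and retryable server-side failures surface as neo4j.exceptions.TransientError. Neither is a subclass of ClientError, so catch them explicitly with except (ServiceUnavailable, TransientError).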