Injecting Neo4j Knowledge Graph Results into NICE CXone Agent Assist with Python FastAPI

Injecting Neo4j Knowledge Graph Results into NICE CXone Agent Assist with Python FastAPI

What You Will Build

  • A FastAPI microservice that accepts agent search queries, executes Cypher traversals against a Neo4j knowledge graph, and pushes structured troubleshooting articles directly into the NICE CXone agent workspace.
  • This implementation uses the NICE CXone Agent Assist REST API (/api/v2/agentassist/interactions/{interactionId}/content) and the official Neo4j async Python driver.
  • The code is written in Python 3.10+ using FastAPI, httpx, Pydantic, and neo4j.

Prerequisites

  • OAuth client type: Confidential client (Client Credentials)
  • Required scopes: agentassist:interactions:write, agentassist:read
  • SDK/API version: CXone REST API v2, Neo4j Python Driver 5.x, FastAPI 0.104+
  • Language/runtime requirements: Python 3.10 or higher with async/await support
  • External dependencies: fastapi==0.104.1, uvicorn==0.24.0, httpx==0.25.2, pydantic==2.5.0, neo4j==5.13.0

Authentication Setup

NICE CXone uses OAuth 2.0 Client Credentials flow for server-to-server API access. The token endpoint resides at https://api.{region}.my.site.nice-incontact.com/oauth2/token. You must exchange your client credentials for an access token before calling the Agent Assist endpoints. The token expires after the duration specified in the expires_in field, typically 3600 seconds.

The following class handles token acquisition, in-memory caching, and automatic refresh when the cache expires. It uses httpx for non-blocking network calls, which aligns with FastAPI async routing.

import httpx
import time
import logging
from typing import Optional

logger = logging.getLogger(__name__)

class CXoneAuthManager:
    def __init__(self, base_url: str, client_id: str, client_secret: str, scopes: list[str]):
        self.base_url = base_url.rstrip("/")
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.token: Optional[str] = None
        self.expiry: float = 0.0

    async def get_token(self) -> str:
        if self.token and time.time() < self.expiry:
            return self.token

        logger.info("Fetching new CXone OAuth token")
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{self.base_url}/oauth2/token",
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "scope": " ".join(self.scopes)
                }
            )
            response.raise_for_status()
            data = response.json()
            self.token = data["access_token"]
            self.expiry = time.time() + data["expires_in"] - 60
            logger.info("OAuth token refreshed successfully")
            return self.token

The request cycle for token acquisition follows standard OAuth 2.0 specifications. The client sends a POST request with form-encoded data. The response contains the bearer token and expiration metadata.

Request:

POST /oauth2/token HTTP/1.1
Host: api.us-east-1.my.site.nice-incontact.com
Content-Type: application/x-www-form-urlencoded

grant_type=client_credentials&client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&scope=agentassist:interactions:write%20agentassist:read

Response:

{
  "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "Bearer",
  "expires_in": 3600,
  "scope": "agentassist:interactions:write agentassist:read"
}

Implementation

Step 1: Initialize OAuth and Database Connections

FastAPI applications benefit from dependency injection for stateful resources like database drivers and authentication managers. You initialize the Neo4j driver and CXone auth manager at startup to avoid repeated connection overhead during request processing.

from fastapi import FastAPI
from neo4j import AsyncGraphDatabase

app = FastAPI()

# Global state initialized on startup
neo4j_driver: Optional[AsyncGraphDatabase] = None
cxone_auth: Optional[CXoneAuthManager] = None

@app.on_event("startup")
async def startup_event():
    global neo4j_driver, cxone_auth
    neo4j_driver = AsyncGraphDatabase.driver(
        "bolt://neo4j.internal:7687",
        auth=("neo4j", "graph_password")
    )
    cxone_auth = CXoneAuthManager(
        base_url="https://api.us-east-1.my.site.nice-incontact.com",
        client_id="your_client_id",
        client_secret="your_client_secret",
        scopes=["agentassist:interactions:write", "agentassist:read"]
    )
    logger.info("FastAPI service initialized with Neo4j and CXone connections")

The Neo4j async driver establishes a connection pool to the Bolt protocol endpoint. FastAPI manages the lifecycle of the application, so you register the driver in the startup event. You must verify that the Neo4j instance allows inbound Bolt connections from your FastAPI host.

Step 2: Execute Graph Traversal and Format Payload

When an agent submits a query, the service must traverse the knowledge graph to find relevant troubleshooting articles. The Cypher query uses variable-length relationships to capture direct matches and second-degree connections, which improves recall for ambiguous agent searches. You limit results to five items to prevent workspace clutter and respect CXone content size constraints.

from pydantic import BaseModel
from typing import List, Dict, Any

class QueryRequest(BaseModel):
    interaction_id: str
    query: str
    agent_id: str

async def fetch_knowledge_articles(driver: AsyncGraphDatabase, search_query: str) -> List[Dict[str, Any]]:
    cypher = """
    MATCH (q:Query {keyword: $query})-[:RELATED_TO|SIMILAR_TO|RESOLVES*1..2]->(a:Article)
    WHERE a.status = 'published'
    RETURN a.title AS title, a.url AS url, a.summary AS description, a.priority AS priority
    ORDER BY a.priority DESC
    LIMIT 5
    """
    async with driver.session() as session:
        result = await session.run(cypher, query=search_query)
        records = await result.data()
        return records

The Cypher pattern [:RELATED_TO|SIMILAR_TO|RESOLVES*1..2] traverses up to two hops away from the query node. The WHERE a.status = 'published' clause filters out draft or deprecated articles. The ORDER BY a.priority DESC ensures high-impact troubleshooting steps appear first in the agent workspace.

You format the Neo4j records into the exact JSON schema expected by CXone Agent Assist. The CXone endpoint requires a content array where each object contains title, url, description, and type. You map the graph properties directly to these fields.

def format_cxone_content(articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    content_items = []
    for article in articles:
        content_items.append({
            "title": article["title"],
            "url": article["url"],
            "description": article["description"],
            "type": "article",
            "displayType": "card",
            "metadata": {
                "source": "neo4j_knowledge_graph",
                "priority": article["priority"],
                "lastUpdated": "2024-01-15T08:30:00Z"
            }
        })
    return content_items

The displayType: "card" parameter instructs the CXone workspace to render the content as an expandable card rather than a plain text block. The metadata object is ignored by the CXone frontend but provides auditability for your backend systems.

Step 3: Inject Content into CXone Agent Workspace

The final step pushes the formatted payload to the CXone Agent Assist API. You must handle rate limiting (HTTP 429) because CXone enforces strict quotas on Agent Assist content pushes. The retry logic reads the Retry-After header and implements exponential backoff with jitter.

import asyncio
import random

async def push_to_agent_assist(auth: CXoneAuthManager, interaction_id: str, content_items: List[Dict[str, Any]]) -> Dict[str, Any]:
    token = await auth.get_token()
    url = f"{auth.base_url}/api/v2/agentassist/interactions/{interaction_id}/content"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    payload = {"content": content_items}

    max_retries = 3
    for attempt in range(max_retries):
        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.post(url, headers=headers, json=payload)
            
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 2))
                jitter = random.uniform(0.5, 1.5)
                wait_time = retry_after * jitter
                logger.warning(f"Rate limited (429). Waiting {wait_time:.2f}s before retry {attempt + 1}")
                await asyncio.sleep(wait_time)
                continue
                
            if response.status_code == 401:
                logger.warning("Token expired during request. Refreshing and retrying once.")
                await auth.get_token()
                continue
                
            response.raise_for_status()
            return response.json()
            
    raise httpx.HTTPStatusError("Persistent rate limit or authentication failure", request=None, response=response)

The HTTP cycle for content injection follows this pattern:

Request:

POST /api/v2/agentassist/interactions/INT-987654321/content HTTP/1.1
Host: api.us-east-1.my.site.nice-incontact.com
Authorization: Bearer eyJhbGciOiJSUzI1NiIs...
Content-Type: application/json
Accept: application/json

{
  "content": [
    {
      "title": "Resetting VoIP Gateway Credentials",
      "url": "https://kb.internal/article/vp-1024",
      "description": "Step-by-step guide to clearing cached credentials and re-provisioning SIP trunks.",
      "type": "article",
      "displayType": "card",
      "metadata": {
        "source": "neo4j_knowledge_graph",
        "priority": 9,
        "lastUpdated": "2024-01-15T08:30:00Z"
      }
    }
  ]
}

Response:

{
  "interactionId": "INT-987654321",
  "contentId": "CONT-44556677",
  "status": "delivered",
  "timestamp": "2024-01-20T14:22:10Z"
}

The interactionId must match an active CXone interaction. If the interaction has ended or does not exist, CXone returns HTTP 404. You must validate the interaction state before pushing content in production environments.

Complete Working Example

The following script combines all components into a single deployable FastAPI application. You only need to replace the configuration constants with your environment values.

import httpx
import time
import asyncio
import random
import logging
from typing import Optional, List, Dict, Any
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from neo4j import AsyncGraphDatabase

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CXoneAuthManager:
    def __init__(self, base_url: str, client_id: str, client_secret: str, scopes: list[str]):
        self.base_url = base_url.rstrip("/")
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.token: Optional[str] = None
        self.expiry: float = 0.0

    async def get_token(self) -> str:
        if self.token and time.time() < self.expiry:
            return self.token
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{self.base_url}/oauth2/token",
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "scope": " ".join(self.scopes)
                }
            )
            response.raise_for_status()
            data = response.json()
            self.token = data["access_token"]
            self.expiry = time.time() + data["expires_in"] - 60
            return self.token

class QueryRequest(BaseModel):
    interaction_id: str
    query: str
    agent_id: str

app = FastAPI()
neo4j_driver: Optional[AsyncGraphDatabase] = None
cxone_auth: Optional[CXoneAuthManager] = None

@app.on_event("startup")
async def startup_event():
    global neo4j_driver, cxone_auth
    neo4j_driver = AsyncGraphDatabase.driver("bolt://neo4j.internal:7687", auth=("neo4j", "graph_password"))
    cxone_auth = CXoneAuthManager(
        base_url="https://api.us-east-1.my.site.nice-incontact.com",
        client_id="your_client_id",
        client_secret="your_client_secret",
        scopes=["agentassist:interactions:write", "agentassist:read"]
    )

async def fetch_knowledge_articles(driver: AsyncGraphDatabase, search_query: str) -> List[Dict[str, Any]]:
    cypher = """
    MATCH (q:Query {keyword: $query})-[:RELATED_TO|SIMILAR_TO|RESOLVES*1..2]->(a:Article)
    WHERE a.status = 'published'
    RETURN a.title AS title, a.url AS url, a.summary AS description, a.priority AS priority
    ORDER BY a.priority DESC
    LIMIT 5
    """
    async with driver.session() as session:
        result = await session.run(cypher, query=search_query)
        return await result.data()

def format_cxone_content(articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    return [{
        "title": a["title"],
        "url": a["url"],
        "description": a["description"],
        "type": "article",
        "displayType": "card",
        "metadata": {"source": "neo4j_knowledge_graph", "priority": a["priority"]}
    } for a in articles]

async def push_to_agent_assist(auth: CXoneAuthManager, interaction_id: str, content_items: List[Dict[str, Any]]) -> Dict[str, Any]:
    token = await auth.get_token()
    url = f"{auth.base_url}/api/v2/agentassist/interactions/{interaction_id}/content"
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Accept": "application/json"}
    payload = {"content": content_items}
    
    max_retries = 3
    for attempt in range(max_retries):
        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.post(url, headers=headers, json=payload)
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 2))
                await asyncio.sleep(retry_after * random.uniform(0.5, 1.5))
                continue
            if response.status_code == 401:
                await auth.get_token()
                continue
            response.raise_for_status()
            return response.json()
    raise httpx.HTTPStatusError("Persistent rate limit or authentication failure", request=None, response=response)

@app.post("/agent-assist/query")
async def handle_agent_query(req: QueryRequest):
    try:
        articles = await fetch_knowledge_articles(neo4j_driver, req.query)
        if not articles:
            return {"status": "no_results", "message": "No related articles found in knowledge graph"}
        
        content_items = format_cxone_content(articles)
        result = await push_to_agent_assist(cxone_auth, req.interaction_id, content_items)
        return {"status": "success", "cxone_response": result}
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 403:
            raise HTTPException(status_code=403, detail="Missing agentassist:interactions:write scope")
        elif e.response.status_code == 404:
            raise HTTPException(status_code=404, detail="CXone interaction not found or inactive")
        raise HTTPException(status_code=e.response.status_code, detail=str(e))
    except Exception as e:
        logger.error(f"Internal processing error: {str(e)}")
        raise HTTPException(status_code=500, detail="Knowledge graph traversal failed")

Run the service with uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4. The application maintains a connection pool to Neo4j and caches the CXone token in memory. Each worker process maintains its own token cache, which aligns with standard deployment patterns for async Python services.

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The access token expired during request processing or the client credentials are invalid.
  • Fix: Verify that client_id and client_secret match a valid CXone OAuth client. Ensure the token refresh logic runs before every API call. The provided CXoneAuthManager automatically refreshes when time.time() >= self.expiry.
  • Code showing the fix: The retry loop in push_to_agent_assist catches 401, calls await auth.get_token(), and retries the request with the fresh token.

Error: 403 Forbidden

  • Cause: The OAuth client lacks the agentassist:interactions:write scope, or the token was issued with insufficient permissions.
  • Fix: Log into the CXone admin console, navigate to Developer > OAuth, and add agentassist:interactions:write to the client scope list. Re-authorize the client credentials.
  • Code showing the fix: The QueryRequest handler explicitly checks for 403 and raises a FastAPI HTTPException with a descriptive message. You can also log the scope claim from the token to verify authorization boundaries.

Error: 429 Too Many Requests

  • Cause: CXone enforces rate limits on Agent Assist content injection, typically measured in requests per minute per OAuth client.
  • Fix: Implement exponential backoff with Retry-After header parsing. The provided code reads Retry-After, multiplies it by a random jitter factor, and sleeps before retrying. You must also batch content pushes if your application processes multiple interactions simultaneously.
  • Code showing the fix: The max_retries loop in push_to_agent_assist handles 429 responses natively. You can adjust max_retries based on your deployment scale.

Error: Neo4j ConstraintViolation or Missing Index

  • Cause: The Cypher query references a Query node with a keyword property that lacks a unique constraint or index, causing full graph scans.
  • Fix: Create a uniqueness constraint on the Query label: CREATE CONSTRAINT query_keyword_unique FOR (q:Query) REQUIRE q.keyword IS UNIQUE. This ensures O(1) lookup performance for incoming agent queries.
  • Code showing the fix: Run the constraint creation statement in Neo4j Browser before deploying the FastAPI service. The driver will raise Neo4jServiceUnavailable or Neo4jTransientError if the database is unreachable, which you can catch with except neo4j.exceptions.ClientError.

Official References