Enhancing Genesys Cloud Agent Assist with Real-Time Keyword Detection
What You Will Build
- A Python asyncio worker that connects to the Genesys Cloud transcription WebSocket stream, processes incoming transcript text through a spaCy NLP pipeline, and pushes contextual suggestions to the active agent session via the Interaction API.
- This implementation uses the Genesys Cloud Conversations WebSocket API and the Agent Assist REST API.
- The code is written in Python 3.9+ using `httpx`, `websockets`, and `spacy`.
Prerequisites
- OAuth Client: Machine-to-machine (M2M) application registered in Genesys Cloud with the following scopes: `conversations:read`, `analytics:conversations:view`, `interaction:write`
- SDK/API Version: Genesys Cloud REST API v2, WebSocket API v2
- Runtime: Python 3.9 or higher
- Dependencies: `httpx>=0.24.0`, `websockets>=14.0`, `spacy>=3.7.0`, and the `en_core_web_sm` model (download via `python -m spacy download en_core_web_sm`)
Authentication Setup
Genesys Cloud OAuth access tokens expire after the lifetime configured on the OAuth client, and the token response reports that lifetime in `expires_in`. The worker therefore needs a token manager that caches the access token and refreshes it before expiry. The following class implements a thread-safe token manager using httpx.
```python
import threading
import time
from typing import Optional

import httpx


class OAuthTokenManager:
    """Caches a client-credentials access token and refreshes it shortly before expiry."""

    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        # Genesys Cloud serves OAuth from login.<environment>,
        # e.g. login.mypurecloud.com for the us-east-1 environment.
        self.token_url = f"https://login.{environment}/oauth/token"
        self.access_token: Optional[str] = None
        self.expires_at: float = 0.0
        self.lock = threading.Lock()

    def get_access_token(self) -> str:
        # The lock ensures only one caller refreshes the token at a time.
        with self.lock:
            if self.access_token and time.time() < (self.expires_at - 60):
                return self.access_token
            return self._fetch_token()

    def _fetch_token(self) -> str:
        data = {
            "grant_type": "client_credentials",
            "scope": "conversations:read analytics:conversations:view interaction:write",
        }
        # Client credentials go in an HTTP Basic auth header; httpx form-encodes `data`.
        response = httpx.post(
            self.token_url,
            data=data,
            auth=(self.client_id, self.client_secret),
            timeout=10.0,
        )
        response.raise_for_status()
        payload = response.json()
        self.access_token = payload["access_token"]
        self.expires_at = time.time() + payload["expires_in"]
        return self.access_token
```
The manager compares the current time with `expires_at` minus a sixty-second buffer. While the token is inside that window, it returns the cached value. Once the buffer is crossed, it performs a synchronous POST to `/oauth/token`. The lock prevents concurrent refresh attempts when many events arrive at once. Because the POST is blocking, async code should call `get_access_token` through `asyncio.to_thread` so that a refresh does not stall the event loop.
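You can isolate the refresh decision and unit-test it without network access. The sketch below uses a hypothetical `CachedToken` helper, which is not part of any Genesys Cloud SDK. It takes a fetch callable that returns `(access_token, expires_in)` and an injectable clock, then applies the same sixty-second buffer as `OAuthTokenManager`.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Hypothetical helper: caches a token and refetches it `buffer` seconds before expiry."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        buffer: float = 60.0,
        clock: Callable[[], float] = time.time,
    ):
        self._fetch = fetch  # returns (access_token, expires_in_seconds)
        self._buffer = buffer
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0
        self._lock = threading.Lock()

    def get(self) -> str:
        with self._lock:
            # Refetch when there is no token yet or the refresh buffer has been reached.
            if self._token is None or self._clock() >= self._expires_at - self._buffer:
                token, expires_in = self._fetch()
                self._token = token
                self._expires_at = self._clock() + expires_in
            return self._token
```

With a fake clock, a test can step through time. For example, it can confirm that a token with `expires_in=1800` is reused at t=1739 s and refreshed at t=1740 s.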
Implementation
Step 1: Subscribe to the Transcription WebSocket Stream
The Genesys Cloud Conversations WebSocket API streams real-time events. You filter for transcript events to receive incremental transcription updates. The connection uses the wss protocol and requires an Authorization: Bearer <token> header.
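The code in this guide reads `eventType`, `conversationId`, and a nested `transcript` object with `text`, `speaker`, and `confidence`. The sketch below assumes a frame with exactly those fields. That shape is inferred from the code and is not a documented Genesys Cloud schema. The sketch parses one raw WebSocket frame into the values the NLP worker needs, using a hypothetical `parse_frame` function.

```python
import json
from typing import NamedTuple, Optional


class CustomerUtterance(NamedTuple):
    conversation_id: str
    text: str
    confidence: float


def parse_frame(raw: str) -> Optional[CustomerUtterance]:
    """Return the customer utterance in a transcript frame, or None if the frame should be skipped."""
    try:
        event = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # Skip non-object payloads and anything that is not a transcript event.
    if not isinstance(event, dict) or event.get("eventType") != "transcript":
        return None
    transcript = event.get("transcript") or {}
    # Only non-empty customer speech is relevant for suggestions.
    if transcript.get("speaker") != "customer" or not transcript.get("text"):
        return None
    return CustomerUtterance(
        conversation_id=event.get("conversationId", ""),
        text=transcript["text"],
        confidence=float(transcript.get("confidence", 0.0)),
    )


# Hypothetical frame shape assumed by this guide.
sample = json.dumps({
    "eventType": "transcript",
    "conversationId": "conv-123",
    "transcript": {"text": "I want a refund", "speaker": "customer", "confidence": 0.92},
})
print(parse_frame(sample))
```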
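```python
import asyncio
import json
import logging
from typing import Any, Awaitable, Callable, Dict

import websockets
from websockets.exceptions import ConnectionClosed, InvalidStatus

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

TranscriptHandler = Callable[[Dict[str, Any]], Awaitable[None]]


class TranscriptionStreamer:
    def __init__(self, environment: str, token_manager: OAuthTokenManager):
        self.token_manager = token_manager
        self.ws_url = f"wss://api.{environment}/api/v2/conversations/stream?filter=type:transcript"
        self.reconnect_delay = 1.0
        self.max_reconnect_delay = 30.0

    async def connect_and_stream(self, on_transcript: TranscriptHandler) -> None:
        while True:
            # Token refresh is a blocking HTTP call, so run it off the event loop.
            token = await asyncio.to_thread(self.token_manager.get_access_token)
            headers = {"Authorization": f"Bearer {token}"}
            try:
                # websockets >= 14 names this parameter additional_headers.
                async with websockets.connect(self.ws_url, additional_headers=headers) as ws:
                    logger.info("Connected to transcription stream")
                    self.reconnect_delay = 1.0
                    async for message in ws:
                        await self._process_message(message, on_transcript)
            except InvalidStatus as e:
                # The server rejected the handshake with an HTTP status.
                if e.response.status_code != 401:
                    raise
                logger.warning("Handshake rejected with 401. Forcing a token refresh...")
                self.token_manager.expires_at = 0.0  # next get_access_token() call refetches
            except (ConnectionClosed, OSError, asyncio.TimeoutError) as e:
                logger.warning(f"WebSocket error: {e!r}. Reconnecting in {self.reconnect_delay}s")
            # Exponential backoff before the next connection attempt.
            await asyncio.sleep(self.reconnect_delay)
            self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)

    async def _process_message(self, raw_message: str, on_transcript: TranscriptHandler) -> None:
        try:
            event = json.loads(raw_message)
        except json.JSONDecodeError:
            logger.error("Failed to parse WebSocket message")
            return
        if isinstance(event, dict) and event.get("eventType") == "transcript":
            try:
                await on_transcript(event)
            except Exception:
                # Keep the stream alive if a single event fails downstream.
                logger.exception("Transcript handler failed")
```
The streamer reconnects with exponential backoff, starting at one second and doubling up to a thirty-second cap, and passes transcript events to the callback. Token retrieval runs in a worker thread because `OAuthTokenManager` makes a blocking HTTP call. The `websockets` library reports a rejected handshake as `InvalidStatus`. On a 401, the streamer invalidates the cached token so that the next attempt fetches a fresh one. The streamer logs exceptions raised by the callback instead of letting them tear down the connection. The `filter=type:transcript` query parameter limits the stream to transcript events, so the worker does not receive unrelated conversation events.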
Step 2: Process Transcripts with spaCy
The asyncio worker receives transcript events, extracts the text, and runs it through a spaCy pipeline. It keeps selected named entities from the pipeline output and matches a list of predefined business keywords. The worker batches rapid incremental updates to avoid flooding the agent UI.
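You can express the batching rule without spaCy. The hypothetical `CooldownBatcher` below is a sketch with an injectable clock, modeled on the worker's cooldown. It buffers text per conversation and releases the accumulated batch only when at least `cooldown` seconds have passed since that conversation's last release. The first update for a conversation is released immediately.

```python
import time
from typing import Callable, Dict, Optional


class CooldownBatcher:
    """Hypothetical helper: batch text per key and release it at most once per cooldown window."""

    def __init__(self, cooldown: float = 3.0, clock: Callable[[], float] = time.monotonic):
        self.cooldown = cooldown
        self.clock = clock
        self.pending: Dict[str, str] = {}
        self.last_release: Dict[str, float] = {}

    def add(self, key: str, text: str) -> Optional[str]:
        """Buffer `text`; return the whole batch if the window has elapsed, else None."""
        batch = f"{self.pending.get(key, '')} {text}".strip()
        now = self.clock()
        last = self.last_release.get(key)
        if last is not None and now - last < self.cooldown:
            self.pending[key] = batch  # still inside the window: keep buffering
            return None
        # Window elapsed (or first update): release the batch and start a new one.
        self.pending[key] = ""
        self.last_release[key] = now
        return batch
```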
```python
import asyncio
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Set

import spacy


@dataclass
class KeywordMatch:
    keyword: str
    confidence: float
    interaction_id: str
    context: str


class NLPWorker:
    # OntoNotes entity labels (used by en_core_web_sm) worth surfacing to the agent.
    ENTITY_LABELS = ("PRODUCT", "ORG", "GPE")

    def __init__(self, target_keywords: List[str], cooldown_seconds: float = 3.0):
        self.nlp = spacy.load("en_core_web_sm")
        self.target_keywords = {kw.lower() for kw in target_keywords}
        self.cooldown_seconds = cooldown_seconds
        self.text_buffer: Dict[str, str] = {}      # customer text not yet analyzed
        self.last_analyzed: Dict[str, float] = {}  # time of last analysis per conversation
        self.suggested: Dict[str, Set[str]] = {}   # keywords already pushed per conversation

    async def analyze_transcript(self, event: Dict[str, Any]) -> List[KeywordMatch]:
        conversation_id = event.get("conversationId", "")
        transcript_data = event.get("transcript", {})
        text = transcript_data.get("text", "")
        speaker = transcript_data.get("speaker", "")
        confidence = transcript_data.get("confidence", 0.0)

        # Only customer utterances drive suggestions.
        if speaker != "customer" or not text:
            return []

        # Buffer text until the cooldown window for this conversation has elapsed.
        buffered = f"{self.text_buffer.get(conversation_id, '')} {text}".strip()
        now = time.time()
        if now - self.last_analyzed.get(conversation_id, 0.0) < self.cooldown_seconds:
            self.text_buffer[conversation_id] = buffered
            return []

        # Analyze the batch once, then clear it so old text is not re-analyzed.
        self.text_buffer[conversation_id] = ""
        self.last_analyzed[conversation_id] = now
        # spaCy inference is CPU-bound; run it in a thread to keep the event loop responsive.
        doc = await asyncio.to_thread(self.nlp, buffered)

        candidates = [
            (f"{ent.label_}:{ent.text}", ent.text)
            for ent in doc.ents
            if ent.label_ in self.ENTITY_LABELS
        ]
        candidates += [(token.lower_, token.text) for token in doc if token.lower_ in self.target_keywords]

        seen = self.suggested.setdefault(conversation_id, set())
        matches: List[KeywordMatch] = []
        for keyword, context in candidates:
            if keyword not in seen:  # suggest each keyword at most once per conversation
                seen.add(keyword)
                matches.append(KeywordMatch(keyword, confidence, conversation_id, context))
        return matches
```
The worker ignores agent utterances and processes only customer speech. It buffers text that arrives within the three-second cooldown, per conversation, and analyzes the batch on the next customer update after the window closes. It then clears the buffer, so earlier text is never re-analyzed. A per-conversation set records the keywords that were already suggested, so each entity or keyword reaches the agent only once. spaCy runs in a worker thread via `asyncio.to_thread` because inference is CPU-bound and would otherwise stall the WebSocket reader. The method returns structured `KeywordMatch` objects for the downstream API calls.
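The per-conversation dictionaries in `NLPWorker` grow with every new conversation and are never pruned. One option is to periodically drop state for conversations that have been quiet for longer than a time-to-live. The sketch below does this with a hypothetical `evict_idle` helper. Pass the worker's timestamp dictionary first and its other per-conversation dictionaries after it. The 300-second TTL is an arbitrary example value.

```python
import time
from typing import Any, Dict, List, MutableMapping, Optional


def evict_idle(
    last_seen: Dict[str, float],
    *stores: MutableMapping[str, Any],
    ttl: float = 300.0,
    now: Optional[float] = None,
) -> List[str]:
    """Remove conversations idle longer than `ttl` from `last_seen` and every extra store.

    Returns the evicted conversation IDs.
    """
    current = time.time() if now is None else now
    expired = [cid for cid, ts in last_seen.items() if current - ts > ttl]
    for cid in expired:
        del last_seen[cid]
        for store in stores:
            store.pop(cid, None)  # a store may not have an entry for every conversation
    return expired
```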
Step 3: Push Suggestions to the Agent UI
The Interaction API endpoint POST /api/v2/interactions/events/agent/assist delivers suggestions to the active agent session. You map the NLP matches to the Agent Assist payload structure and send them asynchronously. The request includes the interactionId, suggestion type, and formatted content.
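If the payload mapping lives in a pure function, you can unit-test it without calling the API. The hypothetical `build_assist_payload` below builds the same request body that `AgentAssistPusher` sends. The field names are the ones this guide uses for the endpoint.

```python
from typing import Any, Dict, List, Optional


def build_assist_payload(
    interaction_id: str,
    keyword: str,
    context: str,
    links: Optional[List[Any]] = None,
) -> Dict[str, Any]:
    """Map one keyword match to the Agent Assist request body used in this guide."""
    return {
        "interactionId": interaction_id,
        "type": "knowledge",
        "content": {
            "title": f"Detected: {keyword}",
            "body": f"Customer mentioned: {context}. Review relevant KB articles.",
            "links": list(links or []),
        },
    }
```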
```python
import asyncio
import logging

import httpx

logger = logging.getLogger(__name__)


class AgentAssistPusher:
    def __init__(self, environment: str, token_manager: OAuthTokenManager, max_retries: int = 3):
        self.token_manager = token_manager
        self.api_base = f"https://api.{environment}/api/v2"
        self.max_retries = max_retries
        self.session = httpx.AsyncClient(timeout=10.0)

    async def push_suggestion(self, match: KeywordMatch) -> None:
        url = f"{self.api_base}/interactions/events/agent/assist"
        payload = {
            "interactionId": match.interaction_id,
            "type": "knowledge",
            "content": {
                "title": f"Detected: {match.keyword}",
                "body": f"Customer mentioned: {match.context}. Review relevant KB articles.",
                "links": [],
            },
        }
        for attempt in range(self.max_retries + 1):
            # Fetch (or refresh) the token without blocking the event loop.
            token = await asyncio.to_thread(self.token_manager.get_access_token)
            headers = {
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
                "Accept": "application/json",
            }
            try:
                response = await self.session.post(url, headers=headers, json=payload)
            except httpx.HTTPError as e:
                logger.error(f"Agent Assist push failed: {e!r}")
                return
            if response.status_code in (200, 201):
                logger.info(f"Pushed suggestion for {match.interaction_id}")
                return
            if response.status_code == 409:
                logger.warning(f"Interaction {match.interaction_id} is closed or inactive")
                return
            if response.status_code == 429 and attempt < self.max_retries:
                # Retry-After may also be an HTTP date; fall back to 5 seconds in that case.
                retry_after = response.headers.get("Retry-After", "5")
                delay = int(retry_after) if retry_after.isdigit() else 5
                logger.warning(f"Rate limited. Retrying in {delay}s")
                await asyncio.sleep(delay)
                continue
            logger.error(f"Agent Assist push failed: {response.status_code} {response.text}")
            return

    async def aclose(self) -> None:
        await self.session.aclose()
```
The pusher uses `httpx.AsyncClient` for non-blocking HTTP requests. It treats 409 Conflict as an interaction that ended before the suggestion arrived. On 429 Too Many Requests, it waits for the `Retry-After` delay and retries up to `max_retries` times, so a persistent rate limit cannot cause unbounded retries. The payload carries the interaction ID, the suggestion type, and the title, body, and links shown to the agent.
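`Retry-After` can carry either a number of seconds or an HTTP date (RFC 9110), and a plain `int()` conversion raises `ValueError` on the date form. The hypothetical `retry_after_seconds` helper below handles both forms with the standard library. It falls back to a default delay when the header is missing or malformed.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(value: Optional[str], default: float = 5.0, now: Optional[datetime] = None) -> float:
    """Convert a Retry-After header (delta-seconds or HTTP-date) into a non-negative delay."""
    if not value:
        return default
    value = value.strip()
    if value.isdigit():
        return float(value)
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):  # Python 3.9 raises TypeError, 3.10+ ValueError
        return default
    if when.tzinfo is None:  # treat naive dates as UTC
        when = when.replace(tzinfo=timezone.utc)
    current = now or datetime.now(timezone.utc)
    return max(0.0, (when - current).total_seconds())
```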
Complete Working Example
The following script combines all components into a single runnable module. Replace the placeholder credentials before execution.
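Instead of editing the constants in `main()`, you can read the credentials and keyword list from the environment. Below is a minimal sketch. It assumes the hypothetical variable names `GENESYS_CLIENT_ID`, `GENESYS_CLIENT_SECRET`, and `ASSIST_KEYWORDS`, where the last one is a comma-separated list.

```python
import os
from dataclasses import dataclass
from typing import List, Mapping, Optional


@dataclass(frozen=True)
class WorkerSettings:
    client_id: str
    client_secret: str
    keywords: List[str]


def load_settings(env: Optional[Mapping[str, str]] = None) -> WorkerSettings:
    """Read credentials and keywords from environment variables (hypothetical names)."""
    env = os.environ if env is None else env
    missing = [name for name in ("GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET") if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
    raw_keywords = env.get("ASSIST_KEYWORDS", "cancellation,refund,pricing,upgrade")
    return WorkerSettings(
        client_id=env["GENESYS_CLIENT_ID"],
        client_secret=env["GENESYS_CLIENT_SECRET"],
        # Split on commas and drop blanks left by stray separators.
        keywords=[kw.strip() for kw in raw_keywords.split(",") if kw.strip()],
    )
```

Inside `main()`, call `settings = load_settings()` and pass `settings.client_id`, `settings.client_secret`, and `settings.keywords` in place of the literal values. This keeps secrets out of source control.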
```python
import asyncio
import json
import logging
import threading
import time
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set

import httpx
import spacy
import websockets
from websockets.exceptions import ConnectionClosed, InvalidStatus

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

TranscriptHandler = Callable[[Dict[str, Any]], Awaitable[None]]


class OAuthTokenManager:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = f"https://login.{environment}/oauth/token"
        self.access_token: Optional[str] = None
        self.expires_at: float = 0.0
        self.lock = threading.Lock()

    def get_access_token(self) -> str:
        with self.lock:
            if self.access_token and time.time() < (self.expires_at - 60):
                return self.access_token
            return self._fetch_token()

    def _fetch_token(self) -> str:
        data = {
            "grant_type": "client_credentials",
            "scope": "conversations:read analytics:conversations:view interaction:write",
        }
        response = httpx.post(
            self.token_url, data=data, auth=(self.client_id, self.client_secret), timeout=10.0
        )
        response.raise_for_status()
        payload = response.json()
        self.access_token = payload["access_token"]
        self.expires_at = time.time() + payload["expires_in"]
        return self.access_token


class TranscriptionStreamer:
    def __init__(self, environment: str, token_manager: OAuthTokenManager):
        self.token_manager = token_manager
        self.ws_url = f"wss://api.{environment}/api/v2/conversations/stream?filter=type:transcript"
        self.reconnect_delay = 1.0
        self.max_reconnect_delay = 30.0

    async def connect_and_stream(self, on_transcript: TranscriptHandler) -> None:
        while True:
            token = await asyncio.to_thread(self.token_manager.get_access_token)
            headers = {"Authorization": f"Bearer {token}"}
            try:
                async with websockets.connect(self.ws_url, additional_headers=headers) as ws:
                    logger.info("Connected to transcription stream")
                    self.reconnect_delay = 1.0
                    async for message in ws:
                        await self._process_message(message, on_transcript)
            except InvalidStatus as e:
                if e.response.status_code != 401:
                    raise
                logger.warning("Handshake rejected with 401. Forcing a token refresh...")
                self.token_manager.expires_at = 0.0
            except (ConnectionClosed, OSError, asyncio.TimeoutError) as e:
                logger.warning(f"WebSocket error: {e!r}. Reconnecting in {self.reconnect_delay}s")
            await asyncio.sleep(self.reconnect_delay)
            self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)

    async def _process_message(self, raw_message: str, on_transcript: TranscriptHandler) -> None:
        try:
            event = json.loads(raw_message)
        except json.JSONDecodeError:
            logger.error("Failed to parse WebSocket message")
            return
        if isinstance(event, dict) and event.get("eventType") == "transcript":
            try:
                await on_transcript(event)
            except Exception:
                logger.exception("Transcript handler failed")


@dataclass
class KeywordMatch:
    keyword: str
    confidence: float
    interaction_id: str
    context: str


class NLPWorker:
    ENTITY_LABELS = ("PRODUCT", "ORG", "GPE")

    def __init__(self, target_keywords: List[str], cooldown_seconds: float = 3.0):
        self.nlp = spacy.load("en_core_web_sm")
        self.target_keywords = {kw.lower() for kw in target_keywords}
        self.cooldown_seconds = cooldown_seconds
        self.text_buffer: Dict[str, str] = {}
        self.last_analyzed: Dict[str, float] = {}
        self.suggested: Dict[str, Set[str]] = {}

    async def analyze_transcript(self, event: Dict[str, Any]) -> List[KeywordMatch]:
        conversation_id = event.get("conversationId", "")
        transcript_data = event.get("transcript", {})
        text = transcript_data.get("text", "")
        speaker = transcript_data.get("speaker", "")
        confidence = transcript_data.get("confidence", 0.0)
        if speaker != "customer" or not text:
            return []
        buffered = f"{self.text_buffer.get(conversation_id, '')} {text}".strip()
        now = time.time()
        if now - self.last_analyzed.get(conversation_id, 0.0) < self.cooldown_seconds:
            self.text_buffer[conversation_id] = buffered
            return []
        self.text_buffer[conversation_id] = ""
        self.last_analyzed[conversation_id] = now
        doc = await asyncio.to_thread(self.nlp, buffered)
        candidates = [
            (f"{ent.label_}:{ent.text}", ent.text)
            for ent in doc.ents
            if ent.label_ in self.ENTITY_LABELS
        ]
        candidates += [(token.lower_, token.text) for token in doc if token.lower_ in self.target_keywords]
        seen = self.suggested.setdefault(conversation_id, set())
        matches: List[KeywordMatch] = []
        for keyword, context in candidates:
            if keyword not in seen:
                seen.add(keyword)
                matches.append(KeywordMatch(keyword, confidence, conversation_id, context))
        return matches


class AgentAssistPusher:
    def __init__(self, environment: str, token_manager: OAuthTokenManager, max_retries: int = 3):
        self.token_manager = token_manager
        self.api_base = f"https://api.{environment}/api/v2"
        self.max_retries = max_retries
        self.session = httpx.AsyncClient(timeout=10.0)

    async def push_suggestion(self, match: KeywordMatch) -> None:
        url = f"{self.api_base}/interactions/events/agent/assist"
        payload = {
            "interactionId": match.interaction_id,
            "type": "knowledge",
            "content": {
                "title": f"Detected: {match.keyword}",
                "body": f"Customer mentioned: {match.context}. Review relevant KB articles.",
                "links": [],
            },
        }
        for attempt in range(self.max_retries + 1):
            token = await asyncio.to_thread(self.token_manager.get_access_token)
            headers = {
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
                "Accept": "application/json",
            }
            try:
                response = await self.session.post(url, headers=headers, json=payload)
            except httpx.HTTPError as e:
                logger.error(f"Agent Assist push failed: {e!r}")
                return
            if response.status_code in (200, 201):
                logger.info(f"Pushed suggestion for {match.interaction_id}")
                return
            if response.status_code == 409:
                logger.warning(f"Interaction {match.interaction_id} is closed or inactive")
                return
            if response.status_code == 429 and attempt < self.max_retries:
                retry_after = response.headers.get("Retry-After", "5")
                delay = int(retry_after) if retry_after.isdigit() else 5
                logger.warning(f"Rate limited. Retrying in {delay}s")
                await asyncio.sleep(delay)
                continue
            logger.error(f"Agent Assist push failed: {response.status_code} {response.text}")
            return

    async def aclose(self) -> None:
        await self.session.aclose()


async def main() -> None:
    CLIENT_ID = "your_client_id"
    CLIENT_SECRET = "your_client_secret"
    # Host suffix of your Genesys Cloud environment; mypurecloud.com is us-east-1.
    ENVIRONMENT = "mypurecloud.com"
    KEYWORDS = ["cancellation", "refund", "pricing", "upgrade"]

    token_mgr = OAuthTokenManager(CLIENT_ID, CLIENT_SECRET, ENVIRONMENT)
    streamer = TranscriptionStreamer(ENVIRONMENT, token_mgr)
    nlp_worker = NLPWorker(KEYWORDS)
    pusher = AgentAssistPusher(ENVIRONMENT, token_mgr)

    async def handle_transcript(event: Dict[str, Any]) -> None:
        matches = await nlp_worker.analyze_transcript(event)
        for match in matches:
            await pusher.push_suggestion(match)

    try:
        await streamer.connect_and_stream(handle_transcript)
    finally:
        await pusher.aclose()


if __name__ == "__main__":
    asyncio.run(main())
```
Common Errors & Debugging
Error: 401 Unauthorized on WebSocket or REST Calls
- Cause: The OAuth token expired or was never fetched correctly. An access token is valid only for the `expires_in` lifetime returned by `/oauth/token`.
- Fix: Ensure the `OAuthTokenManager` refreshes tokens before expiry. Verify that the `client_id` and `client_secret` match a registered M2M application. Also check that the application has the `conversations:read` and `interaction:write` scopes granted in the Genesys Cloud admin console.
- Code Fix: The `get_access_token` method refreshes the token sixty seconds before expiry. If you still see repeated 401 errors, increase the buffer or run a background refresh task.
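A background refresh task renews the token on a schedule, so request paths do not wait on `/oauth/token`. The sketch below assumes only that the manager exposes a blocking `get_access_token()` method and an `expires_at` attribute, as `OAuthTokenManager` does. The `refresh_loop` name and the five-minute margin are illustrative choices, not part of the original code.

```python
import asyncio
import logging
import time
from typing import Protocol

logger = logging.getLogger(__name__)


class TokenSource(Protocol):
    expires_at: float

    def get_access_token(self) -> str: ...


async def refresh_loop(manager: TokenSource, margin: float = 300.0, min_sleep: float = 1.0) -> None:
    """Hypothetical background task: refresh the token `margin` seconds before it expires."""
    while True:
        # Sleep until the refresh point, but never spin faster than min_sleep.
        wait = max(min_sleep, manager.expires_at - margin - time.time())
        await asyncio.sleep(wait)
        manager.expires_at = 0.0  # force the next call to fetch a new token
        try:
            await asyncio.to_thread(manager.get_access_token)
        except Exception:
            logger.exception("Background token refresh failed; retrying shortly")
```

Start it next to the stream with `asyncio.create_task(refresh_loop(token_mgr))`, and cancel the task on shutdown.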
Error: 403 Forbidden on Agent Assist Endpoint
- Cause: The OAuth client lacks the `interaction:write` scope, or the target interaction belongs to a different organization or is not assigned to an active agent.
- Fix: In the Genesys Cloud admin console, open the OAuth client and add `interaction:write` to its scopes. Verify that the `conversationId` from the transcript event matches an active interaction with an assigned agent.
- Code Fix: Log the `interactionId` and check the conversation with `GET /api/v2/conversations/{conversationId}` before pushing.
Error: 429 Too Many Requests
- Cause: The worker pushes suggestions faster than Genesys Cloud API rate limits allow.
- Fix: Throttle requests or increase the cooldown period in `NLPWorker`. `AgentAssistPusher` already reads the `Retry-After` header and waits before retrying.
- Code Fix: If you dispatch pushes concurrently (for example with `asyncio.gather`), add an async semaphore to cap how many run at once. Create `self.semaphore = asyncio.Semaphore(5)` in the constructor and wrap the POST call in `async with self.semaphore:`.
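A semaphore limits only concurrent work, and the `handle_transcript` callback in the complete example awaits pushes one at a time. The sketch below uses a stand-in coroutine instead of the real HTTP call to show the concurrent pattern. A hypothetical `push_all` function fans out pushes with `asyncio.gather`, while an `asyncio.Semaphore` caps how many are in flight.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def push_all(items: Iterable[T], push: Callable[[T], Awaitable[R]], limit: int = 5) -> List[R]:
    """Run push(item) for every item concurrently, with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def bounded(item: T) -> R:
        async with semaphore:
            return await push(item)

    return await asyncio.gather(*(bounded(item) for item in items))


async def demo() -> int:
    in_flight = 0
    peak = 0

    async def fake_push(match_id: int) -> int:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stands in for the HTTP POST
        in_flight -= 1
        return match_id

    results = await push_all(range(20), fake_push, limit=5)
    assert results == list(range(20))  # gather preserves input order
    return peak


observed_peak = asyncio.run(demo())
print(f"peak concurrency: {observed_peak}")
```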
Error: WebSocket Connection Drops Frequently
- Cause: Network instability, proxy interference, or platform maintenance such as rolling updates on Genesys Cloud.
- Fix: `TranscriptionStreamer` reconnects with exponential backoff. Ensure your environment allows persistent outbound `wss` connections, and watch the reconnect-delay log messages for patterns.
- Code Fix: Add a heartbeat or ping mechanism if your deployment environment requires one. The `websockets` library already sends keepalive pings automatically, every 20 seconds by default. You can tune this with the `ping_interval` and `ping_timeout` arguments to `websockets.connect`. Firewall rules may still terminate long-lived connections.