Implementing a Voice Command Listener for Genesys Cloud Interactions
What You Will Build
- A Python service that connects to a live Genesys Cloud voice conversation via the Real-Time WebSocket API, extracts PCM audio frames, filters non-speech segments using Voice Activity Detection, and triggers programmatic actions when a specific keyword phrase is detected.
- This implementation uses the Genesys Cloud Real-Time Conversations WebSocket endpoint and the purecloudplatformclientv2 authentication SDK.
- The tutorial covers Python 3.9+ with asynchronous WebSocket handling, webrtcvad for speech detection, and vosk for offline keyword spotting.
Prerequisites
- OAuth Client: JWT or Client Credentials flow configured in the Genesys Cloud admin console.
- Required Scopes: conversation:view (grants real-time audio stream access). Some tenants require interaction:read instead.
- Runtime: Python 3.9 or higher.
- Dependencies: purecloudplatformclientv2, websockets, webrtcvad, vosk, numpy.
- Model File: Download the Vosk small English model (vosk-model-small-en-us-0.15.zip) and extract it to a local directory.
- Environment Variables: GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, GENESYS_REGION (optional, defaults to mypurecloud.com), VOSK_MODEL_PATH.
Authentication Setup
The Real-Time WebSocket API requires a valid OAuth 2.0 bearer token. The token must be passed in the initial WebSocket connect message. The following code uses the official Genesys Cloud Python SDK to retrieve a token using the Client Credentials flow. Token caching is recommended in production to avoid unnecessary authentication requests and to stay within rate limits.
import os
import logging

import PureCloudPlatformClientV2

logger = logging.getLogger(__name__)


def get_access_token() -> str:
    """
    Retrieves a valid OAuth 2.0 bearer token using Client Credentials.
    Requires GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET in environment.
    """
    # Point the SDK at the tenant's region before authenticating.
    region = os.environ.get("GENESYS_REGION", "mypurecloud.com")
    PureCloudPlatformClientV2.configuration.host = f"https://api.{region}"
    try:
        # get_client_credentials_token returns the ApiClient with access_token set.
        api_client = PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
            os.environ["GENESYS_CLIENT_ID"],
            os.environ["GENESYS_CLIENT_SECRET"],
        )
        logger.info("Successfully authenticated with Genesys Cloud.")
        return api_client.access_token
    except Exception as e:
        logger.error(f"Authentication failed: {e}")
        raise
The SDK handles the HTTP POST to the OAuth token endpoint internally (https://login.mypurecloud.com/oauth/token for the default region). The token response contains access_token, expires_in, and token_type. Store the token and its expiration timestamp in memory or Redis. Refresh the token shortly before it expires rather than waiting for the remaining lifetime to reach zero. A 401 Unauthorized response from the WebSocket server indicates an expired or invalid token.
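The caching advice above can be sketched as a small in-memory wrapper. This is a minimal illustration, not part of the Genesys SDK: the TokenCache class, its fetch callback (any function returning a token and its lifetime in seconds), and the injectable clock are all hypothetical names introduced here. The 300-second refresh margin mirrors the threshold suggested in the debugging section.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches a bearer token and refreshes it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        refresh_margin: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        # fetch() is a caller-supplied function (hypothetical here) that
        # returns (access_token, expires_in_seconds).
        self._fetch = fetch
        self._margin = refresh_margin
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        # Refresh when nothing is cached or the token is inside the margin.
        if self._token is None or self._clock() >= self._expires_at - self._margin:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = self._clock() + expires_in
        return self._token
```

A caller would construct one TokenCache at startup and call get() before each connect message, so repeated reconnects reuse the same token until it nears expiry.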
Implementation
Step 1: WebSocket Connection and Subscription
The Genesys Cloud Real-Time API uses a single WebSocket endpoint for all media types. You establish a connection, send a connect payload with your token, and then send a subscribe payload specifying the conversation ID and audio format. The server responds with JSON messages over the same socket.
import asyncio
import json
import os

import websockets


async def establish_realtime_connection(token: str, conversation_id: str) -> websockets.WebSocketClientProtocol:
    """
    Connects to the Genesys Cloud Real-Time API and subscribes to voice audio.
    Returns the connected WebSocket object. The caller must close it.
    """
    base_domain = os.environ.get("GENESYS_REGION", "mypurecloud.com")
    uri = f"wss://api.{base_domain}/api/v2/interactions/realtime"
    # Open the socket without "async with": a context manager would close
    # the connection as soon as this function returned.
    ws = await websockets.connect(uri)
    try:
        # Send connect message
        connect_payload = {
            "type": "connect",
            "apiVersion": "v2",
            "token": token
        }
        await ws.send(json.dumps(connect_payload))
        # Parse connection response
        response_json = json.loads(await ws.recv())
        if response_json.get("type") != "connected":
            raise RuntimeError(f"WebSocket connection failed: {response_json}")
        # Subscribe to voice audio stream
        subscribe_payload = {
            "type": "subscribe",
            "id": "voice-stream-1",
            "request": {
                "type": "conversation",
                "conversationId": conversation_id,
                "mediaType": "voice",
                "audio": {
                    "format": "pcm",
                    "sampleRate": 8000
                }
            }
        }
        await ws.send(json.dumps(subscribe_payload))
    except Exception:
        # Do not leak the socket if the handshake or subscription fails.
        await ws.close()
        raise
    logger.info(f"Subscribed to audio stream for conversation {conversation_id}")
    return ws
The subscribe request explicitly requests pcm format at 8000 Hz. Genesys Cloud delivers audio in base64-encoded chunks. Each chunk typically contains 20 milliseconds of audio. You must decode and process these chunks in real time.
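As a rough illustration of the decoding step, the sketch below turns one incoming message into raw PCM bytes and reports its duration. It assumes the message shape used in the complete example later in this tutorial (a JSON object with "type": "audio" and a base64 string under "audio"). The helper names decode_audio_message and duration_ms are hypothetical.

```python
import base64
import binascii
import json
from typing import Optional

BYTES_PER_SAMPLE = 2  # 16-bit mono PCM


def decode_audio_message(message: str) -> Optional[bytes]:
    """Returns the PCM payload of an audio message, or None for anything else."""
    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        return None
    if not isinstance(payload, dict) or payload.get("type") != "audio":
        return None
    encoded = payload.get("audio")
    if not isinstance(encoded, str) or not encoded:
        return None
    try:
        # validate=True rejects characters outside the base64 alphabet.
        return base64.b64decode(encoded, validate=True)
    except (binascii.Error, ValueError):
        return None


def duration_ms(pcm: bytes, sample_rate: int = 8000) -> float:
    """Duration of a 16-bit mono PCM buffer in milliseconds."""
    return len(pcm) / (sample_rate * BYTES_PER_SAMPLE) * 1000.0
```

Logging duration_ms for the first few chunks is a quick way to confirm that the stream really arrives in 20 millisecond pieces before tuning the rest of the pipeline.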
Step 2: Audio Frame Extraction and Voice Activity Detection
Raw audio streams contain long periods of silence. Processing every byte through a speech recognition engine wastes CPU cycles and increases latency. Voice Activity Detection (VAD) filters out non-speech segments. The webrtcvad library operates on fixed-size chunks. At 8000 Hz with 16-bit mono PCM, a 20 millisecond chunk equals 320 bytes.
import base64
import webrtcvad


class AudioProcessor:
    def __init__(self):
        self.vad = webrtcvad.Vad(mode=3)  # Mode 3: aggressive filtering
        self.buffer = bytearray()
        self.chunk_size = 320  # 20ms @ 8kHz, 16-bit

    def ingest_frame(self, base64_audio: str) -> bool:
        """
        Decodes a base64 audio frame, runs VAD, and returns True if speech is detected.
        """
        raw_bytes = base64.b64decode(base64_audio)
        self.buffer.extend(raw_bytes)
        speech_detected = False
        # Process complete 20ms chunks
        while len(self.buffer) >= self.chunk_size:
            chunk = bytes(self.buffer[:self.chunk_size])
            self.buffer = self.buffer[self.chunk_size:]
            try:
                if self.vad.is_speech(chunk, 8000):
                    speech_detected = True
            except webrtcvad.Error:
                # VAD may reject malformed chunks; skip and continue
                continue
        return speech_detected
The ingest_frame method decodes each base64 payload, appends the bytes to a buffer, slices the buffer into valid 320-byte chunks, and runs VAD on each one. Leftover bytes stay in the buffer until the next frame arrives. If any chunk within the frame contains speech, the method returns True. You will use this flag to route audio to the keyword spotting engine only when a human is speaking.
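Per-frame VAD flags tend to flicker, so a single silent 20 millisecond frame in the middle of a word should not end an utterance. One common remedy is a hangover: the utterance closes only after several consecutive silent frames. The sketch below shows the idea on a plain list of booleans. The function name segment_utterances and the default of 10 frames (200 ms at 20 ms per frame) are illustrative assumptions, not values taken from webrtcvad or Genesys Cloud.

```python
from typing import Iterable, List, Tuple


def segment_utterances(
    flags: Iterable[bool], hangover_frames: int = 10
) -> List[Tuple[int, int]]:
    """
    Groups per-frame speech flags into (start, end) frame ranges, end exclusive.
    An utterance ends only after `hangover_frames` consecutive silent frames.
    """
    segments: List[Tuple[int, int]] = []
    start = None
    silent_run = 0
    index = -1
    for index, is_speech in enumerate(flags):
        if is_speech:
            if start is None:
                start = index  # first speech frame opens an utterance
            silent_run = 0
        elif start is not None:
            silent_run += 1
            if silent_run >= hangover_frames:
                # Close the utterance at the last speech frame.
                segments.append((start, index - silent_run + 1))
                start = None
                silent_run = 0
    if start is not None:
        # Stream ended while an utterance was still open.
        segments.append((start, index - silent_run + 1))
    return segments
```

In a live pipeline the same counter logic would run incrementally inside the frame loop. The batch form here is only easier to read and test.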
Step 3: Keyword Spotting and Command Routing
Keyword spotting requires a streaming recognizer that accepts audio incrementally and returns results when silence is detected. vosk provides a KaldiRecognizer that handles this pattern efficiently. You feed speech chunks to AcceptWaveform, which returns a truthy value once the recognizer decides the utterance has ended. At that point you call Result to retrieve the transcription. If the transcription matches your target phrase, you trigger the command handler.
import vosk
import json
from typing import Optional


class KeywordSpotter:
    def __init__(self, model_path: str, target_phrase: str):
        self.model = vosk.Model(model_path)
        self.recognizer = vosk.KaldiRecognizer(self.model, 8000)
        self.target_phrase = target_phrase.lower()

    def accept_audio(self, chunk: bytes) -> Optional[str]:
        """
        Passes audio chunk to the recognizer. Returns the transcription JSON
        once the recognizer detects the end of an utterance, otherwise None.
        """
        # AcceptWaveform returns a truthy value when an utterance is complete.
        if self.recognizer.AcceptWaveform(chunk):
            return self.recognizer.Result()
        return None

    def check_match(self, transcription_json: str) -> bool:
        """
        Parses transcription JSON and checks for the target phrase.
        """
        try:
            data = json.loads(transcription_json)
            text = data.get("text", "").lower()
            return self.target_phrase in text
        except json.JSONDecodeError:
            return False
The recognizer maintains internal state across chunks. When AcceptWaveform returns a truthy value, the engine has detected a pause long enough to consider the utterance complete, and Result returns the final transcription as a JSON string. You parse the JSON, extract the text field, and perform a case-insensitive substring match. Replace substring matching with intent classification or exact phrase matching based on your business logic.
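Substring matching can fire on unrelated words: "transfer me" is a substring of "transfer menu". A whole-word phrase match avoids that. The following is a minimal sketch of the exact phrase option. The helper names tokenize and contains_phrase are hypothetical, and the input is assumed to be a Vosk-style JSON string with a "text" field.

```python
import json
import re
from typing import List


def tokenize(text: str) -> List[str]:
    """Lowercases text and splits it into word tokens."""
    return re.findall(r"[a-z0-9']+", text.lower())


def contains_phrase(transcription_json: str, phrase: str) -> bool:
    """True if the phrase appears as a run of whole words in the transcription."""
    try:
        data = json.loads(transcription_json)
    except json.JSONDecodeError:
        return False
    if not isinstance(data, dict):
        return False
    words = tokenize(str(data.get("text", "")))
    target = tokenize(phrase)
    if not target:
        return False
    n = len(target)
    # Slide a window of len(target) words across the transcription.
    return any(words[i:i + n] == target for i in range(len(words) - n + 1))
```

With this helper, '{"text": "please transfer me now"}' matches the phrase "transfer me", while '{"text": "open the transfer menu"}' does not.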
Complete Working Example
The following script integrates authentication, WebSocket subscription, VAD filtering, and keyword spotting into a single async service. It includes connection retry logic for 429 rate limits and transient network failures.
import asyncio
import os
import logging
import json
import base64

import websockets
import webrtcvad
import vosk
import PureCloudPlatformClientV2

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


class GenesysVoiceCommandListener:
    def __init__(self, conversation_id: str, target_keyword: str):
        self.conversation_id = conversation_id
        # Transcriptions are lowercased before matching, so normalize the keyword too.
        self.target_keyword = target_keyword.lower()
        self.model_path = os.environ["VOSK_MODEL_PATH"]
        # Initialize components
        self.vad = webrtcvad.Vad(mode=3)
        self.model = vosk.Model(self.model_path)
        self.recognizer = vosk.KaldiRecognizer(self.model, 8000)
        self.buffer = bytearray()
        self.chunk_size = 320  # 20ms @ 8kHz 16-bit
        self.in_speech = False  # True while an utterance is still open

    def get_token(self) -> str:
        region = os.environ.get("GENESYS_REGION", "mypurecloud.com")
        PureCloudPlatformClientV2.configuration.host = f"https://api.{region}"
        api_client = PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
            os.environ["GENESYS_CLIENT_ID"],
            os.environ["GENESYS_CLIENT_SECRET"],
        )
        return api_client.access_token

    async def connect_with_retry(self) -> websockets.WebSocketClientProtocol:
        max_retries = 5
        base_delay = 2
        for attempt in range(1, max_retries + 1):
            ws = None
            try:
                token = self.get_token()
                base_domain = os.environ.get("GENESYS_REGION", "mypurecloud.com")
                uri = f"wss://api.{base_domain}/api/v2/interactions/realtime"
                # No "async with" here: the socket must stay open after returning.
                ws = await websockets.connect(uri)
                connect_msg = {"type": "connect", "apiVersion": "v2", "token": token}
                await ws.send(json.dumps(connect_msg))
                resp = json.loads(await ws.recv())
                if resp.get("type") != "connected":
                    raise RuntimeError(f"Connection rejected: {resp}")
                subscribe_msg = {
                    "type": "subscribe",
                    "id": "voice-sub-1",
                    "request": {
                        "type": "conversation",
                        "conversationId": self.conversation_id,
                        "mediaType": "voice",
                        "audio": {"format": "pcm", "sampleRate": 8000}
                    }
                }
                await ws.send(json.dumps(subscribe_msg))
                logger.info("Successfully subscribed to conversation audio.")
                return ws
            except websockets.exceptions.InvalidStatusCode as e:
                # Newer websockets releases that default to the new asyncio client
                # raise InvalidStatus instead, with the code at e.response.status_code.
                if e.status_code == 429 and attempt < max_retries:
                    delay = base_delay * (2 ** (attempt - 1))
                    logger.warning(f"Rate limited (429). Retrying in {delay}s (attempt {attempt})")
                    await asyncio.sleep(delay)
                else:
                    logger.error(f"WebSocket status error: {e}")
                    raise
            except Exception as e:
                if ws is not None:
                    await ws.close()  # do not leak a half-initialized socket
                logger.error(f"Connection attempt {attempt} failed: {e}")
                if attempt == max_retries:
                    raise
                await asyncio.sleep(base_delay)

    def process_audio(self, raw_chunk: bytes) -> None:
        self.buffer.extend(raw_chunk)
        while len(self.buffer) >= self.chunk_size:
            chunk = bytes(self.buffer[:self.chunk_size])
            del self.buffer[:self.chunk_size]
            try:
                is_speech = self.vad.is_speech(chunk, 8000)
            except webrtcvad.Error:
                continue
            if is_speech:
                self.in_speech = True
                # A truthy return means the recognizer closed an utterance itself.
                if self.recognizer.AcceptWaveform(chunk):
                    self.handle_result(self.recognizer.Result())
            elif self.in_speech:
                # First silent chunk after speech: flush the utterance once.
                self.in_speech = False
                self.handle_result(self.recognizer.Result())

    def handle_result(self, result_json: str) -> None:
        try:
            data = json.loads(result_json)
            text = data.get("text", "").lower()
            if text and self.target_keyword in text:
                logger.info(f"Keyword detected: '{text}'. Executing command.")
                self.execute_command(text)
        except json.JSONDecodeError:
            pass

    def execute_command(self, transcribed_text: str) -> None:
        # Replace with actual API calls, database updates, or workflow triggers
        logger.info(f"COMMAND TRIGGERED: {transcribed_text}")
        # Example: POST to internal webhook, update CRM record, transfer call

    async def run(self) -> None:
        ws = await self.connect_with_retry()
        try:
            async for message in ws:
                payload = json.loads(message)
                if payload.get("type") == "audio":
                    audio_b64 = payload.get("audio", "")
                    if audio_b64:
                        raw = base64.b64decode(audio_b64)
                        self.process_audio(raw)
        except websockets.exceptions.ConnectionClosed as e:
            logger.warning(f"Connection closed: {e.code} {e.reason}")
        except Exception as e:
            logger.error(f"Stream processing error: {e}")
        finally:
            await ws.close()


if __name__ == "__main__":
    CONV_ID = os.environ["GENESYS_CONVERSATION_ID"]
    KEYWORD = os.environ.get("TARGET_KEYWORD", "transfer me")
    listener = GenesysVoiceCommandListener(CONV_ID, KEYWORD)
    asyncio.run(listener.run())
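The retry loop in the example doubles its delay on each 429 response. When many listeners reconnect at once, it is common to cap the delay and add random jitter so they do not retry in lockstep. The sketch below computes such a schedule. The function name backoff_delays, the 30-second cap, and the jitter fraction are illustrative assumptions rather than values documented by Genesys Cloud.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int = 5,
    base_delay: float = 2.0,
    cap: float = 30.0,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """
    Returns the delay before each retry: base_delay * 2^(attempt-1), capped,
    plus up to `jitter` (a fraction of the delay) of random extra wait.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(1, max_retries + 1):
        delay = min(cap, base_delay * (2 ** (attempt - 1)))
        delay += rng.uniform(0.0, jitter * delay)
        delays.append(delay)
    return delays
```

With the defaults and no jitter the schedule is 2, 4, 8, 16, and 30 seconds. The last value is capped from 32.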
Common Errors and Debugging
Error: 401 Unauthorized on WebSocket Connect
- Cause: The bearer token is expired, malformed, or lacks the conversation:view scope.
- Fix: Verify the OAuth client credentials in the Genesys admin console. Ensure the token was generated within the last hour. Re-authenticate and resend the connect message.
- Code Fix: Wrap token retrieval in a try-except block and refresh automatically when expires_in falls below 300 seconds.
Error: 403 Forbidden on Subscribe
- Cause: The authenticated user does not have visibility into the specified conversation ID, or the conversation has ended.
- Fix: Validate the conversation ID against the /api/v2/conversations/voice/{id} endpoint. Ensure the OAuth client belongs to a user with queue/routing permissions that grant conversation visibility.
- Code Fix: Add a pre-flight HTTP GET to verify conversation status before subscribing.
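A pre-flight check along these lines could be sketched with the standard library. The endpoint path is the one named in the fix above. The helper names, the returned labels, and the mapping from status codes to actions are assumptions made for illustration, so adjust them to whatever your tenant actually returns.

```python
import os
import urllib.request
from typing import Optional


def build_preflight_request(
    token: str, conversation_id: str, region: Optional[str] = None
) -> urllib.request.Request:
    """Builds (but does not send) the GET request for the conversation lookup."""
    domain = region or os.environ.get("GENESYS_REGION", "mypurecloud.com")
    url = f"https://api.{domain}/api/v2/conversations/voice/{conversation_id}"
    return urllib.request.Request(
        url, headers={"Authorization": f"Bearer {token}"}, method="GET"
    )


def interpret_preflight_status(status: int) -> str:
    """Maps an HTTP status code to a suggested next action (labels are hypothetical)."""
    if status == 200:
        return "ok"
    if status == 401:
        return "refresh_token"
    if status == 403:
        return "no_visibility"
    if status == 404:
        return "not_found"
    if status == 429:
        return "retry_later"
    return "unexpected"
```

To send the request, pass it to urllib.request.urlopen inside a try-except for urllib.error.HTTPError and feed the resulting status code to interpret_preflight_status before opening the WebSocket.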
Error: webrtcvad.Error: Invalid chunk size
- Cause: The audio chunk passed to is_speech does not match the expected byte length for the sample rate. Genesys Cloud may occasionally send fragmented frames.
- Fix: Always buffer incoming bytes and slice strictly by sample_rate * 2 * 0.02 bytes (320 bytes at 8000 Hz). Keep partial bytes at the buffer tail until the next frame arrives.
- Code Fix: The process_audio method in the complete example handles this by accumulating bytes and only processing complete 320-byte chunks.
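The chunk-size arithmetic and the buffering rule can be checked in isolation. The sketch below computes the frame size for the frame durations webrtcvad accepts (10, 20, or 30 ms) and re-slices arbitrary input into exact frames. The names frame_size_bytes and FrameBuffer are hypothetical helpers, not part of webrtcvad.

```python
from typing import List

VALID_FRAME_MS = (10, 20, 30)  # frame durations accepted by webrtcvad


def frame_size_bytes(sample_rate: int, frame_ms: int = 20, bytes_per_sample: int = 2) -> int:
    """Bytes in one mono PCM frame, e.g. 8000 Hz * 0.02 s * 2 bytes = 320."""
    if frame_ms not in VALID_FRAME_MS:
        raise ValueError(f"frame_ms must be one of {VALID_FRAME_MS}")
    return sample_rate * frame_ms // 1000 * bytes_per_sample


class FrameBuffer:
    """Accumulates bytes and releases only complete fixed-size frames."""

    def __init__(self, frame_bytes: int):
        self._frame_bytes = frame_bytes
        self._buffer = bytearray()

    def push(self, data: bytes) -> List[bytes]:
        self._buffer.extend(data)
        frames = []
        while len(self._buffer) >= self._frame_bytes:
            frames.append(bytes(self._buffer[:self._frame_bytes]))
            del self._buffer[:self._frame_bytes]
        return frames  # any partial tail stays buffered for the next call

    @property
    def pending(self) -> int:
        return len(self._buffer)
```

Every frame returned by push has exactly frame_size_bytes(8000) bytes, so passing those frames to is_speech should not trigger the chunk-size error even when the network delivers fragments.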
Error: Vosk KaldiRecognizer throws memory or model errors
- Cause: The model path is incorrect, or the model file is corrupted. Vosk loads acoustic models into RAM.
- Fix: Verify VOSK_MODEL_PATH points to the extracted directory containing the am, conf, graph, and ivector folders. Run vosk.Model(path) in isolation to confirm loading.
- Code Fix: Add a model validation step during initialization that raises a clear RuntimeError if the directory lacks graph or ivector subdirectories.
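The validation step might look like the sketch below, which checks the directory layout before handing the path to vosk.Model. The function name validate_vosk_model_dir is hypothetical, and the required subdirectory names follow the fix above. Other Vosk models may use a different layout, so treat the list as a parameter rather than a fixed rule.

```python
import os
from typing import Iterable


def validate_vosk_model_dir(
    path: str, required: Iterable[str] = ("graph", "ivector")
) -> str:
    """Raises RuntimeError if the model directory is missing or incomplete."""
    if not os.path.isdir(path):
        raise RuntimeError(f"Vosk model path is not a directory: {path}")
    missing = [
        name for name in required
        if not os.path.isdir(os.path.join(path, name))
    ]
    if missing:
        raise RuntimeError(
            f"Vosk model directory {path} is missing subdirectories: {', '.join(missing)}"
        )
    return path
```

Calling validate_vosk_model_dir(os.environ["VOSK_MODEL_PATH"]) at the top of the listener's constructor turns an opaque model-loading failure into a message that names the missing folder.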