Streaming Genesys Cloud LLM Gateway Responses via Flask SSE and WebSockets
What You Will Build
- A Python Flask route that streams Large Language Model tokens to a browser client using Server-Sent Events.
- Uses the Genesys Cloud LLM Gateway API (/api/v2/ai/llm/gateway/chat) and the Conversations WebSockets API (/v2/conversations/websockets).
- Written for Python 3.10+ with Flask, requests, and websockets.
Prerequisites
- OAuth Client Credentials (Confidential) with scopes: ai:llm:gateway, conversation:write, websocket:subscribe
- Genesys Cloud environment name (e.g., acme, which the code in this guide expands to acme.mypurecloud.com)
- Python 3.10+
- pip install flask requests websockets httpx python-dotenv
- Environment variables: GENESYS_ENV, OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET, DEFAULT_CONVERSATION_ID
Authentication Setup
Genesys Cloud APIs require a bearer token obtained via the OAuth 2.0 Client Credentials flow. Tokens expire after 3600 seconds. Production code must cache the token so that each request does not pay for an extra round trip to /oauth/token, and must refresh it shortly before expiration so that no request is sent with a stale token.
import time
import requests
import logging
from typing import Optional

logger = logging.getLogger(__name__)

class GenesysAuth:
    def __init__(self, env: str, client_id: str, client_secret: str):
        self.base_url = f"https://{env}.mypurecloud.com"
        self.client_id = client_id
        self.client_secret = client_secret
        self.token: Optional[str] = None
        self.token_expiry: float = 0.0

    def get_token(self) -> str:
        if self.token and time.time() < self.token_expiry - 60:
            return self.token
        logger.info("Requesting new OAuth token")
        response = requests.post(
            f"{self.base_url}/oauth/token",
            data={
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "scope": "ai:llm:gateway conversation:write websocket:subscribe"
            },
            timeout=15
        )
        response.raise_for_status()
        payload = response.json()
        self.token = payload["access_token"]
        self.token_expiry = time.time() + payload["expires_in"]
        return self.token
The get_token method checks the cache first. If the token is valid with a 60-second safety buffer, it returns immediately. Otherwise, it calls /oauth/token with the required scopes. The ai:llm:gateway scope enables LLM Gateway streaming. The conversation:write scope allows transcript updates. The websocket:subscribe scope permits WebSocket connections.
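The cache-then-refresh check described above can be isolated from the HTTP call, which makes it easier to test. The following is a minimal sketch under stated assumptions, not part of the guide's code: TokenCache, fetch, and clock are hypothetical names, and the lock is an addition so that concurrent request threads trigger only one refresh.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches a bearer token and refreshes it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        buffer_seconds: float = 60.0,
        clock: Callable[[], float] = time.time,
    ):
        # fetch is a hypothetical callable returning (access_token, expires_in_seconds).
        self._fetch = fetch
        self._buffer = buffer_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expiry = 0.0

    def get(self) -> str:
        # The lock serializes refreshes across request threads.
        with self._lock:
            now = self._clock()
            if self._token is not None and now < self._expiry - self._buffer:
                return self._token
            token, expires_in = self._fetch()
            self._token = token
            self._expiry = now + expires_in
            return token
```

With a 3600-second lifetime and the 60-second buffer, a token fetched at time t is reused until t + 3540 and replaced on the first call after that.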
Implementation
Step 1: Flask SSE Route and LLM Gateway Streaming
The LLM Gateway API supports streaming via the stream: true parameter. The response returns newline-delimited JSON (NDJSON) wrapped in data: prefixes. Flask must yield these tokens as Server-Sent Events without blocking the response thread.
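To make the wire format concrete, here is a small sketch of parsing one streamed line in isolation. It assumes the event shape used in this guide (a data: prefix, a [DONE] terminator, and tokens under choices[0].delta.content); parse_stream_line and the DONE sentinel are hypothetical names introduced for illustration.

```python
import json

DONE = object()  # sentinel returned when the stream signals completion


def parse_stream_line(raw: bytes):
    """Return a token string, DONE, or None for lines that carry no token."""
    line = raw.decode("utf-8").strip()
    if not line.startswith("data:"):
        return None  # blank lines and comments carry no payload
    body = line[len("data:"):].strip()
    if body == "[DONE]":
        return DONE
    try:
        event = json.loads(body)
    except json.JSONDecodeError:
        return None  # skip malformed lines instead of aborting the stream
    if not isinstance(event, dict):
        return None
    choices = event.get("choices") or []
    if not choices:
        return None
    return choices[0].get("delta", {}).get("content") or None
```

Keeping the parser as a pure function separates the format handling from the retry and network logic, so it can be exercised with literal byte strings.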
from flask import Flask, Response, stream_with_context, request
import requests
import json
import time

app = Flask(__name__)

def stream_llm_response(auth: GenesysAuth, conversation_id: str, user_prompt: str):
    url = f"{auth.base_url}/api/v2/ai/llm/gateway/chat"
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json",
        "Accept": "application/x-ndjson"
    }
    payload = {
        "messages": [{"role": "user", "content": user_prompt}],
        "stream": True,
        "temperature": 0.7
    }
    max_retries = 3
    sent_any = False  # True once at least one token has reached the client
    for attempt in range(max_retries):
        try:
            with requests.post(url, json=payload, headers=headers, stream=True, timeout=300) as resp:
                if resp.status_code == 429:
                    # Prefer the server's Retry-After value; fall back to exponential backoff.
                    retry_after = int(resp.headers.get("Retry-After", 2 ** attempt))
                    logger.warning(f"Rate limited. Retrying in {retry_after}s")
                    time.sleep(retry_after)
                    continue
                resp.raise_for_status()
                for line in resp.iter_lines():
                    if not line:
                        continue
                    decoded = line.decode("utf-8")
                    if decoded.startswith("data:"):
                        json_str = decoded[5:].strip()
                        if json_str == "[DONE]":
                            break
                        data = json.loads(json_str)
                        if "choices" in data and len(data["choices"]) > 0:
                            delta = data["choices"][0].get("delta", {})
                            token = delta.get("content", "")
                            if token:
                                sent_any = True
                                yield f"data: {json.dumps({'token': token})}\n\n"
                                # Background WebSocket update handled in Step 3
                break
        except requests.exceptions.RequestException as e:
            logger.error(f"LLM Gateway request failed: {e}")
            # Do not retry after partial output: a retry would replay tokens the client already has.
            if sent_any or attempt == max_retries - 1:
                yield f"data: {json.dumps({'error': 'LLM Gateway unavailable'})}\n\n"
                break
            time.sleep(2 ** attempt)
    else:
        # The loop ended without a break, so every attempt was rate limited.
        yield f"data: {json.dumps({'error': 'LLM Gateway rate limit exceeded'})}\n\n"
# Create the auth helper once at import time so the token cache is shared across requests.
auth = GenesysAuth("acme", "your_client_id", "your_client_secret")

@app.route("/stream-llm", methods=["POST"])
def llm_stream_endpoint():
    # silent=True returns None instead of raising on a missing or invalid JSON body.
    data = request.get_json(silent=True) or {}
    conversation_id = data.get("conversationId", "")
    prompt = data.get("prompt", "")
    return Response(
        stream_with_context(stream_llm_response(auth, conversation_id, prompt)),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
The generator yields data: {"token": "..."} followed by a blank line, which is how the SSE format marks the end of each event. The X-Accel-Buffering: no header prevents Nginx from buffering the stream. The 429 retry logic sleeps for the number of seconds given in the Retry-After header when it is present and falls back to exponential backoff otherwise.
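The event framing can be captured in one helper so that every yield site produces well-formed events. This is a minimal sketch; format_sse is a hypothetical name that does not appear in the guide's code, and the optional event field is shown only to illustrate the format.

```python
import json
from typing import Any, Optional


def format_sse(data: Any, event: Optional[str] = None) -> str:
    """Serialize one Server-Sent Event, terminated by a blank line."""
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = []
    if event:
        lines.append(f"event: {event}")
    # A payload containing newlines needs one "data:" field per line.
    for part in payload.split("\n"):
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"
```

For example, format_sse({"token": "Hi"}) produces the same bytes as the f-string used in the generator, while a multi-line string is split across several data: fields instead of corrupting the event boundary.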
Step 2: WebSocket Transcript Updates
Genesys Conversations WebSockets API maintains a persistent connection at /v2/conversations/websockets. Authentication is passed via the access_token query parameter. Transcript updates are sent as JSON payloads matching the conversation message schema.
import asyncio
import json
import websockets
import threading
from typing import Callable, Optional

class TranscriptWebSocket:
    def __init__(self, env: str, token: str, conversation_id: str):
        self.ws_url = f"wss://{env}.mypurecloud.com/v2/conversations/websockets?access_token={token}"
        self.conversation_id = conversation_id
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.thread: Optional[threading.Thread] = None
        self.token_callback: Optional[Callable] = None

    def connect(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self._connect_async())

    async def _connect_async(self):
        try:
            async with websockets.connect(self.ws_url, ping_interval=20) as ws:
                self.ws = ws
                logger.info("WebSocket connected to Genesys Conversations API")
                # Read until the connection closes. Calling loop.run_forever() here
                # would raise, because the loop is already running this coroutine.
                async for _ in ws:
                    pass
        except websockets.exceptions.ConnectionClosed as e:
            logger.error(f"WebSocket connection closed: {e}")
        except Exception as e:
            logger.error(f"WebSocket error: {e}")
        finally:
            self.ws = None

    @staticmethod
    def _log_send_error(future):
        # Runs when the scheduled send finishes; surfaces failures from the loop thread.
        if not future.cancelled() and future.exception() is not None:
            logger.error(f"Failed to send transcript update: {future.exception()}")

    def update_transcript(self, text: str):
        ws = self.ws
        if not ws or not self.loop:
            logger.warning("WebSocket not connected. Skipping transcript update.")
            return
        payload = {
            "conversationId": self.conversation_id,
            "type": "message",
            "text": text,
            "role": "agent"
        }
        # ws.send is a coroutine, so it must be scheduled on the loop that owns the socket.
        future = asyncio.run_coroutine_threadsafe(ws.send(json.dumps(payload)), self.loop)
        future.add_done_callback(self._log_send_error)

    def start_background(self, token_callback: Optional[Callable] = None):
        self.token_callback = token_callback
        self.thread = threading.Thread(target=self.connect, daemon=True)
        self.thread.start()
The TranscriptWebSocket class runs the asynchronous WebSocket library in a background thread to avoid blocking the Flask request thread. The update_transcript method pushes tokens to the Genesys conversation in real time. The payload structure matches the Conversations API message format.
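The thread-plus-event-loop pattern is independent of WebSockets and can be tried without a network connection. The sketch below is illustrative only: BackgroundLoop is a hypothetical helper, not part of the guide's code. It shows asyncio.run_coroutine_threadsafe, the standard-library call for handing a coroutine from a synchronous thread to a loop running in another thread.

```python
import asyncio
import threading


class BackgroundLoop:
    """Runs an asyncio event loop in a daemon thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def submit(self, coro):
        """Schedule a coroutine from any thread; returns a concurrent.futures.Future."""
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def stop(self):
        # loop.stop() must be invoked from the loop's own thread.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join(timeout=5)
        if not self._thread.is_alive():
            self.loop.close()
```

A synchronous caller, such as a Flask generator, can call submit(some_coroutine()) and either ignore the returned future or call .result(timeout=...) on it when it needs to confirm delivery.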
Step 3: Integrating Streaming and Transcript Updates
The final step wires the LLM stream generator to the WebSocket transmitter. Each token yielded to the client is simultaneously pushed to the Genesys conversation.
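from urllib.parse import urlparse

def integrated_stream(auth: GenesysAuth, conversation_id: str, user_prompt: str):
    # Recover the environment name (e.g. "acme") from the auth helper's base URL.
    env = urlparse(auth.base_url).hostname.split(".")[0]
    ws_client = TranscriptWebSocket(env, auth.get_token(), conversation_id)
    ws_client.start_background(auth.get_token)
    try:
        for chunk in stream_llm_response(auth, conversation_id, user_prompt):
            # Decide on the parsed JSON key, not on the raw text, so a token that
            # happens to contain the word "error" is still pushed to the transcript.
            try:
                payload = json.loads(chunk.strip().removeprefix("data:"))
            except json.JSONDecodeError:
                payload = {}
            token = payload.get("token", "")
            if token:
                ws_client.update_transcript(token)
            yield chunk
    finally:
        # Runs when the stream ends or the client disconnects: close the socket
        # so the background thread can exit.
        ws = ws_client.ws
        if ws is not None and ws_client.loop is not None:
            asyncio.run_coroutine_threadsafe(ws.close(), ws_client.loop)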
The integration layer extracts the token from the SSE chunk and forwards it to the WebSocket client. Error chunks pass through directly to the browser. The background thread handles connection lifecycle independently.
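The extraction step can also be written as a standalone helper that classifies a chunk by its JSON key. Matching on the key rather than on the raw text avoids treating an ordinary token that contains the word "error" as an error event. This is a sketch under the assumption that chunks have the data: {...} shape produced in Step 1; parse_sse_chunk is a hypothetical name.

```python
import json
from typing import Optional, Tuple


def parse_sse_chunk(chunk: str) -> Tuple[Optional[str], Optional[str]]:
    """Return (token, error) parsed from one 'data: {...}' SSE chunk."""
    body = chunk.strip().removeprefix("data:").strip()
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        return None, None
    if not isinstance(payload, dict):
        return None, None
    return payload.get("token") or None, payload.get("error") or None
```

A caller would forward the token to the WebSocket client when the first element is set and pass the chunk through to the browser in every case.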
Complete Working Example
import os
import json
import time
import logging
import requests
import websockets
import threading
import asyncio
from flask import Flask, Response, stream_with_context, request
from typing import Optional, Callable
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

class GenesysAuth:
    def __init__(self, env: str, client_id: str, client_secret: str):
        self.base_url = f"https://{env}.mypurecloud.com"
        self.client_id = client_id
        self.client_secret = client_secret
        self.token: Optional[str] = None
        self.token_expiry: float = 0.0

    def get_token(self) -> str:
        if self.token and time.time() < self.token_expiry - 60:
            return self.token
        logger.info("Requesting new OAuth token")
        response = requests.post(
            f"{self.base_url}/oauth/token",
            data={
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "scope": "ai:llm:gateway conversation:write websocket:subscribe"
            },
            timeout=15
        )
        response.raise_for_status()
        payload = response.json()
        self.token = payload["access_token"]
        self.token_expiry = time.time() + payload["expires_in"]
        return self.token

class TranscriptWebSocket:
    def __init__(self, env: str, token: str, conversation_id: str):
        self.ws_url = f"wss://{env}.mypurecloud.com/v2/conversations/websockets?access_token={token}"
        self.conversation_id = conversation_id
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.thread: Optional[threading.Thread] = None

    def connect(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self._connect_async())

    async def _connect_async(self):
        try:
            async with websockets.connect(self.ws_url, ping_interval=20) as ws:
                self.ws = ws
                logger.info("WebSocket connected to Genesys Conversations API")
                # Read until the connection closes. Calling loop.run_forever() here
                # would raise, because the loop is already running this coroutine.
                async for _ in ws:
                    pass
        except websockets.exceptions.ConnectionClosed as e:
            logger.error(f"WebSocket connection closed: {e}")
        except Exception as e:
            logger.error(f"WebSocket error: {e}")
        finally:
            self.ws = None

    @staticmethod
    def _log_send_error(future):
        # Runs when the scheduled send finishes; surfaces failures from the loop thread.
        if not future.cancelled() and future.exception() is not None:
            logger.error(f"Failed to send transcript update: {future.exception()}")

    def update_transcript(self, text: str):
        ws = self.ws
        if not ws or not self.loop:
            logger.warning("WebSocket not connected. Skipping transcript update.")
            return
        payload = {
            "conversationId": self.conversation_id,
            "type": "message",
            "text": text,
            "role": "agent"
        }
        # ws.send is a coroutine, so it must be scheduled on the loop that owns the socket.
        future = asyncio.run_coroutine_threadsafe(ws.send(json.dumps(payload)), self.loop)
        future.add_done_callback(self._log_send_error)

    def start_background(self):
        self.thread = threading.Thread(target=self.connect, daemon=True)
        self.thread.start()

app = Flask(__name__)

def stream_llm_response(auth: GenesysAuth, conversation_id: str, user_prompt: str):
    url = f"{auth.base_url}/api/v2/ai/llm/gateway/chat"
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json",
        "Accept": "application/x-ndjson"
    }
    payload = {
        "messages": [{"role": "user", "content": user_prompt}],
        "stream": True,
        "temperature": 0.7
    }
    max_retries = 3
    sent_any = False  # True once at least one token has reached the client
    for attempt in range(max_retries):
        try:
            with requests.post(url, json=payload, headers=headers, stream=True, timeout=300) as resp:
                if resp.status_code == 429:
                    # Prefer the server's Retry-After value; fall back to exponential backoff.
                    retry_after = int(resp.headers.get("Retry-After", 2 ** attempt))
                    logger.warning(f"Rate limited. Retrying in {retry_after}s")
                    time.sleep(retry_after)
                    continue
                resp.raise_for_status()
                for line in resp.iter_lines():
                    if not line:
                        continue
                    decoded = line.decode("utf-8")
                    if decoded.startswith("data:"):
                        json_str = decoded[5:].strip()
                        if json_str == "[DONE]":
                            break
                        data = json.loads(json_str)
                        if "choices" in data and len(data["choices"]) > 0:
                            delta = data["choices"][0].get("delta", {})
                            token = delta.get("content", "")
                            if token:
                                sent_any = True
                                yield f"data: {json.dumps({'token': token})}\n\n"
                break
        except requests.exceptions.RequestException as e:
            logger.error(f"LLM Gateway request failed: {e}")
            # Do not retry after partial output: a retry would replay tokens the client already has.
            if sent_any or attempt == max_retries - 1:
                yield f"data: {json.dumps({'error': 'LLM Gateway unavailable'})}\n\n"
                break
            time.sleep(2 ** attempt)
    else:
        # The loop ended without a break, so every attempt was rate limited.
        yield f"data: {json.dumps({'error': 'LLM Gateway rate limit exceeded'})}\n\n"

env = os.getenv("GENESYS_ENV", "acme")
client_id = os.getenv("OAUTH_CLIENT_ID", "")
client_secret = os.getenv("OAUTH_CLIENT_SECRET", "")
# Created once at import time so the cached token is shared across requests.
auth = GenesysAuth(env, client_id, client_secret)

@app.route("/stream-llm", methods=["POST"])
def llm_stream_endpoint():
    # silent=True returns None instead of raising on a missing or invalid JSON body.
    data = request.get_json(silent=True) or {}
    conversation_id = data.get("conversationId", "")
    prompt = data.get("prompt", "")
    ws_client = TranscriptWebSocket(env, auth.get_token(), conversation_id)
    ws_client.start_background()

    def generator():
        try:
            for chunk in stream_llm_response(auth, conversation_id, prompt):
                # Decide on the parsed JSON key, not on the raw text, so a token that
                # happens to contain the word "error" is still pushed to the transcript.
                try:
                    payload = json.loads(chunk.strip().removeprefix("data:"))
                except json.JSONDecodeError:
                    payload = {}
                token = payload.get("token", "")
                if token:
                    ws_client.update_transcript(token)
                yield chunk
        except Exception as e:
            logger.error(f"Stream generator error: {e}")
            yield f"data: {json.dumps({'error': str(e)})}\n\n"
        finally:
            # Close the socket when the stream ends or the client disconnects.
            ws = ws_client.ws
            if ws is not None and ws_client.loop is not None:
                asyncio.run_coroutine_threadsafe(ws.close(), ws_client.loop)

    return Response(
        stream_with_context(generator()),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=False)
Common Errors & Debugging
Error: 401 Unauthorized on LLM Gateway
- Cause: OAuth token expired or missing ai:llm:gateway scope.
- Fix: Verify the client credentials have the required scope in the Genesys Admin console. Ensure the token cache refreshes before the 3600-second limit. The provided GenesysAuth class handles automatic refresh.
- Code: Check response.json()["error_description"] for scope mismatch details.
Error: 429 Too Many Requests
- Cause: Exceeded Genesys Cloud rate limits for LLM Gateway or WebSocket connections.
- Fix: Implement exponential backoff. The stream_llm_response function checks Retry-After headers and sleeps accordingly. WebSocket connections should maintain a single persistent link per conversation rather than opening new connections per token.
- Code: The retry loop in Step 1 handles 429 responses with configurable max_retries.
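The delay calculation can be factored out so that it tolerates a Retry-After value that is not a plain integer. The helper below is a sketch under stated assumptions: retry_delay and the 30-second cap are hypothetical choices, not values documented by Genesys.

```python
from typing import Mapping


def retry_delay(headers: Mapping[str, str], attempt: int, cap: float = 30.0) -> float:
    """Seconds to wait before the next attempt (attempt is zero-based)."""
    value = headers.get("Retry-After")
    if value is not None:
        try:
            # Retry-After is usually a number of seconds.
            return min(max(float(value), 0.0), cap)
        except ValueError:
            pass  # e.g. an HTTP-date; fall through to exponential backoff
    return min(float(2 ** attempt), cap)
```

Capping the delay keeps a request thread from sleeping for an unbounded time while a browser is waiting on the stream.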
Error: WebSocket Connection Refused or Closed Immediately
- Cause: Invalid access_token query parameter or missing websocket:subscribe scope.
- Fix: Pass the bearer token directly in the WebSocket URL as ?access_token=.... Do not use the Authorization header for WebSocket handshakes. Verify the scope is attached to the OAuth client.
- Code: The TranscriptWebSocket class constructs the URL correctly. Log the URL before connecting to verify formatting, with the token value redacted so the credential does not end up in log files.
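When logging the connection URL, it is safer to mask the token first. The following sketch uses only urllib.parse from the standard library; build_ws_url and redact_token are hypothetical helper names, and the path is the one used throughout this guide.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def build_ws_url(host: str, token: str) -> str:
    """Build the WebSocket URL with the token percent-encoded as a query value."""
    query = urlencode({"access_token": token})
    return f"wss://{host}/v2/conversations/websockets?{query}"


def redact_token(url: str) -> str:
    """Return the URL with the access_token value masked, for logging."""
    parts = urlsplit(url)
    pairs = [
        (key, "***" if key == "access_token" else value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
    ]
    # safe="*" keeps the mask readable instead of encoding it as %2A.
    return urlunsplit(parts._replace(query=urlencode(pairs, safe="*")))
```

Encoding the token with urlencode also protects against tokens that contain characters such as + or /, which would otherwise be misread by the server's query parser.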
Error: SSE Stream Buffers in Browser or Nginx
- Cause: Reverse proxies buffer streaming responses by default.
- Fix: Add X-Accel-Buffering: no to the Flask response headers. In Nginx, configure proxy_buffering off; for the specific location block.
- Code: The llm_stream_endpoint function includes the required header.