Streaming Genesys Cloud LLM Gateway Responses via Flask SSE and WebSockets

What You Will Build

  • A Python Flask route that streams Large Language Model tokens to a browser client using Server-Sent Events.
  • The route calls the Genesys Cloud LLM Gateway API (/api/v2/ai/llm/gateway/chat) and mirrors each token to the Conversations WebSockets API (/v2/conversations/websockets).
  • The code targets Python 3.10+ with Flask, requests, and websockets.

Prerequisites

  • OAuth Client Credentials (Confidential) with scopes: ai:llm:gateway, conversation:write, websocket:subscribe
  • Genesys Cloud environment name, used as the host prefix in every URL (e.g., acme for acme.mypurecloud.com)
  • Python 3.10+
  • pip install flask requests websockets httpx python-dotenv
  • Environment variables: GENESYS_ENV, OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET, DEFAULT_CONVERSATION_ID

Authentication Setup

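Genesys Cloud APIs require a bearer token obtained via the OAuth 2.0 Client Credentials flow. Tokens expire after 3600 seconds; the exact lifetime is returned in the expires_in field of the token response. Production code should cache the token and refresh it shortly before expiration, so that request threads do not pay for a round trip to /oauth/token on every call.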

import time
import requests
import logging
from typing import Optional

logger = logging.getLogger(__name__)

class GenesysAuth:
    def __init__(self, env: str, client_id: str, client_secret: str):
        self.base_url = f"https://{env}.mypurecloud.com"
        self.client_id = client_id
        self.client_secret = client_secret
        self.token: Optional[str] = None
        self.token_expiry: float = 0.0

    def get_token(self) -> str:
        if self.token and time.time() < self.token_expiry - 60:
            return self.token

        logger.info("Requesting new OAuth token")
        response = requests.post(
            f"{self.base_url}/oauth/token",
            data={
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "scope": "ai:llm:gateway conversation:write websocket:subscribe"
            },
            timeout=15
        )
        response.raise_for_status()
        payload = response.json()
        self.token = payload["access_token"]
        self.token_expiry = time.time() + payload["expires_in"]
        return self.token

The get_token method checks the cache first. If the token is valid with a 60-second safety buffer, it returns immediately. Otherwise, it calls /oauth/token with the required scopes. The ai:llm:gateway scope enables LLM Gateway streaming. The conversation:write scope allows transcript updates. The websocket:subscribe scope permits WebSocket connections.
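
Flask can serve requests from several threads at once, so a shared token cache benefits from a lock that keeps two threads from refreshing at the same moment. The following is a minimal sketch of that idea, not part of any Genesys SDK. TokenCache, fetch, and now are hypothetical names introduced for illustration; fetch stands in for the POST to /oauth/token so the caching logic can be exercised without network access.

```python
import threading
import time
from typing import Callable, Optional, Tuple

# Hypothetical callback type: returns (access_token, expires_in_seconds).
# In this tutorial it would wrap the POST to /oauth/token.
FetchFn = Callable[[], Tuple[str, float]]


class TokenCache:
    """Caches one token and refreshes it shortly before it expires."""

    def __init__(self, fetch: FetchFn, buffer_seconds: float = 60.0,
                 now: Callable[[], float] = time.time):
        self._fetch = fetch
        self._buffer = buffer_seconds
        self._now = now  # injectable clock, which makes the cache testable
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expiry = 0.0

    def get(self) -> str:
        # Holding the lock ensures only one thread performs the refresh.
        with self._lock:
            if self._token is None or self._now() >= self._expiry - self._buffer:
                token, expires_in = self._fetch()
                self._token = token
                self._expiry = self._now() + expires_in
            return self._token
```

A single TokenCache instance created at module level would be shared by all request threads, which is what lets the cache pay off.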

Implementation

Step 1: Flask SSE Route and LLM Gateway Streaming

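The LLM Gateway API supports streaming via the stream: true parameter. The response arrives as newline-delimited JSON (NDJSON) in which each line carries one JSON object behind a data: prefix, and the stream ends with a data: [DONE] sentinel. The Flask route re-emits each token as a Server-Sent Event as soon as it arrives instead of buffering the full completion.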

from flask import Flask, Response, stream_with_context, request
import requests
import json
import time

app = Flask(__name__)

def stream_llm_response(auth: GenesysAuth, conversation_id: str, user_prompt: str):
    url = f"{auth.base_url}/api/v2/ai/llm/gateway/chat"
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json",
        "Accept": "application/x-ndjson"
    }
    payload = {
        "messages": [{"role": "user", "content": user_prompt}],
        "stream": True,
        "temperature": 0.7
    }

    max_retries = 3
    for attempt in range(max_retries):
        try:
            with requests.post(url, json=payload, headers=headers, stream=True, timeout=300) as resp:
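                if resp.status_code == 429:
                    if attempt == max_retries - 1:
                        # Out of retries: report the failure instead of ending silently.
                        yield f"data: {json.dumps({'error': 'LLM Gateway rate limited'})}\n\n"
                        break
                    retry_after = int(resp.headers.get("Retry-After", 2 ** attempt))
                    logger.warning(f"Rate limited. Retrying in {retry_after}s")
                    time.sleep(retry_after)
                    continue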
                
                resp.raise_for_status()
                
                for line in resp.iter_lines():
                    if not line:
                        continue
                    decoded = line.decode("utf-8")
                    if decoded.startswith("data:"):
                        json_str = decoded[5:].strip()
                        if json_str == "[DONE]":
                            break
                        
                        data = json.loads(json_str)
                        if "choices" in data and len(data["choices"]) > 0:
                            delta = data["choices"][0].get("delta", {})
                            token = delta.get("content", "")
                            if token:
                                yield f"data: {json.dumps({'token': token})}\n\n"
                                # Background WebSocket update handled in Step 3
                break
        except requests.exceptions.RequestException as e:
            logger.error(f"LLM Gateway request failed: {e}")
            if attempt == max_retries - 1:
                yield f"data: {json.dumps({'error': 'LLM Gateway unavailable'})}\n\n"
                break
            time.sleep(2 ** attempt)

@app.route("/stream-llm", methods=["POST"])
def llm_stream_endpoint():
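    # silent=True returns None for a missing or malformed JSON body instead of raising.
    data = request.get_json(silent=True) or {}
    conversation_id = data.get("conversationId", "")
    prompt = data.get("prompt", "")
    # Placeholder credentials; the complete example reads them from environment variables.
    auth = GenesysAuth("acme", "your_client_id", "your_client_secret")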
    
    return Response(
        stream_with_context(stream_llm_response(auth, conversation_id, prompt)),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )

The generator yields data: {"token": "..."} followed by a blank line, which is how the SSE format marks the end of an event. The X-Accel-Buffering: no header tells Nginx not to buffer the stream. On a 429 response, the retry logic honors the Retry-After header when it is present and otherwise falls back to exponential backoff.
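
The two halves of that translation, reading one upstream line and framing one SSE event, can be isolated into small functions that are easy to test. This is a sketch under the assumption that upstream chunks have the choices/delta/content shape used in the route above; parse_gateway_line and format_sse are hypothetical helper names, not Genesys APIs.

```python
import json
from typing import Optional


def parse_gateway_line(line: bytes) -> Optional[str]:
    """Return the token carried by one upstream line, or None if there is none."""
    decoded = line.decode("utf-8")
    if not decoded.startswith("data:"):
        return None  # comments, keep-alives, and blank lines carry no token
    body = decoded[5:].strip()
    if not body or body == "[DONE]":
        return None
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        return None  # skip a malformed line rather than abort the stream
    if not isinstance(data, dict):
        return None
    choices = data.get("choices") or []
    if not choices:
        return None
    return choices[0].get("delta", {}).get("content") or None


def format_sse(payload: dict) -> str:
    """Frame a payload as one SSE event: a data: line plus a terminating blank line."""
    return f"data: {json.dumps(payload)}\n\n"
```

Skipping malformed lines is a design choice for this sketch; a stricter service might prefer to log them or end the stream.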

Step 2: WebSocket Transcript Updates

Genesys Conversations WebSockets API maintains a persistent connection at /v2/conversations/websockets. Authentication is passed via the access_token query parameter. Transcript updates are sent as JSON payloads matching the conversation message schema.

import asyncio
import websockets
import threading
from typing import Callable

class TranscriptWebSocket:
    def __init__(self, env: str, token: str, conversation_id: str):
        self.ws_url = f"wss://{env}.mypurecloud.com/v2/conversations/websockets?access_token={token}"
        self.conversation_id = conversation_id
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.thread: Optional[threading.Thread] = None
        self.token_callback: Optional[Callable] = None

    def connect(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self._connect_async())

    async def _connect_async(self):
        try:
            async with websockets.connect(self.ws_url, ping_interval=20) as ws:
                self.ws = ws
                logger.info("WebSocket connected to Genesys Conversations API")
                # The loop is already running inside run_until_complete, so wait
                # for the socket to close instead of calling run_forever() again.
                await ws.wait_closed()
        except websockets.exceptions.ConnectionClosed as e:
            logger.error(f"WebSocket connection closed: {e}")
        except Exception as e:
            logger.error(f"WebSocket error: {e}")
        finally:
            # Make update_transcript skip sends once the socket is gone.
            self.ws = None

    def update_transcript(self, text: str):
        if not self.ws:
            logger.warning("WebSocket not connected. Skipping transcript update.")
            return
        payload = {
            "conversationId": self.conversation_id,
            "type": "message",
            "text": text,
            "role": "agent"
        }
        try:
            # ws.send is a coroutine, so it must be scheduled on the loop that
            # owns the socket; call_soon_threadsafe would never await it.
            asyncio.run_coroutine_threadsafe(
                self.ws.send(json.dumps(payload)), self.loop
            )
        except Exception as e:
            logger.error(f"Failed to send transcript update: {e}")

    def start_background(self, token_callback: Callable):
        self.token_callback = token_callback
        self.thread = threading.Thread(target=self.connect, daemon=True)
        self.thread.start()

The TranscriptWebSocket class runs the asynchronous WebSocket library in a background thread to avoid blocking the Flask request thread. The update_transcript method pushes tokens to the Genesys conversation in real time. The payload structure matches the Conversations API message format.
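
The pattern underneath this class, an asyncio event loop that lives in a background thread and accepts work from synchronous code, can be shown without any network dependency. In the sketch below, BackgroundLoop and fake_send are hypothetical names; fake_send stands in for awaiting a WebSocket send. The standard-library call asyncio.run_coroutine_threadsafe is what hands a coroutine from the Flask thread to the loop's thread.

```python
import asyncio
import threading


class BackgroundLoop:
    """Owns an asyncio event loop that runs in a daemon thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def submit(self, coro):
        # Thread-safe: schedules the coroutine on the loop's own thread and
        # returns a concurrent.futures.Future the caller may wait on.
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def stop(self):
        # loop.stop() must be requested from inside the loop's thread.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join(timeout=5)
        self.loop.close()


async def fake_send(sent: list, message: str) -> int:
    # Stand-in for `await ws.send(message)` in a real WebSocket client.
    await asyncio.sleep(0)
    sent.append(message)
    return len(sent)
```

A caller on the request thread would write bg.submit(fake_send(sent, "hello")) and, if it needs confirmation, call .result(timeout=...) on the returned future. Fire-and-forget callers can ignore the future, at the cost of not seeing send failures.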

Step 3: Integrating Streaming and Transcript Updates

The final step wires the LLM stream generator to the WebSocket transmitter. Each token yielded to the client is simultaneously pushed to the Genesys conversation.

def integrated_stream(auth: GenesysAuth, conversation_id: str, user_prompt: str):
    ws_client = TranscriptWebSocket(auth.base_url.split("https://")[1].split(".")[0], auth.get_token(), conversation_id)
    ws_client.start_background(auth.get_token)
    
    try:
        for chunk in stream_llm_response(auth, conversation_id, user_prompt):
            # Decode the SSE chunk so errors are detected by key, not by substring;
            # a token that merely contains the word "error" must still be pushed.
            json_str = chunk.strip().removeprefix("data: ")
            try:
                payload = json.loads(json_str)
            except json.JSONDecodeError:
                payload = {}

            token = payload.get("token", "")
            if token and "error" not in payload:
                ws_client.update_transcript(token)

            # Every chunk, including error chunks, is forwarded to the browser.
            yield chunk
    finally:
        # Cleanup occurs when generator exits
        pass

The integration layer extracts the token from the SSE chunk and forwards it to the WebSocket client. Error chunks pass through directly to the browser. The background thread handles connection lifecycle independently.
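
The finally block in the generator is the natural place to release per-request resources, because it runs both when the stream finishes and when the generator is closed early. PEP 3333 requires a WSGI server to call close() on the response iterable at the end of the request, including after an early client disconnect. The sketch below illustrates that behavior in isolation; stream_with_cleanup and on_close are hypothetical names, and on_close stands in for whatever would shut down the WebSocket client.

```python
from typing import Callable, Iterable, Iterator


def stream_with_cleanup(chunks: Iterable[str],
                        on_close: Callable[[], None]) -> Iterator[str]:
    """Yield chunks and run on_close when the stream ends or is abandoned."""
    try:
        for chunk in chunks:
            yield chunk
    finally:
        # Runs on normal completion and when close() raises GeneratorExit
        # inside the generator, for example after a client disconnect.
        on_close()
```

Calling close() on a partially consumed generator triggers the callback exactly once, so the cleanup does not need its own guard against double invocation in this simple form.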

Complete Working Example

import os
import json
import time
import logging
import requests
import websockets
import threading
import asyncio
from flask import Flask, Response, stream_with_context, request
from typing import Optional, Callable

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

class GenesysAuth:
    def __init__(self, env: str, client_id: str, client_secret: str):
        self.base_url = f"https://{env}.mypurecloud.com"
        self.client_id = client_id
        self.client_secret = client_secret
        self.token: Optional[str] = None
        self.token_expiry: float = 0.0

    def get_token(self) -> str:
        if self.token and time.time() < self.token_expiry - 60:
            return self.token
        logger.info("Requesting new OAuth token")
        response = requests.post(
            f"{self.base_url}/oauth/token",
            data={
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "scope": "ai:llm:gateway conversation:write websocket:subscribe"
            },
            timeout=15
        )
        response.raise_for_status()
        payload = response.json()
        self.token = payload["access_token"]
        self.token_expiry = time.time() + payload["expires_in"]
        return self.token

class TranscriptWebSocket:
    def __init__(self, env: str, token: str, conversation_id: str):
        self.ws_url = f"wss://{env}.mypurecloud.com/v2/conversations/websockets?access_token={token}"
        self.conversation_id = conversation_id
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.thread: Optional[threading.Thread] = None

    def connect(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self._connect_async())

    async def _connect_async(self):
        try:
            async with websockets.connect(self.ws_url, ping_interval=20) as ws:
                self.ws = ws
                logger.info("WebSocket connected to Genesys Conversations API")
                # The loop is already running inside run_until_complete, so wait
                # for the socket to close instead of calling run_forever() again.
                await ws.wait_closed()
        except websockets.exceptions.ConnectionClosed as e:
            logger.error(f"WebSocket connection closed: {e}")
        except Exception as e:
            logger.error(f"WebSocket error: {e}")
        finally:
            # Make update_transcript skip sends once the socket is gone.
            self.ws = None

    def update_transcript(self, text: str):
        if not self.ws:
            logger.warning("WebSocket not connected. Skipping transcript update.")
            return
        payload = {
            "conversationId": self.conversation_id,
            "type": "message",
            "text": text,
            "role": "agent"
        }
        try:
            # ws.send is a coroutine, so schedule it on the loop that owns the socket.
            asyncio.run_coroutine_threadsafe(self.ws.send(json.dumps(payload)), self.loop)
        except Exception as e:
            logger.error(f"Failed to send transcript update: {e}")

    def start_background(self):
        self.thread = threading.Thread(target=self.connect, daemon=True)
        self.thread.start()

app = Flask(__name__)

def stream_llm_response(auth: GenesysAuth, conversation_id: str, user_prompt: str):
    url = f"{auth.base_url}/api/v2/ai/llm/gateway/chat"
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json",
        "Accept": "application/x-ndjson"
    }
    payload = {
        "messages": [{"role": "user", "content": user_prompt}],
        "stream": True,
        "temperature": 0.7
    }

    max_retries = 3
    for attempt in range(max_retries):
        try:
            with requests.post(url, json=payload, headers=headers, stream=True, timeout=300) as resp:
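                if resp.status_code == 429:
                    if attempt == max_retries - 1:
                        # Out of retries: report the failure instead of ending silently.
                        yield f"data: {json.dumps({'error': 'LLM Gateway rate limited'})}\n\n"
                        break
                    retry_after = int(resp.headers.get("Retry-After", 2 ** attempt))
                    logger.warning(f"Rate limited. Retrying in {retry_after}s")
                    time.sleep(retry_after)
                    continue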
                resp.raise_for_status()
                for line in resp.iter_lines():
                    if not line:
                        continue
                    decoded = line.decode("utf-8")
                    if decoded.startswith("data:"):
                        json_str = decoded[5:].strip()
                        if json_str == "[DONE]":
                            break
                        data = json.loads(json_str)
                        if "choices" in data and len(data["choices"]) > 0:
                            delta = data["choices"][0].get("delta", {})
                            token = delta.get("content", "")
                            if token:
                                yield f"data: {json.dumps({'token': token})}\n\n"
                break
        except requests.exceptions.RequestException as e:
            logger.error(f"LLM Gateway request failed: {e}")
            if attempt == max_retries - 1:
                yield f"data: {json.dumps({'error': 'LLM Gateway unavailable'})}\n\n"
                break
            time.sleep(2 ** attempt)

@app.route("/stream-llm", methods=["POST"])
def llm_stream_endpoint():
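    # silent=True returns None for a missing or malformed JSON body instead of raising.
    data = request.get_json(silent=True) or {}
    conversation_id = data.get("conversationId", "")
    prompt = data.get("prompt", "")
    
    env = os.getenv("GENESYS_ENV", "acme")
    client_id = os.getenv("OAUTH_CLIENT_ID", "")
    client_secret = os.getenv("OAUTH_CLIENT_SECRET", "")
    
    # A GenesysAuth created per request starts with an empty token cache;
    # share one instance across requests in production to benefit from caching.
    auth = GenesysAuth(env, client_id, client_secret)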
    ws_client = TranscriptWebSocket(env, auth.get_token(), conversation_id)
    ws_client.start_background()
    
    def generator():
        try:
            for chunk in stream_llm_response(auth, conversation_id, prompt):
                # Decode the chunk so errors are detected by key, not by substring;
                # a token that merely contains the word "error" must still be pushed.
                json_str = chunk.strip().removeprefix("data: ")
                try:
                    payload = json.loads(json_str)
                except json.JSONDecodeError:
                    payload = {}
                token = payload.get("token", "")
                if token and "error" not in payload:
                    ws_client.update_transcript(token)
                yield chunk
        except Exception as e:
            logger.error(f"Stream generator error: {e}")
            yield f"data: {json.dumps({'error': str(e)})}\n\n"
    
    return Response(
        stream_with_context(generator()),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=False)
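
To exercise the endpoint without a browser, a small Python client can read the event stream and reassemble the tokens. The following is a sketch, assuming the server above is running locally on port 5000. iter_sse_payloads, collect_tokens, and run_client are hypothetical helper names, and the conversation ID in run_client is a placeholder.

```python
import json
from typing import Iterable, Iterator


def iter_sse_payloads(lines: Iterable[str]) -> Iterator[dict]:
    """Decode the JSON payload of each data: line in an SSE stream."""
    for line in lines:
        if line.startswith("data:"):
            yield json.loads(line[5:].strip())


def collect_tokens(lines: Iterable[str]) -> str:
    """Concatenate tokens until the stream ends; raise if it reports an error."""
    parts = []
    for payload in iter_sse_payloads(lines):
        if "error" in payload:
            raise RuntimeError(payload["error"])
        parts.append(payload.get("token", ""))
    return "".join(parts)


def run_client(url: str = "http://localhost:5000/stream-llm") -> str:
    """POST a prompt to the route and return the assembled completion."""
    import requests  # third-party; imported here so the helpers stay stdlib-only

    body = {"conversationId": "example-conversation-id", "prompt": "Hello"}
    with requests.post(url, json=body, stream=True, timeout=300) as resp:
        resp.raise_for_status()
        # iter_lines() yields bytes, so decode each line explicitly.
        lines = (raw.decode("utf-8") for raw in resp.iter_lines())
        return collect_tokens(lines)
```

Because the parsing helpers accept any iterable of strings, they can be checked against a hand-written list of lines before pointing run_client at a live server.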

Common Errors & Debugging

Error: 401 Unauthorized on LLM Gateway

  • Cause: OAuth token expired or missing ai:llm:gateway scope.
  • Fix: Verify the client credentials have the required scope in the Genesys Admin console. Ensure the token cache refreshes before the 3600-second limit. The provided GenesysAuth class handles automatic refresh.
  • Code: Check response.json()["error_description"] for scope mismatch details.
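
A failed token request does not always return a JSON body, so reading error_description directly can raise a second exception that hides the first. The sketch below reads the field defensively. It assumes the error body follows the OAuth 2.0 error response format (error and error_description fields); describe_oauth_error is a hypothetical helper name.

```python
import json


def describe_oauth_error(status_code: int, body: str) -> str:
    """Summarize a failed OAuth response without assuming the body is JSON."""
    try:
        data = json.loads(body)
    except ValueError:
        data = {}
    if not isinstance(data, dict):
        data = {}
    detail = (data.get("error_description")
              or data.get("error")
              or "no details in response body")
    return f"HTTP {status_code}: {detail}"
```

With requests, the call would look like describe_oauth_error(response.status_code, response.text), logged before raise_for_status() discards the body.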

Error: 429 Too Many Requests

  • Cause: Exceeded Genesys Cloud rate limits for LLM Gateway or WebSocket connections.
  • Fix: Implement exponential backoff. The stream_llm_response function checks Retry-After headers and sleeps accordingly. WebSocket connections should maintain a single persistent link per conversation rather than opening new connections per token.
  • Code: The retry loop in Step 1 handles 429 responses with configurable max_retries.
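
HTTP allows Retry-After to be either a number of seconds or an HTTP date, and passing the date form to int() raises ValueError. The sketch below computes a wait time that accepts both forms, falls back to exponential backoff, and applies an upper bound. retry_delay, cap, and now are hypothetical names, and the 30-second cap is an arbitrary choice for illustration.

```python
import datetime
import email.utils
import time
from typing import Optional


def retry_delay(retry_after: Optional[str], attempt: int,
                cap: float = 30.0, now: Optional[float] = None) -> float:
    """Return the number of seconds to wait before the next attempt."""
    fallback = min(cap, float(2 ** attempt))  # exponential backoff
    if not retry_after:
        return fallback
    value = retry_after.strip()
    if value.isdigit():
        return min(cap, float(value))  # delta-seconds form
    try:
        when = email.utils.parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return fallback  # unparseable header: ignore it
    if when.tzinfo is None:
        when = when.replace(tzinfo=datetime.timezone.utc)
    current = time.time() if now is None else now
    return max(0.0, min(cap, when.timestamp() - current))
```

In the retry loop this would replace the int(...) conversion, for example time.sleep(retry_delay(resp.headers.get("Retry-After"), attempt)).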

Error: WebSocket Connection Refused or Closed Immediately

  • Cause: Invalid access_token query parameter or missing websocket:subscribe scope.
  • Fix: Pass the bearer token directly in the WebSocket URL as ?access_token=.... Do not use the Authorization header for WebSocket handshakes. Verify the scope is attached to the OAuth client.
  • Code: The TranscriptWebSocket class constructs the URL correctly. To verify formatting, log the URL before connecting, with the access_token value redacted so the bearer token does not end up in log files.
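
Two details are easy to get wrong when the token travels in the query string: it should be URL-encoded, and it should not appear in logs. The sketch below covers both with the standard library. build_ws_url and redact_token are hypothetical helper names; the host and path follow the pattern used by TranscriptWebSocket in this guide.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def build_ws_url(env: str, token: str) -> str:
    """Build the WebSocket URL with the token safely URL-encoded."""
    query = urlencode({"access_token": token})
    return f"wss://{env}.mypurecloud.com/v2/conversations/websockets?{query}"


def redact_token(url: str) -> str:
    """Return the URL with the access_token value replaced, for logging."""
    parts = urlsplit(url)
    query = [(key, "REDACTED" if key == "access_token" else value)
             for key, value in parse_qsl(parts.query, keep_blank_values=True)]
    return urlunsplit(parts._replace(query=urlencode(query)))
```

Logging redact_token(url) instead of url keeps the host, path, and parameter names visible for debugging while leaving the credential out.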

Error: SSE Stream Buffers in Browser or Nginx

  • Cause: Reverse proxies buffer streaming responses by default.
  • Fix: Add X-Accel-Buffering: no to the Flask response headers. In Nginx, set proxy_buffering off; in the location block that proxies the stream.
  • Code: The llm_stream_endpoint function includes the required header.

Official References