Streaming LLM-generated knowledge base responses to Genesys Cloud Agent Assist panels using Server-Sent Events and the Python SDK

What You Will Build

  • A Python service that consumes an LLM streaming endpoint via Server-Sent Events, accumulates generated text, and pushes incremental knowledge base suggestions to a live Genesys Cloud conversation.
  • The integration uses the Genesys Cloud Python SDK (genesyscloud) and the Agent Assist API (/api/v2/agent-assist/{conversationId}/suggestions).
  • The code is written in Python 3.10+ using httpx for SSE consumption and the official Genesys Cloud SDK for API calls.

Prerequisites

  • OAuth 2.0 Client Credentials grant configured in Genesys Cloud with the scope agent-assist:write
  • Genesys Cloud Python SDK version 2.10.0 or later
  • Python 3.10 or later (asyncio ships with the standard library)
  • External dependencies: httpx==0.27.0, httpx-sse==0.4.0, tenacity==8.3.0
  • A running LLM endpoint that supports SSE streaming (OpenAI-compatible or custom)

Authentication Setup

Genesys Cloud APIs require OAuth 2.0 bearer tokens. The Python SDK includes a built-in authentication manager that handles token acquisition and automatic refresh. You must configure it with a client credentials grant to avoid interactive login prompts in server environments.

import os
from genesyscloud import PureCloudPlatformClientV2

def init_genesys_sdk(environment: str = "mypurecloud.com") -> PureCloudPlatformClientV2:
    """
    Initialize the Genesys Cloud SDK with client credentials OAuth.
    Tokens are cached in memory and automatically refreshed before expiry.
    """
    client = PureCloudPlatformClientV2()
    client.set_environment(environment)
    
    client.set_oauth_client_credentials(
        client_id=os.getenv("GENESYS_CLIENT_ID"),
        client_secret=os.getenv("GENESYS_CLIENT_SECRET")
    )
    
    # The SDK automatically manages token lifecycle.
    # set_oauth_client_credentials enables auto-refresh on 401 responses.
    return client

The SDK intercepts outbound requests, attaches the Authorization: Bearer <token> header, and retries with a fresh token if it receives a 401 Unauthorized response. You do not need to implement manual token rotation logic.
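
Because os.getenv returns None for an unset variable, a missing credential in init_genesys_sdk would only surface later as an authentication failure. The following is a minimal sketch of a fail-fast check that could run before the SDK is initialized. The helper name require_env is hypothetical and not part of the SDK; the variable names are the ones used above.

```python
import os


def require_env(*names: str) -> dict[str, str]:
    """Return the requested environment variables, raising if any is unset or empty."""
    values = {name: os.environ.get(name, "") for name in names}
    missing = [name for name, value in values.items() if not value]
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
    return values
```

Calling require_env("GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET") at startup turns a misconfigured deployment into an immediate, clearly worded error instead of a later 401.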

Implementation

Step 1: Configure the Agent Assist API client

The Agent Assist API lives under the agents_assist module in the Python SDK. You initialize it by passing the authenticated platform client. The API expects a conversationId that matches an active Genesys Cloud conversation (voice, webchat, or messaging).

from genesyscloud.agents_assist.client import AgentsAssistApi
from genesyscloud.agents_assist.model import SuggestionRequest, Suggestion

def get_agent_assist_client(platform_client: PureCloudPlatformClientV2) -> AgentsAssistApi:
    return AgentsAssistApi(platform_client)

The AgentsAssistApi class exposes post_agent_assist_conversation_suggestions. This endpoint accepts a SuggestionRequest object containing a list of Suggestion objects. Each suggestion requires a type, title, and description. You will update this payload incrementally as the LLM streams tokens.

Step 2: Establish the SSE connection and parse LLM chunks

LLM providers stream responses using the text/event-stream MIME type. Each event consists of one or more data: lines carrying a JSON payload or raw text, terminated by a blank line. You will use httpx with httpx_sse to consume the stream without blocking the event loop.

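import json
import httpx
from httpx_sse import EventSource

# One shared client for the life of the service. It must outlive fetch_llm_stream,
# because the response body is read lazily by the caller.
_http_client = httpx.AsyncClient(timeout=60.0)

async def fetch_llm_stream(prompt: str, api_key: str, base_url: str) -> EventSource:
    """
    Open an SSE connection to an LLM streaming endpoint.
    Returns an EventSource; iterate it with event_source.aiter_sse().
    The caller should close event_source.response when it is finished.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "Accept": "text/event-stream"
    }
    
    payload = {
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": prompt}],
        "stream": True
    }
    
    # client.post() would buffer the whole body before returning, which defeats
    # streaming. send(..., stream=True) returns as soon as the headers arrive.
    request = _http_client.build_request(
        "POST",
        f"{base_url}/chat/completions",
        headers=headers,
        json=payload
    )
    response = await _http_client.send(request, stream=True)
    
    if response.status_code != 200:
        # A streamed body must be read explicitly before it can be reported.
        body = (await response.aread()).decode(errors="replace")
        await response.aclose()
        raise ConnectionError(f"LLM request failed with status {response.status_code}: {body}")
        
    return EventSource(response)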

The EventSource object exposes an aiter_sse() method that yields ServerSentEvent instances. Each event has a data attribute that must be parsed. OpenAI-compatible providers send one JSON chunk per event and end the stream with a literal [DONE] marker, while custom endpoints may send raw text. You must handle both cases explicitly.

Step 3: Push incremental suggestions to the Agent Assist API

You will maintain a running buffer of generated text. After each SSE chunk arrives, you will construct a Suggestion object and push it to Genesys Cloud. This creates the visual effect of a streaming knowledge base response in the agent panel.

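async def stream_to_agent_assist(
    event_source: EventSource,
    assist_api: AgentsAssistApi,
    conversation_id: str,
    kb_source: str = "LLM Knowledge Base"
) -> str:
    """
    Consume SSE events and push incremental suggestions to Genesys Cloud.
    Returns the final accumulated response text.
    """
    accumulated_text = ""
    chunk_counter = 0
    pushed_at = 0  # chunk count at the most recent push
    
    try:
        # EventSource is not itself iterable; aiter_sse() yields ServerSentEvent objects.
        async for event in event_source.aiter_sse():
            if event.data.strip() == "[DONE]":
                break
                
            try:
                chunk_data = json.loads(event.data)
            except json.JSONDecodeError:
                # The provider sent raw text instead of JSON.
                delta = event.data
            else:
                # Valid JSON without text (empty choices, role-only delta) is skipped
                # rather than appended verbatim.
                choices = (chunk_data.get("choices") if isinstance(chunk_data, dict) else None) or [{}]
                delta = (choices[0].get("delta") or {}).get("content") or ""
                
            if not delta:
                continue
                
            accumulated_text += delta
            chunk_counter += 1
            
            # Push every 3 chunks to balance UI responsiveness and API rate limits
            if chunk_counter % 3 == 0:
                await push_suggestion(
                    assist_api=assist_api,
                    conversation_id=conversation_id,
                    title=f"KB Response ({chunk_counter} chunks)",
                    description=accumulated_text.strip(),
                    source=kb_source
                )
                pushed_at = chunk_counter
    finally:
        # Release the connection even if the loop exits early.
        await event_source.response.aclose()
        
    # Flush the tail so the last one or two chunks also reach the agent panel.
    if chunk_counter != pushed_at:
        await push_suggestion(
            assist_api=assist_api,
            conversation_id=conversation_id,
            title=f"KB Response ({chunk_counter} chunks)",
            description=accumulated_text.strip(),
            source=kb_source
        )
        
    return accumulated_text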

The push_suggestion function wraps the SDK call with retry logic and error handling. You must map the accumulated text to the Suggestion model fields. The type field must match one of the allowed values: article, custom, faq, or script. For LLM-generated content, custom is the correct classification.
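
The push cadence itself can be reasoned about separately from the network calls. The sketch below models it as a generator over text deltas. The name snapshots is hypothetical, and the final flush is an addition that ensures text arriving after the last full batch is not lost.

```python
from collections.abc import Iterable, Iterator


def snapshots(deltas: Iterable[str], every: int = 3) -> Iterator[str]:
    """Yield the accumulated text after every `every` non-empty deltas,
    then once more at the end if some text has not been yielded yet."""
    if every < 1:
        raise ValueError("every must be at least 1")
    text = ""
    count = 0
    for delta in deltas:
        if not delta:
            continue  # empty chunks do not count toward the cadence
        text += delta
        count += 1
        if count % every == 0:
            yield text
    if count % every != 0:
        yield text  # final flush for a partial batch
```

With four deltas "a", "b", "c", "d" and every=3, this yields "abc" followed by "abcd". Each yielded value corresponds to one call to push_suggestion with that text as the description.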

Step 4: Implement rate-limit handling and retry logic

Genesys Cloud enforces strict rate limits on the Agent Assist API. You will receive 429 Too Many Requests responses when you exceed the quota. The SDK does not automatically retry 429 responses, so you must implement exponential backoff with jitter.

import asyncio
import logging
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
    wait_random,
)

logger = logging.getLogger(__name__)

class GenesysRateLimitError(Exception):
    pass

@retry(
    stop=stop_after_attempt(4),
    # Exponential backoff plus up to 1 second of random jitter.
    wait=wait_exponential(multiplier=1, min=2, max=30) + wait_random(0, 1),
    retry=retry_if_exception_type(GenesysRateLimitError),
    reraise=True
)
async def push_suggestion(
    assist_api: AgentsAssistApi,
    conversation_id: str,
    title: str,
    description: str,
    source: str
) -> None:
    """
    Push a suggestion to the Agent Assist API with built-in retry logic for 429 responses.
    """
    suggestion = Suggestion(
        type="custom",
        title=title,
        description=description,
        url=f"https://kb.internal/{source.replace(' ', '-').lower()}",
        thumbnail_url="https://cdn.example.com/kb-icon.png"
    )
    
    request_body = SuggestionRequest(suggestions=[suggestion])
    
    try:
        # The SDK call is synchronous, so run it in a worker thread
        # to keep the event loop free for the SSE stream.
        await asyncio.to_thread(
            assist_api.post_agent_assist_conversation_suggestions,
            conversation_id=conversation_id,
            body=request_body
        )
    except Exception as e:
        # Generated SDK exceptions may expose the HTTP status as `status`
        # rather than `status_code`, so check both.
        error_code = getattr(e, "status_code", None) or getattr(e, "status", None)
        
        if error_code == 429:
            # Not every exception carries headers; fall back to an empty mapping.
            headers = getattr(e, "headers", None) or {}
            retry_after = headers.get("Retry-After", "unknown")
            logger.warning("Rate limited (Retry-After: %s). Backing off.", retry_after)
            raise GenesysRateLimitError(f"429 Rate limited. Retry-After: {retry_after}") from e
        elif error_code in (401, 403):
            logger.error("Authentication/Authorization failed: %s", str(e))
            raise
        elif error_code == 400:
            logger.error("Bad request payload: %s", str(e))
            raise
        else:
            logger.error("Unexpected API error: %s", str(e))
            raise

The tenacity decorator retries only GenesysRateLimitError and applies exponential backoff between attempts. The Retry-After value from Genesys Cloud is logged and included in the exception message, but it does not change the delay: the backoff settings alone determine how long each wait lasts. Authentication errors propagate immediately because a 401 or 403 cannot succeed until the configuration is corrected, and retrying would only mask the mistake.

Complete Working Example

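The following script combines all components into a single runnable module. Set the GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, GENESYS_CONVERSATION_ID, LLM_API_KEY, and (optionally) LLM_BASE_URL environment variables to your Genesys Cloud credentials and LLM provider details before running it.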

import asyncio
import json
import os
import logging
from genesyscloud import PureCloudPlatformClientV2
from genesyscloud.agents_assist.client import AgentsAssistApi
from genesyscloud.agents_assist.model import SuggestionRequest, Suggestion
import httpx
from httpx_sse import EventSource
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
    wait_random,
)

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

# Shared HTTP client. It must stay open while the SSE stream is being read.
_http_client = httpx.AsyncClient(timeout=60.0)

class GenesysRateLimitError(Exception):
    pass

def init_genesys_sdk(environment: str = "mypurecloud.com") -> PureCloudPlatformClientV2:
    client = PureCloudPlatformClientV2()
    client.set_environment(environment)
    client.set_oauth_client_credentials(
        client_id=os.getenv("GENESYS_CLIENT_ID"),
        client_secret=os.getenv("GENESYS_CLIENT_SECRET")
    )
    return client

def get_agent_assist_client(platform_client: PureCloudPlatformClientV2) -> AgentsAssistApi:
    return AgentsAssistApi(platform_client)

async def fetch_llm_stream(prompt: str, api_key: str, base_url: str) -> EventSource:
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "Accept": "text/event-stream"
    }
    payload = {
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": prompt}],
        "stream": True
    }
    # stream=True returns once headers arrive instead of buffering the whole body.
    request = _http_client.build_request(
        "POST",
        f"{base_url}/chat/completions",
        headers=headers,
        json=payload
    )
    response = await _http_client.send(request, stream=True)
    if response.status_code != 200:
        body = (await response.aread()).decode(errors="replace")
        await response.aclose()
        raise ConnectionError(f"LLM request failed with status {response.status_code}: {body}")
    return EventSource(response)

@retry(
    stop=stop_after_attempt(4),
    # Exponential backoff plus up to 1 second of random jitter.
    wait=wait_exponential(multiplier=1, min=2, max=30) + wait_random(0, 1),
    retry=retry_if_exception_type(GenesysRateLimitError),
    reraise=True
)
async def push_suggestion(
    assist_api: AgentsAssistApi,
    conversation_id: str,
    title: str,
    description: str,
    source: str
) -> None:
    suggestion = Suggestion(
        type="custom",
        title=title,
        description=description,
        url=f"https://kb.internal/{source.replace(' ', '-').lower()}",
        thumbnail_url="https://cdn.example.com/kb-icon.png"
    )
    request_body = SuggestionRequest(suggestions=[suggestion])
    try:
        # The SDK call is synchronous, so run it in a worker thread.
        await asyncio.to_thread(
            assist_api.post_agent_assist_conversation_suggestions,
            conversation_id=conversation_id,
            body=request_body
        )
    except Exception as e:
        # The status attribute name can differ between SDK exception classes.
        error_code = getattr(e, "status_code", None) or getattr(e, "status", None)
        if error_code == 429:
            raise GenesysRateLimitError("429 Rate limited") from e
        raise

async def stream_to_agent_assist(
    event_source: EventSource,
    assist_api: AgentsAssistApi,
    conversation_id: str,
    kb_source: str = "LLM Knowledge Base"
) -> str:
    accumulated_text = ""
    chunk_counter = 0
    pushed_at = 0  # chunk count at the most recent push
    try:
        # EventSource is not itself iterable; aiter_sse() yields the events.
        async for event in event_source.aiter_sse():
            if event.data.strip() == "[DONE]":
                break
            try:
                chunk_data = json.loads(event.data)
            except json.JSONDecodeError:
                delta = event.data  # raw-text provider
            else:
                # Skip JSON chunks that carry no text instead of appending them verbatim.
                choices = (chunk_data.get("choices") if isinstance(chunk_data, dict) else None) or [{}]
                delta = (choices[0].get("delta") or {}).get("content") or ""
            if not delta:
                continue
            accumulated_text += delta
            chunk_counter += 1
            if chunk_counter % 3 == 0:
                await push_suggestion(
                    assist_api=assist_api,
                    conversation_id=conversation_id,
                    title=f"KB Response ({chunk_counter} chunks)",
                    description=accumulated_text.strip(),
                    source=kb_source
                )
                pushed_at = chunk_counter
    finally:
        await event_source.response.aclose()
    # Flush the tail so the final chunks also reach the agent panel.
    if chunk_counter != pushed_at:
        await push_suggestion(
            assist_api=assist_api,
            conversation_id=conversation_id,
            title=f"KB Response ({chunk_counter} chunks)",
            description=accumulated_text.strip(),
            source=kb_source
        )
    return accumulated_text

async def main():
    conversation_id = os.getenv("GENESYS_CONVERSATION_ID")
    if not conversation_id:
        raise ValueError("GENESYS_CONVERSATION_ID environment variable is required")
        
    llm_url = os.getenv("LLM_BASE_URL", "https://api.openai.com/v1")
    llm_key = os.getenv("LLM_API_KEY")
    if not llm_key:
        raise ValueError("LLM_API_KEY environment variable is required")
    prompt = "Summarize the return policy for damaged electronics in under 150 words."
    
    platform_client = init_genesys_sdk()
    assist_api = get_agent_assist_client(platform_client)
    
    try:
        event_source = await fetch_llm_stream(prompt, llm_key, llm_url)
        final_text = await stream_to_agent_assist(event_source, assist_api, conversation_id)
        logger.info("Streaming complete. Final length: %d characters", len(final_text))
    except Exception as e:
        logger.error("Pipeline failed: %s", str(e))
        raise
    finally:
        await _http_client.aclose()

if __name__ == "__main__":
    asyncio.run(main())

Common Errors & Debugging

Error: 401 Unauthorized

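  • What causes it: The OAuth token is expired or malformed, or the client ID and secret are wrong. A valid token that lacks the agent-assist:write scope is usually rejected as well, typically with 403 Forbidden rather than 401.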
  • How to fix it: Verify the client ID and secret in the Genesys Cloud admin console. Ensure the OAuth application has the agent-assist:write scope assigned. The SDK automatically refreshes tokens, but initial authentication failures will not be retried.
  • Code showing the fix: The init_genesys_sdk function uses set_oauth_client_credentials, which binds the grant to the platform client. If you receive a 401 during the first call, regenerate the secret in the Genesys Cloud portal and update the environment variable.

Error: 429 Too Many Requests

  • What causes it: You are pushing suggestions faster than the Genesys Cloud rate limit allows. The Agent Assist API enforces a per-tenant and per-conversation quota.
  • How to fix it: The push_suggestion function uses tenacity to catch 429 responses and apply exponential backoff. You can adjust the chunk_counter % 3 threshold in stream_to_agent_assist to reduce push frequency. Monitoring the Retry-After header in the SDK exception object provides the exact wait time Genesys Cloud requires.
  • Code showing the fix: The @retry decorator intercepts GenesysRateLimitError and delays the next attempt. Increase wait_exponential(max=30) if your tenant enforces stricter limits.
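
Chunk counts are only a rough proxy for request rate, because providers emit chunks at varying speeds. An alternative worth considering is to gate pushes by elapsed time. The sketch below is a hypothetical helper (PushGate is not part of any SDK) that allows at most one push per interval; the clock is injectable so the behavior can be tested deterministically.

```python
import time
from collections.abc import Callable


class PushGate:
    """Allow an action at most once per `min_interval` seconds."""

    def __init__(self, min_interval: float, clock: Callable[[], float] = time.monotonic):
        self._min_interval = min_interval
        self._clock = clock
        self._last: float | None = None  # time of the last allowed action

    def ready(self) -> bool:
        """Return True and record the time if enough time has passed, else False."""
        now = self._clock()
        if self._last is None or now - self._last >= self._min_interval:
            self._last = now
            return True
        return False
```

In stream_to_agent_assist, the condition chunk_counter % 3 == 0 could be replaced with gate.ready(), where gate = PushGate(0.5) is created before the loop. The interval of 0.5 seconds is a placeholder to tune against your tenant's actual limits.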

Error: 400 Bad Request

  • What causes it: The Suggestion payload violates the schema. Common triggers include missing type, title, or description fields, or exceeding character limits.
  • How to fix it: Validate the Suggestion object before serialization. The type field must be one of article, custom, faq, or script. The description field must not exceed 4000 characters. Trim accumulated text if it approaches the limit.
  • Code showing the fix: Add a length check before creating the Suggestion instance: if len(description) > 3900: description = description[:3900] + "..."
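
The checks described above can be gathered into two small helpers that run before the Suggestion object is built. This is a sketch under the limits stated in this guide (the four type values and the 4000-character description limit); the function names are hypothetical, and the limits should be confirmed against the API reference for your environment.

```python
ALLOWED_TYPES = {"article", "custom", "faq", "script"}  # values listed in this guide
MAX_DESCRIPTION = 4000  # character limit stated in this guide


def clamp_description(text: str, limit: int = MAX_DESCRIPTION, ellipsis: str = "...") -> str:
    """Trim whitespace and shorten text so the result never exceeds `limit` characters."""
    text = text.strip()
    if len(text) <= limit:
        return text
    # Reserve room for the ellipsis so the total stays within the limit.
    return text[: limit - len(ellipsis)].rstrip() + ellipsis


def check_suggestion_fields(type_: str, title: str, description: str) -> None:
    """Raise ValueError if a required field is empty or the type is not allowed."""
    if type_ not in ALLOWED_TYPES:
        raise ValueError(f"type must be one of {sorted(ALLOWED_TYPES)}, got {type_!r}")
    if not title.strip():
        raise ValueError("title must not be empty")
    if not description.strip():
        raise ValueError("description must not be empty")
```

Calling check_suggestion_fields("custom", title, description) and then passing clamp_description(description) to the model catches the most common 400 triggers locally, before a request is spent on them.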

Error: SSE Parser JSONDecodeError

  • What causes it: The LLM provider sends non-JSON data: lines, or the stream contains interleaved control messages.
  • How to fix it: The stream_to_agent_assist function wraps json.loads in a try-except block. If parsing fails, it falls back to using the raw event.data string. You should log the raw event to verify provider behavior.
  • Code showing the fix: The existing try/except block handles malformed JSON. Add logger.debug("Raw SSE chunk: %s", event.data) during development to trace provider formatting.

Official References