Streaming LLM-generated knowledge base responses to Genesys Cloud Agent Assist panels using Server-Sent Events and the Python SDK
What You Will Build
- A Python service that consumes an LLM streaming endpoint via Server-Sent Events, accumulates generated text, and pushes incremental knowledge base suggestions to a live Genesys Cloud conversation.
- The integration uses the Genesys Cloud Python SDK (genesyscloud) and the Agent Assist API (/api/v2/agent-assist/{conversationId}/suggestions).
- The code is written in Python 3.10+ using httpx for SSE consumption and the official Genesys Cloud SDK for API calls.
Prerequisites
- OAuth 2.0 Client Credentials grant configured in Genesys Cloud with the scope agent-assist:write
- Genesys Cloud Python SDK version 2.10.0 or later
- Python 3.10 or later runtime with asyncio support
- External dependencies: httpx==0.27.0, httpx-sse==0.4.0, tenacity==8.3.0
- A running LLM endpoint that supports SSE streaming (OpenAI-compatible or custom)
Authentication Setup
Genesys Cloud APIs require OAuth 2.0 bearer tokens. The Python SDK includes a built-in authentication manager that handles token acquisition and automatic refresh. You must configure it with a client credentials grant to avoid interactive login prompts in server environments.
import os

from genesyscloud import PureCloudPlatformClientV2


def init_genesys_sdk(environment: str = "mypurecloud.com") -> PureCloudPlatformClientV2:
    """
    Initialize the Genesys Cloud SDK with client credentials OAuth.
    Tokens are cached in memory and automatically refreshed before expiry.
    """
    client = PureCloudPlatformClientV2()
    client.set_environment(environment)
    client.set_oauth_client_credentials(
        client_id=os.getenv("GENESYS_CLIENT_ID"),
        client_secret=os.getenv("GENESYS_CLIENT_SECRET")
    )
    # The SDK automatically manages token lifecycle.
    # set_oauth_client_credentials enables auto-refresh on 401 responses.
    return client
The SDK intercepts outbound requests, attaches the Authorization: Bearer <token> header, and retries with a fresh token if it receives a 401 Unauthorized response. You do not need to implement manual token rotation logic.
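Because os.getenv returns None for an unset variable, a missing credential would otherwise only surface later as an authentication failure. The following is a minimal sketch of a fail-fast check that could run before init_genesys_sdk. The helper name require_env is hypothetical and is not part of the Genesys Cloud SDK; the variable names are the ones used in this guide.

```python
import os
from typing import Mapping


def require_env(names: list[str], environ: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Return the requested variables, or raise an error naming every missing one.

    Hypothetical helper for illustration; not part of the Genesys Cloud SDK.
    """
    # Treat unset and empty values the same way: both are unusable credentials.
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
    return {name: environ[name] for name in names}
```

Calling require_env(["GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET"]) at startup would report all missing names in one error instead of failing on the first API call.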
Implementation
Step 1: Configure the Agent Assist API client
The Agent Assist API lives under the agents_assist module in the Python SDK. You initialize it by passing the authenticated platform client. The API expects a conversationId that matches an active Genesys Cloud conversation (voice, webchat, or messaging).
from genesyscloud.agents_assist.client import AgentsAssistApi
from genesyscloud.agents_assist.model import SuggestionRequest, Suggestion


def get_agent_assist_client(platform_client: PureCloudPlatformClientV2) -> AgentsAssistApi:
    return AgentsAssistApi(platform_client)
The AgentsAssistApi class exposes post_agent_assist_conversation_suggestions. This endpoint accepts a SuggestionRequest object containing a list of Suggestion objects. Each suggestion requires a type, title, and description. You will update this payload incrementally as the LLM streams tokens.
Step 2: Establish the SSE connection and parse LLM chunks
LLM providers stream responses using the text/event-stream MIME type. Each event carries one or more data: lines whose value is a JSON payload or raw text, and a blank line marks the end of the event. You will use httpx with httpx_sse to consume the stream without blocking the event loop.
import json
from typing import AsyncIterator

import httpx
from httpx_sse import ServerSentEvent, aconnect_sse


async def fetch_llm_stream(prompt: str, api_key: str, base_url: str) -> AsyncIterator[ServerSentEvent]:
    """
    Open an SSE connection to an LLM streaming endpoint.
    Yields raw SSE events as they arrive. The HTTP client stays open
    for as long as the caller keeps iterating.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": prompt}],
        "stream": True
    }
    async with httpx.AsyncClient(timeout=60.0) as client:
        # aconnect_sse streams the body; a plain client.post would buffer
        # the whole response before the first event could be read.
        async with aconnect_sse(
            client, "POST", f"{base_url}/chat/completions", headers=headers, json=payload
        ) as event_source:
            response = event_source.response
            if response.status_code != 200:
                body = (await response.aread()).decode(errors="replace")
                raise ConnectionError(f"LLM request failed with status {response.status_code}: {body}")
            async for event in event_source.aiter_sse():
                yield event
The stream yields ServerSentEvent instances. Each event exposes a data attribute that holds the payload as a string and must be parsed. OpenAI-compatible providers send one JSON chunk per event and terminate the stream with a literal [DONE] marker, while custom endpoints may send raw text instead of JSON. You must handle both cases explicitly.
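To make the wire format concrete, here is a simplified sketch of how data: lines are grouped into events. It is for illustration only and is not how httpx_sse is implemented; it handles only the data field and ignores event, id, and retry. The function name iter_sse_data is hypothetical.

```python
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each complete event from decoded SSE lines.

    Simplified sketch: only the data field is handled. An event that is not
    terminated by a blank line before the stream ends is discarded.
    """
    buffer: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event collected so far.
            if buffer:
                yield "\n".join(buffer)
                buffer = []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            buffer.append(value[1:] if value.startswith(" ") else value)
```

Multiple data: lines inside one event are joined with newlines, which is why a parser should not assume that one line equals one event.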
Step 3: Push incremental suggestions to the Agent Assist API
You will maintain a running buffer of generated text. As SSE chunks arrive, you will periodically wrap the buffer in a Suggestion object and push it to Genesys Cloud (every third chunk in the code below). This creates the visual effect of a streaming knowledge base response in the agent panel.
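from typing import AsyncIterator

from httpx_sse import ServerSentEvent


async def stream_to_agent_assist(
    event_source: AsyncIterator[ServerSentEvent],
    assist_api: AgentsAssistApi,
    conversation_id: str,
    kb_source: str = "LLM Knowledge Base"
) -> str:
    """
    Consume SSE events and push incremental suggestions to Genesys Cloud.
    Returns the final accumulated response text.
    """
    accumulated_text = ""
    chunk_counter = 0
    pushed_chunks = 0  # value of chunk_counter at the last push
    async for event in event_source:
        if event.data == "[DONE]":
            break
        try:
            chunk_data = json.loads(event.data)
            delta = chunk_data["choices"][0]["delta"].get("content") or ""
        except json.JSONDecodeError:
            delta = event.data  # provider sent raw text instead of JSON
        except (KeyError, IndexError, TypeError, AttributeError):
            delta = ""  # valid JSON without text, e.g. a role-only chunk
        if not delta:
            continue
        accumulated_text += delta
        chunk_counter += 1
        # Push every 3 chunks to balance UI responsiveness and API rate limits
        if chunk_counter % 3 == 0:
            await push_suggestion(
                assist_api=assist_api,
                conversation_id=conversation_id,
                title=f"KB Response ({chunk_counter} chunks)",
                description=accumulated_text.strip(),
                source=kb_source
            )
            pushed_chunks = chunk_counter
    # Push the tail of the response when the stream ends between batches
    if chunk_counter != pushed_chunks:
        await push_suggestion(
            assist_api=assist_api,
            conversation_id=conversation_id,
            title=f"KB Response ({chunk_counter} chunks)",
            description=accumulated_text.strip(),
            source=kb_source
        )
    return accumulated_text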
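The chunk-parsing step inside the loop can be isolated into a small pure function, which makes it easy to test against recorded provider output. The sketch below assumes the OpenAI-compatible chunk shape used in this guide; the function name extract_delta is hypothetical.

```python
import json
from typing import Optional


def extract_delta(data: str) -> Optional[str]:
    """Return the text carried by one SSE data payload, or None at end of stream.

    Assumes an OpenAI-compatible chunk shape:
    {"choices": [{"delta": {"content": "..."}}]}
    """
    if data.strip() == "[DONE]":
        return None
    try:
        chunk = json.loads(data)
    except json.JSONDecodeError:
        return data  # provider sent raw text rather than JSON
    if not isinstance(chunk, dict):
        return data  # a bare JSON scalar such as "42" is treated as raw text
    try:
        return chunk["choices"][0]["delta"].get("content") or ""
    except (KeyError, IndexError, TypeError, AttributeError):
        return ""  # valid JSON without text, e.g. a role-only or usage chunk
```

Returning an empty string for JSON chunks without text keeps structural messages out of the agent-facing buffer, while raw-text providers still pass through unchanged.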
The push_suggestion function wraps the SDK call with retry logic and error handling. You must map the accumulated text to the Suggestion model fields. The type field must match one of the allowed values: article, custom, faq, or script. For LLM-generated content, custom is the correct classification.
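A cheap local check of these fields can catch schema problems before a request is sent. The following is a minimal sketch that uses only the allowed type values listed above; the function name validate_suggestion_fields is hypothetical, and the check does not replace server-side validation.

```python
ALLOWED_TYPES = {"article", "custom", "faq", "script"}


def validate_suggestion_fields(type_: str, title: str, description: str) -> list[str]:
    """Return a list of problems; an empty list means the fields look valid."""
    problems = []
    if type_ not in ALLOWED_TYPES:
        problems.append(f"type must be one of {sorted(ALLOWED_TYPES)}, got {type_!r}")
    if not title.strip():
        problems.append("title is empty")
    if not description.strip():
        problems.append("description is empty")
    return problems
```

Early in a stream the accumulated text can still be empty or whitespace only, so a check like this is one way to skip a push that would otherwise be rejected.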
Step 4: Implement rate-limit handling and retry logic
Genesys Cloud enforces strict rate limits on the Agent Assist API. You will receive 429 Too Many Requests responses when you exceed the quota. The SDK does not automatically retry 429 responses, so you must implement exponential backoff yourself.
import asyncio
import logging

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

logger = logging.getLogger(__name__)


class GenesysRateLimitError(Exception):
    pass


@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type(GenesysRateLimitError),
    reraise=True
)
async def push_suggestion(
    assist_api: AgentsAssistApi,
    conversation_id: str,
    title: str,
    description: str,
    source: str
) -> None:
    """
    Push a suggestion to the Agent Assist API with built-in retry logic for 429 responses.
    """
    suggestion = Suggestion(
        type="custom",
        title=title,
        description=description,
        url=f"https://kb.internal/{source.replace(' ', '-').lower()}",
        thumbnail_url="https://cdn.example.com/kb-icon.png"
    )
    request_body = SuggestionRequest(suggestions=[suggestion])
    try:
        await assist_api.post_agent_assist_conversation_suggestions(
            conversation_id=conversation_id,
            body=request_body
        )
    except Exception as e:
        error_code = getattr(e, "status_code", None)
        if error_code == 429:
            # Not every exception carries headers, so read them defensively
            headers = getattr(e, "headers", None) or {}
            retry_after = int(headers.get("Retry-After", 5))
            logger.warning("Rate limited (Retry-After: %d seconds). Backing off.", retry_after)
            # Raising this type is what triggers the tenacity retry
            raise GenesysRateLimitError(f"429 Rate limited. Retry-After: {retry_after}") from e
        elif error_code in (401, 403):
            logger.error("Authentication/Authorization failed: %s", str(e))
            raise
        elif error_code == 400:
            logger.error("Bad request payload: %s", str(e))
            raise
        else:
            logger.error("Unexpected API error: %s", str(e))
            raise
The tenacity decorator intercepts GenesysRateLimitError exceptions and applies exponential backoff, waiting between 2 and 30 seconds and giving up after four attempts. The Retry-After header from Genesys Cloud is logged but does not change the delay, which still follows the wait_exponential curve. Authentication errors propagate immediately because retrying a 401 or 403 cannot succeed and only masks configuration mistakes.
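If you want the server-provided Retry-After value to take precedence and jitter to spread out retries, the delay calculation could look like the sketch below. It uses only the standard library and is independent of tenacity; the function name backoff_delay and its parameters are hypothetical, and the defaults simply mirror the 2 to 30 second range used above.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    retry_after: Optional[float] = None,
    base: float = 2.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before retry number `attempt` (1-based).

    A server-provided Retry-After value wins. Otherwise the delay is drawn
    uniformly from [0, min(cap, base * 2 ** (attempt - 1))], a scheme often
    called exponential backoff with full jitter.
    """
    if retry_after is not None and retry_after >= 0:
        return float(retry_after)
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** (attempt - 1)))
    return rng.uniform(0, ceiling)
```

Jitter matters when several conversations hit the limit at the same moment: without it, all of them would retry on the same schedule and collide again.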
Complete Working Example
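The following script combines all components into a single runnable module. Set the environment variables GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, GENESYS_CONVERSATION_ID, LLM_BASE_URL, and LLM_API_KEY to your Genesys Cloud credentials and LLM provider details before running it.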
import asyncio
import json
import logging
import os
from typing import AsyncIterator

import httpx
from genesyscloud import PureCloudPlatformClientV2
from genesyscloud.agents_assist.client import AgentsAssistApi
from genesyscloud.agents_assist.model import SuggestionRequest, Suggestion
from httpx_sse import ServerSentEvent, aconnect_sse
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


class GenesysRateLimitError(Exception):
    pass


def init_genesys_sdk(environment: str = "mypurecloud.com") -> PureCloudPlatformClientV2:
    client = PureCloudPlatformClientV2()
    client.set_environment(environment)
    client.set_oauth_client_credentials(
        client_id=os.getenv("GENESYS_CLIENT_ID"),
        client_secret=os.getenv("GENESYS_CLIENT_SECRET")
    )
    return client


def get_agent_assist_client(platform_client: PureCloudPlatformClientV2) -> AgentsAssistApi:
    return AgentsAssistApi(platform_client)


async def fetch_llm_stream(prompt: str, api_key: str, base_url: str) -> AsyncIterator[ServerSentEvent]:
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": prompt}],
        "stream": True
    }
    async with httpx.AsyncClient(timeout=60.0) as client:
        # Stream the body instead of buffering it, and keep the client open while iterating
        async with aconnect_sse(
            client, "POST", f"{base_url}/chat/completions", headers=headers, json=payload
        ) as event_source:
            response = event_source.response
            if response.status_code != 200:
                body = (await response.aread()).decode(errors="replace")
                raise ConnectionError(f"LLM request failed: {body}")
            async for event in event_source.aiter_sse():
                yield event


@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type(GenesysRateLimitError),
    reraise=True
)
async def push_suggestion(
    assist_api: AgentsAssistApi,
    conversation_id: str,
    title: str,
    description: str,
    source: str
) -> None:
    suggestion = Suggestion(
        type="custom",
        title=title,
        description=description,
        url=f"https://kb.internal/{source.replace(' ', '-').lower()}",
        thumbnail_url="https://cdn.example.com/kb-icon.png"
    )
    request_body = SuggestionRequest(suggestions=[suggestion])
    try:
        await assist_api.post_agent_assist_conversation_suggestions(
            conversation_id=conversation_id,
            body=request_body
        )
    except Exception as e:
        error_code = getattr(e, "status_code", None)
        if error_code == 429:
            # Raising this type is what triggers the tenacity retry
            raise GenesysRateLimitError("429 Rate limited") from e
        raise


async def stream_to_agent_assist(
    event_source: AsyncIterator[ServerSentEvent],
    assist_api: AgentsAssistApi,
    conversation_id: str,
    kb_source: str = "LLM Knowledge Base"
) -> str:
    accumulated_text = ""
    chunk_counter = 0
    pushed_chunks = 0  # value of chunk_counter at the last push
    async for event in event_source:
        if event.data == "[DONE]":
            break
        try:
            chunk_data = json.loads(event.data)
            delta = chunk_data["choices"][0]["delta"].get("content") or ""
        except json.JSONDecodeError:
            delta = event.data  # provider sent raw text instead of JSON
        except (KeyError, IndexError, TypeError, AttributeError):
            delta = ""  # valid JSON without text, e.g. a role-only chunk
        if not delta:
            continue
        accumulated_text += delta
        chunk_counter += 1
        if chunk_counter % 3 == 0:
            await push_suggestion(
                assist_api=assist_api,
                conversation_id=conversation_id,
                title=f"KB Response ({chunk_counter} chunks)",
                description=accumulated_text.strip(),
                source=kb_source
            )
            pushed_chunks = chunk_counter
    # Push the tail of the response when the stream ends between batches
    if chunk_counter != pushed_chunks:
        await push_suggestion(
            assist_api=assist_api,
            conversation_id=conversation_id,
            title=f"KB Response ({chunk_counter} chunks)",
            description=accumulated_text.strip(),
            source=kb_source
        )
    return accumulated_text


async def main():
    conversation_id = os.getenv("GENESYS_CONVERSATION_ID")
    if not conversation_id:
        raise ValueError("GENESYS_CONVERSATION_ID environment variable is required")
    llm_key = os.getenv("LLM_API_KEY")
    if not llm_key:
        raise ValueError("LLM_API_KEY environment variable is required")
    platform_client = init_genesys_sdk()
    assist_api = get_agent_assist_client(platform_client)
    llm_url = os.getenv("LLM_BASE_URL", "https://api.openai.com/v1")
    prompt = "Summarize the return policy for damaged electronics in under 150 words."
    try:
        # fetch_llm_stream is an async generator, so it is iterated rather than awaited
        event_source = fetch_llm_stream(prompt, llm_key, llm_url)
        final_text = await stream_to_agent_assist(event_source, assist_api, conversation_id)
        logger.info("Streaming complete. Final length: %d characters", len(final_text))
    except Exception as e:
        logger.error("Pipeline failed: %s", str(e))
        raise


if __name__ == "__main__":
    asyncio.run(main())
Common Errors & Debugging
Error: 401 Unauthorized
- What causes it: The OAuth token is expired, malformed, or the client credentials lack the agent-assist:write scope.
- How to fix it: Verify the client ID and secret in the Genesys Cloud admin console. Ensure the OAuth application has the agent-assist:write scope assigned. The SDK automatically refreshes tokens, but initial authentication failures will not be retried.
- Code showing the fix: The init_genesys_sdk function uses set_oauth_client_credentials, which binds the grant to the platform client. If you receive a 401 during the first call, regenerate the secret in the Genesys Cloud portal and update the environment variable.
Error: 429 Too Many Requests
- What causes it: You are pushing suggestions faster than the Genesys Cloud rate limit allows. The Agent Assist API enforces a per-tenant and per-conversation quota.
- How to fix it: The push_suggestion function uses tenacity to catch 429 responses and apply exponential backoff. You can adjust the chunk_counter % 3 threshold in stream_to_agent_assist to reduce push frequency. Monitoring the Retry-After header in the SDK exception object provides the exact wait time Genesys Cloud requires.
- Code showing the fix: The @retry decorator intercepts GenesysRateLimitError and delays the next attempt. Increase wait_exponential(max=30) if your tenant enforces stricter limits.
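Chunk counts are only a rough proxy for request rate, because providers emit tokens at very different speeds. One alternative is to gate pushes on elapsed time. The class below is a minimal sketch of that idea; the name PushThrottle and the interval value are hypothetical, and the clock is injectable so the behavior can be tested without sleeping.

```python
import time
from typing import Callable, Optional


class PushThrottle:
    """Allow at most one push per `min_interval` seconds."""

    def __init__(self, min_interval: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.min_interval = min_interval
        self._clock = clock
        self._last_push: Optional[float] = None

    def should_push(self) -> bool:
        """Return True and record the time if enough time has passed since the last push."""
        now = self._clock()
        if self._last_push is None or now - self._last_push >= self.min_interval:
            self._last_push = now
            return True
        return False
```

Inside the streaming loop, a check such as throttle.should_push() could stand in for the chunk_counter % 3 condition. A final push after the loop is still needed so that the last tokens reach the agent.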
Error: 400 Bad Request
- What causes it: The Suggestion payload violates the schema. Common triggers include missing type, title, or description fields, or exceeding character limits.
- How to fix it: Validate the Suggestion object before serialization. The type field must be one of article, custom, faq, or script. The description field must not exceed 4000 characters. Trim accumulated text if it approaches the limit.
- Code showing the fix: Add a length check before creating the Suggestion instance: if len(description) > 3900: description = description[:3900] + "..."
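The one-line length check can be turned into a small helper that avoids cutting a word in half. This is a sketch under the 4000-character limit stated above; the function name truncate_description is hypothetical, and it assumes the limit is larger than the suffix.

```python
def truncate_description(text: str, limit: int = 4000, suffix: str = "...") -> str:
    """Trim text so the result, including the suffix, never exceeds `limit` characters.

    Assumes `limit` is larger than len(suffix).
    """
    if len(text) <= limit:
        return text
    cut = text[: limit - len(suffix)]
    # Prefer cutting at the last space so a word is not split in half.
    boundary = cut.rfind(" ")
    if boundary > 0:
        cut = cut[:boundary]
    return cut.rstrip() + suffix
```

Applying it to the description argument just before the Suggestion is built keeps every incremental push within the limit, not only the final one.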
Error: SSE Parser JSONDecodeError
- What causes it: The LLM provider sends non-JSON data: lines, or the stream contains interleaved control messages.
- How to fix it: The stream_to_agent_assist function wraps json.loads in a try-except block. If parsing fails, it falls back to using the raw event.data string. You should log the raw event to verify provider behavior.
- Code showing the fix: The existing try/except block handles malformed JSON. Add logger.debug("Raw SSE chunk: %s", event.data) during development to trace provider formatting.