Streaming NICE Cognigy.AI LLM Responses via Go
What You Will Build
A Go backend service that opens a WebSocket connection to the Cognigy.AI LLM streaming gateway and receives tokens in real time. The service reassembles fragmented output, applies safety and formatting filters, writes the final response and token counts to dialog variables, falls back to user-facing messages on timeouts and rate limits, tracks token usage per session, and exposes a Server-Sent Events (SSE) endpoint for frontends. This tutorial uses the Cognigy.AI Public API and WebSocket streaming interface. The implementation is written in Go 1.21.
Prerequisites
- Cognigy.AI environment URL (format: {environment}.cognigy.ai)
- OAuth 2.0 client credentials (Client ID and Client Secret)
- Required OAuth scopes: llm:stream, bot:dialog:write, api:access
- Go 1.21 or later
- External dependencies: github.com/gorilla/websocket and, optionally, golang.org/x/time/rate for client-side rate limiting
- An active Cognigy.AI LLM skill with streaming enabled in the bot configuration
Authentication Setup
Cognigy.AI secures all API and WebSocket endpoints with OAuth 2.0 Bearer tokens. The client credentials flow issues an access token whose lifetime in seconds is returned in the expires_in field (for example, 3600 for one hour). Cache the token and refresh it before it expires to keep streaming sessions uninterrupted.
The following struct manages the token lifecycle, tracks the expiration time, and guards the cached token with a read-write mutex so that concurrent requests can share it.
package main
import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"
)
type OAuthConfig struct {
	EnvURL       string
	ClientID     string
	ClientSecret string
}
type TokenResponse struct {
	AccessToken string `json:"access_token"`
	ExpiresIn   int    `json:"expires_in"`
	TokenType   string `json:"token_type"`
}
type OAuthClient struct {
	config    OAuthConfig
	token     string
	expiresAt time.Time
	mu        sync.RWMutex
	client    *http.Client
}
func NewOAuthClient(cfg OAuthConfig) *OAuthClient {
	return &OAuthClient{
		config: cfg,
		client: &http.Client{Timeout: 10 * time.Second},
	}
}
// GetToken returns the cached token while it is valid and refreshes it otherwise.
func (o *OAuthClient) GetToken() (string, error) {
	o.mu.RLock()
	if o.token != "" && time.Now().Before(o.expiresAt) {
		token := o.token
		o.mu.RUnlock()
		return token, nil
	}
	o.mu.RUnlock()
	return o.refreshToken()
}
func (o *OAuthClient) refreshToken() (string, error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	// Another goroutine may have refreshed the token while this one waited for the lock.
	if o.token != "" && time.Now().Before(o.expiresAt) {
		return o.token, nil
	}
	// OAuth 2.0 token requests use a form-encoded body (RFC 6749, section 4.4.2).
	form := url.Values{}
	form.Set("grant_type", "client_credentials")
	form.Set("client_id", o.config.ClientID)
	form.Set("client_secret", o.config.ClientSecret)
	form.Set("scope", "llm:stream bot:dialog:write api:access")
	endpoint := fmt.Sprintf("https://%s/api/v2/oauth/token", o.config.EnvURL)
	req, err := http.NewRequest(http.MethodPost, endpoint, strings.NewReader(form.Encode()))
	if err != nil {
		return "", fmt.Errorf("failed to create oauth request: %w", err)
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	resp, err := o.client.Do(req)
	if err != nil {
		return "", fmt.Errorf("oauth request failed: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("oauth error: status %d", resp.StatusCode)
	}
	var tokenResp TokenResponse
	if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
		return "", fmt.Errorf("failed to decode oauth response: %w", err)
	}
	lifetime := time.Duration(tokenResp.ExpiresIn) * time.Second
	buffer := 60 * time.Second
	if lifetime <= 2*buffer {
		buffer = lifetime / 2 // very short-lived token: refresh at half-life instead
	}
	o.token = tokenResp.AccessToken
	o.expiresAt = time.Now().Add(lifetime - buffer)
	return o.token, nil
}
The refreshToken method subtracts sixty seconds from the expires_in value as a safety buffer. This keeps the client from presenting a token that is about to expire, for example during a WebSocket handshake or a dialog variable sync request.
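When many goroutines find an expired token at the same moment, each of them can end up requesting a new one. A common remedy is double-checked locking: re-read the cache after acquiring the write lock, so only the first goroutine performs the refresh. The sketch below demonstrates this with a hypothetical tokenCache type whose fetch function stands in for the HTTP token request; it is an illustration of the pattern, not part of the Cognigy.AI API.

```go
package main

import (
	"sync"
	"sync/atomic"
	"time"
)

// tokenCache is a hypothetical, simplified stand-in for OAuthClient.
// fetch replaces the HTTP token request and returns a token plus its lifetime.
type tokenCache struct {
	mu        sync.RWMutex
	token     string
	expiresAt time.Time
	fetch     func() (string, time.Duration, error)
}

func (c *tokenCache) Get() (string, error) {
	c.mu.RLock()
	if c.token != "" && time.Now().Before(c.expiresAt) {
		t := c.token
		c.mu.RUnlock()
		return t, nil
	}
	c.mu.RUnlock()

	c.mu.Lock()
	defer c.mu.Unlock()
	// Double check: another goroutine may have refreshed while we waited for the lock.
	if c.token != "" && time.Now().Before(c.expiresAt) {
		return c.token, nil
	}
	tok, lifetime, err := c.fetch()
	if err != nil {
		return "", err
	}
	c.token = tok
	c.expiresAt = time.Now().Add(lifetime - 60*time.Second) // same 60-second safety buffer
	return c.token, nil
}

// concurrentGets runs n goroutines against an empty cache and reports how many
// times fetch was called, plus the token each goroutine received.
func concurrentGets(n int) (int64, []string) {
	var calls int64
	c := &tokenCache{fetch: func() (string, time.Duration, error) {
		atomic.AddInt64(&calls, 1)
		time.Sleep(10 * time.Millisecond) // simulate network latency
		return "tok-1", time.Hour, nil
	}}
	tokens := make([]string, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			tokens[i], _ = c.Get()
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt64(&calls), tokens
}
```

With the re-check in place, fifty concurrent callers trigger a single fetch. The golang.org/x/sync/singleflight package offers another way to collapse duplicate refreshes.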
Implementation
Step 1: Configure LLM Gateway Endpoint & Initialize Stream
Before establishing the WebSocket connection, enable streaming on the LLM gateway. Cognigy.AI exposes a REST endpoint to toggle the streaming configuration per environment or skill; a successful request returns 200 OK with the updated configuration object.
func configureStreamingGateway(oauth *OAuthClient, dialogID string) error {
	token, err := oauth.GetToken()
	if err != nil {
		return fmt.Errorf("authentication failed: %w", err)
	}
	payload := map[string]interface{}{
		"streaming":   true,
		"model":       "gpt-4-turbo",
		"temperature": 0.7,
		"max_tokens":  1024,
		"dialog_id":   dialogID,
	}
	body, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal failed: %w", err)
	}
	endpoint := fmt.Sprintf("https://%s/api/v2/llm/gateway/config", oauth.config.EnvURL)
	req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewBuffer(body))
	if err != nil {
		return fmt.Errorf("request creation failed: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Content-Type", "application/json")
	resp, err := oauth.client.Do(req)
	if err != nil {
		return fmt.Errorf("gateway config request failed: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("gateway config error: status %d", resp.StatusCode)
	}
	return nil
}
This request requires the llm:stream and api:access scopes. A 403 Forbidden response indicates missing scopes or insufficient environment permissions.
Step 2: Establish WebSocket Connection & Handle Real-Time Token Delivery
The Cognigy.AI streaming gateway accepts WebSocket connections at wss://{environment}.cognigy.ai/api/v2/llm/stream. You must pass the Bearer token in the Authorization header during the handshake. The server pushes JSON messages containing token arrays.
import "github.com/gorilla/websocket"
type StreamMessage struct {
	Tokens []string      `json:"tokens"`
	Done   bool          `json:"done"`
	Usage  *UsageMetrics `json:"usage,omitempty"`
	Error  string        `json:"error,omitempty"`
}
type UsageMetrics struct {
	PromptTokens     int `json:"prompt_tokens"`
	CompletionTokens int `json:"completion_tokens"`
	TotalTokens      int `json:"total_tokens"`
}
// connectToStream reads the stream until it completes. Run it in its own goroutine.
// It closes tokenChan when it returns and sends exactly one value on either doneChan
// (success) or errChan (failure); both channels should be buffered with capacity 1.
func connectToStream(oauth *OAuthClient, dialer *websocket.Dialer, tokenChan chan<- string, doneChan chan<- bool, errChan chan<- error) {
	// Closing tokenChan on every exit path lets consumers detect the end of the stream.
	defer close(tokenChan)
	token, err := oauth.GetToken()
	if err != nil {
		errChan <- fmt.Errorf("auth failed before connect: %w", err)
		return
	}
	endpoint := fmt.Sprintf("wss://%s/api/v2/llm/stream", oauth.config.EnvURL)
	headers := http.Header{}
	headers.Set("Authorization", "Bearer "+token)
	headers.Set("Accept", "application/json")
	conn, resp, err := dialer.Dial(endpoint, headers)
	if err != nil {
		// resp is nil when the dial fails before any HTTP response (DNS, TLS, timeout).
		status := 0
		if resp != nil {
			status = resp.StatusCode
		}
		errChan <- fmt.Errorf("websocket dial failed: %w (http status: %d)", err, status)
		return
	}
	// Read on this goroutine so the connection stays open until the stream ends.
	defer conn.Close()
	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
				errChan <- fmt.Errorf("websocket read error: %w", err)
			} else {
				errChan <- fmt.Errorf("stream closed before completion: %w", err)
			}
			return
		}
		var streamMsg StreamMessage
		if err := json.Unmarshal(msg, &streamMsg); err != nil {
			errChan <- fmt.Errorf("unmarshal error: %w", err)
			return
		}
		if streamMsg.Error != "" {
			errChan <- fmt.Errorf("stream server error: %s", streamMsg.Error)
			return
		}
		for _, t := range streamMsg.Tokens {
			tokenChan <- t
		}
		if streamMsg.Done {
			doneChan <- true
			return
		}
	}
}
The connectToStream function is meant to run in its own goroutine. It reads messages until the server marks the response as done, unmarshals each JSON payload, and forwards individual tokens to tokenChan. It sends true on doneChan when the stream completes and an error on errChan if the dial, a read, or decoding fails; websocket.IsUnexpectedCloseError separates abnormal close codes from graceful shutdowns in the error message. tokenChan is closed on every exit path, so consumers can range over it.
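Because the read loop depends only on the message format, its decoding rules can be checked without a live connection. The sketch below applies the same StreamMessage rules to a slice of raw frames: tokens are collected in order, a frame with done set to true ends the stream, and an error field aborts it. The decodeFrames helper is hypothetical, the Usage field is omitted for brevity, and the frame contents are invented test data rather than captured gateway traffic.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type StreamMessage struct {
	Tokens []string `json:"tokens"`
	Done   bool     `json:"done"`
	Error  string   `json:"error,omitempty"`
}

// decodeFrames mirrors the read loop of connectToStream for a fixed list of frames.
// It returns the tokens seen so far and an error if the stream fails or ends
// without a done frame.
func decodeFrames(frames [][]byte) ([]string, error) {
	var tokens []string
	for _, frame := range frames {
		var msg StreamMessage
		if err := json.Unmarshal(frame, &msg); err != nil {
			return tokens, fmt.Errorf("unmarshal error: %w", err)
		}
		if msg.Error != "" {
			return tokens, fmt.Errorf("stream server error: %s", msg.Error)
		}
		tokens = append(tokens, msg.Tokens...)
		if msg.Done {
			return tokens, nil
		}
	}
	return tokens, errors.New("stream ended before completion")
}
```

Note that the leading space in a token such as " world" is part of the token and must survive any later filtering.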
Step 3: Token Fragmentation, Reassembly & Client-Side Filtering
LLM tokens are subword units, so a single word, a number, or a markdown marker such as ** can be split across several tokens. You must reassemble tokens into coherent text and apply safety filters before exposing the output.
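import (
	"context"
	"regexp"
	"strings"
)
var (
	// Illustrative PII pattern: 11 digits grouped 3-4-4 with optional separators.
	// It is not a credit card check; replace it with validated patterns for your data.
	piiRegex = regexp.MustCompile(`\b\d{3}[-.]?\d{4}[-.]?\d{4}\b`)
	// Compiled once at package level instead of on every call.
	markdownRegex   = regexp.MustCompile("[*_~`]")
	whitespaceRegex = regexp.MustCompile(`\s+`)
	profanityList   = []string{"badword1", "badword2"} // Replace with an actual safety dictionary
)
// processTokenStream filters each token and returns the accumulated text when
// tokenChan is closed, or the partial text and ctx.Err() if ctx ends first.
func processTokenStream(ctx context.Context, tokenChan <-chan string) (string, error) {
	var buffer strings.Builder
	for {
		select {
		case <-ctx.Done():
			return buffer.String(), ctx.Err()
		case token, ok := <-tokenChan:
			if !ok {
				return buffer.String(), nil
			}
			buffer.WriteString(applySafetyFilter(token))
		}
	}
}
func applySafetyFilter(token string) string {
	// Strip markdown emphasis and code markers.
	cleaned := markdownRegex.ReplaceAllString(token, "")
	// Collapse whitespace runs to one space. Unlike strings.Fields, this keeps the
	// leading space that separates a token from the previous word.
	cleaned = whitespaceRegex.ReplaceAllString(cleaned, " ")
	// Check PII
	if piiRegex.MatchString(cleaned) {
		return "[REDACTED]"
	}
	// Check profanity (case-insensitive)
	lower := strings.ToLower(cleaned)
	for _, word := range profanityList {
		if strings.Contains(lower, word) {
			return "[FILTERED]"
		}
	}
	return cleaned
}
The applySafetyFilter function strips markdown markers, collapses whitespace, and replaces tokens that match the PII pattern or the profanity dictionary. Go's strings and regexp packages operate on UTF-8, so multi-byte characters pass through unchanged. Because the filter sees one token at a time, a pattern split across tokens (for example, a number streamed in three pieces) is not detected; buffer text up to a word boundary before filtering if that matters for your use case. Adjust the regex and dictionary to your compliance requirements.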
Step 4: Dialog Variable Sync & Fallback Logic
After the stream completes or fails, you must synchronize the final output with Cognigy.AI dialog variables and implement fallback behavior for timeouts or 429 Too Many Requests responses.
func syncDialogVariables(oauth *OAuthClient, dialogID string, output string, usage *UsageMetrics) error {
	if usage == nil {
		// The stream may end without usage data; record zeros instead of panicking.
		usage = &UsageMetrics{}
	}
	payload := map[string]interface{}{
		"variables": []map[string]interface{}{
			{"name": "llm_response", "value": output},
			{"name": "llm_prompt_tokens", "value": usage.PromptTokens},
			{"name": "llm_completion_tokens", "value": usage.CompletionTokens},
			{"name": "llm_total_tokens", "value": usage.TotalTokens},
		},
	}
	body, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal sync payload failed: %w", err)
	}
	endpoint := fmt.Sprintf("https://%s/api/v2/bot/dialog/%s/variables", oauth.config.EnvURL, dialogID)
	const maxAttempts = 4
	backoff := 2 * time.Second
	for attempt := 1; ; attempt++ {
		token, err := oauth.GetToken()
		if err != nil {
			return fmt.Errorf("auth failed for sync: %w", err)
		}
		// Build a new request per attempt because Do consumes the body reader.
		req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(body))
		if err != nil {
			return fmt.Errorf("request creation failed: %w", err)
		}
		req.Header.Set("Authorization", "Bearer "+token)
		req.Header.Set("Content-Type", "application/json")
		resp, err := oauth.client.Do(req)
		if err != nil {
			return fmt.Errorf("sync request failed: %w", err)
		}
		resp.Body.Close()
		switch {
		case resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusCreated:
			return nil
		case resp.StatusCode == http.StatusTooManyRequests && attempt < maxAttempts:
			// Honor Retry-After (in seconds) when present; otherwise back off exponentially.
			wait := backoff
			if secs, convErr := strconv.Atoi(resp.Header.Get("Retry-After")); convErr == nil && secs > 0 {
				wait = time.Duration(secs) * time.Second
			}
			time.Sleep(wait)
			backoff *= 2
		default:
			return fmt.Errorf("sync failed: status %d", resp.StatusCode)
		}
	}
}
func handleFallback(err error) string {
	if err == nil {
		return ""
	}
	// Prefer typed checks; fall back to message matching for errors that only carry text.
	var netErr net.Error
	errStr := err.Error()
	if errors.Is(err, context.DeadlineExceeded) || (errors.As(err, &netErr) && netErr.Timeout()) || strings.Contains(errStr, "timeout") {
		return "I am experiencing high latency. Please try again in a moment."
	}
	if strings.Contains(errStr, "429") || strings.Contains(errStr, "rate limit") {
		return "Service is temporarily overloaded. Please retry shortly."
	}
	return "An unexpected error occurred. Please contact support."
}
The syncDialogVariables function posts the final text and usage metrics to the dialog session. On 429 it makes up to four attempts in total, waiting for the Retry-After value when the server sends one and otherwise doubling a two-second delay between attempts. The handleFallback function maps timeouts and rate-limit errors to user-facing messages.
Step 5: Token Usage Tracking & Frontend Handler
You must track token consumption per session for cost monitoring and expose a streaming endpoint that frontend applications can consume. The following HTTP handler responds with a Server-Sent Events (SSE) stream (Content-Type: text/event-stream) and forwards tokens from the WebSocket stream as they arrive.
type SessionTracker struct {
	mu       sync.Mutex
	sessions map[string]*UsageMetrics
}
func NewSessionTracker() *SessionTracker {
	return &SessionTracker{sessions: make(map[string]*UsageMetrics)}
}
// Record adds usage to the running totals for a session.
func (st *SessionTracker) Record(sessionID string, usage *UsageMetrics) {
	st.mu.Lock()
	defer st.mu.Unlock()
	if existing, exists := st.sessions[sessionID]; exists {
		existing.PromptTokens += usage.PromptTokens
		existing.CompletionTokens += usage.CompletionTokens
		existing.TotalTokens += usage.TotalTokens
	} else {
		st.sessions[sessionID] = &UsageMetrics{
			PromptTokens:     usage.PromptTokens,
			CompletionTokens: usage.CompletionTokens,
			TotalTokens:      usage.TotalTokens,
		}
	}
}
func streamToFrontendHandler(oauth *OAuthClient, tracker *SessionTracker) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		sessionID := r.URL.Query().Get("session_id")
		dialogID := r.URL.Query().Get("dialog_id")
		if sessionID == "" || dialogID == "" {
			http.Error(w, "missing session_id or dialog_id", http.StatusBadRequest)
			return
		}
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming not supported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")
		ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
		defer cancel()
		tokenChan := make(chan string, 64)
		doneChan := make(chan bool, 1)
		errChan := make(chan error, 1)
		dialer := websocket.Dialer{HandshakeTimeout: 10 * time.Second}
		go connectToStream(oauth, &dialer, tokenChan, doneChan, errChan)
		var finalOutput strings.Builder
		for {
			select {
			case <-ctx.Done():
				// Keep draining so the producer goroutine never blocks on a full channel.
				go func() {
					for range tokenChan {
					}
				}()
				writeSSE(w, flusher, "error", handleFallback(ctx.Err()))
				return
			case token, open := <-tokenChan:
				if open {
					writeSSE(w, flusher, "token", token)
					finalOutput.WriteString(token)
					continue
				}
				// tokenChan closes only after connectToStream has signalled done or an error,
				// so every buffered token has already been forwarded at this point.
				select {
				case err := <-errChan:
					writeSSE(w, flusher, "error", handleFallback(err))
				case <-doneChan:
					// Placeholder estimate (about four bytes per token). In production, use the
					// usage object from the final stream message instead.
					estimate := finalOutput.Len() / 4
					usage := &UsageMetrics{CompletionTokens: estimate, TotalTokens: estimate}
					tracker.Record(sessionID, usage)
					writeSSE(w, flusher, "done", finalOutput.String())
					if err := syncDialogVariables(oauth, dialogID, finalOutput.String(), usage); err != nil {
						log.Printf("dialog variable sync failed for %s: %v", dialogID, err)
					}
				default:
					writeSSE(w, flusher, "error", handleFallback(errors.New("stream ended without a completion signal")))
				}
				return
			}
		}
	}
}
// writeSSE writes one event. Each line of data gets its own "data:" field, because a
// raw newline inside a single field would end it early and corrupt the event.
func writeSSE(w http.ResponseWriter, flusher http.Flusher, event string, data string) {
	data = strings.NewReplacer("\r\n", "\n", "\r", "\n").Replace(data)
	fmt.Fprintf(w, "event: %s\n", event)
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(w, "data: %s\n", line)
	}
	fmt.Fprint(w, "\n")
	flusher.Flush()
}
The frontend handler requires the session_id and dialog_id query parameters. It forwards tokens as SSE token events, flushes after every event to keep delivery real time, records an estimated usage figure in the thread-safe tracker, and syncs the final text to dialog variables once the stream completes. Frontend applications can consume this endpoint with the standard EventSource API; because EventSource reconnects automatically when a response ends, the frontend should call close() after it receives the done or error event.
Complete Working Example
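The following file combines the authentication, streaming, filtering, fallback, and usage-tracking components into a single runnable Go service. The gateway configuration call (Step 1) and the dialog variable sync (Step 4) are omitted for brevity. Replace the placeholder credentials before execution.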
package main
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net"
	"net/http"
	"net/url"
	"regexp"
	"strings"
	"sync"
	"time"
	"github.com/gorilla/websocket"
)
type OAuthConfig struct {
	EnvURL       string
	ClientID     string
	ClientSecret string
}
type TokenResponse struct {
	AccessToken string `json:"access_token"`
	ExpiresIn   int    `json:"expires_in"`
	TokenType   string `json:"token_type"`
}
type OAuthClient struct {
	config    OAuthConfig
	token     string
	expiresAt time.Time
	mu        sync.RWMutex
	client    *http.Client
}
type StreamMessage struct {
	Tokens []string      `json:"tokens"`
	Done   bool          `json:"done"`
	Usage  *UsageMetrics `json:"usage,omitempty"`
	Error  string        `json:"error,omitempty"`
}
type UsageMetrics struct {
	PromptTokens     int `json:"prompt_tokens"`
	CompletionTokens int `json:"completion_tokens"`
	TotalTokens      int `json:"total_tokens"`
}
type SessionTracker struct {
	mu       sync.Mutex
	sessions map[string]*UsageMetrics
}
var (
	// Illustrative PII pattern (11 digits grouped 3-4-4); replace with your own rules.
	piiRegex        = regexp.MustCompile(`\b\d{3}[-.]?\d{4}[-.]?\d{4}\b`)
	markdownRegex   = regexp.MustCompile("[*_~`]")
	whitespaceRegex = regexp.MustCompile(`\s+`)
	profanityList   = []string{"badword1", "badword2"}
)
func NewOAuthClient(cfg OAuthConfig) *OAuthClient {
	return &OAuthClient{
		config: cfg,
		client: &http.Client{Timeout: 10 * time.Second},
	}
}
func (o *OAuthClient) GetToken() (string, error) {
	o.mu.RLock()
	if o.token != "" && time.Now().Before(o.expiresAt) {
		token := o.token
		o.mu.RUnlock()
		return token, nil
	}
	o.mu.RUnlock()
	return o.refreshToken()
}
func (o *OAuthClient) refreshToken() (string, error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	// Another goroutine may have refreshed the token while this one waited.
	if o.token != "" && time.Now().Before(o.expiresAt) {
		return o.token, nil
	}
	// OAuth 2.0 token requests use a form-encoded body (RFC 6749, section 4.4.2).
	form := url.Values{}
	form.Set("grant_type", "client_credentials")
	form.Set("client_id", o.config.ClientID)
	form.Set("client_secret", o.config.ClientSecret)
	form.Set("scope", "llm:stream bot:dialog:write api:access")
	endpoint := fmt.Sprintf("https://%s/api/v2/oauth/token", o.config.EnvURL)
	req, err := http.NewRequest(http.MethodPost, endpoint, strings.NewReader(form.Encode()))
	if err != nil {
		return "", fmt.Errorf("create oauth request: %w", err)
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	resp, err := o.client.Do(req)
	if err != nil {
		return "", fmt.Errorf("oauth request: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("oauth error: status %d", resp.StatusCode)
	}
	var tokenResp TokenResponse
	if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
		return "", fmt.Errorf("decode oauth response: %w", err)
	}
	lifetime := time.Duration(tokenResp.ExpiresIn) * time.Second
	buffer := 60 * time.Second
	if lifetime <= 2*buffer {
		buffer = lifetime / 2 // very short-lived token: refresh at half-life instead
	}
	o.token = tokenResp.AccessToken
	o.expiresAt = time.Now().Add(lifetime - buffer)
	return o.token, nil
}
func NewSessionTracker() *SessionTracker {
	return &SessionTracker{sessions: make(map[string]*UsageMetrics)}
}
func (st *SessionTracker) Record(sessionID string, usage *UsageMetrics) {
	st.mu.Lock()
	defer st.mu.Unlock()
	if existing, exists := st.sessions[sessionID]; exists {
		existing.PromptTokens += usage.PromptTokens
		existing.CompletionTokens += usage.CompletionTokens
		existing.TotalTokens += usage.TotalTokens
	} else {
		st.sessions[sessionID] = &UsageMetrics{
			PromptTokens:     usage.PromptTokens,
			CompletionTokens: usage.CompletionTokens,
			TotalTokens:      usage.TotalTokens,
		}
	}
}
// connectToStream runs in its own goroutine. It closes tokenChan when it returns and
// sends exactly one value on doneChan (success) or errChan (failure).
func connectToStream(oauth *OAuthClient, tokenChan chan<- string, doneChan chan<- bool, errChan chan<- error) {
	defer close(tokenChan)
	token, err := oauth.GetToken()
	if err != nil {
		errChan <- fmt.Errorf("auth failed before connect: %w", err)
		return
	}
	endpoint := fmt.Sprintf("wss://%s/api/v2/llm/stream", oauth.config.EnvURL)
	headers := http.Header{}
	headers.Set("Authorization", "Bearer "+token)
	headers.Set("Accept", "application/json")
	conn, resp, err := websocket.DefaultDialer.Dial(endpoint, headers)
	if err != nil {
		status := 0 // resp is nil when no HTTP response was received
		if resp != nil {
			status = resp.StatusCode
		}
		errChan <- fmt.Errorf("websocket dial failed: %w (http status: %d)", err, status)
		return
	}
	defer conn.Close()
	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
				errChan <- fmt.Errorf("websocket read error: %w", err)
			} else {
				errChan <- fmt.Errorf("stream closed before completion: %w", err)
			}
			return
		}
		var streamMsg StreamMessage
		if err := json.Unmarshal(msg, &streamMsg); err != nil {
			errChan <- fmt.Errorf("unmarshal error: %w", err)
			return
		}
		if streamMsg.Error != "" {
			errChan <- fmt.Errorf("stream server error: %s", streamMsg.Error)
			return
		}
		for _, t := range streamMsg.Tokens {
			tokenChan <- t
		}
		if streamMsg.Done {
			doneChan <- true
			return
		}
	}
}
func applySafetyFilter(token string) string {
	cleaned := markdownRegex.ReplaceAllString(token, "")
	// Collapse whitespace but keep the leading space that separates words.
	cleaned = whitespaceRegex.ReplaceAllString(cleaned, " ")
	if piiRegex.MatchString(cleaned) {
		return "[REDACTED]"
	}
	lower := strings.ToLower(cleaned)
	for _, word := range profanityList {
		if strings.Contains(lower, word) {
			return "[FILTERED]"
		}
	}
	return cleaned
}
func handleFallback(err error) string {
	if err == nil {
		return ""
	}
	var netErr net.Error
	errStr := err.Error()
	if errors.Is(err, context.DeadlineExceeded) || (errors.As(err, &netErr) && netErr.Timeout()) || strings.Contains(errStr, "timeout") {
		return "I am experiencing high latency. Please try again in a moment."
	}
	if strings.Contains(errStr, "429") || strings.Contains(errStr, "rate limit") {
		return "Service is temporarily overloaded. Please retry shortly."
	}
	return "An unexpected error occurred. Please contact support."
}
func streamToFrontendHandler(oauth *OAuthClient, tracker *SessionTracker) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		sessionID := r.URL.Query().Get("session_id")
		if sessionID == "" {
			http.Error(w, "missing session_id", http.StatusBadRequest)
			return
		}
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming not supported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")
		ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
		defer cancel()
		tokenChan := make(chan string, 64)
		doneChan := make(chan bool, 1)
		errChan := make(chan error, 1)
		go connectToStream(oauth, tokenChan, doneChan, errChan)
		var finalOutput strings.Builder
		for {
			select {
			case <-ctx.Done():
				// Drain in the background so connectToStream never blocks on a full channel.
				go func() {
					for range tokenChan {
					}
				}()
				writeSSE(w, flusher, "error", handleFallback(ctx.Err()))
				return
			case token, open := <-tokenChan:
				if open {
					filtered := applySafetyFilter(token)
					if filtered != "" {
						writeSSE(w, flusher, "token", filtered)
						finalOutput.WriteString(filtered)
					}
					continue
				}
				// connectToStream signals done or an error before it closes tokenChan.
				select {
				case err := <-errChan:
					writeSSE(w, flusher, "error", handleFallback(err))
				case <-doneChan:
					// Rough estimate (about four bytes per token) in place of real usage data.
					estimate := finalOutput.Len() / 4
					tracker.Record(sessionID, &UsageMetrics{CompletionTokens: estimate, TotalTokens: estimate})
					writeSSE(w, flusher, "done", finalOutput.String())
				default:
					writeSSE(w, flusher, "error", handleFallback(errors.New("stream ended without a completion signal")))
				}
				return
			}
		}
	}
}
// writeSSE emits one event, splitting multi-line data into separate data fields.
func writeSSE(w http.ResponseWriter, flusher http.Flusher, event string, data string) {
	data = strings.NewReplacer("\r\n", "\n", "\r", "\n").Replace(data)
	fmt.Fprintf(w, "event: %s\n", event)
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(w, "data: %s\n", line)
	}
	fmt.Fprint(w, "\n")
	flusher.Flush()
}
func main() {
	cfg := OAuthConfig{
		EnvURL:       "your-env.cognigy.ai",
		ClientID:     "your-client-id",
		ClientSecret: "your-client-secret",
	}
	oauth := NewOAuthClient(cfg)
	tracker := NewSessionTracker()
	http.HandleFunc("/stream", streamToFrontendHandler(oauth, tracker))
	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		fmt.Fprint(w, "OK")
	})
	fmt.Println("Server listening on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		fmt.Printf("Server failed: %v\n", err)
	}
}
Common Errors & Debugging
Error: 401 Unauthorized or 403 Forbidden
This occurs when the OAuth token has expired, lacks a required scope, or was issued for incorrect client credentials. Verify that the scope parameter in the token request includes llm:stream, bot:dialog:write, and api:access. Check the expires_in value and confirm that the client refreshes the token before it expires.
Error: 429 Too Many Requests
Cognigy.AI enforces rate limits per environment and per API key. The syncDialogVariables function retries requests that receive a 429 response. For production workloads, retry with exponential backoff plus random jitter, and consider throttling outbound requests with golang.org/x/time/rate (a token-bucket rate limiter) or a dedicated circuit breaker library.
Error: websocket: close 1006 (abnormal closure)
Close code 1006 means the connection dropped without a WebSocket close frame, which usually indicates a network interruption or an abrupt server-side termination. The connectToStream function checks websocket.IsUnexpectedCloseError to distinguish between graceful shutdowns and failures. Implement reconnection logic with a maximum retry count to prevent infinite loops.
Error: Token fragmentation produces garbled text
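LLM tokens are subword units, so a word, or occasionally a multi-byte UTF-8 character, can be split across tokens. When tokens arrive as JSON strings, as in StreamMessage, encoding/json replaces invalid UTF-8 with U+FFFD, so a character split by the server shows up as replacement characters rather than corrupted bytes. The applySafetyFilter function normalizes whitespace and strips markdown, but it does not repair split characters, and you must ensure the frontend renderer handles incremental UTF-8 decoding. If the gateway streams raw bytes instead of JSON strings, hold back any incomplete trailing UTF-8 sequence until the next chunk arrives.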
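For raw byte streams, the hold-back step can be sketched with the standard unicode/utf8 package: find where the last character starts (at most utf8.UTFMax bytes from the end), and if utf8.FullRune reports it incomplete, keep those bytes for the next chunk. The utf8Joiner type is hypothetical and only illustrates the technique.

```go
package main

import "unicode/utf8"

// utf8Joiner is a hypothetical helper that reassembles UTF-8 text from byte chunks.
type utf8Joiner struct {
	carry []byte
}

// Write appends a chunk and returns the longest prefix that ends on a complete
// character, keeping an incomplete trailing sequence for the next call.
func (j *utf8Joiner) Write(chunk []byte) string {
	buf := append(j.carry, chunk...)
	cut := len(buf)
	// A UTF-8 character is at most utf8.UTFMax bytes, so its first byte is near the end.
	for i := len(buf) - 1; i >= 0 && i >= len(buf)-utf8.UTFMax; i-- {
		if utf8.RuneStart(buf[i]) {
			if !utf8.FullRune(buf[i:]) {
				cut = i // the last character is incomplete; hold it back
			}
			break
		}
	}
	j.carry = append([]byte(nil), buf[cut:]...)
	return string(buf[:cut])
}
```

Splitting "price: 5€" inside the three-byte euro sign yields "price: 5" from the first Write and "€" from the second, so the concatenated output is intact.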