Streaming NICE Cognigy.AI LLM Responses via Go

What You Will Build

A Go backend service that establishes a WebSocket connection to the Cognigy.AI LLM streaming gateway, receives real-time tokens, reassembles fragmented payloads, applies safety and formatting filters, synchronizes output with dialog flow variables, implements fallback logic for timeouts and rate limits, tracks token usage per session, and exposes a frontend-compatible streaming endpoint. This tutorial uses the Cognigy.AI Public API and WebSocket streaming interface. The implementation is written in Go 1.21.

Prerequisites

  • Cognigy.AI environment URL (format: {environment}.cognigy.ai)
  • OAuth 2.0 Client Credentials (Client ID, Client Secret)
  • Required OAuth scopes: llm:stream, bot:dialog:write, api:access
  • Go 1.21 or later
  • External dependencies: github.com/gorilla/websocket, golang.org/x/time/rate
  • Active Cognigy.AI LLM skill with streaming enabled in the bot configuration

Authentication Setup

Cognigy.AI secures all API and WebSocket endpoints using OAuth 2.0 Bearer tokens. The client credentials flow issues a token that expires after one hour. You must cache the token and refresh it before expiration to maintain uninterrupted streaming sessions.

The following struct manages token lifecycle with automatic expiration tracking and mutex protection for concurrent requests.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"
)

type OAuthConfig struct {
	EnvURL       string
	ClientID     string
	ClientSecret string
}

type TokenResponse struct {
	AccessToken string `json:"access_token"`
	ExpiresIn   int    `json:"expires_in"`
	TokenType   string `json:"token_type"`
}

type OAuthClient struct {
	config    OAuthConfig
	token     string
	expiresAt time.Time
	mu        sync.RWMutex
	client    *http.Client
}

func NewOAuthClient(cfg OAuthConfig) *OAuthClient {
	return &OAuthClient{
		config: cfg,
		client: &http.Client{Timeout: 10 * time.Second},
	}
}

func (o *OAuthClient) GetToken() (string, error) {
	o.mu.RLock()
	if time.Now().Before(o.expiresAt) && o.token != "" {
		token := o.token
		o.mu.RUnlock()
		return token, nil
	}
	o.mu.RUnlock()

	return o.refreshToken()
}

func (o *OAuthClient) refreshToken() (string, error) {
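	o.mu.Lock()
	defer o.mu.Unlock()

	// Another goroutine may have refreshed the token while this one waited
	// for the write lock, so check again before calling the token endpoint.
	if o.token != "" && time.Now().Before(o.expiresAt) {
		return o.token, nil
	}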

	payload := map[string]string{
		"grant_type":    "client_credentials",
		"client_id":     o.config.ClientID,
		"client_secret": o.config.ClientSecret,
		"scope":         "llm:stream bot:dialog:write api:access",
	}

	body, err := json.Marshal(payload)
	if err != nil {
		return "", fmt.Errorf("failed to marshal oauth payload: %w", err)
	}

	req, err := http.NewRequest("POST", fmt.Sprintf("https://%s/api/v2/oauth/token", o.config.EnvURL), bytes.NewBuffer(body))
	if err != nil {
		return "", fmt.Errorf("failed to create oauth request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := o.client.Do(req)
	if err != nil {
		return "", fmt.Errorf("oauth request failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("oauth error: status %d", resp.StatusCode)
	}

	var tokenResp TokenResponse
	if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
		return "", fmt.Errorf("failed to decode oauth response: %w", err)
	}

	o.token = tokenResp.AccessToken
	o.expiresAt = time.Now().Add(time.Duration(tokenResp.ExpiresIn-60) * time.Second)
	return o.token, nil
}

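The refreshToken method subtracts sixty seconds from the expires_in value to create a safety buffer. The buffer reduces the chance that a token passes the freshness check in GetToken and then expires before the request or WebSocket handshake that uses it completes.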
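The safety buffer assumes that expires_in is comfortably larger than sixty seconds. The following is a minimal sketch of a more defensive calculation, not part of the Cognigy.AI API: usableLifetime is a hypothetical helper that caps the buffer at half the token lifetime so that a short-lived token never yields an expiry in the past.

```go
package main

import (
	"fmt"
	"time"
)

// usableLifetime returns how long a freshly issued token should be cached.
// The buffer is capped at half the lifetime so the result is never negative.
// (Hypothetical helper; the name and the half-lifetime cap are assumptions.)
func usableLifetime(expiresInSec, bufferSec int) time.Duration {
	if expiresInSec <= 0 {
		return 0 // treat the token as already expired
	}
	if bufferSec > expiresInSec/2 {
		bufferSec = expiresInSec / 2
	}
	return time.Duration(expiresInSec-bufferSec) * time.Second
}

func main() {
	fmt.Println(usableLifetime(3600, 60)) // 59m0s
	fmt.Println(usableLifetime(30, 60))   // 15s
}
```

With this helper, the expiry assignment could become time.Now().Add(usableLifetime(tokenResp.ExpiresIn, 60)).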

Implementation

Step 1: Configure LLM Gateway Endpoint & Initialize Stream

Before establishing the WebSocket connection, verify that the LLM gateway accepts streaming payloads. Cognigy.AI exposes a REST endpoint to toggle streaming configuration per environment or skill. A successful response returns a 200 OK with the updated configuration object.

func configureStreamingGateway(oauth *OAuthClient, dialogID string) error {
	token, err := oauth.GetToken()
	if err != nil {
		return fmt.Errorf("authentication failed: %w", err)
	}

	payload := map[string]interface{}{
		"streaming":   true,
		"model":       "gpt-4-turbo",
		"temperature": 0.7,
		"max_tokens":  1024,
		"dialog_id":   dialogID,
	}

	body, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal failed: %w", err)
	}

	req, err := http.NewRequest("PUT", fmt.Sprintf("https://%s/api/v2/llm/gateway/config", oauth.config.EnvURL), bytes.NewBuffer(body))
	if err != nil {
		return fmt.Errorf("request creation failed: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Content-Type", "application/json")

	resp, err := oauth.client.Do(req)
	if err != nil {
		return fmt.Errorf("gateway config request failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("gateway config error: status %d", resp.StatusCode)
	}

	return nil
}

This request requires the llm:stream and api:access scopes. A 403 Forbidden response indicates missing scopes or insufficient environment permissions.

Step 2: Establish WebSocket Connection & Handle Real-Time Token Delivery

The Cognigy.AI streaming gateway accepts WebSocket connections at wss://{environment}.cognigy.ai/api/v2/llm/stream. You must pass the Bearer token in the Authorization header during the handshake. The server pushes JSON messages containing token arrays.

import "github.com/gorilla/websocket"

type StreamMessage struct {
	Tokens []string      `json:"tokens"`
	Done   bool          `json:"done"`
	Usage  *UsageMetrics `json:"usage,omitempty"`
	Error  string        `json:"error,omitempty"`
}

type UsageMetrics struct {
	PromptTokens     int `json:"prompt_tokens"`
	CompletionTokens int `json:"completion_tokens"`
	TotalTokens      int `json:"total_tokens"`
}

func connectToStream(oauth *OAuthClient, dialer *websocket.Dialer, tokenChan chan<- string, doneChan chan<- bool, errChan chan<- error) {
	// Closing tokenChan tells consumers that no further tokens will arrive.
	defer close(tokenChan)

	token, err := oauth.GetToken()
	if err != nil {
		errChan <- fmt.Errorf("auth failed before connect: %w", err)
		return
	}

	url := fmt.Sprintf("wss://%s/api/v2/llm/stream", oauth.config.EnvURL)
	headers := http.Header{}
	headers.Set("Authorization", "Bearer "+token)
	headers.Set("Accept", "application/json")

	conn, resp, err := dialer.Dial(url, headers)
	if err != nil {
		// resp is nil when the dial fails before any HTTP response arrives.
		status := 0
		if resp != nil {
			status = resp.StatusCode
		}
		errChan <- fmt.Errorf("websocket dial failed: %w (http status: %d)", err, status)
		return
	}
	// The connection stays open until the read loop below returns.
	defer conn.Close()

	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
				// Clean shutdown without a final done message.
				doneChan <- true
			} else {
				// Unexpected close codes and plain network errors.
				errChan <- fmt.Errorf("websocket read error: %w", err)
			}
			return
		}

		var streamMsg StreamMessage
		if err := json.Unmarshal(msg, &streamMsg); err != nil {
			errChan <- fmt.Errorf("unmarshal error: %w", err)
			return
		}

		if streamMsg.Error != "" {
			errChan <- fmt.Errorf("stream server error: %s", streamMsg.Error)
			return
		}

		for _, t := range streamMsg.Tokens {
			tokenChan <- t
		}

		if streamMsg.Done {
			doneChan <- true
			return
		}
	}
}

connectToStream blocks while it reads, so the caller starts it in its own goroutine. The read loop unmarshals each JSON payload and forwards individual tokens to tokenChan. It sends on doneChan when the server marks the stream complete or closes the connection cleanly, and sends on errChan for unexpected close codes and any other read failure. Closing tokenChan on return tells consumers that no more tokens will arrive.
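A send on tokenChan blocks once the channel buffer is full. If the consumer has already returned, for example after a timeout, that send never completes and the reader goroutine leaks together with its connection. One way to avoid this, sketched here under the assumption that a context.Context is threaded into the reader, is to select on the context for every send. sendToken is a hypothetical helper, not part of the code above.

```go
package main

import (
	"context"
	"fmt"
)

// sendToken delivers one token to ch unless ctx is cancelled first.
// It reports whether the token was delivered.
func sendToken(ctx context.Context, ch chan<- string, token string) bool {
	select {
	case ch <- token:
		return true
	case <-ctx.Done():
		return false
	}
}

// demo sends once into a channel with free buffer space, then once more
// after the buffer is full and the context has been cancelled.
func demo() (first, second bool) {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan string, 1)

	first = sendToken(ctx, ch, "Hello")
	cancel()
	second = sendToken(ctx, ch, "world")
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first, second) // true false
}
```

The read loop would stop as soon as sendToken returns false, which lets the deferred connection close run.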

Step 3: Token Fragmentation, Reassembly & Client-Side Filtering

LLM output arrives as sub-word tokens, so a single word, number, or markdown marker can be split across several messages. You must reassemble tokens into coherent text and apply safety filters before exposing the output.

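import (
	"context"
	"regexp"
	"strings"
)

var (
	// Matches 11-digit numbers grouped 3-4-4, such as some phone number formats.
	// This is only a placeholder and does not match 16-digit card numbers.
	piiRegex      = regexp.MustCompile(`\b\d{3}[-.]?\d{4}[-.]?\d{4}\b`)
	profanityList = []string{"badword1", "badword2"} // Replace with actual safety dictionary
)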

func processTokenStream(ctx context.Context, tokenChan <-chan string) (string, error) {
	var buffer strings.Builder

	for {
		select {
		case <-ctx.Done():
			// Return the partial text so the caller can decide whether to use it.
			return buffer.String(), ctx.Err()
		case token, ok := <-tokenChan:
			if !ok {
				// Channel closed: the stream is complete.
				return buffer.String(), nil
			}

			// Filter each token, then append it to the reassembled text.
			if filtered := applySafetyFilter(token); filtered != "" {
				buffer.WriteString(filtered)
			}
		}
	}
}

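// Compiled once at package level instead of on every call.
var (
	markdownRegex   = regexp.MustCompile("[*_~`]")
	whitespaceRegex = regexp.MustCompile(`\s+`)
)

func applySafetyFilter(token string) string {
	// Strip markdown formatting characters
	cleaned := markdownRegex.ReplaceAllString(token, "")

	// Collapse whitespace runs to a single space. Leading and trailing
	// spaces are kept because they separate this token from its neighbors.
	cleaned = whitespaceRegex.ReplaceAllString(cleaned, " ")

	// Check PII
	if piiRegex.MatchString(cleaned) {
		return "[REDACTED]"
	}

	// Check profanity (case-insensitive)
	lower := strings.ToLower(cleaned)
	for _, word := range profanityList {
		if strings.Contains(lower, word) {
			return "[FILTERED]"
		}
	}

	return cleaned
}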

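Go strings and the regexp package operate on UTF-8 text, so these operations are safe for multi-byte characters. The applySafetyFilter function strips markdown, normalizes whitespace, and blocks patterns matching PII or profanity dictionaries. Because it inspects one token at a time, it cannot detect a pattern that spans several tokens. You must adjust the regex and dictionary for your compliance requirements.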
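Filtering token by token misses any pattern that is split across tokens, such as a number delivered in three pieces. A minimal sketch of one mitigation follows: hold back the trailing partial word and release text only up to the last whitespace character, so that a filter sees whole words. WordBuffer is a hypothetical type introduced for illustration and is not part of the code above.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
	"unicode/utf8"
)

// WordBuffer holds back the trailing partial word so that downstream
// filters only ever see complete, whitespace-delimited words.
type WordBuffer struct {
	pending string
}

// Push appends a token and returns the text up to and including the last
// whitespace character seen so far. The remainder stays buffered.
func (b *WordBuffer) Push(token string) string {
	b.pending += token
	cut := strings.LastIndexFunc(b.pending, unicode.IsSpace)
	if cut < 0 {
		return "" // no complete word yet
	}
	// Step past the whitespace rune, which may be multi-byte.
	_, size := utf8.DecodeRuneInString(b.pending[cut:])
	ready := b.pending[:cut+size]
	b.pending = b.pending[cut+size:]
	return ready
}

// Flush returns whatever is still buffered. Call it when the stream ends.
func (b *WordBuffer) Flush() string {
	rest := b.pending
	b.pending = ""
	return rest
}

func main() {
	var buf WordBuffer
	for _, tok := range []string{"Hel", "lo wor", "ld, 555", "-1234", "-5678 ok"} {
		fmt.Printf("%q\n", buf.Push(tok))
	}
	fmt.Printf("%q\n", buf.Flush())
}
```

Each non-empty chunk returned by Push or Flush can then be passed to applySafetyFilter. The trade-off is latency: the user sees text one word later than it arrives.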

Step 4: Dialog Variable Sync & Fallback Logic

After the stream completes or fails, you must synchronize the final output with Cognigy.AI dialog variables and implement fallback behavior for timeouts or 429 Too Many Requests responses.

func syncDialogVariables(oauth *OAuthClient, dialogID string, output string, usage *UsageMetrics) error {
	if usage == nil {
		// The stream may end without usage data; report zeros instead of dereferencing nil.
		usage = &UsageMetrics{}
	}

	payload := map[string]interface{}{
		"variables": []map[string]interface{}{
			{"name": "llm_response", "value": output},
			{"name": "llm_prompt_tokens", "value": usage.PromptTokens},
			{"name": "llm_completion_tokens", "value": usage.CompletionTokens},
			{"name": "llm_total_tokens", "value": usage.TotalTokens},
		},
	}

	body, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal sync payload failed: %w", err)
	}
	url := fmt.Sprintf("https://%s/api/v2/bot/dialog/%s/variables", oauth.config.EnvURL, dialogID)

	// Retry 429 responses with exponential backoff, up to maxAttempts requests.
	const maxAttempts = 4
	backoff := 2 * time.Second
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		token, err := oauth.GetToken()
		if err != nil {
			return fmt.Errorf("auth failed for sync: %w", err)
		}

		req, err := http.NewRequest("POST", url, bytes.NewReader(body))
		if err != nil {
			return fmt.Errorf("request creation failed: %w", err)
		}
		req.Header.Set("Authorization", "Bearer "+token)
		req.Header.Set("Content-Type", "application/json")

		resp, err := oauth.client.Do(req)
		if err != nil {
			return fmt.Errorf("sync request failed: %w", err)
		}
		status := resp.StatusCode
		resp.Body.Close() // close now; a defer inside the loop would pile up

		switch {
		case status == http.StatusOK || status == http.StatusCreated:
			return nil
		case status == http.StatusTooManyRequests && attempt < maxAttempts:
			time.Sleep(backoff)
			backoff *= 2
		default:
			return fmt.Errorf("sync failed: status %d", status)
		}
	}
	return fmt.Errorf("sync failed: retries exhausted")
}

func handleFallback(err error) string {
	if err == nil {
		return ""
	}
	
	errStr := err.Error()
	if strings.Contains(errStr, "context deadline exceeded") || strings.Contains(errStr, "timeout") {
		return "I am experiencing high latency. Please try again in a moment."
	}
	if strings.Contains(errStr, "429") || strings.Contains(errStr, "rate limit") {
		return "Service is temporarily overloaded. Please retry shortly."
	}
	return "An unexpected error occurred. Please contact support."
}

The syncDialogVariables function posts the final text and usage metrics to the dialog session and retries after a delay when the API returns 429. The handleFallback function maps timeout and rate-limit errors to user-facing messages by matching on the error text.
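Matching on error text is fragile because a reworded message silently changes the fallback. A sturdier variant, sketched here, classifies errors with errors.Is. It assumes that callers wrap errors with %w and that rate-limit failures are reported through a sentinel error; ErrRateLimited and fallbackMessage are hypothetical names that do not exist in the code above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ErrRateLimited is a hypothetical sentinel that the sync and stream code
// would return (wrapped) whenever the API answers with 429.
var ErrRateLimited = errors.New("rate limited")

// fallbackMessage maps an error to a user-facing message by inspecting the
// wrapped error chain instead of the error text.
func fallbackMessage(err error) string {
	switch {
	case err == nil:
		return ""
	case errors.Is(err, context.DeadlineExceeded):
		return "I am experiencing high latency. Please try again in a moment."
	case errors.Is(err, ErrRateLimited):
		return "Service is temporarily overloaded. Please retry shortly."
	default:
		return "An unexpected error occurred. Please contact support."
	}
}

func main() {
	wrapped := fmt.Errorf("sync failed: %w", ErrRateLimited)
	fmt.Println(fallbackMessage(wrapped))
	fmt.Println(fallbackMessage(context.DeadlineExceeded))
}
```

Because context.WithTimeout reports context.DeadlineExceeded through ctx.Err(), the timeout branch works without any change to the handler.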

Step 5: Token Usage Tracking & Frontend Handler

You must track token consumption per session for cost monitoring and expose a streaming endpoint that frontend applications can consume. The following HTTP handler responds with a Server-Sent Events (SSE) stream and forwards tokens from the WebSocket stream.

type SessionTracker struct {
	mu       sync.Mutex
	sessions map[string]*UsageMetrics
}

func NewSessionTracker() *SessionTracker {
	return &SessionTracker{sessions: make(map[string]*UsageMetrics)}
}

func (st *SessionTracker) Record(sessionID string, usage *UsageMetrics) {
	st.mu.Lock()
	defer st.mu.Unlock()
	if _, exists := st.sessions[sessionID]; exists {
		st.sessions[sessionID].PromptTokens += usage.PromptTokens
		st.sessions[sessionID].CompletionTokens += usage.CompletionTokens
		st.sessions[sessionID].TotalTokens += usage.TotalTokens
	} else {
		st.sessions[sessionID] = &UsageMetrics{
			PromptTokens:     usage.PromptTokens,
			CompletionTokens: usage.CompletionTokens,
			TotalTokens:      usage.TotalTokens,
		}
	}
}

func streamToFrontendHandler(oauth *OAuthClient, tracker *SessionTracker) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		sessionID := r.URL.Query().Get("session_id")
		dialogID := r.URL.Query().Get("dialog_id")
		if sessionID == "" || dialogID == "" {
			http.Error(w, "missing session_id or dialog_id", http.StatusBadRequest)
			return
		}

		// Check for streaming support before committing to SSE headers.
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming not supported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")

		ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
		defer cancel()

		tokenChan := make(chan string, 64)
		doneChan := make(chan bool, 1)
		errChan := make(chan error, 1)

		dialer := websocket.Dialer{HandshakeTimeout: 10 * time.Second}
		go connectToStream(oauth, &dialer, tokenChan, doneChan, errChan)

		var finalOutput strings.Builder

		// emit filters one token, forwards it to the client, and records it.
		emit := func(token string) {
			if filtered := applySafetyFilter(token); filtered != "" {
				writeSSE(w, flusher, "token", filtered)
				finalOutput.WriteString(filtered)
			}
		}

		for {
			select {
			case <-ctx.Done():
				writeSSE(w, flusher, "error", handleFallback(ctx.Err()))
				return
			case token, ok := <-tokenChan:
				if !ok {
					// Closed channel: set it to nil so this case is never selected again.
					tokenChan = nil
					continue
				}
				emit(token)
			case <-doneChan:
				// Forward tokens that were buffered before the done signal arrived.
			drain:
				for {
					select {
					case token, ok := <-tokenChan:
						if !ok {
							break drain
						}
						emit(token)
					default:
						break drain
					}
				}

				// Placeholder estimate (roughly four bytes per token). In production,
				// record the usage object from the final stream message instead.
				tracker.Record(sessionID, &UsageMetrics{CompletionTokens: len(finalOutput.String()) / 4})
				writeSSE(w, flusher, "done", finalOutput.String())
				return
			case err := <-errChan:
				writeSSE(w, flusher, "error", handleFallback(err))
				return
			}
		}
	}
}

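func writeSSE(w http.ResponseWriter, flusher http.Flusher, event string, data string) {
	fmt.Fprintf(w, "event: %s\n", event)
	// A newline inside data would end the field early, so emit one data line per line of text.
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(w, "data: %s\n", line)
	}
	fmt.Fprint(w, "\n") // blank line terminates the event
	flusher.Flush()
}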

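The frontend handler accepts session_id and dialog_id query parameters. It streams tokens via SSE and flushes after every event to maintain real-time delivery. The usage it records in the thread-safe map is an estimate derived from the output length; in production, record the usage object from the final stream message instead. Frontend applications can consume this endpoint using the standard EventSource API.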
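To check the endpoint without a browser, a small Go client can parse the same frames that EventSource would receive. The following is a minimal sketch that handles only the event and data fields used by this tutorial; Event, parseSSE, and parseSSEString are hypothetical names, and fields such as id and retry are ignored.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// Event is one decoded Server-Sent Events frame.
type Event struct {
	Name string
	Data string
}

// parseSSE reads frames from r until EOF. Multiple data lines within one
// frame are joined with newlines, and a blank line ends the frame.
func parseSSE(r io.Reader) ([]Event, error) {
	var events []Event
	var cur Event
	var data []string

	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		line := scanner.Text()
		switch {
		case line == "":
			if cur.Name != "" || len(data) > 0 {
				cur.Data = strings.Join(data, "\n")
				events = append(events, cur)
			}
			cur, data = Event{}, nil
		case strings.HasPrefix(line, "event:"):
			cur.Name = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
		case strings.HasPrefix(line, "data:"):
			// Only the single space after the colon is part of the framing.
			value := strings.TrimPrefix(line, "data:")
			data = append(data, strings.TrimPrefix(value, " "))
		}
	}
	return events, scanner.Err()
}

// parseSSEString is a convenience wrapper for in-memory input.
func parseSSEString(s string) ([]Event, error) {
	return parseSSE(strings.NewReader(s))
}

func main() {
	stream := "event: token\ndata: Hi\n\nevent: done\ndata: a\ndata: b\n\n"
	events, err := parseSSEString(stream)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, e := range events {
		fmt.Printf("%s: %q\n", e.Name, e.Data)
	}
}
```

Against the running service, the same parser can be pointed at the body of an http.Get response for the /stream endpoint.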

Complete Working Example

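The following file combines the authentication, streaming, filtering, usage tracking, and SSE components into a single runnable Go service. It omits the gateway configuration call from Step 1 and the dialog variable sync from Step 4. Replace the placeholder credentials before execution.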

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

type OAuthConfig struct {
	EnvURL       string
	ClientID     string
	ClientSecret string
}

type TokenResponse struct {
	AccessToken string `json:"access_token"`
	ExpiresIn   int    `json:"expires_in"`
	TokenType   string `json:"token_type"`
}

type OAuthClient struct {
	config    OAuthConfig
	token     string
	expiresAt time.Time
	mu        sync.RWMutex
	client    *http.Client
}

type StreamMessage struct {
	Tokens []string      `json:"tokens"`
	Done   bool          `json:"done"`
	Usage  *UsageMetrics `json:"usage,omitempty"`
	Error  string        `json:"error,omitempty"`
}

type UsageMetrics struct {
	PromptTokens     int `json:"prompt_tokens"`
	CompletionTokens int `json:"completion_tokens"`
	TotalTokens      int `json:"total_tokens"`
}

type SessionTracker struct {
	mu       sync.Mutex
	sessions map[string]*UsageMetrics
}

var piiRegex = regexp.MustCompile(`\b\d{3}[-.]?\d{4}[-.]?\d{4}\b`)
var profanityList = []string{"badword1", "badword2"}

func NewOAuthClient(cfg OAuthConfig) *OAuthClient {
	return &OAuthClient{
		config: cfg,
		client: &http.Client{Timeout: 10 * time.Second},
	}
}

func (o *OAuthClient) GetToken() (string, error) {
	o.mu.RLock()
	if time.Now().Before(o.expiresAt) && o.token != "" {
		token := o.token
		o.mu.RUnlock()
		return token, nil
	}
	o.mu.RUnlock()
	return o.refreshToken()
}

func (o *OAuthClient) refreshToken() (string, error) {
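	o.mu.Lock()
	defer o.mu.Unlock()

	// Another goroutine may have refreshed the token while this one waited
	// for the write lock, so check again before calling the token endpoint.
	if o.token != "" && time.Now().Before(o.expiresAt) {
		return o.token, nil
	}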

	payload := map[string]string{
		"grant_type":    "client_credentials",
		"client_id":     o.config.ClientID,
		"client_secret": o.config.ClientSecret,
		"scope":         "llm:stream bot:dialog:write api:access",
	}

	body, err := json.Marshal(payload)
	if err != nil {
		return "", fmt.Errorf("marshal oauth payload: %w", err)
	}

	req, err := http.NewRequest("POST", fmt.Sprintf("https://%s/api/v2/oauth/token", o.config.EnvURL), bytes.NewBuffer(body))
	if err != nil {
		return "", fmt.Errorf("create oauth request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := o.client.Do(req)
	if err != nil {
		return "", fmt.Errorf("oauth request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("oauth error: status %d", resp.StatusCode)
	}

	var tokenResp TokenResponse
	if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
		return "", fmt.Errorf("decode oauth response: %w", err)
	}

	o.token = tokenResp.AccessToken
	o.expiresAt = time.Now().Add(time.Duration(tokenResp.ExpiresIn-60) * time.Second)
	return o.token, nil
}

func NewSessionTracker() *SessionTracker {
	return &SessionTracker{sessions: make(map[string]*UsageMetrics)}
}

func (st *SessionTracker) Record(sessionID string, usage *UsageMetrics) {
	st.mu.Lock()
	defer st.mu.Unlock()
	if existing, exists := st.sessions[sessionID]; exists {
		existing.PromptTokens += usage.PromptTokens
		existing.CompletionTokens += usage.CompletionTokens
		existing.TotalTokens += usage.TotalTokens
	} else {
		st.sessions[sessionID] = &UsageMetrics{
			PromptTokens:     usage.PromptTokens,
			CompletionTokens: usage.CompletionTokens,
			TotalTokens:      usage.TotalTokens,
		}
	}
}

func connectToStream(oauth *OAuthClient, tokenChan chan<- string, doneChan chan<- bool, errChan chan<- error) {
	// Closing tokenChan tells the consumer that no further tokens will arrive.
	defer close(tokenChan)

	token, err := oauth.GetToken()
	if err != nil {
		errChan <- fmt.Errorf("auth failed before connect: %w", err)
		return
	}

	url := fmt.Sprintf("wss://%s/api/v2/llm/stream", oauth.config.EnvURL)
	headers := http.Header{}
	headers.Set("Authorization", "Bearer "+token)
	headers.Set("Accept", "application/json")

	conn, resp, err := websocket.DefaultDialer.Dial(url, headers)
	if err != nil {
		// resp is nil when the dial fails before any HTTP response arrives.
		status := 0
		if resp != nil {
			status = resp.StatusCode
		}
		errChan <- fmt.Errorf("websocket dial failed: %w (http status: %d)", err, status)
		return
	}
	// The connection stays open until the read loop below returns.
	defer conn.Close()

	// The caller runs this function in its own goroutine, so read in place.
	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
				// Clean shutdown without a final done message.
				doneChan <- true
			} else {
				// Unexpected close codes and plain network errors.
				errChan <- fmt.Errorf("websocket read error: %w", err)
			}
			return
		}

		var streamMsg StreamMessage
		if err := json.Unmarshal(msg, &streamMsg); err != nil {
			errChan <- fmt.Errorf("unmarshal error: %w", err)
			return
		}

		if streamMsg.Error != "" {
			errChan <- fmt.Errorf("stream server error: %s", streamMsg.Error)
			return
		}

		for _, t := range streamMsg.Tokens {
			tokenChan <- t
		}

		if streamMsg.Done {
			doneChan <- true
			return
		}
	}
}

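// Compiled once at package level instead of on every call.
var (
	markdownRegex   = regexp.MustCompile("[*_~`]")
	whitespaceRegex = regexp.MustCompile(`\s+`)
)

func applySafetyFilter(token string) string {
	cleaned := markdownRegex.ReplaceAllString(token, "")
	// Collapse whitespace runs but keep leading and trailing spaces, which
	// separate this token from its neighbors.
	cleaned = whitespaceRegex.ReplaceAllString(cleaned, " ")
	if piiRegex.MatchString(cleaned) {
		return "[REDACTED]"
	}
	lower := strings.ToLower(cleaned)
	for _, word := range profanityList {
		if strings.Contains(lower, word) {
			return "[FILTERED]"
		}
	}
	return cleaned
}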

func handleFallback(err error) string {
	if err == nil {
		return ""
	}
	errStr := err.Error()
	if strings.Contains(errStr, "context deadline exceeded") || strings.Contains(errStr, "timeout") {
		return "I am experiencing high latency. Please try again in a moment."
	}
	if strings.Contains(errStr, "429") || strings.Contains(errStr, "rate limit") {
		return "Service is temporarily overloaded. Please retry shortly."
	}
	return "An unexpected error occurred. Please contact support."
}

func streamToFrontendHandler(oauth *OAuthClient, tracker *SessionTracker) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		sessionID := r.URL.Query().Get("session_id")
		if sessionID == "" {
			http.Error(w, "missing session_id", http.StatusBadRequest)
			return
		}

		// Check for streaming support before committing to SSE headers.
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming not supported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")

		ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
		defer cancel()

		tokenChan := make(chan string, 64)
		doneChan := make(chan bool, 1)
		errChan := make(chan error, 1)

		go connectToStream(oauth, tokenChan, doneChan, errChan)

		var finalOutput strings.Builder

		// emit filters one token, forwards it to the client, and records it.
		emit := func(token string) {
			if filtered := applySafetyFilter(token); filtered != "" {
				writeSSE(w, flusher, "token", filtered)
				finalOutput.WriteString(filtered)
			}
		}

		for {
			select {
			case <-ctx.Done():
				writeSSE(w, flusher, "error", handleFallback(ctx.Err()))
				return
			case token, ok := <-tokenChan:
				if !ok {
					// Closed channel: set it to nil so this case is never selected again.
					tokenChan = nil
					continue
				}
				emit(token)
			case <-doneChan:
				// Forward tokens that were buffered before the done signal arrived.
			drain:
				for {
					select {
					case token, ok := <-tokenChan:
						if !ok {
							break drain
						}
						emit(token)
					default:
						break drain
					}
				}
				// Placeholder estimate (roughly four bytes per token).
				tracker.Record(sessionID, &UsageMetrics{CompletionTokens: len(finalOutput.String()) / 4})
				writeSSE(w, flusher, "done", finalOutput.String())
				return
			case err := <-errChan:
				writeSSE(w, flusher, "error", handleFallback(err))
				return
			}
		}
	}
}

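func writeSSE(w http.ResponseWriter, flusher http.Flusher, event string, data string) {
	fmt.Fprintf(w, "event: %s\n", event)
	// A newline inside data would end the field early, so emit one data line per line of text.
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(w, "data: %s\n", line)
	}
	fmt.Fprint(w, "\n") // blank line terminates the event
	flusher.Flush()
}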

func main() {
	cfg := OAuthConfig{
		EnvURL:       "your-env.cognigy.ai",
		ClientID:     "your-client-id",
		ClientSecret: "your-client-secret",
	}

	oauth := NewOAuthClient(cfg)
	tracker := NewSessionTracker()

	http.HandleFunc("/stream", streamToFrontendHandler(oauth, tracker))
	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		fmt.Fprint(w, "OK")
	})

	fmt.Println("Server listening on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		fmt.Printf("Server failed: %v\n", err)
	}
}

Common Errors & Debugging

Error: 401 Unauthorized or 403 Forbidden

This occurs when the OAuth token is expired, missing required scopes, or the client credentials are incorrect. Verify that the scope parameter in the token request includes llm:stream and bot:dialog:write. Check the expires_in value and ensure the client refreshes the token before expiration.

Error: 429 Too Many Requests

Cognigy.AI enforces rate limits per environment and per API key. The syncDialogVariables function retries after a delay when it receives a 429. For production workloads, use exponential backoff with jitter and a capped retry count. The golang.org/x/time/rate package is a client-side rate limiter rather than a backoff helper; use it to throttle outbound requests, or add a dedicated circuit breaker library.
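Exponential backoff with jitter can be sketched with the standard library alone. The example below uses the "full jitter" strategy, where each delay is drawn at random between zero and an exponentially growing ceiling. backoffDelay is a hypothetical helper, and the base and maximum values are illustrative assumptions rather than Cognigy.AI recommendations.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffDelay returns a random delay in [0, ceiling), where ceiling is
// base doubled once per attempt and capped at max. attempt starts at 0.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	ceiling := base
	for i := 0; i < attempt && ceiling < max; i++ {
		ceiling *= 2
	}
	if ceiling > max {
		ceiling = max
	}
	if ceiling <= 0 {
		return 0 // rand.Int63n panics on non-positive arguments
	}
	return time.Duration(rand.Int63n(int64(ceiling)))
}

func main() {
	for attempt := 0; attempt < 5; attempt++ {
		delay := backoffDelay(attempt, time.Second, 30*time.Second)
		fmt.Printf("attempt %d: wait up to %v\n", attempt, delay.Round(time.Millisecond))
	}
}
```

Randomizing the whole interval spreads retries from many clients over time, which avoids synchronized bursts against a rate-limited endpoint.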

Error: websocket: close 1006 (abnormal closure)

This indicates a network interruption or server-side termination without a close frame. The connectToStream function inspects the WebSocket close code to distinguish graceful shutdowns from failures. Implement reconnection logic with a maximum retry count to prevent infinite loops.
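Bounded reconnection can be sketched as a small retry wrapper. The connect callback below stands in for whatever function dials the stream; connectWithRetry and simulate are hypothetical names, and the fixed delay is kept only for brevity.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// connectWithRetry calls connect until it succeeds, ctx is cancelled, or
// maxAttempts calls have failed.
func connectWithRetry(ctx context.Context, maxAttempts int, delay time.Duration, connect func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if lastErr = connect(); lastErr == nil {
			return nil
		}
		if attempt == maxAttempts {
			break // no point in waiting after the final attempt
		}
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

// simulate runs connectWithRetry against a fake connection that fails the
// first `failures` times, and reports how many calls were made.
func simulate(failures, maxAttempts int) (calls int, err error) {
	connect := func() error {
		calls++
		if calls <= failures {
			return errors.New("websocket: close 1006 (abnormal closure)")
		}
		return nil
	}
	err = connectWithRetry(context.Background(), maxAttempts, time.Millisecond, connect)
	return calls, err
}

func main() {
	fmt.Println(simulate(2, 5))  // 3 <nil>
	fmt.Println(simulate(10, 3)) // 3 giving up after 3 attempts: websocket: close 1006 (abnormal closure)
}
```

Note that a reconnect starts a new stream. Whether the gateway can resume a partially delivered response is not covered here, so the caller should decide whether to discard the partial output.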

Error: Token fragmentation produces garbled text

Tokens are sub-word units, so a word can arrive split across messages, and a stream that is consumed as raw bytes can also split a multi-byte UTF-8 character between chunks. The applySafetyFilter function normalizes whitespace and strips markdown, but it does not rejoin fragments. The frontend renderer must append tokens in order and decode UTF-8 incrementally if it reads raw bytes. Use encoding/json with json.RawMessage if raw byte streams are required.
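If a stream is ever consumed as raw byte chunks rather than decoded JSON strings, a chunk can end in the middle of a multi-byte character. A minimal sketch of incremental decoding follows: emit only the bytes that end on a rune boundary and carry the incomplete tail into the next chunk. splitComplete is a hypothetical helper built on the standard unicode/utf8 package.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// splitComplete returns the longest prefix of buf that ends on a rune
// boundary, plus the trailing bytes of an incomplete rune (if any).
func splitComplete(buf []byte) (complete, rest []byte) {
	// A UTF-8 sequence is at most utf8.UTFMax bytes, so only the tail needs checking.
	for i := 1; i <= utf8.UTFMax && i <= len(buf); i++ {
		start := len(buf) - i
		if utf8.RuneStart(buf[start]) {
			if utf8.FullRune(buf[start:]) {
				return buf, nil // the last rune is complete
			}
			return buf[:start], buf[start:]
		}
	}
	return buf, nil // empty input or invalid bytes: pass through unchanged
}

func main() {
	// "café!" with the two bytes of "é" split across chunks.
	chunks := [][]byte{[]byte("caf\xc3"), []byte("\xa9!")}

	var carry []byte
	for _, chunk := range chunks {
		buf := append(carry, chunk...)
		complete, rest := splitComplete(buf)
		fmt.Printf("%q\n", complete)
		carry = append([]byte(nil), rest...) // copy so the next append cannot alias buf
	}
}
```

The first iteration prints "caf" and holds back one byte; the second prints "é!" once the character is complete.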

Official References