Exporting NICE Cognigy Bot Flow Definitions via REST API with Go

Exporting NICE Cognigy Bot Flow Definitions via REST API with Go

What You Will Build

A Go service that triggers asynchronous bot flow exports, validates dependency constraints, traverses node graphs for conflict detection, transforms platform definitions into portable formats, and synchronizes completion to external version control systems.
This tutorial uses the NICE Cognigy REST API surface for bot management and export orchestration.
The implementation covers Go 1.21+ with standard library HTTP clients, JSON schema validation, and asynchronous job polling.

Prerequisites

  • OAuth2 client credentials flow configured in the Cognigy platform
  • Required scopes: bot:read, export:manage, webhook:write
  • Go runtime version 1.21 or higher
  • Standard library dependencies: net/http, encoding/json, context, time, sync, crypto/sha256, fmt, os, log

Authentication Setup

Cognigy enforces OAuth2 client credentials authentication for machine-to-machine API access. You must cache the access token and implement automatic refresh before expiration.

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"
)

type OAuthConfig struct {
	ClientID     string
	ClientSecret string
	TokenURL     string
}

type TokenResponse struct {
	AccessToken string `json:"access_token"`
	ExpiresIn   int    `json:"expires_in"`
}

type TokenCache struct {
	mu          sync.RWMutex
	token       string
	expiresAt   time.Time
	client      *http.Client
	config      OAuthConfig
}

func NewTokenCache(cfg OAuthConfig) *TokenCache {
	return &TokenCache{
		client: &http.Client{Timeout: 10 * time.Second},
		config: cfg,
	}
}

func (tc *TokenCache) GetToken(ctx context.Context) (string, error) {
	tc.mu.RLock()
	if time.Now().Before(tc.expiresAt) && tc.token != "" {
		token := tc.token
		tc.mu.RUnlock()
		return token, nil
	}
	tc.mu.RUnlock()

	tc.mu.Lock()
	defer tc.mu.Unlock()

	if time.Now().Before(tc.expiresAt) && tc.token != "" {
		return tc.token, nil
	}

	payload := fmt.Sprintf("grant_type=client_credentials&client_id=%s&client_secret=%s",
		tc.config.ClientID, tc.config.ClientSecret)

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, tc.config.TokenURL,
		bytes.NewBufferString(payload))
	if err != nil {
		return "", fmt.Errorf("token request creation failed: %w", err)
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

	resp, err := tc.client.Do(req)
	if err != nil {
		return "", fmt.Errorf("token fetch failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("token fetch returned status %d", resp.StatusCode)
	}

	var tr TokenResponse
	if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
		return "", fmt.Errorf("token decode failed: %w", err)
	}

	tc.token = tr.AccessToken
	tc.expiresAt = time.Now().Add(time.Duration(tr.ExpiresIn-60) * time.Second)
	return tc.token, nil
}

HTTP Cycle: OAuth2 Token Request

  • Method: POST
  • Path: /oauth/token
  • Headers: Content-Type: application/x-www-form-urlencoded, Authorization: Basic base64(client_id:client_secret) (alternative to body encoding)
  • Request Body: grant_type=client_credentials&scope=bot:read+export:manage+webhook:write
  • Response Body: {"access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...", "expires_in": 3600, "token_type": "Bearer"}

Implementation

Step 1: Construct Export Payload and Trigger Asynchronous Job

The export job requires a structured payload specifying the bot version, scope filters, serialization format, and node limit thresholds. Cognigy processes exports asynchronously to prevent blocking the main event loop.

type ExportRequest struct {
	BotVersionID       string          `json:"botVersionId"`
	ScopeFilters       ScopeFilters    `json:"scopeFilters"`
	SerializationFormat string         `json:"serializationFormat"`
	NodeLimitThreshold int             `json:"nodeLimitThreshold"`
	IncludeDependencies bool           `json:"includeDependencies"`
}

type ScopeFilters struct {
	NodeTypes []string `json:"nodeTypes"`
	Intents   []string `json:"intents,omitempty"`
	Entities  []string `json:"entities,omitempty"`
}

type ExportJobResponse struct {
	JobID      string `json:"jobId"`
	Status     string `json:"status"`
	CreatedAt  string `json:"createdAt"`
	EstimatedDurationSeconds int `json:"estimatedDurationSeconds"`
}

func (e *FlowExporter) TriggerExport(ctx context.Context, botID string, req ExportRequest) (ExportJobResponse, error) {
	token, err := e.tokenCache.GetToken(ctx)
	if err != nil {
		return ExportJobResponse{}, fmt.Errorf("authentication failed: %w", err)
	}

	payload, err := json.Marshal(req)
	if err != nil {
		return ExportJobResponse{}, fmt.Errorf("payload serialization failed: %w", err)
	}

	endpoint := fmt.Sprintf("%s/api/v1/bots/%s/export", e.baseURL, botID)
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return ExportJobResponse{}, fmt.Errorf("request creation failed: %w", err)
	}

	httpReq.Header.Set("Authorization", "Bearer "+token)
	httpReq.Header.Set("Content-Type", "application/json")
	httpReq.Header.Set("Accept", "application/json")
	httpReq.Header.Set("X-Request-ID", generateRequestID())

	resp, err := e.httpClient.Do(httpReq)
	if err != nil {
		return ExportJobResponse{}, fmt.Errorf("export trigger failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusTooManyRequests {
		return ExportJobResponse{}, fmt.Errorf("rate limit exceeded (429): retry after %s", resp.Header.Get("Retry-After"))
	}
	if resp.StatusCode != http.StatusAccepted {
		return ExportJobResponse{}, fmt.Errorf("export trigger returned status %d", resp.StatusCode)
	}

	var jobResp ExportJobResponse
	if err := json.NewDecoder(resp.Body).Decode(&jobResp); err != nil {
		return ExportJobResponse{}, fmt.Errorf("response decode failed: %w", err)
	}

	return jobResp, nil
}

HTTP Cycle: Export Trigger

  • Method: POST
  • Path: /api/v1/bots/{botId}/export
  • Headers: Authorization: Bearer <token>, Content-Type: application/json, Accept: application/json, X-Request-ID: <uuid>
  • Request Body: {"botVersionId": "v2.4.1", "scopeFilters": {"nodeTypes": ["flow", "intent", "entity"], "intents": ["order_status", "refund_request"]}, "serializationFormat": "json", "nodeLimitThreshold": 500, "includeDependencies": true}
  • Response Body: {"jobId": "exp_8f3a9c2d1e", "status": "queued", "createdAt": "2024-05-15T10:30:00Z", "estimatedDurationSeconds": 45}
  • Required Scope: bot:read, export:manage

Step 2: Poll Job Status with Dependency Graph Traversal and Conflict Detection

Export jobs transition through queued, processing, validating, and completed states. You must poll the status endpoint with exponential backoff. During the validating phase, the API returns a dependency graph that requires traversal to detect circular references and node limit violations.

type JobStatusResponse struct {
	JobID    string `json:"jobId"`
	Status   string `json:"status"`
	Progress int    `json:"progress"`
	Graph    Graph  `json:"graph,omitempty"`
}

type Graph struct {
	Nodes []GraphNode `json:"nodes"`
	Edges []GraphEdge `json:"edges"`
}

type GraphNode struct {
	ID   string `json:"id"`
	Type string `json:"type"`
	Count int   `json:"count"`
}

type GraphEdge struct {
	From string `json:"from"`
	To   string `json:"to"`
}

func (e *FlowExporter) PollJobStatus(ctx context.Context, jobID string) (JobStatusResponse, error) {
	var lastStatus string
	interval := time.Second
	maxRetries := 30

	for i := 0; i < maxRetries; i++ {
		select {
		case <-ctx.Done():
			return JobStatusResponse{}, ctx.Err()
		case <-time.After(interval):
		}

		token, err := e.tokenCache.GetToken(ctx)
		if err != nil {
			return JobStatusResponse{}, fmt.Errorf("authentication failed during polling: %w", err)
		}

		endpoint := fmt.Sprintf("%s/api/v1/exports/%s/status", e.baseURL, jobID)
		httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
		if err != nil {
			return JobStatusResponse{}, fmt.Errorf("status request failed: %w", err)
		}

		httpReq.Header.Set("Authorization", "Bearer "+token)
		httpReq.Header.Set("Accept", "application/json")

		resp, err := e.httpClient.Do(httpReq)
		if err != nil {
			return JobStatusResponse{}, fmt.Errorf("status fetch failed: %w", err)
		}

		if resp.StatusCode == http.StatusTooManyRequests {
			retryAfter := parseRetryAfter(resp.Header.Get("Retry-After"))
			time.Sleep(retryAfter)
			continue
		}

		var statusResp JobStatusResponse
		if err := json.NewDecoder(resp.Body).Decode(&statusResp); err != nil {
			resp.Body.Close()
			return JobStatusResponse{}, fmt.Errorf("status decode failed: %w", err)
		}
		resp.Body.Close()

		if statusResp.Status != lastStatus {
			log.Printf("Job %s status changed to %s", jobID, statusResp.Status)
		}
		lastStatus = statusResp.Status

		if statusResp.Status == "completed" || statusResp.Status == "failed" {
			if err := e.validateDependencyGraph(statusResp.Graph); err != nil {
				return statusResp, fmt.Errorf("dependency validation failed: %w", err)
			}
			return statusResp, nil
		}

		interval *= 2
		if interval > 30*time.Second {
			interval = 30 * time.Second
		}
	}

	return JobStatusResponse{Status: "timeout"}, fmt.Errorf("job polling exceeded maximum retries")
}

func (e *FlowExporter) validateDependencyGraph(graph Graph) error {
	if len(graph.Nodes) > e.nodeLimitThreshold {
		return fmt.Errorf("node count %d exceeds threshold %d", len(graph.Nodes), e.nodeLimitThreshold)
	}

	adjacency := make(map[string][]string)
	for _, edge := range graph.Edges {
		adjacency[edge.From] = append(adjacency[edge.From], edge.To)
	}

	visited := make(map[string]bool)
	recStack := make(map[string]bool)

	var hasCycle func(string) bool
	hasCycle = func(nodeID string) bool {
		visited[nodeID] = true
		recStack[nodeID] = true

		for _, neighbor := range adjacency[nodeID] {
			if !visited[neighbor] {
				if hasCycle(neighbor) {
					return true
				}
			} else if recStack[neighbor] {
				return true
			}
		}

		recStack[nodeID] = false
		return false
	}

	for _, node := range graph.Nodes {
		if !visited[node.ID] {
			if hasCycle(node.ID) {
				return fmt.Errorf("circular dependency detected starting at node %s", node.ID)
			}
		}
	}

	return nil
}

HTTP Cycle: Job Status Polling

  • Method: GET
  • Path: /api/v1/exports/{jobId}/status
  • Headers: Authorization: Bearer <token>, Accept: application/json
  • Request Body: None
  • Response Body: {"jobId": "exp_8f3a9c2d1e", "status": "validating", "progress": 75, "graph": {"nodes": [{"id": "n_1a2b", "type": "flow", "count": 120}], "edges": [{"from": "n_1a2b", "to": "n_3c4d"}]}}
  • Required Scope: export:manage

Step 3: Execute Transformation Pipeline and Schema Validation

Once the job completes, you retrieve the raw export payload. The transformation pipeline normalizes JSON schema structures, resolves internal reference pointers, and converts Cognigy-specific node definitions into a portable format.

type ExportResultResponse struct {
	JobID   string          `json:"jobId"`
	Status  string          `json:"status"`
	Payload json.RawMessage `json:"payload"`
}

type PortableFlow struct {
	Version string              `json:"version"`
	Metadata FlowMetadata       `json:"metadata"`
	Nodes   []PortableNode      `json:"nodes"`
	Edges   []PortableEdge      `json:"edges"`
}

type FlowMetadata struct {
	BotID       string    `json:"botId"`
	ExportedAt  time.Time `json:"exportedAt"`
	Source      string    `json:"source"`
	Checksum    string    `json:"checksum"`
}

type PortableNode struct {
	ID          string            `json:"id"`
	Type        string            `json:"type"`
	Properties  map[string]any    `json:"properties"`
	References  []string          `json:"references,omitempty"`
}

type PortableEdge struct {
	SourceID string `json:"sourceId"`
	TargetID string `json:"targetId"`
	Condition string `json:"condition,omitempty"`
}

func (e *FlowExporter) FetchAndTransform(ctx context.Context, jobID string) (PortableFlow, error) {
	token, err := e.tokenCache.GetToken(ctx)
	if err != nil {
		return PortableFlow{}, fmt.Errorf("authentication failed: %w", err)
	}

	endpoint := fmt.Sprintf("%s/api/v1/exports/%s/result", e.baseURL, jobID)
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return PortableFlow{}, fmt.Errorf("result request failed: %w", err)
	}

	httpReq.Header.Set("Authorization", "Bearer "+token)
	httpReq.Header.Set("Accept", "application/json")

	resp, err := e.httpClient.Do(httpReq)
	if err != nil {
		return PortableFlow{}, fmt.Errorf("result fetch failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return PortableFlow{}, fmt.Errorf("result fetch returned status %d", resp.StatusCode)
	}

	var resultResp ExportResultResponse
	if err := json.NewDecoder(resp.Body).Decode(&resultResp); err != nil {
		return PortableFlow{}, fmt.Errorf("result decode failed: %w", err)
	}

	return e.transformPipeline(resultResp.Payload)
}

func (e *FlowExporter) transformPipeline(raw json.RawMessage) (PortableFlow, error) {
	var cognigyDef map[string]any
	if err := json.Unmarshal(raw, &cognigyDef); err != nil {
		return PortableFlow{}, fmt.Errorf("schema normalization failed: %w", err)
	}

	checksum := generateChecksum(raw)
	portable := PortableFlow{
		Version: "1.0.0",
		Metadata: FlowMetadata{
			BotID:       extractString(cognigyDef, "botId"),
			ExportedAt:  time.Now().UTC(),
			Source:      "cognigy_platform",
			Checksum:    checksum,
		},
	}

	if nodes, ok := cognigyDef["nodes"].([]any); ok {
		for _, n := range nodes {
			nodeMap, ok := n.(map[string]any)
			if !ok {
				continue
			}
			pn := PortableNode{
				ID:         extractString(nodeMap, "id"),
				Type:       extractString(nodeMap, "type"),
				Properties: nodeMap,
			}
			if refs, ok := nodeMap["references"].([]any); ok {
				for _, r := range refs {
					if rs, ok := r.(string); ok {
						pn.References = append(pn.References, rs)
					}
				}
			}
			portable.Nodes = append(portable.Nodes, pn)
		}
	}

	if edges, ok := cognigyDef["edges"].([]any); ok {
		for _, e := range edges {
			edgeMap, ok := e.(map[string]any)
			if !ok {
				continue
			}
			portable.Edges = append(portable.Edges, PortableEdge{
				SourceID:  extractString(edgeMap, "sourceId"),
				TargetID:  extractString(edgeMap, "targetId"),
				Condition: extractString(edgeMap, "condition"),
			})
		}
	}

	return portable, nil
}

func generateChecksum(data []byte) string {
	h := sha256.Sum256(data)
	return fmt.Sprintf("%x", h)
}

func extractString(m map[string]any, key string) string {
	if v, ok := m[key].(string); ok {
		return v
	}
	return ""
}

HTTP Cycle: Export Result Retrieval

  • Method: GET
  • Path: /api/v1/exports/{jobId}/result
  • Headers: Authorization: Bearer <token>, Accept: application/json
  • Request Body: None
  • Response Body: {"jobId": "exp_8f3a9c2d1e", "status": "completed", "payload": "{\"botId\": \"bot_prod_01\", \"nodes\": [{\"id\": \"n_start\", \"type\": \"trigger\", \"references\": [\"intent_order\"]}], \"edges\": [{\"sourceId\": \"n_start\", \"targetId\": \"n_route\", \"condition\": \"true\"}]}"}
  • Required Scope: export:manage

Step 4: Synchronize to Version Control, Track Metrics, and Generate Audit Logs

After transformation, the service dispatches a webhook to the external version control system, records execution duration, updates validation success rates, and writes an immutable audit log entry for governance compliance.

type WebhookPayload struct {
	Event    string          `json:"event"`
	Timestamp time.Time      `json:"timestamp"`
	Data     json.RawMessage `json:"data"`
}

type Metrics struct {
	ExportDurationMs  int64 `json:"exportDurationMs"`
	ValidationSuccess bool  `json:"validationSuccess"`
	NodeCount         int   `json:"nodeCount"`
}

type AuditLogEntry struct {
	Actor    string    `json:"actor"`
	Action   string    `json:"action"`
	Resource string    `json:"resource"`
	Timestamp time.Time `json:"timestamp"`
	Outcome  string    `json:"outcome"`
	Details  string    `json:"details"`
}

func (e *FlowExporter) SyncToVCS(ctx context.Context, portable PortableFlow, metrics Metrics) error {
	payload, err := json.Marshal(WebhookPayload{
		Event:     "flow.export.completed",
		Timestamp: time.Now().UTC(),
		Data:      mustMarshalJSON(portable),
	})
	if err != nil {
		return fmt.Errorf("webhook payload serialization failed: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, e.webhookURL, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("webhook request creation failed: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Webhook-Secret", e.webhookSecret)

	resp, err := e.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("webhook dispatch failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("webhook returned status %d", resp.StatusCode)
	}

	e.recordMetrics(metrics)
	e.writeAuditLog(AuditLogEntry{
		Actor:    "flow_exporter_service",
		Action:   "EXPORT_SYNC",
		Resource: portable.Metadata.BotID,
		Timestamp: time.Now().UTC(),
		Outcome:  "SUCCESS",
		Details:  fmt.Sprintf("duration:%dms nodes:%d", metrics.ExportDurationMs, metrics.NodeCount),
	})

	return nil
}

func (e *FlowExporter) recordMetrics(m Metrics) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.totalExports++
	if m.ValidationSuccess {
		e.successfulExports++
	}
	e.averageDuration = (e.averageDuration + m.ExportDurationMs) / 2
}

func (e *FlowExporter) writeAuditLog(entry AuditLogEntry) {
	log.Printf("AUDIT: %+v", entry)
}

func mustMarshalJSON(v any) json.RawMessage {
	b, _ := json.Marshal(v)
	return b
}

HTTP Cycle: Webhook Synchronization

  • Method: POST
  • Path: https://vcs.internal/api/webhooks/cognigy-export (external endpoint)
  • Headers: Content-Type: application/json, X-Webhook-Secret: <shared_secret>
  • Request Body: {"event": "flow.export.completed", "timestamp": "2024-05-15T10:35:22Z", "data": "{\"version\":\"1.0.0\",\"metadata\":{\"botId\":\"bot_prod_01\",\"exportedAt\":\"2024-05-15T10:35:20Z\",\"source\":\"cognigy_platform\",\"checksum\":\"a1b2c3d4...\"},\"nodes\":[],\"edges\":[]}"}
  • Response Body: {"status": "received", "ackId": "wh_9x8y7z"}
  • Required Scope: webhook:write (platform side), external VCS requires its own secret validation.

Complete Working Example

package main

import (
	"bytes"
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"sync"
	"time"
)

type OAuthConfig struct {
	ClientID     string
	ClientSecret string
	TokenURL     string
}

type TokenResponse struct {
	AccessToken string `json:"access_token"`
	ExpiresIn   int    `json:"expires_in"`
}

type TokenCache struct {
	mu          sync.RWMutex
	token       string
	expiresAt   time.Time
	client      *http.Client
	config      OAuthConfig
}

type ExportRequest struct {
	BotVersionID       string       `json:"botVersionId"`
	ScopeFilters       ScopeFilters `json:"scopeFilters"`
	SerializationFormat string      `json:"serializationFormat"`
	NodeLimitThreshold int          `json:"nodeLimitThreshold"`
	IncludeDependencies bool        `json:"includeDependencies"`
}

type ScopeFilters struct {
	NodeTypes []string `json:"nodeTypes"`
	Intents   []string `json:"intents,omitempty"`
	Entities  []string `json:"entities,omitempty"`
}

type ExportJobResponse struct {
	JobID      string `json:"jobId"`
	Status     string `json:"status"`
	CreatedAt  string `json:"createdAt"`
	EstimatedDurationSeconds int `json:"estimatedDurationSeconds"`
}

type JobStatusResponse struct {
	JobID    string `json:"jobId"`
	Status   string `json:"status"`
	Progress int    `json:"progress"`
	Graph    Graph  `json:"graph,omitempty"`
}

type Graph struct {
	Nodes []GraphNode `json:"nodes"`
	Edges []GraphEdge `json:"edges"`
}

type GraphNode struct {
	ID   string `json:"id"`
	Type string `json:"type"`
	Count int   `json:"count"`
}

type GraphEdge struct {
	From string `json:"from"`
	To   string `json:"to"`
}

type ExportResultResponse struct {
	JobID   string          `json:"jobId"`
	Status  string          `json:"status"`
	Payload json.RawMessage `json:"payload"`
}

type PortableFlow struct {
	Version  string         `json:"version"`
	Metadata FlowMetadata   `json:"metadata"`
	Nodes    []PortableNode `json:"nodes"`
	Edges    []PortableEdge `json:"edges"`
}

type FlowMetadata struct {
	BotID      string    `json:"botId"`
	ExportedAt time.Time `json:"exportedAt"`
	Source     string    `json:"source"`
	Checksum   string    `json:"checksum"`
}

type PortableNode struct {
	ID         string            `json:"id"`
	Type       string            `json:"type"`
	Properties map[string]any    `json:"properties"`
	References []string          `json:"references,omitempty"`
}

type PortableEdge struct {
	SourceID  string `json:"sourceId"`
	TargetID  string `json:"targetId"`
	Condition string `json:"condition,omitempty"`
}

type WebhookPayload struct {
	Event     string          `json:"event"`
	Timestamp time.Time       `json:"timestamp"`
	Data      json.RawMessage `json:"data"`
}

type Metrics struct {
	ExportDurationMs  int64 `json:"exportDurationMs"`
	ValidationSuccess bool  `json:"validationSuccess"`
	NodeCount         int   `json:"nodeCount"`
}

type AuditLogEntry struct {
	Actor     string    `json:"actor"`
	Action    string    `json:"action"`
	Resource  string    `json:"resource"`
	Timestamp time.Time `json:"timestamp"`
	Outcome   string    `json:"outcome"`
	Details   string    `json:"details"`
}

type FlowExporter struct {
	tokenCache        *TokenCache
	httpClient        *http.Client
	baseURL           string
	webhookURL        string
	webhookSecret     string
	nodeLimitThreshold int
	mu                sync.Mutex
	totalExports      int
	successfulExports int
	averageDuration   int64
}

func NewFlowExporter(cfg OAuthConfig, baseURL, webhookURL, webhookSecret string, nodeLimit int) *FlowExporter {
	return &FlowExporter{
		tokenCache:        NewTokenCache(cfg),
		httpClient:        &http.Client{Timeout: 30 * time.Second},
		baseURL:           baseURL,
		webhookURL:        webhookURL,
		webhookSecret:     webhookSecret,
		nodeLimitThreshold: nodeLimit,
	}
}

func NewTokenCache(cfg OAuthConfig) *TokenCache {
	return &TokenCache{
		client: &http.Client{Timeout: 10 * time.Second},
		config: cfg,
	}
}

func (tc *TokenCache) GetToken(ctx context.Context) (string, error) {
	tc.mu.RLock()
	if time.Now().Before(tc.expiresAt) && tc.token != "" {
		token := tc.token
		tc.mu.RUnlock()
		return token, nil
	}
	tc.mu.RUnlock()

	tc.mu.Lock()
	defer tc.mu.Unlock()

	if time.Now().Before(tc.expiresAt) && tc.token != "" {
		return tc.token, nil
	}

	payload := fmt.Sprintf("grant_type=client_credentials&client_id=%s&client_secret=%s",
		tc.config.ClientID, tc.config.ClientSecret)

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, tc.config.TokenURL,
		bytes.NewBufferString(payload))
	if err != nil {
		return "", fmt.Errorf("token request creation failed: %w", err)
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

	resp, err := tc.client.Do(req)
	if err != nil {
		return "", fmt.Errorf("token fetch failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("token fetch returned status %d", resp.StatusCode)
	}

	var tr TokenResponse
	if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
		return "", fmt.Errorf("token decode failed: %w", err)
	}

	tc.token = tr.AccessToken
	tc.expiresAt = time.Now().Add(time.Duration(tr.ExpiresIn-60) * time.Second)
	return tc.token, nil
}

func (e *FlowExporter) TriggerExport(ctx context.Context, botID string, req ExportRequest) (ExportJobResponse, error) {
	token, err := e.tokenCache.GetToken(ctx)
	if err != nil {
		return ExportJobResponse{}, fmt.Errorf("authentication failed: %w", err)
	}

	payload, err := json.Marshal(req)
	if err != nil {
		return ExportJobResponse{}, fmt.Errorf("payload serialization failed: %w", err)
	}

	endpoint := fmt.Sprintf("%s/api/v1/bots/%s/export", e.baseURL, botID)
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return ExportJobResponse{}, fmt.Errorf("request creation failed: %w", err)
	}

	httpReq.Header.Set("Authorization", "Bearer "+token)
	httpReq.Header.Set("Content-Type", "application/json")
	httpReq.Header.Set("Accept", "application/json")
	httpReq.Header.Set("X-Request-ID", fmt.Sprintf("req_%d", time.Now().UnixNano()))

	resp, err := e.httpClient.Do(httpReq)
	if err != nil {
		return ExportJobResponse{}, fmt.Errorf("export trigger failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusTooManyRequests {
		return ExportJobResponse{}, fmt.Errorf("rate limit exceeded (429): retry after %s", resp.Header.Get("Retry-After"))
	}
	if resp.StatusCode != http.StatusAccepted {
		return ExportJobResponse{}, fmt.Errorf("export trigger returned status %d", resp.StatusCode)
	}

	var jobResp ExportJobResponse
	if err := json.NewDecoder(resp.Body).Decode(&jobResp); err != nil {
		return ExportJobResponse{}, fmt.Errorf("response decode failed: %w", err)
	}

	return jobResp, nil
}

func (e *FlowExporter) PollJobStatus(ctx context.Context, jobID string) (JobStatusResponse, error) {
	var lastStatus string
	interval := time.Second
	maxRetries := 30

	for i := 0; i < maxRetries; i++ {
		select {
		case <-ctx.Done():
			return JobStatusResponse{}, ctx.Err()
		case <-time.After(interval):
		}

		token, err := e.tokenCache.GetToken(ctx)
		if err != nil {
			return JobStatusResponse{}, fmt.Errorf("authentication failed during polling: %w", err)
		}

		endpoint := fmt.Sprintf("%s/api/v1/exports/%s/status", e.baseURL, jobID)
		httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
		if err != nil {
			return JobStatusResponse{}, fmt.Errorf("status request failed: %w", err)
		}

		httpReq.Header.Set("Authorization", "Bearer "+token)
		httpReq.Header.Set("Accept", "application/json")

		resp, err := e.httpClient.Do(httpReq)
		if err != nil {
			return JobStatusResponse{}, fmt.Errorf("status fetch failed: %w", err)
		}

		if resp.StatusCode == http.StatusTooManyRequests {
			time.Sleep(time.Duration(2*i+1) * time.Second)
			continue
		}

		var statusResp JobStatusResponse
		if err := json.NewDecoder(resp.Body).Decode(&statusResp); err != nil {
			resp.Body.Close()
			return JobStatusResponse{}, fmt.Errorf("status decode failed: %w", err)
		}
		resp.Body.Close()

		if statusResp.Status != lastStatus {
			log.Printf("Job %s status changed to %s", jobID, statusResp.Status)
		}
		lastStatus = statusResp.Status

		if statusResp.Status == "completed" || statusResp.Status == "failed" {
			if err := e.validateDependencyGraph(statusResp.Graph); err != nil {
				return statusResp, fmt.Errorf("dependency validation failed: %w", err)
			}
			return statusResp, nil
		}

		interval *= 2
		if interval > 30*time.Second {
			interval = 30 * time.Second
		}
	}

	return JobStatusResponse{Status: "timeout"}, fmt.Errorf("job polling exceeded maximum retries")
}

func (e *FlowExporter) validateDependencyGraph(graph Graph) error {
	if len(graph.Nodes) > e.nodeLimitThreshold {
		return fmt.Errorf("node count %d exceeds threshold %d", len(graph.Nodes), e.nodeLimitThreshold)
	}

	adjacency := make(map[string][]string)
	for _, edge := range graph.Edges {
		adjacency[edge.From] = append(adjacency[edge.From], edge.To)
	}

	visited := make(map[string]bool)
	recStack := make(map[string]bool)

	var hasCycle func(string) bool
	hasCycle = func(nodeID string) bool {
		visited[nodeID] = true
		recStack[nodeID] = true

		for _, neighbor := range adjacency[nodeID] {
			if !visited[neighbor] {
				if hasCycle(neighbor) {
					return true
				}
			} else if recStack[neighbor] {
				return true
			}
		}

		recStack[nodeID] = false
		return false
	}

	for _, node := range graph.Nodes {
		if !visited[node.ID] {
			if hasCycle(node.ID) {
				return fmt.Errorf("circular dependency detected starting at node %s", node.ID)
			}
		}
	}

	return nil
}

func (e *FlowExporter) FetchAndTransform(ctx context.Context, jobID string) (PortableFlow, error) {
	token, err := e.tokenCache.GetToken(ctx)
	if err != nil {
		return PortableFlow{}, fmt.Errorf("authentication failed: %w", err)
	}

	endpoint := fmt.Sprintf("%s/api/v1/exports/%s/result", e.baseURL, jobID)
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return PortableFlow{}, fmt.Errorf("result request failed: %w", err)
	}

	httpReq.Header.Set("Authorization", "Bearer "+token)
	httpReq.Header.Set("Accept", "application/json")

	resp, err := e.httpClient.Do(httpReq)
	if err != nil {
		return PortableFlow{}, fmt.Errorf("result fetch failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return PortableFlow{}, fmt.Errorf("result fetch returned status %d", resp.StatusCode)
	}

	var resultResp ExportResultResponse
	if err := json.NewDecoder(resp.Body).Decode(&resultResp); err != nil {
		return PortableFlow{}, fmt.Errorf("result decode failed: %w", err)
	}

	return e.transformPipeline(resultResp.Payload)
}

func (e *FlowExporter) transformPipeline(raw json.RawMessage) (PortableFlow, error) {
	var cognigyDef map[string]any
	if err := json.Unmarshal(raw, &cognigyDef); err != nil {
		return PortableFlow{}, fmt.Errorf("schema normalization failed: %w", err)
	}

	checksum := generateChecksum(raw)
	portable := PortableFlow{
		Version: "1.0.0",
		Metadata: FlowMetadata{
			BotID:      extractString(cognigyDef, "botId"),
			ExportedAt: time.Now().UTC(),
			Source:     "cognigy_platform",
			Checksum:   checksum,
		},
	}

	if nodes, ok := cognigyDef["nodes"].([]any); ok {
		for _, n := range nodes {
			nodeMap, ok := n.(map[string]any)
			if !ok {
				continue
			}
			pn := PortableNode{
				ID:         extractString(nodeMap, "id"),
				Type:       extractString(nodeMap, "type"),
				Properties: nodeMap,
			}
			if refs, ok := nodeMap["references"].([]any); ok {
				for _, r := range refs {
					if rs, ok := r.(string); ok {
						pn.References = append(pn.References, rs)
					}
				}
			}
			portable.Nodes = append(portable.Nodes, pn)
		}
	}

	if edges, ok := cognigyDef["edges"].([]any); ok {
		for _, e := range edges {
			edgeMap, ok := e.(map[string]any)
			if !ok {
				continue
			}
			portable.Edges = append(portable.Edges, PortableEdge{
				SourceID:  extractString(edgeMap, "sourceId"),
				TargetID:  extractString(edgeMap, "targetId"),
				Condition: extractString(edgeMap, "condition"),
			})
		}
	}

	return portable, nil
}

func generateChecksum(data []byte) string {
	h := sha256.Sum256(data)
	return fmt.Sprintf("%x", h)
}

func extractString(m map[string]any, key string) string {
	if v, ok := m[key].(string); ok {
		return v
	}
	return ""
}

func (e *FlowExporter) SyncToVCS(ctx context.Context, portable PortableFlow, metrics Metrics) error {
	payload, err := json.Marshal(WebhookPayload{
		Event:     "flow.export.completed",
		Timestamp: time.Now().UTC(),
		Data:      mustMarshalJSON(portable),
	})
	if err != nil {
		return fmt.Errorf("webhook payload serialization failed: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, e.webhookURL, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("webhook request creation failed: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Webhook-Secret", e.webhookSecret)

	resp, err := e.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("webhook dispatch failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("webhook returned status %d", resp.StatusCode)
	}

	e.recordMetrics(metrics)
	e.writeAuditLog(AuditLogEntry{
		Actor:     "flow_exporter_service",
		Action:    "EXPORT_SYNC",
		Resource:  portable.Metadata.BotID,
		Timestamp: time.Now().UTC(),
		Outcome:   "SUCCESS",
		Details:   fmt.Sprintf("duration:%dms nodes:%d", metrics.ExportDurationMs, metrics.NodeCount),
	})

	return nil
}

func (e *FlowExporter) recordMetrics(m Metrics) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.totalExports++
	if m.ValidationSuccess {
		e.successfulExports++
	}
	e.averageDuration = (e.averageDuration + m.ExportDurationMs) / 2
}

func (e *FlowExporter) writeAuditLog(entry AuditLogEntry) {
	log.Printf("AUDIT: %+v", entry)
}

func mustMarshalJSON(v any) json.RawMessage {
	b, _ := json.Marshal(v)
	return b
}

func main() {
	ctx := context.Background()
	
	exporter := NewFlowExporter(
		OAuthConfig{
			ClientID:     os.Getenv("COGNIGY_CLIENT_ID"),
			ClientSecret: os.Getenv("COGNIGY_CLIENT_SECRET"),
			TokenURL:     "https://api.cognigy.com/oauth/token",
		},
		"https://api.cognigy.com",
		os.Getenv("VCS_WEBHOOK_URL"),
		os.Getenv("VCS_WEBHOOK_SECRET"),
		500,
	)

	startTime := time.Now()

	job, err := exporter.TriggerExport(ctx, "bot_prod_01", ExportRequest{
		BotVersionID: "v2.4.1",
		ScopeFilters: ScopeFilters{
			NodeTypes: []string{"flow", "intent", "entity"},
			Intents:   []string{"order_status", "refund_request"},
		},
		SerializationFormat: "json",
		NodeLimitThreshold:  500,
		IncludeDependencies: true,
	})
	if err != nil {
		log.Fatalf("Export trigger failed: %v", err)
	}

	status, err := exporter.PollJobStatus(ctx, job.JobID)
	if err != nil {
		log.Fatalf("Job polling failed: %v", err)
	}

	portable, err := exporter.FetchAndTransform(ctx, job.JobID)
	if err != nil {
		log.Fatalf("Transformation failed: %v", err)
	}

	duration := time.Since(startTime).Milliseconds()
	metrics := Metrics{
		ExportDurationMs:  duration,
		ValidationSuccess: true,
		NodeCount:         len(portable.Nodes),
	}

	if err := exporter.SyncToVCS(ctx, portable, metrics); err != nil {
		log.Fatalf("VCS sync failed: %v", err)
	}

	log.Printf("Export pipeline completed successfully. Duration: %dms", duration)
}

Common Errors & Debugging

Error: 401 Unauthorized

  • What causes it: Expired OAuth token, invalid client credentials, or missing bot:read scope.
  • How to fix it: Verify the client_id and client_secret match the platform registration. Ensure the token cache refreshes before expiration. Check that the OAuth request includes scope=bot:read+export:manage.
  • Code showing the fix: The TokenCache.GetToken method automatically refreshes tokens when time.Now().Before(tc.expiresAt) evaluates to false. Add explicit scope validation during token acquisition.

Error: 403 Forbidden

  • What causes it: The authenticated client lacks permission to access the specified bot ID or export endpoint.
  • How to fix it: Assign the client to a security profile with Bot Management and Export permissions in the Cognigy admin console. Verify the botId matches an existing bot in the tenant.
  • Code showing the fix: Intercept 403 responses in TriggerExport and log the X-Trace-ID header for platform support ticket submission.

Error: 429 Too Many Requests

  • What causes it: Exceeding the platform rate limit for export jobs or status polling.
  • How to fix it: Implement exponential backoff with jitter. Respect the Retry-After header. Space out concurrent export triggers.
  • Code showing the fix: The PollJobStatus method checks resp.StatusCode == http.StatusTooManyRequests and sleeps for time.Duration(2*i+1) * time.Second. Add math/rand jitter for production deployments.

Error: Circular Dependency Detected

  • What causes it: The bot flow contains recursive node transitions that violate the dependency resolution constraints.
  • How to fix it: Open the bot in the design canvas, locate the nodes forming the cycle, and break the transition by adding a timeout or fallback node. Re-run the export after structural correction.
  • Code showing the fix: The validateDependencyGraph function uses depth-first search with a recursion stack. It returns a descriptive error containing the starting node ID for quick canvas navigation.

Error: Node Count Exceeds Threshold

  • What causes it: The export request targets a version with more nodes than the configured nodeLimitThreshold.
  • How to fix it: Increase the threshold in the ExportRequest payload if the platform permits, or split the export into multiple scope-filtered batches.
  • Code showing the fix: Adjust NodeLimitThreshold: 500 in the TriggerExport call to match the actual flow complexity. The validation step enforces the limit before payload transformation.

Official References