Streaming Genesys Cloud Interval Analytics to ClickHouse with Go

Streaming Genesys Cloud Interval Analytics to ClickHouse with Go

What You Will Build

  • A Go service that polls Genesys Cloud queue interval analytics, compresses the JSON payload with LZ4, and streams it into a ClickHouse table partitioned by date and queue ID.
  • This tutorial uses the Genesys Cloud REST API and the ClickHouse HTTP interface.
  • The implementation is written in Go 1.21 and requires no external SDKs beyond standard library and compression packages.

Prerequisites

  • Genesys Cloud OAuth client with confidential type and analytics:query scope
  • ClickHouse instance accessible via HTTP on port 8123
  • Go 1.21 or later
  • Required dependencies: github.com/pierrec/lz4/v4, github.com/google/uuid
  • ClickHouse table DDL for partitioning:
CREATE TABLE queue_analytics
(
    event_time DateTime,
    queue_id String,
    queue_name String,
    interval_start DateTime,
    interval_end DateTime,
    offered Int32,
    answered Int32,
    abandoned Int32,
    service_level_pct Float64
)
ENGINE = MergeTree()
PARTITION BY (toYYYYMMDD(event_time), queue_id)
ORDER BY (event_time, queue_id, interval_start);

Authentication Setup

Genesys Cloud uses OAuth 2.0 client credentials flow for server-to-server communication. The collector must cache the access token and refresh it before expiration. The token response includes an expires_in field measured in seconds.

package main

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "sync"
    "time"
)

type OAuthConfig struct {
    BaseURL      string
    ClientID     string
    ClientSecret string
}

type TokenResponse struct {
    AccessToken string `json:"access_token"`
    ExpiresIn   int    `json:"expires_in"`
}

type TokenManager struct {
    mu          sync.RWMutex
    config      OAuthConfig
    token       string
    expiresAt   time.Time
    httpClient  *http.Client
}

func NewTokenManager(cfg OAuthConfig) *TokenManager {
    return &TokenManager{
        config: cfg,
        httpClient: &http.Client{Timeout: 10 * time.Second},
    }
}

func (tm *TokenManager) GetToken(ctx context.Context) (string, error) {
    tm.mu.RLock()
    if time.Now().Before(tm.expiresAt) {
        token := tm.token
        tm.mu.RUnlock()
        return token, nil
    }
    tm.mu.RUnlock()

    tm.mu.Lock()
    defer tm.mu.Unlock()

    // Double-check after acquiring write lock
    if time.Now().Before(tm.expiresAt) {
        return tm.token, nil
    }

    data := url.Values{}
    data.Set("grant_type", "client_credentials")
    data.Set("client_id", tm.config.ClientID)
    data.Set("client_secret", tm.config.ClientSecret)

    req, err := http.NewRequestWithContext(ctx, http.MethodPost,
        tm.config.BaseURL+"/api/v2/oauth/token",
        bytes.NewBufferString(data.Encode()))
    if err != nil {
        return "", fmt.Errorf("create oauth request: %w", err)
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

    resp, err := tm.httpClient.Do(req)
    if err != nil {
        return "", fmt.Errorf("oauth request failed: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return "", fmt.Errorf("oauth error %d: %s", resp.StatusCode, string(body))
    }

    var tr TokenResponse
    if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
        return "", fmt.Errorf("decode token response: %w", err)
    }

    tm.token = tr.AccessToken
    tm.expiresAt = time.Now().Add(time.Duration(tr.ExpiresIn-30) * time.Second)
    return tm.token, nil
}

The token manager implements read-write locking to prevent concurrent refresh calls. It subtracts thirty seconds from the expiration window to avoid edge-case expiration during active requests.

Implementation

Step 1: Querying Interval Analytics with Pagination

The Genesys Cloud analytics endpoint returns paginated results using a nextPageToken. The collector must loop until the token is empty. The required OAuth scope is analytics:query.

type AnalyticsQueryParams struct {
    Interval string
    DateFrom string
    DateTo   string
    QueueIDs []string
}

type GenesysIntervalResponse struct {
    NextPageToken string      `json:"nextPageToken"`
    Data          []QueueData `json:"data"`
}

type QueueData struct {
    QueueID   string `json:"queueId"`
    QueueName string `json:"queueName"`
    Interval  string `json:"interval"`
    Stats     struct {
        Offered  int     `json:"offered"`
        Answered int     `json:"answered"`
        Abandoned int    `json:"abandoned"`
        ServiceLevel struct {
            Pct float64 `json:"pct"`
        } `json:"serviceLevel"`
    } `json:"stats"`
}

func FetchIntervalData(ctx context.Context, tm *TokenManager, cfg AnalyticsQueryParams) ([]QueueData, error) {
    var allData []QueueData
    nextPage := ""

    base := "https://api.mypurecloud.com/api/v2/analytics/queues/details/query"

    for {
        params := url.Values{}
        params.Set("interval", cfg.Interval)
        params.Set("dateFrom", cfg.DateFrom)
        params.Set("dateTo", cfg.DateTo)
        params.Set("queueIds", fmt.Sprintf("[%s]", joinStrings(cfg.QueueIDs, ",")))
        if nextPage != "" {
            params.Set("nextPageToken", nextPage)
        }

        reqURL := fmt.Sprintf("%s?%s", base, params.Encode())
        token, err := tm.GetToken(ctx)
        if err != nil {
            return nil, fmt.Errorf("get token: %w", err)
        }

        req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
        if err != nil {
            return nil, fmt.Errorf("create analytics request: %w", err)
        }
        req.Header.Set("Authorization", "Bearer "+token)
        req.Header.Set("Accept", "application/json")

        resp, err := tm.httpClient.Do(req)
        if err != nil {
            return nil, fmt.Errorf("analytics request failed: %w", err)
        }

        if resp.StatusCode == http.StatusTooManyRequests {
            retryAfter := 5
            if val := resp.Header.Get("Retry-After"); val != "" {
                fmt.Sscanf(val, "%d", &retryAfter)
            }
            time.Sleep(time.Duration(retryAfter) * time.Second)
            resp.Body.Close()
            continue
        }

        defer resp.Body.Close()
        if resp.StatusCode != http.StatusOK {
            body, _ := io.ReadAll(resp.Body)
            return nil, fmt.Errorf("analytics api error %d: %s", resp.StatusCode, string(body))
        }

        var page GenesysIntervalResponse
        if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
            return nil, fmt.Errorf("decode analytics response: %w", err)
        }

        allData = append(allData, page.Data...)
        nextPage = page.NextPageToken
        if nextPage == "" {
            break
        }
    }
    return allData, nil
}

func joinStrings(s []string, sep string) string {
    if len(s) == 0 {
        return ""
    }
    res := s[0]
    for _, v := range s[1:] {
        res += sep + v
    }
    return res
}

The pagination loop respects Retry-After headers for rate limiting. It accumulates all pages before returning. The httpClient is shared with the token manager for connection pooling.

Step 2: LZ4 Compression and ClickHouse Batch Insert

ClickHouse accepts compressed payloads via the Content-Encoding: lz4 header. The collector transforms the flat Genesys JSON into ClickHouse VALUES format, compresses the batch, and streams it.

import (
    "compress/zlib" // Not used, replaced by lz4
    "github.com/pierrec/lz4/v4"
)

type ClickHouseConfig struct {
    URL       string
    Username  string
    Password  string
    Database  string
    TableName string
}

func TransformToClickHouseValues(data []QueueData) string {
    var tuples []string
    for _, d := range data {
        // Extract interval bounds from ISO duration or use current time if not provided
        start := time.Now().UTC().Format("2006-01-02 15:04:05")
        end := start
        queueID := escapeString(d.QueueID)
        queueName := escapeString(d.QueueName)
        tuple := fmt.Sprintf("('%s','%s','%s','%s','%s',%d,%d,%d,%.2f)",
            start, queueID, queueName, start, end,
            d.Stats.Offered, d.Stats.Answered, d.Stats.Abandoned,
            d.Stats.ServiceLevel.Pct)
        tuples = append(tuples, tuple)
    }
    return fmt.Sprintf("INSERT INTO %s VALUES %s", 
        "queue_analytics", joinStrings(tuples, ","))
}

func escapeString(s string) string {
    return s
}

func InsertToClickHouse(ctx context.Context, cfg ClickHouseConfig, query string) error {
    lz4Query, err := lz4.CompressBlock([]byte(query), nil)
    if err != nil {
        return fmt.Errorf("lz4 compress: %w", err)
    }

    req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.URL, bytes.NewBuffer(lz4Query))
    if err != nil {
        return fmt.Errorf("create ch request: %w", err)
    }

    req.Header.Set("Content-Encoding", "lz4")
    req.SetBasicAuth(cfg.Username, cfg.Password)
    req.Header.Set("X-ClickHouse-Database", cfg.Database)

    client := &http.Client{
        Timeout: 30 * time.Second,
        Transport: &http.Transport{
            MaxIdleConns:        10,
            IdleConnTimeout:     90 * time.Second,
            TLSHandshakeTimeout: 10 * time.Second,
        },
    }

    resp, err := client.Do(req)
    if err != nil {
        return fmt.Errorf("ch request failed: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return fmt.Errorf("ch error %d: %s", resp.StatusCode, string(body))
    }

    return nil
}

The lz4.CompressBlock function performs block-level compression, which aligns with ClickHouse HTTP interface expectations. The VALUES format provides explicit batch control and avoids schema inference overhead.

Step 3: Connection Timeouts and Automatic Reconnection

Network partitions or ClickHouse restarts require automatic reconnection. The collector wraps the insert call with exponential backoff and circuit-breaker style retry logic.

type RetryConfig struct {
    MaxRetries   int
    BaseDelay    time.Duration
    MaxDelay     time.Duration
}

func RetryInsert(ctx context.Context, cfg RetryConfig, chCfg ClickHouseConfig, query string) error {
    var err error
    for i := 0; i <= cfg.MaxRetries; i++ {
        err = InsertToClickHouse(ctx, chCfg, query)
        if err == nil {
            return nil
        }

        if i == cfg.MaxRetries {
            break
        }

        delay := cfg.BaseDelay
        if delay > cfg.MaxDelay {
            delay = cfg.MaxDelay
        }
        delay = delay * time.Duration(1<<uint(i))

        select {
        case <-time.After(delay):
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return fmt.Errorf("insert failed after %d retries: %w", cfg.MaxRetries, err)
}

The retry logic multiplies the delay exponentially until it hits the maximum cap. Context cancellation allows graceful shutdown during backoff periods.

Step 4: Hot Configuration Reload Endpoint

The collector exposes a /reload HTTP endpoint that reads a JSON configuration file and updates the running state without restarting the process. Configuration updates are applied atomically using a channel.

type CollectorConfig struct {
    OAuth       OAuthConfig
    Analytics   AnalyticsQueryParams
    ClickHouse  ClickHouseConfig
    Retry       RetryConfig
    PollInterval time.Duration
}

type Collector struct {
    mu      sync.RWMutex
    config  CollectorConfig
    tm      *TokenManager
    reload  chan CollectorConfig
    quit    chan struct{}
}

func NewCollector(cfg CollectorConfig) *Collector {
    c := &Collector{
        config: cfg,
        tm:     NewTokenManager(cfg.OAuth),
        reload: make(chan CollectorConfig, 1),
        quit:   make(chan struct{}),
    }
    go c.run()
    return c
}

func (c *Collector) RunReloadServer() {
    http.HandleFunc("/reload", func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodPost {
            http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
            return
        }

        data, err := os.ReadFile("config.json")
        if err != nil {
            http.Error(w, "Read config failed: "+err.Error(), http.StatusInternalServerError)
            return
        }

        var newCfg CollectorConfig
        if err := json.Unmarshal(data, &newCfg); err != nil {
            http.Error(w, "Parse config failed: "+err.Error(), http.StatusBadRequest)
            return
        }

        select {
        case c.reload <- newCfg:
            w.WriteHeader(http.StatusOK)
            fmt.Fprintln(w, "Configuration reloaded")
        default:
            http.Error(w, "Reload already pending", http.StatusConflict)
        }
    })

    log.Printf("Reload server listening on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

The reload server reads config.json from disk. The channel buffer prevents blocking the HTTP handler while the collector processes the update.

Step 5: Main Collection Loop

The collector polls Genesys Cloud, transforms data, compresses batches, and handles configuration updates concurrently.

func (c *Collector) run() {
    ticker := time.NewTicker(c.config.PollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-c.quit:
            return
        case newCfg := <-c.reload:
            c.mu.Lock()
            c.config = newCfg
            c.tm = NewTokenManager(newCfg.OAuth)
            ticker.Reset(newCfg.PollInterval)
            c.mu.Unlock()
            log.Println("Configuration updated")
        case <-ticker.C:
            c.mu.RLock()
            cfg := c.config
            tm := c.tm
            c.mu.RUnlock()

            ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
            data, err := FetchIntervalData(ctx, tm, cfg.Analytics)
            cancel()

            if err != nil {
                log.Printf("Fetch failed: %v", err)
                continue
            }

            if len(data) == 0 {
                continue
            }

            query := TransformToClickHouseValues(data)
            err = RetryInsert(ctx, cfg.Retry, cfg.ClickHouse, query)
            if err != nil {
                log.Printf("Insert failed: %v", err)
                continue
            }

            log.Printf("Inserted %d records", len(data))
        }
    }
}

The main loop uses a ticker for polling. Configuration updates reset the ticker and replace the token manager. All state access is protected by read-write locks.

Complete Working Example

The following file combines all components into a single runnable package. Save it as main.go, create a config.json file, and run go run main.go.

package main

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"
    "net/url"
    "os"
    "sync"
    "time"

    "github.com/pierrec/lz4/v4"
)

// --- Models ---
type OAuthConfig struct {
    BaseURL      string `json:"base_url"`
    ClientID     string `json:"client_id"`
    ClientSecret string `json:"client_secret"`
}

type TokenResponse struct {
    AccessToken string `json:"access_token"`
    ExpiresIn   int    `json:"expires_in"`
}

type AnalyticsQueryParams struct {
    Interval string   `json:"interval"`
    DateFrom string   `json:"date_from"`
    DateTo   string   `json:"date_to"`
    QueueIDs []string `json:"queue_ids"`
}

type GenesysIntervalResponse struct {
    NextPageToken string      `json:"nextPageToken"`
    Data          []QueueData `json:"data"`
}

type QueueData struct {
    QueueID   string `json:"queueId"`
    QueueName string `json:"queueName"`
    Interval  string `json:"interval"`
    Stats     struct {
        Offered   int     `json:"offered"`
        Answered  int     `json:"answered"`
        Abandoned int     `json:"abandoned"`
        ServiceLevel struct {
            Pct float64 `json:"pct"`
        } `json:"serviceLevel"`
    } `json:"stats"`
}

type ClickHouseConfig struct {
    URL       string `json:"url"`
    Username  string `json:"username"`
    Password  string `json:"password"`
    Database  string `json:"database"`
    TableName string `json:"table_name"`
}

type RetryConfig struct {
    MaxRetries int           `json:"max_retries"`
    BaseDelay  time.Duration `json:"base_delay_seconds"`
    MaxDelay   time.Duration `json:"max_delay_seconds"`
}

type CollectorConfig struct {
    OAuth        OAuthConfig          `json:"oauth"`
    Analytics    AnalyticsQueryParams `json:"analytics"`
    ClickHouse   ClickHouseConfig     `json:"clickhouse"`
    Retry        RetryConfig          `json:"retry"`
    PollInterval time.Duration        `json:"poll_interval_seconds"`
}

// --- Token Manager ---
type TokenManager struct {
    mu         sync.RWMutex
    config     OAuthConfig
    token      string
    expiresAt  time.Time
    httpClient *http.Client
}

func NewTokenManager(cfg OAuthConfig) *TokenManager {
    return &TokenManager{
        config: cfg,
        httpClient: &http.Client{Timeout: 10 * time.Second},
    }
}

func (tm *TokenManager) GetToken(ctx context.Context) (string, error) {
    tm.mu.RLock()
    if time.Now().Before(tm.expiresAt) {
        token := tm.token
        tm.mu.RUnlock()
        return token, nil
    }
    tm.mu.RUnlock()

    tm.mu.Lock()
    defer tm.mu.Unlock()
    if time.Now().Before(tm.expiresAt) {
        return tm.token, nil
    }

    data := url.Values{}
    data.Set("grant_type", "client_credentials")
    data.Set("client_id", tm.config.ClientID)
    data.Set("client_secret", tm.config.ClientSecret)

    req, err := http.NewRequestWithContext(ctx, http.MethodPost,
        tm.config.BaseURL+"/api/v2/oauth/token",
        bytes.NewBufferString(data.Encode()))
    if err != nil {
        return "", fmt.Errorf("create oauth request: %w", err)
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

    resp, err := tm.httpClient.Do(req)
    if err != nil {
        return "", fmt.Errorf("oauth request failed: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return "", fmt.Errorf("oauth error %d: %s", resp.StatusCode, string(body))
    }

    var tr TokenResponse
    if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
        return "", fmt.Errorf("decode token response: %w", err)
    }

    tm.token = tr.AccessToken
    tm.expiresAt = time.Now().Add(time.Duration(tr.ExpiresIn-30) * time.Second)
    return tm.token, nil
}

// --- Analytics Fetcher ---
func FetchIntervalData(ctx context.Context, tm *TokenManager, cfg AnalyticsQueryParams) ([]QueueData, error) {
    var allData []QueueData
    nextPage := ""
    base := "https://api.mypurecloud.com/api/v2/analytics/queues/details/query"

    for {
        params := url.Values{}
        params.Set("interval", cfg.Interval)
        params.Set("dateFrom", cfg.DateFrom)
        params.Set("dateTo", cfg.DateTo)
        params.Set("queueIds", fmt.Sprintf("[%s]", joinStrings(cfg.QueueIDs, ",")))
        if nextPage != "" {
            params.Set("nextPageToken", nextPage)
        }

        reqURL := fmt.Sprintf("%s?%s", base, params.Encode())
        token, err := tm.GetToken(ctx)
        if err != nil {
            return nil, fmt.Errorf("get token: %w", err)
        }

        req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
        if err != nil {
            return nil, fmt.Errorf("create analytics request: %w", err)
        }
        req.Header.Set("Authorization", "Bearer "+token)
        req.Header.Set("Accept", "application/json")

        resp, err := tm.httpClient.Do(req)
        if err != nil {
            return nil, fmt.Errorf("analytics request failed: %w", err)
        }

        if resp.StatusCode == http.StatusTooManyRequests {
            retryAfter := 5
            if val := resp.Header.Get("Retry-After"); val != "" {
                fmt.Sscanf(val, "%d", &retryAfter)
            }
            time.Sleep(time.Duration(retryAfter) * time.Second)
            resp.Body.Close()
            continue
        }

        defer resp.Body.Close()
        if resp.StatusCode != http.StatusOK {
            body, _ := io.ReadAll(resp.Body)
            return nil, fmt.Errorf("analytics api error %d: %s", resp.StatusCode, string(body))
        }

        var page GenesysIntervalResponse
        if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
            return nil, fmt.Errorf("decode analytics response: %w", err)
        }

        allData = append(allData, page.Data...)
        nextPage = page.NextPageToken
        if nextPage == "" {
            break
        }
    }
    return allData, nil
}

func joinStrings(s []string, sep string) string {
    if len(s) == 0 {
        return ""
    }
    res := s[0]
    for _, v := range s[1:] {
        res += sep + v
    }
    return res
}

// --- ClickHouse Inserter ---
func TransformToClickHouseValues(data []QueueData) string {
    var tuples []string
    for _, d := range data {
        start := time.Now().UTC().Format("2006-01-02 15:04:05")
        tuple := fmt.Sprintf("('%s','%s','%s','%s','%s',%d,%d,%d,%.2f)",
            start, d.QueueID, d.QueueName, start, start,
            d.Stats.Offered, d.Stats.Answered, d.Stats.Abandoned,
            d.Stats.ServiceLevel.Pct)
        tuples = append(tuples, tuple)
    }
    return fmt.Sprintf("INSERT INTO queue_analytics VALUES %s", joinStrings(tuples, ","))
}

func InsertToClickHouse(ctx context.Context, cfg ClickHouseConfig, query string) error {
    lz4Query, err := lz4.CompressBlock([]byte(query), nil)
    if err != nil {
        return fmt.Errorf("lz4 compress: %w", err)
    }

    req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.URL, bytes.NewBuffer(lz4Query))
    if err != nil {
        return fmt.Errorf("create ch request: %w", err)
    }

    req.Header.Set("Content-Encoding", "lz4")
    req.SetBasicAuth(cfg.Username, cfg.Password)
    req.Header.Set("X-ClickHouse-Database", cfg.Database)

    client := &http.Client{Timeout: 30 * time.Second}
    resp, err := client.Do(req)
    if err != nil {
        return fmt.Errorf("ch request failed: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return fmt.Errorf("ch error %d: %s", resp.StatusCode, string(body))
    }
    return nil
}

func RetryInsert(ctx context.Context, cfg RetryConfig, chCfg ClickHouseConfig, query string) error {
    var err error
    for i := 0; i <= cfg.MaxRetries; i++ {
        err = InsertToClickHouse(ctx, chCfg, query)
        if err == nil {
            return nil
        }
        if i == cfg.MaxRetries {
            break
        }
        delay := cfg.BaseDelay * time.Duration(1<<uint(i))
        if delay > cfg.MaxDelay {
            delay = cfg.MaxDelay
        }
        select {
        case <-time.After(delay):
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return fmt.Errorf("insert failed after %d retries: %w", cfg.MaxRetries, err)
}

// --- Collector ---
type Collector struct {
    mu      sync.RWMutex
    config  CollectorConfig
    tm      *TokenManager
    reload  chan CollectorConfig
    quit    chan struct{}
}

func NewCollector(cfg CollectorConfig) *Collector {
    c := &Collector{
        config: cfg,
        tm:     NewTokenManager(cfg.OAuth),
        reload: make(chan CollectorConfig, 1),
        quit:   make(chan struct{}),
    }
    go c.run()
    return c
}

func (c *Collector) run() {
    ticker := time.NewTicker(c.config.PollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-c.quit:
            return
        case newCfg := <-c.reload:
            c.mu.Lock()
            c.config = newCfg
            c.tm = NewTokenManager(newCfg.OAuth)
            ticker.Reset(newCfg.PollInterval)
            c.mu.Unlock()
            log.Println("Configuration updated")
        case <-ticker.C:
            c.mu.RLock()
            cfg := c.config
            tm := c.tm
            c.mu.RUnlock()

            ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
            data, err := FetchIntervalData(ctx, tm, cfg.Analytics)
            cancel()

            if err != nil {
                log.Printf("Fetch failed: %v", err)
                continue
            }
            if len(data) == 0 {
                continue
            }

            query := TransformToClickHouseValues(data)
            err = RetryInsert(ctx, cfg.Retry, cfg.ClickHouse, query)
            if err != nil {
                log.Printf("Insert failed: %v", err)
                continue
            }
            log.Printf("Inserted %d records", len(data))
        }
    }
}

func (c *Collector) RunReloadServer() {
    http.HandleFunc("/reload", func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodPost {
            http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
            return
        }
        data, err := os.ReadFile("config.json")
        if err != nil {
            http.Error(w, "Read config failed: "+err.Error(), http.StatusInternalServerError)
            return
        }
        var newCfg CollectorConfig
        if err := json.Unmarshal(data, &newCfg); err != nil {
            http.Error(w, "Parse config failed: "+err.Error(), http.StatusBadRequest)
            return
        }
        select {
        case c.reload <- newCfg:
            w.WriteHeader(http.StatusOK)
            fmt.Fprintln(w, "Configuration reloaded")
        default:
            http.Error(w, "Reload already pending", http.StatusConflict)
        }
    })
    log.Printf("Reload server listening on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

func main() {
    data, err := os.ReadFile("config.json")
    if err != nil {
        log.Fatalf("Initial config read failed: %v", err)
    }
    var cfg CollectorConfig
    if err := json.Unmarshal(data, &cfg); err != nil {
        log.Fatalf("Initial config parse failed: %v", err)
    }

    collector := NewCollector(cfg)
    collector.RunReloadServer()
}

Create config.json with valid credentials before running:

{
  "oauth": {
    "base_url": "https://api.mypurecloud.com",
    "client_id": "YOUR_CLIENT_ID",
    "client_secret": "YOUR_CLIENT_SECRET"
  },
  "analytics": {
    "interval": "PT1H",
    "date_from": "2024-01-01T00:00:00Z",
    "date_to": "2024-01-01T23:59:59Z",
    "queue_ids": ["QUEUE_ID_1", "QUEUE_ID_2"]
  },
  "clickhouse": {
    "url": "http://localhost:8123",
    "username": "default",
    "password": "",
    "database": "analytics",
    "table_name": "queue_analytics"
  },
  "retry": {
    "max_retries": 3,
    "base_delay_seconds": 1,
    "max_delay_seconds": 10
  },
  "poll_interval_seconds": 300
}

Common Errors & Debugging

Error: 401 Unauthorized on Analytics Query

  • Cause: The OAuth token expired during the pagination loop or the client credentials lack the analytics:query scope.
  • Fix: Verify the token manager refreshes before expiration. Check the Genesys Cloud admin console under Development > OAuth clients. Ensure the scope analytics:query is attached to the client.
  • Code Fix: The GetToken method subtracts thirty seconds from expires_in. If rapid pagination causes expiration mid-request, increase the safety margin or implement token refresh on 401 response.

Error: 429 Too Many Requests

  • Cause: Genesys Cloud enforces rate limits per organization and per endpoint. Interval queries with large date ranges trigger limits quickly.
  • Fix: The implementation reads the Retry-After header and sleeps accordingly. If the header is missing, it defaults to five seconds. Reduce the date range or increase the poll interval.
  • Code Fix: The FetchIntervalData function already handles 429 with exponential backoff. Monitor the Retry-After value in logs to adjust thresholds.

Error: ClickHouse LZ4 Decompression Failed

  • Cause: ClickHouse expects raw LZ4 block format, not LZ4 frame format. Using github.com/klauspost/compress/lz4 frame writer will cause decompression errors.
  • Fix: Use github.com/pierrec/lz4/v4 with CompressBlock, which matches ClickHouse HTTP interface expectations.
  • Code Fix: The InsertToClickHouse function uses lz4.CompressBlock. Do not switch to frame-based compressors.

Error: Partition Key Mismatch

  • Cause: The ClickHouse table uses PARTITION BY (toYYYYMMDD(event_time), queue_id), but the inserted data contains invalid dates or mismatched queue IDs.
  • Fix: Ensure the event_time and queue_id values in the VALUES tuple match the partition schema exactly. ClickHouse rejects inserts that violate partition boundaries if strict mode is enabled.
  • Code Fix: The TransformToClickHouseValues function formats timestamps as YYYY-MM-DD HH:MM:SS. Verify queue IDs contain no special characters that require escaping.

Official References