Persisting Cognigy.AI Session State to PostgreSQL with Java
What You Will Build
You will build a Java microservice that intercepts Cognigy.AI dialog transition webhooks, persists conversation context and slot values to PostgreSQL JSONB columns, handles database outages with a disk-backed retry queue, and restores session state to Cognigy.AI on resume events. This tutorial uses the Cognigy.AI Webhook and Session APIs, PostgreSQL, and the Spring Boot framework. The implementation is written in Java 17 with HikariCP for connection pooling and Jackson for JSON serialization.
Prerequisites
- Cognigy.AI API key with
session:readandsession:writepermissions - PostgreSQL 14+ instance with
jsonbsupport enabled - Java 17 LTS runtime
- Maven or Gradle for dependency management
- External dependencies:
spring-boot-starter-web,org.postgresql:postgresql,com.zaxxer:HikariCP,com.fasterxml.jackson.core:jackson-databind
Authentication Setup
Cognigy.AI authenticates API calls using an API key passed in the X-Cognigy-API-Key header. Webhook endpoints validate incoming requests using the same key or a shared secret. You will store the key in application.yml and inject it into your service layer. The key must be granted session:read and session:write permissions in the Cognigy.AI Developer Portal.
Create src/main/resources/application.yml:
cognigy:
api-key: ${COGNIGY_API_KEY}
base-url: https://api.cognigy.ai
webhook-secret: ${COGNIGY_WEBHOOK_SECRET}
spring:
datasource:
url: jdbc:postgresql://localhost:5432/cognigy_sessions
username: ${DB_USERNAME}
password: ${DB_PASSWORD}
hikari:
maximum-pool-size: 10
minimum-idle: 2
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
leak-detection-threshold: 60000
Implementation
Step 1: Webhook Endpoint and Request Parsing
Cognigy.AI triggers webhooks on dialog transitions with a JSON payload containing the session identifier, slot values, and context data. You will expose a POST endpoint that validates the request, deserializes the payload, and delegates persistence to a service layer.
Create CognigyWebhookController.java:
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
import java.util.Map;
@RestController
@RequestMapping("/webhook")
public class CognigyWebhookController {
private final SessionStateService sessionStateService;
public CognigyWebhookController(SessionStateService sessionStateService) {
this.sessionStateService = sessionStateService;
}
@PostMapping("/cognigy")
public ResponseEntity<String> handleWebhook(
@RequestHeader("X-Cognigy-API-Key") String apiKey,
@RequestBody Map<String, Object> payload) {
if (!isValidApiKey(apiKey)) {
return ResponseEntity.status(401).body("Unauthorized: Invalid API key");
}
String sessionId = (String) payload.get("sessionId");
String eventType = (String) payload.get("type");
Map<String, Object> slots = (Map<String, Object>) payload.get("slots");
Map<String, Object> context = (Map<String, Object>) payload.get("context");
if (sessionId == null || sessionId.isEmpty()) {
return ResponseEntity.badRequest().body("Missing sessionId");
}
try {
sessionStateService.processTransition(sessionId, eventType, slots, context);
return ResponseEntity.ok("Accepted");
} catch (Exception e) {
return ResponseEntity.status(500).body("Internal processing error");
}
}
private boolean isValidApiKey(String apiKey) {
// In production, compare against environment variable or vault
return apiKey != null && apiKey.length() > 10;
}
}
The controller returns HTTP 200 immediately to acknowledge receipt. Cognigy.AI expects a 2xx response within 5 seconds or it will retry the webhook. Long-running database operations must be handled asynchronously or in a background thread to avoid timeout failures.
Step 2: Database Schema and HikariCP Configuration
PostgreSQL JSONB columns provide indexed storage for semi-structured data. You will store the conversation context and slot values as JSONB to avoid schema migrations when Cognigy.AI introduces new dynamic fields. HikariCP manages connection lifecycle efficiently. You will configure it explicitly to control pool behavior and prevent connection leaks.
Create DatabaseConfig.java:
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.Map;
@Configuration
public class DatabaseConfig {
@Bean
public DataSource dataSource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(System.getenv("DB_URL") != null ? System.getenv("DB_URL") : "jdbc:postgresql://localhost:5432/cognigy_sessions");
config.setUsername(System.getenv("DB_USERNAME"));
config.setPassword(System.getenv("DB_PASSWORD"));
config.setMaximumPoolSize(10);
config.setMinimumIdle(2);
config.setConnectionTimeout(30000);
config.setIdleTimeout(600000);
config.setMaxLifetime(1800000);
config.setConnectionTestQuery("SELECT 1");
Map<String, Object> props = Map.of(
"cachePrepStmts", "true",
"prepStmtCacheSize", "250",
"prepStmtCacheSqlLimit", "2048",
"useServerPrepStmts", "true"
);
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.addDataSourceProperty("useServerPrepStmts", "true");
return new HikariDataSource(config);
}
}
Execute this SQL to create the target table:
CREATE TABLE IF NOT EXISTS session_states (
session_id VARCHAR(255) PRIMARY KEY,
context JSONB NOT NULL DEFAULT '{}',
slots JSONB NOT NULL DEFAULT '{}',
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_session_states_updated ON session_states(updated_at DESC);
Step 3: UPSERT Logic with JSONB and Conflict Resolution
You will use PostgreSQL ON CONFLICT syntax to handle both initial state creation and subsequent updates in a single atomic operation. This eliminates race conditions when multiple dialog transitions fire concurrently for the same session. Jackson serializes the maps into JSONB-compatible strings.
Create SessionStateRepository.java:
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Repository;
import javax.sql.DataSource;
import java.sql.*;
import java.util.Map;
@Repository
public class SessionStateRepository {
private final DataSource dataSource;
private final ObjectMapper objectMapper;
public SessionStateRepository(DataSource dataSource) {
this.dataSource = dataSource;
this.objectMapper = new ObjectMapper();
}
public void upsertSessionState(String sessionId, Map<String, Object> slots, Map<String, Object> context) throws SQLException {
String sql = """
INSERT INTO session_states (session_id, context, slots, updated_at)
VALUES (?, ?, ?, NOW())
ON CONFLICT (session_id) DO UPDATE
SET context = EXCLUDED.context,
slots = EXCLUDED.slots,
updated_at = NOW()
""";
String slotsJson = objectMapper.writeValueAsString(slots != null ? slots : Map.of());
String contextJson = objectMapper.writeValueAsString(context != null ? context : Map.of());
try (Connection conn = dataSource.getConnection();
PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, sessionId);
pstmt.setString(2, contextJson);
pstmt.setString(3, slotsJson);
pstmt.executeUpdate();
}
}
}
The EXCLUDED keyword references the proposed values in the conflict target. This pattern guarantees that the most recent webhook payload overwrites stale data without requiring a separate SELECT query.
Step 4: Disk-Backed Buffer and Failure Handling
Database networks experience transient partitions. You will implement a local disk-backed queue that captures state updates when PostgreSQL is unreachable. A scheduled executor retries processing the queue with exponential backoff. Files are named with a timestamp and session ID to preserve ordering and enable deduplication.
Create DiskBufferService.java:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
@Service
public class DiskBufferService {
private final Path bufferDir;
private final SessionStateRepository repository;
private final ObjectMapper objectMapper;
private final ScheduledExecutorService scheduler;
private volatile boolean isDbHealthy = true;
private long retryDelayMs = 5000;
public DiskBufferService(SessionStateRepository repository) throws IOException {
this.repository = repository;
this.objectMapper = new ObjectMapper();
this.bufferDir = Files.createDirectories(Paths.get("data/pending_updates"));
this.scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this::processBuffer, 10, 10, TimeUnit.SECONDS);
}
public void queueUpdate(String sessionId, Map<String, Object> slots, Map<String, Object> context) throws IOException {
Map<String, Object> payload = Map.of(
"sessionId", sessionId,
"slots", slots,
"context", context,
"timestamp", System.currentTimeMillis()
);
String filename = String.format("%d_%s.json", System.nanoTime(), sessionId);
Path filePath = bufferDir.resolve(filename);
Files.writeString(filePath, objectMapper.writeValueAsString(payload));
}
@Scheduled(fixedDelayString = "${buffer.retry-interval:10000}")
public void processBuffer() {
try {
File[] files = bufferDir.toFile().listFiles((dir, name) -> name.endsWith(".json"));
if (files == null || files.length == 0) return;
Arrays.sort(files, Comparator.comparingLong(File::lastModified));
for (File file : files) {
try {
Map<String, Object> payload = objectMapper.readValue(file, Map.class);
String sessionId = (String) payload.get("sessionId");
Map<String, Object> slots = (Map<String, Object>) payload.get("slots");
Map<String, Object> context = (Map<String, Object>) payload.get("context");
repository.upsertSessionState(sessionId, slots, context);
Files.delete(file.toPath());
retryDelayMs = 5000; // Reset backoff on success
} catch (SQLException e) {
isDbHealthy = false;
retryDelayMs = Math.min(retryDelayMs * 2, 60000); // Exponential backoff capped at 60s
Thread.sleep(retryDelayMs);
break; // Stop processing until DB recovers
} catch (Exception e) {
// Corrupt file or JSON error, delete to prevent infinite retry
Files.delete(file.toPath());
}
}
} catch (Exception e) {
// Logger would go here in production
}
}
}
The buffer guarantees data durability across application restarts. The exponential backoff prevents hammering a recovering database cluster. Files are processed in chronological order to maintain state consistency.
Step 5: Syncing State Back to Cognigy.AI Session API
When a user returns to a conversation, Cognigy.AI triggers a resume event. You will query PostgreSQL for the persisted state and push it back to the Session API. The endpoint accepts a POST request with slot and context overrides.
Create CognigySessionSyncService.java:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Map;
@Service
public class CognigySessionSyncService {
private final String apiKey;
private final String baseUrl;
private final HttpClient httpClient;
private final ObjectMapper objectMapper;
public CognigySessionSyncService(String apiKey, String baseUrl) {
this.apiKey = apiKey;
this.baseUrl = baseUrl;
this.httpClient = HttpClient.newBuilder()
.connectTimeout(java.time.Duration.ofSeconds(10))
.build();
this.objectMapper = new ObjectMapper();
}
public void syncStateToCognigy(String sessionId, Map<String, Object> slots, Map<String, Object> context) {
String endpoint = String.format("%s/api/session/%s/state", baseUrl, sessionId);
Map<String, Object> requestBody = Map.of(
"slots", slots != null ? slots : Map.of(),
"context", context != null ? context : Map.of()
);
try {
String jsonBody = objectMapper.writeValueAsString(requestBody);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(endpoint))
.header("Content-Type", "application/json")
.header("X-Cognigy-API-Key", apiKey)
.POST(HttpRequest.BodyPublishers.ofString(jsonBody))
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 429) {
// Rate limited, implement jittered retry in production
Thread.sleep(2000);
syncStateToCognigy(sessionId, slots, context); // Recursive retry with backoff
} else if (response.statusCode() >= 400) {
throw new IOException("Cognigy API error: " + response.statusCode() + " " + response.body());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (IOException e) {
throw new RuntimeException("Failed to sync state to Cognigy.AI", e);
}
}
}
The Session API returns 200 on success. A 429 response indicates rate limiting. You must implement retry logic with exponential backoff to avoid cascading failures. The X-Cognigy-API-Key header authenticates the request. Missing or invalid keys return 401. Insufficient permissions return 403.
Complete Working Example
Combine the components into a Spring Boot application. Create SessionStateService.java to orchestrate the flow:
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@Service
public class SessionStateService {
private final SessionStateRepository repository;
private final DiskBufferService diskBufferService;
private final CognigySessionSyncService syncService;
public SessionStateService(SessionStateRepository repository,
DiskBufferService diskBufferService,
CognigySessionSyncService syncService) {
this.repository = repository;
this.diskBufferService = diskBufferService;
this.syncService = syncService;
}
public void processTransition(String sessionId, String eventType, Map<String, Object> slots, Map<String, Object> context) {
try {
repository.upsertSessionState(sessionId, slots, context);
} catch (Exception e) {
// Database unavailable, queue for later
try {
diskBufferService.queueUpdate(sessionId, slots, context);
} catch (IOException ioEx) {
throw new RuntimeException("Critical: Disk buffer write failed", ioEx);
}
}
}
public void handleResume(String sessionId, Map<String, Object> storedSlots, Map<String, Object> storedContext) {
CompletableFuture.runAsync(() -> {
syncService.syncStateToCognigy(sessionId, storedSlots, storedContext);
});
}
}
Wire the configuration in Application.java:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public CognigySessionSyncService cognigySyncService() {
return new CognigySessionSyncService(
System.getenv("COGNIGY_API_KEY"),
System.getenv("COGNIGY_BASE_URL")
);
}
}
Deploy the application with environment variables set. Trigger a webhook from Cognigy.AI. Verify the session_states table contains JSONB data. Simulate a database outage by stopping PostgreSQL. Observe files accumulating in data/pending_updates. Restart PostgreSQL. Watch the scheduled task clear the queue and apply the pending updates.
Common Errors and Debugging
Error: 401 Unauthorized
- Cause: The
X-Cognigy-API-Keyheader is missing, malformed, or revoked in the Cognigy.AI portal. - Fix: Verify the key matches the environment variable. Check the Cognigy.AI Developer Portal under API Keys. Ensure the key is not expired.
- Code check: Print the header value before the HTTP call. Confirm it matches the exact string from the portal.
Error: 403 Forbidden
- Cause: The API key lacks
session:readorsession:writepermissions. - Fix: Navigate to the Cognigy.AI Developer Portal. Edit the API key permissions. Add
session:writefor state updates. Addsession:readfor resume queries. Save and redeploy.
Error: 429 Too Many Requests
- Cause: Cognigy.AI enforces rate limits on the Session API. High-volume dialog transitions trigger throttling.
- Fix: Implement jittered exponential backoff. Batch state updates if multiple transitions occur within the same second. Reduce webhook frequency by filtering non-critical events in Cognigy.AI.
- Code pattern: Track the last response timestamp. Wait
baseDelay * (2^retryCount) + randomJitterbefore retrying.
Error: PostgreSQL Connection Timeout
- Cause: HikariCP pool exhaustion or network partition. The
connection-timeoutexpires before a free connection becomes available. - Fix: Increase
maximum-pool-sizeif database load is high. Checkmax_connectionsin PostgreSQL. Ensuremax-lifetimeis shorter than the database server idle timeout to prevent stale connections. - Debug step: Query
pg_stat_activityto inspect active connections. Look foridle in transactionstates holding locks.
Error: JSONB Serialization Failure
- Cause: Circular references or unsupported types in the slot/context maps. Jackson fails to convert complex objects to JSON.
- Fix: Flatten nested structures before serialization. Use
Map<String, Object>exclusively. Add@JsonIgnoreProperties(ignoreUnknown = true)to DTOs. Validate payloads with a JSON schema before database insertion.