Managing Barge-In Events in NICE Cognigy Voice Flows with a Python Flask Webhook Handler
What You Will Build
- A Flask webhook service that receives interrupt events from Cognigy voice sessions, aborts active text-to-speech playback via the Cognigy Voice API, and resets dialogue state based on ASR confidence scores.
- This implementation uses the Cognigy.AI Platform REST API (
/api/v1/sessions/{sessionId}/voice/commandsand/api/v1/sessions/{sessionId}/variables) for session control and state mutation. - The tutorial covers Python 3.10+ with
flask,requests, andpydanticfor production-grade request handling, retry logic, and type safety.
Prerequisites
- Cognigy environment URL (e.g.,
https://your-env.cognigy.ai) - Cognigy API credentials (
clientIdandclientSecretor an API key with server-to-server permissions) - Python 3.10 or higher
- Required Python packages:
flask>=3.0,requests>=2.31,pydantic>=2.5,pydantic-settings>=2.1 - Required API scopes:
sessions:write,voice:control,auth:login - A voice connector configured to forward barge-in/interrupt webhooks to your Flask endpoint
Authentication Setup
Cognigy uses bearer token authentication for server-to-server API calls. You must exchange credentials for a short-lived access token before invoking session control endpoints. The token expires after a configurable duration (typically 30 to 60 minutes). You must cache the token and refresh it before expiration or upon receiving a 401 Unauthorized response.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time
from typing import Optional
class CognigyAuthClient:
def __init__(self, base_url: str, client_id: str, client_secret: str):
self.base_url = base_url.rstrip("/")
self.client_id = client_id
self.client_secret = client_secret
self.token: Optional[str] = None
self.token_expiry: float = 0.0
self.session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("https://", adapter)
self.session.mount("http://", adapter)
def get_token(self) -> str:
if self.token and time.time() < self.token_expiry - 30:
return self.token
payload = {
"clientId": self.client_id,
"clientSecret": self.client_secret
}
response = self.session.post(
f"{self.base_url}/api/v1/auth/login",
json=payload,
timeout=10
)
response.raise_for_status()
data = response.json()
self.token = data["accessToken"]
self.token_expiry = time.time() + data["expiresIn"]
return self.token
def headers(self) -> dict:
return {
"Authorization": f"Bearer {self.get_token()}",
"Content-Type": "application/json",
"Accept": "application/json"
}
The authentication flow sends a POST to /api/v1/auth/login with clientId and clientSecret. The response contains accessToken and expiresIn. The CognigyAuthClient caches the token and automatically refreshes it when the remaining lifetime drops below 30 seconds. The retry strategy handles transient 429 rate limits and 5xx server errors during token acquisition.
Implementation
Step 1: Webhook Reception and Payload Validation
Voice platforms forward barge-in events to a webhook URL you configure in the Cognigy voice connector settings. The payload contains the session identifier, interrupt confidence, and ASR transcript. You must validate the structure before processing to prevent downstream failures.
from flask import Flask, request, jsonify
from pydantic import BaseModel, Field
app = Flask(__name__)
class BargeInPayload(BaseModel):
sessionId: str = Field(..., min_length=1)
event: str
confidence: float = Field(..., ge=0.0, le=1.0)
transcript: str
timestamp: int
@app.route("/webhook/cognigy/barge-in", methods=["POST"])
def handle_barge_in():
if not request.is_json:
return jsonify({"error": "Content-Type must be application/json"}), 415
try:
payload = BargeInPayload.model_validate(request.json)
except Exception as e:
return jsonify({"error": f"Invalid payload: {str(e)}"}), 422
# Processing logic continues in subsequent steps
return jsonify({"status": "received"}), 200
The BargeInPayload model enforces strict type checking. The confidence field must fall between 0.0 and 1.0. If the platform sends malformed JSON or missing fields, the endpoint returns 422 Unprocessable Entity. You must return a 200 OK to the voice platform within the timeout window (typically 3 to 5 seconds) to prevent retry storms.
Step 2: Abort Active TTS Streams
When a user interrupts, the voice connector may continue playing the previously queued TTS audio unless you explicitly abort it. Cognigy provides a voice command endpoint to stop playback immediately. You must send a POST request with the stop command to the active session.
import logging
logger = logging.getLogger(__name__)
def abort_tts(auth_client: CognigyAuthClient, session_id: str) -> dict:
url = f"{auth_client.base_url}/api/v1/sessions/{session_id}/voice/commands"
command_payload = {
"command": "stop",
"reason": "user_interrupt",
"sessionId": session_id
}
try:
response = auth_client.session.post(
url,
json=command_payload,
headers=auth_client.headers(),
timeout=8
)
if response.status_code == 429:
logger.warning("Rate limited on TTS abort. Retrying with backoff.")
time.sleep(2.0)
response = auth_client.session.post(
url,
json=command_payload,
headers=auth_client.headers(),
timeout=8
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as http_err:
logger.error(f"HTTP error aborting TTS: {http_err.response.status_code} - {http_err.response.text}")
raise
except requests.exceptions.RequestException as req_err:
logger.error(f"Request error aborting TTS: {req_err}")
raise
The POST /api/v1/sessions/{sessionId}/voice/commands endpoint accepts a JSON body with a command field. Setting command to stop immediately halts the TTS engine for that session. The request requires the voice:control scope. If the platform returns 429 Too Many Requests, the code implements a fixed 2-second backoff before retrying once. You must capture 401 Unauthorized to trigger token refresh, and 403 Forbidden to verify scope assignment.
Expected Request:
POST /api/v1/sessions/sess_8f3a2b1c/voice/commands
Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
Content-Type: application/json
{
"command": "stop",
"reason": "user_interrupt",
"sessionId": "sess_8f3a2b1c"
}
Expected Response:
{
"success": true,
"sessionId": "sess_8f3a2b1c",
"commandExecuted": "stop",
"timestamp": 1718452901234
}
Step 3: Reset Dialogue State Based on Confidence
Cognigy manages dialogue state through session variables. After aborting TTS, you must update the session context to reflect the interrupt. You will reset the current intent, clear partial ASR buffers, and set a flag that routes the flow to a recovery node. The confidence score determines whether the interrupt is genuine or background noise.
def reset_dialogue_state(auth_client: CognigyAuthClient, session_id: str, confidence: float) -> dict:
url = f"{auth_client.base_url}/api/v1/sessions/{session_id}/variables"
# Threshold configuration
CONFIDENCE_THRESHOLD = 0.75
if confidence < CONFIDENCE_THRESHOLD:
logger.info(f"Low confidence ({confidence}) interrupt. Ignoring state reset.")
return {"action": "ignored", "reason": "low_confidence"}
state_payload = {
"variables": [
{"key": "cx.barage_in_detected", "value": True},
{"key": "cx.current_intent", "value": ""},
{"key": "cx.partial_transcript", "value": ""},
{"key": "cx.flow_state", "value": "interrupt_recover"},
{"key": "cx.interrupt_confidence", "value": confidence}
]
}
try:
response = auth_client.session.post(
url,
json=state_payload,
headers=auth_client.headers(),
timeout=8
)
if response.status_code == 429:
logger.warning("Rate limited on state reset. Retrying.")
time.sleep(2.0)
response = auth_client.session.post(
url,
json=state_payload,
headers=auth_client.headers(),
timeout=8
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as http_err:
logger.error(f"HTTP error resetting state: {http_err.response.status_code}")
raise
except requests.exceptions.RequestException as req_err:
logger.error(f"Request error resetting state: {req_err}")
raise
The POST /api/v1/sessions/{sessionId}/variables endpoint accepts an array of key-value pairs. You must use the sessions:write scope. The code enforces a 0.75 confidence threshold. Scores below this value trigger a no-op to prevent false interrupts from resetting valid conversations. The cx.flow_state variable switches to interrupt_recover, which your Cognigy flow diagram must route to a dedicated recovery node. The endpoint returns a confirmation object with the updated variable list.
Expected Request:
POST /api/v1/sessions/sess_8f3a2b1c/variables
Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
Content-Type: application/json
{
"variables": [
{"key": "cx.barage_in_detected", "value": true},
{"key": "cx.current_intent", "value": ""},
{"key": "cx.partial_transcript", "value": ""},
{"key": "cx.flow_state", "value": "interrupt_recover"},
{"key": "cx.interrupt_confidence", "value": 0.89}
]
}
Expected Response:
{
"sessionId": "sess_8f3a2b1c",
"updatedVariables": 5,
"timestamp": 1718452902105
}
Complete Working Example
The following script combines authentication, webhook routing, TTS abort, and state reset into a single production-ready module. You must set environment variables for credentials before execution.
import os
import time
import logging
from typing import Optional
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from flask import Flask, request, jsonify
from pydantic import BaseModel, Field
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
class CognigyAuthClient:
def __init__(self, base_url: str, client_id: str, client_secret: str):
self.base_url = base_url.rstrip("/")
self.client_id = client_id
self.client_secret = client_secret
self.token: Optional[str] = None
self.token_expiry: float = 0.0
self.session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("https://", adapter)
self.session.mount("http://", adapter)
def get_token(self) -> str:
if self.token and time.time() < self.token_expiry - 30:
return self.token
payload = {
"clientId": self.client_id,
"clientSecret": self.client_secret
}
response = self.session.post(
f"{self.base_url}/api/v1/auth/login",
json=payload,
timeout=10
)
response.raise_for_status()
data = response.json()
self.token = data["accessToken"]
self.token_expiry = time.time() + data["expiresIn"]
return self.token
def headers(self) -> dict:
return {
"Authorization": f"Bearer {self.get_token()}",
"Content-Type": "application/json",
"Accept": "application/json"
}
class BargeInPayload(BaseModel):
sessionId: str = Field(..., min_length=1)
event: str
confidence: float = Field(..., ge=0.0, le=1.0)
transcript: str
timestamp: int
def abort_tts(auth_client: CognigyAuthClient, session_id: str) -> dict:
url = f"{auth_client.base_url}/api/v1/sessions/{session_id}/voice/commands"
command_payload = {
"command": "stop",
"reason": "user_interrupt",
"sessionId": session_id
}
for attempt in range(2):
response = auth_client.session.post(
url, json=command_payload, headers=auth_client.headers(), timeout=8
)
if response.status_code != 429:
break
logger.warning("Rate limited on TTS abort. Backing off.")
time.sleep(2.0)
response.raise_for_status()
return response.json()
def reset_dialogue_state(auth_client: CognigyAuthClient, session_id: str, confidence: float) -> dict:
url = f"{auth_client.base_url}/api/v1/sessions/{session_id}/variables"
CONFIDENCE_THRESHOLD = 0.75
if confidence < CONFIDENCE_THRESHOLD:
logger.info(f"Low confidence ({confidence}) interrupt. Ignoring state reset.")
return {"action": "ignored", "reason": "low_confidence"}
state_payload = {
"variables": [
{"key": "cx.barage_in_detected", "value": True},
{"key": "cx.current_intent", "value": ""},
{"key": "cx.partial_transcript", "value": ""},
{"key": "cx.flow_state", "value": "interrupt_recover"},
{"key": "cx.interrupt_confidence", "value": confidence}
]
}
for attempt in range(2):
response = auth_client.session.post(
url, json=state_payload, headers=auth_client.headers(), timeout=8
)
if response.status_code != 429:
break
logger.warning("Rate limited on state reset. Backing off.")
time.sleep(2.0)
response.raise_for_status()
return response.json()
app = Flask(__name__)
COGNIGY_BASE = os.getenv("COGNIGY_BASE_URL", "https://your-env.cognigy.ai")
COGNIGY_CLIENT_ID = os.getenv("COGNIGY_CLIENT_ID", "")
COGNIGY_CLIENT_SECRET = os.getenv("COGNIGY_CLIENT_SECRET", "")
auth = CognigyAuthClient(COGNIGY_BASE, COGNIGY_CLIENT_ID, COGNIGY_CLIENT_SECRET)
@app.route("/webhook/cognigy/barge-in", methods=["POST"])
def handle_barge_in():
if not request.is_json:
return jsonify({"error": "Content-Type must be application/json"}), 415
try:
payload = BargeInPayload.model_validate(request.json)
except Exception as e:
return jsonify({"error": f"Invalid payload: {str(e)}"}), 422
try:
logger.info(f"Processing barge-in for session {payload.sessionId} with confidence {payload.confidence}")
abort_result = abort_tts(auth, payload.sessionId)
state_result = reset_dialogue_state(auth, payload.sessionId, payload.confidence)
logger.info(f"TTS aborted: {abort_result}, State reset: {state_result}")
return jsonify({"status": "processed", "abort": abort_result, "state": state_result}), 200
except requests.exceptions.HTTPError as http_err:
status = http_err.response.status_code
if status == 401:
logger.error("Authentication failed. Check credentials.")
return jsonify({"error": "Authentication failed"}), 502
if status == 403:
logger.error("Forbidden. Verify API scopes.")
return jsonify({"error": "Insufficient permissions"}), 502
logger.error(f"API error {status}: {http_err.response.text}")
return jsonify({"error": f"API error {status}"}), 502
except requests.exceptions.RequestException as req_err:
logger.error(f"Network error: {req_err}")
return jsonify({"error": "Network failure"}), 502
except Exception as unexpected:
logger.error(f"Unexpected error: {unexpected}")
return jsonify({"error": "Internal processing error"}), 500
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The access token expired or the credentials are invalid. The
CognigyAuthClientcaches tokens and refreshes them automatically, but if the initial login fails, the client raises an HTTP error. - Fix: Verify
COGNIGY_CLIENT_IDandCOGNIGY_CLIENT_SECRETmatch a server-to-server application in the Cognigy admin console. Ensure the application has theauth:logincapability enabled. - Code Handling: The webhook catches
401during API calls and returns502 Bad Gatewayto the voice platform, preventing infinite retry loops while logging the authentication failure.
Error: 403 Forbidden
- Cause: The API key or token lacks the required scopes. Session control requires
sessions:writeandvoice:control. - Fix: Navigate to the Cognigy admin console, edit the API application, and assign the missing scopes. Regenerate the token after scope changes.
- Code Handling: The
handle_barge_inroute explicitly checks for403and returns a structured error payload. You must verify scope assignment before deploying to production.
Error: 422 Unprocessable Entity
- Cause: The webhook payload does not match the
BargeInPayloadschema. Common issues include missingsessionId, confidence values outside0.0to1.0, or non-JSON content types. - Fix: Validate the voice connector webhook configuration. Ensure the platform sends
application/jsonheaders. Check the exact field names in your voice connector documentation. - Code Handling: Pydantic validates the incoming JSON and returns a
422response with the validation error message. You must configure the voice platform to retry or drop malformed payloads.
Error: 429 Too Many Requests
- Cause: Cognigy enforces rate limits on session commands, typically 50 to 100 requests per minute per environment. High-volume voice campaigns trigger throttling.
- Fix: Implement exponential backoff and request coalescing. The provided code uses a fixed 2-second backoff with up to 2 attempts. For production, queue webhook events in Redis or RabbitMQ and process them with rate-limited workers.
- Code Handling: Both
abort_ttsandreset_dialogue_stateinclude retry loops that detect429status codes and pause before retrying. Therequestssession also mounts a retry adapter for transient network errors.
Error: 500 Internal Server Error (Cognigy Side)
- Cause: The session identifier is invalid, expired, or the voice connector is in a disconnected state.
- Fix: Verify the
sessionIdmatches an active conversation. Check the Cognigy environment health dashboard. Ensure the voice connector maintains a persistent WebSocket or SIP trunk. - Code Handling: The webhook logs the
500response body and returns502to the caller. You must implement dead-letter queueing for failed interrupts to prevent data loss.