Integrating NICE Cognigy Voice Flows with a Custom IVR System by Bridging the Cognigy Voice API to a SIP Trunk Using a Python Asterisk AGI Script with DTMF Mapping
What You Will Build
- A production-ready Python AGI script that intercepts incoming SIP calls on Asterisk, establishes a real-time WebSocket session with the NICE Cognigy Voice API, and routes DTMF digits bidirectionally.
- This implementation uses the Cognigy Platform REST API for session creation and authentication, combined with the Cognigy Voice WebSocket API for low-latency voice interaction.
- The tutorial covers Python 3.9+ with the
asterisk.agi,requests, andwebsocket-clientlibraries.
Prerequisites
- Cognigy API client credentials with
client_credentialsgrant type enabled. Required OAuth scopes:cognigy:session:manageandcognigy:voice:interact. - Cognigy Platform API v1 (stable). WebSocket Voice API v1.
- Python 3.9 or higher with
pipinstalled. - External dependencies:
asterisk.agi(bundled with Asterisk 16+),requests>=2.31.0,websocket-client>=1.6.0,httpx>=0.25.0(optional for advanced streaming). - Asterisk PBX with a configured SIP trunk and
chan_siporchan_pjsip. The AGI script must be executable and located in your Asterisk AGI directory (typically/var/lib/asterisk/agi-bin/).
Authentication Setup
Cognigy uses OAuth 2.0 client credentials flow for server-to-server communication. The token must be cached and refreshed before expiration to avoid 401 Unauthorized errors during active call sessions.
import requests
import time
import threading
from typing import Optional
COGNIGY_INSTANCE = "your-instance.cognigy.com"
AUTH_URL = f"https://{COGNIGY_INSTANCE}/api/v1/auth"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
class CognigyAuthManager:
def __init__(self):
self._token: Optional[str] = None
self._expires_at: float = 0.0
self._lock = threading.Lock()
def get_token(self) -> str:
with self._lock:
if self._token and time.time() < (self._expires_at - 60):
return self._token
return self._refresh_token()
def _refresh_token(self) -> str:
payload = {
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
}
response = requests.post(AUTH_URL, json=payload, timeout=10)
response.raise_for_status()
data = response.json()
self._token = data["access_token"]
self._expires_at = time.time() + data["expires_in"]
return self._token
auth_manager = CognigyAuthManager()
The get_token method checks expiration with a sixty-second safety buffer. The thread lock prevents race conditions when multiple concurrent AGI instances request tokens simultaneously. The raise_for_status() call ensures immediate failure on 400, 401, or 403 responses, which the calling code must catch.
Implementation
Step 1: Asterisk AGI Initialization and Channel Setup
Asterisk invokes the AGI script by piping environment variables and command input through standard input. The script must acknowledge the channel, set appropriate timeouts, and prepare DTMF collection parameters.
import sys
import os
import json
import logging
from asterisk.agi import AGI
# Configure logging to Asterisk console
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
stream=sys.stderr
)
def initialize_asterisk_channel(agi: AGI) -> dict:
agi.verbose("Cognigy Voice Bridge initialized", 3)
agi.set("TIMEOUT", "DIGIT", "5000")
agi.set("TIMEOUT", "CALLERID", "0")
agi.set("DTMFMODE", "rfc2833")
channel_info = {
"channel": agi.channel,
"caller_id": agi.callerid,
"dnid": agi.dnid,
"unique_id": agi.uniqueid
}
return channel_info
The DTMFMODE parameter forces RFC 2833 DTMF signaling, which provides reliable digit transmission over SIP trunks. The TIMEOUT DIGIT setting defines the maximum wait time for a single DTMF press before Asterisk returns control to the AGI script.
Step 2: Cognigy Session Creation and WebSocket Handshake
Cognigy requires a REST call to create a session before establishing the Voice WebSocket. The session ID links the REST context to the real-time voice stream.
import requests
from requests.exceptions import HTTPError, Timeout
SESSION_URL = f"https://{COGNIGY_INSTANCE}/api/v1/sessions"
VOICE_WS_URL = f"wss://{COGNIGY_INSTANCE}/api/v1/sessions/{{session_id}}/voice"
def create_cognigy_session(agi: AGI) -> str:
headers = {
"Authorization": f"Bearer {auth_manager.get_token()}",
"Content-Type": "application/json"
}
payload = {
"flowId": "your_flow_id",
"sessionId": agi.uniqueid,
"metadata": {
"source": "asterisk_agi",
"caller": agi.callerid
}
}
try:
response = requests.post(SESSION_URL, json=payload, headers=headers, timeout=15)
response.raise_for_status()
session_data = response.json()
agi.verbose(f"Session created: {session_data['id']}", 3)
return session_data["id"]
except HTTPError as e:
if e.response.status_code == 429:
retry_after = int(e.response.headers.get("Retry-After", 5))
agi.verbose(f"Rate limited. Retrying in {retry_after}s", 3)
time.sleep(retry_after)
return create_cognigy_session(agi)
agi.verbose(f"Session creation failed: {e}", 3)
raise
except Timeout:
agi.verbose("Cognigy API timed out", 3)
raise
The exponential backoff for 429 responses prevents cascading failures during peak call volumes. The sessionId field in the payload maps to Asterisk’s unique channel ID for traceability. The flowId must match an active voice flow in your Cognigy workspace.
Step 3: DTMF Mapping and Bidirectional Event Loop
The core bridge runs as a synchronous loop. Asterisk collects DTMF, the script forwards it to Cognigy via WebSocket, and Cognigy returns audio URLs or text responses. The script handles the DTMF format translation and audio playback.
import websocket
import tempfile
import urllib.request
from websocket import WebSocketTimeoutException
DTMF_MAP = {
"0": "0", "1": "1", "2": "2", "3": "3", "4": "4",
"5": "5", "6": "6", "7": "7", "8": "8", "9": "9",
"*": "*", "#": "#"
}
def run_voice_bridge(agi: AGI, session_id: str):
ws_url = VOICE_WS_URL.format(session_id=session_id)
headers = {
"Authorization": f"Bearer {auth_manager.get_token()}",
"X-Cognigy-Session-Id": session_id
}
try:
ws = websocket.create_connection(ws_url, header=headers, timeout=10)
except Exception as e:
agi.verbose(f"WebSocket connection failed: {e}", 3)
return
try:
while True:
# Collect DTMF from Asterisk
dtmf_result = agi.get_data("dtmf_prompt", 1, 5000)
digit = dtmf_result.strip()
if not digit:
continue
mapped_digit = DTMF_MAP.get(digit)
if not mapped_digit:
agi.verbose(f"Unmapped DTMF: {digit}", 3)
continue
# Send to Cognigy
cognigy_msg = json.dumps({
"type": "dtmf",
"dtmf": mapped_digit
})
ws.send(cognigy_msg)
agi.verbose(f"Sent DTMF: {mapped_digit}", 3)
# Receive Cognigy response
try:
response_raw = ws.recv()
response = json.loads(response_raw)
if response.get("type") == "response":
audio_url = response.get("audioUrl")
text = response.get("text", "")
if audio_url:
temp_file = download_audio(audio_url)
agi.stream_file(temp_file)
os.remove(temp_file)
elif text:
agi.execute("SAY", f"{text} NUMBER")
if response.get("end") is True:
agi.verbose("Cognigy session ended", 3)
break
except WebSocketTimeoutException:
continue
except Exception as e:
agi.verbose(f"Bridge error: {e}", 3)
finally:
ws.close()
def download_audio(url: str) -> str:
fd, path = tempfile.mkstemp(suffix=".wav")
os.close(fd)
urllib.request.urlretrieve(url, path)
return path
The DTMF_MAP dictionary ensures explicit character translation. Cognigy expects lowercase digits and symbols. The loop terminates when Cognigy sends {"end": true} or when Asterisk detects a hangup. The download_audio function retrieves SSML-rendered audio from Cognigy’s CDN and streams it back to the caller.
Step 4: Graceful Teardown and Resource Cleanup
When the call ends or the Cognigy flow completes, the AGI script must close the WebSocket, clear temporary files, and return control to the Asterisk dialplan.
def cleanup_and_hangup(agi: AGI):
try:
agi.verbose("Cleaning up Cognigy bridge", 3)
agi.execute("HANGUP")
except Exception as e:
agi.verbose(f"Cleanup error: {e}", 3)
finally:
sys.exit(0)
Asterisk requires an explicit HANGUP execution or a clean exit from the AGI script to release the channel. The sys.exit(0) call prevents zombie processes from accumulating in the Asterisk process table.
Complete Working Example
#!/usr/bin/env python3
"""
Cognigy Voice Bridge - Asterisk AGI Script
Bridges SIP trunk DTMF to Cognigy Voice API via WebSocket.
"""
import sys
import os
import json
import time
import threading
import logging
import requests
import websocket
import tempfile
import urllib.request
from typing import Optional
from asterisk.agi import AGI
from requests.exceptions import HTTPError, Timeout
from websocket import WebSocketTimeoutException
# Configuration
COGNIGY_INSTANCE = "your-instance.cognigy.com"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
FLOW_ID = "your_flow_id"
AUTH_URL = f"https://{COGNIGY_INSTANCE}/api/v1/auth"
SESSION_URL = f"https://{COGNIGY_INSTANCE}/api/v1/sessions"
VOICE_WS_URL = f"wss://{COGNIGY_INSTANCE}/api/v1/sessions/{{session_id}}/voice"
DTMF_MAP = {
"0": "0", "1": "1", "2": "2", "3": "3", "4": "4",
"5": "5", "6": "6", "7": "7", "8": "8", "9": "9",
"*": "*", "#": "#"
}
class CognigyAuthManager:
def __init__(self):
self._token: Optional[str] = None
self._expires_at: float = 0.0
self._lock = threading.Lock()
def get_token(self) -> str:
with self._lock:
if self._token and time.time() < (self._expires_at - 60):
return self._token
return self._refresh_token()
def _refresh_token(self) -> str:
payload = {
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
}
response = requests.post(AUTH_URL, json=payload, timeout=10)
response.raise_for_status()
data = response.json()
self._token = data["access_token"]
self._expires_at = time.time() + data["expires_in"]
return self._token
auth_manager = CognigyAuthManager()
def create_cognigy_session(agi: AGI) -> str:
headers = {
"Authorization": f"Bearer {auth_manager.get_token()}",
"Content-Type": "application/json"
}
payload = {
"flowId": FLOW_ID,
"sessionId": agi.uniqueid,
"metadata": {
"source": "asterisk_agi",
"caller": agi.callerid
}
}
try:
response = requests.post(SESSION_URL, json=payload, headers=headers, timeout=15)
response.raise_for_status()
session_data = response.json()
agi.verbose(f"Session created: {session_data['id']}", 3)
return session_data["id"]
except HTTPError as e:
if e.response.status_code == 429:
retry_after = int(e.response.headers.get("Retry-After", 5))
agi.verbose(f"Rate limited. Retrying in {retry_after}s", 3)
time.sleep(retry_after)
return create_cognigy_session(agi)
agi.verbose(f"Session creation failed: {e}", 3)
raise
except Timeout:
agi.verbose("Cognigy API timed out", 3)
raise
def download_audio(url: str) -> str:
fd, path = tempfile.mkstemp(suffix=".wav")
os.close(fd)
urllib.request.urlretrieve(url, path)
return path
def run_voice_bridge(agi: AGI, session_id: str):
ws_url = VOICE_WS_URL.format(session_id=session_id)
headers = {
"Authorization": f"Bearer {auth_manager.get_token()}",
"X-Cognigy-Session-Id": session_id
}
try:
ws = websocket.create_connection(ws_url, header=headers, timeout=10)
except Exception as e:
agi.verbose(f"WebSocket connection failed: {e}", 3)
return
try:
while True:
dtmf_result = agi.get_data("silence", 1, 5000)
digit = dtmf_result.strip()
if not digit:
continue
mapped_digit = DTMF_MAP.get(digit)
if not mapped_digit:
continue
cognigy_msg = json.dumps({
"type": "dtmf",
"dtmf": mapped_digit
})
ws.send(cognigy_msg)
try:
response_raw = ws.recv()
response = json.loads(response_raw)
if response.get("type") == "response":
audio_url = response.get("audioUrl")
text = response.get("text", "")
if audio_url:
temp_file = download_audio(audio_url)
agi.stream_file(temp_file)
os.remove(temp_file)
elif text:
agi.execute("SAY", f"{text} NUMBER")
if response.get("end") is True:
break
except WebSocketTimeoutException:
continue
except Exception as e:
agi.verbose(f"Bridge error: {e}", 3)
finally:
ws.close()
def main():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
stream=sys.stderr
)
try:
agi = AGI()
agi.verbose("Cognigy Voice Bridge initialized", 3)
agi.set("TIMEOUT", "DIGIT", "5000")
agi.set("TIMEOUT", "CALLERID", "0")
agi.set("DTMFMODE", "rfc2833")
session_id = create_cognigy_session(agi)
run_voice_bridge(agi, session_id)
agi.verbose("Cleaning up Cognigy bridge", 3)
agi.execute("HANGUP")
except Exception as e:
logging.error(f"Fatal AGI error: {e}")
if "agi" in locals():
agi.execute("HANGUP")
finally:
sys.exit(0)
if __name__ == "__main__":
main()
Deploy this script to /var/lib/asterisk/agi-bin/cognigy_bridge.py, set executable permissions with chmod +x cognigy_bridge.py, and invoke it from your extensions.conf:
[from-trunk]
exten => _X.,1,AGI(cognigy_bridge.py)
exten => _X.,n,Hangup()
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: Expired OAuth token, invalid client credentials, or missing
cognigy:session:managescope. - Fix: Verify
CLIENT_IDandCLIENT_SECRETin the Cognigy admin console. Ensure the token refresh buffer accounts for network latency. Check theAuthorizationheader format matchesBearer <token>. - Code fix: The
CognigyAuthManagerclass automatically refreshes tokens. If 401 persists, log the raw response headers to verify scope claims.
Error: 403 Forbidden
- Cause: API key lacks permission to access the specified
flowIdor workspace. - Fix: Assign the API client to the correct Cognigy workspace with editor or owner privileges. Verify the
flowIdmatches an active voice flow, not a chat flow. - Code fix: Add workspace validation before session creation. Cross-reference the
flowIdagainstGET /api/v1/flowsto confirm accessibility.
Error: 429 Too Many Requests
- Cause: Exceeding Cognigy rate limits during peak call volume or rapid retry loops.
- Fix: Implement exponential backoff with jitter. The
create_cognigy_sessionfunction reads theRetry-Afterheader and sleeps accordingly. - Code fix: Monitor the
Retry-Afterheader. If missing, default to five seconds. Implement a circuit breaker pattern for production deployments to prevent thread exhaustion.
Error: WebSocket Disconnect or Timeout
- Cause: Network instability between Asterisk server and Cognigy CDN, or Cognigy flow timeout.
- Fix: Ensure the Asterisk server has outbound HTTPS/WSS access to
*.cognigy.com. Increase WebSocket timeout values if latency exceeds three seconds. - Code fix: Wrap
ws.recv()in a try-except block. Implement a heartbeat mechanism by sending{"type": "ping"}every thirty seconds if Cognigy supports it.
Error: Asterisk Channel Hangup During Audio Playback
- Cause: Caller disconnects before
agi.stream_file()completes, leaving temporary files on disk. - Fix: Use
try-finallyblocks around file operations. Implement a background cleanup cron job for/tmp/*.wavfiles older than five minutes. - Code fix: The
run_voice_bridgefunction already wraps playback in exception handling. Add a file existence check beforeos.remove()to preventFileNotFoundError.