Integrating NICE Cognigy Voice Flows with a Custom IVR System by Bridging the Cognigy Voice API to a SIP Trunk Using a Python Asterisk AGI Script with DTMF Mapping

What You Will Build

  • A Python AGI script that handles incoming SIP calls on Asterisk, opens a real-time WebSocket session with the NICE Cognigy Voice API, forwards caller DTMF digits to Cognigy, and plays Cognigy's responses back to the caller.
  • This implementation uses the Cognigy Platform REST API for session creation and authentication, combined with the Cognigy Voice WebSocket API for low-latency voice interaction.
  • The tutorial covers Python 3.9+ with the asterisk.agi, requests, and websocket-client libraries.

Prerequisites

  • Cognigy API client credentials with client_credentials grant type enabled. Required OAuth scopes: cognigy:session:manage and cognigy:voice:interact.
  • Cognigy Platform API v1 (stable). WebSocket Voice API v1.
  • Python 3.9 or higher with pip installed.
  • External dependencies: pyst2 (a third-party package that provides the asterisk.agi module; it is not bundled with Asterisk), requests>=2.31.0, websocket-client>=1.6.0, httpx>=0.25.0 (optional for advanced streaming; not used in the code below).
  • Asterisk PBX with a configured SIP trunk and chan_sip or chan_pjsip. The AGI script must be executable and located in your Asterisk AGI directory (typically /var/lib/asterisk/agi-bin/).

Authentication Setup

Cognigy uses OAuth 2.0 client credentials flow for server-to-server communication. The token must be cached and refreshed before expiration to avoid 401 Unauthorized errors during active call sessions.

import requests
import time
import threading
from typing import Optional

COGNIGY_INSTANCE = "your-instance.cognigy.com"
AUTH_URL = f"https://{COGNIGY_INSTANCE}/api/v1/auth"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"

class CognigyAuthManager:
    def __init__(self):
        self._token: Optional[str] = None
        self._expires_at: float = 0.0
        self._lock = threading.Lock()

    def get_token(self) -> str:
        with self._lock:
            if self._token and time.time() < (self._expires_at - 60):
                return self._token
            return self._refresh_token()

    def _refresh_token(self) -> str:
        payload = {
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        }
        response = requests.post(AUTH_URL, json=payload, timeout=10)
        response.raise_for_status()
        data = response.json()
        self._token = data["access_token"]
        self._expires_at = time.time() + data["expires_in"]
        return self._token

auth_manager = CognigyAuthManager()

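The get_token method checks expiration with a sixty-second safety buffer. The thread lock serializes token access between threads inside one process. Asterisk starts a separate process for each AGI call, so this in-memory cache is not shared between calls: every invocation fetches its own token unless the token is cached outside the process. The raise_for_status() call raises requests.HTTPError on 400, 401, or 403 responses, which the calling code must catch.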
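Since each AGI call runs as its own process, one way to reuse a token across calls is a small file-backed cache. The following is a minimal sketch, not part of the original script: the function names store_token and load_cached_token and the cache file location are hypothetical, and it assumes the access_token and expires_in fields shown above.

```python
import json
import os
import tempfile
import time
from typing import Optional


def store_token(path: str, token: str, expires_in: float,
                now: Optional[float] = None) -> None:
    """Write the token and its absolute expiry time to `path` atomically."""
    now = time.time() if now is None else now
    record = {"access_token": token, "expires_at": now + expires_in}
    directory = os.path.dirname(path) or "."
    # mkstemp creates the file readable only by the current user.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as handle:
        json.dump(record, handle)
    # os.replace is atomic, so readers never see a half-written file.
    os.replace(tmp_path, path)


def load_cached_token(path: str, buffer_seconds: float = 60.0,
                      now: Optional[float] = None) -> Optional[str]:
    """Return the cached token, or None if it is missing, unreadable, or near expiry."""
    now = time.time() if now is None else now
    try:
        with open(path) as handle:
            record = json.load(handle)
        if now < record["expires_at"] - buffer_seconds:
            return record["access_token"]
    except (OSError, ValueError, KeyError, TypeError):
        return None
    return None
```

A caller would try load_cached_token first and fall back to the OAuth request, followed by store_token, only when it returns None.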

Implementation

Step 1: Asterisk AGI Initialization and Channel Setup

Asterisk invokes the AGI script by piping environment variables and command input through standard input. The script must acknowledge the channel, set appropriate timeouts, and prepare DTMF collection parameters.

import sys
import os
import json
import logging
from asterisk.agi import AGI

# Configure logging to Asterisk console
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    stream=sys.stderr
)

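def initialize_asterisk_channel(agi: AGI) -> dict:
    agi.verbose("Cognigy Voice Bridge initialized", 3)
    # Answer the channel so prompts can be played and DTMF collected.
    agi.answer()

    # pyst2 exposes the AGI environment through the agi.env dictionary.
    channel_info = {
        "channel": agi.env.get("agi_channel"),
        "caller_id": agi.env.get("agi_callerid"),
        "dnid": agi.env.get("agi_dnid"),
        "unique_id": agi.env.get("agi_uniqueid")
    }
    return channel_info

The DTMF signaling mode is a property of the SIP trunk rather than of the AGI session, so it is set in the channel driver configuration (dtmfmode=rfc2833 in sip.conf, or dtmf_mode=rfc4733 in pjsip.conf). RFC 2833 signaling provides reliable digit transmission over SIP trunks. The maximum wait time for a DTMF press is not a channel setting either: it is passed, in milliseconds, to the digit-collection call itself (agi.get_data or agi.wait_for_digit).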
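Asterisk hands the AGI environment to the script as agi_name: value lines on standard input, terminated by a blank line; the AGI class parses these at startup. The sketch below shows that parsing in isolation, which can help when testing a script without a live Asterisk. The function name parse_agi_environment and the sample values are illustrative only.

```python
from typing import Dict, Iterable


def parse_agi_environment(lines: Iterable[str]) -> Dict[str, str]:
    """Parse 'agi_key: value' header lines up to the first blank line."""
    env: Dict[str, str] = {}
    for raw in lines:
        line = raw.strip()
        if not line:
            break  # a blank line ends the environment block
        key, sep, value = line.partition(":")
        if sep and key.startswith("agi_"):
            env[key] = value.strip()
    return env


# Sample input shaped like what Asterisk sends (values are made up).
sample = [
    "agi_request: cognigy_bridge.py\n",
    "agi_channel: PJSIP/trunk-00000001\n",
    "agi_uniqueid: 1700000000.42\n",
    "agi_callerid: 15551230000\n",
    "\n",
    "agi_ignored: after blank line\n",
]
env = parse_agi_environment(sample)
print(env["agi_channel"], env["agi_uniqueid"])
```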

Step 2: Cognigy Session Creation and WebSocket Handshake

Cognigy requires a REST call to create a session before establishing the Voice WebSocket. The session ID links the REST context to the real-time voice stream.

import requests
from requests.exceptions import HTTPError, Timeout

SESSION_URL = f"https://{COGNIGY_INSTANCE}/api/v1/sessions"
VOICE_WS_URL = f"wss://{COGNIGY_INSTANCE}/api/v1/sessions/{{session_id}}/voice"

MAX_SESSION_ATTEMPTS = 3

def create_cognigy_session(agi: AGI, attempt: int = 1) -> str:
    headers = {
        "Authorization": f"Bearer {auth_manager.get_token()}",
        "Content-Type": "application/json"
    }
    payload = {
        "flowId": "your_flow_id",
        # pyst2 exposes channel details through the agi.env dictionary
        "sessionId": agi.env.get("agi_uniqueid"),
        "metadata": {
            "source": "asterisk_agi",
            "caller": agi.env.get("agi_callerid")
        }
    }
    
    try:
        response = requests.post(SESSION_URL, json=payload, headers=headers, timeout=15)
        response.raise_for_status()
        session_data = response.json()
        agi.verbose(f"Session created: {session_data['id']}", 3)
        return session_data["id"]
    except HTTPError as e:
        if e.response.status_code == 429 and attempt < MAX_SESSION_ATTEMPTS:
            # Assumes Retry-After carries integer seconds, not an HTTP date
            retry_after = int(e.response.headers.get("Retry-After", 5))
            agi.verbose(f"Rate limited. Retrying in {retry_after}s", 3)
            time.sleep(retry_after)
            return create_cognigy_session(agi, attempt + 1)
        agi.verbose(f"Session creation failed: {e}", 3)
        raise
    except Timeout:
        agi.verbose("Cognigy API timed out", 3)
        raise

On a 429 response the function waits for the interval given in the Retry-After header (five seconds if the header is absent) and retries, giving up after MAX_SESSION_ATTEMPTS attempts so that a sustained rate limit cannot hold the caller in silence indefinitely. This is a server-directed delay, not exponential backoff. The sessionId field in the payload maps to Asterisk’s unique channel ID for traceability. The flowId must match an active voice flow in your Cognigy workspace.
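When no Retry-After header is present, an exponential backoff with jitter spreads retries out instead of reusing one fixed delay. The helper below is a sketch under stated assumptions: retry_delay is a hypothetical name, the base and cap values are arbitrary, and a Retry-After value that is not a plain number of seconds (for example an HTTP date) is simply ignored.

```python
import random
from typing import Callable, Optional


def retry_delay(attempt: int,
                retry_after: Optional[str] = None,
                base: float = 1.0,
                cap: float = 30.0,
                rng: Callable[[], float] = random.random) -> float:
    """Return the number of seconds to wait before retry number `attempt` (1-based)."""
    if retry_after is not None:
        try:
            # Honor the server's hint, clamped to the range [0, cap].
            return min(cap, max(0.0, float(retry_after)))
        except ValueError:
            pass  # not numeric; fall back to computed backoff
    # "Full jitter": pick a random point below an exponentially growing ceiling.
    ceiling = min(cap, base * (2 ** (attempt - 1)))
    return ceiling * rng()
```

Passing rng as a parameter keeps the function deterministic in tests; in the bridge it would be called as time.sleep(retry_delay(attempt, e.response.headers.get("Retry-After"))).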

Step 3: DTMF Mapping and Bidirectional Event Loop

The core bridge runs as a synchronous loop. Asterisk collects DTMF, the script forwards it to Cognigy via WebSocket, and Cognigy returns audio URLs or text responses. The script handles the DTMF format translation and audio playback.

import websocket
import tempfile
import urllib.request
from websocket import WebSocketTimeoutException

DTMF_MAP = {
    "0": "0", "1": "1", "2": "2", "3": "3", "4": "4",
    "5": "5", "6": "6", "7": "7", "8": "8", "9": "9",
    "*": "*", "#": "#"
}

def run_voice_bridge(agi: AGI, session_id: str):
    ws_url = VOICE_WS_URL.format(session_id=session_id)
    headers = {
        "Authorization": f"Bearer {auth_manager.get_token()}",
        "X-Cognigy-Session-Id": session_id
    }
    
    try:
        ws = websocket.create_connection(ws_url, header=headers, timeout=10)
    except Exception as e:
        agi.verbose(f"WebSocket connection failed: {e}", 3)
        return

    try:
        while True:
            # Collect one DTMF digit from Asterisk (timeout is in milliseconds)
            dtmf_result = agi.get_data("dtmf_prompt", timeout=5000, max_digits=1)
            digit = dtmf_result.strip()
            
            if not digit:
                continue
                
            mapped_digit = DTMF_MAP.get(digit)
            if not mapped_digit:
                agi.verbose(f"Unmapped DTMF: {digit}", 3)
                continue

            # Send to Cognigy
            cognigy_msg = json.dumps({
                "type": "dtmf",
                "dtmf": mapped_digit
            })
            ws.send(cognigy_msg)
            agi.verbose(f"Sent DTMF: {mapped_digit}", 3)

            # Receive Cognigy response
            try:
                response_raw = ws.recv()
                response = json.loads(response_raw)
                
                if response.get("type") == "response":
                    audio_url = response.get("audioUrl")
                    text = response.get("text", "")
                    
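                    if audio_url:
                        temp_file = download_audio(audio_url)
                        try:
                            # STREAM FILE expects the path without its extension
                            agi.stream_file(os.path.splitext(temp_file)[0])
                        finally:
                            # Remove the file even if the caller hangs up mid-playback
                            os.remove(temp_file)
                    elif text:
                        # AGI has no generic text-to-speech command; speaking free
                        # text requires a TTS dialplan application, so log it here.
                        agi.verbose(f"Text-only response: {text}", 3)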
                        
                if response.get("end") is True:
                    agi.verbose("Cognigy session ended", 3)
                    break
            except WebSocketTimeoutException:
                continue
    except Exception as e:
        agi.verbose(f"Bridge error: {e}", 3)
    finally:
        ws.close()

def download_audio(url: str) -> str:
    fd, path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    urllib.request.urlretrieve(url, path)
    return path

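The DTMF_MAP dictionary acts as an allowlist: only the twelve standard keypad symbols are forwarded, and each maps to itself. Change the values if your flow expects different tokens (for example, "star" instead of "*"). The loop terminates when Cognigy sends {"end": true}, or when the caller hangs up, in which case the next AGI call raises an exception that the outer handler catches. The download_audio function saves the audio referenced by audioUrl to a temporary file so that Asterisk can play it to the caller.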
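The message handling in the loop can be separated from the AGI and WebSocket calls so that it is testable on its own. The sketch below assumes only the message shapes used in this tutorial (type, dtmf, audioUrl, text, end); build_dtmf_message and classify_response are hypothetical helper names.

```python
import json
from typing import Optional, Tuple

VALID_DTMF = frozenset("0123456789*#")


def build_dtmf_message(digit: str) -> Optional[str]:
    """Return the JSON message for one keypad digit, or None if it is not valid."""
    digit = digit.strip()
    if len(digit) != 1 or digit not in VALID_DTMF:
        return None
    return json.dumps({"type": "dtmf", "dtmf": digit})


def classify_response(raw: str) -> Tuple[str, Optional[str], bool]:
    """Map a raw message to (action, value, ended).

    action is "play" (value is an audio URL), "text" (value is the text),
    or "ignore"; ended is True when the message carries "end": true.
    """
    try:
        message = json.loads(raw)
    except ValueError:
        return ("ignore", None, False)
    if not isinstance(message, dict):
        return ("ignore", None, False)
    ended = message.get("end") is True
    if message.get("type") == "response":
        if message.get("audioUrl"):
            return ("play", message["audioUrl"], ended)
        if message.get("text"):
            return ("text", message["text"], ended)
    return ("ignore", None, ended)
```

Malformed JSON is treated as "ignore" rather than raised, so one bad frame does not tear down the call.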

Step 4: Graceful Teardown and Resource Cleanup

When the call ends or the Cognigy flow completes, the AGI script must close the WebSocket, clear temporary files, and return control to the Asterisk dialplan.

def cleanup_and_hangup(agi: AGI):
    try:
        agi.verbose("Cleaning up Cognigy bridge", 3)
        agi.execute("HANGUP")
    except Exception as e:
        agi.verbose(f"Cleanup error: {e}", 3)
    finally:
        sys.exit(0)

Asterisk releases the channel when the script issues HANGUP, or when the dialplan reaches a Hangup() step after the script exits. The sys.exit(0) call in the finally block guarantees that the AGI process terminates even after an error, so no stray script processes are left running.
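Temporary audio files are easiest to clean up when their lifetime is tied to a with block. The context manager below is a minimal sketch (temporary_audio_path is a hypothetical name) that removes the file whether playback finishes, fails, or is interrupted by a hangup exception.

```python
import contextlib
import os
import tempfile
from typing import Iterator


@contextlib.contextmanager
def temporary_audio_path(suffix: str = ".wav") -> Iterator[str]:
    """Yield a fresh temporary file path and delete the file on exit."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)
    try:
        yield path
    finally:
        # The file may already be gone; that is not an error here.
        with contextlib.suppress(FileNotFoundError):
            os.remove(path)
```

In the bridge, the download and the playback call would both sit inside with temporary_audio_path() as path:, replacing the separate os.remove call.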

Complete Working Example

#!/usr/bin/env python3
"""
Cognigy Voice Bridge - Asterisk AGI Script
Bridges SIP trunk DTMF to Cognigy Voice API via WebSocket.
"""

import sys
import os
import json
import time
import threading
import logging
import requests
import websocket
import tempfile
import urllib.request
from typing import Optional
from asterisk.agi import AGI
from requests.exceptions import HTTPError, Timeout
from websocket import WebSocketTimeoutException

# Configuration
COGNIGY_INSTANCE = "your-instance.cognigy.com"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
FLOW_ID = "your_flow_id"

AUTH_URL = f"https://{COGNIGY_INSTANCE}/api/v1/auth"
SESSION_URL = f"https://{COGNIGY_INSTANCE}/api/v1/sessions"
VOICE_WS_URL = f"wss://{COGNIGY_INSTANCE}/api/v1/sessions/{{session_id}}/voice"

DTMF_MAP = {
    "0": "0", "1": "1", "2": "2", "3": "3", "4": "4",
    "5": "5", "6": "6", "7": "7", "8": "8", "9": "9",
    "*": "*", "#": "#"
}

class CognigyAuthManager:
    def __init__(self):
        self._token: Optional[str] = None
        self._expires_at: float = 0.0
        self._lock = threading.Lock()

    def get_token(self) -> str:
        with self._lock:
            if self._token and time.time() < (self._expires_at - 60):
                return self._token
            return self._refresh_token()

    def _refresh_token(self) -> str:
        payload = {
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        }
        response = requests.post(AUTH_URL, json=payload, timeout=10)
        response.raise_for_status()
        data = response.json()
        self._token = data["access_token"]
        self._expires_at = time.time() + data["expires_in"]
        return self._token

auth_manager = CognigyAuthManager()

MAX_SESSION_ATTEMPTS = 3

def create_cognigy_session(agi: AGI, attempt: int = 1) -> str:
    headers = {
        "Authorization": f"Bearer {auth_manager.get_token()}",
        "Content-Type": "application/json"
    }
    payload = {
        "flowId": FLOW_ID,
        # pyst2 exposes channel details through the agi.env dictionary
        "sessionId": agi.env.get("agi_uniqueid"),
        "metadata": {
            "source": "asterisk_agi",
            "caller": agi.env.get("agi_callerid")
        }
    }
    
    try:
        response = requests.post(SESSION_URL, json=payload, headers=headers, timeout=15)
        response.raise_for_status()
        session_data = response.json()
        agi.verbose(f"Session created: {session_data['id']}", 3)
        return session_data["id"]
    except HTTPError as e:
        if e.response.status_code == 429 and attempt < MAX_SESSION_ATTEMPTS:
            # Assumes Retry-After carries integer seconds, not an HTTP date
            retry_after = int(e.response.headers.get("Retry-After", 5))
            agi.verbose(f"Rate limited. Retrying in {retry_after}s", 3)
            time.sleep(retry_after)
            return create_cognigy_session(agi, attempt + 1)
        agi.verbose(f"Session creation failed: {e}", 3)
        raise
    except Timeout:
        agi.verbose("Cognigy API timed out", 3)
        raise

def download_audio(url: str) -> str:
    fd, path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    urllib.request.urlretrieve(url, path)
    return path

def run_voice_bridge(agi: AGI, session_id: str):
    ws_url = VOICE_WS_URL.format(session_id=session_id)
    headers = {
        "Authorization": f"Bearer {auth_manager.get_token()}",
        "X-Cognigy-Session-Id": session_id
    }
    
    try:
        ws = websocket.create_connection(ws_url, header=headers, timeout=10)
    except Exception as e:
        agi.verbose(f"WebSocket connection failed: {e}", 3)
        return

    try:
        while True:
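            # silence/1 is a one-second silent prompt from the Asterisk core sounds;
            # timeout is in milliseconds
            dtmf_result = agi.get_data("silence/1", timeout=5000, max_digits=1)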
            digit = dtmf_result.strip()
            
            if not digit:
                continue
                
            mapped_digit = DTMF_MAP.get(digit)
            if not mapped_digit:
                continue

            cognigy_msg = json.dumps({
                "type": "dtmf",
                "dtmf": mapped_digit
            })
            ws.send(cognigy_msg)

            try:
                response_raw = ws.recv()
                response = json.loads(response_raw)
                
                if response.get("type") == "response":
                    audio_url = response.get("audioUrl")
                    text = response.get("text", "")
                    
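                    if audio_url:
                        temp_file = download_audio(audio_url)
                        try:
                            # STREAM FILE expects the path without its extension
                            agi.stream_file(os.path.splitext(temp_file)[0])
                        finally:
                            # Remove the file even if the caller hangs up mid-playback
                            os.remove(temp_file)
                    elif text:
                        # AGI has no generic text-to-speech command; speaking free
                        # text requires a TTS dialplan application, so log it here.
                        agi.verbose(f"Text-only response: {text}", 3)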
                        
                if response.get("end") is True:
                    break
            except WebSocketTimeoutException:
                continue
    except Exception as e:
        agi.verbose(f"Bridge error: {e}", 3)
    finally:
        ws.close()

def main():
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        stream=sys.stderr
    )
    
    try:
        agi = AGI()
        agi.verbose("Cognigy Voice Bridge initialized", 3)
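        # Answer the channel so prompts can be played and DTMF collected.
        # The DTMF mode is configured on the trunk (sip.conf or pjsip.conf),
        # and the digit timeout is passed to get_data in run_voice_bridge.
        agi.answer()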
        
        session_id = create_cognigy_session(agi)
        run_voice_bridge(agi, session_id)
        
        agi.verbose("Cleaning up Cognigy bridge", 3)
        agi.execute("HANGUP")
    except Exception as e:
        logging.error(f"Fatal AGI error: {e}")
        if "agi" in locals():
            agi.execute("HANGUP")
    finally:
        sys.exit(0)

if __name__ == "__main__":
    main()

Deploy this script to /var/lib/asterisk/agi-bin/cognigy_bridge.py, set executable permissions with chmod +x cognigy_bridge.py, and invoke it from your extensions.conf:

[from-trunk]
exten => _X.,1,AGI(cognigy_bridge.py)
exten => _X.,n,Hangup()

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: Expired OAuth token, invalid client credentials, or missing cognigy:session:manage scope.
  • Fix: Verify CLIENT_ID and CLIENT_SECRET in the Cognigy admin console. Ensure the token refresh buffer accounts for network latency. Check the Authorization header format matches Bearer <token>.
  • Code fix: The CognigyAuthManager class automatically refreshes tokens. If 401 persists, log the raw response headers to verify scope claims.
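The auth manager refreshes by expiry time only, so a token that is revoked early still produces a 401. One possible mitigation is to retry exactly once with a freshly fetched token. The sketch below is illustrative: send_with_reauth is a hypothetical helper, send stands for any callable that performs the request with a given token and returns (status_code, body), and force_refresh stands for a method that bypasses the cache.

```python
from typing import Callable, Tuple


def send_with_reauth(send: Callable[[str], Tuple[int, str]],
                     get_token: Callable[[], str],
                     force_refresh: Callable[[], str]) -> Tuple[int, str]:
    """Call `send` with the cached token; on 401, refresh and retry once."""
    status, body = send(get_token())
    if status == 401:
        # A second 401 is returned to the caller: it points at bad
        # credentials or missing scopes rather than an expired token.
        status, body = send(force_refresh())
    return status, body
```

Limiting the retry to one attempt avoids a loop when the credentials themselves are wrong.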

Error: 403 Forbidden

  • Cause: The API client lacks permission to access the specified flowId or workspace.
  • Fix: Assign the API client to the correct Cognigy workspace with editor or owner privileges. Verify the flowId matches an active voice flow, not a chat flow.
  • Code fix: Add workspace validation before session creation. Cross-reference the flowId against GET /api/v1/flows to confirm accessibility.

Error: 429 Too Many Requests

  • Cause: Exceeding Cognigy rate limits during peak call volume or rapid retry loops.
  • Fix: Implement exponential backoff with jitter. The create_cognigy_session function reads the Retry-After header and sleeps accordingly.
  • Code fix: Monitor the Retry-After header. If missing, default to five seconds. Implement a circuit breaker pattern for production deployments to prevent thread exhaustion.
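The circuit breaker mentioned above can be sketched as a small state holder. This is an illustration, not a full implementation: the class name, threshold, and reset interval are assumptions. Because each AGI call runs in its own process, the breaker is only useful where its state outlives one call, for example in a long-lived FastAGI server or with the counters kept in shared storage.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Stop calling a failing service until a cool-down period has passed."""

    def __init__(self, failure_threshold: int = 3, reset_after: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._threshold = failure_threshold
        self._reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow(self) -> bool:
        """Return True if a request may be attempted now."""
        if self._opened_at is None:
            return True
        # After the cool-down, let a trial request through (half-open state).
        return self._clock() - self._opened_at >= self._reset_after

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self._threshold:
            # Open (or re-open) the circuit and restart the cool-down.
            self._opened_at = self._clock()
```

The caller checks allow() before each session request and reports the outcome with record_success() or record_failure(); when allow() returns False the call can be routed to a fallback prompt instead.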

Error: WebSocket Disconnect or Timeout

  • Cause: Network instability between Asterisk server and Cognigy CDN, or Cognigy flow timeout.
  • Fix: Ensure the Asterisk server has outbound HTTPS/WSS access to *.cognigy.com. Increase WebSocket timeout values if latency exceeds three seconds.
  • Code fix: Wrap ws.recv() in a try-except block. Implement a heartbeat mechanism by sending {"type": "ping"} every thirty seconds if Cognigy supports it.
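A heartbeat can be driven from the existing loop without a second thread by checking the elapsed time on each iteration. In this sketch maybe_send_heartbeat is a hypothetical helper, ws is any object with a send method (such as the websocket-client connection), and the {"type": "ping"} message is the one suggested above; whether Cognigy accepts it is an assumption to verify.

```python
import json
import time
from typing import Any, Optional

HEARTBEAT_INTERVAL = 30.0  # seconds


def maybe_send_heartbeat(ws: Any, last_sent: float,
                         now: Optional[float] = None,
                         interval: float = HEARTBEAT_INTERVAL) -> float:
    """Send a ping if `interval` seconds have passed; return the new last-sent time."""
    now = time.monotonic() if now is None else now
    if now - last_sent >= interval:
        ws.send(json.dumps({"type": "ping"}))
        return now
    return last_sent
```

The loop would keep last_sent = time.monotonic() after connecting and call last_sent = maybe_send_heartbeat(ws, last_sent) once per iteration. The websocket-client connection also offers ws.ping() for protocol-level ping frames, which may be sufficient if only the transport needs to be kept alive.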

Error: Asterisk Channel Hangup During Audio Playback

  • Cause: Caller disconnects before agi.stream_file() completes, leaving temporary files on disk.
  • Fix: Use try-finally blocks around file operations. Implement a background cleanup cron job for /tmp/*.wav files older than five minutes.
  • Code fix: The run_voice_bridge function already wraps playback in exception handling. Add a file existence check before os.remove() to prevent FileNotFoundError.
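The periodic cleanup can be done in Python instead of a shell one-liner. The sketch below assumes the five-minute threshold mentioned above; remove_stale_files is a hypothetical name. A pattern such as /tmp/*.wav also matches files written by other programs, so a dedicated directory or filename prefix is a safer target.

```python
import glob
import os
import time
from typing import List, Optional


def remove_stale_files(pattern: str, max_age_seconds: float = 300.0,
                       now: Optional[float] = None) -> List[str]:
    """Delete files matching `pattern` whose mtime is older than `max_age_seconds`."""
    now = time.time() if now is None else now
    removed: List[str] = []
    for path in glob.glob(pattern):
        try:
            if now - os.path.getmtime(path) > max_age_seconds:
                os.remove(path)
                removed.append(path)
        except FileNotFoundError:
            # Another process removed the file between glob and remove.
            continue
    return removed
```

The function returns the removed paths so the caller can log them.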
