Building Robust REST API Calls in Cognigy.AI with Python

What You Will Build

  • A production-grade Python module that executes REST API requests from within Cognigy.AI Script or REST nodes.
  • The code implements dynamic URL construction, session variable injection, schema validation, conditional error branching, exponential backoff retries, structured logging, and a local mock server for isolated testing.
  • The tutorial uses Python 3.9+ with requests, pydantic, tenacity, fastapi, and uvicorn.

Prerequisites

  • Python 3.9 or higher
  • requests>=2.31.0
  • pydantic>=2.5.0
  • tenacity>=8.2.0
  • fastapi>=0.104.0
  • uvicorn>=0.24.0
  • Cognigy.AI Studio access with Script or REST node permissions
  • Bearer token or API key for the target external service
  • Target API: https://api.github.com/repos/{owner}/{repo}/issues (real endpoint; public repositories need no scope, private repositories require a token with access to the repository, such as a classic token with the repo scope)

Authentication Setup

Cognigy.AI passes authentication credentials through the session object or environment variables. The following code demonstrates how to cache and refresh a Bearer token before making API calls.

import os
import time
import requests
from typing import Optional

class TokenManager:
    def __init__(self, token_url: str, client_id: str, client_secret: str):
        self.token_url = token_url
        self.client_id = client_id
        self.client_secret = client_secret
        self._token: Optional[str] = None
        self._expires_at: float = 0.0

    def get_token(self) -> str:
        if self._token and time.time() < self._expires_at - 60:
            return self._token

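        response = requests.post(
            self.token_url,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "scope": "repo"  # scope names are provider-specific; adjust for your token endpoint
            },
            timeout=30  # fail fast instead of blocking the conversation flow
        )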
        response.raise_for_status()
        payload = response.json()
        self._token = payload["access_token"]
        self._expires_at = time.time() + payload["expires_in"]
        return self._token

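GitHub does not define a read:repo scope. Public repositories can be read without any scope, and private repositories require the repo scope on a classic token. The client-credentials request above is a generic OAuth 2.0 pattern, so adapt the token URL, grant parameters, and scope names to your identity provider. Cognigy.AI REST nodes accept the token in the Authorization header. The manager caches the token and refreshes it sixty seconds before expiration to prevent mid-flow authentication failures.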
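The caching rule can be checked in isolation. The sketch below is a minimal illustration, not part of the module: CachedToken and its fetch callable are hypothetical stand-ins for the HTTP call, and the clock value is passed in explicitly so the sixty-second refresh margin can be exercised without waiting.

```python
from typing import Callable, Optional, Tuple


class CachedToken:
    """Hypothetical stand-in for TokenManager with the HTTP call abstracted away."""

    def __init__(self, fetch: Callable[[], Tuple[str, float]], margin: float = 60.0):
        self._fetch = fetch      # returns (token, lifetime_in_seconds)
        self._margin = margin    # refresh this many seconds before expiry
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self, now: float) -> str:
        # Reuse the cached token while it is still outside the refresh margin.
        if self._token is not None and now < self._expires_at - self._margin:
            return self._token
        self._token, lifetime = self._fetch()
        self._expires_at = now + lifetime
        return self._token


calls = []


def fake_fetch() -> Tuple[str, float]:
    # Pretend token endpoint: issues a new token valid for one hour.
    calls.append(1)
    return f"token-{len(calls)}", 3600.0


cache = CachedToken(fake_fetch)
first = cache.get(now=0.0)       # no token yet, so one is fetched
second = cache.get(now=3000.0)   # 3000 < 3600 - 60, so the cache is used
third = cache.get(now=3545.0)    # inside the 60-second margin, so it refreshes
print(first, second, third, len(calls))
```

Passing the clock in as an argument is a testing convenience in this sketch; the TokenManager above reads time.time() directly.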

Implementation

Step 1: Dynamic URL Construction and Session Variable Injection

Cognigy.AI injects conversation state into the cognigy.session dictionary. The following function constructs the target URL and populates headers and body using session values.

import uuid
import logging
from typing import Any, Dict

logger = logging.getLogger("cognigy.rest_client")

def build_request(session: Dict[str, Any]) -> Dict[str, Any]:
    owner = session.get("github_owner", "octocat")
    repo = session.get("github_repo", "Hello-World")
    issue_state = session.get("issue_state", "open")
    bearer_token = session.get("bearer_token")

    url = f"https://api.github.com/repos/{owner}/{repo}/issues?state={issue_state}"
    request_id = str(uuid.uuid4())

    headers = {
        "Authorization": f"Bearer {bearer_token}",
        "Accept": "application/vnd.github.v3+json",
        "X-Request-Id": request_id,
        "User-Agent": "CognigyAI-Integration/1.0"
    }

    body = {
        "filter": {"created_after": session.get("date_filter", "2024-01-01")},
        "metadata": {"source": "cognigy_script_node", "conversation_id": session.get("conversationId")}
    }

    logger.info("Constructed request", extra={"url": url, "request_id": request_id, "method": "GET"})
    return {"url": url, "method": "GET", "headers": headers, "body": body}

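The function extracts github_owner, github_repo, and issue_state from the session and falls back to defaults when values are missing. The values are interpolated into the URL without encoding, so validate or escape them if they originate from user input. The body is built for completeness but is sent only for POST, PUT, and PATCH requests, so it has no effect on this GET call. The X-Request-Id header enables trace correlation across microservices.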
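Session values often originate from user input, so it can help to escape them before they reach the URL. The following is a minimal sketch using only the standard library; build_issues_url and API_BASE are illustrative names and are not part of the module above.

```python
from urllib.parse import quote, urlencode

API_BASE = "https://api.github.com"


def build_issues_url(owner: str, repo: str, state: str = "open") -> str:
    # quote(..., safe="") also escapes "/", so a value cannot add path segments.
    safe_owner = quote(owner, safe="")
    safe_repo = quote(repo, safe="")
    # urlencode escapes "&", "=", and spaces inside query values.
    query = urlencode({"state": state})
    return f"{API_BASE}/repos/{safe_owner}/{safe_repo}/issues?{query}"


plain = build_issues_url("octocat", "Hello-World")
hostile = build_issues_url("octo/cat", "my repo", "open&per_page=100")
print(plain)
print(hostile)
```

With the second call, the slash, the space, and the injected query parameter are all percent-encoded instead of changing the shape of the request.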

Step 2: Retry Logic and HTTP Error Code Branching

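Transient failures call for exponential backoff. The tenacity library retries a call when a predicate flags its result as transient; here the predicate matches 429 and selected 5xx responses. Explicit branching handles client errors that should not be retried.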

import requests
from typing import Any, Dict
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_result

def is_transient_error(response: requests.Response) -> bool:
    return response.status_code in (429, 500, 502, 503, 504)

@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1.5, min=2, max=30),
    retry=retry_if_result(is_transient_error),
    reraise=True
)
def execute_request(session: Dict[str, Any]) -> requests.Response:
    config = build_request(session)
    logger.info("Executing HTTP call", extra={"url": config["url"], "method": config["method"]})

    response = requests.request(
        method=config["method"],
        url=config["url"],
        headers=config["headers"],
        json=config["body"] if config["method"] in ("POST", "PUT", "PATCH") else None,
        timeout=30
    )

    logger.info("HTTP response received", extra={"status": response.status_code, "request_id": config["headers"]["X-Request-Id"]})

    if response.status_code == 401:
        raise PermissionError("Invalid or expired Bearer token. Verify OAuth scope and token freshness.")
    if response.status_code == 403:
        raise PermissionError("Access denied. The token lacks required repository permissions.")
    if response.status_code == 404:
        raise ValueError(f"Resource not found at {config['url']}. Verify owner/repo parameters.")
    if response.status_code == 422:
        raise ValueError("Unprocessable entity. Check request payload structure against API schema.")

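    # Return transient responses so retry_if_result can inspect them; calling
    # raise_for_status() here would raise HTTPError and bypass the retry.
    if is_transient_error(response):
        return response

    response.raise_for_status()
    return response

The decorator retries when execute_request returns a 429 or 5xx response. The early return matters: retry_if_result inspects return values only, so raising HTTPError for those statuses would skip the retry entirely. If every attempt returns a transient status, tenacity raises RetryError, because reraise=True only re-raises an exception thrown by the wrapped function. The branching logic raises specific exceptions for 401, 403, 404, and 422, which the calling handler catches to route the conversation to error handling branches.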
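To get a feel for how long a flow may block under these settings, the wait schedule can be estimated by hand. The sketch below assumes that wait_exponential computes multiplier * 2 ** (attempt - 1) and clamps the result between min and max; that formula is an assumption about tenacity's behavior and should be verified against the version you install. backoff_schedule is a hypothetical helper and makes no tenacity calls.

```python
from typing import List


def backoff_schedule(attempts: int, multiplier: float, min_wait: float, max_wait: float) -> List[float]:
    """Estimated delays between attempts under the assumed exponential formula."""
    delays = []
    # One wait follows each failed attempt except the last one.
    for attempt in range(1, attempts):
        raw = multiplier * 2 ** (attempt - 1)
        delays.append(max(min_wait, min(raw, max_wait)))
    return delays


schedule = backoff_schedule(attempts=4, multiplier=1.5, min_wait=2, max_wait=30)
total_wait = sum(schedule)
print(schedule, total_wait)
```

Under that assumption, four attempts add roughly eleven seconds of waiting on top of up to four 30-second request timeouts, which is worth comparing against the response time your conversation flow can tolerate.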

Step 3: Response Schema Validation and Field Extraction

Raw JSON responses must be validated before consumption. pydantic enforces structure and extracts typed fields.

from pydantic import BaseModel, Field
from typing import List, Optional

class IssueLabel(BaseModel):
    id: int
    name: str
    color: str

class GitHubIssue(BaseModel):
    number: int
    title: str
    state: str
    labels: List[IssueLabel] = Field(default_factory=list)
    created_at: str
    html_url: str

def parse_response(response: requests.Response) -> List[GitHubIssue]:
    logger.info("Validating response schema", extra={"content_length": len(response.content)})
    
    if not response.headers.get("Content-Type", "").startswith("application/json"):
        raise ValueError("Expected JSON response. Received non-JSON content type.")

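    data = response.json()
    if not isinstance(data, list):
        # Error payloads arrive as JSON objects; iterating one would yield its keys.
        raise ValueError("Expected a JSON array of issues.")

    validated_issues = []
    for item in data:
        try:
            validated_issues.append(GitHubIssue(**item))
        except Exception as validation_error:
            # Guard the lookup: a non-object record has no .get() method.
            item_id = item.get("id") if isinstance(item, dict) else None
            logger.warning("Skipped malformed issue record", extra={"error": str(validation_error), "item_id": item_id})
            continue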

    logger.info("Schema validation complete", extra={"valid_count": len(validated_issues), "total_count": len(data)})
    return validated_issues

The function iterates over the response array, validates each record against GitHubIssue, and logs skipped records without halting execution. This prevents single-record corruption from breaking the entire flow.
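The skip-and-continue pattern does not depend on pydantic. The sketch below swaps in a hand-written check so it runs with the standard library alone; validate_issue, partition, and REQUIRED_FIELDS are hypothetical names, and the check is stricter than pydantic, which may coerce compatible values by default.

```python
from typing import Any, Dict, List, Tuple

# Subset of the GitHubIssue fields, kept short for illustration.
REQUIRED_FIELDS = {"number": int, "title": str, "state": str}


def validate_issue(item: Any) -> Dict[str, Any]:
    """Hypothetical stand-in for GitHubIssue(**item); raises ValueError on bad input."""
    if not isinstance(item, dict):
        raise ValueError("record is not an object")
    for field, expected in REQUIRED_FIELDS.items():
        if not isinstance(item.get(field), expected):
            raise ValueError(f"field '{field}' is missing or not {expected.__name__}")
    return {field: item[field] for field in REQUIRED_FIELDS}


def partition(records: List[Any]) -> Tuple[List[Dict[str, Any]], List[str]]:
    valid, errors = [], []
    for index, item in enumerate(records):
        try:
            valid.append(validate_issue(item))
        except ValueError as exc:
            # Record the failure and keep going, as parse_response does.
            errors.append(f"record {index}: {exc}")
    return valid, errors


valid, errors = partition([
    {"number": 1, "title": "Setup CI pipeline", "state": "open"},
    {"number": "two", "title": "Bad number", "state": "open"},
    "not-a-dict",
])
print(len(valid), errors)
```

Returning the error list alongside the valid records lets the caller decide whether a high failure ratio should abort the flow instead of silently continuing.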

Step 4: Logging Traces and Mock Server for Testing

Production logging requires structured output. The mock server simulates the external API for offline development.

import json
import logging
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel

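# Structured JSON logger setup
# Attributes present on every LogRecord; anything else was supplied via extra=.
STANDARD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {"message", "asctime"}

class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        # A "%(extra)s" placeholder would fail because LogRecord has no such attribute.
        extra = {key: value for key, value in record.__dict__.items() if key not in STANDARD_ATTRS}
        entry = {"time": self.formatTime(record), "level": record.levelname, "message": record.getMessage(), "extra": extra}
        return json.dumps(entry, default=str)

logger = logging.getLogger("cognigy.rest_client")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)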

# Mock Server
app = FastAPI(title="Cognigy REST Mock Server")

class MockIssue(BaseModel):
    number: int
    title: str
    state: str
    labels: list[dict]
    created_at: str
    html_url: str

@app.get("/repos/{owner}/{repo}/issues")
async def mock_issues(owner: str, repo: str, request: Request, state: str = "open"):
    # Simulate transient failure on specific request IDs
    request_id = request.headers.get("X-Request-Id")
    if request_id and request_id.startswith("fail-"):
        return JSONResponse(status_code=429, content={"message": "Rate limit exceeded"})

    # Simulate 401 error on missing auth
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        return JSONResponse(status_code=401, content={"message": "Missing token"})

    return JSONResponse(content=[
        MockIssue(number=1, title="Setup CI pipeline", state="open", labels=[{"id": 1, "name": "automation", "color": "blue"}], created_at="2024-01-15T10:00:00Z", html_url="https://github.com/example/repo/issues/1").model_dump(),
        MockIssue(number=2, title="Fix login redirect", state="closed", labels=[{"id": 2, "name": "bug", "color": "red"}], created_at="2024-01-10T08:30:00Z", html_url="https://github.com/example/repo/issues/2").model_dump()
    ])

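Run the mock server with uvicorn main:app --reload, assuming the server code is saved as main.py. During development, point the script at http://localhost:8000; build_request hardcodes https://api.github.com, so the base URL must be changed or made configurable. The mock server returns a simplified version of the GitHub response structure and exposes two testable failure conditions: a request whose X-Request-Id starts with fail- receives a 429, and a request without an Authorization header receives a 401. Because build_request generates a UUID for X-Request-Id, trigger the 429 path by sending that header directly in a test request.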
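One way to switch between the real API and the mock server is to read the base URL from the environment. This is a minimal sketch under stated assumptions: GITHUB_API_BASE is a hypothetical variable name chosen for illustration, and resolve_api_base and issues_url are not part of the module.

```python
import os
from typing import Mapping, Optional

DEFAULT_API_BASE = "https://api.github.com"


def resolve_api_base(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the API base URL, preferring the hypothetical GITHUB_API_BASE override."""
    env = os.environ if env is None else env
    # Strip a trailing slash so joining with "/repos/..." never yields "//".
    return env.get("GITHUB_API_BASE", DEFAULT_API_BASE).rstrip("/")


def issues_url(owner: str, repo: str, env: Optional[Mapping[str, str]] = None) -> str:
    return f"{resolve_api_base(env)}/repos/{owner}/{repo}/issues"


real = issues_url("octocat", "Hello-World", env={})
mock = issues_url("octocat", "Hello-World", env={"GITHUB_API_BASE": "http://localhost:8000/"})
print(real)
print(mock)
```

Accepting the environment as a parameter keeps the function testable without mutating os.environ; in the module itself, build_request could call resolve_api_base() with no arguments.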

Complete Working Example

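The following script combines request construction, retry logic, schema validation, and logging into a single runnable module. It reads the Bearer token from the session; add the TokenManager from the authentication section if your provider issues expiring tokens. Replace the endpoint and token with those of your target API.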

import os
import time
import uuid
import logging
import requests
import json
from typing import Any, Dict, List, Optional
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_result
from pydantic import BaseModel, Field

# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
logger = logging.getLogger("cognigy.rest_client")

# --- Pydantic Models ---
class IssueLabel(BaseModel):
    id: int
    name: str
    color: str

class GitHubIssue(BaseModel):
    number: int
    title: str
    state: str
    labels: List[IssueLabel] = Field(default_factory=list)
    created_at: str
    html_url: str

# --- Request Builder ---
def build_request(session: Dict[str, Any]) -> Dict[str, Any]:
    owner = session.get("github_owner", "octocat")
    repo = session.get("github_repo", "Hello-World")
    issue_state = session.get("issue_state", "open")
    bearer_token = session.get("bearer_token")

    url = f"https://api.github.com/repos/{owner}/{repo}/issues?state={issue_state}"
    request_id = str(uuid.uuid4())

    headers = {
        "Authorization": f"Bearer {bearer_token}",
        "Accept": "application/vnd.github.v3+json",
        "X-Request-Id": request_id,
        "User-Agent": "CognigyAI-Integration/1.0"
    }

    body = {
        "filter": {"created_after": session.get("date_filter", "2024-01-01")},
        "metadata": {"source": "cognigy_script_node", "conversation_id": session.get("conversationId")}
    }

    logger.info("Constructed request", extra={"url": url, "request_id": request_id, "method": "GET"})
    return {"url": url, "method": "GET", "headers": headers, "body": body}

# --- Execution & Retry ---
def is_transient_error(response: requests.Response) -> bool:
    return response.status_code in (429, 500, 502, 503, 504)

@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1.5, min=2, max=30),
    retry=retry_if_result(is_transient_error),
    reraise=True
)
def execute_request(session: Dict[str, Any]) -> requests.Response:
    config = build_request(session)
    logger.info("Executing HTTP call", extra={"url": config["url"], "method": config["method"]})

    response = requests.request(
        method=config["method"],
        url=config["url"],
        headers=config["headers"],
        json=config["body"] if config["method"] in ("POST", "PUT", "PATCH") else None,
        timeout=30
    )

    logger.info("HTTP response received", extra={"status": response.status_code, "request_id": config["headers"]["X-Request-Id"]})

    if response.status_code == 401:
        raise PermissionError("Invalid or expired Bearer token. Verify OAuth scope and token freshness.")
    if response.status_code == 403:
        raise PermissionError("Access denied. The token lacks required repository permissions.")
    if response.status_code == 404:
        raise ValueError(f"Resource not found at {config['url']}. Verify owner/repo parameters.")
    if response.status_code == 422:
        raise ValueError("Unprocessable entity. Check request payload structure against API schema.")

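    # Return transient responses so retry_if_result can inspect them; calling
    # raise_for_status() here would raise HTTPError and bypass the retry.
    if is_transient_error(response):
        return response

    response.raise_for_status()
    return response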

# --- Validation & Parsing ---
def parse_response(response: requests.Response) -> List[GitHubIssue]:
    logger.info("Validating response schema", extra={"content_length": len(response.content)})
    
    if not response.headers.get("Content-Type", "").startswith("application/json"):
        raise ValueError("Expected JSON response. Received non-JSON content type.")

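    data = response.json()
    if not isinstance(data, list):
        # Error payloads arrive as JSON objects; iterating one would yield its keys.
        raise ValueError("Expected a JSON array of issues.")

    validated_issues = []
    for item in data:
        try:
            validated_issues.append(GitHubIssue(**item))
        except Exception as validation_error:
            # Guard the lookup: a non-object record has no .get() method.
            item_id = item.get("id") if isinstance(item, dict) else None
            logger.warning("Skipped malformed issue record", extra={"error": str(validation_error), "item_id": item_id})
            continue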

    logger.info("Schema validation complete", extra={"valid_count": len(validated_issues), "total_count": len(data)})
    return validated_issues

# --- Cognigy.AI Integration Entry Point ---
def cognigy_rest_node_handler(cognigy: Any) -> None:
    """
    Entry point for Cognigy.AI Script Node.
    cognigy.session contains conversation state.
    cognigy.output stores results for downstream nodes.
    """
    try:
        response = execute_request(cognigy.session)
        issues = parse_response(response)
        
        cognigy.session["api_issues"] = [issue.model_dump() for issue in issues]
        cognigy.output["status"] = "success"
        cognigy.output["issue_count"] = len(issues)
        logger.info("Node execution successful", extra={"output_issue_count": len(issues)})
    except PermissionError as auth_error:
        cognigy.output["status"] = "authentication_failed"
        cognigy.output["error_message"] = str(auth_error)
        logger.error("Authentication failure in REST node", extra={"error": str(auth_error)})
    except ValueError as schema_error:
        cognigy.output["status"] = "validation_failed"
        cognigy.output["error_message"] = str(schema_error)
        logger.error("Schema or payload validation failure", extra={"error": str(schema_error)})
    except Exception as unexpected_error:
        cognigy.output["status"] = "unexpected_error"
        cognigy.output["error_message"] = str(unexpected_error)
        logger.exception("Unhandled exception in REST node")

# --- Local Test Runner ---
if __name__ == "__main__":
    test_session = {
        "github_owner": "octocat",
        "github_repo": "Hello-World",
        "issue_state": "open",
        "bearer_token": os.getenv("GITHUB_TOKEN", "dummy_token_for_local_test"),
        "conversationId": "conv_test_001",
        "date_filter": "2024-01-01"
    }
    
    class MockCognigy:
        def __init__(self, session):
            self.session = session
            self.output = {}
    
    mock_cognigy = MockCognigy(test_session)
    cognigy_rest_node_handler(mock_cognigy)
    print("Node Output:", json.dumps(mock_cognigy.output, indent=2))

Run the script with python cognigy_rest_client.py. Set the GITHUB_TOKEN environment variable to a valid token; public repositories such as octocat/Hello-World need no scope, while private repositories require the repo scope on a classic token. The script executes the full request cycle, validates the response, and prints the node output.

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: Expired Bearer token, missing Authorization header, or incorrect token format.
  • Fix: Verify the token is passed in the session. Use the TokenManager class to refresh credentials automatically. Ensure the token carries the scope your endpoint requires (the repo scope for private GitHub repositories).
  • Code Fix: The branching logic in execute_request catches 401 and routes to cognigy.output["status"] = "authentication_failed".

Error: 403 Forbidden

  • Cause: Valid token but insufficient permissions for the repository or missing API scope.
  • Fix: Grant the token access to the repository. For classic tokens, private repositories require the repo scope. Verify the token belongs to a user with repository membership.
  • Code Fix: Explicit 403 handling raises PermissionError, which the handler catches and logs.

Error: 429 Rate Limit Exceeded

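  • Cause: Exceeding GitHub API rate limits (60 requests per hour for unauthenticated, 5000 per hour for authenticated). GitHub may also report rate limiting with a 403 response, which this module treats as a permission error and does not retry.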
  • Fix: The tenacity decorator retries with exponential backoff. Implement request deduplication in Cognigy.AI to avoid duplicate calls. Cache responses when possible.
  • Code Fix: retry_if_result(is_transient_error) triggers retries for 429. Adjust stop_after_attempt and wait_exponential parameters based on your quota.
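Fixed backoff parameters can be complemented by the hints the server sends with a rate-limit response. The sketch below derives a wait time from the Retry-After and X-RateLimit-Reset headers, which GitHub includes on rate-limited responses. rate_limit_delay and its cap parameter are hypothetical and not part of the module above.

```python
import time
from typing import Mapping, Optional


def rate_limit_delay(headers: Mapping[str, str], now: Optional[float] = None, cap: float = 60.0) -> float:
    """Seconds to wait before retrying, derived from rate-limit response headers."""
    now = time.time() if now is None else now
    # Normalize header names, since a plain dict lookup is case-sensitive.
    lowered = {key.lower(): value for key, value in headers.items()}
    if "retry-after" in lowered:
        try:
            return min(max(float(lowered["retry-after"]), 0.0), cap)
        except ValueError:
            pass  # Retry-After may also be an HTTP date; this sketch ignores that form.
    if "x-ratelimit-reset" in lowered:
        try:
            # The reset value is a UTC epoch timestamp in seconds.
            return min(max(float(lowered["x-ratelimit-reset"]) - now, 0.0), cap)
        except ValueError:
            pass
    return 0.0


from_retry_after = rate_limit_delay({"Retry-After": "12"})
from_reset = rate_limit_delay({"X-RateLimit-Reset": "1030"}, now=1000.0)
capped = rate_limit_delay({"X-RateLimit-Reset": "5000"}, now=1000.0)
print(from_retry_after, from_reset, capped)
```

The cap keeps a conversation from blocking for the full reset window; when the computed delay exceeds what the flow can tolerate, routing to an error branch is usually preferable to waiting.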

Error: 422 Unprocessable Entity

  • Cause: Malformed JSON, missing required fields, or invalid query parameters.
  • Fix: Validate the request body against the target API schema before sending. Use pydantic models to enforce structure.
  • Code Fix: The 422 branch raises ValueError. Review the build_request function to ensure all required parameters match the API specification.

Error: Pydantic ValidationError

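  • Cause: The response is missing a required field or returns a value of an unexpected type. Extra fields do not trigger this error, because pydantic models ignore unknown fields by default.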
  • Fix: Update GitHubIssue and IssueLabel models to match the current API version. Use Field(default_factory=list) for optional arrays.
  • Code Fix: The parse_response function catches validation errors per record and logs them without halting execution.

Official References