Building Robust REST API Calls in Cognigy.AI with Python
What You Will Build
- A production-grade Python module that executes REST API requests from within Cognigy.AI Script or REST nodes.
- The code implements dynamic URL construction, session variable injection, schema validation, conditional error branching, exponential backoff retries, structured logging, and a local mock server for isolated testing.
- The tutorial uses Python 3.9+ with
requests,pydantic,tenacity,fastapi, anduvicorn.
Prerequisites
- Python 3.9 or higher
requests>=2.31.0pydantic>=2.5.0tenacity>=8.2.0fastapi>=0.104.0uvicorn>=0.24.0- Cognigy.AI Studio access with Script or REST node permissions
- Bearer token or API key for the target external service
- Target API:
https://api.github.com/repos/{owner}/{repo}/issues(real endpoint, requiresread:reposcope for private repositories)
Authentication Setup
Cognigy.AI passes authentication credentials through the session object or environment variables. The following code demonstrates how to cache and refresh a Bearer token before making API calls.
import os
import time
import requests
from typing import Optional
class TokenManager:
def __init__(self, token_url: str, client_id: str, client_secret: str):
self.token_url = token_url
self.client_id = client_id
self.client_secret = client_secret
self._token: Optional[str] = None
self._expires_at: float = 0.0
def get_token(self) -> str:
if self._token and time.time() < self._expires_at - 60:
return self._token
response = requests.post(
self.token_url,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
"scope": "read:repo"
}
)
response.raise_for_status()
payload = response.json()
self._token = payload["access_token"]
self._expires_at = time.time() + payload["expires_in"]
return self._token
The OAuth scope read:repo is required for the GitHub Issues endpoint. Cognigy.AI REST nodes accept the token in the Authorization header. The manager above caches the token and refreshes it sixty seconds before expiration to prevent mid-flow authentication failures.
Implementation
Step 1: Dynamic URL Construction and Session Variable Injection
Cognigy.AI injects conversation state into the cognigy.session dictionary. The following function constructs the target URL and populates headers and body using session values.
import uuid
import logging
from typing import Any, Dict
logger = logging.getLogger("cognigy.rest_client")
def build_request(session: Dict[str, Any]) -> Dict[str, Any]:
owner = session.get("github_owner", "octocat")
repo = session.get("github_repo", "Hello-World")
issue_state = session.get("issue_state", "open")
bearer_token = session.get("bearer_token")
url = f"https://api.github.com/repos/{owner}/{repo}/issues?state={issue_state}"
request_id = str(uuid.uuid4())
headers = {
"Authorization": f"Bearer {bearer_token}",
"Accept": "application/vnd.github.v3+json",
"X-Request-Id": request_id,
"User-Agent": "CognigyAI-Integration/1.0"
}
body = {
"filter": {"created_after": session.get("date_filter", "2024-01-01")},
"metadata": {"source": "cognigy_script_node", "conversation_id": session.get("conversationId")}
}
logger.info("Constructed request", extra={"url": url, "request_id": request_id, "method": "GET"})
return {"url": url, "method": "GET", "headers": headers, "body": body}
The function extracts github_owner, github_repo, and issue_state from the session. It falls back to safe defaults when values are missing. The X-Request-Id header enables trace correlation across microservices.
Step 2: Retry Logic and HTTP Error Code Branching
Transient failures require exponential backoff. The tenacity library handles 429 and 5xx responses automatically. Explicit branching handles client errors.
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, retry_if_result
def is_transient_error(response: requests.Response) -> bool:
return response.status_code in (429, 500, 502, 503, 504)
@retry(
stop=stop_after_attempt(4),
wait=wait_exponential(multiplier=1.5, min=2, max=30),
retry=retry_if_result(is_transient_error),
reraise=True
)
def execute_request(session: Dict[str, Any]) -> requests.Response:
config = build_request(session)
logger.info("Executing HTTP call", extra={"url": config["url"], "method": config["method"]})
response = requests.request(
method=config["method"],
url=config["url"],
headers=config["headers"],
json=config["body"] if config["method"] in ("POST", "PUT", "PATCH") else None,
timeout=30
)
logger.info("HTTP response received", extra={"status": response.status_code, "request_id": config["headers"]["X-Request-Id"]})
if response.status_code == 401:
raise PermissionError("Invalid or expired Bearer token. Verify OAuth scope and token freshness.")
if response.status_code == 403:
raise PermissionError("Access denied. The token lacks required repository permissions.")
if response.status_code == 404:
raise ValueError(f"Resource not found at {config['url']}. Verify owner/repo parameters.")
if response.status_code == 422:
raise ValueError("Unprocessable entity. Check request payload structure against API schema.")
response.raise_for_status()
return response
The decorator retries on rate limits and server errors. The branching logic raises specific exceptions for 401, 403, 404, and 422. Cognigy.AI script nodes catch these exceptions and route the conversation to error handling branches.
Step 3: Response Schema Validation and Field Extraction
Raw JSON responses must be validated before consumption. pydantic enforces structure and extracts typed fields.
from pydantic import BaseModel, Field
from typing import List, Optional
class IssueLabel(BaseModel):
id: int
name: str
color: str
class GitHubIssue(BaseModel):
number: int
title: str
state: str
labels: List[IssueLabel] = Field(default_factory=list)
created_at: str
html_url: str
def parse_response(response: requests.Response) -> List[GitHubIssue]:
logger.info("Validating response schema", extra={"content_length": len(response.content)})
if not response.headers.get("Content-Type", "").startswith("application/json"):
raise ValueError("Expected JSON response. Received non-JSON content type.")
data = response.json()
validated_issues = []
for item in data:
try:
validated_issues.append(GitHubIssue(**item))
except Exception as validation_error:
logger.warning("Skipped malformed issue record", extra={"error": str(validation_error), "item_id": item.get("id")})
continue
logger.info("Schema validation complete", extra={"valid_count": len(validated_issues), "total_count": len(data)})
return validated_issues
The function iterates over the response array, validates each record against GitHubIssue, and logs skipped records without halting execution. This prevents single-record corruption from breaking the entire flow.
Step 4: Logging Traces and Mock Server for Testing
Production logging requires structured output. The mock server simulates the external API for offline development.
import json
import logging
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# Structured JSON logger setup
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(json.dumps({"time": "%(asctime)s", "level": "%(levelname)s", "message": "%(message)s", "extra": "%(extra)s"})))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Mock Server
app = FastAPI(title="Cognigy REST Mock Server")
class MockIssue(BaseModel):
number: int
title: str
state: str
labels: list[dict]
created_at: str
html_url: str
@app.get("/repos/{owner}/{repo}/issues")
async def mock_issues(owner: str, repo: str, state: str = "open", request: Request = None):
# Simulate transient failure on specific request IDs
request_id = request.headers.get("X-Request-Id")
if request_id and request_id.startswith("fail-"):
return JSONResponse(status_code=429, content={"message": "Rate limit exceeded"})
# Simulate 500 error on missing auth
auth_header = request.headers.get("Authorization")
if not auth_header:
return JSONResponse(status_code=401, content={"message": "Missing token"})
return JSONResponse(content=[
MockIssue(number=1, title="Setup CI pipeline", state="open", labels=[{"id": 1, "name": "automation", "color": "blue"}], created_at="2024-01-15T10:00:00Z", html_url="https://github.com/example/repo/issues/1").model_dump(),
MockIssue(number=2, title="Fix login redirect", state="closed", labels=[{"id": 2, "name": "bug", "color": "red"}], created_at="2024-01-10T08:30:00Z", html_url="https://github.com/example/repo/issues/2").model_dump()
])
Run the mock server with uvicorn main:app --reload. Point the script to http://localhost:8000 during development. The mock server mirrors real GitHub response structures and injects testable failure conditions via the X-Request-Id header.
Complete Working Example
The following script combines authentication, request construction, retry logic, schema validation, and logging into a single runnable module. Replace the endpoint and token with your target API.
import os
import time
import uuid
import logging
import requests
import json
from typing import Any, Dict, List, Optional
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_result
from pydantic import BaseModel, Field
# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
logger = logging.getLogger("cognigy.rest_client")
# --- Pydantic Models ---
class IssueLabel(BaseModel):
id: int
name: str
color: str
class GitHubIssue(BaseModel):
number: int
title: str
state: str
labels: List[IssueLabel] = Field(default_factory=list)
created_at: str
html_url: str
# --- Request Builder ---
def build_request(session: Dict[str, Any]) -> Dict[str, Any]:
owner = session.get("github_owner", "octocat")
repo = session.get("github_repo", "Hello-World")
issue_state = session.get("issue_state", "open")
bearer_token = session.get("bearer_token")
url = f"https://api.github.com/repos/{owner}/{repo}/issues?state={issue_state}"
request_id = str(uuid.uuid4())
headers = {
"Authorization": f"Bearer {bearer_token}",
"Accept": "application/vnd.github.v3+json",
"X-Request-Id": request_id,
"User-Agent": "CognigyAI-Integration/1.0"
}
body = {
"filter": {"created_after": session.get("date_filter", "2024-01-01")},
"metadata": {"source": "cognigy_script_node", "conversation_id": session.get("conversationId")}
}
logger.info("Constructed request", extra={"url": url, "request_id": request_id, "method": "GET"})
return {"url": url, "method": "GET", "headers": headers, "body": body}
# --- Execution & Retry ---
def is_transient_error(response: requests.Response) -> bool:
return response.status_code in (429, 500, 502, 503, 504)
@retry(
stop=stop_after_attempt(4),
wait=wait_exponential(multiplier=1.5, min=2, max=30),
retry=retry_if_result(is_transient_error),
reraise=True
)
def execute_request(session: Dict[str, Any]) -> requests.Response:
config = build_request(session)
logger.info("Executing HTTP call", extra={"url": config["url"], "method": config["method"]})
response = requests.request(
method=config["method"],
url=config["url"],
headers=config["headers"],
json=config["body"] if config["method"] in ("POST", "PUT", "PATCH") else None,
timeout=30
)
logger.info("HTTP response received", extra={"status": response.status_code, "request_id": config["headers"]["X-Request-Id"]})
if response.status_code == 401:
raise PermissionError("Invalid or expired Bearer token. Verify OAuth scope and token freshness.")
if response.status_code == 403:
raise PermissionError("Access denied. The token lacks required repository permissions.")
if response.status_code == 404:
raise ValueError(f"Resource not found at {config['url']}. Verify owner/repo parameters.")
if response.status_code == 422:
raise ValueError("Unprocessable entity. Check request payload structure against API schema.")
response.raise_for_status()
return response
# --- Validation & Parsing ---
def parse_response(response: requests.Response) -> List[GitHubIssue]:
logger.info("Validating response schema", extra={"content_length": len(response.content)})
if not response.headers.get("Content-Type", "").startswith("application/json"):
raise ValueError("Expected JSON response. Received non-JSON content type.")
data = response.json()
validated_issues = []
for item in data:
try:
validated_issues.append(GitHubIssue(**item))
except Exception as validation_error:
logger.warning("Skipped malformed issue record", extra={"error": str(validation_error), "item_id": item.get("id")})
continue
logger.info("Schema validation complete", extra={"valid_count": len(validated_issues), "total_count": len(data)})
return validated_issues
# --- Cognigy.AI Integration Entry Point ---
def cognigy_rest_node_handler(cognigy: Any) -> None:
"""
Entry point for Cognigy.AI Script Node.
cognigy.session contains conversation state.
cognigy.output stores results for downstream nodes.
"""
try:
response = execute_request(cognigy.session)
issues = parse_response(response)
cognigy.session["api_issues"] = [issue.model_dump() for issue in issues]
cognigy.output["status"] = "success"
cognigy.output["issue_count"] = len(issues)
logger.info("Node execution successful", extra={"output_issue_count": len(issues)})
except PermissionError as auth_error:
cognigy.output["status"] = "authentication_failed"
cognigy.output["error_message"] = str(auth_error)
logger.error("Authentication failure in REST node", extra={"error": str(auth_error)})
except ValueError as schema_error:
cognigy.output["status"] = "validation_failed"
cognigy.output["error_message"] = str(schema_error)
logger.error("Schema or payload validation failure", extra={"error": str(schema_error)})
except Exception as unexpected_error:
cognigy.output["status"] = "unexpected_error"
cognigy.output["error_message"] = str(unexpected_error)
logger.exception("Unhandled exception in REST node")
# --- Local Test Runner ---
if __name__ == "__main__":
test_session = {
"github_owner": "octocat",
"github_repo": "Hello-World",
"issue_state": "open",
"bearer_token": os.getenv("GITHUB_TOKEN", "dummy_token_for_local_test"),
"conversationId": "conv_test_001",
"date_filter": "2024-01-01"
}
class MockCognigy:
def __init__(self, session):
self.session = session
self.output = {}
mock_cognigy = MockCognigy(test_session)
cognigy_rest_node_handler(mock_cognigy)
print("Node Output:", json.dumps(mock_cognigy.output, indent=2))
Run the script with python cognigy_rest_client.py. Set the GITHUB_TOKEN environment variable to a valid token with read:repo scope. The script executes the full request cycle, validates the response, and prints the node output.
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: Expired Bearer token, missing
Authorizationheader, or incorrect token format. - Fix: Verify the token is passed in the session. Use the
TokenManagerclass to refresh credentials automatically. Ensure the OAuth client has theread:reposcope. - Code Fix: The branching logic in
execute_requestcatches 401 and routes tocognigy.output["status"] = "authentication_failed".
Error: 403 Forbidden
- Cause: Valid token but insufficient permissions for the repository or missing API scope.
- Fix: Grant the OAuth application
read:repoaccess. For private repositories, verify the token belongs to a user with repository membership. - Code Fix: Explicit 403 handling raises
PermissionError, which the handler catches and logs.
Error: 429 Rate Limit Exceeded
- Cause: Exceeding GitHub API rate limits (60 requests per hour for unauthenticated, 5000 per hour for authenticated).
- Fix: The
tenacitydecorator retries with exponential backoff. Implement request deduplication in Cognigy.AI to avoid duplicate calls. Cache responses when possible. - Code Fix:
retry_if_result(is_transient_error)triggers retries for 429. Adjuststop_after_attemptandwait_exponentialparameters based on your quota.
Error: 422 Unprocessable Entity
- Cause: Malformed JSON, missing required fields, or invalid query parameters.
- Fix: Validate the request body against the target API schema before sending. Use
pydanticmodels to enforce structure. - Code Fix: The 422 branch raises
ValueError. Review thebuild_requestfunction to ensure all required parameters match the API specification.
Error: Pydantic ValidationError
- Cause: Response structure changed or contains unexpected fields.
- Fix: Update
GitHubIssueandIssueLabelmodels to match the current API version. UseField(default_factory=list)for optional arrays. - Code Fix: The
parse_responsefunction catches validation errors per record and logs them without halting execution.