Verifying Webhook Signature Headers to Prevent Replay Attacks

What You Will Build

  • A secure webhook receiver endpoint that validates incoming Genesys Cloud event notifications against cryptographic signatures.
  • Implementation of HMAC-SHA256 verification to ensure payload integrity and timestamp validation to reject stale or replayed requests.
  • Production-ready Python code using the FastAPI framework and Python's standard-library hmac module.

Prerequisites

  • OAuth Client Type: None is required to receive webhooks (a Server-to-Server / Client Credentials client is only needed for API calls). You do need a Genesys Cloud organization with webhook management permissions to create the test webhook.
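  • Required Scopes: webhook:read if you set up the test webhook via the API (creating a webhook may need a write-capable scope as well; confirm the exact scope name for your organization), or none if you configure it manually in the Admin Console.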
  • SDK/API Version: Genesys Cloud API v2.
  • Language/Runtime: Python 3.9+.
  • External Dependencies: fastapi, uvicorn, pydantic (installed with fastapi), and requests (only needed if you send test requests from a script).

Authentication Setup

While the webhook receiver itself does not use OAuth tokens (it uses secret key verification), you must authenticate to the Genesys Cloud Admin API to create the webhook endpoint. This tutorial assumes you have already configured a Secret Key in the Genesys Cloud Admin Console under Integrations > Webhooks > Secret.

If you need to create the webhook programmatically, you would use the PureCloudPlatformClientV2 SDK. However, for this tutorial, we assume the webhook URL is already registered in Genesys Cloud, and we are focusing on the receiver side.

The critical piece of configuration is the Secret. Genesys Cloud uses this secret to sign the payload, so the value stored in your application's environment variables must match it exactly.

Implementation

Step 1: Define the Security Middleware and Validation Logic

Genesys Cloud appends three specific headers to every webhook request:

  1. X-Genesys-Signature: The HMAC-SHA256 signature of the payload.
  2. X-Genesys-Date: The timestamp of when the webhook was sent (RFC 2822 format).
  3. X-Genesys-Nonce: A unique identifier for the request (used for idempotency, though signature verification is the primary defense against tampering).

To prevent replay attacks, we must verify two things:

  1. Integrity: Does the signature match the payload and secret?
  2. Freshness: Is the timestamp within an acceptable window (e.g., 5 minutes) of the current server time?

First, install the required dependencies:

pip install fastapi uvicorn pydantic requests

Next, create the core validation logic. We will use Python’s built-in hmac and hashlib libraries.

import hashlib
import hmac
import os
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

# Configuration
# Read the secret from the environment rather than hard-coding it in source.
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "")
MAX_TIMESTAMP_AGE_SECONDS = 300  # 5 minutes

def verify_signature(payload: bytes, signature: str, timestamp: str, nonce: str) -> bool:
    """
    Verifies the Genesys Cloud webhook signature and timestamp.
    
    Args:
        payload: The raw body of the HTTP request.
        signature: The value from the X-Genesys-Signature header.
        timestamp: The value from the X-Genesys-Date header.
        nonce: The value from the X-Genesys-Nonce header.
        
    Returns:
        True if valid, False otherwise.
    """
    if not WEBHOOK_SECRET or WEBHOOK_SECRET == "your_genesis_cloud_webhook_secret":
        raise ValueError("WEBHOOK_SECRET environment variable is not set.")

    # 1. Parse the timestamp
    try:
        webhook_time = parsedate_to_datetime(timestamp)
        # Ensure the timestamp is timezone-aware (Genesys sends UTC)
        if webhook_time.tzinfo is None:
            webhook_time = webhook_time.replace(tzinfo=timezone.utc)
    except Exception:
        return False

    # 2. Check for replay attacks (Timestamp Validation)
    current_time = datetime.now(timezone.utc)
    time_diff = abs((current_time - webhook_time).total_seconds())
    
    if time_diff > MAX_TIMESTAMP_AGE_SECONDS:
        # Log this for debugging, but return False instead of raising so the
        # endpoint can choose the HTTP status. Because of abs(), timestamps too
        # far in the future are rejected as well as stale ones.
        print(f"Warning: Webhook timestamp outside allowed window. Diff: {time_diff}s")
        return False

    # 3. Construct the string to sign
    # Genesys Cloud signs: nonce + timestamp + payload
    # Build it as bytes so a body that is not valid UTF-8 cannot raise here.
    string_to_sign = nonce.encode('utf-8') + timestamp.encode('utf-8') + payload
    
    # 4. Compute the expected HMAC-SHA256 signature
    expected_signature = hmac.new(
        WEBHOOK_SECRET.encode('utf-8'),
        string_to_sign,
        hashlib.sha256
    ).hexdigest()

    # 5. Compare signatures using constant-time comparison to prevent timing attacks
    # Compare as bytes: compare_digest raises TypeError for non-ASCII str input.
    return hmac.compare_digest(
        expected_signature.encode('utf-8'),
        signature.encode('utf-8')
    )
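
For local testing it helps to have a sender that signs requests the same way the verifier expects. The sketch below is a hypothetical helper (build_signed_headers is an illustrative name, not part of any Genesys SDK). It assumes the signing scheme described in this step, HMAC-SHA256 over nonce + timestamp + payload, and produces the three headers for a given body.

```python
import hashlib
import hmac
import uuid
from datetime import datetime, timezone
from email.utils import format_datetime


def build_signed_headers(secret: str, payload: bytes) -> dict:
    """Hypothetical test helper: sign `payload` the way this tutorial assumes
    the sender does (HMAC-SHA256 over nonce + timestamp + payload)."""
    nonce = uuid.uuid4().hex
    # RFC 2822 date with a literal "GMT" suffix, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    timestamp = format_datetime(datetime.now(timezone.utc), usegmt=True)
    message = nonce.encode("utf-8") + timestamp.encode("utf-8") + payload
    signature = hmac.new(secret.encode("utf-8"), message, hashlib.sha256).hexdigest()
    return {
        "X-Genesys-Signature": signature,
        "X-Genesys-Date": timestamp,
        "X-Genesys-Nonce": nonce,
    }


if __name__ == "__main__":
    body = b'{"eventType":"queue.conversation.added"}'
    for name, value in build_signed_headers("local-test-secret", body).items():
        print(f"{name}: {value}")
```

The returned dictionary can be passed as headers to any HTTP client, together with the same body bytes, to exercise the receiver built in the next step.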

Step 2: Implement the FastAPI Webhook Receiver

Now we integrate this validation into a FastAPI application. We will create a generic POST endpoint that accepts all webhook events. In a production system, you might route specific events to different handlers, but the validation step remains the same.

from fastapi import FastAPI, Request, HTTPException, Header
from typing import Optional

app = FastAPI(title="Genesys Cloud Webhook Receiver")

@app.post("/webhook/genesys")
async def handle_webhook(
    request: Request,
    x_genesys_signature: Optional[str] = Header(None),
    x_genesys_date: Optional[str] = Header(None),
    x_genesys_nonce: Optional[str] = Header(None)
):
    """
    Endpoint to receive and verify Genesys Cloud webhooks.
    """
    # 1. Retrieve raw body
    body = await request.body()
    
    # 2. Validate headers exist
    if not all([x_genesys_signature, x_genesys_date, x_genesys_nonce]):
        raise HTTPException(
            status_code=400, 
            detail="Missing required Genesys Cloud headers"
        )

    # 3. Verify signature and timestamp
    if not verify_signature(body, x_genesys_signature, x_genesys_date, x_genesys_nonce):
        raise HTTPException(
            status_code=401, 
            detail="Invalid signature or expired timestamp"
        )

    # 4. Process the payload
    # At this point, the request is verified as authentic and fresh.
    try:
        import json
        data = json.loads(body)
        
        # Example: Log the event type
        event_type = data.get("eventType")
        print(f"Received verified event: {event_type}")
        
        # Here you would dispatch to specific business logic
        # e.g., if event_type == "queue.conversation.added": handle_new_queue_event(data)
        
        return {"status": "success", "message": "Event processed"}
    
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON payload")

Step 3: Handling Idempotency with Nonce

While signature verification prevents tampering, it does not prevent duplicate delivery if Genesys Cloud retries a failed request. Genesys Cloud includes an X-Genesys-Nonce header, which is unique per webhook dispatch. To build a robust system, you should store processed nonces and reject duplicates.

For this tutorial, we will use a simple in-memory set for demonstration. In production, use Redis or a database with TTL (Time-To-Live) indexing.

# Global store for processed nonces (Replace with Redis/DB in production)
processed_nonces = set()
NONCE_TTL = 3600  # 1 hour; only used by the Redis variant shown in the comments below

def is_nonce_processed(nonce: str) -> bool:
    """Checks if a nonce has already been processed."""
    # In a real app, use Redis: redis.get(f"nonce:{nonce}")
    return nonce in processed_nonces

def mark_nonce_processed(nonce: str):
    """Marks a nonce as processed."""
    # In a real app, use Redis: redis.setex(f"nonce:{nonce}", NONCE_TTL, 1)
    processed_nonces.add(nonce)
    
    # Clean up old nonces (simplified for demo)
    # In production, rely on DB/Redis TTL
    if len(processed_nonces) > 1000:
        processed_nonces.clear()

# Update the FastAPI handler to include nonce checking

@app.post("/webhook/genesys/secure")
async def handle_webhook_secure(
    request: Request,
    x_genesys_signature: Optional[str] = Header(None),
    x_genesys_date: Optional[str] = Header(None),
    x_genesys_nonce: Optional[str] = Header(None)
):
    body = await request.body()
    
    if not all([x_genesys_signature, x_genesys_date, x_genesys_nonce]):
        raise HTTPException(status_code=400, detail="Missing headers")

    # Verify authenticity first, so unauthenticated callers learn nothing about stored nonces
    if not verify_signature(body, x_genesys_signature, x_genesys_date, x_genesys_nonce):
        raise HTTPException(status_code=401, detail="Invalid signature or timestamp")

    # Check for replay/duplicate via Nonce
    if is_nonce_processed(x_genesys_nonce):
        # Return 200 to acknowledge receipt, but do not process again
        return {"status": "duplicate", "message": "Nonce already processed"}

    try:
        import json
        data = json.loads(body)
        
        # Process business logic here
        event_type = data.get("eventType")
        print(f"Processing new event: {event_type}")
        
        # Mark as processed after successful logic execution
        mark_nonce_processed(x_genesys_nonce)
        
        return {"status": "success", "message": "Event processed"}
    
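    except json.JSONDecodeError:
        # Malformed JSON will not succeed on retry, so reject it as a client error
        raise HTTPException(status_code=400, detail="Invalid JSON payload")
    except Exception as e: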
        # If processing fails, do not mark nonce as processed
        # so Genesys can retry
        print(f"Error processing event: {e}")
        raise HTTPException(status_code=500, detail="Internal processing error")
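
The size-based cleanup above forgets every nonce at once, which briefly allows duplicates through again. As a stand-in for the Redis TTL mentioned in the comments, here is a minimal single-process sketch in which each nonce expires individually. TTLNonceStore and its method names are illustrative and not part of any library. The store lives in one process only, so it is not shared across multiple workers.

```python
import time
from typing import Callable, Dict


class TTLNonceStore:
    """In-memory nonce store whose entries expire after `ttl_seconds`."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._expiry: Dict[str, float] = {}  # nonce -> time at which it expires

    def _purge(self) -> None:
        """Drop every nonce whose expiry time has been reached."""
        now = self._clock()
        expired = [nonce for nonce, expires_at in self._expiry.items() if expires_at <= now]
        for nonce in expired:
            del self._expiry[nonce]

    def seen(self, nonce: str) -> bool:
        """Return True if the nonce was added and has not yet expired."""
        self._purge()
        return nonce in self._expiry

    def add(self, nonce: str) -> None:
        """Record the nonce; it is forgotten after ttl_seconds."""
        self._purge()
        self._expiry[nonce] = self._clock() + self._ttl


if __name__ == "__main__":
    store = TTLNonceStore(ttl_seconds=3600)
    store.add("abc123")
    print(store.seen("abc123"))
```

Keeping the TTL at least as long as MAX_TIMESTAMP_AGE_SECONDS means a nonce cannot be forgotten while a request carrying it would still pass the freshness check.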

Complete Working Example

Below is the complete, single-file Python script. Save this as webhook_server.py.

import hmac
import hashlib
import json
import os
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from fastapi import FastAPI, Request, HTTPException, Header
from typing import Optional
import uvicorn

# --- Configuration ---
# Load your Genesys Cloud Webhook Secret from the environment; never commit it to source
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "")
MAX_TIMESTAMP_AGE_SECONDS = 300  # 5 minutes tolerance
NONCE_TTL = 3600  # 1 hour for duplicate check (applies when using a TTL store such as Redis)

# --- State Management (Use Redis/DB in Production) ---
processed_nonces = set()

# --- Security Functions ---

def verify_signature(payload: bytes, signature: str, timestamp: str, nonce: str) -> bool:
    """
    Verifies the Genesys Cloud webhook signature and timestamp.
    """
    if not WEBHOOK_SECRET:
        raise ValueError("WEBHOOK_SECRET is not configured.")

    # 1. Parse Timestamp
    try:
        webhook_time = parsedate_to_datetime(timestamp)
        if webhook_time.tzinfo is None:
            webhook_time = webhook_time.replace(tzinfo=timezone.utc)
    except Exception:
        return False

    # 2. Check Timestamp Freshness (Replay Attack Prevention)
    current_time = datetime.now(timezone.utc)
    time_diff = abs((current_time - webhook_time).total_seconds())
    
    if time_diff > MAX_TIMESTAMP_AGE_SECONDS:
        print(f"Rejected: timestamp outside allowed window: {time_diff}s")
        return False

    # 3. Construct String to Sign: nonce + timestamp + payload
    # Built as bytes so a body that is not valid UTF-8 cannot raise here
    string_to_sign = nonce.encode('utf-8') + timestamp.encode('utf-8') + payload
    
    # 4. Compute HMAC-SHA256
    expected_signature = hmac.new(
        WEBHOOK_SECRET.encode('utf-8'),
        string_to_sign,
        hashlib.sha256
    ).hexdigest()

    # 5. Constant-time comparison (as bytes, since non-ASCII str input raises TypeError)
    return hmac.compare_digest(
        expected_signature.encode('utf-8'),
        signature.encode('utf-8')
    )

def is_nonce_processed(nonce: str) -> bool:
    return nonce in processed_nonces

def mark_nonce_processed(nonce: str):
    processed_nonces.add(nonce)
    # Simple cleanup for demo purposes
    if len(processed_nonces) > 1000:
        processed_nonces.clear()

# --- Application ---

app = FastAPI(title="Genesys Secure Webhook Receiver")

@app.post("/webhook/genesys")
async def handle_webhook(
    request: Request,
    x_genesys_signature: Optional[str] = Header(None),
    x_genesys_date: Optional[str] = Header(None),
    x_genesys_nonce: Optional[str] = Header(None)
):
    """
    Receives Genesys Cloud webhooks, verifies signature, checks timestamp,
    and ensures idempotency via nonce.
    """
    body = await request.body()
    
    # 1. Header Presence Check
    if not all([x_genesys_signature, x_genesys_date, x_genesys_nonce]):
        raise HTTPException(
            status_code=400, 
            detail="Missing required Genesys Cloud headers"
        )

    # 2. Signature & Timestamp Verification (done first so unauthenticated
    #    callers learn nothing about stored nonces)
    if not verify_signature(body, x_genesys_signature, x_genesys_date, x_genesys_nonce):
        raise HTTPException(
            status_code=401, 
            detail="Invalid signature or expired timestamp"
        )

    # 3. Idempotency Check (Duplicate Prevention)
    if is_nonce_processed(x_genesys_nonce):
        return {"status": "duplicate", "message": "Nonce already processed"}

    # 4. Process Payload
    try:
        data = json.loads(body)
        event_type = data.get("eventType", "unknown")
        
        # Simulate business logic
        print(f"[VERIFIED] Processing event: {event_type}")
        
        # Mark nonce as processed ONLY after successful logic
        mark_nonce_processed(x_genesys_nonce)
        
        return {"status": "success", "message": "Event processed securely"}
    
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON payload")
    except Exception as e:
        # Do not mark nonce if processing fails, allowing retry
        print(f"Error processing event: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal processing error")

if __name__ == "__main__":
    # Run with uvicorn for local development
    uvicorn.run(app, host="0.0.0.0", port=8000)

Common Errors & Debugging

Error: 401 Invalid Signature or Expired Timestamp

Cause:

  1. The WEBHOOK_SECRET in your code does not match the secret configured in Genesys Cloud Admin.
  2. The server clock is significantly out of sync with UTC.
  3. The payload body was modified by a proxy or load balancer before reaching your application (e.g., compression or character encoding changes).

Fix:
Ensure the secret is identical. Check your server time:

date -u

If using a reverse proxy (Nginx/Apache), ensure it is not stripping headers or modifying the body. The raw body must be passed exactly as received.
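
To see why the raw body matters, the following sketch signs the same JSON document twice, once as received and once after a parse-and-reserialize round trip, using the nonce + timestamp + payload scheme this tutorial assumes. The secret, nonce, and timestamp are made-up test values, and sign is an illustrative helper.

```python
import hashlib
import hmac
import json


def sign(secret: bytes, nonce: str, timestamp: str, body: bytes) -> str:
    """HMAC-SHA256 over nonce + timestamp + body (scheme assumed by this tutorial)."""
    message = nonce.encode("utf-8") + timestamp.encode("utf-8") + body
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


secret = b"local-test-secret"
nonce = "abc123"
timestamp = "Wed, 21 Oct 2015 07:28:00 GMT"

# Body exactly as it arrived on the wire (compact JSON, no spaces).
raw_body = b'{"eventType":"queue.conversation.added"}'
# Same data after parsing and re-encoding; json.dumps adds a space after ":".
reserialized = json.dumps(json.loads(raw_body)).encode("utf-8")

raw_sig = sign(secret, nonce, timestamp, raw_body)
reserialized_sig = sign(secret, nonce, timestamp, reserialized)

print(json.loads(raw_body) == json.loads(reserialized))  # True: same data
print(raw_body == reserialized)                          # False: different bytes
print(hmac.compare_digest(raw_sig, reserialized_sig))    # False: signature no longer matches
```

This is why the handlers in this tutorial read request.body() and pass those bytes to verify_signature before calling json.loads.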

Error: 400 Missing Required Headers

Cause:
A load balancer, WAF (Web Application Firewall), or cloud provider (e.g., AWS ALB, Azure Front Door) is stripping the custom X-Genesys-* headers.

Fix:
Configure your load balancer or WAF to forward custom headers unchanged. For example, with an AWS ALB, check that no listener rule or WAF rule in front of the application removes the X-Genesys-* headers.

Error: Signature Mismatch Despite Correct Secret

Cause:
The string being signed is constructed incorrectly. Genesys Cloud signs nonce + timestamp + payload. Common mistakes include:

  • Adding newlines or spaces between the components.
  • Using the wrong character encoding (must be UTF-8).
  • Parsing and re-formatting the timestamp instead of using the exact string value from the header.

Fix:
Debug by printing the string_to_sign and the expected_signature and comparing them with a manual calculation. A simple Python snippet works well for this. If you use an online HMAC tool instead, do so only with a throwaway secret, never the production one:

import hmac, hashlib
secret = "super_secret_key_12345"
nonce = "abc123"
ts = "Wed, 21 Oct 2015 07:28:00 GMT"
payload = '{"eventType":"queue.conversation.added"}'

string_to_sign = f"{nonce}{ts}{payload}"
sig = hmac.new(secret.encode(), string_to_sign.encode(), hashlib.sha256).hexdigest()
print(sig)

Official References