Caching Expensive Genesys Cloud Custom Action Queries with a Python Redis Decorator

Caching Expensive Genesys Cloud Custom Action Queries with a Python Redis Decorator

What You Will Build

  • A Python decorator that caches Genesys Cloud Custom Action query results in Redis, deterministically hashes input parameters, validates cache freshness against configurable TTLs, and triggers direct API execution with exponential backoff on cache misses or stale-while-revalidate conditions.
  • The implementation uses the Genesys Cloud POST /api/v2/customactions/actions/query endpoint with raw requests HTTP calls for full control over retry logic and header inspection.
  • The tutorial covers Python 3.9+ using redis, requests, hashlib, and standard library modules.

Prerequisites

  • OAuth confidential client with customactions:read scope enabled in the Genesys Cloud admin console
  • Genesys Cloud environment URL (e.g., https://acme.mygen.com)
  • Python 3.9 or higher with requests, redis, and pydantic installed (pip install requests redis pydantic)
  • Running Redis instance accessible via redis://localhost:6379 or equivalent

Authentication Setup

Genesys Cloud uses OAuth 2.0 client credentials flow for server-to-server API access. You must cache the access token and handle expiration before making cached queries. The token endpoint requires your client ID, client secret, and the exact scope string.

import requests
import time
from typing import Optional

class GenesysAuth:
    def __init__(self, env: str, client_id: str, client_secret: str, scope: str = "customactions:read"):
        self.base_url = f"https://{env}.mygen.com"
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.token: Optional[str] = None
        self.expires_at: float = 0.0

    def get_token(self) -> str:
        if self.token and time.time() < self.expires_at - 60:
            return self.token

        token_url = f"{self.base_url}/oauth/token"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        body = {
            "grant_type": "client_credentials",
            "scope": self.scope,
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }

        response = requests.post(token_url, headers=headers, data=body)
        response.raise_for_status()
        data = response.json()
        
        self.token = data["access_token"]
        self.expires_at = time.time() + data["expires_in"]
        return self.token

The token manager checks expiration before each call and subtracts a sixty-second buffer to prevent mid-request 401 errors. The scope parameter must exactly match the API permission. The customactions:read scope grants access to query and retrieve custom action definitions.

Implementation

Step 1: Redis Connection and Parameter Hashing

Deterministic cache keys prevent collisions when query parameters change order or contain nested structures. You will use hashlib.sha256 on a JSON-serialized representation of the function name and all keyword arguments. Redis stores the payload alongside metadata timestamps.

import redis
import hashlib
import json
import time
from typing import Any, Dict

class RedisCacheManager:
    def __init__(self, redis_url: str = "redis://localhost:6379", ttl: int = 300, stale_ttl: int = 900):
        self.pool = redis.ConnectionPool.from_url(redis_url, decode_responses=True)
        self.client = redis.Redis(connection_pool=self.pool)
        self.freshness_ttl = ttl
        self.stale_ttl = stale_ttl

    def _build_key(self, func_name: str, **kwargs) -> str:
        payload = {"func": func_name, "params": kwargs}
        serialized = json.dumps(payload, sort_keys=True, default=str)
        hash_hex = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
        return f"genesys:customaction:query:{hash_hex}"

    def get(self, key: str) -> Dict[str, Any]:
        raw = self.client.get(key)
        if not raw:
            return {}
        return json.loads(raw)

    def set(self, key: str, data: Any) -> None:
        entry = {
            "data": data,
            "cached_at": time.time(),
            "freshness_ttl": self.freshness_ttl,
            "stale_ttl": self.stale_ttl
        }
        self.client.setex(key, self.stale_ttl, json.dumps(entry))

The _build_key method sorts keys and converts non-serializable types to strings to guarantee identical hashes for identical logical queries. The set method writes the payload with a TTL equal to stale_ttl. Redis automatically evicts the key when the stale window expires, preventing unbounded memory growth.

Step 2: Freshness Validation and Stale-While-Revalidate Fallback

Cache validity depends on the difference between cached_at and the current timestamp. You will return fresh data immediately. When the freshness window expires but the stale window remains open, the decorator returns the cached payload while triggering a background API refresh. This pattern mimics HTTP stale-while-revalidate behavior without requiring framework-level middleware.

from functools import wraps
import threading
from typing import Callable, Any, Dict

def redis_cached_custom_action(redis_url: str = "redis://localhost:6379", 
                                freshness_ttl: int = 300, 
                                stale_ttl: int = 900):
    cache_mgr = RedisCacheManager(redis_url=redis_url, ttl=freshness_ttl, stale_ttl=stale_ttl)

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            key = cache_mgr._build_key(func.__name__, **kwargs)
            cached_entry = cache_mgr.get(key)

            if cached_entry:
                age = time.time() - cached_entry["cached_at"]
                if age < cached_entry["freshness_ttl"]:
                    return cached_entry["data"]
                
                if age < cached_entry["stale_ttl"]:
                    # Stale-while-revalidate condition
                    cached_entry["swr_refresh"] = True
                    cache_mgr.set(key, cached_entry["data"])
                    
                    def background_refresh():
                        try:
                            fresh_data = func(*args, **kwargs)
                            cache_mgr.set(key, fresh_data)
                        except Exception as exc:
                            print(f"Background refresh failed for {key}: {exc}")
                    
                    threading.Thread(target=background_refresh, daemon=True).start()
                    return cached_entry["data"]

            # Cache miss or fully expired
            result = func(*args, **kwargs)
            cache_mgr.set(key, result)
            return result

        return wrapper
    return decorator

The decorator evaluates cache age against two thresholds. Fresh data returns immediately. Stale data returns the cached payload while spawning a daemon thread to fetch fresh results. This prevents blocking the caller when data is only slightly outdated. The swr_refresh flag allows downstream systems to detect that a background update is in progress.

Step 3: Direct API Execution with Exponential Backoff

When the cache misses or the stale window expires, the decorator falls back to the raw Genesys Cloud API. You will implement exponential backoff with jitter to handle 429 Too Many Requests and transient 5xx errors. The query endpoint supports pagination via nextPageToken, which you will handle in a loop.

import requests
import time
import random
from typing import Any, Dict, List

def query_custom_actions(auth: GenesysAuth, query_body: Dict[str, Any]) -> Dict[str, Any]:
    endpoint = f"{auth.base_url}/api/v2/customactions/actions/query"
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    all_entities: List[Dict[str, Any]] = []
    page_token = None
    max_retries = 5

    while True:
        payload = {**query_body}
        if page_token:
            payload["pageToken"] = page_token

        for attempt in range(max_retries):
            response = requests.post(endpoint, headers=headers, json=payload)
            
            if response.status_code == 200:
                data = response.json()
                all_entities.extend(data.get("entities", []))
                page_token = data.get("pagination", {}).get("nextPageToken")
                break
            elif response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 2 ** attempt + random.uniform(0, 1)))
                print(f"Rate limited (429). Retrying in {retry_after}s...")
                time.sleep(retry_after)
                continue
            elif response.status_code >= 500:
                backoff = 2 ** attempt + random.uniform(0, 1)
                print(f"Server error ({response.status_code}). Retrying in {backoff}s...")
                time.sleep(backoff)
                continue
            else:
                response.raise_for_status()

        if not page_token:
            break

    return {"entities": all_entities, "total_count": len(all_entities)}

The request cycle follows this exact structure:

  • Method: POST
  • Path: /api/v2/customactions/actions/query
  • Headers: Authorization: Bearer <token>, Content-Type: application/json, Accept: application/json
  • Request Body:
{
  "query": {
    "filter": {
      "name": {
        "contains": "order_lookup"
      }
    },
    "sort": [{"attribute": "name", "type": "string", "order": "asc"}]
  }
}
  • Response Body:
{
  "entities": [
    {
      "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
      "name": "order_lookup",
      "description": "Fetches order details by ID",
      "type": "custom",
      "status": "active"
    }
  ],
  "pagination": {
    "total": 1,
    "count": 1,
    "pageSize": 25,
    "pageToken": null,
    "nextPageToken": null
  }
}

The retry loop respects Retry-After headers when present. Fallback to exponential backoff with random jitter prevents thundering herd scenarios across multiple concurrent workers. Pagination continues until nextPageToken returns null.

Complete Working Example

The following script combines authentication, caching, and query execution into a single runnable module. Replace the placeholder credentials before execution.

import requests
import redis
import hashlib
import json
import time
import random
import threading
from functools import wraps
from typing import Any, Dict, List, Callable, Optional

class GenesysAuth:
    def __init__(self, env: str, client_id: str, client_secret: str, scope: str = "customactions:read"):
        self.base_url = f"https://{env}.mygen.com"
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.token: Optional[str] = None
        self.expires_at: float = 0.0

    def get_token(self) -> str:
        if self.token and time.time() < self.expires_at - 60:
            return self.token

        token_url = f"{self.base_url}/oauth/token"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        body = {
            "grant_type": "client_credentials",
            "scope": self.scope,
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }

        response = requests.post(token_url, headers=headers, data=body)
        response.raise_for_status()
        data = response.json()
        
        self.token = data["access_token"]
        self.expires_at = time.time() + data["expires_in"]
        return self.token

class RedisCacheManager:
    def __init__(self, redis_url: str = "redis://localhost:6379", ttl: int = 300, stale_ttl: int = 900):
        self.pool = redis.ConnectionPool.from_url(redis_url, decode_responses=True)
        self.client = redis.Redis(connection_pool=self.pool)
        self.freshness_ttl = ttl
        self.stale_ttl = stale_ttl

    def _build_key(self, func_name: str, **kwargs) -> str:
        payload = {"func": func_name, "params": kwargs}
        serialized = json.dumps(payload, sort_keys=True, default=str)
        hash_hex = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
        return f"genesys:customaction:query:{hash_hex}"

    def get(self, key: str) -> Dict[str, Any]:
        raw = self.client.get(key)
        if not raw:
            return {}
        return json.loads(raw)

    def set(self, key: str, data: Any) -> None:
        entry = {
            "data": data,
            "cached_at": time.time(),
            "freshness_ttl": self.freshness_ttl,
            "stale_ttl": self.stale_ttl
        }
        self.client.setex(key, self.stale_ttl, json.dumps(entry))

def redis_cached_custom_action(redis_url: str = "redis://localhost:6379", 
                                freshness_ttl: int = 300, 
                                stale_ttl: int = 900):
    cache_mgr = RedisCacheManager(redis_url=redis_url, ttl=freshness_ttl, stale_ttl=stale_ttl)

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            key = cache_mgr._build_key(func.__name__, **kwargs)
            cached_entry = cache_mgr.get(key)

            if cached_entry:
                age = time.time() - cached_entry["cached_at"]
                if age < cached_entry["freshness_ttl"]:
                    return cached_entry["data"]
                
                if age < cached_entry["stale_ttl"]:
                    cache_mgr.set(key, cached_entry["data"])
                    
                    def background_refresh():
                        try:
                            fresh_data = func(*args, **kwargs)
                            cache_mgr.set(key, fresh_data)
                        except Exception as exc:
                            print(f"Background refresh failed for {key}: {exc}")
                    
                    threading.Thread(target=background_refresh, daemon=True).start()
                    return cached_entry["data"]

            result = func(*args, **kwargs)
            cache_mgr.set(key, result)
            return result

        return wrapper
    return decorator

def query_custom_actions(auth: GenesysAuth, query_body: Dict[str, Any]) -> Dict[str, Any]:
    endpoint = f"{auth.base_url}/api/v2/customactions/actions/query"
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    all_entities: List[Dict[str, Any]] = []
    page_token = None
    max_retries = 5

    while True:
        payload = {**query_body}
        if page_token:
            payload["pageToken"] = page_token

        for attempt in range(max_retries):
            response = requests.post(endpoint, headers=headers, json=payload)
            
            if response.status_code == 200:
                data = response.json()
                all_entities.extend(data.get("entities", []))
                page_token = data.get("pagination", {}).get("nextPageToken")
                break
            elif response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 2 ** attempt + random.uniform(0, 1)))
                print(f"Rate limited (429). Retrying in {retry_after}s...")
                time.sleep(retry_after)
                continue
            elif response.status_code >= 500:
                backoff = 2 ** attempt + random.uniform(0, 1)
                print(f"Server error ({response.status_code}). Retrying in {backoff}s...")
                time.sleep(backoff)
                continue
            else:
                response.raise_for_status()

        if not page_token:
            break

    return {"entities": all_entities, "total_count": len(all_entities)}

# Apply decorator
@redis_cached_custom_action(freshness_ttl=300, stale_ttl=900)
def get_order_lookup_actions(auth: GenesysAuth, **kwargs) -> Dict[str, Any]:
    query_body = {
        "query": {
            "filter": {
                "name": {
                    "contains": "order_lookup"
                }
            },
            "sort": [{"attribute": "name", "type": "string", "order": "asc"}]
        }
    }
    return query_custom_actions(auth, query_body)

if __name__ == "__main__":
    ENV = "acme"
    CLIENT_ID = "YOUR_CLIENT_ID"
    CLIENT_SECRET = "YOUR_CLIENT_SECRET"
    
    auth = GenesysAuth(env=ENV, client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
    
    print("First call (cache miss):")
    start = time.time()
    result1 = get_order_lookup_actions(auth)
    print(f"Returned {result1['total_count']} entities in {time.time() - start:.3f}s")
    
    print("\nSecond call (cache hit):")
    start = time.time()
    result2 = get_order_lookup_actions(auth)
    print(f"Returned {result2['total_count']} entities in {time.time() - start:.3f}s")

The script applies the decorator to get_order_lookup_actions. The first invocation triggers the API call, handles pagination, and stores the result in Redis. The second invocation returns immediately from cache. Background refresh activates automatically when the freshness window expires but the stale window remains valid.

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: Expired access token, missing customactions:read scope, or incorrect client credentials.
  • Fix: Verify the OAuth client has the exact scope string enabled. Ensure the token manager refreshes before expiration. Check that client_id and client_secret match the confidential client configuration.
  • Code fix: Add explicit scope validation in the token request body and log the raw token response for debugging.

Error: 403 Forbidden

  • Cause: The OAuth client lacks organizational permissions for custom actions, or the environment URL is incorrect.
  • Fix: Assign the client to a user or group with Custom Actions read permissions in the Genesys Cloud admin console. Verify the environment subdomain matches the target tenant.
  • Code fix: Catch 403 explicitly and raise a descriptive exception with the client ID and scope for audit logging.

Error: 429 Too Many Requests

  • Cause: Exceeded Genesys Cloud rate limits for the custom actions query endpoint.
  • Fix: The exponential backoff loop handles this automatically. Adjust initial backoff values if your workload spikes. Implement request coalescing if multiple threads trigger identical queries simultaneously.
  • Code fix: The retry loop respects Retry-After headers. If the header is missing, it falls back to 2 ** attempt + jitter. Increase max_retries for high-throughput environments.

Error: Redis Connection or Serialization Failure

  • Cause: Redis instance unreachable, authentication mismatch, or non-JSON-serializable objects in the query payload.
  • Fix: Test Redis connectivity with redis-cli ping. Ensure decode_responses=True matches your serialization strategy. Convert datetime or Decimal objects to strings before hashing.
  • Code fix: Add connection health checks before decorator initialization. Wrap json.dumps in a try-except block with a fallback serializer.

Official References