Caching Expensive Genesys Cloud Custom Action Queries with a Python Redis Decorator
What You Will Build
- A Python decorator that caches Genesys Cloud Custom Action query results in Redis, deterministically hashes input parameters, validates cache freshness against configurable TTLs, and triggers direct API execution with exponential backoff on cache misses or stale-while-revalidate conditions.
- The implementation uses the Genesys Cloud POST /api/v2/customactions/actions/query endpoint with raw requests HTTP calls for full control over retry logic and header inspection.
- The tutorial covers Python 3.9+ using redis, requests, hashlib, and standard library modules.
Prerequisites
- OAuth confidential client with the customactions:read scope enabled in the Genesys Cloud admin console
- Genesys Cloud environment URL (e.g., https://acme.mygen.com)
- Python 3.9 or higher with requests, redis, and pydantic installed (pip install requests redis pydantic)
- Running Redis instance accessible via redis://localhost:6379 or equivalent
Authentication Setup
Genesys Cloud uses OAuth 2.0 client credentials flow for server-to-server API access. You must cache the access token and handle expiration before making cached queries. The token endpoint requires your client ID, client secret, and the exact scope string.
import requests
import time
from typing import Optional

class GenesysAuth:
    def __init__(self, env: str, client_id: str, client_secret: str, scope: str = "customactions:read"):
        self.base_url = f"https://{env}.mygen.com"
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.token: Optional[str] = None
        self.expires_at: float = 0.0

    def get_token(self) -> str:
        if self.token and time.time() < self.expires_at - 60:
            return self.token
        token_url = f"{self.base_url}/oauth/token"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        body = {
            "grant_type": "client_credentials",
            "scope": self.scope,
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }
        response = requests.post(token_url, headers=headers, data=body)
        response.raise_for_status()
        data = response.json()
        self.token = data["access_token"]
        self.expires_at = time.time() + data["expires_in"]
        return self.token
The token manager checks expiration before each call and subtracts a sixty-second buffer to prevent mid-request 401 errors. The scope parameter must exactly match the API permission. The customactions:read scope grants access to query and retrieve custom action definitions.
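The sixty-second buffer can be checked in isolation. The following is a minimal sketch: token_is_usable is a hypothetical helper that is not part of GenesysAuth, and it takes the current time as a parameter (an assumption made here so the behavior is easy to verify without waiting on a real clock).

```python
from typing import Optional


def token_is_usable(token: Optional[str], expires_at: float, now: float,
                    buffer_seconds: float = 60.0) -> bool:
    """Mirror the check in get_token: a token counts as usable only while
    more than buffer_seconds remain before it expires."""
    return bool(token) and now < expires_at - buffer_seconds


# A token that expires at t=3600 is reused at t=3000,
# but at t=3550 it falls inside the buffer and should be refreshed.
reuse = token_is_usable("abc", expires_at=3600.0, now=3000.0)
refresh = not token_is_usable("abc", expires_at=3600.0, now=3550.0)
```

Passing the time in explicitly is only a testing convenience; the class above reads time.time() directly.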
Implementation
Step 1: Redis Connection and Parameter Hashing
Deterministic cache keys ensure that logically identical queries map to the same cache entry, even when parameters arrive in a different order or contain nested structures. You will use hashlib.sha256 on a JSON-serialized representation of the function name and all keyword arguments. Redis stores the payload alongside metadata timestamps.
import redis
import hashlib
import json
import time
from typing import Any, Dict

class RedisCacheManager:
    def __init__(self, redis_url: str = "redis://localhost:6379", ttl: int = 300, stale_ttl: int = 900):
        self.pool = redis.ConnectionPool.from_url(redis_url, decode_responses=True)
        self.client = redis.Redis(connection_pool=self.pool)
        self.freshness_ttl = ttl
        self.stale_ttl = stale_ttl

    def _build_key(self, func_name: str, **kwargs) -> str:
        payload = {"func": func_name, "params": kwargs}
        serialized = json.dumps(payload, sort_keys=True, default=str)
        hash_hex = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
        return f"genesys:customaction:query:{hash_hex}"

    def get(self, key: str) -> Dict[str, Any]:
        raw = self.client.get(key)
        if not raw:
            return {}
        return json.loads(raw)

    def set(self, key: str, data: Any) -> None:
        entry = {
            "data": data,
            "cached_at": time.time(),
            "freshness_ttl": self.freshness_ttl,
            "stale_ttl": self.stale_ttl
        }
        self.client.setex(key, self.stale_ttl, json.dumps(entry))
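The _build_key method sorts keys so that identical logical queries produce identical hashes, and it falls back to str() for values that json cannot serialize. That fallback is only stable for types whose string form is deterministic (for example datetime or Decimal); objects that rely on the default repr embed a memory address and can hash differently between calls. Only keyword arguments are hashed, so values passed positionally do not affect the key. The set method writes the payload with a TTL equal to stale_ttl. Redis automatically evicts the key when the stale window expires, preventing unbounded memory growth.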
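To see the order-independence concretely, the hashing step can be run on its own. This is a small sketch that copies the logic of _build_key into a standalone function; build_key and the parameter names name, page_size, and filter are illustrative assumptions, not part of the Genesys Cloud API.

```python
import hashlib
import json

KEY_PREFIX = "genesys:customaction:query:"


def build_key(func_name: str, **kwargs) -> str:
    """Standalone copy of the _build_key logic, for experimentation."""
    payload = {"func": func_name, "params": kwargs}
    # sort_keys=True orders keys at every nesting level, not just the top one.
    serialized = json.dumps(payload, sort_keys=True, default=str)
    return KEY_PREFIX + hashlib.sha256(serialized.encode("utf-8")).hexdigest()


# Same parameters in a different order produce the same key.
key_a = build_key("get_actions", name="order_lookup", page_size=25)
key_b = build_key("get_actions", page_size=25, name="order_lookup")

# A different parameter value produces a different key.
key_c = build_key("get_actions", name="order_lookup", page_size=50)

# Nested dictionaries are normalized as well.
nested_1 = build_key("get_actions", filter={"a": 1, "b": 2})
nested_2 = build_key("get_actions", filter={"b": 2, "a": 1})
```

Lists are not reordered by sort_keys, so two queries that differ only in list order still hash to different keys.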
Step 2: Freshness Validation and Stale-While-Revalidate Fallback
Cache validity depends on the difference between cached_at and the current timestamp. You will return fresh data immediately. When the freshness window expires but the stale window remains open, the decorator returns the cached payload while triggering a background API refresh. This pattern mimics HTTP stale-while-revalidate behavior without requiring framework-level middleware.
import threading
import time
from functools import wraps
from typing import Any, Callable

def redis_cached_custom_action(redis_url: str = "redis://localhost:6379",
                               freshness_ttl: int = 300,
                               stale_ttl: int = 900):
    cache_mgr = RedisCacheManager(redis_url=redis_url, ttl=freshness_ttl, stale_ttl=stale_ttl)

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # Only keyword arguments feed the cache key; positional arguments are ignored.
            key = cache_mgr._build_key(func.__name__, **kwargs)
            cached_entry = cache_mgr.get(key)
            if cached_entry:
                age = time.time() - cached_entry["cached_at"]
                if age < cached_entry["freshness_ttl"]:
                    return cached_entry["data"]
                if age < cached_entry["stale_ttl"]:
                    # Stale-while-revalidate: serve the stale payload now and refresh
                    # in the background. The entry is not rewritten here, because
                    # set() would reset cached_at and make stale data look fresh.
                    def background_refresh():
                        try:
                            fresh_data = func(*args, **kwargs)
                            cache_mgr.set(key, fresh_data)
                        except Exception as exc:
                            print(f"Background refresh failed for {key}: {exc}")
                    threading.Thread(target=background_refresh, daemon=True).start()
                    return cached_entry["data"]
            # Cache miss or fully expired
            result = func(*args, **kwargs)
            cache_mgr.set(key, result)
            return result
        return wrapper
    return decorator
The decorator evaluates cache age against two thresholds. Fresh data returns immediately. Stale data returns the cached payload while spawning a daemon thread to fetch fresh results. This prevents blocking the caller when data is only slightly outdated. The stale entry is left untouched until the refresh succeeds, so a failed refresh cannot extend its lifetime. The trade-off is that every stale hit starts its own refresh thread until one of them completes.
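The two-threshold decision can be expressed as a pure function, which makes the boundaries easy to reason about. This is an illustrative sketch only; classify_entry is a hypothetical helper and the decorator does not call it.

```python
def classify_entry(cached_at: float, now: float,
                   freshness_ttl: float = 300, stale_ttl: float = 900) -> str:
    """Return which branch the decorator would take for an entry of this age."""
    age = now - cached_at
    if age < freshness_ttl:
        return "fresh"      # serve from cache, no refresh
    if age < stale_ttl:
        return "stale"      # serve from cache, refresh in the background
    return "expired"        # call the API synchronously


# With the default TTLs: 0-299s is fresh, 300-899s is stale, 900s+ is expired.
states = [classify_entry(cached_at=0, now=t) for t in (10, 300, 899, 900)]
```

Because Redis evicts the key after stale_ttl seconds, the "expired" branch normally shows up as a plain cache miss.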
Step 3: Direct API Execution with Exponential Backoff
When the cache misses or the stale window expires, the decorator falls back to the raw Genesys Cloud API. You will implement exponential backoff with jitter to handle 429 Too Many Requests and transient 5xx errors. The query endpoint supports pagination via nextPageToken, which you will handle in a loop.
import requests
import time
import random
from typing import Any, Dict, List

def query_custom_actions(auth: GenesysAuth, query_body: Dict[str, Any]) -> Dict[str, Any]:
    endpoint = f"{auth.base_url}/api/v2/customactions/actions/query"
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    all_entities: List[Dict[str, Any]] = []
    page_token = None
    max_retries = 5

    while True:
        payload = {**query_body}
        if page_token:
            payload["pageToken"] = page_token

        for attempt in range(max_retries):
            response = requests.post(endpoint, headers=headers, json=payload)
            if response.status_code == 200:
                data = response.json()
                all_entities.extend(data.get("entities", []))
                page_token = data.get("pagination", {}).get("nextPageToken")
                break
            elif response.status_code == 429:
                # Assumes Retry-After is given in seconds; float() keeps the jitter.
                fallback = 2 ** attempt + random.uniform(0, 1)
                retry_after = float(response.headers.get("Retry-After", fallback))
                print(f"Rate limited (429). Retrying in {retry_after:.1f}s...")
                time.sleep(retry_after)
            elif response.status_code >= 500:
                backoff = 2 ** attempt + random.uniform(0, 1)
                print(f"Server error ({response.status_code}). Retrying in {backoff:.1f}s...")
                time.sleep(backoff)
            else:
                response.raise_for_status()
        else:
            # Every attempt failed: raise instead of returning partial results
            # or retrying the same page forever.
            raise RuntimeError(
                f"Query failed after {max_retries} attempts (last status {response.status_code})"
            )

        if not page_token:
            break

    return {"entities": all_entities, "total_count": len(all_entities)}
The request cycle follows this exact structure:
- Method: POST
- Path: /api/v2/customactions/actions/query
- Headers: Authorization: Bearer <token>, Content-Type: application/json, Accept: application/json
- Request Body:
{
  "query": {
    "filter": {
      "name": {
        "contains": "order_lookup"
      }
    },
    "sort": [{"attribute": "name", "type": "string", "order": "asc"}]
  }
}
- Response Body:
{
  "entities": [
    {
      "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
      "name": "order_lookup",
      "description": "Fetches order details by ID",
      "type": "custom",
      "status": "active"
    }
  ],
  "pagination": {
    "total": 1,
    "count": 1,
    "pageSize": 25,
    "pageToken": null,
    "nextPageToken": null
  }
}
The retry loop respects Retry-After headers when present. Fallback to exponential backoff with random jitter prevents thundering herd scenarios across multiple concurrent workers. Pagination continues until nextPageToken returns null.
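The delay calculation can be separated from the HTTP loop so it is testable without a network. The sketch below is one possible shape, not the tutorial's exact code: backoff_delay, the cap parameter, and the injectable rng argument are assumptions introduced here. It also falls back to exponential backoff when Retry-After is not a plain number of seconds (the header may alternatively carry an HTTP date).

```python
import random
from typing import Callable, Optional


def backoff_delay(attempt: int,
                  retry_after: Optional[str] = None,
                  cap: float = 60.0,
                  rng: Callable[[float, float], float] = random.uniform) -> float:
    """Seconds to sleep before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # Honor the server's hint when it is a number of seconds.
            return max(0.0, float(retry_after))
        except ValueError:
            pass  # e.g. an HTTP-date value; fall through to computed backoff
    # Exponential growth plus up to one second of jitter, bounded by cap.
    return min(cap, 2 ** attempt + rng(0, 1))


# The server hint wins when present; otherwise the delay grows 1, 2, 4, 8, ...
hinted = backoff_delay(3, retry_after="7")
computed = [backoff_delay(n, rng=lambda lo, hi: 0.5) for n in range(4)]
```

Capping the delay keeps a long retry sequence from sleeping for minutes on a single attempt; the value of 60 seconds is arbitrary.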
Complete Working Example
The following script combines authentication, caching, and query execution into a single runnable module. Replace the placeholder credentials before execution.
import requests
import redis
import hashlib
import json
import time
import random
import threading
from functools import wraps
from typing import Any, Dict, List, Callable, Optional

class GenesysAuth:
    def __init__(self, env: str, client_id: str, client_secret: str, scope: str = "customactions:read"):
        self.base_url = f"https://{env}.mygen.com"
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.token: Optional[str] = None
        self.expires_at: float = 0.0

    def get_token(self) -> str:
        if self.token and time.time() < self.expires_at - 60:
            return self.token
        token_url = f"{self.base_url}/oauth/token"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        body = {
            "grant_type": "client_credentials",
            "scope": self.scope,
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }
        response = requests.post(token_url, headers=headers, data=body)
        response.raise_for_status()
        data = response.json()
        self.token = data["access_token"]
        self.expires_at = time.time() + data["expires_in"]
        return self.token

class RedisCacheManager:
    def __init__(self, redis_url: str = "redis://localhost:6379", ttl: int = 300, stale_ttl: int = 900):
        self.pool = redis.ConnectionPool.from_url(redis_url, decode_responses=True)
        self.client = redis.Redis(connection_pool=self.pool)
        self.freshness_ttl = ttl
        self.stale_ttl = stale_ttl

    def _build_key(self, func_name: str, **kwargs) -> str:
        payload = {"func": func_name, "params": kwargs}
        serialized = json.dumps(payload, sort_keys=True, default=str)
        hash_hex = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
        return f"genesys:customaction:query:{hash_hex}"

    def get(self, key: str) -> Dict[str, Any]:
        raw = self.client.get(key)
        if not raw:
            return {}
        return json.loads(raw)

    def set(self, key: str, data: Any) -> None:
        entry = {
            "data": data,
            "cached_at": time.time(),
            "freshness_ttl": self.freshness_ttl,
            "stale_ttl": self.stale_ttl
        }
        self.client.setex(key, self.stale_ttl, json.dumps(entry))

def redis_cached_custom_action(redis_url: str = "redis://localhost:6379",
                               freshness_ttl: int = 300,
                               stale_ttl: int = 900):
    cache_mgr = RedisCacheManager(redis_url=redis_url, ttl=freshness_ttl, stale_ttl=stale_ttl)

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # Only keyword arguments feed the cache key; positional arguments are ignored.
            key = cache_mgr._build_key(func.__name__, **kwargs)
            cached_entry = cache_mgr.get(key)
            if cached_entry:
                age = time.time() - cached_entry["cached_at"]
                if age < cached_entry["freshness_ttl"]:
                    return cached_entry["data"]
                if age < cached_entry["stale_ttl"]:
                    # Serve stale data and refresh in the background. The entry is
                    # not rewritten here, because set() would reset cached_at.
                    def background_refresh():
                        try:
                            fresh_data = func(*args, **kwargs)
                            cache_mgr.set(key, fresh_data)
                        except Exception as exc:
                            print(f"Background refresh failed for {key}: {exc}")
                    threading.Thread(target=background_refresh, daemon=True).start()
                    return cached_entry["data"]
            # Cache miss or fully expired
            result = func(*args, **kwargs)
            cache_mgr.set(key, result)
            return result
        return wrapper
    return decorator

def query_custom_actions(auth: GenesysAuth, query_body: Dict[str, Any]) -> Dict[str, Any]:
    endpoint = f"{auth.base_url}/api/v2/customactions/actions/query"
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    all_entities: List[Dict[str, Any]] = []
    page_token = None
    max_retries = 5

    while True:
        payload = {**query_body}
        if page_token:
            payload["pageToken"] = page_token

        for attempt in range(max_retries):
            response = requests.post(endpoint, headers=headers, json=payload)
            if response.status_code == 200:
                data = response.json()
                all_entities.extend(data.get("entities", []))
                page_token = data.get("pagination", {}).get("nextPageToken")
                break
            elif response.status_code == 429:
                # Assumes Retry-After is given in seconds; float() keeps the jitter.
                fallback = 2 ** attempt + random.uniform(0, 1)
                retry_after = float(response.headers.get("Retry-After", fallback))
                print(f"Rate limited (429). Retrying in {retry_after:.1f}s...")
                time.sleep(retry_after)
            elif response.status_code >= 500:
                backoff = 2 ** attempt + random.uniform(0, 1)
                print(f"Server error ({response.status_code}). Retrying in {backoff:.1f}s...")
                time.sleep(backoff)
            else:
                response.raise_for_status()
        else:
            # Every attempt failed: raise instead of returning partial results
            # or retrying the same page forever.
            raise RuntimeError(
                f"Query failed after {max_retries} attempts (last status {response.status_code})"
            )

        if not page_token:
            break

    return {"entities": all_entities, "total_count": len(all_entities)}

# Apply decorator
@redis_cached_custom_action(freshness_ttl=300, stale_ttl=900)
def get_order_lookup_actions(auth: GenesysAuth, **kwargs) -> Dict[str, Any]:
    query_body = {
        "query": {
            "filter": {
                "name": {
                    "contains": "order_lookup"
                }
            },
            "sort": [{"attribute": "name", "type": "string", "order": "asc"}]
        }
    }
    return query_custom_actions(auth, query_body)

if __name__ == "__main__":
    ENV = "acme"
    CLIENT_ID = "YOUR_CLIENT_ID"
    CLIENT_SECRET = "YOUR_CLIENT_SECRET"

    auth = GenesysAuth(env=ENV, client_id=CLIENT_ID, client_secret=CLIENT_SECRET)

    print("First call (cache miss):")
    start = time.time()
    result1 = get_order_lookup_actions(auth)
    print(f"Returned {result1['total_count']} entities in {time.time() - start:.3f}s")

    print("\nSecond call (cache hit):")
    start = time.time()
    result2 = get_order_lookup_actions(auth)
    print(f"Returned {result2['total_count']} entities in {time.time() - start:.3f}s")
The script applies the decorator to get_order_lookup_actions. The first invocation triggers the API call, handles pagination, and stores the result in Redis. The second invocation returns immediately from cache. Background refresh activates automatically when the freshness window expires but the stale window remains valid.
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: Expired access token, missing customactions:read scope, or incorrect client credentials.
- Fix: Verify the OAuth client has the exact scope string enabled. Ensure the token manager refreshes before expiration. Check that client_id and client_secret match the confidential client configuration.
- Code fix: Add explicit scope validation in the token request body and log the token endpoint's status code and error body for debugging, taking care not to log the access token or client secret.
Error: 403 Forbidden
- Cause: The OAuth client lacks organizational permissions for custom actions, or the environment URL is incorrect.
- Fix: Assign the client to a user or group with Custom Actions read permissions in the Genesys Cloud admin console. Verify the environment subdomain matches the target tenant.
- Code fix: Catch 403 explicitly and raise a descriptive exception with the client ID and scope for audit logging.
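One way to surface a 403 with audit-friendly context is a dedicated exception type. This is a hedged sketch: GenesysPermissionError and raise_for_forbidden are hypothetical names introduced for illustration, and the helper would need to be called on the response status inside the query loop. The client secret is deliberately kept out of the message.

```python
class GenesysPermissionError(Exception):
    """Raised when the API answers 403; carries the client ID and scope."""

    def __init__(self, client_id: str, scope: str, detail: str = "") -> None:
        self.client_id = client_id
        self.scope = scope
        message = f"403 Forbidden for OAuth client {client_id!r} with scope {scope!r}"
        if detail:
            message += f": {detail}"
        super().__init__(message)


def raise_for_forbidden(status_code: int, client_id: str, scope: str,
                        detail: str = "") -> None:
    """Raise GenesysPermissionError for a 403; do nothing for other codes."""
    if status_code == 403:
        raise GenesysPermissionError(client_id, scope, detail)


# Example: a 200 passes through silently, a 403 raises with context attached.
raise_for_forbidden(200, "my-client-id", "customactions:read")
```

In the query loop this check would sit before the generic raise_for_status() call, for example raise_for_forbidden(response.status_code, auth.client_id, auth.scope, response.text).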
Error: 429 Too Many Requests
- Cause: Exceeded Genesys Cloud rate limits for the custom actions query endpoint.
- Fix: The exponential backoff loop handles this automatically. Adjust initial backoff values if your workload spikes. Implement request coalescing if multiple threads trigger identical queries simultaneously.
- Code fix: The retry loop respects Retry-After headers. If the header is missing, it falls back to 2 ** attempt + jitter. Increase max_retries for high-throughput environments.
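Request coalescing, mentioned in the fix above, can be sketched with a lock and one event per in-flight key. This is an in-process illustration only: RequestCoalescer is a hypothetical class, it does not coordinate across multiple worker processes, and the waiters counter exists mainly so the behavior can be observed.

```python
import threading
from typing import Any, Callable, Dict


class RequestCoalescer:
    """Let one thread run fn() per key while concurrent callers wait for its result."""

    def __init__(self) -> None:
        self._guard = threading.Lock()
        self._inflight: Dict[str, Dict[str, Any]] = {}

    def run(self, key: str, fn: Callable[[], Any]) -> Any:
        with self._guard:
            entry = self._inflight.get(key)
            is_leader = entry is None
            if is_leader:
                entry = {"done": threading.Event(), "result": None,
                         "error": None, "waiters": 0}
                self._inflight[key] = entry
            else:
                entry["waiters"] += 1

        if is_leader:
            try:
                entry["result"] = fn()
            except Exception as exc:
                entry["error"] = exc
            finally:
                # Remove the entry first so later callers start a new request.
                with self._guard:
                    self._inflight.pop(key, None)
                entry["done"].set()
        else:
            entry["done"].wait()

        if entry["error"] is not None:
            raise entry["error"]
        return entry["result"]
```

A caller would wrap the API call, for example coalescer.run(key, lambda: query_custom_actions(auth, query_body)), so that identical concurrent queries produce a single request. Followers receive the same exception as the leader if the call fails.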
Error: Redis Connection or Serialization Failure
- Cause: Redis instance unreachable, authentication mismatch, or non-JSON-serializable objects in the query payload.
- Fix: Test Redis connectivity with redis-cli ping. Ensure decode_responses=True matches your serialization strategy. Convert datetime or Decimal objects to strings before hashing.
- Code fix: Add connection health checks before decorator initialization. Wrap json.dumps in a try-except block with a fallback serializer.
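A fallback serializer along the lines of the code fix above might look like the following. This is a minimal sketch under stated assumptions: to_jsonable and safe_dumps are hypothetical helpers, and the set of handled types (datetime, date, Decimal) is an illustrative choice rather than a complete list.

```python
import json
from datetime import date, datetime
from decimal import Decimal
from typing import Any


def to_jsonable(value: Any) -> Any:
    """Convert common non-JSON types to stable string forms."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)
    # Refuse unknown types rather than hashing an unstable repr.
    raise TypeError(f"Cannot serialize value of type {type(value).__name__}")


def safe_dumps(payload: Any) -> str:
    """json.dumps with the fallback above and a clearer error on failure."""
    try:
        return json.dumps(payload, sort_keys=True, default=to_jsonable)
    except TypeError as exc:
        raise ValueError(f"Payload is not cacheable: {exc}") from exc


encoded = safe_dumps({"when": datetime(2024, 1, 2, 3, 4, 5), "amount": Decimal("9.90")})
# encoded == '{"amount": "9.90", "when": "2024-01-02T03:04:05"}'
```

Unlike default=str, this version raises on unknown objects, which surfaces serialization problems early instead of silently producing keys that never match.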