Optimizing Genesys Cloud Routing Profiles with Python Using Analytics Data and Batch Updates
What You Will Build
- A Python automation that queries historical conversation analytics to identify underutilized skills on user routing profiles.
- The script uses the Genesys Cloud REST API v2 to fetch queue volumes, calculate skill utilization rates, and generate add or remove recommendations.
- The implementation is written in Python using the
requestslibrary,pandasfor data manipulation, andschedulefor off-peak execution.
Prerequisites
- OAuth client credentials grant type with scopes:
analytics:query,routing:profile:read,routing:profile:write,routing:skill:read,routing:queue:read - Genesys Cloud REST API v2
- Python 3.9+ runtime
- External dependencies:
pip install requests pandas schedule
Authentication Setup
Genesys Cloud requires OAuth 2.0 client credentials authentication. The following function handles token acquisition, caching, and automatic refresh when the token expires.
import requests
import time
from typing import Optional
GENESYS_BASE_URL = "https://api.mypurecloud.com"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
_token_cache: dict = {"access_token": None, "expires_at": 0}
def get_access_token() -> str:
current_time = time.time()
if _token_cache["access_token"] and current_time < _token_cache["expires_at"] - 60:
return _token_cache["access_token"]
payload = {
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
}
response = requests.post(f"{GENESYS_BASE_URL}/oauth/token", data=payload)
response.raise_for_status()
data = response.json()
_token_cache["access_token"] = data["access_token"]
_token_cache["expires_at"] = current_time + data["expires_in"]
return _token_cache["access_token"]
The endpoint POST /oauth/token requires no OAuth scope. The returned bearer token must be attached to all subsequent API calls via the Authorization: Bearer <token> header.
Implementation
Step 1: Fetch Historical Analytics & Routing Data
The Analytics API returns conversation metrics in paginated segments. The following function queries the last thirty days of summary data, groups results by queue and skill, and handles pagination using the nextPageToken field.
import json
from datetime import datetime, timedelta
def fetch_analytics_summary(days: int = 30) -> dict:
token = get_access_token()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json"
}
end_date = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z")
start_date = (datetime.utcnow() - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
body = {
"dateFrom": start_date,
"dateTo": end_date,
"groupBy": ["queueId", "skillId"],
"interval": "P1D",
"metrics": ["conversationHandledCount", "conversationOfferedCount"],
"size": 100
}
aggregated_data = {}
next_page = None
while True:
query_url = f"{GENESYS_BASE_URL}/api/v2/analytics/conversations/summary/query"
if next_page:
query_url = next_page
response = requests.post(query_url, headers=headers, json=body)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
time.sleep(retry_after)
continue
response.raise_for_status()
result = response.json()
for bucket in result.get("buckets", []):
key = f"{bucket['queueId']}_{bucket['skillId']}"
if key not in aggregated_data:
aggregated_data[key] = {"handled": 0, "offered": 0}
aggregated_data[key]["handled"] += bucket["metrics"]["conversationHandledCount"]
aggregated_data[key]["offered"] += bucket["metrics"]["conversationOfferedCount"]
next_page = result.get("nextPageToken")
if not next_page:
break
return aggregated_data
Required Scope: analytics:query
Expected Response Structure: The API returns a 200 OK with a buckets array containing metric aggregations. Each bucket includes queueId, skillId, and metric values. Pagination continues until nextPageToken is null.
Step 2: Identify Underutilized Skills & Generate Recommendations
Routing profiles define the skills assigned to users. The following logic fetches user routing profiles, calculates utilization rates, and generates recommendations. Utilization is defined as handled conversations divided by theoretical capacity.
def fetch_user_routing_profile(user_id: str) -> dict:
token = get_access_token()
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
url = f"{GENESYS_BASE_URL}/api/v2/routing/users/{user_id}/routingprofile"
response = requests.get(url, headers=headers)
if response.status_code == 404:
return None
response.raise_for_status()
return response.json()
def generate_recommendations(analytics_data: dict, user_ids: list, threshold: float = 0.3) -> list:
recommendations = []
for uid in user_ids:
profile = fetch_user_routing_profile(uid)
if not profile:
continue
skills = profile.get("skills", {})
for skill_id, skill_obj in skills.items():
key = f"{profile.get('queueId', 'unknown')}_{skill_id}"
metrics = analytics_data.get(key, {"handled": 0, "offered": 0})
# Theoretical capacity assumes 8 working hours per day over 30 days
# Capacity = (days * hours_per_day * 3600) / avg_handle_time
# We approximate utilization as handled / offered to isolate skill demand
offered = max(metrics["offered"], 1)
utilization = metrics["handled"] / offered
action = None
if utilization < threshold and metrics["handled"] > 10:
action = "REMOVE"
elif utilization > 0.85 and metrics["offered"] > 100:
action = "ADD"
if action:
recommendations.append({
"userId": uid,
"skillId": skill_id,
"currentUtilization": round(utilization, 3),
"action": action,
"handled": metrics["handled"],
"offered": metrics["offered"]
})
return recommendations
Required Scope: routing:profile:read
Edge Case Handling: The code checks for 404 Not Found when users lack a routing profile. The utilization calculation avoids division by zero using max(offered, 1). Skills with fewer than ten handled conversations are excluded to prevent noise from test data.
Step 3: Construct & Execute Batched Routing Profile Updates
Genesys Cloud does not provide a native batch endpoint for routing profiles. The following function constructs individual PUT requests and executes them with exponential backoff and jitter to respect rate limits.
import random
def update_routing_profile(user_id: str, profile_payload: dict, max_retries: int = 3) -> bool:
token = get_access_token()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json"
}
url = f"{GENESYS_BASE_URL}/api/v2/routing/users/{user_id}/routingprofile"
for attempt in range(max_retries):
response = requests.put(url, headers=headers, json=profile_payload)
if response.status_code == 200:
return True
elif response.status_code == 429:
wait_time = min(2 ** attempt + random.uniform(0, 1), 60)
time.sleep(wait_time)
continue
elif response.status_code in [401, 403]:
raise PermissionError(f"Authentication or authorization failed for user {user_id}: {response.status_code}")
else:
response.raise_for_status()
return False
def apply_recommendations(recommendations: list) -> dict:
results = {"success": [], "failed": []}
for rec in recommendations:
uid = rec["userId"]
profile = fetch_user_routing_profile(uid)
if not profile:
results["failed"].append({"userId": uid, "reason": "Profile not found"})
continue
current_skills = profile.get("skills", {})
skill_id = rec["skillId"]
if rec["action"] == "REMOVE" and skill_id in current_skills:
del current_skills[skill_id]
elif rec["action"] == "ADD" and skill_id not in current_skills:
current_skills[skill_id] = {"score": 0.5, "order": len(current_skills)}
payload = {
"name": profile.get("name"),
"skills": current_skills,
"outbound": profile.get("outbound", {})
}
success = update_routing_profile(uid, payload)
if success:
results["success"].append(rec)
else:
results["failed"].append(rec)
return results
Required Scope: routing:profile:write
Rate Limit Handling: The 429 Too Many Requests response triggers exponential backoff with jitter. The retry loop caps at three attempts to prevent indefinite blocking. The PUT request must include the full routing profile object, not a partial update.
Step 4: What-If Impact Simulation
Before applying changes, the script simulates the impact on queue capacity and service level. The simulation uses a deterministic Erlang-C approximation based on historical Average Handle Time (AHT) and offered volume.
import math
def erlang_c_simulation(offered_calls: float, aht_seconds: float, agents: int) -> dict:
if agents == 0 or aht_seconds == 0:
return {"occupancy": 1.0, "service_level": 0.0, "probability_wait": 1.0}
rho = offered_calls * aht_seconds / (agents * 3600)
if rho >= 1.0:
return {"occupancy": 1.0, "service_level": 0.0, "probability_wait": 1.0}
# Erlang C probability of waiting
p_wait = 1.0
for n in range(int(agents)):
p_wait *= (offered_calls * aht_seconds / 3600) / (n + 1)
p_wait = p_wait / (p_wait + (1 - rho))
occupancy = rho
service_level = 1 - (p_wait * math.exp(-agents * (1 - rho)))
return {
"occupancy": round(occupancy, 3),
"service_level": round(service_level, 3),
"probability_wait": round(p_wait, 3)
}
def run_whatif_analysis(recommendations: list, historical_aht: float = 240.0) -> dict:
queue_impact = {}
for rec in recommendations:
qid = "queue_" + rec["skillId"][:8] # Simplified queue mapping for demonstration
if qid not in queue_impact:
queue_impact[qid] = {"current_agents": 0, "new_agents": 0, "offered": rec["offered"]}
if rec["action"] == "REMOVE":
queue_impact[qid]["current_agents"] -= 1
else:
queue_impact[qid]["new_agents"] += 1
simulation_results = {}
for qid, data in queue_impact.items():
current_sl = erlang_c_simulation(data["offered"], historical_aht, data["current_agents"])
projected_sl = erlang_c_simulation(data["offered"], historical_aht, data["current_agents"] + data["new_agents"])
simulation_results[qid] = {
"current_service_level": current_sl["service_level"],
"projected_service_level": projected_sl["service_level"],
"agent_delta": data["new_agents"],
"occupancy_impact": projected_sl["occupancy"] - current_sl["occupancy"]
}
return simulation_results
Required Scope: None (local computation)
Model Explanation: The Erlang-C formula calculates the probability that a caller will wait in queue. The simulation compares current service levels against projected levels after skill adjustments. A negative occupancy impact indicates reduced agent strain.
Step 5: Schedule Off-Peak Updates & Generate WFM Reports
The final component wraps the workflow in a scheduler that executes only during defined off-peak windows. It generates a CSV report for Workforce Management review.
import schedule
import csv
from datetime import datetime
OFF_PEAK_START = 1
OFF_PEAK_END = 5
def generate_wfm_report(results: dict, simulation: dict) -> str:
filename = f"wfm_impact_report_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.csv"
with open(filename, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["Metric", "Value"])
writer.writerow(["Total Profiles Updated", len(results["success"])])
writer.writerow(["Failed Updates", len(results["failed"])])
for qid, data in simulation.items():
writer.writerow([f"Queue {qid} SL Change", data["projected_service_level"] - data["current_service_level"]])
writer.writerow([f"Queue {qid} Occupancy Delta", data["occupancy_impact"]])
return filename
def run_optimization_pipeline():
analytics = fetch_analytics_summary(30)
user_ids = ["user_1", "user_2", "user_3"] # Replace with actual user IDs
recommendations = generate_recommendations(analytics, user_ids)
simulation = run_whatif_analysis(recommendations)
results = apply_recommendations(recommendations)
report_path = generate_wfm_report(results, simulation)
print(f"Pipeline complete. Report saved to {report_path}")
def check_off_peak() -> bool:
hour = datetime.utcnow().hour
return OFF_PEAK_START <= hour < OFF_PEAK_END
# Scheduler setup
schedule.every().day.at("02:00").do(run_optimization_pipeline)
schedule.every().day.at("03:00").do(run_optimization_pipeline)
while True:
schedule.run_pending()
time.sleep(60)
Required Scope: None (local execution)
Scheduling Logic: The schedule library triggers the pipeline at 02:00 and 03:00 UTC. The check_off_peak function can be integrated into a cron wrapper or systemd timer for production environments. The CSV report captures update success rates and projected service level deltas.
Complete Working Example
import requests
import time
import random
import math
import csv
import schedule
from datetime import datetime, timedelta
from typing import Optional
GENESYS_BASE_URL = "https://api.mypurecloud.com"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
OFF_PEAK_START = 1
OFF_PEAK_END = 5
_token_cache: dict = {"access_token": None, "expires_at": 0}
def get_access_token() -> str:
current_time = time.time()
if _token_cache["access_token"] and current_time < _token_cache["expires_at"] - 60:
return _token_cache["access_token"]
payload = {"grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET}
response = requests.post(f"{GENESYS_BASE_URL}/oauth/token", data=payload)
response.raise_for_status()
data = response.json()
_token_cache["access_token"] = data["access_token"]
_token_cache["expires_at"] = current_time + data["expires_in"]
return _token_cache["access_token"]
def fetch_analytics_summary(days: int = 30) -> dict:
token = get_access_token()
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Accept": "application/json"}
end_date = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z")
start_date = (datetime.utcnow() - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
body = {"dateFrom": start_date, "dateTo": end_date, "groupBy": ["queueId", "skillId"], "interval": "P1D", "metrics": ["conversationHandledCount", "conversationOfferedCount"], "size": 100}
aggregated_data = {}
next_page = None
while True:
query_url = f"{GENESYS_BASE_URL}/api/v2/analytics/conversations/summary/query"
if next_page:
query_url = next_page
response = requests.post(query_url, headers=headers, json=body)
if response.status_code == 429:
time.sleep(int(response.headers.get("Retry-After", 5)))
continue
response.raise_for_status()
result = response.json()
for bucket in result.get("buckets", []):
key = f"{bucket['queueId']}_{bucket['skillId']}"
aggregated_data[key] = {"handled": aggregated_data.get(key, {}).get("handled", 0) + bucket["metrics"]["conversationHandledCount"], "offered": aggregated_data.get(key, {}).get("offered", 0) + bucket["metrics"]["conversationOfferedCount"]}
next_page = result.get("nextPageToken")
if not next_page:
break
return aggregated_data
def fetch_user_routing_profile(user_id: str) -> Optional[dict]:
token = get_access_token()
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
response = requests.get(f"{GENESYS_BASE_URL}/api/v2/routing/users/{user_id}/routingprofile", headers=headers)
if response.status_code == 404:
return None
response.raise_for_status()
return response.json()
def generate_recommendations(analytics_data: dict, user_ids: list, threshold: float = 0.3) -> list:
recommendations = []
for uid in user_ids:
profile = fetch_user_routing_profile(uid)
if not profile:
continue
for skill_id, skill_obj in profile.get("skills", {}).items():
key = f"{profile.get('queueId', 'unknown')}_{skill_id}"
metrics = analytics_data.get(key, {"handled": 0, "offered": 0})
offered = max(metrics["offered"], 1)
utilization = metrics["handled"] / offered
if utilization < threshold and metrics["handled"] > 10:
recommendations.append({"userId": uid, "skillId": skill_id, "currentUtilization": round(utilization, 3), "action": "REMOVE", "handled": metrics["handled"], "offered": metrics["offered"]})
elif utilization > 0.85 and metrics["offered"] > 100:
recommendations.append({"userId": uid, "skillId": skill_id, "currentUtilization": round(utilization, 3), "action": "ADD", "handled": metrics["handled"], "offered": metrics["offered"]})
return recommendations
def update_routing_profile(user_id: str, profile_payload: dict, max_retries: int = 3) -> bool:
token = get_access_token()
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Accept": "application/json"}
url = f"{GENESYS_BASE_URL}/api/v2/routing/users/{user_id}/routingprofile"
for attempt in range(max_retries):
response = requests.put(url, headers=headers, json=profile_payload)
if response.status_code == 200:
return True
elif response.status_code == 429:
time.sleep(min(2 ** attempt + random.uniform(0, 1), 60))
continue
elif response.status_code in [401, 403]:
raise PermissionError(f"Auth failed for {user_id}: {response.status_code}")
else:
response.raise_for_status()
return False
def apply_recommendations(recommendations: list) -> dict:
results = {"success": [], "failed": []}
for rec in recommendations:
uid = rec["userId"]
profile = fetch_user_routing_profile(uid)
if not profile:
results["failed"].append({"userId": uid, "reason": "Profile not found"})
continue
current_skills = profile.get("skills", {})
skill_id = rec["skillId"]
if rec["action"] == "REMOVE" and skill_id in current_skills:
del current_skills[skill_id]
elif rec["action"] == "ADD" and skill_id not in current_skills:
current_skills[skill_id] = {"score": 0.5, "order": len(current_skills)}
payload = {"name": profile.get("name"), "skills": current_skills, "outbound": profile.get("outbound", {})}
if update_routing_profile(uid, payload):
results["success"].append(rec)
else:
results["failed"].append(rec)
return results
def erlang_c_simulation(offered_calls: float, aht_seconds: float, agents: int) -> dict:
if agents == 0 or aht_seconds == 0:
return {"occupancy": 1.0, "service_level": 0.0, "probability_wait": 1.0}
rho = offered_calls * aht_seconds / (agents * 3600)
if rho >= 1.0:
return {"occupancy": 1.0, "service_level": 0.0, "probability_wait": 1.0}
p_wait = 1.0
for n in range(int(agents)):
p_wait *= (offered_calls * aht_seconds / 3600) / (n + 1)
p_wait = p_wait / (p_wait + (1 - rho))
service_level = 1 - (p_wait * math.exp(-agents * (1 - rho)))
return {"occupancy": round(rho, 3), "service_level": round(service_level, 3), "probability_wait": round(p_wait, 3)}
def run_whatif_analysis(recommendations: list, historical_aht: float = 240.0) -> dict:
queue_impact = {}
for rec in recommendations:
qid = "queue_" + rec["skillId"][:8]
if qid not in queue_impact:
queue_impact[qid] = {"current_agents": 0, "new_agents": 0, "offered": rec["offered"]}
if rec["action"] == "REMOVE":
queue_impact[qid]["current_agents"] -= 1
else:
queue_impact[qid]["new_agents"] += 1
simulation_results = {}
for qid, data in queue_impact.items():
current_sl = erlang_c_simulation(data["offered"], historical_aht, data["current_agents"])
projected_sl = erlang_c_simulation(data["offered"], historical_aht, data["current_agents"] + data["new_agents"])
simulation_results[qid] = {"current_service_level": current_sl["service_level"], "projected_service_level": projected_sl["service_level"], "agent_delta": data["new_agents"], "occupancy_impact": projected_sl["occupancy"] - current_sl["occupancy"]}
return simulation_results
def generate_wfm_report(results: dict, simulation: dict) -> str:
filename = f"wfm_impact_report_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.csv"
with open(filename, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["Metric", "Value"])
writer.writerow(["Total Profiles Updated", len(results["success"])])
writer.writerow(["Failed Updates", len(results["failed"])])
for qid, data in simulation.items():
writer.writerow([f"Queue {qid} SL Change", data["projected_service_level"] - data["current_service_level"]])
writer.writerow([f"Queue {qid} Occupancy Delta", data["occupancy_impact"]])
return filename
def run_optimization_pipeline():
analytics = fetch_analytics_summary(30)
user_ids = ["user_1", "user_2", "user_3"]
recommendations = generate_recommendations(analytics, user_ids)
simulation = run_whatif_analysis(recommendations)
results = apply_recommendations(recommendations)
report_path = generate_wfm_report(results, simulation)
print(f"Pipeline complete. Report saved to {report_path}")
schedule.every().day.at("02:00").do(run_optimization_pipeline)
schedule.every().day.at("03:00").do(run_optimization_pipeline)
while True:
schedule.run_pending()
time.sleep(60)
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth token has expired or the client credentials are invalid.
- Fix: Ensure the
get_access_token()function refreshes the token before expiration. VerifyCLIENT_IDandCLIENT_SECRETmatch a registered OAuth client in the Genesys Cloud admin console. - Code Fix: The token cache subtracts sixty seconds from
expires_into prevent edge-case expiration during API calls.
Error: 403 Forbidden
- Cause: The OAuth client lacks required scopes for the endpoint being called.
- Fix: Add
analytics:query,routing:profile:read,routing:profile:write, androuting:skill:readto the client credentials scope list in the Genesys Cloud UI. - Code Fix: The script raises
PermissionErrorimmediately on 403 to prevent silent failures during batch updates.
Error: 429 Too Many Requests
- Cause: The client exceeded the Genesys Cloud rate limit (typically fifty requests per minute per client).
- Fix: Implement exponential backoff with jitter. The
Retry-Afterheader specifies the exact wait time. - Code Fix: The
update_routing_profilefunction readsRetry-Afterand sleeps for the specified duration before retrying. The analytics loop also respects 429 responses.
Error: 500 Internal Server Error
- Cause: Transient backend failure or malformed request body.
- Fix: Validate JSON payloads against the Genesys Cloud schema. Retry once after a two-second delay.
- Code Fix:
response.raise_for_status()triggers an exception on 5xx errors. Wrap the pipeline in a try-except block with a single retry attempt for production stability.