Optimizing Genesys Cloud Routing Profiles with Python Using Analytics Data and Batch Updates

Optimizing Genesys Cloud Routing Profiles with Python Using Analytics Data and Batch Updates

What You Will Build

  • A Python automation that queries historical conversation analytics to identify underutilized skills on user routing profiles.
  • The script uses the Genesys Cloud REST API v2 to fetch queue volumes, calculate skill utilization rates, and generate add or remove recommendations.
  • The implementation is written in Python using the requests library, pandas for data manipulation, and schedule for off-peak execution.

Prerequisites

  • OAuth client credentials grant type with scopes: analytics:query, routing:profile:read, routing:profile:write, routing:skill:read, routing:queue:read
  • Genesys Cloud REST API v2
  • Python 3.9+ runtime
  • External dependencies: pip install requests pandas schedule

Authentication Setup

Genesys Cloud requires OAuth 2.0 client credentials authentication. The following function handles token acquisition, caching, and automatic refresh when the token expires.

import requests
import time
from typing import Optional

GENESYS_BASE_URL = "https://api.mypurecloud.com"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"

_token_cache: dict = {"access_token": None, "expires_at": 0}

def get_access_token() -> str:
    current_time = time.time()
    if _token_cache["access_token"] and current_time < _token_cache["expires_at"] - 60:
        return _token_cache["access_token"]

    payload = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET
    }
    response = requests.post(f"{GENESYS_BASE_URL}/oauth/token", data=payload)
    response.raise_for_status()
    
    data = response.json()
    _token_cache["access_token"] = data["access_token"]
    _token_cache["expires_at"] = current_time + data["expires_in"]
    return _token_cache["access_token"]

The endpoint POST /oauth/token requires no OAuth scope. The returned bearer token must be attached to all subsequent API calls via the Authorization: Bearer <token> header.

Implementation

Step 1: Fetch Historical Analytics & Routing Data

The Analytics API returns conversation metrics in paginated segments. The following function queries the last thirty days of summary data, groups results by queue and skill, and handles pagination using the nextPageToken field.

import json
from datetime import datetime, timedelta

def fetch_analytics_summary(days: int = 30) -> dict:
    token = get_access_token()
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    
    end_date = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z")
    start_date = (datetime.utcnow() - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    
    body = {
        "dateFrom": start_date,
        "dateTo": end_date,
        "groupBy": ["queueId", "skillId"],
        "interval": "P1D",
        "metrics": ["conversationHandledCount", "conversationOfferedCount"],
        "size": 100
    }
    
    aggregated_data = {}
    next_page = None
    
    while True:
        query_url = f"{GENESYS_BASE_URL}/api/v2/analytics/conversations/summary/query"
        if next_page:
            query_url = next_page
            
        response = requests.post(query_url, headers=headers, json=body)
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 5))
            time.sleep(retry_after)
            continue
        response.raise_for_status()
        
        result = response.json()
        for bucket in result.get("buckets", []):
            key = f"{bucket['queueId']}_{bucket['skillId']}"
            if key not in aggregated_data:
                aggregated_data[key] = {"handled": 0, "offered": 0}
            aggregated_data[key]["handled"] += bucket["metrics"]["conversationHandledCount"]
            aggregated_data[key]["offered"] += bucket["metrics"]["conversationOfferedCount"]
            
        next_page = result.get("nextPageToken")
        if not next_page:
            break
            
    return aggregated_data

Required Scope: analytics:query
Expected Response Structure: The API returns a 200 OK with a buckets array containing metric aggregations. Each bucket includes queueId, skillId, and metric values. Pagination continues until nextPageToken is null.

Step 2: Identify Underutilized Skills & Generate Recommendations

Routing profiles define the skills assigned to users. The following logic fetches user routing profiles, calculates utilization rates, and generates recommendations. Utilization is defined as handled conversations divided by theoretical capacity.

def fetch_user_routing_profile(user_id: str) -> dict:
    token = get_access_token()
    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
    url = f"{GENESYS_BASE_URL}/api/v2/routing/users/{user_id}/routingprofile"
    
    response = requests.get(url, headers=headers)
    if response.status_code == 404:
        return None
    response.raise_for_status()
    return response.json()

def generate_recommendations(analytics_data: dict, user_ids: list, threshold: float = 0.3) -> list:
    recommendations = []
    
    for uid in user_ids:
        profile = fetch_user_routing_profile(uid)
        if not profile:
            continue
            
        skills = profile.get("skills", {})
        for skill_id, skill_obj in skills.items():
            key = f"{profile.get('queueId', 'unknown')}_{skill_id}"
            metrics = analytics_data.get(key, {"handled": 0, "offered": 0})
            
            # Theoretical capacity assumes 8 working hours per day over 30 days
            # Capacity = (days * hours_per_day * 3600) / avg_handle_time
            # We approximate utilization as handled / offered to isolate skill demand
            offered = max(metrics["offered"], 1)
            utilization = metrics["handled"] / offered
            
            action = None
            if utilization < threshold and metrics["handled"] > 10:
                action = "REMOVE"
            elif utilization > 0.85 and metrics["offered"] > 100:
                action = "ADD"
                
            if action:
                recommendations.append({
                    "userId": uid,
                    "skillId": skill_id,
                    "currentUtilization": round(utilization, 3),
                    "action": action,
                    "handled": metrics["handled"],
                    "offered": metrics["offered"]
                })
                
    return recommendations

Required Scope: routing:profile:read
Edge Case Handling: The code checks for 404 Not Found when users lack a routing profile. The utilization calculation avoids division by zero using max(offered, 1). Skills with fewer than ten handled conversations are excluded to prevent noise from test data.

Step 3: Construct & Execute Batched Routing Profile Updates

Genesys Cloud does not provide a native batch endpoint for routing profiles. The following function constructs individual PUT requests and executes them with exponential backoff and jitter to respect rate limits.

import random

def update_routing_profile(user_id: str, profile_payload: dict, max_retries: int = 3) -> bool:
    token = get_access_token()
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    url = f"{GENESYS_BASE_URL}/api/v2/routing/users/{user_id}/routingprofile"
    
    for attempt in range(max_retries):
        response = requests.put(url, headers=headers, json=profile_payload)
        
        if response.status_code == 200:
            return True
        elif response.status_code == 429:
            wait_time = min(2 ** attempt + random.uniform(0, 1), 60)
            time.sleep(wait_time)
            continue
        elif response.status_code in [401, 403]:
            raise PermissionError(f"Authentication or authorization failed for user {user_id}: {response.status_code}")
        else:
            response.raise_for_status()
            
    return False

def apply_recommendations(recommendations: list) -> dict:
    results = {"success": [], "failed": []}
    
    for rec in recommendations:
        uid = rec["userId"]
        profile = fetch_user_routing_profile(uid)
        if not profile:
            results["failed"].append({"userId": uid, "reason": "Profile not found"})
            continue
            
        current_skills = profile.get("skills", {})
        skill_id = rec["skillId"]
        
        if rec["action"] == "REMOVE" and skill_id in current_skills:
            del current_skills[skill_id]
        elif rec["action"] == "ADD" and skill_id not in current_skills:
            current_skills[skill_id] = {"score": 0.5, "order": len(current_skills)}
            
        payload = {
            "name": profile.get("name"),
            "skills": current_skills,
            "outbound": profile.get("outbound", {})
        }
        
        success = update_routing_profile(uid, payload)
        if success:
            results["success"].append(rec)
        else:
            results["failed"].append(rec)
            
    return results

Required Scope: routing:profile:write
Rate Limit Handling: The 429 Too Many Requests response triggers exponential backoff with jitter. The retry loop caps at three attempts to prevent indefinite blocking. The PUT request must include the full routing profile object, not a partial update.

Step 4: What-If Impact Simulation

Before applying changes, the script simulates the impact on queue capacity and service level. The simulation uses a deterministic Erlang-C approximation based on historical Average Handle Time (AHT) and offered volume.

import math

def erlang_c_simulation(offered_calls: float, aht_seconds: float, agents: int) -> dict:
    if agents == 0 or aht_seconds == 0:
        return {"occupancy": 1.0, "service_level": 0.0, "probability_wait": 1.0}
        
    rho = offered_calls * aht_seconds / (agents * 3600)
    if rho >= 1.0:
        return {"occupancy": 1.0, "service_level": 0.0, "probability_wait": 1.0}
        
    # Erlang C probability of waiting
    p_wait = 1.0
    for n in range(int(agents)):
        p_wait *= (offered_calls * aht_seconds / 3600) / (n + 1)
    p_wait = p_wait / (p_wait + (1 - rho))
    
    occupancy = rho
    service_level = 1 - (p_wait * math.exp(-agents * (1 - rho)))
    
    return {
        "occupancy": round(occupancy, 3),
        "service_level": round(service_level, 3),
        "probability_wait": round(p_wait, 3)
    }

def run_whatif_analysis(recommendations: list, historical_aht: float = 240.0) -> dict:
    queue_impact = {}
    
    for rec in recommendations:
        qid = "queue_" + rec["skillId"][:8]  # Simplified queue mapping for demonstration
        if qid not in queue_impact:
            queue_impact[qid] = {"current_agents": 0, "new_agents": 0, "offered": rec["offered"]}
            
        if rec["action"] == "REMOVE":
            queue_impact[qid]["current_agents"] -= 1
        else:
            queue_impact[qid]["new_agents"] += 1
            
    simulation_results = {}
    for qid, data in queue_impact.items():
        current_sl = erlang_c_simulation(data["offered"], historical_aht, data["current_agents"])
        projected_sl = erlang_c_simulation(data["offered"], historical_aht, data["current_agents"] + data["new_agents"])
        
        simulation_results[qid] = {
            "current_service_level": current_sl["service_level"],
            "projected_service_level": projected_sl["service_level"],
            "agent_delta": data["new_agents"],
            "occupancy_impact": projected_sl["occupancy"] - current_sl["occupancy"]
        }
                
    return simulation_results

Required Scope: None (local computation)
Model Explanation: The Erlang-C formula calculates the probability that a caller will wait in queue. The simulation compares current service levels against projected levels after skill adjustments. A negative occupancy impact indicates reduced agent strain.

Step 5: Schedule Off-Peak Updates & Generate WFM Reports

The final component wraps the workflow in a scheduler that executes only during defined off-peak windows. It generates a CSV report for Workforce Management review.

import schedule
import csv
from datetime import datetime

OFF_PEAK_START = 1
OFF_PEAK_END = 5

def generate_wfm_report(results: dict, simulation: dict) -> str:
    filename = f"wfm_impact_report_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.csv"
    with open(filename, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Metric", "Value"])
        writer.writerow(["Total Profiles Updated", len(results["success"])])
        writer.writerow(["Failed Updates", len(results["failed"])])
        
        for qid, data in simulation.items():
            writer.writerow([f"Queue {qid} SL Change", data["projected_service_level"] - data["current_service_level"]])
            writer.writerow([f"Queue {qid} Occupancy Delta", data["occupancy_impact"]])
            
    return filename

def run_optimization_pipeline():
    analytics = fetch_analytics_summary(30)
    user_ids = ["user_1", "user_2", "user_3"]  # Replace with actual user IDs
    recommendations = generate_recommendations(analytics, user_ids)
    simulation = run_whatif_analysis(recommendations)
    results = apply_recommendations(recommendations)
    report_path = generate_wfm_report(results, simulation)
    print(f"Pipeline complete. Report saved to {report_path}")

def check_off_peak() -> bool:
    hour = datetime.utcnow().hour
    return OFF_PEAK_START <= hour < OFF_PEAK_END

# Scheduler setup
schedule.every().day.at("02:00").do(run_optimization_pipeline)
schedule.every().day.at("03:00").do(run_optimization_pipeline)

while True:
    schedule.run_pending()
    time.sleep(60)

Required Scope: None (local execution)
Scheduling Logic: The schedule library triggers the pipeline at 02:00 and 03:00 UTC. The check_off_peak function can be integrated into a cron wrapper or systemd timer for production environments. The CSV report captures update success rates and projected service level deltas.

Complete Working Example

import requests
import time
import random
import math
import csv
import schedule
from datetime import datetime, timedelta
from typing import Optional

GENESYS_BASE_URL = "https://api.mypurecloud.com"
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
OFF_PEAK_START = 1
OFF_PEAK_END = 5

_token_cache: dict = {"access_token": None, "expires_at": 0}

def get_access_token() -> str:
    current_time = time.time()
    if _token_cache["access_token"] and current_time < _token_cache["expires_at"] - 60:
        return _token_cache["access_token"]
    payload = {"grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET}
    response = requests.post(f"{GENESYS_BASE_URL}/oauth/token", data=payload)
    response.raise_for_status()
    data = response.json()
    _token_cache["access_token"] = data["access_token"]
    _token_cache["expires_at"] = current_time + data["expires_in"]
    return _token_cache["access_token"]

def fetch_analytics_summary(days: int = 30) -> dict:
    token = get_access_token()
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Accept": "application/json"}
    end_date = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z")
    start_date = (datetime.utcnow() - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    body = {"dateFrom": start_date, "dateTo": end_date, "groupBy": ["queueId", "skillId"], "interval": "P1D", "metrics": ["conversationHandledCount", "conversationOfferedCount"], "size": 100}
    aggregated_data = {}
    next_page = None
    while True:
        query_url = f"{GENESYS_BASE_URL}/api/v2/analytics/conversations/summary/query"
        if next_page:
            query_url = next_page
        response = requests.post(query_url, headers=headers, json=body)
        if response.status_code == 429:
            time.sleep(int(response.headers.get("Retry-After", 5)))
            continue
        response.raise_for_status()
        result = response.json()
        for bucket in result.get("buckets", []):
            key = f"{bucket['queueId']}_{bucket['skillId']}"
            aggregated_data[key] = {"handled": aggregated_data.get(key, {}).get("handled", 0) + bucket["metrics"]["conversationHandledCount"], "offered": aggregated_data.get(key, {}).get("offered", 0) + bucket["metrics"]["conversationOfferedCount"]}
        next_page = result.get("nextPageToken")
        if not next_page:
            break
    return aggregated_data

def fetch_user_routing_profile(user_id: str) -> Optional[dict]:
    token = get_access_token()
    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
    response = requests.get(f"{GENESYS_BASE_URL}/api/v2/routing/users/{user_id}/routingprofile", headers=headers)
    if response.status_code == 404:
        return None
    response.raise_for_status()
    return response.json()

def generate_recommendations(analytics_data: dict, user_ids: list, threshold: float = 0.3) -> list:
    recommendations = []
    for uid in user_ids:
        profile = fetch_user_routing_profile(uid)
        if not profile:
            continue
        for skill_id, skill_obj in profile.get("skills", {}).items():
            key = f"{profile.get('queueId', 'unknown')}_{skill_id}"
            metrics = analytics_data.get(key, {"handled": 0, "offered": 0})
            offered = max(metrics["offered"], 1)
            utilization = metrics["handled"] / offered
            if utilization < threshold and metrics["handled"] > 10:
                recommendations.append({"userId": uid, "skillId": skill_id, "currentUtilization": round(utilization, 3), "action": "REMOVE", "handled": metrics["handled"], "offered": metrics["offered"]})
            elif utilization > 0.85 and metrics["offered"] > 100:
                recommendations.append({"userId": uid, "skillId": skill_id, "currentUtilization": round(utilization, 3), "action": "ADD", "handled": metrics["handled"], "offered": metrics["offered"]})
    return recommendations

def update_routing_profile(user_id: str, profile_payload: dict, max_retries: int = 3) -> bool:
    token = get_access_token()
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Accept": "application/json"}
    url = f"{GENESYS_BASE_URL}/api/v2/routing/users/{user_id}/routingprofile"
    for attempt in range(max_retries):
        response = requests.put(url, headers=headers, json=profile_payload)
        if response.status_code == 200:
            return True
        elif response.status_code == 429:
            time.sleep(min(2 ** attempt + random.uniform(0, 1), 60))
            continue
        elif response.status_code in [401, 403]:
            raise PermissionError(f"Auth failed for {user_id}: {response.status_code}")
        else:
            response.raise_for_status()
    return False

def apply_recommendations(recommendations: list) -> dict:
    results = {"success": [], "failed": []}
    for rec in recommendations:
        uid = rec["userId"]
        profile = fetch_user_routing_profile(uid)
        if not profile:
            results["failed"].append({"userId": uid, "reason": "Profile not found"})
            continue
        current_skills = profile.get("skills", {})
        skill_id = rec["skillId"]
        if rec["action"] == "REMOVE" and skill_id in current_skills:
            del current_skills[skill_id]
        elif rec["action"] == "ADD" and skill_id not in current_skills:
            current_skills[skill_id] = {"score": 0.5, "order": len(current_skills)}
        payload = {"name": profile.get("name"), "skills": current_skills, "outbound": profile.get("outbound", {})}
        if update_routing_profile(uid, payload):
            results["success"].append(rec)
        else:
            results["failed"].append(rec)
    return results

def erlang_c_simulation(offered_calls: float, aht_seconds: float, agents: int) -> dict:
    if agents == 0 or aht_seconds == 0:
        return {"occupancy": 1.0, "service_level": 0.0, "probability_wait": 1.0}
    rho = offered_calls * aht_seconds / (agents * 3600)
    if rho >= 1.0:
        return {"occupancy": 1.0, "service_level": 0.0, "probability_wait": 1.0}
    p_wait = 1.0
    for n in range(int(agents)):
        p_wait *= (offered_calls * aht_seconds / 3600) / (n + 1)
    p_wait = p_wait / (p_wait + (1 - rho))
    service_level = 1 - (p_wait * math.exp(-agents * (1 - rho)))
    return {"occupancy": round(rho, 3), "service_level": round(service_level, 3), "probability_wait": round(p_wait, 3)}

def run_whatif_analysis(recommendations: list, historical_aht: float = 240.0) -> dict:
    queue_impact = {}
    for rec in recommendations:
        qid = "queue_" + rec["skillId"][:8]
        if qid not in queue_impact:
            queue_impact[qid] = {"current_agents": 0, "new_agents": 0, "offered": rec["offered"]}
        if rec["action"] == "REMOVE":
            queue_impact[qid]["current_agents"] -= 1
        else:
            queue_impact[qid]["new_agents"] += 1
    simulation_results = {}
    for qid, data in queue_impact.items():
        current_sl = erlang_c_simulation(data["offered"], historical_aht, data["current_agents"])
        projected_sl = erlang_c_simulation(data["offered"], historical_aht, data["current_agents"] + data["new_agents"])
        simulation_results[qid] = {"current_service_level": current_sl["service_level"], "projected_service_level": projected_sl["service_level"], "agent_delta": data["new_agents"], "occupancy_impact": projected_sl["occupancy"] - current_sl["occupancy"]}
    return simulation_results

def generate_wfm_report(results: dict, simulation: dict) -> str:
    filename = f"wfm_impact_report_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.csv"
    with open(filename, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Metric", "Value"])
        writer.writerow(["Total Profiles Updated", len(results["success"])])
        writer.writerow(["Failed Updates", len(results["failed"])])
        for qid, data in simulation.items():
            writer.writerow([f"Queue {qid} SL Change", data["projected_service_level"] - data["current_service_level"]])
            writer.writerow([f"Queue {qid} Occupancy Delta", data["occupancy_impact"]])
    return filename

def run_optimization_pipeline():
    analytics = fetch_analytics_summary(30)
    user_ids = ["user_1", "user_2", "user_3"]
    recommendations = generate_recommendations(analytics, user_ids)
    simulation = run_whatif_analysis(recommendations)
    results = apply_recommendations(recommendations)
    report_path = generate_wfm_report(results, simulation)
    print(f"Pipeline complete. Report saved to {report_path}")

schedule.every().day.at("02:00").do(run_optimization_pipeline)
schedule.every().day.at("03:00").do(run_optimization_pipeline)
while True:
    schedule.run_pending()
    time.sleep(60)

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The OAuth token has expired or the client credentials are invalid.
  • Fix: Ensure the get_access_token() function refreshes the token before expiration. Verify CLIENT_ID and CLIENT_SECRET match a registered OAuth client in the Genesys Cloud admin console.
  • Code Fix: The token cache subtracts sixty seconds from expires_in to prevent edge-case expiration during API calls.

Error: 403 Forbidden

  • Cause: The OAuth client lacks required scopes for the endpoint being called.
  • Fix: Add analytics:query, routing:profile:read, routing:profile:write, and routing:skill:read to the client credentials scope list in the Genesys Cloud UI.
  • Code Fix: The script raises PermissionError immediately on 403 to prevent silent failures during batch updates.

Error: 429 Too Many Requests

  • Cause: The client exceeded the Genesys Cloud rate limit (typically fifty requests per minute per client).
  • Fix: Implement exponential backoff with jitter. The Retry-After header specifies the exact wait time.
  • Code Fix: The update_routing_profile function reads Retry-After and sleeps for the specified duration before retrying. The analytics loop also respects 429 responses.

Error: 500 Internal Server Error

  • Cause: Transient backend failure or malformed request body.
  • Fix: Validate JSON payloads against the Genesys Cloud schema. Retry once after a two-second delay.
  • Code Fix: response.raise_for_status() triggers an exception on 5xx errors. Wrap the pipeline in a try-except block with a single retry attempt for production stability.

Official References