Optimizing Genesys Cloud Queue Wait Times via Historical Wrap-Up Code Analysis and Dynamic Skill Weight Updates
What You Will Build
- A Python script that queries historical conversation analytics grouped by wrap-up code to identify routing bottlenecks.
- A weight calculation engine that translates historical volume and wait time metrics into normalized skill weights.
- A routing configuration updater that applies the calculated weights to a target queue routing rule via the Genesys Cloud REST API.
Prerequisites
- OAuth2 client credentials with scopes:
analytics:conversation:read,routing:rule:read,routing:rule:write,routing:skill:read - Genesys Cloud REST API v2
- Python 3.9+
- External dependencies:
requests>=2.31.0,python-dateutil>=2.8.2
Authentication Setup
The script uses the OAuth2 client credentials flow. Token caching prevents unnecessary authentication requests, and a refresh threshold ensures the token remains valid during long-running analytics queries.
import requests
import time
import base64
from typing import Dict, Optional
class GenesysAuth:
def __init__(self, org_host: str, client_id: str, client_secret: str):
self.org_host = org_host
self.client_id = client_id
self.client_secret = client_secret
self.access_token: Optional[str] = None
self.token_expiry: float = 0.0
self.auth_url = f"https://{org_host}/oauth/token"
def get_token(self) -> str:
if self.access_token and time.time() < self.token_expiry - 300:
return self.access_token
credentials = f"{self.client_id}:{self.client_secret}"
b64_credentials = base64.b64encode(credentials.encode()).decode()
headers = {
"Authorization": f"Basic {b64_credentials}",
"Content-Type": "application/x-www-form-urlencoded"
}
payload = {"grant_type": "client_credentials"}
response = requests.post(self.auth_url, data=payload, headers=headers)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.token_expiry = time.time() + token_data["expires_in"]
return self.access_token
Implementation
Step 1: Query Historical Wrap-Up Analytics
Retrieve conversation metrics grouped by wrap-up code for a specific queue. The analytics summary endpoint supports pagination, which the script handles automatically. Required scope: analytics:conversation:read.
import datetime
from dateutil.relativedelta import relativedelta
import json
def fetch_wrapup_analytics(auth: GenesysAuth, queue_id: str, days_back: int = 30) -> Dict:
base_url = f"https://{auth.org_host}/api/v2/analytics/conversations/summary/query"
headers = {"Authorization": f"Bearer {auth.get_token()}", "Content-Type": "application/json"}
end_date = datetime.datetime.utcnow()
start_date = end_date - relativedelta(days=days_back)
query_body = {
"view": "conversations",
"groupBy": ["wrapupCodeId"],
"metrics": ["conversationCount", "avgWaitTime"],
"filter": {
"type": "and",
"clauses": [
{"type": "equals", "dimension": "queueId", "value": queue_id},
{
"type": "dateRange",
"dimension": "conversationDate",
"from": start_date.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
"to": end_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
}
]
},
"pageSize": 100
}
all_entities = []
next_page_token = None
max_retries = 5
while True:
for attempt in range(max_retries):
try:
if next_page_token:
query_body["nextPageToken"] = next_page_token
response = requests.post(base_url, json=query_body, headers=headers)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
time.sleep(retry_after)
continue
response.raise_for_status()
break
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt)
data = response.json()
all_entities.extend(data.get("entities", []))
next_page_token = data.get("nextPageToken")
if not next_page_token:
break
return {"entities": all_entities}
Step 2: Calculate Optimal Skill Weights
Map wrap-up codes to routing skills and compute weights using historical volume and wait time. Higher historical wait times combined with high conversation volumes receive proportionally higher weights to balance load. Required scope: routing:skill:read.
def fetch_skills(auth: GenesysAuth) -> Dict[str, str]:
base_url = f"https://{auth.org_host}/api/v2/routing/skills"
headers = {"Authorization": f"Bearer {auth.get_token()}", "Content-Type": "application/json"}
response = requests.get(base_url, headers=headers)
response.raise_for_status()
skills = response.json().get("entities", [])
return {skill["name"].lower().replace(" ", ""): skill["id"] for skill in skills}
def calculate_skill_weights(
analytics_data: Dict,
skill_mapping: Dict[str, str],
wrapup_to_skill_map: Dict[str, str]
) -> Dict[str, int]:
# wrapup_to_skill_map: {"wrapup_code_id": "skill_name"}
volume_wait_pairs = []
for entity in analytics_data.get("entities", []):
wrapup_id = entity.get("wrapupCodeId")
if not wrapup_id or wrapup_id not in wrapup_to_skill_map:
continue
skill_name = wrapup_to_skill_map[wrapup_id]
skill_id = skill_mapping.get(skill_name)
if not skill_id:
continue
volume = entity.get("conversationCount", 0)
avg_wait = entity.get("avgWaitTime", 0)
# Weight formula: prioritize skills with high volume AND high historical wait
composite_score = (volume * 0.6) + (avg_wait * 0.4)
volume_wait_pairs.append({"skill_id": skill_id, "score": composite_score})
if not volume_wait_pairs:
return {}
max_score = max(item["score"] for item in volume_wait_pairs)
min_score = min(item["score"] for item in volume_wait_pairs)
score_range = max_score - min_score if max_score != min_score else 1
calculated_weights = {}
for item in volume_wait_pairs:
normalized = (item["score"] - min_score) / score_range
# Genesys skill weights range from 1 to 100
weight = max(1, min(100, int(normalized * 99) + 1))
calculated_weights[item["skill_id"]] = weight
return calculated_weights
Step 3: Update Routing Configuration
Fetch the target routing rule, merge the new skill weights into the existing rule payload, and submit the update. Required scopes: routing:rule:read, routing:rule:write.
def update_routing_rule(
auth: GenesysAuth,
queue_id: str,
new_weights: Dict[str, int]
) -> Dict:
base_url = f"https://{auth.org_host}/api/v2/routing/rules"
headers = {"Authorization": f"Bearer {auth.get_token()}", "Content-Type": "application/json"}
# Locate the routing rule for the target queue
response = requests.get(base_url, headers=headers)
response.raise_for_status()
rules = response.json().get("entities", [])
target_rule = None
for rule in rules:
if rule.get("queueId") == queue_id:
target_rule = rule
break
if not target_rule:
raise ValueError(f"No routing rule found for queue {queue_id}")
# Construct updated skills array
updated_skills = []
existing_skills = target_rule.get("skills", [])
existing_skill_map = {s["id"]: s for s in existing_skills}
for skill_id, weight in new_weights.items():
if skill_id in existing_skill_map:
existing_skill_map[skill_id]["weight"] = weight
else:
existing_skill_map[skill_id] = {"id": skill_id, "weight": weight}
updated_skills = list(existing_skill_map.values())
target_rule["skills"] = updated_skills
# Submit update with retry logic for 429
rule_id = target_rule["id"]
update_url = f"{base_url}/{rule_id}"
max_retries = 5
for attempt in range(max_retries):
try:
response = requests.put(update_url, json=target_rule, headers=headers)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
time.sleep(retry_after)
continue
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt)
Complete Working Example
Copy the following script, populate the configuration dictionary, and execute it. The script handles authentication, analytics retrieval, weight calculation, and routing rule updates in a single execution flow.
import requests
import time
import base64
import datetime
from typing import Dict, Optional
from dateutil.relativedelta import relativedelta
class GenesysAuth:
def __init__(self, org_host: str, client_id: str, client_secret: str):
self.org_host = org_host
self.client_id = client_id
self.client_secret = client_secret
self.access_token: Optional[str] = None
self.token_expiry: float = 0.0
self.auth_url = f"https://{org_host}/oauth/token"
def get_token(self) -> str:
if self.access_token and time.time() < self.token_expiry - 300:
return self.access_token
credentials = f"{self.client_id}:{self.client_secret}"
b64_credentials = base64.b64encode(credentials.encode()).decode()
headers = {
"Authorization": f"Basic {b64_credentials}",
"Content-Type": "application/x-www-form-urlencoded"
}
payload = {"grant_type": "client_credentials"}
response = requests.post(self.auth_url, data=payload, headers=headers)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.token_expiry = time.time() + token_data["expires_in"]
return self.access_token
def make_api_call(auth: GenesysAuth, method: str, url: str, headers: Dict, payload=None) -> requests.Response:
max_retries = 5
for attempt in range(max_retries):
try:
response = requests.request(method, url, headers=headers, json=payload)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
time.sleep(retry_after)
continue
return response
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt)
def fetch_wrapup_analytics(auth: GenesysAuth, queue_id: str, days_back: int = 30) -> Dict:
base_url = f"https://{auth.org_host}/api/v2/analytics/conversations/summary/query"
headers = {"Authorization": f"Bearer {auth.get_token()}", "Content-Type": "application/json"}
end_date = datetime.datetime.utcnow()
start_date = end_date - relativedelta(days=days_back)
query_body = {
"view": "conversations",
"groupBy": ["wrapupCodeId"],
"metrics": ["conversationCount", "avgWaitTime"],
"filter": {
"type": "and",
"clauses": [
{"type": "equals", "dimension": "queueId", "value": queue_id},
{
"type": "dateRange",
"dimension": "conversationDate",
"from": start_date.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
"to": end_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
}
]
},
"pageSize": 100
}
all_entities = []
next_page_token = None
while True:
if next_page_token:
query_body["nextPageToken"] = next_page_token
response = make_api_call(auth, "POST", base_url, headers, query_body)
response.raise_for_status()
data = response.json()
all_entities.extend(data.get("entities", []))
next_page_token = data.get("nextPageToken")
if not next_page_token:
break
return {"entities": all_entities}
def fetch_skills(auth: GenesysAuth) -> Dict[str, str]:
base_url = f"https://{auth.org_host}/api/v2/routing/skills"
headers = {"Authorization": f"Bearer {auth.get_token()}", "Content-Type": "application/json"}
response = make_api_call(auth, "GET", base_url, headers)
response.raise_for_status()
skills = response.json().get("entities", [])
return {skill["name"].lower().replace(" ", ""): skill["id"] for skill in skills}
def calculate_skill_weights(
analytics_data: Dict,
skill_mapping: Dict[str, str],
wrapup_to_skill_map: Dict[str, str]
) -> Dict[str, int]:
volume_wait_pairs = []
for entity in analytics_data.get("entities", []):
wrapup_id = entity.get("wrapupCodeId")
if not wrapup_id or wrapup_id not in wrapup_to_skill_map:
continue
skill_name = wrapup_to_skill_map[wrapup_id]
skill_id = skill_mapping.get(skill_name)
if not skill_id:
continue
volume = entity.get("conversationCount", 0)
avg_wait = entity.get("avgWaitTime", 0)
composite_score = (volume * 0.6) + (avg_wait * 0.4)
volume_wait_pairs.append({"skill_id": skill_id, "score": composite_score})
if not volume_wait_pairs:
return {}
max_score = max(item["score"] for item in volume_wait_pairs)
min_score = min(item["score"] for item in volume_wait_pairs)
score_range = max_score - min_score if max_score != min_score else 1
calculated_weights = {}
for item in volume_wait_pairs:
normalized = (item["score"] - min_score) / score_range
weight = max(1, min(100, int(normalized * 99) + 1))
calculated_weights[item["skill_id"]] = weight
return calculated_weights
def update_routing_rule(auth: GenesysAuth, queue_id: str, new_weights: Dict[str, int]) -> Dict:
base_url = f"https://{auth.org_host}/api/v2/routing/rules"
headers = {"Authorization": f"Bearer {auth.get_token()}", "Content-Type": "application/json"}
response = make_api_call(auth, "GET", base_url, headers)
response.raise_for_status()
rules = response.json().get("entities", [])
target_rule = None
for rule in rules:
if rule.get("queueId") == queue_id:
target_rule = rule
break
if not target_rule:
raise ValueError(f"No routing rule found for queue {queue_id}")
updated_skills = []
existing_skills = target_rule.get("skills", [])
existing_skill_map = {s["id"]: s for s in existing_skills}
for skill_id, weight in new_weights.items():
if skill_id in existing_skill_map:
existing_skill_map[skill_id]["weight"] = weight
else:
existing_skill_map[skill_id] = {"id": skill_id, "weight": weight}
updated_skills = list(existing_skill_map.values())
target_rule["skills"] = updated_skills
rule_id = target_rule["id"]
update_url = f"{base_url}/{rule_id}"
response = make_api_call(auth, "PUT", update_url, headers, target_rule)
response.raise_for_status()
return response.json()
def main():
config = {
"org_host": "your-org.mypurecloud.com",
"client_id": "your-client-id",
"client_secret": "your-client-secret",
"queue_id": "your-target-queue-id",
"analysis_days": 30,
# Map wrap-up code IDs to routing skill names
"wrapup_to_skill_map": {
"wrapup-id-1": "billing",
"wrapup-id-2": "technicalsupport",
"wrapup-id-3": "sales"
}
}
auth = GenesysAuth(config["org_host"], config["client_id"], config["client_secret"])
print("Fetching historical wrap-up analytics...")
analytics = fetch_wrapup_analytics(auth, config["queue_id"], config["analysis_days"])
print(f"Retrieved {len(analytics['entities'])} wrap-up code groups.")
print("Fetching routing skills...")
skills = fetch_skills(auth)
print("Calculating optimal skill weights...")
weights = calculate_skill_weights(analytics, skills, config["wrapup_to_skill_map"])
print(f"Calculated weights: {weights}")
if not weights:
print("No valid weight calculations found. Verify wrap-up code mapping.")
return
print("Updating routing rule...")
result = update_routing_rule(auth, config["queue_id"], weights)
print("Routing rule updated successfully.")
print(f"Rule ID: {result['id']}, Updated Skills: {result['skills']}")
if __name__ == "__main__":
main()
Common Errors & Debugging
Error: 401 Unauthorized
- What causes it: The OAuth token has expired, the client credentials are incorrect, or the token was not attached to the request header.
- How to fix it: Verify the
client_idandclient_secretmatch the Genesys Cloud integration. Ensure theAuthorization: Bearer <token>header is present on every API call. The token caching logic inGenesysAuthautomatically refreshes tokens that are older than 15 minutes. - Code showing the fix: The
get_token()method checkstime.time() < self.token_expiry - 300and re-authenticates before the token expires.
Error: 403 Forbidden
- What causes it: The OAuth client lacks the required scopes for the targeted endpoints.
- How to fix it: Navigate to the Genesys Cloud admin console, locate the OAuth client integration, and ensure the following scopes are assigned:
analytics:conversation:read,routing:rule:read,routing:rule:write,routing:skill:read. Re-generate the client secret if scopes were recently added.
Error: 429 Too Many Requests
- What causes it: The script exceeded Genesys Cloud rate limits, typically triggered during pagination of large analytics datasets or rapid rule updates.
- How to fix it: Implement exponential backoff. The
make_api_callfunction catches 429 responses, extracts theRetry-Afterheader, and sleeps before retrying. If the header is missing, it falls back to2 ** attemptseconds. - Code showing the fix: The retry loop in
make_api_callhandles 429 responses automatically without breaking execution.
Error: 400 Bad Request
- What causes it: The routing rule payload contains invalid skill weights, missing required fields, or references non-existent skill IDs.
- How to fix it: Validate that all skill IDs in
new_weightsexist in the tenant. Ensure weights are integers between 1 and 100. Thecalculate_skill_weightsfunction clamps values usingmax(1, min(100, ...))to prevent out-of-range errors.
Error: 5xx Server Error
- What causes it: Genesys Cloud backend instability or transient database locks during rule updates.
- How to fix it: Retry the request with a longer delay. The retry logic covers 5xx errors by default in
make_api_call. If the error persists beyond five attempts, log the response body and wait for platform stability.