Balancing Genesys Cloud Multi-Skill Queue Assignments with Python and Linear Programming

What You Will Build

A Python worker that queries real-time agent states, solves a linear programming problem to calculate optimal queue capacities, and applies those capacities via the Routing API to minimize estimated average wait times. This tutorial uses the Genesys Cloud Routing API and scipy.optimize.linprog. The code is written in Python 3.9+.

Prerequisites

  • OAuth 2.0 Client Credentials flow with scopes: routing:agent:view, routing:queue:view, routing:queue:write
  • Genesys Cloud Python SDK v2.0.0+ (referenced for architecture context)
  • Python 3.9+, requests, scipy, numpy
  • Environment variables GENESYS_REGION (the region domain, for example mypurecloud.ie), GENESYS_CLIENT_ID, and GENESYS_CLIENT_SECRET
  • Target queue configuration mapping queue IDs to skill IDs
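
Before any API call, it can help to fail fast when one of the environment variables listed above is absent. The following is a minimal sketch; the helper name load_settings and the REQUIRED_VARS tuple are hypothetical and not part of any Genesys library.

```python
import os
from typing import Dict, Mapping

# Names taken from the prerequisites list above.
REQUIRED_VARS = ("GENESYS_REGION", "GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET")

def load_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the required settings, or raise if any is missing or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_VARS}
```

Passing the environment in as a mapping keeps the check easy to exercise with a plain dictionary instead of the real process environment.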

Authentication Setup

Genesys Cloud uses the OAuth 2.0 client credentials flow for machine-to-machine authentication. The SDK package PureCloudPlatformClientV2 handles the token lifecycle internally, but implementing the flow directly with requests provides full visibility into expiration windows and refresh boundaries. The following class caches the token and refreshes it 300 seconds before expiration.

import os
import time
import requests
from typing import Optional, Dict, Any

class GenesysAuth:
    def __init__(self, region: str, client_id: str, client_secret: str):
        # region is the Genesys Cloud domain, for example "mypurecloud.ie"
        self.region = region
        self.client_id = client_id
        self.client_secret = client_secret
        self.token: Optional[str] = None
        self.expires_at: float = 0.0
        # Tokens are issued by the login host; API calls go to the api host
        self.login_url = f"https://login.{region}"
        self.base_url = f"https://api.{region}"

    def get_token(self) -> str:
        # Reuse the cached token until 300 seconds before it expires
        if self.token and time.time() < self.expires_at:
            return self.token

        url = f"{self.login_url}/oauth/token"
        payload = {"grant_type": "client_credentials"}

        # The client ID and secret are sent as HTTP Basic credentials;
        # requests form-encodes the payload and sets the Content-Type header
        response = requests.post(
            url,
            data=payload,
            auth=(self.client_id, self.client_secret),
            timeout=30,
        )
        response.raise_for_status()

        data: Dict[str, Any] = response.json()
        self.token = data["access_token"]
        self.expires_at = time.time() + data["expires_in"] - 300

        return self.token

    def make_request(self, method: str, path: str, **kwargs) -> requests.Response:
        kwargs.setdefault("headers", {})
        kwargs.setdefault("timeout", 30)
        kwargs["headers"]["Authorization"] = f"Bearer {self.get_token()}"
        kwargs["headers"]["Content-Type"] = "application/json"
        url = f"{self.base_url}{path}"
        return requests.request(method, url, **kwargs)
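
One edge case of the fixed 300-second margin is a token whose lifetime is shorter than the margin: the computed expiry would already be in the past, so every call would request a new token. The sketch below shows one way to cap the margin; the function name refresh_deadline and the half-lifetime cap are assumptions for illustration, not Genesys behavior.

```python
def refresh_deadline(issued_at: float, expires_in: float, margin: float = 300.0) -> float:
    """Return the timestamp after which a cached token should be refreshed.

    The margin is capped at half the token lifetime so that short-lived
    tokens are still cached for a while instead of being refetched on
    every call.
    """
    effective_margin = min(margin, expires_in / 2)
    return issued_at + expires_in - effective_margin
```

For a token that lives 86400 seconds the deadline falls 300 seconds before expiry, while a 200-second token would be refreshed after 100 seconds.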

Implementation

Step 1: Fetch Real-Time Agent Availability and Skill Mappings

The Routing API exposes user definitions and real-time states on separate endpoints. You must paginate through /api/v2/routing/users to retrieve skill assignments, then fetch bulk states from /api/v2/routing/users/state. The code below implements continuation token pagination and exponential backoff for 429 rate limits.

import time
from typing import Any, Dict, List, Optional

def fetch_with_retry(
    auth: GenesysAuth,
    method: str,
    path: str,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> requests.Response:
    # Extra keyword arguments (for example json=...) pass through to requests
    max_retries = 3
    for attempt in range(max_retries):
        response = auth.make_request(method, path, params=params, **kwargs)
        if response.status_code == 429:
            # Prefer the server's Retry-After value; otherwise back off 1s, 2s, 4s
            retry_after = float(response.headers.get("Retry-After", 2 ** attempt))
            print(f"Rate limited. Retrying in {retry_after} seconds.")
            time.sleep(retry_after)
            continue
        # Any other error status is raised as requests.exceptions.HTTPError
        response.raise_for_status()
        return response
    raise RuntimeError("Max retries exceeded for 429 response")

def get_all_users(auth: GenesysAuth) -> List[Dict[str, Any]]:
    users: List[Dict[str, Any]] = []
    continuation_token: Optional[str] = None
    
    while True:
        params = {"page_size": 250}
        if continuation_token:
            params["continuation_token"] = continuation_token
            
        resp = fetch_with_retry(auth, "GET", "/api/v2/routing/users", params=params)
        data = resp.json()
        users.extend(data.get("entities", []))
        
        continuation_token = data.get("continuation_token")
        if not continuation_token:
            break
            
    return users

def get_available_agents_per_skill(auth: GenesysAuth, users: List[Dict[str, Any]], skill_ids: set) -> Dict[str, int]:
    resp = fetch_with_retry(auth, "GET", "/api/v2/routing/users/state")
    states = resp.json().get("entities", [])
    
    user_map = {u["id"]: u for u in users}
    skill_availability: Dict[str, int] = {sid: 0 for sid in skill_ids}
    
    for state in states:
        if state.get("stateCode") != "Available":
            continue
            
        user_id = state.get("userId")
        user = user_map.get(user_id)
        if not user:
            continue
            
        for skill in user.get("skills", []):
            skill_id = skill.get("id")
            if skill_id in skill_availability:
                skill_availability[skill_id] += 1
                
    return skill_availability
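
The counting logic can be exercised offline, without calling the API. The sketch below repeats the same loop as a pure function and feeds it hand-written sample data. The payload shapes mirror the field names used in the code above (id, skills, userId, stateCode); they are assumptions for illustration rather than verified API responses, and the function name count_available_per_skill is hypothetical.

```python
from typing import Any, Dict, List, Set

def count_available_per_skill(
    users: List[Dict[str, Any]],
    states: List[Dict[str, Any]],
    skill_ids: Set[str],
) -> Dict[str, int]:
    """Count agents in the Available state for each tracked skill."""
    user_map = {u["id"]: u for u in users}
    counts = {sid: 0 for sid in skill_ids}
    for state in states:
        if state.get("stateCode") != "Available":
            continue
        user = user_map.get(state.get("userId"))
        if not user:
            continue
        for skill in user.get("skills", []):
            if skill.get("id") in counts:
                counts[skill["id"]] += 1
    return counts

# Hand-written sample data: u1 holds both skills, u3 is not available.
sample_users = [
    {"id": "u1", "skills": [{"id": "billing"}, {"id": "support"}]},
    {"id": "u2", "skills": [{"id": "support"}]},
    {"id": "u3", "skills": [{"id": "billing"}]},
]
sample_states = [
    {"userId": "u1", "stateCode": "Available"},
    {"userId": "u2", "stateCode": "Available"},
    {"userId": "u3", "stateCode": "Busy"},
]

counts = count_available_per_skill(sample_users, sample_states, {"billing", "support"})
print(counts)
```

With this data, billing has one available agent and support has two. A multi-skilled agent such as u1 is counted once per skill, so the per-skill totals can add up to more than the number of available agents.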

Step 2: Calculate Optimal Capacities with Linear Programming

Queue capacity directly influences Erlang-C wait time estimates, but Erlang-C is nonlinear, and linear programming requires a linear objective function and linear constraints. The model below therefore approximates wait-time minimization by weighting each capacity variable inversely to the number of available agents for its skill. The solver minimizes the total weighted capacity while capping each queue at its available agents divided by min_coverage_ratio.

import numpy as np
from scipy.optimize import linprog
from typing import Any, Dict, List

def calculate_optimal_capacities(
    queue_config: List[Dict[str, Any]],
    skill_availability: Dict[str, int],
    min_coverage_ratio: float = 0.85
) -> Dict[str, int]:
    # Extract ordered queue IDs and skill mappings
    queue_ids = [q["queue_id"] for q in queue_config]
    skill_ids = [q["skill_id"] for q in queue_config]
    n = len(queue_ids)
    
    # Objective: minimize sum(weight_i * capacity_i)
    # weight_i inversely proportional to availability (proxy for wait time)
    c = np.array([1.0 / max(skill_availability.get(sid, 0), 1) for sid in skill_ids])
    
    # Bounds: [min_capacity, max_capacity] per queue
    bounds = [(q.get("min_capacity", 1), q.get("max_capacity", 200)) for q in queue_config]
    
    # Constraints: capacity_i <= available_i / min_coverage_ratio
    # Rewritten as: capacity_i <= limit_i
    # linprog expects A_ub @ x <= b_ub
    A_ub = np.eye(n)
    b_ub = np.array([
        max(skill_availability.get(sid, 0), 1) / min_coverage_ratio 
        for sid in skill_ids
    ])
    
    result = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=bounds, method='highs')
    
    if not result.success:
        raise RuntimeError(f"LP solver failed: {result.message}")
        
    optimal_capacities = {}
    for i, qid in enumerate(queue_ids):
        # Cast to a plain int so the value serializes cleanly to JSON
        optimal_capacities[qid] = max(1, int(round(result.x[i])))
        
    return optimal_capacities
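
To see what this formulation produces, it helps to solve a small instance by hand. The sketch below uses two hypothetical queues with 4 and 10 available agents, a coverage ratio of 0.8, and capacity bounds of 1 to 50; all of these numbers are illustrative assumptions.

```python
import numpy as np
from scipy.optimize import linprog

availability = [4, 10]       # available agents per queue (illustrative)
min_coverage_ratio = 0.8

# Weights are the inverse of availability: [0.25, 0.1]
c = np.array([1.0 / max(a, 1) for a in availability])

# One upper-limit constraint per queue: roughly [5.0, 12.5]
A_ub = np.eye(2)
b_ub = np.array([max(a, 1) / min_coverage_ratio for a in availability])

# Every capacity must stay between 1 and 50
bounds = [(1, 50), (1, 50)]

result = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=bounds, method="highs")
capacities = [int(round(x)) for x in result.x]
print(result.success, capacities, result.fun)
```

The solver returns a capacity of 1 for both queues, with an objective value of 0.35. Because every weight is positive and the constraints only cap each capacity from above, the minimum of this objective sits at each queue's lower bound. In practice this means min_capacity determines the result, so it is worth choosing that value deliberately for each queue.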

Step 3: Rebalance Queue Capacities via PUT

The Routing API accepts capacity updates via PUT /api/v2/routing/queues/{queueId}. Send only the fields you intend to modify: the request body below sets inbound_capacity and leaves the rest of the queue configuration untouched. The function returns a conflict marker on 409 (concurrent modification) and raises requests.exceptions.HTTPError for other failures, such as 400 (invalid capacity range) and 403 (scope mismatch).

def update_queue_capacity(auth: GenesysAuth, queue_id: str, capacity: int) -> Dict[str, Any]:
    path = f"/api/v2/routing/queues/{queue_id}"
    payload = {"inbound_capacity": capacity}
    
    # Call make_request directly: fetch_with_retry raises on every non-429
    # error status, which would make the 409 branch below unreachable
    resp = auth.make_request("PUT", path, json=payload)
    
    if resp.status_code == 409:
        print(f"Conflict updating queue {queue_id}. Resource modified concurrently.")
        # In production, implement optimistic locking with version headers
        return {"status": "conflict", "queue_id": queue_id}
        
    # Surface 400, 403, and any other error status to the caller
    resp.raise_for_status()
    return resp.json()

Complete Working Example

The following script integrates authentication, data fetching, linear programming, and capacity updates. Replace the placeholder configuration with your environment values.

import os
import sys
import time
import requests
import numpy as np
from scipy.optimize import linprog
from typing import List, Dict, Any, Optional

class GenesysAuth:
    def __init__(self, region: str, client_id: str, client_secret: str):
        # region is the Genesys Cloud domain, for example "mypurecloud.ie"
        self.region = region
        self.client_id = client_id
        self.client_secret = client_secret
        self.token: Optional[str] = None
        self.expires_at: float = 0.0
        # Tokens are issued by the login host; API calls go to the api host
        self.login_url = f"https://login.{region}"
        self.base_url = f"https://api.{region}"

    def get_token(self) -> str:
        if self.token and time.time() < self.expires_at:
            return self.token
        url = f"{self.login_url}/oauth/token"
        data = {"grant_type": "client_credentials"}
        # The client ID and secret are sent as HTTP Basic credentials
        response = requests.post(
            url, data=data, auth=(self.client_id, self.client_secret), timeout=30
        )
        response.raise_for_status()
        payload = response.json()
        self.token = payload["access_token"]
        self.expires_at = time.time() + payload["expires_in"] - 300
        return self.token

    def make_request(self, method: str, path: str, **kwargs) -> requests.Response:
        kwargs.setdefault("headers", {})
        kwargs.setdefault("timeout", 30)
        kwargs["headers"]["Authorization"] = f"Bearer {self.get_token()}"
        kwargs["headers"]["Content-Type"] = "application/json"
        url = f"{self.base_url}{path}"
        return requests.request(method, url, **kwargs)

def fetch_with_retry(
    auth: GenesysAuth,
    method: str,
    path: str,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> requests.Response:
    # Extra keyword arguments (for example json=...) pass through to requests
    max_retries = 3
    for attempt in range(max_retries):
        response = auth.make_request(method, path, params=params, **kwargs)
        if response.status_code == 429:
            retry_after = float(response.headers.get("Retry-After", 2 ** attempt))
            print(f"Rate limited on {path}. Retrying in {retry_after}s.")
            time.sleep(retry_after)
            continue
        response.raise_for_status()
        return response
    raise RuntimeError(f"Max retries exceeded for {path}")

def get_all_users(auth: GenesysAuth) -> List[Dict[str, Any]]:
    users: List[Dict[str, Any]] = []
    continuation_token: Optional[str] = None
    while True:
        params = {"page_size": 250}
        if continuation_token:
            params["continuation_token"] = continuation_token
        resp = fetch_with_retry(auth, "GET", "/api/v2/routing/users", params=params)
        data = resp.json()
        users.extend(data.get("entities", []))
        continuation_token = data.get("continuation_token")
        if not continuation_token:
            break
    return users

def get_available_agents_per_skill(auth: GenesysAuth, users: List[Dict[str, Any]], skill_ids: set) -> Dict[str, int]:
    resp = fetch_with_retry(auth, "GET", "/api/v2/routing/users/state")
    states = resp.json().get("entities", [])
    user_map = {u["id"]: u for u in users}
    skill_availability: Dict[str, int] = {sid: 0 for sid in skill_ids}
    for state in states:
        if state.get("stateCode") != "Available":
            continue
        user_id = state.get("userId")
        user = user_map.get(user_id)
        if not user:
            continue
        for skill in user.get("skills", []):
            skill_id = skill.get("id")
            if skill_id in skill_availability:
                skill_availability[skill_id] += 1
    return skill_availability

def calculate_optimal_capacities(
    queue_config: List[Dict[str, Any]],
    skill_availability: Dict[str, int],
    min_coverage_ratio: float = 0.85
) -> Dict[str, int]:
    queue_ids = [q["queue_id"] for q in queue_config]
    skill_ids = [q["skill_id"] for q in queue_config]
    n = len(queue_ids)
    c = np.array([1.0 / max(skill_availability.get(sid, 0), 1) for sid in skill_ids])
    bounds = [(q.get("min_capacity", 1), q.get("max_capacity", 200)) for q in queue_config]
    A_ub = np.eye(n)
    b_ub = np.array([
        max(skill_availability.get(sid, 0), 1) / min_coverage_ratio 
        for sid in skill_ids
    ])
    result = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=bounds, method='highs')
    if not result.success:
        raise RuntimeError(f"LP solver failed: {result.message}")
    # Cast to a plain int so the values serialize cleanly to JSON
    return {qid: max(1, int(round(result.x[i]))) for i, qid in enumerate(queue_ids)}

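def update_queue_capacity(auth: GenesysAuth, queue_id: str, capacity: int) -> Dict[str, Any]:
    path = f"/api/v2/routing/queues/{queue_id}"
    payload = {"inbound_capacity": capacity}
    # Call make_request directly: fetch_with_retry raises on every non-429
    # error status, which would make the 409 branch below unreachable
    resp = auth.make_request("PUT", path, json=payload)
    if resp.status_code == 409:
        print(f"Conflict updating queue {queue_id}.")
        return {"status": "conflict", "queue_id": queue_id}
    # Surface 400, 403, and any other error status to the caller
    resp.raise_for_status()
    return resp.json()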

def main():
    region = os.environ.get("GENESYS_REGION", "mypurecloud.ie")
    client_id = os.environ.get("GENESYS_CLIENT_ID")
    client_secret = os.environ.get("GENESYS_CLIENT_SECRET")
    
    if not all([region, client_id, client_secret]):
        print("Missing environment variables: GENESYS_REGION, GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET")
        sys.exit(1)
        
    auth = GenesysAuth(region, client_id, client_secret)
    
    # Configuration: map queues to skills and define capacity bounds
    queue_config = [
        {"queue_id": "a1b2c3d4-1111-2222-3333-444444444444", "skill_id": "s1-skill-id", "min_capacity": 1, "max_capacity": 50},
        {"queue_id": "e5f6g7h8-5555-6666-7777-888888888888", "skill_id": "s2-skill-id", "min_capacity": 1, "max_capacity": 50}
    ]
    
    target_skills = {q["skill_id"] for q in queue_config}
    
    print("Fetching users and skill mappings...")
    users = get_all_users(auth)
    
    print("Fetching real-time agent states...")
    availability = get_available_agents_per_skill(auth, users, target_skills)
    print(f"Available agents per skill: {availability}")
    
    print("Solving linear programming model...")
    optimal = calculate_optimal_capacities(queue_config, availability, min_coverage_ratio=0.80)
    print(f"Optimal capacities: {optimal}")
    
    print("Applying capacity updates...")
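    for qid, cap in optimal.items():
        try:
            result = update_queue_capacity(auth, qid, cap)
        except requests.exceptions.HTTPError as e:
            print(f"Failed to update queue {qid}: {e}")
            continue
        # Report a conflict separately instead of claiming success
        if result.get("status") == "conflict":
            print(f"Queue {qid} skipped after a conflict")
        else:
            print(f"Queue {qid} updated to capacity {cap}")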
            
    print("Rebalancing complete.")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: Expired or invalid OAuth token, incorrect client credentials, or missing Authorization header.
  • Fix: Verify GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET match the OAuth client registered in the Admin Console. Ensure the make_request method attaches the Bearer token before every call. The provided GenesysAuth class automatically refreshes tokens 300 seconds before expiration.

Error: 403 Forbidden

  • Cause: OAuth client lacks required scopes.
  • Fix: Navigate to Admin > Security > OAuth Clients. Edit your client and ensure routing:agent:view, routing:queue:view, and routing:queue:write are checked. After saving, request a fresh access token before retrying; regenerating the client secret is not required for a scope change.

Error: 429 Too Many Requests

  • Cause: Exceeding Genesys Cloud rate limits. The documented default is 300 requests per minute per OAuth token; check the current platform limits for your organization.
  • Fix: The fetch_with_retry function implements exponential backoff. For high-frequency workers, add a fixed delay between iterations or implement a token bucket rate limiter. Respect the Retry-After header when present.
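
The token bucket mentioned in the fix can be sketched in a few lines. The class below is a minimal, single-threaded illustration; the name TokenBucket, its methods, and the rate and capacity values in the usage note are hypothetical and should be tuned to the limits that apply to your organization.

```python
import time
from typing import Callable

class TokenBucket:
    """Allow bursts up to `capacity` requests, refilled at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock          # injectable so the bucket can be tested without sleeping
        self.tokens = capacity      # start with a full bucket
        self.updated = clock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last update, up to capacity
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now

    def try_acquire(self) -> bool:
        """Consume one token if available; return False when the bucket is empty."""
        self._refill()
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

    def wait_time(self) -> float:
        """Return the seconds until the next token becomes available."""
        self._refill()
        return max(0.0, (1.0 - self.tokens) / self.rate)
```

A worker could create one bucket, for example TokenBucket(rate=4.0, capacity=10.0), and before each request sleep for wait_time() seconds whenever try_acquire() returns False. This throttles requests on the client side, so the retry path in fetch_with_retry becomes a fallback rather than the primary control.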

Error: 400 Bad Request

  • Cause: Invalid capacity values, malformed JSON, or updating read-only fields.
  • Fix: Validate that inbound_capacity falls between 1 and the queue maximum. Ensure the request body contains only inbound_capacity (or outbound_capacity if applicable). Do not send the full queue object unless performing a complete replacement.
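
A small guard can enforce the capacity range before the request is sent. The sketch below is one possible approach; the helper name clamp_capacity is hypothetical, and the default limits of 1 and 200 are taken from the defaults used in calculate_optimal_capacities rather than from the API.

```python
def clamp_capacity(value: float, minimum: int = 1, maximum: int = 200) -> int:
    """Round a solver result to an integer and force it into [minimum, maximum]."""
    if minimum > maximum:
        raise ValueError("minimum must not exceed maximum")
    return max(minimum, min(maximum, int(round(value))))
```

Calling clamp_capacity on each solver result before update_queue_capacity keeps out-of-range values from reaching the API in the first place.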

Error: LP Infeasibility or Unbounded

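  • Cause: A queue's min_capacity is greater than its upper limit, which is the smaller of max_capacity and available agents divided by min_coverage_ratio, so the bound and the constraint cannot both hold. Because every variable has finite bounds, this model cannot be unbounded.
  • Fix: Lower min_capacity or lower min_coverage_ratio (a smaller ratio raises the limit). Verify that skill_availability contains accurate counts. The solver uses scipy.optimize.linprog with the HiGHS method, and result.message describes the failure when the constraints cannot be satisfied.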
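
Conflicting inputs can be detected before the solver runs. The sketch below applies the same limit formula as calculate_optimal_capacities and reports every queue whose lower bound exceeds its upper limit; the function name find_infeasible_queues is hypothetical.

```python
from typing import Any, Dict, List

def find_infeasible_queues(
    queue_config: List[Dict[str, Any]],
    skill_availability: Dict[str, int],
    min_coverage_ratio: float = 0.85,
) -> List[str]:
    """Return IDs of queues whose min_capacity exceeds their effective upper limit."""
    conflicts = []
    for q in queue_config:
        # Same limit as the LP constraint: available agents / coverage ratio
        available = max(skill_availability.get(q["skill_id"], 0), 1)
        limit = available / min_coverage_ratio
        upper = min(limit, q.get("max_capacity", 200))
        if q.get("min_capacity", 1) > upper:
            conflicts.append(q["queue_id"])
    return conflicts
```

An empty list means the bounds and constraints are compatible. A non-empty list names the queues to adjust before calling the solver.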

Official References