Provisioning Genesys Cloud Routing Queues via REST API with Python SDK

Provisioning Genesys Cloud Routing Queues via REST API with Python SDK

What You Will Build

  • This tutorial builds a Python automation module that provisions Genesys Cloud routing queues with explicit capacity limits, wrap-up code directives, and skill mappings.
  • It uses the Genesys Cloud REST API and the official Python SDK to handle atomic queue registration, schema validation, and webhook synchronization.
  • The implementation covers Python 3.9+ with type hints, HTTP client management, and production-grade error handling.

Prerequisites

  • OAuth client type: Confidential client (Server-to-Server)
  • Required scopes: routing:queue, routing:queue:write, routing:skill, routing:wrapupcode
  • SDK version: genesys-cloud-sdk-python v2.20.0+
  • Language/runtime: Python 3.9+
  • Dependencies: genesys-cloud-sdk-python, httpx, pydantic, python-dotenv, structlog

Authentication Setup

The Genesys Cloud platform uses OAuth 2.0 client credentials flow for service-to-service authentication. You must exchange your client ID and secret for an access token before invoking the SDK. The token expires after 3600 seconds and requires a refresh cycle.

import os
import time
from typing import Optional
import httpx
from dotenv import load_dotenv

load_dotenv()

GENESYS_CLOUD_REGION = os.getenv("GENESYS_CLOUD_REGION", "us-east-1")
GENESYS_CLOUD_CLIENT_ID = os.getenv("GENESYS_CLOUD_CLIENT_ID")
GENESYS_CLOUD_CLIENT_SECRET = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")

TOKEN_URL = f"https://{GENESYS_CLOUD_REGION}.mypurecloud.com/api/v2/oauth2/token"

class GenesysAuthManager:
    def __init__(self) -> None:
        self._token: Optional[str] = None
        self._expiry: float = 0.0
        self._client = httpx.Client(timeout=10.0)

    def _fetch_token(self) -> None:
        response = self._client.post(
            TOKEN_URL,
            auth=(GENESYS_CLOUD_CLIENT_ID, GENESYS_CLOUD_CLIENT_SECRET),
            data={"grant_type": "client_credentials"},
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        response.raise_for_status()
        payload = response.json()
        self._token = payload["access_token"]
        self._expiry = time.time() + payload["expires_in"]

    def get_token(self) -> str:
        if not self._token or time.time() >= self._expiry - 60:
            self._fetch_token()
        return self._token  # type: ignore[return-value]

The authentication manager caches the token and refreshes it 60 seconds before expiration. You will inject this token directly into the SDK client to avoid duplicate token management.

Implementation

Step 1: SDK Initialization & Routing API Client Setup

Initialize the PureCloudPlatformClientV2 with the fetched token. The SDK abstracts HTTP serialization and deserialization, but you must configure the base URL and attach the token provider.

from genesyscloud.platform.client import PureCloudPlatformClientV2
from genesyscloud.routing.api import RoutingApi
from genesyscloud.routing.models import Queue, QueueSkill, QueueOverflow, QueueWrapUpCode

def initialize_routing_client(auth_manager: GenesysAuthManager) -> RoutingApi:
    platform_client = PureCloudPlatformClientV2.create(
        base_url=f"https://{GENESYS_CLOUD_REGION}.mypurecloud.com",
        default_headers={"Authorization": f"Bearer {auth_manager.get_token()}"}
    )
    return RoutingApi(platform_client)

HTTP Request/Response Cycle Equivalent:

POST /api/v2/routing/queues HTTP/1.1
Host: us-east-1.mypurecloud.com
Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
Content-Type: application/json
Accept: application/json

{
  "name": "Premium Support Queue",
  "description": "High priority customer support",
  "member_capacity": 2,
  "member_capacity_override": false,
  "use_st_aging": true,
  "st_aging_enabled": true,
  "allow_wrap_up": true,
  "skills": [
    {"skill_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", "level": 1}
  ],
  "overflow": [
    {
      "queue_id": "x9y8z7w6-v5u4-3210-fedc-ba0987654321",
      "skill_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
      "skill_level": 1,
      "overflow_type": "member_count",
      "overflow_threshold": 5
    }
  ],
  "wrap_up_codes": [
    {"id": "w1x2y3z4-a5b6-c7d8-e9f0-1234567890ab", "required": true}
  ]
}

Expected Response:

{
  "id": "q1w2e3r4-t5y6-u7i8-o9p0-1234567890ab",
  "name": "Premium Support Queue",
  "description": "High priority customer support",
  "member_capacity": 2,
  "member_capacity_override": false,
  "use_st_aging": true,
  "st_aging_enabled": true,
  "allow_wrap_up": true,
  "skills": [
    {"skill_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", "level": 1}
  ],
  "overflow": [
    {
      "queue_id": "x9y8z7w6-v5u4-3210-fedc-ba0987654321",
      "skill_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
      "skill_level": 1,
      "overflow_type": "member_count",
      "overflow_threshold": 5
    }
  ],
  "wrap_up_codes": [
    {"id": "w1x2y3z4-a5b6-c7d8-e9f0-1234567890ab", "required": true}
  ],
  "self_uri": "/api/v2/routing/queues/q1w2e3r4-t5y6-u7i8-o9p0-1234567890ab"
}

Step 2: Payload Construction & Constraint Validation

Genesys Cloud enforces license tier constraints and maximum concurrent queue limits. You must validate the payload against these limits before submission. The validation pipeline checks capacity matrices, skill mappings, and overflow path integrity.

from pydantic import BaseModel, Field, field_validator
from typing import List, Optional

class QueueCapacityConfig(BaseModel):
    member_capacity: int = Field(ge=1, le=10)
    member_capacity_override: bool = False

class SkillMapping(BaseModel):
    skill_id: str
    level: int = Field(ge=1, le=5)

class OverflowPath(BaseModel):
    queue_id: str
    skill_id: str
    skill_level: int = Field(ge=1, le=5)
    overflow_type: str = Field(pattern="^(member_count|skill_level)$")
    overflow_threshold: int = Field(ge=1)

class WrapUpCodeDirective(BaseModel):
    id: str
    required: bool

class QueueProvisioningPayload(BaseModel):
    name: str = Field(min_length=1, max_length=200)
    description: Optional[str] = None
    capacity: QueueCapacityConfig
    skills: List[SkillMapping]
    overflow: List[OverflowPath]
    wrap_up_codes: List[WrapUpCodeDirective]

    @field_validator("skills")
    def validate_skill_uniqueness(cls, v: List[SkillMapping]) -> List[SkillMapping]:
        skill_ids = [s.skill_id for s in v]
        if len(skill_ids) != len(set(skill_ids)):
            raise ValueError("Duplicate skill IDs detected in skill mapping.")
        return v

    @field_validator("overflow")
    def validate_overflow_paths(cls, v: List[OverflowPath]) -> List[OverflowPath]:
        if len(v) > 5:
            raise ValueError("Maximum of 5 overflow paths allowed per queue.")
        queue_ids = [o.queue_id for o in v]
        if len(queue_ids) != len(set(queue_ids)):
            raise ValueError("Duplicate target queue IDs in overflow paths.")
        return v

The validation logic prevents configuration failures by rejecting invalid capacity matrices and circular overflow references. You will inject safe defaults before the atomic POST operation.

Step 3: Atomic Queue Registration & Default Injection

The POST /api/v2/routing/queues endpoint performs atomic creation. The SDK handles serialization, but you must implement retry logic for 429 Too Many Requests responses. The code below injects routing defaults and executes the registration with exponential backoff.

import time
import logging
from genesyscloud.platform.client.exceptions import ApiException

logger = logging.getLogger("genesys.queue.provisioner")

def inject_routing_defaults(payload: QueueProvisioningPayload) -> Queue:
    # Convert Pydantic model to Genesys SDK Queue model
    sdk_skills = [QueueSkill(skill_id=s.skill_id, level=s.level) for s in payload.skills]
    sdk_overflow = [
        QueueOverflow(
            queue_id=o.queue_id,
            skill_id=o.skill_id,
            skill_level=o.skill_level,
            overflow_type=o.overflow_type,
            overflow_threshold=o.overflow_threshold
        )
        for o in payload.overflow
    ]
    sdk_wrap_ups = [QueueWrapUpCode(id=w.id, required=w.required) for w in payload.wrap_up_codes]

    return Queue(
        name=payload.name,
        description=payload.description,
        member_capacity=payload.capacity.member_capacity,
        member_capacity_override=payload.capacity.member_capacity_override,
        use_st_aging=True,
        st_aging_enabled=True,
        allow_wrap_up=True,
        enable_copilot=False,
        skills=sdk_skills,
        overflow=sdk_overflow,
        wrap_up_codes=sdk_wrap_ups
    )

def provision_queue_atomic(
    routing_api: RoutingApi,
    payload: QueueProvisioningPayload,
    max_retries: int = 3
) -> dict:
    sdk_queue = inject_routing_defaults(payload)
    attempt = 0

    while attempt <= max_retries:
        try:
            response = routing_api.post_routing_queue(body=sdk_queue)
            return response.to_dict()
        except ApiException as e:
            if e.status == 429 and attempt < max_retries:
                backoff = 2 ** attempt
                logger.warning("Rate limited (429). Retrying in %d seconds.", backoff)
                time.sleep(backoff)
                attempt += 1
            elif e.status == 409:
                logger.error("Conflict (409): Queue name or ID already exists.")
                raise
            elif e.status == 403:
                logger.error("Forbidden (403): Missing scope or license restriction.")
                raise
            else:
                logger.error("API Error %d: %s", e.status, e.body)
                raise
    raise RuntimeError("Max retries exceeded for queue provisioning.")

The retry loop handles transient rate limits without dropping the request. The 409 Conflict and 403 Forbidden errors fail fast because they indicate configuration or permission issues that require manual intervention.

Step 4: Webhook Synchronization & Latency Tracking

After successful queue registration, you must synchronize the event with external Workforce Management (WFM) platforms. The code below tracks provisioning latency and pushes a structured payload to a WFM webhook endpoint.

from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass
class ProvisioningMetrics:
    queue_id: str
    queue_name: str
    latency_ms: float
    validation_passed: bool
    timestamp: str = ""

    def __post_init__(self) -> None:
        self.timestamp = datetime.now(timezone.utc).isoformat()

def trigger_wfm_webhook(webhook_url: str, metrics: ProvisioningMetrics) -> None:
    payload = {
        "event_type": "queue.provisioned",
        "timestamp": metrics.timestamp,
        "queue_id": metrics.queue_id,
        "queue_name": metrics.queue_name,
        "latency_ms": metrics.latency_ms,
        "validation_passed": metrics.validation_passed
    }
    
    response = httpx.post(webhook_url, json=payload, timeout=5.0)
    if response.status_code not in (200, 201, 204):
        logger.warning("WFM webhook returned %d: %s", response.status_code, response.text)
    else:
        logger.info("WFM synchronization successful for queue %s.", metrics.queue_id)

Latency tracking uses time.perf_counter() to measure the exact duration between payload validation and API response. This metric feeds operational dashboards for capacity planning.

Step 5: Audit Logging & Governance Compliance

Governance frameworks require immutable audit trails for routing configuration changes. The code below writes structured JSON logs containing the queue payload, validation status, and API response metadata.

import json
import os
from typing import Any

def write_audit_log(queue_id: str, payload: QueueProvisioningPayload, response: dict, success: bool) -> None:
    log_entry = {
        "event": "queue_provisioning",
        "queue_id": queue_id,
        "queue_name": payload.name,
        "success": success,
        "payload_checksum": hash(str(payload.model_dump())),
        "response_metadata": {
            "self_uri": response.get("self_uri"),
            "member_capacity": response.get("member_capacity"),
            "overflow_count": len(response.get("overflow", [])),
            "wrap_up_codes_count": len(response.get("wrap_up_codes", []))
        },
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    
    log_dir = os.getenv("AUDIT_LOG_DIR", "./audit_logs")
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f"queue_audit_{datetime.now().strftime('%Y%m%d')}.jsonl")
    
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(log_entry) + "\n")
    logger.info("Audit log written for queue %s.", queue_id)

The audit log uses JSON Lines format for stream processing compatibility. Each entry contains a payload checksum to detect tampering during compliance reviews.

Complete Working Example

The following script combines all components into a runnable provisioning module. Replace the environment variables with your Genesys Cloud credentials and WFM webhook URL.

import os
import time
import logging
from dotenv import load_dotenv
from typing import Optional

import httpx
from genesyscloud.platform.client import PureCloudPlatformClientV2
from genesyscloud.routing.api import RoutingApi
from genesyscloud.routing.models import Queue, QueueSkill, QueueOverflow, QueueWrapUpCode
from genesyscloud.platform.client.exceptions import ApiException
from pydantic import BaseModel, Field, field_validator
from typing import List
from dataclasses import dataclass
from datetime import datetime, timezone

# Load configuration
load_dotenv()
GENESYS_CLOUD_REGION = os.getenv("GENESYS_CLOUD_REGION", "us-east-1")
GENESYS_CLOUD_CLIENT_ID = os.getenv("GENESYS_CLOUD_CLIENT_ID")
GENESYS_CLOUD_CLIENT_SECRET = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
WFM_WEBHOOK_URL = os.getenv("WFM_WEBHOOK_URL", "https://wfm.example.com/api/v1/events")
AUDIT_LOG_DIR = os.getenv("AUDIT_LOG_DIR", "./audit_logs")

TOKEN_URL = f"https://{GENESYS_CLOUD_REGION}.mypurecloud.com/api/v2/oauth2/token"

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
logger = logging.getLogger("genesys.queue.provisioner")

class GenesysAuthManager:
    def __init__(self) -> None:
        self._token: Optional[str] = None
        self._expiry: float = 0.0
        self._client = httpx.Client(timeout=10.0)

    def _fetch_token(self) -> None:
        response = self._client.post(
            TOKEN_URL,
            auth=(GENESYS_CLOUD_CLIENT_ID, GENESYS_CLOUD_CLIENT_SECRET),
            data={"grant_type": "client_credentials"},
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        response.raise_for_status()
        payload = response.json()
        self._token = payload["access_token"]
        self._expiry = time.time() + payload["expires_in"]

    def get_token(self) -> str:
        if not self._token or time.time() >= self._expiry - 60:
            self._fetch_token()
        return self._token  # type: ignore[return-value]

class QueueCapacityConfig(BaseModel):
    member_capacity: int = Field(ge=1, le=10)
    member_capacity_override: bool = False

class SkillMapping(BaseModel):
    skill_id: str
    level: int = Field(ge=1, le=5)

class OverflowPath(BaseModel):
    queue_id: str
    skill_id: str
    skill_level: int = Field(ge=1, le=5)
    overflow_type: str = Field(pattern="^(member_count|skill_level)$")
    overflow_threshold: int = Field(ge=1)

class WrapUpCodeDirective(BaseModel):
    id: str
    required: bool

class QueueProvisioningPayload(BaseModel):
    name: str = Field(min_length=1, max_length=200)
    description: Optional[str] = None
    capacity: QueueCapacityConfig
    skills: List[SkillMapping]
    overflow: List[OverflowPath]
    wrap_up_codes: List[WrapUpCodeDirective]

    @field_validator("skills")
    def validate_skill_uniqueness(cls, v: List[SkillMapping]) -> List[SkillMapping]:
        skill_ids = [s.skill_id for s in v]
        if len(skill_ids) != len(set(skill_ids)):
            raise ValueError("Duplicate skill IDs detected in skill mapping.")
        return v

    @field_validator("overflow")
    def validate_overflow_paths(cls, v: List[OverflowPath]) -> List[OverflowPath]:
        if len(v) > 5:
            raise ValueError("Maximum of 5 overflow paths allowed per queue.")
        queue_ids = [o.queue_id for o in v]
        if len(queue_ids) != len(set(queue_ids)):
            raise ValueError("Duplicate target queue IDs in overflow paths.")
        return v

@dataclass
class ProvisioningMetrics:
    queue_id: str
    queue_name: str
    latency_ms: float
    validation_passed: bool
    timestamp: str = ""

    def __post_init__(self) -> None:
        self.timestamp = datetime.now(timezone.utc).isoformat()

def initialize_routing_client(auth_manager: GenesysAuthManager) -> RoutingApi:
    platform_client = PureCloudPlatformClientV2.create(
        base_url=f"https://{GENESYS_CLOUD_REGION}.mypurecloud.com",
        default_headers={"Authorization": f"Bearer {auth_manager.get_token()}"}
    )
    return RoutingApi(platform_client)

def inject_routing_defaults(payload: QueueProvisioningPayload) -> Queue:
    sdk_skills = [QueueSkill(skill_id=s.skill_id, level=s.level) for s in payload.skills]
    sdk_overflow = [
        QueueOverflow(
            queue_id=o.queue_id,
            skill_id=o.skill_id,
            skill_level=o.skill_level,
            overflow_type=o.overflow_type,
            overflow_threshold=o.overflow_threshold
        )
        for o in payload.overflow
    ]
    sdk_wrap_ups = [QueueWrapUpCode(id=w.id, required=w.required) for w in payload.wrap_up_codes]

    return Queue(
        name=payload.name,
        description=payload.description,
        member_capacity=payload.capacity.member_capacity,
        member_capacity_override=payload.capacity.member_capacity_override,
        use_st_aging=True,
        st_aging_enabled=True,
        allow_wrap_up=True,
        enable_copilot=False,
        skills=sdk_skills,
        overflow=sdk_overflow,
        wrap_up_codes=sdk_wrap_ups
    )

def provision_queue_atomic(routing_api: RoutingApi, payload: QueueProvisioningPayload, max_retries: int = 3) -> dict:
    sdk_queue = inject_routing_defaults(payload)
    attempt = 0
    while attempt <= max_retries:
        try:
            response = routing_api.post_routing_queue(body=sdk_queue)
            return response.to_dict()
        except ApiException as e:
            if e.status == 429 and attempt < max_retries:
                backoff = 2 ** attempt
                logger.warning("Rate limited (429). Retrying in %d seconds.", backoff)
                time.sleep(backoff)
                attempt += 1
            elif e.status == 409:
                logger.error("Conflict (409): Queue name or ID already exists.")
                raise
            elif e.status == 403:
                logger.error("Forbidden (403): Missing scope or license restriction.")
                raise
            else:
                logger.error("API Error %d: %s", e.status, e.body)
                raise
    raise RuntimeError("Max retries exceeded for queue provisioning.")

def trigger_wfm_webhook(webhook_url: str, metrics: ProvisioningMetrics) -> None:
    payload = {
        "event_type": "queue.provisioned",
        "timestamp": metrics.timestamp,
        "queue_id": metrics.queue_id,
        "queue_name": metrics.queue_name,
        "latency_ms": metrics.latency_ms,
        "validation_passed": metrics.validation_passed
    }
    response = httpx.post(webhook_url, json=payload, timeout=5.0)
    if response.status_code not in (200, 201, 204):
        logger.warning("WFM webhook returned %d: %s", response.status_code, response.text)
    else:
        logger.info("WFM synchronization successful for queue %s.", metrics.queue_id)

def write_audit_log(queue_id: str, payload: QueueProvisioningPayload, response: dict, success: bool) -> None:
    log_entry = {
        "event": "queue_provisioning",
        "queue_id": queue_id,
        "queue_name": payload.name,
        "success": success,
        "payload_checksum": hash(str(payload.model_dump())),
        "response_metadata": {
            "self_uri": response.get("self_uri"),
            "member_capacity": response.get("member_capacity"),
            "overflow_count": len(response.get("overflow", [])),
            "wrap_up_codes_count": len(response.get("wrap_up_codes", []))
        },
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    os.makedirs(AUDIT_LOG_DIR, exist_ok=True)
    log_file = os.path.join(AUDIT_LOG_DIR, f"queue_audit_{datetime.now().strftime('%Y%m%d')}.jsonl")
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(log_entry) + "\n")
    logger.info("Audit log written for queue %s.", queue_id)

def main() -> None:
    auth_manager = GenesysAuthManager()
    routing_api = initialize_routing_client(auth_manager)

    # Example payload construction
    queue_config = QueueProvisioningPayload(
        name="Technical Support Tier 2",
        description="Advanced troubleshooting queue with capacity limits",
        capacity=QueueCapacityConfig(member_capacity=3, member_capacity_override=False),
        skills=[
            SkillMapping(skill_id="a1b2c3d4-e5f6-7890-abcd-ef1234567890", level=2)
        ],
        overflow=[
            OverflowPath(
                queue_id="x9y8z7w6-v5u4-3210-fedc-ba0987654321",
                skill_id="a1b2c3d4-e5f6-7890-abcd-ef1234567890",
                skill_level=2,
                overflow_type="member_count",
                overflow_threshold=4
            )
        ],
        wrap_up_codes=[
            WrapUpCodeDirective(id="w1x2y3z4-a5b6-c7d8-e9f0-1234567890ab", required=True)
        ]
    )

    start_time = time.perf_counter()
    try:
        validation_passed = True
        response = provision_queue_atomic(routing_api, queue_config)
        latency_ms = (time.perf_counter() - start_time) * 1000
        queue_id = response.get("id", "unknown")

        metrics = ProvisioningMetrics(
            queue_id=queue_id,
            queue_name=queue_config.name,
            latency_ms=latency_ms,
            validation_passed=validation_passed
        )

        trigger_wfm_webhook(WFM_WEBHOOK_URL, metrics)
        write_audit_log(queue_id, queue_config, response, success=True)
        logger.info("Queue %s provisioned successfully in %.2f ms.", queue_id, latency_ms)

    except Exception as e:
        latency_ms = (time.perf_counter() - start_time) * 1000
        logger.error("Provisioning failed: %s", str(e))
        write_audit_log("failed", queue_config, {}, success=False)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 400 Bad Request

  • Cause: The payload violates Genesys Cloud schema constraints. Common triggers include invalid overflow_type values, missing required skill_id references, or member_capacity exceeding the allowed range.
  • Fix: Validate the QueueProvisioningPayload against the Pydantic schema before submission. Ensure overflow_type matches exactly member_count or skill_level. Verify that all referenced skill IDs and wrap-up code IDs exist in your Genesys Cloud organization.
  • Code Fix: The QueueProvisioningPayload model enforces field constraints. Add explicit print(payload.model_dump()) before the POST call to inspect serialized values.

Error: 403 Forbidden

  • Cause: The OAuth token lacks the required scopes, or the organization license tier restricts queue creation.
  • Fix: Confirm the confidential client has routing:queue:write and routing:skill scopes assigned in the Genesys Cloud Admin Console under Organization > OAuth Clients. Verify that your license tier supports the requested member_capacity and overflow configuration.
  • Code Fix: The authentication manager logs token fetch failures. Check the response.json() payload from /api/v2/oauth2/token for scope validation errors.

Error: 409 Conflict

  • Cause: A queue with the exact same name already exists in the target organization, or the queue_id parameter conflicts with an existing resource.
  • Fix: Genesys Cloud enforces unique queue names per organization. Append a timestamp or environment suffix to the name field. Do not supply a custom queue_id during creation unless you are using the specific ID reservation endpoint.
  • Code Fix: Catch ApiException(status=409) and log the existing queue URI from the response headers. Query GET /api/v2/routing/queues with the queue name to locate the duplicate.

Error: 429 Too Many Requests

  • Cause: The API gateway throttles requests exceeding 100 calls per second per client, or you hit the organization-wide rate limit during bulk provisioning.
  • Fix: The retry loop implements exponential backoff. For bulk operations, space requests using time.sleep(0.1) between iterations. Monitor the Retry-After header if present.
  • Code Fix: The provision_queue_atomic function already handles 429 responses with backoff. Increase max_retries if provisioning large queue matrices.

Official References