Implementing Automated WFM Schedule Adherence Monitoring and Escalation

Implementing Automated WFM Schedule Adherence Monitoring and Escalation

What This Guide Covers

You are building an automated real-time adherence monitoring system that continuously compares each agent’s current state (Available, In a Call, After Call Work, Break, Lunch) against their published WFM schedule, detects deviations beyond configurable thresholds, and escalates violations through a tiered notification pipeline - from a Slack alert to the team supervisor at 5 minutes, to an operations manager email at 15 minutes, to an automatic coaching record creation in the QM system at 30 minutes. When complete, supervisors respond to adherence violations in real time without manually watching agent state boards.


Prerequisites, Roles & Licensing

  • Licensing: Genesys Cloud CX 3 with WFM (Workforce Management) module; the WFM API requires CX 3 licensing
  • Permissions required (service account):
    • Workforce Management > Schedule > View
    • Workforce Management > Adherence > View
    • Analytics > Conversation Detail > View (for presence correlation)
    • Users > User > View
  • OAuth scopes: workforce-management, analytics, routing
  • External dependencies: Slack or Microsoft Teams webhook (for supervisor alerts); an email relay (SES, SendGrid) for escalation; optionally Genesys Cloud Quality Management write access to create coaching sessions

The Implementation Deep-Dive

1. Understanding Genesys Cloud WFM Adherence Data Model

Genesys Cloud WFM tracks adherence through two parallel data streams:

Schedule data: What each agent is supposed to be doing at any given moment (shift start, break, lunch, shift end, off-schedule activities). Retrieved via the WFM Schedule API.

Actual state data: What each agent is actually doing at any given moment. Retrieved via the Genesys Cloud real-time agent state (presence + routing status) from the Analytics API or Notification Service.

Adherence = comparison of these two streams at each evaluation interval.

The WFM module has a built-in adherence view in the Genesys Cloud admin UI, but it does not natively trigger automated external notifications or escalation workflows. Your automated monitoring system fills this gap.

Adherence scoring model:

Adherent: Agent's actual state maps to their scheduled activity type
  Examples: Scheduled "On Queue" → Actual "Available" = Adherent
            Scheduled "Break" → Actual "On Break" = Adherent

Non-Adherent: Mismatch between scheduled and actual state
  Examples: Scheduled "On Queue" → Actual "Away" = Non-Adherent
            Scheduled "Lunch" → Actual "Available" = Non-Adherent (unusual but happens)

Genesys Cloud WFM maps routing status and presence states to schedule activity types via an “Activity Code Mapping.” Configure this mapping in Admin > Workforce Management > Activity Codes before building adherence monitoring.


2. Fetching the Published Schedule for Active Agents

import requests
from datetime import datetime, timedelta

def get_schedule_for_timerange(
    management_unit_id: str,
    week_of: str,  # ISO date string: "2025-05-12"
    access_token: str,
    base_url: str = "https://api.mypurecloud.com"
) -> dict:
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    
    resp = requests.post(
        f"{base_url}/api/v2/workforcemanagement/managementunits/{management_unit_id}/schedules/search",
        headers=headers,
        json={
            "startWeekDate": week_of,
            "status": "Published"
        }
    )
    resp.raise_for_status()
    return resp.json()

def get_agent_current_scheduled_activity(
    schedule: dict,
    agent_id: str,
    evaluation_time: datetime
) -> dict | None:
    """
    Returns the activity the agent is scheduled for at the given evaluation time.
    """
    user_schedules = schedule.get("userSchedules", {})
    agent_schedule = user_schedules.get(agent_id)
    
    if not agent_schedule:
        return None
    
    eval_epoch_ms = int(evaluation_time.timestamp() * 1000)
    
    for shift in agent_schedule.get("shifts", []):
        for activity in shift.get("activities", []):
            start_ms = activity.get("startOffsetMinutes", 0) * 60 * 1000 + shift["startTime"]
            duration_ms = activity.get("lengthMinutes", 0) * 60 * 1000
            end_ms = start_ms + duration_ms
            
            if start_ms <= eval_epoch_ms < end_ms:
                return {
                    "activityCode": activity.get("activityCodeId"),
                    "activityCategory": activity.get("activityCodeCategory"),
                    "scheduledStart": start_ms,
                    "scheduledEnd": end_ms
                }
    
    return None  # Agent is not scheduled at this time (off shift)

3. Fetching Real-Time Agent States

def get_current_agent_states(
    user_ids: list[str],
    access_token: str,
    base_url: str
) -> dict:
    """
    Returns mapping of {userId → {routingStatus, presenceDefinition, startTime}}
    """
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    
    # Batch users analytics query
    resp = requests.post(
        f"{base_url}/api/v2/analytics/users/details/query",
        headers=headers,
        json={
            "interval": f"{(datetime.utcnow() - timedelta(minutes=15)).isoformat()}Z/{datetime.utcnow().isoformat()}Z",
            "filters": [
                {"type": "and", "predicates": [
                    {"dimension": "userId", "operator": "matches", "value": user_id}
                ]}
            ] if len(user_ids) == 1 else [],
            "order": "asc",
            "orderBy": "startTime"
        }
    )
    
    states = {}
    for user_data in resp.json().get("userDetails", []):
        user_id = user_data["userId"]
        # Get most recent state segment
        primary_presence = user_data.get("primaryPresence", [{}])[-1] if user_data.get("primaryPresence") else {}
        routing_status = user_data.get("routingStatus", [{}])[-1] if user_data.get("routingStatus") else {}
        
        states[user_id] = {
            "presenceSystemName": primary_presence.get("systemPresence", "Unknown"),
            "routingStatus": routing_status.get("routingStatus", "UNKNOWN"),
            "stateStart": routing_status.get("startTime")
        }
    
    return states

4. Adherence Evaluation and Violation Detection

from dataclasses import dataclass
from enum import Enum

class AdherenceStatus(Enum):
    ADHERENT = "ADHERENT"
    NON_ADHERENT = "NON_ADHERENT"
    UNKNOWN = "UNKNOWN"
    OFF_SHIFT = "OFF_SHIFT"

# Activity category to expected routing status mapping
ACTIVITY_ROUTING_MAP = {
    "OnQueueWork": ["IDLE", "INTERACTING", "COMMUNICATING"],
    "Break": ["IDLE", "NOT_RESPONDING"],  # Some orgs allow break while in routing idle
    "Meal": ["IDLE", "NOT_RESPONDING", "OFF_QUEUE"],
    "TimeOff": ["OFFLINE", "NOT_RESPONDING"],
    "Training": ["OFF_QUEUE", "IDLE"],
    "Offline": ["OFFLINE", "NOT_RESPONDING"]
}

# Presence state to routing status equivalence
PRESENCE_ROUTING_ADHERENT_COMBOS = {
    "OnQueueWork": [
        ("AVAILABLE", "IDLE"),
        ("AVAILABLE", "INTERACTING"),
        ("BUSY", "INTERACTING"),
        ("BUSY", "COMMUNICATING"),
        ("AVAILABLE", "COMMUNICATING")
    ],
    "Break": [
        ("BREAK", "NOT_RESPONDING"),
        ("AWAY", "NOT_RESPONDING"),
        ("BREAK", "IDLE")
    ],
    "Meal": [
        ("MEAL", "NOT_RESPONDING"),
        ("AWAY", "NOT_RESPONDING")
    ],
    "Offline": [
        ("OFFLINE", "OFFLINE"),
        ("OFFLINE", "NOT_RESPONDING")
    ]
}

@dataclass
class AdherenceEvaluation:
    agent_id: str
    agent_name: str
    status: AdherenceStatus
    scheduled_activity: str
    actual_presence: str
    actual_routing: str
    violation_duration_minutes: float
    evaluated_at: str

def evaluate_adherence(
    agent_id: str,
    agent_name: str,
    scheduled_activity: dict | None,
    actual_state: dict,
    violation_start_times: dict  # Persistent state tracking violation start
) -> AdherenceEvaluation:
    now = datetime.utcnow()
    
    if not scheduled_activity:
        return AdherenceEvaluation(
            agent_id=agent_id, agent_name=agent_name,
            status=AdherenceStatus.OFF_SHIFT,
            scheduled_activity="OFF_SHIFT",
            actual_presence=actual_state.get("presenceSystemName", "Unknown"),
            actual_routing=actual_state.get("routingStatus", "Unknown"),
            violation_duration_minutes=0,
            evaluated_at=now.isoformat() + "Z"
        )
    
    activity_category = scheduled_activity.get("activityCategory", "OnQueueWork")
    actual_presence = actual_state.get("presenceSystemName", "Unknown")
    actual_routing = actual_state.get("routingStatus", "Unknown")
    
    # Check if current state matches expected state for scheduled activity
    allowed_combos = PRESENCE_ROUTING_ADHERENT_COMBOS.get(activity_category, [])
    is_adherent = (actual_presence.upper(), actual_routing.upper()) in [
        (p.upper(), r.upper()) for p, r in allowed_combos
    ]
    
    if is_adherent:
        # Clear any active violation tracking
        violation_start_times.pop(agent_id, None)
        violation_duration = 0.0
    else:
        # Record violation start if not already tracking
        if agent_id not in violation_start_times:
            violation_start_times[agent_id] = now
        violation_duration = (now - violation_start_times[agent_id]).total_seconds() / 60
    
    return AdherenceEvaluation(
        agent_id=agent_id, agent_name=agent_name,
        status=AdherenceStatus.ADHERENT if is_adherent else AdherenceStatus.NON_ADHERENT,
        scheduled_activity=activity_category,
        actual_presence=actual_presence,
        actual_routing=actual_routing,
        violation_duration_minutes=violation_duration,
        evaluated_at=now.isoformat() + "Z"
    )

5. Tiered Escalation Pipeline

import boto3

ses = boto3.client("ses", region_name="us-east-1")

SUPERVISOR_SLACK_WEBHOOK = "https://hooks.slack.com/services/T.../B.../..."
OPS_MANAGER_EMAIL = "ops-manager@yourorg.com"

async def escalate_violation(evaluation: AdherenceEvaluation, supervisor_info: dict):
    duration = evaluation.violation_duration_minutes
    
    if duration < 5:
        # Tier 0: Log only - no notification (grace period for brief state changes)
        pass
    
    elif 5 <= duration < 15:
        # Tier 1: Slack alert to immediate supervisor
        payload = {
            "text": f"⚠️ *Adherence Alert* - {evaluation.agent_name}",
            "attachments": [{
                "color": "#ffcc00",
                "fields": [
                    {"title": "Scheduled", "value": evaluation.scheduled_activity, "short": True},
                    {"title": "Actual", "value": f"{evaluation.actual_presence} / {evaluation.actual_routing}", "short": True},
                    {"title": "Duration", "value": f"{duration:.1f} minutes out of adherence", "short": True},
                    {"title": "Supervisor Action", "value": "Please check in with the agent.", "short": False}
                ]
            }]
        }
        requests.post(supervisor_info["slack_webhook"], json=payload)
    
    elif 15 <= duration < 30:
        # Tier 2: Email to operations manager
        ses.send_email(
            Source="adherence-monitor@yourorg.com",
            Destination={"ToAddresses": [OPS_MANAGER_EMAIL]},
            Message={
                "Subject": {"Data": f"WFM Adherence Escalation: {evaluation.agent_name} - {duration:.0f} min"},
                "Body": {"Text": {"Data": (
                    f"Agent {evaluation.agent_name} ({evaluation.agent_id}) has been out of adherence "
                    f"for {duration:.1f} minutes.\n\n"
                    f"Scheduled Activity: {evaluation.scheduled_activity}\n"
                    f"Actual State: {evaluation.actual_presence} / {evaluation.actual_routing}\n"
                    f"Violation Start: {evaluation.evaluated_at}\n\n"
                    f"Please review and take appropriate action."
                )}}
            }
        )
    
    elif duration >= 30:
        # Tier 3: Auto-create coaching session in Genesys Cloud QM
        create_coaching_session(
            agent_id=evaluation.agent_id,
            violation_type=f"Schedule Adherence - {evaluation.scheduled_activity}",
            violation_duration_minutes=duration,
            access_token=get_service_token()
        )

def create_coaching_session(agent_id: str, violation_type: str, violation_duration_minutes: float, access_token: str):
    headers = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}
    
    requests.post(
        "https://api.mypurecloud.com/api/v2/coaching/appointments",
        headers=headers,
        json={
            "name": f"Adherence Coaching: {violation_type}",
            "status": "Scheduled",
            "dateStart": (datetime.utcnow() + timedelta(hours=24)).isoformat() + "Z",
            "lengthInMinutes": 15,
            "facilitator": {"id": "supervisor-user-id"},
            "attendees": [{"id": agent_id}],
            "conversations": [],
            "documents": [],
            "overrideDefaultAvailability": True
        }
    )

The Trap - triggering escalations on state transitions, not sustained violations: If an agent’s state fluctuates rapidly (Available → Busy → Available) every 30 seconds during normal handling, a naive evaluator fires false positive alerts on each transition. The violation_start_times dict above tracks when the violation started - escalation only fires after the agent has been continuously out of adherence for N minutes. Always measure violation duration, not occurrence count.


6. Scheduling the Monitor Loop

Run the evaluation loop as a scheduled AWS Lambda (every 2 minutes via EventBridge Scheduler):

# Lambda handler - runs every 2 minutes
def lambda_handler(event, context):
    access_token = get_cached_token()  # OAuth token with 1-hour TTL, cached in SSM
    
    management_units = get_active_management_units(access_token)
    
    for mu in management_units:
        schedule = get_schedule_for_timerange(mu["id"], get_current_week_date(), access_token)
        active_agents = get_active_agents_in_mu(mu["id"], access_token)
        actual_states = get_current_agent_states([a["id"] for a in active_agents], access_token)
        
        for agent in active_agents:
            scheduled_activity = get_agent_current_scheduled_activity(
                schedule, agent["id"], datetime.utcnow()
            )
            actual_state = actual_states.get(agent["id"], {})
            
            evaluation = evaluate_adherence(
                agent["id"], agent["name"], scheduled_activity, actual_state, violation_start_times
            )
            
            if evaluation.status == AdherenceStatus.NON_ADHERENT:
                await escalate_violation(evaluation, get_supervisor_info(agent["id"]))

Validation, Edge Cases & Troubleshooting

Edge Case 1: Agents on Approved Unplanned Activities

Agents who take an unplanned early break (approved by supervisor in real time) will show as non-adherent even though the deviation is sanctioned. Integrate with WFM’s Real-Time Adherence exception workflow: before escalating, check whether a WFM adherence exception has been submitted for the agent within the last 15 minutes. If an approved exception exists, suppress the escalation.

Edge Case 2: Agents Mid-Interaction During Scheduled Break

An agent who hits their scheduled break time while still on a call is technically non-adherent to the break schedule, but this is unavoidable and expected. Add a check: if the agent’s routing status is INTERACTING, suppress adherence alerts regardless of scheduled activity. The WFM system applies its own “adherence forgiveness” for active interactions - your custom monitor should mirror this logic.

Edge Case 3: Schedule Revisions Published Mid-Day

When WFM publishes an updated schedule mid-shift (adding an unplanned training block), your monitor may continue comparing against the old cached schedule. Refresh the schedule cache every 15 minutes - not just at the start of the monitoring loop. Also subscribe to workforce-management.schedule.published Notification Service events to invalidate the cache immediately when a new version is published.

Edge Case 4: Multi-Management Unit Organizations

Large organizations with multiple WFM management units (by region, BPO, or business line) require the monitor to iterate across all management units. Query GET /api/v2/workforcemanagement/managementunits to discover all MUs and parallelize the evaluation across them. Each MU has independent schedules, activity code mappings, and supervisor assignments - ensure your supervisor routing logic maps agent → supervisor by MU, not globally.


Official References