How to get real-time queue observation data (waiting count, agents available) via the Statistics API
What You Will Build
- You will build a script that queries the Genesys Cloud CX API to retrieve the current number of waiting interactions and available agents for a specific queue.
- This solution uses the Genesys Cloud CX Statistics API endpoint
/api/v2/analytics/queues/details/query. - The tutorial covers implementation in Python using the official
genesys-cloud-purecloud-platform-clientSDK and raw HTTP requests usinghttpx.
Prerequisites
Before writing code, ensure you have the following components ready:
- Genesys Cloud CX Tenant Access: You need a tenant URL (e.g.,
https://mycompany.mypurecloud.com) and API credentials. - OAuth Client Credentials:
- Client ID and Client Secret.
- The client must be configured with the
client_credentialsgrant type. - Required Scopes:
analytics:queues:readandanalytics:conversations:read. Without these, the API will return a 403 Forbidden error.
- Python Environment:
- Python 3.8 or higher.
- Install the official SDK:
pip install genesys-cloud-purecloud-platform-client. - Install HTTP library for raw examples:
pip install httpx.
- Queue ID: You need the specific ID of the queue you wish to monitor. You can find this via the Admin UI or by querying
/api/v2/queues.
Authentication Setup
Genesys Cloud CX uses OAuth 2.0 for authentication. For server-to-server integrations like this one, the Client Credentials flow is the standard approach.
The following Python function handles token acquisition and caching. It checks for an existing valid token before making a new request to avoid unnecessary network overhead and rate limits.
import time
import httpx
from typing import Optional
class GenesysAuthManager:
def __init__(self, tenant_url: str, client_id: str, client_secret: str):
self.tenant_url = tenant_url.rstrip('/')
self.client_id = client_id
self.client_secret = client_secret
self.token: Optional[str] = None
self.token_expiry: float = 0
self.base_auth_url = f"{self.tenant_url}/oauth/token"
def get_access_token(self) -> str:
"""
Returns a valid OAuth access token.
Refreshes automatically if the current token is expired.
"""
if self.token and time.time() < self.token_expiry:
return self.token
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
try:
response = httpx.post(self.base_auth_url, headers=headers, data=data)
response.raise_for_status()
token_data = response.json()
self.token = token_data["access_token"]
# Set expiry slightly before actual expiry to allow for network latency
self.token_expiry = time.time() + token_data["expires_in"] - 10
return self.token
except httpx.HTTPStatusError as e:
raise RuntimeError(f"Failed to obtain OAuth token: {e.response.text}") from e
except Exception as e:
raise RuntimeError(f"Unexpected error during authentication: {str(e)}") from e
def get_auth_header(self) -> dict:
"""Returns the Authorization header dict for API calls."""
token = self.get_access_token()
return {"Authorization": f"Bearer {token}"}
Implementation
Step 1: Constructing the Query Request Body
The Statistics API does not provide a simple “get current status” endpoint with a single ID parameter. Instead, it uses a query-based model similar to Elasticsearch. You must construct a JSON body that defines the interval (time range) and the filter (which queue to look at).
For real-time observation, we define a short time interval (e.g., the last 10 minutes) and group the results by queueId.
Key Parameters in the Request Body:
interval: Defines the time window. UseR10/PT10Mto get the last 10 minutes aggregated into 1-minute buckets, orT10for the last 10 minutes as a single bucket. For a simple “current state” snapshot, a 1-minute interval ending now is sufficient.groupBy: Must includequeueIdto isolate the specific queue.metrics: Specifies which data points to return.waitingCount: Number of interactions waiting.agentCount: Total agents logged in to the queue.availableCount: Agents ready to take a call.wrapupCount: Agents in wrap-up state.
filter: Restricts the query to specific entities.
Here is the structure of the query body:
{
"interval": "R1/PT1M",
"groupBy": [
"queueId"
],
"metrics": [
"waitingCount",
"agentCount",
"availableCount",
"wrapupCount"
],
"filter": {
"type": "queueIds",
"ids": [
"YOUR_QUEUE_ID_HERE"
]
}
}
Step 2: Executing the Query via Raw HTTP (httpx)
Using httpx provides visibility into the exact HTTP request and response. This approach is useful for debugging or if you prefer not to install the full SDK.
import httpx
import json
from datetime import datetime, timedelta
def get_queue_stats_raw(auth_manager: GenesysAuthManager, queue_id: str) -> dict:
"""
Fetches real-time queue statistics using raw HTTP requests.
"""
# Define the endpoint
endpoint = f"{auth_manager.tenant_url}/api/v2/analytics/queues/details/query"
# Define the query body
# We look at the last 1 minute to get the most recent snapshot
query_body = {
"interval": "R1/PT1M",
"groupBy": ["queueId"],
"metrics": ["waitingCount", "agentCount", "availableCount", "wrapupCount"],
"filter": {
"type": "queueIds",
"ids": [queue_id]
}
}
headers = {
"Content-Type": "application/json",
**auth_manager.get_auth_header()
}
try:
# Use timeout to prevent hanging on slow network conditions
response = httpx.post(
endpoint,
json=query_body,
headers=headers,
timeout=10.0
)
# Handle HTTP Errors
if response.status_code == 401:
raise PermissionError("Invalid or expired OAuth token.")
elif response.status_code == 403:
raise PermissionError("Insufficient scopes. Ensure 'analytics:queues:read' is granted.")
elif response.status_code == 429:
retry_after = response.headers.get("Retry-After", 5)
raise RuntimeError(f"Rate limited. Please wait {retry_after} seconds.")
else:
response.raise_for_status()
return response.json()
except httpx.RequestError as e:
raise RuntimeError(f"Network error occurred: {str(e)}") from e
def parse_raw_response(response_data: dict, queue_id: str) -> dict:
"""
Parses the JSON response to extract specific metrics for the queue.
"""
if "data" not in response_data or not response_data["data"]:
return {
"waitingCount": 0,
"agentCount": 0,
"availableCount": 0,
"wrapupCount": 0,
"status": "No data available"
}
# The API returns a list of buckets. We want the most recent one.
buckets = response_data["data"]
# Find the bucket corresponding to our queue ID
queue_bucket = next((b for b in buckets if b.get("groupBy") and b["groupBy"].get("queueId") == queue_id), None)
if not queue_bucket:
return {
"waitingCount": 0,
"agentCount": 0,
"availableCount": 0,
"wrapupCount": 0,
"status": "Queue not found in response"
}
# Extract metrics from the first interval in the bucket
# The API returns intervals in chronological order, usually.
# We take the last one for the "current" state.
intervals = queue_bucket.get("intervals", [])
if not intervals:
return {"status": "No intervals found"}
latest_interval = intervals[-1]
metrics = latest_interval.get("metrics", {})
return {
"waitingCount": metrics.get("waitingCount", {}).get("total", 0),
"agentCount": metrics.get("agentCount", {}).get("total", 0),
"availableCount": metrics.get("availableCount", {}).get("total", 0),
"wrapupCount": metrics.get("wrapupCount", {}).get("total", 0),
"status": "Success"
}
Step 3: Executing the Query via Official SDK
The official Python SDK (genesys-cloud-purecloud-platform-client) abstracts the HTTP details and provides type-safe objects. This is the recommended approach for production applications.
First, you must initialize the platform client.
from purecloudplatformclientv2 import (
ApiClient,
Configuration,
QueuesApi,
QueueDetailsQueryRequest
)
from purecloudplatformclientv2.rest import ApiException
def get_queue_stats_sdk(queue_id: str, tenant_url: str, client_id: str, client_secret: str) -> dict:
"""
Fetches queue statistics using the official Genesys Cloud Python SDK.
"""
# 1. Configure the API Client
config = Configuration(
host=tenant_url,
client_id=client_id,
client_secret=client_secret
)
with ApiClient(config) as api_client:
queues_api = QueuesApi(api_client)
# 2. Construct the Query Request Object
# The SDK maps directly to the JSON structure described in Step 1
# Define the filter for the specific queue
filter_obj = {
"type": "queueIds",
"ids": [queue_id]
}
query_request = QueueDetailsQueryRequest(
interval="R1/PT1M",
group_by=["queueId"],
metrics=["waitingCount", "agentCount", "availableCount", "wrapupCount"],
filter=filter_obj
)
try:
# 3. Execute the Query
# Note: The SDK method name is typically post_analytics_queues_details_query
response = queues_api.post_analytics_queues_details_query(body=query_request)
# 4. Parse the Response
if not response.data:
return {"status": "No data", "waitingCount": 0}
# The SDK response object mirrors the JSON structure
for bucket in response.data:
if bucket.group_by and bucket.group_by.queue_id == queue_id:
intervals = bucket.intervals
if intervals:
latest = intervals[-1]
metrics = latest.metrics
# Helper to safely get metric totals
get_total = lambda m: m.total if m and m.total else 0
return {
"waitingCount": get_total(metrics.waiting_count),
"agentCount": get_total(metrics.agent_count),
"availableCount": get_total(metrics.available_count),
"wrapupCount": get_total(metrics.wrapup_count),
"status": "Success"
}
return {"status": "Queue ID mismatch in response"}
except ApiException as e:
if e.status == 401:
raise PermissionError("Authentication failed. Check Client ID/Secret.")
elif e.status == 403:
raise PermissionError("Forbidden. Check OAuth Scopes.")
elif e.status == 429:
raise RuntimeError("Rate limit exceeded.")
else:
raise RuntimeError(f"API Error {e.status}: {e.body}") from e
Complete Working Example
This script combines authentication, SDK usage, and error handling into a single runnable module. Replace the placeholder values with your actual credentials.
import sys
import os
import time
from purecloudplatformclientv2 import (
ApiClient,
Configuration,
QueuesApi,
QueueDetailsQueryRequest
)
from purecloudplatformclientv2.rest import ApiException
# Configuration
TENANT_URL = "https://mycompany.mypurecloud.com"
CLIENT_ID = "your_client_id_here"
CLIENT_SECRET = "your_client_secret_here"
QUEUE_ID = "your_queue_id_here"
def get_queue_status():
"""Main function to retrieve and print queue status."""
# Initialize Configuration
config = Configuration(
host=TENANT_URL,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET
)
# Use context manager to ensure API client is properly closed
with ApiClient(config) as api_client:
queues_api = QueuesApi(api_client)
# Define the query parameters
# Interval: R1/PT1M means "Repeat 1 time, every 1 minute"
# This gives us the most recent 1-minute bucket.
query_body = QueueDetailsQueryRequest(
interval="R1/PT1M",
group_by=["queueId"],
metrics=[
"waitingCount",
"agentCount",
"availableCount",
"wrapupCount",
"handledCount" # Optional: to see volume
],
filter={
"type": "queueIds",
"ids": [QUEUE_ID]
}
)
try:
print(f"Querying stats for Queue: {QUEUE_ID}...")
# Execute the API call
response = queues_api.post_analytics_queues_details_query(body=query_body)
if not response.data:
print("No data returned. The queue may have no activity in the last minute.")
return
# Parse the response
# The response contains a list of buckets, one per groupBy entity (queue)
for bucket in response.data:
if bucket.group_by and bucket.group_by.queue_id == QUEUE_ID:
if not bucket.intervals:
print("No intervals found for this queue.")
return
# Get the most recent interval
latest_interval = bucket.intervals[-1]
metrics = latest_interval.metrics
# Extract values safely
waiting = metrics.waiting_count.total if metrics.waiting_count else 0
agents = metrics.agent_count.total if metrics.agent_count else 0
available = metrics.available_count.total if metrics.available_count else 0
wrapup = metrics.wrapup_count.total if metrics.wrapup_count else 0
print("\n--- Queue Status Report ---")
print(f"Queue ID: {QUEUE_ID}")
print(f"Timestamp: {latest_interval.from_} to {latest_interval.to_}")
print(f"Waiting Interactions: {waiting}")
print(f"Total Agents Logged In: {agents}")
print(f"Available Agents: {available}")
print(f"Agents in Wrap-Up: {wrapup}")
print("-------------------------\n")
return {
"waiting": waiting,
"available": available
}
print(f"Queue ID {QUEUE_ID} not found in response data.")
except ApiException as e:
print(f"Error: {e.status}")
print(f"Reason: {e.reason}")
print(f"Body: {e.body}")
sys.exit(1)
if __name__ == "__main__":
get_queue_status()
Common Errors & Debugging
Error: 403 Forbidden
Cause: The OAuth token does not have the required scopes.
Fix: Ensure your OAuth client in the Genesys Cloud Admin Console has the analytics:queues:read scope assigned. If you are using a user-based token, ensure the user has the Analytics role or a custom role with this permission.
Error: 429 Too Many Requests
Cause: You have exceeded the rate limit for the Statistics API. Genesys Cloud enforces strict rate limits on analytics endpoints because they are computationally expensive.
Fix: Implement exponential backoff. Do not poll this endpoint every second. For real-time monitoring, a polling interval of 15-30 seconds is recommended. If you need higher frequency, consider using Webhooks for queue events instead of polling.
import time
def poll_with_backoff(func, max_retries=5):
for attempt in range(max_retries):
try:
return func()
except RuntimeError as e:
if "429" in str(e):
wait_time = 2 ** attempt
print(f"Rate limited. Waiting {wait_time} seconds...")
time.sleep(wait_time)
else:
raise
raise RuntimeError("Max retries exceeded due to rate limiting.")
Error: Empty Data or Zero Counts
Cause: The interval parameter is too short, or there is no activity in the queue during that window.
Fix: The Statistics API aggregates data. If you query R1/PT1M (last 1 minute) and no calls arrived in that minute, waitingCount might be 0 even if there are currently waiting calls, depending on how the aggregation window aligns with the current second.
Recommendation: Use R1/PT5M (last 5 minutes) for a more stable view, or combine this with the /api/v2/conversations/queues endpoint if you need strictly real-time conversation counts rather than historical statistics. Note that /api/v2/conversations/queues provides the current count of waiting interactions instantly, while the Statistics API provides aggregated metrics over time.
If you need the absolute current number of waiting calls right now, the Statistics API is not the best tool. Use the Conversations API instead:
# Alternative for strictly real-time waiting count
from purecloudplatformclientv2 import ConversationsApi
# ... inside ApiClient context ...
conversations_api = ConversationsApi(api_client)
# This endpoint returns the current state of the queue
queue_status = conversations_api.get_conversations_queues(queue_id=QUEUE_ID)
# queue_status.waiting_count gives the exact current number