How to Get Real-Time Queue Observation Data via the Genesys Cloud Statistics API
What You Will Build
- You will build a Python script that retrieves real-time queue metrics, specifically the number of waiting interactions and available agents, for a specified Genesys Cloud organization.
- This tutorial uses the Genesys Cloud CX Statistics API v2 endpoint /api/v2/analytics/queues/details/query to fetch aggregated queue data.
- The implementation is written in Python 3.9+ using the requests library for HTTP communication and standard JSON parsing.
Prerequisites
Before running the code, ensure you have the following credentials and environment setup:
- OAuth Client Credentials: You need a Genesys Cloud OAuth client with the public or confidential flow capability. For server-side scripts, the Client Credentials Grant flow is recommended as it requires no user interaction.
- Required OAuth Scopes: The client must have the analytics:queue:read scope. Without this scope, the API will return a 403 Forbidden error.
- Python Version: Python 3.9 or higher.
- Dependencies: The requests library. Install it via pip: pip install requests
Authentication Setup
Genesys Cloud APIs require a valid OAuth 2.0 bearer token for every request. For backend services and scripts, the Client Credentials Grant is the usual choice because it does not require a human to log in: the script exchanges its client ID and secret directly for a token.
The following function handles the token acquisition. It sends a POST request to the Genesys Cloud authorization server. Note that the token endpoint is https://login.mypurecloud.com/oauth/token for the US East region. Adjust the domain for other regions (e.g., login.mypurecloud.ie for the EU (Ireland) region).
import requests
import time
import json
from typing import Optional, Dict, Any
# Configuration constants
GENESYS_DOMAIN = "https://api.mypurecloud.com"
AUTH_DOMAIN = "https://login.mypurecloud.com"
CLIENT_ID = "your_client_id_here"
CLIENT_SECRET = "your_client_secret_here"
SCOPES = ["analytics:queue:read"]
def get_access_token() -> str:
    """
    Retrieves an OAuth2 access token using the Client Credentials flow.
    Returns the token string. Raises an exception if authentication fails.
    """
    token_url = f"{AUTH_DOMAIN}/oauth/token"
    # The client credentials grant requires a specific body format
    payload = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scope": " ".join(SCOPES)
    }
    response = requests.post(token_url, data=payload)
    if response.status_code != 200:
        raise Exception(f"Authentication failed with status {response.status_code}: {response.text}")
    token_data = response.json()
    return token_data["access_token"]

def get_headers() -> Dict[str, str]:
    """
    Returns the standard headers required for Genesys Cloud API calls.
    Includes the Bearer token and application/json content type.
    """
    token = get_access_token()
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
Note on Token Caching: In a production application, you should not call get_access_token() on every API request. The token response includes an expires_in field giving the token's lifetime in seconds. Implement a simple cache that reuses the current token until shortly before that lifetime elapses, and only then requests a new one. For this tutorial, we assume a single-run script where a fresh token is acceptable.
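The caching idea in the note above can be sketched as a small wrapper. This is a minimal sketch, not part of any Genesys SDK: TokenCache, fetch_token, and the 60-second safety margin are illustrative names and values, and fetch_token is assumed to return the parsed token response (a dict containing access_token and expires_in).

```python
import time
from typing import Any, Callable, Dict, Optional


class TokenCache:
    """Caches an access token until shortly before it expires (hypothetical helper)."""

    def __init__(
        self,
        fetch_token: Callable[[], Dict[str, Any]],
        margin_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        # fetch_token is assumed to return the parsed JSON token response,
        # i.e. a dict with "access_token" and "expires_in" (in seconds).
        self._fetch_token = fetch_token
        self._margin = margin_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Returns the cached token, refreshing it when missing or near expiry."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            data = self._fetch_token()
            self._token = data["access_token"]
            self._expires_at = now + float(data["expires_in"])
        return self._token
```

In this tutorial's code, get_headers() could call cache.get() in place of get_access_token(), with fetch_token posting to the token endpoint and returning response.json(). The clock parameter exists only so the expiry logic can be exercised without waiting.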
Implementation
Step 1: Constructing the Queue Details Query
The Genesys Cloud Statistics API uses a query-based model. Instead of simple GET parameters, you send a JSON body describing what data you want. For real-time data, you must specify an intervalType of realtime.
The endpoint is POST /api/v2/analytics/queues/details/query.
Key parameters in the request body:
- intervalType: Set to realtime to get current snapshot data.
- groupBy: Set to ["queueId"] to aggregate data per queue.
- view: Set to a (Averaged) or r (Raw). For real-time counts, a is typically sufficient.
- select: This is the most critical part. You must explicitly list the metrics you want. Common metrics for this use case are:
  - waiting: The number of interactions currently waiting in the queue.
  - agentsAvailable: The number of agents currently available to take interactions.
  - agentsBusy: The number of agents currently on a call or interaction.
- where: Optional filters. You can filter by specific queue IDs if you do not want data for all queues.
def build_queue_query(queue_ids: Optional[list] = None) -> Dict[str, Any]:
    """
    Constructs the JSON body for the queue details query.
    Args:
        queue_ids: Optional list of queue IDs to filter. If None, returns data for all queues.
    Returns:
        A dictionary representing the query payload.
    """
    query_body = {
        "intervalType": "realtime",
        "groupBy": ["queueId"],
        "view": "a",
        "select": [
            "waiting",
            "agentsAvailable",
            "agentsBusy",
            "talkTime",
            "holdTime",
            "wrapupTime"
        ],
        "metrics": []
    }
    # Apply filter if specific queues are requested
    if queue_ids:
        query_body["where"] = [
            {
                "dimension": "queueId",
                "operator": "in",
                "values": queue_ids
            }
        ]
    return query_body
Step 2: Executing the API Request
Now we combine the authentication headers and the query body to make the actual API call. We use requests.post because this endpoint accepts a JSON body.
We must handle potential HTTP errors explicitly:
- 401 Unauthorized: Invalid or expired token.
- 403 Forbidden: Missing analytics:queue:read scope.
- 422 Unprocessable Entity: Invalid query structure (e.g., missing intervalType).
- 429 Too Many Requests: Rate limiting. The response headers will contain Retry-After.
def fetch_realtime_queue_data(queue_ids: Optional[list] = None) -> Dict[str, Any]:
    """
    Fetches real-time queue statistics from Genesys Cloud.
    Args:
        queue_ids: Optional list of queue IDs.
    Returns:
        The JSON response from the API.
    """
    endpoint = f"{GENESYS_DOMAIN}/api/v2/analytics/queues/details/query"
    headers = get_headers()
    payload = build_queue_query(queue_ids)
    try:
        response = requests.post(endpoint, json=payload, headers=headers)
        # Raise requests.exceptions.HTTPError for any 4xx/5xx status
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        status_code = e.response.status_code
        error_text = e.response.text
        if status_code == 401:
            raise Exception("Authentication failed. Check Client ID, Secret, and Token validity.") from e
        elif status_code == 403:
            raise Exception("Forbidden. Ensure the OAuth client has the 'analytics:queue:read' scope.") from e
        elif status_code == 422:
            raise Exception(f"Unprocessable Entity. The query structure is invalid. Response: {error_text}") from e
        elif status_code == 429:
            retry_after = e.response.headers.get("Retry-After", 1)
            raise Exception(f"Rate limited. Please wait {retry_after} seconds before retrying.") from e
        else:
            raise Exception(f"HTTP Error {status_code}: {error_text}") from e
    except requests.exceptions.RequestException as e:
        # Connection failures, timeouts, and other non-HTTP errors
        raise Exception(f"Network error occurred: {str(e)}") from e
Step 3: Processing the Results
The response from the Statistics API is nested. The data resides in the entities array. Each entity represents a row of aggregated data. Since we grouped by queueId, each entity corresponds to one queue.
The metrics are located inside the metrics object within each entity. The values are often returned as integers or floats depending on the metric type.
def process_queue_response(response_data: Dict[str, Any]) -> list:
    """
    Parses the raw API response into a cleaner list of dictionaries.
    Args:
        response_data: The JSON response from fetch_realtime_queue_data.
    Returns:
        A list of dictionaries, each containing queueId, waiting, agentsAvailable, and agentsBusy.
    """
    results = []
    # The API returns entities in a nested structure
    entities = response_data.get("entities", [])
    for entity in entities:
        queue_id = entity.get("queueId")
        metrics = entity.get("metrics", {})
        # Extract specific metrics, providing defaults if missing
        waiting_count = metrics.get("waiting", 0)
        agents_available = metrics.get("agentsAvailable", 0)
        agents_busy = metrics.get("agentsBusy", 0)
        # Format the output
        queue_status = {
            "queueId": queue_id,
            "waiting": waiting_count,
            "agentsAvailable": agents_available,
            "agentsBusy": agents_busy
        }
        results.append(queue_status)
    return results
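To see the parsing step without calling the API, here is a sketch run against a hand-written sample payload. The sample mirrors the response shape this tutorial describes (an entities array whose items carry queueId and metrics) and is an assumption for illustration, not captured API output. summarize_queues repeats the logic of process_queue_response so the block runs on its own, and queues_without_cover is a hypothetical helper showing one way the flattened rows might be used.

```python
from typing import Any, Dict, List

# Hand-written sample that mirrors the response shape described in this
# tutorial. It is an assumption for illustration, not captured API output.
SAMPLE_RESPONSE: Dict[str, Any] = {
    "entities": [
        {"queueId": "queue-a", "metrics": {"waiting": 4, "agentsAvailable": 0, "agentsBusy": 3}},
        {"queueId": "queue-b", "metrics": {"waiting": 0, "agentsAvailable": 2}},
    ]
}


def summarize_queues(response_data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flattens each entity into one row, defaulting missing metrics to 0."""
    rows = []
    for entity in response_data.get("entities", []):
        metrics = entity.get("metrics", {})
        rows.append({
            "queueId": entity.get("queueId"),
            "waiting": metrics.get("waiting", 0),
            "agentsAvailable": metrics.get("agentsAvailable", 0),
            "agentsBusy": metrics.get("agentsBusy", 0),
        })
    return rows


def queues_without_cover(rows: List[Dict[str, Any]]) -> List[str]:
    """Returns IDs of queues with waiting interactions but no available agents."""
    return [r["queueId"] for r in rows if r["waiting"] > 0 and r["agentsAvailable"] == 0]


if __name__ == "__main__":
    rows = summarize_queues(SAMPLE_RESPONSE)
    print(queues_without_cover(rows))
```

Defaulting a missing metric to 0 keeps the rows uniform, at the cost of hiding the difference between "zero" and "not reported"; use None as the default if that distinction matters to you.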
Complete Working Example
The following script combines all previous steps into a single runnable module. It fetches real-time data for all queues in the organization and prints a summary.
import requests
import json
import sys
from typing import Optional, Dict, Any, List
# --- Configuration ---
GENESYS_DOMAIN = "https://api.mypurecloud.com"
AUTH_DOMAIN = "https://login.mypurecloud.com"
CLIENT_ID = "YOUR_CLIENT_ID"
CLIENT_SECRET = "YOUR_CLIENT_SECRET"
SCOPES = ["analytics:queue:read"]
# --- Authentication Module ---
def get_access_token() -> str:
    """Retrieves an OAuth2 access token using Client Credentials flow."""
    token_url = f"{AUTH_DOMAIN}/oauth/token"
    payload = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scope": " ".join(SCOPES)
    }
    try:
        response = requests.post(token_url, data=payload)
        response.raise_for_status()
        token_data = response.json()
        return token_data["access_token"]
    except requests.exceptions.HTTPError as e:
        print(f"Authentication Error: {e.response.status_code} - {e.response.text}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Unexpected error during auth: {e}", file=sys.stderr)
        sys.exit(1)

def get_headers() -> Dict[str, str]:
    """Returns headers with Bearer token."""
    token = get_access_token()
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
# --- Query Builder Module ---
def build_queue_query(queue_ids: Optional[List[str]] = None) -> Dict[str, Any]:
    """Constructs the query body for real-time queue statistics."""
    query_body = {
        "intervalType": "realtime",
        "groupBy": ["queueId"],
        "view": "a",
        "select": [
            "waiting",
            "agentsAvailable",
            "agentsBusy"
        ]
    }
    if queue_ids:
        query_body["where"] = [
            {
                "dimension": "queueId",
                "operator": "in",
                "values": queue_ids
            }
        ]
    return query_body
# --- API Execution Module ---
def fetch_realtime_queue_data(queue_ids: Optional[List[str]] = None) -> Dict[str, Any]:
    """Fetches real-time queue data from Genesys Cloud."""
    endpoint = f"{GENESYS_DOMAIN}/api/v2/analytics/queues/details/query"
    headers = get_headers()
    payload = build_queue_query(queue_ids)
    try:
        response = requests.post(endpoint, json=payload, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        status_code = e.response.status_code
        if status_code == 429:
            retry_after = e.response.headers.get("Retry-After", "unknown")
            print(f"Rate Limited (429). Retry after {retry_after}s.", file=sys.stderr)
        elif status_code == 403:
            print("Forbidden (403). Check OAuth scopes.", file=sys.stderr)
        else:
            print(f"HTTP Error {status_code}: {e.response.text}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Request failed: {e}", file=sys.stderr)
        sys.exit(1)
# --- Data Processing Module ---
def display_queue_status(data: Dict[str, Any]) -> None:
    """Parses and prints the queue status."""
    entities = data.get("entities", [])
    if not entities:
        print("No queue data returned. Ensure queues exist and data is available.")
        return
    print(f"{'Queue ID':<35} | {'Waiting':<10} | {'Avail. Agents':<15} | {'Busy Agents':<12}")
    print("-" * 80)
    for entity in entities:
        queue_id = entity.get("queueId", "Unknown")
        metrics = entity.get("metrics", {})
        waiting = metrics.get("waiting", 0)
        avail = metrics.get("agentsAvailable", 0)
        busy = metrics.get("agentsBusy", 0)
        # Truncate queue ID for display if it is long
        display_id = queue_id[:35]
        print(f"{display_id:<35} | {waiting:<10} | {avail:<15} | {busy:<12}")
# --- Main Execution ---
def main():
    """Main entry point."""
    print("Fetching real-time queue statistics...")
    # Optional: Filter by specific queue IDs
    # specific_queues = ["queue-id-1", "queue-id-2"]
    specific_queues = None
    try:
        raw_data = fetch_realtime_queue_data(specific_queues)
        display_queue_status(raw_data)
    except Exception as e:
        print(f"Fatal error: {e}", file=sys.stderr)
        sys.exit(1)

if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 403 Forbidden
Cause: The OAuth client used to generate the token does not have the analytics:queue:read scope assigned.
Fix:
- Log in to the Genesys Cloud Admin portal.
- Navigate to Admin > Integrations > OAuth.
- Select your client.
- Under Scopes, ensure analytics:queue:read is checked.
- Regenerate the token.
Error: 422 Unprocessable Entity
Cause: The JSON body sent to the analytics query endpoint is malformed. Common issues include:
- Missing intervalType.
- Invalid groupBy values.
- Requesting a metric that is not supported for the realtime view.
Fix: Validate your JSON payload against the API specification. Ensure intervalType is exactly "realtime".
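A pre-flight check can catch the issues listed above before the request is sent. The following is a minimal sketch: find_query_problems is a hypothetical helper, and its rules encode only what this tutorial states about the body (intervalType, groupBy, select). It is not an official schema, so the API may still reject a body that passes it.

```python
from typing import Any, Dict, List


def find_query_problems(query_body: Dict[str, Any]) -> List[str]:
    """Returns human-readable problems found in a query body (empty if none).

    The checks reflect only the fields this tutorial describes; they are
    assumptions for illustration, not the API's real validation rules.
    """
    problems = []
    if query_body.get("intervalType") != "realtime":
        problems.append('intervalType must be exactly "realtime"')
    group_by = query_body.get("groupBy")
    if not isinstance(group_by, list) or not group_by:
        problems.append("groupBy must be a non-empty list")
    select = query_body.get("select")
    if not isinstance(select, list) or not select:
        problems.append("select must list at least one metric")
    elif not all(isinstance(name, str) and name for name in select):
        problems.append("select entries must be non-empty strings")
    return problems


if __name__ == "__main__":
    body = {"intervalType": "Realtime", "groupBy": ["queueId"], "select": ["waiting"]}
    for problem in find_query_problems(body):
        print(problem)
```

Returning a list of problems instead of raising on the first one lets a caller report every issue in a single pass.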
Error: 429 Too Many Requests
Cause: You have exceeded the rate limit for the Statistics API. Genesys Cloud enforces rate limits to protect platform stability.
Fix: Implement exponential backoff. Check the Retry-After header in the response. Do not retry immediately.
# Example retry logic snippet
import time
import requests

def fetch_with_retry(endpoint, headers, payload, max_retries=3):
    for attempt in range(max_retries):
        response = requests.post(endpoint, json=payload, headers=headers)
        if response.status_code == 429:
            # Prefer the server's Retry-After; fall back to exponential backoff
            retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
            print(f"Rate limited. Waiting {retry_after} seconds...")
            time.sleep(retry_after)
            continue
        return response
    raise Exception("Max retries exceeded")
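The snippet above passes Retry-After straight to int(). HTTP allows that header to be either a number of seconds or an HTTP-date, so int() can raise ValueError on a date value. As a hedged sketch, the delay calculation can be pulled into a helper that handles both forms and falls back to capped exponential backoff. retry_delay, base, cap, and now are hypothetical names introduced here; whether Genesys Cloud ever sends the date form is not something this tutorial establishes.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_delay(retry_after: Optional[str], attempt: int,
                base: float = 1.0, cap: float = 60.0,
                now: Optional[datetime] = None) -> float:
    """Returns seconds to wait before the next attempt (hypothetical helper)."""
    # Exponential backoff, capped so late attempts do not wait unboundedly.
    backoff = min(cap, base * (2 ** attempt))
    if not retry_after:
        return backoff
    value = retry_after.strip()
    if value.isdigit():
        # delay-seconds form: respect the server's value as given.
        return float(value)
    try:
        # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Unparseable header: ignore it and use the backoff value.
        return backoff
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    current = now or datetime.now(timezone.utc)
    return max(0.0, (when - current).total_seconds())
```

Inside a retry loop this would replace the int(...) expression, for example time.sleep(retry_delay(response.headers.get("Retry-After"), attempt)). The now parameter exists only to make the date branch testable.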
Error: Empty Entities List
Cause: The query executed successfully, but no data was returned.
Fix:
- Verify that the queues specified in the where clause actually exist.
- Ensure that the queues are currently active and have had recent activity. Real-time data is only available for queues that have processed interactions in the recent window.
- Check if you are querying across regions. Each Genesys Cloud region has its own domain, and the API and login hosts must match your organization's region. For example, if your queues are in the EU (Ireland) region, your API domain is api.mypurecloud.ie.
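One way to avoid a region mismatch is to derive both hosts from a single region setting instead of editing two constants by hand. The sketch below assumes the usual pattern of prefixing the region's base domain with login. and api.; REGION_DOMAINS and region_hosts are hypothetical names, and the table is a partial list, so confirm your organization's domain in the Genesys Cloud region documentation.

```python
from typing import Dict

# Partial list of Genesys Cloud region base domains (illustrative, not
# exhaustive). Check the official region list for your organization's value.
REGION_DOMAINS: Dict[str, str] = {
    "us-east-1": "mypurecloud.com",
    "us-west-2": "usw2.pure.cloud",
    "eu-west-1": "mypurecloud.ie",
    "eu-central-1": "mypurecloud.de",
    "ap-southeast-2": "mypurecloud.com.au",
    "ap-northeast-1": "mypurecloud.jp",
}


def region_hosts(region: str) -> Dict[str, str]:
    """Returns the login and API base URLs for a region key in REGION_DOMAINS."""
    base = REGION_DOMAINS.get(region)
    if base is None:
        raise ValueError(f"Unknown region: {region!r}")
    return {"auth": f"https://login.{base}", "api": f"https://api.{base}"}


if __name__ == "__main__":
    hosts = region_hosts("eu-west-1")
    print(hosts["auth"], hosts["api"])
```

The returned values could then feed the AUTH_DOMAIN and GENESYS_DOMAIN constants used throughout this tutorial, so the two can never point at different regions.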