Querying Genesys Cloud Analytics: Grouping Conversations by Queue and Media Type
What You Will Build
- A script that queries the Genesys Cloud Analytics API to retrieve aggregated conversation metrics.
- The query groups results by
queue.idandmediaTypeto identify volume distribution across channels. - The implementation uses Python with the
requestslibrary for direct HTTP interaction.
Prerequisites
- OAuth Client: A Genesys Cloud OAuth 2.0 client with the following scopes:
analytics:query:readanalytics:conversation:view
- SDK/Library: Python 3.8+ with
requestsandpython-dateutil. - Environment Variables:
GENESYS_REGION: The API region (e.g.,mypurecloud.com,usw2.pure.cloud).GENESYS_CLIENT_ID: Your OAuth Client ID.GENESYS_CLIENT_SECRET: Your OAuth Client Secret.
Authentication Setup
Genesys Cloud uses OAuth 2.0 Client Credentials grant for server-to-server integrations. You must obtain an access token before making any API calls. The token expires after 3600 seconds, so production code should implement token caching or refresh logic. For this tutorial, we will fetch a fresh token at runtime.
import requests
import os
from datetime import datetime, timezone
import json
def get_access_token() -> str:
"""
Authenticates with Genesys Cloud OAuth and returns an access token.
"""
region = os.getenv("GENESYS_REGION", "mypurecloud.com")
client_id = os.getenv("GENESYS_CLIENT_ID")
client_secret = os.getenv("GENESYS_CLIENT_SECRET")
if not client_id or not client_secret:
raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")
# Construct the OAuth token endpoint
token_url = f"https://{region}/oauth/token"
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret
}
response = requests.post(token_url, headers=headers, data=data)
if response.status_code != 200:
raise Exception(f"Authentication failed: {response.status_code} - {response.text}")
token_data = response.json()
return token_data["access_token"]
# Fetch token
access_token = get_access_token()
print("Authentication successful.")
Implementation
Step 1: Constructing the Analytics Query Body
The core of this tutorial is constructing the JSON payload for POST /api/v2/analytics/conversations/details/query. This endpoint allows complex aggregations. To group by queue and media type, we must define groupBys and specify the metrics we want to aggregate.
Key parameters:
dateFrom/dateTo: The time window for the query. Use ISO 8601 format.groupBys: An array of strings defining the dimensions to group by. We usequeue.idandmediaType.metrics: The values to calculate (e.g.,conversationCount,handledCount).filter: Optional filters to narrow down the data (e.g., specific queues or media types).
def build_analytics_query() -> dict:
"""
Constructs the analytics query body for grouping by queue and media type.
"""
# Define the time range: Last 24 hours
now = datetime.now(timezone.utc)
date_to = now.strftime("%Y-%m-%dT%H:%M:%SZ")
date_from = (now - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
# Required: Import timedelta
from datetime import timedelta
query_body = {
"dateFrom": date_from,
"dateTo": date_to,
"interval": "P1D", # Aggregate by day, though we are grouping by dimension
"groupBys": [
"queue.id",
"mediaType"
],
"metrics": [
"conversationCount",
"handledCount",
"abandonedCount",
"averageHandleTime"
],
"filter": {
"type": "and",
"clauses": [
{
"type": "dimension",
"dimension": "mediaType",
"operator": "eq",
"value": ["voice", "chat"] # Optional: Limit to voice and chat
}
]
},
"pageSize": 100,
"pageToken": None
}
return query_body
Step 2: Executing the Query and Handling Pagination
The Analytics API returns paginated results. You must check the nextPageToken in the response to fetch subsequent pages until all data is retrieved. If the query returns no results, the entities array will be empty.
import time
def fetch_analytics_data(access_token: str, query_body: dict) -> list:
"""
Sends the analytics query to Genesys Cloud and handles pagination.
"""
region = os.getenv("GENESYS_REGION", "mypurecloud.com")
api_url = f"https://{region}/api/v2/analytics/conversations/details/query"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"Accept": "application/json"
}
all_results = []
page_token = None
max_retries = 3
while True:
# Update pageToken in body if it exists
if page_token:
query_body["pageToken"] = page_token
else:
query_body["pageToken"] = None
for attempt in range(max_retries):
try:
response = requests.post(api_url, headers=headers, json=query_body)
# Handle Rate Limiting (429)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 1))
print(f"Rate limited. Waiting {retry_after} seconds...")
time.sleep(retry_after)
continue
# Handle Authentication/Authorization Errors
if response.status_code in [401, 403]:
raise Exception(f"Auth Error: {response.status_code} - {response.text}")
# Handle Server Errors
if response.status_code >= 500:
raise Exception(f"Server Error: {response.status_code} - {response.text}")
# Success
break
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
raise
data = response.json()
# Append entities from this page
if "entities" in data and len(data["entities"]) > 0:
all_results.extend(data["entities"])
# Check for next page
page_token = data.get("nextPageToken")
if not page_token:
break
# Small delay to respect rate limits between pages
time.sleep(1)
return all_results
Step 3: Processing and Formatting Results
The response contains an array of entities. Each entity represents a unique combination of the groupBys dimensions (Queue ID and Media Type). You need to map the queue IDs to human-readable names for useful reporting.
First, you may want to fetch the queue names to resolve the IDs.
def get_queue_names(access_token: str, queue_ids: set) -> dict:
"""
Fetches queue details to map IDs to names.
"""
region = os.getenv("GENESYS_REGION", "mypurecloud.com")
api_url = f"https://{region}/api/v2/queues"
headers = {
"Authorization": f"Bearer {access_token}"
}
queue_map = {}
page_token = None
while True:
params = {
"pageSize": 100,
"pageToken": page_token if page_token else ""
}
response = requests.get(api_url, headers=headers, params=params)
if response.status_code != 200:
raise Exception(f"Failed to fetch queues: {response.status_code}")
data = response.json()
for queue in data.get("entities", []):
if queue["id"] in queue_ids:
queue_map[queue["id"]] = queue["name"]
page_token = data.get("nextPageToken")
if not page_token:
break
return queue_map
Now, combine the analytics data with the queue names.
def process_results(analytics_data: list, queue_map: dict) -> list:
"""
Formats the raw analytics entities into a readable structure.
"""
formatted_results = []
for entity in analytics_data:
# Extract dimensions
queue_id = entity.get("dimensions", {}).get("queue.id", {}).get("value", "Unknown")
media_type = entity.get("dimensions", {}).get("mediaType", {}).get("value", "Unknown")
# Resolve queue name
queue_name = queue_map.get(queue_id, queue_id)
# Extract metrics
metrics = entity.get("metrics", {})
formatted_results.append({
"Queue Name": queue_name,
"Queue ID": queue_id,
"Media Type": media_type,
"Conversation Count": metrics.get("conversationCount", {}).get("value", 0),
"Handled Count": metrics.get("handledCount", {}).get("value", 0),
"Abandoned Count": metrics.get("abandonedCount", {}).get("value", 0),
"Average Handle Time (seconds)": metrics.get("averageHandleTime", {}).get("value", 0)
})
return formatted_results
Complete Working Example
Below is the complete, runnable script. Save this as analytics_queue_media.py.
import requests
import os
import json
import time
from datetime import datetime, timezone, timedelta
def get_access_token() -> str:
"""
Authenticates with Genesys Cloud OAuth and returns an access token.
"""
region = os.getenv("GENESYS_REGION", "mypurecloud.com")
client_id = os.getenv("GENESYS_CLIENT_ID")
client_secret = os.getenv("GENESYS_CLIENT_SECRET")
if not client_id or not client_secret:
raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")
token_url = f"https://{region}/oauth/token"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret
}
response = requests.post(token_url, headers=headers, data=data)
if response.status_code != 200:
raise Exception(f"Authentication failed: {response.status_code} - {response.text}")
return response.json()["access_token"]
def get_queue_names(access_token: str, queue_ids: set) -> dict:
"""
Fetches queue details to map IDs to names.
"""
region = os.getenv("GENESYS_REGION", "mypurecloud.com")
api_url = f"https://{region}/api/v2/queues"
headers = {"Authorization": f"Bearer {access_token}"}
queue_map = {}
page_token = None
while True:
params = {"pageSize": 100, "pageToken": page_token if page_token else ""}
response = requests.get(api_url, headers=headers, params=params)
if response.status_code != 200:
raise Exception(f"Failed to fetch queues: {response.status_code}")
data = response.json()
for queue in data.get("entities", []):
if queue["id"] in queue_ids:
queue_map[queue["id"]] = queue["name"]
page_token = data.get("nextPageToken")
if not page_token:
break
return queue_map
def build_analytics_query() -> dict:
"""
Constructs the analytics query body.
"""
now = datetime.now(timezone.utc)
date_to = now.strftime("%Y-%m-%dT%H:%M:%SZ")
date_from = (now - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
return {
"dateFrom": date_from,
"dateTo": date_to,
"interval": "P1D",
"groupBys": ["queue.id", "mediaType"],
"metrics": [
"conversationCount",
"handledCount",
"abandonedCount",
"averageHandleTime"
],
"filter": {
"type": "and",
"clauses": [
{
"type": "dimension",
"dimension": "mediaType",
"operator": "eq",
"value": ["voice", "chat"]
}
]
},
"pageSize": 100,
"pageToken": None
}
def fetch_analytics_data(access_token: str, query_body: dict) -> list:
"""
Sends the analytics query and handles pagination.
"""
region = os.getenv("GENESYS_REGION", "mypurecloud.com")
api_url = f"https://{region}/api/v2/analytics/conversations/details/query"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"Accept": "application/json"
}
all_results = []
page_token = None
max_retries = 3
while True:
if page_token:
query_body["pageToken"] = page_token
else:
query_body["pageToken"] = None
for attempt in range(max_retries):
try:
response = requests.post(api_url, headers=headers, json=query_body)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 1))
time.sleep(retry_after)
continue
if response.status_code not in [200, 207]:
raise Exception(f"API Error: {response.status_code} - {response.text}")
break
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
raise
data = response.json()
if "entities" in data and len(data["entities"]) > 0:
all_results.extend(data["entities"])
page_token = data.get("nextPageToken")
if not page_token:
break
time.sleep(1)
return all_results
def process_results(analytics_data: list, queue_map: dict) -> list:
"""
Formats the raw analytics entities.
"""
formatted_results = []
for entity in analytics_data:
dimensions = entity.get("dimensions", {})
queue_id = dimensions.get("queue.id", {}).get("value", "Unknown")
media_type = dimensions.get("mediaType", {}).get("value", "Unknown")
queue_name = queue_map.get(queue_id, queue_id)
metrics = entity.get("metrics", {})
formatted_results.append({
"Queue Name": queue_name,
"Queue ID": queue_id,
"Media Type": media_type,
"Conversation Count": metrics.get("conversationCount", {}).get("value", 0),
"Handled Count": metrics.get("handledCount", {}).get("value", 0),
"Abandoned Count": metrics.get("abandonedCount", {}).get("value", 0),
"Average Handle Time (seconds)": metrics.get("averageHandleTime", {}).get("value", 0)
})
return formatted_results
if __name__ == "__main__":
try:
# 1. Authenticate
token = get_access_token()
# 2. Build Query
query = build_analytics_query()
# 3. Fetch Data
print("Fetching analytics data...")
raw_data = fetch_analytics_data(token, query)
# 4. Extract Queue IDs to resolve names
queue_ids = set()
for entity in raw_data:
q_id = entity.get("dimensions", {}).get("queue.id", {}).get("value")
if q_id:
queue_ids.add(q_id)
# 5. Resolve Queue Names
if queue_ids:
queue_map = get_queue_names(token, queue_ids)
else:
queue_map = {}
# 6. Process and Print Results
results = process_results(raw_data, queue_map)
if not results:
print("No data found for the specified criteria.")
else:
print("\n--- Analytics Report: Queue & Media Type ---")
print(json.dumps(results, indent=2))
except Exception as e:
print(f"Error: {e}")
Common Errors & Debugging
Error: 400 Bad Request - Invalid Filter
Cause: The filter clause uses an incorrect operator or dimension name. For example, using mediaType instead of mediaType (case-sensitive) or using eq with a single value when the API expects an array.
Fix: Ensure the value in the filter is an array, even if it contains only one item.
# Correct
"value": ["voice"]
# Incorrect
"value": "voice"
Error: 429 Too Many Requests
Cause: The Analytics API has strict rate limits, especially for complex aggregation queries. Sending requests too quickly triggers a 429.
Fix: Implement exponential backoff or respect the Retry-After header. The code above includes a basic retry loop with Retry-After handling.
Error: Empty Results
Cause: The time window (dateFrom/dateTo) might not contain any conversations, or the filter is too restrictive (e.g., filtering for a queue ID that does not exist or has no activity).
Fix:
- Verify the time range is recent (last 24 hours).
- Remove the
filterclause temporarily to ensure data exists. - Check that the OAuth client has permissions to view the specific queues.
Error: 403 Forbidden
Cause: The OAuth client lacks the required scopes.
Fix: Ensure the client has analytics:query:read and analytics:conversation:view scopes assigned in the Genesys Cloud Admin portal under Users > OAuth Clients.