Querying Genesys Cloud Analytics: Aggregating Conversations by Queue and Media Type
What You Will Build
- A Python script that queries the Genesys Cloud Analytics API to retrieve conversation metrics aggregated by queue ID and media type.
- The solution uses the
genesys-cloud-pythonSDK and the/api/v2/analytics/conversations/details/queryendpoint. - The tutorial covers Python 3.9+ and demonstrates proper OAuth2 authentication, query construction, and result parsing.
Prerequisites
- OAuth Client Type: Private Key (JWT) or Client Credentials. For this tutorial, we assume a Private Key setup, which is standard for server-to-server integrations.
- Required Scopes:
analytics:conversation:readandanalytics:queue:read. - SDK Version:
genesys-cloud-pythonversion 2.0.0 or higher. - Runtime: Python 3.9 or higher.
- Dependencies:
genesys-cloud-pythonpython-dotenv(for secure credential management)
Install the dependencies using pip:
pip install genesys-cloud-python python-dotenv
Authentication Setup
Genesys Cloud uses OAuth 2.0. The Python SDK handles the token acquisition and refresh cycles automatically if you provide the correct configuration. You must store your private key securely. Never hardcode private keys in source code.
Create a .env file in your project root:
GENESYS_CLOUD_REGION=us-east-1 # Adjust to your region
GENESYS_CLOUD_CLIENT_ID=your-client-id-here
GENESYS_CLOUD_PRIVATE_KEY_PATH=./path/to/your/private.key
Initialize the SDK client in your script:
import os
from dotenv import load_dotenv
from purerecl import PureCloudPlatformClientV2
from purerecl.auth import OAuthClient
# Load environment variables
load_dotenv()
def init_client():
"""
Initializes the Genesys Cloud SDK client.
Raises an exception if authentication fails.
"""
region = os.getenv("GENESYS_CLOUD_REGION")
client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
private_key_path = os.getenv("GENESYS_CLOUD_PRIVATE_KEY_PATH")
if not all([region, client_id, private_key_path]):
raise ValueError("Missing required environment variables.")
# The SDK uses the default region configuration based on the provided region string
platform_client = PureCloudPlatformClientV2(region)
# Configure the OAuth client
oauth_client = OAuthClient(
client_id=client_id,
private_key_path=private_key_path
)
# Authenticate and attach to the platform client
oauth_client.authenticate()
platform_client.set_oauth_client(oauth_client)
return platform_client
if __name__ == "__main__":
try:
client = init_client()
print("Authentication successful.")
except Exception as e:
print(f"Authentication failed: {e}")
Implementation
Step 1: Constructing the Query Body
The Analytics API requires a specific JSON structure to define the aggregation. The core object is QueryDefinition. To group by queue and media type, you must specify the groupBy array.
Key parameters:
groupBy: An array of strings. Use"queue"for queue-level aggregation and"mediaType"for media type (voice, chat, etc.).timeframe: A string defining the start and end of the query window (ISO 8601 format).interval: Optional. If omitted, the API returns a single aggregated result for the entire timeframe. If provided (e.g.,"PT1H"), it breaks results down by time intervals. For this tutorial, we will omit it to get a single snapshot.select: The metrics you want to retrieve. Common metrics include"totalHandleTime","abandonCount", and"serviceLevel".
Here is the construction of the query body:
from purerecl.models import AnalyticsQueryRequest, QueryDefinition
def build_query_request(start_time: str, end_time: str) -> AnalyticsQueryRequest:
"""
Constructs the AnalyticsQueryRequest object.
Args:
start_time: ISO 8601 start time (e.g., "2023-10-01T00:00:00Z")
end_time: ISO 8601 end time (e.g., "2023-10-02T00:00:00Z")
Returns:
AnalyticsQueryRequest object ready for the API call.
"""
# Define the query definition
query_definition = QueryDefinition(
group_by=["queue", "mediaType"],
timeframe=f"{start_time}/{end_time}",
# Select specific metrics.
# Note: You can also use "*" to select all available metrics,
# but it is best practice to select only what you need.
select=[
"totalHandleTime",
"abandonCount",
"offerCount",
"answeredCount"
]
)
# Wrap in the request object
analytics_request = AnalyticsQueryRequest(
query=query_definition
)
return analytics_request
Step 2: Executing the Query
The analytics_api.query_conversations_details method executes the query. This endpoint supports pagination via the next_page token. Since we are grouping by queue and media type, the result set could be large if you have many queues.
The SDK returns a QueryResponse object. This object contains:
entities: A list ofConversationDetailobjects.next_page: A token for the next page of results, if applicable.total: The total number of entities found.
from purerecl.apis import AnalyticsApi
from purerecl.exceptions import ApiException
def fetch_aggregated_data(client: PureCloudPlatformClientV2, request: AnalyticsQueryRequest):
"""
Fetches aggregated conversation data, handling pagination.
Args:
client: The authenticated PureCloudPlatformClientV2 instance.
request: The AnalyticsQueryRequest object.
Returns:
A list of ConversationDetail objects.
"""
analytics_api = AnalyticsApi(client)
all_results = []
# Initial request
try:
response = analytics_api.query_conversations_details(body=request)
except ApiException as e:
print(f"API Error: {e.status} - {e.reason}")
if e.status == 401:
print("Check your OAuth token or credentials.")
elif e.status == 403:
print("Check your OAuth scopes. You need 'analytics:conversation:read'.")
elif e.status == 429:
print("Rate limited. Implement exponential backoff.")
raise
# Process the first page
if response.entities:
all_results.extend(response.entities)
# Handle pagination
next_page = response.next_page
while next_page:
try:
# Use the next_page token in the request
response = analytics_api.query_conversations_details(body=request, next_page=next_page)
if response.entities:
all_results.extend(response.entities)
next_page = response.next_page
except ApiException as e:
print(f"Pagination Error: {e.status} - {e.reason}")
break
return all_results
Step 3: Processing the Results
The ConversationDetail object returned by the API contains the aggregated metrics. Because we grouped by queue and mediaType, each entity in the entities list represents a unique combination of a queue and a media type.
Key fields in ConversationDetail:
queue: Containsidandnameof the queue.mediaType: The string identifier for the media type (e.g.,"voice","chat","sms").totalHandleTime: Total handle time in milliseconds.abandonCount: Number of abandoned conversations.
from purerecl.models import ConversationDetail
def process_results(conversation_details: list[ConversationDetail]):
"""
Iterates through the results and prints a formatted summary.
Args:
conversation_details: List of ConversationDetail objects.
"""
print(f"{'Queue Name':<20} | {'Media Type':<10} | {'Handled':<10} | {'Abandoned':<10} | {'Avg Handle (ms)':<15}")
print("-" * 70)
for detail in conversation_details:
queue_name = detail.queue.name if detail.queue else "Unknown Queue"
media_type = detail.media_type if detail.media_type else "Unknown"
# Calculate metrics safely
answered = detail.answered_count if detail.answered_count else 0
abandoned = detail.abandon_count if detail.abandon_count else 0
total_handle = detail.total_handle_time if detail.total_handle_time else 0
# Calculate average handle time only if there were answered conversations
avg_handle = (total_handle / answered) if answered > 0 else 0
print(f"{queue_name:<20} | {media_type:<10} | {answered:<10} | {abandoned:<10} | {avg_handle:<15.2f}")
Complete Working Example
Below is the full, runnable script. Replace the placeholder values in the .env file with your actual credentials.
import os
import sys
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
# Import Genesys Cloud SDK components
from purerecl import PureCloudPlatformClientV2
from purerecl.auth import OAuthClient
from purerecl.apis import AnalyticsApi
from purerecl.models import AnalyticsQueryRequest, QueryDefinition
from purerecl.exceptions import ApiException
def load_config():
"""Loads environment variables from .env file."""
load_dotenv()
config = {
"region": os.getenv("GENESYS_CLOUD_REGION"),
"client_id": os.getenv("GENESYS_CLOUD_CLIENT_ID"),
"private_key_path": os.getenv("GENESYS_CLOUD_PRIVATE_KEY_PATH")
}
for key, value in config.items():
if not value:
raise ValueError(f"Missing required config: {key}")
return config
def init_client(config: dict) -> PureCloudPlatformClientV2:
"""Initializes and authenticates the SDK client."""
platform_client = PureCloudPlatformClientV2(config["region"])
oauth_client = OAuthClient(
client_id=config["client_id"],
private_key_path=config["private_key_path"]
)
oauth_client.authenticate()
platform_client.set_oauth_client(oauth_client)
return platform_client
def build_query() -> AnalyticsQueryRequest:
"""Builds the analytics query for the last 7 days."""
now = datetime.now(timezone.utc)
start_time = now - timedelta(days=7)
# Format to ISO 8601
start_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
end_str = now.strftime("%Y-%m-%dT%H:%M:%SZ")
query_definition = QueryDefinition(
group_by=["queue", "mediaType"],
timeframe=f"{start_str}/{end_str}",
select=[
"totalHandleTime",
"abandonCount",
"offerCount",
"answeredCount"
]
)
return AnalyticsQueryRequest(query=query_definition)
def fetch_data(client: PureCloudPlatformClientV2, request: AnalyticsQueryRequest) -> list:
"""Fetches data with pagination support."""
analytics_api = AnalyticsApi(client)
all_results = []
next_page = None
while True:
try:
if next_page:
response = analytics_api.query_conversations_details(body=request, next_page=next_page)
else:
response = analytics_api.query_conversations_details(body=request)
if response.entities:
all_results.extend(response.entities)
next_page = response.next_page
if not next_page:
break
except ApiException as e:
print(f"Error fetching data: {e.status} - {e.reason}")
if e.status == 403:
print("Ensure 'analytics:conversation:read' scope is granted.")
sys.exit(1)
return all_results
def print_results(results: list):
"""Prints formatted results."""
if not results:
print("No data found for the selected timeframe.")
return
print(f"{'Queue Name':<25} | {'Media Type':<10} | {'Handled':<10} | {'Abandoned':<10} | {'Avg Handle (ms)':<15}")
print("-" * 80)
for detail in results:
queue_name = detail.queue.name if detail.queue else "Unassigned"
media_type = detail.media_type if detail.media_type else "Unknown"
answered = detail.answered_count if detail.answered_count else 0
abandoned = detail.abandon_count if detail.abandon_count else 0
total_handle = detail.total_handle_time if detail.total_handle_time else 0
avg_handle = (total_handle / answered) if answered > 0 else 0
# Truncate queue name if too long
if len(queue_name) > 24:
queue_name = queue_name[:21] + "..."
print(f"{queue_name:<25} | {media_type:<10} | {answered:<10} | {abandoned:<10} | {avg_handle:<15.2f}")
if __name__ == "__main__":
try:
config = load_config()
client = init_client(config)
print("Authentication successful.")
request = build_query()
print(f"Querying analytics for last 7 days...")
results = fetch_data(client, request)
print_results(results)
except Exception as e:
print(f"Fatal error: {e}")
sys.exit(1)
Common Errors & Debugging
Error: 403 Forbidden
Cause: The OAuth token does not have the required scopes.
Fix: Ensure your OAuth client in the Genesys Cloud Admin Console has the analytics:conversation:read scope. If you are using a Private Key client, go to Admin > Security > OAuth Clients, select your client, and add the scope.
Code Check:
# In your OAuthClient initialization, scopes are granted by the client configuration,
# not the code itself. However, you can verify the token info if needed.
token_info = oauth_client.get_token_info()
print(token_info.scopes) # Should include 'analytics:conversation:read'
Error: 429 Too Many Requests
Cause: You have exceeded the API rate limit. Analytics queries are heavy and have stricter limits than standard CRUD operations.
Fix: Implement exponential backoff. The SDK does not automatically retry 429s for analytics queries.
Code Fix:
import time
def fetch_with_retry(client, request, max_retries=3):
analytics_api = AnalyticsApi(client)
for attempt in range(max_retries):
try:
return analytics_api.query_conversations_details(body=request)
except ApiException as e:
if e.status == 429:
wait_time = 2 ** attempt
print(f"Rate limited. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
raise
raise Exception("Max retries exceeded")
Error: Empty Results
Cause: The timeframe is too short, or no conversations occurred in the selected queues.
Fix: Verify the timeframe string format. It must be ISO 8601 with timezone designator (Z). Also, ensure the queues you expect to see are active and have had conversations in that window.
Debugging:
# Print the timeframe being used
print(f"Timeframe: {query_definition.timeframe}")