Choosing Between Real-Time Conversations and Analytics Data in Genesys Cloud CX
What You Will Build
You will implement a dual-layer integration that retrieves active, live conversation details using the /api/v2/conversations endpoint and then correlates those live sessions with historical performance metrics using /api/v2/analytics/conversations. You will learn precisely when to query the real-time API for immediate action and when to query the analytics API for reporting or post-processing. This tutorial uses Python with the genesyscloud SDK and the requests library for raw HTTP verification.
Prerequisites
- OAuth Client Type: Service Account (for server-to-server integrations) or Confidential Client (for hybrid apps).
- Required Scopes:
  - conversation:read (for /api/v2/conversations)
  - analytics:conversation:view (for /api/v2/analytics/conversations)
- SDK Version: genesyscloud Python SDK v2.0+ or direct REST API access.
- Runtime: Python 3.8+.
- Dependencies:
pip install genesyscloud requests tenacity
Authentication Setup
Both APIs share the same OAuth 2.0 infrastructure. You must obtain a Bearer token before making any requests. The token lifecycle is critical; if your token expires during a long-running analytics query, the request will fail with a 401.
Below is a robust authentication helper using the requests library. This function handles the initial token fetch and includes a basic cache mechanism to prevent unnecessary re-authentication.
import requests
import time
from typing import Optional

class GenesysAuthManager:
    def __init__(self, org_id: str, client_id: str, client_secret: str,
                 base_url: str = "https://api.mypurecloud.com",
                 login_url: str = "https://login.mypurecloud.com"):
        self.org_id = org_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url
        # Tokens are issued by the region's login host, not the API host.
        # Both defaults target the us-east-1 region; adjust them for your org.
        self.token_url = f"{login_url}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0

    def get_token(self) -> str:
        # Reuse the cached token while it is still valid
        if self.access_token and time.time() < self.token_expiry:
            return self.access_token
        # Fetch a new token with the client credentials grant.
        # The client ID and secret travel as HTTP Basic credentials;
        # requests sets the form-encoded Content-Type for us.
        response = requests.post(
            self.token_url,
            data={"grant_type": "client_credentials"},
            auth=(self.client_id, self.client_secret),
            timeout=10,
        )
        if response.status_code != 200:
            raise Exception(f"Failed to authenticate: {response.text}")
        data = response.json()
        self.access_token = data["access_token"]
        # Subtract 60 seconds from expiry to provide a safety buffer
        self.token_expiry = time.time() + data["expires_in"] - 60
        return self.access_token

    def get_headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_token()}",
            "Content-Type": "application/json",
            "Accept": "application/json",
            "X-Genesys-Organization-ID": self.org_id
        }
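The cache above avoids most expired-token failures, but a token can still be rejected mid-run. The following is a minimal sketch of a retry-once-on-401 wrapper. The names call_with_token_refresh, send, get_token, and invalidate are hypothetical and not part of any Genesys library; send is assumed to be any callable that performs one request with a given token and returns a (status, body) pair.

```python
from typing import Callable, Tuple


def call_with_token_refresh(
    send: Callable[[str], Tuple[int, dict]],
    get_token: Callable[[], str],
    invalidate: Callable[[], None],
) -> Tuple[int, dict]:
    """Send a request; on a 401, drop the cached token and retry exactly once."""
    status, body = send(get_token())
    if status == 401:
        # The token was rejected: force a fresh one, then try a single time more.
        invalidate()
        status, body = send(get_token())
    return status, body
```

With the GenesysAuthManager above, get_token could be auth_manager.get_token and invalidate could reset auth_manager.token_expiry to 0 so the next call fetches a new token. Retrying only once keeps a genuinely bad credential from looping forever.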
Implementation
Step 1: Retrieving Live Conversations via /api/v2/conversations
The /api/v2/conversations endpoint is part of the Real-Time API. It returns metadata about conversations that are currently active (status: active or queued). It does not contain historical data, analytics metrics, or detailed transcript content for older conversations. Use this endpoint when you need to:
- Update a conversation state (e.g., put a call on hold).
- Push data to a screen pop in real-time.
- Monitor current queue depths.
- Trigger immediate actions based on participant events.
Key Constraint: This endpoint is volatile. The data can change between two consecutive requests, so it is not suitable for reporting.
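Because each response is only a snapshot, a poller usually cares about what changed between two snapshots rather than the snapshot itself. Here is a minimal sketch, assuming each conversation is a dict with an "id" key (as the entities in this tutorial are treated); diff_snapshots is a hypothetical helper name.

```python
from typing import Any, Dict, List, Set, Tuple


def diff_snapshots(
    previous: List[Dict[str, Any]],
    current: List[Dict[str, Any]],
) -> Tuple[Set[str], Set[str]]:
    """Return (started, ended) conversation IDs between two polls."""
    prev_ids = {c["id"] for c in previous}
    curr_ids = {c["id"] for c in current}
    started = curr_ids - prev_ids  # present now, absent before
    ended = prev_ids - curr_ids    # present before, absent now
    return started, ended
```

A conversation that starts and ends between two polls is invisible to this approach, which is one more reason to treat polled data as a trigger for action rather than a record for reporting.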
Code Example: Fetching Active Conversations by Type
We will query for all active voice conversations. Notice the use of the type query parameter to filter noise.
import requests
from typing import List, Dict, Any

def get_active_voice_conversations(auth_manager: GenesysAuthManager) -> List[Dict[str, Any]]:
    """
    Fetches all currently active voice conversations.
    Endpoint: GET /api/v2/conversations
    Scope: conversation:read
    """
    url = f"{auth_manager.base_url}/api/v2/conversations"
    headers = auth_manager.get_headers()
    # Query parameters to filter for active voice calls only
    params = {
        "type": "voice",
        "pageSize": 100  # Max page size for this endpoint is typically 100
    }
    try:
        # A timeout keeps a stalled connection from blocking the caller
        response = requests.get(url, headers=headers, params=params, timeout=10)
        response.raise_for_status()  # Raise exception for 4xx/5xx
        data = response.json()
        conversations = data.get("entities", [])
        print(f"Found {len(conversations)} active voice conversations.")
        return conversations
    except requests.exceptions.HTTPError as http_err:
        status = http_err.response.status_code
        if status == 401:
            print("Error: Unauthorized. Check your OAuth token.")
        elif status == 403:
            print("Error: Forbidden. Check your scopes (conversation:read).")
        else:
            print(f"HTTP Error: {http_err}")
        return []
    except Exception as e:
        print(f"An error occurred: {e}")
        return []
Step 2: Querying Historical Data via /api/v2/analytics/conversations
The /api/v2/analytics/conversations endpoint is part of the Analytics API. It returns aggregated or detailed historical data. It is query-based, meaning you must define a time range (from and to). Use this endpoint when you need to:
- Generate reports on handle time, wait time, or resolution rates.
- Retrieve transcripts for conversations that ended hours or days ago.
- Analyze trends over time.
Key Constraint: This endpoint is computationally expensive. It does not return real-time data. There is a delay (usually 1-5 minutes) between a conversation ending and it appearing in analytics.
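One way to account for that delay is to end the query window a few minutes before the current time, so the query never asks for conversations analytics has not ingested yet. The sketch below assumes a 5-minute lag as a default, which is taken from the upper end of the range above and is not a documented guarantee. lag_safe_window is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.000Z"


def lag_safe_window(
    now: datetime,
    window: timedelta,
    lag: timedelta = timedelta(minutes=5),
) -> Tuple[str, str]:
    """Return (from_time, to_time) strings for a window that ends `lag` before `now`.

    `now` must be timezone-aware; it is converted to UTC before formatting.
    """
    end = now.astimezone(timezone.utc) - lag
    start = end - window
    return start.strftime(TIME_FORMAT), end.strftime(TIME_FORMAT)
```

Calling lag_safe_window(datetime.now(timezone.utc), timedelta(hours=1)) yields a pair of strings that can be passed as the from and to values of an analytics query.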
Code Example: Querying Conversation Details for a Specific Time Window
We will construct a POST request to /api/v2/analytics/conversations/details/query. This returns detail records (including data such as wrap-up codes and talk time) for conversations that occurred in a given window, for example the last hour.
def get_historical_conversation_details(auth_manager: GenesysAuthManager, from_time: str, to_time: str) -> List[Dict[str, Any]]:
    """
    Queries analytics for detailed conversation data within a time window.
    Endpoint: POST /api/v2/analytics/conversations/details/query
    Scope: analytics:conversation:view
    """
    url = f"{auth_manager.base_url}/api/v2/analytics/conversations/details/query"
    headers = auth_manager.get_headers()
    # The detail query takes one ISO 8601 interval string ("from/to"),
    # filters expressed as predicates, and a paging object.
    # Confirm field names against the API Explorer for your org.
    body = {
        # e.g., "2023-10-27T10:00:00.000Z/2023-10-27T11:00:00.000Z"
        "interval": f"{from_time}/{to_time}",
        "order": "desc",
        "orderBy": "conversationStart",
        # Keep only conversations that contain a voice segment
        "segmentFilters": [
            {
                "type": "and",
                "predicates": [
                    {
                        "type": "dimension",
                        "dimension": "mediaType",
                        "operator": "matches",
                        "value": "voice"
                    }
                ]
            }
        ],
        "paging": {"pageSize": 100, "pageNumber": 1}  # Number of results per page
    }
    try:
        response = requests.post(url, headers=headers, json=body, timeout=30)
        response.raise_for_status()
        data = response.json()
        # Detail records arrive under 'conversations'; the key is absent when nothing matches
        results = data.get("conversations", [])
        if results:
            print(f"Retrieved {len(results)} historical conversation records.")
        else:
            print("No results found in the analytics query.")
        return results
    except requests.exceptions.HTTPError as http_err:
        status = http_err.response.status_code
        if status == 401:
            print("Error: Unauthorized. Check your OAuth token.")
        elif status == 403:
            print("Error: Forbidden. Check your scopes (analytics:conversation:view).")
        elif status == 400:
            print(f"Bad Request: Check your time format or filter fields. {http_err.response.text}")
        else:
            print(f"HTTP Error: {http_err}")
        return []
    except Exception as e:
        print(f"An error occurred: {e}")
        return []
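The function above reads a single page of at most 100 records. For busier windows, a loop over pages is needed. The following is a minimal sketch that assumes the query accepts a 1-based page number and that a short page signals the end of the results. iter_all_pages and fetch_page are hypothetical names; fetch_page stands for any callable that returns the records for one page.

```python
from typing import Any, Callable, Dict, Iterator, List


def iter_all_pages(
    fetch_page: Callable[[int], List[Dict[str, Any]]],
    page_size: int = 100,
    max_pages: int = 50,
) -> Iterator[Dict[str, Any]]:
    """Yield records page by page until a short page or max_pages is reached."""
    for page_number in range(1, max_pages + 1):
        records = fetch_page(page_number)
        yield from records
        if len(records) < page_size:
            # A page smaller than page_size means there is nothing left to fetch.
            break
```

The max_pages cap is a deliberate safety valve: each page is a separate, relatively expensive analytics request, so an unbounded loop over a wide time window can consume a rate-limit budget quickly.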
Step 3: Correlating Live and Historical Data
The most common integration pattern is to use the Real-Time API to detect an event, then use the Analytics API to enrich that event with context, or to use the Real-Time API to update state based on analytics thresholds.
In this example, we will:
- Get all active conversations.
- Identify the agent on each active conversation. Matching the live conversation itself against analytics rarely works, because analytics lag means a truly live call has usually not appeared there yet.
- Fetch that agent's most recent completed conversations (up to 5) to provide “contextual history” for a screen pop.
from datetime import datetime, timedelta, timezone

def enrich_active_calls_with_history(auth_manager: GenesysAuthManager) -> List[Dict[str, Any]]:
    """
    Combines real-time active calls with recent historical context.
    """
    # Step 1: Get Active Calls
    active_calls = get_active_voice_conversations(auth_manager)
    if not active_calls:
        print("No active voice conversations.")
        return []
    # Step 2: Define a time window for history (last 2 hours),
    # formatted as YYYY-MM-DDTHH:mm:ss.000Z
    time_format = "%Y-%m-%dT%H:%M:%S.000Z"
    now = datetime.now(timezone.utc)
    from_time = (now - timedelta(hours=2)).strftime(time_format)
    to_time = now.strftime(time_format)
    # Step 3: Get historical details for all voice calls in that window
    # Note: In production, you might filter by specific user ID or queue ID to limit scope
    historical_data = get_historical_conversation_details(auth_manager, from_time, to_time)
    # Step 4: Correlate
    # This is a simplified correlation. Real-world scenarios require matching by
    # external contact ID, queue, or specific user IDs.
    enriched_output = []
    for call in active_calls:
        call_id = call["id"]
        participants = call.get("participants", [])
        # Extract the agent's user ID if available.
        # Assumes agent participants are marked with purpose == "agent".
        agent_user_id = None
        for p in participants:
            if p.get("purpose") == "agent" and p.get("userId"):
                agent_user_id = p["userId"]
                break
        # Find recent history for this agent
        agent_history = []
        if agent_user_id:
            # Detail records are conversation-level, with users nested under
            # 'participants'. Filtering on userId in the analytics request itself
            # is cheaper; here we fetched everything and filter locally.
            agent_history = [
                h for h in historical_data
                if any(p.get("userId") == agent_user_id for p in h.get("participants", []))
            ]
        # Wrap-up codes sit on segments: participants -> sessions -> segments
        recent_wrap_ups = [
            seg["wrapUpCode"]
            for h in agent_history[:5]  # Last 5 conversations
            for p in h.get("participants", [])
            for s in p.get("sessions", [])
            for seg in s.get("segments", [])
            if p.get("userId") == agent_user_id and seg.get("wrapUpCode")
        ]
        enriched_call = {
            "activeCallId": call_id,
            "agentUserId": agent_user_id,
            "activeCallDirection": call.get("direction"),
            "recentHistoryCount": len(agent_history),
            "recentWrapUps": recent_wrap_ups
        }
        enriched_output.append(enriched_call)
    return enriched_output
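The loop above rescans the full history list for every active call. When there are many active calls, building a lookup once is cheaper. A minimal sketch follows, assuming each historical record is a dict with a "participants" list whose entries may carry a "userId"; index_history_by_agent is a hypothetical helper name.

```python
from collections import defaultdict
from typing import Any, Dict, List


def index_history_by_agent(records: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Map each user ID to the records that user took part in, preserving order."""
    index: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for record in records:
        seen = set()  # avoid adding one record twice for the same user
        for participant in record.get("participants", []):
            user_id = participant.get("userId")
            if user_id and user_id not in seen:
                seen.add(user_id)
                index[user_id].append(record)
    return dict(index)
```

With the index built once, each active call needs only index.get(agent_user_id, []) instead of a scan over every record.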
Complete Working Example
This script ties everything together. It initializes authentication, retrieves active calls, fetches recent history, and prints a combined view.
import sys
import os
from datetime import datetime, timedelta, timezone
# Import the classes defined in the previous sections
# In a real module, these would be imported from a utils package
# from auth_manager import GenesysAuthManager
# from api_calls import get_active_voice_conversations, get_historical_conversation_details

def main():
    # Configuration
    ORG_ID = os.getenv("GENESYS_ORG_ID")
    CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
    CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
    if not all([ORG_ID, CLIENT_ID, CLIENT_SECRET]):
        print("Error: Missing environment variables. Set GENESYS_ORG_ID, GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET")
        sys.exit(1)
    # Initialize Auth
    auth_manager = GenesysAuthManager(ORG_ID, CLIENT_ID, CLIENT_SECRET)
    try:
        # Get Active Calls
        print("--- Fetching Active Voice Conversations ---")
        active_calls = get_active_voice_conversations(auth_manager)
        if not active_calls:
            print("No active calls to process.")
            return
        # Define History Window (formatted as YYYY-MM-DDTHH:mm:ss.000Z)
        time_format = "%Y-%m-%dT%H:%M:%S.000Z"
        now = datetime.now(timezone.utc)
        from_time = (now - timedelta(hours=1)).strftime(time_format)
        to_time = now.strftime(time_format)
        print(f"\n--- Fetching Historical Data from {from_time} to {to_time} ---")
        historical_results = get_historical_conversation_details(auth_manager, from_time, to_time)
        print("\n--- Correlation Summary ---")
        print(f"Active Calls: {len(active_calls)}")
        print(f"Historical Records Found: {len(historical_results)}")
        # Simple Output
        for call in active_calls:
            print(f"Active Call ID: {call['id']}")
            print(f"  Direction: {call.get('direction')}")
            # In a real app, you would link this call's participants to the historical data
            # as shown in the 'enrich_active_calls_with_history' function above.
    except Exception as e:
        print(f"Fatal Error: {e}")

if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 403 Forbidden on /api/v2/analytics/conversations
What causes it: The OAuth token lacks the analytics:conversation:view scope. The conversation:read scope is insufficient for analytics endpoints.
How to fix it:
- Go to the Genesys Cloud Admin Console.
- Navigate to Security > API Security > OAuth Clients.
- Select your client.
- Add the scope analytics:conversation:view to the “Scopes” list.
- Re-authenticate to get a new token with the updated scopes.
Error: 400 Bad Request on Analytics Query
What causes it: The from and to dates are invalid, the time range exceeds the maximum allowed window (typically 30 days for detail queries), or the request body contains fields or filters the endpoint does not recognize.
How to fix it:
- Ensure ISO 8601 format: YYYY-MM-DDTHH:mm:ss.000Z.
- Split large time ranges into smaller chunks (e.g., 1-hour intervals) and aggregate the results.
- Trim the request body to the fields and filters the endpoint documents, and remove anything you do not need.
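The chunking advice can be sketched as a small generator that yields consecutive sub-ranges. This is a minimal illustration; split_time_range is a hypothetical helper name, and the 1-hour default simply mirrors the example interval mentioned in the list.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple


def split_time_range(
    start: datetime,
    end: datetime,
    chunk: timedelta = timedelta(hours=1),
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (chunk_start, chunk_end) pairs that cover [start, end)."""
    if chunk <= timedelta(0):
        raise ValueError("chunk must be a positive duration")
    cursor = start
    while cursor < end:
        chunk_end = min(cursor + chunk, end)  # the final chunk may be shorter
        yield cursor, chunk_end
        cursor = chunk_end
```

Each pair can be formatted and sent as its own query, with the per-chunk results concatenated afterwards. Because the chunks share boundaries, a conversation that falls exactly on a boundary may need de-duplication by ID, depending on how the endpoint treats interval endpoints.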
Error: 429 Too Many Requests
What causes it: You are polling /api/v2/conversations too frequently. The real-time API has strict rate limits (often 10-20 requests per second per client).
How to fix it:
- Implement exponential backoff.
- Use the Notification API (/api/v2/notifications/channels) for real-time event streaming instead of polling. Notifications are the preferred method for real-time integrations because they push data to you when changes occur, eliminating the need for frequent polling.
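import time
import requests

def safe_poll(auth_manager, max_retries=5):
    # Issue the request directly: get_active_voice_conversations catches
    # HTTPError internally, so a 429 would never reach a retry loop around it.
    url = f"{auth_manager.base_url}/api/v2/conversations"
    params = {"type": "voice", "pageSize": 100}
    for attempt in range(max_retries):
        response = requests.get(url, headers=auth_manager.get_headers(), params=params, timeout=10)
        if response.status_code == 429:
            wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s...
            print(f"Rate limited. Waiting {wait_time} seconds...")
            time.sleep(wait_time)
            continue
        response.raise_for_status()  # Any other 4xx/5xx is raised to the caller
        return response.json().get("entities", [])
    raise Exception("Max retries exceeded due to rate limiting")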
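When several workers are rate limited at the same moment, identical backoff delays make them all retry together. A common refinement is to randomize each delay ("full jitter") and cap it. The following is a minimal sketch of that schedule; backoff_delays and its parameters are hypothetical names, and the 30-second cap is an arbitrary illustrative value.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return one randomized delay per retry attempt, each at most min(cap, base * 2**attempt)."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        ceiling = min(cap, base * (2 ** attempt))
        # Full jitter: pick uniformly between 0 and the exponential ceiling.
        delays.append(rng.uniform(0, ceiling))
    return delays
```

Passing a seeded random.Random makes the schedule reproducible in tests, while the default gives each worker a different sequence in production.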