Choosing Between Real-Time Conversations and Analytics Data in Genesys Cloud

Choosing Between Real-Time Conversations and Analytics Data in Genesys Cloud

What You Will Build

  • You will build two distinct data retrieval scripts: one for live, event-driven conversation updates and one for historical, aggregated business intelligence.
  • This tutorial uses the Genesys Cloud REST API and the Python genesys-cloud-python SDK.
  • The primary language is Python, with specific focus on handling WebSocket streams versus paginated REST queries.

Prerequisites

  • OAuth Client Type: Service Account (Confidential Client) with client_credentials grant type.
  • Required Scopes:
    • For Real-Time: conversation:read, presence:read, routing:read (depending on data needs).
    • For Analytics: analytics:conversation:read, analytics:detail:read.
  • SDK Version: genesys-cloud-python v2.0.0 or later.
  • Runtime: Python 3.9+.
  • Dependencies: pip install genesys-cloud-python requests python-dotenv

Authentication Setup

Both endpoints require a valid OAuth 2.0 access token. The Genesys Cloud API does not support API key authentication for these specific endpoints. You must use the Client Credentials flow.

The following utility function handles token acquisition and caching. In production, implement a cache with a TTL slightly less than the token expiration (usually 59 minutes for a 60-minute token).

import os
import requests
from datetime import datetime, timedelta
from dotenv import load_dotenv

load_dotenv()

GENESYS_CLOUD_REGION = os.getenv("GENESYS_CLOUD_REGION", "my.genesys.cloud")
CLIENT_ID = os.getenv("GENESYS_CLOUD_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")

# In-memory cache for demo purposes. Use Redis or database in production.
_token_cache = {
    "token": None,
    "expires_at": None
}

def get_access_token() -> str:
    """
    Retrieves a Genesys Cloud OAuth access token.
    Returns cached token if valid, otherwise fetches a new one.
    """
    now = datetime.utcnow()
    
    # Check cache
    if _token_cache["token"] and _token_cache["expires_at"] and now < _token_cache["expires_at"]:
        return _token_cache["token"]

    # Fetch new token
    url = f"https://{GENESYS_CLOUD_REGION}/oauth/token"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    data = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET
    }

    try:
        response = requests.post(url, headers=headers, data=data)
        response.raise_for_status()
        token_data = response.json()
        
        # Cache the token
        expires_in = token_data.get("expires_in", 3600)
        _token_cache["token"] = token_data["access_token"]
        _token_cache["expires_at"] = now + timedelta(seconds=expires_in - 60) # Buffer for safety
        
        return _token_cache["token"]
    
    except requests.exceptions.HTTPError as e:
        raise Exception(f"Failed to fetch token: {e.response.text}")
    except requests.exceptions.RequestException as e:
        raise Exception(f"Network error during token fetch: {str(e)}")

Implementation

Step 1: Real-Time Data with /api/v2/conversations (WebSocket)

The /api/v2/conversations endpoint is not a standard REST resource you query for a list. It is the entry point for a WebSocket subscription. You use this when you need to know right now what is happening. Use this for:

  • Live dashboards showing current queue wait times.
  • Real-time agent assistance tools.
  • Triggering immediate workflows based on conversation events (e.g., “send SMS if customer waits > 2 mins”).

Technical Constraint: You cannot “pull” history from this endpoint. If you miss a WebSocket event, you must query the Analytics API to fill the gap.

Code: Subscribing to Conversation Events

This example uses the websockets library (standard in modern Python async stacks) to connect to the Genesys Cloud WebSocket server.

import asyncio
import json
import websockets
import ssl
from urllib.parse import urlencode

async def listen_to_conversations(token: str):
    """
    Connects to the Genesys Cloud WebSocket for real-time conversation updates.
    """
    # The WebSocket endpoint differs from the REST API base URL
    ws_url = f"wss://{GENESYS_CLOUD_REGION.replace('my.', '')}/api/v2/conversations"
    
    # Query parameters for subscription filters
    # You can filter by types (voice, chat, message, callback, etc.)
    params = {
        "types": "voice,chat,message",
        "limit": "1000" # Max events per buffer
    }
    
    ws_url_with_params = f"{ws_url}?{urlencode(params)}"
    
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    
    # Configure SSL context to handle potential certificate issues in dev environments
    ssl_context = ssl._create_unverified_context() if os.getenv("DEV_MODE") else ssl.create_default_context()

    try:
        print(f"Connecting to {ws_url_with_params}...")
        async with websockets.connect(ws_url_with_params, extra_headers=headers, ssl=ssl_context) as websocket:
            print("Connected. Listening for events...")
            
            async for message in websocket:
                try:
                    data = json.loads(message)
                    event_type = data.get("eventType")
                    conversation_id = data.get("conversationId")
                    
                    if event_type == "conversation:created":
                        print(f"[NEW] Conversation started: {conversation_id}")
                    elif event_type == "conversation:updated":
                        # Check for specific updates like status changes
                        changes = data.get("changes", [])
                        for change in changes:
                            if change.get("field") == "state":
                                print(f"[UPDATE] Conversation {conversation_id} moved to: {change.get('value')}")
                    elif event_type == "conversation:deleted":
                        print(f"[END] Conversation ended: {conversation_id}")
                        
                except json.JSONDecodeError:
                    print(f"Received non-JSON data: {message}")
                except Exception as e:
                    print(f"Error processing message: {e}")

    except websockets.exceptions.InvalidStatusCode as e:
        print(f"Connection failed with status {e.status_code}. Check OAuth scope: conversation:read")
    except ConnectionRefusedError:
        print("Could not connect to WebSocket server. Check network/firewall.")
    except Exception as e:
        print(f"Unexpected error: {e}")

# To run this, you would call:
# asyncio.run(listen_to_conversations(get_access_token()))

Error Handling Note: If you receive a 401 Unauthorized on the WebSocket handshake, it usually means the token has expired or lacks the conversation:read scope. The WebSocket connection does not automatically refresh tokens. You must implement a heartbeat monitor and reconnect with a fresh token when the current one nears expiration.

Step 2: Historical Data with /api/v2/analytics/conversations (REST Query)

The /api/v2/analytics/conversations/details/query endpoint is a standard REST POST endpoint. You use this when you need to answer questions about the past. Use this for:

  • Daily performance reports (Average Handle Time, Abandon Rate).
  • Compliance audits (retrieving transcripts from last week).
  • Training analysis (finding specific interactions where a keyword was spoken).

Technical Constraint: This is not real-time. There is typically a latency of 1-5 minutes before a completed conversation appears in the analytics store. Do not use this for live monitoring.

Code: Querying Historical Conversation Details

This example uses the genesys-cloud-python SDK to construct a complex query. The Analytics API uses a specific query object structure that supports filtering, grouping, and sorting.

from genesyscloud import AnalyticsApi, Configuration
from genesyscloud.rest import ApiException
from datetime import datetime, timedelta

def get_historical_conversations(token: str) -> list:
    """
    Queries the Analytics API for completed voice conversations from the last 24 hours.
    """
    # Configure the API client
    configuration = Configuration()
    configuration.host = f"https://{GENESYS_CLOUD_REGION}/api/v2"
    configuration.access_token = token
    
    analytics_api = AnalyticsApi(configuration)
    
    # Define the time range
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=24)
    
    # Construct the query body
    # Note: The SDK generates the classes, but the structure mirrors the JSON body
    from genesyscloud.models import ConversationDetailQuery
    
    query_body = ConversationDetailQuery(
        view="raw", # 'raw' returns individual records, 'summary' returns aggregates
        time_grouping="none", # Do not aggregate by time bucket
        interval=f"{start_time.isoformat()}Z/{end_time.isoformat()}Z",
        size=25, # Page size (max 250)
        sort_by=["conversation.startTime"],
        sort_order="desc",
        filters=[
            {
                "type": "conversation",
                "path": "type",
                "operation": "eq",
                "value": ["voice"] # Filter for voice only
            }
        ]
    )

    try:
        print(f"Querying analytics for {start_time.isoformat()} to {end_time.isoformat()}...")
        
        # Execute the query
        response = analytics_api.post_analytics_conversations_details_query(body=query_body)
        
        if response.entity and response.entity.total > 0:
            print(f"Found {response.entity.total} conversations.")
            return response.entity.conversations
        else:
            print("No conversations found in the specified time range.")
            return []
            
    except ApiException as e:
        print(f"Analytics API Error: {e.status} {e.reason}")
        if e.body:
            print(f"Response Body: {e.body}")
        raise e
    except Exception as e:
        print(f"Unexpected error during analytics query: {e}")
        raise e

# Example usage:
# conversations = get_historical_conversations(get_access_token())
# if conversations:
#     print(f"Latest conversation ID: {conversations[0].id}")

Pagination Handling: The Analytics API supports pagination via the nextPageToken field in the response. For large datasets, you must loop until nextPageToken is null.

def get_all_historical_conversations(token: str) -> list:
    """
    Handles pagination to retrieve all matching conversations.
    """
    configuration = Configuration()
    configuration.host = f"https://{GENESYS_CLOUD_REGION}/api/v2"
    configuration.access_token = token
    analytics_api = AnalyticsApi(configuration)
    
    all_conversations = []
    next_page_token = None
    
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=24)
    
    while True:
        query_body = ConversationDetailQuery(
            view="raw",
            time_grouping="none",
            interval=f"{start_time.isoformat()}Z/{end_time.isoformat()}Z",
            size=250, # Max page size
            sort_by=["conversation.startTime"],
            sort_order="desc",
            filters=[{"type": "conversation", "path": "type", "operation": "eq", "value": ["voice"]}]
        )
        
        if next_page_token:
            query_body.page_token = next_page_token
            
        try:
            response = analytics_api.post_analytics_conversations_details_query(body=query_body)
            
            if response.entity and response.entity.conversations:
                all_conversations.extend(response.entity.conversations)
                print(f"Retrieved {len(response.entity.conversations)} records. Total so far: {len(all_conversations)}")
            
            next_page_token = response.entity.next_page_token if response.entity else None
            
            if not next_page_token:
                break
                
        except ApiException as e:
            print(f"Error on page fetch: {e}")
            break
            
    return all_conversations

Step 3: Processing Results and Mapping Data Models

The data models returned by these two endpoints are fundamentally different.

  1. Real-Time (/api/v2/conversations): Returns event objects. A single conversation generates multiple events (created, updated, participant:added, media:connected). You must maintain state in your application to reconstruct the current status of a conversation.
  2. Analytics (/api/v2/analytics/conversations): Returns a denormalized snapshot of the conversation at the time of completion (or the latest update). It contains calculated fields like totalHoldTime, wrapUpTime, and queueWaitTime which are not available in real-time events.

Code: Comparing Data Structures

def analyze_data_difference():
    """
    Illustrates the structural difference between real-time events and analytics records.
    """
    # Example Real-Time Event Payload (simplified)
    real_time_event = {
        "eventType": "conversation:updated",
        "conversationId": "abc-123",
        "changes": [
            {"field": "state", "value": "connected", "previousValue": "queued"}
        ]
    }
    
    # Example Analytics Record Payload (simplified)
    analytics_record = {
        "id": "abc-123",
        "type": "voice",
        "startTime": "2023-10-27T10:00:00Z",
        "endTime": "2023-10-27T10:05:00Z",
        "metrics": {
            "totalHoldTime": 120.5,
            "wrapUpTime": 30.0,
            "queueWaitTime": 15.0
        }
    }
    
    print("Real-Time Data Structure:")
    print(f"- Focus: Event-driven changes")
    print(f"- Key Field: eventType")
    print(f"- Usage: Immediate reaction to state change")
    
    print("\nAnalytics Data Structure:")
    print(f"- Focus: Aggregated metrics and historical state")
    print(f"- Key Field: metrics (calculated values)")
    print(f"- Usage: Reporting, auditing, trend analysis")

Complete Working Example

The following script combines authentication, real-time listening, and historical querying into a single module. It demonstrates the proper separation of concerns.

import os
import asyncio
import json
import websockets
import ssl
from datetime import datetime, timedelta
from urllib.parse import urlencode
from dotenv import load_dotenv
from genesyscloud import AnalyticsApi, Configuration
from genesyscloud.rest import ApiException
from genesyscloud.models import ConversationDetailQuery

load_dotenv()

GENESYS_CLOUD_REGION = os.getenv("GENESYS_CLOUD_REGION", "my.genesys.cloud")
CLIENT_ID = os.getenv("GENESYS_CLOUD_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")

_token_cache = {"token": None, "expires_at": None}

def get_access_token() -> str:
    from datetime import timedelta
    import requests
    now = datetime.utcnow()
    if _token_cache["token"] and _token_cache["expires_at"] and now < _token_cache["expires_at"]:
        return _token_cache["token"]
    url = f"https://{GENESYS_CLOUD_REGION}/oauth/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {"grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET}
    try:
        response = requests.post(url, headers=headers, data=data)
        response.raise_for_status()
        token_data = response.json()
        _token_cache["token"] = token_data["access_token"]
        _token_cache["expires_at"] = now + timedelta(seconds=token_data.get("expires_in", 3600) - 60)
        return _token_cache["token"]
    except Exception as e:
        raise Exception(f"Token error: {e}")

async def realtime_listener(token: str, duration_seconds: int = 60):
    ws_url = f"wss://{GENESYS_CLOUD_REGION.replace('my.', '')}/api/v2/conversations"
    params = {"types": "voice", "limit": "100"}
    ws_url_with_params = f"{ws_url}?{urlencode(params)}"
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    ssl_context = ssl.create_default_context()
    
    try:
        async with websockets.connect(ws_url_with_params, extra_headers=headers, ssl=ssl_context) as websocket:
            end_time = asyncio.get_event_loop().time() + duration_seconds
            while asyncio.get_event_loop().time() < end_time:
                try:
                    message = await asyncio.wait_for(websocket.recv(), timeout=1.0)
                    data = json.loads(message)
                    print(f"[REAL-TIME] {data.get('eventType')}: {data.get('conversationId')}")
                except asyncio.TimeoutError:
                    continue
                except Exception as e:
                    print(f"WS Error: {e}")
    except Exception as e:
        print(f"WS Connection Error: {e}")

def historical_query(token: str):
    configuration = Configuration()
    configuration.host = f"https://{GENESYS_CLOUD_REGION}/api/v2"
    configuration.access_token = token
    analytics_api = AnalyticsApi(configuration)
    
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=1)
    
    query_body = ConversationDetailQuery(
        view="raw",
        time_grouping="none",
        interval=f"{start_time.isoformat()}Z/{end_time.isoformat()}Z",
        size=10,
        filters=[{"type": "conversation", "path": "type", "operation": "eq", "value": ["voice"]}]
    )
    
    try:
        response = analytics_api.post_analytics_conversations_details_query(body=query_body)
        if response.entity and response.entity.conversations:
            print(f"[ANALYTICS] Found {len(response.entity.conversations)} conversations in last hour.")
            for conv in response.entity.conversations:
                print(f"  - ID: {conv.id}, Start: {conv.startTime}")
        else:
            print("[ANALYTICS] No conversations found.")
    except ApiException as e:
        print(f"Analytics Error: {e}")

def main():
    token = get_access_token()
    
    print("=== Fetching Historical Data ===")
    historical_query(token)
    
    print("\n=== Listening to Real-Time Data (30 seconds) ===")
    asyncio.run(realtime_listener(token, duration_seconds=30))
    
    print("\n=== Done ===")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized on WebSocket

  • Cause: The OAuth token provided in the WebSocket header has expired or is invalid.
  • Fix: Ensure your token refresh logic is robust. WebSocket connections are long-lived. If your token expires mid-connection, the server will close the connection. You must detect this closure and reconnect with a fresh token.
  • Code Fix: Implement a reconnection loop with exponential backoff in your realtime_listener.

Error: 403 Forbidden on Analytics Query

  • Cause: The OAuth client lacks the analytics:conversation:read scope.
  • Fix: Go to the Genesys Cloud Admin Console > Applications > [Your App] > Scopes. Add analytics:conversation:read and regenerate the client secret if necessary (though usually, scope updates are immediate).

Error: “No data found” in Analytics

  • Cause: You are querying a time range that has not yet been processed.
  • Fix: Analytics data is not real-time. It is typically available 1-5 minutes after the conversation ends. If you just ended a test call, wait 5 minutes before querying.
  • Debugging: Use the view="summary" to check if any data exists in the bucket, or widen the time interval to last 24 hours.

Error: WebSocket Connection Refused

  • Cause: Firewall or network proxy blocking WebSocket traffic.
  • Fix: Ensure your environment allows outbound traffic to wss://*.genesys.cloud/api/v2/conversations. Standard HTTP proxies often block WebSocket upgrades. You may need to configure your proxy to allow Upgrade: websocket headers.

Official References