Choosing Between Real-Time Conversations and Analytics History in Genesys Cloud

What You Will Build

  • You will build a Python script that retrieves active, in-progress conversations using the Real-Time API.
  • You will build a second Python script that queries historical conversation data using the Analytics API.
  • The tutorial covers Python using the PureCloudPlatformClientV2 SDK and the requests library for raw HTTP comparison.

Prerequisites

  • OAuth Client Type: Confidential Client (Client Credentials Flow).
  • Required Scopes:
    • Real-Time: conversation:active:read, conversation:participant:read.
    • Analytics: analytics:query:read.
  • SDK Version: PureCloudPlatformClientV2 >= 160.0.0.
  • Runtime: Python 3.8+.
  • Dependencies:
    pip install PureCloudPlatformClientV2 requests
    

Authentication Setup

Both APIs require a valid OAuth 2.0 access token. The token acquisition process is identical for both endpoints. You must use the Client Credentials Grant flow.

import requests
import time
from typing import Optional

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        self.token_url = f"https://login.{environment}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: Optional[float] = None

    def get_token(self) -> str:
        """
        Retrieves an OAuth token. Caches it until expiry.
        """
        if self.access_token and self.token_expiry and time.time() < self.token_expiry:
            return self.access_token

        # Client credentials are sent with HTTP Basic authentication
        data = {"grant_type": "client_credentials"}

        response = requests.post(
            self.token_url,
            data=data,
            auth=(self.client_id, self.client_secret),
            timeout=10,
        )
        response.raise_for_status()

        token_data = response.json()
        self.access_token = token_data["access_token"]
        # Use the lifetime the server reports in expires_in; refresh 60 seconds early
        self.token_expiry = time.time() + (token_data["expires_in"] - 60)
        
        return self.access_token

    def get_headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_token()}",
            "Content-Type": "application/json"
        }

Implementation

Step 1: Understanding the Real-Time API (/api/v2/conversations)

The /api/v2/conversations endpoint is part of the Real-Time API. It returns a snapshot of conversations that are currently active. “Active” means the conversation has not yet been terminated. This includes:

  • Calls currently on the phone.
  • Chats currently connected to an agent.
  • Messages currently being processed in a queue.
  • Screen shares currently active.

Key Characteristic: This is a live snapshot. If you poll this endpoint every second, the data will change. It is designed for dashboards, live wallboards, and real-time routing logic. It does not store historical data. Once a conversation ends, it disappears from this endpoint.
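
Because each response is only a snapshot, working out what changed between two polls is left to the caller. The following is a minimal sketch, assuming each snapshot has already been reduced to a collection of conversation IDs. The name diff_snapshots is illustrative and not part of any SDK.

```python
from typing import Iterable, Set, Tuple


def diff_snapshots(previous: Iterable[str], current: Iterable[str]) -> Tuple[Set[str], Set[str]]:
    """Return (started, ended) conversation IDs between two polls."""
    prev_ids, curr_ids = set(previous), set(current)
    started = curr_ids - prev_ids  # present now, absent in the last poll
    ended = prev_ids - curr_ids    # present in the last poll, gone now
    return started, ended


if __name__ == "__main__":
    started, ended = diff_snapshots(["a", "b"], ["b", "c"])
    print("started:", sorted(started), "ended:", sorted(ended))
```

Keeping only IDs between polls keeps memory use small; anything else about an ended conversation has to come from the historical side.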

Code: Retrieving Active Conversations

We will use the official Python SDK for this example; it wraps the request and deserializes the response into model objects.

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

def get_active_conversations(auth: GenesysAuth):
    """
    Retrieves currently active conversations using the Real-Time API.
    """
    # 1. Configure the SDK through its module-level configuration object
    PureCloudPlatformClientV2.configuration.host = f"https://api.{auth.environment}"
    PureCloudPlatformClientV2.configuration.access_token = auth.get_token()

    # 2. Initialize the API class
    conversations_api = PureCloudPlatformClientV2.ConversationsApi()

    try:
        # 3. Execute the request (GET /api/v2/conversations)
        # Note: the API reference describes this resource as returning the
        # active conversations for the authenticated user.
        response = conversations_api.get_conversations()
        entities = response.entities or []

        print("--- Real-Time API Result ---")
        print(f"Total Active Conversations: {len(entities)}")

        for conv in entities[:5]:  # Show first 5
            print(f"ID: {conv.id}, Started: {conv.start_time}")
            if conv.participants:
                print(f"  Participants: {len(conv.participants)}")

        return response

    except ApiException as e:
        print(f"Real-Time API Error: {e.status} - {e.reason}")
        print(f"Response Body: {e.body}")
        raise

# Usage
# auth = GenesysAuth("CLIENT_ID", "CLIENT_SECRET")
# get_active_conversations(auth)

Why use this?

  • You need to know how many agents are currently busy.
  • You need to trigger an action immediately when a chat enters a queue.
  • You are building a live supervisor dashboard.
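
As an illustration of the first use case, the sketch below counts busy agents from a list of conversation dictionaries. It assumes each participant entry carries a purpose, a userId, and an endTime that is missing or empty while the participant is still connected. Verify those field names against the payload your org returns; busy_agent_ids is a hypothetical helper.

```python
from typing import Any, Dict, Iterable, Set


def busy_agent_ids(conversations: Iterable[Dict[str, Any]]) -> Set[str]:
    """Collect user IDs of agents still attached to a conversation.

    Assumes participant dicts expose 'purpose', 'userId', and 'endTime'
    (absent or None while the participant is connected).
    """
    busy: Set[str] = set()
    for conv in conversations:
        for participant in conv.get("participants") or []:
            if participant.get("purpose") != "agent":
                continue
            if participant.get("endTime"):
                continue  # this agent has already left the conversation
            user_id = participant.get("userId")
            if user_id:
                busy.add(user_id)
    return busy


if __name__ == "__main__":
    sample = [{"participants": [{"purpose": "agent", "userId": "u1"}]}]
    print(len(busy_agent_ids(sample)), "agent(s) busy")
```

Returning a set rather than a count means an agent handling two conversations at once is counted only once.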

Step 2: Understanding the Analytics API (/api/v2/analytics/conversations)

The /api/v2/analytics/conversations endpoint is part of the Analytics API. It returns historical data based on a time range. This data is aggregated and stored in Genesys Cloud’s data warehouse.

Key Characteristic: This is a historical query. It is not real-time. There is a delay (typically 15-60 minutes, depending on your data freshness settings) before a completed conversation appears here. It is designed for reporting, compliance, quality assurance, and business intelligence.

Code: Querying Historical Conversations

The Analytics API uses a different query structure. Instead of query-string parameters, you POST a JSON body. The aggregates query used here takes an interval, groupBy dimensions, a list of metrics, and an optional filter.

from datetime import datetime, timedelta, timezone
import requests

def get_historical_conversations(auth: GenesysAuth):
    """
    Retrieves historical conversation metrics using the Analytics API.
    """
    base_url = f"https://api.{auth.environment}"
    # Grouped metrics come from the aggregates query. The details query
    # (/details/query) returns individual conversation records instead.
    endpoint = "/api/v2/analytics/conversations/aggregates/query"
    
    # 1. Define the time range
    # Analytics requires a start and end time.
    # We query the last 24 hours.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=24)
    
    # Format as ISO 8601 with timezone (UTC)
    start_iso = start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    end_iso = end_time.strftime("%Y-%m-%dT%H:%M:%S.000Z")

    # 2. Construct the query body
    # This query groups the last 24 hours by media type (voice, chat, etc.)
    # and requests handle-time and wait-time statistics for each group.
    query_body = {
        "interval": f"{start_iso}/{end_iso}",
        "groupBy": ["mediaType"],
        "metrics": ["tHandle", "tWait"]
    }

    headers = auth.get_headers()

    try:
        # 3. Execute the POST request
        response = requests.post(
            f"{base_url}{endpoint}",
            headers=headers,
            json=query_body,
            timeout=30
        )
        response.raise_for_status()

        data = response.json()
        results = data.get("results", [])

        print("--- Analytics API Result ---")
        print(f"Query Interval: {query_body['interval']}")
        print(f"Total Groups Returned: {len(results)}")
        
        for result in results:
            media_type = result.get("group", {}).get("mediaType", "Unknown")
            for bucket in result.get("data", []):
                # Map each metric name to its stats (count, sum, min, max)
                stats = {m["metric"]: m.get("stats", {}) for m in bucket.get("metrics", [])}
                handle = stats.get("tHandle", {})
                wait = stats.get("tWait", {})

                print(f"Type: {media_type}")
                print(f"  Handled Count: {handle.get('count', 0)}")
                print(f"  Total Handle Duration (ms): {handle.get('sum', 0)}")
                print(f"  Total Wait Duration (ms): {wait.get('sum', 0)}")
                print("-" * 20)

        return data

    except requests.exceptions.HTTPError as e:
        print(f"Analytics API HTTP Error: {e.response.status_code}")
        print(f"Response Body: {e.response.text}")
        raise
    except Exception as e:
        print(f"Unexpected Error: {e}")
        raise

# Usage
# auth = GenesysAuth("CLIENT_ID", "CLIENT_SECRET")
# get_historical_conversations(auth)

Why use this?

  • You need to calculate average handle time (AHT) for last week.
  • You need to generate a PDF report of all calls handled by a specific queue.
  • You are training an ML model on past conversation transcripts.
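
For the AHT use case, the calculation itself is a division of total handle time by the number of handled conversations. The sketch below assumes you already have a total handle duration in milliseconds and a conversation count from an analytics response. The function name is illustrative.

```python
from typing import Optional


def average_handle_time_seconds(total_handle_ms: float, count: int) -> Optional[float]:
    """Return average handle time in seconds, or None when nothing was handled."""
    if count <= 0:
        return None  # avoid dividing by zero for empty groups
    return total_handle_ms / count / 1000.0


if __name__ == "__main__":
    aht = average_handle_time_seconds(600_000, 4)
    print(f"AHT: {aht} s")
```

Returning None for an empty group lets a report distinguish "no conversations" from "zero seconds of handle time".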

Step 3: Comparing Performance and Rate Limits

The two APIs have different performance characteristics and rate limits.

Feature          Real-Time (/conversations)    Analytics (/analytics/conversations)
Data Freshness   Live (Seconds)                Delayed (15-60 mins)
Rate Limit       Lower (Polling heavy)         Higher (Batch queries)
Response Size    Small (Current state only)    Large (Historical aggregates)
Use Case         Live Dashboards               Reporting/BI

Handling Rate Limits (429 Errors)

If you poll the Real-Time API too frequently, you will hit a 429 Too Many Requests error. The Analytics API is less prone to this because queries are heavier but less frequent.

Here is a retry mechanism for the Analytics query. Although analytics calls are less frequent, a heavy aggregation can still time out or be throttled, so the same pattern applies.

import time
import requests

def robust_analytics_query(auth: GenesysAuth, max_retries: int = 3):
    """
    Executes an analytics query, retrying 429 responses and network
    errors with exponential backoff.
    """
    base_url = f"https://api.{auth.environment}"
    endpoint = "/api/v2/analytics/conversations/aggregates/query"
    
    # Simplified query for demonstration
    query_body = {
        "interval": "2023-01-01T00:00:00.000Z/2023-01-02T00:00:00.000Z",
        "groupBy": ["mediaType"],
        "metrics": ["tHandle"]
    }

    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"{base_url}{endpoint}",
                headers=auth.get_headers(),  # re-read so a refreshed token is used
                json=query_body,
                timeout=30
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)
            continue

        if response.status_code == 429:
            # Retry-After is read as seconds; fall back to 5 if absent or non-numeric
            try:
                retry_after = float(response.headers.get("Retry-After", 5))
            except ValueError:
                retry_after = 5.0
            wait_time = retry_after * (2 ** attempt)  # Exponential backoff
            print(f"Rate limited. Waiting {wait_time} seconds...")
            time.sleep(wait_time)
            continue

        # Other error statuses (400, 401, 403, ...) are raised, not retried
        response.raise_for_status()
        return response.json()

    raise RuntimeError("Max retries exceeded for Analytics Query")
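
The delay calculation can also be pulled out of the HTTP loop so that it is easy to test and to cap. The sketch below is one possible shape, assuming Retry-After carries a number of seconds; a non-numeric value is ignored. The names backoff_delay, base, and cap are illustrative choices, not values taken from Genesys documentation.

```python
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 60.0) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    exponential = min(cap, base * (2 ** attempt))
    try:
        server_hint = float(retry_after) if retry_after is not None else 0.0
    except ValueError:
        server_hint = 0.0  # non-numeric header value; ignore it
    # Use the longer of our own backoff and the server's hint, never above the cap
    return min(cap, max(exponential, server_hint))


if __name__ == "__main__":
    for attempt in range(4):
        print(attempt, backoff_delay(attempt, retry_after="3"))
```

Capping the delay keeps a long-running poller from sleeping for minutes after a burst of throttling, at the cost of possibly retrying sooner than a very large server hint requests.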

Complete Working Example

Below is a unified script that demonstrates both calls. Replace CLIENT_ID and CLIENT_SECRET with your actual credentials.

import requests
import time
from datetime import datetime, timedelta
import json
from typing import Optional

# --- Configuration ---
CLIENT_ID = "YOUR_CLIENT_ID"
CLIENT_SECRET = "YOUR_CLIENT_SECRET"
ENVIRONMENT = "mypurecloud.com"

# --- Authentication Module ---
class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, environment: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        self.token_url = f"https://login.{environment}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: Optional[float] = None

    def get_token(self) -> str:
        if self.access_token and self.token_expiry and time.time() < self.token_expiry:
            return self.access_token

        # Client credentials are sent with HTTP Basic authentication
        data = {"grant_type": "client_credentials"}

        response = requests.post(
            self.token_url,
            data=data,
            auth=(self.client_id, self.client_secret),
            timeout=10,
        )
        response.raise_for_status()

        token_data = response.json()
        self.access_token = token_data["access_token"]
        self.token_expiry = time.time() + (token_data["expires_in"] - 60)
        return self.access_token

    def get_headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_token()}",
            "Content-Type": "application/json"
        }

# --- Real-Time API Function ---
def fetch_realtime_conversations(auth: GenesysAuth):
    print("\n=== Fetching Real-Time Conversations ===")
    
    # Raw HTTP avoids SDK overhead for simple polls
    # Endpoint: GET /api/v2/conversations
    url = f"https://api.{auth.environment}/api/v2/conversations"

    try:
        response = requests.get(url, headers=auth.get_headers(), timeout=30)
        response.raise_for_status()
        
        data = response.json()
        entities = data.get('entities', [])
        print(f"Found {len(entities)} active conversations.")
        
        for conv in entities[:3]:
            participants = conv.get('participants') or []
            print(f"  - ID: {conv.get('id')}, Participants: {len(participants)}")
            
    except requests.exceptions.HTTPError as e:
        print(f"Error: {e.response.status_code} - {e.response.text}")

# --- Analytics API Function ---
def fetch_analytics_conversations(auth: GenesysAuth):
    print("\n=== Fetching Historical Analytics Conversations ===")
    
    # Grouped metrics come from the aggregates query endpoint
    url = f"https://api.{auth.environment}/api/v2/analytics/conversations/aggregates/query"
    
    # Define last 24 hours
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=24)
    time_format = '%Y-%m-%dT%H:%M:%S.000Z'
    
    body = {
        "interval": f"{start_time.strftime(time_format)}/{end_time.strftime(time_format)}",
        "groupBy": ["mediaType"],
        "metrics": ["tHandle"]
    }

    try:
        response = requests.post(url, headers=auth.get_headers(), json=body, timeout=30)
        response.raise_for_status()
        
        data = response.json()
        results = data.get('results', [])
        print(f"Query Interval: {body['interval']}")
        print(f"Total Groups in Report: {len(results)}")
        
        for result in results:
            media_type = result.get('group', {}).get('mediaType', 'N/A')
            for bucket in result.get('data', []):
                for metric in bucket.get('metrics', []):
                    if metric.get('metric') == 'tHandle':
                        stats = metric.get('stats', {})
                        print(f"  - Type: {media_type}, Count: {stats.get('count', 0)}, Total Handle (ms): {stats.get('sum', 0)}")
            
    except requests.exceptions.HTTPError as e:
        print(f"Error: {e.response.status_code} - {e.response.text}")

# --- Main Execution ---
if __name__ == "__main__":
    auth = GenesysAuth(CLIENT_ID, CLIENT_SECRET, ENVIRONMENT)
    
    # 1. Get Real-Time Data
    fetch_realtime_conversations(auth)
    
    # 2. Get Historical Data
    fetch_analytics_conversations(auth)

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The OAuth token has expired or is invalid.
  • Fix: Ensure your GenesysAuth class checks the token_expiry before every request. The token lifetime is reported in the expires_in field of the token response; a script that runs longer than that must request a new token.
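
A cached expiry does not help when a token is revoked or rejected early, so it can be useful to retry once with a fresh token when a 401 comes back. The sketch below shows the control flow only. Both callables are hypothetical and supplied by the caller: send performs the request and returns the HTTP status code, and get_token must bypass its cache when called with True.

```python
from typing import Callable


def call_with_token_refresh(send: Callable[[str], int],
                            get_token: Callable[[bool], str]) -> int:
    """Call send(token); on a 401 status, force a new token and retry once."""
    status = send(get_token(False))
    if status == 401:
        status = send(get_token(True))  # single retry with a fresh token
    return status


if __name__ == "__main__":
    # Stand-in callables: the cached token is rejected, the fresh one works
    result = call_with_token_refresh(
        send=lambda token: 200 if token == "fresh" else 401,
        get_token=lambda force: "fresh" if force else "stale",
    )
    print(result)
```

Retrying only once avoids a loop when the credentials themselves are wrong, in which case the second 401 is returned to the caller.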

Error: 403 Forbidden

  • Cause: The OAuth client does not have the required scopes.
  • Fix:
    • For Real-Time: Add conversation:active:read to your client’s scopes in the Genesys Admin console.
    • For Analytics: Add analytics:query:read to your client’s scopes.
    • After changing scopes, you must generate a new token.

Error: 400 Bad Request (Analytics)

  • Cause: The interval format is incorrect, or the request body contains fields the endpoint does not accept.
  • Fix: Ensure the interval is two ISO 8601 timestamps joined by a slash (YYYY-MM-DDTHH:mm:ss.sssZ/YYYY-MM-DDTHH:mm:ss.sssZ), with the start earlier than the end. Check every field name in the body against the schema of the exact endpoint you are calling, and read the error message in the response body before changing the query.
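
Building the interval string in one place makes this class of 400 error less likely. The following is a small sketch, assuming timezone-aware datetime inputs; build_interval is a hypothetical helper, not an SDK function.

```python
from datetime import datetime, timezone


def build_interval(start: datetime, end: datetime) -> str:
    """Format two timezone-aware datetimes as 'start/end' in UTC with milliseconds."""
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("start and end must be timezone-aware")
    if start >= end:
        raise ValueError("start must be earlier than end")

    def fmt(moment: datetime) -> str:
        utc = moment.astimezone(timezone.utc)
        return utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{utc.microsecond // 1000:03d}Z"

    return f"{fmt(start)}/{fmt(end)}"


if __name__ == "__main__":
    print(build_interval(
        datetime(2023, 1, 1, tzinfo=timezone.utc),
        datetime(2023, 1, 2, tzinfo=timezone.utc),
    ))
```

Rejecting naive datetimes up front avoids silently labeling local time as UTC.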

Error: Empty Results from Real-Time API

  • Cause: There are no active conversations at the moment of the poll.
  • Fix: This is normal behavior. The Real-Time API returns an empty entities array if no conversations match the filter. It does not return historical data.

Official References