Choosing Between Real-Time Conversations and Historical Analytics APIs in Genesys Cloud CX

What You Will Build

  • You will build a Python service that retrieves active conversation details using the Real-Time API and historical interaction data using the Analytics API.
  • You will use the Genesys Cloud Python SDK (genesys-cloud-sdk) and raw requests to demonstrate the distinct data models and query patterns.
  • You will implement a hybrid workflow that correlates a live conversation ID with its historical context to demonstrate when each API is appropriate.

Prerequisites

  • Genesys Cloud Org: An active Genesys Cloud organization with API access.
  • OAuth Client: A Confidential Client (Client Credentials Grant) with the following scopes:
    • conversation:view (for /api/v2/conversations)
    • analytics:query (for /api/v2/analytics/conversations/details/query)
    • conversation:read (optional, for deeper metadata)
  • Python Environment: Python 3.9+ installed.
  • Dependencies:
    • genesys-cloud-sdk (latest version)
    • requests
    • pydantic (for data validation in examples)

Install the dependencies:

pip install genesys-cloud-sdk requests pydantic

Authentication Setup

Genesys Cloud uses OAuth 2.0 for all API access. For server-to-server integrations, the Client Credentials Grant is the standard flow. Access tokens expire (the token response reports the lifetime, in seconds, in its expires_in field), so your application must request a new token before the current one lapses.
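
When you call the API with raw requests, one way to handle regeneration is to cache the token together with its expiry time and fetch a new one shortly before it lapses. The following is a minimal sketch, not part of the SDK: TokenCache, fetch_token, and the 60-second safety margin are hypothetical names and values, and fetch_token is assumed to return the access token plus its lifetime in seconds.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches an access token and regenerates it shortly before it expires.

    Hypothetical helper for illustration. fetch_token is assumed to return
    (access_token, lifetime_in_seconds), e.g. taken from an OAuth token response.
    """

    def __init__(
        self,
        fetch_token: Callable[[], Tuple[str, float]],
        margin_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch_token = fetch_token
        self._margin = margin_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Returns the cached token, fetching a new one if missing or near expiry."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            token, lifetime = self._fetch_token()
            self._token = token
            self._expires_at = now + lifetime
        return self._token
```

Injecting the clock keeps the class easy to test; in an application you would leave the default and pass a fetch_token function that performs the actual token request.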

Below is an authentication helper built on the genesys-cloud-sdk. It reads the credentials from environment variables and lets the SDK acquire and cache the token, so the rest of the tutorial can share a single client instance.

import os
from genesyscloud.platform.client import PlatformClientBuilder
from genesyscloud.api.exceptions import ApiError  # caught by the API helpers in later steps

def get_genesis_client():
    """
    Initializes and returns a Genesys Cloud Platform Client.
    Uses environment variables for security.
    """
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment.")

    try:
        # The SDK automatically handles OAuth token acquisition and refresh
        builder = PlatformClientBuilder()
        client = builder.build(
            environment=environment,
            client_id=client_id,
            client_secret=client_secret
        )
        return client
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback
        raise RuntimeError(f"Failed to initialize Genesys Client: {e}") from e

Implementation

Step 1: Understanding the Data Models

The fundamental difference between the two APIs lies in their temporal scope and data granularity.

  1. /api/v2/conversations (Real-Time API):

    • Scope: Current, active interactions only.
    • Data Model: Returns the full conversation object, including participants, channels, and real-time state (e.g., active, queued, closed).
    • Use Case: Live dashboards, real-time agent assist, immediate routing decisions, or fetching metadata for a conversation that is happening right now.
    • Limitation: Data is ephemeral. This endpoint reflects live state, so do not rely on a conversation remaining available here after it closes.
  2. /api/v2/analytics/conversations/details/query (Analytics API):

    • Scope: Historical data; how far back you can query depends on your org’s data retention.
    • Data Model: Returns analytics records for interactions, such as duration, wrap-up time, and queue wait time. The response describes metrics about each conversation rather than the full conversation object that the Real-Time API returns.
    • Use Case: Reporting, SLA monitoring, historical trend analysis, and auditing past performance.
    • Limitation: Not suitable for tracking real-time state changes. Events usually take 1-5 minutes to appear in Analytics after they occur.
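
The distinction above can be condensed into a small decision rule. This is a sketch based only on the trade-offs described in this section, not an official guideline; Api and choose_api are hypothetical names introduced for illustration.

```python
from enum import Enum


class Api(Enum):
    REAL_TIME = "/api/v2/conversations"
    ANALYTICS = "/api/v2/analytics/conversations/details/query"


def choose_api(is_active: bool, needs_metrics: bool) -> Api:
    """Picks an endpoint from two questions about the data you need.

    - Metrics (durations, wait times, counts) come from Analytics,
      accepting that recent events may lag by a few minutes.
    - Live state of an in-progress conversation comes from the Real-Time API.
    - Anything about a finished conversation comes from Analytics.
    """
    if needs_metrics:
        return Api.ANALYTICS
    if is_active:
        return Api.REAL_TIME
    return Api.ANALYTICS


if __name__ == "__main__":
    # A live dashboard tile showing who is connected right now
    print(choose_api(is_active=True, needs_metrics=False).value)
    # A report on yesterday's queue wait times
    print(choose_api(is_active=False, needs_metrics=True).value)
```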

Step 2: Fetching Active Conversations (Real-Time API)

We will use the ConversationsApi from the SDK to fetch all active conversations. This is a simple GET request.

Required Scope: conversation:view

from genesyscloud.conversations.api import ConversationsApi
from genesyscloud.api.exceptions import ApiError

def get_active_conversations(client):
    """
    Fetches all currently active conversations.
    """
    conversations_api = ConversationsApi(client)

    try:
        # Fetch only the first page (up to 25 conversations) for simplicity;
        # page through the results if you expect more.
        response = conversations_api.get_conversations(
            expand=["participants"], # Include participant details
            page_size=25
        )
        
        if response.body and response.body.conversations:
            return response.body.conversations
        else:
            return []
            
    except ApiError as e:
        print(f"Error fetching conversations: {e.status_code} - {e.reason}")
        raise

Expected Response Snippet:

{
  "conversations": [
    {
      "id": "123e4567-e89b-12d3-a456-426614174000",
      "type": "voice",
      "state": "active",
      "startTime": "2023-10-27T10:00:00.000Z",
      "participants": [
        {
          "id": "agent-uuid",
          "type": "agent",
          "state": "connected"
        }
      ]
    }
  ],
  "pageSize": 25,
  "pageCount": 1
}
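
If you work with the raw JSON rather than SDK objects, it can help to map the response onto typed records before using it. The sketch below assumes the response shape shown in the snippet above and uses only the standard library; Participant, Conversation, and parse_conversations are hypothetical names, and pydantic models could serve the same purpose.

```python
import json
from dataclasses import dataclass, field
from typing import List


@dataclass
class Participant:
    id: str
    type: str
    state: str


@dataclass
class Conversation:
    id: str
    type: str
    state: str
    start_time: str
    participants: List[Participant] = field(default_factory=list)


def parse_conversations(payload: str) -> List[Conversation]:
    """Parses a JSON response body shaped like the snippet above."""
    data = json.loads(payload)
    conversations = []
    for raw in data.get("conversations", []):
        participants = [
            Participant(id=p["id"], type=p["type"], state=p["state"])
            for p in raw.get("participants", [])
        ]
        conversations.append(
            Conversation(
                id=raw["id"],
                type=raw["type"],
                state=raw["state"],
                start_time=raw["startTime"],
                participants=participants,
            )
        )
    return conversations
```

A missing required field raises KeyError at parse time, which surfaces schema mismatches early instead of deep inside the reporting logic.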

Step 3: Querying Historical Data (Analytics API)

The Analytics API is significantly more complex. It uses a query body to filter and aggregate data. We will query for detailed conversation metrics from the last 24 hours.

Required Scope: analytics:query

import requests
from datetime import datetime, timedelta, timezone

def get_historical_conversations(client, days_back: float = 1):
    """
    Queries the Analytics API for conversation details over a specific time range.
    Uses raw requests to demonstrate the complex query body structure.
    """
    # Get the current access token from the client
    # Note: In production, use the SDK's analytics API if available, 
    # but raw requests offer clearer visibility into the query structure.
    token = client.get_access_token()
    
    base_url = client.get_base_url()
    endpoint = "/api/v2/analytics/conversations/details/query"
    
    # Calculate time range
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=days_back)
    
    # Format times in ISO 8601
    start_time_str = start_time.isoformat()
    end_time_str = end_time.isoformat()

    # The Query Body
    # This structure is critical. 'metrics' defines what data you want back.
    # 'groupings' allows you to aggregate data (e.g., by queue or agent).
    query_body = {
        "interval": "PT1H", # Aggregate by 1-hour intervals
        "dateRange": {
            "startDate": start_time_str,
            "endDate": end_time_str
        },
        "metrics": [
            "conversation.count",
            "conversation.duration",
            "queue.waittime",
            "agent.wrapuptime"
        ],
        "groupings": [
            {
                "name": "queue",
                "type": "queue"
            }
        ],
        "filter": {
            "type": "conversation",
            "typeFilter": {
                "types": ["voice", "chat"]
            }
        }
    }

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    try:
        response = requests.post(
            f"{base_url}{endpoint}",
            headers=headers,
            json=query_body
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Analytics API Error: {e}")
        raise

Key Parameters Explained:

  • interval: Defines the time bucket for aggregation. Use PT1H for hourly, P1D for daily.
  • metrics: The specific data points you want. conversation.count is the number of interactions. conversation.duration is the total talk time.
  • groupings: Allows you to slice the data. Here, we group by queue. If you remove groupings, you get a single aggregated row for the entire time range.
  • filter: Restricts the data to specific conversation types (voice, chat, webchat, etc.).
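
To make the interval parameter concrete, the sketch below splits a time range into fixed-size buckets, which is conceptually what an hourly (PT1H) or daily (P1D) interval does to the queried range. It is a local illustration only and does not call the API; split_into_buckets is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


def split_into_buckets(
    start: datetime, end: datetime, bucket: timedelta
) -> List[Tuple[datetime, datetime]]:
    """Splits [start, end) into consecutive buckets of at most `bucket` length.

    The final bucket is shorter when the range is not an exact multiple.
    """
    if bucket <= timedelta(0):
        raise ValueError("bucket must be a positive duration")
    buckets = []
    cursor = start
    while cursor < end:
        upper = min(cursor + bucket, end)
        buckets.append((cursor, upper))
        cursor = upper
    return buckets


if __name__ == "__main__":
    end = datetime.now(timezone.utc)
    start = end - timedelta(days=1)
    hourly = split_into_buckets(start, end, timedelta(hours=1))
    print(f"{len(hourly)} hourly buckets in the last 24 hours")
```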

Step 4: Correlating Live and Historical Data

A common pattern is to start from a live conversation and then look up its historical context, or to confirm that a conversation that was recently active has reached the analytics store.

Note, however, that the details query used above is built around time ranges and filters rather than single-ID lookups. To inspect one specific conversation, use /api/v2/conversations/{id} while it is still live or within the short-term retention window; for a finished conversation, Genesys Cloud also provides GET /api/v2/analytics/conversations/{conversationId}/details. The query endpoint remains the tool for trends across many conversations.

For this tutorial, we will demonstrate a hybrid approach:

  1. Fetch active conversations.
  2. For each active conversation, extract the Queue ID.
  3. Query Analytics to see the average wait time for that Queue over the last hour.
  4. Display this context alongside the live conversation.

def correlate_live_and_history(client):
    """
    Demonstrates a hybrid workflow:
    1. Get live conversations.
    2. Extract Queue IDs.
    3. Query Analytics for recent queue performance.
    """
    
    # Step 1: Get Active Conversations
    active_convs = get_active_conversations(client)
    
    if not active_convs:
        print("No active conversations found.")
        return

    # Collect unique queue IDs from active conversations
    queue_ids = set()
    for conv in active_convs:
        # Participants might have queue info, but often we need to fetch 
        # the conversation details to get the routing context.
        # For simplicity, assume we can derive queue from participant or 
        # we query all queues if unknown.
        # In a real app, you might fetch /api/v2/conversations/{id} to get full routing details.
        pass 

    # Step 2: Query Analytics for Queue Performance
    # We query all queues for the last hour. days_back=0 would make the start and
    # end times identical (an empty range), so pass one hour as a fraction of a day.
    analytics_data = get_historical_conversations(client, days_back=1 / 24)
    
    # Process Analytics Data
    queue_metrics = {}
    if 'data' in analytics_data:
        for row in analytics_data['data']:
            # The structure depends on groupings. 
            # If grouped by queue, each row has a 'queue' object.
            if 'queue' in row and row['queue']:
                queue_name = row['queue']['name']
                queue_id = row['queue']['id']
                
                metrics = row.get('metrics', {})
                wait_time = metrics.get('queue.waittime', 0)
                count = metrics.get('conversation.count', 0)
                
                queue_metrics[queue_id] = {
                    'name': queue_name,
                    'avg_wait_time': wait_time,
                    'total_conversations': count
                }

    # Step 3: Output Correlated Data
    print(f"{'Conversation ID':<40} | {'Type':<10} | {'Queue Avg Wait (Sec)':<20}")
    print("-" * 80)
    
    for conv in active_convs:
        conv_id = conv.id
        conv_type = conv.type
        
        # In a real scenario, you would need to map the conversation to a queue.
        # This requires fetching the conversation details or checking participant routing context.
        # For this example, we will print the conversation and note that queue mapping 
        # requires an additional API call to /api/v2/conversations/{id}
        
        print(f"{conv_id:<40} | {conv_type:<10} | {'(See Note)':<20}")

    print("\nQueue Performance (Last Hour):")
    for q_id, metrics in queue_metrics.items():
        print(f"Queue: {metrics['name']} | Avg Wait: {metrics['avg_wait_time']}s | Count: {metrics['total_conversations']}")
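
Once a conversation-to-queue mapping is available, the final join is a dictionary lookup. The sketch below works on plain dicts rather than SDK objects; attach_queue_context and conversation_queue are hypothetical names, and the mapping itself is assumed to come from the additional per-conversation fetch mentioned in the comments above.

```python
from typing import Dict, List, Optional


def attach_queue_context(
    conversations: List[dict],
    conversation_queue: Dict[str, str],
    queue_metrics: Dict[str, dict],
) -> List[dict]:
    """Joins live conversations with recent metrics for their queue.

    conversations:      dicts with at least "id" and "type"
    conversation_queue: conversation ID -> queue ID (assumed to be known)
    queue_metrics:      queue ID -> {"name", "avg_wait_time", "total_conversations"}
    """
    rows = []
    for conv in conversations:
        queue_id: Optional[str] = conversation_queue.get(conv["id"])
        metrics = queue_metrics.get(queue_id) if queue_id else None
        rows.append(
            {
                "conversation_id": conv["id"],
                "type": conv["type"],
                # None signals that no queue or no metrics were found
                "queue_name": metrics["name"] if metrics else None,
                "avg_wait_time": metrics["avg_wait_time"] if metrics else None,
            }
        )
    return rows
```

Returning None for unmatched conversations keeps them visible in the output, so a missing mapping shows up as a gap rather than a silently dropped row.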

Complete Working Example

The following script combines authentication, real-time fetching, and analytics querying into a single executable module.

import os
import requests
from datetime import datetime, timedelta, timezone
from genesyscloud.platform.client import PlatformClientBuilder
from genesyscloud.conversations.api import ConversationsApi
from genesyscloud.api.exceptions import ApiError

def get_genesis_client():
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment.")

    try:
        builder = PlatformClientBuilder()
        client = builder.build(
            environment=environment,
            client_id=client_id,
            client_secret=client_secret
        )
        return client
    except Exception as e:
        raise RuntimeError(f"Failed to initialize Genesys Client: {e}")

def get_active_conversations(client):
    conversations_api = ConversationsApi(client)
    try:
        response = conversations_api.get_conversations(expand=["participants"], page_size=25)
        if response.body and response.body.conversations:
            return response.body.conversations
        return []
    except ApiError as e:
        print(f"Error fetching conversations: {e.status_code} - {e.reason}")
        return []

def get_queue_analytics(client, queue_id=None):
    token = client.get_access_token()
    base_url = client.get_base_url()
    endpoint = "/api/v2/analytics/conversations/details/query"
    
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=1)
    
    query_body = {
        "interval": "PT1H", # One-hour bucket, matching the one-hour range queried here
        "dateRange": {
            "startDate": start_time.isoformat(),
            "endDate": end_time.isoformat()
        },
        "metrics": [
            "queue.waittime",
            "conversation.count"
        ],
        "groupings": [
            {
                "name": "queue",
                "type": "queue"
            }
        ],
        "filter": {
            "type": "conversation",
            "typeFilter": {
                "types": ["voice"]
            }
        }
    }
    
    # If a specific queue ID is provided, add it to the filter
    if queue_id:
        query_body["filter"]["queueFilter"] = {
            "ids": [queue_id]
        }

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    try:
        response = requests.post(f"{base_url}{endpoint}", headers=headers, json=query_body)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Analytics API Error: {e}")
        return {}

def main():
    print("Initializing Genesys Client...")
    client = get_genesis_client()
    
    print("Fetching active conversations...")
    active_convs = get_active_conversations(client)
    
    if not active_convs:
        print("No active conversations found.")
        return

    print(f"Found {len(active_convs)} active conversation(s).")
    
    # Fetch analytics for all queues (get_queue_analytics queries the last hour)
    print("Fetching analytics data for the last hour...")
    analytics_data = get_queue_analytics(client)
    
    # Map queue IDs to metrics
    queue_metrics = {}
    if 'data' in analytics_data:
        for row in analytics_data['data']:
            if 'queue' in row and row['queue']:
                q_id = row['queue']['id']
                q_name = row['queue']['name']
                metrics = row.get('metrics', {})
                wait_time = metrics.get('queue.waittime', 0)
                count = metrics.get('conversation.count', 0)
                queue_metrics[q_id] = {
                    'name': q_name,
                    'wait_time': wait_time,
                    'count': count
                }

    print("\n--- Live Conversation Status ---")
    for conv in active_convs:
        print(f"ID: {conv.id}")
        print(f"Type: {conv.type}")
        print(f"State: {conv.state}")
        print(f"Start Time: {conv.startTime}")
        
        # Note: To link this conversation to a specific queue, you would need 
        # to inspect the participants or routing details. 
        # This example assumes you have that context from a deeper fetch.
        print("-" * 40)

    print("\n--- Queue Performance Summary ---")
    for q_id, metrics in queue_metrics.items():
        print(f"Queue: {metrics['name']} (ID: {q_id})")
        print(f"  Total Conversations: {metrics['count']}")
        print(f"  Avg Wait Time: {metrics['wait_time']} seconds")
        print("-" * 40)

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The OAuth token is expired or invalid.
  • Fix: Ensure your GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET are correct. The SDK handles token acquisition, but if you are using raw requests, you must request a new token before the current one expires. Check the expires_in value in the token response to see how long the token is valid.

Error: 403 Forbidden

  • Cause: The OAuth client lacks the required scope.
  • Fix:
    • For /api/v2/conversations, ensure conversation:view is added to the client’s scopes.
    • For /api/v2/analytics/conversations/details/query, ensure analytics:query is added.
    • Go to Admin > Security > OAuth Clients, select your client, and edit the Scopes.

Error: 429 Too Many Requests

  • Cause: You have exceeded the rate limit for the API endpoint.
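  • Fix: Implement exponential backoff. The Analytics API has stricter rate limits than the Real-Time API. Do not poll the Analytics API in a tight loop. Cache analytics results if possible.

import time
import requests

def safe_analytics_request(func, *args, max_retries=3, **kwargs):
    """
    Calls func and retries with exponential backoff on HTTP 429.
    func must let requests.exceptions.HTTPError propagate (as
    get_historical_conversations does); a function that swallows
    errors and returns {} will never trigger a retry.
    """
    for attempt in range(max_retries):
        try:
            return func(*args, **kwargs)
        except requests.exceptions.HTTPError as e:
            is_rate_limited = e.response is not None and e.response.status_code == 429
            if not is_rate_limited:
                raise
            # Skip the sleep after the final attempt; we are about to give up
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # 1s, 2s, 4s, ...
                print(f"Rate limited. Waiting {wait_time} seconds...")
                time.sleep(wait_time)
    raise RuntimeError("Max retries exceeded for Analytics API")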

Error: Empty Analytics Data

  • Cause: The time range is incorrect, or no data exists for the filtered criteria.
  • Fix: Verify the startDate and endDate are in ISO 8601 format with timezone info (e.g., 2023-10-27T10:00:00Z). Ensure the filter types match the conversations in your org (e.g., if you filter for voice but only have chat data, you will get an empty result).
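
A quick local check can catch both problems before the request is sent. The sketch below validates that each timestamp carries timezone information and that the range is not empty; validate_time_range is a hypothetical helper, and it only checks the strings locally rather than anything the API itself enforces.

```python
from datetime import datetime


def validate_time_range(start: str, end: str) -> None:
    """Raises ValueError if a timestamp is malformed, lacks a timezone,
    or the range is empty or reversed."""
    parsed = []
    for label, value in (("startDate", start), ("endDate", end)):
        # datetime.fromisoformat rejects a trailing "Z" before Python 3.11,
        # so normalize it to an explicit UTC offset first.
        dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            raise ValueError(f"{label} has no timezone information: {value}")
        parsed.append(dt)
    if parsed[0] >= parsed[1]:
        raise ValueError("startDate must be earlier than endDate")


if __name__ == "__main__":
    validate_time_range("2023-10-27T10:00:00Z", "2023-10-27T11:00:00Z")
    print("Time range looks valid")
```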

Official References