Querying Live Conversations vs. Historical Analytics: Choosing the Right Genesys Cloud API

Querying Live Conversations vs. Historical Analytics: Choosing the Right Genesys Cloud API

What You Will Build

  • You will build two distinct Python scripts that demonstrate the operational difference between retrieving active, real-time conversation data and querying historical, aggregated conversation metrics.
  • This tutorial uses the Genesys Cloud Platform REST API v2, specifically the conversations and analytics endpoints.
  • The code examples are written in Python using the requests library and the official genesys-cloud-purecloud-platform-client SDK.

Prerequisites

  • OAuth Client Type: A Genesys Cloud OAuth client with public or confidential access type.
  • Required Scopes:
    • For /api/v2/conversations: conversation:view, conversation:read (depending on data sensitivity).
    • For /api/v2/analytics/conversations/details/query: analytics:conversation:read, analytics:report:read.
  • SDK Version: genesys-cloud-purecloud-platform-client v3.0.0 or later.
  • Runtime: Python 3.8+.
  • Dependencies:
    pip install requests genesys-cloud-purecloud-platform-client
    

Authentication Setup

Before accessing either endpoint, you must obtain a valid OAuth 2.0 Bearer token. The following code demonstrates a standard Client Credentials flow, which is suitable for backend services. In production, you should implement token caching and refresh logic to avoid re-authenticating for every request.

import requests
import os

def get_access_token(client_id: str, client_secret: str, environment: str = "mypurecloud.com") -> str:
    """
    Retrieves an OAuth 2.0 access token using the Client Credentials Grant flow.
    
    :param client_id: The OAuth Client ID from Genesys Cloud.
    :param client_secret: The OAuth Client Secret from Genesys Cloud.
    :param environment: The Genesys Cloud environment domain.
    :return: The access token string.
    """
    auth_url = f"https://api.{environment}/oauth/token"
    payload = {
        "grant_type": "client_credentials"
    }
    
    # Basic Auth header for the token endpoint
    auth = (client_id, client_secret)
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }

    try:
        response = requests.post(auth_url, data=payload, auth=auth, headers=headers)
        response.raise_for_status()
        token_data = response.json()
        return token_data["access_token"]
    except requests.exceptions.HTTPError as e:
        print(f"Authentication failed: {e.response.text}")
        raise
    except Exception as e:
        print(f"An error occurred during authentication: {e}")
        raise

# Usage
CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
ACCESS_TOKEN = get_access_token(CLIENT_ID, CLIENT_SECRET)

Implementation

The fundamental distinction between these two API groups lies in temporality and granularity.

  1. /api/v2/conversations: This is an operational endpoint. It returns data about conversations that are currently active or have recently concluded (typically within the last few minutes to hours, depending on the specific sub-endpoint). It provides the full state of the conversation, including participants, media types, and real-time events. It is used for building real-time dashboards, active supervisor monitoring, or immediate post-call processing.
  2. /api/v2/analytics/conversations: This is a reporting endpoint. It queries the historical data warehouse. It does not return the live state of a call. Instead, it returns aggregated metrics (duration, wrap-up time, hold time) or detailed historical records for conversations that have ended and been processed into the analytics database. This data is eventually consistent and may have a delay of several minutes to hours.

Step 1: Retrieving Active Conversations (/api/v2/conversations)

When you need to see who is on the phone right now, or what chat sessions are currently open, you use the /api/v2/conversations endpoint. This endpoint supports filtering by media type (voice, chat, email, etc.) and returns the current status of those interactions.

Key Characteristics:

  • Real-time: Data is available immediately as events occur.
  • Granular: Returns participant IDs, direction, and current state.
  • Limited History: Does not provide historical aggregates like average handle time over the last month.
import requests

def get_active_conversations(access_token: str, environment: str = "mypurecloud.com", media_type: str = "voice"):
    """
    Retrieves currently active conversations of a specific media type.
    
    :param access_token: Valid OAuth 2.0 Bearer token.
    :param environment: Genesys Cloud environment.
    :param media_type: Filter by media type (voice, chat, email, sms).
    :return: List of active conversation objects.
    """
    # Endpoint for retrieving active conversations
    url = f"https://api.{environment}/api/v2/conversations"
    
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    
    # Query parameters
    params = {
        "mediaType": media_type,
        "pageSize": 100  # Maximum page size is often 1000, but start small for testing
    }

    try:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()
        
        # The response is a ConversationSearchResponse object
        # It contains a 'conversations' array
        conversations = data.get("conversations", [])
        
        print(f"Found {len(conversations)} active {media_type} conversation(s).")
        return conversations
        
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 401:
            print("Error: Unauthorized. Check your access token.")
        elif e.response.status_code == 403:
            print("Error: Forbidden. Check your OAuth scopes (conversation:view).")
        else:
            print(f"HTTP Error: {e.response.status_code} - {e.response.text}")
        raise
    except Exception as e:
        print(f"An error occurred: {e}")
        raise

# Example Usage
# active_convs = get_active_conversations(ACCESS_TOKEN)
# for conv in active_convs:
#     print(f"Conversation ID: {conv['id']}, State: {conv.get('state', 'Unknown')}")

Expected Response Structure:
The response body for /api/v2/conversations includes an array of Conversation objects. Key fields include:

  • id: The unique identifier for the conversation.
  • state: Current state (e.g., connected, ringing, ended).
  • participants: An array of objects detailing each participant, including their routingData (queue, skill) and direction.

Step 2: Querying Historical Analytics (/api/v2/analytics/conversations/details/query)

When you need to analyze performance, generate reports, or retrieve data for conversations that ended yesterday, you must use the Analytics API. This endpoint is significantly more complex because it requires a structured query body to define time ranges, filters, and the specific metrics or details you want to retrieve.

Key Characteristics:

  • Historical: Data is pulled from the analytics warehouse.
  • Aggregated or Detailed: You can request summary metrics (counts, averages) or detailed records for each conversation.
  • Eventually Consistent: There is a latency between conversation end and availability in analytics.
import requests
from datetime import datetime, timedelta

def query_historical_conversations(access_token: str, environment: str = "mypurecloud.com", days_back: int = 1):
    """
    Queries historical conversation details using the Analytics API.
    
    :param access_token: Valid OAuth 2.0 Bearer token.
    :param environment: Genesys Cloud environment.
    :param days_back: Number of days to look back for data.
    :return: List of conversation detail records.
    """
    # Endpoint for querying conversation details
    url = f"https://api.{environment}/api/v2/analytics/conversations/details/query"
    
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    # Define the time range
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=days_back)
    
    # Format times as ISO 8601 strings
    start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")

    # Construct the query body
    # This is a POST request with a JSON body
    payload = {
        "dateFrom": start_time_str,
        "dateTo": end_time_str,
        "interval": "PT1H",  # Aggregate by hour, or use "PT1D" for daily
        "metrics": [
            "totalHandled",
            "totalAbandoned",
            "averageHandleTime"
        ],
        "groupings": ["mediaType"],  # Group results by voice, chat, etc.
        "selection": {
            "type": "and",
            "predicates": [
                {
                    "type": "equal",
                    "path": "mediaType",
                    "value": "voice"
                }
            ]
        }
    }

    try:
        response = requests.post(url, json=payload, headers=headers)
        response.raise_for_status()
        data = response.json()
        
        # The response contains a 'entities' array with the results
        entities = data.get("entities", [])
        
        print(f"Retrieved {len(entities)} analytics record(s) for the last {days_back} day(s).")
        return entities
        
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 401:
            print("Error: Unauthorized. Check your access token.")
        elif e.response.status_code == 403:
            print("Error: Forbidden. Check your OAuth scopes (analytics:conversation:read).")
        elif e.response.status_code == 400:
            print("Error: Bad Request. Check your query payload structure.")
            print(f"Response Body: {e.response.text}")
        else:
            print(f"HTTP Error: {e.response.status_code} - {e.response.text}")
        raise
    except Exception as e:
        print(f"An error occurred: {e}")
        raise

# Example Usage
# analytics_data = query_historical_conversations(ACCESS_TOKEN)
# for entity in analytics_data:
#     print(f"Media: {entity.get('mediaType')}, Total Handled: {entity.get('totalHandled')}")

Important Note on Pagination:
The Analytics API supports pagination via the pageSize and pageNumber parameters in the query body. If you expect a large volume of data, you must implement a loop to fetch all pages.

# Example of handling pagination in the Analytics API
def query_all_historical_conversations(access_token: str, environment: str = "mypurecloud.com", days_back: int = 1):
    url = f"https://api.{environment}/api/v2/analytics/conversations/details/query"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=days_back)
    
    base_payload = {
        "dateFrom": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "dateTo": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "interval": "PT1H",
        "metrics": ["totalHandled"],
        "groupings": ["mediaType"],
        "pageSize": 100,
        "pageNumber": 1
    }

    all_entities = []
    page_number = 1

    while True:
        base_payload["pageNumber"] = page_number
        response = requests.post(url, json=base_payload, headers=headers)
        response.raise_for_status()
        data = response.json()
        
        entities = data.get("entities", [])
        all_entities.extend(entities)
        
        # Check if there are more pages
        # The API returns 'pageCount' in the response
        page_count = data.get("pageCount", 1)
        if page_number >= page_count:
            break
            
        page_number += 1
        
    return all_entities

Step 3: Understanding the Data Models

The data structures returned by these two endpoints are fundamentally different.

/api/v2/conversations Response (Active):

{
  "conversations": [
    {
      "id": "5f123456-7890-1234-5678-901234567890",
      "state": "connected",
      "participants": [
        {
          "id": "12345",
          "routingData": {
            "queueId": "abc-123",
            "skill": ["support"]
          },
          "direction": "inbound"
        }
      ],
      "mediaType": "voice",
      "providerType": "telephony"
    }
  ]
}

/api/v2/analytics/conversations/details/query Response (Historical):

{
  "entities": [
    {
      "mediaType": "voice",
      "totalHandled": 150,
      "totalAbandoned": 5,
      "averageHandleTime": 120.5,
      "dateFrom": "2023-10-01T00:00:00Z",
      "dateTo": "2023-10-01T01:00:00Z"
    }
  ],
  "pageCount": 1,
  "pageSize": 100
}

Complete Working Example

The following script combines both approaches. It first checks for active voice conversations to demonstrate real-time capability. Then, it queries the analytics API for the previous day’s total handled volume to demonstrate historical reporting.

import requests
import os
from datetime import datetime, timedelta

# Configuration
GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
GENESYS_ENV = os.getenv("GENESYS_ENV", "mypurecloud.com")

def get_access_token(client_id: str, client_secret: str, environment: str) -> str:
    auth_url = f"https://api.{environment}/oauth/token"
    payload = {"grant_type": "client_credentials"}
    auth = (client_id, client_secret)
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    
    response = requests.post(auth_url, data=payload, auth=auth, headers=headers)
    response.raise_for_status()
    return response.json()["access_token"]

def get_active_voice_conversations(access_token: str, environment: str) -> list:
    url = f"https://api.{environment}/api/v2/conversations"
    headers = {"Authorization": f"Bearer {access_token}"}
    params = {"mediaType": "voice", "pageSize": 10}
    
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()
    return response.json().get("conversations", [])

def get_historical_voice_metrics(access_token: str, environment: str) -> dict:
    url = f"https://api.{environment}/api/v2/analytics/conversations/details/query"
    headers = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}
    
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=1)
    
    payload = {
        "dateFrom": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "dateTo": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "interval": "PT1D",
        "metrics": ["totalHandled", "totalAbandoned"],
        "groupings": ["mediaType"],
        "selection": {
            "type": "and",
            "predicates": [{"type": "equal", "path": "mediaType", "value": "voice"}]
        }
    }
    
    response = requests.post(url, json=payload, headers=headers)
    response.raise_for_status()
    return response.json().get("entities", [])

def main():
    try:
        print("Authenticating...")
        token = get_access_token(GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, GENESYS_ENV)
        
        print("\n--- Real-Time Data ---")
        active_convs = get_active_voice_conversations(token, GENESYS_ENV)
        print(f"Active Voice Conversations: {len(active_convs)}")
        for conv in active_convs[:3]: # Show first 3
            print(f"  ID: {conv['id']}, State: {conv['state']}")
            
        print("\n--- Historical Data ---")
        historical_data = get_historical_voice_metrics(token, GENESYS_ENV)
        if historical_data:
            for record in historical_data:
                print(f"Date: {record['dateFrom']} to {record['dateTo']}")
                print(f"  Total Handled: {record.get('totalHandled', 0)}")
                print(f"  Total Abandoned: {record.get('totalAbandoned', 0)}")
        else:
            print("No historical data found for the specified period.")
            
    except Exception as e:
        print(f"Critical Error: {e}")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The access token is expired, invalid, or missing.
  • Fix: Ensure your get_access_token function is called before making API requests. Tokens typically expire after 1 hour. Implement a retry mechanism or refresh token flow if using Authorization Code Grant.
  • Code Check: Verify that the Authorization: Bearer <token> header is correctly formatted.

Error: 403 Forbidden

  • Cause: The OAuth client lacks the required scopes.
  • Fix:
    • For /api/v2/conversations, ensure conversation:view is granted.
    • For /api/v2/analytics/conversations/details/query, ensure analytics:conversation:read is granted.
    • Update the OAuth client in the Genesys Cloud Admin Console under Developers > OAuth clients.

Error: 400 Bad Request (Analytics API)

  • Cause: The query payload is malformed or contains invalid date ranges.
  • Fix:
    • Ensure dateFrom is strictly before dateTo.
    • Validate that metrics and groupings contain valid field names for the selected mediaType.
    • Check that the JSON structure matches the API specification exactly. Extra fields may cause errors.

Error: 429 Too Many Requests

  • Cause: You have exceeded the API rate limits.
  • Fix: Implement exponential backoff retry logic. The response header Retry-After indicates the number of seconds to wait.
  • Code Example:
    import time
    
    def make_request_with_retry(request_func, *args, max_retries=3):
        for attempt in range(max_retries):
            try:
                return request_func(*args)
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 429:
                    retry_after = int(e.response.headers.get('Retry-After', 5))
                    print(f"Rate limited. Waiting {retry_after} seconds...")
                    time.sleep(retry_after)
                else:
                    raise
        raise Exception("Max retries exceeded")
    

Official References