Querying Live vs. Historical Conversations in Genesys Cloud CX

What You Will Build

  • This tutorial provides Python code that retrieves live, in-progress interactions through the Conversations API and historical conversation detail records through the Analytics API.
  • It shows the technical differences between real-time conversation state and post-interaction analytics records.
  • The implementation targets Python 3.9+ and the official Genesys Cloud Python SDK (PureCloudPlatformClientV2).

Prerequisites

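  • OAuth Client Type: Client Credentials grant for the Analytics query. GET /api/v2/conversations returns conversations for the logged-in user, so it needs a token from a user grant (for example, Code Authorization).
  • Required Permissions and Scopes:
    • Client Credentials clients get access through the roles assigned to the client, for example a role that includes analytics:conversationDetail:view.
    • User-grant clients request scopes such as conversations:readonly and analytics:readonly. Confirm the exact requirements for each endpoint in the API Explorer.
  • SDK Version: PureCloudPlatformClientV2 >= 130.0.0
  • Runtime: Python 3.9 or later
  • Dependencies:
    pip install PureCloudPlatformClientV2 python-dotenv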
    

Authentication Setup

Genesys Cloud CX uses OAuth 2.0 for all API access. The Conversations API and Analytics API share the same authentication mechanism, but each endpoint checks its own permissions. You must ensure your OAuth client has the access listed above.

The following code requests a Client Credentials token and creates the API clients used in the rest of this tutorial.

import os

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2 import AnalyticsApi, ConversationsApi
from PureCloudPlatformClientV2.api_client import ApiClient
from PureCloudPlatformClientV2.rest import ApiException
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

def get_authenticated_client() -> ApiClient:
    """
    Requests a Client Credentials token and returns an authenticated ApiClient.
    """
    # Update to your region's API host (e.g., https://api.mypurecloud.ie)
    PureCloudPlatformClientV2.configuration.host = "https://api.mypurecloud.com"

    # Credentials from environment variables.
    # Access is determined by the roles assigned to the OAuth client,
    # so no scopes are passed with a Client Credentials request.
    return ApiClient().get_client_credentials_token(
        os.getenv("GENESYS_CLIENT_ID"),
        os.getenv("GENESYS_CLIENT_SECRET"),
    )

def create_api_clients(api_client: ApiClient):
    """
    Instantiates the ConversationsApi and AnalyticsApi clients.
    """
    # Client for real-time/live conversations
    conversations_api = ConversationsApi(api_client)

    # Client for historical/analytics data
    analytics_api = AnalyticsApi(api_client)

    return conversations_api, analytics_api

Implementation

Step 1: Retrieving Live Conversations with /api/v2/conversations

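The /api/v2/conversations endpoint returns active, in-progress interactions for the user who owns the access token. A token from a Client Credentials grant has no user context, so call this endpoint with a token obtained through a user grant. This data is ephemeral: once a conversation ends (hangup or disconnect), it is removed from this endpoint’s result set after a short grace period.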

Use Case: You need to display the current status of an agent (e.g., “On Call with John Doe”) or trigger a real-time event (e.g., send a Slack notification when a call starts).

Key Parameters:

  • communicationType: Optional filter by communication type (for example, call or chat). In the Python SDK this is the communication_type keyword argument.
  • The endpoint has no status filter. To narrow results by state, inspect each participant's communication state in the response.

def get_live_conversations(conversations_api, communication_type: str = "call"):
    """
    Retrieves the currently active conversations of a specific type.
    
    Args:
        conversations_api: The initialized ConversationsApi client.
        communication_type: The communication type to filter on (e.g., 'call', 'chat').
        
    Returns:
        A list of active Conversation objects.
    """
    try:
        # GET /api/v2/conversations
        # We filter by communication type to keep the response payload small
        response = conversations_api.get_conversations(
            communication_type=communication_type
        )
        
        if not response.entities:
            print(f"No active {communication_type} conversations found.")
            return []
            
        print(f"Found {len(response.entities)} active {communication_type} conversation(s).")
        return response.entities

    except ApiException as e:
        print(f"Exception when calling ConversationsApi->get_conversations: {e}")
        if e.status == 401:
            print("Error: Unauthorized. Check your OAuth token.")
        elif e.status == 403:
            print("Error: Forbidden. Check the permissions granted to your OAuth client or user.")
        raise

Expected Response Structure (Simplified):

{
  "entities": [
    {
      "id": "conv-uuid-123",
      "startTime": "2023-10-27T10:15:00.000Z",
      "participants": [
        {
          "id": "participant-uuid-456",
          "name": "Agent Name",
          "userId": "agent-id-789",
          "purpose": "agent",
          "calls": [
            {
              "id": "call-uuid-321",
              "state": "connected",
              "direction": "inbound"
            }
          ]
        }
      ]
    }
  ],
  "pageSize": 25,
  "pageNumber": 1
}
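
For the agent-status use case, the listing can be reduced to one row per agent who currently has a connected call. The sketch below works on the JSON payload as a plain dictionary rather than on SDK model objects. The field names it reads (purpose, userId, and a calls list with a per-call state) are assumptions about the participant layout and should be checked against a real response from your organization; connected_agents is a hypothetical helper name.

```python
from typing import Dict, List


def connected_agents(listing: Dict) -> List[Dict]:
    """Return one summary row per agent participant that has a connected call."""
    rows = []
    for conversation in listing.get("entities", []):
        for participant in conversation.get("participants", []):
            # Skip customers, ACD queues, IVR flows, and other non-agent legs
            if participant.get("purpose") != "agent":
                continue
            # A participant may carry several call legs; one connected leg is enough
            states = [call.get("state") for call in participant.get("calls", [])]
            if "connected" in states:
                rows.append({
                    "conversationId": conversation.get("id"),
                    "agentName": participant.get("name"),
                    "userId": participant.get("userId"),
                })
    return rows


# Sample payload (assumed shape, illustrative values only)
sample_listing = {
    "entities": [
        {
            "id": "conv-uuid-123",
            "participants": [
                {"name": "Customer", "purpose": "customer",
                 "calls": [{"state": "connected"}]},
                {"name": "Agent Name", "userId": "agent-id-789", "purpose": "agent",
                 "calls": [{"state": "connected"}]},
                {"name": "Other Agent", "userId": "agent-id-000", "purpose": "agent",
                 "calls": [{"state": "disconnected"}]},
            ],
        }
    ]
}

for row in connected_agents(sample_listing):
    print(f"{row['agentName']} is on conversation {row['conversationId']}")
```

Working on dictionaries keeps the helper independent of the SDK version; the same loop can be adapted to model attributes if you prefer typed objects.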

Step 2: Querying Historical Data with /api/v2/analytics/conversations/details/query

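The /api/v2/analytics/conversations/details/query endpoint returns conversation detail records from the analytics store. Each record carries per-session metrics such as tTalk (talk time), tHeld (hold time), and tAcd (time in queue), reported in milliseconds. A conversation that is still in progress can appear in the results without a conversationEnd value, so filter on conversationEnd when you want completed interactions only.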

Use Case: You need to generate a report of call durations for the last hour, calculate average handle time (AHT), or archive conversation transcripts for compliance.

Key Parameters:

  • interval: ISO 8601 time range (e.g., 2023-10-01T00:00:00Z/2023-10-01T01:00:00Z).
  • paging: Page size and page number (e.g., {"pageSize": 25, "pageNumber": 1}).
  • conversationFilters / segmentFilters: Optional predicate clauses that narrow the result set (e.g., conversationEnd exists).
  • order / orderBy: Sort direction and sort field (e.g., asc by conversationStart).

from datetime import datetime, timedelta, timezone

def get_historical_conversations(analytics_api: AnalyticsApi, lookback_hours: int = 1):
    """
    Queries completed conversations for a specific time interval.
    
    Args:
        analytics_api: The initialized AnalyticsApi client.
        lookback_hours: How many hours back to query.
        
    Returns:
        A list of conversation detail records.
    """
    # Define the time interval
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=lookback_hours)
    
    # Format as ISO 8601 with Z suffix for UTC
    interval = f"{start_time.strftime('%Y-%m-%dT%H:%M:%SZ')}/{end_time.strftime('%Y-%m-%dT%H:%M:%SZ')}"
    
    # Define the query body
    # The interval selects conversations; the filter keeps only those that have ended
    query_body = {
        "interval": interval,
        "order": "asc",
        "orderBy": "conversationStart",
        "conversationFilters": [
            {
                "type": "and",
                "predicates": [
                    {
                        "type": "dimension",
                        "dimension": "conversationEnd",
                        "operator": "exists"
                    }
                ]
            }
        ],
        "paging": {"pageSize": 25, "pageNumber": 1}  # The detail query caps pageSize at 100
    }

    try:
        # POST /api/v2/analytics/conversations/details/query
        response = analytics_api.post_analytics_conversations_details_query(query_body)
        
        # The response lists records under 'conversations' (None when nothing matches)
        conversations = response.conversations or []
        if not conversations:
            print(f"No completed conversations found in the last {lookback_hours} hour(s).")
            return []
            
        print(f"Found {len(conversations)} completed conversation(s).")
        return conversations

    except ApiException as e:
        print(f"Exception when calling AnalyticsApi->post_analytics_conversations_details_query: {e}")
        if e.status == 400:
            print("Error: Bad Request. Check the interval format or query syntax.")
        elif e.status == 429:
            print("Error: Rate Limited. Implement exponential backoff.")
        raise

Expected Response Structure (Simplified):

{
  "conversations": [
    {
      "conversationId": "conv-uuid-123",
      "conversationStart": "2023-10-27T10:15:00.000Z",
      "conversationEnd": "2023-10-27T10:20:00.000Z",
      "originatingDirection": "inbound",
      "participants": [
        {
          "participantId": "participant-uuid-456",
          "purpose": "agent",
          "userId": "agent-id-789",
          "sessions": [
            {
              "mediaType": "voice",
              "metrics": [
                { "name": "tTalk", "value": 240500 },
                { "name": "tHeld", "value": 10200 }
              ]
            }
          ]
        }
      ]
    }
  ],
  "totalHits": 1
}
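
To turn detail records into the average handle time mentioned in the use case, the metrics have to be summed across participants and sessions first. The sketch below is a minimal example on plain dictionaries. It assumes metrics are nested under participants → sessions → metrics as name/value pairs in milliseconds, and it assumes AHT is defined as (tTalk + tHeld + tAcw) divided by the number of conversations; both the record shape and that definition should be checked against your own data and reporting rules. The helper names are hypothetical.

```python
from typing import Dict, Iterable, Sequence


def metric_total_ms(conversation: Dict, metric_name: str) -> int:
    """Sum one metric (milliseconds) over every participant and session."""
    total = 0
    for participant in conversation.get("participants", []):
        for session in participant.get("sessions", []):
            for metric in session.get("metrics", []):
                if metric.get("name") == metric_name:
                    total += metric.get("value", 0)
    return total


def average_handle_time_seconds(
    conversations: Iterable[Dict],
    handle_metrics: Sequence[str] = ("tTalk", "tHeld", "tAcw"),
) -> float:
    """Average of the summed handle metrics per conversation, in seconds."""
    records = list(conversations)
    if not records:
        return 0.0
    total_ms = sum(
        metric_total_ms(record, name) for record in records for name in handle_metrics
    )
    return total_ms / len(records) / 1000


# Illustrative records (assumed shape, made-up values)
sample_records = [
    {"conversationId": "conv-1", "participants": [{"purpose": "agent", "sessions": [
        {"metrics": [{"name": "tTalk", "value": 240500},
                     {"name": "tHeld", "value": 10200},
                     {"name": "tAcw", "value": 30000}]}]}]},
    {"conversationId": "conv-2", "participants": [{"purpose": "agent", "sessions": [
        {"metrics": [{"name": "tTalk", "value": 120000}]}]}]},
]

print(f"AHT: {average_handle_time_seconds(sample_records):.2f}s")
```

Dividing by the number of conversations rather than by the number of agent sessions is a design choice; transfers between agents would count as one handled conversation under this definition.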

Step 3: Handling Pagination and Large Datasets

The Analytics API can return thousands of records. The Conversations API result set is generally smaller but is also paged. For the detail query, paging is part of the request body: increment paging.pageNumber until the response contains no conversations or a page comes back shorter than the requested pageSize.

def get_all_historical_conversations(analytics_api: AnalyticsApi, lookback_hours: int = 1):
    """
    Iterates through all pages of historical conversation data.
    """
    all_conversations = []
    page = 1
    size = 100  # The detail query caps pageSize at 100
    
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=lookback_hours)
    interval = f"{start_time.strftime('%Y-%m-%dT%H:%M:%SZ')}/{end_time.strftime('%Y-%m-%dT%H:%M:%SZ')}"
    
    query_body = {
        "interval": interval,
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {"pageSize": size, "pageNumber": page}
    }

    while True:
        try:
            # Paging lives in the request body, so update it on every iteration
            query_body["paging"]["pageNumber"] = page
            response = analytics_api.post_analytics_conversations_details_query(query_body)
            
            conversations = response.conversations or []
            if not conversations:
                break  # No more data
                
            all_conversations.extend(conversations)
            print(f"Fetched page {page}: {len(conversations)} records.")
            
            # If we got fewer records than requested, we have reached the end
            if len(conversations) < size:
                break
                
            page += 1
            
        except ApiException as e:
            # Stop on error and return what was collected so far
            print(f"Error fetching page {page}: {e}")
            break

    return all_conversations
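
The termination rules of that loop (stop on an empty page, stop on a short page) are easy to get wrong, so it can help to separate them from the API call and test them offline. The following is a minimal sketch of that idea, not part of the SDK: iter_records, fetch_page, and make_fake_fetcher are hypothetical names, and fetch_page stands in for whatever function performs the real request for a given page number and page size.

```python
from typing import Callable, Dict, Iterator, List


def iter_records(
    fetch_page: Callable[[int, int], List[Dict]],
    page_size: int = 100,
    max_pages: int = 1000,
) -> Iterator[Dict]:
    """Yield records page by page until an empty or short page is returned."""
    for page_number in range(1, max_pages + 1):
        records = fetch_page(page_number, page_size)
        if not records:
            return  # Empty page: nothing left to read
        yield from records
        if len(records) < page_size:
            return  # Short page: this was the last one


def make_fake_fetcher(total: int) -> Callable[[int, int], List[Dict]]:
    """Build an in-memory stand-in for the API that serves `total` records."""
    data = [{"conversationId": f"conv-{i}"} for i in range(total)]

    def fetch(page_number: int, page_size: int) -> List[Dict]:
        start = (page_number - 1) * page_size
        return data[start:start + page_size]

    return fetch


records = list(iter_records(make_fake_fetcher(250), page_size=100))
print(f"Collected {len(records)} records")
```

The max_pages guard is an added safety limit so a misbehaving fetcher cannot loop forever; choose a value that fits your expected data volume.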

Complete Working Example

This script combines authentication, live query, and historical query into a single runnable module.

import os
import sys
from datetime import datetime, timedelta, timezone

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2 import AnalyticsApi, ConversationsApi
from PureCloudPlatformClientV2.rest import ApiException
from dotenv import load_dotenv

def get_authenticated_client():
    # Update the host to match your region
    PureCloudPlatformClientV2.configuration.host = "https://api.mypurecloud.com"
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        os.getenv("GENESYS_CLIENT_ID"),
        os.getenv("GENESYS_CLIENT_SECRET"),
    )

# Helper functions from the previous steps, defined before main() so they exist when it runs
def get_live_conversations(conversations_api, communication_type="call"):
    try:
        response = conversations_api.get_conversations(communication_type=communication_type)
        return response.entities if response.entities else []
    except ApiException as e:
        print(f"Conversations API Error: {e}")
        return []

def get_all_historical_conversations(analytics_api, lookback_hours=1):
    all_conversations = []
    page = 1
    size = 100  # The detail query caps pageSize at 100
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=lookback_hours)
    interval = f"{start_time.strftime('%Y-%m-%dT%H:%M:%SZ')}/{end_time.strftime('%Y-%m-%dT%H:%M:%SZ')}"
    
    query_body = {
        "interval": interval,
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {"pageSize": size, "pageNumber": page}
    }

    while True:
        try:
            query_body["paging"]["pageNumber"] = page
            response = analytics_api.post_analytics_conversations_details_query(query_body)
            conversations = response.conversations or []
            if not conversations:
                break
            all_conversations.extend(conversations)
            if len(conversations) < size:
                break
            page += 1
        except ApiException as e:
            print(f"Analytics API Error: {e}")
            break
    return all_conversations

def total_talk_ms(conversation):
    """Sums the tTalk metric (milliseconds) across all participants and sessions."""
    total = 0
    for participant in conversation.participants or []:
        for session in participant.sessions or []:
            for metric in session.metrics or []:
                if metric.name == "tTalk":
                    total += metric.value
    return total

def main():
    # Load .env before checking the variables it provides
    load_dotenv()
    if not os.getenv("GENESYS_CLIENT_ID") or not os.getenv("GENESYS_CLIENT_SECRET"):
        print("Error: GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in .env")
        sys.exit(1)

    api_client = get_authenticated_client()
    conversations_api = ConversationsApi(api_client)
    analytics_api = AnalyticsApi(api_client)

    # 1. Get Live Voice Conversations
    # This call needs a user-context token; with a Client Credentials token
    # the API rejects it and the helper returns an empty list.
    print("--- Fetching Live Voice Conversations ---")
    live_convs = get_live_conversations(conversations_api, communication_type="call")
    for conv in live_convs:
        print(f"Live ID: {conv.id}, Start: {conv.start_time}, Participants: {len(conv.participants or [])}")

    # 2. Get Historical Voice Conversations (Last 1 Hour)
    print("\n--- Fetching Historical Voice Conversations (Last 1 Hour) ---")
    hist_convs = get_all_historical_conversations(analytics_api, lookback_hours=1)
    for conv in hist_convs:
        talk_seconds = total_talk_ms(conv) / 1000
        print(f"Hist ID: {conv.conversation_id}, Start: {conv.conversation_start}, Talk Time: {talk_seconds:.1f}s")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized

Cause: The OAuth token is expired, invalid, or the client credentials are incorrect.
Fix: Ensure your client_id and client_secret match a Client Credentials OAuth client in Genesys Cloud and that the API host matches your organization's region. If you see this error immediately, check your environment variables. If it appears in a long-running process, the token has likely expired; request a new one.
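
A quick way to rule out the environment-variable cause is to check for missing or blank values before requesting a token. This is a small sketch under the assumption that the two variable names used in this tutorial are the only ones required; missing_env_vars is a hypothetical helper, and the env parameter exists only so the check can be exercised without touching the real environment.

```python
import os
from typing import List, Mapping, Optional, Sequence

# Variable names used throughout this tutorial
REQUIRED_VARS = ("GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET")


def missing_env_vars(
    names: Sequence[str] = REQUIRED_VARS,
    env: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the names that are unset or contain only whitespace."""
    source = os.environ if env is None else env
    return [name for name in names if not source.get(name, "").strip()]


missing = missing_env_vars()
if missing:
    print(f"Missing configuration: {', '.join(missing)}")
else:
    print("All required variables are set.")
```

Treating whitespace-only values as missing catches a common .env mistake where a key is present but its value was left empty.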

Error: 403 Forbidden

Cause: The OAuth client or user lacks the permission that the endpoint requires.
Fix:

  • For Conversations API: Grant a role that can view conversations (for example, one that includes conversation:communication:view).
  • For Analytics API: Grant a role that includes analytics:conversationDetail:view.
  • Note: A Client Credentials client gets its access from the roles assigned to it, not from scopes in the token request. You must update the client's roles in the Genesys Cloud Admin Console.

Error: 400 Bad Request (Analytics)

Cause: Invalid interval format or a malformed query body.
Fix:

  • Ensure the interval is in ISO 8601 format with a Z suffix for UTC.
  • Ensure the start time is before the end time.
  • Check that every body field, filter dimension, and metric name matches the query schema in the API documentation.
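
The first two checks can be run locally before the request is sent. The sketch below validates an interval string of the form used in this tutorial (second precision, Z suffix). It is deliberately stricter than ISO 8601 in general, which also allows offsets and fractional seconds, so treat the accepted format as an assumption; parse_interval is a hypothetical helper name.

```python
from datetime import datetime, timezone
from typing import Tuple


def parse_interval(interval: str) -> Tuple[datetime, datetime]:
    """Parse 'start/end' with Z-suffixed UTC timestamps and check their order."""
    start_text, separator, end_text = interval.partition("/")
    if not separator:
        raise ValueError("interval must have the form 'start/end'")

    def parse(text: str) -> datetime:
        if not text.endswith("Z"):
            raise ValueError(f"timestamp must end with 'Z': {text!r}")
        # strptime raises ValueError itself when the text does not match
        parsed = datetime.strptime(text, "%Y-%m-%dT%H:%M:%SZ")
        return parsed.replace(tzinfo=timezone.utc)

    start, end = parse(start_text), parse(end_text)
    if start >= end:
        raise ValueError("interval start must be before interval end")
    return start, end


start, end = parse_interval("2023-10-01T00:00:00Z/2023-10-01T01:00:00Z")
print(f"Interval spans {(end - start).total_seconds():.0f} seconds")
```

Raising ValueError with a specific message makes the failure easier to diagnose than the generic 400 returned by the API.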

Error: 429 Too Many Requests

Cause: You have exceeded the rate limit for the API endpoint.
Fix: Implement exponential backoff. The Analytics API has stricter rate limits than the Conversations API. Do not query analytics in tight loops without delays.

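import time

from PureCloudPlatformClientV2.rest import ApiException

def api_call_with_retry(func, *args, max_retries=3, **kwargs):
    """Calls func and retries with exponential backoff when the API returns 429."""
    for attempt in range(max_retries):
        try:
            return func(*args, **kwargs)
        except ApiException as e:
            # Re-raise anything that is not a rate limit, and the final 429
            if e.status != 429 or attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt  # 1s, 2s, 4s, ...
            print(f"Rate limited. Waiting {wait_time} seconds...")
            time.sleep(wait_time)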
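
A fixed 2 ** attempt schedule can cause several workers that were rate limited at the same moment to retry at the same moment again. A common variant adds a cap and random jitter to spread the retries out. The sketch below only computes the delay schedule and is an illustration rather than something the SDK provides; backoff_delays and its parameters are hypothetical names, and the base and cap values are arbitrary defaults.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return one randomized delay (seconds) per retry attempt.

    Each delay is drawn uniformly from 0 up to min(cap, base * 2 ** attempt).
    """
    generator = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        ceiling = min(cap, base * (2 ** attempt))
        delays.append(generator.uniform(0, ceiling))
    return delays


# Passing a seeded generator makes the schedule reproducible in tests
for attempt, delay in enumerate(backoff_delays(5, rng=random.Random(42))):
    print(f"Attempt {attempt + 1}: wait up to {min(30.0, 2 ** attempt):.0f}s, drew {delay:.2f}s")
```

To use it with a retry loop, compute the list once per call and sleep for delays[attempt] instead of 2 ** attempt.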
