Querying Live Conversations vs. Analytics Data in Genesys Cloud

What You Will Build

  • This tutorial shows how to retrieve real-time conversation details with the Conversations API and historical conversation metrics with the Analytics API.
  • The code uses the Genesys Cloud Platform REST API v2 through the official Python SDK, PureCloudPlatformClientV2.
  • The examples target Python 3.9+.

Prerequisites

  • OAuth Client: A Genesys Cloud OAuth client. With the client credentials grant, access is governed by the roles assigned to the client; for grants that carry scopes, the relevant scopes are:
    • For Conversations API: conversations or conversations:readonly
    • For Analytics API: analytics or analytics:readonly
  • SDK Version: PureCloudPlatformClientV2 Python SDK (version 150.0.0 or later).
  • Language/Runtime: Python 3.9 or higher.
  • External Dependencies:
    • pip install PureCloudPlatformClientV2
    • pip install python-dotenv (for loading credentials from a .env file)

Authentication Setup

Genesys Cloud uses OAuth 2.0 for authentication. With the client credentials grant, the Python SDK requests the access token for you and attaches it to subsequent calls. Keep the client ID and secret out of source control. This example reads them from a .env file.

Create a .env file in your project root:

GENESYS_CLIENT_ID=your_client_id
GENESYS_CLIENT_SECRET=your_client_secret
GENESYS_REGION=us-east-1
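
Before handing these values to the SDK, it can help to fail fast when one of them is missing. The following is a minimal sketch using only the standard library. The helper name load_settings and the REQUIRED_VARS tuple are illustrative and not part of the SDK.

```python
import os

# Variable names match the .env file above; the tuple itself is an illustrative helper.
REQUIRED_VARS = ("GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET", "GENESYS_REGION")

def load_settings(env=None):
    """Return the required settings as a dict, or raise if any is missing or empty."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Calling load_settings() after load_dotenv() surfaces a misconfigured .env file as one clear error instead of a failed token request later.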

Initialize the SDK in your script:

import os
from dotenv import load_dotenv
import PureCloudPlatformClientV2

# Load environment variables from .env
load_dotenv()

def get_api_client():
    """
    Returns an ApiClient authenticated with the client credentials grant.
    """
    # Map GENESYS_REGION (e.g. "us-east-1") to the SDK's region enum member
    # (us_east_1). This assumes the enum member names follow that pattern.
    region_name = os.getenv('GENESYS_REGION', 'us-east-1').replace('-', '_')
    region = PureCloudPlatformClientV2.PureCloudRegionHosts[region_name]
    PureCloudPlatformClientV2.configuration.host = region.get_api_host()

    # Request an access token; the call returns the authenticated ApiClient
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        os.environ['GENESYS_CLIENT_ID'],
        os.environ['GENESYS_CLIENT_SECRET']
    )

# Initialize client
api_client = get_api_client()
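
Note that the API host is not formed by splicing the AWS region name into a single domain pattern: each region has its own host. The sketch below maps a few region names to their API hosts with a plain dictionary. The table is deliberately partial, and the helper name api_host_for is hypothetical; check the Genesys Cloud documentation for the full, current list of regions.

```python
# AWS region name -> Genesys Cloud API host (partial list, for illustration only).
API_HOSTS = {
    "us-east-1": "https://api.mypurecloud.com",
    "us-west-2": "https://api.usw2.pure.cloud",
    "ca-central-1": "https://api.cac1.pure.cloud",
    "eu-west-1": "https://api.mypurecloud.ie",
    "eu-central-1": "https://api.mypurecloud.de",
    "ap-southeast-2": "https://api.mypurecloud.com.au",
    "ap-northeast-1": "https://api.mypurecloud.jp",
}

def api_host_for(region):
    """Return the API host for a region name, or raise for an unlisted region."""
    try:
        return API_HOSTS[region.strip().lower()]
    except KeyError:
        raise ValueError(f"Unknown or unlisted Genesys Cloud region: {region!r}")
```

A lookup table like this makes an unsupported GENESYS_REGION value fail immediately rather than producing a host name that does not resolve.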

Implementation

Step 1: Understanding the Core Distinction

The /api/v2/conversations endpoint returns the live, active interaction objects of the authenticated user. It is stateful, real-time, and designed for operational systems (e.g., CTI softphones, real-time dashboards, IVR state management). It does not aggregate data. It tells you “what is happening right now.”

The /api/v2/analytics/conversations endpoints return historical data: the aggregates query (/aggregates/query) returns summarized metrics, and the details query (/details/query) returns one record per conversation. They are query-based and designed for reporting, BI, and performance analysis. They tell you “what happened over a period of time.”

Key Difference:

  • Conversations API: Returns individual conversation entities with current status, participants, and media.
  • Analytics API: Returns time-series buckets of aggregated statistics (handle time, wait time, count) across many conversations.
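
To make the contrast concrete, the sketch below takes a handful of per-conversation records (the entity view) and collapses them into per-agent totals (the aggregated view). The records and their field names are made up for illustration and do not mirror either API's real response schema.

```python
from collections import defaultdict

# Hypothetical per-conversation records; field names are illustrative only.
conversations = [
    {"id": "c1", "agent": "agent-123", "handle_seconds": 120},
    {"id": "c2", "agent": "agent-123", "handle_seconds": 180},
    {"id": "c3", "agent": "agent-456", "handle_seconds": 60},
]

def aggregate_by_agent(records):
    """Collapse individual conversations into a count and total handle time per agent."""
    totals = defaultdict(lambda: {"count": 0, "handle_seconds": 0})
    for record in records:
        bucket = totals[record["agent"]]
        bucket["count"] += 1
        bucket["handle_seconds"] += record["handle_seconds"]
    return dict(totals)

summary = aggregate_by_agent(conversations)
```

The input still lets you act on conversation c2 specifically; the summary only answers questions about agent-123 as a whole. That is the trade-off between the two APIs.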

Step 2: Retrieving Live Conversations (Operational Use Case)

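Use the Conversations API when you need to interact with an active call, chat, or task, for example an agent-assist tool that displays customer context during a live call. GET /api/v2/conversations returns the active conversations of the logged-in user, so the token must carry a user context (for example, one obtained through the authorization code grant). A client-credentials token has no user attached.

Endpoint: GET /api/v2/conversations
Required Scopes: conversations or conversations:readonly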

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

def get_live_conversations():
    """
    Retrieves the active call conversations for the authenticated user.
    """
    conversations_api = PureCloudPlatformClientV2.ConversationsApi(api_client)
    
    try:
        # communication_type is optional and filters the listing (here: calls only)
        response = conversations_api.get_conversations(communication_type='call')
        conversations = response.entities or []
        
        print(f"Found {len(conversations)} active conversations.")
        
        for conv in conversations:
            print(f"Conversation ID: {conv.id}")
            
            # Each participant carries its own list of call legs
            for participant in conv.participants or []:
                print(f"  Participant: {participant.name} (Purpose: {participant.purpose})")
                for call in participant.calls or []:
                    print(f"    Call State: {call.state}")
                    print(f"    Call Direction: {call.direction}")
                
    except ApiException as e:
        print(f"Error retrieving conversations: {e}")
        raise

# Execute
get_live_conversations()

Expected Response Structure (Simplified):

{
  "entities": [
    {
      "id": "12345678-1234-1234-1234-123456789012",
      "participants": [
        {
          "id": "agent-123",
          "name": "John Doe",
          "purpose": "agent",
          "calls": [
            {
              "state": "connected",
              "direction": "inbound"
            }
          ]
        }
      ]
    }
  ]
}
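
When working with the raw JSON rather than SDK objects, a small generator can flatten the listing into one row per call leg. This is a sketch under the assumption that each participant carries a calls list with a state field; the sample payload is hand-written for illustration, so verify the shape against your own responses.

```python
import json

# Hand-written sample payload; the nesting (participants -> calls) is an assumption.
SAMPLE = """
{
  "entities": [
    {
      "id": "conv-1",
      "participants": [
        {"name": "John Doe", "purpose": "agent",
         "calls": [{"state": "connected", "direction": "inbound"}]},
        {"name": "Customer", "purpose": "customer", "calls": []}
      ]
    }
  ]
}
"""

def call_legs(payload):
    """Yield (conversation_id, participant_name, call_state) for every call leg."""
    for conv in payload.get("entities") or []:
        for participant in conv.get("participants") or []:
            for call in participant.get("calls") or []:
                yield conv["id"], participant.get("name"), call.get("state")

legs = list(call_legs(json.loads(SAMPLE)))
```

Using `or []` on each level keeps the loop safe when a key is present but null, which is common for media types a participant is not using.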

Step 3: Querying Historical Analytics (Reporting Use Case)

Use the Analytics API when you need to calculate average handle time, total call volume, or agent utilization over a date range. The aggregates query does not return individual conversations; it returns metrics summarized per group and interval. (The related details query, POST /api/v2/analytics/conversations/details/query, returns one record per conversation.)

Endpoint: POST /api/v2/analytics/conversations/aggregates/query
Required Scopes: analytics or analytics:readonly

from datetime import datetime, timedelta, timezone
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

def query_call_analytics():
    """
    Queries aggregated voice metrics per agent for the last 7 days.
    """
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)
    
    # Define time range in UTC
    end_date = datetime.now(timezone.utc).replace(microsecond=0)
    start_date = end_date - timedelta(days=7)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    
    # Construct the query body as a plain dict using the REST field names
    query_body = {
        "interval": f"{start_date.strftime(fmt)}/{end_date.strftime(fmt)}",
        "groupBy": ["userId"],  # Aggregate by agent
        "metrics": ["tHandle", "tTalk"],
        "filter": {
            "type": "and",
            "predicates": [
                {
                    "type": "dimension",
                    "dimension": "mediaType",
                    "operator": "matches",
                    "value": "voice"
                }
            ]
        }
    }
    
    try:
        response = analytics_api.post_analytics_conversations_aggregates_query(
            query_body
        )
        
        # Each result is one group (here: one agent)
        for result in response.results or []:
            print(f"Group: {result.group}")
            
            # Each data entry covers one interval
            for interval_data in result.data or []:
                print(f"  Interval: {interval_data.interval}")
                
                # Extract metrics; time metrics are reported in milliseconds
                for metric in interval_data.metrics or []:
                    if metric.metric == 'tHandle':
                        print(f"    Handled Count: {metric.stats.count}")
                        print(f"    Total Handle Time: {metric.stats.sum / 1000:.0f}s")
                                
    except ApiException as e:
        print(f"Error querying analytics: {e}")
        raise

# Execute
query_call_analytics()

Expected Response Structure (Simplified):

{
  "results": [
    {
      "group": {
        "userId": "agent-123"
      },
      "data": [
        {
          "interval": "2023-10-01T00:00:00.000Z/2023-10-08T00:00:00.000Z",
          "metrics": [
            {
              "metric": "tHandle",
              "stats": {
                "count": 150,
                "sum": 45000000
              }
            }
          ]
        }
      ]
    }
  ]
}
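
Aggregated timing metrics are typically reported as a sum and a count rather than as a ready-made average, so average handle time has to be derived. The helper below is a minimal sketch that assumes a stats dictionary with sum in milliseconds and count as the number of handled interactions; the function name average_seconds is illustrative.

```python
def average_seconds(stats):
    """Return the mean duration in seconds from a stats dict with 'sum' (ms) and 'count'."""
    count = stats.get("count") or 0
    if count == 0:
        # No interactions in the interval: avoid dividing by zero
        return 0.0
    return (stats.get("sum") or 0) / count / 1000.0

# 45,000,000 ms across 150 interactions is 300 seconds per interaction
aht = average_seconds({"sum": 45000000, "count": 150})
```

Averaging per interval and then averaging those averages would weight quiet intervals too heavily; summing the sums and counts first and dividing once avoids that.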

Step 4: Handling Pagination and Rate Limits

The two APIs page differently. GET /api/v2/conversations takes no paging parameters and returns the listing in one response. The analytics details query is paged through a paging object in the request body (pageSize and pageNumber); the aggregates query is not paged. Always implement retry logic for 429 Too Many Requests errors.

import time
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

def call_with_retry(func, *args, max_attempts=5, **kwargs):
    """
    Calls an SDK method and retries after a delay when the API answers 429.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except ApiException as e:
            if e.status != 429 or attempt == max_attempts:
                raise
            # Honor Retry-After when present; fall back to 5 seconds
            retry_after = int((e.headers or {}).get('Retry-After', 5))
            print(f"Rate limited. Waiting {retry_after} seconds.")
            time.sleep(retry_after)

def fetch_all_live_conversations():
    """
    Fetches the live conversations listing (a single, unpaged response).
    """
    conversations_api = PureCloudPlatformClientV2.ConversationsApi(api_client)
    response = call_with_retry(conversations_api.get_conversations)
    return response.entities or []

def fetch_all_analytics_data(query_body):
    """
    Fetches every page of a conversation details query.
    query_body is a dict for POST /api/v2/analytics/conversations/details/query.
    """
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)
    all_conversations = []
    page_number = 1
    
    while True:
        # Request one page at a time through the paging object
        body = dict(query_body, paging={"pageSize": 100, "pageNumber": page_number})
        response = call_with_retry(
            analytics_api.post_analytics_conversations_details_query, body
        )
        
        # A page without conversations means the result set is exhausted
        if not response.conversations:
            break
        
        all_conversations.extend(response.conversations)
        page_number += 1
        time.sleep(1)  # Rate limit protection
                
    return all_conversations

Complete Working Example

This script demonstrates both APIs in a single workflow. It first checks for live calls, then retrieves historical analytics for the same agent.

import os
from datetime import datetime, timedelta, timezone

from dotenv import load_dotenv
import PureCloudPlatformClientV2
from PureCloudPlatformClientV2.rest import ApiException

load_dotenv()

def get_api_client():
    # Map GENESYS_REGION (e.g. "us-east-1") to the SDK's region enum member
    # (us_east_1). This assumes the enum member names follow that pattern.
    region_name = os.getenv('GENESYS_REGION', 'us-east-1').replace('-', '_')
    region = PureCloudPlatformClientV2.PureCloudRegionHosts[region_name]
    PureCloudPlatformClientV2.configuration.host = region.get_api_host()
    
    # Client credentials grant; returns the authenticated ApiClient
    return PureCloudPlatformClientV2.api_client.ApiClient().get_client_credentials_token(
        os.environ['GENESYS_CLIENT_ID'],
        os.environ['GENESYS_CLIENT_SECRET']
    )

def get_live_calls(api_client):
    # Note: this endpoint lists the conversations of the authenticated user
    conversations_api = PureCloudPlatformClientV2.ConversationsApi(api_client)
    try:
        response = conversations_api.get_conversations(communication_type='call')
        return response.entities or []
    except ApiException as e:
        print(f"Error getting live calls: {e}")
        return []

def get_agent_analytics(api_client, agent_id):
    analytics_api = PureCloudPlatformClientV2.AnalyticsApi(api_client)
    end_date = datetime.now(timezone.utc).replace(microsecond=0)
    start_date = end_date - timedelta(days=7)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    
    # Aggregates query body as a plain dict using the REST field names
    query_body = {
        "interval": f"{start_date.strftime(fmt)}/{end_date.strftime(fmt)}",
        "groupBy": ["userId"],
        "metrics": ["tHandle", "tTalk"],
        "filter": {
            "type": "and",
            "predicates": [
                {
                    "type": "dimension",
                    "dimension": "mediaType",
                    "operator": "matches",
                    "value": "voice"
                },
                {
                    "type": "dimension",
                    "dimension": "userId",
                    "operator": "matches",
                    "value": agent_id
                }
            ]
        }
    }
    
    try:
        return analytics_api.post_analytics_conversations_aggregates_query(query_body)
    except ApiException as e:
        print(f"Error getting analytics: {e}")
        return None

def main():
    api_client = get_api_client()
    
    # 1. Get Live Conversations
    print("=== Live Conversations ===")
    live_convs = get_live_calls(api_client)
    if live_convs:
        for conv in live_convs:
            print(f"ID: {conv.id}, Participants: {len(conv.participants or [])}")
    else:
        print("No active calls.")
        
    # 2. Get Analytics for a specific agent (example)
    print("\n=== Analytics for Agent ===")
    # Replace with a real agent (user) ID
    agent_id = "your-agent-id-here" 
    if agent_id != "your-agent-id-here":
        analytics = get_agent_analytics(api_client, agent_id)
        if analytics and analytics.results:
            for result in analytics.results:
                print(f"Agent: {result.group.get('userId')}")
                for interval_data in result.data or []:
                    for metric in interval_data.metrics or []:
                        # Time metrics (tHandle, tTalk) are sums in milliseconds
                        print(f"  {metric.metric}: count={metric.stats.count}, sum={metric.stats.sum}")
    else:
        print("Skipping analytics (no agent ID provided).")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: Invalid OAuth credentials or expired token.
  • Fix: Verify GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET in .env and ensure the OAuth client is active. If the client secret is wrong, the token request fails during initialization. If a long-running script starts returning 401 after working earlier, the token has likely expired; request a new one.

Error: 403 Forbidden

  • Cause: Missing OAuth scopes or insufficient permissions.
  • Fix:
    • For Conversations API: Add the conversations (or conversations:readonly) scope to the OAuth client.
    • For Analytics API: Add the analytics (or analytics:readonly) scope to the OAuth client.
    • Ensure the user or role associated with the OAuth client has the permissions for the relevant data (e.g., analytics:conversationAggregate:view for aggregate queries, analytics:conversationDetail:view for detail queries).

Error: 429 Too Many Requests

  • Cause: Exceeding rate limits.
  • Fix: Implement exponential backoff and respect the Retry-After header. The code examples above include basic retry logic. For high-volume queries, stagger requests and use pagination efficiently.
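
One way to combine the two recommendations is to prefer the server's Retry-After value when it is present and fall back to exponential backoff with jitter otherwise. The function below is a sketch of that policy; backoff_delay and its parameters (base, cap, rng) are illustrative names and defaults, not SDK settings.

```python
import random

def backoff_delay(attempt, retry_after=None, base=1.0, cap=60.0, rng=random.random):
    """Return the seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        # The server told us how long to wait; trust it over our own estimate
        return float(retry_after)
    # Exponential ceiling: base, 2*base, 4*base, ... limited to `cap`
    ceiling = min(cap, base * (2 ** attempt))
    # "Full jitter": pick a random point below the ceiling to spread out clients
    return ceiling * rng()
```

Inside a retry loop this would be used as time.sleep(backoff_delay(attempt, e.headers.get('Retry-After'))). Passing a fixed rng makes the function deterministic for tests.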

Error: 400 Bad Request (Analytics Query)

  • Cause: Invalid query body structure.
  • Fix:
    • Ensure interval is two ISO 8601 timestamps separated by a slash (start/end), in UTC with a Z suffix.
    • Ensure the metric names are valid for the query (e.g., tHandle and tTalk for aggregate queries).
    • Ensure filter predicates use a valid operator (for dimension predicates: matches, exists, notExists).
    • Check that groupBy dimensions are compatible with the selected metrics.
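
Malformed intervals are a frequent source of 400 responses, so it can be worth building the string in one place. The helper below is a minimal sketch; build_interval is an illustrative name, and it expects a timezone-aware datetime so that the conversion to UTC is unambiguous.

```python
from datetime import datetime, timedelta, timezone

def build_interval(end, days):
    """Return 'start/end' in ISO 8601 UTC with a Z suffix, covering `days` days up to `end`."""
    # Normalize to UTC and drop microseconds to keep the string compact
    end = end.astimezone(timezone.utc).replace(microsecond=0)
    start = end - timedelta(days=days)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return f"{start.strftime(fmt)}/{end.strftime(fmt)}"

# Example: the 7 days ending at midnight UTC on 2023-10-08
interval = build_interval(datetime(2023, 10, 8, tzinfo=timezone.utc), 7)
```

For a rolling window, pass datetime.now(timezone.utc) as end.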

Official References