Choosing the Right Endpoint: /api/v2/conversations vs /api/v2/analytics/conversations in Genesys Cloud CX

Choosing the Right Endpoint: /api/v2/conversations vs /api/v2/analytics/conversations in Genesys Cloud CX

What You Will Build

  • This tutorial demonstrates how to retrieve real-time or recent conversation data using the Conversations API versus aggregated historical metrics using the Analytics API.
  • The code samples utilize the Genesys Cloud PureCloud Platform Client V2 SDK in Python.
  • The programming language covered is Python 3.9+.

Prerequisites

  • OAuth Client Type: Confidential Client (Client Credentials) or Resource Owner Password Credentials (ROPC).
  • Required Scopes:
    • For Conversations API: conversation:all (read all conversations) or conversation:read (read specific conversations).
    • For Analytics API: analytics:conversation:query (query conversation analytics).
  • SDK Version: genesyscloud-purecloud-platform-client-v2 >= 140.0.0.
  • Runtime: Python 3.9 or higher.
  • External Dependencies:
    • genesyscloud-purecloud-platform-client-v2
    • python-dotenv (for secure credential management)

Authentication Setup

Before making any API calls, you must authenticate with the Genesys Cloud environment. The Python SDK handles token generation and refresh automatically when configured correctly.

Create a .env file in your project root with the following variables:

GENESYS_CLOUD_REGION=us-east-1
GENESYS_CLOUD_CLIENT_ID=your_client_id
GENESYS_CLOUD_CLIENT_SECRET=your_client_secret
GENESYS_CLOUD_USER_EMAIL=your_user_email
GENESYS_CLOUD_USER_PASSWORD=your_user_password

Install the required packages:

pip install genesyscloud-purecloud-platform-client-v2 python-dotenv

Initialize the SDK client in your Python script:

import os
from dotenv import load_dotenv
from purecloudplatformclientv2 import (
    ApiClient,
    Configuration,
    OAuthClient,
    ConversationApi,
    AnalyticsApi
)

load_dotenv()

def get_api_client():
    """
    Initializes and returns an authenticated API client.
    Uses ROPC flow for demonstration. In production, consider Client Credentials
    if the actions do not require user-specific context.
    """
    configuration = Configuration()
    configuration.host = f"https://{os.getenv('GENESYS_CLOUD_REGION')}.pure.cloudengage.com"
    
    api_client = ApiClient(configuration)
    oauth_client = OAuthClient(configuration)
    
    # Authenticate using Resource Owner Password Credentials
    token = oauth_client.authenticate(
        os.getenv('GENESYS_CLOUD_USER_EMAIL'),
        os.getenv('GENESYS_CLOUD_USER_PASSWORD')
    )
    
    api_client.configuration.access_token = token.access_token
    
    return api_client

# Initialize clients for both APIs
api_client = get_api_client()
conversation_api = ConversationApi(api_client)
analytics_api = AnalyticsApi(api_client)

Implementation

Step 1: Understanding the Data Model Difference

The fundamental difference between these two endpoints lies in the granularity and purpose of the data.

  1. /api/v2/conversations (Conversations API)

    • Purpose: Retrieves the actual conversation objects. This includes the transcript, media type (voice, chat, email), participants, and the current state (active, queued, closed).
    • Use Case: You need to display the chat transcript to an agent, perform real-time sentiment analysis on live text, or retrieve the specific email body for an archival system.
    • Data Structure: Returns a list of Conversation objects. Each object contains detailed message history, participant IDs, and media-specific attributes.
  2. /api/v2/analytics/conversations/details/query (Analytics API)

    • Purpose: Retrieves aggregated metrics about conversations. This includes wait times, talk times, wrap-up codes, and disposition outcomes. It does not contain the transcript.
    • Use Case: You need to generate a weekly report on average handle time (AHT), identify agents with high abandonment rates, or calculate service level percentages.
    • Data Structure: Returns a QueryResponse containing a groups array with numeric metrics (e.g., sum(talkTime), avg(waitTime)).

Step 2: Retrieving Recent Conversations (Conversations API)

Let us retrieve the 10 most recent voice conversations. This endpoint is ideal when you need the context of the interaction.

Required Scope: conversation:all

from purecloudplatformclientv2 import ConversationApi, Conversation

def get_recent_conversations(api_client: ApiClient, limit: int = 10) -> list[Conversation]:
    """
    Retrieves the most recent conversations.
    
    Args:
        api_client: Authenticated API client.
        limit: Number of conversations to retrieve (max 100 per request).
        
    Returns:
        List of Conversation objects.
    """
    conversation_api = ConversationApi(api_client)
    
    try:
        # GET /api/v2/conversations
        # Parameters:
        # - limit: Max items returned (1-100)
        # - expand: Optional. Use 'participants' to include participant details
        response = conversation_api.get_conversations(
            limit=limit,
            expand="participants"
        )
        
        if response.entities:
            print(f"Retrieved {len(response.entities)} conversations.")
            return response.entities
        else:
            print("No recent conversations found.")
            return []
            
    except Exception as e:
        # Handle 401, 403, 429 errors
        if hasattr(e, 'status') and e.status == 429:
            print("Rate limit exceeded. Implement exponential backoff.")
        elif hasattr(e, 'status') and e.status in [401, 403]:
            print(f"Authentication/Authorization error: {e}")
        else:
            print(f"Error retrieving conversations: {e}")
        raise

# Execute
recent_convos = get_recent_conversations(api_client)

# Inspect the first conversation
if recent_convos:
    first_convo = recent_convos[0]
    print(f"Conversation ID: {first_convo.id}")
    print(f"Type: {first_convo.type}")
    print(f"State: {first_convo.state}")
    
    # Access transcript for chat/email
    if first_convo.type == "chat" and first_convo.transcript:
        print(f"Transcript Lines: {len(first_convo.transcript)}")

Expected Response Snippet:

{
  "entities": [
    {
      "id": "8c7b6a5d-4e3f-2a1b-9c8d-7e6f5a4b3c2d",
      "type": "voice",
      "state": "closed",
      "createdTime": "2023-10-27T14:30:00.000Z",
      "updatedTime": "2023-10-27T14:35:00.000Z",
      "participants": [
        {
          "id": "agent-id-123",
          "name": "John Doe",
          "type": "agent"
        }
      ]
    }
  ],
  "pageSize": 10,
  "pageNumber": 1
}

Step 3: Querying Conversation Metrics (Analytics API)

Now, let us retrieve the same data but focused on performance metrics. We will query for conversations from the last 24 hours.

Required Scope: analytics:conversation:query

The Analytics API uses a POST body to define the query, which is more powerful than simple GET parameters. This allows for complex filtering and grouping.

from purecloudplatformclientv2 import (
    AnalyticsApi,
    QueryPostBody,
    QueryFilter,
    QueryGroupBy,
    Metric
)
from datetime import datetime, timedelta

def get_daily_metrics(api_client: ApiClient) -> dict:
    """
    Queries analytics for conversations from the last 24 hours.
    
    Args:
        api_client: Authenticated API client.
        
    Returns:
        Dictionary containing aggregated metrics.
    """
    analytics_api = AnalyticsApi(api_client)
    
    # Define the time range
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=24)
    
    # Define the metrics to retrieve
    # Common metrics: talkTime, waitTime, wrapTime, holdTime, transferTime
    metrics = [
        Metric(name="talkTime"),
        Metric(name="waitTime"),
        Metric(name="count")
    ]
    
    # Define filters
    # Filter by type (voice) and state (closed)
    filters = [
        QueryFilter(name="type", operator="eq", value="voice"),
        QueryFilter(name="state", operator="eq", value="closed")
    ]
    
    # Define grouping
    # Group by date to see daily trends
    group_by = [
        QueryGroupBy(name="date")
    ]
    
    # Construct the query body
    query_body = QueryPostBody(
        metrics=metrics,
        filters=filters,
        group_by=group_by,
        time_range={
            "startTime": start_time.isoformat(),
            "endTime": end_time.isoformat()
        }
    )
    
    try:
        # POST /api/v2/analytics/conversations/details/query
        response = analytics_api.post_analytics_conversations_details_query(
            body=query_body,
        )
        
        if response.groups:
            print(f"Query returned {len(response.groups)} groups.")
            for group in response.groups:
                print(f"Date: {group.group_by}")
                for metric in group.metrics:
                    if metric.name == "talkTime":
                        print(f"  Total Talk Time (ms): {metric.total}")
                    elif metric.name == "count":
                        print(f"  Conversation Count: {metric.total}")
            return response.groups
        else:
            print("No analytics data found for the specified period.")
            return []
            
    except Exception as e:
        if hasattr(e, 'status') and e.status == 400:
            print(f"Bad Request. Check filter syntax: {e}")
        elif hasattr(e, 'status') and e.status == 429:
            print("Rate limit exceeded. Wait before retrying.")
        else:
            print(f"Error querying analytics: {e}")
        raise

# Execute
daily_metrics = get_daily_metrics(api_client)

Expected Response Snippet:

{
  "queryId": "abc123-def456",
  "status": "complete",
  "groups": [
    {
      "groupBy": {
        "date": "2023-10-27"
      },
      "metrics": [
        {
          "name": "talkTime",
          "total": 1250000,
          "avg": 45000
        },
        {
          "name": "count",
          "total": 28
        }
      ]
    }
  ]
}

Step 4: Handling Pagination and Large Datasets

Both APIs handle large datasets differently.

Conversations API:
Uses standard pagination with pageSize and pageNumber. The maximum page size is 100. To fetch all conversations, you must loop until response.pageNumber * response.pageSize exceeds the total count or no more entities are returned.

Analytics API:
The Analytics API is designed for large aggregations. It does not use traditional pagination for the result set in the same way. Instead, you define the timeRange and groupBy. If the result set is extremely large, the API may return a queryId and a status of processing. You must then poll the GET /api/v2/analytics/conversations/details/query/{queryId} endpoint until the status changes to complete.

Here is how to handle asynchronous analytics queries:

import time

def poll_analytics_query(api_client: ApiClient, query_id: str, max_wait_seconds: int = 300) -> dict:
    """
    Polls an asynchronous analytics query until completion.
    
    Args:
        api_client: Authenticated API client.
        query_id: The ID returned from the initial POST request.
        max_wait_seconds: Maximum time to wait for the query to complete.
        
    Returns:
        The final query response.
    """
    analytics_api = AnalyticsApi(api_client)
    start_time = time.time()
    
    while time.time() - start_time < max_wait_seconds:
        try:
            response = analytics_api.get_analytics_conversations_details_query(query_id)
            
            if response.status == "complete":
                return response
            elif response.status == "failed":
                raise Exception(f"Analytics query failed: {response.message}")
            else:
                # Still processing
                time.sleep(5)
                
        except Exception as e:
            print(f"Error polling query: {e}")
            raise

    raise TimeoutError("Analytics query did not complete within the specified time.")

Complete Working Example

This script combines authentication, conversation retrieval, and analytics querying into a single runnable module.

import os
import time
from datetime import datetime, timedelta
from dotenv import load_dotenv
from purecloudplatformclientv2 import (
    ApiClient,
    Configuration,
    OAuthClient,
    ConversationApi,
    AnalyticsApi,
    QueryPostBody,
    QueryFilter,
    QueryGroupBy,
    Metric
)

load_dotenv()

def init_api_client():
    configuration = Configuration()
    configuration.host = f"https://{os.getenv('GENESYS_CLOUD_REGION')}.pure.cloudengage.com"
    api_client = ApiClient(configuration)
    oauth_client = OAuthClient(configuration)
    
    token = oauth_client.authenticate(
        os.getenv('GENESYS_CLOUD_USER_EMAIL'),
        os.getenv('GENESYS_CLOUD_USER_PASSWORD')
    )
    api_client.configuration.access_token = token.access_token
    return api_client

def fetch_recent_voice_conversations(api_client: ApiClient):
    """Fetches the 5 most recent closed voice conversations."""
    conversation_api = ConversationApi(api_client)
    
    try:
        # Filter for voice conversations is not directly available in GET /conversations
        # We retrieve all recent and filter client-side, or use expand='participants' to check type
        response = conversation_api.get_conversations(limit=5, expand="participants")
        
        voice_conversations = []
        for convo in response.entities:
            if convo.type == "voice":
                voice_conversations.append(convo)
                
        return voice_conversations
    except Exception as e:
        print(f"Error fetching conversations: {e}")
        return []

def fetch_voice_metrics(api_client: ApiClient):
    """Fetches talk time and wait time metrics for the last 24 hours."""
    analytics_api = AnalyticsApi(api_client)
    
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=24)
    
    query_body = QueryPostBody(
        metrics=[
            Metric(name="talkTime"),
            Metric(name="waitTime"),
            Metric(name="count")
        ],
        filters=[
            QueryFilter(name="type", operator="eq", value="voice"),
            QueryFilter(name="state", operator="eq", value="closed")
        ],
        group_by=[QueryGroupBy(name="date")],
        time_range={
            "startTime": start_time.isoformat(),
            "endTime": end_time.isoformat()
        }
    )
    
    try:
        response = analytics_api.post_analytics_conversations_details_query(body=query_body)
        
        # Handle async processing
        if response.status == "processing":
            print("Query processing asynchronously...")
            for _ in range(60): # Wait up to 5 minutes
                time.sleep(5)
                response = analytics_api.get_analytics_conversations_details_query(response.query_id)
                if response.status == "complete":
                    break
            else:
                raise TimeoutError("Query did not complete.")
        
        return response.groups
    except Exception as e:
        print(f"Error fetching analytics: {e}")
        return []

def main():
    print("Initializing Genesys Cloud API Client...")
    api_client = init_api_client()
    
    print("\n--- Fetching Recent Voice Conversations ---")
    convos = fetch_recent_voice_conversations(api_client)
    for convo in convos:
        print(f"ID: {convo.id}, Created: {convo.createdTime}")
        
    print("\n--- Fetching Voice Metrics (Last 24 Hours) ---")
    metrics = fetch_voice_metrics(api_client)
    if metrics:
        for group in metrics:
            date = group.group_by.get("date", "Unknown")
            talk_time_ms = next((m.total for m in group.metrics if m.name == "talkTime"), 0)
            count = next((m.total for m in group.metrics if m.name == "count"), 0)
            print(f"Date: {date}, Count: {count}, Total Talk Time: {talk_time_ms}ms")
    else:
        print("No metrics data available.")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The OAuth token has expired or was never generated correctly.
  • Fix: Ensure the authenticate method is called before any API requests. The SDK does not automatically refresh tokens if the initial authentication fails. Check your .env credentials.

Error: 403 Forbidden

  • Cause: The OAuth token lacks the required scope.
  • Fix:
    • For Conversations API: Add conversation:all to your client’s scopes in the Genesys Admin console.
    • For Analytics API: Add analytics:conversation:query to your client’s scopes.
    • Re-authenticate after changing scopes.

Error: 400 Bad Request (Analytics)

  • Cause: Invalid filter syntax or unsupported metric name.
  • Fix: Verify metric names against the Analytics API documentation. Common mistakes include using talktime instead of talkTime (case-sensitive) or using an operator like eq on a numeric field that expects gt or lt.

Error: 429 Too Many Requests

  • Cause: Exceeding the rate limit (typically 10-20 requests per second per client).
  • Fix: Implement exponential backoff. The time.sleep() function in the polling example demonstrates a basic wait. For production systems, use a library like tenacity to handle retries automatically.

Error: Empty Results

  • Cause: The time range is too narrow, or the filters are too restrictive.
  • Fix:
    • For Conversations API: Ensure you are querying for a time frame where conversations actually occurred.
    • For Analytics API: Check that the time_range is in ISO 8601 format and that the startTime is before endTime. Also, verify that the type filter matches the actual conversation types in your environment (e.g., voice, chat, email, sms).

Official References