Choosing Between Real-Time Conversations and Analytics Queries in Genesys Cloud

Choosing Between Real-Time Conversations and Analytics Queries in Genesys Cloud

What You Will Build

  • One sentence: This tutorial demonstrates how to retrieve active conversation data versus historical conversation metrics using the correct Genesys Cloud API endpoints.
  • One sentence: It utilizes the /api/v2/conversations endpoint for real-time state and /api/v2/analytics/conversations for aggregated reporting.
  • One sentence: The implementation uses Python with the genesyscloud SDK and the requests library for direct HTTP calls.

Prerequisites

  • OAuth Client Type: Service Account with client_credentials flow.
  • Required Scopes:
    • conversation:read (for /api/v2/conversations)
    • analytics:conversations:view or analytics:reports:view (for /api/v2/analytics/conversations)
  • SDK Version: genesyscloud >= 130.0.0 (Python)
  • Runtime: Python 3.9+
  • Dependencies:
    • pip install genesyscloud requests

Authentication Setup

Genesys Cloud uses OAuth 2.0 for all API access. For server-side integrations, the client_credentials grant type is the standard. You must cache the access token and handle expiration. The following example establishes the authentication context used in subsequent steps.

import os
import requests
from datetime import datetime, timedelta

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, org_id: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.org_id = org_id
        self.token_url = f"https://{org_id}.mypurecloud.com/oauth/token"
        self.access_token = None
        self.token_expiry = None

    def get_token(self) -> str:
        """
        Retrieves an OAuth token if expired or not present.
        Returns the access token string.
        """
        if self.access_token and self.token_expiry and datetime.now() < self.token_expiry:
            return self.access_token

        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }

        try:
            response = requests.post(self.token_url, data=payload)
            response.raise_for_status()
            data = response.json()
            
            self.access_token = data["access_token"]
            expires_in = data.get("expires_in", 3600)
            # Set expiry slightly early to avoid edge-case failures
            self.token_expiry = datetime.now() + timedelta(seconds=expires_in - 60)
            
            return self.access_token
        except requests.exceptions.RequestException as e:
            raise Exception(f"Failed to obtain OAuth token: {e}")

# Initialize Auth
# In production, load these from environment variables
auth = GenesysAuth(
    client_id=os.getenv("GENESYS_CLIENT_ID"),
    client_secret=os.getenv("GENESYS_CLIENT_SECRET"),
    org_id=os.getenv("GENESYS_ORG_ID")
)

Implementation

The core distinction between the two endpoints lies in temporal scope and data granularity.

  1. /api/v2/conversations: Retrieves the current state of conversations that are currently active (or recently closed within a short window). It returns individual conversation objects with detailed participant lists, media types, and real-time status. Use this for live dashboards, agent assist tools, or immediate workflow triggers.
  2. /api/v2/analytics/conversations: Retrieves historical, aggregated data. It does not return individual conversation logs by default. Instead, it returns metrics (count, handle time, wait time) grouped by time intervals, queues, or users. Use this for reporting, SLA monitoring, and historical trend analysis.

Step 1: Retrieving Real-Time Active Conversations

Use the /api/v2/conversations endpoint to list active conversations. This endpoint supports filtering by media type (voice, chat, email) and status.

Endpoint: GET /api/v2/conversations
Scope: conversation:read

def get_active_conversations(auth: GenesysAuth, org_id: str, media_type: str = "all") -> dict:
    """
    Fetches currently active conversations.
    """
    base_url = f"https://{org_id}.mypurecloud.com"
    endpoint = "/api/v2/conversations"
    
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json"
    }
    
    params = {
        "mediaTypes": media_type,  # 'voice', 'chat', 'email', 'callback', 'all'
        "pageSize": 100,
        "pageNumber": 1
    }

    try:
        response = requests.get(base_url + endpoint, headers=headers, params=params)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        if response.status_code == 429:
            print("Rate limit hit. Implement exponential backoff.")
        raise Exception(f"API Error: {response.status_code} - {response.text}")

# Usage
active_data = get_active_conversations(auth, os.getenv("GENESYS_ORG_ID"))
print(f"Found {active_data.get('totalCount', 0)} active conversations.")

Expected Response Structure:
The response is a paginated list of conversation objects. Each object contains a participants array, which is crucial for identifying who is involved in the conversation.

{
  "entities": [
    {
      "id": "12345678-1234-1234-1234-123456789012",
      "type": "voice",
      "state": "active",
      "participants": [
        {
          "id": "87654321-4321-4321-4321-210987654321",
          "name": "John Doe",
          "role": "agent",
          "state": "connected"
        },
        {
          "id": "11111111-1111-1111-1111-111111111111",
          "name": "Customer",
          "role": "customer",
          "state": "connected"
        }
      ],
      "wrapUpCode": null,
      "initialContact": true
    }
  ],
  "pageCount": 1,
  "pageSize": 100,
  "pageNumber": 1,
  "totalCount": 12
}

Key Insight: If you need the transcript of a chat conversation or the recording URL for a voice conversation, you must use the id from this response to call /api/v2/conversations/{conversationId} or /api/v2/conversations/{conversationId}/recordings. The /api/v2/conversations list endpoint does not include transcripts.

Step 2: Querying Historical Analytics Data

Use the /api/v2/analytics/conversations/details/query endpoint for detailed historical data or /api/v2/analytics/conversations/summary/query for high-level metrics. This tutorial uses the details query, which allows you to retrieve individual conversation records from the past, but with significant limitations on volume and retention compared to the real-time endpoint.

Endpoint: POST /api/v2/analytics/conversations/details/query
Scope: analytics:conversations:view

def query_historical_conversations(auth: GenesysAuth, org_id: str, start_time: str, end_time: str) -> dict:
    """
    Queries historical conversation data using the Analytics API.
    start_time and end_time must be in ISO 8601 format.
    """
    base_url = f"https://{org_id}.mypurecloud.com"
    endpoint = "/api/v2/analytics/conversations/details/query"
    
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json"
    }

    # The body is a Query object
    payload = {
        "viewId": "conversations",
        "timeGroup": "interval",
        "interval": "PT1H",  # Group by 1 hour intervals
        "dateFrom": start_time,
        "dateTo": end_time,
        "select": [
            "conversationId",
            "type",
            "state",
            "queueName",
            "wrapUpCode",
            "totalHandleTime",
            "customerWaitTime"
        ],
        "groupBy": ["queueName"],
        "where": [
            {
                "name": "type",
                "op": "in",
                "values": ["voice", "chat"]
            }
        ],
        "size": 25  # Max 25 for details query in single call without pagination complexity
    }

    try:
        response = requests.post(base_url + endpoint, headers=headers, json=payload)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        if response.status_code == 429:
            print("Rate limit hit. Implement exponential backoff.")
        raise Exception(f"API Error: {response.status_code} - {response.text}")

# Usage: Query last 24 hours
from datetime import datetime, timedelta
end_time = datetime.utcnow().isoformat() + "Z"
start_time = (datetime.utcnow() - timedelta(hours=24)).isoformat() + "Z"

historical_data = query_historical_conversations(auth, os.getenv("GENESYS_ORG_ID"), start_time, end_time)
print(f"Retrieved {len(historical_data.get('entities', []))} conversation records.")

Expected Response Structure:
The Analytics API returns aggregated data. Even when querying “details,” the response structure is different from the real-time API. It often groups results by the groupBy parameters specified.

{
  "viewId": "conversations",
  "groupBy": ["queueName"],
  "interval": "PT1H",
  "dateFrom": "2023-10-01T00:00:00Z",
  "dateTo": "2023-10-01T01:00:00Z",
  "entities": [
    {
      "queueName": "Sales Support",
      "dateFrom": "2023-10-01T00:00:00Z",
      "dateTo": "2023-10-01T01:00:00Z",
      "metrics": {
        "conversationCount": 15,
        "totalHandleTime": 3600000,
        "customerWaitTime": 120000
      },
      "details": [
        {
          "conversationId": "abc-123-def",
          "type": "voice",
          "state": "completed",
          "wrapUpCode": "Issue Resolved",
          "totalHandleTime": 240000,
          "customerWaitTime": 30000
        },
        {
          "conversationId": "xyz-789-uvw",
          "type": "chat",
          "state": "completed",
          "wrapUpCode": "Transfer",
          "totalHandleTime": 180000,
          "customerWaitTime": 15000
        }
      ]
    }
  ]
}

Key Insight: The Analytics API is eventual consistent. Data may take up to 15-30 minutes to appear in analytics queries after a conversation ends. Do not use this endpoint for real-time monitoring.

Step 3: Comparing Data Granularity and Use Cases

To solidify the distinction, consider these scenarios:

  1. Scenario: Agent Dashboard

    • Requirement: Show the agent their current call status and next available task.
    • Endpoint: /api/v2/conversations
    • Reason: You need the exact current state (active, queued, wrapup) and participant details immediately. Analytics data is stale and aggregated.
  2. Scenario: Daily SLA Report

    • Requirement: Calculate the average handle time (AHT) for the “Support” queue yesterday.
    • Endpoint: /api/v2/analytics/conversations/summary/query
    • Reason: You need aggregated metrics (totalHandleTime, conversationCount) over a time range. Fetching individual conversations via /api/v2/conversations is impossible for historical data because those conversations no longer exist in the real-time list.
  3. Scenario: Compliance Audit

    • Requirement: Retrieve transcripts for all chat conversations handled by Agent X last week.
    • Endpoint: /api/v2/analytics/conversations/details/query (to get conversationIds) THEN /api/v2/conversations/{conversationId}/transcripts
    • Reason: You cannot list historical conversations via /api/v2/conversations. You must use Analytics to find the conversationIds, then use the Conversation API to fetch the actual transcript content.

Complete Working Example

This script combines both approaches to demonstrate a hybrid workflow: identifying active issues and comparing them against historical baselines.

import os
import requests
from datetime import datetime, timedelta
import json

class GenesysConversationsManager:
    def __init__(self, client_id: str, client_secret: str, org_id: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.org_id = org_id
        self.base_url = f"https://{org_id}.mypurecloud.com"
        self.token_url = f"{self.base_url}/oauth/token"
        self.access_token = None
        self.token_expiry = None

    def _get_token(self) -> str:
        if self.access_token and self.token_expiry and datetime.now() < self.token_expiry:
            return self.access_token

        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }
        response = requests.post(self.token_url, data=payload)
        response.raise_for_status()
        data = response.json()
        self.access_token = data["access_token"]
        self.token_expiry = datetime.now() + timedelta(seconds=data.get("expires_in", 3600) - 60)
        return self.access_token

    def get_active_voice_conversations(self) -> list:
        """Retrieves currently active voice conversations."""
        headers = {"Authorization": f"Bearer {self._get_token()}"}
        params = {"mediaTypes": "voice", "pageSize": 100}
        
        response = requests.get(f"{self.base_url}/api/v2/conversations", headers=headers, params=params)
        response.raise_for_status()
        return response.json().get("entities", [])

    def get_yesterday_aht(self, queue_name: str) -> dict:
        """Retrieves Average Handle Time for a specific queue from yesterday."""
        end_time = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0).isoformat() + "Z"
        start_time = (datetime.utcnow() - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0).isoformat() + "Z"

        payload = {
            "viewId": "conversations",
            "timeGroup": "interval",
            "interval": "PT1D",
            "dateFrom": start_time,
            "dateTo": end_time,
            "select": ["conversationCount", "totalHandleTime"],
            "groupBy": ["queueName"],
            "where": [{"name": "queueName", "op": "eq", "values": [queue_name]}],
            "size": 25
        }

        headers = {
            "Authorization": f"Bearer {self._get_token()}",
            "Content-Type": "application/json"
        }

        response = requests.post(f"{self.base_url}/api/v2/analytics/conversations/summary/query", headers=headers, json=payload)
        response.raise_for_status()
        return response.json().get("entities", [])

    def run_comparison(self, queue_name: str):
        print(f"--- Analyzing Queue: {queue_name} ---")
        
        # 1. Get Real-Time Data
        active_convs = self.get_active_voice_conversations()
        active_in_queue = [c for c in active_convs if any(p.get('queueName') == queue_name for p in c.get('participants', []))]
        
        print(f"Active conversations in queue: {len(active_in_queue)}")
        for conv in active_in_queue:
            print(f"  - ID: {conv['id']}, State: {conv['state']}")

        # 2. Get Historical Baseline
        historical_data = self.get_yesterday_aht(queue_name)
        if historical_data:
            total_handle = historical_data[0].get("metrics", {}).get("totalHandleTime", 0)
            count = historical_data[0].get("metrics", {}).get("conversationCount", 1)
            aht_ms = total_handle / count if count > 0 else 0
            print(f"Yesterday's Avg Handle Time: {aht_ms / 1000:.2f} seconds")
        else:
            print("No historical data found for yesterday.")

# Execution
if __name__ == "__main__":
    manager = GenesysConversationsManager(
        client_id=os.getenv("GENESYS_CLIENT_ID"),
        client_secret=os.getenv("GENESYS_CLIENT_SECRET"),
        org_id=os.getenv("GENESYS_ORG_ID")
    )
    manager.run_comparison("General Support")

Common Errors & Debugging

Error: 403 Forbidden

  • Cause: The OAuth token lacks the required scope.
  • Fix: Ensure conversation:read is added for /api/v2/conversations and analytics:conversations:view for /api/v2/analytics/conversations. Check the Service Account in the Genesys Cloud Admin console under Security > Authentication > OAuth 2.0 Clients.

Error: 429 Too Many Requests

  • Cause: Exceeding the rate limit (typically 100-200 requests per minute depending on the endpoint).
  • Fix: Implement exponential backoff. The Analytics API is particularly sensitive. Do not poll /api/v2/conversations more than once every 10 seconds in a tight loop.
import time

def safe_request(func, *args, max_retries=3):
    for attempt in range(max_retries):
        try:
            return func(*args)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                wait_time = 2 ** attempt
                print(f"Rate limited. Waiting {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                raise
    raise Exception("Max retries exceeded")

Error: Empty Entities in Analytics Query

  • Cause: The time range is in the future, or the groupBy/where filters are too restrictive.
  • Fix: Verify dateFrom and dateTo are in ISO 8601 format with ‘Z’ suffix. Ensure the queueName or other filter values exactly match the Genesys Cloud configuration. Analytics data is not available for the current minute; query at least 15 minutes in the past.

Official References