Retrieving Full Voice Conversation Transcripts via Genesys Cloud Speech Analytics API

What You Will Build

  • A Python script that queries the Genesys Cloud Analytics API to locate voice conversations and extracts the full, time-stamped text transcript generated by Speech Analytics.
  • This tutorial uses the Genesys Cloud analytics/conversations/voice/details/query endpoint and the speech/text sub-resource.
  • The implementation targets Python 3.8+ and uses the official genesyscloud SDK for authentication and API calls. The requests library is installed as a dependency, but the script does not import it directly.

Prerequisites

  • OAuth Client Type: Service Account (Client Credentials Flow).
  • Required Scopes:
    • analytics:conversation:read (to query conversation details)
    • speech:text:read (to access the transcript content)
    • analytics:conversation:view (often required for detailed metadata)
  • SDK Version: genesys-cloud-python >= 160.0.0.
  • Runtime Requirements: Python 3.8 or higher.
  • External Dependencies:
    • genesys-cloud-python
    • requests
    • python-dotenv (for managing credentials securely)

Authentication Setup

Genesys Cloud uses OAuth 2.0 for API access. For server-to-server integrations like data retrieval, the Client Credentials Flow is the standard approach. We will use the Genesys Cloud Python SDK to handle the token acquisition and refresh automatically, ensuring that our HTTP requests in subsequent steps include a valid Authorization header.

First, install the required packages:

pip install genesys-cloud-python requests python-dotenv

Create a .env file in your project root with the following variables:

GENESYS_CLIENT_ID=your_client_id
GENESYS_CLIENT_SECRET=your_client_secret
GENESYS_REGION=us-east-1

Initialize the client in your Python script. This object manages the token lifecycle.

import os
import sys
from dotenv import load_dotenv
from genesyscloud.platform_client_v2 import PlatformClient
from genesyscloud.platform_client_v2.auth_client import AuthClient

# Load environment variables
load_dotenv()

def get_platform_client() -> PlatformClient:
    """
    Initializes the Genesys Cloud Platform Client with OAuth credentials.
    """
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    region = os.getenv("GENESYS_REGION")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in .env")

    # Initialize the platform client
    platform_client = PlatformClient(
        auth_client=AuthClient(
            client_id=client_id,
            client_secret=client_secret,
            region=region
        )
    )

    # Verify the credentials by requesting a token up front
    try:
        # This triggers the initial token fetch if not cached
        platform_client.auth_client.get_token()
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback
        raise ConnectionError(f"Failed to authenticate with Genesys Cloud: {e}") from e

    return platform_client

# Instantiate the client
platform_client = get_platform_client()
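
The credential check above can also be separated from client construction so that it is testable without network access. The following is a minimal sketch of that idea, not part of the SDK: read_settings and REQUIRED_KEYS are hypothetical names introduced here, and the us-east-1 fallback simply mirrors the region value used in the .env example.

```python
from typing import Dict, Mapping

# Keys that must be present and non-empty (names taken from the .env example).
REQUIRED_KEYS = ("GENESYS_CLIENT_ID", "GENESYS_CLIENT_SECRET")


def read_settings(env: Mapping[str, str], default_region: str = "us-east-1") -> Dict[str, str]:
    """Return validated settings, or raise ValueError naming every missing key."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise ValueError("Missing required settings: " + ", ".join(missing))
    return {
        "client_id": env["GENESYS_CLIENT_ID"],
        "client_secret": env["GENESYS_CLIENT_SECRET"],
        # Fall back to the default when GENESYS_REGION is unset or empty.
        "region": env.get("GENESYS_REGION") or default_region,
    }
```

In a script this would typically be called as read_settings(os.environ) after load_dotenv(), so that a misconfigured environment fails before any token request is made.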

Implementation

Step 1: Querying Voice Conversations

The Genesys Cloud Analytics API does not provide a single endpoint to “get all transcripts.” Instead, you must first query for conversation records that match your criteria (date range, media type, etc.) and then retrieve the detailed transcript for each record.

We will use the analytics/conversations/voice/details/query endpoint because it lets us filter by mediaType and returns the conversation id, which is the primary key for fetching the transcript. The query body used here does not request transcript text. It asks only for the duration metric, and the id of each returned record is passed to Step 2 to retrieve the full transcript.

OAuth Scope: analytics:conversation:read

import json
import time
from datetime import datetime, timedelta
from typing import List, Dict, Any

def query_voice_conversations(
    platform_client: PlatformClient,
    start_time: datetime,
    end_time: datetime
) -> List[str]:
    """
    Queries for voice conversations within a specific time range.
    Returns a list of conversation IDs.
    """
    analytics_api = platform_client.analytics_api

    # Define the query body
    # We filter for 'voice' media type to exclude chat/email
    body = {
        "dateRange": {
            "start": start_time.isoformat(),
            "end": end_time.isoformat()
        },
        "groupBy": ["conversationId"],
        "metrics": ["duration"],
        "selectionPredicates": [
            {
                "type": "mediaType",
                "mediaType": "voice"
            }
        ],
        "pageSize": 25, # Max page size for this endpoint
        "pageToken": None
    }

    conversation_ids = []

    print("Querying for voice conversations...")

    while True:
        try:
            # Make the API call
            response = analytics_api.post_analytics_conversations_voice_details_query(
                body=body
            )

            # Check for empty response
            if not response.entity or not response.entity.get('items'):
                break

            # Extract IDs
            for item in response.entity['items']:
                conversation_id = item.get('id')
                if conversation_id:
                    conversation_ids.append(conversation_id)

            # Handle pagination: stop when the response carries no further token
            page_token = response.entity.get('nextPageToken')
            if not page_token:
                break

            body['pageToken'] = page_token
            # Small delay to respect rate limits if querying large datasets
            time.sleep(0.5)

        except Exception as e:
            # Stop paging on any failure and return the IDs collected so far
            print(f"Error querying conversations: {e}")
            break

    print(f"Found {len(conversation_ids)} voice conversations.")
    return conversation_ids
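
The token-based paging loop in this step can be isolated from the API call, which makes it easy to exercise with canned pages. The sketch below is one way to do that under stated assumptions: iter_items and fetch_page are hypothetical names, and the page shape (an items list plus an optional nextPageToken) is taken from the code above rather than from the API reference.

```python
from typing import Any, Callable, Dict, Iterator, Optional


def iter_items(
    fetch_page: Callable[[Optional[str]], Dict[str, Any]],
    max_pages: int = 1000,
) -> Iterator[Dict[str, Any]]:
    """Yield items page by page, following nextPageToken until it is absent."""
    token: Optional[str] = None
    # max_pages bounds the loop in case a server keeps returning a token.
    for _ in range(max_pages):
        page = fetch_page(token) or {}
        items = page.get("items") or []
        if not items:
            return
        yield from items
        token = page.get("nextPageToken")
        if not token:
            return
```

With this helper, the query function would only need to supply a fetch_page callable that sets body['pageToken'] and returns response.entity.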

Step 2: Fetching the Transcript for a Single Conversation

Once we have a conversation ID, we can fetch the transcript. The transcript is part of the detailed conversation record: it is stored in the transcript field of the conversation detail object returned by the analytics/conversations/voice/details/{id} endpoint, together with speaker labels and timestamps. In the SDK, this endpoint is called through the get_analytics_conversations_voice_details method.

OAuth Scope: analytics:conversation:read, speech:text:read

def fetch_conversation_transcript(
    platform_client: PlatformClient,
    conversation_id: str
) -> Dict[str, Any]:
    """
    Fetches the detailed record for a single voice conversation,
    including the transcript.
    """
    analytics_api = platform_client.analytics_api
    
    try:
        # Retrieve the full detail object for the conversation.
        # Note: the Step 1 query does not request transcript data; the detail
        # endpoint returns it if available and permissions allow.
        response = analytics_api.get_analytics_conversations_voice_details(
            conversation_id=conversation_id
        )
        
        return response.entity
        
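    except Exception as e:
        # Classify the failure by looking for the status code in the exception text.
        # This is a coarse check: any message that contains "404" or "403" will match.
        if "404" in str(e):
            return {"error": "Conversation not found or ID invalid"}
        elif "403" in str(e):
            return {"error": "Forbidden: check the analytics:conversation:read and speech:text:read scopes"}
        else:
            return {"error": str(e)}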
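
Matching "404" or "403" inside the exception text can misfire, for example when a conversation ID happens to contain those digits. A sturdier variant reads a numeric status from the exception object. The sketch below assumes the raised exception exposes an integer status attribute; that attribute name is an assumption and should be checked against the SDK version in use. status_of and describe_failure are hypothetical helpers.

```python
from typing import Any, Dict, Optional


def status_of(exc: Exception) -> Optional[int]:
    """Return the integer HTTP status carried by the exception, if any.

    Assumption: the SDK's exception stores the status in an attribute
    named 'status'. Exceptions without it yield None.
    """
    status = getattr(exc, "status", None)
    return status if isinstance(status, int) else None


def describe_failure(exc: Exception) -> Dict[str, Any]:
    """Map an exception to the dictionary shape used by the fetch function."""
    status = status_of(exc)
    if status == 404:
        # Treated as "no data", matching the complete script's behavior.
        return {}
    if status == 403:
        return {"error": "Forbidden: check the OAuth scopes on the client"}
    return {"error": str(exc)}
```

The except branch of the fetch function could then return describe_failure(e) instead of inspecting str(e).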

Step 3: Processing the Transcript Structure

The response from the detail endpoint contains a transcript array. Each element in this array represents a segment of speech. The structure varies slightly depending on whether the transcript is from a bot, an agent, or a customer, and whether sentiment analysis is enabled.

A typical transcript segment looks like this:

{
  "from": 10.5,
  "to": 15.2,
  "confidence": 0.95,
  "speaker": "customer",
  "text": "I need help with my recent order."
}

We will create a processor that flattens these segments into a readable format and handles cases where the transcript might be empty (e.g., audio was too short or processing failed).

def process_transcript_segments(transcript_data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Extracts and cleans transcript segments from the API response.
    """
    if not transcript_data:
        return []
    
    transcript_list = transcript_data.get('transcript', [])
    
    if not transcript_list:
        return []
    
    processed_segments = []
    
    for segment in transcript_list:
        # Read each field with a fallback so missing or null values do not raise
        text = segment.get('text') or ''
        speaker = segment.get('speaker') or 'unknown'
        start_time = segment.get('from', 0.0)
        end_time = segment.get('to', 0.0)
        confidence = segment.get('confidence', 0.0)
        
        # Skip empty segments
        if not text.strip():
            continue
            
        processed_segments.append({
            "speaker": speaker,
            "text": text,
            "start_seconds": start_time,
            "end_seconds": end_time,
            "confidence": confidence
        })
        
    return processed_segments
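
Once segments are flattened, they can be rendered as plain text for quick review. The following sketch assumes the processed segment shape produced above (speaker, text, start_seconds); format_timestamp and render_transcript are hypothetical helper names, and the mm:ss.s layout is just one possible choice.

```python
from typing import Any, Dict, List


def format_timestamp(seconds: float) -> str:
    """Format an offset in seconds as mm:ss.s, rounding to tenths first."""
    total = round(float(seconds), 1)
    minutes, secs = divmod(total, 60)
    return f"{int(minutes):02d}:{secs:04.1f}"


def render_transcript(segments: List[Dict[str, Any]]) -> str:
    """Return one line per segment, ordered by start time."""
    ordered = sorted(segments, key=lambda seg: seg["start_seconds"])
    return "\n".join(
        f"[{format_timestamp(seg['start_seconds'])}] {seg['speaker']}: {seg['text']}"
        for seg in ordered
    )
```

Sorting by start_seconds keeps the output chronological even if the API returns segments grouped by speaker.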

Step 4: Combining Queries and Exporting

We now combine the query loop with the detail fetcher. Fetching details for every conversation in a large date range can be slow, so the loop below processes conversations one at a time, pauses briefly between requests, and reports each conversation that has no transcript.

import time

def export_transcripts(
    platform_client: PlatformClient,
    start_time: datetime,
    end_time: datetime,
    output_file: str = "transcripts.json"
):
    """
    Main function to query conversations, fetch transcripts, and save to file.
    """
    # Step 1: Get IDs
    conversation_ids = query_voice_conversations(platform_client, start_time, end_time)
    
    if not conversation_ids:
        print("No conversations found in the specified range.")
        return

    all_transcripts = []
    
    print(f"Fetching detailed transcripts for {len(conversation_ids)} conversations...")
    
    for idx, conv_id in enumerate(conversation_ids):
        try:
            # Step 2: Fetch Detail
            detail = fetch_conversation_transcript(platform_client, conv_id)
            
            # Skip conversations whose fetch returned an error marker
            if "error" in detail:
                print(f"Error fetching {conv_id}: {detail['error']}")
                continue
            
            # Step 3: Process
            segments = process_transcript_segments(detail)
            
            if segments:
                all_transcripts.append({
                    "conversationId": conv_id,
                    "startTime": detail.get('startDateTime'),
                    "duration": detail.get('duration', {}).get('total', 0),
                    "transcript": segments
                })
            else:
                print(f"[{idx+1}/{len(conversation_ids)}] No transcript for {conv_id}")
            
            # Rate limiting: this tutorial assumes roughly 20-30 requests per second
            # are allowed for this endpoint. Be conservative.
            time.sleep(0.1)
            
        except Exception as e:
            print(f"Error processing {conv_id}: {e}")
            continue

    # Save to JSON
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(all_transcripts, f, indent=2, default=str)
        
    print(f"Exported {len(all_transcripts)} transcripts to {output_file}")
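
The export above keeps every transcript in memory and writes the file once at the end, so an interrupted run loses all results. One alternative, sketched here as an assumption rather than as part of the original script, is to write one JSON object per line (JSON Lines) as each record becomes available. write_jsonl is a hypothetical helper that accepts any text stream.

```python
import json
from typing import Any, Dict, Iterable, TextIO


def write_jsonl(records: Iterable[Dict[str, Any]], stream: TextIO) -> int:
    """Write each record as one JSON line and return the number written."""
    count = 0
    for record in records:
        # default=str mirrors the json.dump call above for non-JSON types.
        stream.write(json.dumps(record, ensure_ascii=False, default=str) + "\n")
        # Flush per record so completed work survives an interrupted run.
        stream.flush()
        count += 1
    return count
```

Because records is an iterable, a generator that fetches and processes one conversation at a time can be passed in directly, and the file can be opened in append mode to resume a run.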

Complete Working Example

Below is the complete, runnable script. Save this as get_transcripts.py.

import os
import sys
import json
import time
from datetime import datetime, timedelta
from typing import List, Dict, Any

from dotenv import load_dotenv
from genesyscloud.platform_client_v2 import PlatformClient
from genesyscloud.platform_client_v2.auth_client import AuthClient

# Load environment variables
load_dotenv()

def get_platform_client() -> PlatformClient:
    """
    Initializes the Genesys Cloud Platform Client with OAuth credentials.
    """
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    region = os.getenv("GENESYS_REGION", "us-east-1")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in .env")

    platform_client = PlatformClient(
        auth_client=AuthClient(
            client_id=client_id,
            client_secret=client_secret,
            region=region
        )
    )

    try:
        platform_client.auth_client.get_token()
    except Exception as e:
        raise ConnectionError(f"Failed to authenticate with Genesys Cloud: {e}")

    return platform_client

def query_voice_conversations(
    platform_client: PlatformClient,
    start_time: datetime,
    end_time: datetime
) -> List[str]:
    """
    Queries for voice conversations within a specific time range.
    Returns a list of conversation IDs.
    """
    analytics_api = platform_client.analytics_api
    
    body = {
        "dateRange": {
            "start": start_time.isoformat(),
            "end": end_time.isoformat()
        },
        "groupBy": ["conversationId"],
        "metrics": ["duration"],
        "selectionPredicates": [
            {
                "type": "mediaType",
                "mediaType": "voice"
            }
        ],
        "pageSize": 25,
        "pageToken": None
    }

    conversation_ids = []
    page_token = None

    print("Querying for voice conversations...")

    while True:
        try:
            response = analytics_api.post_analytics_conversations_voice_details_query(
                body=body
            )
            
            if not response.entity or not response.entity.get('items'):
                break

            for item in response.entity['items']:
                conversation_id = item.get('id')
                if conversation_id:
                    conversation_ids.append(conversation_id)
            
            page_token = response.entity.get('nextPageToken')
            if not page_token:
                break
            
            body['pageToken'] = page_token
            time.sleep(0.5)

        except Exception as e:
            print(f"Error querying conversations: {e}")
            break

    print(f"Found {len(conversation_ids)} voice conversations.")
    return conversation_ids

def fetch_conversation_transcript(
    platform_client: PlatformClient,
    conversation_id: str
) -> Dict[str, Any]:
    """
    Fetches the detailed record for a single voice conversation,
    including the transcript.
    """
    analytics_api = platform_client.analytics_api
    
    try:
        response = analytics_api.get_analytics_conversations_voice_details(
            conversation_id=conversation_id
        )
        return response.entity
        
    except Exception as e:
        if "404" in str(e):
            return {}
        elif "403" in str(e):
            return {"error": "Forbidden: Missing speech:text:read scope"}
        else:
            return {"error": str(e)}

def process_transcript_segments(transcript_data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Extracts and cleans transcript segments from the API response.
    """
    if not transcript_data:
        return []
    
    transcript_list = transcript_data.get('transcript', [])
    
    if not transcript_list:
        return []
    
    processed_segments = []
    
    for segment in transcript_list:
        text = segment.get('text', '')
        speaker = segment.get('speaker', 'unknown')
        start_time = segment.get('from', 0.0)
        end_time = segment.get('to', 0.0)
        confidence = segment.get('confidence', 0.0)
        
        if not text.strip():
            continue
            
        processed_segments.append({
            "speaker": speaker,
            "text": text,
            "start_seconds": start_time,
            "end_seconds": end_time,
            "confidence": confidence
        })
        
    return processed_segments

def main():
    try:
        platform_client = get_platform_client()
    except Exception as e:
        print(f"Authentication failed: {e}")
        sys.exit(1)

    # Define time range (Last 24 hours)
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=1)
    
    output_file = "voice_transcripts.json"
    
    print(f"Starting transcript extraction from {start_time.isoformat()} to {end_time.isoformat()}")
    
    # Step 1: Get IDs
    conversation_ids = query_voice_conversations(platform_client, start_time, end_time)
    
    if not conversation_ids:
        print("No conversations found in the specified range.")
        return

    all_transcripts = []
    
    print(f"Fetching detailed transcripts for {len(conversation_ids)} conversations...")
    
    for idx, conv_id in enumerate(conversation_ids):
        try:
            # Step 2: Fetch Detail
            detail = fetch_conversation_transcript(platform_client, conv_id)
            
            # Handle errors in detail fetch
            if "error" in detail:
                print(f"Error fetching {conv_id}: {detail['error']}")
                continue
            
            # Step 3: Process
            segments = process_transcript_segments(detail)
            
            if segments:
                all_transcripts.append({
                    "conversationId": conv_id,
                    "startTime": detail.get('startDateTime'),
                    "durationSeconds": detail.get('duration', {}).get('total', 0),
                    "transcript": segments
                })
            else:
                # Transcript might not be ready or available
                print(f"[{idx+1}/{len(conversation_ids)}] No transcript data for {conv_id}")
            
            # Rate limiting
            time.sleep(0.1)
            
        except Exception as e:
            print(f"Unexpected error processing {conv_id}: {e}")
            continue

    # Save to JSON
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(all_transcripts, f, indent=2, default=str)
        
    print(f"Successfully exported {len(all_transcripts)} transcripts to {output_file}")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 403 Forbidden

Cause: The OAuth token does not have the required scopes.
Fix: Ensure your Service Account in Genesys Cloud Admin has both analytics:conversation:read and speech:text:read assigned. If you are using the SDK, verify that the AuthClient is initialized with the correct client_id and client_secret.

# Check scopes in your Genesys Cloud Admin
# Navigate to: Admin -> Security -> OAuth Clients -> [Your Client] -> Scopes
# Add: analytics:conversation:read, speech:text:read
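
When comparing the scopes shown in the Admin UI against what this tutorial needs, a small set difference avoids eyeballing the list. This is only a sketch: missing_scopes is a hypothetical helper, the granted list is assumed to be copied by hand from the OAuth client page, and the required scopes are the two named in the fix above.

```python
from typing import Iterable, List

# Scopes named in this tutorial's 403 fix.
REQUIRED_SCOPES = ("analytics:conversation:read", "speech:text:read")


def missing_scopes(granted: Iterable[str]) -> List[str]:
    """Return the required scopes that are absent from the granted list."""
    granted_set = {scope.strip() for scope in granted}
    return [scope for scope in REQUIRED_SCOPES if scope not in granted_set]
```

An empty result means both scopes are present, in which case the 403 has another cause.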

Error: 429 Too Many Requests

Cause: You are hitting the API rate limit. The analytics/conversations/voice/details endpoint has a lower rate limit than summary endpoints.
Fix: Increase the sleep interval in the loop. The example uses time.sleep(0.1), which caps the loop at roughly 10 requests per second (fewer once request latency is included). If you still see 429s, increase this to 0.5 or 1.0.

# Adjust this value based on your observed limits
time.sleep(0.5) # 2 requests per second
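
A fixed sleep slows every request, including the ones that would have succeeded. An alternative is to retry only after a rate-limit failure, doubling the wait each time. The sketch below illustrates exponential backoff in isolation: RateLimited is a hypothetical marker exception that the caller would raise when a 429 is detected, and call_with_backoff is not an SDK function.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical marker raised by the caller when a 429 is detected."""


def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on RateLimited with delays of base_delay * 2**attempt."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimited:
            # Re-raise once the final attempt has failed.
            if attempt == max_attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError("unreachable")
```

The sleep parameter is injectable so the retry schedule can be verified without waiting.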

Error: Empty Transcript Array

Cause: The conversation exists, but the Speech Analytics engine has not yet processed the audio, or the audio was too short/low quality.
Fix: Check the transcript field in the raw response. If it is an empty list [], the transcript is not yet available. You may need to wait for the analytics job to complete (usually takes a few minutes after the call ends).

# Debugging step
print(json.dumps(detail, indent=2))
# Look for "transcript": []
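
Because a transcript may appear some minutes after the call ends, a bounded polling loop can be used instead of a single fetch. This sketch assumes the detail record exposes the transcript list under the transcript key, as described in this tutorial. wait_for_transcript and fetch_detail are hypothetical names, and the default attempt count and delay are placeholders to tune.

```python
import time
from typing import Any, Callable, Dict, List


def wait_for_transcript(
    fetch_detail: Callable[[], Dict[str, Any]],
    attempts: int = 5,
    delay_seconds: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[Dict[str, Any]]:
    """Poll fetch_detail until a non-empty transcript appears or attempts run out."""
    for attempt in range(attempts):
        detail = fetch_detail() or {}
        transcript = detail.get("transcript") or []
        if transcript:
            return transcript
        # Do not sleep after the final attempt.
        if attempt < attempts - 1:
            sleep(delay_seconds)
    return []
```

A call might look like wait_for_transcript(lambda: fetch_conversation_transcript(platform_client, conv_id)), with an empty list signalling that the transcript never became available.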

Error: 404 Not Found

Cause: The conversation ID is invalid or the conversation has been purged.
Fix: Verify the conversation ID from the query step. Ensure the date range in the query matches the date range you expect the conversation to exist in. Note that Genesys Cloud purges detailed conversation data after a retention period (default is often 30-90 days depending on your plan).
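
Before retrying a 404, it can help to check whether the conversation is simply older than the retention window. The sketch below is plain date arithmetic: is_within_retention is a hypothetical helper, and retention_days is a value you must look up for your own organization, not something this tutorial can supply.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_within_retention(
    conversation_start: datetime,
    retention_days: int,
    now: Optional[datetime] = None,
) -> bool:
    """Return True if the conversation started within the retention window.

    Both datetimes must be timezone-aware; 'now' defaults to the current UTC time.
    """
    current = now or datetime.now(timezone.utc)
    return current - conversation_start <= timedelta(days=retention_days)
```

IDs that fall outside the window can be skipped up front rather than producing a 404 for each request.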

Official References