Retrieving the Full Voice-to-Text Transcript via Genesys Cloud Speech Analytics API

What You Will Build

  • A Python script that authenticates with Genesys Cloud, queries for specific voice conversations, and retrieves the complete, timestamped speech-to-text transcript for each interaction.
  • This tutorial uses the Genesys Cloud Platform Client V2 SDK (Python) and the underlying REST API for transcript retrieval.
  • The programming language covered is Python 3.9+.

Prerequisites

  • OAuth Client Type: Client Credentials grant (machine-to-machine, abbreviated MTM in this tutorial).
  • Required Scopes:
    • analytics:conversation:read (to query conversation details)
    • analytics:speech:read (to access speech analytics data and transcripts)
    • federation:admin:read (optional; only useful for debugging user context)
  • SDK Version: PureCloudPlatformClientV2 (the PyPI package name of the Genesys Cloud Python SDK) v4.0.0 or higher.
  • Runtime Requirements: Python 3.9 or later.
  • External Dependencies:
    • PureCloudPlatformClientV2
    • python-dateutil (usually included as a dependency of the SDK)

Authentication Setup

Genesys Cloud uses OAuth 2.0 for authentication. For server-side integrations that retrieve historical data, the Client Credentials grant (the machine-to-machine, or MTM, flow) is the standard choice. Create an OAuth client in the Genesys Cloud Admin Console and assign it the necessary scopes.

The following code demonstrates how to initialize the SDK client with MTM credentials. The SDK handles the token acquisition and automatic refresh.

import os

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2 import (
    ApiClient,
    AnalyticsApi,
    SpeechAnalyticsApi
)

def get_genesis_api_client() -> ApiClient:
    """
    Configures and returns an authenticated Genesys Cloud API client using the
    Client Credentials (MTM) grant.
    """
    # Load credentials from environment variables for security
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    # Region domain of your organization, e.g. mypurecloud.com or mypurecloud.ie
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment.")

    # Point the SDK at the regional API host (note the "api." prefix)
    PureCloudPlatformClientV2.configuration.host = f"https://api.{environment}"

    # Request a token with the Client Credentials grant. The call returns the
    # authenticated ApiClient that the API classes are constructed with.
    api_client = ApiClient().get_client_credentials_token(client_id, client_secret)

    return api_client

# Initialize the client globally or within your main function
api_client = get_genesis_api_client()
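
To make the grant itself less opaque, the sketch below builds (but does not send) the token request that a Client Credentials exchange amounts to at the REST level. It assumes the token endpoint lives at https://login.<region domain>/oauth/token and that the client authenticates with HTTP Basic credentials; build_token_request is a hypothetical helper name for illustration, not part of the SDK.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(client_id: str, client_secret: str,
                        region_domain: str = "mypurecloud.com") -> urllib.request.Request:
    """Build the Client Credentials token request without sending it."""
    # HTTP Basic credentials: base64("client_id:client_secret")
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    basic = base64.b64encode(credentials).decode("ascii")

    # Form-encoded body naming the grant type
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode("ascii")

    return urllib.request.Request(
        url=f"https://login.{region_domain}/oauth/token",  # assumed login host pattern
        data=body,
        method="POST",
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )


request = build_token_request("my-client-id", "my-client-secret")
print(request.get_method(), request.full_url)
```

In normal use the SDK performs this exchange for you; the sketch is only meant to help when debugging credentials with a generic HTTP tool.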

Implementation

Step 1: Query for Voice Conversations

To retrieve a transcript, you first need the conversationId. The Genesys Cloud Analytics API allows you to query conversation details. We will filter for voice conversations that have speech analytics data available.

Endpoint: POST /api/v2/analytics/conversations/details/query
Scope: analytics:conversation:read

The request body must specify the date range and a filter. The filter used below selects conversations that have speech analytics data, which restricts the results to voice interactions that can have a transcript.

from PureCloudPlatformClientV2.models import (
    ConversationDetailsQueryRequest,
    ConversationDetailsQueryFilter
)
from datetime import datetime, timedelta

def query_voice_conversations(api_client: ApiClient, days_back: int = 7) -> list:
    """
    Queries Genesys Cloud for voice conversations within the last N days.
    Returns a list of conversation ID strings.
    """
    analytics_api = AnalyticsApi(api_client)

    # Define the time window
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=days_back)

    # Construct the filter for voice conversations
    # We specifically look for conversations that have speech analytics
    # Note: 'speech' is a valid filter type for analytics queries
    filter_obj = ConversationDetailsQueryFilter(
        entity_filter_type="speech",
        entity_filter_value="true" 
    )

    # Build the query request
    query_request = ConversationDetailsQueryRequest(
        interval=f"{start_time.isoformat()}Z/{end_time.isoformat()}Z",
        filter=[filter_obj],
        group_by=["conversationId"],
        select=["conversationId", "type", "start_time", "end_time"]
    )

    try:
        # Execute the query
        response = analytics_api.post_analytics_conversations_details_query(body=query_request)
        
        # Extract the conversation IDs from the response
        conversation_ids = []
        if response.entities:
            for entity in response.entities:
                if entity.conversation_id:
                    conversation_ids.append(entity.conversation_id)
        
        return conversation_ids

    except Exception as e:
        print(f"Error querying conversations: {e}")
        raise

# Get a list of conversation IDs
conversation_ids = query_voice_conversations(api_client)
if not conversation_ids:
    print("No voice conversations with speech analytics found in the specified period.")
else:
    print(f"Found {len(conversation_ids)} conversations.")
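
The interval in the request is a pair of ISO 8601 instants separated by a slash. As a small sketch of building that string from timezone-aware datetimes, so that the Z suffix does not have to be appended by hand, consider the helper below. The name build_interval is hypothetical, and the millisecond precision is a choice made for this example rather than a documented API requirement.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_interval(days_back: int, now: Optional[datetime] = None) -> str:
    """Return 'start/end' in UTC with millisecond precision and a Z suffix."""
    end = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    start = end - timedelta(days=days_back)

    def fmt(moment: datetime) -> str:
        # isoformat() renders UTC as "+00:00"; swap it for the shorter "Z"
        return moment.isoformat(timespec="milliseconds").replace("+00:00", "Z")

    return f"{fmt(start)}/{fmt(end)}"


fixed_now = datetime(2024, 3, 8, 12, 0, 0, tzinfo=timezone.utc)
print(build_interval(7, now=fixed_now))
# 2024-03-01T12:00:00.000Z/2024-03-08T12:00:00.000Z
```

Passing now explicitly keeps the function deterministic, which is convenient when testing query code.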

Step 2: Retrieve the Speech Transcript

Once you have the conversationId, you can retrieve the transcript using the Speech Analytics API. The endpoint returns a list of segments, each containing the speaker, text, and timestamps.

Endpoint: GET /api/v2/analytics/speech/conversations/{conversationId}/transcript
Scope: analytics:speech:read

This endpoint supports pagination. The response includes a nextPageToken if there are more segments to retrieve.

from PureCloudPlatformClientV2.models import (
    SpeechAnalyticsConversationTranscript
)

def get_full_transcript(api_client: ApiClient, conversation_id: str) -> list:
    """
    Retrieves the full transcript for a given conversation ID, handling pagination.
    Returns a list of TranscriptSegment objects.
    """
    speech_api = SpeechAnalyticsApi(api_client)
    all_segments = []
    next_page_token = None

    while True:
        try:
            # Call the API with the conversation ID and optional page token
            response = speech_api.get_analytics_speech_conversations_transcript(
                conversation_id=conversation_id,
                next_page_token=next_page_token
            )

            if response.segments:
                all_segments.extend(response.segments)
            
            # Check for pagination
            if response.next_page_token:
                next_page_token = response.next_page_token
            else:
                break

        except Exception as e:
            print(f"Error retrieving transcript for conversation {conversation_id}: {e}")
            break

    return all_segments
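
The token-following loop above can also be isolated from the API call, which makes it easy to exercise without network access. The sketch below is one way to do that: iter_pages and the fake_pages stand-in are hypothetical names, and the page fetcher is assumed to return an (items, next_token) pair with next_token set to None on the last page.

```python
from typing import Callable, Iterator, List, Optional, Tuple

# A page fetcher takes a page token (None for the first page) and returns
# (items, next_token); next_token is None on the last page.
PageFetcher = Callable[[Optional[str]], Tuple[List[str], Optional[str]]]


def iter_pages(fetch_page: PageFetcher, max_pages: int = 1000) -> Iterator[str]:
    """Yield items from every page, following tokens until none is returned."""
    token: Optional[str] = None
    for _ in range(max_pages):  # hard cap guards against a token that never clears
        items, token = fetch_page(token)
        yield from items
        if not token:
            return
    raise RuntimeError(f"Stopped after {max_pages} pages; token never cleared")


# Stand-in for the API: three pages keyed by the token that requests them
fake_pages = {
    None: (["hello", "hi"], "page-2"),
    "page-2": (["how can I help?"], "page-3"),
    "page-3": (["thanks"], None),
}

segments = list(iter_pages(lambda token: fake_pages[token]))
print(segments)
```

The page cap is a defensive choice: if a response ever repeats its token, the loop fails loudly instead of spinning forever.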

Step 3: Processing and Formatting the Transcript

The raw transcript segments contain technical metadata. We will process them to create a readable, timestamped transcript. Each segment includes:

  • speaker: The role of the speaker (e.g., agent, customer, system).
  • text: The transcribed text.
  • start_time and end_time: ISO 8601 timestamps.
  • confidence: The confidence score of the transcription.

from datetime import datetime

def format_transcript_segments(segments: list) -> str:
    """
    Formats a list of transcript segments into a human-readable string.
    """
    formatted_lines = []
    for segment in segments:
        # Use the capitalized speaker role as the label
        # (e.g., "agent" -> "Agent"); fall back to "Unknown" when it is missing
        speaker_label = segment.speaker.capitalize() if segment.speaker else "Unknown"

        # Format timestamps
        start_time_str = ""
        end_time_str = ""
        if segment.start_time:
            start_dt = datetime.fromisoformat(segment.start_time.replace('Z', '+00:00'))
            start_time_str = start_dt.strftime("%H:%M:%S")
        
        if segment.end_time:
            end_dt = datetime.fromisoformat(segment.end_time.replace('Z', '+00:00'))
            end_time_str = end_dt.strftime("%H:%M:%S")

        # Construct the line
        line = f"[{start_time_str} - {end_time_str}] {speaker_label}: {segment.text}"
        formatted_lines.append(line)

    return "\n".join(formatted_lines)
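
The same segment fields support simple aggregate views as well as line-by-line formatting. As a rough sketch, assuming the fields are shaped as listed above (string timestamps, a numeric confidence), the hypothetical helper talk_time_by_speaker sums speaking time per speaker and skips segments below a confidence threshold. The SimpleNamespace objects are stubs standing in for real API results.

```python
from collections import defaultdict
from datetime import datetime
from types import SimpleNamespace


def parse_ts(value: str) -> datetime:
    # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def talk_time_by_speaker(segments, min_confidence: float = 0.0) -> dict:
    """Sum segment durations in seconds per speaker, skipping low-confidence segments."""
    totals = defaultdict(float)
    for seg in segments:
        if seg.confidence is not None and seg.confidence < min_confidence:
            continue
        duration = parse_ts(seg.end_time) - parse_ts(seg.start_time)
        totals[seg.speaker or "unknown"] += duration.total_seconds()
    return dict(totals)


# Stub segments standing in for the objects returned by the API
sample = [
    SimpleNamespace(speaker="agent", text="Hello", confidence=0.95,
                    start_time="2024-03-08T12:00:00Z", end_time="2024-03-08T12:00:02Z"),
    SimpleNamespace(speaker="customer", text="Hi", confidence=0.40,
                    start_time="2024-03-08T12:00:02Z", end_time="2024-03-08T12:00:03Z"),
    SimpleNamespace(speaker="agent", text="How can I help?", confidence=0.90,
                    start_time="2024-03-08T12:00:03Z", end_time="2024-03-08T12:00:06Z"),
]

print(talk_time_by_speaker(sample, min_confidence=0.5))
```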

Complete Working Example

The following script combines all steps into a single executable module. It retrieves voice conversations from the last 7 days and prints the full transcript for the first conversation found.

import os
import sys

import PureCloudPlatformClientV2
from PureCloudPlatformClientV2 import (
    ApiClient,
    AnalyticsApi,
    SpeechAnalyticsApi
)
from PureCloudPlatformClientV2.models import (
    ConversationDetailsQueryRequest,
    ConversationDetailsQueryFilter
)
from datetime import datetime, timedelta

def get_genesis_api_client() -> ApiClient:
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    # Region domain of your organization, e.g. mypurecloud.com or mypurecloud.ie
    environment = os.getenv("GENESYS_ENVIRONMENT", "mypurecloud.com")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in environment.")

    # Point the SDK at the regional API host (note the "api." prefix)
    PureCloudPlatformClientV2.configuration.host = f"https://api.{environment}"

    # Client Credentials grant; returns the authenticated ApiClient
    return ApiClient().get_client_credentials_token(client_id, client_secret)

def query_voice_conversations(api_client: ApiClient, days_back: int = 7) -> list:
    analytics_api = AnalyticsApi(api_client)
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=days_back)

    filter_obj = ConversationDetailsQueryFilter(
        entity_filter_type="speech",
        entity_filter_value="true" 
    )

    query_request = ConversationDetailsQueryRequest(
        interval=f"{start_time.isoformat()}Z/{end_time.isoformat()}Z",
        filter=[filter_obj],
        group_by=["conversationId"],
        select=["conversationId", "type", "start_time", "end_time"]
    )

    try:
        response = analytics_api.post_analytics_conversations_details_query(body=query_request)
        conversation_ids = []
        if response.entities:
            for entity in response.entities:
                if entity.conversation_id:
                    conversation_ids.append(entity.conversation_id)
        return conversation_ids
    except Exception as e:
        print(f"Error querying conversations: {e}")
        raise

def get_full_transcript(api_client: ApiClient, conversation_id: str) -> list:
    speech_api = SpeechAnalyticsApi(api_client)
    all_segments = []
    next_page_token = None

    while True:
        try:
            response = speech_api.get_analytics_speech_conversations_transcript(
                conversation_id=conversation_id,
                next_page_token=next_page_token
            )
            if response.segments:
                all_segments.extend(response.segments)
            if response.next_page_token:
                next_page_token = response.next_page_token
            else:
                break
        except Exception as e:
            print(f"Error retrieving transcript for conversation {conversation_id}: {e}")
            break
    return all_segments

def format_transcript_segments(segments: list) -> str:
    formatted_lines = []
    for segment in segments:
        # Capitalized speaker role, or "Unknown" when the role is missing
        speaker_label = segment.speaker.capitalize() if segment.speaker else "Unknown"

        start_time_str = ""
        end_time_str = ""
        if segment.start_time:
            start_dt = datetime.fromisoformat(segment.start_time.replace('Z', '+00:00'))
            start_time_str = start_dt.strftime("%H:%M:%S")
        if segment.end_time:
            end_dt = datetime.fromisoformat(segment.end_time.replace('Z', '+00:00'))
            end_time_str = end_dt.strftime("%H:%M:%S")

        line = f"[{start_time_str} - {end_time_str}] {speaker_label}: {segment.text}"
        formatted_lines.append(line)
    return "\n".join(formatted_lines)

def main():
    try:
        api_client = get_genesis_api_client()
        
        # Step 1: Get conversation IDs
        conversation_ids = query_voice_conversations(api_client, days_back=7)
        
        if not conversation_ids:
            print("No voice conversations with speech analytics found in the last 7 days.")
            return

        # Step 2: Get transcript for the first conversation
        target_conversation_id = conversation_ids[0]
        print(f"Retrieving transcript for conversation ID: {target_conversation_id}")
        
        segments = get_full_transcript(api_client, target_conversation_id)
        
        if not segments:
            print("No transcript segments found for this conversation.")
            return

        # Step 3: Format and print
        transcript_text = format_transcript_segments(segments)
        print("--- TRANSCRIPT START ---")
        print(transcript_text)
        print("--- TRANSCRIPT END ---")

    except Exception as e:
        print(f"Fatal error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
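
The script stops at the first conversation. To process every conversation instead, one option is to write each transcript to its own file. The sketch below shows that shape with the fetching step injected as a callable, so it runs without Genesys Cloud; export_transcripts and the one-file-per-conversation layout are assumptions for illustration.

```python
import tempfile
from pathlib import Path
from typing import Callable, Iterable, List


def export_transcripts(conversation_ids: Iterable[str],
                       fetch_text: Callable[[str], str],
                       out_dir: Path) -> List[Path]:
    """Write one <conversation_id>.txt per non-empty transcript; return the paths."""
    out_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for conversation_id in conversation_ids:
        text = fetch_text(conversation_id)
        if not text:
            continue  # nothing was transcribed for this conversation
        path = out_dir / f"{conversation_id}.txt"
        path.write_text(text, encoding="utf-8")
        written.append(path)
    return written


# Stand-in data: conv-2 has no transcript and is skipped
fake_transcripts = {"conv-1": "[12:00:00 - 12:00:02] Agent: Hello", "conv-2": ""}

with tempfile.TemporaryDirectory() as tmp:
    paths = export_transcripts(fake_transcripts, fake_transcripts.get, Path(tmp))
    print([p.name for p in paths])
```

In the script above, fetch_text could be a lambda that calls get_full_transcript and passes the result to format_transcript_segments.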

Common Errors & Debugging

Error: 403 Forbidden

Cause: The OAuth Client does not have the required scopes.
Fix: Ensure your OAuth Client has analytics:conversation:read and analytics:speech:read scopes assigned in the Genesys Cloud Admin Console.
Code Check:

# Verify scopes in your OAuth Client configuration in Genesys Cloud
# Required: analytics:conversation:read, analytics:speech:read

Error: 404 Not Found

Cause: The conversationId does not exist or does not have speech analytics data.
Fix: Verify that the conversation is of type voice and that Speech Analytics is enabled for the queue or skill associated with the conversation.
Code Check:

# Ensure the conversation ID is valid and exists in the system
# Check if speech analytics is enabled for the queue/skill in Genesys Cloud Admin

Error: 429 Too Many Requests

Cause: You have exceeded the API rate limit.
Fix: Implement exponential backoff and retry logic. The SDK does not automatically handle retries for 429 errors.
Code Fix:

import time

def get_full_transcript_with_retry(api_client: ApiClient, conversation_id: str, max_retries: int = 3) -> list:
    speech_api = SpeechAnalyticsApi(api_client)
    all_segments = []
    next_page_token = None
    retries = 0

    while True:
        try:
            response = speech_api.get_analytics_speech_conversations_transcript(
                conversation_id=conversation_id,
                next_page_token=next_page_token
            )
            if response.segments:
                all_segments.extend(response.segments)
            if response.next_page_token:
                next_page_token = response.next_page_token
            else:
                break
            retries = 0  # Reset retries on success
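        except Exception as e:
            # Prefer the HTTP status carried by the exception; fall back to
            # searching the message text when no status attribute is present
            rate_limited = getattr(e, "status", None) == 429 or "429" in str(e)
            if rate_limited and retries < max_retries:
                wait_time = 2 ** retries  # Exponential backoff: 1s, 2s, 4s, ...
                print(f"Rate limited. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                retries += 1
            else:
                # Note: segments collected so far are returned as a partial transcript
                print(f"Error retrieving transcript for conversation {conversation_id}: {e}")
                break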
    return all_segments
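
The backoff logic can also be factored into a reusable wrapper around any single call. The following is a minimal sketch under two assumptions: rate-limit errors expose an integer status attribute equal to 429, and a small random jitter is acceptable. call_with_backoff and FakeRateLimit are hypothetical names. Unlike the loop above, the wrapper re-raises once retries are exhausted, so a caller cannot mistake a partial result for a complete one.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(func: Callable[[], T],
                      max_retries: int = 3,
                      base_delay: float = 1.0,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func, retrying rate-limit errors with exponential backoff and jitter."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception as error:
            # Assumption: rate-limit errors expose an integer `status` of 429
            rate_limited = getattr(error, "status", None) == 429
            if not rate_limited or attempt == max_retries:
                raise
            # 1x, 2x, 4x ... base_delay, plus up to 25% random jitter
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, delay * 0.25))
    raise AssertionError("unreachable")


class FakeRateLimit(Exception):
    """Stand-in for an SDK exception carrying HTTP status 429."""
    status = 429


attempts = {"count": 0}


def flaky() -> str:
    # Fails twice with a rate-limit error, then succeeds
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise FakeRateLimit()
    return "ok"


# A no-op sleep keeps the demonstration instant
print(call_with_backoff(flaky, sleep=lambda seconds: None))
```

Injecting sleep as a parameter is a testing convenience; production code can rely on the default time.sleep.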

Official References