Transcribing Genesys Cloud Recordings with AWS Transcribe and Python

What You Will Build

This script polls the Genesys Cloud Media API for completed recordings, streams each audio file to an S3 bucket, runs an AWS Transcribe job with a custom language model for domain-specific terminology, parses the JSON output into speaker turns with per-turn confidence scores, uploads the structured transcript back to Genesys Cloud through the Interactions API, and applies an S3 lifecycle policy that expires raw audio and transcripts after a short retention period. It uses the Genesys Cloud Python SDK, the AWS SDK for Python (boto3), httpx, and tenacity, and requires Python 3.9 or later.

Prerequisites

  • OAuth 2.0 Client Credentials flow with scopes: media:recording:view, interaction:transcript:add
  • Genesys Cloud Python SDK (genesyscloud>=2.0.0)
  • Python 3.9 runtime with boto3>=1.28.0, httpx>=0.24.0, tenacity>=8.2.0
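  • AWS IAM role or user with the AmazonS3FullAccess and AmazonTranscribeFullAccess managed policies, or a narrower permissions policy granting s3:PutObject, s3:GetObject, s3:AbortMultipartUpload, s3:PutBucketLifecycleConfiguration, transcribe:StartTranscriptionJob, and transcribe:GetTranscriptionJob
  • An AWS Transcribe custom language model with COMPLETED status, in the same AWS Region as the Transcribe client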

Authentication Setup

Genesys Cloud uses OAuth 2.0 client credentials for server-to-server integrations. The Python SDK handles token caching, expiration tracking, and automatic refresh. You must pass the client identifier, secret, base URL, and required scopes during initialization.

import os
from genesyscloud import configuration, platform_client_v2
from genesyscloud.recording_api import RecordingApi
from genesyscloud.interaction_api import InteractionApi

def init_genesys_client() -> platform_client_v2.PureCloudPlatformClientV2:
    config = configuration.Configuration()
    config.client_id = os.getenv("GENESYS_CLIENT_ID")
    config.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    config.base_url = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
    config.scope = "media:recording:view interaction:transcript:add"
    
    # PureCloudPlatformClientV2 manages the OAuth token lifecycle automatically
    client = platform_client_v2.PureCloudPlatformClientV2(config)
    return client

The SDK attaches the bearer token to each request and obtains a new token before the current one expires, so you do not need to implement token rotation yourself.
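To make the token lifecycle concrete, here is a generic sketch of the cache-and-refresh pattern described above. It is not the Genesys SDK's internal code: ClientCredentialsTokenCache, fetch_token, and refresh_margin are hypothetical names, and fetch_token stands in for a POST to the OAuth token endpoint that returns an access token and its lifetime in seconds.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional, Tuple


@dataclass
class CachedToken:
    access_token: str
    expires_at: float  # absolute deadline on the injected clock, in seconds


class ClientCredentialsTokenCache:
    """Cache one access token and fetch a new one shortly before it expires."""

    def __init__(
        self,
        fetch_token: Callable[[], Tuple[str, float]],  # hypothetical: returns (token, expires_in)
        refresh_margin: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch_token = fetch_token
        self._refresh_margin = refresh_margin
        self._clock = clock
        self._cached: Optional[CachedToken] = None

    def get(self) -> str:
        now = self._clock()
        # Fetch when nothing is cached or the token is within refresh_margin of expiry
        if self._cached is None or now >= self._cached.expires_at - self._refresh_margin:
            token, expires_in = self._fetch_token()
            self._cached = CachedToken(token, now + expires_in)
        return self._cached.access_token

    def invalidate(self) -> None:
        """Drop the cached token (for example after a 401) so get() fetches a new one."""
        self._cached = None
```

Refreshing a little before expiry avoids sending a token that lapses while the request is in flight, and invalidate() gives a 401 handler a way to force a fresh token.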

Implementation

Step 1: Poll the Media API for Completed Recordings

The endpoint GET /api/v2/media/recordings returns recording metadata. Filter by status=completed so that only recordings whose audio is ready for download are returned. Results are paginated, and each response includes a next_page_token until the last page. When you exceed the API rate limit, the endpoint returns 429 Too Many Requests; retry those responses with exponential backoff and let other HTTP errors fail immediately.

from typing import Any, List, Optional

import httpx
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential


def _is_rate_limited(exc: BaseException) -> bool:
    # Retry only 429 responses; 401, 403, and 404 will not succeed on a retry
    return isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 429


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception(_is_rate_limited),
    reraise=True,  # surface the final HTTP error instead of tenacity's RetryError
)
def fetch_recordings_page(recording_api: RecordingApi, page_token: Optional[str], page_size: int):
    params = {"status": "completed", "page_size": page_size}
    if page_token:
        params["next_page_token"] = page_token
    return recording_api.get_recordings(**params)


def fetch_completed_recordings(
    client: platform_client_v2.PureCloudPlatformClientV2, page_size: int = 25
) -> List[Any]:
    recording_api = RecordingApi(client)
    records: List[Any] = []
    page_token: Optional[str] = None

    # Loop over pages instead of recursing: each page is retried independently,
    # and a long result set cannot hit Python's recursion limit
    while True:
        response = fetch_recordings_page(recording_api, page_token, page_size)
        records.extend(response.entities)
        page_token = response.next_page_token
        if not page_token:
            return records

get_recordings returns a paged response whose entities are recording objects. Each entity carries id, fileUrl, mediaType, and conversationId, which the SDK exposes as snake_case attributes such as recording.file_url. Processing only completed recordings avoids downloading audio that is still being written.
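Pagination can also be written as a generator, which lets you start processing recordings before the last page arrives. In this sketch, iter_all_entities and the fetch_page callable are hypothetical; fetch_page stands in for a wrapper around recording_api.get_recordings that returns (entities, next_page_token). The max_pages guard stops an API that keeps returning a token from looping forever.

```python
from typing import Any, Callable, Iterator, List, Optional, Tuple

# Hypothetical signature: page token in (None for the first page),
# (entities on this page, next page token or None) out.
FetchPage = Callable[[Optional[str]], Tuple[List[Any], Optional[str]]]


def iter_all_entities(fetch_page: FetchPage, max_pages: int = 1000) -> Iterator[Any]:
    """Yield entities page by page, following next-page tokens in a loop."""
    token: Optional[str] = None
    for _ in range(max_pages):
        entities, token = fetch_page(token)
        yield from entities
        if not token:
            return
    # Reaching here means the API kept returning tokens for max_pages pages
    raise RuntimeError(f"Stopped after {max_pages} pages; pagination may be looping")
```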

Step 2: Stream Audio to S3 and Invoke AWS Transcribe

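Genesys provides a signed fileUrl for each recording. Stream this URL directly to S3 to avoid writing the audio to local disk. An httpx streaming response has no file-like .raw attribute, so the code wraps response.iter_bytes() in a small io.RawIOBase adapter that upload_fileobj can read from. After the upload, call start_transcription_job with ModelSettings={"LanguageModelName": ...} to apply your custom terminology model, and enable speaker diarization with Settings={"ShowSpeakerLabels": True, "MaxSpeakerLabels": 2} so that the output contains the speaker labels used in Step 3. Then poll get_transcription_job until the status becomes COMPLETED or FAILED.

import io
import os
import time
from typing import Iterable, Iterator

import boto3
import httpx
from boto3.s3.transfer import TransferConfig
from botocore.client import BaseClient


class IterBytesReader(io.RawIOBase):
    """Expose an iterator of byte chunks through the read() interface upload_fileobj needs."""

    def __init__(self, chunks: Iterable[bytes]) -> None:
        self._chunks: Iterator[bytes] = iter(chunks)
        self._pending = b""

    def readable(self) -> bool:
        return True

    def readinto(self, buffer) -> int:
        while not self._pending:
            try:
                self._pending = next(self._chunks)
            except StopIteration:
                return 0  # end of stream
        n = min(len(buffer), len(self._pending))
        buffer[:n] = self._pending[:n]
        self._pending = self._pending[n:]
        return n


def stream_to_s3_and_transcribe(
    recording_id: str, file_url: str, bucket: str, key_prefix: str, transcribe_client: BaseClient
) -> str:
    s3_client = boto3.client("s3")
    s3_key = f"{key_prefix}/{recording_id}.wav"

    # Stream the audio to S3 without writing it to local disk
    with httpx.stream("GET", file_url) as response:
        response.raise_for_status()
        s3_client.upload_fileobj(
            io.BufferedReader(IterBytesReader(response.iter_bytes())),
            bucket,
            s3_key,
            Config=TransferConfig(multipart_threshold=8 * 1024 * 1024),
        )

    # Start transcription with the custom language model and speaker labels
    job_name = f"genesys-transcription-{recording_id}"
    transcribe_client.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={"MediaFileUri": f"s3://{bucket}/{s3_key}"},
        MediaFormat="wav",
        LanguageCode="en-US",
        ModelSettings={"LanguageModelName": os.environ["AWS_CUSTOM_LM_NAME"]},
        Settings={"ShowSpeakerLabels": True, "MaxSpeakerLabels": 2},
        OutputBucketName=bucket,
        OutputKey=f"transcripts/{job_name}.json",
    )

    # Poll until the job reaches a terminal state
    while True:
        job = transcribe_client.get_transcription_job(TranscriptionJobName=job_name)
        status = job["TranscriptionJob"]["TranscriptionJobStatus"]
        if status == "COMPLETED":
            return job["TranscriptionJob"]["Transcript"]["TranscriptFileUri"]
        if status == "FAILED":
            raise RuntimeError(f"Transcription failed: {job['TranscriptionJob'].get('FailureReason')}")
        time.sleep(5)

ModelSettings.LanguageModelName tells AWS Transcribe which deployed custom language model to use. The job runs asynchronously, so wait for COMPLETED before reading the output JSON. Because OutputBucketName is set, the transcript is written to your bucket under OutputKey, and TranscriptFileUri reports its location as an HTTPS URL of the form https://s3.<region>.amazonaws.com/<bucket>/<key> rather than an s3:// URI.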
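The polling loop above never gives up on a job that stays QUEUED or IN_PROGRESS. A variant with a deadline is sketched below; wait_for_transcription, timeout_s, and poll_interval_s are hypothetical names, and get_job stands in for a call to get_transcription_job. The sleep and clock parameters exist only so the loop can be exercised without waiting.

```python
import time
from typing import Any, Callable, Dict


def wait_for_transcription(
    get_job: Callable[[], Dict[str, Any]],
    timeout_s: float = 3600.0,
    poll_interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll until the job is COMPLETED and return its TranscriptFileUri.

    Raises RuntimeError if the job FAILED and TimeoutError if it is still
    QUEUED or IN_PROGRESS when timeout_s elapses.
    """
    deadline = clock() + timeout_s
    while True:
        job = get_job()["TranscriptionJob"]
        status = job["TranscriptionJobStatus"]
        if status == "COMPLETED":
            return job["Transcript"]["TranscriptFileUri"]
        if status == "FAILED":
            raise RuntimeError(f"Transcription failed: {job.get('FailureReason')}")
        if clock() >= deadline:
            raise TimeoutError(f"Job still {status} after {timeout_s} seconds")
        sleep(poll_interval_s)
```

With boto3 this might be called as wait_for_transcription(lambda: transcribe_client.get_transcription_job(TranscriptionJobName=job_name)).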

Step 3: Parse Transcription JSON and Upload to Genesys

AWS Transcribe writes a JSON file containing results.transcripts (the full text) and results.items (word-level pronunciation entries and punctuation entries, each with confidence scores and, when speaker labels are enabled, a speaker_label). Confidence values and timestamps are stored as strings, and start_time and end_time are offsets in seconds from the start of the audio. Parse results.items, group consecutive words by speaker, compute the mean confidence of each turn, and format the data to match the Genesys Interactions API schema.

import json
from typing import Any, Dict, List
from urllib.parse import unquote, urlparse

import boto3


def parse_aws_transcript(transcript_uri: str) -> List[Dict[str, Any]]:
    s3_client = boto3.client("s3")

    # TranscriptFileUri is normally https://s3.<region>.amazonaws.com/<bucket>/<key>;
    # s3://<bucket>/<key> is accepted as well
    parsed = urlparse(transcript_uri)
    if parsed.scheme == "s3":
        bucket, key = parsed.netloc, parsed.path.lstrip("/")
    else:
        bucket, _, key = parsed.path.lstrip("/").partition("/")
        key = unquote(key)

    obj = s3_client.get_object(Bucket=bucket, Key=key)
    data = json.loads(obj["Body"].read().decode("utf-8"))

    turns: List[Dict[str, Any]] = []
    for item in data["results"]["items"]:
        alternative = item["alternatives"][0]

        # Punctuation items have no timestamps; append them to the current turn's text
        if item.get("type") == "punctuation":
            if turns:
                turns[-1]["text"] += alternative["content"]
            continue

        speaker = item.get("speaker_label", "unknown")
        if not turns or turns[-1]["speaker"] != speaker:
            # start and end are offsets in seconds from the start of the recording
            turns.append({"speaker": speaker, "text": alternative["content"],
                          "start": float(item["start_time"]), "scores": []})
        else:
            turns[-1]["text"] += f" {alternative['content']}"
        turns[-1]["end"] = float(item["end_time"])
        # Transcribe stores confidence as a string, so convert before averaging
        turns[-1]["scores"].append(float(alternative["confidence"]))

    # Replace each turn's score list with the mean confidence of its words
    for turn in turns:
        scores = turn.pop("scores")
        turn["confidence"] = sum(scores) / len(scores)

    return turns

def upload_transcript_to_genesys(client: platform_client_v2.PureCloudPlatformClientV2, recording_id: str, turns: list[dict]) -> None:
    interaction_api = InteractionApi(client)
    body = {
        "transcript": turns
    }
    
    try:
        interaction_api.post_interactions_recordings_transcripts(
            recording_id=recording_id,
            body=body
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 403:
            raise PermissionError("Missing interaction:transcript:add scope") from e
        raise

The Genesys transcript endpoint expects an array of turn objects. Each object requires speaker, text, confidence, start, and end fields. The confidence field must be a float between 0 and 1.
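Before calling the upload endpoint, a cheap local check against the field list above catches mistakes such as confidence values left as strings. This is a sketch: validate_turns and REQUIRED_FIELDS are hypothetical names, and the field list mirrors this article's description rather than an official Genesys schema.

```python
from typing import Any, Dict, List

REQUIRED_FIELDS = ("speaker", "text", "confidence", "start", "end")


def validate_turns(turns: List[Dict[str, Any]]) -> List[str]:
    """Return a list of problems; an empty list means every turn passed."""
    problems: List[str] = []
    for i, turn in enumerate(turns):
        missing = [field for field in REQUIRED_FIELDS if field not in turn]
        if missing:
            problems.append(f"turn {i}: missing {', '.join(missing)}")
            continue
        conf = turn["confidence"]
        # The endpoint is described as expecting a float between 0 and 1
        if not isinstance(conf, float) or not 0.0 <= conf <= 1.0:
            problems.append(f"turn {i}: confidence {conf!r} is not a float in [0, 1]")
        if turn["end"] < turn["start"]:
            problems.append(f"turn {i}: end precedes start")
    return problems
```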

Step 4: Configure S3 Lifecycle Policies for Audio Purging

Raw audio files consume storage and incur costs. An S3 lifecycle configuration expires objects after a defined retention period. Apply it once, either at setup time or through infrastructure as code.

import boto3


def configure_s3_lifecycle(bucket: str, expiration_days: int = 1) -> None:
    s3_client = boto3.client("s3")
    
    lifecycle_config = {
        "Rules": [
            {
                "ID": "PurgeRawAudio",
                "Status": "Enabled",
                "Filter": {"Prefix": "recordings/"},
                "Expiration": {"Days": expiration_days},
                "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 1}
            },
            {
                "ID": "PurgeTranscripts",
                "Status": "Enabled",
                "Filter": {"Prefix": "transcripts/"},
                "Expiration": {"Days": expiration_days}
            }
        ]
    }
    
    s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=lifecycle_config
    )

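The Prefix filters limit each rule to objects under recordings/ or transcripts/. Expiration is based on object age, counted from when the object was created, not on whether the script has processed it, so choose a retention period longer than your worst-case processing delay. Pass key_prefix="recordings" to stream_to_s3_and_transcribe so that uploaded audio falls under the first rule. S3 runs lifecycle actions asynchronously, so expired objects can remain for a short time before they are removed. The AbortIncompleteMultipartUpload action cleans up the parts of uploads that never completed. Note that put_bucket_lifecycle_configuration replaces the bucket's entire lifecycle configuration, so include any existing rules you want to keep.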
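If the bucket already carries lifecycle rules that other processes depend on, one approach is to read the current rules first and merge in this script's rules by ID before writing the combined list back. The sketch below shows only the merge step as a pure function; merge_lifecycle_rules is a hypothetical helper name.

```python
from typing import Any, Dict, List


def merge_lifecycle_rules(
    existing: List[Dict[str, Any]], new: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Merge lifecycle rules by ID: new rules replace same-ID rules, others are kept."""
    new_by_id = {rule["ID"]: rule for rule in new}
    # Keep the existing order; swap in the new version wherever an ID matches
    merged = [new_by_id.pop(rule.get("ID"), rule) for rule in existing]
    # Whatever is left in new_by_id had no existing counterpart
    merged.extend(new_by_id.values())
    return merged
```

In practice, existing would come from s3_client.get_bucket_lifecycle_configuration(Bucket=bucket)["Rules"], treating a ClientError with the code NoSuchLifecycleConfiguration as an empty list, and the result would be written back with LifecycleConfiguration={"Rules": merged}.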

Complete Working Example

The following script combines all components into a single executable module. It initializes clients, applies the lifecycle policy, polls for recordings, processes each file, and uploads results.

import io
import json
import os
import time
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple
from urllib.parse import unquote, urlparse

import boto3
import httpx
from boto3.s3.transfer import TransferConfig
from genesyscloud import configuration, platform_client_v2
from genesyscloud.recording_api import RecordingApi
from genesyscloud.interaction_api import InteractionApi
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

AWS_REGION = os.getenv("AWS_REGION", "us-east-1")


def init_clients():
    config = configuration.Configuration()
    config.client_id = os.getenv("GENESYS_CLIENT_ID")
    config.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    config.base_url = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
    config.scope = "media:recording:view interaction:transcript:add"
    genesys_client = platform_client_v2.PureCloudPlatformClientV2(config)

    transcribe_client = boto3.client("transcribe", region_name=AWS_REGION)
    s3_client = boto3.client("s3", region_name=AWS_REGION)
    return genesys_client, transcribe_client, s3_client


class IterBytesReader(io.RawIOBase):
    """Expose httpx's byte-chunk iterator through the read() interface upload_fileobj needs."""

    def __init__(self, chunks: Iterable[bytes]) -> None:
        self._chunks: Iterator[bytes] = iter(chunks)
        self._pending = b""

    def readable(self) -> bool:
        return True

    def readinto(self, buffer) -> int:
        while not self._pending:
            try:
                self._pending = next(self._chunks)
            except StopIteration:
                return 0  # end of stream
        n = min(len(buffer), len(self._pending))
        buffer[:n] = self._pending[:n]
        self._pending = self._pending[n:]
        return n


def _is_rate_limited(exc: BaseException) -> bool:
    # Retry only 429 responses; other HTTP errors fail immediately
    return isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 429


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception(_is_rate_limited),
    reraise=True,
)
def fetch_page(recording_api, page_token: Optional[str], page_size: int):
    params = {"status": "completed", "page_size": page_size}
    if page_token:
        params["next_page_token"] = page_token
    return recording_api.get_recordings(**params)


def fetch_completed_recordings(client, page_size: int = 25) -> List[Any]:
    recording_api = RecordingApi(client)
    records: List[Any] = []
    page_token: Optional[str] = None
    while True:
        response = fetch_page(recording_api, page_token, page_size)
        records.extend(response.entities)
        page_token = response.next_page_token
        if not page_token:
            return records


def split_s3_location(uri: str) -> Tuple[str, str]:
    """Return (bucket, key) for s3:// URIs and the path-style HTTPS URLs Transcribe returns."""
    parsed = urlparse(uri)
    if parsed.scheme == "s3":
        return parsed.netloc, parsed.path.lstrip("/")
    bucket, _, key = parsed.path.lstrip("/").partition("/")
    return bucket, unquote(key)


def build_turns(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    turns: List[Dict[str, Any]] = []
    for item in items:
        alt = item["alternatives"][0]
        if item.get("type") == "punctuation":
            if turns:
                turns[-1]["text"] += alt["content"]
            continue
        speaker = item.get("speaker_label", "unknown")
        if not turns or turns[-1]["speaker"] != speaker:
            turns.append({"speaker": speaker, "text": alt["content"],
                          "start": float(item["start_time"]), "scores": []})
        else:
            turns[-1]["text"] += f" {alt['content']}"
        turns[-1]["end"] = float(item["end_time"])
        turns[-1]["scores"].append(float(alt["confidence"]))  # stored as a string
    for turn in turns:
        scores = turn.pop("scores")
        turn["confidence"] = sum(scores) / len(scores)  # mean word confidence
    return turns


def process_recording(recording, genesys_client, transcribe_client, s3_client, bucket):
    recording_id = recording.id

    # Stream the audio to S3 under the prefix the lifecycle rule expires
    s3_key = f"recordings/{recording_id}.wav"
    with httpx.stream("GET", recording.file_url) as response:
        response.raise_for_status()
        s3_client.upload_fileobj(
            io.BufferedReader(IterBytesReader(response.iter_bytes())),
            bucket,
            s3_key,
            Config=TransferConfig(multipart_threshold=8 * 1024 * 1024),
        )

    # Job names must be unique within the account and Region, so add a timestamp
    job_name = f"genesys-{recording_id}-{int(time.time())}"
    transcribe_client.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={"MediaFileUri": f"s3://{bucket}/{s3_key}"},
        MediaFormat="wav",
        LanguageCode="en-US",
        ModelSettings={"LanguageModelName": os.environ["AWS_CUSTOM_LM_NAME"]},
        Settings={"ShowSpeakerLabels": True, "MaxSpeakerLabels": 2},
        OutputBucketName=bucket,
        OutputKey=f"transcripts/{job_name}.json",
    )

    # Poll until the job reaches a terminal state
    while True:
        job = transcribe_client.get_transcription_job(TranscriptionJobName=job_name)["TranscriptionJob"]
        status = job["TranscriptionJobStatus"]
        if status == "COMPLETED":
            break
        if status == "FAILED":
            raise RuntimeError(f"Transcription failed for {recording_id}: {job.get('FailureReason')}")
        time.sleep(5)

    # Download and parse the transcript JSON, then upload the turns to Genesys
    out_bucket, out_key = split_s3_location(job["Transcript"]["TranscriptFileUri"])
    data = json.loads(s3_client.get_object(Bucket=out_bucket, Key=out_key)["Body"].read())
    turns = build_turns(data["results"]["items"])

    interaction_api = InteractionApi(genesys_client)
    interaction_api.post_interactions_recordings_transcripts(
        recording_id=recording_id,
        body={"transcript": turns},
    )
    print(f"Uploaded transcript for {recording_id}")


if __name__ == "__main__":
    genesys, transcribe, s3 = init_clients()
    bucket = os.environ["S3_BUCKET_NAME"]

    # Apply the lifecycle policy. This call replaces any existing
    # lifecycle configuration on the bucket.
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration={
            "Rules": [
                {"ID": "PurgeAudio", "Status": "Enabled", "Filter": {"Prefix": "recordings/"},
                 "Expiration": {"Days": 1}, "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 1}},
                {"ID": "PurgeTranscripts", "Status": "Enabled", "Filter": {"Prefix": "transcripts/"},
                 "Expiration": {"Days": 1}},
            ]
        },
    )

    for rec in fetch_completed_recordings(genesys):
        try:
            process_recording(rec, genesys, transcribe, s3, bucket)
        except Exception as e:
            print(f"Failed to process {rec.id}: {e}")

Common Errors & Debugging

Error: 401 Unauthorized on Transcript Upload

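  • Cause: The client ID or secret is wrong, the secret was rotated, the OAuth client was deleted, or the access token was revoked. A valid token that lacks the interaction:transcript:add scope is rejected with 403 Forbidden instead, which upload_transcript_to_genesys converts to a PermissionError.
  • Fix: Confirm that GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET match an active OAuth client, and update the secret if it was rotated. For 403 errors, verify that config.scope includes both required scopes.
  • Code: The SDK refreshes tokens automatically. If the error persists, discard the cached token by instantiating a new PureCloudPlatformClientV2.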

Error: 429 Too Many Requests on Recording Poll

  • Cause: Requests exceed a Genesys Cloud API rate limit. The current limits are published in the Genesys Cloud developer documentation.
  • Fix: The tenacity retry decorator in Step 1 backs off exponentially on 429 responses. Increase page_size to 100 to make fewer requests per polling cycle.
  • Code: For heavily throttled environments, widen the backoff with wait_exponential(multiplier=2, min=4, max=60).
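wait_exponential does not read a Retry-After header. If a 429 response includes one, a delay function along these lines can honor it and otherwise fall back to capped exponential backoff with full jitter. retry_delay and its parameters are hypothetical names; the sketch only understands the delta-seconds form of Retry-After and falls back to backoff for the HTTP-date form.

```python
import random
from typing import Optional


def retry_delay(
    attempt: int,
    retry_after: Optional[str] = None,
    base: float = 2.0,
    cap: float = 60.0,
    jitter: bool = True,
) -> float:
    """Seconds to wait before retry number `attempt` (1 for the first retry)."""
    if retry_after is not None:
        try:
            # Honor the server's hint when it is given in seconds
            return max(float(retry_after), 0.0)
        except ValueError:
            pass  # HTTP-date or malformed value: fall back to backoff below
    delay = min(cap, base * 2 ** (attempt - 1))
    # Full jitter spreads retries from many workers across [0, delay]
    return random.uniform(0.0, delay) if jitter else delay
```

To use it with tenacity, pass a callable as wait= that receives the RetryCallState and reads retry_state.attempt_number and retry_state.outcome.exception() to find the attempt count and the response headers.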

Error: AWS Transcribe ConflictException

  • Cause: A transcription job with the same name already exists in your AWS account and Region, for example because the same recording is processed a second time.
  • Fix: Make TranscriptionJobName unique within the account and Region by appending a timestamp or UUID.
  • Code: Change the job name to f"genesys-{recording_id}-{int(time.time())}".
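A small helper can keep job names unique and within the format AWS documents for TranscriptionJobName (1 to 200 characters drawn from letters, digits, period, underscore, and hyphen). make_job_name is a hypothetical helper; it uses a UUID instead of a timestamp because two workers can start jobs within the same second, and it assumes the prefix itself contains only allowed characters.

```python
import re
import uuid

# AWS documents TranscriptionJobName as 1-200 characters matching [0-9a-zA-Z._-]+
_DISALLOWED = re.compile(r"[^0-9A-Za-z._-]")
MAX_JOB_NAME_LEN = 200


def make_job_name(recording_id: str, prefix: str = "genesys") -> str:
    """Build a unique job name that satisfies the Transcribe naming rules."""
    suffix = uuid.uuid4().hex  # 32 hex characters, different on every call
    safe_id = _DISALLOWED.sub("-", recording_id)
    # Trim the ID so that prefix, ID, suffix, and two hyphens fit within the limit
    room = MAX_JOB_NAME_LEN - len(prefix) - len(suffix) - 2
    return f"{prefix}-{safe_id[:room]}-{suffix}"
```

A random suffix turns every attempt into a new job. If you would rather have re-runs fail fast, keep the name deterministic and treat ConflictException as "already submitted".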

Error: 403 Forbidden on S3 Lifecycle Configuration

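  • Cause: The identity boto3 is using lacks the s3:PutBucketLifecycleConfiguration permission. boto3 raises this as a ClientError with the error code AccessDenied.
  • Fix: Attach the AmazonS3FullAccess managed policy, or add s3:PutBucketLifecycleConfiguration to a permissions policy on the role. A role's trust policy only controls who can assume the role; it does not grant S3 permissions.
  • Code: Call boto3.client("sts").get_caller_identity() and confirm that the returned Arn belongs to the role you expect.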

Official References