Transcribing Genesys Cloud Recordings with AWS Transcribe and Python
What You Will Build
This script polls the Genesys Cloud Media API for newly completed recordings, streams each audio file to an S3 bucket, and runs it through AWS Transcribe with a custom language model for domain-specific terminology. It then parses the JSON transcription output to extract speaker turns and confidence scores, uploads the structured transcript back to Genesys Cloud via the Interactions API, and applies an S3 lifecycle policy that automatically purges raw audio after processing. It uses the Genesys Cloud Python SDK, the AWS SDK for Python (boto3), and httpx, and targets Python 3.9 or later.
Prerequisites
- OAuth 2.0 Client Credentials flow with scopes: media:recording:view, interaction:transcript:add
- Genesys Cloud Python SDK (genesyscloud>=2.0.0)
- Python 3.9 runtime with boto3>=1.28.0, httpx>=0.24.0, tenacity>=8.2.0
- AWS IAM role or user with the AmazonS3FullAccess and AmazonTranscribeFullAccess policies, and the s3:PutBucketLifecycleConfiguration permission
- An active AWS Transcribe Custom Language Model deployed in the target region
Authentication Setup
Genesys Cloud uses OAuth 2.0 client credentials for server-to-server integrations. The Python SDK handles token caching, expiration tracking, and automatic refresh. You must pass the client identifier, secret, base URL, and required scopes during initialization.
import os
from genesyscloud import configuration, platform_client_v2
from genesyscloud.recording_api import RecordingApi
from genesyscloud.interaction_api import InteractionApi

def init_genesys_client() -> platform_client_v2.PureCloudPlatformClientV2:
    config = configuration.Configuration()
    config.client_id = os.getenv("GENESYS_CLIENT_ID")
    config.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    config.base_url = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
    config.scope = "media:recording:view interaction:transcript:add"
    # PureCloudPlatformClientV2 manages the OAuth token lifecycle automatically
    client = platform_client_v2.PureCloudPlatformClientV2(config)
    return client
The SDK intercepts HTTP calls, attaches the bearer token, and refreshes it shortly before it expires. You do not need to implement manual token rotation.
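To make the refresh behavior concrete, the sketch below shows one way a client could cache a token and renew it shortly before expiry. It is an illustration only, not the SDK's implementation: TokenCache, fetch_token, and the 60-second skew are hypothetical names and values chosen for this example.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Cache a bearer token and refresh it shortly before it expires.

    Hypothetical helper for illustration. fetch_token stands in for the
    OAuth client-credentials request and returns (token, lifetime_seconds).
    """

    def __init__(self,
                 fetch_token: Callable[[], Tuple[str, float]],
                 skew_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch_token = fetch_token
        self._skew = skew_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        # Refresh when nothing is cached or expiry falls inside the skew window
        if self._token is None or self._clock() >= self._expires_at - self._skew:
            token, lifetime = self._fetch_token()
            self._token = token
            self._expires_at = self._clock() + lifetime
        return self._token
```

Injecting the clock keeps the class testable without waiting for real time to pass; in production code the default monotonic clock would normally be left in place.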
Implementation
Step 1: Poll the Media API for Completed Recordings
The endpoint GET /api/v2/media/recordings returns recording metadata. You must filter by status=completed to ensure the audio file is ready for download. The API supports pagination via next_page_token. You must also implement retry logic for 429 Too Many Requests responses, which occur when you exceed the tenant rate limit.
import httpx
import time
from typing import Optional
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type(httpx.HTTPStatusError)
)
def fetch_completed_recordings(client: platform_client_v2.PureCloudPlatformClientV2, page_token: Optional[str] = None, page_size: int = 25) -> list[dict]:
    recording_api = RecordingApi(client)
    records = []
    params = {
        "status": "completed",
        "page_size": page_size
    }
    if page_token:
        params["next_page_token"] = page_token
    try:
        response = recording_api.get_recordings(**params)
        records.extend(response.entities)
        # Recurse if more pages exist
        if response.next_page_token:
            records.extend(fetch_completed_recordings(client, page_token=response.next_page_token, page_size=page_size))
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 429:
            print("Rate limit encountered. Retrying with backoff...")
        # Re-raise so the tenacity decorator can apply its backoff
        raise
    return records
The get_recordings method returns a RecordingEntity wrapper. Each entity contains id, fileUrl, mediaType, and conversationId. You only process completed records to avoid downloading incomplete streams.
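Token-based pagination can also be written as a loop instead of recursion, which avoids deep call stacks on tenants with many pages. The following is a minimal sketch under stated assumptions: iter_all_records and fetch_page are hypothetical names, and fetch_page is assumed to return a pair of (entities, next_page_token) for a given token.

```python
from typing import Callable, Iterator, List, Optional, Tuple

Page = Tuple[List[dict], Optional[str]]


def iter_all_records(fetch_page: Callable[[Optional[str]], Page],
                     max_pages: int = 1000) -> Iterator[dict]:
    """Yield every record from a token-paginated source without recursion."""
    token: Optional[str] = None
    for _ in range(max_pages):
        entities, token = fetch_page(token)
        yield from entities
        if not token:
            # No continuation token means the last page was reached
            return
    # Guard against a server that keeps returning a token forever
    raise RuntimeError(f"Stopped after {max_pages} pages; possible token loop")
```

Because the function is a generator, a caller can begin processing the first page of recordings before later pages have been fetched.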
Step 2: Stream Audio to S3 and Invoke AWS Transcribe
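Genesys provides a signed fileUrl for each recording. You stream the audio behind this URL directly to S3 to avoid local disk I/O. After the upload, you call start_transcription_job and pass the custom language model name through ModelSettings so that your domain-specific model is applied. You must then poll get_transcription_job until the status transitions to COMPLETED or FAILED.

import io
import os
import time
import boto3
import httpx
from boto3.s3.transfer import TransferConfig

class IterStream(io.RawIOBase):
    """Expose an iterator of byte chunks as a readable, non-seekable stream."""

    def __init__(self, chunks):
        self._chunks = iter(chunks)
        self._leftover = b""

    def readable(self) -> bool:
        return True

    def readinto(self, buffer) -> int:
        # Refill from the iterator, skipping empty chunks; returning 0 signals EOF
        while not self._leftover:
            try:
                self._leftover = next(self._chunks)
            except StopIteration:
                return 0
        n = min(len(buffer), len(self._leftover))
        buffer[:n] = self._leftover[:n]
        self._leftover = self._leftover[n:]
        return n

def stream_to_s3_and_transcribe(recording_id: str, file_url: str, bucket: str, key_prefix: str, transcribe_client) -> str:
    s3_client = boto3.client("s3")
    s3_key = f"{key_prefix}/{recording_id}.wav"
    # Stream audio directly to S3. An httpx response is not a file-like object,
    # so wrap its byte iterator before handing it to upload_fileobj.
    with httpx.stream("GET", file_url) as response:
        response.raise_for_status()
        s3_client.upload_fileobj(
            io.BufferedReader(IterStream(response.iter_bytes())),
            bucket,
            s3_key,
            Config=TransferConfig(multipart_threshold=8 * 1024 * 1024)
        )
    # Start transcription with custom language model
    job_name = f"genesys-transcription-{recording_id}"
    media_uri = f"s3://{bucket}/{s3_key}"
    transcribe_client.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={"MediaFileUri": media_uri},
        MediaFormat="wav",
        LanguageCode="en-US",
        # The custom language model is selected through ModelSettings
        ModelSettings={"LanguageModelName": os.getenv("AWS_CUSTOM_LM_NAME")},
        # Speaker labels appear in the output only when diarization is enabled;
        # MaxSpeakerLabels=2 assumes a two-party call
        Settings={"ShowSpeakerLabels": True, "MaxSpeakerLabels": 2},
        OutputBucketName=bucket,
        # A key ending in "/" is treated as a prefix; the job name becomes the file name
        OutputKey="transcripts/"
    )
    # Poll until completion
    while True:
        job = transcribe_client.get_transcription_job(TranscriptionJobName=job_name)
        status = job["TranscriptionJob"]["TranscriptionJobStatus"]
        if status == "COMPLETED":
            return job["TranscriptionJob"]["Transcript"]["TranscriptFileUri"]
        elif status == "FAILED":
            raise RuntimeError(f"Transcription failed: {job['TranscriptionJob'].get('FailureReason')}")
        time.sleep(5)

The LanguageModelName entry in ModelSettings instructs AWS Transcribe to use your deployed custom model. The job runs asynchronously, so you must wait for COMPLETED before accessing the output JSON.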
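An unbounded polling loop can hang indefinitely if a job never leaves the queue. The sketch below shows one possible bounded alternative with a deadline and a growing delay. The name wait_for_job, the get_status callable, and the default timings are assumptions made for this example rather than part of boto3.

```python
import time
from typing import Callable


def wait_for_job(get_status: Callable[[], str],
                 timeout_seconds: float = 1800.0,
                 initial_delay: float = 5.0,
                 max_delay: float = 60.0,
                 sleep: Callable[[float], None] = time.sleep,
                 clock: Callable[[], float] = time.monotonic) -> str:
    """Poll get_status until it returns COMPLETED or FAILED, or time runs out."""
    deadline = clock() + timeout_seconds
    delay = initial_delay
    while True:
        status = get_status()
        if status in ("COMPLETED", "FAILED"):
            return status
        # Give up if the next sleep would carry us past the deadline
        if clock() + delay > deadline:
            raise TimeoutError(f"Job still {status} after {timeout_seconds} seconds")
        sleep(delay)
        # Double the delay after each attempt, capped at max_delay
        delay = min(delay * 2, max_delay)
```

In this pipeline, get_status could be a small lambda that calls get_transcription_job and returns the TranscriptionJobStatus field, leaving the caller to decide how to handle a FAILED result.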
Step 3: Parse Transcription JSON and Upload to Genesys
AWS Transcribe outputs a JSON file containing results.transcripts (full text) and results.items (word-level entries with confidence scores, timings, and, when speaker diarization was enabled on the job, speaker labels). Numeric fields such as confidence, start_time, and end_time are serialized as strings. You must parse results.items, group consecutive words by speaker, calculate the average confidence per turn, and format the data to match the Genesys Interactions API schema.
import json
from datetime import datetime, timezone
from urllib.parse import urlparse
import boto3
import httpx

def split_transcript_uri(transcript_uri: str) -> tuple[str, str]:
    # Accepts s3://bucket/key as well as path-style https://<s3-endpoint>/bucket/key
    parsed = urlparse(transcript_uri)
    if parsed.scheme == "s3":
        return parsed.netloc, parsed.path.lstrip("/")
    bucket, key = parsed.path.lstrip("/").split("/", 1)
    return bucket, key

def to_iso(seconds: float) -> str:
    # Transcribe times are offsets from the start of the audio, so these
    # timestamps are anchored at the Unix epoch rather than the real call time
    return datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat()

def parse_aws_transcript(transcript_uri: str) -> list[dict]:
    s3_client = boto3.client("s3")
    bucket, key = split_transcript_uri(transcript_uri)
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    data = json.loads(obj["Body"].read().decode("utf-8"))
    parsed_turns = []
    current_turn = None
    scores = []  # per-word confidences for the turn being built

    def close_turn():
        # True mean over every word in the turn
        current_turn["confidence"] = sum(scores) / len(scores)
        parsed_turns.append(current_turn)

    for item in data["results"]["items"]:
        # Punctuation items carry no timing information
        if item.get("type") != "pronunciation":
            continue
        speaker = item.get("speaker_label", "unknown")
        best = item["alternatives"][0]
        # Transcribe serializes numeric fields as strings
        confidence = float(best["confidence"])
        end = to_iso(float(item["end_time"]))
        if current_turn is None or current_turn["speaker"] != speaker:
            if current_turn is not None:
                close_turn()
            current_turn = {
                "speaker": speaker,
                "text": best["content"],
                "start": to_iso(float(item["start_time"])),
                "end": end
            }
            scores = [confidence]
        else:
            current_turn["text"] += f" {best['content']}"
            current_turn["end"] = end
            scores.append(confidence)
    if current_turn is not None:
        close_turn()
    return parsed_turns

def upload_transcript_to_genesys(client: platform_client_v2.PureCloudPlatformClientV2, recording_id: str, turns: list[dict]) -> None:
    interaction_api = InteractionApi(client)
    body = {
        "transcript": turns
    }
    try:
        interaction_api.post_interactions_recordings_transcripts(
            recording_id=recording_id,
            body=body
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 403:
            raise PermissionError("Missing interaction:transcript:add scope") from e
        raise
The Genesys transcript endpoint expects an array of turn objects. Each object requires speaker, text, confidence, start, and end fields. The confidence field must be a float between 0 and 1.
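Checking the turn objects before upload can surface schema problems locally instead of as a rejected request. The sketch below validates only the requirements stated above (the five fields and the confidence range). The function name find_turn_problems and its message format are hypothetical.

```python
from typing import List

REQUIRED_FIELDS = ("speaker", "text", "confidence", "start", "end")


def find_turn_problems(turns: List[dict]) -> List[str]:
    """Return human-readable problems; an empty list means the turns look valid."""
    problems = []
    for index, turn in enumerate(turns):
        missing = [name for name in REQUIRED_FIELDS if name not in turn]
        if missing:
            problems.append(f"turn {index}: missing {', '.join(missing)}")
            continue
        confidence = turn["confidence"]
        # bool is a subclass of int, so exclude it explicitly
        if isinstance(confidence, bool) or not isinstance(confidence, (int, float)):
            problems.append(f"turn {index}: confidence is not a number")
        elif not 0.0 <= confidence <= 1.0:
            problems.append(f"turn {index}: confidence {confidence} outside [0, 1]")
    return problems
```

A caller might skip the upload and log the returned messages whenever the list is non-empty.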
Step 4: Configure S3 Lifecycle Policies for Audio Purging
Raw audio files consume storage and incur costs. You apply an S3 lifecycle configuration to expire objects after a defined retention period. This configuration runs once during initialization or via infrastructure-as-code.
def configure_s3_lifecycle(bucket: str, expiration_days: int = 1) -> None:
    s3_client = boto3.client("s3")
    lifecycle_config = {
        "Rules": [
            {
                "ID": "PurgeRawAudio",
                "Status": "Enabled",
                "Filter": {"Prefix": "recordings/"},
                "Expiration": {"Days": expiration_days},
                "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 1}
            },
            {
                "ID": "PurgeTranscripts",
                "Status": "Enabled",
                "Filter": {"Prefix": "transcripts/"},
                "Expiration": {"Days": expiration_days}
            }
        ]
    }
    s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=lifecycle_config
    )
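The Prefix filters limit each rule to the uploaded audio under recordings/ and the transcript output under transcripts/, so other objects in the bucket are not affected. The AbortIncompleteMultipartUpload action cleans up the parts left behind by failed transfers.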
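If raw audio and transcripts need different retention periods, the rule list can be generated from a mapping instead of being written by hand. This is a minimal sketch: build_expiration_rules and the rule ID naming scheme are assumptions for illustration, and the returned dictionary follows the same shape as the configuration shown above.

```python
from typing import Dict, List


def build_expiration_rules(prefix_days: Dict[str, int]) -> dict:
    """Build a lifecycle configuration with one expiration rule per prefix."""
    rules: List[dict] = []
    for prefix, days in sorted(prefix_days.items()):
        if days < 1:
            raise ValueError(f"Expiration for {prefix!r} must be at least 1 day")
        rules.append({
            # Derive a readable rule ID from the prefix, e.g. "Expire-recordings"
            "ID": f"Expire-{prefix.strip('/').replace('/', '-')}",
            "Status": "Enabled",
            "Filter": {"Prefix": prefix},
            "Expiration": {"Days": days},
        })
    return {"Rules": rules}
```

For example, build_expiration_rules({"recordings/": 1, "transcripts/": 30}) yields a configuration that could be passed as LifecycleConfiguration to put_bucket_lifecycle_configuration.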
Complete Working Example
The following script combines all components into a single executable module. It initializes clients, applies the lifecycle policy, polls for recordings, processes each file, and uploads results.
import io
import json
import os
import time
from urllib.parse import urlparse
import boto3
import httpx
from genesyscloud import configuration, platform_client_v2
from genesyscloud.recording_api import RecordingApi
from genesyscloud.interaction_api import InteractionApi
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class IterStream(io.RawIOBase):
    """Expose an iterator of byte chunks as a readable, non-seekable stream."""

    def __init__(self, chunks):
        self._chunks = iter(chunks)
        self._leftover = b""

    def readable(self):
        return True

    def readinto(self, buffer):
        # Refill from the iterator, skipping empty chunks; returning 0 signals EOF
        while not self._leftover:
            try:
                self._leftover = next(self._chunks)
            except StopIteration:
                return 0
        n = min(len(buffer), len(self._leftover))
        buffer[:n] = self._leftover[:n]
        self._leftover = self._leftover[n:]
        return n

def init_clients():
    config = configuration.Configuration()
    config.client_id = os.getenv("GENESYS_CLIENT_ID")
    config.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    config.base_url = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
    config.scope = "media:recording:view interaction:transcript:add"
    genesys_client = platform_client_v2.PureCloudPlatformClientV2(config)
    transcribe_client = boto3.client("transcribe", region_name=os.getenv("AWS_REGION", "us-east-1"))
    s3_client = boto3.client("s3", region_name=os.getenv("AWS_REGION", "us-east-1"))
    return genesys_client, transcribe_client, s3_client

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type(httpx.HTTPStatusError)
)
def fetch_completed_recordings(client, page_token=None, page_size=25):
    recording_api = RecordingApi(client)
    records = []
    params = {"status": "completed", "page_size": page_size}
    if page_token:
        params["next_page_token"] = page_token
    response = recording_api.get_recordings(**params)
    records.extend(response.entities)
    if response.next_page_token:
        records.extend(fetch_completed_recordings(client, page_token=response.next_page_token, page_size=page_size))
    return records

def process_recording(recording, genesys_client, transcribe_client, s3_client, bucket):
    recording_id = recording.id
    file_url = recording.file_url
    # Stream to S3. An httpx response is not file-like, so wrap its byte iterator.
    s3_key = f"recordings/{recording_id}.wav"
    with httpx.stream("GET", file_url) as response:
        response.raise_for_status()
        s3_client.upload_fileobj(io.BufferedReader(IterStream(response.iter_bytes())), bucket, s3_key)
    # Start Transcribe
    job_name = f"genesys-{recording_id}"
    media_uri = f"s3://{bucket}/{s3_key}"
    transcribe_client.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={"MediaFileUri": media_uri},
        MediaFormat="wav",
        LanguageCode="en-US",
        # The custom language model is selected through ModelSettings
        ModelSettings={"LanguageModelName": os.getenv("AWS_CUSTOM_LM_NAME")},
        # Speaker labels require diarization; two speakers is an assumption
        Settings={"ShowSpeakerLabels": True, "MaxSpeakerLabels": 2},
        OutputBucketName=bucket,
        OutputKey="transcripts/"
    )
    # Poll job
    while True:
        job = transcribe_client.get_transcription_job(TranscriptionJobName=job_name)
        status = job["TranscriptionJob"]["TranscriptionJobStatus"]
        if status == "COMPLETED":
            break
        elif status == "FAILED":
            reason = job["TranscriptionJob"].get("FailureReason")
            raise RuntimeError(f"Transcription failed for {recording_id}: {reason}")
        time.sleep(5)
    # Locate the transcript object; accepts s3:// and path-style https:// URIs
    transcript_uri = job["TranscriptionJob"]["Transcript"]["TranscriptFileUri"]
    parsed = urlparse(transcript_uri)
    if parsed.scheme == "s3":
        t_bucket, t_key = parsed.netloc, parsed.path.lstrip("/")
    else:
        t_bucket, t_key = parsed.path.lstrip("/").split("/", 1)
    transcript_data = json.loads(s3_client.get_object(Bucket=t_bucket, Key=t_key)["Body"].read())
    # Group consecutive words by speaker and average their confidence
    turns = []
    current = None
    scores = []
    for item in transcript_data["results"]["items"]:
        if item.get("type") != "pronunciation":
            continue
        speaker = item.get("speaker_label", "unknown")
        # Transcribe serializes numeric fields as strings
        conf = float(item["alternatives"][0]["confidence"])
        word = item["alternatives"][0]["content"]
        if current is None or current["speaker"] != speaker:
            if current is not None:
                current["confidence"] = sum(scores) / len(scores)
                turns.append(current)
            current = {"speaker": speaker, "text": word, "start": float(item["start_time"]), "end": float(item["end_time"])}
            scores = [conf]
        else:
            current["text"] += f" {word}"
            current["end"] = float(item["end_time"])
            scores.append(conf)
    if current is not None:
        current["confidence"] = sum(scores) / len(scores)
        turns.append(current)
    interaction_api = InteractionApi(genesys_client)
    interaction_api.post_interactions_recordings_transcripts(
        recording_id=recording_id,
        body={"transcript": turns}
    )
    print(f"Uploaded transcript for {recording_id}")

if __name__ == "__main__":
    genesys, transcribe, s3 = init_clients()
    bucket = os.getenv("S3_BUCKET_NAME")
    # Apply lifecycle policy once
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration={
            "Rules": [
                {"ID": "PurgeAudio", "Status": "Enabled", "Filter": {"Prefix": "recordings/"}, "Expiration": {"Days": 1}},
                {"ID": "PurgeTranscripts", "Status": "Enabled", "Filter": {"Prefix": "transcripts/"}, "Expiration": {"Days": 1}}
            ]
        }
    )
    records = fetch_completed_recordings(genesys)
    for rec in records:
        try:
            process_recording(rec, genesys, transcribe, s3, bucket)
        except Exception as e:
            print(f"Failed to process {rec.id}: {e}")
Common Errors & Debugging
Error: 401 Unauthorized on Transcript Upload
- Cause: The access token is missing, expired, or was issued for invalid client credentials. A valid token that lacks the interaction:transcript:add scope typically produces 403 Forbidden instead.
- Fix: Verify that the config.scope string includes both required scopes, and update the stored client secret if it was rotated.
- Code: The SDK automatically refreshes tokens. If the error persists, discard the cached token by instantiating a new PureCloudPlatformClientV2.
Error: 429 Too Many Requests on Recording Poll
- Cause: Exceeding the Genesys Cloud API rate limit that applies to the OAuth client.
- Fix: The tenacity decorator in fetch_completed_recordings handles exponential backoff. Increase page_size to 100 to reduce request frequency.
- Code: Use wait_exponential(multiplier=2, min=4, max=60) in environments that throttle aggressively.
Error: AWS Transcribe ConflictException
- Cause: Attempting to start a job with a name that already exists in the account, for example when a recording is reprocessed.
- Fix: Ensure TranscriptionJobName is unique within the AWS account. Append a timestamp or UUID to the job name.
- Code: Set job_name = f"genesys-{recording_id}-{int(time.time())}".
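A timestamp suffix can still collide when two workers start jobs for the same recording within the same second. The sketch below uses a random UUID fragment instead and also sanitizes the recording ID. The helper name make_job_name is hypothetical, and the character set and 200-character cap reflect the documented job-name constraints as assumed here, so verify them against the current AWS Transcribe API reference.

```python
import re
import uuid


def make_job_name(recording_id: str, prefix: str = "genesys") -> str:
    """Build a job name that is unique per call and limited to safe characters."""
    # Replace anything outside letters, digits, dot, underscore, and hyphen
    safe_id = re.sub(r"[^0-9A-Za-z._-]", "-", recording_id)
    suffix = uuid.uuid4().hex[:12]
    # Trim the recording ID so the whole name stays within 200 characters
    room = 200 - len(prefix) - len(suffix) - 2
    return f"{prefix}-{safe_id[:room]}-{suffix}"
```

Because the suffix is random, the name can no longer be derived from the recording ID alone, so the caller needs to keep the returned value in order to poll the job later.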
Error: 403 Forbidden on S3 Lifecycle Configuration
- Cause: The IAM role lacks s3:PutBucketLifecycleConfiguration.
- Fix: Attach the AmazonS3FullAccess policy or add the specific permission to the role's permissions policy. The trust policy only controls who may assume the role.
- Code: Verify that the credentials boto3 resolves match the IAM role attached to the execution environment.