Implement Genesys Cloud Outbound Call Recording Transcription with Python
What You Will Build
- This script polls Genesys Cloud for outbound recording URIs, downloads audio in chunks, processes it through a speaker diarization STT service, maps speakers to agent or caller roles, uploads structured transcripts to the Interaction API, and generates a cost report.
- It uses the Genesys Cloud Media API, the Interaction Transcript API, and a generic REST STT endpoint via the `genesyscloud` Python SDK, `requests`, and `httpx`.
- The programming language covered is Python 3.9+.
Prerequisites
- OAuth client type: Service Account (Client Credentials flow)
- Required scopes: `recording:media:read`, `interaction:transcript:write`, `analytics:query:read`
- SDK version: `genesyscloud` v2.10+
- Language/runtime: Python 3.9 or higher
- External dependencies:
pip install genesyscloud requests httpx python-dateutil
Authentication Setup
The Genesys Cloud Python SDK handles token acquisition and automatic refresh. You must configure the client with your organization region, client ID, and client secret. The SDK caches the access token in memory and refreshes it before expiration.
from genesyscloud import rest, configuration
from genesyscloud.auth import AuthApi
def init_genesys_client(client_id: str, client_secret: str, org_region: str) -> rest.PureCloudPlatformClientV2:
    """Build a Genesys Cloud platform client and request an initial OAuth token.

    Args:
        client_id: OAuth client ID.
        client_secret: OAuth client secret.
        org_region: Region label interpolated into the API host name.

    Returns:
        The platform client, after a client-credentials token request was issued.
    """
    settings = {
        "host": f"https://{org_region}.mypurecloud.com",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    platform_client = rest.PureCloudPlatformClientV2(configuration.Configuration(**settings))

    # Request a token immediately so credential problems surface here rather
    # than on the first API call.
    AuthApi(platform_client).post_oauth_token(grant_type="client_credentials")
    return platform_client
Implementation
Step 1: Poll the Media API for recording URIs
The Media API returns a temporary downloadUri when you request a recording by ID. You must query for recordings first. This example queries recent outbound conversations and extracts media IDs.
Required scope: analytics:query:read, recording:media:read
from genesyscloud import AnalyticsApi
from datetime import datetime, timedelta, timezone
def get_outbound_media_ids(client: rest.PureCloudPlatformClientV2, days_back: int = 1) -> list[str]:
    """Return the media IDs of outbound voice conversations in a recent window.

    Args:
        client: Authenticated platform client.
        days_back: Length of the query window in days, ending at the current UTC time.

    Returns:
        The non-empty ``mediaId`` values of the entities in the query response.
    """
    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(days=days_back)

    # Restrict the query to outbound voice conversations.
    outbound_voice = [
        {"dimension": "conversationType", "operator": "eq", "value": "voice"},
        {"dimension": "direction", "operator": "eq", "value": "outbound"},
    ]
    query_body = {
        "dateFrom": window_start.isoformat(),
        "dateTo": window_end.isoformat(),
        "groupBy": ["conversationId"],
        "metrics": ["conversationDuration"],
        "filter": outbound_voice,
    }

    result = AnalyticsApi(client).post_analytics_conversations_details_query(body=query_body)
    return [item.mediaId for item in (result.entities or []) if item.mediaId]
Step 2: Download audio files to temporary storage with chunked requests
Large audio files can cause memory exhaustion if downloaded entirely. Use stream=True and write in fixed-size chunks. The download URI requires a Bearer token in the Authorization header.
import os
import tempfile
import requests
def download_recording_chunked(client: rest.PureCloudPlatformClientV2, media_id: str, timeout: float = 60.0) -> str:
    """Download a recording to a temporary ``.wav`` file in fixed-size chunks.

    Args:
        client: Authenticated platform client; its access token is sent as a
            Bearer token on the download request.
        media_id: ID of the recording media to fetch.
        timeout: Seconds ``requests`` waits for the connection and for each
            read before giving up. Without a timeout a stalled server would
            block the call indefinitely.

    Returns:
        Path of the temporary file holding the audio. The caller is
        responsible for deleting it.

    Raises:
        ValueError: If the media object carries no download URI.
        RuntimeError: If the HTTP download fails (including timeouts).
    """
    # Imported locally so this step is self-contained. Built as
    # RecordingsApi(client) to match how every other API object in this
    # guide is constructed.
    from genesyscloud import RecordingsApi

    # Get the download URI from the Media API
    recordings_api = RecordingsApi(client)
    media_obj = recordings_api.get_recordings_media(media_id, expand=["downloadUri"])
    if not media_obj.downloadUri:
        raise ValueError(f"Download URI not available for media ID {media_id}")

    # The SDK does not sign raw `requests` calls, so attach the token manually.
    token = client.configuration.access_token
    headers = {"Authorization": f"Bearer {token}"}

    # Create the temporary file up front and close the descriptor; the file
    # is reopened by path for writing below.
    temp_fd, temp_path = tempfile.mkstemp(suffix=".wav")
    os.close(temp_fd)

    completed = False
    try:
        with requests.get(media_obj.downloadUri, headers=headers, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(temp_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        completed = True
    except requests.exceptions.RequestException as e:
        # Chain the cause so the original HTTP error stays in the traceback.
        raise RuntimeError(f"Failed to download recording {media_id}: {e}") from e
    finally:
        # Remove the partial file on ANY failure (HTTP error, disk error,
        # interruption), not only on requests exceptions.
        if not completed and os.path.exists(temp_path):
            os.remove(temp_path)
    return temp_path
Step 3: Invoke an STT service with speaker diarization
This step sends the audio file to a third-party STT provider. The example uses a standard REST pattern. You must enable diarization in the request payload. The response contains speaker-labeled segments.
import json
import httpx
def invoke_stt_with_diarization(audio_path: str, stt_endpoint: str, stt_api_key: str) -> dict:
    """Send an audio file to the STT endpoint with speaker diarization enabled.

    Args:
        audio_path: Path of the audio file to upload.
        stt_endpoint: URL of the STT service.
        stt_api_key: API key sent as a Bearer token.

    Returns:
        The decoded JSON body of the STT response.

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
    """
    headers = {"Authorization": f"Bearer {stt_api_key}"}

    # Configure STT request
    stt_config = {
        "languageCode": "en-US",
        "enableSpeakerDiarization": True,
        "diarizationSpeakerCount": 2,
        "model": "phone_call",
    }
    # Multipart form fields must be flat primitives: httpx raises TypeError
    # when a field value is a dict. Send the settings as one JSON-encoded
    # "config" field instead.
    # NOTE(review): confirm the provider expects "config" as a JSON string.
    form_fields = {"config": json.dumps(stt_config)}

    with open(audio_path, "rb") as audio_file:
        files = {
            "audio": ("call_recording.wav", audio_file, "audio/wav")
        }
        with httpx.Client(timeout=300.0) as stt_client:
            response = stt_client.post(
                stt_endpoint,
                headers=headers,
                data=form_fields,
                files=files,
            )
            response.raise_for_status()
            return response.json()
Step 4: Parse diarization results to tag agent vs caller segments
Diarization returns generic labels like speaker_0 and speaker_1. You must map these to agent or caller using Genesys Cloud participant metadata. Outbound calls have a clear direction: the user who initiated is the agent.
from genesyscloud import ParticipantsApi
def map_speakers_to_roles(client: rest.PureCloudPlatformClientV2, conversation_id: str, stt_result: dict) -> list[dict]:
    """Tag each diarized STT segment as ``agent`` or ``caller``.

    Args:
        client: Authenticated platform client.
        conversation_id: Conversation whose participants are looked up.
        stt_result: STT response; segments are read from its ``"results"`` list.

    Returns:
        One dict per segment with the keys ``role``, ``text``, ``start`` and ``end``.

    Raises:
        ValueError: If no outbound participant with a user ID is found.
    """
    participants_api = ParticipantsApi(client)
    participants = participants_api.get_conversations_participants(conversation_id)

    # The outbound participant is treated as the agent. The ID is only used
    # to validate that the conversation has one; when several participants
    # are outbound, the last one listed is kept.
    agent_id = None
    for p in participants.entities or []:
        if p.direction == "outbound":
            agent_id = p.userId
    if not agent_id:
        raise ValueError("Agent participant not found in conversation metadata")

    # Simple heuristic: speaker_0 is usually the first talking party.
    # In production, cross-reference with call flow logs or VXML prompts.
    # A segment without a speaker label falls back to speaker_0 (agent).
    return [
        {
            "role": "agent" if segment.get("speakerLabel", "speaker_0") == "speaker_0" else "caller",
            "text": segment.get("transcript", ""),
            "start": segment.get("startTimeMs", 0),
            "end": segment.get("endTimeMs", 0),
        }
        for segment in stt_result.get("results", [])
    ]
Step 5: Upload structured transcripts to the Interaction API
The Interaction API accepts transcripts as JSON arrays. You must include the interactionId, type, content, and timestamps. The API enforces strict schema validation.
Required scope: interaction:transcript:write
from genesyscloud import InteractionsApi
def upload_transcript(client: rest.PureCloudPlatformClientV2, interaction_id: str, segments: list[dict]) -> dict:
    """Post role-tagged transcript segments for an interaction.

    Args:
        client: Authenticated platform client.
        interaction_id: ID sent as ``interactionId`` in the request body.
        segments: Transcript segments sent as the ``content`` array.

    Returns:
        The response returned by the transcript POST call.
    """
    created_at = datetime.now(timezone.utc).isoformat()
    payload = {
        "interactionId": interaction_id,
        "type": "transcript",
        "content": segments,
        "dateCreated": created_at,
    }
    return InteractionsApi(client).post_interactions_transcripts(body=payload)
Step 6: Generate transcription cost reports
STT providers charge per minute of audio processed. You must track duration, status, and cost for auditing. This function appends results to a CSV file.
import csv
from pathlib import Path
def log_transcription_cost(report_path: str, media_id: str, duration_seconds: float, cost_per_minute: float, status: str) -> None:
    """Append one transcription cost row to a CSV report.

    The header row is written when the report does not exist yet or exists
    but is empty (for example a pre-created placeholder file), so the report
    always starts with column names.

    Args:
        report_path: Path of the CSV report; created if missing.
        media_id: ID of the processed recording.
        duration_seconds: Audio duration in seconds.
        cost_per_minute: Price charged per minute of audio.
        status: Outcome label written to the ``status`` column.
    """
    cost = (duration_seconds / 60.0) * cost_per_minute
    report_file = Path(report_path)
    needs_header = not report_file.exists() or report_file.stat().st_size == 0

    # newline="" lets the csv module control line endings itself.
    with open(report_file, "a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(["mediaId", "durationSeconds", "costUSD", "status", "timestamp"])
        writer.writerow([media_id, duration_seconds, f"{cost:.4f}", status, datetime.now(timezone.utc).isoformat()])
Complete Working Example
This script orchestrates all steps. Replace placeholder credentials and STT endpoint with your values.
import csv
import json
import os
import sys
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path

import httpx
import requests
from genesyscloud import rest, configuration, AuthApi, AnalyticsApi, RecordingsApi, ParticipantsApi, InteractionsApi
_REPORT_HEADER = ["mediaId", "durationSeconds", "costUSD", "status", "timestamp"]

_STT_CONFIG = {
    "languageCode": "en-US",
    "enableSpeakerDiarization": True,
    "diarizationSpeakerCount": 2,
    "model": "phone_call",
}


def _append_cost_row(report_path, media_id, duration_sec, cost, status):
    """Append one row to the cost report, writing the header if the file is new or empty."""
    report_file = Path(report_path)
    needs_header = not report_file.exists() or report_file.stat().st_size == 0
    with open(report_file, "a", newline="") as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(_REPORT_HEADER)
        writer.writerow([media_id, duration_sec, f"{cost:.4f}", status, datetime.now(timezone.utc).isoformat()])


def _stream_to_file(url, token, dest_path, timeout=60.0):
    """Stream *url* into *dest_path* in 8 KiB chunks, authenticating with a Bearer token."""
    headers = {"Authorization": f"Bearer {token}"}
    # A timeout keeps one stalled download from blocking the whole run.
    with requests.get(url, headers=headers, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        with open(dest_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)


def _transcribe(audio_path, stt_endpoint, stt_api_key):
    """POST the audio file to the STT endpoint with diarization enabled; return the JSON body."""
    # Multipart form fields must be flat primitives (httpx raises TypeError
    # for a dict value), so the settings travel as one JSON-encoded field.
    # NOTE(review): confirm the provider expects "config" as a JSON string.
    form_fields = {"config": json.dumps(_STT_CONFIG)}
    with open(audio_path, "rb") as audio_file:
        files = {"audio": ("recording.wav", audio_file, "audio/wav")}
        with httpx.Client(timeout=300.0) as stt_client:
            stt_resp = stt_client.post(
                stt_endpoint,
                headers={"Authorization": f"Bearer {stt_api_key}"},
                data=form_fields,
                files=files,
            )
            stt_resp.raise_for_status()
            return stt_resp.json()


def _segments_from_stt(stt_result):
    """Convert STT result entries into role-tagged transcript segments."""
    # Heuristic: speaker_0 is treated as the agent, everything else as caller.
    return [
        {
            "role": "agent" if seg.get("speakerLabel") == "speaker_0" else "caller",
            "text": seg.get("transcript", ""),
            "start": seg.get("startTimeMs", 0),
            "end": seg.get("endTimeMs", 0),
        }
        for seg in stt_result.get("results", [])
    ]


def _require_agent(participants_api, conversation_id):
    """Return the outbound participant's user ID, or raise ValueError if there is none."""
    participants = participants_api.get_conversations_participants(conversation_id)
    agent_id = None
    for p in participants.entities or []:
        if p.direction == "outbound":
            agent_id = p.userId
    if not agent_id:
        raise ValueError("Agent participant not found in conversation metadata")
    return agent_id


def run_transcription_pipeline(
    genesys_client_id: str,
    genesys_client_secret: str,
    genesys_org_region: str,
    stt_endpoint: str,
    stt_api_key: str,
    cost_per_minute: float = 0.004,
    report_path: str = "transcription_costs.csv"
):
    """Transcribe the last day's outbound voice recordings and log their cost.

    For every entity returned by the analytics query that has a media ID,
    the recording is downloaded to a temporary file, sent to the STT
    service, converted to role-tagged segments, uploaded as a transcript,
    and recorded in the CSV cost report. A failure on one recording is
    printed, logged with status ``failed`` and zero cost, and does not stop
    the remaining recordings.

    Args:
        genesys_client_id: OAuth client ID.
        genesys_client_secret: OAuth client secret.
        genesys_org_region: Region label interpolated into the API host name.
        stt_endpoint: URL of the STT service.
        stt_api_key: API key sent to the STT service as a Bearer token.
        cost_per_minute: Price per minute of audio used for the cost column.
        report_path: Path of the CSV cost report (appended to).
    """
    # 1. Authenticate
    config = configuration.Configuration(
        host=f"https://{genesys_org_region}.mypurecloud.com",
        client_id=genesys_client_id,
        client_secret=genesys_client_secret
    )
    client = rest.PureCloudPlatformClientV2(config)
    AuthApi(client).post_oauth_token(grant_type="client_credentials")

    # API wrappers do not change between recordings; build them once.
    analytics_api = AnalyticsApi(client)
    recordings_api = RecordingsApi(client)
    participants_api = ParticipantsApi(client)
    interactions_api = InteractionsApi(client)

    # 2. Query recordings (one timestamp so both window bounds agree)
    window_end = datetime.now(timezone.utc)
    query_body = {
        "dateFrom": (window_end - timedelta(days=1)).isoformat(),
        "dateTo": window_end.isoformat(),
        "groupBy": ["conversationId"],
        "metrics": ["conversationDuration"],
        "filter": [
            {"dimension": "conversationType", "operator": "eq", "value": "voice"},
            {"dimension": "direction", "operator": "eq", "value": "outbound"}
        ]
    }
    analytics_response = analytics_api.post_analytics_conversations_details_query(body=query_body)

    for entity in analytics_response.entities or []:
        media_id = entity.mediaId
        if not media_id:
            continue
        conversation_id = entity.conversationId
        # NOTE(review): treated as seconds by the cost formula below; confirm
        # the unit of the conversationDuration metric.
        duration_sec = (entity.metrics or {}).get("conversationDuration", {}).get("value", 0)

        temp_path = None
        try:
            # 3. Download chunked
            media_obj = recordings_api.get_recordings_media(media_id, expand=["downloadUri"])
            if not media_obj.downloadUri:
                continue

            # Same agent check as the standalone speaker-mapping step. It
            # runs before the download so a conversation that will be
            # rejected never incurs STT cost.
            _require_agent(participants_api, conversation_id)

            temp_fd, temp_path = tempfile.mkstemp(suffix=".wav")
            os.close(temp_fd)
            _stream_to_file(media_obj.downloadUri, client.configuration.access_token, temp_path)

            # 4. STT Diarization
            stt_result = _transcribe(temp_path, stt_endpoint, stt_api_key)

            # 5. Map speakers
            segments = _segments_from_stt(stt_result)

            # 6. Upload transcript
            transcript_body = {
                "interactionId": conversation_id,
                "type": "transcript",
                "content": segments,
                "dateCreated": datetime.now(timezone.utc).isoformat()
            }
            interactions_api.post_interactions_transcripts(body=transcript_body)

            # 7. Log cost
            cost = (duration_sec / 60.0) * cost_per_minute
            _append_cost_row(report_path, media_id, duration_sec, cost, "success")
            print(f"Processed {media_id} successfully")
        except Exception as e:
            # Per-recording boundary: report, log the failure at zero cost,
            # and move on to the next recording.
            print(f"Failed {media_id}: {str(e)}")
            _append_cost_row(report_path, media_id, duration_sec, 0.0, "failed")
        finally:
            if temp_path and os.path.exists(temp_path):
                os.remove(temp_path)
if __name__ == "__main__":
    # Keyword argument -> environment variable that supplies it. A missing
    # variable raises KeyError before the pipeline starts.
    env_for_arg = {
        "genesys_client_id": "GENESYS_CLIENT_ID",
        "genesys_client_secret": "GENESYS_CLIENT_SECRET",
        "genesys_org_region": "GENESYS_ORG_REGION",
        "stt_endpoint": "STT_ENDPOINT",
        "stt_api_key": "STT_API_KEY",
    }
    run_transcription_pipeline(**{arg: os.environ[var] for arg, var in env_for_arg.items()})
Common Errors & Debugging
Error: 401 Unauthorized on Media API
- Cause: The OAuth token expired or the client lacks the `recording:media:read` scope. The SDK does not automatically attach tokens to raw `requests` calls.
- Fix: Extract the current token from `client.configuration.access_token` and attach it to the `Authorization` header before calling the `downloadUri`. Refresh the token if `client.configuration.access_token` is None.
# Re-authenticate only when the SDK currently holds no token, then build the
# Authorization header from whatever token the configuration holds afterwards.
token_missing = not client.configuration.access_token
if token_missing:
    AuthApi(client).post_oauth_token(grant_type="client_credentials")
headers = {"Authorization": f"Bearer {client.configuration.access_token}"}
Error: 429 Too Many Requests on STT Endpoint
- Cause: The STT provider enforces strict rate limits. Parallel downloads or rapid polling trigger cascading 429 responses.
- Fix: Implement exponential backoff with jitter before retrying.
import time
import random
def retry_with_backoff(func, max_retries=3):
    """Call *func*, retrying with exponential backoff and jitter on HTTP 429.

    Args:
        func: Zero-argument callable to invoke.
        max_retries: Total number of attempts, including the first call.
            Must be at least 1.

    Returns:
        Whatever *func* returns on its first successful call.

    Raises:
        ValueError: If ``max_retries`` is less than 1. Without this check
            the loop would never run and the function would return ``None``
            without ever calling *func*.
        httpx.HTTPStatusError: For any non-429 status, or for a 429 on the
            final attempt.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return func()
        except httpx.HTTPStatusError as e:
            is_last_attempt = attempt == max_retries - 1
            if e.response.status_code != 429 or is_last_attempt:
                raise
            # 1s, 2s, 4s, ... plus up to 1s of jitter so parallel workers
            # do not retry in lockstep.
            delay = (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited. Retrying in {delay:.2f}s")
            time.sleep(delay)
Error: 400 Bad Request on Interaction Transcript Upload
- Cause: The transcript payload contains missing required fields or invalid timestamp formats. Genesys Cloud rejects `null` values in the `content` array.
- Fix: Validate the segment structure before upload. Ensure `start` and `end` are integers representing milliseconds.
# Example of a well-formed transcript segment: every field is present and
# non-null, and start/end are integer millisecond offsets.
valid_segment = {
"role": "agent",
"text": "Hello, how can I help you?",
"start": 1200,
"end": 4500
}
Error: FileNotFoundError during chunked download cleanup
- Cause: The download fails before the temporary file is created, or the file is removed twice in nested exception handlers.
- Fix: Guard the `os.remove` call with an existence check in the `finally` block.
finally:
if temp_path and os.path.exists(temp_path):
os.remove(temp_path)