Analyzing Genesys Cloud Media Recordings with Python for Quality Assessment
What You Will Build
- A Python script that downloads Genesys Cloud call recordings, processes them through a cloud speech-to-text service, calculates silence and talk-over metrics, extracts hold times from interaction state transitions, stores results in a SQLite index, and generates a quality score report.
- This implementation uses the Genesys Cloud Recordings API, Interactions Events API, and the official Python SDK.
- The code is written in Python 3.10+ and requires httpx, genesyscloud, pydub, and numpy.
Prerequisites
- Genesys Cloud OAuth Client Credentials grant configured in your organization
- Required scopes: recording:read, interaction:read, analytics:read
- SDK: genesyscloud>=2.0.0
- Runtime: Python 3.10 or higher
- External dependencies: httpx, genesyscloud, pydub, numpy, sqlite3 (standard library)
- A cloud STT provider endpoint (AWS Transcribe, Google Cloud Speech, or Azure Speech) with batch or streaming capabilities
Authentication Setup
Genesys Cloud uses the OAuth 2.0 client credentials flow for server-to-server integrations. The token response reports its lifetime in the expires_in field, and the token must be refreshed before it expires. The following class handles token retrieval, caching, and automatic refresh.
import httpx
import time
from typing import Optional

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, base_url: str,
                 login_url: Optional[str] = None):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url.rstrip("/")
        # Genesys Cloud issues tokens from the regional login host
        # (login.<region>) rather than the API host, so derive it from
        # base_url unless the caller passes login_url explicitly.
        self.login_url = (login_url or self.base_url.replace("://api.", "://login.", 1)).rstrip("/")
        self.token: Optional[str] = None
        self.expires_at: float = 0.0

    def get_token(self) -> str:
        # Reuse the cached token until 30 seconds before it expires.
        if self.token and time.time() < self.expires_at:
            return self.token
        url = f"{self.login_url}/oauth/token"
        payload = {"grant_type": "client_credentials"}
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        auth = httpx.BasicAuth(self.client_id, self.client_secret)
        response = httpx.post(url, data=payload, headers=headers, auth=auth)
        response.raise_for_status()
        data = response.json()
        self.token = data["access_token"]
        self.expires_at = time.time() + data["expires_in"] - 30
        return self.token
The endpoint /oauth/token requires no additional scope beyond the client configuration. The returned access_token is attached to subsequent API calls via the Authorization: Bearer header.
Implementation
Step 1: Initialize SDK and Fetch Recording Metadata
The Genesys Cloud Python SDK abstracts authentication and pagination. You initialize the platform client (PlatformClient in the code below) with your base URL and attach an authentication provider. The RecordingsApi class provides methods to list and retrieve recording metadata.
from genesyscloud.platform_client import PlatformClient
from genesyscloud.platform_client.models import RecordingQuery
from typing import List

def get_recent_recordings(auth: GenesysAuth, limit: int = 10) -> List:
    platform = PlatformClient(base_url=auth.base_url)
    platform.auth.set_auth_provider(auth.get_token)
    recordings_api = platform.RecordingsApi()
    query = RecordingQuery(
        filter=RecordingQuery.Filter(
            type="type eq 'voice'",
            sort="startTime desc"
        ),
        pageSize=limit
    )
    try:
        response = recordings_api.post_recordings_search(query_body=query)
        return response.entities if response.entities else []
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 401:
            raise RuntimeError("Authentication failed. Verify client credentials.")
        if e.response.status_code == 403:
            raise RuntimeError("Insufficient permissions. Add recording:read scope.")
        raise
Required scope: recording:read
Expected response structure:
{
  "entities": [
    {
      "id": "rec-12345678-1234-1234-1234-123456789012",
      "interactionId": "int-87654321-4321-4321-4321-210987654321",
      "status": "COMPLETED",
      "startTime": "2023-10-25T14:30:00Z",
      "parts": [
        {
          "id": "part-11111111-1111-1111-1111-111111111111",
          "type": "voice",
          "format": "wav",
          "status": "COMPLETED"
        }
      ]
    }
  ],
  "pageSize": 10,
  "pageNumber": 1
}
Step 2: Download Audio Part with Retry Logic
Recording parts are downloaded via direct HTTP GET requests. The Media API returns a pre-signed URL or streams the audio directly. You must handle HTTP 429 (Too Many Requests) responses with exponential backoff.
import httpx
import time
from typing import Tuple

def download_recording_part(auth: GenesysAuth, recording_id: str, part_id: str) -> Tuple[bytes, str]:
    url = f"{auth.base_url}/api/v2/recordings/{recording_id}/parts/{part_id}"
    max_retries = 3
    # Keep one client open for every attempt. Closing it in a finally block
    # inside the loop would make each retry run against a closed client.
    with httpx.Client(timeout=60.0) as client:
        for attempt in range(max_retries):
            # Ask for the token on every attempt so a refresh is picked up.
            headers = {"Authorization": f"Bearer {auth.get_token()}"}
            try:
                response = client.get(url, headers=headers)
                response.raise_for_status()
                return response.content, response.headers.get("content-type", "audio/wav")
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429:
                    wait_time = 2 ** attempt
                    print(f"Rate limited (429). Retrying in {wait_time}s...")
                    time.sleep(wait_time)
                elif e.response.status_code == 404:
                    raise RuntimeError(f"Recording part {part_id} not found.") from e
                else:
                    raise
    raise RuntimeError("Max retries exceeded for 429 rate limiting.")
Required scope: recording:read
The endpoint streams raw audio bytes. The content-type header indicates the format (audio/wav, audio/mp3, etc.). You will pass these bytes to your cloud STT provider in the next step.
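As a minimal sketch of handing the downloaded bytes to the next step, the helper below maps the content-type header to a file extension and writes the audio to disk. The names extension_for and save_audio, the extension table, and the .bin fallback are assumptions made for illustration; they are not part of the Genesys Cloud API.

```python
from pathlib import Path

# Hypothetical mapping; extend it for the formats your organization records in.
EXTENSIONS = {
    "audio/wav": ".wav",
    "audio/x-wav": ".wav",
    "audio/mpeg": ".mp3",
    "audio/mp3": ".mp3",
    "audio/ogg": ".ogg",
}

def extension_for(content_type: str) -> str:
    # Drop parameters such as "; rate=8000" and normalize case before lookup.
    media_type = content_type.split(";", 1)[0].strip().lower()
    return EXTENSIONS.get(media_type, ".bin")

def save_audio(audio: bytes, content_type: str, directory: str, stem: str) -> Path:
    # Write the bytes to <directory>/<stem><extension> and return the path.
    target = Path(directory) / f"{stem}{extension_for(content_type)}"
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(audio)
    return target
```

A call such as save_audio(audio_bytes, content_type, "recordings", part_id) would then give the STT upload step a file path to work with.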
Step 3: Process STT, Diarization, Silence, and Talk-Over Detection
Cloud STT providers return timestamped segments with speaker labels. You will parse this output to calculate silence gaps, detect talk-over events (overlapping speaker timestamps), and separate agent utterances from guest utterances. The following function accepts a simplified provider response as a JSON string and performs the analysis locally.
import json
from typing import Dict, Any

def analyze_stt_output(stt_json: str) -> Dict[str, Any]:
    data = json.loads(stt_json)
    segments = data.get("results", data.get("transcripts", []))
    silence_gaps = []
    talk_overs = []
    agent_text = []
    guest_text = []
    # Classify every segment by speaker, including the last one.
    for seg in segments:
        # str() guards against providers that report the channel as a number.
        speaker = str(seg.get("speaker", seg.get("channel", "guest")))
        text = seg.get("transcript", seg.get("text", ""))
        if speaker.lower() in ["agent", "channel_0", "0"]:
            agent_text.append(text)
        else:
            guest_text.append(text)
    # Compare each segment with its successor to find gaps and overlaps.
    for current, next_seg in zip(segments, segments[1:]):
        current_end = current.get("end_time", current.get("endTime", 0.0))
        next_start = next_seg.get("start_time", next_seg.get("startTime", 0.0))
        next_end = next_seg.get("end_time", next_seg.get("endTime", 0.0))
        gap = next_start - current_end
        if gap > 0.5:
            silence_gaps.append({"start": current_end, "end": next_start, "duration": gap})
        elif gap < 0:
            # The overlap ends when either speaker stops.
            overlap_end = min(current_end, next_end)
            talk_overs.append({"overlap_start": next_start, "overlap_end": overlap_end, "duration": overlap_end - next_start})
    total_silence = sum(g["duration"] for g in silence_gaps)
    return {
        "silence_gaps": silence_gaps,
        "talk_overs": talk_overs,
        "agent_text": agent_text,
        "guest_text": guest_text,
        "total_silence_seconds": total_silence
    }
Required scope: None (local processing)
Expected STT input structure:
{
  "results": [
    {"start_time": 0.5, "end_time": 2.1, "speaker": "agent", "transcript": "Hello, how can I help you?"},
    {"start_time": 2.3, "end_time": 4.0, "speaker": "guest", "transcript": "I need assistance with my account."},
    {"start_time": 3.8, "end_time": 5.2, "speaker": "agent", "transcript": "I can certainly help with that."}
  ]
}
The third segment overlaps with the second, triggering a talk-over detection. Gaps exceeding 0.5 seconds are logged as silence events.
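Comparing only adjacent segments can miss an overlap when one long utterance spans several shorter ones. The sketch below is an alternative approach, not the function used in this article: it sorts segments by start time and compares each one against the latest end time seen so far. The name find_events and the min_silence parameter are illustrative, and the sketch assumes every segment carries start_time and end_time keys.

```python
from typing import Dict, List, Tuple

def find_events(segments: List[Dict], min_silence: float = 0.5) -> Tuple[List[Dict], List[Dict]]:
    silences: List[Dict] = []
    overlaps: List[Dict] = []
    latest_end = None  # furthest end time among the segments seen so far
    for seg in sorted(segments, key=lambda s: s["start_time"]):
        start, end = seg["start_time"], seg["end_time"]
        if latest_end is not None:
            if start - latest_end > min_silence:
                # Nobody was speaking between latest_end and start.
                silences.append({"start": latest_end, "end": start, "duration": start - latest_end})
            elif start < latest_end:
                # This segment begins while an earlier one is still running.
                overlap_end = min(end, latest_end)
                overlaps.append({"start": start, "end": overlap_end, "duration": overlap_end - start})
        latest_end = end if latest_end is None else max(latest_end, end)
    return silences, overlaps
```

The sketch does not check speaker labels, so two overlapping segments from the same speaker would also be reported. Filter on the speaker field if your provider can emit those.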
Step 4: Fetch Interaction Events for Hold Time Calculation
Hold time is derived from channel state transitions. The Interactions Events API returns a chronological list of state changes for a specific interaction. You will filter for HOLD and TALK states to calculate total hold duration.
from datetime import datetime

def get_hold_time(auth: GenesysAuth, interaction_id: str) -> float:
    url = f"{auth.base_url}/api/v2/interactions/events/details"
    params = {
        "interactionId": interaction_id,
        "type": "stateChange",
        "limit": 1000
    }
    headers = {"Authorization": f"Bearer {auth.get_token()}"}
    # The context manager closes the client even if the request fails.
    with httpx.Client(timeout=30.0) as client:
        response = client.get(url, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()
    events = data.get("entities", [])
    hold_start = None
    total_hold = 0.0
    for event in events:
        state = event.get("state", "").upper()
        timestamp = event.get("timestamp", "")
        ts_seconds = _iso_to_seconds(timestamp)
        if state == "HOLD":
            hold_start = ts_seconds
        elif state in ["TALK", "DISCONNECTED", "QUEUE"] and hold_start is not None:
            # A hold ends at the next talk, queue, or disconnect event.
            total_hold += ts_seconds - hold_start
            hold_start = None
    return total_hold

def _iso_to_seconds(iso_str: str) -> float:
    # datetime.fromisoformat() on Python 3.10 rejects a trailing "Z",
    # so replace it with an explicit UTC offset first.
    dt = datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
    return dt.timestamp()
Required scope: interaction:read
Expected response snippet:
{
  "entities": [
    {"timestamp": "2023-10-25T14:30:05Z", "state": "RINGING"},
    {"timestamp": "2023-10-25T14:30:10Z", "state": "TALK"},
    {"timestamp": "2023-10-25T14:32:15Z", "state": "HOLD"},
    {"timestamp": "2023-10-25T14:33:45Z", "state": "TALK"},
    {"timestamp": "2023-10-25T14:35:00Z", "state": "DISCONNECTED"}
  ]
}
The script calculates 90 seconds of hold time between 14:32:15 and 14:33:45.
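To check that arithmetic without calling the API, the state-transition logic can be separated from the HTTP request. The following is a sketch under two assumptions: events arrive in the timestamp/state shape shown above, and any state other than HOLD ends a hold, which is slightly broader than the TALK, DISCONNECTED, and QUEUE check in the function above. The name hold_seconds is hypothetical.

```python
from datetime import datetime
from typing import Dict, List

def _parse(ts: str) -> datetime:
    # Map a trailing "Z" to an explicit UTC offset for Python 3.10.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))

def hold_seconds(events: List[Dict[str, str]]) -> float:
    total = 0.0
    hold_start = None
    # Sort by parsed time so out-of-order events do not skew the result.
    for event in sorted(events, key=lambda e: _parse(e["timestamp"])):
        moment = _parse(event["timestamp"])
        if event["state"].upper() == "HOLD":
            if hold_start is None:
                hold_start = moment
        elif hold_start is not None:
            total += (moment - hold_start).total_seconds()
            hold_start = None
    # A hold that never ends within the event list is not counted.
    return total

events = [
    {"timestamp": "2023-10-25T14:30:05Z", "state": "RINGING"},
    {"timestamp": "2023-10-25T14:30:10Z", "state": "TALK"},
    {"timestamp": "2023-10-25T14:32:15Z", "state": "HOLD"},
    {"timestamp": "2023-10-25T14:33:45Z", "state": "TALK"},
    {"timestamp": "2023-10-25T14:35:00Z", "state": "DISCONNECTED"},
]
print(hold_seconds(events))  # 90.0
```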
Step 5: Store in SQLite and Generate Quality Report
You will store analysis results in a SQLite database with an FTS5 virtual table for full-text search. The quality score starts at 100 and subtracts weighted penalties for silence, talk-over, and hold time above fixed thresholds.
import sqlite3
import json
from typing import List, Dict

def init_db(db_path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS qa_results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            recording_id TEXT,
            interaction_id TEXT,
            hold_time REAL,
            silence_seconds REAL,
            talk_over_seconds REAL,
            quality_score REAL,
            raw_data TEXT
        )
    """)
    conn.execute("""
        CREATE VIRTUAL TABLE IF NOT EXISTS qa_search USING fts5(
            recording_id, interaction_id, raw_data, content='qa_results'
        )
    """)
    return conn

def calculate_quality_score(hold_time: float, silence: float, talk_over: float) -> float:
    score = 100.0
    if hold_time > 120:
        score -= 20
    elif hold_time > 60:
        score -= 10
    score -= (silence * 2)
    score -= (talk_over * 5)
    return max(0.0, min(100.0, score))
def store_and_report(db_path: str, recording_id: str, interaction_id: str,
                     hold_time: float, analysis: Dict) -> Dict:
    conn = init_db(db_path)
    talk_over_secs = sum(t["duration"] for t in analysis["talk_overs"])
    score = calculate_quality_score(hold_time, analysis["total_silence_seconds"], talk_over_secs)
    raw = json.dumps({
        "silence_gaps": analysis["silence_gaps"],
        "talk_overs": analysis["talk_overs"],
        "agent_text": analysis["agent_text"],
        "guest_text": analysis["guest_text"]
    })
    cursor = conn.execute(
        "INSERT INTO qa_results (recording_id, interaction_id, hold_time, silence_seconds, talk_over_seconds, quality_score, raw_data) VALUES (?, ?, ?, ?, ?, ?, ?)",
        (recording_id, interaction_id, hold_time, analysis["total_silence_seconds"], talk_over_secs, score, raw)
    )
    # qa_search has three columns, so SELECT * from the eight-column
    # qa_results table would fail. Index only the new row, keyed by its rowid.
    conn.execute(
        "INSERT INTO qa_search (rowid, recording_id, interaction_id, raw_data) VALUES (?, ?, ?, ?)",
        (cursor.lastrowid, recording_id, interaction_id, raw)
    )
    conn.commit()
    conn.close()
    return {
        "recording_id": recording_id,
        "quality_score": score,
        "hold_time_seconds": hold_time,
        "silence_seconds": analysis["total_silence_seconds"],
        "talk_over_seconds": talk_over_secs,
        "agent_utterances": len(analysis["agent_text"]),
        "guest_utterances": len(analysis["guest_text"])
    }
Required scope: None (local storage)
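The FTS5 table enables queries like SELECT * FROM qa_search WHERE qa_search MATCH 'account assistance'. The quality score applies linear penalties for silence (2 points per second) and talk-over (5 points per second), and a stepped penalty for hold duration: 10 points above 60 seconds and 20 points above 120 seconds.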
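The external-content pattern can be tried in isolation with an in-memory database. This sketch uses a reduced two-column schema and a hypothetical add_result helper, and it assumes your Python build ships SQLite with FTS5 enabled. Each row is written to the content table first and then indexed under the same rowid.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE qa_results (id INTEGER PRIMARY KEY, recording_id TEXT, raw_data TEXT)"
)
# content= makes qa_search an index over qa_results; content_rowid names
# the column that links an index entry back to its source row.
conn.execute(
    "CREATE VIRTUAL TABLE qa_search USING fts5("
    "recording_id, raw_data, content='qa_results', content_rowid='id')"
)

def add_result(recording_id: str, raw_data: str) -> int:
    cur = conn.execute(
        "INSERT INTO qa_results (recording_id, raw_data) VALUES (?, ?)",
        (recording_id, raw_data),
    )
    # Index the new row under the rowid SQLite just assigned.
    conn.execute(
        "INSERT INTO qa_search (rowid, recording_id, raw_data) VALUES (?, ?, ?)",
        (cur.lastrowid, recording_id, raw_data),
    )
    return cur.lastrowid

add_result("rec-1", "I need assistance with my account.")
add_result("rec-2", "Please reset my password.")

# Terms separated by a space are combined with an implicit AND.
rows = conn.execute(
    "SELECT recording_id FROM qa_search WHERE qa_search MATCH ?",
    ("account assistance",),
).fetchall()
print(rows)
```

Only the first recording contains both search terms, so it is the only row returned.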
Complete Working Example
The following script combines all components into a single executable module. Replace the placeholder credentials and STT endpoint with your environment values.
#!/usr/bin/env python3
import httpx
import time
import json
import sqlite3
from typing import Any, Optional, Dict, List, Tuple
from genesyscloud.platform_client import PlatformClient
from genesyscloud.platform_client.models import RecordingQuery

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, base_url: str,
                 login_url: Optional[str] = None):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url.rstrip("/")
        # Tokens come from the regional login host (login.<region>), not the
        # API host, so derive it from base_url unless login_url is given.
        self.login_url = (login_url or self.base_url.replace("://api.", "://login.", 1)).rstrip("/")
        self.token: Optional[str] = None
        self.expires_at: float = 0.0

    def get_token(self) -> str:
        if self.token and time.time() < self.expires_at:
            return self.token
        url = f"{self.login_url}/oauth/token"
        payload = {"grant_type": "client_credentials"}
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        auth = httpx.BasicAuth(self.client_id, self.client_secret)
        response = httpx.post(url, data=payload, headers=headers, auth=auth)
        response.raise_for_status()
        data = response.json()
        self.token = data["access_token"]
        self.expires_at = time.time() + data["expires_in"] - 30
        return self.token

def download_recording_part(auth: GenesysAuth, recording_id: str, part_id: str) -> Tuple[bytes, str]:
    url = f"{auth.base_url}/api/v2/recordings/{recording_id}/parts/{part_id}"
    max_retries = 3
    # One client serves every attempt; it is closed when the block exits.
    with httpx.Client(timeout=60.0) as client:
        for attempt in range(max_retries):
            headers = {"Authorization": f"Bearer {auth.get_token()}"}
            try:
                response = client.get(url, headers=headers)
                response.raise_for_status()
                return response.content, response.headers.get("content-type", "audio/wav")
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429:
                    time.sleep(2 ** attempt)
                else:
                    raise
    raise RuntimeError("Max retries exceeded for 429 rate limiting.")

def analyze_stt_output(stt_json: str) -> Dict[str, Any]:
    data = json.loads(stt_json)
    segments = data.get("results", data.get("transcripts", []))
    silence_gaps = []
    talk_overs = []
    agent_text = []
    guest_text = []
    # Classify every segment by speaker, including the last one.
    for seg in segments:
        speaker = str(seg.get("speaker", seg.get("channel", "guest")))
        text = seg.get("transcript", seg.get("text", ""))
        if speaker.lower() in ["agent", "channel_0", "0"]:
            agent_text.append(text)
        else:
            guest_text.append(text)
    # Compare each segment with its successor to find gaps and overlaps.
    for current, next_seg in zip(segments, segments[1:]):
        current_end = current.get("end_time", current.get("endTime", 0.0))
        next_start = next_seg.get("start_time", next_seg.get("startTime", 0.0))
        next_end = next_seg.get("end_time", next_seg.get("endTime", 0.0))
        gap = next_start - current_end
        if gap > 0.5:
            silence_gaps.append({"start": current_end, "end": next_start, "duration": gap})
        elif gap < 0:
            overlap_end = min(current_end, next_end)
            talk_overs.append({"overlap_start": next_start, "overlap_end": overlap_end, "duration": overlap_end - next_start})
    return {"silence_gaps": silence_gaps, "talk_overs": talk_overs, "agent_text": agent_text, "guest_text": guest_text, "total_silence_seconds": sum(g["duration"] for g in silence_gaps)}
def get_hold_time(auth: GenesysAuth, interaction_id: str) -> float:
    from datetime import datetime
    url = f"{auth.base_url}/api/v2/interactions/events/details"
    params = {"interactionId": interaction_id, "type": "stateChange", "limit": 1000}
    headers = {"Authorization": f"Bearer {auth.get_token()}"}
    # The context manager closes the client even if the request fails.
    with httpx.Client(timeout=30.0) as client:
        response = client.get(url, headers=headers, params=params)
        response.raise_for_status()
        events = response.json().get("entities", [])
    hold_start = None
    total_hold = 0.0
    for event in events:
        state = event.get("state", "").upper()
        ts_seconds = datetime.fromisoformat(event.get("timestamp", "").replace("Z", "+00:00")).timestamp()
        if state == "HOLD":
            hold_start = ts_seconds
        elif state in ["TALK", "DISCONNECTED", "QUEUE"] and hold_start is not None:
            total_hold += ts_seconds - hold_start
            hold_start = None
    return total_hold

def calculate_quality_score(hold_time: float, silence: float, talk_over: float) -> float:
    score = 100.0
    if hold_time > 120:
        score -= 20
    elif hold_time > 60:
        score -= 10
    score -= (silence * 2)
    score -= (talk_over * 5)
    return max(0.0, min(100.0, score))
def store_and_report(db_path: str, recording_id: str, interaction_id: str, hold_time: float, analysis: Dict) -> Dict:
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS qa_results (id INTEGER PRIMARY KEY AUTOINCREMENT, recording_id TEXT, interaction_id TEXT, hold_time REAL, silence_seconds REAL, talk_over_seconds REAL, quality_score REAL, raw_data TEXT)")
    conn.execute("CREATE VIRTUAL TABLE IF NOT EXISTS qa_search USING fts5(recording_id, interaction_id, raw_data, content='qa_results')")
    talk_over_secs = sum(t["duration"] for t in analysis["talk_overs"])
    score = calculate_quality_score(hold_time, analysis["total_silence_seconds"], talk_over_secs)
    raw = json.dumps({"silence_gaps": analysis["silence_gaps"], "talk_overs": analysis["talk_overs"], "agent_text": analysis["agent_text"], "guest_text": analysis["guest_text"]})
    cursor = conn.execute("INSERT INTO qa_results (recording_id, interaction_id, hold_time, silence_seconds, talk_over_seconds, quality_score, raw_data) VALUES (?, ?, ?, ?, ?, ?, ?)", (recording_id, interaction_id, hold_time, analysis["total_silence_seconds"], talk_over_secs, score, raw))
    # Index only the new row; qa_search has three columns, so SELECT * from
    # the eight-column qa_results table would fail.
    conn.execute("INSERT INTO qa_search (rowid, recording_id, interaction_id, raw_data) VALUES (?, ?, ?, ?)", (cursor.lastrowid, recording_id, interaction_id, raw))
    conn.commit()
    conn.close()
    return {"recording_id": recording_id, "quality_score": score, "hold_time_seconds": hold_time, "silence_seconds": analysis["total_silence_seconds"], "talk_over_seconds": talk_over_secs, "agent_utterances": len(analysis["agent_text"]), "guest_utterances": len(analysis["guest_text"])}
def main():
    auth = GenesysAuth(client_id="YOUR_CLIENT_ID", client_secret="YOUR_CLIENT_SECRET", base_url="https://api.mypurecloud.com")
    platform = PlatformClient(base_url=auth.base_url)
    platform.auth.set_auth_provider(auth.get_token)
    recordings_api = platform.RecordingsApi()
    query = RecordingQuery(filter=RecordingQuery.Filter(type="type eq 'voice'", sort="startTime desc"), pageSize=1)
    response = recordings_api.post_recordings_search(query_body=query)
    if not response.entities:
        print("No recordings found.")
        return
    rec = response.entities[0]
    part = rec.parts[0]
    print(f"Processing recording {rec.id}...")
    audio_bytes, _ = download_recording_part(auth, rec.id, part.id)
    # Placeholder STT result. Replace it with the response your STT provider
    # returns for audio_bytes.
    stt_response_json = '{"results": [{"start_time": 0.5, "end_time": 2.1, "speaker": "agent", "transcript": "Hello"}, {"start_time": 2.3, "end_time": 4.0, "speaker": "guest", "transcript": "Hi there"}, {"start_time": 3.8, "end_time": 5.2, "speaker": "agent", "transcript": "How can I help"}]}'
    analysis = analyze_stt_output(stt_response_json)
    hold_time = get_hold_time(auth, rec.interactionId)
    report = store_and_report("qa_index.db", rec.id, rec.interactionId, hold_time, analysis)
    print(json.dumps(report, indent=2))

if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: HTTP 401 Unauthorized
- Cause: Expired OAuth token, incorrect client credentials, or missing Authorization header.
- Fix: Verify the client ID and secret match your Genesys Cloud application. Ensure the get_token() method refreshes before expiration. Check that the token is attached as Bearer <token> in headers.
- Code fix: The GenesysAuth class automatically refreshes tokens 30 seconds before expiration. If you manually manage tokens, implement a TTL check before each request.
Error: HTTP 403 Forbidden
- Cause: The OAuth client lacks required scopes (recording:read, interaction:read).
- Fix: Navigate to your Genesys Cloud application configuration and add the missing scopes. Save and regenerate credentials if necessary.
- Code fix: Catch 403 explicitly and print the required scopes for the failing endpoint.
Error: HTTP 429 Too Many Requests
- Cause: Exceeding Genesys Cloud API rate limits (the documented default is 300 requests per minute per OAuth client; confirm the limits that apply to your organization).
- Fix: Implement exponential backoff. The download_recording_part function includes a retry loop that sleeps for 2^attempt seconds before retrying.
- Code fix: Monitor the Retry-After header in 429 responses and use its value to set the wait, instead of relying only on the computed backoff.
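One way to honor Retry-After is to compute the wait in a small helper and call it from the retry loop. The following is a sketch, not part of the script above: the name retry_delay and the 60-second cap are assumptions, and only the integer-seconds form of the header is handled. An HTTP-date value falls back to exponential backoff.

```python
from typing import Mapping

def retry_delay(headers: Mapping[str, str], attempt: int, cap: float = 60.0) -> float:
    # Default to exponential backoff: 1s, 2s, 4s, ...
    delay = float(2 ** attempt)
    # Header names are case-insensitive, so normalize before the lookup.
    lowered = {key.lower(): value for key, value in headers.items()}
    raw = lowered.get("retry-after")
    if raw is not None and raw.strip().isdigit():
        # The server stated how many seconds to wait; prefer its value.
        delay = float(raw.strip())
    # Never sleep longer than the cap, whatever the server asks for.
    return min(delay, cap)
```

Inside the 429 branch, time.sleep(retry_delay(e.response.headers, attempt)) would replace the fixed 2 ** attempt wait.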
Error: sqlite3.OperationalError: table qa_search already exists
- Cause: A CREATE VIRTUAL TABLE statement without IF NOT EXISTS ran against a database file that already contains qa_search.
- Fix: Use CREATE VIRTUAL TABLE IF NOT EXISTS, or drop both tables before reinitializing during development.
- Code fix: The script uses IF NOT EXISTS clauses. If schema changes are required, execute DROP TABLE IF EXISTS qa_search; DROP TABLE IF EXISTS qa_results; before re-running.