Analyzing Genesys Cloud Media Recordings with Python for Quality Assessment

What You Will Build

  • A Python script that downloads Genesys Cloud call recordings, processes them through a cloud speech-to-text service, calculates silence and talk-over metrics, extracts hold times from interaction state transitions, stores results in a SQLite index, and generates a quality score report.
  • This implementation uses the Genesys Cloud Recordings API, Interactions Events API, and the official Python SDK.
  • The code is written in Python 3.10+ and requires httpx, genesyscloud, pydub, and numpy.

Prerequisites

  • Genesys Cloud OAuth Client Credentials grant configured in your organization
  • Required scopes: recording:read, interaction:read, analytics:read
  • SDK: genesyscloud>=2.0.0
  • Runtime: Python 3.10 or higher
  • External dependencies: httpx, genesyscloud, pydub, numpy (sqlite3 ships with the Python standard library)
  • A cloud STT provider endpoint (AWS Transcribe, Google Cloud Speech, or Azure Speech) with batch or streaming capabilities

Authentication Setup

Genesys Cloud uses the OAuth 2.0 client credentials flow for server-to-server integrations. Each token response reports its lifetime in seconds in the expires_in field, and the token must be refreshed before that time elapses. The following class handles token retrieval, caching, and automatic refresh.

import httpx
import time
from typing import Optional

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, base_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url.rstrip("/")
        self.token: Optional[str] = None
        self.expires_at: float = 0.0

    def get_token(self) -> str:
        if self.token and time.time() < self.expires_at:
            return self.token

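        # Tokens are issued by the regional login host (login.<region>),
        # not the API host (api.<region>) stored in base_url.
        login_url = self.base_url.replace("://api.", "://login.", 1)
        url = f"{login_url}/oauth/token"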
        payload = {"grant_type": "client_credentials"}
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        auth = httpx.BasicAuth(self.client_id, self.client_secret)

        response = httpx.post(url, data=payload, headers=headers, auth=auth)
        response.raise_for_status()
        data = response.json()

        self.token = data["access_token"]
        self.expires_at = time.time() + data["expires_in"] - 30
        return self.token

The endpoint /oauth/token requires no additional scope beyond the client configuration. The returned access_token is attached to subsequent API calls via the Authorization: Bearer header.
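
The refresh rule can be exercised without touching the network. The sketch below is an illustration only: CachedToken, fetch, and clock are hypothetical names that do not come from the Genesys Cloud SDK. It applies the same rule as GenesysAuth, reusing a token until the clock reaches the expiry time minus a 30-second margin.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class CachedToken:
    """Caches a token and refetches it shortly before it expires.

    `fetch` is a hypothetical callable returning (token, expires_in_seconds);
    in this tutorial that role is played by the POST to /oauth/token.
    """

    def __init__(self, fetch: Callable[[], Tuple[str, float]],
                 clock: Callable[[], float] = time.time,
                 margin: float = 30.0):
        self._fetch = fetch
        self._clock = clock
        self._margin = margin
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        # Refetch on first use or once the safety margin has been reached.
        if self._token is None or now >= self._expires_at:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = now + expires_in - self._margin
        return self._token

    def auth_header(self) -> Dict[str, str]:
        # Header format used by every authenticated API call.
        return {"Authorization": f"Bearer {self.get()}"}
```

Injecting the clock makes the expiry behavior testable with fixed timestamps instead of real waiting.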

Implementation

Step 1: Initialize SDK and Fetch Recording Metadata

The Genesys Cloud Python SDK abstracts authentication and pagination. You initialize PureCloudPlatformClientV2 with your base URL and attach an authentication provider. The RecordingsApi class provides methods to list and retrieve recording metadata.

from genesyscloud.platform_client import PlatformClient
from genesyscloud.platform_client.models import RecordingQuery
from typing import List

def get_recent_recordings(auth: GenesysAuth, limit: int = 10) -> List:
    platform = PlatformClient(base_url=auth.base_url)
    platform.auth.set_auth_provider(auth.get_token)

    recordings_api = platform.RecordingsApi()
    query = RecordingQuery(
        filter=RecordingQuery.Filter(
            type="type eq 'voice'",
            sort="startTime desc"
        ),
        pageSize=limit
    )

    try:
        response = recordings_api.post_recordings_search(query_body=query)
        return response.entities if response.entities else []
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 401:
            raise RuntimeError("Authentication failed. Verify client credentials.")
        if e.response.status_code == 403:
            raise RuntimeError("Insufficient permissions. Add recording:read scope.")
        raise

Required scope: recording:read
Expected response structure:

{
  "entities": [
    {
      "id": "rec-12345678-1234-1234-1234-123456789012",
      "interactionId": "int-87654321-4321-4321-4321-210987654321",
      "status": "COMPLETED",
      "startTime": "2023-10-25T14:30:00Z",
      "parts": [
        {
          "id": "part-11111111-1111-1111-1111-111111111111",
          "type": "voice",
          "format": "wav",
          "status": "COMPLETED"
        }
      ]
    }
  ],
  "pageSize": 10,
  "pageNumber": 1
}
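
Given a response shaped like the one above, the recording and part identifiers needed for the download step can be pulled out with a short generator. This is a sketch that assumes the response is available as a plain dict (for example, parsed JSON) with exactly the field names shown; iter_completed_voice_parts is a hypothetical helper name.

```python
from typing import Any, Dict, Iterator, Tuple


def iter_completed_voice_parts(
    response: Dict[str, Any],
) -> Iterator[Tuple[str, str, str]]:
    """Yield (recording_id, interaction_id, part_id) for finished voice parts."""
    for recording in response.get("entities", []):
        # Skip recordings that are still being processed.
        if recording.get("status") != "COMPLETED":
            continue
        for part in recording.get("parts", []):
            if part.get("type") == "voice" and part.get("status") == "COMPLETED":
                yield recording["id"], recording.get("interactionId", ""), part["id"]


sample = {
    "entities": [
        {
            "id": "rec-1",
            "interactionId": "int-1",
            "status": "COMPLETED",
            "parts": [
                {"id": "part-1", "type": "voice", "status": "COMPLETED"},
                {"id": "part-2", "type": "voice", "status": "PENDING"},
            ],
        }
    ]
}
print(list(iter_completed_voice_parts(sample)))
# [('rec-1', 'int-1', 'part-1')]
```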

Step 2: Download Audio Part with Retry Logic

Recording parts are downloaded via direct HTTP GET requests. The Media API returns a pre-signed URL or streams the audio directly. You must handle HTTP 429 (Too Many Requests) responses with exponential backoff.

import time
from typing import Tuple

import httpx

def download_recording_part(auth: GenesysAuth, recording_id: str, part_id: str) -> Tuple[bytes, str]:
    url = f"{auth.base_url}/api/v2/recordings/{recording_id}/parts/{part_id}"
    max_retries = 3

    # Keep one client open for every attempt. Closing it inside the loop
    # would make the first retry fail because the client is already closed.
    with httpx.Client(timeout=60.0) as client:
        for attempt in range(max_retries):
            # Read the token on each attempt in case it was refreshed.
            headers = {"Authorization": f"Bearer {auth.get_token()}"}
            try:
                response = client.get(url, headers=headers)
                response.raise_for_status()
                return response.content, response.headers.get("content-type", "audio/wav")
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429:
                    # Exponential backoff: 1s, 2s, 4s.
                    wait_time = 2 ** attempt
                    print(f"Rate limited (429). Retrying in {wait_time}s...")
                    time.sleep(wait_time)
                elif e.response.status_code == 404:
                    raise RuntimeError(f"Recording part {part_id} not found.") from e
                else:
                    raise
    raise RuntimeError("Max retries exceeded for 429 rate limiting.")

Required scope: recording:read
The endpoint streams raw audio bytes. The content-type header indicates the format (audio/wav, audio/mp3, etc.). You will pass these bytes to your cloud STT provider in the next step.
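
If you want to keep the audio on disk before sending it to the STT provider, the content type can drive the file extension. The following is a minimal sketch: save_audio and the EXTENSIONS mapping are assumptions made for illustration, and the mapping should be extended to cover whatever formats your organization records in.

```python
import tempfile
from pathlib import Path

# Assumed mapping from MIME type to file extension (illustrative, not exhaustive).
EXTENSIONS = {
    "audio/wav": ".wav",
    "audio/x-wav": ".wav",
    "audio/mpeg": ".mp3",
    "audio/mp3": ".mp3",
    "audio/ogg": ".ogg",
}


def save_audio(audio: bytes, content_type: str, directory: str, stem: str) -> Path:
    """Write the audio bytes to <directory>/<stem><extension> and return the path."""
    # Drop parameters such as "; charset=binary" before the lookup.
    mime = content_type.split(";", 1)[0].strip().lower()
    path = Path(directory) / f"{stem}{EXTENSIONS.get(mime, '.bin')}"
    path.write_bytes(audio)
    return path


with tempfile.TemporaryDirectory() as tmp:
    saved = save_audio(b"RIFF", "audio/wav", tmp, "rec-1")
    print(saved.name)  # rec-1.wav
```

Unknown content types fall back to a .bin extension so that nothing is silently mislabeled.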

Step 3: Process STT, Diarization, Silence, and Talk-Over Detection

Cloud STT providers return timestamped segments with speaker labels. You will parse this output to calculate silence gaps, detect talk-over events (overlapping speaker timestamps), and separate agent utterances from guest utterances. The following function accepts a provider response that has been normalized to the structure shown after the code and performs the analysis locally.

import json
from typing import Any, Dict

SILENCE_THRESHOLD_SECONDS = 0.5

def _start(segment: Dict[str, Any]) -> float:
    return segment.get("start_time", segment.get("startTime", 0.0))

def _end(segment: Dict[str, Any]) -> float:
    return segment.get("end_time", segment.get("endTime", 0.0))

def analyze_stt_output(stt_json: str) -> Dict[str, Any]:
    data = json.loads(stt_json)
    segments = data.get("results", data.get("transcripts", []))

    silence_gaps = []
    talk_overs = []
    agent_text = []
    guest_text = []

    # Classify every segment by speaker, including the last one.
    for segment in segments:
        # str() guards against numeric channel labels such as 0.
        speaker = str(segment.get("speaker", segment.get("channel", "guest")))
        text = segment.get("transcript", segment.get("text", ""))
        if speaker.lower() in ("agent", "channel_0", "0"):
            agent_text.append(text)
        else:
            guest_text.append(text)

    # Compare each segment with its successor to find gaps and overlaps.
    for current, next_seg in zip(segments, segments[1:]):
        current_end = _end(current)
        next_start = _start(next_seg)
        gap = next_start - current_end

        if gap > SILENCE_THRESHOLD_SECONDS:
            silence_gaps.append({"start": current_end, "end": next_start, "duration": gap})
        elif next_start < current_end:
            # The overlap stops as soon as either speaker stops talking.
            overlap_end = min(current_end, _end(next_seg))
            talk_overs.append({"overlap_start": next_start, "overlap_end": overlap_end, "duration": overlap_end - next_start})

    total_silence = sum(g["duration"] for g in silence_gaps)
    return {
        "silence_gaps": silence_gaps,
        "talk_overs": talk_overs,
        "agent_text": agent_text,
        "guest_text": guest_text,
        "total_silence_seconds": total_silence
    }

Required scope: None (local processing)
Expected STT input structure:

{
  "results": [
    {"start_time": 0.5, "end_time": 2.1, "speaker": "agent", "transcript": "Hello, how can I help you?"},
    {"start_time": 2.3, "end_time": 4.0, "speaker": "guest", "transcript": "I need assistance with my account."},
    {"start_time": 3.8, "end_time": 5.2, "speaker": "agent", "transcript": "I can certainly help with that."}
  ]
}

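The third segment starts at 3.8 seconds, before the second segment ends at 4.0 seconds, so a talk-over of about 0.2 seconds is detected. The gap between the first and second segments (2.1 to 2.3 seconds) is only 0.2 seconds, so it is not logged; only gaps exceeding 0.5 seconds count as silence events.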
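
The sample input can be checked by hand with a few lines of Python. This sketch is independent of analyze_stt_output: classify_boundaries is a hypothetical helper that labels each boundary between neighboring segments using the same 0.5-second threshold, rounding to milliseconds so the printed values are easy to read.

```python
segments = [
    {"start_time": 0.5, "end_time": 2.1, "speaker": "agent"},
    {"start_time": 2.3, "end_time": 4.0, "speaker": "guest"},
    {"start_time": 3.8, "end_time": 5.2, "speaker": "agent"},
]

SILENCE_THRESHOLD = 0.5


def classify_boundaries(segments, threshold=SILENCE_THRESHOLD):
    """Label the boundary between each pair of neighboring segments."""
    labels = []
    for current, following in zip(segments, segments[1:]):
        # Positive gap: nobody speaks. Negative gap: both speak at once.
        gap = round(following["start_time"] - current["end_time"], 3)
        if gap > threshold:
            labels.append(("silence", gap))
        elif gap < 0:
            labels.append(("talk_over", -gap))
        else:
            labels.append(("normal", gap))
    return labels


print(classify_boundaries(segments))
# [('normal', 0.2), ('talk_over', 0.2)]
```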

Step 4: Fetch Interaction Events for Hold Time Calculation

Hold time is derived from channel state transitions. The Interactions Events API returns a chronological list of state changes for a specific interaction. You will filter for HOLD and TALK states to calculate total hold duration.

from datetime import datetime

import httpx

def get_hold_time(auth: GenesysAuth, interaction_id: str) -> float:
    url = f"{auth.base_url}/api/v2/interactions/events/details"
    params = {
        "interactionId": interaction_id,
        "type": "stateChange",
        "limit": 1000
    }
    headers = {"Authorization": f"Bearer {auth.get_token()}"}

    # The context manager closes the client even if the request fails.
    with httpx.Client(timeout=30.0) as client:
        response = client.get(url, headers=headers, params=params)
        response.raise_for_status()
        events = response.json().get("entities", [])

    hold_start = None
    total_hold = 0.0

    for event in events:
        state = event.get("state", "").upper()
        timestamp = event.get("timestamp")
        if not timestamp:
            continue  # Skip events that carry no timestamp.
        ts_seconds = _iso_to_seconds(timestamp)

        if state == "HOLD":
            # Keep the earliest start if HOLD is reported more than once.
            if hold_start is None:
                hold_start = ts_seconds
        elif state in ("TALK", "DISCONNECTED", "QUEUE") and hold_start is not None:
            total_hold += ts_seconds - hold_start
            hold_start = None

    return total_hold

def _iso_to_seconds(iso_str: str) -> float:
    # datetime.fromisoformat() in Python 3.10 does not accept a trailing "Z".
    dt = datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
    return dt.timestamp()

Required scope: interaction:read
Expected response snippet:

{
  "entities": [
    {"timestamp": "2023-10-25T14:30:05Z", "state": "RINGING"},
    {"timestamp": "2023-10-25T14:30:10Z", "state": "TALK"},
    {"timestamp": "2023-10-25T14:32:15Z", "state": "HOLD"},
    {"timestamp": "2023-10-25T14:33:45Z", "state": "TALK"},
    {"timestamp": "2023-10-25T14:35:00Z", "state": "DISCONNECTED"}
  ]
}

The script calculates 90 seconds of hold time between 14:32:15 and 14:33:45.
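
The 90-second figure can be reproduced offline from the sample events. The sketch below uses a hypothetical helper, hold_durations, that applies the same state rule to an in-memory list and returns one duration per hold period, which also makes it easy to see how many separate holds occurred.

```python
from datetime import datetime
from typing import Dict, List

HOLD_ENDING_STATES = {"TALK", "DISCONNECTED", "QUEUE"}


def hold_durations(events: List[Dict[str, str]]) -> List[float]:
    """Return the length in seconds of each hold period, in order."""
    durations = []
    hold_start = None
    for event in events:
        # Replace "Z" so the parser also works on Python 3.10.
        ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00")).timestamp()
        state = event["state"].upper()
        if state == "HOLD":
            if hold_start is None:
                hold_start = ts
        elif state in HOLD_ENDING_STATES and hold_start is not None:
            durations.append(ts - hold_start)
            hold_start = None
    return durations


events = [
    {"timestamp": "2023-10-25T14:30:05Z", "state": "RINGING"},
    {"timestamp": "2023-10-25T14:30:10Z", "state": "TALK"},
    {"timestamp": "2023-10-25T14:32:15Z", "state": "HOLD"},
    {"timestamp": "2023-10-25T14:33:45Z", "state": "TALK"},
    {"timestamp": "2023-10-25T14:35:00Z", "state": "DISCONNECTED"},
]
print(hold_durations(events))  # [90.0]
```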

Step 5: Store in SQLite and Generate Quality Report

You will store analysis results in a SQLite database with an FTS5 virtual table for full-text search. The quality score starts at 100 and subtracts three penalties: one for silence, one for talk-over, and one for hold time above fixed thresholds.

import sqlite3
import json
from typing import List, Dict

def init_db(db_path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS qa_results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            recording_id TEXT,
            interaction_id TEXT,
            hold_time REAL,
            silence_seconds REAL,
            talk_over_seconds REAL,
            quality_score REAL,
            raw_data TEXT
        )
    """)
    conn.execute("""
        CREATE VIRTUAL TABLE IF NOT EXISTS qa_search USING fts5(
            recording_id, interaction_id, raw_data, content='qa_results'
        )
    """)
    return conn

def calculate_quality_score(hold_time: float, silence: float, talk_over: float) -> float:
    score = 100.0
    if hold_time > 120:
        score -= 20
    elif hold_time > 60:
        score -= 10
    score -= (silence * 2)
    score -= (talk_over * 5)
    return max(0.0, min(100.0, score))

def store_and_report(db_path: str, recording_id: str, interaction_id: str, 
                     hold_time: float, analysis: Dict) -> Dict:
    conn = init_db(db_path)
    talk_over_secs = sum(t["duration"] for t in analysis["talk_overs"])
    score = calculate_quality_score(hold_time, analysis["total_silence_seconds"], talk_over_secs)

    raw = json.dumps({
        "silence_gaps": analysis["silence_gaps"],
        "talk_overs": analysis["talk_overs"],
        "agent_text": analysis["agent_text"],
        "guest_text": analysis["guest_text"]
    })

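    cursor = conn.execute(
        "INSERT INTO qa_results (recording_id, interaction_id, hold_time, silence_seconds, talk_over_seconds, quality_score, raw_data) VALUES (?, ?, ?, ?, ?, ?, ?)",
        (recording_id, interaction_id, hold_time, analysis["total_silence_seconds"], talk_over_secs, score, raw)
    )
    # Index only the new row. qa_search has three columns, so inserting
    # "SELECT *" from the eight-column qa_results fails, and it would
    # also re-index every earlier row on each call.
    conn.execute(
        "INSERT INTO qa_search (rowid, recording_id, interaction_id, raw_data) VALUES (?, ?, ?, ?)",
        (cursor.lastrowid, recording_id, interaction_id, raw)
    )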
    conn.commit()
    conn.close()

    return {
        "recording_id": recording_id,
        "quality_score": score,
        "hold_time_seconds": hold_time,
        "silence_seconds": analysis["total_silence_seconds"],
        "talk_over_seconds": talk_over_secs,
        "agent_utterances": len(analysis["agent_text"]),
        "guest_utterances": len(analysis["guest_text"])
    }

Required scope: None (local storage)
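The FTS5 table enables queries like SELECT * FROM qa_search WHERE qa_search MATCH 'account assistance'. The quality score applies linear penalties for silence (2 points per second) and talk-over (5 points per second), plus a stepped penalty for hold time (10 points above 60 seconds, 20 points above 120 seconds), and is clamped to the range 0 to 100.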
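
To see how each penalty contributes to a score, it can help to compute them separately. The sketch below restates the formula from calculate_quality_score; score_breakdown is a hypothetical helper added for illustration, and the input values are made up.

```python
from typing import Dict


def score_breakdown(hold_time: float, silence: float, talk_over: float) -> Dict[str, float]:
    """Return each penalty and the final clamped score."""
    # Stepped hold penalty: 20 points above 120 s, 10 points above 60 s.
    if hold_time > 120:
        hold_penalty = 20.0
    elif hold_time > 60:
        hold_penalty = 10.0
    else:
        hold_penalty = 0.0
    silence_penalty = silence * 2      # 2 points per second of silence
    talk_over_penalty = talk_over * 5  # 5 points per second of talk-over
    raw = 100.0 - hold_penalty - silence_penalty - talk_over_penalty
    return {
        "hold_penalty": hold_penalty,
        "silence_penalty": silence_penalty,
        "talk_over_penalty": talk_over_penalty,
        "score": max(0.0, min(100.0, raw)),
    }


print(score_breakdown(130, 12.5, 3)["score"])  # 40.0
print(score_breakdown(0, 60, 0)["score"])      # 0.0
```

With these weights, 50 seconds of silence alone is enough to drive the score to zero, so you may want to tune the multipliers to your typical call length.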

Complete Working Example

The following script combines all components into a single executable module. Replace the placeholder credentials and STT endpoint with your environment values.

#!/usr/bin/env python3
import httpx
import time
import json
import sqlite3
from typing import Optional, Dict, List, Tuple
from genesyscloud.platform_client import PlatformClient
from genesyscloud.platform_client.models import RecordingQuery

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, base_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url.rstrip("/")
        self.token: Optional[str] = None
        self.expires_at: float = 0.0

    def get_token(self) -> str:
        if self.token and time.time() < self.expires_at:
            return self.token
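        # Tokens are issued by the regional login host (login.<region>),
        # not the API host (api.<region>) stored in base_url.
        login_url = self.base_url.replace("://api.", "://login.", 1)
        url = f"{login_url}/oauth/token"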
        payload = {"grant_type": "client_credentials"}
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        auth = httpx.BasicAuth(self.client_id, self.client_secret)
        response = httpx.post(url, data=payload, headers=headers, auth=auth)
        response.raise_for_status()
        data = response.json()
        self.token = data["access_token"]
        self.expires_at = time.time() + data["expires_in"] - 30
        return self.token

def download_recording_part(auth: GenesysAuth, recording_id: str, part_id: str) -> Tuple[bytes, str]:
    url = f"{auth.base_url}/api/v2/recordings/{recording_id}/parts/{part_id}"
    max_retries = 3
    # Keep one client open for every attempt. Closing it inside the loop
    # would make the first retry fail because the client is already closed.
    with httpx.Client(timeout=60.0) as client:
        for attempt in range(max_retries):
            headers = {"Authorization": f"Bearer {auth.get_token()}"}
            try:
                response = client.get(url, headers=headers)
                response.raise_for_status()
                return response.content, response.headers.get("content-type", "audio/wav")
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429:
                    time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s.
                else:
                    raise
    raise RuntimeError("Max retries exceeded for 429 rate limiting.")

def _start(segment: Dict) -> float:
    return segment.get("start_time", segment.get("startTime", 0.0))

def _end(segment: Dict) -> float:
    return segment.get("end_time", segment.get("endTime", 0.0))

def analyze_stt_output(stt_json: str) -> Dict[str, object]:
    data = json.loads(stt_json)
    segments = data.get("results", data.get("transcripts", []))
    silence_gaps = []
    talk_overs = []
    agent_text = []
    guest_text = []
    # Classify every segment by speaker, including the last one.
    for segment in segments:
        speaker = str(segment.get("speaker", segment.get("channel", "guest")))
        text = segment.get("transcript", segment.get("text", ""))
        if speaker.lower() in ("agent", "channel_0", "0"):
            agent_text.append(text)
        else:
            guest_text.append(text)
    # Compare each segment with its successor to find gaps and overlaps.
    for current, next_seg in zip(segments, segments[1:]):
        current_end = _end(current)
        next_start = _start(next_seg)
        gap = next_start - current_end
        if gap > 0.5:
            silence_gaps.append({"start": current_end, "end": next_start, "duration": gap})
        elif next_start < current_end:
            # The overlap stops as soon as either speaker stops talking.
            overlap_end = min(current_end, _end(next_seg))
            talk_overs.append({"overlap_start": next_start, "overlap_end": overlap_end, "duration": overlap_end - next_start})
    return {"silence_gaps": silence_gaps, "talk_overs": talk_overs, "agent_text": agent_text, "guest_text": guest_text, "total_silence_seconds": sum(g["duration"] for g in silence_gaps)}

def get_hold_time(auth: GenesysAuth, interaction_id: str) -> float:
    from datetime import datetime
    url = f"{auth.base_url}/api/v2/interactions/events/details"
    params = {"interactionId": interaction_id, "type": "stateChange", "limit": 1000}
    headers = {"Authorization": f"Bearer {auth.get_token()}"}
    # The context manager closes the client even if the request fails.
    with httpx.Client(timeout=30.0) as client:
        response = client.get(url, headers=headers, params=params)
        response.raise_for_status()
        events = response.json().get("entities", [])
    hold_start = None
    total_hold = 0.0
    for event in events:
        state = event.get("state", "").upper()
        timestamp = event.get("timestamp")
        if not timestamp:
            continue  # Skip events that carry no timestamp.
        ts_seconds = datetime.fromisoformat(timestamp.replace("Z", "+00:00")).timestamp()
        if state == "HOLD":
            # Keep the earliest start if HOLD is reported more than once.
            if hold_start is None:
                hold_start = ts_seconds
        elif state in ("TALK", "DISCONNECTED", "QUEUE") and hold_start is not None:
            total_hold += ts_seconds - hold_start
            hold_start = None
    return total_hold

def calculate_quality_score(hold_time: float, silence: float, talk_over: float) -> float:
    score = 100.0
    if hold_time > 120: score -= 20
    elif hold_time > 60: score -= 10
    score -= (silence * 2)
    score -= (talk_over * 5)
    return max(0.0, min(100.0, score))

def store_and_report(db_path: str, recording_id: str, interaction_id: str, hold_time: float, analysis: Dict) -> Dict:
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS qa_results (id INTEGER PRIMARY KEY AUTOINCREMENT, recording_id TEXT, interaction_id TEXT, hold_time REAL, silence_seconds REAL, talk_over_seconds REAL, quality_score REAL, raw_data TEXT)")
    conn.execute("CREATE VIRTUAL TABLE IF NOT EXISTS qa_search USING fts5(recording_id, interaction_id, raw_data, content='qa_results')")
    talk_over_secs = sum(t["duration"] for t in analysis["talk_overs"])
    score = calculate_quality_score(hold_time, analysis["total_silence_seconds"], talk_over_secs)
    raw = json.dumps({"silence_gaps": analysis["silence_gaps"], "talk_overs": analysis["talk_overs"], "agent_text": analysis["agent_text"], "guest_text": analysis["guest_text"]})
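    cursor = conn.execute("INSERT INTO qa_results (recording_id, interaction_id, hold_time, silence_seconds, talk_over_seconds, quality_score, raw_data) VALUES (?, ?, ?, ?, ?, ?, ?)", (recording_id, interaction_id, hold_time, analysis["total_silence_seconds"], talk_over_secs, score, raw))
    # Index only the new row; "SELECT *" from qa_results has eight columns and qa_search has three.
    conn.execute("INSERT INTO qa_search (rowid, recording_id, interaction_id, raw_data) VALUES (?, ?, ?, ?)", (cursor.lastrowid, recording_id, interaction_id, raw))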
    conn.commit()
    conn.close()
    return {"recording_id": recording_id, "quality_score": score, "hold_time_seconds": hold_time, "silence_seconds": analysis["total_silence_seconds"], "talk_over_seconds": talk_over_secs, "agent_utterances": len(analysis["agent_text"]), "guest_utterances": len(analysis["guest_text"])}

def main():
    auth = GenesysAuth(client_id="YOUR_CLIENT_ID", client_secret="YOUR_CLIENT_SECRET", base_url="https://api.mypurecloud.com")
    platform = PlatformClient(base_url=auth.base_url)
    platform.auth.set_auth_provider(auth.get_token)
    recordings_api = platform.RecordingsApi()
    query = RecordingQuery(filter=RecordingQuery.Filter(type="type eq 'voice'", sort="startTime desc"), pageSize=1)
    response = recordings_api.post_recordings_search(query_body=query)
    if not response.entities:
        print("No recordings found.")
        return

    rec = response.entities[0]
    part = rec.parts[0]
    print(f"Processing recording {rec.id}...")
    audio_bytes, _ = download_recording_part(auth, rec.id, part.id)

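    # Placeholder STT result. Replace this literal with the response your STT
    # provider returns for audio_bytes, normalized to the structure from Step 3.
    stt_response_json = '{"results": [{"start_time": 0.5, "end_time": 2.1, "speaker": "agent", "transcript": "Hello"}, {"start_time": 2.3, "end_time": 4.0, "speaker": "guest", "transcript": "Hi there"}, {"start_time": 3.8, "end_time": 5.2, "speaker": "agent", "transcript": "How can I help"}]}'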
    analysis = analyze_stt_output(stt_response_json)
    hold_time = get_hold_time(auth, rec.interactionId)
    report = store_and_report("qa_index.db", rec.id, rec.interactionId, hold_time, analysis)
    print(json.dumps(report, indent=2))

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: HTTP 401 Unauthorized

  • Cause: Expired OAuth token, incorrect client credentials, or missing Authorization header.
  • Fix: Verify the client ID and secret match your Genesys Cloud application. Ensure the get_token() method refreshes before expiration. Check that the token is attached as Bearer <token> in headers.
  • Code fix: The GenesysAuth class automatically refreshes tokens 30 seconds before expiration. If you manually manage tokens, implement a TTL check before each request.

Error: HTTP 403 Forbidden

  • Cause: The OAuth client lacks required scopes (recording:read, interaction:read).
  • Fix: Navigate to your Genesys Cloud application configuration and add the missing scopes. Save and regenerate credentials if necessary.
  • Code fix: Catch 403 explicitly and print the required scopes for the failing endpoint.

Error: HTTP 429 Too Many Requests

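  • Cause: Exceeding Genesys Cloud API rate limits. The exact limits depend on the endpoint and on your organization's configuration, so check the current platform limits documentation rather than assuming a fixed number.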
  • Fix: Implement exponential backoff. The download_recording_part function includes a retry loop that sleeps for 2^attempt seconds before retrying.
  • Code fix: Monitor the Retry-After header in 429 responses. Adjust backoff intervals dynamically instead of using fixed delays.
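
One way to honor Retry-After is to compute the wait in a small helper and fall back to exponential backoff when the header is missing or not numeric. This is a sketch under those assumptions: backoff_seconds and its 60-second cap are illustrative choices, not values taken from Genesys Cloud documentation.

```python
from typing import Mapping


def backoff_seconds(attempt: int, headers: Mapping[str, str], cap: float = 60.0) -> float:
    """Prefer the server's Retry-After value; otherwise use 2**attempt."""
    # Plain dicts are case-sensitive, so check both common spellings.
    retry_after = headers.get("Retry-After") or headers.get("retry-after")
    if retry_after is not None:
        try:
            return min(cap, max(0.0, float(retry_after)))
        except ValueError:
            pass  # Retry-After may also be an HTTP date; fall back below.
    return min(cap, float(2 ** attempt))


print(backoff_seconds(2, {}))                     # 4.0
print(backoff_seconds(0, {"Retry-After": "7"}))   # 7.0
```

Inside the retry loop, the result would replace the fixed 2 ** attempt value passed to time.sleep().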

Error: sqlite3.OperationalError: table qa_search already exists

  • Cause: CREATE VIRTUAL TABLE was executed without IF NOT EXISTS against a database file that already contains qa_search, for example one created by an earlier version of the script.
  • Fix: Use CREATE VIRTUAL TABLE IF NOT EXISTS or drop both tables before reinitializing during development.
  • Code fix: The script uses IF NOT EXISTS clauses. If schema changes are required, execute DROP TABLE IF EXISTS qa_search; DROP TABLE IF EXISTS qa_results; before re-running.
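
The two approaches can be combined in a single initializer that is safe to call repeatedly and can optionally rebuild the schema. The sketch below uses a trimmed-down version of the qa_results table to stay short; init_schema and its reset flag are hypothetical names, and the example assumes your Python build ships SQLite with FTS5 enabled.

```python
import sqlite3

# Trimmed schema for illustration; the tutorial's table has more columns.
SCHEMA = (
    "CREATE TABLE IF NOT EXISTS qa_results ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "recording_id TEXT, interaction_id TEXT, raw_data TEXT)",
    "CREATE VIRTUAL TABLE IF NOT EXISTS qa_search USING fts5("
    "recording_id, interaction_id, raw_data, "
    "content='qa_results', content_rowid='id')",
)


def init_schema(conn: sqlite3.Connection, reset: bool = False) -> None:
    """Create the tables if missing; drop them first when reset is True."""
    if reset:
        # Drop the FTS index before the table it reads its content from.
        conn.execute("DROP TABLE IF EXISTS qa_search")
        conn.execute("DROP TABLE IF EXISTS qa_results")
    for statement in SCHEMA:
        conn.execute(statement)


conn = sqlite3.connect(":memory:")
init_schema(conn)              # creates both tables
init_schema(conn)              # second call changes nothing
init_schema(conn, reset=True)  # rebuilds the schema from scratch
```

Reserve reset=True for development, since it discards every stored result.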

Official References