Implementing Automated PII Discovery and Masking in Historical Interaction Recordings

What This Guide Covers

You are building a retroactive PII remediation pipeline that scans your historical call recording corpus, accumulated over years, identifies recordings containing sensitive personal data (Social Security Numbers, full payment card numbers, dates of birth, medical identifiers), and automatically masks the identified segments with silence or a tone before the recording is accessed by analytics tools or external auditors, or released in response to data subject requests. When complete, a data subject’s Right to Access request returns a de-identified recording, and your analytics pipeline never receives unredacted PII regardless of when the recording was made.


Prerequisites, Roles & Licensing

  • Licensing: Genesys Cloud CX 2 or CX 3 with recording access; no additional Genesys module required for the extraction pipeline
  • Permissions required (service account):
    • Recording > Recording > View
    • Recording > Recording > Export
  • Processing infrastructure: AWS (Transcribe + Lambda + S3 + DynamoDB) or GCP (Speech-to-Text + Cloud Functions + GCS + Firestore)
  • Scale consideration: A corpus of 1 million recordings averaging 5 minutes each is ~83,000 hours of audio. At AWS Transcribe pricing of ~$0.024/minute, processing the full corpus costs ~$120,000, so prioritize high-risk queues first.
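
The arithmetic behind the scale estimate can be sketched as follows. The per-minute rate is the approximate figure quoted above, not a current price quote. The helper name `estimate_transcription_cost` and the 150,000-recording Tier 1 subset are illustrative assumptions.

```python
def estimate_transcription_cost(
    recording_count: int,
    avg_minutes_per_recording: float,
    price_per_minute_usd: float = 0.024,  # approximate rate quoted above; check current pricing
) -> dict:
    """Return total audio hours and estimated transcription cost for a corpus."""
    total_minutes = recording_count * avg_minutes_per_recording
    return {
        "audio_hours": total_minutes / 60,
        "estimated_cost_usd": total_minutes * price_per_minute_usd,
    }


full_corpus = estimate_transcription_cost(1_000_000, 5)
# Hypothetical first pass: only 150,000 recordings from the highest-risk queues
tier_one = estimate_transcription_cost(150_000, 5)
print(full_corpus)
print(tier_one)
```

Running the same function per queue before you start gives a budget figure for each processing phase.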

The Implementation Deep-Dive

1. Scoping the Discovery Problem: Risk-Tiered Queue Prioritization

Don’t process all recordings equally - start with the highest-risk queues:

Risk Tier 1 (Process First):

  • Payment/billing queues (likely contain spoken card numbers)
  • Insurance/healthcare queues (DOB, member IDs, diagnosis codes)
  • Account management queues (SSN verification calls)
  • Queues where agents manually read back sensitive data

Risk Tier 2 (Process Second):

  • General customer service queues (occasional SSN or DOB collection)
  • Sales queues (credit check calls)

Risk Tier 3 (Process Last or Sample Only):

  • Internal support queues (agent-to-agent)
  • Short calls (<60 seconds - insufficient time for PII collection)
  • Calls where IVR handled payment via secure pause (PCI-compliant secure pause recordings already redacted at capture time)
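
One way to make the tiers above machine-readable is a small classifier. This is a minimal sketch: it assumes queue names contain recognizable keywords, and the keyword lists, the `secure_pause_used` flag, and the function name are hypothetical.

```python
# Hypothetical keyword lists; adapt them to your own queue naming scheme.
TIER_KEYWORDS = {
    1: ("payment", "billing", "insurance", "health", "account"),
    2: ("service", "sales"),
}


def assign_risk_tier(queue_name: str, duration_seconds: float,
                     secure_pause_used: bool = False) -> int:
    """Map a recording to risk tier 1 (process first) through 3 (last or sample only)."""
    # Very short calls and secure-pause payments fall into Tier 3 regardless of queue
    if duration_seconds < 60 or secure_pause_used:
        return 3
    name = queue_name.lower()
    for tier in (1, 2):
        if any(keyword in name for keyword in TIER_KEYWORDS[tier]):
            return tier
    return 3  # internal or unclassified queues


print(assign_risk_tier("Billing Support", 300))
print(assign_risk_tier("General Customer Service", 200))
```

Keyword matching is only a starting point. Queues where agents read back sensitive data usually have to be listed by hand.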

Query the Genesys Cloud Analytics API to build your prioritized processing queue:

def build_processing_queue(
    priority_queue_ids: list[str],
    start_date: str,
    end_date: str,
    access_token: str,
    base_url: str,
    already_processed_ids: set
) -> list[dict]:
    """
    Returns ordered list of recordings to process, highest-risk queues first.
    """
    all_recordings = []
    
    for queue_id in priority_queue_ids:
        conversations = query_conversations_in_date_range(
            queue_id=queue_id,
            start=start_date,
            end=end_date,
            min_duration_seconds=60,
            access_token=access_token,
            base_url=base_url
        )
        
        for conv in conversations:
            if conv["conversationId"] not in already_processed_ids:
                all_recordings.append({
                    "conversationId": conv["conversationId"],
                    "queueId": queue_id,
                    "durationSeconds": conv.get("durationSeconds", 0),
                    "capturedAt": conv["conversationStart"]
                })
    
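    # Sort: queue priority first (the order of priority_queue_ids), then longer
    # calls first within each queue (more likely to contain full PII sequences)
    queue_rank = {qid: rank for rank, qid in enumerate(priority_queue_ids)}
    return sorted(
        all_recordings,
        key=lambda x: (queue_rank[x["queueId"]], -x["durationSeconds"])
    )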
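
The listing above calls a query helper that is not defined in this guide. The following is a hedged sketch of what such a helper might look like, assuming the conversation details query endpoint (POST /api/v2/analytics/conversations/details/query) and a caller-supplied `post_json` function that performs the authenticated request. The function names, the paging loop, and the derived `durationSeconds` field are illustrative assumptions, so check the request body against the current API reference before relying on it.

```python
from datetime import datetime


def build_details_query(queue_id: str, start: str, end: str,
                        page_number: int, page_size: int = 100) -> dict:
    """Request body for the conversation details query (shape assumed, see lead-in)."""
    return {
        "interval": f"{start}/{end}",  # ISO-8601 interval
        "order": "asc",
        "orderBy": "conversationStart",
        "paging": {"pageSize": page_size, "pageNumber": page_number},
        "segmentFilters": [{
            "type": "and",
            "predicates": [{
                "type": "dimension",
                "dimension": "queueId",
                "operator": "matches",
                "value": queue_id,
            }],
        }],
    }


def duration_seconds(conv: dict) -> float:
    """Derive call length from conversationStart/conversationEnd timestamps."""
    def parse(ts: str) -> datetime:
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))
    if not conv.get("conversationEnd"):
        return 0.0  # conversation still in progress
    return (parse(conv["conversationEnd"]) - parse(conv["conversationStart"])).total_seconds()


def query_conversations(queue_id, start, end, post_json, min_duration_seconds=60):
    """Page through results; post_json(body) -> dict is a caller-supplied HTTP helper."""
    page, results = 1, []
    while True:
        data = post_json(build_details_query(queue_id, start, end, page))
        batch = data.get("conversations") or []
        if not batch:
            break
        for conv in batch:
            conv["durationSeconds"] = duration_seconds(conv)
            if conv["durationSeconds"] >= min_duration_seconds:
                results.append(conv)
        page += 1
    return results
```

Injecting `post_json` keeps authentication, retries, and rate limiting out of the paging logic and makes the helper easy to exercise with canned responses.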

2. Downloading Recordings from Genesys Cloud

import requests
import tempfile
import os

def download_recording(
    conversation_id: str,
    recording_id: str,
    access_token: str,
    base_url: str
) -> tuple[bytes, str]:
    """
    Downloads the recording audio.
    Returns (audio_bytes, content_type).
    """
    headers = {"Authorization": f"Bearer {access_token}"}
    
    # Get download URL (it's time-limited, ~5 minutes)
    meta_resp = requests.get(
        f"{base_url}/api/v2/conversations/{conversation_id}/recordings/{recording_id}",
        headers=headers,
        params={"formatId": "WAV"}  # Request WAV for broadest compatibility
    )
    meta_resp.raise_for_status()
    meta = meta_resp.json()
    
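    # mediaUris is expected to be a map keyed by channel/index ("0", "1", ...)
    # whose entries carry a "mediaUri" (assumption: verify against your API
    # response). A list shape is handled defensively as well.
    media_uris = meta.get("mediaUris") or {}
    entries = list(media_uris.values()) if isinstance(media_uris, dict) else list(media_uris)
    download_url = next(
        (e.get("mediaUri") or e.get("uri") for e in entries if isinstance(e, dict)),
        None
    )
    if not download_url:
        # Fall back to a direct media URL if the response carries one
        media = meta.get("media") or [{}]
        download_url = media[0].get("downloadURL")
    
    if not download_url:
        raise ValueError(f"No download URL for recording {recording_id}")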
    
    audio_resp = requests.get(download_url, stream=True)
    audio_resp.raise_for_status()
    
    content_type = audio_resp.headers.get("Content-Type", "audio/wav")
    audio_bytes = audio_resp.content
    
    return audio_bytes, content_type

The Trap - download URLs expiring before processing: Genesys Cloud recording download URLs are pre-signed and valid only for a short window (typically 5 minutes). If your pipeline obtains the URL, queues it for later processing, and the processor picks it up 20 minutes later, the URL has expired. Download the audio bytes immediately after obtaining the URL, and queue the stored audio rather than the URL.
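
A minimal sketch of that pattern: resolve the URL, download, and persist the audio in a single step, then hand only a durable storage key to the queue. The three callables, the `staging/` key layout, and the function name are hypothetical stand-ins for your own Genesys client and object store.

```python
import hashlib
from typing import Callable


def stage_recording(
    recording_id: str,
    get_download_url: Callable[[str], str],
    fetch_bytes: Callable[[str], bytes],
    store: Callable[[str, bytes], None],
) -> dict:
    """Resolve the URL, download, and persist the audio in one step.

    Only the durable storage key is returned for queueing; the short-lived
    URL never leaves this function.
    """
    url = get_download_url(recording_id)   # pre-signed, short-lived
    audio = fetch_bytes(url)               # download right away
    key = f"staging/{recording_id}.wav"    # hypothetical key layout
    store(key, audio)
    return {
        "recordingId": recording_id,
        "stagedKey": key,
        "sha256": hashlib.sha256(audio).hexdigest(),
        "sizeBytes": len(audio),
    }
```

The checksum lets a later stage confirm that the bytes it reads back are the bytes that were downloaded.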


3. PII Detection: Multi-Layer Approach

Use three complementary detection methods in a pipeline:

Layer 1: Amazon Transcribe with PII Identification (fast, structured PII)

import boto3
import time
import json

transcribe = boto3.client("transcribe", region_name="us-east-1")
s3 = boto3.client("s3")

def transcribe_with_pii_detection(
    audio_s3_uri: str,
    job_name: str,
    language_code: str = "en-US"
) -> list[dict]:
    """
    Run transcription with built-in PII identification.
    Returns a list of time-stamped PII segments.
    """
    transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={"MediaFileUri": audio_s3_uri},
        MediaFormat="wav",
        LanguageCode=language_code,
        ContentRedaction={
            "RedactionType": "PII",
            "RedactionOutput": "redacted_and_unredacted",  # Get both for audit comparison
            # Date of birth is not among the entity types Transcribe accepts
            # here; catch dates of birth in a later detection layer.
            "PiiEntityTypes": [
                "CREDIT_DEBIT_NUMBER",
                "CREDIT_DEBIT_CVV",
                "CREDIT_DEBIT_EXPIRY",
                "SSN",
                "PHONE",
                "NAME",
                "ADDRESS",
                "BANK_ACCOUNT_NUMBER",
                "BANK_ROUTING"
            ]
        },
        Settings={
            # Channel identification and speaker labels cannot be combined in
            # one request; channel identification keeps the agent and customer
            # channels separate.
            "ChannelIdentification": True
        }
    )
    
    # Poll for completion, giving up after a fixed ceiling
    max_wait = 1800  # 30 minutes
    start = time.time()
    
    while (time.time() - start) < max_wait:
        job = transcribe.get_transcription_job(TranscriptionJobName=job_name)
        status = job["TranscriptionJob"]["TranscriptionJobStatus"]
        
        if status == "COMPLETED":
            # Parse PII timestamps from the completed job's transcript
            return extract_pii_timestamps(job)
        elif status == "FAILED":
            raise RuntimeError(f"Transcription job failed: {job['TranscriptionJob'].get('FailureReason')}")
        
        time.sleep(30)
    
    raise TimeoutError(f"Transcription job {job_name} timed out")

def extract_pii_timestamps(job: dict) -> list[dict]:
    """Extract time-stamped PII segments from Transcribe output."""
    pii_segments = []
    
    # Read the redacted transcript. Assumption to verify against a sample from
    # your account: a redacted word keeps its timestamps, and its content is a
    # bracketed placeholder such as "[PII]".
    transcript_uri = job["TranscriptionJob"]["Transcript"]["RedactedTranscriptFileUri"]
    resp = requests.get(transcript_uri, timeout=60)
    resp.raise_for_status()
    transcript_data = resp.json()
    
    for item in transcript_data.get("results", {}).get("items", []):
        if item.get("type") != "pronunciation":
            continue
        content = item.get("alternatives", [{}])[0].get("content", "")
        if content.startswith("[") and content.endswith("]"):
            pii_segments.append({
                "startTime": float(item.get("start_time", 0)),
                "endTime": float(item.get("end_time", 0)) + 0.2,  # +200ms buffer
                "piiType": [content.strip("[]")],
                "channel": item.get("channel_label", "ch_0")
            })
    
    return pii_segments

Layer 2: Regex-Based Pattern Matching on Transcript Text

Catches patterns Amazon Transcribe’s PII model misses (medical record numbers, employee IDs, custom organizational identifiers):

import re

CUSTOM_PII_PATTERNS = {
    "EMPLOYEE_ID": r"\b[Ee][Mm][Pp][-\s]?\d{6,8}\b",
    "MEDICAL_RECORD_NUMBER": r"\b[Mm][Rr][Nn][-:\s]?\d{6,10}\b",
    "POLICY_NUMBER": r"\bPOL[-\s]?\d{8,12}\b",
    "PASSPORT": r"\b[A-Z]{2}\d{7}\b",  # Simplified - use country-specific patterns
    "TAX_ID": r"\b\d{2}[-\s]?\d{7}\b"  # EIN format
}

def find_custom_pii_in_transcript(transcript_items: list[dict]) -> list[dict]:
    """Find custom PII patterns in transcript word items."""
    custom_segments = []
    
    # Pair each spoken word with its timestamps
    words = [
        {
            "word": item.get("alternatives", [{}])[0].get("content", ""),
            "startTime": float(item.get("start_time", 0)),
            "endTime": float(item.get("end_time", 0))
        }
        for item in transcript_items
        if item.get("type") == "pronunciation"
    ]
    
    # Join the words into one string, remembering each word's character span,
    # so a regex match maps back to exactly the words it covers (masking a
    # whole fixed-size window around every match would over-redact)
    spans, pos = [], 0
    for w in words:
        spans.append((pos, pos + len(w["word"])))
        pos += len(w["word"]) + 1  # +1 for the joining space
    text = " ".join(w["word"] for w in words)
    
    for pattern_name, pattern in CUSTOM_PII_PATTERNS.items():
        for match in re.finditer(pattern, text, re.IGNORECASE):
            hit = [w for w, (s, e) in zip(words, spans)
                   if s < match.end() and e > match.start()]
            if not hit:
                continue
            custom_segments.append({
                "startTime": max(0.0, hit[0]["startTime"] - 0.1),
                "endTime": hit[-1]["endTime"] + 0.2,
                "piiType": [pattern_name],
                "channel": "all"
            })
    
    return custom_segments

Layer 3: Spoken Number Sequence Detection (for card numbers)

def detect_spoken_card_number_sequences(transcript_items: list[dict]) -> list[dict]:
    """
    Detect runs of 15 or more consecutive spoken digits (scanning at most
    20 words per run), which likely indicate a card number.
    """
    NUMBER_WORDS = {
        "zero": 0, "oh": 0, "one": 1, "two": 2, "three": 3, "four": 4,
        "five": 5, "six": 6, "seven": 7, "eight": 8, "nine": 9
    }
    
    words = [
        {
            "word": item["alternatives"][0]["content"].lower(),
            "startTime": float(item.get("start_time", 0)),
            "endTime": float(item.get("end_time", 0))
        }
        for item in transcript_items
        if item.get("type") == "pronunciation"
    ]
    
    sequences = []
    i = 0
    while i < len(words):
        if words[i]["word"] in NUMBER_WORDS:
            # Start of a potential number sequence
            seq_start = i
            seq_words = [words[i]]
            j = i + 1
            
            while j < len(words) and words[j]["word"] in NUMBER_WORDS and j - i < 20:
                seq_words.append(words[j])
                j += 1
            
            if len(seq_words) >= 15:  # 15+ consecutive number words = likely card number
                sequences.append({
                    "startTime": seq_words[0]["startTime"] - 0.2,
                    "endTime": seq_words[-1]["endTime"] + 0.3,
                    "piiType": ["SPOKEN_CARD_NUMBER"],
                    "channel": "all"
                })
            
            i = j
        else:
            i += 1
    
    return sequences
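
Speech-to-text engines often emit numerals ("4111") instead of number words ("four one one one"), and the word-based detector above would not count those. The sketch below is a hedged variant that counts digits across both forms. The helper names and the 15 to 19 digit bounds are assumptions to tune for your card portfolio.

```python
import re

NUMBER_WORDS = {
    "zero": "0", "oh": "0", "one": "1", "two": "2", "three": "3", "four": "4",
    "five": "5", "six": "6", "seven": "7", "eight": "8", "nine": "9",
}


def token_to_digits(token: str) -> str:
    """Return the digits a token contributes, or '' if it is not numeric."""
    cleaned = token.lower().strip(".,;:!?")
    if cleaned in NUMBER_WORDS:
        return NUMBER_WORDS[cleaned]
    # Accept numerals with common separators, e.g. "4111" or "4111-1111"
    if re.fullmatch(r"\d[\d\s-]*", cleaned):
        return re.sub(r"\D", "", cleaned)
    return ""


def find_digit_runs(words: list[dict], min_digits: int = 15,
                    max_digits: int = 19) -> list[dict]:
    """Find runs of consecutive numeric tokens totalling min_digits..max_digits digits."""
    runs, i = [], 0
    while i < len(words):
        digits, j = "", i
        while j < len(words):
            piece = token_to_digits(words[j]["word"])
            if not piece:
                break
            digits += piece
            j += 1
        if min_digits <= len(digits) <= max_digits:
            runs.append({
                "startTime": max(0.0, words[i]["startTime"] - 0.2),
                "endTime": words[j - 1]["endTime"] + 0.3,
                "piiType": ["SPOKEN_CARD_NUMBER"],
                "channel": "all",
            })
        i = max(j, i + 1)  # skip past the run, or advance one word
    return runs
```

The `words` argument uses the same word/startTime/endTime dictionaries built in the listing above. Filler words between digits ("um", "and") still break a run in this sketch.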

4. Audio Masking: Replacing PII Segments

After collecting all PII timestamps across all three detection layers, merge overlapping segments and apply masking:

from pydub import AudioSegment
import io

def merge_overlapping_segments(segments: list[dict], buffer_seconds: float = 0.1) -> list[dict]:
    """Merge overlapping time segments with an optional buffer."""
    if not segments:
        return []
    
    sorted_segs = sorted(segments, key=lambda x: x["startTime"])
    merged = [sorted_segs[0].copy()]
    
    for seg in sorted_segs[1:]:
        last = merged[-1]
        if seg["startTime"] <= last["endTime"] + buffer_seconds:
            # Overlapping - extend the current segment
            last["endTime"] = max(last["endTime"], seg["endTime"])
            last["piiType"] = list(set(last["piiType"] + seg["piiType"]))
        else:
            merged.append(seg.copy())
    
    return merged

def mask_audio_segments(
    audio_bytes: bytes,
    pii_segments: list[dict],
    audio_format: str = "wav",
    mask_type: str = "silence"  # or "tone"
) -> bytes:
    """Replace PII segments with silence or a 1kHz tone."""
    audio = AudioSegment.from_file(io.BytesIO(audio_bytes), format=audio_format)
    total_duration_ms = len(audio)
    
    merged_segments = merge_overlapping_segments(pii_segments)
    
    # Process in reverse order to preserve timestamp accuracy
    for seg in sorted(merged_segments, key=lambda x: x["startTime"], reverse=True):
        start_ms = max(0, int(seg["startTime"] * 1000))
        end_ms = min(total_duration_ms, int(seg["endTime"] * 1000))
        duration_ms = end_ms - start_ms
        
        if duration_ms <= 0:
            continue
        
        if mask_type == "tone":
            # 1kHz tone (commonly used by broadcasters for "bleep" censorship)
            import numpy as np
            sample_rate = audio.frame_rate
            n_samples = int(sample_rate * duration_ms / 1000)
            t = np.arange(n_samples) / sample_rate
            tone_data = (np.sin(2 * np.pi * 1000 * t) * 32767 * 0.3).astype(np.int16)
            # Build the tone as mono, then match the source channel layout;
            # passing mono samples with channels=2 would halve the tone's duration
            replacement = AudioSegment(
                tone_data.tobytes(),
                frame_rate=sample_rate,
                sample_width=2,
                channels=1
            ).set_channels(audio.channels)
        else:
            replacement = AudioSegment.silent(duration=duration_ms, frame_rate=audio.frame_rate)
        
        audio = audio[:start_ms] + replacement + audio[end_ms:]
    
    output = io.BytesIO()
    audio.export(output, format="wav")
    return output.getvalue()

5. Storing Masked Recordings and Redaction Registry

Store the masked recording alongside the original (preserving the original for legal hold purposes) with a clear separation:

s3://recordings-original/    <- Access restricted to Legal/DPO only
s3://recordings-masked/      <- Access for analytics, QA, external auditors

Write a redaction registry entry for each processed recording:

from datetime import datetime, timedelta, timezone

def register_redaction(
    conversation_id: str,
    recording_id: str,
    pii_segments: list[dict],
    masked_s3_key: str,
    dynamodb_table
):
    # Pass the merged segments so overlapping detections are not
    # double-counted in totalMaskedDurationMs
    now = datetime.now(timezone.utc)
    dynamodb_table.put_item(Item={
        "recordingId": recording_id,
        "conversationId": conversation_id,
        "processedAt": now.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "piiSegmentsFound": len(pii_segments),
        "piiTypes": list(set(t for seg in pii_segments for t in seg["piiType"])),
        "totalMaskedDurationMs": sum(
            int((seg["endTime"] - seg["startTime"]) * 1000) for seg in pii_segments
        ),
        "maskedAudioS3Key": masked_s3_key,
        "pipelineVersion": "3.1.0",
        "ttl": int((now + timedelta(days=2555)).timestamp())  # 7-year retention
    })

Validation, Edge Cases & Troubleshooting

Edge Case 1: Dual-Channel Recordings Where Only One Channel Contains PII

When the customer speaks their card number, the PII sits on the customer channel (channel 0). When the agent reads it back (“I confirm the last 4 digits are…”), the PII sits on the agent channel (channel 1). Set ChannelIdentification: True in Transcribe and apply masking per channel, so that PII on one channel does not silence the whole recording. This preserves more of the agent’s voice for QA evaluation purposes.
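
Per-channel masking can be sketched with the standard library alone. This is a minimal example assuming interleaved 16-bit PCM samples (the payload of a typical stereo WAV file). The function name is illustrative, and the mapping from a transcript channel label to `channel_index` is an assumption you need to confirm for your recordings.

```python
import array


def silence_channel(pcm: bytes, channels: int, frame_rate: int,
                    channel_index: int, start_s: float, end_s: float) -> bytes:
    """Zero out one channel of interleaved 16-bit PCM between start_s and end_s."""
    samples = array.array("h")  # signed 16-bit samples
    samples.frombytes(pcm)
    total_frames = len(samples) // channels
    first = max(0, int(start_s * frame_rate))
    last = min(total_frames, int(end_s * frame_rate))
    for frame in range(first, last):
        # Samples are interleaved: frame 0 ch 0, frame 0 ch 1, frame 1 ch 0, ...
        samples[frame * channels + channel_index] = 0
    return samples.tobytes()
```

The other channel is copied through untouched, so the agent's side of the call remains available for QA while the customer's card number is silenced.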

Edge Case 2: Accent and Non-Standard Pronunciation Reducing Detection Accuracy

Amazon Transcribe’s PII detection accuracy drops for non-native English speakers and regional accents. A customer with a heavy accent saying their SSN may generate transcription errors that the regex misses. For accented populations, supplement AWS Transcribe with Google Cloud Speech-to-Text (which often performs better on certain accents) and run both in parallel, using the union of detected PII segments. The cost increases but the detection coverage improves.
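
Taking the union of two engines' detections can be sketched as below. It assumes each engine's output has already been normalized into the segment dictionaries used in this guide, plus a hypothetical `sources` list recording which engine found the segment.

```python
def union_segments(primary: list[dict], secondary: list[dict],
                   gap_seconds: float = 0.1) -> list[dict]:
    """Union PII segments from two engines, merging any that overlap or nearly touch."""
    combined = sorted(primary + secondary, key=lambda s: s["startTime"])
    merged: list[dict] = []
    for seg in combined:
        if merged and seg["startTime"] <= merged[-1]["endTime"] + gap_seconds:
            last = merged[-1]
            last["endTime"] = max(last["endTime"], seg["endTime"])
            last["piiType"] = sorted(set(last["piiType"]) | set(seg["piiType"]))
            last["sources"] = sorted(set(last["sources"]) | set(seg["sources"]))
        else:
            # Copy so the callers' input lists are never mutated
            merged.append({**seg, "piiType": list(seg["piiType"]),
                           "sources": list(seg["sources"])})
    return merged
```

Keeping `sources` on each merged segment shows how often the second engine catches something the first missed, which helps justify (or retire) the extra cost.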

Edge Case 3: Processing Rate Limitations

AWS service quotas cap the number of concurrent Transcribe jobs (100 in this example). For a corpus of 1 million recordings, processing at 100 concurrent jobs takes approximately 30 days. Request a Transcribe service quota increase via AWS Support before starting bulk processing. Plan the bulk scan to run over weeks, processing priority queues first, to surface the highest-risk recordings early while the full scan continues.
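
The 30-day figure follows from simple throughput arithmetic. The sketch below assumes an average of about 4.3 minutes of wall-clock time per job (queueing, transcription, and polling combined). That number is an assumption chosen to match the estimate above, so measure your own average on a pilot batch.

```python
def estimate_processing_days(recording_count: int, concurrent_jobs: int,
                             avg_job_minutes: float) -> float:
    """Wall-clock days to process a corpus at a fixed level of concurrency."""
    total_job_minutes = recording_count * avg_job_minutes
    return total_job_minutes / concurrent_jobs / (60 * 24)


baseline = estimate_processing_days(1_000_000, 100, 4.3)
# Hypothetical quota increase to 500 concurrent jobs
after_increase = estimate_processing_days(1_000_000, 500, 4.3)
print(round(baseline, 1), round(after_increase, 1))
```

Because the duration scales inversely with concurrency, a quota increase is usually the cheapest way to shorten the scan.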

Edge Case 4: Recordings Containing Silence Segments Before Masking

If the original recording already has long silences (for example, while the agent has the customer on hold), those stretches can dominate the waveform and be mistaken for masked segments. Tag original-silence segments in the transcript (time ranges with no transcription content) and exclude them from the PII masking pass - they contain no speech to redact. Only apply new silence where transcription confirmed PII speech.


Official References