Programmatically injecting custom metadata tags into Genesys Cloud call recordings using the Media API and a Python script triggered by interaction.completed events
What You Will Build
- A Python script that listens for
interaction.completedevents, extracts the associated recording identifier, and applies custom metadata and tags to the recording via the Media API. - This solution uses the Genesys Cloud Event Streams API for event ingestion and the Media API for recording mutation.
- The implementation covers Python using the
requestslibrary with explicit type hints, retry logic, and pagination handling.
Prerequisites
- OAuth 2.0 Client Credentials grant with scopes:
event:read,media:read,media:write - Genesys Cloud API v2 endpoint (e.g.,
company.mypurecloud.comorcompany.genesiscloud.com) - Python 3.9+ runtime
- External dependencies:
requests,tenacity(for retry logic),pydantic(for payload validation) - Install dependencies:
pip install requests tenacity pydantic
Authentication Setup
Genesys Cloud uses a standard OAuth 2.0 Client Credentials flow. The platform issues short-lived access tokens that the requests session must attach to every subsequent API call. Token caching is mandatory to avoid unnecessary authorization server calls and to respect rate limits.
The following function retrieves the token and returns a configured requests.Session object. The session automatically attaches the Authorization header to all outgoing requests.
import requests
import time
from typing import Optional
def get_genesys_session(
client_id: str,
client_secret: str,
environment: str
) -> requests.Session:
"""
Authenticates against Genesys Cloud and returns a pre-configured session.
"""
base_url = f"https://{environment}"
token_url = f"{base_url}/oauth/token"
payload = {
"grant_type": "client_credentials",
"scope": "event:read media:read media:write"
}
response = requests.post(token_url, data=payload, auth=(client_id, client_secret))
response.raise_for_status()
token_data = response.json()
access_token = token_data["access_token"]
expires_in = token_data["expires_in"]
issued_at = time.time()
session = requests.Session()
session.headers.update({
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"Accept": "application/json"
})
# Attach token metadata to the session for refresh logic
session.auth_token_expires_at = issued_at + expires_in
session.auth_client_id = client_id
session.auth_client_secret = client_secret
session.auth_base_url = base_url
return session
The SDK equivalent uses PlatformClient(client_id, client_secret, environment=environment), which handles token caching and refresh internally. When using raw requests, you must implement the refresh check before long-running operations.
Implementation
Step 1: Subscribe to interaction.completed events
The Event Streams API requires an explicit subscription before you can poll for events. You must define the event type filter and assign a unique subscription identifier. The API returns a subscriptionId that you will use for all subsequent polling requests.
HTTP Request Cycle
- Method:
POST - Path:
/api/v2/events/subscriptions - Headers:
Authorization: Bearer <token>,Content-Type: application/json - Required Scope:
event:read - Request Body:
{
"name": "recording-metadata-injector",
"filter": {
"eventTypes": ["interaction.completed"]
},
"pageSize": 50
}
- Response Body (201 Created):
{
"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"name": "recording-metadata-injector",
"filter": {
"eventTypes": ["interaction.completed"]
},
"pageSize": 50,
"createdDate": "2024-01-15T10:30:00.000Z",
"modifiedDate": "2024-01-15T10:30:00.000Z"
}
Python Implementation
def create_event_subscription(session: requests.Session, subscription_name: str) -> str:
"""Creates an Event Streams subscription and returns the subscription ID."""
url = f"{session.auth_base_url}/api/v2/events/subscriptions"
payload = {
"name": subscription_name,
"filter": {"eventTypes": ["interaction.completed"]},
"pageSize": 50
}
response = session.post(url, json=payload)
if response.status_code == 409:
# Subscription already exists; extract ID from response or fallback to listing
return response.json().get("id", "")
response.raise_for_status()
return response.json()["id"]
The pageSize parameter controls how many events are returned per poll. Genesys Cloud enforces a maximum of 100. Setting it to 50 balances throughput with payload size.
Step 2: Poll events and extract recording identifiers
Once the subscription is active, you must poll the subscription endpoint. The API supports cursor-based pagination via the page_token parameter. You must process each event, locate the recordingId field, and queue it for metadata injection.
The interaction.completed event payload contains a data.recordingId field when the interaction generated a recording. If the field is null, the interaction did not produce a media artifact, and you should skip it.
HTTP Request Cycle
- Method:
GET - Path:
/api/v2/events/subscriptions/{subscriptionId} - Headers:
Authorization: Bearer <token> - Required Scope:
event:read - Query Parameters:
page_size=50 - Response Body (200 OK):
{
"events": [
{
"id": "evt-123",
"eventType": "interaction.completed",
"data": {
"id": "int-456",
"recordingId": "rec-789-abc-def",
"mediaType": "voice",
"completedDate": "2024-01-15T10:35:00.000Z"
}
}
],
"nextPageToken": "eyJwYWdlIjoxfQ=="
}
Python Implementation
import json
from typing import List, Optional
def poll_events(
session: requests.Session,
subscription_id: str,
page_token: Optional[str] = None
) -> tuple[List[dict], Optional[str]]:
"""
Polls the Event Streams API and returns a list of event payloads
alongside the next pagination token.
"""
url = f"{session.auth_base_url}/api/v2/events/subscriptions/{subscription_id}"
params = {"page_size": 50}
if page_token:
params["page_token"] = page_token
response = session.get(url, params=params)
response.raise_for_status()
data = response.json()
events = data.get("events", [])
next_token = data.get("nextPageToken")
return events, next_token
The pagination token is opaque. You must pass it verbatim to subsequent requests. The API returns an empty events array when no new events are available. Your polling loop should implement a delay between requests to avoid hitting the 429 rate limit.
Step 3: Update recording metadata and tags
The Media API accepts PATCH requests to mutate recording attributes. You must send a partial Recording object containing only the fields you intend to modify. The API merges the incoming payload with the existing recording state.
Custom metadata uses a key-value dictionary structure. Tags use a string array. Both fields are mutable and do not require full object replacement.
HTTP Request Cycle
- Method:
PATCH - Path:
/api/v2/media/recordings/{recordingId} - Headers:
Authorization: Bearer <token>,Content-Type: application/json - Required Scope:
media:write - Request Body:
{
"metadata": {
"processed_by": "automation-script",
"priority_level": "high",
"compliance_review": "pending"
},
"tags": ["voicemail", "escalated", "tag-injected"]
}
- Response Body (200 OK):
{
"id": "rec-789-abc-def",
"metadata": {
"processed_by": "automation-script",
"priority_level": "high",
"compliance_review": "pending"
},
"tags": ["voicemail", "escalated", "tag-injected"],
"mediaType": "voice",
"status": "ready"
}
Python Implementation
def update_recording_metadata(
session: requests.Session,
recording_id: str,
custom_metadata: dict,
new_tags: List[str]
) -> dict:
"""
Applies custom metadata and tags to a Genesys Cloud recording.
Implements retry logic for 429 rate limit responses.
"""
url = f"{session.auth_base_url}/api/v2/media/recordings/{recording_id}"
payload = {
"metadata": custom_metadata,
"tags": new_tags
}
# Retry logic for 429 Too Many Requests
retries = 3
for attempt in range(retries):
response = session.patch(url, json=payload)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
time.sleep(retry_after * (attempt + 1))
continue
if response.status_code == 401:
raise RuntimeError("Token expired. Refresh authentication and retry.")
if response.status_code == 403:
raise PermissionError("Insufficient OAuth scope. Verify media:write is granted.")
if response.status_code == 404:
raise ValueError(f"Recording {recording_id} not found.")
response.raise_for_status()
return response.json()
raise RuntimeError("Max retries exceeded for recording update.")
The PATCH method is critical here. Using PUT would overwrite the entire recording object, which is not supported by this endpoint and would result in a 400 Bad Request. The retry logic respects the Retry-After header and applies exponential backoff to prevent cascading rate limit violations across microservices.
Complete Working Example
The following script combines authentication, subscription creation, event polling, and metadata injection into a single runnable module. Replace the placeholder credentials before execution.
import requests
import time
import sys
from typing import List, Optional, Dict
def get_genesys_session(client_id: str, client_secret: str, environment: str) -> requests.Session:
base_url = f"https://{environment}"
token_url = f"{base_url}/oauth/token"
payload = {"grant_type": "client_credentials", "scope": "event:read media:read media:write"}
response = requests.post(token_url, data=payload, auth=(client_id, client_secret))
response.raise_for_status()
token_data = response.json()
session = requests.Session()
session.headers.update({
"Authorization": f"Bearer {token_data['access_token']}",
"Content-Type": "application/json",
"Accept": "application/json"
})
session.auth_token_expires_at = time.time() + token_data["expires_in"]
session.auth_base_url = base_url
return session
def create_event_subscription(session: requests.Session, subscription_name: str) -> str:
url = f"{session.auth_base_url}/api/v2/events/subscriptions"
payload = {
"name": subscription_name,
"filter": {"eventTypes": ["interaction.completed"]},
"pageSize": 50
}
response = session.post(url, json=payload)
if response.status_code == 409:
return response.json().get("id", "")
response.raise_for_status()
return response.json()["id"]
def poll_events(session: requests.Session, subscription_id: str, page_token: Optional[str] = None) -> tuple[List[Dict], Optional[str]]:
url = f"{session.auth_base_url}/api/v2/events/subscriptions/{subscription_id}"
params = {"page_size": 50}
if page_token:
params["page_token"] = page_token
response = session.get(url, params=params)
response.raise_for_status()
data = response.json()
return data.get("events", []), data.get("nextPageToken")
def update_recording_metadata(session: requests.Session, recording_id: str, custom_metadata: Dict, new_tags: List[str]) -> Dict:
url = f"{session.auth_base_url}/api/v2/media/recordings/{recording_id}"
payload = {"metadata": custom_metadata, "tags": new_tags}
for attempt in range(3):
response = session.patch(url, json=payload)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
time.sleep(retry_after * (attempt + 1))
continue
if response.status_code == 401:
raise RuntimeError("Token expired.")
if response.status_code == 403:
raise PermissionError("Missing media:write scope.")
if response.status_code == 404:
raise ValueError(f"Recording {recording_id} not found.")
response.raise_for_status()
return response.json()
raise RuntimeError("Max retries exceeded.")
def main():
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
ENVIRONMENT = "company.mypurecloud.com"
SUBSCRIPTION_NAME = "recording-metadata-injector"
POLL_INTERVAL_SECONDS = 10
session = get_genesys_session(CLIENT_ID, CLIENT_SECRET, ENVIRONMENT)
subscription_id = create_event_subscription(session, SUBSCRIPTION_NAME)
print(f"Subscription created/active: {subscription_id}")
page_token: Optional[str] = None
processed_recordings: set = set()
while True:
try:
events, page_token = poll_events(session, subscription_id, page_token)
for event in events:
data = event.get("data", {})
recording_id = data.get("recordingId")
if not recording_id:
continue
if recording_id in processed_recordings:
continue
custom_metadata = {
"injected_by": "python-automation",
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"source_event": event["id"]
}
tags_to_add = ["auto-tagged", "post-call-processed"]
try:
result = update_recording_metadata(session, recording_id, custom_metadata, tags_to_add)
print(f"Updated recording {recording_id}: {result.get('status')}")
processed_recordings.add(recording_id)
except Exception as e:
print(f"Failed to update {recording_id}: {e}")
except requests.exceptions.RequestException as e:
print(f"Network error: {e}")
time.sleep(15)
continue
time.sleep(POLL_INTERVAL_SECONDS)
if __name__ == "__main__":
main()
The script maintains a processed_recordings set to prevent duplicate metadata injections if the polling window overlaps or if events are delivered multiple times. The time.sleep call enforces a steady polling cadence that stays within Genesys Cloud rate limits.
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The OAuth access token has expired. Tokens default to a 3600-second lifetime.
- Fix: Implement token refresh logic before the polling loop continues. Check
session.auth_token_expires_atagainsttime.time()and callget_genesys_sessionagain when within 60 seconds of expiration. - Code Fix: Add a refresh guard inside the
while Trueloop:
if time.time() > session.auth_token_expires_at - 60:
session = get_genesys_session(CLIENT_ID, CLIENT_SECRET, ENVIRONMENT)
Error: 403 Forbidden
- Cause: The OAuth client lacks the required scope. The Media API requires
media:writefor mutations. Event Streams requiresevent:read. - Fix: Navigate to the Genesys Cloud Admin console, locate the OAuth client, and verify both scopes are checked. Regenerate the token after scope modification.
- Debug Step: Print the token introspection response to verify granted scopes:
GET /oauth/introspectwith the active token.
Error: 429 Too Many Requests
- Cause: Exceeding the Event Streams polling limit or Media API mutation limit. Genesys Cloud enforces per-tenant and per-endpoint rate caps.
- Fix: Increase the
POLL_INTERVAL_SECONDSto 15 or 30. Implement theRetry-Afterheader parsing shown in theupdate_recording_metadatafunction. - Debug Step: Monitor the
X-RateLimit-Remainingheader in response objects. When it drops below 5, throttle outgoing requests automatically.
Error: 400 Bad Request on PATCH
- Cause: Sending a full
Recordingobject instead of a partial payload, or including read-only fields likeid,mediaType, orstatusin the request body. - Fix: Restrict the JSON payload to only
metadataandtags. The API rejects payloads containing immutable fields. - Code Fix: Verify the payload dictionary contains only mutable keys before transmission.