Programmatically Control Call Recording States via the Genesys Cloud API
What You Will Build
- A Python script that programmatically starts and stops recording on active phone conversations.
- This tutorial uses the Genesys Cloud CX Recording API (/api/v2/recording/jobs).
- The implementation is demonstrated in Python using the genesyscloud SDK and requests for direct HTTP calls.
Prerequisites
- OAuth Client Type: A Genesys Cloud API Client with the client_credentials grant type.
- Required Scopes:
  - recording:job:control (Required to start/stop recordings)
  - recording:job:view (Required to list and check recording status)
  - conversation:phone:view (Optional, used here to identify active calls if not known via other means)
- SDK Version: genesyscloud Python SDK v10.0.0 or later.
- Language/Runtime: Python 3.9+
- External Dependencies:
  - genesyscloud
  - requests
  - python-dotenv (for secure credential management)
Authentication Setup
Genesys Cloud uses OAuth 2.0 for authentication. For server-to-server integrations, the client_credentials flow is the standard choice. The token typically expires after 24 hours; the expires_in field of the token response gives the exact lifetime in seconds. Cache the token and request a new one only when it has expired or when a request returns 401 Unauthorized.
Below is a helper class that manages the token lifecycle. This is not part of the official SDK but is a robust pattern for production applications.
import os
from datetime import datetime, timedelta

import requests
from dotenv import load_dotenv

load_dotenv()


class GenesysAuth:
    def __init__(self):
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
        # API host, used for all /api/v2 calls
        self.base_url = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
        # Tokens are issued by the region's login host, not the API host.
        # GENESYS_LOGIN_URL is an assumed variable name for this tutorial.
        self.login_url = os.getenv("GENESYS_LOGIN_URL", "https://login.mypurecloud.com")
        self.token = None
        self.expires_at = None

    def get_token(self) -> str:
        """
        Retrieves an OAuth2 token. Uses cached token if valid.
        """
        if self.token and self.expires_at and datetime.now() < self.expires_at:
            return self.token
        url = f"{self.login_url}/oauth/token"
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }
        data = {
            "grant_type": "client_credentials"
        }
        # The client ID and secret are sent as an HTTP Basic Authorization header
        response = requests.post(
            url,
            headers=headers,
            data=data,
            auth=(self.client_id, self.client_secret),
        )
        response.raise_for_status()
        token_data = response.json()
        self.token = token_data["access_token"]
        # Convert expires_in (seconds) to an absolute expiry time
        expires_in = token_data.get("expires_in", 86400)
        self.expires_at = datetime.now() + timedelta(seconds=expires_in)
        return self.token
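The caching rule inside get_token can also be isolated as two small pure functions, which makes it easy to test without calling the token endpoint. The sketch below is illustrative only: the names token_is_usable and expiry_from_response and the 60-second safety margin are assumptions, not part of the helper class above or of any Genesys SDK.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def expiry_from_response(token_data: dict, now: datetime) -> datetime:
    """Turn the expires_in field (seconds) into an absolute expiry time."""
    return now + timedelta(seconds=int(token_data.get("expires_in", 86400)))


def token_is_usable(
    expires_at: Optional[datetime],
    now: datetime,
    margin: timedelta = timedelta(seconds=60),  # assumed safety margin
) -> bool:
    """Return True while the cached token has more than `margin` left to live."""
    if expires_at is None:
        return False
    return now + margin < expires_at


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    expires_at = expiry_from_response({"expires_in": 3600}, now)
    print(token_is_usable(expires_at, now))
    print(token_is_usable(expires_at, now + timedelta(minutes=59, seconds=30)))
```

Refreshing slightly before the real expiry avoids sending a request with a token that expires while the request is in flight.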
Implementation
Step 1: Identify the Conversation and Recording Job
To control a recording, you need two pieces of information:
- The conversationId (UUID of the active call).
- The recordingJobId (UUID of the specific recording job associated with that conversation).
The Recording API does not allow you to start/stop a recording using only the conversationId. You must first locate the recordingJobId. In a real-world scenario, you might receive the conversationId from a CTI event or a webhook. Here, we will query active phone conversations to find one, then query the recording jobs to find the associated recording.
Required Scopes: conversation:phone:view, recording:job:view
from datetime import datetime, timedelta, timezone
from typing import Optional

import requests


class RecordingController:
    def __init__(self, auth: GenesysAuth):
        self.auth = auth
        self.base_url = auth.base_url

    def get_active_phone_conversations(self, _retried: bool = False) -> list:
        """
        Retrieves a list of active phone conversations.
        Returns a list of conversation objects.
        """
        url = f"{self.base_url}/api/v2/conversations/phone"
        headers = {
            "Authorization": f"Bearer {self.auth.get_token()}",
            "Content-Type": "application/json"
        }
        params = {
            "state": "active",
            "pageSize": 20
        }
        try:
            response = requests.get(url, headers=headers, params=params)
            response.raise_for_status()
            data = response.json()
            return data.get("entities", [])
        except requests.exceptions.HTTPError as e:
            # Retry once with a fresh token; a second 401 is raised to the caller
            if e.response.status_code == 401 and not _retried:
                print("Token expired. Refreshing...")
                self.auth.token = None  # Force refresh on next call
                return self.get_active_phone_conversations(_retried=True)
            raise

    def find_recording_job_for_conversation(self, conversation_id: str) -> Optional[str]:
        """
        Finds the active recording job ID for a given conversation ID.
        Returns the recording job ID string or None if no recording is found.
        """
        url = f"{self.base_url}/api/v2/recording/jobs"
        headers = {
            "Authorization": f"Bearer {self.auth.get_token()}",
            "Content-Type": "application/json"
        }
        # Filter for active recordings in the last 1 hour
        # Format: YYYY-MM-DDThh:mm:ssZ (UTC)
        time_format = "%Y-%m-%dT%H:%M:%SZ"
        now = datetime.now(timezone.utc)
        params = {
            "from": (now - timedelta(hours=1)).strftime(time_format),
            "to": now.strftime(time_format),
            "states": "active",  # Only look at currently active recordings
            "pageSize": 100
        }
        try:
            response = requests.get(url, headers=headers, params=params)
            response.raise_for_status()
            data = response.json()
            for job in data.get("entities", []):
                # Check if this job belongs to our conversation
                if job.get("conversationId") == conversation_id:
                    return job.get("id")
            return None
        except requests.exceptions.HTTPError as e:
            print(f"Error fetching recording jobs: {e}")
            return None
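The two pieces of logic in find_recording_job_for_conversation that do not touch the network, building the time window and matching a job to a conversation, can be sketched as standalone functions. The helper names build_job_query and match_job_id are hypothetical; the query parameters and the conversationId and id fields are simply the ones used in the code above and are not verified against the API reference here.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def build_job_query(now: datetime, lookback: timedelta = timedelta(hours=1)) -> dict:
    """Build the query parameters for a lookback window ending at `now` (UTC)."""
    now_utc = now.astimezone(timezone.utc)
    return {
        "from": (now_utc - lookback).strftime(TIME_FORMAT),
        "to": now_utc.strftime(TIME_FORMAT),
        "states": "active",
        "pageSize": 100,
    }


def match_job_id(entities: list, conversation_id: str) -> Optional[str]:
    """Return the id of the first job whose conversationId matches, else None."""
    for job in entities:
        if job.get("conversationId") == conversation_id:
            return job.get("id")
    return None


if __name__ == "__main__":
    print(build_job_query(datetime.now(timezone.utc)))
    sample = [{"id": "job-1", "conversationId": "conv-1"}]
    print(match_job_id(sample, "conv-1"))
```

Computing the current time once and deriving both ends of the window from it keeps "from" and "to" consistent, which two separate clock reads do not guarantee.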
Step 2: Start a Recording
To start a recording, send a POST request to the /api/v2/recording/jobs endpoint with the conversationId in the request body. The API creates a new recording job and returns its ID.
Required Scope: recording:job:control
    # Method of the RecordingController class
    def start_recording(self, conversation_id: str) -> dict:
        """
        Starts a recording for the specified conversation.
        Returns the response JSON containing the new recording job ID.
        """
        url = f"{self.base_url}/api/v2/recording/jobs"
        headers = {
            "Authorization": f"Bearer {self.auth.get_token()}",
            "Content-Type": "application/json"
        }
        # The body must contain the conversationId
        # You can also specify 'segments' if you want to record specific participants,
        # but default is usually all participants.
        body = {
            "conversationId": conversation_id
        }
        try:
            response = requests.post(url, headers=headers, json=body)
            response.raise_for_status()
            # 201 Created is expected
            job_data = response.json()
            print(f"Recording started. Job ID: {job_data.get('id')}")
            return job_data
        except requests.exceptions.HTTPError as e:
            # response.text is safe to read even when the error body is not JSON
            print(f"Failed to start recording: {e.response.text}")
            raise
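Step 1 retries on 401 Unauthorized, but start_recording does not. One way to share that behavior across every call is a small wrapper that retries exactly once with a fresh token. This is a sketch under assumptions: call_with_token_refresh is a hypothetical helper, and the send callable stands in for whatever function performs the HTTP request and returns a (status code, body) pair.

```python
from typing import Callable, Tuple


def call_with_token_refresh(
    send: Callable[[str], Tuple[int, dict]],
    get_token: Callable[[], str],
    invalidate_token: Callable[[], None],
) -> Tuple[int, dict]:
    """Call send(token); on a 401, drop the cached token and try once more."""
    status, body = send(get_token())
    if status == 401:
        invalidate_token()
        status, body = send(get_token())
    return status, body


if __name__ == "__main__":
    # Stand-ins for a real token cache and HTTP call
    cache = {"token": "stale"}

    def fake_send(token: str) -> Tuple[int, dict]:
        return (401, {}) if token == "stale" else (201, {"id": "job-1"})

    result = call_with_token_refresh(
        fake_send,
        lambda: cache["token"],
        lambda: cache.update(token="fresh"),
    )
    print(result)
```

Because the retry happens at most once, a client whose credentials are genuinely invalid gets the 401 back instead of looping.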
Step 3: Stop a Recording
Stopping a recording is done via a POST request to /api/v2/recording/jobs/{recordingJobId}/stop. Note that this is a POST to a specific sub-resource, not a DELETE on the job itself. Deleting the job would remove the metadata and potentially the file reference, whereas stopping the recording finalizes the file and marks the job as complete.
Required Scope: recording:job:control
    # Method of the RecordingController class
    def stop_recording(self, recording_job_id: str) -> dict:
        """
        Stops an active recording by its Job ID.
        Returns the response JSON confirming the stop action.
        """
        url = f"{self.base_url}/api/v2/recording/jobs/{recording_job_id}/stop"
        headers = {
            "Authorization": f"Bearer {self.auth.get_token()}",
            "Content-Type": "application/json"
        }
        try:
            response = requests.post(url, headers=headers, json={})
            response.raise_for_status()
            # 200 OK is expected; tolerate an empty response body
            result = response.json() if response.content else {}
            print(f"Recording stopped. Job ID: {recording_job_id}")
            return result
        except requests.exceptions.HTTPError as e:
            # Handle 404 if the recording was already stopped or deleted
            if e.response.status_code == 404:
                print(f"Recording job {recording_job_id} not found or already completed.")
            else:
                # response.text is safe to read even when the error body is not JSON
                print(f"Failed to stop recording: {e.response.text}")
            raise
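Callers often want to treat "already stopped" differently from a real failure. A minimal sketch of that decision, separated from the HTTP call, is shown below. StopOutcome and classify_stop_status are hypothetical names, and treating 429 and 5xx responses as retryable is an assumption of this sketch rather than documented API behavior.

```python
from enum import Enum


class StopOutcome(Enum):
    STOPPED = "stopped"
    ALREADY_FINISHED = "already_finished"
    RETRY_LATER = "retry_later"
    FAILED = "failed"


def classify_stop_status(status_code: int) -> StopOutcome:
    """Map the HTTP status of a stop request to an outcome the caller can act on."""
    if 200 <= status_code < 300:
        return StopOutcome.STOPPED
    if status_code == 404:
        # The job is gone or already completed, so there is nothing left to stop
        return StopOutcome.ALREADY_FINISHED
    if status_code == 429 or status_code >= 500:
        return StopOutcome.RETRY_LATER
    return StopOutcome.FAILED


if __name__ == "__main__":
    for code in (200, 404, 429, 403):
        print(code, classify_stop_status(code).value)
```

With this split, a 404 on stop can be logged and ignored, while a 403 still surfaces as an error.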
Complete Working Example
The following script combines the authentication, discovery, and control logic. It finds an active phone conversation, starts a recording, waits for a specified duration, and then stops the recording.
import os
import time
from datetime import datetime, timedelta, timezone
from typing import Optional

import requests
from dotenv import load_dotenv

# The classes from the previous steps are inlined here so that the script
# can be copied and run as a single file.


class GenesysAuth:
    def __init__(self):
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
        self.base_url = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
        # Tokens are issued by the region's login host, not the API host.
        # GENESYS_LOGIN_URL is an assumed variable name for this tutorial.
        self.login_url = os.getenv("GENESYS_LOGIN_URL", "https://login.mypurecloud.com")
        self.token = None
        self.expires_at = None

    def get_token(self) -> str:
        if self.token and self.expires_at and datetime.now() < self.expires_at:
            return self.token
        url = f"{self.login_url}/oauth/token"
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }
        data = {
            "grant_type": "client_credentials"
        }
        # The client ID and secret are sent as an HTTP Basic Authorization header
        response = requests.post(
            url,
            headers=headers,
            data=data,
            auth=(self.client_id, self.client_secret),
        )
        response.raise_for_status()
        token_data = response.json()
        self.token = token_data["access_token"]
        expires_in = token_data.get("expires_in", 86400)
        self.expires_at = datetime.now() + timedelta(seconds=expires_in)
        return self.token


class RecordingController:
    def __init__(self, auth: GenesysAuth):
        self.auth = auth
        self.base_url = auth.base_url

    def _get_headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.auth.get_token()}",
            "Content-Type": "application/json"
        }

    def get_active_phone_conversations(self, _retried: bool = False) -> list:
        url = f"{self.base_url}/api/v2/conversations/phone"
        params = {
            "state": "active",
            "pageSize": 5
        }
        try:
            response = requests.get(url, headers=self._get_headers(), params=params)
            response.raise_for_status()
            return response.json().get("entities", [])
        except requests.exceptions.HTTPError as e:
            # Retry once with a fresh token; a second 401 is raised to the caller
            if e.response.status_code == 401 and not _retried:
                self.auth.token = None
                return self.get_active_phone_conversations(_retried=True)
            raise

    # Optional[str] rather than "str | None" keeps the script valid on Python 3.9
    def find_recording_job_for_conversation(self, conversation_id: str) -> Optional[str]:
        url = f"{self.base_url}/api/v2/recording/jobs"
        time_format = "%Y-%m-%dT%H:%M:%SZ"
        now = datetime.now(timezone.utc)
        params = {
            "from": (now - timedelta(hours=1)).strftime(time_format),
            "to": now.strftime(time_format),
            "states": "active",
            "pageSize": 100
        }
        try:
            response = requests.get(url, headers=self._get_headers(), params=params)
            response.raise_for_status()
            for job in response.json().get("entities", []):
                if job.get("conversationId") == conversation_id:
                    return job.get("id")
            return None
        except requests.exceptions.HTTPError:
            return None

    def start_recording(self, conversation_id: str) -> Optional[str]:
        url = f"{self.base_url}/api/v2/recording/jobs"
        body = {
            "conversationId": conversation_id
        }
        try:
            response = requests.post(url, headers=self._get_headers(), json=body)
            response.raise_for_status()
            return response.json().get("id")
        except requests.exceptions.HTTPError as e:
            print(f"Start Error: {e.response.text}")
            return None

    def stop_recording(self, recording_job_id: str) -> bool:
        url = f"{self.base_url}/api/v2/recording/jobs/{recording_job_id}/stop"
        try:
            response = requests.post(url, headers=self._get_headers(), json={})
            response.raise_for_status()
            return True
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                print("Recording not found or already stopped.")
                return False
            print(f"Stop Error: {e.response.text}")
            return False


def main():
    load_dotenv()
    if not os.getenv("GENESYS_CLIENT_ID") or not os.getenv("GENESYS_CLIENT_SECRET"):
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in .env")
    auth = GenesysAuth()
    controller = RecordingController(auth)

    print("1. Fetching active phone conversations...")
    conversations = controller.get_active_phone_conversations()
    if not conversations:
        print("No active phone conversations found. Please initiate a call before running this script.")
        return

    # Pick the first active conversation
    target_conv = conversations[0]
    conv_id = target_conv["id"]
    print(f"2. Selected Conversation ID: {conv_id}")

    # Check if already recording
    existing_job_id = controller.find_recording_job_for_conversation(conv_id)
    if existing_job_id:
        print(f"3. Recording already active for this conversation. Job ID: {existing_job_id}")
        job_id = existing_job_id
    else:
        print("3. Starting new recording...")
        job_id = controller.start_recording(conv_id)
        if not job_id:
            print("Failed to start recording.")
            return
        print(f"   Recording started. Job ID: {job_id}")

    # Wait for 10 seconds to simulate a conversation duration
    print("4. Waiting 10 seconds...")
    time.sleep(10)

    # Stop the recording
    print("5. Stopping recording...")
    success = controller.stop_recording(job_id)
    if success:
        print("   Recording stopped successfully.")
    else:
        print("   Failed to stop recording.")


if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 403 Forbidden
- Cause: The OAuth token does not have the recording:job:control scope.
- Fix: Go to the Genesys Cloud Admin Console, navigate to Setup > Apps > API Clients, edit your client, and add the recording:job:control scope. Then request a new token.
Error: 404 Not Found (on Stop)
- Cause: The recordingJobId is invalid, or the recording has already been stopped or completed by another process or by the system (e.g., the call ended).
- Fix: Ensure you are using the recordingJobId returned from the start_recording call or the find_recording_job_for_conversation method. Do not use the conversationId in the stop endpoint.
Error: 409 Conflict (on Start)
- Cause: You attempted to start a recording on a conversation that already has an active recording job.
- Fix: Check for existing active recordings before attempting to start a new one, as shown in the main function logic.
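The check-then-start logic can be written as one function that takes the lookup and start operations as arguments, which also covers the case where another process starts the recording between the check and the start. This is a sketch under assumptions: ensure_recording is a hypothetical helper, and it expects a start function that returns a job ID or None on failure, as start_recording does in the complete example.

```python
from typing import Callable, Optional


def ensure_recording(
    conversation_id: str,
    find_job: Callable[[str], Optional[str]],
    start_job: Callable[[str], Optional[str]],
) -> Optional[str]:
    """Return the ID of an active recording job, starting one only if needed."""
    existing = find_job(conversation_id)
    if existing:
        return existing
    started = start_job(conversation_id)
    if started:
        return started
    # The start may have failed because another process won the race
    # (for example with a 409 Conflict), so look once more before giving up.
    return find_job(conversation_id)


if __name__ == "__main__":
    # Stand-ins for the controller methods
    print(ensure_recording("conv-1", lambda c: None, lambda c: "job-1"))
    print(ensure_recording("conv-1", lambda c: "job-9", lambda c: "unused"))
```

With the controller from the complete example, the call would look like ensure_recording(conv_id, controller.find_recording_job_for_conversation, controller.start_recording).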
Error: 429 Too Many Requests
- Cause: You have exceeded the rate limit for the Recording API.
- Fix: Implement exponential backoff. The Retry-After header in the response indicates how many seconds to wait.
import time

import requests


def make_request_with_retry(url, headers, method="GET", json_body=None, max_retries=3):
    for attempt in range(max_retries):
        # requests.request accepts any HTTP method; json=None sends no body
        response = requests.request(method, url, headers=headers, json=json_body)
        if response.status_code == 429:
            # Prefer the server's Retry-After value; otherwise back off
            # exponentially (1s, 2s, 4s, ...)
            retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
            print(f"Rate limited. Waiting {retry_after} seconds...")
            time.sleep(retry_after)
            continue
        # Any other error status is raised to the caller immediately
        response.raise_for_status()
        return response
    raise Exception("Max retries exceeded for 429 error")
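The wait time itself can be computed separately from the request loop, which makes the backoff policy easy to test. The sketch below prefers a numeric Retry-After value and otherwise doubles the delay on each attempt up to a cap. The function name backoff_delay and the default base, cap, and jitter values are assumptions chosen for illustration.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    retry_after: Optional[str] = None,
    base: float = 1.0,
    cap: float = 30.0,
    jitter: float = 0.0,
) -> float:
    """Return the number of seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # A numeric Retry-After header is honored as-is
            return max(0.0, float(retry_after))
        except ValueError:
            pass  # Not a number (e.g. an HTTP date): fall back to backoff
    delay = min(cap, base * (2 ** attempt))
    # Optional random jitter spreads out clients that were limited together
    return delay + random.uniform(0.0, jitter)


if __name__ == "__main__":
    for attempt in range(5):
        print(attempt, backoff_delay(attempt))
    print(backoff_delay(0, retry_after="7"))
```

Passing response.headers.get("Retry-After") as retry_after lets the same function cover both the header-driven and the header-less case.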