Programmatically Control Call Recording States via the Genesys Cloud API

What You Will Build

  • A Python script that programmatically starts and stops recording on active phone conversations.
  • This tutorial uses the Genesys Cloud CX Recording API (/api/v2/recording/jobs).
  • The implementation is demonstrated in Python, calling the REST endpoints directly with requests.

Prerequisites

  • OAuth Client Type: A Genesys Cloud API Client with the client_credentials grant type.
  • Required Scopes:
    • recording:job:control (Required to start/stop recordings)
    • recording:job:view (Required to list and check recording status)
    • conversation:phone:view (Optional, used here to identify active calls if not known via other means)
  • SDK Version: genesyscloud Python SDK v10.0.0 or later.
  • Language/Runtime: Python 3.9+
  • External Dependencies:
    • genesyscloud
    • requests
    • python-dotenv (for secure credential management)

Authentication Setup

Genesys Cloud uses OAuth 2.0 for authentication. For server-to-server integrations, the client_credentials flow is the standard choice. Tokens typically expire after 24 hours (the expires_in field of the token response gives the exact lifetime), so cache the token and request a new one only when it is about to expire or when the API returns 401 Unauthorized.
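
Genesys Cloud serves OAuth tokens from a region-specific login host (for example, login.mypurecloud.com) rather than from the API host (api.mypurecloud.com). The following is a minimal sketch of deriving one from the other. The helper name login_url_for is hypothetical, and the sketch assumes the API base URL follows the api.<domain> pattern.

```python
from urllib.parse import urlsplit, urlunsplit

def login_url_for(api_base_url: str) -> str:
    """Map an API base URL (api.<domain>) to its login host (login.<domain>).

    Hypothetical helper: assumes the hostname starts with "api.".
    """
    parts = urlsplit(api_base_url)
    host = parts.netloc
    if not host.startswith("api."):
        raise ValueError(f"Expected an api.<domain> host, got {host!r}")
    login_host = "login." + host[len("api."):]
    # Keep only scheme and host; any path on the API URL is dropped.
    return urlunsplit((parts.scheme, login_host, "", "", ""))

print(login_url_for("https://api.mypurecloud.com"))  # https://login.mypurecloud.com
```

Keeping the mapping in one function means that switching regions only requires changing GENESYS_BASE_URL.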

Below is a helper class that manages the token lifecycle. This is not part of the official SDK but is a robust pattern for production applications.

import os
import requests
from datetime import datetime, timedelta
from dotenv import load_dotenv

load_dotenv()

class GenesysAuth:
    def __init__(self):
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
        self.base_url = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
        self.token = None
        self.expires_at = None

    def get_token(self) -> str:
        """
        Retrieves an OAuth2 token. Uses cached token if valid.
        """
        if self.token and self.expires_at and datetime.now() < self.expires_at:
            return self.token

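        # Genesys Cloud issues tokens from the login host (login.<region>),
        # not the API host, so derive it from the configured API base URL.
        login_url = self.base_url.replace("://api.", "://login.", 1)
        url = f"{login_url}/oauth/token"
        data = {"grant_type": "client_credentials"}

        # Client credentials are sent via HTTP Basic auth; requests sets the
        # form-encoded Content-Type automatically when `data` is a dict.
        response = requests.post(
            url, data=data, auth=(self.client_id, self.client_secret), timeout=10
        )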
        response.raise_for_status()

        token_data = response.json()
        self.token = token_data["access_token"]
        
        # Parse expires_in (seconds) to datetime
        expires_in = token_data.get("expires_in", 86400)
        self.expires_at = datetime.now() + timedelta(seconds=expires_in)
        
        return self.token
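
One refinement worth considering is to refresh the token slightly before it expires, so that a request never carries a token that lapses in transit. The sketch below illustrates the idea in isolation, with no network access. CachedToken, its fetch callable, and the margin_seconds parameter are all hypothetical names introduced for illustration; in the helper above, the role of fetch is played by the POST to /oauth/token.

```python
import time
from typing import Callable, Optional, Tuple

class CachedToken:
    """Caches a token and refreshes it slightly before it expires.

    `fetch` is a hypothetical callable returning (access_token, expires_in_seconds).
    """

    def __init__(self, fetch: Callable[[], Tuple[str, float]],
                 margin_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch
        self._margin = margin_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._refresh_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._refresh_at:
            self._token, expires_in = self._fetch()
            # Refresh early so an in-flight request never carries a token
            # that expires before it reaches the server.
            self._refresh_at = now + max(expires_in - self._margin, 0.0)
        return self._token

    def invalidate(self) -> None:
        """Drop the cached token, e.g. after a 401 response."""
        self._token = None

# Demo with a fake clock and a stub fetcher (no network involved).
fake_now = [0.0]
calls = []

def stub_fetch():
    calls.append(1)
    return f"token-{len(calls)}", 100.0

cache = CachedToken(stub_fetch, margin_seconds=10.0, clock=lambda: fake_now[0])
print(cache.get())  # token-1
fake_now[0] = 89.0
print(cache.get())  # token-1 (still inside the 90 s window)
fake_now[0] = 90.0
print(cache.get())  # token-2 (refreshed 10 s before expiry)
```

Using a monotonic clock instead of wall-clock time also keeps the cache correct if the system clock is adjusted while the process is running.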

Implementation

Step 1: Identify the Conversation and Recording Job

To control a recording, you need two pieces of information:

  1. The conversationId (UUID of the active call).
  2. The recordingJobId (UUID of the specific recording job associated with that conversation).

The Recording API does not let you stop a recording using only the conversationId; you must first locate the recordingJobId. In a real-world scenario, you might receive the conversationId from a CTI event or a webhook. Here, we query active phone conversations to find one, then query the recording jobs to find the recording associated with it.

Required Scopes: conversation:phone:view, recording:job:view

import requests
from typing import Optional

class RecordingController:
    def __init__(self, auth: GenesysAuth):
        self.auth = auth
        self.base_url = auth.base_url

    def get_active_phone_conversations(self) -> list:
        """
        Retrieves a list of active phone conversations.
        Returns a list of conversation objects.
        """
        url = f"{self.base_url}/api/v2/conversations/phone"
        headers = {
            "Authorization": f"Bearer {self.auth.get_token()}",
            "Content-Type": "application/json"
        }
        params = {
            "state": "active",
            "pageSize": 20
        }

        # Retry once with a fresh token if the cached one was rejected.
        # Bounding the retry avoids endless recursion on a persistent 401.
        for attempt in range(2):
            response = requests.get(url, headers=headers, params=params)
            if response.status_code == 401 and attempt == 0:
                print("Token rejected. Refreshing...")
                self.auth.token = None  # Force refresh on next get_token()
                headers["Authorization"] = f"Bearer {self.auth.get_token()}"
                continue
            response.raise_for_status()
            return response.json().get("entities", [])

    def find_recording_job_for_conversation(self, conversation_id: str) -> Optional[str]:
        """
        Finds the active recording job ID for a given conversation ID.
        Returns the recording job ID string or None if no recording is found.
        """
        url = f"{self.base_url}/api/v2/recording/jobs"
        headers = {
            "Authorization": f"Bearer {self.auth.get_token()}",
            "Content-Type": "application/json"
        }
        
        # Look for recordings that were active within the last hour.
        # Timestamp format: YYYY-MM-DDThh:mm:ssZ (UTC)
        from datetime import datetime, timedelta, timezone
        now = datetime.now(timezone.utc)  # utcnow() is deprecated since Python 3.12
        fmt = "%Y-%m-%dT%H:%M:%SZ"

        params = {
            "from": (now - timedelta(hours=1)).strftime(fmt),
            "to": now.strftime(fmt),
            "states": "active",  # Only look at currently active recordings
            "pageSize": 100
        }

        try:
            response = requests.get(url, headers=headers, params=params)
            response.raise_for_status()
            data = response.json()
            
            for job in data.get("entities", []):
                # Check if this job belongs to our conversation
                if job.get("conversationId") == conversation_id:
                    return job.get("id")
            
            return None

        except requests.exceptions.HTTPError as e:
            print(f"Error fetching recording jobs: {e}")
            return None
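
The matching step can be tested without calling the API by separating it from the HTTP request. The following is a small sketch of that idea. The function name pick_job_id and the sample payload are hypothetical; the payload only mirrors the "entities", "id", and "conversationId" fields that the method above reads.

```python
from typing import Iterable, Mapping, Optional

def pick_job_id(jobs: Iterable[Mapping], conversation_id: str) -> Optional[str]:
    """Return the id of the first job whose conversationId matches, else None."""
    return next(
        (job.get("id") for job in jobs if job.get("conversationId") == conversation_id),
        None,
    )

# Hypothetical response body, shaped like the "entities" list used above.
sample = {
    "entities": [
        {"id": "job-1", "conversationId": "conv-a"},
        {"id": "job-2", "conversationId": "conv-b"},
    ]
}
print(pick_job_id(sample["entities"], "conv-b"))  # job-2
print(pick_job_id(sample["entities"], "conv-z"))  # None
```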

Step 2: Start a Recording

Starting a recording is done via a POST request to the /api/v2/recording/jobs endpoint. The body must include the conversationId. The API will automatically create a new recording job and return its ID.

Required Scope: recording:job:control

    def start_recording(self, conversation_id: str) -> dict:
        """
        Starts a recording for the specified conversation.
        Returns the response JSON containing the new recording job ID.
        """
        url = f"{self.base_url}/api/v2/recording/jobs"
        headers = {
            "Authorization": f"Bearer {self.auth.get_token()}",
            "Content-Type": "application/json"
        }
        
        # The body must contain the conversationId
        # You can also specify 'segments' if you want to record specific participants,
        # but default is usually all participants.
        body = {
            "conversationId": conversation_id
        }

        try:
            response = requests.post(url, headers=headers, json=body)
            response.raise_for_status()
            
            # 201 Created is expected
            job_data = response.json()
            print(f"Recording started. Job ID: {job_data.get('id')}")
            return job_data

        except requests.exceptions.HTTPError as e:
            # Error bodies are not guaranteed to be JSON, so log the raw text.
            print(f"Failed to start recording: {e.response.status_code} {e.response.text}")
            raise

Step 3: Stop a Recording

Stopping a recording is done via a POST request to /api/v2/recording/jobs/{recordingJobId}/stop. Note that this is a POST to a specific sub-resource, not a DELETE on the job itself. Deleting the job would remove the metadata and potentially the file reference, whereas stopping the recording finalizes the file and marks the job as complete.

Required Scope: recording:job:control

    def stop_recording(self, recording_job_id: str) -> dict:
        """
        Stops an active recording by its Job ID.
        Returns the response JSON confirming the stop action.
        """
        url = f"{self.base_url}/api/v2/recording/jobs/{recording_job_id}/stop"
        headers = {
            "Authorization": f"Bearer {self.auth.get_token()}",
            "Content-Type": "application/json"
        }

        try:
            response = requests.post(url, headers=headers, json={})
            response.raise_for_status()
            
            # 200 OK is expected
            result = response.json()
            print(f"Recording stopped. Job ID: {recording_job_id}")
            return result

        except requests.exceptions.HTTPError as e:
            # Handle 404 if the recording was already stopped or deleted
            if e.response.status_code == 404:
                print(f"Recording job {recording_job_id} not found or already completed.")
            else:
                # Error bodies are not guaranteed to be JSON, so log the raw text.
                print(f"Failed to stop recording: {e.response.status_code} {e.response.text}")
            raise
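
Because the job ID is interpolated into the URL path, it can be worth validating it before sending the request. The sketch below assumes, as stated in Step 1, that recording job IDs are UUIDs. The helper name build_stop_url is hypothetical.

```python
import uuid

def build_stop_url(base_url: str, recording_job_id: str) -> str:
    """Build the stop URL, rejecting IDs that are not well-formed UUIDs."""
    try:
        job_uuid = uuid.UUID(recording_job_id)
    except (ValueError, AttributeError, TypeError):
        raise ValueError(f"Not a valid UUID: {recording_job_id!r}") from None
    # str(UUID) yields the canonical lowercase, hyphenated form.
    return f"{base_url.rstrip('/')}/api/v2/recording/jobs/{job_uuid}/stop"

print(build_stop_url("https://api.mypurecloud.com",
                     "3f2504e0-4f89-11d3-9a0c-0305e82c3301"))
# https://api.mypurecloud.com/api/v2/recording/jobs/3f2504e0-4f89-11d3-9a0c-0305e82c3301/stop
```

A malformed ID then fails fast with a ValueError instead of producing a confusing 404 from the server.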

Complete Working Example

The following script combines the authentication, discovery, and control logic. It finds an active phone conversation, starts a recording, waits for a specified duration, and then stops the recording.

from __future__ import annotations  # lets "str | None" annotations work on Python 3.9

import os
import time
import requests
from datetime import datetime, timedelta
from dotenv import load_dotenv

# The classes from the previous steps are repeated here so the script
# can be copied and run as a single file.

class GenesysAuth:
    def __init__(self):
        self.client_id = os.getenv("GENESYS_CLIENT_ID")
        self.client_secret = os.getenv("GENESYS_CLIENT_SECRET")
        self.base_url = os.getenv("GENESYS_BASE_URL", "https://api.mypurecloud.com")
        self.token = None
        self.expires_at = None

    def get_token(self) -> str:
        if self.token and self.expires_at and datetime.now() < self.expires_at:
            return self.token

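        # Genesys Cloud issues tokens from the login host (login.<region>),
        # not the API host, so derive it from the configured API base URL.
        login_url = self.base_url.replace("://api.", "://login.", 1)
        url = f"{login_url}/oauth/token"
        data = {"grant_type": "client_credentials"}

        # Client credentials are sent via HTTP Basic auth.
        response = requests.post(
            url, data=data, auth=(self.client_id, self.client_secret), timeout=10
        )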
        response.raise_for_status()

        token_data = response.json()
        self.token = token_data["access_token"]
        expires_in = token_data.get("expires_in", 86400)
        self.expires_at = datetime.now() + timedelta(seconds=expires_in)
        return self.token

class RecordingController:
    def __init__(self, auth: GenesysAuth):
        self.auth = auth
        self.base_url = auth.base_url

    def _get_headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.auth.get_token()}",
            "Content-Type": "application/json"
        }

    def get_active_phone_conversations(self) -> list:
        url = f"{self.base_url}/api/v2/conversations/phone"
        params = {
            "state": "active",
            "pageSize": 5
        }
        # Retry once with a fresh token if the cached one was rejected.
        # Bounding the retry avoids endless recursion on a persistent 401.
        for attempt in range(2):
            response = requests.get(url, headers=self._get_headers(), params=params)
            if response.status_code == 401 and attempt == 0:
                self.auth.token = None  # Force refresh on next get_token()
                continue
            response.raise_for_status()
            return response.json().get("entities", [])

    def find_recording_job_for_conversation(self, conversation_id: str) -> str | None:
        url = f"{self.base_url}/api/v2/recording/jobs"
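        # Search the last hour; timestamps are UTC in YYYY-MM-DDThh:mm:ssZ format.
        # time.gmtime avoids datetime.utcnow(), which is deprecated since Python 3.12.
        fmt = "%Y-%m-%dT%H:%M:%SZ"
        now = time.time()
        params = {
            "from": time.strftime(fmt, time.gmtime(now - 3600)),
            "to": time.strftime(fmt, time.gmtime(now)),
            "states": "active",
            "pageSize": 100
        }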
        try:
            response = requests.get(url, headers=self._get_headers(), params=params)
            response.raise_for_status()
            for job in response.json().get("entities", []):
                if job.get("conversationId") == conversation_id:
                    return job.get("id")
            return None
        except requests.exceptions.HTTPError:
            return None

    def start_recording(self, conversation_id: str) -> str | None:
        url = f"{self.base_url}/api/v2/recording/jobs"
        body = {
            "conversationId": conversation_id
        }
        try:
            response = requests.post(url, headers=self._get_headers(), json=body)
            response.raise_for_status()
            return response.json().get("id")
        except requests.exceptions.HTTPError as e:
            print(f"Start Error: {e.response.text}")
            return None

    def stop_recording(self, recording_job_id: str) -> bool:
        url = f"{self.base_url}/api/v2/recording/jobs/{recording_job_id}/stop"
        try:
            response = requests.post(url, headers=self._get_headers(), json={})
            response.raise_for_status()
            return True
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                print("Recording not found or already stopped.")
                return False
            print(f"Stop Error: {e.response.text}")
            return False

def main():
    load_dotenv()
    
    if not os.getenv("GENESYS_CLIENT_ID") or not os.getenv("GENESYS_CLIENT_SECRET"):
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set in .env")

    auth = GenesysAuth()
    controller = RecordingController(auth)

    print("1. Fetching active phone conversations...")
    conversations = controller.get_active_phone_conversations()

    if not conversations:
        print("No active phone conversations found. Please initiate a call before running this script.")
        return

    # Pick the first active conversation
    target_conv = conversations[0]
    conv_id = target_conv["id"]
    print(f"2. Selected Conversation ID: {conv_id}")

    # Check if already recording
    existing_job_id = controller.find_recording_job_for_conversation(conv_id)
    if existing_job_id:
        print(f"3. Recording already active for this conversation. Job ID: {existing_job_id}")
        job_id = existing_job_id
    else:
        print("3. Starting new recording...")
        job_id = controller.start_recording(conv_id)
        if not job_id:
            print("Failed to start recording.")
            return
        print(f"   Recording started. Job ID: {job_id}")

    # Wait for 10 seconds to simulate a conversation duration
    print("4. Waiting 10 seconds...")
    time.sleep(10)

    # Stop the recording
    print("5. Stopping recording...")
    success = controller.stop_recording(job_id)
    if success:
        print("   Recording stopped successfully.")
    else:
        print("   Failed to stop recording.")

if __name__ == "__main__":
    main()
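
If anything fails between starting and stopping, the script above leaves the recording running. One way to guard against that is a context manager that always attempts the stop. The following is a sketch under the assumption that the controller exposes start_recording and stop_recording with the return types used in the complete example. The recording_session helper and the StubController stand-in are hypothetical and exist only so the example runs without network access.

```python
from contextlib import contextmanager

@contextmanager
def recording_session(controller, conversation_id):
    """Start a recording, yield its job ID, and always attempt to stop it."""
    job_id = controller.start_recording(conversation_id)
    if not job_id:
        raise RuntimeError("Recording could not be started")
    try:
        yield job_id
    finally:
        # Runs on normal exit and when the body raises.
        controller.stop_recording(job_id)

class StubController:
    """Stand-in for RecordingController so the sketch runs offline."""
    def __init__(self):
        self.events = []

    def start_recording(self, conversation_id):
        self.events.append(("start", conversation_id))
        return "job-123"

    def stop_recording(self, recording_job_id):
        self.events.append(("stop", recording_job_id))
        return True

stub = StubController()
try:
    with recording_session(stub, "conv-1") as job_id:
        raise KeyError("simulated failure mid-call")
except KeyError:
    pass
print(stub.events)  # [('start', 'conv-1'), ('stop', 'job-123')]
```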

Common Errors & Debugging

Error: 403 Forbidden

  • Cause: The OAuth token does not have the recording:job:control scope.
  • Fix: Go to the Genesys Cloud Admin Console, navigate to Setup > Apps > API Clients, edit your client, and add the recording:job:control scope. Then request a new token so that it includes the updated scope.

Error: 404 Not Found (on Stop)

  • Cause: The recordingJobId is invalid, or the recording has already been stopped/completed by another process or the system (e.g., call ended).
  • Fix: Ensure you are using the recordingJobId returned from the start_recording call or the find_recording_job_for_conversation method. Do not use the conversationId in the stop endpoint.

Error: 409 Conflict (on Start)

  • Cause: You attempted to start a recording on a conversation that already has an active recording job.
  • Fix: Check for existing active recordings before attempting to start a new one, as shown in the main function logic.
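
The three cases above can be folded into one lookup that turns a status code into a suggested next step. This is only an illustrative sketch: describe_failure is a hypothetical helper, and the messages restate the causes and fixes listed in this section.

```python
def describe_failure(status_code: int, action: str) -> str:
    """Map an HTTP status from a start/stop call to a suggested next step.

    `action` is "start" or "stop"; hypothetical helper for illustration.
    """
    if status_code == 403:
        return "missing scope: grant recording:job:control and request a new token"
    if status_code == 404 and action == "stop":
        return "job not found or already completed: nothing left to stop"
    if status_code == 409 and action == "start":
        return "already recording: reuse the existing recording job"
    return f"unexpected status {status_code}: inspect the response body"

print(describe_failure(409, "start"))
# already recording: reuse the existing recording job
```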

Error: 429 Too Many Requests

  • Cause: You have exceeded the rate limit for the Recording API.
  • Fix: Implement exponential backoff. When present, the Retry-After header in the response indicates how many seconds to wait.

import time
import requests

def make_request_with_retry(url, headers, method="GET", json_body=None, max_retries=3):
    for attempt in range(max_retries):
        # requests.request handles any HTTP verb, so no per-method branching is needed.
        response = requests.request(method, url, headers=headers, json=json_body)

        if response.status_code == 429:
            # Honor Retry-After when present; otherwise back off exponentially (1s, 2s, 4s, ...).
            retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
            print(f"Rate limited. Waiting {retry_after} seconds...")
            time.sleep(retry_after)
            continue

        # Any other error status is raised immediately.
        response.raise_for_status()
        return response
    raise Exception("Max retries exceeded for 429 error")
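
When several workers hit the rate limit at once, adding random jitter to the exponential delays helps keep them from retrying in lockstep. The sketch below computes such a schedule on its own, separate from any HTTP call. The function name backoff_delays and its base, cap, and rng parameters are hypothetical choices for illustration.

```python
import random

def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0,
                   rng=random.random):
    """Yield one delay per retry.

    Each delay is min(cap, base * 2**attempt) scaled by a jitter factor
    of 0.5 + 0.5 * rng(), i.e. between 50% and 100% of the ceiling.
    """
    for attempt in range(max_retries):
        ceiling = min(cap, base * (2 ** attempt))
        yield ceiling * (0.5 + 0.5 * rng())

# With the jitter pinned to its upper bound, the plain exponential ceilings appear.
print([round(d, 2) for d in backoff_delays(5, rng=lambda: 1.0)])
# [1.0, 2.0, 4.0, 8.0, 16.0]
```

Passing rng as a parameter keeps the schedule deterministic in tests while defaulting to random.random in normal use.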

Official References