How to Programmatically Start and Stop Call Recordings via the Recording API

How to Programmatically Start and Stop Call Recordings via the Recording API

What You Will Build

  • This tutorial demonstrates how to trigger recording start and stop events for active conversations using the Genesys Cloud CX Recording API.
  • The solution uses the POST /api/v2/recordings/conversations/{conversationId}/events endpoint to inject control commands into a live session.
  • The implementation is provided in Python using the requests library and the official genesyscloud SDK for context.

Prerequisites

  • OAuth Client Type: Confidential Client (Client Credentials Grant) or Authorization Code Grant with PKCE.
  • Required Scopes:
    • conversation:recording:read
    • conversation:recording:write
    • conversation:read (to verify conversation status)
    • conversation:write (optional, if you need to update conversation metadata)
  • SDK/API Version: Genesys Cloud CX Platform API v2.
  • Language/Runtime: Python 3.9+.
  • External Dependencies:
    • requests (for raw HTTP calls)
    • genesyscloud (official SDK, used for authentication helper)

Authentication Setup

Before interacting with the Recording API, you must obtain a valid OAuth 2.0 access token. The Recording API endpoints are protected by standard Genesys Cloud CX authentication.

The most robust way to handle authentication in Python is to use the official SDK’s AuthenticationClient to manage token acquisition and refresh. However, for the raw API calls shown later, we will extract the token string.

import os
import json
from genesyscloud.platform_client import PlatformClientBuilder
from genesyscloud.auth.client import AuthClient

def get_access_token(client_id: str, client_secret: str, base_url: str) -> str:
    """
    Acquires an OAuth2 access token using Client Credentials Grant.
    """
    try:
        # Initialize the platform client builder
        builder = PlatformClientBuilder()
        builder.set_base_url(base_url)
        
        # Create the authentication client
        auth_client = AuthClient(client_id=client_id, client_secret=client_secret)
        
        # Request the token. This handles the POST to /oauth/token internally.
        token_response = auth_client.get_access_token()
        
        # Return the bearer token string
        return token_response.token
        
    except Exception as e:
        print(f"Authentication failed: {e}")
        raise

Critical Note on Scopes: If your application uses the Authorization Code Grant (user-specific), you must ensure the user has the conversation:recording:write permission assigned in their role. For machine-to-machine integrations, verify the OAuth Client’s scope settings in the Genesys Cloud Admin Console.

Implementation

Step 1: Identify the Active Conversation ID

You cannot start or stop a recording without a valid conversationId. This ID is generated when a call enters the Genesys Cloud environment. In a production system, you would likely receive this ID via the Webhook API or the Events API.

For this tutorial, we will query the active conversations endpoint to find a currently active voice call. This demonstrates how to locate the target resource before applying the recording command.

Endpoint: GET /api/v2/communications
Method: GET
Required Scope: conversation:read

import requests

def get_active_voice_call(access_token: str, base_url: str) -> dict | None:
    """
    Queries the Communications API to find an active voice conversation.
    Returns the first active voice conversation found, or None.
    """
    url = f"{base_url}/api/v2/communications"
    
    # Parameters to filter for active voice calls
    params = {
        "state": "active",
        "types": "voice"
    }
    
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    try:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        
        data = response.json()
        
        # The Communications API returns a list of items
        if data.get("items"):
            # Return the first active voice call
            conversation = data["items"][0]
            print(f"Found active conversation: {conversation['id']}")
            return conversation
        else:
            print("No active voice conversations found.")
            return None
            
    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
        if response.status_code == 401:
            print("Authentication failed. Check your token and scopes.")
        elif response.status_code == 403:
            print("Forbidden. Ensure you have 'conversation:read' scope.")
        raise
    except Exception as e:
        print(f"An error occurred: {e}")
        raise

Step 2: Send the Recording Control Event

The core functionality resides in the POST /api/v2/recordings/conversations/{conversationId}/events endpoint. This endpoint accepts a JSON payload specifying the event type.

Endpoint: POST /api/v2/recordings/conversations/{conversationId}/events
Method: POST
Required Scope: conversation:recording:write

The payload body must contain the type field. Valid values are start and stop.

def control_recording(access_token: str, base_url: str, conversation_id: str, action: str) -> dict:
    """
    Sends a recording control event to a specific conversation.
    
    Args:
        access_token: Valid OAuth2 bearer token.
        base_url: Genesys Cloud base URL (e.g., https://mycompany.mygen.com).
        conversation_id: The ID of the active conversation.
        action: Either 'start' or 'stop'.
        
    Returns:
        The JSON response from the API.
    """
    
    # Validate action
    if action not in ["start", "stop"]:
        raise ValueError("Action must be 'start' or 'stop'")

    url = f"{base_url}/api/v2/recordings/conversations/{conversation_id}/events"
    
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    
    # The payload specifies the event type
    payload = {
        "type": action
    }

    try:
        response = requests.post(url, headers=headers, json=payload)
        
        # Handle specific HTTP status codes
        if response.status_code == 202:
            # 202 Accepted means the request has been accepted for processing.
            # Recording changes are asynchronous.
            print(f"Recording {action} request accepted for conversation {conversation_id}")
            return response.json()
        elif response.status_code == 404:
            print(f"Conversation {conversation_id} not found or not active.")
            raise Exception("Conversation not found")
        elif response.status_code == 409:
            # 409 Conflict often indicates the recording is already in the requested state
            print(f"Conflict: Recording is already {action}ed or cannot be {action}ed.")
            raise Exception("Recording state conflict")
        elif response.status_code == 429:
            # Rate limit exceeded
            retry_after = response.headers.get("Retry-After", "5")
            print(f"Rate limited. Wait {retry_after} seconds before retrying.")
            raise Exception("Rate limit exceeded")
        else:
            response.raise_for_status()
            
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        raise

Why 202 Accepted?
The Genesys Cloud CX platform processes recording state changes asynchronously. The media servers must synchronize the recording stream across multiple nodes. The API returns 202 Accepted to confirm that the command was received by the control plane. It does not guarantee that the recording has physically started or stopped on the media server at that exact millisecond.

Step 3: Verify Recording State

Because the operation is asynchronous, you should poll the recording status to confirm the change took effect. You can do this by querying the recordings for that specific conversation.

Endpoint: GET /api/v2/recordings/conversations/{conversationId}
Method: GET
Required Scope: conversation:recording:read

def check_recording_status(access_token: str, base_url: str, conversation_id: str) -> dict | None:
    """
    Checks the current recording status of a conversation.
    """
    url = f"{base_url}/api/v2/recordings/conversations/{conversation_id}"
    
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    try:
        response = requests.get(url, headers=headers)
        
        if response.status_code == 200:
            data = response.json()
            # The API returns a list of recordings. Usually one per conversation.
            if data.get("items"):
                recording = data["items"][0]
                status = recording.get("status")
                print(f"Current recording status: {status}")
                return recording
            else:
                print("No active recordings found for this conversation.")
                return None
        elif response.status_code == 404:
            print("No recordings found for this conversation ID.")
            return None
        else:
            response.raise_for_status()
            
    except Exception as e:
        print(f"Error checking status: {e}")
        raise

Complete Working Example

This script combines authentication, conversation discovery, and recording control into a single executable workflow. It finds an active call, starts the recording, waits briefly, checks the status, stops the recording, and verifies the stop.

import os
import time
import requests
from genesyscloud.platform_client import PlatformClientBuilder
from genesyscloud.auth.client import AuthClient

# Configuration
GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
GENESYS_BASE_URL = os.getenv("GENESYS_BASE_URL", "https://mycompany.mygen.com")

def main():
    if not GENESYS_CLIENT_ID or not GENESYS_CLIENT_SECRET:
        raise EnvironmentError("Missing GENESYS_CLIENT_ID or GENESYS_CLIENT_SECRET environment variables.")

    print("1. Authenticating...")
    try:
        builder = PlatformClientBuilder()
        builder.set_base_url(GENESYS_BASE_URL)
        auth_client = AuthClient(client_id=GENESYS_CLIENT_ID, client_secret=GENESYS_CLIENT_SECRET)
        token_response = auth_client.get_access_token()
        access_token = token_response.token
        print("Authentication successful.")
    except Exception as e:
        print(f"Authentication failed: {e}")
        return

    print("2. Finding an active voice conversation...")
    conversation = get_active_voice_call(access_token, GENESYS_BASE_URL)
    
    if not conversation:
        print("No active voice call found. Please place a test call and try again.")
        return

    conversation_id = conversation["id"]
    print(f"Target Conversation ID: {conversation_id}")

    print("3. Starting recording...")
    try:
        control_recording(access_token, GENESYS_BASE_URL, conversation_id, "start")
    except Exception as e:
        print(f"Failed to start recording: {e}")
        return

    # Wait for the asynchronous operation to propagate
    print("4. Waiting 5 seconds for recording state to propagate...")
    time.sleep(5)

    print("5. Checking recording status...")
    check_recording_status(access_token, GENESYS_BASE_URL, conversation_id)

    print("6. Stopping recording...")
    try:
        control_recording(access_token, GENESYS_BASE_URL, conversation_id, "stop")
    except Exception as e:
        print(f"Failed to stop recording: {e}")
        return

    # Wait for stop to propagate
    print("7. Waiting 5 seconds for stop state to propagate...")
    time.sleep(5)

    print("8. Final status check...")
    final_status = check_recording_status(access_token, GENESYS_BASE_URL, conversation_id)
    
    if final_status:
        print(f"Final Recording Status: {final_status.get('status')}")
    else:
        print("Recording ended and is no longer active.")

def get_active_voice_call(access_token: str, base_url: str) -> dict | None:
    """
    Queries the Communications API to find an active voice conversation.
    """
    url = f"{base_url}/api/v2/communications"
    params = {"state": "active", "types": "voice"}
    headers = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}

    try:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()
        if data.get("items"):
            return data["items"][0]
        return None
    except Exception as e:
        print(f"Error fetching conversations: {e}")
        return None

def control_recording(access_token: str, base_url: str, conversation_id: str, action: str) -> dict:
    """
    Sends a recording control event to a specific conversation.
    """
    if action not in ["start", "stop"]:
        raise ValueError("Action must be 'start' or 'stop'")

    url = f"{base_url}/api/v2/recordings/conversations/{conversation_id}/events"
    headers = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}
    payload = {"type": action}

    try:
        response = requests.post(url, headers=headers, json=payload)
        
        if response.status_code == 202:
            print(f"Recording {action} request accepted.")
            return response.json()
        elif response.status_code == 404:
            raise Exception("Conversation not found")
        elif response.status_code == 409:
            raise Exception(f"Conflict: Recording is already {action}ed or cannot be {action}ed.")
        elif response.status_code == 429:
            raise Exception("Rate limit exceeded")
        else:
            response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        raise

def check_recording_status(access_token: str, base_url: str, conversation_id: str) -> dict | None:
    """
    Checks the current recording status of a conversation.
    """
    url = f"{base_url}/api/v2/recordings/conversations/{conversation_id}"
    headers = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}

    try:
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            data = response.json()
            if data.get("items"):
                return data["items"][0]
            return None
        elif response.status_code == 404:
            return None
        else:
            response.raise_for_status()
    except Exception as e:
        print(f"Error checking status: {e}")
        return None

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 403 Forbidden

Cause: The OAuth token used in the Authorization header does not have the required scope.
Fix: Verify that the OAuth Client has conversation:recording:write assigned. If using a user token, verify the user’s role includes this permission.
Code Check:

# Ensure your token request includes the correct scopes if using custom auth flow
# For Client Credentials, this is configured in the Genesys Admin Console > Security > OAuth Clients

Error: 404 Not Found

Cause: The conversationId provided is invalid, expired, or the conversation has ended.
Fix: Ensure you are using a live, active conversation ID. The Recording API only accepts events for active conversations.
Debugging Tip: Use the GET /api/v2/communications endpoint to verify the conversation state is active.

Error: 409 Conflict

Cause: You attempted to start a recording that is already running, or stop a recording that is already stopped.
Fix: Check the current state before sending the event.
Code Fix:

# Check status before acting
current_status = check_recording_status(access_token, base_url, conversation_id)
if current_status:
    if action == "start" and current_status.get("status") == "recording":
        print("Recording is already active. Skipping start command.")
        return
    elif action == "stop" and current_status.get("status") == "ended":
        print("Recording is already ended. Skipping stop command.")
        return

Error: 429 Too Many Requests

Cause: You exceeded the API rate limit for the Recording API.
Fix: Implement exponential backoff and retry logic.
Code Fix:

import time

def send_with_retry(url, headers, payload, max_retries=3):
    for attempt in range(max_retries):
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
            print(f"Rate limited. Waiting {retry_after} seconds...")
            time.sleep(retry_after)
        else:
            return response
    raise Exception("Max retries exceeded")

Official References