How to disconnect a specific participant from a conference call using the Conversations API

How to disconnect a specific participant from a conference call using the Conversations API

What You Will Build

  • This tutorial provides a working Python script that programmatically removes a single leg from an active multi-party conversation without terminating the entire call.
  • The implementation uses the Genesys Cloud CX Conversations API v2 and the official Python SDK.
  • All code examples run in Python 3.9+ using synchronous HTTP clients and standard type hints.

Prerequisites

  • OAuth 2.0 Client Credentials flow configured in Genesys Cloud with the conversation:participant:delete scope
  • Genesys Cloud API v2 (Conversations)
  • Python 3.9 or higher
  • requests (v2.31.0+), genesys-cloud-python-sdk (v2.2.0+), pydantic (for type validation)
  • Active conversation ID and participant ID retrieved via prior API calls or webhook payloads

Authentication Setup

Genesys Cloud requires a bearer token for every API request. The client credentials flow is the standard approach for server-to-server integrations. The token expires after one hour, so production code must cache and refresh it automatically.

import requests
import time
from typing import Optional

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        self.token_url = f"https://{environment}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0

    def _fetch_token(self) -> str:
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        response = requests.post(self.token_url, data=payload, headers=headers)
        response.raise_for_status()
        return response.json()["access_token"]

    def get_token(self) -> str:
        if self.access_token and time.time() < self.token_expiry - 300:
            return self.access_token
        self.access_token = self._fetch_token()
        self.token_expiry = time.time() + 3600
        return self.access_token

The code checks the cached token against the current timestamp and subtracts a five-minute buffer to prevent edge-case expiration during request transmission. The _fetch_token method raises an exception on non-2xx responses, which forces the calling code to handle authentication failures explicitly.

Implementation

Step 1: Identify the conversation and participant

Before disconnecting a leg, you must supply two identifiers: the conversationId and the participantId. The conversation ID follows the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx and represents the entire multi-party session. The participant ID is a separate UUID assigned to each individual leg (agent, external caller, internal transfer, or conference room).

You can retrieve these values from the Conversation Webhook payload or by querying the active conversations endpoint. The following request demonstrates how to list active conversations and extract participant IDs:

import requests
from typing import Dict, Any

def get_active_participants(auth: GenesysAuth, environment: str) -> Dict[str, Any]:
    base_url = f"https://{environment}/api/v2/conversations"
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Accept": "application/json"
    }
    params = {
        "types": "voice",
        "filter": "state:in-progress"
    }
    response = requests.get(base_url, headers=headers, params=params)
    response.raise_for_status()
    return response.json()

The types parameter restricts results to voice conversations, and the filter parameter uses the Genesys query syntax to return only active sessions. The response contains an array of conversations, each with a participants array. You must iterate through this array to locate the exact participant ID you intend to remove.

Step 2: Execute the disconnect request

The Conversations API exposes a dedicated endpoint for participant removal. The HTTP method is POST, and the path requires both identifiers. The API design uses POST instead of DELETE because the operation triggers a server-side state transition and may accept optional configuration in the request body. The endpoint returns a 204 No Content response on success, which follows REST conventions for idempotent destructive actions.

Raw HTTP implementation with retry logic for rate limiting:

import time
import requests
from typing import Optional

def disconnect_participant_http(
    auth: GenesysAuth,
    environment: str,
    conversation_id: str,
    participant_id: str,
    max_retries: int = 3
) -> Optional[requests.Response]:
    base_url = f"https://{environment}/api/v2/conversations"
    endpoint = f"{base_url}/{conversation_id}/participants/{participant_id}/disconnect"
    
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    
    body = {}  # Empty JSON object satisfies the endpoint contract
    
    attempt = 0
    while attempt < max_retries:
        response = requests.post(endpoint, json=body, headers=headers)
        
        if response.status_code == 204:
            return response
        elif response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
            print(f"Rate limited. Waiting {retry_after} seconds before retry {attempt + 1}")
            time.sleep(retry_after)
            attempt += 1
        else:
            response.raise_for_status()
            
    return None

The retry loop handles 429 Too Many Requests by reading the Retry-After header and applying exponential backoff as a fallback. The empty JSON body is required because the SDK and API gateway validate the content type. Sending null or omitting the body entirely may trigger a 415 Unsupported Media Type error depending on the gateway configuration.

Step 3: Process results and validate state

The 204 No Content response indicates the platform accepted the disconnect request. The actual media termination occurs asynchronously on the media servers. You should poll the conversation endpoint to confirm the participant state changed to ended or removed. The following function demonstrates state verification:

import requests
from typing import Dict, Any

def verify_disconnect(
    auth: GenesysAuth,
    environment: str,
    conversation_id: str,
    participant_id: str,
    timeout: int = 10
) -> bool:
    base_url = f"https://{environment}/api/v2/conversations/{conversation_id}/participants"
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Accept": "application/json"
    }
    
    start_time = time.time()
    while time.time() - start_time < timeout:
        response = requests.get(base_url, headers=headers)
        response.raise_for_status()
        participants = response.json()["entities"]
        
        for p in participants:
            if p["id"] == participant_id:
                if p["state"] in ["ended", "removed"]:
                    return True
                break
        time.sleep(1)
        
    return False

The verification loop queries the participant list and checks the state field. Voice participants transition through in-progress, alerting, queued, and finally ended. The API design separates the disconnect command from the state confirmation to accommodate media server latency and network routing delays.

Complete Working Example

The following script combines authentication, disconnect execution, and verification into a single runnable module. Replace the credential placeholders and identifier values before execution.

import time
import requests
from typing import Optional, Dict, Any

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.environment = environment
        self.token_url = f"https://{environment}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0

    def _fetch_token(self) -> str:
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        response = requests.post(self.token_url, data=payload, headers=headers)
        response.raise_for_status()
        return response.json()["access_token"]

    def get_token(self) -> str:
        if self.access_token and time.time() < self.token_expiry - 300:
            return self.access_token
        self.access_token = self._fetch_token()
        self.token_expiry = time.time() + 3600
        return self.access_token

def disconnect_participant(
    auth: GenesysAuth,
    environment: str,
    conversation_id: str,
    participant_id: str
) -> bool:
    base_url = f"https://{environment}/api/v2/conversations"
    endpoint = f"{base_url}/{conversation_id}/participants/{participant_id}/disconnect"
    
    headers = {
        "Authorization": f"Bearer {auth.get_token()}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    
    body = {}
    max_retries = 3
    attempt = 0
    
    while attempt < max_retries:
        response = requests.post(endpoint, json=body, headers=headers)
        
        if response.status_code == 204:
            print("Disconnect request accepted (204 No Content)")
            break
        elif response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
            print(f"Rate limited. Waiting {retry_after} seconds before retry {attempt + 1}")
            time.sleep(retry_after)
            attempt += 1
        else:
            print(f"API Error: {response.status_code} - {response.text}")
            response.raise_for_status()
    else:
        print("Max retries exceeded. Disconnect may not have processed.")
        return False

    # Verification phase
    verify_url = f"{base_url}/{conversation_id}/participants"
    start_time = time.time()
    while time.time() - start_time < 15:
        resp = requests.get(verify_url, headers=headers)
        resp.raise_for_status()
        for p in resp.json()["entities"]:
            if p["id"] == participant_id:
                if p["state"] in ["ended", "removed"]:
                    print(f"Participant {participant_id} successfully disconnected.")
                    return True
                break
        time.sleep(1)
        
    print("Verification timeout. Participant state may still be transitioning.")
    return False

if __name__ == "__main__":
    CLIENT_ID = "YOUR_CLIENT_ID"
    CLIENT_SECRET = "YOUR_CLIENT_SECRET"
    ENVIRONMENT = "mypurecloud.com"
    CONVERSATION_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    PARTICIPANT_ID = "yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy"
    
    auth_client = GenesysAuth(CLIENT_ID, CLIENT_SECRET, ENVIRONMENT)
    disconnect_participant(auth_client, ENVIRONMENT, CONVERSATION_ID, PARTICIPANT_ID)

The script handles token caching, retry logic, and state verification in a single flow. It prints progress messages and raises exceptions on unrecoverable errors. You can integrate this module into larger orchestration systems or event-driven architectures.

Common Errors & Debugging

Error: 401 Unauthorized

  • Cause: The access token has expired, is malformed, or the client credentials are incorrect.
  • Fix: Verify the client_id and client_secret match the integration settings in Genesys Cloud. Ensure the OAuth client is active and not disabled. Check the token response for error: "invalid_client".
  • Code adjustment: Add explicit validation of the token response before caching.
def _fetch_token(self) -> str:
    payload = {
        "grant_type": "client_credentials",
        "client_id": self.client_id,
        "client_secret": self.client_secret
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = requests.post(self.token_url, data=payload, headers=headers)
    if response.status_code != 200:
        raise ValueError(f"OAuth failed: {response.status_code} - {response.text}")
    token_data = response.json()
    if "access_token" not in token_data:
        raise ValueError("Token response missing access_token field")
    return token_data["access_token"]

Error: 403 Forbidden

  • Cause: The OAuth client lacks the conversation:participant:delete scope, or the integration user does not have the required role permissions.
  • Fix: Navigate to Admin > Security > OAuth clients > Edit your client > Scopes. Add conversation:participant:delete. Assign the integration user the Conversation Administrator or API User role with conversation write permissions.
  • Code adjustment: Log the exact scope requirements in your deployment documentation.

Error: 404 Not Found

  • Cause: The conversation_id or participant_id is invalid, expired, or belongs to a different organization environment.
  • Fix: Validate both UUIDs against the active conversations endpoint before calling disconnect. Conference participants may have short lifespans if the call drops unexpectedly.
  • Code adjustment: Wrap the disconnect call in a try-except block that catches requests.exceptions.HTTPError and checks for 404 specifically.
try:
    response = requests.post(endpoint, json=body, headers=headers)
    response.raise_for_status()
except requests.exceptions.HTTPError as http_err:
    if http_err.response.status_code == 404:
        print(f"Conversation or participant not found. Verify IDs: {conversation_id}, {participant_id}")
    else:
        raise

Error: 429 Too Many Requests

  • Cause: The API gateway enforces rate limits per OAuth client or per tenant. Disconnect operations share limits with other conversation endpoints.
  • Fix: Implement exponential backoff with jitter. Read the Retry-After header when present. Distribute calls across multiple OAuth clients if processing bulk disconnects.
  • Code adjustment: The complete example already includes retry logic. Add random jitter to prevent thundering herd scenarios.
import random
retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
jitter = random.uniform(0, 0.5 * retry_after)
time.sleep(retry_after + jitter)

Official References