Stream Genesys Cloud Conversation Search Results with a Python Generator
What You Will Build
- Build a Python generator that streams conversation search results from Genesys Cloud CX without loading the entire dataset into memory.
- The implementation uses the Genesys Cloud Search API (POST /api/v2/search) with cursor-based navigation via the nextPage token.
- The code includes automatic deduplication, retries with backoff for rate limits, and error handling for common HTTP and network failures.
Prerequisites
- OAuth 2.0 Client Credentials grant with the search:query scope
- Python 3.9+ runtime (the code uses built-in generic annotations such as set[str])
- requests library (pip install requests)
- Genesys Cloud organization with active conversation data, plus the API and login hosts for your region (e.g., https://api.mypurecloud.com and https://login.mypurecloud.com)
- Familiarity with Python generators
Authentication Setup
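Genesys Cloud uses the OAuth 2.0 Client Credentials flow for server-to-server API access. Before calling the Search API, you must exchange your client credentials for a short-lived access token. Request the token from the login host for your region (for example, https://login.mypurecloud.com/oauth/token). The token response reports the token's lifetime in expires_in. Refresh the token automatically based on that value rather than assuming a fixed duration.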
```python
import requests
import time
from typing import Optional


class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, login_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        # Tokens are issued by the region's login host (e.g., https://login.mypurecloud.com),
        # not by the API host
        self.token_url = f"{login_url}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0

    def get_token(self) -> str:
        # Reuse the cached token until shortly before it expires
        if self.access_token and time.time() < self.token_expiry:
            return self.access_token
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        payload = {
            "grant_type": "client_credentials",
            "scope": "search:query"
        }
        # Send the client ID and secret as HTTP Basic credentials (RFC 6749, section 2.3.1)
        response = requests.post(
            self.token_url,
            headers=headers,
            data=payload,
            auth=(self.client_id, self.client_secret),
            timeout=30,
        )
        response.raise_for_status()
        token_data = response.json()
        self.access_token = token_data["access_token"]
        # Subtract 60 seconds to ensure we refresh before actual expiry
        self.token_expiry = time.time() + token_data["expires_in"] - 60
        return self.access_token
```
The get_token method caches the token in memory. It requests a new token only when no token is cached or the cached one is about to expire. In distributed environments, replace the in-memory cache with Redis or a dedicated secret manager. The search:query scope is mandatory for the Search API. Requests without this scope return 403 Forbidden.
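If several threads share one auth object, two of them can see an expired token at the same moment, and both will request a new one. The following is a minimal sketch of a lock-protected cache. It assumes a hypothetical fetch callable that performs the token request and returns the token together with its expires_in value.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class ThreadSafeTokenCache:
    """Caches one access token and refreshes it under a lock.

    `fetch` is a hypothetical callable that performs the OAuth request and
    returns (access_token, expires_in_seconds).
    """

    def __init__(self, fetch: Callable[[], Tuple[str, int]],
                 margin_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch
        self._margin = margin_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expiry = 0.0

    def get_token(self) -> str:
        with self._lock:
            # Only one thread refreshes; the others wait, then reuse the new token
            if self._token is None or self._clock() >= self._expiry:
                token, expires_in = self._fetch()
                self._token = token
                # Refresh `margin_seconds` before the reported expiry
                self._expiry = self._clock() + expires_in - self._margin
            return self._token
```

The sketch uses time.monotonic, so wall-clock adjustments cannot shift the computed expiry. Across processes or hosts, you still need the shared store described above.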
Implementation
Step 1: Initialize the HTTP Client and Define the Search Query
The Search API accepts a JSON body containing a Lucene-style query, pagination parameters, and field filters. Set the type to conversation to target conversation records. The size parameter controls how many results are returned per page. Genesys Cloud enforces a maximum page size of 200.
```python
from typing import Any, Dict, Iterator, Optional

import requests


class ConversationSearchClient:
    def __init__(self, auth: GenesysAuth, base_url: str):
        self.auth = auth
        self.search_url = f"{base_url}/api/v2/search"
        # Reuse one connection pool for every page request
        self.session = requests.Session()
        self.session.headers.update({
            "Accept": "application/json",
            "Content-Type": "application/json"
        })

    def _build_query_payload(self, query_string: str, page: Optional[str] = None, size: int = 100) -> Dict[str, Any]:
        payload = {
            "query": query_string,
            "type": "conversation",
            "size": size,
            "sort": [{"field": "timestamp", "direction": "desc"}]
        }
        # Omit "page" on the first request; afterwards send the cursor unchanged
        if page:
            payload["page"] = page
        return payload
```
The _build_query_payload method constructs the request body. The first request omits page. Every later request passes the opaque nextPage string returned by the previous response. The sort array gives a deterministic order, which reduces duplicate results when data changes during pagination but does not eliminate them. Step 2 handles the remainder with deduplication.
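You can enforce the page-size ceiling and the first-request rule before any request is sent. The following is a minimal standalone sketch. build_search_payload and MAX_PAGE_SIZE are hypothetical names, and the 200 limit is the one stated above.

```python
from typing import Any, Dict, Optional

MAX_PAGE_SIZE = 200  # page-size ceiling stated in this guide


def build_search_payload(query_string: str, page: Optional[str] = None,
                         size: int = 100) -> Dict[str, Any]:
    """Hypothetical standalone version of _build_query_payload that rejects
    out-of-range sizes instead of sending them to the API."""
    if not 1 <= size <= MAX_PAGE_SIZE:
        raise ValueError(f"size must be between 1 and {MAX_PAGE_SIZE}, got {size}")
    payload: Dict[str, Any] = {
        "query": query_string,
        "type": "conversation",
        "size": size,
        "sort": [{"field": "timestamp", "direction": "desc"}],
    }
    # First request: no "page" key. Later requests: the opaque cursor, unmodified.
    if page is not None:
        payload["page"] = page
    return payload
```

For example, build_search_payload("status:completed", size=200) returns a body without a page key, suitable for the first request.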
Step 2: Implement Cursor Pagination and Deduplication Logic
Genesys Cloud returns a nextPage field in the response body when additional results exist. This string acts as a cursor. You must pass it back as the page parameter in the next request. Search results can overlap if conversations are updated while you paginate. The generator tracks seen conversation identifiers in a set and yields only unique records.
```python
    def stream_conversations(self, query_string: str, max_results: Optional[int] = None,
                             max_retries: int = 5) -> Iterator[Dict[str, Any]]:
        seen_ids: set[str] = set()  # stores IDs only, never full records
        current_page: Optional[str] = None
        results_yielded = 0
        attempt = 0  # consecutive 429 responses for the current page
        while True:
            payload = self._build_query_payload(query_string, page=current_page, size=200)
            try:
                access_token = self.auth.get_token()
                headers = {"Authorization": f"Bearer {access_token}"}
                response = self.session.post(self.search_url, headers=headers, json=payload, timeout=30)
                response.raise_for_status()
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 429 and attempt < max_retries:
                    # Honor a numeric Retry-After; otherwise back off exponentially: 2, 4, 8, ... seconds
                    retry_after = e.response.headers.get("Retry-After", "")
                    time.sleep(int(retry_after) if retry_after.isdigit() else 2 ** (attempt + 1))
                    attempt += 1
                    continue  # retry the same page; the cursor is not advanced
                raise  # other errors, or too many consecutive 429s, propagate
            attempt = 0
            data = response.json()
            results = data.get("results", [])
            next_page_token = data.get("nextPage")
            for result in results:
                conv_id = result.get("conversationId") or result.get("id")
                if conv_id and conv_id not in seen_ids:
                    seen_ids.add(conv_id)
                    yield result
                    results_yielded += 1
                    if max_results is not None and results_yielded >= max_results:
                        return
            # No nextPage means this was the last page
            if not next_page_token:
                break
            current_page = next_page_token
```
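The generator keeps its state (the cursor, the seen IDs, and the count) between yield calls. As a result, it requests each page only when the consumer asks for more results. The seen_ids set filters out duplicates caused by conversations changing during pagination. It stores identifiers only, so memory grows with the number of unique IDs rather than with full records. The max_results parameter stops the generator after that many unique results, without requesting further pages. On a 429 response, the continue statement retries the same page without advancing the cursor, which ensures data consistency.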
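You can check the cursor and deduplication logic without network access by swapping the HTTP call for a function that returns canned pages. The following sketch assumes a hypothetical fetch_page(cursor) callable that returns the parsed response body. The fake pages are invented test data in which conversation c3 appears on two pages.

```python
from typing import Any, Callable, Dict, Iterator, Optional


def paginate_unique(fetch_page: Callable[[Optional[str]], Dict[str, Any]],
                    max_results: Optional[int] = None) -> Iterator[Dict[str, Any]]:
    """Same cursor/dedup loop as stream_conversations, with the HTTP call
    replaced by fetch_page(cursor) so the logic can be tested offline."""
    seen_ids: set[str] = set()
    cursor: Optional[str] = None
    yielded = 0
    while True:
        data = fetch_page(cursor)
        for result in data.get("results", []):
            conv_id = result.get("conversationId") or result.get("id")
            if conv_id and conv_id not in seen_ids:
                seen_ids.add(conv_id)
                yield result
                yielded += 1
                if max_results is not None and yielded >= max_results:
                    return
        cursor = data.get("nextPage")
        if not cursor:  # absence of nextPage ends the stream
            return


# Three fake pages keyed by cursor; "c3" shows up on two pages
FAKE_PAGES = {
    None: {"results": [{"id": "c1"}, {"id": "c2"}, {"id": "c3"}], "nextPage": "cur-2"},
    "cur-2": {"results": [{"id": "c3"}, {"id": "c4"}], "nextPage": "cur-3"},
    "cur-3": {"results": [{"id": "c5"}]},
}
```

Running list(paginate_unique(FAKE_PAGES.__getitem__)) yields c1 through c5 exactly once each. With max_results=2, only the first page is requested.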
Step 3: Handle Rate Limits and Parse Response Payloads
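The Search API enforces rate limits, and exceeding them returns 429 Too Many Requests. The response carries a Retry-After header that indicates how long to wait, normally in seconds (HTTP also allows an HTTP-date). When the header is missing, back off exponentially (for example, 2, 4, then 8 seconds) instead of retrying at a fixed interval. Cap the number of retries so that a persistent limit cannot stall the job forever.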
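The following sketch shows the delay calculation as a standalone function. retry_delay is a hypothetical helper. It honors Retry-After in either delta-seconds or HTTP-date form. Otherwise it doubles the wait from a two-second base, capped at 60 seconds; both values are assumptions to tune.

```python
import email.utils
from datetime import datetime, timezone
from typing import Optional


def retry_delay(attempt: int, retry_after: Optional[str] = None,
                base: float = 2.0, cap: float = 60.0,
                now: Optional[datetime] = None) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after:
        value = retry_after.strip()
        if value.isdigit():  # delta-seconds form, e.g. "5"
            return float(value)
        try:  # HTTP-date form, e.g. "Sun, 12 May 2024 14:00:30 GMT"
            when = email.utils.parsedate_to_datetime(value)
        except (TypeError, ValueError):  # unparseable header; fall through
            when = None
        if when is not None:
            if when.tzinfo is None:
                when = when.replace(tzinfo=timezone.utc)
            now = now or datetime.now(timezone.utc)
            return max(0.0, (when - now).total_seconds())
    # No usable header: capped exponential backoff (base, 2*base, 4*base, ...)
    return min(cap, base * (2 ** attempt))
```

Adding random jitter to the fallback delay spreads out retries when several workers are throttled at the same time.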
The following request and response illustrate the payload shapes this guide works with:
Request:
```http
POST /api/v2/search HTTP/1.1
Host: api.mypurecloud.com
Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
Content-Type: application/json

{
  "query": "status:completed",
  "type": "conversation",
  "size": 200,
  "sort": [{"field": "timestamp", "direction": "desc"}]
}
```
Response:
```json
{
  "page": 1,
  "pageSize": 200,
  "total": 1542,
  "nextPage": "eyJwYWdlIjoyLCJzaXplIjoyMDB9",
  "results": [
    {
      "type": "conversation",
      "id": "conv-8a7b6c5d-4e3f-2a1b-0c9d-8e7f6a5b4c3d",
      "conversationId": "conv-8a7b6c5d-4e3f-2a1b-0c9d-8e7f6a5b4c3d",
      "timestamp": "2024-05-12T14:32:18.000Z",
      "fields": {
        "channelType": "voice",
        "status": "completed",
        "durationSeconds": 142
      }
    }
  ]
}
```
Treat the nextPage token as opaque. Even if it looks like base64, never parse or modify it; pass it back exactly as received. The total field is only an estimated match count, so rely on the absence of nextPage, not on total, to detect the last page.
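The following sketch turns each result into a flat record and detects the last page, assuming the response shape shown above. flatten_results and has_more_pages are hypothetical helpers, and SAMPLE_RESPONSE is a trimmed copy of the example body.

```python
import json
from datetime import datetime
from typing import Any, Dict, Iterator

SAMPLE_RESPONSE = """
{
  "page": 1,
  "total": 1542,
  "nextPage": "eyJwYWdlIjoyLCJzaXplIjoyMDB9",
  "results": [
    {
      "type": "conversation",
      "id": "conv-8a7b6c5d-4e3f-2a1b-0c9d-8e7f6a5b4c3d",
      "conversationId": "conv-8a7b6c5d-4e3f-2a1b-0c9d-8e7f6a5b4c3d",
      "timestamp": "2024-05-12T14:32:18.000Z",
      "fields": {"channelType": "voice", "status": "completed", "durationSeconds": 142}
    }
  ]
}
"""


def _parse_timestamp(ts: str) -> datetime:
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on
    return datetime.fromisoformat(ts[:-1] + "+00:00" if ts.endswith("Z") else ts)


def flatten_results(data: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield one flat record per search result."""
    for result in data.get("results", []):
        fields = result.get("fields", {})
        ts = result.get("timestamp")
        yield {
            "conversation_id": result.get("conversationId") or result.get("id"),
            "timestamp": _parse_timestamp(ts) if ts else None,
            "channel": fields.get("channelType"),
            "status": fields.get("status"),
            "duration_seconds": fields.get("durationSeconds"),
        }


def has_more_pages(data: Dict[str, Any]) -> bool:
    # Only the presence of nextPage matters; "total" is an estimate
    return bool(data.get("nextPage"))
```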
Complete Working Example
The following script combines authentication, pagination, deduplication, and retry logic into a single executable module. Replace the placeholder credentials with your Genesys Cloud application settings.
```python
import sys
import time
from typing import Any, Dict, Iterator, Optional

import requests


class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, login_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        # Tokens come from the region's login host, not the API host
        self.token_url = f"{login_url}/oauth/token"
        self.access_token: Optional[str] = None
        self.token_expiry: float = 0.0

    def get_token(self) -> str:
        if self.access_token and time.time() < self.token_expiry:
            return self.access_token
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        payload = {
            "grant_type": "client_credentials",
            "scope": "search:query"
        }
        # HTTP Basic client authentication (RFC 6749, section 2.3.1)
        response = requests.post(
            self.token_url,
            headers=headers,
            data=payload,
            auth=(self.client_id, self.client_secret),
            timeout=30,
        )
        response.raise_for_status()
        token_data = response.json()
        self.access_token = token_data["access_token"]
        # Refresh 60 seconds before the reported expiry
        self.token_expiry = time.time() + token_data["expires_in"] - 60
        return self.access_token


class ConversationSearchClient:
    def __init__(self, auth: GenesysAuth, base_url: str):
        self.auth = auth
        self.search_url = f"{base_url}/api/v2/search"
        self.session = requests.Session()
        self.session.headers.update({
            "Accept": "application/json",
            "Content-Type": "application/json"
        })

    def _build_query_payload(self, query_string: str, page: Optional[str] = None, size: int = 200) -> Dict[str, Any]:
        payload = {
            "query": query_string,
            "type": "conversation",
            "size": size,
            "sort": [{"field": "timestamp", "direction": "desc"}]
        }
        if page:
            payload["page"] = page
        return payload

    def stream_conversations(self, query_string: str, max_results: Optional[int] = None,
                             max_retries: int = 5) -> Iterator[Dict[str, Any]]:
        seen_ids: set[str] = set()
        current_page: Optional[str] = None
        results_yielded = 0
        attempt = 0  # consecutive 429 responses for the current page
        while True:
            payload = self._build_query_payload(query_string, page=current_page, size=200)
            try:
                access_token = self.auth.get_token()
                headers = {"Authorization": f"Bearer {access_token}"}
                response = self.session.post(self.search_url, headers=headers, json=payload, timeout=30)
                response.raise_for_status()
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 429 and attempt < max_retries:
                    # Numeric Retry-After if present, else 2, 4, 8, ... seconds
                    retry_after = e.response.headers.get("Retry-After", "")
                    time.sleep(int(retry_after) if retry_after.isdigit() else 2 ** (attempt + 1))
                    attempt += 1
                    continue  # retry the same page without advancing the cursor
                raise
            attempt = 0
            data = response.json()
            results = data.get("results", [])
            next_page_token = data.get("nextPage")
            for result in results:
                conv_id = result.get("conversationId") or result.get("id")
                if conv_id and conv_id not in seen_ids:
                    seen_ids.add(conv_id)
                    yield result
                    results_yielded += 1
                    if max_results is not None and results_yielded >= max_results:
                        return
            if not next_page_token:
                break
            current_page = next_page_token


if __name__ == "__main__":
    # Configuration
    CLIENT_ID = "YOUR_CLIENT_ID"
    CLIENT_SECRET = "YOUR_CLIENT_SECRET"
    LOGIN_URL = "https://login.mypurecloud.com"  # token host for your region
    API_URL = "https://api.mypurecloud.com"      # API host for your region
    QUERY = "status:completed channelType:voice"

    auth = GenesysAuth(CLIENT_ID, CLIENT_SECRET, LOGIN_URL)
    client = ConversationSearchClient(auth, API_URL)

    print("Starting conversation search stream...")
    count = 0
    try:
        for conversation in client.stream_conversations(QUERY, max_results=500):
            print(f"[{count}] ID: {conversation.get('id')} | Time: {conversation.get('timestamp')}")
            count += 1
    except requests.exceptions.HTTPError as err:
        print(f"API Error: {err.response.status_code} - {err.response.text}", file=sys.stderr)
        sys.exit(1)
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as err:
        print(f"Connection Error: {err}", file=sys.stderr)
        sys.exit(1)
    print(f"Stream complete. Processed {count} unique conversations.")
```
The script initializes the authentication manager, creates the search client, and iterates through the generator. The try/except block around the loop reports HTTP and network failures on stderr and exits with status 1. The max_results parameter prevents unbounded execution during testing.
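Because the generator yields one record at a time, a consumer can write results in fixed-size batches without holding the whole result set in memory. The following sketch writes JSON Lines output. batched and write_jsonl are hypothetical helpers, and the demo feeds a stand-in generator instead of a live client.

```python
import io
import itertools
import json
from typing import Any, Dict, Iterable, Iterator, List, TextIO


def batched(items: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Group any iterable into lists of at most `size` items, lazily."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(items)
    while True:
        batch = list(itertools.islice(it, size))
        if not batch:
            return
        yield batch


def write_jsonl(stream: Iterable[Dict[str, Any]], out: TextIO, batch_size: int = 100) -> int:
    """Write each record as one JSON line, one batch at a time; return the count."""
    written = 0
    for batch in batched(stream, batch_size):
        out.write("".join(json.dumps(record) + "\n" for record in batch))
        written += len(batch)
    return written


if __name__ == "__main__":
    # Stand-in for client.stream_conversations(QUERY): 250 fake records
    fake_stream = ({"id": f"c{i}"} for i in range(250))
    buffer = io.StringIO()
    print(write_jsonl(fake_stream, buffer))  # prints 250
```

With a live client, pass client.stream_conversations(QUERY) as the stream and an open file handle as out. Python 3.12 adds itertools.batched, which yields tuples. The helper here keeps the code compatible with Python 3.9.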
Common Errors & Debugging
Error: 401 Unauthorized
- Cause: The access token is missing, expired, or malformed, or the Authorization header is not formatted correctly.
- Fix: Verify that client_id and client_secret match your Genesys Cloud OAuth client settings, and that tokens are requested from the login host for your region. Ensure the Authorization header follows the exact format Bearer <token>, with a single space. Check that the token request returns 200 OK and includes access_token and expires_in.
- Code showing the fix:
```python
# Verify the token before attaching it; `auth` and `headers` come from the surrounding code
token = auth.get_token()
if not token or any(ch.isspace() for ch in token):
    raise ValueError("Invalid OAuth token format")
headers["Authorization"] = f"Bearer {token}"
```
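Checking the format does not help when a cached token is rejected before its recorded expiry. The following sketch retries a request once with a fresh token after a 401. call_with_token_refresh is a hypothetical helper, and the caller supplies all four callables.

```python
from typing import Callable, TypeVar

R = TypeVar("R")


def call_with_token_refresh(send: Callable[[str], R],
                            status_of: Callable[[R], int],
                            get_token: Callable[[], str],
                            invalidate: Callable[[], None]) -> R:
    """Call send(token); on a 401, drop the cached token and retry exactly once."""
    response = send(get_token())
    if status_of(response) == 401:
        invalidate()  # force the next get_token() call to request a new token
        response = send(get_token())
    return response
```

With the GenesysAuth class from this guide, you can wire up the helper as follows:

- get_token can be auth.get_token.
- invalidate can be lambda: setattr(auth, "access_token", None).
- send can wrap session.post with the Bearer header.
- status_of can be lambda r: r.status_code.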
Error: 403 Forbidden
- Cause: The OAuth client lacks the search:query scope, or the client credentials are registered in a different environment.
- Fix: Open your OAuth client configuration in Genesys Cloud, add search:query to its scopes, and request a new token. Verify that the API and login hosts match the region where the OAuth client is registered.
- Code showing the fix:
```python
# The 403 comes back from the Search API call: the token is valid but not authorized for this endpoint
response = session.post(search_url, headers=headers, json=payload, timeout=30)
if response.status_code == 403:
    raise PermissionError("Missing search:query scope or client registered in a different environment")
```
Error: 429 Too Many Requests
- Cause: The organization has exceeded the API rate limit. Genesys Cloud enforces limits per application and per organization.
- Fix: Honor the Retry-After header. If it is missing, back off exponentially starting at two seconds, and cap the number of retries. Reduce concurrent requests. The generator above handles this automatically by sleeping and retrying the same page.
- Code showing the fix:
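```python
# Inside the request loop; `attempt` counts consecutive 429 responses for this page
if response.status_code == 429:
    retry_after = response.headers.get("Retry-After", "")
    wait_time = int(retry_after) if retry_after.isdigit() else 2 ** (attempt + 1)
    print(f"Rate limited. Waiting {wait_time} seconds...")
    time.sleep(wait_time)
    attempt += 1
    # Loop continues to retry the exact same payload
```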
Error: 502 Bad Gateway or 503 Service Unavailable
- Cause: Genesys Cloud platform maintenance or transient infrastructure failure.
- Fix: Retry with exponential backoff up to three times. If failures persist, halt execution and alert operations teams; do not retry indefinitely. For long-running jobs, a circuit breaker can stop the client from calling the API again until a cool-down period has passed.
- Code showing the fix:
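```python
# Retry transient 502/503 responses with exponential backoff between attempts
max_retries = 3
for attempt in range(max_retries):
    response = self.session.post(self.search_url, headers=headers, json=payload, timeout=30)
    if response.status_code not in (502, 503):
        break
    if attempt < max_retries - 1:
        time.sleep(2 ** attempt)  # waits 1 s, then 2 s; no sleep after the final attempt
else:
    # The loop never hit `break`: every attempt returned 502 or 503
    raise ConnectionError("Platform unavailable after multiple retries")
```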
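The following is a minimal circuit breaker sketch to pair with the retry loop. CircuitBreaker is a hypothetical class. After threshold consecutive failures, it rejects calls until reset_after seconds have passed. It then lets calls through again as a trial, and any further failure reopens it.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Opens after `threshold` consecutive failures; allows trial calls again
    once `reset_after` seconds have elapsed (half-open)."""

    def __init__(self, threshold: int = 3, reset_after: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.threshold = threshold
        self.reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow(self) -> bool:
        if self._opened_at is None:
            return True  # closed: normal operation
        # Open: reject until the cool-down has elapsed, then allow a trial
        return self._clock() - self._opened_at >= self.reset_after

    def record_success(self) -> None:
        # Any success closes the circuit and clears the failure streak
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.threshold:
            # (Re)open the circuit and restart the cool-down timer
            self._opened_at = self._clock()
```

To use it with the retry loop:

- Call breaker.allow() before each request.
- Call breaker.record_failure() on a 502 or 503 response.
- Call breaker.record_success() otherwise.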