Selecting the Correct OAuth Grant Type for Server-Side Reporting in Genesys Cloud
What You Will Build
- A Python script that authenticates to Genesys Cloud and retrieves historical conversation analytics data.
- Implementation of both the Client Credentials Grant and the Authorization Code Grant flows using the
requestslibrary. - Decision logic to determine which grant type fits specific reporting use cases based on data ownership requirements.
Prerequisites
- Platform: Genesys Cloud CX
- API Version: V2 REST API
- Language: Python 3.9+
- Dependencies:
requests(pip install requests),python-dotenv(pip install python-dotenv) - Genesys Cloud Org: You need a Genesys Cloud organization with API access enabled.
- Credentials:
- For Client Credentials: An API Key and API Secret generated in the Admin Console under Organization > API keys.
- For Authorization Code: A Public App registered in the Admin Console under Applications > Public apps, with a redirect URI configured.
Authentication Setup
The core distinction between these two grant types lies in identity. Client Credentials authenticates the application itself. Authorization Code authenticates a specific user, allowing the application to act on their behalf.
Client Credentials Grant
This flow is used when the application needs to access data that is not tied to a specific user’s permission scope, such as global configuration, user directories, or aggregate analytics. The application acts as a service account.
Required Scope: analytics:conversation:view (for analytics) or user:view (for user data).
import requests
import json
import time
from typing import Optional
class GenesysClientCredentials:
def __init__(self, client_id: str, client_secret: str, environment: str = "mypurecloud.com"):
self.client_id = client_id
self.client_secret = client_secret
self.environment = environment
self.token_endpoint = f"https://api.{environment}/oauth/token"
self.access_token: Optional[str] = None
self.token_expiry: float = 0
def get_access_token(self) -> str:
"""
Retrieves an access token using the Client Credentials Grant.
Handles token caching to avoid unnecessary API calls.
"""
# Check if token is still valid (with 30s buffer)
if self.access_token and time.time() < (self.token_expiry - 30):
return self.access_token
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
payload = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
"scope": "analytics:conversation:view" # Specific scope for analytics
}
try:
response = requests.post(
self.token_endpoint,
headers=headers,
data=payload
)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.token_expiry = time.time() + token_data["expires_in"]
return self.access_token
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
raise Exception("Invalid client_id or client_secret") from e
elif e.response.status_code == 403:
raise Exception("Invalid scope or API key not enabled") from e
else:
raise Exception(f"Authentication failed: {e.response.text}") from e
def make_api_call(self, method: str, path: str, params: Optional[dict] = None) -> dict:
"""
Makes a generic API call using the cached access token.
"""
base_url = f"https://api.{self.environment}"
url = f"{base_url}{path}"
headers = {
"Authorization": f"Bearer {self.get_access_token()}",
"Content-Type": "application/json"
}
if method == "GET":
response = requests.get(url, headers=headers, params=params)
elif method == "POST":
response = requests.post(url, headers=headers, json=params)
else:
raise ValueError(f"Unsupported method: {method}")
response.raise_for_status()
return response.json()
Authorization Code Grant
This flow is used when the reporting application must access data that requires specific user permissions, such as viewing a user’s own activity, accessing data restricted by organizational units (OUs) that the app’s API key does not cover, or auditing actions performed by a specific user.
Required Scope: analytics:conversation:view plus any user-specific scopes required.
import requests
import webbrowser
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
from typing import Optional
class AuthCodeHandler(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
self.code = None
super().__init__(*args, **kwargs)
def do_GET(self):
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
if "code" in params:
self.code = params["code"][0]
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(b"<h1>Authentication Successful. You can close this window.</h1>")
else:
self.send_response(400)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(b"<h1>Authentication Failed. No code received.</h1>")
class GenesysAuthorizationCode:
def __init__(self, client_id: str, client_secret: str, redirect_uri: str, environment: str = "mypurecloud.com"):
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.environment = environment
self.token_endpoint = f"https://api.{environment}/oauth/token"
self.authorize_endpoint = f"https://api.{environment}/oauth/authorize"
self.access_token: Optional[str] = None
self.refresh_token: Optional[str] = None
self.token_expiry: float = 0
def get_authorization_url(self, scopes: list) -> str:
"""
Generates the URL to redirect the user to for login.
"""
scope_param = " ".join(scopes)
params = {
"client_id": self.client_id,
"response_type": "code",
"redirect_uri": self.redirect_uri,
"scope": scope_param,
"state": "random_state_string_for_csrf_protection" # In production, generate a unique state
}
query_string = "&".join([f"{k}={v}" for k, v in params.items()])
return f"{self.authorize_endpoint}?{query_string}"
def start_local_server(self) -> str:
"""
Starts a local HTTP server to capture the authorization code.
"""
import threading
import socketserver
# Parse port from redirect_uri
parsed_uri = urlparse(self.redirect_uri)
port = parsed_uri.port if parsed_uri.port else 8080
server = HTTPServer(("localhost", port), AuthCodeHandler)
def run_server():
server.handle_request() # Handle one request then stop
thread = threading.Thread(target=run_server)
thread.daemon = True
thread.start()
return thread
def exchange_code_for_token(self, auth_code: str) -> dict:
"""
Exchanges the authorization code for access and refresh tokens.
"""
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
payload = {
"grant_type": "authorization_code",
"client_id": self.client_id,
"client_secret": self.client_secret,
"code": auth_code,
"redirect_uri": self.redirect_uri
}
try:
response = requests.post(
self.token_endpoint,
headers=headers,
data=payload
)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.refresh_token = token_data.get("refresh_token") # Crucial for long-lived sessions
self.token_expiry = time.time() + token_data["expires_in"]
return token_data
except requests.exceptions.HTTPError as e:
raise Exception(f"Token exchange failed: {e.response.text}") from e
def refresh_access_token(self) -> str:
"""
Uses the refresh token to obtain a new access token.
"""
if not self.refresh_token:
raise Exception("No refresh token available")
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
payload = {
"grant_type": "refresh_token",
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token
}
try:
response = requests.post(
self.token_endpoint,
headers=headers,
data=payload
)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.refresh_token = token_data.get("refresh_token", self.refresh_token)
self.token_expiry = time.time() + token_data["expires_in"]
return self.access_token
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
raise Exception("Refresh token expired or invalid. Re-authentication required.") from e
raise Exception(f"Token refresh failed: {e.response.text}") from e
Implementation
Step 1: Defining the Reporting Query
Regardless of the authentication method, the core task is retrieving analytics data. We will use the /api/v2/analytics/conversations/details/query endpoint. This endpoint allows for complex filtering and aggregation of conversation data.
OAuth Scope: analytics:conversation:view
def build_analytics_query() -> dict:
"""
Constructs a query for the Analytics API.
This example retrieves call duration statistics for the last 24 hours.
"""
from datetime import datetime, timedelta
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=24)
query = {
"view": "default",
"groupBy": ["mediaType"],
"filter": {
"type": "and",
"clauses": [
{
"path": "startTime",
"type": "greaterThanOrEqualTo",
"value": start_time.isoformat() + "Z"
},
{
"path": "startTime",
"type": "lessThan",
"value": end_time.isoformat() + "Z"
}
]
},
"interval": "PT1H", # Hourly intervals
"select": [
{"type": "count"},
{"type": "sum", "path": "wrapUpDuration"},
{"type": "avg", "path": "wrapUpDuration"}
]
}
return query
Step 2: Executing the Report with Client Credentials
This approach is ideal for dashboarding tools that display aggregate data to multiple users. The data is not personal to the user running the report; it is organizational data.
def run_aggregate_report(client: GenesysClientCredentials) -> dict:
"""
Executes the analytics query using Client Credentials.
"""
query = build_analytics_query()
# The endpoint expects the query as the request body for POST
# or as query parameters for GET. POST is preferred for complex queries.
endpoint = "/api/v2/analytics/conversations/details/query"
try:
response = client.make_api_call("POST", endpoint, params=query)
return response
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
print("Rate limit exceeded. Implement exponential backoff.")
time.sleep(5)
return client.make_api_call("POST", endpoint, params=query)
raise
Step 3: Executing the Report with Authorization Code
This approach is ideal for personalized reports, such as “My Performance Summary” or auditing access where the system must verify that the logged-in user has permission to view specific queues or users.
def run_personalized_report(client: GenesysAuthorizationCode) -> dict:
"""
Executes the analytics query using Authorization Code.
Note: The query might need to be filtered by the specific user's ID if
the scope is limited to user-specific data.
"""
query = build_analytics_query()
# Add a filter for the current user if necessary
# In a real app, you would pass the user_id from the session
# query["filter"]["clauses"].append({
# "path": "user.id",
# "type": "equal",
# "value": "current_user_id_here"
# })
endpoint = "/api/v2/analytics/conversations/details/query"
try:
# Ensure token is fresh before making the call
if time.time() > (client.token_expiry - 30):
client.refresh_access_token()
headers = {
"Authorization": f"Bearer {client.access_token}",
"Content-Type": "application/json"
}
base_url = f"https://api.{client.environment}"
response = requests.post(
f"{base_url}{endpoint}",
headers=headers,
json=query
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
# Token might be expired, try refresh
try:
client.refresh_access_token()
# Retry logic here
except Exception:
raise Exception("Session expired. Please log in again.")
raise
Complete Working Example
Below is a unified script that demonstrates how to choose the grant type based on a configuration flag and execute the report.
import os
from dotenv import load_dotenv
import json
import time
from datetime import datetime, timedelta
# Load environment variables from .env file
load_dotenv()
# --- Configuration ---
ENVIRONMENT = os.getenv("GENESYS_ENV", "mypurecloud.com")
GRANT_TYPE = os.getenv("GRANT_TYPE", "client_credentials") # 'client_credentials' or 'authorization_code'
# Client Credentials Config
CLIENT_ID = os.getenv("CLIENT_ID")
CLIENT_SECRET = os.getenv("CLIENT_SECRET")
# Authorization Code Config
REDIRECT_URI = os.getenv("REDIRECT_URI", "http://localhost:8080/callback")
def get_client():
if GRANT_TYPE == "client_credentials":
if not CLIENT_ID or not CLIENT_SECRET:
raise ValueError("CLIENT_ID and CLIENT_SECRET are required for client_credentials")
return GenesysClientCredentials(CLIENT_ID, CLIENT_SECRET, ENVIRONMENT)
elif GRANT_TYPE == "authorization_code":
if not CLIENT_ID or not CLIENT_SECRET:
raise ValueError("CLIENT_ID and CLIENT_SECRET are required for authorization_code")
client = GenesysAuthorizationCode(CLIENT_ID, CLIENT_SECRET, REDIRECT_URI, ENVIRONMENT)
# In a real app, you would check for a stored refresh_token here
# If no refresh_token, trigger the flow below
return client
else:
raise ValueError(f"Unsupported GRANT_TYPE: {GRANT_TYPE}")
def run_analytics_flow():
print(f"Initializing {GRANT_TYPE} flow...")
if GRANT_TYPE == "client_credentials":
client = get_client()
print("Fetching access token...")
token = client.get_access_token()
print("Token acquired.")
print("Running aggregate report...")
data = run_aggregate_report(client)
print(json.dumps(data, indent=2))
elif GRANT_TYPE == "authorization_code":
client = get_client()
# Check if we have a refresh token (simulated check)
# In production, load from secure storage
if not client.refresh_token:
print("No refresh token found. Starting authorization flow...")
auth_url = client.get_authorization_url(["analytics:conversation:view"])
print(f"Please visit this URL to authorize: {auth_url}")
# Start local server to catch the callback
thread = client.start_local_server()
webbrowser.open(auth_url)
# Wait for the code to be captured
while not client.code:
time.sleep(1)
print("Code captured. Exchanging for token...")
tokens = client.exchange_code_for_token(client.code)
print("Token exchange successful.")
# Save refresh_token to secure storage in production
else:
print("Refresh token found. Refreshing access token...")
client.refresh_access_token()
print("Running personalized report...")
data = run_personalized_report(client)
print(json.dumps(data, indent=2))
if __name__ == "__main__":
try:
run_analytics_flow()
except Exception as e:
print(f"Error: {e}")
Common Errors & Debugging
Error: 401 Unauthorized (Invalid Grant)
Cause: The client_id or client_secret is incorrect, or the API key is disabled. For Authorization Code, the redirect_uri in the exchange request does not match the one registered in the Public App.
Fix:
- Verify the API Key is active in Organization > API keys.
- For Authorization Code, ensure the
redirect_uristring matches exactly, including trailing slashes.
# Debugging snippet
response = requests.post(token_endpoint, data=payload)
if response.status_code == 401:
print("Check your Client ID and Secret.")
print("Response body:", response.text)
Error: 403 Forbidden (Insufficient Scope)
Cause: The requested scope (e.g., analytics:conversation:view) was not included in the token request, or the user/API key does not have permission to view the requested data.
Fix:
- Ensure the
scopeparameter in the token request includes all necessary scopes. - For Client Credentials, check that the API Key has the correct permissions in the Admin Console.
- For Authorization Code, ensure the user logging in has the necessary role permissions (e.g., Supervisor or Agent with analytics access).
Error: 429 Too Many Requests
Cause: The application has exceeded the rate limit for the specific endpoint. Analytics endpoints often have lower limits than configuration endpoints.
Fix: Implement exponential backoff and jitter.
import random
def make_request_with_retry(url, headers, max_retries=3):
for attempt in range(max_retries):
response = requests.get(url, headers=headers)
if response.status_code == 429:
wait_time = (2 ** attempt) + random.uniform(0, 1)
print(f"Rate limited. Waiting {wait_time:.2f} seconds...")
time.sleep(wait_time)
continue
return response
raise Exception("Max retries exceeded")
Error: 500 Internal Server Error (Invalid Query)
Cause: The JSON payload sent to the analytics query endpoint is malformed or contains invalid date formats.
Fix: Validate the query structure against the Genesys Cloud Analytics API documentation. Ensure dates are in ISO 8601 format with a Z suffix for UTC.