Sending Proactive Notifications to Customers with Prior Web Messaging History
What You Will Build
- You will build a service that identifies customers who initiated a web messaging session in the last 30 days and sends them a targeted proactive notification.
- This solution uses the Genesys Cloud CX REST API for analytics queries and the Messaging API for outbound delivery.
- The tutorial covers implementation in Python using the
requestslibrary for HTTP interactions.
Prerequisites
- OAuth Client Type: A Service Account with the following scopes:
analytics:conversation:read(for querying historical data)message:send(for sending the outbound message)user:read(optional, for resolving user details if needed)
- SDK/API Version: Genesys Cloud CX REST API v2.
- Language/Runtime: Python 3.8+ with
requestsandpython-dateutil. - External Dependencies:
pip install requests python-dateutil
Authentication Setup
Genesys Cloud CX uses OAuth 2.0 for authentication. For backend services, the Client Credentials flow is the standard approach. This flow exchanges a client ID and secret for an access token.
The token expires after 20 minutes. A robust implementation must cache the token and refresh it before expiration to avoid unnecessary network overhead and potential 401 Unauthorized errors during high-volume operations.
import requests
import time
from datetime import datetime, timezone, timedelta
class GenesysAuth:
def __init__(self, client_id: str, client_secret: str, region: str = "us-east-1"):
self.client_id = client_id
self.client_secret = client_secret
self.region = region
self.access_token = None
self.token_expiry = None
# Base URL depends on region
self.auth_url = f"https://api.{region}.mygenesys.com/oauth/token"
self.api_base_url = f"https://api.{region}.mygenesys.com/api/v2"
def get_token(self) -> str:
"""
Retrieves an access token. Returns cached token if valid.
"""
# Check if we have a valid token
if self.access_token and self.token_expiry and datetime.now(timezone.utc) < self.token_expiry:
return self.access_token
# Request new token
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
try:
response = requests.post(self.auth_url, headers=headers, data=data)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
# Expires_in is in seconds. Add buffer for safety.
self.token_expiry = datetime.now(timezone.utc) + timedelta(seconds=token_data["expires_in"] - 60)
return self.access_token
except requests.exceptions.HTTPError as e:
print(f"Authentication failed: {e.response.status_code} - {e.response.text}")
raise
except requests.exceptions.RequestException as e:
print(f"Network error during authentication: {e}")
raise
def get_headers(self) -> dict:
"""
Returns standard headers for API requests including the Bearer token.
"""
return {
"Authorization": f"Bearer {self.get_token()}",
"Content-Type": "application/json"
}
Implementation
Step 1: Querying Historical Web Messaging Sessions
To identify customers for proactive outreach, you need to query the Analytics API. Specifically, the GET /api/v2/analytics/conversations/details/query endpoint allows complex filtering.
You must filter for:
- Channel:
webchat - Timeframe: Last 30 days.
- Group By:
customerto aggregate sessions per user. - Metrics:
sessions(count of sessions).
The analytics:conversation:read scope is required.
def get_recent_webchat_customers(auth: GenesysAuth, days_back: int = 30) -> list[dict]:
"""
Queries Genesys Cloud for unique customers who had webchat sessions in the last N days.
"""
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=days_back)
# Format dates as ISO 8601 strings
start_str = start_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
end_str = end_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
url = f"{auth.api_base_url}/analytics/conversations/details/query"
# The body structure for the analytics query
body = {
"dateFrom": start_str,
"dateTo": end_str,
"groupBy": ["customer"],
"metrics": ["sessions"],
"filters": [
{
"type": "equal",
"field": "channel",
"value": "webchat"
}
],
"size": 100 # Max page size for details query
}
headers = auth.get_headers()
# Handle pagination
all_customers = []
next_page_token = None
while True:
params = {}
if next_page_token:
params["nextPageToken"] = next_page_token
try:
response = requests.post(url, json=body, headers=headers, params=params)
response.raise_for_status()
data = response.json()
# Extract customer info from the result buckets
for bucket in data.get("bucket", []):
# bucket['key']['customer'] contains the customer identifier
# The structure depends on the groupBy. For 'customer', it usually includes id and address.
customer_info = bucket.get("key", {}).get("customer", {})
metrics = bucket.get("metrics", {})
# Only include customers with at least 1 session
if metrics.get("sessions", {}).get("count", 0) > 0:
all_customers.append({
"id": customer_info.get("id"),
"address": customer_info.get("address"), # This is usually the email or phone
"name": customer_info.get("name"),
"sessions": metrics["sessions"]["count"]
})
# Check for pagination
next_page_token = data.get("nextPageToken")
if not next_page_token:
break
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
# Rate limit hit. Wait and retry.
retry_after = int(e.response.headers.get("Retry-After", 5))
print(f"Rate limited. Waiting {retry_after} seconds...")
time.sleep(retry_after)
continue
else:
print(f"Error querying analytics: {e.response.status_code} - {e.response.text}")
break
except requests.exceptions.RequestException as e:
print(f"Network error during query: {e}")
break
return all_customers
Expected Response Structure (Snippet):
{
"dateFrom": "2023-10-01T00:00:00.000Z",
"dateTo": "2023-11-01T00:00:00.000Z",
"bucket": [
{
"key": {
"customer": {
"id": "12345678-abcd-efgh-ijkl-1234567890ab",
"address": "customer@example.com",
"name": "John Doe"
}
},
"metrics": {
"sessions": {
"count": 2
}
}
}
]
}
Step 2: Constructing the Outbound Message Payload
Genesys Cloud CX sends outbound messages via the POST /api/v2/conversations/messaging/outbound endpoint.
Critical parameters:
from: The sender address. This must be a verified sender address in your Genesys Cloud account.to: The recipient’s address (email or phone). For webchat history, this is typically the email associated with the webchat session.body: The message content. For webchat/email, this is an HTML or Plain Text block.channel: Must beemailorsms. Since we are targeting webchat users,emailis the most appropriate channel unless you have their phone number and consent for SMS.
Note: You cannot send a “webchat” message proactively to a user who is not currently online. You must use Email or SMS to reach them outside an active session.
def prepare_outbound_message(customer: dict, subject: str, body_html: str, sender_address: str) -> dict:
"""
Constructs the JSON payload for the outbound messaging API.
"""
return {
"from": {
"address": sender_address,
"name": "Customer Support Team" # Optional sender name
},
"to": [
{
"address": customer["address"],
"name": customer.get("name", "")
}
],
"subject": subject,
"body": [
{
"contentType": "html",
"content": body_html
}
],
"channel": "email"
}
Step 3: Sending the Message and Handling Delivery Reports
The POST /api/v2/conversations/messaging/outbound call is asynchronous. It returns a conversation ID immediately. The actual delivery happens in the background.
To track success or failure, you should monitor the conversation state or subscribe to webhooks. For this tutorial, we will poll the conversation status briefly to confirm acceptance.
def send_outbound_message(auth: GenesysAuth, payload: dict) -> str:
"""
Sends an outbound message and returns the conversation ID.
"""
url = f"{auth.api_base_url}/conversations/messaging/outbound"
headers = auth.get_headers()
try:
response = requests.post(url, json=payload, headers=headers)
# 202 Accepted is the standard success response for outbound messages
if response.status_code == 202:
response_data = response.json()
conversation_id = response_data.get("conversationId")
print(f"Message accepted. Conversation ID: {conversation_id}")
return conversation_id
elif response.status_code == 409:
# Conflict: Usually means the conversation already exists or rate limit
print(f"Conflict sending message: {response.text}")
return None
elif response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
print(f"Rate limited on send. Waiting {retry_after} seconds...")
time.sleep(retry_after)
# Recursive retry (simple implementation)
return send_outbound_message(auth, payload)
else:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
print(f"HTTP Error sending message: {e.response.status_code} - {e.response.text}")
raise
except requests.exceptions.RequestException as e:
print(f"Network error sending message: {e}")
raise
return None
Complete Working Example
This script combines authentication, querying, and sending. It includes a simple delay between sends to respect rate limits (Genesys Cloud typically allows 50-100 requests per second for messaging, but spreading them out is safer for bulk operations).
import requests
import time
from datetime import datetime, timezone, timedelta
import sys
# --- Configuration ---
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
REGION = "us-east-1"
SENDER_EMAIL = "verified.sender@yourdomain.com"
SUBJECT = "We missed you! Here is an update."
BODY_HTML = "<p>Hello,</p><p>We noticed you were chatting with us recently. How can we help you further?</p>"
# --- Classes and Functions from Previous Steps ---
class GenesysAuth:
def __init__(self, client_id: str, client_secret: str, region: str = "us-east-1"):
self.client_id = client_id
self.client_secret = client_secret
self.region = region
self.access_token = None
self.token_expiry = None
self.auth_url = f"https://api.{region}.mygenesys.com/oauth/token"
self.api_base_url = f"https://api.{region}.mygenesys.com/api/v2"
def get_token(self) -> str:
if self.access_token and self.token_expiry and datetime.now(timezone.utc) < self.token_expiry:
return self.access_token
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
try:
response = requests.post(self.auth_url, headers=headers, data=data)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.token_expiry = datetime.now(timezone.utc) + timedelta(seconds=token_data["expires_in"] - 60)
return self.access_token
except requests.exceptions.HTTPError as e:
print(f"Authentication failed: {e.response.status_code} - {e.response.text}")
raise
except requests.exceptions.RequestException as e:
print(f"Network error during authentication: {e}")
raise
def get_headers(self) -> dict:
return {
"Authorization": f"Bearer {self.get_token()}",
"Content-Type": "application/json"
}
def get_recent_webchat_customers(auth: GenesysAuth, days_back: int = 30) -> list[dict]:
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=days_back)
start_str = start_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
end_str = end_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
url = f"{auth.api_base_url}/analytics/conversations/details/query"
body = {
"dateFrom": start_str,
"dateTo": end_str,
"groupBy": ["customer"],
"metrics": ["sessions"],
"filters": [{"type": "equal", "field": "channel", "value": "webchat"}],
"size": 100
}
headers = auth.get_headers()
all_customers = []
next_page_token = None
while True:
params = {"nextPageToken": next_page_token} if next_page_token else {}
try:
response = requests.post(url, json=body, headers=headers, params=params)
response.raise_for_status()
data = response.json()
for bucket in data.get("bucket", []):
customer_info = bucket.get("key", {}).get("customer", {})
metrics = bucket.get("metrics", {})
if metrics.get("sessions", {}).get("count", 0) > 0:
# Filter out invalid emails if necessary
addr = customer_info.get("address", "")
if "@" in addr: # Basic validation for email channel
all_customers.append({
"id": customer_info.get("id"),
"address": addr,
"name": customer_info.get("name", ""),
"sessions": metrics["sessions"]["count"]
})
next_page_token = data.get("nextPageToken")
if not next_page_token:
break
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
time.sleep(int(e.response.headers.get("Retry-After", 5)))
continue
else:
print(f"Query error: {e}")
break
except requests.exceptions.RequestException as e:
print(f"Network error: {e}")
break
return all_customers
def send_outbound_message(auth: GenesysAuth, payload: dict) -> str:
url = f"{auth.api_base_url}/conversations/messaging/outbound"
headers = auth.get_headers()
try:
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 202:
return response.json().get("conversationId")
elif response.status_code == 429:
time.sleep(int(response.headers.get("Retry-After", 5)))
return send_outbound_message(auth, payload)
else:
print(f"Send failed: {response.status_code} - {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"Network error sending: {e}")
return None
def main():
print("Initializing Genesys Auth...")
auth = GenesysAuth(CLIENT_ID, CLIENT_SECRET, REGION)
print("Fetching customers with webchat history (last 30 days)...")
customers = get_recent_webchat_customers(auth, days_back=30)
print(f"Found {len(customers)} customers.")
if not customers:
print("No customers to notify.")
return
sent_count = 0
failed_count = 0
for i, customer in enumerate(customers):
print(f"Processing {i+1}/{len(customers)}: {customer['address']}")
payload = {
"from": {"address": SENDER_EMAIL, "name": "Support"},
"to": [{"address": customer["address"], "name": customer["name"]}],
"subject": SUBJECT,
"body": [{"contentType": "html", "content": BODY_HTML}],
"channel": "email"
}
conv_id = send_outbound_message(auth, payload)
if conv_id:
sent_count += 1
print(f" -> Sent successfully. ConvID: {conv_id}")
else:
failed_count += 1
print(f" -> Failed to send.")
# Rate limiting: Sleep 0.5s between sends to be safe
time.sleep(0.5)
print(f"Done. Sent: {sent_count}, Failed: {failed_count}")
if __name__ == "__main__":
main()
Common Errors & Debugging
Error: 403 Forbidden
Cause: The service account lacks the required OAuth scopes.
Fix: Ensure the client ID/secret pair has analytics:conversation:read and message:send scopes assigned in the Genesys Cloud Admin UI under Security > OAuth Clients.
Error: 429 Too Many Requests
Cause: You have exceeded the API rate limit. Genesys Cloud enforces rate limits per client ID.
Fix: Implement exponential backoff. The response header Retry-After indicates how many seconds to wait. The code above includes basic retry logic, but for large batches, consider spreading requests over a longer period or using a queue.
Error: 400 Bad Request - “Invalid recipient address”
Cause: The to address is malformed or not a valid email/phone format, or it is not verified (for email).
Fix: Validate the email address format before sending. Ensure the sender address is verified in Genesys Cloud.
Error: 409 Conflict
Cause: You are trying to send a message to a conversation that is already active or recently closed, or the rate limit for outbound messages is hit.
Fix: Check if the customer has an active conversation. If so, append to that conversation instead of creating a new outbound one.