Sending Proactive Notifications to Customers with Prior Web Messaging History

What You Will Build

  • You will build a service that identifies customers who initiated a web messaging session in the last 30 days and sends them a targeted proactive notification.
  • This solution uses the Genesys Cloud CX REST API for analytics queries and the Messaging API for outbound delivery.
  • The tutorial covers implementation in Python using the requests library for HTTP interactions.

Prerequisites

  • OAuth Client Type: A Service Account with the following scopes:
    • analytics:conversation:read (for querying historical data)
    • message:send (for sending the outbound message)
    • user:read (optional, for resolving user details if needed)
  • SDK/API Version: Genesys Cloud CX REST API v2.
  • Language/Runtime: Python 3.9+ with requests. The examples use built-in generic type hints such as list[dict], which require Python 3.9 or later.
  • External Dependencies:
    • pip install requests

Authentication Setup

Genesys Cloud CX uses OAuth 2.0 for authentication. For backend services, the Client Credentials flow is the standard approach. This flow exchanges a client ID and secret for an access token.

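Access tokens expire. The token response reports the lifetime in seconds in its expires_in field, so read it from there rather than hard-coding a duration. A robust implementation caches the token and refreshes it shortly before expiration, which avoids both a token request on every API call and 401 Unauthorized errors during high-volume operations.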

import requests
import time
from datetime import datetime, timezone, timedelta

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, region: str = "us-east-1"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.region = region
        self.access_token = None
        self.token_expiry = None
        # Base URL depends on region
        self.auth_url = f"https://api.{region}.mygenesys.com/oauth/token"
        self.api_base_url = f"https://api.{region}.mygenesys.com/api/v2"

    def get_token(self) -> str:
        """
        Retrieves an access token. Returns cached token if valid.
        """
        # Check if we have a valid token
        if self.access_token and self.token_expiry and datetime.now(timezone.utc) < self.token_expiry:
            return self.access_token

        # Request new token
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }

        try:
            response = requests.post(self.auth_url, headers=headers, data=data)
            response.raise_for_status()
            token_data = response.json()
            
            self.access_token = token_data["access_token"]
            # Expires_in is in seconds. Add buffer for safety.
            self.token_expiry = datetime.now(timezone.utc) + timedelta(seconds=token_data["expires_in"] - 60)
            
            return self.access_token

        except requests.exceptions.HTTPError as e:
            print(f"Authentication failed: {e.response.status_code} - {e.response.text}")
            raise
        except requests.exceptions.RequestException as e:
            print(f"Network error during authentication: {e}")
            raise

    def get_headers(self) -> dict:
        """
        Returns standard headers for API requests including the Bearer token.
        """
        return {
            "Authorization": f"Bearer {self.get_token()}",
            "Content-Type": "application/json"
        }
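The caching decision inside get_token can be looked at in isolation. The following is a minimal sketch, not part of the class above: the helper names compute_expiry and token_is_fresh are hypothetical, and the clock is passed in as an argument so the logic can be checked without a network call.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def compute_expiry(now: datetime, expires_in: int, buffer_seconds: int = 60) -> datetime:
    """Return the moment after which a cached token should be treated as stale."""
    # Clamp at zero so a very short lifetime never yields an expiry in the past.
    usable = max(expires_in - buffer_seconds, 0)
    return now + timedelta(seconds=usable)


def token_is_fresh(token: Optional[str], expiry: Optional[datetime], now: datetime) -> bool:
    """True only when a token exists and its buffered expiry is still ahead."""
    return bool(token) and expiry is not None and now < expiry


# Example: a token issued at 12:00 UTC with a one-hour lifetime (assumed value).
issued = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
expiry = compute_expiry(issued, expires_in=3600)

print(token_is_fresh("abc", expiry, issued + timedelta(minutes=58)))  # True
print(token_is_fresh("abc", expiry, issued + timedelta(minutes=59)))  # False
```

With a 60-second buffer, the cached token is considered stale one minute before the server would reject it, which leaves room for clock skew and in-flight requests.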

Implementation

Step 1: Querying Historical Web Messaging Sessions

To identify customers for proactive outreach, query the Analytics API. Specifically, the POST /api/v2/analytics/conversations/details/query endpoint accepts a JSON body that supports complex filtering.

You must filter for:

  1. Channel: webchat
  2. Timeframe: Last 30 days.
  3. Group By: customer to aggregate sessions per user.
  4. Metrics: sessions (count of sessions).

The analytics:conversation:read scope is required.

def get_recent_webchat_customers(auth: GenesysAuth, days_back: int = 30) -> list[dict]:
    """
    Queries Genesys Cloud for unique customers who had webchat sessions in the last N days.
    """
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=days_back)

    # Format dates as ISO 8601 strings
    start_str = start_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    end_str = end_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")

    url = f"{auth.api_base_url}/analytics/conversations/details/query"

    # The body structure for the analytics query
    body = {
        "dateFrom": start_str,
        "dateTo": end_str,
        "groupBy": ["customer"],
        "metrics": ["sessions"],
        "filters": [
            {
                "type": "equal",
                "field": "channel",
                "value": "webchat"
            }
        ],
        "size": 100  # Max page size for details query
    }

    headers = auth.get_headers()
    
    # Handle pagination
    all_customers = []
    next_page_token = None
    
    while True:
        params = {}
        if next_page_token:
            params["nextPageToken"] = next_page_token
            
        try:
            response = requests.post(url, json=body, headers=headers, params=params)
            response.raise_for_status()
            data = response.json()
            
            # Extract customer info from the result buckets
            for bucket in data.get("bucket", []):
                # bucket['key']['customer'] contains the customer identifier
                # The structure depends on the groupBy. For 'customer', it usually includes id and address.
                customer_info = bucket.get("key", {}).get("customer", {})
                metrics = bucket.get("metrics", {})
                
                # Only include customers with at least 1 session
                if metrics.get("sessions", {}).get("count", 0) > 0:
                    all_customers.append({
                        "id": customer_info.get("id"),
                        "address": customer_info.get("address"), # This is usually the email or phone
                        "name": customer_info.get("name"),
                        "sessions": metrics["sessions"]["count"]
                    })
            
            # Check for pagination
            next_page_token = data.get("nextPageToken")
            if not next_page_token:
                break
                
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                # Rate limit hit. Wait and retry.
                retry_after = int(e.response.headers.get("Retry-After", 5))
                print(f"Rate limited. Waiting {retry_after} seconds...")
                time.sleep(retry_after)
                continue
            else:
                print(f"Error querying analytics: {e.response.status_code} - {e.response.text}")
                break
        except requests.exceptions.RequestException as e:
            print(f"Network error during query: {e}")
            break
            
    return all_customers

Expected Response Structure (Snippet):

{
  "dateFrom": "2023-10-01T00:00:00.000Z",
  "dateTo": "2023-11-01T00:00:00.000Z",
  "bucket": [
    {
      "key": {
        "customer": {
          "id": "12345678-abcd-efgh-ijkl-1234567890ab",
          "address": "customer@example.com",
          "name": "John Doe"
        }
      },
      "metrics": {
        "sessions": {
          "count": 2
        }
      }
    }
  ]
}
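Assuming the response has the shape shown in the snippet above, the parsing step can be separated from the HTTP call and tested offline. This is a sketch only: extract_customers is a hypothetical helper, and de-duplicating by lower-cased address is an assumption about what counts as "the same customer".

```python
def extract_customers(response: dict) -> list:
    """Flatten the "bucket" entries of a query response into customer records.

    Records are de-duplicated by lower-cased address so nobody is notified twice.
    """
    seen = set()
    customers = []
    for bucket in response.get("bucket") or []:
        info = (bucket.get("key") or {}).get("customer") or {}
        sessions = (bucket.get("metrics") or {}).get("sessions") or {}
        count = sessions.get("count") or 0
        address = (info.get("address") or "").strip()
        key = address.lower()
        # Skip buckets without sessions, without an address, or already seen.
        if count < 1 or not address or key in seen:
            continue
        seen.add(key)
        customers.append({
            "id": info.get("id"),
            "address": address,
            "name": info.get("name") or "",
            "sessions": count,
        })
    return customers


sample = {
    "bucket": [
        {"key": {"customer": {"id": "c-1", "address": "customer@example.com", "name": "John Doe"}},
         "metrics": {"sessions": {"count": 2}}},
        # Same address with different casing: skipped as a duplicate.
        {"key": {"customer": {"id": "c-1", "address": "Customer@Example.com", "name": "John Doe"}},
         "metrics": {"sessions": {"count": 1}}},
        # Null address: skipped because there is nowhere to send the message.
        {"key": {"customer": {"id": "c-2", "address": None, "name": None}},
         "metrics": {"sessions": {"count": 3}}},
    ]
}

print(len(extract_customers(sample)))  # 1
```

Keeping the parser tolerant of missing or null fields means one malformed bucket does not abort the whole run.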

Step 2: Constructing the Outbound Message Payload

Genesys Cloud CX sends outbound messages via the POST /api/v2/conversations/messaging/outbound endpoint.

Critical parameters:

  • from: The sender address. This must be a verified sender address in your Genesys Cloud account.
  • to: The recipient’s address (email or phone). For webchat history, this is typically the email associated with the webchat session.
  • body: The message content. For email, this is an HTML or plain-text block.
  • channel: Must be email or sms. Since we are targeting webchat users, email is the most appropriate channel unless you have their phone number and consent for SMS.

Note: You cannot send a “webchat” message proactively to a user who is not currently online. You must use Email or SMS to reach them outside an active session.
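The channel choice described above can be sketched as a small function. Everything here is illustrative: choose_channel and the sms_consent flag are hypothetical names, the email pattern is a deliberately loose sanity check, and the phone pattern assumes numbers are stored in E.164 format.

```python
import re
from typing import Optional

# Loose sanity check: something@something.tld with no spaces.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# E.164: a plus sign followed by 7 to 15 digits, not starting with 0.
E164_RE = re.compile(r"^\+[1-9]\d{6,14}$")


def choose_channel(address: Optional[str], sms_consent: bool = False) -> Optional[str]:
    """Return "email", "sms", or None when the customer cannot be reached."""
    if not address:
        return None
    address = address.strip()
    if EMAIL_RE.match(address):
        return "email"
    # Only fall back to SMS when the customer has explicitly consented.
    if sms_consent and E164_RE.match(address):
        return "sms"
    return None


print(choose_channel("customer@example.com"))            # email
print(choose_channel("+14155550123", sms_consent=True))  # sms
print(choose_channel("+14155550123"))                    # None
print(choose_channel("guest-1234"))                      # None
```

Returning None for anything that is neither a plausible email nor a consented phone number lets the caller skip the customer instead of sending a request that is likely to be rejected.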

def prepare_outbound_message(customer: dict, subject: str, body_html: str, sender_address: str) -> dict:
    """
    Constructs the JSON payload for the outbound messaging API.
    """
    return {
        "from": {
            "address": sender_address,
            "name": "Customer Support Team" # Optional sender name
        },
        "to": [
            {
                "address": customer["address"],
                "name": customer.get("name", "")
            }
        ],
        "subject": subject,
        "body": [
            {
                "contentType": "html",
                "content": body_html
            }
        ],
        "channel": "email"
    }

Step 3: Sending the Message and Handling Delivery Reports

The POST /api/v2/conversations/messaging/outbound call is asynchronous. It returns a conversation ID immediately. The actual delivery happens in the background.

To track success or failure, you should monitor the conversation state or subscribe to webhooks. For this tutorial, we only confirm that the API accepted the request (HTTP 202) and record the returned conversation ID.

from typing import Optional

def send_outbound_message(auth: GenesysAuth, payload: dict, max_retries: int = 3) -> Optional[str]:
    """
    Sends an outbound message and returns the conversation ID, or None if
    the message was not accepted.
    """
    url = f"{auth.api_base_url}/conversations/messaging/outbound"

    for attempt in range(max_retries + 1):
        # Re-read headers on every attempt so a refreshed token is used
        headers = auth.get_headers()
        try:
            response = requests.post(url, json=payload, headers=headers, timeout=30)
        except requests.exceptions.RequestException as e:
            print(f"Network error sending message: {e}")
            raise

        # 202 Accepted is the standard success response for outbound messages
        if response.status_code == 202:
            conversation_id = response.json().get("conversationId")
            print(f"Message accepted. Conversation ID: {conversation_id}")
            return conversation_id
        if response.status_code == 409:
            # Conflict: usually means a conversation with this customer already exists
            print(f"Conflict sending message: {response.text}")
            return None
        if response.status_code == 429 and attempt < max_retries:
            retry_after = int(response.headers.get("Retry-After", 5))
            print(f"Rate limited on send. Waiting {retry_after} seconds...")
            time.sleep(retry_after)
            continue  # Bounded retry instead of unbounded recursion

        # Any other status, or retries exhausted: raise on 4xx/5xx, otherwise give up
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            print(f"HTTP Error sending message: {e.response.status_code} - {e.response.text}")
            raise
        return None

    return None

Complete Working Example

This script combines authentication, querying, and sending. It includes a simple delay between sends to stay under the platform's rate limits. The exact limits depend on your organization and the endpoint, so check the published limits before tuning the delay; spreading requests out is the safer default for bulk operations.

import requests
import time
from datetime import datetime, timezone, timedelta

# --- Configuration ---
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
REGION = "us-east-1"
SENDER_EMAIL = "verified.sender@yourdomain.com"
SUBJECT = "We missed you! Here is an update."
BODY_HTML = "<p>Hello,</p><p>We noticed you were chatting with us recently. How can we help you further?</p>"

# --- Classes and Functions from Previous Steps ---

class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, region: str = "us-east-1"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.region = region
        self.access_token = None
        self.token_expiry = None
        self.auth_url = f"https://api.{region}.mygenesys.com/oauth/token"
        self.api_base_url = f"https://api.{region}.mygenesys.com/api/v2"

    def get_token(self) -> str:
        if self.access_token and self.token_expiry and datetime.now(timezone.utc) < self.token_expiry:
            return self.access_token

        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }

        try:
            response = requests.post(self.auth_url, headers=headers, data=data)
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            self.token_expiry = datetime.now(timezone.utc) + timedelta(seconds=token_data["expires_in"] - 60)
            return self.access_token
        except requests.exceptions.HTTPError as e:
            print(f"Authentication failed: {e.response.status_code} - {e.response.text}")
            raise
        except requests.exceptions.RequestException as e:
            print(f"Network error during authentication: {e}")
            raise

    def get_headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.get_token()}",
            "Content-Type": "application/json"
        }

def get_recent_webchat_customers(auth: GenesysAuth, days_back: int = 30) -> list[dict]:
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=days_back)
    start_str = start_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    end_str = end_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")

    url = f"{auth.api_base_url}/analytics/conversations/details/query"
    body = {
        "dateFrom": start_str,
        "dateTo": end_str,
        "groupBy": ["customer"],
        "metrics": ["sessions"],
        "filters": [{"type": "equal", "field": "channel", "value": "webchat"}],
        "size": 100
    }
    headers = auth.get_headers()
    all_customers = []
    next_page_token = None

    while True:
        params = {"nextPageToken": next_page_token} if next_page_token else {}
        try:
            response = requests.post(url, json=body, headers=headers, params=params)
            response.raise_for_status()
            data = response.json()
            for bucket in data.get("bucket", []):
                customer_info = bucket.get("key", {}).get("customer", {})
                metrics = bucket.get("metrics", {})
                if metrics.get("sessions", {}).get("count", 0) > 0:
                    # Filter out invalid emails if necessary
                    addr = customer_info.get("address") or ""  # A null address would break the "@" check below
                    if "@" in addr: # Basic validation for email channel
                        all_customers.append({
                            "id": customer_info.get("id"),
                            "address": addr,
                            "name": customer_info.get("name", ""),
                            "sessions": metrics["sessions"]["count"]
                        })
            next_page_token = data.get("nextPageToken")
            if not next_page_token:
                break
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                time.sleep(int(e.response.headers.get("Retry-After", 5)))
                continue
            else:
                print(f"Query error: {e}")
                break
        except requests.exceptions.RequestException as e:
            print(f"Network error: {e}")
            break
    return all_customers

from typing import Optional  # Needed for the Optional[str] return annotation below

def send_outbound_message(auth: GenesysAuth, payload: dict, max_retries: int = 3) -> Optional[str]:
    url = f"{auth.api_base_url}/conversations/messaging/outbound"
    for attempt in range(max_retries + 1):
        # Re-read headers on every attempt so a refreshed token is used
        headers = auth.get_headers()
        try:
            response = requests.post(url, json=payload, headers=headers, timeout=30)
        except requests.exceptions.RequestException as e:
            print(f"Network error sending: {e}")
            return None
        if response.status_code == 202:
            return response.json().get("conversationId")
        if response.status_code == 429 and attempt < max_retries:
            # Bounded retry: wait as instructed, then try again
            time.sleep(int(response.headers.get("Retry-After", 5)))
            continue
        print(f"Send failed: {response.status_code} - {response.text}")
        return None
    return None

def main():
    print("Initializing Genesys Auth...")
    auth = GenesysAuth(CLIENT_ID, CLIENT_SECRET, REGION)

    print("Fetching customers with webchat history (last 30 days)...")
    customers = get_recent_webchat_customers(auth, days_back=30)
    
    print(f"Found {len(customers)} customers.")
    
    if not customers:
        print("No customers to notify.")
        return

    sent_count = 0
    failed_count = 0

    for i, customer in enumerate(customers):
        print(f"Processing {i+1}/{len(customers)}: {customer['address']}")
        
        payload = {
            "from": {"address": SENDER_EMAIL, "name": "Support"},
            "to": [{"address": customer["address"], "name": customer["name"]}],
            "subject": SUBJECT,
            "body": [{"contentType": "html", "content": BODY_HTML}],
            "channel": "email"
        }

        conv_id = send_outbound_message(auth, payload)
        
        if conv_id:
            sent_count += 1
            print(f"  -> Sent successfully. ConvID: {conv_id}")
        else:
            failed_count += 1
            print("  -> Failed to send.")
            
        # Rate limiting: Sleep 0.5s between sends to be safe
        time.sleep(0.5)

    print(f"Done. Sent: {sent_count}, Failed: {failed_count}")

if __name__ == "__main__":
    main()

Common Errors & Debugging

Error: 403 Forbidden

Cause: The service account lacks the required OAuth scopes.
Fix: Ensure the client ID/secret pair has the analytics:conversation:read and message:send scopes assigned in the Genesys Cloud Admin UI under Integrations > OAuth.

Error: 429 Too Many Requests

Cause: You have exceeded the API rate limit. Genesys Cloud enforces rate limits per client ID.
Fix: Implement exponential backoff. The response header Retry-After indicates how many seconds to wait. The code above includes basic retry logic, but for large batches, consider spreading requests over a longer period or using a queue.
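One way to combine exponential backoff with the Retry-After header is sketched below. The function name backoff_delay and its default values are assumptions chosen for illustration, not values taken from Genesys Cloud documentation. Note that HTTP allows Retry-After to be either a number of seconds or an HTTP date; this sketch honors the numeric form and falls back to computed backoff otherwise.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    retry_after: Optional[str] = None,
    base: float = 1.0,
    cap: float = 60.0,
    jitter: float = 0.0,
) -> float:
    """Return how many seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # The server's instruction wins when it is a plain number of seconds.
            return max(float(retry_after), 0.0)
        except ValueError:
            pass  # Not numeric (for example an HTTP date): use computed backoff.
    # Double the wait on each attempt, but never exceed the cap.
    delay = min(base * (2 ** attempt), cap)
    if jitter > 0:
        # Random jitter keeps many workers from retrying in lockstep.
        delay += random.uniform(0, jitter)
    return delay


print([backoff_delay(n) for n in range(5)])  # [1.0, 2.0, 4.0, 8.0, 16.0]
print(backoff_delay(2, retry_after="7"))     # 7.0
```

In a send loop, the result would be passed to time.sleep before the next attempt, with the attempt counter capped so a persistent 429 eventually surfaces as a failure.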

Error: 400 Bad Request - “Invalid recipient address”

Cause: The to address is malformed or not a valid email/phone format, or the from (sender) address is not verified for email.
Fix: Validate the recipient address format before sending, and ensure the sender address is verified in Genesys Cloud.
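A pre-flight check along these lines can catch malformed addresses before a request is made. This is a sketch under stated assumptions: find_payload_problems is a hypothetical helper, it expects the email payload shape used in this tutorial, and the pattern is a loose format check that cannot confirm an address exists or that a sender is verified.

```python
import re

# Loose format check: something@something.tld with no spaces.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def find_payload_problems(payload: dict) -> list:
    """Return a list of human-readable problems; an empty list means OK to send."""
    problems = []

    sender = (payload.get("from") or {}).get("address") or ""
    if not EMAIL_RE.match(sender):
        problems.append(f"invalid sender address: {sender!r}")

    recipients = payload.get("to") or []
    if not recipients:
        problems.append("no recipients")
    for index, recipient in enumerate(recipients):
        address = (recipient or {}).get("address") or ""
        if not EMAIL_RE.match(address):
            problems.append(f"invalid recipient address at index {index}: {address!r}")

    return problems


good = {
    "from": {"address": "verified.sender@yourdomain.com"},
    "to": [{"address": "customer@example.com"}],
}
bad = {
    "from": {"address": "verified.sender@yourdomain.com"},
    "to": [{"address": "customer-at-example"}, {"address": None}],
}

print(find_payload_problems(good))       # []
print(len(find_payload_problems(bad)))   # 2
```

Calling this before each send and skipping payloads with a non-empty result keeps avoidable 400 responses out of the failure count.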

Error: 409 Conflict

Cause: You are trying to send a message to a customer whose conversation is already active or was recently closed. Rate limiting is reported separately as 429, not 409.
Fix: Check whether the customer has an active conversation. If so, append to that conversation instead of creating a new outbound one.
