Send Canned Responses During a Chat Interaction via the Genesys Cloud Conversations API
What You Will Build
This tutorial demonstrates how to programmatically send a pre-defined canned response to an active chat conversation using the Genesys Cloud Conversations API. You will build a Python script that authenticates via OAuth, retrieves a specific canned response by ID, and injects the text payload into an ongoing chat session using the POST /api/v2/conversations/chat/events endpoint. The implementation handles token management, error retries, and payload construction required for real-time messaging.
Prerequisites
- OAuth Client Type: Confidential Client (Client Credentials Flow).
- Required Scopes:
conversation:chat:write,cannedresponses:read,user:read(to identify the agent if simulating user action, though system-level sends often requireconversation:chat:write). - SDK/API Version: Genesys Cloud API v2.
- Language/Runtime: Python 3.9+.
- External Dependencies:
requests: For HTTP communication.purecloud-platform-client: Official Genesys Cloud Python SDK (optional, but this tutorial usesrequestsfor transparency on raw payloads).
Authentication Setup
Genesys Cloud uses OAuth 2.0 for authentication. For server-to-server integrations, the Client Credentials flow is the standard. You must obtain an access token before making any API calls. The token expires after a set duration (typically 3600 seconds), so production code should cache and refresh tokens.
The following code demonstrates a robust authentication helper. It uses the requests library to fetch the token from the Genesys Cloud authorization server.
import requests
import time
import json
import os
class GenesysAuth:
def __init__(self, client_id: str, client_secret: str, region: str = "mypurecloud.com"):
self.client_id = client_id
self.client_secret = client_secret
self.region = region
self.auth_url = f"https://login.{region}/oauth/token"
self.access_token = None
self.token_expiry = 0
def get_access_token(self) -> str:
"""
Retrieves an OAuth access token if valid, or fetches a new one.
Returns:
str: The OAuth Bearer token.
"""
# Check if token is still valid (with a 5-minute buffer)
if self.access_token and time.time() < (self.token_expiry - 300):
return self.access_token
payload = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
try:
response = requests.post(self.auth_url, data=payload)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data.get("access_token")
self.token_expiry = time.time() + token_data.get("expires_in", 3600)
return self.access_token
except requests.exceptions.HTTPError as e:
print(f"Authentication failed: {e.response.status_code} - {e.response.text}")
raise
except requests.exceptions.RequestException as e:
print(f"Network error during authentication: {e}")
raise
# Initialize with your credentials
# In production, retrieve these from environment variables or a secrets manager
auth_client = GenesysAuth(
client_id=os.getenv("GENESYS_CLIENT_ID"),
client_secret=os.getenv("GENESYS_CLIENT_SECRET"),
region=os.getenv("GENESYS_REGION", "mypurecloud.com")
)
Implementation
Step 1: Retrieve the Canned Response
Before sending a message, you need the content of the canned response. While you can hardcode the text, best practice is to fetch it by ID to ensure synchronization with your Genesys Cloud configuration. This allows your bot or application to update automatically when an admin changes the canned text.
Endpoint: GET /api/v2/cannedresponses/{id}
Scope: cannedresponses:read
def get_canned_response(auth: GenesysAuth, canned_id: str) -> dict:
"""
Fetches the canned response object by ID.
Args:
auth: GenesysAuth instance.
canned_id: The UUID of the canned response.
Returns:
dict: The canned response object containing 'text' and 'name'.
"""
base_url = f"https://api.{auth.region}"
endpoint = f"/api/v2/cannedresponses/{canned_id}"
url = base_url + endpoint
headers = {
"Authorization": f"Bearer {auth.get_access_token()}",
"Content-Type": "application/json"
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
print(f"Canned response {canned_id} not found.")
elif e.response.status_code == 403:
print("Access forbidden. Ensure 'cannedresponses:read' scope is granted.")
else:
print(f"HTTP Error: {e}")
raise
except Exception as e:
print(f"Error fetching canned response: {e}")
raise
Step 2: Construct the Chat Event Payload
The Genesys Cloud Conversations API uses a generic event structure for all conversation types (Chat, Voice, Email, etc.). For Chat, you must send a POST request to /api/v2/conversations/chat/events.
The payload requires three critical components:
conversationId: The UUID of the active chat session.eventType: Must beuserMessagefor outbound messages from the agent/system side.body: The text content.
Critical Detail: If you are sending this as a system bot or an automated agent, you may need to specify the userId if the message should appear to come from a specific agent. If omitted, it may appear as “System” or require specific permissions depending on your tenant configuration. For standard agent-assisted canned sends, the message is associated with the active participant.
Endpoint: POST /api/v2/conversations/chat/events
Scope: conversation:chat:write
def send_chat_message(auth: GenesysAuth, conversation_id: str, text: str, user_id: str = None) -> dict:
"""
Sends a text message to an active chat conversation.
Args:
auth: GenesysAuth instance.
conversation_id: The UUID of the chat conversation.
text: The message content.
user_id: Optional. The UUID of the agent sending the message.
If None, it may be treated as a system message or require
specific context depending on tenant settings.
Returns:
dict: The response from the API, typically containing the event ID.
"""
base_url = f"https://api.{auth.region}"
endpoint = f"/api/v2/conversations/chat/events"
url = base_url + endpoint
headers = {
"Authorization": f"Bearer {auth.get_access_token()}",
"Content-Type": "application/json"
}
# Construct the event payload
payload = {
"conversationId": conversation_id,
"eventType": "userMessage",
"body": text
}
# If a specific agent is sending the message, include the userId
# This ensures the message is attributed correctly in the transcript
if user_id:
payload["userId"] = user_id
try:
response = requests.post(url, headers=headers, json=payload)
# Handle Rate Limiting (429)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 1))
print(f"Rate limited. Retrying after {retry_after} seconds...")
time.sleep(retry_after)
return send_chat_message(auth, conversation_id, text, user_id)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 400:
print(f"Bad Request. Check payload structure. Response: {e.response.text}")
elif e.response.status_code == 404:
print(f"Conversation {conversation_id} not found or inactive.")
elif e.response.status_code == 409:
print("Conflict. The conversation may have ended or is in an invalid state.")
else:
print(f"HTTP Error: {e}")
raise
except Exception as e:
print(f"Error sending chat message: {e}")
raise
Step 3: Execute the Flow
This step combines the authentication, retrieval, and sending logic into a single executable flow. This demonstrates how to handle the end-to-end process.
def main():
# Configuration
CANONED_RESPONSE_ID = "YOUR_CANNED_RESPONSE_UUID"
CONVERSATION_ID = "YOUR_ACTIVE_CHAT_CONVERSATION_UUID"
AGENT_USER_ID = "YOUR_AGENT_USER_UUID" # Optional but recommended for attribution
try:
# Step 1: Get the canned response text
print(f"Fetching canned response: {CANONED_RESPONSE_ID}")
canned_obj = get_canned_response(auth_client, CANONED_RESPONSE_ID)
message_text = canned_obj.get("text")
canned_name = canned_obj.get("name")
if not message_text:
raise ValueError("Canned response has no text content.")
print(f"Canned Response '{canned_name}' retrieved: \"{message_text}\"")
# Step 2: Send the message
print(f"Sending message to conversation: {CONVERSATION_ID}")
result = send_chat_message(
auth=auth_client,
conversation_id=CONVERSATION_ID,
text=message_text,
user_id=AGENT_USER_ID
)
print(f"Message sent successfully. Event ID: {result.get('id')}")
except Exception as e:
print(f"Failed to process request: {e}")
if __name__ == "__main__":
main()
Complete Working Example
The following is the complete, copy-pasteable Python script. It includes error handling, retry logic for rate limits, and clear separation of concerns. Save this as send_canned_chat.py.
import requests
import time
import json
import os
import sys
class GenesysAuth:
def __init__(self, client_id: str, client_secret: str, region: str = "mypurecloud.com"):
self.client_id = client_id
self.client_secret = client_secret
self.region = region
self.auth_url = f"https://login.{region}/oauth/token"
self.access_token = None
self.token_expiry = 0
def get_access_token(self) -> str:
if self.access_token and time.time() < (self.token_expiry - 300):
return self.access_token
payload = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
try:
response = requests.post(self.auth_url, data=payload)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data.get("access_token")
self.token_expiry = time.time() + token_data.get("expires_in", 3600)
return self.access_token
except requests.exceptions.HTTPError as e:
print(f"Authentication failed: {e.response.status_code} - {e.response.text}")
raise
except requests.exceptions.RequestException as e:
print(f"Network error during authentication: {e}")
raise
def get_canned_response(auth: GenesysAuth, canned_id: str) -> dict:
base_url = f"https://api.{auth.region}"
endpoint = f"/api/v2/cannedresponses/{canned_id}"
url = base_url + endpoint
headers = {
"Authorization": f"Bearer {auth.get_access_token()}",
"Content-Type": "application/json"
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
print(f"Canned response {canned_id} not found.")
elif e.response.status_code == 403:
print("Access forbidden. Ensure 'cannedresponses:read' scope is granted.")
else:
print(f"HTTP Error: {e}")
raise
except Exception as e:
print(f"Error fetching canned response: {e}")
raise
def send_chat_message(auth: GenesysAuth, conversation_id: str, text: str, user_id: str = None) -> dict:
base_url = f"https://api.{auth.region}"
endpoint = f"/api/v2/conversations/chat/events"
url = base_url + endpoint
headers = {
"Authorization": f"Bearer {auth.get_access_token()}",
"Content-Type": "application/json"
}
payload = {
"conversationId": conversation_id,
"eventType": "userMessage",
"body": text
}
if user_id:
payload["userId"] = user_id
try:
response = requests.post(url, headers=headers, json=payload)
# Handle Rate Limiting (429)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 1))
print(f"Rate limited. Retrying after {retry_after} seconds...")
time.sleep(retry_after)
return send_chat_message(auth, conversation_id, text, user_id)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 400:
print(f"Bad Request. Check payload structure. Response: {e.response.text}")
elif e.response.status_code == 404:
print(f"Conversation {conversation_id} not found or inactive.")
elif e.response.status_code == 409:
print("Conflict. The conversation may have ended or is in an invalid state.")
else:
print(f"HTTP Error: {e}")
raise
except Exception as e:
print(f"Error sending chat message: {e}")
raise
def main():
# Load credentials from environment variables
client_id = os.getenv("GENESYS_CLIENT_ID")
client_secret = os.getenv("GENESYS_CLIENT_SECRET")
region = os.getenv("GENESYS_REGION", "mypurecloud.com")
if not client_id or not client_secret:
print("Error: GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET environment variables are required.")
sys.exit(1)
# Initialize Auth
auth = GenesysAuth(client_id, client_secret, region)
# Input parameters
canned_id = os.getenv("CANNED_RESPONSE_ID", "PLACEHOLDER_ID")
conversation_id = os.getenv("CONVERSATION_ID", "PLACEHOLDER_CONVO_ID")
user_id = os.getenv("AGENT_USER_ID") # Optional
if canned_id == "PLACEHOLDER_ID" or conversation_id == "PLACEHOLDER_CONVO_ID":
print("Error: Please provide CANNED_RESPONSE_ID and CONVERSATION_ID environment variables.")
sys.exit(1)
try:
# 1. Fetch Canned Response
print(f"1. Fetching canned response: {canned_id}")
canned_obj = get_canned_response(auth, canned_id)
message_text = canned_obj.get("text")
canned_name = canned_obj.get("name")
if not message_text:
raise ValueError("Canned response has no text content.")
print(f" Retrieved: '{canned_name}'")
# 2. Send Message
print(f"2. Sending message to conversation: {conversation_id}")
result = send_chat_message(
auth=auth,
conversation_id=conversation_id,
text=message_text,
user_id=user_id
)
print(f"3. Success. Event ID: {result.get('id')}")
except Exception as e:
print(f"Failed: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
Common Errors & Debugging
Error: 403 Forbidden
Cause: The OAuth client lacks the required scopes.
Fix: Ensure your OAuth client in Genesys Cloud Admin has the following scopes:
conversation:chat:writecannedresponses:read
Verify the scopes in the Admin Console under Administration > Security > OAuth clients.
Error: 409 Conflict
Cause: The conversation ID provided is valid but the conversation is no longer active (e.g., it has ended, was transferred out, or is in a state that does not allow new messages).
Fix: Check the conversation status using GET /api/v2/conversations/chat/{id}. Ensure the status is active or queued (depending on context). You cannot send messages to ended conversations.
Error: 400 Bad Request
Cause: The payload structure is incorrect.
Fix:
- Verify
eventTypeis exactlyuserMessage. - Verify
conversationIdis a valid UUID. - Ensure
bodyis a string. - If using
userId, ensure it is a valid UUID of an existing user in the tenant.
Error: 429 Too Many Requests
Cause: You have exceeded the API rate limits.
Fix: Implement exponential backoff. The provided code includes a basic retry mechanism that reads the Retry-After header. For high-volume bots, consider batching requests or using the WebSocket API for real-time updates instead of polling/sending via REST.