Decoupling Genesys Cloud Conversation Creation from Backend Processing by Publishing Interaction Metadata to RabbitMQ Using a Python Celery Worker
What You Will Build
- The code captures real-time conversation creation events from Genesys Cloud, extracts interaction metadata, publishes the payload to RabbitMQ, and processes the data asynchronously using a Celery worker.
- This tutorial uses the Genesys Cloud REST API for webhook configuration and conversation retrieval, combined with Python httpx, pika, and celery for message queuing and background execution.
- The implementation covers Python 3.9+ with FastAPI for the webhook receiver and Celery for distributed task processing.
Prerequisites
- Genesys Cloud OAuth2 Client Credentials flow with webhooks:read, webhooks:write, and conversation:read scopes
- Python 3.9 or newer runtime
- RabbitMQ instance running on localhost:5672 with default credentials (guest/guest)
- External dependencies: fastapi, uvicorn, httpx, pika, celery, pydantic, python-dotenv
- A Genesys Cloud organization with API access enabled and a valid OAuth client ID and secret
Authentication Setup
Genesys Cloud uses OAuth2 client credentials grants for server-to-server communication. You must request a short-lived access token and cache it until expiration. The following class handles token acquisition, caching, and automatic refresh.
import time
from typing import Optional

import httpx


class GenesysAuth:
    # Tokens are issued by the region's login host (login.mypurecloud.com for
    # us-east-1), not by the api.* host that serves the REST calls.
    def __init__(self, client_id: str, client_secret: str, login_url: str = "https://login.mypurecloud.com"):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_endpoint = f"{login_url}/oauth/token"
        self._access_token: Optional[str] = None
        self._expires_at: float = 0

    async def get_token(self) -> str:
        # Serve the cached token while it is still inside its validity window.
        if self._access_token and time.time() < self._expires_at:
            return self._access_token

        payload = {
            "grant_type": "client_credentials",
            "scope": "webhooks:read webhooks:write conversation:read"
        }
        async with httpx.AsyncClient(timeout=10.0) as client:
            # The client ID and secret travel as HTTP Basic credentials.
            response = await client.post(
                self.token_endpoint,
                data=payload,
                auth=(self.client_id, self.client_secret),
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )

        if response.status_code == 200:
            data = response.json()
            self._access_token = data["access_token"]
            # Expire the cache 30 seconds early so a token is never used at the edge of its lifetime.
            self._expires_at = time.time() + data["expires_in"] - 30
            return self._access_token
        else:
            raise httpx.HTTPStatusError(
                f"Authentication failed: {response.status_code}",
                request=response.request,
                response=response
            )
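The class checks its local cache before making a network call. It treats the token as expired thirty seconds early, so a request sent just before the real expiry does not reach Genesys Cloud with a stale token during high-throughput webhook ingestion. The cache only helps when a single GenesysAuth instance is shared across requests.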
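The caching rule can be isolated from the HTTP call to make it easier to reason about and test. The following is a minimal sketch, not part of the tutorial's code: TokenCache, its fetch callable, and the injectable clock are hypothetical names introduced only for illustration.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Cache a token and refresh it a fixed margin before it expires."""

    def __init__(self, fetch: Callable[[], Tuple[str, float]], margin: float = 30.0,
                 clock: Callable[[], float] = time.time):
        self._fetch = fetch      # hypothetical callable returning (token, expires_in_seconds)
        self._margin = margin    # seconds subtracted from the advertised lifetime
        self._clock = clock      # injectable so the expiry logic can be tested without waiting
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = now + expires_in - self._margin
        return self._token

    def invalidate(self) -> None:
        # Force the next get() to fetch a new token, e.g. after a 401 response.
        self._token = None
```

With a 100-second lifetime and the default margin, a token fetched at t=1000 is reused until t=1070 and replaced on the first call at or after that point.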
Implementation
Step 1: Configure Genesys Cloud Webhook via API
You must register a webhook that triggers when a conversation is created. The endpoint supports pagination when listing existing webhooks, which you should handle to avoid duplicate registrations.
import asyncio

import httpx


async def register_conversation_webhook(auth: GenesysAuth, webhook_url: str) -> dict:
    base_url = "https://api.mypurecloud.com"
    headers = {"Authorization": f"Bearer {await auth.get_token()}", "Content-Type": "application/json"}
    webhook_body = {
        "name": "Conversation Creation Publisher",
        "address": webhook_url,
        "enabled": True,
        "method": "POST",
        "eventFilters": [
            {
                "event": "conversation:created",
                "entityType": "conversation"
            }
        ],
        "deliveryMode": "PUSH",
        "protocolVersion": "v2"
    }
    max_retries = 3
    async with httpx.AsyncClient(timeout=15.0) as client:
        for attempt in range(max_retries):
            response = await client.post(
                f"{base_url}/api/v2/webhooks",
                headers=headers,
                json=webhook_body
            )
            if response.status_code == 201:
                return response.json()
            elif response.status_code == 429:
                # Honor Retry-After when present; otherwise back off exponentially (1, 2, 4 seconds).
                retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
                print(f"Rate limited. Retrying in {retry_after} seconds...")
                await asyncio.sleep(retry_after)
            elif response.status_code in (401, 403):
                raise RuntimeError(f"Authentication or permission error: {response.status_code}. Verify scopes include webhooks:write.")
            else:
                raise RuntimeError(f"Webhook creation failed: {response.status_code} - {response.text}")
    # Every attempt was rate limited; fail loudly instead of silently returning None.
    raise RuntimeError(f"Webhook creation failed: still rate limited after {max_retries} attempts")
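The request body specifies conversation:created as the trigger. The retry loop handles 429 responses by waiting for the number of seconds given in the Retry-After header, falling back to exponential backoff (1, 2, then 4 seconds) when the header is absent, for up to three attempts. A 401 or 403 indicates missing webhooks:write scope or expired credentials.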
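The registration function does not itself check for an existing webhook, so running it twice could register the same address twice. One way to guard against that is to walk the paginated listing first. The sketch below is only an illustration under stated assumptions: fetch_page is a hypothetical callable that wraps the listing request for one page, and the response is assumed to carry entities and pageCount fields.

```python
from typing import Callable, Optional


def find_webhook_by_address(fetch_page: Callable[[int], dict], address: str,
                            max_pages: int = 100) -> Optional[dict]:
    """Walk a paginated listing and return the first webhook whose address matches."""
    page_number = 1
    while page_number <= max_pages:
        page = fetch_page(page_number)  # hypothetical wrapper around the list request
        for webhook in page.get("entities", []):  # assumed field name
            if webhook.get("address") == address:
                return webhook
        # Stop once the server reports no further pages (assumed "pageCount" field).
        if page_number >= page.get("pageCount", 1):
            return None
        page_number += 1
    return None
```

A caller would register the webhook only when this helper returns None, and could otherwise reuse or update the entry it found.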
Step 2: Build the Webhook Receiver and RabbitMQ Publisher
The FastAPI endpoint receives the webhook payload, fetches full conversation metadata, and publishes it to RabbitMQ. Publishing directly with pika keeps the receiver independent of Celery's task routing; durability comes from declaring the queue as durable and marking each message as persistent.
import asyncio
import json

import httpx
import pika
from fastapi import FastAPI, HTTPException, Request

app = FastAPI()

RABBITMQ_HOST = "localhost"
RABBITMQ_PORT = 5672
RABBITMQ_USER = "guest"
RABBITMQ_PASS = "guest"
QUEUE_NAME = "gen_interactions"

# One shared instance, so the cached token is reused across requests.
auth = GenesysAuth(client_id="YOUR_CLIENT_ID", client_secret="YOUR_CLIENT_SECRET")


def get_rabbitmq_connection() -> pika.BlockingConnection:
    credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
    parameters = pika.ConnectionParameters(
        host=RABBITMQ_HOST,
        port=RABBITMQ_PORT,
        credentials=credentials,
        heartbeat=600,
        blocked_connection_timeout=300
    )
    return pika.BlockingConnection(parameters)


def publish_message(message: str) -> None:
    # pika's BlockingConnection blocks, so the handler runs this in a worker thread.
    connection = get_rabbitmq_connection()
    try:
        channel = connection.channel()
        channel.queue_declare(queue=QUEUE_NAME, durable=True)
        channel.basic_publish(
            exchange="",
            routing_key=QUEUE_NAME,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent message
                content_type="application/json"
            )
        )
    finally:
        # Close the connection even when the publish fails.
        connection.close()


@app.post("/webhook/genesys/conversation")
async def handle_conversation_webhook(request: Request):
    try:
        body = await request.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON payload")

    conversation_id = body.get("conversationId") if isinstance(body, dict) else None
    if not conversation_id:
        raise HTTPException(status_code=400, detail="Missing conversationId in payload")

    # Fetch full metadata from Genesys Cloud
    headers = {"Authorization": f"Bearer {await auth.get_token()}", "Content-Type": "application/json"}
    async with httpx.AsyncClient(timeout=10.0) as client:
        metadata_response = await client.get(
            f"https://api.mypurecloud.com/api/v2/conversations/{conversation_id}",
            headers=headers
        )
    if metadata_response.status_code == 404:
        raise HTTPException(status_code=404, detail="Conversation not found")
    elif metadata_response.status_code in (401, 403):
        raise HTTPException(status_code=403, detail="Insufficient conversation:read scope")
    elif metadata_response.status_code == 429:
        raise HTTPException(status_code=429, detail="Rate limited by Genesys Cloud")
    elif metadata_response.status_code != 200:
        # Any other failure would otherwise be parsed as if it were metadata.
        raise HTTPException(status_code=502, detail=f"Genesys Cloud returned {metadata_response.status_code}")
    conversation_metadata = metadata_response.json()

    # Publish to RabbitMQ
    message = json.dumps({
        "conversationId": conversation_id,
        "timestamp": body.get("timestamp"),
        "metadata": conversation_metadata
    })
    try:
        await asyncio.to_thread(publish_message, message)
    except pika.exceptions.AMQPError as e:
        raise HTTPException(status_code=503, detail=f"RabbitMQ publish failed: {str(e)}")
    return {"status": "published", "conversationId": conversation_id}
The endpoint extracts conversationId, calls /api/v2/conversations/{conversationId} to retrieve full metadata, and publishes a structured JSON payload to RabbitMQ. The delivery_mode=2 property marks the message as persistent; combined with the durable queue declaration, this lets queued messages survive a broker restart.
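The shape of the published message can be pinned down in a small helper that is testable without a broker. This is a sketch of one possible arrangement; build_interaction_message is a hypothetical name, and the three fields mirror the payload described above.

```python
import json
from typing import Any, Dict


def build_interaction_message(body: Dict[str, Any], metadata: Dict[str, Any]) -> bytes:
    """Serialize the queue message: conversation ID, event timestamp, full metadata."""
    conversation_id = body.get("conversationId")
    if not conversation_id:
        raise ValueError("webhook body has no conversationId")
    envelope = {
        "conversationId": conversation_id,
        "timestamp": body.get("timestamp"),  # None when the webhook omits it
        "metadata": metadata,
    }
    # sort_keys makes the bytes deterministic, which helps when diffing or hashing messages.
    return json.dumps(envelope, sort_keys=True).encode("utf-8")
```

The returned bytes can be passed directly as the body argument of basic_publish, and the consumer recovers the dictionary with json.loads.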
Step 3: Configure Celery Worker and Define Processing Task
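Celery executes the backend processing logic. A Celery worker only decodes messages written in Celery's own task protocol, so the raw JSON that the receiver publishes with pika is picked up by a small bridge step inside the worker, which hands each payload to a task on a separate Celery-managed queue. You must configure the broker URL to match your RabbitMQ instance and define a task with retry logic for transient failures.
import logging

from celery import Celery, bootsteps
from kombu import Consumer, Queue

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

RAW_QUEUE_NAME = "gen_interactions"        # raw JSON published by the webhook receiver
TASK_QUEUE_NAME = "gen_interaction_tasks"  # Celery's own task messages

celery_app = Celery(
    "interaction_worker",
    broker="amqp://guest:guest@localhost:5672//",
    backend="rpc://"
)
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    task_default_queue=TASK_QUEUE_NAME,
    task_queue_max_priority=10,
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    task_reject_on_worker_lost=True
)

# Same name and durability as the queue declared by the pika publisher.
raw_queue = Queue(RAW_QUEUE_NAME, durable=True)


class RawInteractionBridge(bootsteps.ConsumerStep):
    """Consume raw JSON from the receiver's queue and enqueue a Celery task for it."""

    def get_consumers(self, channel):
        return [Consumer(channel, queues=[raw_queue], callbacks=[self.handle_message], accept=["json"])]

    def handle_message(self, body, message):
        # body is already decoded to a dict because the publisher set content_type to JSON.
        process_interaction_task.delay(body)
        message.ack()  # acknowledge only after the task has been enqueued


celery_app.steps["consumer"].add(RawInteractionBridge)


@celery_app.task(bind=True, max_retries=3, default_retry_delay=5)
def process_interaction_task(self, payload: dict):
    conversation_id = payload.get("conversationId")
    metadata = payload.get("metadata") or {}
    logger.info(f"Processing conversation: {conversation_id}")
    try:
        # Simulate backend processing (database write, ML inference, CRM sync, etc.)
        participants = metadata.get("participants") or []
        # Placeholder: treats the first participant as the customer.
        customer_id = participants[0].get("id", "unknown") if participants else "unknown"
        channel = metadata.get("channel", {}).get("name", "unknown")
        logger.info(f"Conversation {conversation_id} | Customer: {customer_id} | Channel: {channel}")
        # Example: raise ValueError to test retry logic
        # raise ValueError("Simulated backend failure")
        return {"status": "completed", "conversationId": conversation_id}
    except Exception as exc:
        logger.error(f"Task failed for {conversation_id}: {str(exc)}")
        raise self.retry(exc=exc, countdown=5)
The worker configuration sets task_acks_late=True so that a task message is acknowledged only after the task finishes, and the bridge acknowledges each raw message only after its task has been enqueued. The bind=True parameter gives the task access to self.retry(), which here re-queues the task after a fixed five-second delay, up to max_retries times. Save the module as celery_worker.py and start the worker using celery -A celery_worker worker --loglevel=info.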
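The task takes the first entry in participants as the customer, which is only a placeholder, since a conversation also lists agents, queues, and IVR legs. A more deliberate selection might look like the sketch below. It assumes each participant carries a purpose field whose value is customer for the external party; find_customer_participant is a hypothetical helper name.

```python
from typing import Any, Dict, Optional


def find_customer_participant(metadata: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the participant marked as the customer, else the first one, else None."""
    participants = metadata.get("participants") or []
    for participant in participants:
        # Assumption: the external party is tagged with purpose == "customer".
        if participant.get("purpose") == "customer":
            return participant
    return participants[0] if participants else None
```

Inside the task, the result would replace the positional lookup, with "unknown" used when the helper returns None.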
Complete Working Example
The following module combines authentication, webhook reception, RabbitMQ publishing, and Celery task definition. Save it as main.py and run the components separately.
import asyncio
import json
import logging
import time
from typing import Optional

import httpx
import pika
from celery import Celery, bootsteps
from fastapi import FastAPI, HTTPException, Request
from kombu import Consumer, Queue

logger = logging.getLogger(__name__)

# --- Configuration ---
GENESYS_CLIENT_ID = "YOUR_CLIENT_ID"
GENESYS_CLIENT_SECRET = "YOUR_CLIENT_SECRET"
GENESYS_BASE_URL = "https://api.mypurecloud.com"
GENESYS_LOGIN_URL = "https://login.mypurecloud.com"  # OAuth tokens are issued by the login host
RABBITMQ_URL = "amqp://guest:guest@localhost:5672//"
QUEUE_NAME = "gen_interactions"            # raw JSON published by the receiver
TASK_QUEUE_NAME = "gen_interaction_tasks"  # Celery's own task messages


# --- Authentication ---
class GenesysAuth:
    def __init__(self, client_id: str, client_secret: str, login_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_endpoint = f"{login_url}/oauth/token"
        self._access_token: Optional[str] = None
        self._expires_at: float = 0

    async def get_token(self) -> str:
        if self._access_token and time.time() < self._expires_at:
            return self._access_token
        payload = {
            "grant_type": "client_credentials",
            "scope": "webhooks:read webhooks:write conversation:read"
        }
        async with httpx.AsyncClient(timeout=10.0) as client:
            # The client ID and secret travel as HTTP Basic credentials.
            response = await client.post(self.token_endpoint, data=payload, auth=(self.client_id, self.client_secret))
        if response.status_code == 200:
            data = response.json()
            self._access_token = data["access_token"]
            self._expires_at = time.time() + data["expires_in"] - 30
            return self._access_token
        raise RuntimeError(f"Auth failed: {response.status_code} {response.text}")


# --- FastAPI Webhook Receiver ---
app = FastAPI()
auth = GenesysAuth(GENESYS_CLIENT_ID, GENESYS_CLIENT_SECRET, GENESYS_LOGIN_URL)


def publish_to_rabbitmq(message: dict):
    credentials = pika.PlainCredentials("guest", "guest")
    parameters = pika.ConnectionParameters(host="localhost", port=5672, credentials=credentials, heartbeat=600)
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        channel.queue_declare(queue=QUEUE_NAME, durable=True)
        channel.basic_publish(
            exchange="",
            routing_key=QUEUE_NAME,
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2, content_type="application/json")
        )
    finally:
        connection.close()


@app.post("/webhook/genesys/conversation")
async def handle_webhook(request: Request):
    try:
        body = await request.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")

    conversation_id = body.get("conversationId") if isinstance(body, dict) else None
    if not conversation_id:
        raise HTTPException(status_code=400, detail="Missing conversationId")

    headers = {"Authorization": f"Bearer {await auth.get_token()}", "Content-Type": "application/json"}
    async with httpx.AsyncClient(timeout=10.0) as client:
        resp = await client.get(f"{GENESYS_BASE_URL}/api/v2/conversations/{conversation_id}", headers=headers)
    if resp.status_code == 404:
        raise HTTPException(status_code=404, detail="Conversation not found")
    if resp.status_code in (401, 403):
        raise HTTPException(status_code=403, detail="Scope error")
    if resp.status_code == 429:
        raise HTTPException(status_code=429, detail="Rate limited")
    if resp.status_code != 200:
        raise HTTPException(status_code=502, detail=f"Genesys Cloud returned {resp.status_code}")
    metadata = resp.json()

    payload = {"conversationId": conversation_id, "timestamp": body.get("timestamp"), "metadata": metadata}
    try:
        # pika blocks, so publish from a worker thread instead of the event loop.
        await asyncio.to_thread(publish_to_rabbitmq, payload)
    except pika.exceptions.AMQPError as e:
        raise HTTPException(status_code=503, detail=f"RabbitMQ publish failed: {e}")
    return {"status": "queued", "conversationId": conversation_id}


# --- Celery Worker ---
celery_app = Celery("worker", broker=RABBITMQ_URL, backend="rpc://")
celery_app.conf.update(task_default_queue=TASK_QUEUE_NAME, task_acks_late=True)


class RawInteractionBridge(bootsteps.ConsumerStep):
    """Celery only decodes its own task protocol, so raw JSON is bridged into a task here."""

    def get_consumers(self, channel):
        return [Consumer(channel, queues=[Queue(QUEUE_NAME, durable=True)],
                         callbacks=[self.handle_message], accept=["json"])]

    def handle_message(self, body, message):
        process_interaction_task.delay(body)
        message.ack()  # acknowledge only after the task has been enqueued


celery_app.steps["consumer"].add(RawInteractionBridge)


@celery_app.task(bind=True, max_retries=3, default_retry_delay=5)
def process_interaction_task(self, payload: dict):
    cid = payload.get("conversationId")
    meta = payload.get("metadata") or {}
    logger.info(f"Processing {cid}")
    try:
        participants = meta.get("participants") or []
        customer = participants[0].get("id", "unknown") if participants else "unknown"
        logger.info(f"Customer: {customer} | Channel: {meta.get('channel', {}).get('name')}")
        return {"status": "success", "conversationId": cid}
    except Exception as e:
        logger.error(f"Failed {cid}: {e}")
        raise self.retry(exc=e)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Run the FastAPI server with python main.py. Start the Celery worker in a separate terminal with celery -A main:celery_app worker --loglevel=info. The system will ingest webhooks, queue metadata, and process tasks asynchronously.
Common Errors & Debugging
Error: 401 Unauthorized on Webhook Creation or Conversation Fetch
- Cause: The OAuth token has expired, or the client credentials are incorrect.
- Fix: Verify client_id and client_secret match the Genesys Cloud integration settings. Ensure the token cache expiration logic subtracts a safety margin. Restarting the FastAPI service also forces a token refresh.
- Code Fix: Invalidate the cached token on a 401 response, then retry the request once with a new token. Inside GenesysAuth this means setting self._access_token = None before calling await self.get_token() again.
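The invalidate-and-retry pattern can be sketched independently of any HTTP client. In the example below, send, get_token, and invalidate are hypothetical callables supplied by the caller; the helper retries exactly once, so a persistently rejected credential still surfaces as a 401.

```python
import asyncio
from typing import Awaitable, Callable, Tuple


async def call_with_token_refresh(
    send: Callable[[str], Awaitable[Tuple[int, dict]]],
    get_token: Callable[[], Awaitable[str]],
    invalidate: Callable[[], None],
) -> Tuple[int, dict]:
    """Send a request; on 401, drop the cached token and retry exactly once."""
    status, data = await send(await get_token())
    if status == 401:
        invalidate()  # forget the cached token so get_token() fetches a new one
        status, data = await send(await get_token())
    return status, data
```

Limiting the retry to a single attempt avoids a loop when the client ID or secret itself is wrong.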
Error: 403 Forbidden on API Calls
- Cause: The OAuth token lacks the required scopes.
- Fix: Confirm the token request includes webhooks:write for creation and conversation:read for metadata retrieval. Regenerate the token with the correct scope string.
- Code Fix: Update the payload scope field to "webhooks:read webhooks:write conversation:read" and verify in the Genesys Cloud admin console under Integrations.
Error: 429 Too Many Requests
- Cause: Genesys Cloud rate limits have been exceeded.
- Fix: Implement exponential backoff with Retry-After header parsing. Reduce concurrent webhook receiver instances.
- Code Fix: The provided register_conversation_webhook function already handles 429 with await asyncio.sleep(retry_after). Apply the same pattern to conversation fetch calls.
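To reuse the same wait calculation for the conversation fetch, the delay logic can be factored into a small pure function. This is a sketch with a hypothetical name, retry_delay; the base and cap defaults are illustrative choices rather than values prescribed by Genesys Cloud.

```python
from typing import Mapping, Optional


def retry_delay(headers: Mapping[str, str], attempt: int,
                base: float = 1.0, cap: float = 60.0) -> float:
    """Seconds to wait: Retry-After if it is numeric, else capped exponential backoff."""
    raw: Optional[str] = headers.get("Retry-After")
    if raw is not None:
        try:
            return min(max(float(raw), 0.0), cap)
        except ValueError:
            pass  # Retry-After may also be an HTTP date; fall back to backoff
    return min(base * (2 ** attempt), cap)
```

When called with response.headers from httpx the header lookup is case-insensitive; a plain dict must use the exact key Retry-After. The result can be passed to asyncio.sleep before the next attempt.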
Error: pika.exceptions.AMQPConnectionError
- Cause: RabbitMQ is unreachable, credentials are wrong, or the port is blocked.
- Fix: Verify RabbitMQ is running with rabbitmqctl status. Check firewall rules for port 5672. Confirm guest credentials match the broker configuration, and note that by default RabbitMQ only accepts the guest user on connections from localhost.
- Code Fix: Wrap pika.BlockingConnection in a try-except block and return 503 with a descriptive message. Implement connection pooling for high-throughput environments.
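A full connection pool is beyond this tutorial, but the simplest step in that direction is to keep one connection open and reopen it only when it has dropped. The sketch below assumes nothing about pika beyond the is_open attribute and close() method that its connections expose; ReusableConnection and the factory argument are hypothetical names.

```python
from typing import Any, Callable, Optional


class ReusableConnection:
    """Keep one open connection and reopen it lazily when it has been closed."""

    def __init__(self, factory: Callable[[], Any]):
        self._factory = factory  # hypothetical: a function returning a new broker connection
        self._connection: Optional[Any] = None

    def get(self) -> Any:
        # Reuse the current connection while it reports itself as open.
        if self._connection is None or not self._connection.is_open:
            self._connection = self._factory()
        return self._connection

    def close(self) -> None:
        if self._connection is not None and self._connection.is_open:
            self._connection.close()
        self._connection = None
```

Because pika's BlockingConnection is not safe to share between threads, a holder like this should be kept per thread, or all publishing should be funneled through a single thread.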
Error: celery.exceptions.MaxRetriesExceededError
- Cause: The Celery task failed repeatedly and exhausted max_retries.
- Fix: Inspect worker logs for the root exception. Fix data validation issues or external service timeouts. Increase max_retries or implement a dead-letter queue for failed messages.
- Code Fix: Change the decorator to @celery_app.task(bind=True, max_retries=5, default_retry_delay=10) and log self.request.retries to track attempt counts.
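If a growing delay between attempts is preferred, the countdown passed to self.retry() can be derived from self.request.retries. The helper below is a minimal sketch; retry_countdown and its base and cap defaults are illustrative assumptions, not Celery settings.

```python
def retry_countdown(retries: int, base: float = 5.0, cap: float = 300.0) -> float:
    """Delay before the next attempt: base * 2**retries, limited to cap seconds."""
    return min(base * (2 ** max(retries, 0)), cap)


# Inside a bound task, the value would be used like this:
#     raise self.retry(exc=exc, countdown=retry_countdown(self.request.retries))
```

With the defaults, the first three retries wait 5, 10, and 20 seconds, and the delay never exceeds five minutes.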