Generating Genesys Cloud Interaction Summaries via API with Python
What You Will Build
This tutorial builds a Python service that fetches conversation transcripts and metadata from Genesys Cloud, generates AI summaries using prompt templates, validates content against PII and length rules, manages asynchronous generation with polling, caches results, tracks latency and quality scores, and exposes a retrieval endpoint for CRM systems. The solution uses the official Genesys Cloud Python SDK and FastAPI.
Prerequisites
- Genesys Cloud OAuth Client Credentials flow (confidential client)
- Required scopes: conversation:interaction:view, analytics:conversation:view
- Genesys Cloud Python SDK v2.0+ (genesyscloud)
- Python 3.10+
- Dependencies: fastapi, httpx, openai, pydantic, uvicorn, cachetools, regex
Install dependencies before running the code:
pip install fastapi httpx openai pydantic uvicorn cachetools regex genesyscloud
Authentication Setup
Genesys Cloud uses OAuth 2.0 Client Credentials flow for server-to-server API access. The Python SDK handles token acquisition and automatic refresh. You must configure the auth object before initializing any API client.
import os
from genesyscloud.auth.oauth_client_credentials_auth import OAuthClientCredentialsAuth
from genesyscloud.api.api_interactions_api import ApiInteractionsApi
from genesyscloud.api.api_analytics_api import ApiAnalyticsApi

def init_genesys_client() -> tuple[ApiInteractionsApi, ApiAnalyticsApi]:
    """Initialize Genesys Cloud API clients with OAuth credentials."""
    env_host = os.getenv("GENESYS_CLOUD_ENV_HOST", "https://api.mypurecloud.com")
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
    if not all([env_host, client_id, client_secret]):
        raise ValueError("Missing required Genesys Cloud environment variables")
    auth = OAuthClientCredentialsAuth(
        environment=env_host,
        client_id=client_id,
        client_secret=client_secret
    )
    auth.get_access_token()  # Forces initial token fetch
    interactions_api = ApiInteractionsApi(auth)
    analytics_api = ApiAnalyticsApi(auth)
    return interactions_api, analytics_api
The get_access_token() call triggers the /oauth/token endpoint. The SDK caches the token and refreshes it automatically when expiration approaches. You must handle genesyscloud.rest_exception.RESTException for 401 (invalid credentials) and 403 (insufficient scopes).
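For readers who want to see what a client-credentials exchange involves underneath the SDK, the sketch below assembles the token request without sending it. The helper name build_token_request is hypothetical, and the login host shown is an assumption for one region; check the host that matches your Genesys Cloud organization.

```python
import base64

def build_token_request(client_id: str, client_secret: str,
                        login_host: str = "https://login.mypurecloud.com") -> dict:
    """Assemble a client-credentials token request. Nothing is sent here.

    login_host is an assumption: the login domain differs per region.
    """
    # The client ID and secret travel as HTTP Basic credentials.
    basic = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
    return {
        "url": f"{login_host}/oauth/token",
        "headers": {
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        "data": {"grant_type": "client_credentials"},
    }

request = build_token_request("my-client-id", "my-client-secret")
print(request["url"])
```

Passing the returned pieces to an HTTP client as a form-encoded POST should yield a JSON body containing the access token and its lifetime, which is what the SDK caches on your behalf.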
Implementation
Step 1: Query Interactions API for Transcript and Metadata
The Interactions API returns conversation metadata, participant roles, and media transcripts. You will fetch a single conversation by ID. The endpoint requires the conversation:interaction:view scope.
import asyncio
from genesyscloud.rest_exception import RESTException

async def fetch_conversation(interactions_api: ApiInteractionsApi, conversation_id: str) -> dict:
    """Fetch conversation metadata and transcript from Genesys Cloud."""
    try:
        # SDK calls are synchronous. Run in executor to avoid blocking FastAPI event loop.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            interactions_api.get_interactions_conversations_conversation_id,
            conversation_id
        )
    except RESTException as e:
        if e.status == 429:
            raise RuntimeError("Rate limited by Genesys Cloud. Implement exponential backoff.") from e
        if e.status in (401, 403):
            raise RuntimeError(f"Authentication or authorization failed: {e.status}") from e
        raise RuntimeError(f"Genesys API error {e.status}: {e.reason}") from e
    # Extract transcript lines and metadata
    transcript_lines = []
    if response.media and response.media.transcript:
        for line in response.media.transcript:
            transcript_lines.append(f"{line.from_}: {line.text}")
    return {
        "conversation_id": response.id,
        "type": response.type,
        "start_time": response.start_time.isoformat() if response.start_time else None,
        "end_time": response.end_time.isoformat() if response.end_time else None,
        "participants": [p.id for p in response.participants] if response.participants else [],
        "transcript": "\n".join(transcript_lines)
    }
The response contains a Media object with a Transcript array. Each transcript line includes from_ (participant ID), text, and sent_at. You concatenate lines into a single string for the LLM prompt.
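The concatenation described above can be sketched in isolation. TranscriptLine below is a hypothetical stand-in for the SDK's transcript entry (only the three fields named in this section), and sorting by sent_at is an added assumption in case lines arrive out of order.

```python
from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass
class TranscriptLine:
    """Hypothetical stand-in for one transcript entry."""
    from_: str
    text: str
    sent_at: datetime

def format_transcript(lines: list[TranscriptLine]) -> str:
    """Order lines by send time and join them as 'speaker: text' rows."""
    ordered = sorted(lines, key=lambda line: line.sent_at)
    # Skip entries whose text is empty or whitespace only.
    return "\n".join(
        f"{line.from_}: {line.text.strip()}"
        for line in ordered
        if line.text and line.text.strip()
    )

sample = [
    TranscriptLine("agent-1", "Let me check the tracking details.",
                   datetime(2024, 1, 1, 9, 0, 30, tzinfo=timezone.utc)),
    TranscriptLine("customer-1", "My order has not arrived.",
                   datetime(2024, 1, 1, 9, 0, 0, tzinfo=timezone.utc)),
    TranscriptLine("customer-1", "   ",
                   datetime(2024, 1, 1, 9, 0, 45, tzinfo=timezone.utc)),
]
print(format_transcript(sample))
```

Replacing participant IDs with role labels such as "customer" and "agent" before prompting may give the model clearer context, if your conversation data exposes roles.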
Step 2: Construct Summary Payloads Using LLM Integration with Prompt Templates
You will use OpenAI’s GPT-4 API for summary generation. The prompt template enforces structure, tone, and length constraints. You must pass the conversation metadata to guide the model.
from datetime import datetime
from openai import AsyncOpenAI

openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

SUMMARY_PROMPT_TEMPLATE = """
You are an expert call center analyst. Generate a concise interaction summary based on the following metadata and transcript.
Conversation Type: {conv_type}
Duration: {duration_minutes} minutes
Participants: {participant_count}
Transcript:
{transcript}
Requirements:
1. Output exactly one paragraph.
2. Maximum {max_length} characters.
3. Include customer intent, agent resolution, and follow-up actions.
4. Do not include any personally identifiable information.
5. Return only the summary text. No markdown. No prefixes.
"""

async def generate_summary_payload(conversation_data: dict, max_length: int = 500) -> str:
    """Generate summary using LLM with structured prompt."""
    duration = 0
    if conversation_data["start_time"] and conversation_data["end_time"]:
        start = datetime.fromisoformat(conversation_data["start_time"])
        end = datetime.fromisoformat(conversation_data["end_time"])
        duration = int((end - start).total_seconds() / 60)
    prompt = SUMMARY_PROMPT_TEMPLATE.format(
        conv_type=conversation_data["type"],
        duration_minutes=duration,
        participant_count=len(conversation_data["participants"]),
        transcript=conversation_data["transcript"][:15000],  # Token limit safety
        max_length=max_length
    )
    try:
        response = await openai_client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2,
            max_tokens=300
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback.
        raise RuntimeError(f"LLM generation failed: {e}") from e
The template explicitly restricts output format and length. The temperature=0.2 setting lowers sampling randomness, which makes summaries more consistent from run to run. You truncate the transcript to 15,000 characters to keep the prompt size bounded and well inside the model's context window.
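A plain character slice can cut the final transcript line in half. The sketch below shows one alternative, truncating at line boundaries. The helper name truncate_transcript is hypothetical, and keeping the start of the conversation rather than the end is a design assumption you may want to reverse.

```python
def truncate_transcript(transcript: str, max_chars: int = 15000) -> str:
    """Keep whole lines from the start of the transcript within max_chars."""
    if len(transcript) <= max_chars:
        return transcript
    kept: list[str] = []
    used = 0
    for line in transcript.split("\n"):
        # Every line after the first also costs one newline character.
        cost = len(line) + (1 if kept else 0)
        if used + cost > max_chars:
            break
        kept.append(line)
        used += cost
    if not kept:
        # A single oversized first line: fall back to a hard character cut.
        return transcript[:max_chars]
    return "\n".join(kept)

sample = "a: hello\nb: world\nc: again"
print(truncate_transcript(sample, max_chars=20))
```

With a budget of 20 characters the third line no longer fits, so only the first two lines are kept intact.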
Step 3: Validate Summary Content Against PII Redaction Rules and Length Constraints
Before storing or returning the summary, you must verify it meets compliance standards. This step checks character limits and scans for common PII patterns.
import re
from typing import Tuple

PII_PATTERNS = {
    "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
    # [A-Za-z] rather than [A-Z|a-z]: inside a character class, | is a literal pipe.
    "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
    "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
    "credit_card": r"\b(?:\d[ -]*?){13,16}\b"
}

def validate_summary(summary: str, max_length: int) -> Tuple[bool, str, dict]:
    """Validate summary against PII rules and length constraints."""
    if len(summary) > max_length:
        return False, f"Summary exceeds maximum length of {max_length} characters.", {"length": len(summary)}
    violations = []
    for pii_type, pattern in PII_PATTERNS.items():
        matches = re.findall(pattern, summary, re.IGNORECASE)
        if matches:
            violations.append(f"{pii_type.upper()} detected: {matches}")
    if violations:
        return False, "PII redaction failed.", {"violations": violations}
    return True, "Validation passed.", {"length": len(summary), "pii_count": 0}
The function returns a tuple of (is_valid, message, metadata). You reject summaries that contain PII or exceed the length threshold. Production systems should integrate microsoft/presidio for advanced entity recognition, but regex suffices for deterministic filtering.
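Where outright rejection is too strict, one option is to mask matches instead of failing the summary. The sketch below is an illustration under that assumption, not part of the pipeline above; redact_pii and the placeholder labels are hypothetical names.

```python
import re

REDACTION_PATTERNS = {
    "EMAIL": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
    "SSN": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "PHONE": re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b"),
}

def redact_pii(text: str) -> tuple[str, dict[str, int]]:
    """Replace each match with a bracketed label and count replacements per type."""
    counts: dict[str, int] = {}
    for label, pattern in REDACTION_PATTERNS.items():
        text, replaced = pattern.subn(f"[{label}]", text)
        if replaced:
            counts[label] = replaced
    return text, counts

redacted, counts = redact_pii(
    "Customer asked for a callback at 555-123-4567 or jane@example.com."
)
print(redacted)
print(counts)
```

The per-type counts can be logged alongside the summary so that frequent redactions point you toward prompt changes.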
Step 4: Handle Asynchronous Summary Generation via Polling Endpoints
LLM calls and validation run asynchronously. You will register tasks in a memory-backed queue and expose a polling endpoint for status checks. This pattern decouples CRM requests from generation latency.
import uuid
import time
from enum import Enum
from typing import Dict

class TaskStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

# In-memory task registry. Replace with Redis in production.
task_registry: Dict[str, dict] = {}
# Strong references keep scheduled tasks from being garbage-collected mid-run.
background_tasks: set = set()

async def submit_summary_task(conversation_id: str, max_length: int = 500) -> str:
    """Submit async summary generation task and return task ID."""
    task_id = str(uuid.uuid4())
    task_registry[task_id] = {
        "task_id": task_id,
        "conversation_id": conversation_id,
        "status": TaskStatus.PENDING,
        "created_at": time.time(),
        "summary": None,
        "error": None,
        "latency_ms": 0,
        "quality_score": 0.0
    }
    # Schedule background processing
    bg_task = asyncio.create_task(process_summary_task(task_id, conversation_id, max_length))
    background_tasks.add(bg_task)
    bg_task.add_done_callback(background_tasks.discard)
    return task_id

async def process_summary_task(task_id: str, conversation_id: str, max_length: int):
    """Background worker to fetch, generate, validate, and score summary."""
    task = task_registry[task_id]
    task["status"] = TaskStatus.PROCESSING
    start_time = time.perf_counter()
    try:
        # Initialize inside the try block so credential errors mark the task as failed.
        interactions_api, _ = init_genesys_client()
        conv_data = await fetch_conversation(interactions_api, conversation_id)
        raw_summary = await generate_summary_payload(conv_data, max_length)
        is_valid, msg, meta = validate_summary(raw_summary, max_length)
        if not is_valid:
            task["status"] = TaskStatus.FAILED
            task["error"] = msg
            return
        # Calculate quality score (0.0 to 1.0)
        quality_score = calculate_quality_score(raw_summary, max_length, conv_data)
        latency_ms = (time.perf_counter() - start_time) * 1000
        task["status"] = TaskStatus.COMPLETED
        task["summary"] = raw_summary
        task["latency_ms"] = latency_ms
        task["quality_score"] = quality_score
        task["metadata"] = meta
    except Exception as e:
        task["status"] = TaskStatus.FAILED
        task["error"] = str(e)
The submit_summary_task function returns a task_id immediately. The background worker updates the registry. You poll /api/summary/status/{task_id} to retrieve the result. This prevents HTTP timeout errors on CRM integrations.
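On the caller's side, the polling pattern can be sketched as a loop with an interval and an overall deadline. Everything here is an assumption for illustration: poll_until_done is a hypothetical helper, and get_status stands in for whatever HTTP call your CRM integration makes against the status endpoint.

```python
import asyncio
from typing import Awaitable, Callable

async def poll_until_done(
    get_status: Callable[[str], Awaitable[dict]],
    task_id: str,
    interval: float = 0.5,
    timeout: float = 30.0,
) -> dict:
    """Poll until the task reports a terminal status or the deadline passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        task = await get_status(task_id)
        if task["status"] in ("completed", "failed"):
            return task
        if loop.time() + interval > deadline:
            raise TimeoutError(f"Task {task_id} did not finish within {timeout}s")
        await asyncio.sleep(interval)

async def _demo() -> dict:
    # Fake status source: two non-terminal responses, then a completed one.
    responses = iter([
        {"status": "pending"},
        {"status": "processing"},
        {"status": "completed", "summary": "Example summary."},
    ])

    async def fake_get_status(task_id: str) -> dict:
        return next(responses)

    return await poll_until_done(fake_get_status, "demo-task", interval=0.01, timeout=1.0)

result = asyncio.run(_demo())
print(result["status"])
```

A fixed interval keeps the sketch short; lengthening the interval after each attempt would reduce load when many clients poll at once.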
Step 5: Implement Caching Strategies, Track Latency, and Generate Quality Scores
Repeated requests for the same conversation should return cached results. You will use cachetools.TTLCache with a 1-hour expiration. Latency tracking measures total pipeline duration. Quality scoring evaluates summary usefulness for model tuning.
from cachetools import TTLCache
from typing import Optional

# Cache key: conversation_id -> summary data
summary_cache = TTLCache(maxsize=1000, ttl=3600)

def calculate_quality_score(summary: str, max_length: int, conv_data: dict) -> float:
    """Generate quality score for model tuning based on heuristic metrics."""
    score = 1.0
    length_ratio = len(summary) / max_length
    if length_ratio < 0.5:
        score -= 0.2  # Too short
    elif length_ratio > 0.9:
        score -= 0.1  # Near limit
    # Keyword presence check
    required_keywords = ["customer", "agent", "resolved", "issue", "follow-up"]
    summary_lower = summary.lower()
    keyword_hits = sum(1 for kw in required_keywords if kw in summary_lower)
    score -= (len(required_keywords) - keyword_hits) * 0.1
    # Transcript coverage heuristic
    transcript_word_count = len(conv_data["transcript"].split())
    if transcript_word_count > 500 and len(summary.split()) < 30:
        score -= 0.15
    return max(0.0, min(1.0, score))

def get_cached_summary(conversation_id: str) -> Optional[dict]:
    """Retrieve summary from cache if available."""
    return summary_cache.get(conversation_id)

def store_cached_summary(conversation_id: str, data: dict):
    """Store validated summary in cache."""
    summary_cache[conversation_id] = data
The quality score penalizes summaries that use less than half of the length budget or more than 90% of it, missing business keywords, and low transcript coverage. You log these scores to tune prompt templates or switch models. Latency is recorded in latency_ms for agent productivity dashboards.
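To make the arithmetic concrete, the sketch below applies the same three rules but reports each penalty separately. score_breakdown is a hypothetical helper written for this illustration, and rounding the final score to two decimals is an added convenience.

```python
REQUIRED_KEYWORDS = ["customer", "agent", "resolved", "issue", "follow-up"]

def score_breakdown(summary: str, max_length: int, transcript: str) -> dict:
    """Return each heuristic penalty separately, plus the clamped final score."""
    ratio = len(summary) / max_length
    if ratio < 0.5:
        length_penalty = 0.2   # Too short
    elif ratio > 0.9:
        length_penalty = 0.1   # Near limit
    else:
        length_penalty = 0.0
    lowered = summary.lower()
    missing = [kw for kw in REQUIRED_KEYWORDS if kw not in lowered]
    keyword_penalty = 0.1 * len(missing)
    long_transcript = len(transcript.split()) > 500
    coverage_penalty = 0.15 if long_transcript and len(summary.split()) < 30 else 0.0
    score = 1.0 - length_penalty - keyword_penalty - coverage_penalty
    return {
        "length_penalty": length_penalty,
        "missing_keywords": missing,
        "coverage_penalty": coverage_penalty,
        "score": round(max(0.0, min(1.0, score)), 2),
    }

report = score_breakdown(
    "Customer called about billing.", 500, "customer: I have a billing question"
)
print(report)
```

For this short summary the length rule costs 0.2 and four missing keywords cost 0.4, which leaves a score of 0.4. Seeing the penalties separately shows which rule dominates before you adjust the prompt.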
Step 6: Expose Summary Retrieval Service for CRM Integration
You will wrap the logic in a FastAPI application. The service exposes three endpoints: submission, polling, and direct retrieval. CRM systems use the polling pattern to avoid blocking UI threads.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="Genesys Interaction Summary Service")

class SummaryRequest(BaseModel):
    conversation_id: str
    max_length: int = 500

class TaskStatusResponse(BaseModel):
    task_id: str
    conversation_id: str
    status: str
    summary: Optional[str] = None
    error: Optional[str] = None
    latency_ms: Optional[float] = None
    quality_score: Optional[float] = None

@app.post("/api/summary/submit")
async def submit_summary(req: SummaryRequest):
    """Submit async summary generation request."""
    cached = get_cached_summary(req.conversation_id)
    if cached:
        return {"task_id": "cached", "status": "completed", "summary": cached["summary"]}
    task_id = await submit_summary_task(req.conversation_id, req.max_length)
    return {"task_id": task_id, "status": "pending"}

@app.get("/api/summary/status/{task_id}")
async def poll_summary_status(task_id: str):
    """Poll async task status for CRM integrations."""
    if task_id == "cached":
        raise HTTPException(400, "Invalid task ID for polling")
    task = task_registry.get(task_id)
    if not task:
        raise HTTPException(404, "Task not found")
    if task["status"] == TaskStatus.COMPLETED:
        store_cached_summary(task["conversation_id"], {
            "summary": task["summary"],
            "quality_score": task["quality_score"],
            "latency_ms": task["latency_ms"]
        })
    return TaskStatusResponse(**task)

@app.get("/api/summary/{conversation_id}")
async def get_summary_direct(conversation_id: str):
    """Direct retrieval endpoint for synchronous CRM calls."""
    cached = get_cached_summary(conversation_id)
    if cached:
        return {"conversation_id": conversation_id, "summary": cached["summary"], "source": "cache"}
    # Fallback to sync generation if cache miss
    interactions_api, _ = init_genesys_client()
    conv_data = await fetch_conversation(interactions_api, conversation_id)
    summary = await generate_summary_payload(conv_data)
    is_valid, msg, meta = validate_summary(summary, 500)
    if not is_valid:
        raise HTTPException(400, msg)
    return {"conversation_id": conversation_id, "summary": summary, "source": "live"}
The /submit endpoint returns immediately. The /status endpoint enables CRM polling and, once a task completes, returns the summary together with its latency and quality metrics for downstream analytics. The direct endpoint serves cached results or falls back to synchronous generation; it returns only the summary text and its source.
Complete Working Example
The following script combines all components into a runnable FastAPI application. Replace environment variables before execution.
import os
import asyncio
import time
import uuid
import re
from enum import Enum
from typing import Dict, Optional, Tuple
import httpx
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from cachetools import TTLCache
from openai import AsyncOpenAI
from genesyscloud.auth.oauth_client_credentials_auth import OAuthClientCredentialsAuth
from genesyscloud.api.api_interactions_api import ApiInteractionsApi
from genesyscloud.rest_exception import RESTException
# --- Configuration ---
openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
task_registry: Dict[str, dict] = {}
summary_cache = TTLCache(maxsize=1000, ttl=3600)
class TaskStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

# --- Genesys Client Initialization ---
def init_genesys_client():
    env_host = os.getenv("GENESYS_CLOUD_ENV_HOST", "https://api.mypurecloud.com")
    client_id = os.getenv("GENESYS_CLOUD_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLOUD_CLIENT_SECRET")
    auth = OAuthClientCredentialsAuth(environment=env_host, client_id=client_id, client_secret=client_secret)
    auth.get_access_token()
    return ApiInteractionsApi(auth)
# --- API & LLM Functions ---
async def fetch_conversation(interactions_api, conversation_id: str) -> dict:
    try:
        # SDK calls are synchronous, so run them in the default executor.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, interactions_api.get_interactions_conversations_conversation_id, conversation_id
        )
    except RESTException as e:
        if e.status == 429:
            raise RuntimeError("Rate limited by Genesys Cloud.") from e
        raise RuntimeError(f"Genesys API error {e.status}") from e
    transcript_lines = []
    if response.media and response.media.transcript:
        for line in response.media.transcript:
            transcript_lines.append(f"{line.from_}: {line.text}")
    return {
        "conversation_id": response.id,
        "type": response.type,
        "start_time": response.start_time.isoformat() if response.start_time else None,
        "end_time": response.end_time.isoformat() if response.end_time else None,
        "participants": [p.id for p in response.participants] if response.participants else [],
        "transcript": "\n".join(transcript_lines)
    }
SUMMARY_PROMPT = """
You are an expert call center analyst. Generate a concise interaction summary.
Type: {conv_type} | Duration: {duration} min | Participants: {p_count}
Transcript: {transcript}
Requirements: One paragraph. Max {max_len} chars. Include intent, resolution, follow-up. No PII. No markdown.
"""
async def generate_summary_payload(conv_data: dict, max_length: int = 500) -> str:
    duration = 0
    if conv_data["start_time"] and conv_data["end_time"]:
        from datetime import datetime
        start = datetime.fromisoformat(conv_data["start_time"])
        end = datetime.fromisoformat(conv_data["end_time"])
        duration = int((end - start).total_seconds() / 60)
    prompt = SUMMARY_PROMPT.format(
        conv_type=conv_data["type"], duration=duration,
        p_count=len(conv_data["participants"]),
        transcript=conv_data["transcript"][:15000], max_len=max_length
    )
    resp = await openai_client.chat.completions.create(
        model="gpt-4-turbo", messages=[{"role": "user", "content": prompt}],
        temperature=0.2, max_tokens=300
    )
    return resp.choices[0].message.content.strip()
PII_PATTERNS = {
    "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
    # [A-Za-z] rather than [A-Z|a-z]: inside a character class, | is a literal pipe.
    "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
    "ssn": r"\b\d{3}-\d{2}-\d{4}\b"
}

def validate_summary(summary: str, max_length: int) -> Tuple[bool, str, dict]:
    if len(summary) > max_length:
        return False, f"Exceeds {max_length} chars.", {"length": len(summary)}
    violations = []
    for ptype, pat in PII_PATTERNS.items():
        matches = re.findall(pat, summary, re.IGNORECASE)
        if matches:
            violations.append(f"{ptype.upper()} detected")
    if violations:
        return False, "PII redaction failed.", {"violations": violations}
    return True, "Valid.", {"length": len(summary)}

def calculate_quality_score(summary: str, max_length: int, conv_data: dict) -> float:
    score = 1.0
    ratio = len(summary) / max_length
    if ratio < 0.5:
        score -= 0.2
    elif ratio > 0.9:
        score -= 0.1
    kw = ["customer", "agent", "resolved", "issue", "follow-up"]
    hits = sum(1 for w in kw if w in summary.lower())
    score -= (len(kw) - hits) * 0.1
    if len(conv_data["transcript"].split()) > 500 and len(summary.split()) < 30:
        score -= 0.15
    return max(0.0, min(1.0, score))
# --- Async Task Management ---
# Strong references keep scheduled tasks from being garbage-collected mid-run.
background_tasks: set = set()

async def submit_summary_task(conversation_id: str, max_length: int) -> str:
    task_id = str(uuid.uuid4())
    task_registry[task_id] = {
        "task_id": task_id, "conversation_id": conversation_id,
        "status": TaskStatus.PENDING, "created_at": time.time(),
        "summary": None, "error": None, "latency_ms": 0, "quality_score": 0.0
    }
    bg_task = asyncio.create_task(process_task(task_id, conversation_id, max_length))
    background_tasks.add(bg_task)
    bg_task.add_done_callback(background_tasks.discard)
    return task_id

async def process_task(task_id: str, conversation_id: str, max_length: int):
    task_registry[task_id]["status"] = TaskStatus.PROCESSING
    start = time.perf_counter()
    try:
        # Initialize inside the try block so credential errors mark the task as failed.
        api = init_genesys_client()
        conv = await fetch_conversation(api, conversation_id)
        raw = await generate_summary_payload(conv, max_length)
        valid, msg, meta = validate_summary(raw, max_length)
        if not valid:
            task_registry[task_id].update({"status": TaskStatus.FAILED, "error": msg})
            return
        score = calculate_quality_score(raw, max_length, conv)
        latency = (time.perf_counter() - start) * 1000
        task_registry[task_id].update({
            "status": TaskStatus.COMPLETED, "summary": raw,
            "latency_ms": latency, "quality_score": score, "metadata": meta
        })
    except Exception as e:
        task_registry[task_id].update({"status": TaskStatus.FAILED, "error": str(e)})
# --- FastAPI Service ---
app = FastAPI(title="Genesys Summary Service")

class SummaryReq(BaseModel):
    conversation_id: str
    max_length: int = 500

@app.post("/api/summary/submit")
async def submit(req: SummaryReq):
    cached = summary_cache.get(req.conversation_id)
    if cached:
        return {"task_id": "cached", "status": "completed", "summary": cached["summary"]}
    tid = await submit_summary_task(req.conversation_id, req.max_length)
    return {"task_id": tid, "status": "pending"}

@app.get("/api/summary/status/{task_id}")
async def poll(task_id: str):
    if task_id == "cached":
        raise HTTPException(400, "Invalid task ID")
    task = task_registry.get(task_id)
    if not task:
        raise HTTPException(404, "Task not found")
    if task["status"] == TaskStatus.COMPLETED:
        summary_cache[task["conversation_id"]] = {
            "summary": task["summary"], "quality_score": task["quality_score"], "latency_ms": task["latency_ms"]
        }
    return task

@app.get("/api/summary/{conversation_id}")
async def direct(conversation_id: str):
    cached = summary_cache.get(conversation_id)
    if cached:
        return {"conversation_id": conversation_id, "summary": cached["summary"], "source": "cache"}
    api = init_genesys_client()
    conv = await fetch_conversation(api, conversation_id)
    # Pass the limit explicitly so generation and validation use the same value.
    s = await generate_summary_payload(conv, 500)
    v, m, _ = validate_summary(s, 500)
    if not v:
        raise HTTPException(400, m)
    return {"conversation_id": conversation_id, "summary": s, "source": "live"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Common Errors & Debugging
Error: 401 Unauthorized or 403 Forbidden
- Cause: Invalid client credentials, expired token, or missing OAuth scopes.
- Fix: Verify GENESYS_CLOUD_CLIENT_ID and GENESYS_CLOUD_CLIENT_SECRET match the Genesys Cloud admin console. Ensure the OAuth client has conversation:interaction:view assigned. The SDK refreshes tokens automatically, but the initial get_access_token() must succeed before any API call.
- Code Fix: Wrap SDK initialization in try/except and log e.status and e.reason. Rotate credentials if the client was recently revoked.
Error: 429 Too Many Requests
- Cause: Genesys Cloud enforces rate limits per OAuth client and per endpoint. Bursting transcript fetches triggers throttling.
- Fix: Implement exponential backoff with jitter. The SDK does not retry automatically. You must catch RESTException with status 429 and delay subsequent calls.
- Code Fix:
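import random
import time
from genesyscloud.rest_exception import RESTException

def retry_429(func, *args, max_retries=3, base_delay=1.0):
    """Retry a synchronous SDK call on HTTP 429 with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return func(*args)
        except RESTException as e:
            # Re-raise anything that is not a rate limit, and give up on the last attempt.
            if e.status != 429 or attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            time.sleep(delay)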
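The helper above sleeps with time.sleep, which would stall the event loop if called directly from an async handler. A non-blocking variant might look like the sketch below. RateLimitedError is a hypothetical stand-in for the SDK exception with status 429, and retry_with_backoff is an illustrative name.

```python
import asyncio
import random

class RateLimitedError(Exception):
    """Hypothetical stand-in for an SDK exception carrying HTTP status 429."""

async def retry_with_backoff(func, *args, max_retries=3, base_delay=1.0, max_jitter=0.5):
    """Await func(*args), retrying on rate limits with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return await func(*args)
        except RateLimitedError:
            if attempt == max_retries - 1:
                raise
            # The delay doubles on each attempt; jitter spreads out concurrent retries.
            delay = base_delay * (2 ** attempt) + random.uniform(0, max_jitter)
            await asyncio.sleep(delay)

calls = {"count": 0}

async def flaky_fetch(conversation_id: str) -> str:
    """Fail with a rate limit twice, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RateLimitedError("429 Too Many Requests")
    return f"transcript for {conversation_id}"

result = asyncio.run(
    retry_with_backoff(flaky_fetch, "conv-123", base_delay=0.01, max_jitter=0.01)
)
print(result, calls["count"])
```

If the rate-limit response includes a Retry-After header, honoring that value instead of the computed delay is usually the safer choice.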
Error: LLM Context Window Exceeded or Token Limit
- Cause: Transcript exceeds model token limits or max_tokens is too low.
- Fix: Truncate transcript to 15,000 characters before prompt injection. Set max_tokens to 300 to match length constraints. Use gpt-4-turbo for 128k context support.
- Code Fix: The generate_summary_payload function already slices transcript[:15000]. Adjust slice length based on model selection.
Error: PII Validation False Positives
- Cause: Regex patterns match non-PII numeric sequences (e.g., order numbers, account IDs).
- Fix: Use contextual validation or switch to microsoft/presidio. Add allowlists for known internal identifiers.
- Code Fix: Replace regex with from presidio_analyzer import AnalyzerEngine and filter by entity type confidence scores.
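Short of adopting a full entity recognizer, contextual validation for card numbers can be approximated with a Luhn checksum plus an allowlist. The sketch below is a minimal illustration; find_card_numbers, the candidate regex, and the allowlist parameter are assumptions made for this example.

```python
import re

# Candidate: 13 to 19 digits, optionally separated by single spaces or hyphens.
CARD_CANDIDATE = re.compile(r"\b\d(?:[ -]?\d){12,18}\b")

def luhn_valid(number: str) -> bool:
    """Return True if the digits in number pass the Luhn checksum."""
    digits = [int(ch) for ch in number if ch.isdigit()]
    if not 13 <= len(digits) <= 19:
        return False
    total = 0
    for index, digit in enumerate(reversed(digits)):
        if index % 2 == 1:
            # Double every second digit from the right, folding two-digit results.
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return total % 10 == 0

def find_card_numbers(text: str, allowlist: frozenset = frozenset()) -> list[str]:
    """Return digit strings that look like card numbers and pass the Luhn check."""
    found = []
    for match in CARD_CANDIDATE.finditer(text):
        digits = re.sub(r"\D", "", match.group())
        if digits in allowlist:
            continue  # Known internal identifier, not PII
        if luhn_valid(digits):
            found.append(digits)
    return found

text = "Order 1234567890123456 was charged to card 4111 1111 1111 1111."
print(find_card_numbers(text))
```

Here the 16-digit order number fails the checksum and is ignored, while the widely used test card number 4111 1111 1111 1111 is flagged. Identifiers that happen to pass the checksum can be excluded through the allowlist.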