Setting up an AWS EventBridge integration to receive real-time Genesys Cloud events
What You Will Build
- You will build a Python application that configures a Genesys Cloud Event Stream to push real-time conversation and interaction events to AWS EventBridge through a Lambda bridge.
- You will use the Genesys Cloud Python SDK (genesyscloud) to manage the Event Stream configuration and the AWS SDK (boto3) to create the EventBridge bus and the Lambda function.
- The code is written for Python 3.9+. The examples are synchronous; requests is installed for direct HTTP interactions where the SDK lacks specific EventBridge support.
Prerequisites
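- Genesys Cloud: An OAuth client with the eventstream:write and eventstream:read scopes. You need the Client ID, Client Secret, and organization region.
- AWS: An IAM user or role with permissions to create the resources used in this tutorial: an EventBridge event bus (events:CreateEventBus, events:DescribeEventBus, events:ListEventBuses), an IAM role (iam:CreateRole, iam:PutRolePolicy, iam:GetRole, iam:PassRole), and a Lambda function with a Function URL (lambda:CreateFunction, lambda:UpdateFunctionCode, lambda:GetFunction, lambda:CreateFunctionUrlConfig, lambda:GetFunctionUrlConfig, lambda:AddPermission).
- Python Runtime: Python 3.9 or higher.
- Dependencies:
  - genesyscloud: The official Genesys Cloud Python SDK.
  - boto3: The official AWS SDK for Python.
  - requests: For handling HTTP requests that the SDK does not abstract fully.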
Install dependencies:
pip install genesyscloud boto3 requests
Authentication Setup
Genesys Cloud uses OAuth 2.0 for API authentication. The Python SDK handles token acquisition and refresh automatically if you provide the client credentials. AWS uses IAM credentials, which boto3 handles via environment variables or the default credential chain.
Genesys Cloud Authentication
The genesyscloud SDK provides a PlatformClient that manages the OAuth flow. You must initialize it with your client ID, secret, and region.
import os

from genesyscloud.platform_client import PlatformClient
from genesyscloud.auth import AuthClient


def get_genesys_client() -> PlatformClient:
    """
    Initializes and returns an authenticated Genesys Cloud PlatformClient.
    """
    client_id = os.getenv("GENESYS_CLIENT_ID")
    client_secret = os.getenv("GENESYS_CLIENT_SECRET")
    # The region is the domain of your org, e.g. mypurecloud.ie or usw2.pure.cloud
    region = os.getenv("GENESYS_REGION", "mypurecloud.com")

    if not client_id or not client_secret:
        raise ValueError("GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET must be set.")

    # Initialize the platform client
    platform = PlatformClient()

    # Configure the auth client
    platform.auth_client = AuthClient(
        client_id=client_id,
        client_secret=client_secret,
        region=region
    )

    # Attempt to authenticate. This triggers the OAuth flow.
    try:
        platform.auth_client.login()
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback
        raise RuntimeError(f"Failed to authenticate with Genesys Cloud: {e}") from e

    return platform
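To make the OAuth flow mentioned above more concrete, here is a minimal sketch of the client-credentials token request that an SDK would issue on your behalf. It only builds the request and never sends it. The helper name build_token_request is hypothetical, and the sketch assumes the login host follows the usual login.<region domain> form.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(client_id: str, client_secret: str, region: str) -> urllib.request.Request:
    """Build (but do not send) an OAuth client-credentials token request.

    Assumption: the token endpoint is https://login.<region>/oauth/token,
    where <region> is the org's domain such as "mypurecloud.com".
    """
    # HTTP Basic credentials are base64("client_id:client_secret")
    basic = base64.b64encode(f"{client_id}:{client_secret}".encode("utf-8")).decode("ascii")
    # The grant type goes in a form-encoded body
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode("ascii")
    return urllib.request.Request(
        url=f"https://login.{region}/oauth/token",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
```

Passing the returned object to urllib.request.urlopen would perform the exchange; the JSON response carries the access token that is then sent as a Bearer token on API calls.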
AWS Authentication
Ensure your AWS credentials are configured in your environment. boto3 will automatically pick them up. For local development, ensure AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set, or use aws configure.
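Before running any of the setup code, it can help to fail fast when configuration is missing. The following is a small sketch, not part of either SDK; the function name missing_env_vars and the REQUIRED_VARS tuple are illustrative. Note that boto3 can also find credentials through a shared profile or an instance role, so the AWS variables are only required if you rely on environment variables.

```python
import os
from typing import Iterable, List, Mapping

# Variable names taken from this tutorial; adjust if you use profiles or roles for AWS
REQUIRED_VARS = (
    "GENESYS_CLIENT_ID",
    "GENESYS_CLIENT_SECRET",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
)


def missing_env_vars(required: Iterable[str], env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names in `required` that are unset or empty in `env`."""
    return [name for name in required if not env.get(name)]
```

A caller might run missing_env_vars(REQUIRED_VARS) at startup and exit with a clear message if the returned list is not empty.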
Implementation
Step 1: Create the EventBridge Bus and Lambda Bridge
Before Genesys Cloud can push events, AWS needs an HTTPS endpoint that can receive them. EventBridge does not expose one directly: events enter a bus through the PutEvents API, which requires requests signed with IAM credentials (SigV4). EventBridge API destinations do not help here, because they are targets that rules send events out to, not inbound endpoints. The standard pattern is therefore:
- Genesys pushes to an HTTPS endpoint backed by an AWS Lambda function (or another intermediary such as SQS behind API Gateway).
- That resource puts events onto EventBridge.
This tutorial uses a common, robust variant of that pattern: Genesys pushes to an AWS Lambda function that acts as a bridge to a custom EventBridge bus.
First, we will create the EventBridge bus and the Lambda function using boto3.
import boto3
import json
import time
from botocore.exceptions import ClientError


def create_eventbridge_bus(bus_name: str, region: str = "us-east-1") -> str:
    """
    Creates an EventBridge Custom Bus if it does not exist.
    Returns the ARN of the bus.
    """
    events_client = boto3.client("events", region_name=region)
    try:
        response = events_client.create_event_bus(
            Name=bus_name,
            Description="Custom bus for Genesys Cloud events"
        )
        print(f"Created EventBridge Bus: {bus_name}")
        return response["EventBusArn"]
    except events_client.exceptions.ResourceAlreadyExistsException:
        print(f"EventBridge Bus {bus_name} already exists.")
        # Look up the bus by exact name; a prefix listing could return a different bus first
        return events_client.describe_event_bus(Name=bus_name)["Arn"]
    except ClientError as e:
        raise RuntimeError(f"Failed to create EventBridge Bus: {e}") from e
def deploy_lambda_bridge(lambda_name: str, region: str = "us-east-1") -> str:
    """
    Deploys a Lambda function that receives HTTP POST from Genesys and puts events to EventBridge.
    Creates the execution role if needed and returns the function ARN.
    """
    lambda_client = boto3.client("lambda", region_name=region)
    iam_client = boto3.client("iam", region_name=region)

    # 1. Create IAM Role for Lambda if it does not exist
    role_name = f"{lambda_name}-role"
    try:
        iam_client.get_role(RoleName=role_name)
        print(f"IAM Role {role_name} already exists.")
    except iam_client.exceptions.NoSuchEntityException:
        print(f"Creating IAM Role {role_name}...")
        assume_role_policy_document = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "lambda.amazonaws.com"},
                    "Action": "sts:AssumeRole"
                }
            ]
        }
        iam_client.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(assume_role_policy_document),
            Description="Role for Genesys EventBridge Bridge Lambda"
        )
        # Attach Policy to allow PutEvents to EventBridge
        policy_document = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "events:PutEvents",
                        "logs:CreateLogGroup",
                        "logs:CreateLogStream",
                        "logs:PutLogEvents"
                    ],
                    "Resource": "*"  # Restrict this in production to specific Bus ARNs
                }
            ]
        }
        iam_client.put_role_policy(
            RoleName=role_name,
            PolicyName="AllowEventBridgePut",
            PolicyDocument=json.dumps(policy_document)
        )
        # A new role needs a few seconds to propagate before Lambda can assume it
        time.sleep(10)

    role_arn = iam_client.get_role(RoleName=role_name)["Role"]["Arn"]

    # 2. Create Lambda Function
    # Note: In production, package the code properly. Here we use an inline string for brevity.
    # The string becomes index.py, so its contents start at column 0.
    # The handler expects a POST request with a JSON body.
    lambda_code = """
import json
import boto3

# Create the client once per execution environment, not on every invocation
events_client = boto3.client('events')

def lambda_handler(event, context):
    # Genesys sends a batch of events in the body
    body = json.loads(event.get('body') or '{}')
    # Accept either a list of events or a single event object
    genesys_events = body if isinstance(body, list) else [body]

    # Map Genesys events to EventBridge format
    eventbridge_events = [{
        'Source': 'genesys.cloud',
        'DetailType': g_event.get('type', 'unknown'),
        'Detail': json.dumps(g_event),
        'EventBusName': 'genesys-events-bus'  # Match your bus name
    } for g_event in genesys_events]

    # PutEvents accepts at most 10 entries per call, so send in chunks
    failed = 0
    for i in range(0, len(eventbridge_events), 10):
        response = events_client.put_events(Entries=eventbridge_events[i:i + 10])
        failed += response.get('FailedEntryCount', 0)

    return {
        'statusCode': 200,
        'body': json.dumps({'processed': len(eventbridge_events), 'errors': failed})
    }
"""
    zip_content = zip_lambda_code(lambda_code)

    try:
        lambda_client.create_function(
            FunctionName=lambda_name,
            Runtime="python3.9",
            Role=role_arn,
            Handler="index.lambda_handler",
            Code={"ZipFile": zip_content},
            Description="Bridge Genesys Events to EventBridge"
        )
        print(f"Created Lambda Function: {lambda_name}")
    except lambda_client.exceptions.ResourceConflictException:
        print(f"Lambda Function {lambda_name} already exists. Updating...")
        lambda_client.update_function_code(
            FunctionName=lambda_name,
            ZipFile=zip_content
        )

    # Return Lambda ARN
    return lambda_client.get_function(FunctionName=lambda_name)["Configuration"]["FunctionArn"]
def zip_lambda_code(code: str) -> bytes:
    """Helper to zip the lambda code into an in-memory archive containing index.py."""
    import io
    import zipfile

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("index.py", code)
    return zip_buffer.getvalue()
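The handler above reads the request body straight from the invocation event. As a hedged sketch of a slightly more defensive version, the helper below (extract_events is a hypothetical name) also handles the case where the Function URL delivers the body base64-encoded, which the event signals through its isBase64Encoded field, and normalises a single object to a list. It assumes the payload is JSON, as in the rest of this tutorial.

```python
import base64
import json
from typing import Any, Dict, List


def extract_events(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the posted events from a Lambda Function URL invocation as a list."""
    # A missing or empty body is treated as an empty batch
    raw = event.get("body") or "[]"
    # Function URL events set isBase64Encoded when the body arrives base64-encoded
    if event.get("isBase64Encoded"):
        raw = base64.b64decode(raw).decode("utf-8")
    payload = json.loads(raw)
    # Normalise a single event object to a one-element list
    return payload if isinstance(payload, list) else [payload]
```

Because the function takes a plain dict, it can be exercised locally with hand-built events before anything is deployed.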
Step 2: Configure Genesys Cloud Event Stream
Now that the AWS infrastructure is ready, we configure Genesys Cloud to push events to the Lambda Function URL (created in Step 3 and passed in here as destination_url). Genesys Cloud Event Streams support “Push” destinations. We will use the SDK to create an Event Stream with an HTTP destination.
Required Scope: eventstream:write
from typing import Optional

from genesyscloud.platform_client import PlatformClient
from genesyscloud.rest import ApiException
from genesyscloud.event_streams import EventStreamsApi
from genesyscloud.event_streams.model import (
    EventStream,
    EventStreamDestination,
    EventStreamDestinationHttp
)


def setup_genesys_event_stream(platform: PlatformClient, destination_url: str, stream_name: str) -> Optional[str]:
    """
    Creates a Genesys Cloud Event Stream that pushes to the given HTTP URL (Lambda).
    Returns the stream ID, or None if a stream with this name already exists.
    """
    event_streams_api = EventStreamsApi(platform)

    # Define the HTTP destination
    http_destination = EventStreamDestinationHttp(
        url=destination_url,
        method="POST",
        headers={
            "Content-Type": "application/json"
        },
        # Optional: Add authentication headers if Lambda requires them
        # For this example, we assume public access or IP whitelisting for simplicity.
        # In production, use API Gateway with Auth.
        auth_scheme="NONE"
    )

    # Define the destination object
    destination = EventStreamDestination(
        name="AWS Lambda Bridge",
        type="http",
        http=http_destination
    )

    # Define the Event Stream
    # We want to capture all conversation events
    event_stream = EventStream(
        name=stream_name,
        description="Real-time events to AWS EventBridge via Lambda",
        destinations=[destination],
        # Filter events: You can restrict to specific types
        # For now, we use a broad filter for demonstration
        event_types=[
            "conversation:participant:added",
            "conversation:participant:removed",
            "conversation:updated",
            "interaction:created",
            "interaction:updated"
        ]
    )

    try:
        # Create the event stream
        response = event_streams_api.post_event_streams(body=event_stream)
        print(f"Created Event Stream: {response.id}")
        return response.id
    except ApiException as e:
        if e.status == 409:
            print(f"Event Stream {stream_name} may already exist. Check existing streams.")
            return None
        raise
Step 3: Retrieve Lambda Function URL
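Genesys needs the HTTPS URL of the Lambda function. A Lambda function has no public HTTPS endpoint by default; you expose one either by putting API Gateway in front of it or by creating a Lambda Function URL. We will use a Function URL because it needs no additional infrastructure. Run this step before Step 2 so that the URL is available as the destination.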
def create_lambda_function_url(lambda_name: str, region: str = "us-east-1") -> str:
    """
    Creates a Function URL for the Lambda (if it has none) and returns the URL.
    """
    lambda_client = boto3.client("lambda", region_name=region)
    try:
        # Check if URL already exists
        url_config = lambda_client.get_function_url_config(FunctionName=lambda_name)
        print("Function URL already exists.")
    except lambda_client.exceptions.ResourceNotFoundException:
        print("Creating Function URL...")
        url_config = lambda_client.create_function_url_config(
            FunctionName=lambda_name,
            AuthType="NONE",  # Use AWS_IAM in production with SigV4
            Cors={
                "AllowOrigins": ["*"],
                "AllowMethods": ["POST"]
            }
        )
        # AuthType NONE still needs a resource policy statement that allows
        # public invocation; without it the URL answers 403 Forbidden.
        lambda_client.add_permission(
            FunctionName=lambda_name,
            StatementId="FunctionURLAllowPublicAccess",
            Action="lambda:InvokeFunctionUrl",
            Principal="*",
            FunctionUrlAuthType="NONE"
        )
    return url_config["FunctionUrl"]
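Because a Function URL with AuthType NONE is reachable by anyone who knows the address, one inexpensive safeguard is a shared secret sent as a custom header, which the HTTP destination's headers setting in Step 2 could carry. The sketch below shows how the Lambda handler might check it. The header name x-bridge-token and the function name is_authorized are assumptions made for illustration, not part of any Genesys or AWS API.

```python
import hmac
from typing import Any, Dict


def is_authorized(event: Dict[str, Any], expected_token: str, header_name: str = "x-bridge-token") -> bool:
    """Check a shared-secret header on a Lambda Function URL event."""
    headers = event.get("headers") or {}
    # Function URL events deliver header names in lowercase
    supplied = headers.get(header_name.lower(), "")
    if not expected_token:
        # Refuse everything when no secret is configured, rather than accept an empty match
        return False
    # compare_digest takes the same time wherever the first mismatch occurs
    return hmac.compare_digest(supplied.encode("utf-8"), expected_token.encode("utf-8"))
```

A handler would typically read the expected token from an environment variable and return a 401 response when this check fails.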
Complete Working Example
This script combines all steps. It assumes you have set the environment variables for Genesys and AWS.
import os
import sys
import time
import json

import boto3
from genesyscloud.platform_client import PlatformClient
from genesyscloud.auth import AuthClient
from genesyscloud.rest import ApiException
from genesyscloud.event_streams import EventStreamsApi
from genesyscloud.event_streams.model import (
    EventStream,
    EventStreamDestination,
    EventStreamDestinationHttp
)

# --- Configuration ---
GENESYS_CLIENT_ID = os.getenv("GENESYS_CLIENT_ID")
GENESYS_CLIENT_SECRET = os.getenv("GENESYS_CLIENT_SECRET")
GENESYS_REGION = os.getenv("GENESYS_REGION", "mypurecloud.com")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
BUS_NAME = "genesys-events-bus"
LAMBDA_NAME = "genesys-event-bridge-bridge"
STREAM_NAME = "Prod-EventBridge-Stream"
def get_genesys_client() -> PlatformClient:
    if not GENESYS_CLIENT_ID or not GENESYS_CLIENT_SECRET:
        raise ValueError("Genesys credentials not set.")
    platform = PlatformClient()
    platform.auth_client = AuthClient(
        client_id=GENESYS_CLIENT_ID,
        client_secret=GENESYS_CLIENT_SECRET,
        region=GENESYS_REGION
    )
    platform.auth_client.login()
    return platform
def setup_aws_infrastructure() -> str:
    """Sets up EventBridge Bus and Lambda, returns Lambda URL."""
    events_client = boto3.client("events", region_name=AWS_REGION)
    lambda_client = boto3.client("lambda", region_name=AWS_REGION)

    # 1. Create EventBridge Bus
    try:
        events_client.create_event_bus(Name=BUS_NAME)
        print(f"Created EventBridge Bus: {BUS_NAME}")
    except events_client.exceptions.ResourceAlreadyExistsException:
        print(f"EventBridge Bus {BUS_NAME} exists.")

    # 2. Create Lambda IAM Role
    iam_client = boto3.client("iam", region_name=AWS_REGION)
    role_name = f"{LAMBDA_NAME}-role"
    try:
        iam_client.get_role(RoleName=role_name)
    except iam_client.exceptions.NoSuchEntityException:
        print(f"Creating IAM Role: {role_name}")
        assume_policy = {
            "Version": "2012-10-17",
            "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]
        }
        iam_client.create_role(RoleName=role_name, AssumeRolePolicyDocument=json.dumps(assume_policy))
        policy = {
            "Version": "2012-10-17",
            "Statement": [{"Effect": "Allow", "Action": ["events:PutEvents", "logs:*"], "Resource": "*"}]
        }
        iam_client.put_role_policy(RoleName=role_name, PolicyName="EventBridgeAccess", PolicyDocument=json.dumps(policy))
        # Give the new role a few seconds to propagate before Lambda assumes it
        time.sleep(10)
    role_arn = iam_client.get_role(RoleName=role_name)["Role"]["Arn"]

    # 3. Create Lambda Function
    # The string below becomes index.py, so its contents start at column 0.
    lambda_code = f"""
import json
import boto3

events_client = boto3.client('events')

def lambda_handler(event, context):
    body = json.loads(event.get('body') or '[]')
    # Accept a single event object as well as a list
    events = body if isinstance(body, list) else [body]
    entries = [{{
        'Source': 'genesys.cloud',
        'DetailType': evt.get('type', 'unknown'),
        'Detail': json.dumps(evt),
        'EventBusName': '{BUS_NAME}'
    }} for evt in events]
    # PutEvents accepts at most 10 entries per call
    for i in range(0, len(entries), 10):
        events_client.put_events(Entries=entries[i:i + 10])
    return {{'statusCode': 200}}
"""
    import io
    import zipfile
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("index.py", lambda_code)

    try:
        lambda_client.create_function(
            FunctionName=LAMBDA_NAME,
            Runtime="python3.9",
            Role=role_arn,
            Handler="index.lambda_handler",
            Code={"ZipFile": zip_buffer.getvalue()}
        )
        print(f"Created Lambda: {LAMBDA_NAME}")
    except lambda_client.exceptions.ResourceConflictException:
        print(f"Updating Lambda: {LAMBDA_NAME}")
        lambda_client.update_function_code(FunctionName=LAMBDA_NAME, ZipFile=zip_buffer.getvalue())

    # 4. Create Function URL
    try:
        lambda_client.create_function_url_config(
            FunctionName=LAMBDA_NAME,
            AuthType="NONE"
        )
        # AuthType NONE still needs a resource policy statement allowing public invocation
        lambda_client.add_permission(
            FunctionName=LAMBDA_NAME,
            StatementId="FunctionURLAllowPublicAccess",
            Action="lambda:InvokeFunctionUrl",
            Principal="*",
            FunctionUrlAuthType="NONE"
        )
    except lambda_client.exceptions.ResourceConflictException:
        # The URL (or the permission) already exists from a previous run
        pass
    url_config = lambda_client.get_function_url_config(FunctionName=LAMBDA_NAME)
    return url_config["FunctionUrl"]
def setup_genesys_stream(lambda_url: str):
    """Creates the Genesys Event Stream."""
    platform = get_genesys_client()
    event_streams_api = EventStreamsApi(platform)
    http_dest = EventStreamDestinationHttp(
        url=lambda_url,
        method="POST",
        headers={"Content-Type": "application/json"},
        auth_scheme="NONE"
    )
    dest = EventStreamDestination(
        name="AWS Lambda",
        type="http",
        http=http_dest
    )
    stream = EventStream(
        name=STREAM_NAME,
        description="Pushes to AWS EventBridge",
        destinations=[dest],
        event_types=[
            "conversation:participant:added",
            "conversation:updated"
        ]
    )
    try:
        response = event_streams_api.post_event_streams(body=stream)
        print(f"Created Genesys Event Stream: {response.id}")
    except ApiException as e:
        if e.status == 409:
            print("Event Stream already exists.")
        else:
            raise
if __name__ == "__main__":
    try:
        print("Setting up AWS Infrastructure...")
        lambda_url = setup_aws_infrastructure()
        print(f"Lambda URL: {lambda_url}")
        print("Setting up Genesys Event Stream...")
        setup_genesys_stream(lambda_url)
        print("Integration complete. Test by starting a conversation in Genesys Cloud.")
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
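Events that land on the bus are only visible once a rule matches them and forwards them to a target. As a rough sketch of that last step, the helper below builds an EventBridge event pattern for the Source value the bridge Lambda writes (genesys.cloud in this tutorial). The function name build_event_pattern is hypothetical, and the detail types you filter on depend on what your stream actually emits.

```python
import json
from typing import List, Optional


def build_event_pattern(source: str = "genesys.cloud", detail_types: Optional[List[str]] = None) -> str:
    """Return an EventBridge event pattern, as a JSON string, for the bridge's events."""
    # "source" matches the Source field set on each PutEvents entry
    pattern = {"source": [source]}
    if detail_types:
        # "detail-type" matches the DetailType field; omit it to match every type
        pattern["detail-type"] = list(detail_types)
    return json.dumps(pattern)
```

The resulting string can be passed as the EventPattern argument of the boto3 events client's put_rule call, together with EventBusName set to the custom bus, and a target such as a CloudWatch Logs group then makes incoming events easy to inspect.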
Common Errors & Debugging
Error: 401 Unauthorized from Genesys API
- Cause: The OAuth token is expired or the client credentials are incorrect.
- Fix: Ensure GENESYS_CLIENT_ID and GENESYS_CLIENT_SECRET are correct. The SDK handles refresh, but if the initial login fails, check the scopes. Ensure eventstream:write is granted to the client.
Error: 403 Forbidden from AWS Lambda
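- Cause: A Function URL with AuthType NONE still returns 403 unless the function's resource-based policy allows lambda:InvokeFunctionUrl for public callers. A missing events:PutEvents permission on the execution role shows up differently: the handler raises an access-denied error and the caller receives a 502.
- Fix: Add the lambda:InvokeFunctionUrl permission to the function's resource-based policy, and verify the IAM Role attached to the Lambda has the events:PutEvents action. Check CloudWatch Logs for the Lambda function for detailed error traces.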
Error: Event Stream 429 Rate Limit
- Cause: Genesys Cloud rate-limits API requests, including the calls that create or modify Event Streams.
- Fix: Back off and retry. If the response carries a Retry-After header, wait at least that long. Ensure you are not creating multiple streams with the same name in a loop.
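The "wait and retry" advice can be automated with exponential backoff. The following is a generic sketch rather than anything provided by the Genesys SDK: retry_with_backoff is a hypothetical helper, and which exception types count as retryable is left to the caller (with a real client you would likely retry only when the error's status is 429).

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    retryable: Tuple[Type[BaseException], ...] = (Exception,),
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func until it succeeds, sleeping with exponential backoff between failures."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retryable:
            if attempt == max_attempts:
                # Out of attempts: let the last error propagate to the caller
                raise
            # Delay doubles each attempt (1s, 2s, 4s, ...) up to max_delay
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Add up to 50% jitter so parallel clients do not retry in lockstep
            sleep(delay + random.uniform(0, delay / 2))
```

The sleep parameter exists so the helper can be tested without real waiting; in normal use the default time.sleep is fine.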
Error: Lambda Timeout
- Cause: The Lambda function takes too long to process the batch of events. The default Lambda timeout is 3 seconds.
- Fix: Increase the Lambda timeout in the AWS console. Keep in mind that put_events accepts at most 10 entries per call, so split larger batches across multiple calls, and create the boto3 client once outside the handler rather than on every invocation.
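Splitting a batch to respect the 10-entry limit of put_events is a small amount of code. Here is a minimal sketch of a chunking helper; the name chunked and the constant MAX_ENTRIES_PER_CALL are illustrative.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")

# PutEvents accepts at most 10 entries in a single call
MAX_ENTRIES_PER_CALL = 10


def chunked(items: Sequence[T], size: int = MAX_ENTRIES_PER_CALL) -> Iterator[List[T]]:
    """Yield consecutive slices of `items`, each holding at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])
```

Inside the handler, each yielded chunk would be passed as the Entries argument of one put_events call.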
Error: Genesys Webhook Non-2xx Response
- Cause: The Lambda Function URL is returning an error (e.g., 500 or 502). Genesys will stop sending events if it receives consistent failures.
- Fix: Check the Lambda CloudWatch Logs and ensure the Lambda returns a 2xx status code, which is what Genesys expects. If the handler raises an unhandled exception, the Function URL returns 502, and Genesys marks the destination as failed.
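A 200 from the Lambda does not guarantee that every event reached the bus, because put_events can succeed as a call while rejecting individual entries. The sketch below pairs each request entry with its result so that rejected events can be logged. failed_entries is a hypothetical helper; it relies on the PutEvents response listing results in the same order as the request and marking failures with an ErrorCode field.

```python
from typing import Any, Dict, List, Tuple


def failed_entries(
    entries: List[Dict[str, Any]], response: Dict[str, Any]
) -> List[Tuple[Dict[str, Any], str]]:
    """Return (entry, error_code) pairs for entries that PutEvents rejected."""
    failures = []
    # Results come back in request order, so zip lines each entry up with its outcome
    for entry, result in zip(entries, response.get("Entries", [])):
        # Successful results carry an EventId; failed ones carry ErrorCode and ErrorMessage
        if "ErrorCode" in result:
            failures.append((entry, result["ErrorCode"]))
    return failures
```

Printing the returned pairs inside the handler sends them to CloudWatch Logs, which is usually the first place to look when events go missing.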