Synchronizing Genesys Cloud Analytics Data to Snowflake with Python
What You Will Build
- This script extracts aggregated conversation metrics from Genesys Cloud, converts the payload to columnar Parquet format, uploads it to Amazon S3 with date-based partitioning, loads it into Snowflake, and verifies row parity between source and destination.
- The implementation uses the Genesys Cloud CX Analytics API, the genesys-cloud-py-sdk, PyArrow, boto3, and the Snowflake Python connector.
- The tutorial covers Python 3.9+ with type hints, production-grade error handling, and explicit pagination logic.
Prerequisites
- OAuth Client Type: Private Key or Client Credentials flow. Required scope: analytics:conversations:read.
- SDK Version: genesys-cloud-py-sdk>=3.0.0
- Runtime: Python 3.9 or higher
- External Dependencies: pip install httpx pyarrow boto3 snowflake-connector-python genesys-cloud-py-sdk
- AWS IAM credentials with s3:PutObject permissions on the target bucket
- Snowflake database, schema, warehouse, and a target table matching the Parquet schema
- Environment variables for credentials and configuration
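Because every credential reaches the script through environment variables, it can help to check them all before any network call is made. The following is a minimal sketch; `require_env` is a hypothetical helper (not part of any SDK), and the variable names in the example are placeholders.

```python
import os
from typing import Dict, Iterable, Mapping


def require_env(names: Iterable[str], environ: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the requested variables, or raise an error listing every missing one."""
    names = list(names)
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in names}


# Example with an explicit mapping instead of the real process environment.
sample_env = {"GENESYS_CLIENT_ID": "abc", "S3_BUCKET": "my-bucket"}
settings = require_env(["GENESYS_CLIENT_ID", "S3_BUCKET"], sample_env)
```

Reporting all missing names in a single error avoids the fix-one-rerun-fix-another cycle when a new environment is being set up.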
Authentication Setup
The Genesys Cloud Python SDK handles token acquisition and automatic refresh when you initialize the OAuth client. You must configure the regional endpoint and attach the private key before making API calls. The SDK caches the access token in memory and requests a new token when the current one expires.
import os
from purecloud_platform_client import PureCloudPlatformClientV2, Configuration


def initialize_genesys_client() -> PureCloudPlatformClientV2:
    """Create and authenticate a Genesys Cloud platform client."""
    config = Configuration()
    config.region = os.getenv("GENESYS_REGION", "us-east-1.genesyscloud.com")
    client = PureCloudPlatformClientV2(config)
    oauth_client = client.oauth_client
    # Login using private key authentication
    oauth_client.login(
        private_key=os.getenv("GENESYS_PRIVATE_KEY"),
        client_id=os.getenv("GENESYS_CLIENT_ID"),
        client_secret=os.getenv("GENESYS_CLIENT_SECRET")
    )
    return client
The oauth_client.login() call performs a POST to /oauth/token. If the private key or client credentials are invalid, the SDK raises a purecloud_platform_client.rest.ApiException with a 401 status code. A missing analytics:conversations:read scope is not detected at login; it surfaces as a 403 on the first analytics request. Catch the exception during initialization to fail fast before consuming compute resources.
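The in-memory token caching and refresh described in this section can be pictured with a small stand-alone class. This is an illustrative sketch, not the SDK's internals: `TokenCache`, the `fetch_token` callable, and the 60-second safety margin are all assumptions.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Cache an access token and fetch a new one shortly before it expires."""

    def __init__(self, fetch_token: Callable[[], Tuple[str, float]],
                 margin_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch_token = fetch_token  # returns (token, lifetime_in_seconds)
        self._margin = margin_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return the cached token, refreshing it when it is close to expiry."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            token, lifetime = self._fetch_token()
            self._token = token
            self._expires_at = now + lifetime
        return self._token
```

Injecting the clock keeps the expiry logic testable without sleeping, and refreshing slightly early avoids sending a token that expires while the request is in flight.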
Implementation
Step 1: Query the Analytics API for Aggregated Metrics
The Analytics Summary Query endpoint returns time-bucketed metrics. You must define the timeGrouping, interval, groupBys, and requested metrics. The endpoint supports pagination via a nextUri field in the response. You must iterate until nextUri is null.
The raw HTTP cycle for this operation appears as follows:
POST /api/v2/analytics/conversations/summary/query HTTP/1.1
Host: {domain}.genesyscloud.com
Authorization: Bearer {access_token}
Content-Type: application/json

{
  "timeGrouping": "hour",
  "interval": "2024-11-01T00:00:00Z/2024-11-01T23:59:59Z",
  "groupBys": ["queue"],
  "metrics": ["handleTime", "waitTime", "abandonCount"],
  "filter": {
    "type": "queue",
    "id": "9a8b7c6d-1234-5678-90ab-cdef12345678"
  }
}

HTTP/1.1 200 OK
Content-Type: application/json

{
  "data": [
    {
      "interval": "2024-11-01T00:00:00Z/2024-11-01T01:00:00Z",
      "groupBys": {"queue": {"id": "9a8b7c6d-1234-5678-90ab-cdef12345678", "name": "Support"}},
      "metrics": {
        "handleTime": {"value": 125430, "unit": "ms"},
        "waitTime": {"value": 45200, "unit": "ms"},
        "abandonCount": {"value": 12, "unit": "count"}
      }
    }
  ],
  "nextUri": "/api/v2/analytics/conversations/summary/query?nextUri=eyJwYWdlIjoyfQ==",
  "pageSize": 1000
}
The SDK wrapper handles serialization and deserialization. You must implement pagination and exponential backoff for 429 rate-limit responses.
import time
import httpx
from purecloud_platform_client import PureCloudPlatformClientV2
from purecloud_platform_client.rest import ApiException
from typing import Any, Dict, List, Optional


def fetch_analytics_data(client: PureCloudPlatformClientV2, query_body: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Paginate through Genesys Analytics Summary Query with 429 retry logic."""
    all_rows: List[Dict[str, Any]] = []
    next_uri: Optional[str] = None
    retries: int = 0
    max_retries: int = 5
    while True:
        try:
            if next_uri:
                # SDK does not expose nextUri pagination directly for this endpoint,
                # so we fall back to httpx for pagination control.
                base_url = f"https://{client._configuration.region}"
                response = httpx.post(
                    f"{base_url}{next_uri}",
                    headers={"Authorization": f"Bearer {client.oauth_client.access_token}"},
                    timeout=30.0
                )
                response.raise_for_status()
                payload = response.json()
            else:
                # Initial call uses the SDK
                payload = client.analytics.get_analytics_conversations_summary_query(body=query_body).to_dict()
            next_uri = payload.get("nextUri")
            all_rows.extend(payload.get("data", []))
            retries = 0  # Reset retry counter on success
            if not next_uri:
                break
        except httpx.HTTPStatusError as e:
            # Retry only rate-limit responses; next_uri is unchanged, so the
            # same page is requested again after the wait.
            if e.response.status_code == 429 and retries < max_retries:
                wait_time = 2 ** retries
                print(f"Rate limited (429). Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                retries += 1
                continue
            raise
        except ApiException as e:
            print(f"Genesys API error: {e.status_code} - {e.body}")
            raise
    return all_rows
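The nextUri loop can also be separated from the transport so that it is testable without network access. The sketch below assumes a hypothetical `fetch_page` callable that takes a URI (or None for the first request) and returns a payload shaped like the response shown earlier; neither it nor the `max_pages` guard is part of the SDK.

```python
from typing import Any, Callable, Dict, Iterator, Optional

Page = Dict[str, Any]


def iterate_rows(fetch_page: Callable[[Optional[str]], Page],
                 max_pages: int = 1000) -> Iterator[Dict[str, Any]]:
    """Yield every row from a nextUri-paginated endpoint."""
    next_uri: Optional[str] = None
    for _ in range(max_pages):
        page = fetch_page(next_uri)
        yield from page.get("data", [])
        next_uri = page.get("nextUri")
        if not next_uri:
            return
    # Guard against a server that keeps returning a nextUri forever.
    raise RuntimeError(f"Pagination did not finish within {max_pages} pages")


# Two fake pages standing in for real API responses.
fake_pages = {
    None: {"data": [{"id": 1}, {"id": 2}], "nextUri": "/page/2"},
    "/page/2": {"data": [{"id": 3}], "nextUri": None},
}
rows = list(iterate_rows(lambda uri: fake_pages[uri]))
```

With this split, retry and authentication concerns live inside `fetch_page`, and the loop itself only decides when to stop.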
Step 2: Transform JSON Responses to Parquet Format
Genesys returns nested dictionaries for groupBys and metrics. PyArrow can infer nested struct columns from them, but a relational target table needs one scalar column per field. Flatten the payload before conversion so that each Parquet column maps directly to a table column and schema inference stays predictable during bulk loads.
import pyarrow as pa
import pyarrow.parquet as pq
from io import BytesIO
from typing import List, Dict, Any


def flatten_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten nested Genesys analytics payload into a flat dictionary."""
    flat = {}
    flat["interval_start"] = row.get("interval", "").split("/")[0]
    # Extract queue identifier
    queue_info = row.get("groupBys", {}).get("queue", {})
    flat["queue_id"] = queue_info.get("id")
    flat["queue_name"] = queue_info.get("name")
    # Extract metrics
    metrics = row.get("metrics", {})
    for metric_name, metric_value in metrics.items():
        flat[f"metric_{metric_name}_value"] = metric_value.get("value")
        flat[f"metric_{metric_name}_unit"] = metric_value.get("unit")
    return flat


def convert_to_parquet(rows: List[Dict[str, Any]]) -> bytes:
    """Convert flattened rows to Parquet bytes using PyArrow."""
    if not rows:
        raise ValueError("No data rows to convert to Parquet.")
    flat_rows = [flatten_row(r) for r in rows]
    table = pa.Table.from_pylist(flat_rows)
    buffer = BytesIO()
    pq.write_table(table, buffer, compression="snappy")
    return buffer.getvalue()
The snappy compression balances CPU usage and storage efficiency. Snowflake natively decompresses Snappy-encoded Parquet files during COPY INTO operations without additional configuration.
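To see which columns the flattening produces, the sketch below applies the same logic to the sample row from Step 1. It repeats `flatten_row` in condensed form so that it runs on its own with only the standard library; the sample values are copied from the response shown earlier.

```python
from typing import Any, Dict


def flatten_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Same flattening logic as in this step, repeated so the example runs alone."""
    queue_info = row.get("groupBys", {}).get("queue", {})
    flat: Dict[str, Any] = {
        "interval_start": row.get("interval", "").split("/")[0],
        "queue_id": queue_info.get("id"),
        "queue_name": queue_info.get("name"),
    }
    for metric_name, metric_value in row.get("metrics", {}).items():
        flat[f"metric_{metric_name}_value"] = metric_value.get("value")
        flat[f"metric_{metric_name}_unit"] = metric_value.get("unit")
    return flat


sample_row = {
    "interval": "2024-11-01T00:00:00Z/2024-11-01T01:00:00Z",
    "groupBys": {"queue": {"id": "9a8b7c6d-1234-5678-90ab-cdef12345678", "name": "Support"}},
    "metrics": {
        "handleTime": {"value": 125430, "unit": "ms"},
        "waitTime": {"value": 45200, "unit": "ms"},
        "abandonCount": {"value": 12, "unit": "count"},
    },
}

flat = flatten_row(sample_row)
columns = list(flat)  # column names in the order they will appear in the table
```

Each source row becomes nine scalar columns: three identifying columns plus a value/unit pair per requested metric. The Snowflake target table needs a column for each of these names.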
Step 3: Stage Files in an S3 Bucket with Partition Keys
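Date-based key prefixes keep each day's files under a predictable path, so a COPY INTO statement (or an external table partition) can target a single day without scanning the whole bucket. Construct the S3 key from the year, month, and day of the analytics interval start.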
import boto3
from botocore.client import BaseClient
from typing import Tuple


def upload_to_s3(parquet_bytes: bytes, s3_client: BaseClient, bucket: str,
                 interval_start: str, date_partition: Tuple[str, str, str]) -> str:
    """Upload Parquet bytes to S3 with year/month/day partitioning."""
    year, month, day = date_partition
    # Colons are replaced so the timestamp is safe to use as a file name.
    key = f"analytics/conversations/year={year}/month={month}/day={day}/{interval_start.replace(':', '-')}.parquet"
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=parquet_bytes,
        ContentType="application/octet-stream",
        ServerSideEncryption="AES256"
    )
    return key
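The function returns the S3 object key for downstream logging and Snowflake staging references. The IAM identity that runs the script needs s3:PutObject on the target prefix. s3:PutObjectAcl is only required when an ACL is set, which this call does not do. The credentials Snowflake uses to read the file need s3:GetObject and s3:ListBucket.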
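The partition tuple passed to `upload_to_s3` can be derived from the interval start with the standard library rather than string splitting. This is a sketch under the assumption that interval timestamps are ISO 8601 with a `Z` or numeric offset; `partition_from_interval` and `build_key` are hypothetical helper names.

```python
from datetime import datetime, timezone
from typing import Tuple


def partition_from_interval(interval_start: str) -> Tuple[str, str, str]:
    """Return zero-padded (year, month, day) in UTC for an ISO 8601 timestamp."""
    # datetime.fromisoformat on Python 3.9 does not accept a trailing "Z".
    parsed = datetime.fromisoformat(interval_start.replace("Z", "+00:00"))
    utc = parsed.astimezone(timezone.utc)
    return f"{utc.year:04d}", f"{utc.month:02d}", f"{utc.day:02d}"


def build_key(interval_start: str, prefix: str = "analytics/conversations") -> str:
    """Build the partitioned object key in the layout used in this step."""
    year, month, day = partition_from_interval(interval_start)
    file_name = interval_start.replace(":", "-")
    return f"{prefix}/year={year}/month={month}/day={day}/{file_name}.parquet"


key = build_key("2024-11-01T00:00:00Z")
```

Normalizing to UTC before extracting the date keeps a timestamp with a non-zero offset from landing in the wrong day's partition.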
Step 4: Trigger a Snowflake COPY Command
The Snowflake Python connector executes SQL directly against the warehouse. You must reference the S3 stage location, specify the Parquet file format, and provide AWS credentials inline or via an external stage definition.
import os
import snowflake.connector


def load_to_snowflake(conn: snowflake.connector.SnowflakeConnection, s3_key: str,
                      target_table: str, aws_key_id: str, aws_secret: str) -> None:
    """Execute COPY INTO from S3 to Snowflake target table."""
    cursor = conn.cursor()
    # MATCH_BY_COLUMN_NAME maps Parquet columns onto table columns by name;
    # without it, Snowflake expects a single VARIANT target column.
    copy_sql = f"""
    COPY INTO {target_table}
    FROM 's3://{os.getenv('S3_BUCKET')}/{s3_key}'
    CREDENTIALS = (
        AWS_KEY_ID = '{aws_key_id}'
        AWS_SECRET_KEY = '{aws_secret}'
    )
    FILE_FORMAT = (TYPE = 'PARQUET' COMPRESSION = 'SNAPPY')
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    ON_ERROR = 'CONTINUE';
    """
    try:
        cursor.execute(copy_sql)
        # COPY INTO returns one status row per file it processed.
        for row in cursor.fetchall():
            print(f"COPY result: {row}")
    except snowflake.connector.errors.ProgrammingError as e:
        print(f"Snowflake COPY failed: {e.msg}")
        raise
    finally:
        cursor.close()
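The ON_ERROR = 'CONTINUE' option tells Snowflake to skip rows that fail to load instead of aborting the file, so a single bad record does not halt the entire load. Skipped rows do not raise an exception. Inspect the COPY INTO result set, the COPY_HISTORY table function, or the VALIDATE table function for row-level rejection details.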
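The external stage alternative mentioned at the start of this step keeps AWS secrets out of the SQL text. The sketch below only builds the statement string. It assumes a named external stage already exists and points at the bucket root; the stage and table names are hypothetical, and `build_copy_from_stage` is an illustrative helper that also rejects identifiers that are unsafe to interpolate.

```python
import re

# Up to three dot-separated, unquoted identifiers (database.schema.object).
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*){0,2}")


def build_copy_from_stage(target_table: str, stage_name: str, s3_key: str) -> str:
    """Build a COPY INTO statement that loads one file from a named stage."""
    for label, value in (("table", target_table), ("stage", stage_name)):
        if not _IDENTIFIER.fullmatch(value):
            raise ValueError(f"Unsafe {label} identifier: {value!r}")
    if "'" in s3_key or s3_key.endswith("/"):
        raise ValueError(f"Object key must name a single file: {s3_key!r}")
    return (
        f"COPY INTO {target_table} "
        f"FROM @{stage_name} "
        f"FILES = ('{s3_key}') "
        "FILE_FORMAT = (TYPE = 'PARQUET') "
        "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE "
        "ON_ERROR = 'CONTINUE'"
    )


sql = build_copy_from_stage(
    "ANALYTICS.PUBLIC.CONVERSATION_METRICS",  # hypothetical table
    "ANALYTICS.PUBLIC.GENESYS_STAGE",         # hypothetical stage
    "analytics/conversations/year=2024/month=11/day=01/2024-11-01T00-00-00Z.parquet",
)
```

The resulting string can be passed to `cursor.execute` in place of the inline-credentials statement. Because the stage holds the access configuration, rotating keys does not require touching the script.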
Step 5: Validate Row Counts for Data Completeness
Data pipelines fail silently when pagination misses rows or network drops truncate payloads. You must compare the source row count against the destination count immediately after loading.
import snowflake.connector


def validate_row_counts(conn: snowflake.connector.SnowflakeConnection, target_table: str,
                        expected_count: int) -> bool:
    """Verify Snowflake table row count matches Genesys source count."""
    cursor = conn.cursor()
    try:
        cursor.execute(f"SELECT COUNT(*) FROM {target_table};")
        result = cursor.fetchone()
        actual_count = result[0] if result else 0
        if actual_count != expected_count:
            print(f"Row count mismatch: Expected {expected_count}, Found {actual_count}")
            return False
        print(f"Validation passed: {actual_count} rows synchronized.")
        return True
    finally:
        cursor.close()
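This check compares the number of rows fetched from Genesys with the number of rows in the target table, so it catches rows that Snowflake skipped under ON_ERROR = 'CONTINUE' and files that were never loaded. It assumes the target table holds only the rows from this run. For a table that accumulates data across runs, restrict the count to the loaded interval.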
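A per-run check that does not depend on the table's prior contents can be built from the rows that `cursor.fetchall()` returns after COPY INTO. The sketch below assumes each result row begins with the file name, status, rows parsed, and rows loaded, in that order; verify the column order against your Snowflake version before relying on it. `CopySummary` and `summarize_copy_result` are hypothetical names.

```python
from typing import Iterable, NamedTuple, Sequence


class CopySummary(NamedTuple):
    files: int
    rows_parsed: int
    rows_loaded: int

    @property
    def rows_rejected(self) -> int:
        return self.rows_parsed - self.rows_loaded


def summarize_copy_result(rows: Iterable[Sequence]) -> CopySummary:
    """Total the per-file counts returned by a COPY INTO statement.

    Assumes each row starts with (file, status, rows_parsed, rows_loaded, ...).
    """
    files = parsed = loaded = 0
    for row in rows:
        if len(row) < 4:
            continue  # e.g. a single-column "0 files processed" message
        files += 1
        parsed += int(row[2])
        loaded += int(row[3])
    return CopySummary(files, parsed, loaded)


# Fabricated result row for illustration only.
sample = [("s3://bucket/a.parquet", "PARTIALLY_LOADED", 24, 22, 24, 2)]
summary = summarize_copy_result(sample)
```

Comparing `summary.rows_loaded` with `len(rows)` from the fetch step flags skipped rows even when the target table already contains data from earlier runs.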
Complete Working Example
The following script combines all components into a single executable module. Set the environment variables it references before execution.
import os
import time
import httpx
import boto3
import pyarrow as pa
import pyarrow.parquet as pq
import snowflake.connector
from botocore.client import BaseClient
from io import BytesIO
from typing import List, Dict, Any, Optional, Tuple
from purecloud_platform_client import PureCloudPlatformClientV2, Configuration
from purecloud_platform_client.rest import ApiException


def initialize_genesys_client() -> PureCloudPlatformClientV2:
    config = Configuration()
    config.region = os.getenv("GENESYS_REGION", "us-east-1.genesyscloud.com")
    client = PureCloudPlatformClientV2(config)
    client.oauth_client.login(
        private_key=os.getenv("GENESYS_PRIVATE_KEY"),
        client_id=os.getenv("GENESYS_CLIENT_ID"),
        client_secret=os.getenv("GENESYS_CLIENT_SECRET")
    )
    return client


def fetch_analytics_data(client: PureCloudPlatformClientV2, query_body: Dict[str, Any]) -> List[Dict[str, Any]]:
    all_rows: List[Dict[str, Any]] = []
    next_uri: Optional[str] = None
    retries: int = 0
    max_retries: int = 5
    while True:
        try:
            if next_uri:
                base_url = f"https://{client._configuration.region}"
                response = httpx.post(
                    f"{base_url}{next_uri}",
                    headers={"Authorization": f"Bearer {client.oauth_client.access_token}"},
                    timeout=30.0
                )
                response.raise_for_status()
                payload = response.json()
            else:
                payload = client.analytics.get_analytics_conversations_summary_query(body=query_body).to_dict()
            next_uri = payload.get("nextUri")
            all_rows.extend(payload.get("data", []))
            retries = 0
            if not next_uri:
                break
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429 and retries < max_retries:
                time.sleep(2 ** retries)
                retries += 1
                continue
            raise
        except ApiException as e:
            print(f"Genesys API error: {e.status_code} - {e.body}")
            raise
    return all_rows


def flatten_row(row: Dict[str, Any]) -> Dict[str, Any]:
    flat = {}
    flat["interval_start"] = row.get("interval", "").split("/")[0]
    queue_info = row.get("groupBys", {}).get("queue", {})
    flat["queue_id"] = queue_info.get("id")
    flat["queue_name"] = queue_info.get("name")
    metrics = row.get("metrics", {})
    for metric_name, metric_value in metrics.items():
        flat[f"metric_{metric_name}_value"] = metric_value.get("value")
        flat[f"metric_{metric_name}_unit"] = metric_value.get("unit")
    return flat


def convert_to_parquet(rows: List[Dict[str, Any]]) -> bytes:
    if not rows:
        raise ValueError("No data rows to convert to Parquet.")
    flat_rows = [flatten_row(r) for r in rows]
    table = pa.Table.from_pylist(flat_rows)
    buffer = BytesIO()
    pq.write_table(table, buffer, compression="snappy")
    return buffer.getvalue()


def upload_to_s3(parquet_bytes: bytes, s3_client: BaseClient, bucket: str,
                 interval_start: str, date_partition: Tuple[str, str, str]) -> str:
    year, month, day = date_partition
    key = f"analytics/conversations/year={year}/month={month}/day={day}/{interval_start.replace(':', '-')}.parquet"
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=parquet_bytes,
        ContentType="application/octet-stream",
        ServerSideEncryption="AES256"
    )
    return key


def load_to_snowflake(conn, s3_key: str, target_table: str, aws_key_id: str, aws_secret: str) -> None:
    cursor = conn.cursor()
    # MATCH_BY_COLUMN_NAME maps Parquet columns onto table columns by name.
    copy_sql = f"""
    COPY INTO {target_table}
    FROM 's3://{os.getenv('S3_BUCKET')}/{s3_key}'
    CREDENTIALS = (AWS_KEY_ID = '{aws_key_id}' AWS_SECRET_KEY = '{aws_secret}')
    FILE_FORMAT = (TYPE = 'PARQUET' COMPRESSION = 'SNAPPY')
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    ON_ERROR = 'CONTINUE';
    """
    try:
        cursor.execute(copy_sql)
    except Exception as e:
        print(f"Snowflake COPY failed: {e}")
        raise
    finally:
        cursor.close()


def validate_row_counts(conn, target_table: str, expected_count: int) -> bool:
    cursor = conn.cursor()
    try:
        cursor.execute(f"SELECT COUNT(*) FROM {target_table};")
        result = cursor.fetchone()
        actual_count = result[0] if result else 0
        if actual_count != expected_count:
            print(f"Row count mismatch: Expected {expected_count}, Found {actual_count}")
            return False
        return True
    finally:
        cursor.close()


def main():
    # Configuration
    query_body = {
        "timeGrouping": "hour",
        "interval": "2024-11-01T00:00:00Z/2024-11-01T23:59:59Z",
        "groupBys": ["queue"],
        "metrics": ["handleTime", "waitTime", "abandonCount"],
        "filter": {"type": "queue", "id": os.getenv("GENESYS_QUEUE_ID")}
    }
    # Initialize clients
    gc_client = initialize_genesys_client()
    s3_client = boto3.client('s3')
    sf_conn = snowflake.connector.connect(
        user=os.getenv("SNOWFLAKE_USER"),
        password=os.getenv("SNOWFLAKE_PASSWORD"),
        account=os.getenv("SNOWFLAKE_ACCOUNT"),
        warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
        database=os.getenv("SNOWFLAKE_DATABASE"),
        schema=os.getenv("SNOWFLAKE_SCHEMA")
    )
    try:
        # Step 1: Fetch
        rows = fetch_analytics_data(gc_client, query_body)
        if not rows:
            print("No data returned from Genesys Analytics.")
            return
        # Step 2: Transform
        parquet_bytes = convert_to_parquet(rows)
        interval_start = rows[0].get("interval", "").split("/")[0]
        date_part = tuple(interval_start.split("T")[0].split("-"))
        # Step 3: Stage
        s3_key = upload_to_s3(
            parquet_bytes, s3_client, os.getenv("S3_BUCKET"),
            interval_start, date_part
        )
        # Step 4: Load
        load_to_snowflake(
            sf_conn, s3_key, os.getenv("SNOWFLAKE_TABLE"),
            os.getenv("AWS_ACCESS_KEY_ID"), os.getenv("AWS_SECRET_ACCESS_KEY")
        )
        # Step 5: Validate; exit non-zero so schedulers notice a mismatch
        if not validate_row_counts(sf_conn, os.getenv("SNOWFLAKE_TABLE"), len(rows)):
            raise SystemExit(1)
    finally:
        # Close the connection even when a step fails
        sf_conn.close()


if __name__ == "__main__":
    main()
Common Errors & Debugging
Error: 401 Unauthorized or 403 Forbidden
- What causes it: The OAuth client lacks the analytics:conversations:read scope, or the private key does not match the registered client ID. Genesys also returns 403 when the requested queue ID does not exist in the tenant.
- How to fix it: Verify the client credentials in the Genesys Cloud Admin Console under Platform > OAuth. Ensure the scope list includes analytics:conversations:read. Validate the queue ID against /api/v2/routing/queues.
- Code showing the fix:
try:
    payload = client.analytics.get_analytics_conversations_summary_query(body=query_body).to_dict()
except ApiException as e:
    if e.status_code == 403:
        print("Access denied. Verify OAuth scopes and queue ID existence.")
    raise
Error: 429 Too Many Requests
- What causes it: Genesys Cloud enforces per-tenant and per-endpoint rate limits. Analytics queries are computationally expensive and trigger stricter throttling than CRUD operations.
- How to fix it: Implement exponential backoff with jitter. The script above uses time.sleep(2 ** retries). Add a random jitter component for distributed environments.
- Code showing the fix:
import random
wait_time = (2 ** retries) + random.uniform(0, 1)
time.sleep(wait_time)
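A slightly fuller sketch of the same idea adds a cap so that later retries do not sleep for minutes. The function name, the 30-second cap, and the injectable `rng` parameter are assumptions made for illustration.

```python
import random
from typing import Callable


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  rng: Callable[[float, float], float] = random.uniform) -> float:
    """Return the delay in seconds before retry number `attempt` (0-based)."""
    exponential = min(cap, base * (2 ** attempt))
    # Additive jitter of up to one second, as in the snippet above.
    return exponential + rng(0.0, 1.0)


# A fixed "random" value makes the schedule easy to inspect.
delays = [backoff_delay(n, rng=lambda low, high: 0.5) for n in range(7)]
```

With the fixed jitter of 0.5 the schedule is 1.5, 2.5, 4.5, 8.5, 16.5, 30.5, 30.5 seconds. In production the default `random.uniform` spreads concurrent workers so they do not retry in lockstep.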
Error: PyArrow Schema Inference Failure
- What causes it: Mixed data types across pagination boundaries. A metric value may be null in one interval and an integer in another, causing PyArrow to fail type coercion.
- How to fix it: Cast columns to explicit types before writing, or use safe=True during conversion. Define a fixed PyArrow schema and cast the table explicitly.
- Code showing the fix:
schema = pa.schema([
    ("interval_start", pa.string()),
    ("queue_id", pa.string()),
    ("queue_name", pa.string()),
    ("metric_handleTime_value", pa.float64()),
    ("metric_handleTime_unit", pa.string()),
    ("metric_waitTime_value", pa.float64()),
    ("metric_waitTime_unit", pa.string()),
    ("metric_abandonCount_value", pa.int64()),
    ("metric_abandonCount_unit", pa.string())
])
table = pa.Table.from_pylist(flat_rows, schema=schema)
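A metric that is absent from some rows leaves the flattened dictionaries with different key sets. One way to guard against that before conversion is to project every row onto the expected column list. This is a standard-library sketch; `normalize_rows` is a hypothetical helper, and the column names are the ones produced by `flatten_row`.

```python
from typing import Any, Dict, Iterable, List, Sequence


def normalize_rows(rows: Iterable[Dict[str, Any]], columns: Sequence[str]) -> List[Dict[str, Any]]:
    """Give every row the same keys, filling gaps with None and dropping extras."""
    return [{column: row.get(column) for column in columns} for row in rows]


expected_columns = ["interval_start", "queue_id", "metric_handleTime_value"]
ragged = [
    {"interval_start": "2024-11-01T00:00:00Z", "queue_id": "q1", "metric_handleTime_value": 125430},
    {"interval_start": "2024-11-01T01:00:00Z", "queue_id": "q1"},  # metric missing
    {"interval_start": "2024-11-01T02:00:00Z", "queue_id": "q1", "unexpected": 1},
]
uniform = normalize_rows(ragged, expected_columns)
```

Every dictionary in `uniform` has exactly the expected keys, so the explicit schema and the row data always agree on column names.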
Error: Snowflake COPY INTO File Not Found
- What causes it: The S3 object key contains trailing slashes, or the credentials Snowflake uses lack read permissions on the bucket path. Snowflake also fails to read files when the credentials referenced by the stage or the COPY statement have been rotated and not updated.
- How to fix it: Verify the exact S3 key matches the FROM clause. After rotating AWS keys, update the external stage with ALTER STAGE <name> SET CREDENTIALS = (...) or update the inline credentials. Confirm the bucket policy allows the Snowflake AWS service principal to access the object.
- Code showing the fix:
# Verify key format before COPY
assert not s3_key.endswith('/'), "S3 key must point to a specific file, not a prefix."