Deduplicating Genesys Cloud EventBridge Integration Events with Python and SQS

What You Will Build

  • A Python service that consumes events from an Amazon SQS queue fed by the Genesys Cloud EventBridge integration.
  • Logic to identify and discard duplicate events by tracking each Genesys Cloud event id within a configurable time window, preventing downstream processing errors.
  • Implementation using the boto3 SDK for AWS and standard Python libraries for message deduplication.

Prerequisites

  • AWS Account: Access to Amazon SQS, Amazon EventBridge, and AWS Lambda (optional for deployment).
  • Genesys Cloud Account: Admin access to configure the EventBridge integration.
  • Python 3.9+: Installed with pip.
  • Dependencies:
    • boto3 (AWS SDK for Python)
    • botocore (included with boto3)
    • pydantic (optional, for robust data validation)

Authentication Setup

This tutorial assumes you have already configured the Genesys Cloud EventBridge integration in the Genesys Cloud Admin portal. The integration pushes events to an Amazon EventBridge event bus, and an EventBridge rule routes them to an SQS queue (FIFO or standard).

Your Python service requires AWS credentials to access the SQS queue. Use one of the following methods:

  1. Environment Variables: Set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION.
  2. IAM Role: If running on AWS Lambda or EC2, attach an IAM role with sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes permissions.

The consumer needs no Genesys Cloud OAuth tokens, because the integration handles authentication between Genesys Cloud and AWS server-to-server.
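
For the IAM role option, the three permissions listed above could be written as a policy document. The following is a minimal sketch: the function name build_consumer_policy and the example queue ARN are hypothetical, and Resource should be scoped to your own queue.

```python
import json
from typing import Any, Dict


def build_consumer_policy(queue_arn: str) -> Dict[str, Any]:
    """Return an IAM policy document granting the SQS actions the consumer needs."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "sqs:ReceiveMessage",
                    "sqs:DeleteMessage",
                    "sqs:GetQueueAttributes",
                ],
                # Restrict the grant to the single queue the consumer reads.
                "Resource": queue_arn,
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical ARN; replace it with the ARN of your queue.
    arn = "arn:aws:sqs:us-east-1:123456789012:genesys-events.fifo"
    print(json.dumps(build_consumer_policy(arn), indent=2))
```

The printed JSON can be pasted into an inline policy on the role attached to your Lambda function or EC2 instance.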

Implementation

Step 1: Configure AWS SQS Client

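Initialize the boto3 client for SQS. This tutorial uses a FIFO queue (its name ends in .fifo) so that messages within a message group arrive in order, which makes duplicates easier to reason about. The client-side deduplication shown later does not depend on FIFO behavior and also works with a standard queue.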

import boto3
import json
import time
from typing import List, Dict, Any, Optional
from datetime import datetime, timezone

class GenesysEventConsumer:
    def __init__(self, queue_url: str, region_name: str = "us-east-1"):
        """
        Initialize the SQS client and queue URL.
        """
        self.sqs = boto3.client('sqs', region_name=region_name)
        self.queue_url = queue_url
        # Cache for deduplication: key is event ID, value is timestamp
        self.processed_events: Dict[str, float] = {}
        # Time window for deduplication (seconds). Events with same ID within this window are duplicates.
        self.dedup_window_seconds = 300  # 5 minutes

    def _get_messages(self) -> List[Dict[str, Any]]:
        """
        Receive messages from the SQS queue.
        """
        try:
            response = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=5,
                VisibilityTimeout=30
            )
            messages = response.get('Messages', [])
            return messages
        except Exception as e:
            print(f"Error receiving messages: {e}")
            return []
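
SQS itself redelivers a message whenever it is not deleted before its visibility timeout expires, so it can help to know how many times a message has already been received. The sketch below requests the ApproximateReceiveCount system attribute along with each message. The function name receive_with_receive_count is hypothetical, and sqs_client is assumed to be a boto3 SQS client (or any object with a compatible receive_message method).

```python
from typing import Any, Dict, List, Tuple


def receive_with_receive_count(
    sqs_client: Any, queue_url: str
) -> List[Tuple[Dict[str, Any], int]]:
    """Receive messages and pair each one with its approximate receive count."""
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=5,
        VisibilityTimeout=30,
        # Ask SQS to include how many times each message has been received.
        AttributeNames=["ApproximateReceiveCount"],
    )
    results = []
    for message in response.get("Messages", []):
        # SQS returns attribute values as strings.
        count = int(message.get("Attributes", {}).get("ApproximateReceiveCount", "1"))
        results.append((message, count))
    return results
```

A count above 1 means SQS has delivered this same message before, which is a different situation from the integration sending two separate messages for one event.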

Step 2: Parse and Validate Event Structure

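Genesys Cloud events arrive wrapped in the standard EventBridge envelope, with the Genesys payload under detail. We must extract the core event data to identify duplicates. The source value and the detail field names used below (type, id, and timestamp or date) are this tutorial's assumptions; inspect a sample message from your queue and adjust them to match what your integration actually delivers.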

    def _parse_event(self, body: str) -> Optional[Dict[str, Any]]:
        """
        Parse the SQS message body to extract the Genesys Cloud event.
        Returns None if the event is malformed or not from Genesys.
        """
        try:
            # SQS message body is a JSON string
            message_data = json.loads(body)
            
            # EventBridge structure:
            # {
            #   "id": "unique-event-id",
            #   "source": "com.genesyscloud",
            #   "detail": { ... actual event payload ... }
            # }
            
            source = message_data.get('source')
            if source != 'com.genesyscloud':
                print(f"Ignoring non-Genesys event: {source}")
                return None

            detail = message_data.get('detail', {})
            event_type = detail.get('type')
            if not event_type:
                print("Missing event type in detail")
                return None

            # Extract key fields for deduplication
            # Genesys events often have an 'id' in the detail object
            event_id = detail.get('id')
            # Timestamp is often in 'timestamp' or 'date' field in detail
            timestamp_str = detail.get('timestamp') or detail.get('date')
            
            if not event_id or not timestamp_str:
                print(f"Missing ID or timestamp for event type {event_type}")
                return None

            # Convert timestamp to Unix timestamp for comparison
            try:
                # Genesys timestamps are ISO 8601
                dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                unix_timestamp = dt.timestamp()
            except ValueError:
                print(f"Invalid timestamp format: {timestamp_str}")
                return None

            return {
                'id': event_id,
                'type': event_type,
                'timestamp': unix_timestamp,
                'detail': detail,
                'original_message_id': message_data.get('id') # EventBridge ID
            }
        except json.JSONDecodeError:
            print("Failed to decode JSON body")
            return None
        except Exception as e:
            print(f"Error parsing event: {e}")
            return None
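
The timestamp conversion inside _parse_event can be tried in isolation. This is a small sketch with a hypothetical helper name, iso_to_unix. Unlike the method above, it treats a timestamp that carries no offset as UTC, which is an assumption added here; the method above leaves such values to be interpreted in the machine's local time zone.

```python
from datetime import datetime, timezone


def iso_to_unix(timestamp_str: str) -> float:
    """Convert an ISO 8601 string to Unix seconds."""
    # Older Python versions reject a trailing 'Z', so rewrite it as an explicit offset.
    dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        # Assumption: a value without an offset is already in UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.timestamp()


if __name__ == "__main__":
    print(iso_to_unix("2024-01-01T00:00:00.000Z"))  # 1704067200.0
```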

Step 3: Implement Deduplication Logic

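This is the core logic. We check whether an event with the same id was processed within the last dedup_window_seconds. The window is measured from the local time at which the earlier event was processed (time.time()), not from the event's own timestamp; the parsed timestamp is carried along in the event dictionary but does not take part in the comparison.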

    def _is_duplicate(self, event: Dict[str, Any]) -> bool:
        """
        Check if the event is a duplicate based on ID and timestamp window.
        """
        event_id = event['id']
        current_time = time.time()
        
        # Clean up old entries from the cache
        self._cleanup_cache(current_time)
        
        if event_id in self.processed_events:
            last_processed_time = self.processed_events[event_id]
            # If the event ID exists and was processed within the window, it's a duplicate
            if current_time - last_processed_time < self.dedup_window_seconds:
                return True
        
        return False

    def _cleanup_cache(self, current_time: float):
        """
        Remove entries from the processed_events cache that are older than the dedup window.
        """
        expired_keys = [
            k for k, v in self.processed_events.items()
            if current_time - v > self.dedup_window_seconds
        ]
        for key in expired_keys:
            del self.processed_events[key]

    def process_event(self, event: Dict[str, Any]):
        """
        Placeholder for your business logic.
        """
        print(f"Processing event: {event['id']} of type {event['type']}")
        # Example: Store in database, trigger workflow, etc.
        pass
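
If the id in your events identifies a longer-lived entity rather than a single event, keying the cache on id alone would also discard legitimate later updates that arrive within the window. One possible variation, sketched below under that assumption, keys the cache on the pair (id, event timestamp). The DedupCache class and its injectable clock are hypothetical and not part of the consumer above.

```python
import time
from typing import Callable, Dict, Tuple


class DedupCache:
    """Remember (event_id, event_timestamp) pairs for a limited time window."""

    def __init__(self, window_seconds: float = 300,
                 clock: Callable[[], float] = time.time):
        self.window_seconds = window_seconds
        self._clock = clock  # injectable so the window can be tested without sleeping
        self._seen: Dict[Tuple[str, float], float] = {}

    def _evict_expired(self, now: float) -> None:
        # Keep only entries recorded within the window.
        self._seen = {
            key: recorded_at
            for key, recorded_at in self._seen.items()
            if now - recorded_at <= self.window_seconds
        }

    def is_duplicate(self, event_id: str, event_timestamp: float) -> bool:
        """Return True if this exact (id, timestamp) pair was processed recently."""
        self._evict_expired(self._clock())
        return (event_id, event_timestamp) in self._seen

    def mark_processed(self, event_id: str, event_timestamp: float) -> None:
        """Record the pair after the event has been processed successfully."""
        self._seen[(event_id, event_timestamp)] = self._clock()
```

With this variation, a redelivery carrying the same id and timestamp is treated as a duplicate, while a new event for the same id with a later timestamp is processed.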

Step 4: Main Processing Loop

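Combine receiving, parsing, deduplication, processing, and deletion. A message is deleted once it has been processed successfully, identified as a duplicate, or found to be unparseable. A message whose processing raises an exception is left on the queue, so SQS delivers it again after the visibility timeout expires.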

    def run(self):
        """
        Main loop to process messages.
        """
        print("Starting Genesys Event Consumer...")
        while True:
            messages = self._get_messages()
            if not messages:
                time.sleep(1)
                continue

            for message in messages:
                receipt_handle = message['ReceiptHandle']
                body = message['Body']
                
                parsed_event = self._parse_event(body)
                if not parsed_event:
                    # Delete message even if parsing failed to avoid reprocessing garbage
                    self._delete_message(receipt_handle)
                    continue
                
                if self._is_duplicate(parsed_event):
                    print(f"Duplicate event detected: {parsed_event['id']}")
                    self._delete_message(receipt_handle)
                    continue
                
                # Process the event
                try:
                    self.process_event(parsed_event)
                    # Mark as processed in cache
                    self.processed_events[parsed_event['id']] = time.time()
                    # Delete from queue
                    self._delete_message(receipt_handle)
                except Exception as e:
                    print(f"Error processing event {parsed_event['id']}: {e}")
                    # Do not delete message; let it become visible again for retry
                    # Optionally, send to DLQ after N retries using RedrivePolicy

    def _delete_message(self, receipt_handle: str):
        """
        Delete a message from the SQS queue.
        """
        try:
            self.sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=receipt_handle
            )
        except Exception as e:
            print(f"Error deleting message: {e}")
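
The retry comment in run mentions moving repeatedly failing messages to a dead-letter queue through RedrivePolicy. A minimal sketch of setting that attribute follows. The helper names and the threshold of 5 receives are hypothetical, and sqs_client is assumed to be a boto3 SQS client. Note that the dead-letter queue of a FIFO queue must itself be a FIFO queue.

```python
import json
from typing import Any, Dict


def build_redrive_attributes(dlq_arn: str, max_receive_count: int = 5) -> Dict[str, str]:
    """Build the queue attribute that sends messages to a DLQ after repeated receives."""
    return {
        # SQS expects the redrive policy as a JSON-encoded string.
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": max_receive_count}
        )
    }


def attach_dlq(sqs_client: Any, queue_url: str, dlq_arn: str,
               max_receive_count: int = 5) -> None:
    """Apply the redrive policy to the source queue."""
    sqs_client.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes=build_redrive_attributes(dlq_arn, max_receive_count),
    )
```

This is a one-time configuration step that needs sqs:SetQueueAttributes, so it is usually run by an administrator or infrastructure tooling rather than by the consumer itself.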

Complete Working Example

import boto3
import json
import time
from typing import List, Dict, Any, Optional
from datetime import datetime, timezone

class GenesysEventConsumer:
    def __init__(self, queue_url: str, region_name: str = "us-east-1"):
        self.sqs = boto3.client('sqs', region_name=region_name)
        self.queue_url = queue_url
        self.processed_events: Dict[str, float] = {}
        self.dedup_window_seconds = 300  # 5 minutes

    def _get_messages(self) -> List[Dict[str, Any]]:
        try:
            response = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=5,
                VisibilityTimeout=30
            )
            return response.get('Messages', [])
        except Exception as e:
            print(f"Error receiving messages: {e}")
            return []

    def _parse_event(self, body: str) -> Optional[Dict[str, Any]]:
        try:
            message_data = json.loads(body)
            source = message_data.get('source')
            if source != 'com.genesyscloud':
                print(f"Ignoring non-Genesys event: {source}")
                return None

            detail = message_data.get('detail', {})
            event_type = detail.get('type')
            if not event_type:
                print("Missing event type in detail")
                return None

            event_id = detail.get('id')
            timestamp_str = detail.get('timestamp') or detail.get('date')
            
            if not event_id or not timestamp_str:
                print(f"Missing ID or timestamp for event type {event_type}")
                return None

            try:
                dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                unix_timestamp = dt.timestamp()
            except ValueError:
                print(f"Invalid timestamp format: {timestamp_str}")
                return None

            return {
                'id': event_id,
                'type': event_type,
                'timestamp': unix_timestamp,
                'detail': detail,
                'original_message_id': message_data.get('id')
            }
        except json.JSONDecodeError:
            print("Failed to decode JSON body")
            return None
        except Exception as e:
            print(f"Error parsing event: {e}")
            return None

    def _is_duplicate(self, event: Dict[str, Any]) -> bool:
        event_id = event['id']
        current_time = time.time()
        self._cleanup_cache(current_time)
        
        if event_id in self.processed_events:
            last_processed_time = self.processed_events[event_id]
            if current_time - last_processed_time < self.dedup_window_seconds:
                return True
        return False

    def _cleanup_cache(self, current_time: float):
        expired_keys = [
            k for k, v in self.processed_events.items()
            if current_time - v > self.dedup_window_seconds
        ]
        for key in expired_keys:
            del self.processed_events[key]

    def process_event(self, event: Dict[str, Any]):
        print(f"Processing event: {event['id']} of type {event['type']}")
        # Add your business logic here

    def _delete_message(self, receipt_handle: str):
        try:
            self.sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=receipt_handle
            )
        except Exception as e:
            print(f"Error deleting message: {e}")

    def run(self):
        print("Starting Genesys Event Consumer...")
        while True:
            messages = self._get_messages()
            if not messages:
                time.sleep(1)
                continue

            for message in messages:
                receipt_handle = message['ReceiptHandle']
                body = message['Body']
                
                parsed_event = self._parse_event(body)
                if not parsed_event:
                    self._delete_message(receipt_handle)
                    continue
                
                if self._is_duplicate(parsed_event):
                    print(f"Duplicate event detected: {parsed_event['id']}")
                    self._delete_message(receipt_handle)
                    continue
                
                try:
                    self.process_event(parsed_event)
                    self.processed_events[parsed_event['id']] = time.time()
                    self._delete_message(receipt_handle)
                except Exception as e:
                    print(f"Error processing event {parsed_event['id']}: {e}")

if __name__ == "__main__":
    QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/genesys-events.fifo"
    REGION = "us-east-1"
    
    consumer = GenesysEventConsumer(queue_url=QUEUE_URL, region_name=REGION)
    consumer.run()

Common Errors & Debugging

Error: Message Not Deleted (Reprocessing)

  • Cause: The ReceiptHandle is invalid or the message visibility timeout expired before deletion.
  • Fix: Ensure VisibilityTimeout in receive_message is long enough to process the message. If processing takes longer, increase the timeout or use change_message_visibility to extend it.
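
As a rough illustration of the change_message_visibility approach, the sketch below extends the timeout for one message before starting slow work. The function name, the work callback, and the 120-second value are hypothetical; sqs_client is assumed to be a boto3 SQS client, and the caller also needs the sqs:ChangeMessageVisibility permission.

```python
from typing import Any, Callable


def process_with_extended_visibility(
    sqs_client: Any,
    queue_url: str,
    receipt_handle: str,
    work: Callable[[], None],
    extended_timeout_seconds: int = 120,
) -> None:
    """Extend the message's visibility timeout, then run the slow work."""
    # The new timeout is counted from the moment of this call, not from the receive.
    sqs_client.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=extended_timeout_seconds,
    )
    work()
```

In the consumer above, a call like this would sit at the start of the try block in run, with work wrapping the call to process_event.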

Error: Duplicate Events Still Occur

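  • Cause: The dedup_window_seconds is too short, the id used as the cache key differs between deliveries of the same event, or the in-memory processed_events cache was lost on restart or is not shared between consumer instances.
  • Fix: Increase dedup_window_seconds. Verify that the id field in the detail object is unique per event instance and identical across redeliveries; a retry that reuses the same id is caught by the cache, while one that arrives with a new id is not. If you run several consumers or restart often, keep the cache in a store that outlives the process.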
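
One more thing to keep in mind: processed_events lives in process memory, so it is emptied on every restart. The sketch below shows one way the same check could be backed by a local SQLite file so that it survives restarts on a single host. The class name, table name, and schema are hypothetical, and consumers on several hosts would need a shared store instead.

```python
import sqlite3
import time
from typing import Optional


class SqliteDedupStore:
    """Persist processed event IDs in SQLite so the cache survives restarts."""

    def __init__(self, path: str = ":memory:", window_seconds: float = 300.0):
        self.conn = sqlite3.connect(path)
        self.window_seconds = window_seconds
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS processed ("
            "event_id TEXT PRIMARY KEY, processed_at REAL NOT NULL)"
        )
        self.conn.commit()

    def is_duplicate(self, event_id: str, now: Optional[float] = None) -> bool:
        """Return True if event_id was marked processed within the window."""
        now = time.time() if now is None else now
        row = self.conn.execute(
            "SELECT processed_at FROM processed WHERE event_id = ?", (event_id,)
        ).fetchone()
        return row is not None and now - row[0] < self.window_seconds

    def mark_processed(self, event_id: str, now: Optional[float] = None) -> None:
        """Record event_id and drop entries older than the window."""
        now = time.time() if now is None else now
        self.conn.execute(
            "INSERT OR REPLACE INTO processed (event_id, processed_at) VALUES (?, ?)",
            (event_id, now),
        )
        self.conn.execute(
            "DELETE FROM processed WHERE ? - processed_at > ?",
            (now, self.window_seconds),
        )
        self.conn.commit()
```

Passing a file path instead of the default ":memory:" is what makes the entries persistent; the in-memory default is only convenient for trying the class out.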

Error: JSONDecodeError

  • Cause: The SQS message body is not valid JSON.
  • Fix: Check the SQS queue directly in the AWS Console to inspect the raw message body. Ensure the EventBridge integration is correctly formatting the payload.

Error: Permission Denied (AccessDenied)

  • Cause: The AWS IAM role or user lacks sqs:ReceiveMessage, sqs:DeleteMessage, or sqs:GetQueueAttributes permissions.
  • Fix: Update the IAM policy to include the necessary SQS actions for the specific queue.
