From 3cdaef24716e5f676d391ae4567aaf6ebf219b3f Mon Sep 17 00:00:00 2001 From: Damian Barrous-Dume Date: Thu, 9 Feb 2023 13:42:53 -0500 Subject: [PATCH 1/6] Add Slack Notification feature --- lambda_function/requirements.txt | 3 +- .../src/file_processor/file_processor.py | 77 +++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/lambda_function/requirements.txt b/lambda_function/requirements.txt index 81110eb..0d3aa40 100755 --- a/lambda_function/requirements.txt +++ b/lambda_function/requirements.txt @@ -6,4 +6,5 @@ pytest pytest-astropy pytest-cov flake8 -black \ No newline at end of file +black +slack_sdk \ No newline at end of file diff --git a/lambda_function/src/file_processor/file_processor.py b/lambda_function/src/file_processor/file_processor.py index 12b0d95..6fd0340 100755 --- a/lambda_function/src/file_processor/file_processor.py +++ b/lambda_function/src/file_processor/file_processor.py @@ -12,6 +12,8 @@ import os import os.path from pathlib import Path +from slack_sdk import WebClient +from slack_sdk.errors import SlackApiError # Initialize constants to be parsed from config.yaml MISSION_NAME = "" @@ -114,6 +116,31 @@ def __init__( if self.dry_run: log.warning("Performing Dry Run - Files will not be copied/removed") + try: + # Initialize the slack client + self.slack_client = WebClient(token=os.getenv("SLACK_TOKEN")) + + # Initialize the slack channel + self.slack_channel = os.getenv("SLACK_CHANNEL") + + except SlackApiError as e: + error_code = int(e.response["Error"]["Code"]) + if error_code == 404: + log.error( + { + "status": "ERROR", + "message": "Slack Token is invalid", + } + ) + + except Exception as e: + log.error( + { + "status": "ERROR", + "message": f"Error when initializing slack client: {e}", + } + ) + # Process File self._process_file() @@ -179,6 +206,14 @@ def _process_file(self) -> None: new_file_path, destination_bucket, new_file_key ) + if self.slack_client: + # Send Slack Notification + self._send_slack_notification( + slack_client=self.slack_client, + slack_channel=self.slack_channel, + slack_message=f"File ({new_file_key}) has been successfully processed and uploaded to {destination_bucket}.", + ) + # Log to timeseries database self._log_to_timestream( action_type="PUT", @@ -223,6 +258,48 @@ def _does_object_exists(bucket: str, file_key: str) -> bool: log.info(f"File {file_key} already exists in Bucket {bucket}") return True + @staticmethod + def _send_slack_notification( + slack_client, + slack_channel: str, + slack_message: str, + alert_type: str = "success", + ) -> None: + """ + Function to send a Slack Notification + """ + log.info(f"Sending Slack Notification to {slack_channel}") + try: + color = { + "success": "#2ecc71", + "error": "#ff0000", + } + ct = datetime.datetime.now() + ts = ct.strftime("%y-%m-%d %H:%M:%S") + slack_client.chat_postMessage( + channel=slack_channel, + text=f"{ts} - {slack_message}", + attachments=[ + { + "color": color[alert_type], + "blocks": [ + { + "type": "section", + "text": { + "type": "plain_text", + "text": f"{ts} - {slack_message}", + }, + } + ], + } + ], + ) + + except SlackApiError as e: + log.error( + {"status": "ERROR", "message": f"Error sending Slack Notification: {e}"} + ) + @staticmethod def _generate_file_key(file_key) -> str: """ From c2e2654af3eec579b6c0860e19cb1a2d896935fa Mon Sep 17 00:00:00 2001 From: Damian Barrous-Dume Date: Thu, 9 Feb 2023 13:47:50 -0500 Subject: [PATCH 2/6] Add error slack notification --- lambda_function/src/file_processor/file_processor.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lambda_function/src/file_processor/file_processor.py b/lambda_function/src/file_processor/file_processor.py index 6fd0340..8b219dc 100755 --- a/lambda_function/src/file_processor/file_processor.py +++ b/lambda_function/src/file_processor/file_processor.py @@ -228,6 +228,14 @@ def _process_file(self) -> None: except Exception as e: log.error(f"Error Processing File: {e}") + if self.slack_client: + # Send Slack Notification + self._send_slack_notification( + slack_client=self.slack_client, + slack_channel=self.slack_channel, + slack_message=f"File ({new_file_key}) has been successfully processed and uploaded to {destination_bucket}.", + alert_type="error" + ) raise e @staticmethod From 921136578c6c6cb3a2ed8c43f2da9b16ffa012d5 Mon Sep 17 00:00:00 2001 From: Damian Barrous-Dume Date: Thu, 9 Feb 2023 13:48:52 -0500 Subject: [PATCH 3/6] Fix Styling --- lambda_function/src/file_processor/file_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambda_function/src/file_processor/file_processor.py b/lambda_function/src/file_processor/file_processor.py index 8b219dc..28493c1 100755 --- a/lambda_function/src/file_processor/file_processor.py +++ b/lambda_function/src/file_processor/file_processor.py @@ -234,7 +234,7 @@ def _process_file(self) -> None: slack_client=self.slack_client, slack_channel=self.slack_channel, slack_message=f"File ({new_file_key}) has been successfully processed and uploaded to {destination_bucket}.", - alert_type="error" + alert_type="error", ) raise e From c85a8efae7646b1cd91cfc9c8448395a6bcf531b Mon Sep 17 00:00:00 2001 From: Damian Barrous-Dume Date: Thu, 9 Feb 2023 14:00:22 -0500 Subject: [PATCH 4/6] Update black and fix linting --- lambda_function/src/file_processor/file_processor.py | 3 --- lambda_function/src/lambda.py | 3 --- 2 files changed, 6 deletions(-) diff --git a/lambda_function/src/file_processor/file_processor.py b/lambda_function/src/file_processor/file_processor.py index 28493c1..a8e020c 100755 --- a/lambda_function/src/file_processor/file_processor.py +++ b/lambda_function/src/file_processor/file_processor.py @@ -78,7 +78,6 @@ class FileProcessor: def __init__( self, s3_bucket: str, file_key: str, environment: str, dry_run: str = None ) -> None: - # Initialize Class Variables try: self.instrument_bucket_name = s3_bucket @@ -160,7 +159,6 @@ def _process_file(self) -> None: or self.dry_run ): try: - # Parse file key to get instrument name file_key_array = self.file_key.split("/") parsed_file_key = file_key_array[-1] @@ -200,7 +198,6 @@ def _process_file(self) -> None: # Upload file to destination bucket if not a dry run if not self.dry_run: - # Upload file to destination bucket self._upload_file( new_file_path, destination_bucket, new_file_key diff --git a/lambda_function/src/lambda.py b/lambda_function/src/lambda.py index 8ad7a20..f59a60e 100755 --- a/lambda_function/src/lambda.py +++ b/lambda_function/src/lambda.py @@ -34,7 +34,6 @@ def handler(event, context) -> dict: """ # Extract needed information from event try: - environment = os.getenv("LAMBDA_ENVIRONMENT") if environment is None: environment = "DEVELOPMENT" @@ -44,7 +43,6 @@ def handler(event, context) -> dict: # Parse message from SNS Notification for s3_event in records: - # Extract needed information from event s3_bucket = s3_event["s3"]["bucket"]["name"] file_key = s3_event["s3"]["object"]["key"] @@ -84,7 +82,6 @@ def environment_setup(s3_bucket: str, file_key: str, environment: str) -> dict: try: log.info(f"Initializing FileProcessor - Environment: {environment}") if environment == "Production": - # Initialize FileProcessor class FileProcessor( s3_bucket=s3_bucket, file_key=file_key, environment=environment From 15734e8ad07d7035ba92256a3071c83276973e25 Mon Sep 17 00:00:00 2001 From: Damian Barrous-Dume Date: Thu, 9 Feb 2023 14:06:50 -0500 Subject: [PATCH 5/6] Fix line length --- lambda_function/src/file_processor/file_processor.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/lambda_function/src/file_processor/file_processor.py b/lambda_function/src/file_processor/file_processor.py index a8e020c..c6e45f5 100755 --- a/lambda_function/src/file_processor/file_processor.py +++ b/lambda_function/src/file_processor/file_processor.py @@ -208,7 +208,11 @@ def _process_file(self) -> None: self._send_slack_notification( slack_client=self.slack_client, slack_channel=self.slack_channel, - slack_message=f"File ({new_file_key}) has been successfully processed and uploaded to {destination_bucket}.", + slack_message=( + f"File ({new_file_key})" + " has been successfully processed and" + f"uploaded to {destination_bucket}.", + ), ) # Log to timeseries database @@ -230,7 +234,11 @@ def _process_file(self) -> None: self._send_slack_notification( slack_client=self.slack_client, slack_channel=self.slack_channel, - slack_message=f"File ({new_file_key}) has been successfully processed and uploaded to {destination_bucket}.", + slack_message=( + f"File ({new_file_key}) has " + "been successfully processed and " + f"uploaded to {destination_bucket}.", + ), alert_type="error", ) raise e From ac69b00f9c7a46e2bab1e452b280742475ef9702 Mon Sep 17 00:00:00 2001 From: Damian Barrous Dume Date: Thu, 9 Feb 2023 14:46:01 -0500 Subject: [PATCH 6/6] Adjust Slack Message --- lambda_function/src/file_processor/file_processor.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/lambda_function/src/file_processor/file_processor.py b/lambda_function/src/file_processor/file_processor.py index c6e45f5..4cbfdd9 100755 --- a/lambda_function/src/file_processor/file_processor.py +++ b/lambda_function/src/file_processor/file_processor.py @@ -209,8 +209,8 @@ def _process_file(self) -> None: slack_client=self.slack_client, slack_channel=self.slack_channel, slack_message=( - f"File ({new_file_key})" - " has been successfully processed and" + f"File ({new_file_key}) " + "has been successfully processed and " f"uploaded to {destination_bucket}.", ), ) @@ -235,9 +235,8 @@ def _process_file(self) -> None: slack_client=self.slack_client, slack_channel=self.slack_channel, slack_message=( - f"File ({new_file_key}) has " - "been successfully processed and " - f"uploaded to {destination_bucket}.", + f"Error Processing File ({new_file_key})" + f"from {destination_bucket}.", ), alert_type="error", )