diff --git a/lambda_function/requirements.txt b/lambda_function/requirements.txt index 81110eb..0d3aa40 100755 --- a/lambda_function/requirements.txt +++ b/lambda_function/requirements.txt @@ -6,4 +6,5 @@ pytest pytest-astropy pytest-cov flake8 -black \ No newline at end of file +black +slack_sdk \ No newline at end of file diff --git a/lambda_function/src/file_processor/file_processor.py b/lambda_function/src/file_processor/file_processor.py index 12b0d95..4cbfdd9 100755 --- a/lambda_function/src/file_processor/file_processor.py +++ b/lambda_function/src/file_processor/file_processor.py @@ -12,6 +12,8 @@ import os import os.path from pathlib import Path +from slack_sdk import WebClient +from slack_sdk.errors import SlackApiError # Initialize constants to be parsed from config.yaml MISSION_NAME = "" @@ -76,7 +78,6 @@ class FileProcessor: def __init__( self, s3_bucket: str, file_key: str, environment: str, dry_run: str = None ) -> None: - # Initialize Class Variables try: self.instrument_bucket_name = s3_bucket @@ -114,6 +115,31 @@ def __init__( if self.dry_run: log.warning("Performing Dry Run - Files will not be copied/removed") + try: + # Initialize the slack client + self.slack_client = WebClient(token=os.getenv("SLACK_TOKEN")) + + # Initialize the slack channel + self.slack_channel = os.getenv("SLACK_CHANNEL") + + except SlackApiError as e: + error_code = int(e.response["Error"]["Code"]) + if error_code == 404: + log.error( + { + "status": "ERROR", + "message": "Slack Token is invalid", + } + ) + + except Exception as e: + log.error( + { + "status": "ERROR", + "message": f"Error when initializing slack client: {e}", + } + ) + # Process File self._process_file() @@ -133,7 +159,6 @@ def _process_file(self) -> None: or self.dry_run ): try: - # Parse file key to get instrument name file_key_array = self.file_key.split("/") parsed_file_key = file_key_array[-1] @@ -173,12 +198,23 @@ def _process_file(self) -> None: # Upload file to destination bucket if not a dry run if not self.dry_run: - # Upload file to destination bucket self._upload_file( new_file_path, destination_bucket, new_file_key ) + if self.slack_client: + # Send Slack Notification + self._send_slack_notification( + slack_client=self.slack_client, + slack_channel=self.slack_channel, + slack_message=( + f"File ({new_file_key}) " + "has been successfully processed and " + f"uploaded to {destination_bucket}.", + ), + ) + # Log to timeseries database self._log_to_timestream( action_type="PUT", @@ -193,6 +229,17 @@ def _process_file(self) -> None: except Exception as e: log.error(f"Error Processing File: {e}") + if self.slack_client: + # Send Slack Notification + self._send_slack_notification( + slack_client=self.slack_client, + slack_channel=self.slack_channel, + slack_message=( + f"Error Processing File ({new_file_key})" + f"from {destination_bucket}.", + ), + alert_type="error", + ) raise e @staticmethod @@ -223,6 +270,48 @@ def _does_object_exists(bucket: str, file_key: str) -> bool: log.info(f"File {file_key} already exists in Bucket {bucket}") return True + @staticmethod + def _send_slack_notification( + slack_client, + slack_channel: str, + slack_message: str, + alert_type: str = "success", + ) -> None: + """ + Function to send a Slack Notification + """ + log.info(f"Sending Slack Notification to {slack_channel}") + try: + color = { + "success": "#2ecc71", + "error": "#ff0000", + } + ct = datetime.datetime.now() + ts = ct.strftime("%y-%m-%d %H:%M:%S") + slack_client.chat_postMessage( + channel=slack_channel, + text=f"{ts} - {slack_message}", + attachments=[ + { + "color": color[alert_type], + "blocks": [ + { + "type": "section", + "text": { + "type": "plain_text", + "text": f"{ts} - {slack_message}", + }, + } + ], + } + ], + ) + + except SlackApiError as e: + log.error( + {"status": "ERROR", "message": f"Error sending Slack Notification: {e}"} + ) + @staticmethod def _generate_file_key(file_key) -> str: """ diff --git a/lambda_function/src/lambda.py b/lambda_function/src/lambda.py index 8ad7a20..f59a60e 100755 --- a/lambda_function/src/lambda.py +++ b/lambda_function/src/lambda.py @@ -34,7 +34,6 @@ def handler(event, context) -> dict: """ # Extract needed information from event try: - environment = os.getenv("LAMBDA_ENVIRONMENT") if environment is None: environment = "DEVELOPMENT" @@ -44,7 +43,6 @@ def handler(event, context) -> dict: # Parse message from SNS Notification for s3_event in records: - # Extract needed information from event s3_bucket = s3_event["s3"]["bucket"]["name"] file_key = s3_event["s3"]["object"]["key"] @@ -84,7 +82,6 @@ def environment_setup(s3_bucket: str, file_key: str, environment: str) -> dict: try: log.info(f"Initializing FileProcessor - Environment: {environment}") if environment == "Production": - # Initialize FileProcessor class FileProcessor( s3_bucket=s3_bucket, file_key=file_key, environment=environment