Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lambda_function/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ pytest
pytest-astropy
pytest-cov
flake8
black
black
slack_sdk
95 changes: 92 additions & 3 deletions lambda_function/src/file_processor/file_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import os
import os.path
from pathlib import Path
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

# Initialize constants to be parsed from config.yaml
MISSION_NAME = ""
Expand Down Expand Up @@ -76,7 +78,6 @@ class FileProcessor:
def __init__(
self, s3_bucket: str, file_key: str, environment: str, dry_run: str = None
) -> None:

# Initialize Class Variables
try:
self.instrument_bucket_name = s3_bucket
Expand Down Expand Up @@ -114,6 +115,31 @@ def __init__(
if self.dry_run:
log.warning("Performing Dry Run - Files will not be copied/removed")

try:
# Initialize the slack client
self.slack_client = WebClient(token=os.getenv("SLACK_TOKEN"))

# Initialize the slack channel
self.slack_channel = os.getenv("SLACK_CHANNEL")

except SlackApiError as e:
error_code = int(e.response["Error"]["Code"])
if error_code == 404:
log.error(
{
"status": "ERROR",
"message": "Slack Token is invalid",
}
)

except Exception as e:
log.error(
{
"status": "ERROR",
"message": f"Error when initializing slack client: {e}",
}
)

# Process File
self._process_file()

Expand All @@ -133,7 +159,6 @@ def _process_file(self) -> None:
or self.dry_run
):
try:

# Parse file key to get instrument name
file_key_array = self.file_key.split("/")
parsed_file_key = file_key_array[-1]
Expand Down Expand Up @@ -173,12 +198,23 @@ def _process_file(self) -> None:

# Upload file to destination bucket if not a dry run
if not self.dry_run:

# Upload file to destination bucket
self._upload_file(
new_file_path, destination_bucket, new_file_key
)

if self.slack_client:
# Send Slack Notification
self._send_slack_notification(
slack_client=self.slack_client,
slack_channel=self.slack_channel,
slack_message=(
f"File ({new_file_key}) "
"has been successfully processed and "
f"uploaded to {destination_bucket}.",
),
)

# Log to timeseries database
self._log_to_timestream(
action_type="PUT",
Expand All @@ -193,6 +229,17 @@ def _process_file(self) -> None:

except Exception as e:
log.error(f"Error Processing File: {e}")
if self.slack_client:
# Send Slack Notification
self._send_slack_notification(
slack_client=self.slack_client,
slack_channel=self.slack_channel,
slack_message=(
f"Error Processing File ({new_file_key})"
f"from {destination_bucket}.",
),
alert_type="error",
)
raise e

@staticmethod
Expand Down Expand Up @@ -223,6 +270,48 @@ def _does_object_exists(bucket: str, file_key: str) -> bool:
log.info(f"File {file_key} already exists in Bucket {bucket}")
return True

@staticmethod
def _send_slack_notification(
slack_client,
slack_channel: str,
slack_message: str,
alert_type: str = "success",
) -> None:
"""
Function to send a Slack Notification
"""
log.info(f"Sending Slack Notification to {slack_channel}")
try:
color = {
"success": "#2ecc71",
"error": "#ff0000",
}
ct = datetime.datetime.now()
ts = ct.strftime("%y-%m-%d %H:%M:%S")
slack_client.chat_postMessage(
channel=slack_channel,
text=f"{ts} - {slack_message}",
attachments=[
{
"color": color[alert_type],
"blocks": [
{
"type": "section",
"text": {
"type": "plain_text",
"text": f"{ts} - {slack_message}",
},
}
],
}
],
)

except SlackApiError as e:
log.error(
{"status": "ERROR", "message": f"Error sending Slack Notification: {e}"}
)

@staticmethod
def _generate_file_key(file_key) -> str:
"""
Expand Down
3 changes: 0 additions & 3 deletions lambda_function/src/lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def handler(event, context) -> dict:
"""
# Extract needed information from event
try:

environment = os.getenv("LAMBDA_ENVIRONMENT")
if environment is None:
environment = "DEVELOPMENT"
Expand All @@ -44,7 +43,6 @@ def handler(event, context) -> dict:

# Parse message from SNS Notification
for s3_event in records:

# Extract needed information from event
s3_bucket = s3_event["s3"]["bucket"]["name"]
file_key = s3_event["s3"]["object"]["key"]
Expand Down Expand Up @@ -84,7 +82,6 @@ def environment_setup(s3_bucket: str, file_key: str, environment: str) -> dict:
try:
log.info(f"Initializing FileProcessor - Environment: {environment}")
if environment == "Production":

# Initialize FileProcessor class
FileProcessor(
s3_bucket=s3_bucket, file_key=file_key, environment=environment
Expand Down