From e6f84927b3b15b5feaacbf8c948982fd8068baa1 Mon Sep 17 00:00:00 2001
From: feluelle
Date: Thu, 10 Jan 2019 19:42:40 +0100
Subject: [PATCH] [AIRFLOW-3552] Add ImapToS3TransferOperator

NOTE: This operator only transfers the latest attachment by name.

- adds tests

[AIRFLOW-3552] Add ImapToS3TransferOperator

- add missing template fields
- add missing apply_defaults decorator
- change s3_conn_id to default to aws_default connection id

[AIRFLOW-3602] Rename class and its references

[AIRFLOW-3602] Fix imap_to_s3 test class
---
 .../imap_attachment_to_s3_operator.py         | 103 ++++++++++++++++++
 docs/code.rst                                 |   1 +
 .../test_imap_attachment_to_s3_operator.py    |  72 ++++++++++++
 3 files changed, 176 insertions(+)
 create mode 100644 airflow/contrib/operators/imap_attachment_to_s3_operator.py
 create mode 100644 tests/contrib/operators/test_imap_attachment_to_s3_operator.py

diff --git a/airflow/contrib/operators/imap_attachment_to_s3_operator.py b/airflow/contrib/operators/imap_attachment_to_s3_operator.py
new file mode 100644
index 0000000000000..6126968c9c761
--- /dev/null
+++ b/airflow/contrib/operators/imap_attachment_to_s3_operator.py
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.contrib.hooks.imap_hook import ImapHook
+from airflow.exceptions import AirflowException
+from airflow.hooks.S3_hook import S3Hook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class ImapAttachmentToS3Operator(BaseOperator):
+    """
+    Transfers a mail attachment from a mail server into an s3 bucket.
+
+    NOTE: Only the latest attachment matching `imap_attachment_name` is transferred.
+
+    :param imap_attachment_name: The file name of the mail attachment that you want to transfer.
+    :type imap_attachment_name: str
+    :param s3_key: The destination file name in the s3 bucket for the attachment.
+    :type s3_key: str
+    :param imap_mail_folder: The folder on the mail server to look for the attachment.
+    :type imap_mail_folder: str
+    :param imap_check_regex: If set checks the `imap_attachment_name` for a regular expression.
+    :type imap_check_regex: bool
+    :param s3_overwrite: If set overwrites the s3 key if already exists.
+    :type s3_overwrite: bool
+    :param imap_conn_id: The reference to the connection details of the mail server.
+    :type imap_conn_id: str
+    :param s3_conn_id: The reference to the s3 connection details.
+    :type s3_conn_id: str
+    """
+    template_fields = ('imap_attachment_name', 's3_key')
+
+    @apply_defaults
+    def __init__(self,
+                 imap_attachment_name,
+                 s3_key,
+                 imap_mail_folder='INBOX',
+                 imap_check_regex=False,
+                 s3_overwrite=False,
+                 imap_conn_id='imap_default',
+                 s3_conn_id='aws_default',
+                 *args,
+                 **kwargs):
+        super(ImapAttachmentToS3Operator, self).__init__(*args, **kwargs)
+        self.imap_attachment_name = imap_attachment_name
+        self.s3_key = s3_key
+        self.imap_mail_folder = imap_mail_folder
+        self.imap_check_regex = imap_check_regex
+        self.s3_overwrite = s3_overwrite
+        self.imap_conn_id = imap_conn_id
+        self.s3_conn_id = s3_conn_id
+
+    def execute(self, context):
+        """
+        Transfers the latest matching mail attachment from the mail server (via imap) into s3.
+
+        :param context: The context while executing.
+        :type context: dict
+        :raises AirflowException: If no matching mail attachment was retrieved.
+        """
+        self.log.info(
+            'Transferring mail attachment %s from mail server via imap to s3 key %s...',
+            self.imap_attachment_name, self.s3_key
+        )
+
+        with ImapHook(imap_conn_id=self.imap_conn_id) as imap_hook:
+            imap_mail_attachments = imap_hook.retrieve_mail_attachments(
+                name=self.imap_attachment_name,
+                mail_folder=self.imap_mail_folder,
+                check_regex=self.imap_check_regex,
+                latest_only=True
+            )
+
+        # An empty result would otherwise surface as an opaque IndexError below.
+        if not imap_mail_attachments:
+            raise AirflowException(
+                'No mail attachment found for name {}!'.format(self.imap_attachment_name)
+            )
+
+        s3_hook = S3Hook(aws_conn_id=self.s3_conn_id)
+        # The payload is raw bytes, so upload it as bytes and honour s3_overwrite.
+        s3_hook.load_bytes(
+            bytes_data=imap_mail_attachments[0][1],
+            key=self.s3_key,
+            replace=self.s3_overwrite
+        )
diff --git a/docs/code.rst b/docs/code.rst
index a670a2d8fd8f8..24a4f49cf055a 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -185,6 +185,7 @@ Operators
 .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator
 .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator
 .. autoclass:: airflow.contrib.operators.hive_to_dynamodb.HiveToDynamoDBTransferOperator
+.. autoclass:: airflow.contrib.operators.imap_attachment_to_s3_operator.ImapAttachmentToS3Operator
 .. autoclass:: airflow.contrib.operators.jenkins_job_trigger_operator.JenkinsJobTriggerOperator
 .. autoclass:: airflow.contrib.operators.jira_operator.JiraOperator
 .. autoclass:: airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator
diff --git a/tests/contrib/operators/test_imap_attachment_to_s3_operator.py b/tests/contrib/operators/test_imap_attachment_to_s3_operator.py
new file mode 100644
index 0000000000000..0cde76cc94f1a
--- /dev/null
+++ b/tests/contrib/operators/test_imap_attachment_to_s3_operator.py
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from mock import patch
+
+from airflow.contrib.operators.imap_attachment_to_s3_operator import ImapAttachmentToS3Operator
+from airflow.exceptions import AirflowException
+
+
+class TestImapAttachmentToS3Operator(unittest.TestCase):
+
+    def setUp(self):
+        self.kwargs = dict(
+            imap_attachment_name='test_file',
+            s3_key='test_file',
+            imap_mail_folder='INBOX',
+            imap_check_regex=False,
+            s3_overwrite=False,
+            task_id='test_task',
+            dag=None
+        )
+
+    @patch('airflow.contrib.operators.imap_attachment_to_s3_operator.S3Hook')
+    @patch('airflow.contrib.operators.imap_attachment_to_s3_operator.ImapHook')
+    def test_execute(self, mock_imap_hook, mock_s3_hook):
+        """Uploads the latest attachment as bytes and forwards s3_overwrite as replace."""
+        mock_imap_hook.return_value.__enter__ = mock_imap_hook
+        mock_imap_hook.return_value.retrieve_mail_attachments.return_value = [('test_file', b'Hello World')]
+
+        ImapAttachmentToS3Operator(**self.kwargs).execute(context={})
+
+        mock_imap_hook.return_value.retrieve_mail_attachments.assert_called_once_with(
+            name=self.kwargs['imap_attachment_name'],
+            mail_folder=self.kwargs['imap_mail_folder'],
+            check_regex=self.kwargs['imap_check_regex'],
+            latest_only=True
+        )
+        mock_s3_hook.return_value.load_bytes.assert_called_once_with(
+            bytes_data=mock_imap_hook.return_value.retrieve_mail_attachments.return_value[0][1],
+            key=self.kwargs['s3_key'],
+            replace=self.kwargs['s3_overwrite']
+        )
+
+    @patch('airflow.contrib.operators.imap_attachment_to_s3_operator.S3Hook')
+    @patch('airflow.contrib.operators.imap_attachment_to_s3_operator.ImapHook')
+    def test_execute_without_attachment(self, mock_imap_hook, mock_s3_hook):
+        """Raises instead of uploading when the hook returns no attachment."""
+        mock_imap_hook.return_value.__enter__ = mock_imap_hook
+        mock_imap_hook.return_value.retrieve_mail_attachments.return_value = []
+
+        with self.assertRaises(AirflowException):
+            ImapAttachmentToS3Operator(**self.kwargs).execute(context={})
+
+        mock_s3_hook.return_value.load_bytes.assert_not_called()