From 89569d3b5074c06f606543aa1952eb6762ac177e Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Thu, 28 May 2020 18:24:15 +0200 Subject: [PATCH 01/18] mysql_to_s3_operator --- airflow/operators/mysql_to_s3_operator.py | 125 ++++++++++++++++++ docs/operators-and-hooks-ref.rst | 5 + .../operators/test_mysql_to_s3_operator.py | 61 +++++++++ 3 files changed, 191 insertions(+) create mode 100644 airflow/operators/mysql_to_s3_operator.py create mode 100644 tests/providers/amazon/aws/operators/test_mysql_to_s3_operator.py diff --git a/airflow/operators/mysql_to_s3_operator.py b/airflow/operators/mysql_to_s3_operator.py new file mode 100644 index 0000000000000..b0189ae1b1477 --- /dev/null +++ b/airflow/operators/mysql_to_s3_operator.py @@ -0,0 +1,125 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Transfer data from MySQL into a S3 bucket. +""" +import os +import tempfile +from typing import Optional, Union + +import numpy as np +import pandas as pd + +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.providers.mysql.hooks.mysql import MySqlHook +from airflow.utils.decorators import apply_defaults + + +class MySQLToS3Operator(BaseOperator): + """ + Saves data from an specific MySQL query into a file in S3. + :param query: the sql query to be executed. If you want to execute a file, place the absolute path of it, + ending with .sql extension. + :type query: str + :param s3_bucket: bucket where the data will be stored + :type s3_bucket: str + :param s3_key: desired key for the file. It includes the name of the file + :type s3_key: str + :param mysql_conn_id: reference to a specific mysql database + :type mysql_conn_id: str + :param aws_conn_id: reference to a specific S3 connection + :type aws_conn_id: str + :param verify: Whether or not to verify SSL certificates for S3 connection. + By default SSL certificates are verified. + You can provide the following values: + - False: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. + - path/to/cert/bundle.pem: A filename of the CA cert bundle to use. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. + :type verify: bool or str + :param pd_csv_kwargs: arguments to include in pd.to_csv (header, index, columns...) 
+ :type pd_csv_kwargs: dict + :param index: wheter to have the index or not in the dataframe + :type index: str + :param header: whether to include header or not into the S3 file + :type header: bool + """ + template_fields = ('s3_key', 'query', 'pd_csv_kwargs',) + template_ext = ('.sql',) + + @apply_defaults + def __init__( + self, + query: str, + s3_bucket: str, + s3_key: str, + mysql_conn_id: str = 'mysql_default', + aws_conn_id: str = 'aws_default', + verify: Optional[Union[bool, str]] = None, + pd_csv_kwargs: dict = None, + index: Optional[bool] = False, + header: Optional[bool] = False, + *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.query = query + self.s3_bucket = s3_bucket + self.s3_key = s3_key + self.mysql_conn_id = mysql_conn_id + self.aws_conn_id = aws_conn_id + self.verify = verify + self.pd_csv_kwargs = pd_csv_kwargs + + if not self.pd_csv_kwargs: + self.pd_csv_kwargs = {} + if "index" not in self.pd_csv_kwargs: + self.pd_csv_kwargs["index"] = index + if "header" not in self.pd_csv_kwargs: + self.pd_csv_kwargs["header"] = header + + def _fix_int_dtypes(self, df): + """ + Mutate DataFrame to set dtypes for int columns containing NaN values." + """ + for col in df: + if "float" in df[col].dtype.name and df[col].hasnans: + # inspect values to determine if dtype of non-null values is int or float + notna_series = df[col].dropna().values + if np.isclose(notna_series, notna_series.astype(int)).all(): + # set to dtype that retains integers and supports NaNs + df[col] = np.where(df[col].isnull(), None, df[col]).astype(pd.Int64Dtype) + + def execute(self, context): + mysql_hook = MySqlHook(mysql_conn_id=self.mysql_conn_id) + s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + data_df = mysql_hook.get_pandas_df(self.query) + self.log.info("Data from MySQL obtained") + + self._fix_int_dtypes(data_df) + with tempfile.NamedTemporaryFile(mode='r+', suffix='.csv') as tmp_csv: + tmp_csv.file.write(data_df.to_csv(**self.pd_csv_kwargs)) + tmp_csv.file.seek(0) + s3_conn.load_file(filename=tmp_csv.name, + key=self.s3_key, + bucket_name=self.s3_bucket) + + if s3_conn.check_for_key(self.s3_key, bucket_name=self.s3_bucket): + file_location = os.path.join(self.s3_bucket, self.s3_key) + self.log.info("File saved correctly in %s", file_location) diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst index 4432fdd1b1409..a3b517f3b1d18 100644 --- a/docs/operators-and-hooks-ref.rst +++ b/docs/operators-and-hooks-ref.rst @@ -584,6 +584,11 @@ These integrations allow you to copy data from/to Amazon Web Services. - - :mod:`airflow.providers.amazon.aws.operators.sftp_to_s3` + * - `MySQL `__ + - `Amazon Simple Storage Service (S3) `_ + - + - :mod:`airflow.operators.mysql_to_s3_operator` + :ref:`[1] ` Those discovery-based operators use :class:`~airflow.providers.google.common.hooks.discovery_api.GoogleDiscoveryApiHook` to communicate with Google Services via the `Google API Python Client `__. diff --git a/tests/providers/amazon/aws/operators/test_mysql_to_s3_operator.py b/tests/providers/amazon/aws/operators/test_mysql_to_s3_operator.py new file mode 100644 index 0000000000000..430d4d719e9a5 --- /dev/null +++ b/tests/providers/amazon/aws/operators/test_mysql_to_s3_operator.py @@ -0,0 +1,61 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +from unittest import mock + +import pandas as pd + +from airflow.operators.mysql_to_s3_operator import MySQLToS3Operator + + +class TestMySqlToS3Operator(unittest.TestCase): + + @mock.patch("airflow.operators.mysql_to_s3_operator.tempfile.NamedTemporaryFile") + @mock.patch("airflow.operators.mysql_to_s3_operator.S3Hook") + @mock.patch("airflow.operators.mysql_to_s3_operator.MySqlHook") + def test_execute(self, mock_mysql_hook, mock_s3_hook, temp_mock): + query = "query" + s3_bucket = "bucket" + s3_key = "key" + + test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1]) + get_pandas_df_mock = mock_mysql_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = test_df + + op = MySQLToS3Operator(query=query, + s3_bucket=s3_bucket, + s3_key=s3_key, + mysql_conn_id="mysql_conn_id", + aws_conn_id="aws_conn_id", + task_id="task_id", + pd_csv_kwargs={'index': False, 'header': False}, + dag=None + ) + op.execute(None) + mock_mysql_hook.assert_called_once_with(mysql_conn_id="mysql_conn_id") + mock_s3_hook.assert_called_once_with(aws_conn_id="aws_conn_id", verify=None) + + get_pandas_df_mock.assert_called_once_with(query) + + temp_mock.assert_called_once_with(mode='r+', suffix=".csv") + filename = "file" + temp_mock.return_value.__enter__.return_value.name = mock.PropertyMock(return_value=filename) + mock_s3_hook.return_value.load_file.assert_called_once_with(filename=filename, + key=s3_key, + bucket_name=s3_bucket) From 78ea11d77b379f43aa5b5350d075dcd10fe3e9f4 Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Fri, 29 May 2020 10:33:11 +0200 Subject: [PATCH 02/18] moved and blank line --- .../mysql/operators/mysql_to_s3.py} | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) rename airflow/{operators/mysql_to_s3_operator.py => providers/mysql/operators/mysql_to_s3.py} (96%) diff --git a/airflow/operators/mysql_to_s3_operator.py b/airflow/providers/mysql/operators/mysql_to_s3.py similarity index 96% rename from airflow/operators/mysql_to_s3_operator.py rename to airflow/providers/mysql/operators/mysql_to_s3.py index b0189ae1b1477..f1c3178309651 100644 --- a/airflow/operators/mysql_to_s3_operator.py +++ b/airflow/providers/mysql/operators/mysql_to_s3.py @@ -48,10 +48,10 @@ class MySQLToS3Operator(BaseOperator): :param verify: Whether or not to verify SSL certificates for S3 connection. By default SSL certificates are verified. You can provide the following values: - - False: do not validate SSL certificates. SSL will still be used + - ``False``: do not validate SSL certificates. SSL will still be used (unless use_ssl is False), but SSL certificates will not be verified. - - path/to/cert/bundle.pem: A filename of the CA cert bundle to use. + - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore. 
:type verify: bool or str @@ -62,6 +62,7 @@ class MySQLToS3Operator(BaseOperator): :param header: whether to include header or not into the S3 file :type header: bool """ + template_fields = ('s3_key', 'query', 'pd_csv_kwargs',) template_ext = ('.sql',) From f92ffd1ff1b09540263fc33656311c8879951644 Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Fri, 29 May 2020 11:56:48 +0200 Subject: [PATCH 03/18] documentation indent and dict none --- airflow/providers/mysql/operators/mysql_to_s3.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow/providers/mysql/operators/mysql_to_s3.py b/airflow/providers/mysql/operators/mysql_to_s3.py index f1c3178309651..78b393566fed0 100644 --- a/airflow/providers/mysql/operators/mysql_to_s3.py +++ b/airflow/providers/mysql/operators/mysql_to_s3.py @@ -48,6 +48,7 @@ class MySQLToS3Operator(BaseOperator): :param verify: Whether or not to verify SSL certificates for S3 connection. By default SSL certificates are verified. You can provide the following values: + - ``False``: do not validate SSL certificates. SSL will still be used (unless use_ssl is False), but SSL certificates will not be verified. @@ -75,7 +76,7 @@ def __init__( mysql_conn_id: str = 'mysql_default', aws_conn_id: str = 'aws_default', verify: Optional[Union[bool, str]] = None, - pd_csv_kwargs: dict = None, + pd_csv_kwargs: dict = {}, index: Optional[bool] = False, header: Optional[bool] = False, *args, **kwargs) -> None: @@ -88,8 +89,6 @@ def __init__( self.verify = verify self.pd_csv_kwargs = pd_csv_kwargs - if not self.pd_csv_kwargs: - self.pd_csv_kwargs = {} if "index" not in self.pd_csv_kwargs: self.pd_csv_kwargs["index"] = index if "header" not in self.pd_csv_kwargs: From 4f04c5244595f536713cfdb41126c856434149d9 Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Fri, 29 May 2020 13:18:32 +0200 Subject: [PATCH 04/18] directories changes --- .../{mysql => amazon/aws}/operators/mysql_to_s3.py | 6 ++++-- .../{test_mysql_to_s3_operator.py => test_mysql_to_s3.py} | 8 ++++---- 2 files changed, 8 insertions(+), 6 deletions(-) rename airflow/providers/{mysql => amazon/aws}/operators/mysql_to_s3.py (97%) rename tests/providers/amazon/aws/operators/{test_mysql_to_s3_operator.py => test_mysql_to_s3.py} (87%) diff --git a/airflow/providers/mysql/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py similarity index 97% rename from airflow/providers/mysql/operators/mysql_to_s3.py rename to airflow/providers/amazon/aws/operators/mysql_to_s3.py index 78b393566fed0..9cccff04f1c5d 100644 --- a/airflow/providers/mysql/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -64,7 +64,7 @@ class MySQLToS3Operator(BaseOperator): :type header: bool """ - template_fields = ('s3_key', 'query', 'pd_csv_kwargs',) + template_fields = ('s3_key', 'query',) template_ext = ('.sql',) @apply_defaults @@ -76,7 +76,7 @@ def __init__( mysql_conn_id: str = 'mysql_default', aws_conn_id: str = 'aws_default', verify: Optional[Union[bool, str]] = None, - pd_csv_kwargs: dict = {}, + pd_csv_kwargs: dict = None, index: Optional[bool] = False, header: Optional[bool] = False, *args, **kwargs) -> None: @@ -89,6 +89,8 @@ def __init__( self.verify = verify self.pd_csv_kwargs = pd_csv_kwargs + if not self.pd_csv_kwargs: + self.pd_csv_kwargs = {} if "index" not in self.pd_csv_kwargs: self.pd_csv_kwargs["index"] = index if "header" not in self.pd_csv_kwargs: diff --git a/tests/providers/amazon/aws/operators/test_mysql_to_s3_operator.py 
b/tests/providers/amazon/aws/operators/test_mysql_to_s3.py similarity index 87% rename from tests/providers/amazon/aws/operators/test_mysql_to_s3_operator.py rename to tests/providers/amazon/aws/operators/test_mysql_to_s3.py index 430d4d719e9a5..8f8b9dc389482 100644 --- a/tests/providers/amazon/aws/operators/test_mysql_to_s3_operator.py +++ b/tests/providers/amazon/aws/operators/test_mysql_to_s3.py @@ -21,14 +21,14 @@ import pandas as pd -from airflow.operators.mysql_to_s3_operator import MySQLToS3Operator +from airflow.providers.amazon.aws.operators.mysql_to_s3 import MySQLToS3Operator class TestMySqlToS3Operator(unittest.TestCase): - @mock.patch("airflow.operators.mysql_to_s3_operator.tempfile.NamedTemporaryFile") - @mock.patch("airflow.operators.mysql_to_s3_operator.S3Hook") - @mock.patch("airflow.operators.mysql_to_s3_operator.MySqlHook") + @mock.patch("airflow.providers.amazon.aws.operators.mysql_to_s3.tempfile.NamedTemporaryFile") + @mock.patch("airflow.providers.amazon.aws.operators.mysql_to_s3.S3Hook") + @mock.patch("airflow.providers.amazon.aws.operators.mysql_to_s3.MySqlHook") def test_execute(self, mock_mysql_hook, mock_s3_hook, temp_mock): query = "query" s3_bucket = "bucket" From 52346e3a9c4c70665072c8cafa1ece942a033a37 Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Fri, 29 May 2020 13:47:27 +0200 Subject: [PATCH 05/18] fixes --- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 3 +-- docs/operators-and-hooks-ref.rst | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index 9cccff04f1c5d..b2913c3737c2c 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -87,9 +87,8 @@ def __init__( self.mysql_conn_id = mysql_conn_id self.aws_conn_id = aws_conn_id self.verify = verify - self.pd_csv_kwargs = pd_csv_kwargs - if not self.pd_csv_kwargs: + if not pd_csv_kwargs: self.pd_csv_kwargs = {} if "index" not in self.pd_csv_kwargs: self.pd_csv_kwargs["index"] = index diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst index a3b517f3b1d18..0b94021f2a05e 100644 --- a/docs/operators-and-hooks-ref.rst +++ b/docs/operators-and-hooks-ref.rst @@ -587,7 +587,7 @@ These integrations allow you to copy data from/to Amazon Web Services. * - `MySQL `__ - `Amazon Simple Storage Service (S3) `_ - - - :mod:`airflow.operators.mysql_to_s3_operator` + - :mod:`airflow.providers.amazon.aws.operators.mysql_to_s3` :ref:`[1] ` Those discovery-based operators use :class:`~airflow.providers.google.common.hooks.discovery_api.GoogleDiscoveryApiHook` to communicate with Google From 0336eaabf0c7e40e867995f81d5ea1952f9b992d Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Fri, 29 May 2020 14:08:32 +0200 Subject: [PATCH 06/18] fixes --- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index b2913c3737c2c..b23fdf2db8423 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -50,15 +50,14 @@ class MySQLToS3Operator(BaseOperator): You can provide the following values: - ``False``: do not validate SSL certificates. SSL will still be used - (unless use_ssl is False), but SSL certificates will not be - verified. 
+ (unless use_ssl is False), but SSL certificates will not be verified. - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. - You can specify this argument if you want to use a different - CA cert bundle than the one used by botocore. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. :type verify: bool or str :param pd_csv_kwargs: arguments to include in pd.to_csv (header, index, columns...) :type pd_csv_kwargs: dict - :param index: wheter to have the index or not in the dataframe + :param index: whether to have the index or not in the dataframe :type index: str :param header: whether to include header or not into the S3 file :type header: bool @@ -76,7 +75,7 @@ def __init__( mysql_conn_id: str = 'mysql_default', aws_conn_id: str = 'aws_default', verify: Optional[Union[bool, str]] = None, - pd_csv_kwargs: dict = None, + pd_csv_kwargs: Optional[dict] = None, index: Optional[bool] = False, header: Optional[bool] = False, *args, **kwargs) -> None: From 0cd07e383d69ebf4d0b1ba052ee4e61cd42b7f9e Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Fri, 29 May 2020 14:26:57 +0200 Subject: [PATCH 07/18] documentation change --- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index b23fdf2db8423..e4f6c8cb6ef4d 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -15,9 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -""" -Transfer data from MySQL into a S3 bucket. -""" + import os import tempfile from typing import Optional, Union @@ -33,6 +31,8 @@ class MySQLToS3Operator(BaseOperator): """ + Transfer data from MySQL into a S3 bucket. + Saves data from an specific MySQL query into a file in S3. :param query: the sql query to be executed. If you want to execute a file, place the absolute path of it, ending with .sql extension. @@ -54,6 +54,7 @@ class MySQLToS3Operator(BaseOperator): - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore. + :type verify: bool or str :param pd_csv_kwargs: arguments to include in pd.to_csv (header, index, columns...) :type pd_csv_kwargs: dict @@ -75,7 +76,7 @@ def __init__( mysql_conn_id: str = 'mysql_default', aws_conn_id: str = 'aws_default', verify: Optional[Union[bool, str]] = None, - pd_csv_kwargs: Optional[dict] = None, + pd_csv_kwargs: dict = None, index: Optional[bool] = False, header: Optional[bool] = False, *args, **kwargs) -> None: From 2637fa3f20a55a5cbc35cc5d5313fd8046dd0c5e Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Fri, 29 May 2020 14:43:58 +0200 Subject: [PATCH 08/18] docs --- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index e4f6c8cb6ef4d..5f56f4ba2baf6 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -31,9 +31,8 @@ class MySQLToS3Operator(BaseOperator): """ - Transfer data from MySQL into a S3 bucket. 
- Saves data from an specific MySQL query into a file in S3. + :param query: the sql query to be executed. If you want to execute a file, place the absolute path of it, ending with .sql extension. :type query: str From 3f2928c1c130643c05684fc97d20a8dc5d30fb75 Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Fri, 29 May 2020 14:44:25 +0200 Subject: [PATCH 09/18] docs and pd_csv_kwargs --- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index 5f56f4ba2baf6..6a506fd440b29 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -86,8 +86,9 @@ def __init__( self.mysql_conn_id = mysql_conn_id self.aws_conn_id = aws_conn_id self.verify = verify + self.pd_csv_kwargs = pd_csv_kwargs - if not pd_csv_kwargs: + if not self.pd_csv_kwargs: self.pd_csv_kwargs = {} if "index" not in self.pd_csv_kwargs: self.pd_csv_kwargs["index"] = index From ac3fa52cfde660837799c3741c773a7a87aea4d7 Mon Sep 17 00:00:00 2001 From: JavierLopezT Date: Sat, 6 Jun 2020 11:42:28 +0200 Subject: [PATCH 10/18] Update airflow/providers/amazon/aws/operators/mysql_to_s3.py Co-authored-by: Tomek Urbaszek --- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index 6a506fd440b29..bf6a50f5ee971 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -34,7 +34,7 @@ class MySQLToS3Operator(BaseOperator): Saves data from an specific MySQL query into a file in S3. :param query: the sql query to be executed. If you want to execute a file, place the absolute path of it, - ending with .sql extension. + ending with .sql extension. 
:type query: str :param s3_bucket: bucket where the data will be stored :type s3_bucket: str From 746f894ee26b0dbfe2acb7fa373493cd95c57940 Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Sat, 6 Jun 2020 11:52:34 +0200 Subject: [PATCH 11/18] pd_csv_kwargs --- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index bf6a50f5ee971..13bce0ea833cf 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -86,10 +86,11 @@ def __init__( self.mysql_conn_id = mysql_conn_id self.aws_conn_id = aws_conn_id self.verify = verify - self.pd_csv_kwargs = pd_csv_kwargs - if not self.pd_csv_kwargs: + if not pd_csv_kwargs: self.pd_csv_kwargs = {} + else: + self.pd_csv_kwargs = pd_csv_kwargs if "index" not in self.pd_csv_kwargs: self.pd_csv_kwargs["index"] = index if "header" not in self.pd_csv_kwargs: From 1d365688bcb2b1fb49eb9bec90462c2df304c00e Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Sat, 6 Jun 2020 15:28:56 +0200 Subject: [PATCH 12/18] dependencies and docs --- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 1 - airflow/providers/dependencies.json | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index 13bce0ea833cf..679e6449a445b 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -53,7 +53,6 @@ class MySQLToS3Operator(BaseOperator): - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore. - :type verify: bool or str :param pd_csv_kwargs: arguments to include in pd.to_csv (header, index, columns...) 
:type pd_csv_kwargs: dict diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json index b0b6187ec7512..704091c4d0b3a 100644 --- a/airflow/providers/dependencies.json +++ b/airflow/providers/dependencies.json @@ -4,6 +4,7 @@ "google", "imap", "mongo", + "mysql", "postgres", "ssh" ], From 9f6ba999cc68ef6d02270000600b19f91d63a09f Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Sat, 6 Jun 2020 18:33:05 +0200 Subject: [PATCH 13/18] contributing and docs --- CONTRIBUTING.rst | 2 +- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 5de901bea31a1..3825a347d107b 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -453,7 +453,7 @@ Here is the list of packages and their extras: ========================== =========================== Package Extras ========================== =========================== -amazon apache.hive,google,imap,mongo,postgres,ssh +amazon apache.hive,google,imap,mongo,mysql,postgres,ssh apache.druid apache.hive apache.hive amazon,microsoft.mssql,mysql,presto,samba,vertica apache.livy http diff --git a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index 679e6449a445b..86a777dca5439 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -49,10 +49,10 @@ class MySQLToS3Operator(BaseOperator): You can provide the following values: - ``False``: do not validate SSL certificates. SSL will still be used - (unless use_ssl is False), but SSL certificates will not be verified. + (unless use_ssl is False), but SSL certificates will not be verified. - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. - You can specify this argument if you want to use a different - CA cert bundle than the one used by botocore. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. :type verify: bool or str :param pd_csv_kwargs: arguments to include in pd.to_csv (header, index, columns...) 
:type pd_csv_kwargs: dict From 1a1bd169824dd5adc974af420b828146736fc788 Mon Sep 17 00:00:00 2001 From: JavierLopezT Date: Tue, 9 Jun 2020 10:07:01 +0200 Subject: [PATCH 14/18] Update airflow/providers/amazon/aws/operators/mysql_to_s3.py Co-authored-by: Tomek Urbaszek --- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index 86a777dca5439..ff71c3784400f 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -74,7 +74,7 @@ def __init__( mysql_conn_id: str = 'mysql_default', aws_conn_id: str = 'aws_default', verify: Optional[Union[bool, str]] = None, - pd_csv_kwargs: dict = None, + pd_csv_kwargs: Optional[dict] = None, index: Optional[bool] = False, header: Optional[bool] = False, *args, **kwargs) -> None: From 2b5bc17a4cff4b23d3aed3510b2d204216215700 Mon Sep 17 00:00:00 2001 From: JavierLopezT Date: Wed, 10 Jun 2020 12:06:49 +0200 Subject: [PATCH 15/18] Update airflow/providers/amazon/aws/operators/mysql_to_s3.py Co-authored-by: Felix Uellendall --- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index ff71c3784400f..223460a940402 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -86,10 +86,7 @@ def __init__( self.aws_conn_id = aws_conn_id self.verify = verify - if not pd_csv_kwargs: - self.pd_csv_kwargs = {} - else: - self.pd_csv_kwargs = pd_csv_kwargs + self.pd_csv_kwargs = pd_csv_kwargs or {} if "index" not in self.pd_csv_kwargs: self.pd_csv_kwargs["index"] = index if "header" not in self.pd_csv_kwargs: From e79c12c1c32e1c5344e615ecd89d6a32991ba19a Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Thu, 11 Jun 2020 11:24:36 +0200 Subject: [PATCH 16/18] feluelle suggestion --- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index 223460a940402..6d7f8cd977dff 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -87,6 +87,8 @@ def __init__( self.verify = verify self.pd_csv_kwargs = pd_csv_kwargs or {} + if "path_or_buf" in self.pd_csv_kwargs: + raise AirflowException('The argument path_or_buf is not allowed to be in pd_csv_kwargs, please remove it') if "index" not in self.pd_csv_kwargs: self.pd_csv_kwargs["index"] = index if "header" not in self.pd_csv_kwargs: @@ -112,8 +114,7 @@ def execute(self, context): self._fix_int_dtypes(data_df) with tempfile.NamedTemporaryFile(mode='r+', suffix='.csv') as tmp_csv: - tmp_csv.file.write(data_df.to_csv(**self.pd_csv_kwargs)) - tmp_csv.file.seek(0) + data_df.to_csv(tmp_csv.name, **self.pd_csv_kwargs) s3_conn.load_file(filename=tmp_csv.name, key=self.s3_key, bucket_name=self.s3_bucket) From db9d442655a792a8cf9d0a31ac48c0869072c5c0 Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Thu, 11 Jun 2020 12:07:35 +0200 Subject: [PATCH 17/18] import airflowexception --- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 1 + 1 file changed, 1 insertion(+) diff --git 
a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index 6d7f8cd977dff..36170b7a4c512 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -23,6 +23,7 @@ import numpy as np import pandas as pd +from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.mysql.hooks.mysql import MySqlHook From 2aa0c3654744700def4b2464248032737f301a9d Mon Sep 17 00:00:00 2001 From: "javier.lopez" Date: Thu, 11 Jun 2020 12:30:57 +0200 Subject: [PATCH 18/18] line too long fix --- airflow/providers/amazon/aws/operators/mysql_to_s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/mysql_to_s3.py b/airflow/providers/amazon/aws/operators/mysql_to_s3.py index 36170b7a4c512..748e0f99a844e 100644 --- a/airflow/providers/amazon/aws/operators/mysql_to_s3.py +++ b/airflow/providers/amazon/aws/operators/mysql_to_s3.py @@ -89,7 +89,7 @@ def __init__( self.pd_csv_kwargs = pd_csv_kwargs or {} if "path_or_buf" in self.pd_csv_kwargs: - raise AirflowException('The argument path_or_buf is not allowed to be in pd_csv_kwargs, please remove it') + raise AirflowException('The argument path_or_buf is not allowed, please remove it') if "index" not in self.pd_csv_kwargs: self.pd_csv_kwargs["index"] = index if "header" not in self.pd_csv_kwargs:
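For reference, below is a minimal usage sketch of the operator this patch series introduces, written against the final module path and class name (airflow.providers.amazon.aws.operators.mysql_to_s3.MySQLToS3Operator). It is not part of the patches above: the DAG id, table, bucket, and S3 key are hypothetical placeholders, and the connection ids assume a MySQL and an AWS connection are already configured in Airflow.

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.mysql_to_s3 import MySQLToS3Operator

    with DAG(
        dag_id="example_mysql_to_s3",            # hypothetical DAG id for illustration
        schedule_interval="@daily",
        start_date=datetime(2020, 6, 1),
        catchup=False,
    ) as dag:
        # Runs the query against the MySQL connection, writes the result to a
        # temporary CSV via pandas.DataFrame.to_csv, then uploads the file to S3.
        export_orders = MySQLToS3Operator(
            task_id="mysql_to_s3",
            query="SELECT * FROM orders WHERE order_date = '{{ ds }}'",  # hypothetical table
            s3_bucket="my-data-lake",                                    # hypothetical bucket
            s3_key="exports/orders/{{ ds }}.csv",
            mysql_conn_id="mysql_default",
            aws_conn_id="aws_default",
            pd_csv_kwargs={"index": False, "header": True},
        )

Because 'query' and 's3_key' are template fields, the {{ ds }} macros above are rendered per execution date, and pd_csv_kwargs is forwarded unchanged to pandas.DataFrame.to_csv (any of its keyword arguments except path_or_buf, which the operator rejects).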