From 98ad6bac444e493965762f2e4c66c37753e77dc2 Mon Sep 17 00:00:00 2001 From: mingrammer Date: Wed, 30 Oct 2019 02:42:24 +0900 Subject: [PATCH 1/4] [AIRFLOW-5803] Rename S3Hook to AWSS3Hook and update import paths --- UPDATING.md | 2 +- airflow/contrib/hooks/sagemaker_hook.py | 6 +- airflow/contrib/hooks/wasb_hook.py | 4 +- airflow/contrib/operators/dynamodb_to_s3.py | 4 +- .../imap_attachment_to_s3_operator.py | 4 +- airflow/contrib/operators/mongo_to_s3.py | 4 +- .../operators/s3_copy_object_operator.py | 4 +- .../operators/s3_delete_objects_operator.py | 4 +- airflow/contrib/operators/s3_list_operator.py | 4 +- .../contrib/operators/s3_to_gcs_operator.py | 4 +- .../contrib/operators/s3_to_sftp_operator.py | 4 +- .../contrib/operators/sftp_to_s3_operator.py | 4 +- airflow/hooks/S3_hook.py | 16 ++++- airflow/operators/gcs_to_s3.py | 6 +- .../operators/google_api_to_s3_transfer.py | 4 +- airflow/operators/redshift_to_s3_operator.py | 4 +- .../operators/s3_file_transform_operator.py | 6 +- airflow/operators/s3_to_hive_operator.py | 4 +- airflow/operators/s3_to_redshift_operator.py | 4 +- airflow/providers/aws/hooks/s3.py | 2 +- airflow/sensors/s3_key_sensor.py | 4 +- airflow/sensors/s3_prefix_sensor.py | 4 +- airflow/utils/log/s3_task_handler.py | 6 +- docs/howto/write-logs.rst | 2 +- tests/contrib/hooks/test_sagemaker_hook.py | 12 ++-- .../contrib/operators/test_dynamodb_to_s3.py | 2 +- .../test_imap_attachment_to_s3_operator.py | 2 +- .../operators/test_mongo_to_s3_operator.py | 2 +- .../operators/test_s3_list_operator.py | 2 +- .../operators/test_s3_to_gcs_operator.py | 8 +-- .../operators/test_s3_to_sftp_operator.py | 4 +- .../operators/test_sftp_to_s3_operator.py | 4 +- tests/hooks/test_s3_hook.py | 62 +++++++++---------- tests/operators/test_gcs_to_s3.py | 12 ++-- .../test_google_api_to_s3_transfer.py | 6 +- .../test_s3_file_transform_operator.py | 2 +- tests/operators/test_s3_to_hive_operator.py | 4 +- tests/sensors/test_s3_key_sensor.py | 4 +- tests/sensors/test_s3_prefix_sensor.py | 2 +- tests/test_core_to_contrib.py | 2 +- tests/utils/log/test_s3_task_handler.py | 10 +-- 41 files changed, 132 insertions(+), 118 deletions(-) diff --git a/UPDATING.md b/UPDATING.md index 932fb708d3144..984c2ca9cbed5 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -50,7 +50,7 @@ Migrated are: | Old path | New path | | ------------------------------------------------------------ | -------------------------------------------------------- | -| airflow.hooks.S3_hook.S3Hook | airflow.providers.aws.hooks.s3.S3Hook | +| airflow.hooks.S3_hook.S3Hook | airflow.providers.aws.hooks.s3.AWSS3Hook | | airflow.contrib.hooks.aws_athena_hook.AWSAthenaHook | airflow.providers.aws.hooks.athena.AWSAthenaHook | | airflow.contrib.operators.aws_athena_operator.AWSAthenaOperator | airflow.providers.aws.operators.athena.AWSAthenaOperator | | airflow.contrib.sensors.aws_athena_sensor.AthenaSensor | airflow.providers.aws.sensors.athena.AthenaSensor | diff --git a/airflow/contrib/hooks/sagemaker_hook.py b/airflow/contrib/hooks/sagemaker_hook.py index 1b91e74368475..e27d8f2ff46a2 100644 --- a/airflow/contrib/hooks/sagemaker_hook.py +++ b/airflow/contrib/hooks/sagemaker_hook.py @@ -28,7 +28,7 @@ from airflow.contrib.hooks.aws_hook import AwsHook from airflow.contrib.hooks.aws_logs_hook import AwsLogsHook from airflow.exceptions import AirflowException -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils import timezone @@ -130,7 +130,7 @@ class SageMakerHook(AwsHook): def 
__init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) + self.s3_hook = AWSS3Hook(aws_conn_id=self.aws_conn_id) self.logs_hook = AwsLogsHook(aws_conn_id=self.aws_conn_id) def tar_and_s3_upload(self, path, key, bucket): @@ -187,7 +187,7 @@ def check_s3_url(self, s3url): :type s3url: str :rtype: bool """ - bucket, key = S3Hook.parse_s3_url(s3url) + bucket, key = AWSS3Hook.parse_s3_url(s3url) if not self.s3_hook.check_for_bucket(bucket_name=bucket): raise AirflowException( "The input S3 Bucket {} does not exist ".format(bucket)) diff --git a/airflow/contrib/hooks/wasb_hook.py b/airflow/contrib/hooks/wasb_hook.py index b1eef4aba6ec1..08262fcd0a375 100644 --- a/airflow/contrib/hooks/wasb_hook.py +++ b/airflow/contrib/hooks/wasb_hook.py @@ -102,7 +102,7 @@ def load_file(self, file_path, container_name, blob_name, **kwargs): `BlockBlobService.create_blob_from_path()` takes. :type kwargs: object """ - # Reorder the argument order from airflow.hooks.S3_hook.load_file. + # Reorder the argument order from airflow.providers.aws.hooks.s3.load_file. self.connection.create_blob_from_path(container_name, blob_name, file_path, **kwargs) @@ -120,7 +120,7 @@ def load_string(self, string_data, container_name, blob_name, **kwargs): `BlockBlobService.create_blob_from_text()` takes. :type kwargs: object """ - # Reorder the argument order from airflow.hooks.S3_hook.load_string. + # Reorder the argument order from airflow.providers.aws.hooks.s3.load_string. self.connection.create_blob_from_text(container_name, blob_name, string_data, **kwargs) diff --git a/airflow/contrib/operators/dynamodb_to_s3.py b/airflow/contrib/operators/dynamodb_to_s3.py index 22b86a3c39279..6ff1c261ba0f9 100644 --- a/airflow/contrib/operators/dynamodb_to_s3.py +++ b/airflow/contrib/operators/dynamodb_to_s3.py @@ -32,7 +32,7 @@ from boto.compat import json # type: ignore from airflow.contrib.hooks.aws_dynamodb_hook import AwsDynamoDBHook -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models.baseoperator import BaseOperator @@ -41,7 +41,7 @@ def _convert_item_to_json_bytes(item): def _upload_file_to_s3(file_obj, bucket_name, s3_key_prefix): - s3_client = S3Hook().get_conn() + s3_client = AWSS3Hook().get_conn() file_obj.seek(0) s3_client.upload_file( Filename=file_obj.name, diff --git a/airflow/contrib/operators/imap_attachment_to_s3_operator.py b/airflow/contrib/operators/imap_attachment_to_s3_operator.py index 8e0b9c9c4f974..fe2d2a4d7dbda 100644 --- a/airflow/contrib/operators/imap_attachment_to_s3_operator.py +++ b/airflow/contrib/operators/imap_attachment_to_s3_operator.py @@ -20,7 +20,7 @@ This module allows you to transfer mail attachments from a mail server into s3 bucket. 
""" from airflow.contrib.hooks.imap_hook import ImapHook -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -92,7 +92,7 @@ def execute(self, context): mail_filter=self.imap_mail_filter, ) - s3_hook = S3Hook(aws_conn_id=self.s3_conn_id) + s3_hook = AWSS3Hook(aws_conn_id=self.s3_conn_id) s3_hook.load_bytes(bytes_data=imap_mail_attachments[0][1], key=self.s3_key, replace=self.s3_overwrite) diff --git a/airflow/contrib/operators/mongo_to_s3.py b/airflow/contrib/operators/mongo_to_s3.py index 72d2a0c5b79e6..d2bfa5e67ac1f 100644 --- a/airflow/contrib/operators/mongo_to_s3.py +++ b/airflow/contrib/operators/mongo_to_s3.py @@ -21,7 +21,7 @@ from bson import json_util from airflow.contrib.hooks.mongo_hook import MongoHook -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -73,7 +73,7 @@ def execute(self, context): """ Executed by task_instance at runtime """ - s3_conn = S3Hook(self.s3_conn_id) + s3_conn = AWSS3Hook(self.s3_conn_id) # Grab collection and execute query according to whether or not it is a pipeline if self.is_pipeline: diff --git a/airflow/contrib/operators/s3_copy_object_operator.py b/airflow/contrib/operators/s3_copy_object_operator.py index 3c4e99fea2979..b6799fff033a8 100644 --- a/airflow/contrib/operators/s3_copy_object_operator.py +++ b/airflow/contrib/operators/s3_copy_object_operator.py @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the License. -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -90,7 +90,7 @@ def __init__( self.verify = verify def execute(self, context): - s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + s3_hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) s3_hook.copy_object(self.source_bucket_key, self.dest_bucket_key, self.source_bucket_name, self.dest_bucket_name, self.source_version_id) diff --git a/airflow/contrib/operators/s3_delete_objects_operator.py b/airflow/contrib/operators/s3_delete_objects_operator.py index e22e60d29aa26..e951c5930fe8f 100644 --- a/airflow/contrib/operators/s3_delete_objects_operator.py +++ b/airflow/contrib/operators/s3_delete_objects_operator.py @@ -18,7 +18,7 @@ # under the License. 
from airflow.exceptions import AirflowException -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -75,7 +75,7 @@ def __init__( self.verify = verify def execute(self, context): - s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + s3_hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) response = s3_hook.delete_objects(bucket=self.bucket, keys=self.keys) diff --git a/airflow/contrib/operators/s3_list_operator.py b/airflow/contrib/operators/s3_list_operator.py index b47374c4b21ce..9592ff96346cd 100644 --- a/airflow/contrib/operators/s3_list_operator.py +++ b/airflow/contrib/operators/s3_list_operator.py @@ -19,7 +19,7 @@ from typing import Iterable -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -86,7 +86,7 @@ def __init__(self, self.verify = verify def execute(self, context): - hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) self.log.info( 'Getting the list of files from bucket: %s in prefix: %s (Delimiter {%s)', diff --git a/airflow/contrib/operators/s3_to_gcs_operator.py b/airflow/contrib/operators/s3_to_gcs_operator.py index 0c86e481da95d..fa2f4642018f1 100644 --- a/airflow/contrib/operators/s3_to_gcs_operator.py +++ b/airflow/contrib/operators/s3_to_gcs_operator.py @@ -22,7 +22,7 @@ from airflow.contrib.operators.s3_list_operator import S3ListOperator from airflow.exceptions import AirflowException from airflow.gcp.hooks.gcs import GoogleCloudStorageHook, _parse_gcs_url -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.decorators import apply_defaults @@ -176,7 +176,7 @@ def execute(self, context): 'There are no new files to sync. 
Have a nice day!') if files: - hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) for file in files: # GCS hook builds its own in-memory file so we have to create diff --git a/airflow/contrib/operators/s3_to_sftp_operator.py b/airflow/contrib/operators/s3_to_sftp_operator.py index 9fc95788f8091..990796917fa3a 100644 --- a/airflow/contrib/operators/s3_to_sftp_operator.py +++ b/airflow/contrib/operators/s3_to_sftp_operator.py @@ -21,7 +21,7 @@ from urllib.parse import urlparse from airflow.contrib.hooks.ssh_hook import SSHHook -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -76,7 +76,7 @@ def get_s3_key(s3_key): def execute(self, context): self.s3_key = self.get_s3_key(self.s3_key) ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id) - s3_hook = S3Hook(self.s3_conn_id) + s3_hook = AWSS3Hook(self.s3_conn_id) s3_client = s3_hook.get_conn() sftp_client = ssh_hook.get_conn().open_sftp() diff --git a/airflow/contrib/operators/sftp_to_s3_operator.py b/airflow/contrib/operators/sftp_to_s3_operator.py index 5dabb3bfc168c..208150818b37b 100644 --- a/airflow/contrib/operators/sftp_to_s3_operator.py +++ b/airflow/contrib/operators/sftp_to_s3_operator.py @@ -21,7 +21,7 @@ from urllib.parse import urlparse from airflow.contrib.hooks.ssh_hook import SSHHook -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -77,7 +77,7 @@ def get_s3_key(s3_key): def execute(self, context): self.s3_key = self.get_s3_key(self.s3_key) ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id) - s3_hook = S3Hook(self.s3_conn_id) + s3_hook = AWSS3Hook(self.s3_conn_id) sftp_client = ssh_hook.get_conn().open_sftp() diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py index f66cd0db3d488..b2a4e01788ffd 100644 --- a/airflow/hooks/S3_hook.py +++ b/airflow/hooks/S3_hook.py @@ -23,10 +23,24 @@ import warnings # pylint: disable=unused-import -from airflow.providers.aws.hooks.s3 import S3Hook, provide_bucket_name # noqa +from airflow.providers.aws.hooks.s3 import AWSS3Hook, provide_bucket_name # noqa warnings.warn( "This module is deprecated. Please use `airflow.providers.aws.hooks.s3`.", DeprecationWarning, stacklevel=2, ) + + +class S3Hook(AWSS3Hook): + """ + This class is deprecated. Please use `airflow.providers.aws.hooks.s3.AWSS3Hook`. + """ + + def __init__(self, *args, **kwargs): + warnings.warn( + "This class is deprecated. 
Please use `airflow.providers.aws.hooks.s3.AWSS3Hook`.", + DeprecationWarning, stacklevel=2 + ) + + super().__init__(*args, **kwargs) diff --git a/airflow/operators/gcs_to_s3.py b/airflow/operators/gcs_to_s3.py index b4b41c5fac816..8d2ee60d6825e 100644 --- a/airflow/operators/gcs_to_s3.py +++ b/airflow/operators/gcs_to_s3.py @@ -23,7 +23,7 @@ from airflow.gcp.hooks.gcs import GoogleCloudStorageHook from airflow.gcp.operators.gcs import GoogleCloudStorageListOperator -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.decorators import apply_defaults @@ -116,13 +116,13 @@ def __init__(self, # pylint: disable=too-many-arguments def execute(self, context): # use the super to list all files in an Google Cloud Storage bucket files = super().execute(context) - s3_hook = S3Hook(aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify) + s3_hook = AWSS3Hook(aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify) if not self.replace: # if we are not replacing -> list all files in the S3 bucket # and only keep those files which are present in # Google Cloud Storage and not in S3 - bucket_name, prefix = S3Hook.parse_s3_url(self.dest_s3_key) + bucket_name, prefix = AWSS3Hook.parse_s3_url(self.dest_s3_key) # look for the bucket and the prefix to avoid look into # parent directories/keys existing_files = s3_hook.list_keys(bucket_name, prefix=prefix) diff --git a/airflow/operators/google_api_to_s3_transfer.py b/airflow/operators/google_api_to_s3_transfer.py index d05ff4a42b372..4948c9e977cb3 100644 --- a/airflow/operators/google_api_to_s3_transfer.py +++ b/airflow/operators/google_api_to_s3_transfer.py @@ -24,7 +24,7 @@ import sys from airflow.gcp.hooks.discovery_api import GoogleDiscoveryApiHook -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator from airflow.models.xcom import MAX_XCOM_SIZE from airflow.utils.decorators import apply_defaults @@ -155,7 +155,7 @@ def _retrieve_data_from_google_api(self): return google_api_response def _load_data_to_s3(self, data): - s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) + s3_hook = AWSS3Hook(aws_conn_id=self.aws_conn_id) s3_hook.load_string( string_data=json.dumps(data), key=self.s3_destination_key, diff --git a/airflow/operators/redshift_to_s3_operator.py b/airflow/operators/redshift_to_s3_operator.py index 2aac34991fb97..55e6f9df1839d 100644 --- a/airflow/operators/redshift_to_s3_operator.py +++ b/airflow/operators/redshift_to_s3_operator.py @@ -22,7 +22,7 @@ from typing import List, Optional, Union from airflow.hooks.postgres_hook import PostgresHook -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -98,7 +98,7 @@ def __init__( # pylint: disable=too-many-arguments def execute(self, context): postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id) - s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + s3_hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) credentials = s3_hook.get_credentials() unload_options = '\n\t\t\t'.join(self.unload_options) diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py index bfcb733478f66..ebef0a95071f9 100644 --- a/airflow/operators/s3_file_transform_operator.py +++ b/airflow/operators/s3_file_transform_operator.py @@ -23,7 +23,7 @@ from 
typing import Optional, Union from airflow.exceptions import AirflowException -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -111,9 +111,9 @@ def execute(self, context): raise AirflowException( "Either transform_script or select_expression must be specified") - source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id, + source_s3 = AWSS3Hook(aws_conn_id=self.source_aws_conn_id, verify=self.source_verify) - dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id, + dest_s3 = AWSS3Hook(aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify) self.log.info("Downloading source S3 file %s", self.source_s3_key) diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py index 9c174e25849ae..84834b77ed039 100644 --- a/airflow/operators/s3_to_hive_operator.py +++ b/airflow/operators/s3_to_hive_operator.py @@ -26,7 +26,7 @@ from airflow.exceptions import AirflowException from airflow.hooks.hive_hooks import HiveCliHook -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator from airflow.utils.compression import uncompress_file from airflow.utils.decorators import apply_defaults @@ -148,7 +148,7 @@ def __init__( def execute(self, context): # Downloading file from S3 - self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + self.s3 = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) self.hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) self.log.info("Downloading S3 file") diff --git a/airflow/operators/s3_to_redshift_operator.py b/airflow/operators/s3_to_redshift_operator.py index 0b393edfca1cb..3c3c8da998b9a 100644 --- a/airflow/operators/s3_to_redshift_operator.py +++ b/airflow/operators/s3_to_redshift_operator.py @@ -19,7 +19,7 @@ from typing import List, Optional, Union from airflow.hooks.postgres_hook import PostgresHook -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -85,7 +85,7 @@ def __init__( def execute(self, context): self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id) - self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + self.s3 = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) credentials = self.s3.get_credentials() copy_options = '\n\t\t\t'.join(self.copy_options) diff --git a/airflow/providers/aws/hooks/s3.py b/airflow/providers/aws/hooks/s3.py index 7a1fc2daf457e..0fc25023cecf5 100644 --- a/airflow/providers/aws/hooks/s3.py +++ b/airflow/providers/aws/hooks/s3.py @@ -58,7 +58,7 @@ def has_arg(name): return wrapper -class S3Hook(AwsHook): +class AWSS3Hook(AwsHook): """ Interact with AWS S3, using the boto3 library. 
""" diff --git a/airflow/sensors/s3_key_sensor.py b/airflow/sensors/s3_key_sensor.py index 3eca9784ad5d0..76f5a8e95f1d1 100644 --- a/airflow/sensors/s3_key_sensor.py +++ b/airflow/sensors/s3_key_sensor.py @@ -88,8 +88,8 @@ def __init__(self, self.verify = verify def poke(self, context): - from airflow.hooks.S3_hook import S3Hook - hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + from airflow.providers.aws.hooks.s3 import AWSS3Hook + hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key) if self.wildcard_match: return hook.check_for_wildcard_key(self.bucket_key, diff --git a/airflow/sensors/s3_prefix_sensor.py b/airflow/sensors/s3_prefix_sensor.py index 18db735d10cfa..72308fbd32e67 100644 --- a/airflow/sensors/s3_prefix_sensor.py +++ b/airflow/sensors/s3_prefix_sensor.py @@ -73,8 +73,8 @@ def __init__(self, def poke(self, context): self.log.info('Poking for prefix : %s in bucket s3://%s', self.prefix, self.bucket_name) - from airflow.hooks.S3_hook import S3Hook - hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + from airflow.providers.aws.hooks.s3 import AWSS3Hook + hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) return hook.check_for_prefix( prefix=self.prefix, delimiter=self.delimiter, diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py index aa7f608d6974c..f101bdb126418 100644 --- a/airflow/utils/log/s3_task_handler.py +++ b/airflow/utils/log/s3_task_handler.py @@ -43,11 +43,11 @@ def __init__(self, base_log_folder, s3_log_folder, filename_template): def hook(self): remote_conn_id = conf.get('core', 'REMOTE_LOG_CONN_ID') try: - from airflow.hooks.S3_hook import S3Hook - return S3Hook(remote_conn_id) + from airflow.providers.aws.hooks.s3 import AWSS3Hook + return AWSS3Hook(remote_conn_id) except Exception: self.log.error( - 'Could not create an S3Hook with connection id "%s". ' + 'Could not create an AWSS3Hook with connection id "%s". ' 'Please make sure that airflow[aws] is installed and ' 'the S3 connection exists.', remote_conn_id ) diff --git a/docs/howto/write-logs.rst b/docs/howto/write-logs.rst index 38e4d4aa1e7f8..549a9285cee02 100644 --- a/docs/howto/write-logs.rst +++ b/docs/howto/write-logs.rst @@ -65,7 +65,7 @@ To enable this feature, ``airflow.cfg`` must be configured as follows: # Use server-side encryption for logs stored in S3 encrypt_s3_logs = False -In the above example, Airflow will try to use ``S3Hook('MyS3Conn')``. +In the above example, Airflow will try to use ``AWSS3Hook('MyS3Conn')``. .. 
_write-logs-azure: diff --git a/tests/contrib/hooks/test_sagemaker_hook.py b/tests/contrib/hooks/test_sagemaker_hook.py index 2d9d26b2ed79d..cde30a0024f39 100644 --- a/tests/contrib/hooks/test_sagemaker_hook.py +++ b/tests/contrib/hooks/test_sagemaker_hook.py @@ -29,7 +29,7 @@ LogState, SageMakerHook, secondary_training_status_changed, secondary_training_status_message, ) from airflow.exceptions import AirflowException -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from tests.compat import mock role = 'arn:aws:iam:role/test-role' @@ -254,8 +254,8 @@ def test_multi_stream_iter(self, mock_log_stream): event_iter = hook.multi_stream_iter('log', [None, None, None]) self.assertEqual(next(event_iter), (0, event)) - @mock.patch.object(S3Hook, 'create_bucket') - @mock.patch.object(S3Hook, 'load_file') + @mock.patch.object(AWSS3Hook, 'create_bucket') + @mock.patch.object(AWSS3Hook, 'load_file') def test_configure_s3_resources(self, mock_load_file, mock_create_bucket): hook = SageMakerHook() evaluation_result = { @@ -268,9 +268,9 @@ def test_configure_s3_resources(self, mock_load_file, mock_create_bucket): mock_load_file.assert_called_once_with(path, key, bucket) @mock.patch.object(SageMakerHook, 'get_conn') - @mock.patch.object(S3Hook, 'check_for_key') - @mock.patch.object(S3Hook, 'check_for_bucket') - @mock.patch.object(S3Hook, 'check_for_prefix') + @mock.patch.object(AWSS3Hook, 'check_for_key') + @mock.patch.object(AWSS3Hook, 'check_for_bucket') + @mock.patch.object(AWSS3Hook, 'check_for_prefix') def test_check_s3_url(self, mock_check_prefix, mock_check_bucket, diff --git a/tests/contrib/operators/test_dynamodb_to_s3.py b/tests/contrib/operators/test_dynamodb_to_s3.py index ea09ebaba1ed0..4dfda20f44c54 100644 --- a/tests/contrib/operators/test_dynamodb_to_s3.py +++ b/tests/contrib/operators/test_dynamodb_to_s3.py @@ -45,7 +45,7 @@ def output_queue_to_list(self): items.append(self.output_queue.get()) return items - @patch('airflow.contrib.operators.dynamodb_to_s3.S3Hook') + @patch('airflow.contrib.operators.dynamodb_to_s3.AWSS3Hook') @patch('airflow.contrib.operators.dynamodb_to_s3.AwsDynamoDBHook') def test_dynamodb_to_s3_success(self, mock_aws_dynamodb_hook, mock_s3_hook): responses = [ diff --git a/tests/contrib/operators/test_imap_attachment_to_s3_operator.py b/tests/contrib/operators/test_imap_attachment_to_s3_operator.py index b563c974a74f6..5e78997d07b12 100644 --- a/tests/contrib/operators/test_imap_attachment_to_s3_operator.py +++ b/tests/contrib/operators/test_imap_attachment_to_s3_operator.py @@ -37,7 +37,7 @@ def setUp(self): dag=None ) - @patch('airflow.contrib.operators.imap_attachment_to_s3_operator.S3Hook') + @patch('airflow.contrib.operators.imap_attachment_to_s3_operator.AWSS3Hook') @patch('airflow.contrib.operators.imap_attachment_to_s3_operator.ImapHook') def test_execute(self, mock_imap_hook, mock_s3_hook): mock_imap_hook.return_value.__enter__ = mock_imap_hook diff --git a/tests/contrib/operators/test_mongo_to_s3_operator.py b/tests/contrib/operators/test_mongo_to_s3_operator.py index fd04f1222316e..7b9acd3918326 100644 --- a/tests/contrib/operators/test_mongo_to_s3_operator.py +++ b/tests/contrib/operators/test_mongo_to_s3_operator.py @@ -84,7 +84,7 @@ def test_render_template(self): ) @mock.patch('airflow.contrib.operators.mongo_to_s3.MongoHook') - @mock.patch('airflow.contrib.operators.mongo_to_s3.S3Hook') + @mock.patch('airflow.contrib.operators.mongo_to_s3.AWSS3Hook') def test_execute(self, mock_s3_hook, mock_mongo_hook): 
operator = self.mock_operator diff --git a/tests/contrib/operators/test_s3_list_operator.py b/tests/contrib/operators/test_s3_list_operator.py index 5c418ff67bbdb..bd32d4e8b2cc7 100644 --- a/tests/contrib/operators/test_s3_list_operator.py +++ b/tests/contrib/operators/test_s3_list_operator.py @@ -30,7 +30,7 @@ class TestS3ListOperator(unittest.TestCase): - @mock.patch('airflow.contrib.operators.s3_list_operator.S3Hook') + @mock.patch('airflow.contrib.operators.s3_list_operator.AWSS3Hook') def test_execute(self, mock_hook): mock_hook.return_value.list_keys.return_value = MOCK_FILES diff --git a/tests/contrib/operators/test_s3_to_gcs_operator.py b/tests/contrib/operators/test_s3_to_gcs_operator.py index 42821a755fe88..db83a1b6554bb 100644 --- a/tests/contrib/operators/test_s3_to_gcs_operator.py +++ b/tests/contrib/operators/test_s3_to_gcs_operator.py @@ -51,8 +51,8 @@ def test_init(self): self.assertEqual(operator.gcp_conn_id, GCS_CONN_ID) self.assertEqual(operator.dest_gcs, GCS_PATH_PREFIX) - @mock.patch('airflow.contrib.operators.s3_to_gcs_operator.S3Hook') - @mock.patch('airflow.contrib.operators.s3_list_operator.S3Hook') + @mock.patch('airflow.contrib.operators.s3_to_gcs_operator.AWSS3Hook') + @mock.patch('airflow.contrib.operators.s3_list_operator.AWSS3Hook') @mock.patch( 'airflow.contrib.operators.s3_to_gcs_operator.GoogleCloudStorageHook') def test_execute(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook): @@ -86,8 +86,8 @@ def test_execute(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook): # we expect MOCK_FILES to be uploaded self.assertEqual(sorted(MOCK_FILES), sorted(uploaded_files)) - @mock.patch('airflow.contrib.operators.s3_to_gcs_operator.S3Hook') - @mock.patch('airflow.contrib.operators.s3_list_operator.S3Hook') + @mock.patch('airflow.contrib.operators.s3_to_gcs_operator.AWSS3Hook') + @mock.patch('airflow.contrib.operators.s3_list_operator.AWSS3Hook') @mock.patch( 'airflow.contrib.operators.s3_to_gcs_operator.GoogleCloudStorageHook') def test_execute_with_gzip(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook): diff --git a/tests/contrib/operators/test_s3_to_sftp_operator.py b/tests/contrib/operators/test_s3_to_sftp_operator.py index 829e57c4d33f6..90ace66d0c00d 100644 --- a/tests/contrib/operators/test_s3_to_sftp_operator.py +++ b/tests/contrib/operators/test_s3_to_sftp_operator.py @@ -61,10 +61,10 @@ class TestS3ToSFTPOperator(unittest.TestCase): @mock_s3 def setUp(self): from airflow.contrib.hooks.ssh_hook import SSHHook - from airflow.hooks.S3_hook import S3Hook + from airflow.providers.aws.hooks.s3 import AWSS3Hook hook = SSHHook(ssh_conn_id='ssh_default') - s3_hook = S3Hook('aws_default') + s3_hook = AWSS3Hook('aws_default') hook.no_host_key_check = True args = { 'owner': 'airflow', diff --git a/tests/contrib/operators/test_sftp_to_s3_operator.py b/tests/contrib/operators/test_sftp_to_s3_operator.py index 1e444ecb76b00..72ce78ee52a1e 100644 --- a/tests/contrib/operators/test_sftp_to_s3_operator.py +++ b/tests/contrib/operators/test_sftp_to_s3_operator.py @@ -26,7 +26,7 @@ from airflow.contrib.hooks.ssh_hook import SSHHook from airflow.contrib.operators.sftp_to_s3_operator import SFTPToS3Operator from airflow.contrib.operators.ssh_operator import SSHOperator -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import DAG, TaskInstance from airflow.settings import Session from airflow.utils import timezone @@ -62,7 +62,7 @@ class TestSFTPToS3Operator(unittest.TestCase): @mock_s3 def 
setUp(self): hook = SSHHook(ssh_conn_id='ssh_default') - s3_hook = S3Hook('aws_default') + s3_hook = AWSS3Hook('aws_default') hook.no_host_key_check = True args = { 'owner': 'airflow', diff --git a/tests/hooks/test_s3_hook.py b/tests/hooks/test_s3_hook.py index a06b17b4ec003..9991ffe541ff2 100644 --- a/tests/hooks/test_s3_hook.py +++ b/tests/hooks/test_s3_hook.py @@ -23,13 +23,13 @@ from botocore.exceptions import NoCredentialsError -from airflow.hooks.S3_hook import provide_bucket_name +from airflow.providers.aws.hooks.s3 import provide_bucket_name from airflow.models import Connection try: - from airflow.hooks.S3_hook import S3Hook + from airflow.providers.aws.hooks.s3 import AWSS3Hook except ImportError: - S3Hook = None # type: ignore + AWSS3Hook = None # type: ignore try: import boto3 @@ -38,24 +38,24 @@ mock_s3 = None -@unittest.skipIf(S3Hook is None, - "Skipping test because S3Hook is not available") +@unittest.skipIf(AWSS3Hook is None, + "Skipping test because AWSS3Hook is not available") @unittest.skipIf(mock_s3 is None, "Skipping test because moto.mock_s3 is not available") -class TestS3Hook(unittest.TestCase): +class TestAWSS3Hook(unittest.TestCase): def setUp(self): self.s3_test_url = "s3://test/this/is/not/a-real-key.txt" def test_parse_s3_url(self): - parsed = S3Hook.parse_s3_url(self.s3_test_url) + parsed = AWSS3Hook.parse_s3_url(self.s3_test_url) self.assertEqual(parsed, ("test", "this/is/not/a-real-key.txt"), "Incorrect parsing of the s3 url") @mock_s3 def test_check_for_bucket(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() @@ -63,27 +63,27 @@ def test_check_for_bucket(self): self.assertFalse(hook.check_for_bucket('not-a-bucket')) def test_check_for_bucket_raises_error_with_invalid_conn_id(self): - hook = S3Hook(aws_conn_id="does_not_exist") + hook = AWSS3Hook(aws_conn_id="does_not_exist") with self.assertRaises(NoCredentialsError): hook.check_for_bucket('bucket') @mock_s3 def test_get_bucket(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') self.assertIsNotNone(bucket) @mock_s3 def test_create_bucket_default_region(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) hook.create_bucket(bucket_name='new_bucket') bucket = hook.get_bucket('new_bucket') self.assertIsNotNone(bucket) @mock_s3 def test_create_bucket_us_standard_region(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) hook.create_bucket(bucket_name='new_bucket', region_name='us-east-1') bucket = hook.get_bucket('new_bucket') self.assertIsNotNone(bucket) @@ -92,7 +92,7 @@ def test_create_bucket_us_standard_region(self): @mock_s3 def test_create_bucket_other_region(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) hook.create_bucket(bucket_name='new_bucket', region_name='us-east-2') bucket = hook.get_bucket('new_bucket') self.assertIsNotNone(bucket) @@ -101,7 +101,7 @@ def test_create_bucket_other_region(self): @mock_s3 def test_check_for_prefix(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='a', Body=b'a') @@ -112,7 +112,7 @@ def test_check_for_prefix(self): @mock_s3 def test_list_prefixes(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='a', Body=b'a') @@ -125,7 +125,7 @@ def 
test_list_prefixes(self): @mock_s3 def test_list_prefixes_paged(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() @@ -142,7 +142,7 @@ def test_list_prefixes_paged(self): @mock_s3 def test_list_keys(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='a', Body=b'a') @@ -155,7 +155,7 @@ def test_list_keys(self): @mock_s3 def test_list_keys_paged(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() @@ -169,7 +169,7 @@ def test_list_keys_paged(self): @mock_s3 def test_check_for_key(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='a', Body=b'a') @@ -180,14 +180,14 @@ def test_check_for_key(self): self.assertFalse(hook.check_for_key('s3://bucket//b')) def test_check_for_key_raises_error_with_invalid_conn_id(self): - hook = S3Hook(aws_conn_id="does_not_exist") + hook = AWSS3Hook(aws_conn_id="does_not_exist") with self.assertRaises(NoCredentialsError): hook.check_for_key('a', 'bucket') @mock_s3 def test_get_key(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='a', Body=b'a') @@ -197,7 +197,7 @@ def test_get_key(self): @mock_s3 def test_read_key(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) conn = hook.get_conn() # We need to create the bucket since this is all in Moto's 'virtual' # AWS account @@ -211,12 +211,12 @@ def test_read_key(self): def test_select_key(self, mock_get_client_type): mock_get_client_type.return_value.select_object_content.return_value = \ {'Payload': [{'Records': {'Payload': b'Cont\xC3\xA9nt'}}]} - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) self.assertEqual(hook.select_key('my_key', 'mybucket'), 'Contént') @mock_s3 def test_check_for_wildcard_key(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='abc', Body=b'a') @@ -233,7 +233,7 @@ def test_check_for_wildcard_key(self): @mock_s3 def test_get_wildcard_key(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='abc', Body=b'a') @@ -256,7 +256,7 @@ def test_get_wildcard_key(self): @mock_s3 def test_load_string(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) conn = hook.get_conn() # We need to create the bucket since this is all in Moto's 'virtual' # AWS account @@ -269,7 +269,7 @@ def test_load_string(self): @mock_s3 def test_load_bytes(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) conn = hook.get_conn() # We need to create the bucket since this is all in Moto's 'virtual' # AWS account @@ -282,7 +282,7 @@ def test_load_bytes(self): @mock_s3 def test_load_fileobj(self): - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) conn = hook.get_conn() # We need to create the bucket since this is all in Moto's 'virtual' # AWS account @@ -295,10 +295,10 @@ def test_load_fileobj(self): self.assertEqual(body, b'Content') - @mock.patch.object(S3Hook, 'get_connection', return_value=Connection(schema='test_bucket')) + @mock.patch.object(AWSS3Hook, 'get_connection', 
return_value=Connection(schema='test_bucket')) def test_provide_bucket_name(self, mock_get_connection): - class FakeS3Hook(S3Hook): + class FakeAWSS3Hook(AWSS3Hook): @provide_bucket_name def test_function(self, bucket_name=None): @@ -314,7 +314,7 @@ def test_function_with_key(self, key, bucket_name=None): def test_function_with_wildcard_key(self, wildcard_key, bucket_name=None): return bucket_name - fake_s3_hook = FakeS3Hook() + fake_s3_hook = FakeAWSS3Hook() test_bucket_name = fake_s3_hook.test_function() test_bucket_name_with_key = fake_s3_hook.test_function_with_key('test_key') test_bucket_name_with_wildcard_key = fake_s3_hook.test_function_with_wildcard_key('test_*_key') diff --git a/tests/operators/test_gcs_to_s3.py b/tests/operators/test_gcs_to_s3.py index 4e9d2d2a8df5a..5f7a45b26eb83 100644 --- a/tests/operators/test_gcs_to_s3.py +++ b/tests/operators/test_gcs_to_s3.py @@ -19,7 +19,7 @@ import unittest -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator from tests.compat import mock @@ -55,7 +55,7 @@ def test_execute_incremental(self, mock_hook, mock_hook2): dest_s3_key=S3_BUCKET, replace=False) # create dest bucket - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) b = hook.get_bucket('bucket') b.create() b.put_object(Key=MOCK_FILES[0], Body=b'testing') @@ -85,7 +85,7 @@ def test_execute_without_replace(self, mock_hook, mock_hook2): dest_s3_key=S3_BUCKET, replace=False) # create dest bucket with all the files - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) b = hook.get_bucket('bucket') b.create() [b.put_object(Key=MOCK_FILE, Body=b'testing') for MOCK_FILE in MOCK_FILES] @@ -115,7 +115,7 @@ def test_execute(self, mock_hook, mock_hook2): dest_s3_key=S3_BUCKET, replace=False) # create dest bucket without files - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) b = hook.get_bucket('bucket') b.create() @@ -144,7 +144,7 @@ def test_execute_with_replace(self, mock_hook, mock_hook2): dest_s3_key=S3_BUCKET, replace=True) # create dest bucket with all the files - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) b = hook.get_bucket('bucket') b.create() [b.put_object(Key=MOCK_FILE, Body=b'testing') for MOCK_FILE in MOCK_FILES] @@ -174,7 +174,7 @@ def test_execute_incremental_with_replace(self, mock_hook, mock_hook2): dest_s3_key=S3_BUCKET, replace=True) # create dest bucket with just two files (the first two files in MOCK_FILES) - hook = S3Hook(aws_conn_id=None) + hook = AWSS3Hook(aws_conn_id=None) b = hook.get_bucket('bucket') b.create() [b.put_object(Key=MOCK_FILE, Body=b'testing') for MOCK_FILE in MOCK_FILES[:2]] diff --git a/tests/operators/test_google_api_to_s3_transfer.py b/tests/operators/test_google_api_to_s3_transfer.py index 14cea11c01735..79cc80c7c8651 100644 --- a/tests/operators/test_google_api_to_s3_transfer.py +++ b/tests/operators/test_google_api_to_s3_transfer.py @@ -66,7 +66,7 @@ def setUp(self): } @patch('airflow.operators.google_api_to_s3_transfer.GoogleDiscoveryApiHook.query') - @patch('airflow.operators.google_api_to_s3_transfer.S3Hook.load_string') + @patch('airflow.operators.google_api_to_s3_transfer.AWSS3Hook.load_string') @patch('airflow.operators.google_api_to_s3_transfer.json.dumps') def test_execute(self, mock_json_dumps, mock_s3_hook_load_string, mock_google_api_hook_query): context = {'task_instance': Mock()} @@ -89,7 +89,7 @@ def test_execute(self, mock_json_dumps, 
mock_s3_hook_load_string, mock_google_ap context['task_instance'].xcom_push.assert_not_called() @patch('airflow.operators.google_api_to_s3_transfer.GoogleDiscoveryApiHook.query') - @patch('airflow.operators.google_api_to_s3_transfer.S3Hook.load_string') + @patch('airflow.operators.google_api_to_s3_transfer.AWSS3Hook.load_string') @patch('airflow.operators.google_api_to_s3_transfer.json.dumps') def test_execute_with_xcom(self, mock_json_dumps, mock_s3_hook_load_string, mock_google_api_hook_query): context = {'task_instance': Mock()} @@ -124,7 +124,7 @@ def test_execute_with_xcom(self, mock_json_dumps, mock_s3_hook_load_string, mock ) @patch('airflow.operators.google_api_to_s3_transfer.GoogleDiscoveryApiHook.query') - @patch('airflow.operators.google_api_to_s3_transfer.S3Hook.load_string') + @patch('airflow.operators.google_api_to_s3_transfer.AWSS3Hook.load_string') @patch('airflow.operators.google_api_to_s3_transfer.json.dumps') @patch('airflow.operators.google_api_to_s3_transfer.sys.getsizeof', return_value=MAX_XCOM_SIZE) def test_execute_with_xcom_exceeded_max_xcom_size( diff --git a/tests/operators/test_s3_file_transform_operator.py b/tests/operators/test_s3_file_transform_operator.py index f7dbed2f6a2bc..6aae9a44788e3 100644 --- a/tests/operators/test_s3_file_transform_operator.py +++ b/tests/operators/test_s3_file_transform_operator.py @@ -112,7 +112,7 @@ def test_execute_with_failing_transform_script(self, mock_Popen): self.assertEqual('Transform script failed: 42', str(e.exception)) - @mock.patch('airflow.hooks.S3_hook.S3Hook.select_key', return_value="input") + @mock.patch('airflow.providers.aws.hooks.s3.AWSS3Hook.select_key', return_value="input") @mock_s3 def test_execute_with_select_expression(self, mock_select_key): bucket = "bucket" diff --git a/tests/operators/test_s3_to_hive_operator.py b/tests/operators/test_s3_to_hive_operator.py index d1241981bc560..ecee94f7a6e0c 100644 --- a/tests/operators/test_s3_to_hive_operator.py +++ b/tests/operators/test_s3_to_hive_operator.py @@ -247,7 +247,7 @@ def test_execute_with_select_expression(self, mock_hiveclihook): select_expression = "SELECT * FROM S3Object s" bucket = 'bucket' - # Only testing S3ToHiveTransfer calls S3Hook.select_key with + # Only testing S3ToHiveTransfer calls AWSS3Hook.select_key with # the right parameters and its execute method succeeds here, # since Moto doesn't support select_object_content as of 1.3.2. 
for (ext, has_header) in product(['.txt', '.gz', '.GZ'], [True, False]): @@ -274,7 +274,7 @@ def test_execute_with_select_expression(self, mock_hiveclihook): input_serialization['CSV']['FileHeaderInfo'] = 'USE' # Confirm that select_key was called with the right params - with mock.patch('airflow.hooks.S3_hook.S3Hook.select_key', + with mock.patch('airflow.providers.aws.hooks.s3.AWSS3Hook.select_key', return_value="") as mock_select_key: # Execute S3ToHiveTransfer s32hive = S3ToHiveTransfer(**self.kwargs) diff --git a/tests/sensors/test_s3_key_sensor.py b/tests/sensors/test_s3_key_sensor.py index 70ebfebf389a8..da750d77fbc1f 100644 --- a/tests/sensors/test_s3_key_sensor.py +++ b/tests/sensors/test_s3_key_sensor.py @@ -64,7 +64,7 @@ def test_parse_bucket_key(self, key, bucket, parsed_key, parsed_bucket): self.assertEqual(s.bucket_key, parsed_key) self.assertEqual(s.bucket_name, parsed_bucket) - @mock.patch('airflow.hooks.S3_hook.S3Hook') + @mock.patch('airflow.providers.aws.hooks.s3.AWSS3Hook') def test_poke(self, mock_hook): s = S3KeySensor( task_id='s3_key_sensor', @@ -78,7 +78,7 @@ def test_poke(self, mock_hook): mock_hook.return_value.check_for_key.return_value = True self.assertTrue(s.poke(None)) - @mock.patch('airflow.hooks.S3_hook.S3Hook') + @mock.patch('airflow.providers.aws.hooks.s3.AWSS3Hook') def test_poke_wildcard(self, mock_hook): s = S3KeySensor( task_id='s3_key_sensor', diff --git a/tests/sensors/test_s3_prefix_sensor.py b/tests/sensors/test_s3_prefix_sensor.py index f7a5bfa87d617..40d300226a9c5 100644 --- a/tests/sensors/test_s3_prefix_sensor.py +++ b/tests/sensors/test_s3_prefix_sensor.py @@ -25,7 +25,7 @@ class TestS3PrefixSensor(unittest.TestCase): - @mock.patch('airflow.hooks.S3_hook.S3Hook') + @mock.patch('airflow.providers.aws.hooks.s3.AWSS3Hook') def test_poke(self, mock_hook): s = S3PrefixSensor( task_id='s3_prefix', diff --git a/tests/test_core_to_contrib.py b/tests/test_core_to_contrib.py index d42a548bf1bef..f2696238ed9c9 100644 --- a/tests/test_core_to_contrib.py +++ b/tests/test_core_to_contrib.py @@ -133,7 +133,7 @@ "airflow.contrib.hooks.aws_athena_hook.AWSAthenaHook", ), ( - "airflow.providers.aws.hooks.s3.S3Hook", + "airflow.providers.aws.hooks.s3.AWSS3Hook", "airflow.hooks.S3_hook.S3Hook", ), ] diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py index 2557d796bdca4..0a78c029b5b79 100644 --- a/tests/utils/log/test_s3_task_handler.py +++ b/tests/utils/log/test_s3_task_handler.py @@ -21,7 +21,7 @@ import unittest from unittest import mock -from airflow.hooks.S3_hook import S3Hook +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import DAG, TaskInstance from airflow.operators.dummy_operator import DummyOperator from airflow.utils.log.s3_task_handler import S3TaskHandler @@ -76,18 +76,18 @@ def tearDown(self): pass def test_hook(self): - self.assertIsInstance(self.s3_task_handler.hook, S3Hook) + self.assertIsInstance(self.s3_task_handler.hook, AWSS3Hook) def test_hook_raises(self): handler = self.s3_task_handler with mock.patch.object(handler.log, 'error') as mock_error: - with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook: + with mock.patch("airflow.providers.aws.hooks.s3.AWSS3Hook") as mock_hook: mock_hook.side_effect = Exception('Failed to connect') # Initialize the hook handler.hook mock_error.assert_called_once_with( - 'Could not create an S3Hook with connection id "%s". Please make ' + 'Could not create an AWSS3Hook with connection id "%s". 
Please make ' 'sure that airflow[aws] is installed and the S3 connection exists.', '' ) @@ -103,7 +103,7 @@ def test_log_exists_raises(self): self.assertFalse(self.s3_task_handler.s3_log_exists('s3://nonexistentbucket/foo')) def test_log_exists_no_hook(self): - with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook: + with mock.patch("airflow.providers.aws.hooks.s3.AWSS3Hook") as mock_hook: mock_hook.side_effect = Exception('Failed to connect') self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location)) From cf251e3b5231c0b7e414fcf961055e178082e38b Mon Sep 17 00:00:00 2001 From: mingrammer Date: Wed, 30 Oct 2019 03:01:02 +0900 Subject: [PATCH 2/4] Fix import sort with isort --- airflow/contrib/operators/dynamodb_to_s3.py | 2 +- airflow/contrib/operators/imap_attachment_to_s3_operator.py | 2 +- airflow/contrib/operators/mongo_to_s3.py | 2 +- airflow/contrib/operators/s3_copy_object_operator.py | 2 +- airflow/contrib/operators/s3_delete_objects_operator.py | 2 +- airflow/contrib/operators/s3_list_operator.py | 2 +- airflow/contrib/operators/s3_to_sftp_operator.py | 2 +- airflow/contrib/operators/sftp_to_s3_operator.py | 2 +- airflow/operators/google_api_to_s3_transfer.py | 2 +- airflow/operators/redshift_to_s3_operator.py | 2 +- airflow/operators/s3_file_transform_operator.py | 6 +++--- airflow/operators/s3_to_hive_operator.py | 2 +- airflow/operators/s3_to_redshift_operator.py | 2 +- tests/contrib/operators/test_sftp_to_s3_operator.py | 2 +- tests/hooks/test_s3_hook.py | 2 +- tests/operators/test_gcs_to_s3.py | 2 +- tests/utils/log/test_s3_task_handler.py | 2 +- 17 files changed, 19 insertions(+), 19 deletions(-) diff --git a/airflow/contrib/operators/dynamodb_to_s3.py b/airflow/contrib/operators/dynamodb_to_s3.py index 6ff1c261ba0f9..4fa394c104776 100644 --- a/airflow/contrib/operators/dynamodb_to_s3.py +++ b/airflow/contrib/operators/dynamodb_to_s3.py @@ -32,8 +32,8 @@ from boto.compat import json # type: ignore from airflow.contrib.hooks.aws_dynamodb_hook import AwsDynamoDBHook -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models.baseoperator import BaseOperator +from airflow.providers.aws.hooks.s3 import AWSS3Hook def _convert_item_to_json_bytes(item): diff --git a/airflow/contrib/operators/imap_attachment_to_s3_operator.py b/airflow/contrib/operators/imap_attachment_to_s3_operator.py index fe2d2a4d7dbda..b7a232375eafd 100644 --- a/airflow/contrib/operators/imap_attachment_to_s3_operator.py +++ b/airflow/contrib/operators/imap_attachment_to_s3_operator.py @@ -20,8 +20,8 @@ This module allows you to transfer mail attachments from a mail server into s3 bucket. 
""" from airflow.contrib.hooks.imap_hook import ImapHook -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.decorators import apply_defaults diff --git a/airflow/contrib/operators/mongo_to_s3.py b/airflow/contrib/operators/mongo_to_s3.py index d2bfa5e67ac1f..265eb03172a6d 100644 --- a/airflow/contrib/operators/mongo_to_s3.py +++ b/airflow/contrib/operators/mongo_to_s3.py @@ -21,8 +21,8 @@ from bson import json_util from airflow.contrib.hooks.mongo_hook import MongoHook -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.decorators import apply_defaults diff --git a/airflow/contrib/operators/s3_copy_object_operator.py b/airflow/contrib/operators/s3_copy_object_operator.py index b6799fff033a8..00d80370ead4b 100644 --- a/airflow/contrib/operators/s3_copy_object_operator.py +++ b/airflow/contrib/operators/s3_copy_object_operator.py @@ -17,8 +17,8 @@ # specific language governing permissions and limitations # under the License. -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.decorators import apply_defaults diff --git a/airflow/contrib/operators/s3_delete_objects_operator.py b/airflow/contrib/operators/s3_delete_objects_operator.py index e951c5930fe8f..fcb11c56b3178 100644 --- a/airflow/contrib/operators/s3_delete_objects_operator.py +++ b/airflow/contrib/operators/s3_delete_objects_operator.py @@ -18,8 +18,8 @@ # under the License. from airflow.exceptions import AirflowException -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.decorators import apply_defaults diff --git a/airflow/contrib/operators/s3_list_operator.py b/airflow/contrib/operators/s3_list_operator.py index 9592ff96346cd..4d52cc78a1637 100644 --- a/airflow/contrib/operators/s3_list_operator.py +++ b/airflow/contrib/operators/s3_list_operator.py @@ -19,8 +19,8 @@ from typing import Iterable -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.decorators import apply_defaults diff --git a/airflow/contrib/operators/s3_to_sftp_operator.py b/airflow/contrib/operators/s3_to_sftp_operator.py index 990796917fa3a..05927d9f44abb 100644 --- a/airflow/contrib/operators/s3_to_sftp_operator.py +++ b/airflow/contrib/operators/s3_to_sftp_operator.py @@ -21,8 +21,8 @@ from urllib.parse import urlparse from airflow.contrib.hooks.ssh_hook import SSHHook -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.decorators import apply_defaults diff --git a/airflow/contrib/operators/sftp_to_s3_operator.py b/airflow/contrib/operators/sftp_to_s3_operator.py index 208150818b37b..7195730e44587 100644 --- a/airflow/contrib/operators/sftp_to_s3_operator.py +++ b/airflow/contrib/operators/sftp_to_s3_operator.py @@ -21,8 +21,8 @@ from urllib.parse import urlparse from airflow.contrib.hooks.ssh_hook import SSHHook -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator +from airflow.providers.aws.hooks.s3 import AWSS3Hook from 
airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/google_api_to_s3_transfer.py b/airflow/operators/google_api_to_s3_transfer.py index 4948c9e977cb3..0f12f535c0031 100644 --- a/airflow/operators/google_api_to_s3_transfer.py +++ b/airflow/operators/google_api_to_s3_transfer.py @@ -24,9 +24,9 @@ import sys from airflow.gcp.hooks.discovery_api import GoogleDiscoveryApiHook -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator from airflow.models.xcom import MAX_XCOM_SIZE +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/redshift_to_s3_operator.py b/airflow/operators/redshift_to_s3_operator.py index 55e6f9df1839d..e088ca02a10e7 100644 --- a/airflow/operators/redshift_to_s3_operator.py +++ b/airflow/operators/redshift_to_s3_operator.py @@ -22,8 +22,8 @@ from typing import List, Optional, Union from airflow.hooks.postgres_hook import PostgresHook -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py index ebef0a95071f9..343d191ea93cd 100644 --- a/airflow/operators/s3_file_transform_operator.py +++ b/airflow/operators/s3_file_transform_operator.py @@ -23,8 +23,8 @@ from typing import Optional, Union from airflow.exceptions import AirflowException -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.decorators import apply_defaults @@ -112,9 +112,9 @@ def execute(self, context): "Either transform_script or select_expression must be specified") source_s3 = AWSS3Hook(aws_conn_id=self.source_aws_conn_id, - verify=self.source_verify) + verify=self.source_verify) dest_s3 = AWSS3Hook(aws_conn_id=self.dest_aws_conn_id, - verify=self.dest_verify) + verify=self.dest_verify) self.log.info("Downloading source S3 file %s", self.source_s3_key) if not source_s3.check_for_key(self.source_s3_key): diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py index 84834b77ed039..ab6ac06a1f4c9 100644 --- a/airflow/operators/s3_to_hive_operator.py +++ b/airflow/operators/s3_to_hive_operator.py @@ -26,8 +26,8 @@ from airflow.exceptions import AirflowException from airflow.hooks.hive_hooks import HiveCliHook -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.compression import uncompress_file from airflow.utils.decorators import apply_defaults from airflow.utils.file import TemporaryDirectory diff --git a/airflow/operators/s3_to_redshift_operator.py b/airflow/operators/s3_to_redshift_operator.py index 3c3c8da998b9a..f203a0ae9c0aa 100644 --- a/airflow/operators/s3_to_redshift_operator.py +++ b/airflow/operators/s3_to_redshift_operator.py @@ -19,8 +19,8 @@ from typing import List, Optional, Union from airflow.hooks.postgres_hook import PostgresHook -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import BaseOperator +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.decorators import apply_defaults diff --git a/tests/contrib/operators/test_sftp_to_s3_operator.py 
b/tests/contrib/operators/test_sftp_to_s3_operator.py index 72ce78ee52a1e..99411cce0fa04 100644 --- a/tests/contrib/operators/test_sftp_to_s3_operator.py +++ b/tests/contrib/operators/test_sftp_to_s3_operator.py @@ -26,8 +26,8 @@ from airflow.contrib.hooks.ssh_hook import SSHHook from airflow.contrib.operators.sftp_to_s3_operator import SFTPToS3Operator from airflow.contrib.operators.ssh_operator import SSHOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import DAG, TaskInstance +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.settings import Session from airflow.utils import timezone from airflow.utils.timezone import datetime diff --git a/tests/hooks/test_s3_hook.py b/tests/hooks/test_s3_hook.py index 9991ffe541ff2..bd28afe96b78b 100644 --- a/tests/hooks/test_s3_hook.py +++ b/tests/hooks/test_s3_hook.py @@ -23,8 +23,8 @@ from botocore.exceptions import NoCredentialsError -from airflow.providers.aws.hooks.s3 import provide_bucket_name from airflow.models import Connection +from airflow.providers.aws.hooks.s3 import provide_bucket_name try: from airflow.providers.aws.hooks.s3 import AWSS3Hook diff --git a/tests/operators/test_gcs_to_s3.py b/tests/operators/test_gcs_to_s3.py index 5f7a45b26eb83..3e5b3751d0b74 100644 --- a/tests/operators/test_gcs_to_s3.py +++ b/tests/operators/test_gcs_to_s3.py @@ -19,8 +19,8 @@ import unittest -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator +from airflow.providers.aws.hooks.s3 import AWSS3Hook from tests.compat import mock try: diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py index 0a78c029b5b79..b2db98c08621e 100644 --- a/tests/utils/log/test_s3_task_handler.py +++ b/tests/utils/log/test_s3_task_handler.py @@ -21,9 +21,9 @@ import unittest from unittest import mock -from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.models import DAG, TaskInstance from airflow.operators.dummy_operator import DummyOperator +from airflow.providers.aws.hooks.s3 import AWSS3Hook from airflow.utils.log.s3_task_handler import S3TaskHandler from airflow.utils.state import State from airflow.utils.timezone import datetime From e12b99f8a1fb8a04f33aa813ac14aac7a6cc310e Mon Sep 17 00:00:00 2001 From: mingrammer Date: Tue, 5 Nov 2019 23:21:37 +0900 Subject: [PATCH 3/4] Rename AWSS3Hook back to S3Hook --- UPDATING.md | 2 +- airflow/contrib/hooks/sagemaker_hook.py | 6 +- airflow/contrib/operators/dynamodb_to_s3.py | 4 +- .../imap_attachment_to_s3_operator.py | 4 +- airflow/contrib/operators/mongo_to_s3.py | 4 +- .../operators/s3_copy_object_operator.py | 4 +- .../operators/s3_delete_objects_operator.py | 4 +- airflow/contrib/operators/s3_list_operator.py | 4 +- .../contrib/operators/s3_to_gcs_operator.py | 4 +- .../contrib/operators/s3_to_sftp_operator.py | 4 +- .../contrib/operators/sftp_to_s3_operator.py | 4 +- airflow/hooks/S3_hook.py | 16 +---- airflow/operators/gcs_to_s3.py | 6 +- .../operators/google_api_to_s3_transfer.py | 4 +- airflow/operators/redshift_to_s3_operator.py | 4 +- .../operators/s3_file_transform_operator.py | 6 +- airflow/operators/s3_to_hive_operator.py | 4 +- airflow/operators/s3_to_redshift_operator.py | 4 +- airflow/providers/aws/hooks/s3.py | 2 +- airflow/sensors/s3_key_sensor.py | 4 +- airflow/sensors/s3_prefix_sensor.py | 4 +- airflow/utils/log/s3_task_handler.py | 6 +- docs/howto/write-logs.rst | 2 +- tests/contrib/hooks/test_sagemaker_hook.py | 12 ++-- 
.../contrib/operators/test_dynamodb_to_s3.py | 2 +- .../test_imap_attachment_to_s3_operator.py | 2 +- .../operators/test_mongo_to_s3_operator.py | 2 +- .../operators/test_s3_list_operator.py | 2 +- .../operators/test_s3_to_gcs_operator.py | 8 +-- .../operators/test_s3_to_sftp_operator.py | 4 +- .../operators/test_sftp_to_s3_operator.py | 4 +- tests/hooks/test_s3_hook.py | 60 +++++++++---------- tests/operators/test_gcs_to_s3.py | 12 ++-- .../test_google_api_to_s3_transfer.py | 6 +- .../test_s3_file_transform_operator.py | 2 +- tests/operators/test_s3_to_hive_operator.py | 4 +- tests/sensors/test_s3_key_sensor.py | 4 +- tests/sensors/test_s3_prefix_sensor.py | 2 +- tests/test_core_to_contrib.py | 2 +- tests/utils/log/test_s3_task_handler.py | 10 ++-- 40 files changed, 115 insertions(+), 129 deletions(-) diff --git a/UPDATING.md b/UPDATING.md index 984c2ca9cbed5..11a5a4b18dc36 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -50,7 +50,7 @@ Migrated are: | Old path | New path | | ------------------------------------------------------------ | -------------------------------------------------------- | -| airflow.hooks.S3_hook.S3Hook | airflow.providers.aws.hooks.s3.AWSS3Hook | +| airflow.hooks.S3_hook.S3Hook | airflow.providers.aws.hooks.s3.S3Hook | | airflow.contrib.hooks.aws_athena_hook.AWSAthenaHook | airflow.providers.aws.hooks.athena.AWSAthenaHook | | airflow.contrib.operators.aws_athena_operator.AWSAthenaOperator | airflow.providers.aws.operators.athena.AWSAthenaOperator | | airflow.contrib.sensors.aws_athena_sensor.AthenaSensor | airflow.providers.aws.sensors.athena.AthenaSensor | diff --git a/airflow/contrib/hooks/sagemaker_hook.py b/airflow/contrib/hooks/sagemaker_hook.py index e27d8f2ff46a2..9106cba892737 100644 --- a/airflow/contrib/hooks/sagemaker_hook.py +++ b/airflow/contrib/hooks/sagemaker_hook.py @@ -28,7 +28,7 @@ from airflow.contrib.hooks.aws_hook import AwsHook from airflow.contrib.hooks.aws_logs_hook import AwsLogsHook from airflow.exceptions import AirflowException -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils import timezone @@ -130,7 +130,7 @@ class SageMakerHook(AwsHook): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.s3_hook = AWSS3Hook(aws_conn_id=self.aws_conn_id) + self.s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) self.logs_hook = AwsLogsHook(aws_conn_id=self.aws_conn_id) def tar_and_s3_upload(self, path, key, bucket): @@ -187,7 +187,7 @@ def check_s3_url(self, s3url): :type s3url: str :rtype: bool """ - bucket, key = AWSS3Hook.parse_s3_url(s3url) + bucket, key = S3Hook.parse_s3_url(s3url) if not self.s3_hook.check_for_bucket(bucket_name=bucket): raise AirflowException( "The input S3 Bucket {} does not exist ".format(bucket)) diff --git a/airflow/contrib/operators/dynamodb_to_s3.py b/airflow/contrib/operators/dynamodb_to_s3.py index 4fa394c104776..f9058bdb04ea1 100644 --- a/airflow/contrib/operators/dynamodb_to_s3.py +++ b/airflow/contrib/operators/dynamodb_to_s3.py @@ -33,7 +33,7 @@ from airflow.contrib.hooks.aws_dynamodb_hook import AwsDynamoDBHook from airflow.models.baseoperator import BaseOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook def _convert_item_to_json_bytes(item): @@ -41,7 +41,7 @@ def _convert_item_to_json_bytes(item): def _upload_file_to_s3(file_obj, bucket_name, s3_key_prefix): - s3_client = AWSS3Hook().get_conn() + s3_client = S3Hook().get_conn() file_obj.seek(0) 
s3_client.upload_file( Filename=file_obj.name, diff --git a/airflow/contrib/operators/imap_attachment_to_s3_operator.py b/airflow/contrib/operators/imap_attachment_to_s3_operator.py index b7a232375eafd..170019c1a952f 100644 --- a/airflow/contrib/operators/imap_attachment_to_s3_operator.py +++ b/airflow/contrib/operators/imap_attachment_to_s3_operator.py @@ -21,7 +21,7 @@ """ from airflow.contrib.hooks.imap_hook import ImapHook from airflow.models import BaseOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.decorators import apply_defaults @@ -92,7 +92,7 @@ def execute(self, context): mail_filter=self.imap_mail_filter, ) - s3_hook = AWSS3Hook(aws_conn_id=self.s3_conn_id) + s3_hook = S3Hook(aws_conn_id=self.s3_conn_id) s3_hook.load_bytes(bytes_data=imap_mail_attachments[0][1], key=self.s3_key, replace=self.s3_overwrite) diff --git a/airflow/contrib/operators/mongo_to_s3.py b/airflow/contrib/operators/mongo_to_s3.py index 265eb03172a6d..ca9005d1a16a1 100644 --- a/airflow/contrib/operators/mongo_to_s3.py +++ b/airflow/contrib/operators/mongo_to_s3.py @@ -22,7 +22,7 @@ from airflow.contrib.hooks.mongo_hook import MongoHook from airflow.models import BaseOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.decorators import apply_defaults @@ -73,7 +73,7 @@ def execute(self, context): """ Executed by task_instance at runtime """ - s3_conn = AWSS3Hook(self.s3_conn_id) + s3_conn = S3Hook(self.s3_conn_id) # Grab collection and execute query according to whether or not it is a pipeline if self.is_pipeline: diff --git a/airflow/contrib/operators/s3_copy_object_operator.py b/airflow/contrib/operators/s3_copy_object_operator.py index 00d80370ead4b..b228610af6a6f 100644 --- a/airflow/contrib/operators/s3_copy_object_operator.py +++ b/airflow/contrib/operators/s3_copy_object_operator.py @@ -18,7 +18,7 @@ # under the License. 
from airflow.models import BaseOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.decorators import apply_defaults @@ -90,7 +90,7 @@ def __init__( self.verify = verify def execute(self, context): - s3_hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) s3_hook.copy_object(self.source_bucket_key, self.dest_bucket_key, self.source_bucket_name, self.dest_bucket_name, self.source_version_id) diff --git a/airflow/contrib/operators/s3_delete_objects_operator.py b/airflow/contrib/operators/s3_delete_objects_operator.py index fcb11c56b3178..56a7927602a02 100644 --- a/airflow/contrib/operators/s3_delete_objects_operator.py +++ b/airflow/contrib/operators/s3_delete_objects_operator.py @@ -19,7 +19,7 @@ from airflow.exceptions import AirflowException from airflow.models import BaseOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.decorators import apply_defaults @@ -75,7 +75,7 @@ def __init__( self.verify = verify def execute(self, context): - s3_hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) response = s3_hook.delete_objects(bucket=self.bucket, keys=self.keys) diff --git a/airflow/contrib/operators/s3_list_operator.py b/airflow/contrib/operators/s3_list_operator.py index 4d52cc78a1637..7dd0f8081a067 100644 --- a/airflow/contrib/operators/s3_list_operator.py +++ b/airflow/contrib/operators/s3_list_operator.py @@ -20,7 +20,7 @@ from typing import Iterable from airflow.models import BaseOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.decorators import apply_defaults @@ -86,7 +86,7 @@ def __init__(self, self.verify = verify def execute(self, context): - hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) self.log.info( 'Getting the list of files from bucket: %s in prefix: %s (Delimiter {%s)', diff --git a/airflow/contrib/operators/s3_to_gcs_operator.py b/airflow/contrib/operators/s3_to_gcs_operator.py index fa2f4642018f1..3e8404fd78078 100644 --- a/airflow/contrib/operators/s3_to_gcs_operator.py +++ b/airflow/contrib/operators/s3_to_gcs_operator.py @@ -22,7 +22,7 @@ from airflow.contrib.operators.s3_list_operator import S3ListOperator from airflow.exceptions import AirflowException from airflow.gcp.hooks.gcs import GoogleCloudStorageHook, _parse_gcs_url -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.decorators import apply_defaults @@ -176,7 +176,7 @@ def execute(self, context): 'There are no new files to sync. 
Have a nice day!') if files: - hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) for file in files: # GCS hook builds its own in-memory file so we have to create diff --git a/airflow/contrib/operators/s3_to_sftp_operator.py b/airflow/contrib/operators/s3_to_sftp_operator.py index 05927d9f44abb..f86150af360b4 100644 --- a/airflow/contrib/operators/s3_to_sftp_operator.py +++ b/airflow/contrib/operators/s3_to_sftp_operator.py @@ -22,7 +22,7 @@ from airflow.contrib.hooks.ssh_hook import SSHHook from airflow.models import BaseOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.decorators import apply_defaults @@ -76,7 +76,7 @@ def get_s3_key(s3_key): def execute(self, context): self.s3_key = self.get_s3_key(self.s3_key) ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id) - s3_hook = AWSS3Hook(self.s3_conn_id) + s3_hook = S3Hook(self.s3_conn_id) s3_client = s3_hook.get_conn() sftp_client = ssh_hook.get_conn().open_sftp() diff --git a/airflow/contrib/operators/sftp_to_s3_operator.py b/airflow/contrib/operators/sftp_to_s3_operator.py index 7195730e44587..b88cdb107c4f1 100644 --- a/airflow/contrib/operators/sftp_to_s3_operator.py +++ b/airflow/contrib/operators/sftp_to_s3_operator.py @@ -22,7 +22,7 @@ from airflow.contrib.hooks.ssh_hook import SSHHook from airflow.models import BaseOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.decorators import apply_defaults @@ -77,7 +77,7 @@ def get_s3_key(s3_key): def execute(self, context): self.s3_key = self.get_s3_key(self.s3_key) ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id) - s3_hook = AWSS3Hook(self.s3_conn_id) + s3_hook = S3Hook(self.s3_conn_id) sftp_client = ssh_hook.get_conn().open_sftp() diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py index b2a4e01788ffd..f66cd0db3d488 100644 --- a/airflow/hooks/S3_hook.py +++ b/airflow/hooks/S3_hook.py @@ -23,24 +23,10 @@ import warnings # pylint: disable=unused-import -from airflow.providers.aws.hooks.s3 import AWSS3Hook, provide_bucket_name # noqa +from airflow.providers.aws.hooks.s3 import S3Hook, provide_bucket_name # noqa warnings.warn( "This module is deprecated. Please use `airflow.providers.aws.hooks.s3`.", DeprecationWarning, stacklevel=2, ) - - -class S3Hook(AWSS3Hook): - """ - This class is deprecated. Please use `airflow.providers.aws.hooks.s3.AWSS3Hook`. - """ - - def __init__(self, *args, **kwargs): - warnings.warn( - "This class is deprecated. 
Please use `airflow.providers.aws.hooks.s3.AWSS3Hook`.", - DeprecationWarning, stacklevel=2 - ) - - super().__init__(*args, **kwargs) diff --git a/airflow/operators/gcs_to_s3.py b/airflow/operators/gcs_to_s3.py index 8d2ee60d6825e..b3484c9a4fe9a 100644 --- a/airflow/operators/gcs_to_s3.py +++ b/airflow/operators/gcs_to_s3.py @@ -23,7 +23,7 @@ from airflow.gcp.hooks.gcs import GoogleCloudStorageHook from airflow.gcp.operators.gcs import GoogleCloudStorageListOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.decorators import apply_defaults @@ -116,13 +116,13 @@ def __init__(self, # pylint: disable=too-many-arguments def execute(self, context): # use the super to list all files in an Google Cloud Storage bucket files = super().execute(context) - s3_hook = AWSS3Hook(aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify) + s3_hook = S3Hook(aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify) if not self.replace: # if we are not replacing -> list all files in the S3 bucket # and only keep those files which are present in # Google Cloud Storage and not in S3 - bucket_name, prefix = AWSS3Hook.parse_s3_url(self.dest_s3_key) + bucket_name, prefix = S3Hook.parse_s3_url(self.dest_s3_key) # look for the bucket and the prefix to avoid look into # parent directories/keys existing_files = s3_hook.list_keys(bucket_name, prefix=prefix) diff --git a/airflow/operators/google_api_to_s3_transfer.py b/airflow/operators/google_api_to_s3_transfer.py index 0f12f535c0031..16d45dcab0ec5 100644 --- a/airflow/operators/google_api_to_s3_transfer.py +++ b/airflow/operators/google_api_to_s3_transfer.py @@ -26,7 +26,7 @@ from airflow.gcp.hooks.discovery_api import GoogleDiscoveryApiHook from airflow.models import BaseOperator from airflow.models.xcom import MAX_XCOM_SIZE -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.decorators import apply_defaults @@ -155,7 +155,7 @@ def _retrieve_data_from_google_api(self): return google_api_response def _load_data_to_s3(self, data): - s3_hook = AWSS3Hook(aws_conn_id=self.aws_conn_id) + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) s3_hook.load_string( string_data=json.dumps(data), key=self.s3_destination_key, diff --git a/airflow/operators/redshift_to_s3_operator.py b/airflow/operators/redshift_to_s3_operator.py index e088ca02a10e7..ec47f4eaaa8ee 100644 --- a/airflow/operators/redshift_to_s3_operator.py +++ b/airflow/operators/redshift_to_s3_operator.py @@ -23,7 +23,7 @@ from airflow.hooks.postgres_hook import PostgresHook from airflow.models import BaseOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.decorators import apply_defaults @@ -98,7 +98,7 @@ def __init__( # pylint: disable=too-many-arguments def execute(self, context): postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id) - s3_hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) credentials = s3_hook.get_credentials() unload_options = '\n\t\t\t'.join(self.unload_options) diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py index 343d191ea93cd..ef973df1119dc 100644 --- a/airflow/operators/s3_file_transform_operator.py +++ b/airflow/operators/s3_file_transform_operator.py @@ -24,7 +24,7 @@ from airflow.exceptions import 
AirflowException from airflow.models import BaseOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.decorators import apply_defaults @@ -111,9 +111,9 @@ def execute(self, context): raise AirflowException( "Either transform_script or select_expression must be specified") - source_s3 = AWSS3Hook(aws_conn_id=self.source_aws_conn_id, + source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id, verify=self.source_verify) - dest_s3 = AWSS3Hook(aws_conn_id=self.dest_aws_conn_id, + dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify) self.log.info("Downloading source S3 file %s", self.source_s3_key) diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py index ab6ac06a1f4c9..d2dfc6c42262f 100644 --- a/airflow/operators/s3_to_hive_operator.py +++ b/airflow/operators/s3_to_hive_operator.py @@ -27,7 +27,7 @@ from airflow.exceptions import AirflowException from airflow.hooks.hive_hooks import HiveCliHook from airflow.models import BaseOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.compression import uncompress_file from airflow.utils.decorators import apply_defaults from airflow.utils.file import TemporaryDirectory @@ -148,7 +148,7 @@ def __init__( def execute(self, context): # Downloading file from S3 - self.s3 = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) self.hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) self.log.info("Downloading S3 file") diff --git a/airflow/operators/s3_to_redshift_operator.py b/airflow/operators/s3_to_redshift_operator.py index f203a0ae9c0aa..958bcf8818e76 100644 --- a/airflow/operators/s3_to_redshift_operator.py +++ b/airflow/operators/s3_to_redshift_operator.py @@ -20,7 +20,7 @@ from airflow.hooks.postgres_hook import PostgresHook from airflow.models import BaseOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.decorators import apply_defaults @@ -85,7 +85,7 @@ def __init__( def execute(self, context): self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id) - self.s3 = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) credentials = self.s3.get_credentials() copy_options = '\n\t\t\t'.join(self.copy_options) diff --git a/airflow/providers/aws/hooks/s3.py b/airflow/providers/aws/hooks/s3.py index 0fc25023cecf5..7a1fc2daf457e 100644 --- a/airflow/providers/aws/hooks/s3.py +++ b/airflow/providers/aws/hooks/s3.py @@ -58,7 +58,7 @@ def has_arg(name): return wrapper -class AWSS3Hook(AwsHook): +class S3Hook(AwsHook): """ Interact with AWS S3, using the boto3 library. 
""" diff --git a/airflow/sensors/s3_key_sensor.py b/airflow/sensors/s3_key_sensor.py index 76f5a8e95f1d1..c0a95f96e4818 100644 --- a/airflow/sensors/s3_key_sensor.py +++ b/airflow/sensors/s3_key_sensor.py @@ -88,8 +88,8 @@ def __init__(self, self.verify = verify def poke(self, context): - from airflow.providers.aws.hooks.s3 import AWSS3Hook - hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + from airflow.providers.aws.hooks.s3 import S3Hook + hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key) if self.wildcard_match: return hook.check_for_wildcard_key(self.bucket_key, diff --git a/airflow/sensors/s3_prefix_sensor.py b/airflow/sensors/s3_prefix_sensor.py index 72308fbd32e67..63d2f668d6f59 100644 --- a/airflow/sensors/s3_prefix_sensor.py +++ b/airflow/sensors/s3_prefix_sensor.py @@ -73,8 +73,8 @@ def __init__(self, def poke(self, context): self.log.info('Poking for prefix : %s in bucket s3://%s', self.prefix, self.bucket_name) - from airflow.providers.aws.hooks.s3 import AWSS3Hook - hook = AWSS3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + from airflow.providers.aws.hooks.s3 import S3Hook + hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) return hook.check_for_prefix( prefix=self.prefix, delimiter=self.delimiter, diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py index f101bdb126418..f168a95857633 100644 --- a/airflow/utils/log/s3_task_handler.py +++ b/airflow/utils/log/s3_task_handler.py @@ -43,11 +43,11 @@ def __init__(self, base_log_folder, s3_log_folder, filename_template): def hook(self): remote_conn_id = conf.get('core', 'REMOTE_LOG_CONN_ID') try: - from airflow.providers.aws.hooks.s3 import AWSS3Hook - return AWSS3Hook(remote_conn_id) + from airflow.providers.aws.hooks.s3 import S3Hook + return S3Hook(remote_conn_id) except Exception: self.log.error( - 'Could not create an AWSS3Hook with connection id "%s". ' + 'Could not create an S3Hook with connection id "%s". ' 'Please make sure that airflow[aws] is installed and ' 'the S3 connection exists.', remote_conn_id ) diff --git a/docs/howto/write-logs.rst b/docs/howto/write-logs.rst index 549a9285cee02..38e4d4aa1e7f8 100644 --- a/docs/howto/write-logs.rst +++ b/docs/howto/write-logs.rst @@ -65,7 +65,7 @@ To enable this feature, ``airflow.cfg`` must be configured as follows: # Use server-side encryption for logs stored in S3 encrypt_s3_logs = False -In the above example, Airflow will try to use ``AWSS3Hook('MyS3Conn')``. +In the above example, Airflow will try to use ``S3Hook('MyS3Conn')``. .. 
_write-logs-azure: diff --git a/tests/contrib/hooks/test_sagemaker_hook.py b/tests/contrib/hooks/test_sagemaker_hook.py index cde30a0024f39..b77e94ccbef19 100644 --- a/tests/contrib/hooks/test_sagemaker_hook.py +++ b/tests/contrib/hooks/test_sagemaker_hook.py @@ -29,7 +29,7 @@ LogState, SageMakerHook, secondary_training_status_changed, secondary_training_status_message, ) from airflow.exceptions import AirflowException -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from tests.compat import mock role = 'arn:aws:iam:role/test-role' @@ -254,8 +254,8 @@ def test_multi_stream_iter(self, mock_log_stream): event_iter = hook.multi_stream_iter('log', [None, None, None]) self.assertEqual(next(event_iter), (0, event)) - @mock.patch.object(AWSS3Hook, 'create_bucket') - @mock.patch.object(AWSS3Hook, 'load_file') + @mock.patch.object(S3Hook, 'create_bucket') + @mock.patch.object(S3Hook, 'load_file') def test_configure_s3_resources(self, mock_load_file, mock_create_bucket): hook = SageMakerHook() evaluation_result = { @@ -268,9 +268,9 @@ def test_configure_s3_resources(self, mock_load_file, mock_create_bucket): mock_load_file.assert_called_once_with(path, key, bucket) @mock.patch.object(SageMakerHook, 'get_conn') - @mock.patch.object(AWSS3Hook, 'check_for_key') - @mock.patch.object(AWSS3Hook, 'check_for_bucket') - @mock.patch.object(AWSS3Hook, 'check_for_prefix') + @mock.patch.object(S3Hook, 'check_for_key') + @mock.patch.object(S3Hook, 'check_for_bucket') + @mock.patch.object(S3Hook, 'check_for_prefix') def test_check_s3_url(self, mock_check_prefix, mock_check_bucket, diff --git a/tests/contrib/operators/test_dynamodb_to_s3.py b/tests/contrib/operators/test_dynamodb_to_s3.py index 4dfda20f44c54..ea09ebaba1ed0 100644 --- a/tests/contrib/operators/test_dynamodb_to_s3.py +++ b/tests/contrib/operators/test_dynamodb_to_s3.py @@ -45,7 +45,7 @@ def output_queue_to_list(self): items.append(self.output_queue.get()) return items - @patch('airflow.contrib.operators.dynamodb_to_s3.AWSS3Hook') + @patch('airflow.contrib.operators.dynamodb_to_s3.S3Hook') @patch('airflow.contrib.operators.dynamodb_to_s3.AwsDynamoDBHook') def test_dynamodb_to_s3_success(self, mock_aws_dynamodb_hook, mock_s3_hook): responses = [ diff --git a/tests/contrib/operators/test_imap_attachment_to_s3_operator.py b/tests/contrib/operators/test_imap_attachment_to_s3_operator.py index 5e78997d07b12..b563c974a74f6 100644 --- a/tests/contrib/operators/test_imap_attachment_to_s3_operator.py +++ b/tests/contrib/operators/test_imap_attachment_to_s3_operator.py @@ -37,7 +37,7 @@ def setUp(self): dag=None ) - @patch('airflow.contrib.operators.imap_attachment_to_s3_operator.AWSS3Hook') + @patch('airflow.contrib.operators.imap_attachment_to_s3_operator.S3Hook') @patch('airflow.contrib.operators.imap_attachment_to_s3_operator.ImapHook') def test_execute(self, mock_imap_hook, mock_s3_hook): mock_imap_hook.return_value.__enter__ = mock_imap_hook diff --git a/tests/contrib/operators/test_mongo_to_s3_operator.py b/tests/contrib/operators/test_mongo_to_s3_operator.py index 7b9acd3918326..fd04f1222316e 100644 --- a/tests/contrib/operators/test_mongo_to_s3_operator.py +++ b/tests/contrib/operators/test_mongo_to_s3_operator.py @@ -84,7 +84,7 @@ def test_render_template(self): ) @mock.patch('airflow.contrib.operators.mongo_to_s3.MongoHook') - @mock.patch('airflow.contrib.operators.mongo_to_s3.AWSS3Hook') + @mock.patch('airflow.contrib.operators.mongo_to_s3.S3Hook') def test_execute(self, mock_s3_hook, 
mock_mongo_hook): operator = self.mock_operator diff --git a/tests/contrib/operators/test_s3_list_operator.py b/tests/contrib/operators/test_s3_list_operator.py index bd32d4e8b2cc7..5c418ff67bbdb 100644 --- a/tests/contrib/operators/test_s3_list_operator.py +++ b/tests/contrib/operators/test_s3_list_operator.py @@ -30,7 +30,7 @@ class TestS3ListOperator(unittest.TestCase): - @mock.patch('airflow.contrib.operators.s3_list_operator.AWSS3Hook') + @mock.patch('airflow.contrib.operators.s3_list_operator.S3Hook') def test_execute(self, mock_hook): mock_hook.return_value.list_keys.return_value = MOCK_FILES diff --git a/tests/contrib/operators/test_s3_to_gcs_operator.py b/tests/contrib/operators/test_s3_to_gcs_operator.py index db83a1b6554bb..42821a755fe88 100644 --- a/tests/contrib/operators/test_s3_to_gcs_operator.py +++ b/tests/contrib/operators/test_s3_to_gcs_operator.py @@ -51,8 +51,8 @@ def test_init(self): self.assertEqual(operator.gcp_conn_id, GCS_CONN_ID) self.assertEqual(operator.dest_gcs, GCS_PATH_PREFIX) - @mock.patch('airflow.contrib.operators.s3_to_gcs_operator.AWSS3Hook') - @mock.patch('airflow.contrib.operators.s3_list_operator.AWSS3Hook') + @mock.patch('airflow.contrib.operators.s3_to_gcs_operator.S3Hook') + @mock.patch('airflow.contrib.operators.s3_list_operator.S3Hook') @mock.patch( 'airflow.contrib.operators.s3_to_gcs_operator.GoogleCloudStorageHook') def test_execute(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook): @@ -86,8 +86,8 @@ def test_execute(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook): # we expect MOCK_FILES to be uploaded self.assertEqual(sorted(MOCK_FILES), sorted(uploaded_files)) - @mock.patch('airflow.contrib.operators.s3_to_gcs_operator.AWSS3Hook') - @mock.patch('airflow.contrib.operators.s3_list_operator.AWSS3Hook') + @mock.patch('airflow.contrib.operators.s3_to_gcs_operator.S3Hook') + @mock.patch('airflow.contrib.operators.s3_list_operator.S3Hook') @mock.patch( 'airflow.contrib.operators.s3_to_gcs_operator.GoogleCloudStorageHook') def test_execute_with_gzip(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook): diff --git a/tests/contrib/operators/test_s3_to_sftp_operator.py b/tests/contrib/operators/test_s3_to_sftp_operator.py index 90ace66d0c00d..904781ff50b6d 100644 --- a/tests/contrib/operators/test_s3_to_sftp_operator.py +++ b/tests/contrib/operators/test_s3_to_sftp_operator.py @@ -61,10 +61,10 @@ class TestS3ToSFTPOperator(unittest.TestCase): @mock_s3 def setUp(self): from airflow.contrib.hooks.ssh_hook import SSHHook - from airflow.providers.aws.hooks.s3 import AWSS3Hook + from airflow.providers.aws.hooks.s3 import S3Hook hook = SSHHook(ssh_conn_id='ssh_default') - s3_hook = AWSS3Hook('aws_default') + s3_hook = S3Hook('aws_default') hook.no_host_key_check = True args = { 'owner': 'airflow', diff --git a/tests/contrib/operators/test_sftp_to_s3_operator.py b/tests/contrib/operators/test_sftp_to_s3_operator.py index 99411cce0fa04..0fb4b6fcb58c2 100644 --- a/tests/contrib/operators/test_sftp_to_s3_operator.py +++ b/tests/contrib/operators/test_sftp_to_s3_operator.py @@ -27,7 +27,7 @@ from airflow.contrib.operators.sftp_to_s3_operator import SFTPToS3Operator from airflow.contrib.operators.ssh_operator import SSHOperator from airflow.models import DAG, TaskInstance -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.settings import Session from airflow.utils import timezone from airflow.utils.timezone import datetime @@ -62,7 +62,7 @@ class 
TestSFTPToS3Operator(unittest.TestCase): @mock_s3 def setUp(self): hook = SSHHook(ssh_conn_id='ssh_default') - s3_hook = AWSS3Hook('aws_default') + s3_hook = S3Hook('aws_default') hook.no_host_key_check = True args = { 'owner': 'airflow', diff --git a/tests/hooks/test_s3_hook.py b/tests/hooks/test_s3_hook.py index bd28afe96b78b..c8709416ff826 100644 --- a/tests/hooks/test_s3_hook.py +++ b/tests/hooks/test_s3_hook.py @@ -27,9 +27,9 @@ from airflow.providers.aws.hooks.s3 import provide_bucket_name try: - from airflow.providers.aws.hooks.s3 import AWSS3Hook + from airflow.providers.aws.hooks.s3 import S3Hook except ImportError: - AWSS3Hook = None # type: ignore + S3Hook = None # type: ignore try: import boto3 @@ -38,24 +38,24 @@ mock_s3 = None -@unittest.skipIf(AWSS3Hook is None, - "Skipping test because AWSS3Hook is not available") +@unittest.skipIf(S3Hook is None, + "Skipping test because S3Hook is not available") @unittest.skipIf(mock_s3 is None, "Skipping test because moto.mock_s3 is not available") -class TestAWSS3Hook(unittest.TestCase): +class TestS3Hook(unittest.TestCase): def setUp(self): self.s3_test_url = "s3://test/this/is/not/a-real-key.txt" def test_parse_s3_url(self): - parsed = AWSS3Hook.parse_s3_url(self.s3_test_url) + parsed = S3Hook.parse_s3_url(self.s3_test_url) self.assertEqual(parsed, ("test", "this/is/not/a-real-key.txt"), "Incorrect parsing of the s3 url") @mock_s3 def test_check_for_bucket(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() @@ -63,27 +63,27 @@ def test_check_for_bucket(self): self.assertFalse(hook.check_for_bucket('not-a-bucket')) def test_check_for_bucket_raises_error_with_invalid_conn_id(self): - hook = AWSS3Hook(aws_conn_id="does_not_exist") + hook = S3Hook(aws_conn_id="does_not_exist") with self.assertRaises(NoCredentialsError): hook.check_for_bucket('bucket') @mock_s3 def test_get_bucket(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') self.assertIsNotNone(bucket) @mock_s3 def test_create_bucket_default_region(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) hook.create_bucket(bucket_name='new_bucket') bucket = hook.get_bucket('new_bucket') self.assertIsNotNone(bucket) @mock_s3 def test_create_bucket_us_standard_region(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) hook.create_bucket(bucket_name='new_bucket', region_name='us-east-1') bucket = hook.get_bucket('new_bucket') self.assertIsNotNone(bucket) @@ -92,7 +92,7 @@ def test_create_bucket_us_standard_region(self): @mock_s3 def test_create_bucket_other_region(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) hook.create_bucket(bucket_name='new_bucket', region_name='us-east-2') bucket = hook.get_bucket('new_bucket') self.assertIsNotNone(bucket) @@ -101,7 +101,7 @@ def test_create_bucket_other_region(self): @mock_s3 def test_check_for_prefix(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='a', Body=b'a') @@ -112,7 +112,7 @@ def test_check_for_prefix(self): @mock_s3 def test_list_prefixes(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='a', Body=b'a') @@ -125,7 +125,7 @@ def test_list_prefixes(self): @mock_s3 def test_list_prefixes_paged(self): - hook = 
AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() @@ -142,7 +142,7 @@ def test_list_prefixes_paged(self): @mock_s3 def test_list_keys(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='a', Body=b'a') @@ -155,7 +155,7 @@ def test_list_keys(self): @mock_s3 def test_list_keys_paged(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() @@ -169,7 +169,7 @@ def test_list_keys_paged(self): @mock_s3 def test_check_for_key(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='a', Body=b'a') @@ -180,14 +180,14 @@ def test_check_for_key(self): self.assertFalse(hook.check_for_key('s3://bucket//b')) def test_check_for_key_raises_error_with_invalid_conn_id(self): - hook = AWSS3Hook(aws_conn_id="does_not_exist") + hook = S3Hook(aws_conn_id="does_not_exist") with self.assertRaises(NoCredentialsError): hook.check_for_key('a', 'bucket') @mock_s3 def test_get_key(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='a', Body=b'a') @@ -197,7 +197,7 @@ def test_get_key(self): @mock_s3 def test_read_key(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) conn = hook.get_conn() # We need to create the bucket since this is all in Moto's 'virtual' # AWS account @@ -211,12 +211,12 @@ def test_read_key(self): def test_select_key(self, mock_get_client_type): mock_get_client_type.return_value.select_object_content.return_value = \ {'Payload': [{'Records': {'Payload': b'Cont\xC3\xA9nt'}}]} - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) self.assertEqual(hook.select_key('my_key', 'mybucket'), 'Contént') @mock_s3 def test_check_for_wildcard_key(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='abc', Body=b'a') @@ -233,7 +233,7 @@ def test_check_for_wildcard_key(self): @mock_s3 def test_get_wildcard_key(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) bucket = hook.get_bucket('bucket') bucket.create() bucket.put_object(Key='abc', Body=b'a') @@ -256,7 +256,7 @@ def test_get_wildcard_key(self): @mock_s3 def test_load_string(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) conn = hook.get_conn() # We need to create the bucket since this is all in Moto's 'virtual' # AWS account @@ -269,7 +269,7 @@ def test_load_string(self): @mock_s3 def test_load_bytes(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) conn = hook.get_conn() # We need to create the bucket since this is all in Moto's 'virtual' # AWS account @@ -282,7 +282,7 @@ def test_load_bytes(self): @mock_s3 def test_load_fileobj(self): - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) conn = hook.get_conn() # We need to create the bucket since this is all in Moto's 'virtual' # AWS account @@ -295,10 +295,10 @@ def test_load_fileobj(self): self.assertEqual(body, b'Content') - @mock.patch.object(AWSS3Hook, 'get_connection', return_value=Connection(schema='test_bucket')) + @mock.patch.object(S3Hook, 'get_connection', return_value=Connection(schema='test_bucket')) def 
test_provide_bucket_name(self, mock_get_connection): - class FakeAWSS3Hook(AWSS3Hook): + class FakeS3Hook(S3Hook): @provide_bucket_name def test_function(self, bucket_name=None): @@ -314,7 +314,7 @@ def test_function_with_key(self, key, bucket_name=None): def test_function_with_wildcard_key(self, wildcard_key, bucket_name=None): return bucket_name - fake_s3_hook = FakeAWSS3Hook() + fake_s3_hook = FakeS3Hook() test_bucket_name = fake_s3_hook.test_function() test_bucket_name_with_key = fake_s3_hook.test_function_with_key('test_key') test_bucket_name_with_wildcard_key = fake_s3_hook.test_function_with_wildcard_key('test_*_key') diff --git a/tests/operators/test_gcs_to_s3.py b/tests/operators/test_gcs_to_s3.py index 3e5b3751d0b74..a8858fc1d0c79 100644 --- a/tests/operators/test_gcs_to_s3.py +++ b/tests/operators/test_gcs_to_s3.py @@ -20,7 +20,7 @@ import unittest from airflow.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from tests.compat import mock try: @@ -55,7 +55,7 @@ def test_execute_incremental(self, mock_hook, mock_hook2): dest_s3_key=S3_BUCKET, replace=False) # create dest bucket - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) b = hook.get_bucket('bucket') b.create() b.put_object(Key=MOCK_FILES[0], Body=b'testing') @@ -85,7 +85,7 @@ def test_execute_without_replace(self, mock_hook, mock_hook2): dest_s3_key=S3_BUCKET, replace=False) # create dest bucket with all the files - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) b = hook.get_bucket('bucket') b.create() [b.put_object(Key=MOCK_FILE, Body=b'testing') for MOCK_FILE in MOCK_FILES] @@ -115,7 +115,7 @@ def test_execute(self, mock_hook, mock_hook2): dest_s3_key=S3_BUCKET, replace=False) # create dest bucket without files - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) b = hook.get_bucket('bucket') b.create() @@ -144,7 +144,7 @@ def test_execute_with_replace(self, mock_hook, mock_hook2): dest_s3_key=S3_BUCKET, replace=True) # create dest bucket with all the files - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) b = hook.get_bucket('bucket') b.create() [b.put_object(Key=MOCK_FILE, Body=b'testing') for MOCK_FILE in MOCK_FILES] @@ -174,7 +174,7 @@ def test_execute_incremental_with_replace(self, mock_hook, mock_hook2): dest_s3_key=S3_BUCKET, replace=True) # create dest bucket with just two files (the first two files in MOCK_FILES) - hook = AWSS3Hook(aws_conn_id=None) + hook = S3Hook(aws_conn_id=None) b = hook.get_bucket('bucket') b.create() [b.put_object(Key=MOCK_FILE, Body=b'testing') for MOCK_FILE in MOCK_FILES[:2]] diff --git a/tests/operators/test_google_api_to_s3_transfer.py b/tests/operators/test_google_api_to_s3_transfer.py index 79cc80c7c8651..14cea11c01735 100644 --- a/tests/operators/test_google_api_to_s3_transfer.py +++ b/tests/operators/test_google_api_to_s3_transfer.py @@ -66,7 +66,7 @@ def setUp(self): } @patch('airflow.operators.google_api_to_s3_transfer.GoogleDiscoveryApiHook.query') - @patch('airflow.operators.google_api_to_s3_transfer.AWSS3Hook.load_string') + @patch('airflow.operators.google_api_to_s3_transfer.S3Hook.load_string') @patch('airflow.operators.google_api_to_s3_transfer.json.dumps') def test_execute(self, mock_json_dumps, mock_s3_hook_load_string, mock_google_api_hook_query): context = {'task_instance': Mock()} @@ -89,7 +89,7 @@ def test_execute(self, mock_json_dumps, mock_s3_hook_load_string, 
mock_google_ap context['task_instance'].xcom_push.assert_not_called() @patch('airflow.operators.google_api_to_s3_transfer.GoogleDiscoveryApiHook.query') - @patch('airflow.operators.google_api_to_s3_transfer.AWSS3Hook.load_string') + @patch('airflow.operators.google_api_to_s3_transfer.S3Hook.load_string') @patch('airflow.operators.google_api_to_s3_transfer.json.dumps') def test_execute_with_xcom(self, mock_json_dumps, mock_s3_hook_load_string, mock_google_api_hook_query): context = {'task_instance': Mock()} @@ -124,7 +124,7 @@ def test_execute_with_xcom(self, mock_json_dumps, mock_s3_hook_load_string, mock ) @patch('airflow.operators.google_api_to_s3_transfer.GoogleDiscoveryApiHook.query') - @patch('airflow.operators.google_api_to_s3_transfer.AWSS3Hook.load_string') + @patch('airflow.operators.google_api_to_s3_transfer.S3Hook.load_string') @patch('airflow.operators.google_api_to_s3_transfer.json.dumps') @patch('airflow.operators.google_api_to_s3_transfer.sys.getsizeof', return_value=MAX_XCOM_SIZE) def test_execute_with_xcom_exceeded_max_xcom_size( diff --git a/tests/operators/test_s3_file_transform_operator.py b/tests/operators/test_s3_file_transform_operator.py index 6aae9a44788e3..c49d825b16bad 100644 --- a/tests/operators/test_s3_file_transform_operator.py +++ b/tests/operators/test_s3_file_transform_operator.py @@ -112,7 +112,7 @@ def test_execute_with_failing_transform_script(self, mock_Popen): self.assertEqual('Transform script failed: 42', str(e.exception)) - @mock.patch('airflow.providers.aws.hooks.s3.AWSS3Hook.select_key', return_value="input") + @mock.patch('airflow.providers.aws.hooks.s3.S3Hook.select_key', return_value="input") @mock_s3 def test_execute_with_select_expression(self, mock_select_key): bucket = "bucket" diff --git a/tests/operators/test_s3_to_hive_operator.py b/tests/operators/test_s3_to_hive_operator.py index ecee94f7a6e0c..bfac9c531b277 100644 --- a/tests/operators/test_s3_to_hive_operator.py +++ b/tests/operators/test_s3_to_hive_operator.py @@ -247,7 +247,7 @@ def test_execute_with_select_expression(self, mock_hiveclihook): select_expression = "SELECT * FROM S3Object s" bucket = 'bucket' - # Only testing S3ToHiveTransfer calls AWSS3Hook.select_key with + # Only testing S3ToHiveTransfer calls S3Hook.select_key with # the right parameters and its execute method succeeds here, # since Moto doesn't support select_object_content as of 1.3.2. 
for (ext, has_header) in product(['.txt', '.gz', '.GZ'], [True, False]): @@ -274,7 +274,7 @@ def test_execute_with_select_expression(self, mock_hiveclihook): input_serialization['CSV']['FileHeaderInfo'] = 'USE' # Confirm that select_key was called with the right params - with mock.patch('airflow.providers.aws.hooks.s3.AWSS3Hook.select_key', + with mock.patch('airflow.providers.aws.hooks.s3.S3Hook.select_key', return_value="") as mock_select_key: # Execute S3ToHiveTransfer s32hive = S3ToHiveTransfer(**self.kwargs) diff --git a/tests/sensors/test_s3_key_sensor.py b/tests/sensors/test_s3_key_sensor.py index da750d77fbc1f..213bc7bc0f588 100644 --- a/tests/sensors/test_s3_key_sensor.py +++ b/tests/sensors/test_s3_key_sensor.py @@ -64,7 +64,7 @@ def test_parse_bucket_key(self, key, bucket, parsed_key, parsed_bucket): self.assertEqual(s.bucket_key, parsed_key) self.assertEqual(s.bucket_name, parsed_bucket) - @mock.patch('airflow.providers.aws.hooks.s3.AWSS3Hook') + @mock.patch('airflow.providers.aws.hooks.s3.S3Hook') def test_poke(self, mock_hook): s = S3KeySensor( task_id='s3_key_sensor', @@ -78,7 +78,7 @@ def test_poke(self, mock_hook): mock_hook.return_value.check_for_key.return_value = True self.assertTrue(s.poke(None)) - @mock.patch('airflow.providers.aws.hooks.s3.AWSS3Hook') + @mock.patch('airflow.providers.aws.hooks.s3.S3Hook') def test_poke_wildcard(self, mock_hook): s = S3KeySensor( task_id='s3_key_sensor', diff --git a/tests/sensors/test_s3_prefix_sensor.py b/tests/sensors/test_s3_prefix_sensor.py index 40d300226a9c5..c737f96036429 100644 --- a/tests/sensors/test_s3_prefix_sensor.py +++ b/tests/sensors/test_s3_prefix_sensor.py @@ -25,7 +25,7 @@ class TestS3PrefixSensor(unittest.TestCase): - @mock.patch('airflow.providers.aws.hooks.s3.AWSS3Hook') + @mock.patch('airflow.providers.aws.hooks.s3.S3Hook') def test_poke(self, mock_hook): s = S3PrefixSensor( task_id='s3_prefix', diff --git a/tests/test_core_to_contrib.py b/tests/test_core_to_contrib.py index f2696238ed9c9..d42a548bf1bef 100644 --- a/tests/test_core_to_contrib.py +++ b/tests/test_core_to_contrib.py @@ -133,7 +133,7 @@ "airflow.contrib.hooks.aws_athena_hook.AWSAthenaHook", ), ( - "airflow.providers.aws.hooks.s3.AWSS3Hook", + "airflow.providers.aws.hooks.s3.S3Hook", "airflow.hooks.S3_hook.S3Hook", ), ] diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py index b2db98c08621e..44226793faa9d 100644 --- a/tests/utils/log/test_s3_task_handler.py +++ b/tests/utils/log/test_s3_task_handler.py @@ -23,7 +23,7 @@ from airflow.models import DAG, TaskInstance from airflow.operators.dummy_operator import DummyOperator -from airflow.providers.aws.hooks.s3 import AWSS3Hook +from airflow.providers.aws.hooks.s3 import S3Hook from airflow.utils.log.s3_task_handler import S3TaskHandler from airflow.utils.state import State from airflow.utils.timezone import datetime @@ -76,18 +76,18 @@ def tearDown(self): pass def test_hook(self): - self.assertIsInstance(self.s3_task_handler.hook, AWSS3Hook) + self.assertIsInstance(self.s3_task_handler.hook, S3Hook) def test_hook_raises(self): handler = self.s3_task_handler with mock.patch.object(handler.log, 'error') as mock_error: - with mock.patch("airflow.providers.aws.hooks.s3.AWSS3Hook") as mock_hook: + with mock.patch("airflow.providers.aws.hooks.s3.S3Hook") as mock_hook: mock_hook.side_effect = Exception('Failed to connect') # Initialize the hook handler.hook mock_error.assert_called_once_with( - 'Could not create an AWSS3Hook with connection id "%s". 
Please make ' + 'Could not create an S3Hook with connection id "%s". Please make ' 'sure that airflow[aws] is installed and the S3 connection exists.', '' ) @@ -103,7 +103,7 @@ def test_log_exists_raises(self): self.assertFalse(self.s3_task_handler.s3_log_exists('s3://nonexistentbucket/foo')) def test_log_exists_no_hook(self): - with mock.patch("airflow.providers.aws.hooks.s3.AWSS3Hook") as mock_hook: + with mock.patch("airflow.providers.aws.hooks.s3.S3Hook") as mock_hook: mock_hook.side_effect = Exception('Failed to connect') self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location)) From e391766074f9afea869923315b12f3cf7729af88 Mon Sep 17 00:00:00 2001 From: mingrammer Date: Tue, 5 Nov 2019 23:32:31 +0900 Subject: [PATCH 4/4] Fix over-indented lint error --- airflow/operators/s3_file_transform_operator.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py index ef973df1119dc..d4d46c3d6cb64 100644 --- a/airflow/operators/s3_file_transform_operator.py +++ b/airflow/operators/s3_file_transform_operator.py @@ -111,10 +111,8 @@ def execute(self, context): raise AirflowException( "Either transform_script or select_expression must be specified") - source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id, - verify=self.source_verify) - dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id, - verify=self.dest_verify) + source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id, verify=self.source_verify) + dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify) self.log.info("Downloading source S3 file %s", self.source_s3_key) if not source_s3.check_for_key(self.source_s3_key):
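
Taken together, PATCH 3/4 and PATCH 4/4 leave airflow.providers.aws.hooks.s3.S3Hook as the canonical class, with airflow.hooks.S3_hook reduced to a thin re-export that warns on import. A minimal sketch of what the end state of this series means for DAG code, assuming Airflow is installed from this branch; the connection id and the S3 URL below are illustrative, not taken from the patch:

    from airflow.providers.aws.hooks.s3 import S3Hook           # new canonical import path
    from airflow.hooks.S3_hook import S3Hook as LegacyS3Hook    # deprecated shim; emits a DeprecationWarning on first import

    # After PATCH 3/4 the shim only re-exports the provider class, so both names refer to the same object.
    assert LegacyS3Hook is S3Hook

    # parse_s3_url is the static helper used above in sagemaker_hook, gcs_to_s3 and the hook tests.
    bucket, key = S3Hook.parse_s3_url("s3://my-bucket/path/to/file.txt")
    print(bucket, key)   # my-bucket path/to/file.txt

    # Constructing the hook only stores the connection id; boto3 clients are built lazily by the AwsHook base.
    hook = S3Hook(aws_conn_id="aws_default")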
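
The tests/hooks/test_s3_hook.py changes keep exercising provide_bucket_name through a FakeS3Hook subclass. A sketch modelled on that test of how the decorator fills a missing bucket_name from the connection's schema; the describe_bucket method name is made up for illustration, and the test_bucket schema mirrors the test fixture:

    from unittest import mock

    from airflow.models import Connection
    from airflow.providers.aws.hooks.s3 import S3Hook, provide_bucket_name


    class FakeS3Hook(S3Hook):

        @provide_bucket_name
        def describe_bucket(self, bucket_name=None):
            return bucket_name


    with mock.patch.object(S3Hook, 'get_connection', return_value=Connection(schema='test_bucket')):
        fake_s3_hook = FakeS3Hook()
        # No bucket_name passed, so the decorator resolves it from the (mocked) connection schema.
        print(fake_s3_hook.describe_bucket())                     # test_bucket
        # An explicitly passed bucket_name is expected to take precedence (assumption, not asserted in the hunk above).
        print(fake_s3_hook.describe_bucket(bucket_name='other'))  # other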
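
TestS3Hook runs against moto's virtual S3 via the mock_s3 decorator. A hedged sketch in the same style for checking the renamed hook locally; it assumes moto is installed, and every bucket and key name below is illustrative:

    from moto import mock_s3

    from airflow.providers.aws.hooks.s3 import S3Hook


    @mock_s3
    def exercise_renamed_hook():
        hook = S3Hook(aws_conn_id=None)
        hook.create_bucket(bucket_name='demo-bucket', region_name='us-east-1')

        bucket = hook.get_bucket('demo-bucket')        # boto3 Bucket resource, as in the tests
        bucket.put_object(Key='dir/demo.txt', Body=b'demo')

        print(hook.check_for_bucket('demo-bucket'))               # True
        print(hook.check_for_key('dir/demo.txt', 'demo-bucket'))  # True
        print(hook.check_for_prefix(prefix='dir/', delimiter='/', bucket_name='demo-bucket'))  # True
        print(hook.list_keys('demo-bucket', prefix='dir/'))       # full key names, e.g. ['dir/demo.txt']


    exercise_renamed_hook()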