From fcc0c0b30640010cf9d5942adf29303e8c620639 Mon Sep 17 00:00:00 2001 From: dprieto91 Date: Mon, 7 Feb 2022 13:42:31 +0100 Subject: [PATCH 1/5] Optionally raise an error if source file does not exist in GCSToGCSOperator --- .../providers/google/cloud/transfers/gcs_to_gcs.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py index 7e6498cefd5ca..dbddc62bd689b 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py @@ -90,6 +90,9 @@ class GCSToGCSOperator(BaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). + :param source_object_required: When source_object_required is True, if you want to copy / move a specific blob + and it doesn't exist, an exception is raised and the task is marked as failed. + This parameter doesn't have any effect when the source_object that you pass is a folder or pattern. :Example: @@ -190,6 +193,7 @@ def __init__( maximum_modified_time=None, is_older_than=None, impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + source_object_required=False, **kwargs, ): super().__init__(**kwargs) @@ -216,6 +220,7 @@ def __init__( self.maximum_modified_time = maximum_modified_time self.is_older_than = is_older_than self.impersonation_chain = impersonation_chain + self.source_object_required = source_object_required def execute(self, context: 'Context'): @@ -313,6 +318,14 @@ def _copy_source_without_wildcard(self, hook, prefix): self._copy_single_object( hook=hook, source_object=prefix, destination_object=self.destination_object ) + else: + msg = ( + f'{prefix} does not exist in bucket {self.source_bucket}' + ) + self.log.warning(msg) + if self.source_objects_required: + raise AirflowException(msg) + for source_obj in objects: if self.destination_object is None: destination_object = source_obj From 28103fa57e3d4c70a2eda68b40ba7892233a6078 Mon Sep 17 00:00:00 2001 From: dprieto91 Date: Mon, 7 Feb 2022 13:46:41 +0100 Subject: [PATCH 2/5] Fix typo --- airflow/providers/google/cloud/transfers/gcs_to_gcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py index dbddc62bd689b..2400d467a73bc 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py @@ -323,7 +323,7 @@ def _copy_source_without_wildcard(self, hook, prefix): f'{prefix} does not exist in bucket {self.source_bucket}' ) self.log.warning(msg) - if self.source_objects_required: + if self.source_object_required: raise AirflowException(msg) for source_obj in objects: From cb31090270bdb8d05b49969930c2dd847c04cd3e Mon Sep 17 00:00:00 2001 From: dprieto91 Date: Tue, 8 Feb 2022 17:03:12 +0100 Subject: [PATCH 3/5] Add unit test for source object required flag true usecase --- .../google/cloud/transfers/gcs_to_gcs.py | 5 ++--- .../google/cloud/transfers/test_gcs_to_gcs.py | 20 ++++++++++++++++++- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py index 2400d467a73bc..f6114889b7ce3 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py @@ -318,13 +318,12 @@ def _copy_source_without_wildcard(self, hook, prefix): self._copy_single_object( hook=hook, source_object=prefix, destination_object=self.destination_object ) - else: + elif self.source_object_required: msg = ( f'{prefix} does not exist in bucket {self.source_bucket}' ) self.log.warning(msg) - if self.source_object_required: - raise AirflowException(msg) + raise AirflowException(msg) for source_obj in objects: if self.destination_object is None: diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py index a23b50d42f893..62ebf4b761da2 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py @@ -537,7 +537,7 @@ def test_execute_wildcard_with_replace_flag_false_with_destination_object(self, source_object=SOURCE_OBJECT_WILDCARD_SUFFIX, destination_bucket=DESTINATION_BUCKET, destination_object=DESTINATION_OBJECT_PREFIX, - replace=False, + replace=False ) operator.execute(None) @@ -546,3 +546,21 @@ def test_execute_wildcard_with_replace_flag_false_with_destination_object(self, mock.call(DESTINATION_BUCKET, prefix="foo/bar", delimiter=""), ] mock_hook.return_value.list.assert_has_calls(mock_calls) + + @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook') + def test_execute_source_object_required_flag_true(self, mock_hook): + mock_hook.return_value.exists.return_value = False + operator = GCSToGCSOperator( + task_id=TASK_ID, + source_bucket=TEST_BUCKET, + source_objects=SOURCE_OBJECTS_SINGLE_FILE, + destination_bucket=DESTINATION_BUCKET, + destination_object=DESTINATION_OBJECT_PREFIX, + source_object_required=True + ) + + with pytest.raises( + AirflowException, + match=f"{SOURCE_OBJECTS_SINGLE_FILE} does not exist in bucket {TEST_BUCKET}" + ): + operator.execute(None) From b97f68aef782f6d7ea4517a0a084157ae2ed02b4 Mon Sep 17 00:00:00 2001 From: dprieto91 Date: Tue, 8 Feb 2022 17:05:59 +0100 Subject: [PATCH 4/5] Remove changes to other unit tests --- tests/providers/google/cloud/transfers/test_gcs_to_gcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py index 62ebf4b761da2..27ce7bacfe63e 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py @@ -537,7 +537,7 @@ def test_execute_wildcard_with_replace_flag_false_with_destination_object(self, source_object=SOURCE_OBJECT_WILDCARD_SUFFIX, destination_bucket=DESTINATION_BUCKET, destination_object=DESTINATION_OBJECT_PREFIX, - replace=False + replace=False, ) operator.execute(None) From 18864ed9020b609e7cc66e4fbe8070bda7520bc6 Mon Sep 17 00:00:00 2001 From: dprieto91 Date: Tue, 8 Feb 2022 23:38:03 +0100 Subject: [PATCH 5/5] fix static checks --- airflow/providers/google/cloud/transfers/gcs_to_gcs.py | 9 +++------ .../providers/google/cloud/transfers/test_gcs_to_gcs.py | 5 ++--- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py index f6114889b7ce3..499b331da19c5 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py @@ -90,9 +90,8 @@ class GCSToGCSOperator(BaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). - :param source_object_required: When source_object_required is True, if you want to copy / move a specific blob - and it doesn't exist, an exception is raised and the task is marked as failed. - This parameter doesn't have any effect when the source_object that you pass is a folder or pattern. + :param source_object_required: Whether you want to raise an exception when the source object + doesn't exist. It doesn't have any effect when the source objects are folders or patterns. :Example: @@ -319,9 +318,7 @@ def _copy_source_without_wildcard(self, hook, prefix): hook=hook, source_object=prefix, destination_object=self.destination_object ) elif self.source_object_required: - msg = ( - f'{prefix} does not exist in bucket {self.source_bucket}' - ) + msg = f"{prefix} does not exist in bucket {self.source_bucket}" self.log.warning(msg) raise AirflowException(msg) diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py index 27ce7bacfe63e..dec0d378279a4 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py @@ -556,11 +556,10 @@ def test_execute_source_object_required_flag_true(self, mock_hook): source_objects=SOURCE_OBJECTS_SINGLE_FILE, destination_bucket=DESTINATION_BUCKET, destination_object=DESTINATION_OBJECT_PREFIX, - source_object_required=True + source_object_required=True, ) with pytest.raises( - AirflowException, - match=f"{SOURCE_OBJECTS_SINGLE_FILE} does not exist in bucket {TEST_BUCKET}" + AirflowException, match=f"{SOURCE_OBJECTS_SINGLE_FILE} does not exist in bucket {TEST_BUCKET}" ): operator.execute(None)