diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py index 7e6498cefd5ca..499b331da19c5 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py @@ -90,6 +90,8 @@ class GCSToGCSOperator(BaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). + :param source_object_required: Whether you want to raise an exception when the source object + doesn't exist. It doesn't have any effect when the source objects are folders or patterns. :Example: @@ -190,6 +192,7 @@ def __init__( maximum_modified_time=None, is_older_than=None, impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + source_object_required=False, **kwargs, ): super().__init__(**kwargs) @@ -216,6 +219,7 @@ def __init__( self.maximum_modified_time = maximum_modified_time self.is_older_than = is_older_than self.impersonation_chain = impersonation_chain + self.source_object_required = source_object_required def execute(self, context: 'Context'): @@ -313,6 +317,11 @@ def _copy_source_without_wildcard(self, hook, prefix): self._copy_single_object( hook=hook, source_object=prefix, destination_object=self.destination_object ) + elif self.source_object_required: + msg = f"{prefix} does not exist in bucket {self.source_bucket}" + self.log.warning(msg) + raise AirflowException(msg) + for source_obj in objects: if self.destination_object is None: destination_object = source_obj diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py index a23b50d42f893..dec0d378279a4 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py @@ -546,3 +546,20 @@ def test_execute_wildcard_with_replace_flag_false_with_destination_object(self, mock.call(DESTINATION_BUCKET, prefix="foo/bar", delimiter=""), ] mock_hook.return_value.list.assert_has_calls(mock_calls) + + @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook') + def test_execute_source_object_required_flag_true(self, mock_hook): + mock_hook.return_value.exists.return_value = False + operator = GCSToGCSOperator( + task_id=TASK_ID, + source_bucket=TEST_BUCKET, + source_objects=SOURCE_OBJECTS_SINGLE_FILE, + destination_bucket=DESTINATION_BUCKET, + destination_object=DESTINATION_OBJECT_PREFIX, + source_object_required=True, + ) + + with pytest.raises( + AirflowException, match=f"{SOURCE_OBJECTS_SINGLE_FILE} does not exist in bucket {TEST_BUCKET}" + ): + operator.execute(None)