Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions airflow/providers/google/cloud/sensors/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,19 +323,20 @@ def execute(self, context: Context):
super().execute(context)
return self._matches
else:
self.defer(
timeout=timedelta(seconds=self.timeout),
trigger=GCSPrefixBlobTrigger(
bucket=self.bucket,
prefix=self.prefix,
poke_interval=self.poke_interval,
google_cloud_conn_id=self.google_cloud_conn_id,
hook_params={
"impersonation_chain": self.impersonation_chain,
},
),
method_name="execute_complete",
)
if not self.poke(context=context):
self.defer(
timeout=timedelta(seconds=self.timeout),
trigger=GCSPrefixBlobTrigger(
bucket=self.bucket,
prefix=self.prefix,
poke_interval=self.poke_interval,
google_cloud_conn_id=self.google_cloud_conn_id,
hook_params={
"impersonation_chain": self.impersonation_chain,
},
),
method_name="execute_complete",
)

def execute_complete(self, context: dict[str, Any], event: dict[str, str | list[str]]) -> str | list[str]:
"""
Expand Down
22 changes: 19 additions & 3 deletions tests/providers/google/cloud/sensors/test_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,21 @@ def test_execute_timeout(self, mock_hook):
task.execute(mock.MagicMock)
mock_hook.return_value.list.assert_called_once_with(TEST_BUCKET, prefix=TEST_PREFIX)

@mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
@mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSObjectsWithPrefixExistenceSensor.defer")
def test_gcs_object_prefix_existence_sensor_finish_before_deferred(self, mock_defer, mock_hook):
task = GCSObjectsWithPrefixExistenceSensor(
task_id="task-id",
bucket=TEST_BUCKET,
prefix=TEST_PREFIX,
google_cloud_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
deferrable=True,
)
mock_hook.return_value.list.return_value = True
task.execute(mock.MagicMock())
assert not mock_defer.called


class TestGCSObjectsWithPrefixExistenceSensorAsync:
OPERATOR = GCSObjectsWithPrefixExistenceSensor(
Expand All @@ -379,14 +394,15 @@ class TestGCSObjectsWithPrefixExistenceSensorAsync:
deferrable=True,
)

def test_gcs_object_with_prefix_existence_sensor_async(self, context):
@mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
def test_gcs_object_with_prefix_existence_sensor_async(self, mock_hook):
"""
Asserts that a task is deferred and a GCSPrefixBlobTrigger will be fired
when the GCSObjectsWithPrefixExistenceSensorAsync is executed.
"""

mock_hook.return_value.list.return_value = False
with pytest.raises(TaskDeferred) as exc:
self.OPERATOR.execute(context)
self.OPERATOR.execute(mock.MagicMock())
assert isinstance(exc.value.trigger, GCSPrefixBlobTrigger), "Trigger is not a GCSPrefixBlobTrigger"

def test_gcs_object_with_prefix_existence_sensor_async_execute_failure(self, context):
Expand Down