From 2ca88e1c163036f6e349cad1c7267a3237a8b5e5 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 15 Aug 2023 15:47:52 +0800 Subject: [PATCH 1/2] feat(sensors/base): raise AirflowSkipException if soft_fail is set to True and exception occurs after running poke() --- airflow/sensors/base.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py index 77094269a1951..719bc26a1e5c6 100644 --- a/airflow/sensors/base.py +++ b/airflow/sensors/base.py @@ -240,14 +240,19 @@ def run_duration() -> float: except ( AirflowSensorTimeout, AirflowTaskTimeout, - AirflowSkipException, AirflowFailException, ) as e: + if self.soft_fail: + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e + raise e + except AirflowSkipException as e: raise e except Exception as e: if self.silent_fail: logging.error("Sensor poke failed: \n %s", traceback.format_exc()) poke_return = False + elif self.soft_fail: + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e else: raise e From 6b144d5eec1b3a0a7089f53e611d7ccfe2deb09a Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 15 Aug 2023 15:58:52 +0800 Subject: [PATCH 2/2] test(sensor/base): add test case for respecting soft_fail option when other kinds of exception is raised --- tests/sensors/test_base.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py index 4dff8222e200a..2dbd5cc686417 100644 --- a/tests/sensors/test_base.py +++ b/tests/sensors/test_base.py @@ -25,9 +25,11 @@ from airflow.exceptions import ( AirflowException, + AirflowFailException, AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException, + AirflowTaskTimeout, ) from airflow.executors.debug_executor import DebugExecutor from airflow.executors.executor_constants import ( @@ -48,9 +50,7 @@ from airflow.providers.celery.executors.celery_executor import CeleryExecutor from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor -from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor import ( - LocalKubernetesExecutor, -) +from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor import LocalKubernetesExecutor from airflow.sensors.base import BaseSensorOperator, PokeReturnValue, poke_mode_only from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep from airflow.utils import timezone @@ -176,6 +176,28 @@ def test_soft_fail(self, make_sensor): if ti.task_id == DUMMY_OP: assert ti.state == State.NONE + @pytest.mark.parametrize( + "exception_cls", + ( + AirflowSensorTimeout, + AirflowTaskTimeout, + AirflowFailException, + Exception, + ), + ) + def test_soft_fail_with_non_skip_exception(self, make_sensor, exception_cls): + sensor, dr = make_sensor(False, soft_fail=True) + sensor.poke = Mock(side_effect=[exception_cls(None)]) + + self._run(sensor) + tis = dr.get_task_instances() + assert len(tis) == 2 + for ti in tis: + if ti.task_id == SENSOR_OP: + assert ti.state == State.SKIPPED + if ti.task_id == DUMMY_OP: + assert ti.state == State.NONE + def test_soft_fail_with_retries(self, make_sensor): sensor, dr = make_sensor( return_value=False, soft_fail=True, retries=1, retry_delay=timedelta(milliseconds=1) @@ -518,7 +540,6 @@ def run_duration(): assert sensor._get_next_poke_interval(started_at, run_duration, 2) == sensor.poke_interval def test_sensor_with_exponential_backoff_on(self): - sensor = DummySensor( task_id=SENSOR_OP, return_value=None, poke_interval=5, timeout=60, exponential_backoff=True ) @@ -575,7 +596,6 @@ def run_duration(): assert intervals[0] == intervals[-1] def test_sensor_with_exponential_backoff_on_and_max_wait(self): - sensor = DummySensor( task_id=SENSOR_OP, return_value=None,