diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst b/docs/apache-airflow/authoring-and-scheduling/deferring.rst index 81526737e8d10..5d4e6e2634be5 100644 --- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst +++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst @@ -56,38 +56,44 @@ Writing a deferrable operator takes a bit more work. There are some main points * You can defer multiple times, and you can defer before/after your Operator does significant work, or only defer if certain conditions are met (e.g. a system does not have an immediate answer). Deferral is entirely under your control. * Any Operator can defer; no special marking on its class is needed, and it's not limited to Sensors. * In order for any changes to a Trigger to be reflected, the *triggerer* needs to be restarted whenever the Trigger is modified. -* If you want add an operator or sensor that supports both deferrable and non-deferrable modes. It's suggested to add ``deferable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)`` to the ``__init__`` method of the operator and use it to decide whether to run the operator in deferrable mode. You'll be able to configure the default value of ``deferrable`` of all the operators and sensors that supports switch between deferrable and non-deferrable mode through ``default_deferrable`` in the ``operator`` section. Here's an example of a sensor that supports both modes.:: +* If you want to add an operator or sensor that supports both deferrable and non-deferrable modes, it's suggested to add ``deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)`` to the ``__init__`` method of the operator and use it to decide whether to run the operator in deferrable mode. You'll be able to configure the default value of ``deferrable`` of all the operators and sensors that support switching between deferrable and non-deferrable mode through ``default_deferrable`` in the ``operator`` section. Here's an example of a sensor that supports both modes. + +.. code-block:: python import time from datetime import timedelta + from typing import Any + from airflow.configuration import conf from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import TimeDeltaTrigger + from airflow.utils.context import Context class WaitOneHourSensor(BaseSensorOperator): def __init__( - self, - deferable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), - **kwargs - ): + self, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs + ) -> None: super().__init__(**kwargs) self.deferrable = deferable - def execute(self, context): - if deferrable: + def execute(self, context: Context) -> None: + if self.deferrable: self.defer( trigger=TimeDeltaTrigger(timedelta(hours=1)), - method_name="execute_complete" + method_name="execute_complete", ) else: time.sleep(3600) - def execute_complete(self, context, event=None): + def execute_complete( + self, + context: Context, + event: dict[str, Any] | None = None, + ) -> None: # We have no more work to do here. Mark as complete. return - Triggering Deferral ~~~~~~~~~~~~~~~~~~~ @@ -106,19 +112,23 @@ If your Operator returns from either its first ``execute()`` method when it's ne You are free to set ``method_name`` to ``execute`` if you want your Operator to have one entrypoint, but it, too, will have to accept ``event`` as an optional keyword argument. -Here's a basic example of how a sensor might trigger deferral:: +Here's a basic example of how a sensor might trigger deferral + +.. code-block:: python from datetime import timedelta + from typing import Any from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import TimeDeltaTrigger + from airflow.utils.context import Context class WaitOneHourSensor(BaseSensorOperator): - def execute(self, context): + def execute(self, context: Context) -> None: self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete") - def execute_complete(self, context, event=None): + def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None: # We have no more work to do here. Mark as complete. return @@ -151,8 +161,9 @@ There's also some design constraints to be aware of: Currently Triggers are only used up to their first event, as they are only used for resuming deferred tasks (which happens on the first event fired). However, we plan to allow DAGs to be launched from triggers in future, which is where multi-event triggers will be more useful. -Here's the structure of a basic Trigger:: +Here's the structure of a basic Trigger +.. code-block:: python import asyncio @@ -161,7 +172,6 @@ Here's the structure of a basic Trigger:: class DateTimeTrigger(BaseTrigger): - def __init__(self, moment): super().__init__() self.moment = moment @@ -174,6 +184,7 @@ Here's the structure of a basic Trigger:: await asyncio.sleep(1) yield TriggerEvent(self.moment) + This is a very simplified version of Airflow's ``DateTimeTrigger``, and you can see several things here: * ``__init__`` and ``serialize`` are written as a pair; the Trigger is instantiated once when it is submitted by the Operator as part of its deferral request, then serialized and re-instantiated on any *triggerer* process that runs the trigger.