From 4a76b21e99a2bdd531aff2bc4b85b9d74fae2a4a Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 7 Jul 2023 17:07:17 +0800 Subject: [PATCH 1/3] docs(deferring): add type annotation to code examples --- .../authoring-and-scheduling/deferring.rst | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst b/docs/apache-airflow/authoring-and-scheduling/deferring.rst index 81526737e8d10..dca8aa0d2a05f 100644 --- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst +++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst @@ -60,9 +60,12 @@ Writing a deferrable operator takes a bit more work. There are some main points import time from datetime import timedelta + from typing import Any + from airflow.configuration import conf from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import TimeDeltaTrigger + from airflow.utils.context import Context class WaitOneHourSensor(BaseSensorOperator): @@ -70,24 +73,27 @@ Writing a deferrable operator takes a bit more work. There are some main points self, deferable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs - ): + ) -> None: super().__init__(**kwargs) self.deferrable = deferable - def execute(self, context): - if deferrable: + def execute(self, context: Context) -> None: + if self.deferrable: self.defer( trigger=TimeDeltaTrigger(timedelta(hours=1)), - method_name="execute_complete" + method_name="execute_complete", ) else: time.sleep(3600) - def execute_complete(self, context, event=None): + def execute_complete( + self, + context: Context, + event: dict[str, Any] | None = None, + ) -> None: # We have no more work to do here. Mark as complete. 
return - Triggering Deferral ~~~~~~~~~~~~~~~~~~~ @@ -109,16 +115,22 @@ You are free to set ``method_name`` to ``execute`` if you want your Operator to Here's a basic example of how a sensor might trigger deferral:: from datetime import timedelta + from typing import Any from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import TimeDeltaTrigger + from airflow.utils.context import Context class WaitOneHourSensor(BaseSensorOperator): - def execute(self, context): - self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete") - - def execute_complete(self, context, event=None): + def execute(self, context: Context) -> None: + self.defer( + trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete" + ) + + def execute_complete( + self, context: Context, event: dict[str, Any] | None = None + ) -> None: # We have no more work to do here. Mark as complete. return @@ -153,7 +165,6 @@ There's also some design constraints to be aware of: Here's the structure of a basic Trigger:: - import asyncio from airflow.triggers.base import BaseTrigger, TriggerEvent @@ -161,7 +172,6 @@ Here's the structure of a basic Trigger:: class DateTimeTrigger(BaseTrigger): - def __init__(self, moment): super().__init__() self.moment = moment @@ -174,6 +184,7 @@ Here's the structure of a basic Trigger:: await asyncio.sleep(1) yield TriggerEvent(self.moment) + This is a very simplified version of Airflow's ``DateTimeTrigger``, and you can see several things here: * ``__init__`` and ``serialize`` are written as a pair; the Trigger is instantiated once when it is submitted by the Operator as part of its deferral request, then serialized and re-instantiated on any *triggerer* process that runs the trigger. 
From 0ceeac9e7eaa55122c34bea53c8c751f69d724b0 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 7 Jul 2023 18:26:28 +0800 Subject: [PATCH 2/3] docs(deferring): use code-block syntax for code example --- .../authoring-and-scheduling/deferring.rst | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst b/docs/apache-airflow/authoring-and-scheduling/deferring.rst index dca8aa0d2a05f..4e284d4dc4ffa 100644 --- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst +++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst @@ -56,7 +56,9 @@ Writing a deferrable operator takes a bit more work. There are some main points * You can defer multiple times, and you can defer before/after your Operator does significant work, or only defer if certain conditions are met (e.g. a system does not have an immediate answer). Deferral is entirely under your control. * Any Operator can defer; no special marking on its class is needed, and it's not limited to Sensors. * In order for any changes to a Trigger to be reflected, the *triggerer* needs to be restarted whenever the Trigger is modified. -* If you want add an operator or sensor that supports both deferrable and non-deferrable modes. It's suggested to add ``deferable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)`` to the ``__init__`` method of the operator and use it to decide whether to run the operator in deferrable mode. You'll be able to configure the default value of ``deferrable`` of all the operators and sensors that supports switch between deferrable and non-deferrable mode through ``default_deferrable`` in the ``operator`` section. Here's an example of a sensor that supports both modes.:: +* If you want add an operator or sensor that supports both deferrable and non-deferrable modes. 
It's suggested to add ``deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)`` to the ``__init__`` method of the operator and use it to decide whether to run the operator in deferrable mode. You'll be able to configure the default value of ``deferrable`` of all the operators and sensors that supports switch between deferrable and non-deferrable mode through ``default_deferrable`` in the ``operator`` section. Here's an example of a sensor that supports both modes. + +.. code-block:: python import time from datetime import timedelta @@ -70,9 +72,7 @@ Writing a deferrable operator takes a bit more work. There are some main points class WaitOneHourSensor(BaseSensorOperator): def __init__( - self, - deferable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), - **kwargs + self, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs ) -> None: super().__init__(**kwargs) self.deferrable = deferable @@ -112,7 +112,9 @@ If your Operator returns from either its first ``execute()`` method when it's ne You are free to set ``method_name`` to ``execute`` if you want your Operator to have one entrypoint, but it, too, will have to accept ``event`` as an optional keyword argument. -Here's a basic example of how a sensor might trigger deferral:: +Here's a basic example of how a sensor might trigger deferral: + +.. 
code-block:: python from datetime import timedelta from typing import Any @@ -124,13 +126,9 @@ Here's a basic example of how a sensor might trigger deferral:: class WaitOneHourSensor(BaseSensorOperator): def execute(self, context: Context) -> None: - self.defer( - trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete" - ) + self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete") - def execute_complete( - self, context: Context, event: dict[str, Any] | None = None - ) -> None: + def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None: # We have no more work to do here. Mark as complete. return @@ -163,7 +161,9 @@ There's also some design constraints to be aware of: Currently Triggers are only used up to their first event, as they are only used for resuming deferred tasks (which happens on the first event fired). However, we plan to allow DAGs to be launched from triggers in future, which is where multi-event triggers will be more useful. -Here's the structure of a basic Trigger:: +Here's the structure of a basic Trigger: + +.. code-block:: python import asyncio From b3de16bc495cbcd12080554b59d2c5039e892ffc Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Sat, 8 Jul 2023 22:43:45 +0800 Subject: [PATCH 3/3] rewording Co-authored-by: Syed Hussain <103602455+syedahsn@users.noreply.github.com> --- docs/apache-airflow/authoring-and-scheduling/deferring.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst b/docs/apache-airflow/authoring-and-scheduling/deferring.rst index 4e284d4dc4ffa..5d4e6e2634be5 100644 --- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst +++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst @@ -56,7 +56,7 @@ Writing a deferrable operator takes a bit more work. 
There are some main points * You can defer multiple times, and you can defer before/after your Operator does significant work, or only defer if certain conditions are met (e.g. a system does not have an immediate answer). Deferral is entirely under your control. * Any Operator can defer; no special marking on its class is needed, and it's not limited to Sensors. * In order for any changes to a Trigger to be reflected, the *triggerer* needs to be restarted whenever the Trigger is modified. -* If you want add an operator or sensor that supports both deferrable and non-deferrable modes. It's suggested to add ``deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)`` to the ``__init__`` method of the operator and use it to decide whether to run the operator in deferrable mode. You'll be able to configure the default value of ``deferrable`` of all the operators and sensors that supports switch between deferrable and non-deferrable mode through ``default_deferrable`` in the ``operator`` section. Here's an example of a sensor that supports both modes. +* If you want to add an operator or sensor that supports both deferrable and non-deferrable modes, it's suggested to add ``deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)`` to the ``__init__`` method of the operator and use it to decide whether to run the operator in deferrable mode. You'll be able to configure the default value of ``deferrable`` of all the operators and sensors that support switching between deferrable and non-deferrable mode through ``default_deferrable`` in the ``operator`` section. Here's an example of a sensor that supports both modes. .. code-block:: python