From 28e99f013041fb9dbb85671baef9525b19d54e61 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 11 Aug 2021 22:29:08 +0800 Subject: [PATCH 01/21] Mention data interval and add timetable how-to --- .../example_dags/example_workday_timetable.py | 92 ++++++ airflow/timetables/interval.py | 2 +- docs/apache-airflow/concepts/dags.rst | 13 +- docs/apache-airflow/dag-run.rst | 34 ++- docs/apache-airflow/howto/index.rst | 1 + docs/apache-airflow/howto/timetable.rst | 261 ++++++++++++++++++ docs/apache-airflow/python-api-ref.rst | 11 + docs/conf.py | 11 +- docs/spelling_wordlist.txt | 4 + 9 files changed, 416 insertions(+), 13 deletions(-) create mode 100644 airflow/example_dags/example_workday_timetable.py create mode 100644 docs/apache-airflow/howto/timetable.rst diff --git a/airflow/example_dags/example_workday_timetable.py b/airflow/example_dags/example_workday_timetable.py new file mode 100644 index 0000000000000..dd29b978b1919 --- /dev/null +++ b/airflow/example_dags/example_workday_timetable.py @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Example DAG demostrating how to implement a custom timetable for a DAG.""" + +# [START howto_timetable] +from datetime import timedelta +from typing import Optional + +from pendulum import Date, DateTime, Time, timezone + +from airflow import DAG +from airflow.operators.dummy import DummyOperator +from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable + +UTC = timezone("UTC") + + +class AfterWorkdayTimetable(Timetable): + + # [START howto_timetable_infer_data_interval] + def infer_data_interval(self, run_after: DateTime) -> DataInterval: + weekday = run_after.weekday() + if weekday in (0, 6): # Monday and Sunday -- interval is last Friday. + days_since_friday = (run_after.weekday() - 4) % 7 + delta = timedelta(days=days_since_friday) + else: # Otherwise the interval is yesterday. + delta = timedelta(days=1) + start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC) + return DataInterval(start=start, end=(start + timedelta(days=1))) + + # [END howto_timetable_infer_data_interval] + + # [START howto_timetable_next_dagrun_info] + def next_dagrun_info( + self, + last_automated_dagrun: Optional[DateTime], + restriction: TimeRestriction, + ) -> Optional[DagRunInfo]: + if last_automated_dagrun is not None: + # There was a previous run on the regular schedule. + # last_automated_dagrun os the last run's logical date. + weekday = last_automated_dagrun.weekday() + if 0 <= weekday < 4: # Monday through Thursday -- next is tomorrow. + delta = timedelta(days=1) + else: # Week is ending -- skip to next Monday. 
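+                # For a Friday run (weekday 4), 7 - 4 gives a three-day jump, landing on the following Monday.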
+ delta = timedelta(days=(7 - weekday)) + start = DateTime.combine((last_automated_dagrun + delta).date(), Time.min) + else: # This is the first ever run on the regular schedule. + if restriction.earliest is None: # No start_date. Don't schedule. + return None + start = restriction.earliest + if start.time() != Time.min: + # If earliest does not fall on midnight, skip to the next day. + start = DateTime.combine(start.date(), Time.min).replace(tzinfo=UTC) + timedelta(days=1) + if not restriction.catchup: + # If the DAG has catchup=False, today is the earliest to consider. + start = max(start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC)) + weekday = start.weekday() + if weekday in (5, 6): # If 'start' is in the weekend, go to next Monday. + delta = timedelta(days=(7 - weekday)) + start = start + delta + if start > restriction.latest: # Over the DAG's scheduled end; don't schedule. + return None + return DagRunInfo.interval(start=start, end=(start + timedelta(days=1))) + + # [END howto_timetable_next_dagrun_info] + + +with DAG(timetable=AfterWorkdayTimetable(), tags=["example", "timetable"]) as dag: + DummyOperator(task_id="run_this") + + +if __name__ == "__main__": + dag.cli() + +# [END howto_timetable] diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py index 8f6132ae170bd..8f5c3f1032a29 100644 --- a/airflow/timetables/interval.py +++ b/airflow/timetables/interval.py @@ -114,7 +114,7 @@ class CronDataIntervalTimetable(_DataIntervalTimetable): a five/six-segment representation, or one of ``cron_presets``. The implementation extends on croniter to add timezone awareness. This is - because crontier works only with naive timestamps, and cannot consider DST + because croniter works only with naive timestamps, and cannot consider DST when determining the next/previous time. Don't pass ``@once`` in here; use ``OnceTimetable`` instead. diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst index 89a0995beca77..5c27a79fd7fb3 100644 --- a/docs/apache-airflow/concepts/dags.rst +++ b/docs/apache-airflow/concepts/dags.rst @@ -150,11 +150,20 @@ The ``schedule_interval`` argument takes any value that is a valid `Crontab `. DAG Runs can run in parallel for the same DAG, and each has a defined ``execution_date``, which identifies the *logical* date and time it is running for - not the *actual* time when it was started. +.. tip:: + + For more information on ``schedule_interval`` values, see :doc:`DAG Run `. + + If ``schedule_interval`` is not enough to express the DAG's schedule, see :doc:`Timetables `. + +Every time you run a DAG, you are creating a new instance of that DAG which Airflow calls a :doc:`DAG Run `. DAG Runs can run in parallel for the same DAG, and each has a defined data interval, which identifies the *logical* date and time range it is running for - not the *actual* time when it was started. As an example of why this is useful, consider writing a DAG that processes a daily set of experimental data. It's been rewritten, and you want to run it on the previous 3 months of data - no problem, since Airflow can *backfill* the DAG and run copies of it for every day in those previous 3 months, all at once. -Those DAG Runs will all have been started on the same actual day, but their ``execution_date`` values will cover those last 3 months, and that's what all the tasks, operators and sensors inside the DAG look at when they run. 
+Those DAG Runs will all have been started on the same actual day, but each DAG
+run will have one data interval covering a single day in that 3 month period,
+and that data interval is what all the tasks, operators and sensors inside
+the DAG look at when they run.

 In much the same way a DAG instantiates into a DAG Run every time it's run, Tasks specified inside a DAG also instantiate into :ref:`Task Instances ` along with it.

diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst
index 5d47a0be46657..fb196a66df7c0 100644
--- a/docs/apache-airflow/dag-run.rst
+++ b/docs/apache-airflow/dag-run.rst
@@ -54,17 +54,33 @@ Cron Presets
 Your DAG will be instantiated for each schedule along with a corresponding DAG Run entry in the database backend.

-.. note::
+Data Interval
+-------------

-    If you run a DAG on a schedule_interval of one day, the run stamped 2020-01-01
-    will be triggered soon after 2020-01-01T23:59. In other words, the job instance is
-    started once the period it covers has ended. The ``execution_date`` available in the context
-    will also be 2020-01-01.
+Each DAG run in Airflow has an assigned "data interval" that represents the time
+range it operates in. For a DAG scheduled with ``@daily``, for example, each of
+its data intervals would start at midnight of each day and end at midnight of the
+next day.

-    The first DAG Run is created based on the minimum ``start_date`` for the tasks in your DAG.
-    Subsequent DAG Runs are created by the scheduler process, based on your DAG’s ``schedule_interval``,
-    sequentially. If your start_date is 2020-01-01 and schedule_interval is @daily, the first run
-    will be created on 2020-01-02 i.e., after your start date has passed.
+A DAG run is scheduled *after* its associated data interval has ended, to ensure
+the run is able to collect all the data within the time period. Therefore, a run
+covering the data period of 2020-01-01 will not start to run until 2020-01-01
+has ended, i.e. after 2020-01-02 00:00:00.
+
+All dates in Airflow are tied to the data interval concept in some way. The
+"logical date" (also called ``execution_date`` in Airflow versions prior to 2.2)
+of a DAG run, for example, usually denotes the start of the data interval, not
+when the DAG is actually executed.
+
+Similarly, since the ``start_date`` argument for the DAG and its tasks points to
+the same logical date, it marks the start of *the DAG's first data interval*, not
+when tasks in the DAG will start running. In other words, a DAG run will only be
+scheduled one interval after ``start_date``.
+
+.. tip::
+
+    If ``schedule_interval`` is not enough to express your DAG's schedule,
+    logical date, or data interval, see :doc:`Customizing timetables `.

 Re-run DAG
 ''''''''''

diff --git a/docs/apache-airflow/howto/index.rst b/docs/apache-airflow/howto/index.rst
index cf2ec014bd1cf..afea741f57dd2 100644
--- a/docs/apache-airflow/howto/index.rst
+++ b/docs/apache-airflow/howto/index.rst
@@ -33,6 +33,7 @@ configuring an Airflow environment.
     set-config
     set-up-database
     operator/index
+    timetable
     customize-ui
     custom-operator
     create-custom-decorator

diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst
new file mode 100644
index 0000000000000..61ff22fd92a5b
--- /dev/null
+++ b/docs/apache-airflow/howto/timetable.rst
@@ -0,0 +1,261 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Customizing DAG Scheduling with Timetables +========================================== + +A DAG's scheduling strategy is determined by its internal "timetable". This +timetable can be created by specifying the DAG's ``schedule_interval`` argument, +as described in :doc:`DAG Run `. The timetable also dictates the data +interval and the logical time of each run created for the DAG. + +However, there are situations when a cron expression or simple ``timedelta`` +periods cannot properly express the schedule. Some of the examples are: + +* Data intervals with "holes" between. (Instead of continuous, as both the cron + expression and ``timedelta`` schedules represent.) +* Run tasks at different times each day. For example, an astronomer may find it + useful to run a task at dawn to process data collected from the previous + night-time period. +* Schedules not following the Gregorian calendar. For example, create a run for + each month in the `Traditional Chinese Calendar`_. This is conceptually + similar to the sunset case above, but for a different time scale. +* Rolling windows, or overlapping data intervals. For example, one may want to + have a run each day, but make each run cover the period of the previous seven + days. It is possible to "hack" this with a cron expression, but a custom data + interval would be a more natural representation. + +.. _`Traditional Chinese Calendar`: https://en.wikipedia.org/wiki/Chinese_calendar + + +For our example, let's say a company wants to run a job after each weekday to +process data collected during the work day. The first intuitive answer to this +would be ``schedule_interval="0 0 * * 1-5"`` (midnight on Monday to Friday), but +this means data collected on Friday will *not* be processed right after Friday +ends, but on the next Monday, and that run's interval would be from midnight +Friday to midnight *Monday*. + +This is, therefore, an example in the "holes" category above; the intended +schedule should not include the two weekend days. What we want is: + +* Schedule a run for each Monday, Tuesday, Wednesday, Thursday, and Friday. The + run's data interval would cover from midnight of each day, to midnight of the + next day (e.g. 2021-01-01 00:00:00 to 2021-01-02 00:00:00). +* Each run would be created right after the data interval ends. The run covering + Monday happens on midnight Tuesday and so on. The run covering Friday happens + on midnight Saturday. No runs happen on midnights Sunday and Monday. + +For simplicity, we will only deal with UTC datetimes in this example. + + +Define Scheduling Logic +----------------------- + +When Airflow's scheduler encounters a DAG, it calls one of the two methods to +know when to schedule the DAG's next run. + +* ``next_dagrun_info``: The scheduler uses this to learn the timetable's regular + schedule, i.e. 
the "one for every workday, run at the end of it" part in our + example. +* ``infer_data_interval``: When a DAG run is manually triggered (from the web + UI, for example), the scheduler uses this method to learn about how to + reverse-infer the out-of-schedule run's data interval. + +We'll start with ``infer_data_interval`` since it's the easier of the two: + +.. exampleinclude:: /../../airflow/example_dags/example_workday_timetable.py + :language: python + :dedent: 4 + :start-after: [START howto_timetable_infer_data_interval] + :end-before: [END howto_timetable_infer_data_interval] + +The method accepts one argument ``run_after``, a ``pendulum.DateTime`` object +that indicates when the DAG is externally triggered. Since our timetable creates +a data interval for each complete work day, the data interval inferred here +should usually start at the midnight one day prior to ``run_after``, but if +``run_after`` falls on a Sunday or Monday (i.e. the prior day is Saturday or +Sunday), it should be pushed further back to the previous Friday. Once we know +the start of the interval, the end is simply one full day after it. We then +create a :class:`~airflow.timetables.base.DataInterval` object to describe this +interval. + +Next is the implementation of ``next_dagrun_info``: + +.. exampleinclude:: /../../airflow/example_dags/example_workday_timetable.py + :language: python + :dedent: 4 + :start-after: [START howto_timetable_next_dagrun_info] + :end-before: [END howto_timetable_next_dagrun_info] + +This method accepts two arguments. ``last_automated_dagrun`` is a +``pendulum.DateTime`` object indicating the logical date of this DAG's previous +non-manually-triggered run, or ``None`` if this is the first time ever the DAG +is being scheduled. ``restriction`` encapsulates how the DAG and its tasks +specify the schedule, and contains three attributes: + +* ``earliest``: The earliest time the DAG may be scheduled. This is a + calculated ``pendulum.DateTime`` from all the ``start_date`` arguments from + the DAG and its tasks, or ``None`` oi there are no ``start_date`` arguments + found at all. +* ``latest``: Similar to ``earliest``, this is the latest time the DAG may be + scheduled, calculated from ``end_date`` arguments. +* ``catchup``: A boolean reflecting the DAG's ``catchup`` argument. + +.. note:: + + Both ``earliest`` and ``latest`` apply to the DAG run's logical date + (the *start* of the data interval), not when the run will be scheduled + (usually after the end of the data interval). + +If there was a run scheduled previously, we should now schedule for the next +weekday, i.e. plus one day if the previous run was on Monday through Thursday, +or three days if it was on Friday. If there was not a previous scheduled run, +however, we pick the next workday's midnight after ``restriction.earliest`` +(unless it *is* a workday's midnight; in which case it's used directly). +``restriction.catchup`` also needs to be considered---if it's ``False``, we +can't schedule before the current time, even if ``start_date`` values are in the +past. Finally, if our calculated data interval is later than +``restriction.latest``, we must respect it and not schedule a run by returning +``None``. + +If we decide to schedule a run, we need to describe it with a +:class:`~airflow.timetables.base.DagRunInfo`. This type has two arguments and +attributes: + +* ``data_interval``: A :class:`~airflow.timetables.base.DataInterval` instance + like ``infer_data_interval``'s return value. 
This describes the next run's + data interval. +* ``run_after``: A ``pendulum.DateTime`` instance that tells the scheduler when + the DAG run can be scheduled. + +.. note:: + + In case you're wondering---yes, the argument and return value of + ``infer_data_interval`` are also internally combined into a ``DagRunInfo``. + +A ``DagRunInfo`` can be created like this: + +.. code-block:: python + + info = DagRunInfo( + data_interval=DataInterval(start=start, end=end), + run_after=run_after, + ) + +But since we typically wan to schedule a run as soon as the data interval ends, +``end`` and ``run_after`` above are generally the same. ``DagRunInfo`` therefore +provides a shortcut for this: + +.. code-block:: python + + info = DagRunInfo.interval(start=start, end=end) + assert info.data_interval.end == info.run_after # Always True. + +For reference, here's our DAG file in its entirety: + +.. exampleinclude:: /../../airflow/example_dags/example_workday_timetable.py + :language: python + :start-after: [START howto_timetable] + :end-before: [END howto_timetable] + + +DAG Serialization and Parameterized Timetables +---------------------------------------------- + +Sometimes we need to pass some run-time arguments to the timetable. Continuing +with our ``AfterWorkdayTimetable`` example, maybe we may have DAGs running on +different timezones, and we want to schedule some DAGs at 8am the next day, +instead of on midnight. Instead of creating a separate timetable for each +purpose, we'd want to do something like: + +.. code-block:: python + + from datetime import timedelta + + from pendulum import DateTime, Time + + + class SometimeAfterWorkdayTimetable(Timetable): + def __init__(self, schedule_at: Time) -> None: + self._schedule_at = schedule_at + + def next_dagrun_info(self, last_automated_dagrun, restriction): + ... + end = start + timedelta(days=1) + return DagRunInfo( + data_interval=DataInterval(start=start, end=end), + run_after=DateTime.combine(end.date(), self._schedule_at).replace( + tzinfo=end.tzinfo + ), + ) + +However, since the timetable is a part of the DAG, we need to tell Airflow how +to serialize it with the context we provide in ``__init__``. This is done by +implementing two additional methods on our timetable class: + +.. code-block:: python + + def serialize(self) -> Dict[str, Any]: + return {"schedule_at": self._schedule_at.isoformat()} + + + @classmethod + def deserialize(cls, value: Dict[str, Any]) -> Timetable: + return cls(Time.fromisoformat(value["schedule_at"])) + +When the DAG is being serialized, ``serialize`` is called to obtain a +JSON-serializable value. That value is passed to ``deserialize`` when the +serialized DAG is accessed by the scheduler to reconstruct the timetable. + + +Timetable Display in UI +======================= + +By default, a custom timetable is displayed by their class name in the UI (e.g. +the *Schedule* column in the "DAGs" table. It is possible to customize this +by overriding the ``summary`` property. This is especially useful for +parameterized timetables to include arguments provided in ``__init__``. For +our ``SometimeAfterWorkdayTimetable`` class, for example, we could have: + +.. code-block:: python + + @property + def summary(self) -> str: + return f"after each workday, at {self._schedule_at}" + +So for a DAG declared like this: + +.. code-block:: python + + from pendulum import Time + + + with DAG( + timetable=SometimeAfterWorkdayTimetable(Time(8)), # 8am. + ..., + ) as dag: + ... 
+ +The *Schedule* column would say ``after each workday, at 08:00:00``. + + +.. seealso:: + + Module :mod:`airflow.timetables.base` + The public interface is heavily documented to explain what they should + be implemented by subclasses. diff --git a/docs/apache-airflow/python-api-ref.rst b/docs/apache-airflow/python-api-ref.rst index e79842ebc8e39..591bb84f5ecca 100644 --- a/docs/apache-airflow/python-api-ref.rst +++ b/docs/apache-airflow/python-api-ref.rst @@ -137,3 +137,14 @@ All secrets backends derive from :class:`~airflow.secrets.BaseSecretsBackend`. :maxdepth: 1 _api/airflow/secrets/index + +Timetables +---------- +Custom timetable implementations provide Airflow's scheduler additional logic to +schedule DAG runs in ways not possible with built-in schedule expressions. + +.. toctree:: + :includehidden: + :maxdepth: 1 + + _api/airflow/timetables/index diff --git a/docs/conf.py b/docs/conf.py index b3af13c103f8c..8afba7fed2601 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -215,7 +215,16 @@ def _get_rst_filepath_from_path(filepath: str): name = os.path.basename(path) if os.path.isfile(path) and not path.endswith(_allowed_top_level): exclude_patterns.append(f"_api/airflow/{name.rpartition('.')[0]}") - browsable_packages = ["operators", "hooks", "sensors", "providers", "executors", "models", "secrets"] + browsable_packages = [ + "hooks", + "executors", + "models", + "operators", + "providers", + "secrets", + "sensors", + "timetables", + ] if os.path.isdir(path) and name not in browsable_packages: exclude_patterns.append(f"_api/airflow/{name}") else: diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 66d6b29c6c964..df81f30e166ef 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -228,6 +228,7 @@ Memorystore Mesos MessageAttributes Metastore +Midnights Mixin Mongo Moto @@ -259,6 +260,7 @@ PTarget Pagerduty Papermill Parallelize +Parameterized Parameterizing Paramiko Params @@ -578,6 +580,7 @@ createDisposition creationTimestamp credssp cron +croniter cronjob crontab crypto @@ -953,6 +956,7 @@ mget microservice microsoft middleware +midnights milton minicluster minikube From f9930ef26d0af9f42532aa9c3a7b9459aa6f8435 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Tue, 7 Sep 2021 20:11:46 +0800 Subject: [PATCH 02/21] Document timetable registration via plugin --- .../example_dags/example_workday_timetable.py | 66 +------------- airflow/example_dags/plugins/__init__.py | 16 ++++ airflow/example_dags/plugins/workday.py | 89 +++++++++++++++++++ docs/apache-airflow/howto/timetable.rst | 47 ++++++++-- docs/apache-airflow/plugins.rst | 3 + 5 files changed, 154 insertions(+), 67 deletions(-) create mode 100644 airflow/example_dags/plugins/__init__.py create mode 100644 airflow/example_dags/plugins/workday.py diff --git a/airflow/example_dags/example_workday_timetable.py b/airflow/example_dags/example_workday_timetable.py index dd29b978b1919..be9c7f9fb3f4c 100644 --- a/airflow/example_dags/example_workday_timetable.py +++ b/airflow/example_dags/example_workday_timetable.py @@ -18,75 +18,17 @@ """Example DAG demostrating how to implement a custom timetable for a DAG.""" -# [START howto_timetable] -from datetime import timedelta -from typing import Optional - -from pendulum import Date, DateTime, Time, timezone +from plugins.workday import AfterWorkdayTimetable +# [START howto_timetable] from airflow import DAG from airflow.operators.dummy import DummyOperator -from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable 
- -UTC = timezone("UTC") - - -class AfterWorkdayTimetable(Timetable): - - # [START howto_timetable_infer_data_interval] - def infer_data_interval(self, run_after: DateTime) -> DataInterval: - weekday = run_after.weekday() - if weekday in (0, 6): # Monday and Sunday -- interval is last Friday. - days_since_friday = (run_after.weekday() - 4) % 7 - delta = timedelta(days=days_since_friday) - else: # Otherwise the interval is yesterday. - delta = timedelta(days=1) - start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC) - return DataInterval(start=start, end=(start + timedelta(days=1))) - - # [END howto_timetable_infer_data_interval] - - # [START howto_timetable_next_dagrun_info] - def next_dagrun_info( - self, - last_automated_dagrun: Optional[DateTime], - restriction: TimeRestriction, - ) -> Optional[DagRunInfo]: - if last_automated_dagrun is not None: - # There was a previous run on the regular schedule. - # last_automated_dagrun os the last run's logical date. - weekday = last_automated_dagrun.weekday() - if 0 <= weekday < 4: # Monday through Thursday -- next is tomorrow. - delta = timedelta(days=1) - else: # Week is ending -- skip to next Monday. - delta = timedelta(days=(7 - weekday)) - start = DateTime.combine((last_automated_dagrun + delta).date(), Time.min) - else: # This is the first ever run on the regular schedule. - if restriction.earliest is None: # No start_date. Don't schedule. - return None - start = restriction.earliest - if start.time() != Time.min: - # If earliest does not fall on midnight, skip to the next day. - start = DateTime.combine(start.date(), Time.min).replace(tzinfo=UTC) + timedelta(days=1) - if not restriction.catchup: - # If the DAG has catchup=False, today is the earliest to consider. - start = max(start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC)) - weekday = start.weekday() - if weekday in (5, 6): # If 'start' is in the weekend, go to next Monday. - delta = timedelta(days=(7 - weekday)) - start = start + delta - if start > restriction.latest: # Over the DAG's scheduled end; don't schedule. - return None - return DagRunInfo.interval(start=start, end=(start + timedelta(days=1))) - - # [END howto_timetable_next_dagrun_info] - +# [START howto_timetable_example_dag] with DAG(timetable=AfterWorkdayTimetable(), tags=["example", "timetable"]) as dag: DummyOperator(task_id="run_this") - +# [END howto_timetable_example_dag] if __name__ == "__main__": dag.cli() - # [END howto_timetable] diff --git a/airflow/example_dags/plugins/__init__.py b/airflow/example_dags/plugins/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/example_dags/plugins/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/airflow/example_dags/plugins/workday.py b/airflow/example_dags/plugins/workday.py new file mode 100644 index 0000000000000..bd566bbe876fc --- /dev/null +++ b/airflow/example_dags/plugins/workday.py @@ -0,0 +1,89 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Plugin to demostrate timetable registration and accomdate example DAGs.""" + +# [START howto_timetable] +from datetime import timedelta +from typing import Optional + +from pendulum import Date, DateTime, Time, timezone + +from airflow.plugins_manager import AirflowPlugin +from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable + +UTC = timezone("UTC") + + +class AfterWorkdayTimetable(Timetable): + + # [START howto_timetable_infer_data_interval] + def infer_data_interval(self, run_after: DateTime) -> DataInterval: + weekday = run_after.weekday() + if weekday in (0, 6): # Monday and Sunday -- interval is last Friday. + days_since_friday = (run_after.weekday() - 4) % 7 + delta = timedelta(days=days_since_friday) + else: # Otherwise the interval is yesterday. + delta = timedelta(days=1) + start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC) + return DataInterval(start=start, end=(start + timedelta(days=1))) + + # [END howto_timetable_infer_data_interval] + + # [START howto_timetable_next_dagrun_info] + def next_dagrun_info( + self, + last_automated_dagrun: Optional[DateTime], + restriction: TimeRestriction, + ) -> Optional[DagRunInfo]: + if last_automated_dagrun is not None: + # There was a previous run on the regular schedule. + # last_automated_dagrun os the last run's logical date. + weekday = last_automated_dagrun.weekday() + if 0 <= weekday < 4: # Monday through Thursday -- next is tomorrow. + delta = timedelta(days=1) + else: # Week is ending -- skip to next Monday. + delta = timedelta(days=(7 - weekday)) + start = DateTime.combine((last_automated_dagrun + delta).date(), Time.min) + else: # This is the first ever run on the regular schedule. + if restriction.earliest is None: # No start_date. Don't schedule. + return None + start = restriction.earliest + if start.time() != Time.min: + # If earliest does not fall on midnight, skip to the next day. + start = DateTime.combine(start.date(), Time.min).replace(tzinfo=UTC) + timedelta(days=1) + if not restriction.catchup: + # If the DAG has catchup=False, today is the earliest to consider. + start = max(start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC)) + weekday = start.weekday() + if weekday in (5, 6): # If 'start' is in the weekend, go to next Monday. + delta = timedelta(days=(7 - weekday)) + start = start + delta + if start > restriction.latest: # Over the DAG's scheduled end; don't schedule. 
+ return None + return DagRunInfo.interval(start=start, end=(start + timedelta(days=1))) + + # [END howto_timetable_next_dagrun_info] + + +class WorkdayTimetablePlugin(AirflowPlugin): + name = "workday_timetable_plugin" + timetables = [AfterWorkdayTimetable] + + +# [END howto_timetable] diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst index 61ff22fd92a5b..73cd85ebcb35f 100644 --- a/docs/apache-airflow/howto/timetable.rst +++ b/docs/apache-airflow/howto/timetable.rst @@ -63,6 +63,38 @@ schedule should not include the two weekend days. What we want is: For simplicity, we will only deal with UTC datetimes in this example. +Timetable Registration +---------------------- + +A timetable must be a subclass of :class:`~airflow.timetables.base.Timetable`, +and be registered as a part of a :doc:`plugin `. The following is a +skeleton for us to implement a new timetable: + +.. code-block:: python + + from airflow.plugins_manager import AirflowPlugin + from airflow.timetables.base import Timetable + + + class AfterWorkdayTimetable(Timetable): + pass + + + class WorkdayTimetablePlugin(AirflowPlugin): + name = "workday_timetable_plugin" + timetables = [AfterWorkdayTimetable] + +Next, we'll start putting code into ``AfterWorkdayTimetable``. After the +implementation is finished, we should be able to use the timetable in our DAG +file: + +.. exampleinclude:: /../../airflow/example_dags/example_workday_timetable.py + :language: python + :dedent: 4 + :start-after: [START howto_timetable_example_dag] + :end-before: [END howto_timetable_example_dag] + + Define Scheduling Logic ----------------------- @@ -78,7 +110,7 @@ know when to schedule the DAG's next run. We'll start with ``infer_data_interval`` since it's the easier of the two: -.. exampleinclude:: /../../airflow/example_dags/example_workday_timetable.py +.. exampleinclude:: /../../airflow/example_dags/plugins/workday.py :language: python :dedent: 4 :start-after: [START howto_timetable_infer_data_interval] @@ -96,7 +128,7 @@ interval. Next is the implementation of ``next_dagrun_info``: -.. exampleinclude:: /../../airflow/example_dags/example_workday_timetable.py +.. exampleinclude:: /../../airflow/example_dags/plugins/workday.py :language: python :dedent: 4 :start-after: [START howto_timetable_next_dagrun_info] @@ -166,7 +198,12 @@ provides a shortcut for this: info = DagRunInfo.interval(start=start, end=end) assert info.data_interval.end == info.run_after # Always True. -For reference, here's our DAG file in its entirety: +For reference, here's our plugin and DAG files in their entirety: + +.. exampleinclude:: /../../airflow/example_dags/plugins/workday.py + :language: python + :start-after: [START howto_timetable] + :end-before: [END howto_timetable] .. exampleinclude:: /../../airflow/example_dags/example_workday_timetable.py :language: python @@ -174,8 +211,8 @@ For reference, here's our DAG file in its entirety: :end-before: [END howto_timetable] -DAG Serialization and Parameterized Timetables ----------------------------------------------- +Parameterized Timetables +------------------------ Sometimes we need to pass some run-time arguments to the timetable. Continuing with our ``AfterWorkdayTimetable`` example, maybe we may have DAGs running on diff --git a/docs/apache-airflow/plugins.rst b/docs/apache-airflow/plugins.rst index d59659e7fd83b..7943cb0b08a14 100644 --- a/docs/apache-airflow/plugins.rst +++ b/docs/apache-airflow/plugins.rst @@ -139,6 +139,9 @@ looks like: # buttons. 
operator_extra_links = [] + # A list of timetable classes to register so they can be used in DAGs. + timetables = [] + You can derive it by inheritance (please refer to the example below). In the example, all options have been defined as class attributes, but you can also define them as properties if you need to perform additional initialization. Please note ``name`` inside this class must be specified. From 9455c9049c2fe2df4a30435341e5058d521afcd4 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Tue, 7 Sep 2021 20:15:49 +0800 Subject: [PATCH 03/21] Typos --- docs/apache-airflow/howto/timetable.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst index 73cd85ebcb35f..2d01ba13dc74a 100644 --- a/docs/apache-airflow/howto/timetable.rst +++ b/docs/apache-airflow/howto/timetable.rst @@ -141,8 +141,8 @@ is being scheduled. ``restriction`` encapsulates how the DAG and its tasks specify the schedule, and contains three attributes: * ``earliest``: The earliest time the DAG may be scheduled. This is a - calculated ``pendulum.DateTime`` from all the ``start_date`` arguments from - the DAG and its tasks, or ``None`` oi there are no ``start_date`` arguments + ``pendulum.DateTime`` calculated from all the ``start_date`` arguments from + the DAG and its tasks, or ``None`` if there are no ``start_date`` arguments found at all. * ``latest``: Similar to ``earliest``, this is the latest time the DAG may be scheduled, calculated from ``end_date`` arguments. @@ -189,7 +189,7 @@ A ``DagRunInfo`` can be created like this: run_after=run_after, ) -But since we typically wan to schedule a run as soon as the data interval ends, +But since we typically want to schedule a run as soon as the data interval ends, ``end`` and ``run_after`` above are generally the same. ``DagRunInfo`` therefore provides a shortcut for this: @@ -215,7 +215,7 @@ Parameterized Timetables ------------------------ Sometimes we need to pass some run-time arguments to the timetable. Continuing -with our ``AfterWorkdayTimetable`` example, maybe we may have DAGs running on +with our ``AfterWorkdayTimetable`` example, maybe we have DAGs running on different timezones, and we want to schedule some DAGs at 8am the next day, instead of on midnight. 
Instead of creating a separate timetable for each purpose, we'd want to do something like: From d54e529972cd480bd07a19e8bf16c0b2514b18c2 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Tue, 7 Sep 2021 22:03:36 +0800 Subject: [PATCH 04/21] Fix imports in example DAG --- airflow/example_dags/example_workday_timetable.py | 5 +---- docs/apache-airflow/howto/timetable.rst | 12 +++++++----- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/airflow/example_dags/example_workday_timetable.py b/airflow/example_dags/example_workday_timetable.py index be9c7f9fb3f4c..c1165e6e08d4a 100644 --- a/airflow/example_dags/example_workday_timetable.py +++ b/airflow/example_dags/example_workday_timetable.py @@ -18,16 +18,13 @@ """Example DAG demostrating how to implement a custom timetable for a DAG.""" -from plugins.workday import AfterWorkdayTimetable - # [START howto_timetable] from airflow import DAG +from airflow.example_dags.plugins.workday import AfterWorkdayTimetable from airflow.operators.dummy import DummyOperator -# [START howto_timetable_example_dag] with DAG(timetable=AfterWorkdayTimetable(), tags=["example", "timetable"]) as dag: DummyOperator(task_id="run_this") -# [END howto_timetable_example_dag] if __name__ == "__main__": dag.cli() diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst index 2d01ba13dc74a..a76f9445fe9d1 100644 --- a/docs/apache-airflow/howto/timetable.rst +++ b/docs/apache-airflow/howto/timetable.rst @@ -88,11 +88,13 @@ Next, we'll start putting code into ``AfterWorkdayTimetable``. After the implementation is finished, we should be able to use the timetable in our DAG file: -.. exampleinclude:: /../../airflow/example_dags/example_workday_timetable.py - :language: python - :dedent: 4 - :start-after: [START howto_timetable_example_dag] - :end-before: [END howto_timetable_example_dag] +.. code-block:: python + + from airflow import DAG + + + with DAG(timetable=AfterWorkdayTimetable(), tags=["example", "timetable"]) as dag: + ... Define Scheduling Logic From 05c3f184831369adc8c028707985849605b51388 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 13 Sep 2021 11:16:25 +0800 Subject: [PATCH 05/21] Rewrite example with new last_automated_dagrun --- airflow/example_dags/plugins/workday.py | 39 ++++++++++++------------ docs/apache-airflow/howto/timetable.rst | 40 ++++++++----------------- 2 files changed, 32 insertions(+), 47 deletions(-) diff --git a/airflow/example_dags/plugins/workday.py b/airflow/example_dags/plugins/workday.py index bd566bbe876fc..fae27c735655c 100644 --- a/airflow/example_dags/plugins/workday.py +++ b/airflow/example_dags/plugins/workday.py @@ -48,35 +48,34 @@ def infer_data_interval(self, run_after: DateTime) -> DataInterval: # [START howto_timetable_next_dagrun_info] def next_dagrun_info( self, - last_automated_dagrun: Optional[DateTime], + last_automated_dagrun: Optional[DataInterval], restriction: TimeRestriction, ) -> Optional[DagRunInfo]: - if last_automated_dagrun is not None: - # There was a previous run on the regular schedule. - # last_automated_dagrun os the last run's logical date. - weekday = last_automated_dagrun.weekday() - if 0 <= weekday < 4: # Monday through Thursday -- next is tomorrow. + if last_automated_dagrun is not None: # There was a previous run on the regular schedule. + last_start_weekday = last_automated_dagrun.start.weekday() + if 0 <= last_start_weekday < 4: # Last run on Monday through Thursday -- next is tomorrow. 
delta = timedelta(days=1) - else: # Week is ending -- skip to next Monday. - delta = timedelta(days=(7 - weekday)) - start = DateTime.combine((last_automated_dagrun + delta).date(), Time.min) + else: # Last run on Friday -- skip to next Monday. + delta = timedelta(days=(7 - last_start_weekday)) + next_start = DateTime.combine((last_automated_dagrun + delta).date(), Time.min) else: # This is the first ever run on the regular schedule. if restriction.earliest is None: # No start_date. Don't schedule. return None - start = restriction.earliest - if start.time() != Time.min: - # If earliest does not fall on midnight, skip to the next day. - start = DateTime.combine(start.date(), Time.min).replace(tzinfo=UTC) + timedelta(days=1) + next_start = restriction.earliest if not restriction.catchup: # If the DAG has catchup=False, today is the earliest to consider. - start = max(start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC)) - weekday = start.weekday() - if weekday in (5, 6): # If 'start' is in the weekend, go to next Monday. - delta = timedelta(days=(7 - weekday)) - start = start + delta - if start > restriction.latest: # Over the DAG's scheduled end; don't schedule. + next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC)) + elif next_start.time() != Time.min: + # If earliest does not fall on midnight, skip to the next day. + next_day = next_start.date() + timedelta(days=1) + next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=UTC) + next_start_weekday = next_start.weekday() + if next_start_weekday in (5, 6): # If next start is in the weekend, go to next Monday. + delta = timedelta(days=(7 - next_start_weekday)) + next_start = next_start + delta + if next_start > restriction.latest: # Over the DAG's scheduled end; don't schedule. return None - return DagRunInfo.interval(start=start, end=(start + timedelta(days=1))) + return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1))) # [END howto_timetable_next_dagrun_info] diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst index a76f9445fe9d1..3a77d356f32a6 100644 --- a/docs/apache-airflow/howto/timetable.rst +++ b/docs/apache-airflow/howto/timetable.rst @@ -137,10 +137,10 @@ Next is the implementation of ``next_dagrun_info``: :end-before: [END howto_timetable_next_dagrun_info] This method accepts two arguments. ``last_automated_dagrun`` is a -``pendulum.DateTime`` object indicating the logical date of this DAG's previous -non-manually-triggered run, or ``None`` if this is the first time ever the DAG -is being scheduled. ``restriction`` encapsulates how the DAG and its tasks -specify the schedule, and contains three attributes: +:class:`~airflow.timetables.base.DataInterval` instance indicating the data +interval of this DAG's previous non-manually-triggered run, or ``None`` if this +is the first time ever the DAG is being scheduled. ``restriction`` encapsulates +how the DAG and its tasks specify the schedule, and contains three attributes: * ``earliest``: The earliest time the DAG may be scheduled. This is a ``pendulum.DateTime`` calculated from all the ``start_date`` arguments from @@ -172,16 +172,10 @@ If we decide to schedule a run, we need to describe it with a attributes: * ``data_interval``: A :class:`~airflow.timetables.base.DataInterval` instance - like ``infer_data_interval``'s return value. This describes the next run's - data interval. + describing the next run's data interval. 
* ``run_after``: A ``pendulum.DateTime`` instance that tells the scheduler when the DAG run can be scheduled. -.. note:: - - In case you're wondering---yes, the argument and return value of - ``infer_data_interval`` are also internally combined into a ``DagRunInfo``. - A ``DagRunInfo`` can be created like this: .. code-block:: python @@ -224,11 +218,6 @@ purpose, we'd want to do something like: .. code-block:: python - from datetime import timedelta - - from pendulum import DateTime, Time - - class SometimeAfterWorkdayTimetable(Timetable): def __init__(self, schedule_at: Time) -> None: self._schedule_at = schedule_at @@ -238,9 +227,7 @@ purpose, we'd want to do something like: end = start + timedelta(days=1) return DagRunInfo( data_interval=DataInterval(start=start, end=end), - run_after=DateTime.combine(end.date(), self._schedule_at).replace( - tzinfo=end.tzinfo - ), + run_after=DateTime.combine(end.date(), self._schedule_at), ) However, since the timetable is a part of the DAG, we need to tell Airflow how @@ -249,13 +236,15 @@ implementing two additional methods on our timetable class: .. code-block:: python - def serialize(self) -> Dict[str, Any]: - return {"schedule_at": self._schedule_at.isoformat()} + class SometimeAfterWorkdayTimetable(Timetable): + ... + def serialize(self) -> Dict[str, Any]: + return {"schedule_at": self._schedule_at.isoformat()} - @classmethod - def deserialize(cls, value: Dict[str, Any]) -> Timetable: - return cls(Time.fromisoformat(value["schedule_at"])) + @classmethod + def deserialize(cls, value: Dict[str, Any]) -> Timetable: + return cls(Time.fromisoformat(value["schedule_at"])) When the DAG is being serialized, ``serialize`` is called to obtain a JSON-serializable value. That value is passed to ``deserialize`` when the @@ -281,9 +270,6 @@ So for a DAG declared like this: .. code-block:: python - from pendulum import Time - - with DAG( timetable=SometimeAfterWorkdayTimetable(Time(8)), # 8am. ..., From 7c9056ea4ab98e431c28c848642d87da5a16650c Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 13 Sep 2021 14:05:06 +0800 Subject: [PATCH 06/21] Remove execution date reference in docs Not all usages can be removed, unfortunately, since there are still many usages requiring the value, especially in Python APIs. This also adds additional values for Sentry tags and Elastic log filenames based on data interval instead of execution date, so the user can migrate as soon as possible, even if we have not changed the default. 
--- airflow/models/taskinstance.py | 4 +- .../elasticsearch/log/es_task_handler.py | 23 ++- airflow/sentry.py | 14 +- .../logging/index.rst | 4 +- .../operators/cloud/gcs.rst | 7 +- docs/apache-airflow/best-practices.rst | 139 +++++++++--------- docs/apache-airflow/concepts/dags.rst | 6 +- docs/apache-airflow/concepts/operators.rst | 8 +- docs/apache-airflow/concepts/overview.rst | 2 +- docs/apache-airflow/concepts/scheduler.rst | 10 +- docs/apache-airflow/concepts/tasks.rst | 6 +- docs/apache-airflow/dag-run.rst | 27 ++-- docs/apache-airflow/faq.rst | 33 +++-- .../howto/define_extra_link.rst | 4 +- .../howto/operator/external_task_sensor.rst | 3 +- docs/apache-airflow/howto/operator/python.rst | 2 +- docs/apache-airflow/lineage.rst | 4 +- .../logging-monitoring/errors.rst | 7 +- .../logging-monitoring/logging-tasks.rst | 2 +- docs/apache-airflow/plugins.rst | 4 +- docs/apache-airflow/timezone.rst | 12 +- docs/apache-airflow/tutorial.rst | 31 ++-- 22 files changed, 200 insertions(+), 152 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 9dca1f3e71c3c..7d8c0c9341a6b 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2271,10 +2271,8 @@ def xcom_pull( if dag_id is None: dag_id = self.dag_id - execution_date = self.get_dagrun(session).execution_date - query = XCom.get_many( - execution_date=execution_date, + run_id=self.run_id, key=key, dag_ids=dag_id, task_ids=task_ids, diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 1a562805213fc..1b69a8b49c49e 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -101,25 +101,36 @@ def __init__( self.context_set = False def _render_log_id(self, ti: TaskInstance, try_number: int) -> str: + dag_run = ti.dag_run + if self.json_format: - execution_date = self._clean_execution_date(ti.execution_date) + data_interval_start = self._clean_date(dag_run.data_interval_start) + data_interval_end = self._clean_date(dag_run.data_interval_end) + execution_date = self._clean_date(dag_run.execution_date) else: - execution_date = ti.execution_date.isoformat() + data_interval_start = dag_run.data_interval_start.isoformat() + data_interval_end = dag_run.data_interval_end.isoformat() + execution_date = dag_run.execution_date.isoformat() return self.log_id_template.format( - dag_id=ti.dag_id, task_id=ti.task_id, execution_date=execution_date, try_number=try_number + dag_id=ti.dag_id, + task_id=ti.task_id, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + execution_date=execution_date, + try_number=try_number, ) @staticmethod - def _clean_execution_date(execution_date: datetime) -> str: + def _clean_date(value: datetime) -> str: """ - Clean up an execution date so that it is safe to query in elasticsearch + Clean up a date value so that it is safe to query in elasticsearch by removing reserved characters. # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters :param execution_date: execution date of the dag run. 
""" - return execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f") + return value.strftime("%Y_%m_%dT%H_%M_%S_%f") def _group_logs_by_host(self, logs): grouped_logs = defaultdict(list) diff --git a/airflow/sentry.py b/airflow/sentry.py index a6b2adf891fd5..19d83f636c6b4 100644 --- a/airflow/sentry.py +++ b/airflow/sentry.py @@ -59,7 +59,10 @@ def flush(self): class ConfiguredSentry(DummySentry): """Configure Sentry SDK.""" - SCOPE_TAGS = frozenset(("task_id", "dag_id", "execution_date", "operator", "try_number")) + SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date")) + SCOPE_TASK_TAGS = frozenset(("operator",)) + SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number")) + SCOPE_TAGS = SCOPE_DAG_RUN_TAGS | SCOPE_TASK_TAGS | SCOPE_TASK_INSTANCE_TAGS SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration")) UNSUPPORTED_SENTRY_OPTIONS = frozenset( @@ -116,14 +119,17 @@ def __init__(self): def add_tagging(self, task_instance): """Function to add tagging for a task_instance.""" + dag_run = task_instance.dag_run task = task_instance.task with sentry_sdk.configure_scope() as scope: - for tag_name in self.SCOPE_TAGS: + for tag_name in self.SCOPE_TASK_INSTANCE_TAGS: attribute = getattr(task_instance, tag_name) - if tag_name == "operator": - attribute = task.__class__.__name__ scope.set_tag(tag_name, attribute) + for tag_name in self.SCOPE_DAG_RUN_TAGS: + attribute = getattr(dag_run, tag_name) + scope.set_tag(tag_name, attribute) + scope.set_tag("operator", task.__class__.__name__) @provide_session def add_breadcrumbs(self, task_instance, session=None): diff --git a/docs/apache-airflow-providers-elasticsearch/logging/index.rst b/docs/apache-airflow-providers-elasticsearch/logging/index.rst index e558db52fd83b..ae9c0c2d8a6e0 100644 --- a/docs/apache-airflow-providers-elasticsearch/logging/index.rst +++ b/docs/apache-airflow-providers-elasticsearch/logging/index.rst @@ -38,7 +38,7 @@ First, to use the handler, ``airflow.cfg`` must be configured as follows: [elasticsearch] host = : - log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number} + log_id_template = {dag_id}-{task_id}-{data_interval_start}-{try_number} end_of_log_mark = end_of_log write_stdout = json_fields = @@ -56,7 +56,7 @@ To output task logs to stdout in JSON format, the following config could be used [elasticsearch] host = : - log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number} + log_id_template = {dag_id}-{task_id}-{data_interval_start}-{try_number} end_of_log_mark = end_of_log write_stdout = True json_format = True diff --git a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst index 5fa330c7a7d6c..0a5e2435407ea 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst @@ -58,11 +58,10 @@ GCSTimeSpanFileTransformOperator Use the :class:`~airflow.providers.google.cloud.operators.gcs.GCSTimeSpanFileTransformOperator` -to transform files that were modified in a specific time span. The time span is defined -by the DAG instance logical execution timestamp (``execution_date``, start of time span) -and the timestamp when the next DAG instance execution is scheduled (end of time span). If a DAG +to transform files that were modified in a specific time span (the data interval). +The time span is defined by the time span's start and end timestamps. 
If a DAG does not have a *next* DAG instance scheduled, the time span end infinite, meaning the operator -processes all files older than ``execution_date``. +processes all files older than ``data_interval_start``. .. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py :language: python diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst index 013699b886674..3dfd583bb0063 100644 --- a/docs/apache-airflow/best-practices.rst +++ b/docs/apache-airflow/best-practices.rst @@ -43,21 +43,26 @@ Please follow our guide on :ref:`custom Operators `. Creating a task --------------- -You should treat tasks in Airflow equivalent to transactions in a database. This implies that you should never produce -incomplete results from your tasks. An example is not to produce incomplete data in ``HDFS`` or ``S3`` at the end of a task. - -Airflow can retry a task if it fails. Thus, the tasks should produce the same outcome on every re-run. -Some of the ways you can avoid producing a different result - - -* Do not use INSERT during a task re-run, an INSERT statement might lead to duplicate rows in your database. - Replace it with UPSERT. -* Read and write in a specific partition. Never read the latest available data in a task. - Someone may update the input data between re-runs, which results in different outputs. - A better way is to read the input data from a specific partition. You can use ``execution_date`` as a partition. - You should follow this partitioning method while writing data in S3/HDFS, as well. -* The Python datetime ``now()`` function gives the current datetime object. - This function should never be used inside a task, especially to do the critical computation, as it leads to different outcomes on each run. - It's fine to use it, for example, to generate a temporary log. +You should treat tasks in Airflow equivalent to transactions in a database. This +implies that you should never produce incomplete results from your tasks. An +example is not to produce incomplete data in ``HDFS`` or ``S3`` at the end of a +task. + +Airflow can retry a task if it fails. Thus, the tasks should produce the same +outcome on every re-run. Some of the ways you can avoid producing a different +result - + +* Do not use INSERT during a task re-run, an INSERT statement might lead to + duplicate rows in your database. Replace it with UPSERT. +* Read and write in a specific partition. Never read the latest available data + in a task. Someone may update the input data between re-runs, which results in + different outputs. A better way is to read the input data from a specific + partition. You can use ``data_interval_start`` as a partition. You should + follow this partitioning method while writing data in S3/HDFS, as well. +* The Python datetime ``now()`` function gives the current datetime object. This + function should never be used inside a task, especially to do the critical + computation, as it leads to different outcomes on each run. It's fine to use + it, for example, to generate a temporary log. .. tip:: @@ -270,77 +275,73 @@ Unit tests ensure that there is no incorrect code in your DAG. You can write uni .. 
code-block:: python

-    from airflow.models import DagBag
-    import unittest
+    import pytest
+    from airflow.models import DagBag

-    class TestHelloWorldDAG(unittest.TestCase):
-        @classmethod
-        def setUpClass(cls):
-            cls.dagbag = DagBag()

-        def test_dag_loaded(self):
-            dag = self.dagbag.get_dag(dag_id="hello_world")
-            assert self.dagbag.import_errors == {}
-            assert dag is not None
-            assert len(dag.tasks) == 1
+
+    @pytest.fixture()
+    def dagbag():
+        return DagBag()
+
+
+    def test_dag_loaded(dagbag):
+        dag = dagbag.get_dag(dag_id="hello_world")
+        assert dagbag.import_errors == {}
+        assert dag is not None
+        assert len(dag.tasks) == 1
+

 **Unit test a DAG structure:**
 This is an example test want to verify the structure of a code-generated DAG against a dict object

 .. code-block:: python

-    import unittest
+    def assert_dag_dict_equal(source, dag):
+        assert dag.task_dict.keys() == source.keys()
+        for task_id, downstream_list in source.items():
+            assert dag.has_task(task_id)
+            task = dag.get_task(task_id)
+            assert task.downstream_task_ids == set(downstream_list)

-    class testClass(unittest.TestCase):
-        def assertDagDictEqual(self, source, dag):
-            assert dag.task_dict.keys() == source.keys()
-            for task_id, downstream_list in source.items():
-                assert dag.has_task(task_id)
-                task = dag.get_task(task_id)
-                assert task.downstream_task_ids == set(downstream_list)

-        def test_dag(self):
-            self.assertDagDictEqual(
-                {
-                    "DummyInstruction_0": ["DummyInstruction_1"],
-                    "DummyInstruction_1": ["DummyInstruction_2"],
-                    "DummyInstruction_2": ["DummyInstruction_3"],
-                    "DummyInstruction_3": [],
-                },
-                dag,
-            )
+    def test_dag():
+        assert_dag_dict_equal(
+            {
+                "DummyInstruction_0": ["DummyInstruction_1"],
+                "DummyInstruction_1": ["DummyInstruction_2"],
+                "DummyInstruction_2": ["DummyInstruction_3"],
+                "DummyInstruction_3": [],
+            },
+            dag,
+        )
+

 **Unit test for custom operator:**

 .. code-block:: python

-    import unittest
-    from airflow.utils.state import State
-
-    DEFAULT_DATE = "2019-10-03"
-    TEST_DAG_ID = "test_my_custom_operator"
-
-
-    class MyCustomOperatorTest(unittest.TestCase):
-        def setUp(self):
-            self.dag = DAG(
-                TEST_DAG_ID,
-                schedule_interval="@daily",
-                default_args={"start_date": DEFAULT_DATE},
-            )
-            self.op = MyCustomOperator(
-                dag=self.dag,
-                task_id="test",
-                prefix="s3://bucket/some/prefix",
-            )
-            self.ti = TaskInstance(task=self.op, execution_date=DEFAULT_DATE)
-
-        def test_execute_no_trigger(self):
-            self.ti.run(ignore_ti_state=True)
-            assert self.ti.state == State.SUCCESS
-            # Assert something related to tasks results
+    import datetime
+
+    import pytest
+
+    from airflow.utils.state import State
+
+    DEFAULT_DATE = datetime.datetime(2019, 10, 3)
+
+
+    def test_my_custom_operator_execute_no_trigger(dag_maker):
+        with dag_maker(
+            dag_id="test_my_custom_operator_execute_no_trigger",
+            schedule_interval="@daily",
+            default_args={"start_date": DEFAULT_DATE},
+        ) as dag:
+            MyCustomOperator(
+                dag=dag,
+                task_id="test",
+                prefix="s3://bucket/some/prefix",
+            )
+        dagrun = dag_maker.create_dagrun()
+        (ti,) = dagrun.task_instances
+        ti.task = dag.get_task(task_id="test")
+        ti.run(ignore_ti_state=True)
+        assert ti.state == State.SUCCESS
+        # Assert something related to tasks results
+

 Self-Checks
 ------------

diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index 5c27a79fd7fb3..bf22cd74f182c 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -288,7 +288,7 @@ As with the callable for ``BranchPythonOperator``, this method should return the
         """
         Run an extra branch on the first day of the month
         """
-        if context['execution_date'].day == 1:
+        if context['data_interval_start'].day == 1:
             return ['daily_task_id', 'monthly_task_id']
         else:
             return 'daily_task_id'
@@ -328,7 +328,7 @@ Depends On Past

 You can also say a task can only run if the *previous* run of the task in the previous DAG Run succeeded. To use this, you just need to set the ``depends_on_past`` argument on your Task to ``True``.

-Note that if you are running the DAG at the very start of its life - specifically, that the ``execution_date`` matches the ``start_date`` - then the Task will still run, as there is no previous run to depend on.
+Note that if you are running the DAG at the very start of its life---specifically, its first ever *automated* run---then the Task will still run, as there is no previous run to depend on.

 .. _concepts:trigger-rules:

@@ -621,7 +621,7 @@ in which one DAG can depend on another:
     - waiting - :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor`

 Additional difficulty is that one DAG could wait for or trigger several runs of the other DAG
-with different execution dates. The **Dag Dependencies** view
+with different data intervals. The **Dag Dependencies** view
 ``Menu -> Browse -> DAG Dependencies`` helps visualize dependencies between DAGs.
 The dependencies are calculated by the scheduler during DAG serialization and the
 webserver uses them to build the dependency graph.

diff --git a/docs/apache-airflow/concepts/operators.rst b/docs/apache-airflow/concepts/operators.rst
index 25146356becad..695e9d39c4cd3 100644
--- a/docs/apache-airflow/concepts/operators.rst
+++ b/docs/apache-airflow/concepts/operators.rst
@@ -66,20 +66,20 @@ Jinja Templating
 ----------------
 Airflow leverages the power of `Jinja Templating `_ and this can be a powerful tool to use in combination with :ref:`macros `.
-For example, say you want to pass the execution date as an environment variable to a Bash script using the ``BashOperator``: +For example, say you want to pass the start of the data interval as an environment variable to a Bash script using the ``BashOperator``: .. code-block:: python - # The execution date as YYYY-MM-DD + # The start of the data interval as YYYY-MM-DD date = "{{ ds }}" t = BashOperator( task_id="test_env", bash_command="/tmp/test.sh ", dag=dag, - env={"EXECUTION_DATE": date}, + env={"DATA_INTERVAL_START": date}, ) -Here, ``{{ ds }}`` is a templated variable, and because the ``env`` parameter of the ``BashOperator`` is templated with Jinja, the execution date will be available as an environment variable named ``EXECUTION_DATE`` in your Bash script. +Here, ``{{ ds }}`` is a templated variable, and because the ``env`` parameter of the ``BashOperator`` is templated with Jinja, the data interval's start date will be available as an environment variable named ``DATA_INTERVAL_START`` in your Bash script. You can use Jinja templating with every parameter that is marked as "templated" in the documentation. Template substitution occurs just before the ``pre_execute`` function of your operator is called. diff --git a/docs/apache-airflow/concepts/overview.rst b/docs/apache-airflow/concepts/overview.rst index 7571992b1ab58..b9d9f1dbe89d3 100644 --- a/docs/apache-airflow/concepts/overview.rst +++ b/docs/apache-airflow/concepts/overview.rst @@ -60,7 +60,7 @@ Internally, these are all actually subclasses of Airflow's ``BaseOperator``, and Control Flow ------------ -:doc:`dags` are designed to be run many times, and multiple runs of them can happen in parallel. DAGs are parameterized, always including a date they are "running for" (the ``execution_date``), but with other optional parameters as well. +:doc:`dags` are designed to be run many times, and multiple runs of them can happen in parallel. DAGs are parameterized, always including an interval they are "running for" (the :ref:`data interval `), but with other optional parameters as well. :doc:`tasks` have dependencies declared on each other. You'll see this in a DAG either using the ``>>`` and ``<<`` operators:: diff --git a/docs/apache-airflow/concepts/scheduler.rst b/docs/apache-airflow/concepts/scheduler.rst index 3a54b0269746c..2376d1e118b04 100644 --- a/docs/apache-airflow/concepts/scheduler.rst +++ b/docs/apache-airflow/concepts/scheduler.rst @@ -54,19 +54,19 @@ In the UI, it appears as if Airflow is running your tasks a day **late** .. note:: - If you run a DAG on a ``schedule_interval`` of one day, the run with ``execution_date`` ``2019-11-21`` triggers soon after ``2019-11-21T23:59``. + If you run a DAG on a ``schedule_interval`` of one day, the run with data interval starting on ``2019-11-21`` triggers after ``2019-11-21T23:59``. - **Let’s Repeat That**, the scheduler runs your job one ``schedule_interval`` AFTER the start date, at the END of the period. + **Let’s Repeat That**, the scheduler runs your job one ``schedule_interval`` AFTER the start date, at the END of the interval. You should refer to :doc:`/dag-run` for details on scheduling a DAG. Triggering DAG with Future Date ------------------------------- -If you want to use 'external trigger' to run future-dated execution dates, set ``allow_trigger_in_future = True`` in ``scheduler`` section in ``airflow.cfg``. +If you want to use 'external trigger' to run future-dated data intervals, set ``allow_trigger_in_future = True`` in ``scheduler`` section in ``airflow.cfg``. 
This only has effect if your DAG has no ``schedule_interval``. -If you keep default ``allow_trigger_in_future = False`` and try 'external trigger' to run future-dated execution dates, -the scheduler won't execute it now but the scheduler will execute it in the future once the current date rolls over to the execution date. +If you keep default ``allow_trigger_in_future = False`` and try 'external trigger' to run future-dated data intervals, +the scheduler won't execute it now but the scheduler will execute it in the future once the current date rolls over to the start of the data interval. .. _scheduler:ha: diff --git a/docs/apache-airflow/concepts/tasks.rst b/docs/apache-airflow/concepts/tasks.rst index c1db8416d53f7..efc709050b11f 100644 --- a/docs/apache-airflow/concepts/tasks.rst +++ b/docs/apache-airflow/concepts/tasks.rst @@ -59,7 +59,7 @@ Task Instances Much in the same way that a DAG is instantiated into a :ref:`DAG Run ` each time it runs, the tasks under a DAG are instantiated into *Task Instances*. -An instance of a Task is a specific run of that task for a given DAG (and thus for a given ``execution_date``). They are also the representation of a Task that has *state*, representing what stage of the lifecycle it is in. +An instance of a Task is a specific run of that task for a given DAG (and thus for a given data interval). They are also the representation of a Task that has *state*, representing what stage of the lifecycle it is in. .. _concepts:task-states: @@ -97,9 +97,9 @@ Firstly, it can have *upstream* and *downstream* tasks:: task1 >> task2 >> task3 -When a DAG runs, it will create instances for each of these tasks that are upstream/downstream of each other, but which all have the same ``execution_date``. +When a DAG runs, it will create instances for each of these tasks that are upstream/downstream of each other, but which all have the same data interval. -There may also be instances of the *same task*, but for different values of ``execution_date`` - from other runs of the same DAG. We call these *previous* and *next* - it is a different relationship to *upstream* and *downstream*! +There may also be instances of the *same task*, but for different data intervals - from other runs of the same DAG. We call these *previous* and *next* - it is a different relationship to *upstream* and *downstream*! .. note:: diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst index fb196a66df7c0..17a13c7617ac6 100644 --- a/docs/apache-airflow/dag-run.rst +++ b/docs/apache-airflow/dag-run.rst @@ -54,6 +54,9 @@ Cron Presets Your DAG will be instantiated for each schedule along with a corresponding DAG Run entry in the database backend. + +.. _data-interval: + Data Interval ------------- @@ -69,8 +72,8 @@ has ended, i.e. after 2020-01-02 00:00:00. All dates in Airflow are tied to the data interval concept in some way. The "logical date" (also called ``execution_date`` in Airflow versions prior to 2.2) -of a DAG run, for example, usually denotes the start of the data interval, not -when the DAG is actually executed. +of a DAG run, for example, denotes the start of the data interval, not when the +DAG is actually executed. 
Similarly, since the ``start_date`` argument for the DAG and its tasks points to
the same logical date, it marks the start of *the DAG's first data interval*, not
@@ -94,7 +97,7 @@ Catchup
An Airflow DAG with a ``start_date``, possibly an ``end_date``, and a ``schedule_interval`` defines a
series of intervals which the scheduler turns into individual DAG Runs and executes. The scheduler, by default, will
-kick off a DAG Run for any interval that has not been run since the last execution date (or has been cleared). This concept is called Catchup.
+kick off a DAG Run for any data interval that has not been run since the last data interval (or has been cleared). This concept is called Catchup.

If your DAG is not written to handle its catchup (i.e., not limited to the interval, but instead to ``Now``
for instance.), then you will want to turn catchup off. This can be done by setting ``catchup = False`` in DAG or ``catchup_by_default = False``
@@ -130,9 +133,11 @@ in the configuration file. When turned off, the scheduler creates a DAG run only
        catchup=False,
    )

-In the example above, if the DAG is picked up by the scheduler daemon on 2016-01-02 at 6 AM,
-(or from the command line), a single DAG Run will be created, with an `execution_date` of 2016-01-01,
-and the next one will be created just after midnight on the morning of 2016-01-03 with an execution date of 2016-01-02.
+In the example above, if the DAG is picked up by the scheduler daemon on
+2016-01-02 at 6 AM, (or from the command line), a single DAG Run will be created
+with a data interval between 2016-01-01 and 2016-01-02, and the next one will be created
+just after midnight on the morning of 2016-01-03 with a data interval between
+2016-01-02 and 2016-01-03.

If the ``dag.catchup`` value had been ``True`` instead, the scheduler would have created a DAG Run
for each completed interval between 2015-12-01 and 2016-01-02 (but not yet one for 2016-01-02,
@@ -174,12 +179,12 @@ The executor will re-run it.

There are multiple options you can select to re-run -

-* **Past** - All the instances of the task in the runs before the current DAG's execution date
-* **Future** - All the instances of the task in the runs after the current DAG's execution date
+* **Past** - All the instances of the task in the runs before the DAG's most recent data interval
+* **Future** - All the instances of the task in the runs after the DAG's most recent data interval
* **Upstream** - The upstream tasks in the current DAG
* **Downstream** - The downstream tasks in the current DAG
* **Recursive** - All the tasks in the child DAGs and parent DAGs
-* **Failed** - Only the failed tasks in the current DAG
+* **Failed** - Only the failed tasks in the DAG's most recent run

You can also clear the task through CLI using the command:

@@ -204,10 +209,10 @@ Note that DAG Runs can also be created manually through the CLI. Just run the

.. code-block:: bash

-    airflow dags trigger --exec-date execution_date run_id
+    airflow dags trigger --exec-date logical_date run_id

The DAG Runs created externally to the scheduler get associated with the trigger’s timestamp and are displayed
-in the UI alongside scheduled DAG runs. The execution date passed inside the DAG can be specified using the ``-e`` argument.
+in the UI alongside scheduled DAG runs. The logical date passed inside the DAG can be specified using the ``-e`` argument.
The default is the current date in the UTC timezone.
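To make the data interval concrete, here is a minimal sketch of a DAG whose only
task reads the interval bounds that the scheduler passes into the template
context and derives a partition key from them. The DAG id, the partition path,
and the task logic are purely illustrative:

.. code-block:: python

    import pendulum

    from airflow import DAG
    from airflow.operators.python import PythonOperator


    def process_partition(data_interval_start=None, data_interval_end=None, **_):
        # The scheduler injects the run's data interval into the template context,
        # so each run operates on exactly one partition of data.
        partition = data_interval_start.strftime("%Y-%m-%d")
        print(f"Processing rows with {data_interval_start} <= timestamp < {data_interval_end}")
        print(f"Writing results to s3://bucket/output/date={partition}/")  # illustrative path


    with DAG(
        dag_id="example_daily_partition",  # illustrative DAG id
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        PythonOperator(task_id="process_partition", python_callable=process_partition)

Because every scheduled run receives a different data interval, re-runs, catchup
and backfills can each work on their own partition without interfering with one
another.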
In addition, you can also manually trigger a DAG Run using the web UI (tab **DAGs** -> column **Links** -> button **Trigger Dag**) diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst index 645c24258143a..35bb9bce61589 100644 --- a/docs/apache-airflow/faq.rst +++ b/docs/apache-airflow/faq.rst @@ -216,12 +216,22 @@ actually start. If this were not the case, the backfill just would not start. What does ``execution_date`` mean? ---------------------------------- -Airflow was developed as a solution for ETL needs. In the ETL world, you typically summarize data. So, if you want to -summarize data for 2016-02-19, You would do it at 2016-02-20 midnight UTC, which would be right after all data for -2016-02-19 becomes available. - -This datetime value is available to you as :ref:`Template variables` with various formats in Jinja templated -fields. They are also included in the context dictionary given to an Operator's execute function. +*Execution date* or ``execution_date`` is a historical name for what is called a +*logical date*, and also usually the start of the data interval represented by a +DAG run. + +Airflow was developed as a solution for ETL needs. In the ETL world, you +typically summarize data. So, if you want to summarize data for 2016-02-19, You +would do it at 2016-02-20 midnight UTC, which would be right after all data for +2016-02-19 becomes available. This interval between midnights of 2016-02-19 and +2016-02-20 is called the *data interval*, and since the it represents data in +the date of 2016-02-19, this date is thus called the run's *logical date*, or +the date that this DAG run is executed for, thus *execution date*. + +For backward compatibility, a datetime value ``execution_date`` is still +as :ref:`Template variables` with various formats in Jinja +templated fields, and in Airflow's Python API. It is also included in the +context dictionary given to an Operator's execute function. .. code-block:: python @@ -229,7 +239,12 @@ fields. They are also included in the context dictionary given to an Operator's def execute(self, context): logging.info(context["execution_date"]) -Note that ``ds`` refers to date_string, not date start as may be confusing to some. +However, you should always using ``data_interval_start`` and +``data_interval_end`` if possible, sinc those names are semantically more +correct and less prine to misunderstandings. + +Note that ``ds`` (the YYYY-MM-DD form of ``data_interval_start``) refers to +*date* ***string***, not *date* ***start*** as may be confusing to some. How to create DAGs dynamically? @@ -295,7 +310,7 @@ commonly attempted in ``user_defined_macros``. bo = BashOperator(task_id="my_task", bash_command="echo {{ my_custom_macro }}", dag=dag) -This will echo "day={{ ds }}" instead of "day=2020-01-01" for a dagrun with the execution date 2020-01-01 00:00:00. +This will echo "day={{ ds }}" instead of "day=2020-01-01" for a dagrun with ``data_interval_start`` 2020-01-01 00:00:00. .. code-block:: python @@ -310,7 +325,7 @@ Why ``next_ds`` or ``prev_ds`` might not contain expected values? - When scheduling DAG, the ``next_ds`` ``next_ds_nodash`` ``prev_ds`` ``prev_ds_nodash`` are calculated using ``execution_date`` and ``schedule_interval``. If you set ``schedule_interval`` as ``None`` or ``@once``, the ``next_ds``, ``next_ds_nodash``, ``prev_ds``, ``prev_ds_nodash`` values will be set to ``None``. 
-- When manually triggering DAG, the schedule will be ignored, and ``prev_ds == next_ds == ds`` +- When manually triggering DAG, the schedule will be ignored, and ``prev_ds == next_ds == ds``. Task execution interactions diff --git a/docs/apache-airflow/howto/define_extra_link.rst b/docs/apache-airflow/howto/define_extra_link.rst index 223cd424387d7..b3dc6c698da19 100644 --- a/docs/apache-airflow/howto/define_extra_link.rst +++ b/docs/apache-airflow/howto/define_extra_link.rst @@ -95,10 +95,10 @@ tasks using :class:`~airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Ope operators = [GCSToS3Operator] def get_link(self, operator, dttm): - return "https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}".format( + return "https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{logical_date}".format( dag_id=operator.dag_id, task_id=operator.task_id, - execution_date=dttm, + logical_date=dttm, ) diff --git a/docs/apache-airflow/howto/operator/external_task_sensor.rst b/docs/apache-airflow/howto/operator/external_task_sensor.rst index 28987f439d059..f6ae421969bec 100644 --- a/docs/apache-airflow/howto/operator/external_task_sensor.rst +++ b/docs/apache-airflow/howto/operator/external_task_sensor.rst @@ -29,7 +29,8 @@ tasks on the same DAG. For example: on a daily DAG. - Different teams are responsible for different DAGs, but these DAGs have some cross-DAG dependencies. -- A task may depend on another task on the same DAG, but for a different ``execution_date``. +- A task may depend on another task on the same DAG, but for a different ``execution_date`` + (start of the data interval). - Use ``execution_delta`` for tasks running at different times, like ``execution_delta=timedelta(hours=1)`` to check against a task that runs 1 hour earlier. diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst index 212d9a2270c2c..ec56b702eabbd 100644 --- a/docs/apache-airflow/howto/operator/python.rst +++ b/docs/apache-airflow/howto/operator/python.rst @@ -77,7 +77,7 @@ Unfortunately we currently do not support to serialize ``var`` and ``ti`` / ``ta with the underlying library. For airflow context variables make sure that you either have access to Airflow through setting ``system_site_packages`` to ``True`` or add ``apache-airflow`` to the ``requirements`` argument. Otherwise you won't have access to the most context variables of Airflow in ``op_kwargs``. -If you want the context related to datetime objects like ``execution_date`` you can add ``pendulum`` and +If you want the context related to datetime objects like ``data_interval_start`` you can add ``pendulum`` and ``lazy_object_proxy``. Templating diff --git a/docs/apache-airflow/lineage.rst b/docs/apache-airflow/lineage.rst index 227a386a333c1..3680010a08b9c 100644 --- a/docs/apache-airflow/lineage.rst +++ b/docs/apache-airflow/lineage.rst @@ -57,7 +57,7 @@ works. f_in = File(url="/tmp/whole_directory/") outlets = [] for file in FILE_CATEGORIES: - f_out = File(url="/tmp/{}/{{{{ execution_date }}}}".format(file)) + f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file)) outlets.append(f_out) run_this = BashOperator( @@ -73,7 +73,7 @@ for the downstream task. .. note:: Operators can add inlets and outlets automatically if the operator supports it. In the example DAG task ``run_this`` (task_id=``run_me_first``) is a BashOperator that takes 3 inlets: ``CAT1``, ``CAT2``, ``CAT3``, that are -generated from a list. 
Note that ``execution_date`` is a templated field and will be rendered when the task is running. +generated from a list. Note that ``data_interval_start`` is a templated field and will be rendered when the task is running. .. note:: Behind the scenes Airflow prepares the lineage metadata as part of the ``pre_execute`` method of a task. When the task has finished execution ``post_execute`` is called and lineage metadata is pushed into XCOM. Thus if you are creating diff --git a/docs/apache-airflow/logging-monitoring/errors.rst b/docs/apache-airflow/logging-monitoring/errors.rst index c8c303da6e66c..68afe104aa858 100644 --- a/docs/apache-airflow/logging-monitoring/errors.rst +++ b/docs/apache-airflow/logging-monitoring/errors.rst @@ -52,14 +52,17 @@ Name Description ======================================= ================================================== ``dag_id`` Dag name of the dag that failed ``task_id`` Task name of the task that failed -``execution_date`` Execution date when the task failed +``data_interval_start`` Start of data interval when the task failed +``data_interval_end`` End of data interval when the task failed ``operator`` Operator name of the task that failed ======================================= ================================================== +For backward compatibility, and additional tag ``execution_date`` is also available the represent the logical date. The tag should be considered deprecated in favour of ``data_interval_start``. + + Breadcrumbs ------------ - When a task fails with an error `breadcrumbs `__ will be added for the other tasks in the current dag run. ======================================= ============================================================== diff --git a/docs/apache-airflow/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/logging-monitoring/logging-tasks.rst index c1b8636aa3a2b..aa5a6fa2e34e2 100644 --- a/docs/apache-airflow/logging-monitoring/logging-tasks.rst +++ b/docs/apache-airflow/logging-monitoring/logging-tasks.rst @@ -38,7 +38,7 @@ directory. .. note:: For more information on setting the configuration, see :doc:`/howto/set-config` -The following convention is followed while naming logs: ``{dag_id}/{task_id}/{execution_date}/{try_number}.log`` +The following convention is followed while naming logs: ``{dag_id}/{task_id}/{logical_date}/{try_number}.log`` In addition, users can supply a remote location to store current logs and backups. diff --git a/docs/apache-airflow/plugins.rst b/docs/apache-airflow/plugins.rst index 7943cb0b08a14..3543aef05dfed 100644 --- a/docs/apache-airflow/plugins.rst +++ b/docs/apache-airflow/plugins.rst @@ -247,10 +247,10 @@ definitions in Airflow. operators = [GCSToS3Operator] def get_link(self, operator, dttm): - return "https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}".format( + return "https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{logical_date}".format( dag_id=operator.dag_id, task_id=operator.task_id, - execution_date=dttm, + logical_date=dttm, ) diff --git a/docs/apache-airflow/timezone.rst b/docs/apache-airflow/timezone.rst index 2a191f2dc70c2..8a371e717ebde 100644 --- a/docs/apache-airflow/timezone.rst +++ b/docs/apache-airflow/timezone.rst @@ -140,10 +140,12 @@ using ``pendulum``. op = DummyOperator(task_id="dummy", dag=dag) print(dag.timezone) # -Please note that while it is possible to set a ``start_date`` and ``end_date`` for Tasks always the DAG timezone -or global timezone (in that order) will be used to calculate the next execution date. 
Upon first encounter -the start date or end date will be converted to UTC using the timezone associated with start_date or end_date, -then for calculations this timezone information will be disregarded. +Please note that while it is possible to set a ``start_date`` and ``end_date`` +for Tasks, the DAG timezone or global timezone (in that order) will always be +used to calculate data intervals. Upon first encounter, the start date or end +date will be converted to UTC using the timezone associated with ``start_date`` +or ``end_date``, then for calculations this timezone information will be +disregarded. Templates ''''''''' @@ -156,7 +158,7 @@ It is left up to the DAG to handle this. import pendulum local_tz = pendulum.timezone("Europe/Amsterdam") - local_tz.convert(execution_date) + local_tz.convert(logical_date) Cron schedules '''''''''''''' diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst index 70f89fd2c09ce..c541d0e488467 100644 --- a/docs/apache-airflow/tutorial.rst +++ b/docs/apache-airflow/tutorial.rst @@ -288,11 +288,17 @@ Let's run a few commands to validate this script further. Testing ''''''' -Let's test by running the actual task instances for a specific date. The -date specified in this context is called ``execution_date``. This is the -*logical* date, which simulates the scheduler running your task or dag at -a specific date and time, even though it *physically* will run now ( -or as soon as its dependencies are met). +Let's test by running the actual task instances for a specific date. The date +specified in this context is called the *logical date* (also called *execution +date* for historical reasons), which simulates the scheduler running your task +or DAG for a specific date and time, even though it *physically* will run now +(or as soon as its dependencies are met). + +We said the scheduler runs your task *for* a specific date and time, not *at*. +This is because each run of a DAG conceptually represents not a specific date +and time, but an interval between two times, called a +:ref:`data interval `. A DAG run's logical date is the start of +its data interval. .. code-block:: bash @@ -320,10 +326,11 @@ their log to stdout (on screen), does not bother with dependencies, and does not communicate state (running, success, failed, ...) to the database. It simply allows testing a single task instance. -The same applies to ``airflow dags test [dag_id] [execution_date]``, but on a DAG level. It performs a single -DAG run of the given DAG id. While it does take task dependencies into account, no state is registered in the -database. It is convenient for locally testing a full run of your DAG, given that e.g. if one of your tasks -expects data at some location, it is available. +The same applies to ``airflow dags test [dag_id] [logical_date]``, but on a DAG +level. It performs a single DAG run of the given DAG id. While it does take task +dependencies into account, no state is registered in the database. It is +convenient for locally testing a full run of your DAG, given that e.g. if one of +your tasks expects data at some location, it is available. Backfill '''''''' @@ -335,9 +342,9 @@ are interested in tracking the progress visually as your backfill progresses. Note that if you use ``depends_on_past=True``, individual task instances will depend on the success of their previous task instance (that is, previous -according to ``execution_date``). 
Task instances with ``execution_date==start_date`` -will disregard this dependency because there would be no -past task instances created for them. +according to the logical date). Task instances with their logical dates equal to +``start_date`` will disregard this dependency because there would be no past +task instances created for them. You may also want to consider ``wait_for_downstream=True`` when using ``depends_on_past=True``. While ``depends_on_past=True`` causes a task instance to depend on the success From 97420195759e35d1ceaae16f61a0b5da0acd2061 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 13 Sep 2021 14:34:13 +0800 Subject: [PATCH 07/21] Airflow's CI is American --- docs/apache-airflow/logging-monitoring/errors.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow/logging-monitoring/errors.rst b/docs/apache-airflow/logging-monitoring/errors.rst index 68afe104aa858..ee5052103f55e 100644 --- a/docs/apache-airflow/logging-monitoring/errors.rst +++ b/docs/apache-airflow/logging-monitoring/errors.rst @@ -57,7 +57,7 @@ Name Description ``operator`` Operator name of the task that failed ======================================= ================================================== -For backward compatibility, and additional tag ``execution_date`` is also available the represent the logical date. The tag should be considered deprecated in favour of ``data_interval_start``. +For backward compatibility, and additional tag ``execution_date`` is also available the represent the logical date. The tag should be considered deprecated in favor of ``data_interval_start``. Breadcrumbs From 7c0213f251f27f6a04953b31d512bee553618155 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 13 Sep 2021 15:00:56 +0800 Subject: [PATCH 08/21] More typos --- docs/apache-airflow/faq.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst index 35bb9bce61589..0cf0f79297cdf 100644 --- a/docs/apache-airflow/faq.rst +++ b/docs/apache-airflow/faq.rst @@ -239,9 +239,9 @@ context dictionary given to an Operator's execute function. def execute(self, context): logging.info(context["execution_date"]) -However, you should always using ``data_interval_start`` and -``data_interval_end`` if possible, sinc those names are semantically more -correct and less prine to misunderstandings. +However, you should always use ``data_interval_start`` or ``data_interval_end`` +if possible, since those names are semantically more correct and less prone to +misunderstandings. Note that ``ds`` (the YYYY-MM-DD form of ``data_interval_start``) refers to *date* ***string***, not *date* ***start*** as may be confusing to some. 
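Inside an operator, the same preference applies to the ``execute`` context. A
minimal sketch follows; the operator name is made up for illustration, and an
Airflow 2.2+ context exposing ``data_interval_start`` and ``data_interval_end``
is assumed:

.. code-block:: python

    from airflow.models.baseoperator import BaseOperator


    class SummarizeIntervalOperator(BaseOperator):
        """Hypothetical operator that summarizes exactly one data interval per run."""

        def execute(self, context):
            # Prefer the interval bounds over the deprecated execution_date.
            start = context["data_interval_start"]
            end = context["data_interval_end"]
            # As noted above, "ds" is the YYYY-MM-DD string form of data_interval_start.
            self.log.info("ds=%s", context["ds"])
            self.log.info("Summarizing rows with %s <= timestamp < %s", start, end)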
From 625f1c87e687718670d4a87cd387db209454bf1c Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 13 Sep 2021 15:06:16 +0800 Subject: [PATCH 09/21] Fix example --- airflow/example_dags/example_workday_timetable.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airflow/example_dags/example_workday_timetable.py b/airflow/example_dags/example_workday_timetable.py index c1165e6e08d4a..98162343b7d91 100644 --- a/airflow/example_dags/example_workday_timetable.py +++ b/airflow/example_dags/example_workday_timetable.py @@ -23,7 +23,11 @@ from airflow.example_dags.plugins.workday import AfterWorkdayTimetable from airflow.operators.dummy import DummyOperator -with DAG(timetable=AfterWorkdayTimetable(), tags=["example", "timetable"]) as dag: +with DAG( + dag_id="example_workday_timetable", + timetable=AfterWorkdayTimetable(), + tags=["example", "timetable"], +) as dag: DummyOperator(task_id="run_this") if __name__ == "__main__": From 363c67bf7a0d3b7b444e521bd7a0b1fbd62eee67 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 13 Sep 2021 15:15:45 +0800 Subject: [PATCH 10/21] Fix sentry tests to account for additional tags --- tests/core/test_sentry.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/core/test_sentry.py b/tests/core/test_sentry.py index bb962ccd2b38a..99fac4064f154 100644 --- a/tests/core/test_sentry.py +++ b/tests/core/test_sentry.py @@ -29,6 +29,8 @@ from tests.test_utils.config import conf_vars EXECUTION_DATE = timezone.utcnow() +SCHEDULE_INTERVAL = datetime.timedelta(days=1) +DATA_INTERVAL = (EXECUTION_DATE, EXECUTION_DATE + SCHEDULE_INTERVAL) DAG_ID = "test_dag" TASK_ID = "test_task" OPERATOR = "PythonOperator" @@ -37,6 +39,8 @@ TEST_SCOPE = { "dag_id": DAG_ID, "task_id": TASK_ID, + "data_interval_start": DATA_INTERVAL[0], + "data_interval_end": DATA_INTERVAL[1], "execution_date": EXECUTION_DATE, "operator": OPERATOR, "try_number": TRY_NUMBER, @@ -62,10 +66,10 @@ class TestSentryHook: @pytest.fixture def task_instance(self, dag_maker): # Mock the Dag - with dag_maker(DAG_ID): + with dag_maker(DAG_ID, schedule_interval=SCHEDULE_INTERVAL): task = PythonOperator(task_id=TASK_ID, python_callable=int) - dr = dag_maker.create_dagrun(execution_date=EXECUTION_DATE) + dr = dag_maker.create_dagrun(data_interval=DATA_INTERVAL, execution_date=EXECUTION_DATE) ti = dr.task_instances[0] ti.state = STATE ti.task = task From 9f0dbfa4057054c7fb1f3a294f71cfa7983052f6 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 13 Sep 2021 15:19:16 +0800 Subject: [PATCH 11/21] Revert TI.xcom_pull() usage We need to use execution_date here :/ --- airflow/models/taskinstance.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 7d8c0c9341a6b..9dca1f3e71c3c 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2271,8 +2271,10 @@ def xcom_pull( if dag_id is None: dag_id = self.dag_id + execution_date = self.get_dagrun(session).execution_date + query = XCom.get_many( - run_id=self.run_id, + execution_date=execution_date, key=key, dag_ids=dag_id, task_ids=task_ids, From b43deee24ea2d09392d081d72c779fe5ffa5f0c5 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 13 Sep 2021 15:20:34 +0800 Subject: [PATCH 12/21] Fix Elastic tests depending on private function --- airflow/providers/elasticsearch/log/es_task_handler.py | 2 +- tests/providers/elasticsearch/log/test_es_task_handler.py | 6 +++--- 2 files changed, 4 insertions(+), 4 
deletions(-) diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 1b69a8b49c49e..53c8358dd3300 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -281,7 +281,7 @@ def set_context(self, ti: TaskInstance) -> None: extras={ 'dag_id': str(ti.dag_id), 'task_id': str(ti.task_id), - 'execution_date': self._clean_execution_date(ti.execution_date), + 'execution_date': self._clean_date(ti.execution_date), 'try_number': str(ti.try_number), 'log_id': self._render_log_id(ti, ti.try_number), }, diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py index 004a8bcca4461..604e5aa44a164 100644 --- a/tests/providers/elasticsearch/log/test_es_task_handler.py +++ b/tests/providers/elasticsearch/log/test_es_task_handler.py @@ -43,7 +43,7 @@ class TestElasticsearchTaskHandler: TASK_ID = 'task_for_testing_es_log_handler' EXECUTION_DATE = datetime(2016, 1, 1) LOG_ID = f'{DAG_ID}-{TASK_ID}-2016-01-01T00:00:00+00:00-1' - JSON_LOG_ID = f'{DAG_ID}-{TASK_ID}-{ElasticsearchTaskHandler._clean_execution_date(EXECUTION_DATE)}-1' + JSON_LOG_ID = f'{DAG_ID}-{TASK_ID}-{ElasticsearchTaskHandler._clean_date(EXECUTION_DATE)}-1' @pytest.fixture() def ti(self, create_task_instance): @@ -406,8 +406,8 @@ def test_render_log_id(self, ti): self.es_task_handler.json_format = True assert self.JSON_LOG_ID == self.es_task_handler._render_log_id(ti, 1) - def test_clean_execution_date(self): - clean_execution_date = self.es_task_handler._clean_execution_date(datetime(2016, 7, 8, 9, 10, 11, 12)) + def test_clean_date(self): + clean_execution_date = self.es_task_handler._clean_date(datetime(2016, 7, 8, 9, 10, 11, 12)) assert '2016_07_08T09_10_11_000012' == clean_execution_date @pytest.mark.parametrize( From d22cc731c70c7448d17194cf74058e6e77eb127b Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 13 Sep 2021 15:46:00 +0800 Subject: [PATCH 13/21] Make the run_after description a bit vague A DAG run CAN technically start before its data interval ends, but that's an edge case we shouldn't cover early in the documentation, so let's make wording vague to focus on the general cases to make things less complicated for users reading this for the first time. --- docs/apache-airflow/dag-run.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst index 17a13c7617ac6..6ddcc9507dc63 100644 --- a/docs/apache-airflow/dag-run.rst +++ b/docs/apache-airflow/dag-run.rst @@ -65,10 +65,10 @@ range it operates in. For a DAG scheduled with ``@daily``, for example, each of its data interval would start at midnight of each day and end at midnight of the next day. -A DAG run is scheduled *after* its associated data interval has ended, to ensure -the run is able to collect all the data within the time period. Therefore, a run -covering the data period of 2020-01-01 will not start to run until 2020-01-01 -has ended, i.e. after 2020-01-02 00:00:00. +A DAG run is usually scheduled *after* its associated data interval has ended, +to ensure the run is able to collect all the data within the time period. In +other words, a run covering the data period of 2020-01-01 generally does not +start to run until 2020-01-01 has ended, i.e. after 2020-01-02 00:00:00. All dates in Airflow are tied to the data interval concept in some way. 
The "logical date" (also called ``execution_date`` in Airflow versions prior to 2.2) From 8137aefe62ba51d0de15694e11360073226d2c90 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 13 Sep 2021 21:24:35 +0800 Subject: [PATCH 14/21] Fix example DAG --- .../example_dags/example_workday_timetable.py | 3 +++ airflow/example_dags/plugins/workday.py | 18 ++++++++++-------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/airflow/example_dags/example_workday_timetable.py b/airflow/example_dags/example_workday_timetable.py index 98162343b7d91..614fd5f432fd8 100644 --- a/airflow/example_dags/example_workday_timetable.py +++ b/airflow/example_dags/example_workday_timetable.py @@ -19,12 +19,15 @@ """Example DAG demostrating how to implement a custom timetable for a DAG.""" # [START howto_timetable] +import datetime + from airflow import DAG from airflow.example_dags.plugins.workday import AfterWorkdayTimetable from airflow.operators.dummy import DummyOperator with DAG( dag_id="example_workday_timetable", + start_date=datetime.datetime(2021, 1, 1), timetable=AfterWorkdayTimetable(), tags=["example", "timetable"], ) as dag: diff --git a/airflow/example_dags/plugins/workday.py b/airflow/example_dags/plugins/workday.py index fae27c735655c..3b39a6ef61cea 100644 --- a/airflow/example_dags/plugins/workday.py +++ b/airflow/example_dags/plugins/workday.py @@ -48,20 +48,22 @@ def infer_data_interval(self, run_after: DateTime) -> DataInterval: # [START howto_timetable_next_dagrun_info] def next_dagrun_info( self, - last_automated_dagrun: Optional[DataInterval], + *, + last_automated_data_interval: Optional[DataInterval], restriction: TimeRestriction, ) -> Optional[DagRunInfo]: - if last_automated_dagrun is not None: # There was a previous run on the regular schedule. - last_start_weekday = last_automated_dagrun.start.weekday() + if last_automated_data_interval is not None: # There was a previous run on the regular schedule. + last_start = last_automated_data_interval.start + last_start_weekday = 7 - last_start.weekday() if 0 <= last_start_weekday < 4: # Last run on Monday through Thursday -- next is tomorrow. delta = timedelta(days=1) else: # Last run on Friday -- skip to next Monday. delta = timedelta(days=(7 - last_start_weekday)) - next_start = DateTime.combine((last_automated_dagrun + delta).date(), Time.min) + next_start = DateTime.combine((last_start + delta).date(), Time.min) else: # This is the first ever run on the regular schedule. - if restriction.earliest is None: # No start_date. Don't schedule. - return None next_start = restriction.earliest + if next_start is None: # No start_date. Don't schedule. + return None if not restriction.catchup: # If the DAG has catchup=False, today is the earliest to consider. next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC)) @@ -73,8 +75,8 @@ def next_dagrun_info( if next_start_weekday in (5, 6): # If next start is in the weekend, go to next Monday. delta = timedelta(days=(7 - next_start_weekday)) next_start = next_start + delta - if next_start > restriction.latest: # Over the DAG's scheduled end; don't schedule. - return None + if restriction.latest is not None and next_start > restriction.latest: + return None # Over the DAG's scheduled end; don't schedule. 
return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1))) # [END howto_timetable_next_dagrun_info] From e70adb00f6ac7f54fc8ee1fe29ac139ceed65354 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 13 Sep 2021 23:17:20 +0800 Subject: [PATCH 15/21] Inline the example custom timetable DAG A custom timetable requires plugin initialization, which is not done for unit tests. But we really don't want to automatically register that plugin since it makes testing an unregistered plugin impossible. Since that example DAG is very short and referenced only once anyway, let's just not bother. --- .../example_dags/example_workday_timetable.py | 38 ------------------- docs/apache-airflow/howto/timetable.rst | 20 ++++++++-- 2 files changed, 16 insertions(+), 42 deletions(-) delete mode 100644 airflow/example_dags/example_workday_timetable.py diff --git a/airflow/example_dags/example_workday_timetable.py b/airflow/example_dags/example_workday_timetable.py deleted file mode 100644 index 614fd5f432fd8..0000000000000 --- a/airflow/example_dags/example_workday_timetable.py +++ /dev/null @@ -1,38 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -"""Example DAG demostrating how to implement a custom timetable for a DAG.""" - -# [START howto_timetable] -import datetime - -from airflow import DAG -from airflow.example_dags.plugins.workday import AfterWorkdayTimetable -from airflow.operators.dummy import DummyOperator - -with DAG( - dag_id="example_workday_timetable", - start_date=datetime.datetime(2021, 1, 1), - timetable=AfterWorkdayTimetable(), - tags=["example", "timetable"], -) as dag: - DummyOperator(task_id="run_this") - -if __name__ == "__main__": - dag.cli() -# [END howto_timetable] diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst index 3a77d356f32a6..ce6181c99f672 100644 --- a/docs/apache-airflow/howto/timetable.rst +++ b/docs/apache-airflow/howto/timetable.rst @@ -201,10 +201,22 @@ For reference, here's our plugin and DAG files in their entirety: :start-after: [START howto_timetable] :end-before: [END howto_timetable] -.. exampleinclude:: /../../airflow/example_dags/example_workday_timetable.py - :language: python - :start-after: [START howto_timetable] - :end-before: [END howto_timetable] +.. 
code-block:: python + + import datetime + + from airflow import DAG + from airflow.example_dags.plugins.workday import AfterWorkdayTimetable + from airflow.operators.dummy import DummyOperator + + + with DAG( + dag_id="example_workday_timetable", + start_date=datetime.datetime(2021, 1, 1), + timetable=AfterWorkdayTimetable(), + tags=["example", "timetable"], + ) as dag: + DummyOperator(task_id="run_this") Parameterized Timetables From eed6aff8e7bc823f2ad3c9077a7c073bd39fc164 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 15 Sep 2021 10:32:27 +0800 Subject: [PATCH 16/21] Typos and wording/styling tweaks --- docs/apache-airflow/best-practices.rst | 2 +- docs/apache-airflow/dag-run.rst | 2 +- docs/apache-airflow/faq.rst | 13 +++++++------ docs/apache-airflow/howto/timetable.rst | 6 +++--- docs/apache-airflow/logging-monitoring/errors.rst | 4 +++- 5 files changed, 15 insertions(+), 12 deletions(-) diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst index 3dfd583bb0063..0758d597277fd 100644 --- a/docs/apache-airflow/best-practices.rst +++ b/docs/apache-airflow/best-practices.rst @@ -58,7 +58,7 @@ result - in a task. Someone may update the input data between re-runs, which results in different outputs. A better way is to read the input data from a specific partition. You can use ``data_interval_start`` as a partition. You should - follow this partitioning method while writing data in S3/HDFS, as well. + follow this partitioning method while writing data in S3/HDFS as well. * The Python datetime ``now()`` function gives the current datetime object. This function should never be used inside a task, especially to do the critical computation, as it leads to different outcomes on each run. It's fine to use diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst index 6ddcc9507dc63..f340fe2928004 100644 --- a/docs/apache-airflow/dag-run.rst +++ b/docs/apache-airflow/dag-run.rst @@ -83,7 +83,7 @@ scheduled one interval after ``start_date``. .. tip:: If ``schedule_interval`` is not enough to express your DAG's schedule, - logical date, or data interval, see :doc:`Customizing imetables `. + logical date, or data interval, see :doc:`/howto/timetable`. Re-run DAG '''''''''' diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst index 0cf0f79297cdf..02f8c21213a60 100644 --- a/docs/apache-airflow/faq.rst +++ b/docs/apache-airflow/faq.rst @@ -221,12 +221,13 @@ What does ``execution_date`` mean? DAG run. Airflow was developed as a solution for ETL needs. In the ETL world, you -typically summarize data. So, if you want to summarize data for 2016-02-19, You -would do it at 2016-02-20 midnight UTC, which would be right after all data for -2016-02-19 becomes available. This interval between midnights of 2016-02-19 and -2016-02-20 is called the *data interval*, and since the it represents data in -the date of 2016-02-19, this date is thus called the run's *logical date*, or -the date that this DAG run is executed for, thus *execution date*. +typically summarize data. So, if you want to summarize data for ``2016-02-19``, +you would do it at ``2016-02-20`` midnight UTC, which would be right after all +data for ``2016-02-19`` becomes available. This interval between midnights of +``2016-02-19`` and ``2016-02-20`` is called the *data interval*, and since it +represents data in the date of ``2016-02-19``, this date is also called the +run's *logical date*, or the date that this DAG run is executed for, thus +*execution date*. 
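As a concrete illustration of the terms above (a sketch only, assuming a UTC
``@daily`` schedule):

.. code-block:: python

    import pendulum

    # The @daily run covering 2016-02-19 has these interval bounds:
    data_interval_start = pendulum.datetime(2016, 2, 19, tz="UTC")  # also the run's logical date
    data_interval_end = pendulum.datetime(2016, 2, 20, tz="UTC")
    # The run itself is only scheduled once the interval has ended,
    # i.e. at or after 2016-02-20T00:00:00+00:00.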
For backward compatibility, a datetime value ``execution_date`` is still as :ref:`Template variables` with various formats in Jinja diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst index ce6181c99f672..8c1354f59cc47 100644 --- a/docs/apache-airflow/howto/timetable.rst +++ b/docs/apache-airflow/howto/timetable.rst @@ -185,7 +185,7 @@ A ``DagRunInfo`` can be created like this: run_after=run_after, ) -But since we typically want to schedule a run as soon as the data interval ends, +Since we typically want to schedule a run as soon as the data interval ends, ``end`` and ``run_after`` above are generally the same. ``DagRunInfo`` therefore provides a shortcut for this: @@ -294,5 +294,5 @@ The *Schedule* column would say ``after each workday, at 08:00:00``. .. seealso:: Module :mod:`airflow.timetables.base` - The public interface is heavily documented to explain what they should - be implemented by subclasses. + The public interface is heavily documented to explain what should be + implemented by subclasses. diff --git a/docs/apache-airflow/logging-monitoring/errors.rst b/docs/apache-airflow/logging-monitoring/errors.rst index ee5052103f55e..7a5df129585fa 100644 --- a/docs/apache-airflow/logging-monitoring/errors.rst +++ b/docs/apache-airflow/logging-monitoring/errors.rst @@ -57,7 +57,9 @@ Name Description ``operator`` Operator name of the task that failed ======================================= ================================================== -For backward compatibility, and additional tag ``execution_date`` is also available the represent the logical date. The tag should be considered deprecated in favor of ``data_interval_start``. +For backward compatibility, an additional tag ``execution_date`` is also +available to represent the logical date. The tag should be considered deprecated +in favor of ``data_interval_start``. Breadcrumbs From dca9b5ac9919be94901b6a326eda6e77dfc5ced7 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Sep 2021 15:43:20 +0800 Subject: [PATCH 17/21] Use run_id in Elasticsearch log ID template This ensures the log ID is unique across the board, similar to the guarantee we had prior to AIP-39 (via execution date). 
--- airflow/providers/elasticsearch/log/es_task_handler.py | 1 + docs/apache-airflow-providers-elasticsearch/logging/index.rst | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 53c8358dd3300..cd0897153dfdc 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -115,6 +115,7 @@ def _render_log_id(self, ti: TaskInstance, try_number: int) -> str: return self.log_id_template.format( dag_id=ti.dag_id, task_id=ti.task_id, + run_id=ti.run_id, data_interval_start=data_interval_start, data_interval_end=data_interval_end, execution_date=execution_date, diff --git a/docs/apache-airflow-providers-elasticsearch/logging/index.rst b/docs/apache-airflow-providers-elasticsearch/logging/index.rst index ae9c0c2d8a6e0..5e16ccbdc0e78 100644 --- a/docs/apache-airflow-providers-elasticsearch/logging/index.rst +++ b/docs/apache-airflow-providers-elasticsearch/logging/index.rst @@ -38,7 +38,7 @@ First, to use the handler, ``airflow.cfg`` must be configured as follows: [elasticsearch] host = : - log_id_template = {dag_id}-{task_id}-{data_interval_start}-{try_number} + log_id_template = {dag_id}-{task_id}-{run_id}-{try_number} end_of_log_mark = end_of_log write_stdout = json_fields = @@ -56,7 +56,7 @@ To output task logs to stdout in JSON format, the following config could be used [elasticsearch] host = : - log_id_template = {dag_id}-{task_id}-{data_interval_start}-{try_number} + log_id_template = {dag_id}-{task_id}-{run_id}-{try_number} end_of_log_mark = end_of_log write_stdout = True json_format = True From b8ca6f932b1ad7989670ce81f1aea5d0c726063e Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Sep 2021 16:32:37 +0800 Subject: [PATCH 18/21] Don't use dag_maker in Best Practice docs --- docs/apache-airflow/best-practices.rst | 43 +++++++++++++++++++------- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst index 0758d597277fd..dba73ec859400 100644 --- a/docs/apache-airflow/best-practices.rst +++ b/docs/apache-airflow/best-practices.rst @@ -280,7 +280,7 @@ Unit tests ensure that there is no incorrect code in your DAG. You can write uni from airflow.models import DagBag - @pytest.fixture(scope="scope") + @pytest.fixture() def dagbag(self): return DagBag() @@ -321,26 +321,47 @@ This is an example test want to verify the structure of a code-generated DAG aga .. 
code-block:: python + import datetime + import pytest - from airflow.utils.state import State + from airflow.utils.state import DagRunState + from airflow.utils.types import DagRunType + + DATA_INTERVAL_START = datetime.datetime(2021, 9, 13) + DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1) + + TEST_DAG_ID = "my_custom_operator_dag" + TEST_TASK_ID = "my_custom_operator_task" - def test_my_custom_oeprator_execute_no_trigger(dag_maker): - with dag_maker( - dag_id="test_my_custom_oeprator_execute_no_trigger", + @pytest.fixture() + def dag(): + with DAG( + dag_id=TEST_DAG_ID, schedule_interval="@daily", - default_args={"start_date": DEFAULT_DATE}, + default_args={"start_date": DATA_INTERVAL_START}, ) as dag: MyCustomOperator( - dag=self.dag, - task_id="test", + task_id=TEST_TASK_ID, prefix="s3://bucket/some/prefix", ) - dagrun = dag_maker.create_dagrun() - (ti,) = dagrun.task_instances + return dag + + + def test_my_custom_operator_execute_no_trigger(dag): + dagrun = dag.create_dagrun( + state=DagRunState.RUNNING, + execution_date=DATA_INTERVAL_START, + data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END), + start_date=DATA_INTERVAL_END, + run_type=DagRunType.MANUAL, + ) + ti = dagrun.get_task_instance(task_id=TEST_TASK_ID) + ti.task = dag.get_task(task_id=TEST_TASK_ID) + ti.run(ignore_ti_state=True) assert ti.state == State.SUCCESS - # Assert something related to tasks results + # Assert something related to tasks results. Self-Checks From b601b9a1ef4208064b74835443801f3c5ee57759 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Sep 2021 16:35:52 +0800 Subject: [PATCH 19/21] Remove logical date reference in DAG concept --- docs/apache-airflow/concepts/dags.rst | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst index bf22cd74f182c..b8718956aba5a 100644 --- a/docs/apache-airflow/concepts/dags.rst +++ b/docs/apache-airflow/concepts/dags.rst @@ -156,16 +156,24 @@ The ``schedule_interval`` argument takes any value that is a valid `Crontab `. -Every time you run a DAG, you are creating a new instance of that DAG which Airflow calls a :doc:`DAG Run `. DAG Runs can run in parallel for the same DAG, and each has a defined data interval, which identifies the *logical* date and time range it is running for - not the *actual* time when it was started. +Every time you run a DAG, you are creating a new instance of that DAG which +Airflow calls a :doc:`DAG Run `. DAG Runs can run in parallel for the +same DAG, and each has a defined data interval, which identifies the period of +data the tasks should operate on. -As an example of why this is useful, consider writing a DAG that processes a daily set of experimental data. It's been rewritten, and you want to run it on the previous 3 months of data - no problem, since Airflow can *backfill* the DAG and run copies of it for every day in those previous 3 months, all at once. +As an example of why this is useful, consider writing a DAG that processes a +daily set of experimental data. It's been rewritten, and you want to run it on +the previous 3 months of data---no problem, since Airflow can *backfill* the DAG +and run copies of it for every day in those previous 3 months, all at once. 
Those DAG Runs will all have been started on the same actual day, but each DAG run will have one data interval covering a single day in that 3 month period, and that data interval is all the tasks, operators and sensors inside the DAG look at when they run. -In much the same way a DAG instantiates into a DAG Run every time it's run, Tasks specified inside a DAG also instantiate into :ref:`Task Instances ` along with it. +In much the same way a DAG instantiates into a DAG Run every time it's run, +Tasks specified inside a DAG are also instantiated into +:ref:`Task Instances ` along with it. DAG Assignment From 8bea3907f30417938f71beaa578ebd6471fff3d0 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Sep 2021 16:37:16 +0800 Subject: [PATCH 20/21] Wording tweak --- docs/apache-airflow/faq.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst index 02f8c21213a60..599a1f683c150 100644 --- a/docs/apache-airflow/faq.rst +++ b/docs/apache-airflow/faq.rst @@ -311,7 +311,8 @@ commonly attempted in ``user_defined_macros``. bo = BashOperator(task_id="my_task", bash_command="echo {{ my_custom_macro }}", dag=dag) -This will echo "day={{ ds }}" instead of "day=2020-01-01" for a dagrun with ``data_interval_start`` 2020-01-01 00:00:00. +This will echo "day={{ ds }}" instead of "day=2020-01-01" for a DAG run with a +``data_interval_start`` of 2020-01-01 00:00:00. .. code-block:: python From 7a83182a39394014706cdb991e9fea34919a3b0a Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Sep 2021 16:42:17 +0800 Subject: [PATCH 21/21] Replace ti.xcom_pull() in extra link example We don't want to expose TaskInstance(execution_date=...) anymore, but the extra link interface still relies on execution date, so we need an alternative. --- docs/apache-airflow/howto/define_extra_link.rst | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/apache-airflow/howto/define_extra_link.rst b/docs/apache-airflow/howto/define_extra_link.rst index b3dc6c698da19..631bf7e6a770f 100644 --- a/docs/apache-airflow/howto/define_extra_link.rst +++ b/docs/apache-airflow/howto/define_extra_link.rst @@ -121,6 +121,7 @@ Console, but if we wanted to change that link we could: from airflow.plugins_manager import AirflowPlugin from airflow.models.baseoperator import BaseOperatorLink + from airflow.models.xcom import XCom from airflow.providers.google.cloud.operators.bigquery import BigQueryOperator # Change from https to http just to display the override @@ -136,8 +137,11 @@ Console, but if we wanted to change that link we could: operators = [BigQueryOperator] def get_link(self, operator, dttm): - ti = TaskInstance(task=operator, execution_date=dttm) - job_id = ti.xcom_pull(task_ids=operator.task_id, key="job_id") + job_id = XCom.get_one( + execution_date=dttm, + task_id=operator.task_id, + key="job_id", + ) return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""
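A link class like the one above only takes effect once it is registered through a
plugin. A minimal sketch of that registration follows; the plugin name is
arbitrary, and ``BigQueryConsoleLink`` stands in for whatever name the link class
defined earlier in the file actually uses:

.. code-block:: python

    from airflow.plugins_manager import AirflowPlugin


    class AirflowExtraLinkPlugin(AirflowPlugin):
        name = "extra_link_plugin"  # any unique plugin name works here
        operator_extra_links = [
            BigQueryConsoleLink(),  # assumed name of the link class shown above
        ]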