24 changes: 22 additions & 2 deletions airflow/datasets/__init__.py
@@ -33,6 +33,12 @@ class BaseDatasetEventInput(Protocol):
:meta private:
"""

def __or__(self, other: BaseDatasetEventInput) -> DatasetAny:
return DatasetAny(self, other)

def __and__(self, other: BaseDatasetEventInput) -> DatasetAll:
return DatasetAll(self, other)

def evaluate(self, statuses: dict[str, bool]) -> bool:
raise NotImplementedError

@@ -50,7 +56,7 @@ class Dataset(os.PathLike, BaseDatasetEventInput):
__version__: ClassVar[int] = 1

@uri.validator
def _check_uri(self, attr, uri: str):
def _check_uri(self, attr, uri: str) -> None:
if uri.isspace():
raise ValueError(f"{attr.name} cannot be just whitespace")
try:
@@ -64,7 +70,7 @@ def _check_uri(self, attr, uri: str):
def __fspath__(self) -> str:
return self.uri

def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
if isinstance(other, self.__class__):
return self.uri == other.uri
else:
@@ -106,8 +112,22 @@ class DatasetAny(_DatasetBooleanCondition):

agg_func = any

def __or__(self, other: BaseDatasetEventInput) -> DatasetAny:
# Optimization: X | (Y | Z) is equivalent to X | Y | Z.
return DatasetAny(*self.objects, other)

def __repr__(self) -> str:
return f"DatasetAny({', '.join(map(str, self.objects))})"


class DatasetAll(_DatasetBooleanCondition):
"""Use to combine datasets schedule references in an "or" relationship."""

agg_func = all

def __and__(self, other: BaseDatasetEventInput) -> DatasetAll:
# Optimization: X & (Y & Z) is equivalent to X & Y & Z.
return DatasetAll(*self.objects, other)

def __repr__(self) -> str:
return f"DatasetAll({', '.join(map(str, self.objects))})"
76 changes: 60 additions & 16 deletions airflow/example_dags/example_datasets.py
@@ -15,26 +15,39 @@
# specific language governing permissions and limitations
# under the License.
"""
Example DAG for demonstrating behavior of Datasets feature.
Example DAG for demonstrating the behavior of the Datasets feature in Airflow, including conditional and
dataset expression-based scheduling.

Notes on usage:

Turn on all the dags.
Turn on all the DAGs.

DAG dataset_produces_1 should run because it's on a schedule.
dataset_produces_1 is scheduled to run daily. Once it completes, it triggers several DAGs due to its dataset
being updated. dataset_consumes_1 is triggered immediately, as it depends solely on the dataset produced by
dataset_produces_1. consume_1_or_2_with_dataset_expressions will also be triggered, as its condition of
either dataset_produces_1 or dataset_produces_2 being updated is satisfied by dataset_produces_1.

After dataset_produces_1 runs, dataset_consumes_1 should be triggered immediately
because its only dataset dependency is managed by dataset_produces_1.
dataset_consumes_1_and_2 will not be triggered after dataset_produces_1 runs because it requires the dataset
from dataset_produces_2, which has no schedule and must be manually triggered.

No other dags should be triggered. Note that even though dataset_consumes_1_and_2 depends on
the dataset in dataset_produces_1, it will not be triggered until dataset_produces_2 runs
(and dataset_produces_2 is left with no schedule so that we can trigger it manually).
After manually triggering dataset_produces_2, several DAGs will be affected. dataset_consumes_1_and_2 should
run because both its dataset dependencies are now met. consume_1_and_2_with_dataset_expressions will be
triggered, as it requires both dataset_produces_1 and dataset_produces_2 datasets to be updated.
consume_1_or_2_with_dataset_expressions will be triggered again, since it's conditionally set to run when
either dataset is updated.

Next, trigger dataset_produces_2. After dataset_produces_2 finishes,
dataset_consumes_1_and_2 should run.
consume_1_or_both_2_and_3_with_dataset_expressions demonstrates complex dataset dependency logic.
This DAG triggers if dataset_produces_1 is updated or if both dataset_produces_2 and dag3_dataset
are updated. This example highlights the capability to combine updates from multiple datasets with logical
expressions for advanced scheduling.

Dags dataset_consumes_1_never_scheduled and dataset_consumes_unknown_never_scheduled should not run because
they depend on datasets that never get updated.
conditional_dataset_and_time_based_timetable illustrates the integration of time-based scheduling with
dataset dependencies. This DAG is configured to execute either when both dataset_produces_1 and
dataset_produces_2 datasets have been updated or according to a specific cron schedule, showcasing
Airflow's versatility in handling mixed triggers for dataset and time-based scheduling.

The DAGs dataset_consumes_1_never_scheduled and dataset_consumes_unknown_never_scheduled will not run
automatically as they depend on datasets that do not get updated or are not produced by any scheduled tasks.
"""
from __future__ import annotations

@@ -50,6 +63,7 @@
dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"})
# [END dataset_def]
dag2_dataset = Dataset("s3://dag2/output_1.txt", extra={"hi": "bye"})
dag3_dataset = Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"})

with DAG(
dag_id="dataset_produces_1",
@@ -132,16 +146,46 @@
)

with DAG(
dag_id="dataset_and_time_based_timetable",
dag_id="consume_1_and_2_with_dataset_expressions",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=(dag1_dataset & dag2_dataset),
) as dag5:
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consume_1_and_2_with_dataset_expressions",
bash_command="sleep 5",
)
with DAG(
dag_id="consume_1_or_2_with_dataset_expressions",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=(dag1_dataset | dag2_dataset),
) as dag6:
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consume_1_or_2_with_dataset_expressions",
bash_command="sleep 5",
)
with DAG(
dag_id="consume_1_or_both_2_and_3_with_dataset_expressions",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
) as dag7:
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consume_1_or_both_2_and_3_with_dataset_expressions",
bash_command="sleep 5",
)
with DAG(
dag_id="conditional_dataset_and_time_based_timetable",
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=DatasetOrTimeSchedule(
timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=[dag1_dataset]
timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset)
),
tags=["dataset-time-based-timetable"],
) as dag7:
) as dag8:
BashOperator(
outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
task_id="consuming_dataset_time_based",
task_id="conditional_dataset_and_time_based_timetable",
bash_command="sleep 5",
)
2 changes: 1 addition & 1 deletion airflow/models/dag.py
@@ -589,7 +589,7 @@ def __init__(
self.dataset_triggers = DatasetAll(*schedule)
elif isinstance(schedule, Timetable):
timetable = schedule
elif schedule is not NOTSET:
elif schedule is not NOTSET and not isinstance(schedule, BaseDatasetEventInput):
schedule_interval = schedule

if isinstance(schedule, DatasetOrTimeSchedule):
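
This guard matters because a dataset expression now arrives through the same ``schedule`` argument as cron strings and timetables. A hypothetical illustration of the effect (simplified; the branch that actually stores the expression as the DAG's dataset triggers sits in the collapsed part of ``__init__`` and is not shown here):

schedule = dag1_dataset & dag2_dataset  # a DatasetAll, i.e. a BaseDatasetEventInput

if isinstance(schedule, Timetable):
    timetable = schedule  # time-based scheduling
elif schedule is not NOTSET and not isinstance(schedule, BaseDatasetEventInput):
    schedule_interval = schedule  # only plain cron strings / timedeltas land here now
# Without the added isinstance check, the dataset expression above would be mistaken for a
# legacy schedule_interval; with it, the expression falls through to the dataset-trigger handling.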
64 changes: 64 additions & 0 deletions docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -237,6 +237,68 @@ Example:

Note that this example is using `(.values() | first | first) <https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ to fetch the first of one Dataset given to the DAG, and the first of one DatasetEvent for that Dataset. An implementation may be quite complex if you have multiple Datasets, potentially with multiple DatasetEvents.
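For DAGs with several triggering datasets, one option is to read the whole mapping from the task context rather than a Jinja one-liner; a minimal sketch using the TaskFlow API (the helper name and output format are illustrative only):

.. code-block:: python

    from airflow.decorators import task


    @task
    def summarize_triggering_events(triggering_dataset_events=None):
        # triggering_dataset_events maps each triggering dataset to its list of DatasetEvents.
        for dataset, events in (triggering_dataset_events or {}).items():
            print(dataset, f"received {len(events)} event(s)")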

Advanced Dataset Scheduling with Conditional Expressions
--------------------------------------------------------

Apache Airflow introduces advanced scheduling capabilities that leverage conditional expressions with datasets. This feature allows Airflow users to define complex dependencies for DAG executions based on dataset updates, using logical operators for more granular control over workflow triggers.

Logical Operators for Datasets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Airflow supports two logical operators for combining dataset conditions:

- **AND (``&``)**: Specifies that the DAG should be triggered only after all of the specified datasets have been updated.
- **OR (``|``)**: Specifies that the DAG should be triggered when any one of the specified datasets is updated.

These operators enable the expression of complex dataset update conditions, enhancing the dynamism and flexibility of Airflow workflows.

Example Usage
-------------

**Scheduling Based on Multiple Dataset Updates**

To schedule a DAG to run only when two specific datasets have both been updated, use the AND operator (``&``):

.. code-block:: python

dag1_dataset = Dataset("s3://dag1/output_1.txt")
dag2_dataset = Dataset("s3://dag2/output_1.txt")

with DAG(
# Consume dataset 1 and 2 with dataset expressions
schedule=(dag1_dataset & dag2_dataset),
...,
):
...

**Scheduling Based on Any Dataset Update**

To trigger a DAG execution when either of two datasets is updated, apply the OR operator (``|``):

.. code-block:: python

with DAG(
# Consume dataset 1 or 2 with dataset expressions
schedule=(dag1_dataset | dag2_dataset),
...,
):
...

**Complex Conditional Logic**

For scenarios requiring more intricate conditions, such as triggering a DAG when one dataset is updated or when both of two other datasets are updated, combine the OR and AND operators:

.. code-block:: python

dag3_dataset = Dataset("s3://dag3/output_3.txt")

with DAG(
# Consume dataset 1 or both 2 and 3 with dataset expressions
schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
...,
):
...

Combining Dataset and Time-Based Schedules
------------------------------------------

@@ -245,3 +307,5 @@ DatasetTimetable Integration
With the introduction of ``DatasetTimetable``, it is now possible to schedule DAGs based on both dataset events and time-based schedules. This feature offers flexibility for scenarios where a DAG needs to be triggered by data updates as well as run periodically according to a fixed timetable.

For more detailed information on ``DatasetTimetable`` and its usage, refer to the corresponding section in :ref:`DatasetTimetable <dataset-timetable-section>`.

These examples illustrate how Airflow's conditional dataset expressions can be used to create complex data-dependent scheduling scenarios, providing precise control over when DAGs are triggered in response to data updates.
48 changes: 28 additions & 20 deletions docs/apache-airflow/authoring-and-scheduling/timetable.rst
@@ -192,29 +192,37 @@ Here's an example of a DAG using ``DatasetTimetable``:

from airflow.timetables.dataset import DatasetTimetable
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.datasets import Dataset
from airflow.models import DAG
from airflow.operators.bash import BashOperator
import pendulum

with DAG(
dag_id="dataset_and_time_based_timetable",
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=DatasetTimetable(time=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), event=[dag1_dataset]),
tags=["dataset-time-based-timetable"],
) as dag7:
BashOperator(
outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
task_id="consuming_dataset_time_based",
bash_command="sleep 5",
)


@dag(
schedule=DatasetTimetable(time=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), event=[dag1_dataset])
# Additional arguments here, replace this comment with actual arguments
)
def example_dag():
# DAG tasks go here
pass

In this example, the DAG is scheduled to run every Wednesday at 01:00 UTC based on the ``CronTriggerTimetable``, and it is also triggered by updates to ``dag1_dataset``.

Future Enhancements
~~~~~~~~~~~~~~~~~~~
Future iterations may introduce more complex combinations for scheduling (e.g., dataset1 OR dataset2 OR timetable), further enhancing the flexibility for scheduling DAGs in various scenarios.
Integrating Conditional Datasets with Time-Based Scheduling
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Combining conditional dataset expressions with time-based schedules enhances scheduling flexibility:

.. code-block:: python

from airflow.timetables import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
schedule=DatasetOrTimeSchedule(
timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset)
)
# Additional arguments here, replace this comment with actual arguments
)
def example_dag():
# DAG tasks go here
pass


Timetables comparisons