diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 1d08d7d6d3621..953333c6596b7 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -33,6 +33,12 @@ class BaseDatasetEventInput(Protocol): :meta private: """ + def __or__(self, other: BaseDatasetEventInput) -> DatasetAny: + return DatasetAny(self, other) + + def __and__(self, other: BaseDatasetEventInput) -> DatasetAll: + return DatasetAll(self, other) + def evaluate(self, statuses: dict[str, bool]) -> bool: raise NotImplementedError @@ -50,7 +56,7 @@ class Dataset(os.PathLike, BaseDatasetEventInput): __version__: ClassVar[int] = 1 @uri.validator - def _check_uri(self, attr, uri: str): + def _check_uri(self, attr, uri: str) -> None: if uri.isspace(): raise ValueError(f"{attr.name} cannot be just whitespace") try: @@ -64,7 +70,7 @@ def _check_uri(self, attr, uri: str): def __fspath__(self) -> str: return self.uri - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: if isinstance(other, self.__class__): return self.uri == other.uri else: @@ -106,8 +112,22 @@ class DatasetAny(_DatasetBooleanCondition): agg_func = any + def __or__(self, other: BaseDatasetEventInput) -> DatasetAny: + # Optimization: X | (Y | Z) is equivalent to X | Y | Z. + return DatasetAny(*self.objects, other) + + def __repr__(self) -> str: + return f"DatasetAny({', '.join(map(str, self.objects))})" + class DatasetAll(_DatasetBooleanCondition): """Use to combine datasets schedule references in an "or" relationship.""" agg_func = all + + def __and__(self, other: BaseDatasetEventInput) -> DatasetAll: + # Optimization: X & (Y & Z) is equivalent to X & Y & Z. + return DatasetAll(*self.objects, other) + + def __repr__(self) -> str: + return f"DatasetAll({', '.join(map(str, self.objects))})" diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py index 9dfaaf0c34de0..c2a869ba4aa52 100644 --- a/airflow/example_dags/example_datasets.py +++ b/airflow/example_dags/example_datasets.py @@ -15,26 +15,39 @@ # specific language governing permissions and limitations # under the License. """ -Example DAG for demonstrating behavior of Datasets feature. +Example DAG for demonstrating the behavior of the Datasets feature in Airflow, including conditional and +dataset expression-based scheduling. Notes on usage: -Turn on all the dags. +Turn on all the DAGs. -DAG dataset_produces_1 should run because it's on a schedule. +dataset_produces_1 is scheduled to run daily. Once it completes, it triggers several DAGs due to its dataset +being updated. dataset_consumes_1 is triggered immediately, as it depends solely on the dataset produced by +dataset_produces_1. consume_1_or_2_with_dataset_expressions will also be triggered, as its condition of +either dataset_produces_1 or dataset_produces_2 being updated is satisfied with dataset_produces_1. -After dataset_produces_1 runs, dataset_consumes_1 should be triggered immediately -because its only dataset dependency is managed by dataset_produces_1. +dataset_consumes_1_and_2 will not be triggered after dataset_produces_1 runs because it requires the dataset +from dataset_produces_2, which has no schedule and must be manually triggered. -No other dags should be triggered. Note that even though dataset_consumes_1_and_2 depends on -the dataset in dataset_produces_1, it will not be triggered until dataset_produces_2 runs -(and dataset_produces_2 is left with no schedule so that we can trigger it manually). 
+After manually triggering dataset_produces_2, several DAGs will be affected. dataset_consumes_1_and_2 should +run because both its dataset dependencies are now met. consume_1_and_2_with_dataset_expressions will be +triggered, as it requires both dataset_produces_1 and dataset_produces_2 datasets to be updated. +consume_1_or_2_with_dataset_expressions will be triggered again, since it's conditionally set to run when +either dataset is updated. -Next, trigger dataset_produces_2. After dataset_produces_2 finishes, -dataset_consumes_1_and_2 should run. +consume_1_or_both_2_and_3_with_dataset_expressions demonstrates complex dataset dependency logic. +This DAG triggers if dataset_produces_1 is updated or if both dataset_produces_2 and dag3_dataset +are updated. This example highlights the capability to combine updates from multiple datasets with logical +expressions for advanced scheduling. -Dags dataset_consumes_1_never_scheduled and dataset_consumes_unknown_never_scheduled should not run because -they depend on datasets that never get updated. +conditional_dataset_and_time_based_timetable illustrates the integration of time-based scheduling with +dataset dependencies. This DAG is configured to execute either when both dataset_produces_1 and +dataset_produces_2 datasets have been updated or according to a specific cron schedule, showcasing +Airflow's versatility in handling mixed triggers for dataset and time-based scheduling. + +The DAGs dataset_consumes_1_never_scheduled and dataset_consumes_unknown_never_scheduled will not run +automatically as they depend on datasets that do not get updated or are not produced by any scheduled tasks. """ from __future__ import annotations @@ -50,6 +63,7 @@ dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"}) # [END dataset_def] dag2_dataset = Dataset("s3://dag2/output_1.txt", extra={"hi": "bye"}) +dag3_dataset = Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"}) with DAG( dag_id="dataset_produces_1", @@ -132,16 +146,46 @@ ) with DAG( - dag_id="dataset_and_time_based_timetable", + dag_id="consume_1_and_2_with_dataset_expressions", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule=(dag1_dataset & dag2_dataset), +) as dag5: + BashOperator( + outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], + task_id="consume_1_and_2_with_dataset_expressions", + bash_command="sleep 5", + ) +with DAG( + dag_id="consume_1_or_2_with_dataset_expressions", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule=(dag1_dataset | dag2_dataset), +) as dag6: + BashOperator( + outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], + task_id="consume_1_or_2_with_dataset_expressions", + bash_command="sleep 5", + ) +with DAG( + dag_id="consume_1_or_both_2_and_3_with_dataset_expressions", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)), +) as dag7: + BashOperator( + outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], + task_id="consume_1_or_both_2_and_3_with_dataset_expressions", + bash_command="sleep 5", + ) +with DAG( + dag_id="conditional_dataset_and_time_based_timetable", catchup=False, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=DatasetOrTimeSchedule( - timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=[dag1_dataset] + timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset) ), tags=["dataset-time-based-timetable"], -) as dag7: +) as dag8: 
BashOperator( outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")], - task_id="consuming_dataset_time_based", + task_id="conditional_dataset_and_time_based_timetable", bash_command="sleep 5", ) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 19bf4285430cf..688faa648d589 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -589,7 +589,7 @@ def __init__( self.dataset_triggers = DatasetAll(*schedule) elif isinstance(schedule, Timetable): timetable = schedule - elif schedule is not NOTSET: + elif schedule is not NOTSET and not isinstance(schedule, BaseDatasetEventInput): schedule_interval = schedule if isinstance(schedule, DatasetOrTimeSchedule): diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst index 2456503649476..191c08e1e880f 100644 --- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst +++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst @@ -237,6 +237,68 @@ Example: Note that this example is using `(.values() | first | first) `_ to fetch the first of one Dataset given to the DAG, and the first of one DatasetEvent for that Dataset. An implementation may be quite complex if you have multiple Datasets, potentially with multiple DatasetEvents. +Advanced Dataset Scheduling with Conditional Expressions +-------------------------------------------------------- + +Apache Airflow introduces advanced scheduling capabilities that leverage conditional expressions with datasets. This feature allows Airflow users to define complex dependencies for DAG executions based on dataset updates, using logical operators for more granular control over workflow triggers. + +Logical Operators for Datasets +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Airflow supports two logical operators for combining dataset conditions: + +- **AND (``&``)**: Specifies that the DAG should be triggered only after all of the specified datasets have been updated. +- **OR (``|``)**: Specifies that the DAG should be triggered when any one of the specified datasets is updated. + +These operators enable the expression of complex dataset update conditions, enhancing the dynamism and flexibility of Airflow workflows. + +Example Usage +------------- + +**Scheduling Based on Multiple Dataset Updates** + +To schedule a DAG to run only when two specific datasets have both been updated, use the AND operator (``&``): + +.. code-block:: python + + dag1_dataset = Dataset("s3://dag1/output_1.txt") + dag2_dataset = Dataset("s3://dag2/output_1.txt") + + with DAG( + # Consume dataset 1 and 2 with dataset expressions + schedule=(dag1_dataset & dag2_dataset), + ..., + ): + ... + +**Scheduling Based on Any Dataset Update** + +To trigger a DAG execution when either of two datasets is updated, apply the OR operator (``|``): + +.. code-block:: python + + with DAG( + # Consume dataset 1 or 2 with dataset expressions + schedule=(dag1_dataset | dag2_dataset), + ..., + ): + ... + +**Complex Conditional Logic** + +For scenarios requiring more intricate conditions, such as triggering a DAG when one dataset is updated or when both of two other datasets are updated, combine the OR and AND operators: + +.. code-block:: python + + dag3_dataset = Dataset("s3://dag3/output_3.txt") + + with DAG( + # Consume dataset 1 or both 2 and 3 with dataset expressions + schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)), + ..., + ): + ... 
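+Under the hood, ``|`` and ``&`` build ``DatasetAny`` and ``DatasetAll`` objects, so the same condition can
+also be spelled out with those classes directly. Below is a minimal sketch, assuming ``DatasetAny`` and
+``DatasetAll`` are imported from ``airflow.datasets`` and are accepted by ``schedule`` just like the
+operator expressions above:
+
+.. code-block:: python
+
+    from airflow.datasets import DatasetAll, DatasetAny
+
+    with DAG(
+        # Equivalent to (dag1_dataset | (dag2_dataset & dag3_dataset))
+        schedule=DatasetAny(dag1_dataset, DatasetAll(dag2_dataset, dag3_dataset)),
+        ...,
+    ):
+        ...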
+ Combining Dataset and Time-Based Schedules ------------------------------------------ @@ -245,3 +307,5 @@ DatasetTimetable Integration With the introduction of ``DatasetTimetable``, it is now possible to schedule DAGs based on both dataset events and time-based schedules. This feature offers flexibility for scenarios where a DAG needs to be triggered by data updates as well as run periodically according to a fixed timetable. For more detailed information on ``DatasetTimetable`` and its usage, refer to the corresponding section in :ref:`DatasetTimetable `. + +These examples illustrate how Airflow's conditional dataset expressions can be used to create complex data-dependent scheduling scenarios, providing precise control over when DAGs are triggered in response to data updates. diff --git a/docs/apache-airflow/authoring-and-scheduling/timetable.rst b/docs/apache-airflow/authoring-and-scheduling/timetable.rst index c33cd075ab886..5ea18b36c494f 100644 --- a/docs/apache-airflow/authoring-and-scheduling/timetable.rst +++ b/docs/apache-airflow/authoring-and-scheduling/timetable.rst @@ -192,29 +192,37 @@ Here's an example of a DAG using ``DatasetTimetable``: from airflow.timetables.dataset import DatasetTimetable from airflow.timetables.trigger import CronTriggerTimetable - from airflow.datasets import Dataset - from airflow.models import DAG - from airflow.operators.bash import BashOperator - import pendulum - - with DAG( - dag_id="dataset_and_time_based_timetable", - catchup=False, - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - schedule=DatasetTimetable(time=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), event=[dag1_dataset]), - tags=["dataset-time-based-timetable"], - ) as dag7: - BashOperator( - outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")], - task_id="consuming_dataset_time_based", - bash_command="sleep 5", - ) + + + @dag( + schedule=DatasetTimetable(time=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), event=[dag1_dataset]) + # Additional arguments here, replace this comment with actual arguments + ) + def example_dag(): + # DAG tasks go here + pass In this example, the DAG is scheduled to run every Wednesday at 01:00 UTC based on the ``CronTriggerTimetable``, and it is also triggered by updates to ``dag1_dataset``. -Future Enhancements -~~~~~~~~~~~~~~~~~~~ -Future iterations may introduce more complex combinations for scheduling (e.g., dataset1 OR dataset2 OR timetable), further enhancing the flexibility for scheduling DAGs in various scenarios. +Integrate conditional dataset with Time-Based Scheduling +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Combining conditional dataset expressions with time-based schedules enhances scheduling flexibility: + +.. 
code-block:: python + + from airflow.timetables import DatasetOrTimeSchedule + from airflow.timetables.trigger import CronTriggerTimetable + + + @dag( + schedule=DatasetOrTimeSchedule( + timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset) + ) + # Additional arguments here, replace this comment with actual arguments + ) + def example_dag(): + # DAG tasks go here + pass Timetables comparisons diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 258e542cec264..ba29e41361b86 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -23,7 +23,7 @@ import pytest from sqlalchemy.sql import select -from airflow.datasets import Dataset, DatasetAll, DatasetAny +from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll, DatasetAny from airflow.models.dataset import DatasetDagRunQueue, DatasetModel from airflow.models.serialized_dag import SerializedDagModel from airflow.operators.empty import EmptyOperator @@ -89,6 +89,55 @@ def test_hash(): hash(dataset) +def test_dataset_logic_operations(): + result_or = dataset1 | dataset2 + assert isinstance(result_or, DatasetAny) + result_and = dataset1 & dataset2 + assert isinstance(result_and, DatasetAll) + + +def test_dataset_iter_datasets(): + assert list(dataset1.iter_datasets()) == [("s3://bucket1/data1", dataset1)] + + +def test_dataset_evaluate(): + assert dataset1.evaluate({"s3://bucket1/data1": True}) is True + assert dataset1.evaluate({"s3://bucket1/data1": False}) is False + + +def test_dataset_any_operations(): + result_or = (dataset1 | dataset2) | dataset3 + assert isinstance(result_or, DatasetAny) + assert len(result_or.objects) == 3 + result_and = (dataset1 | dataset2) & dataset3 + assert isinstance(result_and, DatasetAll) + + +def test_dataset_all_operations(): + result_or = (dataset1 & dataset2) | dataset3 + assert isinstance(result_or, DatasetAny) + result_and = (dataset1 & dataset2) & dataset3 + assert isinstance(result_and, DatasetAll) + + +def test_datasetbooleancondition_evaluate_iter(): + """ + Tests _DatasetBooleanCondition's evaluate and iter_datasets methods through DatasetAny and DatasetAll. + Ensures DatasetAny evaluate returns True with any true condition, DatasetAll evaluate returns False if + any condition is false, and both classes correctly iterate over datasets without duplication. 
+ """ + any_condition = DatasetAny(dataset1, dataset2) + all_condition = DatasetAll(dataset1, dataset2) + assert any_condition.evaluate({"s3://bucket1/data1": False, "s3://bucket2/data2": True}) is True + assert all_condition.evaluate({"s3://bucket1/data1": True, "s3://bucket2/data2": False}) is False + + # Testing iter_datasets indirectly through the subclasses + datasets_any = set(any_condition.iter_datasets()) + datasets_all = set(all_condition.iter_datasets()) + assert datasets_any == {("s3://bucket1/data1", dataset1), ("s3://bucket2/data2", dataset2)} + assert datasets_all == {("s3://bucket1/data1", dataset1), ("s3://bucket2/data2", dataset2)} + + @pytest.mark.parametrize( "inputs, scenario, expected", [ @@ -269,3 +318,69 @@ def test_dag_with_complex_dataset_triggers(session, dag_maker): assert isinstance( serialized_dag_dict["dataset_triggers"], dict ), "Serialized 'dataset_triggers' should be a dict" + + +def datasets_equal(d1: BaseDatasetEventInput, d2: BaseDatasetEventInput) -> bool: + if type(d1) != type(d2): + return False + + if isinstance(d1, Dataset) and isinstance(d2, Dataset): + return d1.uri == d2.uri + + elif isinstance(d1, (DatasetAny, DatasetAll)) and isinstance(d2, (DatasetAny, DatasetAll)): + if len(d1.objects) != len(d2.objects): + return False + + # Compare each pair of objects + for obj1, obj2 in zip(d1.objects, d2.objects): + # If obj1 or obj2 is a Dataset, DatasetAny, or DatasetAll instance, + # recursively call datasets_equal + if not datasets_equal(obj1, obj2): + return False + return True + + return False + + +dataset1 = Dataset(uri="s3://bucket1/data1") +dataset2 = Dataset(uri="s3://bucket2/data2") +dataset3 = Dataset(uri="s3://bucket3/data3") +dataset4 = Dataset(uri="s3://bucket4/data4") +dataset5 = Dataset(uri="s3://bucket5/data5") + +test_cases = [ + (lambda: dataset1, dataset1), + (lambda: dataset1 & dataset2, DatasetAll(dataset1, dataset2)), + (lambda: dataset1 | dataset2, DatasetAny(dataset1, dataset2)), + (lambda: dataset1 | (dataset2 & dataset3), DatasetAny(dataset1, DatasetAll(dataset2, dataset3))), + (lambda: dataset1 | dataset2 & dataset3, DatasetAny(dataset1, DatasetAll(dataset2, dataset3))), + ( + lambda: ((dataset1 & dataset2) | dataset3) & (dataset4 | dataset5), + DatasetAll(DatasetAny(DatasetAll(dataset1, dataset2), dataset3), DatasetAny(dataset4, dataset5)), + ), + (lambda: dataset1 & dataset2 | dataset3, DatasetAny(DatasetAll(dataset1, dataset2), dataset3)), + ( + lambda: (dataset1 | dataset2) & (dataset3 | dataset4), + DatasetAll(DatasetAny(dataset1, dataset2), DatasetAny(dataset3, dataset4)), + ), + ( + lambda: (dataset1 & dataset2) | (dataset3 & (dataset4 | dataset5)), + DatasetAny(DatasetAll(dataset1, dataset2), DatasetAll(dataset3, DatasetAny(dataset4, dataset5))), + ), + ( + lambda: (dataset1 & dataset2) & (dataset3 & dataset4), + DatasetAll(dataset1, dataset2, DatasetAll(dataset3, dataset4)), + ), + (lambda: dataset1 | dataset2 | dataset3, DatasetAny(dataset1, dataset2, dataset3)), + (lambda: dataset1 & dataset2 & dataset3, DatasetAll(dataset1, dataset2, dataset3)), + ( + lambda: ((dataset1 & dataset2) | dataset3) & (dataset4 | dataset5), + DatasetAll(DatasetAny(DatasetAll(dataset1, dataset2), dataset3), DatasetAny(dataset4, dataset5)), + ), +] + + +@pytest.mark.parametrize("expression, expected", test_cases) +def test_evaluate_datasets_expression(expression, expected): + expr = expression() + assert datasets_equal(expr, expected) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 
8ed5b7ef99aea..c4431ec0854e5 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2158,7 +2158,10 @@ def test_outlet_datasets(self, create_task_instance): assert session.query(DatasetDagRunQueue.target_dag_id).filter_by( dataset_id=event.dataset.id ).order_by(DatasetDagRunQueue.target_dag_id).all() == [ - ("dataset_and_time_based_timetable",), + ("conditional_dataset_and_time_based_timetable",), + ("consume_1_and_2_with_dataset_expressions",), + ("consume_1_or_2_with_dataset_expressions",), + ("consume_1_or_both_2_and_3_with_dataset_expressions",), ("dataset_consumes_1",), ("dataset_consumes_1_and_2",), ("dataset_consumes_1_never_scheduled",),