From 74793057fc8c12a193da8ac928e30ccece5e2e9a Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Tue, 6 Feb 2024 20:08:48 +0530 Subject: [PATCH 01/11] Implement | and & operators so that they can be used instead of DatasetAll and DatasetAny Implement | and & operators so that they can be used instead of DatasetAll and DatasetAny --- airflow/datasets/__init__.py | 10 ++ airflow/example_dags/example_datasets.py | 77 +++++++++--- airflow/jobs/scheduler_job_runner.py | 114 +++++++++--------- airflow/models/dag.py | 10 +- airflow/models/dataset.py | 104 +++++++++++++++- airflow/serialization/serialized_objects.py | 2 + .../authoring-and-scheduling/datasets.rst | 81 +++++++++++++ .../authoring-and-scheduling/timetable.rst | 26 +++- tests/models/test_datasets.py | 87 +++++++++++++ tests/models/test_taskinstance.py | 5 +- tests/serialization/test_dag_serialization.py | 3 + 11 files changed, 436 insertions(+), 83 deletions(-) create mode 100644 tests/models/test_datasets.py diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index eaa25d0a30c35..c45be56080f58 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -55,3 +55,13 @@ def __eq__(self, other): def __hash__(self): return hash(self.uri) + + def __or__(self, other): + from airflow.models.dataset import DatasetAny + + return DatasetAny(self, other) + + def __and__(self, other): + from airflow.models.dataset import DatasetAll + + return DatasetAll(self, other) diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py index 9dfaaf0c34de0..9b44c7a707889 100644 --- a/airflow/example_dags/example_datasets.py +++ b/airflow/example_dags/example_datasets.py @@ -15,26 +15,40 @@ # specific language governing permissions and limitations # under the License. """ -Example DAG for demonstrating behavior of Datasets feature. +Example DAG for demonstrating the behavior of the Datasets feature in Airflow, including conditional and +dataset expression-based scheduling. Notes on usage: -Turn on all the dags. +Turn on all the DAGs. -DAG dataset_produces_1 should run because it's on a schedule. +dataset_produces_1 is scheduled to run daily. Once it completes, it triggers several DAGs due to its dataset +being updated. dataset_consumes_1 is triggered immediately, as it depends solely on the dataset produced by +dataset_produces_1. consume_1_or_2_with_dataset_expressions will also be triggered, as its condition of +either dataset_produces_1 or dataset_produces_2 being updated is satisfied with dataset_produces_1. -After dataset_produces_1 runs, dataset_consumes_1 should be triggered immediately -because its only dataset dependency is managed by dataset_produces_1. +dataset_consumes_1_and_2 will not be triggered after dataset_produces_1 runs because it requires the dataset +from dataset_produces_2, which has no schedule and must be manually triggered. -No other dags should be triggered. Note that even though dataset_consumes_1_and_2 depends on -the dataset in dataset_produces_1, it will not be triggered until dataset_produces_2 runs -(and dataset_produces_2 is left with no schedule so that we can trigger it manually). +After manually triggering dataset_produces_2, several DAGs will be affected. dataset_consumes_1_and_2 should +run because both its dataset dependencies are now met. consume_1_and_2_with_dataset_expressions will be +triggered, as it requires both dataset_produces_1 and dataset_produces_2 datasets to be updated. 
+consume_1_or_2_with_dataset_expressions will be triggered again, since it's conditionally set to run when +either dataset is updated. -Next, trigger dataset_produces_2. After dataset_produces_2 finishes, -dataset_consumes_1_and_2 should run. +consume_1_or_both_2_and_3_with_dataset_expressions demonstrates complex dataset dependency logic. +This DAG triggers if dataset_produces_1 is updated or if both dataset_produces_2 and dag3_dataset +are updated. This example highlights the capability to combine updates from multiple datasets with logical +expressions for advanced scheduling. -Dags dataset_consumes_1_never_scheduled and dataset_consumes_unknown_never_scheduled should not run because -they depend on datasets that never get updated. +conditional_dataset_and_time_based_timetable illustrates the integration of time-based scheduling with +dataset dependencies. This DAG is configured to execute either when both dataset_produces_1 and +dataset_produces_2 datasets have been updated or according to a specific cron schedule, showcasing +Airflow's versatility in handling mixed triggers for dataset and time-based scheduling. + + +The DAGs dataset_consumes_1_never_scheduled and dataset_consumes_unknown_never_scheduled will not run +automatically as they depend on datasets that do not get updated or are not produced by any scheduled tasks. """ from __future__ import annotations @@ -50,6 +64,7 @@ dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"}) # [END dataset_def] dag2_dataset = Dataset("s3://dag2/output_1.txt", extra={"hi": "bye"}) +dag3_dataset = Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"}) with DAG( dag_id="dataset_produces_1", @@ -132,16 +147,46 @@ ) with DAG( - dag_id="dataset_and_time_based_timetable", + dag_id="consume_1_and_2_with_dataset_expressions", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule=(dag1_dataset & dag2_dataset), +) as dag5: + BashOperator( + outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], + task_id="consume_1_and_2_with_dataset_expressions", + bash_command="sleep 5", + ) +with DAG( + dag_id="consume_1_or_2_with_dataset_expressions", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule=(dag1_dataset | dag2_dataset), +) as dag6: + BashOperator( + outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], + task_id="consume_1_or_2_with_dataset_expressions", + bash_command="sleep 5", + ) +with DAG( + dag_id="consume_1_or_both_2_and_3_with_dataset_expressions", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)), +) as dag7: + BashOperator( + outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], + task_id="consume_1_or_both_2_and_3_with_dataset_expressions", + bash_command="sleep 5", + ) +with DAG( + dag_id="conditional_dataset_and_time_based_timetable", catchup=False, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=DatasetOrTimeSchedule( - timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=[dag1_dataset] + timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset) ), tags=["dataset-time-based-timetable"], -) as dag7: +) as dag8: BashOperator( outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")], - task_id="consuming_dataset_time_based", + task_id="conditional_dataset_and_time_based_timetable", bash_command="sleep 5", ) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 
32cc9f5a634ab..5dd2a65e27be8 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -31,7 +31,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator -from sqlalchemy import and_, delete, func, not_, or_, select, text, update +from sqlalchemy import and_, delete, exists, func, literal_column, not_, or_, select, text, update from sqlalchemy.exc import OperationalError from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload from sqlalchemy.sql import expression @@ -1215,13 +1215,6 @@ def _create_dag_runs_dataset_triggered( dag_id: timezone.coerce_datetime(last_time) for dag_id, (_, last_time) in dataset_triggered_dag_info.items() } - existing_dagruns: set[tuple[str, timezone.DateTime]] = set( - session.execute( - select(DagRun.dag_id, DagRun.execution_date).where( - tuple_in_condition((DagRun.dag_id, DagRun.execution_date), exec_dates.items()) - ) - ) - ) for dag_model in dag_models: dag = self.dagbag.get_dag(dag_model.dag_id, session=session) @@ -1247,59 +1240,68 @@ def _create_dag_runs_dataset_triggered( # create a new one. This is so that in the next Scheduling loop we try to create new runs # instead of falling in a loop of Integrity Error. exec_date = exec_dates[dag.dag_id] - if (dag.dag_id, exec_date) not in existing_dagruns: - previous_dag_run = session.scalar( - select(DagRun) - .where( - DagRun.dag_id == dag.dag_id, - DagRun.execution_date < exec_date, - DagRun.run_type == DagRunType.DATASET_TRIGGERED, - ) - .order_by(DagRun.execution_date.desc()) + curr_date_query = select( + exists( + select(literal_column("1")) + .where(DagRun.dag_id == dag.dag_id, DagRun.execution_date == exec_date) .limit(1) ) - dataset_event_filters = [ - DagScheduleDatasetReference.dag_id == dag.dag_id, - DatasetEvent.timestamp <= exec_date, - ] - if previous_dag_run: - dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date) - - dataset_events = session.scalars( - select(DatasetEvent) - .join( - DagScheduleDatasetReference, - DatasetEvent.dataset_id == DagScheduleDatasetReference.dataset_id, - ) - .join(DatasetEvent.source_dag_run) - .where(*dataset_event_filters) - ).all() - - data_interval = dag.timetable.data_interval_for_events(exec_date, dataset_events) - run_id = dag.timetable.generate_run_id( - run_type=DagRunType.DATASET_TRIGGERED, - logical_date=exec_date, - data_interval=data_interval, - session=session, - events=dataset_events, - ) + ) + if session.scalar(curr_date_query): # dag already exists + continue - dag_run = dag.create_dagrun( - run_id=run_id, - run_type=DagRunType.DATASET_TRIGGERED, - execution_date=exec_date, - data_interval=data_interval, - state=DagRunState.QUEUED, - external_trigger=False, - session=session, - dag_hash=dag_hash, - creating_job_id=self.job.id, + prev_exec_date = session.scalar( + select(DagRun.execution_date) + .where( + DagRun.dag_id == dag.dag_id, + DagRun.execution_date < exec_date, + DagRun.run_type == DagRunType.DATASET_TRIGGERED, ) - Stats.incr("dataset.triggered_dagruns") - dag_run.consumed_dataset_events.extend(dataset_events) - session.execute( - delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id) + .order_by(DagRun.execution_date.desc()) + .limit(1) + ) + dataset_event_filters = [ + DagScheduleDatasetReference.dag_id == dag.dag_id, + DatasetEvent.timestamp <= exec_date, + ] + if prev_exec_date: + dataset_event_filters.append(DatasetEvent.timestamp > prev_exec_date) + + dataset_events 
= session.scalars( + select(DatasetEvent) + .join( + DagScheduleDatasetReference, + DatasetEvent.dataset_id == DagScheduleDatasetReference.dataset_id, ) + .join(DatasetEvent.source_dag_run) + .where(*dataset_event_filters) + ).all() + + data_interval = dag.timetable.data_interval_for_events(exec_date, dataset_events) + run_id = dag.timetable.generate_run_id( + run_type=DagRunType.DATASET_TRIGGERED, + logical_date=exec_date, + data_interval=data_interval, + session=session, + events=dataset_events, + ) + + dag_run = dag.create_dagrun( + run_id=run_id, + run_type=DagRunType.DATASET_TRIGGERED, + execution_date=exec_date, + data_interval=data_interval, + state=DagRunState.QUEUED, + external_trigger=False, + session=session, + dag_hash=dag_hash, + creating_job_id=self.job.id, + ) + Stats.incr("dataset.triggered_dagruns") + dag_run.consumed_dataset_events.extend(dataset_events) + session.execute( + delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id) + ) def _should_update_dag_next_dagruns( self, diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 237759010ac22..62a7af7a7c195 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -104,6 +104,8 @@ DatasetBooleanCondition, DatasetDagRunQueue, DatasetModel, + DatasetsExpression, + extract_datasets, ) from airflow.models.param import DagParam, ParamsDict from airflow.models.taskinstance import ( @@ -174,7 +176,7 @@ # but Mypy cannot handle that right now. Track progress of PEP 661 for progress. # See also: https://discuss.python.org/t/9126/7 ScheduleIntervalArg = Union[ArgNotSet, ScheduleInterval] -ScheduleArg = Union[ArgNotSet, ScheduleInterval, Timetable, Collection["Dataset"]] +ScheduleArg = Union[ArgNotSet, ScheduleInterval, Timetable, Collection["Dataset"], DatasetsExpression] SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None] @@ -587,7 +589,9 @@ def __init__( self.timetable: Timetable self.schedule_interval: ScheduleInterval self.dataset_triggers: DatasetBooleanCondition | None = None - if isinstance(schedule, (DatasetAll, DatasetAny)): + if isinstance(schedule, DatasetsExpression): + self.dataset_triggers = extract_datasets(dataset_expression=schedule) + elif isinstance(schedule, (DatasetAll, DatasetAny)): self.dataset_triggers = schedule if isinstance(schedule, Collection) and not isinstance(schedule, str): from airflow.datasets import Dataset @@ -597,7 +601,7 @@ def __init__( self.dataset_triggers = DatasetAll(*schedule) elif isinstance(schedule, Timetable): timetable = schedule - elif schedule is not NOTSET: + elif schedule is not NOTSET and not isinstance(schedule, DatasetsExpression): schedule_interval = schedule if isinstance(schedule, DatasetOrTimeSchedule): diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index bf28777358786..cf734d024a0ae 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -341,9 +341,9 @@ def __repr__(self) -> str: class DatasetBooleanCondition: """ - Base class for boolean logic for dataset triggers. + Base class for boolean conditions on datasets. This class is intended for internal use only. - :meta private: + :param objects: A variable number of Dataset, DatasetAny, or DatasetAll instances. """ agg_func: Callable[[Iterable], bool] @@ -352,14 +352,26 @@ def __init__(self, *objects) -> None: self.objects = objects def evaluate(self, statuses: dict[str, bool]) -> bool: + """ + Evaluates the boolean condition based on the statuses of datasets. 
+ + :param statuses: A dictionary mapping dataset URIs to their boolean statuses. + """ return self.agg_func(self.eval_one(x, statuses) for x in self.objects) def eval_one(self, obj: Dataset | DatasetAny | DatasetAll, statuses) -> bool: + """ + Evaluates the status of a single object (Dataset, DatasetAny, or DatasetAll). + + :param obj: The Dataset, DatasetAny, or DatasetAll instance to evaluate. + :param statuses: A dictionary mapping dataset URIs to their boolean statuses. + """ if isinstance(obj, Dataset): return statuses.get(obj.uri, False) return obj.evaluate(statuses=statuses) def all_datasets(self) -> dict[str, Dataset]: + """Retrieves all unique datasets contained within the boolean condition.""" uris = {} for x in self.objects: if isinstance(x, Dataset): @@ -374,12 +386,96 @@ def all_datasets(self) -> dict[str, Dataset]: class DatasetAny(DatasetBooleanCondition): - """Use to combine datasets schedule references in an "and" relationship.""" + """ + Represents a logical OR condition of datasets. + + Inherits from DatasetBooleanCondition. + """ agg_func = any + def __init__(self, *objects: Dataset | DatasetAny | DatasetAll): + """Initialize with one or more Dataset, DatasetAny, or DatasetAll instances.""" + super().__init__(*objects) + + def __or__(self, other): + if isinstance(other, (Dataset, DatasetAny, DatasetAll)): + return DatasetAny(*self.objects, other) + return NotImplemented + + def __and__(self, other): + if isinstance(other, (Dataset, DatasetAny, DatasetAll)): + return DatasetAll(self, other) + return NotImplemented + + def __repr__(self) -> str: + return f"DatasetAny({', '.join(map(str, self.objects))})" + class DatasetAll(DatasetBooleanCondition): - """Use to combine datasets schedule references in an "or" relationship.""" + """Represents a logical AND condition of datasets. Inherits from DatasetBooleanCondition.""" agg_func = all + + def __init__(self, *objects: Dataset | DatasetAny | DatasetAll): + """Initialize with one or more Dataset, DatasetAny, or DatasetAll instances.""" + super().__init__(*objects) + + def __or__(self, other): + if isinstance(other, (Dataset, DatasetAny, DatasetAll)): + return DatasetAny(self, other) + return NotImplemented + + def __and__(self, other): + if isinstance(other, (Dataset, DatasetAny, DatasetAll)): + return DatasetAll(*self.objects, other) + return NotImplemented + + def __repr__(self) -> str: + return f"DatasetAnd({', '.join(map(str, self.objects))})" + + +class DatasetsExpression: + """ + Represents a node in an expression tree for dataset conditions. + + :param value: The value of the node, which can be a 'Dataset', '&', or '|'. + :param left: The left child node. + :param right: The right child node. + """ + + def __init__(self, value, left=None, right=None): + self.value = value # value can be 'Dataset', '&', or '|' + self.left = left + self.right = right + + def __or__(self, other: Dataset | DatasetsExpression) -> DatasetsExpression: + return DatasetsExpression("|", self, other) + + def __and__(self, other: Dataset | DatasetsExpression) -> DatasetsExpression: + return DatasetsExpression("&", self, other) + + def __repr__(self): + if isinstance(self.value, Dataset): + return f"Dataset(uri='{self.value.uri}')" + elif self.value == "&": + return repr(DatasetAll(self.left, self.right)) + elif self.value == "|": + return repr(DatasetAny(self.left, self.right)) + + +def extract_datasets(dataset_expression: DatasetsExpression | Dataset): + """ + Extracts the dataset(s) from an DatasetsExpression. 
+ + :param dataset_expression: The DatasetsExpression to extract from. + """ + if isinstance(dataset_expression, DatasetsExpression): + if dataset_expression.value == "&": + return DatasetAll(dataset_expression.left, dataset_expression.right) + elif dataset_expression.value == "|": + return DatasetAny(dataset_expression.left, dataset_expression.right) + else: + raise ValueError("Invalid Expression node value") + else: + return dataset_expression diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 5e6073233e273..7126d5e0ac740 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -407,6 +407,8 @@ def serialize_to_json( serialized_object[key] = encode_timetable(value) elif key == "dataset_triggers": serialized_object[key] = cls.serialize(value) + elif key == "dataset_triggers": + serialized_object[key] = cls.serialize(value) else: value = cls.serialize(value) if isinstance(value, dict) and Encoding.TYPE in value: diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst index 2456503649476..c642ec63dfae6 100644 --- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst +++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst @@ -237,6 +237,85 @@ Example: Note that this example is using `(.values() | first | first) `_ to fetch the first of one Dataset given to the DAG, and the first of one DatasetEvent for that Dataset. An implementation may be quite complex if you have multiple Datasets, potentially with multiple DatasetEvents. +Advanced Dataset Scheduling with Conditional Expressions +-------------------------------------------------------- + +Apache Airflow introduces advanced scheduling capabilities that leverage conditional expressions with datasets. This feature allows Airflow users to define complex dependencies for DAG executions based on dataset updates, using logical operators for more granular control over workflow triggers. + +Logical Operators for Datasets +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Airflow supports two logical operators for combining dataset conditions: + +- **AND (``&``)**: Specifies that the DAG should be triggered only after all of the specified datasets have been updated. +- **OR (``|``)**: Specifies that the DAG should be triggered when any one of the specified datasets is updated. + +These operators enable the expression of complex dataset update conditions, enhancing the dynamism and flexibility of Airflow workflows. + +Example Usage +------------- + +**Scheduling Based on Multiple Dataset Updates** + +To schedule a DAG to run only when two specific datasets have both been updated, use the AND operator (``&``): + +.. code-block:: python + + from airflow.models import DAG + from airflow.operators.bash import BashOperator + from airflow.datasets import Dataset + import pendulum + + dag1_dataset = Dataset("s3://dag1/output_1.txt") + dag2_dataset = Dataset("s3://dag2/output_1.txt") + + with DAG( + dag_id="consume_1_and_2_with_dataset_expressions", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule=(dag1_dataset & dag2_dataset), + ) as dag: + BashOperator( + task_id="consume_1_and_2_with_dataset_expressions", + bash_command="sleep 5", + outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], + ) + +**Scheduling Based on Any Dataset Update** + +To trigger a DAG execution when either of two datasets is updated, apply the OR operator (``|``): + +.. 
code-block:: python + + with DAG( + dag_id="consume_1_or_2_with_dataset_expressions", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule=(dag1_dataset | dag2_dataset), + ) as dag: + BashOperator( + task_id="consume_1_or_2_with_dataset_expressions", + bash_command="sleep 5", + outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], + ) + +**Complex Conditional Logic** + +For scenarios requiring more intricate conditions, such as triggering a DAG when one dataset is updated or when both of two other datasets are updated, combine the OR and AND operators: + +.. code-block:: python + + dag3_dataset = Dataset("s3://dag3/output_3.txt") + + with DAG( + dag_id="consume_1_or_both_2_and_3_with_dataset_expressions", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)), + ) as dag: + BashOperator( + task_id="consume_1_or_both_2_and_3_with_dataset_expressions", + bash_command="sleep 5", + outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], + ) + Combining Dataset and Time-Based Schedules ------------------------------------------ @@ -245,3 +324,5 @@ DatasetTimetable Integration With the introduction of ``DatasetTimetable``, it is now possible to schedule DAGs based on both dataset events and time-based schedules. This feature offers flexibility for scenarios where a DAG needs to be triggered by data updates as well as run periodically according to a fixed timetable. For more detailed information on ``DatasetTimetable`` and its usage, refer to the corresponding section in :ref:`DatasetTimetable `. + +These examples illustrate how Airflow's conditional dataset expressions can be used to create complex, data-dependent scheduling scenarios, providing precise control over when DAGs are triggered in response to data updates. diff --git a/docs/apache-airflow/authoring-and-scheduling/timetable.rst b/docs/apache-airflow/authoring-and-scheduling/timetable.rst index c33cd075ab886..628d355c98665 100644 --- a/docs/apache-airflow/authoring-and-scheduling/timetable.rst +++ b/docs/apache-airflow/authoring-and-scheduling/timetable.rst @@ -212,9 +212,29 @@ Here's an example of a DAG using ``DatasetTimetable``: In this example, the DAG is scheduled to run every Wednesday at 01:00 UTC based on the ``CronTriggerTimetable``, and it is also triggered by updates to ``dag1_dataset``. -Future Enhancements -~~~~~~~~~~~~~~~~~~~ -Future iterations may introduce more complex combinations for scheduling (e.g., dataset1 OR dataset2 OR timetable), further enhancing the flexibility for scheduling DAGs in various scenarios. +Integrate conditional dataset with Time-Based Scheduling +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Combining conditional dataset expressions with time-based schedules enhances scheduling flexibility: + +.. 
code-block:: python + + from airflow.timetables import DatasetOrTimeSchedule + from airflow.timetables.trigger import CronTriggerTimetable + + with DAG( + dag_id="conditional_dataset_and_time_based_timetable", + catchup=False, + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule=DatasetOrTimeSchedule( + timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset) + ), + tags=["dataset-time-based-timetable"], + ) as dag: + BashOperator( + task_id="conditional_dataset_and_time_based_timetable", + bash_command="sleep 5", + outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")], + ) Timetables comparisons diff --git a/tests/models/test_datasets.py b/tests/models/test_datasets.py new file mode 100644 index 0000000000000..835de9be691e5 --- /dev/null +++ b/tests/models/test_datasets.py @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.models.dataset import Dataset, DatasetAll, DatasetAny, extract_datasets + + +def datasets_equal(d1, d2): + if type(d1) != type(d2): + return False + + if isinstance(d1, Dataset): + return d1.uri == d2.uri + + elif isinstance(d1, (DatasetAny, DatasetAll)): + if len(d1.objects) != len(d2.objects): + return False + + # Compare each pair of objects + for obj1, obj2 in zip(d1.objects, d2.objects): + # If obj1 or obj2 is a Dataset, DatasetAny, or DatasetAll instance, + # recursively call datasets_equal + if not datasets_equal(obj1, obj2): + return False + return True + + return False + + +dataset1 = Dataset(uri="s3://bucket1/data1") +dataset2 = Dataset(uri="s3://bucket2/data2") +dataset3 = Dataset(uri="s3://bucket3/data3") +dataset4 = Dataset(uri="s3://bucket4/data4") +dataset5 = Dataset(uri="s3://bucket5/data5") + +test_cases = [ + (lambda: dataset1, dataset1), + (lambda: dataset1 & dataset2, DatasetAll(dataset1, dataset2)), + (lambda: dataset1 | dataset2, DatasetAny(dataset1, dataset2)), + (lambda: dataset1 | (dataset2 & dataset3), DatasetAny(dataset1, DatasetAll(dataset2, dataset3))), + (lambda: dataset1 | dataset2 & dataset3, DatasetAny(dataset1, DatasetAll(dataset2, dataset3))), + ( + lambda: ((dataset1 & dataset2) | dataset3) & (dataset4 | dataset5), + DatasetAll(DatasetAny(DatasetAll(dataset1, dataset2), dataset3), DatasetAny(dataset4, dataset5)), + ), + (lambda: dataset1 & dataset2 | dataset3, DatasetAny(DatasetAll(dataset1, dataset2), dataset3)), + ( + lambda: (dataset1 | dataset2) & (dataset3 | dataset4), + DatasetAll(DatasetAny(dataset1, dataset2), DatasetAny(dataset3, dataset4)), + ), + ( + lambda: (dataset1 & dataset2) | (dataset3 & (dataset4 | dataset5)), + DatasetAny(DatasetAll(dataset1, dataset2), DatasetAll(dataset3, DatasetAny(dataset4, dataset5))), + ), + ( + lambda: 
(dataset1 & dataset2) & (dataset3 & dataset4), + DatasetAll(dataset1, dataset2, DatasetAll(dataset3, dataset4)), + ), + (lambda: dataset1 | dataset2 | dataset3, DatasetAny(dataset1, dataset2, dataset3)), + ( + lambda: ((dataset1 & dataset2) | dataset3) & (dataset4 | dataset5), + DatasetAll(DatasetAny(DatasetAll(dataset1, dataset2), dataset3), DatasetAny(dataset4, dataset5)), + ), +] + + +@pytest.mark.parametrize("expression, expected", test_cases) +def test_extract_datasets(expression, expected): + expr = expression() + result = extract_datasets(expr) + assert datasets_equal(result, expected) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 8f5536cab624f..e88937763970c 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2157,7 +2157,10 @@ def test_outlet_datasets(self, create_task_instance): assert session.query(DatasetDagRunQueue.target_dag_id).filter_by( dataset_id=event.dataset.id ).order_by(DatasetDagRunQueue.target_dag_id).all() == [ - ("dataset_and_time_based_timetable",), + ("conditional_dataset_and_time_based_timetable",), + ("consume_1_and_2_with_dataset_expressions",), + ("consume_1_or_2_with_dataset_expressions",), + ("consume_1_or_both_2_and_3_with_dataset_expressions",), ("dataset_consumes_1",), ("dataset_consumes_1_and_2",), ("dataset_consumes_1_never_scheduled",), diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 2adc956b6f4d0..b62b69cb5a80d 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -554,6 +554,9 @@ def validate_deserialized_dag(self, serialized_dag, dag): compare_serialization_list = { "dataset_triggers", } + compare_serialization_list = { + "dataset_triggers", + } fields_to_check = dag.get_serialized_fields() - exclusion_list for field in fields_to_check: actual = getattr(serialized_dag, field) From d878038b437b401a153335609c4b170ef07bdf38 Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Thu, 22 Feb 2024 14:29:52 +0530 Subject: [PATCH 02/11] Refactor dataset class inheritance (#37590) * Refactor DatasetAll and DatasetAny inheritance They are moved from airflow.models.datasets to airflow.datasets since the intention is to use them with Dataset, not DatasetModel. It is more natural for users to import from the latter module instead. A new (abstract) base class is added for the two classes, plus the OG Dataset class, to inherit from. This allows us to replace a few isinstance checks with simple molymorphism and make the logic a bit simpler. --------- Co-authored-by: Tzu-ping Chung Co-authored-by: Wei Lee --- airflow/datasets/__init__.py | 64 ++++++++- airflow/models/dag.py | 26 ++-- airflow/models/dataset.py | 143 -------------------- airflow/serialization/serialized_objects.py | 5 +- airflow/timetables/datasets.py | 13 +- tests/datasets/test_dataset.py | 4 +- 6 files changed, 85 insertions(+), 170 deletions(-) diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index c45be56080f58..2852824712f99 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -14,18 +14,35 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
+ from __future__ import annotations import os -from typing import Any, ClassVar +from typing import Any, Callable, ClassVar, Iterable, Iterator, Protocol, runtime_checkable from urllib.parse import urlsplit import attr +__all__ = ["Dataset", "DatasetAll", "DatasetAny"] + + +@runtime_checkable +class BaseDatasetEventInput(Protocol): + """Protocol for all dataset triggers to use in ``DAG(schedule=...)``. + + :meta private: + """ + + def evaluate(self, statuses: dict[str, bool]) -> bool: + raise NotImplementedError + + def iter_datasets(self) -> Iterator[tuple[str, Dataset]]: + raise NotImplementedError + @attr.define() -class Dataset(os.PathLike): - """A Dataset is used for marking data dependencies between workflows.""" +class Dataset(os.PathLike, BaseDatasetEventInput): + """A representation of data dependencies between workflows.""" uri: str = attr.field(validator=[attr.validators.min_len(1), attr.validators.max_len(3000)]) extra: dict[str, Any] | None = None @@ -44,7 +61,7 @@ def _check_uri(self, attr, uri: str): if parsed.scheme and parsed.scheme.lower() == "airflow": raise ValueError(f"{attr.name!r} scheme `airflow` is reserved") - def __fspath__(self): + def __fspath__(self) -> str: return self.uri def __eq__(self, other): @@ -65,3 +82,42 @@ def __and__(self, other): from airflow.models.dataset import DatasetAll return DatasetAll(self, other) + + def iter_datasets(self) -> Iterator[tuple[str, Dataset]]: + yield self.uri, self + + def evaluate(self, statuses: dict[str, bool]) -> bool: + return statuses.get(self.uri, False) + + +class _DatasetBooleanCondition(BaseDatasetEventInput): + """Base class for dataset boolean logic.""" + + agg_func: Callable[[Iterable], bool] + + def __init__(self, *objects: BaseDatasetEventInput) -> None: + self.objects = objects + + def evaluate(self, statuses: dict[str, bool]) -> bool: + return self.agg_func(x.evaluate(statuses=statuses) for x in self.objects) + + def iter_datasets(self) -> Iterator[tuple[str, Dataset]]: + seen = set() # We want to keep the first instance. 
+ for o in self.objects: + for k, v in o.iter_datasets(): + if k in seen: + continue + yield k, v + seen.add(k) + + +class DatasetAny(_DatasetBooleanCondition): + """Use to combine datasets schedule references in an "and" relationship.""" + + agg_func = any + + +class DatasetAll(_DatasetBooleanCondition): + """Use to combine datasets schedule references in an "or" relationship.""" + + agg_func = all diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 62a7af7a7c195..82da83b8c180b 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -80,6 +80,7 @@ from airflow import settings, utils from airflow.api_internal.internal_api_call import internal_api_call from airflow.configuration import conf as airflow_conf, secrets_backend_list +from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll from airflow.datasets.manager import dataset_manager from airflow.exceptions import ( AirflowDagInconsistent, @@ -98,16 +99,16 @@ from airflow.models.dagcode import DagCode from airflow.models.dagpickle import DagPickle from airflow.models.dagrun import RUN_ID_REGEX, DagRun -from airflow.models.dataset import ( +from airflow.models.dataset import DatasetDagRunQueue, DatasetModel +from airflow.models.param import DagParam, ParamsDict +from airflow.datasets import ( + BaseDatasetEventInput, + Dataset, DatasetAll, DatasetAny, - DatasetBooleanCondition, - DatasetDagRunQueue, - DatasetModel, DatasetsExpression, extract_datasets, ) -from airflow.models.param import DagParam, ParamsDict from airflow.models.taskinstance import ( Context, TaskInstance, @@ -152,7 +153,6 @@ from sqlalchemy.orm.query import Query from sqlalchemy.orm.session import Session - from airflow.datasets import Dataset from airflow.decorators import TaskDecoratorCollection from airflow.models.dagbag import DagBag from airflow.models.operator import Operator @@ -176,7 +176,7 @@ # but Mypy cannot handle that right now. Track progress of PEP 661 for progress. 
# See also: https://discuss.python.org/t/9126/7 ScheduleIntervalArg = Union[ArgNotSet, ScheduleInterval] -ScheduleArg = Union[ArgNotSet, ScheduleInterval, Timetable, Collection["Dataset"], DatasetsExpression] +ScheduleArg = Union[ArgNotSet, ScheduleInterval, Timetable, BaseDatasetEventInput, Collection["Dataset"], DatasetsExpression] SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None] @@ -588,14 +588,12 @@ def __init__( self.timetable: Timetable self.schedule_interval: ScheduleInterval - self.dataset_triggers: DatasetBooleanCondition | None = None + self.dataset_triggers: BaseDatasetEventInput | None = None if isinstance(schedule, DatasetsExpression): self.dataset_triggers = extract_datasets(dataset_expression=schedule) elif isinstance(schedule, (DatasetAll, DatasetAny)): self.dataset_triggers = schedule - if isinstance(schedule, Collection) and not isinstance(schedule, str): - from airflow.datasets import Dataset - + elif isinstance(schedule, Collection) and not isinstance(schedule, str): if not all(isinstance(x, Dataset) for x in schedule): raise ValueError("All elements in 'schedule' should be datasets") self.dataset_triggers = DatasetAll(*schedule) @@ -3185,7 +3183,7 @@ def bulk_write_to_db( if curr_orm_dag and curr_orm_dag.schedule_dataset_references: curr_orm_dag.schedule_dataset_references = [] else: - for dataset in dag.dataset_triggers.all_datasets().values(): + for _, dataset in dag.dataset_triggers.iter_datasets(): dag_references[dag.dag_id].add(dataset.uri) input_datasets[DatasetModel.from_public(dataset)] = None curr_outlet_references = curr_orm_dag and curr_orm_dag.task_outlet_dataset_references @@ -3797,14 +3795,14 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[ """ from airflow.models.serialized_dag import SerializedDagModel - def dag_ready(dag_id: str, cond: DatasetBooleanCondition, statuses: dict) -> bool | None: + def dag_ready(dag_id: str, cond: BaseDatasetEventInput, statuses: dict) -> bool | None: # if dag was serialized before 2.9 and we *just* upgraded, # we may be dealing with old version. In that case, # just wait for the dag to be reserialized. try: return cond.evaluate(statuses) except AttributeError: - log.warning("dag '%s' has old serialization; skipping dag run creation.", dag_id) + log.warning("dag '%s' has old serialization; skipping DAG run creation.", dag_id) return None # this loads all the DDRQ records.... may need to limit num dags diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index cf734d024a0ae..aa10eb3809756 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -17,7 +17,6 @@ # under the License. from __future__ import annotations -from typing import Callable, Iterable from urllib.parse import urlsplit import sqlalchemy_jsonfield @@ -337,145 +336,3 @@ def __repr__(self) -> str: ]: args.append(f"{attr}={getattr(self, attr)!r}") return f"{self.__class__.__name__}({', '.join(args)})" - - -class DatasetBooleanCondition: - """ - Base class for boolean conditions on datasets. This class is intended for internal use only. - - :param objects: A variable number of Dataset, DatasetAny, or DatasetAll instances. - """ - - agg_func: Callable[[Iterable], bool] - - def __init__(self, *objects) -> None: - self.objects = objects - - def evaluate(self, statuses: dict[str, bool]) -> bool: - """ - Evaluates the boolean condition based on the statuses of datasets. - - :param statuses: A dictionary mapping dataset URIs to their boolean statuses. 
- """ - return self.agg_func(self.eval_one(x, statuses) for x in self.objects) - - def eval_one(self, obj: Dataset | DatasetAny | DatasetAll, statuses) -> bool: - """ - Evaluates the status of a single object (Dataset, DatasetAny, or DatasetAll). - - :param obj: The Dataset, DatasetAny, or DatasetAll instance to evaluate. - :param statuses: A dictionary mapping dataset URIs to their boolean statuses. - """ - if isinstance(obj, Dataset): - return statuses.get(obj.uri, False) - return obj.evaluate(statuses=statuses) - - def all_datasets(self) -> dict[str, Dataset]: - """Retrieves all unique datasets contained within the boolean condition.""" - uris = {} - for x in self.objects: - if isinstance(x, Dataset): - if x.uri not in uris: - uris[x.uri] = x - else: - # keep the first instance - for k, v in x.all_datasets().items(): - if k not in uris: - uris[k] = v - return uris - - -class DatasetAny(DatasetBooleanCondition): - """ - Represents a logical OR condition of datasets. - - Inherits from DatasetBooleanCondition. - """ - - agg_func = any - - def __init__(self, *objects: Dataset | DatasetAny | DatasetAll): - """Initialize with one or more Dataset, DatasetAny, or DatasetAll instances.""" - super().__init__(*objects) - - def __or__(self, other): - if isinstance(other, (Dataset, DatasetAny, DatasetAll)): - return DatasetAny(*self.objects, other) - return NotImplemented - - def __and__(self, other): - if isinstance(other, (Dataset, DatasetAny, DatasetAll)): - return DatasetAll(self, other) - return NotImplemented - - def __repr__(self) -> str: - return f"DatasetAny({', '.join(map(str, self.objects))})" - - -class DatasetAll(DatasetBooleanCondition): - """Represents a logical AND condition of datasets. Inherits from DatasetBooleanCondition.""" - - agg_func = all - - def __init__(self, *objects: Dataset | DatasetAny | DatasetAll): - """Initialize with one or more Dataset, DatasetAny, or DatasetAll instances.""" - super().__init__(*objects) - - def __or__(self, other): - if isinstance(other, (Dataset, DatasetAny, DatasetAll)): - return DatasetAny(self, other) - return NotImplemented - - def __and__(self, other): - if isinstance(other, (Dataset, DatasetAny, DatasetAll)): - return DatasetAll(*self.objects, other) - return NotImplemented - - def __repr__(self) -> str: - return f"DatasetAnd({', '.join(map(str, self.objects))})" - - -class DatasetsExpression: - """ - Represents a node in an expression tree for dataset conditions. - - :param value: The value of the node, which can be a 'Dataset', '&', or '|'. - :param left: The left child node. - :param right: The right child node. - """ - - def __init__(self, value, left=None, right=None): - self.value = value # value can be 'Dataset', '&', or '|' - self.left = left - self.right = right - - def __or__(self, other: Dataset | DatasetsExpression) -> DatasetsExpression: - return DatasetsExpression("|", self, other) - - def __and__(self, other: Dataset | DatasetsExpression) -> DatasetsExpression: - return DatasetsExpression("&", self, other) - - def __repr__(self): - if isinstance(self.value, Dataset): - return f"Dataset(uri='{self.value.uri}')" - elif self.value == "&": - return repr(DatasetAll(self.left, self.right)) - elif self.value == "|": - return repr(DatasetAny(self.left, self.right)) - - -def extract_datasets(dataset_expression: DatasetsExpression | Dataset): - """ - Extracts the dataset(s) from an DatasetsExpression. - - :param dataset_expression: The DatasetsExpression to extract from. 
- """ - if isinstance(dataset_expression, DatasetsExpression): - if dataset_expression.value == "&": - return DatasetAll(dataset_expression.left, dataset_expression.right) - elif dataset_expression.value == "|": - return DatasetAny(dataset_expression.left, dataset_expression.right) - else: - raise ValueError("Invalid Expression node value") - else: - return dataset_expression diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 7126d5e0ac740..0fc49e9b98dbc 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -35,14 +35,13 @@ from airflow.compat.functools import cache from airflow.configuration import conf -from airflow.datasets import Dataset +from airflow.datasets import Dataset, DatasetAll, DatasetAny from airflow.exceptions import AirflowException, RemovedInAirflow3Warning, SerializationError from airflow.jobs.job import Job from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection from airflow.models.dag import DAG, DagModel, create_timetable from airflow.models.dagrun import DagRun -from airflow.models.dataset import DatasetAll, DatasetAny from airflow.models.expandinput import EXPAND_INPUT_EMPTY, create_expand_input, get_map_type_key from airflow.models.mappedoperator import MappedOperator from airflow.models.param import Param, ParamsDict @@ -790,7 +789,7 @@ def detect_dag_dependencies(dag: DAG | None) -> Iterable[DagDependency]: return if not dag.dataset_triggers: return - for uri in dag.dataset_triggers.all_datasets().keys(): + for uri, _ in dag.dataset_triggers.iter_datasets(): yield DagDependency( source="dataset", target=dag.dag_id, diff --git a/airflow/timetables/datasets.py b/airflow/timetables/datasets.py index c755df964ee4d..dcc0652929285 100644 --- a/airflow/timetables/datasets.py +++ b/airflow/timetables/datasets.py @@ -19,8 +19,8 @@ import typing +from airflow.datasets import BaseDatasetEventInput, DatasetAll from airflow.exceptions import AirflowTimetableInvalid -from airflow.models.dataset import DatasetAll, DatasetBooleanCondition from airflow.timetables.simple import DatasetTriggeredTimetable as DatasetTriggeredSchedule from airflow.utils.types import DagRunType @@ -36,9 +36,14 @@ class DatasetOrTimeSchedule(DatasetTriggeredSchedule): """Combine time-based scheduling with event-based scheduling.""" - def __init__(self, timetable: Timetable, datasets: Collection[Dataset] | DatasetBooleanCondition) -> None: + def __init__( + self, + *, + timetable: Timetable, + datasets: Collection[Dataset] | BaseDatasetEventInput, + ) -> None: self.timetable = timetable - if isinstance(datasets, DatasetBooleanCondition): + if isinstance(datasets, BaseDatasetEventInput): self.datasets = datasets else: self.datasets = DatasetAll(*datasets) @@ -70,7 +75,7 @@ def serialize(self) -> dict[str, typing.Any]: def validate(self) -> None: if isinstance(self.timetable, DatasetTriggeredSchedule): raise AirflowTimetableInvalid("cannot nest dataset timetables") - if not isinstance(self.datasets, DatasetBooleanCondition): + if not isinstance(self.datasets, BaseDatasetEventInput): raise AirflowTimetableInvalid("all elements in 'datasets' must be datasets") @property diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index e10264b0e2490..258e542cec264 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -23,8 +23,8 @@ import pytest from sqlalchemy.sql import select -from airflow.datasets 
import Dataset -from airflow.models.dataset import DatasetAll, DatasetAny, DatasetDagRunQueue, DatasetModel +from airflow.datasets import Dataset, DatasetAll, DatasetAny +from airflow.models.dataset import DatasetDagRunQueue, DatasetModel from airflow.models.serialized_dag import SerializedDagModel from airflow.operators.empty import EmptyOperator from airflow.serialization.serialized_objects import BaseSerialization, SerializedDAG From 1e12daf3eeafa41479c4dd3e9be1bef583d42de0 Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Thu, 22 Feb 2024 15:34:08 +0530 Subject: [PATCH 03/11] Refactor the dataset inheritance --- airflow/datasets/__init__.py | 94 +++++++++++++++++-- airflow/models/dag.py | 3 +- airflow/serialization/serialized_objects.py | 2 - tests/datasets/test_dataset.py | 68 +++++++++++++- tests/models/test_datasets.py | 87 ----------------- tests/serialization/test_dag_serialization.py | 3 - 6 files changed, 154 insertions(+), 103 deletions(-) delete mode 100644 tests/models/test_datasets.py diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 2852824712f99..772362aa188c5 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -50,7 +50,7 @@ class Dataset(os.PathLike, BaseDatasetEventInput): __version__: ClassVar[int] = 1 @uri.validator - def _check_uri(self, attr, uri: str): + def _check_uri(self, attr, uri: str) -> None: if uri.isspace(): raise ValueError(f"{attr.name} cannot be just whitespace") try: @@ -64,7 +64,7 @@ def _check_uri(self, attr, uri: str): def __fspath__(self) -> str: return self.uri - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: if isinstance(other, self.__class__): return self.uri == other.uri else: @@ -73,14 +73,10 @@ def __eq__(self, other): def __hash__(self): return hash(self.uri) - def __or__(self, other): - from airflow.models.dataset import DatasetAny - + def __or__(self, other: Dataset): return DatasetAny(self, other) - def __and__(self, other): - from airflow.models.dataset import DatasetAll - + def __and__(self, other: Dataset): return DatasetAll(self, other) def iter_datasets(self) -> Iterator[tuple[str, Dataset]]: @@ -116,8 +112,90 @@ class DatasetAny(_DatasetBooleanCondition): agg_func = any + def __init__(self, *objects: Dataset | DatasetAny | DatasetAll) -> None: + """Initialize with one or more Dataset, DatasetAny, or DatasetAll instances.""" + super().__init__(*objects) + + def __or__(self, other): + if isinstance(other, (Dataset, DatasetAny, DatasetAll)): + return DatasetAny(*self.objects, other) + return NotImplemented + + def __and__(self, other): + if isinstance(other, (Dataset, DatasetAny, DatasetAll)): + return DatasetAll(self, other) + return NotImplemented + + def __repr__(self) -> str: + return f"DatasetAny({', '.join(map(str, self.objects))})" + class DatasetAll(_DatasetBooleanCondition): """Use to combine datasets schedule references in an "or" relationship.""" agg_func = all + + def __init__(self, *objects: Dataset | DatasetAny | DatasetAll): + """Initialize with one or more Dataset, DatasetAny, or DatasetAll instances.""" + super().__init__(*objects) + + def __or__(self, other): + if isinstance(other, (Dataset, DatasetAny, DatasetAll)): + return DatasetAny(self, other) + return NotImplemented + + def __and__(self, other): + if isinstance(other, (Dataset, DatasetAny, DatasetAll)): + return DatasetAll(*self.objects, other) + return NotImplemented + + def __repr__(self) -> str: + return f"DatasetAll({', '.join(map(str, 
self.objects))})" + + +class DatasetsExpression: + """ + Represents a node in an expression tree for dataset conditions. + + :param value: The value of the node, which can be a 'Dataset', '&', or '|'. + :param left: The left child node. + :param right: The right child node. + """ + + def __init__(self, value, left=None, right=None) -> None: + self.value = value # value can be 'Dataset', '&', or '|' + self.left = left + self.right = right + + def __or__(self, other: Dataset | DatasetsExpression) -> DatasetsExpression: + return DatasetsExpression("|", self, other) + + def __and__(self, other: Dataset | DatasetsExpression) -> DatasetsExpression: + return DatasetsExpression("&", self, other) + + def __repr__(self) -> str: + if isinstance(self.value, Dataset): + return f"Dataset(uri='{self.value.uri}')" + elif self.value == "&": + return repr(DatasetAll(self.left, self.right)) + elif self.value == "|": + return repr(DatasetAny(self.left, self.right)) + else: + return f"Invalid DatasetsExpression(value={self.value})" + + +def extract_datasets( + dataset_expression: DatasetsExpression | Dataset, +) -> BaseDatasetEventInput: + """ + Extract datasets from the given DatasetsExpression. + + :param dataset_expression: The DatasetsExpression to extract from. + """ + if isinstance(dataset_expression, DatasetsExpression): + if dataset_expression.value == "&": + return DatasetAll(dataset_expression.left, dataset_expression.right) + elif dataset_expression.value == "|": + return DatasetAny(dataset_expression.left, dataset_expression.right) + raise ValueError("Invalid Expression node value") + return dataset_expression diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 82da83b8c180b..92721d08df460 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -80,7 +80,6 @@ from airflow import settings, utils from airflow.api_internal.internal_api_call import internal_api_call from airflow.configuration import conf as airflow_conf, secrets_backend_list -from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll from airflow.datasets.manager import dataset_manager from airflow.exceptions import ( AirflowDagInconsistent, @@ -176,7 +175,7 @@ # but Mypy cannot handle that right now. Track progress of PEP 661 for progress. 
# See also: https://discuss.python.org/t/9126/7 ScheduleIntervalArg = Union[ArgNotSet, ScheduleInterval] -ScheduleArg = Union[ArgNotSet, ScheduleInterval, Timetable, BaseDatasetEventInput, Collection["Dataset"], DatasetsExpression] +ScheduleArg = Union[ArgNotSet, ScheduleInterval, Timetable, DatasetsExpression, Collection["Dataset"]] SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None] diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 0fc49e9b98dbc..552244d73ba7b 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -406,8 +406,6 @@ def serialize_to_json( serialized_object[key] = encode_timetable(value) elif key == "dataset_triggers": serialized_object[key] = cls.serialize(value) - elif key == "dataset_triggers": - serialized_object[key] = cls.serialize(value) else: value = cls.serialize(value) if isinstance(value, dict) and Encoding.TYPE in value: diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 258e542cec264..dcf3c5a5f9ab1 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -23,7 +23,7 @@ import pytest from sqlalchemy.sql import select -from airflow.datasets import Dataset, DatasetAll, DatasetAny +from airflow.datasets import Dataset, DatasetAll, DatasetAny, extract_datasets from airflow.models.dataset import DatasetDagRunQueue, DatasetModel from airflow.models.serialized_dag import SerializedDagModel from airflow.operators.empty import EmptyOperator @@ -269,3 +269,69 @@ def test_dag_with_complex_dataset_triggers(session, dag_maker): assert isinstance( serialized_dag_dict["dataset_triggers"], dict ), "Serialized 'dataset_triggers' should be a dict" + + +def datasets_equal(d1, d2): + if type(d1) != type(d2): + return False + + if isinstance(d1, Dataset): + return d1.uri == d2.uri + + elif isinstance(d1, (DatasetAny, DatasetAll)): + if len(d1.objects) != len(d2.objects): + return False + + # Compare each pair of objects + for obj1, obj2 in zip(d1.objects, d2.objects): + # If obj1 or obj2 is a Dataset, DatasetAny, or DatasetAll instance, + # recursively call datasets_equal + if not datasets_equal(obj1, obj2): + return False + return True + + return False + + +dataset1 = Dataset(uri="s3://bucket1/data1") +dataset2 = Dataset(uri="s3://bucket2/data2") +dataset3 = Dataset(uri="s3://bucket3/data3") +dataset4 = Dataset(uri="s3://bucket4/data4") +dataset5 = Dataset(uri="s3://bucket5/data5") + +test_cases = [ + (lambda: dataset1, dataset1), + (lambda: dataset1 & dataset2, DatasetAll(dataset1, dataset2)), + (lambda: dataset1 | dataset2, DatasetAny(dataset1, dataset2)), + (lambda: dataset1 | (dataset2 & dataset3), DatasetAny(dataset1, DatasetAll(dataset2, dataset3))), + (lambda: dataset1 | dataset2 & dataset3, DatasetAny(dataset1, DatasetAll(dataset2, dataset3))), + ( + lambda: ((dataset1 & dataset2) | dataset3) & (dataset4 | dataset5), + DatasetAll(DatasetAny(DatasetAll(dataset1, dataset2), dataset3), DatasetAny(dataset4, dataset5)), + ), + (lambda: dataset1 & dataset2 | dataset3, DatasetAny(DatasetAll(dataset1, dataset2), dataset3)), + ( + lambda: (dataset1 | dataset2) & (dataset3 | dataset4), + DatasetAll(DatasetAny(dataset1, dataset2), DatasetAny(dataset3, dataset4)), + ), + ( + lambda: (dataset1 & dataset2) | (dataset3 & (dataset4 | dataset5)), + DatasetAny(DatasetAll(dataset1, dataset2), DatasetAll(dataset3, DatasetAny(dataset4, dataset5))), + ), + ( + lambda: (dataset1 & dataset2) 
& (dataset3 & dataset4), + DatasetAll(dataset1, dataset2, DatasetAll(dataset3, dataset4)), + ), + (lambda: dataset1 | dataset2 | dataset3, DatasetAny(dataset1, dataset2, dataset3)), + ( + lambda: ((dataset1 & dataset2) | dataset3) & (dataset4 | dataset5), + DatasetAll(DatasetAny(DatasetAll(dataset1, dataset2), dataset3), DatasetAny(dataset4, dataset5)), + ), +] + + +@pytest.mark.parametrize("expression, expected", test_cases) +def test_extract_datasets(expression, expected): + expr = expression() + result = extract_datasets(expr) + assert datasets_equal(result, expected) diff --git a/tests/models/test_datasets.py b/tests/models/test_datasets.py deleted file mode 100644 index 835de9be691e5..0000000000000 --- a/tests/models/test_datasets.py +++ /dev/null @@ -1,87 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import pytest - -from airflow.models.dataset import Dataset, DatasetAll, DatasetAny, extract_datasets - - -def datasets_equal(d1, d2): - if type(d1) != type(d2): - return False - - if isinstance(d1, Dataset): - return d1.uri == d2.uri - - elif isinstance(d1, (DatasetAny, DatasetAll)): - if len(d1.objects) != len(d2.objects): - return False - - # Compare each pair of objects - for obj1, obj2 in zip(d1.objects, d2.objects): - # If obj1 or obj2 is a Dataset, DatasetAny, or DatasetAll instance, - # recursively call datasets_equal - if not datasets_equal(obj1, obj2): - return False - return True - - return False - - -dataset1 = Dataset(uri="s3://bucket1/data1") -dataset2 = Dataset(uri="s3://bucket2/data2") -dataset3 = Dataset(uri="s3://bucket3/data3") -dataset4 = Dataset(uri="s3://bucket4/data4") -dataset5 = Dataset(uri="s3://bucket5/data5") - -test_cases = [ - (lambda: dataset1, dataset1), - (lambda: dataset1 & dataset2, DatasetAll(dataset1, dataset2)), - (lambda: dataset1 | dataset2, DatasetAny(dataset1, dataset2)), - (lambda: dataset1 | (dataset2 & dataset3), DatasetAny(dataset1, DatasetAll(dataset2, dataset3))), - (lambda: dataset1 | dataset2 & dataset3, DatasetAny(dataset1, DatasetAll(dataset2, dataset3))), - ( - lambda: ((dataset1 & dataset2) | dataset3) & (dataset4 | dataset5), - DatasetAll(DatasetAny(DatasetAll(dataset1, dataset2), dataset3), DatasetAny(dataset4, dataset5)), - ), - (lambda: dataset1 & dataset2 | dataset3, DatasetAny(DatasetAll(dataset1, dataset2), dataset3)), - ( - lambda: (dataset1 | dataset2) & (dataset3 | dataset4), - DatasetAll(DatasetAny(dataset1, dataset2), DatasetAny(dataset3, dataset4)), - ), - ( - lambda: (dataset1 & dataset2) | (dataset3 & (dataset4 | dataset5)), - DatasetAny(DatasetAll(dataset1, dataset2), DatasetAll(dataset3, DatasetAny(dataset4, dataset5))), - ), - ( - lambda: (dataset1 & dataset2) & (dataset3 & dataset4), - DatasetAll(dataset1, dataset2, 
DatasetAll(dataset3, dataset4)), - ), - (lambda: dataset1 | dataset2 | dataset3, DatasetAny(dataset1, dataset2, dataset3)), - ( - lambda: ((dataset1 & dataset2) | dataset3) & (dataset4 | dataset5), - DatasetAll(DatasetAny(DatasetAll(dataset1, dataset2), dataset3), DatasetAny(dataset4, dataset5)), - ), -] - - -@pytest.mark.parametrize("expression, expected", test_cases) -def test_extract_datasets(expression, expected): - expr = expression() - result = extract_datasets(expr) - assert datasets_equal(result, expected) diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index b62b69cb5a80d..2adc956b6f4d0 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -554,9 +554,6 @@ def validate_deserialized_dag(self, serialized_dag, dag): compare_serialization_list = { "dataset_triggers", } - compare_serialization_list = { - "dataset_triggers", - } fields_to_check = dag.get_serialized_fields() - exclusion_list for field in fields_to_check: actual = getattr(serialized_dag, field) From 6238c792d69a4f680740756d801234bf8721255d Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Fri, 23 Feb 2024 10:53:02 +0530 Subject: [PATCH 04/11] Apply suggestions from code review Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- airflow/datasets/__init__.py | 15 ++++---- airflow/example_dags/example_datasets.py | 1 - .../authoring-and-scheduling/datasets.rst | 38 +++++++------------ .../authoring-and-scheduling/timetable.rst | 14 ++----- tests/datasets/test_dataset.py | 1 + 5 files changed, 26 insertions(+), 43 deletions(-) diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 772362aa188c5..fee2b02cf4498 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -192,10 +192,11 @@ def extract_datasets( :param dataset_expression: The DatasetsExpression to extract from. """ - if isinstance(dataset_expression, DatasetsExpression): - if dataset_expression.value == "&": - return DatasetAll(dataset_expression.left, dataset_expression.right) - elif dataset_expression.value == "|": - return DatasetAny(dataset_expression.left, dataset_expression.right) - raise ValueError("Invalid Expression node value") - return dataset_expression + if not isinstance(dataset_expression, DatasetsExpression): + return dataset_expression + + if dataset_expression.value == "&": + return DatasetAll(dataset_expression.left, dataset_expression.right) + elif dataset_expression.value == "|": + return DatasetAny(dataset_expression.left, dataset_expression.right) + raise ValueError("Invalid Expression node value") diff --git a/airflow/example_dags/example_datasets.py b/airflow/example_dags/example_datasets.py index 9b44c7a707889..c2a869ba4aa52 100644 --- a/airflow/example_dags/example_datasets.py +++ b/airflow/example_dags/example_datasets.py @@ -46,7 +46,6 @@ dataset_produces_2 datasets have been updated or according to a specific cron schedule, showcasing Airflow's versatility in handling mixed triggers for dataset and time-based scheduling. - The DAGs dataset_consumes_1_never_scheduled and dataset_consumes_unknown_never_scheduled will not run automatically as they depend on datasets that do not get updated or are not produced by any scheduled tasks. 
""" diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst index c642ec63dfae6..921d2f1b720fa 100644 --- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst +++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst @@ -270,15 +270,11 @@ To schedule a DAG to run only when two specific datasets have both been updated, dag2_dataset = Dataset("s3://dag2/output_1.txt") with DAG( - dag_id="consume_1_and_2_with_dataset_expressions", - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + # Consume dataset 1 and 2 with dataset expressions schedule=(dag1_dataset & dag2_dataset), - ) as dag: - BashOperator( - task_id="consume_1_and_2_with_dataset_expressions", - bash_command="sleep 5", - outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], - ) + ... + ): + ... **Scheduling Based on Any Dataset Update** @@ -287,15 +283,11 @@ To trigger a DAG execution when either of two datasets is updated, apply the OR .. code-block:: python with DAG( - dag_id="consume_1_or_2_with_dataset_expressions", - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + # Consume dataset 1 or 2 with dataset expressions schedule=(dag1_dataset | dag2_dataset), - ) as dag: - BashOperator( - task_id="consume_1_or_2_with_dataset_expressions", - bash_command="sleep 5", - outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], - ) + ... + ): + ... **Complex Conditional Logic** @@ -306,15 +298,11 @@ For scenarios requiring more intricate conditions, such as triggering a DAG when dag3_dataset = Dataset("s3://dag3/output_3.txt") with DAG( - dag_id="consume_1_or_both_2_and_3_with_dataset_expressions", - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + # Consume dataset 1 or both 2 and 3 with dataset expressions schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)), - ) as dag: - BashOperator( - task_id="consume_1_or_both_2_and_3_with_dataset_expressions", - bash_command="sleep 5", - outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")], - ) + ... + ): + ... Combining Dataset and Time-Based Schedules ------------------------------------------ @@ -325,4 +313,4 @@ With the introduction of ``DatasetTimetable``, it is now possible to schedule DA For more detailed information on ``DatasetTimetable`` and its usage, refer to the corresponding section in :ref:`DatasetTimetable `. -These examples illustrate how Airflow's conditional dataset expressions can be used to create complex, data-dependent scheduling scenarios, providing precise control over when DAGs are triggered in response to data updates. +These examples illustrate how Airflow's conditional dataset expressions can be used to create complex data-dependent scheduling scenarios, providing precise control over when DAGs are triggered in response to data updates. 
diff --git a/docs/apache-airflow/authoring-and-scheduling/timetable.rst b/docs/apache-airflow/authoring-and-scheduling/timetable.rst index 628d355c98665..b0e88a3900e82 100644 --- a/docs/apache-airflow/authoring-and-scheduling/timetable.rst +++ b/docs/apache-airflow/authoring-and-scheduling/timetable.rst @@ -222,19 +222,13 @@ Combining conditional dataset expressions with time-based schedules enhances sch from airflow.timetables.trigger import CronTriggerTimetable with DAG( - dag_id="conditional_dataset_and_time_based_timetable", - catchup=False, - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + # Conditional dataset and time based timetable schedule=DatasetOrTimeSchedule( timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset) ), - tags=["dataset-time-based-timetable"], - ) as dag: - BashOperator( - task_id="conditional_dataset_and_time_based_timetable", - bash_command="sleep 5", - outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")], - ) + ... + ): + ... Timetables comparisons diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index dcf3c5a5f9ab1..1faae59d98cd0 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -323,6 +323,7 @@ def datasets_equal(d1, d2): DatasetAll(dataset1, dataset2, DatasetAll(dataset3, dataset4)), ), (lambda: dataset1 | dataset2 | dataset3, DatasetAny(dataset1, dataset2, dataset3)), + (lambda: dataset1 & dataset2 & dataset3, DatasetAll(dataset1, dataset2, dataset3)), ( lambda: ((dataset1 & dataset2) | dataset3) & (dataset4 | dataset5), DatasetAll(DatasetAny(DatasetAll(dataset1, dataset2), dataset3), DatasetAny(dataset4, dataset5)), From 5625225dd273961b048f0a319cfd1ae9ca9d6824 Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Fri, 23 Feb 2024 11:10:41 +0530 Subject: [PATCH 05/11] Fix documentation changes as per PR comments --- .../authoring-and-scheduling/datasets.rst | 11 ++--- .../authoring-and-scheduling/timetable.rst | 40 ++++++++----------- 2 files changed, 20 insertions(+), 31 deletions(-) diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst index 921d2f1b720fa..191c08e1e880f 100644 --- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst +++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst @@ -261,18 +261,13 @@ To schedule a DAG to run only when two specific datasets have both been updated, .. code-block:: python - from airflow.models import DAG - from airflow.operators.bash import BashOperator - from airflow.datasets import Dataset - import pendulum - dag1_dataset = Dataset("s3://dag1/output_1.txt") dag2_dataset = Dataset("s3://dag2/output_1.txt") with DAG( # Consume dataset 1 and 2 with dataset expressions schedule=(dag1_dataset & dag2_dataset), - ... + ..., ): ... @@ -285,7 +280,7 @@ To trigger a DAG execution when either of two datasets is updated, apply the OR with DAG( # Consume dataset 1 or 2 with dataset expressions schedule=(dag1_dataset | dag2_dataset), - ... + ..., ): ... @@ -300,7 +295,7 @@ For scenarios requiring more intricate conditions, such as triggering a DAG when with DAG( # Consume dataset 1 or both 2 and 3 with dataset expressions schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)), - ... + ..., ): ... 
diff --git a/docs/apache-airflow/authoring-and-scheduling/timetable.rst b/docs/apache-airflow/authoring-and-scheduling/timetable.rst index b0e88a3900e82..5ea18b36c494f 100644 --- a/docs/apache-airflow/authoring-and-scheduling/timetable.rst +++ b/docs/apache-airflow/authoring-and-scheduling/timetable.rst @@ -192,23 +192,15 @@ Here's an example of a DAG using ``DatasetTimetable``: from airflow.timetables.dataset import DatasetTimetable from airflow.timetables.trigger import CronTriggerTimetable - from airflow.datasets import Dataset - from airflow.models import DAG - from airflow.operators.bash import BashOperator - import pendulum - - with DAG( - dag_id="dataset_and_time_based_timetable", - catchup=False, - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - schedule=DatasetTimetable(time=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), event=[dag1_dataset]), - tags=["dataset-time-based-timetable"], - ) as dag7: - BashOperator( - outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")], - task_id="consuming_dataset_time_based", - bash_command="sleep 5", - ) + + + @dag( + schedule=DatasetTimetable(time=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), event=[dag1_dataset]) + # Additional arguments here, replace this comment with actual arguments + ) + def example_dag(): + # DAG tasks go here + pass In this example, the DAG is scheduled to run every Wednesday at 01:00 UTC based on the ``CronTriggerTimetable``, and it is also triggered by updates to ``dag1_dataset``. @@ -221,14 +213,16 @@ Combining conditional dataset expressions with time-based schedules enhances sch from airflow.timetables import DatasetOrTimeSchedule from airflow.timetables.trigger import CronTriggerTimetable - with DAG( - # Conditional dataset and time based timetable + + @dag( schedule=DatasetOrTimeSchedule( timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset) - ), - ... - ): - ... 
+ ) + # Additional arguments here, replace this comment with actual arguments + ) + def example_dag(): + # DAG tasks go here + pass Timetables comparisons From 6b2fdab6e91e50871327a9219ee6b1122cea5c85 Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Fri, 23 Feb 2024 13:01:37 +0530 Subject: [PATCH 06/11] fix the typing and PR comments --- airflow/datasets/__init__.py | 22 +++++++++++----------- airflow/models/dag.py | 4 +++- tests/datasets/test_dataset.py | 8 ++++---- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index fee2b02cf4498..4ba7bc80e557a 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -73,10 +73,10 @@ def __eq__(self, other: Any) -> bool: def __hash__(self): return hash(self.uri) - def __or__(self, other: Dataset): + def __or__(self, other: BaseDatasetEventInput) -> DatasetAny: return DatasetAny(self, other) - def __and__(self, other: Dataset): + def __and__(self, other: BaseDatasetEventInput) -> DatasetAll: return DatasetAll(self, other) def iter_datasets(self) -> Iterator[tuple[str, Dataset]]: @@ -112,16 +112,16 @@ class DatasetAny(_DatasetBooleanCondition): agg_func = any - def __init__(self, *objects: Dataset | DatasetAny | DatasetAll) -> None: + def __init__(self, *objects: BaseDatasetEventInput) -> None: """Initialize with one or more Dataset, DatasetAny, or DatasetAll instances.""" super().__init__(*objects) - def __or__(self, other): + def __or__(self, other: BaseDatasetEventInput) -> DatasetAny: if isinstance(other, (Dataset, DatasetAny, DatasetAll)): return DatasetAny(*self.objects, other) return NotImplemented - def __and__(self, other): + def __and__(self, other: BaseDatasetEventInput) -> DatasetAll: if isinstance(other, (Dataset, DatasetAny, DatasetAll)): return DatasetAll(self, other) return NotImplemented @@ -135,16 +135,16 @@ class DatasetAll(_DatasetBooleanCondition): agg_func = all - def __init__(self, *objects: Dataset | DatasetAny | DatasetAll): + def __init__(self, *objects: BaseDatasetEventInput): """Initialize with one or more Dataset, DatasetAny, or DatasetAll instances.""" super().__init__(*objects) - def __or__(self, other): + def __or__(self, other: BaseDatasetEventInput) -> DatasetAny: if isinstance(other, (Dataset, DatasetAny, DatasetAll)): return DatasetAny(self, other) return NotImplemented - def __and__(self, other): + def __and__(self, other: BaseDatasetEventInput) -> DatasetAll: if isinstance(other, (Dataset, DatasetAny, DatasetAll)): return DatasetAll(*self.objects, other) return NotImplemented @@ -167,10 +167,10 @@ def __init__(self, value, left=None, right=None) -> None: self.left = left self.right = right - def __or__(self, other: Dataset | DatasetsExpression) -> DatasetsExpression: + def __or__(self, other: BaseDatasetEventInput | DatasetsExpression) -> DatasetsExpression: return DatasetsExpression("|", self, other) - def __and__(self, other: Dataset | DatasetsExpression) -> DatasetsExpression: + def __and__(self, other: BaseDatasetEventInput | DatasetsExpression) -> DatasetsExpression: return DatasetsExpression("&", self, other) def __repr__(self) -> str: @@ -181,7 +181,7 @@ def __repr__(self) -> str: elif self.value == "|": return repr(DatasetAny(self.left, self.right)) else: - return f"Invalid DatasetsExpression(value={self.value})" + raise ValueError(f"Invalid DatasetsExpression(value={self.value})") def extract_datasets( diff --git a/airflow/models/dag.py b/airflow/models/dag.py 
index 8943231628179..30da08b2e0e58 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -174,7 +174,9 @@ # but Mypy cannot handle that right now. Track progress of PEP 661 for progress. # See also: https://discuss.python.org/t/9126/7 ScheduleIntervalArg = Union[ArgNotSet, ScheduleInterval] -ScheduleArg = Union[ArgNotSet, ScheduleInterval, Timetable, DatasetsExpression, Collection["Dataset"]] +ScheduleArg = Union[ + ArgNotSet, ScheduleInterval, Timetable, DatasetsExpression, BaseDatasetEventInput, Collection["Dataset"] +] SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None] diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 1faae59d98cd0..c4ed99100221a 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -23,7 +23,7 @@ import pytest from sqlalchemy.sql import select -from airflow.datasets import Dataset, DatasetAll, DatasetAny, extract_datasets +from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll, DatasetAny, extract_datasets from airflow.models.dataset import DatasetDagRunQueue, DatasetModel from airflow.models.serialized_dag import SerializedDagModel from airflow.operators.empty import EmptyOperator @@ -271,14 +271,14 @@ def test_dag_with_complex_dataset_triggers(session, dag_maker): ), "Serialized 'dataset_triggers' should be a dict" -def datasets_equal(d1, d2): +def datasets_equal(d1: BaseDatasetEventInput, d2: BaseDatasetEventInput) -> bool: if type(d1) != type(d2): return False - if isinstance(d1, Dataset): + if isinstance(d1, Dataset) and isinstance(d2, Dataset): return d1.uri == d2.uri - elif isinstance(d1, (DatasetAny, DatasetAll)): + elif isinstance(d1, (DatasetAny, DatasetAll)) and isinstance(d2, (DatasetAny, DatasetAll)): if len(d1.objects) != len(d2.objects): return False From f40f6cd88194f2c99830643daefca80e85c2c7fb Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Fri, 23 Feb 2024 14:25:21 +0530 Subject: [PATCH 07/11] revert back the scheduler changes --- airflow/jobs/scheduler_job_runner.py | 114 +++++++++++++-------------- 1 file changed, 56 insertions(+), 58 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 5dd2a65e27be8..32cc9f5a634ab 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -31,7 +31,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator -from sqlalchemy import and_, delete, exists, func, literal_column, not_, or_, select, text, update +from sqlalchemy import and_, delete, func, not_, or_, select, text, update from sqlalchemy.exc import OperationalError from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload from sqlalchemy.sql import expression @@ -1215,6 +1215,13 @@ def _create_dag_runs_dataset_triggered( dag_id: timezone.coerce_datetime(last_time) for dag_id, (_, last_time) in dataset_triggered_dag_info.items() } + existing_dagruns: set[tuple[str, timezone.DateTime]] = set( + session.execute( + select(DagRun.dag_id, DagRun.execution_date).where( + tuple_in_condition((DagRun.dag_id, DagRun.execution_date), exec_dates.items()) + ) + ) + ) for dag_model in dag_models: dag = self.dagbag.get_dag(dag_model.dag_id, session=session) @@ -1240,68 +1247,59 @@ def _create_dag_runs_dataset_triggered( # create a new one. 
This is so that in the next Scheduling loop we try to create new runs # instead of falling in a loop of Integrity Error. exec_date = exec_dates[dag.dag_id] - curr_date_query = select( - exists( - select(literal_column("1")) - .where(DagRun.dag_id == dag.dag_id, DagRun.execution_date == exec_date) + if (dag.dag_id, exec_date) not in existing_dagruns: + previous_dag_run = session.scalar( + select(DagRun) + .where( + DagRun.dag_id == dag.dag_id, + DagRun.execution_date < exec_date, + DagRun.run_type == DagRunType.DATASET_TRIGGERED, + ) + .order_by(DagRun.execution_date.desc()) .limit(1) ) - ) - if session.scalar(curr_date_query): # dag already exists - continue + dataset_event_filters = [ + DagScheduleDatasetReference.dag_id == dag.dag_id, + DatasetEvent.timestamp <= exec_date, + ] + if previous_dag_run: + dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date) + + dataset_events = session.scalars( + select(DatasetEvent) + .join( + DagScheduleDatasetReference, + DatasetEvent.dataset_id == DagScheduleDatasetReference.dataset_id, + ) + .join(DatasetEvent.source_dag_run) + .where(*dataset_event_filters) + ).all() + + data_interval = dag.timetable.data_interval_for_events(exec_date, dataset_events) + run_id = dag.timetable.generate_run_id( + run_type=DagRunType.DATASET_TRIGGERED, + logical_date=exec_date, + data_interval=data_interval, + session=session, + events=dataset_events, + ) - prev_exec_date = session.scalar( - select(DagRun.execution_date) - .where( - DagRun.dag_id == dag.dag_id, - DagRun.execution_date < exec_date, - DagRun.run_type == DagRunType.DATASET_TRIGGERED, + dag_run = dag.create_dagrun( + run_id=run_id, + run_type=DagRunType.DATASET_TRIGGERED, + execution_date=exec_date, + data_interval=data_interval, + state=DagRunState.QUEUED, + external_trigger=False, + session=session, + dag_hash=dag_hash, + creating_job_id=self.job.id, ) - .order_by(DagRun.execution_date.desc()) - .limit(1) - ) - dataset_event_filters = [ - DagScheduleDatasetReference.dag_id == dag.dag_id, - DatasetEvent.timestamp <= exec_date, - ] - if prev_exec_date: - dataset_event_filters.append(DatasetEvent.timestamp > prev_exec_date) - - dataset_events = session.scalars( - select(DatasetEvent) - .join( - DagScheduleDatasetReference, - DatasetEvent.dataset_id == DagScheduleDatasetReference.dataset_id, + Stats.incr("dataset.triggered_dagruns") + dag_run.consumed_dataset_events.extend(dataset_events) + session.execute( + delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id) ) - .join(DatasetEvent.source_dag_run) - .where(*dataset_event_filters) - ).all() - - data_interval = dag.timetable.data_interval_for_events(exec_date, dataset_events) - run_id = dag.timetable.generate_run_id( - run_type=DagRunType.DATASET_TRIGGERED, - logical_date=exec_date, - data_interval=data_interval, - session=session, - events=dataset_events, - ) - - dag_run = dag.create_dagrun( - run_id=run_id, - run_type=DagRunType.DATASET_TRIGGERED, - execution_date=exec_date, - data_interval=data_interval, - state=DagRunState.QUEUED, - external_trigger=False, - session=session, - dag_hash=dag_hash, - creating_job_id=self.job.id, - ) - Stats.incr("dataset.triggered_dagruns") - dag_run.consumed_dataset_events.extend(dataset_events) - session.execute( - delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id) - ) def _should_update_dag_next_dagruns( self, From fd11581b4663878e69235a8d0b0758e75019c2ac Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia 
<8670962+sunank200@users.noreply.github.com> Date: Fri, 23 Feb 2024 15:37:30 +0530 Subject: [PATCH 08/11] use Dataset, DatasetAll, and DatasetAny directly for dataset expression --- airflow/datasets/__init__.py | 49 ---------------------------------- airflow/models/dag.py | 18 +++---------- tests/datasets/test_dataset.py | 5 ++-- 3 files changed, 6 insertions(+), 66 deletions(-) diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 4ba7bc80e557a..11f95474622e2 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -151,52 +151,3 @@ def __and__(self, other: BaseDatasetEventInput) -> DatasetAll: def __repr__(self) -> str: return f"DatasetAll({', '.join(map(str, self.objects))})" - - -class DatasetsExpression: - """ - Represents a node in an expression tree for dataset conditions. - - :param value: The value of the node, which can be a 'Dataset', '&', or '|'. - :param left: The left child node. - :param right: The right child node. - """ - - def __init__(self, value, left=None, right=None) -> None: - self.value = value # value can be 'Dataset', '&', or '|' - self.left = left - self.right = right - - def __or__(self, other: BaseDatasetEventInput | DatasetsExpression) -> DatasetsExpression: - return DatasetsExpression("|", self, other) - - def __and__(self, other: BaseDatasetEventInput | DatasetsExpression) -> DatasetsExpression: - return DatasetsExpression("&", self, other) - - def __repr__(self) -> str: - if isinstance(self.value, Dataset): - return f"Dataset(uri='{self.value.uri}')" - elif self.value == "&": - return repr(DatasetAll(self.left, self.right)) - elif self.value == "|": - return repr(DatasetAny(self.left, self.right)) - else: - raise ValueError(f"Invalid DatasetsExpression(value={self.value})") - - -def extract_datasets( - dataset_expression: DatasetsExpression | Dataset, -) -> BaseDatasetEventInput: - """ - Extract datasets from the given DatasetsExpression. - - :param dataset_expression: The DatasetsExpression to extract from. - """ - if not isinstance(dataset_expression, DatasetsExpression): - return dataset_expression - - if dataset_expression.value == "&": - return DatasetAll(dataset_expression.left, dataset_expression.right) - elif dataset_expression.value == "|": - return DatasetAny(dataset_expression.left, dataset_expression.right) - raise ValueError("Invalid Expression node value") diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 30da08b2e0e58..688faa648d589 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -80,13 +80,7 @@ from airflow import settings, utils from airflow.api_internal.internal_api_call import internal_api_call from airflow.configuration import conf as airflow_conf, secrets_backend_list -from airflow.datasets import ( - BaseDatasetEventInput, - Dataset, - DatasetAll, - DatasetsExpression, - extract_datasets, -) +from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll from airflow.datasets.manager import dataset_manager from airflow.exceptions import ( AirflowDagInconsistent, @@ -174,9 +168,7 @@ # but Mypy cannot handle that right now. Track progress of PEP 661 for progress. 
# See also: https://discuss.python.org/t/9126/7 ScheduleIntervalArg = Union[ArgNotSet, ScheduleInterval] -ScheduleArg = Union[ - ArgNotSet, ScheduleInterval, Timetable, DatasetsExpression, BaseDatasetEventInput, Collection["Dataset"] -] +ScheduleArg = Union[ArgNotSet, ScheduleInterval, Timetable, BaseDatasetEventInput, Collection["Dataset"]] SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None] @@ -589,9 +581,7 @@ def __init__( self.timetable: Timetable self.schedule_interval: ScheduleInterval self.dataset_triggers: BaseDatasetEventInput | None = None - if isinstance(schedule, DatasetsExpression): - self.dataset_triggers = extract_datasets(dataset_expression=schedule) - elif isinstance(schedule, BaseDatasetEventInput): + if isinstance(schedule, BaseDatasetEventInput): self.dataset_triggers = schedule elif isinstance(schedule, Collection) and not isinstance(schedule, str): if not all(isinstance(x, Dataset) for x in schedule): @@ -599,7 +589,7 @@ def __init__( self.dataset_triggers = DatasetAll(*schedule) elif isinstance(schedule, Timetable): timetable = schedule - elif schedule is not NOTSET and not isinstance(schedule, DatasetsExpression): + elif schedule is not NOTSET and not isinstance(schedule, BaseDatasetEventInput): schedule_interval = schedule if isinstance(schedule, DatasetOrTimeSchedule): diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index c4ed99100221a..96cfb3a3abcb7 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -23,7 +23,7 @@ import pytest from sqlalchemy.sql import select -from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll, DatasetAny, extract_datasets +from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll, DatasetAny from airflow.models.dataset import DatasetDagRunQueue, DatasetModel from airflow.models.serialized_dag import SerializedDagModel from airflow.operators.empty import EmptyOperator @@ -334,5 +334,4 @@ def datasets_equal(d1: BaseDatasetEventInput, d2: BaseDatasetEventInput) -> bool @pytest.mark.parametrize("expression, expected", test_cases) def test_extract_datasets(expression, expected): expr = expression() - result = extract_datasets(expr) - assert datasets_equal(result, expected) + assert datasets_equal(expr, expected) From afc20df63cf7d263491282982a8eb020e33d8f9a Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Fri, 23 Feb 2024 16:02:05 +0530 Subject: [PATCH 09/11] add the test for Dataset expressions --- tests/datasets/test_dataset.py | 51 +++++++++++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 96cfb3a3abcb7..ba29e41361b86 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -89,6 +89,55 @@ def test_hash(): hash(dataset) +def test_dataset_logic_operations(): + result_or = dataset1 | dataset2 + assert isinstance(result_or, DatasetAny) + result_and = dataset1 & dataset2 + assert isinstance(result_and, DatasetAll) + + +def test_dataset_iter_datasets(): + assert list(dataset1.iter_datasets()) == [("s3://bucket1/data1", dataset1)] + + +def test_dataset_evaluate(): + assert dataset1.evaluate({"s3://bucket1/data1": True}) is True + assert dataset1.evaluate({"s3://bucket1/data1": False}) is False + + +def test_dataset_any_operations(): + result_or = (dataset1 | dataset2) | dataset3 + assert isinstance(result_or, DatasetAny) + assert 
len(result_or.objects) == 3 + result_and = (dataset1 | dataset2) & dataset3 + assert isinstance(result_and, DatasetAll) + + +def test_dataset_all_operations(): + result_or = (dataset1 & dataset2) | dataset3 + assert isinstance(result_or, DatasetAny) + result_and = (dataset1 & dataset2) & dataset3 + assert isinstance(result_and, DatasetAll) + + +def test_datasetbooleancondition_evaluate_iter(): + """ + Tests _DatasetBooleanCondition's evaluate and iter_datasets methods through DatasetAny and DatasetAll. + Ensures DatasetAny evaluate returns True with any true condition, DatasetAll evaluate returns False if + any condition is false, and both classes correctly iterate over datasets without duplication. + """ + any_condition = DatasetAny(dataset1, dataset2) + all_condition = DatasetAll(dataset1, dataset2) + assert any_condition.evaluate({"s3://bucket1/data1": False, "s3://bucket2/data2": True}) is True + assert all_condition.evaluate({"s3://bucket1/data1": True, "s3://bucket2/data2": False}) is False + + # Testing iter_datasets indirectly through the subclasses + datasets_any = set(any_condition.iter_datasets()) + datasets_all = set(all_condition.iter_datasets()) + assert datasets_any == {("s3://bucket1/data1", dataset1), ("s3://bucket2/data2", dataset2)} + assert datasets_all == {("s3://bucket1/data1", dataset1), ("s3://bucket2/data2", dataset2)} + + @pytest.mark.parametrize( "inputs, scenario, expected", [ @@ -332,6 +381,6 @@ def datasets_equal(d1: BaseDatasetEventInput, d2: BaseDatasetEventInput) -> bool @pytest.mark.parametrize("expression, expected", test_cases) -def test_extract_datasets(expression, expected): +def test_evaluate_datasets_expression(expression, expected): expr = expression() assert datasets_equal(expr, expected) From 2d670847d74790eee482c2f684a8ab0e0ddb0bec Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 26 Feb 2024 13:59:32 +0800 Subject: [PATCH 10/11] Remove some unneeded checks --- airflow/datasets/__init__.py | 32 ++++++++++---------------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 11f95474622e2..b247e63470a76 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -33,6 +33,12 @@ class BaseDatasetEventInput(Protocol): :meta private: """ + def __or__(self, other: BaseDatasetEventInput) -> DatasetAny: + return DatasetAny(self, other) + + def __and__(self, other: BaseDatasetEventInput) -> DatasetAll: + return DatasetAll(self, other) + def evaluate(self, statuses: dict[str, bool]) -> bool: raise NotImplementedError @@ -73,12 +79,6 @@ def __eq__(self, other: Any) -> bool: def __hash__(self): return hash(self.uri) - def __or__(self, other: BaseDatasetEventInput) -> DatasetAny: - return DatasetAny(self, other) - - def __and__(self, other: BaseDatasetEventInput) -> DatasetAll: - return DatasetAll(self, other) - def iter_datasets(self) -> Iterator[tuple[str, Dataset]]: yield self.uri, self @@ -117,14 +117,8 @@ def __init__(self, *objects: BaseDatasetEventInput) -> None: super().__init__(*objects) def __or__(self, other: BaseDatasetEventInput) -> DatasetAny: - if isinstance(other, (Dataset, DatasetAny, DatasetAll)): - return DatasetAny(*self.objects, other) - return NotImplemented - - def __and__(self, other: BaseDatasetEventInput) -> DatasetAll: - if isinstance(other, (Dataset, DatasetAny, DatasetAll)): - return DatasetAll(self, other) - return NotImplemented + # Optimization: X | (Y | Z) is equivalent to X | Y | Z. 
+ return DatasetAny(*self.objects, other) def __repr__(self) -> str: return f"DatasetAny({', '.join(map(str, self.objects))})" @@ -139,15 +133,9 @@ def __init__(self, *objects: BaseDatasetEventInput): """Initialize with one or more Dataset, DatasetAny, or DatasetAll instances.""" super().__init__(*objects) - def __or__(self, other: BaseDatasetEventInput) -> DatasetAny: - if isinstance(other, (Dataset, DatasetAny, DatasetAll)): - return DatasetAny(self, other) - return NotImplemented - def __and__(self, other: BaseDatasetEventInput) -> DatasetAll: - if isinstance(other, (Dataset, DatasetAny, DatasetAll)): - return DatasetAll(*self.objects, other) - return NotImplemented + # Optimization: X & (Y & Z) is equivalent to X & Y & Z. + return DatasetAll(*self.objects, other) def __repr__(self) -> str: return f"DatasetAll({', '.join(map(str, self.objects))})" From cc6fee3c17b1e3a8cf32d6b848b1a259c80bff17 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 26 Feb 2024 14:16:19 +0800 Subject: [PATCH 11/11] Remove unneeded __init__ --- airflow/datasets/__init__.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index b247e63470a76..953333c6596b7 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -112,10 +112,6 @@ class DatasetAny(_DatasetBooleanCondition): agg_func = any - def __init__(self, *objects: BaseDatasetEventInput) -> None: - """Initialize with one or more Dataset, DatasetAny, or DatasetAll instances.""" - super().__init__(*objects) - def __or__(self, other: BaseDatasetEventInput) -> DatasetAny: # Optimization: X | (Y | Z) is equivalent to X | Y | Z. return DatasetAny(*self.objects, other) @@ -129,10 +125,6 @@ class DatasetAll(_DatasetBooleanCondition): agg_func = all - def __init__(self, *objects: BaseDatasetEventInput): - """Initialize with one or more Dataset, DatasetAny, or DatasetAll instances.""" - super().__init__(*objects) - def __and__(self, other: BaseDatasetEventInput) -> DatasetAll: # Optimization: X & (Y & Z) is equivalent to X & Y & Z. return DatasetAll(*self.objects, other)
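
With the series applied end to end, ``Dataset``, ``DatasetAny``, and ``DatasetAll`` are the only public pieces involved: the ``|`` and ``&`` operators build the condition objects directly (the intermediate expression tree is gone after patch 08/11), and the conditions expose ``evaluate()``, which takes a mapping of dataset URI to an "updated" flag, as exercised by the tests added in patch 09/11. A minimal usage sketch of that final state, mirroring the assertions in tests/datasets/test_dataset.py (URIs are illustrative):

    from airflow.datasets import Dataset, DatasetAll, DatasetAny

    d1 = Dataset("s3://bucket1/data1")
    d2 = Dataset("s3://bucket2/data2")
    d3 = Dataset("s3://bucket3/data3")

    # Compose a condition: satisfied when d1 is updated, or when both d2 and d3 are.
    cond = d1 | (d2 & d3)
    assert isinstance(cond, DatasetAny)
    assert isinstance(d2 & d3, DatasetAll)

    # evaluate() takes a mapping of dataset URI -> "has been updated" flag.
    statuses = {
        "s3://bucket1/data1": False,
        "s3://bucket2/data2": True,
        "s3://bucket3/data3": True,
    }
    assert cond.evaluate(statuses) is True

The same condition object can be passed as a DAG's ``schedule`` argument; passing a plain collection of Datasets keeps the existing behavior, since DAG.__init__ wraps it in ``DatasetAll(*schedule)``.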