From 9ff606dc080c47d98aabeaeb976cc55882328293 Mon Sep 17 00:00:00 2001
From: Tzu-ping Chung
Date: Wed, 14 Aug 2024 09:15:15 +0800
Subject: [PATCH 1/3] Unify DAG schedule args and change default to None

The arguments 'schedule_interval' and 'timetable' are removed from both
the DAG class and the `@dag` decorator. The default value of the
'schedule' argument (on both entities) is changed to None, i.e. a DAG
does not have a schedule by default.

The 'timetable' attribute still exists on DAG, and is now the only value
that reflects the DAG's schedule. The 'schedule_interval' attribute is
removed from DAG.

The 'schedule_interval' column on DagModel, which used to store a string
representation of the DAG attribute of the same name, is replaced by
'timetable_summary', which should work mostly the same as before. We can
fix minor UI differences as we go. Some use cases that relied on that
column are changed to use other columns instead (dataset_expression, for
example, can be used to check whether a DAG is dataset-triggered).

The API field 'schedule_interval' has also been removed since it no
longer exists in the database. This has some side effects: the field
previously contained type information for delta values, but its
replacement is a plain string. It is unclear whether anyone really needs
that information, but we can always bring it back if needed.
---
 airflow/api_connexion/openapi/v1.yaml | 25 +--
 .../api_connexion/schemas/common_schema.py | 37 ----
 airflow/api_connexion/schemas/dag_schema.py | 4 +-
 airflow/cli/commands/dag_command.py | 2 +-
 airflow/config_templates/config.yml | 2 +-
 airflow/exceptions.py | 4 -
 ..._rename_schedule_interval_to_timetable_.py | 60 +++++++
 airflow/models/baseoperator.py | 7 +-
 airflow/models/dag.py | 159 ++++-------------
 airflow/serialization/pydantic/dag.py | 36 +---
 airflow/serialization/schema.json | 8 -
 airflow/serialization/serialized_objects.py | 21 +--
 .../ti_deps/deps/runnable_exec_date_dep.py | 2 +-
 airflow/timetables/base.py | 26 +--
 airflow/timetables/datasets.py | 2 +-
 airflow/utils/db.py | 2 +-
 airflow/utils/sqlalchemy.py | 46 -----
 airflow/www/static/js/dag/details/dag/Dag.tsx | 17 +-
 airflow/www/static/js/types/api-generated.ts | 21 +--
 airflow/www/templates/airflow/dag.html | 6 +-
 airflow/www/templates/airflow/dags.html | 2 +-
 airflow/www/views.py | 19 +-
 .../apprise_notifier_howto_guide.rst | 2 +-
 .../smtp_notifier_howto_guide.rst | 2 +-
 docs/apache-airflow/howto/notifications.rst | 2 +-
 docs/apache-airflow/img/airflow_erd.sha256 | 2 +-
 docs/apache-airflow/img/airflow_erd.svg | 18 +-
 docs/apache-airflow/migrations-ref.rst | 4 +-
 docs/apache-airflow/tutorial/pipeline.rst | 2 +-
 newsfragments/24842.significant.rst | 6 +
 newsfragments/41453.significant.rst | 7 +
 .../endpoints/test_dag_endpoint.py | 165 +++++-------------
 .../schemas/test_common_schema.py | 60 -------
 .../api_connexion/schemas/test_dag_schema.py | 10 +-
 tests/conftest.py | 9 +-
 tests/dags/test_invalid_cron.py | 6 +-
 tests/jobs/test_scheduler_job.py | 14 +-
 tests/models/test_dag.py | 101 ++---------
 tests/models/test_dagrun.py | 6 +-
 tests/models/test_taskinstance.py | 47 +++--
 .../apache/druid/operators/test_druid.py | 7 +-
 .../fab/auth_manager/test_security.py | 4 +-
 tests/serialization/test_dag_serialization.py | 41 +----
 tests/serialization/test_pydantic_models.py | 8 +-
 .../serialization/test_serialized_objects.py | 4 +-
 tests/timetables/test_interval_timetable.py | 2 +-
 46 files changed, 289 insertions(+), 748 deletions(-)
 create mode 100644
airflow/migrations/versions/0004_3_0_0_rename_schedule_interval_to_timetable_.py create mode 100644 newsfragments/24842.significant.rst create mode 100644 newsfragments/41453.significant.rst diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index a92499fc81e49..24b9c1be0d339 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -3196,14 +3196,20 @@ components: description: > User-provided DAG description, which can consist of several sentences or paragraphs that describe DAG contents. - schedule_interval: - $ref: "#/components/schemas/ScheduleInterval" + timetable_summary: + type: string + readOnly: true + nullable: true + description: | + Timetable summary. + + *New in version 3.0.0* timetable_description: type: string readOnly: true nullable: true description: | - Timetable/Schedule Interval description. + Timetable description. *New in version 2.3.0* tags: @@ -5190,19 +5196,6 @@ components: *New in version 3.0.0* # Common data type - ScheduleInterval: - description: | - Schedule interval. Defines how often DAG runs, this object gets added to your latest task instance's - execution_date to figure out the next schedule. - nullable: true - readOnly: true - anyOf: - - $ref: "#/components/schemas/TimeDelta" - - $ref: "#/components/schemas/RelativeDelta" - - $ref: "#/components/schemas/CronExpression" - discriminator: - propertyName: __type - TimeDelta: description: Time delta type: object diff --git a/airflow/api_connexion/schemas/common_schema.py b/airflow/api_connexion/schemas/common_schema.py index e91c7b23d8855..569a745a62f52 100644 --- a/airflow/api_connexion/schemas/common_schema.py +++ b/airflow/api_connexion/schemas/common_schema.py @@ -24,7 +24,6 @@ import marshmallow from dateutil import relativedelta from marshmallow import Schema, fields, validate -from marshmallow_oneofschema import OneOfSchema from airflow.models.mappedoperator import MappedOperator from airflow.serialization.serialized_objects import SerializedBaseOperator @@ -90,42 +89,6 @@ def make_cron_expression(self, data, **kwargs): return CronExpression(data["value"]) -class ScheduleIntervalSchema(OneOfSchema): - """ - Schedule interval. 
- - It supports the following types: - - * TimeDelta - * RelativeDelta - * CronExpression - """ - - type_field = "__type" - type_schemas = { - "TimeDelta": TimeDeltaSchema, - "RelativeDelta": RelativeDeltaSchema, - "CronExpression": CronExpressionSchema, - } - - def _dump(self, obj, update_fields=True, **kwargs): - if isinstance(obj, str): - obj = CronExpression(obj) - - return super()._dump(obj, update_fields=update_fields, **kwargs) - - def get_obj_type(self, obj): - """Select schema based on object type.""" - if isinstance(obj, datetime.timedelta): - return "TimeDelta" - elif isinstance(obj, relativedelta.relativedelta): - return "RelativeDelta" - elif isinstance(obj, CronExpression): - return "CronExpression" - else: - raise TypeError(f"Unknown object type: {obj.__class__.__name__}") - - class ColorField(fields.String): """Schema for color property.""" diff --git a/airflow/api_connexion/schemas/dag_schema.py b/airflow/api_connexion/schemas/dag_schema.py index 32eca2f0b8903..61046a0f4ab8e 100644 --- a/airflow/api_connexion/schemas/dag_schema.py +++ b/airflow/api_connexion/schemas/dag_schema.py @@ -22,7 +22,7 @@ from marshmallow import Schema, fields from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field -from airflow.api_connexion.schemas.common_schema import ScheduleIntervalSchema, TimeDeltaSchema, TimezoneField +from airflow.api_connexion.schemas.common_schema import TimeDeltaSchema, TimezoneField from airflow.configuration import conf from airflow.models.dag import DagModel, DagTag @@ -63,7 +63,7 @@ class Meta: file_token = fields.Method("get_token", dump_only=True) owners = fields.Method("get_owners", dump_only=True) description = auto_field(dump_only=True) - schedule_interval = fields.Nested(ScheduleIntervalSchema) + timetable_summary = auto_field(dump_only=True) timetable_description = auto_field(dump_only=True) tags = fields.List(fields.Nested(DagTagSchema), dump_only=True) max_active_tasks = auto_field(dump_only=True) diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 2f300ebef2144..d89be8b589f73 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -345,7 +345,7 @@ def _get_dagbag_dag_details(dag: DAG) -> dict: "file_token": None, "owners": dag.owner, "description": dag.description, - "schedule_interval": dag.schedule_interval, + "timetable_summary": dag.timetable.summary, "timetable_description": dag.timetable.description, "tags": dag.tags, "max_active_tasks": dag.max_active_tasks, diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 70859aebcd594..a458105d28798 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2521,7 +2521,7 @@ scheduler: allow_trigger_in_future: description: | Allow externally triggered DagRuns for Execution Dates in the future - Only has effect if schedule_interval is set to None in DAG + Only has effect if schedule is set to None in DAG version_added: 1.10.8 type: boolean example: ~ diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 3831d909fc272..55dd02fdae313 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -199,10 +199,6 @@ def __str__(self) -> str: return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}" -class AirflowDagInconsistent(AirflowException): - """Raise when a DAG has inconsistent attributes.""" - - class AirflowClusterPolicyViolation(AirflowException): """Raise when there is a violation of a Cluster Policy 
in DAG definition.""" diff --git a/airflow/migrations/versions/0004_3_0_0_rename_schedule_interval_to_timetable_.py b/airflow/migrations/versions/0004_3_0_0_rename_schedule_interval_to_timetable_.py new file mode 100644 index 0000000000000..b4434a65f3f48 --- /dev/null +++ b/airflow/migrations/versions/0004_3_0_0_rename_schedule_interval_to_timetable_.py @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Rename DagModel schedule_interval to timetable_summary. + +Revision ID: 0bfc26bc256e +Revises: d0f1c55954fa +Create Date: 2024-08-15 06:24:14.363316 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "0bfc26bc256e" +down_revision = "d0f1c55954fa" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + + +def upgrade(): + """Rename DagModel schedule_interval to timetable_summary.""" + with op.batch_alter_table("dag", schema=None) as batch_op: + batch_op.alter_column( + "schedule_interval", + new_column_name="timetable_summary", + type_=sa.Text, + nullable=True, + ) + + +def downgrade(): + """Rename timetable_summary back to schedule_interval.""" + with op.batch_alter_table("dag", schema=None) as batch_op: + batch_op.alter_column( + "timetable_summary", + new_column_name="schedule_interval", + type_=sa.Text, + nullable=True, + ) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index ea100cd4e2abf..99ea294e1ec55 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -50,7 +50,6 @@ import attr import pendulum -from dateutil.relativedelta import relativedelta from sqlalchemy import select from sqlalchemy.orm.exc import NoResultFound @@ -120,8 +119,6 @@ from airflow.utils.task_group import TaskGroup from airflow.utils.types import ArgNotSet -ScheduleInterval = Union[str, timedelta, relativedelta] - TaskPreExecuteHook = Callable[[Context], None] TaskPostExecuteHook = Callable[[Context, Any], None] @@ -612,10 +609,10 @@ class derived from this one results in the creation of a task object, :param start_date: The ``start_date`` for the task, determines the ``execution_date`` for the first task instance. The best practice is to have the start_date rounded - to your DAG's ``schedule_interval``. Daily jobs have their start_date + to your DAG's schedule. Daily jobs have their start_date some day at 00:00:00, hourly jobs have their start_date at 00:00 of a specific hour. Note that Airflow simply looks at the latest - ``execution_date`` and adds the ``schedule_interval`` to determine + ``execution_date`` and adds the schedule to determine the next ``execution_date``. It is also very important to note that different tasks' dependencies need to line up in time. 
If task A depends on task B and their diff --git a/airflow/models/dag.py b/airflow/models/dag.py index bdc508951a19c..4d8a73199992c 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -86,7 +86,6 @@ from airflow.datasets import BaseDataset, Dataset, DatasetAlias, DatasetAll from airflow.datasets.manager import dataset_manager from airflow.exceptions import ( - AirflowDagInconsistent, AirflowException, DuplicateTaskIdFound, FailStopDagInvalidTriggerRule, @@ -134,19 +133,13 @@ from airflow.utils import timezone from airflow.utils.dag_cycle_tester import check_cycle from airflow.utils.decorators import fixup_decorator_warning_stack -from airflow.utils.helpers import at_most_one, exactly_one, validate_instance_args, validate_key +from airflow.utils.helpers import exactly_one, validate_instance_args, validate_key from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session -from airflow.utils.sqlalchemy import ( - Interval, - UtcDateTime, - lock_rows, - tuple_in_condition, - with_row_locks, -) +from airflow.utils.sqlalchemy import UtcDateTime, lock_rows, tuple_in_condition, with_row_locks from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.trigger_rule import TriggerRule -from airflow.utils.types import NOTSET, ArgNotSet, DagRunType, EdgeInfoType +from airflow.utils.types import NOTSET, DagRunType, EdgeInfoType if TYPE_CHECKING: from types import ModuleType @@ -174,20 +167,15 @@ DagStateChangeCallback = Callable[[Context], None] ScheduleInterval = Union[None, str, timedelta, relativedelta] -# FIXME: Ideally this should be Union[Literal[NOTSET], ScheduleInterval], -# but Mypy cannot handle that right now. Track progress of PEP 661 for progress. -# See also: https://discuss.python.org/t/9126/7 -ScheduleIntervalArg = Union[ArgNotSet, ScheduleInterval] ScheduleArg = Union[ - ArgNotSet, ScheduleInterval, Timetable, BaseDataset, Collection[Union["Dataset", "DatasetAlias"]] + ScheduleInterval, + Timetable, + BaseDataset, + Collection[Union["Dataset", "DatasetAlias"]], ] SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None] -# Backward compatibility: If neither schedule_interval nor timetable is -# *provided by the user*, default to a one-day interval. 
-DEFAULT_SCHEDULE_INTERVAL = timedelta(days=1) - class InconsistentDataInterval(AirflowException): """ @@ -228,10 +216,8 @@ def _get_model_data_interval( return DataInterval(start, end) -def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone | FixedTimezone) -> Timetable: - """Create a Timetable instance from a ``schedule_interval`` argument.""" - if interval is NOTSET: - return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL) +def create_timetable(interval: ScheduleInterval, timezone: Timezone | FixedTimezone) -> Timetable: + """Create a Timetable instance from a plain ``schedule`` value.""" if interval is None: return NullTimetable() if interval == "@once": @@ -245,7 +231,7 @@ def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone | FixedTi return CronDataIntervalTimetable(interval, timezone) else: return CronTriggerTimetable(interval, timezone=timezone) - raise ValueError(f"{interval!r} is not a valid schedule_interval.") + raise ValueError(f"{interval!r} is not a valid schedule.") def get_last_dagrun(dag_id, session, include_externally_triggered=False): @@ -398,21 +384,22 @@ class DAG(LoggingMixin): The *schedule* argument to specify either time-based scheduling logic (timetable), or dataset-driven triggers. - .. deprecated:: 2.4 - The arguments *schedule_interval* and *timetable*. Their functionalities - are merged into the new *schedule* argument. + .. versionchanged:: 3.0 + The default value of *schedule* has been changed to *None* (no schedule). + The previous default was ``timedelta(days=1)``. :param dag_id: The id of the DAG; must consist exclusively of alphanumeric characters, dashes, dots and underscores (all ASCII) :param description: The description for the DAG to e.g. be shown on the webserver - :param schedule: Defines the rules according to which DAG runs are scheduled. Can - accept cron string, timedelta object, Timetable, or list of Dataset objects. - If this is not provided, the DAG will be set to the default - schedule ``timedelta(days=1)``. See also :doc:`/howto/timetable`. + :param schedule: If provided, this defines the rules according to which DAG + runs are scheduled. Possible values include a cron expression string, + timedelta object, Timetable, or list of Dataset objects. + See also :doc:`/howto/timetable`. :param start_date: The timestamp from which the scheduler will - attempt to backfill + attempt to backfill. If this is not provided, backfilling must be done + manually with an explicit time range. :param end_date: A date beyond which your DAG won't run, leave to None - for open-ended scheduling + for open-ended scheduling. :param template_searchpath: This list of folders (non-relative) defines where jinja will look for your templates. Order matters. 
Note that jinja/airflow includes the path of your DAG file by @@ -500,7 +487,6 @@ class DAG(LoggingMixin): "task_ids", "start_date", "end_date", - "schedule_interval", "fileloc", "template_searchpath", "last_loaded", @@ -522,9 +508,7 @@ def __init__( self, dag_id: str, description: str | None = None, - schedule: ScheduleArg = NOTSET, - schedule_interval: ScheduleIntervalArg = NOTSET, - timetable: Timetable | None = None, + schedule: ScheduleArg = None, start_date: datetime | None = None, end_date: datetime | None = None, full_filepath: str | None = None, @@ -636,63 +620,20 @@ def __init__( if "end_date" in self.default_args: self.default_args["end_date"] = timezone.convert_to_utc(self.default_args["end_date"]) - # sort out DAG's scheduling behavior - scheduling_args = [schedule_interval, timetable, schedule] - - has_scheduling_args = any(a is not NOTSET and bool(a) for a in scheduling_args) - has_empty_start_date = not ("start_date" in self.default_args or self.start_date) - - if has_scheduling_args and has_empty_start_date: - raise ValueError("DAG is missing the start_date parameter") - - if not at_most_one(*scheduling_args): - raise ValueError("At most one allowed for args 'schedule_interval', 'timetable', and 'schedule'.") - if schedule_interval is not NOTSET: - warnings.warn( - "Param `schedule_interval` is deprecated and will be removed in a future release. " - "Please use `schedule` instead. ", - RemovedInAirflow3Warning, - stacklevel=2, - ) - if timetable is not None: - warnings.warn( - "Param `timetable` is deprecated and will be removed in a future release. " - "Please use `schedule` instead. ", - RemovedInAirflow3Warning, - stacklevel=2, - ) - - if timetable is not None: - schedule = timetable - elif schedule_interval is not NOTSET: - schedule = schedule_interval - - # Kept for compatibility. Do not use in new code. - self.schedule_interval: ScheduleInterval - if isinstance(schedule, Timetable): self.timetable = schedule - self.schedule_interval = schedule.summary elif isinstance(schedule, BaseDataset): self.timetable = DatasetTriggeredTimetable(schedule) - self.schedule_interval = self.timetable.summary elif isinstance(schedule, Collection) and not isinstance(schedule, str): if not all(isinstance(x, (Dataset, DatasetAlias)) for x in schedule): raise ValueError("All elements in 'schedule' should be datasets or dataset aliases") self.timetable = DatasetTriggeredTimetable(DatasetAll(*schedule)) - self.schedule_interval = self.timetable.summary - elif isinstance(schedule, ArgNotSet): - warnings.warn( - "Creating a DAG with an implicit schedule is deprecated, and will stop working " - "in a future release. Set `schedule=datetime.timedelta(days=1)` explicitly.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - self.timetable = create_timetable(schedule, self.timezone) - self.schedule_interval = DEFAULT_SCHEDULE_INTERVAL else: self.timetable = create_timetable(schedule, self.timezone) - self.schedule_interval = schedule + + requires_automatic_backfilling = self.timetable.can_be_scheduled and catchup + if requires_automatic_backfilling and not ("start_date" in self.default_args or self.start_date): + raise ValueError("start_date is required when catchup=True") if isinstance(template_searchpath, str): template_searchpath = [template_searchpath] @@ -796,46 +737,12 @@ def get_doc_md(self, doc_md: str | None) -> str | None: return doc_md - def _check_schedule_interval_matches_timetable(self) -> bool: - """ - Check ``schedule_interval`` and ``timetable`` match. 
- - This is done as a part of the DAG validation done before it's bagged, to - guard against the DAG's ``timetable`` (or ``schedule_interval``) from - being changed after it's created, e.g. - - .. code-block:: python - - dag1 = DAG("d1", timetable=MyTimetable()) - dag1.schedule_interval = "@once" - - dag2 = DAG("d2", schedule="@once") - dag2.timetable = MyTimetable() - - Validation is done by creating a timetable and check its summary matches - ``schedule_interval``. The logic is not bullet-proof, especially if a - custom timetable does not provide a useful ``summary``. But this is the - best we can do. - """ - if self.schedule_interval == self.timetable.summary: - return True - try: - timetable = create_timetable(self.schedule_interval, self.timezone) - except ValueError: - return False - return timetable.summary == self.timetable.summary - def validate(self): """ Validate the DAG has a coherent setup. This is called by the DAG bag before bagging the DAG. """ - if not self._check_schedule_interval_matches_timetable(): - raise AirflowDagInconsistent( - f"inconsistent schedule: timetable {self.timetable.summary!r} " - f"does not match schedule_interval {self.schedule_interval!r}", - ) self.validate_executor_field() self.validate_schedule_and_params() self.timetable.validate() @@ -2942,7 +2849,7 @@ def bulk_write_to_db( t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None for t in dag.tasks ) - orm_dag.schedule_interval = dag.schedule_interval + orm_dag.timetable_summary = dag.timetable.summary orm_dag.timetable_description = dag.timetable.description orm_dag.dataset_expression = dag.timetable.dataset_condition.as_expression() @@ -3448,9 +3355,9 @@ class DagModel(Base): description = Column(Text) # Default view of the DAG inside the webserver default_view = Column(String(25)) - # Schedule interval - schedule_interval = Column(Interval) - # Timetable/Schedule Interval description + # Timetable summary + timetable_summary = Column(Text, nullable=True) + # Timetable description timetable_description = Column(String(1000), nullable=True) # Dataset expression based on dataset triggers dataset_expression = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) @@ -3795,7 +3702,7 @@ def calculate_dagrun_date_fields( @provide_session def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int | str] | None: - if self.schedule_interval != "Dataset": + if self.dataset_expression is None: return None return get_dataset_triggered_next_run_info([self.dag_id], session=session)[self.dag_id] @@ -3805,9 +3712,7 @@ def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[st def dag( dag_id: str = "", description: str | None = None, - schedule: ScheduleArg = NOTSET, - schedule_interval: ScheduleIntervalArg = NOTSET, - timetable: Timetable | None = None, + schedule: ScheduleArg = None, start_date: datetime | None = None, end_date: datetime | None = None, full_filepath: str | None = None, @@ -3864,8 +3769,6 @@ def factory(*args, **kwargs): with DAG( dag_id or f.__name__, description=description, - schedule_interval=schedule_interval, - timetable=timetable, start_date=start_date, end_date=end_date, full_filepath=full_filepath, diff --git a/airflow/serialization/pydantic/dag.py b/airflow/serialization/pydantic/dag.py index a1fea6384aade..b916a0e580efd 100644 --- a/airflow/serialization/pydantic/dag.py +++ b/airflow/serialization/pydantic/dag.py @@ -17,10 +17,9 @@ from __future__ import annotations import pathlib -from datetime 
import datetime, timedelta +from datetime import datetime from typing import Any, List, Optional -from dateutil import relativedelta from typing_extensions import Annotated from airflow import DAG, settings @@ -32,37 +31,6 @@ PlainValidator, ValidationInfo, ) -from airflow.utils.sqlalchemy import Interval - - -def serialize_interval(value: Interval) -> Interval: - interval = Interval() - return interval.process_bind_param(value, None) - - -def validate_interval(value: Interval | Any, _info: ValidationInfo) -> Any: - if ( - isinstance(value, Interval) - or isinstance(value, timedelta) - or isinstance(value, relativedelta.relativedelta) - ): - return value - interval = Interval() - try: - return interval.process_result_value(value, None) - except ValueError as e: - # Interval may be provided in string format (cron), - # so it must be returned as valid value. - if isinstance(value, str): - return value - raise e - - -PydanticInterval = Annotated[ - Interval, - PlainValidator(validate_interval), - PlainSerializer(serialize_interval, return_type=Interval), -] def serialize_operator(x: DAG) -> dict: @@ -121,7 +89,7 @@ class DagModelPydantic(BaseModelPydantic): owners: Optional[str] description: Optional[str] default_view: Optional[str] - schedule_interval: Optional[PydanticInterval] + timetable_summary: Optional[str] timetable_description: Optional[str] tags: List[DagTagPydantic] # noqa: UP006 dag_owner_links: List[DagOwnerAttributesPydantic] # noqa: UP006 diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index d76bfcb1a40bd..63cdf67b7d702 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -140,14 +140,6 @@ "_dag_id": { "type": "string" }, "tasks": { "$ref": "#/definitions/tasks" }, "timezone": { "$ref": "#/definitions/timezone" }, - "schedule_interval": { - "anyOf": [ - { "type": "null" }, - { "type": "string" }, - { "$ref": "#/definitions/typed_timedelta" }, - { "$ref": "#/definitions/typed_relativedelta" } - ] - }, "owner_links": { "type": "object" }, "timetable": { "type": "object", diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 9adb9f7334ba0..54542a79c6a9c 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -49,7 +49,7 @@ from airflow.models import Trigger from airflow.models.baseoperator import BaseOperator from airflow.models.connection import Connection -from airflow.models.dag import DAG, DagModel, create_timetable +from airflow.models.dag import DAG, DagModel from airflow.models.dagrun import DagRun from airflow.models.expandinput import EXPAND_INPUT_EMPTY, create_expand_input, get_map_type_key from airflow.models.mappedoperator import MappedOperator @@ -1579,7 +1579,7 @@ class SerializedDAG(DAG, BaseSerialization): not pickle-able. SerializedDAG works for all DAGs. """ - _decorated_fields = {"schedule_interval", "default_args", "_access_control"} + _decorated_fields = {"default_args", "_access_control"} @staticmethod def __get_constructor_defaults(): @@ -1606,16 +1606,7 @@ def serialize_dag(cls, dag: DAG) -> dict: """Serialize a DAG into a JSON object.""" try: serialized_dag = cls.serialize_to_json(dag, cls._decorated_fields) - serialized_dag["_processor_dags_folder"] = DAGS_FOLDER - - # If schedule_interval is backed by timetable, serialize only - # timetable; vice versa for a timetable backed by schedule_interval. 
- if dag.timetable.summary == dag.schedule_interval: - del serialized_dag["schedule_interval"] - else: - del serialized_dag["timetable"] - serialized_dag["tasks"] = [cls.serialize(task) for _, task in dag.task_dict.items()] dag_deps = [ @@ -1682,14 +1673,6 @@ def deserialize_dag(cls, encoded_dag: dict[str, Any]) -> SerializedDAG: setattr(dag, k, v) - # A DAG is always serialized with only one of schedule_interval and - # timetable. This back-populates the other to ensure the two attributes - # line up correctly on the DAG instance. - if "timetable" in encoded_dag: - dag.schedule_interval = dag.timetable.summary - else: - dag.timetable = create_timetable(dag.schedule_interval, dag.timezone) - # Set _task_group if "_task_group" in encoded_dag: dag._task_group = TaskGroupSerialization.deserialize_task_group( diff --git a/airflow/ti_deps/deps/runnable_exec_date_dep.py b/airflow/ti_deps/deps/runnable_exec_date_dep.py index b7a23e91e27e6..da9d88a2ec061 100644 --- a/airflow/ti_deps/deps/runnable_exec_date_dep.py +++ b/airflow/ti_deps/deps/runnable_exec_date_dep.py @@ -33,7 +33,7 @@ def _get_dep_statuses(self, ti, session, dep_context): cur_date = timezone.utcnow() # don't consider runs that are executed in the future unless - # specified by config and schedule_interval is None + # specified by config and schedule is None logical_date = ti.get_dagrun(session).execution_date if logical_date > cur_date and not ti.task.dag.allow_future_exec_dates: yield self._failing_status( diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py index 9e173656f1141..ce701794a4c63 100644 --- a/airflow/timetables/base.py +++ b/airflow/timetables/base.py @@ -17,7 +17,6 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any, Iterator, NamedTuple, Sequence -from warnings import warn from airflow.datasets import BaseDataset from airflow.typing_compat import Protocol, runtime_checkable @@ -163,25 +162,14 @@ class Timetable(Protocol): like ``schedule=None`` and ``"@once"`` set it to *False*. """ - _can_be_scheduled: bool = True - - @property - def can_be_scheduled(self): - """ - Whether this timetable can actually schedule runs in an automated manner. + can_be_scheduled: bool = True + """ + Whether this timetable can actually schedule runs in an automated manner. - This defaults to and should generally be *True* (including non periodic - execution types like *@once* and data triggered tables), but - ``NullTimetable`` sets this to *False*. - """ - if hasattr(self, "can_run"): - warn( - 'can_run class variable is deprecated. Use "can_be_scheduled" instead.', - DeprecationWarning, - stacklevel=2, - ) - return self.can_run - return self._can_be_scheduled + This defaults to and should generally be *True* (including non periodic + execution types like *@once* and data triggered tables), but + ``NullTimetable`` sets this to *False*. + """ run_ordering: Sequence[str] = ("data_interval_end", "execution_date") """How runs triggered from this timetable should be ordered in UI. 
diff --git a/airflow/timetables/datasets.py b/airflow/timetables/datasets.py index 4c27f39b266b3..05db0d66cc2df 100644 --- a/airflow/timetables/datasets.py +++ b/airflow/timetables/datasets.py @@ -50,7 +50,7 @@ def __init__( self.description = f"Triggered by datasets or {timetable.description}" self.periodic = timetable.periodic - self._can_be_scheduled = timetable._can_be_scheduled + self.can_be_scheduled = timetable.can_be_scheduled self.active_runs_limit = timetable.active_runs_limit @classmethod diff --git a/airflow/utils/db.py b/airflow/utils/db.py index b963eef99a230..5e15e59d3a3ef 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -96,7 +96,7 @@ class MappedClassProtocol(Protocol): _REVISION_HEADS_MAP = { "2.10.0": "22ed7efa9da2", - "3.0.0": "d0f1c55954fa", + "3.0.0": "0bfc26bc256e", } diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py index b73757c9875aa..e7e44ef254725 100644 --- a/airflow/utils/sqlalchemy.py +++ b/airflow/utils/sqlalchemy.py @@ -20,12 +20,10 @@ import contextlib import copy import datetime -import json import logging from importlib import metadata from typing import TYPE_CHECKING, Any, Generator, Iterable, overload -from dateutil import relativedelta from packaging import version from sqlalchemy import TIMESTAMP, PickleType, event, nullsfirst, tuple_ from sqlalchemy.dialects import mysql @@ -293,50 +291,6 @@ def compare_values(self, x, y): return False -class Interval(TypeDecorator): - """Base class representing a time interval.""" - - impl = Text - - cache_ok = True - - attr_keys = { - datetime.timedelta: ("days", "seconds", "microseconds"), - relativedelta.relativedelta: ( - "years", - "months", - "days", - "leapdays", - "hours", - "minutes", - "seconds", - "microseconds", - "year", - "month", - "day", - "hour", - "minute", - "second", - "microsecond", - ), - } - - def process_bind_param(self, value, dialect): - if isinstance(value, tuple(self.attr_keys)): - attrs = {key: getattr(value, key) for key in self.attr_keys[type(value)]} - return json.dumps({"type": type(value).__name__, "attrs": attrs}) - return json.dumps(value) - - def process_result_value(self, value, dialect): - if not value: - return value - data = json.loads(value) - if isinstance(data, dict): - type_map = {key.__name__: key for key in self.attr_keys} - return type_map[data["type"]](**data["attrs"]) - return data - - def nulls_first(col, session: Session) -> dict[str, Any]: """ Specify *NULLS FIRST* to the column ordering. diff --git a/airflow/www/static/js/dag/details/dag/Dag.tsx b/airflow/www/static/js/dag/details/dag/Dag.tsx index 7cfd36f516418..9643064d2a3b1 100644 --- a/airflow/www/static/js/dag/details/dag/Dag.tsx +++ b/airflow/www/static/js/dag/details/dag/Dag.tsx @@ -70,7 +70,7 @@ const Dag = () => { const dagDataExcludeFields = [ "defaultView", "fileToken", - "scheduleInterval", + "timetableSummary", "tags", "owners", "params", @@ -298,20 +298,9 @@ const Dag = () => { - Schedule interval + Timetable - {dagDetailsData.scheduleInterval?.type === - "CronExpression" ? 
( - {dagDetailsData.scheduleInterval?.value} - ) : ( - // for TimeDelta and RelativeDelta - - )} + {dagDetailsData.timetableSummary || ""} diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 1897fd90ce3b3..15de3545b6bb2 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1088,9 +1088,14 @@ export interface components { owners?: string[]; /** @description User-provided DAG description, which can consist of several sentences or paragraphs that describe DAG contents. */ description?: string | null; - schedule_interval?: components["schemas"]["ScheduleInterval"]; /** - * @description Timetable/Schedule Interval description. + * @description Timetable summary. + * + * *New in version 3.0.0* + */ + timetable_summary?: string | null; + /** + * @description Timetable description. * * *New in version 2.3.0* */ @@ -2315,15 +2320,6 @@ export interface components { */ order_by?: string; }; - /** - * @description Schedule interval. Defines how often DAG runs, this object gets added to your latest task instance's - * execution_date to figure out the next schedule. - */ - ScheduleInterval: - | (Partial & - Partial & - Partial) - | null; /** @description Time delta */ TimeDelta: { __type: string; @@ -5686,9 +5682,6 @@ export type ListDagRunsForm = CamelCasedPropertiesDeep< export type ListTaskInstanceForm = CamelCasedPropertiesDeep< components["schemas"]["ListTaskInstanceForm"] >; -export type ScheduleInterval = CamelCasedPropertiesDeep< - components["schemas"]["ScheduleInterval"] ->; export type TimeDelta = CamelCasedPropertiesDeep< components["schemas"]["TimeDelta"] >; diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 973ca812e3fb2..1fc711d993ebd 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -138,17 +138,17 @@

- Schedule: {{ dag_model is defined and dag_model and dag_model.schedule_interval }} + Schedule: {{ dag_model is defined and dag_model and dag_model.timetable_summary }} {% if dag_model is defined and dag_model and dag_model.timetable_description %} {% endif %} - {% if dag_model is defined and dag_model.next_dagrun is defined and dag_model.schedule_interval != 'Dataset' %} + {% if dag_model is defined and dag_model.next_dagrun is defined and dag_model.dataset_expression == None %}

Next Run ID:

{% endif %} - {% if dag_model is defined and dag_model.schedule_interval is defined and dag_model.schedule_interval == 'Dataset' %} + {% if dag_model is defined and dag_model.dataset_expression is defined and dag_model.dataset_expression != None %} {%- with ds_info = dag_model.get_dataset_triggered_next_run_info() -%} {{ page_title }}

- {{ dag.schedule_interval }} + {{ dag.timetable_summary }} {% if dag is defined and dag.timetable_description %}
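
Not part of the patch: a minimal sketch of the resulting user-facing behaviour, assuming an environment with this branch installed and default configuration. The expected summary strings assume the existing NullTimetable and DeltaDataIntervalTimetable implementations are unchanged::

    import datetime

    from airflow.models.dag import DAG

    # With this change, omitting 'schedule' means no schedule at all: the DAG
    # gets a NullTimetable instead of the previous default daily interval.
    manual_only = DAG(dag_id="manual_only")
    print(manual_only.timetable.summary)  # "None" (NullTimetable)

    # Plain schedule values still build the same timetables as before.
    daily = DAG(
        dag_id="daily",
        schedule=datetime.timedelta(days=1),
        # start_date is still required for a schedulable DAG when catchup is on.
        start_date=datetime.datetime(2024, 8, 1, tzinfo=datetime.timezone.utc),
    )
    # 'timetable' is now the single source of truth; its summary string is
    # what the renamed DagModel.timetable_summary column stores.
    print(daily.timetable.summary)  # "1 day, 0:00:00" (DeltaDataIntervalTimetable)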