From 13212c37ca179b690a068a8e62b012a6a8d82e86 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 13 Aug 2024 10:20:26 -0700 Subject: [PATCH 1/5] Remove deprecated and unused methods / properties on DAG (cherry picked from commit 6bd4f83062151d427dab764bca123ba396eda6c0) --- airflow/models/dag.py | 239 +------------------------------ tests/jobs/test_scheduler_job.py | 2 +- 2 files changed, 5 insertions(+), 236 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 7b762aa18dbf8..1d0a9066afe31 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -131,7 +131,6 @@ from airflow.timetables.trigger import CronTriggerTimetable from airflow.utils import timezone from airflow.utils.dag_cycle_tester import check_cycle -from airflow.utils.dates import cron_presets, date_range as utils_date_range from airflow.utils.decorators import fixup_decorator_warning_stack from airflow.utils.helpers import at_most_one, exactly_one, validate_instance_args, validate_key from airflow.utils.log.logging_mixin import LoggingMixin @@ -944,60 +943,6 @@ def update_old_perm(permission: str): return updated_access_control - def date_range( - self, - start_date: pendulum.DateTime, - num: int | None = None, - end_date: datetime | None = None, - ) -> list[datetime]: - message = "`DAG.date_range()` is deprecated." - if num is not None: - warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2) - with warnings.catch_warnings(): - warnings.simplefilter("ignore", RemovedInAirflow3Warning) - return utils_date_range( - start_date=start_date, num=num, delta=self.normalized_schedule_interval - ) - message += " Please use `DAG.iter_dagrun_infos_between(..., align=False)` instead." 
- warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2) - if end_date is None: - coerced_end_date = timezone.utcnow() - else: - coerced_end_date = end_date - it = self.iter_dagrun_infos_between(start_date, pendulum.instance(coerced_end_date), align=False) - return [info.logical_date for info in it] - - def is_fixed_time_schedule(self): - """ - Figures out if the schedule has a fixed time (e.g. 3 AM every day). - - Detection is done by "peeking" the next two cron trigger time; if the - two times have the same minute and hour value, the schedule is fixed, - and we *don't* need to perform the DST fix. - - This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00). - - Do not try to understand what this actually means. It is old logic that - should not be used anywhere. - """ - warnings.warn( - "`DAG.is_fixed_time_schedule()` is deprecated.", - category=RemovedInAirflow3Warning, - stacklevel=2, - ) - - from airflow.timetables._cron import CronMixin - - if not isinstance(self.timetable, CronMixin): - return True - - from croniter import croniter - - cron = croniter(self.timetable._expression) - next_a = cron.get_next(datetime) - next_b = cron.get_next(datetime) - return next_b.minute == next_a.minute and next_b.hour == next_a.hour - def following_schedule(self, dttm): """ Calculate the following schedule for this dag in UTC. @@ -1162,21 +1107,6 @@ def next_dagrun_info( info = None return info - def next_dagrun_after_date(self, date_last_automated_dagrun: pendulum.DateTime | None): - warnings.warn( - "`DAG.next_dagrun_after_date()` is deprecated. 
Please use `DAG.next_dagrun_info()` instead.", - category=RemovedInAirflow3Warning, - stacklevel=2, - ) - if date_last_automated_dagrun is None: - data_interval = None - else: - data_interval = self.infer_automated_data_interval(date_last_automated_dagrun) - info = self.next_dagrun_info(data_interval) - if info is None: - return None - return info.run_after - @functools.cached_property def _time_restriction(self) -> TimeRestriction: start_dates = [t.start_date for t in self.tasks if t.start_date] @@ -1267,46 +1197,6 @@ def iter_dagrun_infos_between( ) break - def get_run_dates(self, start_date, end_date=None) -> list: - """ - Return a list of dates between the interval received as parameter using this dag's schedule interval. - - Returned dates can be used for execution dates. - - :param start_date: The start date of the interval. - :param end_date: The end date of the interval. Defaults to ``timezone.utcnow()``. - :return: A list of dates within the interval following the dag's schedule. - """ - warnings.warn( - "`DAG.get_run_dates()` is deprecated. 
Please use `DAG.iter_dagrun_infos_between()` instead.", - category=RemovedInAirflow3Warning, - stacklevel=2, - ) - earliest = timezone.coerce_datetime(start_date) - if end_date is None: - latest = pendulum.now(timezone.utc) - else: - latest = timezone.coerce_datetime(end_date) - return [info.logical_date for info in self.iter_dagrun_infos_between(earliest, latest)] - - def normalize_schedule(self, dttm): - warnings.warn( - "`DAG.normalize_schedule()` is deprecated.", - category=RemovedInAirflow3Warning, - stacklevel=2, - ) - with warnings.catch_warnings(): - warnings.simplefilter("ignore", RemovedInAirflow3Warning) - following = self.following_schedule(dttm) - if not following: # in case of @once - return dttm - with warnings.catch_warnings(): - warnings.simplefilter("ignore", RemovedInAirflow3Warning) - previous_of_following = self.previous_schedule(following) - if previous_of_following != dttm: - return following - return dttm - @provide_session def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False): return get_last_dagrun( @@ -1330,43 +1220,6 @@ def dag_id(self) -> str: def dag_id(self, value: str) -> None: self._dag_id = value - @property - def full_filepath(self) -> str: - """ - Full file path to the DAG. - - :meta private: - """ - warnings.warn( - "DAG.full_filepath is deprecated in favour of fileloc", - RemovedInAirflow3Warning, - stacklevel=2, - ) - return self.fileloc - - @full_filepath.setter - def full_filepath(self, value) -> None: - warnings.warn( - "DAG.full_filepath is deprecated in favour of fileloc", - RemovedInAirflow3Warning, - stacklevel=2, - ) - self.fileloc = value - - @property - def concurrency(self) -> int: - # TODO: Remove in Airflow 3.0 - warnings.warn( - "The 'DAG.concurrency' attribute is deprecated. 
Please use 'DAG.max_active_tasks'.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - return self._max_active_tasks - - @concurrency.setter - def concurrency(self, value: int): - self._max_active_tasks = value - @property def max_active_tasks(self) -> int: return self._max_active_tasks @@ -1438,20 +1291,6 @@ def tasks_upstream_of_teardowns(self) -> list[Operator]: def task_group(self) -> TaskGroup: return self._task_group - @property - def filepath(self) -> str: - """ - Relative file path to the DAG. - - :meta private: - """ - warnings.warn( - "filepath is deprecated, use relative_fileloc instead", - RemovedInAirflow3Warning, - stacklevel=2, - ) - return str(self.relative_fileloc) - @property def relative_fileloc(self) -> pathlib.Path: """File location of the importable dag 'file' relative to the configured DAGs folder.""" @@ -1496,16 +1335,6 @@ def get_concurrency_reached(self, session=NEW_SESSION) -> bool: ) return total_tasks >= self.max_active_tasks - @property - def concurrency_reached(self): - """Use `airflow.models.DAG.get_concurrency_reached`, this attribute is deprecated.""" - warnings.warn( - "This attribute is deprecated. 
Please use `airflow.models.DAG.get_concurrency_reached` method.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - return self.get_concurrency_reached() - @provide_session def get_is_active(self, session=NEW_SESSION) -> None: """Return a boolean indicating whether this DAG is active.""" @@ -1526,21 +1355,6 @@ def is_paused(self): ) return self.get_is_paused() - @property - def normalized_schedule_interval(self) -> ScheduleInterval: - warnings.warn( - "DAG.normalized_schedule_interval() is deprecated.", - category=RemovedInAirflow3Warning, - stacklevel=2, - ) - if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets: - _schedule_interval: ScheduleInterval = cron_presets.get(self.schedule_interval) - elif self.schedule_interval == "@once": - _schedule_interval = None - else: - _schedule_interval = self.schedule_interval - return _schedule_interval - @staticmethod @internal_api_call @provide_session @@ -1724,16 +1538,6 @@ def get_latest_execution_date(self, session: Session = NEW_SESSION) -> pendulum. """Return the latest date for which at least one dag run exists.""" return session.scalar(select(func.max(DagRun.execution_date)).where(DagRun.dag_id == self.dag_id)) - @property - def latest_execution_date(self): - """Use `airflow.models.DAG.get_latest_execution_date`, this attribute is deprecated.""" - warnings.warn( - "This attribute is deprecated. Please use `airflow.models.DAG.get_latest_execution_date`.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - return self.get_latest_execution_date() - def resolve_template_files(self): for t in self.tasks: t.resolve_template_files() @@ -1845,7 +1649,8 @@ def _get_task_instances( exclude_task_ids: Collection[str | tuple[str, int]] | None, session: Session, dag_bag: DagBag | None = ..., - ) -> Iterable[TaskInstance]: ... # pragma: no cover + ) -> Iterable[TaskInstance]: + ... 
# pragma: no cover @overload def _get_task_instances( @@ -1864,7 +1669,8 @@ def _get_task_instances( recursion_depth: int = ..., max_recursion_depth: int = ..., visited_external_tis: set[TaskInstanceKey] = ..., - ) -> set[TaskInstanceKey]: ... # pragma: no cover + ) -> set[TaskInstanceKey]: + ... # pragma: no cover def _get_task_instances( self, @@ -2264,28 +2070,6 @@ def nested_topo(group): return tuple(nested_topo(self.task_group)) - @provide_session - def set_dag_runs_state( - self, - state: DagRunState = DagRunState.RUNNING, - session: Session = NEW_SESSION, - start_date: datetime | None = None, - end_date: datetime | None = None, - dag_ids: list[str] | None = None, - ) -> None: - warnings.warn( - "This method is deprecated and will be removed in a future version.", - RemovedInAirflow3Warning, - stacklevel=3, - ) - dag_ids = dag_ids or [self.dag_id] - query = update(DagRun).where(DagRun.dag_id.in_(dag_ids)) - if start_date: - query = query.where(DagRun.execution_date >= start_date) - if end_date: - query = query.where(DagRun.execution_date <= end_date) - session.execute(query.values(state=state).execution_options(synchronize_session="fetch")) - @provide_session def clear( self, @@ -3055,21 +2839,6 @@ def create_dagrun( ) return run - @classmethod - @provide_session - def bulk_sync_to_db( - cls, - dags: Collection[DAG], - session=NEW_SESSION, - ): - """Use `airflow.models.DAG.bulk_write_to_db`, this method is deprecated.""" - warnings.warn( - "This method is deprecated and will be removed in a future version. 
Please use bulk_write_to_db", - RemovedInAirflow3Warning, - stacklevel=2, - ) - return cls.bulk_write_to_db(dags=dags, session=session) - @classmethod @provide_session def bulk_write_to_db( diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 8fdbf4826db7a..cd9862a3846af 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -5286,7 +5286,7 @@ def test_find_zombies(self, load_examples): scheduler_job.executor.callback_sink.send.assert_called_once() requests = scheduler_job.executor.callback_sink.send.call_args.args assert 1 == len(requests) - assert requests[0].full_filepath == dag.fileloc + assert requests[0].fileloc == dag.fileloc assert requests[0].msg == str(self.job_runner._generate_zombie_message_details(ti)) assert requests[0].is_failure_callback is True assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance) From 490fd226865123b8b776d1cc80cb7d07b2ad04ba Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 13 Aug 2024 11:36:46 -0700 Subject: [PATCH 2/5] add newsfragment --- newsfragments/41440.significant.rst | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 newsfragments/41440.significant.rst diff --git a/newsfragments/41440.significant.rst b/newsfragments/41440.significant.rst new file mode 100644 index 0000000000000..4f819bb4d8f99 --- /dev/null +++ b/newsfragments/41440.significant.rst @@ -0,0 +1,16 @@ +Removed unused methods / properties in models/dag.py + +Methods / properties removed: + * date_range + * is_fixed_time_schedule + * next_dagrun_after_date + * get_run_dates + * normalize_schedule + * full_filepath + * concurrency + * filepath + * concurrency_reached + * normalized_schedule_interval + * latest_execution_date + * set_dag_runs_state + * bulk_sync_to_db From 76efd9490e9c871e3d820c06887841354a315a5f Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 
13 Aug 2024 11:39:02 -0700 Subject: [PATCH 3/5] fix test --- tests/models/test_dag.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 2b3961f1e6c6f..95ec87d72bf16 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -2588,20 +2588,6 @@ def test_validate_params_on_trigger_dag(self): data_interval=(TEST_DATE, TEST_DATE), ) - def test_return_date_range_with_num_method(self): - start_date = TEST_DATE - delta = timedelta(days=1) - - dag = DAG("dummy-dag", schedule=delta, start_date=start_date) - with pytest.warns(RemovedInAirflow3Warning, match=r"`DAG.date_range\(\)` is deprecated."): - dag_dates = dag.date_range(start_date=start_date, num=3) - - assert dag_dates == [ - start_date, - start_date + delta, - start_date + 2 * delta, - ] - def test_dag_owner_links(self): dag = DAG( "dag", From 7ddbecd94bfae908f1501ef6dfaa72accbb840f7 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 13 Aug 2024 12:31:58 -0700 Subject: [PATCH 4/5] fix test --- tests/jobs/test_scheduler_job.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index cd9862a3846af..8475e6c4cbb9b 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -5284,16 +5284,17 @@ def test_find_zombies(self, load_examples): self.job_runner._find_zombies() scheduler_job.executor.callback_sink.send.assert_called_once() - requests = scheduler_job.executor.callback_sink.send.call_args.args - assert 1 == len(requests) - assert requests[0].fileloc == dag.fileloc - assert requests[0].msg == str(self.job_runner._generate_zombie_message_details(ti)) - assert requests[0].is_failure_callback is True - assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance) - assert ti.dag_id == requests[0].simple_task_instance.dag_id - assert 
ti.task_id == requests[0].simple_task_instance.task_id - assert ti.run_id == requests[0].simple_task_instance.run_id - assert ti.map_index == requests[0].simple_task_instance.map_index + callback_requests = scheduler_job.executor.callback_sink.send.call_args.args + assert len(callback_requests) == 1 + callback_request = callback_requests[0] + assert isinstance(callback_request.simple_task_instance, SimpleTaskInstance) + assert callback_request.full_filepath == dag.fileloc + assert callback_request.msg == str(self.job_runner._generate_zombie_message_details(ti)) + assert callback_request.is_failure_callback is True + assert callback_request.simple_task_instance.dag_id == ti.dag_id + assert callback_request.simple_task_instance.task_id == ti.task_id + assert callback_request.simple_task_instance.run_id == ti.run_id + assert callback_request.simple_task_instance.map_index == ti.map_index with create_session() as session: session.query(TaskInstance).delete() From 2b7878833a6342ef62cc670ad6c6dbc546f4acd2 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 14 Aug 2024 07:29:44 +0800 Subject: [PATCH 5/5] Fix formatting --- airflow/models/dag.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 1d0a9066afe31..50e2222bd0185 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1649,8 +1649,7 @@ def _get_task_instances( exclude_task_ids: Collection[str | tuple[str, int]] | None, session: Session, dag_bag: DagBag | None = ..., - ) -> Iterable[TaskInstance]: - ... # pragma: no cover + ) -> Iterable[TaskInstance]: ... # pragma: no cover @overload def _get_task_instances( @@ -1669,8 +1668,7 @@ def _get_task_instances( recursion_depth: int = ..., max_recursion_depth: int = ..., visited_external_tis: set[TaskInstanceKey] = ..., - ) -> set[TaskInstanceKey]: - ... # pragma: no cover + ) -> set[TaskInstanceKey]: ... # pragma: no cover def _get_task_instances( self,