233 changes: 0 additions & 233 deletions airflow/models/dag.py
@@ -131,7 +131,6 @@
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.dates import cron_presets, date_range as utils_date_range
from airflow.utils.decorators import fixup_decorator_warning_stack
from airflow.utils.helpers import at_most_one, exactly_one, validate_instance_args, validate_key
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -944,60 +943,6 @@ def update_old_perm(permission: str):

return updated_access_control

def date_range(
self,
start_date: pendulum.DateTime,
num: int | None = None,
end_date: datetime | None = None,
) -> list[datetime]:
message = "`DAG.date_range()` is deprecated."
if num is not None:
warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2)
with warnings.catch_warnings():
warnings.simplefilter("ignore", RemovedInAirflow3Warning)
return utils_date_range(
start_date=start_date, num=num, delta=self.normalized_schedule_interval
)
message += " Please use `DAG.iter_dagrun_infos_between(..., align=False)` instead."
warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2)
if end_date is None:
coerced_end_date = timezone.utcnow()
else:
coerced_end_date = end_date
it = self.iter_dagrun_infos_between(start_date, pendulum.instance(coerced_end_date), align=False)
return [info.logical_date for info in it]

def is_fixed_time_schedule(self):
"""
Figures out if the schedule has a fixed time (e.g. 3 AM every day).

Detection is done by "peeking" the next two cron trigger time; if the
two times have the same minute and hour value, the schedule is fixed,
and we *don't* need to perform the DST fix.

This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).

Do not try to understand what this actually means. It is old logic that
should not be used anywhere.
"""
warnings.warn(
"`DAG.is_fixed_time_schedule()` is deprecated.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)

from airflow.timetables._cron import CronMixin

if not isinstance(self.timetable, CronMixin):
return True

from croniter import croniter

cron = croniter(self.timetable._expression)
next_a = cron.get_next(datetime)
next_b = cron.get_next(datetime)
return next_b.minute == next_a.minute and next_b.hour == next_a.hour

def following_schedule(self, dttm):
"""
Calculate the following schedule for this dag in UTC.
@@ -1162,21 +1107,6 @@ def next_dagrun_info(
info = None
return info

def next_dagrun_after_date(self, date_last_automated_dagrun: pendulum.DateTime | None):
warnings.warn(
"`DAG.next_dagrun_after_date()` is deprecated. Please use `DAG.next_dagrun_info()` instead.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
if date_last_automated_dagrun is None:
data_interval = None
else:
data_interval = self.infer_automated_data_interval(date_last_automated_dagrun)
info = self.next_dagrun_info(data_interval)
if info is None:
return None
return info.run_after

@functools.cached_property
def _time_restriction(self) -> TimeRestriction:
start_dates = [t.start_date for t in self.tasks if t.start_date]
@@ -1267,46 +1197,6 @@ def iter_dagrun_infos_between(
)
break

def get_run_dates(self, start_date, end_date=None) -> list:
"""
Return a list of dates between the interval received as parameter using this dag's schedule interval.

Returned dates can be used for execution dates.

:param start_date: The start date of the interval.
:param end_date: The end date of the interval. Defaults to ``timezone.utcnow()``.
:return: A list of dates within the interval following the dag's schedule.
"""
warnings.warn(
"`DAG.get_run_dates()` is deprecated. Please use `DAG.iter_dagrun_infos_between()` instead.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
earliest = timezone.coerce_datetime(start_date)
if end_date is None:
latest = pendulum.now(timezone.utc)
else:
latest = timezone.coerce_datetime(end_date)
return [info.logical_date for info in self.iter_dagrun_infos_between(earliest, latest)]

def normalize_schedule(self, dttm):
warnings.warn(
"`DAG.normalize_schedule()` is deprecated.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
with warnings.catch_warnings():
warnings.simplefilter("ignore", RemovedInAirflow3Warning)
following = self.following_schedule(dttm)
if not following: # in case of @once
return dttm
with warnings.catch_warnings():
warnings.simplefilter("ignore", RemovedInAirflow3Warning)
previous_of_following = self.previous_schedule(following)
if previous_of_following != dttm:
return following
return dttm

@provide_session
def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False):
return get_last_dagrun(
@@ -1330,43 +1220,6 @@ def dag_id(self) -> str:
def dag_id(self, value: str) -> None:
self._dag_id = value

@property
def full_filepath(self) -> str:
"""
Full file path to the DAG.

:meta private:
"""
warnings.warn(
"DAG.full_filepath is deprecated in favour of fileloc",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self.fileloc

@full_filepath.setter
def full_filepath(self, value) -> None:
warnings.warn(
"DAG.full_filepath is deprecated in favour of fileloc",
RemovedInAirflow3Warning,
stacklevel=2,
)
self.fileloc = value

@property
def concurrency(self) -> int:
# TODO: Remove in Airflow 3.0
warnings.warn(
"The 'DAG.concurrency' attribute is deprecated. Please use 'DAG.max_active_tasks'.",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self._max_active_tasks

@concurrency.setter
def concurrency(self, value: int):
self._max_active_tasks = value

@property
def max_active_tasks(self) -> int:
return self._max_active_tasks
@@ -1438,20 +1291,6 @@ def tasks_upstream_of_teardowns(self) -> list[Operator]:
def task_group(self) -> TaskGroup:
return self._task_group

@property
def filepath(self) -> str:
"""
Relative file path to the DAG.

:meta private:
"""
warnings.warn(
"filepath is deprecated, use relative_fileloc instead",
RemovedInAirflow3Warning,
stacklevel=2,
)
return str(self.relative_fileloc)

@property
def relative_fileloc(self) -> pathlib.Path:
"""File location of the importable dag 'file' relative to the configured DAGs folder."""
@@ -1496,16 +1335,6 @@ def get_concurrency_reached(self, session=NEW_SESSION) -> bool:
)
return total_tasks >= self.max_active_tasks

@property
def concurrency_reached(self):
"""Use `airflow.models.DAG.get_concurrency_reached`, this attribute is deprecated."""
warnings.warn(
"This attribute is deprecated. Please use `airflow.models.DAG.get_concurrency_reached` method.",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self.get_concurrency_reached()

@provide_session
def get_is_active(self, session=NEW_SESSION) -> None:
"""Return a boolean indicating whether this DAG is active."""
@@ -1526,21 +1355,6 @@ def is_paused(self):
)
return self.get_is_paused()

@property
def normalized_schedule_interval(self) -> ScheduleInterval:
warnings.warn(
"DAG.normalized_schedule_interval() is deprecated.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:
_schedule_interval: ScheduleInterval = cron_presets.get(self.schedule_interval)
elif self.schedule_interval == "@once":
_schedule_interval = None
else:
_schedule_interval = self.schedule_interval
return _schedule_interval

@staticmethod
@internal_api_call
@provide_session
@@ -1724,16 +1538,6 @@ def get_latest_execution_date(self, session: Session = NEW_SESSION) -> pendulum.
"""Return the latest date for which at least one dag run exists."""
return session.scalar(select(func.max(DagRun.execution_date)).where(DagRun.dag_id == self.dag_id))

@property
def latest_execution_date(self):
"""Use `airflow.models.DAG.get_latest_execution_date`, this attribute is deprecated."""
warnings.warn(
"This attribute is deprecated. Please use `airflow.models.DAG.get_latest_execution_date`.",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self.get_latest_execution_date()

def resolve_template_files(self):
for t in self.tasks:
t.resolve_template_files()
@@ -2264,28 +2068,6 @@ def nested_topo(group):

return tuple(nested_topo(self.task_group))

@provide_session
def set_dag_runs_state(
self,
state: DagRunState = DagRunState.RUNNING,
session: Session = NEW_SESSION,
start_date: datetime | None = None,
end_date: datetime | None = None,
dag_ids: list[str] | None = None,
) -> None:
warnings.warn(
"This method is deprecated and will be removed in a future version.",
RemovedInAirflow3Warning,
stacklevel=3,
)
dag_ids = dag_ids or [self.dag_id]
query = update(DagRun).where(DagRun.dag_id.in_(dag_ids))
if start_date:
query = query.where(DagRun.execution_date >= start_date)
if end_date:
query = query.where(DagRun.execution_date <= end_date)
session.execute(query.values(state=state).execution_options(synchronize_session="fetch"))

@provide_session
def clear(
self,
@@ -3055,21 +2837,6 @@ def create_dagrun(
)
return run

@classmethod
@provide_session
def bulk_sync_to_db(
cls,
dags: Collection[DAG],
session=NEW_SESSION,
):
"""Use `airflow.models.DAG.bulk_write_to_db`, this method is deprecated."""
warnings.warn(
"This method is deprecated and will be removed in a future version. Please use bulk_write_to_db",
RemovedInAirflow3Warning,
stacklevel=2,
)
return cls.bulk_write_to_db(dags=dags, session=session)

@classmethod
@provide_session
def bulk_write_to_db(
16 changes: 16 additions & 0 deletions newsfragments/41440.significant.rst
@@ -0,0 +1,16 @@
Removed unused methods / properties in models/dag.py

Methods and properties removed (replacements are illustrated in the sketch after this list):
* date_range
* is_fixed_time_schedule
* next_dagrun_after_date
* get_run_dates
* normalize_schedule
* full_filepath
* concurrency
* filepath
* concurrency_reached
* normalized_schedule_interval
* latest_execution_date
* set_dag_runs_state
* bulk_sync_to_db
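
Most of the removed helpers named a replacement in the deprecation warning they raised. A minimal migration sketch, assuming an illustrative ``dag`` object and dates, and an initialized Airflow metadata database for the accessors that query it::

    from datetime import timedelta

    import pendulum

    from airflow.models.dag import DAG

    start = pendulum.datetime(2024, 1, 1, tz="UTC")
    dag = DAG("migration_example", schedule=timedelta(days=1), start_date=start)

    # date_range() / get_run_dates() -> iter_dagrun_infos_between()
    run_dates = [
        info.logical_date
        for info in dag.iter_dagrun_infos_between(
            start, start + timedelta(days=2, hours=12), align=False
        )
    ]

    # next_dagrun_after_date(...) -> next_dagrun_info(...)
    info = dag.next_dagrun_info(None)
    next_run_after = info.run_after if info else None

    # concurrency -> max_active_tasks; full_filepath / filepath -> fileloc / relative_fileloc
    max_tasks = dag.max_active_tasks
    location = dag.fileloc

    # concurrency_reached / latest_execution_date -> the explicit, database-backed accessors
    reached = dag.get_concurrency_reached()
    latest = dag.get_latest_execution_date()

    # bulk_sync_to_db(dags) -> bulk_write_to_db(dags), also database-backed
    DAG.bulk_write_to_db(dags=[dag])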
21 changes: 11 additions & 10 deletions tests/jobs/test_scheduler_job.py
@@ -5284,16 +5284,17 @@ def test_find_zombies(self, load_examples):
self.job_runner._find_zombies()

scheduler_job.executor.callback_sink.send.assert_called_once()
requests = scheduler_job.executor.callback_sink.send.call_args.args
assert 1 == len(requests)
assert requests[0].full_filepath == dag.fileloc
assert requests[0].msg == str(self.job_runner._generate_zombie_message_details(ti))
assert requests[0].is_failure_callback is True
assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
assert ti.dag_id == requests[0].simple_task_instance.dag_id
assert ti.task_id == requests[0].simple_task_instance.task_id
assert ti.run_id == requests[0].simple_task_instance.run_id
assert ti.map_index == requests[0].simple_task_instance.map_index
callback_requests = scheduler_job.executor.callback_sink.send.call_args.args
assert len(callback_requests) == 1
callback_request = callback_requests[0]
assert isinstance(callback_request.simple_task_instance, SimpleTaskInstance)
assert callback_request.full_filepath == dag.fileloc
assert callback_request.msg == str(self.job_runner._generate_zombie_message_details(ti))
assert callback_request.is_failure_callback is True
assert callback_request.simple_task_instance.dag_id == ti.dag_id
assert callback_request.simple_task_instance.task_id == ti.task_id
assert callback_request.simple_task_instance.run_id == ti.run_id
assert callback_request.simple_task_instance.map_index == ti.map_index

with create_session() as session:
session.query(TaskInstance).delete()
14 changes: 0 additions & 14 deletions tests/models/test_dag.py
@@ -2588,20 +2588,6 @@ def test_validate_params_on_trigger_dag(self):
data_interval=(TEST_DATE, TEST_DATE),
)

def test_return_date_range_with_num_method(self):
start_date = TEST_DATE
delta = timedelta(days=1)

dag = DAG("dummy-dag", schedule=delta, start_date=start_date)
with pytest.warns(RemovedInAirflow3Warning, match=r"`DAG.date_range\(\)` is deprecated."):
dag_dates = dag.date_range(start_date=start_date, num=3)

assert dag_dates == [
start_date,
start_date + delta,
start_date + 2 * delta,
]
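
# A hypothetical replacement sketch (assuming the same module-level DAG, timedelta,
# and TEST_DATE used by the removed test above): the removed ``DAG.date_range()``
# pointed callers at ``DAG.iter_dagrun_infos_between(..., align=False)``, which
# yields the same logical dates.
def test_iter_dagrun_infos_between_replaces_date_range(self):
    start_date = TEST_DATE
    delta = timedelta(days=1)

    dag = DAG("dummy-dag", schedule=delta, start_date=start_date)
    infos = dag.iter_dagrun_infos_between(
        start_date, start_date + timedelta(days=2, hours=12), align=False
    )

    assert [info.logical_date for info in infos] == [
        start_date,
        start_date + delta,
        start_date + 2 * delta,
    ]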

def test_dag_owner_links(self):
dag = DAG(
"dag",