Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 0 additions & 171 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import jinja2
import lazy_object_proxy
import pendulum
from deprecated import deprecated
from jinja2 import TemplateAssertionError, UndefinedError
from sqlalchemy import (
Column,
Expand Down Expand Up @@ -80,7 +79,6 @@
AirflowSkipException,
AirflowTaskTerminated,
AirflowTaskTimeout,
DagRunNotFound,
RemovedInAirflow3Warning,
TaskDeferralError,
TaskDeferred,
Expand Down Expand Up @@ -425,7 +423,6 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance | TaskInstancePydantic,
def clear_task_instances(
tis: list[TaskInstance],
session: Session,
activate_dag_runs: None = None,
dag: DAG | None = None,
dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
) -> None:
Expand All @@ -443,7 +440,6 @@ def clear_task_instances(
:param dag_run_state: state to set finished DagRuns to.
If set to False, DagRuns state will not be changed.
:param dag: DAG object
:param activate_dag_runs: Deprecated parameter, do not pass
"""
job_ids = []
# Keys: dag_id -> run_id -> map_indexes -> try_numbers -> task_id
Expand Down Expand Up @@ -521,16 +517,6 @@ def clear_task_instances(

session.execute(update(Job).where(Job.id.in_(job_ids)).values(state=JobState.RESTARTING))

if activate_dag_runs is not None:
warnings.warn(
"`activate_dag_runs` parameter to clear_task_instances function is deprecated. "
"Please use `dag_run_state`",
RemovedInAirflow3Warning,
stacklevel=2,
)
if not activate_dag_runs:
dag_run_state = False

if dag_run_state is not False and tis:
from airflow.models.dagrun import DagRun # Avoid circular import

Expand Down Expand Up @@ -1922,7 +1908,6 @@ class TaskInstance(Base, LoggingMixin):
def __init__(
self,
task: Operator,
execution_date: datetime | None = None,
run_id: str | None = None,
state: str | None = None,
map_index: int = -1,
Expand All @@ -1938,42 +1923,7 @@ def __init__(
# init_on_load will config the log
self.init_on_load()

if run_id is None and execution_date is not None:
from airflow.models.dagrun import DagRun # Avoid circular import

warnings.warn(
"Passing an execution_date to `TaskInstance()` is deprecated in favour of passing a run_id",
RemovedInAirflow3Warning,
# Stack level is 4 because SQLA adds some wrappers around the constructor
stacklevel=4,
)
# make sure we have a localized execution_date stored in UTC
if execution_date and not timezone.is_localized(execution_date):
self.log.warning(
"execution date %s has no timezone information. Using default from dag or system",
execution_date,
)
if self.task.has_dag():
if TYPE_CHECKING:
assert self.task.dag
execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
else:
execution_date = timezone.make_aware(execution_date)

execution_date = timezone.convert_to_utc(execution_date)
with create_session() as session:
run_id = (
session.query(DagRun.run_id)
.filter_by(dag_id=self.dag_id, execution_date=execution_date)
.scalar()
)
if run_id is None:
raise DagRunNotFound(
f"DagRun for {self.dag_id!r} with date {execution_date} not found"
) from None

self.run_id = run_id

self.try_number = 0
self.max_tries = self.task.retries
self.unixname = getuser()
Expand All @@ -1989,26 +1939,6 @@ def __init__(
def __hash__(self):
return hash((self.task_id, self.dag_id, self.run_id, self.map_index))

@property
@deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
def _try_number(self):
"""
Do not use. For semblance of backcompat.

:meta private:
"""
return self.try_number

@_try_number.setter
@deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
def _try_number(self, val):
"""
Do not use. For semblance of backcompat.

:meta private:
"""
self.try_number = val

@property
def stats_tags(self) -> dict[str, str]:
"""Returns task instance tags."""
Expand Down Expand Up @@ -2051,23 +1981,6 @@ def init_on_load(self) -> None:
"""Initialize the attributes that aren't stored in the DB."""
self.test_mode = False # can be changed when calling 'run'

@property
@deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
def prev_attempted_tries(self) -> int:
"""
Calculate the total number of attempted tries, defaulting to 0.

This used to be necessary because try_number did not always tell the truth.

:meta private:
"""
return self.try_number

@property
def next_try_number(self) -> int:
# todo (dstandish): deprecate this property; we don't need a property that is just + 1
return self.try_number + 1

@property
def operator_name(self) -> str | None:
"""@property: use a more friendly display name for the operator, if set."""
Expand Down Expand Up @@ -2498,40 +2411,6 @@ def get_previous_ti(
"""
return _get_previous_ti(task_instance=self, state=state, session=session)

@property
def previous_ti(self) -> TaskInstance | TaskInstancePydantic | None:
"""
This attribute is deprecated.

Please use :class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`.
"""
warnings.warn(
"""
This attribute is deprecated.
Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
""",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self.get_previous_ti()

@property
def previous_ti_success(self) -> TaskInstance | TaskInstancePydantic | None:
"""
This attribute is deprecated.

Please use :class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`.
"""
warnings.warn(
"""
This attribute is deprecated.
Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
""",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self.get_previous_ti(state=DagRunState.SUCCESS)

@provide_session
def get_previous_execution_date(
self,
Expand All @@ -2558,23 +2437,6 @@ def get_previous_start_date(
"""
return _get_previous_start_date(task_instance=self, state=state, session=session)

@property
def previous_start_date_success(self) -> pendulum.DateTime | None:
"""
This attribute is deprecated.

Please use :class:`airflow.models.taskinstance.TaskInstance.get_previous_start_date`.
"""
warnings.warn(
"""
This attribute is deprecated.
Please use `airflow.models.taskinstance.TaskInstance.get_previous_start_date` method.
""",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self.get_previous_start_date(state=DagRunState.SUCCESS)

@provide_session
def are_dependencies_met(
self, dep_context: DepContext | None = None, session: Session = NEW_SESSION, verbose: bool = False
Expand Down Expand Up @@ -4115,21 +3977,6 @@ def __eq__(self, other):
return self.__dict__ == other.__dict__
return NotImplemented

def as_dict(self):
warnings.warn(
"This method is deprecated. Use BaseSerialization.serialize.",
RemovedInAirflow3Warning,
stacklevel=2,
)
new_dict = dict(self.__dict__)
for key in new_dict:
if key in ["start_date", "end_date"]:
val = new_dict[key]
if not val or isinstance(val, str):
continue
new_dict.update({key: val.isoformat()})
return new_dict

@classmethod
def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance:
return cls(
Expand All @@ -4150,24 +3997,6 @@ def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance:
priority_weight=ti.priority_weight if hasattr(ti, "priority_weight") else None,
)

@classmethod
def from_dict(cls, obj_dict: dict) -> SimpleTaskInstance:
warnings.warn(
"This method is deprecated. Use BaseSerialization.deserialize.",
RemovedInAirflow3Warning,
stacklevel=2,
)
ti_key = TaskInstanceKey(*obj_dict.pop("key"))
start_date = None
end_date = None
start_date_str: str | None = obj_dict.pop("start_date")
end_date_str: str | None = obj_dict.pop("end_date")
if start_date_str:
start_date = timezone.parse(start_date_str)
if end_date_str:
end_date = timezone.parse(end_date_str)
return cls(**obj_dict, start_date=start_date, end_date=end_date, key=ti_key)


class TaskInstanceNote(TaskInstanceDependencies):
"""For storage of arbitrary notes concerning the task instance."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
airflow_cmd=ti.command_as_list(),
queue=ti.queue,
exec_config=ti.executor_config,
attempt_number=ti.prev_attempted_tries,
attempt_number=ti.try_number,
)
adopted_tis.append(ti)

Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ def read(self, task_instance, try_number=None, metadata=None):
# try number gets incremented in DB, i.e logs produced the time
# after cli run and before try_number + 1 in DB will not be displayed.
if try_number is None:
next_try = task_instance.next_try_number
next_try = task_instance.try_number + 1
try_numbers = list(range(1, next_try))
elif try_number < 1:
logs = [
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/log/log_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
:param metadata: A dictionary containing information about how to read the task log
"""
if try_number is None:
next_try = ti.next_try_number
next_try = ti.try_number + 1
try_numbers = list(range(1, next_try))
else:
try_numbers = [try_number]
Expand Down
12 changes: 12 additions & 0 deletions newsfragments/41784.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Removed a set of deprecations in from ``airflow.models.taskinstance``.

- Removed deprecated arg ``activate_dag_runs`` from ``TaskInstance.clear_task_instances()``. Please use ``dag_run_state`` instead.
- Removed deprecated arg ``execution_date`` from ``TaskInstance.__init__()``. Please use ``run_id`` instead.
- Removed deprecated property ``_try_number`` from ``TaskInstance``. Please use ``try_number`` instead.
- Removed deprecated property ``prev_attempted_tries`` from ``TaskInstance``. Please use ``try_number`` instead.
- Removed deprecated property ``next_try_number`` from ``TaskInstance``. Please use ``try_number + 1`` instead.
- Removed deprecated property ``previous_ti`` from ``TaskInstance``. Please use ``get_previous_ti`` instead.
- Removed deprecated property ``previous_ti_success`` from ``TaskInstance``. Please use ``get_previous_ti`` instead.
- Removed deprecated property ``previous_start_date_success`` from ``TaskInstance``. Please use ``get_previous_start_date`` instead.
- Removed deprecated function ``as_dict`` from ``SimpleTaskInstance``. Please use ``BaseSerialization.serialize`` instead.
- Removed deprecated function ``from_dict`` from ``SimpleTaskInstance``. Please use ``BaseSerialization.deserialize`` instead.
2 changes: 1 addition & 1 deletion tests/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def create_trigger_in_db(session, trigger, operator=None):
operator.dag = dag
else:
operator = BaseOperator(task_id="test_ti", dag=dag)
task_instance = TaskInstance(operator, execution_date=run.execution_date, run_id=run.run_id)
task_instance = TaskInstance(operator, run_id=run.run_id)
task_instance.trigger_id = trigger_orm.id
session.add(dag_model)
session.add(run)
Expand Down
6 changes: 3 additions & 3 deletions tests/models/test_baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1116,11 +1116,11 @@ def test_get_task_instances(session):
"run_type": DagRunType.MANUAL,
}
dr1 = DagRun(execution_date=first_execution_date, run_id="test_run_id_1", **common_dr_kwargs)
ti_1 = TaskInstance(run_id=dr1.run_id, task=task, execution_date=first_execution_date)
ti_1 = TaskInstance(run_id=dr1.run_id, task=task)
dr2 = DagRun(execution_date=second_execution_date, run_id="test_run_id_2", **common_dr_kwargs)
ti_2 = TaskInstance(run_id=dr2.run_id, task=task, execution_date=second_execution_date)
ti_2 = TaskInstance(run_id=dr2.run_id, task=task)
dr3 = DagRun(execution_date=third_execution_date, run_id="test_run_id_3", **common_dr_kwargs)
ti_3 = TaskInstance(run_id=dr3.run_id, task=task, execution_date=third_execution_date)
ti_3 = TaskInstance(run_id=dr3.run_id, task=task)
session.add_all([dr1, dr2, dr3, ti_1, ti_2, ti_3])
session.commit()

Expand Down
2 changes: 1 addition & 1 deletion tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@ def task_2(arg2): ...
assert len(decision.schedulable_tis) == 2

# We insert a faulty record
session.add(TaskInstance(dag.get_task("task_2"), dr.execution_date, dr.run_id))
session.add(TaskInstance(task=dag.get_task("task_2"), run_id=dr.run_id))
session.flush()

decision = dr.task_instance_scheduling_decisions()
Expand Down
5 changes: 1 addition & 4 deletions tests/providers/microsoft/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,11 @@ class MockedTaskInstance(TaskInstance):
def __init__(
self,
task,
execution_date: datetime | None = None,
run_id: str | None = "run_id",
state: str | None = TaskInstanceState.RUNNING,
map_index: int = -1,
):
super().__init__(
task=task, execution_date=execution_date, run_id=run_id, state=state, map_index=map_index
)
super().__init__(task=task, run_id=run_id, state=state, map_index=map_index)
self.values: dict[str, Any] = {}

def xcom_pull(
Expand Down
4 changes: 2 additions & 2 deletions tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_concurrency_reached(self):
"""
dag = Mock(concurrency=1, get_concurrency_reached=Mock(return_value=True))
task = Mock(dag=dag, pool_slots=1)
ti = TaskInstance(task, execution_date=None)
ti = TaskInstance(task)

assert not DagTISlotsAvailableDep().is_met(ti=ti)

Expand All @@ -44,6 +44,6 @@ def test_all_conditions_met(self):
"""
dag = Mock(concurrency=1, get_concurrency_reached=Mock(return_value=False))
task = Mock(dag=dag, pool_slots=1)
ti = TaskInstance(task, execution_date=None)
ti = TaskInstance(task)

assert DagTISlotsAvailableDep().is_met(ti=ti)
4 changes: 2 additions & 2 deletions tests/ti_deps/deps/test_dag_unpaused_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_concurrency_reached(self):
"""
dag = Mock(**{"get_is_paused.return_value": True})
task = Mock(dag=dag)
ti = TaskInstance(task=task, execution_date=None)
ti = TaskInstance(task=task)

assert not DagUnpausedDep().is_met(ti=ti)

Expand All @@ -44,6 +44,6 @@ def test_all_conditions_met(self):
"""
dag = Mock(**{"get_is_paused.return_value": False})
task = Mock(dag=dag)
ti = TaskInstance(task=task, execution_date=None)
ti = TaskInstance(task=task)

assert DagUnpausedDep().is_met(ti=ti)
2 changes: 1 addition & 1 deletion tests/ti_deps/deps/test_not_in_retry_period_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
class TestNotInRetryPeriodDep:
def _get_task_instance(self, state, end_date=None, retry_delay=timedelta(minutes=15)):
task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False)
ti = TaskInstance(task=task, state=state, execution_date=None)
ti = TaskInstance(task=task, state=state)
ti.end_date = end_date
return ti

Expand Down