From 467b304db95ac39f737c374bf2f678a3acbe5c3f Mon Sep 17 00:00:00 2001 From: dirrao Date: Tue, 27 Aug 2024 10:34:26 +0530 Subject: [PATCH 1/6] airflow.models.taskinstance deprecations removed --- airflow/models/taskinstance.py | 134 ------------------ .../aws/executors/batch/batch_executor.py | 2 +- airflow/utils/log/file_task_handler.py | 2 +- airflow/utils/log/log_reader.py | 2 +- 4 files changed, 3 insertions(+), 137 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index e2b59ebbab472..29828219c8c4d 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -38,7 +38,6 @@ import jinja2 import lazy_object_proxy import pendulum -from deprecated import deprecated from jinja2 import TemplateAssertionError, UndefinedError from sqlalchemy import ( Column, @@ -425,7 +424,6 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance | TaskInstancePydantic, def clear_task_instances( tis: list[TaskInstance], session: Session, - activate_dag_runs: None = None, dag: DAG | None = None, dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED, ) -> None: @@ -443,7 +441,6 @@ def clear_task_instances( :param dag_run_state: state to set finished DagRuns to. If set to False, DagRuns state will not be changed. :param dag: DAG object - :param activate_dag_runs: Deprecated parameter, do not pass """ job_ids = [] # Keys: dag_id -> run_id -> map_indexes -> try_numbers -> task_id @@ -521,16 +518,6 @@ def clear_task_instances( session.execute(update(Job).where(Job.id.in_(job_ids)).values(state=JobState.RESTARTING)) - if activate_dag_runs is not None: - warnings.warn( - "`activate_dag_runs` parameter to clear_task_instances function is deprecated. " - "Please use `dag_run_state`", - RemovedInAirflow3Warning, - stacklevel=2, - ) - if not activate_dag_runs: - dag_run_state = False - if dag_run_state is not False and tis: from airflow.models.dagrun import DagRun # Avoid circular import @@ -1989,26 +1976,6 @@ def __init__( def __hash__(self): return hash((self.task_id, self.dag_id, self.run_id, self.map_index)) - @property - @deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning) - def _try_number(self): - """ - Do not use. For semblance of backcompat. - - :meta private: - """ - return self.try_number - - @_try_number.setter - @deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning) - def _try_number(self, val): - """ - Do not use. For semblance of backcompat. - - :meta private: - """ - self.try_number = val - @property def stats_tags(self) -> dict[str, str]: """Returns task instance tags.""" @@ -2051,23 +2018,6 @@ def init_on_load(self) -> None: """Initialize the attributes that aren't stored in the DB.""" self.test_mode = False # can be changed when calling 'run' - @property - @deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning) - def prev_attempted_tries(self) -> int: - """ - Calculate the total number of attempted tries, defaulting to 0. - - This used to be necessary because try_number did not always tell the truth. - - :meta private: - """ - return self.try_number - - @property - def next_try_number(self) -> int: - # todo (dstandish): deprecate this property; we don't need a property that is just + 1 - return self.try_number + 1 - @property def operator_name(self) -> str | None: """@property: use a more friendly display name for the operator, if set.""" @@ -2498,40 +2448,6 @@ def get_previous_ti( """ return _get_previous_ti(task_instance=self, state=state, session=session) - @property - def previous_ti(self) -> TaskInstance | TaskInstancePydantic | None: - """ - This attribute is deprecated. - - Please use :class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`. - """ - warnings.warn( - """ - This attribute is deprecated. - Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method. - """, - RemovedInAirflow3Warning, - stacklevel=2, - ) - return self.get_previous_ti() - - @property - def previous_ti_success(self) -> TaskInstance | TaskInstancePydantic | None: - """ - This attribute is deprecated. - - Please use :class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`. - """ - warnings.warn( - """ - This attribute is deprecated. - Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method. - """, - RemovedInAirflow3Warning, - stacklevel=2, - ) - return self.get_previous_ti(state=DagRunState.SUCCESS) - @provide_session def get_previous_execution_date( self, @@ -2558,23 +2474,6 @@ def get_previous_start_date( """ return _get_previous_start_date(task_instance=self, state=state, session=session) - @property - def previous_start_date_success(self) -> pendulum.DateTime | None: - """ - This attribute is deprecated. - - Please use :class:`airflow.models.taskinstance.TaskInstance.get_previous_start_date`. - """ - warnings.warn( - """ - This attribute is deprecated. - Please use `airflow.models.taskinstance.TaskInstance.get_previous_start_date` method. - """, - RemovedInAirflow3Warning, - stacklevel=2, - ) - return self.get_previous_start_date(state=DagRunState.SUCCESS) - @provide_session def are_dependencies_met( self, dep_context: DepContext | None = None, session: Session = NEW_SESSION, verbose: bool = False @@ -4115,21 +4014,6 @@ def __eq__(self, other): return self.__dict__ == other.__dict__ return NotImplemented - def as_dict(self): - warnings.warn( - "This method is deprecated. Use BaseSerialization.serialize.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - new_dict = dict(self.__dict__) - for key in new_dict: - if key in ["start_date", "end_date"]: - val = new_dict[key] - if not val or isinstance(val, str): - continue - new_dict.update({key: val.isoformat()}) - return new_dict - @classmethod def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance: return cls( @@ -4150,24 +4034,6 @@ def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance: priority_weight=ti.priority_weight if hasattr(ti, "priority_weight") else None, ) - @classmethod - def from_dict(cls, obj_dict: dict) -> SimpleTaskInstance: - warnings.warn( - "This method is deprecated. Use BaseSerialization.deserialize.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - ti_key = TaskInstanceKey(*obj_dict.pop("key")) - start_date = None - end_date = None - start_date_str: str | None = obj_dict.pop("start_date") - end_date_str: str | None = obj_dict.pop("end_date") - if start_date_str: - start_date = timezone.parse(start_date_str) - if end_date_str: - end_date = timezone.parse(end_date_str) - return cls(**obj_dict, start_date=start_date, end_date=end_date, key=ti_key) - class TaskInstanceNote(TaskInstanceDependencies): """For storage of arbitrary notes concerning the task instance.""" diff --git a/airflow/providers/amazon/aws/executors/batch/batch_executor.py b/airflow/providers/amazon/aws/executors/batch/batch_executor.py index 92790eb6c2e32..b38688defbba7 100644 --- a/airflow/providers/amazon/aws/executors/batch/batch_executor.py +++ b/airflow/providers/amazon/aws/executors/batch/batch_executor.py @@ -448,7 +448,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task airflow_cmd=ti.command_as_list(), queue=ti.queue, exec_config=ti.executor_config, - attempt_number=ti.prev_attempted_tries, + attempt_number=ti.try_number, ) adopted_tis.append(ti) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index eed196e8d8705..24379d340a784 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -462,7 +462,7 @@ def read(self, task_instance, try_number=None, metadata=None): # try number gets incremented in DB, i.e logs produced the time # after cli run and before try_number + 1 in DB will not be displayed. if try_number is None: - next_try = task_instance.next_try_number + next_try = task_instance.try_number + 1 try_numbers = list(range(1, next_try)) elif try_number < 1: logs = [ diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py index ad61a139086c3..c99efc350bb19 100644 --- a/airflow/utils/log/log_reader.py +++ b/airflow/utils/log/log_reader.py @@ -74,7 +74,7 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di :param metadata: A dictionary containing information about how to read the task log """ if try_number is None: - next_try = ti.next_try_number + next_try = ti.try_number + 1 try_numbers = list(range(1, next_try)) else: try_numbers = [try_number] From c8a09a7884c6d3a6bd8a6b5cb7651e7433d93068 Mon Sep 17 00:00:00 2001 From: dirrao Date: Tue, 27 Aug 2024 10:47:53 +0530 Subject: [PATCH 2/6] news fragment added --- newsfragments/41784.significant.rst | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 newsfragments/41784.significant.rst diff --git a/newsfragments/41784.significant.rst b/newsfragments/41784.significant.rst new file mode 100644 index 0000000000000..bd89b65ab103f --- /dev/null +++ b/newsfragments/41784.significant.rst @@ -0,0 +1,10 @@ +Removed a set of deprecations in from ``airflow.models.taskinstance``. + + - Removed deprecated property ``_try_number`` from ``TaskInstance``. Please use ``try_number`` instead. + - Removed deprecated property ``prev_attempted_tries`` from ``TaskInstance``. Please use ``try_number`` instead. + - Removed deprecated property ``next_try_number`` from ``TaskInstance``. Please use ``try_number + 1`` instead. + - Removed deprecated property ``previous_ti`` from ``TaskInstance``. Please use ``get_previous_ti`` instead. + - Removed deprecated property ``previous_ti_success`` from ``TaskInstance``. Please use ``get_previous_ti`` instead. + - Removed deprecated property ``previous_start_date_success`` from ``TaskInstance``. Please use ``get_previous_start_date`` instead. + - Removed deprecated function ``as_dict`` from ``SimpleTaskInstance``. Please use ``BaseSerialization.serialize`` instead. + - Removed deprecated function ``from_dict`` from ``SimpleTaskInstance``. Please use ``BaseSerialization.deserialize`` instead. From aa5984cf00b6e19f1ce41fa5bf3bf704a5a8862d Mon Sep 17 00:00:00 2001 From: dirrao Date: Tue, 27 Aug 2024 12:59:21 +0530 Subject: [PATCH 3/6] news fragment updated --- newsfragments/41784.significant.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/newsfragments/41784.significant.rst b/newsfragments/41784.significant.rst index bd89b65ab103f..b62446e714bcc 100644 --- a/newsfragments/41784.significant.rst +++ b/newsfragments/41784.significant.rst @@ -1,5 +1,6 @@ Removed a set of deprecations in from ``airflow.models.taskinstance``. + - Removed deprecated arg ``activate_dag_runs`` from ``TaskInstance.clear_task_instances()``. Please use ``dag_run_state`` instead. - Removed deprecated property ``_try_number`` from ``TaskInstance``. Please use ``try_number`` instead. - Removed deprecated property ``prev_attempted_tries`` from ``TaskInstance``. Please use ``try_number`` instead. - Removed deprecated property ``next_try_number`` from ``TaskInstance``. Please use ``try_number + 1`` instead. From 3d84129fc3931f8c7a9d5a11a655b27e12074ecc Mon Sep 17 00:00:00 2001 From: dirrao Date: Tue, 27 Aug 2024 16:21:50 +0530 Subject: [PATCH 4/6] airflow.models.taskinstance deprecations removed --- airflow/models/taskinstance.py | 36 ----------------------------- newsfragments/41784.significant.rst | 1 + tests/models/test_baseoperator.py | 6 ++--- 3 files changed, 4 insertions(+), 39 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 29828219c8c4d..34587f356eb35 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1909,7 +1909,6 @@ class TaskInstance(Base, LoggingMixin): def __init__( self, task: Operator, - execution_date: datetime | None = None, run_id: str | None = None, state: str | None = None, map_index: int = -1, @@ -1925,42 +1924,7 @@ def __init__( # init_on_load will config the log self.init_on_load() - if run_id is None and execution_date is not None: - from airflow.models.dagrun import DagRun # Avoid circular import - - warnings.warn( - "Passing an execution_date to `TaskInstance()` is deprecated in favour of passing a run_id", - RemovedInAirflow3Warning, - # Stack level is 4 because SQLA adds some wrappers around the constructor - stacklevel=4, - ) - # make sure we have a localized execution_date stored in UTC - if execution_date and not timezone.is_localized(execution_date): - self.log.warning( - "execution date %s has no timezone information. Using default from dag or system", - execution_date, - ) - if self.task.has_dag(): - if TYPE_CHECKING: - assert self.task.dag - execution_date = timezone.make_aware(execution_date, self.task.dag.timezone) - else: - execution_date = timezone.make_aware(execution_date) - - execution_date = timezone.convert_to_utc(execution_date) - with create_session() as session: - run_id = ( - session.query(DagRun.run_id) - .filter_by(dag_id=self.dag_id, execution_date=execution_date) - .scalar() - ) - if run_id is None: - raise DagRunNotFound( - f"DagRun for {self.dag_id!r} with date {execution_date} not found" - ) from None - self.run_id = run_id - self.try_number = 0 self.max_tries = self.task.retries self.unixname = getuser() diff --git a/newsfragments/41784.significant.rst b/newsfragments/41784.significant.rst index b62446e714bcc..d0d8a07cd0a72 100644 --- a/newsfragments/41784.significant.rst +++ b/newsfragments/41784.significant.rst @@ -1,6 +1,7 @@ Removed a set of deprecations in from ``airflow.models.taskinstance``. - Removed deprecated arg ``activate_dag_runs`` from ``TaskInstance.clear_task_instances()``. Please use ``dag_run_state`` instead. + - Removed deprecated arg ``execution_date`` from ``TaskInstance.__init__()``. Please use ``run_id`` instead. - Removed deprecated property ``_try_number`` from ``TaskInstance``. Please use ``try_number`` instead. - Removed deprecated property ``prev_attempted_tries`` from ``TaskInstance``. Please use ``try_number`` instead. - Removed deprecated property ``next_try_number`` from ``TaskInstance``. Please use ``try_number + 1`` instead. diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py index 48aaf2699b918..fa0eb56413937 100644 --- a/tests/models/test_baseoperator.py +++ b/tests/models/test_baseoperator.py @@ -1116,11 +1116,11 @@ def test_get_task_instances(session): "run_type": DagRunType.MANUAL, } dr1 = DagRun(execution_date=first_execution_date, run_id="test_run_id_1", **common_dr_kwargs) - ti_1 = TaskInstance(run_id=dr1.run_id, task=task, execution_date=first_execution_date) + ti_1 = TaskInstance(run_id=dr1.run_id, task=task) dr2 = DagRun(execution_date=second_execution_date, run_id="test_run_id_2", **common_dr_kwargs) - ti_2 = TaskInstance(run_id=dr2.run_id, task=task, execution_date=second_execution_date) + ti_2 = TaskInstance(run_id=dr2.run_id, task=task) dr3 = DagRun(execution_date=third_execution_date, run_id="test_run_id_3", **common_dr_kwargs) - ti_3 = TaskInstance(run_id=dr3.run_id, task=task, execution_date=third_execution_date) + ti_3 = TaskInstance(run_id=dr3.run_id, task=task) session.add_all([dr1, dr2, dr3, ti_1, ti_2, ti_3]) session.commit() From 55a96a4d0698e3eb47da0e3759492ff6dac49acb Mon Sep 17 00:00:00 2001 From: dirrao Date: Tue, 27 Aug 2024 17:14:53 +0530 Subject: [PATCH 5/6] airflow.models.taskinstance deprecations removed --- airflow/models/taskinstance.py | 1 - tests/providers/microsoft/conftest.py | 5 +---- tests/ti_deps/deps/test_dag_ti_slots_available_dep.py | 4 ++-- tests/ti_deps/deps/test_dag_unpaused_dep.py | 4 ++-- tests/ti_deps/deps/test_not_in_retry_period_dep.py | 2 +- 5 files changed, 6 insertions(+), 10 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 34587f356eb35..1d3909aed4a9e 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -79,7 +79,6 @@ AirflowSkipException, AirflowTaskTerminated, AirflowTaskTimeout, - DagRunNotFound, RemovedInAirflow3Warning, TaskDeferralError, TaskDeferred, diff --git a/tests/providers/microsoft/conftest.py b/tests/providers/microsoft/conftest.py index b18a2cb1fe329..c77dd7747d19d 100644 --- a/tests/providers/microsoft/conftest.py +++ b/tests/providers/microsoft/conftest.py @@ -124,14 +124,11 @@ class MockedTaskInstance(TaskInstance): def __init__( self, task, - execution_date: datetime | None = None, run_id: str | None = "run_id", state: str | None = TaskInstanceState.RUNNING, map_index: int = -1, ): - super().__init__( - task=task, execution_date=execution_date, run_id=run_id, state=state, map_index=map_index - ) + super().__init__(task=task, run_id=run_id, state=state, map_index=map_index) self.values: dict[str, Any] = {} def xcom_pull( diff --git a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py index 1deff072bd46d..b52a6f8e93f51 100644 --- a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py +++ b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py @@ -34,7 +34,7 @@ def test_concurrency_reached(self): """ dag = Mock(concurrency=1, get_concurrency_reached=Mock(return_value=True)) task = Mock(dag=dag, pool_slots=1) - ti = TaskInstance(task, execution_date=None) + ti = TaskInstance(task) assert not DagTISlotsAvailableDep().is_met(ti=ti) @@ -44,6 +44,6 @@ def test_all_conditions_met(self): """ dag = Mock(concurrency=1, get_concurrency_reached=Mock(return_value=False)) task = Mock(dag=dag, pool_slots=1) - ti = TaskInstance(task, execution_date=None) + ti = TaskInstance(task) assert DagTISlotsAvailableDep().is_met(ti=ti) diff --git a/tests/ti_deps/deps/test_dag_unpaused_dep.py b/tests/ti_deps/deps/test_dag_unpaused_dep.py index e3f740a54cce5..576c4277ebe22 100644 --- a/tests/ti_deps/deps/test_dag_unpaused_dep.py +++ b/tests/ti_deps/deps/test_dag_unpaused_dep.py @@ -34,7 +34,7 @@ def test_concurrency_reached(self): """ dag = Mock(**{"get_is_paused.return_value": True}) task = Mock(dag=dag) - ti = TaskInstance(task=task, execution_date=None) + ti = TaskInstance(task=task) assert not DagUnpausedDep().is_met(ti=ti) @@ -44,6 +44,6 @@ def test_all_conditions_met(self): """ dag = Mock(**{"get_is_paused.return_value": False}) task = Mock(dag=dag) - ti = TaskInstance(task=task, execution_date=None) + ti = TaskInstance(task=task) assert DagUnpausedDep().is_met(ti=ti) diff --git a/tests/ti_deps/deps/test_not_in_retry_period_dep.py b/tests/ti_deps/deps/test_not_in_retry_period_dep.py index 17736abbf7bc0..1b0de7c99185f 100644 --- a/tests/ti_deps/deps/test_not_in_retry_period_dep.py +++ b/tests/ti_deps/deps/test_not_in_retry_period_dep.py @@ -34,7 +34,7 @@ class TestNotInRetryPeriodDep: def _get_task_instance(self, state, end_date=None, retry_delay=timedelta(minutes=15)): task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False) - ti = TaskInstance(task=task, state=state, execution_date=None) + ti = TaskInstance(task=task, state=state) ti.end_date = end_date return ti From 1683504283f1bed2ecbbfb14acd92468b8bf7ed6 Mon Sep 17 00:00:00 2001 From: dirrao Date: Tue, 27 Aug 2024 18:18:15 +0530 Subject: [PATCH 6/6] airflow.models.taskinstance deprecations removed --- tests/jobs/test_triggerer_job.py | 2 +- tests/models/test_dagrun.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py index 378afa0499ca4..84b422342e4ce 100644 --- a/tests/jobs/test_triggerer_job.py +++ b/tests/jobs/test_triggerer_job.py @@ -103,7 +103,7 @@ def create_trigger_in_db(session, trigger, operator=None): operator.dag = dag else: operator = BaseOperator(task_id="test_ti", dag=dag) - task_instance = TaskInstance(operator, execution_date=run.execution_date, run_id=run.run_id) + task_instance = TaskInstance(operator, run_id=run.run_id) task_instance.trigger_id = trigger_orm.id session.add(dag_model) session.add(run) diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index fd0647577aff2..2c167a542d855 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -1422,7 +1422,7 @@ def task_2(arg2): ... assert len(decision.schedulable_tis) == 2 # We insert a faulty record - session.add(TaskInstance(dag.get_task("task_2"), dr.execution_date, dr.run_id)) + session.add(TaskInstance(task=dag.get_task("task_2"), run_id=dr.run_id)) session.flush() decision = dr.task_instance_scheduling_decisions()