diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index f53fb5e27481f..baa4e3eed0c20 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1404,6 +1404,25 @@ def _get_previous_execution_date(
     return pendulum.instance(prev_ti.execution_date) if prev_ti and prev_ti.execution_date else None
 
 
+def _get_previous_start_date(
+    *,
+    task_instance: TaskInstance | TaskInstancePydantic,
+    state: DagRunState | None,
+    session: Session,
+) -> pendulum.DateTime | None:
+    """
+    Return the start date from property previous_ti_success.
+
+    :param task_instance: the task instance
+    :param state: If passed, it only takes into account instances of a specific state.
+    :param session: SQLAlchemy ORM Session
+    """
+    log.debug("previous_start_date was called")
+    prev_ti = task_instance.get_previous_ti(state=state, session=session)
+    # prev_ti may not exist and prev_ti.start_date may be None.
+    return pendulum.instance(prev_ti.start_date) if prev_ti and prev_ti.start_date else None
+
+
 def _email_alert(
     *, task_instance: TaskInstance | TaskInstancePydantic, exception, task: BaseOperator
 ) -> None:
@@ -2533,10 +2552,7 @@ def get_previous_start_date(
         :param state: If passed, it only take into account instances of a specific state.
         :param session: SQLAlchemy ORM Session
         """
-        self.log.debug("previous_start_date was called")
-        prev_ti = self.get_previous_ti(state=state, session=session)
-        # prev_ti may not exist and prev_ti.start_date may be None.
-        return pendulum.instance(prev_ti.start_date) if prev_ti and prev_ti.start_date else None
+        return _get_previous_start_date(task_instance=self, state=state, session=session)
 
     @property
     def previous_start_date_success(self) -> pendulum.DateTime | None:
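For review context, a minimal usage sketch of the refactored API (not part of the patch; the wrapper function and its arguments are illustrative, but the method name and signature come from this diff):

    from __future__ import annotations

    import pendulum
    from sqlalchemy.orm import Session

    from airflow.models.taskinstance import TaskInstance
    from airflow.utils.state import DagRunState

    def previous_successful_start(ti: TaskInstance, session: Session) -> pendulum.DateTime | None:
        # Delegates to the module-level _get_previous_start_date() helper added above;
        # returns None when no previous task instance exists or it never started.
        return ti.get_previous_start_date(state=DagRunState.SUCCESS, session=session)
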
""" + from airflow.operators.empty import EmptyOperator def maker( execution_date=None, @@ -1091,6 +1092,19 @@ def maker( run_type=None, data_interval=None, external_executor_id=None, + dag_id="dag", + task_id="op1", + task_display_name=None, + max_active_tis_per_dag=16, + max_active_tis_per_dagrun=None, + pool="default_pool", + executor_config=None, + trigger_rule="all_done", + on_success_callback=None, + on_execute_callback=None, + on_failure_callback=None, + on_retry_callback=None, + email=None, map_index=-1, **kwargs, ) -> TaskInstance: @@ -1098,7 +1112,26 @@ def maker( from airflow.utils import timezone execution_date = timezone.utcnow() - _, task = create_dummy_dag(with_dagrun_type=None, **kwargs) + with dag_maker(dag_id, **kwargs): + op_kwargs = {} + from tests.test_utils.compat import AIRFLOW_V_2_9_PLUS + + if AIRFLOW_V_2_9_PLUS: + op_kwargs["task_display_name"] = task_display_name + task = EmptyOperator( + task_id=task_id, + max_active_tis_per_dag=max_active_tis_per_dag, + max_active_tis_per_dagrun=max_active_tis_per_dagrun, + executor_config=executor_config or {}, + on_success_callback=on_success_callback, + on_execute_callback=on_execute_callback, + on_failure_callback=on_failure_callback, + on_retry_callback=on_retry_callback, + email=email, + pool=pool, + trigger_rule=trigger_rule, + **op_kwargs, + ) dagrun_kwargs = {"execution_date": execution_date, "state": dagrun_state} if run_id is not None: diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index f490dd61dc689..c2993e9ce8d95 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -206,6 +206,7 @@ def test_set_task_dates(self, dag_maker): assert op3.start_date == DEFAULT_DATE + datetime.timedelta(days=1) assert op3.end_date == DEFAULT_DATE + datetime.timedelta(days=9) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_current_state(self, create_task_instance, session): ti = create_task_instance(session=session) assert ti.current_state(session=session) is None @@ -243,6 +244,7 @@ def test_set_dag(self, dag_maker): assert op.dag is dag assert op in dag.tasks + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_infer_dag(self, create_dummy_dag): op1 = EmptyOperator(task_id="test_op_1") op2 = EmptyOperator(task_id="test_op_2") @@ -287,6 +289,7 @@ def test_init_on_load(self, create_task_instance): assert ti.log.name == "airflow.task" assert not ti.test_mode + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, mock not on server side @patch.object(DAG, "get_concurrency_reached") def test_requeue_over_dag_concurrency(self, mock_concurrency_reached, create_task_instance, dag_maker): mock_concurrency_reached.return_value = True @@ -309,6 +312,7 @@ def test_requeue_over_max_active_tis_per_dag(self, create_task_instance): max_active_runs=1, max_active_tasks=2, dagrun_state=State.QUEUED, + serialized=True, ) ti.run() @@ -322,6 +326,7 @@ def test_requeue_over_max_active_tis_per_dagrun(self, create_task_instance): max_active_runs=1, max_active_tasks=2, dagrun_state=State.QUEUED, + serialized=True, ) ti.run() @@ -334,6 +339,7 @@ def test_requeue_over_pool_concurrency(self, create_task_instance, test_pool): max_active_tis_per_dag=0, max_active_runs=1, max_active_tasks=2, + serialized=True, ) with create_session() as session: test_pool.slots = 0 @@ -341,6 +347,7 @@ def test_requeue_over_pool_concurrency(self, create_task_instance, test_pool): ti.run() assert 
diff --git a/tests/conftest.py b/tests/conftest.py
index 560146140918f..56acc986eb27a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1082,6 +1082,7 @@ def create_task_instance(dag_maker, create_dummy_dag):
     Uses ``create_dummy_dag`` to create the dag structure.
     """
+    from airflow.operators.empty import EmptyOperator
 
     def maker(
         execution_date=None,
@@ -1091,6 +1092,19 @@ def maker(
         run_type=None,
         data_interval=None,
         external_executor_id=None,
+        dag_id="dag",
+        task_id="op1",
+        task_display_name=None,
+        max_active_tis_per_dag=16,
+        max_active_tis_per_dagrun=None,
+        pool="default_pool",
+        executor_config=None,
+        trigger_rule="all_done",
+        on_success_callback=None,
+        on_execute_callback=None,
+        on_failure_callback=None,
+        on_retry_callback=None,
+        email=None,
         map_index=-1,
         **kwargs,
     ) -> TaskInstance:
@@ -1098,7 +1112,26 @@ def maker(
             from airflow.utils import timezone
 
             execution_date = timezone.utcnow()
-        _, task = create_dummy_dag(with_dagrun_type=None, **kwargs)
+        with dag_maker(dag_id, **kwargs):
+            op_kwargs = {}
+            from tests.test_utils.compat import AIRFLOW_V_2_9_PLUS
+
+            if AIRFLOW_V_2_9_PLUS:
+                op_kwargs["task_display_name"] = task_display_name
+            task = EmptyOperator(
+                task_id=task_id,
+                max_active_tis_per_dag=max_active_tis_per_dag,
+                max_active_tis_per_dagrun=max_active_tis_per_dagrun,
+                executor_config=executor_config or {},
+                on_success_callback=on_success_callback,
+                on_execute_callback=on_execute_callback,
+                on_failure_callback=on_failure_callback,
+                on_retry_callback=on_retry_callback,
+                email=email,
+                pool=pool,
+                trigger_rule=trigger_rule,
+                **op_kwargs,
+            )
 
         dagrun_kwargs = {"execution_date": execution_date, "state": dagrun_state}
         if run_id is not None:
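A sketch of how a test might call the reworked fixture (hypothetical test function; the keyword arguments are the ones added above — dag_id is consumed by dag_maker, the operator-level settings go to the inline EmptyOperator, and unrecognized **kwargs such as serialized=True fall through to dag_maker):

    def test_example(create_task_instance):
        ti = create_task_instance(
            dag_id="my_dag",
            task_id="my_task",
            pool="default_pool",
            trigger_rule="all_done",
            serialized=True,  # forwarded to dag_maker via **kwargs
        )
        ti.run()
        assert ti.state is not None
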
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index f490dd61dc689..c2993e9ce8d95 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -206,6 +206,7 @@ def test_set_task_dates(self, dag_maker):
         assert op3.start_date == DEFAULT_DATE + datetime.timedelta(days=1)
         assert op3.end_date == DEFAULT_DATE + datetime.timedelta(days=9)
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_current_state(self, create_task_instance, session):
         ti = create_task_instance(session=session)
         assert ti.current_state(session=session) is None
@@ -243,6 +244,7 @@ def test_set_dag(self, dag_maker):
         assert op.dag is dag
         assert op in dag.tasks
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_infer_dag(self, create_dummy_dag):
         op1 = EmptyOperator(task_id="test_op_1")
         op2 = EmptyOperator(task_id="test_op_2")
@@ -287,6 +289,7 @@ def test_init_on_load(self, create_task_instance):
         assert ti.log.name == "airflow.task"
         assert not ti.test_mode
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode, mock not on server side
     @patch.object(DAG, "get_concurrency_reached")
     def test_requeue_over_dag_concurrency(self, mock_concurrency_reached, create_task_instance, dag_maker):
         mock_concurrency_reached.return_value = True
@@ -309,6 +312,7 @@ def test_requeue_over_max_active_tis_per_dag(self, create_task_instance):
             max_active_runs=1,
             max_active_tasks=2,
             dagrun_state=State.QUEUED,
+            serialized=True,
         )
 
         ti.run()
@@ -322,6 +326,7 @@ def test_requeue_over_max_active_tis_per_dagrun(self, create_task_instance):
             max_active_runs=1,
             max_active_tasks=2,
             dagrun_state=State.QUEUED,
+            serialized=True,
         )
 
         ti.run()
@@ -334,6 +339,7 @@ def test_requeue_over_pool_concurrency(self, create_task_instance, test_pool):
             max_active_tis_per_dag=0,
             max_active_runs=1,
             max_active_tasks=2,
+            serialized=True,
         )
         with create_session() as session:
             test_pool.slots = 0
@@ -341,6 +347,7 @@ def test_requeue_over_pool_concurrency(self, create_task_instance, test_pool):
             session.flush()
             ti.run()
             assert ti.state == State.NONE
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.usefixtures("test_pool")
     def test_not_requeue_non_requeueable_task_instance(self, dag_maker):
         # Use BaseSensorOperator because sensor got
@@ -376,6 +383,7 @@ def test_not_requeue_non_requeueable_task_instance(self, dag_maker):
         for dep_patch, _ in patch_dict.values():
             dep_patch.stop()
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_mark_non_runnable_task_as_success(self, create_task_instance):
         """
         test that running task with mark_success param update task state
@@ -399,6 +407,7 @@ def test_run_pooling_task(self, create_task_instance):
             dag_id="test_run_pooling_task",
             task_id="test_run_pooling_task_op",
             pool="test_pool",
+            serialized=True,
         )
 
         ti.run()
@@ -420,6 +429,7 @@ def test_pool_slots_property(self):
                 pool_slots=0,
             )
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @provide_session
     def test_ti_updates_with_task(self, create_task_instance, session=None):
         """
@@ -462,6 +472,7 @@ def test_run_pooling_task_with_mark_success(self, create_task_instance):
         ti = create_task_instance(
             dag_id="test_run_pooling_task_with_mark_success",
             task_id="test_run_pooling_task_with_mark_success_op",
+            serialized=True,
         )
 
         ti.run(mark_success=True)
@@ -476,7 +487,7 @@ def test_run_pooling_task_with_skip(self, dag_maker):
         def raise_skip_exception():
             raise AirflowSkipException
 
-        with dag_maker(dag_id="test_run_pooling_task_with_skip"):
+        with dag_maker(dag_id="test_run_pooling_task_with_skip", serialized=True):
             task = PythonOperator(
                 task_id="test_run_pooling_task_with_skip",
                 python_callable=raise_skip_exception,
@@ -488,6 +499,7 @@ def raise_skip_exception():
         ti.run()
         assert State.SKIPPED == ti.state
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_task_sigterm_calls_on_failure_callback(self, dag_maker, caplog):
         """
         Test that ensures that tasks call on_failure_callback when they receive sigterm
@@ -518,7 +530,7 @@ def test_task_sigterm_works_with_retries(self, dag_maker):
         def task_function(ti):
             os.kill(ti.pid, signal.SIGTERM)
 
-        with dag_maker("test_mark_failure_2"):
+        with dag_maker("test_mark_failure_2", serialized=True):
             task = PythonOperator(
                 task_id="test_on_failure",
                 python_callable=task_function,
@@ -534,6 +546,7 @@ def task_function(ti):
         ti.refresh_from_db()
         assert ti.state == State.UP_FOR_RETRY
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode as DB access in code
     @pytest.mark.parametrize("state", [State.SUCCESS, State.FAILED, State.SKIPPED])
     def test_task_sigterm_doesnt_change_state_of_finished_tasks(self, state, dag_maker):
         session = settings.Session()
@@ -558,6 +571,7 @@ def task_function(ti):
         ti.refresh_from_db()
         ti.state == state
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize(
         "state, exception, retries",
         [
@@ -605,6 +619,7 @@ def _raise_if_exception():
         assert ti.next_kwargs is None
         assert ti.state == state
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_retry_delay(self, dag_maker, time_machine):
         """
         Test that retry delays are respected
@@ -651,6 +666,7 @@ def run_with_error(ti):
         assert ti.state == State.FAILED
         assert ti.try_number == 3
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_retry_handling(self, dag_maker, session):
         """
         Test that task retries are handled properly
@@ -778,6 +794,7 @@ def test_next_retry_datetime_short_or_zero_intervals(self, dag_maker, seconds):
         date = ti.next_retry_datetime()
         assert date == ti.end_date + datetime.timedelta(seconds=1)
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_reschedule_handling(self, dag_maker, task_reschedules_for_ti):
         """
         Test that task reschedules are handled properly
@@ -886,6 +903,7 @@ def run_ti_and_assert(
         done, fail = True, False
         run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 2, 1)
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_mapped_reschedule_handling(self, dag_maker, task_reschedules_for_ti):
         """
         Test that mapped task reschedules are handled properly
@@ -989,6 +1007,7 @@ def run_ti_and_assert(
         done, fail = True, False
         run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 2, 1)
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.usefixtures("test_pool")
     def test_mapped_task_reschedule_handling_clear_reschedules(self, dag_maker, task_reschedules_for_ti):
         """
@@ -1053,6 +1072,7 @@ def run_ti_and_assert(
         # Check that reschedules for ti have also been cleared.
         assert not task_reschedules_for_ti(ti)
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.usefixtures("test_pool")
     def test_reschedule_handling_clear_reschedules(self, dag_maker, task_reschedules_for_ti):
         """
@@ -1118,7 +1138,7 @@ def run_ti_and_assert(
         assert not task_reschedules_for_ti(ti)
 
     def test_depends_on_past(self, dag_maker):
-        with dag_maker(dag_id="test_depends_on_past"):
+        with dag_maker(dag_id="test_depends_on_past", serialized=True):
             task = EmptyOperator(
                 task_id="test_dop_task",
                 depends_on_past=True,
@@ -1554,6 +1574,7 @@ def test_respects_prev_dagrun_dep(self, create_task_instance):
         ):
             assert ti.are_dependencies_met()
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize(
         "downstream_ti_state, expected_are_dependents_done",
         [
@@ -1579,6 +1600,7 @@ def test_are_dependents_done(
         session.flush()
         assert ti.are_dependents_done(session) == expected_are_dependents_done
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_xcom_pull(self, dag_maker):
         """Test xcom_pull, using different filtering methods."""
         with dag_maker(dag_id="test_xcom") as dag:
@@ -1610,6 +1632,7 @@ def test_xcom_pull(self, dag_maker):
         result = ti1.xcom_pull(task_ids=["test_xcom_1", "test_xcom_2"], key="foo")
         assert result == ["bar", "baz"]
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_xcom_pull_mapped(self, dag_maker, session):
         with dag_maker(dag_id="test_xcom", session=session):
             # Use the private _expand() method to avoid the empty kwargs check.
@@ -1651,6 +1674,7 @@ def test_xcom_pull_after_success(self, create_task_instance):
             schedule="@monthly",
             task_id="test_xcom",
             pool="test_xcom",
+            serialized=True,
         )
 
         ti.run(mark_success=True)
@@ -1666,6 +1690,7 @@ def test_xcom_pull_after_success(self, create_task_instance):
         ti.run(ignore_all_deps=True)
         assert ti.xcom_pull(task_ids="test_xcom", key=key) is None
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_xcom_pull_after_deferral(self, create_task_instance, session):
         """
         tests xcom will not clear before a task runs its next method after deferral.
@@ -1691,6 +1716,7 @@ def test_xcom_pull_after_deferral(self, create_task_instance, session):
         ti.run(ignore_all_deps=True)
         assert ti.xcom_pull(task_ids="test_xcom", key=key) == value
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_xcom_pull_different_execution_date(self, create_task_instance):
         """
         tests xcom fetch behavior with different execution dates, using
@@ -1729,7 +1755,7 @@ def test_xcom_push_flag(self, dag_maker):
         value = "hello"
         task_id = "test_no_xcom_push"
 
-        with dag_maker(dag_id="test_xcom"):
+        with dag_maker(dag_id="test_xcom", serialized=True):
             # nothing saved to XCom
             task = PythonOperator(
                 task_id=task_id,
@@ -1748,7 +1774,7 @@ def test_xcom_without_multiple_outputs(self, dag_maker):
         value = {"key1": "value1", "key2": "value2"}
         task_id = "test_xcom_push_without_multiple_outputs"
 
-        with dag_maker(dag_id="test_xcom"):
+        with dag_maker(dag_id="test_xcom", serialized=True):
             task = PythonOperator(
                 task_id=task_id,
                 python_callable=lambda: value,
@@ -1766,7 +1792,7 @@ def test_xcom_with_multiple_outputs(self, dag_maker):
         value = {"key1": "value1", "key2": "value2"}
         task_id = "test_xcom_push_with_multiple_outputs"
 
-        with dag_maker(dag_id="test_xcom"):
+        with dag_maker(dag_id="test_xcom", serialized=True):
             task = PythonOperator(
                 task_id=task_id,
                 python_callable=lambda: value,
@@ -1787,7 +1813,7 @@ def test_xcom_with_multiple_outputs_and_no_mapping_result(self, dag_maker):
         value = "value"
         task_id = "test_xcom_push_with_multiple_outputs"
 
-        with dag_maker(dag_id="test_xcom"):
+        with dag_maker(dag_id="test_xcom", serialized=True):
             task = PythonOperator(
                 task_id=task_id,
                 python_callable=lambda: value,
@@ -1816,7 +1842,7 @@ def post_execute(self, context, result=None):
                 if result == "error":
                     raise TestError("expected error.")
 
-        with dag_maker(dag_id="test_post_execute_dag"):
+        with dag_maker(dag_id="test_post_execute_dag", serialized=True):
             task = TestOperator(
                 task_id="test_operator",
                 python_callable=lambda: "error",
@@ -1826,6 +1852,7 @@ def post_execute(self, context, result=None):
         with pytest.raises(TestError):
             ti.run()
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_check_and_change_state_before_execution(self, create_task_instance):
         expected_external_executor_id = "banana"
         ti = create_task_instance(
@@ -1844,6 +1871,7 @@ def test_check_and_change_state_before_execution(self, create_task_instance):
         assert ti_from_deserialized_task.state == State.RUNNING
         assert ti_from_deserialized_task.try_number == 0
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_check_and_change_state_before_execution_provided_id_overrides(self, create_task_instance):
         expected_external_executor_id = "banana"
         ti = create_task_instance(
@@ -1865,6 +1893,7 @@ def test_check_and_change_state_before_execution_provided_id_overrides(self, cre
         assert ti_from_deserialized_task.state == State.RUNNING
         assert ti_from_deserialized_task.try_number == 0
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_check_and_change_state_before_execution_with_exec_id(self, create_task_instance):
         expected_external_executor_id = "minions"
         ti = create_task_instance(dag_id="test_check_and_change_state_before_execution")
@@ -1883,6 +1912,7 @@ def test_check_and_change_state_before_execution_with_exec_id(self, create_task_
         assert ti_from_deserialized_task.state == State.RUNNING
         assert ti_from_deserialized_task.try_number == 0
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_check_and_change_state_before_execution_dep_not_met(self, create_task_instance):
         ti = create_task_instance(dag_id="test_check_and_change_state_before_execution")
         task2 = EmptyOperator(task_id="task2", dag=ti.task.dag, start_date=DEFAULT_DATE)
@@ -1893,6 +1923,7 @@ def test_check_and_change_state_before_execution_dep_not_met(self, create_task_i
         ti2 = TI(task=serialized_dag.get_task(task2.task_id), run_id=ti.run_id)
         assert not ti2.check_and_change_state_before_execution()
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_check_and_change_state_before_execution_dep_not_met_already_running(self, create_task_instance):
         """return False if the task instance state is running"""
         ti = create_task_instance(dag_id="test_check_and_change_state_before_execution")
@@ -1908,6 +1939,7 @@ def test_check_and_change_state_before_execution_dep_not_met_already_running(sel
         assert ti_from_deserialized_task.state == State.RUNNING
         assert ti_from_deserialized_task.external_executor_id is None
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_check_and_change_state_before_execution_dep_not_met_not_runnable_state(
         self, create_task_instance
     ):
@@ -1938,6 +1970,7 @@ def test_try_number(self, create_task_instance):
         ti.state = State.SUCCESS
         assert ti.try_number == 2  # unaffected by state
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_get_num_running_task_instances(self, create_task_instance):
         session = settings.Session()
 
@@ -1973,6 +2006,7 @@ def test_get_num_running_task_instances(self, create_task_instance):
         assert 1 == ti2.get_num_running_task_instances(session=session)
         assert 1 == ti3.get_num_running_task_instances(session=session)
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_get_num_running_task_instances_per_dagrun(self, create_task_instance, dag_maker):
         session = settings.Session()
 
@@ -2070,6 +2104,7 @@ def test_overwrite_params_with_dag_run_conf_none(self, create_task_instance):
         params = process_params(ti.task.dag, ti.task, dag_run, suppress_exception=False)
         assert params["override"] is False
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize("use_native_obj", [True, False])
     @patch("airflow.models.taskinstance.send_email")
     def test_email_alert(self, mock_send_email, dag_maker, use_native_obj):
@@ -2087,6 +2122,7 @@ def test_email_alert(self, mock_send_email, dag_maker, use_native_obj):
         assert "test_email_alert" in body
         assert "Try 0" in body
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @conf_vars(
         {
             ("email", "subject_template"): "/subject/path",
@@ -2114,6 +2150,7 @@ def test_email_alert_with_config(self, mock_send_email, dag_maker):
         assert "template: test_email_alert_with_config" == title
         assert "template: test_email_alert_with_config" == body
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @patch("airflow.models.taskinstance.send_email")
     def test_email_alert_with_filenotfound_config(self, mock_send_email, dag_maker):
         with dag_maker(dag_id="test_failure_email"):
@@ -2144,6 +2181,7 @@ def test_email_alert_with_filenotfound_config(self, mock_send_email, dag_maker):
         assert title_default == title_error
         assert body_default == body_error
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
["test_email_alert", "test_email_alert__1"]) @patch("airflow.models.taskinstance.send_email") def test_failure_mapped_taskflow(self, mock_send_email, dag_maker, session, task_id): @@ -2192,6 +2230,7 @@ def test_set_duration_empty_dates(self): ti.set_duration() assert ti.duration is None + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_success_callback_no_race_condition(self, create_task_instance): callback_wrapper = CallbackWrapper() ti = create_task_instance( @@ -2212,6 +2251,7 @@ def test_success_callback_no_race_condition(self, create_task_instance): ti.refresh_from_db() assert ti.state == State.SUCCESS + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_outlet_datasets(self, create_task_instance): """ Verify that when we have an outlet dataset on a task, and the task @@ -2270,6 +2310,7 @@ def test_outlet_datasets(self, create_task_instance): ) assert all([event.timestamp < ddrq_timestamp for (ddrq_timestamp,) in ddrq_timestamps]) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_outlet_datasets_failed(self, create_task_instance): """ Verify that when we have an outlet dataset on a task, and the task @@ -2301,6 +2342,7 @@ def test_outlet_datasets_failed(self, create_task_instance): # check that no dataset events were generated assert session.query(DatasetEvent).count() == 0 + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_mapped_current_state(self, dag_maker): with dag_maker(dag_id="test_mapped_current_state") as _: from airflow.decorators import task @@ -2324,6 +2366,7 @@ def raise_an_exception(placeholder: int): task_instance.run() assert task_instance.current_state() == TaskInstanceState.SUCCESS + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_outlet_datasets_skipped(self): """ Verify that when we have an outlet dataset on a task, and the task @@ -2354,6 +2397,7 @@ def test_outlet_datasets_skipped(self): # check that no dataset events were generated assert session.query(DatasetEvent).count() == 0 + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_outlet_dataset_extra(self, dag_maker, session): from airflow.datasets import Dataset @@ -2395,6 +2439,7 @@ def _write2_post_execute(context, _): assert events["write2"].dataset.uri == "test_outlet_dataset_extra_2" assert events["write2"].extra == {"x": 1} + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_outlet_dataset_extra_ignore_different(self, dag_maker, session): from airflow.datasets import Dataset @@ -2416,6 +2461,7 @@ def write(*, outlet_events): assert event.source_task_id == "write" assert event.extra == {"one": 1} + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_outlet_dataset_extra_yield(self, dag_maker, session): from airflow.datasets import Dataset from airflow.datasets.metadata import Metadata @@ -2465,6 +2511,7 @@ def _write2_post_execute(context, result): assert events["write2"].dataset.uri == "test_outlet_dataset_extra_2" assert events["write2"].extra == {"x": 1} + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_outlet_dataset_alias(self, dag_maker, session): from airflow.datasets import Dataset, DatasetAlias @@ -2513,6 +2560,7 @@ def producer(*, outlet_events): assert len(dsa_obj.datasets) == 1 assert dsa_obj.datasets[0].uri == ds_uri + 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_outlet_multiple_dataset_alias(self, dag_maker, session):
         from airflow.datasets import Dataset, DatasetAlias
 
@@ -2573,6 +2621,7 @@ def producer(*, outlet_events):
         assert len(dsa_obj.datasets) == 1
         assert dsa_obj.datasets[0].uri == ds_uri
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_outlet_dataset_alias_through_metadata(self, dag_maker, session):
         from airflow.datasets import DatasetAlias
         from airflow.datasets.metadata import Metadata
@@ -2617,6 +2666,7 @@ def producer(*, outlet_events):
         assert len(dsa_obj.datasets) == 1
         assert dsa_obj.datasets[0].uri == ds_uri
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_outlet_dataset_alias_dataset_not_exists(self, dag_maker, session):
         from airflow.datasets import Dataset, DatasetAlias
 
@@ -2656,6 +2706,7 @@ def producer(*, outlet_events):
         assert len(dsa_obj.datasets) == 1
         assert dsa_obj.datasets[0].uri == ds_uri
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_inlet_dataset_extra(self, dag_maker, session):
         from airflow.datasets import Dataset
 
@@ -2709,6 +2760,7 @@ def read(*, inlet_events):
         assert not dr.task_instance_scheduling_decisions(session=session).schedulable_tis
         assert read_task_evaluated
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_inlet_dataset_alias_extra(self, dag_maker, session):
         ds_uri = "test_inlet_dataset_extra_ds"
         dsa_name = "test_inlet_dataset_extra_dsa"
@@ -2796,6 +2848,7 @@ def read(*, inlet_events):
         # Should be done.
         assert not dr.task_instance_scheduling_decisions(session=session).schedulable_tis
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize(
         "slicer, expected",
         [
@@ -2849,6 +2902,7 @@ def read(*, inlet_events):
         assert not dr.task_instance_scheduling_decisions(session=session).schedulable_tis
         assert result == expected
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize(
         "slicer, expected",
         [
@@ -2909,6 +2963,7 @@ def read(*, inlet_events):
         assert not dr.task_instance_scheduling_decisions(session=session).schedulable_tis
         assert result == expected
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_changing_of_dataset_when_ddrq_is_already_populated(self, dag_maker):
         """
         Test that when a task that produces dataset has ran, that changing the consumer
@@ -3029,6 +3084,7 @@ def test_previous_execution_date_success(self, schedule_interval, catchup, dag_m
         assert ti_list[3].get_previous_execution_date(state=State.SUCCESS) == ti_list[1].execution_date
         assert ti_list[3].get_previous_execution_date(state=State.SUCCESS) != ti_list[2].execution_date
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize("schedule_interval, catchup", _prev_dates_param_list)
     def test_previous_start_date_success(self, schedule_interval, catchup, dag_maker) -> None:
         scenario = [State.FAILED, State.SUCCESS, State.FAILED, State.SUCCESS]
@@ -3044,7 +3100,7 @@ def test_get_previous_start_date_none(self, dag_maker):
         """
         Test that get_previous_start_date() can handle TaskInstance with no start_date.
""" - with dag_maker("test_get_previous_start_date_none", schedule=None) as dag: + with dag_maker("test_get_previous_start_date_none", schedule=None, serialized=True): task = EmptyOperator(task_id="op") day_1 = DEFAULT_DATE @@ -3059,7 +3115,7 @@ def test_get_previous_start_date_none(self, dag_maker): run_type=DagRunType.MANUAL, ) - dagrun_2 = dag.create_dagrun( + dagrun_2 = dag_maker.create_dagrun( execution_date=day_2, state=State.RUNNING, run_type=DagRunType.MANUAL, @@ -3074,6 +3130,7 @@ def test_get_previous_start_date_none(self, dag_maker): assert ti_2.get_previous_start_date() == ti_1.start_date assert ti_1.start_date is None + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_context_triggering_dataset_events_none(self, session, create_task_instance): ti = create_task_instance() template_context = ti.get_template_context() @@ -3083,6 +3140,7 @@ def test_context_triggering_dataset_events_none(self, session, create_task_insta assert template_context["triggering_dataset_events"] == {} + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_context_triggering_dataset_events(self, create_dummy_dag, session): ds1 = DatasetModel(id=1, uri="one") ds2 = DatasetModel(id=2, uri="two") @@ -3135,6 +3193,7 @@ def test_pendulum_template_dates(self, create_task_instance): dag_id="test_pendulum_template_dates", task_id="test_pendulum_template_dates_task", schedule="0 12 * * *", + serialized=True, ) template_context = ti.get_template_context() @@ -3167,6 +3226,7 @@ def test_template_render_deprecated(self, create_task_instance, session): result = ti.task.render_template("Execution date: {{ execution_date }}", template_context) assert result.startswith("Execution date: ") + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize( "content, expected_output", [ @@ -3288,7 +3348,7 @@ def test_template_with_json_variable_missing(self, create_task_instance, session ], ) def test_deprecated_context(self, field, expected, create_task_instance): - ti = create_task_instance(execution_date=DEFAULT_DATE) + ti = create_task_instance(execution_date=DEFAULT_DATE, serialized=True) context = ti.get_template_context() with pytest.deprecated_call() as recorder: assert context[field] == expected @@ -3375,6 +3435,7 @@ def on_finish_callable(context): assert "Executing on_finish_callable callback" in caplog.text assert "Error when executing on_finish_callable callback" in caplog.text + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @provide_session def test_handle_failure(self, create_dummy_dag, session=None): start_date = timezone.datetime(2016, 6, 1) @@ -3472,6 +3533,7 @@ def test_handle_failure(self, create_dummy_dag, session=None): assert "task_instance" in context_arg_3 mock_on_retry_3.assert_not_called() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_handle_failure_updates_queued_task_updates_state(self, dag_maker): session = settings.Session() with dag_maker(): @@ -3485,6 +3547,7 @@ def test_handle_failure_updates_queued_task_updates_state(self, dag_maker): ti.handle_failure("test queued ti", test_mode=True) assert ti.state == State.UP_FOR_RETRY + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @patch.object(Stats, "incr") def test_handle_failure_no_task(self, Stats_incr, dag_maker): """ @@ -3519,6 +3582,7 @@ def test_handle_failure_no_task(self, Stats_incr, dag_maker): 
"operator_failures", tags={**expected_stats_tags, "operator": "EmptyOperator"} ) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_handle_failure_task_undefined(self, create_task_instance): """ When the loaded taskinstance does not use refresh_from_task, the task may be undefined. @@ -3529,6 +3593,7 @@ def test_handle_failure_task_undefined(self, create_task_instance): del ti.task ti.handle_failure("test ti.task undefined") + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @provide_session def test_handle_failure_fail_stop(self, create_dummy_dag, session=None): start_date = timezone.datetime(2016, 6, 1) @@ -3586,6 +3651,7 @@ def test_handle_failure_fail_stop(self, create_dummy_dag, session=None): for i in range(len(states)): assert tasks[i].state == exp_states[i] + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_does_not_retry_on_airflow_fail_exception(self, dag_maker): def fail(): raise AirflowFailException("hopeless") @@ -3602,6 +3668,7 @@ def fail(): ti.run() assert State.FAILED == ti.state + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_retries_on_other_exceptions(self, dag_maker): def fail(): raise AirflowException("maybe this will pass?") @@ -3618,6 +3685,7 @@ def fail(): ti.run() assert State.UP_FOR_RETRY == ti.state + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @patch.object(TaskInstance, "logger") def test_stacktrace_on_failure_starts_with_task_execute_method(self, mock_get_log, dag_maker): def fail(): @@ -3703,6 +3771,7 @@ def f(*args, **kwargs): ti.refresh_from_db() assert ti.state == expected_state + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_get_current_context_works_in_template(self, dag_maker): def user_defined_macro(): from airflow.operators.python import get_current_context @@ -3816,6 +3885,7 @@ def test_generate_command_specific_param(self): ) assert assert_command == generate_command + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @provide_session def test_get_rendered_template_fields(self, dag_maker, session=None): with dag_maker("test-dag", session=session) as dag: @@ -3839,6 +3909,7 @@ def test_get_rendered_template_fields(self, dag_maker, session=None): with create_session() as session: session.query(RenderedTaskInstanceFields).delete() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_set_state_up_for_retry(self, create_task_instance): ti = create_task_instance(state=State.RUNNING) @@ -3937,6 +4008,7 @@ def test_operator_field_with_serialization(self, create_task_instance): assert ser_ti.operator == "EmptyOperator" assert ser_ti.task.operator_name == "EmptyOperator" + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_clear_db_references(self, session, create_task_instance): tables = [TaskFail, RenderedTaskInstanceFields, XCom] ti = create_task_instance() @@ -3968,7 +4040,7 @@ def raise_skip_exception(): callback_function = mock.MagicMock() callback_function.__name__ = "callback_function" - with dag_maker(dag_id="test_skipped_task"): + with dag_maker(dag_id="test_skipped_task", serialized=True): task = PythonOperator( task_id="test_skipped_task", python_callable=raise_skip_exception, @@ -3983,7 +4055,7 @@ def raise_skip_exception(): assert callback_function.called def 
     def test_task_instance_history_is_created_when_ti_goes_for_retry(self, dag_maker, session):
-        with dag_maker():
+        with dag_maker(serialized=True):
             task = BashOperator(
                 task_id="test_history_tab",
                 bash_command="ech",
@@ -4070,6 +4142,7 @@ def teardown_method(self) -> None:
         self._clean()
 
 
+@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
 @pytest.mark.parametrize("mode", ["poke", "reschedule"])
 @pytest.mark.parametrize("retries", [0, 1])
 def test_sensor_timeout(mode, retries, dag_maker):
@@ -4099,6 +4172,7 @@ def timeout():
     assert ti.state == State.FAILED
 
 
+@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
 @pytest.mark.parametrize("mode", ["poke", "reschedule"])
 @pytest.mark.parametrize("retries", [0, 1])
 def test_mapped_sensor_timeout(mode, retries, dag_maker):
@@ -4137,7 +4211,7 @@ def test_mapped_sensor_works(mode, retries, dag_maker):
     def timeout(ti):
         return 1
 
-    with dag_maker(dag_id=f"test_sensor_timeout_{mode}_{retries}"):
+    with dag_maker(dag_id=f"test_sensor_timeout_{mode}_{retries}", serialized=True):
         PythonSensor.partial(
             task_id="test_raise_sensor_timeout",
             python_callable=timeout,
@@ -4157,6 +4231,7 @@ def setup_class(self):
         with create_session() as session:
             session.query(TaskMap).delete()
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize("xcom_value", [[1, 2, 3], {"a": 1, "b": 2}, "abc"])
     def test_not_recorded_if_leaf(self, dag_maker, xcom_value):
         """Return value should not be recorded if there are no downstreams."""
@@ -4173,6 +4248,7 @@ def push_something():
 
         assert dag_maker.session.query(TaskMap).count() == 0
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize("xcom_value", [[1, 2, 3], {"a": 1, "b": 2}, "abc"])
     def test_not_recorded_if_not_used(self, dag_maker, xcom_value):
         """Return value should not be recorded if no downstreams are mapped."""
@@ -4193,6 +4269,7 @@ def completely_different():
 
         assert dag_maker.session.query(TaskMap).count() == 0
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize("xcom_1", [[1, 2, 3], {"a": 1, "b": 2}, "abc"])
     @pytest.mark.parametrize("xcom_4", [[1, 2, 3], {"a": 1, "b": 2}])
     def test_not_recorded_if_irrelevant(self, dag_maker, xcom_1, xcom_4):
@@ -4241,6 +4318,7 @@ def tg(arg):
             tis["push_4"].run()
         assert dag_maker.session.query(TaskMap).count() == 2
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize(
         "return_value, exception_type, error_message",
         [
@@ -4270,6 +4348,7 @@ def pull_something(value):
         assert ti.state == TaskInstanceState.FAILED
         assert str(ctx.value) == error_message
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize(
         "return_value, exception_type, error_message",
         [
@@ -4301,6 +4380,7 @@ def push():
         assert ti.state == TaskInstanceState.FAILED
         assert str(ctx.value) == error_message
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize(
         "return_value, exception_type, error_message",
         [
@@ -4336,6 +4416,7 @@ def tg(arg):
         assert ti.state == TaskInstanceState.FAILED
         assert str(ctx.value) == error_message
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize(
         "return_value, exception_type, error_message",
         [
@@ -4371,6 +4452,7 @@ def tg(arg):
         assert ti.state == TaskInstanceState.FAILED
         assert str(ctx.value) == error_message
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize(
         "create_upstream",
         [
@@ -4406,6 +4488,7 @@ def pull(v):
             ti.run()
         assert str(ctx.value) == "expand_kwargs() expects a list[dict], not list[int]"
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize(
         "downstream, error_message",
         [
@@ -4461,6 +4544,7 @@ def pull(arg1, arg2):
             ti.run()
         ti.xcom_pull(task_ids=downstream, map_indexes=1, session=session) == ["b", "c"]
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_error_if_upstream_does_not_push(self, dag_maker):
         """Fail the upstream task if it fails to push the XCom used for task mapping."""
         with dag_maker(dag_id="test_not_recorded_for_unused") as dag:
@@ -4483,6 +4567,7 @@ def pull_something(value):
         assert ti.state == TaskInstanceState.FAILED
         assert str(ctx.value) == "did not push XCom for task mapping"
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @conf_vars({("core", "max_map_length"): "1"})
     def test_error_if_unmappable_length(self, dag_maker):
         """If an unmappable return value is used to map, fail the task that pushed the XCom."""
@@ -4506,6 +4591,7 @@ def pull_something(value):
         assert ti.state == TaskInstanceState.FAILED
         assert str(ctx.value) == "unmappable return value length: 2 > 1"
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize(
         "xcom_value, expected_length, expected_keys",
         [
@@ -4539,6 +4625,7 @@ def pull_something(value):
         assert task_map.length == expected_length
         assert task_map.keys == expected_keys
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_no_error_on_changing_from_non_mapped_to_mapped(self, dag_maker, session):
         """If a task changes from non-mapped to mapped, don't fail on integrity error."""
         with dag_maker(dag_id="test_no_error_on_changing_from_non_mapped_to_mapped") as dag:
@@ -4574,6 +4661,7 @@ def add_two(x):
 
 
 class TestMappedTaskInstanceReceiveValue:
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize(
         "literal, expected_outputs",
         [
@@ -4607,6 +4695,7 @@ def show(value):
             ti.run()
         assert outputs == expected_outputs
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @pytest.mark.parametrize(
         "upstream_return, expected_outputs",
         [
@@ -4643,6 +4732,7 @@ def show(value):
             ti.run()
         assert outputs == expected_outputs
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_map_product(self, dag_maker, session):
         outputs = []
 
@@ -4684,6 +4774,7 @@ def show(number, letter):
             (2, ("c", "z")),
         ]
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_map_product_same(self, dag_maker, session):
         """Test a mapped task can refer to the same source multiple times."""
         outputs = []
@@ -4717,6 +4808,7 @@ def show(a, b):
             ti.run()
         assert outputs == [(1, 1), (1, 2), (2, 1), (2, 2)]
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_map_literal_cross_product(self, dag_maker, session):
         """Test a mapped task with literal cross product args expand properly."""
         outputs = []
@@ -4752,6 +4844,7 @@ def show(a, b):
             ti.run()
         assert outputs == [(2, 5), (2, 10), (4, 5), (4, 10), (8, 5), (8, 10)]
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_map_in_group(self, tmp_path: pathlib.Path, dag_maker, session):
         out = tmp_path.joinpath("out")
         out.touch()
@@ -4819,6 +4912,7 @@ def _get_lazy_xcom_access_expected_sql_lines() -> list[str]:
         raise RuntimeError(f"unknown backend {backend!r}")
 
 
+@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
 def test_lazy_xcom_access_does_not_pickle_session(dag_maker, session):
     with dag_maker(session=session):
         EmptyOperator(task_id="t")
@@ -4848,6 +4942,7 @@ def test_lazy_xcom_access_does_not_pickle_session(dag_maker, session):
     assert list(processed) == [123]
 
 
+@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
 @mock.patch("airflow.models.taskinstance.XCom.deserialize_value", side_effect=XCom.deserialize_value)
 def test_ti_xcom_pull_on_mapped_operator_return_lazy_iterable(mock_deserialize_value, dag_maker, session):
     """Ensure we access XCom lazily when pulling from a mapped operator."""
@@ -4884,6 +4979,7 @@ def test_ti_xcom_pull_on_mapped_operator_return_lazy_iterable(mock_deserialize_v
         next(it)
 
 
+@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
 def test_ti_mapped_depends_on_mapped_xcom_arg(dag_maker, session):
     with dag_maker(session=session) as dag:
 
@@ -4909,6 +5005,7 @@ def add_one(x):
     assert [x.value for x in query.order_by(None).order_by(XCom.map_index)] == [3, 4, 5]
 
 
+@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
 def test_mapped_upstream_return_none_should_skip(dag_maker, session):
     results = set()
 
@@ -4965,6 +5062,7 @@ def get_extra_env():
     assert "get_extra_env" in echo_task.upstream_task_ids
 
 
+@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
 def test_mapped_task_does_not_error_in_mini_scheduler_if_upstreams_are_not_done(dag_maker, caplog, session):
     """
     This tests that when scheduling child tasks of a task and there's a mapped downstream task,
@@ -5007,6 +5105,7 @@ def last_task():
     assert "0 downstream tasks scheduled from follow-on schedule" in caplog.text
 
 
+@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
 def test_empty_operator_is_not_considered_in_mini_scheduler(dag_maker, caplog, session):
     """
     This tests verify that operators with inherits_from_empty_operator are not considered by mini scheduler.
@@ -5051,6 +5150,7 @@ def second_task():
    assert "2 downstream tasks scheduled from follow-on schedule" in caplog.text
 
 
+@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
 def test_mapped_task_expands_in_mini_scheduler_if_upstreams_are_done(dag_maker, caplog, session):
     """Test that mini scheduler expands mapped task"""
     with dag_maker() as dag:
@@ -5092,6 +5192,7 @@ def last_task():
     assert "3 downstream tasks scheduled from follow-on schedule" in caplog.text
 
 
+@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
 def test_mini_scheduler_not_skip_mapped_downstream_until_all_upstreams_finish(dag_maker, session):
     with dag_maker(session=session):