From 9608f71004ee3c538c2bedf4f716e65c13a22a7d Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Fri, 9 Aug 2024 23:45:57 +0200 Subject: [PATCH 1/3] Skip core tests from start to SkipMixin for Database Isolation Mode --- tests/jobs/test_base_job.py | 1 + tests/jobs/test_local_task_job.py | 25 ++++++++++++++++++++++--- tests/jobs/test_triggerer_job.py | 12 ++++++++++++ tests/models/test_baseoperator.py | 5 +++-- tests/models/test_baseoperatormeta.py | 6 ++++++ tests/models/test_dagbag.py | 11 +++++++++++ tests/models/test_mappedoperator.py | 25 +++++++++++++++++++++++++ tests/models/test_param.py | 3 +++ tests/models/test_renderedtifields.py | 8 +++++++- tests/models/test_serialized_dag.py | 9 +++++++++ tests/models/test_skipmixin.py | 9 ++++++--- 11 files changed, 105 insertions(+), 9 deletions(-) diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py index e956d12889ca1..e9c9fe94ce737 100644 --- a/tests/jobs/test_base_job.py +++ b/tests/jobs/test_base_job.py @@ -267,6 +267,7 @@ def test_essential_attr(self, mock_getuser, mock_hostname, mock_init_executors, assert test_job.executor == mock_sequential_executor assert test_job.executors == [mock_sequential_executor] + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_heartbeat(self, frozen_sleep, monkeypatch): monkeypatch.setattr("airflow.jobs.job.sleep", frozen_sleep) with create_session() as session: diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index aefb77997e517..a4e7c4f387752 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -109,7 +109,7 @@ def test_localtaskjob_essential_attr(self, dag_maker): of LocalTaskJob can be assigned with proper values without intervention """ - with dag_maker("test_localtaskjob_essential_attr"): + with dag_maker("test_localtaskjob_essential_attr", serialized=True): op1 = EmptyOperator(task_id="op1") dr = dag_maker.create_dagrun() @@ -127,6 +127,7 @@ def test_localtaskjob_essential_attr(self, dag_maker): check_result_2 = [getattr(job1, attr) is not None for attr in essential_attr] assert all(check_result_2) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_localtaskjob_heartbeat(self, dag_maker): session = settings.Session() with dag_maker("test_localtaskjob_heartbeat"): @@ -173,6 +174,7 @@ def test_localtaskjob_heartbeat(self, dag_maker): assert not job1.task_runner.run_as_user job_runner.heartbeat_callback() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @mock.patch("subprocess.check_call") @mock.patch("airflow.jobs.local_task_job_runner.psutil") def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker): @@ -227,6 +229,7 @@ def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker assert ti.pid != job1.task_runner.process.pid job_runner.heartbeat_callback() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @conf_vars({("core", "default_impersonation"): "testuser"}) @mock.patch("subprocess.check_call") @mock.patch("airflow.jobs.local_task_job_runner.psutil") @@ -282,6 +285,7 @@ def test_localtaskjob_heartbeat_with_default_impersonation(self, psutil_mock, _, assert ti.pid != job1.task_runner.process.pid job_runner.heartbeat_callback() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_heartbeat_failed_fast(self): """ Test that task heartbeat will sleep when it fails fast @@ -323,6 +327,7 @@ def test_heartbeat_failed_fast(self): delta = (time2 - time1).total_seconds() assert abs(delta - job.heartrate) < 0.8 + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @conf_vars({("core", "task_success_overtime"): "1"}) def test_mark_success_no_kill(self, caplog, get_test_dag, session): """ @@ -354,6 +359,7 @@ def test_mark_success_no_kill(self, caplog, get_test_dag, session): "State of this instance has been externally set to success. Terminating instance." in caplog.text ) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_localtaskjob_double_trigger(self): dag = self.dagbag.dags.get("test_localtaskjob_double_trigger") task = dag.get_task("test_localtaskjob_double_trigger_task") @@ -392,6 +398,7 @@ def test_localtaskjob_double_trigger(self): session.close() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @patch.object(StandardTaskRunner, "return_code") @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr", autospec=True) def test_local_task_return_code_metric(self, mock_stats_incr, mock_return_code, create_dummy_dag): @@ -424,6 +431,7 @@ def test_local_task_return_code_metric(self, mock_stats_incr, mock_return_code, ] ) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @patch.object(StandardTaskRunner, "return_code") def test_localtaskjob_maintain_heart_rate(self, mock_return_code, caplog, create_dummy_dag): dag, task = create_dummy_dag("test_localtaskjob_double_trigger") @@ -456,6 +464,7 @@ def test_localtaskjob_maintain_heart_rate(self, mock_return_code, caplog, create assert time_end - time_start < job1.heartrate assert "Task exited with return code 0" in caplog.text + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_mark_failure_on_failure_callback(self, caplog, get_test_dag): """ Test that ensures that mark_failure in the UI fails @@ -488,6 +497,7 @@ def test_mark_failure_on_failure_callback(self, caplog, get_test_dag): "State of this instance has been externally set to failed. Terminating instance." ) in caplog.text + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag): """ Test that ensures that if a running task is externally skipped (due to a dagrun timeout) @@ -520,6 +530,7 @@ def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag): assert ti.state == State.SKIPPED assert "DagRun timed out after " in caplog.text + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_failure_callback_called_by_airflow_run_raw_process(self, monkeypatch, tmp_path, get_test_dag): """ Ensure failure callback of a task is run by the airflow run --raw process @@ -555,6 +566,7 @@ def test_failure_callback_called_by_airflow_run_raw_process(self, monkeypatch, t assert m, "pid expected in output." assert os.getpid() != int(m.group(1)) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @conf_vars({("core", "task_success_overtime"): "5"}) def test_mark_success_on_success_callback(self, caplog, get_test_dag): """ @@ -586,6 +598,7 @@ def test_mark_success_on_success_callback(self, caplog, get_test_dag): "State of this instance has been externally set to success. Terminating instance." in caplog.text ) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_success_listeners_executed(self, caplog, get_test_dag): """ Test that ensures that when listeners are executed, the task is not killed before they finish @@ -623,6 +636,7 @@ def test_success_listeners_executed(self, caplog, get_test_dag): ) lm.clear() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @conf_vars({("core", "task_success_overtime"): "3"}) def test_success_slow_listeners_executed_kill(self, caplog, get_test_dag): """ @@ -659,6 +673,7 @@ def test_success_slow_listeners_executed_kill(self, caplog, get_test_dag): ) lm.clear() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @conf_vars({("core", "task_success_overtime"): "3"}) def test_success_slow_task_not_killed_by_overtime_but_regular_timeout(self, caplog, get_test_dag): """ @@ -698,6 +713,7 @@ def test_success_slow_task_not_killed_by_overtime_but_regular_timeout(self, capl ) lm.clear() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize("signal_type", [signal.SIGTERM, signal.SIGKILL]) def test_process_os_signal_calls_on_failure_callback( self, monkeypatch, tmp_path, get_test_dag, signal_type @@ -792,6 +808,7 @@ def send_signal(ti, signal_sent, sig): lines = f.readlines() assert len(lines) == 0 + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize( "conf, init_state, first_run_state, second_run_state, task_ids_to_run, error_message", [ @@ -876,6 +893,7 @@ def test_fast_follow( if scheduler_job_runner.processor_agent: scheduler_job_runner.processor_agent.end() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @conf_vars({("scheduler", "schedule_after_task_execution"): "True"}) def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, get_test_dag): dag = get_test_dag("test_dagrun_fast_follow") @@ -944,7 +962,7 @@ def task_function(ti): os.kill(psutil.Process(os.getpid()).ppid(), signal.SIGSEGV) - with dag_maker(dag_id="test_segmentation_fault"): + with dag_maker(dag_id="test_segmentation_fault", serialized=True): task = PythonOperator( task_id="test_sigsegv", python_callable=task_function, @@ -975,7 +993,7 @@ def test_number_of_queries_single_loop(mock_get_task_runner, dag_maker): mock_get_task_runner.return_value.return_code.side_effects = [[0], codes] unique_prefix = str(uuid.uuid4()) - with dag_maker(dag_id=f"{unique_prefix}_test_number_of_queries"): + with dag_maker(dag_id=f"{unique_prefix}_test_number_of_queries", serialized=True): task = EmptyOperator(task_id="test_state_succeeded1") dr = dag_maker.create_dagrun(run_id=unique_prefix, state=State.NONE) @@ -992,6 +1010,7 @@ def test_number_of_queries_single_loop(mock_get_task_runner, dag_maker): class TestSigtermOnRunner: """Test receive SIGTERM on Task Runner.""" + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize( "daemon", [pytest.param(True, id="daemon"), pytest.param(False, id="non-daemon")] ) diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py index 10d4196ac97e3..28fc00694b400 100644 --- a/tests/jobs/test_triggerer_job.py +++ b/tests/jobs/test_triggerer_job.py @@ -113,6 +113,7 @@ def create_trigger_in_db(session, trigger, operator=None): return dag_model, run, trigger_orm, task_instance +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_trigger_logging_sensitive_info(session, caplog): """ Checks that when a trigger fires, it doesn't log any sensitive @@ -176,6 +177,7 @@ def test_is_alive(): assert not triggerer_job.is_alive(), "Completed jobs even with recent heartbeat should not be alive" +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_is_needed(session): """Checks the triggerer-is-needed logic""" # No triggers, no need @@ -219,6 +221,7 @@ def test_capacity_decode(): TriggererJobRunner(job=job, capacity=input_str) +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_trigger_lifecycle(session): """ Checks that the triggerer will correctly see a new Trigger in the database @@ -309,6 +312,7 @@ def test_update_trigger_with_triggerer_argument_change( assert "got an unexpected keyword argument 'not_exists_arg'" in caplog.text +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.asyncio async def test_trigger_create_race_condition_38599(session, tmp_path): """ @@ -389,6 +393,7 @@ async def test_trigger_create_race_condition_38599(session, tmp_path): assert path.read_text() == "hi\n" +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_trigger_create_race_condition_18392(session, tmp_path): """ This verifies the resolution of race condition documented in github issue #18392. @@ -499,6 +504,7 @@ def handle_events(self): assert len(instances) == 1 +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_trigger_from_dead_triggerer(session, create_task_instance): """ Checks that the triggerer will correctly claim a Trigger that is assigned to a @@ -526,6 +532,7 @@ def test_trigger_from_dead_triggerer(session, create_task_instance): assert [x for x, y in job_runner.trigger_runner.to_create] == [1] +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_trigger_from_expired_triggerer(session, create_task_instance): """ Checks that the triggerer will correctly claim a Trigger that is assigned to a @@ -560,6 +567,7 @@ def test_trigger_from_expired_triggerer(session, create_task_instance): assert [x for x, y in job_runner.trigger_runner.to_create] == [1] +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_trigger_runner_exception_stops_triggerer(session): """ Checks that if an exception occurs when creating triggers, that the triggerer @@ -603,6 +611,7 @@ async def create_triggers(self): thread.join() +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_trigger_firing(session): """ Checks that when a trigger fires, it correctly makes it into the @@ -633,6 +642,7 @@ def test_trigger_firing(session): job_runner.trigger_runner.join(30) +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_trigger_failing(session): """ Checks that when a trigger fails, it correctly makes it into the @@ -667,6 +677,7 @@ def test_trigger_failing(session): job_runner.trigger_runner.join(30) +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_trigger_cleanup(session): """ Checks that the triggerer will correctly clean up triggers that do not @@ -686,6 +697,7 @@ def test_trigger_cleanup(session): assert session.query(Trigger).count() == 0 +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_invalid_trigger(session, dag_maker): """ Checks that the triggerer will correctly fail task instances that depend on diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py index b94a1b9f819d6..8f94ba4e3eb94 100644 --- a/tests/models/test_baseoperator.py +++ b/tests/models/test_baseoperator.py @@ -1037,14 +1037,14 @@ def get_states(dr): return dict(ti_dict) -@pytest.mark.db_test +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_teardown_and_fail_stop(dag_maker): """ when fail_stop enabled, teardowns should run according to their setups. in this case, the second teardown skips because its setup skips. """ - with dag_maker(fail_stop=True) as dag: + with dag_maker(fail_stop=True, serialized=True) as dag: for num in (1, 2): with TaskGroup(f"tg_{num}"): @@ -1082,6 +1082,7 @@ def my_teardown(): assert states == expected +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.db_test def test_get_task_instances(session): import pendulum diff --git a/tests/models/test_baseoperatormeta.py b/tests/models/test_baseoperatormeta.py index 7c719189aadd9..6c6567b23899e 100644 --- a/tests/models/test_baseoperatormeta.py +++ b/tests/models/test_baseoperatormeta.py @@ -47,6 +47,7 @@ def setup_method(self): def teardown_method(self, method): ExecutorSafeguard.test_mode = conf.getboolean("core", "unit_test_mode") + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.db_test def test_executor_when_classic_operator_called_from_dag(self, dag_maker): with dag_maker() as dag: @@ -55,6 +56,7 @@ def test_executor_when_classic_operator_called_from_dag(self, dag_maker): dag_run = dag.test() assert dag_run.state == DagRunState.SUCCESS + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize( "state, exception, retries", [ @@ -101,6 +103,7 @@ def _raise_if_exception(): assert ti.next_kwargs is None assert ti.state == state + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.db_test def test_executor_when_classic_operator_called_from_decorated_task_with_allow_nested_operators_false( self, dag_maker @@ -117,6 +120,7 @@ def say_hello(**context): dag_run = dag.test() assert dag_run.state == DagRunState.FAILED + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.db_test @patch.object(HelloWorldOperator, "log") def test_executor_when_classic_operator_called_from_decorated_task_without_allow_nested_operators( @@ -139,6 +143,7 @@ def say_hello(**context): "HelloWorldOperator.execute cannot be called outside TaskInstance!" ) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.db_test def test_executor_when_classic_operator_called_from_python_operator_with_allow_nested_operators_false( self, @@ -159,6 +164,7 @@ def say_hello(**context): dag_run = dag.test() assert dag_run.state == DagRunState.FAILED + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.db_test @patch.object(HelloWorldOperator, "log") def test_executor_when_classic_operator_called_from_python_operator_without_allow_nested_operators( diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 836ede04df4fe..936852dd082e4 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -94,6 +94,7 @@ def test_get_non_existing_dag(self, tmp_path): non_existing_dag_id = "non_existing_dag_id" assert dagbag.get_dag(non_existing_dag_id) is None + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_serialized_dag_not_existing_doesnt_raise(self, tmp_path): """ test that retrieving a non existing dag id returns None without crashing @@ -459,6 +460,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): assert dag_id == dag.dag_id assert 2 == dagbag.process_file_calls + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_dag_removed_if_serialized_dag_is_removed(self, dag_maker, tmp_path): """ Test that if a DAG does not exist in serialized_dag table (as the DAG file was removed), @@ -789,6 +791,7 @@ def test_process_file_with_none(self, tmp_path): assert [] == dagbag.process_file(None) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_deactivate_unknown_dags(self): """ Test that dag_ids not passed into deactivate_unknown_dags @@ -812,6 +815,7 @@ def test_deactivate_unknown_dags(self): with create_session() as session: session.query(DagModel).filter(DagModel.dag_id == "test_deactivate_unknown_dags").delete() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_serialized_dags_are_written_to_db_on_sync(self): """ Test that when dagbag.sync_to_db is called the DAGs are Serialized and written to DB @@ -832,6 +836,7 @@ def test_serialized_dags_are_written_to_db_on_sync(self): new_serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar() assert new_serialized_dags_count == 1 + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @patch("airflow.models.serialized_dag.SerializedDagModel.write_dag") def test_serialized_dag_errors_are_import_errors(self, mock_serialize, caplog): """ @@ -899,6 +904,7 @@ def test_sync_to_db_is_retried(self, mock_bulk_write_to_db, mock_s10n_write_dag, ] ) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5) @patch("airflow.models.dagbag.DagBag._sync_perm_for_dag") def test_sync_to_db_syncs_dag_specific_perms_on_update(self, mock_sync_perm_for_dag): @@ -932,6 +938,7 @@ def _sync_to_db(): _sync_to_db() mock_sync_perm_for_dag.assert_called_once_with(dag, session=session) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @patch("airflow.www.security_appless.ApplessAirflowSecurityManager") def test_sync_perm_for_dag(self, mock_security_manager): """ @@ -968,6 +975,7 @@ def _sync_perms(): "test_example_bash_operator", {"Public": {"DAGs": {"can_read"}}} ) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @patch("airflow.www.security_appless.ApplessAirflowSecurityManager") def test_sync_perm_for_dag_with_dict_access_control(self, mock_security_manager): """ @@ -1004,6 +1012,7 @@ def _sync_perms(): "test_example_bash_operator", {"Public": {"DAGs": {"can_read"}, "DAG Runs": {"can_create"}}} ) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5) @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5) def test_get_dag_with_dag_serialization(self): @@ -1043,6 +1052,7 @@ def test_get_dag_with_dag_serialization(self): assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"} assert updated_ser_dag_1_update_time > ser_dag_1_update_time + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5) @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5) def test_get_dag_refresh_race_condition(self): @@ -1091,6 +1101,7 @@ def test_get_dag_refresh_race_condition(self): assert set(updated_ser_dag.tags) == {"example", "example2", "new_tag"} assert updated_ser_dag_update_time > ser_dag_update_time + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_collect_dags_from_db(self): """DAGs are collected from Database""" db.clear_db_dags() diff --git a/tests/models/test_mappedoperator.py b/tests/models/test_mappedoperator.py index 9f31652424aeb..2ee597879064c 100644 --- a/tests/models/test_mappedoperator.py +++ b/tests/models/test_mappedoperator.py @@ -70,6 +70,7 @@ def test_task_mapping_with_dag(): assert mapped.downstream_list == [finish] +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @patch("airflow.models.abstractoperator.AbstractOperator.render_template") def test_task_mapping_with_dag_and_list_of_pandas_dataframe(mock_render_template, caplog): class UnrenderableClass: @@ -159,6 +160,7 @@ def test_map_xcom_arg(): assert task1.downstream_list == [mapped] +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_map_xcom_arg_multiple_upstream_xcoms(dag_maker, session): """Test that the correct number of downstream tasks are generated when mapping with an XComArg""" @@ -218,6 +220,7 @@ def test_partial_on_class_invalid_ctor_args() -> None: MockOperator.partial(task_id="a", foo="bar", bar=2) +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize( ["num_existing_tis", "expected"], ( @@ -285,6 +288,7 @@ def test_expand_mapped_task_instance(dag_maker, session, num_existing_tis, expec assert indices == expected +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_expand_mapped_task_failed_state_in_db(dag_maker, session): """ This test tries to recreate a faulty state in the database and checks if we can recover from it. @@ -336,6 +340,7 @@ def test_expand_mapped_task_failed_state_in_db(dag_maker, session): assert indices == [(0, "success"), (1, "success")] +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_expand_mapped_task_instance_skipped_on_zero(dag_maker, session): with dag_maker(session=session): task1 = BaseOperator(task_id="op1") @@ -401,6 +406,7 @@ def test_mapped_expand_against_params(dag_maker, dag_params, task_params, expect assert t.expand_input.value == {"params": [{"c": "x"}, {"d": 1}]} +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_mapped_render_template_fields_validating_operator(dag_maker, session, tmp_path): file_template_dir = tmp_path / "path" / "to" file_template_dir.mkdir(parents=True, exist_ok=True) @@ -466,6 +472,7 @@ def execute(self, context): assert mapped_ti.task.file_template == "loaded data", "Should be templated!" +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_mapped_expand_kwargs_render_template_fields_validating_operator(dag_maker, session, tmp_path): file_template_dir = tmp_path / "path" / "to" file_template_dir.mkdir(parents=True, exist_ok=True) @@ -515,6 +522,7 @@ def execute(self, context): assert mapped_ti.task.file_template == "loaded data", "Should be templated!" +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_mapped_render_nested_template_fields(dag_maker, session): with dag_maker(session=session): MockOperatorWithNestedFields.partial( @@ -539,6 +547,7 @@ def test_mapped_render_nested_template_fields(dag_maker, session): assert ti.task.arg2.field_2 == "value_2" +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize( ["num_existing_tis", "expected"], ( @@ -658,6 +667,7 @@ def task1(map_name): return task1.expand(map_name=map_names) +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize( "template, expected_rendered_names", [ @@ -706,6 +716,7 @@ def test_expand_mapped_task_instance_with_named_index( assert indices == expected_rendered_names +@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize( "map_index, expected", [ @@ -872,6 +883,7 @@ def inner(*args, **kwargs): else: return PythonOperator(**kwargs) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize("type_", ["taskflow", "classic"]) def test_one_to_many_work_failed(self, type_, dag_maker): """ @@ -922,6 +934,7 @@ def my_work(val): } assert states == expected + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize("type_", ["taskflow", "classic"]) def test_many_one_explicit_odd_setup_mapped_setups_fail(self, type_, dag_maker): """ @@ -1008,6 +1021,7 @@ def my_work(val): } assert states == expected + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize("type_", ["taskflow", "classic"]) def test_many_one_explicit_odd_setup_all_setups_fail(self, type_, dag_maker): """ @@ -1105,6 +1119,7 @@ def my_work(val): } assert states == expected + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize("type_", ["taskflow", "classic"]) def test_many_one_explicit_odd_setup_one_mapped_fails(self, type_, dag_maker): """ @@ -1217,6 +1232,7 @@ def my_teardown_callable(val): } assert states == expected + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize("type_", ["taskflow", "classic"]) def test_one_to_many_as_teardown(self, type_, dag_maker): """ @@ -1272,6 +1288,7 @@ def my_work(val): } assert states == expected + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize("type_", ["taskflow", "classic"]) def test_one_to_many_as_teardown_on_failure_fail_dagrun(self, type_, dag_maker): """ @@ -1336,6 +1353,7 @@ def my_teardown_callable(val): } assert states == expected + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize("type_", ["taskflow", "classic"]) def test_mapped_task_group_simple(self, type_, dag_maker, session): """ @@ -1410,6 +1428,7 @@ def file_transforms(filename): assert states == expected + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize("type_", ["taskflow", "classic"]) def test_mapped_task_group_work_fail_or_skip(self, type_, dag_maker): """ @@ -1481,6 +1500,7 @@ def file_transforms(filename): } assert states == expected + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize("type_", ["taskflow", "classic"]) def test_teardown_many_one_explicit(self, type_, dag_maker): """-- passing @@ -1541,6 +1561,7 @@ def my_work(val): } assert states == expected + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_one_to_many_with_teardown_and_fail_stop(self, dag_maker): """ With fail_stop enabled, the teardown for an already-completed setup @@ -1577,6 +1598,7 @@ def my_teardown(val): } assert states == expected + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_one_to_many_with_teardown_and_fail_stop_more_tasks(self, dag_maker): """ when fail_stop enabled, teardowns should run according to their setups. @@ -1619,6 +1641,7 @@ def my_teardown(val): } assert states == expected + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_one_to_many_with_teardown_and_fail_stop_more_tasks_mapped_setup(self, dag_maker): """ when fail_stop enabled, teardowns should run according to their setups. @@ -1668,6 +1691,7 @@ def my_teardown(val): } assert states == expected + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_skip_one_mapped_task_from_task_group_with_generator(self, dag_maker): with dag_maker() as dag: @@ -1699,6 +1723,7 @@ def group(n: int) -> None: } assert states == expected + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_skip_one_mapped_task_from_task_group(self, dag_maker): with dag_maker() as dag: diff --git a/tests/models/test_param.py b/tests/models/test_param.py index 89421f0dc2620..18d4c190ad28a 100644 --- a/tests/models/test_param.py +++ b/tests/models/test_param.py @@ -323,6 +323,7 @@ def setup_class(self): def teardown_method(self): self.clean_db() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.db_test def test_dag_param_resolves(self, dag_maker): """Test dagparam resolves on operator execution""" @@ -345,6 +346,7 @@ def return_num(num): ti = dr.get_task_instances()[0] assert ti.xcom_pull() == self.VALUE + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.db_test def test_dag_param_overwrite(self, dag_maker): """Test dag param is overwritten from dagrun config""" @@ -370,6 +372,7 @@ def return_num(num): ti = dr.get_task_instances()[0] assert ti.xcom_pull() == new_value + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.db_test def test_dag_param_default(self, dag_maker): """Test dag param is retrieved from default config""" diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py index 3e631a0ba9d49..b8c45193814aa 100644 --- a/tests/models/test_renderedtifields.py +++ b/tests/models/test_renderedtifields.py @@ -90,6 +90,7 @@ def setup_method(self): def teardown_method(self): self.clean_db() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize( "templated_field, expected_rendered_field", [ @@ -169,6 +170,7 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da # Fetching them will return None assert RTIF.get_templated_fields(ti=ti2) is None + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.enable_redact def test_secrets_are_masked_when_large_string(self, dag_maker): """ @@ -186,6 +188,7 @@ def test_secrets_are_masked_when_large_string(self, dag_maker): rtif = RTIF(ti=ti) assert "***" in rtif.rendered_fields.get("bash_command") + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @mock.patch("airflow.models.BaseOperator.render_template") def test_pandas_dataframes_works_with_the_string_compare(self, render_mock, dag_maker): """Test that rendered dataframe gets passed through the serialized template fields.""" @@ -209,6 +212,7 @@ def consume_pd(data): rtif = RTIF(ti=ti2) rtif.write() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize( "rtif_num, num_to_keep, remaining_rtifs, expected_query_count", [ @@ -254,6 +258,7 @@ def test_delete_old_records( result = session.query(RTIF).filter(RTIF.dag_id == dag.dag_id, RTIF.task_id == task.task_id).all() assert remaining_rtifs == len(result) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize( "num_runs, num_to_keep, remaining_rtifs, expected_query_count", [ @@ -297,6 +302,7 @@ def test_delete_old_records_mapped( # Check that we have _all_ the data for each row assert len(result) == remaining_rtifs * 2 + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_write(self, dag_maker): """ Test records can be written and overwritten @@ -357,7 +363,7 @@ def test_write(self, dag_maker): @mock.patch.dict(os.environ, {"AIRFLOW_VAR_API_KEY": "secret"}) @mock.patch("airflow.utils.log.secrets_masker.redact", autospec=True) def test_redact(self, redact, dag_maker): - with dag_maker("test_ritf_redact"): + with dag_maker("test_ritf_redact", serialized=True): task = BashOperator( task_id="test", bash_command="echo {{ var.value.api_key }}", diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index 848d26119506e..531ffb031925e 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -74,6 +74,7 @@ def _write_example_dags(self): SDM.write_dag(dag) return example_dags + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_write_dag(self): """DAGs can be written into database""" example_dags = self._write_example_dags() @@ -87,6 +88,7 @@ def test_write_dag(self): # Verifies JSON schema. SerializedDAG.validate_schema(result.data) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_serialized_dag_is_updated_if_dag_is_changed(self): """Test Serialized DAG is updated if DAG is changed""" example_dags = make_example_dags(example_dags_module) @@ -118,6 +120,7 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self): assert s_dag_2.data["dag"]["tags"] == ["example", "example2", "new_tag"] assert dag_updated is True + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_serialized_dag_is_updated_if_processor_subdir_changed(self): """Test Serialized DAG is updated if processor_subdir is changed""" example_dags = make_example_dags(example_dags_module) @@ -145,6 +148,7 @@ def test_serialized_dag_is_updated_if_processor_subdir_changed(self): assert s_dag.processor_subdir != s_dag_2.processor_subdir assert dag_updated is True + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_read_dags(self): """DAGs can be read from database.""" example_dags = self._write_example_dags() @@ -156,6 +160,7 @@ def test_read_dags(self): assert serialized_dag.dag_id == dag.dag_id assert set(serialized_dag.task_dict) == set(dag.task_dict) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_remove_dags_by_id(self): """DAGs can be removed from database.""" example_dags_list = list(self._write_example_dags().values()) @@ -167,6 +172,7 @@ def test_remove_dags_by_id(self): SDM.remove_dag(dag_removed_by_id.dag_id) assert not SDM.has_dag(dag_removed_by_id.dag_id) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_remove_dags_by_filepath(self): """DAGs can be removed from database.""" example_dags_list = list(self._write_example_dags().values()) @@ -181,6 +187,7 @@ def test_remove_dags_by_filepath(self): SDM.remove_deleted_dags(example_dag_files, processor_subdir="/tmp/test") assert not SDM.has_dag(dag_removed_by_file.dag_id) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_bulk_sync_to_db(self): dags = [ DAG("dag_1"), @@ -190,6 +197,7 @@ def test_bulk_sync_to_db(self): with assert_queries_count(10): SDM.bulk_sync_to_db(dags) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @pytest.mark.parametrize("dag_dependencies_fields", [{"dag_dependencies": None}, {}]) def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields): """Test a pre-2.1.0 serialized DAG can deserialize DAG dependencies.""" @@ -206,6 +214,7 @@ def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields): expected_dependencies = {dag_id: [] for dag_id in example_dags} assert SDM.get_dag_dependencies() == expected_dependencies + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_order_of_dag_params_is_stable(self): """ This asserts that we have logic in place which guarantees the order diff --git a/tests/models/test_skipmixin.py b/tests/models/test_skipmixin.py index 465d15130f4de..62d8c4d059baf 100644 --- a/tests/models/test_skipmixin.py +++ b/tests/models/test_skipmixin.py @@ -53,6 +53,7 @@ def setup_method(self): def teardown_method(self): self.clean_db() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @patch("airflow.utils.timezone.utcnow") def test_skip(self, mock_now, dag_maker): session = settings.Session() @@ -75,14 +76,13 @@ def test_skip(self, mock_now, dag_maker): TI.end_date == now, ).one() + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @patch("airflow.utils.timezone.utcnow") def test_skip_none_dagrun(self, mock_now, dag_maker): - session = settings.Session() now = datetime.datetime.now(tz=pendulum.timezone("UTC")) mock_now.return_value = now with dag_maker( "dag", - session=session, ): tasks = [EmptyOperator(task_id="task")] dag_maker.create_dagrun(execution_date=now) @@ -93,6 +93,7 @@ def test_skip_none_dagrun(self, mock_now, dag_maker): ): SkipMixin().skip(dag_run=None, execution_date=now, tasks=tasks) + session = dag_maker.session session.query(TI).filter( TI.dag_id == "dag", TI.task_id == "task", @@ -121,6 +122,7 @@ def test_skip_none_tasks(self): def test_skip_all_except(self, dag_maker, branch_task_ids, expected_states): with dag_maker( "dag_test_skip_all_except", + serialized=True, ): task1 = EmptyOperator(task_id="task1") task2 = EmptyOperator(task_id="task2") @@ -143,6 +145,7 @@ def get_state(ti): assert executed_states == expected_states + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_mapped_tasks_skip_all_except(self, dag_maker): with dag_maker("dag_test_skip_all_except") as dag: @@ -209,7 +212,7 @@ def test_raise_exception_on_not_accepted_iterable_branch_task_ids_type(self, dag ], ) def test_raise_exception_on_not_valid_branch_task_ids(self, dag_maker, branch_task_ids): - with dag_maker("dag_test_skip_all_except_wrong_type"): + with dag_maker("dag_test_skip_all_except_wrong_type", serialized=True): task1 = EmptyOperator(task_id="task1") task2 = EmptyOperator(task_id="task2") task3 = EmptyOperator(task_id="task3") From 3bb98bddb1c774994a4449415dd95e7ded39de95 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 10 Aug 2024 00:05:32 +0200 Subject: [PATCH 2/3] Skip core tests from start to SkipMixin for Database Isolation Mode, uups --- tests/models/test_baseoperator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py index 8f94ba4e3eb94..c14206902b305 100644 --- a/tests/models/test_baseoperator.py +++ b/tests/models/test_baseoperator.py @@ -1044,7 +1044,7 @@ def test_teardown_and_fail_stop(dag_maker): in this case, the second teardown skips because its setup skips. """ - with dag_maker(fail_stop=True, serialized=True) as dag: + with dag_maker(fail_stop=True) as dag: for num in (1, 2): with TaskGroup(f"tg_{num}"): From 32c94c9619cfb4be630a517294e432f5aedf430c Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 10 Aug 2024 12:33:37 +0200 Subject: [PATCH 3/3] Skip core tests from start to SkipMixin for Database Isolation Mode, uups --- tests/models/test_baseoperator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py index c14206902b305..89b268af1f8b1 100644 --- a/tests/models/test_baseoperator.py +++ b/tests/models/test_baseoperator.py @@ -1038,6 +1038,7 @@ def get_states(dr): @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode +@pytest.mark.db_test def test_teardown_and_fail_stop(dag_maker): """ when fail_stop enabled, teardowns should run according to their setups.