Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/jobs/test_base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ def test_essential_attr(self, mock_getuser, mock_hostname, mock_init_executors,
assert test_job.executor == mock_sequential_executor
assert test_job.executors == [mock_sequential_executor]

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_heartbeat(self, frozen_sleep, monkeypatch):
monkeypatch.setattr("airflow.jobs.job.sleep", frozen_sleep)
with create_session() as session:
Expand Down
25 changes: 22 additions & 3 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def test_localtaskjob_essential_attr(self, dag_maker):
of LocalTaskJob can be assigned with
proper values without intervention
"""
with dag_maker("test_localtaskjob_essential_attr"):
with dag_maker("test_localtaskjob_essential_attr", serialized=True):
op1 = EmptyOperator(task_id="op1")

dr = dag_maker.create_dagrun()
Expand All @@ -127,6 +127,7 @@ def test_localtaskjob_essential_attr(self, dag_maker):
check_result_2 = [getattr(job1, attr) is not None for attr in essential_attr]
assert all(check_result_2)

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_localtaskjob_heartbeat(self, dag_maker):
session = settings.Session()
with dag_maker("test_localtaskjob_heartbeat"):
Expand Down Expand Up @@ -173,6 +174,7 @@ def test_localtaskjob_heartbeat(self, dag_maker):
assert not job1.task_runner.run_as_user
job_runner.heartbeat_callback()

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@mock.patch("subprocess.check_call")
@mock.patch("airflow.jobs.local_task_job_runner.psutil")
def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker):
Expand Down Expand Up @@ -227,6 +229,7 @@ def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker
assert ti.pid != job1.task_runner.process.pid
job_runner.heartbeat_callback()

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@conf_vars({("core", "default_impersonation"): "testuser"})
@mock.patch("subprocess.check_call")
@mock.patch("airflow.jobs.local_task_job_runner.psutil")
Expand Down Expand Up @@ -282,6 +285,7 @@ def test_localtaskjob_heartbeat_with_default_impersonation(self, psutil_mock, _,
assert ti.pid != job1.task_runner.process.pid
job_runner.heartbeat_callback()

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_heartbeat_failed_fast(self):
"""
Test that task heartbeat will sleep when it fails fast
Expand Down Expand Up @@ -323,6 +327,7 @@ def test_heartbeat_failed_fast(self):
delta = (time2 - time1).total_seconds()
assert abs(delta - job.heartrate) < 0.8

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@conf_vars({("core", "task_success_overtime"): "1"})
def test_mark_success_no_kill(self, caplog, get_test_dag, session):
"""
Expand Down Expand Up @@ -354,6 +359,7 @@ def test_mark_success_no_kill(self, caplog, get_test_dag, session):
"State of this instance has been externally set to success. Terminating instance." in caplog.text
)

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_localtaskjob_double_trigger(self):
dag = self.dagbag.dags.get("test_localtaskjob_double_trigger")
task = dag.get_task("test_localtaskjob_double_trigger_task")
Expand Down Expand Up @@ -392,6 +398,7 @@ def test_localtaskjob_double_trigger(self):

session.close()

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@patch.object(StandardTaskRunner, "return_code")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr", autospec=True)
def test_local_task_return_code_metric(self, mock_stats_incr, mock_return_code, create_dummy_dag):
Expand Down Expand Up @@ -424,6 +431,7 @@ def test_local_task_return_code_metric(self, mock_stats_incr, mock_return_code,
]
)

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@patch.object(StandardTaskRunner, "return_code")
def test_localtaskjob_maintain_heart_rate(self, mock_return_code, caplog, create_dummy_dag):
dag, task = create_dummy_dag("test_localtaskjob_double_trigger")
Expand Down Expand Up @@ -456,6 +464,7 @@ def test_localtaskjob_maintain_heart_rate(self, mock_return_code, caplog, create
assert time_end - time_start < job1.heartrate
assert "Task exited with return code 0" in caplog.text

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_mark_failure_on_failure_callback(self, caplog, get_test_dag):
"""
Test that ensures that mark_failure in the UI fails
Expand Down Expand Up @@ -488,6 +497,7 @@ def test_mark_failure_on_failure_callback(self, caplog, get_test_dag):
"State of this instance has been externally set to failed. Terminating instance."
) in caplog.text

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag):
"""
Test that ensures that if a running task is externally skipped (due to a dagrun timeout)
Expand Down Expand Up @@ -520,6 +530,7 @@ def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag):
assert ti.state == State.SKIPPED
assert "DagRun timed out after " in caplog.text

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_failure_callback_called_by_airflow_run_raw_process(self, monkeypatch, tmp_path, get_test_dag):
"""
Ensure failure callback of a task is run by the airflow run --raw process
Expand Down Expand Up @@ -555,6 +566,7 @@ def test_failure_callback_called_by_airflow_run_raw_process(self, monkeypatch, t
assert m, "pid expected in output."
assert os.getpid() != int(m.group(1))

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@conf_vars({("core", "task_success_overtime"): "5"})
def test_mark_success_on_success_callback(self, caplog, get_test_dag):
"""
Expand Down Expand Up @@ -586,6 +598,7 @@ def test_mark_success_on_success_callback(self, caplog, get_test_dag):
"State of this instance has been externally set to success. Terminating instance." in caplog.text
)

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_success_listeners_executed(self, caplog, get_test_dag):
"""
Test that ensures that when listeners are executed, the task is not killed before they finish
Expand Down Expand Up @@ -623,6 +636,7 @@ def test_success_listeners_executed(self, caplog, get_test_dag):
)
lm.clear()

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@conf_vars({("core", "task_success_overtime"): "3"})
def test_success_slow_listeners_executed_kill(self, caplog, get_test_dag):
"""
Expand Down Expand Up @@ -659,6 +673,7 @@ def test_success_slow_listeners_executed_kill(self, caplog, get_test_dag):
)
lm.clear()

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@conf_vars({("core", "task_success_overtime"): "3"})
def test_success_slow_task_not_killed_by_overtime_but_regular_timeout(self, caplog, get_test_dag):
"""
Expand Down Expand Up @@ -698,6 +713,7 @@ def test_success_slow_task_not_killed_by_overtime_but_regular_timeout(self, capl
)
lm.clear()

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.parametrize("signal_type", [signal.SIGTERM, signal.SIGKILL])
def test_process_os_signal_calls_on_failure_callback(
self, monkeypatch, tmp_path, get_test_dag, signal_type
Expand Down Expand Up @@ -792,6 +808,7 @@ def send_signal(ti, signal_sent, sig):
lines = f.readlines()
assert len(lines) == 0

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.parametrize(
"conf, init_state, first_run_state, second_run_state, task_ids_to_run, error_message",
[
Expand Down Expand Up @@ -876,6 +893,7 @@ def test_fast_follow(
if scheduler_job_runner.processor_agent:
scheduler_job_runner.processor_agent.end()

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@conf_vars({("scheduler", "schedule_after_task_execution"): "True"})
def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, get_test_dag):
dag = get_test_dag("test_dagrun_fast_follow")
Expand Down Expand Up @@ -944,7 +962,7 @@ def task_function(ti):

os.kill(psutil.Process(os.getpid()).ppid(), signal.SIGSEGV)

with dag_maker(dag_id="test_segmentation_fault"):
with dag_maker(dag_id="test_segmentation_fault", serialized=True):
task = PythonOperator(
task_id="test_sigsegv",
python_callable=task_function,
Expand Down Expand Up @@ -975,7 +993,7 @@ def test_number_of_queries_single_loop(mock_get_task_runner, dag_maker):
mock_get_task_runner.return_value.return_code.side_effects = [[0], codes]

unique_prefix = str(uuid.uuid4())
with dag_maker(dag_id=f"{unique_prefix}_test_number_of_queries"):
with dag_maker(dag_id=f"{unique_prefix}_test_number_of_queries", serialized=True):
task = EmptyOperator(task_id="test_state_succeeded1")

dr = dag_maker.create_dagrun(run_id=unique_prefix, state=State.NONE)
Expand All @@ -992,6 +1010,7 @@ def test_number_of_queries_single_loop(mock_get_task_runner, dag_maker):
class TestSigtermOnRunner:
"""Test receive SIGTERM on Task Runner."""

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.parametrize(
"daemon", [pytest.param(True, id="daemon"), pytest.param(False, id="non-daemon")]
)
Expand Down
12 changes: 12 additions & 0 deletions tests/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def create_trigger_in_db(session, trigger, operator=None):
return dag_model, run, trigger_orm, task_instance


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_trigger_logging_sensitive_info(session, caplog):
"""
Checks that when a trigger fires, it doesn't log any sensitive
Expand Down Expand Up @@ -176,6 +177,7 @@ def test_is_alive():
assert not triggerer_job.is_alive(), "Completed jobs even with recent heartbeat should not be alive"


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_is_needed(session):
"""Checks the triggerer-is-needed logic"""
# No triggers, no need
Expand Down Expand Up @@ -219,6 +221,7 @@ def test_capacity_decode():
TriggererJobRunner(job=job, capacity=input_str)


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_trigger_lifecycle(session):
"""
Checks that the triggerer will correctly see a new Trigger in the database
Expand Down Expand Up @@ -309,6 +312,7 @@ def test_update_trigger_with_triggerer_argument_change(
assert "got an unexpected keyword argument 'not_exists_arg'" in caplog.text


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.asyncio
async def test_trigger_create_race_condition_38599(session, tmp_path):
"""
Expand Down Expand Up @@ -389,6 +393,7 @@ async def test_trigger_create_race_condition_38599(session, tmp_path):
assert path.read_text() == "hi\n"


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_trigger_create_race_condition_18392(session, tmp_path):
"""
This verifies the resolution of race condition documented in github issue #18392.
Expand Down Expand Up @@ -499,6 +504,7 @@ def handle_events(self):
assert len(instances) == 1


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_trigger_from_dead_triggerer(session, create_task_instance):
"""
Checks that the triggerer will correctly claim a Trigger that is assigned to a
Expand Down Expand Up @@ -526,6 +532,7 @@ def test_trigger_from_dead_triggerer(session, create_task_instance):
assert [x for x, y in job_runner.trigger_runner.to_create] == [1]


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_trigger_from_expired_triggerer(session, create_task_instance):
"""
Checks that the triggerer will correctly claim a Trigger that is assigned to a
Expand Down Expand Up @@ -560,6 +567,7 @@ def test_trigger_from_expired_triggerer(session, create_task_instance):
assert [x for x, y in job_runner.trigger_runner.to_create] == [1]


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_trigger_runner_exception_stops_triggerer(session):
"""
Checks that if an exception occurs when creating triggers, that the triggerer
Expand Down Expand Up @@ -603,6 +611,7 @@ async def create_triggers(self):
thread.join()


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_trigger_firing(session):
"""
Checks that when a trigger fires, it correctly makes it into the
Expand Down Expand Up @@ -633,6 +642,7 @@ def test_trigger_firing(session):
job_runner.trigger_runner.join(30)


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_trigger_failing(session):
"""
Checks that when a trigger fails, it correctly makes it into the
Expand Down Expand Up @@ -667,6 +677,7 @@ def test_trigger_failing(session):
job_runner.trigger_runner.join(30)


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_trigger_cleanup(session):
"""
Checks that the triggerer will correctly clean up triggers that do not
Expand All @@ -686,6 +697,7 @@ def test_trigger_cleanup(session):
assert session.query(Trigger).count() == 0


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_invalid_trigger(session, dag_maker):
"""
Checks that the triggerer will correctly fail task instances that depend on
Expand Down
2 changes: 2 additions & 0 deletions tests/models/test_baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,7 @@ def get_states(dr):
return dict(ti_dict)


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.db_test
def test_teardown_and_fail_stop(dag_maker):
"""
Expand Down Expand Up @@ -1082,6 +1083,7 @@ def my_teardown():
assert states == expected


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.db_test
def test_get_task_instances(session):
import pendulum
Expand Down
6 changes: 6 additions & 0 deletions tests/models/test_baseoperatormeta.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def setup_method(self):
def teardown_method(self, method):
ExecutorSafeguard.test_mode = conf.getboolean("core", "unit_test_mode")

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.db_test
def test_executor_when_classic_operator_called_from_dag(self, dag_maker):
with dag_maker() as dag:
Expand All @@ -55,6 +56,7 @@ def test_executor_when_classic_operator_called_from_dag(self, dag_maker):
dag_run = dag.test()
assert dag_run.state == DagRunState.SUCCESS

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.parametrize(
"state, exception, retries",
[
Expand Down Expand Up @@ -101,6 +103,7 @@ def _raise_if_exception():
assert ti.next_kwargs is None
assert ti.state == state

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.db_test
def test_executor_when_classic_operator_called_from_decorated_task_with_allow_nested_operators_false(
self, dag_maker
Expand All @@ -117,6 +120,7 @@ def say_hello(**context):
dag_run = dag.test()
assert dag_run.state == DagRunState.FAILED

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.db_test
@patch.object(HelloWorldOperator, "log")
def test_executor_when_classic_operator_called_from_decorated_task_without_allow_nested_operators(
Expand All @@ -139,6 +143,7 @@ def say_hello(**context):
"HelloWorldOperator.execute cannot be called outside TaskInstance!"
)

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.db_test
def test_executor_when_classic_operator_called_from_python_operator_with_allow_nested_operators_false(
self,
Expand All @@ -159,6 +164,7 @@ def say_hello(**context):
dag_run = dag.test()
assert dag_run.state == DagRunState.FAILED

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.db_test
@patch.object(HelloWorldOperator, "log")
def test_executor_when_classic_operator_called_from_python_operator_without_allow_nested_operators(
Expand Down
Loading