diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py index d0c8bf98f4152..5a790d8597af2 100644 --- a/airflow-core/src/airflow/models/dagbag.py +++ b/airflow-core/src/airflow/models/dagbag.py @@ -22,6 +22,7 @@ import importlib.machinery import importlib.util import os +import signal import sys import textwrap import traceback @@ -358,6 +359,17 @@ def get_pools(dag) -> dict[str, set[str]]: def _load_modules_from_file(self, filepath, safe_mode): from airflow.sdk.definitions._internal.contextmanager import DagContext + def handler(signum, frame): + """Handle SIGSEGV signal and let the user know that the import failed.""" + msg = f"Received SIGSEGV signal while processing {filepath}." + self.log.error(msg) + self.import_errors[filepath] = msg + + try: + signal.signal(signal.SIGSEGV, handler) + except ValueError: + self.log.warning("SIGSEGV signal handler registration failed. Not in the main thread") + if not might_contain_dag(filepath, safe_mode): # Don't want to spam user with skip messages if not self.has_logged: diff --git a/airflow-core/tests/unit/models/test_dagbag.py b/airflow-core/tests/unit/models/test_dagbag.py index 44eb06229ee89..d3afcdb479555 100644 --- a/airflow-core/tests/unit/models/test_dagbag.py +++ b/airflow-core/tests/unit/models/test_dagbag.py @@ -959,3 +959,56 @@ def test_dag_warnings_invalid_pool(self, known_pools, expected): dagbag = DagBag(dag_folder="", include_examples=False, collect_dags=False, known_pools=known_pools) dagbag.bag_dag(dag) assert dagbag.dag_warnings == expected + + def test_sigsegv_handling(self, tmp_path, caplog): + """ + Test that a SIGSEGV in a DAG file is handled gracefully and does not crash the process. + """ + # Create a DAG file that will raise a SIGSEGV + dag_file = tmp_path / "bad_dag.py" + dag_file.write_text( + textwrap.dedent( + """\ + import signal + from airflow import DAG + import os + from airflow.decorators import task + + os.kill(os.getpid(), signal.SIGSEGV) + + with DAG('testbug'): + @task + def mytask(): + print(1) + mytask() + """ + ) + ) + + dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + assert "Received SIGSEGV signal while processing" in caplog.text + assert dag_file.as_posix() in dagbag.import_errors + + def test_failed_signal_registration_does_not_crash_the_process(self, tmp_path, caplog): + """Test that a ValueError raised by a signal setting on child process does not crash the main process. + This was raised in test_dag_report.py module in api_fastapi/core_api/routes/public tests + """ + dag_file = tmp_path / "test_dag.py" + dag_file.write_text( + textwrap.dedent( + """\ + from airflow import DAG + from airflow.decorators import task + + with DAG('testbug'): + @task + def mytask(): + print(1) + mytask() + """ + ) + ) + with mock.patch("airflow.models.dagbag.signal.signal") as mock_signal: + mock_signal.side_effect = ValueError("Invalid signal setting") + DagBag(dag_folder=os.fspath(tmp_path), include_examples=False) + assert "SIGSEGV signal handler registration failed. Not in the main thread" in caplog.text