From 12a4fb882d024dc054079b972ea37d57082cce92 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 26 Feb 2026 16:07:01 +0800 Subject: [PATCH] fix(python): avoid shutdown panic when checking signals --- python/python/tests/test_blob.py | 40 ++++++++++++++++++++++++++ python/src/executor.rs | 49 ++++++++++++++++++++++++++------ 2 files changed, 81 insertions(+), 8 deletions(-) diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index 6b31b665ea3..b8704d4795a 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -2,7 +2,10 @@ # SPDX-FileCopyrightText: Copyright The Lance Authors import io +import subprocess +import sys import tarfile +import textwrap import lance import pyarrow as pa @@ -143,6 +146,43 @@ def test_blob_files(dataset_with_blobs): assert f.read() == expected +def test_blob_files_close_no_shutdown_panic(tmp_path): + script = textwrap.dedent( + f""" + import pyarrow as pa + import lance + + table = pa.table( + [pa.array([b"foo", b"bar"], pa.large_binary())], + schema=pa.schema( + [ + pa.field( + "blob", + pa.large_binary(), + metadata={{"lance-encoding:blob": "true"}}, + ) + ] + ), + ) + ds = lance.write_dataset(table, {str(tmp_path / "ds")!r}) + row_ids = ds.to_table(columns=[], with_row_id=True).column("_rowid").to_pylist() + blobs = ds.take_blobs("blob", ids=row_ids) + for blob in blobs: + blob.close() + print("done") + """ + ) + result = subprocess.run( + [sys.executable, "-c", script], + capture_output=True, + text=True, + check=False, + ) + assert result.returncode == 0, result.stderr + assert "interpreter_lifecycle.rs" not in result.stderr + assert "The Python interpreter is not initialized" not in result.stderr + + def test_blob_files_by_address(dataset_with_blobs): addresses = ( dataset_with_blobs.to_table(columns=[], with_row_address=True) diff --git a/python/src/executor.rs b/python/src/executor.rs index 23b08d07f80..7e7646ea854 100644 --- a/python/src/executor.rs +++ b/python/src/executor.rs @@ -58,8 +58,15 @@ impl BackgroundExecutor { if let Some(py) = py { py.detach(|| self.spawn_impl(task)) } else { - // Python::with_gil is a no-op if the GIL is already held by the thread. - Python::attach(|py| py.detach(|| self.spawn_impl(task))) + let mut task = Some(task); + if let Some(result) = Python::try_attach(|py| { + let task = task.take().expect("task should not be taken"); + py.detach(|| self.spawn_impl(task)) + }) { + result + } else { + self.spawn_impl(task.expect("task should still be available")) + } } } @@ -83,7 +90,13 @@ impl BackgroundExecutor { loop { // Check for keyboard interrupts - match Python::attach(|py| py.check_signals()) { + let signal_check = match Python::try_attach(|py| py.check_signals()) { + Some(result) => result, + // Python may be finalizing or unavailable. In this state we can't + // observe KeyboardInterrupt reliably, but we should not panic. + None => Ok(()), + }; + match signal_check { Ok(_) => {} Err(err) => { handle.abort(); @@ -113,12 +126,18 @@ impl BackgroundExecutor { self.runtime.spawn(task); }) } else { - // Python::with_gil is a no-op if the GIL is already held by the thread. - Python::attach(|py| { + let mut task = Some(task); + if Python::try_attach(|py| { + let task = task.take().expect("task should not be taken"); py.detach(|| { self.runtime.spawn(task); }) }) + .is_none() + { + self.runtime + .spawn(task.expect("task should still be available")); + } } } @@ -141,8 +160,16 @@ impl BackgroundExecutor { if let Some(py) = py { py.detach(move || self.runtime.block_on(future)) } else { - // Python::with_gil is a no-op if the GIL is already held by the thread. - Python::attach(|py| py.detach(|| self.runtime.block_on(future))) + let mut future = Some(future); + if let Some(result) = Python::try_attach(|py| { + let future = future.take().expect("future should not be taken"); + py.detach(|| self.runtime.block_on(future)) + }) { + result + } else { + self.runtime + .block_on(future.expect("future should still be available")) + } } } @@ -154,7 +181,13 @@ impl BackgroundExecutor { let interrupt_future = async { loop { // Check for keyboard interrupts - match Python::attach(|py| py.check_signals()) { + let signal_check = match Python::try_attach(|py| py.check_signals()) { + Some(result) => result, + // Python may be finalizing or unavailable. In this state we can't + // observe KeyboardInterrupt reliably, but we should not panic. + None => Ok(()), + }; + match signal_check { Ok(_) => { // Wait for 100ms before checking signals again tokio::time::sleep(SIGNAL_CHECK_INTERVAL).await;