Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
# SPDX-FileCopyrightText: Copyright The Lance Authors

import io
import subprocess
import sys
import tarfile
import textwrap

import lance
import pyarrow as pa
Expand Down Expand Up @@ -143,6 +146,43 @@ def test_blob_files(dataset_with_blobs):
assert f.read() == expected


def test_blob_files_close_no_shutdown_panic(tmp_path):
script = textwrap.dedent(
f"""
import pyarrow as pa
import lance

table = pa.table(
[pa.array([b"foo", b"bar"], pa.large_binary())],
schema=pa.schema(
[
pa.field(
"blob",
pa.large_binary(),
metadata={{"lance-encoding:blob": "true"}},
)
]
),
)
ds = lance.write_dataset(table, {str(tmp_path / "ds")!r})
row_ids = ds.to_table(columns=[], with_row_id=True).column("_rowid").to_pylist()
blobs = ds.take_blobs("blob", ids=row_ids)
for blob in blobs:
blob.close()
print("done")
"""
)
result = subprocess.run(
[sys.executable, "-c", script],
capture_output=True,
text=True,
check=False,
)
assert result.returncode == 0, result.stderr
assert "interpreter_lifecycle.rs" not in result.stderr
assert "The Python interpreter is not initialized" not in result.stderr


def test_blob_files_by_address(dataset_with_blobs):
addresses = (
dataset_with_blobs.to_table(columns=[], with_row_address=True)
Expand Down
49 changes: 41 additions & 8 deletions python/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,15 @@ impl BackgroundExecutor {
if let Some(py) = py {
py.detach(|| self.spawn_impl(task))
} else {
// Python::with_gil is a no-op if the GIL is already held by the thread.
Python::attach(|py| py.detach(|| self.spawn_impl(task)))
let mut task = Some(task);
if let Some(result) = Python::try_attach(|py| {
let task = task.take().expect("task should not be taken");
py.detach(|| self.spawn_impl(task))
}) {
result
} else {
self.spawn_impl(task.expect("task should still be available"))
}
}
}

Expand All @@ -83,7 +90,13 @@ impl BackgroundExecutor {

loop {
// Check for keyboard interrupts
match Python::attach(|py| py.check_signals()) {
let signal_check = match Python::try_attach(|py| py.check_signals()) {
Some(result) => result,
// Python may be finalizing or unavailable. In this state we can't
// observe KeyboardInterrupt reliably, but we should not panic.
None => Ok(()),
};
match signal_check {
Ok(_) => {}
Err(err) => {
handle.abort();
Expand Down Expand Up @@ -113,12 +126,18 @@ impl BackgroundExecutor {
self.runtime.spawn(task);
})
} else {
// Python::with_gil is a no-op if the GIL is already held by the thread.
Python::attach(|py| {
let mut task = Some(task);
if Python::try_attach(|py| {
let task = task.take().expect("task should not be taken");
py.detach(|| {
self.runtime.spawn(task);
})
})
.is_none()
{
self.runtime
.spawn(task.expect("task should still be available"));
}
}
}

Expand All @@ -141,8 +160,16 @@ impl BackgroundExecutor {
if let Some(py) = py {
py.detach(move || self.runtime.block_on(future))
} else {
// Python::with_gil is a no-op if the GIL is already held by the thread.
Python::attach(|py| py.detach(|| self.runtime.block_on(future)))
let mut future = Some(future);
if let Some(result) = Python::try_attach(|py| {
let future = future.take().expect("future should not be taken");
py.detach(|| self.runtime.block_on(future))
}) {
result
} else {
self.runtime
.block_on(future.expect("future should still be available"))
}
}
}

Expand All @@ -154,7 +181,13 @@ impl BackgroundExecutor {
let interrupt_future = async {
loop {
// Check for keyboard interrupts
match Python::attach(|py| py.check_signals()) {
let signal_check = match Python::try_attach(|py| py.check_signals()) {
Some(result) => result,
// Python may be finalizing or unavailable. In this state we can't
// observe KeyboardInterrupt reliably, but we should not panic.
None => Ok(()),
};
match signal_check {
Ok(_) => {
// Wait for 100ms before checking signals again
tokio::time::sleep(SIGNAL_CHECK_INTERVAL).await;
Expand Down