Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions python/python/lance/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,9 @@ def scanner(
with_row_id: bool = False,
with_row_address: bool = False,
batch_readahead: int = 16,
blob_handling: Optional[
Literal["all_binary", "blobs_descriptions", "all_descriptions"]
] = None,
order_by: Optional[List[ColumnOrdering]] = None,
) -> "LanceScanner":
"""See Dataset::scanner for details"""
Expand All @@ -468,6 +471,7 @@ def scanner(
with_row_id=with_row_id,
with_row_address=with_row_address,
batch_readahead=batch_readahead,
blob_handling=blob_handling,
order_by=order_by,
**columns_arg,
)
Expand Down Expand Up @@ -515,6 +519,9 @@ def to_batches(
with_row_id: bool = False,
with_row_address: bool = False,
batch_readahead: int = 16,
blob_handling: Optional[
Literal["all_binary", "blobs_descriptions", "all_descriptions"]
] = None,
order_by: Optional[List[ColumnOrdering]] = None,
) -> Iterator[pa.RecordBatch]:
return self.scanner(
Expand All @@ -526,6 +533,7 @@ def to_batches(
with_row_id=with_row_id,
with_row_address=with_row_address,
batch_readahead=batch_readahead,
blob_handling=blob_handling,
order_by=order_by,
).to_batches()

Expand All @@ -537,6 +545,9 @@ def to_table(
offset: Optional[int] = None,
with_row_id: bool = False,
with_row_address: bool = False,
blob_handling: Optional[
Literal["all_binary", "blobs_descriptions", "all_descriptions"]
] = None,
order_by: Optional[List[ColumnOrdering]] = None,
) -> pa.Table:
return self.scanner(
Expand All @@ -546,6 +557,7 @@ def to_table(
offset=offset,
with_row_id=with_row_id,
with_row_address=with_row_address,
blob_handling=blob_handling,
order_by=order_by,
).to_table()

Expand Down
20 changes: 11 additions & 9 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -443,15 +443,17 @@ class _Fragment:
) -> pa.RecordBatch: ...
def scanner(
    self,
    # Type stub for _Fragment.scanner; all arguments are optional and default
    # to None so Python-side wrappers can forward keyword arguments freely.
    columns: Optional[List[str]] = None,
    columns_with_transform: Optional[List[Tuple[str, str]]] = None,
    batch_size: Optional[int] = None,
    filter: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    with_row_id: Optional[bool] = None,
    with_row_address: Optional[bool] = None,
    batch_readahead: Optional[int] = None,
    # One of "all_binary", "blobs_descriptions", "all_descriptions";
    # validated on the Rust side.
    blob_handling: Optional[str] = None,
    order_by: Optional[List[Any]] = None,
) -> _Scanner: ...
def add_columns_from_reader(
self,
Expand Down
24 changes: 24 additions & 0 deletions python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,30 @@ def test_scan_blob_as_binary(tmp_path):
assert tbl.column("blobs").to_pylist() == values


def test_fragment_scan_blob_as_binary(tmp_path):
    """Fragment-level scans honor ``blob_handling="all_binary"`` on blob columns."""
    expected = [b"foo", b"bar", b"baz"]
    blob_field = pa.field(
        "blobs", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
    )
    source = pa.table(
        [pa.array(expected, pa.large_binary())],
        schema=pa.schema([blob_field]),
    )
    dataset = lance.write_dataset(source, tmp_path / "test_ds")

    frag = dataset.get_fragments()[0]

    # Both the explicit scanner() path and the to_table() convenience wrapper
    # should materialize blob values as raw binary.
    scanned = frag.scanner(columns=["blobs"], blob_handling="all_binary").to_table()
    assert scanned.column("blobs").to_pylist() == expected

    direct = frag.to_table(columns=["blobs"], blob_handling="all_binary")
    assert direct.column("blobs").to_pylist() == expected


@pytest.fixture
def dataset_with_blobs(tmp_path):
values = pa.array([b"foo", b"bar", b"baz"], pa.large_binary())
Expand Down
22 changes: 21 additions & 1 deletion python/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use lance::dataset::scanner::ColumnOrdering;
use lance::dataset::transaction::{Operation, Transaction};
use lance::dataset::{InsertBuilder, NewColumnTransform};
use lance::Error;
use lance_core::datatypes::BlobHandling;
use lance_io::utils::CachedFileSize;
use lance_table::format::{
DataFile, DeletionFile, DeletionFileType, Fragment, RowDatasetVersionMeta, RowIdMeta,
Expand Down Expand Up @@ -196,7 +197,7 @@ impl FileFragment {
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature=(columns=None, columns_with_transform=None, batch_size=None, filter=None, limit=None, offset=None, with_row_id=None, with_row_address=None, batch_readahead=None, order_by=None))]
#[pyo3(signature=(columns=None, columns_with_transform=None, batch_size=None, filter=None, limit=None, offset=None, with_row_id=None, with_row_address=None, batch_readahead=None, blob_handling=None, order_by=None))]
fn scanner(
self_: PyRef<'_, Self>,
columns: Option<Vec<String>>,
Expand All @@ -208,6 +209,7 @@ impl FileFragment {
with_row_id: Option<bool>,
with_row_address: Option<bool>,
batch_readahead: Option<usize>,
blob_handling: Option<Bound<PyAny>>,
order_by: Option<Vec<PyLance<ColumnOrdering>>>,
) -> PyResult<Scanner> {
let mut scanner = self_.fragment.scan();
Expand Down Expand Up @@ -253,6 +255,24 @@ impl FileFragment {
if let Some(batch_readahead) = batch_readahead {
scanner.batch_readahead(batch_readahead);
}
if let Some(blob_handling) = blob_handling {
let handling = if let Ok(handling) = blob_handling.extract::<String>() {
match handling.as_str() {
"all_binary" => BlobHandling::AllBinary,
"blobs_descriptions" => BlobHandling::BlobsDescriptions,
"all_descriptions" => BlobHandling::AllDescriptions,
other => {
return Err(PyValueError::new_err(format!(
"Invalid blob_handling: {other}. Expected one of: all_binary, blobs_descriptions, all_descriptions"
)))
}
}
} else {
return Err(PyTypeError::new_err("blob_handling must be a str"));
};

scanner.blob_handling(handling);
}
if let Some(orderings) = order_by {
let col_orderings = Some(orderings.into_iter().map(|co| co.0).collect());
scanner
Expand Down
Loading