Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ markers = [
filterwarnings = [
'error::FutureWarning',
'error::DeprecationWarning',
# TensorFlow import can emit NumPy deprecation FutureWarnings in some environments.
# We keep FutureWarnings as errors generally, but ignore this known-noisy import-time warning.
'ignore:.*`np\.object` will be defined as the corresponding NumPy scalar\..*:FutureWarning',
# Boto3
'ignore:.*datetime\.datetime\.utcnow\(\) is deprecated.*:DeprecationWarning',
# Pandas 2.2 on Python 3.12
Expand Down
6 changes: 5 additions & 1 deletion python/python/lance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from typing import TYPE_CHECKING, Dict, List, Optional, Union

from . import io, log
from .blob import BlobColumn, BlobFile
from .blob import Blob, BlobArray, BlobColumn, BlobFile, blob_array, blob_field
from .dataset import (
DataStatistics,
FieldStatistics,
Expand Down Expand Up @@ -51,8 +51,12 @@


__all__ = [
"Blob",
"BlobArray",
"BlobColumn",
"BlobFile",
"blob_array",
"blob_field",
"DatasetBasePath",
"DataStatistics",
"FieldStatistics",
Expand Down
153 changes: 152 additions & 1 deletion python/python/lance/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,164 @@
# SPDX-FileCopyrightText: Copyright The Lance Authors

import io
from typing import IO, Iterator, Optional, Union
from dataclasses import dataclass
from typing import IO, Any, Iterator, Optional, Union

import pyarrow as pa

from .lance import LanceBlobFile


@dataclass(frozen=True)
class Blob:
    """
    A logical blob value for writing Lance blob columns.

    A blob carries at most one representation: inlined ``data`` bytes or an
    external ``uri`` string. Instances are immutable and hashable.
    """

    data: Optional[bytes] = None
    uri: Optional[str] = None

    def __post_init__(self) -> None:
        # Reject the ambiguous case first, then the unusable empty URI.
        has_data = self.data is not None
        has_uri = self.uri is not None
        if has_data and has_uri:
            raise ValueError("Blob cannot have both data and uri")
        if self.uri == "":
            raise ValueError("Blob uri cannot be empty")

    @staticmethod
    def from_bytes(data: Union[bytes, bytearray, memoryview]) -> "Blob":
        """Create a blob whose payload is stored inline."""
        payload = bytes(data)
        return Blob(data=payload)

    @staticmethod
    def from_uri(uri: str) -> "Blob":
        """Create a blob referencing an external object by URI."""
        if uri == "":
            raise ValueError("Blob uri cannot be empty")
        return Blob(uri=uri)

    @staticmethod
    def empty() -> "Blob":
        """Create a zero-length inline blob."""
        return Blob(data=b"")


class BlobType(pa.ExtensionType):
    """
    PyArrow extension type marking a column as a Lance blob column.

    This is the "logical" type users write. Lance stores it in a compact
    descriptor format, and reads return descriptors by default.
    """

    def __init__(self) -> None:
        # Logical storage layout: at most one of `data` / `uri` is populated
        # per value (enforced when arrays are constructed).
        fields = [
            pa.field("data", pa.large_binary(), nullable=True),
            pa.field("uri", pa.utf8(), nullable=True),
        ]
        super().__init__(pa.struct(fields), "lance.blob.v2")

    def __arrow_ext_serialize__(self) -> bytes:
        # The type has no parameters, so there is nothing to serialize.
        return b""

    @classmethod
    def __arrow_ext_deserialize__(
        cls, storage_type: pa.DataType, serialized: bytes
    ) -> "BlobType":
        return BlobType()

    def __arrow_ext_class__(self):
        # Arrays of this type materialize as BlobArray.
        return BlobArray

    def __reduce__(self):
        # Workaround to ensure pickle works in earlier versions of PyArrow
        # https://github.com/apache/arrow/issues/35599
        return type(self).__arrow_ext_deserialize__, (
            self.storage_type,
            self.__arrow_ext_serialize__(),
        )


# Register the extension type once at import time so PyArrow can map the
# "lance.blob.v2" extension name back to BlobType/BlobArray.
try:
    pa.register_extension_type(BlobType())
except pa.ArrowKeyError:
    # register_extension_type raises ArrowKeyError if the name is already
    # registered in this interpreter (e.g. the module was re-imported).
    pass


class BlobArray(pa.ExtensionArray):
    """
    PyArrow extension array for Lance blob columns.

    Construct with :meth:`from_pylist` or use :func:`blob_array`.
    """

    @classmethod
    def from_pylist(cls, values: list[Any]) -> "BlobArray":
        """
        Encode Python values into a blob extension array.

        Each element may be bytes-like (inline), a str URI, a :class:`Blob`,
        or ``None`` for a null entry.
        """
        # One (data, uri, is_null) row per input value; unzipped into the
        # struct's child arrays below.
        rows: list[tuple[Optional[bytes], Optional[str], bool]] = []

        for item in values:
            if item is None:
                rows.append((None, None, True))
            elif isinstance(item, Blob):
                rows.append((item.data, item.uri, False))
            elif isinstance(item, str):
                if item == "":
                    raise ValueError("Blob uri cannot be empty")
                rows.append((None, item, False))
            elif isinstance(item, (bytes, bytearray, memoryview)):
                rows.append((bytes(item), None, False))
            else:
                raise TypeError(
                    "BlobArray values must be bytes-like, str (URI), Blob, or None; "
                    f"got {type(item)}"
                )

        data_col = pa.array([r[0] for r in rows], type=pa.large_binary())
        uri_col = pa.array([r[1] for r in rows], type=pa.utf8())
        # mask=True marks a null struct entry.
        null_col = pa.array([r[2] for r in rows], type=pa.bool_())
        storage = pa.StructArray.from_arrays(
            [data_col, uri_col], names=["data", "uri"], mask=null_col
        )
        return pa.ExtensionArray.from_storage(BlobType(), storage)  # type: ignore[return-value]


def blob_array(values: list[Any]) -> BlobArray:
    """
    Build a :class:`BlobArray` from Python values.

    Accepted element types:
    - bytes-like: inline bytes
    - str: an external URI
    - Blob: explicit inline/uri/empty
    - None: null
    """
    return BlobArray.from_pylist(values)


def blob_field(name: str, *, nullable: bool = True) -> pa.Field:
    """Return an Arrow field of :class:`BlobType` named *name*."""
    blob_type = BlobType()
    return pa.field(name, blob_type, nullable=nullable)


class BlobIterator:
def __init__(self, binary_iter: Iterator[pa.BinaryScalar]):
self.binary_iter = binary_iter
Expand Down
28 changes: 28 additions & 0 deletions python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,31 @@ def test_take_deleted_blob(tmp_path, dataset_with_blobs):
def test_scan_blob(tmp_path, dataset_with_blobs):
ds = dataset_with_blobs.scanner(filter="idx = 2").to_table()
assert ds.num_rows == 1


def test_blob_extension_write_inline(tmp_path):
    """Round-trip inline blob bytes through a v2 dataset."""
    tbl = pa.table({"blob": lance.blob_array([b"foo", b"bar"])})
    ds = lance.write_dataset(tbl, tmp_path / "test_ds_v2", data_storage_version="2.2")

    # Default scans return the compact descriptor struct, not the payload.
    descriptors = ds.to_table(columns=["blob"]).column("blob").chunk(0)
    assert pa.types.is_struct(descriptors.type)

    blob_files = ds.take_blobs("blob", indices=[0, 1])
    with blob_files[0] as fh:
        assert fh.read() == b"foo"


def test_blob_extension_write_external(tmp_path):
    """Round-trip a URI-referenced (external) blob through a v2 dataset."""
    payload = b"hello"
    external_file = tmp_path / "external_blob.bin"
    external_file.write_bytes(payload)

    tbl = pa.table({"blob": lance.blob_array([external_file.as_uri()])})
    ds = lance.write_dataset(
        tbl, tmp_path / "test_ds_v2_external", data_storage_version="2.2"
    )

    blob = ds.take_blobs("blob", indices=[0])[0]
    assert blob.size() == 5
    with blob as fh:
        assert fh.read() == payload
6 changes: 6 additions & 0 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,12 @@ impl Field {
Ok(self.clone())
}
(DataType::Struct(_), DataType::Struct(_)) => {
// Blob v2 columns are special: they can have different struct layouts
// (logical input vs. descriptor struct). We treat blob v2 structs like primitive
// fields (e.g. a binary column) during schema set operations (union/subtract).
if self.is_blob() {
return Ok(self.clone());
}
let mut fields = vec![];
for other_field in other.children.iter() {
let Some(child) = self.child(&other_field.name) else {
Expand Down
Loading