Skip to content
Merged
89 changes: 39 additions & 50 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arrow_schema::{DataType, Field as ArrowField};
use deepsize::DeepSizeOf;
use lance_arrow::{
json::{is_arrow_json_field, is_json_field},
DataTypeExt, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME,
DataTypeExt, ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME,
};
use snafu::location;

Expand Down Expand Up @@ -516,6 +516,22 @@ impl Field {
.unwrap_or(false)
}

/// If the field is a blob, rewrite this field in place — keeping its name and
/// id — so that its data type becomes the struct of blob description fields
/// (v2 descriptors take precedence over v1 when both checks would match).
///
/// If the field is not a blob, the field is left unchanged.
pub fn unloaded_mut(&mut self) {
    if self.is_blob_v2() {
        self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone();
        self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone();
        self.metadata = BLOB_V2_DESC_LANCE_FIELD.metadata.clone();
    } else if self.is_blob() {
        self.logical_type = BLOB_DESC_LANCE_FIELD.logical_type.clone();
        self.children = BLOB_DESC_LANCE_FIELD.children.clone();
        self.metadata = BLOB_DESC_LANCE_FIELD.metadata.clone();
    }
}

/// If the field is a blob, return a new field with the same name and id
/// but with the data type set to a struct of the blob description fields.
///
Expand Down Expand Up @@ -737,13 +753,31 @@ impl Field {
location: location!(),
});
}

if self.is_blob() != other.is_blob() {
return Err(Error::Arrow {
message: format!(
"Attempt to intersect blob and non-blob field: {}",
self.name
),
location: location!(),
});
}

let self_type = self.data_type();
let other_type = other.data_type();

if matches!(
(&self_type, &other_type),
(DataType::Struct(_), DataType::Struct(_)) | (DataType::List(_), DataType::List(_))
) {
// Blob v2 uses a struct logical type for descriptors, which differs from the logical
// input struct (data/uri). When intersecting schemas for projection we want to keep
// the projected blob layout instead of intersecting by child names.
if self.is_blob() {
return Ok(self.clone());
}

let children = self
.children
.iter()
Expand Down Expand Up @@ -1007,15 +1041,15 @@ impl TryFrom<&ArrowField> for Field {

if is_blob_v2 {
metadata
.entry(BLOB_META_KEY.to_string())
.or_insert_with(|| "true".to_string());
.entry(ARROW_EXT_NAME_KEY.to_string())
.or_insert_with(|| BLOB_V2_EXT_NAME.to_string());
}

// Check for JSON extension types (both Arrow and Lance)
let logical_type = if is_arrow_json_field(field) || is_json_field(field) {
LogicalType::from("json")
} else if is_blob_v2 {
LogicalType::from(super::BLOB_LOGICAL_TYPE)
LogicalType::from("struct")
} else {
LogicalType::try_from(field.data_type())?
};
Expand Down Expand Up @@ -1056,11 +1090,6 @@ impl From<&Field> for ArrowField {
let mut metadata = field.metadata.clone();

if field.logical_type.is_blob() {
metadata.insert(
ARROW_EXT_NAME_KEY.to_string(),
lance_arrow::BLOB_V2_EXT_NAME.to_string(),
);
metadata.entry(ARROW_EXT_META_KEY.to_string()).or_default();
metadata
.entry(BLOB_META_KEY.to_string())
.or_insert_with(|| "true".to_string());
Expand All @@ -1084,7 +1113,7 @@ mod tests {

use arrow_array::{DictionaryArray, StringArray, UInt32Array};
use arrow_schema::{Fields, TimeUnit};
use lance_arrow::{ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME};
use lance_arrow::BLOB_META_KEY;
use std::collections::HashMap;
#[test]
fn arrow_field_to_field() {
Expand Down Expand Up @@ -1569,44 +1598,4 @@ mod tests {
assert_eq!(unloaded.children.len(), 5);
assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type);
}

#[test]
fn blob_v2_detection_by_extension() {
let metadata = HashMap::from([
(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string()),
(BLOB_META_KEY.to_string(), "true".to_string()),
]);
let field: Field = ArrowField::new("blob", DataType::LargeBinary, true)
.with_metadata(metadata)
.try_into()
.unwrap();
assert!(field.is_blob_v2());
}

#[test]
fn blob_extension_roundtrip() {
let metadata = HashMap::from([
(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string()),
(ARROW_EXT_META_KEY.to_string(), "".to_string()),
]);
let arrow_field =
ArrowField::new("blob", DataType::LargeBinary, true).with_metadata(metadata);
let field = Field::try_from(&arrow_field).unwrap();
assert_eq!(
field.logical_type,
LogicalType::from(crate::datatypes::BLOB_LOGICAL_TYPE)
);
assert!(field.is_blob());
assert_eq!(field.data_type(), DataType::LargeBinary);

let roundtrip: ArrowField = ArrowField::from(&field);
assert_eq!(
roundtrip.metadata().get(ARROW_EXT_NAME_KEY),
Some(&BLOB_V2_EXT_NAME.to_string())
);
assert_eq!(
roundtrip.metadata().get(BLOB_META_KEY),
Some(&"true".to_string())
);
}
}
4 changes: 3 additions & 1 deletion rust/lance-core/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,9 @@ pub enum BlobHandling {

impl BlobHandling {
fn should_unload(&self, field: &Field) -> bool {
if !field.data_type().is_binary_like() {
// Blob v2 columns are Structs, so we need to treat any blob-marked field as unloadable
// even if the physical data type is not binary-like.
if !(field.data_type().is_binary_like() || field.is_blob()) {
return false;
}
match self {
Expand Down
5 changes: 3 additions & 2 deletions rust/lance-file/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,11 @@ impl ReaderProjection {
field_id_to_column_index,
&mut column_indices,
)?;
Ok(Self {
let projection = Self {
schema: Arc::new(schema.clone()),
column_indices,
})
};
Ok(projection)
}

/// Creates a projection that reads the entire file
Expand Down
9 changes: 9 additions & 0 deletions rust/lance-file/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,15 @@ impl FileWriter {
async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided. Schema is unknown and file cannot be created", location!()))?;
schema.metadata = std::mem::take(&mut self.schema_metadata);
// Use descriptor layout for blob v2 in the footer to avoid exposing logical child fields.
//
// TODO(xuanwo): this doesn't work on nested struct, need better solution like fields_per_order_mut?
schema.fields.iter_mut().for_each(|f| {
if f.is_blob_v2() {
f.unloaded_mut();
}
Comment thread
Xuanwo marked this conversation as resolved.
});

let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
let file_descriptor_bytes = file_descriptor.encode_to_vec();
let file_descriptor_len = file_descriptor_bytes.len() as u64;
Expand Down
206 changes: 206 additions & 0 deletions rust/lance/src/blob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Convenience builders for Lance blob v2 input columns.
//!
//! Blob v2 expects a column shaped as `Struct<data: LargeBinary?, uri: Utf8?>` and
//! tagged with `ARROW:extension:name = "lance.blob.v2"`. This module offers a
//! type-safe builder to construct that struct without manually wiring metadata.

use std::sync::Arc;

use arrow_array::{builder::LargeBinaryBuilder, builder::StringBuilder, ArrayRef, StructArray};
use arrow_buffer::NullBufferBuilder;
use arrow_schema::{DataType, Field};
use lance_arrow::{ARROW_EXT_NAME_KEY, BLOB_V2_EXT_NAME};

use crate::{Error, Result};

/// Construct the Arrow field for a blob v2 column.
///
/// The returned field is shaped as `Struct<data: LargeBinary?, uri: Utf8?>` and
/// carries the `ARROW:extension:name = "lance.blob.v2"` metadata tag that marks
/// it as a blob v2 input column.
pub fn blob_field(name: &str, nullable: bool) -> Field {
    let data = Field::new("data", DataType::LargeBinary, true);
    let uri = Field::new("uri", DataType::Utf8, true);
    let struct_type = DataType::Struct(vec![data, uri].into());

    let metadata = std::collections::HashMap::from([(
        ARROW_EXT_NAME_KEY.to_string(),
        BLOB_V2_EXT_NAME.to_string(),
    )]);

    Field::new(name, struct_type, nullable).with_metadata(metadata)
}

/// Builder for blob v2 input struct columns.
///
/// The builder enforces that each row contains exactly one of `data` or `uri` (or is null).
pub struct BlobArrayBuilder {
    /// Inline payload bytes; appended null for URI-backed and null rows.
    data_builder: LargeBinaryBuilder,
    /// External blob reference; appended null for inline and null rows.
    uri_builder: StringBuilder,
    /// Top-level struct validity — one bit per appended row.
    validity: NullBufferBuilder,
    /// Exact row count the caller committed to at construction time.
    expected_len: usize,
    /// Rows appended so far; must equal `expected_len` when `finish` is called.
    len: usize,
}

impl BlobArrayBuilder {
    /// Create a new builder with the given row capacity.
    ///
    /// The capacity is a hard row count: appending more than `capacity` rows
    /// fails, and `finish` fails unless exactly `capacity` rows were appended.
    pub fn new(capacity: usize) -> Self {
        Self {
            data_builder: LargeBinaryBuilder::with_capacity(capacity, 0),
            uri_builder: StringBuilder::with_capacity(capacity, 0),
            validity: NullBufferBuilder::new(capacity),
            expected_len: capacity,
            len: 0,
        }
    }

    /// Append a blob backed by raw bytes.
    pub fn push_bytes(&mut self, bytes: impl AsRef<[u8]>) -> Result<()> {
        self.ensure_capacity()?;
        // Inline row: data is set, uri stays null.
        self.uri_builder.append_null();
        self.data_builder.append_value(bytes);
        self.validity.append_non_null();
        self.len += 1;
        Ok(())
    }

    /// Append a blob referenced by URI.
    ///
    /// Returns an error for an empty URI, since an empty reference is ambiguous
    /// with a missing one.
    pub fn push_uri(&mut self, uri: impl Into<String>) -> Result<()> {
        self.ensure_capacity()?;
        let owned = uri.into();
        if owned.is_empty() {
            return Err(Error::invalid_input(
                "URI cannot be empty",
                snafu::location!(),
            ));
        }
        // URI row: uri is set, data stays null.
        self.data_builder.append_null();
        self.uri_builder.append_value(owned);
        self.validity.append_non_null();
        self.len += 1;
        Ok(())
    }

    /// Append an empty blob (inline, zero-length payload).
    pub fn push_empty(&mut self) -> Result<()> {
        self.ensure_capacity()?;
        // A zero-length inline payload is still a valid, non-null row.
        self.uri_builder.append_null();
        self.data_builder.append_value([]);
        self.validity.append_non_null();
        self.len += 1;
        Ok(())
    }

    /// Append a null row.
    pub fn push_null(&mut self) -> Result<()> {
        self.ensure_capacity()?;
        // Both children are null and the struct-level validity bit is cleared.
        self.data_builder.append_null();
        self.uri_builder.append_null();
        self.validity.append_null();
        self.len += 1;
        Ok(())
    }

    /// Finish building and return an Arrow struct array.
    ///
    /// Fails if fewer rows than the declared capacity were appended.
    pub fn finish(mut self) -> Result<ArrayRef> {
        if self.len != self.expected_len {
            return Err(Error::invalid_input(
                format!(
                    "Expected {} rows but received {}",
                    self.expected_len, self.len
                ),
                snafu::location!(),
            ));
        }

        let fields: arrow_schema::Fields = vec![
            Field::new("data", DataType::LargeBinary, true),
            Field::new("uri", DataType::Utf8, true),
        ]
        .into();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(self.data_builder.finish()),
            Arc::new(self.uri_builder.finish()),
        ];
        let nulls = self.validity.finish();

        Ok(Arc::new(StructArray::try_new(fields, columns, nulls)?))
    }

    /// Return an error once the declared capacity has been reached.
    fn ensure_capacity(&self) -> Result<()> {
        if self.len < self.expected_len {
            return Ok(());
        }
        Err(Error::invalid_input(
            "BlobArrayBuilder capacity exceeded",
            snafu::location!(),
        ))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::cast::AsArray;
    use arrow_array::Array;

    #[test]
    fn test_field_metadata() {
        // The blob v2 extension tag must be present on the generated field.
        let field = blob_field("blob", true);
        let ext_name = field.metadata().get(ARROW_EXT_NAME_KEY);
        assert!(ext_name.is_some());
        assert_eq!(ext_name.unwrap(), BLOB_V2_EXT_NAME);
    }

    #[test]
    fn test_builder_basic() {
        // One row of each kind: inline bytes, URI, empty payload, and null.
        let mut builder = BlobArrayBuilder::new(4);
        builder.push_bytes(b"hi").unwrap();
        builder.push_uri("s3://bucket/key").unwrap();
        builder.push_empty().unwrap();
        builder.push_null().unwrap();

        let array = builder.finish().unwrap();
        assert_eq!(array.len(), 4);
        assert_eq!(array.null_count(), 1);

        let struct_array = array.as_struct();
        let data_col = struct_array.column(0).as_binary::<i64>();
        let uri_col = struct_array.column(1).as_string::<i32>();

        // Row 0: inline data, no uri.
        assert_eq!(data_col.value(0), b"hi");
        assert!(uri_col.is_null(0));
        // Row 1: uri only.
        assert!(data_col.is_null(1));
        assert_eq!(uri_col.value(1), "s3://bucket/key");
        // Row 2: empty but non-null payload.
        assert_eq!(data_col.value(2).len(), 0);
        assert!(uri_col.is_null(2));
    }

    #[test]
    fn test_capacity_error() {
        let mut builder = BlobArrayBuilder::new(1);
        builder.push_bytes(b"a").unwrap();
        let err = builder.push_bytes(b"b").unwrap_err();
        assert!(err.to_string().contains("capacity exceeded"));
    }

    #[test]
    fn test_empty_uri_rejected() {
        let mut builder = BlobArrayBuilder::new(1);
        let err = builder.push_uri("").unwrap_err();
        assert!(err.to_string().contains("URI cannot be empty"));
    }
}
Loading