Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@ use crate::{
pub const LANCE_UNENFORCED_PRIMARY_KEY: &str = "lance-schema:unenforced-primary-key";

/// Returns true when `field` is annotated with the blob v2 Arrow
/// extension-type name in its metadata.
///
/// Only the extension-name metadata key is consulted; the physical data
/// type is deliberately not constrained here (blob v2 values may arrive
/// either as LargeBinary payloads or as descriptor structs).
fn has_blob_v2_extension(field: &ArrowField) -> bool {
    field
        .metadata()
        .get(ARROW_EXT_NAME_KEY)
        .is_some_and(|name| name == BLOB_V2_EXT_NAME)
}

#[derive(Debug, Default)]
Expand Down
18 changes: 16 additions & 2 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::buffer::LanceBuffer;
use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
use crate::compression_config::CompressionParams;
use crate::decoder::PageEncoding;
use crate::encodings::logical::blob::BlobStructuralEncoder;
use crate::encodings::logical::blob::{BlobStructuralEncoder, BlobV2StructuralEncoder};
use crate::encodings::logical::list::ListStructuralEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
Expand Down Expand Up @@ -385,10 +385,24 @@ impl StructuralEncodingStrategy {
self.compression_strategy.clone(),
)?));
}
DataType::Struct(_) if self.version >= LanceFileVersion::V2_2 => {
return Ok(Box::new(BlobV2StructuralEncoder::new(
field,
column_index.next_column_index(field.id as u32),
options,
self.compression_strategy.clone(),
)?));
}
DataType::Struct(_) => {
return Err(Error::InvalidInput {
source: "Blob v2 struct input requires file version >= 2.2".into(),
location: location!(),
});
}
_ => {
return Err(Error::InvalidInput {
source: format!(
"Blob encoding only supports Binary/LargeBinary, got {}",
"Blob encoding only supports Binary/LargeBinary or v2 Struct, got {}",
data_type
)
.into(),
Expand Down
179 changes: 177 additions & 2 deletions rust/lance-encoding/src/encodings/logical/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@

use std::{collections::HashMap, sync::Arc};

use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray, UInt64Array};
use arrow_array::{
builder::{PrimitiveBuilder, StringBuilder},
cast::AsArray,
types::{UInt32Type, UInt64Type, UInt8Type},
Array, ArrayRef, StructArray, UInt64Array,
};
use arrow_buffer::Buffer;
use arrow_schema::{DataType, Field as ArrowField, Fields};
use futures::{future::BoxFuture, FutureExt};
use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
use lance_core::{
datatypes::Field, datatypes::BLOB_V2_DESC_FIELDS, error::LanceOptionExt, Error, Result,
};
use snafu::location;

use crate::{
Expand Down Expand Up @@ -221,6 +228,174 @@ impl FieldEncoder for BlobStructuralEncoder {
}
}

/// Blob v2 structural encoder.
///
/// Accepts struct&lt;data: LargeBinary?, uri: Utf8?&gt; input and rewrites each
/// row into a fixed descriptor struct (kind/position/size/blob_id/uri);
/// inline payload bytes are moved to out-of-line buffers rather than being
/// stored in the column itself.
pub struct BlobV2StructuralEncoder {
    // Underlying encoder for the packed descriptor struct; all FieldEncoder
    // calls are delegated to it.
    descriptor_encoder: Box<dyn FieldEncoder>,
}

impl BlobV2StructuralEncoder {
    /// Builds a blob v2 encoder for `field`.
    ///
    /// Internally this wraps a [`PrimitiveStructuralEncoder`] over the fixed
    /// blob descriptor struct ([`BLOB_V2_DESC_FIELDS`]), carrying the
    /// original field's name and nullability.
    pub fn new(
        field: &Field,
        column_index: u32,
        options: &crate::encoder::EncodingOptions,
        compression_strategy: Arc<dyn crate::compression::CompressionStrategy>,
    ) -> Result<Self> {
        // Tag the descriptor field as a packed struct via metadata.
        let metadata = HashMap::from([(
            PACKED_STRUCT_META_KEY.to_string(),
            "true".to_string(),
        )]);

        let arrow_descriptor = ArrowField::new(
            &field.name,
            DataType::Struct(BLOB_V2_DESC_FIELDS.clone()),
            field.nullable,
        )
        .with_metadata(metadata);

        let inner = PrimitiveStructuralEncoder::try_new(
            options,
            compression_strategy,
            column_index,
            Field::try_from(arrow_descriptor)?,
            Arc::new(HashMap::new()),
        )?;

        Ok(Self {
            descriptor_encoder: Box::new(inner),
        })
    }
}

impl FieldEncoder for BlobV2StructuralEncoder {
fn maybe_encode(
&mut self,
array: ArrayRef,
external_buffers: &mut OutOfLineBuffers,
_repdef: RepDefBuilder,
row_number: u64,
num_rows: u64,
) -> Result<Vec<EncodeTask>> {
// Supported input: Struct<data:LargeBinary?, uri:Utf8?>
let DataType::Struct(fields) = array.data_type() else {
return Err(Error::InvalidInput {
source: "Blob v2 requires struct<data, uri> input".into(),
location: location!(),
});
};

let struct_arr = array.as_struct();
let mut data_idx = None;
let mut uri_idx = None;
for (idx, field) in fields.iter().enumerate() {
match field.name().as_str() {
"data" => data_idx = Some(idx),
"uri" => uri_idx = Some(idx),
_ => {}
}
}
let (data_idx, uri_idx) = data_idx.zip(uri_idx).ok_or_else(|| Error::InvalidInput {
source: "Blob v2 struct must contain 'data' and 'uri' fields".into(),
location: location!(),
})?;

let data_col = struct_arr.column(data_idx).as_binary::<i64>();
let uri_col = struct_arr.column(uri_idx).as_string::<i32>();
Comment thread
Xuanwo marked this conversation as resolved.

// Validate XOR(data, uri)
for i in 0..struct_arr.len() {
if struct_arr.is_null(i) {
continue;
}
let data_is_set = !data_col.is_null(i);
let uri_is_set = !uri_col.is_null(i);
if data_is_set == uri_is_set {
return Err(Error::InvalidInput {
source: "Each blob row must set exactly one of data or uri".into(),
location: location!(),
});
}
if uri_is_set {
return Err(Error::NotSupported {
source: "External blob (uri) is not supported yet".into(),
location: location!(),
});
}
}

let binary_array = data_col;

let mut kind_builder = PrimitiveBuilder::<UInt8Type>::with_capacity(binary_array.len());
let mut position_builder =
PrimitiveBuilder::<UInt64Type>::with_capacity(binary_array.len());
let mut size_builder = PrimitiveBuilder::<UInt64Type>::with_capacity(binary_array.len());
let mut blob_id_builder = PrimitiveBuilder::<UInt32Type>::with_capacity(binary_array.len());
let mut uri_builder = StringBuilder::with_capacity(binary_array.len(), 0);

for i in 0..binary_array.len() {
let is_null_row = match array.data_type() {
DataType::Struct(_) => array.is_null(i),
_ => binary_array.is_null(i),
};
if is_null_row {
kind_builder.append_null();
position_builder.append_null();
size_builder.append_null();
blob_id_builder.append_null();
uri_builder.append_null();
continue;
}

let value = binary_array.value(i);
kind_builder.append_value(0);

if value.is_empty() {
position_builder.append_value(0);
size_builder.append_value(0);
} else {
let position = external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
position_builder.append_value(position);
size_builder.append_value(value.len() as u64);
}

blob_id_builder.append_null();
uri_builder.append_null();
}

let children: Vec<ArrayRef> = vec![
Arc::new(kind_builder.finish()),
Arc::new(position_builder.finish()),
Arc::new(size_builder.finish()),
Arc::new(blob_id_builder.finish()),
Arc::new(uri_builder.finish()),
];

let descriptor_array = Arc::new(StructArray::try_new(
BLOB_V2_DESC_FIELDS.clone(),
children,
None,
)?) as ArrayRef;

self.descriptor_encoder.maybe_encode(
descriptor_array,
external_buffers,
RepDefBuilder::default(),
row_number,
num_rows,
)
}

fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
self.descriptor_encoder.flush(external_buffers)
}

fn finish(
&mut self,
external_buffers: &mut OutOfLineBuffers,
) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
self.descriptor_encoder.finish(external_buffers)
}

fn num_columns(&self) -> u32 {
self.descriptor_encoder.num_columns()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
43 changes: 24 additions & 19 deletions rust/lance/src/dataset/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
use std::{collections::HashMap, future::Future, ops::DerefMut, sync::Arc};

use arrow::array::AsArray;
use arrow::datatypes::{UInt64Type, UInt8Type};
use arrow_schema::DataType;
use arrow::datatypes::{UInt32Type, UInt64Type, UInt8Type};
use object_store::path::Path;
use snafu::location;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -189,7 +188,7 @@ pub(super) async fn take_blobs(
let projection = dataset.schema().project(&[column])?;
let blob_field = &projection.fields[0];
let blob_field_id = blob_field.id;
if blob_field.data_type() != DataType::LargeBinary || !projection.fields[0].is_blob() {
if !projection.fields[0].is_blob() {
return Err(Error::InvalidInput {
location: location!(),
source: format!("the column '{}' is not a blob column", column).into(),
Expand Down Expand Up @@ -246,32 +245,38 @@ fn collect_blob_files_v2(
let kinds = descriptions.column(0).as_primitive::<UInt8Type>();
let positions = descriptions.column(1).as_primitive::<UInt64Type>();
let sizes = descriptions.column(2).as_primitive::<UInt64Type>();
let _blob_ids = descriptions.column(3).as_primitive::<UInt32Type>();
let _uris = descriptions.column(4).as_string::<i32>();

let mut files = Vec::with_capacity(row_addrs.len());
for (idx, row_addr) in row_addrs.values().iter().enumerate() {
if positions.is_null(idx) || sizes.is_null(idx) {
if kinds.is_null(idx) {
// Null row
continue;
}

if !kinds.is_null(idx) {
let kind = kinds.value(idx);
if kind != INLINE_BLOB_KIND {
let kind = kinds.value(idx);
match kind {
INLINE_BLOB_KIND => {
if positions.is_null(idx) || sizes.is_null(idx) {
continue;
}
let position = positions.value(idx);
let size = sizes.value(idx);
files.push(BlobFile::new(
dataset.clone(),
blob_field_id,
*row_addr,
position,
size,
));
}
other => {
return Err(Error::NotSupported {
source: format!("Blob kind {} is not supported", kind).into(),
source: format!("Blob kind {} is not supported", other).into(),
location: location!(),
});
}
}

let position = positions.value(idx);
let size = sizes.value(idx);
files.push(BlobFile::new(
dataset.clone(),
blob_field_id,
*row_addr,
position,
size,
));
}

Ok(files)
Expand Down
Loading