diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index 0a681e01c36..69e8a0b75a5 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -302,7 +302,11 @@ def test_scan_blob(tmp_path, dataset_with_blobs): def test_blob_extension_write_inline(tmp_path): table = pa.table({"blob": lance.blob_array([b"foo", b"bar"])}) - ds = lance.write_dataset(table, tmp_path / "test_ds_v2", data_storage_version="2.2") + ds = lance.write_dataset( + table, + tmp_path / "test_ds_v2", + data_storage_version="2.2", + ) desc = ds.to_table(columns=["blob"]).column("blob").chunk(0) assert pa.types.is_struct(desc.type) @@ -319,7 +323,9 @@ def test_blob_extension_write_external(tmp_path): table = pa.table({"blob": lance.blob_array([uri])}) ds = lance.write_dataset( - table, tmp_path / "test_ds_v2_external", data_storage_version="2.2" + table, + tmp_path / "test_ds_v2_external", + data_storage_version="2.2", ) blob = ds.take_blobs("blob", indices=[0])[0] diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index a091632c524..a2f21585b34 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -99,25 +99,6 @@ pub enum BlobVersion { /// Blob v2 struct format. V2, } - -impl BlobVersion { - /// Convert a persisted string value (e.g. table config) into a blob version - pub fn from_config_value(value: &str) -> Option { - match value { - "1" => Some(Self::V1), - "2" => Some(Self::V2), - _ => None, - } - } - - /// Persistable string representation for table config. - pub fn config_value(self) -> &'static str { - match self { - Self::V1 => "1", - Self::V2 => "2", - } - } -} /// Encoding enum. #[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)] pub enum Encoding { @@ -302,11 +283,7 @@ impl Field { } else { let mut new_field = self.clone(); new_field.children = children; - Some( - projection - .blob_handling - .unload_if_needed(new_field, projection.blob_version), - ) + Some(projection.blob_handling.unload_if_needed(new_field)) } } @@ -562,28 +539,6 @@ impl Field { } } - /// If the field is a blob, return a new field with the same name and id - /// but with the data type set to a struct of the blob description fields. - /// - /// If the field is not a blob, return the field itself. - pub fn into_unloaded_with_version(mut self, version: BlobVersion) -> Self { - if self.is_blob() { - match version { - BlobVersion::V2 => { - self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone(); - self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone(); - self.metadata = BLOB_V2_DESC_LANCE_FIELD.metadata.clone(); - } - BlobVersion::V1 => { - self.logical_type = BLOB_DESC_LANCE_FIELD.logical_type.clone(); - self.children = BLOB_DESC_LANCE_FIELD.children.clone(); - self.metadata = BLOB_DESC_LANCE_FIELD.metadata.clone(); - } - } - } - self - } - pub fn project(&self, path_components: &[&str]) -> Result { let mut f = Self { name: self.name.clone(), @@ -1806,14 +1761,34 @@ mod tests { } #[test] - fn blob_into_unloaded_selects_v2_layout() { + fn blob_unloaded_mut_selects_layout_from_metadata() { let metadata = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]); - let field: Field = ArrowField::new("blob", DataType::LargeBinary, true) + let mut field: Field = ArrowField::new("blob", DataType::LargeBinary, true) .with_metadata(metadata) .try_into() .unwrap(); - let unloaded = field.into_unloaded_with_version(BlobVersion::V2); - assert_eq!(unloaded.children.len(), 5); - assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type); + field.unloaded_mut(); + assert_eq!(field.children.len(), 2); + assert_eq!(field.logical_type, BLOB_DESC_LANCE_FIELD.logical_type); + + let metadata = + HashMap::from([(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string())]); + let mut field: Field = ArrowField::new( + "blob", + DataType::Struct( + vec![ + ArrowField::new("data", DataType::LargeBinary, true), + ArrowField::new("uri", DataType::Utf8, true), + ] + .into(), + ), + true, + ) + .with_metadata(metadata) + .try_into() + .unwrap(); + field.unloaded_mut(); + assert_eq!(field.children.len(), 5); + assert_eq!(field.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type); } } diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index caa31fdf9da..528437341ca 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -15,7 +15,7 @@ use deepsize::DeepSizeOf; use lance_arrow::*; use snafu::location; -use super::field::{BlobVersion, Field, OnTypeMismatch, SchemaCompareOptions}; +use super::field::{Field, OnTypeMismatch, SchemaCompareOptions}; use crate::{ Error, Result, ROW_ADDR, ROW_ADDR_FIELD, ROW_CREATED_AT_VERSION, ROW_CREATED_AT_VERSION_FIELD, ROW_ID, ROW_ID_FIELD, ROW_LAST_UPDATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION_FIELD, @@ -1061,12 +1061,11 @@ impl BlobHandling { } } - pub fn unload_if_needed(&self, field: Field, version: BlobVersion) -> Field { + pub fn unload_if_needed(&self, mut field: Field) -> Field { if self.should_unload(&field) { - field.into_unloaded_with_version(version) - } else { - field + field.unloaded_mut(); } + field } } @@ -1083,7 +1082,6 @@ pub struct Projection { pub with_row_last_updated_at_version: bool, pub with_row_created_at_version: bool, pub blob_handling: BlobHandling, - pub blob_version: BlobVersion, } impl Debug for Projection { @@ -1101,7 +1099,6 @@ impl Debug for Projection { &self.with_row_created_at_version, ) .field("blob_handling", &self.blob_handling) - .field("blob_version", &self.blob_version) .finish() } } @@ -1117,7 +1114,6 @@ impl Projection { with_row_last_updated_at_version: false, with_row_created_at_version: false, blob_handling: BlobHandling::default(), - blob_version: BlobVersion::V1, } } @@ -1151,11 +1147,6 @@ impl Projection { self } - pub fn with_blob_version(mut self, blob_version: BlobVersion) -> Self { - self.blob_version = blob_version; - self - } - fn add_field_children(field_ids: &mut HashSet, field: &Field) { for child in &field.children { field_ids.insert(child.id); @@ -1620,19 +1611,6 @@ mod tests { use super::*; - #[test] - fn projection_from_schema_defaults_to_v1() { - let field = Field::try_from(&ArrowField::new("a", ArrowDataType::Int32, true)).unwrap(); - let schema = Schema { - fields: vec![field], - metadata: HashMap::new(), - }; - - let projection = Projection::empty(Arc::new(schema)); - - assert_eq!(projection.blob_version, BlobVersion::V1); - } - #[test] fn test_resolve_with_quoted_fields() { // Create a schema with fields containing dots diff --git a/rust/lance-datafusion/src/projection.rs b/rust/lance-datafusion/src/projection.rs index aacb63d4118..f2f74e0f61f 100644 --- a/rust/lance-datafusion/src/projection.rs +++ b/rust/lance-datafusion/src/projection.rs @@ -15,7 +15,7 @@ use std::{ use tracing::instrument; use lance_core::{ - datatypes::{BlobVersion, OnMissing, Projectable, Projection, Schema}, + datatypes::{OnMissing, Projectable, Projection, Schema}, Error, Result, ROW_ADDR, ROW_CREATED_AT_VERSION, ROW_ID, ROW_LAST_UPDATED_AT_VERSION, ROW_OFFSET, WILDCARD, }; @@ -38,16 +38,11 @@ struct ProjectionBuilder { needs_row_created_at: bool, must_add_row_offset: bool, has_wildcard: bool, - blob_version: BlobVersion, } impl ProjectionBuilder { - fn new(base: Arc, blob_version: BlobVersion) -> Self { - let full_schema = Arc::new( - Projection::full(base.clone()) - .with_blob_version(blob_version) - .to_arrow_schema(), - ); + fn new(base: Arc) -> Self { + let full_schema = Arc::new(Projection::full(base.clone()).to_arrow_schema()); let full_schema = Arc::new(ProjectionPlan::add_system_columns(&full_schema)); let planner = Planner::new(full_schema); @@ -64,7 +59,6 @@ impl ProjectionBuilder { needs_row_last_updated_at: false, must_add_row_offset: false, has_wildcard: false, - blob_version, } } @@ -153,8 +147,6 @@ impl ProjectionBuilder { .union_columns(&self.physical_cols, OnMissing::Ignore)? }; - physical_projection = physical_projection.with_blob_version(self.blob_version); - physical_projection.with_row_id = self.needs_row_id; physical_projection.with_row_addr = self.needs_row_addr || self.must_add_row_offset; physical_projection.with_row_last_updated_at_version = self.needs_row_last_updated_at; @@ -211,9 +203,8 @@ impl ProjectionPlan { pub fn from_expressions( base: Arc, columns: &[(impl AsRef, impl AsRef)], - blob_version: BlobVersion, ) -> Result { - let mut builder = ProjectionBuilder::new(base, blob_version); + let mut builder = ProjectionBuilder::new(base); builder.add_columns(columns)?; builder.build() } @@ -252,11 +243,7 @@ impl ProjectionPlan { /// ``` /// /// This is something that cannot be done easily using expressions. - pub fn from_schema( - base: Arc, - projection: &Schema, - blob_version: BlobVersion, - ) -> Result { + pub fn from_schema(base: Arc, projection: &Schema) -> Result { // Separate data columns from system columns // System columns (_rowid, _rowaddr, etc.) are handled via flags in Projection, // not as fields in the Schema @@ -302,9 +289,7 @@ impl ProjectionPlan { }; // Calculate the physical projection from data columns only - let mut physical_projection = Projection::empty(base) - .union_schema(&data_schema) - .with_blob_version(blob_version); + let mut physical_projection = Projection::empty(base).union_schema(&data_schema); physical_projection.with_row_id = with_row_id; physical_projection.with_row_addr = with_row_addr; physical_projection.with_row_last_updated_at_version = with_row_last_updated_at_version; @@ -327,7 +312,7 @@ impl ProjectionPlan { }) } - pub fn full(base: Arc, blob_version: BlobVersion) -> Result { + pub fn full(base: Arc) -> Result { let physical_cols: Vec<&str> = base .schema() .fields @@ -335,9 +320,8 @@ impl ProjectionPlan { .map(|f| f.name.as_ref()) .collect::>(); - let physical_projection = Projection::empty(base.clone()) - .union_columns(&physical_cols, OnMissing::Ignore)? - .with_blob_version(blob_version); + let physical_projection = + Projection::empty(base.clone()).union_columns(&physical_cols, OnMissing::Ignore)?; let requested_output_expr = physical_cols .into_iter() @@ -486,7 +470,7 @@ mod tests { let base_schema = Schema::try_from(&arrow_schema).unwrap(); let base = Arc::new(base_schema.clone()); - let plan = ProjectionPlan::from_schema(base, &base_schema, BlobVersion::default()).unwrap(); + let plan = ProjectionPlan::from_schema(base, &base_schema).unwrap(); let physical = plan.physical_projection.to_arrow_schema(); assert!(is_json_field(physical.field_with_name("meta").unwrap())); diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index 349d3d4b700..bc3bc80db27 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -281,27 +281,45 @@ impl FieldEncoder for BlobV2StructuralEncoder { let kind_col = struct_arr .column_by_name("kind") - .expect("kind column must exist") + .ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct missing `kind` field".into(), + location: location!(), + })? .as_primitive::(); let data_col = struct_arr .column_by_name("data") - .expect("data column must exist") + .ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct missing `data` field".into(), + location: location!(), + })? .as_binary::(); let uri_col = struct_arr .column_by_name("uri") - .expect("uri column must exist") + .ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct missing `uri` field".into(), + location: location!(), + })? .as_string::(); let blob_id_col = struct_arr .column_by_name("blob_id") - .expect("blob_id column must exist") + .ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct missing `blob_id` field".into(), + location: location!(), + })? .as_primitive::(); let blob_size_col = struct_arr .column_by_name("blob_size") - .expect("blob_size column must exist") + .ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct missing `blob_size` field".into(), + location: location!(), + })? .as_primitive::(); let packed_position_col = struct_arr .column_by_name("position") - .expect("position column must exist") + .ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct missing `position` field".into(), + location: location!(), + })? .as_primitive::(); let row_count = struct_arr.len(); @@ -498,13 +516,20 @@ mod tests { ])); // Use the standard test harness - check_round_trip_encoding_of_data(vec![array], &TestCases::default(), blob_metadata).await; + check_round_trip_encoding_of_data( + vec![array], + &TestCases::default().with_max_file_version(LanceFileVersion::V2_1), + blob_metadata, + ) + .await; } #[tokio::test] async fn test_blob_v2_external_round_trip() { - let blob_metadata = - HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]); + let blob_metadata = HashMap::from([( + lance_arrow::ARROW_EXT_NAME_KEY.to_string(), + lance_arrow::BLOB_V2_EXT_NAME.to_string(), + )]); let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true)); let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true)); @@ -579,8 +604,10 @@ mod tests { #[tokio::test] async fn test_blob_v2_dedicated_round_trip() { - let blob_metadata = - HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]); + let blob_metadata = HashMap::from([( + lance_arrow::ARROW_EXT_NAME_KEY.to_string(), + lance_arrow::BLOB_V2_EXT_NAME.to_string(), + )]); let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true)); let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true)); @@ -642,8 +669,10 @@ mod tests { #[tokio::test] async fn test_blob_v2_packed_round_trip() { - let blob_metadata = - HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]); + let blob_metadata = HashMap::from([( + lance_arrow::ARROW_EXT_NAME_KEY.to_string(), + lance_arrow::BLOB_V2_EXT_NAME.to_string(), + )]); let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true)); let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true)); diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs index 793e8347360..3eb9a6bd250 100644 --- a/rust/lance-encoding/src/encodings/logical/struct.rs +++ b/rust/lance-encoding/src/encodings/logical/struct.rs @@ -389,7 +389,12 @@ impl StructuralDecodeArrayTask for RepDefStructDecodeTask { repdef.unravel_validity(length) }; - let array = StructArray::new(self.child_fields, children, validity); + let array = StructArray::try_new(self.child_fields, children, validity).map_err(|e| { + Error::InvalidInput { + source: e.to_string().into(), + location: location!(), + } + })?; Ok(DecodedArray { array: Arc::new(array), repdef, diff --git a/rust/lance-encoding/src/previous/encodings/logical/blob.rs b/rust/lance-encoding/src/previous/encodings/logical/blob.rs index e9719553124..3d79df2b03d 100644 --- a/rust/lance-encoding/src/previous/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/previous/encodings/logical/blob.rs @@ -400,7 +400,7 @@ pub mod tests { use crate::{ format::pb::column_encoding, - testing::{check_basic_random, check_round_trip_encoding_of_data, TestCases}, + testing::{check_round_trip_encoding_of_data, check_specific_random, TestCases}, version::LanceFileVersion, }; @@ -414,7 +414,11 @@ pub mod tests { #[test_log::test(tokio::test)] async fn test_basic_blob() { let field = Field::new("", DataType::LargeBinary, false).with_metadata(BLOB_META.clone()); - check_basic_random(field).await; + check_specific_random( + field, + TestCases::basic().with_max_file_version(LanceFileVersion::V2_1), + ) + .await; } #[test_log::test(tokio::test)] @@ -423,6 +427,7 @@ pub mod tests { let val2: &[u8] = &[7, 8, 9]; let array = Arc::new(LargeBinaryArray::from(vec![Some(val1), None, Some(val2)])); let test_cases = TestCases::default() + .with_max_file_version(LanceFileVersion::V2_1) .with_expected_encoding("packed_struct") .with_verify_encoding(Arc::new(|cols, version| { if version < &LanceFileVersion::V2_1 { diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index e0a3c04a626..37df889035f 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -979,8 +979,7 @@ async fn check_round_trip_encoding_inner( let decode_field = if is_structural_encoding { let mut lance_field = lance_core::datatypes::Field::try_from(field).unwrap(); if lance_field.is_blob() && matches!(lance_field.data_type(), DataType::Struct(_)) { - lance_field = - lance_field.into_unloaded_with_version(lance_core::datatypes::BlobVersion::V2); + lance_field.unloaded_mut(); let mut arrow_field = ArrowField::from(&lance_field); let mut metadata = arrow_field.metadata().clone(); metadata.insert("lance-encoding:packed".to_string(), "true".to_string()); diff --git a/rust/lance-file/benches/reader.rs b/rust/lance-file/benches/reader.rs index 1c6f0fdb425..a00af5015fa 100644 --- a/rust/lance-file/benches/reader.rs +++ b/rust/lance-file/benches/reader.rs @@ -120,8 +120,7 @@ fn bench_reader(c: &mut Criterion) { } #[cfg(not(target_os = "linux"))] -pub fn drop_file_from_cache(path: impl AsRef) -> std::io::Result<()> { - println!("drop_file_from_cache: not implemented on this platform"); +pub fn drop_file_from_cache(_path: impl AsRef) -> std::io::Result<()> { Ok(()) } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 3076aec8f9d..20c5138af7e 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -13,13 +13,12 @@ use futures::future::BoxFuture; use futures::stream::{self, BoxStream, StreamExt, TryStreamExt}; use futures::{FutureExt, Stream}; -use crate::dataset::blob::blob_version_from_config; use crate::dataset::metadata::UpdateFieldMetadataBuilder; use crate::dataset::transaction::translate_schema_metadata_updates; use crate::session::caches::{DSMetadataCache, ManifestKey, TransactionKey}; use crate::session::index_caches::DSIndexCache; use itertools::Itertools; -use lance_core::datatypes::{BlobVersion, OnMissing, OnTypeMismatch, Projectable, Projection}; +use lance_core::datatypes::{OnMissing, OnTypeMismatch, Projectable, Projection}; use lance_core::traits::DatasetTakeRows; use lance_core::utils::address::RowAddress; use lance_core::utils::tracing::{ @@ -363,7 +362,6 @@ impl ProjectionRequest { } pub fn into_projection_plan(self, dataset: Arc) -> Result { - let blob_version = dataset.blob_version(); match self { Self::Schema(schema) => { // The schema might contain system columns (_rowid, _rowaddr) which are not @@ -376,7 +374,7 @@ impl ProjectionRequest { if system_columns_present { // If system columns are present, we can't use project_by_schema directly // Just pass the schema to ProjectionPlan::from_schema which handles it - ProjectionPlan::from_schema(dataset, schema.as_ref(), blob_version) + ProjectionPlan::from_schema(dataset, schema.as_ref()) } else { // No system columns, use normal path with validation let projection = dataset.schema().project_by_schema( @@ -384,10 +382,10 @@ impl ProjectionRequest { OnMissing::Error, OnTypeMismatch::Error, )?; - ProjectionPlan::from_schema(dataset, &projection, blob_version) + ProjectionPlan::from_schema(dataset, &projection) } } - Self::Sql(columns) => ProjectionPlan::from_expressions(dataset, &columns, blob_version), + Self::Sql(columns) => ProjectionPlan::from_expressions(dataset, &columns), } } } @@ -1820,12 +1818,12 @@ impl Dataset { /// Similar to [Self::schema], but only returns fields that are not marked as blob columns /// Creates a new empty projection into the dataset schema pub fn empty_projection(self: &Arc) -> Projection { - Projection::empty(self.clone()).with_blob_version(self.blob_version()) + Projection::empty(self.clone()) } /// Creates a projection that includes all columns in the dataset pub fn full_projection(self: &Arc) -> Projection { - Projection::full(self.clone()).with_blob_version(self.blob_version()) + Projection::full(self.clone()) } /// Get fragments. @@ -2785,10 +2783,6 @@ impl Dataset { &self.manifest.config } - pub(crate) fn blob_version(&self) -> BlobVersion { - blob_version_from_config(&self.manifest.config) - } - /// Delete keys from the config. #[deprecated( note = "Use the new update_config(values, replace) method - pass None values to delete keys" diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 4dbf1bb1480..720edd33534 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -24,15 +24,6 @@ use lance_core::utils::blob::blob_path; use lance_core::{utils::address::RowAddress, Error, Result}; use lance_io::traits::Reader; -pub const BLOB_VERSION_CONFIG_KEY: &str = "lance.blob.version"; - -pub fn blob_version_from_config(config: &HashMap) -> BlobVersion { - config - .get(BLOB_VERSION_CONFIG_KEY) - .and_then(|value| BlobVersion::from_config_value(value)) - .unwrap_or(BlobVersion::V1) -} - const INLINE_MAX: usize = 64 * 1024; // 64KB inline cutoff const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; // 4MB dedicated cutoff const PACK_FILE_MAX_SIZE: usize = 1024 * 1024 * 1024; // 1GiB per .pack sidecar @@ -130,15 +121,34 @@ pub struct BlobPreprocessor { data_file_key: String, local_counter: u32, pack_writer: PackWriter, + blob_v2_cols: Vec, + dedicated_thresholds: Vec, + writer_metadata: Vec>, } impl BlobPreprocessor { - pub(crate) fn new(object_store: ObjectStore, data_dir: Path, data_file_key: String) -> Self { + pub(crate) fn new( + object_store: ObjectStore, + data_dir: Path, + data_file_key: String, + schema: &lance_core::datatypes::Schema, + ) -> Self { let pack_writer = PackWriter::new( object_store.clone(), data_dir.clone(), data_file_key.clone(), ); + let arrow_schema = arrow_schema::Schema::from(schema); + let fields = arrow_schema.fields(); + let blob_v2_cols = fields.iter().map(|field| field.is_blob_v2()).collect(); + let dedicated_thresholds = fields + .iter() + .map(|field| dedicated_threshold_from_metadata(field.as_ref())) + .collect(); + let writer_metadata = fields + .iter() + .map(|field| field.metadata().clone()) + .collect(); Self { object_store, data_dir, @@ -146,6 +156,9 @@ impl BlobPreprocessor { // Start at 1 to avoid a potential all-zero blob_id value. local_counter: 1, pack_writer, + blob_v2_cols, + dedicated_thresholds, + writer_metadata, } } @@ -177,23 +190,33 @@ impl BlobPreprocessor { .await } pub(crate) async fn preprocess_batch(&mut self, batch: &RecordBatch) -> Result { + let expected_columns = self.blob_v2_cols.len(); + if batch.num_columns() != expected_columns { + return Err(Error::invalid_input( + format!( + "Unexpected number of columns: expected {}, got {}", + expected_columns, + batch.num_columns() + ), + location!(), + )); + } + + let batch_schema = batch.schema(); + let batch_fields = batch_schema.fields(); + let mut new_columns = Vec::with_capacity(batch.num_columns()); let mut new_fields = Vec::with_capacity(batch.num_columns()); - for (array, field) in batch.columns().iter().zip(batch.schema().fields()) { - if !field.is_blob_v2() { + for idx in 0..batch.num_columns() { + let array = batch.column(idx); + let field = &batch_fields[idx]; + if !self.blob_v2_cols[idx] { new_columns.push(array.clone()); new_fields.push(field.clone()); continue; } - let dedicated_threshold = field - .metadata() - .get(BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY) - .and_then(|value| value.parse::().ok()) - .filter(|&value| value > DEDICATED_THRESHOLD) - .unwrap_or(DEDICATED_THRESHOLD); - let struct_arr = array .as_any() .downcast_ref::() @@ -241,6 +264,7 @@ impl BlobPreprocessor { let has_uri = !uri_col.is_null(i); let data_len = if has_data { data_col.value(i).len() } else { 0 }; + let dedicated_threshold = self.dedicated_thresholds[idx]; if has_data && data_len > dedicated_threshold { let blob_id = self.next_blob_id(); self.write_dedicated(blob_id, data_col.value(i)).await?; @@ -324,7 +348,7 @@ impl BlobPreprocessor { ArrowDataType::Struct(child_fields.into()), field.is_nullable(), ) - .with_metadata(field.metadata().clone()), + .with_metadata(self.writer_metadata[idx].clone()), )); } @@ -333,7 +357,7 @@ impl BlobPreprocessor { .iter() .map(|f| f.as_ref().clone()) .collect::>(), - batch.schema().metadata().clone(), + batch_schema.metadata().clone(), )); RecordBatch::try_new(new_schema, new_columns) @@ -345,8 +369,14 @@ impl BlobPreprocessor { } } -pub fn schema_has_blob_v2(schema: &lance_core::datatypes::Schema) -> bool { - schema.fields.iter().any(|f| f.is_blob_v2()) +fn dedicated_threshold_from_metadata(field: &arrow_schema::Field) -> usize { + field + .metadata() + .get(BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY) + .and_then(|value| value.parse::().ok()) + .filter(|value| *value > 0) + .and_then(|value| usize::try_from(value).ok()) + .unwrap_or(DEDICATED_THRESHOLD) } pub async fn preprocess_blob_batches( @@ -605,7 +635,7 @@ pub(super) async fn take_blobs( let row_addrs = description_and_addr.column(1).as_primitive::(); let blob_field_id = blob_field_id as u32; - match dataset.blob_version() { + match blob_version_from_descriptions(descriptions)? { BlobVersion::V1 => collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs), BlobVersion::V2 => { collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs).await @@ -652,7 +682,7 @@ pub async fn take_blobs_by_addresses( let row_addrs_result = description_and_addr.column(1).as_primitive::(); let blob_field_id = blob_field_id as u32; - match dataset.blob_version() { + match blob_version_from_descriptions(descriptions)? { BlobVersion::V1 => { collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs_result) } @@ -662,6 +692,30 @@ pub async fn take_blobs_by_addresses( } } +fn blob_version_from_descriptions(descriptions: &StructArray) -> Result { + let fields = descriptions.fields(); + if fields.len() == 2 && fields[0].name() == "position" && fields[1].name() == "size" { + return Ok(BlobVersion::V1); + } + if fields.len() == 5 + && fields[0].name() == "kind" + && fields[1].name() == "position" + && fields[2].name() == "size" + && fields[3].name() == "blob_id" + && fields[4].name() == "blob_uri" + { + return Ok(BlobVersion::V2); + } + Err(Error::InvalidInput { + source: format!( + "Unrecognized blob descriptions schema: expected v1 (position,size) or v2 (kind,position,size,blob_id,blob_uri) but got {:?}", + fields.iter().map(|f| f.name().as_str()).collect::>(), + ) + .into(), + location: location!(), + }) +} + fn collect_blob_files_v1( dataset: &Arc, blob_field_id: u32, @@ -796,9 +850,9 @@ mod tests { use arrow_schema::{DataType, Field, Schema}; use futures::TryStreamExt; use lance_arrow::{DataTypeExt, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY}; + use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; use lance_io::stream::RecordBatchStream; - use lance_core::datatypes::BlobKind; use lance_core::{utils::tempfile::TempStrDir, Error, Result}; use lance_datagen::{array, BatchCount, RowCount}; use lance_file::version::LanceFileVersion; @@ -1089,7 +1143,10 @@ mod tests { let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); - let params = WriteParams::with_storage_version(LanceFileVersion::V2_2); + let params = WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }; let dataset = Arc::new( Dataset::write(reader, &test_dir, Some(params)) .await @@ -1108,108 +1165,74 @@ mod tests { assert_eq!(second.as_ref(), b"world"); } - fn build_schema_with_meta(threshold_opt: Option) -> Arc { - let mut blob_field_with_meta = blob_field("blob", true); - if let Some(threshold) = threshold_opt { - let mut metadata = blob_field_with_meta.metadata().clone(); - metadata.insert( - BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY.to_string(), - threshold.to_string(), - ); - blob_field_with_meta = blob_field_with_meta.with_metadata(metadata); - } + async fn preprocess_kind_with_schema_metadata(metadata_value: &str, data_len: usize) -> u8 { + let (object_store, base_path) = ObjectStore::from_uri_and_params( + Arc::new(ObjectStoreRegistry::default()), + "memory://blob_preprocessor", + &ObjectStoreParams::default(), + ) + .await + .unwrap(); + let object_store = object_store.as_ref().clone(); + let data_dir = base_path.child("data"); + + let mut field = blob_field("blob", true); + let mut metadata = field.metadata().clone(); + metadata.insert( + BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY.to_string(), + metadata_value.to_string(), + ); + field = field.with_metadata(metadata); - Arc::new(Schema::new(vec![ - Field::new("id", DataType::UInt32, false), - blob_field_with_meta, - ])) - } + let writer_arrow_schema = Schema::new(vec![field.clone()]); + let writer_schema = lance_core::datatypes::Schema::try_from(&writer_arrow_schema).unwrap(); - async fn write_then_get_blob_kinds( - blob_sizes: Vec, - threshold_opt: Option, - ) -> Vec { - let test_dir = TempStrDir::default(); + let mut preprocessor = super::BlobPreprocessor::new( + object_store.clone(), + data_dir, + "data_file_key".to_string(), + &writer_schema, + ); - let mut blob_builder = BlobArrayBuilder::new(blob_sizes.len()); - for size in &blob_sizes { - blob_builder.push_bytes(vec![0u8; *size]).unwrap(); - } + let mut blob_builder = BlobArrayBuilder::new(1); + blob_builder.push_bytes(vec![0u8; data_len]).unwrap(); let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap(); - let id_values: Vec = (0..blob_sizes.len() as u32).collect(); - let id_array: arrow_array::ArrayRef = Arc::new(UInt32Array::from(id_values)); - - let schema = build_schema_with_meta(threshold_opt); - - let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); - let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); - - let params = WriteParams::with_storage_version(LanceFileVersion::V2_2); - let dataset = Arc::new( - Dataset::write(reader, &test_dir, Some(params)) - .await - .unwrap(), - ); + let field_without_metadata = + Field::new("blob", field.data_type().clone(), field.is_nullable()); + let batch_schema = Arc::new(Schema::new(vec![field_without_metadata])); + let batch = RecordBatch::try_new(batch_schema, vec![blob_array]).unwrap(); - let indices: Vec = (0..blob_sizes.len() as u64).collect(); - let blobs = dataset - .take_blobs_by_indices(&indices, "blob") - .await + let out = preprocessor.preprocess_batch(&batch).await.unwrap(); + let struct_arr = out + .column(0) + .as_any() + .downcast_ref::() .unwrap(); - - assert_eq!(blobs.len(), blob_sizes.len()); - - blobs.into_iter().map(|b| b.kind()).collect() + struct_arr + .column_by_name("kind") + .unwrap() + .as_primitive::() + .value(0) } #[tokio::test] async fn test_blob_v2_dedicated_threshold_ignores_non_positive_metadata() { - let small_blob_len = super::DEDICATED_THRESHOLD / 2; - let large_blob_len = super::DEDICATED_THRESHOLD + 1; - - // Sanity check assumptions for this test - assert!(small_blob_len > super::INLINE_MAX); - - let cases = vec![(None, "no_metadata"), (Some(0), "zero_threshold")]; - - for (threshold_opt, label) in cases { - let kinds = - write_then_get_blob_kinds(vec![small_blob_len, large_blob_len], threshold_opt) - .await; - - assert_eq!(kinds.len(), 2, "case: {label}"); - assert_eq!(kinds[0], BlobKind::Packed, "case: {label}"); - assert_eq!(kinds[1], BlobKind::Dedicated, "case: {label}"); - } + let kind = preprocess_kind_with_schema_metadata("0", 256 * 1024).await; + assert_eq!(kind, lance_core::datatypes::BlobKind::Packed as u8); } #[tokio::test] async fn test_blob_v2_dedicated_threshold_respects_smaller_metadata() { - let blob_len = super::DEDICATED_THRESHOLD / 2; - let overridden_threshold = super::DEDICATED_THRESHOLD / 4; - - assert!(blob_len > super::INLINE_MAX); - assert!(overridden_threshold > 0); - assert!(blob_len > overridden_threshold); - - let kinds = write_then_get_blob_kinds(vec![blob_len], Some(overridden_threshold)).await; - - assert_eq!(kinds.len(), 1); - assert_eq!(kinds[0], BlobKind::Packed); + let kind = preprocess_kind_with_schema_metadata("131072", 256 * 1024).await; + assert_eq!(kind, lance_core::datatypes::BlobKind::Dedicated as u8); } #[tokio::test] async fn test_blob_v2_dedicated_threshold_respects_larger_metadata() { - let blob_len = super::DEDICATED_THRESHOLD + 1; - let overridden_threshold = super::DEDICATED_THRESHOLD * 2; - - assert!(blob_len > super::INLINE_MAX); - assert!(blob_len < overridden_threshold); - - let kinds = write_then_get_blob_kinds(vec![blob_len], Some(overridden_threshold)).await; - - assert_eq!(kinds.len(), 1); - assert_eq!(kinds[0], BlobKind::Packed); + let kind = + preprocess_kind_with_schema_metadata("8388608", super::DEDICATED_THRESHOLD + 1024) + .await; + assert_eq!(kind, lance_core::datatypes::BlobKind::Packed as u8); } } diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index fc1f54cc1b3..68e760cc3e3 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -738,9 +738,17 @@ fn tagged_old_versions_cleanup_error( mod tests { use std::{collections::HashMap, sync::Arc}; + use super::*; + use crate::blob::{blob_field, BlobArrayBuilder}; + use crate::{ + dataset::{builder::DatasetBuilder, ReadParams, WriteMode, WriteParams}, + index::vector::VectorIndexParams, + }; + use all_asserts::{assert_gt, assert_lt}; use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader}; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use datafusion::common::assert_contains; + use lance_core::utils::tempfile::TempStrDir; use lance_core::utils::testing::{ProxyObjectStore, ProxyObjectStorePolicy}; use lance_index::{DatasetIndexExt, IndexType}; use lance_io::object_store::{ @@ -752,15 +760,6 @@ mod tests { use mock_instant::thread_local::MockClock; use snafu::location; - use super::*; - use crate::blob::{blob_field, BlobArrayBuilder}; - use crate::{ - dataset::{builder::DatasetBuilder, ReadParams, WriteMode, WriteParams}, - index::vector::VectorIndexParams, - }; - use all_asserts::{assert_gt, assert_lt}; - use lance_core::utils::tempfile::TempStrDir; - #[derive(Debug)] struct MockObjectStore { policy: Arc>, diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index a9c87e44a05..bc8f78871b4 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -18,7 +18,6 @@ use snafu::location; use std::borrow::Cow; use uuid::Uuid; -use crate::dataset::blob::{preprocess_blob_batches, schema_has_blob_v2, BlobPreprocessor}; use crate::dataset::builder::DatasetBuilder; use crate::dataset::write::do_write_fragments; use crate::dataset::{WriteMode, WriteParams, DATA_DIR}; @@ -139,7 +138,6 @@ impl<'a> FragmentCreateBuilder<'a> { let filename = format!("{}.lance", data_file_key); let mut fragment = Fragment::new(id); let full_path = base_path.child(DATA_DIR).child(filename.clone()); - let has_blob_v2 = schema_has_blob_v2(&schema); let obj_writer = object_store.create(&full_path).await?; let mut writer = lance_file::writer::FileWriter::try_new( obj_writer, @@ -150,16 +148,6 @@ impl<'a> FragmentCreateBuilder<'a> { }, )?; - let mut preprocessor = if has_blob_v2 { - Some(BlobPreprocessor::new( - object_store.as_ref().clone(), - base_path.child(DATA_DIR), - data_file_key.clone(), - )) - } else { - None - }; - let (major, minor) = writer.version().to_numbers(); let data_file = DataFile::new_unstarted(filename, major, minor); @@ -173,10 +161,7 @@ impl<'a> FragmentCreateBuilder<'a> { .map_ok(|batch| vec![batch]) .boxed(); while let Some(batched_chunk) = broken_stream.next().await { - let mut batch_chunk = batched_chunk?; - if let Some(pre) = preprocessor.as_mut() { - batch_chunk = preprocess_blob_batches(&batch_chunk, pre).await?; - } + let batch_chunk = batched_chunk?; writer.write_batches(batch_chunk.iter()).await?; } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index a0812d6caf4..e4a456d11d0 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -760,8 +760,7 @@ impl TakeOperation { impl Scanner { pub fn new(dataset: Arc) -> Self { - let projection_plan = - ProjectionPlan::full(dataset.clone(), dataset.blob_version()).unwrap(); + let projection_plan = ProjectionPlan::full(dataset.clone()).unwrap(); let file_reader_options = dataset.file_reader_options.clone(); let mut scanner = Self { dataset, @@ -885,11 +884,7 @@ impl Scanner { columns: &[(impl AsRef, impl AsRef)], ) -> Result<&mut Self> { self.explicit_projection = true; - self.projection_plan = ProjectionPlan::from_expressions( - self.dataset.clone(), - columns, - self.dataset.blob_version(), - )?; + self.projection_plan = ProjectionPlan::from_expressions(self.dataset.clone(), columns)?; if self.legacy_with_row_id { self.projection_plan.include_row_id(); } diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index 00a59f15f5a..9d182acc207 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -592,12 +592,13 @@ fn take_struct_array(array: &StructArray, indices: &UInt64Array) -> Result= LanceFileVersion::V2_2 { - manifest.config_mut().insert( - BLOB_VERSION_CONFIG_KEY.to_string(), - BlobVersion::V2.config_value().to_string(), - ); - } - manifest + ) }; manifest.tag.clone_from(&self.tag); diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index d953ea4dd44..c1b36702408 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -6,8 +6,9 @@ use chrono::TimeDelta; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream; use futures::{Stream, StreamExt, TryStreamExt}; +use lance_arrow::BLOB_META_KEY; use lance_core::datatypes::{ - BlobVersion, NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions, + NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions, }; use lance_core::error::LanceOptionExt; use lance_core::utils::tempfile::TempDir; @@ -33,7 +34,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use tracing::{info, instrument}; -use crate::dataset::blob::{preprocess_blob_batches, schema_has_blob_v2, BlobPreprocessor}; +use crate::dataset::blob::{preprocess_blob_batches, BlobPreprocessor}; use crate::session::Session; use crate::Dataset; @@ -43,14 +44,6 @@ use super::transaction::Transaction; use super::utils::SchemaAdapter; use super::DATA_DIR; -pub(super) fn blob_version_for(storage_version: LanceFileVersion) -> BlobVersion { - if storage_version >= LanceFileVersion::V2_2 { - BlobVersion::V2 - } else { - BlobVersion::V1 - } -} - mod commit; pub mod delete; mod insert; @@ -376,6 +369,7 @@ pub async fn write_fragments( .await } +#[allow(clippy::too_many_arguments)] pub async fn do_write_fragments( object_store: Arc, base_dir: &Path, @@ -571,9 +565,10 @@ pub async fn write_fragments_internal( base_dir: &Path, schema: Schema, data: SendableRecordBatchStream, - mut params: WriteParams, + params: WriteParams, target_bases_info: Option>, ) -> Result<(Vec, Schema)> { + let mut params = params; let adapter = SchemaAdapter::new(data.schema()); let (data, converted_schema) = if adapter.requires_physical_conversion() { @@ -637,19 +632,30 @@ pub async fn write_fragments_internal( (converted_schema, params.storage_version_or_default()) }; - let target_blob_version = blob_version_for(storage_version); - if let Some(dataset) = dataset { - let existing_version = dataset.blob_version(); - if existing_version != target_blob_version { - return Err(Error::InvalidInput { - source: format!( - "Blob column version mismatch. Existing dataset uses {:?} but requested write requires {:?}. Changing blob version is not allowed", - existing_version, target_blob_version - ) - .into(), - location: location!(), - }); - } + if storage_version < LanceFileVersion::V2_2 && schema.fields.iter().any(|f| f.is_blob_v2()) { + return Err(Error::InvalidInput { + source: format!( + "Blob v2 requires file version >= 2.2 (got {:?})", + storage_version + ) + .into(), + location: location!(), + }); + } + + if storage_version >= LanceFileVersion::V2_2 + && schema + .fields + .iter() + .any(|f| f.metadata.contains_key(BLOB_META_KEY)) + { + return Err(Error::InvalidInput { + source: format!( + "Legacy blob columns (field metadata key {BLOB_META_KEY:?}) are not supported for file version >= 2.2. Use the blob v2 extension type (ARROW:extension:name = \"lance.blob.v2\") and the new blob APIs (e.g. lance::blob::blob_field / lance::blob::BlobArrayBuilder)." + ) + .into(), + location: location!(), + }); } let fragments = do_write_fragments( @@ -811,6 +817,7 @@ pub async fn open_writer_with_options( }) } else { let writer = object_store.create(&full_path).await?; + let enable_blob_v2 = storage_version >= LanceFileVersion::V2_2; let file_writer = current_writer::FileWriter::try_new( writer, schema.clone(), @@ -819,11 +826,12 @@ pub async fn open_writer_with_options( ..Default::default() }, )?; - let preprocessor = if schema_has_blob_v2(schema) { + let preprocessor = if enable_blob_v2 { Some(BlobPreprocessor::new( object_store.clone(), data_dir.clone(), data_file_key.clone(), + schema, )) } else { None diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index 7763c5e8f7c..f2fb5aa0dbc 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -193,7 +193,7 @@ impl<'a> InsertBuilder<'a> { let target_base_info = validate_and_resolve_target_bases(&mut context.params, existing_base_paths).await?; - let (written_fragments, _) = write_fragments_internal( + let (written_fragments, written_schema) = write_fragments_internal( context.dest.dataset(), context.object_store.clone(), &context.base_path, @@ -204,7 +204,7 @@ impl<'a> InsertBuilder<'a> { ) .await?; - let transaction = Self::build_transaction(schema, written_fragments, &context)?; + let transaction = Self::build_transaction(written_schema, written_fragments, &context)?; Ok((transaction, context)) } @@ -216,28 +216,29 @@ impl<'a> InsertBuilder<'a> { ) -> Result { let operation = match context.params.mode { WriteMode::Create => { - let config_upsert_values = - if let Some(auto_cleanup_params) = context.params.auto_cleanup.as_ref() { - let mut upsert_values = HashMap::new(); - upsert_values.insert( - String::from("lance.auto_cleanup.interval"), - auto_cleanup_params.interval.to_string(), - ); - - let duration = auto_cleanup_params.older_than.to_std().map_err(|e| { - Error::InvalidInput { - source: e.into(), - location: location!(), - } - })?; - upsert_values.insert( - String::from("lance.auto_cleanup.older_than"), - format_duration(duration).to_string(), - ); - Some(upsert_values) - } else { - None - }; + let mut upsert_values = HashMap::new(); + if let Some(auto_cleanup_params) = context.params.auto_cleanup.as_ref() { + upsert_values.insert( + String::from("lance.auto_cleanup.interval"), + auto_cleanup_params.interval.to_string(), + ); + + let duration = auto_cleanup_params.older_than.to_std().map_err(|e| { + Error::InvalidInput { + source: e.into(), + location: location!(), + } + })?; + upsert_values.insert( + String::from("lance.auto_cleanup.older_than"), + format_duration(duration).to_string(), + ); + } + let config_upsert_values = if upsert_values.is_empty() { + None + } else { + Some(upsert_values) + }; Operation::Overwrite { // Use the full schema, not the written schema schema, @@ -434,8 +435,11 @@ struct WriteContext<'a> { #[cfg(test)] mod test { - use arrow_array::{Int32Array, RecordBatchReader, StructArray}; + use std::collections::HashMap; + + use arrow_array::{BinaryArray, Int32Array, RecordBatchReader, StructArray}; use arrow_schema::{ArrowError, DataType, Field, Schema}; + use lance_arrow::BLOB_META_KEY; use crate::session::Session; @@ -488,7 +492,7 @@ mod test { } #[tokio::test] - async fn prevent_blob_version_upgrade_on_overwrite() { + async fn allow_overwrite_to_v2_2_without_blob_upgrade() { let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1]))]) .unwrap(); @@ -513,7 +517,44 @@ mod test { .execute_stream(RecordBatchIterator::new(vec![Ok(batch)], schema.clone())) .await; - assert!(matches!(result, Err(Error::InvalidInput { .. }))); + assert!(result.is_ok()); + } + + #[tokio::test] + async fn create_v2_2_dataset_rejects_legacy_blob_schema() { + let schema = Arc::new(Schema::new(vec![Field::new( + "blob", + DataType::Binary, + false, + ) + .with_metadata(HashMap::from([( + BLOB_META_KEY.to_string(), + "true".to_string(), + )]))])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(BinaryArray::from(vec![Some(b"abc".as_slice())]))], + ) + .unwrap(); + + let dataset = InsertBuilder::new("memory://forced-blob-v2") + .with_params(&WriteParams { + mode: WriteMode::Create, + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }) + .execute_stream(RecordBatchIterator::new(vec![Ok(batch)], schema.clone())) + .await; + + let err = dataset.unwrap_err(); + match err { + Error::InvalidInput { source, .. } => { + let message = source.to_string(); + assert!(message.contains("Legacy blob columns")); + assert!(message.contains("lance.blob.v2")); + } + other => panic!("unexpected error: {other:?}"), + } } mod external_error {