From dc08b8eed4d856f5344e0cf06651645db35ed962 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 5 Nov 2025 00:18:10 +0800 Subject: [PATCH 1/5] Remove not used storage class Signed-off-by: Xuanwo --- java/lance-jni/src/schema.rs | 25 +--- java/lance-jni/src/transaction.rs | 12 +- .../java/com/lancedb/lance/Transaction.java | 18 +-- .../com/lancedb/lance/schema/LanceField.java | 8 -- .../com/lancedb/lance/schema/StorageType.java | 19 --- protos/file.proto | 49 ++++--- protos/table.proto | 9 +- protos/transaction.proto | 8 +- python/python/lance/dataset.py | 3 - python/python/lance/lance/__init__.pyi | 1 - python/src/dataset.rs | 10 +- python/src/fragment.rs | 5 - python/src/transaction.rs | 13 +- rust/lance-core/src/datatypes.rs | 5 +- rust/lance-core/src/datatypes/field.rs | 103 +++++---------- rust/lance-core/src/datatypes/schema.rs | 37 ++++-- rust/lance-file/src/datatypes.rs | 50 ++++++- rust/lance-table/src/feature_flags.rs | 2 - rust/lance-table/src/format/manifest.rs | 33 +---- rust/lance-table/src/io/manifest.rs | 2 - rust/lance/src/dataset.rs | 122 ++++++++---------- rust/lance/src/dataset/blob.rs | 69 +--------- rust/lance/src/dataset/fragment.rs | 7 - rust/lance/src/dataset/index/frag_reuse.rs | 1 - rust/lance/src/dataset/metadata.rs | 7 +- rust/lance/src/dataset/optimize.rs | 9 +- rust/lance/src/dataset/optimize/remapping.rs | 1 - rust/lance/src/dataset/scanner.rs | 6 - rust/lance/src/dataset/schema_evolution.rs | 35 +---- rust/lance/src/dataset/transaction.rs | 120 +---------------- rust/lance/src/dataset/write.rs | 116 +++++------------ rust/lance/src/dataset/write/commit.rs | 19 --- rust/lance/src/dataset/write/delete.rs | 7 +- rust/lance/src/dataset/write/insert.rs | 70 +++------- rust/lance/src/dataset/write/merge_insert.rs | 16 +-- .../dataset/write/merge_insert/exec/write.rs | 11 +- rust/lance/src/dataset/write/update.rs | 30 +---- rust/lance/src/index.rs | 3 - rust/lance/src/index/create.rs | 1 - rust/lance/src/index/mem_wal.rs | 5 - rust/lance/src/index/vector.rs | 1 - rust/lance/src/index/vector/ivf.rs | 2 - rust/lance/src/io/commit.rs | 79 +----------- rust/lance/src/io/commit/conflict_resolver.rs | 4 +- rust/lance/src/io/exec.rs | 1 - 45 files changed, 266 insertions(+), 888 deletions(-) delete mode 100644 java/src/main/java/com/lancedb/lance/schema/StorageType.java diff --git a/java/lance-jni/src/schema.rs b/java/lance-jni/src/schema.rs index 4a7c679d40d..41d6f78f043 100644 --- a/java/lance-jni/src/schema.rs +++ b/java/lance-jni/src/schema.rs @@ -9,7 +9,7 @@ use arrow_schema::{TimeUnit, UnionFields}; use jni::objects::{JObject, JValue}; use jni::sys::{jboolean, jint}; use jni::JNIEnv; -use lance_core::datatypes::{Field, Schema, StorageClass}; +use lance_core::datatypes::{Field, Schema}; impl IntoJava for Schema { fn into_java<'local>(self, env: &mut JNIEnv<'local>) -> Result> { @@ -40,11 +40,8 @@ pub fn convert_to_java_field<'local>( let children = convert_children_fields(env, lance_field)?; let metadata = to_java_map(env, &lance_field.metadata)?; let arrow_type = convert_arrow_type(env, &lance_field.data_type())?; - let storage_type = convert_storage_type(env, &lance_field.storage_class)?; - let ctor_sig = "(IILjava/lang/String;".to_owned() + "ZLorg/apache/arrow/vector/types/pojo/ArrowType;" - + "Lcom/lancedb/lance/schema/StorageType;" + "Lorg/apache/arrow/vector/types/pojo/DictionaryEncoding;" + "Ljava/util/Map;" + "Ljava/util/List;Z)V"; @@ -57,7 +54,6 @@ pub fn convert_to_java_field<'local>( JValue::Object(&JObject::from(name)), JValue::Bool(lance_field.nullable as jboolean), JValue::Object(&arrow_type), - JValue::Object(&storage_type), JValue::Object(&JObject::null()), JValue::Object(&metadata), JValue::Object(&children), @@ -68,25 +64,6 @@ pub fn convert_to_java_field<'local>( Ok(field_obj) } -fn convert_storage_type<'local>( - env: &mut JNIEnv<'local>, - storage_class: &StorageClass, -) -> Result> { - let jname = match storage_class { - StorageClass::Blob => env.new_string("BLOB")?, - _ => env.new_string("DEFAULT")?, - }; - - Ok(env - .call_static_method( - "com/lancedb/lance/schema/StorageType", - "valueOf", - "(Ljava/lang/String;)Lcom/lancedb/lance/schema/StorageType;", - &[JValue::Object(&JObject::from(jname))], - )? - .l()?) -} - fn convert_children_fields<'local>( env: &mut JNIEnv<'local>, lance_field: &Field, diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index f9ce719f369..13088e600d0 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -397,18 +397,16 @@ fn convert_to_java_transaction<'local>( Some(properties) => to_java_map(env, &properties)?, _ => JObject::null(), }; - let operation = convert_to_java_operation_inner(env, transaction.operation)?; - let blobs_op = convert_to_java_operation(env, transaction.blobs_op)?; + let operation = convert_to_java_operation(env, Some(transaction.operation))?; let java_transaction = env.new_object( "com/lancedb/lance/Transaction", - "(Lcom/lancedb/lance/Dataset;JLjava/lang/String;Lcom/lancedb/lance/operation/Operation;Lcom/lancedb/lance/operation/Operation;Ljava/util/Map;Ljava/util/Map;)V", + "(Lcom/lancedb/lance/Dataset;JLjava/lang/String;Lcom/lancedb/lance/operation/Operation;Ljava/util/Map;Ljava/util/Map;)V", &[ JValue::Object(java_dataset), JValue::Long(transaction.read_version as i64), JValue::Object(&uuid), JValue::Object(&operation), - JValue::Object(&blobs_op), JValue::Object(&JObject::null()), JValue::Object(&transaction_properties), ], @@ -707,11 +705,6 @@ fn convert_to_rust_transaction( .l()?; let op = convert_to_rust_operation(env, &op, java_dataset)?; - let blobs_op = - env.get_optional_from_method(&java_transaction, "blobsOperation", |env, blobs_op| { - convert_to_rust_operation(env, &blobs_op, java_dataset) - })?; - let transaction_properties = env.get_optional_from_method( &java_transaction, "transactionProperties", @@ -722,7 +715,6 @@ fn convert_to_rust_transaction( )?; Ok(TransactionBuilder::new(read_ver, op) .uuid(uuid) - .blobs_op(blobs_op) .transaction_properties(transaction_properties.map(Arc::new)) .build()) } diff --git a/java/src/main/java/com/lancedb/lance/Transaction.java b/java/src/main/java/com/lancedb/lance/Transaction.java index eea66afaedf..057ca5466cf 100644 --- a/java/src/main/java/com/lancedb/lance/Transaction.java +++ b/java/src/main/java/com/lancedb/lance/Transaction.java @@ -37,21 +37,18 @@ public class Transaction { // Mainly for JNI usage private final Dataset dataset; private final Operation operation; - private final Optional blobOp; private Transaction( Dataset dataset, long readVersion, String uuid, Operation operation, - Operation blobOp, Map writeParams, Map transactionProperties) { this.dataset = dataset; this.readVersion = readVersion; this.uuid = uuid; this.operation = operation; - this.blobOp = Optional.ofNullable(blobOp); this.writeParams = writeParams != null ? writeParams : new HashMap<>(); this.transactionProperties = Optional.ofNullable(transactionProperties); } @@ -68,10 +65,6 @@ public Operation operation() { return operation; } - public Optional blobsOperation() { - return blobOp; - } - public Map writeParams() { return writeParams; } @@ -89,7 +82,6 @@ public Dataset commit() { public void release() { operation.release(); - blobOp.ifPresent(Operation::release); } @Override @@ -99,7 +91,6 @@ public String toString() { .add("uuid", uuid) .add("operation", operation) .add("writeParams", writeParams) - .add("blobOp", blobOp) .add("transactionProperties", transactionProperties) .toString(); } @@ -116,7 +107,6 @@ public boolean equals(Object o) { return readVersion == that.readVersion && uuid.equals(that.uuid) && Objects.equals(operation, that.operation) - && Objects.equals(blobOp, that.blobOp) && Objects.equals(writeParams, that.writeParams) && Objects.equals(transactionProperties, that.transactionProperties); } @@ -126,7 +116,6 @@ public static class Builder { private final Dataset dataset; private long readVersion; private Operation operation; - private Operation blobOp; private Map writeParams; private Map transactionProperties; @@ -156,11 +145,6 @@ public Builder operation(Operation operation) { return this; } - public Builder blobsOperation(Operation blobOp) { - this.blobOp = blobOp; - return this; - } - private void validateState() { if (operation != null) { throw new IllegalStateException( @@ -171,7 +155,7 @@ private void validateState() { public Transaction build() { Preconditions.checkState(operation != null, "TransactionBuilder has no operations"); return new Transaction( - dataset, readVersion, uuid, operation, blobOp, writeParams, transactionProperties); + dataset, readVersion, uuid, operation, writeParams, transactionProperties); } } } diff --git a/java/src/main/java/com/lancedb/lance/schema/LanceField.java b/java/src/main/java/com/lancedb/lance/schema/LanceField.java index 658f63e6ee8..9f10e58f1aa 100644 --- a/java/src/main/java/com/lancedb/lance/schema/LanceField.java +++ b/java/src/main/java/com/lancedb/lance/schema/LanceField.java @@ -30,7 +30,6 @@ public class LanceField { private final String name; private final boolean nullable; private final ArrowType type; - private final StorageType storageType; private final DictionaryEncoding dictionaryEncoding; private final Map metadata; private final List children; @@ -42,7 +41,6 @@ public class LanceField { String name, boolean nullable, ArrowType type, - StorageType storageType, DictionaryEncoding dictionaryEncoding, Map metadata, List children, @@ -52,7 +50,6 @@ public class LanceField { this.name = name; this.nullable = nullable; this.type = type; - this.storageType = storageType; this.dictionaryEncoding = dictionaryEncoding; this.metadata = metadata; this.children = children; @@ -79,10 +76,6 @@ public ArrowType getType() { return type; } - public StorageType getStorageType() { - return storageType; - } - public Optional getDictionaryEncoding() { return Optional.ofNullable(dictionaryEncoding); } @@ -114,7 +107,6 @@ public String toString() { .add("name", name) .add("nullable", nullable) .add("type", type) - .add("storageType", storageType) .add("dictionaryEncoding", dictionaryEncoding) .add("children", children) .add("isUnenforcedPrimaryKey", isUnenforcedPrimaryKey) diff --git a/java/src/main/java/com/lancedb/lance/schema/StorageType.java b/java/src/main/java/com/lancedb/lance/schema/StorageType.java deleted file mode 100644 index 829189ae0ca..00000000000 --- a/java/src/main/java/com/lancedb/lance/schema/StorageType.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.lancedb.lance.schema; - -public enum StorageType { - DEFAULT, - BLOB -} diff --git a/protos/file.proto b/protos/file.proto index 0ed681c05ae..e381f3a5198 100644 --- a/protos/file.proto +++ b/protos/file.proto @@ -15,16 +15,16 @@ message FileDescriptor { // A schema which describes the data type of each of the columns message Schema { - // All fields in this file, including the nested fields. - repeated lance.file.Field fields = 1; - // Schema metadata. - map metadata = 5; + // All fields in this file, including the nested fields. + repeated lance.file.Field fields = 1; + // Schema metadata. + map metadata = 5; } // Metadata of one Lance file. message Metadata { - // 4 was used for StatisticsMetadata in the past, but has been moved to prevent - // a bug in older readers. + // 4 was used for StatisticsMetadata in the past, but has been moved to + // prevent a bug in older readers. reserved 4; // Position of the manifest in the file. If it is zero, the manifest is stored @@ -44,7 +44,7 @@ message Metadata { // contiguously stored. // // Every field that is a part of the file will have a run in the page table. - // This includes struct columns, which will have a run of length 0 since + // This includes struct columns, which will have a run of length 0 since // they don't store any actual data. // // For example, for the column 5 and batch 4, we have: @@ -57,7 +57,7 @@ message Metadata { message StatisticsMetadata { // The schema of the statistics. // - // This might be empty, meaning there are no statistics. It also might not + // This might be empty, meaning there are no statistics. It also might not // contain statistics for every field. repeated Field schema = 1; @@ -70,20 +70,20 @@ message Metadata { // The file position of the statistics page table // - // The page table is a matrix of N x 2, where N = length of stats_fields. This is - // the same layout as the main page table, except there is always only one - // batch. + // The page table is a matrix of N x 2, where N = length of stats_fields. + // This is the same layout as the main page table, except there is always + // only one batch. // // For example, to get the stats column 5, we have: // ```text // position = stats_page_table[5][0]; // length = stats_page_table[5][1]; // ``` - uint64 page_table_position = 3; + uint64 page_table_position = 3; } StatisticsMetadata statistics = 5; -} // Metadata +} // Metadata // Supported encodings. enum Encoding { @@ -154,7 +154,8 @@ message Field { // * "date32:day" // * "date64:ms" // * "decimal:128:{precision}:{scale}" / "decimal:256:{precision}:{scale}" - // * "time:{unit}" / "timestamp:{unit}" / "duration:{unit}", where unit is "s", "ms", "us", "ns" + // * "time:{unit}" / "timestamp:{unit}" / "duration:{unit}", where unit is + // "s", "ms", "us", "ns" // * "dict:{value_type}:{index_type}:false" string logical_type = 5; // If this field is nullable. @@ -168,24 +169,18 @@ message Field { /// The logic type presents the value type of the column, i.e., string value. Dictionary dictionary = 8; - // Deprecated: optional extension type name, use metadata field ARROW:extension:name + // Deprecated: optional extension type name, use metadata field + // ARROW:extension:name string extension_name = 9; // optional field metadata (e.g. extension type name/parameters) map metadata = 10; - /// The storage class of the field - /// - /// This determines the rate at which the field is compacted. - /// - /// Currently, there are only two storage classes: - /// - /// "" - The default storage class. - /// "blob" - The field is compacted into fewer rows per fragment. - /// - /// Fields that have non-default storage classes are stored in different - /// datasets (e.g. blob fields are stored in the nested "_blobs" dataset) - string storage_class = 11; + // Field number 11 was previously `string storage_class`. + // Keep it reserved so older manifests remain compatible while new writers + // avoid reusing the slot. + reserved 11; + reserved "storage_class"; bool unenforced_primary_key = 12; } diff --git a/protos/table.proto b/protos/table.proto index 580abeaf954..f20b68cfcda 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -183,12 +183,9 @@ message Manifest { // data itself and is attached to the output schema of scans. map table_metadata = 19; - // The version of the blob dataset associated with this table. Changes to - // blob fields will modify the blob dataset and update this version in the parent - // table. - // - // If this value is 0 then there are no blob fields. - uint64 blob_dataset_version = 17; + // Field number 17 (`blob_dataset_version`) was used for a secondary blob dataset. + reserved 17; + reserved "blob_dataset_version"; // The base paths of data files. // diff --git a/protos/transaction.proto b/protos/transaction.proto index 186847d52b5..bcc49a16188 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -298,9 +298,7 @@ message Transaction { UpdateBases update_bases = 114; } - // An operation to apply to the blob dataset - oneof blob_operation { - Append blob_append = 200; - Overwrite blob_overwrite = 202; - } + // Fields 200/202 (`blob_append` / `blob_overwrite`) previously represented blob dataset ops. + reserved 200, 202; + reserved "blob_append", "blob_overwrite"; } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index a9e8bca873b..43d1c423903 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3059,7 +3059,6 @@ def _commit( def commit( base_uri: Union[str, Path, LanceDataset], operation: Union[LanceOperation.BaseOperation, Transaction], - blobs_op: Optional[LanceOperation.BaseOperation] = None, read_version: Optional[int] = None, commit_lock: Optional[CommitLock] = None, storage_options: Optional[Dict[str, str]] = None, @@ -3200,7 +3199,6 @@ def commit( new_ds = _Dataset.commit( base_uri, operation, - blobs_op, read_version, commit_lock, storage_options=storage_options, @@ -3649,7 +3647,6 @@ class Transaction: read_version: int operation: LanceOperation.BaseOperation uuid: str = dataclasses.field(default_factory=lambda: str(uuid.uuid4())) - blobs_op: Optional[LanceOperation.BaseOperation] = None transaction_properties: Optional[Dict[str, str]] = dataclasses.field( default_factory=dict ) diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 6f20364046e..670b6357f73 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -342,7 +342,6 @@ class _Dataset: def commit( dest: str | _Dataset, operation: LanceOperation.BaseOperation, - blobs_op: Optional[LanceOperation.BaseOperation] = None, read_version: Optional[int] = None, commit_lock: Optional[CommitLock] = None, storage_options: Optional[Dict[str, str]] = None, diff --git a/python/src/dataset.rs b/python/src/dataset.rs index a0bd8eb0767..7f154576bf9 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -2073,11 +2073,10 @@ impl Dataset { #[allow(clippy::too_many_arguments)] #[staticmethod] - #[pyo3(signature = (dest, operation, blobs_op=None, read_version = None, commit_lock = None, storage_options = None, enable_v2_manifest_paths = None, detached = None, max_retries = None, commit_message = None))] + #[pyo3(signature = (dest, operation, read_version = None, commit_lock = None, storage_options = None, enable_v2_manifest_paths = None, detached = None, max_retries = None, commit_message = None))] fn commit( dest: PyWriteDest, operation: PyLance, - blobs_op: Option>, read_version: Option, commit_lock: Option<&Bound<'_, PyAny>>, storage_options: Option>, @@ -2086,12 +2085,7 @@ impl Dataset { max_retries: Option, commit_message: Option, ) -> PyResult { - let mut transaction = Transaction::new( - read_version.unwrap_or_default(), - operation.0, - blobs_op.map(|op| op.0), - None, - ); + let mut transaction = Transaction::new(read_version.unwrap_or_default(), operation.0, None); if let Some(commit_message) = commit_message { transaction.transaction_properties = Some(Arc::new(HashMap::from([( diff --git a/python/src/fragment.rs b/python/src/fragment.rs index 1bc864b9027..9dbd8f1a738 100644 --- a/python/src/fragment.rs +++ b/python/src/fragment.rs @@ -425,11 +425,6 @@ pub fn write_fragments( ) -> PyResult> { let written = do_write_fragments(dest, reader, kwargs)?; - assert!( - written.blobs_op.is_none(), - "Blob writing is not yet supported by the python _write_fragments API" - ); - let get_fragments = |operation| match operation { Operation::Overwrite { fragments, .. } => Ok(fragments), Operation::Append { fragments, .. } => Ok(fragments), diff --git a/python/src/transaction.rs b/python/src/transaction.rs index 0aa19669986..131f7957782 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -527,10 +527,6 @@ impl FromPyObject<'_> for PyLance { let read_version = ob.getattr("read_version")?.extract()?; let uuid = ob.getattr("uuid")?.extract()?; let operation = ob.getattr("operation")?.extract::>()?.0; - let blobs_op = ob - .getattr("blobs_op")? - .extract::>>()? - .map(|op| op.0); let transaction_properties = ob .getattr("transaction_properties")? .extract::>>()? @@ -540,7 +536,6 @@ impl FromPyObject<'_> for PyLance { read_version, uuid, operation, - blobs_op, tag: None, transaction_properties, })) @@ -560,18 +555,12 @@ impl<'py> IntoPyObject<'py> for PyLance<&Transaction> { let read_version = self.0.read_version; let uuid = &self.0.uuid; let operation = PyLance(&self.0.operation).into_pyobject(py)?; - let blobs_op = self - .0 - .blobs_op - .as_ref() - .map(|op| PyLance(op).into_pyobject(py)) - .transpose()?; let cls = namespace .getattr("Transaction") .expect("Failed to get Transaction class"); - let py_transaction = cls.call1((read_version, operation, uuid, blobs_op))?; + let py_transaction = cls.call1((read_version, operation, uuid))?; if let Some(transaction_properties_arc) = &self.0.transaction_properties { let py_dict = transaction_properties_arc.as_ref().into_pyobject(py)?; diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index f193f626920..96861ca54d7 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -18,10 +18,7 @@ mod field; mod schema; use crate::{Error, Result}; -pub use field::{ - Encoding, Field, NullabilityComparison, OnTypeMismatch, SchemaCompareOptions, StorageClass, - LANCE_STORAGE_CLASS_SCHEMA_META_KEY, -}; +pub use field::{Encoding, Field, NullabilityComparison, OnTypeMismatch, SchemaCompareOptions}; pub use schema::{ escape_field_path_for_project, format_field_path, parse_field_path, FieldRef, OnMissing, Projectable, Projection, Schema, diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 3d1463a02f1..e996326f096 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -6,8 +6,7 @@ use std::{ cmp::{max, Ordering}, collections::{HashMap, VecDeque}, - fmt::{self, Display}, - str::FromStr, + fmt, sync::Arc, }; @@ -32,7 +31,7 @@ use super::{ }; use crate::{datatypes::BLOB_DESC_LANCE_FIELD, Error, Result}; -pub const LANCE_STORAGE_CLASS_SCHEMA_META_KEY: &str = "lance-schema:storage-class"; +const LEGACY_STORAGE_CLASS_METADATA_KEY: &str = "lance-schema:storage-class"; /// Use this config key in Arrow field metadata to indicate a column is a part of the primary key. /// The value can be any true values like `true`, `1`, `yes` (case-insensitive). @@ -85,40 +84,6 @@ pub enum Encoding { RLE, } -/// Describes the rate at which a column should be compacted -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, DeepSizeOf)] -pub enum StorageClass { - /// Default storage class (stored in primary dataset) - #[default] - Default, - /// Blob storage class (stored in blob dataset) - Blob, -} - -impl Display for StorageClass { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Default => write!(f, "default"), - Self::Blob => write!(f, "blob"), - } - } -} - -impl FromStr for StorageClass { - type Err = Error; - - fn from_str(s: &str) -> std::result::Result { - match s { - "default" | "" => Ok(Self::Default), - "blob" => Ok(Self::Blob), - _ => Err(Error::Schema { - message: format!("Unknown storage class: {}", s), - location: location!(), - }), - } - } -} - /// What to do on a merge operation if the types of the fields don't match #[derive(Debug, Clone, Copy, PartialEq, Eq, DeepSizeOf)] pub enum OnTypeMismatch { @@ -143,7 +108,6 @@ pub struct Field { /// Dictionary value array if this field is dictionary. pub dictionary: Option, - pub storage_class: StorageClass, pub unenforced_primary_key: bool, } @@ -173,14 +137,6 @@ impl Field { || self.children.iter().any(Self::has_dictionary_types) } - pub fn is_default_storage(&self) -> bool { - self.storage_class == StorageClass::Default - } - - pub fn storage_class(&self) -> StorageClass { - self.storage_class - } - /// Merge a field with another field using a reference field to ensure /// the correct order of fields /// @@ -526,7 +482,6 @@ impl Field { nullable: self.nullable, children: vec![], dictionary: self.dictionary.clone(), - storage_class: self.storage_class, unenforced_primary_key: self.unenforced_primary_key, }; if path_components.is_empty() { @@ -744,7 +699,6 @@ impl Field { nullable: self.nullable, children, dictionary: self.dictionary.clone(), - storage_class: self.storage_class, unenforced_primary_key: self.unenforced_primary_key, }; return Ok(f); @@ -808,7 +762,6 @@ impl Field { nullable: self.nullable, children, dictionary: self.dictionary.clone(), - storage_class: self.storage_class, unenforced_primary_key: self.unenforced_primary_key, }) } @@ -978,18 +931,24 @@ impl TryFrom<&ArrowField> for Field { DataType::LargeList(item) => vec![Self::try_from(item.as_ref())?], _ => vec![], }; - let storage_class = field - .metadata() - .get(LANCE_STORAGE_CLASS_SCHEMA_META_KEY) - .map(|s| StorageClass::from_str(s)) - .unwrap_or(Ok(StorageClass::Default))?; - - let unenforced_primary_key = field - .metadata() + let mut metadata = field.metadata().clone(); + let unenforced_primary_key = metadata .get(LANCE_UNENFORCED_PRIMARY_KEY) .map(|s| matches!(s.to_lowercase().as_str(), "true" | "1" | "yes")) .unwrap_or(false); + if let Some(value) = metadata + .get(LEGACY_STORAGE_CLASS_METADATA_KEY) + .map(|v| v.to_ascii_lowercase()) + { + if value == "blob" { + metadata + .entry(lance_arrow::BLOB_META_KEY.to_string()) + .or_insert_with(|| "true".to_string()); + } + metadata.remove(LEGACY_STORAGE_CLASS_METADATA_KEY); + } + // Check for JSON extension types (both Arrow and Lance) let logical_type = if is_arrow_json_field(field) || is_json_field(field) { LogicalType::from("json") @@ -1010,11 +969,10 @@ impl TryFrom<&ArrowField> for Field { DataType::List(_) | DataType::LargeList(_) => Some(Encoding::Plain), _ => None, }, - metadata: field.metadata().clone(), + metadata, nullable: field.is_nullable(), children, dictionary: None, - storage_class, unenforced_primary_key, }) } @@ -1032,6 +990,7 @@ impl From<&Field> for ArrowField { fn from(field: &Field) -> Self { let out = Self::new(&field.name, field.data_type(), field.nullable); let mut metadata = field.metadata.clone(); + metadata.remove(LEGACY_STORAGE_CLASS_METADATA_KEY); // Add JSON extension metadata if this is a JSON field if field.logical_type.0 == "json" { @@ -1041,15 +1000,6 @@ impl From<&Field> for ArrowField { ); } - match field.storage_class { - StorageClass::Default => {} - StorageClass::Blob => { - metadata.insert( - LANCE_STORAGE_CLASS_SCHEMA_META_KEY.to_string(), - "blob".to_string(), - ); - } - } out.with_metadata(metadata) } } @@ -1060,6 +1010,7 @@ mod tests { use arrow_array::{DictionaryArray, StringArray, UInt32Array}; use arrow_schema::{Fields, TimeUnit}; + use std::collections::HashMap; #[test] fn arrow_field_to_field() { @@ -1171,6 +1122,22 @@ mod tests { assert_eq!(ArrowField::from(&field), arrow_field); } + #[test] + fn legacy_storage_class_metadata_sets_blob() { + let metadata = HashMap::from([( + super::LEGACY_STORAGE_CLASS_METADATA_KEY.to_string(), + "blob".to_string(), + )]); + let arrow_field = + ArrowField::new("blob", DataType::LargeBinary, false).with_metadata(metadata); + let field = Field::try_from(&arrow_field).unwrap(); + assert!(field.is_blob()); + assert_eq!( + field.metadata.get(lance_arrow::BLOB_META_KEY), + Some(&"true".to_string()) + ); + } + #[test] fn test_project_by_field_null_type() { let f1: Field = ArrowField::new("a", DataType::Null, true) diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index 31d2d729ae9..2eacce0deba 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -15,7 +15,7 @@ use deepsize::DeepSizeOf; use lance_arrow::*; use snafu::location; -use super::field::{Field, OnTypeMismatch, SchemaCompareOptions, StorageClass}; +use super::field::{Field, OnTypeMismatch, SchemaCompareOptions}; use crate::{Error, Result, ROW_ADDR, ROW_ADDR_FIELD, ROW_ID, ROW_ID_FIELD, WILDCARD}; /// Lance Schema. @@ -146,11 +146,19 @@ impl Schema { } } - pub fn retain_storage_class(&self, storage_class: StorageClass) -> Self { + pub fn retain_blob_fields(&self) -> Self { + self.retain_by(|f| f.is_blob()) + } + + pub fn retain_non_blob_fields(&self) -> Self { + self.retain_by(|f| !f.is_blob()) + } + + fn retain_by bool>(&self, predicate: F) -> Self { let fields = self .fields .iter() - .filter(|f| f.storage_class() == storage_class) + .filter(|f| predicate(f)) .cloned() .collect(); Self { @@ -159,28 +167,29 @@ impl Schema { } } - /// Splits the schema into two schemas, one with default storage class fields and the other with blob storage class fields. - /// If there are no blob storage class fields, the second schema will be `None`. + /// Splits the schema into two schemas, one with non-blob fields and the other with blob fields. + /// If there are no blob fields, the second schema will be `None`. /// The order of fields is preserved. - pub fn partition_by_storage_class(&self) -> (Self, Option) { - let mut local_fields = Vec::with_capacity(self.fields.len()); - let mut sibling_fields = Vec::with_capacity(self.fields.len()); + pub fn partition_by_blob_columns(&self) -> (Self, Option) { + let mut non_blob_fields = Vec::with_capacity(self.fields.len()); + let mut blob_fields = Vec::with_capacity(self.fields.len()); for field in self.fields.iter() { - match field.storage_class() { - StorageClass::Default => local_fields.push(field.clone()), - StorageClass::Blob => sibling_fields.push(field.clone()), + if field.is_blob() { + blob_fields.push(field.clone()); + } else { + non_blob_fields.push(field.clone()); } } ( Self { - fields: local_fields, + fields: non_blob_fields, metadata: self.metadata.clone(), }, - if sibling_fields.is_empty() { + if blob_fields.is_empty() { None } else { Some(Self { - fields: sibling_fields, + fields: blob_fields, metadata: self.metadata.clone(), }) }, diff --git a/rust/lance-file/src/datatypes.rs b/rust/lance-file/src/datatypes.rs index 83f72dce6ac..a047c2e6df8 100644 --- a/rust/lance-file/src/datatypes.rs +++ b/rust/lance-file/src/datatypes.rs @@ -15,6 +15,8 @@ use snafu::location; use crate::format::pb; +const LEGACY_STORAGE_CLASS_METADATA_KEY: &str = "lance-schema:storage-class"; + #[allow(clippy::fallible_impl_from)] impl From<&pb::Field> for Field { fn from(field: &pb::Field) -> Self { @@ -29,6 +31,19 @@ impl From<&pb::Field> for Field { if !field.extension_name.is_empty() { lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone()); } + let legacy_blob_marker = lance_metadata + .get(LEGACY_STORAGE_CLASS_METADATA_KEY) + .map(|value| value.eq_ignore_ascii_case("blob")) + .unwrap_or(false); + if legacy_blob_marker { + lance_metadata + .entry(lance_arrow::BLOB_META_KEY.to_string()) + .or_insert_with(|| "true".to_string()); + } + // Drop the legacy storage class metadata key after translating it to blob semantics. + if legacy_blob_marker { + lance_metadata.remove(LEGACY_STORAGE_CLASS_METADATA_KEY); + } Self { name: field.name.clone(), id: field.id, @@ -45,7 +60,6 @@ impl From<&pb::Field> for Field { nullable: field.nullable, children: vec![], dictionary: field.dictionary.as_ref().map(Dictionary::from), - storage_class: field.storage_class.parse().unwrap(), unenforced_primary_key: field.unenforced_primary_key, } } @@ -55,9 +69,9 @@ impl From<&Field> for pb::Field { fn from(field: &Field) -> Self { let pb_metadata = field .metadata - .clone() - .into_iter() - .map(|(key, value)| (key, value.into_bytes())) + .iter() + .filter(|(key, _)| key.as_str() != LEGACY_STORAGE_CLASS_METADATA_KEY) + .map(|(key, value)| (key.clone(), value.clone().into_bytes())) .collect(); Self { id: field.id, @@ -79,7 +93,6 @@ impl From<&Field> for pb::Field { .map(|name| name.to_owned()) .unwrap_or_default(), r#type: 0, - storage_class: field.storage_class.to_string(), unenforced_primary_key: field.unenforced_primary_key, } } @@ -269,8 +282,8 @@ mod tests { use arrow_schema::Schema as ArrowSchema; use lance_core::datatypes::Schema; - use crate::datatypes::Fields; - use crate::datatypes::FieldsWithMeta; + use super::{Field, Fields, FieldsWithMeta, LEGACY_STORAGE_CLASS_METADATA_KEY}; + use crate::format::pb; #[test] fn test_schema_set_ids() { @@ -313,4 +326,27 @@ mod tests { let schema = Schema::from(fields_with_meta); assert_eq!(expected_schema, schema); } + + #[test] + fn legacy_proto_storage_class_sets_blob_metadata() { + let proto = pb::Field { + name: "blob".to_string(), + logical_type: "large_binary".to_string(), + metadata: HashMap::from([( + LEGACY_STORAGE_CLASS_METADATA_KEY.to_string(), + b"blob".to_vec(), + )]), + nullable: true, + ..Default::default() + }; + let field = Field::from(&proto); + assert!(field.is_blob()); + assert_eq!( + field.metadata.get(lance_arrow::BLOB_META_KEY), + Some(&"true".to_string()) + ); + assert!(!field + .metadata + .contains_key(LEGACY_STORAGE_CLASS_METADATA_KEY)); + } } diff --git a/rust/lance-table/src/feature_flags.rs b/rust/lance-table/src/feature_flags.rs index f06e50799a2..672d649a668 100644 --- a/rust/lance-table/src/feature_flags.rs +++ b/rust/lance-table/src/feature_flags.rs @@ -143,7 +143,6 @@ mod tests { schema.clone(), Arc::new(vec![]), DataStorageFormat::default(), - None, HashMap::new(), // Empty base_paths ); apply_feature_flags(&mut normal_manifest, false).unwrap(); @@ -164,7 +163,6 @@ mod tests { schema, Arc::new(vec![]), DataStorageFormat::default(), - None, base_paths, ); apply_feature_flags(&mut multi_base_manifest, false).unwrap(); diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index 722a9205a71..20687086471 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -19,7 +19,7 @@ use super::Fragment; use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_STABLE_ROW_IDS}; use crate::format::pb; use lance_core::cache::LanceCache; -use lance_core::datatypes::{Schema, StorageClass}; +use lance_core::datatypes::Schema; use lance_core::{Error, Result}; use lance_io::object_store::{ObjectStore, ObjectStoreRegistry}; use lance_io::utils::read_struct; @@ -36,7 +36,7 @@ pub struct Manifest { /// Dataset schema. pub schema: Schema, - /// Local schema, only containing fields with the default storage class (not blobs) + /// Local schema, only containing fields that are not marked as blobs pub local_schema: Schema, /// Dataset version @@ -99,9 +99,6 @@ pub struct Manifest { /// is used to tell libraries how to read, write, or manage the table. pub table_metadata: HashMap, - /// Blob dataset version - pub blob_dataset_version: Option, - /* external base paths */ pub base_paths: HashMap, } @@ -174,11 +171,10 @@ impl Manifest { schema: Schema, fragments: Arc>, data_storage_format: DataStorageFormat, - blob_dataset_version: Option, base_paths: HashMap, ) -> Self { let fragment_offsets = compute_fragment_offsets(&fragments); - let local_schema = schema.retain_storage_class(StorageClass::Default); + let local_schema = schema.retain_non_blob_fields(); Self { schema, @@ -200,7 +196,6 @@ impl Manifest { data_storage_format, config: HashMap::new(), table_metadata: HashMap::new(), - blob_dataset_version, base_paths, } } @@ -209,12 +204,9 @@ impl Manifest { previous: &Self, schema: Schema, fragments: Arc>, - new_blob_version: Option, ) -> Self { let fragment_offsets = compute_fragment_offsets(&fragments); - let local_schema = schema.retain_storage_class(StorageClass::Default); - - let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version); + let local_schema = schema.retain_non_blob_fields(); Self { schema, @@ -236,7 +228,6 @@ impl Manifest { data_storage_format: previous.data_storage_format.clone(), config: previous.config.clone(), table_metadata: previous.table_metadata.clone(), - blob_dataset_version, base_paths: previous.base_paths.clone(), } } @@ -293,7 +284,6 @@ impl Manifest { next_row_id: self.next_row_id, data_storage_format: self.data_storage_format.clone(), config: self.config.clone(), - blob_dataset_version: self.blob_dataset_version, base_paths: { let mut base_paths = self.base_paths.clone(); let base_path = BasePath::new(ref_base_id, ref_path, ref_name, true); @@ -921,7 +911,7 @@ impl TryFrom for Manifest { }; let schema = Schema::from(fields_with_meta); - let local_schema = schema.retain_storage_class(StorageClass::Default); + let local_schema = schema.retain_non_blob_fields(); Ok(Self { schema, @@ -947,11 +937,6 @@ impl TryFrom for Manifest { data_storage_format, config: p.config, table_metadata: p.table_metadata, - blob_dataset_version: if p.blob_dataset_version == 0 { - None - } else { - Some(p.blob_dataset_version) - }, base_paths: p .base_paths .iter() @@ -1009,7 +994,6 @@ impl From<&Manifest> for pb::Manifest { version: m.data_storage_format.version.clone(), }), config: m.config.clone(), - blob_dataset_version: m.blob_dataset_version.unwrap_or_default(), base_paths: m .base_paths .values() @@ -1294,7 +1278,6 @@ mod tests { schema, Arc::new(fragments), DataStorageFormat::default(), - /*blob_dataset_version= */ None, HashMap::new(), ); @@ -1370,7 +1353,6 @@ mod tests { schema, Arc::new(fragments), DataStorageFormat::default(), - /*blob_dataset_version= */ None, HashMap::new(), ); @@ -1394,7 +1376,6 @@ mod tests { schema, Arc::new(fragments), DataStorageFormat::default(), - /*blob_dataset_version= */ None, HashMap::new(), ); @@ -1423,7 +1404,6 @@ mod tests { schema.clone(), Arc::new(vec![]), DataStorageFormat::default(), - None, HashMap::new(), ); @@ -1446,7 +1426,6 @@ mod tests { schema.clone(), Arc::new(empty_fragments), DataStorageFormat::default(), - None, HashMap::new(), ); @@ -1470,7 +1449,6 @@ mod tests { schema.clone(), Arc::new(real_fragments), DataStorageFormat::default(), - None, HashMap::new(), ); @@ -1506,7 +1484,6 @@ mod tests { schema, Arc::new(vec![fragment_with_deletion]), DataStorageFormat::default(), - None, HashMap::new(), ); diff --git a/rust/lance-table/src/io/manifest.rs b/rust/lance-table/src/io/manifest.rs index 6ef313a4230..c4ee08e9937 100644 --- a/rust/lance-table/src/io/manifest.rs +++ b/rust/lance-table/src/io/manifest.rs @@ -226,7 +226,6 @@ impl ManifestProvider for ManifestDescribing { schema.clone(), Arc::new(vec![]), DataStorageFormat::new(LanceFileVersion::Legacy), - /*blob_dataset_version= */ None, HashMap::new(), ); let pos = do_write_manifest(object_writer, &mut manifest, None).await?; @@ -278,7 +277,6 @@ mod test { schema, Arc::new(vec![]), DataStorageFormat::default(), - /*blob_dataset_version= */ None, HashMap::new(), ); let pos = write_manifest(&mut writer, &mut manifest, None) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index bf890eebebc..37f55d9afa5 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -127,7 +127,6 @@ pub use write::{ const INDICES_DIR: &str = "_indices"; pub const DATA_DIR: &str = "data"; -pub const BLOB_DIR: &str = "_blobs"; // We default to 6GB for the index cache, since indices are often large but // worth caching. pub const DEFAULT_INDEX_CACHE_SIZE: usize = 6 * 1024 * 1024 * 1024; @@ -514,7 +513,7 @@ impl Dataset { ref_path: String::from(self.uri()), branch_name: Some(branch.to_string()), }; - let transaction = Transaction::new(version_number, clone_op, None, None); + let transaction = Transaction::new(version_number, clone_op, None); let builder = CommitBuilder::new(WriteDestination::Uri(branch_location.uri.as_str())) .with_store_params(store_params.unwrap_or_default()) @@ -846,36 +845,6 @@ impl Dataset { } // TODO: Cache this - pub async fn blobs_dataset(&self) -> Result>> { - if let Some(blobs_version) = self.manifest.blob_dataset_version { - let blobs_path = self.base.child(BLOB_DIR); - let blob_manifest_location = self - .commit_handler - .resolve_version_location(&blobs_path, blobs_version, &self.object_store.inner) - .await?; - let manifest = read_manifest( - &self.object_store, - &blob_manifest_location.path, - blob_manifest_location.size, - ) - .await?; - let blobs_dataset = Self::checkout_manifest( - self.object_store.clone(), - blobs_path, - format!("{}/{}", self.uri, BLOB_DIR), - Arc::new(manifest), - blob_manifest_location, - self.session.clone(), - self.commit_handler.clone(), - self.file_reader_options.clone(), - self.store_params.as_deref().cloned(), - )?; - Ok(Some(Arc::new(blobs_dataset))) - } else { - Ok(None) - } - } - pub(crate) fn is_legacy_storage(&self) -> bool { self.manifest .data_storage_format @@ -992,7 +961,6 @@ impl Dataset { Operation::Restore { version: self.manifest.version, }, - /*blobs_op=*/ None, None, ); @@ -1069,7 +1037,6 @@ impl Dataset { async fn do_commit( base_uri: WriteDestination<'_>, operation: Operation, - blobs_op: Option, read_version: Option, store_params: Option, commit_handler: Option>, @@ -1088,7 +1055,7 @@ impl Dataset { Ok, )?; - let transaction = Transaction::new(read_version, operation, blobs_op, None); + let transaction = Transaction::new(read_version, operation, None); let mut builder = CommitBuilder::new(base_uri) .enable_v2_manifest_paths(enable_v2_manifest_paths) @@ -1152,9 +1119,6 @@ impl Dataset { Self::do_commit( dest.into(), operation, - // TODO: Allow blob operations to be specified? (breaking change?) - /*blobs_op=*/ - None, read_version, store_params, commit_handler, @@ -1185,9 +1149,6 @@ impl Dataset { Self::do_commit( dest.into(), operation, - // TODO: Allow blob operations to be specified? (breaking change?) - /*blobs_op=*/ - None, read_version, store_params, commit_handler, @@ -1592,7 +1553,7 @@ impl Dataset { &self.manifest.schema } - /// Similar to [Self::schema], but only returns fields with the default storage class + /// Similar to [Self::schema], but only returns fields that are not marked as blob columns pub fn local_schema(&self) -> &Schema { &self.manifest.local_schema } @@ -1954,7 +1915,7 @@ impl Dataset { ref_path: self.uri.clone(), branch_name: None, }; - let transaction = Transaction::new(version_number, clone_op, None, None); + let transaction = Transaction::new(version_number, clone_op, None); let builder = CommitBuilder::new(WriteDestination::Uri(target_path)) .with_store_params(store_params.unwrap_or_default()) @@ -2273,9 +2234,6 @@ impl Dataset { fragments: updated_fragments, schema: new_schema, }, - // It is not possible to add blob columns using merge - /*blobs_op=*/ - None, None, ); @@ -2625,15 +2583,14 @@ mod tests { }; use arrow_array::{ Array, FixedSizeListArray, GenericStringArray, Int16Array, Int16DictionaryArray, - StructArray, UInt64Array, + LargeBinaryArray, StructArray, UInt64Array, }; use arrow_ord::sort::sort_to_indices; use arrow_schema::{ DataType, Field as ArrowField, Field, Fields as ArrowFields, Schema as ArrowSchema, }; use lance_arrow::bfloat16::{self, BFLOAT16_EXT_NAME}; - use lance_arrow::{ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY}; - use lance_core::datatypes::LANCE_STORAGE_CLASS_SCHEMA_META_KEY; + use lance_arrow::{ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BLOB_META_KEY}; use lance_core::utils::tempfile::{TempDir, TempStdDir, TempStrDir}; use lance_datagen::{array, gen_batch, BatchCount, Dimension, RowCount}; use lance_file::v2::writer::FileWriter; @@ -4308,7 +4265,7 @@ mod tests { initial_bases: None, }; let test_uri = TempStrDir::default(); - let read_version_0_transaction = Transaction::new(0, operation, None, None); + let read_version_0_transaction = Transaction::new(0, operation, None); let strict_builder = CommitBuilder::new(&test_uri).with_max_retries(0); let unstrict_builder = CommitBuilder::new(&test_uri).with_max_retries(1); strict_builder @@ -6846,6 +6803,7 @@ mod tests { let reader = RecordBatchIterator::new(vec![Ok(batch)], just_a.clone()); dataset.append(reader, None).await.unwrap(); dataset.validate().await.unwrap(); + assert_eq!(dataset.count_rows(None).await.unwrap(), 1); // Looking at the fragments, there is no data file with the missing field let fragments = dataset.get_fragments(); @@ -6877,6 +6835,7 @@ mod tests { let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], schema.clone()); dataset.append(reader, None).await.unwrap(); dataset.validate().await.unwrap(); + assert_eq!(dataset.count_rows(None).await.unwrap(), 2); // When reading back, only missing data is null, otherwise is filled in let data = dataset.scan().try_into_batch().await.unwrap(); @@ -7054,20 +7013,15 @@ mod tests { #[tokio::test] async fn test_insert_balanced_subschemas() { - // TODO: support this. let test_uri = TempStrDir::default(); let field_a = ArrowField::new("a", DataType::Int32, true); - let field_b = ArrowField::new("b", DataType::Int64, true); + let field_b = ArrowField::new("b", DataType::LargeBinary, true); let schema = Arc::new(ArrowSchema::new(vec![ field_a.clone(), - field_b.clone().with_metadata( - [( - LANCE_STORAGE_CLASS_SCHEMA_META_KEY.to_string(), - "blob".to_string(), - )] - .into(), - ), + field_b + .clone() + .with_metadata([(BLOB_META_KEY.to_string(), "true".to_string())].into()), ])); let empty_reader = RecordBatchIterator::new(vec![], schema.clone()); let options = WriteParams { @@ -7085,18 +7039,52 @@ mod tests { let batch = RecordBatch::try_new(just_a.clone(), vec![Arc::new(Int32Array::from(vec![1]))]) .unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], just_a.clone()); - let result = dataset.append(reader, None).await; - assert!(result.is_err()); - assert!(matches!(result, Err(Error::SchemaMismatch { .. }))); + dataset.append(reader, None).await.unwrap(); + dataset.validate().await.unwrap(); + + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 1); + assert_eq!(fragments[0].metadata.files.len(), 1); + assert_eq!(&fragments[0].metadata.files[0].fields, &[0]); // Insert right side let just_b = Arc::new(ArrowSchema::new(vec![field_b.clone()])); - let batch = RecordBatch::try_new(just_b.clone(), vec![Arc::new(Int64Array::from(vec![2]))]) - .unwrap(); + let batch = RecordBatch::try_new( + just_b.clone(), + vec![Arc::new(LargeBinaryArray::from_iter(vec![Some(vec![2u8])]))], + ) + .unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], just_b.clone()); - let result = dataset.append(reader, None).await; - assert!(result.is_err()); - assert!(matches!(result, Err(Error::SchemaMismatch { .. }))); + dataset.append(reader, None).await.unwrap(); + dataset.validate().await.unwrap(); + + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 2); + assert_eq!(fragments[1].metadata.files.len(), 1); + assert_eq!(&fragments[1].metadata.files[0].fields, &[1]); + + let data = dataset + .take( + &[0, 1], + ProjectionRequest::from_columns(["a"], dataset.schema()), + ) + .await + .unwrap(); + assert_eq!(data.num_rows(), 2); + let a_column = data.column(0).as_primitive::(); + assert_eq!(a_column.value(0), 1); + assert!(a_column.is_null(1)); + + let blob_batch = dataset + .take( + &[0, 1], + ProjectionRequest::from_columns(["b"], dataset.schema()), + ) + .await + .unwrap(); + let blob_descriptions = blob_batch.column(0).as_struct(); + assert!(blob_descriptions.is_null(0)); + assert!(blob_descriptions.is_valid(1)); } #[tokio::test] diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index cdc6ab83d4b..ba19d1a5d89 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -6,23 +6,12 @@ use std::{future::Future, ops::DerefMut, sync::Arc}; use arrow::array::AsArray; use arrow::datatypes::UInt64Type; use arrow_schema::DataType; -use datafusion::execution::SendableRecordBatchStream; -use futures::StreamExt; use object_store::path::Path; use snafu::location; use tokio::sync::Mutex; use super::Dataset; -use crate::io::exec::{ShareableRecordBatchStream, ShareableRecordBatchStreamAdapter}; -use lance_core::{ - datatypes::{Schema, StorageClass}, - error::CloneableResult, - utils::{ - address::RowAddress, - futures::{Capacity, SharedStreamExt}, - }, - Error, Result, -}; +use lance_core::{utils::address::RowAddress, Error, Result}; use lance_io::traits::Reader; /// Current state of the reader. Held in a mutex for easy sharing @@ -227,62 +216,6 @@ pub(super) async fn take_blobs( .collect()) } -pub trait BlobStreamExt: Sized { - /// Splits a stream into a regular portion (the first stream) - /// and a blob portion (the second stream) - /// - /// The first stream contains all fields with the default storage class and - /// may be identical to self. - /// - /// The second stream may be None (if there are no fields with the blob storage class) - /// or it contains all fields with the blob storage class. - fn extract_blob_stream(self, schema: &Schema) -> (Self, Option); -} - -impl BlobStreamExt for SendableRecordBatchStream { - fn extract_blob_stream(self, schema: &Schema) -> (Self, Option) { - let mut indices_with_blob = Vec::with_capacity(schema.fields.len()); - let mut indices_without_blob = Vec::with_capacity(schema.fields.len()); - for (idx, field) in schema.fields.iter().enumerate() { - if field.storage_class() == StorageClass::Blob { - indices_with_blob.push(idx); - } else { - indices_without_blob.push(idx); - } - } - if indices_with_blob.is_empty() { - (self, None) - } else { - let left_schema = Arc::new(self.schema().project(&indices_without_blob).unwrap()); - let right_schema = Arc::new(self.schema().project(&indices_with_blob).unwrap()); - - let (left, right) = ShareableRecordBatchStream(self) - .boxed() - // If we are working with blobs then we are probably working with rather large batches - // We don't want to read too far ahead. - .share(Capacity::Bounded(1)); - - let left = left.map(move |batch| match batch { - CloneableResult(Ok(batch)) => { - CloneableResult(Ok(batch.project(&indices_without_blob).unwrap())) - } - CloneableResult(Err(err)) => CloneableResult(Err(err)), - }); - - let right = right.map(move |batch| match batch { - CloneableResult(Ok(batch)) => { - CloneableResult(Ok(batch.project(&indices_with_blob).unwrap())) - } - CloneableResult(Err(err)) => CloneableResult(Err(err)), - }); - - let left = ShareableRecordBatchStreamAdapter::new(left_schema, left); - let right = ShareableRecordBatchStreamAdapter::new(right_schema, right); - (Box::pin(left), Some(Box::pin(right))) - } - } -} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 1544bc3583a..a0dd79fa6c3 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -1476,13 +1476,6 @@ impl FileFragment { schema = schema.project(&projection)?; } - if schema.fields.iter().any(|f| !f.is_default_storage()) { - return Err(Error::NotSupported { - source: "adding columns whose value depends on scanning non-default storage".into(), - location: location!(), - }); - } - // If there is no projection, we at least need to read the row addresses with_row_addr |= !with_row_id && schema.fields.is_empty(); diff --git a/rust/lance/src/dataset/index/frag_reuse.rs b/rust/lance/src/dataset/index/frag_reuse.rs index a2896808857..80f1281a297 100644 --- a/rust/lance/src/dataset/index/frag_reuse.rs +++ b/rust/lance/src/dataset/index/frag_reuse.rs @@ -89,7 +89,6 @@ pub async fn cleanup_frag_reuse_index(dataset: &mut Dataset) -> lance_core::Resu removed_indices: vec![frag_reuse_index_meta.clone()], }, None, - None, ); dataset diff --git a/rust/lance/src/dataset/metadata.rs b/rust/lance/src/dataset/metadata.rs index 3bf1605091f..d800ccce61f 100644 --- a/rust/lance/src/dataset/metadata.rs +++ b/rust/lance/src/dataset/metadata.rs @@ -14,12 +14,7 @@ use lance_core::datatypes::Schema; /// Execute a metadata update operation on a dataset. /// This is moved from Dataset::update_op to keep metadata logic in this module. pub async fn execute_metadata_update(dataset: &mut Dataset, operation: Operation) -> Result<()> { - let transaction = Transaction::new( - dataset.manifest.version, - operation, - /*blobs_op=*/ None, - None, - ); + let transaction = Transaction::new(dataset.manifest.version, operation, None); dataset .apply_commit(transaction, &Default::default(), &Default::default()) .await?; diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index 0d64ebf6189..bfa788bb178 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -609,7 +609,6 @@ async fn reserve_fragment_ids( Operation::ReserveFragments { num_fragments: fragments.len() as u32, }, - /*blob_op=*/ None, None, ); @@ -727,7 +726,7 @@ async fn rewrite_files( params.enable_stable_row_ids = true; } - let new_fragments = write_fragments_internal( + let (mut new_fragments, _) = write_fragments_internal( Some(dataset.as_ref()), dataset.object_store.clone(), &dataset.base, @@ -738,10 +737,6 @@ async fn rewrite_files( ) .await?; - // We should not be rewriting any blob data - assert!(new_fragments.blob.is_none()); - let mut new_fragments = new_fragments.default.0; - log::info!("Compaction task {}: file written", task_id); let (row_id_map, changed_row_addrs) = if let Some(row_ids_rx) = row_ids_rx { @@ -1065,8 +1060,6 @@ pub async fn commit_compaction( rewritten_indices, frag_reuse_index, }, - // TODO: Add a blob compaction pass - /*blob_op= */ None, None, ); diff --git a/rust/lance/src/dataset/optimize/remapping.rs b/rust/lance/src/dataset/optimize/remapping.rs index 5eb332380d5..ee3e8301319 100644 --- a/rust/lance/src/dataset/optimize/remapping.rs +++ b/rust/lance/src/dataset/optimize/remapping.rs @@ -307,7 +307,6 @@ async fn remap_index(dataset: &mut Dataset, index_id: &Uuid) -> Result<()> { removed_indices: vec![curr_index_meta.clone()], }, None, - None, ); dataset diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 5b030f63bec..f2a6ec508f9 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -1606,12 +1606,6 @@ impl Scanner { .empty_projection() .union_columns(filter_columns, OnMissing::Error)? .into_schema(); - if filter_schema.fields.iter().any(|f| !f.is_default_storage()) { - return Err(Error::NotSupported { - source: "non-default storage columns cannot be used as filters".into(), - location: location!(), - }); - } // Start with the desired fields Ok(desired_projection diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index 0ba6056d555..fda48b102a3 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -308,13 +308,7 @@ pub(super) async fn add_columns( .await?; let operation = Operation::Merge { fragments, schema }; - let transaction = Transaction::new( - dataset.manifest.version, - operation, - // TODO: Make it possible to add new blob columns - /*blob_op= */ None, - None, - ); + let transaction = Transaction::new(dataset.manifest.version, operation, None); dataset .apply_commit(transaction, &Default::default(), &Default::default()) .await?; @@ -478,17 +472,6 @@ pub(super) async fn alter_columns( ) })?; - if !field_src.is_default_storage() { - return Err(Error::NotSupported { - source: format!( - "Column \"{}\" is not a default storage column and cannot yet be altered", - alteration.path - ) - .into(), - location: location!(), - }); - } - if let Some(nullable) = alteration.nullable { // TODO: in the future, we could check the values of the column to see if // they are all non-null and thus the column could be made non-nullable. @@ -547,7 +530,6 @@ pub(super) async fn alter_columns( Operation::Project { schema: new_schema }, // TODO: Make it possible to alter blob columns /*blob_op= */ None, - None, ) } else { // Otherwise, we need to re-write the relevant fields. @@ -621,7 +603,6 @@ pub(super) async fn alter_columns( fragments, }, /*blob_op= */ None, - None, ) }; @@ -643,18 +624,7 @@ pub(super) async fn alter_columns( pub(super) async fn drop_columns(dataset: &mut Dataset, columns: &[&str]) -> Result<()> { // Check if columns are present in the dataset and construct the new schema. for col in columns { - if let Some(field) = dataset.schema().field(col) { - if !field.is_default_storage() { - return Err(Error::NotSupported { - source: format!( - "Column \"{}\" is not a default storage column and cannot yet be dropped", - col - ) - .into(), - location: location!(), - }); - } - } else { + if dataset.schema().field(col).is_none() { return Err(Error::invalid_input( format!("Column {} does not exist in the dataset", col), location!(), @@ -676,7 +646,6 @@ pub(super) async fn drop_columns(dataset: &mut Dataset, columns: &[&str]) -> Res dataset.manifest.version, Operation::Project { schema: new_schema }, /*blob_op= */ None, - None, ); dataset diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index f22c7426b19..51cd153ffb3 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -88,23 +88,10 @@ pub struct Transaction { pub read_version: u64, pub uuid: String, pub operation: Operation, - /// If the transaction modified the blobs dataset, this is the operation - /// to apply to the blobs dataset. - /// - /// If this is `None`, then the blobs dataset was not modified - pub blobs_op: Option, pub tag: Option, pub transaction_properties: Option>>, } -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -pub enum BlobsOperation { - /// The operation did not modify the blobs dataset - Unchanged, - /// The operation modified the blobs dataset, contains the new version of the blobs dataset - Updated(u64), -} - #[derive(Debug, Clone, DeepSizeOf, PartialEq)] pub struct DataReplacementGroup(pub u64, pub DataFile); @@ -1430,7 +1417,6 @@ pub struct TransactionBuilder { // uuid is optional for builder since it can autogenerate uuid: Option, operation: Operation, - blobs_op: Option, tag: Option, transaction_properties: Option>>, } @@ -1441,7 +1427,6 @@ impl TransactionBuilder { read_version, uuid: None, operation, - blobs_op: None, tag: None, transaction_properties: None, } @@ -1452,11 +1437,6 @@ impl TransactionBuilder { self } - pub fn blobs_op(mut self, blobs_op: Option) -> Self { - self.blobs_op = blobs_op; - self - } - pub fn tag(mut self, tag: Option) -> Self { self.tag = tag; self @@ -1478,7 +1458,6 @@ impl TransactionBuilder { read_version: self.read_version, uuid, operation: self.operation, - blobs_op: self.blobs_op, tag: self.tag, transaction_properties: self.transaction_properties, } @@ -1490,18 +1469,8 @@ impl Transaction { TransactionBuilder::new(read_version, operation).build() } - pub fn with_blobs_op(self, blobs_op: Option) -> Self { - Self { blobs_op, ..self } - } - - pub fn new( - read_version: u64, - operation: Operation, - blobs_op: Option, - tag: Option, - ) -> Self { + pub fn new(read_version: u64, operation: Operation, tag: Option) -> Self { TransactionBuilder::new(read_version, operation) - .blobs_op(blobs_op) .tag(tag) .build() } @@ -1572,7 +1541,6 @@ impl Transaction { current_indices: Vec, transaction_file_path: &str, config: &ManifestWriteConfig, - new_blob_version: Option, ) -> Result<(Manifest, Vec)> { if config.use_stable_row_ids && current_manifest @@ -2191,12 +2159,8 @@ impl Transaction { let mut manifest = if let Some(current_manifest) = current_manifest { // OVERWRITE with initial_bases on existing dataset is not allowed (caught by validation) // So we always use new_from_previous which preserves base_paths - let mut prev_manifest = Manifest::new_from_previous( - current_manifest, - schema, - Arc::new(final_fragments), - new_blob_version, - ); + let mut prev_manifest = + Manifest::new_from_previous(current_manifest, schema, Arc::new(final_fragments)); if let (Some(user_requested_version), Operation::Overwrite { .. }) = (user_requested_version, &self.operation) @@ -2215,7 +2179,6 @@ impl Transaction { schema, Arc::new(final_fragments), data_storage_format, - new_blob_version, reference_paths, ) }; @@ -3046,51 +3009,10 @@ impl TryFrom for Transaction { }); } }; - let blobs_op = message - .blob_operation - .map(|blob_op| match blob_op { - pb::transaction::BlobOperation::BlobAppend(pb::transaction::Append { - fragments, - }) => Result::Ok(Operation::Append { - fragments: fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - }), - pb::transaction::BlobOperation::BlobOverwrite(pb::transaction::Overwrite { - fragments, - schema, - schema_metadata: _schema_metadata, // TODO: handle metadata - config_upsert_values, - initial_bases, - }) => { - let config_upsert_option = if config_upsert_values.is_empty() { - Some(config_upsert_values) - } else { - None - }; - - Ok(Operation::Overwrite { - fragments: fragments - .into_iter() - .map(Fragment::try_from) - .collect::>>()?, - schema: Schema::from(&Fields(schema)), - config_upsert_values: config_upsert_option, - initial_bases: if initial_bases.is_empty() { - None - } else { - Some(initial_bases.into_iter().map(BasePath::from).collect()) - }, - }) - } - }) - .transpose()?; Ok(Self { read_version: message.read_version, uuid: message.uuid.clone(), operation, - blobs_op, tag: if message.tag.is_empty() { None } else { @@ -3359,40 +3281,6 @@ impl From<&Transaction> for pb::Transaction { } }; - let blob_operation = value.blobs_op.as_ref().map(|op| match op { - Operation::Append { fragments } => { - pb::transaction::BlobOperation::BlobAppend(pb::transaction::Append { - fragments: fragments.iter().map(pb::DataFragment::from).collect(), - }) - } - Operation::Overwrite { - fragments, - schema, - config_upsert_values, - initial_bases, - } => { - pb::transaction::BlobOperation::BlobOverwrite(pb::transaction::Overwrite { - fragments: fragments.iter().map(pb::DataFragment::from).collect(), - schema: Fields::from(schema).0, - schema_metadata: Default::default(), // TODO: handle metadata - config_upsert_values: config_upsert_values - .clone() - .unwrap_or(Default::default()), - initial_bases: initial_bases - .as_ref() - .map(|paths| { - paths - .iter() - .cloned() - .map(|bp: BasePath| -> pb::BasePath { bp.into() }) - .collect::>() - }) - .unwrap_or_default(), - }) - } - _ => panic!("Invalid blob operation: {:?}", value), - }); - let transaction_properties = value .transaction_properties .as_ref() @@ -3402,7 +3290,6 @@ impl From<&Transaction> for pb::Transaction { read_version: value.read_version, uuid: value.uuid.clone(), operation: Some(operation), - blob_operation, tag: value.tag.clone().unwrap_or("".to_string()), transaction_properties, } @@ -3686,7 +3573,6 @@ mod tests { LanceSchema::try_from(&schema).unwrap(), Arc::new(original_fragments), DataStorageFormat::new(LanceFileVersion::V2_0), - None, HashMap::new(), ); diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index defb647e012..4ffe37f39e3 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -6,9 +6,7 @@ use chrono::TimeDelta; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream; use futures::{Stream, StreamExt, TryStreamExt}; -use lance_core::datatypes::{ - NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions, StorageClass, -}; +use lance_core::datatypes::{NullabilityComparison, SchemaCompareOptions}; use lance_core::error::LanceOptionExt; use lance_core::utils::tempfile::TempDir; use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_DATA, TRACE_FILE_AUDIT}; @@ -35,7 +33,6 @@ use tracing::{info, instrument}; use crate::session::Session; use crate::Dataset; -use super::blob::BlobStreamExt; use super::fragment::write::generate_random_filename; use super::progress::{NoopFragmentWriteProgress, WriteFragmentProgress}; use super::transaction::Transaction; @@ -441,13 +438,6 @@ pub async fn do_write_fragments( Ok(fragments) } -pub struct WrittenFragments { - /// The fragments written to the dataset (and the schema) - pub default: (Vec, Schema), - /// The fragments written to the blob dataset, if any - pub blob: Option<(Vec, Schema)>, -} - pub async fn validate_and_resolve_target_bases( params: &mut WriteParams, existing_base_paths: Option<&HashMap>, @@ -571,7 +561,7 @@ pub async fn write_fragments_internal( data: SendableRecordBatchStream, mut params: WriteParams, target_bases_info: Option>, -) -> Result { +) -> Result<(Vec, Schema)> { let adapter = SchemaAdapter::new(data.schema()); let (data, converted_schema) = if adapter.requires_physical_conversion() { @@ -604,12 +594,26 @@ pub async fn write_fragments_internal( ..Default::default() }, )?; - // Project from the dataset schema, because it has the correct field ids. - let write_schema = dataset.schema().project_by_schema( - &converted_schema, - OnMissing::Error, - OnTypeMismatch::Error, - )?; + let dataset_schema = dataset.schema(); + let mut write_fields = Vec::with_capacity(converted_schema.fields.len()); + for field in converted_schema.fields.iter() { + let dataset_field = + dataset_schema + .field(&field.name) + .ok_or_else(|| Error::SchemaMismatch { + difference: format!( + "Column '{}' not found in target schema", + field.name + ), + location: location!(), + })?; + write_fields.push(dataset_field.clone()); + } + + let write_schema = Schema { + fields: write_fields, + metadata: dataset_schema.metadata.clone(), + }; // Use the storage version from the dataset, ignoring any version from the user. let data_storage_version = dataset .manifest() @@ -636,67 +640,18 @@ pub async fn write_fragments_internal( (converted_schema, params.storage_version_or_default()) }; - let data_schema = schema.project_by_schema( - data.schema().as_ref(), - OnMissing::Error, - OnTypeMismatch::Error, - )?; - - let (data, blob_data) = data.extract_blob_stream(&data_schema); - - // Some params we borrow from the normal write, some we override - let blob_write_params = WriteParams { - store_params: params.store_params.clone(), - commit_handler: params.commit_handler.clone(), - data_storage_version: params.data_storage_version, - enable_stable_row_ids: true, - // This shouldn't really matter since all commits are detached - enable_v2_manifest_paths: true, - max_bytes_per_file: params.max_bytes_per_file, - max_rows_per_file: params.max_rows_per_file, - ..Default::default() - }; - - if blob_data.is_some() && !params.enable_stable_row_ids { - return Err(Error::invalid_input( - "The blob storage class requires stable row ids", - location!(), - )); - } - - let frag_schema = schema.retain_storage_class(StorageClass::Default); - let fragments_fut = do_write_fragments( - object_store.clone(), + let fragments = do_write_fragments( + object_store, base_dir, - &frag_schema, + &schema, data, params, storage_version, target_bases_info, - ); + ) + .await?; - let (default, blob) = if let Some(blob_data) = blob_data { - let blob_schema = schema.retain_storage_class(StorageClass::Blob); - let blobs_path = base_dir.child("_blobs"); - let blob_fut = do_write_fragments( - object_store, - &blobs_path, - &blob_schema, - blob_data, - blob_write_params, - storage_version, - None, // Blobs don't use target_bases - ); - let (fragments_res, blobs_res) = futures::join!(fragments_fut, blob_fut); - let fragments = fragments_res?; - let blobs = blobs_res?; - ((fragments, frag_schema), Some((blobs, blob_schema))) - } else { - let fragments = fragments_fut.await?; - ((fragments, frag_schema), None) - }; - - Ok(WrittenFragments { default, blob }) + Ok((fragments, schema)) } #[async_trait::async_trait] @@ -1210,10 +1165,7 @@ mod tests { .into_reader_rows(RowCount::from(10 * 1024), BatchCount::from(2)), ); - let written = reader_to_frags(data_reader).await.unwrap(); - - assert!(written.blob.is_none()); - let fragments = written.default.0; + let (fragments, _) = reader_to_frags(data_reader).await.unwrap(); assert_eq!(fragments.len(), 2); } @@ -1257,7 +1209,7 @@ mod tests { let schema = Schema::try_from(schema.as_ref()).unwrap(); let object_store = Arc::new(ObjectStore::memory()); - let written = write_fragments_internal( + let (fragments, _) = write_fragments_internal( None, object_store, &Path::from("test"), @@ -1269,9 +1221,6 @@ mod tests { .await .unwrap(); - assert!(written.blob.is_none()); - let fragments = written.default.0; - assert_eq!(fragments.len(), 1); let fragment = &fragments[0]; assert_eq!(fragment.files.len(), 1); @@ -1335,7 +1284,7 @@ mod tests { let object_store = Arc::new(ObjectStore::memory()); let base_path = Path::from("test"); - let written = write_fragments_internal( + let (fragments, _) = write_fragments_internal( None, object_store.clone(), &base_path, @@ -1347,9 +1296,6 @@ mod tests { .await .unwrap(); - assert!(written.blob.is_none()); - let fragments = written.default.0; - assert_eq!(fragments.len(), 1); let fragment = &fragments[0]; assert_eq!(fragment.files.len(), 1); diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 15c1e0e8e31..7e49cf5ee98 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -436,21 +436,6 @@ impl<'a> CommitBuilder<'a> { } let read_version = transactions.iter().map(|t| t.read_version).min().unwrap(); - let blob_new_frags = transactions - .iter() - .flat_map(|t| &t.blobs_op) - .flat_map(|b| match b { - Operation::Append { fragments } => fragments.clone(), - _ => unreachable!(), - }) - .collect::>(); - let blobs_op = if blob_new_frags.is_empty() { - None - } else { - Some(Operation::Append { - fragments: blob_new_frags, - }) - }; let merged = Transaction { uuid: uuid::Uuid::new_v4().hyphenated().to_string(), @@ -464,7 +449,6 @@ impl<'a> CommitBuilder<'a> { .collect(), }, read_version, - blobs_op, tag: None, //TODO: handle batch transaction merges in the future transaction_properties: None, @@ -529,7 +513,6 @@ mod tests { fragments: vec![sample_fragment()], }, read_version, - blobs_op: None, tag: None, transaction_properties: None, } @@ -794,7 +777,6 @@ mod tests { update_mode: None, }, read_version: 1, - blobs_op: None, tag: None, transaction_properties: None, }; @@ -822,6 +804,5 @@ mod tests { matches!(transaction.operation, Operation::Append { fragments } if fragments == expected_fragments) ); assert_eq!(transaction.read_version, 1); - assert!(transaction.blobs_op.is_none()); } } diff --git a/rust/lance/src/dataset/write/delete.rs b/rust/lance/src/dataset/write/delete.rs index 9636011f734..588f5248b72 100644 --- a/rust/lance/src/dataset/write/delete.rs +++ b/rust/lance/src/dataset/write/delete.rs @@ -234,12 +234,7 @@ impl RetryExecutor for DeleteJob { deleted_fragment_ids: data.deleted_fragment_ids, predicate: self.predicate.clone(), }; - let transaction = Transaction::new( - dataset.manifest.version, - operation, - /*blobs_op=*/ None, - None, - ); + let transaction = Transaction::new(dataset.manifest.version, operation, None); let mut builder = CommitBuilder::new(dataset); diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index b8c9c225724..b9c4ffc7334 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -4,21 +4,17 @@ use std::collections::HashMap; use std::sync::Arc; -use arrow_array::RecordBatch; -use arrow_array::RecordBatchIterator; +use arrow_array::{RecordBatch, RecordBatchIterator}; use datafusion::execution::SendableRecordBatchStream; use humantime::format_duration; -use lance_core::datatypes::NullabilityComparison; -use lance_core::datatypes::Schema; -use lance_core::datatypes::SchemaCompareOptions; +use lance_core::datatypes::{NullabilityComparison, Schema, SchemaCompareOptions}; use lance_core::utils::tracing::{DATASET_WRITING_EVENT, TRACE_DATASET_EVENTS}; -use lance_core::ROW_ADDR; -use lance_core::ROW_ID; -use lance_core::ROW_OFFSET; +use lance_core::{ROW_ADDR, ROW_ID, ROW_OFFSET}; use lance_datafusion::utils::StreamingWriteSource; use lance_file::version::LanceFileVersion; use lance_io::object_store::ObjectStore; use lance_table::feature_flags::can_write_dataset; +use lance_table::format::Fragment; use lance_table::io::commit::CommitHandler; use object_store::path::Path; use snafu::location; @@ -36,8 +32,6 @@ use super::resolve_commit_handler; use super::WriteDestination; use super::WriteMode; use super::WriteParams; -use super::WrittenFragments; - /// Insert or create a new dataset. /// /// There are different variants of `execute()` methods. Those with the `_stream` @@ -199,7 +193,7 @@ impl<'a> InsertBuilder<'a> { let target_base_info = validate_and_resolve_target_bases(&mut context.params, existing_base_paths).await?; - let written_frags = write_fragments_internal( + let (written_fragments, _) = write_fragments_internal( context.dest.dataset(), context.object_store.clone(), &context.base_path, @@ -210,14 +204,14 @@ impl<'a> InsertBuilder<'a> { ) .await?; - let transaction = Self::build_transaction(schema, written_frags, &context)?; + let transaction = Self::build_transaction(schema, written_fragments, &context)?; Ok((transaction, context)) } fn build_transaction( schema: Schema, - written_frags: WrittenFragments, + fragments: Vec, context: &WriteContext<'_>, ) -> Result { let operation = match context.params.mode { @@ -255,7 +249,7 @@ impl<'a> InsertBuilder<'a> { Operation::Overwrite { // Use the full schema, not the written schema schema, - fragments: written_frags.default.0, + fragments, config_upsert_values, initial_bases: context.params.initial_bases.clone(), } @@ -264,26 +258,14 @@ impl<'a> InsertBuilder<'a> { Operation::Overwrite { // Use the full schema, not the written schema schema, - fragments: written_frags.default.0, + fragments, config_upsert_values: None, initial_bases: context.params.initial_bases.clone(), } } - WriteMode::Append => Operation::Append { - fragments: written_frags.default.0, - }, + WriteMode::Append => Operation::Append { fragments }, }; - let blobs_op = written_frags.blob.map(|blob| match context.params.mode { - WriteMode::Create | WriteMode::Overwrite => Operation::Overwrite { - schema: blob.1, - fragments: blob.0, - config_upsert_values: None, - initial_bases: context.params.initial_bases.clone(), - }, - WriteMode::Append => Operation::Append { fragments: blob.0 }, - }); - let transaction = TransactionBuilder::new( context .dest @@ -292,7 +274,6 @@ impl<'a> InsertBuilder<'a> { .unwrap_or(0), operation, ) - .blobs_op(blobs_op) .transaction_properties(context.params.transaction_properties.clone()) .build(); @@ -328,26 +309,16 @@ impl<'a> InsertBuilder<'a> { ); context.params.enable_stable_row_ids = dataset.manifest.uses_stable_row_ids(); } - let m = dataset.manifest.as_ref(); - let mut schema_cmp_opts = SchemaCompareOptions { - // In the legacy format we stored the dictionary in the manifest and - // all files must have identical dictionaries. - // - // In 2.0+ the dictionary is stored in the files and dictionaries may - // fluctuate between files. - compare_dictionary: m.should_use_legacy_format(), - // array nullability is checked later, using actual data instead - // of the schema + + let schema_cmp_opts = SchemaCompareOptions { + compare_dictionary: dataset.manifest.should_use_legacy_format(), compare_nullability: NullabilityComparison::Ignore, + allow_missing_if_nullable: true, + ignore_field_order: true, ..Default::default() }; - if m.blob_dataset_version.is_none() { - // Balanced datasets don't yet support schema evolution - schema_cmp_opts.ignore_field_order = true; - schema_cmp_opts.allow_missing_if_nullable = true; - } - data_schema.check_compatible(&m.schema, &schema_cmp_opts)?; + data_schema.check_compatible(dataset.schema(), &schema_cmp_opts)?; } } @@ -365,15 +336,6 @@ impl<'a> InsertBuilder<'a> { } } - // If we are writing a dataset with non-default storage, we need to enable stable row ids - if context.dest.dataset().is_none() - && !context.params.enable_stable_row_ids - && data_schema.fields.iter().any(|f| !f.is_default_storage()) - { - log::info!("Enabling stable row ids because non-default storage is used"); - context.params.enable_stable_row_ids = true; - } - // Feature flags if let WriteDestination::Dataset(dataset) = &context.dest { if !can_write_dataset(dataset.manifest.writer_feature_flags) { diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 340e60cdff9..4015acc5649 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -1097,7 +1097,7 @@ impl MergeInsertJob { OnTypeMismatch::Error, )?; - let fragments = write_fragments_internal( + let (fragments, _) = write_fragments_internal( Some(dataset.as_ref()), dataset.object_store.clone(), &dataset.base, @@ -1108,7 +1108,7 @@ impl MergeInsertJob { ) .await?; - new_fragments.lock().unwrap().extend(fragments.default.0); + new_fragments.lock().unwrap().extend(fragments); Ok(reservation_size) } // We shouldn't need much more memory beyond what is already in the batches. @@ -1487,7 +1487,7 @@ impl MergeInsertJob { // we can't use affected rows here. (operation, None) } else { - let written = write_fragments_internal( + let (mut new_fragments, _) = write_fragments_internal( Some(&self.dataset), self.dataset.object_store.clone(), &self.dataset.base, @@ -1498,9 +1498,6 @@ impl MergeInsertJob { ) .await?; - assert!(written.blob.is_none()); - let mut new_fragments = written.default.0; - if let Some(row_id_sequence) = updating_row_ids.lock().unwrap().row_id_sequence() { let fragment_sizes = new_fragments .iter() @@ -1570,12 +1567,7 @@ impl MergeInsertJob { .into_inner() .unwrap(); - let transaction = Transaction::new( - self.dataset.manifest.version, - operation, - /*blobs_op=*/ None, - None, - ); + let transaction = Transaction::new(self.dataset.manifest.version, operation, None); Ok(UncommittedMergeInsert { transaction, diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs index 0df589ce71a..6644fefaf68 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs @@ -826,7 +826,7 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { let result_stream = stream::once(async move { // Step 2: Write new fragments using the filtered data (inserts + updates) - let write_result = write_fragments_internal( + let (mut new_fragments, _) = write_fragments_internal( Some(&dataset), dataset.object_store.clone(), &dataset.base, @@ -837,8 +837,6 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { ) .await?; - let mut new_fragments = write_result.default.0; - if let Some(row_id_sequence) = updating_row_ids.lock().unwrap().row_id_sequence() { let fragment_sizes = new_fragments .iter() @@ -894,12 +892,7 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { }; // Step 5: Create and store the transaction - let transaction = Transaction::new( - dataset.manifest.version, - operation, - /*blobs_op=*/ None, - None, - ); + let transaction = Transaction::new(dataset.manifest.version, operation, None); // Step 6: Store transaction, merge stats, and affected rows for later retrieval { diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index 5d40570ac91..ea27b2d7cc1 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -203,19 +203,6 @@ impl UpdateBuilder { // pub fn with_write_params(mut self, params: WriteParams) -> Self { ... } pub fn build(self) -> Result { - if self - .dataset - .schema() - .fields - .iter() - .any(|f| !f.is_default_storage()) - { - return Err(Error::NotSupported { - source: "Updating datasets containing non-default storage columns".into(), - location: location!(), - }); - } - let mut updates = HashMap::new(); let planner = Planner::new(Arc::new(self.dataset.schema().into())); @@ -322,7 +309,7 @@ impl UpdateJob { .manifest() .data_storage_format .lance_file_version()?; - let written = write_fragments_internal( + let (mut new_fragments, _) = write_fragments_internal( Some(&self.dataset), self.dataset.object_store.clone(), &self.dataset.base, @@ -333,14 +320,6 @@ impl UpdateJob { ) .await?; - if written.blob.is_some() { - return Err(Error::NotSupported { - source: "Updating blob columns".into(), - location: location!(), - }); - } - let mut new_fragments = written.default.0; - let removed_row_ids = row_id_rx.try_recv().map_err(|err| Error::Internal { message: format!("Failed to receive row ids: {}", err), location: location!(), @@ -414,12 +393,7 @@ impl UpdateJob { update_mode: Some(RewriteRows), }; - let transaction = Transaction::new( - dataset.manifest.version, - operation, - /*blobs_op=*/ None, - None, - ); + let transaction = Transaction::new(dataset.manifest.version, operation, None); let new_dataset = CommitBuilder::new(dataset) .with_affected_rows(update_data.affected_rows) diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 7c124a0f4cc..9b16b4cb91d 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -450,7 +450,6 @@ impl DatasetIndexExt for Dataset { new_indices: vec![], removed_indices: indices.clone(), }, - /*blobs_op= */ None, None, ); @@ -558,7 +557,6 @@ impl DatasetIndexExt for Dataset { new_indices: vec![new_idx], removed_indices: vec![], }, - /*blobs_op= */ None, None, ); @@ -686,7 +684,6 @@ impl DatasetIndexExt for Dataset { new_indices, removed_indices, }, - /*blobs_op= */ None, None, ); diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 76f1fba3d34..af9ab02ad05 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -355,7 +355,6 @@ impl<'a> CreateIndexBuilder<'a> { new_indices: vec![new_idx], removed_indices: vec![], }, - /*blobs_op= */ None, None, ); diff --git a/rust/lance/src/index/mem_wal.rs b/rust/lance/src/index/mem_wal.rs index c310d2aa0dc..bb1d93b3834 100644 --- a/rust/lance/src/index/mem_wal.rs +++ b/rust/lance/src/index/mem_wal.rs @@ -90,7 +90,6 @@ pub async fn create_mem_wal_generation( removed: vec![], }, None, - None, ); dataset @@ -206,7 +205,6 @@ pub async fn advance_mem_wal_generation( removed: removed_mem_wal.into_iter().collect(), }, None, - None, ) } else { // this is the first time the MemWAL index is created @@ -232,7 +230,6 @@ pub async fn advance_mem_wal_generation( removed: vec![], }, None, - None, ) }; @@ -478,7 +475,6 @@ pub async fn trim_mem_wal_index(dataset: &mut Dataset) -> Result<()> { removed, }, None, - None, ); dataset @@ -514,7 +510,6 @@ where removed: vec![mem_wal.clone()], }, None, - None, ); dataset diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index bde9d1e7d25..8692b7c4484 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -1190,7 +1190,6 @@ pub async fn initialize_vector_index( removed_indices: vec![], }, None, - None, ); target_dataset diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 1e954b51c44..f9652eb5048 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -2377,7 +2377,6 @@ mod tests { removed_indices: vec![], }, None, - None, ); // Apply the transaction to register the index @@ -2480,7 +2479,6 @@ mod tests { removed_indices: vec![], }, None, - None, ); // Apply the transaction to register the new index diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 37a0fead258..444f9386c9f 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -45,7 +45,7 @@ use crate::dataset::cleanup::auto_cleanup_hook; use crate::dataset::fragment::FileFragment; use crate::dataset::transaction::{Operation, Transaction}; use crate::dataset::{ - load_new_transactions, write_manifest_file, ManifestWriteConfig, NewTransactionResult, BLOB_DIR, + load_new_transactions, write_manifest_file, ManifestWriteConfig, NewTransactionResult, }; use crate::index::DatasetIndexInternalExt; use crate::io::deletion::read_dataset_deletion_file; @@ -107,7 +107,6 @@ async fn do_commit_new_dataset( transaction: &Transaction, write_config: &ManifestWriteConfig, manifest_naming_scheme: ManifestNamingScheme, - blob_version: Option, metadata_cache: &DSMetadataCache, store_registry: Arc, ) -> Result<(Manifest, ManifestLocation)> { @@ -166,18 +165,11 @@ async fn do_commit_new_dataset( }; (new_manifest, updated_indices) } else { - let (manifest, indices) = transaction.build_manifest( - None, - vec![], - &transaction_file, - write_config, - blob_version, - )?; + let (manifest, indices) = + transaction.build_manifest(None, vec![], &transaction_file, write_config)?; (manifest, indices) }; - manifest.blob_dataset_version = blob_version; - let result = write_manifest_file( object_store, commit_handler, @@ -232,26 +224,6 @@ pub(crate) async fn commit_new_dataset( metadata_cache: &crate::session::caches::DSMetadataCache, store_registry: Arc, ) -> Result<(Manifest, ManifestLocation)> { - let blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() { - let blob_path = base_path.child(BLOB_DIR); - let blob_tx = Transaction::new(0, blob_op.clone(), None, None); - let (blob_manifest, _) = do_commit_new_dataset( - object_store, - commit_handler, - &blob_path, - &blob_tx, - write_config, - manifest_naming_scheme, - None, - metadata_cache, - store_registry.clone(), - ) - .await?; - Some(blob_manifest.version) - } else { - None - }; - do_commit_new_dataset( object_store, commit_handler, @@ -259,7 +231,6 @@ pub(crate) async fn commit_new_dataset( transaction, write_config, manifest_naming_scheme, - blob_version, metadata_cache, store_registry, ) @@ -637,7 +608,6 @@ pub(crate) async fn do_commit_detached_transaction( transaction: &Transaction, write_config: &ManifestWriteConfig, commit_config: &CommitConfig, - new_blob_version: Option, ) -> Result<(Manifest, ManifestLocation)> { // We don't strictly need a transaction file but we go ahead and create one for // record-keeping if nothing else. @@ -666,7 +636,6 @@ pub(crate) async fn do_commit_detached_transaction( dataset.load_indices().await?.as_ref().clone(), &transaction_file, write_config, - new_blob_version, )?, }; @@ -733,25 +702,6 @@ pub(crate) async fn commit_detached_transaction( write_config: &ManifestWriteConfig, commit_config: &CommitConfig, ) -> Result<(Manifest, ManifestLocation)> { - let new_blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() { - let blobs_dataset = dataset.blobs_dataset().await?.unwrap(); - let blobs_tx = - Transaction::new(blobs_dataset.version().version, blob_op.clone(), None, None); - let (blobs_manifest, _) = do_commit_detached_transaction( - blobs_dataset.as_ref(), - object_store, - commit_handler, - &blobs_tx, - write_config, - commit_config, - None, - ) - .await?; - Some(blobs_manifest.version) - } else { - None - }; - do_commit_detached_transaction( dataset, object_store, @@ -759,7 +709,6 @@ pub(crate) async fn commit_detached_transaction( transaction, write_config, commit_config, - new_blob_version, ) .await } @@ -790,25 +739,6 @@ pub(crate) async fn commit_transaction( manifest_naming_scheme: ManifestNamingScheme, affected_rows: Option<&RowIdTreeMap>, ) -> Result<(Manifest, ManifestLocation)> { - let new_blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() { - let blobs_dataset = dataset.blobs_dataset().await?.unwrap(); - let blobs_tx = - Transaction::new(blobs_dataset.version().version, blob_op.clone(), None, None); - let (blobs_manifest, _) = do_commit_detached_transaction( - blobs_dataset.as_ref(), - object_store, - commit_handler, - &blobs_tx, - write_config, - commit_config, - None, - ) - .await?; - Some(blobs_manifest.version) - } else { - None - }; - // Note: object_store has been configured with WriteParams, but dataset.object_store() // has not necessarily. So for anything involving writing, use `object_store`. let read_version = transaction.read_version; @@ -891,7 +821,6 @@ pub(crate) async fn commit_transaction( dataset.load_indices().await?.as_ref().clone(), &transaction_file, write_config, - new_blob_version, )?, }; @@ -1165,7 +1094,6 @@ mod tests { let transaction = Transaction::new( 42, Operation::Append { fragments: vec![] }, - /*blobs_op= */ None, Some("hello world".to_string()), ); @@ -1567,7 +1495,6 @@ mod tests { schema, Arc::new(fragments), DataStorageFormat::default(), - /*blob_dataset_version=*/ None, HashMap::new(), ); diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 84110c69249..5e98b634f4e 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -2211,7 +2211,7 @@ mod tests { ]; let other_transactions = other_operations .iter() - .map(|op| Transaction::new(0, op.clone(), None, None)) + .map(|op| Transaction::new(0, op.clone(), None)) .collect::>(); // Transactions and whether they are expected to conflict with each @@ -2569,7 +2569,7 @@ mod tests { ]; for (operation, expected_conflicts) in &cases { - let transaction = Transaction::new(0, operation.clone(), None, None); + let transaction = Transaction::new(0, operation.clone(), None); let mut rebase = TransactionRebase { transaction, initial_fragments: HashMap::new(), diff --git a/rust/lance/src/io/exec.rs b/rust/lance/src/io/exec.rs index 08cf3b8edad..ca6af3f5e9f 100644 --- a/rust/lance/src/io/exec.rs +++ b/rust/lance/src/io/exec.rs @@ -31,4 +31,3 @@ pub use rowids::{AddRowAddrExec, AddRowOffsetExec}; pub use scan::{LanceScanConfig, LanceScanExec}; pub use take::TakeExec; pub use utils::PreFilterSource; -pub(crate) use utils::{ShareableRecordBatchStream, ShareableRecordBatchStreamAdapter}; From ec77a09663cf30851182c0fd56aeb286937df11c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 5 Nov 2025 00:48:01 +0800 Subject: [PATCH 2/5] Remove more logic Signed-off-by: Xuanwo --- .../java/com/lancedb/lance/TestUtils.java | 7 ++-- rust/lance-core/src/datatypes/field.rs | 33 +----------------- rust/lance-file/src/datatypes.rs | 34 +++---------------- 3 files changed, 7 insertions(+), 67 deletions(-) diff --git a/java/src/test/java/com/lancedb/lance/TestUtils.java b/java/src/test/java/com/lancedb/lance/TestUtils.java index e474572689b..540cef24b01 100644 --- a/java/src/test/java/com/lancedb/lance/TestUtils.java +++ b/java/src/test/java/com/lancedb/lance/TestUtils.java @@ -628,11 +628,8 @@ public static final class BlobTestDataset { /** * Build the Arrow schema with a filter column and a blob column marked as blob storage. * - *

Columns: - filterer: Int64 (not nullable) - blobs: Binary (nullable) with metadata - * {"lance-schema:storage-class":"blob"} - * - *

Note: ArrowType.LargeBinary may not be available in our Arrow Java version; Binary is - * sufficient for tests and aligns with Lance blob storage when annotated via metadata. + *

Columns: - filterer: Int64 (not nullable) - blobs: LargeBinary (nullable) annotated with + * metadata {"lance-encoding:blob":"true"} */ public Schema getSchema() { Map blobMeta = Maps.newHashMap(); diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index e996326f096..717a7e497c5 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -31,8 +31,6 @@ use super::{ }; use crate::{datatypes::BLOB_DESC_LANCE_FIELD, Error, Result}; -const LEGACY_STORAGE_CLASS_METADATA_KEY: &str = "lance-schema:storage-class"; - /// Use this config key in Arrow field metadata to indicate a column is a part of the primary key. /// The value can be any true values like `true`, `1`, `yes` (case-insensitive). /// A primary key column must satisfy: @@ -931,24 +929,12 @@ impl TryFrom<&ArrowField> for Field { DataType::LargeList(item) => vec![Self::try_from(item.as_ref())?], _ => vec![], }; - let mut metadata = field.metadata().clone(); + let metadata = field.metadata().clone(); let unenforced_primary_key = metadata .get(LANCE_UNENFORCED_PRIMARY_KEY) .map(|s| matches!(s.to_lowercase().as_str(), "true" | "1" | "yes")) .unwrap_or(false); - if let Some(value) = metadata - .get(LEGACY_STORAGE_CLASS_METADATA_KEY) - .map(|v| v.to_ascii_lowercase()) - { - if value == "blob" { - metadata - .entry(lance_arrow::BLOB_META_KEY.to_string()) - .or_insert_with(|| "true".to_string()); - } - metadata.remove(LEGACY_STORAGE_CLASS_METADATA_KEY); - } - // Check for JSON extension types (both Arrow and Lance) let logical_type = if is_arrow_json_field(field) || is_json_field(field) { LogicalType::from("json") @@ -990,7 +976,6 @@ impl From<&Field> for ArrowField { fn from(field: &Field) -> Self { let out = Self::new(&field.name, field.data_type(), field.nullable); let mut metadata = field.metadata.clone(); - metadata.remove(LEGACY_STORAGE_CLASS_METADATA_KEY); // Add JSON extension metadata if this is a JSON field if field.logical_type.0 == "json" { @@ -1122,22 +1107,6 @@ mod tests { assert_eq!(ArrowField::from(&field), arrow_field); } - #[test] - fn legacy_storage_class_metadata_sets_blob() { - let metadata = HashMap::from([( - super::LEGACY_STORAGE_CLASS_METADATA_KEY.to_string(), - "blob".to_string(), - )]); - let arrow_field = - ArrowField::new("blob", DataType::LargeBinary, false).with_metadata(metadata); - let field = Field::try_from(&arrow_field).unwrap(); - assert!(field.is_blob()); - assert_eq!( - field.metadata.get(lance_arrow::BLOB_META_KEY), - Some(&"true".to_string()) - ); - } - #[test] fn test_project_by_field_null_type() { let f1: Field = ArrowField::new("a", DataType::Null, true) diff --git a/rust/lance-file/src/datatypes.rs b/rust/lance-file/src/datatypes.rs index a047c2e6df8..be8fe557ff1 100644 --- a/rust/lance-file/src/datatypes.rs +++ b/rust/lance-file/src/datatypes.rs @@ -15,12 +15,10 @@ use snafu::location; use crate::format::pb; -const LEGACY_STORAGE_CLASS_METADATA_KEY: &str = "lance-schema:storage-class"; - #[allow(clippy::fallible_impl_from)] impl From<&pb::Field> for Field { fn from(field: &pb::Field) -> Self { - let mut lance_metadata: HashMap = field + let lance_metadata: HashMap = field .metadata .iter() .map(|(key, value)| { @@ -28,22 +26,10 @@ impl From<&pb::Field> for Field { (key.clone(), string_value) }) .collect(); + let mut lance_metadata = lance_metadata; if !field.extension_name.is_empty() { lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone()); } - let legacy_blob_marker = lance_metadata - .get(LEGACY_STORAGE_CLASS_METADATA_KEY) - .map(|value| value.eq_ignore_ascii_case("blob")) - .unwrap_or(false); - if legacy_blob_marker { - lance_metadata - .entry(lance_arrow::BLOB_META_KEY.to_string()) - .or_insert_with(|| "true".to_string()); - } - // Drop the legacy storage class metadata key after translating it to blob semantics. - if legacy_blob_marker { - lance_metadata.remove(LEGACY_STORAGE_CLASS_METADATA_KEY); - } Self { name: field.name.clone(), id: field.id, @@ -70,7 +56,6 @@ impl From<&Field> for pb::Field { let pb_metadata = field .metadata .iter() - .filter(|(key, _)| key.as_str() != LEGACY_STORAGE_CLASS_METADATA_KEY) .map(|(key, value)| (key.clone(), value.clone().into_bytes())) .collect(); Self { @@ -282,7 +267,7 @@ mod tests { use arrow_schema::Schema as ArrowSchema; use lance_core::datatypes::Schema; - use super::{Field, Fields, FieldsWithMeta, LEGACY_STORAGE_CLASS_METADATA_KEY}; + use super::{Field, Fields, FieldsWithMeta}; use crate::format::pb; #[test] @@ -328,25 +313,14 @@ mod tests { } #[test] - fn legacy_proto_storage_class_sets_blob_metadata() { + fn large_binary_fields_marked_blob() { let proto = pb::Field { name: "blob".to_string(), logical_type: "large_binary".to_string(), - metadata: HashMap::from([( - LEGACY_STORAGE_CLASS_METADATA_KEY.to_string(), - b"blob".to_vec(), - )]), nullable: true, ..Default::default() }; let field = Field::from(&proto); assert!(field.is_blob()); - assert_eq!( - field.metadata.get(lance_arrow::BLOB_META_KEY), - Some(&"true".to_string()) - ); - assert!(!field - .metadata - .contains_key(LEGACY_STORAGE_CLASS_METADATA_KEY)); } } From 0ef0590fdb43c0f784a775d6b7f752ec40046ae6 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 5 Nov 2025 00:58:21 +0800 Subject: [PATCH 3/5] Refactor a fnction Signed-off-by: Xuanwo --- rust/lance/src/dataset/write.rs | 29 ++++++++--------------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 4ffe37f39e3..74d028a93dc 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -6,7 +6,9 @@ use chrono::TimeDelta; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream; use futures::{Stream, StreamExt, TryStreamExt}; -use lance_core::datatypes::{NullabilityComparison, SchemaCompareOptions}; +use lance_core::datatypes::{ + NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions, +}; use lance_core::error::LanceOptionExt; use lance_core::utils::tempfile::TempDir; use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_DATA, TRACE_FILE_AUDIT}; @@ -594,26 +596,11 @@ pub async fn write_fragments_internal( ..Default::default() }, )?; - let dataset_schema = dataset.schema(); - let mut write_fields = Vec::with_capacity(converted_schema.fields.len()); - for field in converted_schema.fields.iter() { - let dataset_field = - dataset_schema - .field(&field.name) - .ok_or_else(|| Error::SchemaMismatch { - difference: format!( - "Column '{}' not found in target schema", - field.name - ), - location: location!(), - })?; - write_fields.push(dataset_field.clone()); - } - - let write_schema = Schema { - fields: write_fields, - metadata: dataset_schema.metadata.clone(), - }; + let write_schema = dataset.schema().project_by_schema( + &converted_schema, + OnMissing::Error, + OnTypeMismatch::Error, + )?; // Use the storage version from the dataset, ignoring any version from the user. let data_storage_version = dataset .manifest() From 349f34a4162579ea8aebb68f074e1c2f0c7c7836 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 5 Nov 2025 01:05:19 +0800 Subject: [PATCH 4/5] remove wrong tests Signed-off-by: Xuanwo --- rust/lance-file/src/datatypes.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/rust/lance-file/src/datatypes.rs b/rust/lance-file/src/datatypes.rs index be8fe557ff1..3cb37a5e659 100644 --- a/rust/lance-file/src/datatypes.rs +++ b/rust/lance-file/src/datatypes.rs @@ -259,8 +259,6 @@ pub async fn populate_schema_dictionary(schema: &mut Schema, reader: &dyn Reader #[cfg(test)] mod tests { - use std::collections::HashMap; - use arrow_schema::DataType; use arrow_schema::Field as ArrowField; use arrow_schema::Fields as ArrowFields; @@ -311,16 +309,4 @@ mod tests { let schema = Schema::from(fields_with_meta); assert_eq!(expected_schema, schema); } - - #[test] - fn large_binary_fields_marked_blob() { - let proto = pb::Field { - name: "blob".to_string(), - logical_type: "large_binary".to_string(), - nullable: true, - ..Default::default() - }; - let field = Field::from(&proto); - assert!(field.is_blob()); - } } From 9817357839a50169a4e35c1a9b20a8cf331bd6bd Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 5 Nov 2025 01:21:00 +0800 Subject: [PATCH 5/5] Code cleanup Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes/field.rs | 2 - rust/lance-core/src/datatypes/schema.rs | 50 -------------------- rust/lance-file/src/datatypes.rs | 7 ++- rust/lance-table/src/format/manifest.rs | 10 ---- rust/lance/src/dataset.rs | 4 -- rust/lance/src/dataset/write/merge_insert.rs | 6 +-- rust/lance/src/io/commit.rs | 4 -- 7 files changed, 6 insertions(+), 77 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 717a7e497c5..3817011d1cd 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -995,8 +995,6 @@ mod tests { use arrow_array::{DictionaryArray, StringArray, UInt32Array}; use arrow_schema::{Fields, TimeUnit}; - use std::collections::HashMap; - #[test] fn arrow_field_to_field() { for (name, data_type) in [ diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index 2eacce0deba..d3bd7f4b158 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -146,56 +146,6 @@ impl Schema { } } - pub fn retain_blob_fields(&self) -> Self { - self.retain_by(|f| f.is_blob()) - } - - pub fn retain_non_blob_fields(&self) -> Self { - self.retain_by(|f| !f.is_blob()) - } - - fn retain_by bool>(&self, predicate: F) -> Self { - let fields = self - .fields - .iter() - .filter(|f| predicate(f)) - .cloned() - .collect(); - Self { - fields, - metadata: self.metadata.clone(), - } - } - - /// Splits the schema into two schemas, one with non-blob fields and the other with blob fields. - /// If there are no blob fields, the second schema will be `None`. - /// The order of fields is preserved. - pub fn partition_by_blob_columns(&self) -> (Self, Option) { - let mut non_blob_fields = Vec::with_capacity(self.fields.len()); - let mut blob_fields = Vec::with_capacity(self.fields.len()); - for field in self.fields.iter() { - if field.is_blob() { - blob_fields.push(field.clone()); - } else { - non_blob_fields.push(field.clone()); - } - } - ( - Self { - fields: non_blob_fields, - metadata: self.metadata.clone(), - }, - if blob_fields.is_empty() { - None - } else { - Some(Self { - fields: blob_fields, - metadata: self.metadata.clone(), - }) - }, - ) - } - pub fn has_dictionary_types(&self) -> bool { self.fields.iter().any(|f| f.has_dictionary_types()) } diff --git a/rust/lance-file/src/datatypes.rs b/rust/lance-file/src/datatypes.rs index 3cb37a5e659..09c5076f86d 100644 --- a/rust/lance-file/src/datatypes.rs +++ b/rust/lance-file/src/datatypes.rs @@ -1,8 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::collections::HashMap; - use arrow_schema::DataType; use async_recursion::async_recursion; use lance_arrow::DataTypeExt; @@ -12,6 +10,7 @@ use lance_core::{Error, Result}; use lance_io::traits::Reader; use lance_io::utils::{read_binary_array, read_fixed_stride_array}; use snafu::location; +use std::collections::HashMap; use crate::format::pb; @@ -264,9 +263,9 @@ mod tests { use arrow_schema::Fields as ArrowFields; use arrow_schema::Schema as ArrowSchema; use lance_core::datatypes::Schema; + use std::collections::HashMap; - use super::{Field, Fields, FieldsWithMeta}; - use crate::format::pb; + use super::{Fields, FieldsWithMeta}; #[test] fn test_schema_set_ids() { diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index 20687086471..b00d475c6c3 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -36,9 +36,6 @@ pub struct Manifest { /// Dataset schema. pub schema: Schema, - /// Local schema, only containing fields that are not marked as blobs - pub local_schema: Schema, - /// Dataset version pub version: u64, @@ -174,11 +171,9 @@ impl Manifest { base_paths: HashMap, ) -> Self { let fragment_offsets = compute_fragment_offsets(&fragments); - let local_schema = schema.retain_non_blob_fields(); Self { schema, - local_schema, version: 1, branch: None, writer_version: Some(WriterVersion::default()), @@ -206,11 +201,9 @@ impl Manifest { fragments: Arc>, ) -> Self { let fragment_offsets = compute_fragment_offsets(&fragments); - let local_schema = schema.retain_non_blob_fields(); Self { schema, - local_schema, version: previous.version + 1, branch: previous.branch.clone(), writer_version: Some(WriterVersion::default()), @@ -267,7 +260,6 @@ impl Manifest { Self { schema: self.schema.clone(), - local_schema: self.local_schema.clone(), version: self.version, branch: branch_name, writer_version: self.writer_version.clone(), @@ -911,11 +903,9 @@ impl TryFrom for Manifest { }; let schema = Schema::from(fields_with_meta); - let local_schema = schema.retain_non_blob_fields(); Ok(Self { schema, - local_schema, version: p.version, branch: p.branch, writer_version, diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 37f55d9afa5..2662672968e 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1554,10 +1554,6 @@ impl Dataset { } /// Similar to [Self::schema], but only returns fields that are not marked as blob columns - pub fn local_schema(&self) -> &Schema { - &self.manifest.local_schema - } - /// Creates a new empty projection into the dataset schema pub fn empty_projection(self: &Arc) -> Projection { Projection::empty(self.clone()) diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 4015acc5649..041a115edf2 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -877,7 +877,7 @@ impl MergeInsertJob { .as_ref() .without_column(ROW_ADDR) .without_column(ROW_ID); - let write_schema = dataset.local_schema().project_by_schema( + let write_schema = dataset.schema().project_by_schema( &write_schema, OnMissing::Error, OnTypeMismatch::Error, @@ -1392,7 +1392,7 @@ impl MergeInsertJob { async fn can_use_create_plan(&self, source_schema: &Schema) -> Result { // Convert to lance schema for comparison let lance_schema = lance_core::datatypes::Schema::try_from(source_schema)?; - let full_schema = self.dataset.local_schema(); + let full_schema = self.dataset.schema(); let is_full_schema = full_schema.compare_with_options( &lance_schema, &SchemaCompareOptions { @@ -1432,7 +1432,7 @@ impl MergeInsertJob { let source_schema = source.schema(); let lance_schema = lance_core::datatypes::Schema::try_from(source_schema.as_ref())?; - let full_schema = self.dataset.local_schema(); + let full_schema = self.dataset.schema(); let is_full_schema = full_schema.compare_with_options( &lance_schema, &SchemaCompareOptions { diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 444f9386c9f..1b11eb8dbe5 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -386,10 +386,6 @@ fn fix_schema(manifest: &mut Manifest) -> Result<()> { for (old_field_id, new_field_id) in &old_field_id_mapping { let field = manifest.schema.mut_field_by_id(*old_field_id).unwrap(); field.id = *new_field_id; - - if let Some(local_field) = manifest.local_schema.mut_field_by_id(*old_field_id) { - local_field.id = *new_field_id; - } } // Drop data files that are no longer in use.