diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index 176f7c82e79..19a03aa5043 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -1489,10 +1489,23 @@ pub fn escape_field_path_for_project(name: &str) -> String { #[cfg(test)] mod tests { use arrow_schema::{DataType as ArrowDataType, Fields as ArrowFields}; - use std::sync::Arc; + use std::{collections::HashMap, sync::Arc}; use super::*; + #[test] + fn projection_from_schema_defaults_to_v1() { + let field = Field::try_from(&ArrowField::new("a", ArrowDataType::Int32, true)).unwrap(); + let schema = Schema { + fields: vec![field], + metadata: HashMap::new(), + }; + + let projection = Projection::empty(Arc::new(schema)); + + assert_eq!(projection.blob_version, BlobVersion::V1); + } + #[test] fn test_resolve_with_quoted_fields() { // Create a schema with fields containing dots diff --git a/rust/lance-datafusion/src/projection.rs b/rust/lance-datafusion/src/projection.rs index c7aa82daeff..86ca0b0707e 100644 --- a/rust/lance-datafusion/src/projection.rs +++ b/rust/lance-datafusion/src/projection.rs @@ -14,7 +14,7 @@ use std::{ }; use lance_core::{ - datatypes::{OnMissing, Projectable, Projection, Schema}, + datatypes::{BlobVersion, OnMissing, Projectable, Projection, Schema}, Error, Result, ROW_ADDR, ROW_CREATED_AT_VERSION, ROW_ID, ROW_LAST_UPDATED_AT_VERSION, ROW_OFFSET, WILDCARD, }; @@ -37,11 +37,16 @@ struct ProjectionBuilder { needs_row_created_at: bool, must_add_row_offset: bool, has_wildcard: bool, + blob_version: BlobVersion, } impl ProjectionBuilder { - fn new(base: Arc) -> Self { - let full_schema = Arc::new(Projection::full(base.clone()).to_arrow_schema()); + fn new(base: Arc, blob_version: BlobVersion) -> Self { + let full_schema = Arc::new( + Projection::full(base.clone()) + .with_blob_version(blob_version) + .to_arrow_schema(), + ); let full_schema = Arc::new(ProjectionPlan::add_system_columns(&full_schema)); let planner = Planner::new(full_schema); @@ -58,6 +63,7 @@ impl ProjectionBuilder { needs_row_last_updated_at: false, must_add_row_offset: false, has_wildcard: false, + blob_version, } } @@ -146,6 +152,8 @@ impl ProjectionBuilder { .union_columns(&self.physical_cols, OnMissing::Ignore)? }; + physical_projection = physical_projection.with_blob_version(self.blob_version); + physical_projection.with_row_id = self.needs_row_id; physical_projection.with_row_addr = self.needs_row_addr || self.must_add_row_offset; physical_projection.with_row_last_updated_at_version = self.needs_row_last_updated_at; @@ -202,8 +210,9 @@ impl ProjectionPlan { pub fn from_expressions( base: Arc, columns: &[(impl AsRef, impl AsRef)], + blob_version: BlobVersion, ) -> Result { - let mut builder = ProjectionBuilder::new(base); + let mut builder = ProjectionBuilder::new(base, blob_version); builder.add_columns(columns)?; builder.build() } @@ -242,7 +251,11 @@ impl ProjectionPlan { /// ``` /// /// This is something that cannot be done easily using expressions. - pub fn from_schema(base: Arc, projection: &Schema) -> Result { + pub fn from_schema( + base: Arc, + projection: &Schema, + blob_version: BlobVersion, + ) -> Result { // Separate data columns from system columns // System columns (_rowid, _rowaddr, etc.) are handled via flags in Projection, // not as fields in the Schema @@ -282,7 +295,9 @@ impl ProjectionPlan { }; // Calculate the physical projection from data columns only - let mut physical_projection = Projection::empty(base).union_schema(&data_schema); + let mut physical_projection = Projection::empty(base) + .union_schema(&data_schema) + .with_blob_version(blob_version); physical_projection.with_row_id = with_row_id; physical_projection.with_row_addr = with_row_addr; @@ -303,7 +318,7 @@ impl ProjectionPlan { }) } - pub fn full(base: Arc) -> Result { + pub fn full(base: Arc, blob_version: BlobVersion) -> Result { let physical_cols: Vec<&str> = base .schema() .fields @@ -311,8 +326,9 @@ impl ProjectionPlan { .map(|f| f.name.as_ref()) .collect::>(); - let physical_projection = - Projection::empty(base.clone()).union_columns(&physical_cols, OnMissing::Ignore)?; + let physical_projection = Projection::empty(base.clone()) + .union_columns(&physical_cols, OnMissing::Ignore)? + .with_blob_version(blob_version); let requested_output_expr = physical_cols .into_iter() diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index ebeeba4ad0e..525961d0696 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -395,6 +395,7 @@ impl ProjectionRequest { } pub fn into_projection_plan(self, dataset: Arc) -> Result { + let blob_version = dataset.blob_version(); match self { Self::Schema(schema) => { // The schema might contain system columns (_rowid, _rowaddr) which are not @@ -407,7 +408,7 @@ impl ProjectionRequest { if system_columns_present { // If system columns are present, we can't use project_by_schema directly // Just pass the schema to ProjectionPlan::from_schema which handles it - ProjectionPlan::from_schema(dataset, schema.as_ref()) + ProjectionPlan::from_schema(dataset, schema.as_ref(), blob_version) } else { // No system columns, use normal path with validation let projection = dataset.schema().project_by_schema( @@ -415,10 +416,10 @@ impl ProjectionRequest { OnMissing::Error, OnTypeMismatch::Error, )?; - ProjectionPlan::from_schema(dataset, &projection) + ProjectionPlan::from_schema(dataset, &projection, blob_version) } } - Self::Sql(columns) => ProjectionPlan::from_expressions(dataset, &columns), + Self::Sql(columns) => ProjectionPlan::from_expressions(dataset, &columns, blob_version), } } } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index ca88c7c1f05..beb25ac9300 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -599,7 +599,8 @@ impl TakeOperation { impl Scanner { pub fn new(dataset: Arc) -> Self { - let projection_plan = ProjectionPlan::full(dataset.clone()).unwrap(); + let projection_plan = + ProjectionPlan::full(dataset.clone(), dataset.blob_version()).unwrap(); let file_reader_options = dataset.file_reader_options.clone(); let mut scanner = Self { dataset, @@ -723,7 +724,11 @@ impl Scanner { columns: &[(impl AsRef, impl AsRef)], ) -> Result<&mut Self> { self.explicit_projection = true; - self.projection_plan = ProjectionPlan::from_expressions(self.dataset.clone(), columns)?; + self.projection_plan = ProjectionPlan::from_expressions( + self.dataset.clone(), + columns, + self.dataset.blob_version(), + )?; if self.legacy_with_row_id { self.projection_plan.include_row_id(); }