From 284ea320be8a669a78e4474ac5bf997b98b403d4 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 15 Jan 2026 09:42:57 -0600 Subject: [PATCH 01/15] adding project_preserve_system_columns function to schema Signed-off-by: Daniel Rammer --- python/src/dataset.rs | 2 +- rust/lance-core/src/datatypes/schema.rs | 48 +++++++++++++++++++++++++ rust/lance/src/dataset.rs | 47 +++--------------------- 3 files changed, 54 insertions(+), 43 deletions(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 5420251dfe4..fd1986fbbd3 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1271,7 +1271,7 @@ impl Dataset { Arc::new( self.ds .schema() - .project(&columns) + .project_preserve_system_columns(&columns) .map_err(|err| PyValueError::new_err(err.to_string()))?, ) } else { diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index 3e8340bd19b..ccedeaf6394 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -263,6 +263,54 @@ impl Schema { self.do_project(columns, false) } + /// TODO @hamersaw - docs / tests + pub fn project_preserve_system_columns>(&self, columns: &[T]) -> Result { + // Separate data columns from system columns + // System columns need to be added to the schema manually since Schema::project + // doesn't include them (they're virtual columns) + let mut data_columns = Vec::new(); + let mut system_fields = Vec::new(); + + for col in columns { + let col_str = col.as_ref(); + if crate::is_system_column(col_str) { + // For now we only support _rowid and _rowaddr in projections + if col_str == ROW_ID { + system_fields.push(Field::try_from(ROW_ID_FIELD.clone()).unwrap()); + } else if col_str == ROW_ADDR { + system_fields.push(Field::try_from(ROW_ADDR_FIELD.clone()).unwrap()); + } + // Note: Other system columns like _rowoffset are handled differently + } else { + data_columns.push(col_str); + } + } + + // Project only the data columns + let mut schema = self.do_project(&data_columns, true)?; + + // Add system fields in the order they appeared in the original columns list + // We need to reconstruct the proper order + let mut final_fields = Vec::new(); + for col in columns { + let col_str = col.as_ref(); + if crate::is_system_column(col_str) { + // Find and add the system field + if let Some(field) = system_fields.iter().find(|f| &f.name == col_str) { + final_fields.push(field.clone()); + } + } else { + // Find and add the data field + if let Some(field) = schema.fields.iter().find(|f| &f.name == col_str) { + final_fields.push(field.clone()); + } + } + } + + schema.fields = final_fields; + Ok(schema) + } + /// Check that the top level fields don't contain `.` in their names /// to distinguish from nested fields. // TODO: pub(crate) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index c6c9533ea44..4689a953ab4 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -334,47 +334,10 @@ impl ProjectionRequest { .map(|s| s.as_ref().to_string()) .collect::>(); - // Separate data columns from system columns - // System columns need to be added to the schema manually since Schema::project - // doesn't include them (they're virtual columns) - let mut data_columns = Vec::new(); - let mut system_fields = Vec::new(); - - for col in &columns { - if lance_core::is_system_column(col) { - // For now we only support _rowid and _rowaddr in projections - if col == ROW_ID { - system_fields.push(Field::try_from(ROW_ID_FIELD.clone()).unwrap()); - } else if col == ROW_ADDR { - system_fields.push(Field::try_from(ROW_ADDR_FIELD.clone()).unwrap()); - } - // Note: Other system columns like _rowoffset are handled differently - } else { - data_columns.push(col.as_str()); - } - } - - // Project only the data columns - let mut schema = dataset_schema.project(&data_columns).unwrap(); - - // Add system fields in the order they appeared in the original columns list - // We need to reconstruct the proper order - let mut final_fields = Vec::new(); - for col in &columns { - if lance_core::is_system_column(col) { - // Find and add the system field - if let Some(field) = system_fields.iter().find(|f| &f.name == col) { - final_fields.push(field.clone()); - } - } else { - // Find and add the data field - if let Some(field) = schema.fields.iter().find(|f| &f.name == col) { - final_fields.push(field.clone()); - } - } - } - - schema.fields = final_fields; + // TODO @hamersaw - capture error and return Result + let schema = dataset_schema + .project_preserve_system_columns(&columns) + .unwrap(); Self::Schema(Arc::new(schema)) } @@ -1446,7 +1409,7 @@ impl Dataset { row_indices: &[u64], projection: impl Into, ) -> Result { - take::take(self, row_indices, projection.into()).await + take::take(self, row_indices, projection.info()).await } /// Take Rows by the internal ROW ids. From 6ecc5b43896f29b428387d94b6d79df436537549 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 15 Jan 2026 10:13:37 -0600 Subject: [PATCH 02/15] adding flag to existing do_project to furthur simplify Signed-off-by: Daniel Rammer --- rust/lance-core/src/datatypes/schema.rs | 105 +++++++++++++----------- rust/lance/src/dataset.rs | 1 - 2 files changed, 56 insertions(+), 50 deletions(-) diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index ccedeaf6394..50631864939 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -221,7 +221,12 @@ impl Schema { } } - fn do_project>(&self, columns: &[T], err_on_missing: bool) -> Result { + fn do_project>( + &self, + columns: &[T], + err_on_missing: bool, + preserve_system_columns: bool, + ) -> Result { let mut candidates: Vec = vec![]; for col in columns { let split = parse_field_path(col.as_ref())?; @@ -234,7 +239,17 @@ impl Schema { } else { candidates.push(projected_field) } - } else if err_on_missing && first != ROW_ID && first != ROW_ADDR { + } else if first == ROW_ID || first == ROW_ADDR { + // Note: Other system columns like _rowoffset are handled differently + if preserve_system_columns { + // For now we only support _rowid and _rowaddr in projections + if first == ROW_ID { + candidates.push(Field::try_from(ROW_ID_FIELD.clone())?); + } else if first == ROW_ADDR { + candidates.push(Field::try_from(ROW_ADDR_FIELD.clone())?); + } + } + } else if err_on_missing { return Err(Error::Schema { message: format!("Column {} does not exist", col.as_ref()), location: location!(), @@ -255,60 +270,17 @@ impl Schema { /// let projected = schema.project(&["col1", "col2.sub_col3.field4"])?; /// ``` pub fn project>(&self, columns: &[T]) -> Result { - self.do_project(columns, true) + self.do_project(columns, true, false) } /// Project the columns over the schema, dropping unrecognized columns pub fn project_or_drop>(&self, columns: &[T]) -> Result { - self.do_project(columns, false) + self.do_project(columns, false, false) } - /// TODO @hamersaw - docs / tests + /// Project the columns over the schema, preserving system columns. pub fn project_preserve_system_columns>(&self, columns: &[T]) -> Result { - // Separate data columns from system columns - // System columns need to be added to the schema manually since Schema::project - // doesn't include them (they're virtual columns) - let mut data_columns = Vec::new(); - let mut system_fields = Vec::new(); - - for col in columns { - let col_str = col.as_ref(); - if crate::is_system_column(col_str) { - // For now we only support _rowid and _rowaddr in projections - if col_str == ROW_ID { - system_fields.push(Field::try_from(ROW_ID_FIELD.clone()).unwrap()); - } else if col_str == ROW_ADDR { - system_fields.push(Field::try_from(ROW_ADDR_FIELD.clone()).unwrap()); - } - // Note: Other system columns like _rowoffset are handled differently - } else { - data_columns.push(col_str); - } - } - - // Project only the data columns - let mut schema = self.do_project(&data_columns, true)?; - - // Add system fields in the order they appeared in the original columns list - // We need to reconstruct the proper order - let mut final_fields = Vec::new(); - for col in columns { - let col_str = col.as_ref(); - if crate::is_system_column(col_str) { - // Find and add the system field - if let Some(field) = system_fields.iter().find(|f| &f.name == col_str) { - final_fields.push(field.clone()); - } - } else { - // Find and add the data field - if let Some(field) = schema.fields.iter().find(|f| &f.name == col_str) { - final_fields.push(field.clone()); - } - } - } - - schema.fields = final_fields; - Ok(schema) + self.do_project(columns, true, true) } /// Check that the top level fields don't contain `.` in their names @@ -1880,6 +1852,41 @@ mod tests { assert_eq!(ArrowSchema::from(&projected), expected_arrow_schema); } + #[test] + fn test_schema_projection_preserving_system_columns() { + let arrow_schema = ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, false), + ArrowField::new( + "b", + DataType::Struct(ArrowFields::from(vec![ + ArrowField::new("f1", DataType::Utf8, true), + ArrowField::new("f2", DataType::Boolean, false), + ArrowField::new("f3", DataType::Float32, false), + ])), + true, + ), + ArrowField::new("c", DataType::Float64, false), + ]); + let schema = Schema::try_from(&arrow_schema).unwrap(); + let projected = schema + .project_preserve_system_columns(&["b.f1", "b.f3", "_rowid", "c"]) + .unwrap(); + + let expected_arrow_schema = ArrowSchema::new(vec![ + ArrowField::new( + "b", + DataType::Struct(ArrowFields::from(vec![ + ArrowField::new("f1", DataType::Utf8, true), + ArrowField::new("f3", DataType::Float32, false), + ])), + true, + ), + ArrowField::new("_rowid", DataType::UInt64, true), + ArrowField::new("c", DataType::Float64, false), + ]); + assert_eq!(ArrowSchema::from(&projected), expected_arrow_schema); + } + #[test] fn test_schema_project_by_ids() { let arrow_schema = ArrowSchema::new(vec![ diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 4689a953ab4..cde7bc0bdfb 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -334,7 +334,6 @@ impl ProjectionRequest { .map(|s| s.as_ref().to_string()) .collect::>(); - // TODO @hamersaw - capture error and return Result let schema = dataset_schema .project_preserve_system_columns(&columns) .unwrap(); From ae4b25ba49addd675fa2398a2c850e5a3f4a071d Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 15 Jan 2026 10:23:27 -0600 Subject: [PATCH 03/15] typo Signed-off-by: Daniel Rammer --- rust/lance/src/dataset.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index cde7bc0bdfb..bca1edc8979 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -19,16 +19,14 @@ use crate::dataset::transaction::translate_schema_metadata_updates; use crate::session::caches::{DSMetadataCache, ManifestKey, TransactionKey}; use crate::session::index_caches::DSIndexCache; use itertools::Itertools; -use lance_core::datatypes::{ - BlobVersion, Field, OnMissing, OnTypeMismatch, Projectable, Projection, -}; +use lance_core::datatypes::{BlobVersion, OnMissing, OnTypeMismatch, Projectable, Projection}; use lance_core::traits::DatasetTakeRows; use lance_core::utils::address::RowAddress; use lance_core::utils::tracing::{ DATASET_CLEANING_EVENT, DATASET_DELETING_EVENT, DATASET_DROPPING_COLUMN_EVENT, TRACE_DATASET_EVENTS, }; -use lance_core::{ROW_ADDR, ROW_ADDR_FIELD, ROW_ID_FIELD}; +use lance_core::ROW_ADDR; use lance_datafusion::projection::ProjectionPlan; use lance_file::datatypes::populate_schema_dictionary; use lance_file::reader::FileReaderOptions; @@ -1408,7 +1406,7 @@ impl Dataset { row_indices: &[u64], projection: impl Into, ) -> Result { - take::take(self, row_indices, projection.info()).await + take::take(self, row_indices, projection.into()).await } /// Take Rows by the internal ROW ids. From 84a995bf8680c7ca4bd73c334134fbffacd7c000 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 19 Jan 2026 09:33:36 -0600 Subject: [PATCH 04/15] correct projection for all take_* functions on all system columns Signed-off-by: Daniel Rammer --- rust/lance-core/src/datatypes/schema.rs | 25 +++++++++++-- rust/lance-datafusion/src/projection.rs | 49 +++++++++++++++++++++++-- rust/lance/src/dataset/fragment.rs | 9 ++++- rust/lance/src/dataset/take.rs | 18 ++++++++- 4 files changed, 90 insertions(+), 11 deletions(-) diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index 50631864939..caa31fdf9da 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -16,7 +16,11 @@ use lance_arrow::*; use snafu::location; use super::field::{BlobVersion, Field, OnTypeMismatch, SchemaCompareOptions}; -use crate::{Error, Result, ROW_ADDR, ROW_ADDR_FIELD, ROW_ID, ROW_ID_FIELD, WILDCARD}; +use crate::{ + Error, Result, ROW_ADDR, ROW_ADDR_FIELD, ROW_CREATED_AT_VERSION, ROW_CREATED_AT_VERSION_FIELD, + ROW_ID, ROW_ID_FIELD, ROW_LAST_UPDATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION_FIELD, + ROW_OFFSET, ROW_OFFSET_FIELD, WILDCARD, +}; /// Lance Schema. #[derive(Default, Debug, Clone, DeepSizeOf)] @@ -239,14 +243,27 @@ impl Schema { } else { candidates.push(projected_field) } - } else if first == ROW_ID || first == ROW_ADDR { - // Note: Other system columns like _rowoffset are handled differently + } else if crate::is_system_column(first) { if preserve_system_columns { - // For now we only support _rowid and _rowaddr in projections if first == ROW_ID { candidates.push(Field::try_from(ROW_ID_FIELD.clone())?); } else if first == ROW_ADDR { candidates.push(Field::try_from(ROW_ADDR_FIELD.clone())?); + } else if first == ROW_OFFSET { + candidates.push(Field::try_from(ROW_OFFSET_FIELD.clone())?); + } else if first == ROW_CREATED_AT_VERSION { + candidates.push(Field::try_from(ROW_CREATED_AT_VERSION_FIELD.clone())?); + } else if first == ROW_LAST_UPDATED_AT_VERSION { + candidates + .push(Field::try_from(ROW_LAST_UPDATED_AT_VERSION_FIELD.clone())?); + } else { + return Err(Error::Schema { + message: format!( + "System column {} is currently not supported in projection", + first + ), + location: location!(), + }); } } } else if err_on_missing { diff --git a/rust/lance-datafusion/src/projection.rs b/rust/lance-datafusion/src/projection.rs index f586ac4bb20..fd23e180795 100644 --- a/rust/lance-datafusion/src/projection.rs +++ b/rust/lance-datafusion/src/projection.rs @@ -264,6 +264,8 @@ impl ProjectionPlan { let mut with_row_id = false; let mut with_row_addr = false; let mut must_add_row_offset = false; + let mut with_row_last_updated_at_version = false; + let mut with_row_created_at_version = false; for field in projection.fields.iter() { if lance_core::is_system_column(&field.name) { @@ -273,10 +275,14 @@ impl ProjectionPlan { must_add_row_offset = true; } else if field.name == ROW_ADDR { with_row_addr = true; + } else if field.name == ROW_OFFSET { + with_row_addr = true; must_add_row_offset = true; + } else if field.name == ROW_LAST_UPDATED_AT_VERSION { + with_row_last_updated_at_version = true; + } else if field.name == ROW_CREATED_AT_VERSION { + with_row_created_at_version = true; } - // Note: Other system columns like _rowoffset are computed differently - // and shouldn't appear in the schema at this point } else { // Regular data column - validate it exists in base schema if base.schema().field(&field.name).is_none() { @@ -301,6 +307,8 @@ impl ProjectionPlan { .with_blob_version(blob_version); physical_projection.with_row_id = with_row_id; physical_projection.with_row_addr = with_row_addr; + physical_projection.with_row_last_updated_at_version = with_row_last_updated_at_version; + physical_projection.with_row_created_at_version = with_row_created_at_version; // Build output expressions preserving the original order (including system columns) let exprs = projection @@ -431,8 +439,43 @@ impl ProjectionPlan { #[instrument(skip_all, level = "debug")] pub async fn project_batch(&self, batch: RecordBatch) -> Result { let src = Arc::new(OneShotExec::from_batch(batch)); - let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?; + + // TODO @hamersaw - need to add "extra columns" here to get filterable schema + // rust/lance/src/dataset/scanner.rs 1526-1553 + let mut extra_columns = vec![ArrowField::new(ROW_OFFSET, DataType::UInt64, true)]; + /*if self.nearest.as_ref().is_some() { + extra_columns.push(ArrowField::new(DIST_COL, DataType::Float32, true)); + }; + + if self.full_text_query.is_some() { + extra_columns.push(ArrowField::new(SCORE_COL, DataType::Float32, true)); + }*/ + + let mut filterable_schema = self.physical_projection.to_schema(); + filterable_schema = filterable_schema.merge(&ArrowSchema::new(extra_columns))?; // TODO @hamersaw - result? + + let physical_exprs = self.to_physical_exprs(&(&filterable_schema).into())?; + + //let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?; let projection = Arc::new(ProjectionExec::try_new(physical_exprs, src)?); + + /* + * do we need to inject the RowOffset field after we have the batch? + * then we update the projection for the "current_schema" to have `_rowoffset` + * from rust/lance/src/dataset/scanner.rs:2114-2117: + * + * or do we manually compute and inject row_offset column in batch like in + * rust/lance/src/io/exec/rowids.rs:526-553 + * + // Stage 6: Add system columns, if requested + if self.projection_plan.must_add_row_offset { + plan = Arc::new(AddRowOffsetExec::try_new(plan, self.dataset.clone()).await?); + } + */ + /*if self.must_add_row_offset { + projection = Arc::new(AddRowOffsetExec::try_new(projection, self.dataset.clone()).await?); + }*/ + // Run dummy plan to execute projection, do not log the plan run let stream = execute_plan( projection, diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 8428cf619b4..483c5dd10be 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -1376,7 +1376,8 @@ impl FileFragment { }; // Then call take rows - self.take_rows(&row_ids, projection, false, false).await + self.take_rows(&row_ids, projection, false, false, false, false) + .await } /// Get the deletion vector for this fragment, using the cache if available. @@ -1424,13 +1425,17 @@ impl FileFragment { projection: &Schema, with_row_id: bool, with_row_address: bool, + with_row_created_at_version: bool, + with_row_last_updated_at_version: bool, ) -> Result { let reader = self .open( projection, FragReadConfig::default() .with_row_id(with_row_id) - .with_row_address(with_row_address), + .with_row_address(with_row_address) + .with_row_created_at_version(with_row_created_at_version) + .with_row_last_updated_at_version(with_row_last_updated_at_version), ) .await?; diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index f39531f9853..d7ada41287b 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -131,6 +131,11 @@ async fn do_take_rows( ) -> Result { let with_row_id_in_projection = projection.physical_projection.with_row_id; let with_row_addr_in_projection = projection.physical_projection.with_row_addr; + let with_row_created_at_version_in_projection = + projection.physical_projection.with_row_created_at_version; + let with_row_last_updated_at_version_in_projection = projection + .physical_projection + .with_row_last_updated_at_version; let row_addrs = builder.get_row_addrs().await?.clone(); @@ -160,6 +165,8 @@ async fn do_take_rows( projection: Arc, with_row_id: bool, with_row_addresses: bool, + with_row_created_at_version: bool, + with_row_last_updated_at_version: bool, ) -> impl Future> + Send { async move { fragment @@ -168,13 +175,14 @@ async fn do_take_rows( projection.as_ref(), with_row_id, with_row_addresses, + with_row_created_at_version, + with_row_last_updated_at_version, ) .await } } let physical_schema = Arc::new(projection.physical_projection.to_bare_schema()); - let batch = if row_addr_stats.contiguous { // Fastest path: Can use `read_range` directly let start = row_addrs.first().expect("empty range passed to take_rows"); @@ -195,7 +203,9 @@ async fn do_take_rows( let read_config = FragReadConfig::default() .with_row_id(with_row_id_in_projection) - .with_row_address(with_row_addr_in_projection); + .with_row_address(with_row_addr_in_projection) + .with_row_created_at_version(with_row_created_at_version_in_projection) + .with_row_last_updated_at_version(with_row_last_updated_at_version_in_projection); let reader = fragment.open(&physical_schema, read_config).await?; reader.legacy_read_range_as_batch(range).await } else if row_addr_stats.sorted { @@ -243,6 +253,8 @@ async fn do_take_rows( physical_schema.clone(), with_row_id_in_projection, with_row_addr_in_projection, + with_row_created_at_version_in_projection, + with_row_last_updated_at_version_in_projection, ); batches.push(batch_fut); } @@ -283,6 +295,8 @@ async fn do_take_rows( physical_schema.clone(), with_row_id_in_projection, true, + with_row_created_at_version_in_projection, + with_row_last_updated_at_version_in_projection, ) }) .buffered(builder.dataset.object_store.io_parallelism()) From 950659d907652286f11fec61c344c40a94b2457b Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 19 Jan 2026 19:18:08 -0600 Subject: [PATCH 05/15] added AddRow to execution plan - still not working Signed-off-by: Daniel Rammer --- rust/lance-datafusion/src/exec.rs | 215 +++++++++++++++++++++++- rust/lance-datafusion/src/projection.rs | 23 ++- rust/lance/src/dataset/take.rs | 24 ++- 3 files changed, 245 insertions(+), 17 deletions(-) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index c61ff26419a..e7de770a4c9 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -12,8 +12,8 @@ use std::{ use chrono::{DateTime, Utc}; -use arrow_array::RecordBatch; -use arrow_schema::Schema as ArrowSchema; +use arrow_array::{cast::AsArray, types::UInt64Type, ArrayRef, RecordBatch, UInt64Array}; +use arrow_schema::{Schema, Schema as ArrowSchema}; use datafusion::physical_plan::metrics::MetricType; use datafusion::{ catalog::streaming::StreamingTable, @@ -35,17 +35,19 @@ use datafusion::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, }, }; -use datafusion_common::{DataFusionError, Statistics}; +use datafusion_common::{DataFusionError, Result, Statistics}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use futures::{stream, StreamExt}; use lance_arrow::SchemaExt; use lance_core::{ utils::{ + address::RowAddress, + deletion::DeletionVector, futures::FinallyStreamExt, tracing::{StreamTracingExt, EXECUTION_PLAN_RUN, TRACE_EXECUTION}, }, - Error, Result, + Error as LanceError, Result as LanceResult, ROW_ADDR, ROW_OFFSET, ROW_OFFSET_FIELD, }; use log::{debug, info, warn}; use snafu::location; @@ -537,7 +539,7 @@ fn display_plan_one_liner_impl(plan: &dyn ExecutionPlan, output: &mut String) { pub fn execute_plan( plan: Arc, options: LanceExecutionOptions, -) -> Result { +) -> LanceResult { if !options.skip_logging { debug!( "Executing plan:\n{}", @@ -564,7 +566,7 @@ pub fn execute_plan( pub async fn analyze_plan( plan: Arc, options: LanceExecutionOptions, -) -> Result { +) -> LanceResult { // This is needed as AnalyzeExec launches a thread task per // partition, and we want these to be connected to the parent span let plan = Arc::new(TracedExec::new(plan, Span::current())); @@ -584,7 +586,7 @@ pub async fn analyze_plan( let mut stream = analyze .execute(0, get_task_context(&session_ctx, &options)) .map_err(|err| { - Error::io( + LanceError::io( format!("Failed to execute analyze plan: {}", err), location!(), ) @@ -889,3 +891,202 @@ impl ExecutionPlan for StrictBatchSizeExec { true } } + +#[derive(Debug)] +pub struct FragInfo { + /// The dataset offset of the first row in the fragment + pub row_offset: u64, + /// The deletion vector for this fragment, if any + pub deletion_vector: Option>, +} + +/// Add a `_rowoffset` column to a stream of record batches that have a `_rowaddr` column. +/// +/// The row offset is the number of rows between the current row and the first row in the dataset. +#[derive(Clone, Debug)] +pub struct AddRowOffsetExec { + input: Arc, + row_addr_pos: usize, + frag_id_to_offset: Arc>, + properties: PlanProperties, +} + +impl AddRowOffsetExec { + pub fn try_new( + input: Arc, + frag_id_to_offset: Arc>, + ) -> LanceResult { + let input_schema = input.schema(); + let row_addr_pos = input_schema + .index_of(ROW_ADDR) + .map_err(|_| LanceError::Internal { + message: format!("Input plan does not have a {} column", ROW_ADDR), + location: location!(), + })?; + + if input_schema.field_with_name(ROW_OFFSET).is_ok() { + return Err(LanceError::Internal { + message: format!("Input plan already has a {} column", ROW_OFFSET), + location: location!(), + }); + } + + let mut fields = input.schema().fields().iter().cloned().collect::>(); + fields.push(Arc::new(ROW_OFFSET_FIELD.clone())); + let schema = Arc::new(Schema::new_with_metadata( + fields, + input.schema().metadata().clone(), + )); + + let new_eq_props = + EquivalenceProperties::new(schema).extend(input.properties().eq_properties.clone())?; + let properties = input.properties().clone().with_eq_properties(new_eq_props); + + Ok(Self { + input, + row_addr_pos, + frag_id_to_offset, + properties, + }) + } + + fn compute_row_offsets( + row_addr: &ArrayRef, + frag_id_to_offset: &HashMap, + ) -> Result { + let row_addr_values = row_addr.as_primitive::().values(); + let mut row_offsets = Vec::with_capacity(row_addr_values.len()); + + let mut last_frag_id = u32::MAX; + let mut last_frag_offset = 0; + let mut last_frag_delete_count = 0; + let mut dv_iter = None; + + for addr in row_addr_values { + let addr = RowAddress::new_from_u64(*addr); + let frag_id = addr.fragment_id(); + if frag_id != last_frag_id { + last_frag_id = frag_id; + let Some(frag_info) = frag_id_to_offset.get(&frag_id) else { + return Err(DataFusionError::External(Box::new(LanceError::Internal { message: format!("A row address referred to a fragment {} that wasn't in the frag_id_to_offset map", frag_id), location: location!() }))); + }; + last_frag_offset = frag_info.row_offset; + last_frag_delete_count = 0; + dv_iter = frag_info + .deletion_vector + .as_ref() + .map(|dv| dv.to_sorted_iter().peekable()); + } + + let row_offset = addr.row_offset(); + if let Some(dv_iter) = &mut dv_iter { + while dv_iter.peek().is_some() { + if *dv_iter.peek().unwrap() < row_offset { + dv_iter.next(); + last_frag_delete_count += 1; + } else { + break; + } + } + row_offsets + .push(last_frag_offset + row_offset as u64 - last_frag_delete_count as u64); + } else { + row_offsets.push(last_frag_offset + row_offset as u64); + } + } + + Ok(Arc::new(UInt64Array::from(row_offsets))) + } +} + +impl DisplayAs for AddRowOffsetExec { + fn fmt_as( + &self, + _format_type: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "AddRowOffsetExec") + } +} + +impl ExecutionPlan for AddRowOffsetExec { + fn name(&self) -> &str { + "AddRowOffsetExec" + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn maintains_input_order(&self) -> Vec { + vec![true] + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + + fn partition_statistics(&self, partition: Option) -> Result { + self.input.partition_statistics(partition) + } + + fn supports_limit_pushdown(&self) -> bool { + true + } + + fn cardinality_effect(&self) -> CardinalityEffect { + CardinalityEffect::Equal + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + if children.len() != 1 { + Err(DataFusionError::Internal( + "AddRowOffsetExec: invalid number of children".into(), + )) + } else { + Ok(Arc::new(Self::try_new( + children.into_iter().next().unwrap(), + self.frag_id_to_offset.clone(), + )?)) + } + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + let input_stream = self.input.execute(partition, context)?; + let schema = self.schema(); + let row_addr_pos = self.row_addr_pos; + let frag_id_to_offset = self.frag_id_to_offset.clone(); + let stream = input_stream.then(move |batch| { + let schema = schema.clone(); + let row_addr_pos = row_addr_pos; + let frag_id_to_offset = frag_id_to_offset.clone(); + async move { + let batch = batch?; + let row_addr = batch.column(row_addr_pos); + let row_offsets = Self::compute_row_offsets(row_addr, frag_id_to_offset.as_ref())?; + let mut columns = batch.columns().to_vec(); + columns.push(Arc::new(row_offsets)); + Ok(RecordBatch::try_new(schema.clone(), columns)?) + } + }); + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + stream, + ))) + } +} diff --git a/rust/lance-datafusion/src/projection.rs b/rust/lance-datafusion/src/projection.rs index fd23e180795..a90c1a802d8 100644 --- a/rust/lance-datafusion/src/projection.rs +++ b/rust/lance-datafusion/src/projection.rs @@ -3,7 +3,10 @@ use arrow_array::RecordBatch; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; -use datafusion::{logical_expr::Expr, physical_plan::projection::ProjectionExec}; +use datafusion::{ + logical_expr::Expr, + physical_plan::{projection::ProjectionExec, ExecutionPlan}, +}; use datafusion_common::{Column, DFSchema}; use datafusion_physical_expr::PhysicalExpr; use futures::TryStreamExt; @@ -21,7 +24,7 @@ use lance_core::{ }; use crate::{ - exec::{execute_plan, LanceExecutionOptions, OneShotExec}, + exec::{execute_plan, AddRowOffsetExec, FragInfo, LanceExecutionOptions, OneShotExec}, planner::Planner, }; @@ -437,7 +440,11 @@ impl ProjectionPlan { } #[instrument(skip_all, level = "debug")] - pub async fn project_batch(&self, batch: RecordBatch) -> Result { + pub async fn project_batch( + &self, + batch: RecordBatch, + frag_id_to_offset: Arc>, + ) -> Result { let src = Arc::new(OneShotExec::from_batch(batch)); // TODO @hamersaw - need to add "extra columns" here to get filterable schema @@ -457,7 +464,8 @@ impl ProjectionPlan { let physical_exprs = self.to_physical_exprs(&(&filterable_schema).into())?; //let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?; - let projection = Arc::new(ProjectionExec::try_new(physical_exprs, src)?); + let mut projection: Arc = + Arc::new(ProjectionExec::try_new(physical_exprs, src)?); /* * do we need to inject the RowOffset field after we have the batch? @@ -472,9 +480,10 @@ impl ProjectionPlan { plan = Arc::new(AddRowOffsetExec::try_new(plan, self.dataset.clone()).await?); } */ - /*if self.must_add_row_offset { - projection = Arc::new(AddRowOffsetExec::try_new(projection, self.dataset.clone()).await?); - }*/ + if self.must_add_row_offset { + println!("Adding AddRowOffsetExec to projection"); + projection = Arc::new(AddRowOffsetExec::try_new(projection, frag_id_to_offset)?); + } // Run dummy plan to execute projection, do not log the plan run let stream = execute_plan( diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index d7ada41287b..95b458de7d3 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{collections::BTreeMap, ops::Range, pin::Pin, sync::Arc}; +use std::{collections::BTreeMap, collections::HashMap, ops::Range, pin::Pin, sync::Arc}; use crate::dataset::fragment::FragReadConfig; use crate::dataset::rowids::get_row_id_index; @@ -19,7 +19,7 @@ use lance_core::datatypes::Schema; use lance_core::utils::address::RowAddress; use lance_core::utils::deletion::OffsetMapper; use lance_core::ROW_ADDR; -use lance_datafusion::projection::ProjectionPlan; +use lance_datafusion::{exec::FragInfo, projection::ProjectionPlan}; use snafu::location; use super::ProjectionRequest; @@ -343,7 +343,25 @@ async fn do_take_rows( Ok(reordered.into()) }?; - let batch = projection.project_batch(batch).await?; + // compile `frag_id_to_offset` map - TODO @hamersaw - this needs to be a util function + let mut frag_id_to_offset = HashMap::new(); + let mut row_offset = 0; + for frag in builder.dataset.get_fragments() { + let deletion_vector = frag.get_deletion_vector().await?; + frag_id_to_offset.insert( + frag.id() as u32, + FragInfo { + row_offset, + deletion_vector, + }, + ); + // Should be sync unless the dataset was written by an extremely old lance version + row_offset += frag.count_rows(None).await? as u64; + } + + let batch = projection + .project_batch(batch, Arc::new(frag_id_to_offset)) + .await?; if builder.with_row_address { if batch.num_rows() != row_addrs.len() { return Err(Error::NotSupported { From 8dd654776dcd415fab01885457373a91aa1c645e Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 19 Jan 2026 21:29:43 -0600 Subject: [PATCH 06/15] manually computing row_offset following row_addr Signed-off-by: Daniel Rammer --- rust/lance-datafusion/src/exec.rs | 4 +- rust/lance-datafusion/src/projection.rs | 27 +++------- rust/lance/src/dataset/take.rs | 67 +++++++++++++++++++------ rust/lance/src/io/exec/rowids.rs | 17 ++++++- 4 files changed, 79 insertions(+), 36 deletions(-) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index e7de770a4c9..9c268e3ac5e 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -892,7 +892,7 @@ impl ExecutionPlan for StrictBatchSizeExec { } } -#[derive(Debug)] +/*#[derive(Debug)] pub struct FragInfo { /// The dataset offset of the first row in the fragment pub row_offset: u64, @@ -1089,4 +1089,4 @@ impl ExecutionPlan for AddRowOffsetExec { stream, ))) } -} +}*/ diff --git a/rust/lance-datafusion/src/projection.rs b/rust/lance-datafusion/src/projection.rs index a90c1a802d8..c9e88a22855 100644 --- a/rust/lance-datafusion/src/projection.rs +++ b/rust/lance-datafusion/src/projection.rs @@ -24,7 +24,8 @@ use lance_core::{ }; use crate::{ - exec::{execute_plan, AddRowOffsetExec, FragInfo, LanceExecutionOptions, OneShotExec}, + //exec::{execute_plan, AddRowOffsetExec, FragInfo, LanceExecutionOptions, OneShotExec}, + exec::{execute_plan, LanceExecutionOptions, OneShotExec}, planner::Planner, }; @@ -440,30 +441,18 @@ impl ProjectionPlan { } #[instrument(skip_all, level = "debug")] - pub async fn project_batch( - &self, - batch: RecordBatch, - frag_id_to_offset: Arc>, - ) -> Result { + pub async fn project_batch(&self, batch: RecordBatch) -> Result { let src = Arc::new(OneShotExec::from_batch(batch)); - // TODO @hamersaw - need to add "extra columns" here to get filterable schema + /*// TODO @hamersaw - need to add "extra columns" here to get filterable schema // rust/lance/src/dataset/scanner.rs 1526-1553 let mut extra_columns = vec![ArrowField::new(ROW_OFFSET, DataType::UInt64, true)]; - /*if self.nearest.as_ref().is_some() { - extra_columns.push(ArrowField::new(DIST_COL, DataType::Float32, true)); - }; - - if self.full_text_query.is_some() { - extra_columns.push(ArrowField::new(SCORE_COL, DataType::Float32, true)); - }*/ - let mut filterable_schema = self.physical_projection.to_schema(); filterable_schema = filterable_schema.merge(&ArrowSchema::new(extra_columns))?; // TODO @hamersaw - result? - let physical_exprs = self.to_physical_exprs(&(&filterable_schema).into())?; + let physical_exprs = self.to_physical_exprs(&(&filterable_schema).into())?;*/ - //let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?; + let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?; let mut projection: Arc = Arc::new(ProjectionExec::try_new(physical_exprs, src)?); @@ -480,10 +469,10 @@ impl ProjectionPlan { plan = Arc::new(AddRowOffsetExec::try_new(plan, self.dataset.clone()).await?); } */ - if self.must_add_row_offset { + /*if self.must_add_row_offset { println!("Adding AddRowOffsetExec to projection"); projection = Arc::new(AddRowOffsetExec::try_new(projection, frag_id_to_offset)?); - } + }*/ // Run dummy plan to execute projection, do not log the plan run let stream = execute_plan( diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index 95b458de7d3..013d92a8afa 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -5,10 +5,11 @@ use std::{collections::BTreeMap, collections::HashMap, ops::Range, pin::Pin, syn use crate::dataset::fragment::FragReadConfig; use crate::dataset::rowids::get_row_id_index; +use crate::io::exec::AddRowOffsetExec; use crate::{Error, Result}; use arrow::{compute::concat_batches, datatypes::UInt64Type}; use arrow_array::cast::AsArray; -use arrow_array::{Array, RecordBatch, StructArray, UInt64Array}; +use arrow_array::{Array, ArrayRef, RecordBatch, StructArray, UInt64Array}; use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer, NullBuffer}; use arrow_schema::Field as ArrowField; use datafusion::error::DataFusionError; @@ -18,8 +19,9 @@ use lance_arrow::RecordBatchExt; use lance_core::datatypes::Schema; use lance_core::utils::address::RowAddress; use lance_core::utils::deletion::OffsetMapper; -use lance_core::ROW_ADDR; -use lance_datafusion::{exec::FragInfo, projection::ProjectionPlan}; +use lance_core::{ROW_ADDR, ROW_OFFSET}; +//use lance_datafusion::exec::{AddRowOffsetExec, FragInfo}; +use lance_datafusion::projection::ProjectionPlan; use snafu::location; use super::ProjectionRequest; @@ -344,7 +346,7 @@ async fn do_take_rows( }?; // compile `frag_id_to_offset` map - TODO @hamersaw - this needs to be a util function - let mut frag_id_to_offset = HashMap::new(); + /*let mut frag_id_to_offset = HashMap::new(); let mut row_offset = 0; for frag in builder.dataset.get_fragments() { let deletion_vector = frag.get_deletion_vector().await?; @@ -357,12 +359,24 @@ async fn do_take_rows( ); // Should be sync unless the dataset was written by an extremely old lance version row_offset += frag.count_rows(None).await? as u64; - } + }*/ + + // strip ROW_OFFSET from projection (if necessary) + let mut stripped_projection = projection.as_ref().clone(); + stripped_projection.requested_output_expr = stripped_projection + .requested_output_expr + .clone() + .into_iter() + .filter(|e| e.name != ROW_OFFSET) + .collect(); + + let p = Arc::new(stripped_projection); - let batch = projection - .project_batch(batch, Arc::new(frag_id_to_offset)) - .await?; - if builder.with_row_address { + let mut batch = p.project_batch(batch).await?; + //let mut batch = projection.project_batch(batch, Arc::new(frag_id_to_offset)).await?; + + if builder.with_row_address || projection.must_add_row_offset { + // build row_addr column if batch.num_rows() != row_addrs.len() { return Err(Error::NotSupported { source: format!( @@ -374,12 +388,37 @@ async fn do_take_rows( }); } - let row_addr_col = Arc::new(UInt64Array::from(row_addrs)); - let row_addr_field = ArrowField::new(ROW_ADDR, arrow::datatypes::DataType::UInt64, false); - Ok(batch.try_with_column(row_addr_field, row_addr_col)?) - } else { - Ok(batch) + let row_addr_col: ArrayRef = Arc::new(UInt64Array::from(row_addrs)); + + if projection.must_add_row_offset { + let row_offset_col = + AddRowOffsetExec::compute_row_offset_array(&row_addr_col, builder.dataset).await?; + let row_offset_field = + ArrowField::new(ROW_OFFSET, arrow::datatypes::DataType::UInt64, false); + batch = batch.try_with_column(row_offset_field, row_offset_col)?; + } + + if builder.with_row_address { + let row_addr_field = + ArrowField::new(ROW_ADDR, arrow::datatypes::DataType::UInt64, false); + batch = batch.try_with_column(row_addr_field, row_addr_col)?; + } } + + /*if projection.must_add_row_offset { + let row_addr_col = batch + .column_by_name(ROW_ADDR) + .ok_or_else(|| Error::Internal { + message: format!("Expected {} column to compute row offsets", ROW_ADDR), + location: location!(), + })?; + + let row_offset_col = AddRowOffsetExec::compute_row_offset_array(&row_addr_col, builder.dataset).await?; + let row_offset_field = ArrowField::new(ROW_OFFSET, arrow::datatypes::DataType::UInt64, false); + batch = batch.try_with_column(row_offset_field, row_offset_col)?; + }*/ + + Ok(batch) } async fn take_rows(builder: TakeBuilder) -> Result { diff --git a/rust/lance/src/io/exec/rowids.rs b/rust/lance/src/io/exec/rowids.rs index 0078c1256b7..abe8f229157 100644 --- a/rust/lance/src/io/exec/rowids.rs +++ b/rust/lance/src/io/exec/rowids.rs @@ -393,6 +393,13 @@ impl AddRowOffsetExec { input: Arc, dataset: Arc, ) -> LanceResult { + let frag_id_to_offset = Self::compute_frag_id_to_offset(dataset).await?; + Self::internal_new(input, frag_id_to_offset) + } + + async fn compute_frag_id_to_offset( + dataset: Arc, + ) -> LanceResult>> { let mut frag_id_to_offset = HashMap::new(); let mut row_offset = 0; for frag in dataset.get_fragments() { @@ -408,7 +415,15 @@ impl AddRowOffsetExec { row_offset += frag.count_rows(None).await? as u64; } - Self::internal_new(input, Arc::new(frag_id_to_offset)) + Ok(Arc::new(frag_id_to_offset)) + } + + pub async fn compute_row_offset_array( + row_addr: &ArrayRef, + dataset: Arc, + ) -> Result { + let frag_id_to_offset = Self::compute_frag_id_to_offset(dataset).await?; + Self::compute_row_offsets(row_addr, frag_id_to_offset.as_ref()) } fn compute_row_offsets( From 3267635f5b7b3b7dbac359952501ea7ca107c38f Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 19 Jan 2026 21:44:06 -0600 Subject: [PATCH 07/15] cleaned up Signed-off-by: Daniel Rammer --- rust/lance-datafusion/src/exec.rs | 215 +----------------------------- rust/lance/src/dataset/take.rs | 38 +----- 2 files changed, 11 insertions(+), 242 deletions(-) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index 9c268e3ac5e..c61ff26419a 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -12,8 +12,8 @@ use std::{ use chrono::{DateTime, Utc}; -use arrow_array::{cast::AsArray, types::UInt64Type, ArrayRef, RecordBatch, UInt64Array}; -use arrow_schema::{Schema, Schema as ArrowSchema}; +use arrow_array::RecordBatch; +use arrow_schema::Schema as ArrowSchema; use datafusion::physical_plan::metrics::MetricType; use datafusion::{ catalog::streaming::StreamingTable, @@ -35,19 +35,17 @@ use datafusion::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, }, }; -use datafusion_common::{DataFusionError, Result, Statistics}; +use datafusion_common::{DataFusionError, Statistics}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use futures::{stream, StreamExt}; use lance_arrow::SchemaExt; use lance_core::{ utils::{ - address::RowAddress, - deletion::DeletionVector, futures::FinallyStreamExt, tracing::{StreamTracingExt, EXECUTION_PLAN_RUN, TRACE_EXECUTION}, }, - Error as LanceError, Result as LanceResult, ROW_ADDR, ROW_OFFSET, ROW_OFFSET_FIELD, + Error, Result, }; use log::{debug, info, warn}; use snafu::location; @@ -539,7 +537,7 @@ fn display_plan_one_liner_impl(plan: &dyn ExecutionPlan, output: &mut String) { pub fn execute_plan( plan: Arc, options: LanceExecutionOptions, -) -> LanceResult { +) -> Result { if !options.skip_logging { debug!( "Executing plan:\n{}", @@ -566,7 +564,7 @@ pub fn execute_plan( pub async fn analyze_plan( plan: Arc, options: LanceExecutionOptions, -) -> LanceResult { +) -> Result { // This is needed as AnalyzeExec launches a thread task per // partition, and we want these to be connected to the parent span let plan = Arc::new(TracedExec::new(plan, Span::current())); @@ -586,7 +584,7 @@ pub async fn analyze_plan( let mut stream = analyze .execute(0, get_task_context(&session_ctx, &options)) .map_err(|err| { - LanceError::io( + Error::io( format!("Failed to execute analyze plan: {}", err), location!(), ) @@ -891,202 +889,3 @@ impl ExecutionPlan for StrictBatchSizeExec { true } } - -/*#[derive(Debug)] -pub struct FragInfo { - /// The dataset offset of the first row in the fragment - pub row_offset: u64, - /// The deletion vector for this fragment, if any - pub deletion_vector: Option>, -} - -/// Add a `_rowoffset` column to a stream of record batches that have a `_rowaddr` column. -/// -/// The row offset is the number of rows between the current row and the first row in the dataset. -#[derive(Clone, Debug)] -pub struct AddRowOffsetExec { - input: Arc, - row_addr_pos: usize, - frag_id_to_offset: Arc>, - properties: PlanProperties, -} - -impl AddRowOffsetExec { - pub fn try_new( - input: Arc, - frag_id_to_offset: Arc>, - ) -> LanceResult { - let input_schema = input.schema(); - let row_addr_pos = input_schema - .index_of(ROW_ADDR) - .map_err(|_| LanceError::Internal { - message: format!("Input plan does not have a {} column", ROW_ADDR), - location: location!(), - })?; - - if input_schema.field_with_name(ROW_OFFSET).is_ok() { - return Err(LanceError::Internal { - message: format!("Input plan already has a {} column", ROW_OFFSET), - location: location!(), - }); - } - - let mut fields = input.schema().fields().iter().cloned().collect::>(); - fields.push(Arc::new(ROW_OFFSET_FIELD.clone())); - let schema = Arc::new(Schema::new_with_metadata( - fields, - input.schema().metadata().clone(), - )); - - let new_eq_props = - EquivalenceProperties::new(schema).extend(input.properties().eq_properties.clone())?; - let properties = input.properties().clone().with_eq_properties(new_eq_props); - - Ok(Self { - input, - row_addr_pos, - frag_id_to_offset, - properties, - }) - } - - fn compute_row_offsets( - row_addr: &ArrayRef, - frag_id_to_offset: &HashMap, - ) -> Result { - let row_addr_values = row_addr.as_primitive::().values(); - let mut row_offsets = Vec::with_capacity(row_addr_values.len()); - - let mut last_frag_id = u32::MAX; - let mut last_frag_offset = 0; - let mut last_frag_delete_count = 0; - let mut dv_iter = None; - - for addr in row_addr_values { - let addr = RowAddress::new_from_u64(*addr); - let frag_id = addr.fragment_id(); - if frag_id != last_frag_id { - last_frag_id = frag_id; - let Some(frag_info) = frag_id_to_offset.get(&frag_id) else { - return Err(DataFusionError::External(Box::new(LanceError::Internal { message: format!("A row address referred to a fragment {} that wasn't in the frag_id_to_offset map", frag_id), location: location!() }))); - }; - last_frag_offset = frag_info.row_offset; - last_frag_delete_count = 0; - dv_iter = frag_info - .deletion_vector - .as_ref() - .map(|dv| dv.to_sorted_iter().peekable()); - } - - let row_offset = addr.row_offset(); - if let Some(dv_iter) = &mut dv_iter { - while dv_iter.peek().is_some() { - if *dv_iter.peek().unwrap() < row_offset { - dv_iter.next(); - last_frag_delete_count += 1; - } else { - break; - } - } - row_offsets - .push(last_frag_offset + row_offset as u64 - last_frag_delete_count as u64); - } else { - row_offsets.push(last_frag_offset + row_offset as u64); - } - } - - Ok(Arc::new(UInt64Array::from(row_offsets))) - } -} - -impl DisplayAs for AddRowOffsetExec { - fn fmt_as( - &self, - _format_type: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - write!(f, "AddRowOffsetExec") - } -} - -impl ExecutionPlan for AddRowOffsetExec { - fn name(&self) -> &str { - "AddRowOffsetExec" - } - - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn properties(&self) -> &PlanProperties { - &self.properties - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.input] - } - - fn maintains_input_order(&self) -> Vec { - vec![true] - } - - fn benefits_from_input_partitioning(&self) -> Vec { - vec![false] - } - - fn partition_statistics(&self, partition: Option) -> Result { - self.input.partition_statistics(partition) - } - - fn supports_limit_pushdown(&self) -> bool { - true - } - - fn cardinality_effect(&self) -> CardinalityEffect { - CardinalityEffect::Equal - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> Result> { - if children.len() != 1 { - Err(DataFusionError::Internal( - "AddRowOffsetExec: invalid number of children".into(), - )) - } else { - Ok(Arc::new(Self::try_new( - children.into_iter().next().unwrap(), - self.frag_id_to_offset.clone(), - )?)) - } - } - - fn execute( - &self, - partition: usize, - context: Arc, - ) -> Result { - let input_stream = self.input.execute(partition, context)?; - let schema = self.schema(); - let row_addr_pos = self.row_addr_pos; - let frag_id_to_offset = self.frag_id_to_offset.clone(); - let stream = input_stream.then(move |batch| { - let schema = schema.clone(); - let row_addr_pos = row_addr_pos; - let frag_id_to_offset = frag_id_to_offset.clone(); - async move { - let batch = batch?; - let row_addr = batch.column(row_addr_pos); - let row_offsets = Self::compute_row_offsets(row_addr, frag_id_to_offset.as_ref())?; - let mut columns = batch.columns().to_vec(); - columns.push(Arc::new(row_offsets)); - Ok(RecordBatch::try_new(schema.clone(), columns)?) - } - }); - Ok(Box::pin(RecordBatchStreamAdapter::new( - self.schema(), - stream, - ))) - } -}*/ diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index 013d92a8afa..5abd80cd36c 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{collections::BTreeMap, collections::HashMap, ops::Range, pin::Pin, sync::Arc}; +use std::{collections::BTreeMap, ops::Range, pin::Pin, sync::Arc}; use crate::dataset::fragment::FragReadConfig; use crate::dataset::rowids::get_row_id_index; @@ -20,7 +20,6 @@ use lance_core::datatypes::Schema; use lance_core::utils::address::RowAddress; use lance_core::utils::deletion::OffsetMapper; use lance_core::{ROW_ADDR, ROW_OFFSET}; -//use lance_datafusion::exec::{AddRowOffsetExec, FragInfo}; use lance_datafusion::projection::ProjectionPlan; use snafu::location; @@ -345,23 +344,7 @@ async fn do_take_rows( Ok(reordered.into()) }?; - // compile `frag_id_to_offset` map - TODO @hamersaw - this needs to be a util function - /*let mut frag_id_to_offset = HashMap::new(); - let mut row_offset = 0; - for frag in builder.dataset.get_fragments() { - let deletion_vector = frag.get_deletion_vector().await?; - frag_id_to_offset.insert( - frag.id() as u32, - FragInfo { - row_offset, - deletion_vector, - }, - ); - // Should be sync unless the dataset was written by an extremely old lance version - row_offset += frag.count_rows(None).await? as u64; - }*/ - - // strip ROW_OFFSET from projection (if necessary) + // strip `ROW_OFFSET` from projection (if necessary) let mut stripped_projection = projection.as_ref().clone(); stripped_projection.requested_output_expr = stripped_projection .requested_output_expr @@ -369,11 +352,9 @@ async fn do_take_rows( .into_iter() .filter(|e| e.name != ROW_OFFSET) .collect(); - let p = Arc::new(stripped_projection); let mut batch = p.project_batch(batch).await?; - //let mut batch = projection.project_batch(batch, Arc::new(frag_id_to_offset)).await?; if builder.with_row_address || projection.must_add_row_offset { // build row_addr column @@ -390,6 +371,7 @@ async fn do_take_rows( let row_addr_col: ArrayRef = Arc::new(UInt64Array::from(row_addrs)); + // build and inject row_offset column (if necessary) if projection.must_add_row_offset { let row_offset_col = AddRowOffsetExec::compute_row_offset_array(&row_addr_col, builder.dataset).await?; @@ -398,6 +380,7 @@ async fn do_take_rows( batch = batch.try_with_column(row_offset_field, row_offset_col)?; } + // inject row_addr column (if necessary) if builder.with_row_address { let row_addr_field = ArrowField::new(ROW_ADDR, arrow::datatypes::DataType::UInt64, false); @@ -405,19 +388,6 @@ async fn do_take_rows( } } - /*if projection.must_add_row_offset { - let row_addr_col = batch - .column_by_name(ROW_ADDR) - .ok_or_else(|| Error::Internal { - message: format!("Expected {} column to compute row offsets", ROW_ADDR), - location: location!(), - })?; - - let row_offset_col = AddRowOffsetExec::compute_row_offset_array(&row_addr_col, builder.dataset).await?; - let row_offset_field = ArrowField::new(ROW_OFFSET, arrow::datatypes::DataType::UInt64, false); - batch = batch.try_with_column(row_offset_field, row_offset_col)?; - }*/ - Ok(batch) } From e486adae7a9f5b789a0fb5e5c80effc88e578bff Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 19 Jan 2026 21:45:24 -0600 Subject: [PATCH 08/15] and more cleanup Signed-off-by: Daniel Rammer --- rust/lance-datafusion/src/projection.rs | 33 ------------------------- 1 file changed, 33 deletions(-) diff --git a/rust/lance-datafusion/src/projection.rs b/rust/lance-datafusion/src/projection.rs index c9e88a22855..29667e6d451 100644 --- a/rust/lance-datafusion/src/projection.rs +++ b/rust/lance-datafusion/src/projection.rs @@ -3,10 +3,6 @@ use arrow_array::RecordBatch; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; -use datafusion::{ - logical_expr::Expr, - physical_plan::{projection::ProjectionExec, ExecutionPlan}, -}; use datafusion_common::{Column, DFSchema}; use datafusion_physical_expr::PhysicalExpr; use futures::TryStreamExt; @@ -24,7 +20,6 @@ use lance_core::{ }; use crate::{ - //exec::{execute_plan, AddRowOffsetExec, FragInfo, LanceExecutionOptions, OneShotExec}, exec::{execute_plan, LanceExecutionOptions, OneShotExec}, planner::Planner, }; @@ -443,37 +438,9 @@ impl ProjectionPlan { #[instrument(skip_all, level = "debug")] pub async fn project_batch(&self, batch: RecordBatch) -> Result { let src = Arc::new(OneShotExec::from_batch(batch)); - - /*// TODO @hamersaw - need to add "extra columns" here to get filterable schema - // rust/lance/src/dataset/scanner.rs 1526-1553 - let mut extra_columns = vec![ArrowField::new(ROW_OFFSET, DataType::UInt64, true)]; - let mut filterable_schema = self.physical_projection.to_schema(); - filterable_schema = filterable_schema.merge(&ArrowSchema::new(extra_columns))?; // TODO @hamersaw - result? - - let physical_exprs = self.to_physical_exprs(&(&filterable_schema).into())?;*/ - let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?; let mut projection: Arc = Arc::new(ProjectionExec::try_new(physical_exprs, src)?); - - /* - * do we need to inject the RowOffset field after we have the batch? - * then we update the projection for the "current_schema" to have `_rowoffset` - * from rust/lance/src/dataset/scanner.rs:2114-2117: - * - * or do we manually compute and inject row_offset column in batch like in - * rust/lance/src/io/exec/rowids.rs:526-553 - * - // Stage 6: Add system columns, if requested - if self.projection_plan.must_add_row_offset { - plan = Arc::new(AddRowOffsetExec::try_new(plan, self.dataset.clone()).await?); - } - */ - /*if self.must_add_row_offset { - println!("Adding AddRowOffsetExec to projection"); - projection = Arc::new(AddRowOffsetExec::try_new(projection, frag_id_to_offset)?); - }*/ - // Run dummy plan to execute projection, do not log the plan run let stream = execute_plan( projection, From bcbd005aa4bfdc4fe4c10e220f9e58e45822e6aa Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 20 Jan 2026 07:13:32 -0600 Subject: [PATCH 09/15] last cleanup Signed-off-by: Daniel Rammer --- rust/lance-datafusion/src/projection.rs | 5 +++-- rust/lance/src/dataset/fragment.rs | 29 +++++++++++++++++++++---- rust/lance/src/dataset/take.rs | 11 +++++----- 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/rust/lance-datafusion/src/projection.rs b/rust/lance-datafusion/src/projection.rs index 29667e6d451..d8e289e505a 100644 --- a/rust/lance-datafusion/src/projection.rs +++ b/rust/lance-datafusion/src/projection.rs @@ -3,6 +3,7 @@ use arrow_array::RecordBatch; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; +use datafusion::{logical_expr::Expr, physical_plan::projection::ProjectionExec}; use datafusion_common::{Column, DFSchema}; use datafusion_physical_expr::PhysicalExpr; use futures::TryStreamExt; @@ -439,8 +440,8 @@ impl ProjectionPlan { pub async fn project_batch(&self, batch: RecordBatch) -> Result { let src = Arc::new(OneShotExec::from_batch(batch)); let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?; - let mut projection: Arc = - Arc::new(ProjectionExec::try_new(physical_exprs, src)?); + let projection = Arc::new(ProjectionExec::try_new(physical_exprs, src)?); + // Run dummy plan to execute projection, do not log the plan run let stream = execute_plan( projection, diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 483c5dd10be..985040227b8 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -3319,7 +3319,14 @@ mod tests { // Repeated indices are repeated in result. let batch = fragment - .take_rows(&[1, 2, 4, 5, 5, 8], dataset.schema(), false, false) + .take_rows( + &[1, 2, 4, 5, 5, 8], + dataset.schema(), + false, + false, + false, + false, + ) .await .unwrap(); assert_eq!( @@ -3338,7 +3345,14 @@ mod tests { .unwrap(); assert!(fragment.metadata().deletion_file.is_some()); let batch = fragment - .take_rows(&[1, 2, 4, 5, 8], dataset.schema(), false, false) + .take_rows( + &[1, 2, 4, 5, 8], + dataset.schema(), + false, + false, + false, + false, + ) .await .unwrap(); assert_eq!( @@ -3348,7 +3362,7 @@ mod tests { // Empty indices gives empty result let batch = fragment - .take_rows(&[], dataset.schema(), false, false) + .take_rows(&[], dataset.schema(), false, false, false, false) .await .unwrap(); assert_eq!( @@ -3358,7 +3372,14 @@ mod tests { // Can get row ids let batch = fragment - .take_rows(&[1, 2, 4, 5, 8], dataset.schema(), false, true) + .take_rows( + &[1, 2, 4, 5, 8], + dataset.schema(), + false, + true, + false, + false, + ) .await .unwrap(); assert_eq!( diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index 5abd80cd36c..44c5ceaf1fe 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -344,7 +344,7 @@ async fn do_take_rows( Ok(reordered.into()) }?; - // strip `ROW_OFFSET` from projection (if necessary) + // strip `ROW_OFFSET` field from projection (if exists) let mut stripped_projection = projection.as_ref().clone(); stripped_projection.requested_output_expr = stripped_projection .requested_output_expr @@ -352,12 +352,11 @@ async fn do_take_rows( .into_iter() .filter(|e| e.name != ROW_OFFSET) .collect(); - let p = Arc::new(stripped_projection); - let mut batch = p.project_batch(batch).await?; + let mut batch = (Arc::new(stripped_projection)).project_batch(batch).await?; if builder.with_row_address || projection.must_add_row_offset { - // build row_addr column + // compile `ROW_ADDR` column if batch.num_rows() != row_addrs.len() { return Err(Error::NotSupported { source: format!( @@ -371,8 +370,8 @@ async fn do_take_rows( let row_addr_col: ArrayRef = Arc::new(UInt64Array::from(row_addrs)); - // build and inject row_offset column (if necessary) if projection.must_add_row_offset { + // compile and inject `ROW_OFFSET` column let row_offset_col = AddRowOffsetExec::compute_row_offset_array(&row_addr_col, builder.dataset).await?; let row_offset_field = @@ -380,8 +379,8 @@ async fn do_take_rows( batch = batch.try_with_column(row_offset_field, row_offset_col)?; } - // inject row_addr column (if necessary) if builder.with_row_address { + // inject `ROW_ADDR` column let row_addr_field = ArrowField::new(ROW_ADDR, arrow::datatypes::DataType::UInt64, false); batch = batch.try_with_column(row_addr_field, row_addr_col)?; From 6c7a205ef2a13bd134e09955b91280c02a088314 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 21 Jan 2026 15:26:54 -0600 Subject: [PATCH 10/15] adding columns before projection to ensure ordering correctness Signed-off-by: Daniel Rammer --- rust/lance-datafusion/src/projection.rs | 8 +++++++- rust/lance/src/dataset/take.rs | 15 ++------------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/rust/lance-datafusion/src/projection.rs b/rust/lance-datafusion/src/projection.rs index d8e289e505a..b8ecc04b7c6 100644 --- a/rust/lance-datafusion/src/projection.rs +++ b/rust/lance-datafusion/src/projection.rs @@ -439,7 +439,13 @@ impl ProjectionPlan { #[instrument(skip_all, level = "debug")] pub async fn project_batch(&self, batch: RecordBatch) -> Result { let src = Arc::new(OneShotExec::from_batch(batch)); - let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?; + + // Need to add ROW_OFFSET to get filterable schema + let extra_columns = vec![ArrowField::new(ROW_OFFSET, DataType::UInt64, true)]; + let mut filterable_schema = self.physical_projection.to_schema(); + filterable_schema = filterable_schema.merge(&ArrowSchema::new(extra_columns))?; + + let physical_exprs = self.to_physical_exprs(&(&filterable_schema).into())?; let projection = Arc::new(ProjectionExec::try_new(physical_exprs, src)?); // Run dummy plan to execute projection, do not log the plan run diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index 44c5ceaf1fe..da5fc076b4e 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -184,7 +184,7 @@ async fn do_take_rows( } let physical_schema = Arc::new(projection.physical_projection.to_bare_schema()); - let batch = if row_addr_stats.contiguous { + let mut batch = if row_addr_stats.contiguous { // Fastest path: Can use `read_range` directly let start = row_addrs.first().expect("empty range passed to take_rows"); let fragment_id = (start >> 32) as usize; @@ -344,17 +344,6 @@ async fn do_take_rows( Ok(reordered.into()) }?; - // strip `ROW_OFFSET` field from projection (if exists) - let mut stripped_projection = projection.as_ref().clone(); - stripped_projection.requested_output_expr = stripped_projection - .requested_output_expr - .clone() - .into_iter() - .filter(|e| e.name != ROW_OFFSET) - .collect(); - - let mut batch = (Arc::new(stripped_projection)).project_batch(batch).await?; - if builder.with_row_address || projection.must_add_row_offset { // compile `ROW_ADDR` column if batch.num_rows() != row_addrs.len() { @@ -387,7 +376,7 @@ async fn do_take_rows( } } - Ok(batch) + Ok(projection.project_batch(batch).await?) } async fn take_rows(builder: TakeBuilder) -> Result { From 99a7403e28c3de1d38b1da68397443e51da1432e Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Sat, 24 Jan 2026 20:59:04 -0600 Subject: [PATCH 11/15] added python tests Signed-off-by: Daniel Rammer --- python/python/tests/test_dataset.py | 137 ++++++++++++++++++++++++++++ rust/lance/src/dataset/take.rs | 2 +- 2 files changed, 138 insertions(+), 1 deletion(-) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 9cf7c824a60..3751c4717a8 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -685,6 +685,143 @@ def test_take_rowid_rowaddr(tmp_path: Path): assert sample_dataset.num_columns == 2 +@pytest.mark.parametrize( + "column_name", + [ + "_rowid", + "_rowaddr", + "_rowoffset", + "_row_created_at_version", + "_row_last_updated_at_version", + ], +) +def test_take_system_columns_values(tmp_path: Path, column_name: str): + """Test that system columns return correct values in take.""" + table = pa.table({"a": range(100), "b": range(100, 200)}) + base_dir = tmp_path / "test_take_system_columns_values" + # Use max_rows_per_file to create multiple fragments + lance.write_dataset(table, base_dir, max_rows_per_file=25) + dataset = lance.dataset(base_dir) + + indices = [0, 5, 10, 50, 99] + result = dataset.take(indices, columns=[column_name, "a"]) + assert result.num_rows == len(indices) + assert result.schema.names == [column_name, "a"] + + col_values = result.column(column_name).to_pylist() + a_values = result.column("a").to_pylist() + + # Verify column type is UInt64 + assert result.column(column_name).type == pa.uint64() + + # Verify data column values + assert a_values == indices + + # Verify system column values based on column type + if column_name == "_rowid": + # Without stable row IDs, _rowid should match the index + assert col_values == indices + elif column_name in ("_row_created_at_version", "_row_last_updated_at_version"): + # All rows created/updated at version 1 + assert col_values == [1] * len(indices) + # _rowaddr and _rowoffset values depend on fragment layout + + +def test_take_system_columns_column_ordering(tmp_path: Path): + """Test that column ordering is preserved when using system columns.""" + table = pa.table({"a": range(50), "b": range(50, 100)}) + base_dir = tmp_path / "test_take_column_ordering" + lance.write_dataset(table, base_dir) + dataset = lance.dataset(base_dir) + + indices = [0, 1, 2] + + # Test different orderings with all system columns + result = dataset.take(indices, columns=["_rowid", "a", "_rowaddr"]) + assert result.schema.names == ["_rowid", "a", "_rowaddr"] + + result = dataset.take(indices, columns=["a", "_rowaddr", "_rowid"]) + assert result.schema.names == ["a", "_rowaddr", "_rowid"] + + result = dataset.take(indices, columns=["_rowaddr", "_rowid", "b", "a"]) + assert result.schema.names == ["_rowaddr", "_rowid", "b", "a"] + + # Test with version columns + result = dataset.take( + indices, + columns=[ + "_row_created_at_version", + "a", + "_row_last_updated_at_version", + "_rowid", + ], + ) + assert result.schema.names == [ + "_row_created_at_version", + "a", + "_row_last_updated_at_version", + "_rowid", + ] + + # Test with all system columns in mixed order + result = dataset.take( + indices, + columns=[ + "_rowoffset", + "_row_last_updated_at_version", + "b", + "_rowaddr", + "_row_created_at_version", + "a", + "_rowid", + ], + ) + assert result.schema.names == [ + "_rowoffset", + "_row_last_updated_at_version", + "b", + "_rowaddr", + "_row_created_at_version", + "a", + "_rowid", + ] + + +def test_take_version_system_columns(tmp_path: Path): + """Test _row_created_at_version and _row_last_updated_at_version columns.""" + table = pa.table({"a": range(50)}) + base_dir = tmp_path / "test_take_version_columns" + lance.write_dataset(table, base_dir) + dataset = lance.dataset(base_dir) + + # Initial version is 1 + initial_version = dataset.version + + indices = [0, 10, 25] + result = dataset.take( + indices, + columns=["a", "_row_created_at_version", "_row_last_updated_at_version"], + ) + + assert result.num_rows == 3 + created_at = result.column("_row_created_at_version").to_pylist() + updated_at = result.column("_row_last_updated_at_version").to_pylist() + + # All rows were created and last updated at the initial version + assert created_at == [initial_version] * 3 + assert updated_at == [initial_version] * 3 + + # Now update some rows by overwriting + table2 = pa.table({"a": range(50, 100)}) + lance.write_dataset(table2, base_dir, mode="append") + dataset = lance.dataset(base_dir) + + # New rows should have version 2 + result = dataset.take([50, 60], columns=["_row_created_at_version"]) + created_at = result.column("_row_created_at_version").to_pylist() + assert created_at == [dataset.version] * 2 + + @pytest.mark.parametrize("indices", [[], [1, 1], [1, 1, 20, 20, 21], [21, 0, 21, 1, 0]]) def test_take_duplicate_index(tmp_path: Path, indices: List[int]): table = pa.table({"x": range(24)}) diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index da5fc076b4e..63f9b0d2337 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -376,7 +376,7 @@ async fn do_take_rows( } } - Ok(projection.project_batch(batch).await?) + projection.project_batch(batch).await? } async fn take_rows(builder: TakeBuilder) -> Result { From 5ceb80ab10518a9c67630c1cb268924df9b31b98 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Sat, 24 Jan 2026 21:09:43 -0600 Subject: [PATCH 12/15] fix clippy Signed-off-by: Daniel Rammer --- rust/lance/src/dataset/take.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index 63f9b0d2337..c50a93573ac 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -126,6 +126,7 @@ pub async fn take( } /// Take rows by the internal ROW ids. +#[allow(clippy::needless_question_mark)] async fn do_take_rows( mut builder: TakeBuilder, projection: Arc, @@ -376,7 +377,7 @@ async fn do_take_rows( } } - projection.project_batch(batch).await? + Ok(projection.project_batch(batch).await?) } async fn take_rows(builder: TakeBuilder) -> Result { From a034d306d9ee484b09c167b4fe1f933012ca9e2f Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Sat, 24 Jan 2026 21:42:31 -0600 Subject: [PATCH 13/15] fixed existing take_blob apis Signed-off-by: Daniel Rammer --- rust/lance-datafusion/src/projection.rs | 5 ++++- rust/lance/src/dataset/take.rs | 23 ++++++++++++++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/rust/lance-datafusion/src/projection.rs b/rust/lance-datafusion/src/projection.rs index b8ecc04b7c6..aacb63d4118 100644 --- a/rust/lance-datafusion/src/projection.rs +++ b/rust/lance-datafusion/src/projection.rs @@ -441,7 +441,10 @@ impl ProjectionPlan { let src = Arc::new(OneShotExec::from_batch(batch)); // Need to add ROW_OFFSET to get filterable schema - let extra_columns = vec![ArrowField::new(ROW_OFFSET, DataType::UInt64, true)]; + let extra_columns = vec![ + ArrowField::new(ROW_ADDR, DataType::UInt64, true), + ArrowField::new(ROW_OFFSET, DataType::UInt64, true), + ]; let mut filterable_schema = self.physical_projection.to_schema(); filterable_schema = filterable_schema.merge(&ArrowSchema::new(extra_columns))?; diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index c50a93573ac..00a59f15f5a 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -12,15 +12,17 @@ use arrow_array::cast::AsArray; use arrow_array::{Array, ArrayRef, RecordBatch, StructArray, UInt64Array}; use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer, NullBuffer}; use arrow_schema::Field as ArrowField; +use datafusion::common::Column; use datafusion::error::DataFusionError; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion_expr::Expr; use futures::{Future, Stream, StreamExt, TryStreamExt}; use lance_arrow::RecordBatchExt; use lance_core::datatypes::Schema; use lance_core::utils::address::RowAddress; use lance_core::utils::deletion::OffsetMapper; use lance_core::{ROW_ADDR, ROW_OFFSET}; -use lance_datafusion::projection::ProjectionPlan; +use lance_datafusion::projection::{OutputColumn, ProjectionPlan}; use snafu::location; use super::ProjectionRequest; @@ -131,6 +133,25 @@ async fn do_take_rows( mut builder: TakeBuilder, projection: Arc, ) -> Result { + // If we need row addresses in output, add to projection's output expressions + let projection = if builder.with_row_address { + let mut proj = (*projection).clone(); + // Add _rowaddr to output if not already present + if !proj + .requested_output_expr + .iter() + .any(|c| c.name == ROW_ADDR) + { + proj.requested_output_expr.push(OutputColumn { + expr: Expr::Column(Column::from_name(ROW_ADDR)), + name: ROW_ADDR.to_string(), + }); + } + Arc::new(proj) + } else { + projection + }; + let with_row_id_in_projection = projection.physical_projection.with_row_id; let with_row_addr_in_projection = projection.physical_projection.with_row_addr; let with_row_created_at_version_in_projection = From 4543ee9e594f272571d388df50aae98bd11d186d Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 27 Jan 2026 12:03:15 -0600 Subject: [PATCH 14/15] updated python _row_id tests to use the fragment computed offset Signed-off-by: Daniel Rammer --- python/python/tests/test_dataset.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 3751c4717a8..fa24cb24f7d 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -719,8 +719,17 @@ def test_take_system_columns_values(tmp_path: Path, column_name: str): # Verify system column values based on column type if column_name == "_rowid": - # Without stable row IDs, _rowid should match the index - assert col_values == indices + # Without stable row IDs, _rowid equals _rowaddr (not the index). + # Row address = (fragment_id << 32) | row_offset_within_fragment + # With max_rows_per_file=25: frag0=0-24, frag1=25-49, frag2=50-74, frag3=75-99 + expected_rowids = [ + (0 << 32) | 0, # index 0: fragment 0, offset 0 + (0 << 32) | 5, # index 5: fragment 0, offset 5 + (0 << 32) | 10, # index 10: fragment 0, offset 10 + (2 << 32) | 0, # index 50: fragment 2, offset 0 + (3 << 32) | 24, # index 99: fragment 3, offset 24 + ] + assert col_values == expected_rowids elif column_name in ("_row_created_at_version", "_row_last_updated_at_version"): # All rows created/updated at version 1 assert col_values == [1] * len(indices) From 2f4b31e46d06b561f55c84a55dfe9c1d6ad7a9e8 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 27 Jan 2026 14:25:49 -0600 Subject: [PATCH 15/15] updated version tests to use stable row ids Signed-off-by: Daniel Rammer --- python/python/tests/test_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index fa24cb24f7d..8bebfe2c4cb 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -800,7 +800,7 @@ def test_take_version_system_columns(tmp_path: Path): """Test _row_created_at_version and _row_last_updated_at_version columns.""" table = pa.table({"a": range(50)}) base_dir = tmp_path / "test_take_version_columns" - lance.write_dataset(table, base_dir) + lance.write_dataset(table, base_dir, enable_stable_row_ids=True) dataset = lance.dataset(base_dir) # Initial version is 1