From ace48dacf374e8eb789b934679e8f4ed2216f2d8 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Fri, 28 Nov 2025 19:07:21 +0800 Subject: [PATCH 1/5] feat(cdf): support cdf upsert view --- rust/lance/src/dataset/delta.rs | 141 ++++++++++++++++++++++++++++++-- 1 file changed, 135 insertions(+), 6 deletions(-) diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index f27fe358e5f..1a15ab4e40d 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -215,15 +215,19 @@ impl DatasetDelta { ])?; // Filter for rows created in the version range - let filter = format!( - "_row_created_at_version > {} AND _row_created_at_version <= {}", - self.begin_version, self.end_version - ); + let filter = self.build_inserted_rows_filter(); scanner.filter(&filter)?; scanner.try_into_stream().await } + fn build_inserted_rows_filter(&self) -> String { + format!( + "_row_created_at_version > {} AND _row_created_at_version <= {}", + self.begin_version, self.end_version + ) + } + /// Get updated rows between the two versions. /// /// This returns rows where `_row_last_updated_at_version` is greater than `begin_version` @@ -268,10 +272,78 @@ impl DatasetDelta { ])?; // Filter for rows that were updated (not inserted) in the version range - let filter = format!( + let filter = self.build_updated_rows_batch_filter(); + scanner.filter(&filter)?; + + scanner.try_into_stream().await + } + + fn build_updated_rows_batch_filter(&self) -> String { + format!( "_row_created_at_version <= {} AND _row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}", self.begin_version, self.begin_version, self.end_version - ); + ) + } + + fn build_upserted_rows_filter(&self) -> String { + let inserted_row_filter = self.build_inserted_rows_filter(); + let updated_rows_filter = self.build_updated_rows_batch_filter(); + format!( + "({}) OR ({})", + inserted_row_filter, updated_rows_filter + ) + } + + /// Get upserted rows between the two versions. + /// + /// This returns rows meet following conditions: + /// Condition 1: + /// `_row_last_updated_at_version` is greater than `begin_version` + /// and less than or equal to `end_version`, but `_row_created_at_version` is less than + /// or equal to `begin_version` (to exclude newly inserted rows). + /// Condition 2: + /// This returns rows where `_row_created_at_version` is greater than `begin_version` + /// and less than or equal to `end_version`. + /// + /// The result always includes: + /// - `_row_created_at_version`: Version when the row was created + /// - `_row_last_updated_at_version`: Version when the row was last updated + /// - `_rowid`: Row ID + /// - All other columns from the dataset + /// + /// # Returns + /// + /// A stream of record batches containing the updated and inserted rows. + /// + /// # Example + /// + /// ``` + /// # use lance::{Dataset, Result}; + /// # use futures::TryStreamExt; + /// # async fn example(dataset: &Dataset, previous_version: u64) -> Result<()> { + /// let delta = dataset.delta() + /// .compared_against_version(previous_version) + /// .build()?; + /// let mut updated = delta.get_upserted_rows().await?; + /// while let Some(batch) = updated.try_next().await? { + /// // Process batch... + /// } + /// # Ok(()) + /// # } + /// ``` + pub async fn get_upserted_rows(&self) -> Result { + let mut scanner = self.base_dataset.scan(); + + // Enable version columns + scanner.project(&[ + WILDCARD, + ROW_ID, + ROW_CREATED_AT_VERSION, + ROW_LAST_UPDATED_AT_VERSION, + ])?; + + // Filter for rows that were updated or inserted in the version range + let filter = self.build_upserted_rows_filter(); scanner.filter(&filter)?; scanner.try_into_stream().await @@ -1298,4 +1370,61 @@ mod tests { assert_eq!(created_at[i], 1); // All created at version 1 } } + + #[tokio::test] + async fn test_get_upsert_rows() { + // Create initial dataset (version 1) + let temp_dir = lance_core::utils::tempfile::TempStrDir::default(); + let ds = write_dataset_temp(&temp_dir, 0, 50, 1, "value", true, false).await; + + assert_eq!(ds.version().version, 1); + + // Append inserted rows (version 2) + let ds = write_dataset_temp(&temp_dir, 50, 20, 1, "appended_v2", true, true).await; + assert_eq!(ds.version().version, 2); + + // Update some existing rows (version 3) + let ds = update_where(ds, "key < 10", "updated_v3").await; + assert_eq!(ds.version().version, 3); + + // Get upserted rows between version 1 and 3 + let delta = ds + .delta() + .with_begin_version(1) + .with_end_version(3) + .build() + .unwrap(); + + let stream = delta.get_upserted_rows().await.unwrap(); + let result = collect_stream(stream).await; + + // Should include 20 inserted rows (keys 50-69) and 10 updated rows (keys 0-9) + assert_eq!(result.num_rows(), 30); + assert!(result.column_by_name(ROW_ID).is_some()); + assert!(result.column_by_name(ROW_CREATED_AT_VERSION).is_some()); + assert!(result.column_by_name(ROW_LAST_UPDATED_AT_VERSION).is_some()); + + let created_at = result[ROW_CREATED_AT_VERSION] + .as_primitive::() + .values(); + let updated_at = result[ROW_LAST_UPDATED_AT_VERSION] + .as_primitive::() + .values(); + let keys = result["key"].as_primitive::().values(); + let a = 1; + + for i in 0..result.num_rows() { + let key = keys[i]; + if key < 10 { + // Updated rows from version 3 + assert_eq!(created_at[i], 1); + assert_eq!(updated_at[i], 3); + } else { + // Inserted rows from version 2 + assert!((50..70).contains(&key)); + assert_eq!(created_at[i], 2); + assert_eq!(updated_at[i], 2); + } + } + } } From 0f740ff514ea4ff262360b3cfbe91fba145f8918 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Fri, 28 Nov 2025 19:09:33 +0800 Subject: [PATCH 2/5] feat(cdf): support cdf upsert view --- rust/lance/src/dataset/delta.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index 1a15ab4e40d..a32c5442a16 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -288,10 +288,7 @@ impl DatasetDelta { fn build_upserted_rows_filter(&self) -> String { let inserted_row_filter = self.build_inserted_rows_filter(); let updated_rows_filter = self.build_updated_rows_batch_filter(); - format!( - "({}) OR ({})", - inserted_row_filter, updated_rows_filter - ) + format!("({}) OR ({})", inserted_row_filter, updated_rows_filter) } /// Get upserted rows between the two versions. From 07b6febea9dcdacf8ad9fb4af6854d9ec4848305 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Mon, 1 Dec 2025 14:17:23 +0800 Subject: [PATCH 3/5] feat(cdf): support cdf upsert view --- rust/lance/src/dataset/delta.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index 68ea6ac8f18..9638c6c7863 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -1409,7 +1409,6 @@ mod tests { .as_primitive::() .values(); let keys = result["key"].as_primitive::().values(); - let a = 1; for i in 0..result.num_rows() { let key = keys[i]; From 82ccb5e73ee2bc045dc9687b9ee65a871e6bd23c Mon Sep 17 00:00:00 2001 From: YueZhang Date: Wed, 21 Jan 2026 19:26:56 +0800 Subject: [PATCH 4/5] code review --- rust/lance/src/dataset/delta.rs | 34 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index 44b31306287..1455d94c7c2 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -319,21 +319,18 @@ impl DatasetDelta { ])?; // Filter for rows created in the version range - let filter = self.build_inserted_rows_filter().await; + let filter = self.build_inserted_rows_filter().await?; scanner.filter(&filter)?; scanner.try_into_stream().await } - async fn build_inserted_rows_filter(&self) -> String { - let (begin_version, end_version) = self - .resolve_range() - .await - .unwrap_or((self.begin_version, self.end_version)); - format!( + async fn build_inserted_rows_filter(&self) -> Result { + let (begin_version, end_version) = self.resolve_range().await?; + Ok(format!( "_row_created_at_version > {} AND _row_created_at_version <= {}", begin_version, end_version - ) + )) } /// Get updated rows between the two versions. @@ -380,21 +377,20 @@ impl DatasetDelta { ])?; // Filter for rows that were updated (not inserted) in the version range - let filter = self.build_updated_rows_batch_filter().await; + let filter = self.build_updated_rows_batch_filter().await?; scanner.filter(&filter)?; scanner.try_into_stream().await } - async fn build_updated_rows_batch_filter(&self) -> String { + async fn build_updated_rows_batch_filter(&self) -> Result { let (begin_version, end_version) = self .resolve_range() - .await - .unwrap_or((self.begin_version, self.end_version)); - format!( + .await?; + Ok(format!( "_row_created_at_version <= {} AND _row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}", begin_version, begin_version, end_version - ) + )) } /// Get upserted rows between the two versions. @@ -446,16 +442,16 @@ impl DatasetDelta { ])?; // Filter for rows that were updated or inserted in the version range - let filter = self.build_upserted_rows_filter().await; + let filter = self.build_upserted_rows_filter().await?; scanner.filter(&filter)?; scanner.try_into_stream().await } - async fn build_upserted_rows_filter(&self) -> String { - let inserted_row_filter = self.build_inserted_rows_filter().await; - let updated_rows_filter = self.build_updated_rows_batch_filter().await; - format!("({}) OR ({})", inserted_row_filter, updated_rows_filter) + async fn build_upserted_rows_filter(&self) -> Result { + let inserted_row_filter = self.build_inserted_rows_filter().await?; + let updated_rows_filter = self.build_updated_rows_batch_filter().await?; + Ok( format!("({}) OR ({})", inserted_row_filter, updated_rows_filter)) } } From 6a049440fd9bcbad1a6e3f8ccae3644e20998c6d Mon Sep 17 00:00:00 2001 From: YueZhang Date: Wed, 21 Jan 2026 19:30:37 +0800 Subject: [PATCH 5/5] code review --- rust/lance/src/dataset/delta.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index 1455d94c7c2..1ee94e2d4e3 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -384,9 +384,7 @@ impl DatasetDelta { } async fn build_updated_rows_batch_filter(&self) -> Result { - let (begin_version, end_version) = self - .resolve_range() - .await?; + let (begin_version, end_version) = self.resolve_range().await?; Ok(format!( "_row_created_at_version <= {} AND _row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}", begin_version, begin_version, end_version @@ -451,7 +449,10 @@ impl DatasetDelta { async fn build_upserted_rows_filter(&self) -> Result { let inserted_row_filter = self.build_inserted_rows_filter().await?; let updated_rows_filter = self.build_updated_rows_batch_filter().await?; - Ok( format!("({}) OR ({})", inserted_row_filter, updated_rows_filter)) + Ok(format!( + "({}) OR ({})", + inserted_row_filter, updated_rows_filter + )) } }