From 4701754e89113823973638f1e7d4a9280bd008a6 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Mon, 1 Dec 2025 21:50:36 +0800 Subject: [PATCH 1/9] feat(cdf): support set start/end timestamp in cdf --- rust/lance/src/dataset/delta.rs | 135 ++++++++++++++++++++++++++++---- 1 file changed, 119 insertions(+), 16 deletions(-) diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index f27fe358e5f..2100acd7477 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::rc::Rc; +use chrono::{DateTime, Utc}; use super::transaction::Transaction; use crate::dataset::scanner::DatasetRecordBatchStream; use crate::Dataset; @@ -13,6 +15,7 @@ use lance_core::ROW_ID; use lance_core::ROW_LAST_UPDATED_AT_VERSION; use lance_core::WILDCARD; use snafu::location; +use crate::dataset::Version; /// Builder for creating a [`DatasetDelta`] to explore changes between dataset versions. /// @@ -40,6 +43,7 @@ pub struct DatasetDeltaBuilder { compared_against_version: Option, begin_version: Option, end_version: Option, + cached_versions: Rc>, } impl DatasetDeltaBuilder { @@ -50,6 +54,7 @@ impl DatasetDeltaBuilder { compared_against_version: None, begin_version: None, end_version: None, + cached_versions: Rc::new(Vec::new()), } } @@ -80,6 +85,51 @@ impl DatasetDeltaBuilder { self } + /// Set the beginning date for the delta (exclusive). + /// + /// Must be used together with `with_end_date`. + pub async fn with_begin_date(mut self, date_str: &str) -> Self { + if let Ok(date_time) = date_str.parse::>() { + let versions = self.list_versions().await; + let begin_version = versions + .iter() + .find(|v| v.timestamp >= date_time) + .map(|v| v.version) + .ok_or_else(|| { + Error::invalid_input( + format!("Can not find version with timestamp >= {}", date_str), + location!(), + ) + }); + self.begin_version = Some(begin_version.unwrap()); + } + self + } + + /// Set the ending date for the delta (inclusive). + /// + /// Must be used together with `with_begin_date`. + pub async fn with_end_date(mut self, date_str: &str) -> Self { + if let Ok(date_time) = date_str.parse::>() { + let versions = self.list_versions().await; + let end_version = versions + .iter() + .rev() + .find(|v| v.timestamp < date_time) + .map(|v| v.version) + .unwrap_or(self.dataset.version().version); + self.end_version = Some(end_version); + } + self + } + + async fn list_versions(&mut self) -> Rc> { + if self.cached_versions.is_empty() { + self.cached_versions = Rc::new(self.dataset.versions().await.unwrap()); + } + Rc::clone(&self.cached_versions) + } + /// Build the [`DatasetDelta`]. /// /// # Errors @@ -105,19 +155,19 @@ impl DatasetDeltaBuilder { (None, Some(begin), Some(end)) => (begin, end), (Some(_), Some(_), _) | (Some(_), _, Some(_)) => { return Err(Error::invalid_input( - "Cannot specify both compared_against_version and explicit begin/end versions", + "Cannot specify both compared_against_version and explicit begin/end versions(dates)", location!(), )); } (None, Some(_), None) | (None, None, Some(_)) => { return Err(Error::invalid_input( - "Must specify both with_begin_version and with_end_version", + "Must specify both (with_begin_version and with_end_version) or (with_begin_date and with_end_date)", location!(), )); } (None, None, None) => { return Err(Error::invalid_input( - "Must specify either compared_against_version or both with_begin_version and with_end_version", + "Must specify either (compared_against_version) or (both with_begin_version and with_end_version) or (with_begin_date and with_end_date)", location!(), )); } @@ -554,7 +604,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], None, ) - .await; + .await; let created_at = result[ROW_CREATED_AT_VERSION] .as_primitive::() @@ -606,7 +656,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], None, ) - .await; + .await; let created_at = result[ROW_CREATED_AT_VERSION] .as_primitive::() @@ -649,7 +699,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], None, ) - .await; + .await; let created_at = result[ROW_CREATED_AT_VERSION] .as_primitive::() @@ -785,7 +835,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION], Some("_row_created_at_version = 1"), ) - .await; + .await; assert_eq!(result.num_rows(), 50); let created_at = result[ROW_CREATED_AT_VERSION] @@ -804,7 +854,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION], Some("_row_created_at_version = 2"), ) - .await; + .await; assert_eq!(result.num_rows(), 50); let created_at = result[ROW_CREATED_AT_VERSION] @@ -823,7 +873,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION], Some("_row_created_at_version >= 2"), ) - .await; + .await; assert_eq!(result.num_rows(), 50); for i in 0..result.num_rows() { @@ -994,7 +1044,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], Some("_row_created_at_version = 1 AND _row_last_updated_at_version = 1"), ) - .await; + .await; // Should have 40 rows (keys 0-19 and 30-49) assert_eq!(result.num_rows(), 40); @@ -1020,7 +1070,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], Some("_row_created_at_version = 1 AND _row_last_updated_at_version = 3"), ) - .await; + .await; // Should have 10 rows (keys 20-29) assert_eq!(result.num_rows(), 10); @@ -1045,7 +1095,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], Some("_row_created_at_version = _row_last_updated_at_version"), ) - .await; + .await; // Should have 90 rows (40 from v1 that weren't updated + 50 from v2) assert_eq!(result.num_rows(), 90); @@ -1067,7 +1117,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], Some("_row_created_at_version != _row_last_updated_at_version"), ) - .await; + .await; // Should have 10 rows (keys 20-29 that were updated) assert_eq!(result.num_rows(), 10); @@ -1102,7 +1152,7 @@ mod tests { &["key", "value", ROW_LAST_UPDATED_AT_VERSION], Some("key < 50 AND _row_last_updated_at_version = 2"), ) - .await; + .await; // Should have 20 rows (keys 30-49 that were updated in v2) assert_eq!(result.num_rows(), 20); @@ -1248,7 +1298,7 @@ mod tests { for i in 0..result.num_rows() { assert_eq!(created_at[i], 1); // Created at version 1 assert_eq!(updated_at[i], 2); // Updated at version 2 - // Keys should be in range [0, 30) but excluding [10, 20) + // Keys should be in range [0, 30) but excluding [10, 20) assert!(keys[i] < 30); assert!(keys[i] < 10 || keys[i] >= 20); } @@ -1298,4 +1348,57 @@ mod tests { assert_eq!(created_at[i], 1); // All created at version 1 } } -} + + #[tokio::test] + async fn test_delta_build_with_date_range_transactions() { + // 使用 MockClock 控制各版本的时间戳 + let temp_dir = lance_core::utils::tempfile::TempStrDir::default(); + + // 版本1,时间 t=10s + mock_instant::thread_local::MockClock::set_system_time(std::time::Duration::from_secs(10)); + let mut ds = write_dataset_temp(&temp_dir, 0, 50, 1, "v1", true, false).await; + let t1 = chrono::DateTime::from_timestamp(10, 0).unwrap().to_rfc3339(); + assert_eq!(ds.version().version, 1); + + // Version 2, time t=20s (append) + mock_instant::thread_local::MockClock::set_system_time(std::time::Duration::from_secs(20)); + ds = write_dataset_temp(&temp_dir, 50, 10, 1, "v2_append", true, true).await; + let t2 = chrono::DateTime::from_timestamp(20, 0).unwrap().to_rfc3339(); + assert_eq!(ds.version().version, 2); + + // Version 3, time t=30s (update) + mock_instant::thread_local::MockClock::set_system_time(std::time::Duration::from_secs(30)); + ds = update_where(ds, "key >= 0 AND key < 10", "updated_v3").await; + let t3 = chrono::DateTime::from_timestamp(30, 0).unwrap().to_rfc3339(); + assert_eq!(ds.version().version, 3); + + // Note: with_end_date currently uses "< date" semantics. To include version t3, we set end to t3 + 1s + let t3_plus = (chrono::DateTime::from_timestamp(30, 0).unwrap() + + chrono::Duration::seconds(1)) + .to_rfc3339(); + + // Build delta, begin=t1(exclusive v1), end=t3_plus(inclusive v3) -> expect to include both v2 and v3 transactions + let delta = ds + .delta() + .with_begin_date(&t1) + .await + .with_end_date(&t3_plus) + .await + .build() + .unwrap(); + let txs = delta.list_transactions().await.unwrap(); + assert_eq!(txs.len(), 2); + + // Build delta, begin=t2(exclusive v2), end=t3_plus(inclusive v3) -> expect to include only v3 transaction + let delta = ds + .delta() + .with_begin_date(&t2) + .await + .with_end_date(&t3_plus) + .await + .build() + .unwrap(); + let txs = delta.list_transactions().await.unwrap(); + assert_eq!(txs.len(), 1); + } +} \ No newline at end of file From 55b79d9a3a1fcb862e40ad94c683b58d8d6ed9dd Mon Sep 17 00:00:00 2001 From: YueZhang Date: Mon, 1 Dec 2025 21:54:21 +0800 Subject: [PATCH 2/9] feat(cdf): support set start/end timestamp in cdf --- rust/lance/src/dataset/delta.rs | 46 +++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index 2100acd7477..a105915705b 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -1,12 +1,12 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::rc::Rc; -use chrono::{DateTime, Utc}; use super::transaction::Transaction; use crate::dataset::scanner::DatasetRecordBatchStream; +use crate::dataset::Version; use crate::Dataset; use crate::Result; +use chrono::{DateTime, Utc}; use futures::stream::{self, StreamExt, TryStreamExt}; use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::Error; @@ -15,7 +15,7 @@ use lance_core::ROW_ID; use lance_core::ROW_LAST_UPDATED_AT_VERSION; use lance_core::WILDCARD; use snafu::location; -use crate::dataset::Version; +use std::rc::Rc; /// Builder for creating a [`DatasetDelta`] to explore changes between dataset versions. /// @@ -604,7 +604,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], None, ) - .await; + .await; let created_at = result[ROW_CREATED_AT_VERSION] .as_primitive::() @@ -656,7 +656,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], None, ) - .await; + .await; let created_at = result[ROW_CREATED_AT_VERSION] .as_primitive::() @@ -699,7 +699,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], None, ) - .await; + .await; let created_at = result[ROW_CREATED_AT_VERSION] .as_primitive::() @@ -835,7 +835,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION], Some("_row_created_at_version = 1"), ) - .await; + .await; assert_eq!(result.num_rows(), 50); let created_at = result[ROW_CREATED_AT_VERSION] @@ -854,7 +854,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION], Some("_row_created_at_version = 2"), ) - .await; + .await; assert_eq!(result.num_rows(), 50); let created_at = result[ROW_CREATED_AT_VERSION] @@ -873,7 +873,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION], Some("_row_created_at_version >= 2"), ) - .await; + .await; assert_eq!(result.num_rows(), 50); for i in 0..result.num_rows() { @@ -1044,7 +1044,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], Some("_row_created_at_version = 1 AND _row_last_updated_at_version = 1"), ) - .await; + .await; // Should have 40 rows (keys 0-19 and 30-49) assert_eq!(result.num_rows(), 40); @@ -1070,7 +1070,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], Some("_row_created_at_version = 1 AND _row_last_updated_at_version = 3"), ) - .await; + .await; // Should have 10 rows (keys 20-29) assert_eq!(result.num_rows(), 10); @@ -1095,7 +1095,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], Some("_row_created_at_version = _row_last_updated_at_version"), ) - .await; + .await; // Should have 90 rows (40 from v1 that weren't updated + 50 from v2) assert_eq!(result.num_rows(), 90); @@ -1117,7 +1117,7 @@ mod tests { &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION], Some("_row_created_at_version != _row_last_updated_at_version"), ) - .await; + .await; // Should have 10 rows (keys 20-29 that were updated) assert_eq!(result.num_rows(), 10); @@ -1152,7 +1152,7 @@ mod tests { &["key", "value", ROW_LAST_UPDATED_AT_VERSION], Some("key < 50 AND _row_last_updated_at_version = 2"), ) - .await; + .await; // Should have 20 rows (keys 30-49 that were updated in v2) assert_eq!(result.num_rows(), 20); @@ -1298,7 +1298,7 @@ mod tests { for i in 0..result.num_rows() { assert_eq!(created_at[i], 1); // Created at version 1 assert_eq!(updated_at[i], 2); // Updated at version 2 - // Keys should be in range [0, 30) but excluding [10, 20) + // Keys should be in range [0, 30) but excluding [10, 20) assert!(keys[i] < 30); assert!(keys[i] < 10 || keys[i] >= 20); } @@ -1357,25 +1357,31 @@ mod tests { // 版本1,时间 t=10s mock_instant::thread_local::MockClock::set_system_time(std::time::Duration::from_secs(10)); let mut ds = write_dataset_temp(&temp_dir, 0, 50, 1, "v1", true, false).await; - let t1 = chrono::DateTime::from_timestamp(10, 0).unwrap().to_rfc3339(); + let t1 = chrono::DateTime::from_timestamp(10, 0) + .unwrap() + .to_rfc3339(); assert_eq!(ds.version().version, 1); // Version 2, time t=20s (append) mock_instant::thread_local::MockClock::set_system_time(std::time::Duration::from_secs(20)); ds = write_dataset_temp(&temp_dir, 50, 10, 1, "v2_append", true, true).await; - let t2 = chrono::DateTime::from_timestamp(20, 0).unwrap().to_rfc3339(); + let t2 = chrono::DateTime::from_timestamp(20, 0) + .unwrap() + .to_rfc3339(); assert_eq!(ds.version().version, 2); // Version 3, time t=30s (update) mock_instant::thread_local::MockClock::set_system_time(std::time::Duration::from_secs(30)); ds = update_where(ds, "key >= 0 AND key < 10", "updated_v3").await; - let t3 = chrono::DateTime::from_timestamp(30, 0).unwrap().to_rfc3339(); + let t3 = chrono::DateTime::from_timestamp(30, 0) + .unwrap() + .to_rfc3339(); assert_eq!(ds.version().version, 3); // Note: with_end_date currently uses "< date" semantics. To include version t3, we set end to t3 + 1s let t3_plus = (chrono::DateTime::from_timestamp(30, 0).unwrap() + chrono::Duration::seconds(1)) - .to_rfc3339(); + .to_rfc3339(); // Build delta, begin=t1(exclusive v1), end=t3_plus(inclusive v3) -> expect to include both v2 and v3 transactions let delta = ds @@ -1401,4 +1407,4 @@ mod tests { let txs = delta.list_transactions().await.unwrap(); assert_eq!(txs.len(), 1); } -} \ No newline at end of file +} From c83f112110ba1e28acb04d606c1c5e0d35218e5e Mon Sep 17 00:00:00 2001 From: YueZhang Date: Mon, 1 Dec 2025 21:57:02 +0800 Subject: [PATCH 3/9] feat(cdf): support set start/end timestamp in cdf --- rust/lance/src/dataset/delta.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index a105915705b..baa285794b7 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -1351,10 +1351,9 @@ mod tests { #[tokio::test] async fn test_delta_build_with_date_range_transactions() { - // 使用 MockClock 控制各版本的时间戳 let temp_dir = lance_core::utils::tempfile::TempStrDir::default(); - // 版本1,时间 t=10s + // Version 2, time t=10s (append) mock_instant::thread_local::MockClock::set_system_time(std::time::Duration::from_secs(10)); let mut ds = write_dataset_temp(&temp_dir, 0, 50, 1, "v1", true, false).await; let t1 = chrono::DateTime::from_timestamp(10, 0) From 902193d671fca07e19e7a6f8077bb2c45b2a3909 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Mon, 1 Dec 2025 21:57:14 +0800 Subject: [PATCH 4/9] feat(cdf): support set start/end timestamp in cdf --- rust/lance/src/dataset/delta.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index baa285794b7..aeb5c394da6 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -1353,7 +1353,7 @@ mod tests { async fn test_delta_build_with_date_range_transactions() { let temp_dir = lance_core::utils::tempfile::TempStrDir::default(); - // Version 2, time t=10s (append) + // Version 1, time t=10s (append) mock_instant::thread_local::MockClock::set_system_time(std::time::Duration::from_secs(10)); let mut ds = write_dataset_temp(&temp_dir, 0, 50, 1, "v1", true, false).await; let t1 = chrono::DateTime::from_timestamp(10, 0) From dcc91401dae265bc5eb8524041daca8265aea8e0 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Mon, 1 Dec 2025 22:01:04 +0800 Subject: [PATCH 5/9] feat(cdf): support set start/end timestamp in cdf --- rust/lance/src/dataset.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 992bbd48be6..996a12eb9ab 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -184,7 +184,7 @@ impl std::fmt::Debug for Dataset { } /// Dataset Version -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Debug)] pub struct Version { /// version number pub version: u64, From caad0ea67f23703979d96a1f5975995d69d479ec Mon Sep 17 00:00:00 2001 From: YueZhang Date: Tue, 9 Dec 2025 19:14:39 +0800 Subject: [PATCH 6/9] reviewed --- rust/lance/src/dataset/delta.rs | 167 +++++++++++++++++++++++++++++--- 1 file changed, 153 insertions(+), 14 deletions(-) diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index e0a4ee0a1ee..39c36287cad 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -5,6 +5,7 @@ use super::transaction::Transaction; use crate::dataset::scanner::DatasetRecordBatchStream; use crate::Dataset; use crate::Result; +use chrono::{DateTime, Utc}; use futures::stream::{self, StreamExt, TryStreamExt}; use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::Error; @@ -32,6 +33,12 @@ use snafu::location; /// .with_begin_version(3) /// .with_end_version(7) /// .build()?; +/// +/// // Or specify explicit time range +/// let delta = DatasetDeltaBuilder::new(dataset.clone()) +/// .with_begin_date(chrono::Utc::now()) +/// .with_end_date(chrono::Utc::now()) +/// .build()?; /// # Ok(()) /// # } /// ``` @@ -41,6 +48,8 @@ pub struct DatasetDeltaBuilder { compared_against_version: Option, begin_version: Option, end_version: Option, + begin_timestamp: Option>, + end_timestamp: Option>, } impl DatasetDeltaBuilder { @@ -51,6 +60,8 @@ impl DatasetDeltaBuilder { compared_against_version: None, begin_version: None, end_version: None, + begin_timestamp: None, + end_timestamp: None, } } @@ -81,6 +92,24 @@ impl DatasetDeltaBuilder { self } + /// Set the beginning timestamp for the delta (exclusive). + /// + /// Must be used together with `with_end_date`. + /// Cannot be used together with `compared_against_version` or explicit version range. + pub fn with_begin_date(mut self, timestamp: DateTime) -> Self { + self.begin_timestamp = Some(timestamp); + self + } + + /// Set the ending timestamp for the delta (inclusive). + /// + /// Must be used together with `with_begin_date`. + /// Cannot be used together with `compared_against_version` or explicit version range. + pub fn with_end_date(mut self, timestamp: DateTime) -> Self { + self.end_timestamp = Some(timestamp); + self + } + /// Build the [`DatasetDelta`]. /// /// # Errors @@ -90,35 +119,54 @@ impl DatasetDeltaBuilder { /// - Neither `compared_against_version` nor explicit version range are specified /// - Only one of `with_begin_version` or `with_end_version` is specified pub fn build(self) -> Result { - let (begin_version, end_version) = match ( + // Validate incompatible combinations + if self.compared_against_version.is_some() + && (self.begin_version.is_some() + || self.end_version.is_some() + || self.begin_timestamp.is_some() + || self.end_timestamp.is_some()) + { + return Err(Error::invalid_input( + "Cannot combine compared_against_version with explicit begin/end versions or dates", + location!(), + )); + } + + // Resolve parameters and construct DatasetDelta. For date ranges, defer mapping to versions. + let (begin_version, end_version, begin_ts, end_ts) = match ( self.compared_against_version, self.begin_version, self.end_version, + self.begin_timestamp, + self.end_timestamp, ) { - (Some(compared), None, None) => { + (Some(compared), None, None, None, None) => { let current_version = self.dataset.version().version; if current_version > compared { - (compared, current_version) + (compared, current_version, None, None) } else { - (current_version, compared) + (current_version, compared, None, None) } } - (None, Some(begin), Some(end)) => (begin, end), - (Some(_), Some(_), _) | (Some(_), _, Some(_)) => { + (None, Some(begin), Some(end), None, None) => (begin, end, None, None), + (None, None, None, Some(begin_ts), Some(end_ts)) => { + (0, 0, Some(begin_ts), Some(end_ts)) + } + (None, Some(_), None, None, None) | (None, None, Some(_), None, None) => { return Err(Error::invalid_input( - "Cannot specify both compared_against_version and explicit begin/end versions", + "Must specify both with_begin_version and with_end_version", location!(), )); } - (None, Some(_), None) | (None, None, Some(_)) => { + (None, None, None, Some(_), None) | (None, None, None, None, Some(_)) => { return Err(Error::invalid_input( - "Must specify both with_begin_version and with_end_version", + "Must specify both with_begin_date and with_end_date", location!(), )); } - (None, None, None) => { + _ => { return Err(Error::invalid_input( - "Must specify either compared_against_version or both with_begin_version and with_end_version", + "Invalid combination of parameters for DatasetDeltaBuilder", location!(), )); } @@ -128,6 +176,8 @@ impl DatasetDeltaBuilder { begin_version, end_version, base_dataset: self.dataset, + begin_timestamp: begin_ts, + end_timestamp: end_ts, }) } } @@ -140,12 +190,47 @@ pub struct DatasetDelta { pub(crate) end_version: u64, /// The Lance dataset to compute delta pub(crate) base_dataset: Dataset, + pub(crate) begin_timestamp: Option>, + pub(crate) end_timestamp: Option>, } impl DatasetDelta { + /// Resolve the effective version range for this delta. + /// + /// If a date window is set (`begin_timestamp` and `end_timestamp` provided), this lazily + /// maps timestamps to version ids by scanning dataset versions: + /// - Begin is exclusive: pick the greatest version with `timestamp < begin_timestamp`. + /// - End is inclusive: pick the greatest version with `timestamp <= end_timestamp`. + /// + /// If no date window is set, returns the explicit `begin_version`/`end_version` stored on + /// the struct. + async fn resolve_range(&self) -> Result<(u64, u64)> { + if let (Some(begin_ts), Some(end_ts)) = (self.begin_timestamp, self.end_timestamp) { + // Load all dataset versions and fold them to a version interval matching the date window + let versions = self.base_dataset.versions().await?; + let mut begin_version: u64 = 0; + let mut end_version: u64 = 0; + for v in &versions { + // Exclusive begin: track the largest version strictly before begin_ts + if v.timestamp < begin_ts && v.version > begin_version { + begin_version = v.version; + } + // Inclusive end: track the largest version at or before end_ts + if v.timestamp <= end_ts && v.version > end_version { + end_version = v.version; + } + } + Ok((begin_version, end_version)) + } else { + // No date window: use the pre-resolved version interval + Ok((self.begin_version, self.end_version)) + } + } + /// Listing the transactions between two versions. pub async fn list_transactions(&self) -> Result> { - stream::iter((self.begin_version + 1)..=self.end_version) + let (begin_version, end_version) = self.resolve_range().await?; + stream::iter((begin_version + 1)..=end_version) .map(|version| { let base_dataset = self.base_dataset.clone(); async move { @@ -216,9 +301,10 @@ impl DatasetDelta { ])?; // Filter for rows created in the version range + let (begin_version, end_version) = self.resolve_range().await?; let filter = format!( "_row_created_at_version > {} AND _row_created_at_version <= {}", - self.begin_version, self.end_version + begin_version, end_version ); scanner.filter(&filter)?; @@ -269,9 +355,10 @@ impl DatasetDelta { ])?; // Filter for rows that were updated (not inserted) in the version range + let (begin_version, end_version) = self.resolve_range().await?; let filter = format!( "_row_created_at_version <= {} AND _row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}", - self.begin_version, self.begin_version, self.end_version + begin_version, begin_version, end_version ); scanner.filter(&filter)?; @@ -1299,4 +1386,56 @@ mod tests { assert_eq!(created_at[i], 1); // All created at version 1 } } + + #[tokio::test] + async fn test_build_with_date_window_basic() { + MockClock::set_system_time(std::time::Duration::from_secs(10)); + let ds = create_test_dataset(50, 1, "v1", true).await; + assert_eq!(ds.version().version, 1); + + MockClock::set_system_time(std::time::Duration::from_secs(20)); + let ds = update_where(ds, "key < 10", "v2").await; + assert_eq!(ds.version().version, 2); + + MockClock::set_system_time(std::time::Duration::from_secs(30)); + let ds = update_where(ds, "key >= 10 AND key < 20", "v3").await; + assert_eq!(ds.version().version, 3); + + let begin_ts = chrono::DateTime::::from_timestamp(15, 0).unwrap(); + let end_ts = chrono::DateTime::::from_timestamp(25, 0).unwrap(); + + let delta = ds + .delta() + .with_begin_date(begin_ts) + .with_end_date(end_ts) + .build() + .unwrap(); + + let txs = delta.list_transactions().await.unwrap(); + assert_eq!(txs.len(), 1); + } + + #[tokio::test] + async fn test_build_with_date_window_edges() { + MockClock::set_system_time(std::time::Duration::from_secs(100)); + let ds = create_test_dataset(10, 1, "v1", true).await; + assert_eq!(ds.version().version, 1); + + MockClock::set_system_time(std::time::Duration::from_secs(200)); + let ds = update_where(ds, "key < 5", "v2").await; + assert_eq!(ds.version().version, 2); + + let begin_ts = chrono::DateTime::::from_timestamp(50, 0).unwrap(); + let end_ts = chrono::DateTime::::from_timestamp(250, 0).unwrap(); + + let delta = ds + .delta() + .with_begin_date(begin_ts) + .with_end_date(end_ts) + .build() + .unwrap(); + + let txs = delta.list_transactions().await.unwrap(); + assert_eq!(txs.len(), 2); + } } From 747e4ac1e54c410677ff13e8f5a63751c22667be Mon Sep 17 00:00:00 2001 From: YueZhang Date: Tue, 9 Dec 2025 19:20:59 +0800 Subject: [PATCH 7/9] set latest version as default --- rust/lance/src/dataset/delta.rs | 43 +++++++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index 39c36287cad..01c563f26ac 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -158,9 +158,10 @@ impl DatasetDeltaBuilder { location!(), )); } - (None, None, None, Some(_), None) | (None, None, None, None, Some(_)) => { + (None, None, None, Some(begin_ts), None) => (0, 0, Some(begin_ts), None), + (None, None, None, None, Some(_)) => { return Err(Error::invalid_input( - "Must specify both with_begin_date and with_end_date", + "Must specify with_begin_date when with_end_date is provided", location!(), )); } @@ -221,6 +222,17 @@ impl DatasetDelta { } } Ok((begin_version, end_version)) + } else if let (Some(begin_ts), None) = (self.begin_timestamp, self.end_timestamp) { + // Open-ended range: use latest version as end + let versions = self.base_dataset.versions().await?; + let mut begin_version: u64 = 0; + for v in &versions { + if v.timestamp < begin_ts && v.version > begin_version { + begin_version = v.version; + } + } + let end_version = self.base_dataset.latest_version_id().await?; + Ok((begin_version, end_version)) } else { // No date window: use the pre-resolved version interval Ok((self.begin_version, self.end_version)) @@ -1438,4 +1450,31 @@ mod tests { let txs = delta.list_transactions().await.unwrap(); assert_eq!(txs.len(), 2); } + + #[tokio::test] + async fn test_build_with_date_open_end_uses_latest() { + MockClock::set_system_time(std::time::Duration::from_secs(10)); + let ds = create_test_dataset(20, 1, "v1", true).await; + assert_eq!(ds.version().version, 1); + + MockClock::set_system_time(std::time::Duration::from_secs(20)); + let ds = update_where(ds, "key < 5", "v2").await; + assert_eq!(ds.version().version, 2); + + MockClock::set_system_time(std::time::Duration::from_secs(30)); + let ds = update_where(ds, "key >= 5 AND key < 10", "v3").await; + assert_eq!(ds.version().version, 3); + + let begin_ts = chrono::DateTime::::from_timestamp(15, 0).unwrap(); + + let delta = ds + .delta() + .with_begin_date(begin_ts) + .build() + .unwrap(); + + let txs = delta.list_transactions().await.unwrap(); + // Should include transactions at v2 and v3 + assert_eq!(txs.len(), 2); + } } From 02e31bece8c051d794211c6e3077cfb6d719c5ae Mon Sep 17 00:00:00 2001 From: YueZhang Date: Tue, 9 Dec 2025 19:22:31 +0800 Subject: [PATCH 8/9] set latest version as default --- rust/lance/src/dataset/delta.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index 01c563f26ac..9d57c4c49d6 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -1467,11 +1467,7 @@ mod tests { let begin_ts = chrono::DateTime::::from_timestamp(15, 0).unwrap(); - let delta = ds - .delta() - .with_begin_date(begin_ts) - .build() - .unwrap(); + let delta = ds.delta().with_begin_date(begin_ts).build().unwrap(); let txs = delta.list_transactions().await.unwrap(); // Should include transactions at v2 and v3 From db0848c594a919b817afbe6536bda8aac5e7b091 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Tue, 9 Dec 2025 19:59:13 +0800 Subject: [PATCH 9/9] fix ut --- rust/lance/src/dataset/delta.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index 9d57c4c49d6..d14e6d55869 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -165,6 +165,12 @@ impl DatasetDeltaBuilder { location!(), )); } + (None, None, None, None, None) => { + return Err(Error::invalid_input( + "Must specify either compared_against_version or both with_begin_version and with_end_version", + location!(), + )); + } _ => { return Err(Error::invalid_input( "Invalid combination of parameters for DatasetDeltaBuilder",