diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 722ba7c97e1..0dfd263e775 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -184,7 +184,7 @@ impl std::fmt::Debug for Dataset { } /// Dataset Version -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Debug)] pub struct Version { /// version number pub version: u64, diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index e0a4ee0a1ee..d14e6d55869 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -5,6 +5,7 @@ use super::transaction::Transaction; use crate::dataset::scanner::DatasetRecordBatchStream; use crate::Dataset; use crate::Result; +use chrono::{DateTime, Utc}; use futures::stream::{self, StreamExt, TryStreamExt}; use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::Error; @@ -32,6 +33,12 @@ use snafu::location; /// .with_begin_version(3) /// .with_end_version(7) /// .build()?; +/// +/// // Or specify explicit time range +/// let delta = DatasetDeltaBuilder::new(dataset.clone()) +/// .with_begin_date(chrono::Utc::now()) +/// .with_end_date(chrono::Utc::now()) +/// .build()?; /// # Ok(()) /// # } /// ``` @@ -41,6 +48,8 @@ pub struct DatasetDeltaBuilder { compared_against_version: Option, begin_version: Option, end_version: Option, + begin_timestamp: Option>, + end_timestamp: Option>, } impl DatasetDeltaBuilder { @@ -51,6 +60,8 @@ impl DatasetDeltaBuilder { compared_against_version: None, begin_version: None, end_version: None, + begin_timestamp: None, + end_timestamp: None, } } @@ -81,6 +92,24 @@ impl DatasetDeltaBuilder { self } + /// Set the beginning timestamp for the delta (exclusive). + /// + /// Must be used together with `with_end_date`. + /// Cannot be used together with `compared_against_version` or explicit version range. + pub fn with_begin_date(mut self, timestamp: DateTime) -> Self { + self.begin_timestamp = Some(timestamp); + self + } + + /// Set the ending timestamp for the delta (inclusive). + /// + /// Must be used together with `with_begin_date`. + /// Cannot be used together with `compared_against_version` or explicit version range. + pub fn with_end_date(mut self, timestamp: DateTime) -> Self { + self.end_timestamp = Some(timestamp); + self + } + /// Build the [`DatasetDelta`]. /// /// # Errors @@ -90,44 +119,72 @@ impl DatasetDeltaBuilder { /// - Neither `compared_against_version` nor explicit version range are specified /// - Only one of `with_begin_version` or `with_end_version` is specified pub fn build(self) -> Result { - let (begin_version, end_version) = match ( + // Validate incompatible combinations + if self.compared_against_version.is_some() + && (self.begin_version.is_some() + || self.end_version.is_some() + || self.begin_timestamp.is_some() + || self.end_timestamp.is_some()) + { + return Err(Error::invalid_input( + "Cannot combine compared_against_version with explicit begin/end versions or dates", + location!(), + )); + } + + // Resolve parameters and construct DatasetDelta. For date ranges, defer mapping to versions. + let (begin_version, end_version, begin_ts, end_ts) = match ( self.compared_against_version, self.begin_version, self.end_version, + self.begin_timestamp, + self.end_timestamp, ) { - (Some(compared), None, None) => { + (Some(compared), None, None, None, None) => { let current_version = self.dataset.version().version; if current_version > compared { - (compared, current_version) + (compared, current_version, None, None) } else { - (current_version, compared) + (current_version, compared, None, None) } } - (None, Some(begin), Some(end)) => (begin, end), - (Some(_), Some(_), _) | (Some(_), _, Some(_)) => { + (None, Some(begin), Some(end), None, None) => (begin, end, None, None), + (None, None, None, Some(begin_ts), Some(end_ts)) => { + (0, 0, Some(begin_ts), Some(end_ts)) + } + (None, Some(_), None, None, None) | (None, None, Some(_), None, None) => { return Err(Error::invalid_input( - "Cannot specify both compared_against_version and explicit begin/end versions", + "Must specify both with_begin_version and with_end_version", location!(), )); } - (None, Some(_), None) | (None, None, Some(_)) => { + (None, None, None, Some(begin_ts), None) => (0, 0, Some(begin_ts), None), + (None, None, None, None, Some(_)) => { return Err(Error::invalid_input( - "Must specify both with_begin_version and with_end_version", + "Must specify with_begin_date when with_end_date is provided", location!(), )); } - (None, None, None) => { + (None, None, None, None, None) => { return Err(Error::invalid_input( "Must specify either compared_against_version or both with_begin_version and with_end_version", location!(), )); } + _ => { + return Err(Error::invalid_input( + "Invalid combination of parameters for DatasetDeltaBuilder", + location!(), + )); + } }; Ok(DatasetDelta { begin_version, end_version, base_dataset: self.dataset, + begin_timestamp: begin_ts, + end_timestamp: end_ts, }) } } @@ -140,12 +197,58 @@ pub struct DatasetDelta { pub(crate) end_version: u64, /// The Lance dataset to compute delta pub(crate) base_dataset: Dataset, + pub(crate) begin_timestamp: Option>, + pub(crate) end_timestamp: Option>, } impl DatasetDelta { + /// Resolve the effective version range for this delta. + /// + /// If a date window is set (`begin_timestamp` and `end_timestamp` provided), this lazily + /// maps timestamps to version ids by scanning dataset versions: + /// - Begin is exclusive: pick the greatest version with `timestamp < begin_timestamp`. + /// - End is inclusive: pick the greatest version with `timestamp <= end_timestamp`. + /// + /// If no date window is set, returns the explicit `begin_version`/`end_version` stored on + /// the struct. + async fn resolve_range(&self) -> Result<(u64, u64)> { + if let (Some(begin_ts), Some(end_ts)) = (self.begin_timestamp, self.end_timestamp) { + // Load all dataset versions and fold them to a version interval matching the date window + let versions = self.base_dataset.versions().await?; + let mut begin_version: u64 = 0; + let mut end_version: u64 = 0; + for v in &versions { + // Exclusive begin: track the largest version strictly before begin_ts + if v.timestamp < begin_ts && v.version > begin_version { + begin_version = v.version; + } + // Inclusive end: track the largest version at or before end_ts + if v.timestamp <= end_ts && v.version > end_version { + end_version = v.version; + } + } + Ok((begin_version, end_version)) + } else if let (Some(begin_ts), None) = (self.begin_timestamp, self.end_timestamp) { + // Open-ended range: use latest version as end + let versions = self.base_dataset.versions().await?; + let mut begin_version: u64 = 0; + for v in &versions { + if v.timestamp < begin_ts && v.version > begin_version { + begin_version = v.version; + } + } + let end_version = self.base_dataset.latest_version_id().await?; + Ok((begin_version, end_version)) + } else { + // No date window: use the pre-resolved version interval + Ok((self.begin_version, self.end_version)) + } + } + /// Listing the transactions between two versions. pub async fn list_transactions(&self) -> Result> { - stream::iter((self.begin_version + 1)..=self.end_version) + let (begin_version, end_version) = self.resolve_range().await?; + stream::iter((begin_version + 1)..=end_version) .map(|version| { let base_dataset = self.base_dataset.clone(); async move { @@ -216,9 +319,10 @@ impl DatasetDelta { ])?; // Filter for rows created in the version range + let (begin_version, end_version) = self.resolve_range().await?; let filter = format!( "_row_created_at_version > {} AND _row_created_at_version <= {}", - self.begin_version, self.end_version + begin_version, end_version ); scanner.filter(&filter)?; @@ -269,9 +373,10 @@ impl DatasetDelta { ])?; // Filter for rows that were updated (not inserted) in the version range + let (begin_version, end_version) = self.resolve_range().await?; let filter = format!( "_row_created_at_version <= {} AND _row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}", - self.begin_version, self.begin_version, self.end_version + begin_version, begin_version, end_version ); scanner.filter(&filter)?; @@ -1299,4 +1404,79 @@ mod tests { assert_eq!(created_at[i], 1); // All created at version 1 } } + + #[tokio::test] + async fn test_build_with_date_window_basic() { + MockClock::set_system_time(std::time::Duration::from_secs(10)); + let ds = create_test_dataset(50, 1, "v1", true).await; + assert_eq!(ds.version().version, 1); + + MockClock::set_system_time(std::time::Duration::from_secs(20)); + let ds = update_where(ds, "key < 10", "v2").await; + assert_eq!(ds.version().version, 2); + + MockClock::set_system_time(std::time::Duration::from_secs(30)); + let ds = update_where(ds, "key >= 10 AND key < 20", "v3").await; + assert_eq!(ds.version().version, 3); + + let begin_ts = chrono::DateTime::::from_timestamp(15, 0).unwrap(); + let end_ts = chrono::DateTime::::from_timestamp(25, 0).unwrap(); + + let delta = ds + .delta() + .with_begin_date(begin_ts) + .with_end_date(end_ts) + .build() + .unwrap(); + + let txs = delta.list_transactions().await.unwrap(); + assert_eq!(txs.len(), 1); + } + + #[tokio::test] + async fn test_build_with_date_window_edges() { + MockClock::set_system_time(std::time::Duration::from_secs(100)); + let ds = create_test_dataset(10, 1, "v1", true).await; + assert_eq!(ds.version().version, 1); + + MockClock::set_system_time(std::time::Duration::from_secs(200)); + let ds = update_where(ds, "key < 5", "v2").await; + assert_eq!(ds.version().version, 2); + + let begin_ts = chrono::DateTime::::from_timestamp(50, 0).unwrap(); + let end_ts = chrono::DateTime::::from_timestamp(250, 0).unwrap(); + + let delta = ds + .delta() + .with_begin_date(begin_ts) + .with_end_date(end_ts) + .build() + .unwrap(); + + let txs = delta.list_transactions().await.unwrap(); + assert_eq!(txs.len(), 2); + } + + #[tokio::test] + async fn test_build_with_date_open_end_uses_latest() { + MockClock::set_system_time(std::time::Duration::from_secs(10)); + let ds = create_test_dataset(20, 1, "v1", true).await; + assert_eq!(ds.version().version, 1); + + MockClock::set_system_time(std::time::Duration::from_secs(20)); + let ds = update_where(ds, "key < 5", "v2").await; + assert_eq!(ds.version().version, 2); + + MockClock::set_system_time(std::time::Duration::from_secs(30)); + let ds = update_where(ds, "key >= 5 AND key < 10", "v3").await; + assert_eq!(ds.version().version, 3); + + let begin_ts = chrono::DateTime::::from_timestamp(15, 0).unwrap(); + + let delta = ds.delta().with_begin_date(begin_ts).build().unwrap(); + + let txs = delta.list_transactions().await.unwrap(); + // Should include transactions at v2 and v3 + assert_eq!(txs.len(), 2); + } }