Skip to content
2 changes: 1 addition & 1 deletion rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl std::fmt::Debug for Dataset {
}

/// Dataset Version
#[derive(Deserialize, Serialize)]
#[derive(Deserialize, Serialize, Debug)]
pub struct Version {
/// version number
pub version: u64,
Expand Down
206 changes: 193 additions & 13 deletions rust/lance/src/dataset/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::transaction::Transaction;
use crate::dataset::scanner::DatasetRecordBatchStream;
use crate::Dataset;
use crate::Result;
use chrono::{DateTime, Utc};
use futures::stream::{self, StreamExt, TryStreamExt};
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use lance_core::Error;
Expand Down Expand Up @@ -32,6 +33,12 @@ use snafu::location;
/// .with_begin_version(3)
/// .with_end_version(7)
/// .build()?;
///
/// // Or specify explicit time range
/// let delta = DatasetDeltaBuilder::new(dataset.clone())
/// .with_begin_date(chrono::Utc::now())
/// .with_end_date(chrono::Utc::now())
/// .build()?;
/// # Ok(())
/// # }
/// ```
Expand All @@ -41,6 +48,8 @@ pub struct DatasetDeltaBuilder {
compared_against_version: Option<u64>,
begin_version: Option<u64>,
end_version: Option<u64>,
begin_timestamp: Option<DateTime<Utc>>,
end_timestamp: Option<DateTime<Utc>>,
}

impl DatasetDeltaBuilder {
Expand All @@ -51,6 +60,8 @@ impl DatasetDeltaBuilder {
compared_against_version: None,
begin_version: None,
end_version: None,
begin_timestamp: None,
end_timestamp: None,
}
}

Expand Down Expand Up @@ -81,6 +92,24 @@ impl DatasetDeltaBuilder {
self
}

/// Set the beginning timestamp for the delta (exclusive).
///
/// Must be used together with `with_end_date`.
/// Cannot be used together with `compared_against_version` or explicit version range.
pub fn with_begin_date(mut self, timestamp: DateTime<Utc>) -> Self {
self.begin_timestamp = Some(timestamp);
self
}

/// Set the ending timestamp for the delta (inclusive).
///
/// Must be used together with `with_begin_date`.
/// Cannot be used together with `compared_against_version` or explicit version range.
pub fn with_end_date(mut self, timestamp: DateTime<Utc>) -> Self {
self.end_timestamp = Some(timestamp);
self
}

/// Build the [`DatasetDelta`].
///
/// # Errors
Expand All @@ -90,44 +119,72 @@ impl DatasetDeltaBuilder {
/// - Neither `compared_against_version` nor explicit version range are specified
/// - Only one of `with_begin_version` or `with_end_version` is specified
pub fn build(self) -> Result<DatasetDelta> {
let (begin_version, end_version) = match (
// Validate incompatible combinations
if self.compared_against_version.is_some()
&& (self.begin_version.is_some()
|| self.end_version.is_some()
|| self.begin_timestamp.is_some()
|| self.end_timestamp.is_some())
{
return Err(Error::invalid_input(
"Cannot combine compared_against_version with explicit begin/end versions or dates",
location!(),
));
}

// Resolve parameters and construct DatasetDelta. For date ranges, defer mapping to versions.
let (begin_version, end_version, begin_ts, end_ts) = match (
self.compared_against_version,
self.begin_version,
self.end_version,
self.begin_timestamp,
self.end_timestamp,
) {
(Some(compared), None, None) => {
(Some(compared), None, None, None, None) => {
let current_version = self.dataset.version().version;
if current_version > compared {
(compared, current_version)
(compared, current_version, None, None)
} else {
(current_version, compared)
(current_version, compared, None, None)
}
}
(None, Some(begin), Some(end)) => (begin, end),
(Some(_), Some(_), _) | (Some(_), _, Some(_)) => {
(None, Some(begin), Some(end), None, None) => (begin, end, None, None),
(None, None, None, Some(begin_ts), Some(end_ts)) => {
(0, 0, Some(begin_ts), Some(end_ts))
}
(None, Some(_), None, None, None) | (None, None, Some(_), None, None) => {
return Err(Error::invalid_input(
"Cannot specify both compared_against_version and explicit begin/end versions",
"Must specify both with_begin_version and with_end_version",
location!(),
));
}
(None, Some(_), None) | (None, None, Some(_)) => {
(None, None, None, Some(begin_ts), None) => (0, 0, Some(begin_ts), None),
(None, None, None, None, Some(_)) => {
return Err(Error::invalid_input(
"Must specify both with_begin_version and with_end_version",
"Must specify with_begin_date when with_end_date is provided",
location!(),
));
}
(None, None, None) => {
(None, None, None, None, None) => {
return Err(Error::invalid_input(
"Must specify either compared_against_version or both with_begin_version and with_end_version",
location!(),
));
}
_ => {
return Err(Error::invalid_input(
"Invalid combination of parameters for DatasetDeltaBuilder",
location!(),
));
}
};

Ok(DatasetDelta {
begin_version,
end_version,
base_dataset: self.dataset,
begin_timestamp: begin_ts,
end_timestamp: end_ts,
})
}
}
Expand All @@ -140,12 +197,58 @@ pub struct DatasetDelta {
pub(crate) end_version: u64,
/// The Lance dataset to compute delta
pub(crate) base_dataset: Dataset,
pub(crate) begin_timestamp: Option<DateTime<Utc>>,
pub(crate) end_timestamp: Option<DateTime<Utc>>,
}

impl DatasetDelta {
/// Resolve the effective version range for this delta.
///
/// If a date window is set (`begin_timestamp` and `end_timestamp` provided), this lazily
/// maps timestamps to version ids by scanning dataset versions:
/// - Begin is exclusive: pick the greatest version with `timestamp < begin_timestamp`.
/// - End is inclusive: pick the greatest version with `timestamp <= end_timestamp`.
///
/// If no date window is set, returns the explicit `begin_version`/`end_version` stored on
/// the struct.
async fn resolve_range(&self) -> Result<(u64, u64)> {
if let (Some(begin_ts), Some(end_ts)) = (self.begin_timestamp, self.end_timestamp) {
// Load all dataset versions and fold them to a version interval matching the date window
let versions = self.base_dataset.versions().await?;
let mut begin_version: u64 = 0;
let mut end_version: u64 = 0;
for v in &versions {
// Exclusive begin: track the largest version strictly before begin_ts
if v.timestamp < begin_ts && v.version > begin_version {
begin_version = v.version;
}
// Inclusive end: track the largest version at or before end_ts
if v.timestamp <= end_ts && v.version > end_version {
end_version = v.version;
}
}
Ok((begin_version, end_version))
} else if let (Some(begin_ts), None) = (self.begin_timestamp, self.end_timestamp) {
// Open-ended range: use latest version as end
let versions = self.base_dataset.versions().await?;
let mut begin_version: u64 = 0;
for v in &versions {
if v.timestamp < begin_ts && v.version > begin_version {
begin_version = v.version;
}
}
let end_version = self.base_dataset.latest_version_id().await?;
Ok((begin_version, end_version))
} else {
// No date window: use the pre-resolved version interval
Ok((self.begin_version, self.end_version))
}
}

/// Listing the transactions between two versions.
pub async fn list_transactions(&self) -> Result<Vec<Transaction>> {
stream::iter((self.begin_version + 1)..=self.end_version)
let (begin_version, end_version) = self.resolve_range().await?;
stream::iter((begin_version + 1)..=end_version)
.map(|version| {
let base_dataset = self.base_dataset.clone();
async move {
Expand Down Expand Up @@ -216,9 +319,10 @@ impl DatasetDelta {
])?;

// Filter for rows created in the version range
let (begin_version, end_version) = self.resolve_range().await?;
let filter = format!(
"_row_created_at_version > {} AND _row_created_at_version <= {}",
self.begin_version, self.end_version
begin_version, end_version
);
scanner.filter(&filter)?;

Expand Down Expand Up @@ -269,9 +373,10 @@ impl DatasetDelta {
])?;

// Filter for rows that were updated (not inserted) in the version range
let (begin_version, end_version) = self.resolve_range().await?;
let filter = format!(
"_row_created_at_version <= {} AND _row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}",
self.begin_version, self.begin_version, self.end_version
begin_version, begin_version, end_version
);
scanner.filter(&filter)?;

Expand Down Expand Up @@ -1299,4 +1404,79 @@ mod tests {
assert_eq!(created_at[i], 1); // All created at version 1
}
}

#[tokio::test]
async fn test_build_with_date_window_basic() {
MockClock::set_system_time(std::time::Duration::from_secs(10));
let ds = create_test_dataset(50, 1, "v1", true).await;
assert_eq!(ds.version().version, 1);

MockClock::set_system_time(std::time::Duration::from_secs(20));
let ds = update_where(ds, "key < 10", "v2").await;
assert_eq!(ds.version().version, 2);

MockClock::set_system_time(std::time::Duration::from_secs(30));
let ds = update_where(ds, "key >= 10 AND key < 20", "v3").await;
assert_eq!(ds.version().version, 3);

let begin_ts = chrono::DateTime::<chrono::Utc>::from_timestamp(15, 0).unwrap();
let end_ts = chrono::DateTime::<chrono::Utc>::from_timestamp(25, 0).unwrap();

let delta = ds
.delta()
.with_begin_date(begin_ts)
.with_end_date(end_ts)
.build()
.unwrap();

let txs = delta.list_transactions().await.unwrap();
assert_eq!(txs.len(), 1);
}

#[tokio::test]
async fn test_build_with_date_window_edges() {
MockClock::set_system_time(std::time::Duration::from_secs(100));
let ds = create_test_dataset(10, 1, "v1", true).await;
assert_eq!(ds.version().version, 1);

MockClock::set_system_time(std::time::Duration::from_secs(200));
let ds = update_where(ds, "key < 5", "v2").await;
assert_eq!(ds.version().version, 2);

let begin_ts = chrono::DateTime::<chrono::Utc>::from_timestamp(50, 0).unwrap();
let end_ts = chrono::DateTime::<chrono::Utc>::from_timestamp(250, 0).unwrap();

let delta = ds
.delta()
.with_begin_date(begin_ts)
.with_end_date(end_ts)
.build()
.unwrap();

let txs = delta.list_transactions().await.unwrap();
assert_eq!(txs.len(), 2);
}

#[tokio::test]
async fn test_build_with_date_open_end_uses_latest() {
MockClock::set_system_time(std::time::Duration::from_secs(10));
let ds = create_test_dataset(20, 1, "v1", true).await;
assert_eq!(ds.version().version, 1);

MockClock::set_system_time(std::time::Duration::from_secs(20));
let ds = update_where(ds, "key < 5", "v2").await;
assert_eq!(ds.version().version, 2);

MockClock::set_system_time(std::time::Duration::from_secs(30));
let ds = update_where(ds, "key >= 5 AND key < 10", "v3").await;
assert_eq!(ds.version().version, 3);

let begin_ts = chrono::DateTime::<chrono::Utc>::from_timestamp(15, 0).unwrap();

let delta = ds.delta().with_begin_date(begin_ts).build().unwrap();

let txs = delta.list_transactions().await.unwrap();
// Should include transactions at v2 and v3
assert_eq!(txs.len(), 2);
}
}