diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py
index 5da8bae9e0a..3330010722f 100644
--- a/python/python/lance/dataset.py
+++ b/python/python/lance/dataset.py
@@ -2021,7 +2021,7 @@ def delete(
         *,
         conflict_retries: int = 10,
         retry_timeout: timedelta = timedelta(seconds=30),
-    ):
+    ) -> DeleteResult:
         """
         Delete rows from the dataset.
 
@@ -2042,6 +2042,12 @@
             regardless of how long it takes to complete. Subsequent
             attempts will be cancelled once this timeout is reached.
             Default is 30 seconds.
 
+        Returns
+        -------
+        dict
+            A dictionary containing the number of rows deleted, with the key
+            ``num_deleted_rows``.
+
         Examples
         --------
         >>> import lance
@@ -2049,17 +2055,11 @@
         >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
         >>> dataset = lance.write_dataset(table, "example")
         >>> dataset.delete("a = 1 or b in ('a', 'b')")
-        >>> dataset.to_table()
-        pyarrow.Table
-        a: int64
-        b: string
-        ----
-        a: [[3]]
-        b: [["c"]]
+        {'num_deleted_rows': 2}
         """
         if isinstance(predicate, pa.compute.Expression):
             predicate = str(predicate)
-        self._ds.delete(predicate, conflict_retries, retry_timeout)
+        return self._ds.delete(predicate, conflict_retries, retry_timeout)
 
     def truncate_table(self) -> None:
         """
@@ -4172,6 +4172,10 @@ class UpdateResult(TypedDict):
     num_rows_updated: int
 
 
+class DeleteResult(TypedDict):
+    num_deleted_rows: int
+
+
 class AlterColumn(TypedDict):
     path: str
     name: Optional[str]
diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index acb207f3f15..be1f70eb8b2 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -1392,10 +1392,11 @@ impl Dataset {
     #[pyo3(signature=(predicate, conflict_retries=None, retry_timeout=None))]
     fn delete(
         &mut self,
+        py: Python<'_>,
         predicate: String,
         conflict_retries: Option<u32>,
         retry_timeout: Option<std::time::Duration>,
-    ) -> PyResult<()> {
+    ) -> PyResult<Py<PyDict>> {
         let mut builder = DeleteBuilder::new(self.ds.clone(), predicate);
 
         if let Some(retries) = conflict_retries {
@@ -1406,11 +1407,13 @@ impl Dataset {
             builder = builder.retry_timeout(timeout);
         }
 
-        let new_dataset = rt()
+        let result = rt()
             .block_on(None, builder.execute())?
             .map_err(|err| PyIOError::new_err(err.to_string()))?;
-        self.ds = new_dataset;
-        Ok(())
+        self.ds = result.new_dataset;
+        let dict = PyDict::new(py);
+        dict.set_item("num_deleted_rows", result.num_deleted_rows)?;
+        Ok(dict.into())
     }
 
     #[pyo3(signature=(updates, predicate=None, conflict_retries=None, retry_timeout=None))]
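Reviewer note: a minimal sketch of how the new Python return value is consumed, mirroring the docstring example above (the dataset path and data are illustrative):

```python
import lance
import pyarrow as pa
from datetime import timedelta

table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
dataset = lance.write_dataset(table, "example")

# delete() now returns a DeleteResult TypedDict instead of None.
result = dataset.delete(
    "a = 1 or b in ('a', 'b')",
    conflict_retries=10,                  # keyword-only, default shown
    retry_timeout=timedelta(seconds=30),  # keyword-only, default shown
)
assert result["num_deleted_rows"] == 2
```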
diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index ec8ee0792a4..91c4150d485 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -131,7 +131,7 @@ use crate::dataset::index::LanceIndexStoreExt;
 pub use write::update::{UpdateBuilder, UpdateJob};
 #[allow(deprecated)]
 pub use write::{
-    write_fragments, AutoCleanupParams, CommitBuilder, DeleteBuilder, InsertBuilder,
+    write_fragments, AutoCleanupParams, CommitBuilder, DeleteBuilder, DeleteResult, InsertBuilder,
     WriteDestination, WriteMode, WriteParams,
 };
 
@@ -1550,14 +1550,14 @@ impl Dataset {
     }
 
     /// Delete rows based on a predicate.
-    pub async fn delete(&mut self, predicate: &str) -> Result<()> {
+    pub async fn delete(&mut self, predicate: &str) -> Result<DeleteResult> {
         info!(target: TRACE_DATASET_EVENTS, event=DATASET_DELETING_EVENT, uri = &self.uri, predicate=predicate);
         write::delete::delete(self, predicate).await
     }
 
     /// Truncate the dataset by deleting all rows.
     pub async fn truncate_table(&mut self) -> Result<()> {
-        self.delete("true").await
+        self.delete("true").await.map(|_| ())
     }
 
     /// Add new base paths to the dataset.
diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs
index 2262ee20abf..942848375f0 100644
--- a/rust/lance/src/dataset/write.rs
+++ b/rust/lance/src/dataset/write.rs
@@ -52,7 +52,7 @@ mod retry;
 pub mod update;
 
 pub use commit::CommitBuilder;
-pub use delete::DeleteBuilder;
+pub use delete::{DeleteBuilder, DeleteResult};
 pub use insert::InsertBuilder;
 
 /// The destination to write data to.
diff --git a/rust/lance/src/dataset/write/delete.rs b/rust/lance/src/dataset/write/delete.rs
index 66e1da10d4b..7e5733db855 100644
--- a/rust/lance/src/dataset/write/delete.rs
+++ b/rust/lance/src/dataset/write/delete.rs
@@ -22,6 +22,15 @@ use std::time::Duration;
 use super::retry::{execute_with_retry, RetryConfig, RetryExecutor};
 use super::CommitBuilder;
 
+/// Result of a delete operation.
+#[derive(Debug, Clone)]
+pub struct DeleteResult {
+    /// The new dataset after the delete operation.
+    pub new_dataset: Arc<Dataset>,
+    /// The number of rows that were deleted.
+    pub num_deleted_rows: u64,
+}
+
 /// Apply deletions to fragments based on a RoaringTreemap of row IDs.
 ///
 /// Returns the set of modified fragments and removed fragments, if any.
@@ -84,10 +93,11 @@
 /// # use lance::dataset::DeleteBuilder;
 /// # use std::sync::Arc;
 /// # async fn example(dataset: Arc<Dataset>) -> Result<()> {
-/// let new_dataset = DeleteBuilder::new(dataset, "age > 65")
+/// let result = DeleteBuilder::new(dataset, "age > 65")
 ///     .conflict_retries(5)
 ///     .execute()
 ///     .await?;
+/// println!("Deleted {} rows", result.num_deleted_rows);
 /// # Ok(())
 /// # }
 /// ```
@@ -124,7 +134,7 @@
     }
 
     /// Execute the delete operation
-    pub async fn execute(self) -> Result<Arc<Dataset>> {
+    pub async fn execute(self) -> Result<DeleteResult> {
         let job = DeleteJob {
             dataset: self.dataset.clone(),
             predicate: self.predicate,
@@ -151,11 +161,12 @@ struct DeleteData {
     updated_fragments: Vec<Fragment>,
     deleted_fragment_ids: Vec<u64>,
     affected_rows: Option<RowAddrTreeMap>,
+    num_deleted_rows: u64,
 }
 
 impl RetryExecutor for DeleteJob {
     type Data = DeleteData;
-    type Result = Arc<Dataset>;
+    type Result = DeleteResult;
 
     async fn execute_impl(&self) -> Result<Self::Data> {
         // Create a single scanner for the entire dataset
             .filter(&self.predicate)?;
 
         // Check if the filter optimized to true (delete everything) or false (delete nothing)
-        let (updated_fragments, deleted_fragment_ids, affected_rows) = if let Some(filter_expr) =
-            scanner.get_expr_filter()?
-        {
-            if matches!(
-                filter_expr,
-                Expr::Literal(ScalarValue::Boolean(Some(false)), _)
-            ) {
-                // Predicate evaluated to false - no deletions
-                (Vec::new(), Vec::new(), Some(RowAddrTreeMap::new()))
-            } else if matches!(
-                filter_expr,
-                Expr::Literal(ScalarValue::Boolean(Some(true)), _)
-            ) {
-                // Predicate evaluated to true - delete all fragments
-                let deleted_fragment_ids = self
-                    .dataset
-                    .get_fragments()
-                    .iter()
-                    .map(|f| f.id() as u64)
-                    .collect();
-
-                // When deleting everything, we don't have specific row addresses,
-                // so better not to emit affected rows.
-                (Vec::new(), deleted_fragment_ids, None)
-            } else {
-                // Regular predicate - scan and collect row addresses to delete
-                let stream = scanner.try_into_stream().await?.into();
-                let (stream, row_id_rx) =
-                    make_rowid_capture_stream(stream, self.dataset.manifest.uses_stable_row_ids())?;
-
-                // Process the stream to capture row addresses
-                // We need to consume the stream to trigger the capture
-                futures::pin_mut!(stream);
-                while let Some(_batch) = stream.try_next().await? {
-                    // The row addresses are captured automatically by make_rowid_capture_stream
-                }
+        let (updated_fragments, deleted_fragment_ids, affected_rows, num_deleted_rows) =
+            if let Some(filter_expr) = scanner.get_expr_filter()? {
+                if matches!(
+                    filter_expr,
+                    Expr::Literal(ScalarValue::Boolean(Some(false)), _)
+                ) {
+                    // Predicate evaluated to false - no deletions
+                    (Vec::new(), Vec::new(), Some(RowAddrTreeMap::new()), 0)
+                } else if matches!(
+                    filter_expr,
+                    Expr::Literal(ScalarValue::Boolean(Some(true)), _)
+                ) {
+                    // Predicate evaluated to true - delete all fragments
+                    let fragments = self.dataset.get_fragments();
+                    let num_deleted_rows: u64 = fragments
+                        .iter()
+                        .map(|f| f.metadata.num_rows().unwrap_or(0) as u64)
+                        .sum();
+                    let deleted_fragment_ids = fragments.iter().map(|f| f.id() as u64).collect();
+
+                    // When deleting everything, we don't have specific row addresses,
+                    // so better not to emit affected rows.
+                    (Vec::new(), deleted_fragment_ids, None, num_deleted_rows)
+                } else {
+                    // Regular predicate - scan and collect row addresses to delete
+                    let stream = scanner.try_into_stream().await?.into();
+                    let (stream, row_id_rx) = make_rowid_capture_stream(
+                        stream,
+                        self.dataset.manifest.uses_stable_row_ids(),
+                    )?;
+
+                    // Process the stream to capture row addresses
+                    // We need to consume the stream to trigger the capture
+                    futures::pin_mut!(stream);
+                    while let Some(_batch) = stream.try_next().await? {
+                        // The row addresses are captured automatically by make_rowid_capture_stream
+                    }
 
-                // Extract the row addresses from the receiver
-                let removed_row_ids = row_id_rx.try_recv().map_err(|err| Error::Internal {
-                    message: format!("Failed to receive row ids: {}", err),
-                    location: location!(),
-                })?;
-                let row_id_index = get_row_id_index(&self.dataset).await?;
-                let removed_row_addrs = removed_row_ids.row_addrs(row_id_index.as_deref());
-
-                let (fragments, deleted_ids) =
-                    apply_deletions(&self.dataset, &removed_row_addrs).await?;
-                let affected_rows = RowAddrTreeMap::from(removed_row_addrs.as_ref().clone());
-                (fragments, deleted_ids, Some(affected_rows))
-            }
-        } else {
-            // No filter was applied - this shouldn't happen but treat as delete nothing
-            (Vec::new(), Vec::new(), Some(RowAddrTreeMap::new()))
-        };
+                    // Extract the row addresses from the receiver
+                    let removed_row_ids = row_id_rx.try_recv().map_err(|err| Error::Internal {
+                        message: format!("Failed to receive row ids: {}", err),
+                        location: location!(),
+                    })?;
+                    let row_id_index = get_row_id_index(&self.dataset).await?;
+                    let removed_row_addrs = removed_row_ids.row_addrs(row_id_index.as_deref());
+
+                    let (fragments, deleted_ids) =
+                        apply_deletions(&self.dataset, &removed_row_addrs).await?;
+                    let num_deleted_rows = removed_row_addrs.len();
+                    let affected_rows = RowAddrTreeMap::from(removed_row_addrs.as_ref().clone());
+                    (
+                        fragments,
+                        deleted_ids,
+                        Some(affected_rows),
+                        num_deleted_rows,
+                    )
+                }
+            } else {
+                // No filter was applied - this shouldn't happen but treat as delete nothing
+                (Vec::new(), Vec::new(), Some(RowAddrTreeMap::new()), 0)
+            };
 
         Ok(DeleteData {
             updated_fragments,
             deleted_fragment_ids,
             affected_rows,
+            num_deleted_rows,
         })
     }
 
     async fn commit(&self, dataset: Arc<Dataset>, data: Self::Data) -> Result<Self::Result> {
+        let num_deleted_rows = data.num_deleted_rows;
         let operation = Operation::Delete {
             updated_fragments: data.updated_fragments,
             deleted_fragment_ids: data.deleted_fragment_ids,
@@ -242,7 +262,11 @@
             builder = builder.with_affected_rows(affected_rows);
         }
 
-        builder.execute(transaction).await.map(Arc::new)
+        let new_dataset = builder.execute(transaction).await.map(Arc::new)?;
+        Ok(DeleteResult {
+            new_dataset,
+            num_deleted_rows,
+        })
     }
 
     fn update_dataset(&mut self, dataset: Arc<Dataset>) {
@@ -251,14 +275,14 @@
 }
 
 /// Legacy delete function - uses DeleteBuilder with no retries for backwards compatibility
-pub async fn delete(ds: &mut Dataset, predicate: &str) -> Result<()> {
+pub async fn delete(ds: &mut Dataset, predicate: &str) -> Result<DeleteResult> {
     // Use DeleteBuilder with 0 retries to maintain backwards compatibility
     let dataset = Arc::new(ds.clone());
-    let new_dataset = DeleteBuilder::new(dataset, predicate).execute().await?;
+    let result = DeleteBuilder::new(dataset, predicate).execute().await?;
 
     // Update the dataset in place
-    *ds = Arc::try_unwrap(new_dataset).unwrap_or_else(|arc| (*arc).clone());
-    Ok(())
+    *ds = Arc::try_unwrap(result.new_dataset.clone()).unwrap_or_else(|arc| (*arc).clone());
+    Ok(result)
 }
 
 #[cfg(test)]
@@ -325,7 +349,8 @@ mod tests {
         }
 
         // Delete nothing
-        dataset.delete("i < 0").await.unwrap();
+        let result = dataset.delete("i < 0").await.unwrap();
+        assert_eq!(result.num_deleted_rows, 0);
         dataset.validate().await.unwrap();
 
         // We should not have any deletion file still
@@ -338,7 +363,8 @@
         assert!(fragments[1].metadata.deletion_file.is_none());
 
         // Delete rows
-        dataset.delete("i < 10 OR i >= 90").await.unwrap();
+        let result = dataset.delete("i < 10 OR i >= 90").await.unwrap();
+        assert_eq!(result.num_deleted_rows, 20);
         dataset.validate().await.unwrap();
 
         // Verify result:
@@ -386,8 +412,9 @@
         );
         let second_deletion_file = fragments[1].metadata.deletion_file.clone().unwrap();
 
-        // Delete more rows
-        dataset.delete("i < 20").await.unwrap();
+        // Delete more rows (only 10 new rows since 0..10 already deleted)
+        let result = dataset.delete("i < 20").await.unwrap();
+        assert_eq!(result.num_deleted_rows, 10);
         dataset.validate().await.unwrap();
 
         // Verify result
@@ -407,8 +434,9 @@
             &second_deletion_file
         );
 
-        // Delete full fragment
-        dataset.delete("i >= 50").await.unwrap();
+        // Delete full fragment (50 rows remaining in fragment 1, 10 already deleted)
+        let result = dataset.delete("i >= 50").await.unwrap();
+        assert_eq!(result.num_deleted_rows, 40);
         dataset.validate().await.unwrap();
 
         // Verify second fragment is fully gone
@@ -618,7 +646,8 @@
         }
 
         // Get the final dataset from any successful result
-        let final_dataset = results.into_iter().find_map(|r| r.ok()).unwrap();
+        let final_result = results.into_iter().find_map(|r| r.ok()).unwrap();
+        let final_dataset = final_result.new_dataset;
 
         // Rows 0-49 should be deleted, rows 50-99 should remain
         assert_eq!(final_dataset.count_rows(None).await.unwrap(), 50);
@@ -829,12 +858,12 @@
         );
 
         // Also verify with the retry mechanism that it works correctly
-        let final_dataset = DeleteBuilder::new(dataset_arc, "true")
+        let final_result = DeleteBuilder::new(dataset_arc, "true")
             .conflict_retries(5)
             .execute()
             .await
             .unwrap();
 
         // All rows should be deleted, including the updated ones
-        assert_eq!(final_dataset.count_rows(None).await.unwrap(), 0);
+        assert_eq!(final_result.new_dataset.count_rows(None).await.unwrap(), 0);
     }
 }
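Reviewer note: the test expectations above pin down the counting semantics — rows that were already deleted are not recounted, and a `true` predicate takes the whole-fragment fast path, counting live rows from fragment metadata instead of scanning. A sketch of the same behavior through the new Python API (dataset paths are illustrative):

```python
import lance
import pyarrow as pa

dataset = lance.write_dataset(pa.table({"i": range(100)}), "delete_counts")

assert dataset.delete("i < 0")["num_deleted_rows"] == 0  # no rows match
assert dataset.delete("i < 10 OR i >= 90")["num_deleted_rows"] == 20
# Rows 0..9 are already gone, so only rows 10..19 are newly deleted.
assert dataset.delete("i < 20")["num_deleted_rows"] == 10

# A "true" predicate short-circuits to whole-fragment deletion; the count
# comes from fragment metadata rather than a scan.
everything = lance.write_dataset(pa.table({"i": range(50)}), "delete_all")
assert everything.delete("true")["num_deleted_rows"] == 50
assert everything.count_rows() == 0
```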
dataset.delete("i < 10 OR i >= 90").await.unwrap(); + assert_eq!(result.num_deleted_rows, 20); dataset.validate().await.unwrap(); // Verify result: @@ -386,8 +412,9 @@ mod tests { ); let second_deletion_file = fragments[1].metadata.deletion_file.clone().unwrap(); - // Delete more rows - dataset.delete("i < 20").await.unwrap(); + // Delete more rows (only 10 new rows since 0..10 already deleted) + let result = dataset.delete("i < 20").await.unwrap(); + assert_eq!(result.num_deleted_rows, 10); dataset.validate().await.unwrap(); // Verify result @@ -407,8 +434,9 @@ mod tests { &second_deletion_file ); - // Delete full fragment - dataset.delete("i >= 50").await.unwrap(); + // Delete full fragment (50 rows remaining in fragment 1, 10 already deleted) + let result = dataset.delete("i >= 50").await.unwrap(); + assert_eq!(result.num_deleted_rows, 40); dataset.validate().await.unwrap(); // Verify second fragment is fully gone @@ -618,7 +646,8 @@ mod tests { } // Get the final dataset from any successful result - let final_dataset = results.into_iter().find_map(|r| r.ok()).unwrap(); + let final_result = results.into_iter().find_map(|r| r.ok()).unwrap(); + let final_dataset = final_result.new_dataset; // Rows 0-49 should be deleted, rows 50-99 should remain assert_eq!(final_dataset.count_rows(None).await.unwrap(), 50); @@ -829,12 +858,12 @@ mod tests { ); // Also verify with the retry mechanism that it works correctly - let final_dataset = DeleteBuilder::new(dataset_arc, "true") + let final_result = DeleteBuilder::new(dataset_arc, "true") .conflict_retries(5) .execute() .await .unwrap(); // All rows should be deleted, including the updated ones - assert_eq!(final_dataset.count_rows(None).await.unwrap(), 0); + assert_eq!(final_result.new_dataset.count_rows(None).await.unwrap(), 0); } } diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 828374e4d11..ecdffeb4a1b 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -1035,7 +1035,7 @@ async fn migrate_and_recompute_index_statistics(ds: &Dataset, index_name: &str) "Detecting out-dated fragment metadata, migrating dataset. \ To disable migration, set LANCE_AUTO_MIGRATION=false" ); - ds.delete("false").await.map_err(|err| Error::Execution { + ds.delete("false").await.map(|_| ()).map_err(|err| Error::Execution { message: format!( "Failed to migrate dataset while calculating index statistics. \ To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}",