From 012f111be48f052caa1c8c63ad850569e09f503d Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Mon, 3 Mar 2025 00:18:02 -0500 Subject: [PATCH 1/2] feat: Make duplicate check optional for adding parquet files --- crates/iceberg/src/transaction.rs | 100 ++++++++++-------- .../shared_tests/append_data_file_test.rs | 4 +- .../append_partition_data_file_test.rs | 2 +- .../shared_tests/conflict_commit_test.rs | 4 +- .../tests/shared_tests/scan_all_type.rs | 2 +- 5 files changed, 59 insertions(+), 53 deletions(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 79a6ce8c18..4e0b58d8bd 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -210,7 +210,11 @@ impl<'a> FastAppendAction<'a> { /// Adds existing parquet files #[allow(dead_code)] - async fn add_parquet_files(mut self, file_path: Vec) -> Result> { + async fn add_parquet_files( + mut self, + file_path: Vec, + check_duplicate: bool, + ) -> Result> { if !self .snapshot_produce_action .tx @@ -236,57 +240,59 @@ impl<'a> FastAppendAction<'a> { self.add_data_files(data_files)?; - self.apply().await + self.apply(check_duplicate).await } /// Finished building the action and apply it to the transaction. - pub async fn apply(self) -> Result> { + pub async fn apply(self, check_duplicate: bool) -> Result> { // Checks duplicate files - let new_files: HashSet<&str> = self - .snapshot_produce_action - .added_data_files - .iter() - .map(|df| df.file_path.as_str()) - .collect(); - - let mut manifest_stream = self - .snapshot_produce_action - .tx - .table - .inspect() - .manifests() - .scan() - .await?; - let mut referenced_files = Vec::new(); - - while let Some(batch) = manifest_stream.try_next().await? { - let file_path_array = batch - .column(1) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - "Failed to downcast file_path column to StringArray", - ) - })?; - - for i in 0..batch.num_rows() { - let file_path = file_path_array.value(i); - if new_files.contains(file_path) { - referenced_files.push(file_path.to_string()); + if check_duplicate { + let new_files: HashSet<&str> = self + .snapshot_produce_action + .added_data_files + .iter() + .map(|df| df.file_path.as_str()) + .collect(); + + let mut manifest_stream = self + .snapshot_produce_action + .tx + .table + .inspect() + .manifests() + .scan() + .await?; + let mut referenced_files = Vec::new(); + + while let Some(batch) = manifest_stream.try_next().await? { + let file_path_array = batch + .column(1) + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Failed to downcast file_path column to StringArray", + ) + })?; + + for i in 0..batch.num_rows() { + let file_path = file_path_array.value(i); + if new_files.contains(file_path) { + referenced_files.push(file_path.to_string()); + } } } - } - if !referenced_files.is_empty() { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot add files that are already referenced by table, files: {}", - referenced_files.join(", ") - ), - )); + if !referenced_files.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add files that are already referenced by table, files: {}", + referenced_files.join(", ") + ), + )); + } } self.snapshot_produce_action @@ -884,7 +890,7 @@ mod tests { .build() .unwrap(); action.add_data_files(vec![data_file.clone()]).unwrap(); - let tx = action.apply().await.unwrap(); + let tx = action.apply(true).await.unwrap(); // check updates and requirements assert!( @@ -971,7 +977,7 @@ mod tests { // Attempt to add the existing Parquet files with fast append. let new_tx = fast_append_action - .add_parquet_files(file_paths.clone()) + .add_parquet_files(file_paths.clone(), true) .await .expect("Adding existing Parquet files should succeed"); diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index a24b886349..2b2b98c7cd 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -114,7 +114,7 @@ async fn test_append_data_file() { let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply().await.unwrap(); + let tx = append_action.apply(true).await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result @@ -134,7 +134,7 @@ async fn test_append_data_file() { let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply().await.unwrap(); + let tx = append_action.apply(true).await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result again diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index 42cc596f54..382e24ebfc 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -123,7 +123,7 @@ async fn test_append_partition_data_file() { append_action .add_data_files(data_file_valid.clone()) .unwrap(); - let tx = append_action.apply().await.unwrap(); + let tx = append_action.apply(true).await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index 0b4d9785dc..898692fcab 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -92,12 +92,12 @@ async fn test_append_data_file_conflict() { let tx1 = Transaction::new(&table); let mut append_action = tx1.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx1 = append_action.apply().await.unwrap(); + let tx1 = append_action.apply(true).await.unwrap(); let tx2 = Transaction::new(&table); let mut append_action = tx2.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx2 = append_action.apply().await.unwrap(); + let tx2 = append_action.apply(true).await.unwrap(); let table = tx2 .commit(&rest_catalog) .await diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index 673a78ac03..7d2577907a 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -311,7 +311,7 @@ async fn test_scan_all_type() { let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply().await.unwrap(); + let tx = append_action.apply(true).await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result From 74f64e6c533203d02d8119f0abdeaa0c949e184b Mon Sep 17 00:00:00 2001 From: Jonathan Date: Tue, 11 Mar 2025 20:24:27 -0400 Subject: [PATCH 2/2] fix --- crates/iceberg/src/transaction.rs | 24 +++++++++++-------- .../shared_tests/append_data_file_test.rs | 4 ++-- .../append_partition_data_file_test.rs | 2 +- .../shared_tests/conflict_commit_test.rs | 4 ++-- .../tests/shared_tests/scan_all_type.rs | 2 +- 5 files changed, 20 insertions(+), 16 deletions(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 4e0b58d8bd..ae19f25fc9 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -177,6 +177,7 @@ impl<'a> Transaction<'a> { /// FastAppendAction is a transaction action for fast append data files to the table. pub struct FastAppendAction<'a> { snapshot_produce_action: SnapshotProduceAction<'a>, + check_duplicate: bool, } impl<'a> FastAppendAction<'a> { @@ -196,9 +197,16 @@ impl<'a> FastAppendAction<'a> { commit_uuid, snapshot_properties, )?, + check_duplicate: true, }) } + /// Set whether to check duplicate files + pub fn with_check_duplicate(mut self, v: bool) -> Self { + self.check_duplicate = v; + self + } + /// Add data files to the snapshot. pub fn add_data_files( &mut self, @@ -210,11 +218,7 @@ impl<'a> FastAppendAction<'a> { /// Adds existing parquet files #[allow(dead_code)] - async fn add_parquet_files( - mut self, - file_path: Vec, - check_duplicate: bool, - ) -> Result> { + async fn add_parquet_files(mut self, file_path: Vec) -> Result> { if !self .snapshot_produce_action .tx @@ -240,13 +244,13 @@ impl<'a> FastAppendAction<'a> { self.add_data_files(data_files)?; - self.apply(check_duplicate).await + self.apply().await } /// Finished building the action and apply it to the transaction. - pub async fn apply(self, check_duplicate: bool) -> Result> { + pub async fn apply(self) -> Result> { // Checks duplicate files - if check_duplicate { + if self.check_duplicate { let new_files: HashSet<&str> = self .snapshot_produce_action .added_data_files @@ -890,7 +894,7 @@ mod tests { .build() .unwrap(); action.add_data_files(vec![data_file.clone()]).unwrap(); - let tx = action.apply(true).await.unwrap(); + let tx = action.apply().await.unwrap(); // check updates and requirements assert!( @@ -977,7 +981,7 @@ mod tests { // Attempt to add the existing Parquet files with fast append. let new_tx = fast_append_action - .add_parquet_files(file_paths.clone(), true) + .add_parquet_files(file_paths.clone()) .await .expect("Adding existing Parquet files should succeed"); diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index 2b2b98c7cd..a24b886349 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -114,7 +114,7 @@ async fn test_append_data_file() { let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply(true).await.unwrap(); + let tx = append_action.apply().await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result @@ -134,7 +134,7 @@ async fn test_append_data_file() { let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply(true).await.unwrap(); + let tx = append_action.apply().await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result again diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index 382e24ebfc..42cc596f54 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -123,7 +123,7 @@ async fn test_append_partition_data_file() { append_action .add_data_files(data_file_valid.clone()) .unwrap(); - let tx = append_action.apply(true).await.unwrap(); + let tx = append_action.apply().await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index 898692fcab..0b4d9785dc 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -92,12 +92,12 @@ async fn test_append_data_file_conflict() { let tx1 = Transaction::new(&table); let mut append_action = tx1.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx1 = append_action.apply(true).await.unwrap(); + let tx1 = append_action.apply().await.unwrap(); let tx2 = Transaction::new(&table); let mut append_action = tx2.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx2 = append_action.apply(true).await.unwrap(); + let tx2 = append_action.apply().await.unwrap(); let table = tx2 .commit(&rest_catalog) .await diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index 7d2577907a..673a78ac03 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -311,7 +311,7 @@ async fn test_scan_all_type() { let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply(true).await.unwrap(); + let tx = append_action.apply().await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result