From 803d93d41acf40268c2f89f40e7fca1fe442c68d Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Fri, 31 Oct 2025 08:54:10 +0100 Subject: [PATCH 1/8] Remove TODO --- crates/iceberg/src/scan/incremental/context.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/iceberg/src/scan/incremental/context.rs b/crates/iceberg/src/scan/incremental/context.rs index 04db28a4c4..3f6169270e 100644 --- a/crates/iceberg/src/scan/incremental/context.rs +++ b/crates/iceberg/src/scan/incremental/context.rs @@ -113,7 +113,6 @@ impl IncrementalPlanContext { (manifest_files, filter_fn) }; - // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut mfcs = vec![]; for manifest_file in &manifest_files { let tx = if manifest_file.content == ManifestContentType::Deletes { From bee0f89d0a1fee18ef36e2c96efb3f424375c862 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 3 Nov 2025 09:08:52 +0100 Subject: [PATCH 2/8] Implement delete files --- crates/iceberg/src/arrow/incremental.rs | 94 +++++++- crates/iceberg/src/scan/incremental/mod.rs | 113 +++++++-- crates/iceberg/src/scan/incremental/task.rs | 89 +++++-- crates/iceberg/src/scan/incremental/tests.rs | 240 ++++++++++++++++--- 4 files changed, 450 insertions(+), 86 deletions(-) diff --git a/crates/iceberg/src/arrow/incremental.rs b/crates/iceberg/src/arrow/incremental.rs index e7ca2521bf..dd34a35cd9 100644 --- a/crates/iceberg/src/arrow/incremental.rs +++ b/crates/iceberg/src/arrow/incremental.rs @@ -120,9 +120,49 @@ impl StreamsInto let _ = appends_tx.send(Err(e)).await; } } - }); + }) + .await } - IncrementalFileScanTask::Delete(file_path, delete_vector) => { + IncrementalFileScanTask::Delete(deleted_file_task) => { + spawn(async move { + let file_path = deleted_file_task.data_file_path().to_string(); + let total_records = deleted_file_task.base.record_count.unwrap_or(0); + + let record_batch_stream = process_incremental_deleted_file_task( + file_path, + total_records, + batch_size, + ); + + match record_batch_stream { + Ok(mut stream) => { + while let Some(batch) = stream.next().await { + let result = deletes_tx + .send(batch.map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "failed to read deleted file record batch", + ) + .with_source(e) + })) + .await; + + if result.is_err() { + break; + } + } + } + Err(e) => { + let _ = deletes_tx.send(Err(e)).await; + } + } + }) + .await + } + IncrementalFileScanTask::PositionalDeletes( + file_path, + delete_vector, + ) => { spawn(async move { let record_batch_stream = process_incremental_delete_task( file_path, @@ -152,7 +192,8 @@ impl StreamsInto let _ = deletes_tx.send(Err(e)).await; } } - }); + }) + .await } }; @@ -175,7 +216,7 @@ async fn process_incremental_append_task( file_io: FileIO, ) -> Result { let mut record_batch_stream_builder = ArrowReader::create_parquet_record_batch_stream_builder( - &task.data_file_path, + &task.base.data_file_path, file_io, true, ) @@ -184,7 +225,7 @@ async fn process_incremental_append_task( // Create a projection mask for the batch stream to select which columns in the // Parquet file that we want in the response let projection_mask = ArrowReader::get_arrow_projection_mask( - &task.project_field_ids, + &task.base.project_field_ids, &task.schema_ref(), record_batch_stream_builder.parquet_schema(), record_batch_stream_builder.schema(), @@ -195,7 +236,7 @@ async fn process_incremental_append_task( // that come back from the file, such as type promotion, default column insertion // and column re-ordering let mut record_batch_transformer = - RecordBatchTransformer::build(task.schema_ref(), &task.project_field_ids); + RecordBatchTransformer::build(task.schema_ref(), &task.base.project_field_ids); if let Some(batch_size) = batch_size { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); @@ -269,3 +310,44 @@ fn process_incremental_delete_task( Ok(Box::pin(stream) as ArrowRecordBatchStream) } + +fn process_incremental_deleted_file_task( + file_path: String, + total_records: u64, + batch_size: Option, +) -> Result { + let schema = Arc::new(ArrowSchema::new(vec![Field::new( + "pos", + DataType::UInt64, + false, + )])); + + let batch_size = batch_size.unwrap_or(1024); + + // Create a stream of position values from 0 to total_records-1 (0-indexed) + let stream = futures::stream::iter(0..total_records) + .chunks(batch_size) + .map(move |chunk| { + let array = UInt64Array::from_iter(chunk); + RecordBatch::try_new( + Arc::clone(&schema), // Cheap Arc clone instead of full schema creation + vec![Arc::new(array)], + ) + .map_err(|_| { + Error::new( + ErrorKind::Unexpected, + "Failed to create RecordBatch for deleted file", + ) + }) + .and_then(|batch| { + ArrowReader::add_file_path_column( + batch, + &file_path, + RESERVED_COL_NAME_FILE_PATH, + RESERVED_FIELD_ID_FILE_PATH, + ) + }) + }); + + Ok(Box::pin(stream) as ArrowRecordBatchStream) +} diff --git a/crates/iceberg/src/scan/incremental/mod.rs b/crates/iceberg/src/scan/incremental/mod.rs index 1253353b4d..2b917bc5b2 100644 --- a/crates/iceberg/src/scan/incremental/mod.rs +++ b/crates/iceberg/src/scan/incremental/mod.rs @@ -406,11 +406,14 @@ impl IncrementalTableScan { } else if manifest_entry_context.manifest_entry.status() == ManifestStatus::Deleted { - // TODO (RAI-43291): Process deleted files - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Processing deleted data files is not supported yet in incremental scans", - )) + spawn(async move { + Self::process_deleted_data_manifest_entry( + tx, + manifest_entry_context, + ) + .await + }) + .await } else { Ok(()) } @@ -424,33 +427,67 @@ impl IncrementalTableScan { } }); - // Collect all append tasks. - let mut tasks = file_scan_task_rx.try_collect::>().await?; + // Collect all tasks from manifest processing. + let all_tasks = file_scan_task_rx.try_collect::>().await?; + + // Separate tasks by type and compute file path sets in a single pass + let mut append_tasks = Vec::new(); + let mut delete_tasks = Vec::new(); + let mut appended_files = HashSet::new(); + let mut deleted_files = HashSet::new(); - // Compute those file paths that have been appended. - let appended_files = tasks - .iter() - .filter_map(|task| match task { + for task in all_tasks { + match task { IncrementalFileScanTask::Append(append_task) => { - Some(append_task.data_file_path.clone()) + appended_files.insert(append_task.data_file_path().to_string()); + append_tasks.push(append_task); } - _ => None, - }) - .collect::>(); + IncrementalFileScanTask::Delete(delete_task) => { + deleted_files.insert(delete_task.data_file_path().to_string()); + delete_tasks.push(delete_task); + } + _ => {} + } + } + + // Build final task list with net changes + // We filter out tasks for files that appear in both sets (cancelled out) + let mut final_tasks: Vec = Vec::new(); + + // Add net append tasks (only files not in deleted_files) + for append_task in append_tasks { + if !deleted_files.contains(append_task.data_file_path()) { + final_tasks.push(IncrementalFileScanTask::Append(append_task)); + } + } + + // Add net delete tasks (only files not in appended_files) + for delete_task in delete_tasks { + if !appended_files.contains(delete_task.data_file_path()) { + final_tasks.push(IncrementalFileScanTask::Delete(delete_task)); + } + } - // Augment `tasks` with delete tasks. - // First collect paths to process (paths that weren't appended in this scan range) - let delete_paths: Vec = delete_filter.with_read(|state| { + // Add positional delete tasks (only for files that haven't been deleted) + let positional_delete_paths: Vec = delete_filter.with_read(|state| { Ok(state .delete_vectors() .keys() - .filter(|path| !appended_files.contains::(path)) + .filter(|path| { + // Only include positional deletes for files that: + // 1. Were not appended in this scan range, OR + // 2. Were appended but also deleted (cancelled out) + // Exclude positional deletes for files that were deleted + !appended_files.contains::(path) && !deleted_files.contains::(path) + || (appended_files.contains::(path) + && deleted_files.contains::(path)) + }) .cloned() .collect()) })?; // Now remove and take ownership of each delete vector - for path in delete_paths { + for path in positional_delete_paths { let delete_vector_arc = delete_filter.with_write(|state| { state.remove_delete_vector(&path).ok_or_else(|| { Error::new( @@ -474,13 +511,14 @@ impl IncrementalTableScan { .with_source(e) })?; - let delete_task = IncrementalFileScanTask::Delete(path, delete_vector_inner); - tasks.push(delete_task); + let positional_delete_task = + IncrementalFileScanTask::PositionalDeletes(path, delete_vector_inner); + final_tasks.push(positional_delete_task); } // We actually would not need a stream here, but we can keep it compatible with // other scan types. - Ok(futures::stream::iter(tasks).map(Ok).boxed()) + Ok(futures::stream::iter(final_tasks).map(Ok).boxed()) } /// Returns an [`CombinedIncrementalBatchRecordStream`] for this incremental table scan. @@ -582,6 +620,35 @@ impl IncrementalTableScan { file_scan_task_tx.send(Ok(file_scan_task)).await?; Ok(()) } + + async fn process_deleted_data_manifest_entry( + mut file_scan_task_tx: Sender>, + manifest_entry_context: ManifestEntryContext, + ) -> Result<()> { + // Abort the plan if we encounter a manifest entry for a delete file + if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Encountered an entry for a delete file in a data file manifest", + )); + } + + let data_file_path = manifest_entry_context.manifest_entry.file_path(); + let file_scan_task = IncrementalFileScanTask::Delete(DeletedFileScanTask { + base: BaseIncrementalFileScanTask { + start: 0, + length: manifest_entry_context.manifest_entry.file_size_in_bytes(), + record_count: Some(manifest_entry_context.manifest_entry.record_count()), + data_file_path: data_file_path.to_string(), + data_file_format: manifest_entry_context.manifest_entry.file_format(), + schema: manifest_entry_context.snapshot_schema.clone(), + project_field_ids: manifest_entry_context.field_ids.as_ref().clone(), + }, + }); + + file_scan_task_tx.send(Ok(file_scan_task)).await?; + Ok(()) + } } #[cfg(test)] diff --git a/crates/iceberg/src/scan/incremental/task.rs b/crates/iceberg/src/scan/incremental/task.rs index 4b2f244410..e05703ae28 100644 --- a/crates/iceberg/src/scan/incremental/task.rs +++ b/crates/iceberg/src/scan/incremental/task.rs @@ -25,9 +25,9 @@ use crate::delete_vector::DeleteVector; use crate::scan::context::ManifestEntryContext; use crate::spec::{DataFileFormat, Schema, SchemaRef}; -/// A file scan task for appended data files in an incremental scan. +/// Base file scan task containing common attributes for incremental scan tasks. #[derive(Debug, Clone)] -pub struct AppendedFileScanTask { +pub struct BaseIncrementalFileScanTask { /// The start offset of the file to scan. pub start: u64, /// The length of the file to scan. @@ -42,6 +42,30 @@ pub struct AppendedFileScanTask { pub schema: crate::spec::SchemaRef, /// The field ids to project. pub project_field_ids: Vec, +} + +impl BaseIncrementalFileScanTask { + /// Returns the data file path of this file scan task. + pub fn data_file_path(&self) -> &str { + &self.data_file_path + } + + /// Returns the schema of this file scan task as a reference + pub fn schema(&self) -> &Schema { + &self.schema + } + + /// Returns the schema of this file scan task as a SchemaRef + pub fn schema_ref(&self) -> SchemaRef { + self.schema.clone() + } +} + +/// A file scan task for appended data files in an incremental scan. +#[derive(Debug, Clone)] +pub struct AppendedFileScanTask { + /// The base file scan task attributes. + pub base: BaseIncrementalFileScanTask, /// The optional positional deletes associated with this data file. pub positional_deletes: Option>>, } @@ -49,32 +73,58 @@ pub struct AppendedFileScanTask { impl AppendedFileScanTask { /// Returns the data file path of this appended file scan task. pub fn data_file_path(&self) -> &str { - &self.data_file_path + self.base.data_file_path() } /// Returns the schema of this file scan task as a reference pub fn schema(&self) -> &Schema { - &self.schema + self.base.schema() } /// Returns the schema of this file scan task as a SchemaRef pub fn schema_ref(&self) -> SchemaRef { - self.schema.clone() + self.base.schema_ref() + } +} + +/// A file scan task for deleted data files in an incremental scan. +#[derive(Debug, Clone)] +pub struct DeletedFileScanTask { + /// The base file scan task attributes. + pub base: BaseIncrementalFileScanTask, +} + +impl DeletedFileScanTask { + /// Returns the data file path of this deleted file scan task. + pub fn data_file_path(&self) -> &str { + self.base.data_file_path() + } + + /// Returns the schema of this file scan task as a reference + pub fn schema(&self) -> &Schema { + self.base.schema() + } + + /// Returns the schema of this file scan task as a SchemaRef + pub fn schema_ref(&self) -> SchemaRef { + self.base.schema_ref() } } /// The stream of incremental file scan tasks. pub type IncrementalFileScanTaskStream = BoxStream<'static, Result>; -/// An incremental file scan task, which can be either an appended data file or positional -/// deletes. +/// An incremental file scan task, which can be an appended data file, deleted data file, +/// or positional deletes. #[derive(Debug, Clone)] pub enum IncrementalFileScanTask { /// An appended data file. Append(AppendedFileScanTask), - /// Deleted records of a data file. First argument is the file path, second the delete - /// vector. - Delete(String, DeleteVector), + /// A deleted data file. + Delete(DeletedFileScanTask), + /// Positional deletes (deleted records of a data file). First argument is the file path, + /// second the delete vector. + PositionalDeletes(String, DeleteVector), } impl IncrementalFileScanTask { @@ -85,13 +135,15 @@ impl IncrementalFileScanTask { ) -> Self { let data_file_path = manifest_entry_context.manifest_entry.file_path(); IncrementalFileScanTask::Append(AppendedFileScanTask { - start: 0, - length: manifest_entry_context.manifest_entry.file_size_in_bytes(), - record_count: Some(manifest_entry_context.manifest_entry.record_count()), - data_file_path: data_file_path.to_string(), - data_file_format: manifest_entry_context.manifest_entry.file_format(), - schema: manifest_entry_context.snapshot_schema.clone(), - project_field_ids: manifest_entry_context.field_ids.as_ref().clone(), + base: BaseIncrementalFileScanTask { + start: 0, + length: manifest_entry_context.manifest_entry.file_size_in_bytes(), + record_count: Some(manifest_entry_context.manifest_entry.record_count()), + data_file_path: data_file_path.to_string(), + data_file_format: manifest_entry_context.manifest_entry.file_format(), + schema: manifest_entry_context.snapshot_schema.clone(), + project_field_ids: manifest_entry_context.field_ids.as_ref().clone(), + }, positional_deletes: delete_filter.get_delete_vector_for_path(data_file_path), }) } @@ -100,7 +152,8 @@ impl IncrementalFileScanTask { pub fn data_file_path(&self) -> &str { match self { IncrementalFileScanTask::Append(task) => task.data_file_path(), - IncrementalFileScanTask::Delete(path, _) => path, + IncrementalFileScanTask::Delete(task) => task.data_file_path(), + IncrementalFileScanTask::PositionalDeletes(path, _) => path, } } } diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs index 05724a54a4..363acfcde6 100644 --- a/crates/iceberg/src/scan/incremental/tests.rs +++ b/crates/iceberg/src/scan/incremental/tests.rs @@ -1589,18 +1589,186 @@ async fn test_incremental_scan_builder_options() { } #[tokio::test] -async fn test_incremental_scan_with_deleted_files_errors() { - // This test verifies that incremental scans properly error out when entire data files - // are deleted (overwrite operation), since this is not yet supported. +async fn test_incremental_scan_append_then_delete_file() { + // This test verifies the basic delete file functionality: + // - Snapshot 1: Empty starting point + // - Snapshot 2: Append a file with 3 records + // - Snapshot 3: Overwrite that deletes the entire file + // + // An incremental scan from snapshot 1 to 3 should yield no tuples, as there is no net change. + let fixture = IncrementalTestFixture::new(vec![ + // Snapshot 1: Empty starting point + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Append a file with 3 records + Operation::Add( + vec![ + (1, "a".to_string()), + (2, "b".to_string()), + (3, "c".to_string()), + ], + "data-1.parquet".to_string(), + ), + // Snapshot 3: Overwrite that deletes the entire file + Operation::Overwrite( + (vec![], "".to_string()), // No new data to add + vec![], // No positional deletes + vec!["data-1.parquet".to_string()], // Delete the file completely + ), + ]) + .await; + + // Verify we have 3 snapshots + let mut snapshots = fixture.table.metadata().snapshots().collect::>(); + snapshots.sort_by_key(|s| s.snapshot_id()); + assert_eq!(snapshots.len(), 3); + + // Verify snapshot IDs + assert_eq!(snapshots[0].snapshot_id(), 1); + assert_eq!(snapshots[1].snapshot_id(), 2); + assert_eq!(snapshots[2].snapshot_id(), 3); + + // Incremental scan from snapshot 1 to 3 should yield no tuples + // The file was added in snapshot 2 and deleted in snapshot 3, so they cancel out + fixture + .verify_incremental_scan( + 1, + 3, + vec![], // No net appends + vec![], // No net deletes + ) + .await; + + // Incremental scan from snapshot 1 to 2 should yield the appended records + fixture + .verify_incremental_scan( + 1, + 2, + vec![(1, "a"), (2, "b"), (3, "c")], // All 3 records appended + vec![], // No deletes + ) + .await; + + // Incremental scan from snapshot 2 to 3 should produce delete records + // The file existed at snapshot 2 and was deleted in snapshot 3 + let data_file_path = format!("{}/data/data-1.parquet", fixture.table_location); + fixture + .verify_incremental_scan( + 2, + 3, + vec![], // No appends + vec![ + // All 3 positions deleted (pos values are 0-indexed: 0, 1, 2) + (0, data_file_path.as_str()), + (1, data_file_path.as_str()), + (2, data_file_path.as_str()), + ], + ) + .await; +} + +#[tokio::test] +async fn test_incremental_scan_positional_deletes_then_file_delete() { + // This test verifies that the system correctly handles the case where: + // - Snapshot 1: Empty starting point + // - Snapshot 2: Append a file with 3 records + // - Snapshot 3: Delete these 3 records using positional deletes + // - Snapshot 4: Delete the file entirely (that was introduced in snapshot 2) + // + // The system should avoid double-deletes by filtering out positional deletes + // for files that are subsequently deleted entirely. + let fixture = IncrementalTestFixture::new(vec![ + // Snapshot 1: Empty starting point + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Append a file with 3 records + Operation::Add( + vec![ + (1, "a".to_string()), + (2, "b".to_string()), + (3, "c".to_string()), + ], + "data-1.parquet".to_string(), + ), + // Snapshot 3: Delete all 3 records using positional deletes + Operation::Delete(vec![ + (0, "data-1.parquet".to_string()), + (1, "data-1.parquet".to_string()), + (2, "data-1.parquet".to_string()), + ]), + // Snapshot 4: Delete the file entirely + Operation::Overwrite( + (vec![], "".to_string()), // No new data to add + vec![], // No positional deletes + vec!["data-1.parquet".to_string()], // Delete the file completely + ), + ]) + .await; + + // Verify we have 4 snapshots + let mut snapshots = fixture.table.metadata().snapshots().collect::>(); + snapshots.sort_by_key(|s| s.snapshot_id()); + assert_eq!(snapshots.len(), 4); + + // Incremental scan from snapshot 1 to 4 + // Since data-1.parquet was both appended (snapshot 2) and deleted (snapshot 4) + // in the scan range, the system uses the POSITIONAL DELETES (from snapshot 3) + // rather than the deleted file task. This is because: + // - The file qualifies for cancellation (appended && deleted) + // - But it has positional deletes, so those are preferred over the deleted file task + // - The deleted file task is NOT emitted in this case + let data_file_path = format!("{}/data/data-1.parquet", fixture.table_location); + fixture + .verify_incremental_scan( + 1, + 4, + vec![], // No appends (file was added and fully deleted) + vec![ + // Positions from positional deletes (snapshot 3): 0, 1, 2 + // The deleted file task (which would emit 0, 1, 2) is NOT emitted + (0, data_file_path.as_str()), + (1, data_file_path.as_str()), + (2, data_file_path.as_str()), + ], + ) + .await; + + // Incremental scan from snapshot 3 to 4 + // This demonstrates the double-delete scenario: + // - At snapshot 3, all records have already been deleted via positional deletes + // - At snapshot 4, the entire file is deleted + // - The scan from 3 to 4 should still emit delete records for the file, + // even though the records were already deleted in snapshot 3 + // This is because the incremental scan doesn't know about the prior state + fixture + .verify_incremental_scan( + 3, + 4, + vec![], // No appends + vec![ + // File deletion emits positions 0, 1, 2 + // These are "redundant" deletes since the records were already + // deleted by positional deletes in snapshot 3 + (0, data_file_path.as_str()), + (1, data_file_path.as_str()), + (2, data_file_path.as_str()), + ], + ) + .await; +} + +#[tokio::test] +async fn test_incremental_scan_with_deleted_files_cancellation() { + // This test verifies that incremental scans properly handle file deletions with cancellation logic: + // - Files added and deleted in the same scan range should cancel out + // - Files deleted that weren't added in the scan range should produce Delete tasks with positions 1..N // // Test scenario: - // Snapshot 1: Add file-1.parquet with data - // Snapshot 2: Add file-2.parquet with data + // Snapshot 1: Add file-1.parquet with data (3 records) + // Snapshot 2: Add file-2.parquet with data (2 records) // Snapshot 3: Overwrite - delete file-1.parquet entirely - // Snapshot 4: Add file-3.parquet with data + // Snapshot 4: Add file-3.parquet with data (1 record) // - // Incremental scan from snapshot 1 to snapshot 3 should error because file-1 - // was completely removed in the overwrite operation. + // Incremental scan from snapshot 1 to 3: file-1 added and deleted, should cancel out (only file-2 remains) + // Incremental scan from snapshot 2 to 4: file-1 deleted but not added in range, produces Delete tasks let fixture = IncrementalTestFixture::new(vec![ // Snapshot 1: Add file-1 with rows @@ -1628,8 +1796,9 @@ async fn test_incremental_scan_with_deleted_files_errors() { ]) .await; - // Test 1: Incremental scan from snapshot 1 to 3 should error when building the stream - // because file-1 was deleted entirely in snapshot 3 + // Test 1: Incremental scan from snapshot 1 to 3 should succeed + // file-1 was added in snapshot 1 and deleted in snapshot 3, so they cancel out + // Only file-2 (added in snapshot 2) should remain in the scan let scan = fixture .table .incremental_scan(1, 3) @@ -1638,36 +1807,29 @@ async fn test_incremental_scan_with_deleted_files_errors() { let stream_result = scan.to_arrow().await; - match stream_result { - Err(error) => { - assert_eq!( - error.message(), - "Processing deleted data files is not supported yet in incremental scans", - "Error message should indicate that deleted files are not supported. Got: {}", - error - ); - } - Ok(_) => panic!( - "Expected error when building stream over a snapshot that deletes entire data files" - ), - } - - // Test 2: Incremental scan from snapshot 2 to 4 should also error - // because it includes snapshot 3 which deletes a file - let scan = fixture - .table - .incremental_scan(2, 4) - .build() - .expect("Building the scan should succeed"); - - let stream_result = scan.to_arrow().await; + assert!( + stream_result.is_ok(), + "Scan should succeed when file additions and deletions cancel out. Error: {:?}", + stream_result.err() + ); - match stream_result { - Err(_) => { - // Expected error - } - Ok(_) => panic!("Expected error when scan range includes a snapshot that deletes files"), - } + // Test 2: Incremental scan from snapshot 2 to 4 should now succeed + // file-1 was deleted in snapshot 3 but wasn't added in the scan range (added in snapshot 1) + // This produces a net Delete task with positions 0, 1, 2 (all records in file-1) + let file_1_path = format!("{}/data/file-1.parquet", fixture.table_location); + fixture + .verify_incremental_scan( + 2, + 4, + vec![(100, "p")], // file-3 added in snapshot 4 + vec![ + // file-1 deleted in snapshot 3 (all 3 positions: 0, 1, 2) + (0, file_1_path.as_str()), + (1, file_1_path.as_str()), + (2, file_1_path.as_str()), + ], + ) + .await; // Test 3: Incremental scan from snapshot 1 to 2 should work fine // (no files deleted) From d7a26afdfceaec5c4433e84f4cb6f7f606b6d6de Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 3 Nov 2025 12:48:32 +0100 Subject: [PATCH 3/8] PR comments --- crates/iceberg/src/arrow/incremental.rs | 37 +++-- crates/iceberg/src/arrow/reader.rs | 6 + crates/iceberg/src/scan/incremental/mod.rs | 8 +- crates/iceberg/src/scan/incremental/tests.rs | 135 ++++++++----------- 4 files changed, 91 insertions(+), 95 deletions(-) diff --git a/crates/iceberg/src/arrow/incremental.rs b/crates/iceberg/src/arrow/incremental.rs index dd34a35cd9..3619d2a60f 100644 --- a/crates/iceberg/src/arrow/incremental.rs +++ b/crates/iceberg/src/arrow/incremental.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; @@ -23,10 +24,12 @@ use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use futures::channel::mpsc::channel; use futures::stream::select; use futures::{SinkExt, Stream, StreamExt, TryStreamExt}; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::record_batch_transformer::RecordBatchTransformer; use crate::arrow::{ - ArrowReader, RESERVED_COL_NAME_FILE_PATH, RESERVED_FIELD_ID_FILE_PATH, StreamsInto, + ArrowReader, RESERVED_COL_NAME_FILE_PATH, RESERVED_COL_NAME_POS, RESERVED_FIELD_ID_FILE_PATH, + RESERVED_FIELD_ID_POS, StreamsInto, }; use crate::delete_vector::DeleteVector; use crate::io::FileIO; @@ -37,6 +40,22 @@ use crate::scan::incremental::{ }; use crate::{Error, ErrorKind, Result}; +/// Default batch size for incremental delete operations. +const DEFAULT_BATCH_SIZE: usize = 1024; + +/// Creates the schema for positional delete records containing the "pos" column. +/// The pos field includes the reserved field ID as metadata. +fn create_pos_delete_schema() -> Arc { + let pos_field = + Field::new(RESERVED_COL_NAME_POS, DataType::UInt64, false).with_metadata(HashMap::from([ + ( + PARQUET_FIELD_ID_META_KEY.to_string(), + RESERVED_FIELD_ID_POS.to_string(), + ), + ])); + Arc::new(ArrowSchema::new(vec![pos_field])) +} + /// The type of incremental batch: appended data or deleted records. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum IncrementalBatchType { @@ -274,13 +293,9 @@ fn process_incremental_delete_task( delete_vector: DeleteVector, batch_size: Option, ) -> Result { - let schema = Arc::new(ArrowSchema::new(vec![Field::new( - "pos", - DataType::UInt64, - false, - )])); + let schema = create_pos_delete_schema(); - let batch_size = batch_size.unwrap_or(1024); + let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); let treemap = delete_vector.inner; @@ -316,13 +331,9 @@ fn process_incremental_deleted_file_task( total_records: u64, batch_size: Option, ) -> Result { - let schema = Arc::new(ArrowSchema::new(vec![Field::new( - "pos", - DataType::UInt64, - false, - )])); + let schema = create_pos_delete_schema(); - let batch_size = batch_size.unwrap_or(1024); + let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); // Create a stream of position values from 0 to total_records-1 (0-indexed) let stream = futures::stream::iter(0..total_records) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index c4a1b8a5f3..11392e517a 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -78,6 +78,12 @@ pub(crate) const RESERVED_FIELD_ID_FILE_PATH: i32 = 2147483546; /// Column name for the file path metadata column used in delete file reading. pub(crate) const RESERVED_COL_NAME_FILE_PATH: &str = "file_path"; +/// Reserved field ID for the position column used in delete file reading. +pub(crate) const RESERVED_FIELD_ID_POS: i32 = 2147483545; + +/// Column name for the position metadata column used in delete file reading. +pub(crate) const RESERVED_COL_NAME_POS: &str = "pos"; + /// Builder to create ArrowReader pub struct ArrowReaderBuilder { batch_size: Option, diff --git a/crates/iceberg/src/scan/incremental/mod.rs b/crates/iceberg/src/scan/incremental/mod.rs index 2b917bc5b2..af011dfc16 100644 --- a/crates/iceberg/src/scan/incremental/mod.rs +++ b/crates/iceberg/src/scan/incremental/mod.rs @@ -474,13 +474,9 @@ impl IncrementalTableScan { .delete_vectors() .keys() .filter(|path| { - // Only include positional deletes for files that: - // 1. Were not appended in this scan range, OR - // 2. Were appended but also deleted (cancelled out) - // Exclude positional deletes for files that were deleted + // Only include positional deletes for files that were not appended in + // this range and not deleted. !appended_files.contains::(path) && !deleted_files.contains::(path) - || (appended_files.contains::(path) - && deleted_files.contains::(path)) }) .cloned() .collect()) diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs index 363acfcde6..595586c8bf 100644 --- a/crates/iceberg/src/scan/incremental/tests.rs +++ b/crates/iceberg/src/scan/incremental/tests.rs @@ -1710,24 +1710,16 @@ async fn test_incremental_scan_positional_deletes_then_file_delete() { // Incremental scan from snapshot 1 to 4 // Since data-1.parquet was both appended (snapshot 2) and deleted (snapshot 4) - // in the scan range, the system uses the POSITIONAL DELETES (from snapshot 3) - // rather than the deleted file task. This is because: - // - The file qualifies for cancellation (appended && deleted) - // - But it has positional deletes, so those are preferred over the deleted file task - // - The deleted file task is NOT emitted in this case + // in the scan range, BOTH appends and deletes are fully cancelled out. + // Even though there were positional deletes in snapshot 3, those are filtered out + // because the file qualifies for cancellation (appended && deleted). let data_file_path = format!("{}/data/data-1.parquet", fixture.table_location); fixture .verify_incremental_scan( 1, 4, - vec![], // No appends (file was added and fully deleted) - vec![ - // Positions from positional deletes (snapshot 3): 0, 1, 2 - // The deleted file task (which would emit 0, 1, 2) is NOT emitted - (0, data_file_path.as_str()), - (1, data_file_path.as_str()), - (2, data_file_path.as_str()), - ], + vec![], // No appends (file was added and fully deleted, cancelled out) + vec![], // No deletes (fully cancelled since file was added and deleted in range) ) .await; @@ -1759,19 +1751,22 @@ async fn test_incremental_scan_positional_deletes_then_file_delete() { async fn test_incremental_scan_with_deleted_files_cancellation() { // This test verifies that incremental scans properly handle file deletions with cancellation logic: // - Files added and deleted in the same scan range should cancel out - // - Files deleted that weren't added in the scan range should produce Delete tasks with positions 1..N + // - Files deleted that weren't added in the scan range should produce Delete tasks // // Test scenario: - // Snapshot 1: Add file-1.parquet with data (3 records) - // Snapshot 2: Add file-2.parquet with data (2 records) - // Snapshot 3: Overwrite - delete file-1.parquet entirely - // Snapshot 4: Add file-3.parquet with data (1 record) + // Snapshot 1: Empty starting point + // Snapshot 2: Add file-1.parquet with data (3 records) + // Snapshot 3: Add file-2.parquet with data (2 records) + // Snapshot 4: Overwrite - delete file-1.parquet entirely + // Snapshot 5: Add file-3.parquet with data (1 record) // - // Incremental scan from snapshot 1 to 3: file-1 added and deleted, should cancel out (only file-2 remains) - // Incremental scan from snapshot 2 to 4: file-1 deleted but not added in range, produces Delete tasks + // Incremental scan from snapshot 1 to 4: file-1 added and deleted, should cancel out (only file-2 remains) + // Incremental scan from snapshot 3 to 5: file-1 deleted but not added in range, produces Delete tasks let fixture = IncrementalTestFixture::new(vec![ - // Snapshot 1: Add file-1 with rows + // Snapshot 1: Empty starting point + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Add file-1 with rows Operation::Add( vec![ (1, "a".to_string()), @@ -1780,50 +1775,48 @@ async fn test_incremental_scan_with_deleted_files_cancellation() { ], "file-1.parquet".to_string(), ), - // Snapshot 2: Add file-2 with rows + // Snapshot 3: Add file-2 with rows Operation::Add( vec![(10, "x".to_string()), (20, "y".to_string())], "file-2.parquet".to_string(), ), - // Snapshot 3: Overwrite - delete file-1 entirely + // Snapshot 4: Overwrite - delete file-1 entirely Operation::Overwrite( (vec![], "".to_string()), // No new data to add vec![], // No positional deletes vec!["file-1.parquet".to_string()], // Delete file-1 completely ), - // Snapshot 4: Add file-3 (to have more snapshots) + // Snapshot 5: Add file-3 (to have more snapshots) Operation::Add(vec![(100, "p".to_string())], "file-3.parquet".to_string()), ]) .await; - // Test 1: Incremental scan from snapshot 1 to 3 should succeed - // file-1 was added in snapshot 1 and deleted in snapshot 3, so they cancel out - // Only file-2 (added in snapshot 2) should remain in the scan - let scan = fixture - .table - .incremental_scan(1, 3) - .build() - .expect("Building the scan should succeed"); - - let stream_result = scan.to_arrow().await; + let file_1_path = format!("{}/data/file-1.parquet", fixture.table_location); - assert!( - stream_result.is_ok(), - "Scan should succeed when file additions and deletions cancel out. Error: {:?}", - stream_result.err() - ); + // Test 1: Incremental scan from snapshot 1 to 4 + // file-1 was added in snapshot 2 and deleted in snapshot 4 + // Both appends and deletes for file-1 are fully cancelled out: + // - Appends: only file-2 records (10, x), (20, y) + // - Deletes: none (file-1 deletions cancelled since it was added in range) + fixture + .verify_incremental_scan( + 1, + 4, + vec![(10, "x"), (20, "y")], // Only file-2 (file-1 cancelled out) + vec![], // No deletes (file-1 fully cancelled) + ) + .await; - // Test 2: Incremental scan from snapshot 2 to 4 should now succeed - // file-1 was deleted in snapshot 3 but wasn't added in the scan range (added in snapshot 1) + // Test 2: Incremental scan from snapshot 3 to 5 + // file-1 was deleted in snapshot 4 but wasn't added in the scan range (added in snapshot 2) // This produces a net Delete task with positions 0, 1, 2 (all records in file-1) - let file_1_path = format!("{}/data/file-1.parquet", fixture.table_location); fixture .verify_incremental_scan( - 2, - 4, - vec![(100, "p")], // file-3 added in snapshot 4 + 3, + 5, + vec![(100, "p")], // file-3 added in snapshot 5 vec![ - // file-1 deleted in snapshot 3 (all 3 positions: 0, 1, 2) + // file-1 deleted in snapshot 4 (all 3 positions: 0, 1, 2) (0, file_1_path.as_str()), (1, file_1_path.as_str()), (2, file_1_path.as_str()), @@ -1831,35 +1824,25 @@ async fn test_incremental_scan_with_deleted_files_cancellation() { ) .await; - // Test 3: Incremental scan from snapshot 1 to 2 should work fine - // (no files deleted) - let scan = fixture - .table - .incremental_scan(1, 2) - .build() - .expect("Building the scan should succeed"); - - let stream_result = scan.to_arrow().await; - - assert!( - stream_result.is_ok(), - "Scan should succeed when no files are deleted. Error: {:?}", - stream_result.err() - ); - - // Test 4: Incremental scan from snapshot 3 to 4 should work - // (starting from after the deletion) - let scan = fixture - .table - .incremental_scan(3, 4) - .build() - .expect("Building the scan should succeed"); - - let stream_result = scan.to_arrow().await; + // Test 3: Incremental scan from snapshot 1 to 3 + // Both file-1 and file-2 are added, no deletions + fixture + .verify_incremental_scan( + 1, + 3, + vec![(1, "a"), (2, "b"), (3, "c"), (10, "x"), (20, "y")], + vec![], // No deletes + ) + .await; - assert!( - stream_result.is_ok(), - "Scan should succeed when starting after the file deletion. Error: {:?}", - stream_result.err() - ); + // Test 4: Incremental scan from snapshot 4 to 5 + // Only file-3 is added, starting from after file-1 deletion + fixture + .verify_incremental_scan( + 4, + 5, + vec![(100, "p")], // file-3 added + vec![], // No deletes + ) + .await; } From e1fbf77b4cd28a1c67180dc5b6a6a0e07a92a4b7 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Mon, 3 Nov 2025 12:56:17 +0100 Subject: [PATCH 4/8] Fix test comments --- crates/iceberg/src/scan/incremental/tests.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs index 595586c8bf..31f59db288 100644 --- a/crates/iceberg/src/scan/incremental/tests.rs +++ b/crates/iceberg/src/scan/incremental/tests.rs @@ -1762,6 +1762,8 @@ async fn test_incremental_scan_with_deleted_files_cancellation() { // // Incremental scan from snapshot 1 to 4: file-1 added and deleted, should cancel out (only file-2 remains) // Incremental scan from snapshot 3 to 5: file-1 deleted but not added in range, produces Delete tasks + // Incremental scan from snapshot 1 to 3: file-1 and file-2 added before any deletion + // Incremental scan from snapshot 4 to 5: file-3 added after file-1 deletion occurred let fixture = IncrementalTestFixture::new(vec![ // Snapshot 1: Empty starting point @@ -1825,7 +1827,9 @@ async fn test_incremental_scan_with_deleted_files_cancellation() { .await; // Test 3: Incremental scan from snapshot 1 to 3 - // Both file-1 and file-2 are added, no deletions + // Verifies basic append-only operations before any deletions occur. + // Both file-1 (snapshot 2) and file-2 (snapshot 3) are added, no deletions yet. + // Expected: All records from both files appear in appends, no deletes. fixture .verify_incremental_scan( 1, @@ -1836,7 +1840,9 @@ async fn test_incremental_scan_with_deleted_files_cancellation() { .await; // Test 4: Incremental scan from snapshot 4 to 5 - // Only file-3 is added, starting from after file-1 deletion + // Verifies scanning after the deletion has already occurred. + // Starting from snapshot 4 (after file-1 deletion), only file-3 is added in snapshot 5. + // Expected: Only file-3 records in appends, no deletes (deletion happened before scan range). fixture .verify_incremental_scan( 4, From d8f4fcdc5bcb3dc1c407235914f05c63e058a328 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 4 Nov 2025 08:35:48 +0100 Subject: [PATCH 5/8] Comment fix --- crates/iceberg/src/scan/incremental/tests.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs index 31f59db288..67737cb3c0 100644 --- a/crates/iceberg/src/scan/incremental/tests.rs +++ b/crates/iceberg/src/scan/incremental/tests.rs @@ -1760,10 +1760,10 @@ async fn test_incremental_scan_with_deleted_files_cancellation() { // Snapshot 4: Overwrite - delete file-1.parquet entirely // Snapshot 5: Add file-3.parquet with data (1 record) // - // Incremental scan from snapshot 1 to 4: file-1 added and deleted, should cancel out (only file-2 remains) - // Incremental scan from snapshot 3 to 5: file-1 deleted but not added in range, produces Delete tasks - // Incremental scan from snapshot 1 to 3: file-1 and file-2 added before any deletion - // Incremental scan from snapshot 4 to 5: file-3 added after file-1 deletion occurred + // Incremental scan from snapshot 1 to 4: file-1 added and deleted, should cancel out (only file-2 remains). + // Incremental scan from snapshot 3 to 5: file-1 deleted but not added in range, produces Delete tasks. + // Incremental scan from snapshot 1 to 3: file-1 and file-2 added before any deletion. + // Incremental scan from snapshot 4 to 5: file-3 added after file-1 deletion occurred. let fixture = IncrementalTestFixture::new(vec![ // Snapshot 1: Empty starting point From d32babb1b94901b7ca91c4b92ee281ed1495a225 Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 4 Nov 2025 10:37:45 +0100 Subject: [PATCH 6/8] Fix upstream changes --- crates/iceberg/src/arrow/incremental.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/iceberg/src/arrow/incremental.rs b/crates/iceberg/src/arrow/incremental.rs index 3619d2a60f..0eeebfd5ef 100644 --- a/crates/iceberg/src/arrow/incremental.rs +++ b/crates/iceberg/src/arrow/incremental.rs @@ -238,6 +238,7 @@ async fn process_incremental_append_task( &task.base.data_file_path, file_io, true, + None, // arrow_reader_options ) .await?; @@ -248,6 +249,7 @@ async fn process_incremental_append_task( &task.schema_ref(), record_batch_stream_builder.parquet_schema(), record_batch_stream_builder.schema(), + false, // use_fallback )?; record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask); From a096f894bb5d3cca949d728de491de9fbc1b1beb Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 4 Nov 2025 10:38:33 +0100 Subject: [PATCH 7/8] Modify test --- crates/iceberg/src/scan/incremental/tests.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs index 67737cb3c0..ef07e6da51 100644 --- a/crates/iceberg/src/scan/incremental/tests.rs +++ b/crates/iceberg/src/scan/incremental/tests.rs @@ -1688,11 +1688,9 @@ async fn test_incremental_scan_positional_deletes_then_file_delete() { ], "data-1.parquet".to_string(), ), - // Snapshot 3: Delete all 3 records using positional deletes + // Snapshot 3: Delete one record using a positional delete Operation::Delete(vec![ (0, "data-1.parquet".to_string()), - (1, "data-1.parquet".to_string()), - (2, "data-1.parquet".to_string()), ]), // Snapshot 4: Delete the file entirely Operation::Overwrite( From 4fb6b14dc7f1392548dc7cf8c279315d7c354aab Mon Sep 17 00:00:00 2001 From: Gerald Berger Date: Tue, 4 Nov 2025 10:42:53 +0100 Subject: [PATCH 8/8] format --- crates/iceberg/src/scan/incremental/tests.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs index ef07e6da51..32f0078736 100644 --- a/crates/iceberg/src/scan/incremental/tests.rs +++ b/crates/iceberg/src/scan/incremental/tests.rs @@ -1689,9 +1689,7 @@ async fn test_incremental_scan_positional_deletes_then_file_delete() { "data-1.parquet".to_string(), ), // Snapshot 3: Delete one record using a positional delete - Operation::Delete(vec![ - (0, "data-1.parquet".to_string()), - ]), + Operation::Delete(vec![(0, "data-1.parquet".to_string())]), // Snapshot 4: Delete the file entirely Operation::Overwrite( (vec![], "".to_string()), // No new data to add