diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 14b5124ee6..64df2b800f 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -35,11 +35,22 @@ enum EqDelState { } #[derive(Debug, Default)] -struct DeleteFileFilterState { +pub(crate) struct DeleteFileFilterState { delete_vectors: HashMap>>, equality_deletes: HashMap, } +impl DeleteFileFilterState { + pub fn delete_vectors(&self) -> &HashMap>> { + &self.delete_vectors + } + + /// Remove and return the delete vector for the given data file path. + pub fn remove_delete_vector(&mut self, path: &str) -> Option>> { + self.delete_vectors.remove(path) + } +} + #[derive(Clone, Debug, Default)] pub(crate) struct DeleteFilter { state: Arc>, @@ -65,6 +76,28 @@ impl DeleteFilter { .and_then(|st| st.delete_vectors.get(delete_file_path).cloned()) } + pub(crate) fn with_read(&self, f: F) -> Result + where F: FnOnce(&DeleteFileFilterState) -> Result { + let state = self.state.read().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + format!("Failed to acquire read lock: {}", e), + ) + })?; + f(&state) + } + + pub(crate) fn with_write(&self, f: F) -> Result + where F: FnOnce(&mut DeleteFileFilterState) -> Result { + let mut state = self.state.write().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + format!("Failed to acquire write lock: {}", e), + ) + })?; + f(&mut state) + } + pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option> { let mut state = self.state.write().unwrap(); diff --git a/crates/iceberg/src/arrow/incremental.rs b/crates/iceberg/src/arrow/incremental.rs new file mode 100644 index 0000000000..ce1b6add5c --- /dev/null +++ b/crates/iceberg/src/arrow/incremental.rs @@ -0,0 +1,375 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; + +use arrow_array::{RecordBatch, UInt64Array}; +use arrow_schema::{DataType, Field, Schema as ArrowSchema}; +use futures::channel::mpsc::channel; +use futures::stream::select; +use futures::{SinkExt, Stream, StreamExt, TryStreamExt}; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; +use crate::arrow::{ + ArrowReader, RESERVED_COL_NAME_FILE_PATH, RESERVED_COL_NAME_POS, RESERVED_FIELD_ID_FILE_PATH, + RESERVED_FIELD_ID_POS, StreamsInto, +}; +use crate::delete_vector::DeleteVector; +use crate::io::FileIO; +use crate::runtime::spawn; +use crate::scan::ArrowRecordBatchStream; +use crate::scan::incremental::{ + AppendedFileScanTask, IncrementalFileScanTask, IncrementalFileScanTaskStream, +}; +use crate::{Error, ErrorKind, Result}; + +/// Default batch size for incremental delete operations. +const DEFAULT_BATCH_SIZE: usize = 1024; + +/// Creates the schema for positional delete records containing the "pos" column. +/// The pos field includes the reserved field ID as metadata. 
+fn create_pos_delete_schema() -> Arc { + let pos_field = + Field::new(RESERVED_COL_NAME_POS, DataType::UInt64, false).with_metadata(HashMap::from([ + ( + PARQUET_FIELD_ID_META_KEY.to_string(), + RESERVED_FIELD_ID_POS.to_string(), + ), + ])); + Arc::new(ArrowSchema::new(vec![pos_field])) +} + +/// The type of incremental batch: appended data or deleted records. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum IncrementalBatchType { + /// Appended records. + Append, + /// Deleted records. + Delete, +} + +/// The stream of incremental Arrow `RecordBatch`es with batch type. +pub type CombinedIncrementalBatchRecordStream = + Pin> + Send + 'static>>; + +/// Stream type for obtaining a separate stream of appended and deleted record batches. +pub type UnzippedIncrementalBatchRecordStream = (ArrowRecordBatchStream, ArrowRecordBatchStream); + +impl StreamsInto + for IncrementalFileScanTaskStream +{ + /// Takes a stream of `IncrementalFileScanTasks` and reads all the files. Returns a + /// stream of Arrow `RecordBatch`es containing the data from the files. + fn stream(self, reader: ArrowReader) -> Result { + let (appends, deletes) = + StreamsInto::::stream(self, reader)?; + + let left = appends.map(|res| res.map(|batch| (IncrementalBatchType::Append, batch))); + let right = deletes.map(|res| res.map(|batch| (IncrementalBatchType::Delete, batch))); + + Ok(Box::pin(select(left, right)) as CombinedIncrementalBatchRecordStream) + } +} + +impl StreamsInto + for IncrementalFileScanTaskStream +{ + /// Takes a stream of `IncrementalFileScanTasks` and reads all the files. Returns two + /// separate streams of Arrow `RecordBatch`es containing appended data and deleted records. 
+ fn stream(self, reader: ArrowReader) -> Result { + let (appends_tx, appends_rx) = channel(reader.concurrency_limit_data_files); + let (deletes_tx, deletes_rx) = channel(reader.concurrency_limit_data_files); + + let batch_size = reader.batch_size; + let concurrency_limit_data_files = reader.concurrency_limit_data_files; + + spawn(async move { + let _ = self + .try_for_each_concurrent(concurrency_limit_data_files, |task| { + let file_io = reader.file_io.clone(); + let mut appends_tx = appends_tx.clone(); + let mut deletes_tx = deletes_tx.clone(); + async move { + match task { + IncrementalFileScanTask::Append(append_task) => { + spawn(async move { + let record_batch_stream = process_incremental_append_task( + append_task, + batch_size, + file_io, + ) + .await; + + match record_batch_stream { + Ok(stream) => { + // Process batches with parallelism for CPU-heavy operations + let _: Vec<_> = stream + .map(|batch_result| { + let mut appends_tx = appends_tx.clone(); + spawn(async move { + let result = batch_result.map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "failed to read appended record batch", + ) + .with_source(e) + }); + let _ = appends_tx.send(result).await; + }) + }) + .buffer_unordered(concurrency_limit_data_files) + .collect() + .await; + } + Err(e) => { + let _ = appends_tx.send(Err(e)).await; + } + } + }) + .await + } + IncrementalFileScanTask::Delete(deleted_file_task) => { + spawn(async move { + let file_path = deleted_file_task.data_file_path().to_string(); + let total_records = deleted_file_task.base.record_count.unwrap_or(0); + + let record_batch_stream = process_incremental_deleted_file_task( + file_path, + total_records, + batch_size, + ); + + match record_batch_stream { + Ok(stream) => { + // Process batches with parallelism for CPU-heavy operations + let _: Vec<_> = stream + .map(|batch_result| { + let mut deletes_tx = deletes_tx.clone(); + spawn(async move { + let result = batch_result.map_err(|e| { + Error::new( + ErrorKind::Unexpected, 
+ "failed to read deleted file record batch", + ) + .with_source(e) + }); + let _ = deletes_tx.send(result).await; + }) + }) + .buffer_unordered(concurrency_limit_data_files) + .collect() + .await; + } + Err(e) => { + let _ = deletes_tx.send(Err(e)).await; + } + } + }) + .await + } + IncrementalFileScanTask::PositionalDeletes( + file_path, + delete_vector, + ) => { + spawn(async move { + let record_batch_stream = process_incremental_delete_task( + file_path, + delete_vector, + batch_size, + ); + + match record_batch_stream { + Ok(stream) => { + // Process batches with parallelism for CPU-heavy operations + let _: Vec<_> = stream + .map(|batch_result| { + let mut deletes_tx = deletes_tx.clone(); + spawn(async move { + let result = batch_result.map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "failed to read deleted record batch", + ) + .with_source(e) + }); + let _ = deletes_tx.send(result).await; + }) + }) + .buffer_unordered(concurrency_limit_data_files) + .collect() + .await; + } + Err(e) => { + let _ = deletes_tx.send(Err(e)).await; + } + } + }) + .await + } + }; + + Ok(()) + } + }) + .await; + }); + + Ok(( + Box::pin(appends_rx) as ArrowRecordBatchStream, + Box::pin(deletes_rx) as ArrowRecordBatchStream, + )) + } +} + +async fn process_incremental_append_task( + task: AppendedFileScanTask, + batch_size: Option, + file_io: FileIO, +) -> Result { + let mut record_batch_stream_builder = ArrowReader::create_parquet_record_batch_stream_builder( + &task.base.data_file_path, + file_io, + true, + None, // arrow_reader_options + ) + .await?; + + // Create a projection mask for the batch stream to select which columns in the + // Parquet file that we want in the response + let projection_mask = ArrowReader::get_arrow_projection_mask( + &task.base.project_field_ids, + &task.schema_ref(), + record_batch_stream_builder.parquet_schema(), + record_batch_stream_builder.schema(), + false, // use_fallback + )?; + record_batch_stream_builder = 
record_batch_stream_builder.with_projection(projection_mask); + + // RecordBatchTransformer performs any transformations required on the RecordBatches + // that come back from the file, such as type promotion, default column insertion + // and column re-ordering + let mut record_batch_transformer = + RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids).build(); + + if let Some(batch_size) = batch_size { + record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); + } + + // Apply positional deletes as row selections. + let row_selection = if let Some(positional_delete_indexes) = task.positional_deletes { + Some(ArrowReader::build_deletes_row_selection( + record_batch_stream_builder.metadata().row_groups(), + &None, + &positional_delete_indexes.lock().unwrap(), + )?) + } else { + None + }; + + if let Some(row_selection) = row_selection { + record_batch_stream_builder = record_batch_stream_builder.with_row_selection(row_selection); + } + + // Build the batch stream and send all the RecordBatches that it generates + // to the requester. + let record_batch_stream = record_batch_stream_builder + .build()? 
+ .map(move |batch| match batch { + Ok(batch) => record_batch_transformer.process_record_batch(batch), + Err(err) => Err(err.into()), + }); + + Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) +} + +fn process_incremental_delete_task( + file_path: String, + delete_vector: DeleteVector, + batch_size: Option, +) -> Result { + let schema = create_pos_delete_schema(); + + let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); + + let treemap = delete_vector.inner; + + let stream = futures::stream::iter(treemap) + .chunks(batch_size) + .map(move |chunk| { + let array = UInt64Array::from_iter(chunk); + RecordBatch::try_new( + Arc::clone(&schema), // Cheap Arc clone instead of full schema creation + vec![Arc::new(array)], + ) + .map_err(|_| { + Error::new( + ErrorKind::Unexpected, + "Failed to create RecordBatch for DeleteVector", + ) + }) + .and_then(|batch| { + ArrowReader::add_file_path_column( + batch, + &file_path, + RESERVED_COL_NAME_FILE_PATH, + RESERVED_FIELD_ID_FILE_PATH, + ) + }) + }); + + Ok(Box::pin(stream) as ArrowRecordBatchStream) +} + +fn process_incremental_deleted_file_task( + file_path: String, + total_records: u64, + batch_size: Option, +) -> Result { + let schema = create_pos_delete_schema(); + + let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); + + // Create a stream of position values from 0 to total_records-1 (0-indexed) + let stream = futures::stream::iter(0..total_records) + .chunks(batch_size) + .map(move |chunk| { + let array = UInt64Array::from_iter(chunk); + RecordBatch::try_new( + Arc::clone(&schema), // Cheap Arc clone instead of full schema creation + vec![Arc::new(array)], + ) + .map_err(|_| { + Error::new( + ErrorKind::Unexpected, + "Failed to create RecordBatch for deleted file", + ) + }) + .and_then(|batch| { + ArrowReader::add_file_path_column( + batch, + &file_path, + RESERVED_COL_NAME_FILE_PATH, + RESERVED_FIELD_ID_FILE_PATH, + ) + }) + }); + + Ok(Box::pin(stream) as ArrowRecordBatchStream) +} diff --git 
a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index c091c45177..db85cd5730 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -33,6 +33,8 @@ pub mod record_batch_projector; pub(crate) mod record_batch_transformer; mod value; +mod incremental; +pub use incremental::*; pub use reader::*; pub use value::*; /// Partition value calculator for computing partition values diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index ab5a96f751..f4d4b8981c 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -23,17 +23,21 @@ use std::str::FromStr; use std::sync::Arc; use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene}; -use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar}; +use arrow_array::{ + Array, ArrayRef, BooleanArray, Datum as ArrowDatum, Int32Array, RecordBatch, RunArray, Scalar, + StringArray, +}; use arrow_cast::cast::cast; use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow_schema::{ - ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, + ArrowError, DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; use arrow_string::like::starts_with; use bytes::Bytes; use fnv::FnvHashSet; +use futures::channel::mpsc::channel; use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join}; +use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt, TryStreamExt, try_join}; use parquet::arrow::arrow_reader::{ ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector, }; @@ -54,11 +58,34 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator; use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator; use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; +use 
crate::runtime::spawn; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; +/// Reserved field ID for the file path (_file) column per Iceberg spec +/// This is dead code for now but will be used when we add the _file column support. +#[allow(dead_code)] +pub(crate) const RESERVED_FIELD_ID_FILE: i32 = 2147483646; + +/// Column name for the file path metadata column per Iceberg spec +/// This is dead code for now but will be used when we add the _file column support. +#[allow(dead_code)] +pub(crate) const RESERVED_COL_NAME_FILE: &str = "_file"; + +/// Reserved field ID for the file path column used in delete file reading. +pub(crate) const RESERVED_FIELD_ID_FILE_PATH: i32 = 2147483546; + +/// Column name for the file path metadata column used in delete file reading. +pub(crate) const RESERVED_COL_NAME_FILE_PATH: &str = "file_path"; + +/// Reserved field ID for the position column used in delete file reading. +pub(crate) const RESERVED_FIELD_ID_POS: i32 = 2147483545; + +/// Column name for the position metadata column used in delete file reading. 
+pub(crate) const RESERVED_COL_NAME_POS: &str = "pos"; + /// Builder to create ArrowReader pub struct ArrowReaderBuilder { batch_size: Option, @@ -126,47 +153,100 @@ impl ArrowReaderBuilder { /// Reads data from Parquet files #[derive(Clone)] pub struct ArrowReader { - batch_size: Option, - file_io: FileIO, - delete_file_loader: CachingDeleteFileLoader, + pub(crate) batch_size: Option, + pub(crate) file_io: FileIO, + pub(crate) delete_file_loader: CachingDeleteFileLoader, /// the maximum number of data files that can be fetched at the same time - concurrency_limit_data_files: usize, + pub(crate) concurrency_limit_data_files: usize, - row_group_filtering_enabled: bool, - row_selection_enabled: bool, + pub(crate) row_group_filtering_enabled: bool, + pub(crate) row_selection_enabled: bool, +} + +/// Trait indicating that the implementing type streams into a stream of type `S` using +/// a reader of type `R`. +pub trait StreamsInto { + /// Stream from the reader and produce a stream of type `S`. + fn stream(self, reader: R) -> Result; } impl ArrowReader { /// Take a stream of FileScanTasks and reads all the files. - /// Returns a stream of Arrow RecordBatches containing the data from the files + /// Returns a stream of Arrow RecordBatches containing the data from the files. 
+ /// + /// This implementation provides both file-level and batch-level parallelism: + /// - Multiple files are processed in parallel (IO-heavy operations) + /// - Multiple batches are processed in parallel across all files (CPU-heavy operations) pub fn read(self, tasks: FileScanTaskStream) -> Result { - let file_io = self.file_io.clone(); + let (tx, rx) = channel(self.concurrency_limit_data_files); + + let file_io = self.file_io; let batch_size = self.batch_size; let concurrency_limit_data_files = self.concurrency_limit_data_files; let row_group_filtering_enabled = self.row_group_filtering_enabled; let row_selection_enabled = self.row_selection_enabled; + let delete_file_loader = self.delete_file_loader; + + // Outer spawn for coordination - runs the entire processing pipeline in background + spawn(async move { + let _ = tasks + .try_for_each_concurrent(concurrency_limit_data_files, |task| { + let file_io = file_io.clone(); + let delete_file_loader = delete_file_loader.clone(); + let mut tx = tx.clone(); + + async move { + // Inner spawn for IO-heavy file operations (parallel file processing) + spawn(async move { + let record_batch_stream = Self::process_file_scan_task( + task, + batch_size, + file_io, + delete_file_loader, + row_group_filtering_enabled, + row_selection_enabled, + ) + .await; + + match record_batch_stream { + Ok(stream) => { + // Process batches with parallelism for CPU-heavy operations + // Each batch gets its own spawned task, enabling true parallel processing + let _: Vec<_> = stream + .map(|batch_result| { + let mut tx = tx.clone(); + spawn(async move { + // CPU-heavy batch processing happens here in parallel + let result = batch_result.map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "failed to read record batch", + ) + .with_source(e) + }); + + let _ = tx.send(result).await; + }) + }) + .buffer_unordered(concurrency_limit_data_files) + .collect() + .await; + } + Err(e) => { + let _ = tx.send(Err(e)).await; + } + } + }) + .await; + + 
Ok(()) + } + }) + .await; + }); - let stream = tasks - .map_ok(move |task| { - let file_io = file_io.clone(); - - Self::process_file_scan_task( - task, - batch_size, - file_io, - self.delete_file_loader.clone(), - row_group_filtering_enabled, - row_selection_enabled, - ) - }) - .map_err(|err| { - Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err) - }) - .try_buffer_unordered(concurrency_limit_data_files) - .try_flatten_unordered(concurrency_limit_data_files); - - Ok(Box::pin(stream) as ArrowRecordBatchStream) + Ok(Box::pin(rx) as ArrowRecordBatchStream) } #[allow(clippy::too_many_arguments)] @@ -451,7 +531,7 @@ impl ArrowReader { /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated /// as having been deleted by a positional delete, taking into account any row groups that have /// been skipped entirely by the filter predicate - fn build_deletes_row_selection( + pub(crate) fn build_deletes_row_selection( row_group_metadata_list: &[RowGroupMetaData], selected_row_groups: &Option>, positional_deletes: &DeleteVector, @@ -565,6 +645,103 @@ impl ArrowReader { Ok(results.into()) } + /// Helper function to add a `_file` column to a RecordBatch. + /// Takes the array and field to add, reducing code duplication. 
+ fn create_file_field( + batch: RecordBatch, + file_array: ArrayRef, + file_field: Field, + field_id: i32, + ) -> Result { + let mut columns = batch.columns().to_vec(); + columns.push(file_array); + + let mut fields: Vec<_> = batch.schema().fields().iter().cloned().collect(); + fields.push(Arc::new(file_field.with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + field_id.to_string(), + )])))); + + let schema = Arc::new(ArrowSchema::new(fields)); + RecordBatch::try_new(schema, columns).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to add _file column to RecordBatch", + ) + .with_source(e) + }) + } + + /// Adds a `_file` column to the RecordBatch containing the file path. + /// Uses Run-End Encoding (REE) for maximum memory efficiency when the same + /// file path is repeated across all rows. + /// Note: This is only used in tests for now, for production usage we use the + /// non-REE version as it is Julia-compatible. + #[allow(dead_code)] + pub(crate) fn add_file_path_column_ree( + batch: RecordBatch, + file_path: &str, + field_name: &str, + field_id: i32, + ) -> Result { + let num_rows = batch.num_rows(); + + // Use Run-End Encoded array for optimal memory efficiency + // For a constant value repeated num_rows times, this stores: + // - run_ends: [num_rows] (one i32) for non-empty batches, or [] for empty batches + // - values: [file_path] (one string) for non-empty batches, or [] for empty batches + let run_ends = if num_rows == 0 { + Int32Array::from(Vec::::new()) + } else { + Int32Array::from(vec![num_rows as i32]) + }; + let values = if num_rows == 0 { + StringArray::from(Vec::<&str>::new()) + } else { + StringArray::from(vec![file_path]) + }; + + let file_array = RunArray::try_new(&run_ends, &values).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to create RunArray for _file column", + ) + .with_source(e) + })?; + + // Per Iceberg spec, the _file column has reserved field ID RESERVED_FIELD_ID_FILE + // 
DataType is RunEndEncoded with Int32 run ends and Utf8 values + // Note: values field is nullable to match what RunArray::try_new(..) expects. + let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = Arc::new(Field::new("values", DataType::Utf8, true)); + let file_field = Field::new( + field_name, + DataType::RunEndEncoded(run_ends_field, values_field), + false, + ); + + Self::create_file_field(batch, Arc::new(file_array), file_field, field_id) + } + + /// Adds a `_file` column to the RecordBatch containing the file path. + /// Materializes the file path string for each row (no compression). + pub(crate) fn add_file_path_column( + batch: RecordBatch, + file_path: &str, + field_name: &str, + field_id: i32, + ) -> Result { + let num_rows = batch.num_rows(); + + // Create a StringArray with the file path repeated num_rows times + let file_array = StringArray::from(vec![file_path; num_rows]); + + // Per Iceberg spec, the _file column has reserved field ID RESERVED_FIELD_ID_FILE + let file_field = Field::new(field_name, DataType::Utf8, false); + + Self::create_file_field(batch, Arc::new(file_array), file_field, field_id) + } + fn build_field_id_set_and_map( parquet_schema: &SchemaDescriptor, predicate: &BoundPredicate, @@ -608,7 +785,7 @@ impl ArrowReader { } } - fn get_arrow_projection_mask( + pub(crate) fn get_arrow_projection_mask( field_ids: &[i32], iceberg_schema_of_task: &Schema, parquet_schema: &SchemaDescriptor, @@ -1739,6 +1916,7 @@ mod tests { use arrow_array::cast::AsArray; use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit}; + use as_any::Downcast; use futures::TryStreamExt; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::arrow::{ArrowWriter, ProjectionMask}; @@ -1752,7 +1930,9 @@ mod tests { use crate::ErrorKind; use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY}; - use 
crate::arrow::{ArrowReader, ArrowReaderBuilder}; + use crate::arrow::{ + ArrowReader, ArrowReaderBuilder, RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE, + }; use crate::delete_vector::DeleteVector; use crate::expr::visitors::bound_predicate_visitor::visit; use crate::expr::{Bind, Predicate, Reference}; @@ -2563,6 +2743,321 @@ message schema { assert!(col_b.is_null(2)); } + #[test] + fn test_add_file_path_column_ree() { + use arrow_array::{Array, Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + + // Create a simple test batch with 2 columns and 3 rows + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let id_array = Int32Array::from(vec![1, 2, 3]); + let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]); + + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(name_array), + ]) + .unwrap(); + + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 3); + + // Add file path column with REE + let file_path = "/path/to/data/file.parquet"; + let result = ArrowReader::add_file_path_column_ree( + batch, + file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + ); + assert!(result.is_ok(), "Should successfully add file path column"); + + let new_batch = result.unwrap(); + + // Verify the new batch has 3 columns + assert_eq!(new_batch.num_columns(), 3); + assert_eq!(new_batch.num_rows(), 3); + + // Verify schema has the _file column + let schema = new_batch.schema(); + assert_eq!(schema.fields().len(), 3); + + let file_field = schema.field(2); + assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); + assert!(!file_field.is_nullable()); + + // Verify the field has the correct metadata + let metadata = file_field.metadata(); + assert_eq!( + metadata.get(PARQUET_FIELD_ID_META_KEY), + Some(&RESERVED_FIELD_ID_FILE.to_string()) + ); + + // Verify the data type is RunEndEncoded + 
match file_field.data_type() { + DataType::RunEndEncoded(run_ends_field, values_field) => { + assert_eq!(run_ends_field.name(), "run_ends"); + assert_eq!(run_ends_field.data_type(), &DataType::Int32); + assert!(!run_ends_field.is_nullable()); + + assert_eq!(values_field.name(), "values"); + assert_eq!(values_field.data_type(), &DataType::Utf8); + } + _ => panic!("Expected RunEndEncoded data type for _file column"), + } + + // Verify the original columns are intact + let id_col = new_batch + .column(0) + .as_primitive::(); + assert_eq!(id_col.values(), &[1, 2, 3]); + + let name_col = new_batch.column(1).as_string::(); + assert_eq!(name_col.value(0), "Alice"); + assert_eq!(name_col.value(1), "Bob"); + assert_eq!(name_col.value(2), "Charlie"); + + // Verify the file path column contains the correct value + // The _file column is a RunArray, so we need to decode it + let file_col = new_batch.column(2); + let run_array = file_col + .as_any() + .downcast_ref::>() + .expect("Expected RunArray for _file column"); + + // Verify the run array structure (should be optimally encoded) + let run_ends = run_array.run_ends(); + assert_eq!(run_ends.values().len(), 1, "Should have only 1 run end"); + assert_eq!( + run_ends.values()[0], + new_batch.num_rows() as i32, + "Run end should equal number of rows" + ); + + // Check that the single value in the RunArray is the expected file path + let values = run_array.values(); + let string_values = values.as_string::(); + assert_eq!(string_values.len(), 1, "Should have only 1 value"); + assert_eq!(string_values.value(0), file_path); + + assert!( + string_values + .downcast_ref::() + .unwrap() + .iter() + .all(|v| v == Some(file_path)) + ) + } + + #[test] + fn test_add_file_path_column_ree_empty_batch() { + use arrow_array::RecordBatch; + use arrow_schema::{DataType, Field, Schema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + + // Create an empty batch + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, 
false)])); + + let id_array = arrow_array::Int32Array::from(Vec::::new()); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap(); + + assert_eq!(batch.num_rows(), 0); + + // Add file path column to empty batch with REE + let file_path = "/empty/file.parquet"; + let result = ArrowReader::add_file_path_column_ree( + batch, + file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + ); + + // Should succeed with empty RunArray for empty batches + assert!(result.is_ok()); + let new_batch = result.unwrap(); + assert_eq!(new_batch.num_rows(), 0); + assert_eq!(new_batch.num_columns(), 2); + + // Verify the _file column exists with correct schema + let schema = new_batch.schema(); + let file_field = schema.field(1); + assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); + + // Should use RunEndEncoded even for empty batches + match file_field.data_type() { + DataType::RunEndEncoded(run_ends_field, values_field) => { + assert_eq!(run_ends_field.data_type(), &DataType::Int32); + assert_eq!(values_field.data_type(), &DataType::Utf8); + } + _ => panic!("Expected RunEndEncoded data type for _file column"), + } + + // Verify metadata with reserved field ID + assert_eq!( + file_field.metadata().get(PARQUET_FIELD_ID_META_KEY), + Some(&RESERVED_FIELD_ID_FILE.to_string()) + ); + + // Verify the file path column is empty but properly structured + let file_path_column = new_batch.column(1); + assert_eq!(file_path_column.len(), 0); + } + + #[test] + fn test_add_file_path_column_special_characters() { + use arrow_array::{Int32Array, RecordBatch}; + use arrow_schema::{DataType, Field, Schema}; + + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + let id_array = Int32Array::from(vec![42]); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap(); + + // Test with file path containing special characters (materialized version) + let file_path = "/path/with 
spaces/and-dashes/file_name.parquet"; + let result = ArrowReader::add_file_path_column( + batch, + file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + ); + assert!(result.is_ok()); + + let new_batch = result.unwrap(); + let file_col = new_batch.column(1); + + // Verify the file path is correctly stored as a materialized StringArray + let str_arr = file_col.as_string::(); + assert_eq!(str_arr.value(0), file_path); + } + + #[test] + fn test_add_file_path_column() { + use arrow_array::{Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + + // Create a simple test batch with 2 columns and 3 rows + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let id_array = Int32Array::from(vec![1, 2, 3]); + let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]); + + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(name_array), + ]) + .unwrap(); + + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 3); + + // Add file path column with materialization + let file_path = "/path/to/data/file.parquet"; + let result = ArrowReader::add_file_path_column( + batch, + file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + ); + assert!(result.is_ok(), "Should successfully add file path column"); + + let new_batch = result.unwrap(); + + // Verify the new batch has 3 columns + assert_eq!(new_batch.num_columns(), 3); + assert_eq!(new_batch.num_rows(), 3); + + // Verify schema has the _file column + let schema = new_batch.schema(); + assert_eq!(schema.fields().len(), 3); + + let file_field = schema.field(2); + assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); + assert!(!file_field.is_nullable()); + + // Verify the field has the correct metadata + let metadata = file_field.metadata(); + assert_eq!( + metadata.get(PARQUET_FIELD_ID_META_KEY), + 
Some(&RESERVED_FIELD_ID_FILE.to_string()) + ); + + // Verify the data type is Utf8 (materialized strings) + assert_eq!(file_field.data_type(), &DataType::Utf8); + + // Verify the original columns are intact + let id_col = new_batch + .column(0) + .as_primitive::(); + assert_eq!(id_col.values(), &[1, 2, 3]); + + let name_col = new_batch.column(1).as_string::(); + assert_eq!(name_col.value(0), "Alice"); + assert_eq!(name_col.value(1), "Bob"); + assert_eq!(name_col.value(2), "Charlie"); + + // Verify the file path column contains the correct value for all rows + let file_col = new_batch.column(2).as_string::(); + for i in 0..new_batch.num_rows() { + assert_eq!(file_col.value(i), file_path); + } + } + + #[test] + fn test_add_file_path_column_empty_batch() { + use arrow_array::RecordBatch; + use arrow_schema::{DataType, Field, Schema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + + // Create an empty batch + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + let id_array = arrow_array::Int32Array::from(Vec::::new()); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap(); + + assert_eq!(batch.num_rows(), 0); + + // Add file path column to empty batch (materialized version) + let file_path = "/empty/file.parquet"; + let result = ArrowReader::add_file_path_column( + batch, + file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + ); + + // Should succeed with empty StringArray + assert!(result.is_ok()); + let new_batch = result.unwrap(); + assert_eq!(new_batch.num_rows(), 0); + assert_eq!(new_batch.num_columns(), 2); + + // Verify the _file column exists with correct schema + let schema = new_batch.schema(); + let file_field = schema.field(1); + assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); + + // Should use Utf8 (materialized strings) + assert_eq!(file_field.data_type(), &DataType::Utf8); + + // Verify metadata with reserved field ID + assert_eq!( + 
file_field.metadata().get(PARQUET_FIELD_ID_META_KEY), + Some(&RESERVED_FIELD_ID_FILE.to_string()) + ); + + // Verify the file path column is empty but properly structured + let file_path_column = new_batch.column(1); + assert_eq!(file_path_column.len(), 0); + } + /// Test for bug where position deletes in later row groups are not applied correctly. /// /// When a file has multiple row groups and a position delete targets a row in a later diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 4f6fd28483..3a3dfbb529 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -107,6 +107,26 @@ impl DeleteFileIndex { _ => unreachable!("Cannot be any other state than loaded"), } } + + pub(crate) async fn positional_deletes(&self) -> Vec { + let notifier = { + let guard = self.state.read().unwrap(); + match *guard { + DeleteFileIndexState::Populating(ref notifier) => notifier.clone(), + DeleteFileIndexState::Populated(ref index) => { + return index.positional_deletes(); + } + } + }; + + notifier.notified().await; + + let guard = self.state.read().unwrap(); + match guard.deref() { + DeleteFileIndexState::Populated(index) => index.positional_deletes(), + _ => unreachable!("Cannot be any other state than loaded"), + } + } } impl PopulatedDeleteFileIndex { @@ -210,6 +230,14 @@ impl PopulatedDeleteFileIndex { results } + + fn positional_deletes(&self) -> Vec { + self.pos_deletes_by_partition + .values() + .flatten() + .map(|ctx| ctx.as_ref().into()) + .collect() + } } #[cfg(test)] diff --git a/crates/iceberg/src/delete_vector.rs b/crates/iceberg/src/delete_vector.rs index f382bf079e..1040796034 100644 --- a/crates/iceberg/src/delete_vector.rs +++ b/crates/iceberg/src/delete_vector.rs @@ -23,9 +23,9 @@ use roaring::treemap::BitmapIter; use crate::{Error, ErrorKind, Result}; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct DeleteVector { - inner: RoaringTreemap, + pub 
inner: RoaringTreemap, } impl DeleteVector { diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index aae8efed74..9c9c7460fb 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -97,3 +97,6 @@ pub mod writer; mod delete_vector; pub mod puffin; + +/// Utility functions and modules. +pub mod util; diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index fe3f5c8f7e..e87dba4a1f 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -33,19 +33,23 @@ use crate::spec::{ }; use crate::{Error, ErrorKind, Result}; +pub(crate) type ManifestEntryFilterFn = dyn Fn(&ManifestEntryRef) -> bool + Send + Sync; + /// Wraps a [`ManifestFile`] alongside the objects that are needed /// to process it in a thread-safe manner pub(crate) struct ManifestFileContext { - manifest_file: ManifestFile, + pub manifest_file: ManifestFile, - sender: Sender, + pub sender: Sender, - field_ids: Arc>, - bound_predicates: Option>, - object_cache: Arc, - snapshot_schema: SchemaRef, - expression_evaluator_cache: Arc, - delete_file_index: DeleteFileIndex, + pub field_ids: Arc>, + pub bound_predicates: Option>, + pub object_cache: Arc, + pub snapshot_schema: SchemaRef, + pub expression_evaluator_cache: Arc, + pub delete_file_index: DeleteFileIndex, + + pub filter_fn: Option>, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -74,12 +78,15 @@ impl ManifestFileContext { mut sender, expression_evaluator_cache, delete_file_index, + filter_fn, .. 
} = self; + let filter_fn = filter_fn.unwrap_or_else(|| Arc::new(|_| true)); + let manifest = object_cache.get_manifest(&manifest_file).await?; - for manifest_entry in manifest.entries() { + for manifest_entry in manifest.entries().iter().filter(|e| filter_fn(e)) { let manifest_entry_context = ManifestEntryContext { // TODO: refactor to avoid the expensive ManifestEntry clone manifest_entry: manifest_entry.clone(), @@ -196,7 +203,6 @@ impl PlanContext { ) -> Result> + 'static>> { let manifest_files = manifest_list.entries().iter(); - // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; for manifest_file in manifest_files { let tx = if manifest_file.content == ManifestContentType::Deletes { @@ -231,6 +237,7 @@ impl PlanContext { partition_bound_predicate, tx, delete_file_idx.clone(), + None, ); filtered_mfcs.push(Ok(mfc)); @@ -245,6 +252,7 @@ impl PlanContext { partition_filter: Option>, sender: Sender, delete_file_index: DeleteFileIndex, + filter_fn: Option>, ) -> ManifestFileContext { let bound_predicates = if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) = @@ -267,6 +275,7 @@ impl PlanContext { field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), delete_file_index, + filter_fn, } } } diff --git a/crates/iceberg/src/scan/incremental/context.rs b/crates/iceberg/src/scan/incremental/context.rs new file mode 100644 index 0000000000..fbd9219d5c --- /dev/null +++ b/crates/iceberg/src/scan/incremental/context.rs @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use futures::channel::mpsc::Sender; + +use crate::Result; +use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; +use crate::delete_file_index::DeleteFileIndex; +use crate::io::object_cache::ObjectCache; +use crate::scan::ExpressionEvaluatorCache; +use crate::scan::context::{ManifestEntryContext, ManifestEntryFilterFn, ManifestFileContext}; +use crate::spec::{ + ManifestContentType, ManifestEntryRef, ManifestFile, SchemaRef, SnapshotRef, TableMetadataRef, +}; + +#[derive(Debug)] +pub(crate) struct IncrementalPlanContext { + /// The snapshots involved in the incremental scan. + pub snapshots: Vec, + + /// The snapshot to start the incremental scan from. + pub from_snapshot: SnapshotRef, + + /// The metadata of the table being scanned. + pub table_metadata: TableMetadataRef, + + /// The schema of the snapshot to end the incremental scan at. + pub to_snapshot_schema: SchemaRef, + + /// The object cache to use for the scan. + pub object_cache: Arc, + + /// The field IDs to scan. + pub field_ids: Arc>, + + /// The expression evaluator cache to use for the scan. + pub expression_evaluator_cache: Arc, + + /// The caching delete file loader to use for the scan. 
+ pub caching_delete_file_loader: CachingDeleteFileLoader, +} + +impl IncrementalPlanContext { + pub(crate) async fn build_manifest_file_contexts( + &self, + tx_data: Sender, + delete_file_idx: DeleteFileIndex, + delete_file_tx: Sender, + ) -> Result> + 'static>> { + // Collect all snapshot IDs (all operation types are supported) + let snapshot_ids: HashSet = self.snapshots.iter().map(|s| s.snapshot_id()).collect(); + + let (manifest_files, filter_fn) = { + let mut manifest_files = HashSet::::new(); + for snapshot in self.snapshots.iter() { + let manifest_list = self + .object_cache + .get_manifest_list(snapshot, &self.table_metadata) + .await?; + for entry in manifest_list.entries() { + if !snapshot_ids.contains(&entry.added_snapshot_id) { + continue; + } + manifest_files.insert(entry.clone()); + } + } + let filter_fn: Option> = + Some(Arc::new(move |entry: &ManifestEntryRef| { + entry + .snapshot_id() + .map(|id| snapshot_ids.contains(&id)) + .unwrap_or(true) // Include entries without `snapshot_id`. 
+ })); + + (manifest_files, filter_fn) + }; + + let mut mfcs = vec![]; + for manifest_file in &manifest_files { + let tx = if manifest_file.content == ManifestContentType::Deletes { + delete_file_tx.clone() + } else { + tx_data.clone() + }; + + let mfc = ManifestFileContext { + manifest_file: manifest_file.clone(), + bound_predicates: None, + sender: tx, + object_cache: self.object_cache.clone(), + snapshot_schema: self.to_snapshot_schema.clone(), + field_ids: self.field_ids.clone(), + expression_evaluator_cache: self.expression_evaluator_cache.clone(), + delete_file_index: delete_file_idx.clone(), + filter_fn: filter_fn.clone(), + }; + + mfcs.push(Ok(mfc)); + } + + Ok(Box::new(mfcs.into_iter())) + } +} diff --git a/crates/iceberg/src/scan/incremental/mod.rs b/crates/iceberg/src/scan/incremental/mod.rs new file mode 100644 index 0000000000..e074f49c98 --- /dev/null +++ b/crates/iceberg/src/scan/incremental/mod.rs @@ -0,0 +1,690 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Incremental table scan implementation. 
+ +use std::collections::HashSet; +use std::sync::Arc; + +use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; +use crate::arrow::delete_filter::DeleteFilter; +use crate::arrow::{ + ArrowReaderBuilder, CombinedIncrementalBatchRecordStream, StreamsInto, + UnzippedIncrementalBatchRecordStream, +}; +use crate::delete_file_index::DeleteFileIndex; +use crate::io::FileIO; +use crate::scan::DeleteFileContext; +use crate::scan::cache::ExpressionEvaluatorCache; +use crate::scan::context::ManifestEntryContext; +use crate::spec::{DataContentType, ManifestStatus, Snapshot, SnapshotRef}; +use crate::table::Table; +use crate::util::snapshot::ancestors_between; +use crate::utils::available_parallelism; +use crate::{Error, ErrorKind, Result}; + +mod context; +use context::*; +mod task; +use futures::channel::mpsc::{Sender, channel}; +use futures::{SinkExt, StreamExt, TryStreamExt}; +use itertools::Itertools; +pub use task::*; + +use crate::runtime::spawn; + +/// Builder for an incremental table scan. +#[derive(Debug)] +pub struct IncrementalTableScanBuilder<'a> { + table: &'a Table, + // Defaults to `None`, which means all columns. + column_names: Option>, + // None means scan from the first snapshot (inclusive) + from_snapshot_id: Option, + // None means scan to the current/last snapshot + to_snapshot_id: Option, + batch_size: Option, + concurrency_limit_data_files: usize, + concurrency_limit_manifest_entries: usize, + concurrency_limit_manifest_files: usize, +} + +impl<'a> IncrementalTableScanBuilder<'a> { + pub(crate) fn new( + table: &'a Table, + from_snapshot_id: Option, + to_snapshot_id: Option, + ) -> Self { + let num_cpus = available_parallelism().get(); + Self { + table, + column_names: None, + from_snapshot_id, + to_snapshot_id, + batch_size: None, + concurrency_limit_data_files: num_cpus, + concurrency_limit_manifest_entries: num_cpus, + concurrency_limit_manifest_files: num_cpus, + } + } + + /// Set the batch size for reading data files. 
+ pub fn with_batch_size(mut self, batch_size: Option) -> Self { + self.batch_size = batch_size; + self + } + + /// Select all columns of the table. + pub fn select_all(mut self) -> Self { + self.column_names = None; + self + } + + /// Select no columns of the table. + pub fn select_empty(mut self) -> Self { + self.column_names = Some(vec![]); + self + } + + /// Select some columns of the table. + pub fn select(mut self, column_names: impl IntoIterator) -> Self { + self.column_names = Some( + column_names + .into_iter() + .map(|item| item.to_string()) + .collect(), + ); + self + } + + /// Set the `from_snapshot_id` for the incremental scan. + pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self { + self.from_snapshot_id = Some(from_snapshot_id); + self + } + + /// Set the `to_snapshot_id` for the incremental scan. + pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self { + self.to_snapshot_id = Some(to_snapshot_id); + self + } + + /// Set the concurrency limit for reading data files. + pub fn with_concurrency_limit_data_files(mut self, limit: usize) -> Self { + self.concurrency_limit_data_files = limit; + self + } + + /// Set the concurrency limit for reading manifest entries. + pub fn with_concurrency_limit_manifest_entries(mut self, limit: usize) -> Self { + self.concurrency_limit_manifest_entries = limit; + self + } + + /// Set the concurrency limit for reading manifest files. + pub fn with_concurrency_limit_manifest_files(mut self, limit: usize) -> Self { + self.concurrency_limit_manifest_files = limit; + self + } + + /// Build the incremental table scan. + pub fn build(self) -> Result { + let metadata = self.table.metadata(); + + // Resolve to_snapshot_id: if None, use the current (latest) snapshot + let to_snapshot_id = if let Some(id) = self.to_snapshot_id { + id + } else { + metadata + .current_snapshot() + .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "No current snapshot found"))? 
+ .snapshot_id() + }; + + let snapshot_to: Arc = metadata + .snapshot_by_id(to_snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Snapshot with id {} not found", to_snapshot_id), + ) + })? + .clone(); + + // Determine oldest_snapshot_id for ancestors_between + // If from was None, include root snapshot by passing None to ancestors_between + // If from was Some(id), exclude that snapshot by passing Some(id) + let oldest_snapshot_id = if let Some(from_id) = self.from_snapshot_id { + // Validate the from snapshot exists + let _ = metadata.snapshot_by_id(from_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Snapshot with id {} not found", from_id), + ) + })?; + Some(from_id) + } else { + None + }; + + let snapshots = ancestors_between( + &self.table.metadata_ref(), + snapshot_to.snapshot_id(), + oldest_snapshot_id, + ) + .collect_vec(); + + // Get the from_snapshot for the plan context + // This is either the user-specified snapshot or the root snapshot + let from_snapshot_id = oldest_snapshot_id.unwrap_or_else(|| { + snapshots + .last() + .map(|s| s.snapshot_id()) + .expect("snapshots should not be empty") + }); + + let snapshot_from: Arc = metadata + .snapshot_by_id(from_snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Snapshot with id {} not found", from_snapshot_id), + ) + })? + .clone(); + + if !snapshots.is_empty() { + assert_eq!( + snapshots.first().map(|s| s.snapshot_id()), + Some(snapshot_to.snapshot_id()) + ); + } + + let schema = snapshot_to.schema(self.table.metadata())?; + + if let Some(column_names) = self.column_names.as_ref() { + for column_name in column_names { + if schema.field_by_name(column_name).is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Column {} not found in table. 
Schema: {}", + column_name, schema + ), + )); + } + } + } + + let mut field_ids = vec![]; + let column_names = self.column_names.clone().unwrap_or_else(|| { + schema + .as_struct() + .fields() + .iter() + .map(|f| f.name.clone()) + .collect() + }); + + for column_name in column_names.iter() { + let field_id = schema.field_id_by_name(column_name).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Column {} not found in table. Schema: {}", + column_name, schema + ), + ) + })?; + + schema + .as_struct() + .field_by_id(field_id) + .ok_or_else(|| { + Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Column {} is not a direct child of schema but a nested field, which is not supported now. Schema: {}", + column_name, schema + ), + ) + })?; + + field_ids.push(field_id); + } + + let plan_context = IncrementalPlanContext { + snapshots, + from_snapshot: snapshot_from, + table_metadata: self.table.metadata_ref(), + to_snapshot_schema: schema, + object_cache: self.table.object_cache().clone(), + field_ids: Arc::new(field_ids), + expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()), + caching_delete_file_loader: CachingDeleteFileLoader::new( + self.table.file_io().clone(), + self.concurrency_limit_data_files, + ), + }; + + Ok(IncrementalTableScan { + plan_context, + file_io: self.table.file_io().clone(), + column_names: Some(column_names), + batch_size: self.batch_size, + concurrency_limit_data_files: self.concurrency_limit_data_files, + concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries, + concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, + }) + } +} + +/// An incremental table scan. 
+#[derive(Debug)] +pub struct IncrementalTableScan { + plan_context: IncrementalPlanContext, + file_io: FileIO, + column_names: Option>, + batch_size: Option, + concurrency_limit_data_files: usize, + concurrency_limit_manifest_entries: usize, + concurrency_limit_manifest_files: usize, +} + +impl IncrementalTableScan { + /// Returns the `from` snapshot of this incremental table scan. + pub fn snapshot_from(&self) -> &SnapshotRef { + &self.plan_context.from_snapshot + } + + /// Returns the snapshots involved in this incremental table scan. + pub fn snapshots(&self) -> &[SnapshotRef] { + &self.plan_context.snapshots + } + + /// Returns the `to` snapshot of this incremental table scan. + pub fn snapshot_to(&self) -> &SnapshotRef { + self.snapshots() + .first() + .expect("There is always at least one snapshot") + } + + /// Returns the selected column names of this incremental table scan. + /// If `None`, all columns are selected. + pub fn column_names(&self) -> Option<&[String]> { + self.column_names.as_deref() + } + + /// Plans the files to be read in this incremental table scan. + pub async fn plan_files(&self) -> Result { + let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files; + let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries; + + // Used to stream `ManifestEntryContexts` between stages of the planning operation. + let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) = + channel(concurrency_limit_manifest_files); + let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) = + channel(concurrency_limit_manifest_files); + + // Used to stream the results back to the caller. 
+ let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); + + let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(); + + let manifest_file_contexts = self + .plan_context + .build_manifest_file_contexts( + manifest_entry_data_ctx_tx, + delete_file_idx.clone(), + manifest_entry_delete_ctx_tx, + ) + .await?; + + let mut channel_for_manifest_error: Sender> = file_scan_task_tx.clone(); + + // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s + spawn(async move { + let result = futures::stream::iter(manifest_file_contexts) + .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move { + ctx.fetch_manifest_and_stream_manifest_entries().await + }) + .await; + + if let Err(error) = result { + let _ = channel_for_manifest_error.send(Err(error)).await; + } + }); + + let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); + let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); + + // Process the delete file [`ManifestEntry`] stream in parallel. Builds the delete + // index below. + spawn(async move { + let result = manifest_entry_delete_ctx_rx + .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) + .try_for_each_concurrent( + concurrency_limit_manifest_entries, + |(manifest_entry_context, tx)| async move { + spawn(async move { + Self::process_delete_manifest_entry(tx, manifest_entry_context).await + }) + .await + }, + ) + .await; + + if let Err(error) = result { + let _ = channel_for_delete_manifest_entry_error + .send(Err(error)) + .await; + } + }) + .await; + + // TODO: Streaming this into the delete index seems somewhat redundant, as we + // could directly stream into the CachingDeleteFileLoader and instantly load the + // delete files. 
+ let positional_deletes = delete_file_idx.positional_deletes().await; + let result = self + .plan_context + .caching_delete_file_loader + .load_deletes( + &positional_deletes, + self.plan_context.to_snapshot_schema.clone(), + ) + .await; + + // Build the delete filter from the loaded deletes. + let delete_filter = match result { + Ok(loaded_deletes) => loaded_deletes.unwrap(), + Err(e) => { + return Err(Error::new( + ErrorKind::Unexpected, + format!("Failed to load positional deletes: {}", e), + )); + } + }; + + // Process the data file [`ManifestEntry`] stream in parallel + let filter = delete_filter.clone(); + spawn(async move { + let result = manifest_entry_data_ctx_rx + .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) + .try_for_each_concurrent( + concurrency_limit_manifest_entries, + |(manifest_entry_context, tx)| { + let filter = filter.clone(); + async move { + if manifest_entry_context.manifest_entry.status() + == ManifestStatus::Added + { + spawn(async move { + Self::process_data_manifest_entry( + tx, + manifest_entry_context, + &filter, + ) + .await + }) + .await + } else if manifest_entry_context.manifest_entry.status() + == ManifestStatus::Deleted + { + spawn(async move { + Self::process_deleted_data_manifest_entry( + tx, + manifest_entry_context, + ) + .await + }) + .await + } else { + Ok(()) + } + } + }, + ) + .await; + + if let Err(error) = result { + let _ = channel_for_data_manifest_entry_error.send(Err(error)).await; + } + }); + + // Collect all tasks from manifest processing. 
+ let all_tasks = file_scan_task_rx.try_collect::>().await?; + + // Separate tasks by type and compute file path sets in a single pass + let mut append_tasks = Vec::new(); + let mut delete_tasks = Vec::new(); + let mut appended_files = HashSet::new(); + let mut deleted_files = HashSet::new(); + + for task in all_tasks { + match task { + IncrementalFileScanTask::Append(append_task) => { + appended_files.insert(append_task.data_file_path().to_string()); + append_tasks.push(append_task); + } + IncrementalFileScanTask::Delete(delete_task) => { + deleted_files.insert(delete_task.data_file_path().to_string()); + delete_tasks.push(delete_task); + } + _ => {} + } + } + + // Build final task list with net changes + // We filter out tasks for files that appear in both sets (cancelled out) + let mut final_tasks: Vec = Vec::new(); + + // Add net append tasks (only files not in deleted_files) + for append_task in append_tasks { + if !deleted_files.contains(append_task.data_file_path()) { + final_tasks.push(IncrementalFileScanTask::Append(append_task)); + } + } + + // Add net delete tasks (only files not in appended_files) + for delete_task in delete_tasks { + if !appended_files.contains(delete_task.data_file_path()) { + final_tasks.push(IncrementalFileScanTask::Delete(delete_task)); + } + } + + // Add positional delete tasks (only for files that haven't been deleted) + let positional_delete_paths: Vec = delete_filter.with_read(|state| { + Ok(state + .delete_vectors() + .keys() + .filter(|path| { + // Only include positional deletes for files that were not appended in + // this range and not deleted. 
+ !appended_files.contains::(path) && !deleted_files.contains::(path) + }) + .cloned() + .collect()) + })?; + + // Now remove and take ownership of each delete vector + for path in positional_delete_paths { + let delete_vector_arc = delete_filter.with_write(|state| { + state.remove_delete_vector(&path).ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!("DeleteVector for path {} not found", path), + ) + }) + })?; + + // Try to unwrap the Arc to avoid cloning the DeleteVector + let delete_vector_inner = Arc::try_unwrap(delete_vector_arc) + .map_err(|_| { + Error::new( + ErrorKind::Unexpected, + "DeleteVector Arc has multiple references, cannot take ownership", + ) + })? + .into_inner() + .map_err(|e| { + Error::new(ErrorKind::Unexpected, "Failed to unwrap DeleteVector Mutex") + .with_source(e) + })?; + + let positional_delete_task = + IncrementalFileScanTask::PositionalDeletes(path, delete_vector_inner); + final_tasks.push(positional_delete_task); + } + + // We actually would not need a stream here, but we can keep it compatible with + // other scan types. + Ok(futures::stream::iter(final_tasks).map(Ok).boxed()) + } + + /// Returns an [`CombinedIncrementalBatchRecordStream`] for this incremental table scan. + pub async fn to_arrow(&self) -> Result { + let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone()) + .with_data_file_concurrency_limit(self.concurrency_limit_data_files) + .with_row_group_filtering_enabled(true) + .with_row_selection_enabled(true); + + if let Some(batch_size) = self.batch_size { + arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); + } + + let arrow_reader = arrow_reader_builder.build(); + let file_scan_task_stream = self.plan_files().await?; + file_scan_task_stream.stream(arrow_reader) + } + + /// Returns an [`UnzippedIncrementalBatchRecordStream`] for this incremental table scan. + /// This stream will yield separate streams for appended and deleted record batches. 
+    pub async fn to_unzipped_arrow(&self) -> Result<UnzippedIncrementalBatchRecordStream> {
+        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
+            .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
+            .with_row_group_filtering_enabled(true)
+            .with_row_selection_enabled(true);
+
+        if let Some(batch_size) = self.batch_size {
+            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
+        }
+
+        let arrow_reader = arrow_reader_builder.build();
+        let file_scan_task_stream = self.plan_files().await?;
+        file_scan_task_stream.stream(arrow_reader)
+    }
+
+    /// Forwards a position-delete manifest entry to the delete file index channel.
+    ///
+    /// # Errors
+    ///
+    /// Returns `FeatureUnsupported` for data-file entries, equality deletes and
+    /// position-delete entries that are no longer alive, and propagates a failed send.
+    async fn process_delete_manifest_entry(
+        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
+        manifest_entry_context: ManifestEntryContext,
+    ) -> Result<()> {
+        // Abort the plan if we encounter a manifest entry for a data file or equality
+        // deletes.
+        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Encountered an entry for a data file in a delete file manifest",
+            ));
+        } else if manifest_entry_context.manifest_entry.content_type()
+            == DataContentType::EqualityDeletes
+        {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Equality deletes are not supported yet in incremental scans",
+            ));
+        }
+
+        // Abort if it has been marked as deleted.
+        if !manifest_entry_context.manifest_entry.is_alive()
+            && manifest_entry_context.manifest_entry.content_type()
+                == DataContentType::PositionDeletes
+        {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Processing deleted (position) delete files is not supported yet in incremental scans",
+            ));
+        }
+
+        delete_file_ctx_tx
+            .send(DeleteFileContext {
+                manifest_entry: manifest_entry_context.manifest_entry.clone(),
+                partition_spec_id: manifest_entry_context.partition_spec_id,
+            })
+            .await?;
+        Ok(())
+    }
+
+    /// Sends an append task for a live data-file manifest entry.
+    ///
+    /// Entries that are not alive are skipped without sending anything.
+    ///
+    /// # Errors
+    ///
+    /// Returns `FeatureUnsupported` if the entry is not a data file, and propagates a
+    /// failed send.
+    async fn process_data_manifest_entry(
+        mut file_scan_task_tx: Sender<Result<IncrementalFileScanTask>>,
+        manifest_entry_context: ManifestEntryContext,
+        delete_filter: &DeleteFilter,
+    ) -> Result<()> {
+        // Skip processing this manifest entry if it has been marked as deleted.
+        if !manifest_entry_context.manifest_entry.is_alive() {
+            return Ok(());
+        }
+
+        // Abort the plan if we encounter a manifest entry for a delete file
+        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Encountered an entry for a delete file in a data file manifest",
+            ));
+        }
+
+        let file_scan_task = IncrementalFileScanTask::append_from_manifest_entry(
+            &manifest_entry_context,
+            delete_filter,
+        );
+
+        file_scan_task_tx.send(Ok(file_scan_task)).await?;
+        Ok(())
+    }
+
+    /// Sends a delete task covering the whole data file of a deleted manifest entry.
+    ///
+    /// # Errors
+    ///
+    /// Returns `FeatureUnsupported` if the entry is not a data file, and propagates a
+    /// failed send.
+    async fn process_deleted_data_manifest_entry(
+        mut file_scan_task_tx: Sender<Result<IncrementalFileScanTask>>,
+        manifest_entry_context: ManifestEntryContext,
+    ) -> Result<()> {
+        // Abort the plan if we encounter a manifest entry for a delete file
+        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Encountered an entry for a delete file in a data file manifest",
+            ));
+        }
+
+        let data_file_path = manifest_entry_context.manifest_entry.file_path();
+        let file_scan_task = IncrementalFileScanTask::Delete(DeletedFileScanTask {
+            base: BaseIncrementalFileScanTask {
+                // The task spans the file from offset 0 to its full size.
+                start: 0,
+                length: manifest_entry_context.manifest_entry.file_size_in_bytes(),
+                record_count: Some(manifest_entry_context.manifest_entry.record_count()),
+                data_file_path: data_file_path.to_string(),
+                data_file_format: manifest_entry_context.manifest_entry.file_format(),
+                schema: manifest_entry_context.snapshot_schema.clone(),
+                project_field_ids: manifest_entry_context.field_ids.as_ref().clone(),
+            },
+        });
+
+        file_scan_task_tx.send(Ok(file_scan_task)).await?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests;
diff --git a/crates/iceberg/src/scan/incremental/task.rs b/crates/iceberg/src/scan/incremental/task.rs
new file mode 100644
index 0000000000..e05703ae28
--- /dev/null
+++ b/crates/iceberg/src/scan/incremental/task.rs
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::{Arc, Mutex};
+
+use futures::stream::BoxStream;
+
+use crate::Result;
+use crate::arrow::delete_filter::DeleteFilter;
+use crate::delete_vector::DeleteVector;
+use crate::scan::context::ManifestEntryContext;
+use crate::spec::{DataFileFormat, Schema, SchemaRef};
+
+/// Base file scan task containing common attributes for incremental scan tasks.
+#[derive(Debug, Clone)]
+pub struct BaseIncrementalFileScanTask {
+    /// The start offset of the file to scan.
+    pub start: u64,
+    /// The length of the file to scan.
+    pub length: u64,
+    /// The number of records in the file.
+    pub record_count: Option<u64>,
+    /// The path to the data file to scan.
+    pub data_file_path: String,
+    /// The format of the data file to scan.
+    pub data_file_format: DataFileFormat,
+    /// The schema of the data file to scan.
+    pub schema: SchemaRef,
+    /// The field ids to project.
+    pub project_field_ids: Vec<i32>,
+}
+
+impl BaseIncrementalFileScanTask {
+    /// Builds the attributes shared by all incremental scan tasks from a manifest entry.
+    ///
+    /// The resulting task covers the whole file: `start` is 0 and `length` is the file
+    /// size recorded in the manifest entry. The schema and projected field ids are
+    /// copied from the context.
+    pub(crate) fn from_manifest_entry(manifest_entry_context: &ManifestEntryContext) -> Self {
+        let entry = &manifest_entry_context.manifest_entry;
+        Self {
+            start: 0,
+            length: entry.file_size_in_bytes(),
+            record_count: Some(entry.record_count()),
+            data_file_path: entry.file_path().to_string(),
+            data_file_format: entry.file_format(),
+            schema: manifest_entry_context.snapshot_schema.clone(),
+            project_field_ids: manifest_entry_context.field_ids.as_ref().clone(),
+        }
+    }
+
+    /// Returns the data file path of this file scan task.
+    pub fn data_file_path(&self) -> &str {
+        &self.data_file_path
+    }
+
+    /// Returns the schema of this file scan task as a reference
+    pub fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    /// Returns the schema of this file scan task as a SchemaRef
+    pub fn schema_ref(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+/// A file scan task for appended data files in an incremental scan.
+#[derive(Debug, Clone)]
+pub struct AppendedFileScanTask {
+    /// The base file scan task attributes.
+    pub base: BaseIncrementalFileScanTask,
+    /// The optional positional deletes associated with this data file.
+    pub positional_deletes: Option<Arc<Mutex<DeleteVector>>>,
+}
+
+impl AppendedFileScanTask {
+    /// Returns the data file path of this appended file scan task.
+    pub fn data_file_path(&self) -> &str {
+        self.base.data_file_path()
+    }
+
+    /// Returns the schema of this file scan task as a reference
+    pub fn schema(&self) -> &Schema {
+        self.base.schema()
+    }
+
+    /// Returns the schema of this file scan task as a SchemaRef
+    pub fn schema_ref(&self) -> SchemaRef {
+        self.base.schema_ref()
+    }
+}
+
+/// A file scan task for deleted data files in an incremental scan.
+#[derive(Debug, Clone)]
+pub struct DeletedFileScanTask {
+    /// The base file scan task attributes.
+    pub base: BaseIncrementalFileScanTask,
+}
+
+impl DeletedFileScanTask {
+    /// Returns the data file path of this deleted file scan task.
+    pub fn data_file_path(&self) -> &str {
+        self.base.data_file_path()
+    }
+
+    /// Returns the schema of this file scan task as a reference
+    pub fn schema(&self) -> &Schema {
+        self.base.schema()
+    }
+
+    /// Returns the schema of this file scan task as a SchemaRef
+    pub fn schema_ref(&self) -> SchemaRef {
+        self.base.schema_ref()
+    }
+}
+
+/// The stream of incremental file scan tasks.
+pub type IncrementalFileScanTaskStream = BoxStream<'static, Result<IncrementalFileScanTask>>;
+
+/// An incremental file scan task, which can be an appended data file, deleted data file,
+/// or positional deletes.
+#[derive(Debug, Clone)]
+pub enum IncrementalFileScanTask {
+    /// An appended data file.
+    Append(AppendedFileScanTask),
+    /// A deleted data file.
+    Delete(DeletedFileScanTask),
+    /// Positional deletes (deleted records of a data file). First argument is the file path,
+    /// second the delete vector.
+    PositionalDeletes(String, DeleteVector),
+}
+
+impl IncrementalFileScanTask {
+    /// Create an `IncrementalFileScanTask::Append` from a `ManifestEntryContext` and `DeleteFilter`.
+    ///
+    /// The positional deletes attached to the task are whatever the delete filter
+    /// returns for the entry's data file path (possibly `None`).
+    pub(crate) fn append_from_manifest_entry(
+        manifest_entry_context: &ManifestEntryContext,
+        delete_filter: &DeleteFilter,
+    ) -> Self {
+        let base = BaseIncrementalFileScanTask::from_manifest_entry(manifest_entry_context);
+        let positional_deletes = delete_filter.get_delete_vector_for_path(base.data_file_path());
+        IncrementalFileScanTask::Append(AppendedFileScanTask {
+            base,
+            positional_deletes,
+        })
+    }
+
+    /// Returns the data file path of this incremental file scan task.
+    pub fn data_file_path(&self) -> &str {
+        match self {
+            IncrementalFileScanTask::Append(task) => task.data_file_path(),
+            IncrementalFileScanTask::Delete(task) => task.data_file_path(),
+            IncrementalFileScanTask::PositionalDeletes(path, _) => path,
+        }
+    }
+}
diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs
new file mode 100644
index 0000000000..276ab9ef12
--- /dev/null
+++ b/crates/iceberg/src/scan/incremental/tests.rs
@@ -0,0 +1,2457 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fs; +use std::fs::File; +use std::sync::Arc; + +use arrow_array::cast::AsArray; +use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray}; +use futures::TryStreamExt; +use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; +use parquet::basic::Compression; +use parquet::file::properties::WriterProperties; +use tempfile::TempDir; +use uuid::Uuid; + +use crate::TableIdent; +use crate::io::{FileIO, OutputFile}; +use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, ManifestEntry, ManifestListWriter, + ManifestStatus, ManifestWriterBuilder, PartitionSpec, SchemaRef, Struct, TableMetadata, +}; +use crate::table::Table; + +/// Represents an operation to perform on a snapshot of a table with schema (id: Int32, +/// data: String). +#[derive(Debug, Clone)] +pub enum Operation { + /// Add rows with the given (n, data) tuples, and write to the specified parquet file name. + /// Example: `Add(vec![(1, "a".to_string()), (2, "b".to_string())], "data-1.parquet".to_string())` + /// adds two rows with n=1,2 and data="a","b" to a file named "data-1.parquet" + Add(Vec<(i32, String)>, String), + + /// Delete rows by their positions within specific parquet files (uses positional deletes). + /// Takes a vector of (position, file_name) tuples specifying which position in which file to delete. 
+ /// Example: `Delete(vec![(0, "data-1.parquet"), (1, "data-1.parquet")])` deletes positions 0 and 1 from data-1.parquet + Delete(Vec<(i64, String)>), + + /// Overwrite operation that can append new rows, delete specific positions, and remove entire data files. + /// This is a combination of append and delete operations in a single atomic snapshot. + /// + /// Parameters: + /// 1. Rows to append: Vec<(n, data)> tuples and the filename to write them to + /// 2. Positions to delete: Vec<(position, file_name)> tuples for positional deletes + /// 3. Data files to delete: Vec of file names to completely remove + /// + /// All three parameters can be empty, allowing for various combinations: + /// - Pure append: `Overwrite((rows, "file.parquet"), vec![], vec![])` + /// - Pure positional delete: `Overwrite((vec![], ""), vec![(pos, "file")], vec![])` + /// - Pure file deletion: `Overwrite((vec![], ""), vec![], vec!["file.parquet"])` + /// - Delete entire files: `Overwrite((vec![], ""), vec![], vec!["old-file.parquet"])` + /// + /// Example: `Overwrite((vec![(1, "new".to_string())], "new.parquet"), vec![(0, "old.parquet")], vec!["remove.parquet"])` + /// This adds new data to "new.parquet", deletes position 0 from "old.parquet", and removes "remove.parquet" entirely. + Overwrite( + (Vec<(i32, String)>, String), + Vec<(i64, String)>, + Vec, + ), + + /// Replace operation for file compaction/reorganization. + /// The logical table content does NOT change - only the physical file representation changes. + /// + /// Parameters: + /// 1. Files to compact: Vec of existing file names that are being compacted + /// 2. Target file: String name of the new compacted file + /// + /// Example: `Replace(vec!["file-a.parquet", "file-b.parquet"], "file-a-b-compacted.parquet")` + /// This compacts two existing files into one new file with the same logical content. 
+ /// + /// For an incremental scan that only contains Replace operations, the result should be + /// zero additions and zero deletions, because the logical data hasn't changed. + Replace(Vec, String), +} + +/// Tracks the state of data files across snapshots +#[derive(Debug, Clone)] +struct DataFileInfo { + path: String, + snapshot_id: i64, + sequence_number: i64, + n_values: Vec, + data_values: Vec, + file_size: u64, +} + +/// Test fixture that creates a table with custom snapshots based on operations. +/// +/// # Example +/// ``` +/// let fixture = IncrementalTestFixture::new(vec![ +/// Operation::Add(vec![], "empty.parquet".to_string()), // Empty snapshot +/// Operation::Add( +/// vec![ +/// (1, "1".to_string()), +/// (2, "2".to_string()), +/// (3, "3".to_string()), +/// ], +/// "data-1.parquet".to_string(), +/// ), // Add 3 rows +/// Operation::Delete(vec![(1, "data-1.parquet".to_string())]), // Delete position 1 from data-1.parquet +/// ]) +/// .await; +/// ``` +pub struct IncrementalTestFixture { + pub table_location: String, + pub table: Table, + _tmp_dir: TempDir, // Keep temp dir alive +} + +impl IncrementalTestFixture { + /// Create a new test fixture with the given operations. 
+    ///
+    /// One snapshot is created per operation: snapshot ids start at 1 and sequence
+    /// numbers at 0, and the last snapshot becomes the table's current snapshot.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `operations` is empty (a table without snapshots cannot be set up by
+    /// this fixture), or if any file-system or metadata step fails.
+    pub async fn new(operations: Vec<Operation>) -> Self {
+        // Fail with a clear message instead of an arithmetic underflow further down.
+        assert!(
+            !operations.is_empty(),
+            "IncrementalTestFixture requires at least one operation"
+        );
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().join("incremental_test_table");
+
+        // Create directory structure
+        fs::create_dir_all(table_location.join("metadata")).unwrap();
+        fs::create_dir_all(table_location.join("data")).unwrap();
+
+        let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let num_snapshots = operations.len();
+        let current_snapshot_id = num_snapshots as i64;
+        // Cannot underflow: `operations` is non-empty (asserted above).
+        let last_sequence_number = (num_snapshots - 1) as i64;
+
+        // Build the snapshots JSON dynamically
+        let mut snapshots_json = Vec::new();
+        let mut snapshot_log_json = Vec::new();
+
+        for (i, op) in operations.iter().enumerate() {
+            let snapshot_id = (i + 1) as i64;
+            let parent_id = if i == 0 { None } else { Some(i as i64) };
+            let sequence_number = i as i64;
+            let timestamp = 1515100955770 + (i as i64 * 1000);
+
+            let operation_type = match op {
+                Operation::Add(..) => "append",
+                Operation::Delete(..) => "delete",
+                Operation::Overwrite(..) => "overwrite",
+                Operation::Replace(..) => "replace",
+            };
+
+            let manifest_list_location =
+                table_location.join(format!("metadata/snap-{}-manifest-list.avro", snapshot_id));
+
+            // The first snapshot has no parent, so the field is left out entirely.
+            let parent_str = if let Some(pid) = parent_id {
+                format!(r#""parent-snapshot-id": {},"#, pid)
+            } else {
+                String::new()
+            };
+
+            snapshots_json.push(format!(
+                r#"    {{
+      "snapshot-id": {},
+      {}
+      "timestamp-ms": {},
+      "sequence-number": {},
+      "summary": {{"operation": "{}"}},
+      "manifest-list": "{}",
+      "schema-id": 0
+    }}"#,
+                snapshot_id,
+                parent_str,
+                timestamp,
+                sequence_number,
+                operation_type,
+                manifest_list_location.display()
+            ));
+
+            snapshot_log_json.push(format!(
+                r#"    {{"snapshot-id": {}, "timestamp-ms": {}}}"#,
+                snapshot_id, timestamp
+            ));
+        }
+
+        let snapshots_str = snapshots_json.join(",\n");
+        let snapshot_log_str = snapshot_log_json.join(",\n");
+
+        // Create the table metadata
+        let metadata_json = format!(
+            r#"{{
+  "format-version": 2,
+  "table-uuid": "{}",
+  "location": "{}",
+  "last-sequence-number": {},
+  "last-updated-ms": 1602638573590,
+  "last-column-id": 2,
+  "current-schema-id": 0,
+  "schemas": [
+    {{
+      "type": "struct",
+      "schema-id": 0,
+      "fields": [
+        {{"id": 1, "name": "n", "required": true, "type": "int"}},
+        {{"id": 2, "name": "data", "required": true, "type": "string"}}
+      ]
+    }}
+  ],
+  "default-spec-id": 0,
+  "partition-specs": [
+    {{
+      "spec-id": 0,
+      "fields": []
+    }}
+  ],
+  "last-partition-id": 0,
+  "default-sort-order-id": 0,
+  "sort-orders": [
+    {{
+      "order-id": 0,
+      "fields": []
+    }}
+  ],
+  "properties": {{}},
+  "current-snapshot-id": {},
+  "snapshots": [
+{}
+  ],
+  "snapshot-log": [
+{}
+  ],
+  "metadata-log": []
+}}"#,
+            Uuid::new_v4(),
+            table_location.display(),
+            last_sequence_number,
+            current_snapshot_id,
+            snapshots_str,
+            snapshot_log_str
+        );
+
+        let table_metadata_location = table_location.join("metadata/v1.json");
+        let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_json).unwrap();
+
+        let table = Table::builder()
+            .metadata(table_metadata)
+            .identifier(TableIdent::from_strs(["db", "incremental_test"]).unwrap())
+            .file_io(file_io.clone())
+            .metadata_location(table_metadata_location.as_os_str().to_str().unwrap())
+            .build()
+            .unwrap();
+
+        let mut fixture = Self {
+            table_location: table_location.to_str().unwrap().to_string(),
+            table,
+            _tmp_dir: tmp_dir,
+        };
+
+        // Setup all snapshots based on operations
+        fixture.setup_snapshots(operations).await;
+
+        fixture
+    }
+
+    /// Returns an output file for a new manifest under `<table_location>/metadata/`,
+    /// named with a fresh random UUID.
+    fn next_manifest_file(&self) -> OutputFile {
+        self.table
+            .file_io()
+            .new_output(format!(
+                "{}/metadata/manifest_{}.avro",
+                self.table_location,
+                Uuid::new_v4()
+            ))
+            .unwrap()
+    }
+
+    async fn setup_snapshots(&mut self, operations: Vec<Operation>) {
+        let current_schema = self
+            .table
+            .metadata()
+            .current_snapshot()
+            .unwrap()
+            .schema(self.table.metadata())
+            .unwrap();
+        let partition_spec = Arc::new(PartitionSpec::unpartition_spec());
+        let empty_partition = Struct::empty();
+
+        // Track all data files and their contents across snapshots
+        let mut data_files: Vec<DataFileInfo> = Vec::new();
+        #[allow(clippy::type_complexity)]
+        let mut delete_files: Vec<(String, i64, i64, Vec<(String, i64)>, u64)> = Vec::new(); // (path, snapshot_id, sequence_number, [(data_file_path, position)], file_size)
+
+        for (snapshot_idx, operation) in operations.iter().enumerate() {
+            let snapshot_id = (snapshot_idx + 1) as i64;
+            let sequence_number = snapshot_idx as i64;
+            let parent_snapshot_id = if snapshot_idx == 0 {
+                None
+            } else {
+                Some(snapshot_idx as i64)
+            };
+
+            match operation {
+                Operation::Add(rows, file_name) => {
+                    // Extract n_values and data_values from tuples
+                    let n_values: Vec<i32> = rows.iter().map(|(n, _)| *n).collect();
+                    let data_values: Vec<String> = rows.iter().map(|(_, d)| d.clone()).collect();
+
+                    // Create data manifest
+                    let mut data_writer = ManifestWriterBuilder::new(
+                        self.next_manifest_file(),
+                        Some(snapshot_id),
+                        None,
+                        current_schema.clone(),
+                        partition_spec.as_ref().clone(),
+                    )
+
.build_v2_data(); + + // Add existing data files from previous snapshots + for data_file in &data_files { + data_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(data_file.snapshot_id) + .sequence_number(data_file.sequence_number) + .file_sequence_number(data_file.sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file.path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(data_file.file_size) + .record_count(data_file.n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + + // Add new data if not empty + if !n_values.is_empty() { + let data_file_path = format!("{}/data/{}", &self.table_location, file_name); + let file_size = self + .write_parquet_file(&data_file_path, &n_values, &data_values) + .await; + + data_writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(file_size) + .record_count(n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + // Track this data file + data_files.push(DataFileInfo { + path: data_file_path, + snapshot_id, + sequence_number, + n_values, + data_values, + file_size, + }); + } + + let data_manifest = data_writer.write_manifest_file().await.unwrap(); + + // Create delete manifest if there are any delete files + let mut manifests = vec![data_manifest]; + if !delete_files.is_empty() { + let mut delete_writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snapshot_id), + None, + current_schema.clone(), + partition_spec.as_ref().clone(), + ) + 
.build_v2_deletes(); + + for (delete_path, del_snapshot_id, del_sequence_number, _, del_file_size) in + &delete_files + { + let delete_count = delete_files + .iter() + .filter(|(p, _, _, _, _)| p == delete_path) + .map(|(_, _, _, deletes, _)| deletes.len()) + .sum::(); + + delete_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(*del_snapshot_id) + .sequence_number(*del_sequence_number) + .file_sequence_number(*del_sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::PositionDeletes) + .file_path(delete_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(*del_file_size) + .record_count(delete_count as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + + manifests.push(delete_writer.write_manifest_file().await.unwrap()); + } + + // Write manifest list + let mut manifest_list_write = ManifestListWriter::v2( + self.table + .file_io() + .new_output(format!( + "{}/metadata/snap-{}-manifest-list.avro", + self.table_location, snapshot_id + )) + .unwrap(), + snapshot_id, + parent_snapshot_id, + sequence_number, + ); + manifest_list_write + .add_manifests(manifests.into_iter()) + .unwrap(); + manifest_list_write.close().await.unwrap(); + } + + Operation::Delete(positions_to_delete) => { + // Group deletes by file + let mut deletes_by_file: HashMap> = HashMap::new(); + + for (position, file_name) in positions_to_delete { + let data_file_path = format!("{}/data/{}", &self.table_location, file_name); + deletes_by_file + .entry(data_file_path) + .or_default() + .push(*position); + } + + // Create data manifest with existing data files + let mut data_writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snapshot_id), + None, + current_schema.clone(), + partition_spec.as_ref().clone(), + ) + .build_v2_data(); + + for data_file in &data_files { + 
data_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(data_file.snapshot_id) + .sequence_number(data_file.sequence_number) + .file_sequence_number(data_file.sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file.path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(data_file.file_size) + .record_count(data_file.n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + + let data_manifest = data_writer.write_manifest_file().await.unwrap(); + + // Create delete manifest + let mut delete_writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snapshot_id), + None, + current_schema.clone(), + partition_spec.as_ref().clone(), + ) + .build_v2_deletes(); + + // Add existing delete files + for (delete_path, del_snapshot_id, del_sequence_number, _, del_file_size) in + &delete_files + { + let delete_count = delete_files + .iter() + .filter(|(p, _, _, _, _)| p == delete_path) + .map(|(_, _, _, deletes, _)| deletes.len()) + .sum::(); + + delete_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(*del_snapshot_id) + .sequence_number(*del_sequence_number) + .file_sequence_number(*del_sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::PositionDeletes) + .file_path(delete_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(*del_file_size) + .record_count(delete_count as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + + // Add new delete files + for (data_file_path, positions) in deletes_by_file { + let delete_file_path = format!( + "{}/data/delete-{}-{}.parquet", + &self.table_location, + 
snapshot_id, + Uuid::new_v4() + ); + let delete_file_size = self + .write_positional_delete_file( + &delete_file_path, + &data_file_path, + &positions, + ) + .await; + + delete_writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::PositionDeletes) + .file_path(delete_file_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(delete_file_size) + .record_count(positions.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + // Track this delete file + delete_files.push(( + delete_file_path, + snapshot_id, + sequence_number, + positions + .into_iter() + .map(|pos| (data_file_path.clone(), pos)) + .collect(), + delete_file_size, + )); + } + + let delete_manifest = delete_writer.write_manifest_file().await.unwrap(); + + // Write manifest list + let mut manifest_list_write = ManifestListWriter::v2( + self.table + .file_io() + .new_output(format!( + "{}/metadata/snap-{}-manifest-list.avro", + self.table_location, snapshot_id + )) + .unwrap(), + snapshot_id, + parent_snapshot_id, + sequence_number, + ); + manifest_list_write + .add_manifests(vec![data_manifest, delete_manifest].into_iter()) + .unwrap(); + manifest_list_write.close().await.unwrap(); + } + + Operation::Overwrite((rows, file_name), positions_to_delete, files_to_delete) => { + // Overwrite creates a single snapshot that can: + // 1. Add new data files + // 2. Delete positions from existing files + // 3. 
Remove entire data files + + // Create data manifest + let mut data_writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snapshot_id), + None, + current_schema.clone(), + partition_spec.as_ref().clone(), + ) + .build_v2_data(); + + // Determine which files to delete + let files_to_delete_set: std::collections::HashSet = files_to_delete + .iter() + .map(|f| format!("{}/data/{}", &self.table_location, f)) + .collect(); + + // Add existing data files (mark deleted ones as DELETED, others as EXISTING) + for data_file in &data_files { + if files_to_delete_set.contains(&data_file.path) { + // Mark file for deletion + data_writer + .add_delete_entry( + ManifestEntry::builder() + .status(ManifestStatus::Deleted) + .snapshot_id(data_file.snapshot_id) + .sequence_number(data_file.sequence_number) + .file_sequence_number(data_file.sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file.path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(data_file.file_size) + .record_count(data_file.n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } else { + // Keep existing file + data_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(data_file.snapshot_id) + .sequence_number(data_file.sequence_number) + .file_sequence_number(data_file.sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file.path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(data_file.file_size) + .record_count(data_file.n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + } + + // Add new data file if rows provided + if !rows.is_empty() { + let 
n_values: Vec = rows.iter().map(|(n, _)| *n).collect(); + let data_values: Vec = + rows.iter().map(|(_, d)| d.clone()).collect(); + let data_file_path = format!("{}/data/{}", &self.table_location, file_name); + + let file_size = self + .write_parquet_file(&data_file_path, &n_values, &data_values) + .await; + + data_writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(file_size) + .record_count(n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + // Track this new data file + data_files.push(DataFileInfo { + path: data_file_path, + snapshot_id, + sequence_number, + n_values, + data_values, + file_size, + }); + } + + // Remove deleted files from tracking + data_files.retain(|df| !files_to_delete_set.contains(&df.path)); + + let data_manifest = data_writer.write_manifest_file().await.unwrap(); + + // Handle positional deletes if any + let mut manifests = vec![data_manifest]; + + if !positions_to_delete.is_empty() || !delete_files.is_empty() { + let mut delete_writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snapshot_id), + None, + current_schema.clone(), + partition_spec.as_ref().clone(), + ) + .build_v2_deletes(); + + // Add existing delete files + for (delete_path, del_snapshot_id, del_sequence_number, _, del_file_size) in + &delete_files + { + let delete_count = delete_files + .iter() + .filter(|(p, _, _, _, _)| p == delete_path) + .map(|(_, _, _, deletes, _)| deletes.len()) + .sum::(); + + delete_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(*del_snapshot_id) + .sequence_number(*del_sequence_number) + .file_sequence_number(*del_sequence_number) + .data_file( + 
DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::PositionDeletes) + .file_path(delete_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(*del_file_size) + .record_count(delete_count as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + + // Add new positional delete files + if !positions_to_delete.is_empty() { + // Group deletes by file + let mut deletes_by_file: HashMap> = HashMap::new(); + for (position, file_name) in positions_to_delete { + let data_file_path = + format!("{}/data/{}", &self.table_location, file_name); + deletes_by_file + .entry(data_file_path) + .or_default() + .push(*position); + } + + for (data_file_path, positions) in deletes_by_file { + let delete_file_path = format!( + "{}/data/delete-{}-{}.parquet", + &self.table_location, + snapshot_id, + Uuid::new_v4() + ); + let delete_file_size = self + .write_positional_delete_file( + &delete_file_path, + &data_file_path, + &positions, + ) + .await; + + delete_writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::PositionDeletes) + .file_path(delete_file_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(delete_file_size) + .record_count(positions.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + // Track this delete file + delete_files.push(( + delete_file_path, + snapshot_id, + sequence_number, + positions + .into_iter() + .map(|pos| (data_file_path.clone(), pos)) + .collect(), + delete_file_size, + )); + } + } + + manifests.push(delete_writer.write_manifest_file().await.unwrap()); + } + + // Write manifest list + let mut manifest_list_write = ManifestListWriter::v2( + self.table + .file_io() + .new_output(format!( + 
"{}/metadata/snap-{}-manifest-list.avro", + self.table_location, snapshot_id + )) + .unwrap(), + snapshot_id, + parent_snapshot_id, + sequence_number, + ); + manifest_list_write + .add_manifests(manifests.into_iter()) + .unwrap(); + + manifest_list_write.close().await.unwrap(); + } + + Operation::Replace(files_to_compact, target_file) => { + // Replace operation: compact existing files into a new file + // The logical content doesn't change, only the physical representation + self.handle_replace_operation( + files_to_compact, + target_file, + &mut data_files, + &delete_files, + current_schema.clone(), + &partition_spec, + snapshot_id, + sequence_number, + parent_snapshot_id, + ) + .await + .unwrap(); + } + } + } + } + + #[allow(clippy::too_many_arguments, clippy::type_complexity)] + async fn handle_replace_operation( + &mut self, + files_to_compact: &[String], + target_file: &str, + data_files: &mut Vec, + delete_files: &[(String, i64, i64, Vec<(String, i64)>, u64)], + current_schema: SchemaRef, + partition_spec: &PartitionSpec, + snapshot_id: i64, + sequence_number: i64, + parent_snapshot_id: Option, + ) -> Result<(), Box> { + let empty_partition = Struct::empty(); + + // Create data manifest + let mut data_writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snapshot_id), + None, + current_schema, + partition_spec.clone(), + ) + .build_v2_data(); + + // Determine which files are being compacted + let files_to_compact_set: std::collections::HashSet = files_to_compact + .iter() + .map(|f| format!("{}/data/{}", &self.table_location, f)) + .collect(); + + // Build a set of deleted positions for each file being compacted + let mut deleted_positions: std::collections::HashMap< + String, + std::collections::HashSet, + > = std::collections::HashMap::new(); + for (_, _, _, delete_records, _) in delete_files { + for (file_path, position) in delete_records { + deleted_positions + .entry(file_path.clone()) + .or_default() + .insert(*position); + } + } + + 
// Track the data being compacted + let mut compacted_data: Vec<(i32, String)> = Vec::new(); + let mut compacted_record_count: u64 = 0; + + // Add existing data files (mark compacted ones as DELETED, others as EXISTING) + for data_file in data_files.iter() { + if files_to_compact_set.contains(&data_file.path) { + // Mark file as deleted (being compacted away) + data_writer + .add_delete_entry( + ManifestEntry::builder() + .status(ManifestStatus::Deleted) + .snapshot_id(data_file.snapshot_id) + .sequence_number(data_file.sequence_number) + .file_sequence_number(data_file.sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file.path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(data_file.file_size) + .record_count(data_file.n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + // Collect data from compacted files, filtering out deleted records + let file_deleted_positions = deleted_positions.get(&data_file.path); + for (position, (n, d)) in data_file + .n_values + .iter() + .zip(data_file.data_values.iter()) + .enumerate() + { + // Skip this record if it was deleted via positional delete + if let Some(deleted) = file_deleted_positions { + if deleted.contains(&(position as i64)) { + continue; + } + } + compacted_data.push((*n, d.clone())); + } + compacted_record_count += data_file.n_values.len() as u64; + } else { + // Keep existing file + data_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(data_file.snapshot_id) + .sequence_number(data_file.sequence_number) + .file_sequence_number(data_file.sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file.path.clone()) + .file_format(DataFileFormat::Parquet) + 
.file_size_in_bytes(data_file.file_size) + .record_count(data_file.n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + } + + // Create the compacted file with the collected data + if !compacted_data.is_empty() { + let compacted_n_values: Vec = compacted_data.iter().map(|(n, _)| *n).collect(); + let compacted_data_values: Vec = + compacted_data.iter().map(|(_, d)| d.clone()).collect(); + let compacted_file_path = format!("{}/data/{}", &self.table_location, target_file); + + let file_size = self + .write_parquet_file( + &compacted_file_path, + &compacted_n_values, + &compacted_data_values, + ) + .await; + + data_writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(compacted_file_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(file_size) + .record_count(compacted_record_count) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + // Update data_files tracking: remove compacted, add new + data_files.retain(|df| !files_to_compact_set.contains(&df.path)); + data_files.push(DataFileInfo { + path: compacted_file_path, + snapshot_id, + sequence_number, + n_values: compacted_n_values, + data_values: compacted_data_values, + file_size, + }); + } + + let data_manifest = data_writer.write_manifest_file().await.unwrap(); + + // Write manifest list (no delete manifests for Replace) + let mut manifest_list_write = ManifestListWriter::v2( + self.table + .file_io() + .new_output(format!( + "{}/metadata/snap-{}-manifest-list.avro", + self.table_location, snapshot_id + )) + .unwrap(), + snapshot_id, + parent_snapshot_id, + sequence_number, + ); + manifest_list_write + .add_manifests(vec![data_manifest].into_iter()) + .unwrap(); + 
manifest_list_write.close().await.unwrap(); + + Ok(()) + } + + async fn write_parquet_file( + &self, + path: &str, + n_values: &[i32], + data_values: &[String], + ) -> u64 { + let schema = { + let fields = vec![ + arrow_schema::Field::new("n", arrow_schema::DataType::Int32, false).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]), + ), + arrow_schema::Field::new("data", arrow_schema::DataType::Utf8, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + }; + + let col_n = Arc::new(Int32Array::from(n_values.to_vec())) as ArrayRef; + let col_data = Arc::new(StringArray::from(data_values.to_vec())) as ArrayRef; + + let batch = RecordBatch::try_new(schema.clone(), vec![col_n, col_data]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(path).unwrap(); + let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + // Return the actual file size + fs::metadata(path).unwrap().len() + } + + async fn write_positional_delete_file( + &self, + path: &str, + data_file_path: &str, + positions: &[i64], + ) -> u64 { + let schema = { + let fields = vec![ + arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2147483546".to_string(), + )])), + arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2147483545".to_string(), + )])), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + }; + + let file_paths: Vec<&str> = vec![data_file_path; positions.len()]; + let col_file_path = Arc::new(StringArray::from(file_paths)) as ArrayRef; + let col_pos = 
Arc::new(arrow_array::Int64Array::from(positions.to_vec())) as ArrayRef; + + let batch = RecordBatch::try_new(schema.clone(), vec![col_file_path, col_pos]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(path).unwrap(); + let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + // Return the actual file size + fs::metadata(path).unwrap().len() + } + + /// Verify incremental scan results. + /// + /// Verifies that the incremental scan contains the expected appended and deleted records. + pub async fn verify_incremental_scan( + &self, + from_snapshot_id: i64, + to_snapshot_id: i64, + expected_appends: Vec<(i32, &str)>, + expected_deletes: Vec<(u64, &str)>, + ) { + use arrow_array::cast::AsArray; + use arrow_select::concat::concat_batches; + use futures::TryStreamExt; + + let incremental_scan = self + .table + .incremental_scan(Some(from_snapshot_id), Some(to_snapshot_id)) + .build() + .unwrap(); + + let stream = incremental_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + // Separate appends and deletes + let append_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()) + .collect(); + + let delete_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Delete) + .map(|(_, b)| b.clone()) + .collect(); + + // Verify appended records + if !append_batches.is_empty() { + let append_batch = + concat_batches(&append_batches[0].schema(), append_batches.iter()).unwrap(); + + let n_array = append_batch + .column(0) + .as_primitive::(); + let data_array = append_batch.column(1).as_string::(); + + let mut appended_pairs: Vec<(i32, String)> = (0..append_batch.num_rows()) + .map(|i| (n_array.value(i), data_array.value(i).to_string())) + .collect(); + 
appended_pairs.sort(); + + let expected_appends: Vec<(i32, String)> = expected_appends + .into_iter() + .map(|(n, s)| (n, s.to_string())) + .collect(); + + assert_eq!(appended_pairs, expected_appends); + } else { + assert!(expected_appends.is_empty(), "Expected appends but got none"); + } + + // Verify deleted records + if !delete_batches.is_empty() { + let delete_batch = + concat_batches(&delete_batches[0].schema(), delete_batches.iter()).unwrap(); + + let pos_array = delete_batch + .column(0) + .as_primitive::(); + + // The file path column is a StringArray with materialized values + let file_path_column = delete_batch.column(1); + let file_path_array = file_path_column.as_string::(); + + let mut deleted_pairs: Vec<(u64, String)> = (0..delete_batch.num_rows()) + .map(|i| { + let pos = pos_array.value(i); + let file_path = file_path_array.value(i).to_string(); + (pos, file_path) + }) + .collect(); + deleted_pairs.sort(); + + let expected_deletes: Vec<(u64, String)> = expected_deletes + .into_iter() + .map(|(pos, file)| (pos, file.to_string())) + .collect(); + + assert_eq!(deleted_pairs, expected_deletes); + } else { + assert!(expected_deletes.is_empty(), "Expected deletes but got none"); + } + } +} + +#[tokio::test] +async fn test_incremental_fixture_simple() { + let fixture = IncrementalTestFixture::new(vec![ + Operation::Add(vec![], "empty.parquet".to_string()), + Operation::Add( + vec![ + (1, "1".to_string()), + (2, "2".to_string()), + (3, "3".to_string()), + ], + "data-2.parquet".to_string(), + ), + Operation::Delete(vec![(1, "data-2.parquet".to_string())]), // Delete position 1 (n=2, data="2") + ]) + .await; + + // Verify we have 3 snapshots + let mut snapshots = fixture.table.metadata().snapshots().collect::>(); + snapshots.sort_by_key(|s| s.snapshot_id()); + assert_eq!(snapshots.len(), 3); + + // Verify snapshot IDs + assert_eq!(snapshots[0].snapshot_id(), 1); + assert_eq!(snapshots[1].snapshot_id(), 2); + assert_eq!(snapshots[2].snapshot_id(), 3); + + // 
Verify parent relationships + assert_eq!(snapshots[0].parent_snapshot_id(), None); + assert_eq!(snapshots[1].parent_snapshot_id(), Some(1)); + assert_eq!(snapshots[2].parent_snapshot_id(), Some(2)); + + // Verify incremental scan from snapshot 1 to snapshot 3. + // Expected appends: snapshot 2 adds [1, 2, 3] + // Expected deletes: snapshot 3 deletes [2] + // In total we expect appends [1, 3] and deletes [] + fixture + .verify_incremental_scan(1, 3, vec![(1, "1"), (3, "3")], vec![]) + .await; + + // Verify incremental scan from snapshot 2 to snapshot 3. + let data_file_path = format!("{}/data/data-2.parquet", fixture.table_location); + fixture + .verify_incremental_scan(2, 3, vec![], vec![(1, &data_file_path)]) + .await; + + // Verify incremental scan from snapshot 1 to snapshot 1. + fixture.verify_incremental_scan(1, 1, vec![], vec![]).await; +} + +#[tokio::test] +async fn test_incremental_fixture_complex() { + let fixture = IncrementalTestFixture::new(vec![ + Operation::Add(vec![], "empty.parquet".to_string()), // Snapshot 1: Empty + Operation::Add( + vec![ + (1, "a".to_string()), + (2, "b".to_string()), + (3, "c".to_string()), + (4, "d".to_string()), + (5, "e".to_string()), + ], + "data-2.parquet".to_string(), + ), // Snapshot 2: Add 5 rows (positions 0-4) + Operation::Delete(vec![ + (1, "data-2.parquet".to_string()), + (3, "data-2.parquet".to_string()), + ]), // Snapshot 3: Delete positions 1,3 (n=2,4; data=b,d) + Operation::Add( + vec![(6, "f".to_string()), (7, "g".to_string())], + "data-4.parquet".to_string(), + ), // Snapshot 4: Add 2 more rows (positions 5-6) + Operation::Delete(vec![ + (0, "data-2.parquet".to_string()), + (2, "data-2.parquet".to_string()), + (4, "data-2.parquet".to_string()), + (0, "data-4.parquet".to_string()), + (1, "data-4.parquet".to_string()), + ]), // Snapshot 5: Delete positions 0,2,4,5,6 (all remaining rows: n=1,3,5,6,7) + ]) + .await; + + // Verify we have 5 snapshots + let mut snapshots = 
fixture.table.metadata().snapshots().collect::>(); + snapshots.sort_by_key(|s| s.snapshot_id()); + assert_eq!(snapshots.len(), 5); + + // Verify parent chain + assert_eq!(snapshots[0].parent_snapshot_id(), None); + for (i, snapshot) in snapshots.iter().enumerate().take(5).skip(1) { + assert_eq!(snapshot.parent_snapshot_id(), Some(i as i64)); + } + + // Verify current snapshot + assert_eq!( + fixture + .table + .metadata() + .current_snapshot() + .unwrap() + .snapshot_id(), + 5 + ); + + // Verify incremental scan from snapshot 1 to snapshot 5. + // All data has been deleted, so we expect the empty result. + fixture.verify_incremental_scan(1, 5, vec![], vec![]).await; + + // Verify incremental scan from snapshot 2 to snapshot 5. + // Snapshot 2 starts with: (1,a), (2,b), (3,c), (4,d), (5,e) in data-2.parquet + // Snapshot 3: Deletes positions 1,3 from data-2.parquet (n=2,4; data=b,d) + // Snapshot 4: Adds (6,f), (7,g) in data-4.parquet + // Snapshot 5: Deletes positions 0,2,4 from data-2.parquet and 0,1 from data-4.parquet (n=1,3,5,6,7; data=a,c,e,f,g) + // + // The incremental scan computes the NET EFFECT between snapshot 2 and 5: + // - Files added in snapshot 4 were completely deleted in snapshot 5, so NO net appends + // - Net deletes from data-2.parquet: positions 0,1,2,3,4 (all 5 rows deleted across snapshots 3 and 5) + // - Since data-4 was added and deleted between 2 and 5, it doesn't appear in the scan + let data_2_path = format!("{}/data/data-2.parquet", fixture.table_location); + fixture + .verify_incremental_scan( + 2, + 5, + vec![], // No net appends (data-4 was added and fully deleted) + vec![ + (0, data_2_path.as_str()), // All 5 positions from data-2.parquet + (1, data_2_path.as_str()), + (2, data_2_path.as_str()), + (3, data_2_path.as_str()), + (4, data_2_path.as_str()), + ], + ) + .await; + + // Verify incremental scan from snapshot 3 to snapshot 5. 
+ // Snapshot 3 state: (1,a), (3,c), (5,e) remain in data-2.parquet at positions 0,2,4 + // Snapshot 4: Adds (6,f), (7,g) in data-4.parquet + // Snapshot 5: Deletes positions 0,2,4 from data-2.parquet (n=1,3,5) and 0,1 from data-4.parquet (n=6,7) + // + // Net effect from snapshot 3 to 5: + // - No net appends (data-4 was added and fully deleted between 3 and 5) + // - Net deletes from data-2.parquet: positions 0,2,4 (the three remaining rows deleted in snapshot 5) + fixture + .verify_incremental_scan( + 3, + 5, + vec![], // No net appends (data-4 was added and fully deleted) + vec![ + (0, data_2_path.as_str()), // Positions 0,2,4 from data-2.parquet + (2, data_2_path.as_str()), // (n=1,3,5; data=a,c,e) + (4, data_2_path.as_str()), + ], + ) + .await; + + // Verify incremental scan from snapshot 1 to snapshot 4. + // Snapshot 1: Empty + // Snapshot 2: Adds (1,a), (2,b), (3,c), (4,d), (5,e) in data-2.parquet + // Snapshot 3: Deletes positions 1,3 from data-2.parquet (n=2,4; data=b,d) + // Snapshot 4: Adds (6,f), (7,g) in data-4.parquet + // + // Net effect from snapshot 1 to 4: + // - Net appends: (1,a), (3,c), (5,e), (6,f), (7,g) - all rows that exist at snapshot 4 + // - No deletes: rows deleted in snapshot 3 were added after snapshot 1, so they don't count as deletes + fixture + .verify_incremental_scan( + 1, + 4, + vec![(1, "a"), (3, "c"), (5, "e"), (6, "f"), (7, "g")], + vec![], // No deletes (deleted rows were added after snapshot 1) + ) + .await; + + // Verify incremental scan from snapshot 2 to snapshot 4. 
+ // Snapshot 2: Has (1,a), (2,b), (3,c), (4,d), (5,e) in data-2.parquet + // Snapshot 3: Deletes positions 1,3 from data-2.parquet (n=2,4; data=b,d) + // Snapshot 4: Adds (6,f), (7,g) in data-4.parquet + // + // Net effect from snapshot 2 to 4: + // - Net appends: (6,f), (7,g) from data-4.parquet + // - Net deletes: positions 1,3 from data-2.parquet (n=2,4; data=b,d) - existed at snapshot 2 and deleted in 3 + fixture + .verify_incremental_scan(2, 4, vec![(6, "f"), (7, "g")], vec![ + (1, data_2_path.as_str()), // Positions 1,3 from data-2.parquet + (3, data_2_path.as_str()), // (n=2,4; data=b,d) + ]) + .await; +} + +#[tokio::test] +async fn test_incremental_scan_edge_cases() { + // This test covers several edge cases: + // 1. Multiple data files added in separate snapshots + // 2. Deletes spread across multiple data files + // 3. Partial deletes from multiple files + // 4. Cross-file delete operations in a single snapshot + let fixture = IncrementalTestFixture::new(vec![ + // Snapshot 1: Empty starting point + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Add file A with 3 rows + Operation::Add( + vec![ + (1, "a1".to_string()), + (2, "a2".to_string()), + (3, "a3".to_string()), + ], + "file-a.parquet".to_string(), + ), + // Snapshot 3: Add file B with 4 rows + Operation::Add( + vec![ + (10, "b1".to_string()), + (20, "b2".to_string()), + (30, "b3".to_string()), + (40, "b4".to_string()), + ], + "file-b.parquet".to_string(), + ), + // Snapshot 4: Partial delete from file A (delete middle row n=2) + Operation::Delete(vec![(1, "file-a.parquet".to_string())]), + // Snapshot 5: Partial delete from file B (delete first and last rows n=10,40) + Operation::Delete(vec![ + (0, "file-b.parquet".to_string()), + (3, "file-b.parquet".to_string()), + ]), + // Snapshot 6: Add file C with 2 rows + Operation::Add( + vec![(100, "c1".to_string()), (200, "c2".to_string())], + "file-c.parquet".to_string(), + ), + // Snapshot 7: Delete from multiple files in one 
snapshot (cross-file deletes) + Operation::Delete(vec![ + (0, "file-a.parquet".to_string()), // n=1 + (1, "file-b.parquet".to_string()), // n=20 + (0, "file-c.parquet".to_string()), // n=100 + ]), + ]) + .await; + + // Verify we have 7 snapshots + let n_snapshots = fixture.table.metadata().snapshots().count(); + assert_eq!(n_snapshots, 7); + + let file_a_path = format!("{}/data/file-a.parquet", fixture.table_location); + let file_b_path = format!("{}/data/file-b.parquet", fixture.table_location); + + // Test 1: Scan from snapshot 1 to 4 + // Should see: file-a (1,2,3), file-b (10,20,30,40) added, then (2) deleted from file-a + // BUT: The row n=2 was added AFTER snapshot 1, so it won't show as a delete! + // Net: appends (1,3) from file-a (n=2 added then deleted = net zero), (10,20,30,40) from file-b + // No deletes (n=2 was added and deleted between snapshots 1 and 4) + fixture + .verify_incremental_scan( + 1, + 4, + vec![ + (1, "a1"), + (3, "a3"), + (10, "b1"), + (20, "b2"), + (30, "b3"), + (40, "b4"), + ], + vec![], // No deletes - n=2 was added and deleted between snapshots + ) + .await; + + // Test 2: Scan from snapshot 4 to 6 + // Snapshot 4: has file-a (1,3) and file-b (10,20,30,40) + // Snapshot 5: deletes positions 0,3 from file-b (n=10,40) + // Snapshot 6: adds file-c (100,200) + // Net: appends (100,200) from file-c; deletes pos 0,3 from file-b + fixture + .verify_incremental_scan(4, 6, vec![(100, "c1"), (200, "c2")], vec![ + (0, file_b_path.as_str()), // n=10 + (3, file_b_path.as_str()), // n=40 + ]) + .await; + + // Test 3: Scan from snapshot 2 to 7 + // This tests the full lifecycle: multiple adds, partial deletes, more adds, cross-file deletes + // Starting at snapshot 2: file-a (1,2,3) exists + // File-b is added in snapshot 3 (after snapshot 2) + // File-c is added in snapshot 6 (after snapshot 2) + // By snapshot 7: file-a has (3) at position 2, file-b has (30), file-c has (200) + // + // Net appends: file-b (30) and file-c (200) were added after 
snapshot 2 + // Net deletes: positions 0,1 from file-a (n=1,2) existed at snapshot 2 and were deleted + // Note: (3) from file-a already existed at snapshot 2, so it's not a net append! + fixture + .verify_incremental_scan(2, 7, vec![(30, "b3"), (200, "c2")], vec![ + (0, file_a_path.as_str()), // n=1 + (1, file_a_path.as_str()), // n=2 + ]) + .await; + + // Test 4: Scan from snapshot 5 to 6 + // Simple test: just adding a new file + // Snapshot 5 state: file-a (1,3), file-b (20,30) + // Snapshot 6: adds file-c (100,200) + // Net: appends (100,200) from file-c, no deletes + fixture + .verify_incremental_scan(5, 6, vec![(100, "c1"), (200, "c2")], vec![]) + .await; + + // Test 5: Scan from snapshot 3 to 4 + // Tests a single delete operation + // State at 3: file-a (1,2,3), file-b (10,20,30,40) + // State at 4: file-a (1,3), file-b (10,20,30,40) + // Net: no appends, 1 delete (position 1, n=2) from file-a + fixture + .verify_incremental_scan( + 3, + 4, + vec![], + vec![(1, file_a_path.as_str())], // n=2 + ) + .await; +} + +#[tokio::test] +async fn test_incremental_scan_builder_options() { + // This test demonstrates using the incremental scan builder API with various options: + // - Column projection (selecting specific columns) + // - Batch size configuration + // - Verifying the schema and batch structure + let fixture = IncrementalTestFixture::new(vec![ + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Add 10 rows to test batch size behavior + Operation::Add( + vec![ + (1, "data-1".to_string()), + (2, "data-2".to_string()), + (3, "data-3".to_string()), + (4, "data-4".to_string()), + (5, "data-5".to_string()), + (6, "data-6".to_string()), + (7, "data-7".to_string()), + (8, "data-8".to_string()), + (9, "data-9".to_string()), + (10, "data-10".to_string()), + ], + "data-2.parquet".to_string(), + ), + // Snapshot 3: Delete some rows + Operation::Delete(vec![ + (2, "data-2.parquet".to_string()), // n=3 + (5, "data-2.parquet".to_string()), // n=6 + 
(8, "data-2.parquet".to_string()), // n=9 + ]), + // Snapshot 4: Add more rows + Operation::Add( + vec![ + (20, "data-20".to_string()), + (21, "data-21".to_string()), + (22, "data-22".to_string()), + (23, "data-23".to_string()), + (24, "data-24".to_string()), + ], + "data-4.parquet".to_string(), + ), + ]) + .await; + + // Test 1: Column projection - select only the "n" column + // Scan from root (None) to last (None) + let scan = fixture + .table + .incremental_scan(None, None) + .select(vec!["n"]) + .build() + .unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + // Verify we have both append and delete batches + let append_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()) + .collect(); + + assert!(!append_batches.is_empty(), "Should have append batches"); + + // Check schema - should only have "n" column + for batch in &append_batches { + assert_eq!( + batch.schema().fields().len(), + 1, + "Should have only 1 column when projecting 'n'" + ); + assert_eq!( + batch.schema().field(0).name(), + "n", + "Projected column should be 'n'" + ); + } + + // Test 2: Column projection - select only the "data" column + // Scan from root (None) to last (None) + let scan = fixture + .table + .incremental_scan(None, None) + .select(vec!["data"]) + .build() + .unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + let append_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()) + .collect(); + + for batch in &append_batches { + assert_eq!( + batch.schema().fields().len(), + 1, + "Should have only 1 column when projecting 'data'" + ); + assert_eq!( + batch.schema().field(0).name(), + "data", + "Projected column should be 'data'" + ); + } + + // Test 3: Select both columns explicitly + // 
Scan from root (None) to last (None) + let scan = fixture + .table + .incremental_scan(None, None) + .select(vec!["n", "data"]) + .build() + .unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + let append_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()) + .collect(); + + for batch in &append_batches { + assert_eq!( + batch.schema().fields().len(), + 2, + "Should have 2 columns when projecting both" + ); + assert_eq!(batch.schema().field(0).name(), "n"); + assert_eq!(batch.schema().field(1).name(), "data"); + } + + // Test 4: Batch size configuration + // Scan from root (None) to snapshot 2 + let scan = fixture + .table + .incremental_scan(None, Some(2)) + .with_batch_size(Some(3)) // Small batch size to test batching + .build() + .unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + let append_batches = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()); + + for batch in append_batches { + // Each batch should have at most 3 rows (except possibly the last) + assert!( + batch.num_rows() <= 3, + "Batch size should be <= 3 as configured" + ); + } + + // Test 5: Combining column projection and batch size + // Scan from root (None) to last (None) + let scan = fixture + .table + .incremental_scan(None, None) + .select(vec!["n"]) + .with_batch_size(Some(4)) + .build() + .unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + let append_batches = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()); + + for batch in append_batches { + assert_eq!(batch.schema().fields().len(), 1, "Should project only 'n'"); + assert!(batch.num_rows() <= 4, "Batch size should be 
<= 4"); + } + + // Test 6: Verify actual data with column projection + // Scan from root (None) to snapshot 2 + let scan = fixture + .table + .incremental_scan(None, Some(2)) + .select(vec!["n"]) + .build() + .unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + let append_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()) + .collect(); + + if !append_batches.is_empty() { + use arrow_select::concat::concat_batches; + let combined_batch = + concat_batches(&append_batches[0].schema(), append_batches.iter()).unwrap(); + + let n_array = combined_batch + .column(0) + .as_primitive::(); + + let mut n_values: Vec = (0..combined_batch.num_rows()) + .map(|i| n_array.value(i)) + .collect(); + n_values.sort(); + + assert_eq!( + n_values, + vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + "Should have all 10 n values from snapshot 2" + ); + } + + // Test 7: Delete batches always have the same schema. 
+ let scan = fixture + .table + .incremental_scan(Some(2), Some(3)) + .build() + .unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + let delete_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Delete) + .map(|(_, b)| b.clone()) + .collect(); + + if !delete_batches.is_empty() { + for batch in &delete_batches { + // Delete batches should have "pos" and "_file" columns + assert!( + batch.schema().fields().len() == 2, + "Delete batch should have exactly position and file columns" + ); + assert_eq!( + batch.num_rows(), + 3, + "Should have 3 deleted positions from snapshot 3" + ); + } + } +} + +#[tokio::test] +async fn test_incremental_scan_append_then_delete_file() { + // This test verifies the basic delete file functionality: + // - Snapshot 1: Empty starting point + // - Snapshot 2: Append a file with 3 records + // - Snapshot 3: Overwrite that deletes the entire file + // + // An incremental scan from snapshot 1 to 3 should yield no tuples, as there is no net change. 
+ let fixture = IncrementalTestFixture::new(vec![ + // Snapshot 1: Empty starting point + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Append a file with 3 records + Operation::Add( + vec![ + (1, "a".to_string()), + (2, "b".to_string()), + (3, "c".to_string()), + ], + "data-1.parquet".to_string(), + ), + // Snapshot 3: Overwrite that deletes the entire file + Operation::Overwrite( + (vec![], "".to_string()), // No new data to add + vec![], // No positional deletes + vec!["data-1.parquet".to_string()], // Delete the file completely + ), + ]) + .await; + + // Verify we have 3 snapshots + let mut snapshots = fixture.table.metadata().snapshots().collect::>(); + snapshots.sort_by_key(|s| s.snapshot_id()); + assert_eq!(snapshots.len(), 3); + + // Verify snapshot IDs + assert_eq!(snapshots[0].snapshot_id(), 1); + assert_eq!(snapshots[1].snapshot_id(), 2); + assert_eq!(snapshots[2].snapshot_id(), 3); + + // Incremental scan from snapshot 1 to 3 should yield no tuples + // The file was added in snapshot 2 and deleted in snapshot 3, so they cancel out + fixture + .verify_incremental_scan( + 1, + 3, + vec![], // No net appends + vec![], // No net deletes + ) + .await; + + // Incremental scan from snapshot 1 to 2 should yield the appended records + fixture + .verify_incremental_scan( + 1, + 2, + vec![(1, "a"), (2, "b"), (3, "c")], // All 3 records appended + vec![], // No deletes + ) + .await; + + // Incremental scan from snapshot 2 to 3 should produce delete records + // The file existed at snapshot 2 and was deleted in snapshot 3 + let data_file_path = format!("{}/data/data-1.parquet", fixture.table_location); + fixture + .verify_incremental_scan( + 2, + 3, + vec![], // No appends + vec![ + // All 3 positions deleted (pos values are 0-indexed: 0, 1, 2) + (0, data_file_path.as_str()), + (1, data_file_path.as_str()), + (2, data_file_path.as_str()), + ], + ) + .await; +} + +#[tokio::test] +async fn 
test_incremental_scan_positional_deletes_then_file_delete() { + // This test verifies that the system correctly handles the case where: + // - Snapshot 1: Empty starting point + // - Snapshot 2: Append a file with 3 records + // - Snapshot 3: Delete these 3 records using positional deletes + // - Snapshot 4: Delete the file entirely (that was introduced in snapshot 2) + // + // The system should avoid double-deletes by filtering out positional deletes + // for files that are subsequently deleted entirely. + let fixture = IncrementalTestFixture::new(vec![ + // Snapshot 1: Empty starting point + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Append a file with 3 records + Operation::Add( + vec![ + (1, "a".to_string()), + (2, "b".to_string()), + (3, "c".to_string()), + ], + "data-1.parquet".to_string(), + ), + // Snapshot 3: Delete one record using a positional delete + Operation::Delete(vec![(0, "data-1.parquet".to_string())]), + // Snapshot 4: Delete the file entirely + Operation::Overwrite( + (vec![], "".to_string()), // No new data to add + vec![], // No positional deletes + vec!["data-1.parquet".to_string()], // Delete the file completely + ), + ]) + .await; + + // Verify we have 4 snapshots + let mut snapshots = fixture.table.metadata().snapshots().collect::>(); + snapshots.sort_by_key(|s| s.snapshot_id()); + assert_eq!(snapshots.len(), 4); + + // Incremental scan from snapshot 1 to 4 + // Since data-1.parquet was both appended (snapshot 2) and deleted (snapshot 4) + // in the scan range, BOTH appends and deletes are fully cancelled out. + // Even though there were positional deletes in snapshot 3, those are filtered out + // because the file qualifies for cancellation (appended && deleted). 
+ let data_file_path = format!("{}/data/data-1.parquet", fixture.table_location); + fixture + .verify_incremental_scan( + 1, + 4, + vec![], // No appends (file was added and fully deleted, cancelled out) + vec![], // No deletes (fully cancelled since file was added and deleted in range) + ) + .await; + + // Incremental scan from snapshot 3 to 4 + // This demonstrates the double-delete scenario: + // - At snapshot 3, all records have already been deleted via positional deletes + // - At snapshot 4, the entire file is deleted + // - The scan from 3 to 4 should still emit delete records for the file, + // even though the records were already deleted in snapshot 3 + // This is because the incremental scan doesn't know about the prior state + fixture + .verify_incremental_scan( + 3, + 4, + vec![], // No appends + vec![ + // File deletion emits positions 0, 1, 2 + // These are "redundant" deletes since the records were already + // deleted by positional deletes in snapshot 3 + (0, data_file_path.as_str()), + (1, data_file_path.as_str()), + (2, data_file_path.as_str()), + ], + ) + .await; +} + +#[tokio::test] +async fn test_incremental_scan_with_replace_operation() { + // This test verifies the Replace operation semantics: + // - A Replace operation compacts multiple files into one + // - The logical table content does NOT change (same data) + // - But physically, files are reorganized: old files are DELETED, new file is ADDED + // - For incremental scans, we report these physical changes (additions and deletions) + // even though the logical data is identical + let fixture = IncrementalTestFixture::new(vec![ + // Snapshot 1: Empty starting point + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Add file-a with 3 rows + Operation::Add( + vec![ + (1, "a".to_string()), + (2, "b".to_string()), + (3, "c".to_string()), + ], + "file-a.parquet".to_string(), + ), + // Snapshot 3: Add file-b with 2 rows + Operation::Add( + vec![(10, "x".to_string()), (20, 
"y".to_string())], + "file-b.parquet".to_string(), + ), + // Snapshot 4: Replace (compact file-a and file-b into file-ab-compact) + // This will delete file-a and file-b, and add file-ab-compact with the same data + Operation::Replace( + vec!["file-a.parquet".to_string(), "file-b.parquet".to_string()], + "file-ab-compact.parquet".to_string(), + ), + ]) + .await; + + // Verify we have 4 snapshots + let mut snapshots = fixture.table.metadata().snapshots().collect::>(); + snapshots.sort_by_key(|s| s.snapshot_id()); + assert_eq!(snapshots.len(), 4); + + // Verify snapshot operations + assert_eq!( + snapshots[3].summary().operation, + crate::spec::Operation::Replace, + "Snapshot 4 should be a Replace operation" + ); + + let file_a_path = format!("{}/data/file-a.parquet", fixture.table_location); + let file_b_path = format!("{}/data/file-b.parquet", fixture.table_location); + + // Test 1: Incremental scan from snapshot 3 to 4 (ONLY Replace operation) + // Physical changes: file-a and file-b are deleted, file-ab-compact is added + // But the data is the same, so: + // - Additions: all 5 rows from file-ab-compact (1,2,3,10,20) + // - Deletions: all 5 rows from file-a and file-b (positions sorted by (position, file_path)) + fixture + .verify_incremental_scan( + 3, + 4, + vec![(1, "a"), (2, "b"), (3, "c"), (10, "x"), (20, "y")], + vec![ + (0, &file_a_path), + (0, &file_b_path), + (1, &file_a_path), + (1, &file_b_path), + (2, &file_a_path), + ], + ) + .await; + + // Test 2: Incremental scan from snapshot 1 to 4 (includes Appends and Replace) + // Snapshot 2 adds file-a (1,2,3) + // Snapshot 3 adds file-b (10,20) + // Snapshot 4 replaces both files (deletes file-a, file-b; adds file-ab-compact) + // Since file-a and file-b are added in the scan range and then deleted in the Replace: + // - The additions of file-a and file-b cancel with their deletions + // - Net result: only file-ab-compact is added (1,2,3,10,20) + // - Deletions: empty (file-a and file-b deletions cancelled 
by their additions) + fixture + .verify_incremental_scan( + 1, + 4, + vec![(1, "a"), (2, "b"), (3, "c"), (10, "x"), (20, "y")], + vec![], + ) + .await; + + // Test 3: Incremental scan from snapshot 2 to 4 + // Starting from snapshot 2: file-a (1,2,3) exists (added before scan start) + // Snapshot 3: adds file-b (10,20) + // Snapshot 4: replaces both files with file-ab-compact + // Since file-a was added before the scan started, its deletion is not cancelled + // But file-b is added and deleted within the scan, so cancels out + // Net result: + // - Additions: file-ab-compact (1,2,3,10,20) + // - Deletions: file-a (0,1,2) since it existed at scan start + fixture + .verify_incremental_scan( + 2, + 4, + vec![(1, "a"), (2, "b"), (3, "c"), (10, "x"), (20, "y")], + vec![(0, &file_a_path), (1, &file_a_path), (2, &file_a_path)], + ) + .await; +} + +#[tokio::test] +async fn test_incremental_scan_with_deleted_files_cancellation() { + // This test verifies that incremental scans properly handle file deletions with cancellation logic: + // - Files added and deleted in the same scan range should cancel out + // - Files deleted that weren't added in the scan range should produce Delete tasks + // + // Test scenario: + // Snapshot 1: Empty starting point + // Snapshot 2: Add file-1.parquet with data (3 records) + // Snapshot 3: Add file-2.parquet with data (2 records) + // Snapshot 4: Overwrite - delete file-1.parquet entirely + // Snapshot 5: Add file-3.parquet with data (1 record) + // + // Incremental scan from snapshot 1 to 4: file-1 added and deleted, should cancel out (only file-2 remains). + // Incremental scan from snapshot 3 to 5: file-1 deleted but not added in range, produces Delete tasks. + // Incremental scan from snapshot 1 to 3: file-1 and file-2 added before any deletion. + // Incremental scan from snapshot 4 to 5: file-3 added after file-1 deletion occurred. 
+ + let fixture = IncrementalTestFixture::new(vec![ + // Snapshot 1: Empty starting point + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Add file-1 with rows + Operation::Add( + vec![ + (1, "a".to_string()), + (2, "b".to_string()), + (3, "c".to_string()), + ], + "file-1.parquet".to_string(), + ), + // Snapshot 3: Add file-2 with rows + Operation::Add( + vec![(10, "x".to_string()), (20, "y".to_string())], + "file-2.parquet".to_string(), + ), + // Snapshot 4: Overwrite - delete file-1 entirely + Operation::Overwrite( + (vec![], "".to_string()), // No new data to add + vec![], // No positional deletes + vec!["file-1.parquet".to_string()], // Delete file-1 completely + ), + // Snapshot 5: Add file-3 (to have more snapshots) + Operation::Add(vec![(100, "p".to_string())], "file-3.parquet".to_string()), + ]) + .await; + + let file_1_path = format!("{}/data/file-1.parquet", fixture.table_location); + + // Test 1: Incremental scan from snapshot 1 to 4 + // file-1 was added in snapshot 2 and deleted in snapshot 4 + // Both appends and deletes for file-1 are fully cancelled out: + // - Appends: only file-2 records (10, x), (20, y) + // - Deletes: none (file-1 deletions cancelled since it was added in range) + fixture + .verify_incremental_scan( + 1, + 4, + vec![(10, "x"), (20, "y")], // Only file-2 (file-1 cancelled out) + vec![], // No deletes (file-1 fully cancelled) + ) + .await; + + // Test 2: Incremental scan from snapshot 3 to 5 + // file-1 was deleted in snapshot 4 but wasn't added in the scan range (added in snapshot 2) + // This produces a net Delete task with positions 0, 1, 2 (all records in file-1) + fixture + .verify_incremental_scan( + 3, + 5, + vec![(100, "p")], // file-3 added in snapshot 5 + vec![ + // file-1 deleted in snapshot 4 (all 3 positions: 0, 1, 2) + (0, file_1_path.as_str()), + (1, file_1_path.as_str()), + (2, file_1_path.as_str()), + ], + ) + .await; + + // Test 3: Incremental scan from snapshot 1 to 3 + // Verifies basic 
append-only operations before any deletions occur. + // Both file-1 (snapshot 2) and file-2 (snapshot 3) are added, no deletions yet. + // Expected: All records from both files appear in appends, no deletes. + fixture + .verify_incremental_scan( + 1, + 3, + vec![(1, "a"), (2, "b"), (3, "c"), (10, "x"), (20, "y")], + vec![], // No deletes + ) + .await; + + // Test 4: Incremental scan from snapshot 4 to 5 + // Verifies scanning after the deletion has already occurred. + // Starting from snapshot 4 (after file-1 deletion), only file-3 is added in snapshot 5. + // Expected: Only file-3 records in appends, no deletes (deletion happened before scan range). + fixture + .verify_incremental_scan( + 4, + 5, + vec![(100, "p")], // file-3 added + vec![], // No deletes + ) + .await; +} + +#[tokio::test] +async fn test_incremental_scan_with_replace_and_positional_deletes() { + // This test verifies Replace operations with positional deletes before and after the replace. + // + // Test scenario: + // Snapshot 1: Empty starting point + // Snapshot 2: Add file-a with 5 records (1,2,3,4,5) + // Snapshot 3: Add file-b with 3 records (10,11,12) + // Snapshot 4: Delete record at position 1 in file-a (delete "2") + // Snapshot 5: Replace - compact file-a and file-b into file-ab-compact + // (containing records 1,3,4,5 from file-a and 10,11,12 from file-b after the previous delete) + // Snapshot 6: Delete record at position 2 in file-ab-compact (delete "4") + + let fixture = IncrementalTestFixture::new(vec![ + // Snapshot 1: Empty starting point + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Add file-a with 5 rows + Operation::Add( + vec![ + (1, "1".to_string()), + (2, "2".to_string()), + (3, "3".to_string()), + (4, "4".to_string()), + (5, "5".to_string()), + ], + "file-a.parquet".to_string(), + ), + // Snapshot 3: Add file-b with 3 rows + Operation::Add( + vec![ + (10, "10".to_string()), + (11, "11".to_string()), + (12, "12".to_string()), + ], + 
"file-b.parquet".to_string(), + ), + // Snapshot 4: Delete position 1 (record "2") from file-a + Operation::Delete(vec![(1, "file-a.parquet".to_string())]), + // Snapshot 5: Replace - compact file-a and file-b into file-ab-compact + Operation::Replace( + vec!["file-a.parquet".to_string(), "file-b.parquet".to_string()], + "file-ab-compact.parquet".to_string(), + ), + // Snapshot 6: Delete position 2 (record "4") from file-ab-compact + Operation::Delete(vec![(2, "file-ab-compact.parquet".to_string())]), + ]) + .await; + + // Test 1: Full scan from snapshot 1 to 6 + // Snapshot 2: Add file-a (1,2,3,4,5) + // Snapshot 3: Add file-b (10,11,12) + // Snapshot 4: Delete position 1 from file-a (record "2" deleted) + // Snapshot 5: Replace both files with file-ab-compact (compacts to 1,3,4,5,10,11,12, filtering out position 1) + // Snapshot 6: Delete position 2 from file-ab-compact (record "4" deleted) + // Net result: + // - Additions: records from compacted file with deleted positions filtered (1,3,5,10,11,12) + // Note: position 1 from file-a (record "2") is filtered during compaction + // position 2 from compacted file (record "4") is never added since it's deleted in snapshot 6 + // - Deletions: empty (deletes from Replace are absorbed into the additions of the compacted file) + fixture + .verify_incremental_scan( + 1, + 6, + vec![ + (1, "1"), + (3, "3"), + (5, "5"), + (10, "10"), + (11, "11"), + (12, "12"), + ], + vec![], + ) + .await; + + // Test 2: Scan from snapshot 3 to 6 (after file-b added, through replace and final delete) + // Snapshot 3: Starting point with file-a and file-b (both exist in starting snapshot) + // Snapshot 4: Delete position 1 from file-a (record "2" deleted) + // Snapshot 5: Replace both files with file-ab-compact (filters out position 1 from file-a) + // Snapshot 6: Delete position 2 from file-ab-compact (record "4" deleted) + // Net result: + // - Additions: compacted file records with filtered positions (1,3,5,10,11,12) + // - Deletions: 
All positions from file-a (0-4) and file-b (0-2) because these files + // existed in the starting snapshot (3) and are deleted/replaced in snapshot 5. + // Deletes are sorted by (position, file_path) tuple ordering + let file_a_path = format!("{}/data/file-a.parquet", fixture.table_location); + let file_b_path = format!("{}/data/file-b.parquet", fixture.table_location); + let test2_deletes = vec![ + (0, file_a_path.as_str()), + (0, file_b_path.as_str()), + (1, file_a_path.as_str()), + (1, file_b_path.as_str()), + (2, file_a_path.as_str()), + (2, file_b_path.as_str()), + (3, file_a_path.as_str()), + (4, file_a_path.as_str()), + ]; + fixture + .verify_incremental_scan( + 3, + 6, + vec![ + (1, "1"), + (3, "3"), + (5, "5"), + (10, "10"), + (11, "11"), + (12, "12"), + ], + test2_deletes, + ) + .await; + + // Test 3: Scan from snapshot 4 to 6 (after first delete, through replace and final delete) + // Snapshot 4: Starting point - file-a already has position 1 deleted (record "2") + // Snapshot 5: Replace both files with file-ab-compact (filters out position 1) + // Snapshot 6: Delete position 2 from file-ab-compact (record "4" deleted) + // Net result: + // - Additions: compacted records with position 1 filtered from file-a (1,3,5,10,11,12) + // - Deletions: All positions from file-a (0-4) and file-b (0-2) because these files + // existed in the starting snapshot (4) and are deleted/replaced in snapshot 5. 
+ // Sorted by (position, file_path) tuple ordering + let file_a_path = format!("{}/data/file-a.parquet", fixture.table_location); + let file_b_path = format!("{}/data/file-b.parquet", fixture.table_location); + let test3_deletes = vec![ + (0, file_a_path.as_str()), + (0, file_b_path.as_str()), + (1, file_a_path.as_str()), + (1, file_b_path.as_str()), + (2, file_a_path.as_str()), + (2, file_b_path.as_str()), + (3, file_a_path.as_str()), + (4, file_a_path.as_str()), + ]; + fixture + .verify_incremental_scan( + 4, + 6, + vec![ + (1, "1"), + (3, "3"), + (5, "5"), + (10, "10"), + (11, "11"), + (12, "12"), + ], + test3_deletes, + ) + .await; + + // Test 4: Scan from snapshot 5 to 6 (after replace, only sees the final delete) + // Snapshot 5: Starting point - file-ab-compact is newly created with records (1,3,4,5,10,11,12) + // Snapshot 6: Delete position 2 from file-ab-compact (record "4" deleted) + // Net result: + // - Additions: empty (file-ab-compact was added in snapshot 5, not in this scan range) + // - Deletions: position 2 from file-ab-compact (the positional delete in snapshot 6) + let file_ab_path = format!("{}/data/file-ab-compact.parquet", fixture.table_location); + let test4_deletes = vec![(2, file_ab_path.as_str())]; + fixture + .verify_incremental_scan(5, 6, vec![], test4_deletes) + .await; +} + +#[tokio::test] +async fn test_incremental_scan_includes_root_when_from_is_none() { + // Test that when from=None, the root snapshot is INCLUDED in the scan + // (not excluded like when an explicit from snapshot is specified) + let fixture = IncrementalTestFixture::new(vec![ + // Snapshot 1 (root): Add initial data - this should be INCLUDED when from=None + Operation::Add( + vec![(1, "one".to_string()), (2, "two".to_string())], + "root-data.parquet".to_string(), + ), + // Snapshot 2: Add more data + Operation::Add( + vec![(10, "ten".to_string()), (20, "twenty".to_string())], + "second-data.parquet".to_string(), + ), + // Snapshot 3: Add final data + 
Operation::Add( + vec![(100, "hundred".to_string())], + "third-data.parquet".to_string(), + ), + ]) + .await; + + // Test 1: Scan with explicit from=1 should EXCLUDE snapshot 1 (normal behavior) + fixture + .verify_incremental_scan( + 1, // from snapshot 1 - EXCLUDED + 3, // to snapshot 3 + vec![(10, "ten"), (20, "twenty"), (100, "hundred")], // Only snapshots 2 and 3 + vec![], + ) + .await; + + // Test 2: Scan using table.incremental_scan(None, None) API + // This should INCLUDE the root snapshot + let scan = fixture.table.incremental_scan(None, None).build().unwrap(); + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + // Collect all appended data + let append_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()) + .collect(); + + // Extract n and data values + use arrow_array::cast::AsArray; + let mut results = Vec::new(); + for batch in append_batches { + let n_array = batch + .column_by_name("n") + .unwrap() + .as_primitive::(); + let data_array = batch.column_by_name("data").unwrap().as_string::(); + for i in 0..batch.num_rows() { + results.push((n_array.value(i), data_array.value(i).to_string())); + } + } + + // Sort for consistent comparison + results.sort(); + + // Should include ALL data including root snapshot + assert_eq!( + results, + vec![ + (1, "one".to_string()), + (2, "two".to_string()), + (10, "ten".to_string()), + (20, "twenty".to_string()), + (100, "hundred".to_string()), + ], + "Scan with from=None should include root snapshot data" + ); +} diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 3e319ca062..7acefadb47 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -21,6 +21,9 @@ mod cache; use cache::*; mod context; use context::*; + +pub mod incremental; + mod task; use std::sync::Arc; diff --git a/crates/iceberg/src/table.rs 
b/crates/iceberg/src/table.rs index 9c789e2186..3ea6f61476 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -24,6 +24,7 @@ use crate::inspect::MetadataTable; use crate::io::FileIO; use crate::io::object_cache::ObjectCache; use crate::scan::TableScanBuilder; +use crate::scan::incremental::IncrementalTableScanBuilder; use crate::spec::{SchemaRef, TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; @@ -224,6 +225,19 @@ impl Table { TableScanBuilder::new(self) } + /// Creates an incremental table scan between two snapshots. + /// + /// # Arguments + /// * `from_snapshot_id` - Starting snapshot ID. If None, scans from the root (oldest) snapshot. + /// * `to_snapshot_id` - Ending snapshot ID. If None, scans to the current (latest) snapshot. + pub fn incremental_scan( + &self, + from_snapshot_id: Option, + to_snapshot_id: Option, + ) -> IncrementalTableScanBuilder<'_> { + IncrementalTableScanBuilder::new(self, from_snapshot_id, to_snapshot_id) + } + /// Creates a metadata table which provides table-like APIs for inspecting metadata. /// See [`MetadataTable`] for more details. pub fn inspect(&self) -> MetadataTable<'_> { diff --git a/crates/iceberg/src/util/mod.rs b/crates/iceberg/src/util/mod.rs new file mode 100644 index 0000000000..b614c981ec --- /dev/null +++ b/crates/iceberg/src/util/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Utilities for working with snapshots. +pub mod snapshot; diff --git a/crates/iceberg/src/util/snapshot.rs b/crates/iceberg/src/util/snapshot.rs new file mode 100644 index 0000000000..d73a255f5e --- /dev/null +++ b/crates/iceberg/src/util/snapshot.rs @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Taken from https://github.com/apache/iceberg-rust/pull/1470 + +use crate::spec::{SnapshotRef, TableMetadataRef}; + +struct Ancestors { + next: Option, + get_snapshot: Box Option + Send>, +} + +impl Iterator for Ancestors { + type Item = SnapshotRef; + + fn next(&mut self) -> Option { + let snapshot = self.next.take()?; + let result = snapshot.clone(); + self.next = snapshot + .parent_snapshot_id() + .and_then(|id| (self.get_snapshot)(id)); + Some(result) + } +} + +/// Iterate starting from `snapshot` (inclusive) to the root snapshot. +pub fn ancestors_of( + table_metadata: &TableMetadataRef, + snapshot: i64, +) -> Box + Send> { + if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) { + let table_metadata = table_metadata.clone(); + Box::new(Ancestors { + next: Some(snapshot.clone()), + get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()), + }) + } else { + Box::new(std::iter::empty()) + } +} + +/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive). +pub fn ancestors_between( + table_metadata: &TableMetadataRef, + latest_snapshot_id: i64, + oldest_snapshot_id: Option, +) -> Box + Send> { + let Some(oldest_snapshot_id) = oldest_snapshot_id else { + return Box::new(ancestors_of(table_metadata, latest_snapshot_id)); + }; + + if latest_snapshot_id == oldest_snapshot_id { + return Box::new(std::iter::empty()); + } + + Box::new( + ancestors_of(table_metadata, latest_snapshot_id) + .take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id), + ) +} + +/// Returns the root (oldest/first) snapshot ID in the table's snapshot lineage. 
+/// +/// # Arguments +/// * `table_metadata` - The table metadata +/// +/// # Returns +/// The snapshot ID of the root snapshot, or None if there are no snapshots +pub fn root_snapshot_id(table_metadata: &TableMetadataRef) -> Option { + let current = table_metadata.current_snapshot()?; + ancestors_of(table_metadata, current.snapshot_id()) + .last() + .map(|s| s.snapshot_id()) +}