Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 108 additions & 13 deletions crates/iceberg/src/arrow/incremental.rs
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are row numbers supposed to be 0-based?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm good question. In the positional delete files they are, so I did this for consistency.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;

Expand All @@ -23,10 +24,12 @@ use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use futures::channel::mpsc::channel;
use futures::stream::select;
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::arrow::{
ArrowReader, RESERVED_COL_NAME_FILE_PATH, RESERVED_FIELD_ID_FILE_PATH, StreamsInto,
ArrowReader, RESERVED_COL_NAME_FILE_PATH, RESERVED_COL_NAME_POS, RESERVED_FIELD_ID_FILE_PATH,
RESERVED_FIELD_ID_POS, StreamsInto,
};
use crate::delete_vector::DeleteVector;
use crate::io::FileIO;
Expand All @@ -37,6 +40,22 @@ use crate::scan::incremental::{
};
use crate::{Error, ErrorKind, Result};

/// Default batch size for incremental delete operations, used when the caller
/// does not supply an explicit batch size.
const DEFAULT_BATCH_SIZE: usize = 1024;

/// Creates the schema for positional delete records containing the "pos" column.
/// The pos field includes the reserved field ID as metadata.
/// Builds the single-column Arrow schema used for positional delete records.
///
/// The schema holds one non-nullable `pos` column of type `UInt64`; its field
/// metadata carries the reserved Iceberg field ID under the Parquet field-id
/// metadata key so downstream consumers can identify the column.
fn create_pos_delete_schema() -> Arc<ArrowSchema> {
    let metadata = HashMap::from([(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        RESERVED_FIELD_ID_POS.to_string(),
    )]);
    let pos_field =
        Field::new(RESERVED_COL_NAME_POS, DataType::UInt64, false).with_metadata(metadata);
    Arc::new(ArrowSchema::new(vec![pos_field]))
}

/// The type of incremental batch: appended data or deleted records.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IncrementalBatchType {
Expand Down Expand Up @@ -120,9 +139,49 @@ impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
let _ = appends_tx.send(Err(e)).await;
}
}
});
})
.await
}
IncrementalFileScanTask::Delete(deleted_file_task) => {
spawn(async move {
let file_path = deleted_file_task.data_file_path().to_string();
let total_records = deleted_file_task.base.record_count.unwrap_or(0);

let record_batch_stream = process_incremental_deleted_file_task(
file_path,
total_records,
batch_size,
);

match record_batch_stream {
Ok(mut stream) => {
while let Some(batch) = stream.next().await {
let result = deletes_tx
.send(batch.map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"failed to read deleted file record batch",
)
.with_source(e)
}))
.await;

if result.is_err() {
break;
}
}
}
Err(e) => {
let _ = deletes_tx.send(Err(e)).await;
}
}
})
.await
}
IncrementalFileScanTask::Delete(file_path, delete_vector) => {
IncrementalFileScanTask::PositionalDeletes(
file_path,
delete_vector,
) => {
spawn(async move {
let record_batch_stream = process_incremental_delete_task(
file_path,
Expand Down Expand Up @@ -152,7 +211,8 @@ impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
let _ = deletes_tx.send(Err(e)).await;
}
}
});
})
.await
}
};

Expand All @@ -175,27 +235,29 @@ async fn process_incremental_append_task(
file_io: FileIO,
) -> Result<ArrowRecordBatchStream> {
let mut record_batch_stream_builder = ArrowReader::create_parquet_record_batch_stream_builder(
&task.data_file_path,
&task.base.data_file_path,
file_io,
true,
None, // arrow_reader_options
)
.await?;

// Create a projection mask for the batch stream to select which columns in the
// Parquet file that we want in the response
let projection_mask = ArrowReader::get_arrow_projection_mask(
&task.project_field_ids,
&task.base.project_field_ids,
&task.schema_ref(),
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
false, // use_fallback
)?;
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

// RecordBatchTransformer performs any transformations required on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
let mut record_batch_transformer =
RecordBatchTransformer::build(task.schema_ref(), &task.project_field_ids);
RecordBatchTransformer::build(task.schema_ref(), &task.base.project_field_ids);

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
Expand Down Expand Up @@ -233,13 +295,9 @@ fn process_incremental_delete_task(
delete_vector: DeleteVector,
batch_size: Option<usize>,
) -> Result<ArrowRecordBatchStream> {
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"pos",
DataType::UInt64,
false,
)]));
let schema = create_pos_delete_schema();

let batch_size = batch_size.unwrap_or(1024);
let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

let treemap = delete_vector.inner;

Expand Down Expand Up @@ -269,3 +327,40 @@ fn process_incremental_delete_task(

Ok(Box::pin(stream) as ArrowRecordBatchStream)
}

/// Builds a stream of positional-delete record batches covering every row of a
/// fully deleted data file.
///
/// Emits the `pos` column with values `0..total_records` (0-based, matching
/// positional delete file semantics), chunked into batches of `batch_size`
/// (or [`DEFAULT_BATCH_SIZE`] when `None`). Each batch is augmented with the
/// reserved `file_path` column identifying the deleted file.
fn process_incremental_deleted_file_task(
    file_path: String,
    total_records: u64,
    batch_size: Option<usize>,
) -> Result<ArrowRecordBatchStream> {
    let schema = create_pos_delete_schema();

    let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

    // Stream positions lazily in fixed-size chunks so memory stays bounded
    // regardless of how many records the deleted file contained.
    let stream = futures::stream::iter(0..total_records)
        .chunks(batch_size)
        .map(move |chunk| {
            let positions = UInt64Array::from_iter(chunk);
            RecordBatch::try_new(
                Arc::clone(&schema), // Cheap Arc clone instead of full schema creation
                vec![Arc::new(positions)],
            )
            .map_err(|e| {
                // Preserve the underlying Arrow error as the source instead of
                // discarding it, consistent with error handling elsewhere in
                // this module.
                Error::new(
                    ErrorKind::Unexpected,
                    "Failed to create RecordBatch for deleted file",
                )
                .with_source(e)
            })
            .and_then(|batch| {
                ArrowReader::add_file_path_column(
                    batch,
                    &file_path,
                    RESERVED_COL_NAME_FILE_PATH,
                    RESERVED_FIELD_ID_FILE_PATH,
                )
            })
        });

    Ok(Box::pin(stream) as ArrowRecordBatchStream)
}
6 changes: 6 additions & 0 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ pub(crate) const RESERVED_FIELD_ID_FILE_PATH: i32 = 2147483546;
/// Column name for the file path metadata column used in delete file reading.
pub(crate) const RESERVED_COL_NAME_FILE_PATH: &str = "file_path";

/// Reserved field ID for the position column used in delete file reading.
// NOTE(review): presumably the Iceberg-spec reserved id for `pos` (one below
// `RESERVED_FIELD_ID_FILE_PATH` = 2147483546) — confirm against the spec.
pub(crate) const RESERVED_FIELD_ID_POS: i32 = 2147483545;

/// Column name for the position metadata column used in delete file reading.
pub(crate) const RESERVED_COL_NAME_POS: &str = "pos";

/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
Expand Down
1 change: 0 additions & 1 deletion crates/iceberg/src/scan/incremental/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ impl IncrementalPlanContext {
(manifest_files, filter_fn)
};

// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
let mut mfcs = vec![];
for manifest_file in &manifest_files {
let tx = if manifest_file.content == ManifestContentType::Deletes {
Expand Down
109 changes: 86 additions & 23 deletions crates/iceberg/src/scan/incremental/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,14 @@ impl IncrementalTableScan {
} else if manifest_entry_context.manifest_entry.status()
== ManifestStatus::Deleted
{
// TODO (RAI-43291): Process deleted files
Err(Error::new(
ErrorKind::FeatureUnsupported,
"Processing deleted data files is not supported yet in incremental scans",
))
spawn(async move {
Self::process_deleted_data_manifest_entry(
tx,
manifest_entry_context,
)
.await
})
.await
} else {
Ok(())
}
Expand All @@ -424,33 +427,63 @@ impl IncrementalTableScan {
}
});

// Collect all append tasks.
let mut tasks = file_scan_task_rx.try_collect::<Vec<_>>().await?;
// Collect all tasks from manifest processing.
let all_tasks = file_scan_task_rx.try_collect::<Vec<_>>().await?;

// Separate tasks by type and compute file path sets in a single pass
let mut append_tasks = Vec::new();
let mut delete_tasks = Vec::new();
let mut appended_files = HashSet::new();
let mut deleted_files = HashSet::new();

// Compute those file paths that have been appended.
let appended_files = tasks
.iter()
.filter_map(|task| match task {
for task in all_tasks {
match task {
IncrementalFileScanTask::Append(append_task) => {
Some(append_task.data_file_path.clone())
appended_files.insert(append_task.data_file_path().to_string());
append_tasks.push(append_task);
}
_ => None,
})
.collect::<HashSet<String>>();
IncrementalFileScanTask::Delete(delete_task) => {
deleted_files.insert(delete_task.data_file_path().to_string());
delete_tasks.push(delete_task);
}
_ => {}
}
}

// Build final task list with net changes
// We filter out tasks for files that appear in both sets (cancelled out)
let mut final_tasks: Vec<IncrementalFileScanTask> = Vec::new();

// Add net append tasks (only files not in deleted_files)
for append_task in append_tasks {
if !deleted_files.contains(append_task.data_file_path()) {
final_tasks.push(IncrementalFileScanTask::Append(append_task));
}
}

// Add net delete tasks (only files not in appended_files)
for delete_task in delete_tasks {
if !appended_files.contains(delete_task.data_file_path()) {
final_tasks.push(IncrementalFileScanTask::Delete(delete_task));
}
}

// Augment `tasks` with delete tasks.
// First collect paths to process (paths that weren't appended in this scan range)
let delete_paths: Vec<String> = delete_filter.with_read(|state| {
// Add positional delete tasks (only for files that haven't been deleted)
let positional_delete_paths: Vec<String> = delete_filter.with_read(|state| {
Ok(state
.delete_vectors()
.keys()
.filter(|path| !appended_files.contains::<String>(path))
.filter(|path| {
// Only include positional deletes for files that were not appended in
// this range and not deleted.
!appended_files.contains::<str>(path) && !deleted_files.contains::<str>(path)
})
.cloned()
.collect())
})?;

// Now remove and take ownership of each delete vector
for path in delete_paths {
for path in positional_delete_paths {
let delete_vector_arc = delete_filter.with_write(|state| {
state.remove_delete_vector(&path).ok_or_else(|| {
Error::new(
Expand All @@ -474,13 +507,14 @@ impl IncrementalTableScan {
.with_source(e)
})?;

let delete_task = IncrementalFileScanTask::Delete(path, delete_vector_inner);
tasks.push(delete_task);
let positional_delete_task =
IncrementalFileScanTask::PositionalDeletes(path, delete_vector_inner);
final_tasks.push(positional_delete_task);
}

// We actually would not need a stream here, but we can keep it compatible with
// other scan types.
Ok(futures::stream::iter(tasks).map(Ok).boxed())
Ok(futures::stream::iter(final_tasks).map(Ok).boxed())
}

/// Returns an [`CombinedIncrementalBatchRecordStream`] for this incremental table scan.
Expand Down Expand Up @@ -582,6 +616,35 @@ impl IncrementalTableScan {
file_scan_task_tx.send(Ok(file_scan_task)).await?;
Ok(())
}

/// Converts a `Deleted`-status manifest entry for a data file into an
/// `IncrementalFileScanTask::Delete` and sends it down the task channel.
///
/// Returns an error if the entry unexpectedly describes a delete file rather
/// than a data file.
async fn process_deleted_data_manifest_entry(
    mut file_scan_task_tx: Sender<Result<IncrementalFileScanTask>>,
    manifest_entry_context: ManifestEntryContext,
) -> Result<()> {
    let entry = &manifest_entry_context.manifest_entry;

    // Abort the plan if we encounter a manifest entry for a delete file.
    if entry.content_type() != DataContentType::Data {
        return Err(Error::new(
            ErrorKind::FeatureUnsupported,
            "Encountered an entry for a delete file in a data file manifest",
        ));
    }

    let base = BaseIncrementalFileScanTask {
        start: 0,
        length: entry.file_size_in_bytes(),
        record_count: Some(entry.record_count()),
        data_file_path: entry.file_path().to_string(),
        data_file_format: entry.file_format(),
        schema: manifest_entry_context.snapshot_schema.clone(),
        project_field_ids: manifest_entry_context.field_ids.as_ref().clone(),
    };

    let file_scan_task = IncrementalFileScanTask::Delete(DeletedFileScanTask { base });
    file_scan_task_tx.send(Ok(file_scan_task)).await?;
    Ok(())
}
}

#[cfg(test)]
Expand Down
Loading
Loading