Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use tokio::sync::oneshot::{Receiver, channel};

use super::delete_filter::{DeleteFilter, PosDelLoadAction};
use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
use crate::arrow::scan_metrics::ScanMetrics;
use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema};
use crate::delete_vector::DeleteVector;
use crate::expr::Predicate::AlwaysTrue;
Expand Down Expand Up @@ -77,13 +78,22 @@ enum ParsedDeleteFileContext {
#[allow(unused_variables)]
impl CachingDeleteFileLoader {
pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> Self {
let scan_metrics = ScanMetrics::new();
CachingDeleteFileLoader {
basic_delete_file_loader: BasicDeleteFileLoader::new(file_io),
basic_delete_file_loader: BasicDeleteFileLoader::new(file_io, scan_metrics),
concurrency_limit_data_files,
delete_filter: DeleteFilter::default(),
}
}

pub(crate) fn with_scan_metrics(mut self, scan_metrics: ScanMetrics) -> Self {
self.basic_delete_file_loader = BasicDeleteFileLoader::new(
self.basic_delete_file_loader.file_io().clone(),
scan_metrics,
);
self
}

/// Initiates loading of all deletes for all the specified tasks
///
/// Returned future completes once all positional deletes and delete vectors
Expand Down Expand Up @@ -612,7 +622,8 @@ mod tests {

let eq_delete_file_path = setup_write_equality_delete_file_1(table_location);

let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
let basic_delete_file_loader =
BasicDeleteFileLoader::new(file_io.clone(), ScanMetrics::new());
let record_batch_stream = basic_delete_file_loader
.parquet_to_batch_stream(
&eq_delete_file_path,
Expand Down Expand Up @@ -808,7 +819,8 @@ mod tests {
};

let file_io = FileIO::new_with_fs();
let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
let basic_delete_file_loader =
BasicDeleteFileLoader::new(file_io.clone(), ScanMetrics::new());

let batch_stream = basic_delete_file_loader
.parquet_to_batch_stream(
Expand Down Expand Up @@ -994,7 +1006,8 @@ mod tests {
writer.write(&record_batch).unwrap();
writer.close().unwrap();

let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
let basic_delete_file_loader =
BasicDeleteFileLoader::new(file_io.clone(), ScanMetrics::new());
let record_batch_stream = basic_delete_file_loader
.parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len())
.await
Expand Down
18 changes: 15 additions & 3 deletions crates/iceberg/src/arrow/delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use parquet::arrow::ParquetRecordBatchStreamBuilder;
use crate::arrow::ArrowReader;
use crate::arrow::reader::ParquetReadOptions;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::scan_metrics::ScanMetrics;
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::spec::{Schema, SchemaRef};
Expand All @@ -45,13 +46,22 @@ pub trait DeleteFileLoader {
#[derive(Clone, Debug)]
pub(crate) struct BasicDeleteFileLoader {
file_io: FileIO,
scan_metrics: ScanMetrics,
}

#[allow(unused_variables)]
impl BasicDeleteFileLoader {
pub fn new(file_io: FileIO) -> Self {
BasicDeleteFileLoader { file_io }
pub fn new(file_io: FileIO, scan_metrics: ScanMetrics) -> Self {
BasicDeleteFileLoader {
file_io,
scan_metrics,
}
}

pub(crate) fn file_io(&self) -> &FileIO {
&self.file_io
}

/// Loads a RecordBatchStream for a given datafile.
pub(crate) async fn parquet_to_batch_stream(
&self,
Expand All @@ -69,6 +79,7 @@ impl BasicDeleteFileLoader {
&self.file_io,
file_size_in_bytes,
parquet_read_options,
self.scan_metrics.bytes_read_counter(),
)
.await?;

Expand Down Expand Up @@ -137,7 +148,8 @@ mod tests {
let table_location = tmp_dir.path();
let file_io = FileIO::new_with_fs();

let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
let scan_metrics = ScanMetrics::new();
let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone(), scan_metrics);

let file_scan_tasks = setup(table_location);

Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ mod reader;
/// RecordBatch projection utilities
pub mod record_batch_projector;
pub(crate) mod record_batch_transformer;
mod scan_metrics;
mod value;

pub use reader::*;
pub use scan_metrics::{ScanMetrics, ScanResult};
pub use value::*;
/// Partition value calculator for computing partition values
pub mod partition_value_calculator;
Expand Down
Loading
Loading