crates/iceberg/src/arrow/reader.rs (210 changes: 180 additions & 30 deletions)
@@ -33,7 +33,7 @@ use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
 use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
+use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, try_join};
 use parquet::arrow::arrow_reader::{
     ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
@@ -148,15 +148,21 @@ impl ArrowReader {
         let stream = tasks
             .map_ok(move |task| {
                 let file_io = file_io.clone();
-
-                Self::process_file_scan_task(
-                    task,
-                    batch_size,
-                    file_io,
-                    self.delete_file_loader.clone(),
-                    row_group_filtering_enabled,
-                    row_selection_enabled,
-                )
+                let delete_file_loader = self.delete_file_loader.clone();
+
+                async move {
+                    let file_stream = Self::process_file_scan_task(
+                        task,
+                        batch_size,
+                        file_io,
+                        delete_file_loader,
+                        row_group_filtering_enabled,
+                        row_selection_enabled,
+                    ).await?;
+
+                    // Convert the stream to run on separate tasks for parallel processing
+                    Ok::<_, crate::Error>(Self::stream_to_receiver(file_stream))
+                }
             })
             .map_err(|err| {
                 Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err)
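With this hunk, `map_ok` now yields a future per file that resolves to that file's own record-batch stream; the code that consumes this outer stream is outside the visible hunk, but it presumably buffers those futures and flattens the per-file streams back into one. A self-contained toy of that shape, assuming the `futures` and `tokio` crates; the numbers and error type are illustrative, not from the PR:

```rust
use futures::stream::{self, StreamExt, TryStreamExt};

#[tokio::main]
async fn main() {
    // Three "files", each producing its own sub-stream of items (here: u32s).
    let nested = stream::iter(vec![0u32, 10, 20]).map(|base| {
        Ok::<_, std::io::Error>(stream::iter(base..base + 3).map(Ok::<_, std::io::Error>))
    });

    // Poll up to 4 sub-streams at once; items may interleave across "files".
    let flat: Vec<u32> = nested
        .try_flatten_unordered(4)
        .try_collect()
        .await
        .unwrap();

    assert_eq!(flat.len(), 9);
    println!("{flat:?}");
}
```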
@@ -201,7 +207,7 @@ impl ArrowReader {
         // RecordBatchTransformer performs any transformations required on the RecordBatches
         // that come back from the file, such as type promotion, default column insertion
         // and column re-ordering
-        let mut record_batch_transformer =
+        let record_batch_transformer =
             RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
 
         if let Some(batch_size) = batch_size {
@@ -241,7 +247,7 @@ impl ArrowReader {
         let mut selected_row_group_indices = None;
         let mut row_selection = None;
 
-        if let Some(predicate) = final_predicate {
+        if let Some(ref predicate) = final_predicate {
             let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
                 record_batch_stream_builder.parquet_schema(),
                 &predicate,
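The `ref` added here binds the predicate by reference instead of moving it out of `final_predicate`, which is what lets the same option be passed on to the per-row-group streams later in this diff. A minimal standalone illustration of the difference (names are illustrative):

```rust
fn main() {
    let opt = Some(String::from("p"));

    // `ref` binds by reference, so `opt` is not moved by the match.
    if let Some(ref p) = opt {
        println!("borrowed: {p}");
    }

    // Still usable here; without `ref`, the String would have moved out.
    println!("still owned: {opt:?}");
}
```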
@@ -300,27 +306,170 @@ impl ArrowReader {
             };
         }
 
-        if let Some(row_selection) = row_selection {
-            record_batch_stream_builder =
-                record_batch_stream_builder.with_row_selection(row_selection);
-        }
+        // Determine which row groups to process
+        let row_groups_to_process = selected_row_group_indices
+            .unwrap_or_else(|| {
+                // If no specific row groups selected, process all of them
+                (0..record_batch_stream_builder.metadata().row_groups().len()).collect()
+            });
 
-        if let Some(selected_row_group_indices) = selected_row_group_indices {
-            record_batch_stream_builder =
-                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
-        }
+        // Create parallel streams for row groups to enable intra-file parallelism
+        let parallel_stream = Self::create_parallel_row_group_streams(
+            task.data_file_path.clone(),
+            file_io,
+            should_load_page_index,
+            row_groups_to_process,
+            row_selection,
+            record_batch_transformer,
+            task.project_field_ids.clone(),
+            (*task.schema).clone(),
+            batch_size,
+            final_predicate,
+        ).await?;
+
+        Ok(Box::pin(parallel_stream) as ArrowRecordBatchStream)
+    }
 
+    /// Converts an ArrowRecordBatchStream to a receiver-based stream that runs in its own task.
+    /// This allows each file's parquet stream polling to run on separate tokio tasks.
+    fn stream_to_receiver(mut stream: ArrowRecordBatchStream) -> ArrowRecordBatchStream {
+        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+
+        // Spawn a dedicated task for this stream's processing
+        // Each file's stream will run on its own task, allowing parallel processing
+        tokio::spawn(async move {
+            while let Some(result) = StreamExt::next(&mut stream).await {
+                if tx.send(result).is_err() {
+                    // Receiver dropped, stop processing
+                    break;
+                }
+            }
+            // Stream ends, channel will be closed automatically when tx is dropped
+        });
 
-        // Build the batch stream and send all the RecordBatches that it generates
-        // to the requester.
-        let record_batch_stream =
-            record_batch_stream_builder
-                .build()?
-                .map(move |batch| match batch {
-                    Ok(batch) => record_batch_transformer.process_record_batch(batch),
-                    Err(err) => Err(err.into()),
-                });
+        // Convert the receiver to a stream using futures utilities
+        let receiver_stream = futures::stream::unfold(rx, |mut rx| async move {
+            rx.recv().await.map(|result| (result, rx))
+        });
 
-        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
+        Box::pin(receiver_stream)
     }
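Aside: a minimal standalone sketch of the pattern `stream_to_receiver` uses, generic over any `Send` item. The function name `detach_to_task` is hypothetical; only the `tokio` and `futures` crates are assumed. Note that the unbounded channel means there is no backpressure: the spawned task drains the source as fast as it can, regardless of how quickly the consumer polls.

```rust
use std::pin::Pin;

use futures::stream::{Stream, StreamExt};

// Hypothetical free-function form of the same spawn-plus-channel pattern.
fn detach_to_task<T: Send + 'static>(
    mut source: Pin<Box<dyn Stream<Item = T> + Send>>,
) -> impl Stream<Item = T> {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

    // The spawned task owns the source stream and polls it independently
    // of whoever consumes the returned stream.
    tokio::spawn(async move {
        while let Some(item) = source.next().await {
            if tx.send(item).is_err() {
                break; // consumer dropped the receiver; stop early
            }
        }
        // tx dropped here => channel closes => returned stream terminates
    });

    // Rebuild a Stream from the receiver; unfold carries `rx` along as state.
    futures::stream::unfold(rx, |mut rx| async move {
        rx.recv().await.map(|item| (item, rx))
    })
}
```

The `tokio_stream::wrappers::UnboundedReceiverStream` adapter would express the same receiver-to-stream conversion without the manual `unfold`, at the cost of one more dependency.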

+    /// Creates parallel streams for row groups to enable intra-file parallelism.
+    /// Each row group gets its own ParquetRecordBatchStreamBuilder and runs on a separate task.
+    async fn create_parallel_row_group_streams(
+        data_file_path: String,
+        file_io: FileIO,
+        should_load_page_index: bool,
+        row_groups_to_process: Vec<usize>,
+        row_selection: Option<RowSelection>,
+        _record_batch_transformer: RecordBatchTransformer,
+        project_field_ids: Vec<i32>,
+        schema: Schema,
+        batch_size: Option<usize>,
+        final_predicate: Option<BoundPredicate>,
+    ) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
+        // Determine the optimal concurrency for row groups
+        // Use available CPU cores but cap it to avoid excessive memory usage
+        let max_concurrent_row_groups = available_parallelism().get() * 4;
+
+        // For very small numbers of row groups, process them individually
+        // For larger numbers, batch them to limit concurrency and memory usage
+        // TODO: it would be ideal not to batch them, but to employ a control mechanism
+        // where new row groups are started after tasks for previous row groups finish
+        let row_group_batches = if row_groups_to_process.len() <= max_concurrent_row_groups {
+            // Process each row group individually
+            row_groups_to_process.into_iter().map(|rg| vec![rg]).collect::<Vec<_>>()
+        } else {
+            // Batch row groups to limit concurrency
+            let batch_size_calc = (row_groups_to_process.len() + max_concurrent_row_groups - 1) / max_concurrent_row_groups;
+            row_groups_to_process
+                .chunks(batch_size_calc)
+                .map(|chunk| chunk.to_vec())
+                .collect::<Vec<_>>()
+        };
+
+        // Create futures that will build separate stream builders for each row group batch
+        let row_group_stream_futures = row_group_batches
+            .into_iter()
+            .map(move |row_group_batch| {
+                let file_io = file_io.clone();
+                let data_file_path = data_file_path.clone();
+                let row_selection = row_selection.clone();
+                let project_field_ids = project_field_ids.clone();
+                let schema = schema.clone();
+                let mut transformer = RecordBatchTransformer::build(
+                    Arc::new(schema.clone()),
+                    &project_field_ids,
+                );
+                let final_predicate = final_predicate.clone();
+
+                async move {
+                    // Create a fresh builder for this row group batch
+                    let mut builder = Self::create_parquet_record_batch_stream_builder(
+                        &data_file_path,
+                        file_io,
+                        should_load_page_index,
+                    ).await?;
+
+                    // Apply projection
+                    let projection_mask = Self::get_arrow_projection_mask(
+                        &project_field_ids,
+                        &schema,
+                        builder.parquet_schema(),
+                        builder.schema(),
+                    ).ok();
+                    if let Some(projection_mask) = projection_mask {
+                        builder = builder.with_projection(projection_mask);
+                    }
+
+                    // Apply batch size
+                    if let Some(batch_size) = batch_size {
+                        builder = builder.with_batch_size(batch_size);
+                    }
+
+                    // Apply row filter if predicate exists
+                    if let Some(ref predicate) = final_predicate {
+                        let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
+                            builder.parquet_schema(),
+                            predicate,
+                        )?;
+
+                        let row_filter = Self::get_row_filter(
+                            predicate,
+                            builder.parquet_schema(),
+                            &iceberg_field_ids,
+                            &field_id_map,
+                        )?;
+                        builder = builder.with_row_filter(row_filter);
+                    }
+
+                    // Apply row selection
+                    if let Some(row_selection) = row_selection {
+                        builder = builder.with_row_selection(row_selection);
+                    }
+
+                    // Apply row group selection
+                    builder = builder.with_row_groups(row_group_batch);
+
+                    // Build the stream
+                    let stream = builder.build()?;
+                    let transformed_stream = stream.map(move |batch| match batch {
+                        Ok(batch) => transformer.process_record_batch(batch),
+                        Err(err) => Err(err.into()),
+                    });
+
+                    // Ensure each row group stream runs on its own task to avoid single-threaded polling
+                    Ok::<_, crate::Error>(Self::stream_to_receiver(Box::pin(transformed_stream)))
+                }
+            });
+
+        // Process row group streams concurrently and flatten the results
+        let parallel_stream = futures::stream::iter(row_group_stream_futures)
+            .buffer_unordered(max_concurrent_row_groups)
+            .try_flatten_unordered(max_concurrent_row_groups);
+
+        Ok(parallel_stream)
+    }
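The batching rule above is a ceiling division: with `n` row groups and a concurrency cap `k`, a chunk size of `ceil(n / k)` yields at most `k` batches. A tiny self-contained check of that arithmetic; the function name is illustrative, not part of the PR:

```rust
// Mirrors the chunking logic above: at most `k` batches of near-equal size.
fn batch_row_groups(row_groups: Vec<usize>, k: usize) -> Vec<Vec<usize>> {
    if row_groups.len() <= k {
        // Few enough row groups: one single-element batch per row group.
        return row_groups.into_iter().map(|rg| vec![rg]).collect();
    }
    // Ceiling division, as in `batch_size_calc` above.
    let chunk = (row_groups.len() + k - 1) / k;
    row_groups.chunks(chunk).map(|c| c.to_vec()).collect()
}

fn main() {
    // 10 row groups, cap 4 => ceil(10 / 4) = 3 per batch => 4 batches.
    let batches = batch_row_groups((0..10).collect(), 4);
    assert_eq!(batches.len(), 4);
    assert_eq!(batches[0], vec![0, 1, 2]);
    println!("{batches:?}");
}
```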

     pub(crate) async fn create_parquet_record_batch_stream_builder(
Expand Down Expand Up @@ -1423,6 +1572,7 @@ fn try_cast_literal(
Ok(Arc::new(Scalar::new(literal_array)))
}


#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
Expand Down