diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 5cb06d1d37..1018b9ff97 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -33,7 +33,7 @@ use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
 use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
+use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, try_join};
 use parquet::arrow::arrow_reader::{
     ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
@@ -148,15 +148,21 @@ impl ArrowReader {
         let stream = tasks
             .map_ok(move |task| {
                 let file_io = file_io.clone();
-
-                Self::process_file_scan_task(
-                    task,
-                    batch_size,
-                    file_io,
-                    self.delete_file_loader.clone(),
-                    row_group_filtering_enabled,
-                    row_selection_enabled,
-                )
+                let delete_file_loader = self.delete_file_loader.clone();
+
+                async move {
+                    let file_stream = Self::process_file_scan_task(
+                        task,
+                        batch_size,
+                        file_io,
+                        delete_file_loader,
+                        row_group_filtering_enabled,
+                        row_selection_enabled,
+                    ).await?;
+
+                    // Convert the stream to run on separate tasks for parallel processing
+                    Ok::<_, crate::Error>(Self::stream_to_receiver(file_stream))
+                }
             })
             .map_err(|err| {
                 Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err)
@@ -201,7 +207,7 @@ impl ArrowReader {
         // RecordBatchTransformer performs any transformations required on the RecordBatches
         // that come back from the file, such as type promotion, default column insertion
         // and column re-ordering
-        let mut record_batch_transformer =
+        let record_batch_transformer =
             RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
 
         if let Some(batch_size) = batch_size {
@@ -241,7 +247,7 @@ impl ArrowReader {
         let mut selected_row_group_indices = None;
         let mut row_selection = None;
 
-        if let Some(predicate) = final_predicate {
+        if let Some(ref predicate) = final_predicate {
             let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
                 record_batch_stream_builder.parquet_schema(),
                 &predicate,
@@ -300,27 +306,170 @@ impl ArrowReader {
             };
         }
 
-        if let Some(row_selection) = row_selection {
-            record_batch_stream_builder =
-                record_batch_stream_builder.with_row_selection(row_selection);
-        }
+        // Determine which row groups to process
+        let row_groups_to_process = selected_row_group_indices
+            .unwrap_or_else(|| {
+                // If no specific row groups were selected, process all of them
+                (0..record_batch_stream_builder.metadata().row_groups().len()).collect()
+            });
 
-        if let Some(selected_row_group_indices) = selected_row_group_indices {
-            record_batch_stream_builder =
-                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
-        }
+        // Create parallel streams for row groups to enable intra-file parallelism
+        let parallel_stream = Self::create_parallel_row_group_streams(
+            task.data_file_path.clone(),
+            file_io,
+            should_load_page_index,
+            row_groups_to_process,
+            row_selection,
+            record_batch_transformer,
+            task.project_field_ids.clone(),
+            (*task.schema).clone(),
+            batch_size,
+            final_predicate,
+        ).await?;
+
+        Ok(Box::pin(parallel_stream) as ArrowRecordBatchStream)
+    }
+
+    /// Converts an ArrowRecordBatchStream to a receiver-based stream that runs in its own task.
+    /// This allows each file's Parquet stream to be polled on its own tokio task.
+    fn stream_to_receiver(mut stream: ArrowRecordBatchStream) -> ArrowRecordBatchStream {
+        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+
+        // Spawn a dedicated task for this stream's processing.
+        // Each file's stream will run on its own task, allowing parallel processing.
+        tokio::spawn(async move {
+            while let Some(result) = StreamExt::next(&mut stream).await {
+                if tx.send(result).is_err() {
+                    // Receiver dropped, stop processing
+                    break;
+                }
+            }
+            // When the stream ends, the channel is closed automatically as tx is dropped
+        });
 
-        // Build the batch stream and send all the RecordBatches that it generates
-        // to the requester.
-        let record_batch_stream =
-            record_batch_stream_builder
-                .build()?
-                .map(move |batch| match batch {
-                    Ok(batch) => record_batch_transformer.process_record_batch(batch),
-                    Err(err) => Err(err.into()),
-                });
+        // Convert the receiver to a stream using futures utilities
+        let receiver_stream = futures::stream::unfold(rx, |mut rx| async move {
+            rx.recv().await.map(|result| (result, rx))
+        });
 
-        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
+        Box::pin(receiver_stream)
+    }
+
+    /// Creates parallel streams for row groups to enable intra-file parallelism.
+    /// Each row group gets its own ParquetRecordBatchStreamBuilder and runs on a separate task.
+    async fn create_parallel_row_group_streams(
+        data_file_path: String,
+        file_io: FileIO,
+        should_load_page_index: bool,
+        row_groups_to_process: Vec<usize>,
+        row_selection: Option<RowSelection>,
+        _record_batch_transformer: RecordBatchTransformer,
+        project_field_ids: Vec<i32>,
+        schema: Schema,
+        batch_size: Option<usize>,
+        final_predicate: Option<BoundPredicate>,
+    ) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
+        // Derive row group concurrency from the available CPU cores; this value
+        // also caps the number of in-flight row group streams to bound memory usage
+        let max_concurrent_row_groups = available_parallelism().get() * 4;
+
+        // For small numbers of row groups, process each one individually.
+        // For larger numbers, batch them to limit concurrency and memory usage.
+        // TODO: it would be ideal not to batch them, but to employ a control mechanism
+        // where new row groups are started as tasks for previous row groups finish
+        let row_group_batches = if row_groups_to_process.len() <= max_concurrent_row_groups {
+            // Process each row group individually
+            row_groups_to_process
+                .into_iter()
+                .map(|rg| vec![rg])
+                .collect::<Vec<_>>()
+        } else {
+            // Batch row groups to limit concurrency
+            let batch_size_calc = (row_groups_to_process.len() + max_concurrent_row_groups - 1)
+                / max_concurrent_row_groups;
+            row_groups_to_process
+                .chunks(batch_size_calc)
+                .map(|chunk| chunk.to_vec())
+                .collect::<Vec<_>>()
+        };
+
+        // Create futures that will build separate stream builders for each row group batch
+        let row_group_stream_futures = row_group_batches
+            .into_iter()
+            .map(move |row_group_batch| {
+                let file_io = file_io.clone();
+                let data_file_path = data_file_path.clone();
+                let row_selection = row_selection.clone();
+                let project_field_ids = project_field_ids.clone();
+                let schema = schema.clone();
+                let mut transformer = RecordBatchTransformer::build(
+                    Arc::new(schema.clone()),
+                    &project_field_ids,
+                );
+                let final_predicate = final_predicate.clone();
+
+                async move {
+                    // Create a fresh builder for this row group batch
+                    let mut builder = Self::create_parquet_record_batch_stream_builder(
+                        &data_file_path,
+                        file_io,
+                        should_load_page_index,
+                    ).await?;
+
+                    // Apply projection
+                    let projection_mask = Self::get_arrow_projection_mask(
+                        &project_field_ids,
+                        &schema,
+                        builder.parquet_schema(),
+                        builder.schema(),
+                    ).ok();
+                    if let Some(projection_mask) = projection_mask {
+                        builder = builder.with_projection(projection_mask);
+                    }
+
+                    // Apply batch size
+                    if let Some(batch_size) = batch_size {
+                        builder = builder.with_batch_size(batch_size);
+                    }
+
+                    // Apply row filter if a predicate exists
+                    if let Some(ref predicate) = final_predicate {
+                        let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
+                            builder.parquet_schema(),
+                            predicate,
+                        )?;
+
+                        let row_filter = Self::get_row_filter(
+                            predicate,
+                            builder.parquet_schema(),
+                            &iceberg_field_ids,
+                            &field_id_map,
+                        )?;
+                        builder = builder.with_row_filter(row_filter);
+                    }
+
+                    // Apply row selection
+                    if let Some(row_selection) = row_selection {
+                        builder = builder.with_row_selection(row_selection);
+                    }
+
+                    // Apply row group selection
+                    builder = builder.with_row_groups(row_group_batch);
+
+                    // Build the stream
+                    let stream = builder.build()?;
+                    let transformed_stream = stream.map(move |batch| match batch {
+                        Ok(batch) => transformer.process_record_batch(batch),
+                        Err(err) => Err(err.into()),
+                    });
+
+                    // Ensure each row group stream runs on its own task to avoid single-threaded polling
+                    Ok::<_, crate::Error>(Self::stream_to_receiver(Box::pin(transformed_stream)))
+                }
+            });
+
+        // Process row group streams concurrently and flatten the results
+        let parallel_stream = futures::stream::iter(row_group_stream_futures)
+            .buffer_unordered(max_concurrent_row_groups)
+            .try_flatten_unordered(max_concurrent_row_groups);
+
+        Ok(parallel_stream)
     }
 
     pub(crate) async fn create_parquet_record_batch_stream_builder(
@@ -1423,6 +1572,7 @@ fn try_cast_literal(
     Ok(Arc::new(Scalar::new(literal_array)))
 }
 
+
 #[cfg(test)]
 mod tests {
     use std::collections::{HashMap, HashSet};
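
The `stream_to_receiver` helper above decouples polling of a Parquet stream from its consumer: a dedicated tokio task drains the source into an `mpsc` channel, and the receiver is re-exposed as a stream. Below is a minimal, self-contained sketch of that same pattern; `decouple` is a hypothetical stand-in for `stream_to_receiver`, and it assumes the `futures` crate plus tokio with the `rt-multi-thread`, `macros`, and `sync` features.

```rust
use std::pin::Pin;

use futures::{Stream, StreamExt};

type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;

/// Forward `stream` into an unbounded channel from a dedicated tokio task,
/// then re-expose the receiver as a stream. Polling the returned stream no
/// longer drives the source directly, so many such streams can make
/// progress in parallel on the runtime's worker threads.
fn decouple<T: Send + 'static>(mut stream: BoxStream<T>) -> BoxStream<T> {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

    tokio::spawn(async move {
        while let Some(item) = stream.next().await {
            // Stop forwarding once the consumer has dropped the receiver
            if tx.send(item).is_err() {
                break;
            }
        }
        // Dropping tx closes the channel, which ends the output stream
    });

    // Rebuild a Stream from the receiver; `unfold` threads `rx` through
    // each step and terminates when `recv()` yields None
    Box::pin(futures::stream::unfold(rx, |mut rx| async move {
        rx.recv().await.map(|item| (item, rx))
    }))
}

#[tokio::main]
async fn main() {
    let a = decouple(Box::pin(futures::stream::iter(0..3)));
    let b = decouple(Box::pin(futures::stream::iter(10..13)));

    // Each input is driven by its own task, so items may interleave
    let merged: Vec<i32> = futures::stream::select(a, b).collect().await;
    println!("{merged:?}");
}
```

Note the unbounded channel means a fast producer is never backpressured by a slow consumer, which trades memory for throughput, the same trade-off the diff's TODO acknowledges.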
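The batching arithmetic in `create_parallel_row_group_streams` uses ceiling division so that at most `max_concurrent_row_groups` chunks are ever produced. A small worked sketch of that logic, with `batch_row_groups` as a hypothetical helper mirroring the diff (not part of the PR):

```rust
/// Few row groups run one per task; otherwise ceiling division yields
/// at most `max_concurrent` chunks of near-equal size.
fn batch_row_groups(row_groups: Vec<usize>, max_concurrent: usize) -> Vec<Vec<usize>> {
    if row_groups.len() <= max_concurrent {
        row_groups.into_iter().map(|rg| vec![rg]).collect()
    } else {
        // Ceiling division: ceil(len / max_concurrent)
        let per_batch = (row_groups.len() + max_concurrent - 1) / max_concurrent;
        row_groups.chunks(per_batch).map(|c| c.to_vec()).collect()
    }
}

fn main() {
    // 10 row groups with max_concurrent = 4: per_batch = ceil(10 / 4) = 3,
    // yielding 4 batches of sizes 3, 3, 3 and 1.
    let batches = batch_row_groups((0..10).collect(), 4);
    assert_eq!(batches.len(), 4);
    assert_eq!(batches.last().unwrap(), &vec![9]);
}
```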
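Finally, the diff composes everything with `buffer_unordered`, which drives up to N builder futures at once, followed by `try_flatten_unordered`, which interleaves items from the resulting sub-streams as they become ready. A runnable sketch of that composition under the same assumptions (the `futures` and `tokio` crates), with integer tuples standing in for record batches:

```rust
use futures::stream::{self, StreamExt, TryStreamExt};

type Error = std::io::Error;

#[tokio::main]
async fn main() -> Result<(), Error> {
    // One future per "row group"; each resolves to Ok(sub-stream), the way
    // each async block in the diff resolves to a decoupled batch stream.
    let stream_futures = (0..4).map(|group| async move {
        // The sub-stream yields Result items, standing in for record batches
        Ok::<_, Error>(stream::iter((0..3).map(move |i| Ok::<_, Error>((group, i)))))
    });

    // Drive up to two sub-stream futures at once, then interleave the items
    // of the ready sub-streams without preserving order
    let items: Vec<(i32, i32)> = stream::iter(stream_futures)
        .buffer_unordered(2)
        .try_flatten_unordered(2)
        .try_collect()
        .await?;

    assert_eq!(items.len(), 12);
    println!("{items:?}");
    Ok(())
}
```

Because both combinators are unordered, batches from different row groups may arrive interleaved; callers that need file order would have to use the ordered variants instead.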