diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 5cb06d1d37..4b3493f56b 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -148,15 +148,21 @@ impl ArrowReader {
         let stream = tasks
             .map_ok(move |task| {
                 let file_io = file_io.clone();
-
-                Self::process_file_scan_task(
-                    task,
-                    batch_size,
-                    file_io,
-                    self.delete_file_loader.clone(),
-                    row_group_filtering_enabled,
-                    row_selection_enabled,
-                )
+                let delete_file_loader = self.delete_file_loader.clone();
+
+                async move {
+                    let file_stream = Self::process_file_scan_task(
+                        task,
+                        batch_size,
+                        file_io,
+                        delete_file_loader,
+                        row_group_filtering_enabled,
+                        row_selection_enabled,
+                    ).await?;
+
+                    // Convert the stream to run on separate tasks for parallel processing
+                    Ok::<_, crate::Error>(Self::stream_to_receiver(file_stream))
+                }
             })
             .map_err(|err| {
                 Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err)
@@ -323,6 +329,55 @@ impl ArrowReader {
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
+    /// Moves polling of `stream` onto a dedicated Tokio task and returns a stream that
+    /// yields the same items, in the same order, through a bounded channel.
+    ///
+    /// Each file's parquet stream can then make progress on its own task rather than only
+    /// when the consumer polls it. The channel is bounded so that a slow consumer applies
+    /// backpressure: at most `CHANNEL_CAPACITY` items are buffered for this stream.
+    ///
+    /// If the spawned task panics or is cancelled, the returned stream yields one
+    /// `ErrorKind::Unexpected` error before ending, so a truncated read is not mistaken
+    /// for a complete one.
+    ///
+    /// Must be called from within a Tokio runtime; `tokio::spawn` panics otherwise.
+    fn stream_to_receiver(mut stream: ArrowRecordBatchStream) -> ArrowRecordBatchStream {
+        // Kept small on purpose: one channel exists per file stream, so buffered items
+        // add up across all files that are read concurrently.
+        const CHANNEL_CAPACITY: usize = 2;
+
+        let (tx, rx) = tokio::sync::mpsc::channel(CHANNEL_CAPACITY);
+
+        // Producer: drain the source stream and forward every item, errors included.
+        let producer = tokio::spawn(async move {
+            while let Some(result) = StreamExt::next(&mut stream).await {
+                // `send` waits for free capacity and only fails once the receiver has been
+                // dropped, in which case nobody is interested in the remaining items.
+                if tx.send(result).await.is_err() {
+                    break;
+                }
+            }
+            // `tx` is dropped here, which closes the channel and lets the consumer finish.
+        });
+
+        // Consumer: replay the channel, then report how the producer task finished.
+        let receiver_stream =
+            futures::stream::unfold((rx, Some(producer)), |(mut rx, producer)| async move {
+                if let Some(result) = rx.recv().await {
+                    return Some((result, (rx, producer)));
+                }
+
+                // The channel is closed. `producer` is `None` once its outcome has been
+                // reported, and a clean exit needs no further item.
+                let join_err = producer?.await.err()?;
+                let err = Error::new(ErrorKind::Unexpected, "file read task did not complete")
+                    .with_source(join_err);
+                Some((Err(err), (rx, None)))
+            });
+
+        Box::pin(receiver_stream)
+    }
+
     pub(crate) async fn create_parquet_record_batch_stream_builder(
         data_file_path: &str,
         file_io: FileIO,