Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,21 @@ impl ArrowReader {
let stream = tasks
.map_ok(move |task| {
let file_io = file_io.clone();

Self::process_file_scan_task(
task,
batch_size,
file_io,
self.delete_file_loader.clone(),
row_group_filtering_enabled,
row_selection_enabled,
)
let delete_file_loader = self.delete_file_loader.clone();

async move {
let file_stream = Self::process_file_scan_task(
task,
batch_size,
file_io,
delete_file_loader,
row_group_filtering_enabled,
row_selection_enabled,
).await?;

// Convert the stream to run on separate tasks for parallel processing
Ok::<_, crate::Error>(Self::stream_to_receiver(file_stream))
Copy link
Copy Markdown
Author

@vustef vustef Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this, perhaps we can just wrap this whole block in a task and rely on try_buffer_unordered to fetch from the stream in parallel, as per https://www.datawill.io/posts/rust-streams-concurrency/. I'm just not sure whether it would parallelize only the outer tasks (FileScanTaskStream) or the nested file_stream (ArrowRecordBatchStream) too. Perhaps the concurrency parameter on try_flatten_unordered would parallelize the nested stream.

}
})
.map_err(|err| {
Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err)
Expand Down Expand Up @@ -323,6 +329,31 @@ impl ArrowReader {
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}

/// Converts an ArrowRecordBatchStream to a receiver-based stream that runs in its own task.
/// This allows each file's parquet stream polling to run on separate tokio tasks.
///
/// Uses a bounded channel so that a slow consumer applies backpressure to the
/// producing task instead of buffering an entire file's record batches in memory
/// (an unbounded channel here would grow without limit if the consumer falls behind).
fn stream_to_receiver(mut stream: ArrowRecordBatchStream) -> ArrowRecordBatchStream {
    // Small buffer: enough to keep the producer ahead of the consumer without
    // unbounded memory growth. NOTE(review): capacity of 4 is a heuristic —
    // tune if profiling shows the producer stalling.
    let (tx, rx) = tokio::sync::mpsc::channel(4);

    // Spawn a dedicated task for this stream's processing.
    // Each file's stream will run on its own task, allowing parallel processing.
    tokio::spawn(async move {
        while let Some(result) = StreamExt::next(&mut stream).await {
            // `send` awaits while the channel is full (backpressure) and only
            // fails once the receiver has been dropped.
            if tx.send(result).await.is_err() {
                // Receiver dropped, stop processing
                break;
            }
        }
        // Stream ends, channel will be closed automatically when tx is dropped
    });

    // Convert the receiver to a stream using futures utilities; `unfold`
    // threads the receiver through each poll until `recv` returns None.
    let receiver_stream = futures::stream::unfold(rx, |mut rx| async move {
        rx.recv().await.map(|result| (result, rx))
    });

    Box::pin(receiver_stream)
}

pub(crate) async fn create_parquet_record_batch_stream_builder(
data_file_path: &str,
file_io: FileIO,
Expand Down Expand Up @@ -1423,6 +1454,7 @@ fn try_cast_literal(
Ok(Arc::new(Scalar::new(literal_array)))
}


#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
Expand Down
Loading