Fix single-threaded bottleneck in parquet file stream processing#1684

Closed
vustef wants to merge 1 commit into apache:main from vustef:vs-scale-read-polls

Conversation


@vustef vustef commented Sep 17, 2025

Which issue does this PR close?

  • Closes #.

What changes are included in this PR?

Problem

The iceberg-rust arrow reader had a critical performance bottleneck where
CPU-intensive parquet operations (decompression, decoding) were running
on a single thread, despite multiple files being processed concurrently.

Root cause: While try_buffer_unordered() enabled concurrent file
processing, the try_flatten_unordered() was polling each file's
ArrowRecordBatchStream sequentially on the same thread. This meant that
the CPU-heavy ParquetRecordBatchStream::poll_next() operations - which
perform parquet decompression and decoding - were all bottlenecked on a
single thread.
EDIT: On closer inspection, the crate as a whole doesn't make much use of multiple threads, since it never spawns tasks. It does use futures (in streams) that are processed concurrently, but for the most part those futures all run on a single thread. Some of the details below may therefore be inaccurate, although the performance gain and experiments from this change still stand.

Impact: Multi-core systems were severely underutilized when processing
multiple parquet files, as evidenced by flamegraphs showing
single-threaded execution in the critical parquet processing path.
Here's a sample flamegraph:
[flamegraph image]

Solution

Modified the arrow reader to ensure each file's parquet stream processing
runs on dedicated tokio tasks:

  1. Added stream_to_receiver() method: Wraps each ArrowRecordBatchStream
    in a tokio::spawn() task that handles the CPU-intensive polling. Consideration: perhaps we should use spawn_blocking
  2. Updated process_file_scan_task(): Each file's stream is now wrapped
    with stream_to_receiver() before being returned
  3. Preserved existing concurrency patterns: Uses the same
    try_buffer_unordered() + try_flatten_unordered() flow, but ensures the
    underlying polling scales across threads
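The stream_to_receiver() idea in step 1 can be sketched with std threads and channels (the PR itself uses tokio::spawn on an async stream; the function name, the plain iterator standing in for ArrowRecordBatchStream, and the `i64` "batch" type are all simplifications here): each file's batches are drained on a dedicated thread, so CPU-heavy decode work for different files runs in parallel while the caller just receives finished batches from a channel.

```rust
use std::sync::mpsc;
use std::thread;

// Simplified stand-in for the PR's stream_to_receiver(): move the
// CPU-heavy per-batch work for one "file" onto its own thread and hand
// the caller a receiver that yields finished batches.
fn stream_to_receiver<I>(batches: I) -> mpsc::Receiver<i64>
where
    I: IntoIterator<Item = i64> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for batch in batches {
            // Stand-in for parquet decompression/decoding.
            let decoded = batch * 2;
            if tx.send(decoded).is_err() {
                break; // receiver dropped: stop early
            }
        }
    });
    rx
}

fn main() {
    // Two "files", each polled on its own thread concurrently.
    let rx_a = stream_to_receiver(vec![1, 2, 3]);
    let rx_b = stream_to_receiver(vec![10, 20]);
    let a: Vec<i64> = rx_a.iter().collect();
    let b: Vec<i64> = rx_b.iter().collect();
    println!("{a:?} {b:?}");
}
```

The consumer's view is unchanged (it still just pulls batches in order per file), which is why the existing try_buffer_unordered() + try_flatten_unordered() flow could be kept as-is.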

How it fixes the issue

  • Before: all parquet streams were polled sequentially → single-threaded CPU
    bottleneck. A TPCH lineitem scan on SF100 on an r7i.8xlarge EC2 instance took 56s.
    Network throughput: [network throughput screenshot]
  • After: each parquet stream is polled in its own tokio::spawn() task → true
    multi-threaded CPU utilization. The TPCH lineitem scan now takes 20s.
    Network throughput: [network throughput screenshot]

The key insight was that try_flatten_unordered() only provides async
concurrency for the futures that create streams, but the actual stream
polling (where the CPU work happens) was still sequential. By moving each
stream's polling to its own tokio task via stream_to_receiver(), we
ensure that multiple files' CPU-intensive parquet operations can run
simultaneously on different threads.

Additional considerations:

[ ] Should we use spawn_blocking in stream_to_receiver? I think we should, since we're trying to scale CPU-heavy work here (IO-heavy work already works well, even with a single thread).

[ ] Should we add a new configuration method for ScanBuilder to be able to limit this new concurrency? Or is it fine that it's the same as concurrency_limit_data_files?
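On the second question, a hypothetical sketch of what a dedicated ScanBuilder knob could look like (all names here are illustrative, not the real iceberg-rust API): keep concurrency_limit_data_files as the default, with an optional override for the new per-stream tasks.

```rust
// Illustrative sketch only: these names are hypothetical, not the
// actual iceberg-rust ScanBuilder API.
struct ScanBuilder {
    concurrency_limit_data_files: usize,
    // None means "reuse concurrency_limit_data_files".
    stream_poll_concurrency: Option<usize>,
}

impl ScanBuilder {
    fn new() -> Self {
        Self {
            concurrency_limit_data_files: 4,
            stream_poll_concurrency: None,
        }
    }

    // Optional dedicated limit for per-stream polling tasks.
    fn with_stream_poll_concurrency(mut self, n: usize) -> Self {
        self.stream_poll_concurrency = Some(n);
        self
    }

    fn effective_poll_concurrency(&self) -> usize {
        self.stream_poll_concurrency
            .unwrap_or(self.concurrency_limit_data_files)
    }
}

fn main() {
    let default = ScanBuilder::new();
    let tuned = ScanBuilder::new().with_stream_poll_concurrency(8);
    println!(
        "{} {}",
        default.effective_poll_concurrency(),
        tuned.effective_poll_concurrency()
    );
}
```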

Are these changes tested?

```rust
).await?;

// Convert the stream to run on separate tasks for parallel processing
Ok::<_, crate::Error>(Self::stream_to_receiver(file_stream))
```

@vustef vustef Sep 18, 2025


Instead of this, perhaps we can just wrap this whole block in a task and rely on try_buffer_unordered to fetch from the stream in parallel, as per https://www.datawill.io/posts/rust-streams-concurrency/. I'm just not sure whether that would parallelize only tasks: FileScanTaskStream, or the nested file_stream: ArrowRecordBatchStream too. Perhaps the concurrency parameter on try_flatten_unordered would parallelize the nested stream.

@liurenjie1024
Contributor

Thanks @vustef for this PR. The reason we currently keep the read-to-arrow method simple is that a parallel scan depends on things like a neutral runtime, memory management, etc. This is beyond the scope of the core crate. If you want to read an iceberg table locally in parallel, we recommend using the datafusion integration.


vustef commented Sep 26, 2025

> Thanks @vustef for this PR. The reason we currently keep the read-to-arrow method simple is that a parallel scan depends on things like a neutral runtime, memory management, etc. This is beyond the scope of the core crate. If you want to read an iceberg table locally in parallel, we recommend using the datafusion integration.

Thanks @liurenjie1024. I'm happy to drop this PR and use something else. Given that the datafusion integration still uses the to_arrow method (ref here and here), this tells me that perhaps there's no API low-level enough for crates outside the core crate to parallelize this work, because the work has already happened inside the core crate by the time items are put into the stream.

Is that right? Or do you think it'd be possible to parallelize things on the client side of the core crate?

If not, would you be willing to open up the core crate API so that the units of parallelism can be scheduled on different threads by the users of the core crate?

@liurenjie1024
Contributor

> Is that right? Or do you think it'd be possible to parallelize things on the client side of the core crate?

In fact, it is not right. The desired flow is like the following:

  1. (core crate) TableScan.plan_files splits the scan into several pieces; each FileScanTask contains several parts, and each part is a portion of a large parquet data file.
  2. (external engine) The external engine parallelizes scanning by running FileScanTasks in parallel. For example, in Spark each FileScanTask is assigned to one task.
  3. (core crate) The ArrowReader accepts one FileScanTask and reads it into an arrow data stream. This happens in the core crate because iceberg-specific concerns like type promotion and field matching by id should be handled by iceberg.
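The three-step flow above can be sketched as follows (all types and functions here are illustrative stand-ins, not the real iceberg-rust API, and std threads stand in for the external engine's scheduler): the core crate plans the tasks, the engine fans them out, and a per-task reader turns each one into batches.

```rust
use std::thread;

// Illustrative stand-in for a planned unit of work (not the real type).
#[derive(Clone)]
struct FileScanTask {
    file: &'static str,
    row_count: usize,
}

// Step 1 (core crate): stand-in for TableScan.plan_files.
fn plan_files() -> Vec<FileScanTask> {
    vec![
        FileScanTask { file: "a.parquet", row_count: 3 },
        FileScanTask { file: "b.parquet", row_count: 2 },
    ]
}

// Step 3 (core crate): stand-in for ArrowReader reading one task into
// batches; iceberg-specific concerns (type promotion, field matching
// by id) would live here.
fn read_task(task: &FileScanTask) -> Vec<String> {
    (0..task.row_count)
        .map(|i| format!("{}:{}", task.file, i))
        .collect()
}

fn main() {
    // Step 2 (external engine): one unit of parallelism per FileScanTask.
    let handles: Vec<_> = plan_files()
        .into_iter()
        .map(|task| thread::spawn(move || read_task(&task)))
        .collect();
    let total: usize = handles
        .into_iter()
        .map(|h| h.join().unwrap().len())
        .sum();
    println!("{total}");
}
```

The parallelism lives entirely in step 2, outside the core crate, which is the division of responsibility this comment describes.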


vustef commented Sep 29, 2025

> Is that right? Or do you think it'd be possible to parallelize things on the client side of the core crate?
>
> In fact, it is not right. The desired flow is like the following:
>
> 1. (core crate) TableScan.plan_files splits the scan into several pieces; each FileScanTask contains several parts, and each part is a portion of a large parquet data file.
> 2. (external engine) The external engine parallelizes scanning by running FileScanTasks in parallel. For example, in Spark each FileScanTask is assigned to one task.
> 3. (core crate) The ArrowReader accepts one FileScanTask and reads it into an arrow data stream. This happens in the core crate because iceberg-specific concerns like type promotion and field matching by id should be handled by iceberg.

Thank you @liurenjie1024 for the guidance. I think that makes sense to me at the moment. Closing this PR.

