diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index b2df32a62e441..b58ef66d4cd2b 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -382,14 +382,14 @@
 //!
 //! Calling [`execute`] produces 1 or more partitions of data,
 //! as a [`SendableRecordBatchStream`], which implements a pull based execution
-//! API. Calling `.next().await` will incrementally compute and return the next
+//! API. Calling [`next()`]`.await` will incrementally compute and return the next
 //! [`RecordBatch`]. Balanced parallelism is achieved using [Volcano style]
 //! "Exchange" operations implemented by [`RepartitionExec`].
 //!
 //! While some recent research such as [Morsel-Driven Parallelism] describes challenges
 //! with the pull style Volcano execution model on NUMA architectures, in practice DataFusion achieves
-//! similar scalability as systems that use morsel driven approach such as DuckDB.
-//! See the [DataFusion paper submitted to SIGMOD] for more details.
+//! similar scalability to systems that use push-driven schedulers [such as DuckDB].
+//! See the [DataFusion paper in SIGMOD 2024] for more details.
 //!
 //! [`execute`]: physical_plan::ExecutionPlan::execute
 //! [`SendableRecordBatchStream`]: crate::physical_plan::SendableRecordBatchStream
@@ -403,22 +403,183 @@
 //! [`RepartitionExec`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
 //! [Volcano style]: https://w6113.github.io/files/papers/volcanoparallelism-89.pdf
 //! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
-//! [DataFusion paper submitted SIGMOD]: https://github.com/apache/datafusion/files/13874720/DataFusion_Query_Engine___SIGMOD_2024.pdf
+//! [DataFusion paper in SIGMOD 2024]: https://github.com/apache/datafusion/files/15149988/DataFusion_Query_Engine___SIGMOD_2024-FINAL-mk4.pdf
+//! [such as DuckDB]: https://github.com/duckdb/duckdb/issues/1583
 //! [implementors of `ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#implementors
 //!
-//! ## Thread Scheduling
+//! ## Streaming Execution
 //!
-//! DataFusion incrementally computes output from a [`SendableRecordBatchStream`]
-//! with `target_partitions` threads. Parallelism is implementing using multiple
-//! [Tokio] [`task`]s, which are executed by threads managed by a tokio Runtime.
-//! While tokio is most commonly used
-//! for asynchronous network I/O, its combination of an efficient, work-stealing
-//! scheduler, first class compiler support for automatic continuation generation,
-//! and exceptional performance makes it a compelling choice for CPU intensive
-//! applications as well. This is explained in more detail in [Using Rustlang’s Async Tokio
-//! Runtime for CPU-Bound Tasks].
+//! DataFusion is a "streaming" query engine, which means `ExecutionPlan`s incrementally
+//! read from their input(s) and compute output one [`RecordBatch`] at a time
+//! by continually polling [`SendableRecordBatchStream`]s. Output and
+//! intermediate `RecordBatch`s each have approximately `batch_size` rows,
+//! which amortizes the per-batch overhead of execution.
+//!
+//! Note that certain operations, sometimes called "pipeline breakers"
+//! (for example, full sorts or hash aggregations), are fundamentally non-streaming and
+//! must read their input fully before producing **any** output. As much as possible,
+//! other operators read a single [`RecordBatch`] from their input to produce a
+//! single `RecordBatch` as output.
+//!
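+//! Concretely, a consumer drives this streaming API by repeatedly calling
+//! [`next()`] on the stream until it is exhausted. The following sketch shows
+//! the pattern, using the same query examined in more detail below (the
+//! `data.csv` file and its `id` and `time` columns are hypothetical):
+//!
+//! ```rust,no_run
+//! use datafusion::prelude::*;
+//! use futures::StreamExt;
+//!
+//! # #[tokio::main]
+//! # async fn main() -> datafusion::error::Result<()> {
+//! let ctx = SessionContext::new();
+//! // register a (hypothetical) CSV file as the table "data"
+//! ctx.register_csv("data", "data.csv", CsvReadOptions::new()).await?;
+//! let df = ctx
+//!     .sql("SELECT date_trunc('month', time) FROM data WHERE id IN (10,20,30)")
+//!     .await?;
+//! // execute_stream returns a SendableRecordBatchStream
+//! let mut stream = df.execute_stream().await?;
+//! // each call to next() incrementally computes the next RecordBatch
+//! while let Some(batch) = stream.next().await {
+//!     println!("produced a batch with {} rows", batch?.num_rows());
+//! }
+//! # Ok(())
+//! # }
+//! ```
+//!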
+//! For example, given this SQL query:
+//!
+//! ```sql
+//! SELECT date_trunc('month', time) FROM data WHERE id IN (10,20,30);
+//! ```
+//!
+//! The diagram below shows the call sequence when a consumer calls [`next()`] to
+//! get the next `RecordBatch` of output. While it is possible that some
+//! steps run on different threads, typically Tokio will use the same thread
+//! that called `next()` to read from the input, apply the filter, and
+//! return the results without interleaving any other operations. This results
+//! in excellent cache locality, as the same CPU core that produces the data often
+//! consumes it immediately as well.
+//!
+//! ```text
+//!
+//! Step 3: FilterExec calls next()       Step 2: ProjectionExec calls
+//!         on input Stream                  next() on input Stream
+//!         ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─         ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
+//!                           │                                         Step 1: Consumer
+//!         ▼                            ▼                          │     calls next()
+//! ┏━━━━━━━━━━━━━━┓      ┏━━━━━┻━━━━━━━━━━━━━┓      ┏━━━━━━━━━━━━━━━━━━━━━━━━┓
+//! ┃              ┃      ┃                   ┃      ┃                        ◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
+//! ┃  DataSource  ┃      ┃                   ┃      ┃                        ┃
+//! ┃    (e.g.     ┃      ┃    FilterExec     ┃      ┃     ProjectionExec     ┃
+//! ┃ ParquetExec) ┃      ┃id IN (10, 20, 30) ┃      ┃date_trunc('month',time)┃
+//! ┃              ┃      ┃                   ┃      ┃                        ┣ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
+//! ┃              ┃      ┃                   ┃      ┃                        ┃
+//! ┗━━━━━━━━━━━━━━┛      ┗━━━━━━━━━━━┳━━━━━━━┛      ┗━━━━━━━━━━━━━━━━━━━━━━━━┛
+//!         │                  ▲      ▲           Step 6: ProjectionExec
+//!         │                  │      │           computes date_trunc into a
+//!         └ ─ ─ ─ ─ ─ ─ ─ ─ ─       └ ─ ─ ─ ─ ─ new RecordBatch returned
+//!               ┌─────────────────────┐      ┌─────────────┐ to the client
+//!               │     RecordBatch     │      │ RecordBatch │
+//!               └─────────────────────┘      └─────────────┘
+//!
+//!           Step 4: DataSource returns a        Step 5: FilterExec returns a new
+//!                single RecordBatch              RecordBatch with only matching rows
+//! ```
+//!
+//! [`next()`]: futures::StreamExt::next
+//!
+//! ## Thread Scheduling, CPU / IO Thread Pools, and [Tokio] [`Runtime`]s
+//!
+//! DataFusion automatically runs each plan with multiple CPU cores using
+//! a [Tokio] [`Runtime`] as a thread pool. While Tokio is most commonly used
+//! for asynchronous network I/O, the combination of an efficient, work-stealing
+//! scheduler and first-class compiler support for automatic continuation
+//! generation (`async`) also makes it a compelling choice for CPU-intensive
+//! applications, as explained in the [Using Rustlang’s Async Tokio
+//! Runtime for CPU-Bound Tasks] blog.
+//!
+//! The number of cores used is determined by the `target_partitions`
+//! configuration setting, which defaults to the number of CPU cores.
+//! During execution, DataFusion creates this many distinct `async` [`Stream`]s and
+//! this many distinct [Tokio] [`task`]s, which drive the `Stream`s
+//! using threads managed by the `Runtime`. Many DataFusion `Stream`s perform
+//! CPU-intensive processing.
+//!
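+//! For example, `target_partitions` can be set explicitly when creating a
+//! `SessionContext`. A minimal sketch (the value `4` is purely illustrative):
+//!
+//! ```rust
+//! use datafusion::prelude::*;
+//!
+//! // run each plan with (up to) 4 partitions, and therefore up to 4
+//! // concurrent CPU-intensive tasks, instead of one per CPU core
+//! let config = SessionConfig::new().with_target_partitions(4);
+//! let ctx = SessionContext::new_with_config(config);
+//! ```
+//!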
+//! Using `async` for CPU-intensive tasks makes it easy for [`TableProvider`]s
+//! to perform network I/O using standard Rust `async` during execution.
+//! However, this design also makes it very easy to mix CPU-intensive and
+//! latency-sensitive I/O work on the same thread pool ([`Runtime`]).
+//! Using the same (default) `Runtime` is convenient and often works well for
+//! initial development and processing local files, but it can lead to problems
+//! under load and/or when reading from network sources such as AWS S3.
+//!
+//! If your system does not fully utilize either the CPU or network bandwidth
+//! during execution, or you see significantly higher tail (e.g. p99) latencies
+//! responding to network requests, **it is likely you need to use a different
+//! `Runtime` for CPU-intensive DataFusion plans**. This effect can be especially
+//! pronounced when running several queries concurrently.
+//!
+//! As shown in the following figure, using the same `Runtime` for both
+//! CPU-intensive processing and network requests can introduce significant
+//! delays in responding to those network requests. Delays in processing network
+//! requests can lead network flow control to throttle the available
+//! bandwidth in response.
+//!
+//! ```text
+//!                                                           Legend
+//!
+//!                                                           ┏━━━━━━┓
+//!                            Processing network request     ┃      ┃  CPU-bound work
+//!                            is delayed due to processing   ┗━━━━━━┛
+//!                            CPU-bound work                 ┌─┐
+//!                                                           │ │       Network request
+//!                            ││                             └─┘       processing
+//!
+//!                            ││
+//!            ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
+//!                            │ │
+//!
+//!                            ▼ ▼
+//! ┌─────────────┐          ┌─┐┌─┐┏━━━━━━━━━━━━━━━━━━━┓┏━━━━━━━━━━━━━━━━━━━┓┌─┐
+//! │             │thread 1  │ ││ │┃     Decoding      ┃┃     Filtering     ┃│ │
+//! │             │          └─┘└─┘┗━━━━━━━━━━━━━━━━━━━┛┗━━━━━━━━━━━━━━━━━━━┛└─┘
+//! │             │          ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
+//! │Tokio Runtime│thread 2  ┃   Decoding   ┃     Filtering     ┃   Decoding   ┃  ...
+//! │(thread pool)│          ┗━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
+//! │             │   ...                             ...
+//! │             │          ┏━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓┌─┐ ┏━━━━━━━━━━━━━━┓
+//! │             │thread N  ┃     Decoding      ┃     Filtering     ┃│ │ ┃   Decoding   ┃
+//! └─────────────┘          ┗━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┛└─┘ ┗━━━━━━━━━━━━━━┛
+//!                ─────────────────────────────────────────────────────────────▶
+//!                                                                          time
+//! ```
+//!
+//! The bottleneck resulting from network throttling can be avoided
+//! by using separate [`Runtime`]s for the different types of work, as shown
+//! in the diagram below.
+//!
+//! ```text
+//!            A separate thread pool processes network      Legend
+//!            requests, reducing the latency for
+//!            processing each request                       ┏━━━━━━┓
+//!                                                          ┃      ┃  CPU-bound work
+//!                                        │                 ┗━━━━━━┛
+//!                                        │                 ┌─┐
+//!                           ┌ ─ ─ ─ ─ ─ ─┘                 │ │       Network request
+//!                           │  ┌ ─ ─ ─ ─ ┘                 └─┘       processing
+//!                           │  │
+//!                           ▼  ▼
+//! ┌─────────────┐          ┌─┐┌─┐┌─┐
+//! │             │thread 1  │ ││ ││ │
+//! │             │          └─┘└─┘└─┘
+//! │Tokio Runtime│   ...
+//! │(thread pool)│thread 2
+//! │             │
+//! │"IO Runtime" │   ...
+//! │             │          ┌─┐
+//! │             │thread N  │ │
+//! └─────────────┘          └─┘
+//!                ─────────────────────────────────────────────────────────────▶
+//!                                                                          time
+//!
+//! ┌─────────────┐          ┏━━━━━━━━━━━━━━━━━━━┓┏━━━━━━━━━━━━━━━━━━━┓
+//! │             │thread 1  ┃     Decoding      ┃┃     Filtering     ┃
+//! │             │          ┗━━━━━━━━━━━━━━━━━━━┛┗━━━━━━━━━━━━━━━━━━━┛
+//! │Tokio Runtime│          ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
+//! │(thread pool)│thread 2  ┃   Decoding   ┃     Filtering     ┃   Decoding   ┃  ...
+//! │             │          ┗━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
+//! │ CPU Runtime │   ...                             ...
+//! │             │          ┏━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
+//! │             │thread N  ┃     Decoding      ┃     Filtering     ┃   Decoding   ┃
+//! └─────────────┘          ┗━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
+//!                ─────────────────────────────────────────────────────────────▶
+//!                                                                          time
+//! ```
+//!
+//! Note that DataFusion does not use [`tokio::task::spawn_blocking`] for
+//! CPU-bound work, because `spawn_blocking` is designed for blocking **IO**,
+//! not for CPU-bound tasks. Among other challenges, spawned blocking tasks
+//! can't yield while waiting for input (they can't call `await`), so they
+//! can't be used to limit the number of concurrent CPU-bound tasks or to
+//! keep the processing pipeline on the same core.
 //!
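+//! One way to separate the two kinds of work is to hand-roll a second
+//! `Runtime` dedicated to CPU-bound plan execution, and send completed
+//! `RecordBatch`es back to the IO `Runtime` over a channel. DataFusion does
+//! not provide this wiring out of the box; the following is a simplified
+//! sketch of the pattern (the thread name, channel size, and query are
+//! purely illustrative):
+//!
+//! ```rust,no_run
+//! use datafusion::error::Result;
+//! use datafusion::prelude::*;
+//! use futures::StreamExt;
+//! use tokio::runtime::Builder;
+//!
+//! fn main() -> Result<()> {
+//!     // one Runtime for latency-sensitive work such as network IO ...
+//!     let io_runtime = Builder::new_multi_thread().enable_all().build()?;
+//!     // ... and a second, dedicated Runtime for CPU-bound plan execution
+//!     let cpu_runtime = Builder::new_multi_thread()
+//!         .thread_name("datafusion-cpu")
+//!         .build()?;
+//!
+//!     let (tx, mut rx) = tokio::sync::mpsc::channel(2);
+//!
+//!     // drive the plan to completion on the CPU Runtime, sending each
+//!     // completed RecordBatch back over the channel
+//!     cpu_runtime.spawn(async move {
+//!         let ctx = SessionContext::new();
+//!         let df = ctx.sql("SELECT 1").await.expect("error planning query");
+//!         let mut stream = df.execute_stream().await.expect("error executing plan");
+//!         while let Some(batch) = stream.next().await {
+//!             if tx.send(batch).await.is_err() {
+//!                 break; // the receiver hung up
+//!             }
+//!         }
+//!     });
+//!
+//!     // the IO Runtime never runs CPU-bound work: it only receives
+//!     // finished batches, so it remains responsive under load
+//!     io_runtime.block_on(async move {
+//!         while let Some(batch) = rx.recv().await {
+//!             println!("received a batch with {} rows", batch?.num_rows());
+//!         }
+//!         Ok(())
+//!     })
+//! }
+//! ```
+//!
+//! Production systems commonly wrap this cross-runtime plumbing in a small
+//! "dedicated executor" style helper rather than writing it inline per query.
+//!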
 //! [Tokio]: https://tokio.rs
+//! [`Runtime`]: tokio::runtime::Runtime
 //! [`task`]: tokio::task
 //! [Using Rustlang’s Async Tokio Runtime for CPU-Bound Tasks]: https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
 //!