From 243f4cbe39225b3412fa488f8943920ee6a2cd14 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 22 Nov 2024 11:58:46 -0500 Subject: [PATCH 01/12] Add example for using a separate threadpool for CPU bound work --- datafusion-examples/examples/thread_pools.rs | 213 ++++ datafusion/physical-plan/Cargo.toml | 4 + .../physical-plan/src/cross_rt_stream.rs | 408 ++++++++ .../physical-plan/src/dedicated_executor.rs | 988 ++++++++++++++++++ .../physical-plan/src/io_object_store.rs | 138 +++ datafusion/physical-plan/src/lib.rs | 5 + 6 files changed, 1756 insertions(+) create mode 100644 datafusion-examples/examples/thread_pools.rs create mode 100644 datafusion/physical-plan/src/cross_rt_stream.rs create mode 100644 datafusion/physical-plan/src/dedicated_executor.rs create mode 100644 datafusion/physical-plan/src/io_object_store.rs diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/thread_pools.rs new file mode 100644 index 0000000000000..9ed47dec93b53 --- /dev/null +++ b/datafusion-examples/examples/thread_pools.rs @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This example shows how to use a separate thread pool (tokio [`Runtime`])) to +//! 
run the CPU intensive parts of DataFusion plans. +//! +//! Running DataFusion plans that perform I/O, such as reading parquet files +//! directly from remote object storage (e.g. AWS S3) without care will result +//! in running CPU intensive jobs on the same thread pool, which can lead to the +//! issues described in the [Architecture section] such as throttled bandwidth +//! due to congestion control and increased latencies for processing network +//! messages. +use arrow::util::pretty::pretty_format_batches; +use datafusion::error::Result; +use datafusion::execution::SendableRecordBatchStream; +use datafusion::physical_plan::DedicatedExecutor; +use datafusion::prelude::*; +use futures::stream::StreamExt; +use object_store::http::HttpBuilder; +use object_store::ObjectStore; +use std::sync::Arc; +use url::Url; + +/// Normally, you don't need to worry about the details of the tokio runtime, +/// but for this example it is important to understand how the [`Runtime`]s work. +/// +/// There is a "current" runtime that is installed in a thread local variable +/// that is used by the `tokio::spawn` function. +/// +/// The `#[tokio::main]` macro actually creates a [`Runtime`] and installs it as +/// as the "current" runtime (on which any `async` futures, streams and tasks +/// are run). +#[tokio::main] +async fn main() -> Result<()> { + // The first two examples only do local file IO. Enable the URL table so we + // can select directly from filenames in SQL. + let ctx = SessionContext::new().enable_url_table(); + let sql = format!( + "SELECT * FROM '{}/alltypes_plain.parquet'", + datafusion::test_util::parquet_test_data() + ); + + // Run the same query on the same runtime. Note that calling `await` here + // will effectively run the future (in this case the `async` function) on + // the current runtime. + same_runtime(&ctx, &sql).await?; + + // Run the same query on a different runtime. 
Note that we are still calling + // `await` here, so the `async` function still runs on the current runtime. + // We use the `DedicatedExecutor` to run the query on a different runtime. + different_runtime_basic(ctx, sql).await?; + + // Run the same query on a different runtime including remote IO + different_runtime_advanced().await?; + + Ok(()) +} + +/// Run queries directly on the current tokio `Runtime` +/// +/// This is how most examples in DataFusion are written and it works well for +/// development and local query processing. +async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> { + // Calling .sql is an async function as it may also do network + // I/O, for example to contact a remote catalog or do an object store LIST + let df = ctx.sql(sql).await?; + + // While many examples call `collect` or `show()`, those methods buffer the + // results. Internally, DataFusion generates output one `RecordBatch` at a time + + // Calling `execute_stream` on a DataFrame returns a + // `SendableRecordBatchStream`. Depending on the plan, this may also do + // network I/O, for example to begin reading a parquet file from a remote + // object store as well. It is also possible that this function call spawns + // tasks that begin doing CPU intensive work as well + let mut stream: SendableRecordBatchStream = df.execute_stream().await?; + + // Calling `next()` drives the plan, producing new `RecordBatch`es using the + // current runtime (and typically also the current thread). + // + // Perhaps somewhat non-obvious, calling the `next()` function often will + // result in other tasks being spawned on the current runtime (e.g. for + // `RepartitionExec` to read data from each of its input partitions in + // parallel). + // + // Executing the plan like this results in all CPU intensive work + // running on the same (default) Runtime.
+ while let Some(batch) = stream.next().await { + println!("{}", pretty_format_batches(&[batch?]).unwrap()); + } + Ok(()) +} + +/// Demonstrates how to run queries on a **different** runtime than the current one +/// +/// See [`different_runtime_advanced`] to see how you should run DataFusion +/// queries from a network server or when processing data from a remote object +/// store. +async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()> { + // First, we need a new runtime, which we can create with the tokio builder + // however, since we are already in the context of another runtime + // (installed by #[tokio::main]) we create a new thread for the runtime + let dedicated_executor = DedicatedExecutor::builder().build(); + + // Now, we can simply run the query on the new runtime + dedicated_executor + .spawn(async move { + // this runs on the different threadpool + let df = ctx.sql(&sql).await?; + let mut stream: SendableRecordBatchStream = df.execute_stream().await?; + + // Calling `next()` to drive the plan on the different threadpool + while let Some(batch) = stream.next().await { + println!("{}", pretty_format_batches(&[batch?]).unwrap()); + } + Ok(()) as Result<()> + }) + // even though we are `await`ing here on the "current" pool, internally + // the DedicatedExecutor runs the work on the separate threadpool + // and the `await` simply notifies us when the work is done + .await??; + + // When done with a DedicatedExecutor, it should be shut down cleanly to give + // any outstanding tasks a chance to clean up + dedicated_executor.join().await; + + Ok(()) +} + +/// Demonstrates how to run queries on a different runtime than the current one +/// and how to handle IO operations.
+async fn different_runtime_advanced() -> Result<()> { + // In this example, we will configure access to a remote object store + // over the network during the plan + + let ctx = SessionContext::new().enable_url_table(); + + // setup http object store + let base_url = Url::parse("https://github.com").unwrap(); + let http_store: Arc = + Arc::new(HttpBuilder::new().with_url(base_url.clone()).build()?); + + let dedicated_executor = DedicatedExecutor::builder().build(); + + // By default, the object store will use the current runtime for IO operations + // if we use a dedicated executor to run the plan, the eventual object store requests will also use the + // dedicated executor's runtime + // + // To avoid this, we can wrap the object store to run on the "IO" runtime + // + // (if we don't do this the example fails with an error like + // + // ctx.register_object_store(&base_url, http_store); + // A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers. + + let http_store = dedicated_executor.wrap_object_store(http_store); + + // Tell datafusion about processing http:// urls with this wrapped object store + ctx.register_object_store(&base_url, http_store); + + // Plan (and execute) the query on the dedicated runtime + let stream = dedicated_executor + .spawn(async move { + // Plan / execute the query + let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv"; + let df = ctx + .sql(&format!("SELECT c1,c2,c3 FROM '{url}' LIMIT 5")) + .await?; + let stream: SendableRecordBatchStream = df.execute_stream().await?; + + Ok(stream) as Result<_> + }).await??; + + // We have now planned the query on the dedicated runtime, Yay! but we still need to + // drive the stream (aka call `next()` to get the results). + + // However, as mentioned above, calling `next()` resolves the Stream (and + // any work it may do) on a thread in the current (default) runtime. 
+ // + // To drive the stream on the dedicated runtime, we need to wrap it using the + // `DedicatedExecutor::run_sendable_record_batch_stream` function + // + // Note if you don't do this you will likely see a panic about `No IO runtime registered.` + // because the threads in the current (main) tokio runtime have not had the IO runtime + // installed + let mut stream = dedicated_executor.run_sendable_record_batch_stream(stream); + + // Note you can run other streams on the DedicatedExecutor as well using the + // `DedicatedExecutor::run_stream` function. This is helpful, for example, if you + // need to do non-trivial CPU work on the results of the stream (e.g. + // calling a FlightDataEncoder to convert the results to flight to send it + // over the network). + + while let Some(batch) = stream.next().await { + println!("{}", pretty_format_batches(&[batch?]).unwrap()); + } + + Ok(()) +} diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index bb0e21fdfd158..a1917985eb018 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -65,6 +65,10 @@ parking_lot = { workspace = true } pin-project-lite = "^0.2.7" rand = { workspace = true } tokio = { workspace = true } +# todo figure out if we need to use tokio_stream / could use record batch receiver stream +tokio-stream = {version = "0.1"} +object_store = { workspace = true } + [dev-dependencies] criterion = { version = "0.5", features = ["async_futures"] } diff --git a/datafusion/physical-plan/src/cross_rt_stream.rs b/datafusion/physical-plan/src/cross_rt_stream.rs new file mode 100644 index 0000000000000..df997d2c80f8f --- /dev/null +++ b/datafusion/physical-plan/src/cross_rt_stream.rs @@ -0,0 +1,408 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [CrossRtStream] runs [`Stream`]s in a different tokio runtime. + +//! Tooling to pull [`Stream`]s from one tokio runtime into another. +//! +//! Originally from [InfluxDB 3.0] +//! [InfluxDB 3.0]:https://github.com/influxdata/influxdb3_core/blob/6fcbb004232738d55655f32f4ad2385523d10696/iox_query/src/exec/cross_rt_stream.rs#L1 +//! +//! This is critical so that CPU heavy loads are not run on the same runtime as IO handling + +// TODO: figure out where to put this code (not in physical plan...) +// maybe its own crate or maybe in common-runtime ?? + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use crate::dedicated_executor::JobError; +use crate::DedicatedExecutor; +use datafusion_common::DataFusionError; +use futures::{future::BoxFuture, ready, FutureExt, Stream, StreamExt}; +use tokio::sync::mpsc::{channel, Sender}; +use tokio_stream::wrappers::ReceiverStream; + +/// [`Stream`] that is calculated by one tokio runtime but can safely be pulled +/// from another w/o stalling (esp. when the calculating runtime is +/// CPU-blocked). +/// +/// See XXX in the architecture documentation for more details +pub struct CrossRtStream { + /// Future that drives the underlying stream. + /// + /// This is actually wrapped into [`DedicatedExecutor::spawn`] so it can be safely polled by the receiving runtime.
+ driver: BoxFuture<'static, ()>, + + /// Flags if the [driver](Self::driver) returned [`Poll::Ready`]. + driver_ready: bool, + + /// Receiving stream. + /// + /// This one can be polled from the receiving runtime. + inner: ReceiverStream, + + /// Signals that [`inner`](Self::inner) finished. + /// + /// Note that we must also drive the [driver](Self::driver) even when the stream finished to allow proper state clean-ups. + inner_done: bool, +} + +impl std::fmt::Debug for CrossRtStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CrossRtStream") + .field("driver", &"...") + .field("driver_ready", &self.driver_ready) + .field("inner", &"...") + .field("inner_done", &self.inner_done) + .finish() + } +} + +impl CrossRtStream { + /// Create new stream by producing a future that sends its state to the given [`Sender`]. + /// + /// This is an internal method. `f` should always be wrapped into [`DedicatedExecutor::spawn`] (except for testing purposes). + fn new_with_tx(f: F) -> Self + where + F: FnOnce(Sender) -> Fut, + Fut: Future + Send + 'static, + { + let (tx, rx) = channel(1); + let driver = f(tx).boxed(); + Self { + driver, + driver_ready: false, + inner: ReceiverStream::new(rx), + inner_done: false, + } + } +} + +impl CrossRtStream> +where + X: Send + 'static, + E: Send + 'static, +{ + /// Create new stream based on an existing stream that transports [`Result`]s. + /// + /// Also receives an executor that actually executes the underlying stream as well as a converter that converts + /// [`executor::JobError`] to the error type of the stream (so we can send potential crashes/panics). 
+ pub fn new_with_error_stream( + stream: S, + exec: DedicatedExecutor, + converter: C, + ) -> Self + where + S: Stream> + Send + 'static, + C: Fn(JobError) -> E + Send + 'static, + { + Self::new_with_tx(|tx| { + // future to be run in the other runtime + let tx_captured = tx.clone(); + let fut = async move { + tokio::pin!(stream); + + while let Some(res) = stream.next().await { + if tx_captured.send(res).await.is_err() { + // receiver gone + return; + } + } + }; + + // future for this runtime (likely the tokio/tonic/web driver) + async move { + if let Err(e) = exec.spawn(fut).await { + let e = converter(e); + + // last message, so we don't care about the receiver side + tx.send(Err(e)).await.ok(); + } + } + }) + } +} + +impl CrossRtStream> +where + X: Send + 'static, +{ + /// Create new stream based on an existing stream that transports [`Result`]s w/ [`DataFusionError`]s. + /// + /// Also receives an executor that actually executes the underlying stream. + pub fn new_with_df_error_stream(stream: S, exec: DedicatedExecutor) -> Self + where + S: Stream> + Send + 'static, + { + Self::new_with_error_stream(stream, exec, |e| { + DataFusionError::Context( + "Join Error (panic)".to_string(), + Box::new(DataFusionError::External(e.into())), + ) + }) + } +} + +impl Stream for CrossRtStream { + type Item = T; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = &mut *self; + + if !this.driver_ready { + let res = this.driver.poll_unpin(cx); + + if res.is_ready() { + this.driver_ready = true; + } + } + + if this.inner_done { + if this.driver_ready { + Poll::Ready(None) + } else { + Poll::Pending + } + } else { + match ready!(this.inner.poll_next_unpin(cx)) { + None => { + this.inner_done = true; + if this.driver_ready { + Poll::Ready(None) + } else { + Poll::Pending + } + } + Some(x) => Poll::Ready(Some(x)), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::DedicatedExecutorBuilder; + use 
datafusion_execution::dedicated_executor::JobError; + use std::sync::OnceLock; + use std::{sync::Arc, time::Duration}; + use tokio::runtime::{Handle, RuntimeFlavor}; + + // Don't create many different runtimes for testing to avoid thread creation/description overhead + fn testing_executor() -> DedicatedExecutor { + TESTING_EXECUTOR + .get_or_init(|| { + DedicatedExecutorBuilder::new() + .with_name("cross_rt_stream") + .build() + }) + .clone() + } + static TESTING_EXECUTOR: OnceLock = OnceLock::new(); + + #[tokio::test] + async fn test_async_block() { + let exec = testing_executor(); + let barrier1 = Arc::new(tokio::sync::Barrier::new(2)); + let barrier1_captured = Arc::clone(&barrier1); + let barrier2 = Arc::new(tokio::sync::Barrier::new(2)); + let barrier2_captured = Arc::clone(&barrier2); + let mut stream = CrossRtStream::>::new_with_error_stream( + futures::stream::once(async move { + barrier1_captured.wait().await; + barrier2_captured.wait().await; + Ok(1) + }), + exec, + std::convert::identity, + ); + + let mut f = stream.next(); + + ensure_pending(&mut f).await; + barrier1.wait().await; + ensure_pending(&mut f).await; + barrier2.wait().await; + + let res = f.await.expect("streamed data"); + assert_eq!(res.unwrap(), 1); + } + + #[tokio::test] + async fn test_sync_block() { + // This would deadlock if the stream payload would run within the same tokio runtime. To prevent any cheating + // (e.g. 
via channels), we ensure that the current runtime only has a single thread: + assert_eq!( + RuntimeFlavor::CurrentThread, + Handle::current().runtime_flavor() + ); + + let exec = testing_executor(); + let barrier1 = Arc::new(std::sync::Barrier::new(2)); + let barrier1_captured = Arc::clone(&barrier1); + let barrier2 = Arc::new(std::sync::Barrier::new(2)); + let barrier2_captured = Arc::clone(&barrier2); + let mut stream = CrossRtStream::>::new_with_error_stream( + futures::stream::once(async move { + barrier1_captured.wait(); + barrier2_captured.wait(); + Ok(1) + }), + exec, + std::convert::identity, + ); + + let mut f = stream.next(); + + ensure_pending(&mut f).await; + barrier1.wait(); + ensure_pending(&mut f).await; + barrier2.wait(); + + let res = f.await.expect("streamed data"); + assert_eq!(res.unwrap(), 1); + } + + #[tokio::test] + async fn test_panic() { + let exec = testing_executor(); + let mut stream = CrossRtStream::>::new_with_error_stream( + futures::stream::once(async { panic!("foo") }), + exec, + std::convert::identity, + ); + + let e = stream + .next() + .await + .expect("stream not finished") + .unwrap_err(); + assert_eq!(e.to_string(), "Panic: foo"); + + let none = stream.next().await; + assert!(none.is_none()); + } + + #[tokio::test] + async fn test_cancel_future() { + let exec = testing_executor(); + let barrier1 = Arc::new(tokio::sync::Barrier::new(2)); + let barrier1_captured = Arc::clone(&barrier1); + let barrier2 = Arc::new(tokio::sync::Barrier::new(2)); + let barrier2_captured = Arc::clone(&barrier2); + let mut stream = CrossRtStream::>::new_with_error_stream( + futures::stream::once(async move { + barrier1_captured.wait().await; + barrier2_captured.wait().await; + Ok(1) + }), + exec, + std::convert::identity, + ); + + let mut f = stream.next(); + + // fire up stream + ensure_pending(&mut f).await; + barrier1.wait().await; + + // cancel + drop(f); + + barrier2.wait().await; + let res = stream.next().await.expect("streamed data"); +
assert_eq!(res.unwrap(), 1); + } + + #[tokio::test] + async fn test_cancel_stream() { + let exec = testing_executor(); + let barrier = Arc::new(tokio::sync::Barrier::new(2)); + let barrier_captured = Arc::clone(&barrier); + let mut stream = CrossRtStream::>::new_with_error_stream( + futures::stream::once(async move { + barrier_captured.wait().await; + + // block forever + futures::future::pending::<()>().await; + + // keep barrier Arc alive + drop(barrier_captured); + unreachable!() + }), + exec, + std::convert::identity, + ); + + let mut f = stream.next(); + + // fire up stream + ensure_pending(&mut f).await; + barrier.wait().await; + assert_eq!(Arc::strong_count(&barrier), 2); + + // cancel + drop(f); + drop(stream); + + tokio::time::timeout(Duration::from_secs(5), async { + loop { + if Arc::strong_count(&barrier) == 1 { + return; + } + + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_inner_future_driven_to_completion_after_stream_ready() { + let barrier = Arc::new(tokio::sync::Barrier::new(2)); + let barrier_captured = Arc::clone(&barrier); + + let mut stream = CrossRtStream::::new_with_tx(|tx| async move { + tx.send(1).await.ok(); + drop(tx); + barrier_captured.wait().await; + }); + + let handle = tokio::spawn(async move { barrier.wait().await }); + + assert_eq!(stream.next().await, Some(1)); + handle.await.unwrap(); + } + + async fn ensure_pending(f: &mut F) + where + F: Future + Send + Unpin, + { + tokio::select! { + _ = tokio::time::sleep(Duration::from_millis(100)) => {} + _ = f => {panic!("not pending")}, + } + } +} diff --git a/datafusion/physical-plan/src/dedicated_executor.rs b/datafusion/physical-plan/src/dedicated_executor.rs new file mode 100644 index 0000000000000..51f14a3513ee6 --- /dev/null +++ b/datafusion/physical-plan/src/dedicated_executor.rs @@ -0,0 +1,988 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [DedicatedExecutor] for running CPU-bound tasks on a separate tokio runtime. +//! +//! Originally from [InfluxDB 3.0] +//! +//! [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor +use crate::cross_rt_stream::CrossRtStream; +use crate::io_object_store::IoObjectStore; +use crate::stream::RecordBatchStreamAdapter; +use crate::SendableRecordBatchStream; +use datafusion_common::DataFusionError; +use futures::{ + future::{BoxFuture, Shared}, + Future, FutureExt, Stream, TryFutureExt, +}; +use log::{info, warn}; +use object_store::ObjectStore; +use parking_lot::RwLock; +use std::cell::RefCell; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{fmt::Display, sync::Arc, time::Duration}; +use tokio::runtime::Builder; +use tokio::task::JoinHandle; +use tokio::{ + runtime::Handle, + sync::{oneshot::error::RecvError, Notify}, + task::JoinSet, +}; + +impl From for DedicatedExecutorBuilder { + fn from(value: Builder) -> Self { + Self::new_from_builder(value) + } +} + +/// Manages a separate tokio [`Runtime`] (thread pool) for executing tasks such +/// as DataFusion `ExecutionPlans`. +/// +/// See [`DedicatedExecutorBuilder`] for creating a new instance. 
+/// +/// A `DedicatedExecutor` makes it easier to avoid running IO and CPU bound +/// tasks on the same threadpool by running futures (and any `tasks` that are +/// `tokio::task::spawned` by them) on a separate tokio [`Executor`]. +/// +/// DedicatedExecutor can be `clone`ed and all clones share the same threadpool. +/// +/// TODO add note about `io_thread` +/// +/// TODO: things we use in InfluxData +/// 1. Testing mode (so we can make a bunch of DedicatedExecutors) -- maybe we can wrap DedicatedExecutors like IOxDedicatedExecutors +/// 2. Some sort of hook to install tokio metrics +/// +/// When [`DedicatedExecutorBuilder::build`] is called, the "current" tokio +/// runtime will be marked for IO, via [`register_io_runtime`] by all threads +/// spawned by the executor. Any I/O done by threads in this +/// [`DedicatedExecutor`] should use [`spawn_io`], which will run them on the I/O +/// runtime. +/// +/// ## TODO examples +/// +/// # Background +/// +/// Tokio has the notion of the "current" runtime, which runs the current future +/// and any tasks spawned by it. Typically, this is the runtime created by +/// `tokio::main` and is used for the main application logic and I/O handling +/// +/// For CPU bound work, such as DataFusion plan execution, it is important to +/// run on a separate thread pool to avoid blocking the I/O handling for extended +/// periods of time in order to avoid long poll latencies (which decreases the +/// throughput of small requests under concurrent load). +/// +/// # IO Scheduling +/// +/// I/O, such as network calls, should not be performed on the runtime managed +/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running +/// CPU tasks will not be preempted and can therefore starve servicing of other +/// tasks. This manifests in long poll-latencies, where a task is ready to run +/// but isn't being scheduled to run.
For CPU-bound work this isn't a problem as +/// there is no external party waiting on a response, however, for I/O tasks, +/// long poll latencies can prevent timely servicing of IO, which can have a +/// significant detrimental effect. +/// +/// # Details +/// +/// The worker thread priority is set to low so that such tasks do +/// not starve other more important tasks (such as answering health checks) +/// +/// Follows the example from stack overflow and spawns a new +/// thread to install a Tokio runtime "context" +/// +/// +/// # Troubleshooting +/// +/// ## "No IO runtime registered. Call `register_io_runtime`/`register_current_runtime_for_io` in current thread!" +/// +/// This means that IO was attempted on a tokio runtime that was not registered +/// for IO. One solution is to run the task using [DedicatedExecutor::spawn]. +/// +/// ## "Cannot drop a runtime in a context where blocking is not allowed" +/// +/// If you try to use this structure from an async context you will see something like +/// thread 'test_builder_plan' panicked at 'Cannot +/// drop a runtime in a context where blocking is not allowed. This +/// happens when a runtime is dropped from within an asynchronous +/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21 +/// +#[derive(Clone, Debug)] +pub struct DedicatedExecutor { + state: Arc>, +} + +impl DedicatedExecutor { + /// Create a new builder to create a [`DedicatedExecutor`] + pub fn builder() -> DedicatedExecutorBuilder { + DedicatedExecutorBuilder::new() + } + + /// Runs the specified [`Future`] (and any tasks it spawns) on the thread + /// pool managed by this `DedicatedExecutor`. + /// + /// # Notes + /// + /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when + /// it is dropped. Thus, you need to ensure the returned future lives until it + /// completes (call `await`) or you wish to cancel it.
+ /// + /// All spawned tasks are added to the tokio executor immediately and + /// compete for the threadpool's resources. + pub fn spawn(&self, task: T) -> impl Future> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let handle = { + let state = self.state.read(); + state.handle.clone() + }; + + let Some(handle) = handle else { + return futures::future::err(JobError::WorkerGone).boxed(); + }; + + // use JoinSet to implement "cancel on drop" + let mut join_set = JoinSet::new(); + join_set.spawn_on(task, &handle); + async move { + join_set + .join_next() + .await + .expect("just spawned task") + .map_err(|e| match e.try_into_panic() { + Ok(e) => { + let s = if let Some(s) = e.downcast_ref::() { + s.clone() + } else if let Some(s) = e.downcast_ref::<&str>() { + s.to_string() + } else { + "unknown internal error".to_string() + }; + + JobError::Panic { msg: s } + } + Err(_) => JobError::WorkerGone, + }) + } + .boxed() + } + + /// Signals shutdown of this executor and any clones + pub fn shutdown(&self) { + // clear the handle so `spawn` reports `WorkerGone` from now on, and + // notify `start_shutdown` so the executor runtime begins to shut down + let mut state = self.state.write(); + state.handle = None; + state.start_shutdown.notify_one(); + } + + /// Stops all subsequent task executions, and waits for the worker + /// thread to complete. Note this will shutdown all clones of this + /// `DedicatedExecutor` as well. + /// + /// Only the first call to `join` will actually wait for the + /// executing thread to complete. All other calls to join will + /// complete immediately. + /// + /// # Panic / Drop + /// [`DedicatedExecutor`] implements shutdown on [`Drop`]. You should just use this behavior and NOT call + /// [`join`](Self::join) manually during [`Drop`] or panics because this might lead to another panic, see + /// .
+ pub async fn join(&self) { + self.shutdown(); + + // get the completion future while the lock is held + let handle = { + let state = self.state.read(); + state.completed_shutdown.clone() + }; + + // wait for completion while not holding the lock to avoid + // deadlocks + handle.await.expect("Thread died?") + } + + /// Returns an ObjectStore instance that will always perform I/O work on the + /// IO_RUNTIME. + /// + /// Note that this object store will only work correctly if run on this + /// dedicated executor. If you try and use it on another executor, it will + /// panic with a "No IO runtime registered" type error. + pub fn wrap_object_store( + &self, + object_store: Arc, + ) -> Arc { + Arc::new(IoObjectStore::new(self.clone(), object_store)) + } + + /// Returns a SendableRecordBatchStream that will run on this executor's thread pool + pub fn run_sendable_record_batch_stream( + &self, + stream: SendableRecordBatchStream, + ) -> SendableRecordBatchStream { + let schema = stream.schema(); + let cross_rt_stream = + CrossRtStream::new_with_df_error_stream(stream, self.clone()); + Box::pin(RecordBatchStreamAdapter::new(schema, cross_rt_stream)) + } + + /// Runs a stream that produces Results on the executor's thread pool + /// + /// The stream must produce Results so that any errors on the dedicated + /// executor (like a panic or shutdown) can be communicated back.
+ /// + /// # Arguments: + /// - stream: the stream to run on this dedicated executor + /// - converter: a function that converts a [`JobError`] to the error type of the stream + pub fn run_stream( + &self, + stream: S, + converter: C, + ) -> impl Stream> + Send + 'static + where + X: Send + 'static, + E: Send + 'static, + S: Stream> + Send + 'static, + C: Fn(JobError) -> E + Send + 'static, + { + CrossRtStream::new_with_error_stream(stream, self.clone(), converter) + } + + /// Registers `handle` as the IO runtime for this thread + /// + /// This sets a thread-local variable + /// + /// See [`spawn_io`](Self::spawn_io) for more details + pub fn register_io_runtime(handle: Option) { + IO_RUNTIME.set(handle) + } + + /// Registers the "current" `handle` as the IO runtime for this thread + /// + /// This is useful for testing purposes. + /// + /// # Panics + /// Panics if no current handle is available (aka not running in a tokio runtime) + pub fn register_current_runtime_for_io() { + Self::register_io_runtime(Some(Handle::current())) + } + + /// Spawns `fut` on the IO runtime registered for the current thread via + /// [`register_io_runtime`](Self::register_io_runtime) and awaits its output + /// + /// # Panics + /// Panics if no IO runtime has been [registered](Self::register_io_runtime) for the current thread. + pub async fn spawn_io(fut: Fut) -> Fut::Output + where + Fut: Future + Send + 'static, + Fut::Output: Send, + { + let h = IO_RUNTIME.with_borrow(|h| h.clone()).expect( + "No IO runtime registered. If you hit this panic, it likely \ + means a DataFusion plan or other CPU bound work is running on the \ + a tokio threadpool used for IO. Try spawning the work using \ + `DedicatedExecutor::spawn` or for tests `DedicatedExecutor::register_current_runtime_for_io`", + ); + DropGuard(h.spawn(fut)).await + } +} + +thread_local!
{ + /// Tokio runtime `Handle` for doing network (I/O) operations, see [`spawn_io`] + pub static IO_RUNTIME: RefCell> = const { RefCell::new(None) }; +} + +struct DropGuard(JoinHandle); +impl Drop for DropGuard { + fn drop(&mut self) { + self.0.abort() + } +} + +impl Future for DropGuard { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready(match std::task::ready!(self.0.poll_unpin(cx)) { + Ok(v) => v, + Err(e) if e.is_cancelled() => panic!("IO runtime was shut down"), + Err(e) => std::panic::resume_unwind(e.into_panic()), + }) + } +} + +/// Runs futures (and any `tasks` that are `tokio::task::spawned` by +/// them) on a separate tokio Executor. +/// +/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for +/// [`start_shutdown`](Self::start_shutdown) and signals the completion via +/// [`completed_shutdown`](Self::completed_shutdown) (for which is owns the sender side). +#[derive(Debug)] +struct State { + /// Runtime handle. + /// + /// This is `None` when the executor is shutting down. + handle: Option, + + /// If notified, the executor tokio runtime will begin to shutdown. + /// + /// We could implement this by checking `handle.is_none()` in regular intervals but requires regular wake-ups and + /// locking of the state. Just using a proper async signal is nicer. + start_shutdown: Arc, + + /// Receiver side indicating that shutdown is complete. + completed_shutdown: Shared>>>, + + /// The inner thread that can be used to join during drop. + thread: Option>, +} + +/// IMPORTANT: Implement `Drop` for [`State`], NOT for [`DedicatedExecutor`], +/// because the executor can be cloned and clones share their inner state. 
+impl Drop for State { + fn drop(&mut self) { + if self.handle.is_some() { + warn!("DedicatedExecutor dropped without calling shutdown()"); + self.handle = None; + self.start_shutdown.notify_one(); + } + + // do NOT poll the shared future if we are panicking due to https://github.com/rust-lang/futures-rs/issues/2575 + if !std::thread::panicking() + && self.completed_shutdown.clone().now_or_never().is_none() + { + warn!("DedicatedExecutor dropped without waiting for worker termination",); + } + + // join thread but don't care about the results + self.thread.take().expect("not dropped yet").join().ok(); + } +} + +const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 5); + +/// Potential error returned when polling [`DedicatedExecutor::spawn`]. +#[derive(Debug)] +pub enum JobError { + WorkerGone, + Panic { msg: String }, +} + +impl Display for JobError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + JobError::WorkerGone => { + write!(f, "Worker thread gone, executor was likely shut down") + } + JobError::Panic { msg } => write!(f, "Panic: {}", msg), + } + } +} + +impl std::error::Error for JobError {} + +/// Builder for [`DedicatedExecutor`] +pub struct DedicatedExecutorBuilder { + /// Name given to all execution threads. Defaults to "DedicatedExecutor" + name: String, + /// Builder for tokio runtime. Defaults to multi-threaded builder + runtime_builder: Builder, +} + +impl From for DataFusionError { + fn from(value: JobError) -> Self { + DataFusionError::External(Box::new(value)) + .context("JobError from DedicatedExecutor") + } +} + +impl DedicatedExecutorBuilder { + /// Create a new `DedicatedExecutorBuilder` with default values + /// + /// Note that by default this `DedicatedExecutor` will not be able to + /// perform network I/O. 
+ pub fn new() -> Self { + Self { + name: String::from("DedicatedExecutor"), + runtime_builder: Builder::new_multi_thread(), + } + } + + /// Create a new `DedicatedExecutorBuilder` from a pre-existing tokio + /// runtime [`Builder`]. + /// + /// This method permits customizing the tokio [`Executor`] used for the + /// [`DedicatedExecutor`] + pub fn new_from_builder(runtime_builder: Builder) -> Self { + Self { + name: String::from("DedicatedExecutor"), + runtime_builder, + } + } + + /// Set the name of the dedicated executor (appear in the names of each thread). + /// + /// Defaults to "DedicatedExecutor" + pub fn with_name(mut self, name: impl Into) -> Self { + self.name = name.into(); + self + } + + /// Set the number of worker threads. Defaults to the tokio default (the + /// number of virtual CPUs) + pub fn with_worker_threads(mut self, num_threads: usize) -> Self { + self.runtime_builder.worker_threads(num_threads); + self + } + + /// Creates a new `DedicatedExecutor` with a dedicated tokio + /// executor that is separate from the thread pool created via + /// `[tokio::main]` or similar. + /// + /// Note: If [`DedicatedExecutorBuilder::build`] is called from an existing + /// tokio runtime, it will assume that the existing runtime should be used + /// for I/O. + /// + /// See the documentation on [`DedicatedExecutor`] for more details. + pub fn build(self) -> DedicatedExecutor { + let Self { + name, + runtime_builder, + } = self; + + let notify_shutdown = Arc::new(Notify::new()); + let notify_shutdown_captured = Arc::clone(¬ify_shutdown); + + let (tx_shutdown, rx_shutdown) = tokio::sync::oneshot::channel(); + let (tx_handle, rx_handle) = std::sync::mpsc::channel(); + + let io_handle = Handle::try_current().ok(); + let thread = std::thread::Builder::new() + .name(format!("{name} driver")) + .spawn(move || { + // also register the IO runtime for the current thread, since it might be used as well (esp. 
for the + // current thread RT) + DedicatedExecutor::register_io_runtime(io_handle.clone()); + + info!("Creating DedicatedExecutor",); + + let mut runtime_builder = runtime_builder; + let runtime = runtime_builder + .on_thread_start(move || { + DedicatedExecutor::register_io_runtime(io_handle.clone()) + }) + .build() + .expect("Creating tokio runtime"); + + runtime.block_on(async move { + // Enable the "notified" receiver BEFORE sending the runtime handle back to the constructor thread + // (i.e .the one that runs `new`) to avoid the potential (but unlikely) race that the shutdown is + // started right after the constructor finishes and the new runtime calls + // `notify_shutdown_captured.notified().await`. + // + // Tokio provides an API for that by calling `enable` on the `notified` future (this requires + // pinning though). + let shutdown = notify_shutdown_captured.notified(); + let mut shutdown = std::pin::pin!(shutdown); + shutdown.as_mut().enable(); + + if tx_handle.send(Handle::current()).is_err() { + return; + } + shutdown.await; + }); + + runtime.shutdown_timeout(SHUTDOWN_TIMEOUT); + + // send shutdown "done" signal + tx_shutdown.send(()).ok(); + }) + .expect("executor setup"); + + let handle = rx_handle.recv().expect("driver started"); + + let state = State { + handle: Some(handle), + start_shutdown: notify_shutdown, + completed_shutdown: rx_shutdown.map_err(Arc::new).boxed().shared(), + thread: Some(thread), + }; + + DedicatedExecutor { + state: Arc::new(RwLock::new(state)), + } + } +} + +#[cfg(test)] +#[allow(unused_qualifications)] +mod tests { + use super::*; + use std::{ + panic::panic_any, + sync::{Arc, Barrier}, + time::Duration, + }; + use tokio::{net::TcpListener, sync::Barrier as AsyncBarrier}; + + /// Wait for the barrier and then return `result` + async fn do_work(result: usize, barrier: Arc) -> usize { + barrier.wait(); + result + } + + /// Wait for the barrier and then return `result` + async fn do_work_async(result: usize, barrier: Arc) -> 
usize { + barrier.wait().await; + result + } + + fn exec() -> DedicatedExecutor { + exec_with_threads(1) + } + + fn exec2() -> DedicatedExecutor { + exec_with_threads(2) + } + + fn exec_with_threads(threads: usize) -> DedicatedExecutor { + let mut runtime_builder = Builder::new_multi_thread(); + runtime_builder.worker_threads(threads); + runtime_builder.enable_all(); + + DedicatedExecutorBuilder::from(runtime_builder) + .with_name("Test DedicatedExecutor") + .build() + } + + async fn test_io_runtime_multi_thread_impl(dedicated: DedicatedExecutor) { + let io_runtime_id = std::thread::current().id(); + dedicated + .spawn(async move { + let dedicated_id = std::thread::current().id(); + let spawned = + DedicatedExecutor::spawn_io( + async move { std::thread::current().id() }, + ) + .await; + + assert_ne!(dedicated_id, spawned); + assert_eq!(io_runtime_id, spawned); + }) + .await + .unwrap(); + } + + #[tokio::test] + async fn basic() { + let barrier = Arc::new(Barrier::new(2)); + + let exec = exec(); + let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier))); + + // Note the dedicated task will never complete if it runs on + // the main tokio thread (as this test is not using the + // 'multithreaded' version of the executor and the call to + // barrier.wait actually blocks the tokio thread) + barrier.wait(); + + // should be able to get the result + assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn basic_clone() { + let barrier = Arc::new(Barrier::new(2)); + let exec = exec(); + // Run task on clone should work fine + let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier))); + barrier.wait(); + assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn drop_empty_exec() { + exec(); + } + + #[tokio::test] + async fn drop_clone() { + let barrier = Arc::new(Barrier::new(2)); + let exec = exec(); + + drop(exec.clone()); + + let task = 
exec.spawn(do_work(42, Arc::clone(&barrier))); + barrier.wait(); + assert_eq!(task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + #[should_panic(expected = "foo")] + async fn just_panic() { + struct S(DedicatedExecutor); + + impl Drop for S { + fn drop(&mut self) { + self.0.join().now_or_never(); + } + } + + let exec = exec(); + let _s = S(exec); + + // this must not lead to a double-panic and SIGILL + panic!("foo") + } + + #[tokio::test] + async fn multi_task() { + let barrier = Arc::new(Barrier::new(3)); + + // make an executor with two threads + let exec = exec2(); + let dedicated_task1 = exec.spawn(do_work(11, Arc::clone(&barrier))); + let dedicated_task2 = exec.spawn(do_work(42, Arc::clone(&barrier))); + + // block main thread until completion of other two tasks + barrier.wait(); + + // should be able to get the result + assert_eq!(dedicated_task1.await.unwrap(), 11); + assert_eq!(dedicated_task2.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn tokio_spawn() { + let exec = exec2(); + + // spawn a task that spawns to other tasks and ensure they run on the dedicated + // executor + let dedicated_task = exec.spawn(async move { + // spawn separate tasks + let t1 = tokio::task::spawn(async { 25usize }); + t1.await.unwrap() + }); + + // Validate the inner task ran to completion (aka it did not panic) + assert_eq!(dedicated_task.await.unwrap(), 25); + + exec.join().await; + } + + #[tokio::test] + async fn panic_on_executor_str() { + let exec = exec(); + let dedicated_task = exec.spawn(async move { + if true { + panic!("At the disco, on the dedicated task scheduler"); + } else { + 42 + } + }); + + // should not be able to get the result + let err = dedicated_task.await.unwrap_err(); + assert_eq!( + err.to_string(), + "Panic: At the disco, on the dedicated task scheduler", + ); + + exec.join().await; + } + + #[tokio::test] + async fn panic_on_executor_string() { + let exec = exec(); + let dedicated_task = 
exec.spawn(async move { + if true { + panic!("{} {}", 1, 2); + } else { + 42 + } + }); + + // should not be able to get the result + let err = dedicated_task.await.unwrap_err(); + assert_eq!(err.to_string(), "Panic: 1 2",); + + exec.join().await; + } + + #[tokio::test] + async fn panic_on_executor_other() { + let exec = exec(); + let dedicated_task = exec.spawn(async move { + if true { + panic_any(1) + } else { + 42 + } + }); + + // should not be able to get the result + let err = dedicated_task.await.unwrap_err(); + assert_eq!(err.to_string(), "Panic: unknown internal error",); + + exec.join().await; + } + + #[tokio::test] + async fn executor_shutdown_while_task_running() { + let barrier_1 = Arc::new(Barrier::new(2)); + let captured_1 = Arc::clone(&barrier_1); + let barrier_2 = Arc::new(Barrier::new(2)); + let captured_2 = Arc::clone(&barrier_2); + + let exec = exec(); + let dedicated_task = exec.spawn(async move { + captured_1.wait(); + do_work(42, captured_2).await + }); + barrier_1.wait(); + + exec.shutdown(); + // block main thread until completion of the outstanding task + barrier_2.wait(); + + // task should complete successfully + assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn executor_submit_task_after_shutdown() { + let exec = exec(); + + // Simulate trying to submit tasks once executor has shutdown + exec.shutdown(); + let dedicated_task = exec.spawn(async { 11 }); + + // task should complete, but return an error + let err = dedicated_task.await.unwrap_err(); + assert_eq!( + err.to_string(), + "Worker thread gone, executor was likely shut down" + ); + + exec.join().await; + } + + #[tokio::test] + async fn executor_submit_task_after_clone_shutdown() { + let exec = exec(); + + // shutdown the clone (but not the exec) + exec.clone().join().await; + + // Simulate trying to submit tasks once executor has shutdown + let dedicated_task = exec.spawn(async { 11 }); + + // task should complete, but return an 
error + let err = dedicated_task.await.unwrap_err(); + assert_eq!( + err.to_string(), + "Worker thread gone, executor was likely shut down" + ); + + exec.join().await; + } + + #[tokio::test] + async fn executor_join() { + let exec = exec(); + // test it doesn't hang + exec.join().await; + } + + #[tokio::test] + async fn executor_join2() { + let exec = exec(); + // test it doesn't hang + exec.join().await; + exec.join().await; + } + + #[tokio::test] + #[allow(clippy::redundant_clone)] + async fn executor_clone_join() { + let exec = exec(); + // test it doesn't hang + exec.clone().join().await; + exec.clone().join().await; + exec.join().await; + } + + #[tokio::test] + async fn drop_receiver() { + // create empty executor + let exec = exec(); + + // create first blocked task + let barrier1_pre = Arc::new(AsyncBarrier::new(2)); + let barrier1_pre_captured = Arc::clone(&barrier1_pre); + let barrier1_post = Arc::new(AsyncBarrier::new(2)); + let barrier1_post_captured = Arc::clone(&barrier1_post); + let dedicated_task1 = exec.spawn(async move { + barrier1_pre_captured.wait().await; + do_work_async(11, barrier1_post_captured).await + }); + barrier1_pre.wait().await; + + // create second blocked task + let barrier2_pre = Arc::new(AsyncBarrier::new(2)); + let barrier2_pre_captured = Arc::clone(&barrier2_pre); + let barrier2_post = Arc::new(AsyncBarrier::new(2)); + let barrier2_post_captured = Arc::clone(&barrier2_post); + let dedicated_task2 = exec.spawn(async move { + barrier2_pre_captured.wait().await; + do_work_async(22, barrier2_post_captured).await + }); + barrier2_pre.wait().await; + + // cancel task + drop(dedicated_task1); + + // cancelation might take a short while + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if Arc::strong_count(&barrier1_post) == 1 { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await + } + }) + .await + .unwrap(); + + // unblock other task + barrier2_post.wait().await; + 
assert_eq!(dedicated_task2.await.unwrap(), 22); + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if Arc::strong_count(&barrier2_post) == 1 { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await + } + }) + .await + .unwrap(); + + exec.join().await; + } + + #[tokio::test] + async fn test_io_runtime_multi_thread() { + let mut runtime_builder = tokio::runtime::Builder::new_multi_thread(); + runtime_builder.worker_threads(1); + + let dedicated = DedicatedExecutorBuilder::from(runtime_builder) + .with_name("Test DedicatedExecutor") + .build(); + test_io_runtime_multi_thread_impl(dedicated).await; + } + + #[tokio::test] + async fn test_io_runtime_current_thread() { + let runtime_builder = tokio::runtime::Builder::new_current_thread(); + + let dedicated = DedicatedExecutorBuilder::new_from_builder(runtime_builder) + .with_name("Test DedicatedExecutor") + .build(); + test_io_runtime_multi_thread_impl(dedicated).await; + } + + #[tokio::test] + async fn test_that_default_executor_prevents_io() { + let exec = DedicatedExecutorBuilder::new().build(); + + let io_disabled = exec + .spawn(async move { + // the only way (I've found) to test if IO is enabled is to use it and observer if tokio panics + TcpListener::bind("127.0.0.1:0") + .catch_unwind() + .await + .is_err() + }) + .await + .unwrap(); + + assert!(io_disabled) + } + + #[tokio::test] + async fn test_happy_path() { + let rt_io = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + let io_thread_id = rt_io + .spawn(async move { std::thread::current().id() }) + .await + .unwrap(); + let parent_thread_id = std::thread::current().id(); + assert_ne!(io_thread_id, parent_thread_id); + + DedicatedExecutor::register_io_runtime(Some(rt_io.handle().clone())); + + let measured_thread_id = + DedicatedExecutor::spawn_io(async move { std::thread::current().id() }).await; + assert_eq!(measured_thread_id, io_thread_id); + + 
rt_io.shutdown_background(); + } + + #[tokio::test] + #[should_panic(expected = "IO runtime registered")] + async fn test_panic_if_no_runtime_registered() { + DedicatedExecutor::spawn_io(futures::future::ready(())).await; + } + + #[tokio::test] + #[should_panic(expected = "IO runtime was shut down")] + async fn test_io_runtime_down() { + let rt_io = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + DedicatedExecutor::register_io_runtime(Some(rt_io.handle().clone())); + + tokio::task::spawn_blocking(move || { + rt_io.shutdown_timeout(Duration::from_secs(1)); + }) + .await + .unwrap(); + + DedicatedExecutor::spawn_io(futures::future::ready(())).await; + } +} diff --git a/datafusion/physical-plan/src/io_object_store.rs b/datafusion/physical-plan/src/io_object_store.rs new file mode 100644 index 0000000000000..d4331f1beb27d --- /dev/null +++ b/datafusion/physical-plan/src/io_object_store.rs @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use crate::dedicated_executor::JobError; +use crate::DedicatedExecutor; +use async_trait::async_trait; +use futures::stream::BoxStream; +use object_store::{ + path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, +}; + +/// 'ObjectStore' that wraps an inner `ObjectStore` and wraps all the underlying +/// methods with [`DedicatedExecutor::spawn_io`] so that they are run on the Tokio Runtime +/// dedicated to doing IO. +/// +/// +/// +#[derive(Debug)] +pub struct IoObjectStore { + executor: DedicatedExecutor, + inner: Arc, +} + +impl IoObjectStore { + pub fn new(executor: DedicatedExecutor, object_store: Arc) -> Self { + Self { + executor, + inner: object_store, + } + } +} + +impl std::fmt::Display for IoObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "IoObjectStore") + } +} + +fn convert_error(e: JobError) -> object_store::Error { + object_store::Error::Generic { + store: "IoObjectStore", + source: Box::new(e), + } +} + +#[async_trait] +impl ObjectStore for IoObjectStore { + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + let location = location.clone(); + let store = Arc::clone(&self.inner); + DedicatedExecutor::spawn_io( + async move { store.get_opts(&location, options).await }, + ) + .await + } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + let from = from.clone(); + let to = to.clone(); + let store = Arc::clone(&self.inner); + DedicatedExecutor::spawn_io(async move { store.copy(&from, &to).await }).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + let from = from.clone(); + let to = to.clone(); + let store = Arc::clone(&self.inner); + DedicatedExecutor::spawn_io(async move { store.copy(&from, &to).await }).await + } + + async fn delete(&self, location: &Path) -> Result<()> { + let location = 
location.clone(); + let store = Arc::clone(&self.inner); + DedicatedExecutor::spawn_io(async move { store.delete(&location).await }).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + // run the inner list on the dedicated executor + let inner_stream = self.inner.list(prefix); + + inner_stream + //self.executor.run_stream(inner_stream, convert_error) + // .boxed() + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + let prefix = prefix.cloned(); + let store = Arc::clone(&self.inner); + DedicatedExecutor::spawn_io(async move { + store.list_with_delimiter(prefix.as_ref()).await + }) + .await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOpts, + ) -> Result> { + let location = location.clone(); + let store = Arc::clone(&self.inner); + DedicatedExecutor::spawn_io(async move { + store.put_multipart_opts(&location, opts).await + }) + .await + } + + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result { + let location = location.clone(); + let store = Arc::clone(&self.inner); + DedicatedExecutor::spawn_io(async move { + store.put_opts(&location, payload, opts).await + }) + .await + } +} diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 845a74eaea48e..2e9adbdca2111 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -86,6 +86,11 @@ pub mod udaf { pub use datafusion_physical_expr::aggregate::AggregateFunctionExpr; } +pub use dedicated_executor::{DedicatedExecutor, DedicatedExecutorBuilder}; + pub mod coalesce; +mod cross_rt_stream; +pub mod dedicated_executor; +mod io_object_store; #[cfg(test)] pub mod test; From 4b5372d8512881cabbb18fa3ad79f9f63ee80133 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 3 Dec 2024 14:55:54 -0500 Subject: [PATCH 02/12] move executor to runtime --- datafusion-examples/examples/thread_pools.rs | 6 +-- 
datafusion/execution/Cargo.toml | 7 +++ .../src/cross_rt_stream.rs | 6 +-- .../src/dedicated_executor.rs | 0 .../src/io_object_store.rs | 8 ++- datafusion/execution/src/lib.rs | 5 +- datafusion/execution/src/stream.rs | 53 ++++++++++++++++++ datafusion/physical-plan/src/lib.rs | 5 -- datafusion/physical-plan/src/stream.rs | 54 ++----------------- 9 files changed, 75 insertions(+), 69 deletions(-) rename datafusion/{physical-plan => execution}/src/cross_rt_stream.rs (98%) rename datafusion/{physical-plan => execution}/src/dedicated_executor.rs (100%) rename datafusion/{physical-plan => execution}/src/io_object_store.rs (95%) diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/thread_pools.rs index 9ed47dec93b53..4c908250715c1 100644 --- a/datafusion-examples/examples/thread_pools.rs +++ b/datafusion-examples/examples/thread_pools.rs @@ -26,8 +26,8 @@ //! messages. use arrow::util::pretty::pretty_format_batches; use datafusion::error::Result; +use datafusion::execution::dedicated_executor::DedicatedExecutor; use datafusion::execution::SendableRecordBatchStream; -use datafusion::physical_plan::DedicatedExecutor; use datafusion::prelude::*; use futures::stream::StreamExt; use object_store::http::HttpBuilder; @@ -156,7 +156,7 @@ async fn different_runtime_advanced() -> Result<()> { let dedicated_executor = DedicatedExecutor::builder().build(); - // By default, the object store will use the current runtime for IO operations + // By default, the object store will use the "current runtime" for IO operations // if we use a dedicated executor to run the plan, the eventual object store requests will also use the // dedicated executor's runtime // @@ -167,7 +167,7 @@ async fn different_runtime_advanced() -> Result<()> { // ctx.register_object_store(&base_url, http_store); // A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers. 
- let http_store = dedicated_executor.wrap_object_store(http_store); + //let http_store = dedicated_executor.wrap_object_store(http_store); // Tell datafusion about processing http:// urls with this wrapped object store ctx.register_object_store(&base_url, http_store); diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index ac1eb729b6ff8..72cf722c88aca 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -37,6 +37,7 @@ path = "src/lib.rs" [dependencies] arrow = { workspace = true } +async-trait = {workspace = true} chrono = { workspace = true } dashmap = { workspace = true } datafusion-common = { workspace = true, default-features = true } @@ -45,6 +46,12 @@ futures = { workspace = true } log = { workspace = true } object_store = { workspace = true } parking_lot = { workspace = true } +pin-project-lite = "^0.2.7" rand = { workspace = true } tempfile = { workspace = true } + url = { workspace = true } +tokio = { workspace = true } +# todo figure out if we need to use tokio_stream / could use record batch receiver stream +tokio-stream = {version = "0.1"} + diff --git a/datafusion/physical-plan/src/cross_rt_stream.rs b/datafusion/execution/src/cross_rt_stream.rs similarity index 98% rename from datafusion/physical-plan/src/cross_rt_stream.rs rename to datafusion/execution/src/cross_rt_stream.rs index df997d2c80f8f..7f5adb99ed7da 100644 --- a/datafusion/physical-plan/src/cross_rt_stream.rs +++ b/datafusion/execution/src/cross_rt_stream.rs @@ -33,8 +33,7 @@ use std::{ task::{Context, Poll}, }; -use crate::dedicated_executor::JobError; -use crate::DedicatedExecutor; +use crate::dedicated_executor::{DedicatedExecutor, JobError}; use datafusion_common::DataFusionError; use futures::{future::BoxFuture, ready, FutureExt, Stream, StreamExt}; use tokio::sync::mpsc::{channel, Sender}; @@ -203,8 +202,7 @@ impl Stream for CrossRtStream { #[cfg(test)] mod tests { use super::*; - use crate::DedicatedExecutorBuilder; - 
use datafusion_execution::dedicated_executor::JobError; + use crate::dedicated_executor::DedicatedExecutorBuilder; use std::sync::OnceLock; use std::{sync::Arc, time::Duration}; use tokio::runtime::{Handle, RuntimeFlavor}; diff --git a/datafusion/physical-plan/src/dedicated_executor.rs b/datafusion/execution/src/dedicated_executor.rs similarity index 100% rename from datafusion/physical-plan/src/dedicated_executor.rs rename to datafusion/execution/src/dedicated_executor.rs diff --git a/datafusion/physical-plan/src/io_object_store.rs b/datafusion/execution/src/io_object_store.rs similarity index 95% rename from datafusion/physical-plan/src/io_object_store.rs rename to datafusion/execution/src/io_object_store.rs index d4331f1beb27d..9f7c996540149 100644 --- a/datafusion/physical-plan/src/io_object_store.rs +++ b/datafusion/execution/src/io_object_store.rs @@ -17,8 +17,7 @@ use std::sync::Arc; -use crate::dedicated_executor::JobError; -use crate::DedicatedExecutor; +use crate::dedicated_executor::{DedicatedExecutor, JobError}; use async_trait::async_trait; use futures::stream::BoxStream; use object_store::{ @@ -34,14 +33,13 @@ use object_store::{ /// #[derive(Debug)] pub struct IoObjectStore { - executor: DedicatedExecutor, + //executor: DedicatedExecutor, inner: Arc, } impl IoObjectStore { - pub fn new(executor: DedicatedExecutor, object_store: Arc) -> Self { + pub fn new(_executor: DedicatedExecutor, object_store: Arc) -> Self { Self { - executor, inner: object_store, } } diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index 317bd3203ab1b..a981c178eacad 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -22,11 +22,14 @@ pub mod cache; pub mod config; +pub mod cross_rt_stream; +pub mod dedicated_executor; pub mod disk_manager; +pub mod io_object_store; pub mod memory_pool; pub mod object_store; pub mod runtime_env; -mod stream; +pub mod stream; mod task; pub mod registry { diff --git 
a/datafusion/execution/src/stream.rs b/datafusion/execution/src/stream.rs index f3eb7b77e03cc..9dcf6eba90595 100644 --- a/datafusion/execution/src/stream.rs +++ b/datafusion/execution/src/stream.rs @@ -18,7 +18,10 @@ use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::Result; use futures::Stream; +use pin_project_lite::pin_project; use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; /// Trait for types that stream [RecordBatch] /// @@ -51,3 +54,53 @@ pub trait RecordBatchStream: Stream> { /// [`Stream`]s there is no mechanism to prevent callers polling so returning /// `Ready(None)` is recommended. pub type SendableRecordBatchStream = Pin>; + +pin_project! { + /// Combines a [`Stream`] with a [`SchemaRef`] implementing + /// [`RecordBatchStream`] for the combination + pub struct RecordBatchStreamAdapter { + schema: SchemaRef, + + #[pin] + stream: S, + } +} + +impl RecordBatchStreamAdapter { + /// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream + pub fn new(schema: SchemaRef, stream: S) -> Self { + Self { schema, stream } + } +} + +impl std::fmt::Debug for RecordBatchStreamAdapter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RecordBatchStreamAdapter") + .field("schema", &self.schema) + .finish() + } +} + +impl Stream for RecordBatchStreamAdapter +where + S: Stream>, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().stream.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} + +impl RecordBatchStream for RecordBatchStreamAdapter +where + S: Stream>, +{ + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 2e9adbdca2111..845a74eaea48e 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ 
-86,11 +86,6 @@ pub mod udaf { pub use datafusion_physical_expr::aggregate::AggregateFunctionExpr; } -pub use dedicated_executor::{DedicatedExecutor, DedicatedExecutorBuilder}; - pub mod coalesce; -mod cross_rt_stream; -pub mod dedicated_executor; -mod io_object_store; #[cfg(test)] pub mod test; diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index b3054299b7f73..0be003f393219 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -33,10 +33,12 @@ use datafusion_execution::TaskContext; use futures::stream::BoxStream; use futures::{Future, Stream, StreamExt}; use log::debug; -use pin_project_lite::pin_project; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task::JoinSet; +// Backwards compatibility +pub use datafusion_execution::stream::RecordBatchStreamAdapter; + /// Creates a stream from a collection of producing tasks, routing panics to the stream. /// /// Note that this is similar to [`ReceiverStream` from tokio-stream], with the differences being: @@ -335,56 +337,6 @@ impl RecordBatchReceiverStream { } } -pin_project! 
{ - /// Combines a [`Stream`] with a [`SchemaRef`] implementing - /// [`RecordBatchStream`] for the combination - pub struct RecordBatchStreamAdapter { - schema: SchemaRef, - - #[pin] - stream: S, - } -} - -impl RecordBatchStreamAdapter { - /// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream - pub fn new(schema: SchemaRef, stream: S) -> Self { - Self { schema, stream } - } -} - -impl std::fmt::Debug for RecordBatchStreamAdapter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RecordBatchStreamAdapter") - .field("schema", &self.schema) - .finish() - } -} - -impl Stream for RecordBatchStreamAdapter -where - S: Stream>, -{ - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().stream.poll_next(cx) - } - - fn size_hint(&self) -> (usize, Option) { - self.stream.size_hint() - } -} - -impl RecordBatchStream for RecordBatchStreamAdapter -where - S: Stream>, -{ - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } -} - /// `EmptyRecordBatchStream` can be used to create a [`RecordBatchStream`] /// that will produce no results pub struct EmptyRecordBatchStream { From 6af773620dff6bacca71d144eb5b97d70073b937 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 5 Dec 2024 10:34:23 -0500 Subject: [PATCH 03/12] wire in external runtime env --- .../core/src/execution/session_state.rs | 13 ++++++ .../execution/src/dedicated_executor.rs | 4 +- datafusion/execution/src/runtime_env.rs | 42 +++++++++++++++++++ 3 files changed, 58 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 4ccad5ffd323d..08735a70a59c3 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -43,6 +43,7 @@ use datafusion_common::{ ResolvedTableReference, TableReference, }; use datafusion_execution::config::SessionConfig; +use 
datafusion_execution::dedicated_executor::{DedicatedExecutor, IO_RUNTIME}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_expr::execution_props::ExecutionProps; @@ -74,6 +75,7 @@ use std::any::Any; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; +use std::future::Future; use std::sync::Arc; use url::Url; use uuid::Uuid; @@ -768,6 +770,17 @@ impl SessionState { &self.runtime_env } + /// Spawn a future that will do IO operations. + /// + /// See [`RuntimeEnv::spawn_io`] for more details + pub async fn spawn_io(&self, fut: Fut) -> Fut::Output + where + Fut: Future + Send + 'static, + Fut::Output: Send, + { + self.runtime_env.spawn_io(fut).await + } + /// Return the execution properties pub fn execution_props(&self) -> &ExecutionProps { &self.execution_props diff --git a/datafusion/execution/src/dedicated_executor.rs b/datafusion/execution/src/dedicated_executor.rs index 51f14a3513ee6..1ff51b641d972 100644 --- a/datafusion/execution/src/dedicated_executor.rs +++ b/datafusion/execution/src/dedicated_executor.rs @@ -121,6 +121,8 @@ impl From for DedicatedExecutorBuilder { /// happens when a runtime is dropped from within an asynchronous /// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21 /// +/// TODO: make this an Arc<..> rather than an cloneable thing (to follow the smae +/// pattern as the rest of the system) #[derive(Clone, Debug)] pub struct DedicatedExecutor { state: Arc>, @@ -284,7 +286,7 @@ impl DedicatedExecutor { } /// Runs `fut` on the runtime registered by [`register_io_runtime`] if any, - /// otherwise awaits on the current thread + /// otherwise panics. /// /// # Panic /// Needs a IO runtime [registered](register_io_runtime). 
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 5420080efd3e3..0437cda46b8fc 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -27,8 +27,10 @@ use crate::{ }; use crate::cache::cache_manager::{CacheManager, CacheManagerConfig}; +use crate::dedicated_executor::DedicatedExecutor; use datafusion_common::{DataFusionError, Result}; use object_store::ObjectStore; +use std::future::Future; use std::path::PathBuf; use std::sync::Arc; use std::{ @@ -76,6 +78,8 @@ pub struct RuntimeEnv { pub cache_manager: Arc, /// Object Store Registry pub object_store_registry: Arc, + /// Optional dedicated executor + pub dedicated_executor: Option, } impl Debug for RuntimeEnv { @@ -155,6 +159,30 @@ impl RuntimeEnv { .get_store(url.as_ref()) .map_err(DataFusionError::from) } + + /// Return the current DedicatedExecutor + pub fn dedicated_executor(&self) -> Option<&DedicatedExecutor> { + self.dedicated_executor.as_ref() + } + + /// Spawn a future that will do IO operations on the IO thread pool + /// if there is a [`DedicatedExecutor`] registered + /// + /// See [`DedicatedExecutor`] for more details + pub async fn spawn_io(&self, fut: Fut) -> Fut::Output + where + Fut: Future + Send + 'static, + Fut::Output: Send, + { + if self.dedicated_executor().is_some() { + // TODO it is strange that the io thread is tied directly to a thread + // local rather than bound to an instance + DedicatedExecutor::spawn_io(fut).await + } else { + // otherwise run on the current runtime + fut.await + } + } } impl Default for RuntimeEnv { @@ -183,6 +211,8 @@ pub struct RuntimeEnvBuilder { pub cache_manager: CacheManagerConfig, /// ObjectStoreRegistry to get object store based on url pub object_store_registry: Arc, + /// Optional dedicated executor + pub dedicated_executor: Option, } impl Default for RuntimeEnvBuilder { @@ -199,6 +229,7 @@ impl RuntimeEnvBuilder { memory_pool: Default::default(), 
cache_manager: Default::default(), object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()), + dedicated_executor: None, } } @@ -229,6 +260,15 @@ impl RuntimeEnvBuilder { self } + /// Customize [`DedicatedExecutor`] to be used for running queries + pub fn with_dedicated_executor( + mut self, + dedicated_executor: DedicatedExecutor, + ) -> Self { + self.dedicated_executor = Some(dedicated_executor); + self + } + /// Specify the total memory to use while running the DataFusion /// plan to `max_memory * memory_fraction` in bytes. /// @@ -255,6 +295,7 @@ impl RuntimeEnvBuilder { memory_pool, cache_manager, object_store_registry, + dedicated_executor, } = self; let memory_pool = memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default())); @@ -264,6 +305,7 @@ impl RuntimeEnvBuilder { disk_manager: DiskManager::try_new(disk_manager)?, cache_manager: CacheManager::try_new(&cache_manager)?, object_store_registry, + dedicated_executor, }) } From ab4ad1740a7e1cf0328e18e88ef3a460fdd349fc Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 5 Dec 2024 11:48:27 -0500 Subject: [PATCH 04/12] Example works --- datafusion-examples/examples/thread_pools.rs | 22 +++++++-- .../core/src/datasource/dynamic_file.rs | 48 +++++++++++++------ datafusion/execution/src/runtime_env.rs | 2 + 3 files changed, 52 insertions(+), 20 deletions(-) diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/thread_pools.rs index 4c908250715c1..5dca06968ae88 100644 --- a/datafusion-examples/examples/thread_pools.rs +++ b/datafusion-examples/examples/thread_pools.rs @@ -27,7 +27,8 @@ use arrow::util::pretty::pretty_format_batches; use datafusion::error::Result; use datafusion::execution::dedicated_executor::DedicatedExecutor; -use datafusion::execution::SendableRecordBatchStream; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; +use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder}; use datafusion::prelude::*; use 
futures::stream::StreamExt; use object_store::http::HttpBuilder; @@ -147,15 +148,13 @@ async fn different_runtime_advanced() -> Result<()> { // In this example, we will configure access to a remote object store // over the network during the plan - let ctx = SessionContext::new().enable_url_table(); + let dedicated_executor = DedicatedExecutor::builder().build(); // setup http object store let base_url = Url::parse("https://github.com").unwrap(); let http_store: Arc = Arc::new(HttpBuilder::new().with_url(base_url.clone()).build()?); - let dedicated_executor = DedicatedExecutor::builder().build(); - // By default, the object store will use the "current runtime" for IO operations // if we use a dedicated executor to run the plan, the eventual object store requests will also use the // dedicated executor's runtime @@ -169,8 +168,21 @@ async fn different_runtime_advanced() -> Result<()> { //let http_store = dedicated_executor.wrap_object_store(http_store); + // we must also register the dedicated executor with the runtime + let runtime_env = RuntimeEnvBuilder::new() + .with_dedicated_executor(dedicated_executor.clone()) + .build_arc()?; + // Tell datafusion about processing http:// urls with this wrapped object store - ctx.register_object_store(&base_url, http_store); + runtime_env.register_object_store(&base_url, http_store); + + let ctx = SessionContext::from( + SessionStateBuilder::new() + .with_runtime_env(runtime_env) + .with_default_features() + .build(), + ) + .enable_url_table(); // Plan (and execute) the query on the dedicated runtime let stream = dedicated_executor diff --git a/datafusion/core/src/datasource/dynamic_file.rs b/datafusion/core/src/datasource/dynamic_file.rs index 6654d0871c3f6..7118254904693 100644 --- a/datafusion/core/src/datasource/dynamic_file.rs +++ b/datafusion/core/src/datasource/dynamic_file.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use async_trait::async_trait; use datafusion_catalog::{SessionStore, UrlTableFactory}; -use 
datafusion_common::plan_datafusion_err; +use datafusion_common::{internal_err, plan_datafusion_err}; use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl}; use crate::datasource::TableProvider; @@ -55,28 +55,46 @@ impl UrlTableFactory for DynamicListTableFactory { return Ok(None); }; - let state = &self + let session = self .session_store() .get_session() .upgrade() - .and_then(|session| { - session - .read() - .as_any() - .downcast_ref::() - .cloned() - }) .ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?; - match ListingTableConfig::new(table_url.clone()) - .infer_options(state) - .await - { + let runtime_env = Arc::clone(&session.read().runtime_env()); + + // Do remove catalog operations + let Some(state) = session + .read() + .as_any() + .downcast_ref::() + .cloned() + else { + return internal_err!("Expected SessionState, got something else"); + }; + let config = runtime_env + .spawn_io(async move { + ListingTableConfig::new(table_url.clone()) + .infer_options(&state) + .await + }) + .await; + + let Some(state) = session + .read() + .as_any() + .downcast_ref::() + .cloned() + else { + return internal_err!("Expected SessionState, got something else"); + }; + + match config { Ok(cfg) => { let cfg = cfg - .infer_partitions_from_path(state) + .infer_partitions_from_path(&state) .await? 
- .infer_schema(state) + .infer_schema(&state) .await?; ListingTable::try_new(cfg) .map(|table| Some(Arc::new(table) as Arc)) diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 0437cda46b8fc..73cdcf36099e1 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -175,11 +175,13 @@ impl RuntimeEnv { Fut::Output: Send, { if self.dedicated_executor().is_some() { + println!("Running on dedicated executor"); // TODO it is strange that the io thread is tied directly to a thread // local rather than bound to an instance DedicatedExecutor::spawn_io(fut).await } else { // otherwise run on the current runtime + println!("Running on current runtime"); fut.await } } From f5673f52655f8764cdb72860bdd7b90f07252fa9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 5 Dec 2024 11:55:56 -0500 Subject: [PATCH 05/12] Updates --- .../core/src/datasource/dynamic_file.rs | 43 ++++++++----------- .../core/src/execution/session_state.rs | 1 - 2 files changed, 17 insertions(+), 27 deletions(-) diff --git a/datafusion/core/src/datasource/dynamic_file.rs b/datafusion/core/src/datasource/dynamic_file.rs index 7118254904693..eababe70435d0 100644 --- a/datafusion/core/src/datasource/dynamic_file.rs +++ b/datafusion/core/src/datasource/dynamic_file.rs @@ -63,7 +63,6 @@ impl UrlTableFactory for DynamicListTableFactory { let runtime_env = Arc::clone(&session.read().runtime_env()); - // Do remove catalog operations let Some(state) = session .read() .as_any() @@ -72,34 +71,26 @@ impl UrlTableFactory for DynamicListTableFactory { else { return internal_err!("Expected SessionState, got something else"); }; - let config = runtime_env + + // Do remove catalog operations on a different runtime + runtime_env .spawn_io(async move { - ListingTableConfig::new(table_url.clone()) + match ListingTableConfig::new(table_url.clone()) .infer_options(&state) .await + { + Ok(cfg) => { + let cfg = cfg + 
.infer_partitions_from_path(&state) + .await? + .infer_schema(&state) + .await?; + ListingTable::try_new(cfg) + .map(|table| Some(Arc::new(table) as Arc)) + } + Err(_) => Ok(None), + } }) - .await; - - let Some(state) = session - .read() - .as_any() - .downcast_ref::() - .cloned() - else { - return internal_err!("Expected SessionState, got something else"); - }; - - match config { - Ok(cfg) => { - let cfg = cfg - .infer_partitions_from_path(&state) - .await? - .infer_schema(&state) - .await?; - ListingTable::try_new(cfg) - .map(|table| Some(Arc::new(table) as Arc)) - } - Err(_) => Ok(None), - } + .await } } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 08735a70a59c3..b0f34398566c6 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -43,7 +43,6 @@ use datafusion_common::{ ResolvedTableReference, TableReference, }; use datafusion_execution::config::SessionConfig; -use datafusion_execution::dedicated_executor::{DedicatedExecutor, IO_RUNTIME}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_expr::execution_props::ExecutionProps; From 329ac71eb4d0eb89b506e20460bd2f798b210280 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 5 Dec 2024 12:06:46 -0500 Subject: [PATCH 06/12] plumb in CPU --- .../core/src/execution/session_state.rs | 12 ------ datafusion/execution/src/runtime_env.rs | 42 +++++++++++++++++-- 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index b0f34398566c6..4ccad5ffd323d 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -74,7 +74,6 @@ use std::any::Any; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; -use std::future::Future; use 
std::sync::Arc; use url::Url; use uuid::Uuid; @@ -769,17 +768,6 @@ impl SessionState { &self.runtime_env } - /// Spawn a future that will do IO operations. - /// - /// See [`RuntimeEnv::spawn_io`] for more details - pub async fn spawn_io(&self, fut: Fut) -> Fut::Output - where - Fut: Future + Send + 'static, - Fut::Output: Send, - { - self.runtime_env.spawn_io(fut).await - } - /// Return the execution properties pub fn execution_props(&self) -> &ExecutionProps { &self.execution_props diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 73cdcf36099e1..3b92f88199137 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -28,7 +28,7 @@ use crate::{ use crate::cache::cache_manager::{CacheManager, CacheManagerConfig}; use crate::dedicated_executor::DedicatedExecutor; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{internal_datafusion_err, DataFusionError, Result}; use object_store::ObjectStore; use std::future::Future; use std::path::PathBuf; @@ -37,6 +37,7 @@ use std::{ fmt::{Debug, Formatter}, num::NonZeroUsize, }; +use futures::TryFutureExt; use url::Url; #[derive(Clone)] @@ -69,6 +70,8 @@ use url::Url; /// .build() /// .unwrap(); /// ``` +/// +/// TODO examples for spawning IO / CPU bound work pub struct RuntimeEnv { /// Runtime memory management pub memory_pool: Arc, @@ -165,9 +168,12 @@ impl RuntimeEnv { self.dedicated_executor.as_ref() } - /// Spawn a future that will do IO operations on the IO thread pool + /// Run an async future that will do IO operations on the IO thread pool /// if there is a [`DedicatedExecutor`] registered /// + /// If no DedicatedExecutor is registered, runs the operation on the current + /// thread pool + /// /// See [`DedicatedExecutor`] for more details pub async fn spawn_io(&self, fut: Fut) -> Fut::Output where @@ -175,16 +181,44 @@ impl RuntimeEnv { Fut::Output: Send, { if self.dedicated_executor().is_some() { - 
println!("Running on dedicated executor"); + println!("Running IO on dedicated executor"); // TODO it is strange that the io thread is tied directly to a thread // local rather than bound to an instance DedicatedExecutor::spawn_io(fut).await } else { // otherwise run on the current runtime - println!("Running on current runtime"); + println!("Running IO on current runtime"); fut.await } } + + /// Run an async future that will do CPU operations on the CPU task pool + /// if there is a [`DedicatedExecutor`] registered + /// + /// If no DedicatedExecutor is registered, runs the operation on the current + /// thread pool + /// + /// See [`DedicatedExecutor`] for more details + pub async fn spawn_cpu(&self, fut: Fut) -> Result + where + Fut: Future + Send + 'static, + Fut::Output: Send, + { + if let Some(dedicated_executor) = self.dedicated_executor() { + println!("Running CPU on dedicated executor"); + dedicated_executor.spawn(fut) + .await + .map_err( |e| DataFusionError::Context( + "Join Error (panic)".to_string(), + Box::new(DataFusionError::External(e.into())), + )) + } else { + // otherwise run on the current runtime + println!("Running CPU on current runtime"); + Ok(fut.await) + } + } + } impl Default for RuntimeEnv { From c82f163a61ced098f893e236bb039dd84db003c3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 5 Dec 2024 12:09:21 -0500 Subject: [PATCH 07/12] remove io object store --- .../execution/src/dedicated_executor.rs | 15 -- datafusion/execution/src/io_object_store.rs | 136 ------------------ datafusion/execution/src/lib.rs | 1 - datafusion/execution/src/runtime_env.rs | 12 +- 4 files changed, 5 insertions(+), 159 deletions(-) delete mode 100644 datafusion/execution/src/io_object_store.rs diff --git a/datafusion/execution/src/dedicated_executor.rs b/datafusion/execution/src/dedicated_executor.rs index 1ff51b641d972..eb9b9b0844317 100644 --- a/datafusion/execution/src/dedicated_executor.rs +++ b/datafusion/execution/src/dedicated_executor.rs @@ -21,7 
+21,6 @@ //! //! [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor use crate::cross_rt_stream::CrossRtStream; -use crate::io_object_store::IoObjectStore; use crate::stream::RecordBatchStreamAdapter; use crate::SendableRecordBatchStream; use datafusion_common::DataFusionError; @@ -30,7 +29,6 @@ use futures::{ Future, FutureExt, Stream, TryFutureExt, }; use log::{info, warn}; -use object_store::ObjectStore; use parking_lot::RwLock; use std::cell::RefCell; use std::pin::Pin; @@ -220,19 +218,6 @@ impl DedicatedExecutor { handle.await.expect("Thread died?") } - /// Returns an ObjectStore instance that will always perform I/O work on the - /// IO_RUNTIME. - /// - /// Note that this object store will only work correctly if run on this - /// dedicated executor. If you try and use it on another executor, it will - /// panic with "no IO runtime registered" type error. - pub fn wrap_object_store( - &self, - object_store: Arc, - ) -> Arc { - Arc::new(IoObjectStore::new(self.clone(), object_store)) - } - /// Returns a SendableRecordBatchStream that will run on this executor's thread pool pub fn run_sendable_record_batch_stream( &self, diff --git a/datafusion/execution/src/io_object_store.rs b/datafusion/execution/src/io_object_store.rs deleted file mode 100644 index 9f7c996540149..0000000000000 --- a/datafusion/execution/src/io_object_store.rs +++ /dev/null @@ -1,136 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use crate::dedicated_executor::{DedicatedExecutor, JobError}; -use async_trait::async_trait; -use futures::stream::BoxStream; -use object_store::{ - path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, -}; - -/// 'ObjectStore' that wraps an inner `ObjectStore` and wraps all the underlying -/// methods with [`DedicatedExecutor::spawn_io`] so that they are run on the Tokio Runtime -/// dedicated to doing IO. -/// -/// -/// -#[derive(Debug)] -pub struct IoObjectStore { - //executor: DedicatedExecutor, - inner: Arc, -} - -impl IoObjectStore { - pub fn new(_executor: DedicatedExecutor, object_store: Arc) -> Self { - Self { - inner: object_store, - } - } -} - -impl std::fmt::Display for IoObjectStore { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!(f, "IoObjectStore") - } -} - -fn convert_error(e: JobError) -> object_store::Error { - object_store::Error::Generic { - store: "IoObjectStore", - source: Box::new(e), - } -} - -#[async_trait] -impl ObjectStore for IoObjectStore { - async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { - let location = location.clone(); - let store = Arc::clone(&self.inner); - DedicatedExecutor::spawn_io( - async move { store.get_opts(&location, options).await }, - ) - .await - } - - async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - let from = from.clone(); - let to = to.clone(); - let store = Arc::clone(&self.inner); - 
DedicatedExecutor::spawn_io(async move { store.copy(&from, &to).await }).await - } - - async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - let from = from.clone(); - let to = to.clone(); - let store = Arc::clone(&self.inner); - DedicatedExecutor::spawn_io(async move { store.copy(&from, &to).await }).await - } - - async fn delete(&self, location: &Path) -> Result<()> { - let location = location.clone(); - let store = Arc::clone(&self.inner); - DedicatedExecutor::spawn_io(async move { store.delete(&location).await }).await - } - - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { - // run the inner list on the dedicated executor - let inner_stream = self.inner.list(prefix); - - inner_stream - //self.executor.run_stream(inner_stream, convert_error) - // .boxed() - } - - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { - let prefix = prefix.cloned(); - let store = Arc::clone(&self.inner); - DedicatedExecutor::spawn_io(async move { - store.list_with_delimiter(prefix.as_ref()).await - }) - .await - } - - async fn put_multipart_opts( - &self, - location: &Path, - opts: PutMultipartOpts, - ) -> Result> { - let location = location.clone(); - let store = Arc::clone(&self.inner); - DedicatedExecutor::spawn_io(async move { - store.put_multipart_opts(&location, opts).await - }) - .await - } - - async fn put_opts( - &self, - location: &Path, - payload: PutPayload, - opts: PutOptions, - ) -> Result { - let location = location.clone(); - let store = Arc::clone(&self.inner); - DedicatedExecutor::spawn_io(async move { - store.put_opts(&location, payload, opts).await - }) - .await - } -} diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index a981c178eacad..9565a85c9fb09 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -25,7 +25,6 @@ pub mod config; pub mod cross_rt_stream; pub mod dedicated_executor; pub mod disk_manager; -pub mod io_object_store; pub mod 
memory_pool; pub mod object_store; pub mod runtime_env; diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 3b92f88199137..07e925cd72bb0 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -28,7 +28,7 @@ use crate::{ use crate::cache::cache_manager::{CacheManager, CacheManagerConfig}; use crate::dedicated_executor::DedicatedExecutor; -use datafusion_common::{internal_datafusion_err, DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result}; use object_store::ObjectStore; use std::future::Future; use std::path::PathBuf; @@ -37,7 +37,6 @@ use std::{ fmt::{Debug, Formatter}, num::NonZeroUsize, }; -use futures::TryFutureExt; use url::Url; #[derive(Clone)] @@ -206,19 +205,18 @@ impl RuntimeEnv { { if let Some(dedicated_executor) = self.dedicated_executor() { println!("Running CPU on dedicated executor"); - dedicated_executor.spawn(fut) - .await - .map_err( |e| DataFusionError::Context( + dedicated_executor.spawn(fut).await.map_err(|e| { + DataFusionError::Context( "Join Error (panic)".to_string(), Box::new(DataFusionError::External(e.into())), - )) + ) + }) } else { // otherwise run on the current runtime println!("Running CPU on current runtime"); Ok(fut.await) } } - } impl Default for RuntimeEnv { From b24aef44b55d1379c501b1bdd3184e6e48ef757c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 5 Dec 2024 12:16:20 -0500 Subject: [PATCH 08/12] more cleanup --- datafusion-examples/examples/thread_pools.rs | 41 ++------------ datafusion/execution/src/cross_rt_stream.rs | 6 +-- .../execution/src/dedicated_executor.rs | 54 ++++++++++++------- datafusion/execution/src/runtime_env.rs | 2 +- 4 files changed, 43 insertions(+), 60 deletions(-) diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/thread_pools.rs index 5dca06968ae88..eff1da9291df2 100644 --- a/datafusion-examples/examples/thread_pools.rs +++ 
b/datafusion-examples/examples/thread_pools.rs @@ -108,42 +108,6 @@ async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> { /// Demonstrates how to run queries on a **different** runtime than the current one /// -/// See [`different_runtime_advanced`] to see how you should run DataFusion -/// queries from a network server or when processing data from a remote object -/// store. -async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()> { - // First, we need a new runtime, which we can create with the tokio builder - // however, since we are already in the context of another runtime - // (installed by #[tokio::main]) we create a new thread for the runtime - let dedicated_executor = DedicatedExecutor::builder().build(); - - // Now, we can simply run the query on the new runtime - dedicated_executor - .spawn(async move { - // this runs on the different threadpool - let df = ctx.sql(&sql).await?; - let mut stream: SendableRecordBatchStream = df.execute_stream().await?; - - // Calling `next()` to drive the plan on the different threadpool - while let Some(batch) = stream.next().await { - println!("{}", pretty_format_batches(&[batch?]).unwrap()); - } - Ok(()) as Result<()> - }) - // even though we are `await`ing here on the "current" pool, internally - // the DedicatedExecutor runs the work on the separate threadpool pool - // and the `await` simply notifies when the work is done that the work is done - .await??; - - // When done with a DedicatedExecutor, it should be shut down cleanly to give - // any outstanding tasks a chance to clean up - dedicated_executor.join().await; - - Ok(()) -} - -/// Demonstrates how to run queries on a different runtime than the current run -/// and how to handle IO operations. 
async fn different_runtime_advanced() -> Result<()> { // In this example, we will configure access to a remote object store // over the network during the plan @@ -186,7 +150,7 @@ async fn different_runtime_advanced() -> Result<()> { // Plan (and execute) the query on the dedicated runtime let stream = dedicated_executor - .spawn(async move { + .spawn_cpu(async move { // Plan / execute the query let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv"; let df = ctx @@ -223,3 +187,6 @@ async fn different_runtime_advanced() -> Result<()> { Ok(()) } + +// TODO add an example of a how to call IO / CPU bound work directly using DedicatedExecutor +// (e.g. to create a listing table directly) diff --git a/datafusion/execution/src/cross_rt_stream.rs b/datafusion/execution/src/cross_rt_stream.rs index 7f5adb99ed7da..85d63765f9748 100644 --- a/datafusion/execution/src/cross_rt_stream.rs +++ b/datafusion/execution/src/cross_rt_stream.rs @@ -47,7 +47,7 @@ use tokio_stream::wrappers::ReceiverStream; pub struct CrossRtStream { /// Future that drives the underlying stream. /// - /// This is actually wrapped into [`DedicatedExecutor::spawn`] so it can be safely polled by the receiving runtime. + /// This is actually wrapped into [`DedicatedExecutor::spawn_cpu`] so it can be safely polled by the receiving runtime. driver: BoxFuture<'static, ()>, /// Flags if the [driver](Self::driver) returned [`Poll::Ready`]. @@ -78,7 +78,7 @@ impl std::fmt::Debug for CrossRtStream { impl CrossRtStream { /// Create new stream by producing a future that sends its state to the given [`Sender`]. /// - /// This is an internal method. `f` should always be wrapped into [`DedicatedExecutor::spawn`] (except for testing purposes). + /// This is an internal method. `f` should always be wrapped into [`DedicatedExecutor::spawn_cpu`] (except for testing purposes). 
fn new_with_tx(f: F) -> Self where F: FnOnce(Sender) -> Fut, @@ -129,7 +129,7 @@ where // future for this runtime (likely the tokio/tonic/web driver) async move { - if let Err(e) = exec.spawn(fut).await { + if let Err(e) = exec.spawn_cpu(fut).await { let e = converter(e); // last message, so we don't care about the receiver side diff --git a/datafusion/execution/src/dedicated_executor.rs b/datafusion/execution/src/dedicated_executor.rs index eb9b9b0844317..62d02acab5e49 100644 --- a/datafusion/execution/src/dedicated_executor.rs +++ b/datafusion/execution/src/dedicated_executor.rs @@ -109,7 +109,7 @@ impl From for DedicatedExecutorBuilder { /// ## "No IO runtime registered. Call `register_io_runtime`/`register_current_runtime_for_io` in current thread! /// /// This means that IO was attempted on a tokio runtime that was not registered -/// for IO. One solution is to run the task using [DedicatedExecutor::spawn]. +/// for IO. One solution is to run the task using [DedicatedExecutor::spawn_cpu]. /// /// ## "Cannot drop a runtime in a context where blocking is not allowed"` /// @@ -135,15 +135,31 @@ impl DedicatedExecutor { /// Runs the specified [`Future`] (and any tasks it spawns) on the thread /// pool managed by this `DedicatedExecutor`. /// + /// # TODO: make this wait (aka so the API doesn't start a new background task or whatever) + /// /// # Notes /// + /// This task is run on a dedicated Tokio runtime that purposely does not have + /// IO enabled. If your future makes any IO calls, you have to + /// explicitly run them on DedicatedExecutor::spawn_io. + /// + /// If you see a message like this + /// + /// (Panic { msg: "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers." + /// + /// It means some work that was meant to be done on the IO runtime was done + /// on the CPU runtime. + /// /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when /// it is dropped. 
Thus, you need ensure the returned future lives until it /// completes (call `await`) or you wish to cancel it. /// /// All spawned tasks are added to the tokio executor immediately and /// compete for the threadpool's resources. - pub fn spawn(&self, task: T) -> impl Future> + pub fn spawn_cpu( + &self, + task: T, + ) -> impl Future> where T: Future + Send + 'static, T::Output: Send + 'static, @@ -364,7 +380,7 @@ impl Drop for State { const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 5); -/// Potential error returned when polling [`DedicatedExecutor::spawn`]. +/// Potential error returned when polling [`DedicatedExecutor::spawn_cpu`]. #[derive(Debug)] pub enum JobError { WorkerGone, @@ -561,7 +577,7 @@ mod tests { async fn test_io_runtime_multi_thread_impl(dedicated: DedicatedExecutor) { let io_runtime_id = std::thread::current().id(); dedicated - .spawn(async move { + .spawn_cpu(async move { let dedicated_id = std::thread::current().id(); let spawned = DedicatedExecutor::spawn_io( @@ -581,7 +597,7 @@ mod tests { let barrier = Arc::new(Barrier::new(2)); let exec = exec(); - let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier))); + let dedicated_task = exec.spawn_cpu(do_work(42, Arc::clone(&barrier))); // Note the dedicated task will never complete if it runs on // the main tokio thread (as this test is not using the @@ -600,7 +616,7 @@ mod tests { let barrier = Arc::new(Barrier::new(2)); let exec = exec(); // Run task on clone should work fine - let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier))); + let dedicated_task = exec.clone().spawn_cpu(do_work(42, Arc::clone(&barrier))); barrier.wait(); assert_eq!(dedicated_task.await.unwrap(), 42); @@ -619,7 +635,7 @@ mod tests { drop(exec.clone()); - let task = exec.spawn(do_work(42, Arc::clone(&barrier))); + let task = exec.spawn_cpu(do_work(42, Arc::clone(&barrier))); barrier.wait(); assert_eq!(task.await.unwrap(), 42); @@ -650,8 +666,8 @@ mod tests { // make an executor with 
two threads let exec = exec2(); - let dedicated_task1 = exec.spawn(do_work(11, Arc::clone(&barrier))); - let dedicated_task2 = exec.spawn(do_work(42, Arc::clone(&barrier))); + let dedicated_task1 = exec.spawn_cpu(do_work(11, Arc::clone(&barrier))); + let dedicated_task2 = exec.spawn_cpu(do_work(42, Arc::clone(&barrier))); // block main thread until completion of other two tasks barrier.wait(); @@ -669,7 +685,7 @@ mod tests { // spawn a task that spawns to other tasks and ensure they run on the dedicated // executor - let dedicated_task = exec.spawn(async move { + let dedicated_task = exec.spawn_cpu(async move { // spawn separate tasks let t1 = tokio::task::spawn(async { 25usize }); t1.await.unwrap() @@ -684,7 +700,7 @@ mod tests { #[tokio::test] async fn panic_on_executor_str() { let exec = exec(); - let dedicated_task = exec.spawn(async move { + let dedicated_task = exec.spawn_cpu(async move { if true { panic!("At the disco, on the dedicated task scheduler"); } else { @@ -705,7 +721,7 @@ mod tests { #[tokio::test] async fn panic_on_executor_string() { let exec = exec(); - let dedicated_task = exec.spawn(async move { + let dedicated_task = exec.spawn_cpu(async move { if true { panic!("{} {}", 1, 2); } else { @@ -723,7 +739,7 @@ mod tests { #[tokio::test] async fn panic_on_executor_other() { let exec = exec(); - let dedicated_task = exec.spawn(async move { + let dedicated_task = exec.spawn_cpu(async move { if true { panic_any(1) } else { @@ -746,7 +762,7 @@ mod tests { let captured_2 = Arc::clone(&barrier_2); let exec = exec(); - let dedicated_task = exec.spawn(async move { + let dedicated_task = exec.spawn_cpu(async move { captured_1.wait(); do_work(42, captured_2).await }); @@ -768,7 +784,7 @@ mod tests { // Simulate trying to submit tasks once executor has shutdown exec.shutdown(); - let dedicated_task = exec.spawn(async { 11 }); + let dedicated_task = exec.spawn_cpu(async { 11 }); // task should complete, but return an error let err = 
dedicated_task.await.unwrap_err(); @@ -788,7 +804,7 @@ mod tests { exec.clone().join().await; // Simulate trying to submit tasks once executor has shutdown - let dedicated_task = exec.spawn(async { 11 }); + let dedicated_task = exec.spawn_cpu(async { 11 }); // task should complete, but return an error let err = dedicated_task.await.unwrap_err(); @@ -835,7 +851,7 @@ mod tests { let barrier1_pre_captured = Arc::clone(&barrier1_pre); let barrier1_post = Arc::new(AsyncBarrier::new(2)); let barrier1_post_captured = Arc::clone(&barrier1_post); - let dedicated_task1 = exec.spawn(async move { + let dedicated_task1 = exec.spawn_cpu(async move { barrier1_pre_captured.wait().await; do_work_async(11, barrier1_post_captured).await }); @@ -846,7 +862,7 @@ mod tests { let barrier2_pre_captured = Arc::clone(&barrier2_pre); let barrier2_post = Arc::new(AsyncBarrier::new(2)); let barrier2_post_captured = Arc::clone(&barrier2_post); - let dedicated_task2 = exec.spawn(async move { + let dedicated_task2 = exec.spawn_cpu(async move { barrier2_pre_captured.wait().await; do_work_async(22, barrier2_post_captured).await }); @@ -910,7 +926,7 @@ mod tests { let exec = DedicatedExecutorBuilder::new().build(); let io_disabled = exec - .spawn(async move { + .spawn_cpu(async move { // the only way (I've found) to test if IO is enabled is to use it and observer if tokio panics TcpListener::bind("127.0.0.1:0") .catch_unwind() diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 07e925cd72bb0..7ae014e5f1d8a 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -205,7 +205,7 @@ impl RuntimeEnv { { if let Some(dedicated_executor) = self.dedicated_executor() { println!("Running CPU on dedicated executor"); - dedicated_executor.spawn(fut).await.map_err(|e| { + dedicated_executor.spawn_cpu(fut).await.map_err(|e| { DataFusionError::Context( "Join Error (panic)".to_string(), 
Box::new(DataFusionError::External(e.into())), From d83fc89904dcf59bc7297e351a07d000a8d8d16f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 5 Dec 2024 12:17:31 -0500 Subject: [PATCH 09/12] update --- datafusion-examples/examples/thread_pools.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/thread_pools.rs index eff1da9291df2..0b3c4d93bfa32 100644 --- a/datafusion-examples/examples/thread_pools.rs +++ b/datafusion-examples/examples/thread_pools.rs @@ -49,7 +49,6 @@ use url::Url; async fn main() -> Result<()> { // The first two examples only do local file IO. Enable the URL table so we // can select directly from filenames in SQL. - let ctx = SessionContext::new().enable_url_table(); let sql = format!( "SELECT * FROM '{}/alltypes_plain.parquet'", datafusion::test_util::parquet_test_data() @@ -57,14 +56,10 @@ async fn main() -> Result<()> { // Run the same query on the same runtime. Note that calling `await` here // will effectively run the future (in this case the `async` function) on - // the current runtime. - same_runtime(&ctx, &sql).await?; - - // Run the same query on a different runtime. Note that we are still calling - // `await` here, so the the `async` function still runs on the current runtime. - // We use the `DedicatedExecutor` to run the query on a different runtime. - different_runtime_basic(ctx, sql).await?; + // the current runtime + same_runtime(&sql).await?; + // Run the same query on a different runtime. // Run the same query on a different runtime including remote IO different_runtime_advanced().await?; @@ -75,7 +70,9 @@ async fn main() -> Result<()> { /// /// This is now most examples in DataFusion are written and works well for /// development and local query processing. 
-async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> { +async fn same_runtime(sql: &str) -> Result<()> { + let ctx = SessionContext::new().enable_url_table(); + // Calling .sql is an async function as it may also do network // I/O, for example to contact a remote catalog or do an object store LIST let df = ctx.sql(sql).await?; From 149fa9ac76fb21a9b6a916be3ca0e9f36f0340c2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 5 Dec 2024 12:24:15 -0500 Subject: [PATCH 10/12] Annotate another location --- .../core/src/datasource/listing/table.rs | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index ffe49dd2ba116..89ca3d8a54cc9 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -679,7 +679,7 @@ impl ListingOptions { /// # Ok(()) /// # } /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ListingTable { table_paths: Vec, /// File fields only @@ -843,10 +843,22 @@ impl TableProvider for ListingTable { }); // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here? let session_state = state.as_any().downcast_ref::().unwrap(); - let (mut partitioned_file_lists, statistics) = self - .list_files_for_scan(session_state, &partition_filters, limit) + // TODO avoid these clones when possible. 
+ let session_state_captured = session_state.clone(); + let partition_filters_captured = partition_filters.clone(); + let self_captured = self.clone(); + let (mut partitioned_file_lists, statistics) = state + .runtime_env() + .spawn_io(async move { + self_captured + .list_files_for_scan( + &session_state_captured, + &partition_filters_captured, + limit, + ) + .await + }) .await?; - // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { let projected_schema = project_schema(&self.schema(), projection)?; From 520314e3d8621a5000a1669f26d2e584d2efa79b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 5 Dec 2024 12:48:39 -0500 Subject: [PATCH 11/12] add note --- datafusion-examples/examples/thread_pools.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/thread_pools.rs index 0b3c4d93bfa32..7b15eb287f127 100644 --- a/datafusion-examples/examples/thread_pools.rs +++ b/datafusion-examples/examples/thread_pools.rs @@ -146,6 +146,7 @@ async fn different_runtime_advanced() -> Result<()> { .enable_url_table(); // Plan (and execute) the query on the dedicated runtime + // TODO it would be great to figure out how to run this as part of `ctx.sql` let stream = dedicated_executor .spawn_cpu(async move { // Plan / execute the query From 6f5dac0aeab74f69f7d0e184ebc4e64750262405 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 5 Dec 2024 13:06:36 -0500 Subject: [PATCH 12/12] sketch out the API that would be necessary --- datafusion-examples/examples/thread_pools.rs | 17 ++++++----------- datafusion/core/src/execution/context/mod.rs | 10 +++++++--- datafusion/execution/src/dedicated_executor.rs | 13 +++++++++++++ datafusion/execution/src/runtime_env.rs | 17 +++++++++++++++++ 4 files changed, 43 insertions(+), 14 deletions(-) diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/thread_pools.rs index 7b15eb287f127..a4db6eebbbbc7 
100644 --- a/datafusion-examples/examples/thread_pools.rs +++ b/datafusion-examples/examples/thread_pools.rs @@ -147,17 +147,12 @@ async fn different_runtime_advanced() -> Result<()> { // Plan (and execute) the query on the dedicated runtime // TODO it would be great to figure out how to run this as part of `ctx.sql` - let stream = dedicated_executor - .spawn_cpu(async move { - // Plan / execute the query - let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv"; - let df = ctx - .sql(&format!("SELECT c1,c2,c3 FROM '{url}' LIMIT 5")) - .await?; - let stream: SendableRecordBatchStream = df.execute_stream().await?; - - Ok(stream) as Result<_> - }).await??; + // Plan / execute the query + let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv"; + let df = ctx + .sql(&format!("SELECT c1,c2,c3 FROM '{url}' LIMIT 5")) + .await?; + let stream: SendableRecordBatchStream = df.execute_stream().await?; // We have now planned the query on the dedicated runtime, Yay! but we still need to // drive the stream (aka call `next()` to get the results). diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 67236c9a6bd2c..8d661a4e316ab 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -608,10 +608,14 @@ impl SessionContext { sql: &str, options: SQLOptions, ) -> Result { - let plan = self.state().create_logical_plan(sql).await?; - options.verify_plan(&plan)?; + self.runtime_env() + .spawn_cpu2(async { + let plan = self.state().create_logical_plan(sql).await?; + options.verify_plan(&plan)?; - self.execute_logical_plan(plan).await + self.execute_logical_plan(plan).await + }) + .await } /// Creates logical expressions from SQL query text. 
diff --git a/datafusion/execution/src/dedicated_executor.rs b/datafusion/execution/src/dedicated_executor.rs index 62d02acab5e49..1d42a995aaece 100644 --- a/datafusion/execution/src/dedicated_executor.rs +++ b/datafusion/execution/src/dedicated_executor.rs @@ -199,6 +199,19 @@ impl DedicatedExecutor { .boxed() } + /// Runs the specified work on the dedicated executor and returns the result + /// + /// Note the future is not 'static (aka it can have internal references) + pub fn spawn_cpu2<'a, T>(&self, task: T) -> impl Future + where + T: Future + Send + 'a, + T::Output: Send, + { + // If we can figure out how to make this work, then + // we could integrate it nicely into DataFusion + async { todo!() } + } + /// signals shutdown of this executor and any Clones pub fn shutdown(&self) { // hang up the channel which will cause the dedicated thread diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 7ae014e5f1d8a..b91bba381d2a3 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -217,6 +217,23 @@ impl RuntimeEnv { Ok(fut.await) } } + + /// Figure out signature necessary to run futures with references in them on + /// a different thread pool. + pub async fn spawn_cpu2<'a, Fut>(&self, fut: Fut) -> Fut::Output + where + Fut: Future + Send + 'a, + Fut::Output: Send, + { + if let Some(dedicated_executor) = self.dedicated_executor() { + println!("2 Running CPU on dedicated executor"); + dedicated_executor.spawn_cpu2(fut).await + } else { + // otherwise run on the current runtime + println!("2 Running CPU on current runtime"); + fut.await + } + } } impl Default for RuntimeEnv {