diff --git a/Cargo.lock b/Cargo.lock index 069aa68ebab..727918f14e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10582,6 +10582,7 @@ dependencies = [ "vortex-error", "vortex-metrics", "vortex-session", + "vortex-utils", "wasm-bindgen-futures", ] diff --git a/benchmarks/vector-search-bench/src/prepare.rs b/benchmarks/vector-search-bench/src/prepare.rs index 78b8716e4a1..8cf9d9860ed 100644 --- a/benchmarks/vector-search-bench/src/prepare.rs +++ b/benchmarks/vector-search-bench/src/prepare.rs @@ -162,8 +162,6 @@ async fn write_shard_streaming( .open(vortex_path) .await?; - // This will write in parallel, using `std::thread::available_parallelism()`. - // See `CompressingStrategy` for more details. flavor .create_write_options(&SESSION) .write(&mut output, stream) diff --git a/clippy.toml b/clippy.toml index 4b6fc10fe76..dc1b625c4de 100644 --- a/clippy.toml +++ b/clippy.toml @@ -12,5 +12,6 @@ disallowed-types = [ disallowed-methods = [ { path = "itertools::Itertools::counts", reason = "It uses the default hasher which is slow for primitives. Just inline the loop for better performance.", allow-invalid = true }, - { path = "std::result::Result::and", reason = "This method is a footgun, especially when working with `Result`.", allow-invalid = true }, + { path = "std::result::Result::and", reason = "This method is a footgun, especially when working with `Result`." }, + { path = "std::thread::available_parallelism", reason = "This function might do an unbounded amount of work, use `vortex_utils::parallelism::get_available_parallelism instead" }, ] diff --git a/vortex-bench/src/polarsignals/data.rs b/vortex-bench/src/polarsignals/data.rs index f95049f59b4..d1a67785b2a 100644 --- a/vortex-bench/src/polarsignals/data.rs +++ b/vortex-bench/src/polarsignals/data.rs @@ -25,6 +25,7 @@ use parquet::file::properties::WriterProperties; use rand::RngExt; use rand::SeedableRng; use rand::rngs::StdRng; +use vortex::utils::parallelism::get_available_parallelism; use super::schema::Int64DictBuilder; use super::schema::LABELS; @@ -146,9 +147,7 @@ pub fn generate_polarsignals_parquet(n_rows: usize, output_path: &Path) -> Resul let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), Some(props))?; let batch_size = 10_000; - let num_threads = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); + let num_threads = get_available_parallelism().unwrap_or(1); let batch_ranges: Vec<(usize, usize)> = (0..n_rows) .step_by(batch_size) diff --git a/vortex-datafusion/src/v2/source.rs b/vortex-datafusion/src/v2/source.rs index 76a78845bab..80a00a7efbe 100644 --- a/vortex-datafusion/src/v2/source.rs +++ b/vortex-datafusion/src/v2/source.rs @@ -70,8 +70,6 @@ use std::any::Any; use std::fmt; use std::fmt::Formatter; -use std::num::NonZero; -use std::num::NonZeroUsize; use std::sync::Arc; use arrow_schema::DataType; @@ -103,7 +101,6 @@ use vortex::array::arrow::ArrowArrayExecutor; use vortex::dtype::DType; use vortex::dtype::FieldPath; use vortex::dtype::Nullability; -use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::expr::Expression; @@ -117,6 +114,7 @@ use vortex::io::session::RuntimeSessionExt; use vortex::scan::DataSourceRef; use vortex::scan::ScanRequest; use vortex::session::VortexSession; +use vortex_utils::parallelism::get_available_parallelism; use crate::convert::exprs::DefaultExpressionConvertor; use crate::convert::exprs::ExpressionConvertor; @@ -277,9 +275,7 @@ impl VortexDataSourceBuilder { filter: None, limit: None, ordered: false, - num_partitions: std::thread::available_parallelism().unwrap_or_else(|_| { - NonZero::new(1).vortex_expect("available parallelism must be non-zero") - }), + num_partitions: get_available_parallelism().unwrap_or(1), }) } } @@ -360,7 +356,7 @@ pub struct VortexDataSource { /// We use this as a hint for how many splits to execute concurrently in `open()`, but we /// always declare to DataFusion that we only have a single partition so that we can /// internally manage concurrency and fix the problem of partition skew. - num_partitions: NonZeroUsize, + num_partitions: usize, } impl fmt::Debug for VortexDataSource { @@ -428,7 +424,7 @@ impl DataSource for VortexDataSource { let handle = session.handle(); let stream = scan_streams - .try_flatten_unordered(Some(num_partitions.get() * 2)) + .try_flatten_unordered(Some(num_partitions * 2)) .map(move |result| { let session = session.clone(); let schema = Arc::clone(&projected_schema); @@ -437,7 +433,7 @@ impl DataSource for VortexDataSource { result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx)) }) }) - .buffered(num_partitions.get()) + .buffered(num_partitions) .map(|result| result.map_err(|e| DataFusionError::External(Box::new(e)))); // Apply leftover projection (expressions that couldn't be pushed into Vortex). @@ -488,8 +484,7 @@ impl DataSource for VortexDataSource { ) -> DFResult>> { // Vortex handles parallelism internally — always use a single partition. let mut this = self.clone(); - this.num_partitions = NonZero::new(target_partitions) - .ok_or_else(|| DataFusionError::Internal("non-zero partitions".to_string()))?; + this.num_partitions = target_partitions; this.ordered |= output_ordering.is_some(); Ok(Some(Arc::new(this))) } diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index c59fe6cbc2b..1bff3ad6a3a 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -51,6 +51,7 @@ use vortex::scalar_fn::fns::pack::Pack; use vortex::scan::DataSource; use vortex::scan::ScanRequest; use vortex_utils::aliases::hash_set::HashSet; +use vortex_utils::parallelism::get_available_parallelism; use crate::RUNTIME; use crate::SESSION; @@ -304,9 +305,7 @@ impl TableFunction for T { let scan = RUNTIME.block_on(bind_data.data_source.scan(request))?; - let num_workers = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); + let num_workers = get_available_parallelism().unwrap_or(1); // We create an async bounded channel so that all thread-local workers can pull the next // available array chunk regardless of which partition it came from. diff --git a/vortex-io/Cargo.toml b/vortex-io/Cargo.toml index 45ba0d90f9d..b3f6448484a 100644 --- a/vortex-io/Cargo.toml +++ b/vortex-io/Cargo.toml @@ -39,6 +39,7 @@ vortex-buffer = { workspace = true } vortex-error = { workspace = true } vortex-metrics = { workspace = true } vortex-session = { workspace = true } +vortex-utils = { workspace = true } [target.'cfg(unix)'.dependencies] custom-labels = { workspace = true } diff --git a/vortex-io/src/runtime/pool.rs b/vortex-io/src/runtime/pool.rs index a8d1f67a149..71ffb54cae6 100644 --- a/vortex-io/src/runtime/pool.rs +++ b/vortex-io/src/runtime/pool.rs @@ -9,6 +9,7 @@ use std::time::Duration; use parking_lot::Mutex; use smol::block_on; use vortex_error::VortexExpect; +use vortex_utils::parallelism::get_available_parallelism; #[derive(Clone)] pub struct CurrentThreadWorkerPool { @@ -25,10 +26,10 @@ impl CurrentThreadWorkerPool { } /// Set the number of worker threads to the available system parallelism as reported by - /// `std::thread::available_parallelism()` minus 1, to leave a slot open for the calling thread. + /// [`get_available_parallelism()`] minus 1, to leave a slot open for the calling thread. pub fn set_workers_to_available_parallelism(&self) { - let n = std::thread::available_parallelism() - .map(|n| n.get().saturating_sub(1).max(1)) + let n = get_available_parallelism() + .map(|n| n.saturating_sub(1).max(1)) .unwrap_or(1); self.set_workers(n); } diff --git a/vortex-layout/src/layouts/compressed.rs b/vortex-layout/src/layouts/compressed.rs index 8ed69e756cb..efd85db9e5d 100644 --- a/vortex-layout/src/layouts/compressed.rs +++ b/vortex-layout/src/layouts/compressed.rs @@ -14,6 +14,7 @@ use vortex_btrblocks::BtrBlocksCompressor; use vortex_error::VortexResult; use vortex_io::session::RuntimeSessionExt; use vortex_session::VortexSession; +use vortex_utils::parallelism::get_available_parallelism; use crate::LayoutRef; use crate::LayoutStrategy; @@ -67,9 +68,7 @@ impl CompressingStrategy { child: Arc::new(child), compressor: Arc::new(compressor), stats: Stat::all().collect(), - concurrency: std::thread::available_parallelism() - .map(|v| v.get()) - .unwrap_or(1), + concurrency: get_available_parallelism().unwrap_or(1), } } diff --git a/vortex-layout/src/layouts/zoned/writer.rs b/vortex-layout/src/layouts/zoned/writer.rs index 09e54e97e4a..8ec37f8bae4 100644 --- a/vortex-layout/src/layouts/zoned/writer.rs +++ b/vortex-layout/src/layouts/zoned/writer.rs @@ -14,6 +14,7 @@ use vortex_array::stats::PRUNING_STATS; use vortex_error::VortexResult; use vortex_io::session::RuntimeSessionExt; use vortex_session::VortexSession; +use vortex_utils::parallelism::get_available_parallelism; use crate::IntoLayout; use crate::LayoutRef; @@ -44,9 +45,7 @@ impl Default for ZonedLayoutOptions { block_size: 8192, stats: PRUNING_STATS.into(), max_variable_length_statistics_size: 64, - concurrency: std::thread::available_parallelism() - .map(|v| v.get()) - .unwrap_or(1), + concurrency: get_available_parallelism().unwrap_or(1), } } } diff --git a/vortex-layout/src/scan/multi.rs b/vortex-layout/src/scan/multi.rs index 600d14ccffd..a753e0d6271 100644 --- a/vortex-layout/src/scan/multi.rs +++ b/vortex-layout/src/scan/multi.rs @@ -52,6 +52,7 @@ use vortex_scan::PartitionRef; use vortex_scan::PartitionStream; use vortex_scan::ScanRequest; use vortex_session::VortexSession; +use vortex_utils::parallelism::get_available_parallelism; use crate::LayoutReaderRef; use crate::scan::scan_builder::ScanBuilder; @@ -100,9 +101,7 @@ impl MultiLayoutDataSource { session: &VortexSession, ) -> Self { let dtype = first.dtype().clone(); - let concurrency = std::thread::available_parallelism() - .map(|v| v.get()) - .unwrap_or(DEFAULT_CONCURRENCY); + let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY); let mut children = Vec::with_capacity(1 + remaining.len()); children.push(MultiLayoutChild::Opened(first)); @@ -126,9 +125,7 @@ impl MultiLayoutDataSource { factories: Vec>, session: &VortexSession, ) -> Self { - let concurrency = std::thread::available_parallelism() - .map(|v| v.get()) - .unwrap_or(DEFAULT_CONCURRENCY); + let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY); Self { dtype, diff --git a/vortex-layout/src/scan/repeated_scan.rs b/vortex-layout/src/scan/repeated_scan.rs index 1d5caaa66cf..a0f2101556c 100644 --- a/vortex-layout/src/scan/repeated_scan.rs +++ b/vortex-layout/src/scan/repeated_scan.rs @@ -22,6 +22,7 @@ use vortex_io::runtime::BlockingRuntime; use vortex_io::session::RuntimeSessionExt; use vortex_scan::selection::Selection; use vortex_session::VortexSession; +use vortex_utils::parallelism::get_available_parallelism; use crate::LayoutReaderRef; use crate::scan::filter::FilterExpr; @@ -187,9 +188,7 @@ impl RepeatedScan { row_range: Option>, ) -> VortexResult> + Send + 'static + use> { use futures::StreamExt; - let num_workers = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); + let num_workers = get_available_parallelism().unwrap_or(1); let concurrency = self.concurrency * num_workers; let handle = self.session.handle(); diff --git a/vortex-layout/src/scan/scan_builder.rs b/vortex-layout/src/scan/scan_builder.rs index d6a372ca19e..bdf5d0bfb11 100644 --- a/vortex-layout/src/scan/scan_builder.rs +++ b/vortex-layout/src/scan/scan_builder.rs @@ -38,6 +38,7 @@ use vortex_io::session::RuntimeSessionExt; use vortex_metrics::MetricsRegistry; use vortex_scan::selection::Selection; use vortex_session::VortexSession; +use vortex_utils::parallelism::get_available_parallelism; use crate::LayoutReader; use crate::LayoutReaderRef; @@ -367,9 +368,7 @@ impl Stream for LazyScanStream { LazyScanState::Builder(builder) => { let builder = builder.take().vortex_expect("polled after completion"); let ordered = builder.ordered; - let num_workers = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); + let num_workers = get_available_parallelism().unwrap_or(1); let concurrency = builder.concurrency * num_workers; let handle = builder.session.handle(); let task = handle.spawn_blocking(move || { diff --git a/vortex-utils/public-api.lock b/vortex-utils/public-api.lock index 185029d5627..e7d259551d1 100644 --- a/vortex-utils/public-api.lock +++ b/vortex-utils/public-api.lock @@ -83,3 +83,7 @@ impl vortex_uti pub fn I::reduce_balanced(self, combine: F) -> core::option::Option where Self::Item: core::clone::Clone, F: core::ops::function::Fn(Self::Item, Self::Item) -> Self::Item pub fn I::try_reduce_balanced(self, combine: F) -> core::result::Result, E> where Self::Item: core::clone::Clone, F: core::ops::function::Fn(Self::Item, Self::Item) -> core::result::Result + +pub mod vortex_utils::parallelism + +pub fn vortex_utils::parallelism::get_available_parallelism() -> core::option::Option diff --git a/vortex-utils/src/lib.rs b/vortex-utils/src/lib.rs index cc650072eab..31decd406ff 100644 --- a/vortex-utils/src/lib.rs +++ b/vortex-utils/src/lib.rs @@ -10,3 +10,4 @@ pub mod debug_with; #[cfg(feature = "dyn-traits")] pub mod dyn_traits; pub mod iter; +pub mod parallelism; diff --git a/vortex-utils/src/parallelism.rs b/vortex-utils/src/parallelism.rs new file mode 100644 index 00000000000..14b251e8bfb --- /dev/null +++ b/vortex-utils/src/parallelism.rs @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Useful utilities for discovering the desired level of parallelism + +use std::sync::LazyLock; + +/// Estimates the degree of parallelism the program should use, caching the result after the first call. +/// +/// This is currently implemented using [`std::thread::available_parallelism`], but might change in the future. +/// +/// Returns `None` if the underlying functions fails. +pub fn get_available_parallelism() -> Option { + #[allow(clippy::disallowed_methods)] + static PARALLELISM: LazyLock> = + LazyLock::new(|| std::thread::available_parallelism().ok().map(|n| n.get())); + + *PARALLELISM +}