Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions benchmarks/vector-search-bench/src/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,6 @@ async fn write_shard_streaming(
.open(vortex_path)
.await?;

// This will write in parallel, using `std::thread::available_parallelism()`.
// See `CompressingStrategy` for more details.
flavor
.create_write_options(&SESSION)
.write(&mut output, stream)
Expand Down
3 changes: 2 additions & 1 deletion clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ disallowed-types = [

disallowed-methods = [
{ path = "itertools::Itertools::counts", reason = "It uses the default hasher which is slow for primitives. Just inline the loop for better performance.", allow-invalid = true },
{ path = "std::result::Result::and", reason = "This method is a footgun, especially when working with `Result<Validity>`.", allow-invalid = true },
{ path = "std::result::Result::and", reason = "This method is a footgun, especially when working with `Result<Validity>`." },
{ path = "std::thread::available_parallelism", reason = "This function might do an unbounded amount of work, use `vortex_utils::parallelism::get_available_parallelism instead" },
]
5 changes: 2 additions & 3 deletions vortex-bench/src/polarsignals/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use parquet::file::properties::WriterProperties;
use rand::RngExt;
use rand::SeedableRng;
use rand::rngs::StdRng;
use vortex::utils::parallelism::get_available_parallelism;

use super::schema::Int64DictBuilder;
use super::schema::LABELS;
Expand Down Expand Up @@ -146,9 +147,7 @@ pub fn generate_polarsignals_parquet(n_rows: usize, output_path: &Path) -> Resul
let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), Some(props))?;

let batch_size = 10_000;
let num_threads = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
let num_threads = get_available_parallelism().unwrap_or(1);

let batch_ranges: Vec<(usize, usize)> = (0..n_rows)
.step_by(batch_size)
Expand Down
17 changes: 6 additions & 11 deletions vortex-datafusion/src/v2/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@
use std::any::Any;
use std::fmt;
use std::fmt::Formatter;
use std::num::NonZero;
use std::num::NonZeroUsize;
use std::sync::Arc;

use arrow_schema::DataType;
Expand Down Expand Up @@ -103,7 +101,6 @@ use vortex::array::arrow::ArrowArrayExecutor;
use vortex::dtype::DType;
use vortex::dtype::FieldPath;
use vortex::dtype::Nullability;
use vortex::error::VortexExpect;
use vortex::error::VortexResult;
use vortex::error::vortex_bail;
use vortex::expr::Expression;
Expand All @@ -117,6 +114,7 @@ use vortex::io::session::RuntimeSessionExt;
use vortex::scan::DataSourceRef;
use vortex::scan::ScanRequest;
use vortex::session::VortexSession;
use vortex_utils::parallelism::get_available_parallelism;

use crate::convert::exprs::DefaultExpressionConvertor;
use crate::convert::exprs::ExpressionConvertor;
Expand Down Expand Up @@ -277,9 +275,7 @@ impl VortexDataSourceBuilder {
filter: None,
limit: None,
ordered: false,
num_partitions: std::thread::available_parallelism().unwrap_or_else(|_| {
NonZero::new(1).vortex_expect("available parallelism must be non-zero")
}),
num_partitions: get_available_parallelism().unwrap_or(1),
})
}
}
Expand Down Expand Up @@ -360,7 +356,7 @@ pub struct VortexDataSource {
/// We use this as a hint for how many splits to execute concurrently in `open()`, but we
/// always declare to DataFusion that we only have a single partition so that we can
/// internally manage concurrency and fix the problem of partition skew.
num_partitions: NonZeroUsize,
num_partitions: usize,
}

impl fmt::Debug for VortexDataSource {
Expand Down Expand Up @@ -428,7 +424,7 @@ impl DataSource for VortexDataSource {

let handle = session.handle();
let stream = scan_streams
.try_flatten_unordered(Some(num_partitions.get() * 2))
.try_flatten_unordered(Some(num_partitions * 2))
.map(move |result| {
let session = session.clone();
let schema = Arc::clone(&projected_schema);
Expand All @@ -437,7 +433,7 @@ impl DataSource for VortexDataSource {
result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx))
})
})
.buffered(num_partitions.get())
.buffered(num_partitions)
.map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));

// Apply leftover projection (expressions that couldn't be pushed into Vortex).
Expand Down Expand Up @@ -488,8 +484,7 @@ impl DataSource for VortexDataSource {
) -> DFResult<Option<Arc<dyn DataSource>>> {
// Vortex handles parallelism internally — always use a single partition.
let mut this = self.clone();
this.num_partitions = NonZero::new(target_partitions)
.ok_or_else(|| DataFusionError::Internal("non-zero partitions".to_string()))?;
this.num_partitions = target_partitions;
this.ordered |= output_ordering.is_some();
Ok(Some(Arc::new(this)))
}
Expand Down
5 changes: 2 additions & 3 deletions vortex-duckdb/src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use vortex::scalar_fn::fns::pack::Pack;
use vortex::scan::DataSource;
use vortex::scan::ScanRequest;
use vortex_utils::aliases::hash_set::HashSet;
use vortex_utils::parallelism::get_available_parallelism;

use crate::RUNTIME;
use crate::SESSION;
Expand Down Expand Up @@ -304,9 +305,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {

let scan = RUNTIME.block_on(bind_data.data_source.scan(request))?;

let num_workers = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
let num_workers = get_available_parallelism().unwrap_or(1);

// We create an async bounded channel so that all thread-local workers can pull the next
// available array chunk regardless of which partition it came from.
Expand Down
1 change: 1 addition & 0 deletions vortex-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ vortex-buffer = { workspace = true }
vortex-error = { workspace = true }
vortex-metrics = { workspace = true }
vortex-session = { workspace = true }
vortex-utils = { workspace = true }

[target.'cfg(unix)'.dependencies]
custom-labels = { workspace = true }
Expand Down
7 changes: 4 additions & 3 deletions vortex-io/src/runtime/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::time::Duration;
use parking_lot::Mutex;
use smol::block_on;
use vortex_error::VortexExpect;
use vortex_utils::parallelism::get_available_parallelism;

#[derive(Clone)]
pub struct CurrentThreadWorkerPool {
Expand All @@ -25,10 +26,10 @@ impl CurrentThreadWorkerPool {
}

/// Set the number of worker threads to the available system parallelism as reported by
/// `std::thread::available_parallelism()` minus 1, to leave a slot open for the calling thread.
/// [`get_available_parallelism()`] minus 1, to leave a slot open for the calling thread.
pub fn set_workers_to_available_parallelism(&self) {
let n = std::thread::available_parallelism()
.map(|n| n.get().saturating_sub(1).max(1))
let n = get_available_parallelism()
.map(|n| n.saturating_sub(1).max(1))
.unwrap_or(1);
self.set_workers(n);
}
Expand Down
5 changes: 2 additions & 3 deletions vortex-layout/src/layouts/compressed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use vortex_btrblocks::BtrBlocksCompressor;
use vortex_error::VortexResult;
use vortex_io::session::RuntimeSessionExt;
use vortex_session::VortexSession;
use vortex_utils::parallelism::get_available_parallelism;

use crate::LayoutRef;
use crate::LayoutStrategy;
Expand Down Expand Up @@ -67,9 +68,7 @@ impl CompressingStrategy {
child: Arc::new(child),
compressor: Arc::new(compressor),
stats: Stat::all().collect(),
concurrency: std::thread::available_parallelism()
.map(|v| v.get())
.unwrap_or(1),
concurrency: get_available_parallelism().unwrap_or(1),
}
}

Expand Down
5 changes: 2 additions & 3 deletions vortex-layout/src/layouts/zoned/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use vortex_array::stats::PRUNING_STATS;
use vortex_error::VortexResult;
use vortex_io::session::RuntimeSessionExt;
use vortex_session::VortexSession;
use vortex_utils::parallelism::get_available_parallelism;

use crate::IntoLayout;
use crate::LayoutRef;
Expand Down Expand Up @@ -44,9 +45,7 @@ impl Default for ZonedLayoutOptions {
block_size: 8192,
stats: PRUNING_STATS.into(),
max_variable_length_statistics_size: 64,
concurrency: std::thread::available_parallelism()
.map(|v| v.get())
.unwrap_or(1),
concurrency: get_available_parallelism().unwrap_or(1),
}
}
}
Expand Down
9 changes: 3 additions & 6 deletions vortex-layout/src/scan/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use vortex_scan::PartitionRef;
use vortex_scan::PartitionStream;
use vortex_scan::ScanRequest;
use vortex_session::VortexSession;
use vortex_utils::parallelism::get_available_parallelism;

use crate::LayoutReaderRef;
use crate::scan::scan_builder::ScanBuilder;
Expand Down Expand Up @@ -100,9 +101,7 @@ impl MultiLayoutDataSource {
session: &VortexSession,
) -> Self {
let dtype = first.dtype().clone();
let concurrency = std::thread::available_parallelism()
.map(|v| v.get())
.unwrap_or(DEFAULT_CONCURRENCY);
let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY);

let mut children = Vec::with_capacity(1 + remaining.len());
children.push(MultiLayoutChild::Opened(first));
Expand All @@ -126,9 +125,7 @@ impl MultiLayoutDataSource {
factories: Vec<Arc<dyn LayoutReaderFactory>>,
session: &VortexSession,
) -> Self {
let concurrency = std::thread::available_parallelism()
.map(|v| v.get())
.unwrap_or(DEFAULT_CONCURRENCY);
let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY);

Self {
dtype,
Expand Down
5 changes: 2 additions & 3 deletions vortex-layout/src/scan/repeated_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use vortex_io::runtime::BlockingRuntime;
use vortex_io::session::RuntimeSessionExt;
use vortex_scan::selection::Selection;
use vortex_session::VortexSession;
use vortex_utils::parallelism::get_available_parallelism;

use crate::LayoutReaderRef;
use crate::scan::filter::FilterExpr;
Expand Down Expand Up @@ -187,9 +188,7 @@ impl<A: 'static + Send> RepeatedScan<A> {
row_range: Option<Range<u64>>,
) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
use futures::StreamExt;
let num_workers = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
let num_workers = get_available_parallelism().unwrap_or(1);
let concurrency = self.concurrency * num_workers;
let handle = self.session.handle();

Expand Down
5 changes: 2 additions & 3 deletions vortex-layout/src/scan/scan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use vortex_io::session::RuntimeSessionExt;
use vortex_metrics::MetricsRegistry;
use vortex_scan::selection::Selection;
use vortex_session::VortexSession;
use vortex_utils::parallelism::get_available_parallelism;

use crate::LayoutReader;
use crate::LayoutReaderRef;
Expand Down Expand Up @@ -367,9 +368,7 @@ impl<A: 'static + Send> Stream for LazyScanStream<A> {
LazyScanState::Builder(builder) => {
let builder = builder.take().vortex_expect("polled after completion");
let ordered = builder.ordered;
let num_workers = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
let num_workers = get_available_parallelism().unwrap_or(1);
let concurrency = builder.concurrency * num_workers;
let handle = builder.session.handle();
let task = handle.spawn_blocking(move || {
Expand Down
4 changes: 4 additions & 0 deletions vortex-utils/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,7 @@ impl<I: core::iter::traits::iterator::Iterator + core::marker::Sized> vortex_uti
pub fn I::reduce_balanced<F>(self, combine: F) -> core::option::Option<Self::Item> where Self::Item: core::clone::Clone, F: core::ops::function::Fn(Self::Item, Self::Item) -> Self::Item

pub fn I::try_reduce_balanced<F, E>(self, combine: F) -> core::result::Result<core::option::Option<Self::Item>, E> where Self::Item: core::clone::Clone, F: core::ops::function::Fn(Self::Item, Self::Item) -> core::result::Result<Self::Item, E>

pub mod vortex_utils::parallelism

pub fn vortex_utils::parallelism::get_available_parallelism() -> core::option::Option<usize>
1 change: 1 addition & 0 deletions vortex-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ pub mod debug_with;
#[cfg(feature = "dyn-traits")]
pub mod dyn_traits;
pub mod iter;
pub mod parallelism;
19 changes: 19 additions & 0 deletions vortex-utils/src/parallelism.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Useful utilities for discovering the desired level of parallelism

use std::sync::LazyLock;

/// Estimates the degree of parallelism the program should use, caching the result after the first call.
///
/// This is currently implemented using [`std::thread::available_parallelism`], but might change in the future.
///
/// Returns `None` if the underlying functions fails.
pub fn get_available_parallelism() -> Option<usize> {
#[allow(clippy::disallowed_methods)]
static PARALLELISM: LazyLock<Option<usize>> =
LazyLock::new(|| std::thread::available_parallelism().ok().map(|n| n.get()));

*PARALLELISM
}
Loading