diff --git a/rust/lance-index/src/scalar/inverted/merger.rs b/rust/lance-index/src/scalar/inverted/merger.rs index 21b46d0d453..af81b7aa10f 100644 --- a/rust/lance-index/src/scalar/inverted/merger.rs +++ b/rust/lance-index/src/scalar/inverted/merger.rs @@ -2,8 +2,7 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use fst::Streamer; -use futures::{stream, StreamExt, TryStreamExt}; -use lance_core::{cache::LanceCache, utils::tokio::get_num_compute_intensive_cpus, Error, Result}; +use lance_core::{cache::LanceCache, Error, Result}; use snafu::location; use std::sync::Arc; @@ -252,21 +251,11 @@ impl Merger for SizeBasedMerger<'_> { let start = std::time::Instant::now(); let parts = std::mem::take(&mut self.input); let num_parts = parts.len(); - let buffer_size = std::cmp::max( - 1, - std::cmp::min(get_num_compute_intensive_cpus(), num_parts), - ); let cache = LanceCache::no_cache(); let token_set_format = self.token_set_format; - let mut stream = stream::iter(parts.into_iter().map(|part| { - let cache = cache.clone(); - async move { part.load(&cache, token_set_format).await } - })) - .buffered(buffer_size); - - let mut idx = 0; - while let Some(part) = stream.try_next().await? { - idx += 1; + + for (idx, part) in parts.into_iter().enumerate() { + let part = part.load(&cache, token_set_format).await?; self.merge_partition(part, &mut estimated_size).await?; self.progress .stage_progress("merge_partitions", idx as u64) diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index e04796cad34..0cbf3fa1f1f 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -7,6 +7,7 @@ use futures::FutureExt; use lance_core::{Error, Result}; use lance_index::metrics::NoOpMetricsCollector; use lance_index::optimize::OptimizeOptions; +use lance_index::progress::NoopIndexBuildProgress; use lance_index::scalar::lance_format::LanceIndexStore; use lance_index::scalar::CreatedIndex; use lance_index::VECTOR_INDEX_VERSION; @@ -160,6 +161,7 @@ pub async fn merge_indices_with_unindexed_frags<'a>( true, None, Some(new_data_stream), + Arc::new(NoopIndexBuildProgress::default()), ) .await? } else {