Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 4 additions & 15 deletions rust/lance-index/src/scalar/inverted/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
// SPDX-FileCopyrightText: Copyright The Lance Authors

use fst::Streamer;
use futures::{stream, StreamExt, TryStreamExt};
use lance_core::{cache::LanceCache, utils::tokio::get_num_compute_intensive_cpus, Error, Result};
use lance_core::{cache::LanceCache, Error, Result};
use snafu::location;
use std::sync::Arc;

Expand Down Expand Up @@ -252,21 +251,11 @@ impl Merger for SizeBasedMerger<'_> {
let start = std::time::Instant::now();
let parts = std::mem::take(&mut self.input);
let num_parts = parts.len();
let buffer_size = std::cmp::max(
1,
std::cmp::min(get_num_compute_intensive_cpus(), num_parts),
);
let cache = LanceCache::no_cache();
let token_set_format = self.token_set_format;
let mut stream = stream::iter(parts.into_iter().map(|part| {
let cache = cache.clone();
async move { part.load(&cache, token_set_format).await }
}))
.buffered(buffer_size);

let mut idx = 0;
while let Some(part) = stream.try_next().await? {
idx += 1;

for (idx, part) in parts.into_iter().enumerate() {
let part = part.load(&cache, token_set_format).await?;
self.merge_partition(part, &mut estimated_size).await?;
self.progress
.stage_progress("merge_partitions", idx as u64)
Expand Down
2 changes: 2 additions & 0 deletions rust/lance/src/index/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures::FutureExt;
use lance_core::{Error, Result};
use lance_index::metrics::NoOpMetricsCollector;
use lance_index::optimize::OptimizeOptions;
use lance_index::progress::NoopIndexBuildProgress;
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_index::scalar::CreatedIndex;
use lance_index::VECTOR_INDEX_VERSION;
Expand Down Expand Up @@ -160,6 +161,7 @@ pub async fn merge_indices_with_unindexed_frags<'a>(
true,
None,
Some(new_data_stream),
Arc::new(NoopIndexBuildProgress::default()),
)
.await?
} else {
Expand Down
Loading