From 4e0d7136dfda9015f464d42de5782e7abc07a64f Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Tue, 6 Jan 2026 18:51:57 +0800 Subject: [PATCH 1/7] Add inverted index build optimization notes --- docs/inverted-index-build-optimizations.md | 98 ++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 docs/inverted-index-build-optimizations.md diff --git a/docs/inverted-index-build-optimizations.md b/docs/inverted-index-build-optimizations.md new file mode 100644 index 00000000000..702c33034ce --- /dev/null +++ b/docs/inverted-index-build-optimizations.md @@ -0,0 +1,98 @@ +# Inverted 索引建索引流程:可显著优化点 + +> 目标:找出**显著提升性能**或**显著降低内存**的优化方向(不改代码,仅列建议与原因)。 + +## 现有流程简述(定位用) +- 入口:`InvertedIndexBuilder::update` / `update_index`(`rust/lance-index/src/scalar/inverted/builder.rs`) +- 并行:按 `LANCE_FTS_NUM_SHARDS` 创建多个 `IndexWorker`,每个 worker 消费 RecordBatch +- 索引构建:`IndexWorker::process_batch` -> Tokenize -> 组装 posting lists / docs +- 写盘:`InnerBuilder::write_*`(posting list / tokens / docs) +- 合并:`SizeBasedMerger::merge`(`rust/lance-index/src/scalar/inverted/merger.rs`) + +--- + +## 高优先级优化点(显著收益潜力) + +### 1) JSON 索引的双重转换(CPU + 内存热点) +- 位置: + - `document_input` -> `JsonTextStream`(`rust/lance-index/src/scalar/inverted/builder.rs` / `json.rs`) + - `JsonTokenizer::token_stream_for_doc`(`rust/lance-index/src/scalar/inverted/tokenizer/lance_tokenizer.rs`) +- 现状问题:JSONB -> JSON 字符串 -> `serde_json::Value` 解析,**双重转换 + 全量构建 AST**。 +- 优化建议: + - 增加 JSONB 直通 tokenizer(直接从 `jsonb::RawJsonb` 或字节流产出 token),绕过字符串中间层。 + - 或使用 `serde_json::Deserializer`/`simd-json` 的 streaming 访问器直接发 token,避免构建完整 `Value`。 +- 预期收益:对 JSON 列的索引速度与峰值内存**大幅下降**。 + +### 2) 每文档 `HashMap` + `Vec` 的高频分配 +- 位置:`IndexWorker::process_batch`(`builder.rs`) +- 现状问题:每条 doc 创建 `HashMap`;每个 token 还可能分配 `Vec` 保存 positions。 +- 优化建议: + - 复用 per-worker 的 `HashMap`/临时缓冲:`clear()` 保留容量;根据 token 数量 `reserve()`。 + - `with_position=false` 时:收集 `Vec` token_id -> `sort_unstable()` -> run-length,**避免 HashMap**。 + - `with_position=true` 时:用 `SmallVec<[u32; N]>` 或 arena/pool 降低 per-token Vec 分配。 +- 预期收益:减少大量临时分配,提升吞吐,降低 RSS 峰值。 + +### 3) Merge 阶段 token 映射的额外拷贝 +- 位置:`SizeBasedMerger::merge`(`merger.rs`) +- 现状问题:`inv_token: HashMap` + `token.clone()`;token 重复时仍频繁 clone。 +- 优化建议: + - 用 `Vec<&str>` 按 token_id 索引替代 `HashMap`(token_id 连续)。 + - 给 `TokenSet` 增加 `get_or_add(&str)`,避免 token 已存在时的 clone。 +- 预期收益:**显著减少 merge 内存与 CPU**,尤其是高重叠词表场景。 + +### 4) Posting list 写盘前的 `RecordBatch` 拼接成本 +- 位置:`InnerBuilder::write_posting_lists`(`builder.rs`) +- 现状问题:每个 posting list 先 `to_batch`,再 `concat_batches` 聚合;中间对象多、内存占用高。 +- 优化建议: + - 直接用 `ListBuilder/Float32Builder/UInt32Builder` 构建列,按行追加 posting list,按阈值 flush。 + - 或降低 buffer 规模,改为“更多小批量写入”以换取低峰值内存。 +- 预期收益:降低峰值内存,减少 `concat` 复制成本。 + +### 5) 多 worker 内存线性放大(缺少全局预算) +- 位置:`LANCE_FTS_NUM_SHARDS` + `LANCE_FTS_PARTITION_SIZE`(`builder.rs`) +- 现状问题:每个 worker 最高可吃掉一个分区规模(默认 256MiB),总内存随 worker 线性增长。 +- 优化建议: + - 引入全局 `AtomicU64` 统计 estimated_size;超过预算时让 worker 触发 flush 或暂停。 + - 自动根据内存预算和分区大小调整 shard 数量。 +- 预期收益:对大数据集构建时**显著降低峰值内存**,避免 OOM。 + +### 6) Merge 阶段的“全量解压再重压” +- 位置:`SizeBasedMerger::merge`(`merger.rs`) +- 现状问题:遍历 posting list 迭代器会解压后再写,CPU 与内存成本高。 +- 优化建议: + - 做“流式 k-way merge”:按 token 流式读取并写出,不保留全量 posting list。 + - 分批 merge(多轮 merge,小批次输入),降低峰值占用。 + - 评估“块级重写”:仅调整 doc_id 偏移与 block header,避免完全解压。 +- 预期收益:大规模索引合并时 CPU/内存消耗**明显下降**。 + +--- + +## 中优先级优化点(需评估收益) + +### 7) 大批次下的并行粒度不足 +- 位置:`update_index` 把整批 RecordBatch 发给单个 worker +- 现状问题:若上游 batch 大而数量少,多核利用率下降。 +- 优化建议:按行切分 batch(分片)后入队,或将 batch 切成更小块。 +- 预期收益:提升 CPU 利用率(注意可能增加 partition 数量)。 + +### 8) `FlattenStream` 扩展 list 时复制 row_id +- 位置:`FlattenStream::poll_next` / `flatten_string_list`(`builder.rs`) +- 现状问题:为 list 展平复制 row_id 数组,长列表场景内存增加明显。 +- 优化建议:避免构造新 RecordBatch,改为“行迭代器”直接喂给 `IndexWorker`;或用字典编码 row_id。 +- 预期收益:降低峰值内存,特别是 list-of-strings 字段。 + +### 9) Posting list 内部结构的缓存局部性 +- 位置:`PostingListBuilder` 使用 `ExpLinkedList`(`index.rs`, `lance-core/src/container/list.rs`) +- 现状问题:`LinkedList>` 带来指针跳转和额外节点开销。 +- 优化建议:评估 `Vec` 或 `ChunkedVec`(连续块)替代,提升缓存局部性。 +- 预期收益:CPU 性能提升 + 小幅内存优化(需基准验证)。 + +--- + +## 建议的验证方式(避免盲改) +- 针对 `with_position=false` / JSON 文档 / 超大词表分别做 micro-bench。 +- 记录: + - tokenization 时间 + - 进程 RSS 峰值 + - 每百万 doc 的索引吞吐 +- 建议在 `builder.rs` / `merger.rs` 增加临时 metrics 打点(统计 doc token 数分布、per-doc map 容量、flush 次数)。 From 42b7c4cfff9028c4f05f1308cfdbdc00467dacd9 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Tue, 6 Jan 2026 19:06:42 +0800 Subject: [PATCH 2/7] Add inverted index build metrics --- .../benches/inverted_build_metrics.rs | 106 +++++++++++ rust/lance-index/src/scalar/inverted.rs | 4 +- .../src/scalar/inverted/builder.rs | 174 +++++++++++++++++- 3 files changed, 274 insertions(+), 10 deletions(-) create mode 100644 rust/lance-index/benches/inverted_build_metrics.rs diff --git a/rust/lance-index/benches/inverted_build_metrics.rs b/rust/lance-index/benches/inverted_build_metrics.rs new file mode 100644 index 00000000000..5f0ccfa7da0 --- /dev/null +++ b/rust/lance-index/benches/inverted_build_metrics.rs @@ -0,0 +1,106 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::{sync::Arc, time::Duration}; + +use arrow_array::{RecordBatch, UInt64Array}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use futures::stream; +use lance_core::cache::LanceCache; +use lance_core::ROW_ID; +use lance_datagen::{array, RowCount}; +use lance_index::scalar::inverted::InvertedIndexBuilder; +use lance_index::scalar::inverted::tokenizer::InvertedIndexParams; +use lance_index::scalar::lance_format::LanceIndexStore; +use lance_io::object_store::ObjectStore; +use object_store::path::Path; +#[cfg(target_os = "linux")] +use pprof::criterion::{Output, PProfProfiler}; + +fn build_metrics_bench(c: &mut Criterion) { + let total_docs: usize = std::env::var("LANCE_FTS_BENCH_DOCS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(100_000); + + let cases = [ + ("short_no_pos", 3, 15, false), + ("short_pos", 3, 15, true), + ("long_no_pos", 50, 200, false), + ]; + + let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + + for (name, min_words, max_words, with_position) in cases { + let mut words_gen = array::random_sentence(min_words, max_words, true); + let doc_col = words_gen + .generate_default(RowCount::from(total_docs as u64)) + .unwrap(); + let row_id_col = Arc::new(UInt64Array::from_iter_values(0..total_docs as u64)); + + let batch = RecordBatch::try_new( + arrow_schema::Schema::new(vec![ + arrow_schema::Field::new("doc", arrow_schema::DataType::LargeUtf8, false), + arrow_schema::Field::new(ROW_ID, arrow_schema::DataType::UInt64, false), + ]) + .into(), + vec![doc_col.clone(), row_id_col], + ) + .unwrap(); + + c.bench_function(format!("invert_build_{name}_{total_docs}").as_str(), |b| { + b.to_async(&rt).iter(|| async { + let tempdir = tempfile::tempdir().unwrap(); + let index_dir = Path::from_filesystem_path(tempdir.path()).unwrap(); + let store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + index_dir, + Arc::new(LanceCache::no_cache()), + )); + + let stream = RecordBatchStreamAdapter::new( + batch.schema(), + stream::iter(vec![Ok(batch.clone())]), + ); + let stream = Box::pin(stream); + + let params = InvertedIndexParams::new( + "whitespace".to_string(), + tantivy::tokenizer::Language::English, + ) + .with_position(with_position) + .remove_stop_words(false) + .stem(false) + .max_token_length(None); + + let mut builder = InvertedIndexBuilder::new(params); + let metrics = builder.enable_metrics(); + builder.update(stream, store.as_ref()).await.unwrap(); + + black_box(metrics.snapshot()); + }) + }); + } +} + +#[cfg(target_os = "linux")] +criterion_group!( + name=benches; + config = Criterion::default() + .measurement_time(Duration::from_secs(10)) + .sample_size(10) + .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = build_metrics_bench +); + +#[cfg(not(target_os = "linux"))] +criterion_group!( + name=benches; + config = Criterion::default() + .measurement_time(Duration::from_secs(10)) + .sample_size(10); + targets = build_metrics_bench +); + +criterion_main!(benches); diff --git a/rust/lance-index/src/scalar/inverted.rs b/rust/lance-index/src/scalar/inverted.rs index e8644600513..4ffc1849ade 100644 --- a/rust/lance-index/src/scalar/inverted.rs +++ b/rust/lance-index/src/scalar/inverted.rs @@ -17,7 +17,9 @@ use std::sync::Arc; use arrow_schema::{DataType, Field}; use async_trait::async_trait; -pub use builder::InvertedIndexBuilder; +pub use builder::{ + InvertedIndexBuildMetrics, InvertedIndexBuildMetricsSnapshot, InvertedIndexBuilder, +}; use datafusion::execution::SendableRecordBatchStream; pub use index::*; use lance_core::{cache::LanceCache, Result}; diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 75c3aa9a33b..d645a0fea7f 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -79,6 +79,77 @@ pub static LANCE_FTS_TARGET_SIZE: LazyLock = LazyLock::new(|| { .expect("failed to parse LANCE_FTS_TARGET_SIZE") }); +/// Build-time metrics for inverted index creation. +#[derive(Debug, Default)] +pub struct InvertedIndexBuildMetrics { + docs_indexed: std::sync::atomic::AtomicU64, + tokens_indexed: std::sync::atomic::AtomicU64, + unique_tokens_indexed: std::sync::atomic::AtomicU64, + positions_indexed: std::sync::atomic::AtomicU64, + token_bytes_indexed: std::sync::atomic::AtomicU64, + flushes: std::sync::atomic::AtomicU64, +} + +/// Snapshot of build-time metrics values. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct InvertedIndexBuildMetricsSnapshot { + /// Total documents indexed. + pub docs_indexed: u64, + /// Total tokens indexed (after tokenization). + pub tokens_indexed: u64, + /// Sum of per-document unique token counts. + pub unique_tokens_indexed: u64, + /// Total positions recorded (only when with_position = true). + pub positions_indexed: u64, + /// Sum of token byte lengths. + pub token_bytes_indexed: u64, + /// Number of partition flushes. + pub flushes: u64, +} + +impl InvertedIndexBuildMetrics { + fn record_doc( + &self, + tokens: u64, + unique_tokens: u64, + positions: u64, + token_bytes: u64, + ) { + self.docs_indexed + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.tokens_indexed + .fetch_add(tokens, std::sync::atomic::Ordering::Relaxed); + self.unique_tokens_indexed + .fetch_add(unique_tokens, std::sync::atomic::Ordering::Relaxed); + self.positions_indexed + .fetch_add(positions, std::sync::atomic::Ordering::Relaxed); + self.token_bytes_indexed + .fetch_add(token_bytes, std::sync::atomic::Ordering::Relaxed); + } + + fn record_flush(&self) { + self.flushes + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + pub fn snapshot(&self) -> InvertedIndexBuildMetricsSnapshot { + InvertedIndexBuildMetricsSnapshot { + docs_indexed: self.docs_indexed.load(std::sync::atomic::Ordering::Relaxed), + tokens_indexed: self.tokens_indexed.load(std::sync::atomic::Ordering::Relaxed), + unique_tokens_indexed: self + .unique_tokens_indexed + .load(std::sync::atomic::Ordering::Relaxed), + positions_indexed: self + .positions_indexed + .load(std::sync::atomic::Ordering::Relaxed), + token_bytes_indexed: self + .token_bytes_indexed + .load(std::sync::atomic::Ordering::Relaxed), + flushes: self.flushes.load(std::sync::atomic::Ordering::Relaxed), + } + } +} + #[derive(Debug)] pub struct InvertedIndexBuilder { params: InvertedIndexParams, @@ -86,6 +157,7 @@ pub struct InvertedIndexBuilder { new_partitions: Vec, fragment_mask: Option, token_set_format: TokenSetFormat, + metrics: Option>, _tmpdir: TempDir, local_store: Arc, src_store: Arc, @@ -135,9 +207,17 @@ impl InvertedIndexBuilder { src_store, token_set_format, fragment_mask, + metrics: None, } } + /// Enable metrics collection for this build and return a shared handle. + pub fn enable_metrics(&mut self) -> Arc { + let metrics = Arc::new(InvertedIndexBuildMetrics::default()); + self.metrics = Some(metrics.clone()); + metrics + } + pub async fn update( &mut self, new_data: SendableRecordBatchStream, @@ -177,6 +257,7 @@ impl InvertedIndexBuilder { let id_alloc = id_alloc.clone(); let fragment_mask = self.fragment_mask; let token_set_format = self.token_set_format; + let metrics = self.metrics.clone(); let task = tokio::task::spawn(async move { let mut worker = IndexWorker::new( store, @@ -185,6 +266,7 @@ impl InvertedIndexBuilder { id_alloc, fragment_mask, token_set_format, + metrics, ) .await?; while let Ok(batch) = receiver.recv().await { @@ -572,6 +654,7 @@ struct IndexWorker { total_doc_length: usize, fragment_mask: Option, token_set_format: TokenSetFormat, + metrics: Option>, } impl IndexWorker { @@ -582,6 +665,7 @@ impl IndexWorker { id_alloc: Arc, fragment_mask: Option, token_set_format: TokenSetFormat, + metrics: Option>, ) -> Result { let schema = inverted_list_schema(with_position); @@ -601,6 +685,7 @@ impl IndexWorker { total_doc_length: 0, fragment_mask, token_set_format, + metrics, }) } @@ -620,11 +705,15 @@ impl IndexWorker { for (doc, row_id) in docs { let mut token_occurrences = HashMap::new(); let mut token_num = 0; + let mut token_bytes = 0u64; { let mut token_stream = self.tokenizer.token_stream_for_doc(doc); while token_stream.advance() { let token = token_stream.token_mut(); let token_text = std::mem::take(&mut token.text); + if self.metrics.is_some() { + token_bytes += token_text.len() as u64; + } let token_id = self.builder.tokens.add(token_text) as usize; token_occurrences .entry(token_id as u32) @@ -641,16 +730,29 @@ impl IndexWorker { let doc_id = self.builder.docs.append(row_id, token_num); self.total_doc_length += doc.len(); - token_occurrences - .into_iter() - .for_each(|(token_id, term_positions)| { - let posting_list = &mut self.builder.posting_lists[token_id as usize]; + let unique_tokens = token_occurrences.len() as u64; + let mut positions_indexed = 0u64; + for (token_id, term_positions) in token_occurrences.into_iter() { + if with_position && self.metrics.is_some() { + positions_indexed += term_positions.len() as u64; + } - let old_size = posting_list.size(); - posting_list.add(doc_id, term_positions); - let new_size = posting_list.size(); - self.estimated_size += new_size - old_size; - }); + let posting_list = &mut self.builder.posting_lists[token_id as usize]; + + let old_size = posting_list.size(); + posting_list.add(doc_id, term_positions); + let new_size = posting_list.size(); + self.estimated_size += new_size - old_size; + } + + if let Some(metrics) = self.metrics.as_ref() { + metrics.record_doc( + token_num as u64, + unique_tokens, + positions_indexed, + token_bytes, + ); + } if self.builder.docs.len() as u32 == u32::MAX || self.estimated_size >= *LANCE_FTS_PARTITION_SIZE << 20 @@ -686,6 +788,9 @@ impl IndexWorker { ); builder.write(self.store.as_ref()).await?; self.partitions.push(builder.id()); + if let Some(metrics) = self.metrics.as_ref() { + metrics.record_flush(); + } Ok(()) } @@ -1194,9 +1299,12 @@ mod tests { use super::*; use arrow_array::{RecordBatch, StringArray, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; + use datafusion::physical_plan::stream::RecordBatchStreamAdapter; + use futures::stream; use lance_core::cache::LanceCache; use lance_core::utils::tempfile::TempDir; use lance_core::ROW_ID; + use lance_io::object_store::ObjectStore; use std::sync::atomic::AtomicU64; fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch { @@ -1236,6 +1344,7 @@ mod tests { id_alloc.clone(), None, token_set_format, + None, ) .await?; worker1 @@ -1250,6 +1359,7 @@ mod tests { id_alloc.clone(), None, token_set_format, + None, ) .await?; worker2 @@ -1285,4 +1395,50 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_inverted_index_build_metrics() -> Result<()> { + let index_dir = TempDir::default(); + let store = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + index_dir.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + + let schema = Arc::new(Schema::new(vec![ + Field::new("doc", DataType::Utf8, true), + Field::new(ROW_ID, DataType::UInt64, false), + ])); + let docs = Arc::new(StringArray::from(vec![ + Some("hello world"), + Some("hello hello"), + ])); + let row_ids = Arc::new(UInt64Array::from(vec![0u64, 1u64])); + let batch = RecordBatch::try_new(schema, vec![docs, row_ids])?; + let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)])); + let stream = Box::pin(stream); + + let params = InvertedIndexParams::new( + "whitespace".to_string(), + tantivy::tokenizer::Language::English, + ) + .with_position(true) + .remove_stop_words(false) + .stem(false) + .max_token_length(None); + + let mut builder = InvertedIndexBuilder::new(params); + let metrics = builder.enable_metrics(); + builder.update(stream, store.as_ref()).await?; + + let snapshot = metrics.snapshot(); + assert_eq!(snapshot.docs_indexed, 2); + assert_eq!(snapshot.tokens_indexed, 4); + assert_eq!(snapshot.unique_tokens_indexed, 3); + assert_eq!(snapshot.positions_indexed, 4); + assert_eq!(snapshot.token_bytes_indexed, 20); + assert_eq!(snapshot.flushes, 1); + + Ok(()) + } } From caff28ad8e10fcb25af5547652d06cd1a84a1e46 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 7 Jan 2026 16:27:16 +0800 Subject: [PATCH 3/7] Remove inverted index optimization notes --- docs/inverted-index-build-optimizations.md | 98 ---------------------- 1 file changed, 98 deletions(-) delete mode 100644 docs/inverted-index-build-optimizations.md diff --git a/docs/inverted-index-build-optimizations.md b/docs/inverted-index-build-optimizations.md deleted file mode 100644 index 702c33034ce..00000000000 --- a/docs/inverted-index-build-optimizations.md +++ /dev/null @@ -1,98 +0,0 @@ -# Inverted 索引建索引流程:可显著优化点 - -> 目标:找出**显著提升性能**或**显著降低内存**的优化方向(不改代码,仅列建议与原因)。 - -## 现有流程简述(定位用) -- 入口:`InvertedIndexBuilder::update` / `update_index`(`rust/lance-index/src/scalar/inverted/builder.rs`) -- 并行:按 `LANCE_FTS_NUM_SHARDS` 创建多个 `IndexWorker`,每个 worker 消费 RecordBatch -- 索引构建:`IndexWorker::process_batch` -> Tokenize -> 组装 posting lists / docs -- 写盘:`InnerBuilder::write_*`(posting list / tokens / docs) -- 合并:`SizeBasedMerger::merge`(`rust/lance-index/src/scalar/inverted/merger.rs`) - ---- - -## 高优先级优化点(显著收益潜力) - -### 1) JSON 索引的双重转换(CPU + 内存热点) -- 位置: - - `document_input` -> `JsonTextStream`(`rust/lance-index/src/scalar/inverted/builder.rs` / `json.rs`) - - `JsonTokenizer::token_stream_for_doc`(`rust/lance-index/src/scalar/inverted/tokenizer/lance_tokenizer.rs`) -- 现状问题:JSONB -> JSON 字符串 -> `serde_json::Value` 解析,**双重转换 + 全量构建 AST**。 -- 优化建议: - - 增加 JSONB 直通 tokenizer(直接从 `jsonb::RawJsonb` 或字节流产出 token),绕过字符串中间层。 - - 或使用 `serde_json::Deserializer`/`simd-json` 的 streaming 访问器直接发 token,避免构建完整 `Value`。 -- 预期收益:对 JSON 列的索引速度与峰值内存**大幅下降**。 - -### 2) 每文档 `HashMap` + `Vec` 的高频分配 -- 位置:`IndexWorker::process_batch`(`builder.rs`) -- 现状问题:每条 doc 创建 `HashMap`;每个 token 还可能分配 `Vec` 保存 positions。 -- 优化建议: - - 复用 per-worker 的 `HashMap`/临时缓冲:`clear()` 保留容量;根据 token 数量 `reserve()`。 - - `with_position=false` 时:收集 `Vec` token_id -> `sort_unstable()` -> run-length,**避免 HashMap**。 - - `with_position=true` 时:用 `SmallVec<[u32; N]>` 或 arena/pool 降低 per-token Vec 分配。 -- 预期收益:减少大量临时分配,提升吞吐,降低 RSS 峰值。 - -### 3) Merge 阶段 token 映射的额外拷贝 -- 位置:`SizeBasedMerger::merge`(`merger.rs`) -- 现状问题:`inv_token: HashMap` + `token.clone()`;token 重复时仍频繁 clone。 -- 优化建议: - - 用 `Vec<&str>` 按 token_id 索引替代 `HashMap`(token_id 连续)。 - - 给 `TokenSet` 增加 `get_or_add(&str)`,避免 token 已存在时的 clone。 -- 预期收益:**显著减少 merge 内存与 CPU**,尤其是高重叠词表场景。 - -### 4) Posting list 写盘前的 `RecordBatch` 拼接成本 -- 位置:`InnerBuilder::write_posting_lists`(`builder.rs`) -- 现状问题:每个 posting list 先 `to_batch`,再 `concat_batches` 聚合;中间对象多、内存占用高。 -- 优化建议: - - 直接用 `ListBuilder/Float32Builder/UInt32Builder` 构建列,按行追加 posting list,按阈值 flush。 - - 或降低 buffer 规模,改为“更多小批量写入”以换取低峰值内存。 -- 预期收益:降低峰值内存,减少 `concat` 复制成本。 - -### 5) 多 worker 内存线性放大(缺少全局预算) -- 位置:`LANCE_FTS_NUM_SHARDS` + `LANCE_FTS_PARTITION_SIZE`(`builder.rs`) -- 现状问题:每个 worker 最高可吃掉一个分区规模(默认 256MiB),总内存随 worker 线性增长。 -- 优化建议: - - 引入全局 `AtomicU64` 统计 estimated_size;超过预算时让 worker 触发 flush 或暂停。 - - 自动根据内存预算和分区大小调整 shard 数量。 -- 预期收益:对大数据集构建时**显著降低峰值内存**,避免 OOM。 - -### 6) Merge 阶段的“全量解压再重压” -- 位置:`SizeBasedMerger::merge`(`merger.rs`) -- 现状问题:遍历 posting list 迭代器会解压后再写,CPU 与内存成本高。 -- 优化建议: - - 做“流式 k-way merge”:按 token 流式读取并写出,不保留全量 posting list。 - - 分批 merge(多轮 merge,小批次输入),降低峰值占用。 - - 评估“块级重写”:仅调整 doc_id 偏移与 block header,避免完全解压。 -- 预期收益:大规模索引合并时 CPU/内存消耗**明显下降**。 - ---- - -## 中优先级优化点(需评估收益) - -### 7) 大批次下的并行粒度不足 -- 位置:`update_index` 把整批 RecordBatch 发给单个 worker -- 现状问题:若上游 batch 大而数量少,多核利用率下降。 -- 优化建议:按行切分 batch(分片)后入队,或将 batch 切成更小块。 -- 预期收益:提升 CPU 利用率(注意可能增加 partition 数量)。 - -### 8) `FlattenStream` 扩展 list 时复制 row_id -- 位置:`FlattenStream::poll_next` / `flatten_string_list`(`builder.rs`) -- 现状问题:为 list 展平复制 row_id 数组,长列表场景内存增加明显。 -- 优化建议:避免构造新 RecordBatch,改为“行迭代器”直接喂给 `IndexWorker`;或用字典编码 row_id。 -- 预期收益:降低峰值内存,特别是 list-of-strings 字段。 - -### 9) Posting list 内部结构的缓存局部性 -- 位置:`PostingListBuilder` 使用 `ExpLinkedList`(`index.rs`, `lance-core/src/container/list.rs`) -- 现状问题:`LinkedList>` 带来指针跳转和额外节点开销。 -- 优化建议:评估 `Vec` 或 `ChunkedVec`(连续块)替代,提升缓存局部性。 -- 预期收益:CPU 性能提升 + 小幅内存优化(需基准验证)。 - ---- - -## 建议的验证方式(避免盲改) -- 针对 `with_position=false` / JSON 文档 / 超大词表分别做 micro-bench。 -- 记录: - - tokenization 时间 - - 进程 RSS 峰值 - - 每百万 doc 的索引吞吐 -- 建议在 `builder.rs` / `merger.rs` 增加临时 metrics 打点(统计 doc token 数分布、per-doc map 容量、flush 次数)。 From a8bc9cca697848550fc38f6b287be8d6ed09e99d Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 7 Jan 2026 20:36:39 +0800 Subject: [PATCH 4/7] Optimize inverted index build buffers --- Cargo.lock | 1 + rust/lance-index/Cargo.toml | 1 + .../benches/inverted_build_metrics.rs | 106 -------- rust/lance-index/src/scalar/inverted.rs | 4 +- .../src/scalar/inverted/builder.rs | 231 +++++++----------- rust/lance-index/src/scalar/inverted/index.rs | 4 +- 6 files changed, 98 insertions(+), 249 deletions(-) delete mode 100644 rust/lance-index/benches/inverted_build_metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 7bc3445c1e4..0d95a93cd6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5115,6 +5115,7 @@ dependencies = [ "rstest", "serde", "serde_json", + "smallvec", "snafu", "tantivy", "tempfile", diff --git a/rust/lance-index/Cargo.toml b/rust/lance-index/Cargo.toml index 29a6ba18475..70262b8de92 100644 --- a/rust/lance-index/Cargo.toml +++ b/rust/lance-index/Cargo.toml @@ -59,6 +59,7 @@ rayon.workspace = true serde_json.workspace = true serde.workspace = true snafu.workspace = true +smallvec = "1.15" tantivy.workspace = true lindera = { workspace = true, optional = true } lindera-tantivy = { workspace = true, optional = true } diff --git a/rust/lance-index/benches/inverted_build_metrics.rs b/rust/lance-index/benches/inverted_build_metrics.rs deleted file mode 100644 index 5f0ccfa7da0..00000000000 --- a/rust/lance-index/benches/inverted_build_metrics.rs +++ /dev/null @@ -1,106 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright The Lance Authors - -use std::{sync::Arc, time::Duration}; - -use arrow_array::{RecordBatch, UInt64Array}; -use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use futures::stream; -use lance_core::cache::LanceCache; -use lance_core::ROW_ID; -use lance_datagen::{array, RowCount}; -use lance_index::scalar::inverted::InvertedIndexBuilder; -use lance_index::scalar::inverted::tokenizer::InvertedIndexParams; -use lance_index::scalar::lance_format::LanceIndexStore; -use lance_io::object_store::ObjectStore; -use object_store::path::Path; -#[cfg(target_os = "linux")] -use pprof::criterion::{Output, PProfProfiler}; - -fn build_metrics_bench(c: &mut Criterion) { - let total_docs: usize = std::env::var("LANCE_FTS_BENCH_DOCS") - .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(100_000); - - let cases = [ - ("short_no_pos", 3, 15, false), - ("short_pos", 3, 15, true), - ("long_no_pos", 50, 200, false), - ]; - - let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap(); - - for (name, min_words, max_words, with_position) in cases { - let mut words_gen = array::random_sentence(min_words, max_words, true); - let doc_col = words_gen - .generate_default(RowCount::from(total_docs as u64)) - .unwrap(); - let row_id_col = Arc::new(UInt64Array::from_iter_values(0..total_docs as u64)); - - let batch = RecordBatch::try_new( - arrow_schema::Schema::new(vec![ - arrow_schema::Field::new("doc", arrow_schema::DataType::LargeUtf8, false), - arrow_schema::Field::new(ROW_ID, arrow_schema::DataType::UInt64, false), - ]) - .into(), - vec![doc_col.clone(), row_id_col], - ) - .unwrap(); - - c.bench_function(format!("invert_build_{name}_{total_docs}").as_str(), |b| { - b.to_async(&rt).iter(|| async { - let tempdir = tempfile::tempdir().unwrap(); - let index_dir = Path::from_filesystem_path(tempdir.path()).unwrap(); - let store = Arc::new(LanceIndexStore::new( - Arc::new(ObjectStore::local()), - index_dir, - Arc::new(LanceCache::no_cache()), - )); - - let stream = RecordBatchStreamAdapter::new( - batch.schema(), - stream::iter(vec![Ok(batch.clone())]), - ); - let stream = Box::pin(stream); - - let params = InvertedIndexParams::new( - "whitespace".to_string(), - tantivy::tokenizer::Language::English, - ) - .with_position(with_position) - .remove_stop_words(false) - .stem(false) - .max_token_length(None); - - let mut builder = InvertedIndexBuilder::new(params); - let metrics = builder.enable_metrics(); - builder.update(stream, store.as_ref()).await.unwrap(); - - black_box(metrics.snapshot()); - }) - }); - } -} - -#[cfg(target_os = "linux")] -criterion_group!( - name=benches; - config = Criterion::default() - .measurement_time(Duration::from_secs(10)) - .sample_size(10) - .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); - targets = build_metrics_bench -); - -#[cfg(not(target_os = "linux"))] -criterion_group!( - name=benches; - config = Criterion::default() - .measurement_time(Duration::from_secs(10)) - .sample_size(10); - targets = build_metrics_bench -); - -criterion_main!(benches); diff --git a/rust/lance-index/src/scalar/inverted.rs b/rust/lance-index/src/scalar/inverted.rs index 4ffc1849ade..e8644600513 100644 --- a/rust/lance-index/src/scalar/inverted.rs +++ b/rust/lance-index/src/scalar/inverted.rs @@ -17,9 +17,7 @@ use std::sync::Arc; use arrow_schema::{DataType, Field}; use async_trait::async_trait; -pub use builder::{ - InvertedIndexBuildMetrics, InvertedIndexBuildMetricsSnapshot, InvertedIndexBuilder, -}; +pub use builder::InvertedIndexBuilder; use datafusion::execution::SendableRecordBatchStream; pub use index::*; use lance_core::{cache::LanceCache, Result}; diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index d645a0fea7f..8f75aa57311 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -29,6 +29,7 @@ use lance_core::{Error, Result, ROW_ID, ROW_ID_FIELD}; use lance_io::object_store::ObjectStore; use object_store::path::Path; use snafu::location; +use smallvec::SmallVec; use std::collections::HashMap; use std::pin::Pin; use std::str::FromStr; @@ -79,77 +80,6 @@ pub static LANCE_FTS_TARGET_SIZE: LazyLock = LazyLock::new(|| { .expect("failed to parse LANCE_FTS_TARGET_SIZE") }); -/// Build-time metrics for inverted index creation. -#[derive(Debug, Default)] -pub struct InvertedIndexBuildMetrics { - docs_indexed: std::sync::atomic::AtomicU64, - tokens_indexed: std::sync::atomic::AtomicU64, - unique_tokens_indexed: std::sync::atomic::AtomicU64, - positions_indexed: std::sync::atomic::AtomicU64, - token_bytes_indexed: std::sync::atomic::AtomicU64, - flushes: std::sync::atomic::AtomicU64, -} - -/// Snapshot of build-time metrics values. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct InvertedIndexBuildMetricsSnapshot { - /// Total documents indexed. - pub docs_indexed: u64, - /// Total tokens indexed (after tokenization). - pub tokens_indexed: u64, - /// Sum of per-document unique token counts. - pub unique_tokens_indexed: u64, - /// Total positions recorded (only when with_position = true). - pub positions_indexed: u64, - /// Sum of token byte lengths. - pub token_bytes_indexed: u64, - /// Number of partition flushes. - pub flushes: u64, -} - -impl InvertedIndexBuildMetrics { - fn record_doc( - &self, - tokens: u64, - unique_tokens: u64, - positions: u64, - token_bytes: u64, - ) { - self.docs_indexed - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - self.tokens_indexed - .fetch_add(tokens, std::sync::atomic::Ordering::Relaxed); - self.unique_tokens_indexed - .fetch_add(unique_tokens, std::sync::atomic::Ordering::Relaxed); - self.positions_indexed - .fetch_add(positions, std::sync::atomic::Ordering::Relaxed); - self.token_bytes_indexed - .fetch_add(token_bytes, std::sync::atomic::Ordering::Relaxed); - } - - fn record_flush(&self) { - self.flushes - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - - pub fn snapshot(&self) -> InvertedIndexBuildMetricsSnapshot { - InvertedIndexBuildMetricsSnapshot { - docs_indexed: self.docs_indexed.load(std::sync::atomic::Ordering::Relaxed), - tokens_indexed: self.tokens_indexed.load(std::sync::atomic::Ordering::Relaxed), - unique_tokens_indexed: self - .unique_tokens_indexed - .load(std::sync::atomic::Ordering::Relaxed), - positions_indexed: self - .positions_indexed - .load(std::sync::atomic::Ordering::Relaxed), - token_bytes_indexed: self - .token_bytes_indexed - .load(std::sync::atomic::Ordering::Relaxed), - flushes: self.flushes.load(std::sync::atomic::Ordering::Relaxed), - } - } -} - #[derive(Debug)] pub struct InvertedIndexBuilder { params: InvertedIndexParams, @@ -157,7 +87,6 @@ pub struct InvertedIndexBuilder { new_partitions: Vec, fragment_mask: Option, token_set_format: TokenSetFormat, - metrics: Option>, _tmpdir: TempDir, local_store: Arc, src_store: Arc, @@ -207,17 +136,9 @@ impl InvertedIndexBuilder { src_store, token_set_format, fragment_mask, - metrics: None, } } - /// Enable metrics collection for this build and return a shared handle. - pub fn enable_metrics(&mut self) -> Arc { - let metrics = Arc::new(InvertedIndexBuildMetrics::default()); - self.metrics = Some(metrics.clone()); - metrics - } - pub async fn update( &mut self, new_data: SendableRecordBatchStream, @@ -257,7 +178,6 @@ impl InvertedIndexBuilder { let id_alloc = id_alloc.clone(); let fragment_mask = self.fragment_mask; let token_set_format = self.token_set_format; - let metrics = self.metrics.clone(); let task = tokio::task::spawn(async move { let mut worker = IndexWorker::new( store, @@ -266,7 +186,6 @@ impl InvertedIndexBuilder { id_alloc, fragment_mask, token_set_format, - metrics, ) .await?; while let Ok(batch) = receiver.recv().await { @@ -654,7 +573,10 @@ struct IndexWorker { total_doc_length: usize, fragment_mask: Option, token_set_format: TokenSetFormat, - metrics: Option>, + token_occurrences: HashMap, + token_ids: Vec, + last_token_count: usize, + last_unique_token_count: usize, } impl IndexWorker { @@ -665,7 +587,6 @@ impl IndexWorker { id_alloc: Arc, fragment_mask: Option, token_set_format: TokenSetFormat, - metrics: Option>, ) -> Result { let schema = inverted_list_schema(with_position); @@ -685,7 +606,10 @@ impl IndexWorker { total_doc_length: 0, fragment_mask, token_set_format, - metrics, + token_occurrences: HashMap::new(), + token_ids: Vec::new(), + last_token_count: 0, + last_unique_token_count: 0, }) } @@ -703,24 +627,40 @@ impl IndexWorker { let with_position = self.has_position(); for (doc, row_id) in docs { - let mut token_occurrences = HashMap::new(); - let mut token_num = 0; - let mut token_bytes = 0u64; - { + let mut token_num: u32 = 0; + if with_position { + if self.token_occurrences.capacity() < self.last_unique_token_count { + self.token_occurrences + .reserve(self.last_unique_token_count - self.token_occurrences.capacity()); + } + self.token_occurrences.clear(); + let mut token_stream = self.tokenizer.token_stream_for_doc(doc); while token_stream.advance() { let token = token_stream.token_mut(); let token_text = std::mem::take(&mut token.text); - if self.metrics.is_some() { - token_bytes += token_text.len() as u64; - } - let token_id = self.builder.tokens.add(token_text) as usize; - token_occurrences - .entry(token_id as u32) - .or_insert_with(|| PositionRecorder::new(with_position)) + let token_id = self.builder.tokens.add(token_text) as u32; + self.token_occurrences + .entry(token_id) + .or_insert_with(|| PositionRecorder::new(true)) .push(token.position as u32); token_num += 1; } + } else { + if self.token_ids.capacity() < self.last_token_count { + self.token_ids + .reserve(self.last_token_count - self.token_ids.capacity()); + } + self.token_ids.clear(); + + let mut token_stream = self.tokenizer.token_stream_for_doc(doc); + while token_stream.advance() { + let token = token_stream.token_mut(); + let token_text = std::mem::take(&mut token.text); + let token_id = self.builder.tokens.add(token_text) as u32; + self.token_ids.push(token_id); + token_num += 1; + } } self.builder .posting_lists @@ -730,29 +670,44 @@ impl IndexWorker { let doc_id = self.builder.docs.append(row_id, token_num); self.total_doc_length += doc.len(); - let unique_tokens = token_occurrences.len() as u64; - let mut positions_indexed = 0u64; - for (token_id, term_positions) in token_occurrences.into_iter() { - if with_position && self.metrics.is_some() { - positions_indexed += term_positions.len() as u64; + if with_position { + let unique_tokens = self.token_occurrences.len(); + for (token_id, term_positions) in self.token_occurrences.drain() { + let posting_list = &mut self.builder.posting_lists[token_id as usize]; + + let old_size = posting_list.size(); + posting_list.add(doc_id, term_positions); + let new_size = posting_list.size(); + self.estimated_size += new_size - old_size; } + self.last_unique_token_count = unique_tokens; + } else if token_num > 0 { + self.token_ids.sort_unstable(); + let mut iter = self.token_ids.iter(); + let mut current = *iter.next().unwrap(); + let mut count = 1u32; + for &token_id in iter { + if token_id == current { + count += 1; + continue; + } - let posting_list = &mut self.builder.posting_lists[token_id as usize]; + let posting_list = &mut self.builder.posting_lists[current as usize]; + let old_size = posting_list.size(); + posting_list.add(doc_id, PositionRecorder::Count(count)); + let new_size = posting_list.size(); + self.estimated_size += new_size - old_size; + current = token_id; + count = 1; + } + let posting_list = &mut self.builder.posting_lists[current as usize]; let old_size = posting_list.size(); - posting_list.add(doc_id, term_positions); + posting_list.add(doc_id, PositionRecorder::Count(count)); let new_size = posting_list.size(); self.estimated_size += new_size - old_size; } - - if let Some(metrics) = self.metrics.as_ref() { - metrics.record_doc( - token_num as u64, - unique_tokens, - positions_indexed, - token_bytes, - ); - } + self.last_token_count = token_num as usize; if self.builder.docs.len() as u32 == u32::MAX || self.estimated_size >= *LANCE_FTS_PARTITION_SIZE << 20 @@ -788,9 +743,6 @@ impl IndexWorker { ); builder.write(self.store.as_ref()).await?; self.partitions.push(builder.id()); - if let Some(metrics) = self.metrics.as_ref() { - metrics.record_flush(); - } Ok(()) } @@ -804,14 +756,14 @@ impl IndexWorker { #[derive(Debug, Clone)] pub enum PositionRecorder { - Position(Vec), + Position(SmallVec<[u32; 8]>), Count(u32), } impl PositionRecorder { fn new(with_position: bool) -> Self { if with_position { - Self::Position(Vec::new()) + Self::Position(SmallVec::new()) } else { Self::Count(0) } @@ -837,7 +789,7 @@ impl PositionRecorder { pub fn into_vec(self) -> Vec { match self { - Self::Position(positions) => positions, + Self::Position(positions) => positions.into_vec(), Self::Count(_) => vec![0], } } @@ -1304,7 +1256,7 @@ mod tests { use lance_core::cache::LanceCache; use lance_core::utils::tempfile::TempDir; use lance_core::ROW_ID; - use lance_io::object_store::ObjectStore; + use crate::metrics::NoOpMetricsCollector; use std::sync::atomic::AtomicU64; fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch { @@ -1344,7 +1296,6 @@ mod tests { id_alloc.clone(), None, token_set_format, - None, ) .await?; worker1 @@ -1359,7 +1310,6 @@ mod tests { id_alloc.clone(), None, token_set_format, - None, ) .await?; worker2 @@ -1397,7 +1347,7 @@ mod tests { } #[tokio::test] - async fn test_inverted_index_build_metrics() -> Result<()> { + async fn test_inverted_index_without_positions_tracks_frequency() -> Result<()> { let index_dir = TempDir::default(); let store = Arc::new(LanceIndexStore::new( ObjectStore::local().into(), @@ -1409,36 +1359,41 @@ mod tests { Field::new("doc", DataType::Utf8, true), Field::new(ROW_ID, DataType::UInt64, false), ])); - let docs = Arc::new(StringArray::from(vec![ - Some("hello world"), - Some("hello hello"), - ])); - let row_ids = Arc::new(UInt64Array::from(vec![0u64, 1u64])); - let batch = RecordBatch::try_new(schema, vec![docs, row_ids])?; - let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)])); + let docs = Arc::new(StringArray::from(vec![Some("hello hello world")])); + let row_ids = Arc::new(UInt64Array::from(vec![0u64])); + let batch = RecordBatch::try_new(schema.clone(), vec![docs, row_ids])?; + let stream = RecordBatchStreamAdapter::new(schema, stream::iter(vec![Ok(batch)])); let stream = Box::pin(stream); let params = InvertedIndexParams::new( "whitespace".to_string(), tantivy::tokenizer::Language::English, ) - .with_position(true) + .with_position(false) .remove_stop_words(false) .stem(false) .max_token_length(None); let mut builder = InvertedIndexBuilder::new(params); - let metrics = builder.enable_metrics(); builder.update(stream, store.as_ref()).await?; - let snapshot = metrics.snapshot(); - assert_eq!(snapshot.docs_indexed, 2); - assert_eq!(snapshot.tokens_indexed, 4); - assert_eq!(snapshot.unique_tokens_indexed, 3); - assert_eq!(snapshot.positions_indexed, 4); - assert_eq!(snapshot.token_bytes_indexed, 20); - assert_eq!(snapshot.flushes, 1); + let index = InvertedIndex::load(store, None, &LanceCache::no_cache()).await?; + assert_eq!(index.partitions.len(), 1); + let partition = &index.partitions[0]; + let token_id = partition.tokens.get("hello").unwrap(); + let posting = partition + .inverted_list + .posting_list(token_id, false, &NoOpMetricsCollector) + .await?; + + let mut iter = posting.iter(); + let (doc_id, freq, positions) = iter.next().unwrap(); + assert_eq!(doc_id, 0); + assert_eq!(freq, 2); + assert!(positions.is_none()); + assert!(iter.next().is_none()); Ok(()) } + } diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 2c5c7a847a5..61c44a0c005 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -1632,7 +1632,7 @@ impl PostingList { let freq = freq as u32; let positions = match positions { Some(positions) => { - PositionRecorder::Position(positions.collect::>()) + PositionRecorder::Position(positions.collect::>().into()) } None => PositionRecorder::Count(freq), }; @@ -1650,7 +1650,7 @@ impl PostingList { posting.iter().for_each(|(doc_id, freq, positions)| { let positions = match positions { Some(positions) => { - PositionRecorder::Position(positions.collect::>()) + PositionRecorder::Position(positions.collect::>().into()) } None => PositionRecorder::Count(freq), }; From 338ce2f3ecf26cb31f76abc3cb2b557c0b47a2c6 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 7 Jan 2026 21:07:41 +0800 Subject: [PATCH 5/7] Reduce inverted index SmallVec size --- rust/lance-index/src/scalar/inverted/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 8f75aa57311..afe2a9c6060 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -756,7 +756,7 @@ impl IndexWorker { #[derive(Debug, Clone)] pub enum PositionRecorder { - Position(SmallVec<[u32; 8]>), + Position(SmallVec<[u32; 4]>), Count(u32), } From ae99072cc0a5dda01d3189dd642ffb7b28fdab2a Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 7 Jan 2026 22:22:35 +0800 Subject: [PATCH 6/7] Remove unnecessary token id casts --- rust/lance-index/src/scalar/inverted/builder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index afe2a9c6060..1d7c046d718 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -639,7 +639,7 @@ impl IndexWorker { while token_stream.advance() { let token = token_stream.token_mut(); let token_text = std::mem::take(&mut token.text); - let token_id = self.builder.tokens.add(token_text) as u32; + let token_id = self.builder.tokens.add(token_text); self.token_occurrences .entry(token_id) .or_insert_with(|| PositionRecorder::new(true)) @@ -657,7 +657,7 @@ impl IndexWorker { while token_stream.advance() { let token = token_stream.token_mut(); let token_text = std::mem::take(&mut token.text); - let token_id = self.builder.tokens.add(token_text) as u32; + let token_id = self.builder.tokens.add(token_text); self.token_ids.push(token_id); token_num += 1; } From 29f16de0d747e9b2868de7161d6868ef19ff81cd Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 7 Jan 2026 22:28:11 +0800 Subject: [PATCH 7/7] Format inverted builder --- rust/lance-index/src/scalar/inverted/builder.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 1d7c046d718..15b037a00ad 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -28,8 +28,8 @@ use lance_core::{error::LanceOptionExt, utils::tempfile::TempDir}; use lance_core::{Error, Result, ROW_ID, ROW_ID_FIELD}; use lance_io::object_store::ObjectStore; use object_store::path::Path; -use snafu::location; use smallvec::SmallVec; +use snafu::location; use std::collections::HashMap; use std::pin::Pin; use std::str::FromStr; @@ -1249,6 +1249,7 @@ pub fn document_input( #[cfg(test)] mod tests { use super::*; + use crate::metrics::NoOpMetricsCollector; use arrow_array::{RecordBatch, StringArray, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; @@ -1256,7 +1257,6 @@ mod tests { use lance_core::cache::LanceCache; use lance_core::utils::tempfile::TempDir; use lance_core::ROW_ID; - use crate::metrics::NoOpMetricsCollector; use std::sync::atomic::AtomicU64; fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch { @@ -1395,5 +1395,4 @@ mod tests { Ok(()) } - }