From 2369b90789d24e5a3e9d47ee4b917abb4d8dda5c Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 16 Jan 2026 14:11:30 +0800 Subject: [PATCH 1/8] Optimize posting list write --- .../src/scalar/inverted/builder.rs | 12 +- .../src/scalar/inverted/encoding.rs | 81 ++++++++++++++ rust/lance-index/src/scalar/inverted/index.rs | 104 +++++++++++++++--- 3 files changed, 177 insertions(+), 20 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 15b037a00ad..b6e9f83c666 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -490,14 +490,14 @@ impl InnerBuilder { self.with_position ); let schema = inverted_list_schema(self.with_position); + let docs_for_batches = docs.clone(); + let schema_for_batches = schema.clone(); let mut batches = stream::iter(posting_lists) - .map(|posting_list| { - let block_max_scores = docs.calculate_block_max_scores( - posting_list.doc_ids.iter(), - posting_list.frequencies.iter(), - ); - spawn_cpu(move || posting_list.to_batch(block_max_scores)) + .map(move |posting_list| { + let docs = docs_for_batches.clone(); + let schema = schema_for_batches.clone(); + spawn_cpu(move || posting_list.to_batch_with_docs(&docs, schema)) }) .buffered(get_num_compute_intensive_cpus()); diff --git a/rust/lance-index/src/scalar/inverted/encoding.rs b/rust/lance-index/src/scalar/inverted/encoding.rs index 29c4eb39f4b..559e3eb3a74 100644 --- a/rust/lance-index/src/scalar/inverted/encoding.rs +++ b/rust/lance-index/src/scalar/inverted/encoding.rs @@ -90,6 +90,87 @@ pub fn compress_posting_list<'a>( Ok(builder.finish()) } +pub fn compress_posting_list_with_scores<'a, F>( + length: usize, + doc_ids: impl Iterator, + frequencies: impl Iterator, + mut score_for: F, + idf_scale: f32, +) -> Result<(arrow::array::LargeBinaryArray, f32)> +where + F: FnMut(u32, u32) -> f32, +{ + debug_assert!(length > 0); + if length < BLOCK_SIZE { + let mut builder = LargeBinaryBuilder::with_capacity(1, length * 4 * 2 + 1); + let mut max_score = f32::MIN; + let mut doc_id_buffer = Vec::with_capacity(length); + let mut freq_buffer = Vec::with_capacity(length); + for (doc_id, freq) in std::iter::zip(doc_ids, frequencies) { + let doc_id = *doc_id; + let freq = *freq; + doc_id_buffer.push(doc_id); + freq_buffer.push(freq); + let score = score_for(doc_id, freq); + if score > max_score { + max_score = score; + } + } + let max_score = max_score * idf_scale; + let _ = builder.write(max_score.to_le_bytes().as_ref())?; + compress_remainder(&doc_id_buffer, &mut builder)?; + compress_remainder(&freq_buffer, &mut builder)?; + builder.append_value(""); + return Ok((builder.finish(), max_score)); + } + + let mut builder = LargeBinaryBuilder::with_capacity(length.div_ceil(BLOCK_SIZE), length * 3); + let mut buffer = [0u8; BLOCK_SIZE * 4 + 5]; + let mut doc_id_buffer = Vec::with_capacity(BLOCK_SIZE); + let mut freq_buffer = Vec::with_capacity(BLOCK_SIZE); + let mut max_score = f32::MIN; + let mut block_max_score = f32::MIN; + for (doc_id, freq) in std::iter::zip(doc_ids, frequencies) { + let doc_id = *doc_id; + let freq = *freq; + doc_id_buffer.push(doc_id); + freq_buffer.push(freq); + + let score = score_for(doc_id, freq); + if score > block_max_score { + block_max_score = score; + } + + if doc_id_buffer.len() < BLOCK_SIZE { + continue; + } + + let block_score = block_max_score * idf_scale; + if block_score > max_score { + max_score = block_score; + } + let _ = builder.write(block_score.to_le_bytes().as_ref())?; + compress_sorted_block(&doc_id_buffer, &mut buffer, &mut builder)?; + compress_block(&freq_buffer, &mut buffer, &mut builder)?; + builder.append_value(""); + doc_id_buffer.clear(); + freq_buffer.clear(); + block_max_score = f32::MIN; + } + + if !doc_id_buffer.is_empty() { + let block_score = block_max_score * idf_scale; + if block_score > max_score { + max_score = block_score; + } + let _ = builder.write(block_score.to_le_bytes().as_ref())?; + compress_remainder(&doc_id_buffer, &mut builder)?; + compress_remainder(&freq_buffer, &mut builder)?; + builder.append_value(""); + } + Ok((builder.finish(), max_score)) +} + #[inline] fn compress_sorted_block( data: &[u32], diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 3605364715e..52a5e9623b0 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -59,7 +59,7 @@ use super::{ }; use super::{ builder::{InnerBuilder, PositionRecorder}, - encoding::compress_posting_list, + encoding::{compress_posting_list, compress_posting_list_with_scores}, iter::CompressedPostingListIterator, }; use super::{encoding::compress_positions, iter::PostingListIterator}; @@ -1900,18 +1900,13 @@ impl PostingListBuilder { } } - // assume the posting list is sorted by doc id - pub fn to_batch(self, block_max_scores: Vec) -> Result { + fn build_batch( + self, + compressed: LargeBinaryArray, + max_score: f32, + schema: SchemaRef, + ) -> Result { let length = self.len(); - let max_score = block_max_scores.iter().copied().fold(f32::MIN, f32::max); - - let schema = inverted_list_schema(self.has_positions()); - let compressed = compress_posting_list( - self.doc_ids.len(), - self.doc_ids.iter(), - self.frequencies.iter(), - block_max_scores.into_iter(), - )?; let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, compressed.len() as i32])); let mut columns = vec![ Arc::new(ListArray::try_new( @@ -1922,7 +1917,7 @@ impl PostingListBuilder { )?) as ArrayRef, Arc::new(Float32Array::from_iter_values(std::iter::once(max_score))) as ArrayRef, Arc::new(UInt32Array::from_iter_values(std::iter::once( - self.len() as u32 + length as u32, ))) as ArrayRef, ]; @@ -1946,6 +1941,37 @@ impl PostingListBuilder { Ok(batch) } + // assume the posting list is sorted by doc id + pub fn to_batch(self, block_max_scores: Vec) -> Result { + let max_score = block_max_scores.iter().copied().fold(f32::MIN, f32::max); + let schema = inverted_list_schema(self.has_positions()); + let compressed = compress_posting_list( + self.doc_ids.len(), + self.doc_ids.iter(), + self.frequencies.iter(), + block_max_scores.into_iter(), + )?; + self.build_batch(compressed, max_score, schema) + } + + pub fn to_batch_with_docs(self, docs: &DocSet, schema: SchemaRef) -> Result { + let length = self.len(); + let avgdl = docs.average_length(); + let idf_scale = idf(length, docs.len()) * (K1 + 1.0); + let (compressed, max_score) = compress_posting_list_with_scores( + length, + self.doc_ids.iter(), + self.frequencies.iter(), + |doc_id, freq| { + let doc_norm = K1 * (1.0 - B + B * docs.num_tokens(doc_id) as f32 / avgdl); + let freq = freq as f32; + freq / (freq + doc_norm) + }, + idf_scale, + )?; + self.build_batch(compressed, max_score, schema) + } + pub fn remap(&mut self, removed: &[u32]) { let mut cursor = 0; let mut new_doc_ids = ExpLinkedList::with_capacity(self.len()); @@ -2550,10 +2576,12 @@ mod tests { use crate::metrics::NoOpMetricsCollector; use crate::prefilter::NoFilter; - use crate::scalar::inverted::builder::{InnerBuilder, PositionRecorder}; + use crate::scalar::inverted::builder::{inverted_list_schema, InnerBuilder, PositionRecorder}; use crate::scalar::inverted::encoding::decompress_posting_list; use crate::scalar::inverted::query::{FtsSearchParams, Operator}; use crate::scalar::lance_format::LanceIndexStore; + use arrow::array::AsArray; + use arrow::datatypes::{Float32Type, UInt32Type}; use super::*; @@ -2595,6 +2623,54 @@ mod tests { .all(|(a, b)| a == b)); } + #[test] + fn test_posting_list_batch_matches_docset_scoring() { + let mut docs = DocSet::default(); + let num_docs = BLOCK_SIZE + 3; + for doc_id in 0..num_docs as u32 { + docs.append(doc_id as u64, doc_id % 7 + 1); + } + + let doc_ids = (0..num_docs as u32).collect::>(); + let freqs = doc_ids + .iter() + .map(|doc_id| doc_id % 5 + 1) + .collect::>(); + + let mut builder_scores = PostingListBuilder::new(false); + let mut builder_docs = PostingListBuilder::new(false); + for (&doc_id, &freq) in doc_ids.iter().zip(freqs.iter()) { + builder_scores.add(doc_id, PositionRecorder::Count(freq)); + builder_docs.add(doc_id, PositionRecorder::Count(freq)); + } + + let block_max_scores = docs.calculate_block_max_scores(doc_ids.iter(), freqs.iter()); + let batch_scores = builder_scores.to_batch(block_max_scores).unwrap(); + let batch_docs = builder_docs + .to_batch_with_docs(&docs, inverted_list_schema(false)) + .unwrap(); + + let scores_posting = batch_scores[POSTING_COL].as_list::().value(0); + let scores_posting = scores_posting.as_binary::(); + let docs_posting = batch_docs[POSTING_COL].as_list::().value(0); + let docs_posting = docs_posting.as_binary::(); + assert_eq!(scores_posting, docs_posting); + + let score_left = batch_scores[MAX_SCORE_COL] + .as_primitive::() + .value(0); + let score_right = batch_docs[MAX_SCORE_COL] + .as_primitive::() + .value(0); + assert!((score_left - score_right).abs() < 1e-6); + + let len_left = batch_scores[LENGTH_COL] + .as_primitive::() + .value(0); + let len_right = batch_docs[LENGTH_COL].as_primitive::().value(0); + assert_eq!(len_left, len_right); + } + #[tokio::test] async fn test_remap_to_empty_posting_list() { let tmpdir = TempObjDir::default(); From 3775afccdaff5359e569ba7518a6ff1145dd5dfa Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 16 Jan 2026 15:44:16 +0800 Subject: [PATCH 2/8] Limit posting list write fanout --- rust/lance-index/src/scalar/inverted/builder.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index b6e9f83c666..21d386d040d 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -493,13 +493,19 @@ impl InnerBuilder { let docs_for_batches = docs.clone(); let schema_for_batches = schema.clone(); + let cpu_parallelism = get_num_compute_intensive_cpus().max(1); + let shard_parallelism = (*LANCE_FTS_NUM_SHARDS).max(1); + // Indexing already shards work across workers, so limit per-worker fanout to avoid + // oversubscribing the CPU pool. + let per_worker_parallelism = (cpu_parallelism / shard_parallelism).max(1); + let mut batches = stream::iter(posting_lists) .map(move |posting_list| { let docs = docs_for_batches.clone(); let schema = schema_for_batches.clone(); spawn_cpu(move || posting_list.to_batch_with_docs(&docs, schema)) }) - .buffered(get_num_compute_intensive_cpus()); + .buffered(per_worker_parallelism); let mut write_duration = std::time::Duration::ZERO; let mut num_posting_lists = 0; From 99cc71a1d47b481fdc67bd2784b6a0841d8ee460 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 16 Jan 2026 15:49:28 +0800 Subject: [PATCH 3/8] Serialize posting list writes --- .../src/scalar/inverted/builder.rs | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 21d386d040d..9f09d4d1a8d 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -19,7 +19,7 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef}; use bitpacking::{BitPacker, BitPacker4x}; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream}; use deepsize::DeepSizeOf; -use futures::{stream, Stream, StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use lance_arrow::json::JSON_EXT_NAME; use lance_arrow::{iter_str_array, ARROW_EXT_NAME_KEY}; use lance_core::utils::tokio::get_num_compute_intensive_cpus; @@ -493,25 +493,28 @@ impl InnerBuilder { let docs_for_batches = docs.clone(); let schema_for_batches = schema.clone(); - let cpu_parallelism = get_num_compute_intensive_cpus().max(1); - let shard_parallelism = (*LANCE_FTS_NUM_SHARDS).max(1); - // Indexing already shards work across workers, so limit per-worker fanout to avoid - // oversubscribing the CPU pool. - let per_worker_parallelism = (cpu_parallelism / shard_parallelism).max(1); - - let mut batches = stream::iter(posting_lists) - .map(move |posting_list| { - let docs = docs_for_batches.clone(); - let schema = schema_for_batches.clone(); - spawn_cpu(move || posting_list.to_batch_with_docs(&docs, schema)) - }) - .buffered(per_worker_parallelism); + let (tx, mut rx) = tokio::sync::mpsc::channel::>(1); + let producer = spawn_cpu(move || { + for posting_list in posting_lists { + let batch = + posting_list.to_batch_with_docs(&docs_for_batches, schema_for_batches.clone()); + let is_err = batch.is_err(); + if tx.blocking_send(batch).is_err() { + break; + } + if is_err { + break; + } + } + Ok(()) + }); let mut write_duration = std::time::Duration::ZERO; let mut num_posting_lists = 0; let mut buffer = Vec::new(); let mut size_sum = 0; - while let Some(batch) = batches.try_next().await? { + while let Some(batch) = rx.recv().await { + let batch = batch?; num_posting_lists += 1; size_sum += batch.get_array_memory_size(); buffer.push(batch); @@ -538,6 +541,7 @@ impl InnerBuilder { writer.write_record_batch(batch).await?; } + producer.await?; writer.finish().await?; Ok(()) } From 55d5515efbb5a624c21ebbd51c07c0892f28bdc2 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 16 Jan 2026 15:58:56 +0800 Subject: [PATCH 4/8] Increase posting list channel buffer --- rust/lance-index/src/scalar/inverted/builder.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 9f09d4d1a8d..bf37c45f976 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -493,7 +493,8 @@ impl InnerBuilder { let docs_for_batches = docs.clone(); let schema_for_batches = schema.clone(); - let (tx, mut rx) = tokio::sync::mpsc::channel::>(1); + let channel_capacity = get_num_compute_intensive_cpus().max(1); + let (tx, mut rx) = tokio::sync::mpsc::channel::>(channel_capacity); let producer = spawn_cpu(move || { for posting_list in posting_lists { let batch = From 0759c8ae1a29ebd436fc3e409ed08960c53ac1b3 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Thu, 22 Jan 2026 13:19:08 +0800 Subject: [PATCH 5/8] fix Signed-off-by: BubbleCal --- rust/lance-index/src/scalar/inverted/builder.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index abdacad240f..91bf1352fa9 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -488,8 +488,6 @@ impl InnerBuilder { let mut write_duration = std::time::Duration::ZERO; let mut num_posting_lists = 0; - let mut buffer = Vec::new(); - let mut size_sum = 0; while let Some(batch) = rx.recv().await { let batch = batch?; num_posting_lists += 1; From 985b1cf454904f46cc5785ab257f79ff8809ee02 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Thu, 22 Jan 2026 16:08:40 +0800 Subject: [PATCH 6/8] smaller channel Signed-off-by: BubbleCal --- rust/lance-index/src/scalar/inverted/builder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 91bf1352fa9..e4be88c39cc 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -469,8 +469,8 @@ impl InnerBuilder { let docs_for_batches = docs.clone(); let schema_for_batches = schema.clone(); - let channel_capacity = get_num_compute_intensive_cpus().max(1); - let (tx, mut rx) = tokio::sync::mpsc::channel::>(channel_capacity); + // let channel_capacity = get_num_compute_intensive_cpus().max(1); + let (tx, mut rx) = tokio::sync::mpsc::channel::>(2); let producer = spawn_cpu(move || { for posting_list in posting_lists { let batch = From dc26a58b60b9f704abcf207d5ddf024478f67811 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Thu, 22 Jan 2026 16:54:57 +0800 Subject: [PATCH 7/8] Clarify posting list error flow --- rust/lance-index/src/scalar/inverted/builder.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index e4be88c39cc..c044f693af4 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -505,6 +505,8 @@ impl InnerBuilder { } } + // Errors from batch generation are sent through the channel and surfaced via `batch?`. + // Awaiting the producer here is just to propagate panics/cancellation. producer.await?; writer.finish().await?; Ok(()) From 424760ed73d09ddc6739f4d7193b2b6428920cd2 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 23 Jan 2026 16:19:34 +0800 Subject: [PATCH 8/8] Address review comments --- rust/lance-index/src/scalar/inverted/builder.rs | 1 - rust/lance-index/src/scalar/inverted/encoding.rs | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index c044f693af4..07f16391a0a 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -469,7 +469,6 @@ impl InnerBuilder { let docs_for_batches = docs.clone(); let schema_for_batches = schema.clone(); - // let channel_capacity = get_num_compute_intensive_cpus().max(1); let (tx, mut rx) = tokio::sync::mpsc::channel::>(2); let producer = spawn_cpu(move || { for posting_list in posting_lists { diff --git a/rust/lance-index/src/scalar/inverted/encoding.rs b/rust/lance-index/src/scalar/inverted/encoding.rs index 559e3eb3a74..57bc80cda66 100644 --- a/rust/lance-index/src/scalar/inverted/encoding.rs +++ b/rust/lance-index/src/scalar/inverted/encoding.rs @@ -100,6 +100,8 @@ pub fn compress_posting_list_with_scores<'a, F>( where F: FnMut(u32, u32) -> f32, { + // `length` comes from posting list size; zero would produce an invalid block + // (a max-score header with no doc/frequency data) and readers assume > 0 docs. debug_assert!(length > 0); if length < BLOCK_SIZE { let mut builder = LargeBinaryBuilder::with_capacity(1, length * 4 * 2 + 1);