diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs
index 5bae670a4ab..6e5155e1c20 100644
--- a/rust/lance-index/src/scalar/inverted/builder.rs
+++ b/rust/lance-index/src/scalar/inverted/builder.rs
@@ -12,8 +12,8 @@ use crate::scalar::inverted::tokenizer::lance_tokenizer::LanceTokenizer;
 use crate::scalar::lance_format::LanceIndexStore;
 use crate::scalar::IndexStore;
 use crate::vector::graph::OrderedFloat;
+use arrow::array::AsArray;
 use arrow::datatypes;
-use arrow::{array::AsArray, compute::concat_batches};
 use arrow_array::{Array, RecordBatch, UInt64Array};
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use bitpacking::{BitPacker, BitPacker4x};
@@ -44,16 +44,6 @@ use tracing::instrument;
 // WARNING: changing this value will break the compatibility with existing indexes
 pub const BLOCK_SIZE: usize = BitPacker4x::BLOCK_LEN;
 
-// the (compressed) size of each flush for posting lists in MiB,
-// when the `LANCE_FTS_FLUSH_THRESHOLD` is reached, the flush will be triggered,
-// higher for better indexing performance, but more memory usage,
-// it's in 16 MiB by default
-static LANCE_FTS_FLUSH_SIZE: LazyLock<usize> = LazyLock::new(|| {
-    std::env::var("LANCE_FTS_FLUSH_SIZE")
-        .unwrap_or_else(|_| "16".to_string())
-        .parse()
-        .expect("failed to parse LANCE_FTS_FLUSH_SIZE")
-});
 // the number of shards to split the indexing work,
 // the indexing process would spawn `LANCE_FTS_NUM_SHARDS` workers to build FTS,
 // higher for faster indexing performance, but more memory usage,
@@ -475,8 +465,6 @@ impl InnerBuilder {
             id,
             self.with_position
         );
-        let schema = inverted_list_schema(self.with_position);
-
         let mut batches = stream::iter(posting_lists)
             .map(|posting_list| {
                 let block_max_scores = docs.calculate_block_max_scores(
@@ -489,20 +477,11 @@ impl InnerBuilder {
 
         let mut write_duration = std::time::Duration::ZERO;
         let mut num_posting_lists = 0;
-        let mut buffer = Vec::new();
-        let mut size_sum = 0;
         while let Some(batch) = batches.try_next().await? {
             num_posting_lists += 1;
-            size_sum += batch.get_array_memory_size();
-            buffer.push(batch);
-            if size_sum >= *LANCE_FTS_FLUSH_SIZE << 20 {
-                let batch = concat_batches(&schema, buffer.iter())?;
-                buffer.clear();
-                size_sum = 0;
-                let start = std::time::Instant::now();
-                writer.write_record_batch(batch).await?;
-                write_duration += start.elapsed();
-            }
+            let start = std::time::Instant::now();
+            writer.write_record_batch(batch).await?;
+            write_duration += start.elapsed();
 
             if num_posting_lists % 500_000 == 0 {
                 log::info!(
@@ -513,10 +492,6 @@ impl InnerBuilder {
                 );
             }
         }
-        if !buffer.is_empty() {
-            let batch = concat_batches(&schema, buffer.iter())?;
-            writer.write_record_batch(batch).await?;
-        }
 
         writer.finish().await?;
         Ok(())
@@ -1236,14 +1211,18 @@ pub fn document_input(
 mod tests {
     use super::*;
     use crate::metrics::NoOpMetricsCollector;
+    use crate::scalar::{IndexReader, IndexWriter};
     use arrow_array::{RecordBatch, StringArray, UInt64Array};
     use arrow_schema::{DataType, Field, Schema};
+    use async_trait::async_trait;
     use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
     use futures::stream;
     use lance_core::cache::LanceCache;
     use lance_core::utils::tempfile::TempDir;
     use lance_core::ROW_ID;
-    use std::sync::atomic::AtomicU64;
+    use snafu::location;
+    use std::any::Any;
+    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 
     fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch {
         let schema = Arc::new(Schema::new(vec![
@@ -1255,6 +1234,119 @@ mod tests {
         RecordBatch::try_new(schema, vec![docs, row_ids]).unwrap()
     }
 
+    #[derive(Debug, Default)]
+    struct CountingStore {
+        write_count: Arc<AtomicUsize>,
+    }
+
+    impl CountingStore {
+        fn new() -> Self {
+            Self {
+                write_count: Arc::new(AtomicUsize::new(0)),
+            }
+        }
+
+        fn write_count(&self) -> usize {
+            self.write_count.load(Ordering::SeqCst)
+        }
+    }
+
+    impl DeepSizeOf for CountingStore {
+        fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
+            0
+        }
+    }
+
+    #[derive(Debug)]
+    struct CountingWriter {
+        write_count: Arc<AtomicUsize>,
+    }
+
+    #[async_trait]
+    impl IndexWriter for CountingWriter {
+        async fn write_record_batch(&mut self, _batch: RecordBatch) -> Result<u64> {
+            Ok(self.write_count.fetch_add(1, Ordering::SeqCst) as u64)
+        }
+
+        async fn finish(&mut self) -> Result<()> {
+            Ok(())
+        }
+
+        async fn finish_with_metadata(&mut self, _metadata: HashMap<String, String>) -> Result<()> {
+            Ok(())
+        }
+    }
+
+    #[async_trait]
+    impl IndexStore for CountingStore {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn io_parallelism(&self) -> usize {
+            1
+        }
+
+        async fn new_index_file(
+            &self,
+            _name: &str,
+            _schema: Arc<Schema>,
+        ) -> Result<Box<dyn IndexWriter>> {
+            Ok(Box::new(CountingWriter {
+                write_count: self.write_count.clone(),
+            }))
+        }
+
+        async fn open_index_file(&self, _name: &str) -> Result<Arc<dyn IndexReader>> {
+            Err(Error::not_supported(
+                "CountingStore does not support reading",
+                location!(),
+            ))
+        }
+
+        async fn copy_index_file(&self, _name: &str, _dest_store: &dyn IndexStore) -> Result<()> {
+            Err(Error::not_supported(
+                "CountingStore does not support copying",
+                location!(),
+            ))
+        }
+
+        async fn rename_index_file(&self, _name: &str, _new_name: &str) -> Result<()> {
+            Err(Error::not_supported(
+                "CountingStore does not support renaming",
+                location!(),
+            ))
+        }
+
+        async fn delete_index_file(&self, _name: &str) -> Result<()> {
+            Err(Error::not_supported(
+                "CountingStore does not support deleting",
+                location!(),
+            ))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_write_posting_lists_writes_each_batch() -> Result<()> {
+        let mut builder = InnerBuilder::new(0, false, TokenSetFormat::default());
+        for doc_id in 0..3u64 {
+            builder.docs.append(doc_id, 1);
+        }
+
+        for doc_id in 0..3u32 {
+            let mut posting_list = PostingListBuilder::new(false);
+            posting_list.add(doc_id, PositionRecorder::Count(1));
+            builder.posting_lists.push(posting_list);
+        }
+
+        let store = CountingStore::new();
+        let docs = Arc::new(std::mem::take(&mut builder.docs));
+        builder.write_posting_lists(&store, docs).await?;
+
+        assert_eq!(store.write_count(), 3);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_skip_merge_writes_partitions_as_is() -> Result<()> {
         let src_dir = TempDir::default();