Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 122 additions & 30 deletions rust/lance-index/src/scalar/inverted/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::scalar::inverted::tokenizer::lance_tokenizer::LanceTokenizer;
use crate::scalar::lance_format::LanceIndexStore;
use crate::scalar::IndexStore;
use crate::vector::graph::OrderedFloat;
use arrow::array::AsArray;
use arrow::datatypes;
use arrow::{array::AsArray, compute::concat_batches};
use arrow_array::{Array, RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use bitpacking::{BitPacker, BitPacker4x};
Expand Down Expand Up @@ -44,16 +44,6 @@ use tracing::instrument;
// WARNING: changing this value will break the compatibility with existing indexes
pub const BLOCK_SIZE: usize = BitPacker4x::BLOCK_LEN;

// the (compressed) size of each flush for posting lists in MiB,
// when the `LANCE_FTS_FLUSH_THRESHOLD` is reached, the flush will be triggered,
// higher for better indexing performance, but more memory usage,
// it's in 16 MiB by default
static LANCE_FTS_FLUSH_SIZE: LazyLock<usize> = LazyLock::new(|| {
std::env::var("LANCE_FTS_FLUSH_SIZE")
.unwrap_or_else(|_| "16".to_string())
.parse()
.expect("failed to parse LANCE_FTS_FLUSH_SIZE")
});
// the number of shards to split the indexing work,
// the indexing process would spawn `LANCE_FTS_NUM_SHARDS` workers to build FTS,
// higher for faster indexing performance, but more memory usage,
Expand Down Expand Up @@ -475,8 +465,6 @@ impl InnerBuilder {
id,
self.with_position
);
let schema = inverted_list_schema(self.with_position);

let mut batches = stream::iter(posting_lists)
.map(|posting_list| {
let block_max_scores = docs.calculate_block_max_scores(
Expand All @@ -489,20 +477,11 @@ impl InnerBuilder {

let mut write_duration = std::time::Duration::ZERO;
let mut num_posting_lists = 0;
let mut buffer = Vec::new();
let mut size_sum = 0;
while let Some(batch) = batches.try_next().await? {
num_posting_lists += 1;
size_sum += batch.get_array_memory_size();
buffer.push(batch);
if size_sum >= *LANCE_FTS_FLUSH_SIZE << 20 {
let batch = concat_batches(&schema, buffer.iter())?;
buffer.clear();
size_sum = 0;
let start = std::time::Instant::now();
writer.write_record_batch(batch).await?;
write_duration += start.elapsed();
}
let start = std::time::Instant::now();
writer.write_record_batch(batch).await?;
write_duration += start.elapsed();

if num_posting_lists % 500_000 == 0 {
log::info!(
Expand All @@ -513,10 +492,6 @@ impl InnerBuilder {
);
}
}
if !buffer.is_empty() {
let batch = concat_batches(&schema, buffer.iter())?;
writer.write_record_batch(batch).await?;
}

writer.finish().await?;
Ok(())
Expand Down Expand Up @@ -1236,14 +1211,18 @@ pub fn document_input(
mod tests {
use super::*;
use crate::metrics::NoOpMetricsCollector;
use crate::scalar::{IndexReader, IndexWriter};
use arrow_array::{RecordBatch, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use async_trait::async_trait;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream;
use lance_core::cache::LanceCache;
use lance_core::utils::tempfile::TempDir;
use lance_core::ROW_ID;
use std::sync::atomic::AtomicU64;
use snafu::location;
use std::any::Any;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Expand All @@ -1255,6 +1234,119 @@ mod tests {
RecordBatch::try_new(schema, vec![docs, row_ids]).unwrap()
}

// Test double for an index store that counts how many record batches are
// written through the writers it hands out. The counter is shared behind an
// `Arc<AtomicUsize>` so every `CountingWriter` created by the store
// increments the same count.
#[derive(Debug, Default)]
struct CountingStore {
    // Total number of `write_record_batch` calls across all writers.
    write_count: Arc<AtomicUsize>,
}

impl CountingStore {
fn new() -> Self {
Self {
write_count: Arc::new(AtomicUsize::new(0)),
}
}

fn write_count(&self) -> usize {
self.write_count.load(Ordering::SeqCst)
}
}

// NOTE(review): presumably required by a `DeepSizeOf` bound somewhere on the
// store traits — confirm. The test double reports no additional heap usage
// for its children, which is fine for a fixture.
impl DeepSizeOf for CountingStore {
    fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
        0
    }
}

// Writer stub handed out by `CountingStore::new_index_file`: instead of
// persisting anything, it bumps the store's shared counter once per
// `write_record_batch` call.
#[derive(Debug)]
struct CountingWriter {
    // Shared with the `CountingStore` that created this writer.
    write_count: Arc<AtomicUsize>,
}

#[async_trait]
impl IndexWriter for CountingWriter {
    /// Counts the write and returns the pre-increment value as a stand-in
    /// for a row offset; the batch itself is dropped.
    async fn write_record_batch(&mut self, _batch: RecordBatch) -> Result<u64> {
        let previous = self.write_count.fetch_add(1, Ordering::SeqCst);
        Ok(previous as u64)
    }

    /// No-op: nothing is buffered, so there is nothing to flush.
    async fn finish(&mut self) -> Result<()> {
        Ok(())
    }

    /// No-op: metadata is discarded by this test double.
    async fn finish_with_metadata(&mut self, _metadata: HashMap<String, String>) -> Result<()> {
        Ok(())
    }
}

#[async_trait]
impl IndexStore for CountingStore {
fn as_any(&self) -> &dyn Any {
self
}

fn io_parallelism(&self) -> usize {
1
}

async fn new_index_file(
&self,
_name: &str,
_schema: Arc<Schema>,
) -> Result<Box<dyn IndexWriter>> {
Ok(Box::new(CountingWriter {
write_count: self.write_count.clone(),
}))
}

async fn open_index_file(&self, _name: &str) -> Result<Arc<dyn IndexReader>> {
Err(Error::not_supported(
"CountingStore does not support reading",
location!(),
))
}

async fn copy_index_file(&self, _name: &str, _dest_store: &dyn IndexStore) -> Result<()> {
Err(Error::not_supported(
"CountingStore does not support copying",
location!(),
))
}

async fn rename_index_file(&self, _name: &str, _new_name: &str) -> Result<()> {
Err(Error::not_supported(
"CountingStore does not support renaming",
location!(),
))
}

async fn delete_index_file(&self, _name: &str) -> Result<()> {
Err(Error::not_supported(
"CountingStore does not support deleting",
location!(),
))
}
}

/// `write_posting_lists` should emit exactly one record batch per posting
/// list — no coalescing or batching in between.
#[tokio::test]
async fn test_write_posting_lists_writes_each_batch() -> Result<()> {
    const NUM_LISTS: usize = 3;

    let mut builder = InnerBuilder::new(0, false, TokenSetFormat::default());
    // One single-token document per posting list.
    (0..NUM_LISTS as u64).for_each(|doc_id| builder.docs.append(doc_id, 1));

    for doc_id in 0..NUM_LISTS as u32 {
        let mut list = PostingListBuilder::new(false);
        list.add(doc_id, PositionRecorder::Count(1));
        builder.posting_lists.push(list);
    }

    let store = CountingStore::new();
    let docs = Arc::new(std::mem::take(&mut builder.docs));
    builder.write_posting_lists(&store, docs).await?;

    // Each posting list must have produced its own write.
    assert_eq!(store.write_count(), NUM_LISTS);
    Ok(())
}

#[tokio::test]
async fn test_skip_merge_writes_partitions_as_is() -> Result<()> {
let src_dir = TempDir::default();
Expand Down
Loading