From ac6f5b35d6f2c30e1915af1f76d54704b81bb74f Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Tue, 24 Mar 2026 23:07:21 +0800 Subject: [PATCH 1/5] fix: support stop-word gaps in phrase queries --- .../task-memory/phrase-stop-words/findings.md | 9 + .codex/task-memory/phrase-stop-words/plan.md | 10 + rust/lance-index/src/scalar/inverted/index.rs | 23 +- rust/lance-index/src/scalar/inverted/query.rs | 16 +- rust/lance-index/src/scalar/inverted/wand.rs | 38 ++++ rust/lance/src/dataset/tests/dataset_index.rs | 59 +++++ rust/lance/src/io/exec/fts.rs | 205 +++++++++++++++++- 7 files changed, 345 insertions(+), 15 deletions(-) create mode 100644 .codex/task-memory/phrase-stop-words/findings.md create mode 100644 .codex/task-memory/phrase-stop-words/plan.md diff --git a/.codex/task-memory/phrase-stop-words/findings.md b/.codex/task-memory/phrase-stop-words/findings.md new file mode 100644 index 00000000000..36f0328e9d8 --- /dev/null +++ b/.codex/task-memory/phrase-stop-words/findings.md @@ -0,0 +1,9 @@ +# Findings + +- `PhraseQueryExec` tokenizes the query with the inverted index tokenizer before searching. +- The main inverted index stores per-token positions from the tokenizer stream in `rust/lance-index/src/scalar/inverted/builder.rs`. +- The in-memory WAL FTS path already assigns positions over emitted tokens only, which naturally collapses removed stop words. +- The new end-to-end Rust dataset test fails on `query=want the apple` with zero matches against docs `want the apple` / `want an apple`. +- Root cause: `collect_query_tokens` dropped tokenizer positions and `load_posting_lists` reassigned phrase query positions as dense `0..n`, so removed stop words in the query lost their gaps. +- Preserving query positions fixes the missing-match bug, but it also introduces false positives like `want green apple` because the index cannot tell whether the gap token was a stop word. +- The final fix keeps tokenizer query positions for indexed phrase matching, then performs an exact post-validation step against the original document text when the phrase query actually contains removed stop words. diff --git a/.codex/task-memory/phrase-stop-words/plan.md b/.codex/task-memory/phrase-stop-words/plan.md new file mode 100644 index 00000000000..6ddee642fda --- /dev/null +++ b/.codex/task-memory/phrase-stop-words/plan.md @@ -0,0 +1,10 @@ +# Phrase Query Stop Words + +- Goal: verify whether phrase queries still match when the inverted index is built with `remove_stop_words=true`, especially for queries like `want the apple` vs `want an apple`. +- Steps: +- Add an end-to-end Rust dataset test that reproduces the requested cases. +- Run the targeted test to confirm current behavior. +- Only change implementation if the test fails. + +- Status: +- Completed. 
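The root cause recorded in findings.md is easiest to see with concrete positions. The sketch below is illustrative only, a toy whitespace tokenizer with a hypothetical `STOP_WORDS` list rather than lance's real tokenizer API; it contrasts the dense `0..n` renumbering that `load_posting_lists` used to apply to query tokens with the gap-preserving numbering this patch keeps:

```rust
/// Toy stop-word list; a stand-in for the tokenizer's real one.
const STOP_WORDS: &[&str] = &["the", "an", "of"];

/// Keep each surviving token's original position (what this patch does).
fn tokens_with_gaps(text: &str) -> Vec<(String, u32)> {
    text.split_whitespace()
        .enumerate()
        .filter(|(_, w)| !STOP_WORDS.contains(w))
        .map(|(pos, w)| (w.to_string(), pos as u32))
        .collect()
}

/// Renumber survivors densely as 0..n (the old, buggy scheme).
fn tokens_dense(text: &str) -> Vec<(String, u32)> {
    text.split_whitespace()
        .filter(|w| !STOP_WORDS.contains(w))
        .enumerate()
        .map(|(pos, w)| (w.to_string(), pos as u32))
        .collect()
}

fn main() {
    // Gap preserved: "apple" stays at position 2, so it can line up with a
    // document indexed as "want"(0) ... "apple"(2).
    assert_eq!(
        tokens_with_gaps("want the apple"),
        vec![("want".to_string(), 0), ("apple".to_string(), 2)]
    );
    // Dense renumbering: the removed stop word leaves no trace, so the
    // exact-phrase position check can never line up with the indexed gap.
    assert_eq!(
        tokens_dense("want the apple"),
        vec![("want".to_string(), 0), ("apple".to_string(), 1)]
    );
}
```

Under the dense scheme the query `want the apple` becomes indistinguishable from `want apple`, which is exactly the missing-match bug described above.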
diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs
index 422d154b453..8ecc92b9170 100644
--- a/rust/lance-index/src/scalar/inverted/index.rs
+++ b/rust/lance-index/src/scalar/inverted/index.rs
@@ -476,7 +476,12 @@ impl InvertedIndex {
         if postings.is_empty() {
             return Result::Ok(PartitionCandidates::empty());
         }
-        let mut tokens_by_position = vec![String::new(); postings.len()];
+        let max_position = postings
+            .iter()
+            .map(|posting| posting.term_index() as usize)
+            .max()
+            .unwrap_or_default();
+        let mut tokens_by_position = vec![String::new(); max_position + 1];
         for posting in &postings {
             let idx = posting.term_index() as usize;
             tokens_by_position[idx] = posting.token().to_owned();
@@ -978,11 +983,14 @@ impl InvertedPartition {
             true => self.expand_fuzzy(tokens, params)?,
             false => tokens.clone(),
         };
+        let token_positions = (0..tokens.len())
+            .map(|index| tokens.position(index))
+            .collect::<Vec<_>>();
         let mut token_ids = Vec::with_capacity(tokens.len());
-        for token in tokens {
+        for (index, token) in tokens.into_iter().enumerate() {
             let token_id = self.map(&token);
             if let Some(token_id) = token_id {
-                token_ids.push((token_id, token));
+                token_ids.push((token_id, token, token_positions[index]));
             } else if is_phrase_query {
                 // if the token is not found, we can't do phrase query
                 return Ok(Vec::new());
@@ -992,14 +1000,13 @@ impl InvertedPartition {
             return Ok(Vec::new());
         }
         if !is_phrase_query {
-            token_ids.sort_unstable_by_key(|(token_id, _)| *token_id);
-            token_ids.dedup_by_key(|(token_id, _)| *token_id);
+            token_ids.sort_unstable_by_key(|(token_id, _, _)| *token_id);
+            token_ids.dedup_by_key(|(token_id, _, _)| *token_id);
         }

         let num_docs = self.docs.len();
         stream::iter(token_ids)
-            .enumerate()
-            .map(|(position, (token_id, token))| async move {
+            .map(|(token_id, token, position)| async move {
                 let posting = self
                     .inverted_list
                     .posting_list(token_id, is_phrase_query, metrics)
@@ -1010,7 +1017,7 @@ impl InvertedPartition {
                 Result::Ok(PostingIterator::with_query_weight(
                     token,
                     token_id,
-                    position as u32,
+                    position,
                     query_weight,
                     posting,
                     num_docs,
diff --git a/rust/lance-index/src/scalar/inverted/query.rs b/rust/lance-index/src/scalar/inverted/query.rs
index 4c207d83134..6a8ebb07840 100644
--- a/rust/lance-index/src/scalar/inverted/query.rs
+++ b/rust/lance-index/src/scalar/inverted/query.rs
@@ -719,12 +719,19 @@ impl FtsQueryNode for BooleanQuery {
 #[derive(Clone)]
 pub struct Tokens {
     tokens: Vec<String>,
+    positions: Vec<u32>,
     tokens_map: HashMap<String, usize>,
     token_type: DocType,
 }

 impl Tokens {
     pub fn new(tokens: Vec<String>, token_type: DocType) -> Self {
+        let positions = (0..tokens.len() as u32).collect();
+        Self::with_positions(tokens, positions, token_type)
+    }
+
+    pub fn with_positions(tokens: Vec<String>, positions: Vec<u32>, token_type: DocType) -> Self {
+        debug_assert_eq!(tokens.len(), positions.len());
         let mut tokens_vec = vec![];
         let mut tokens_map = HashMap::new();
         for (idx, token) in tokens.into_iter().enumerate() {
@@ -734,6 +741,7 @@ impl Tokens {

         Self {
             tokens: tokens_vec,
+            positions,
             tokens_map,
             token_type,
         }
@@ -762,6 +770,10 @@ impl Tokens {
     pub fn get_token(&self, index: usize) -> &str {
         &self.tokens[index]
     }
+
+    pub fn position(&self, index: usize) -> u32 {
+        self.positions[index]
+    }
 }

 impl IntoIterator for Tokens {
@@ -786,10 +798,12 @@ pub fn collect_query_tokens(text: &str, tokenizer: &mut Box<dyn LanceTokenizer>
     let token_type = tokenizer.doc_type();
     let mut stream = tokenizer.token_stream_for_search(text);
     let mut tokens = Vec::new();
+    let mut positions = Vec::new();
     while let Some(token) = stream.next() {
         tokens.push(token.text.clone());
+        positions.push(token.position as u32);
     }
-    Tokens::new(tokens, token_type)
+    Tokens::with_positions(tokens, positions, token_type)
 }

 pub fn has_query_token(
diff --git a/rust/lance-index/src/scalar/inverted/wand.rs b/rust/lance-index/src/scalar/inverted/wand.rs
index 7da5ddfb0fb..b06c75c0021 100644
--- a/rust/lance-index/src/scalar/inverted/wand.rs
+++ b/rust/lance-index/src/scalar/inverted/wand.rs
@@ -1993,6 +1993,44 @@ mod tests {
         assert!(wand.check_positions(0));
     }

+    #[rstest]
+    fn test_exact_phrase_respects_query_position_gaps(#[values(false, true)] is_compressed: bool) {
+        let mut docs = DocSet::default();
+        docs.append(0, 16);
+
+        let postings = vec![
+            PostingIterator::new(
+                String::from("want"),
+                0,
+                0,
+                generate_posting_list_with_positions(
+                    vec![0],
+                    vec![vec![0_u32]],
+                    1.0,
+                    is_compressed,
+                ),
+                docs.len(),
+            ),
+            PostingIterator::new(
+                String::from("apple"),
+                1,
+                2,
+                generate_posting_list_with_positions(
+                    vec![0],
+                    vec![vec![2_u32]],
+                    1.0,
+                    is_compressed,
+                ),
+                docs.len(),
+            ),
+        ];
+
+        let bm25 = IndexBM25Scorer::new(std::iter::empty());
+        let wand = Wand::new(Operator::And, postings.into_iter(), &docs, bm25);
+        assert!(wand.check_exact_positions());
+        assert!(wand.check_positions(0));
+    }
+
     #[rstest]
     fn test_and_phrase_miss_advances_to_next_candidate(#[values(false, true)] is_compressed: bool) {
         let mut docs = DocSet::default();
diff --git a/rust/lance/src/dataset/tests/dataset_index.rs b/rust/lance/src/dataset/tests/dataset_index.rs
index c3aac4493d4..20d9a3d0fe5 100644
--- a/rust/lance/src/dataset/tests/dataset_index.rs
+++ b/rust/lance/src/dataset/tests/dataset_index.rs
@@ -1776,6 +1776,65 @@ async fn test_fts_phrase_query() {
     assert_eq!(result.num_rows(), 0);
 }

+#[tokio::test]
+async fn test_fts_phrase_query_with_removed_stop_words() {
+    let tmpdir = TempStrDir::default();
+    let uri = tmpdir.to_owned();
+    drop(tmpdir);
+
+    let doc_col: Arc<dyn Array> = Arc::new(GenericStringArray::<i64>::from(vec![
+        "want the apple",
+        "want an apple",
+        "want green apple",
+        "apple want the",
+    ]));
+    let ids = UInt64Array::from_iter_values(0..doc_col.len() as u64);
+    let batch = RecordBatch::try_new(
+        arrow_schema::Schema::new(vec![
+            arrow_schema::Field::new("doc", doc_col.data_type().to_owned(), true),
+            arrow_schema::Field::new("id", DataType::UInt64, false),
+        ])
+        .into(),
+        vec![Arc::new(doc_col) as ArrayRef, Arc::new(ids) as ArrayRef],
+    )
+    .unwrap();
+    let schema = batch.schema();
+    let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema);
+    let mut dataset = Dataset::write(batches, &uri, None).await.unwrap();
+
+    dataset
+        .create_index(
+            &["doc"],
+            IndexType::Inverted,
+            None,
+            &InvertedIndexParams::default()
+                .with_position(true)
+                .remove_stop_words(true),
+            true,
+        )
+        .await
+        .unwrap();
+
+    for query in ["want the apple", "want an apple"] {
+        let result = dataset
+            .scan()
+            .project(&["id"])
+            .unwrap()
+            .full_text_search(FullTextSearchQuery::new_query(
+                PhraseQuery::new(query.to_owned()).into(),
+            ))
+            .unwrap()
+            .try_into_batch()
+            .await
+            .unwrap();
+
+        let ids = result["id"].as_primitive::<UInt64Type>().values();
+        assert_eq!(result.num_rows(), 2, "query={query}, ids={ids:?}");
+        assert!(ids.contains(&0), "query={query}, ids={ids:?}");
+        assert!(ids.contains(&1), "query={query}, ids={ids:?}");
+    }
+}
+
 async fn prepare_json_dataset() -> (Dataset, String) {
     let text_col = Arc::new(StringArray::from(vec![
         r#"{
diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs
index 6e129841c76..436a09acfb4 100644
--- a/rust/lance/src/io/exec/fts.rs
+++ b/rust/lance/src/io/exec/fts.rs
@@ -25,7 +25,7 @@ use lance_datafusion::utils::{ExecutionPlanMetricsSetExt, MetricsExt, PARTITIONS

 use super::PreFilterSource;
 use super::utils::{IndexMetrics, InstrumentedRecordBatchStreamAdapter, build_prefilter};
-use crate::{Dataset, index::DatasetIndexInternalExt};
+use crate::{Dataset, dataset::ProjectionRequest, index::DatasetIndexInternalExt};
 use lance_index::metrics::MetricsCollector;
 use lance_index::scalar::inverted::builder::document_input;
 use lance_index::scalar::inverted::lance_tokenizer::{DocType, JsonTokenizer, LanceTokenizer};
@@ -33,6 +33,7 @@ use lance_index::scalar::inverted::query::{
     BoostQuery, FtsSearchParams, MatchQuery, PhraseQuery, Tokens, collect_query_tokens,
     has_query_token,
 };
+use lance_index::scalar::inverted::tokenizer::InvertedIndexParams;
 use lance_index::scalar::inverted::tokenizer::lance_tokenizer::TextTokenizer;
 use lance_index::scalar::inverted::{
     FTS_SCHEMA, InvertedIndex, SCORE_COL, flat_bm25_search_stream,
@@ -47,6 +48,14 @@ pub struct FtsIndexMetrics {
     baseline_metrics: BaselineMetrics,
 }

+const STOP_WORD_PLACEHOLDER: &str = "__lance_stop_word__";
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+struct TokenWithPosition {
+    text: String,
+    position: u32,
+}
+
 impl FtsIndexMetrics {
     pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
         Self {
@@ -786,6 +795,177 @@ impl PhraseQueryExec {
             metrics: ExecutionPlanMetricsSet::new(),
         }
     }
+
+    fn collect_tokens_with_positions(
+        text: &str,
+        tokenizer: &mut Box<dyn LanceTokenizer>,
+        is_query: bool,
+    ) -> Vec<TokenWithPosition> {
+        let mut stream = if is_query {
+            tokenizer.token_stream_for_search(text)
+        } else {
+            tokenizer.token_stream_for_doc(text)
+        };
+
+        let mut tokens = Vec::new();
+        while let Some(token) = stream.next() {
+            tokens.push(TokenWithPosition {
+                text: token.text.clone(),
+                position: token.position as u32,
+            });
+        }
+        tokens
+    }
+
+    fn collect_phrase_tokens_with_stop_words(
+        text: &str,
+        params: &InvertedIndexParams,
+        is_query: bool,
+    ) -> DataFusionResult<Vec<String>> {
+        let mut all_tokens_tokenizer = params
+            .clone()
+            .remove_stop_words(false)
+            .build()
+            .map_err(DataFusionError::from)?;
+        let mut filtered_tokenizer = params.clone().build().map_err(DataFusionError::from)?;
+
+        let all_tokens =
+            Self::collect_tokens_with_positions(text, &mut all_tokens_tokenizer, is_query);
+        let filtered_tokens =
+            Self::collect_tokens_with_positions(text, &mut filtered_tokenizer, is_query);
+
+        let mut filtered_idx = 0usize;
+        let mut phrase_tokens = Vec::with_capacity(all_tokens.len());
+        for token in all_tokens {
+            if filtered_tokens.get(filtered_idx) == Some(&token) {
+                phrase_tokens.push(token.text);
+                filtered_idx += 1;
+            } else {
+                phrase_tokens.push(STOP_WORD_PLACEHOLDER.to_string());
+            }
+        }
+        Ok(phrase_tokens)
+    }
+
+    fn matches_phrase_tokens(doc_tokens: &[String], query_tokens: &[String], slop: u32) -> bool {
+        if query_tokens.is_empty() {
+            return false;
+        }
+
+        for (doc_idx, token) in doc_tokens.iter().enumerate() {
+            if token == &query_tokens[0]
+                && Self::matches_phrase_tokens_from(doc_tokens, query_tokens, doc_idx, 1, slop)
+            {
+                return true;
+            }
+        }
+        false
+    }
+
+    fn matches_phrase_tokens_from(
+        doc_tokens: &[String],
+        query_tokens: &[String],
+        previous_doc_idx: usize,
+        next_query_idx: usize,
+        slop: u32,
+    ) -> bool {
+        if next_query_idx == query_tokens.len() {
+            return true;
+        }
+
+        let min_doc_idx = previous_doc_idx.saturating_add(1);
+        if min_doc_idx >= doc_tokens.len() {
+            return false;
+        }
+
+        let max_doc_idx = previous_doc_idx
+            .saturating_add(1 + slop as usize)
+            .min(doc_tokens.len() - 1);
+        if min_doc_idx > max_doc_idx {
+            return false;
+        }
+
+        for doc_idx in min_doc_idx..=max_doc_idx {
+            if doc_tokens[doc_idx] == query_tokens[next_query_idx]
+                && Self::matches_phrase_tokens_from(
+                    doc_tokens,
+                    query_tokens,
+                    doc_idx,
+                    next_query_idx + 1,
+                    slop,
+                )
+            {
+                return true;
+            }
+        }
+        false
+    }
+
+    async fn filter_stop_word_phrase_matches(
+        dataset: Arc<Dataset>,
+        column: &str,
+        row_ids: Vec<u64>,
+        scores: Vec<f32>,
+        params: &InvertedIndexParams,
+        query: &PhraseQuery,
+    ) -> DataFusionResult<(Vec<u64>, Vec<f32>)> {
+        let query_tokens = Self::collect_phrase_tokens_with_stop_words(&query.terms, params, true)?;
+        if !query_tokens
+            .iter()
+            .any(|token| token == STOP_WORD_PLACEHOLDER)
+        {
+            return Ok((row_ids, scores));
+        }
+
+        let projection = ProjectionRequest::from_columns([column], dataset.schema());
+        let batch = dataset
+            .take_rows(&row_ids, projection)
+            .await
+            .map_err(DataFusionError::from)?;
+        let text_column = batch.column_by_name(column).ok_or_else(|| {
+            DataFusionError::Execution(format!("Column {} not found in batch", column))
+        })?;
+
+        let mut filtered_row_ids = Vec::with_capacity(row_ids.len());
+        let mut filtered_scores = Vec::with_capacity(scores.len());
+        match text_column.data_type() {
+            DataType::Utf8 => {
+                let text_column = text_column.as_string::<i32>();
+                for ((row_id, score), value) in
+                    row_ids.into_iter().zip(scores).zip(text_column.iter())
+                {
+                    let Some(value) = value else {
+                        continue;
+                    };
+                    let doc_tokens =
+                        Self::collect_phrase_tokens_with_stop_words(value, params, false)?;
+                    if Self::matches_phrase_tokens(&doc_tokens, &query_tokens, query.slop) {
+                        filtered_row_ids.push(row_id);
+                        filtered_scores.push(score);
+                    }
+                }
+            }
+            DataType::LargeUtf8 => {
+                let text_column = text_column.as_string::<i64>();
+                for ((row_id, score), value) in
+                    row_ids.into_iter().zip(scores).zip(text_column.iter())
+                {
+                    let Some(value) = value else {
+                        continue;
+                    };
+                    let doc_tokens =
+                        Self::collect_phrase_tokens_with_stop_words(value, params, false)?;
+                    if Self::matches_phrase_tokens(&doc_tokens, &query_tokens, query.slop) {
+                        filtered_row_ids.push(row_id);
+                        filtered_scores.push(score);
+                    }
+                }
+            }
+            _ => return Ok((row_ids, scores)),
+        }
+
+        Ok((filtered_row_ids, filtered_scores))
+    }
 }

 impl ExecutionPlan for PhraseQueryExec {
@@ -872,10 +1052,13 @@ impl ExecutionPlan for PhraseQueryExec {
         let metrics = Arc::new(FtsIndexMetrics::new(&self.metrics, partition));
         let stream = stream::once(async move {
             let _timer = metrics.baseline_metrics.elapsed_compute().timer();
-            let column = query.column.ok_or(DataFusionError::Execution(format!(
-                "column not set for PhraseQuery {}",
-                query.terms
-            )))?;
+            let column = query
+                .column
+                .clone()
+                .ok_or(DataFusionError::Execution(format!(
+                    "column not set for PhraseQuery {}",
+                    query.terms
+                )))?;
             let index_meta = ds
                 .load_scalar_index(IndexCriteria::default().for_column(&column).supports_fts())
                 .await?
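To see why the post-validation added above rejects the `want green apple` false positive, it helps to run the matcher by hand over the placeholder streams. Below is a condensed, standalone restatement of `matches_phrase_tokens` / `matches_phrase_tokens_from`, a sketch rather than the shipped code: free functions over `&str` slices instead of the private `PhraseQueryExec` methods, with `P` standing in for `STOP_WORD_PLACEHOLDER`:

```rust
// Backtracking phrase match: place query tokens left to right, allowing at
// most `slop` extra doc tokens between consecutive placements.
fn matches(doc: &[&str], query: &[&str], slop: u32) -> bool {
    fn rest(doc: &[&str], query: &[&str], prev: usize, next: usize, slop: u32) -> bool {
        if next == query.len() {
            return true; // every query token has been placed
        }
        let lo = prev + 1;
        let hi = (prev + 1 + slop as usize).min(doc.len().saturating_sub(1));
        // Try each in-window doc position for the next query token.
        (lo..=hi).any(|i| doc[i] == query[next] && rest(doc, query, i, next + 1, slop))
    }
    !query.is_empty()
        && doc
            .iter()
            .enumerate()
            .any(|(i, t)| t == &query[0] && rest(doc, query, i, 1, slop))
}

fn main() {
    const P: &str = "__lance_stop_word__";
    // The query "want the apple" tokenizes to ["want", P, "apple"].
    let query = ["want", P, "apple"];
    // "want the apple" and "want an apple" both also yield ["want", P, "apple"].
    assert!(matches(&["want", P, "apple"], &query, 0));
    // "want green apple": the gap holds a real token, not a stop word.
    assert!(!matches(&["want", "green", "apple"], &query, 0));
}
```

The placeholder only matches another placeholder, so a gap in the query can be filled by a removed stop word but not by an arbitrary term; that asymmetry is what the inverted index alone cannot express.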
@@ -892,7 +1075,7 @@ impl ExecutionPlan for PhraseQueryExec { context.clone(), partition, &prefilter_source, - ds, + ds.clone(), &[index_meta], )?; @@ -909,6 +1092,7 @@ impl ExecutionPlan for PhraseQueryExec { let mut tokenizer = index.tokenizer(); let tokens = collect_query_tokens(&query.terms, &mut tokenizer); + let index_params = index.params().clone(); pre_filter.wait_for_ready().await?; let (doc_ids, scores) = index @@ -921,6 +1105,15 @@ impl ExecutionPlan for PhraseQueryExec { ) .boxed() .await?; + let (doc_ids, scores) = Self::filter_stop_word_phrase_matches( + ds.clone(), + &column, + doc_ids, + scores, + &index_params, + &query, + ) + .await?; metrics.baseline_metrics.record_output(doc_ids.len()); let batch = RecordBatch::try_new( FTS_SCHEMA.clone(), From 05a142048d30eb196f0f3fd8962ed40a3f77c430 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Tue, 24 Mar 2026 23:31:44 +0800 Subject: [PATCH 2/5] delete temp files Signed-off-by: BubbleCal --- .codex/task-memory/phrase-stop-words/findings.md | 9 --------- .codex/task-memory/phrase-stop-words/plan.md | 10 ---------- 2 files changed, 19 deletions(-) delete mode 100644 .codex/task-memory/phrase-stop-words/findings.md delete mode 100644 .codex/task-memory/phrase-stop-words/plan.md diff --git a/.codex/task-memory/phrase-stop-words/findings.md b/.codex/task-memory/phrase-stop-words/findings.md deleted file mode 100644 index 36f0328e9d8..00000000000 --- a/.codex/task-memory/phrase-stop-words/findings.md +++ /dev/null @@ -1,9 +0,0 @@ -# Findings - -- `PhraseQueryExec` tokenizes the query with the inverted index tokenizer before searching. -- The main inverted index stores per-token positions from the tokenizer stream in `rust/lance-index/src/scalar/inverted/builder.rs`. -- The in-memory WAL FTS path already assigns positions over emitted tokens only, which naturally collapses removed stop words. -- The new end-to-end Rust dataset test fails on `query=want the apple` with zero matches against docs `want the apple` / `want an apple`. -- Root cause: `collect_query_tokens` dropped tokenizer positions and `load_posting_lists` reassigned phrase query positions as dense `0..n`, so removed stop words in the query lost their gaps. -- Preserving query positions fixes the missing-match bug, but it also introduces false positives like `want green apple` because the index cannot tell whether the gap token was a stop word. -- The final fix keeps tokenizer query positions for indexed phrase matching, then performs an exact post-validation step against the original document text when the phrase query actually contains removed stop words. diff --git a/.codex/task-memory/phrase-stop-words/plan.md b/.codex/task-memory/phrase-stop-words/plan.md deleted file mode 100644 index 6ddee642fda..00000000000 --- a/.codex/task-memory/phrase-stop-words/plan.md +++ /dev/null @@ -1,10 +0,0 @@ -# Phrase Query Stop Words - -- Goal: verify whether phrase queries still match when the inverted index is built with `remove_stop_words=true`, especially for queries like `want the apple` vs `want an apple`. -- Steps: -- Add an end-to-end Rust dataset test that reproduces the requested cases. -- Run the targeted test to confirm current behavior. -- Only change implementation if the test fails. - -- Status: -- Completed. 
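Patch 3 below keeps this validation but hoists tokenizer construction out of the per-document loop: `build_phrase_tokenizers` builds the unfiltered/filtered pair once per query. The stop-word detection itself is a two-pointer alignment of the two token streams. A minimal standalone sketch of that alignment, with `(text, position)` tuples standing in for the patch's `TokenWithPosition`:

```rust
/// Placeholder emitted wherever the filtering tokenizer dropped a token.
const PLACEHOLDER: &str = "__lance_stop_word__";

/// Align an unfiltered token stream against its stop-word-filtered
/// counterpart; anything the filter dropped becomes a placeholder.
fn phrase_tokens(all: &[(&str, u32)], filtered: &[(&str, u32)]) -> Vec<String> {
    let mut f = 0usize;
    let mut out = Vec::with_capacity(all.len());
    for tok in all {
        // A token survived filtering only if the next filtered token is
        // identical in both text and position.
        if filtered.get(f) == Some(tok) {
            out.push(tok.0.to_string());
            f += 1;
        } else {
            out.push(PLACEHOLDER.to_string());
        }
    }
    out
}

fn main() {
    // "want the apple": the filtering tokenizer removes "the".
    let all = [("want", 0), ("the", 1), ("apple", 2)];
    let filtered = [("want", 0), ("apple", 2)];
    assert_eq!(
        phrase_tokens(&all, &filtered),
        vec!["want", PLACEHOLDER, "apple"]
    );
}
```

Comparing positions as well as text is what makes the alignment sound: if a filter ever emitted the same text at a different position, a text-only comparison could mis-align the streams, which is why the patch derives `PartialEq` on the whole `TokenWithPosition`.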
From 73a4fd1910d4a89f6b552500e2dda124fc616b3d Mon Sep 17 00:00:00 2001
From: BubbleCal
Date: Tue, 24 Mar 2026 23:36:45 +0800
Subject: [PATCH 3/5] fix: optimize stop-word phrase validation

---
 rust/lance/src/dataset/tests/dataset_index.rs |  1 +
 rust/lance/src/io/exec/fts.rs                 | 59 +++++++++++++------
 2 files changed, 42 insertions(+), 18 deletions(-)

diff --git a/rust/lance/src/dataset/tests/dataset_index.rs b/rust/lance/src/dataset/tests/dataset_index.rs
index 20d9a3d0fe5..329bbd3841c 100644
--- a/rust/lance/src/dataset/tests/dataset_index.rs
+++ b/rust/lance/src/dataset/tests/dataset_index.rs
@@ -1832,6 +1832,7 @@ async fn test_fts_phrase_query_with_removed_stop_words() {
         assert_eq!(result.num_rows(), 2, "query={query}, ids={ids:?}");
         assert!(ids.contains(&0), "query={query}, ids={ids:?}");
         assert!(ids.contains(&1), "query={query}, ids={ids:?}");
+        assert!(!ids.contains(&2), "query={query}, ids={ids:?}");
     }
 }

diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs
index 436a09acfb4..87e0f2985fa 100644
--- a/rust/lance/src/io/exec/fts.rs
+++ b/rust/lance/src/io/exec/fts.rs
@@ -819,20 +819,13 @@ impl PhraseQueryExec {

     fn collect_phrase_tokens_with_stop_words(
         text: &str,
-        params: &InvertedIndexParams,
+        all_tokens_tokenizer: &mut Box<dyn LanceTokenizer>,
+        filtered_tokenizer: &mut Box<dyn LanceTokenizer>,
         is_query: bool,
-    ) -> DataFusionResult<Vec<String>> {
-        let mut all_tokens_tokenizer = params
-            .clone()
-            .remove_stop_words(false)
-            .build()
-            .map_err(DataFusionError::from)?;
-        let mut filtered_tokenizer = params.clone().build().map_err(DataFusionError::from)?;
-
-        let all_tokens =
-            Self::collect_tokens_with_positions(text, &mut all_tokens_tokenizer, is_query);
+    ) -> Vec<String> {
+        let all_tokens = Self::collect_tokens_with_positions(text, all_tokens_tokenizer, is_query);
         let filtered_tokens =
-            Self::collect_tokens_with_positions(text, &mut filtered_tokenizer, is_query);
+            Self::collect_tokens_with_positions(text, filtered_tokenizer, is_query);

         let mut filtered_idx = 0usize;
         let mut phrase_tokens = Vec::with_capacity(all_tokens.len());
@@ -844,7 +837,19 @@ impl PhraseQueryExec {
             phrase_tokens.push(STOP_WORD_PLACEHOLDER.to_string());
         }
     }
-        Ok(phrase_tokens)
+        phrase_tokens
+    }
+
+    fn build_phrase_tokenizers(
+        params: &InvertedIndexParams,
+    ) -> DataFusionResult<(Box<dyn LanceTokenizer>, Box<dyn LanceTokenizer>)> {
+        let all_tokens_tokenizer = params
+            .clone()
+            .remove_stop_words(false)
+            .build()
+            .map_err(DataFusionError::from)?;
+        let filtered_tokenizer = params.clone().build().map_err(DataFusionError::from)?;
+        Ok((all_tokens_tokenizer, filtered_tokenizer))
     }

     fn matches_phrase_tokens(doc_tokens: &[String], query_tokens: &[String], slop: u32) -> bool {
@@ -869,6 +874,9 @@
         next_query_idx: usize,
         slop: u32,
     ) -> bool {
+        // Phrase slop is typically small in practice. This recursive search keeps the
+        // implementation simple for the post-validation path, which only runs for
+        // stop-word phrase queries after indexed candidate generation.
         if next_query_idx == query_tokens.len() {
             return true;
         }
@@ -909,7 +917,14 @@
         params: &InvertedIndexParams,
         query: &PhraseQuery,
     ) -> DataFusionResult<(Vec<u64>, Vec<f32>)> {
-        let query_tokens = Self::collect_phrase_tokens_with_stop_words(&query.terms, params, true)?;
+        let (mut all_tokens_tokenizer, mut filtered_tokenizer) =
+            Self::build_phrase_tokenizers(params)?;
+        let query_tokens = Self::collect_phrase_tokens_with_stop_words(
+            &query.terms,
+            &mut all_tokens_tokenizer,
+            &mut filtered_tokenizer,
+            true,
+        );
         if !query_tokens
             .iter()
             .any(|token| token == STOP_WORD_PLACEHOLDER)
@@ -937,8 +952,12 @@
                     let Some(value) = value else {
                         continue;
                     };
-                    let doc_tokens =
-                        Self::collect_phrase_tokens_with_stop_words(value, params, false)?;
+                    let doc_tokens = Self::collect_phrase_tokens_with_stop_words(
+                        value,
+                        &mut all_tokens_tokenizer,
+                        &mut filtered_tokenizer,
+                        false,
+                    );
                     if Self::matches_phrase_tokens(&doc_tokens, &query_tokens, query.slop) {
                         filtered_row_ids.push(row_id);
                         filtered_scores.push(score);
@@ -953,8 +972,12 @@
                     let Some(value) = value else {
                         continue;
                     };
-                    let doc_tokens =
-                        Self::collect_phrase_tokens_with_stop_words(value, params, false)?;
+                    let doc_tokens = Self::collect_phrase_tokens_with_stop_words(
+                        value,
+                        &mut all_tokens_tokenizer,
+                        &mut filtered_tokenizer,
+                        false,
+                    );
                     if Self::matches_phrase_tokens(&doc_tokens, &query_tokens, query.slop) {
                         filtered_row_ids.push(row_id);
                         filtered_scores.push(score);

From f639b403a310070fca64535433874e864a5fe911 Mon Sep 17 00:00:00 2001
From: BubbleCal
Date: Wed, 25 Mar 2026 10:27:50 +0800
Subject: [PATCH 4/5] fix missing doc

Signed-off-by: BubbleCal
---
 rust/lance/src/dataset/tests/dataset_index.rs |   4 +-
 rust/lance/src/io/exec/fts.rs                 | 215 +----------------
 2 files changed, 3 insertions(+), 216 deletions(-)

diff --git a/rust/lance/src/dataset/tests/dataset_index.rs b/rust/lance/src/dataset/tests/dataset_index.rs
index 329bbd3841c..b51e8cc7c33 100644
--- a/rust/lance/src/dataset/tests/dataset_index.rs
+++ b/rust/lance/src/dataset/tests/dataset_index.rs
@@ -1829,10 +1829,10 @@ async fn test_fts_phrase_query_with_removed_stop_words() {
             .unwrap();

         let ids = result["id"].as_primitive::<UInt64Type>().values();
-        assert_eq!(result.num_rows(), 2, "query={query}, ids={ids:?}");
+        assert_eq!(result.num_rows(), 3, "query={query}, ids={ids:?}");
         assert!(ids.contains(&0), "query={query}, ids={ids:?}");
         assert!(ids.contains(&1), "query={query}, ids={ids:?}");
-        assert!(!ids.contains(&2), "query={query}, ids={ids:?}");
+        assert!(ids.contains(&2), "query={query}, ids={ids:?}");
     }
 }

diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs
index 87e0f2985fa..7cefcb2046c 100644
--- a/rust/lance/src/io/exec/fts.rs
+++ b/rust/lance/src/io/exec/fts.rs
@@ -25,7 +25,7 @@ use lance_datafusion::utils::{ExecutionPlanMetricsSetExt, MetricsExt, PARTITIONS

 use super::PreFilterSource;
 use super::utils::{IndexMetrics, InstrumentedRecordBatchStreamAdapter, build_prefilter};
-use crate::{Dataset, dataset::ProjectionRequest, index::DatasetIndexInternalExt};
+use crate::{Dataset, index::DatasetIndexInternalExt};
 use lance_index::metrics::MetricsCollector;
 use lance_index::scalar::inverted::builder::document_input;
 use lance_index::scalar::inverted::lance_tokenizer::{DocType, JsonTokenizer, LanceTokenizer};
@@ -33,7 +33,6 @@ use lance_index::scalar::inverted::query::{
     BoostQuery, FtsSearchParams, MatchQuery, PhraseQuery, Tokens, collect_query_tokens,
     has_query_token,
 };
-use lance_index::scalar::inverted::tokenizer::InvertedIndexParams;
 use lance_index::scalar::inverted::tokenizer::lance_tokenizer::TextTokenizer;
 use lance_index::scalar::inverted::{
     FTS_SCHEMA, InvertedIndex, SCORE_COL, flat_bm25_search_stream,
@@ -47,14 +46,6 @@ pub struct FtsIndexMetrics {
     baseline_metrics: BaselineMetrics,
 }

-const STOP_WORD_PLACEHOLDER: &str = "__lance_stop_word__";
-
-#[derive(Clone, Debug, PartialEq, Eq)]
-struct TokenWithPosition {
-    text: String,
-    position: u32,
-}
-
 impl FtsIndexMetrics {
     pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
         Self {
@@ -795,200 +786,6 @@ impl PhraseQueryExec {
             metrics: ExecutionPlanMetricsSet::new(),
         }
     }
-
-    fn collect_tokens_with_positions(
-        text: &str,
-        tokenizer: &mut Box<dyn LanceTokenizer>,
-        is_query: bool,
-    ) -> Vec<TokenWithPosition> {
-        let mut stream = if is_query {
-            tokenizer.token_stream_for_search(text)
-        } else {
-            tokenizer.token_stream_for_doc(text)
-        };
-
-        let mut tokens = Vec::new();
-        while let Some(token) = stream.next() {
-            tokens.push(TokenWithPosition {
-                text: token.text.clone(),
-                position: token.position as u32,
-            });
-        }
-        tokens
-    }
-
-    fn collect_phrase_tokens_with_stop_words(
-        text: &str,
-        all_tokens_tokenizer: &mut Box<dyn LanceTokenizer>,
-        filtered_tokenizer: &mut Box<dyn LanceTokenizer>,
-        is_query: bool,
-    ) -> Vec<String> {
-        let all_tokens = Self::collect_tokens_with_positions(text, all_tokens_tokenizer, is_query);
-        let filtered_tokens =
-            Self::collect_tokens_with_positions(text, filtered_tokenizer, is_query);
-
-        let mut filtered_idx = 0usize;
-        let mut phrase_tokens = Vec::with_capacity(all_tokens.len());
-        for token in all_tokens {
-            if filtered_tokens.get(filtered_idx) == Some(&token) {
-                phrase_tokens.push(token.text);
-                filtered_idx += 1;
-            } else {
-                phrase_tokens.push(STOP_WORD_PLACEHOLDER.to_string());
-            }
-        }
-        phrase_tokens
-    }
-
-    fn build_phrase_tokenizers(
-        params: &InvertedIndexParams,
-    ) -> DataFusionResult<(Box<dyn LanceTokenizer>, Box<dyn LanceTokenizer>)> {
-        let all_tokens_tokenizer = params
-            .clone()
-            .remove_stop_words(false)
-            .build()
-            .map_err(DataFusionError::from)?;
-        let filtered_tokenizer = params.clone().build().map_err(DataFusionError::from)?;
-        Ok((all_tokens_tokenizer, filtered_tokenizer))
-    }
-
-    fn matches_phrase_tokens(doc_tokens: &[String], query_tokens: &[String], slop: u32) -> bool {
-        if query_tokens.is_empty() {
-            return false;
-        }
-
-        for (doc_idx, token) in doc_tokens.iter().enumerate() {
-            if token == &query_tokens[0]
-                && Self::matches_phrase_tokens_from(doc_tokens, query_tokens, doc_idx, 1, slop)
-            {
-                return true;
-            }
-        }
-        false
-    }
-
-    fn matches_phrase_tokens_from(
-        doc_tokens: &[String],
-        query_tokens: &[String],
-        previous_doc_idx: usize,
-        next_query_idx: usize,
-        slop: u32,
-    ) -> bool {
-        // Phrase slop is typically small in practice. This recursive search keeps the
-        // implementation simple for the post-validation path, which only runs for
-        // stop-word phrase queries after indexed candidate generation.
-        if next_query_idx == query_tokens.len() {
-            return true;
-        }
-
-        let min_doc_idx = previous_doc_idx.saturating_add(1);
-        if min_doc_idx >= doc_tokens.len() {
-            return false;
-        }
-
-        let max_doc_idx = previous_doc_idx
-            .saturating_add(1 + slop as usize)
-            .min(doc_tokens.len() - 1);
-        if min_doc_idx > max_doc_idx {
-            return false;
-        }
-
-        for doc_idx in min_doc_idx..=max_doc_idx {
-            if doc_tokens[doc_idx] == query_tokens[next_query_idx]
-                && Self::matches_phrase_tokens_from(
-                    doc_tokens,
-                    query_tokens,
-                    doc_idx,
-                    next_query_idx + 1,
-                    slop,
-                )
-            {
-                return true;
-            }
-        }
-        false
-    }
-
-    async fn filter_stop_word_phrase_matches(
-        dataset: Arc<Dataset>,
-        column: &str,
-        row_ids: Vec<u64>,
-        scores: Vec<f32>,
-        params: &InvertedIndexParams,
-        query: &PhraseQuery,
-    ) -> DataFusionResult<(Vec<u64>, Vec<f32>)> {
-        let (mut all_tokens_tokenizer, mut filtered_tokenizer) =
-            Self::build_phrase_tokenizers(params)?;
-        let query_tokens = Self::collect_phrase_tokens_with_stop_words(
-            &query.terms,
-            &mut all_tokens_tokenizer,
-            &mut filtered_tokenizer,
-            true,
-        );
-        if !query_tokens
-            .iter()
-            .any(|token| token == STOP_WORD_PLACEHOLDER)
-        {
-            return Ok((row_ids, scores));
-        }
-
-        let projection = ProjectionRequest::from_columns([column], dataset.schema());
-        let batch = dataset
-            .take_rows(&row_ids, projection)
-            .await
-            .map_err(DataFusionError::from)?;
-        let text_column = batch.column_by_name(column).ok_or_else(|| {
-            DataFusionError::Execution(format!("Column {} not found in batch", column))
-        })?;
-
-        let mut filtered_row_ids = Vec::with_capacity(row_ids.len());
-        let mut filtered_scores = Vec::with_capacity(scores.len());
-        match text_column.data_type() {
-            DataType::Utf8 => {
-                let text_column = text_column.as_string::<i32>();
-                for ((row_id, score), value) in
-                    row_ids.into_iter().zip(scores).zip(text_column.iter())
-                {
-                    let Some(value) = value else {
-                        continue;
-                    };
-                    let doc_tokens = Self::collect_phrase_tokens_with_stop_words(
-                        value,
-                        &mut all_tokens_tokenizer,
-                        &mut filtered_tokenizer,
-                        false,
-                    );
-                    if Self::matches_phrase_tokens(&doc_tokens, &query_tokens, query.slop) {
-                        filtered_row_ids.push(row_id);
-                        filtered_scores.push(score);
-                    }
-                }
-            }
-            DataType::LargeUtf8 => {
-                let text_column = text_column.as_string::<i64>();
-                for ((row_id, score), value) in
-                    row_ids.into_iter().zip(scores).zip(text_column.iter())
-                {
-                    let Some(value) = value else {
-                        continue;
-                    };
-                    let doc_tokens = Self::collect_phrase_tokens_with_stop_words(
-                        value,
-                        &mut all_tokens_tokenizer,
-                        &mut filtered_tokenizer,
-                        false,
-                    );
-                    if Self::matches_phrase_tokens(&doc_tokens, &query_tokens, query.slop) {
-                        filtered_row_ids.push(row_id);
-                        filtered_scores.push(score);
-                    }
-                }
-            }
-            _ => return Ok((row_ids, scores)),
-        }
-
-        Ok((filtered_row_ids, filtered_scores))
-    }
 }

 impl ExecutionPlan for PhraseQueryExec {
@@ -1115,7 +912,6 @@ impl ExecutionPlan for PhraseQueryExec {

         let mut tokenizer = index.tokenizer();
         let tokens = collect_query_tokens(&query.terms, &mut tokenizer);
-        let index_params = index.params().clone();

         pre_filter.wait_for_ready().await?;
         let (doc_ids, scores) = index
@@ -1128,15 +924,6 @@ impl ExecutionPlan for PhraseQueryExec {
             )
             .boxed()
             .await?;
-        let (doc_ids, scores) = Self::filter_stop_word_phrase_matches(
-            ds.clone(),
-            &column,
-            doc_ids,
-            scores,
-            &index_params,
-            &query,
-        )
-        .await?;
         metrics.baseline_metrics.record_output(doc_ids.len());
         let batch = RecordBatch::try_new(
             FTS_SCHEMA.clone(),

From 1d6fb7a54e5464a3334dd584b45ac9a3de10c429 Mon Sep 17 00:00:00 2001
From: BubbleCal
Date: Wed, 25 Mar 2026 19:51:24 +0800
Subject: [PATCH 5/5] more test

Signed-off-by: BubbleCal
---
 rust/lance/src/dataset/tests/dataset_index.rs | 59 +++++++++++++++++++
 1 file changed, 59 insertions(+)

diff --git a/rust/lance/src/dataset/tests/dataset_index.rs b/rust/lance/src/dataset/tests/dataset_index.rs
index b51e8cc7c33..87c17cdba32 100644
--- a/rust/lance/src/dataset/tests/dataset_index.rs
+++ b/rust/lance/src/dataset/tests/dataset_index.rs
@@ -1836,6 +1836,65 @@ async fn test_fts_phrase_query_with_removed_stop_words() {
     }
 }

+#[tokio::test]
+async fn test_fts_phrase_query_preserves_stop_word_gaps() {
+    let tmpdir = TempStrDir::default();
+    let uri = tmpdir.to_owned();
+    drop(tmpdir);
+
+    let doc_col: Arc<dyn Array> = Arc::new(GenericStringArray::<i64>::from(vec![
+        "the united states of america",
+        "the united states and america",
+        "united states america",
+        "the united states of north america",
+    ]));
+    let ids = UInt64Array::from_iter_values(0..doc_col.len() as u64);
+    let batch = RecordBatch::try_new(
+        arrow_schema::Schema::new(vec![
+            arrow_schema::Field::new("doc", doc_col.data_type().to_owned(), true),
+            arrow_schema::Field::new("id", DataType::UInt64, false),
+        ])
+        .into(),
+        vec![Arc::new(doc_col) as ArrayRef, Arc::new(ids) as ArrayRef],
+    )
+    .unwrap();
+    let schema = batch.schema();
+    let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema);
+    let mut dataset = Dataset::write(batches, &uri, None).await.unwrap();
+
+    dataset
+        .create_index(
+            &["doc"],
+            IndexType::Inverted,
+            None,
+            &InvertedIndexParams::default()
+                .with_position(true)
+                .remove_stop_words(true),
+            true,
+        )
+        .await
+        .unwrap();
+
+    let result = dataset
+        .scan()
+        .project(&["id"])
+        .unwrap()
+        .full_text_search(FullTextSearchQuery::new_query(
+            PhraseQuery::new("the united states of america".to_owned()).into(),
+        ))
+        .unwrap()
+        .try_into_batch()
+        .await
+        .unwrap();
+
+    let ids = result["id"].as_primitive::<UInt64Type>().values();
+    assert_eq!(result.num_rows(), 2, "ids={ids:?}");
+    assert!(ids.contains(&0), "ids={ids:?}");
+    assert!(ids.contains(&1), "ids={ids:?}");
+    assert!(!ids.contains(&2), "ids={ids:?}");
+    assert!(!ids.contains(&3), "ids={ids:?}");
+}
+
 async fn prepare_json_dataset() -> (Dataset, String) {
     let text_col = Arc::new(StringArray::from(vec![
         r#"{