From 343dc721bea7768e47a2a2eb979d55253612a92a Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Mon, 5 Jan 2026 21:21:58 +0800 Subject: [PATCH 1/4] Add parts_searched metrics for FTS --- rust/lance-datafusion/src/utils.rs | 1 + rust/lance-index/src/scalar/inverted/index.rs | 5 + rust/lance/src/io/exec/fts.rs | 116 +++++++++++++++++- 3 files changed, 117 insertions(+), 5 deletions(-) diff --git a/rust/lance-datafusion/src/utils.rs b/rust/lance-datafusion/src/utils.rs index 39d693bace8..421942b80d8 100644 --- a/rust/lance-datafusion/src/utils.rs +++ b/rust/lance-datafusion/src/utils.rs @@ -234,6 +234,7 @@ pub const REQUESTS_METRIC: &str = "requests"; pub const BYTES_READ_METRIC: &str = "bytes_read"; pub const INDICES_LOADED_METRIC: &str = "indices_loaded"; pub const PARTS_LOADED_METRIC: &str = "parts_loaded"; +pub const PARTS_SEARCHED_METRIC: &str = "parts_searched"; pub const PARTITIONS_RANKED_METRIC: &str = "partitions_ranked"; pub const INDEX_COMPARISONS_METRIC: &str = "index_comparisons"; pub const FRAGMENTS_SCANNED_METRIC: &str = "fragments_scanned"; diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index f9794079811..2c5c7a847a5 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -236,6 +236,11 @@ impl InvertedIndex { &self.params } + /// Returns the number of partitions in this inverted index. + pub fn partition_count(&self) -> usize { + self.partitions.len() + } + // search the documents that contain the query // return the row ids of the documents sorted by bm25 score // ref: https://en.wikipedia.org/wiki/Okapi_BM25 diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 6fc3ce9e3d4..cd150f6a090 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -16,10 +16,11 @@ use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning}; -use datafusion_physical_plan::metrics::BaselineMetrics; +use datafusion_physical_plan::metrics::{BaselineMetrics, Count}; use futures::stream::{self}; use futures::{FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; +use lance_datafusion::utils::{ExecutionPlanMetricsSetExt, PARTS_SEARCHED_METRIC}; use lance_core::{utils::tracing::StreamTracingExt, ROW_ID}; use super::utils::{build_prefilter, IndexMetrics, InstrumentedRecordBatchStreamAdapter}; @@ -41,6 +42,7 @@ use tracing::instrument; pub struct FtsIndexMetrics { index_metrics: IndexMetrics, + parts_searched: Count, baseline_metrics: BaselineMetrics, } @@ -48,9 +50,14 @@ impl FtsIndexMetrics { pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { Self { index_metrics: IndexMetrics::new(metrics, partition), + parts_searched: metrics.new_count(PARTS_SEARCHED_METRIC, partition), baseline_metrics: BaselineMetrics::new(metrics, partition), } } + + pub fn record_parts_searched(&self, num_parts: usize) { + self.parts_searched.add(num_parts); + } } impl MetricsCollector for FtsIndexMetrics { @@ -251,6 +258,7 @@ impl ExecutionPlan for MatchQueryExec { column, )) })?; + metrics.record_parts_searched(inverted_idx.partition_count()); let is_fuzzy = matches!(query.fuzziness, Some(n) if n != 0); let params = params @@ -450,6 +458,9 @@ impl ExecutionPlan for FlatMatchQueryExec { } None => None, }; + if let Some(index) = inverted_idx.as_ref() { + metrics.record_parts_searched(index.partition_count()); + } Ok::<_, DataFusionError>(flat_bm25_search_stream( unindexed_input, @@ -669,6 +680,7 @@ impl ExecutionPlan for PhraseQueryExec { column, )) })?; + metrics.record_parts_searched(index.partition_count()); let mut tokenizer = index.tokenizer(); let tokens = collect_query_tokens(&query.terms, &mut tokenizer, None); @@ -1126,20 +1138,47 @@ impl ExecutionPlan for BooleanQueryExec { #[cfg(test)] pub mod tests { - use std::sync::Arc; + use std::sync::{Arc, Mutex}; use datafusion::{execution::TaskContext, physical_plan::ExecutionPlan}; + use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts}; use lance_datafusion::datagen::DatafusionDatagenExt; + use lance_datafusion::utils::PARTS_SEARCHED_METRIC; use lance_datagen::{BatchCount, ByteCount, RowCount}; use lance_index::scalar::inverted::query::{ BoostQuery, FtsQuery, FtsSearchParams, MatchQuery, PhraseQuery, }; - use lance_index::scalar::inverted::FTS_SCHEMA; - - use crate::{io::exec::PreFilterSource, utils::test::NoContextTestFixture}; + use lance_index::scalar::inverted::{InvertedIndex, FTS_SCHEMA}; + use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams}; + use lance_index::{DatasetIndexExt, IndexCriteria, IndexType}; + use lance_index::metrics::NoOpMetricsCollector; + + use crate::{ + index::DatasetIndexInternalExt, + io::exec::PreFilterSource, + utils::test::{DatagenExt, FragmentCount, FragmentRowCount, NoContextTestFixture}, + }; use super::{BoostQueryExec, FlatMatchQueryExec, MatchQueryExec, PhraseQueryExec}; + #[derive(Default)] + struct StatsHolder { + collected_stats: Arc>>, + } + + impl StatsHolder { + fn get_setter(&self) -> ExecutionStatsCallback { + let collected_stats = self.collected_stats.clone(); + Arc::new(move |stats| { + *collected_stats.lock().unwrap() = Some(stats.clone()); + }) + } + + fn consume(self) -> ExecutionSummaryCounts { + self.collected_stats.lock().unwrap().take().unwrap() + } + } + #[test] fn execute_without_context() { // These tests ensure we can create nodes and call execute without a tokio Runtime @@ -1224,4 +1263,71 @@ pub mod tests { let metrics = boost_query.metrics().unwrap(); assert!(metrics.elapsed_compute().unwrap() > 0); } + + #[tokio::test] + async fn test_parts_searched_metrics() { + let mut dataset = lance_datagen::gen_batch() + .col( + "text", + lance_datagen::array::cycle_utf8_literals(&["hello", "lance", "search"]), + ) + .into_ram_dataset(FragmentCount::from(3), FragmentRowCount::from(5)) + .await + .unwrap(); + + dataset + .create_index( + &["text"], + IndexType::Inverted, + None, + &InvertedIndexParams::default(), + true, + ) + .await + .unwrap(); + + let index_meta = dataset + .load_scalar_index(IndexCriteria::default().for_column("text").supports_fts()) + .await + .unwrap() + .unwrap(); + let index = dataset + .open_generic_index( + "text", + &index_meta.uuid.to_string(), + &NoOpMetricsCollector, + ) + .await + .unwrap(); + let inverted_index = index.as_any().downcast_ref::().unwrap(); + let expected_parts = inverted_index.partition_count(); + + let stats_holder = StatsHolder::default(); + let mut scanner = dataset.scan(); + scanner + .scan_stats_callback(stats_holder.get_setter()) + .project(&["text"]) + .unwrap() + .with_row_id() + .full_text_search(FullTextSearchQuery::new("hello".to_string())) + .unwrap(); + let _ = scanner.try_into_batch().await.unwrap(); + let stats = stats_holder.consume(); + let parts_searched = stats + .all_counts + .get(PARTS_SEARCHED_METRIC) + .copied() + .unwrap_or_default(); + assert_eq!(parts_searched, expected_parts); + + let mut analyze_scanner = dataset.scan(); + analyze_scanner + .project(&["text"]) + .unwrap() + .with_row_id() + .full_text_search(FullTextSearchQuery::new("hello".to_string())) + .unwrap(); + let analysis = analyze_scanner.analyze_plan().await.unwrap(); + assert!(analysis.contains(PARTS_SEARCHED_METRIC)); + } } From 66a154323aa88fb19c20cfe434023c38085dba22 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Mon, 5 Jan 2026 21:27:12 +0800 Subject: [PATCH 2/4] Reuse partitions_searched for FTS --- rust/lance-datafusion/src/utils.rs | 1 - rust/lance/src/io/exec/fts.rs | 14 +++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/rust/lance-datafusion/src/utils.rs b/rust/lance-datafusion/src/utils.rs index 421942b80d8..39d693bace8 100644 --- a/rust/lance-datafusion/src/utils.rs +++ b/rust/lance-datafusion/src/utils.rs @@ -234,7 +234,6 @@ pub const REQUESTS_METRIC: &str = "requests"; pub const BYTES_READ_METRIC: &str = "bytes_read"; pub const INDICES_LOADED_METRIC: &str = "indices_loaded"; pub const PARTS_LOADED_METRIC: &str = "parts_loaded"; -pub const PARTS_SEARCHED_METRIC: &str = "parts_searched"; pub const PARTITIONS_RANKED_METRIC: &str = "partitions_ranked"; pub const INDEX_COMPARISONS_METRIC: &str = "index_comparisons"; pub const FRAGMENTS_SCANNED_METRIC: &str = "fragments_scanned"; diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index cd150f6a090..e7f646fbe5a 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -20,7 +20,7 @@ use datafusion_physical_plan::metrics::{BaselineMetrics, Count}; use futures::stream::{self}; use futures::{FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; -use lance_datafusion::utils::{ExecutionPlanMetricsSetExt, PARTS_SEARCHED_METRIC}; +use lance_datafusion::utils::{ExecutionPlanMetricsSetExt, PARTITIONS_SEARCHED_METRIC}; use lance_core::{utils::tracing::StreamTracingExt, ROW_ID}; use super::utils::{build_prefilter, IndexMetrics, InstrumentedRecordBatchStreamAdapter}; @@ -42,7 +42,7 @@ use tracing::instrument; pub struct FtsIndexMetrics { index_metrics: IndexMetrics, - parts_searched: Count, + partitions_searched: Count, baseline_metrics: BaselineMetrics, } @@ -50,13 +50,13 @@ impl FtsIndexMetrics { pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { Self { index_metrics: IndexMetrics::new(metrics, partition), - parts_searched: metrics.new_count(PARTS_SEARCHED_METRIC, partition), + partitions_searched: metrics.new_count(PARTITIONS_SEARCHED_METRIC, partition), baseline_metrics: BaselineMetrics::new(metrics, partition), } } pub fn record_parts_searched(&self, num_parts: usize) { - self.parts_searched.add(num_parts); + self.partitions_searched.add(num_parts); } } @@ -1143,7 +1143,7 @@ pub mod tests { use datafusion::{execution::TaskContext, physical_plan::ExecutionPlan}; use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts}; use lance_datafusion::datagen::DatafusionDatagenExt; - use lance_datafusion::utils::PARTS_SEARCHED_METRIC; + use lance_datafusion::utils::PARTITIONS_SEARCHED_METRIC; use lance_datagen::{BatchCount, ByteCount, RowCount}; use lance_index::scalar::inverted::query::{ BoostQuery, FtsQuery, FtsSearchParams, MatchQuery, PhraseQuery, @@ -1315,7 +1315,7 @@ pub mod tests { let stats = stats_holder.consume(); let parts_searched = stats .all_counts - .get(PARTS_SEARCHED_METRIC) + .get(PARTITIONS_SEARCHED_METRIC) .copied() .unwrap_or_default(); assert_eq!(parts_searched, expected_parts); @@ -1328,6 +1328,6 @@ pub mod tests { .full_text_search(FullTextSearchQuery::new("hello".to_string())) .unwrap(); let analysis = analyze_scanner.analyze_plan().await.unwrap(); - assert!(analysis.contains(PARTS_SEARCHED_METRIC)); + assert!(analysis.contains(PARTITIONS_SEARCHED_METRIC)); } } From 79cf8141abdff6b6a9642bc1fb84b179ceabf912 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Mon, 5 Jan 2026 21:35:13 +0800 Subject: [PATCH 3/4] fmt Signed-off-by: BubbleCal --- rust/lance/src/io/exec/fts.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index e7f646fbe5a..3a2932639db 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -20,8 +20,8 @@ use datafusion_physical_plan::metrics::{BaselineMetrics, Count}; use futures::stream::{self}; use futures::{FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; -use lance_datafusion::utils::{ExecutionPlanMetricsSetExt, PARTITIONS_SEARCHED_METRIC}; use lance_core::{utils::tracing::StreamTracingExt, ROW_ID}; +use lance_datafusion::utils::{ExecutionPlanMetricsSetExt, PARTITIONS_SEARCHED_METRIC}; use super::utils::{build_prefilter, IndexMetrics, InstrumentedRecordBatchStreamAdapter}; use super::PreFilterSource; @@ -1141,17 +1141,17 @@ pub mod tests { use std::sync::{Arc, Mutex}; use datafusion::{execution::TaskContext, physical_plan::ExecutionPlan}; - use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts}; use lance_datafusion::datagen::DatafusionDatagenExt; + use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts}; use lance_datafusion::utils::PARTITIONS_SEARCHED_METRIC; use lance_datagen::{BatchCount, ByteCount, RowCount}; + use lance_index::metrics::NoOpMetricsCollector; use lance_index::scalar::inverted::query::{ BoostQuery, FtsQuery, FtsSearchParams, MatchQuery, PhraseQuery, }; use lance_index::scalar::inverted::{InvertedIndex, FTS_SCHEMA}; use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams}; use lance_index::{DatasetIndexExt, IndexCriteria, IndexType}; - use lance_index::metrics::NoOpMetricsCollector; use crate::{ index::DatasetIndexInternalExt, @@ -1292,11 +1292,7 @@ pub mod tests { .unwrap() .unwrap(); let index = dataset - .open_generic_index( - "text", - &index_meta.uuid.to_string(), - &NoOpMetricsCollector, - ) + .open_generic_index("text", &index_meta.uuid.to_string(), &NoOpMetricsCollector) .await .unwrap(); let inverted_index = index.as_any().downcast_ref::().unwrap(); From 6067560ebcbc04a793b20067b850419011c08cd1 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Mon, 5 Jan 2026 22:20:08 +0800 Subject: [PATCH 4/4] Record partitions searched for BooleanQuery --- rust/lance/src/io/exec/fts.rs | 92 ++++++++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 3a2932639db..72ef63846df 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -21,7 +21,7 @@ use futures::stream::{self}; use futures::{FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; use lance_core::{utils::tracing::StreamTracingExt, ROW_ID}; -use lance_datafusion::utils::{ExecutionPlanMetricsSetExt, PARTITIONS_SEARCHED_METRIC}; +use lance_datafusion::utils::{ExecutionPlanMetricsSetExt, MetricsExt, PARTITIONS_SEARCHED_METRIC}; use super::utils::{build_prefilter, IndexMetrics, InstrumentedRecordBatchStreamAdapter}; use super::PreFilterSource; @@ -1051,6 +1051,9 @@ impl ExecutionPlan for BooleanQueryExec { context: Arc, ) -> DataFusionResult { let params = self.params.clone(); + let should_plan = self.should.clone(); + let must_plan = self.must.clone(); + let must_not_plan = self.must_not.clone(); let must = self .must .as_ref() @@ -1100,6 +1103,22 @@ impl ExecutionPlan for BooleanQueryExec { } } + let mut partitions_searched = 0; + for plan in [Some(&should_plan), must_plan.as_ref(), Some(&must_not_plan)] { + let Some(plan) = plan else { + continue; + }; + let Some(metrics) = plan.metrics() else { + continue; + }; + for (metric_name, count) in metrics.iter_counts() { + if metric_name.as_ref() == PARTITIONS_SEARCHED_METRIC { + partitions_searched += count.value(); + } + } + } + metrics.record_parts_searched(partitions_searched); + // sort the results and take the top k let _timer = elapsed_time.timer(); let (row_ids, scores): (Vec<_>, Vec<_>) = res @@ -1147,7 +1166,8 @@ pub mod tests { use lance_datagen::{BatchCount, ByteCount, RowCount}; use lance_index::metrics::NoOpMetricsCollector; use lance_index::scalar::inverted::query::{ - BoostQuery, FtsQuery, FtsSearchParams, MatchQuery, PhraseQuery, + BooleanQuery, BoostQuery, FtsQuery, FtsSearchParams, MatchQuery, Occur, Operator, + PhraseQuery, }; use lance_index::scalar::inverted::{InvertedIndex, FTS_SCHEMA}; use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams}; @@ -1326,4 +1346,72 @@ pub mod tests { let analysis = analyze_scanner.analyze_plan().await.unwrap(); assert!(analysis.contains(PARTITIONS_SEARCHED_METRIC)); } + + #[tokio::test] + async fn test_boolean_query_parts_searched_metrics() { + let mut dataset = lance_datagen::gen_batch() + .col( + "text", + lance_datagen::array::cycle_utf8_literals(&["hello", "lance", "search"]), + ) + .into_ram_dataset(FragmentCount::from(3), FragmentRowCount::from(5)) + .await + .unwrap(); + + dataset + .create_index( + &["text"], + IndexType::Inverted, + None, + &InvertedIndexParams::default(), + true, + ) + .await + .unwrap(); + + let index_meta = dataset + .load_scalar_index(IndexCriteria::default().for_column("text").supports_fts()) + .await + .unwrap() + .unwrap(); + let index = dataset + .open_generic_index("text", &index_meta.uuid.to_string(), &NoOpMetricsCollector) + .await + .unwrap(); + let inverted_index = index.as_any().downcast_ref::().unwrap(); + let expected_parts = inverted_index.partition_count(); + + let query = BooleanQuery::new([ + ( + Occur::Should, + MatchQuery::new("hello".to_string()) + .with_operator(Operator::And) + .into(), + ), + ( + Occur::Must, + MatchQuery::new("lance".to_string()) + .with_operator(Operator::And) + .into(), + ), + ]); + let expected_total = expected_parts * 2; + + let mut scanner = dataset.scan(); + scanner + .project(&["text"]) + .unwrap() + .with_row_id() + .full_text_search(FullTextSearchQuery::new_query(query.into())) + .unwrap(); + let analysis = scanner.analyze_plan().await.unwrap(); + let boolean_line = analysis + .lines() + .find(|line| line.contains("BooleanQuery")) + .unwrap(); + assert!( + boolean_line.contains(&format!("{PARTITIONS_SEARCHED_METRIC}={expected_total}")), + "BooleanQuery metrics missing partitions_searched: {boolean_line}" + ); + } }