diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index f9794079811..2c5c7a847a5 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -236,6 +236,11 @@ impl InvertedIndex { &self.params } + /// Returns the number of partitions in this inverted index. + pub fn partition_count(&self) -> usize { + self.partitions.len() + } + // search the documents that contain the query // return the row ids of the documents sorted by bm25 score // ref: https://en.wikipedia.org/wiki/Okapi_BM25 diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 6fc3ce9e3d4..72ef63846df 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -16,11 +16,12 @@ use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning}; -use datafusion_physical_plan::metrics::BaselineMetrics; +use datafusion_physical_plan::metrics::{BaselineMetrics, Count}; use futures::stream::{self}; use futures::{FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; use lance_core::{utils::tracing::StreamTracingExt, ROW_ID}; +use lance_datafusion::utils::{ExecutionPlanMetricsSetExt, MetricsExt, PARTITIONS_SEARCHED_METRIC}; use super::utils::{build_prefilter, IndexMetrics, InstrumentedRecordBatchStreamAdapter}; use super::PreFilterSource; @@ -41,6 +42,7 @@ use tracing::instrument; pub struct FtsIndexMetrics { index_metrics: IndexMetrics, + partitions_searched: Count, baseline_metrics: BaselineMetrics, } @@ -48,9 +50,14 @@ impl FtsIndexMetrics { pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { Self { index_metrics: IndexMetrics::new(metrics, partition), + partitions_searched: metrics.new_count(PARTITIONS_SEARCHED_METRIC, partition), baseline_metrics: BaselineMetrics::new(metrics, partition), } } + + pub fn record_parts_searched(&self, num_parts: usize) { + self.partitions_searched.add(num_parts); + } } impl MetricsCollector for FtsIndexMetrics { @@ -251,6 +258,7 @@ impl ExecutionPlan for MatchQueryExec { column, )) })?; + metrics.record_parts_searched(inverted_idx.partition_count()); let is_fuzzy = matches!(query.fuzziness, Some(n) if n != 0); let params = params @@ -450,6 +458,9 @@ impl ExecutionPlan for FlatMatchQueryExec { } None => None, }; + if let Some(index) = inverted_idx.as_ref() { + metrics.record_parts_searched(index.partition_count()); + } Ok::<_, DataFusionError>(flat_bm25_search_stream( unindexed_input, @@ -669,6 +680,7 @@ impl ExecutionPlan for PhraseQueryExec { column, )) })?; + metrics.record_parts_searched(index.partition_count()); let mut tokenizer = index.tokenizer(); let tokens = collect_query_tokens(&query.terms, &mut tokenizer, None); @@ -1039,6 +1051,9 @@ impl ExecutionPlan for BooleanQueryExec { context: Arc, ) -> DataFusionResult { let params = self.params.clone(); + let should_plan = self.should.clone(); + let must_plan = self.must.clone(); + let must_not_plan = self.must_not.clone(); let must = self .must .as_ref() @@ -1088,6 +1103,22 @@ impl ExecutionPlan for BooleanQueryExec { } } + let mut partitions_searched = 0; + for plan in [Some(&should_plan), must_plan.as_ref(), Some(&must_not_plan)] { + let Some(plan) = plan else { + continue; + }; + let Some(metrics) = plan.metrics() else { + continue; + }; + for (metric_name, count) in metrics.iter_counts() { + if metric_name.as_ref() == PARTITIONS_SEARCHED_METRIC { + partitions_searched += count.value(); + } + } + } + metrics.record_parts_searched(partitions_searched); + // sort the results and take the top k let _timer = elapsed_time.timer(); let (row_ids, scores): (Vec<_>, Vec<_>) = res @@ -1126,20 +1157,48 @@ impl ExecutionPlan for BooleanQueryExec { #[cfg(test)] pub mod tests { - use std::sync::Arc; + use std::sync::{Arc, Mutex}; use datafusion::{execution::TaskContext, physical_plan::ExecutionPlan}; use lance_datafusion::datagen::DatafusionDatagenExt; + use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts}; + use lance_datafusion::utils::PARTITIONS_SEARCHED_METRIC; use lance_datagen::{BatchCount, ByteCount, RowCount}; + use lance_index::metrics::NoOpMetricsCollector; use lance_index::scalar::inverted::query::{ - BoostQuery, FtsQuery, FtsSearchParams, MatchQuery, PhraseQuery, + BooleanQuery, BoostQuery, FtsQuery, FtsSearchParams, MatchQuery, Occur, Operator, + PhraseQuery, + }; + use lance_index::scalar::inverted::{InvertedIndex, FTS_SCHEMA}; + use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams}; + use lance_index::{DatasetIndexExt, IndexCriteria, IndexType}; + + use crate::{ + index::DatasetIndexInternalExt, + io::exec::PreFilterSource, + utils::test::{DatagenExt, FragmentCount, FragmentRowCount, NoContextTestFixture}, }; - use lance_index::scalar::inverted::FTS_SCHEMA; - - use crate::{io::exec::PreFilterSource, utils::test::NoContextTestFixture}; use super::{BoostQueryExec, FlatMatchQueryExec, MatchQueryExec, PhraseQueryExec}; + #[derive(Default)] + struct StatsHolder { + collected_stats: Arc>>, + } + + impl StatsHolder { + fn get_setter(&self) -> ExecutionStatsCallback { + let collected_stats = self.collected_stats.clone(); + Arc::new(move |stats| { + *collected_stats.lock().unwrap() = Some(stats.clone()); + }) + } + + fn consume(self) -> ExecutionSummaryCounts { + self.collected_stats.lock().unwrap().take().unwrap() + } + } + #[test] fn execute_without_context() { // These tests ensure we can create nodes and call execute without a tokio Runtime @@ -1224,4 +1283,135 @@ pub mod tests { let metrics = boost_query.metrics().unwrap(); assert!(metrics.elapsed_compute().unwrap() > 0); } + + #[tokio::test] + async fn test_parts_searched_metrics() { + let mut dataset = lance_datagen::gen_batch() + .col( + "text", + lance_datagen::array::cycle_utf8_literals(&["hello", "lance", "search"]), + ) + .into_ram_dataset(FragmentCount::from(3), FragmentRowCount::from(5)) + .await + .unwrap(); + + dataset + .create_index( + &["text"], + IndexType::Inverted, + None, + &InvertedIndexParams::default(), + true, + ) + .await + .unwrap(); + + let index_meta = dataset + .load_scalar_index(IndexCriteria::default().for_column("text").supports_fts()) + .await + .unwrap() + .unwrap(); + let index = dataset + .open_generic_index("text", &index_meta.uuid.to_string(), &NoOpMetricsCollector) + .await + .unwrap(); + let inverted_index = index.as_any().downcast_ref::().unwrap(); + let expected_parts = inverted_index.partition_count(); + + let stats_holder = StatsHolder::default(); + let mut scanner = dataset.scan(); + scanner + .scan_stats_callback(stats_holder.get_setter()) + .project(&["text"]) + .unwrap() + .with_row_id() + .full_text_search(FullTextSearchQuery::new("hello".to_string())) + .unwrap(); + let _ = scanner.try_into_batch().await.unwrap(); + let stats = stats_holder.consume(); + let parts_searched = stats + .all_counts + .get(PARTITIONS_SEARCHED_METRIC) + .copied() + .unwrap_or_default(); + assert_eq!(parts_searched, expected_parts); + + let mut analyze_scanner = dataset.scan(); + analyze_scanner + .project(&["text"]) + .unwrap() + .with_row_id() + .full_text_search(FullTextSearchQuery::new("hello".to_string())) + .unwrap(); + let analysis = analyze_scanner.analyze_plan().await.unwrap(); + assert!(analysis.contains(PARTITIONS_SEARCHED_METRIC)); + } + + #[tokio::test] + async fn test_boolean_query_parts_searched_metrics() { + let mut dataset = lance_datagen::gen_batch() + .col( + "text", + lance_datagen::array::cycle_utf8_literals(&["hello", "lance", "search"]), + ) + .into_ram_dataset(FragmentCount::from(3), FragmentRowCount::from(5)) + .await + .unwrap(); + + dataset + .create_index( + &["text"], + IndexType::Inverted, + None, + &InvertedIndexParams::default(), + true, + ) + .await + .unwrap(); + + let index_meta = dataset + .load_scalar_index(IndexCriteria::default().for_column("text").supports_fts()) + .await + .unwrap() + .unwrap(); + let index = dataset + .open_generic_index("text", &index_meta.uuid.to_string(), &NoOpMetricsCollector) + .await + .unwrap(); + let inverted_index = index.as_any().downcast_ref::().unwrap(); + let expected_parts = inverted_index.partition_count(); + + let query = BooleanQuery::new([ + ( + Occur::Should, + MatchQuery::new("hello".to_string()) + .with_operator(Operator::And) + .into(), + ), + ( + Occur::Must, + MatchQuery::new("lance".to_string()) + .with_operator(Operator::And) + .into(), + ), + ]); + let expected_total = expected_parts * 2; + + let mut scanner = dataset.scan(); + scanner + .project(&["text"]) + .unwrap() + .with_row_id() + .full_text_search(FullTextSearchQuery::new_query(query.into())) + .unwrap(); + let analysis = scanner.analyze_plan().await.unwrap(); + let boolean_line = analysis + .lines() + .find(|line| line.contains("BooleanQuery")) + .unwrap(); + assert!( + boolean_line.contains(&format!("{PARTITIONS_SEARCHED_METRIC}={expected_total}")), + "BooleanQuery metrics missing partitions_searched: {boolean_line}" + ); + } }