diff --git a/java/lance-jni/src/merge_insert.rs b/java/lance-jni/src/merge_insert.rs index 19ac731a83c..4e74393dc54 100644 --- a/java/lance-jni/src/merge_insert.rs +++ b/java/lance-jni/src/merge_insert.rs @@ -9,7 +9,7 @@ use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; use jni::objects::{JObject, JString, JValueGen}; use jni::sys::jlong; use jni::JNIEnv; -use lance::dataset::scanner::LanceFilter; +use lance::dataset::scanner::ExprFilter; use lance::dataset::{ MergeInsertBuilder, MergeStats, WhenMatched, WhenNotMatched, WhenNotMatchedBySource, }; @@ -158,7 +158,7 @@ fn extract_when_not_matched_by_source_str<'local>( fn extract_when_not_matched_by_source_delete_expr<'local>( env: &mut JNIEnv<'local>, jparam: &JObject, -) -> Result> { +) -> Result> { let when_not_matched_by_source_delete_expr = env .call_method( jparam, @@ -169,7 +169,7 @@ fn extract_when_not_matched_by_source_delete_expr<'local>( .l()?; if let Some(expr) = env.get_string_opt(&when_not_matched_by_source_delete_expr)? { - return Ok(Some(LanceFilter::Sql(expr))); + return Ok(Some(ExprFilter::Sql(expr))); } let when_not_matched_by_source_delete_substrait_expr = env @@ -182,7 +182,7 @@ fn extract_when_not_matched_by_source_delete_expr<'local>( .l()?; match env.get_bytes_opt(&when_not_matched_by_source_delete_substrait_expr)? 
{ - Some(expr) => Ok(Some(LanceFilter::Substrait(expr.to_vec()))), + Some(expr) => Ok(Some(ExprFilter::Substrait(expr.to_vec()))), None => Ok(None), } } @@ -190,7 +190,7 @@ fn extract_when_not_matched_by_source_delete_expr<'local>( fn extract_when_not_matched_by_source( schema: &Schema, when_not_matched_by_source: &str, - when_not_matched_by_source_delete_expr: Option, + when_not_matched_by_source_delete_expr: Option, ) -> Result { match when_not_matched_by_source { "Keep" => Ok(WhenNotMatchedBySource::Keep), diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 08b9192a88e..d2e00cca77a 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -2386,6 +2386,7 @@ pub fn flat_bm25_search( query_tokens: &Tokens, tokenizer: &mut Box, scorer: &mut MemBM25Scorer, + schema: SchemaRef, ) -> std::result::Result { let doc_iter = iter_str_array(&batch[doc_col]); let mut scores = Vec::with_capacity(batch.num_rows()); @@ -2423,7 +2424,7 @@ pub fn flat_bm25_search( let score_col = Arc::new(Float32Array::from(scores)) as ArrayRef; let batch = batch .try_with_column(SCORE_FIELD.clone(), score_col)? 
- .project_by_schema(&FTS_SCHEMA)?; // the scan node would probably scan some extra columns for prefilter, drop them here + .project_by_schema(&schema)?; Ok(batch) } @@ -2432,6 +2433,7 @@ pub fn flat_bm25_search_stream( doc_col: String, query: String, index: &Option, + schema: SchemaRef, ) -> SendableRecordBatchStream { let mut tokenizer = match index { Some(index) => index.tokenizer(), @@ -2466,10 +2468,18 @@ pub fn flat_bm25_search_stream( None => MemBM25Scorer::new(0, 0, HashMap::new()), }; + let batch_schema = schema.clone(); let stream = input.map(move |batch| { let batch = batch?; - let batch = flat_bm25_search(batch, &doc_col, &tokens, &mut tokenizer, &mut bm25_scorer)?; + let batch = flat_bm25_search( + batch, + &doc_col, + &tokens, + &mut tokenizer, + &mut bm25_scorer, + batch_schema.clone(), + )?; // filter out rows with score 0 let score_col = batch[SCORE_COL].as_primitive::(); @@ -2483,7 +2493,7 @@ pub fn flat_bm25_search_stream( Ok(batch) }); - Box::pin(RecordBatchStreamAdapter::new(FTS_SCHEMA.clone(), stream)) as SendableRecordBatchStream + Box::pin(RecordBatchStreamAdapter::new(schema, stream)) as SendableRecordBatchStream } pub fn is_phrase_query(query: &str) -> bool { diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 15c77a8c2a8..4c22b75d0b1 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -1754,7 +1754,7 @@ impl FileFragment { // else if predicate is `false`, filter the predicate // We do this on the expression level after expression optimization has // occurred so we also catch expressions that are equivalent to `true` - if let Some(predicate) = &scanner.get_filter()? { + if let Some(predicate) = &scanner.get_expr_filter()? 
{ if matches!( predicate, Expr::Literal(ScalarValue::Boolean(Some(false)), _) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 0117584117b..31e7a31d68d 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -12,7 +12,7 @@ use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema, SchemaR use arrow_select::concat::concat_batches; use async_recursion::async_recursion; use chrono::Utc; -use datafusion::common::{exec_datafusion_err, DFSchema, NullEquality, SchemaExt}; +use datafusion::common::{exec_datafusion_err, DFSchema, JoinType, NullEquality, SchemaExt}; use datafusion::functions_aggregate; use datafusion::functions_aggregate::count::count_udaf; use datafusion::logical_expr::{col, lit, Expr, ScalarUDF}; @@ -36,13 +36,15 @@ use datafusion_expr::ExprSchemable; use datafusion_functions::core::getfield::GetFieldFunc; use datafusion_physical_expr::{aggregate::AggregateExprBuilder, expressions::Column}; use datafusion_physical_expr::{create_physical_expr, LexOrdering, Partitioning, PhysicalExpr}; +use datafusion_physical_plan::joins::PartitionMode; +use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use datafusion_physical_plan::{empty::EmptyExec, joins::HashJoinExec}; use futures::future::BoxFuture; use futures::stream::{Stream, StreamExt}; use futures::{FutureExt, TryStreamExt}; use lance_arrow::floats::{coerce_float_vector, FloatType}; -use lance_arrow::DataTypeExt; +use lance_arrow::{DataTypeExt, SchemaExt as ArrowSchemaExt}; use lance_core::datatypes::{ escape_field_path_for_project, format_field_path, BlobHandling, Field, OnMissing, Projection, }; @@ -59,9 +61,9 @@ use lance_datafusion::projection::ProjectionPlan; use lance_file::reader::FileReaderOptions; use lance_index::scalar::expression::{IndexExprResult, PlannerIndexExt, INDEX_EXPR_RESULT_SCHEMA}; use lance_index::scalar::inverted::query::{ - 
fill_fts_query_column, FtsQuery, FtsSearchParams, MatchQuery, PhraseQuery, + fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery, PhraseQuery, }; -use lance_index::scalar::inverted::SCORE_COL; +use lance_index::scalar::inverted::{SCORE_COL, SCORE_FIELD}; use lance_index::scalar::FullTextSearchQuery; use lance_index::vector::{Query, DIST_COL}; use lance_index::IndexCriteria; @@ -86,17 +88,17 @@ use crate::io::exec::knn::MultivectorScoringExec; use crate::io::exec::scalar_index::{MaterializeIndexExec, ScalarIndexExec}; use crate::io::exec::{get_physical_optimizer, AddRowOffsetExec, LanceFilterExec, LanceScanConfig}; use crate::io::exec::{ - knn::new_knn_exec, project, AddRowAddrExec, FilterPlan, KNNVectorDistanceExec, - LancePushdownScanExec, LanceScanExec, Planner, PreFilterSource, ScanConfig, TakeExec, + knn::new_knn_exec, project, AddRowAddrExec, FilterPlan as ExprFilterPlan, + KNNVectorDistanceExec, LancePushdownScanExec, LanceScanExec, Planner, PreFilterSource, + ScanConfig, TakeExec, }; use crate::{datatypes::Schema, io::exec::fts::BooleanQueryExec}; use crate::{Error, Result}; -use snafu::location; - pub use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts}; #[cfg(feature = "substrait")] use lance_datafusion::substrait::parse_substrait; +use snafu::location; pub(crate) const BATCH_SIZE_FALLBACK: usize = 8192; // For backwards compatibility / historical reasons we re-calculate the default batch size @@ -229,9 +231,136 @@ struct PlannedFilteredScan { filter_pushed_down: bool, } -/// Filter for filtering rows +pub struct FilterPlan { + // Query filter plan + query_filter: Option, + refine_query_filter: bool, + // Expr filter plan + expr_filter_plan: ExprFilterPlan, +} + +impl FilterPlan { + pub fn new(query_filter: Option, expr_filter_plan: ExprFilterPlan) -> Self { + Self { + query_filter, + refine_query_filter: false, + expr_filter_plan, + } + } + + pub fn disable_refine(&mut self) { + self.expr_filter_plan = 
ExprFilterPlan::default(); + self.refine_query_filter = false; + } + + pub fn make_refine_only(&mut self) { + self.expr_filter_plan.make_refine_only(); + self.refine_query_filter = true; + } + + pub fn fts_filter(&self) -> Option { + match &self.query_filter { + Some(QueryFilter::Fts(query)) => Some(query.clone()), + _ => None, + } + } + + pub fn vector_filter(&self) -> Option { + match &self.query_filter { + Some(QueryFilter::Vector(query)) => Some(query.clone()), + _ => None, + } + } + + pub fn has_refine(&self) -> bool { + self.expr_filter_plan.has_refine() || self.refine_query_filter + } + + pub async fn refine_columns(&self, dataset: &Arc) -> Result> { + let mut columns = vec![]; + + if self.expr_filter_plan.has_refine() { + columns.extend(self.expr_filter_plan.refine_columns()); + } + + if self.refine_query_filter { + match &self.query_filter { + Some(QueryFilter::Fts(fts_query)) => { + let cols = if fts_query.columns().is_empty() { + let indexed_columns = fts_indexed_columns(dataset.clone()).await?; + let q = fill_fts_query_column(&fts_query.query, &indexed_columns, false)?; + q.columns() + } else { + fts_query.columns() + }; + + // Add refine column for match query since it supports `FlatMatchQueryExec`. + // Other fts query use join so we don't need to add refine column. 
+ if let FtsQuery::Match(_) = &fts_query.query { + columns.extend(cols.iter().cloned().collect::>()); + } + } + Some(QueryFilter::Vector(vector_query)) => { + columns.push(vector_query.column.clone()); + } + None => {} + } + } + + Ok(columns) + } + + pub async fn refine_filter( + &self, + input: Arc, + scanner: &Scanner, + ) -> Result> { + let mut plan = input; + + if self.refine_query_filter { + match &self.query_filter { + Some(QueryFilter::Fts(fts_query)) => { + plan = scanner.flat_fts(plan, fts_query).await?; + } + Some(QueryFilter::Vector(vector_query)) => { + plan = scanner.flat_knn(plan, vector_query)?; + } + None => {} + } + } + + if let Some(refine_expr) = &self.expr_filter_plan.refine_expr { + // We create a new planner specific to the node's schema, since + // physical expressions reference column by index rather than by name. + plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?); + } + + Ok(plan) + } +} + +#[derive(Debug, Clone, Default)] +pub struct LanceFilter { + query_filter: Option, + expr_filter: Option, +} + +impl LanceFilter { + pub fn is_none(&self) -> bool { + self.query_filter.is_none() && self.expr_filter.is_none() + } +} + +/// Query filter for filtering rows +#[derive(Debug, Clone)] +pub enum QueryFilter { + Fts(FullTextSearchQuery), + Vector(Query), +} + +/// Expr filter for filtering rows #[derive(Debug, Clone)] -pub enum LanceFilter { +pub enum ExprFilter { /// The filter is an SQL string Sql(String), /// The filter is a Substrait expression @@ -240,7 +369,7 @@ pub enum LanceFilter { Datafusion(Expr), } -impl LanceFilter { +impl ExprFilter { /// Converts the filter to a Datafusion expression /// /// The schema for this conversion should be the full schema available to @@ -335,8 +464,8 @@ pub struct Scanner { /// Materialization style controls when columns are fetched materialization_style: MaterializationStyle, - /// Optional filter expression. - filter: Option, + /// Filter. 
+ filter: LanceFilter, /// Optional full text search query full_text_query: Option, @@ -610,7 +739,7 @@ impl Scanner { blob_handling: BlobHandling::default(), prefilter: false, materialization_style: MaterializationStyle::Heuristic, - filter: None, + filter: LanceFilter::default(), full_text_query: None, batch_size: None, batch_readahead: get_num_compute_intensive_cpus(), @@ -792,7 +921,30 @@ impl Scanner { /// Once the filter is applied, Lance will create an optimized I/O plan for filtering. /// pub fn filter(&mut self, filter: &str) -> Result<&mut Self> { - self.filter = Some(LanceFilter::Sql(filter.to_string())); + self.filter.expr_filter = Some(ExprFilter::Sql(filter.to_string())); + Ok(self) + } + + /// Apply fts/vector query as filter. + /// + /// * Vector query filter can only be applied to full text search. + /// * Fts query filter can only be applied to vector search. + /// * Query filter couldn't be applied to normal query. + /// + /// ```rust,ignore + /// let dataset = Dataset::open(uri).await.unwrap(); + /// let query_vector = Float32Array::from(vec![300f32, 300f32, 300f32, 300f32]); + /// let stream = dataset.scan() + /// .nearest("vector", &query_vector, 5) + /// .project(&["col", "col2.subfield"]).unwrap() + /// .query_filter(QueryFilter::Fts(FullTextSearchQuery::new( + /// "hello".to_string(), + /// ))).unwrap() + /// .limit(10) + /// .into_stream(); + /// ``` + pub fn filter_query(&mut self, filter: QueryFilter) -> Result<&mut Self> { + self.filter.query_filter = Some(filter); Ok(self) } @@ -831,12 +983,12 @@ impl Scanner { /// The message must contain exactly one expression and that expression /// must be a scalar expression whose return type is boolean. 
pub fn filter_substrait(&mut self, filter: &[u8]) -> Result<&mut Self> { - self.filter = Some(LanceFilter::Substrait(filter.to_vec())); + self.filter.expr_filter = Some(ExprFilter::Substrait(filter.to_vec())); Ok(self) } pub fn filter_expr(&mut self, filter: Expr) -> &mut Self { - self.filter = Some(LanceFilter::Datafusion(filter)); + self.filter.expr_filter = Some(ExprFilter::Datafusion(filter)); self } @@ -1347,14 +1499,14 @@ impl Scanner { Ok(plan.schema()) } - /// Fetches the currently set filter + /// Fetches the currently set expr filter /// /// Note that this forces the filter to be evaluated and the result will depend on /// the current state of the scanner (e.g. if with_row_id has been called then _rowid /// will be available for filtering but not otherwise) and so you may want to call this /// after setting all other options. - pub fn get_filter(&self) -> Result> { - if let Some(filter) = &self.filter { + pub fn get_expr_filter(&self) -> Result> { + if let Some(filter) = &self.filter.expr_filter { let filter_schema = self.filterable_schema()?; Ok(Some(filter.to_datafusion( self.dataset.schema(), @@ -1635,7 +1787,7 @@ impl Scanner { // Note: only add columns that we actually need to read fn calc_eager_projection( &self, - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, desired_projection: &Projection, ) -> Result { // Note: We use all_columns and not refine_columns here. 
If a column is covered by an index but @@ -1676,11 +1828,12 @@ impl Scanner { let filter_schema = self.filterable_schema()?; let planner = Planner::new(Arc::new(filter_schema.as_ref().into())); - if let Some(filter) = self.filter.as_ref() { - let filter = filter.to_datafusion(self.dataset.schema(), filter_schema.as_ref())?; + // Check expr filter + let filter_plan = if let Some(filter) = self.filter.expr_filter.as_ref() { + let expr = filter.to_datafusion(self.dataset.schema(), filter_schema.as_ref())?; let index_info = self.dataset.scalar_index_info().await?; let filter_plan = - planner.create_filter_plan(filter.clone(), &index_info, use_scalar_index)?; + planner.create_filter_plan(expr.clone(), &index_info, use_scalar_index)?; // This tests if any of the fragments are missing the physical_rows property (old style) // If they are then we cannot use scalar indices @@ -1700,19 +1853,47 @@ impl Scanner { if has_missing_row_count { // We need row counts to use scalar indices. If we don't have them then // fallback to a non-indexed filter - Ok(planner.create_filter_plan(filter.clone(), &index_info, false)?) 
+ let filter_plan = + planner.create_filter_plan(expr.clone(), &index_info, false)?; + FilterPlan::new(self.filter.query_filter.clone(), filter_plan) } else { - Ok(filter_plan) + FilterPlan::new(self.filter.query_filter.clone(), filter_plan) } } else { - Ok(filter_plan) + FilterPlan::new(self.filter.query_filter.clone(), filter_plan) } } else { - Ok(FilterPlan::default()) + FilterPlan::new(self.filter.query_filter.clone(), ExprFilterPlan::default()) + }; + + // Check query filter + if filter_plan.query_filter.is_some() + && self.nearest.is_none() + && self.full_text_query.is_none() + { + return Err(Error::InvalidInput { + source: "Query filter can only be used with full text search or vector search" + .into(), + location: location!(), + }); + } + if self.nearest.is_some() && filter_plan.vector_filter().is_some() { + return Err(Error::InvalidInput { + source: "Query filter can't be used with vector search".into(), + location: location!(), + }); } + if self.full_text_query.is_some() && filter_plan.fts_filter().is_some() { + return Err(Error::InvalidInput { + source: "Fts filter can't be used with fts search".into(), + location: location!(), + }); + } + + Ok(filter_plan) } - async fn get_scan_range(&self, filter_plan: &FilterPlan) -> Result>> { + async fn get_scan_range(&self, filter_plan: &ExprFilterPlan) -> Result>> { if filter_plan.has_any_filter() { // If there is a filter we can't pushdown limit / offset Ok(None) @@ -1822,23 +2003,26 @@ impl Scanner { } let take_op = filter_plan + .expr_filter_plan .full_expr .as_ref() .and_then(TakeOperation::try_from_expr); if let Some((take_op, remainder)) = take_op { // If there is any remainder use it as the filter (we don't even try and combine an indexed // search on the filter with a take as that seems excessive) - filter_plan = remainder - .map(FilterPlan::new_refine_only) - .unwrap_or(FilterPlan::default()); + filter_plan.expr_filter_plan = remainder + .map(ExprFilterPlan::new_refine_only) + 
.unwrap_or(ExprFilterPlan::default()); self.take_source(take_op).await? } else { - let planned_read = self.filtered_read_source(&mut filter_plan).await?; + let planned_read = self + .filtered_read_source(&mut filter_plan.expr_filter_plan) + .await?; if planned_read.limit_pushed_down { use_limit_node = false; } if planned_read.filter_pushed_down { - filter_plan = FilterPlan::default(); + filter_plan.disable_refine(); } planned_read.plan } @@ -1859,8 +2043,10 @@ impl Scanner { // an indexed scan. if filter_plan.has_refine() { // It's ok for some filter columns to be missing (e.g. _rowid) - pre_filter_projection = pre_filter_projection - .union_columns(filter_plan.refine_columns(), OnMissing::Ignore)?; + pre_filter_projection = pre_filter_projection.union_columns( + filter_plan.refine_columns(&self.dataset).await?, + OnMissing::Ignore, + )?; } // TODO: Does it always make sense to take the ordering columns here? If there is a filter then @@ -1877,11 +2063,7 @@ impl Scanner { plan = self.take(plan, pre_filter_projection)?; // Stage 2: filter - if let Some(refine_expr) = filter_plan.refine_expr { - // We create a new planner specific to the node's schema, since - // physical expressions reference column by index rather than by name. 
- plan = Arc::new(LanceFilterExec::try_new(refine_expr, plan)?); - } + plan = filter_plan.refine_filter(plan, self).await?; // Stage 3: sort if let Some(ordering) = &self.ordering { @@ -1948,7 +2130,7 @@ impl Scanner { } // Check if a filter plan references version columns - fn filter_references_version_columns(&self, filter_plan: &FilterPlan) -> bool { + fn filter_references_version_columns(&self, filter_plan: &ExprFilterPlan) -> bool { use lance_core::{ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION}; if let Some(refine_expr) = &filter_plan.refine_expr { @@ -1969,7 +2151,7 @@ impl Scanner { // First return value is the plan, second is whether the limit was pushed down async fn legacy_filtered_read( &self, - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, projection: Projection, make_deletions_null: bool, fragments: Option>>, @@ -2065,7 +2247,7 @@ impl Scanner { // Do not call this directly, use filtered_read instead async fn new_filtered_read( &self, - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, projection: Projection, make_deletions_null: bool, fragments: Option>>, @@ -2116,7 +2298,7 @@ impl Scanner { // Delegates to legacy or new filtered read based on dataset storage version async fn filtered_read( &self, - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, projection: Projection, make_deletions_null: bool, fragments: Option>>, @@ -2193,7 +2375,7 @@ impl Scanner { async fn filtered_read_source( &self, - filter_plan: &mut FilterPlan, + filter_plan: &mut ExprFilterPlan, ) -> Result { log::trace!("source is a filtered read"); let mut projection = if filter_plan.has_refine() { @@ -2247,15 +2429,23 @@ impl Scanner { // The source is an FTS search if self.prefilter { + let source: Arc = match &filter_plan.vector_filter() { + Some(vector_query) => { + let vector_plan = self + .vector_search(&filter_plan.expr_filter_plan, vector_query) + .await?; + self.flat_fts(vector_plan, query).await? 
+ } + None => self.fts(&filter_plan.expr_filter_plan, query).await?, + }; // If we are prefiltering then the fts node will take care of the filter - let source = self.fts(filter_plan, query).await?; - *filter_plan = FilterPlan::default(); + filter_plan.disable_refine(); Ok(source) } else { // If we are postfiltering then we can't use scalar indices for the filter // and will need to run the postfilter in memory filter_plan.make_refine_only(); - self.fts(&FilterPlan::default(), query).await + self.fts(&ExprFilterPlan::default(), query).await } } @@ -2269,19 +2459,41 @@ impl Scanner { location: location!(), }); } + let Some(query) = self.nearest.as_ref() else { + return Err(Error::invalid_input( + "No nearest query".to_string(), + location!(), + )); + }; if self.prefilter { log::trace!("source is a vector search (prefilter)"); // If we are prefiltering then the ann / knn node will take care of the filter - let source = self.vector_search(filter_plan).await?; - *filter_plan = FilterPlan::default(); + let source: Arc = match &filter_plan.fts_filter() { + Some(fts_query) => { + let fts_plan = self.fts(&filter_plan.expr_filter_plan, fts_query).await?; + let projection = self + .dataset + .empty_projection() + .union_column(&query.column, OnMissing::Error)?; + let plan = self.take(fts_plan, projection)?; + + self.flat_knn(plan, query)? + } + None => { + self.vector_search(&filter_plan.expr_filter_plan, query) + .await? 
+ } + }; + + filter_plan.disable_refine(); Ok(source) } else { log::trace!("source is a vector search (postfilter)"); // If we are postfiltering then we can't use scalar indices for the filter // and will need to run the postfilter in memory filter_plan.make_refine_only(); - self.vector_search(&FilterPlan::default()).await + self.vector_search(&ExprFilterPlan::default(), query).await } } @@ -2397,7 +2609,7 @@ impl Scanner { // Create an execution plan to do full text search async fn fts( &self, - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, query: &FullTextSearchQuery, ) -> Result> { let columns = query.columns(); @@ -2414,49 +2626,7 @@ impl Scanner { let query = if columns.is_empty() { // the field is not specified, // try to search over all indexed fields including nested ones - let mut indexed_columns = Vec::new(); - for field in self.dataset.schema().fields_pre_order() { - // Check if this field is a string type that could have an inverted index - let is_string_field = match field.data_type() { - DataType::Utf8 | DataType::LargeUtf8 => true, - DataType::List(inner_field) | DataType::LargeList(inner_field) => { - matches!( - inner_field.data_type(), - DataType::Utf8 | DataType::LargeUtf8 - ) - } - _ => false, - }; - - if is_string_field { - // Build the full field path for nested fields - let column_path = if let Some(ancestors) = - self.dataset.schema().field_ancestry_by_id(field.id) - { - let field_refs: Vec<&str> = - ancestors.iter().map(|f| f.name.as_str()).collect(); - format_field_path(&field_refs) - } else { - continue; // Skip if we can't find the field ancestry - }; - - // Check if this field has an inverted index - let has_fts_index = self - .dataset - .load_scalar_index( - IndexCriteria::default() - .for_column(&column_path) - .supports_fts(), - ) - .await? 
- .is_some(); - - if has_fts_index { - indexed_columns.push(column_path); - } - } - } - + let indexed_columns = fts_indexed_columns(self.dataset.clone()).await?; fill_fts_query_column(&query.query, &indexed_columns, false)? } else { query.query.clone() @@ -2481,7 +2651,7 @@ impl Scanner { &self, query: &FtsQuery, params: &FtsSearchParams, - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, prefilter_source: &PreFilterSource, ) -> Result> { let plan: Arc = match query { @@ -2720,7 +2890,7 @@ impl Scanner { &self, query: &MatchQuery, params: &FtsSearchParams, - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, prefilter_source: &PreFilterSource, ) -> Result> { let column = query @@ -2796,7 +2966,7 @@ impl Scanner { fragments: Vec, query: &MatchQuery, params: &FtsSearchParams, - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, ) -> Result> { let column = query .column @@ -2835,21 +3005,18 @@ impl Scanner { query.clone(), params.clone(), scan_node, + FTS_SCHEMA.clone(), )); Ok(flat_match_plan) } // ANN/KNN search execution node with optional prefilter - async fn vector_search(&self, filter_plan: &FilterPlan) -> Result> { - let Some(nearest) = self.nearest.as_ref() else { - return Err(Error::invalid_input( - "No nearest query".to_string(), - location!(), - )); - }; - - // Clone so we can adjust the metric type based on the available index. - let mut q = nearest.clone(); + async fn vector_search( + &self, + filter_plan: &ExprFilterPlan, + q: &Query, + ) -> Result> { + let mut q = q.clone(); // Sanity check let (vector_type, element_type) = get_vector_type(self.dataset.schema(), &q.column)?; @@ -2948,7 +3115,7 @@ impl Scanner { q: &Query, index: &IndexMetadata, mut knn_node: Arc, - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, ) -> Result> { // Check if we've created new versions since the index was built. 
let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?; @@ -3085,7 +3252,7 @@ impl Scanner { async fn scalar_indexed_scan( &self, projection: Projection, - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, fragments: Arc>, ) -> Result> { log::trace!("scalar indexed scan"); @@ -3298,7 +3465,7 @@ impl Scanner { fn pushdown_scan( &self, make_deletions_null: bool, - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, ) -> Result> { log::trace!("pushdown_scan"); @@ -3332,6 +3499,88 @@ impl Scanner { )?)) } + async fn flat_fts( + &self, + input: Arc, + q: &FullTextSearchQuery, + ) -> Result> { + let fts_query = if q.columns().is_empty() { + let indexed_columns = fts_indexed_columns(self.dataset.clone()).await?; + fill_fts_query_column(&q.query, &indexed_columns, false)? + } else { + q.query.clone() + }; + + match &fts_query { + FtsQuery::Match(match_query) => { + let schema = Arc::new((input.schema()).try_with_column(SCORE_FIELD.clone())?); + + let column = match_query + .column + .as_ref() + .ok_or(Error::invalid_input( + "the column must be specified in the query".to_string(), + location!(), + ))? + .clone(); + let input = if schema.column_with_name(&column).is_none() { + let projection = self + .dataset + .empty_projection() + .union_column(&column, OnMissing::Error)?; + self.take(input, projection)? 
+ } else { + input + }; + + Ok(Arc::new(FlatMatchQueryExec::new( + self.dataset.clone(), + match_query.clone(), + q.params(), + input, + schema, + ))) + } + _ => { + let default_filter = ExprFilterPlan::default(); + let fts_plan = self.fts(&default_filter, q).await?; + + let vector_row_id = Column::new_with_schema(ROW_ID, input.schema().as_ref())?; + let fts_row_id = Column::new_with_schema(ROW_ID, fts_plan.schema().as_ref())?; + let join = HashJoinExec::try_new( + input, + fts_plan, + vec![(Arc::new(vector_row_id), Arc::new(fts_row_id))], + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNull, + )?; + + let schema = join.schema(); + let mut projection_exprs = Vec::new(); + let mut contain_rowid = false; + for field in schema.fields() { + if field.name() == ROW_ID { + if contain_rowid { + continue; + } + contain_rowid = true; + } + projection_exprs.push(( + Arc::new(Column::new_with_schema(field.name(), schema.as_ref())?) + as Arc, + field.name().clone(), + )); + } + + let projection_exec = ProjectionExec::try_new(projection_exprs, Arc::new(join))?; + Ok(Arc::new(projection_exec)) + } + } + } + /// Add a knn search node to the input plan fn flat_knn(&self, input: Arc, q: &Query) -> Result> { let flat_dist = Arc::new(KNNVectorDistanceExec::try_new( @@ -3441,7 +3690,7 @@ impl Scanner { &self, q: &Query, index: &[IndexMetadata], - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, ) -> Result> { let prefilter_source = self .prefilter_source(filter_plan, self.get_indexed_frags(index)) @@ -3472,7 +3721,7 @@ impl Scanner { &self, q: &Query, index: &[IndexMetadata], - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, ) -> Result> { // we split the query procedure into two steps: // 1. collect the candidates by vector searching on each query vector @@ -3557,7 +3806,7 @@ impl Scanner { /// for the search. A prefilter is calculated by doing a filtered read of the row id column. 
async fn prefilter_source( &self, - filter_plan: &FilterPlan, + filter_plan: &ExprFilterPlan, required_frags: RoaringBitmap, ) -> Result { if filter_plan.is_empty() { @@ -3670,6 +3919,51 @@ impl Scanner { } } +// Search over all indexed fields including nested ones, collecting columns that have an +// inverted index +async fn fts_indexed_columns(dataset: Arc) -> Result> { + let mut indexed_columns = Vec::new(); + for field in dataset.schema().fields_pre_order() { + // Check if this field is a string type that could have an inverted index + let is_string_field = match field.data_type() { + DataType::Utf8 | DataType::LargeUtf8 => true, + DataType::List(inner_field) | DataType::LargeList(inner_field) => { + matches!( + inner_field.data_type(), + DataType::Utf8 | DataType::LargeUtf8 + ) + } + _ => false, + }; + + if is_string_field { + // Build the full field path for nested fields + let column_path = + if let Some(ancestors) = dataset.schema().field_ancestry_by_id(field.id) { + let field_refs: Vec<&str> = ancestors.iter().map(|f| f.name.as_str()).collect(); + format_field_path(&field_refs) + } else { + continue; // Skip if we can't find the field ancestry + }; + + // Check if this field has an inverted index + let has_fts_index = dataset + .load_scalar_index( + IndexCriteria::default() + .for_column(&column_path) + .supports_fts(), + ) + .await? + .is_some(); + + if has_fts_index { + indexed_columns.push(column_path); + } + } + } + Ok(indexed_columns) +} + /// [`DatasetRecordBatchStream`] wraps the dataset into a [`RecordBatchStream`] for /// consumption by the user. 
/// @@ -4173,7 +4467,7 @@ mod test { assert!(scan.filter.is_none()); scan.filter("i > 50")?; - assert_eq!(scan.get_filter().unwrap(), Some(col("i").gt(lit(50)))); + assert_eq!(scan.get_expr_filter().unwrap(), Some(col("i").gt(lit(50)))); for use_stats in [false, true] { let batches = scan diff --git a/rust/lance/src/dataset/tests/dataset_scanner.rs b/rust/lance/src/dataset/tests/dataset_scanner.rs new file mode 100644 index 00000000000..e80f3f8d46c --- /dev/null +++ b/rust/lance/src/dataset/tests/dataset_scanner.rs @@ -0,0 +1,428 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::Arc; +use std::vec; + +use crate::index::vector::VectorIndexParams; +use lance_arrow::FixedSizeListArrayExt; + +use arrow::compute::concat_batches; +use arrow_array::{Array, FixedSizeListArray}; +use arrow_array::{Float32Array, Int32Array, RecordBatch, RecordBatchIterator, StringArray}; +use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef}; +use futures::TryStreamExt; +use lance_arrow::SchemaExt; +use lance_index::scalar::inverted::{ + query::PhraseQuery, tokenizer::InvertedIndexParams, SCORE_FIELD, +}; +use lance_index::scalar::FullTextSearchQuery; +use lance_index::{vector::DIST_COL, DatasetIndexExt, IndexType}; +use lance_linalg::distance::MetricType; + +use crate::dataset::scanner::{DatasetRecordBatchStream, QueryFilter}; +use crate::Dataset; +use lance_index::scalar::inverted::query::FtsQuery; +use lance_index::vector::ivf::IvfBuildParams; +use lance_index::vector::pq::PQBuildParams; +use lance_index::vector::Query; +use pretty_assertions::assert_eq; + +#[tokio::test] +async fn test_vector_filter_fts_search() { + let dataset = prepare_query_filter_dataset().await; + let schema: ArrowSchema = dataset.schema().into(); + + let query_vector = Arc::new(Float32Array::from(vec![300f32, 300f32, 300f32, 300f32])); + let vector_query = Query { + column: "vector".to_string(), + key: query_vector, + 
k: 5, + lower_bound: None, + upper_bound: None, + minimum_nprobes: 20, + maximum_nprobes: None, + ef: None, + refine_factor: None, + metric_type: MetricType::L2, + use_index: true, + dist_q_c: 0.0, + }; + + // Case 1: search with prefilter=true, query_filter=vector([300,300,300,300]) + let mut scanner = dataset.scan(); + let stream = scanner + .full_text_search(FullTextSearchQuery::new("text".to_string())) + .unwrap() + .prefilter(true) + .filter_query(QueryFilter::Vector(vector_query.clone())) + .unwrap() + .try_into_stream() + .await + .unwrap(); + check_results( + stream, + schema.try_with_column(SCORE_FIELD.clone()).unwrap().into(), + &[300, 299], + ) + .await; + + // Case 2: search with prefilter=true, query_filter=vector([300,300,300,300]), filter="category='geography'" + let mut scanner = dataset.scan(); + let stream = scanner + .full_text_search(FullTextSearchQuery::new("text".to_string())) + .unwrap() + .prefilter(true) + .filter("category='geography'") + .unwrap() + .filter_query(QueryFilter::Vector(vector_query.clone())) + .unwrap() + .try_into_stream() + .await + .unwrap(); + check_results( + stream, + schema.try_with_column(SCORE_FIELD.clone()).unwrap().into(), + &[300], + ) + .await; + + // Case 3: search with prefilter=true, phrase query, query_filter=vector([300,300,300,300]) + let mut scanner = dataset.scan(); + let stream = scanner + .full_text_search(FullTextSearchQuery::new_query(FtsQuery::Phrase( + PhraseQuery::new("text".to_string()).with_column(Some("text".to_string())), + ))) + .unwrap() + .prefilter(true) + .filter_query(QueryFilter::Vector(vector_query.clone())) + .unwrap() + .try_into_stream() + .await + .unwrap(); + check_results( + stream, + schema.try_with_column(SCORE_FIELD.clone()).unwrap().into(), + &[299, 300], + ) + .await; + + // Case 4: search with prefilter=true, phrase query, query_filter=vector([300,300,300,300]), filter="category='geography'" + let mut scanner = dataset.scan(); + let stream = scanner + 
.full_text_search(FullTextSearchQuery::new_query(FtsQuery::Phrase( + PhraseQuery::new("text".to_string()).with_column(Some("text".to_string())), + ))) + .unwrap() + .prefilter(true) + .filter_query(QueryFilter::Vector(vector_query.clone())) + .unwrap() + .filter("category='geography'") + .unwrap() + .try_into_stream() + .await + .unwrap(); + check_results( + stream, + schema.try_with_column(SCORE_FIELD.clone()).unwrap().into(), + &[300], + ) + .await; + + // Case 5: search with prefilter=false, phrase query, query_filter=vector([300,300,300,300]) + let mut scanner = dataset.scan(); + let stream = scanner + .full_text_search(FullTextSearchQuery::new_query(FtsQuery::Phrase( + PhraseQuery::new("text".to_string()).with_column(Some("text".to_string())), + ))) + .unwrap() + .prefilter(false) + .filter_query(QueryFilter::Vector(vector_query.clone())) + .unwrap() + .try_into_stream() + .await + .unwrap(); + check_results( + stream, + schema.try_with_column(SCORE_FIELD.clone()).unwrap().into(), + &[300, 299, 255, 254, 253], + ) + .await; + + // Case 6: search with prefilter=false, phrase query, query_filter=vector([300,300,300,300]), filter="category='geography'" + let mut scanner = dataset.scan(); + let stream = scanner + .full_text_search(FullTextSearchQuery::new_query(FtsQuery::Phrase( + PhraseQuery::new("text".to_string()).with_column(Some("text".to_string())), + ))) + .unwrap() + .prefilter(false) + .filter("category='geography'") + .unwrap() + .filter_query(QueryFilter::Vector(vector_query.clone())) + .unwrap() + .try_into_stream() + .await + .unwrap(); + check_results( + stream, + schema.try_with_column(SCORE_FIELD.clone()).unwrap().into(), + &[300, 255], + ) + .await; +} + +#[tokio::test] +async fn test_fts_filter_vector_search() { + let dataset = prepare_query_filter_dataset().await; + let schema: ArrowSchema = dataset.schema().into(); + + // Case 1: search with prefilter=true, query_filter=match("text") + let query_vector = Float32Array::from(vec![300f32, 300f32, 
300f32, 300f32]); + let mut scanner = dataset.scan(); + let stream = scanner + .nearest("vector", &query_vector, 5) + .unwrap() + .prefilter(true) + .filter_query(QueryFilter::Fts(FullTextSearchQuery::new( + "text".to_string(), + ))) + .unwrap() + .try_into_stream() + .await + .unwrap(); + check_results( + stream, + schema + .try_with_column(ArrowField::new(DIST_COL, DataType::Float32, true)) + .unwrap() + .into(), + &[300, 299, 255, 254, 253], + ) + .await; + + // Case 2: search with prefilter=true, query_filter=match("text"), filter="category='geography'" + let mut scanner = dataset.scan(); + let stream = scanner + .nearest("vector", &query_vector, 5) + .unwrap() + .prefilter(true) + .filter("category='geography'") + .unwrap() + .filter_query(QueryFilter::Fts(FullTextSearchQuery::new( + "text".to_string(), + ))) + .unwrap() + .try_into_stream() + .await + .unwrap(); + check_results( + stream, + schema + .try_with_column(ArrowField::new(DIST_COL, DataType::Float32, true)) + .unwrap() + .into(), + &[300, 255, 252, 249, 246], + ) + .await; + + // Case 3: search with prefilter=false, query_filter=match("text") + let mut scanner = dataset.scan(); + let stream = scanner + .nearest("vector", &query_vector, 5) + .unwrap() + .prefilter(false) + .filter_query(QueryFilter::Fts(FullTextSearchQuery::new( + "text".to_string(), + ))) + .unwrap() + .try_into_stream() + .await + .unwrap(); + check_results( + stream, + schema + .try_with_column(ArrowField::new(DIST_COL, DataType::Float32, true)) + .unwrap() + .into(), + &[300, 299], + ) + .await; + + // Case 4: search with prefilter=false, query_filter=match("text"), filter="category='geography'" + let mut scanner = dataset.scan(); + let stream = scanner + .nearest("vector", &query_vector, 5) + .unwrap() + .prefilter(false) + .filter("category='geography'") + .unwrap() + .filter_query(QueryFilter::Fts(FullTextSearchQuery::new( + "text".to_string(), + ))) + .unwrap() + .try_into_stream() + .await + .unwrap(); + check_results( + 
        stream,
+        schema
+            .try_with_column(ArrowField::new(DIST_COL, DataType::Float32, true))
+            .unwrap()
+            .into(),
+        &[300],
+    )
+    .await;
+
+    // Case 5: search with prefilter=false, query_filter=phrase("text")
+    let mut scanner = dataset.scan();
+    let stream = scanner
+        .nearest("vector", &query_vector, 5)
+        .unwrap()
+        .prefilter(false)
+        .filter_query(QueryFilter::Fts(FullTextSearchQuery::new_query(
+            FtsQuery::Phrase(
+                PhraseQuery::new("text".to_string()).with_column(Some("text".to_string())),
+            ),
+        )))
+        .unwrap()
+        .try_into_stream()
+        .await
+        .unwrap();
+    check_results(
+        stream,
+        schema
+            .try_with_column(ArrowField::new(DIST_COL, DataType::Float32, true))
+            .unwrap()
+            .into(),
+        &[299, 300],
+    )
+    .await;
+
+    // Case 6: search with prefilter=false, query_filter=phrase("text"), filter="category='geography'"
+    let mut scanner = dataset.scan();
+    let stream = scanner
+        .nearest("vector", &query_vector, 5)
+        .unwrap()
+        .prefilter(false)
+        .filter("category='geography'")
+        .unwrap()
+        .filter_query(QueryFilter::Fts(FullTextSearchQuery::new_query(
+            FtsQuery::Phrase(
+                PhraseQuery::new("text".to_string()).with_column(Some("text".to_string())),
+            ),
+        )))
+        .unwrap()
+        .try_into_stream()
+        .await
+        .unwrap();
+    check_results(
+        stream,
+        schema
+            .try_with_column(ArrowField::new(DIST_COL, DataType::Float32, true))
+            .unwrap()
+            .into(),
+        &[300],
+    )
+    .await;
+}
+
+async fn prepare_query_filter_dataset() -> Dataset {
+    let schema = Arc::new(ArrowSchema::new(vec![
+        ArrowField::new("id", DataType::Int32, false),
+        ArrowField::new(
+            "vector",
+            DataType::FixedSizeList(
+                Arc::new(ArrowField::new("item", DataType::Float32, true)),
+                4,
+            ),
+            true,
+        ),
+        ArrowField::new("text", DataType::Utf8, false),
+        ArrowField::new("category", DataType::Utf8, false),
+    ]));
+
+    // Prepare dataset
+    let mut vectors = vec![];
+    for i in 1..=300 {
+        vectors.extend(vec![i as f32; 4]);
+    }
+
+    // ids 256..=298 have "noop" text, all the others have "text"
+    let mut text = vec![];
+    for i in 1..=255 {
+        text.push(format!("text {}",
i)); + } + for i in 256..=298 { + text.push(format!("noop {}", i)); + } + text.extend(vec!["text 299".to_string(), "text 300".to_string()]); + + let mut category = vec![]; + for i in 1..=300 { + if i % 3 == 1 { + category.push("literature".to_string()); + } else if i % 3 == 2 { + category.push("science".to_string()); + } else { + category.push("geography".to_string()); + } + } + + let vectors = Float32Array::from(vectors); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from_iter_values(1..=300)), + Arc::new(FixedSizeListArray::try_new_from_values(vectors, 4).unwrap()), + Arc::new(StringArray::from(text)), + Arc::new(StringArray::from(category)), + ], + ) + .unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap(); + + // Create index + let params = VectorIndexParams::with_ivf_pq_params( + MetricType::L2, + IvfBuildParams::new(2), + PQBuildParams::new(4, 8), + ); + dataset + .create_index(&["vector"], IndexType::Vector, None, ¶ms, true) + .await + .unwrap(); + + dataset + .create_index( + &["text"], + IndexType::Inverted, + None, + &InvertedIndexParams::default().with_position(true), + true, + ) + .await + .unwrap(); + + dataset +} + +async fn check_results( + stream: DatasetRecordBatchStream, + expected_schema: SchemaRef, + expected_ids: &[i32], +) { + let results = stream.try_collect::>().await.unwrap(); + let batch = concat_batches(&results[0].schema(), &results).unwrap(); + assert_eq!(batch.schema(), expected_schema); + + let ids = batch + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.values(), expected_ids); +} diff --git a/rust/lance/src/dataset/tests/mod.rs b/rust/lance/src/dataset/tests/mod.rs index a13c159e0ba..a1197dcc198 100644 --- a/rust/lance/src/dataset/tests/mod.rs +++ b/rust/lance/src/dataset/tests/mod.rs @@ -8,6 +8,7 @@ mod dataset_index; mod dataset_io; mod 
dataset_merge_update; mod dataset_migrations; +mod dataset_scanner; mod dataset_schema_evolution; mod dataset_transactions; mod dataset_versioning; diff --git a/rust/lance/src/dataset/write/delete.rs b/rust/lance/src/dataset/write/delete.rs index eafa4ed2232..66e1da10d4b 100644 --- a/rust/lance/src/dataset/write/delete.rs +++ b/rust/lance/src/dataset/write/delete.rs @@ -167,7 +167,7 @@ impl RetryExecutor for DeleteJob { // Check if the filter optimized to true (delete everything) or false (delete nothing) let (updated_fragments, deleted_fragment_ids, affected_rows) = if let Some(filter_expr) = - scanner.get_filter()? + scanner.get_expr_filter()? { if matches!( filter_expr, diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index b5ff3a2894f..6fc3ce9e3d4 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use arrow::array::AsArray; use arrow::datatypes::{Float32Type, UInt64Type}; use arrow_array::{Float32Array, RecordBatch, UInt64Array}; +use arrow_schema::SchemaRef; use datafusion::common::Statistics; use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::execution::SendableRecordBatchStream; @@ -364,9 +365,10 @@ impl FlatMatchQueryExec { query: MatchQuery, params: FtsSearchParams, unindexed_input: Arc, + schema: SchemaRef, ) -> Self { let properties = PlanProperties::new( - EquivalenceProperties::new(FTS_SCHEMA.clone()), + EquivalenceProperties::new(schema), Partitioning::RoundRobinBatch(1), EmissionType::Incremental, Boundedness::Bounded, @@ -433,6 +435,7 @@ impl ExecutionPlan for FlatMatchQueryExec { let unindexed_input = document_input(self.unindexed_input.execute(partition, context)?, &column)?; + let schema = self.schema(); let stream = stream::once(async move { let index_meta = ds .load_scalar_index(IndexCriteria::default().for_column(&column).supports_fts()) @@ -453,6 +456,7 @@ impl ExecutionPlan for FlatMatchQueryExec { column, query.terms, 
&inverted_idx, + schema, )) }) .try_flatten_unordered(None) @@ -1130,6 +1134,7 @@ pub mod tests { use lance_index::scalar::inverted::query::{ BoostQuery, FtsQuery, FtsSearchParams, MatchQuery, PhraseQuery, }; + use lance_index::scalar::inverted::FTS_SCHEMA; use crate::{io::exec::PreFilterSource, utils::test::NoContextTestFixture}; @@ -1165,6 +1170,7 @@ pub mod tests { MatchQuery::new("blah".to_string()).with_column(Some("text".to_string())), FtsSearchParams::default(), flat_input, + FTS_SCHEMA.clone(), ); flat_match_query .execute(0, Arc::new(TaskContext::default()))