diff --git a/protos/table.proto b/protos/table.proto index 98ccb65d34f..5903fc19c0f 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -257,6 +257,15 @@ message IndexMetadata { // // Indices should avoid putting large amounts of information in this field, as it will // bloat the manifest. + // + // Indexes are plugins, and so the format of the details message is flexible and not fully + // defined by the table format. However, there are some conventions that should be followed: + // + // - When Lance APIs refer to indexes they will use the type URL of the index details as the + // identifier for the index type. If a user provides a simple string identifier like + // "btree" then it will be converted to "/lance.table.BTreeIndexDetails" + // - Type URL comparisons are case-insensitive. Therefore an index must have a unique type + // URL ignoring case. google.protobuf.Any index_details = 6; // The minimum lance version that this index is compatible with. diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index d9a96a86c85..8700bd268dd 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -75,6 +75,7 @@ from .commit import CommitLock from .io import StorageOptionsProvider + from .lance.indices import IndexDescription from .progress import FragmentWriteProgress from .types import ReaderLike @@ -645,8 +646,27 @@ def checkout_latest(self): self._ds.checkout_latest() def list_indices(self) -> List[Index]: + """ + Returns index information for all indices in the dataset. + + This method is deprecated as it requires loading the statistics for each index + which can be a very expensive operation. Instead use describe_indices() to + list index information and index_statistics() to get the statistics for + individual indexes of interest. + """ + # TODO: https://github.com/lancedb/lance/issues/5237 deprecate this method + # warnings.warn( + # "The 'list_indices' method is deprecated. 
It may be removed in a future" + # "version. Use describe_indices() instead.", + # DeprecationWarning, + # ) + return self._ds.load_indices() + def describe_indices(self) -> List[IndexDescription]: + """Returns index information for all indices in the dataset.""" + return self._ds.describe_indices() + def index_statistics(self, index_name: str) -> Dict[str, Any]: warnings.warn( "LanceDataset.index_statistics() is deprecated, " diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 2fd57e307e3..74d9dcb339e 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -60,6 +60,7 @@ from .fragment import ( from .fragment import ( RowIdMeta as RowIdMeta, ) +from .indices import IndexDescription as IndexDescription from .optimize import ( Compaction as Compaction, ) @@ -215,6 +216,7 @@ class _Dataset: def index_statistics(self, index_name: str) -> str: ... def serialized_manifest(self) -> bytes: ... def load_indices(self) -> List[Index]: ... + def describe_indices(self) -> List[IndexDescription]: ... def scanner( self, columns: Optional[List[str]] = None, diff --git a/python/python/lance/lance/indices/__init__.pyi b/python/python/lance/lance/indices/__init__.pyi index a12783117f5..feed1f934fe 100644 --- a/python/python/lance/lance/indices/__init__.pyi +++ b/python/python/lance/lance/indices/__init__.pyi @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datetime import datetime +from typing import Optional + import pyarrow as pa class IndexConfig: @@ -47,3 +50,24 @@ def transform_vectors( pq_codebook: pa.Array, dst_uri: str, ): ... + +class IndexSegmentDescription: + uuid: str + dataset_version_at_last_update: int + fragment_ids: set[int] + index_version: int + created_at: Optional[datetime] + + def __repr__(self) -> str: ... 
+ +class IndexDescription: + name: str + type_url: str + index_type: str + num_rows_indexed: int + fields: list[int] + field_names: list[str] + segments: list[IndexSegmentDescription] + details: dict + + def __repr__(self) -> str: ... diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 32c787ad9f4..0c52c37c3ee 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -4006,3 +4006,124 @@ def test_json_inverted_match_query(tmp_path): full_text_query=MatchQuery("Author,str,tolkien", "json_col") ) assert results.num_rows == 1 + + +def test_describe_indices(tmp_path): + data = pa.table( + { + "id": range(100), + "text": [f"document {i} about lance database" for i in range(100)], + "bitmap": range(100), + "bloomfilter": range(100), + "btree": range(100), + "json": pa.array( + [json.dumps({"key": f"value_{i}"}) for i in range(100)], pa.json_() + ), + "ngram": [f"document {i}" for i in range(100)], + "zonemap": range(100), + } + ) + ds = lance.write_dataset(data, tmp_path) + ds.create_scalar_index("text", index_type="INVERTED") + indices = ds.describe_indices() + assert len(indices) == 1 + + assert indices[0].name == "text_idx" + assert indices[0].type_url == "/lance.table.InvertedIndexDetails" + assert indices[0].index_type == "Inverted" + assert indices[0].num_rows_indexed == 100 + assert indices[0].fields == [1] + assert indices[0].field_names == ["text"] + assert len(indices[0].segments) == 1 + assert indices[0].segments[0].uuid is not None + assert indices[0].segments[0].fragment_ids == {0} + assert indices[0].segments[0].dataset_version_at_last_update == 1 + assert indices[0].segments[0].index_version == 1 + assert indices[0].segments[0].created_at is not None + assert isinstance(indices[0].segments[0].created_at, datetime) + + details = indices[0].details + assert details is not None and len(details) > 0 + assert details["lance_tokenizer"] is None + assert 
details["base_tokenizer"] == "simple" + assert details["language"] == "English" + assert not details["with_position"] + assert details["max_token_length"] == 40 + assert details["lower_case"] + assert details["stem"] + assert details["remove_stop_words"] + assert details["custom_stop_words"] is None + assert details["ascii_folding"] + assert details["min_ngram_length"] == 3 + assert details["max_ngram_length"] == 3 + assert not details["prefix_only"] + + ds.create_scalar_index("bitmap", index_type="BITMAP") + ds.create_scalar_index("bloomfilter", index_type="BLOOMFILTER") + ds.create_scalar_index("btree", index_type="BTREE") + ds.create_scalar_index( + "json", + IndexConfig( + index_type="json", parameters={"target_index_type": "btree", "path": "x"} + ), + ) + ds.create_scalar_index("ngram", index_type="NGRAM") + ds.create_scalar_index("zonemap", index_type="ZONEMAP") + + indices = ds.describe_indices() + # Skip text index since it is already asserted above + indices = [index for index in indices if index.name != "text_idx"] + indices.sort(key=lambda x: x.name) + + names = [ + "bitmap_idx", + "bloomfilter_idx", + "btree_idx", + "json_idx", + "ngram_idx", + "zonemap_idx", + ] + types_urls = [ + "/lance.table.BitmapIndexDetails", + "/lance.index.pb.BloomFilterIndexDetails", + "/lance.table.BTreeIndexDetails", + "/lance.index.pb.JsonIndexDetails", + "/lance.table.NGramIndexDetails", + "/lance.table.ZoneMapIndexDetails", + ] + index_types = [ + "Bitmap", + "BloomFilter", + "BTree", + "Json", + "NGram", + "ZoneMap", + ] + details = [ + "{}", + "{}", + "{}", + '{"path":"x","target_details":{}}', + "{}", + "{}", + ] + + for i in range(len(indices)): + assert indices[i].name == names[i] + assert indices[i].type_url == types_urls[i] + assert indices[i].index_type == index_types[i] + assert indices[i].num_rows_indexed == 100 + assert indices[i].fields == [i + 2] + assert indices[i].field_names == [data.column_names[i + 2]] + assert len(indices[i].segments) == 1 + assert 
indices[i].segments[0].fragment_ids == {0} + assert indices[i].segments[0].dataset_version_at_last_update == i + 2 + assert indices[i].segments[0].index_version == 0 + assert indices[i].segments[0].created_at is not None + assert isinstance(indices[i].segments[0].created_at, datetime) + assert indices[i].details == json.loads(details[i]) + + ds.delete("id < 50") + indices = ds.describe_indices() + for index in indices: + assert index.num_rows_indexed == 50 diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index eabe1cfd4bc..9cb77b85464 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -12,7 +12,7 @@ import pyarrow as pa import pyarrow.compute as pc import pytest -from lance import LanceFragment +from lance import LanceDataset, LanceFragment from lance.dataset import VectorIndexReader from lance.indices import IndexFileVersion from lance.util import validate_vector_index # noqa: E402 @@ -1391,6 +1391,23 @@ def test_load_indices(dataset): assert len(indices) == 1 +def test_describe_vector_index(indexed_dataset: LanceDataset): + info = indexed_dataset.describe_indices()[0] + + assert info.name == "vector_idx" + assert info.type_url == "/lance.table.VectorIndexDetails" + # This is currently Unknown because vector indices are not yet handled by plugins + assert info.index_type == "Unknown" + assert info.num_rows_indexed == 1000 + assert info.fields == [0] + assert info.field_names == ["vector"] + assert len(info.segments) == 1 + assert info.segments[0].fragment_ids == {0} + assert info.segments[0].dataset_version_at_last_update == 1 + assert info.segments[0].index_version == 1 + assert info.segments[0].created_at is not None + + def test_optimize_indices(indexed_dataset): data = create_table() indexed_dataset = lance.write_dataset(data, indexed_dataset.uri, mode="append") diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 91108192c47..6f97ddd98af 100644 --- 
a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -86,7 +86,7 @@ use lance_table::io::commit::CommitHandler; use crate::error::PythonErrorExt; use crate::file::object_store_from_uri_or_path; use crate::fragment::FileFragment; -use crate::indices::PyIndexConfig; +use crate::indices::{PyIndexConfig, PyIndexDescription}; use crate::rt; use crate::scanner::ScanStatistics; use crate::schema::{logical_schema_from_lance, LanceSchema}; @@ -2599,6 +2599,18 @@ impl Dataset { let builder = self.ds.sql(&sql); Ok(SqlQueryBuilder { builder }) } + + #[pyo3(signature=())] + fn describe_indices(&self, py: Python<'_>) -> PyResult> { + let new_self = self.ds.as_ref().clone(); + let indices = rt() + .block_on(Some(py), new_self.describe_indices(None))? + .infer_error()?; + Ok(indices + .into_iter() + .map(|desc| PyIndexDescription::new(desc.as_ref(), self.ds.as_ref())) + .collect()) + } } #[pyclass(name = "SqlQuery", module = "_lib", subclass)] diff --git a/python/src/indices.rs b/python/src/indices.rs index 216e6b65196..068d3caec8a 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -1,9 +1,13 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::collections::HashSet; + use arrow::pyarrow::{PyArrowType, ToPyArrow}; use arrow_array::{Array, FixedSizeListArray}; use arrow_data::ArrayData; +use chrono::{DateTime, Utc}; +use lance::dataset::Dataset as LanceDataset; use lance::index::vector::ivf::builder::write_vector_storage; use lance::io::ObjectStore; use lance_index::vector::ivf::shuffler::{shuffle_vectors, IvfShuffler}; @@ -25,11 +29,12 @@ use pyo3::{ use lance::index::DatasetIndexInternalExt; use crate::fragment::FileFragment; +use crate::utils::PyJson; use crate::{ dataset::Dataset, error::PythonErrorExt, file::object_store_from_uri_or_path_no_options, rt, }; use lance::index::vector::ivf::write_ivf_pq_file_from_existing_index; -use lance_index::DatasetIndexExt; +use lance_index::{DatasetIndexExt, 
IndexDescription}; use uuid::Uuid; #[pyclass(name = "IndexConfig", module = "lance.indices", get_all)] @@ -463,6 +468,102 @@ pub fn load_shuffled_vectors( )? } +#[pyclass(name = "IndexSegmentDescription", module = "lance.indices", get_all)] +#[derive(Clone)] +pub struct PyIndexSegmentDescription { + /// The UUID of the index segment + pub uuid: String, + /// The dataset version at which the index segment was last updated + pub dataset_version_at_last_update: u64, + /// The fragment ids that are covered by the index segment + pub fragment_ids: HashSet, + /// The version of the index + pub index_version: i32, + /// The timestamp when the index segment was created + pub created_at: Option>, +} + +impl PyIndexSegmentDescription { + pub fn __repr__(&self) -> String { + format!("IndexSegmentDescription(uuid={}, dataset_version_at_last_update={}, fragment_ids={:?}, index_version={}, created_at={:?})", self.uuid, self.dataset_version_at_last_update, self.fragment_ids, self.index_version, self.created_at) + } +} + +#[pyclass(name = "IndexDescription", module = "lance.indices", get_all)] +pub struct PyIndexDescription { + /// The name of the index + pub name: String, + /// The full type URL of the index + pub type_url: String, + /// The short type of the index (may not be unique) + pub index_type: String, + /// The ids of the fields that the index is built on + pub fields: Vec, + /// The names of the fields that the index is built on + pub field_names: Vec, + /// The number of rows indexed by the index + pub num_rows_indexed: u64, + /// The details of the index + pub details: PyJson, + /// The segments of the index + pub segments: Vec, +} + +impl PyIndexDescription { + pub fn new(index: &dyn IndexDescription, dataset: &LanceDataset) -> Self { + let field_names = index + .field_ids() + .iter() + .map(|field| { + dataset + .schema() + .field_by_id(*field as i32) + .map(|f| f.name.clone()) + .unwrap_or("".to_string()) + }) + .collect(); + + let segments = index + .metadata() + 
.iter() + .map(|segment| { + let fragment_ids = segment + .fragment_bitmap + .as_ref() + .map(|bitmap| bitmap.iter().collect::>()) + .unwrap_or_default(); + PyIndexSegmentDescription { + uuid: segment.uuid.to_string(), + dataset_version_at_last_update: segment.dataset_version, + fragment_ids, + index_version: segment.index_version, + created_at: segment.created_at, + } + }) + .collect(); + + let details = index.details().unwrap_or_else(|_| "{}".to_string()); + + Self { + name: index.name().to_string(), + fields: index.field_ids().to_vec(), + field_names, + index_type: index.index_type().to_string(), + segments, + type_url: index.type_url().to_string(), + num_rows_indexed: index.rows_indexed(), + details: PyJson(details), + } + } +} + +#[pymethods] +impl PyIndexDescription { + pub fn __repr__(&self) -> String { + format!("IndexDescription(name={}, type_url={}, num_rows_indexed={}, fields={:?}, field_names={:?}, num_segments={})", self.name, self.type_url, self.num_rows_indexed, self.fields, self.field_names, self.segments.len()) + } +} + pub fn register_indices(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { let indices = PyModule::new(py, "indices")?; indices.add_wrapped(wrap_pyfunction!(train_ivf_model))?; @@ -472,6 +573,8 @@ pub fn register_indices(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { indices.add_wrapped(wrap_pyfunction!(load_shuffled_vectors))?; indices.add_class::()?; indices.add_class::()?; + indices.add_class::()?; + indices.add_class::()?; indices.add_wrapped(wrap_pyfunction!(get_ivf_model))?; m.add_submodule(&indices)?; Ok(()) diff --git a/python/src/utils.rs b/python/src/utils.rs index 948811b54a0..ff464b1102d 100644 --- a/python/src/utils.rs +++ b/python/src/utils.rs @@ -43,6 +43,22 @@ use pyo3::{ use crate::file::object_store_from_uri_or_path; use crate::rt; +/// A wrapper around a JSON string that converts to a Python object +/// using json.loads when marshalling to Python. 
+#[derive(Debug, Clone)] +pub struct PyJson(pub String); + +impl<'py> IntoPyObject<'py> for PyJson { + type Target = PyAny; + type Output = Bound<'py, Self::Target>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> PyResult { + let json_module = py.import("json")?; + json_module.call_method1("loads", (self.0,)) + } +} + #[pyclass(name = "_KMeans")] pub struct KMeans { /// Number of clusters diff --git a/rust/lance-index/src/lib.rs b/rust/lance-index/src/lib.rs index 4c08b7e236c..d99b2c36de9 100644 --- a/rust/lance-index/src/lib.rs +++ b/rust/lance-index/src/lib.rs @@ -26,6 +26,7 @@ pub mod mem_wal; pub mod metrics; pub mod optimize; pub mod prefilter; +pub mod registry; pub mod scalar; pub mod traits; pub mod vector; diff --git a/rust/lance-index/src/registry.rs b/rust/lance-index/src/registry.rs new file mode 100644 index 00000000000..f087db9158f --- /dev/null +++ b/rust/lance-index/src/registry.rs @@ -0,0 +1,92 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors +use std::{collections::HashMap, sync::Arc}; + +use lance_core::{Error, Result}; +use snafu::location; + +use crate::{ + pb, pbold, + scalar::{ + bitmap::BitmapIndexPlugin, bloomfilter::BloomFilterIndexPlugin, btree::BTreeIndexPlugin, + inverted::InvertedIndexPlugin, json::JsonIndexPlugin, label_list::LabelListIndexPlugin, + ngram::NGramIndexPlugin, registry::ScalarIndexPlugin, zonemap::ZoneMapIndexPlugin, + }, +}; + +/// A registry of index plugins +pub struct IndexPluginRegistry { + plugins: HashMap>, +} + +impl IndexPluginRegistry { + fn get_plugin_name_from_details_name(&self, details_name: &str) -> String { + let details_name = details_name.to_lowercase(); + if details_name.ends_with("indexdetails") { + details_name.replace("indexdetails", "") + } else { + details_name + } + } + + /// Adds a plugin to the registry, using the name of the details message to determine + /// the plugin name. 
+ /// + /// The plugin name will be the lowercased name of the details message with any trailing + /// "indexdetails" removed. + /// + /// For example, if the details message is `BTreeIndexDetails`, the plugin name will be + /// `btree`. + pub fn add_plugin< + DetailsType: prost::Message + prost::Name, + PluginType: ScalarIndexPlugin + std::default::Default + 'static, + >( + &mut self, + ) { + let plugin_name = self.get_plugin_name_from_details_name(DetailsType::NAME); + self.plugins + .insert(plugin_name, Box::new(PluginType::default())); + } + + /// Create a registry with the default plugins + pub fn with_default_plugins() -> Arc { + let mut registry = Self { + plugins: HashMap::new(), + }; + registry.add_plugin::(); + registry.add_plugin::(); + registry.add_plugin::(); + registry.add_plugin::(); + registry.add_plugin::(); + registry.add_plugin::(); + registry.add_plugin::(); + registry.add_plugin::(); + + let registry = Arc::new(registry); + for plugin in registry.plugins.values() { + plugin.attach_registry(registry.clone()); + } + + registry + } + + /// Get an index plugin suitable for training an index with the given parameters + pub fn get_plugin_by_name(&self, name: &str) -> Result<&dyn ScalarIndexPlugin> { + self.plugins + .get(name) + .map(|plugin| plugin.as_ref()) + .ok_or_else(|| Error::InvalidInput { + source: format!("No scalar index plugin found for name {}", name).into(), + location: location!(), + }) + } + + pub fn get_plugin_by_details( + &self, + details: &prost_types::Any, + ) -> Result<&dyn ScalarIndexPlugin> { + let details_name = details.type_url.split('.').next_back().unwrap(); + let plugin_name = self.get_plugin_name_from_details_name(details_name); + self.get_plugin_by_name(&plugin_name) + } +} diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 9f3779668f1..345c1687142 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -695,6 +695,10 @@ impl 
BitmapIndexPlugin { #[async_trait] impl ScalarIndexPlugin for BitmapIndexPlugin { + fn name(&self) -> &str { + "Bitmap" + } + fn new_training_request( &self, _params: &str, diff --git a/rust/lance-index/src/scalar/bloomfilter.rs b/rust/lance-index/src/scalar/bloomfilter.rs index 7fef76136e2..6047029368a 100644 --- a/rust/lance-index/src/scalar/bloomfilter.rs +++ b/rust/lance-index/src/scalar/bloomfilter.rs @@ -1146,6 +1146,10 @@ impl BloomFilterIndexPlugin { #[async_trait] impl ScalarIndexPlugin for BloomFilterIndexPlugin { + fn name(&self) -> &str { + "BloomFilter" + } + fn new_training_request( &self, params: &str, diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 8a4ea14c99e..080f0cb4a34 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -1915,6 +1915,10 @@ pub struct BTreeIndexPlugin; #[async_trait] impl ScalarIndexPlugin for BTreeIndexPlugin { + fn name(&self) -> &str { + "BTree" + } + fn new_training_request( &self, params: &str, diff --git a/rust/lance-index/src/scalar/inverted.rs b/rust/lance-index/src/scalar/inverted.rs index fb5c3cd7a68..e8644600513 100644 --- a/rust/lance-index/src/scalar/inverted.rs +++ b/rust/lance-index/src/scalar/inverted.rs @@ -105,6 +105,10 @@ impl TrainingRequest for InvertedIndexTrainingRequest { #[async_trait] impl ScalarIndexPlugin for InvertedIndexPlugin { + fn name(&self) -> &str { + "Inverted" + } + fn new_training_request( &self, params: &str, @@ -196,4 +200,10 @@ impl ScalarIndexPlugin for InvertedIndexPlugin { as Arc, ) } + + fn details_as_json(&self, details: &prost_types::Any) -> Result { + let index_details = details.to_msg::()?; + let index_params = InvertedIndexParams::try_from(&index_details)?; + Ok(serde_json::json!(&index_params)) + } } diff --git a/rust/lance-index/src/scalar/inverted/tokenizer.rs b/rust/lance-index/src/scalar/inverted/tokenizer.rs index 324ae9d7b6a..531d18e1b9e 100644 --- 
a/rust/lance-index/src/scalar/inverted/tokenizer.rs +++ b/rust/lance-index/src/scalar/inverted/tokenizer.rs @@ -112,6 +112,33 @@ impl TryFrom<&InvertedIndexParams> for pbold::InvertedIndexDetails { } } +impl TryFrom<&pbold::InvertedIndexDetails> for InvertedIndexParams { + type Error = Error; + + fn try_from(details: &pbold::InvertedIndexDetails) -> Result { + let defaults = Self::default(); + Ok(Self { + lance_tokenizer: defaults.lance_tokenizer, + base_tokenizer: details + .base_tokenizer + .as_ref() + .cloned() + .unwrap_or(defaults.base_tokenizer), + language: serde_json::from_str(details.language.as_str())?, + with_position: details.with_position, + max_token_length: details.max_token_length.map(|l| l as usize), + lower_case: details.lower_case, + stem: details.stem, + remove_stop_words: details.remove_stop_words, + custom_stop_words: defaults.custom_stop_words, + ascii_folding: details.ascii_folding, + min_ngram_length: details.min_ngram_length, + max_ngram_length: details.max_ngram_length, + prefix_only: details.prefix_only, + }) + } +} + fn bool_true() -> bool { true } diff --git a/rust/lance-index/src/scalar/json.rs b/rust/lance-index/src/scalar/json.rs index c400aec036b..8edcde761b2 100644 --- a/rust/lance-index/src/scalar/json.rs +++ b/rust/lance-index/src/scalar/json.rs @@ -35,12 +35,10 @@ use lance_core::{cache::LanceCache, error::LanceOptionExt, Error, Result, ROW_ID use crate::{ frag_reuse::FragReuseIndex, metrics::MetricsCollector, + registry::IndexPluginRegistry, scalar::{ expression::{IndexedExpression, ScalarIndexExpr, ScalarIndexSearch, ScalarQueryParser}, - registry::{ - ScalarIndexPlugin, ScalarIndexPluginRegistry, TrainingCriteria, TrainingRequest, - VALUE_COLUMN_NAME, - }, + registry::{ScalarIndexPlugin, TrainingCriteria, TrainingRequest, VALUE_COLUMN_NAME}, AnyQuery, CreatedIndex, IndexStore, ScalarIndex, SearchResult, UpdateCriteria, }, Index, IndexType, @@ -373,7 +371,7 @@ impl TrainingRequest for JsonTrainingRequest { /// Plugin 
implementation for a [`JsonIndex`] #[derive(Default)] pub struct JsonIndexPlugin { - registry: Mutex>>, + registry: Mutex>>, } impl std::fmt::Debug for JsonIndexPlugin { @@ -383,7 +381,7 @@ impl std::fmt::Debug for JsonIndexPlugin { } impl JsonIndexPlugin { - fn registry(&self) -> Result> { + fn registry(&self) -> Result> { Ok(self.registry.lock().unwrap().as_ref().expect_ok()?.clone()) } @@ -709,6 +707,10 @@ impl JsonIndexPlugin { #[async_trait] impl ScalarIndexPlugin for JsonIndexPlugin { + fn name(&self) -> &str { + "Json" + } + fn new_training_request( &self, params: &str, @@ -740,7 +742,7 @@ impl ScalarIndexPlugin for JsonIndexPlugin { true } - fn attach_registry(&self, registry: Arc) { + fn attach_registry(&self, registry: Arc) { let mut reg_ref = self.registry.lock().unwrap(); *reg_ref = Some(registry); } @@ -832,6 +834,18 @@ impl ScalarIndexPlugin for JsonIndexPlugin { .await?; Ok(Arc::new(JsonIndex::new(target_index, json_details.path))) } + + fn details_as_json(&self, details: &prost_types::Any) -> Result { + let registry = self.registry().unwrap(); + let json_details = crate::pb::JsonIndexDetails::decode(details.value.as_slice())?; + let target_details = json_details.target_details.as_ref().expect_ok()?; + let target_plugin = registry.get_plugin_by_details(target_details).unwrap(); + let target_details_json = target_plugin.details_as_json(target_details)?; + Ok(serde_json::json!({ + "path": json_details.path, + "target_details": target_details_json, + })) + } } #[cfg(test)] diff --git a/rust/lance-index/src/scalar/label_list.rs b/rust/lance-index/src/scalar/label_list.rs index b22a12f8e4a..cb96961849f 100644 --- a/rust/lance-index/src/scalar/label_list.rs +++ b/rust/lance-index/src/scalar/label_list.rs @@ -354,6 +354,10 @@ pub struct LabelListIndexPlugin; #[async_trait] impl ScalarIndexPlugin for LabelListIndexPlugin { + fn name(&self) -> &str { + "LabelList" + } + fn new_training_request( &self, _params: &str, diff --git 
a/rust/lance-index/src/scalar/ngram.rs b/rust/lance-index/src/scalar/ngram.rs index 00a2f7da5d9..15dfec35a62 100644 --- a/rust/lance-index/src/scalar/ngram.rs +++ b/rust/lance-index/src/scalar/ngram.rs @@ -1250,6 +1250,10 @@ impl NGramIndexPlugin { #[async_trait] impl ScalarIndexPlugin for NGramIndexPlugin { + fn name(&self) -> &str { + "NGram" + } + fn new_training_request( &self, _params: &str, diff --git a/rust/lance-index/src/scalar/registry.rs b/rust/lance-index/src/scalar/registry.rs index a36e221f6a0..c53d46f0546 100644 --- a/rust/lance-index/src/scalar/registry.rs +++ b/rust/lance-index/src/scalar/registry.rs @@ -1,24 +1,17 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use arrow_schema::Field; use async_trait::async_trait; use datafusion::execution::SendableRecordBatchStream; -use lance_core::{cache::LanceCache, Error, Result}; -use snafu::location; +use lance_core::{cache::LanceCache, Result}; -use crate::pb; -use crate::pbold; +use crate::registry::IndexPluginRegistry; use crate::{ frag_reuse::FragReuseIndex, - scalar::{ - bitmap::BitmapIndexPlugin, bloomfilter::BloomFilterIndexPlugin, btree::BTreeIndexPlugin, - expression::ScalarQueryParser, inverted::InvertedIndexPlugin, json::JsonIndexPlugin, - label_list::LabelListIndexPlugin, ngram::NGramIndexPlugin, zonemap::ZoneMapIndexPlugin, - CreatedIndex, IndexStore, ScalarIndex, - }, + scalar::{expression::ScalarQueryParser, CreatedIndex, IndexStore, ScalarIndex}, }; pub const VALUE_COLUMN_NAME: &str = "value"; @@ -123,6 +116,16 @@ pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug { fragment_ids: Option>, ) -> Result; + /// A short name for the index + /// + /// This is a friendly name for display purposes and also can be used as an alias for + /// the index type URL. If multiple plugins have the same name, then the first one + /// found will be used. 
+ /// + /// By convention this is MixedCase with no spaces. When used as an alias, it will be + /// compared case-insensitively. + fn name(&self) -> &str; + /// Returns true if the index returns an exact answer (e.g. not AtMost) fn provides_exact_answer(&self) -> bool; @@ -154,82 +157,15 @@ pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug { ) -> Result>; /// Optional hook that plugins can use if they need to be aware of the registry - fn attach_registry(&self, _registry: Arc) {} -} - -/// A registry of scalar index plugins -pub struct ScalarIndexPluginRegistry { - plugins: HashMap>, -} + fn attach_registry(&self, _registry: Arc) {} -impl ScalarIndexPluginRegistry { - fn get_plugin_name_from_details_name(&self, details_name: &str) -> String { - let details_name = details_name.to_lowercase(); - if details_name.ends_with("indexdetails") { - details_name.replace("indexdetails", "") - } else { - details_name - } - } - - /// Adds a plugin to the registry, using the name of the details message to determine - /// the plugin name. - /// - /// The plugin name will be the lowercased name of the details message with any trailing - /// "indexdetails" removed. + /// Returns a JSON string representation of the provided index details /// - /// For example, if the details message is `BTreeIndexDetails`, the plugin name will be - /// `btree`. 
- pub fn add_plugin< - DetailsType: prost::Message + prost::Name, - PluginType: ScalarIndexPlugin + std::default::Default + 'static, - >( - &mut self, - ) { - let plugin_name = self.get_plugin_name_from_details_name(DetailsType::NAME); - self.plugins - .insert(plugin_name, Box::new(PluginType::default())); - } - - /// Create a registry with the default plugins - pub fn with_default_plugins() -> Arc { - let mut registry = Self { - plugins: HashMap::new(), - }; - registry.add_plugin::(); - registry.add_plugin::(); - registry.add_plugin::(); - registry.add_plugin::(); - registry.add_plugin::(); - registry.add_plugin::(); - registry.add_plugin::(); - registry.add_plugin::(); - - let registry = Arc::new(registry); - for plugin in registry.plugins.values() { - plugin.attach_registry(registry.clone()); - } - - registry - } - - /// Get an index plugin suitable for training an index with the given parameters - pub fn get_plugin_by_name(&self, name: &str) -> Result<&dyn ScalarIndexPlugin> { - self.plugins - .get(name) - .map(|plugin| plugin.as_ref()) - .ok_or_else(|| Error::InvalidInput { - source: format!("No scalar index plugin found for name {}", name).into(), - location: location!(), - }) - } - - pub fn get_plugin_by_details( - &self, - details: &prost_types::Any, - ) -> Result<&dyn ScalarIndexPlugin> { - let details_name = details.type_url.split('.').next_back().unwrap(); - let plugin_name = self.get_plugin_name_from_details_name(details_name); - self.get_plugin_by_name(&plugin_name) + /// These details will be user-visible and should be considered part of the public + /// API. As a result, efforts should be made to ensure the information is backwards + /// compatible and avoid breaking changes. 
+ fn details_as_json(&self, _details: &prost_types::Any) -> Result { + // Return an empty JSON object as the default implementation + Ok(serde_json::json!({})) } } diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 7b6e6078310..a2d785a1894 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -948,6 +948,10 @@ impl TrainingRequest for ZoneMapIndexTrainingRequest { #[async_trait] impl ScalarIndexPlugin for ZoneMapIndexPlugin { + fn name(&self) -> &str { + "ZoneMap" + } + fn new_training_request( &self, params: &str, diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs index f46c63b118a..bed5d6160ae 100644 --- a/rust/lance-index/src/traits.rs +++ b/rust/lance-index/src/traits.rs @@ -14,7 +14,7 @@ use uuid::Uuid; /// A set of criteria used to filter potential indices to use for a query #[derive(Debug, Default)] -pub struct ScalarIndexCriteria<'a> { +pub struct IndexCriteria<'a> { /// Only consider indices for this column (this also means the index /// maps to a single column) pub for_column: Option<&'a str>, @@ -26,7 +26,7 @@ pub struct ScalarIndexCriteria<'a> { pub must_support_exact_equality: bool, } -impl<'a> ScalarIndexCriteria<'a> { +impl<'a> IndexCriteria<'a> { /// Only consider indices for this column (this also means the index /// maps to a single column) pub fn for_column(mut self, column: &'a str) -> Self { @@ -56,6 +56,68 @@ impl<'a> ScalarIndexCriteria<'a> { } } +#[deprecated(since = "0.39.0", note = "Use IndexCriteria instead")] +pub type ScalarIndexCriteria<'a> = IndexCriteria<'a>; + +/// Additional information about an index +/// +/// Note that a single index might consist of multiple segments. Each segment has its own +/// UUID and collection of files and covers some subset of the data fragments. +/// +/// All segments in an index should have the same index type and index details. 
+pub trait IndexDescription: Send + Sync { + /// Returns the index name + /// + /// This is the user-defined name of the index. It is shared by all segments of the index + /// and is what is used to refer to the index in the API. It is guaranteed to be unique + /// within the dataset. + fn name(&self) -> &str; + + /// Returns the index metadata + /// + /// This is the raw metadata information stored in the manifest. There is one + /// IndexMetadata for each segment of the index. + fn metadata(&self) -> &[IndexMetadata]; + + /// Returns the index type URL + /// + /// This is extracted from the type url of the index details + fn type_url(&self) -> &str; + + /// Returns the index type + /// + /// This is a short string identifier that is friendlier than the type URL but not + /// guaranteed to be unique. + /// + /// This is calculated by the plugin and will be "Unknown" if no plugin could be found + /// for the type URL. + fn index_type(&self) -> &str; + + /// Returns the number of rows indexed by the index, across all segments. + /// + /// This is an approximate count and may include rows that have been + /// deleted. + fn rows_indexed(&self) -> u64; + + /// Returns the ids of the fields that the index is built on. + fn field_ids(&self) -> &[u32]; + + /// Returns a JSON string representation of the index details + /// + /// The format of these details will vary depending on the index type and + /// since indexes can be provided by plugins we cannot fully define it here. + /// + /// However, plugins should do their best to maintain backwards compatibility + /// and consider this method part of the public API. + /// + /// See individual index plugins for more description of the expected format. + /// + /// The conversion from Any to JSON is controlled by the index + /// plugin. As a result, this method may fail if there is no plugin + /// available for the index. + fn details(&self) -> Result<String>; +} + + // Extends Lance Dataset with secondary index.
#[async_trait] pub trait DatasetIndexExt { @@ -182,10 +244,21 @@ pub trait DatasetIndexExt { } } + /// Describes indexes in a dataset + /// + /// This method should only access the index metadata and should not load the index into memory. + /// + /// More detailed information may be available from [`index_statistics`] but that will require + /// loading the index into memory. + async fn describe_indices<'a, 'b>( + &'a self, + criteria: Option<IndexCriteria<'b>>, + ) -> Result<Vec<Arc<dyn IndexDescription>>>; + /// Loads a specific index with the given index name. async fn load_scalar_index<'a, 'b>( &'a self, - criteria: ScalarIndexCriteria<'b>, + criteria: IndexCriteria<'b>, ) -> Result<Option<IndexMetadata>>; /// Optimize indices. diff --git a/rust/lance-table/src/format/index.rs b/rust/lance-table/src/format/index.rs index 40ad486cb07..fdb2e2b4b90 100644 --- a/rust/lance-table/src/format/index.rs +++ b/rust/lance-table/src/format/index.rs @@ -26,7 +26,10 @@ pub struct IndexMetadata { /// Human readable index name pub name: String, - /// The latest version of the dataset this index covers + /// The version of the dataset this index was last updated on + /// + /// This is set when the index is created (based on the version used to train the index) + /// This is updated when the index is updated or remapped pub dataset_version: u64, /// The fragment ids this index covers. diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 1a1ae67116f..8e96ce7e3ec 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -1115,6 +1115,43 @@ impl FileFragment { } } + /// Get the number of physical rows in the fragment synchronously + /// + /// Fails if the fragment does not have the physical row count in the metadata. This method should + /// only be called in new workflows which are not run on old versions of Lance.
+ pub fn fast_physical_rows(&self) -> Result<usize> { + if self.dataset.manifest.writer_version.is_some() && self.metadata.physical_rows.is_some() { + Ok(self.metadata.physical_rows.unwrap()) + } else { + Err(Error::Internal { message: format!("The method fast_physical_rows was called on a fragment that does not have the physical row count in the metadata. Fragment id: {}", self.id()), location: location!() }) + } + } + + /// Get the number of deleted rows in the fragment synchronously + /// + /// Fails if the fragment does not have deletion count in the metadata. This method should only + /// be called in new workflows which are not run on old versions of Lance. + pub fn fast_num_deletions(&self) -> Result<usize> { + match &self.metadata().deletion_file { + Some(DeletionFile { + num_deleted_rows: Some(num_deleted), + .. + }) => Ok(*num_deleted), + None => Ok(0), + _ => Err(Error::Internal { message: format!("The method fast_num_deletions was called on a fragment that does not have the deletion count in the metadata. Fragment id: {}", self.id()), location: location!() }), + } + } + + /// Get the number of logical rows (physical rows - deleted rows) in the fragment synchronously + /// + /// Fails if the fragment does not have the physical row count or deletion count in the metadata. This method should only + /// be called in new workflows which are not run on old versions of Lance. + pub fn fast_logical_rows(&self) -> Result<usize> { + let num_physical_rows = self.fast_physical_rows()?; + let num_deleted_rows = self.fast_num_deletions()?; + Ok(num_physical_rows - num_deleted_rows) + } + /// Get the number of physical rows in the fragment. This includes deleted rows.
/// /// If there are no deleted rows, this is equal to the number of rows in the diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index f6a60959317..6c89619813f 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -64,7 +64,7 @@ use lance_index::scalar::inverted::query::{ use lance_index::scalar::inverted::SCORE_COL; use lance_index::scalar::FullTextSearchQuery; use lance_index::vector::{Query, DIST_COL}; -use lance_index::ScalarIndexCriteria; +use lance_index::IndexCriteria; use lance_index::{metrics::NoOpMetricsCollector, scalar::inverted::FTS_SCHEMA}; use lance_index::{scalar::expression::ScalarIndexExpr, DatasetIndexExt}; use lance_io::stream::RecordBatchStream; @@ -2284,11 +2284,7 @@ impl Scanner { ) -> Result { let index = self .dataset - .load_scalar_index( - ScalarIndexCriteria::default() - .for_column(column) - .supports_fts(), - ) + .load_scalar_index(IndexCriteria::default().for_column(column).supports_fts()) .await?; match index { Some(index) => match &index.fragment_bitmap { @@ -2440,7 +2436,7 @@ impl Scanner { let has_fts_index = self .dataset .load_scalar_index( - ScalarIndexCriteria::default() + IndexCriteria::default() .for_column(&column_path) .supports_fts(), ) @@ -2684,11 +2680,7 @@ impl Scanner { let index_meta = self .dataset - .load_scalar_index( - ScalarIndexCriteria::default() - .for_column(&column) - .supports_fts(), - ) + .load_scalar_index(IndexCriteria::default().for_column(&column).supports_fts()) .await? 
.ok_or(Error::invalid_input( format!("No Inverted index found for column {}", column), @@ -2734,11 +2726,7 @@ impl Scanner { let index = self .dataset - .load_scalar_index( - ScalarIndexCriteria::default() - .for_column(&column) - .supports_fts(), - ) + .load_scalar_index(IndexCriteria::default().for_column(&column).supports_fts()) .await?; let (match_plan, flat_match_plan) = match &index { @@ -3045,7 +3033,7 @@ impl Scanner { ScalarIndexExpr::Query(search) => { let idx = self .dataset - .load_scalar_index(ScalarIndexCriteria::default().with_name(&search.index_name)) + .load_scalar_index(IndexCriteria::default().with_name(&search.index_name)) .await? .expect("Index not found even though it must have been found earlier"); Ok(idx diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index a53ef714423..48b40034ec0 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -92,7 +92,7 @@ use lance_datafusion::{ use lance_file::version::LanceFileVersion; use lance_index::mem_wal::{MemWal, MemWalId}; use lance_index::metrics::NoOpMetricsCollector; -use lance_index::{DatasetIndexExt, ScalarIndexCriteria}; +use lance_index::{DatasetIndexExt, IndexCriteria}; use lance_table::format::{Fragment, IndexMetadata, RowIdMeta}; use log::info; use roaring::RoaringTreemap; @@ -564,7 +564,7 @@ impl MergeInsertJob { let col = &self.params.on[0]; self.dataset .load_scalar_index( - ScalarIndexCriteria::default() + IndexCriteria::default() .for_column(col) // Unclear if this would work if the index does not support exact equality .supports_exact_equality(), @@ -3669,7 +3669,7 @@ mod tests { let check_indices = async |dataset: &Dataset, id_frags: &[u32], value_frags: &[u32]| { let id_index = dataset - .load_scalar_index(ScalarIndexCriteria::default().with_name("id_idx")) + .load_scalar_index(IndexCriteria::default().with_name("id_idx")) .await .unwrap(); @@ -3686,7 +3686,7 @@ mod tests { 
} let value_index = dataset - .load_scalar_index(ScalarIndexCriteria::default().with_name("value_idx")) + .load_scalar_index(IndexCriteria::default().with_name("value_idx")) .await .unwrap(); @@ -3703,7 +3703,7 @@ mod tests { } let other_value_index = dataset - .load_scalar_index(ScalarIndexCriteria::default().with_name("other_value_idx")) + .load_scalar_index(IndexCriteria::default().with_name("other_value_idx")) .await .unwrap() .unwrap(); diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index c24c0744c98..1d2b1f19aa7 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -42,10 +42,12 @@ pub use lance_index::IndexParams; use lance_index::{ is_system_index, metrics::{MetricsCollector, NoOpMetricsCollector}, - ScalarIndexCriteria, + IndexCriteria, }; use lance_index::{pb, vector::VectorIndex, Index, IndexType, INDEX_FILE_NAME}; -use lance_index::{DatasetIndexExt, INDEX_METADATA_SCHEMA_KEY, VECTOR_INDEX_VERSION}; +use lance_index::{ + DatasetIndexExt, IndexDescription, INDEX_METADATA_SCHEMA_KEY, VECTOR_INDEX_VERSION, +}; use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; use lance_io::traits::Reader; use lance_io::utils::{ @@ -367,6 +369,134 @@ fn vector_index_details() -> prost_types::Any { prost_types::Any::from_msg(&details).unwrap() } +struct IndexDescriptionImpl { + name: String, + field_ids: Vec<u32>, + segments: Vec<IndexMetadata>, + index_type: String, + details: IndexDetails, + rows_indexed: u64, +} + +impl IndexDescriptionImpl { + fn try_new(segments: Vec<IndexMetadata>, dataset: &Dataset) -> Result<Self> { + if segments.is_empty() { + return Err(Error::Index { + message: "Index metadata is empty".to_string(), + location: location!(), + }); + } + + // We assume the type URL and details are the same for all segments + let example_metadata = &segments[0]; + + let name = example_metadata.name.clone(); + if !segments.iter().all(|shard| shard.name == name) { + return Err(Error::Index { + message: "Index name should be identical across all segments".to_string(), +
location: location!(), + }); + } + + let field_ids = &example_metadata.fields; + if !segments.iter().all(|shard| shard.fields == *field_ids) { + return Err(Error::Index { + message: "Index fields should be identical across all segments".to_string(), + location: location!(), + }); + } + let field_ids = field_ids.iter().map(|id| *id as u32).collect(); + + // This should not fail as we have already filtered out indexes without index details. + let index_details = example_metadata.index_details.as_ref().ok_or(Error::Index { + message: + "Index details are required for index description. This index must be retrained to support this method." + .to_string(), + location: location!(), + })?; + let type_url = &index_details.type_url; + if !segments.iter().all(|shard| { + shard + .index_details + .as_ref() + .map(|d| d.type_url == *type_url) + .unwrap_or(false) + }) { + return Err(Error::Index { + message: "Index type URL should be present and identical across all segments" + .to_string(), + location: location!(), + }); + } + + let details = IndexDetails(index_details.clone()); + let mut rows_indexed = 0; + + let index_type = details + .get_plugin() + .map(|p| p.name().to_string()) + .unwrap_or_else(|_| "Unknown".to_string()); + + for shard in &segments { + let fragment_bitmap = shard + .fragment_bitmap + .as_ref() + .ok_or_else(|| Error::Index { + message: "Fragment bitmap is required for index description. This index must be retrained to support this method.".to_string(), + location: location!(), + })?; + + for fragment in dataset.get_fragments() { + if fragment_bitmap.contains(fragment.id() as u32) { + rows_indexed += fragment.fast_logical_rows()? 
as u64; + } + } + } + + Ok(Self { + name, + field_ids, + index_type, + segments, + details, + rows_indexed, + }) + } +} + +impl IndexDescription for IndexDescriptionImpl { + fn name(&self) -> &str { + &self.name + } + + fn field_ids(&self) -> &[u32] { + &self.field_ids + } + + fn index_type(&self) -> &str { + &self.index_type + } + + fn metadata(&self) -> &[IndexMetadata] { + &self.segments + } + + fn type_url(&self) -> &str { + self.details.0.type_url.as_str() + } + + fn rows_indexed(&self) -> u64 { + self.rows_indexed + } + + fn details(&self) -> Result { + let plugin = self.details.get_plugin()?; + plugin + .details_as_json(&self.details.0) + .map(|v| v.to_string()) + } +} + #[async_trait] impl DatasetIndexExt for Dataset { type IndexBuilder<'a> = CreateIndexBuilder<'a>; @@ -475,6 +605,47 @@ impl DatasetIndexExt for Dataset { Ok(()) } + async fn describe_indices<'a, 'b>( + &'a self, + criteria: Option>, + ) -> Result>> { + let indices = self.load_indices().await?; + let mut indices = if let Some(criteria) = criteria { + indices.iter().filter(|idx| { + if idx.index_details.is_none() { + log::warn!("The method describe_indices does not support indexes without index details. 
Please retrain the index {}", idx.name); + return false; + } + let fields = idx + .fields + .iter() + .filter_map(|id| self.schema().field_by_id(*id)) + .collect::>(); + match index_matches_criteria(idx, &criteria, &fields, false, self.schema()) { + Ok(matched) => matched, + Err(err) => { + log::warn!("Could not describe index {}: {}", idx.name, err); + false + } + } + }).collect::>() + } else { + indices.iter().collect::>() + }; + indices.sort_by_key(|idx| &idx.name); + + indices + .into_iter() + .chunk_by(|idx| &idx.name) + .into_iter() + .map(|(_, segments)| { + let segments = segments.cloned().collect::>(); + let desc = IndexDescriptionImpl::try_new(segments, self)?; + Ok(Arc::new(desc) as Arc) + }) + .collect::>>() + } + async fn load_indices(&self) -> Result>> { let metadata_key = IndexMetadataKey { version: self.version().version, @@ -567,7 +738,7 @@ impl DatasetIndexExt for Dataset { async fn load_scalar_index<'a, 'b>( &'a self, - criteria: ScalarIndexCriteria<'b>, + criteria: IndexCriteria<'b>, ) -> Result> { let indices = self.load_indices().await?; @@ -586,8 +757,9 @@ impl DatasetIndexExt for Dataset { } }) .collect::>(); - // This sorting & chunking is only needed to provide some backwards compatibility behavior for - // old versions of Lance that don't write index details. + // This sorting & chunking is only needed to calculate if there are multiple indexes on the same + // field. This fact is only needed for backwards compatibility behavior for indexes that don't have + // index details. At some point we should deprecate indexes without index details. // // TODO: At some point we should just fail if the index details are missing and ask the user to // retrain the index. @@ -599,7 +771,13 @@ impl DatasetIndexExt for Dataset { for idx in indices { let field = self.schema().field_by_id(field_id); if let Some(field) = field { - if index_matches_criteria(idx, &criteria, field, has_multiple, self.schema())? 
{ + if index_matches_criteria( + idx, + &criteria, + &[field], + has_multiple, + self.schema(), + )? { let non_empty = idx.fragment_bitmap.as_ref().is_some_and(|bitmap| { bitmap.intersection_len(self.fragment_bitmap.as_ref()) > 0 }); diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 50736fe53d5..22919b51239 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -24,10 +24,10 @@ use lance_index::metrics::{MetricsCollector, NoOpMetricsCollector}; use lance_index::pbold::{ BTreeIndexDetails, BitmapIndexDetails, InvertedIndexDetails, LabelListIndexDetails, }; +use lance_index::registry::IndexPluginRegistry; use lance_index::scalar::inverted::METADATA_FILE; use lance_index::scalar::registry::{ - ScalarIndexPlugin, ScalarIndexPluginRegistry, TrainingCriteria, TrainingOrdering, - VALUE_COLUMN_NAME, + ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, VALUE_COLUMN_NAME, }; use lance_index::scalar::IndexStore; use lance_index::scalar::{ @@ -35,7 +35,7 @@ use lance_index::scalar::{ ScalarIndex, ScalarIndexParams, }; use lance_index::scalar::{CreatedIndex, InvertedIndexParams}; -use lance_index::{DatasetIndexExt, IndexType, ScalarIndexCriteria, VECTOR_INDEX_VERSION}; +use lance_index::{DatasetIndexExt, IndexCriteria, IndexType, VECTOR_INDEX_VERSION}; use lance_table::format::{Fragment, IndexMetadata}; use log::info; use snafu::location; @@ -217,8 +217,8 @@ pub(crate) async fn load_training_data( } // TODO: Allow users to register their own plugins -static SCALAR_INDEX_PLUGIN_REGISTRY: LazyLock> = - LazyLock::new(ScalarIndexPluginRegistry::with_default_plugins); +static SCALAR_INDEX_PLUGIN_REGISTRY: LazyLock> = + LazyLock::new(IndexPluginRegistry::with_default_plugins); pub struct IndexDetails(pub Arc); @@ -390,8 +390,8 @@ pub(crate) async fn infer_scalar_index_details( pub fn index_matches_criteria( index: &IndexMetadata, - criteria: &ScalarIndexCriteria, - field: &Field, + criteria: &IndexCriteria, + fields: 
&[&Field], has_multiple_indices: bool, schema: &lance_core::datatypes::Schema, ) -> Result { @@ -405,6 +405,12 @@ pub fn index_matches_criteria( if index.fields.len() != 1 { return Ok(false); } + if fields.len() != 1 { + // This should be unreachable since we just verified index.fields.len() == 1 but + // return false just in case + return Ok(false); + } + let field = fields[0]; // Build the full field path for nested fields let field_path = if let Some(ancestors) = schema.field_ancestry_by_id(field.id) { let field_refs: Vec<&str> = ancestors.iter().map(|f| f.name.as_str()).collect(); @@ -576,7 +582,7 @@ mod tests { fn test_index_matches_criteria_vector_index() { let index1 = make_index_metadata("vector_index", 1, Some(IndexType::Vector)); - let criteria = ScalarIndexCriteria { + let criteria = IndexCriteria { must_support_fts: false, must_support_exact_equality: false, for_column: None, @@ -588,10 +594,10 @@ mod tests { fields: vec![field.clone()], metadata: Default::default(), }; - let result = index_matches_criteria(&index1, &criteria, &field, true, &schema).unwrap(); + let result = index_matches_criteria(&index1, &criteria, &[&field], true, &schema).unwrap(); assert!(!result); - let result = index_matches_criteria(&index1, &criteria, &field, false, &schema).unwrap(); + let result = index_matches_criteria(&index1, &criteria, &[&field], false, &schema).unwrap(); assert!(!result); } @@ -601,7 +607,7 @@ mod tests { let inverted_index = make_index_metadata("inverted_index", 1, Some(IndexType::Inverted)); let ngram_index = make_index_metadata("ngram_index", 1, Some(IndexType::NGram)); - let criteria = ScalarIndexCriteria { + let criteria = IndexCriteria { must_support_fts: false, must_support_exact_equality: false, for_column: None, @@ -614,91 +620,91 @@ mod tests { metadata: Default::default(), }; let result = - index_matches_criteria(&btree_index, &criteria, &field, true, &schema).unwrap(); + index_matches_criteria(&btree_index, &criteria, &[&field], true, 
&schema).unwrap(); assert!(result); let result = - index_matches_criteria(&btree_index, &criteria, &field, false, &schema).unwrap(); + index_matches_criteria(&btree_index, &criteria, &[&field], false, &schema).unwrap(); assert!(result); // test for_column - let mut criteria = ScalarIndexCriteria { + let mut criteria = IndexCriteria { must_support_fts: false, must_support_exact_equality: false, for_column: Some("mycol"), has_name: None, }; let result = - index_matches_criteria(&btree_index, &criteria, &field, false, &schema).unwrap(); + index_matches_criteria(&btree_index, &criteria, &[&field], false, &schema).unwrap(); assert!(result); criteria.for_column = Some("mycol2"); let result = - index_matches_criteria(&btree_index, &criteria, &field, false, &schema).unwrap(); + index_matches_criteria(&btree_index, &criteria, &[&field], false, &schema).unwrap(); assert!(!result); // test has_name - let mut criteria = ScalarIndexCriteria { + let mut criteria = IndexCriteria { must_support_fts: false, must_support_exact_equality: false, for_column: None, has_name: Some("btree_index"), }; let result = - index_matches_criteria(&btree_index, &criteria, &field, true, &schema).unwrap(); + index_matches_criteria(&btree_index, &criteria, &[&field], true, &schema).unwrap(); assert!(result); let result = - index_matches_criteria(&btree_index, &criteria, &field, false, &schema).unwrap(); + index_matches_criteria(&btree_index, &criteria, &[&field], false, &schema).unwrap(); assert!(result); criteria.has_name = Some("btree_index2"); let result = - index_matches_criteria(&btree_index, &criteria, &field, true, &schema).unwrap(); + index_matches_criteria(&btree_index, &criteria, &[&field], true, &schema).unwrap(); assert!(!result); let result = - index_matches_criteria(&btree_index, &criteria, &field, false, &schema).unwrap(); + index_matches_criteria(&btree_index, &criteria, &[&field], false, &schema).unwrap(); assert!(!result); // test supports_exact_equality - let mut criteria = 
ScalarIndexCriteria { + let mut criteria = IndexCriteria { must_support_fts: false, must_support_exact_equality: true, for_column: None, has_name: None, }; let result = - index_matches_criteria(&btree_index, &criteria, &field, false, &schema).unwrap(); + index_matches_criteria(&btree_index, &criteria, &[&field], false, &schema).unwrap(); assert!(result); criteria.must_support_fts = true; let result = - index_matches_criteria(&inverted_index, &criteria, &field, false, &schema).unwrap(); + index_matches_criteria(&inverted_index, &criteria, &[&field], false, &schema).unwrap(); assert!(!result); criteria.must_support_fts = false; let result = - index_matches_criteria(&ngram_index, &criteria, &field, true, &schema).unwrap(); + index_matches_criteria(&ngram_index, &criteria, &[&field], true, &schema).unwrap(); assert!(!result); // test multiple indices - let mut criteria = ScalarIndexCriteria { + let mut criteria = IndexCriteria { must_support_fts: false, must_support_exact_equality: false, for_column: None, has_name: None, }; let result = - index_matches_criteria(&btree_index, &criteria, &field, true, &schema).unwrap(); + index_matches_criteria(&btree_index, &criteria, &[&field], true, &schema).unwrap(); assert!(result); criteria.must_support_fts = true; let result = - index_matches_criteria(&inverted_index, &criteria, &field, true, &schema).unwrap(); + index_matches_criteria(&inverted_index, &criteria, &[&field], true, &schema).unwrap(); assert!(result); criteria.must_support_fts = false; let result = - index_matches_criteria(&ngram_index, &criteria, &field, true, &schema).unwrap(); + index_matches_criteria(&ngram_index, &criteria, &[&field], true, &schema).unwrap(); assert!(result); } diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 996d2ffb082..b5ff3a2894f 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -35,7 +35,7 @@ use lance_index::scalar::inverted::{ flat_bm25_search_stream, InvertedIndex, 
FTS_SCHEMA, SCORE_COL, }; use lance_index::{prefilter::PreFilter, scalar::inverted::query::BooleanQuery}; -use lance_index::{DatasetIndexExt, ScalarIndexCriteria}; +use lance_index::{DatasetIndexExt, IndexCriteria}; use tracing::instrument; pub struct FtsIndexMetrics { @@ -222,11 +222,7 @@ impl ExecutionPlan for MatchQueryExec { let stream = stream::once(async move { let _timer = metrics.baseline_metrics.elapsed_compute().timer(); let index_meta = ds - .load_scalar_index( - ScalarIndexCriteria::default() - .for_column(&column) - .supports_fts(), - ) + .load_scalar_index(IndexCriteria::default().for_column(&column).supports_fts()) .await? .ok_or(DataFusionError::Execution(format!( "No Inverted index found for column {}", @@ -439,11 +435,7 @@ impl ExecutionPlan for FlatMatchQueryExec { let stream = stream::once(async move { let index_meta = ds - .load_scalar_index( - ScalarIndexCriteria::default() - .for_column(&column) - .supports_fts(), - ) + .load_scalar_index(IndexCriteria::default().for_column(&column).supports_fts()) .await?; let inverted_idx = match index_meta { Some(index_meta) => { @@ -645,11 +637,7 @@ impl ExecutionPlan for PhraseQueryExec { query.terms )))?; let index_meta = ds - .load_scalar_index( - ScalarIndexCriteria::default() - .for_column(&column) - .supports_fts(), - ) + .load_scalar_index(IndexCriteria::default().for_column(&column).supports_fts()) .await? 
.ok_or(DataFusionError::Execution(format!( "No Inverted index found for column {}", diff --git a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index 26c69d1f5c5..82ba17efe02 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -47,7 +47,7 @@ use lance_index::{ }, SargableQuery, ScalarIndex, }, - DatasetIndexExt, ScalarIndexCriteria, + DatasetIndexExt, IndexCriteria, }; use lance_table::format::Fragment; use roaring::RoaringBitmap; @@ -63,7 +63,7 @@ impl ScalarIndexLoader for Dataset { metrics: &dyn MetricsCollector, ) -> Result> { let idx = self - .load_scalar_index(ScalarIndexCriteria::default().with_name(index_name)) + .load_scalar_index(IndexCriteria::default().with_name(index_name)) .await? .ok_or_else(|| Error::Internal { message: format!("Scanner created plan for index query on index {} for column {} but no usable index exists with that name", index_name, column), @@ -137,9 +137,7 @@ impl ScalarIndexExec { } ScalarIndexExpr::Query(search_key) => { let idx = dataset - .load_scalar_index( - ScalarIndexCriteria::default().with_name(&search_key.index_name), - ) + .load_scalar_index(IndexCriteria::default().with_name(&search_key.index_name)) .await? .expect("Index not found even though it must have been found earlier"); Ok(idx @@ -355,7 +353,7 @@ impl MapIndexExec { impl Stream> + Send + 'static, > { let index = dataset - .load_scalar_index(ScalarIndexCriteria::default().with_name(&index_name)) + .load_scalar_index(IndexCriteria::default().with_name(&index_name)) .await? .unwrap(); let deletion_mask_fut =