diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index ab2333266da..001d609b209 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -37,7 +37,7 @@ use lance::dataset::{ ColumnAlteration, CommitBuilder, Dataset, NewColumnTransform, ProjectionRequest, ReadParams, Version, WriteParams, }; -use lance::index::DatasetIndexExt; +use lance::index::{DatasetIndexExt, IndexSegment}; use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore; use lance::io::{ObjectStore, ObjectStoreParams}; use lance::session::Session as LanceSession; @@ -239,10 +239,6 @@ impl BlockingDataset { Ok(version) } - pub fn version_id(&self) -> u64 { - self.inner.version_id() - } - pub fn list_versions(&self) -> Result> { let versions = RT.block_on(self.inner.versions())?; Ok(versions) @@ -1089,6 +1085,61 @@ fn inner_merge_index_metadata( Ok(()) } +#[unsafe(no_mangle)] +pub extern "system" fn Java_org_lance_Dataset_nativeBuildIndexSegments<'local>( + mut env: JNIEnv<'local>, + java_dataset: JObject, + java_segments: JObject, + index_type: jint, + target_segment_bytes_jobj: JObject, +) -> JObject<'local> { + ok_or_throw!( + env, + inner_build_index_segments( + &mut env, + java_dataset, + java_segments, + index_type, + target_segment_bytes_jobj + ) + ) +} + +fn inner_build_index_segments<'local>( + env: &mut JNIEnv<'local>, + java_dataset: JObject, + java_segments: JObject, + index_type: jint, + target_segment_bytes_jobj: JObject, +) -> Result> { + let segments = import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?; + let index_type = IndexType::try_from(index_type)?; + let target_segment_bytes = env + .get_long_opt(&target_segment_bytes_jobj)? + .map(|v| v as u64); + let template = segment_template(&segments)?; + + let built_segments = { + let dataset_guard = + unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; + let mut builder = dataset_guard + .inner + .create_index_segment_builder() + .with_index_type(index_type) + .with_segments(segments); + if let Some(target_segment_bytes) = target_segment_bytes { + builder = builder.with_target_segment_bytes(target_segment_bytes); + } + RT.block_on(builder.build_all())? + }; + + let built_metadata = built_segments + .into_iter() + .map(|segment| index_segment_to_metadata(&template, segment)) + .collect::>(); + export_vec(env, &built_metadata) +} + #[unsafe(no_mangle)] pub extern "system" fn Java_org_lance_Dataset_nativeMergeExistingIndexSegments<'local>( mut env: JNIEnv<'local>, @@ -1144,7 +1195,12 @@ fn inner_commit_existing_index_segments<'local>( ) -> Result> { let index_name = index_name.extract(env)?; let column = column.extract(env)?; - let segments = import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?; + let segment_metadata = + import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?; + let segments = segment_metadata + .iter() + .map(index_metadata_to_segment) + .collect::>>()?; let committed = { let mut dataset_guard = @@ -1160,6 +1216,82 @@ fn inner_commit_existing_index_segments<'local>( export_vec(env, &committed) } +struct SegmentTemplate { + name: String, + fields: Vec, + dataset_version: u64, +} + +fn segment_template(segments: &[IndexMetadata]) -> Result { + let first = segments + .first() + .ok_or_else(|| Error::input_error("segments cannot be empty".to_string()))?; + for segment in &segments[1..] { + if segment.name != first.name { + return Err(Error::input_error(format!( + "All segments must share the same index name, got '{}' and '{}'", + first.name, segment.name + ))); + } + if segment.fields != first.fields { + return Err(Error::input_error(format!( + "All segments must target the same field ids, got {:?} and {:?}", + first.fields, segment.fields + ))); + } + if segment.dataset_version != first.dataset_version { + return Err(Error::input_error(format!( + "All segments must share the same dataset version, got {} and {}", + first.dataset_version, segment.dataset_version + ))); + } + } + + Ok(SegmentTemplate { + name: first.name.clone(), + fields: first.fields.clone(), + dataset_version: first.dataset_version, + }) +} + +fn index_metadata_to_segment(metadata: &IndexMetadata) -> Result { + let fragment_bitmap = metadata.fragment_bitmap.clone().ok_or_else(|| { + Error::input_error(format!( + "Segment '{}' is missing fragment coverage metadata", + metadata.uuid + )) + })?; + let index_details = metadata.index_details.clone().ok_or_else(|| { + Error::input_error(format!( + "Segment '{}' is missing index details metadata", + metadata.uuid + )) + })?; + + Ok(IndexSegment::new( + metadata.uuid, + fragment_bitmap, + index_details, + metadata.index_version, + )) +} + +fn index_segment_to_metadata(template: &SegmentTemplate, segment: IndexSegment) -> IndexMetadata { + let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts(); + IndexMetadata { + uuid, + fields: template.fields.clone(), + name: template.name.clone(), + dataset_version: template.dataset_version, + fragment_bitmap: Some(fragment_bitmap), + index_details: Some(index_details), + index_version, + created_at: Some(Utc::now()), + base_id: None, + files: None, + } +} + #[unsafe(no_mangle)] pub extern "system" fn Java_org_lance_Dataset_nativeOptimizeIndices( mut env: JNIEnv, @@ -1541,20 +1673,6 @@ fn inner_get_version<'local>( version.into_java(env) } -#[unsafe(no_mangle)] -pub extern "system" fn Java_org_lance_Dataset_nativeGetVersionId( - mut env: JNIEnv, - java_dataset: JObject, -) -> jlong { - ok_or_throw_with_return!(env, inner_get_version_id(&mut env, java_dataset), -1) as jlong -} - -fn inner_get_version_id(env: &mut JNIEnv, java_dataset: JObject) -> Result { - let dataset_guard = - unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; - Ok(dataset_guard.version_id()) -} - #[unsafe(no_mangle)] pub extern "system" fn Java_org_lance_Dataset_nativeGetLatestVersionId( mut env: JNIEnv, @@ -1879,20 +1997,6 @@ fn inner_get_config<'local>( Ok(java_hashmap) } -#[unsafe(no_mangle)] -pub extern "system" fn Java_org_lance_Dataset_nativeHasStableRowIds( - mut env: JNIEnv, - java_dataset: JObject, -) -> jboolean { - ok_or_throw_with_return!(env, inner_has_stable_row_ids(&mut env, java_dataset), 0u8) -} - -fn inner_has_stable_row_ids(env: &mut JNIEnv, java_dataset: JObject) -> Result { - let dataset_guard = - unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; - Ok(dataset_guard.inner.manifest().uses_stable_row_ids() as u8) -} - #[unsafe(no_mangle)] pub extern "system" fn Java_org_lance_Dataset_nativeGetLanceFileFormatVersion<'local>( mut env: JNIEnv<'local>, @@ -2445,32 +2549,12 @@ fn inner_list_branches<'local>( } else { JObject::null() }; - let jbranch_identifier = env.new_object( - "java/util/ArrayList", - "(I)V", - &[JValue::Int(contents.identifier.version_mapping.len() as i32)], - )?; - for (version, uuid) in contents.identifier.version_mapping.iter() { - let juuid = env.new_string(uuid)?; - let jmapping = env.new_object( - "org/lance/Branch$BranchVersionMapping", - "(JLjava/lang/String;)V", - &[JValue::Long(*version as i64), JValue::Object(&juuid)], - )?; - env.call_method( - &jbranch_identifier, - "add", - "(Ljava/lang/Object;)Z", - &[JValue::Object(&jmapping)], - )?; - } let jbranch = env.new_object( "org/lance/Branch", - "(Ljava/lang/String;Ljava/lang/String;Ljava/util/List;JJI)V", + "(Ljava/lang/String;Ljava/lang/String;JJI)V", &[ JValue::Object(&jname), JValue::Object(&jparent), - JValue::Object(&jbranch_identifier), JValue::Long(contents.parent_version as i64), JValue::Long(contents.create_at as i64), JValue::Int(contents.manifest_size as i32), diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index 42444a9aa83..5bd2c562bf2 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -803,14 +803,9 @@ public String uri() { * @return the version id of the dataset */ public long version() { - try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { - Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); - return nativeGetVersionId(); - } + return getVersion().getId(); } - private native long nativeGetVersionId(); - /** * Gets the currently checked out version of the dataset. * @@ -1037,6 +1032,44 @@ public void mergeIndexMetadata( private native void innerMergeIndexMetadata( String indexUUID, int indexType, Optional batchReadHead); + /** + * Build physical vector index segments from previously-created fragment-level index outputs. + * + * @param segments segment metadata returned by {@link #createIndex(IndexOptions)} when + * fragmentIds are provided + * @param indexType concrete index type for the staged segments + * @param targetSegmentBytes optional size target for merged physical segments + * @return built physical segment metadata + */ + public List buildIndexSegments( + List segments, IndexType indexType, Optional targetSegmentBytes) { + Preconditions.checkNotNull(segments, "segments cannot be null"); + Preconditions.checkArgument(!segments.isEmpty(), "segments cannot be empty"); + Preconditions.checkNotNull(indexType, "indexType cannot be null"); + try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + return nativeBuildIndexSegments(segments, indexType.getValue(), targetSegmentBytes); + } + } + + /** + * Build physical vector index segments from previously-created fragment-level index outputs. + * + * @param segments segment metadata returned by {@link #createIndex(IndexOptions)} when + * fragmentIds are provided + * @param targetSegmentBytes optional size target for merged physical segments + * @return built physical segment metadata + */ + @Deprecated + public List buildIndexSegments(List segments, Optional targetSegmentBytes) { + throw new IllegalArgumentException( + "buildIndexSegments now requires an explicit index type; call " + + "buildIndexSegments(segments, indexType, targetSegmentBytes)"); + } + + private native List nativeBuildIndexSegments( + List segments, int indexType, Optional targetSegmentBytes); + /** Merge one caller-defined group of existing uncommitted vector index segments. */ public Index mergeExistingIndexSegments(List segments) { Preconditions.checkNotNull(segments, "segments cannot be null"); @@ -1262,11 +1295,7 @@ public List listIndexes() { /** * Get all indexes with full metadata. * - *

Each returned {@link Index} is a physical index segment from the manifest. Use {@link - * #describeIndices()} for the logical-index view. - * - * @return list of Index objects with complete segment metadata, including index type and fragment - * coverage + * @return list of Index objects with complete metadata including index type and fragment coverage */ public List getIndexes() { try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { @@ -1362,23 +1391,6 @@ public Map getConfig() { private native Map nativeGetConfig(); - /** - * Check whether the dataset uses stable row IDs. - * - *

Stable row IDs remain constant when rows are moved during compaction. This reads the - * manifest feature flag directly rather than the user-facing config map. - * - * @return true if the dataset was created with stable row IDs enabled - */ - public boolean hasStableRowIds() { - try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { - Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); - return nativeHasStableRowIds(); - } - } - - private native boolean nativeHasStableRowIds(); - /** * Get the Lance file format version of this dataset. * diff --git a/java/src/test/java/org/lance/index/VectorIndexTest.java b/java/src/test/java/org/lance/index/VectorIndexTest.java index 50499197b34..e60a0f517b4 100755 --- a/java/src/test/java/org/lance/index/VectorIndexTest.java +++ b/java/src/test/java/org/lance/index/VectorIndexTest.java @@ -29,6 +29,7 @@ import java.nio.file.Path; import java.util.Collections; import java.util.List; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -94,11 +95,14 @@ public void testCreateIvfFlatIndexDistributively(@TempDir Path tempDir) throws E dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_FLAT index should not present before commit"); + List builtSegments = + dataset.buildIndexSegments( + List.of(firstSegment, secondSegment), IndexType.IVF_FLAT, Optional.empty()); + assertEquals(2, builtSegments.size()); + List committed = dataset.commitExistingIndexSegments( - TestVectorDataset.indexName, - TestVectorDataset.vectorColumnName, - List.of(firstSegment, secondSegment)); + TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); assertEquals(2, committed.size()); assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } @@ -184,11 +188,14 @@ public void testCreateIvfPqIndexDistributively(@TempDir Path tempDir) throws Exc dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_PQ index should not present before commit"); + List builtSegments = + dataset.buildIndexSegments( + List.of(firstSegment, secondSegment), IndexType.IVF_PQ, Optional.empty()); + assertEquals(2, builtSegments.size()); + List committed = dataset.commitExistingIndexSegments( - TestVectorDataset.indexName, - TestVectorDataset.vectorColumnName, - List.of(firstSegment, secondSegment)); + TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); assertEquals(2, committed.size()); assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } @@ -258,11 +265,14 @@ public void testCreateIvfSqIndexDistributively(@TempDir Path tempDir) throws Exc dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_SQ index should not present before commit"); + List builtSegments = + dataset.buildIndexSegments( + List.of(firstSegment, secondSegment), IndexType.IVF_SQ, Optional.empty()); + assertEquals(2, builtSegments.size()); + List committed = dataset.commitExistingIndexSegments( - TestVectorDataset.indexName, - TestVectorDataset.vectorColumnName, - List.of(firstSegment, secondSegment)); + TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); assertEquals(2, committed.size()); assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 79641ab4438..092ff34a84e 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -48,7 +48,7 @@ from .dependencies import numpy as np from .dependencies import pandas as pd from .fragment import DataFile, FragmentMetadata, LanceFragment -from .indices import IndexConfig, SupportedDistributedIndices +from .indices import IndexConfig, IndexSegment, SupportedDistributedIndices from .lance import ( CleanupStats, Compaction, @@ -804,13 +804,12 @@ def checkout_latest(self): def list_indices(self) -> List[Index]: """ - Returns physical index segment information for all indices in the dataset. + Returns index information for all indices in the dataset. This method is deprecated as it requires loading the statistics for each index - which can be a very expensive operation. It also exposes physical index - segments directly. Instead use describe_indices() for logical index - descriptions and index_statistics() to get the statistics for individual - indexes of interest. + which can be a very expensive operation. Instead use describe_indices() to + list index information and index_statistics() to get the statistics for + individual indexes of interest. """ warnings.warn( "The 'list_indices' method is deprecated. It may be removed in a future " @@ -821,7 +820,7 @@ def list_indices(self) -> List[Index]: return self._ds.load_indices() def describe_indices(self) -> List[IndexDescription]: - """Returns logical index information aggregated across all segments.""" + """Returns index information for all indices in the dataset.""" return self._ds.describe_indices() def index_statistics(self, index_name: str) -> Dict[str, Any]: @@ -3568,7 +3567,7 @@ def create_index( This enables distributed/fragment-level indexing. When provided, the method creates one segment but does not commit the index to the dataset. The returned metadata can be passed to - optionally merged with ``merge_existing_index_segments(...)`` + ``create_index_segment_builder().with_index_type(...).with_segments(...)`` and then committed with ``commit_existing_index_segments(...)``. index_uuid : str, optional A UUID to use for the segment written by this call. @@ -3752,9 +3751,10 @@ def create_index_uncommitted( 1. run :meth:`create_index_uncommitted` on each worker with that worker's assigned ``fragment_ids`` 2. collect the returned :class:`Index` objects - 3. optionally merge one or more caller-defined groups with - :meth:`merge_existing_index_segments` - 4. commit the final segment list with + 3. call :meth:`IndexSegmentBuilder.with_index_type` with the concrete + distributed index type + 4. pass them to :meth:`IndexSegmentBuilder.with_segments` + 5. build one or more physical segments and commit them with :meth:`commit_existing_index_segments` Parameters are the same as :meth:`create_index`, with one additional @@ -3835,9 +3835,9 @@ def merge_index_metadata( Merge distributed scalar index metadata. Vector distributed indexing no longer uses this API. For vector indices, - build segments with :meth:`create_index_uncommitted`, optionally merge - caller-defined groups with :meth:`merge_existing_index_segments`, and - publish them with :meth:`commit_existing_index_segments`. + build segments with :meth:`create_index_uncommitted`, plan or + merge them with :meth:`create_index_segment_builder`, and publish them + with :meth:`commit_existing_index_segments`. This method does NOT commit changes. @@ -3874,6 +3874,17 @@ def merge_index_metadata( self._ds.merge_index_metadata(index_uuid, t, batch_readhead, progress_callback) return None + def create_index_segment_builder(self): + """ + Create a builder for turning existing segments into physical segments. + + Provide the segment metadata returned by + :meth:`create_index_uncommitted` through + :meth:`IndexSegmentBuilder.with_segments`, and declare the segment type + with :meth:`IndexSegmentBuilder.with_index_type`. + """ + return self._ds.create_index_segment_builder() + def merge_existing_index_segments(self, segments: List[Index]) -> Index: """ Merge one caller-defined group of existing uncommitted segments. @@ -3881,7 +3892,7 @@ def merge_existing_index_segments(self, segments: List[Index]) -> Index: return self._ds.merge_existing_index_segments(segments) def commit_existing_index_segments( - self, index_name: str, column: str, segments: List[Index] + self, index_name: str, column: str, segments: List[IndexSegment] ) -> LanceDataset: """ Commit built index segments as one logical index. @@ -4692,7 +4703,6 @@ class Tag(TypedDict): class Branch(TypedDict): parent_branch: Optional[str] - branch_identifier: List[Tuple[int, str]] parent_version: int create_at: int manifest_size: int @@ -4746,7 +4756,6 @@ class Index: created_at: Optional[datetime] = None base_id: Optional[int] = None files: Optional[List["IndexFile"]] = None - index_details: Optional[tuple[str, bytes]] = None class AutoCleanupConfig(TypedDict): diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 82eaae3c97c..8977be0e2d6 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -62,7 +62,8 @@ from .fragment import ( RowIdMeta as RowIdMeta, ) from .indices import IndexDescription as IndexDescription -from .indices import IndexSegmentDescription as IndexSegmentDescription +from .indices import IndexSegment as IndexSegment +from .indices import IndexSegmentPlan as IndexSegmentPlan from .lance import PySearchFilter from .optimize import ( Compaction as Compaction, @@ -190,6 +191,14 @@ class LanceColumnStatistics: class _Session: def size_bytes(self) -> int: ... +class IndexSegmentBuilder: + def with_index_type(self, index_type: str) -> Self: ... + def with_segments(self, segments: List[Index]) -> Self: ... + def with_target_segment_bytes(self, bytes: int) -> Self: ... + def plan(self) -> List[IndexSegmentPlan]: ... + def build(self, plan: IndexSegmentPlan) -> IndexSegment: ... + def build_all(self) -> List[IndexSegment]: ... + class LanceBlobFile: def close(self): ... def is_closed(self) -> bool: ... @@ -390,9 +399,10 @@ class _Dataset: batch_readhead: Optional[int] = None, progress_callback: Optional[Callable[[IndexProgress], None]] = None, ): ... + def create_index_segment_builder(self) -> IndexSegmentBuilder: ... def merge_existing_index_segments(self, segments: List[Index]) -> Index: ... def commit_existing_index_segments( - self, index_name: str, column: str, segments: List[Index] + self, index_name: str, column: str, segments: List[IndexSegment] ) -> None: ... def count_fragments(self) -> int: ... def num_small_files(self, max_rows_per_group: int) -> int: ... diff --git a/python/python/lance/lance/indices/__init__.pyi b/python/python/lance/lance/indices/__init__.pyi index 152ea1d10b2..fc5d03b80bd 100644 --- a/python/python/lance/lance/indices/__init__.pyi +++ b/python/python/lance/lance/indices/__init__.pyi @@ -21,6 +21,21 @@ class IndexConfig: index_type: str config: str +class IndexSegment: + uuid: str + fragment_ids: set[int] + index_version: int + + def __repr__(self) -> str: ... + +class IndexSegmentPlan: + segment: IndexSegment + segments: list[object] + estimated_bytes: int + requested_index_type: Optional[str] + + def __repr__(self) -> str: ... + def train_ivf_model( dataset, column: str, diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index 5371aa1f2a7..cb0ea584220 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -1700,22 +1700,6 @@ def test_optimize_indices(indexed_dataset): assert stats["num_indices"] == 2 -def test_logical_and_physical_index_views(indexed_dataset): - data = create_table() - indexed_dataset = lance.write_dataset(data, indexed_dataset.uri, mode="append") - indexed_dataset.optimize.optimize_indices(num_indices_to_merge=0) - - logical_indices = indexed_dataset.describe_indices() - assert len(logical_indices) == 1 - assert logical_indices[0].name == "vector_idx" - assert len(logical_indices[0].segments) == 2 - assert all(segment.fragment_ids for segment in logical_indices[0].segments) - - stats = indexed_dataset.stats.index_stats("vector_idx") - assert stats["num_segments"] == stats["num_indices"] == 2 - assert stats["segments"] == stats["indices"] - - @pytest.mark.skip(reason="retrain is deprecated") def test_retrain_indices(indexed_dataset): data = create_table() @@ -2205,6 +2189,12 @@ def build_distributed_vector_index( ) ) + segments = ( + dataset.create_index_segment_builder() + .with_index_type(index_type) + .with_segments(segments) + .build_all() + ) return dataset.commit_existing_index_segments(f"{column}_idx", column, segments) @@ -2576,6 +2566,12 @@ def test_metadata_merge_pq_success(tmp_path): ivf_centroids=pre["ivf_centroids"], pq_codebook=pre["pq_codebook"], ) + segments = ( + ds.create_index_segment_builder() + .with_index_type("IVF_PQ") + .with_segments(segments) + .build_all() + ) ds = _commit_segments_helper(ds, segments, "vector") q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 10}) @@ -2614,6 +2610,12 @@ def test_distributed_workflow_merge_and_search(tmp_path): ivf_centroids=pre["ivf_centroids"], pq_codebook=pre["pq_codebook"], ) + segments = ( + ds.create_index_segment_builder() + .with_index_type("IVF_PQ") + .with_segments(segments) + .build_all() + ) ds = _commit_segments_helper(ds, segments, "vector") q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 10}) @@ -2649,6 +2651,12 @@ def test_vector_merge_two_shards_success_flat(tmp_path): ivf_centroids=preprocessed["ivf_centroids"], pq_codebook=preprocessed["pq_codebook"], ) + segments = ( + ds.create_index_segment_builder() + .with_index_type("IVF_FLAT") + .with_segments(segments) + .build_all() + ) ds = _commit_segments_helper(ds, segments, column="vector") q = np.random.rand(128).astype(np.float32) result = ds.to_table(nearest={"column": "vector", "q": q, "k": 5}) @@ -2701,6 +2709,12 @@ def test_distributed_ivf_parameterized(tmp_path, index_type, num_sub_vectors): ds.create_index_uncommitted(**kwargs1), ds.create_index_uncommitted(**kwargs2), ] + segments = ( + ds.create_index_segment_builder() + .with_index_type(index_type) + .with_segments(segments) + .build_all() + ) ds = _commit_segments_helper(ds, segments, "vector") q = np.random.rand(128).astype(np.float32) @@ -2761,16 +2775,21 @@ def test_merge_two_shards_parameterized(tmp_path, index_type, num_sub_vectors): kwargs2["pq_codebook"] = pre["pq_codebook"] segment2 = ds.create_index_uncommitted(**kwargs2) - merged_segment = ds.merge_existing_index_segments([segment1, segment2]) - ds = _commit_segments_helper(ds, [merged_segment], column="vector") + segments = ( + ds.create_index_segment_builder() + .with_index_type(index_type) + .with_segments([segment1, segment2]) + .build_all() + ) + ds = _commit_segments_helper(ds, segments, column="vector") q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 5}) assert 0 < len(results) <= 5 -def test_merge_existing_index_segments_builds_vector_segment(tmp_path): - ds = _make_sample_dataset_base(tmp_path, "merge_existing_segments_ds", 2000, 128) +def test_index_segment_builder_builds_vector_segments(tmp_path): + ds = _make_sample_dataset_base(tmp_path, "segment_builder_ds", 2000, 128) frags = ds.get_fragments() assert len(frags) >= 2 builder = IndicesBuilder(ds, "vector") @@ -2796,14 +2815,19 @@ def test_merge_existing_index_segments_builds_vector_segment(tmp_path): ) for fragment in frags[:2] ] - assert all(segment.index_details is not None for segment in segments) - merged_segment = ds.merge_existing_index_segments(segments) - assert merged_segment.fragment_ids is not None - assert sorted(merged_segment.fragment_ids) == sorted( - [fragment.fragment_id for fragment in frags[:2]] + segment_builder = ( + ds.create_index_segment_builder() + .with_index_type("IVF_FLAT") + .with_segments(segments) ) - ds = ds.commit_existing_index_segments("vector_idx", "vector", [merged_segment]) + plans = segment_builder.plan() + assert len(plans) == 2 + assert all(len(plan.segments) == 1 for plan in plans) + + segments = segment_builder.build_all() + assert len(segments) == 2 + ds = ds.commit_existing_index_segments("vector_idx", "vector", segments) q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 5}) @@ -2864,6 +2888,12 @@ def build_distributed_ivf_pq(ds_copy, shard_order): ivf_centroids=pre["ivf_centroids"], pq_codebook=pre["pq_codebook"], ) + segments = ( + ds_copy.create_index_segment_builder() + .with_index_type("IVF_PQ") + .with_segments(segments) + .build_all() + ) return _commit_segments_helper(ds_copy, segments, column="vector") except ValueError as e: raise e diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 1885b78bd64..f609eb4f2d6 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -93,7 +93,7 @@ use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler; use crate::error::PythonErrorExt; use crate::file::object_store_from_uri_or_path; use crate::fragment::FileFragment; -use crate::indices::{PyIndexConfig, PyIndexDescription}; +use crate::indices::{PyIndexConfig, PyIndexDescription, PyIndexSegment, PyIndexSegmentPlan}; use crate::namespace::extract_namespace_arc; use crate::rt; use crate::scanner::ScanStatistics; @@ -354,6 +354,106 @@ impl MergeInsertBuilder { } } +#[pyclass(name = "IndexSegmentBuilder", module = "lance", subclass)] +#[derive(Clone)] +pub struct PyIndexSegmentBuilder { + dataset: Arc, + index_type: Option, + segments: Vec, + target_segment_bytes: Option, +} + +impl PyIndexSegmentBuilder { + fn builder(&self) -> ::IndexSegmentBuilder<'_> { + let mut builder = self + .dataset + .create_index_segment_builder() + .with_segments(self.segments.clone()); + if let Some(index_type) = self.index_type { + builder = builder.with_index_type(index_type); + } + if let Some(target_segment_bytes) = self.target_segment_bytes { + builder = builder.with_target_segment_bytes(target_segment_bytes); + } + builder + } +} + +#[pymethods] +impl PyIndexSegmentBuilder { + fn with_index_type<'a>( + mut slf: PyRefMut<'a, Self>, + index_type: &str, + ) -> PyResult> { + let normalized = index_type.to_uppercase(); + slf.index_type = Some(match normalized.as_str() { + "INVERTED" | "FTS" => IndexType::Inverted, + "VECTOR" => IndexType::Vector, + "IVF_FLAT" => IndexType::IvfFlat, + "IVF_PQ" => IndexType::IvfPq, + "IVF_SQ" => IndexType::IvfSq, + "IVF_RQ" => IndexType::IvfRq, + "IVF_HNSW_FLAT" => IndexType::IvfHnswFlat, + "IVF_HNSW_PQ" => IndexType::IvfHnswPq, + "IVF_HNSW_SQ" => IndexType::IvfHnswSq, + _ => { + return Err(PyValueError::new_err(format!( + "Unsupported index type for segment builder: {index_type}" + ))); + } + }); + Ok(slf) + } + + fn with_segments<'a>( + mut slf: PyRefMut<'a, Self>, + segments: &Bound<'_, PyAny>, + ) -> PyResult> { + let mut indices = Vec::new(); + for item in segments.try_iter()? { + indices.push(item?.extract::>()?.0); + } + slf.segments = indices; + Ok(slf) + } + + fn with_target_segment_bytes<'a>( + mut slf: PyRefMut<'a, Self>, + bytes: u64, + ) -> PyResult> { + slf.target_segment_bytes = Some(bytes); + Ok(slf) + } + + fn plan(&self, py: Python<'_>) -> PyResult>> { + let plans = rt() + .block_on(Some(py), self.builder().plan())? + .infer_error()?; + plans + .into_iter() + .map(|plan| Py::new(py, PyIndexSegmentPlan::from_inner(plan))) + .collect() + } + + fn build(&self, py: Python<'_>, plan: &Bound<'_, PyAny>) -> PyResult> { + let plan = plan.extract::>()?; + let segment = rt() + .block_on(Some(py), self.builder().build(&plan.inner))? + .infer_error()?; + Py::new(py, PyIndexSegment::from_inner(segment)) + } + + fn build_all(&self, py: Python<'_>) -> PyResult>> { + let segments = rt() + .block_on(Some(py), self.builder().build_all())? + .infer_error()?; + segments + .into_iter() + .map(|segment| Py::new(py, PyIndexSegment::from_inner(segment))) + .collect() + } +} + impl MergeInsertBuilder { fn build_stats<'a>(stats: &MergeStats, py: Python<'a>) -> PyResult> { let dict = PyDict::new(py); @@ -1594,7 +1694,7 @@ impl Dataset { /// Fetches the currently checked out version of the dataset. fn version(&self) -> PyResult { - Ok(self.ds.version_id()) + Ok(self.ds.version().version) } fn latest_version(self_: PyRef<'_, Self>) -> PyResult { @@ -1881,7 +1981,6 @@ impl Dataset { for (name, meta) in branches.iter() { let dict = PyDict::new(py); dict.set_item("parent_branch", meta.parent_branch.clone())?; - dict.set_item("branch_identifier", meta.identifier.version_mapping.clone())?; dict.set_item("parent_version", meta.parent_version)?; dict.set_item("create_at", meta.create_at)?; dict.set_item("manifest_size", meta.manifest_size)?; @@ -1916,7 +2015,6 @@ impl Dataset { for (name, meta) in ordered.into_iter() { let dict = PyDict::new(py); dict.set_item("parent_branch", meta.parent_branch.clone())?; - dict.set_item("branch_identifier", meta.identifier.version_mapping.clone())?; dict.set_item("parent_version", meta.parent_version)?; dict.set_item("create_at", meta.create_at)?; dict.set_item("manifest_size", meta.manifest_size)?; @@ -2162,6 +2260,15 @@ impl Dataset { Ok(PyLance(index_metadata)) } + fn create_index_segment_builder(&self) -> PyResult { + Ok(PyIndexSegmentBuilder { + dataset: self.ds.clone(), + index_type: None, + segments: Vec::new(), + target_segment_bytes: None, + }) + } + fn merge_existing_index_segments( &self, segments: Vec>, @@ -2180,16 +2287,16 @@ impl Dataset { &mut self, index_name: &str, column: &str, - segments: Vec>, + segments: Vec>, ) -> PyResult<()> { let mut new_self = self.ds.as_ref().clone(); + let segments = segments + .into_iter() + .map(|segment| segment.inner.clone()) + .collect(); rt().block_on( None, - new_self.commit_existing_index_segments( - index_name, - column, - segments.into_iter().map(|segment| segment.0).collect(), - ), + new_self.commit_existing_index_segments(index_name, column, segments), )? .infer_error()?; self.ds = Arc::new(new_self); @@ -3298,7 +3405,7 @@ impl Dataset { } else { Ok(Ref::Version( self.ds.manifest.branch.clone(), - Some(self.ds.version_id()), + Some(self.ds.version().version), )) } } diff --git a/python/src/indices.rs b/python/src/indices.rs index 62f3c0c64ec..a778abff758 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -11,6 +11,7 @@ use arrow_data::ArrayData; use chrono::{DateTime, Utc}; use lance::dataset::Dataset as LanceDataset; use lance::index::DatasetIndexExt; +use lance::index::{IndexSegment, IndexSegmentPlan}; use lance::index::vector::ivf::builder::write_vector_storage; use lance::index::vector::pq::build_pq_model_in_fragments; use lance::io::ObjectStore; @@ -35,7 +36,7 @@ use pyo3::{ use lance::index::DatasetIndexInternalExt; use crate::fragment::FileFragment; -use crate::utils::PyJson; +use crate::utils::{PyJson, PyLance}; use crate::{ dataset::Dataset, error::PythonErrorExt, file::object_store_from_uri_or_path_no_options, rt, }; @@ -61,6 +62,96 @@ impl PyIndexConfig { } } +#[pyclass(name = "IndexSegment", module = "lance.indices")] +#[derive(Debug, Clone)] +pub struct PyIndexSegment { + pub(crate) inner: IndexSegment, +} + +impl PyIndexSegment { + pub(crate) fn from_inner(inner: IndexSegment) -> Self { + Self { inner } + } +} + +#[pymethods] +impl PyIndexSegment { + #[getter] + fn uuid(&self) -> String { + self.inner.uuid().to_string() + } + + #[getter] + fn fragment_ids(&self) -> HashSet { + self.inner.fragment_bitmap().iter().collect() + } + + #[getter] + fn index_version(&self) -> i32 { + self.inner.index_version() + } + + fn __repr__(&self) -> String { + format!( + "IndexSegment(uuid={}, fragment_ids={:?}, index_version={})", + self.uuid(), + self.fragment_ids(), + self.index_version() + ) + } +} + +#[pyclass(name = "IndexSegmentPlan", module = "lance.indices")] +#[derive(Debug, Clone)] +pub struct PyIndexSegmentPlan { + pub(crate) inner: IndexSegmentPlan, +} + +impl PyIndexSegmentPlan { + pub(crate) fn from_inner(inner: IndexSegmentPlan) -> Self { + Self { inner } + } +} + +#[pymethods] +impl PyIndexSegmentPlan { + #[getter] + fn segment(&self) -> PyIndexSegment { + PyIndexSegment::from_inner(self.inner.segment().clone()) + } + + #[getter] + fn segments(&self) -> Vec> { + self.inner + .segments() + .iter() + .cloned() + .map(PyLance) + .collect() + } + + #[getter] + fn estimated_bytes(&self) -> u64 { + self.inner.estimated_bytes() + } + + #[getter] + fn requested_index_type(&self) -> Option { + self.inner + .requested_index_type() + .map(|index_type| index_type.to_string()) + } + + fn __repr__(&self) -> String { + format!( + "IndexSegmentPlan(segments={}, estimated_bytes={}, requested_index_type={:?})", + self.inner.segments().len(), + self.estimated_bytes(), + self.requested_index_type() + ) + } +} + #[pyclass(name = "IvfModel", module = "lance.indices")] #[derive(Debug, Clone)] pub struct PyIvfModel { @@ -449,7 +540,21 @@ async fn do_load_shuffled_vectors( base_id: None, files: Some(files), }; - ds.commit_existing_index_segments(index_name, column, vec![metadata]) + let segment = IndexSegment::new( + metadata.uuid, + metadata + .fragment_bitmap + .as_ref() + .expect("vector metadata should include fragment coverage") + .iter(), + metadata + .index_details + .as_ref() + .expect("vector metadata should include index details") + .clone(), + metadata.index_version, + ); + ds.commit_existing_index_segments(index_name, column, vec![segment]) .await .infer_error()?; @@ -643,6 +748,8 @@ pub fn register_indices(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { indices.add_wrapped(wrap_pyfunction!(load_shuffled_vectors))?; indices.add_class::()?; indices.add_class::()?; + indices.add_class::()?; + indices.add_class::()?; indices.add_class::()?; indices.add_class::()?; indices.add_wrapped(wrap_pyfunction!(get_ivf_model))?; diff --git a/rust/lance-index/benches/inverted.rs b/rust/lance-index/benches/inverted.rs index 0647cca0c14..207dab61803 100644 --- a/rust/lance-index/benches/inverted.rs +++ b/rust/lance-index/benches/inverted.rs @@ -188,6 +188,7 @@ fn bench_inverted(c: &mut Criterion) { Operator::Or, no_filter.clone(), Arc::new(NoOpMetricsCollector), + None, ) .await .unwrap(), @@ -226,6 +227,7 @@ fn bench_inverted(c: &mut Criterion) { Operator::And, no_filter.clone(), Arc::new(NoOpMetricsCollector), + None, ) .await .unwrap(), diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 4f60ebd05f5..cfec52b0692 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -8,7 +8,11 @@ use std::{ cmp::{Reverse, min}, collections::BinaryHeap, }; -use std::{collections::HashMap, ops::Range, time::Instant}; +use std::{ + collections::{HashMap, HashSet}, + ops::Range, + time::Instant, +}; use crate::metrics::NoOpMetricsCollector; use crate::prefilter::NoFilter; @@ -64,7 +68,6 @@ use super::{ }; use crate::frag_reuse::FragReuseIndex; use crate::pbold; -use crate::scalar::inverted::document_tokenizer::TextTokenizer; use crate::scalar::inverted::scorer::MemBM25Scorer; use crate::scalar::inverted::tokenizer::document_tokenizer::LanceTokenizer; use crate::scalar::{ @@ -73,7 +76,6 @@ use crate::scalar::{ }; use crate::{FtsPrewarmOptions, Index}; use crate::{prefilter::PreFilter, scalar::inverted::iter::take_fst_keys}; -use lance_tokenizer::{SimpleTokenizer, TextAnalyzer}; use std::str::FromStr; // Version 0: Arrow TokenSetFormat (legacy) @@ -446,7 +448,6 @@ impl InvertedIndex { pub fn partition_count(&self) -> usize { self.partitions.len() } - /// Returns the set of fragments which are contained in the index, but no longer in the dataset. /// /// Most other indices remove data from deleted fragments when the index updates (copy-on-write). @@ -456,11 +457,50 @@ impl InvertedIndex { &self.deleted_fragments } - // search the documents that contain the query - // return the row ids of the documents sorted by bm25 score - // ref: https://en.wikipedia.org/wiki/Okapi_BM25 - // we first calculate in-partition BM25 scores, - // then re-calculate the scores for the top k documents across all partitions + pub fn bm25_base_scorer(&self, query_tokens: &Tokens) -> MemBM25Scorer { + let scorer = IndexBM25Scorer::new(self.partitions.iter().map(|part| part.as_ref())); + let token_docs = query_tokens + .into_iter() + .map(|token| (token.to_string(), scorer.num_docs_containing_token(token))) + .collect::>(); + MemBM25Scorer::new(scorer.total_tokens(), scorer.num_docs(), token_docs) + } + + pub fn bm25_stats_for_terms(&self, terms: &[String]) -> (u64, usize, Vec) { + let scorer = IndexBM25Scorer::new(self.partitions.iter().map(|part| part.as_ref())); + let token_docs = terms + .iter() + .map(|term| scorer.num_docs_containing_token(term)) + .collect(); + (scorer.total_tokens(), scorer.num_docs(), token_docs) + } + + /// Expand fuzzy query tokens against all partitions in this segment. + pub fn expand_fuzzy_tokens(&self, tokens: &Tokens, params: &FtsSearchParams) -> Result { + let mut expanded_tokens = Vec::new(); + let mut expanded_positions = Vec::new(); + let mut seen = HashSet::new(); + for partition in &self.partitions { + let expanded = partition.expand_fuzzy(tokens, params)?; + for idx in 0..expanded.len() { + let token = expanded.get_token(idx); + if seen.insert(token.to_string()) { + expanded_tokens.push(token.to_string()); + expanded_positions.push(expanded.position(idx)); + } + } + } + Ok(Tokens::with_positions( + expanded_tokens, + expanded_positions, + tokens.token_type().clone(), + )) + } + + /// Search documents that match the query and return row ids sorted by BM25 score. + /// + /// When `base_scorer` is provided, search uses those corpus-level BM25 statistics + /// instead of deriving them from this segment alone. #[instrument(level = "debug", skip_all)] pub async fn bm25_search( &self, @@ -469,7 +509,16 @@ impl InvertedIndex { operator: Operator, prefilter: Arc, metrics: Arc, + base_scorer: Option<&MemBM25Scorer>, ) -> Result<(Vec, Vec)> { + let local_scorer; + let scorer: &dyn Scorer = if let Some(base_scorer) = base_scorer { + base_scorer + } else { + local_scorer = IndexBM25Scorer::new(self.partitions.iter().map(|part| part.as_ref())); + &local_scorer + }; + let limit = params.limit.unwrap_or(usize::MAX); if limit == 0 { return Ok((Vec::new(), Vec::new())); @@ -524,7 +573,6 @@ impl InvertedIndex { }) .collect::>(); let mut parts = stream::iter(parts).buffer_unordered(get_num_compute_intensive_cpus()); - let scorer = IndexBM25Scorer::new(self.partitions.iter().map(|part| part.as_ref())); let mut idf_cache: HashMap = HashMap::new(); while let Some(res) = parts.try_next().await? { if res.candidates.is_empty() { @@ -808,6 +856,7 @@ impl InvertedIndex { Operator::And, Arc::new(NoFilter), Arc::new(NoOpMetricsCollector), + None, ) .boxed() .await?; @@ -4004,7 +4053,7 @@ async fn tokenize_and_count( /// In order to calculate BM25 scores we need to know token counts for the entire corpus. We extract these from the /// counted input of the flat search combined with any counts recorded for the indexed portion. fn initialize_scorer( - index: &Option, + base_scorer: Option<&MemBM25Scorer>, query_tokens: &Tokens, counted_input: &RecordBatch, ) -> MemBM25Scorer { @@ -4012,14 +4061,12 @@ fn initialize_scorer( let mut num_docs = 0; let mut all_token_counts = vec![0; query_tokens.len()]; - if let Some(index) = index { - let index_bm25_scorer = IndexBM25Scorer::new(index.partitions.iter().map(|p| p.as_ref())); + if let Some(base_scorer) = base_scorer { + total_tokens += base_scorer.total_tokens; + num_docs += base_scorer.num_docs; for (token_index, token) in query_tokens.into_iter().enumerate() { - let token_nq = index_bm25_scorer.num_docs_containing_token(token); - all_token_counts[token_index] = token_nq as u64; + all_token_counts[token_index] = base_scorer.num_docs_containing_token(token) as u64; } - total_tokens += index_bm25_scorer.total_tokens(); - num_docs += index_bm25_scorer.num_docs(); } num_docs += counted_input.num_rows(); @@ -4121,15 +4168,11 @@ pub async fn flat_bm25_search_stream( input: SendableRecordBatchStream, doc_col: String, query: String, - index: &Option, + tokenizer: Box, + base_scorer: Option, target_batch_size: usize, ) -> DataFusionResult { - let mut tokenizer = match index { - Some(index) => index.tokenizer(), - None => Box::new(TextTokenizer::new( - TextAnalyzer::builder(SimpleTokenizer::default()).build(), - )), - }; + let mut tokenizer = tokenizer; let query_tokens = Arc::new(collect_query_tokens(&query, &mut tokenizer)); let input_schema = input.schema(); @@ -4157,7 +4200,7 @@ pub async fn flat_bm25_search_stream( tokenize_and_count(chunked, tokenizer, query_tokens.clone(), doc_col_idx).await?; // Phase 3 - Calculate final scores (this is fairly cheap, probably don't need to parallelize) - let scorer = initialize_scorer(index, query_tokens.as_ref(), &counted_input); + let scorer = initialize_scorer(base_scorer.as_ref(), query_tokens.as_ref(), &counted_input); let scores = flat_bm25_score(query_tokens.as_ref(), &counted_input, &scorer)?; // Finally we emit batches according to the target batch size @@ -4818,7 +4861,7 @@ mod tests { let metrics = Arc::new(NoOpMetricsCollector); let (row_ids, scores) = index - .bm25_search(tokens, params, Operator::Or, prefilter, metrics) + .bm25_search(tokens, params, Operator::Or, prefilter, metrics, None) .await .unwrap(); @@ -5183,7 +5226,7 @@ mod tests { let metrics = Arc::new(NoOpMetricsCollector); let (row_ids, scores) = index - .bm25_search(tokens, params, Operator::Or, prefilter, metrics) + .bm25_search(tokens, params, Operator::Or, prefilter, metrics, None) .await .unwrap(); @@ -5278,7 +5321,7 @@ mod tests { let metrics = Arc::new(NoOpMetricsCollector); let (row_ids, _scores) = index - .bm25_search(tokens, params, Operator::And, prefilter, metrics) + .bm25_search(tokens, params, Operator::And, prefilter, metrics, None) .await .unwrap(); diff --git a/rust/lance-index/src/scalar/inverted/scorer.rs b/rust/lance-index/src/scalar/inverted/scorer.rs index 58c0471d262..e3fb81871ef 100644 --- a/rust/lance-index/src/scalar/inverted/scorer.rs +++ b/rust/lance-index/src/scalar/inverted/scorer.rs @@ -67,6 +67,23 @@ impl MemBM25Scorer { } } +impl Scorer for MemBM25Scorer { + fn query_weight(&self, token: &str) -> f32 { + let token_docs = self.num_docs_containing_token(token); + if token_docs == 0 { + return 0.0; + } + idf(token_docs, self.num_docs) + } + + fn doc_weight(&self, freq: u32, doc_tokens: u32) -> f32 { + let freq = freq as f32; + let doc_tokens = doc_tokens as f32; + let doc_norm = K1 * (1.0 - B + B * doc_tokens / self.avg_doc_length()); + (K1 + 1.0) * freq / (freq + doc_norm) + } +} + pub struct IndexBM25Scorer<'a> { partitions: Vec<&'a InvertedPartition>, num_docs: usize, diff --git a/rust/lance/benches/fts_search.rs b/rust/lance/benches/fts_search.rs index 7ea96bf29b4..d56dec66c84 100644 --- a/rust/lance/benches/fts_search.rs +++ b/rust/lance/benches/fts_search.rs @@ -9,15 +9,30 @@ /// /// This benchmark is primarily intended for developers to use for profiling and debugging. The python /// benchmark is more comprehensive and will cover regression testing. +use std::{collections::BTreeSet, env, sync::Arc}; + +use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchIterator, StringArray}; use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; use futures::TryStreamExt; -use lance::Dataset; -use lance_index::scalar::FullTextSearchQuery; +use lance::{Dataset, dataset::WriteParams, index::DatasetIndexExt}; +use lance_index::{ + IndexType, + scalar::{FullTextSearchQuery, inverted::tokenizer::InvertedIndexParams}, +}; #[cfg(target_os = "linux")] use pprof::criterion::{Output, PProfProfiler}; -use std::env; +use tempfile::TempDir; const WIKIPEDIA_DATASET_ENV_VAR: &str = "LANCE_WIKIPEDIA_DATASET_PATH"; +const INDEX_NAME: &str = "segmented_fts"; +const PARTITION_COMPARE_INDEX_NAME: &str = "partition_shape_fts"; +const INDEXED_FRAGMENT_COUNT: usize = 12; +const UNINDEXED_FRAGMENT_COUNT: usize = 1; +const ROWS_PER_FRAGMENT: usize = 64; +const PARTITION_COMPARE_INDEXED_FRAGMENT_COUNT: usize = 64; +const PARTITION_COMPARE_UNINDEXED_FRAGMENT_COUNT: usize = 1; +const PARTITION_COMPARE_ROWS_PER_FRAGMENT: usize = 512; +const PARTITION_COMPARE_TOKEN_REPEAT: usize = 1024; /// Get the Wikipedia dataset path from environment variable. /// Panics if the environment variable is not set. @@ -30,6 +45,261 @@ fn get_wikipedia_dataset_path() -> String { }) } +struct BenchDataset { + _tmpdir: TempDir, + dataset: Dataset, +} + +fn create_fragment_batch(fragment_id: usize) -> RecordBatch { + let start = (fragment_id * ROWS_PER_FRAGMENT) as i32; + let ids = Arc::new(Int32Array::from_iter_values( + start..start + ROWS_PER_FRAGMENT as i32, + )); + let texts = Arc::new(StringArray::from_iter_values((0..ROWS_PER_FRAGMENT).map( + |row| { + let term = match (fragment_id + row) % 4 { + 0 => "alpha", + 1 => "beta", + 2 => "gamma", + _ => "delta", + }; + format!("shared {term} fragment-{fragment_id} row-{row}") + }, + ))); + RecordBatch::try_from_iter(vec![("id", ids as ArrayRef), ("text", texts as ArrayRef)]).unwrap() +} + +fn create_partition_compare_fragment_batch(fragment_id: usize) -> RecordBatch { + let start = (fragment_id * PARTITION_COMPARE_ROWS_PER_FRAGMENT) as i32; + let ids = Arc::new(Int32Array::from_iter_values( + start..start + PARTITION_COMPARE_ROWS_PER_FRAGMENT as i32, + )); + let texts = Arc::new(StringArray::from_iter_values( + (0..PARTITION_COMPARE_ROWS_PER_FRAGMENT).map(|row| { + let term = match (fragment_id + row) % 4 { + 0 => "alpha", + 1 => "beta", + 2 => "gamma", + _ => "delta", + }; + let unique = format!("fragment-{fragment_id} row-{row}"); + let repeated = std::iter::repeat_n( + format!("shared {term} {unique}"), + PARTITION_COMPARE_TOKEN_REPEAT, + ) + .collect::>() + .join(" "); + format!("{repeated} tail-{term}") + }), + )); + RecordBatch::try_from_iter(vec![("id", ids as ArrayRef), ("text", texts as ArrayRef)]).unwrap() +} + +fn grouped_fragment_ids(total_fragments: usize, segment_count: usize) -> Vec> { + let fragments_per_segment = total_fragments / segment_count; + (0..segment_count) + .map(|segment_idx| { + let start = segment_idx * fragments_per_segment; + let end = start + fragments_per_segment; + (start..end).map(|fragment_id| fragment_id as u32).collect() + }) + .collect() +} + +async fn build_segmented_fts_dataset(segment_count: usize) -> BenchDataset { + let tmpdir = TempDir::new().unwrap(); + let uri = format!("file://{}", tmpdir.path().display()); + let batches = RecordBatchIterator::new( + (0..(INDEXED_FRAGMENT_COUNT + UNINDEXED_FRAGMENT_COUNT)) + .map(|fragment_id| Ok(create_fragment_batch(fragment_id))) + .collect::>(), + create_fragment_batch(0).schema(), + ); + let mut dataset = Dataset::write( + batches, + &uri, + Some(WriteParams { + max_rows_per_file: ROWS_PER_FRAGMENT, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!( + dataset.get_fragments().len(), + INDEXED_FRAGMENT_COUNT + UNINDEXED_FRAGMENT_COUNT + ); + + let params = InvertedIndexParams::default(); + let mut staged_segments = Vec::with_capacity(segment_count); + for fragment_ids in grouped_fragment_ids(INDEXED_FRAGMENT_COUNT, segment_count) { + let segment = dataset + .create_index_builder(&["text"], IndexType::Inverted, ¶ms) + .name(INDEX_NAME.to_string()) + .fragments(fragment_ids) + .execute_uncommitted() + .await + .unwrap(); + staged_segments.push(segment); + } + let segments = dataset + .create_index_segment_builder() + .with_index_type(IndexType::Inverted) + .with_segments(staged_segments) + .build_all() + .await + .unwrap(); + dataset + .commit_existing_index_segments(INDEX_NAME, "text", segments) + .await + .unwrap(); + + BenchDataset { + _tmpdir: tmpdir, + dataset, + } +} + +fn count_partitions(segment: &lance_table::format::IndexMetadata) -> usize { + segment + .files + .as_ref() + .map(|files| { + files + .iter() + .filter_map(|file| { + file.path + .strip_prefix("part_") + .and_then(|path| path.split_once('_')) + .map(|(partition_id, _)| partition_id.to_string()) + }) + .collect::>() + .len() + }) + .unwrap_or(0) +} + +async fn build_partition_compare_dataset_with_memory_limit( + partition_count: usize, + segmented: bool, + memory_limit_mb: u64, +) -> BenchDataset { + let tmpdir = TempDir::new().unwrap(); + let uri = format!("file://{}", tmpdir.path().display()); + let batches = RecordBatchIterator::new( + (0..(PARTITION_COMPARE_INDEXED_FRAGMENT_COUNT + + PARTITION_COMPARE_UNINDEXED_FRAGMENT_COUNT)) + .map(|fragment_id| Ok(create_partition_compare_fragment_batch(fragment_id))) + .collect::>(), + create_partition_compare_fragment_batch(0).schema(), + ); + let mut dataset = Dataset::write( + batches, + &uri, + Some(WriteParams { + max_rows_per_file: PARTITION_COMPARE_ROWS_PER_FRAGMENT, + ..Default::default() + }), + ) + .await + .unwrap(); + + let fragment_groups = if segmented { + grouped_fragment_ids(PARTITION_COMPARE_INDEXED_FRAGMENT_COUNT, partition_count) + } else { + vec![(0..PARTITION_COMPARE_INDEXED_FRAGMENT_COUNT as u32).collect()] + }; + let params = InvertedIndexParams::default() + .with_position(true) + .num_workers(1) + .memory_limit_mb(memory_limit_mb); + + let mut staged_segments = Vec::with_capacity(fragment_groups.len()); + for fragment_ids in fragment_groups { + let segment = dataset + .create_index_builder(&["text"], IndexType::Inverted, ¶ms) + .name(PARTITION_COMPARE_INDEX_NAME.to_string()) + .fragments(fragment_ids) + .execute_uncommitted() + .await + .unwrap(); + staged_segments.push(segment); + } + let segments = dataset + .create_index_segment_builder() + .with_index_type(IndexType::Inverted) + .with_segments(staged_segments) + .build_all() + .await + .unwrap(); + dataset + .commit_existing_index_segments(PARTITION_COMPARE_INDEX_NAME, "text", segments) + .await + .unwrap(); + + let committed_segments = dataset + .load_indices_by_name(PARTITION_COMPARE_INDEX_NAME) + .await + .unwrap(); + if segmented { + assert_eq!(committed_segments.len(), partition_count); + for segment in &committed_segments { + assert_eq!( + count_partitions(segment), + 1, + "expected each segmented FTS segment to have exactly one partition" + ); + } + } + + BenchDataset { + _tmpdir: tmpdir, + dataset, + } +} + +async fn build_partition_compare_dataset(partition_count: usize, segmented: bool) -> BenchDataset { + if segmented { + return build_partition_compare_dataset_with_memory_limit(partition_count, true, 512).await; + } + + let mut observed = Vec::new(); + for memory_limit_mb in [ + 512, 256, 192, 160, 128, 96, 80, 64, 56, 48, 40, 36, 32, 28, 24, 20, 18, 16, 14, 12, 10, 9, + 8, 7, 6, 5, 4, 3, 2, 1, + ] { + let bench_dataset = build_partition_compare_dataset_with_memory_limit( + partition_count, + false, + memory_limit_mb, + ) + .await; + let committed_segments = bench_dataset + .dataset + .load_indices_by_name(PARTITION_COMPARE_INDEX_NAME) + .await + .unwrap(); + let actual_partition_count = if committed_segments.len() == 1 { + count_partitions(&committed_segments[0]) + } else { + 0 + }; + observed.push(( + memory_limit_mb, + committed_segments.len(), + actual_partition_count, + )); + if committed_segments.len() == 1 && actual_partition_count == partition_count { + return bench_dataset; + } + } + + panic!( + "failed to build 1 segment x {partition_count} partitions for partition-shape benchmark: {observed:?}" + ); +} + /// Benchmark full text search on Wikipedia dataset with different K values fn bench_fts_search(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -86,19 +356,117 @@ fn bench_fts_search(c: &mut Criterion) { group.finish(); } +fn bench_segmented_fts_search(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let bench_datasets = [1_usize, 2, 4, 6] + .into_iter() + .map(|segment_count| { + ( + segment_count, + rt.block_on(build_segmented_fts_dataset(segment_count)), + ) + }) + .collect::>(); + + let mut group = c.benchmark_group("fts_search_segment_count"); + for (segment_count, bench_dataset) in &bench_datasets { + group.bench_with_input( + BenchmarkId::from_parameter(segment_count), + segment_count, + |b, _| { + b.iter(|| { + rt.block_on(async { + let mut scanner = bench_dataset.dataset.scan(); + let query = FullTextSearchQuery::new("shared alpha".to_string()) + .with_column("text".to_string()) + .unwrap(); + let mut stream = scanner + .full_text_search(query) + .unwrap() + .limit(Some(20), None) + .unwrap() + .project(&["_rowid"]) + .unwrap() + .try_into_stream() + .await + .unwrap(); + + let mut num_rows = 0; + while let Some(batch) = stream.try_next().await.unwrap() { + num_rows += batch.num_rows(); + } + assert!(num_rows <= 20); + }) + }); + }, + ); + } + group.finish(); +} + +fn bench_fts_segment_vs_partition_shape(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let bench_datasets = [16_usize] + .into_iter() + .flat_map(|count| { + [ + ( + format!("{count}_segments_x_1_partition"), + rt.block_on(build_partition_compare_dataset(count, true)), + ), + ( + format!("1_segment_x_{count}_partitions"), + rt.block_on(build_partition_compare_dataset(count, false)), + ), + ] + }) + .collect::>(); + + let mut group = c.benchmark_group("fts_search_segment_vs_partition_shape"); + for (shape, bench_dataset) in &bench_datasets { + group.bench_with_input(BenchmarkId::from_parameter(shape), shape, |b, _| { + b.iter(|| { + rt.block_on(async { + let mut scanner = bench_dataset.dataset.scan(); + let query = FullTextSearchQuery::new("shared alpha".to_string()) + .with_column("text".to_string()) + .unwrap(); + let mut stream = scanner + .full_text_search(query) + .unwrap() + .limit(Some(20), None) + .unwrap() + .project(&["_rowid"]) + .unwrap() + .try_into_stream() + .await + .unwrap(); + + let mut num_rows = 0; + while let Some(batch) = stream.try_next().await.unwrap() { + num_rows += batch.num_rows(); + } + assert!(num_rows <= 20); + }) + }); + }); + } + group.finish(); +} + #[cfg(target_os = "linux")] criterion_group!( name=benches; config = Criterion::default().significance_level(0.1).sample_size(10) .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); - targets = bench_fts_search + targets = bench_fts_search, bench_segmented_fts_search, bench_fts_segment_vs_partition_shape ); #[cfg(not(target_os = "linux"))] criterion_group!( name=benches; config = Criterion::default().significance_level(0.1).sample_size(10); - targets = bench_fts_search + targets = bench_fts_search, bench_segmented_fts_search, bench_fts_segment_vs_partition_shape ); criterion_main!(benches); diff --git a/rust/lance/src/dataset/index.rs b/rust/lance/src/dataset/index.rs index 856f7361892..b3b235d8015 100644 --- a/rust/lance/src/dataset/index.rs +++ b/rust/lance/src/dataset/index.rs @@ -241,7 +241,7 @@ mod tests { .unwrap(); } - let segments = vec![ + let segments = [ IndexMetadata { uuid: first_segment_uuid, fragment_bitmap: Some(std::iter::once(target_fragments[0].id() as u32).collect()), @@ -254,6 +254,26 @@ mod tests { }, ]; + let segments = segments + .iter() + .map(|segment| { + crate::index::IndexSegment::new( + segment.uuid, + segment + .fragment_bitmap + .as_ref() + .expect("test segment metadata should have fragment coverage") + .iter(), + segment + .index_details + .as_ref() + .expect("test segment metadata should have index details") + .clone(), + segment.index_version, + ) + }) + .collect::>(); + dataset .commit_existing_index_segments("vector_idx", "vector", segments) .await diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 5f40c79b4f9..1617d2dcaff 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -81,6 +81,7 @@ use super::Dataset; use crate::dataset::row_offsets_to_row_addresses; use crate::dataset::utils::SchemaAdapter; use crate::index::DatasetIndexInternalExt; +use crate::index::scalar::inverted::{load_segment_details, load_segments}; use crate::index::vector::utils::{ default_distance_type_for, get_vector_dim, get_vector_type, validate_distance_type_for, }; @@ -3310,20 +3311,14 @@ impl Scanner { "the column must be specified in the query".to_string(), ))?; - let index_meta = self - .dataset - .load_scalar_index(IndexCriteria::default().for_column(&column).supports_fts()) + let segments = load_segments(&self.dataset, &column) .await? .ok_or(Error::invalid_input(format!( "No Inverted index found for column {}", column )))?; + let details = load_segment_details(&self.dataset, &column, &segments).await?; - let details_any = - crate::index::scalar::fetch_index_details(&self.dataset, &column, &index_meta).await?; - let details = details_any - .as_ref() - .to_msg::()?; if !details.with_position { return Err(Error::invalid_input("position is not found but required for phrase queries, try recreating the index with position" .to_string())); @@ -4890,6 +4885,13 @@ pub mod test_dataset { .iter() .map(|segment| segment.uuid) .collect::>(); + let segments = self + .dataset + .create_index_segment_builder() + .with_index_type(params.index_type()) + .with_segments(segments) + .build_all() + .await?; self.dataset .commit_existing_index_segments("idx", "vec", segments) .await?; diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 40788265172..c6d1f3c6dad 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -81,7 +81,7 @@ use crate::dataset::index::LanceIndexStoreExt; use crate::dataset::optimize::RemappedIndex; use crate::dataset::optimize::remapping::RemapResult; use crate::dataset::transaction::{Operation, Transaction, TransactionBuilder}; -pub use crate::index::api::DatasetIndexExt; +pub use crate::index::api::{DatasetIndexExt, IndexSegment, IndexSegmentPlan}; use crate::index::frag_reuse::{load_frag_reuse_index_details, open_frag_reuse_index}; use crate::index::mem_wal::open_mem_wal_index; pub use crate::index::prefilter::{FilterLoader, PreFilter}; @@ -126,6 +126,59 @@ fn validate_segment_metadata(index_name: &str, segments: &[IndexMetadata]) -> Re Ok(()) } +pub(crate) async fn build_index_metadata_from_segments( + dataset: &Dataset, + index_name: &str, + field_id: i32, + segments: Vec, +) -> Result> { + if segments.is_empty() { + return Err(Error::invalid_input( + "CreateIndex: at least one index segment is required".to_string(), + )); + } + + let mut seen_segment_ids = HashSet::with_capacity(segments.len()); + let mut covered_fragments = RoaringBitmap::new(); + for segment in &segments { + if !seen_segment_ids.insert(segment.uuid()) { + return Err(Error::invalid_input(format!( + "CreateIndex: duplicate segment uuid {} for index '{}'", + segment.uuid(), + index_name + ))); + } + if !covered_fragments.is_disjoint(segment.fragment_bitmap()) { + return Err(Error::invalid_input(format!( + "CreateIndex: overlapping fragment coverage in segment set for index '{}'", + index_name + ))); + } + covered_fragments |= segment.fragment_bitmap().clone(); + } + + let mut new_indices = Vec::with_capacity(segments.len()); + for segment in segments { + let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts(); + let index_dir = dataset.indices_dir().child(uuid.to_string()); + let files = list_index_files_with_sizes(&dataset.object_store, &index_dir).await?; + new_indices.push(IndexMetadata { + uuid, + name: index_name.to_string(), + fields: vec![field_id], + dataset_version: dataset.manifest.version, + fragment_bitmap: Some(fragment_bitmap), + index_details: Some(index_details), + index_version, + created_at: Some(chrono::Utc::now()), + base_id: None, + files: Some(files), + }); + } + + Ok(new_indices) +} + // Cache keys for different index types #[derive(Debug, Clone)] pub struct ScalarIndexCacheKey<'a> { @@ -679,6 +732,7 @@ impl IndexDescription for IndexDescriptionImpl { #[async_trait] impl DatasetIndexExt for Dataset { type IndexBuilder<'a> = CreateIndexBuilder<'a>; + type IndexSegmentBuilder<'a> = create::IndexSegmentBuilder<'a>; /// Create a builder for creating an index on columns. /// @@ -726,6 +780,10 @@ impl DatasetIndexExt for Dataset { CreateIndexBuilder::new(self, columns, index_type, params) } + fn create_index_segment_builder<'a>(&'a self) -> create::IndexSegmentBuilder<'a> { + create::IndexSegmentBuilder::new(self) + } + #[instrument(skip_all)] async fn create_index( &mut self, @@ -958,7 +1016,7 @@ impl DatasetIndexExt for Dataset { &mut self, index_name: &str, column: &str, - segments: Vec, + segments: Vec, ) -> Result<()> { let Some(field) = self.schema().field(column) else { return Err(Error::index(format!( @@ -966,20 +1024,8 @@ impl DatasetIndexExt for Dataset { ))); }; - validate_segment_metadata(index_name, &segments)?; - - let mut new_indices = Vec::with_capacity(segments.len()); - for mut segment in segments { - if segment.fields != [field.id] { - return Err(Error::invalid_input(format!( - "CreateIndex: segment {} was built for fields {:?}, expected [{}]", - segment.uuid, segment.fields, field.id - ))); - } - segment.name = index_name.to_string(); - segment.dataset_version = self.manifest.version; - new_indices.push(segment); - } + let new_indices = + build_index_metadata_from_segments(self, index_name, field.id, segments).await?; let transaction = Transaction::new( self.manifest.version, @@ -2303,6 +2349,23 @@ mod tests { } } + fn segment_from_metadata(metadata: &IndexMetadata) -> IndexSegment { + IndexSegment::new( + metadata.uuid, + metadata + .fragment_bitmap + .as_ref() + .expect("test segment metadata should have fragment coverage") + .iter(), + metadata + .index_details + .as_ref() + .expect("test segment metadata should have index details") + .clone(), + metadata.index_version, + ) + } + async fn write_fragmented_vector_dataset(uri: &str, dimension: i32) -> Dataset { let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), @@ -2458,6 +2521,13 @@ mod tests { .iter() .map(|segment| segment.uuid) .collect::>(); + let segments = dataset + .create_index_segment_builder() + .with_index_type(params.index_type()) + .with_segments(segments) + .build_all() + .await + .unwrap(); dataset .commit_existing_index_segments(index_name, column, segments) .await @@ -5881,7 +5951,7 @@ mod tests { .commit_existing_index_segments( "vector_idx", "vector", - vec![seg0.clone(), seg1.clone()], + vec![segment_from_metadata(&seg0), segment_from_metadata(&seg1)], ) .await .unwrap(); @@ -5948,11 +6018,11 @@ mod tests { "vector_idx", "vector", vec![ - base.clone(), - IndexMetadata { + segment_from_metadata(&base), + segment_from_metadata(&IndexMetadata { fragment_bitmap: Some(std::iter::once(1_u32).collect()), ..base - }, + }), ], ) .await @@ -6022,7 +6092,11 @@ mod tests { .await; let err = dataset - .commit_existing_index_segments("vector_idx", "vector", vec![seg0, seg1]) + .commit_existing_index_segments( + "vector_idx", + "vector", + vec![segment_from_metadata(&seg0), segment_from_metadata(&seg1)], + ) .await .unwrap_err(); assert!(err.to_string().contains("overlapping fragment coverage")); diff --git a/rust/lance/src/index/api.rs b/rust/lance/src/index/api.rs index 2b61cdfe12b..747a83b0f5c 100644 --- a/rust/lance/src/index/api.rs +++ b/rust/lance/src/index/api.rs @@ -7,13 +7,133 @@ use async_trait::async_trait; use datafusion::execution::SendableRecordBatchStream; use lance_index::{IndexParams, IndexType, PrewarmOptions, optimize::OptimizeOptions}; use lance_table::format::IndexMetadata; +use roaring::RoaringBitmap; +use uuid::Uuid; use crate::{Error, Result}; +/// A single physical segment of a logical index. +/// +/// Each segment is stored independently and will become one manifest entry when committed. +/// The logical index identity (name / target column / dataset version) is provided separately +/// by the commit API. +#[derive(Debug, Clone, PartialEq)] +pub struct IndexSegment { + /// Unique ID of the physical segment. + uuid: Uuid, + /// The fragments covered by this segment. + fragment_bitmap: RoaringBitmap, + /// Metadata specific to the index type. + index_details: Arc, + /// The on-disk index version for this segment. + index_version: i32, +} + +impl IndexSegment { + /// Create a fully described segment with the given UUID, fragment coverage, and index + /// metadata. + pub fn new( + uuid: Uuid, + fragment_bitmap: I, + index_details: Arc, + index_version: i32, + ) -> Self + where + I: IntoIterator, + { + Self { + uuid, + fragment_bitmap: fragment_bitmap.into_iter().collect(), + index_details, + index_version, + } + } + + /// Return the UUID of this segment. + pub fn uuid(&self) -> Uuid { + self.uuid + } + + /// Return the fragment coverage of this segment. + pub fn fragment_bitmap(&self) -> &RoaringBitmap { + &self.fragment_bitmap + } + + /// Return the serialized index details for this segment. + pub fn index_details(&self) -> &Arc { + &self.index_details + } + + /// Return the on-disk index version for this segment. + pub fn index_version(&self) -> i32 { + self.index_version + } + + /// Consume the segment and return its component parts. + pub fn into_parts(self) -> (Uuid, RoaringBitmap, Arc, i32) { + ( + self.uuid, + self.fragment_bitmap, + self.index_details, + self.index_version, + ) + } +} + +/// A plan for building one physical segment from one or more existing +/// uncommitted index segments. +#[derive(Debug, Clone, PartialEq)] +pub struct IndexSegmentPlan { + segment: IndexSegment, + segments: Vec, + estimated_bytes: u64, + requested_index_type: Option, +} + +impl IndexSegmentPlan { + /// Create a plan for one built segment. + pub fn new( + segment: IndexSegment, + segments: Vec, + estimated_bytes: u64, + requested_index_type: Option, + ) -> Self { + Self { + segment, + segments, + estimated_bytes, + requested_index_type, + } + } + + /// Return the segment metadata that should be committed after this plan is built. + pub fn segment(&self) -> &IndexSegment { + &self.segment + } + + /// Return the input segment metadata that should be combined into the segment. + pub fn segments(&self) -> &[IndexMetadata] { + &self.segments + } + + /// Return the estimated number of bytes covered by this plan. + pub fn estimated_bytes(&self) -> u64 { + self.estimated_bytes + } + + /// Return the requested logical index type, if one was supplied to the planner. + pub fn requested_index_type(&self) -> Option { + self.requested_index_type + } +} + /// Extends [`crate::Dataset`] with secondary index APIs. #[async_trait] pub trait DatasetIndexExt { type IndexBuilder<'a> + where + Self: 'a; + type IndexSegmentBuilder<'a> where Self: 'a; @@ -28,6 +148,19 @@ pub trait DatasetIndexExt { params: &'a dyn IndexParams, ) -> Self::IndexBuilder<'a>; + /// Create a builder for building physical index segments from uncommitted + /// index outputs. + /// + /// The caller supplies the uncommitted index metadata returned by + /// `execute_uncommitted()` and then declares the concrete index type with + /// `with_index_type(...)` so the builder can plan segment grouping without + /// rediscovering fragment coverage. + /// + /// This is the canonical entry point for segment-based index build. + /// After building the physical segments, publish them as a + /// logical index with [`Self::commit_existing_index_segments`]. + fn create_index_segment_builder<'a>(&'a self) -> Self::IndexSegmentBuilder<'a>; + /// Create indices on columns. /// /// Upon finish, a new dataset version is generated. @@ -130,11 +263,10 @@ pub trait DatasetIndexExt { /// Find an index with the given name and return its serialized statistics. async fn index_statistics(&self, index_name: &str) -> Result; - /// Merge one caller-defined group of existing uncommitted index segments into a - /// single segment. + /// Merge one or more existing uncommitted index segments into a single uncommitted segment. async fn merge_existing_index_segments( &self, - segments: Vec, + source_segments: Vec, ) -> Result; /// Commit one or more existing physical index segments as a logical index. @@ -142,7 +274,7 @@ pub trait DatasetIndexExt { &mut self, index_name: &str, column: &str, - segments: Vec, + segments: Vec, ) -> Result<()>; async fn read_index_partition( @@ -152,3 +284,29 @@ pub trait DatasetIndexExt { with_vector: bool, ) -> Result; } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::{IndexSegment, IndexSegmentPlan}; + use lance_index::IndexType; + use uuid::Uuid; + + #[test] + fn test_index_segment_plan_accessors() { + let uuid = Uuid::new_v4(); + let segment = IndexSegment::new(uuid, [1_u32, 3], Arc::new(prost_types::Any::default()), 7); + let plan = IndexSegmentPlan::new(segment.clone(), vec![], 128, Some(IndexType::BTree)); + + assert_eq!(segment.uuid(), uuid); + assert_eq!( + segment.fragment_bitmap().iter().collect::>(), + vec![1, 3] + ); + assert_eq!(segment.index_version(), 7); + assert_eq!(plan.segment().uuid(), uuid); + assert_eq!(plan.estimated_bytes(), 128); + assert_eq!(plan.requested_index_type(), Some(IndexType::BTree)); + } +} diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 8aed939787a..9c644323201 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -9,6 +9,8 @@ use crate::{ }, index::{ DatasetIndexExt, DatasetIndexInternalExt, + api::{IndexSegment, IndexSegmentPlan}, + build_index_metadata_from_segments, scalar::build_scalar_index, vector::{ LANCE_VECTOR_INDEX, VectorIndexParams, build_distributed_vector_index, @@ -17,7 +19,7 @@ use crate::{ vector_index_details, }, }; -use futures::future::BoxFuture; +use futures::future::{BoxFuture, try_join_all}; use lance_core::datatypes::format_field_path; use lance_index::progress::{IndexBuildProgress, NoopIndexBuildProgress}; use lance_index::{IndexParams, IndexType, scalar::CreatedIndex}; @@ -26,7 +28,11 @@ use lance_index::{ scalar::{LANCE_SCALAR_INDEX, ScalarIndexParams, inverted::tokenizer::InvertedIndexParams}, }; use lance_table::format::{IndexMetadata, list_index_files_with_sizes}; -use std::{collections::HashMap, future::IntoFuture, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + future::IntoFuture, + sync::Arc, +}; use tracing::instrument; use uuid::Uuid; @@ -473,15 +479,66 @@ impl<'a> CreateIndexBuilder<'a> { } else { vec![] }; - let transaction = TransactionBuilder::new( - new_idx.dataset_version, - Operation::CreateIndex { - new_indices: vec![new_idx], - removed_indices, - }, - ) - .transaction_properties(self.transaction_properties.clone()) - .build(); + let transaction = if uses_segment_commit_path(self.index_type, &new_idx.name, self.params) { + let field_id = *new_idx.fields.first().ok_or_else(|| { + Error::internal(format!( + "Index '{}' is missing field ids after build", + new_idx.name + )) + })?; + let segment_index_type = match self.index_type { + IndexType::Vector + | IndexType::IvfPq + | IndexType::IvfSq + | IndexType::IvfFlat + | IndexType::IvfRq + | IndexType::IvfHnswFlat + | IndexType::IvfHnswPq + | IndexType::IvfHnswSq => self + .params + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::index("Vector index type must take a VectorIndexParams".to_string()) + })? + .index_type(), + unsupported => { + return Err(Error::internal(format!( + "Segment commit path does not support index type {}", + unsupported + ))); + } + }; + let segments = self + .dataset + .create_index_segment_builder() + .with_index_type(segment_index_type) + .with_segments(vec![new_idx.clone()]) + .build_all() + .await?; + let new_indices = + build_index_metadata_from_segments(self.dataset, &new_idx.name, field_id, segments) + .await?; + TransactionBuilder::new( + new_idx.dataset_version, + Operation::CreateIndex { + new_indices, + removed_indices, + }, + ) + .transaction_properties(self.transaction_properties.clone()) + .build() + } else { + TransactionBuilder::new( + new_idx.dataset_version, + Operation::CreateIndex { + new_indices: vec![new_idx], + removed_indices, + }, + ) + .transaction_properties(self.transaction_properties.clone()) + .build() + }; self.dataset .apply_commit(transaction, &Default::default(), &Default::default()) @@ -503,6 +560,28 @@ impl<'a> CreateIndexBuilder<'a> { } } +fn uses_segment_commit_path( + index_type: IndexType, + index_name: &str, + params: &dyn IndexParams, +) -> bool { + if index_name != LANCE_VECTOR_INDEX { + return false; + } + + matches!( + index_type, + IndexType::Vector + | IndexType::IvfPq + | IndexType::IvfSq + | IndexType::IvfFlat + | IndexType::IvfRq + | IndexType::IvfHnswFlat + | IndexType::IvfHnswPq + | IndexType::IvfHnswSq + ) && params.as_any().is::() +} + impl<'a> IntoFuture for CreateIndexBuilder<'a> { type Output = Result; type IntoFuture = BoxFuture<'a, Result>; @@ -512,6 +591,159 @@ impl<'a> IntoFuture for CreateIndexBuilder<'a> { } } +/// Build physical index segments from previously-written uncommitted index outputs. +/// +/// Use [`DatasetIndexExt::create_index_segment_builder`] and then either: +/// +/// - call [`Self::with_index_type`] with the concrete segment type first, then +/// - call [`Self::plan`] and orchestrate individual segment builds externally, or +/// - call [`Self::build_all`] to build all segments on the current node. +/// +/// This builder only builds physical segments. Publishing those segments as +/// a logical index still requires [`DatasetIndexExt::commit_existing_index_segments`]. +/// Together these two APIs form the canonical segment-based index build workflow. +#[derive(Clone)] +pub struct IndexSegmentBuilder<'a> { + dataset: &'a Dataset, + index_type: Option, + segments: Vec, + target_segment_bytes: Option, +} + +impl<'a> IndexSegmentBuilder<'a> { + pub(crate) fn new(dataset: &'a Dataset) -> Self { + Self { + dataset, + index_type: None, + segments: Vec::new(), + target_segment_bytes: None, + } + } + + /// Declare the concrete index type of the staged segments. + pub fn with_index_type(mut self, index_type: IndexType) -> Self { + self.index_type = Some(index_type); + self + } + + /// Provide the segment metadata returned by `execute_uncommitted()`. + /// + /// These segments must already exist in storage and must not have been + /// published into a logical index yet. + pub fn with_segments(mut self, segments: Vec) -> Self { + self.segments = segments; + self + } + + /// Set the target size, in bytes, for merged physical segments. + /// + /// When set, input segments will be grouped into larger physical segments + /// up to approximately this size. When unset, each input segment becomes + /// one physical segment. + pub fn with_target_segment_bytes(mut self, bytes: u64) -> Self { + self.target_segment_bytes = Some(bytes); + self + } + + /// Plan how input segments should be grouped into physical segments. + pub async fn plan(&self) -> Result> { + if self.segments.is_empty() { + return Err(Error::invalid_input( + "IndexSegmentBuilder requires at least one segment; \ + call with_segments(...) with execute_uncommitted() outputs" + .to_string(), + )); + } + let index_type = self.index_type.ok_or_else(|| { + Error::invalid_input( + "IndexSegmentBuilder requires an explicit index type; call with_index_type(...)" + .to_string(), + ) + })?; + let mut seen_segment_ids = HashSet::with_capacity(self.segments.len()); + for segment in &self.segments { + if !seen_segment_ids.insert(segment.uuid) { + return Err(Error::invalid_input(format!( + "IndexSegmentBuilder received duplicate segment uuid {}", + segment.uuid + ))); + } + } + + match index_type { + IndexType::Inverted => crate::index::scalar::inverted::plan_segments( + &self.segments, + self.target_segment_bytes, + ), + IndexType::Vector => { + crate::index::vector::ivf::plan_segments( + &self.segments, + Some(index_type), + self.target_segment_bytes, + ) + .await + } + IndexType::IvfFlat + | IndexType::IvfPq + | IndexType::IvfSq + | IndexType::IvfRq + | IndexType::IvfHnswFlat + | IndexType::IvfHnswPq + | IndexType::IvfHnswSq => { + crate::index::vector::ivf::plan_segments( + &self.segments, + Some(index_type), + self.target_segment_bytes, + ) + .await + } + unsupported => Err(Error::invalid_input(format!( + "IndexSegmentBuilder does not support planning segments for index type {}", + unsupported + ))), + } + } + + /// Build one segment from a previously-generated plan. + pub async fn build(&self, plan: &IndexSegmentPlan) -> Result { + match plan.requested_index_type().ok_or_else(|| { + Error::invalid_input( + "IndexSegmentBuilder requires planned segments to declare an index type" + .to_string(), + ) + })? { + IndexType::Inverted => { + crate::index::scalar::inverted::build_segment(self.dataset, plan).await + } + IndexType::Vector + | IndexType::IvfFlat + | IndexType::IvfPq + | IndexType::IvfSq + | IndexType::IvfRq + | IndexType::IvfHnswFlat + | IndexType::IvfHnswPq + | IndexType::IvfHnswSq => { + crate::index::vector::ivf::build_segment( + self.dataset.object_store(), + &self.dataset.indices_dir(), + plan, + ) + .await + } + unsupported => Err(Error::invalid_input(format!( + "IndexSegmentBuilder does not support building segments for index type {}", + unsupported + ))), + } + } + + /// Plan and build all segments from the provided inputs. + pub async fn build_all(&self) -> Result> { + let plans = self.plan().await?; + try_join_all(plans.iter().map(|plan| self.build(plan))).await + } +} + #[cfg(test)] mod tests { use super::*; @@ -520,9 +752,8 @@ mod tests { use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount}; use arrow::datatypes::{Float32Type, Int32Type}; use arrow_array::cast::AsArray; - use arrow_array::{ - FixedSizeListArray, Int32Array, RecordBatch, RecordBatchIterator, StringArray, - }; + use arrow_array::{FixedSizeListArray, RecordBatchIterator}; + use arrow_array::{Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; use lance_arrow::FixedSizeListArrayExt; use lance_core::utils::tempfile::TempStrDir; @@ -1189,8 +1420,28 @@ mod tests { input_segments.push(segment); } + let segments = dataset + .create_index_segment_builder() + .with_index_type(params.index_type()) + .with_segments(input_segments.clone()) + .build_all() + .await + .unwrap(); + assert_eq!(segments.len(), fragments.len()); + let mut built_segment_ids = segments + .iter() + .map(|segment| segment.uuid()) + .collect::>(); + built_segment_ids.sort(); + let mut input_segment_ids = input_segments + .iter() + .map(|segment| segment.uuid) + .collect::>(); + input_segment_ids.sort(); + assert_eq!(built_segment_ids, input_segment_ids); + dataset - .commit_existing_index_segments("vector_idx", "vector", input_segments) + .commit_existing_index_segments("vector_idx", "vector", segments) .await .unwrap(); @@ -1220,7 +1471,7 @@ mod tests { } #[tokio::test] - async fn test_merge_existing_index_segments_vector_commits_single_logical_index() { + async fn test_index_segment_builder_vector_commits_multi_segment_logical_index() { let tmpdir = TempStrDir::default(); let dataset_uri = format!("file://{}", tmpdir.as_str()); @@ -1265,18 +1516,22 @@ mod tests { input_segments.push(segment); } - let segment = dataset - .merge_existing_index_segments(input_segments) + let segments = dataset + .create_index_segment_builder() + .with_index_type(params.index_type()) + .with_segments(input_segments) + .build_all() .await .unwrap(); + assert_eq!(segments.len(), 2); dataset - .commit_existing_index_segments("vector_idx", "vector", vec![segment]) + .commit_existing_index_segments("vector_idx", "vector", segments) .await .unwrap(); let indices = dataset.load_indices_by_name("vector_idx").await.unwrap(); - assert_eq!(indices.len(), 1); + assert_eq!(indices.len(), 2); let mut committed_fragment_sets = indices .iter() .map(|metadata| { @@ -1289,7 +1544,7 @@ mod tests { }) .collect::>(); committed_fragment_sets.sort(); - assert_eq!(committed_fragment_sets, vec![vec![0, 1]]); + assert_eq!(committed_fragment_sets, vec![vec![0], vec![1]]); let query_batch = dataset .scan() @@ -1314,7 +1569,7 @@ mod tests { } #[tokio::test] - async fn test_merge_existing_index_segments_accepts_python_round_tripped_metadata() { + async fn test_index_segment_builder_vector_segments_without_index_details() { let tmpdir = TempStrDir::default(); let dataset_uri = format!("file://{}", tmpdir.as_str()); @@ -1360,15 +1615,203 @@ mod tests { input_segments.push(segment); } - let merged_segment = dataset - .merge_existing_index_segments(input_segments) + let segments = dataset + .create_index_segment_builder() + .with_index_type(params.index_type()) + .with_segments(input_segments) + .build_all() + .await + .unwrap(); + assert_eq!(segments.len(), 2); + } + + #[tokio::test] + async fn test_index_segment_builder_fts_commits_multi_segment_logical_index() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let batch1 = create_text_batch(0, 10); + let batch2 = create_text_batch(10, 20); + let batch3 = create_text_batch(20, 30); + + let batches = RecordBatchIterator::new( + vec![Ok(batch1), Ok(batch2), Ok(batch3)], + create_text_batch(0, 1).schema(), + ); + let mut dataset = Dataset::write( + batches, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 10, + max_rows_per_group: 5, + mode: WriteMode::Overwrite, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = InvertedIndexParams::default(); + let mut input_segments = Vec::new(); + for fragment in dataset.get_fragments() { + let segment = + CreateIndexBuilder::new(&mut dataset, &["text"], IndexType::Inverted, ¶ms) + .name("text_idx".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(); + input_segments.push(segment); + } + + let segments = dataset + .create_index_segment_builder() + .with_index_type(IndexType::Inverted) + .with_segments(input_segments.clone()) + .build_all() + .await + .unwrap(); + assert_eq!(segments.len(), input_segments.len()); + + for segment in &segments { + let metadata_path = dataset + .indices_dir() + .child(segment.uuid().to_string()) + .child(lance_index::scalar::inverted::METADATA_FILE); + assert!(dataset.object_store().exists(&metadata_path).await.unwrap()); + } + + dataset + .commit_existing_index_segments("text_idx", "text", segments) .await .unwrap(); + + let indices = dataset.load_indices_by_name("text_idx").await.unwrap(); + assert_eq!(indices.len(), input_segments.len()); + } + + #[tokio::test] + async fn test_index_segment_builder_rejects_duplicate_segment_uuids() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let batches = RecordBatchIterator::new( + vec![Ok(create_text_batch(0, 10))], + create_text_batch(0, 1).schema(), + ); + let mut dataset = Dataset::write( + batches, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 10, + mode: WriteMode::Overwrite, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = InvertedIndexParams::default(); + let segment = + CreateIndexBuilder::new(&mut dataset, &["text"], IndexType::Inverted, ¶ms) + .name("text_idx".to_string()) + .fragments(vec![0]) + .execute_uncommitted() + .await + .unwrap(); + + let err = dataset + .create_index_segment_builder() + .with_index_type(IndexType::Inverted) + .with_segments(vec![segment.clone(), segment]) + .build_all() + .await + .unwrap_err(); assert!( - merged_segment - .fragment_bitmap - .as_ref() - .is_some_and(|bitmap| bitmap.iter().collect::>() == vec![0, 1]) + err.to_string().contains("duplicate segment uuid"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_index_segment_builder_requires_explicit_index_type() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let batches = RecordBatchIterator::new( + vec![Ok(create_text_batch(0, 10))], + create_text_batch(0, 1).schema(), + ); + let mut dataset = Dataset::write( + batches, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 10, + mode: WriteMode::Overwrite, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = InvertedIndexParams::default(); + let segment = + CreateIndexBuilder::new(&mut dataset, &["text"], IndexType::Inverted, ¶ms) + .name("text_idx".to_string()) + .fragments(vec![0]) + .execute_uncommitted() + .await + .unwrap(); + + let err = dataset + .create_index_segment_builder() + .with_segments(vec![segment]) + .plan() + .await + .unwrap_err(); + assert!( + err.to_string().contains("requires an explicit index type"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_index_segment_builder_requires_requested_index_type() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let batches = RecordBatchIterator::new( + vec![Ok(create_text_batch(0, 10))], + create_text_batch(0, 1).schema(), + ); + let dataset = Dataset::write( + batches, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 10, + mode: WriteMode::Overwrite, + ..Default::default() + }), + ) + .await + .unwrap(); + + let segment = IndexSegment::new( + Uuid::new_v4(), + [0_u32], + Arc::new(prost_types::Any::default()), + 0, + ); + let plan = IndexSegmentPlan::new(segment, Vec::new(), 0, None); + let err = dataset + .create_index_segment_builder() + .with_index_type(IndexType::Inverted) + .build(&plan) + .await + .unwrap_err(); + assert!( + err.to_string().contains("declare an index type"), + "unexpected error: {err}" ); } @@ -1406,17 +1849,24 @@ mod tests { HnswBuildParams::default(), ); - let segment = - CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) - .name("vector_idx".to_string()) - .index_uuid(uuid.to_string()) - .execute_uncommitted() - .await - .unwrap(); - assert_eq!(segment.uuid, uuid); + CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) + .name("vector_idx".to_string()) + .index_uuid(uuid.to_string()) + .execute_uncommitted() + .await + .unwrap(); dataset - .commit_existing_index_segments("vector_idx", "vector", vec![segment]) + .commit_existing_index_segments( + "vector_idx", + "vector", + vec![IndexSegment::new( + uuid, + dataset.fragment_bitmap.as_ref().clone(), + Arc::new(vector_index_details()), + IndexType::IvfHnswFlat.version(), + )], + ) .await .unwrap(); diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 44739454bec..f9e17af88bf 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -4,6 +4,8 @@ //! Utilities for integrating scalar indices with datasets //! +pub(crate) mod inverted; + use std::sync::{Arc, LazyLock}; use crate::index::DatasetIndexExt; diff --git a/rust/lance/src/index/scalar/inverted.rs b/rust/lance/src/index/scalar/inverted.rs new file mode 100644 index 00000000000..c9094f6f015 --- /dev/null +++ b/rust/lance/src/index/scalar/inverted.rs @@ -0,0 +1,201 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +#![allow(clippy::redundant_pub_crate)] + +use std::sync::Arc; + +use lance_index::pbold::InvertedIndexDetails; +use lance_index::{IndexType, scalar::lance_format::LanceIndexStore}; +use lance_table::format::IndexMetadata; +use prost::Message; + +use crate::{ + Dataset, Error, Result, + dataset::index::LanceIndexStoreExt, + index::{ + DatasetIndexExt, + api::{IndexSegment, IndexSegmentPlan}, + scalar::fetch_index_details, + }, +}; + +/// Plan physical segments for staged inverted-index outputs. +/// +/// Each staged inverted root remains its own physical segment for now. +pub(crate) fn plan_segments( + segments: &[IndexMetadata], + target_segment_bytes: Option, +) -> Result> { + if let Some(0) = target_segment_bytes { + return Err(Error::invalid_input( + "target_segment_bytes must be greater than zero".to_string(), + )); + } + if target_segment_bytes.is_some() && segments.len() > 1 { + // TODO: Support merging multiple staged inverted roots into one segment. + return Err(Error::invalid_input( + "Inverted segment builder does not yet support merging multiple source segments" + .to_string(), + )); + } + + segments + .iter() + .map(|segment| { + let fragment_bitmap = segment.fragment_bitmap.as_ref().ok_or_else(|| { + Error::index(format!( + "Segment '{}' is missing fragment coverage", + segment.uuid + )) + })?; + let index_details = segment.index_details.as_ref().ok_or_else(|| { + Error::index(format!( + "Segment '{}' is missing index details", + segment.uuid + )) + })?; + let built_segment = IndexSegment::new( + segment.uuid, + fragment_bitmap.iter(), + index_details.clone(), + segment.index_version, + ); + let estimated_bytes = segment + .files + .as_ref() + .map(|files| files.iter().map(|file| file.size_bytes).sum()) + .unwrap_or(0); + Ok(IndexSegmentPlan::new( + built_segment, + vec![segment.clone()], + estimated_bytes, + Some(IndexType::Inverted), + )) + }) + .collect() +} + +/// Finalize one staged inverted root into a commit-ready physical segment. +pub(crate) async fn build_segment( + dataset: &Dataset, + segment_plan: &IndexSegmentPlan, +) -> Result { + let built_segment = segment_plan.segment().clone(); + let source_segments = segment_plan.segments(); + if source_segments.len() != 1 { + // TODO: Support building one segment from multiple staged inverted roots. + return Err(Error::invalid_input( + "Inverted segment builder does not yet support merging multiple source segments" + .to_string(), + )); + } + let source_segment = &source_segments[0]; + if source_segment.uuid != built_segment.uuid() { + return Err(Error::invalid_input( + "Inverted segment builder requires the built segment UUID to match the staged source UUID" + .to_string(), + )); + } + + let index_dir = dataset.indices_dir().child(source_segment.uuid.to_string()); + let metadata_path = index_dir.child(lance_index::scalar::inverted::METADATA_FILE); + if dataset.object_store().exists(&metadata_path).await? { + return Ok(built_segment); + } + + let store = Arc::new(LanceIndexStore::from_dataset_for_new( + dataset, + &source_segment.uuid.to_string(), + )?); + lance_index::scalar::inverted::builder::merge_index_files( + dataset.object_store(), + &index_dir, + store, + lance_index::progress::noop_progress(), + ) + .await?; + Ok(built_segment) +} + +/// Load all committed inverted-index segments that belong to the same named index. +pub(crate) async fn load_segments( + dataset: &Dataset, + column: &str, +) -> Result>> { + let Some(index_meta) = dataset + .load_scalar_index( + lance_index::IndexCriteria::default() + .for_column(column) + .supports_fts(), + ) + .await? + else { + return Ok(None); + }; + + let indices = dataset.load_indices_by_name(&index_meta.name).await?; + if indices.is_empty() { + return Ok(None); + } + + let expected_fields = indices[0].fields.clone(); + for meta in &indices { + if meta.fields != expected_fields { + return Err(Error::invalid_input(format!( + "FTS index {} has inconsistent fields across segments", + index_meta.name + ))); + } + } + + Ok(Some(indices)) +} + +/// Load and validate the shared inverted-index details across committed segments. +pub(crate) async fn load_segment_details( + dataset: &Dataset, + column: &str, + segments: &[IndexMetadata], +) -> Result { + let mut expected_details: Option = None; + for meta in segments { + let details_any = fetch_index_details(dataset, column, meta).await?; + let details = + InvertedIndexDetails::decode(details_any.value.as_slice()).map_err(|err| { + Error::io(format!( + "failed to decode InvertedIndexDetails payload: {err}" + )) + })?; + match &expected_details { + Some(expected) if expected != &details => { + return Err(Error::invalid_input(format!( + "FTS index {} has inconsistent inverted index details across segments", + meta.name + ))); + } + Some(_) => {} + None => expected_details = Some(details), + } + } + expected_details.ok_or_else(|| { + Error::invalid_input(format!( + "FTS index for column {} requires at least one segment", + column + )) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn decode_legacy_inverted_details_type_url() { + let mut details_any = prost_types::Any::from_msg(&InvertedIndexDetails::default()).unwrap(); + details_any.type_url = "/lance.index.pb.InvertedIndexDetails".to_string(); + + let decoded = InvertedIndexDetails::decode(details_any.value.as_slice()).unwrap(); + assert_eq!(decoded, InvertedIndexDetails::default()); + } +} diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index a38cf3a57de..eb3910aa5b7 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -17,7 +17,13 @@ use crate::index::DatasetIndexInternalExt; use crate::index::vector::utils::{get_vector_dim, get_vector_type}; use crate::{ dataset::Dataset, - index::{INDEX_FILE_NAME, pb, prefilter::PreFilter, vector::ivf::io::write_pq_partitions}, + index::{ + INDEX_FILE_NAME, + api::{IndexSegment, IndexSegmentPlan}, + pb, + prefilter::PreFilter, + vector::ivf::io::write_pq_partitions, + }, }; use crate::{dataset::builder::DatasetBuilder, index::vector::IndexFileVersion}; use arrow::datatypes::UInt8Type; @@ -101,7 +107,11 @@ use prost::Message; use roaring::RoaringBitmap; use serde::Serialize; use serde_json::json; -use std::{any::Any, collections::HashMap, sync::Arc}; +use std::{ + any::Any, + collections::{HashMap, HashSet}, + sync::Arc, +}; use tokio::sync::mpsc; use tracing::instrument; use uuid::Uuid; @@ -2018,6 +2028,128 @@ async fn write_ivf_hnsw_file( Ok(()) } +pub(crate) async fn plan_segments( + segments: &[TableIndexMetadata], + requested_index_type: Option, + target_segment_bytes: Option, +) -> Result> { + if let Some(index_type) = requested_index_type + && !matches!( + index_type, + IndexType::IvfFlat + | IndexType::IvfPq + | IndexType::IvfSq + | IndexType::IvfRq + | IndexType::IvfHnswFlat + | IndexType::IvfHnswPq + | IndexType::IvfHnswSq + | IndexType::Vector + ) + { + return Err(Error::invalid_input(format!( + "Unsupported distributed vector segment build type: {}", + index_type + ))); + } + + if let Some(0) = target_segment_bytes { + return Err(Error::invalid_input( + "target_segment_bytes must be greater than zero".to_string(), + )); + } + + if segments.is_empty() { + return Err(Error::index("No segment metadata was provided".to_string())); + } + + let mut sorted_segments = segments.to_vec(); + sorted_segments.sort_by_key(|index| index.uuid); + let mut expected_segment_ids = HashSet::with_capacity(sorted_segments.len()); + for segment in &sorted_segments { + if !expected_segment_ids.insert(segment.uuid) { + return Err(Error::index(format!( + "Distributed vector segment '{}' was provided more than once", + segment.uuid + ))); + } + } + + let mut covered_fragments = RoaringBitmap::new(); + for segment in &sorted_segments { + let fragment_bitmap = segment.fragment_bitmap.as_ref().ok_or_else(|| { + Error::index(format!( + "Segment '{}' is missing fragment coverage", + segment.uuid + )) + })?; + if covered_fragments.intersection_len(fragment_bitmap) > 0 { + return Err(Error::index( + "Distributed vector shards have overlapping fragment coverage".to_string(), + )); + } + covered_fragments |= fragment_bitmap.clone(); + } + + if target_segment_bytes.is_none() { + return sorted_segments + .into_iter() + .map(|segment| build_segment_plan(vec![segment], requested_index_type)) + .collect(); + } + + let target_segment_bytes = target_segment_bytes.unwrap(); + let mut plans = Vec::new(); + let mut current_group = Vec::new(); + let mut current_bytes = 0_u64; + + for segment in sorted_segments { + let source_bytes = estimate_source_index_bytes(&segment); + if !current_group.is_empty() + && current_bytes.saturating_add(source_bytes) > target_segment_bytes + { + plans.push(build_segment_plan( + std::mem::take(&mut current_group), + requested_index_type, + )?); + current_bytes = 0; + } + current_bytes = current_bytes.saturating_add(source_bytes); + current_group.push(segment); + } + + if !current_group.is_empty() { + plans.push(build_segment_plan(current_group, requested_index_type)?); + } + + Ok(plans) +} + +pub(crate) async fn build_segment( + object_store: &ObjectStore, + indices_dir: &Path, + segment_plan: &IndexSegmentPlan, +) -> Result { + let built_segment = segment_plan.segment().clone(); + let segments = segment_plan.segments(); + + if segments.len() == 1 && segments[0].uuid == built_segment.uuid() { + return Ok(built_segment); + } + + let final_dir = indices_dir.child(built_segment.uuid().to_string()); + merge_segments_to_dir( + object_store, + indices_dir, + &final_dir, + segment_plan.segments(), + segment_plan.requested_index_type(), + lance_index::progress::noop_progress(), + ) + .await?; + + Ok(built_segment) +} + /// Merge one caller-defined group of source segments into a single segment. pub(crate) async fn merge_segments( object_store: &ObjectStore, @@ -2063,7 +2195,15 @@ pub(crate) async fn merge_segments_with_progress( let index_version = infer_source_index_version(&segments)?; let segment_uuid = Uuid::new_v4(); let final_dir = indices_dir.child(segment_uuid.to_string()); - merge_segments_to_dir(object_store, indices_dir, &final_dir, &segments, progress).await?; + merge_segments_to_dir( + object_store, + indices_dir, + &final_dir, + &segments, + None, + progress, + ) + .await?; let files = list_index_files_with_sizes(object_store, &final_dir).await?; merged_segment = TableIndexMetadata { @@ -2089,6 +2229,7 @@ async fn merge_segments_to_dir( indices_dir: &Path, final_dir: &Path, segments: &[TableIndexMetadata], + _requested_index_type: Option, progress: Arc, ) -> Result<()> { reset_final_segment_dir(object_store, final_dir).await?; @@ -2134,6 +2275,52 @@ async fn merge_segments_to_dir( Ok(()) } +fn build_segment_plan( + group: Vec, + requested_index_type: Option, +) -> Result { + debug_assert!(!group.is_empty()); + let first = &group[0]; + let mut fragment_bitmap = RoaringBitmap::new(); + let mut estimated_bytes = 0_u64; + let mut segments = Vec::with_capacity(group.len()); + + for segment in &group { + let source_fragment_bitmap = segment.fragment_bitmap.as_ref().ok_or_else(|| { + Error::index(format!( + "Segment '{}' is missing fragment coverage", + segment.uuid + )) + })?; + fragment_bitmap |= source_fragment_bitmap.clone(); + estimated_bytes = estimated_bytes.saturating_add(estimate_source_index_bytes(segment)); + segments.push(segment.clone()); + } + + let segment_uuid = if group.len() == 1 { + first.uuid + } else { + Uuid::new_v4() + }; + let index_version = match requested_index_type { + Some(index_type) => index_type.version(), + None => infer_source_index_version(&group)?, + }; + let segment = IndexSegment::new( + segment_uuid, + fragment_bitmap, + Arc::new(crate::index::vector_index_details()), + index_version, + ); + + Ok(IndexSegmentPlan::new( + segment, + segments, + estimated_bytes, + requested_index_type, + )) +} + fn infer_source_index_version(group: &[TableIndexMetadata]) -> Result { debug_assert!(!group.is_empty()); let first = group[0].index_version; @@ -2145,6 +2332,14 @@ fn infer_source_index_version(group: &[TableIndexMetadata]) -> Result { Ok(first) } +fn estimate_source_index_bytes(index_metadata: &TableIndexMetadata) -> u64 { + index_metadata + .files + .as_ref() + .map(|files| files.iter().map(|file| file.size_bytes).sum()) + .unwrap_or(0) +} + /// Best-effort reset of one target directory before rewriting it. async fn reset_final_segment_dir(object_store: &ObjectStore, final_dir: &Path) -> Result<()> { match object_store.remove_dir_all(final_dir.clone()).await { diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 7d38aa312c5..1ca573bfe58 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -2170,7 +2170,8 @@ mod tests { let segments = build_segments_for_fragment_groups(dataset, fragment_groups, ¶ms, index_name).await; - let committed_segments = build_distributed_segments(dataset, segments, index_name).await; + let committed_segments = + build_distributed_segments(dataset, segments, params.index_type(), index_name).await; assert!(!committed_segments.is_empty()); } @@ -2251,8 +2252,16 @@ mod tests { async fn build_distributed_segments( dataset: &mut Dataset, segments: Vec, + index_type: IndexType, index_name: &str, - ) -> Vec { + ) -> Vec { + let segments = dataset + .create_index_segment_builder() + .with_index_type(index_type) + .with_segments(segments) + .build_all() + .await + .unwrap(); dataset .commit_existing_index_segments(index_name, "vector", segments.clone()) .await @@ -2468,12 +2477,13 @@ mod tests { INDEX_NAME, ) .await; - let segments = build_distributed_segments(&mut ds_split, segments, INDEX_NAME).await; + let segments = + build_distributed_segments(&mut ds_split, segments, index_type, INDEX_NAME).await; assert_eq!(segments.len(), expected_segment_count); for segment in &segments { let segment_index = ds_split .indices_dir() - .child(segment.uuid.to_string()) + .child(segment.uuid().to_string()) .child(crate::index::INDEX_FILE_NAME); assert!( ds_split @@ -2654,18 +2664,12 @@ mod tests { .await .unwrap(); let grouped_segments = - build_distributed_segments(&mut ds_split, grouped_segments, INDEX_NAME).await; + build_distributed_segments(&mut ds_split, grouped_segments, index_type, INDEX_NAME) + .await; assert_eq!(grouped_segments.len(), expected_fragment_coverage.len()); let mut actual_fragment_coverage = grouped_segments .iter() - .map(|segment| { - segment - .fragment_bitmap - .as_ref() - .unwrap() - .iter() - .collect::>() - }) + .map(|segment| segment.fragment_bitmap().iter().collect::>()) .collect::>(); actual_fragment_coverage.sort(); assert_eq!( @@ -2794,6 +2798,14 @@ mod tests { segments.push(segment); } + let segments = dataset + .create_index_segment_builder() + .with_index_type(IndexType::IvfHnswFlat) + .with_segments(segments) + .build_all() + .await + .unwrap(); + dataset .commit_existing_index_segments("vector_idx", "vector", segments) .await @@ -2863,8 +2875,15 @@ mod tests { ) .await .unwrap(); + let merged_segment = dataset + .create_index_segment_builder() + .with_index_type(params.index_type()) + .with_segments(vec![merged_segment]) + .build_all() + .await + .unwrap(); dataset - .commit_existing_index_segments(INDEX_NAME, "vector", vec![merged_segment]) + .commit_existing_index_segments(INDEX_NAME, "vector", merged_segment) .await .unwrap(); diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 1cf8eddc3ec..411a0f2ba57 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use arrow::array::{AsArray, BooleanBuilder}; @@ -17,18 +17,23 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning}; use datafusion_physical_plan::metrics::{BaselineMetrics, Count}; +use futures::future::try_join_all; use futures::stream::{self}; -use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use itertools::Itertools; -use lance_core::{ROW_ID, utils::tracing::StreamTracingExt}; +use lance_core::{ + Error, ROW_ID, Result, + utils::{tokio::get_num_compute_intensive_cpus, tracing::StreamTracingExt}, +}; use lance_datafusion::utils::{ExecutionPlanMetricsSetExt, MetricsExt, PARTITIONS_SEARCHED_METRIC}; +use lance_table::format::IndexMetadata; use super::PreFilterSource; use super::utils::{IndexMetrics, InstrumentedRecordBatchStreamAdapter, build_prefilter}; -use crate::index::DatasetIndexExt; +use crate::index::scalar::inverted::{load_segment_details, load_segments}; use crate::{Dataset, index::DatasetIndexInternalExt}; -use lance_index::IndexCriteria; use lance_index::metrics::MetricsCollector; +use lance_index::scalar::inverted::builder::ScoredDoc; use lance_index::scalar::inverted::builder::document_input; use lance_index::scalar::inverted::document_tokenizer::{DocType, JsonTokenizer, LanceTokenizer}; use lance_index::scalar::inverted::query::{ @@ -37,12 +42,178 @@ use lance_index::scalar::inverted::query::{ }; use lance_index::scalar::inverted::tokenizer::document_tokenizer::TextTokenizer; use lance_index::scalar::inverted::{ - FTS_SCHEMA, InvertedIndex, SCORE_COL, flat_bm25_search_stream, + FTS_SCHEMA, InvertedIndex, MemBM25Scorer, SCORE_COL, flat_bm25_search_stream, }; use lance_index::{prefilter::PreFilter, scalar::inverted::query::BooleanQuery}; use lance_tokenizer::{SimpleTokenizer, TextAnalyzer}; use tracing::instrument; +/// Open one FTS segment as an [`InvertedIndex`]. +async fn open_fts_segment( + dataset: &Dataset, + column: &str, + segment: &IndexMetadata, + metrics: &IndexMetrics, +) -> Result> { + let uuid = segment.uuid.to_string(); + let index = dataset.open_generic_index(column, &uuid, metrics).await?; + let inverted = index + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::invalid_input(format!( + "Index for column {} and segment {} is not an inverted index", + column, uuid + )) + })?; + Ok(Arc::new(inverted.clone())) +} + +/// Open all committed FTS segments for a column. +/// +/// Exact multi-segment BM25 still needs every segment's local corpus statistics, so the +/// current correctness-first path opens each committed segment before scoring. +async fn open_fts_segments( + dataset: &Dataset, + column: &str, + segments: &[IndexMetadata], + metrics: &IndexMetrics, +) -> Result>> { + try_join_all( + segments + .iter() + .map(|segment| open_fts_segment(dataset, column, segment, metrics)), + ) + .await +} + +/// Collect the unique terms needed to build a shared BM25 scorer. +/// +/// The scorer only needs corpus-level document frequencies, so we keep a deduplicated +/// term list here instead of constructing a full `Tokens` object with positions. +fn scorer_terms( + indices: &[Arc], + query_tokens: &Tokens, + params: &FtsSearchParams, +) -> Result> { + let mut terms = Vec::new(); + let mut seen = HashSet::new(); + + if !matches!(params.fuzziness, Some(n) if n != 0) { + for token in query_tokens { + if seen.insert(token.to_string()) { + terms.push(token.to_string()); + } + } + return Ok(terms); + } + + for index in indices { + let expanded = index.expand_fuzzy_tokens(query_tokens, params)?; + for idx in 0..expanded.len() { + let token = expanded.get_token(idx); + if seen.insert(token.to_string()) { + terms.push(token.to_string()); + } + } + } + Ok(terms) +} + +/// Build a shared BM25 scorer for a set of committed FTS segments. +fn build_global_bm25_scorer( + indices: &[Arc], + query_tokens: &Tokens, + params: &FtsSearchParams, +) -> Result { + let terms = scorer_terms(indices, query_tokens, params)?; + let first_index = indices.first().ok_or_else(|| { + Error::invalid_input("FTS index requires at least one segment".to_string()) + })?; + let (mut total_tokens, mut num_docs, first_token_docs) = + first_index.bm25_stats_for_terms(&terms); + let mut token_docs = HashMap::with_capacity(terms.len()); + for (term, count) in terms.iter().cloned().zip(first_token_docs.into_iter()) { + token_docs.insert(term, count); + } + + for index in indices.iter().skip(1) { + let (segment_total_tokens, segment_num_docs, segment_token_docs) = + index.bm25_stats_for_terms(&terms); + total_tokens += segment_total_tokens; + num_docs += segment_num_docs; + for (term, count) in terms.iter().zip(segment_token_docs.into_iter()) { + *token_docs + .get_mut(term) + .expect("global scorer terms should already be initialized") += count; + } + } + + Ok(MemBM25Scorer::new(total_tokens, num_docs, token_docs)) +} + +async fn search_segments( + indices: &[Arc], + tokens: Arc, + params: Arc, + operator: lance_index::scalar::inverted::query::Operator, + pre_filter: Arc, + metrics: Arc, + base_scorer: Arc, +) -> Result<(Vec, Vec)> { + let limit = params.limit.unwrap_or(usize::MAX); + let mut candidates = std::collections::BinaryHeap::new(); + let searches = indices + .iter() + .map(|index| { + let index = Arc::clone(index); + let tokens = tokens.clone(); + let params = params.clone(); + let pre_filter = pre_filter.clone(); + let metrics = metrics.clone(); + let base_scorer = base_scorer.clone(); + async move { + index + .bm25_search( + tokens, + params, + operator, + pre_filter, + metrics, + Some(base_scorer.as_ref()), + ) + .await + } + }) + .collect::>(); + let searches = stream::iter(searches).buffer_unordered(get_num_compute_intensive_cpus()); + let mut searches = searches; + + while let Some((doc_ids, scores)) = searches.try_next().await? { + for (row_id, score) in doc_ids.into_iter().zip(scores.into_iter()) { + if candidates.len() < limit { + candidates.push(std::cmp::Reverse(ScoredDoc::new(row_id, score))); + } else if candidates.peek().unwrap().0.score.0 < score { + candidates.pop(); + candidates.push(std::cmp::Reverse(ScoredDoc::new(row_id, score))); + } + } + } + + Ok(candidates + .into_sorted_vec() + .into_iter() + .map(|std::cmp::Reverse(doc)| (doc.row_id, doc.score.0)) + .unzip()) +} + +/// Fall back to the default simple tokenizer when no on-disk FTS segment exists. +fn default_text_tokenizer() -> Box { + Box::new(TextTokenizer::new( + TextAnalyzer::builder(SimpleTokenizer::default()).build(), + )) +} + pub struct FtsIndexMetrics { index_metrics: IndexMetrics, partitions_searched: Count, @@ -232,52 +403,47 @@ impl ExecutionPlan for MatchQueryExec { )))?; let stream = stream::once(async move { let _timer = metrics.baseline_metrics.elapsed_compute().timer(); - let index_meta = ds - .load_scalar_index(IndexCriteria::default().for_column(&column).supports_fts()) + let segments = load_segments(&ds, &column) .await? .ok_or(DataFusionError::Execution(format!( "No Inverted index found for column {}", column, )))?; - let uuid = index_meta.uuid.to_string(); - let index = ds - .open_generic_index(&column, &uuid, &metrics.index_metrics) - .await?; - - let mut pre_filter = build_prefilter( - context.clone(), - partition, - &prefilter_source, - ds, - &[index_meta], - )?; - - let inverted_idx = index - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Execution(format!( - "Index for column {} is not an inverted index", - column, - )) - })?; - if !inverted_idx.deleted_fragments().is_empty() { + let _details = load_segment_details(&ds, &column, &segments).await?; + let indices = + open_fts_segments(&ds, &column, &segments, &metrics.index_metrics).await?; + + let mut pre_filter = + build_prefilter(context.clone(), partition, &prefilter_source, ds, &segments)?; + let deleted_fragments = + indices + .iter() + .fold(roaring::RoaringBitmap::new(), |mut deleted, index| { + deleted |= index.deleted_fragments().clone(); + deleted + }); + if !deleted_fragments.is_empty() { Arc::get_mut(&mut pre_filter) .expect("prefilter just created") - .set_deleted_fragments(inverted_idx.deleted_fragments().clone()); + .set_deleted_fragments(deleted_fragments); } - metrics.record_parts_searched(inverted_idx.partition_count()); + metrics + .record_parts_searched(indices.iter().map(|index| index.partition_count()).sum()); let is_fuzzy = matches!(query.fuzziness, Some(n) if n != 0); let params = params .with_fuzziness(query.fuzziness) .with_max_expansions(query.max_expansions) .with_prefix_length(query.prefix_length); + let first_index = indices.first().ok_or(DataFusionError::Execution(format!( + "FTS index for column {} has no segments", + column + )))?; let mut tokenizer = match is_fuzzy { - false => inverted_idx.tokenizer(), + false => first_index.tokenizer(), true => { let tokenizer = TextAnalyzer::from(SimpleTokenizer::default()); - match inverted_idx.tokenizer().doc_type() { + match first_index.tokenizer().doc_type() { DocType::Text => { Box::new(TextTokenizer::new(tokenizer)) as Box } @@ -288,18 +454,21 @@ impl ExecutionPlan for MatchQueryExec { } }; let tokens = collect_query_tokens(&query.terms, &mut tokenizer); + let base_scorer = build_global_bm25_scorer(&indices, &tokens, ¶ms)?; pre_filter.wait_for_ready().await?; - let (doc_ids, mut scores) = inverted_idx - .bm25_search( - Arc::new(tokens), - params.into(), - query.operator, - pre_filter, - metrics.clone(), - ) - .boxed() - .await?; + let tokens = Arc::new(tokens); + let params = Arc::new(params); + let (doc_ids, mut scores) = search_segments( + &indices, + tokens, + params, + query.operator, + pre_filter, + metrics.clone(), + Arc::new(base_scorer), + ) + .await?; scores.iter_mut().for_each(|s| { *s *= query.boost; }); @@ -378,26 +547,18 @@ impl FlatMatchFilterExec { column: &str, metrics: &IndexMetrics, ) -> DataFusionResult> { - let index_meta = dataset - .load_scalar_index(IndexCriteria::default().for_column(column).supports_fts()) - .await?; - - if let Some(index_meta) = index_meta { - let uuid = index_meta.uuid.to_string(); - let index = dataset.open_generic_index(column, &uuid, metrics).await?; - if let Some(index) = index.as_any().downcast_ref::() { - return Ok(index.tokenizer()); - } else { - return Err(DataFusionError::Execution(format!( - "Index for column {} is not an inverted index", - column, - ))); - } - } // Else, no index, use text tokenzier - - Ok(Box::new(TextTokenizer::new( - TextAnalyzer::builder(SimpleTokenizer::default()).build(), - ))) + if let Some(segments) = load_segments(dataset, column).await? { + let index_meta = segments.first().ok_or_else(|| { + DataFusionError::Execution(format!( + "FTS index for column {} has no segments", + column + )) + })?; + return Ok(open_fts_segment(dataset, column, index_meta, metrics) + .await? + .tokenizer()); + } + Ok(default_text_tokenizer()) } pub fn new( @@ -671,28 +832,33 @@ impl ExecutionPlan for FlatMatchQueryExec { document_input(self.unindexed_input.execute(partition, context)?, &column)?; let stream = stream::once(async move { - let index_meta = ds - .load_scalar_index(IndexCriteria::default().for_column(&column).supports_fts()) - .await?; - let inverted_idx = match index_meta { - Some(index_meta) => { - let uuid = index_meta.uuid.to_string(); - let index = ds - .open_generic_index(&column, &uuid, &metrics.index_metrics) - .await?; - index.as_any().downcast_ref::().cloned() + let segments = load_segments(&ds, &column).await?; + let (tokenizer, base_scorer) = match segments { + Some(segments) => { + let _details = load_segment_details(&ds, &column, &segments).await?; + let indices = + open_fts_segments(&ds, &column, &segments, &metrics.index_metrics).await?; + metrics.record_parts_searched( + indices.iter().map(|index| index.partition_count()).sum(), + ); + let first_index = indices.first().ok_or(DataFusionError::Execution( + format!("FTS index for column {} has no segments", column), + ))?; + let mut tokenizer = first_index.tokenizer(); + let query_tokens = collect_query_tokens(&query.terms, &mut tokenizer); + let base_scorer = + build_global_bm25_scorer(&indices, &query_tokens, &FtsSearchParams::new())?; + (tokenizer, Some(base_scorer)) } - None => None, + None => (default_text_tokenizer(), None), }; - if let Some(index) = inverted_idx.as_ref() { - metrics.record_parts_searched(index.partition_count()); - } flat_bm25_search_stream( unindexed_input, column, query.terms, - &inverted_idx, + tokenizer, + base_scorer, target_batch_size, ) .await @@ -874,63 +1040,58 @@ impl ExecutionPlan for PhraseQueryExec { let metrics = Arc::new(FtsIndexMetrics::new(&self.metrics, partition)); let stream = stream::once(async move { let _timer = metrics.baseline_metrics.elapsed_compute().timer(); - let column = query - .column - .clone() - .ok_or(DataFusionError::Execution(format!( - "column not set for PhraseQuery {}", - query.terms - )))?; - let index_meta = ds - .load_scalar_index(IndexCriteria::default().for_column(&column).supports_fts()) + let column = query.column.ok_or(DataFusionError::Execution(format!( + "column not set for PhraseQuery {}", + query.terms + )))?; + let segments = load_segments(&ds, &column) .await? .ok_or(DataFusionError::Execution(format!( "No Inverted index found for column {}", column, )))?; - let uuid = index_meta.uuid.to_string(); - let index = ds - .open_generic_index(&column, &uuid, &metrics.index_metrics) - .await?; - - let mut pre_filter = build_prefilter( - context.clone(), - partition, - &prefilter_source, - ds.clone(), - &[index_meta], - )?; - - let index = index - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Execution(format!( - "Index for column {} is not an inverted index", - column, - )) - })?; - if !index.deleted_fragments().is_empty() { + let _details = load_segment_details(&ds, &column, &segments).await?; + let indices = + open_fts_segments(&ds, &column, &segments, &metrics.index_metrics).await?; + + let mut pre_filter = + build_prefilter(context.clone(), partition, &prefilter_source, ds, &segments)?; + let deleted_fragments = + indices + .iter() + .fold(roaring::RoaringBitmap::new(), |mut deleted, index| { + deleted |= index.deleted_fragments().clone(); + deleted + }); + if !deleted_fragments.is_empty() { Arc::get_mut(&mut pre_filter) .expect("prefilter just created") - .set_deleted_fragments(index.deleted_fragments().clone()); + .set_deleted_fragments(deleted_fragments); } - metrics.record_parts_searched(index.partition_count()); + metrics + .record_parts_searched(indices.iter().map(|index| index.partition_count()).sum()); - let mut tokenizer = index.tokenizer(); + let first_index = indices.first().ok_or(DataFusionError::Execution(format!( + "FTS index for column {} has no segments", + column + )))?; + let mut tokenizer = first_index.tokenizer(); let tokens = collect_query_tokens(&query.terms, &mut tokenizer); + let base_scorer = build_global_bm25_scorer(&indices, &tokens, ¶ms)?; pre_filter.wait_for_ready().await?; - let (doc_ids, scores) = index - .bm25_search( - Arc::new(tokens), - params.into(), - lance_index::scalar::inverted::query::Operator::And, - pre_filter, - metrics.clone(), - ) - .boxed() - .await?; + let tokens = Arc::new(tokens); + let params = Arc::new(params); + let (doc_ids, scores) = search_segments( + &indices, + tokens, + params, + lance_index::scalar::inverted::query::Operator::And, + pre_filter, + metrics.clone(), + Arc::new(base_scorer), + ) + .await?; metrics.baseline_metrics.record_output(doc_ids.len()); let batch = RecordBatch::try_new( FTS_SCHEMA.clone(), @@ -1395,6 +1556,7 @@ mod tests { use std::sync::{Arc, Mutex}; use crate::index::DatasetIndexExt; + use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::{execution::TaskContext, physical_plan::ExecutionPlan}; use lance_datafusion::datagen::DatafusionDatagenExt; use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts}; @@ -1402,20 +1564,25 @@ mod tests { use lance_datagen::{BatchCount, ByteCount, RowCount}; use lance_index::metrics::NoOpMetricsCollector; use lance_index::scalar::inverted::InvertedIndex; + use lance_index::scalar::inverted::Language; use lance_index::scalar::inverted::query::{ BooleanQuery, BoostQuery, FtsQuery, FtsSearchParams, MatchQuery, Occur, Operator, - PhraseQuery, + PhraseQuery, collect_query_tokens, has_query_token, }; use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams}; use lance_index::{IndexCriteria, IndexType}; use crate::{ + dataset::transaction::{Operation, TransactionBuilder}, index::DatasetIndexInternalExt, io::exec::PreFilterSource, utils::test::{DatagenExt, FragmentCount, FragmentRowCount, NoContextTestFixture}, }; - use super::{BoostQueryExec, FlatMatchQueryExec, MatchQueryExec, PhraseQueryExec}; + use super::{ + BoostQueryExec, FlatMatchFilterExec, FlatMatchQueryExec, MatchQueryExec, PhraseQueryExec, + }; + use crate::io::exec::utils::IndexMetrics; #[derive(Default)] struct StatsHolder { @@ -1519,6 +1686,65 @@ mod tests { assert!(metrics.elapsed_compute().unwrap() > 0); } + #[tokio::test] + async fn test_flat_match_filter_load_tokenizer_uses_on_disk_params_when_details_missing() { + let mut dataset = lance_datagen::gen_batch() + .col( + "text", + lance_datagen::array::cycle_utf8_literals(&["hello", "HELLO"]), + ) + .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(2)) + .await + .unwrap(); + + let params = InvertedIndexParams::new("simple".to_string(), Language::English) + .with_position(false) + .lower_case(false) + .stem(false) + .remove_stop_words(false) + .ascii_folding(false) + .max_token_length(None); + dataset + .create_index(&["text"], IndexType::Inverted, None, ¶ms, true) + .await + .unwrap(); + + let index_meta = dataset + .load_scalar_index(IndexCriteria::default().for_column("text").supports_fts()) + .await + .unwrap() + .unwrap(); + let mut legacy_index_meta = index_meta.clone(); + legacy_index_meta.index_details = None; + let transaction = TransactionBuilder::new( + dataset.manifest.version, + Operation::CreateIndex { + new_indices: vec![legacy_index_meta], + removed_indices: vec![index_meta], + }, + ) + .build(); + dataset + .apply_commit(transaction, &Default::default(), &Default::default()) + .await + .unwrap(); + + let metrics = IndexMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let mut tokenizer = FlatMatchFilterExec::load_tokenizer(&dataset, "text", &metrics) + .await + .unwrap(); + let query_tokens = collect_query_tokens("hello", &mut tokenizer); + + let mut tokenizer = FlatMatchFilterExec::load_tokenizer(&dataset, "text", &metrics) + .await + .unwrap(); + assert!(has_query_token("hello", &mut tokenizer, &query_tokens)); + assert!( + !has_query_token("HELLO", &mut tokenizer, &query_tokens), + "legacy FTS indices should continue using on-disk tokenizer params" + ); + } + #[tokio::test] async fn test_parts_searched_metrics() { let mut dataset = lance_datagen::gen_batch() diff --git a/rust/lance/tests/query/inverted.rs b/rust/lance/tests/query/inverted.rs index 4215292ca7a..da2a7181c34 100644 --- a/rust/lance/tests/query/inverted.rs +++ b/rust/lance/tests/query/inverted.rs @@ -3,7 +3,10 @@ use std::sync::Arc; -use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt32Array}; +use arrow_array::cast::AsArray; +use arrow_array::{ + ArrayRef, Int32Array, RecordBatch, RecordBatchIterator, StringArray, UInt32Array, +}; use lance::Dataset; use lance::dataset::scanner::ColumnOrdering; use lance::dataset::{InsertBuilder, WriteParams}; @@ -12,6 +15,7 @@ use lance_index::IndexType; use lance_index::scalar::inverted::Language; use lance_index::scalar::inverted::query::{FtsQuery, PhraseQuery}; use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams}; +use lance_table::format::IndexMetadata; use super::{strip_score_column, test_fts, test_scan, test_take}; use crate::utils::DatasetTestCases; @@ -146,6 +150,338 @@ async fn test_inverted_phrase_query_with_positions() { .await; } +#[tokio::test] +async fn test_segmented_inverted_match_query() { + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let batches = vec![ + RecordBatch::try_from_iter(vec![ + ("id", Arc::new(Int32Array::from(vec![0, 1])) as ArrayRef), + ( + "text", + Arc::new(StringArray::from(vec![Some("alpha lance"), Some("beta")])) as ArrayRef, + ), + ]) + .unwrap(), + RecordBatch::try_from_iter(vec![ + ("id", Arc::new(Int32Array::from(vec![2, 3])) as ArrayRef), + ( + "text", + Arc::new(StringArray::from(vec![Some("lance delta"), Some("gamma")])) as ArrayRef, + ), + ]) + .unwrap(), + RecordBatch::try_from_iter(vec![ + ("id", Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef), + ( + "text", + Arc::new(StringArray::from(vec![Some("omega"), Some("lance omega")])) as ArrayRef, + ), + ]) + .unwrap(), + ]; + let schema = batches[0].schema(); + let original = arrow_select::concat::concat_batches(&schema, &batches).unwrap(); + + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); + let mut ds = Dataset::write( + reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 2, + max_rows_per_group: 2, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = base_inverted_params(false); + let fragment_ids = ds + .get_fragments() + .iter() + .map(|fragment| fragment.id() as u32) + .collect::>(); + let mut metadatas = Vec::::with_capacity(fragment_ids.len()); + for fragment_id in fragment_ids { + let mut builder = ds + .create_index_builder(&["text"], IndexType::Inverted, ¶ms) + .name("segmented_fts".to_string()) + .fragments(vec![fragment_id]); + metadatas.push(builder.execute_uncommitted().await.unwrap()); + } + let segments = ds + .create_index_segment_builder() + .with_index_type(IndexType::Inverted) + .with_segments(metadatas.clone()) + .build_all() + .await + .unwrap(); + ds.commit_existing_index_segments("segmented_fts", "text", segments) + .await + .unwrap(); + assert!(metadatas.len() >= 2); + assert_eq!( + ds.load_indices_by_name("segmented_fts") + .await + .unwrap() + .len(), + metadatas.len() + ); + + let query = FullTextSearchQuery::new("lance".to_string()) + .with_column("text".to_string()) + .unwrap(); + assert_fts_expected(&original, &ds, query.clone(), None, &[0, 2, 5]).await; + test_fts(&original, &ds, "text", "lance", None, true, false).await; +} + +#[tokio::test] +async fn test_segmented_inverted_fuzzy_match_uses_global_idf() { + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let batches = vec![ + RecordBatch::try_from_iter(vec![ + ("id", Arc::new(Int32Array::from(vec![0])) as ArrayRef), + ( + "text", + Arc::new(StringArray::from(vec![Some("lance")])) as ArrayRef, + ), + ]) + .unwrap(), + RecordBatch::try_from_iter(vec![ + ("id", Arc::new(Int32Array::from(vec![1])) as ArrayRef), + ( + "text", + Arc::new(StringArray::from(vec![Some("lance lance lance")])) as ArrayRef, + ), + ]) + .unwrap(), + ]; + let schema = batches[0].schema(); + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema); + let mut ds = Dataset::write( + reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 1, + max_rows_per_group: 1, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = base_inverted_params(false); + let fragment_ids = ds + .get_fragments() + .iter() + .map(|fragment| fragment.id() as u32) + .collect::>(); + let mut metadatas = Vec::::with_capacity(fragment_ids.len()); + for fragment_id in fragment_ids { + let mut builder = ds + .create_index_builder(&["text"], IndexType::Inverted, ¶ms) + .name("segmented_fuzzy".to_string()) + .fragments(vec![fragment_id]); + metadatas.push(builder.execute_uncommitted().await.unwrap()); + } + let segments = ds + .create_index_segment_builder() + .with_index_type(IndexType::Inverted) + .with_segments(metadatas) + .build_all() + .await + .unwrap(); + ds.commit_existing_index_segments("segmented_fuzzy", "text", segments) + .await + .unwrap(); + + let batch = ds + .scan() + .full_text_search( + FullTextSearchQuery::new_fuzzy("lnce".to_string(), Some(1)) + .with_column("text".to_string()) + .unwrap() + .limit(Some(1)), + ) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let ids = batch["id"].as_primitive::(); + assert_eq!(ids.values(), &[1]); +} + +#[tokio::test] +async fn test_segmented_inverted_phrase_query() { + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let batches = vec![ + RecordBatch::try_from_iter(vec![ + ("id", Arc::new(Int32Array::from(vec![0, 1])) as ArrayRef), + ( + "text", + Arc::new(StringArray::from(vec![ + Some("lance database"), + Some("database lance"), + ])) as ArrayRef, + ), + ]) + .unwrap(), + RecordBatch::try_from_iter(vec![ + ("id", Arc::new(Int32Array::from(vec![2, 3])) as ArrayRef), + ( + "text", + Arc::new(StringArray::from(vec![ + Some("lance database query"), + Some("lance and database"), + ])) as ArrayRef, + ), + ]) + .unwrap(), + ]; + let schema = batches[0].schema(); + let original = arrow_select::concat::concat_batches(&schema, &batches).unwrap(); + + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); + let mut ds = Dataset::write( + reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 2, + max_rows_per_group: 2, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = base_inverted_params(true); + let fragment_ids = ds + .get_fragments() + .iter() + .map(|fragment| fragment.id() as u32) + .collect::>(); + let mut metadatas = Vec::::with_capacity(fragment_ids.len()); + for fragment_id in fragment_ids { + let mut builder = ds + .create_index_builder(&["text"], IndexType::Inverted, ¶ms) + .name("segmented_phrase_fts".to_string()) + .fragments(vec![fragment_id]); + metadatas.push(builder.execute_uncommitted().await.unwrap()); + } + let segments = ds + .create_index_segment_builder() + .with_index_type(IndexType::Inverted) + .with_segments(metadatas) + .build_all() + .await + .unwrap(); + ds.commit_existing_index_segments("segmented_phrase_fts", "text", segments) + .await + .unwrap(); + + let phrase = + PhraseQuery::new("lance database".to_string()).with_column(Some("text".to_string())); + let query = FullTextSearchQuery::new_query(FtsQuery::Phrase(phrase)); + assert_fts_expected(&original, &ds, query, None, &[0, 2]).await; + test_fts(&original, &ds, "text", "lance database", None, true, true).await; +} + +#[tokio::test] +async fn test_segmented_inverted_match_query_with_unindexed_fragments() { + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let initial_batches = vec![ + RecordBatch::try_from_iter(vec![ + ("id", Arc::new(Int32Array::from(vec![0, 1])) as ArrayRef), + ( + "text", + Arc::new(StringArray::from(vec![Some("lance zero"), Some("alpha")])) as ArrayRef, + ), + ]) + .unwrap(), + RecordBatch::try_from_iter(vec![ + ("id", Arc::new(Int32Array::from(vec![2, 3])) as ArrayRef), + ( + "text", + Arc::new(StringArray::from(vec![Some("beta"), Some("lance three")])) as ArrayRef, + ), + ]) + .unwrap(), + ]; + let schema = initial_batches[0].schema(); + let reader = + RecordBatchIterator::new(initial_batches.clone().into_iter().map(Ok), schema.clone()); + let mut ds = Dataset::write( + reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 2, + max_rows_per_group: 2, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = base_inverted_params(false); + let fragment_ids = ds + .get_fragments() + .iter() + .map(|fragment| fragment.id() as u32) + .collect::>(); + let mut metadatas = Vec::::with_capacity(fragment_ids.len()); + for fragment_id in fragment_ids { + let mut builder = ds + .create_index_builder(&["text"], IndexType::Inverted, ¶ms) + .name("segmented_mixed_fts".to_string()) + .fragments(vec![fragment_id]); + metadatas.push(builder.execute_uncommitted().await.unwrap()); + } + let segments = ds + .create_index_segment_builder() + .with_index_type(IndexType::Inverted) + .with_segments(metadatas) + .build_all() + .await + .unwrap(); + ds.commit_existing_index_segments("segmented_mixed_fts", "text", segments) + .await + .unwrap(); + + let appended = RecordBatch::try_from_iter(vec![ + ("id", Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef), + ( + "text", + Arc::new(StringArray::from(vec![Some("lance four"), Some("omega")])) as ArrayRef, + ), + ]) + .unwrap(); + let appended_reader = RecordBatchIterator::new(vec![Ok(appended.clone())], appended.schema()); + ds.append(appended_reader, None).await.unwrap(); + + let original = arrow_select::concat::concat_batches( + &schema, + &[ + initial_batches[0].clone(), + initial_batches[1].clone(), + appended, + ], + ) + .unwrap(); + let query = FullTextSearchQuery::new("lance".to_string()) + .with_column("text".to_string()) + .unwrap(); + assert_fts_expected(&original, &ds, query.clone(), None, &[0, 3, 4]).await; + test_fts(&original, &ds, "text", "lance", None, true, false).await; +} + #[tokio::test] // Validate filters are applied alongside inverted index search results. async fn test_inverted_with_filter() {