This is a follow-up to issue #5164.
As the scale of unstructured data continues to grow, the need for a robust distributed index-building capability is becoming increasingly critical. Because different index types, such as the Range-based BTree Index, have different distributed build strategies, the current build implementation is no longer sufficient for all index types.
Current Implementation
The current implementation of distributed indexing extends the single-node logic. It adapts CreateIndexBuilder by adding a fragments parameter. This approach let us reuse the existing single-node build process with minimal changes and reach distributed capability quickly.
The build process is divided into the following three stages:
build_distributed_index: An instance of CreateIndexBuilder is created, and the presence of fragments determines if it's a distributed build.
merge_metadata: After the distributed build completes, the Dataset.merge_index_metadata() method is called to consolidate the metadata. This is invoked from the Python/Java layer by calling directly into Index; there is no unified entry point for it at the Rust level.
commit_index: The final index information is committed to the dataset's manifest.
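To make the three stages concrete, here is a minimal, dependency-free sketch of the flow. Everything in it is a hypothetical stand-in: `PartialIndex`, `Manifest`, and the free functions only mirror the stage names above and are not the actual Lance types or signatures.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the index metadata one worker produces.
#[derive(Debug, Clone, PartialEq)]
struct PartialIndex {
    fragment_ids: Vec<u32>,
    num_rows: u64,
}

/// Hypothetical stand-in for the dataset manifest.
#[derive(Debug, Default)]
struct Manifest {
    indices: Vec<(String, PartialIndex)>,
}

/// Stage 1: a worker indexes only the fragments it was assigned.
fn build_distributed_index(
    fragment_ids: &[u32],
    rows_per_fragment: &BTreeMap<u32, u64>,
) -> PartialIndex {
    let num_rows: u64 = fragment_ids
        .iter()
        .filter_map(|id| rows_per_fragment.get(id))
        .sum();
    PartialIndex { fragment_ids: fragment_ids.to_vec(), num_rows }
}

/// Stage 2: consolidate the per-worker metadata into one description.
fn merge_metadata(parts: &[PartialIndex]) -> PartialIndex {
    let mut fragment_ids: Vec<u32> = parts
        .iter()
        .flat_map(|p| p.fragment_ids.iter().copied())
        .collect();
    fragment_ids.sort_unstable();
    let num_rows: u64 = parts.iter().map(|p| p.num_rows).sum();
    PartialIndex { fragment_ids, num_rows }
}

/// Stage 3: record the merged index in the manifest.
fn commit_index(manifest: &mut Manifest, name: &str, merged: PartialIndex) {
    manifest.indices.push((name.to_string(), merged));
}
```

In this toy model only stage 1 runs on the workers; stages 2 and 3 run once on a coordinator after every worker has reported back.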
Limitations
This design is built on two key assumptions:
- All computation for index creation occurs within Lance itself.
- The build process can be partitioned at the fragment level.
As we expand support for more advanced indexing strategies, we're encountering requirements that go beyond this initial scope. A prime example is the Range-Based Distributed B-Tree Index, where:
- The expensive computation (global sorting) is performed by external big data engines (like Spark, Flink, etc.).
- The data is partitioned by value range, not by fragment.
The existing CreateIndexBuilder was not originally designed to handle these scenarios, suggesting a need for a more adaptable approach.
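The difference between the two partitioning schemes can be illustrated with a small sketch. The `Row` type and both functions are hypothetical and exist only for illustration; in the real scenario the global sort would run in an external engine, not in a single in-memory `Vec`.

```rust
use std::collections::BTreeMap;

/// A row reduced to where it lives and the value being indexed.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Row {
    fragment_id: u32,
    value: i64,
}

/// Fragment-level split: each task sees whatever values its fragment holds,
/// so the value ranges of different tasks may overlap.
fn partition_by_fragment(rows: &[Row]) -> BTreeMap<u32, Vec<i64>> {
    let mut out: BTreeMap<u32, Vec<i64>> = BTreeMap::new();
    for row in rows {
        out.entry(row.fragment_id).or_default().push(row.value);
    }
    out
}

/// Range-level split: sort globally, then cut into contiguous runs,
/// so every task owns a disjoint value range.
fn partition_by_range(rows: &[Row], num_partitions: usize) -> Vec<Vec<i64>> {
    assert!(num_partitions > 0, "need at least one partition");
    let mut values: Vec<i64> = rows.iter().map(|r| r.value).collect();
    values.sort_unstable(); // stands in for the external global sort
    let chunk_len = ((values.len() + num_partitions - 1) / num_partitions).max(1);
    values.chunks(chunk_len).map(|c| c.to_vec()).collect()
}
```

With fragment-level splitting, a single value range can be spread across every task, which is why a range-based B-Tree build cannot simply reuse the fragments parameter.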
Proposal
As mentioned in (#5202 (comment)), I'd like to propose a dedicated, unified API for distributed index building at the Rust level, similar to the compaction API. This could be a DistributedIndexBuilder that encapsulates the entire lifecycle:
build_distributed_index: Generalize beyond fragment-level splitting to support arbitrary data partitions and accept pre-processed data from external compute engines.
merge_metadata: Move this logic from a standalone Dataset interface into the DistributedIndexBuilder, abstracting the Index details away from the Python/Java bindings.
commit_index: Encapsulate the final commit step within the builder as well.
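One way to picture this lifecycle is as a trait with one implementation per index family, plus a generic driver that runs the stages in order. The sketch below is purely illustrative: the trait `DistributedIndexBuild`, the driver `run_build`, the toy `RangeBTreeBuild`, and the `Vec<String>` manifest are all assumptions made up for this example, not existing Lance APIs.

```rust
/// Hypothetical trait: one implementation per index family.
trait DistributedIndexBuild {
    /// Unit of work handed to one task (fragment ids, a sorted value run, ...).
    type Partition;
    /// What one task reports back.
    type Partial;
    /// The consolidated result that gets committed.
    type Merged;

    fn build_distributed_index(&self, partition: Self::Partition) -> Self::Partial;
    fn merge_metadata(&self, partials: Vec<Self::Partial>) -> Self::Merged;
    fn commit_index(&self, merged: Self::Merged, manifest: &mut Vec<String>);
}

/// Drives the three stages in order for any implementation.
fn run_build<B: DistributedIndexBuild>(
    builder: &B,
    partitions: Vec<B::Partition>,
    manifest: &mut Vec<String>,
) {
    let partials: Vec<B::Partial> = partitions
        .into_iter()
        .map(|p| builder.build_distributed_index(p))
        .collect();
    let merged = builder.merge_metadata(partials);
    builder.commit_index(merged, manifest);
}

/// Toy range-based build: each partition is an already-sorted run of values,
/// as an external engine might deliver it.
struct RangeBTreeBuild {
    name: String,
}

impl DistributedIndexBuild for RangeBTreeBuild {
    type Partition = Vec<i64>;
    type Partial = (i64, i64, usize); // (min, max, row count)
    type Merged = Vec<(i64, i64, usize)>;

    fn build_distributed_index(&self, partition: Vec<i64>) -> Self::Partial {
        let min = *partition.first().expect("non-empty partition");
        let max = *partition.last().expect("non-empty partition");
        (min, max, partition.len())
    }

    fn merge_metadata(&self, mut partials: Vec<Self::Partial>) -> Self::Merged {
        // Order the per-partition summaries by their lower bound.
        partials.sort_by_key(|p| p.0);
        partials
    }

    fn commit_index(&self, merged: Self::Merged, manifest: &mut Vec<String>) {
        let rows: usize = merged.iter().map(|p| p.2).sum();
        manifest.push(format!("{}: {} ranges, {} rows", self.name, merged.len(), rows));
    }
}
```

Because the partition type is an associated type, a fragment-based family could pick `Vec<u32>` while a range-based family picks pre-sorted data, and bindings would only ever call the driver.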
Would this unified builder be flexible enough to accommodate the distributed build requirements for various index types, including vector indexes?
On a related note, I've noticed that vector indexes currently use IvfIndexBuilder internally. Could this new, unified API also integrate the logic from IvfIndexBuilder, creating a consistent interface for both single-node and distributed builds across all index families?
Here is a rough sketch of the proposed structure:
use std::ops::{Deref, DerefMut};

// The common configuration base for all index builders.
pub struct IndexBuilderBase<'a> {
    dataset: &'a mut Dataset,
    columns: Vec<String>,
    index_type: IndexType,
    params: &'a dyn IndexParams,
    name: Option<String>,
    replace: bool,
    train: bool,
    index_uuid: Option<String>,
}
// Basic builder impls (e.g., .name(), .replace()) would be here.
// A builder for standard, monolithic index creation.
pub struct CreateIndexBuilder<'a> {
    base: IndexBuilderBase<'a>,
}
// Impl for `execute (build_index + commit_index)` would be here.
...
impl<'a> Deref for CreateIndexBuilder<'a> {
    type Target = IndexBuilderBase<'a>;
    fn deref(&self) -> &Self::Target { &self.base }
}
impl<'a> DerefMut for CreateIndexBuilder<'a> {
    fn deref_mut(&mut self) -> &mut Self::Target { &mut self.base }
}
// A builder for distributed, multi-stage index creation.
// Or a more specific DistributedBTreeIndexBuilder
pub struct DistributedIndexBuilder<'a> {
    base: IndexBuilderBase<'a>,
    // For fragment-based builds
    fragments: Option<Vec<u32>>,
    // For builds partitioned some other way, such as range-based:
    // (partition_id, preprocessed_data)
    external_data: Option<(u32, Box<dyn RecordBatchReader + Send + 'static>)>,
}
// Impls for `build_distributed_index`, `merge_metadata`, and `commit_index` would be here.
...
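In the sketch above, `fragments` and `external_data` are two independent `Option` fields, so both could be set, or neither. One possible alternative, shown here only as an assumption-laden sketch, is to model the input as an enum so that each build task has exactly one source. `BuildInput`, `PreprocessedData`, and the constructor names are hypothetical, and a plain `Vec<i64>` stands in for the `RecordBatchReader` so the example has no dependencies.

```rust
/// Stand-in for `Box<dyn RecordBatchReader + Send>`, to keep the sketch dependency-free.
type PreprocessedData = Vec<i64>;

/// Exactly one source of work per build task.
#[derive(Debug, PartialEq)]
enum BuildInput {
    /// Fragment-based build: index these fragment ids.
    Fragments(Vec<u32>),
    /// Range-based (or other) build: data pre-processed by an external engine.
    External { partition_id: u32, data: PreprocessedData },
}

#[derive(Debug)]
struct DistributedIndexBuilder {
    name: Option<String>,
    input: BuildInput,
}

impl DistributedIndexBuilder {
    fn for_fragments(fragments: Vec<u32>) -> Self {
        Self { name: None, input: BuildInput::Fragments(fragments) }
    }

    fn for_external(partition_id: u32, data: PreprocessedData) -> Self {
        Self { name: None, input: BuildInput::External { partition_id, data } }
    }

    fn name(mut self, name: &str) -> Self {
        self.name = Some(name.to_string());
        self
    }

    /// Summarizes what this task would build; a real builder would run the build here.
    fn describe(&self) -> String {
        let name = self.name.as_deref().unwrap_or("<unnamed>");
        match &self.input {
            BuildInput::Fragments(ids) => {
                format!("{}: {} fragments", name, ids.len())
            }
            BuildInput::External { partition_id, data } => {
                format!("{}: external partition {} with {} values", name, partition_id, data.len())
            }
        }
    }
}
```

The trade-off is that an enum makes invalid combinations unrepresentable, at the cost of a slightly larger change to the existing fragments-based call sites.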
I'm still debating whether a generic DistributedIndexBuilder is the right path, or if a more specific extension, like creating a new BTreeIndexBuilder, would be a better initial step.
@westonpace @wjones127 @BubbleCal @chenghao-guo I would greatly appreciate your thoughts and any alternative suggestions on this!