@@ -17,10 +17,10 @@ use crate::{
1717 vector_index_details,
1818 } ,
1919} ;
20- use futures:: future:: BoxFuture ;
20+ use futures:: future:: { BoxFuture , try_join_all } ;
2121use lance_core:: datatypes:: format_field_path;
2222use lance_index:: progress:: { IndexBuildProgress , NoopIndexBuildProgress } ;
23- use lance_index:: { IndexParams , IndexType , scalar:: CreatedIndex } ;
23+ use lance_index:: { IndexParams , IndexSegment , IndexSegmentPlan , IndexType , scalar:: CreatedIndex } ;
2424use lance_index:: {
2525 metrics:: NoOpMetricsCollector ,
2626 scalar:: { LANCE_SCALAR_INDEX , ScalarIndexParams , inverted:: tokenizer:: InvertedIndexParams } ,
@@ -196,6 +196,7 @@ impl<'a> CreateIndexBuilder<'a> {
196196 . map_err ( |e| Error :: index ( format ! ( "Invalid UUID string provided: {}" , e) ) ) ?,
197197 None => Uuid :: new_v4 ( ) ,
198198 } ;
199+ let mut output_index_uuid = index_id;
199200 let created_index = match ( self . index_type , self . params . index_name ( ) ) {
200201 (
201202 IndexType :: Bitmap
@@ -323,7 +324,7 @@ impl<'a> CreateIndexBuilder<'a> {
323324 if let Some ( fragments) = & self . fragments {
324325 // For distributed indexing, build only on specified fragments
325326 // This creates temporary index metadata without committing
326- Box :: pin ( build_distributed_vector_index (
327+ let shard_uuid = Box :: pin ( build_distributed_vector_index (
327328 self . dataset ,
328329 column,
329330 & index_name,
@@ -334,6 +335,7 @@ impl<'a> CreateIndexBuilder<'a> {
334335 self . progress . clone ( ) ,
335336 ) )
336337 . await ?;
338+ output_index_uuid = shard_uuid;
337339 } else {
338340 // Standard full dataset indexing
339341 Box :: pin ( build_vector_index (
@@ -359,7 +361,14 @@ impl<'a> CreateIndexBuilder<'a> {
359361 . await ?;
360362 }
361363 // Capture file sizes after vector index creation
362- let index_dir = self . dataset . indices_dir ( ) . child ( index_id. to_string ( ) ) ;
364+ let index_dir = if self . fragments . is_some ( ) {
365+ self . dataset
366+ . indices_dir ( )
367+ . child ( index_id. to_string ( ) )
368+ . child ( format ! ( "partial_{}" , output_index_uuid) )
369+ } else {
370+ self . dataset . indices_dir ( ) . child ( index_id. to_string ( ) )
371+ } ;
363372 let files =
364373 list_index_files_with_sizes ( & self . dataset . object_store , & index_dir) . await ?;
365374 CreatedIndex {
@@ -420,7 +429,7 @@ impl<'a> CreateIndexBuilder<'a> {
420429 } ;
421430
422431 Ok ( IndexMetadata {
423- uuid : index_id ,
432+ uuid : output_index_uuid ,
424433 name : index_name,
425434 fields : vec ! [ field. id] ,
426435 dataset_version : self . dataset . manifest . version ,
@@ -494,6 +503,91 @@ impl<'a> IntoFuture for CreateIndexBuilder<'a> {
494503 }
495504}
496505
506+ /// Build physical index segments from previously-written partial index outputs.
507+ ///
508+ /// Use [`DatasetIndexExt::create_index_segment_builder`] to open a staging root
509+ /// and then either:
510+ ///
511+ /// - call [`Self::plan`] and orchestrate individual segment builds externally, or
512+ /// - call [`Self::build_all`] to build all segments on the current node.
513+ ///
514+ /// This builder only builds physical segments. Publishing those segments as
515+ /// a logical index still requires [`DatasetIndexExt::commit_existing_index_segments`].
516+ /// Together these two APIs form the canonical distributed vector segment build workflow.
517+ #[ derive( Clone ) ]
518+ pub struct IndexSegmentBuilder < ' a > {
519+ dataset : & ' a Dataset ,
520+ staging_index_uuid : String ,
521+ partial_indices : Vec < IndexMetadata > ,
522+ target_segment_bytes : Option < u64 > ,
523+ }
524+
525+ impl < ' a > IndexSegmentBuilder < ' a > {
526+ pub ( crate ) fn new ( dataset : & ' a Dataset , staging_index_uuid : String ) -> Self {
527+ Self {
528+ dataset,
529+ staging_index_uuid,
530+ partial_indices : Vec :: new ( ) ,
531+ target_segment_bytes : None ,
532+ }
533+ }
534+
535+ /// Provide the partial index metadata returned by `execute_uncommitted()`
536+ /// for this staging root.
537+ pub fn with_partial_indices ( mut self , partial_indices : Vec < IndexMetadata > ) -> Self {
538+ self . partial_indices = partial_indices;
539+ self
540+ }
541+
542+ /// Set the target size, in bytes, for merged built segments.
543+ ///
544+ /// When set, shard outputs will be grouped into larger built segments up to
545+ /// approximately this size. When unset, each shard output becomes one built
546+ /// segment.
547+ pub fn with_target_segment_bytes ( mut self , bytes : u64 ) -> Self {
548+ self . target_segment_bytes = Some ( bytes) ;
549+ self
550+ }
551+
552+ /// Plan how partial indices should be grouped into built segments.
553+ pub async fn plan ( & self ) -> Result < Vec < IndexSegmentPlan > > {
554+ if self . partial_indices . is_empty ( ) {
555+ return Err ( Error :: invalid_input (
556+ "IndexSegmentBuilder requires at least one partial index; \
557+ call with_partial_indices(...) with execute_uncommitted() outputs"
558+ . to_string ( ) ,
559+ ) ) ;
560+ }
561+
562+ crate :: index:: vector:: ivf:: plan_staging_segments (
563+ & self
564+ . dataset
565+ . indices_dir ( )
566+ . child ( self . staging_index_uuid . as_str ( ) ) ,
567+ & self . partial_indices ,
568+ None ,
569+ self . target_segment_bytes ,
570+ )
571+ . await
572+ }
573+
574+ /// Build one segment from a previously-generated plan.
575+ pub async fn build ( & self , plan : & IndexSegmentPlan ) -> Result < IndexSegment > {
576+ crate :: index:: vector:: ivf:: build_staging_segment (
577+ self . dataset . object_store ( ) ,
578+ & self . dataset . indices_dir ( ) ,
579+ plan,
580+ )
581+ . await
582+ }
583+
584+ /// Plan and build all segments from this staging root.
585+ pub async fn build_all ( & self ) -> Result < Vec < IndexSegment > > {
586+ let plans = self . plan ( ) . await ?;
587+ try_join_all ( plans. iter ( ) . map ( |plan| self . build ( plan) ) ) . await
588+ }
589+ }
590+
497591#[ cfg( test) ]
498592mod tests {
499593 use super :: * ;
0 commit comments