From a85ea9321f2615dc0cd60895af04c87b60ad44c1 Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Sun, 22 Mar 2026 06:49:38 +0800
Subject: [PATCH] docs: add distributed indexing workflow example

---
 docs/src/guide/distributed_indexing.md | 177 +++++++++++++++++++++++++-
 1 file changed, 174 insertions(+), 3 deletions(-)

diff --git a/docs/src/guide/distributed_indexing.md b/docs/src/guide/distributed_indexing.md
index e6aa241feee..67641c897f7 100644
--- a/docs/src/guide/distributed_indexing.md
+++ b/docs/src/guide/distributed_indexing.md
@@ -102,8 +102,8 @@ Then the caller turns that staging root into one or more built segments:
 
-At that point the caller has two execution choices:
+At that point the caller builds the planned segments:
 
-- call `build(plan)` for each plan and run those builds in parallel
-- call `build_all()` to let Lance build every planned segment on the current node
+- call `build(plan)` for each plan and run those builds on one or more
+  distributed segment-build workers
 
 After the segments are built, publish them with
 `commit_existing_index_segments(...)`.
@@ -153,3 +153,174 @@ Lance is responsible for:
 
 This split keeps distributed scheduling outside the storage engine while
 still letting Lance own the on-disk index format.

## End-to-End Example (Python)

The following recipe uses the public Python APIs to build an `IVF_PQ` index in
three stages:

1. train shared artifacts once
2. build partial shards on workers
3. plan, build, and commit final segments

This example assumes the dataset already exists in object storage and that your
distributed scheduler is responsible for:

- assigning fragment groups to workers
- distributing the shared training artifacts
- collecting the returned partial index metadata on the coordinator

The examples below use an S3 dataset path. See [Object Store
Configuration](object_store.md) for authentication and other storage settings.
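How fragments are grouped is entirely up to your scheduler. As a rough sketch
(the `assign_fragment_groups` helper below is illustrative, not a Lance API),
a scheduler might simply round-robin fragment ids across the available
workers:

```python
def assign_fragment_groups(fragment_ids, num_workers):
    """Round-robin fragment ids into at most num_workers groups."""
    groups = [[] for _ in range(num_workers)]
    for index, fragment_id in enumerate(fragment_ids):
        groups[index % num_workers].append(fragment_id)
    # Drop empty groups when there are fewer fragments than workers.
    return [group for group in groups if group]
```

Each resulting group becomes the fragment-id assignment for one worker in
step 2 below.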

### Shared Configuration

```python
import uuid

import lance
from lance.indices import IndicesBuilder, IvfModel, PqModel

uri = "s3://my-bucket/example/distributed-index.lance"
storage_options = {
    "region": "us-east-1",
}
index_name = "vector_idx"
staging_index_uuid = str(uuid.uuid4())
ivf_model_uri = "s3://my-bucket/example/index-training/ivf.lance"
pq_model_uri = "s3://my-bucket/example/index-training/pq.lance"
```

### 1. Train Once and Plan Fragment Groups

One node trains the shared IVF/PQ artifacts and decides how to partition the
dataset fragments across workers. The trained IVF and PQ models can be saved to
object storage and loaded by every worker later, so the scheduler only needs to
distribute the model URIs.

```python
ds = lance.dataset(uri, storage_options=storage_options)

fragments = ds.get_fragments()
# One fragment group per worker; your scheduler hands these out.
fragment_groups = [
    [fragment.fragment_id for fragment in fragments[:2]],
    [fragment.fragment_id for fragment in fragments[2:]],
]

builder = IndicesBuilder(ds, "vector")
ivf_model = builder.train_ivf(
    num_partitions=32,
    distance_type="l2",
)
ivf_model.save(ivf_model_uri)

pq_model = builder.train_pq(ivf_model, num_subvectors=16)
pq_model.save(pq_model_uri)
```

### 2. Build Partial Shards on Each Worker

Each worker receives one fragment group plus the shared training artifacts. The
worker writes one partial shard under the shared staging UUID and returns the
resulting partial index metadata.

```python
ds = lance.dataset(uri, storage_options=storage_options)

fragment_ids = [0, 1]  # assigned by your scheduler for this worker
ivf_model = IvfModel.load(ivf_model_uri)
pq_model = PqModel.load(pq_model_uri)

partial_index = ds.create_index_uncommitted(
    "vector",
    "IVF_PQ",
    name=index_name,
    metric="l2",
    train=True,
    fragment_ids=fragment_ids,
    index_uuid=staging_index_uuid,
    num_partitions=32,
    num_sub_vectors=16,
    ivf_centroids=ivf_model.centroids,
    pq_codebook=pq_model.codebook,
)

# Send partial_index back to the coordinator.
```

### 3. Plan Final Segments on the Coordinator

The coordinator gathers the partial index metadata and decides how partial
shards should be grouped into final physical segments.

```python
ds = lance.dataset(uri, storage_options=storage_options)

# Collected from all workers.
partial_indices = [
    partial_index_0,
    partial_index_1,
]

segment_builder = (
    ds.create_index_segment_builder(staging_index_uuid)
    .with_partial_indices(partial_indices)
    .with_target_segment_bytes(2 * 1024 * 1024 * 1024)  # optional
)

plans = segment_builder.plan()
```

### 4. Build Final Segments on Segment-Build Workers

Segment build can also be distributed. After planning, the coordinator can
assign one `IndexSegmentPlan` to each segment-build worker. Each worker reads
the staged shard outputs directly from object storage and builds one final
physical segment, ready to commit.

```python
ds = lance.dataset(uri, storage_options=storage_options)

# Assigned by the coordinator.
plan_for_this_worker = plans[0]

segment_builder = ds.create_index_segment_builder(staging_index_uuid)
segment = segment_builder.build(plan_for_this_worker)

# Send segment back to the coordinator.
```

### 5. Commit Final Segments on the Coordinator

After all segment-build workers finish, the coordinator commits the resulting
physical segments as one logical index.
+ +```python +import numpy as np + +ds = lance.dataset(uri, storage_options=storage_options) + +# Collected from all segment-build workers. +segments = [ + segment_0, + segment_1, +] + +ds = ds.commit_existing_index_segments( + index_name, + "vector", + segments, +) + +query = np.random.randn(128).astype(np.float32) +results = ds.to_table( + nearest={"column": "vector", "q": query, "k": 5, "nprobes": 4} +) +print(results.select(["id", "_distance"]).to_pydict()) +``` + +The same overall flow also applies to `IVF_FLAT` and `IVF_SQ`. In those cases, +train and save only the IVF model, load it on each worker, and pass +`ivf_centroids=ivf_model.centroids` into `create_index_uncommitted(...)`.
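To keep the worker code identical across those variants, one option is a small
helper that assembles the type-specific model arguments. The helper below is a
hypothetical sketch (`uncommitted_index_kwargs` is not a Lance API); the
keyword names match the `create_index_uncommitted(...)` call in step 2:

```python
def uncommitted_index_kwargs(index_type, ivf_centroids, pq_codebook=None):
    """Pick the model arguments for create_index_uncommitted(...) per index type."""
    # Every IVF variant reuses the shared centroids.
    kwargs = {"ivf_centroids": ivf_centroids}
    if index_type == "IVF_PQ":
        if pq_codebook is None:
            raise ValueError("IVF_PQ requires a trained PQ codebook")
        kwargs["pq_codebook"] = pq_codebook
    elif index_type not in ("IVF_FLAT", "IVF_SQ"):
        raise ValueError(f"unsupported index type: {index_type}")
    return kwargs
```

A worker would then call, for example,
`ds.create_index_uncommitted("vector", "IVF_FLAT", ...,
**uncommitted_index_kwargs("IVF_FLAT", ivf_model.centroids))`.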