Skip to content

Commit a07ef61

Browse files
committed
feat: add Python vector segment builder API
1 parent 691cecb commit a07ef61

File tree

8 files changed

+275
-6
lines changed

8 files changed

+275
-6
lines changed

python/python/lance/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from .lance import (
3030
DatasetBasePath,
3131
FFILanceTableProvider,
32+
IndexSegmentBuilder,
3233
ScanStatistics,
3334
bytes_read_counter,
3435
iops_counter,
@@ -64,6 +65,7 @@
6465
"FragmentMetadata",
6566
"Index",
6667
"IndexFile",
68+
"IndexSegmentBuilder",
6769
"LanceDataset",
6870
"LanceFragment",
6971
"LanceOperation",

python/python/lance/dataset.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
from .dependencies import numpy as np
4848
from .dependencies import pandas as pd
4949
from .fragment import DataFile, FragmentMetadata, LanceFragment
50-
from .indices import IndexConfig, SupportedDistributedIndices
50+
from .indices import IndexConfig, IndexSegment, SupportedDistributedIndices
5151
from .lance import (
5252
CleanupStats,
5353
Compaction,
@@ -3391,6 +3391,26 @@ def merge_index_metadata(
33913391
self._ds.merge_index_metadata(index_uuid, t, batch_readhead)
33923392
return None
33933393

3394+
def create_index_segment_builder(self, staging_index_uuid: str):
    """
    Return a builder that assembles partial index outputs into segments.

    Parameters
    ----------
    staging_index_uuid : str
        The shared index UUID that was passed as ``index_uuid=...`` to
        :meth:`create_index` together with ``fragment_ids=...``.

    Returns
    -------
    IndexSegmentBuilder
        A builder bound to this dataset. Hand it the partial index metadata
        returned by those :meth:`create_index` calls through
        :meth:`IndexSegmentBuilder.with_partial_indices`.
    """
    segment_builder = self._ds.create_index_segment_builder(staging_index_uuid)
    return segment_builder
3404+
3405+
def commit_existing_index_segments(
    self, index_name: str, column: str, segments: List[IndexSegment]
) -> LanceDataset:
    """
    Commit already-built index segments as a single logical index.

    Parameters
    ----------
    index_name : str
        Name of the logical index the segments are committed as.
    column : str
        Column the index applies to.
    segments : List[IndexSegment]
        The built segments to commit together.

    Returns
    -------
    LanceDataset
        This dataset instance, so calls can be chained.
    """
    native_dataset = self._ds
    native_dataset.commit_existing_index_segments(index_name, column, segments)
    return self
3413+
33943414
def session(self) -> Session:
33953415
"""
33963416
Return the dataset session, which holds the dataset's state.

python/python/lance/indices/__init__.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,18 @@
55

66
from lance.indices.builder import IndexConfig, IndicesBuilder
77
from lance.indices.ivf import IvfModel
8+
from lance.lance.indices import IndexSegment, IndexSegmentPlan
89
from lance.indices.pq import PqModel
910

10-
__all__ = ["IndicesBuilder", "IndexConfig", "PqModel", "IvfModel", "IndexFileVersion"]
11+
__all__ = [
12+
"IndicesBuilder",
13+
"IndexConfig",
14+
"PqModel",
15+
"IvfModel",
16+
"IndexFileVersion",
17+
"IndexSegment",
18+
"IndexSegmentPlan",
19+
]
1120

1221

1322
class IndexFileVersion(str, Enum):

python/python/lance/lance/__init__.pyi

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ from .fragment import (
6161
RowIdMeta as RowIdMeta,
6262
)
6363
from .indices import IndexDescription as IndexDescription
64+
from .indices import IndexSegment as IndexSegment
65+
from .indices import IndexSegmentPlan as IndexSegmentPlan
6466
from .lance import PySearchFilter
6567
from .optimize import (
6668
Compaction as Compaction,
@@ -185,6 +187,15 @@ class LanceColumnStatistics:
185187
class _Session:
186188
def size_bytes(self) -> int: ...
187189

190+
class IndexSegmentBuilder:
    """Builder that turns partial index outputs into index segments."""

    @property
    def staging_index_uuid(self) -> str:
        """The staging index UUID this builder was created with."""

    def with_partial_indices(self, partial_indices: List[Index]) -> Self:
        """Replace the stored partial indices and return this builder."""

    def with_target_segment_bytes(self, bytes: int) -> Self:
        """Set the target segment size in bytes and return this builder."""

    def plan(self) -> List[IndexSegmentPlan]:
        """Compute the segment plans for the configured partial indices."""

    def build(self, plan: IndexSegmentPlan) -> IndexSegment:
        """Build the single segment described by ``plan``."""

    def build_all(self) -> List[IndexSegment]:
        """Build every segment for the configured partial indices."""
198+
188199
class LanceBlobFile:
189200
def close(self): ...
190201
def is_closed(self) -> bool: ...
@@ -360,6 +371,12 @@ class _Dataset:
360371
def merge_index_metadata(
361372
self, index_uuid: str, index_type: str, batch_readhead: Optional[int] = None
362373
): ...
374+
def create_index_segment_builder(
    self, staging_index_uuid: str
) -> IndexSegmentBuilder:
    """Create a segment builder bound to this dataset."""
377+
def commit_existing_index_segments(
    self, index_name: str, column: str, segments: List[IndexSegment]
) -> None:
    """Commit the given built segments under ``index_name`` for ``column``."""
363380
def count_fragments(self) -> int: ...
364381
def num_small_files(self, max_rows_per_group: int) -> int: ...
365382
def get_fragments(self) -> List[_Fragment]: ...

python/python/lance/lance/indices/__init__.pyi

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,27 @@ from datetime import datetime
1616
from typing import Optional
1717

1818
import pyarrow as pa
19+
from ...dataset import Index
1920

2021
class IndexConfig:
2122
index_type: str
2223
config: str
2324

25+
class IndexSegment:
    """A built index segment, as exposed by the native extension."""

    # Segment UUID rendered as a string.
    uuid: str
    # IDs of the fragments in this segment's fragment bitmap.
    fragment_ids: set[int]
    # Index version reported by the native segment.
    index_version: int

    def __repr__(self) -> str: ...
31+
32+
class IndexSegmentPlan:
    """A plan for building one index segment from partial indices."""

    # Staging index UUID the plan refers to.
    staging_index_uuid: str
    # The segment described by this plan.
    segment: IndexSegment
    # Partial index metadata included in this plan.
    partial_indices: list[Index]
    # Estimated size in bytes reported by the native plan.
    estimated_bytes: int

    def __repr__(self) -> str: ...
39+
2440
def train_ivf_model(
2541
dataset,
2642
column: str,

python/src/dataset.rs

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
8787
use crate::error::PythonErrorExt;
8888
use crate::file::object_store_from_uri_or_path;
8989
use crate::fragment::FileFragment;
90-
use crate::indices::{PyIndexConfig, PyIndexDescription};
90+
use crate::indices::{PyIndexConfig, PyIndexDescription, PyIndexSegment, PyIndexSegmentPlan};
9191
use crate::namespace::extract_namespace_arc;
9292
use crate::rt;
9393
use crate::scanner::ScanStatistics;
@@ -323,6 +323,88 @@ impl MergeInsertBuilder {
323323
}
324324
}
325325

326+
#[pyclass(name = "IndexSegmentBuilder", module = "lance", subclass)]
327+
#[derive(Clone)]
328+
pub struct PyIndexSegmentBuilder {
329+
dataset: Arc<LanceDataset>,
330+
staging_index_uuid: String,
331+
partial_indices: Vec<IndexMetadata>,
332+
target_segment_bytes: Option<u64>,
333+
}
334+
335+
#[pymethods]
336+
impl PyIndexSegmentBuilder {
337+
#[getter]
338+
fn staging_index_uuid(&self) -> String {
339+
self.staging_index_uuid.clone()
340+
}
341+
342+
fn with_partial_indices<'a>(
343+
mut slf: PyRefMut<'a, Self>,
344+
partial_indices: &Bound<'_, PyAny>,
345+
) -> PyResult<PyRefMut<'a, Self>> {
346+
let mut indices = Vec::new();
347+
for item in partial_indices.try_iter()? {
348+
indices.push(item?.extract::<PyLance<IndexMetadata>>()?.0);
349+
}
350+
slf.partial_indices = indices;
351+
Ok(slf)
352+
}
353+
354+
fn with_target_segment_bytes<'a>(
355+
mut slf: PyRefMut<'a, Self>,
356+
bytes: u64,
357+
) -> PyResult<PyRefMut<'a, Self>> {
358+
slf.target_segment_bytes = Some(bytes);
359+
Ok(slf)
360+
}
361+
362+
fn plan(&self, py: Python<'_>) -> PyResult<Vec<Py<PyIndexSegmentPlan>>> {
363+
let mut builder = self
364+
.dataset
365+
.create_index_segment_builder(self.staging_index_uuid.clone())
366+
.with_partial_indices(self.partial_indices.clone());
367+
if let Some(target_segment_bytes) = self.target_segment_bytes {
368+
builder = builder.with_target_segment_bytes(target_segment_bytes);
369+
}
370+
let plans = rt().block_on(Some(py), builder.plan())?.infer_error()?;
371+
plans.into_iter()
372+
.map(|plan| Py::new(py, PyIndexSegmentPlan::from_inner(plan)))
373+
.collect()
374+
}
375+
376+
fn build(
377+
&self,
378+
py: Python<'_>,
379+
plan: &Bound<'_, PyAny>,
380+
) -> PyResult<Py<PyIndexSegment>> {
381+
let plan = plan.extract::<PyRef<'_, PyIndexSegmentPlan>>()?;
382+
let builder = self
383+
.dataset
384+
.create_index_segment_builder(self.staging_index_uuid.clone())
385+
.with_partial_indices(self.partial_indices.clone());
386+
let segment = rt()
387+
.block_on(Some(py), builder.build(&plan.inner))?
388+
.infer_error()?;
389+
Py::new(py, PyIndexSegment::from_inner(segment))
390+
}
391+
392+
fn build_all(&self, py: Python<'_>) -> PyResult<Vec<Py<PyIndexSegment>>> {
393+
let mut builder = self
394+
.dataset
395+
.create_index_segment_builder(self.staging_index_uuid.clone())
396+
.with_partial_indices(self.partial_indices.clone());
397+
if let Some(target_segment_bytes) = self.target_segment_bytes {
398+
builder = builder.with_target_segment_bytes(target_segment_bytes);
399+
}
400+
let segments = rt().block_on(Some(py), builder.build_all())?.infer_error()?;
401+
segments
402+
.into_iter()
403+
.map(|segment| Py::new(py, PyIndexSegment::from_inner(segment)))
404+
.collect()
405+
}
406+
}
407+
326408
impl MergeInsertBuilder {
327409
fn build_stats<'a>(stats: &MergeStats, py: Python<'a>) -> PyResult<Bound<'a, PyDict>> {
328410
let dict = PyDict::new(py);
@@ -2019,6 +2101,35 @@ impl Dataset {
20192101
Ok(PyLance(index_metadata))
20202102
}
20212103

2104+
fn create_index_segment_builder(&self, staging_index_uuid: String) -> PyResult<PyIndexSegmentBuilder> {
2105+
Ok(PyIndexSegmentBuilder {
2106+
dataset: self.ds.clone(),
2107+
staging_index_uuid,
2108+
partial_indices: Vec::new(),
2109+
target_segment_bytes: None,
2110+
})
2111+
}
2112+
2113+
fn commit_existing_index_segments(
2114+
&mut self,
2115+
index_name: &str,
2116+
column: &str,
2117+
segments: Vec<PyRef<'_, PyIndexSegment>>,
2118+
) -> PyResult<()> {
2119+
let mut new_self = self.ds.as_ref().clone();
2120+
let segments = segments
2121+
.into_iter()
2122+
.map(|segment| segment.inner.clone())
2123+
.collect();
2124+
rt().block_on(
2125+
None,
2126+
new_self.commit_existing_index_segments(index_name, column, segments),
2127+
)?
2128+
.infer_error()?;
2129+
self.ds = Arc::new(new_self);
2130+
Ok(())
2131+
}
2132+
20222133
fn drop_index(&mut self, name: &str) -> PyResult<()> {
20232134
let mut new_self = self.ds.as_ref().clone();
20242135
rt().block_on(None, new_self.drop_index(name))?

python/src/indices.rs

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@ use pyo3::{
3232
use lance::index::DatasetIndexInternalExt;
3333

3434
use crate::fragment::FileFragment;
35-
use crate::utils::PyJson;
35+
use crate::utils::{PyJson, PyLance};
3636
use crate::{
3737
dataset::Dataset, error::PythonErrorExt, file::object_store_from_uri_or_path_no_options, rt,
3838
};
3939
use lance::index::vector::ivf::write_ivf_pq_file_from_existing_index;
40-
use lance_index::{DatasetIndexExt, IndexDescription, IndexSegment, IndexType};
40+
use lance_index::{
41+
DatasetIndexExt, IndexDescription, IndexSegment, IndexSegmentPlan, IndexType,
42+
};
4143
use uuid::Uuid;
4244

4345
#[pyclass(name = "IndexConfig", module = "lance.indices", get_all)]
@@ -58,6 +60,93 @@ impl PyIndexConfig {
5860
}
5961
}
6062

63+
#[pyclass(name = "IndexSegment", module = "lance.indices")]
64+
#[derive(Debug, Clone)]
65+
pub struct PyIndexSegment {
66+
pub(crate) inner: IndexSegment,
67+
}
68+
69+
impl PyIndexSegment {
70+
pub(crate) fn from_inner(inner: IndexSegment) -> Self {
71+
Self { inner }
72+
}
73+
}
74+
75+
#[pymethods]
76+
impl PyIndexSegment {
77+
#[getter]
78+
fn uuid(&self) -> String {
79+
self.inner.uuid().to_string()
80+
}
81+
82+
#[getter]
83+
fn fragment_ids(&self) -> HashSet<u32> {
84+
self.inner.fragment_bitmap().iter().collect()
85+
}
86+
87+
#[getter]
88+
fn index_version(&self) -> i32 {
89+
self.inner.index_version()
90+
}
91+
92+
fn __repr__(&self) -> String {
93+
format!(
94+
"IndexSegment(uuid={}, fragment_ids={:?}, index_version={})",
95+
self.uuid(),
96+
self.fragment_ids(),
97+
self.index_version()
98+
)
99+
}
100+
}
101+
102+
#[pyclass(name = "IndexSegmentPlan", module = "lance.indices")]
103+
#[derive(Debug, Clone)]
104+
pub struct PyIndexSegmentPlan {
105+
pub(crate) inner: IndexSegmentPlan,
106+
}
107+
108+
impl PyIndexSegmentPlan {
109+
pub(crate) fn from_inner(inner: IndexSegmentPlan) -> Self {
110+
Self { inner }
111+
}
112+
}
113+
114+
#[pymethods]
115+
impl PyIndexSegmentPlan {
116+
#[getter]
117+
fn staging_index_uuid(&self) -> String {
118+
self.inner.staging_index_uuid().to_string()
119+
}
120+
121+
#[getter]
122+
fn segment(&self) -> PyIndexSegment {
123+
PyIndexSegment::from_inner(self.inner.segment().clone())
124+
}
125+
126+
#[getter]
127+
fn partial_indices(&self) -> Vec<PyLance<lance_table::format::IndexMetadata>> {
128+
self.inner
129+
.partial_indices()
130+
.iter()
131+
.cloned()
132+
.map(PyLance)
133+
.collect()
134+
}
135+
136+
#[getter]
137+
fn estimated_bytes(&self) -> u64 {
138+
self.inner.estimated_bytes()
139+
}
140+
fn __repr__(&self) -> String {
141+
format!(
142+
"IndexSegmentPlan(staging_index_uuid={}, partial_indices={}, estimated_bytes={})",
143+
self.staging_index_uuid(),
144+
self.inner.partial_indices().len(),
145+
self.estimated_bytes()
146+
)
147+
}
148+
}
149+
61150
#[pyclass(name = "IvfModel", module = "lance.indices")]
62151
#[derive(Debug, Clone)]
63152
pub struct PyIvfModel {
@@ -619,6 +708,8 @@ pub fn register_indices(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
619708
indices.add_wrapped(wrap_pyfunction!(load_shuffled_vectors))?;
620709
indices.add_class::<PyIvfModel>()?;
621710
indices.add_class::<PyIndexConfig>()?;
711+
indices.add_class::<PyIndexSegment>()?;
712+
indices.add_class::<PyIndexSegmentPlan>()?;
622713
indices.add_class::<PyIndexDescription>()?;
623714
indices.add_class::<PyIndexSegmentDescription>()?;
624715
indices.add_wrapped(wrap_pyfunction!(get_ivf_model))?;

0 commit comments

Comments
 (0)