Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/lance-jni/src/vector_trainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ fn inner_train_ivf_centroids<'local>(
dim,
metric_type,
&ivf_params,
None,
Arc::new(NoopIndexBuildProgress),
))?;

Expand Down
36 changes: 34 additions & 2 deletions python/python/lance/indices/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def train_ivf(
accelerator: Optional[Union[str, "torch.Device"]] = None,
sample_rate: int = 256,
max_iters: int = 50,
fragment_ids: Optional[list[int]] = None,
) -> IvfModel:
"""
Train IVF centroids for the given vector column.
Expand Down Expand Up @@ -106,8 +107,10 @@ def train_ivf(
some cases, k-means will not converge but will cycle between various
possible minima. In these cases we must terminate or run forever. The
max_iters parameter defines a cutoff at which we terminate training.
fragment_ids: list[int], optional
If provided, train using only the specified fragments from the dataset.
"""
num_rows = self.dataset.count_rows()
num_rows = self._count_rows(fragment_ids)
num_partitions = self._determine_num_partitions(num_partitions, num_rows)
self._verify_ivf_sample_rate(sample_rate, num_partitions, num_rows)
distance_type = self._normalize_distance_type(distance_type)
Expand All @@ -124,9 +127,14 @@ def train_ivf(
distance_type,
sample_rate,
max_iters,
fragment_ids,
)
return IvfModel(ivf_centroids, distance_type)
else:
if fragment_ids is not None:
raise NotImplementedError(
"fragment_ids is not supported with accelerator IVF training"
)
# Use accelerator to train ivf centroids
from lance.vector import train_ivf_centroids_on_accelerator

Expand Down Expand Up @@ -154,6 +162,7 @@ def train_pq(
*,
sample_rate: int = 256,
max_iters: int = 50,
fragment_ids: Optional[list[int]] = None,
) -> PqModel:
"""
Train a PQ model for a given column.
Expand Down Expand Up @@ -184,10 +193,12 @@ def train_pq(
This parameter is used in the same way as in the IVF model.
max_iters: int
This parameter is used in the same way as in the IVF model.
fragment_ids: list[int], optional
If provided, train using only the specified fragments from the dataset.
"""
from lance.lance import indices

num_rows = self.dataset.count_rows()
num_rows = self._count_rows(fragment_ids)
self.dataset.schema.field(self.column[0]).type.list_size
num_subvectors = self._normalize_pq_params(num_subvectors, self.dimension)
self._verify_pq_sample_rate(num_rows, sample_rate)
Expand All @@ -201,6 +212,7 @@ def train_pq(
sample_rate,
max_iters,
ivf_model.centroids,
fragment_ids,
)
return PqModel(num_subvectors, pq_codebook)

Expand All @@ -213,11 +225,17 @@ def prepare_global_ivf_pq(
accelerator: Optional[Union[str, "torch.Device"]] = None,
sample_rate: int = 256,
max_iters: int = 50,
fragment_ids: Optional[list[int]] = None,
) -> dict:
"""
Perform global training for IVF+PQ using existing CPU training paths and
return preprocessed artifacts for distributed builds.

Parameters
----------
fragment_ids: list[int], optional
If provided, train using only the specified fragments from the dataset.

Returns
-------
dict
Expand All @@ -239,6 +257,7 @@ def prepare_global_ivf_pq(
accelerator=accelerator, # None by default (CPU path)
sample_rate=sample_rate,
max_iters=max_iters,
fragment_ids=fragment_ids,
)

# Global PQ training using IVF residuals
Expand All @@ -247,6 +266,7 @@ def prepare_global_ivf_pq(
num_subvectors,
sample_rate=sample_rate,
max_iters=max_iters,
fragment_ids=fragment_ids,
)

return {"ivf_centroids": ivf_model.centroids, "pq_codebook": pq_model.codebook}
Expand Down Expand Up @@ -459,6 +479,18 @@ def _determine_num_partitions(self, num_partitions: Optional[int], num_rows: int
return round(math.sqrt(num_rows))
return num_partitions

def _count_rows(self, fragment_ids: Optional[list[int]] = None) -> int:
if fragment_ids is None:
return self.dataset.count_rows()

num_rows = 0
for fragment_id in fragment_ids:
fragment = self.dataset.get_fragment(fragment_id)
if fragment is None:
raise ValueError(f"Fragment id does not exist: {fragment_id}")
num_rows += fragment.count_rows()
return num_rows

def _normalize_pq_params(self, num_subvectors: int, dimension: int):
if num_subvectors is None:
if dimension % 16 == 0:
Expand Down
2 changes: 2 additions & 0 deletions python/python/lance/lance/indices/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def train_ivf_model(
distance_type: str,
sample_rate: int,
max_iters: int,
fragment_ids: Optional[list[int]] = None,
) -> pa.Array: ...
def train_pq_model(
dataset,
Expand All @@ -56,6 +57,7 @@ def train_pq_model(
sample_rate: int,
max_iters: int,
ivf_model: pa.Array,
fragment_ids: Optional[list[int]] = None,
) -> pa.Array: ...
def transform_vectors(
dataset,
Expand Down
52 changes: 52 additions & 0 deletions python/python/tests/test_indices.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,58 @@ def test_gen_pq(tmpdir, rand_dataset, rand_ivf):
assert pq.codebook == reloaded.codebook


def test_ivf_centroids_fragment_ids(tmpdir):
    """IVF training restricted to one fragment must only see that fragment's rows.

    Two fragments are written with clearly separated vector values (all 0.0
    vs. all 10.0); training a single-partition IVF on each fragment alone
    should recover a centroid near that fragment's constant value.
    """
    rows_per_fragment = 32
    low = np.zeros((rows_per_fragment, DIMENSION), dtype=np.float32)
    high = np.full((rows_per_fragment, DIMENSION), 10.0, dtype=np.float32)
    flat = np.concatenate([low, high], axis=0).reshape(-1)
    table = pa.Table.from_arrays(
        [pa.FixedSizeListArray.from_arrays(flat, DIMENSION)], names=["vectors"]
    )
    # max_rows_per_file forces each constant-valued batch into its own fragment
    ds = lance.write_dataset(
        table,
        pathlib.Path(tmpdir) / "fragment_ivf",
        max_rows_per_file=rows_per_fragment,
    )
    fragment_ids = [frag.fragment_id for frag in ds.get_fragments()]

    first_ivf = IndicesBuilder(ds, "vectors").train_ivf(
        num_partitions=1, sample_rate=2, fragment_ids=[fragment_ids[0]]
    )
    second_ivf = IndicesBuilder(ds, "vectors").train_ivf(
        num_partitions=1, sample_rate=2, fragment_ids=[fragment_ids[1]]
    )

    first = first_ivf.centroids.values.to_numpy().reshape(-1, DIMENSION)[0]
    second = second_ivf.centroids.values.to_numpy().reshape(-1, DIMENSION)[0]

    assert np.allclose(first, 0.0, atol=1e-4)
    assert np.allclose(second, 10.0, atol=1e-4)


def test_pq_fragment_ids(rand_dataset):
    """PQ training accepts a fragment_ids restriction end to end.

    Trains IVF and then PQ on a single fragment of the random dataset and
    checks the resulting model's basic shape parameters.
    """
    frag_id = rand_dataset.get_fragments()[0].fragment_id

    ivf = IndicesBuilder(rand_dataset, "vectors").train_ivf(
        num_partitions=4,
        sample_rate=16,
        fragment_ids=[frag_id],
    )

    pq = IndicesBuilder(rand_dataset, "vectors").train_pq(
        ivf,
        sample_rate=2,
        fragment_ids=[frag_id],
    )

    assert pq.dimension == DIMENSION
    assert pq.num_subvectors == NUM_SUBVECTORS


def test_pq_invalid_sub_vectors(tmpdir, rand_dataset, rand_ivf):
with pytest.raises(
ValueError,
Expand Down
14 changes: 13 additions & 1 deletion python/src/indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use arrow_data::ArrayData;
use chrono::{DateTime, Utc};
use lance::dataset::Dataset as LanceDataset;
use lance::index::vector::ivf::builder::write_vector_storage;
use lance::index::vector::pq::build_pq_model_in_fragments;
use lance::index::{DatasetIndexExt, IndexSegment, IndexSegmentPlan};
use lance::io::ObjectStore;
use lance_index::progress::NoopIndexBuildProgress;
Expand Down Expand Up @@ -198,6 +199,7 @@ fn get_ivf_model(py: Python<'_>, dataset: &Dataset, index_name: &str) -> PyResul
Py::new(py, PyIvfModel { inner: ivf_model })
}

#[allow(clippy::too_many_arguments)]
async fn do_train_ivf_model(
dataset: &Dataset,
column: &str,
Expand All @@ -206,6 +208,7 @@ async fn do_train_ivf_model(
distance_type: &str,
sample_rate: u32,
max_iters: u32,
fragment_ids: Option<Vec<u32>>,
) -> PyResult<ArrayData> {
// We verify distance_type earlier so can unwrap here
let distance_type = DistanceType::try_from(distance_type).unwrap();
Expand All @@ -221,6 +224,7 @@ async fn do_train_ivf_model(
dimension,
distance_type,
&params,
fragment_ids.as_deref(),
Arc::new(NoopIndexBuildProgress),
)
.await
Expand All @@ -231,6 +235,7 @@ async fn do_train_ivf_model(

#[pyfunction]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature=(dataset, column, dimension, num_partitions, distance_type, sample_rate, max_iters, fragment_ids=None))]
fn train_ivf_model<'py>(
py: Python<'py>,
dataset: &Dataset,
Expand All @@ -240,6 +245,7 @@ fn train_ivf_model<'py>(
distance_type: &str,
sample_rate: u32,
max_iters: u32,
fragment_ids: Option<Vec<u32>>,
) -> PyResult<Bound<'py, PyAny>> {
let centroids = rt().block_on(
Some(py),
Expand All @@ -251,6 +257,7 @@ fn train_ivf_model<'py>(
distance_type,
sample_rate,
max_iters,
fragment_ids,
),
)??;
centroids.to_pyarrow(py)
Expand All @@ -266,6 +273,7 @@ async fn do_train_pq_model(
sample_rate: u32,
max_iters: u32,
ivf_model: IvfModel,
fragment_ids: Option<Vec<u32>>,
) -> PyResult<ArrayData> {
// We verify distance_type earlier so can unwrap here
let distance_type = DistanceType::try_from(distance_type).unwrap();
Expand All @@ -276,13 +284,14 @@ async fn do_train_pq_model(
sample_rate: sample_rate as usize,
..Default::default()
};
let pq_model = lance::index::vector::pq::build_pq_model(
let pq_model = build_pq_model_in_fragments(
dataset.ds.as_ref(),
column,
dimension,
distance_type,
&params,
Some(&ivf_model),
fragment_ids.as_deref(),
)
.await
.infer_error()?;
Expand All @@ -291,6 +300,7 @@ async fn do_train_pq_model(

#[pyfunction]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature=(dataset, column, dimension, num_subvectors, distance_type, sample_rate, max_iters, ivf_centroids, fragment_ids=None))]
fn train_pq_model<'py>(
py: Python<'py>,
dataset: &Dataset,
Expand All @@ -301,6 +311,7 @@ fn train_pq_model<'py>(
sample_rate: u32,
max_iters: u32,
ivf_centroids: PyArrowType<ArrayData>,
fragment_ids: Option<Vec<u32>>,
) -> PyResult<Bound<'py, PyAny>> {
let ivf_centroids = ivf_centroids.0;
let ivf_centroids = FixedSizeListArray::from(ivf_centroids);
Expand All @@ -321,6 +332,7 @@ fn train_pq_model<'py>(
sample_rate,
max_iters,
ivf_model,
fragment_ids,
),
)??;
codebook.to_pyarrow(py)
Expand Down
5 changes: 3 additions & 2 deletions rust/lance/src/index/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,9 @@ mod tests {
use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};
use arrow::datatypes::{Float32Type, Int32Type};
use arrow_array::cast::AsArray;
use arrow_array::{FixedSizeListArray, RecordBatchIterator};
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_array::{
FixedSizeListArray, Int32Array, RecordBatch, RecordBatchIterator, StringArray,
};
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use lance_arrow::FixedSizeListArrayExt;
use lance_core::utils::tempfile::TempStrDir;
Expand Down
4 changes: 2 additions & 2 deletions rust/lance/src/index/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,6 @@ pub(crate) async fn build_distributed_vector_index(
stages
)));
};

IvfIndexBuilder::<FlatIndex, ScalarQuantizer>::new(
filtered_dataset,
column.to_owned(),
Expand Down Expand Up @@ -664,7 +663,6 @@ pub(crate) async fn build_distributed_vector_index(
stages
)));
};

IvfIndexBuilder::<HNSW, ScalarQuantizer>::new(
filtered_dataset,
column.to_owned(),
Expand Down Expand Up @@ -2141,6 +2139,7 @@ mod tests {
dim,
MetricType::L2,
&ivf_params,
None,
noop_progress(),
)
.await
Expand Down Expand Up @@ -2193,6 +2192,7 @@ mod tests {
dim,
MetricType::L2,
&ivf_params,
None,
noop_progress(),
)
.await
Expand Down
4 changes: 3 additions & 1 deletion rust/lance/src/index/vector/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
dim,
self.distance_type,
ivf_params,
None,
self.progress.clone(),
)
.await
Expand Down Expand Up @@ -434,7 +435,8 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
sample_size_hint
);
let training_data =
utils::maybe_sample_training_data(dataset, &self.column, sample_size_hint).await?;
utils::maybe_sample_training_data(dataset, &self.column, sample_size_hint, None)
.await?;
info!(
"Finished loading training data in {:02} seconds",
start.elapsed().as_secs_f32()
Expand Down
Loading
Loading