Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/lance-jni/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 16 additions & 4 deletions python/src/indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::{
dataset::Dataset, error::PythonErrorExt, file::object_store_from_uri_or_path_no_options, rt,
};
use lance::index::vector::ivf::write_ivf_pq_file_from_existing_index;
use lance_index::{DatasetIndexExt, IndexDescription};
use lance_index::{DatasetIndexExt, IndexDescription, IndexSegment, IndexType};
use uuid::Uuid;

#[pyclass(name = "IndexConfig", module = "lance.indices", get_all)]
Expand Down Expand Up @@ -416,9 +416,21 @@ async fn do_load_shuffled_vectors(
.infer_error()?;

let mut ds = dataset.ds.as_ref().clone();
ds.commit_existing_index(index_name, column, index_id)
.await
.infer_error()?;
ds.commit_existing_index_segments(
index_name,
column,
vec![IndexSegment::new(
index_id,
ds.fragments().iter().map(|f| f.id as u32),
Arc::new(
prost_types::Any::from_msg(&lance_table::format::pb::VectorIndexDetails::default())
.unwrap(),
),
IndexType::IvfPq.version(),
)],
)
.await
.infer_error()?;

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions rust/lance-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ tracing.workspace = true
tempfile.workspace = true
crossbeam-queue.workspace = true
bytes.workspace = true
chrono.workspace = true
uuid.workspace = true
twox-hash = "2.0"
async-channel = "2.3.1"
Expand Down
2 changes: 2 additions & 0 deletions rust/lance-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ pub mod progress;
pub mod registry;
pub mod scalar;
pub mod traits;
pub mod types;
pub mod vector;

pub use crate::traits::*;
pub use crate::types::IndexSegment;

pub const INDEX_FILE_NAME: &str = "index.idx";
/// The name of the auxiliary index file.
Expand Down
8 changes: 4 additions & 4 deletions rust/lance-index/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use async_trait::async_trait;
use datafusion::execution::SendableRecordBatchStream;
use lance_core::{Error, Result};

use crate::{IndexParams, IndexType, optimize::OptimizeOptions};
use crate::{IndexParams, IndexType, optimize::OptimizeOptions, types::IndexSegment};
use lance_table::format::IndexMetadata;
use uuid::Uuid;

/// A set of criteria used to filter potential indices to use for a query
#[derive(Debug, Default)]
Expand Down Expand Up @@ -275,11 +274,12 @@ pub trait DatasetIndexExt {
/// If the index does not exist, return Error.
async fn index_statistics(&self, index_name: &str) -> Result<String>;

async fn commit_existing_index(
/// Commit one or more existing physical index segments as a logical index.
async fn commit_existing_index_segments(
&mut self,
index_name: &str,
column: &str,
index_id: Uuid,
segments: Vec<IndexSegment>,
) -> Result<()>;

async fn read_index_partition(
Expand Down
75 changes: 75 additions & 0 deletions rust/lance-index/src/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::Arc;

use roaring::RoaringBitmap;
use uuid::Uuid;

/// A single physical segment of a logical index.
///
/// Each segment is stored independently and will become one manifest entry when committed.
/// The logical index identity (name / target column / dataset version) is provided separately
/// by the commit API.
#[derive(Debug, Clone, PartialEq)]
pub struct IndexSegment {
Comment thread
Xuanwo marked this conversation as resolved.
/// Unique ID of the physical segment.
uuid: Uuid,
/// The fragments covered by this segment.
fragment_bitmap: RoaringBitmap,
/// Metadata specific to the index type.
index_details: Arc<prost_types::Any>,
/// The on-disk index version for this segment.
index_version: i32,
}

impl IndexSegment {
    /// Build a segment from its UUID, the fragments it covers, its type-specific details,
    /// and its on-disk index version.
    ///
    /// `fragment_bitmap` may be any iterable of `u32` values; they are collected into a
    /// [`RoaringBitmap`] that the segment then owns.
    pub fn new<I>(
        uuid: Uuid,
        fragment_bitmap: I,
        index_details: Arc<prost_types::Any>,
        index_version: i32,
    ) -> Self
    where
        I: IntoIterator<Item = u32>,
    {
        // Shadow the iterable with the materialized bitmap so field-init shorthand applies.
        let fragment_bitmap: RoaringBitmap = fragment_bitmap.into_iter().collect();
        Self {
            uuid,
            fragment_bitmap,
            index_details,
            index_version,
        }
    }

    /// UUID identifying this physical segment.
    pub fn uuid(&self) -> Uuid {
        self.uuid
    }

    /// Borrow the bitmap of fragments covered by this segment.
    pub fn fragment_bitmap(&self) -> &RoaringBitmap {
        &self.fragment_bitmap
    }

    /// Borrow the shared, serialized index details attached to this segment.
    pub fn index_details(&self) -> &Arc<prost_types::Any> {
        &self.index_details
    }

    /// On-disk index version recorded for this segment.
    pub fn index_version(&self) -> i32 {
        self.index_version
    }

    /// Take the segment apart, yielding `(uuid, fragment_bitmap, index_details, index_version)`
    /// by value.
    pub fn into_parts(self) -> (Uuid, RoaringBitmap, Arc<prost_types::Any>, i32) {
        let Self {
            uuid,
            fragment_bitmap,
            index_details,
            index_version,
        } = self;
        (uuid, fragment_bitmap, index_details, index_version)
    }
}
5 changes: 2 additions & 3 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2698,8 +2698,7 @@ impl Scanner {
let row_addrs = RowAddrTreeMap::from_iter(u64s);
let row_addr_mask = RowAddrMask::from_allowed(row_addrs);
let index_result = IndexExprResult::Exact(row_addr_mask);
let fragments_covered =
RoaringBitmap::from_iter(self.dataset.fragments().iter().map(|f| f.id as u32));
let fragments_covered = self.dataset.fragment_bitmap.as_ref().clone();
let batch = index_result.serialize_to_arrow(&fragments_covered)?;
let stream = futures::stream::once(async move { Ok(batch) });
let stream = Box::pin(RecordBatchStreamAdapter::new(
Expand Down Expand Up @@ -4172,7 +4171,7 @@ impl Scanner {
if let Some(fragments) = &self.fragments {
RoaringBitmap::from_iter(fragments.iter().map(|f| f.id as u32))
} else {
RoaringBitmap::from_iter(self.dataset.fragments().iter().map(|f| f.id as u32))
self.dataset.fragment_bitmap.as_ref().clone()
}
}

Expand Down
Loading
Loading