From 2a30f79936de42fb79cb300118a454b994f9890b Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Fri, 13 Mar 2026 22:30:06 +0800 Subject: [PATCH 1/3] feat(DirectoryNamespace): support index and transaction related operations --- Cargo.lock | 1 + java/lance-jni/Cargo.lock | 1 + python/Cargo.lock | 1 + rust/lance-namespace-impls/Cargo.toml | 1 + rust/lance-namespace-impls/src/dir.rs | 1052 ++++++++++++++++++++++++- 5 files changed, 1049 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 409c2ed915e..a6bce5fb34c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5287,6 +5287,7 @@ dependencies = [ "lance-core", "lance-index", "lance-io", + "lance-linalg", "lance-namespace", "lance-table", "log", diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 0a1eb4f5dc8..f5a78c0fb86 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -3803,6 +3803,7 @@ dependencies = [ "lance-core", "lance-index", "lance-io", + "lance-linalg", "lance-namespace", "lance-table", "log", diff --git a/python/Cargo.lock b/python/Cargo.lock index d4649565a9e..85614d30aef 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -4331,6 +4331,7 @@ dependencies = [ "lance-core", "lance-index", "lance-io", + "lance-linalg", "lance-namespace", "lance-table", "log", diff --git a/rust/lance-namespace-impls/Cargo.toml b/rust/lance-namespace-impls/Cargo.toml index 8c84e1bbe8b..956ca5cd0a4 100644 --- a/rust/lance-namespace-impls/Cargo.toml +++ b/rust/lance-namespace-impls/Cargo.toml @@ -43,6 +43,7 @@ reqwest = { version = "0.12", optional = true, default-features = false, feature url = { workspace = true } lance = { workspace = true } lance-index = { workspace = true } +lance-linalg = { workspace = true } lance-io = { workspace = true } lance-table = { workspace = true } object_store = { workspace = true } diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index a9e5b4cc112..91706857862 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -14,9 +14,18 @@ use async_trait::async_trait; use bytes::Bytes; use futures::TryStreamExt; use lance::dataset::builder::DatasetBuilder; +use lance::dataset::transaction::{Operation, Transaction}; use lance::dataset::{Dataset, WriteParams}; +use lance::index::{IndexParams, vector::VectorIndexParams}; use lance::session::Session; +use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams}; +use lance_index::vector::{ + bq::RQBuildParams, hnsw::builder::HnswBuildParams, ivf::IvfBuildParams, pq::PQBuildParams, + sq::builder::SQBuildParams, +}; +use lance_index::{DatasetIndexExt, IndexType, is_system_index}; use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; +use lance_linalg::distance::MetricType; use lance_table::io::commit::ManifestNamingScheme; use object_store::path::Path; use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, PutMode, PutOptions}; @@ -27,14 +36,18 @@ use std::sync::Arc; use crate::context::DynamicContextProvider; use lance_namespace::models::{ BatchDeleteTableVersionsRequest, BatchDeleteTableVersionsResponse, CreateNamespaceRequest, - CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, CreateTableVersionRequest, + CreateNamespaceResponse, CreateTableIndexRequest, CreateTableIndexResponse, CreateTableRequest, + CreateTableResponse, CreateTableScalarIndexResponse, CreateTableVersionRequest, CreateTableVersionResponse, DeclareTableRequest, DeclareTableResponse, - DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest, - DescribeTableResponse, DescribeTableVersionRequest, DescribeTableVersionResponse, - DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, DropTableResponse, Identity, - ListNamespacesRequest, ListNamespacesResponse, ListTableVersionsRequest, - ListTableVersionsResponse, ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, - TableExistsRequest, TableVersion, + DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableIndexStatsRequest, + DescribeTableIndexStatsResponse, DescribeTableRequest, DescribeTableResponse, + DescribeTableVersionRequest, DescribeTableVersionResponse, DescribeTransactionRequest, + DescribeTransactionResponse, DropNamespaceRequest, DropNamespaceResponse, + DropTableIndexRequest, DropTableIndexResponse, DropTableRequest, DropTableResponse, Identity, + IndexContent, ListNamespacesRequest, ListNamespacesResponse, ListTableIndicesRequest, + ListTableIndicesResponse, ListTableVersionsRequest, ListTableVersionsResponse, + ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, TableExistsRequest, + TableVersion, }; use lance_core::{Error, Result, box_error}; @@ -58,6 +71,35 @@ pub(crate) struct TableStatus { pub(crate) has_reserved_file: bool, } +enum DirectoryIndexParams { + Scalar { + index_type: IndexType, + params: ScalarIndexParams, + }, + Inverted(InvertedIndexParams), + Vector { + index_type: IndexType, + params: VectorIndexParams, + }, +} + +impl DirectoryIndexParams { + fn index_type(&self) -> IndexType { + match self { + Self::Scalar { index_type, .. } | Self::Vector { index_type, .. } => *index_type, + Self::Inverted(_) => IndexType::Inverted, + } + } + + fn params(&self) -> &dyn IndexParams { + match self { + Self::Scalar { params, .. } => params, + Self::Inverted(params) => params, + Self::Vector { params, .. } => params, + } + } +} + /// Builder for creating a DirectoryNamespace. /// /// This builder provides a fluent API for configuring and establishing @@ -707,6 +749,380 @@ impl DirectoryNamespace { }) } + async fn load_dataset( + &self, + table_uri: &str, + version: Option, + operation: &str, + ) -> Result { + let mut builder = DatasetBuilder::from_uri(table_uri); + if let Some(opts) = &self.storage_options { + builder = builder.with_storage_options(opts.clone()); + } + if let Some(sess) = &self.session { + builder = builder.with_session(sess.clone()); + } + + let dataset = builder.load().await.map_err(|e| { + Error::namespace_source( + format!( + "Failed to open table at '{}' for {}: {}", + table_uri, operation, e + ) + .into(), + ) + })?; + + if let Some(version) = version { + if version < 0 { + return Err(Error::invalid_input_source( + format!( + "Table version for {} must be non-negative, got {}", + operation, version + ) + .into(), + )); + } + return dataset.checkout_version(version as u64).await.map_err(|e| { + Error::namespace_source( + format!( + "Failed to checkout version {} for table at '{}' during {}: {}", + version, table_uri, operation, e + ) + .into(), + ) + }); + } + + Ok(dataset) + } + + fn parse_index_type(index_type: &str) -> Result { + match index_type.trim().to_ascii_uppercase().as_str() { + "SCALAR" | "BTREE" => Ok(IndexType::BTree), + "BITMAP" => Ok(IndexType::Bitmap), + "LABEL_LIST" | "LABELLIST" => Ok(IndexType::LabelList), + "INVERTED" | "FTS" => Ok(IndexType::Inverted), + "NGRAM" => Ok(IndexType::NGram), + "ZONEMAP" | "ZONE_MAP" => Ok(IndexType::ZoneMap), + "BLOOMFILTER" | "BLOOM_FILTER" => Ok(IndexType::BloomFilter), + "RTREE" | "R_TREE" => Ok(IndexType::RTree), + "VECTOR" | "IVF_PQ" => Ok(IndexType::IvfPq), + "IVF_FLAT" => Ok(IndexType::IvfFlat), + "IVF_SQ" => Ok(IndexType::IvfSq), + "IVF_RQ" => Ok(IndexType::IvfRq), + "IVF_HNSW_FLAT" => Ok(IndexType::IvfHnswFlat), + "IVF_HNSW_SQ" => Ok(IndexType::IvfHnswSq), + "IVF_HNSW_PQ" => Ok(IndexType::IvfHnswPq), + other => Err(Error::invalid_input_source( + format!("Unsupported index_type '{}'", other).into(), + )), + } + } + + fn parse_metric_type(distance_type: Option<&str>) -> Result { + let distance_type = distance_type.unwrap_or("l2"); + MetricType::try_from(distance_type).map_err(|e| { + Error::invalid_input_source( + format!( + "Unsupported distance_type '{}' for vector index: {}", + distance_type, e + ) + .into(), + ) + }) + } + + fn build_index_params(request: &CreateTableIndexRequest) -> Result { + let index_type = Self::parse_index_type(&request.index_type)?; + Ok(match index_type { + IndexType::BTree => DirectoryIndexParams::Scalar { + index_type, + params: ScalarIndexParams::for_builtin(BuiltinIndexType::BTree), + }, + IndexType::Bitmap => DirectoryIndexParams::Scalar { + index_type, + params: ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap), + }, + IndexType::LabelList => DirectoryIndexParams::Scalar { + index_type, + params: ScalarIndexParams::for_builtin(BuiltinIndexType::LabelList), + }, + IndexType::NGram => DirectoryIndexParams::Scalar { + index_type, + params: ScalarIndexParams::for_builtin(BuiltinIndexType::NGram), + }, + IndexType::ZoneMap => DirectoryIndexParams::Scalar { + index_type, + params: ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap), + }, + IndexType::BloomFilter => DirectoryIndexParams::Scalar { + index_type, + params: ScalarIndexParams::for_builtin(BuiltinIndexType::BloomFilter), + }, + IndexType::RTree => DirectoryIndexParams::Scalar { + index_type, + params: ScalarIndexParams::for_builtin(BuiltinIndexType::RTree), + }, + IndexType::Inverted => { + let mut params = InvertedIndexParams::default(); + if let Some(with_position) = request.with_position { + params = params.with_position(with_position); + } + if let Some(base_tokenizer) = &request.base_tokenizer { + params = params.base_tokenizer(base_tokenizer.clone()); + } + if let Some(language) = &request.language { + params = params.language(language)?; + } + if let Some(max_token_length) = request.max_token_length { + if max_token_length < 0 { + return Err(Error::invalid_input_source( + format!( + "FTS max_token_length must be non-negative, got {}", + max_token_length + ) + .into(), + )); + } + params = params.max_token_length(Some(max_token_length as usize)); + } + if let Some(lower_case) = request.lower_case { + params = params.lower_case(lower_case); + } + if let Some(stem) = request.stem { + params = params.stem(stem); + } + if let Some(remove_stop_words) = request.remove_stop_words { + params = params.remove_stop_words(remove_stop_words); + } + if let Some(ascii_folding) = request.ascii_folding { + params = params.ascii_folding(ascii_folding); + } + DirectoryIndexParams::Inverted(params) + } + IndexType::IvfFlat => DirectoryIndexParams::Vector { + index_type, + params: VectorIndexParams::ivf_flat( + 1, + Self::parse_metric_type(request.distance_type.as_deref())?, + ), + }, + IndexType::IvfPq => DirectoryIndexParams::Vector { + index_type, + params: VectorIndexParams::ivf_pq( + 1, + 8, + 1, + Self::parse_metric_type(request.distance_type.as_deref())?, + 50, + ), + }, + IndexType::IvfSq => DirectoryIndexParams::Vector { + index_type, + params: VectorIndexParams::with_ivf_sq_params( + Self::parse_metric_type(request.distance_type.as_deref())?, + IvfBuildParams::new(1), + SQBuildParams::default(), + ), + }, + IndexType::IvfRq => DirectoryIndexParams::Vector { + index_type, + params: VectorIndexParams::with_ivf_rq_params( + Self::parse_metric_type(request.distance_type.as_deref())?, + IvfBuildParams::new(1), + RQBuildParams::default(), + ), + }, + IndexType::IvfHnswFlat => DirectoryIndexParams::Vector { + index_type, + params: VectorIndexParams::ivf_hnsw( + Self::parse_metric_type(request.distance_type.as_deref())?, + IvfBuildParams::new(1), + HnswBuildParams::default(), + ), + }, + IndexType::IvfHnswSq => DirectoryIndexParams::Vector { + index_type, + params: VectorIndexParams::with_ivf_hnsw_sq_params( + Self::parse_metric_type(request.distance_type.as_deref())?, + IvfBuildParams::new(1), + HnswBuildParams::default(), + SQBuildParams::default(), + ), + }, + IndexType::IvfHnswPq => DirectoryIndexParams::Vector { + index_type, + params: VectorIndexParams::with_ivf_hnsw_pq_params( + Self::parse_metric_type(request.distance_type.as_deref())?, + IvfBuildParams::new(1), + HnswBuildParams::default(), + PQBuildParams::new(1, 8), + ), + }, + other => { + return Err(Error::invalid_input_source( + format!("Unsupported index type for namespace API: {}", other).into(), + )); + } + }) + } + + fn paginate_indices( + indices: &mut Vec, + page_token: Option, + limit: Option, + ) { + indices.sort_by(|a, b| a.index_name.cmp(&b.index_name)); + + if let Some(start_after) = page_token { + if let Some(index) = indices + .iter() + .position(|index| index.index_name.as_str() > start_after.as_str()) + { + indices.drain(0..index); + } else { + indices.clear(); + } + } + + if let Some(limit) = limit + && limit >= 0 + { + indices.truncate(limit as usize); + } + } + + fn stats_field_as_i64(stats: &serde_json::Value, key: &str) -> Option { + stats + .get(key) + .and_then(|value| value.as_i64().or_else(|| value.as_u64().map(|v| v as i64))) + } + + fn stats_field_as_i32(stats: &serde_json::Value, key: &str) -> Option { + Self::stats_field_as_i64(stats, key).and_then(|value| i32::try_from(value).ok()) + } + + fn stats_field_as_string(stats: &serde_json::Value, key: &str) -> Option { + stats + .get(key) + .and_then(|value| value.as_str()) + .map(str::to_string) + } + + fn stats_distance_type(stats: &serde_json::Value) -> Option { + Self::stats_field_as_string(stats, "distance_type") + .or_else(|| Self::stats_field_as_string(stats, "metric_type")) + .or_else(|| { + stats + .get("indices") + .and_then(|value| value.as_array()) + .and_then(|indices| indices.first()) + .and_then(|first| { + first + .get("distance_type") + .and_then(|value| value.as_str()) + .or_else(|| first.get("metric_type").and_then(|value| value.as_str())) + }) + .map(str::to_string) + }) + } + + fn transaction_operation_name(transaction: &Transaction) -> String { + match &transaction.operation { + Operation::CreateIndex { + new_indices, + removed_indices, + } if new_indices.is_empty() && !removed_indices.is_empty() => "DropIndex".to_string(), + _ => transaction.operation.to_string(), + } + } + + fn transaction_response( + version: u64, + transaction: &Transaction, + ) -> DescribeTransactionResponse { + let mut properties = transaction + .transaction_properties + .as_ref() + .map(|properties| (**properties).clone()) + .unwrap_or_default(); + properties.insert("uuid".to_string(), transaction.uuid.clone()); + properties.insert("version".to_string(), version.to_string()); + properties.insert( + "read_version".to_string(), + transaction.read_version.to_string(), + ); + properties.insert( + "operation".to_string(), + Self::transaction_operation_name(transaction), + ); + if let Some(tag) = &transaction.tag { + properties.insert("tag".to_string(), tag.clone()); + } + + DescribeTransactionResponse { + status: "SUCCEEDED".to_string(), + properties: Some(properties), + } + } + + async fn find_transaction( + &self, + dataset: &Dataset, + transaction_id: &str, + ) -> Result<(u64, Transaction)> { + if let Ok(version) = transaction_id.parse::() { + let transaction = dataset + .read_transaction_by_version(version) + .await + .map_err(|e| { + Error::namespace_source( + format!("Failed to read transaction for version {}: {}", version, e).into(), + ) + })? + .ok_or_else(|| { + Error::namespace_source( + format!("Transaction not found for version {}", version).into(), + ) + })?; + return Ok((version, transaction)); + } + + let versions = dataset.versions().await.map_err(|e| { + Error::namespace_source( + format!( + "Failed to list table versions while resolving transaction '{}': {}", + transaction_id, e + ) + .into(), + ) + })?; + + for version in versions.into_iter().rev() { + if let Some(transaction) = dataset + .read_transaction_by_version(version.version) + .await + .map_err(|e| { + Error::namespace_source( + format!( + "Failed to read transaction for version {} while resolving '{}': {}", + version.version, transaction_id, e + ) + .into(), + ) + })? + && transaction.uuid == transaction_id + { + return Ok((version.version, transaction)); + } + } + + Err(Error::namespace_source( + format!("Transaction not found: {}", transaction_id).into(), + )) + } + fn table_full_uri(&self, table_name: &str) -> String { format!("{}/{}.lance", &self.root, table_name) } @@ -1809,6 +2225,297 @@ impl LanceNamespace for DirectoryNamespace { }) } + async fn create_table_index( + &self, + request: CreateTableIndexRequest, + ) -> Result { + let table_uri = self.resolve_table_location(&request.id).await?; + let mut dataset = self + .load_dataset(&table_uri, None, "create_table_index") + .await?; + let index_request = Self::build_index_params(&request)?; + + dataset + .create_index( + &[request.column.as_str()], + index_request.index_type(), + request.name.clone(), + index_request.params(), + false, + ) + .await + .map_err(|e| { + Error::namespace_source( + format!( + "Failed to create {} index '{}' on column '{}' for table '{}': {}", + request.index_type, + request.name.as_deref().unwrap_or(""), + request.column, + table_uri, + e + ) + .into(), + ) + })?; + + let transaction_id = dataset + .read_transaction() + .await + .map_err(|e| { + Error::namespace_source( + format!( + "Failed to read committed transaction after creating index on '{}': {}", + table_uri, e + ) + .into(), + ) + })? + .map(|transaction| transaction.uuid); + + Ok(CreateTableIndexResponse { transaction_id }) + } + + async fn list_table_indices( + &self, + request: ListTableIndicesRequest, + ) -> Result { + let table_uri = self.resolve_table_location(&request.id).await?; + let dataset = self + .load_dataset(&table_uri, request.version, "list_table_indices") + .await?; + let mut indices = dataset + .describe_indices(None) + .await + .map_err(|e| { + Error::namespace_source( + format!("Failed to describe table indices for '{}': {}", table_uri, e).into(), + ) + })? + .into_iter() + .filter(|description| { + description + .metadata() + .first() + .map(|metadata| !is_system_index(metadata)) + .unwrap_or(false) + }) + .map(|description| { + let columns = description + .field_ids() + .iter() + .map(|field_id| { + dataset + .schema() + .field_path(i32::try_from(*field_id).map_err(|e| { + Error::namespace_source( + format!( + "Field id {} does not fit in i32 for table '{}': {}", + field_id, table_uri, e + ) + .into(), + ) + })?) + .map_err(|e| { + Error::namespace_source( + format!( + "Failed to resolve field path for field_id {} in table '{}': {}", + field_id, table_uri, e + ) + .into(), + ) + }) + }) + .collect::>>()?; + + Ok(IndexContent { + index_name: description.name().to_string(), + index_uuid: description.metadata()[0].uuid.to_string(), + columns, + status: "SUCCEEDED".to_string(), + }) + }) + .collect::>>()?; + + Self::paginate_indices(&mut indices, request.page_token, request.limit); + Ok(ListTableIndicesResponse { + indexes: indices, + page_token: None, + }) + } + + async fn describe_table_index_stats( + &self, + request: DescribeTableIndexStatsRequest, + ) -> Result { + let table_uri = self.resolve_table_location(&request.id).await?; + let dataset = self + .load_dataset(&table_uri, request.version, "describe_table_index_stats") + .await?; + let index_name = request.index_name.as_deref().ok_or_else(|| { + Error::invalid_input_source( + "Index name is required for describe_table_index_stats".into(), + ) + })?; + let metadatas = dataset + .load_indices_by_name(index_name) + .await + .map_err(|e| { + Error::namespace_source( + format!( + "Failed to load index '{}' metadata for table '{}': {}", + index_name, table_uri, e + ) + .into(), + ) + })?; + if metadatas.first().is_some_and(is_system_index) { + return Err(Error::not_supported_source( + format!("System index '{}' is not exposed by this API", index_name).into(), + )); + } + + let stats = + ::index_statistics(&dataset, index_name) + .await + .map_err(|e| { + Error::namespace_source( + format!( + "Failed to describe index statistics for '{}' on table '{}': {}", + index_name, table_uri, e + ) + .into(), + ) + })?; + let stats: serde_json::Value = serde_json::from_str(&stats).map_err(|e| { + Error::namespace_source( + format!( + "Failed to parse index statistics for '{}' on table '{}': {}", + index_name, table_uri, e + ) + .into(), + ) + })?; + + Ok(DescribeTableIndexStatsResponse { + distance_type: Self::stats_distance_type(&stats), + index_type: Self::stats_field_as_string(&stats, "index_type"), + num_indexed_rows: Self::stats_field_as_i64(&stats, "num_indexed_rows"), + num_unindexed_rows: Self::stats_field_as_i64(&stats, "num_unindexed_rows"), + num_indices: Self::stats_field_as_i32(&stats, "num_indices"), + }) + } + + async fn describe_transaction( + &self, + request: DescribeTransactionRequest, + ) -> Result { + let mut request_id = request.id.ok_or_else(|| { + Error::invalid_input_source( + "Transaction id must include table id and transaction identifier".into(), + ) + })?; + if request_id.len() < 2 { + return Err(Error::invalid_input_source( + format!( + "Transaction request id must include table id and transaction identifier, got {:?}", + request_id + ) + .into(), + )); + } + + let transaction_id = request_id.pop().expect("request_id len checked above"); + let table_id = Some(request_id); + let table_uri = self.resolve_table_location(&table_id).await?; + let dataset = self + .load_dataset(&table_uri, None, "describe_transaction") + .await?; + let (version, transaction) = self.find_transaction(&dataset, &transaction_id).await?; + + Ok(Self::transaction_response(version, &transaction)) + } + + async fn create_table_scalar_index( + &self, + request: CreateTableIndexRequest, + ) -> Result { + let index_type = Self::parse_index_type(&request.index_type)?; + if !index_type.is_scalar() { + return Err(Error::invalid_input_source( + format!( + "create_table_scalar_index only supports scalar index types, got {}", + request.index_type + ) + .into(), + )); + } + + let response = self.create_table_index(request).await?; + Ok(CreateTableScalarIndexResponse { + transaction_id: response.transaction_id, + }) + } + + async fn drop_table_index( + &self, + request: DropTableIndexRequest, + ) -> Result { + let table_uri = self.resolve_table_location(&request.id).await?; + let index_name = request.index_name.as_deref().ok_or_else(|| { + Error::invalid_input_source("Index name is required for drop_table_index".into()) + })?; + let mut dataset = self + .load_dataset(&table_uri, None, "drop_table_index") + .await?; + let metadatas = dataset + .load_indices_by_name(index_name) + .await + .map_err(|e| { + Error::namespace_source( + format!( + "Failed to load index '{}' before dropping it from table '{}': {}", + index_name, table_uri, e + ) + .into(), + ) + })?; + if metadatas.first().is_some_and(is_system_index) { + return Err(Error::not_supported_source( + format!( + "System index '{}' cannot be dropped via this API", + index_name + ) + .into(), + )); + } + + dataset.drop_index(index_name).await.map_err(|e| { + Error::namespace_source( + format!( + "Failed to drop index '{}' from table '{}': {}", + index_name, table_uri, e + ) + .into(), + ) + })?; + + let transaction_id = dataset + .read_transaction() + .await + .map_err(|e| { + Error::namespace_source( + format!( + "Failed to read committed transaction after dropping index '{}' from '{}': {}", + index_name, table_uri, e + ) + .into(), + ) + })? + .map(|transaction| transaction.uuid); + + Ok(DropTableIndexResponse { transaction_id }) + } + fn namespace_id(&self) -> String { format!("DirectoryNamespace {{ root: {:?} }}", self.root) } @@ -1820,6 +2527,7 @@ mod tests { use arrow_ipc::reader::StreamReader; use lance::dataset::Dataset; use lance_core::utils::tempfile::{TempStdDir, TempStrDir}; + use lance_index::DatasetIndexExt; use lance_namespace::models::{ CreateTableRequest, JsonArrowDataType, JsonArrowField, JsonArrowSchema, ListTablesRequest, }; @@ -1854,6 +2562,23 @@ mod tests { buffer } + fn create_ipc_data_from_batches( + schema: Arc, + batches: Vec, + ) -> Vec { + use arrow::ipc::writer::StreamWriter; + + let mut buffer = Vec::new(); + { + let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap(); + for batch in &batches { + writer.write(batch).unwrap(); + } + writer.finish().unwrap(); + } + buffer + } + /// Helper to create a simple test schema fn create_test_schema() -> JsonArrowSchema { let int_type = JsonArrowDataType::new("int32".to_string()); @@ -1879,6 +2604,107 @@ mod tests { } } + fn create_scalar_table_ipc_data() -> Vec { + use arrow::array::{Int32Array, StringArray}; + use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + let batch = arrow::record_batch::RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["alice", "bob", "cory"])), + ], + ) + .unwrap(); + create_ipc_data_from_batches(schema, vec![batch]) + } + + fn create_vector_table_ipc_data() -> Vec { + use arrow::array::{FixedSizeListArray, Float32Array, Int32Array}; + use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "vector", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 2), + true, + ), + ])); + let vector_field = Arc::new(Field::new("item", DataType::Float32, true)); + let vectors = FixedSizeListArray::try_new( + vector_field.clone(), + 2, + Arc::new(Float32Array::from(vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6])), + None, + ) + .unwrap(); + let batch = arrow::record_batch::RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3])), Arc::new(vectors)], + ) + .unwrap(); + create_ipc_data_from_batches(schema, vec![batch]) + } + + async fn create_scalar_table(namespace: &DirectoryNamespace, table_name: &str) { + let mut create_table_request = CreateTableRequest::new(); + create_table_request.id = Some(vec![table_name.to_string()]); + namespace + .create_table( + create_table_request, + Bytes::from(create_scalar_table_ipc_data()), + ) + .await + .unwrap(); + } + + async fn create_vector_table(namespace: &DirectoryNamespace, table_name: &str) { + let mut create_table_request = CreateTableRequest::new(); + create_table_request.id = Some(vec![table_name.to_string()]); + namespace + .create_table( + create_table_request, + Bytes::from(create_vector_table_ipc_data()), + ) + .await + .unwrap(); + } + + async fn open_dataset(namespace: &DirectoryNamespace, table_name: &str) -> Dataset { + let mut describe_request = DescribeTableRequest::new(); + describe_request.id = Some(vec![table_name.to_string()]); + let table_uri = namespace + .describe_table(describe_request) + .await + .unwrap() + .location + .expect("table location should exist"); + Dataset::open(&table_uri).await.unwrap() + } + + async fn create_scalar_index( + namespace: &DirectoryNamespace, + table_name: &str, + index_name: &str, + ) -> Option { + use lance_namespace::models::CreateTableIndexRequest; + + let mut create_index_request = + CreateTableIndexRequest::new("id".to_string(), "BTREE".to_string()); + create_index_request.id = Some(vec![table_name.to_string()]); + create_index_request.name = Some(index_name.to_string()); + namespace + .create_table_scalar_index(create_index_request) + .await + .unwrap() + .transaction_id + } + #[tokio::test] async fn test_create_table() { let (namespace, _temp_dir) = create_test_namespace().await; @@ -2021,6 +2847,218 @@ mod tests { ); } + #[tokio::test] + async fn test_create_scalar_index() { + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "users").await; + + let transaction_id = create_scalar_index(&namespace, "users", "users_id_idx").await; + let dataset = open_dataset(&namespace, "users").await; + let expected_transaction_id = dataset + .read_transaction() + .await + .unwrap() + .map(|transaction| transaction.uuid); + assert_eq!(transaction_id, expected_transaction_id); + let indices = dataset.load_indices().await.unwrap(); + assert!(indices.iter().any(|index| index.name == "users_id_idx")); + } + + #[tokio::test] + async fn test_create_vector_index() { + use lance_namespace::models::CreateTableIndexRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_vector_table(&namespace, "vectors").await; + + let mut create_index_request = + CreateTableIndexRequest::new("vector".to_string(), "IVF_FLAT".to_string()); + create_index_request.id = Some(vec!["vectors".to_string()]); + create_index_request.name = Some("vector_idx".to_string()); + create_index_request.distance_type = Some("l2".to_string()); + let transaction_id = namespace + .create_table_index(create_index_request) + .await + .unwrap() + .transaction_id; + + let dataset = open_dataset(&namespace, "vectors").await; + let expected_transaction_id = dataset + .read_transaction() + .await + .unwrap() + .map(|transaction| transaction.uuid); + assert_eq!(transaction_id, expected_transaction_id); + let indices = dataset.load_indices().await.unwrap(); + assert!(indices.iter().any(|index| index.name == "vector_idx")); + } + + #[tokio::test] + async fn test_list_table_indices() { + use lance_namespace::models::ListTableIndicesRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "users").await; + let transaction_id = create_scalar_index(&namespace, "users", "users_id_idx").await; + + let response = namespace + .list_table_indices(ListTableIndicesRequest { + id: Some(vec!["users".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(response.indexes.len(), 1); + assert_eq!(response.indexes[0].index_name, "users_id_idx"); + assert_eq!(response.indexes[0].columns, vec!["id"]); + assert_eq!(response.indexes[0].status, "SUCCEEDED"); + + let dataset = open_dataset(&namespace, "users").await; + let expected_transaction_id = dataset + .read_transaction() + .await + .unwrap() + .map(|transaction| transaction.uuid); + assert_eq!(transaction_id, expected_transaction_id); + let indices = dataset.load_indices().await.unwrap(); + assert_eq!( + indices + .iter() + .filter(|index| index.name == "users_id_idx") + .count(), + 1 + ); + } + + #[tokio::test] + async fn test_describe_table_index_stats() { + use lance_namespace::models::DescribeTableIndexStatsRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "users").await; + let transaction_id = create_scalar_index(&namespace, "users", "users_id_idx").await; + + let response = namespace + .describe_table_index_stats(DescribeTableIndexStatsRequest { + id: Some(vec!["users".to_string()]), + index_name: Some("users_id_idx".to_string()), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(response.index_type, Some("BTree".to_string())); + assert_eq!(response.num_indices, Some(1)); + assert_eq!(response.num_indexed_rows, Some(3)); + assert_eq!(response.num_unindexed_rows, Some(0)); + + let dataset = open_dataset(&namespace, "users").await; + let expected_transaction_id = dataset + .read_transaction() + .await + .unwrap() + .map(|transaction| transaction.uuid); + assert_eq!(transaction_id, expected_transaction_id); + let stats: serde_json::Value = + serde_json::from_str(&dataset.index_statistics("users_id_idx").await.unwrap()).unwrap(); + assert_eq!(stats["index_type"], "BTree"); + assert_eq!(stats["num_indices"], 1); + assert_eq!(stats["num_indexed_rows"], 3); + assert_eq!(stats["num_unindexed_rows"], 0); + } + + #[tokio::test] + async fn test_describe_transaction() { + use lance_namespace::models::DescribeTransactionRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "users").await; + let transaction_id = create_scalar_index(&namespace, "users", "users_id_idx").await; + let dataset = open_dataset(&namespace, "users").await; + let latest_transaction = dataset.read_transaction().await.unwrap(); + assert_eq!( + transaction_id, + latest_transaction + .as_ref() + .map(|transaction| transaction.uuid.clone()) + ); + + if let Some(transaction_id) = transaction_id { + let response = namespace + .describe_transaction(DescribeTransactionRequest { + id: Some(vec!["users".to_string(), transaction_id.clone()]), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(response.status, "SUCCEEDED"); + assert_eq!( + response + .properties + .as_ref() + .and_then(|props| props.get("operation")), + Some(&"CreateIndex".to_string()) + ); + assert_eq!( + response + .properties + .as_ref() + .and_then(|props| props.get("uuid")), + Some(&transaction_id) + ); + } else { + assert!(latest_transaction.is_none()); + } + } + + #[tokio::test] + async fn test_drop_table_index() { + use lance_namespace::models::{DropTableIndexRequest, ListTableIndicesRequest}; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "users").await; + let create_transaction_id = create_scalar_index(&namespace, "users", "users_id_idx").await; + + let drop_transaction_id = namespace + .drop_table_index(DropTableIndexRequest { + id: Some(vec!["users".to_string()]), + index_name: Some("users_id_idx".to_string()), + ..Default::default() + }) + .await + .unwrap() + .transaction_id; + + let dataset = open_dataset(&namespace, "users").await; + let previous_dataset = dataset + .checkout_version(dataset.version().version - 1) + .await + .unwrap(); + let previous_transaction_id = previous_dataset + .read_transaction() + .await + .unwrap() + .map(|transaction| transaction.uuid); + assert_eq!(create_transaction_id, previous_transaction_id); + let expected_drop_transaction_id = dataset + .read_transaction() + .await + .unwrap() + .map(|transaction| transaction.uuid); + assert_eq!(drop_transaction_id, expected_drop_transaction_id); + let indices = dataset.load_indices().await.unwrap(); + assert!(!indices.iter().any(|index| index.name == "users_id_idx")); + + let list_response = namespace + .list_table_indices(ListTableIndicesRequest { + id: Some(vec!["users".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + assert!(list_response.indexes.is_empty()); + } + #[tokio::test] async fn test_describe_table() { let (namespace, _temp_dir) = create_test_namespace().await; From c68bb5da876d1dfe971f20a54707e3efffc055e1 Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Fri, 13 Mar 2026 23:49:03 +0800 Subject: [PATCH 2/3] code review --- rust/lance-namespace-impls/src/dir.rs | 140 ++++++++++++-------------- 1 file changed, 63 insertions(+), 77 deletions(-) diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 91706857862..bab25ea243f 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -755,6 +755,18 @@ impl DirectoryNamespace { version: Option, operation: &str, ) -> Result { + if let Some(version) = version + && version < 0 + { + return Err(Error::invalid_input_source( + format!( + "Table version for {} must be non-negative, got {}", + operation, version + ) + .into(), + )); + } + let mut builder = DatasetBuilder::from_uri(table_uri); if let Some(opts) = &self.storage_options { builder = builder.with_storage_options(opts.clone()); @@ -774,15 +786,6 @@ impl DirectoryNamespace { })?; if let Some(version) = version { - if version < 0 { - return Err(Error::invalid_input_source( - format!( - "Table version for {} must be non-negative, got {}", - operation, version - ) - .into(), - )); - } return dataset.checkout_version(version as u64).await.map_err(|e| { Error::namespace_source( format!( @@ -903,26 +906,24 @@ impl DirectoryNamespace { } IndexType::IvfFlat => DirectoryIndexParams::Vector { index_type, - params: VectorIndexParams::ivf_flat( - 1, + params: VectorIndexParams::with_ivf_flat_params( Self::parse_metric_type(request.distance_type.as_deref())?, + IvfBuildParams::default(), ), }, IndexType::IvfPq => DirectoryIndexParams::Vector { index_type, - params: VectorIndexParams::ivf_pq( - 1, - 8, - 1, + params: VectorIndexParams::with_ivf_pq_params( Self::parse_metric_type(request.distance_type.as_deref())?, - 50, + IvfBuildParams::default(), + PQBuildParams::default(), ), }, IndexType::IvfSq => DirectoryIndexParams::Vector { index_type, params: VectorIndexParams::with_ivf_sq_params( Self::parse_metric_type(request.distance_type.as_deref())?, - IvfBuildParams::new(1), + IvfBuildParams::default(), SQBuildParams::default(), ), }, @@ -930,7 +931,7 @@ impl DirectoryNamespace { index_type, params: VectorIndexParams::with_ivf_rq_params( Self::parse_metric_type(request.distance_type.as_deref())?, - IvfBuildParams::new(1), + IvfBuildParams::default(), RQBuildParams::default(), ), }, @@ -938,7 +939,7 @@ impl DirectoryNamespace { index_type, params: VectorIndexParams::ivf_hnsw( Self::parse_metric_type(request.distance_type.as_deref())?, - IvfBuildParams::new(1), + IvfBuildParams::default(), HnswBuildParams::default(), ), }, @@ -946,7 +947,7 @@ impl DirectoryNamespace { index_type, params: VectorIndexParams::with_ivf_hnsw_sq_params( Self::parse_metric_type(request.distance_type.as_deref())?, - IvfBuildParams::new(1), + IvfBuildParams::default(), HnswBuildParams::default(), SQBuildParams::default(), ), @@ -955,9 +956,9 @@ impl DirectoryNamespace { index_type, params: VectorIndexParams::with_ivf_hnsw_pq_params( Self::parse_metric_type(request.distance_type.as_deref())?, - IvfBuildParams::new(1), + IvfBuildParams::default(), HnswBuildParams::default(), - PQBuildParams::new(1, 8), + PQBuildParams::default(), ), }, other => { @@ -993,41 +994,6 @@ impl DirectoryNamespace { } } - fn stats_field_as_i64(stats: &serde_json::Value, key: &str) -> Option { - stats - .get(key) - .and_then(|value| value.as_i64().or_else(|| value.as_u64().map(|v| v as i64))) - } - - fn stats_field_as_i32(stats: &serde_json::Value, key: &str) -> Option { - Self::stats_field_as_i64(stats, key).and_then(|value| i32::try_from(value).ok()) - } - - fn stats_field_as_string(stats: &serde_json::Value, key: &str) -> Option { - stats - .get(key) - .and_then(|value| value.as_str()) - .map(str::to_string) - } - - fn stats_distance_type(stats: &serde_json::Value) -> Option { - Self::stats_field_as_string(stats, "distance_type") - .or_else(|| Self::stats_field_as_string(stats, "metric_type")) - .or_else(|| { - stats - .get("indices") - .and_then(|value| value.as_array()) - .and_then(|indices| indices.first()) - .and_then(|first| { - first - .get("distance_type") - .and_then(|value| value.as_str()) - .or_else(|| first.get("metric_type").and_then(|value| value.as_str())) - }) - .map(str::to_string) - }) - } - fn transaction_operation_name(transaction: &Transaction) -> String { match &transaction.operation { Operation::CreateIndex { @@ -1067,12 +1033,38 @@ impl DirectoryNamespace { } } - async fn find_transaction( - &self, - dataset: &Dataset, - transaction_id: &str, - ) -> Result<(u64, Transaction)> { - if let Ok(version) = transaction_id.parse::() { + fn describe_table_index_stats_response( + stats: &serde_json::Value, + ) -> DescribeTableIndexStatsResponse { + let get_i64 = |key: &str| { + stats.get(key).and_then(|value| { + value + .as_i64() + .or_else(|| value.as_u64().and_then(|v| i64::try_from(v).ok())) + }) + }; + + DescribeTableIndexStatsResponse { + distance_type: stats + .get("distance_type") + .and_then(|value| value.as_str()) + .map(str::to_string), + index_type: stats + .get("index_type") + .and_then(|value| value.as_str()) + .map(str::to_string), + num_indexed_rows: get_i64("num_indexed_rows"), + num_unindexed_rows: get_i64("num_unindexed_rows"), + num_indices: get_i64("num_indices").and_then(|value| i32::try_from(value).ok()), + } + } + + /// When transaction_id is not parseable as a version number (i.e. it's a UUID), + /// find_transaction iterates through every version in reverse, reading each + /// transaction file from storage. For tables with many versions this will + /// be extremely slow — each iteration is a separate I/O call. + async fn find_transaction(&self, dataset: &Dataset, id: &str) -> Result<(u64, Transaction)> { + if let Ok(version) = id.parse::() { let transaction = dataset .read_transaction_by_version(version) .await @@ -1093,7 +1085,7 @@ impl DirectoryNamespace { Error::namespace_source( format!( "Failed to list table versions while resolving transaction '{}': {}", - transaction_id, e + id, e ) .into(), ) @@ -1107,19 +1099,19 @@ impl DirectoryNamespace { Error::namespace_source( format!( "Failed to read transaction for version {} while resolving '{}': {}", - version.version, transaction_id, e + version.version, id, e ) .into(), ) })? - && transaction.uuid == transaction_id + && transaction.uuid == id { return Ok((version.version, transaction)); } } Err(Error::namespace_source( - format!("Transaction not found: {}", transaction_id).into(), + format!("Transaction not found: {}", id).into(), )) } @@ -2396,13 +2388,7 @@ impl LanceNamespace for DirectoryNamespace { ) })?; - Ok(DescribeTableIndexStatsResponse { - distance_type: Self::stats_distance_type(&stats), - index_type: Self::stats_field_as_string(&stats, "index_type"), - num_indexed_rows: Self::stats_field_as_i64(&stats, "num_indexed_rows"), - num_unindexed_rows: Self::stats_field_as_i64(&stats, "num_unindexed_rows"), - num_indices: Self::stats_field_as_i32(&stats, "num_indices"), - }) + Ok(Self::describe_table_index_stats_response(&stats)) } async fn describe_transaction( @@ -2424,13 +2410,13 @@ impl LanceNamespace for DirectoryNamespace { )); } - let transaction_id = request_id.pop().expect("request_id len checked above"); + let id = request_id.pop().expect("request_id len checked above"); let table_id = Some(request_id); let table_uri = self.resolve_table_location(&table_id).await?; let dataset = self .load_dataset(&table_uri, None, "describe_transaction") .await?; - let (version, transaction) = self.find_transaction(&dataset, &transaction_id).await?; + let (version, transaction) = self.find_transaction(&dataset, &id).await?; Ok(Self::transaction_response(version, &transaction)) } @@ -2637,7 +2623,7 @@ mod tests { ])); let vector_field = Arc::new(Field::new("item", DataType::Float32, true)); let vectors = FixedSizeListArray::try_new( - vector_field.clone(), + vector_field, 2, Arc::new(Float32Array::from(vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6])), None, From f1e418873d6732ea7b7e7920c66e57a2fcd4cc07 Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Tue, 17 Mar 2026 15:51:54 +0800 Subject: [PATCH 3/3] code review --- rust/lance-namespace-impls/src/dir.rs | 64 +++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 8 deletions(-) diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index bab25ea243f..72fc91f68c0 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -973,7 +973,7 @@ impl DirectoryNamespace { indices: &mut Vec, page_token: Option, limit: Option, - ) { + ) -> Option { indices.sort_by(|a, b| a.index_name.cmp(&b.index_name)); if let Some(start_after) = page_token { @@ -987,10 +987,20 @@ impl DirectoryNamespace { } } + let mut next_page_token = None; if let Some(limit) = limit && limit >= 0 { - indices.truncate(limit as usize); + let limit = limit as usize; + if limit > 0 && indices.len() > limit { + next_page_token = Some(indices[limit - 1].index_name.clone()); + } + indices.truncate(limit); + } + if indices.is_empty() { + None + } else { + next_page_token } } @@ -2328,10 +2338,10 @@ impl LanceNamespace for DirectoryNamespace { }) .collect::>>()?; - Self::paginate_indices(&mut indices, request.page_token, request.limit); + let page_token = Self::paginate_indices(&mut indices, request.page_token, request.limit); Ok(ListTableIndicesResponse { indexes: indices, - page_token: None, + page_token, }) } @@ -2885,6 +2895,8 @@ mod tests { let (namespace, _temp_dir) = create_test_namespace().await; create_scalar_table(&namespace, "users").await; + create_scalar_index(&namespace, "users", "a_idx").await; + create_scalar_index(&namespace, "users", "b_idx").await; let transaction_id = create_scalar_index(&namespace, "users", "users_id_idx").await; let response = namespace @@ -2895,10 +2907,18 @@ mod tests { .await .unwrap(); - assert_eq!(response.indexes.len(), 1); - assert_eq!(response.indexes[0].index_name, "users_id_idx"); - assert_eq!(response.indexes[0].columns, vec!["id"]); - assert_eq!(response.indexes[0].status, "SUCCEEDED"); + assert_eq!(response.indexes.len(), 3); + assert_eq!(response.indexes[0].index_name, "a_idx"); + assert_eq!(response.indexes[1].index_name, "b_idx"); + assert_eq!(response.indexes[2].index_name, "users_id_idx"); + assert!(response.page_token.is_none()); + let users_id_idx = response + .indexes + .iter() + .find(|index| index.index_name == "users_id_idx") + .unwrap(); + assert_eq!(users_id_idx.columns, vec!["id"]); + assert_eq!(users_id_idx.status, "SUCCEEDED"); let dataset = open_dataset(&namespace, "users").await; let expected_transaction_id = dataset @@ -2915,6 +2935,34 @@ mod tests { .count(), 1 ); + + let first_page = namespace + .list_table_indices(ListTableIndicesRequest { + id: Some(vec!["users".to_string()]), + limit: Some(2), + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(first_page.indexes.len(), 2); + assert_eq!(first_page.indexes[0].index_name, "a_idx"); + assert_eq!(first_page.indexes[1].index_name, "b_idx"); + assert_eq!(first_page.page_token.as_deref(), Some("b_idx")); + + let second_page = namespace + .list_table_indices(ListTableIndicesRequest { + id: Some(vec!["users".to_string()]), + page_token: first_page.page_token.clone(), + limit: Some(2), + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(second_page.indexes.len(), 1); + assert_eq!(second_page.indexes[0].index_name, "users_id_idx"); + assert!(second_page.page_token.is_none()); } #[tokio::test]