From 24b545031438dfd82f22b40eaeb4578cc13843f5 Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Fri, 27 Mar 2026 11:56:50 +0800 Subject: [PATCH 1/4] init --- rust/lance-namespace-impls/src/dir.rs | 670 +++++++++++++++++++++++++- 1 file changed, 658 insertions(+), 12 deletions(-) diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 7718fc95a01..3d7729f4fb3 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -14,6 +14,7 @@ use async_trait::async_trait; use bytes::Bytes; use futures::TryStreamExt; use lance::dataset::builder::DatasetBuilder; +use lance::dataset::statistics::DatasetStatisticsExt; use lance::dataset::transaction::{Operation, Transaction}; use lance::dataset::{Dataset, WriteMode, WriteParams}; use lance::index::{IndexParams, vector::VectorIndexParams}; @@ -35,19 +36,22 @@ use std::sync::Arc; use crate::context::DynamicContextProvider; use lance_namespace::models::{ - BatchDeleteTableVersionsRequest, BatchDeleteTableVersionsResponse, CreateNamespaceRequest, - CreateNamespaceResponse, CreateTableIndexRequest, CreateTableIndexResponse, CreateTableRequest, - CreateTableResponse, CreateTableScalarIndexResponse, CreateTableVersionRequest, - CreateTableVersionResponse, DeclareTableRequest, DeclareTableResponse, - DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableIndexStatsRequest, - DescribeTableIndexStatsResponse, DescribeTableRequest, DescribeTableResponse, - DescribeTableVersionRequest, DescribeTableVersionResponse, DescribeTransactionRequest, - DescribeTransactionResponse, DropNamespaceRequest, DropNamespaceResponse, - DropTableIndexRequest, DropTableIndexResponse, DropTableRequest, DropTableResponse, Identity, - IndexContent, ListNamespacesRequest, ListNamespacesResponse, ListTableIndicesRequest, + AnalyzeTableQueryPlanRequest, BatchDeleteTableVersionsRequest, + BatchDeleteTableVersionsResponse, CreateNamespaceRequest, CreateNamespaceResponse, + CreateTableIndexRequest, CreateTableIndexResponse, CreateTableRequest, CreateTableResponse, + CreateTableScalarIndexResponse, CreateTableVersionRequest, CreateTableVersionResponse, + DeclareTableRequest, DeclareTableResponse, DescribeNamespaceRequest, + DescribeNamespaceResponse, DescribeTableIndexStatsRequest, DescribeTableIndexStatsResponse, + DescribeTableRequest, DescribeTableResponse, DescribeTableVersionRequest, + DescribeTableVersionResponse, DescribeTransactionRequest, DescribeTransactionResponse, + DropNamespaceRequest, DropNamespaceResponse, DropTableIndexRequest, DropTableIndexResponse, + DropTableRequest, DropTableResponse, ExplainTableQueryPlanRequest, FragmentStats, + FragmentSummary, GetTableStatsRequest, GetTableStatsResponse, Identity, IndexContent, + ListNamespacesRequest, ListNamespacesResponse, ListTableIndicesRequest, ListTableIndicesResponse, ListTableVersionsRequest, ListTableVersionsResponse, - ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, TableExistsRequest, - TableVersion, + ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, RestoreTableRequest, + RestoreTableResponse, TableExistsRequest, TableVersion, UpdateTableSchemaMetadataRequest, + UpdateTableSchemaMetadataResponse, }; use lance_core::{Error, Result, box_error}; @@ -2690,6 +2694,257 @@ impl LanceNamespace for DirectoryNamespace { Ok(DropTableIndexResponse { transaction_id }) } + async fn list_all_tables(&self, request: ListTablesRequest) -> Result { + // In dir-only mode there are no child namespaces, so all tables live in the + // root directory. This is equivalent to listing the root namespace. + let mut tables = self.list_directory_tables().await?; + Self::apply_pagination(&mut tables, request.page_token, request.limit); + Ok(ListTablesResponse::new(tables)) + } + + async fn restore_table(&self, request: RestoreTableRequest) -> Result { + let version = request.version; + if version < 0 { + return Err(Error::invalid_input_source( + format!( + "Table version for restore_table must be non-negative, got {}", + version + ) + .into(), + )); + } + + let table_uri = self.resolve_table_location(&request.id).await?; + let mut dataset = self + .load_dataset(&table_uri, None, "restore_table") + .await?; + + dataset = dataset + .checkout_version(version as u64) + .await + .map_err(|e| { + Error::namespace_source( + format!( + "Failed to checkout version {} for restore at '{}': {}", + version, table_uri, e + ) + .into(), + ) + })?; + + dataset.restore().await.map_err(|e| { + Error::namespace_source( + format!( + "Failed to restore table at '{}' to version {}: {}", + table_uri, version, e + ) + .into(), + ) + })?; + + let transaction_id = dataset + .read_transaction() + .await + .map_err(|e| { + Error::namespace_source( + format!( + "Failed to read transaction after restoring '{}': {}", + table_uri, e + ) + .into(), + ) + })? + .map(|t| t.uuid); + + Ok(RestoreTableResponse { transaction_id }) + } + + async fn update_table_schema_metadata( + &self, + request: UpdateTableSchemaMetadataRequest, + ) -> Result { + let table_uri = self.resolve_table_location(&request.id).await?; + let mut dataset = self + .load_dataset(&table_uri, None, "update_table_schema_metadata") + .await?; + + let new_metadata = request.metadata.unwrap_or_default(); + let updated_metadata = dataset + .update_schema_metadata(new_metadata.iter().map(|(k, v)| (k.as_str(), v.as_str()))) + .await + .map_err(|e| { + Error::namespace_source( + format!( + "Failed to update schema metadata for table at '{}': {}", + table_uri, e + ) + .into(), + ) + })?; + + let transaction_id = dataset + .read_transaction() + .await + .map_err(|e| { + Error::namespace_source( + format!( + "Failed to read transaction after updating metadata for '{}': {}", + table_uri, e + ) + .into(), + ) + })? + .map(|t| t.uuid); + + Ok(UpdateTableSchemaMetadataResponse { + metadata: Some(updated_metadata), + transaction_id, + }) + } + + async fn get_table_stats(&self, request: GetTableStatsRequest) -> Result { + let table_uri = self.resolve_table_location(&request.id).await?; + let dataset = Arc::new( + self.load_dataset(&table_uri, None, "get_table_stats") + .await?, + ); + + // Compute total bytes on disk using field-level statistics + let data_stats = dataset.calculate_data_stats().await.map_err(|e| { + Error::namespace_source( + format!( + "Failed to calculate data statistics for table at '{}': {}", + table_uri, e + ) + .into(), + ) + })?; + let total_bytes: i64 = data_stats + .fields + .iter() + .map(|f| f.bytes_on_disk as i64) + .sum(); + + // Collect per-fragment row counts + let fragment_row_futures: Vec<_> = dataset + .get_fragments() + .into_iter() + .map(|f| async move { f.physical_rows().await }) + .collect(); + let fragment_row_results = futures::future::join_all(fragment_row_futures).await; + let mut fragment_row_counts: Vec = fragment_row_results + .into_iter() + .filter_map(|r| r.ok()) + .map(|r| r as i64) + .collect(); + + let num_fragments = fragment_row_counts.len() as i64; + let num_rows: i64 = fragment_row_counts.iter().sum(); + + // Fragments with fewer than 1024 rows are considered "small" + const SMALL_FRAGMENT_THRESHOLD: i64 = 1024; + let num_small_fragments = fragment_row_counts + .iter() + .filter(|&&r| r < SMALL_FRAGMENT_THRESHOLD) + .count() as i64; + + // Compute length summary statistics + fragment_row_counts.sort_unstable(); + let lengths = if fragment_row_counts.is_empty() { + FragmentSummary::new(0, 0, 0, 0, 0, 0, 0) + } else { + let len = fragment_row_counts.len(); + let min = fragment_row_counts[0]; + let max = fragment_row_counts[len - 1]; + let mean = num_rows / num_fragments; + let pct = |p: f64| fragment_row_counts[((len - 1) as f64 * p) as usize]; + FragmentSummary::new(min, max, mean, pct(0.25), pct(0.50), pct(0.75), pct(0.99)) + }; + + // Count non-system indices + let indices = dataset.load_indices().await.map_err(|e| { + Error::namespace_source( + format!( + "Failed to load indices for table at '{}': {}", + table_uri, e + ) + .into(), + ) + })?; + let num_indices = indices.iter().filter(|m| !is_system_index(m)).count() as i64; + + let fragment_stats = FragmentStats::new(num_fragments, num_small_fragments, lengths); + Ok(GetTableStatsResponse::new( + total_bytes, + num_rows, + num_indices, + fragment_stats, + )) + } + + async fn explain_table_query_plan( + &self, + request: ExplainTableQueryPlanRequest, + ) -> Result { + let table_uri = self.resolve_table_location(&request.id).await?; + let dataset = self + .load_dataset(&table_uri, None, "explain_table_query_plan") + .await?; + let verbose = request.verbose.unwrap_or(false); + + let mut scanner = dataset.scan(); + if let Some(ref filter) = request.query.filter { + scanner.filter(filter).map_err(|e| { + Error::invalid_input_source( + format!("Invalid filter expression for explain_table_query_plan: {}", e).into(), + ) + })?; + } + + scanner.explain_plan(verbose).await.map_err(|e| { + Error::namespace_source( + format!( + "Failed to explain query plan for table at '{}': {}", + table_uri, e + ) + .into(), + ) + }) + } + + async fn analyze_table_query_plan( + &self, + request: AnalyzeTableQueryPlanRequest, + ) -> Result { + let table_uri = self.resolve_table_location(&request.id).await?; + let dataset = self + .load_dataset(&table_uri, None, "analyze_table_query_plan") + .await?; + + let mut scanner = dataset.scan(); + if let Some(ref filter) = request.filter { + scanner.filter(filter).map_err(|e| { + Error::invalid_input_source( + format!( + "Invalid filter expression for analyze_table_query_plan: {}", + e + ) + .into(), + ) + })?; + } + + scanner.analyze_plan().await.map_err(|e| { + Error::namespace_source( + format!( + "Failed to analyze query plan for table at '{}': {}", + table_uri, e + ) + .into(), + ) + }) + } + fn namespace_id(&self) -> String { format!("DirectoryNamespace {{ root: {:?} }}", self.root) } @@ -6277,4 +6532,395 @@ mod tests { assert_eq!(*ver, 2, "Recorded version should be 2"); } } + + // ========================================================================= + // Tests for Table lifecycle and metadata methods + // ========================================================================= + + #[tokio::test] + async fn test_list_all_tables_dir_only() { + use lance_namespace::models::ListTablesRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "alpha").await; + create_scalar_table(&namespace, "beta").await; + + let request = ListTablesRequest { + id: Some(vec![]), + page_token: None, + limit: None, + ..Default::default() + }; + let response = namespace.list_all_tables(request).await.unwrap(); + let mut tables = response.tables; + tables.sort(); + assert_eq!(tables, vec!["alpha", "beta"]); + } + + #[tokio::test] + async fn test_list_all_tables_empty() { + use lance_namespace::models::ListTablesRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + + let request = ListTablesRequest { + id: Some(vec![]), + page_token: None, + limit: None, + ..Default::default() + }; + let response = namespace.list_all_tables(request).await.unwrap(); + assert!(response.tables.is_empty()); + } + + #[tokio::test] + async fn test_list_all_tables_with_pagination() { + use lance_namespace::models::ListTablesRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "aaa").await; + create_scalar_table(&namespace, "bbb").await; + create_scalar_table(&namespace, "ccc").await; + + // First page (limit 2) + let request = ListTablesRequest { + id: Some(vec![]), + page_token: None, + limit: Some(2), + ..Default::default() + }; + let response = namespace.list_all_tables(request).await.unwrap(); + assert_eq!(response.tables.len(), 2); + assert_eq!(response.tables, vec!["aaa", "bbb"]); + + // Second page (after "bbb") + let request = ListTablesRequest { + id: Some(vec![]), + page_token: Some("bbb".to_string()), + limit: Some(2), + ..Default::default() + }; + let response = namespace.list_all_tables(request).await.unwrap(); + assert_eq!(response.tables, vec!["ccc"]); + } + + #[tokio::test] + async fn test_list_all_tables_is_superset_of_root_tables() { + // In dir-only mode list_all_tables covers the same tables as list_tables + // for the root namespace, since there are no child namespaces on disk. + use lance_namespace::models::ListTablesRequest; + + let temp_dir = TempStdDir::default(); + let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap()) + .manifest_enabled(false) + .dir_listing_enabled(true) + .build() + .await + .unwrap(); + + create_scalar_table(&namespace, "table_x").await; + create_scalar_table(&namespace, "table_y").await; + + let request = ListTablesRequest { + id: Some(vec![]), + page_token: None, + limit: None, + ..Default::default() + }; + let all_response = namespace.list_all_tables(request.clone()).await.unwrap(); + let root_response = namespace.list_tables(request).await.unwrap(); + + let mut all_tables = all_response.tables; + let mut root_tables = root_response.tables; + all_tables.sort(); + root_tables.sort(); + assert_eq!(all_tables, root_tables); + assert_eq!(all_tables, vec!["table_x", "table_y"]); + } + + #[tokio::test] + async fn test_restore_table() { + use lance_namespace::models::RestoreTableRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "users").await; + + // Create a second version by creating a scalar index (this adds a new version) + create_scalar_index(&namespace, "users", "users_id_idx").await; + + let dataset = open_dataset(&namespace, "users").await; + let current_version = dataset.version().version; + assert!(current_version >= 2, "Should have at least 2 versions"); + + // Restore to version 1 + let mut restore_req = RestoreTableRequest::new(1); + restore_req.id = Some(vec!["users".to_string()]); + let response = namespace.restore_table(restore_req).await.unwrap(); + + // transaction_id should be present (the restore operation) + assert!( + response.transaction_id.is_some(), + "restore_table should return a transaction_id" + ); + + // Verify the dataset now has a new version (restore creates a new version) + let dataset_after = open_dataset(&namespace, "users").await; + assert!( + dataset_after.version().version > current_version, + "Restore should create a new version" + ); + } + + #[tokio::test] + async fn test_restore_table_invalid_version() { + use lance_namespace::models::RestoreTableRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "users").await; + + // Negative version should fail + let mut restore_req = RestoreTableRequest::new(-1); + restore_req.id = Some(vec!["users".to_string()]); + let result = namespace.restore_table(restore_req).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("non-negative")); + } + + #[tokio::test] + async fn test_restore_table_not_found() { + use lance_namespace::models::RestoreTableRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + + let mut restore_req = RestoreTableRequest::new(1); + restore_req.id = Some(vec!["nonexistent".to_string()]); + let result = namespace.restore_table(restore_req).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_update_table_schema_metadata() { + use lance_namespace::models::UpdateTableSchemaMetadataRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "products").await; + + let mut metadata = HashMap::new(); + metadata.insert("owner".to_string(), "team_a".to_string()); + metadata.insert("version".to_string(), "1.0".to_string()); + + let mut req = UpdateTableSchemaMetadataRequest::new(); + req.id = Some(vec!["products".to_string()]); + req.metadata = Some(metadata.clone()); + + let response = namespace.update_table_schema_metadata(req).await.unwrap(); + + assert!(response.metadata.is_some()); + let returned = response.metadata.unwrap(); + assert_eq!(returned.get("owner"), Some(&"team_a".to_string())); + assert_eq!(returned.get("version"), Some(&"1.0".to_string())); + assert!( + response.transaction_id.is_some(), + "update_table_schema_metadata should return a transaction_id" + ); + } + + #[tokio::test] + async fn test_update_table_schema_metadata_empty() { + use lance_namespace::models::UpdateTableSchemaMetadataRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "products").await; + + // Empty metadata update should succeed + let mut req = UpdateTableSchemaMetadataRequest::new(); + req.id = Some(vec!["products".to_string()]); + req.metadata = Some(HashMap::new()); + + let response = namespace + .update_table_schema_metadata(req) + .await + .unwrap(); + assert!(response.metadata.is_some()); + } + + #[tokio::test] + async fn test_update_table_schema_metadata_not_found() { + use lance_namespace::models::UpdateTableSchemaMetadataRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + + let mut req = UpdateTableSchemaMetadataRequest::new(); + req.id = Some(vec!["nonexistent".to_string()]); + req.metadata = Some(HashMap::new()); + + let result = namespace.update_table_schema_metadata(req).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_get_table_stats() { + use lance_namespace::models::GetTableStatsRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "orders").await; + + let mut req = GetTableStatsRequest::new(); + req.id = Some(vec!["orders".to_string()]); + + let response = namespace.get_table_stats(req).await.unwrap(); + + // The scalar table has 3 rows + assert_eq!(response.num_rows, 3); + // Fragments: should have at least 1 + assert!(response.fragment_stats.num_fragments >= 1); + // num_indices: 0 (no indices created) + assert_eq!(response.num_indices, 0); + // Fragment summary lengths should reflect the 3-row fragment + let lengths = &response.fragment_stats.lengths; + assert!(lengths.min >= 0); + assert!(lengths.max >= lengths.min); + } + + #[tokio::test] + async fn test_get_table_stats_with_index() { + use lance_namespace::models::GetTableStatsRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "items").await; + create_scalar_index(&namespace, "items", "items_id_idx").await; + + let mut req = GetTableStatsRequest::new(); + req.id = Some(vec!["items".to_string()]); + + let response = namespace.get_table_stats(req).await.unwrap(); + assert_eq!(response.num_rows, 3); + assert_eq!(response.num_indices, 1); + } + + #[tokio::test] + async fn test_get_table_stats_not_found() { + use lance_namespace::models::GetTableStatsRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + + let mut req = GetTableStatsRequest::new(); + req.id = Some(vec!["nonexistent".to_string()]); + + let result = namespace.get_table_stats(req).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_explain_table_query_plan() { + use lance_namespace::models::{ExplainTableQueryPlanRequest, QueryTableRequest}; + use lance_namespace::models::QueryTableRequestVector; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "catalog").await; + + let query = QueryTableRequest::new( + 1, + QueryTableRequestVector::new(), + ); + let mut req = ExplainTableQueryPlanRequest::new(query); + req.id = Some(vec!["catalog".to_string()]); + req.verbose = Some(false); + + let plan_str = namespace.explain_table_query_plan(req).await.unwrap(); + assert!(!plan_str.is_empty(), "Plan string should not be empty"); + } + + #[tokio::test] + async fn test_explain_table_query_plan_with_filter() { + use lance_namespace::models::{ExplainTableQueryPlanRequest, QueryTableRequest}; + use lance_namespace::models::QueryTableRequestVector; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "catalog").await; + + let mut query = QueryTableRequest::new( + 1, + QueryTableRequestVector::new(), + ); + query.filter = Some("id > 1".to_string()); + + let mut req = ExplainTableQueryPlanRequest::new(query); + req.id = Some(vec!["catalog".to_string()]); + + let plan_str = namespace.explain_table_query_plan(req).await.unwrap(); + assert!(!plan_str.is_empty(), "Filtered plan string should not be empty"); + } + + #[tokio::test] + async fn test_explain_table_query_plan_not_found() { + use lance_namespace::models::{ExplainTableQueryPlanRequest, QueryTableRequest}; + use lance_namespace::models::QueryTableRequestVector; + + let (namespace, _temp_dir) = create_test_namespace().await; + + let query = QueryTableRequest::new( + 1, + QueryTableRequestVector::new(), + ); + let mut req = ExplainTableQueryPlanRequest::new(query); + req.id = Some(vec!["nonexistent".to_string()]); + + let result = namespace.explain_table_query_plan(req).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_analyze_table_query_plan() { + use lance_namespace::models::AnalyzeTableQueryPlanRequest; + use lance_namespace::models::QueryTableRequestVector; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "catalog").await; + + let mut req = AnalyzeTableQueryPlanRequest::new( + 1, + QueryTableRequestVector::new(), + ); + req.id = Some(vec!["catalog".to_string()]); + + let analysis_str = namespace.analyze_table_query_plan(req).await.unwrap(); + assert!(!analysis_str.is_empty(), "Analysis string should not be empty"); + } + + #[tokio::test] + async fn test_analyze_table_query_plan_with_filter() { + use lance_namespace::models::AnalyzeTableQueryPlanRequest; + use lance_namespace::models::QueryTableRequestVector; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_scalar_table(&namespace, "catalog").await; + + let mut req = AnalyzeTableQueryPlanRequest::new( + 1, + QueryTableRequestVector::new(), + ); + req.id = Some(vec!["catalog".to_string()]); + req.filter = Some("id > 0".to_string()); + + let analysis_str = namespace.analyze_table_query_plan(req).await.unwrap(); + assert!(!analysis_str.is_empty(), "Filtered analysis string should not be empty"); + } + + #[tokio::test] + async fn test_analyze_table_query_plan_not_found() { + use lance_namespace::models::AnalyzeTableQueryPlanRequest; + use lance_namespace::models::QueryTableRequestVector; + + let (namespace, _temp_dir) = create_test_namespace().await; + + let mut req = AnalyzeTableQueryPlanRequest::new( + 1, + QueryTableRequestVector::new(), + ); + req.id = Some(vec!["nonexistent".to_string()]); + + let result = namespace.analyze_table_query_plan(req).await; + assert!(result.is_err()); + } } From dcf663cb98823923698daac21b02f5a0eb9a0c9e Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Fri, 27 Mar 2026 14:31:15 +0800 Subject: [PATCH 2/4] init --- rust/lance-namespace-impls/src/dir.rs | 321 ++++++++++++++++++++++++-- 1 file changed, 298 insertions(+), 23 deletions(-) diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 3d7729f4fb3..815908373f8 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -8,12 +8,14 @@ pub mod manifest; +use arrow::array::Float32Array; use arrow::record_batch::RecordBatchIterator; use arrow_ipc::reader::StreamReader; use async_trait::async_trait; use bytes::Bytes; use futures::TryStreamExt; use lance::dataset::builder::DatasetBuilder; +use lance::dataset::scanner::Scanner; use lance::dataset::statistics::DatasetStatisticsExt; use lance::dataset::transaction::{Operation, Transaction}; use lance::dataset::{Dataset, WriteMode, WriteParams}; @@ -49,7 +51,8 @@ use lance_namespace::models::{ FragmentSummary, GetTableStatsRequest, GetTableStatsResponse, Identity, IndexContent, ListNamespacesRequest, ListNamespacesResponse, ListTableIndicesRequest, ListTableIndicesResponse, ListTableVersionsRequest, ListTableVersionsResponse, - ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, RestoreTableRequest, + ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, + QueryTableRequestColumns, QueryTableRequestVector, RestoreTableRequest, RestoreTableResponse, TableExistsRequest, TableVersion, UpdateTableSchemaMetadataRequest, UpdateTableSchemaMetadataResponse, }; @@ -1433,6 +1436,142 @@ impl DirectoryNamespace { } Ok(deleted_count) } + + /// Apply all query parameters from a `QueryTableRequest`-like source onto a `Scanner`. + /// + /// This covers vector search, filters, column projection, limits, and ANN tuning knobs so + /// that `explain_table_query_plan` / `analyze_table_query_plan` produce an accurate plan. + #[allow(clippy::too_many_arguments)] + fn apply_query_params_to_scanner( + scanner: &mut Scanner, + filter: Option<&str>, + columns: Option<&QueryTableRequestColumns>, + vector_column: Option<&str>, + vector: &QueryTableRequestVector, + k: i32, + offset: Option, + prefilter: Option, + bypass_vector_index: Option, + nprobes: Option, + ef: Option, + refine_factor: Option, + distance_type: Option<&str>, + fast_search_flag: Option, + with_row_id: Option, + lower_bound: Option, + upper_bound: Option, + operation: &str, + ) -> Result<()> { + // prefilter must be set before nearest() so the fragment-scan guard sees it. + if let Some(pf) = prefilter { + scanner.prefilter(pf); + } + + if let Some(filter) = filter { + scanner.filter(filter).map_err(|e| { + Error::invalid_input_source( + format!("Invalid filter expression for {}: {}", operation, e).into(), + ) + })?; + } + + if let Some(cols) = columns { + if let Some(ref names) = cols.column_names { + scanner.project(names.as_slice()).map_err(|e| { + Error::invalid_input_source( + format!("Invalid column projection for {}: {}", operation, e).into(), + ) + })?; + } else if let Some(ref aliases) = cols.column_aliases { + // aliases maps output_alias -> source_column + let pairs: Vec<(&str, &str)> = aliases + .iter() + .map(|(alias, src)| (alias.as_str(), src.as_str())) + .collect(); + scanner.project_with_transform(&pairs).map_err(|e| { + Error::invalid_input_source( + format!("Invalid column aliases for {}: {}", operation, e).into(), + ) + })?; + } + } + + // Resolve query vector: prefer single_vector, fall back to first row of multi_vector. + let query_vec: Option> = vector + .single_vector + .as_ref() + .filter(|v| !v.is_empty()) + .cloned() + .or_else(|| { + vector + .multi_vector + .as_ref() + .and_then(|mv| mv.first()) + .filter(|v| !v.is_empty()) + .cloned() + }); + + if let Some(q_vec) = query_vec { + let col = vector_column.unwrap_or("vector"); + let q = Arc::new(Float32Array::from(q_vec)); + scanner + .nearest(col, q.as_ref(), k.max(1) as usize) + .map_err(|e| { + Error::invalid_input_source( + format!("Invalid vector query for {}: {}", operation, e).into(), + ) + })?; + + // ANN parameters — must be applied after nearest(). + if let Some(n) = nprobes { + scanner.nprobes(n.max(1) as usize); + } + if let Some(e) = ef { + scanner.ef(e.max(1) as usize); + } + if let Some(rf) = refine_factor { + scanner.refine(rf.max(0) as u32); + } + // bypass_vector_index and fast_search are mutually exclusive; apply in order. + if let Some(true) = bypass_vector_index { + scanner.use_index(false); + } + if let Some(true) = fast_search_flag { + scanner.fast_search(); + } + if lower_bound.is_some() || upper_bound.is_some() { + scanner.distance_range(lower_bound, upper_bound); + } + if let Some(dt) = distance_type { + let metric = Self::parse_metric_type(Some(dt))?; + scanner.distance_metric(metric); + } + // Apply offset on top of the k nearest results. + if let Some(off) = offset.filter(|&o| o > 0) { + scanner.limit(None, Some(off as i64)).map_err(|e| { + Error::invalid_input_source( + format!("Invalid offset for {}: {}", operation, e).into(), + ) + })?; + } + } else { + // Scalar (non-vector) query: treat k as a row LIMIT. + let limit = if k > 0 { Some(k as i64) } else { None }; + scanner + .limit(limit, offset.map(|o| o as i64)) + .map_err(|e| { + Error::invalid_input_source( + format!("Invalid limit/offset for {}: {}", operation, e).into(), + ) + })?; + } + + if let Some(true) = with_row_id { + scanner.with_row_id(); + } + + Ok(()) + } } #[async_trait] @@ -2841,8 +2980,9 @@ impl LanceNamespace for DirectoryNamespace { let num_fragments = fragment_row_counts.len() as i64; let num_rows: i64 = fragment_row_counts.iter().sum(); - // Fragments with fewer than 1024 rows are considered "small" - const SMALL_FRAGMENT_THRESHOLD: i64 = 1024; + // Fragments with fewer rows than the compaction target are considered "small", + // consistent with CompactionOptions::target_rows_per_fragment default. + const SMALL_FRAGMENT_THRESHOLD: i64 = 1024 * 1024; let num_small_fragments = fragment_row_counts .iter() .filter(|&&r| r < SMALL_FRAGMENT_THRESHOLD) @@ -2888,18 +3028,31 @@ impl LanceNamespace for DirectoryNamespace { ) -> Result { let table_uri = self.resolve_table_location(&request.id).await?; let dataset = self - .load_dataset(&table_uri, None, "explain_table_query_plan") + .load_dataset(&table_uri, request.query.version, "explain_table_query_plan") .await?; let verbose = request.verbose.unwrap_or(false); let mut scanner = dataset.scan(); - if let Some(ref filter) = request.query.filter { - scanner.filter(filter).map_err(|e| { - Error::invalid_input_source( - format!("Invalid filter expression for explain_table_query_plan: {}", e).into(), - ) - })?; - } + Self::apply_query_params_to_scanner( + &mut scanner, + request.query.filter.as_deref(), + request.query.columns.as_deref(), + request.query.vector_column.as_deref(), + &request.query.vector, + request.query.k, + request.query.offset, + request.query.prefilter, + request.query.bypass_vector_index, + request.query.nprobes, + request.query.ef, + request.query.refine_factor, + request.query.distance_type.as_deref(), + request.query.fast_search, + request.query.with_row_id, + request.query.lower_bound, + request.query.upper_bound, + "explain_table_query_plan", + )?; scanner.explain_plan(verbose).await.map_err(|e| { Error::namespace_source( @@ -2918,21 +3071,30 @@ impl LanceNamespace for DirectoryNamespace { ) -> Result { let table_uri = self.resolve_table_location(&request.id).await?; let dataset = self - .load_dataset(&table_uri, None, "analyze_table_query_plan") + .load_dataset(&table_uri, request.version, "analyze_table_query_plan") .await?; let mut scanner = dataset.scan(); - if let Some(ref filter) = request.filter { - scanner.filter(filter).map_err(|e| { - Error::invalid_input_source( - format!( - "Invalid filter expression for analyze_table_query_plan: {}", - e - ) - .into(), - ) - })?; - } + Self::apply_query_params_to_scanner( + &mut scanner, + request.filter.as_deref(), + request.columns.as_deref(), + request.vector_column.as_deref(), + &request.vector, + request.k, + request.offset, + request.prefilter, + request.bypass_vector_index, + request.nprobes, + request.ef, + request.refine_factor, + request.distance_type.as_deref(), + request.fast_search, + request.with_row_id, + request.lower_bound, + request.upper_bound, + "analyze_table_query_plan", + )?; scanner.analyze_plan().await.map_err(|e| { Error::namespace_source( @@ -6923,4 +7085,117 @@ mod tests { let result = namespace.analyze_table_query_plan(req).await; assert!(result.is_err()); } + + // ── multi-vector tests ──────────────────────────────────────────────────── + // The vector table has a FixedSizeList "vector" column. + // `single_vector` maps directly; `multi_vector` falls back to the first + // sub-vector so both should produce a valid (non-empty) plan string. + + #[tokio::test] + async fn test_explain_table_query_plan_with_single_vector() { + use lance_namespace::models::{ExplainTableQueryPlanRequest, QueryTableRequest}; + use lance_namespace::models::QueryTableRequestVector; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_vector_table(&namespace, "vecs").await; + + let mut vec_input = QueryTableRequestVector::new(); + // dim=2 matches the vector column created by create_vector_table_ipc_data + vec_input.single_vector = Some(vec![0.1, 0.2]); + + let mut query = QueryTableRequest::new(2, vec_input); + query.vector_column = Some("vector".to_string()); + + let mut req = ExplainTableQueryPlanRequest::new(query); + req.id = Some(vec!["vecs".to_string()]); + + let plan_str = namespace.explain_table_query_plan(req).await.unwrap(); + assert!(!plan_str.is_empty(), "plan string should not be empty"); + // The plan should reference the vector column, confirming KNN is applied. + assert!( + plan_str.to_lowercase().contains("vector") || plan_str.to_lowercase().contains("knn"), + "plan should reflect vector search, got: {plan_str}" + ); + } + + #[tokio::test] + async fn test_explain_table_query_plan_with_multi_vector() { + // multi_vector: our implementation uses the first sub-vector for nearest(). + use lance_namespace::models::{ExplainTableQueryPlanRequest, QueryTableRequest}; + use lance_namespace::models::QueryTableRequestVector; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_vector_table(&namespace, "vecs").await; + + let mut vec_input = QueryTableRequestVector::new(); + vec_input.multi_vector = Some(vec![ + vec![0.1, 0.2], // used for nearest() + vec![0.3, 0.4], // additional vectors are ignored by explain/analyze + ]); + + let mut query = QueryTableRequest::new(2, vec_input); + query.vector_column = Some("vector".to_string()); + + let mut req = ExplainTableQueryPlanRequest::new(query); + req.id = Some(vec!["vecs".to_string()]); + + let plan_str = namespace.explain_table_query_plan(req).await.unwrap(); + assert!(!plan_str.is_empty(), "plan string should not be empty"); + assert!( + plan_str.to_lowercase().contains("vector") || plan_str.to_lowercase().contains("knn"), + "plan should reflect vector search, got: {plan_str}" + ); + } + + #[tokio::test] + async fn test_analyze_table_query_plan_with_single_vector() { + use lance_namespace::models::AnalyzeTableQueryPlanRequest; + use lance_namespace::models::QueryTableRequestVector; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_vector_table(&namespace, "vecs").await; + + let mut vec_input = QueryTableRequestVector::new(); + vec_input.single_vector = Some(vec![0.1, 0.2]); + + let mut req = AnalyzeTableQueryPlanRequest::new(2, vec_input); + req.id = Some(vec!["vecs".to_string()]); + req.vector_column = Some("vector".to_string()); + + let analysis_str = namespace.analyze_table_query_plan(req).await.unwrap(); + assert!(!analysis_str.is_empty(), "analysis string should not be empty"); + assert!( + analysis_str.to_lowercase().contains("vector") + || analysis_str.to_lowercase().contains("knn"), + "analysis should reflect vector search, got: {analysis_str}" + ); + } + + #[tokio::test] + async fn test_analyze_table_query_plan_with_multi_vector() { + // multi_vector: our implementation uses the first sub-vector for nearest(). + use lance_namespace::models::AnalyzeTableQueryPlanRequest; + use lance_namespace::models::QueryTableRequestVector; + + let (namespace, _temp_dir) = create_test_namespace().await; + create_vector_table(&namespace, "vecs").await; + + let mut vec_input = QueryTableRequestVector::new(); + vec_input.multi_vector = Some(vec![ + vec![0.1, 0.2], // used for nearest() + vec![0.3, 0.4], // additional vectors are ignored by explain/analyze + ]); + + let mut req = AnalyzeTableQueryPlanRequest::new(2, vec_input); + req.id = Some(vec!["vecs".to_string()]); + req.vector_column = Some("vector".to_string()); + + let analysis_str = namespace.analyze_table_query_plan(req).await.unwrap(); + assert!(!analysis_str.is_empty(), "analysis string should not be empty"); + assert!( + analysis_str.to_lowercase().contains("vector") + || analysis_str.to_lowercase().contains("knn"), + "analysis should reflect vector search, got: {analysis_str}" + ); + } } From 22042087e9f1ad7de669ec471fda598388a6ab10 Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Fri, 27 Mar 2026 15:41:03 +0800 Subject: [PATCH 3/4] feat(dictionary-namespace): support table related operation --- rust/lance-namespace-impls/src/dir.rs | 474 +++++--------------------- 1 file changed, 76 insertions(+), 398 deletions(-) diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 815908373f8..2cae4e0b4eb 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -42,17 +42,16 @@ use lance_namespace::models::{ BatchDeleteTableVersionsResponse, CreateNamespaceRequest, CreateNamespaceResponse, CreateTableIndexRequest, CreateTableIndexResponse, CreateTableRequest, CreateTableResponse, CreateTableScalarIndexResponse, CreateTableVersionRequest, CreateTableVersionResponse, - DeclareTableRequest, DeclareTableResponse, DescribeNamespaceRequest, - DescribeNamespaceResponse, DescribeTableIndexStatsRequest, DescribeTableIndexStatsResponse, - DescribeTableRequest, DescribeTableResponse, DescribeTableVersionRequest, - DescribeTableVersionResponse, DescribeTransactionRequest, DescribeTransactionResponse, - DropNamespaceRequest, DropNamespaceResponse, DropTableIndexRequest, DropTableIndexResponse, - DropTableRequest, DropTableResponse, ExplainTableQueryPlanRequest, FragmentStats, - FragmentSummary, GetTableStatsRequest, GetTableStatsResponse, Identity, IndexContent, - ListNamespacesRequest, ListNamespacesResponse, ListTableIndicesRequest, - ListTableIndicesResponse, ListTableVersionsRequest, ListTableVersionsResponse, - ListTablesRequest, ListTablesResponse, NamespaceExistsRequest, - QueryTableRequestColumns, QueryTableRequestVector, RestoreTableRequest, + DeclareTableRequest, DeclareTableResponse, DescribeNamespaceRequest, DescribeNamespaceResponse, + DescribeTableIndexStatsRequest, DescribeTableIndexStatsResponse, DescribeTableRequest, + DescribeTableResponse, DescribeTableVersionRequest, DescribeTableVersionResponse, + DescribeTransactionRequest, DescribeTransactionResponse, DropNamespaceRequest, + DropNamespaceResponse, DropTableIndexRequest, DropTableIndexResponse, DropTableRequest, + DropTableResponse, ExplainTableQueryPlanRequest, FragmentStats, FragmentSummary, + GetTableStatsRequest, GetTableStatsResponse, Identity, IndexContent, ListNamespacesRequest, + ListNamespacesResponse, ListTableIndicesRequest, ListTableIndicesResponse, + ListTableVersionsRequest, ListTableVersionsResponse, ListTablesRequest, ListTablesResponse, + NamespaceExistsRequest, QueryTableRequestColumns, QueryTableRequestVector, RestoreTableRequest, RestoreTableResponse, TableExistsRequest, TableVersion, UpdateTableSchemaMetadataRequest, UpdateTableSchemaMetadataResponse, }; @@ -2854,9 +2853,7 @@ impl LanceNamespace for DirectoryNamespace { } let table_uri = self.resolve_table_location(&request.id).await?; - let mut dataset = self - .load_dataset(&table_uri, None, "restore_table") - .await?; + let mut dataset = self.load_dataset(&table_uri, None, "restore_table").await?; dataset = dataset .checkout_version(version as u64) @@ -2941,7 +2938,10 @@ impl LanceNamespace for DirectoryNamespace { }) } - async fn get_table_stats(&self, request: GetTableStatsRequest) -> Result { + async fn get_table_stats( + &self, + request: GetTableStatsRequest, + ) -> Result { let table_uri = self.resolve_table_location(&request.id).await?; let dataset = Arc::new( self.load_dataset(&table_uri, None, "get_table_stats") @@ -3004,11 +3004,7 @@ impl LanceNamespace for DirectoryNamespace { // Count non-system indices let indices = dataset.load_indices().await.map_err(|e| { Error::namespace_source( - format!( - "Failed to load indices for table at '{}': {}", - table_uri, e - ) - .into(), + format!("Failed to load indices for table at '{}': {}", table_uri, e).into(), ) })?; let num_indices = indices.iter().filter(|m| !is_system_index(m)).count() as i64; @@ -3028,7 +3024,11 @@ impl LanceNamespace for DirectoryNamespace { ) -> Result { let table_uri = self.resolve_table_location(&request.id).await?; let dataset = self - .load_dataset(&table_uri, request.query.version, "explain_table_query_plan") + .load_dataset( + &table_uri, + request.query.version, + "explain_table_query_plan", + ) .await?; let verbose = request.verbose.unwrap_or(false); @@ -3121,11 +3121,24 @@ mod tests { use lance_index::DatasetIndexExt; use lance_namespace::models::{ CreateTableRequest, JsonArrowDataType, JsonArrowField, JsonArrowSchema, ListTablesRequest, + QueryTableRequestColumns, }; use lance_namespace::schema::convert_json_arrow_schema; use std::io::Cursor; use std::sync::Arc; + fn assert_plan_contains_all(plan: &str, expected_fragments: &[&str], context: &str) { + for expected_fragment in expected_fragments { + assert!( + plan.contains(expected_fragment), + "{}. Missing fragment: '{}'. Plan:\n{}", + context, + expected_fragment, + plan + ); + } + } + /// Helper to create a test DirectoryNamespace with a temporary directory async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) { let temp_dir = TempStdDir::default(); @@ -6695,12 +6708,8 @@ mod tests { } } - // ========================================================================= - // Tests for Table lifecycle and metadata methods - // ========================================================================= - #[tokio::test] - async fn test_list_all_tables_dir_only() { + async fn test_list_all_tables() { use lance_namespace::models::ListTablesRequest; let (namespace, _temp_dir) = create_test_namespace().await; @@ -6719,87 +6728,6 @@ mod tests { assert_eq!(tables, vec!["alpha", "beta"]); } - #[tokio::test] - async fn test_list_all_tables_empty() { - use lance_namespace::models::ListTablesRequest; - - let (namespace, _temp_dir) = create_test_namespace().await; - - let request = ListTablesRequest { - id: Some(vec![]), - page_token: None, - limit: None, - ..Default::default() - }; - let response = namespace.list_all_tables(request).await.unwrap(); - assert!(response.tables.is_empty()); - } - - #[tokio::test] - async fn test_list_all_tables_with_pagination() { - use lance_namespace::models::ListTablesRequest; - - let (namespace, _temp_dir) = create_test_namespace().await; - create_scalar_table(&namespace, "aaa").await; - create_scalar_table(&namespace, "bbb").await; - create_scalar_table(&namespace, "ccc").await; - - // First page (limit 2) - let request = ListTablesRequest { - id: Some(vec![]), - page_token: None, - limit: Some(2), - ..Default::default() - }; - let response = namespace.list_all_tables(request).await.unwrap(); - assert_eq!(response.tables.len(), 2); - assert_eq!(response.tables, vec!["aaa", "bbb"]); - - // Second page (after "bbb") - let request = ListTablesRequest { - id: Some(vec![]), - page_token: Some("bbb".to_string()), - limit: Some(2), - ..Default::default() - }; - let response = namespace.list_all_tables(request).await.unwrap(); - assert_eq!(response.tables, vec!["ccc"]); - } - - #[tokio::test] - async fn test_list_all_tables_is_superset_of_root_tables() { - // In dir-only mode list_all_tables covers the same tables as list_tables - // for the root namespace, since there are no child namespaces on disk. - use lance_namespace::models::ListTablesRequest; - - let temp_dir = TempStdDir::default(); - let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap()) - .manifest_enabled(false) - .dir_listing_enabled(true) - .build() - .await - .unwrap(); - - create_scalar_table(&namespace, "table_x").await; - create_scalar_table(&namespace, "table_y").await; - - let request = ListTablesRequest { - id: Some(vec![]), - page_token: None, - limit: None, - ..Default::default() - }; - let all_response = namespace.list_all_tables(request.clone()).await.unwrap(); - let root_response = namespace.list_tables(request).await.unwrap(); - - let mut all_tables = all_response.tables; - let mut root_tables = root_response.tables; - all_tables.sort(); - root_tables.sort(); - assert_eq!(all_tables, root_tables); - assert_eq!(all_tables, vec!["table_x", "table_y"]); - } - #[tokio::test] async fn test_restore_table() { use lance_namespace::models::RestoreTableRequest; @@ -6833,33 +6761,6 @@ mod tests { ); } - #[tokio::test] - async fn test_restore_table_invalid_version() { - use lance_namespace::models::RestoreTableRequest; - - let (namespace, _temp_dir) = create_test_namespace().await; - create_scalar_table(&namespace, "users").await; - - // Negative version should fail - let mut restore_req = RestoreTableRequest::new(-1); - restore_req.id = Some(vec!["users".to_string()]); - let result = namespace.restore_table(restore_req).await; - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("non-negative")); - } - - #[tokio::test] - async fn test_restore_table_not_found() { - use lance_namespace::models::RestoreTableRequest; - - let (namespace, _temp_dir) = create_test_namespace().await; - - let mut restore_req = RestoreTableRequest::new(1); - restore_req.id = Some(vec!["nonexistent".to_string()]); - let result = namespace.restore_table(restore_req).await; - assert!(result.is_err()); - } - #[tokio::test] async fn test_update_table_schema_metadata() { use lance_namespace::models::UpdateTableSchemaMetadataRequest; @@ -6887,67 +6788,10 @@ mod tests { ); } - #[tokio::test] - async fn test_update_table_schema_metadata_empty() { - use lance_namespace::models::UpdateTableSchemaMetadataRequest; - - let (namespace, _temp_dir) = create_test_namespace().await; - create_scalar_table(&namespace, "products").await; - - // Empty metadata update should succeed - let mut req = UpdateTableSchemaMetadataRequest::new(); - req.id = Some(vec!["products".to_string()]); - req.metadata = Some(HashMap::new()); - - let response = namespace - .update_table_schema_metadata(req) - .await - .unwrap(); - assert!(response.metadata.is_some()); - } - - #[tokio::test] - async fn test_update_table_schema_metadata_not_found() { - use lance_namespace::models::UpdateTableSchemaMetadataRequest; - - let (namespace, _temp_dir) = create_test_namespace().await; - - let mut req = UpdateTableSchemaMetadataRequest::new(); - req.id = Some(vec!["nonexistent".to_string()]); - req.metadata = Some(HashMap::new()); - - let result = namespace.update_table_schema_metadata(req).await; - assert!(result.is_err()); - } - #[tokio::test] async fn test_get_table_stats() { use lance_namespace::models::GetTableStatsRequest; - let (namespace, _temp_dir) = create_test_namespace().await; - create_scalar_table(&namespace, "orders").await; - - let mut req = GetTableStatsRequest::new(); - req.id = Some(vec!["orders".to_string()]); - - let response = namespace.get_table_stats(req).await.unwrap(); - - // The scalar table has 3 rows - assert_eq!(response.num_rows, 3); - // Fragments: should have at least 1 - assert!(response.fragment_stats.num_fragments >= 1); - // num_indices: 0 (no indices created) - assert_eq!(response.num_indices, 0); - // Fragment summary lengths should reflect the 3-row fragment - let lengths = &response.fragment_stats.lengths; - assert!(lengths.min >= 0); - assert!(lengths.max >= lengths.min); - } - - #[tokio::test] - async fn test_get_table_stats_with_index() { - use lance_namespace::models::GetTableStatsRequest; - let (namespace, _temp_dir) = create_test_namespace().await; create_scalar_table(&namespace, "items").await; create_scalar_index(&namespace, "items", "items_id_idx").await; @@ -6960,76 +6804,39 @@ mod tests { assert_eq!(response.num_indices, 1); } - #[tokio::test] - async fn test_get_table_stats_not_found() { - use lance_namespace::models::GetTableStatsRequest; - - let (namespace, _temp_dir) = create_test_namespace().await; - - let mut req = GetTableStatsRequest::new(); - req.id = Some(vec!["nonexistent".to_string()]); - - let result = namespace.get_table_stats(req).await; - assert!(result.is_err()); - } - #[tokio::test] async fn test_explain_table_query_plan() { - use lance_namespace::models::{ExplainTableQueryPlanRequest, QueryTableRequest}; use lance_namespace::models::QueryTableRequestVector; - - let (namespace, _temp_dir) = create_test_namespace().await; - create_scalar_table(&namespace, "catalog").await; - - let query = QueryTableRequest::new( - 1, - QueryTableRequestVector::new(), - ); - let mut req = ExplainTableQueryPlanRequest::new(query); - req.id = Some(vec!["catalog".to_string()]); - req.verbose = Some(false); - - let plan_str = namespace.explain_table_query_plan(req).await.unwrap(); - assert!(!plan_str.is_empty(), "Plan string should not be empty"); - } - - #[tokio::test] - async fn test_explain_table_query_plan_with_filter() { use lance_namespace::models::{ExplainTableQueryPlanRequest, QueryTableRequest}; - use lance_namespace::models::QueryTableRequestVector; let (namespace, _temp_dir) = create_test_namespace().await; create_scalar_table(&namespace, "catalog").await; - let mut query = QueryTableRequest::new( - 1, - QueryTableRequestVector::new(), - ); + let mut query = QueryTableRequest::new(1, QueryTableRequestVector::new()); query.filter = Some("id > 1".to_string()); + query.columns = Some(Box::new(QueryTableRequestColumns { + column_names: Some(vec!["id".to_string(), "name".to_string()]), + column_aliases: None, + })); + query.with_row_id = Some(true); let mut req = ExplainTableQueryPlanRequest::new(query); req.id = Some(vec!["catalog".to_string()]); let plan_str = namespace.explain_table_query_plan(req).await.unwrap(); - assert!(!plan_str.is_empty(), "Filtered plan string should not be empty"); - } - - #[tokio::test] - async fn test_explain_table_query_plan_not_found() { - use lance_namespace::models::{ExplainTableQueryPlanRequest, QueryTableRequest}; - use lance_namespace::models::QueryTableRequestVector; - - let (namespace, _temp_dir) = create_test_namespace().await; - - let query = QueryTableRequest::new( - 1, - QueryTableRequestVector::new(), + assert_plan_contains_all( + &plan_str, + &[ + "ProjectionExec: expr=[id@0 as id, name@2 as name", + "Take: columns=\"id, _rowid, (name)\"", + "LanceRead: uri=", + "projection=[id]", + "row_id=true, row_addr=false", + "full_filter=id > Int32(1)", + "refine_filter=id > Int32(1)", + ], + "Filtered explain plan should preserve late materialization and filter pushdown", ); - let mut req = ExplainTableQueryPlanRequest::new(query); - req.id = Some(vec!["nonexistent".to_string()]); - - let result = namespace.explain_table_query_plan(req).await; - assert!(result.is_err()); } #[tokio::test] @@ -7040,162 +6847,33 @@ mod tests { let (namespace, _temp_dir) = create_test_namespace().await; create_scalar_table(&namespace, "catalog").await; - let mut req = AnalyzeTableQueryPlanRequest::new( - 1, - QueryTableRequestVector::new(), - ); - req.id = Some(vec!["catalog".to_string()]); - - let analysis_str = namespace.analyze_table_query_plan(req).await.unwrap(); - assert!(!analysis_str.is_empty(), "Analysis string should not be empty"); - } - - #[tokio::test] - async fn test_analyze_table_query_plan_with_filter() { - use lance_namespace::models::AnalyzeTableQueryPlanRequest; - use lance_namespace::models::QueryTableRequestVector; - - let (namespace, _temp_dir) = create_test_namespace().await; - create_scalar_table(&namespace, "catalog").await; - - let mut req = AnalyzeTableQueryPlanRequest::new( - 1, - QueryTableRequestVector::new(), - ); + let mut req = AnalyzeTableQueryPlanRequest::new(1, QueryTableRequestVector::new()); req.id = Some(vec!["catalog".to_string()]); req.filter = Some("id > 0".to_string()); + req.columns = Some(Box::new(QueryTableRequestColumns { + column_names: Some(vec!["id".to_string(), "name".to_string()]), + column_aliases: None, + })); + req.with_row_id = Some(true); let analysis_str = namespace.analyze_table_query_plan(req).await.unwrap(); - assert!(!analysis_str.is_empty(), "Filtered analysis string should not be empty"); - } - - #[tokio::test] - async fn test_analyze_table_query_plan_not_found() { - use lance_namespace::models::AnalyzeTableQueryPlanRequest; - use lance_namespace::models::QueryTableRequestVector; - - let (namespace, _temp_dir) = create_test_namespace().await; - - let mut req = AnalyzeTableQueryPlanRequest::new( - 1, - QueryTableRequestVector::new(), - ); - req.id = Some(vec!["nonexistent".to_string()]); - - let result = namespace.analyze_table_query_plan(req).await; - assert!(result.is_err()); - } - - // ── multi-vector tests ──────────────────────────────────────────────────── - // The vector table has a FixedSizeList "vector" column. - // `single_vector` maps directly; `multi_vector` falls back to the first - // sub-vector so both should produce a valid (non-empty) plan string. - - #[tokio::test] - async fn test_explain_table_query_plan_with_single_vector() { - use lance_namespace::models::{ExplainTableQueryPlanRequest, QueryTableRequest}; - use lance_namespace::models::QueryTableRequestVector; - - let (namespace, _temp_dir) = create_test_namespace().await; - create_vector_table(&namespace, "vecs").await; - - let mut vec_input = QueryTableRequestVector::new(); - // dim=2 matches the vector column created by create_vector_table_ipc_data - vec_input.single_vector = Some(vec![0.1, 0.2]); - - let mut query = QueryTableRequest::new(2, vec_input); - query.vector_column = Some("vector".to_string()); - - let mut req = ExplainTableQueryPlanRequest::new(query); - req.id = Some(vec!["vecs".to_string()]); - - let plan_str = namespace.explain_table_query_plan(req).await.unwrap(); - assert!(!plan_str.is_empty(), "plan string should not be empty"); - // The plan should reference the vector column, confirming KNN is applied. - assert!( - plan_str.to_lowercase().contains("vector") || plan_str.to_lowercase().contains("knn"), - "plan should reflect vector search, got: {plan_str}" - ); - } - - #[tokio::test] - async fn test_explain_table_query_plan_with_multi_vector() { - // multi_vector: our implementation uses the first sub-vector for nearest(). - use lance_namespace::models::{ExplainTableQueryPlanRequest, QueryTableRequest}; - use lance_namespace::models::QueryTableRequestVector; - - let (namespace, _temp_dir) = create_test_namespace().await; - create_vector_table(&namespace, "vecs").await; - - let mut vec_input = QueryTableRequestVector::new(); - vec_input.multi_vector = Some(vec![ - vec![0.1, 0.2], // used for nearest() - vec![0.3, 0.4], // additional vectors are ignored by explain/analyze - ]); - - let mut query = QueryTableRequest::new(2, vec_input); - query.vector_column = Some("vector".to_string()); - - let mut req = ExplainTableQueryPlanRequest::new(query); - req.id = Some(vec!["vecs".to_string()]); - - let plan_str = namespace.explain_table_query_plan(req).await.unwrap(); - assert!(!plan_str.is_empty(), "plan string should not be empty"); - assert!( - plan_str.to_lowercase().contains("vector") || plan_str.to_lowercase().contains("knn"), - "plan should reflect vector search, got: {plan_str}" - ); - } - - #[tokio::test] - async fn test_analyze_table_query_plan_with_single_vector() { - use lance_namespace::models::AnalyzeTableQueryPlanRequest; - use lance_namespace::models::QueryTableRequestVector; - - let (namespace, _temp_dir) = create_test_namespace().await; - create_vector_table(&namespace, "vecs").await; - - let mut vec_input = QueryTableRequestVector::new(); - vec_input.single_vector = Some(vec![0.1, 0.2]); - - let mut req = AnalyzeTableQueryPlanRequest::new(2, vec_input); - req.id = Some(vec!["vecs".to_string()]); - req.vector_column = Some("vector".to_string()); - - let analysis_str = namespace.analyze_table_query_plan(req).await.unwrap(); - assert!(!analysis_str.is_empty(), "analysis string should not be empty"); - assert!( - analysis_str.to_lowercase().contains("vector") - || analysis_str.to_lowercase().contains("knn"), - "analysis should reflect vector search, got: {analysis_str}" - ); - } - - #[tokio::test] - async fn test_analyze_table_query_plan_with_multi_vector() { - // multi_vector: our implementation uses the first sub-vector for nearest(). - use lance_namespace::models::AnalyzeTableQueryPlanRequest; - use lance_namespace::models::QueryTableRequestVector; - - let (namespace, _temp_dir) = create_test_namespace().await; - create_vector_table(&namespace, "vecs").await; - - let mut vec_input = QueryTableRequestVector::new(); - vec_input.multi_vector = Some(vec![ - vec![0.1, 0.2], // used for nearest() - vec![0.3, 0.4], // additional vectors are ignored by explain/analyze - ]); - - let mut req = AnalyzeTableQueryPlanRequest::new(2, vec_input); - req.id = Some(vec!["vecs".to_string()]); - req.vector_column = Some("vector".to_string()); - - let analysis_str = namespace.analyze_table_query_plan(req).await.unwrap(); - assert!(!analysis_str.is_empty(), "analysis string should not be empty"); - assert!( - analysis_str.to_lowercase().contains("vector") - || analysis_str.to_lowercase().contains("knn"), - "analysis should reflect vector search, got: {analysis_str}" + assert_plan_contains_all( + &analysis_str, + &[ + "AnalyzeExec verbose=true", + "ProjectionExec: elapsed=", + "expr=[id@0 as id, name@2 as name", + "Take: elapsed=", + "columns=\"id, _rowid, (name)\"", + "CoalesceBatchesExec: elapsed=", + "LanceRead: elapsed=", + "projection=[id]", + "row_id=true, row_addr=false", + "full_filter=id > Int32(0)", + "refine_filter=id > Int32(0)", + "metrics=[output_rows=", + ], + "Filtered analyze plan should preserve late materialization and filter pushdown", ); } } From f81a3efeafbdc161123335f6782842672a029cd4 Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Fri, 27 Mar 2026 16:03:37 +0800 Subject: [PATCH 4/4] fix ut --- rust/lance-namespace-impls/src/dir.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 4e8f44f25b8..6bfbf872bac 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -56,7 +56,7 @@ use lance_namespace::models::{ UpdateTableSchemaMetadataResponse, }; -use lance_core::Result; +use lance_core::{Error, Result}; use lance_namespace::LanceNamespace; use lance_namespace::error::NamespaceError; use lance_namespace::schema::arrow_schema_to_json;