diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 49d19712e26..af0e4d9bb4b 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -11,7 +11,7 @@ use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use arrow_ipc::reader::StreamReader; use async_trait::async_trait; use bytes::Bytes; -use futures::stream::StreamExt; +use futures::{stream::StreamExt, FutureExt}; use lance::dataset::optimize::{compact_files, CompactionOptions}; use lance::dataset::{builder::DatasetBuilder, WriteParams}; use lance::session::Session; @@ -1081,7 +1081,7 @@ impl LanceNamespace for ManifestNamespace { } let object_id = Self::str_object_id(table_id); - let table_info = self.query_manifest_for_table(&object_id).await?; + let table_info = self.query_manifest_for_table(&object_id).boxed().await?; // Extract table name and namespace from table_id let table_name = table_id.last().cloned().unwrap_or_default(); @@ -1298,12 +1298,12 @@ impl LanceNamespace for ManifestNamespace { let object_id = Self::build_object_id(&namespace, &table_name); // Query manifest for table location - let table_info = self.query_manifest_for_table(&object_id).await?; + let table_info = self.query_manifest_for_table(&object_id).boxed().await?; match table_info { Some(info) => { // Delete from manifest first - self.delete_from_manifest(&object_id).await?; + self.delete_from_manifest(&object_id).boxed().await?; // Delete physical data directory using the dir_name from manifest let table_path = self.base_path.child(info.location.as_str()); @@ -1312,6 +1312,7 @@ impl LanceNamespace for ManifestNamespace { // Remove the table directory self.object_store .remove_dir_all(table_path) + .boxed() .await .map_err(|e| Error::Namespace { source: format!("Failed to delete table directory: {}", e).into(), @@ -1489,7 +1490,7 @@ impl LanceNamespace for ManifestNamespace { let object_id = namespace_id.join(DELIMITER); // Check if namespace exists - if !self.manifest_contains_object(&object_id).await? { + if !self.manifest_contains_object(&object_id).boxed().await? { return Err(Error::Namespace { source: format!("Namespace '{}' not found", object_id).into(), location: location!(), @@ -1499,7 +1500,7 @@ impl LanceNamespace for ManifestNamespace { // Check for child namespaces let prefix = format!("{}{}", object_id, DELIMITER); let filter = format!("starts_with(object_id, '{}')", prefix); - let mut scanner = self.manifest_scanner().await?; + let mut scanner = self.manifest_scanner().boxed().await?; scanner.filter(&filter).map_err(|e| Error::IO { source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))), location: location!(), @@ -1509,7 +1510,7 @@ impl LanceNamespace for ManifestNamespace { location: location!(), })?; scanner.with_row_id(); - let count = scanner.count_rows().await.map_err(|e| Error::IO { + let count = scanner.count_rows().boxed().await.map_err(|e| Error::IO { source: box_error(std::io::Error::other(format!( "Failed to count rows: {}", e @@ -1528,7 +1529,7 @@ impl LanceNamespace for ManifestNamespace { }); } - self.delete_from_manifest(&object_id).await?; + self.delete_from_manifest(&object_id).boxed().await?; Ok(DropNamespaceResponse::default()) } @@ -1863,7 +1864,7 @@ impl LanceNamespace for ManifestNamespace { let table_uri = match table_info { Some(info) => { // Delete from manifest only (leave physical data intact) - self.delete_from_manifest(&object_id).await?; + self.delete_from_manifest(&object_id).boxed().await?; Self::construct_full_uri(&self.root, &info.location)? } None => { diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index a2b02dd5cf4..d74829d15f4 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -11,7 +11,7 @@ use arrow_schema::{DataType, Schema}; use async_trait::async_trait; use datafusion::execution::SendableRecordBatchStream; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use futures::stream; +use futures::{stream, FutureExt}; use itertools::Itertools; use lance_core::cache::{CacheKey, UnsizedCacheKey}; use lance_core::datatypes::Field; @@ -921,170 +921,16 @@ impl DatasetIndexExt for Dataset { } if index_name == FRAG_REUSE_INDEX_NAME { - let index = self - .open_frag_reuse_index(&NoOpMetricsCollector) - .await? - .expect("FragmentReuse index does not exist"); - return serde_json::to_string(&index.statistics()?).map_err(|e| Error::Index { - message: format!("Failed to serialize index statistics: {}", e), - location: location!(), - }); + return index_statistics_frag_reuse(self).boxed().await; } if index_name == MEM_WAL_INDEX_NAME { - let index = self - .open_mem_wal_index(&NoOpMetricsCollector) - .await? - .expect("MemWal index does not exist"); - return serde_json::to_string(&index.statistics()?).map_err(|e| Error::Index { - message: format!("Failed to serialize index statistics: {}", e), - location: location!(), - }); - } - - let field_id = metadatas[0].fields[0]; - let field_path = self.schema().field_path(field_id)?; - - let mut indices_stats = Vec::with_capacity(metadatas.len()); - let mut index_uri: Option = None; - let mut index_typename: Option = None; - - for meta in metadatas.iter() { - let index_store = Arc::new(LanceIndexStore::from_dataset_for_existing(self, meta)?); - let index_details = scalar::fetch_index_details(self, &field_path, meta).await?; - if index_uri.is_none() { - index_uri = Some(index_details.type_url.clone()); - } - let index_details_wrapper = scalar::IndexDetails(index_details.clone()); - - if let Ok(plugin) = index_details_wrapper.get_plugin() { - if index_typename.is_none() { - index_typename = Some(plugin.name().to_string()); - } - - if let Some(stats) = plugin - .load_statistics(index_store.clone(), index_details.as_ref()) - .await? - { - indices_stats.push(stats); - continue; - } - } - - let index = self - .open_generic_index(&field_path, &meta.uuid.to_string(), &NoOpMetricsCollector) - .await?; - - if index_typename.is_none() { - // Fall back to a friendly name from the type URL if the plugin is unknown - let uri = index_uri - .as_deref() - .unwrap_or_else(|| index_details.type_url.as_str()); - index_typename = Some(type_name_from_uri(uri)); - } - - indices_stats.push(index.statistics()?); - } - - let index_uri = index_uri.unwrap_or_else(|| "unknown".to_string()); - let index_type_hint = indices_stats - .first() - .and_then(|stats| stats.get("index_type")) - .and_then(|v| v.as_str()); - let index_type = legacy_type_name(&index_uri, index_type_hint); - - let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?; - - let res = indexed_fragments_per_delta - .iter() - .map(|frags| { - let mut sum = 0; - for frag in frags.iter() { - sum += frag.num_rows().ok_or_else(|| Error::Internal { - message: "Fragment should have row counts, please upgrade lance and \ - trigger a single write to fix this" - .to_string(), - location: location!(), - })?; - } - Ok(sum) - }) - .collect::>>(); - - async fn migrate_and_recompute(ds: &Dataset, index_name: &str) -> Result { - let mut ds = ds.clone(); - log::warn!( - "Detecting out-dated fragment metadata, migrating dataset. \ - To disable migration, set LANCE_AUTO_MIGRATION=false" - ); - ds.delete("false").await.map_err(|err| { - Error::Execution { - message: format!("Failed to migrate dataset while calculating index statistics. \ - To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}", err), - location: location!(), - } - })?; - ds.index_statistics(index_name).await + return index_statistics_mem_wal(self).boxed().await; } - let num_indexed_rows_per_delta = match res { - Ok(rows) => rows, - Err(Error::Internal { message, .. }) - if auto_migrate_corruption() && message.contains("trigger a single write") => - { - return migrate_and_recompute(self, index_name).await; - } - Err(e) => return Err(e), - }; - - let mut fragment_ids = HashSet::new(); - for frags in indexed_fragments_per_delta.iter() { - for frag in frags.iter() { - if !fragment_ids.insert(frag.id) { - if auto_migrate_corruption() { - return migrate_and_recompute(self, index_name).await; - } else { - return Err(Error::Internal { - message: - "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \ - and trigger a single write to fix this" - .to_string(), - location: location!(), - }); - } - } - } - } - let num_indexed_fragments = fragment_ids.len(); - - let num_unindexed_fragments = self.fragments().len() - num_indexed_fragments; - let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().cloned().sum(); - let num_unindexed_rows = self.count_rows(None).await? - num_indexed_rows; - - // Calculate updated_at as max(created_at) from all index metadata - let updated_at = metadatas - .iter() - .filter_map(|m| m.created_at) - .max() - .map(|dt| dt.timestamp_millis() as u64); - - let stats = json!({ - "index_type": index_type, - "name": index_name, - "num_indices": metadatas.len(), - "indices": indices_stats, - "num_indexed_fragments": num_indexed_fragments, - "num_indexed_rows": num_indexed_rows, - "num_unindexed_fragments": num_unindexed_fragments, - "num_unindexed_rows": num_unindexed_rows, - "num_indexed_rows_per_delta": num_indexed_rows_per_delta, - "updated_at_timestamp_ms": updated_at, - }); - - serde_json::to_string(&stats).map_err(|e| Error::Index { - message: format!("Failed to serialize index statistics: {}", e), - location: location!(), - }) + index_statistics_scalar(self, index_name, metadatas) + .boxed() + .await } async fn read_index_partition( @@ -1132,6 +978,213 @@ impl DatasetIndexExt for Dataset { } } +fn sum_indexed_rows_per_delta(indexed_fragments_per_delta: &[Vec]) -> Result> { + let mut rows_per_delta = Vec::with_capacity(indexed_fragments_per_delta.len()); + for frags in indexed_fragments_per_delta { + let mut sum = 0usize; + for frag in frags { + sum += frag.num_rows().ok_or_else(|| Error::Internal { + message: "Fragment should have row counts, please upgrade lance and \ + trigger a single write to fix this" + .to_string(), + location: location!(), + })?; + } + rows_per_delta.push(sum); + } + Ok(rows_per_delta) +} + +fn unique_indexed_fragment_count(indexed_fragments_per_delta: &[Vec]) -> Option { + let mut fragment_ids = HashSet::new(); + for frags in indexed_fragments_per_delta { + for frag in frags { + if !fragment_ids.insert(frag.id) { + return None; + } + } + } + Some(fragment_ids.len()) +} + +fn serialize_index_statistics(stats: &serde_json::Value) -> Result { + serde_json::to_string(stats).map_err(|e| Error::Index { + message: format!("Failed to serialize index statistics: {}", e), + location: location!(), + }) +} + +async fn migrate_and_recompute_index_statistics(ds: &Dataset, index_name: &str) -> Result { + let mut ds = ds.clone(); + log::warn!( + "Detecting out-dated fragment metadata, migrating dataset. \ + To disable migration, set LANCE_AUTO_MIGRATION=false" + ); + ds.delete("false").await.map_err(|err| Error::Execution { + message: format!( + "Failed to migrate dataset while calculating index statistics. \ + To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}", + err + ), + location: location!(), + })?; + ds.index_statistics(index_name).await +} + +async fn index_statistics_frag_reuse(ds: &Dataset) -> Result { + let index = ds + .open_frag_reuse_index(&NoOpMetricsCollector) + .await? + .expect("FragmentReuse index does not exist"); + serialize_index_statistics(&index.statistics()?) +} + +async fn index_statistics_mem_wal(ds: &Dataset) -> Result { + let index = ds + .open_mem_wal_index(&NoOpMetricsCollector) + .await? + .expect("MemWal index does not exist"); + serialize_index_statistics(&index.statistics()?) +} + +async fn index_statistics_scalar( + ds: &Dataset, + index_name: &str, + metadatas: Vec, +) -> Result { + let field_id = metadatas[0].fields[0]; + let field_path = ds.schema().field_path(field_id)?; + + let (indices_stats, index_uri, num_indices, updated_at) = + collect_regular_indices_statistics(ds, metadatas, &field_path).await?; + + let index_type_hint = indices_stats + .first() + .and_then(|stats| stats.get("index_type")) + .and_then(|v| v.as_str()); + let index_type = legacy_type_name(&index_uri, index_type_hint); + + let Some(( + num_indexed_rows_per_delta, + num_indexed_fragments, + num_unindexed_fragments, + num_indexed_rows, + num_unindexed_rows, + )) = gather_fragment_statistics(ds, index_name).await? + else { + return migrate_and_recompute_index_statistics(ds, index_name).await; + }; + + let stats = json!({ + "index_type": index_type, + "name": index_name, + "num_indices": num_indices, + "indices": indices_stats, + "num_indexed_fragments": num_indexed_fragments, + "num_indexed_rows": num_indexed_rows, + "num_unindexed_fragments": num_unindexed_fragments, + "num_unindexed_rows": num_unindexed_rows, + "num_indexed_rows_per_delta": num_indexed_rows_per_delta, + "updated_at_timestamp_ms": updated_at, + }); + + serialize_index_statistics(&stats) +} + +async fn collect_regular_indices_statistics( + ds: &Dataset, + metadatas: Vec, + field_path: &str, +) -> Result<(Vec, String, usize, Option)> { + let num_indices = metadatas.len(); + let updated_at = metadatas + .iter() + .filter_map(|m| m.created_at) + .max() + .map(|dt| dt.timestamp_millis() as u64); + + let mut indices_stats = Vec::with_capacity(num_indices); + let mut index_uri: Option = None; + + for meta in metadatas.iter() { + let index_store = Arc::new(LanceIndexStore::from_dataset_for_existing(ds, meta)?); + let index_details = scalar::fetch_index_details(ds, field_path, meta).await?; + if index_uri.is_none() { + index_uri = Some(index_details.type_url.clone()); + } + + let index_details_wrapper = scalar::IndexDetails(index_details.clone()); + if let Ok(plugin) = index_details_wrapper.get_plugin() { + if let Some(stats) = plugin + .load_statistics(index_store.clone(), index_details.as_ref()) + .await? + { + indices_stats.push(stats); + continue; + } + } + + let index = ds + .open_generic_index(field_path, &meta.uuid.to_string(), &NoOpMetricsCollector) + .await?; + + indices_stats.push(index.statistics()?); + } + + Ok(( + indices_stats, + index_uri.unwrap_or_else(|| "unknown".to_string()), + num_indices, + updated_at, + )) +} + +async fn gather_fragment_statistics( + ds: &Dataset, + index_name: &str, +) -> Result, usize, usize, usize, usize)>> { + let indexed_fragments_per_delta = ds.indexed_fragments(index_name).await?; + + let num_indexed_rows_per_delta = match sum_indexed_rows_per_delta(&indexed_fragments_per_delta) + { + Ok(rows) => rows, + Err(Error::Internal { message, .. }) + if auto_migrate_corruption() && message.contains("trigger a single write") => + { + return Ok(None); + } + Err(e) => return Err(e), + }; + + let Some(num_indexed_fragments) = unique_indexed_fragment_count(&indexed_fragments_per_delta) + else { + if auto_migrate_corruption() { + return Ok(None); + } + return Err(Error::Internal { + message: "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \ + and trigger a single write to fix this" + .to_string(), + location: location!(), + }); + }; + + let num_unindexed_fragments = ds.fragments().len() - num_indexed_fragments; + let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().sum(); + + drop(indexed_fragments_per_delta); + let total_rows = ds.count_rows(None).await?; + let num_unindexed_rows = total_rows - num_indexed_rows; + + Ok(Some(( + num_indexed_rows_per_delta, + num_indexed_fragments, + num_unindexed_fragments, + num_indexed_rows, + num_unindexed_rows, + ))) +} + pub(crate) fn retain_supported_indices(indices: &mut Vec) { indices.retain(|idx| { let max_supported_version = idx