diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index dbb1348fc6b..8d9eb8fe60e 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -48,6 +48,7 @@ use crate::{metrics::MetricsCollector, Index, IndexType}; use crate::{scalar::expression::ScalarQueryParser, scalar::IndexReader}; pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance"; +pub const INDEX_STATS_METADATA_KEY: &str = "lance:index_stats"; const MAX_BITMAP_ARRAY_LENGTH: usize = i32::MAX as usize - 1024 * 1024; // leave headroom @@ -620,6 +621,7 @@ impl BitmapIndexPlugin { index_store: &dyn IndexStore, value_type: &DataType, ) -> Result<()> { + let num_bitmaps = state.len(); let schema = Arc::new(Schema::new(vec![ Field::new("keys", value_type.clone(), true), Field::new("bitmaps", DataType::Binary, true), @@ -672,8 +674,17 @@ impl BitmapIndexPlugin { bitmap_index_file.write_record_batch(record_batch).await?; } - // Finish file once at the end - this creates the file even if we wrote no batches - bitmap_index_file.finish().await?; + // Finish file with metadata that allows lightweight statistics reads + let stats_json = serde_json::to_string(&BitmapStatistics { num_bitmaps }).map_err(|e| { + Error::Internal { + message: format!("failed to serialize bitmap statistics: {e}"), + location: location!(), + } + })?; + let mut metadata = HashMap::new(); + metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json); + + bitmap_index_file.finish_with_metadata(metadata).await?; Ok(()) } @@ -782,6 +793,23 @@ impl ScalarIndexPlugin for BitmapIndexPlugin { ) -> Result> { Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc) } + + async fn load_statistics( + &self, + index_store: Arc, + _index_details: &prost_types::Any, + ) -> Result> { + let reader = index_store.open_index_file(BITMAP_LOOKUP_NAME).await?; + if let Some(value) = reader.schema().metadata.get(INDEX_STATS_METADATA_KEY) { + let stats = serde_json::from_str(value).map_err(|e| Error::Internal { + message: format!("failed to parse bitmap statistics metadata: {e}"), + location: location!(), + })?; + Ok(Some(stats)) + } else { + Ok(None) + } + } } #[cfg(test)] @@ -790,11 +818,12 @@ pub mod tests { use crate::metrics::NoOpMetricsCollector; use crate::scalar::lance_format::LanceIndexStore; use arrow_array::{RecordBatch, StringArray, UInt64Array}; - use arrow_schema::{Field, Schema}; + use arrow_schema::{DataType, Field, Schema}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use futures::stream; use lance_core::utils::{address::RowAddress, tempfile::TempObjDir}; use lance_io::object_store::ObjectStore; + use std::collections::HashMap; #[tokio::test] async fn test_bitmap_lazy_loading_and_cache() { diff --git a/rust/lance-index/src/scalar/bloomfilter.rs b/rust/lance-index/src/scalar/bloomfilter.rs index 8b0c8a951a4..f29d18a6095 100644 --- a/rust/lance-index/src/scalar/bloomfilter.rs +++ b/rust/lance-index/src/scalar/bloomfilter.rs @@ -1309,6 +1309,14 @@ impl ScalarIndexPlugin for BloomFilterIndexPlugin { as Arc, ) } + + async fn load_statistics( + &self, + _index_store: Arc, + _index_details: &prost_types::Any, + ) -> Result> { + Ok(None) + } } #[derive(Debug)] diff --git a/rust/lance-index/src/scalar/registry.rs b/rust/lance-index/src/scalar/registry.rs index c53d46f0546..76b088518e3 100644 --- a/rust/lance-index/src/scalar/registry.rs +++ b/rust/lance-index/src/scalar/registry.rs @@ -156,6 +156,15 @@ pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug { cache: &LanceCache, ) -> Result>; + /// Optional hook allowing a plugin to provide statistics without loading the index. + async fn load_statistics( + &self, + _index_store: Arc, + _index_details: &prost_types::Any, + ) -> Result> { + Ok(None) + } + /// Optional hook that plugins can use if they need to be aware of the registry fn attach_registry(&self, _registry: Arc) {} diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 2a3ad508483..522bd9a2aa9 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -11,7 +11,7 @@ use arrow_schema::{DataType, Schema}; use async_trait::async_trait; use datafusion::execution::SendableRecordBatchStream; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use futures::{stream, StreamExt, TryStreamExt}; +use futures::stream; use itertools::Itertools; use lance_core::cache::{CacheKey, UnsizedCacheKey}; use lance_core::utils::address::RowAddress; @@ -196,6 +196,45 @@ fn auto_migrate_corruption() -> bool { }) } +/// Derive a friendly (but not necessarily unique) type name from a type URL. +/// Extract a human-friendly type name from a type URL. +/// +/// Strips prefixes like `type.googleapis.com/` and package names, then removes +/// trailing `IndexDetails` / `Index` so callers get a concise display name. +fn type_name_from_uri(index_uri: &str) -> String { + let type_name = index_uri.rsplit('/').next().unwrap_or(index_uri); + let type_name = type_name.rsplit('.').next().unwrap_or(type_name); + type_name.trim_end_matches("IndexDetails").to_string() +} + +/// Legacy mapping from type URL to the old IndexType string for backwards compatibility. +/// Legacy mapping from type URL to the old IndexType string for backwards compatibility. +/// +/// If `index_type_hint` is provided (e.g. parsed from the index statistics of a concrete +/// index instance), it takes precedence so callers can surface the exact index type even +/// when the type URL alone is too generic (such as VectorIndexDetails). +fn legacy_type_name(index_uri: &str, index_type_hint: Option<&str>) -> String { + if let Some(hint) = index_type_hint { + return hint.to_string(); + } + + let base = type_name_from_uri(index_uri); + + match base.as_str() { + "BTree" => IndexType::BTree.to_string(), + "Bitmap" => IndexType::Bitmap.to_string(), + "LabelList" => IndexType::LabelList.to_string(), + "NGram" => IndexType::NGram.to_string(), + "ZoneMap" => IndexType::ZoneMap.to_string(), + "BloomFilter" => IndexType::BloomFilter.to_string(), + "Inverted" => IndexType::Inverted.to_string(), + "Json" => IndexType::Scalar.to_string(), + "Flat" | "Vector" => IndexType::Vector.to_string(), + other if other.contains("Vector") => IndexType::Vector.to_string(), + _ => "N/A".to_string(), + } +} + /// Builds index. #[async_trait] pub trait IndexBuilder { @@ -904,25 +943,53 @@ impl DatasetIndexExt for Dataset { let field_id = metadatas[0].fields[0]; let field_path = self.schema().field_path(field_id)?; - // Open all delta indices - let indices = stream::iter(metadatas.iter()) - .then(|m| { - let field_path = field_path.clone(); - async move { - self.open_generic_index(&field_path, &m.uuid.to_string(), &NoOpMetricsCollector) - .await + let mut indices_stats = Vec::with_capacity(metadatas.len()); + let mut index_uri: Option = None; + let mut index_typename: Option = None; + + for meta in metadatas.iter() { + let index_store = Arc::new(LanceIndexStore::from_dataset_for_existing(self, meta)?); + let index_details = scalar::fetch_index_details(self, &field_path, meta).await?; + if index_uri.is_none() { + index_uri = Some(index_details.type_url.clone()); + } + let index_details_wrapper = scalar::IndexDetails(index_details.clone()); + + if let Ok(plugin) = index_details_wrapper.get_plugin() { + if index_typename.is_none() { + index_typename = Some(plugin.name().to_string()); } - }) - .try_collect::>() - .await?; - // Stastistics for each delta index. - let indices_stats = indices - .iter() - .map(|idx| idx.statistics()) - .collect::>>()?; + if let Some(stats) = plugin + .load_statistics(index_store.clone(), index_details.as_ref()) + .await? + { + indices_stats.push(stats); + continue; + } + } + + let index = self + .open_generic_index(&field_path, &meta.uuid.to_string(), &NoOpMetricsCollector) + .await?; + + if index_typename.is_none() { + // Fall back to a friendly name from the type URL if the plugin is unknown + let uri = index_uri + .as_deref() + .unwrap_or_else(|| index_details.type_url.as_str()); + index_typename = Some(type_name_from_uri(uri)); + } + + indices_stats.push(index.statistics()?); + } - let index_type = indices[0].index_type().to_string(); + let index_uri = index_uri.unwrap_or_else(|| "unknown".to_string()); + let index_type_hint = indices_stats + .first() + .and_then(|stats| stats.get("index_type")) + .and_then(|v| v.as_str()); + let index_type = legacy_type_name(&index_uri, index_type_hint); let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?; @@ -1782,33 +1849,33 @@ fn is_vector_field(data_type: DataType) -> bool { #[cfg(test)] mod tests { + use super::*; use crate::dataset::builder::DatasetBuilder; use crate::dataset::optimize::{compact_files, CompactionOptions}; use crate::dataset::{WriteMode, WriteParams}; use crate::index::vector::VectorIndexParams; use crate::session::Session; use crate::utils::test::{copy_test_data_to_tmp, DatagenExt, FragmentCount, FragmentRowCount}; - use arrow_array::Int32Array; - - use lance_io::{assert_io_eq, assert_io_lt}; - - use super::*; - use arrow::array::AsArray; use arrow::datatypes::{Float32Type, Int32Type}; + use arrow_array::Int32Array; use arrow_array::{ FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, StringArray, }; - use arrow_schema::{Field, Schema}; + use arrow_schema::{DataType, Field, Schema}; + use futures::stream::TryStreamExt; use lance_arrow::*; use lance_core::utils::tempfile::TempStrDir; use lance_datagen::gen_batch; use lance_datagen::{array, BatchCount, Dimension, RowCount}; - use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams}; + use lance_index::scalar::bitmap::BITMAP_LOOKUP_NAME; + use lance_index::scalar::{ + BuiltinIndexType, FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams, + }; use lance_index::vector::{ hnsw::builder::HnswBuildParams, ivf::IvfBuildParams, sq::builder::SQBuildParams, }; - + use lance_io::{assert_io_eq, assert_io_lt}; use lance_linalg::distance::{DistanceType, MetricType}; use lance_testing::datagen::generate_random_array; use rstest::rstest; @@ -1873,6 +1940,80 @@ mod tests { .is_err()); } + #[tokio::test] + async fn test_bitmap_index_statistics_minimal_io_via_dataset() { + const NUM_ROWS: usize = 500_000; + let test_dir = TempStrDir::default(); + let schema = Arc::new(Schema::new(vec![Field::new( + "status", + DataType::Int32, + false, + )])); + let values: Vec = (0..NUM_ROWS as i32).collect(); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(values))]).unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + + let mut dataset = Dataset::write(reader, &test_dir, None).await.unwrap(); + let io_tracker = dataset.object_store().io_tracker().clone(); + + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap); + dataset + .create_index( + &["status"], + IndexType::Bitmap, + Some("status_idx".to_string()), + ¶ms, + true, + ) + .await + .unwrap(); + + let indices = dataset.load_indices().await.unwrap(); + let index_meta = indices + .iter() + .find(|idx| idx.name == "status_idx") + .expect("status_idx should exist"); + let lookup_path = dataset + .indice_files_dir(index_meta) + .unwrap() + .child(index_meta.uuid.to_string()) + .child(BITMAP_LOOKUP_NAME); + let meta = dataset.object_store.inner.head(&lookup_path).await.unwrap(); + assert!( + meta.size >= 1_000_000, + "bitmap index should be large enough to fail without metadata path, size={} bytes", + meta.size + ); + + // Reset stats collected during index creation + io_tracker.incremental_stats(); + + dataset.index_statistics("status_idx").await.unwrap(); + + let stats = io_tracker.incremental_stats(); + assert_io_eq!( + stats, + read_bytes, + 4096, + "index_statistics should only read the index footer; got {} bytes", + stats.read_bytes + ); + assert_io_lt!( + stats, + read_iops, + 3, + "index_statistics should only require a head plus one range read; got {} ops", + stats.read_iops + ); + assert_io_eq!( + stats, + written_bytes, + 0, + "index_statistics should not perform writes" + ); + } + fn sample_vector_field() -> Field { let dimensions = 16; let column_name = "vec";