Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions rust/lance-index/src/scalar/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::{metrics::MetricsCollector, Index, IndexType};
use crate::{scalar::expression::ScalarQueryParser, scalar::IndexReader};

pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance";
pub const INDEX_STATS_METADATA_KEY: &str = "lance:index_stats";

const MAX_BITMAP_ARRAY_LENGTH: usize = i32::MAX as usize - 1024 * 1024; // leave headroom

Expand Down Expand Up @@ -620,6 +621,7 @@ impl BitmapIndexPlugin {
index_store: &dyn IndexStore,
value_type: &DataType,
) -> Result<()> {
let num_bitmaps = state.len();
let schema = Arc::new(Schema::new(vec![
Field::new("keys", value_type.clone(), true),
Field::new("bitmaps", DataType::Binary, true),
Expand Down Expand Up @@ -672,8 +674,17 @@ impl BitmapIndexPlugin {
bitmap_index_file.write_record_batch(record_batch).await?;
}

// Finish file once at the end - this creates the file even if we wrote no batches
bitmap_index_file.finish().await?;
// Finish file with metadata that allows lightweight statistics reads
let stats_json = serde_json::to_string(&BitmapStatistics { num_bitmaps }).map_err(|e| {
Error::Internal {
message: format!("failed to serialize bitmap statistics: {e}"),
location: location!(),
}
})?;
let mut metadata = HashMap::new();
metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json);

bitmap_index_file.finish_with_metadata(metadata).await?;

Ok(())
}
Expand Down Expand Up @@ -782,6 +793,23 @@ impl ScalarIndexPlugin for BitmapIndexPlugin {
) -> Result<Arc<dyn ScalarIndex>> {
Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
}

async fn load_statistics(
&self,
index_store: Arc<dyn IndexStore>,
_index_details: &prost_types::Any,
) -> Result<Option<serde_json::Value>> {
let reader = index_store.open_index_file(BITMAP_LOOKUP_NAME).await?;
if let Some(value) = reader.schema().metadata.get(INDEX_STATS_METADATA_KEY) {
let stats = serde_json::from_str(value).map_err(|e| Error::Internal {
message: format!("failed to parse bitmap statistics metadata: {e}"),
location: location!(),
})?;
Ok(Some(stats))
} else {
Ok(None)
}
}
}

#[cfg(test)]
Expand All @@ -790,11 +818,12 @@ pub mod tests {
use crate::metrics::NoOpMetricsCollector;
use crate::scalar::lance_format::LanceIndexStore;
use arrow_array::{RecordBatch, StringArray, UInt64Array};
use arrow_schema::{Field, Schema};
use arrow_schema::{DataType, Field, Schema};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream;
use lance_core::utils::{address::RowAddress, tempfile::TempObjDir};
use lance_io::object_store::ObjectStore;
use std::collections::HashMap;

#[tokio::test]
async fn test_bitmap_lazy_loading_and_cache() {
Expand Down
8 changes: 8 additions & 0 deletions rust/lance-index/src/scalar/bloomfilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,14 @@ impl ScalarIndexPlugin for BloomFilterIndexPlugin {
as Arc<dyn ScalarIndex>,
)
}

/// Bloom filter indexes do not persist lightweight statistics metadata, so
/// there is no fast path; returning `None` tells the caller to fall back to
/// fully loading the index.
///
/// NOTE(review): this matches the trait's default implementation — the
/// override only serves to make the "no fast path" decision explicit.
async fn load_statistics(
    &self,
    _index_store: Arc<dyn IndexStore>,
    _index_details: &prost_types::Any,
) -> Result<Option<serde_json::Value>> {
    Ok(None)
}
}

#[derive(Debug)]
Expand Down
9 changes: 9 additions & 0 deletions rust/lance-index/src/scalar/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug {
cache: &LanceCache,
) -> Result<Arc<dyn ScalarIndex>>;

/// Optional hook allowing a plugin to provide statistics without loading the index.
///
/// The default returns `Ok(None)`, signalling that no lightweight statistics
/// are available and the caller must load the full index to compute them.
/// Plugins that persist statistics in cheap-to-read locations (e.g. file
/// footer metadata) should override this to avoid that cost.
async fn load_statistics(
    &self,
    _index_store: Arc<dyn IndexStore>,
    _index_details: &prost_types::Any,
) -> Result<Option<serde_json::Value>> {
    Ok(None)
}

/// Optional hook that plugins can use if they need to be aware of the registry
fn attach_registry(&self, _registry: Arc<IndexPluginRegistry>) {}

Expand Down
193 changes: 167 additions & 26 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use arrow_schema::{DataType, Schema};
use async_trait::async_trait;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::{stream, StreamExt, TryStreamExt};
use futures::stream;
use itertools::Itertools;
use lance_core::cache::{CacheKey, UnsizedCacheKey};
use lance_core::utils::address::RowAddress;
Expand Down Expand Up @@ -196,6 +196,45 @@ fn auto_migrate_corruption() -> bool {
})
}

/// Extract a human-friendly (but not necessarily unique) type name from a type URL.
///
/// Strips prefixes like `type.googleapis.com/` and the protobuf package path,
/// then removes a trailing `IndexDetails` suffix so callers get a concise
/// display name (e.g. `.../lance.index.BitmapIndexDetails` -> `Bitmap`).
fn type_name_from_uri(index_uri: &str) -> String {
    // Keep only the text after the last '/' (drops the URL authority prefix);
    // `rsplit` always yields at least one item, so `unwrap_or` is a safety net.
    let type_name = index_uri.rsplit('/').next().unwrap_or(index_uri);
    // Keep only the text after the last '.' (drops the package path).
    let type_name = type_name.rsplit('.').next().unwrap_or(type_name);
    type_name.trim_end_matches("IndexDetails").to_string()
}
Comment thread
Xuanwo marked this conversation as resolved.

/// Legacy mapping from type URL to the old IndexType string for backwards compatibility.
///
/// If `index_type_hint` is provided (e.g. parsed from the index statistics of a concrete
/// index instance), it takes precedence so callers can surface the exact index type even
/// when the type URL alone is too generic (such as VectorIndexDetails).
fn legacy_type_name(index_uri: &str, index_type_hint: Option<&str>) -> String {
    // A hint derived from actual index statistics is always more precise
    // than anything we can infer from the type URL alone.
    if let Some(hint) = index_type_hint {
        return hint.to_string();
    }

    let base = type_name_from_uri(index_uri);

    match base.as_str() {
        "BTree" => IndexType::BTree.to_string(),
        "Bitmap" => IndexType::Bitmap.to_string(),
        "LabelList" => IndexType::LabelList.to_string(),
        "NGram" => IndexType::NGram.to_string(),
        "ZoneMap" => IndexType::ZoneMap.to_string(),
        "BloomFilter" => IndexType::BloomFilter.to_string(),
        "Inverted" => IndexType::Inverted.to_string(),
        // JSON indexes historically reported the generic "Scalar" type.
        "Json" => IndexType::Scalar.to_string(),
        "Flat" | "Vector" => IndexType::Vector.to_string(),
        // Catch-all for specialized vector index details whose names embed
        // "Vector" (checked after the exact matches above).
        other if other.contains("Vector") => IndexType::Vector.to_string(),
        // Unknown plugin type: keep the legacy placeholder rather than guess.
        _ => "N/A".to_string(),
    }
}

/// Builds index.
#[async_trait]
pub trait IndexBuilder {
Expand Down Expand Up @@ -904,25 +943,53 @@ impl DatasetIndexExt for Dataset {
let field_id = metadatas[0].fields[0];
let field_path = self.schema().field_path(field_id)?;

// Open all delta indices
let indices = stream::iter(metadatas.iter())
.then(|m| {
let field_path = field_path.clone();
async move {
self.open_generic_index(&field_path, &m.uuid.to_string(), &NoOpMetricsCollector)
.await
let mut indices_stats = Vec::with_capacity(metadatas.len());
let mut index_uri: Option<String> = None;
let mut index_typename: Option<String> = None;

for meta in metadatas.iter() {
let index_store = Arc::new(LanceIndexStore::from_dataset_for_existing(self, meta)?);
let index_details = scalar::fetch_index_details(self, &field_path, meta).await?;
if index_uri.is_none() {
index_uri = Some(index_details.type_url.clone());
}
let index_details_wrapper = scalar::IndexDetails(index_details.clone());

if let Ok(plugin) = index_details_wrapper.get_plugin() {
if index_typename.is_none() {
index_typename = Some(plugin.name().to_string());
}
})
.try_collect::<Vec<_>>()
.await?;

// Statistics for each delta index.
let indices_stats = indices
.iter()
.map(|idx| idx.statistics())
.collect::<Result<Vec<_>>>()?;
if let Some(stats) = plugin
.load_statistics(index_store.clone(), index_details.as_ref())
.await?
{
indices_stats.push(stats);
continue;
}
}

let index = self
.open_generic_index(&field_path, &meta.uuid.to_string(), &NoOpMetricsCollector)
.await?;

if index_typename.is_none() {
// Fall back to a friendly name from the type URL if the plugin is unknown
let uri = index_uri
.as_deref()
.unwrap_or_else(|| index_details.type_url.as_str());
index_typename = Some(type_name_from_uri(uri));
}

indices_stats.push(index.statistics()?);
}

let index_type = indices[0].index_type().to_string();
let index_uri = index_uri.unwrap_or_else(|| "unknown".to_string());
let index_type_hint = indices_stats
.first()
.and_then(|stats| stats.get("index_type"))
.and_then(|v| v.as_str());
let index_type = legacy_type_name(&index_uri, index_type_hint);

let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?;

Expand Down Expand Up @@ -1782,33 +1849,33 @@ fn is_vector_field(data_type: DataType) -> bool {

#[cfg(test)]
mod tests {
use super::*;
use crate::dataset::builder::DatasetBuilder;
use crate::dataset::optimize::{compact_files, CompactionOptions};
use crate::dataset::{WriteMode, WriteParams};
use crate::index::vector::VectorIndexParams;
use crate::session::Session;
use crate::utils::test::{copy_test_data_to_tmp, DatagenExt, FragmentCount, FragmentRowCount};
use arrow_array::Int32Array;

use lance_io::{assert_io_eq, assert_io_lt};

use super::*;

use arrow::array::AsArray;
use arrow::datatypes::{Float32Type, Int32Type};
use arrow_array::Int32Array;
use arrow_array::{
FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, StringArray,
};
use arrow_schema::{Field, Schema};
use arrow_schema::{DataType, Field, Schema};
use futures::stream::TryStreamExt;
use lance_arrow::*;
use lance_core::utils::tempfile::TempStrDir;
use lance_datagen::gen_batch;
use lance_datagen::{array, BatchCount, Dimension, RowCount};
use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams};
use lance_index::scalar::bitmap::BITMAP_LOOKUP_NAME;
use lance_index::scalar::{
BuiltinIndexType, FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams,
};
use lance_index::vector::{
hnsw::builder::HnswBuildParams, ivf::IvfBuildParams, sq::builder::SQBuildParams,
};

use lance_io::{assert_io_eq, assert_io_lt};
use lance_linalg::distance::{DistanceType, MetricType};
use lance_testing::datagen::generate_random_array;
use rstest::rstest;
Expand Down Expand Up @@ -1873,6 +1940,80 @@ mod tests {
.is_err());
}

#[tokio::test]
async fn test_bitmap_index_statistics_minimal_io_via_dataset() {
    // Verifies that `index_statistics` on a bitmap index is served from the
    // lookup file's footer metadata instead of reading the (large) index body.
    const NUM_ROWS: usize = 500_000;
    let test_dir = TempStrDir::default();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "status",
        DataType::Int32,
        false,
    )]));
    // Every row gets a distinct value, forcing one bitmap per value and
    // therefore a large index file.
    let values: Vec<i32> = (0..NUM_ROWS as i32).collect();
    let batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(values))]).unwrap();
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());

    let mut dataset = Dataset::write(reader, &test_dir, None).await.unwrap();
    let io_tracker = dataset.object_store().io_tracker().clone();

    let params = ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap);
    dataset
        .create_index(
            &["status"],
            IndexType::Bitmap,
            Some("status_idx".to_string()),
            &params,
            true,
        )
        .await
        .unwrap();

    // Locate the bitmap lookup file on disk so we can check its size.
    let indices = dataset.load_indices().await.unwrap();
    let index_meta = indices
        .iter()
        .find(|idx| idx.name == "status_idx")
        .expect("status_idx should exist");
    let lookup_path = dataset
        .indice_files_dir(index_meta)
        .unwrap()
        .child(index_meta.uuid.to_string())
        .child(BITMAP_LOOKUP_NAME);
    // Sanity check: the index must be big enough that reading it wholesale
    // would blow the I/O budgets asserted below.
    let meta = dataset.object_store.inner.head(&lookup_path).await.unwrap();
    assert!(
        meta.size >= 1_000_000,
        "bitmap index should be large enough to fail without metadata path, size={} bytes",
        meta.size
    );

    // Reset stats collected during index creation
    io_tracker.incremental_stats();

    dataset.index_statistics("status_idx").await.unwrap();

    // Only the footer read (and a head request) should have happened.
    let stats = io_tracker.incremental_stats();
    assert_io_eq!(
        stats,
        read_bytes,
        4096,
        "index_statistics should only read the index footer; got {} bytes",
        stats.read_bytes
    );
    assert_io_lt!(
        stats,
        read_iops,
        3,
        "index_statistics should only require a head plus one range read; got {} ops",
        stats.read_iops
    );
    assert_io_eq!(
        stats,
        written_bytes,
        0,
        "index_statistics should not perform writes"
    );
}

fn sample_vector_field() -> Field {
let dimensions = 16;
let column_name = "vec";
Expand Down