From fcaafe1ec6f23009571d6a03f7fc1a0aa5c98b40 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 9 Oct 2025 15:16:23 -0700 Subject: [PATCH 01/15] test: create framework for testing memory and io --- Cargo.lock | 11 ++ rust/lance-datafusion/src/datagen.rs | 23 +++- rust/lance/Cargo.toml | 1 + rust/lance/tests/README.md | 13 +++ rust/lance/tests/resource_test/index.rs | 90 ++++++++++++++ rust/lance/tests/resource_test/mod.rs | 3 + rust/lance/tests/resource_test/utils.rs | 149 ++++++++++++++++++++++++ rust/lance/tests/resource_test/write.rs | 48 ++++++++ rust/lance/tests/resource_tests.rs | 1 + 9 files changed, 338 insertions(+), 1 deletion(-) create mode 100644 rust/lance/tests/README.md create mode 100644 rust/lance/tests/resource_test/index.rs create mode 100644 rust/lance/tests/resource_test/mod.rs create mode 100644 rust/lance/tests/resource_test/utils.rs create mode 100644 rust/lance/tests/resource_test/write.rs create mode 100644 rust/lance/tests/resource_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 9cb8d37275d..af5507e813d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4350,6 +4350,7 @@ dependencies = [ "tracing", "tracing-chrome", "tracing-subscriber", + "tracking-allocator", "url", "uuid", ] @@ -8667,6 +8668,16 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tracking-allocator" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b61e0cb3385e17df7db29c565b40fd0350dfe8a076c7eea83d416e30cfd0581" +dependencies = [ + "tracing", + "tracing-subscriber", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/rust/lance-datafusion/src/datagen.rs b/rust/lance-datafusion/src/datagen.rs index 70b07b9a20b..e3370088a2a 100644 --- a/rust/lance-datafusion/src/datagen.rs +++ b/rust/lance-datafusion/src/datagen.rs @@ -3,13 +3,15 @@ use std::sync::Arc; +use arrow_array::RecordBatchReader; use datafusion::{ execution::SendableRecordBatchStream, physical_plan::{stream::RecordBatchStreamAdapter, 
ExecutionPlan}, }; use datafusion_common::DataFusionError; use futures::TryStreamExt; -use lance_datagen::{BatchCount, BatchGeneratorBuilder, RowCount}; +use lance_core::Error; +use lance_datagen::{BatchCount, BatchGeneratorBuilder, ByteCount, RoundingBehavior, RowCount}; use crate::exec::OneShotExec; @@ -20,6 +22,13 @@ pub trait DatafusionDatagenExt { num_batches: BatchCount, ) -> SendableRecordBatchStream; + fn into_df_stream_bytes( + self, + batch_size: ByteCount, + num_batches: BatchCount, + rounding_behavior: RoundingBehavior, + ) -> Result; + fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc; } @@ -34,6 +43,18 @@ impl DatafusionDatagenExt for BatchGeneratorBuilder { Box::pin(RecordBatchStreamAdapter::new(schema, stream)) } + fn into_df_stream_bytes( + self, + batch_size: ByteCount, + num_batches: BatchCount, + rounding_behavior: RoundingBehavior, + ) -> Result { + let stream = self.into_reader_bytes(batch_size, num_batches, rounding_behavior)?; + let schema = stream.schema(); + let stream = futures::stream::iter(stream).map_err(DataFusionError::from); + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } + fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc { let stream = self.into_df_stream(batch_size, num_batches); Arc::new(OneShotExec::new(stream)) diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 548f7c33312..a738758532d 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -103,6 +103,7 @@ tempfile.workspace = true test-log.workspace = true tracing-chrome = "0.7.1" rstest = { workspace = true } +tracking-allocator = { version = "0.4", features = ["tracing-compat"] } # For S3 / DynamoDB tests aws-config = { workspace = true } aws-sdk-s3 = { workspace = true } diff --git a/rust/lance/tests/README.md b/rust/lance/tests/README.md new file mode 100644 index 00000000000..bc738ca5cb4 --- /dev/null +++ b/rust/lance/tests/README.md @@ -0,0 +1,13 @@ +Tests for memory and IO 
usage. + +## Debugging memory usage + +Once you've identified a test that is using too much memory, you can use +bytehound to find the source of the memory usage. + +```shell +LD_PRELOAD=/usr/local/lib/libbytehound.so \ + RUST_ALLOC_TIMINGS=true \ + cargo test --test resource_tests resource_test::index::test_label_list_lifecycle +bytehound server memory-profiling_*.dat +``` \ No newline at end of file diff --git a/rust/lance/tests/resource_test/index.rs b/rust/lance/tests/resource_test/index.rs new file mode 100644 index 00000000000..cc8bbf878fd --- /dev/null +++ b/rust/lance/tests/resource_test/index.rs @@ -0,0 +1,90 @@ +use std::sync::Arc; + +use super::utils::AllocTracker; +use all_asserts::assert_le; +use arrow_schema::{DataType, Field}; +use lance::dataset::InsertBuilder; +use lance_datafusion::datagen::DatafusionDatagenExt; +use lance_datagen::{array, gen_batch, BatchCount, RowCount}; +use lance_index::DatasetIndexExt; +use lance_index::{scalar::ScalarIndexParams, IndexType}; + +// Key things to test +// - Getting index stats requires reading only the metadata (no data read) +// - + +// Ops with index +// - Build +// - Load +// - get stats +// - + +#[tokio::test] +async fn test_label_list_lifecycle() { + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path = tmp_dir.path().to_str().unwrap(); + // Create a stream of 100MB of data, in batches + { + // 12 bytes per list entry, average 5 entries per list -> 60 bytes per row + // 1MB / 60 = ~16k rows per batch + let batch_size = 16_000; + let num_batches = BatchCount::from(100); + let data = gen_batch() + .col( + "value", + array::rand_type(&DataType::List(Arc::new(Field::new( + "item", + DataType::UInt8, + false, + )))), + ) + .into_df_stream(RowCount::from(batch_size), num_batches); + let _ = InsertBuilder::new(tmp_path) + .execute_stream(data) + .await + .unwrap(); + } + + // Build index on column + // let io_tracking = todo!(); + let alloc_tracker = AllocTracker::new(); + { + let _guard = 
alloc_tracker.enter(); + let mut dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); + + dataset + .create_index_builder( + &["value"], + IndexType::Scalar, + &ScalarIndexParams::new("labellist".to_string()), + ) + .await + .unwrap(); + } + + let mem_stats = alloc_tracker.stats(); + assert_le!( + mem_stats.max_bytes_allocated, + 70 * 1024 * 1024, + "Memory usage too high" + ); + assert_le!( + mem_stats.total_bytes_allocated, + 400 * 1024 * 1024, + "Total memory allocation too high" + ); + assert_eq!(mem_stats.net_bytes_allocated(), 0, "memory leak"); + + // Drop everything, assert no leak + + // Call load index + + // assert minimal IO and memory usage done + + // Drop everything, assert no leak + + // Call get stats + // Assert IO and memory are small + + // Drop everything, assert no leak +} diff --git a/rust/lance/tests/resource_test/mod.rs b/rust/lance/tests/resource_test/mod.rs new file mode 100644 index 00000000000..7a3657a8d54 --- /dev/null +++ b/rust/lance/tests/resource_test/mod.rs @@ -0,0 +1,3 @@ +mod index; +mod utils; +mod write; diff --git a/rust/lance/tests/resource_test/utils.rs b/rust/lance/tests/resource_test/utils.rs new file mode 100644 index 00000000000..7b6b673cf74 --- /dev/null +++ b/rust/lance/tests/resource_test/utils.rs @@ -0,0 +1,149 @@ +use std::alloc::System; +use std::collections::HashMap; +use std::sync::{Arc, LazyLock, Mutex, Once}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::Registry; +use tracking_allocator::{ + AllocationGroupId, AllocationGroupToken, AllocationLayer, AllocationRegistry, + AllocationTracker, Allocator, +}; + +#[global_allocator] +static GLOBAL: Allocator = Allocator::system(); + +#[derive(Default, Clone, Debug)] +pub struct AllocStats { + pub max_bytes_allocated: isize, + pub total_bytes_allocated: isize, + pub total_bytes_deallocated: isize, + pub total_allocations: usize, + pub total_deallocations: usize, +} + +impl AllocStats { + pub fn net_bytes_allocated(&self) -> 
isize { + self.total_bytes_allocated - self.total_bytes_deallocated + } +} + +static GLOBAL_STATS: LazyLock>>> = + std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new()))); + +struct MemoryTracker; + +impl AllocationTracker for MemoryTracker { + fn allocated( + &self, + _addr: usize, + _object_size: usize, + wrapped_size: usize, + group_id: AllocationGroupId, + ) { + let mut guard = GLOBAL_STATS.lock().unwrap(); + // if + // dbg!(&group_id); + let stats = guard.entry(group_id).or_default(); + stats.total_bytes_allocated += wrapped_size as isize; + stats.total_allocations += 1; + stats.max_bytes_allocated = stats.max_bytes_allocated.max(stats.net_bytes_allocated()); + } + + fn deallocated( + &self, + _addr: usize, + _object_size: usize, + wrapped_size: usize, + source_group_id: AllocationGroupId, + _current_group_id: AllocationGroupId, + ) { + let mut guard = GLOBAL_STATS.lock().unwrap(); + let stats = guard.entry(source_group_id).or_default(); + stats.total_bytes_deallocated += wrapped_size as isize; + stats.total_deallocations += 1; + } +} + +static INIT: Once = Once::new(); + +// The alloc tracker holds a span and an associated allocation group id. 
+pub struct AllocTracker { + group_id: AllocationGroupId, + span: tracing::Span, +} + +impl AllocTracker { + pub fn new() -> Self { + INIT.call_once(init_memory_tracking); + + let span = tracing::span!(tracing::Level::INFO, "AllocTracker"); + + let token = AllocationGroupToken::register().expect("failed to register token"); + let group_id = token.id(); + token.attach_to_span(&span); + + Self { group_id, span } + } + + pub fn enter(&self) -> AllocGuard<'_> { + AllocGuard::new(self) + } + + pub fn stats(self) -> AllocStats { + let mut stats = GLOBAL_STATS.lock().unwrap(); + stats.remove(&self.group_id).unwrap_or_default() + } +} + +pub struct AllocGuard<'a> { + _guard: tracing::span::Entered<'a>, +} + +impl<'a> AllocGuard<'a> { + #[allow(clippy::print_stderr)] + pub fn new(tracker: &'a AllocTracker) -> Self { + if std::env::var("RUST_ALLOC_TIMINGS").is_ok() { + eprintln!("alloc:enter:{}", chrono::Utc::now().to_rfc3339()); + } + AllocGuard { + _guard: tracker.span.enter(), + } + } +} + +impl Drop for AllocGuard<'_> { + #[allow(clippy::print_stderr)] + fn drop(&mut self) { + if std::env::var("RUST_ALLOC_TIMINGS").is_ok() { + eprintln!("alloc:exit:{}", chrono::Utc::now().to_rfc3339()); + } + } +} + +pub fn init_memory_tracking() { + let registry = Registry::default().with(AllocationLayer::new()); + tracing::subscriber::set_global_default(registry) + .expect("failed to install tracing subscriber"); + + let tracker = MemoryTracker; + AllocationRegistry::set_global_tracker(tracker).expect("failed to set global tracker"); + AllocationRegistry::enable_tracking(); +} + +#[test] +fn check_memory_leak() { + // Make sure AllocTracker can detect leaks + let mut leaked = Vec::new(); + let tracker = AllocTracker::new(); + { + let _guard = tracker.enter(); + let v = vec![0u8; 1024 * 1024]; + leaked.resize(1024, 0u8); + drop(v); + } + let stats = tracker.stats(); + assert_eq!(stats.max_bytes_allocated, (1024 * 1024) + 1024 + 16); + assert_eq!(stats.total_bytes_allocated, (1024 * 
1024) + 1024 + 16); + assert_eq!(stats.total_bytes_deallocated, (1024 * 1024) + 8); + assert_eq!(stats.total_allocations, 2); + assert_eq!(stats.net_bytes_allocated(), 1024 + 8); +} diff --git a/rust/lance/tests/resource_test/write.rs b/rust/lance/tests/resource_test/write.rs new file mode 100644 index 00000000000..ace24c4279e --- /dev/null +++ b/rust/lance/tests/resource_test/write.rs @@ -0,0 +1,48 @@ +use super::utils::AllocTracker; +use all_asserts::assert_le; +use arrow_schema::DataType; +use lance::dataset::InsertBuilder; +use lance_datafusion::datagen::DatafusionDatagenExt; +use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RoundingBehavior}; + +// TODO: also add IO + +#[tokio::test] +async fn test_insert_memory() { + // Create a stream of 100MB of data, in batches + let batch_size = 1024 * 1024; // 1MB + let num_batches = BatchCount::from(100); + let data = gen_batch() + .col("a", array::rand_type(&DataType::Int32)) + .into_df_stream_bytes( + ByteCount::from(batch_size), + num_batches, + RoundingBehavior::RoundDown, + ) + .unwrap(); + + let alloc_tracker = AllocTracker::new(); + { + let _guard = alloc_tracker.enter(); + + // write out to temporary directory + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path = tmp_dir.path().to_str().unwrap(); + let _dataset = InsertBuilder::new(tmp_path) + .execute_stream(data) + .await + .unwrap(); + } + + let stats = alloc_tracker.stats(); + // Allow for 15x the batch size to account for: + // - Allocator metadata overhead (wrapped_size vs object_size) + // - Internal buffering and temporary allocations + // - Arrow array overhead + // The key test is that we don't load all 100MB into memory at once + assert_le!( + stats.max_bytes_allocated, + (batch_size * 15) as isize, + "Max memory usage exceeded" + ); +} diff --git a/rust/lance/tests/resource_tests.rs b/rust/lance/tests/resource_tests.rs new file mode 100644 index 00000000000..1b4267ce9ec --- /dev/null +++ b/rust/lance/tests/resource_tests.rs @@ 
-0,0 +1 @@ +mod resource_test; From 07cccbcf271effec82b1f983864eadbddaf20412 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 9 Oct 2025 15:44:53 -0700 Subject: [PATCH 02/15] fix --- rust/lance/tests/README.md | 8 +++++--- rust/lance/tests/resource_test/index.rs | 3 ++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/rust/lance/tests/README.md b/rust/lance/tests/README.md index bc738ca5cb4..74ee73ee97c 100644 --- a/rust/lance/tests/README.md +++ b/rust/lance/tests/README.md @@ -3,11 +3,13 @@ Tests for memory and IO usage. ## Debugging memory usage Once you've identified a test that is using too much memory, you can use -bytehound to find the source of the memory usage. +bytehound to find the source of the memory usage. (Note: we need to run +bytehound on the binary, not on cargo, so we have to extract the test binary path.) ```shell +TEST_BINARY=$(cargo test --test resource_tests --no-run 2>&1 | tail -n1 | sed -n 's/.*(\([^)]*\)).*/\1/p') LD_PRELOAD=/usr/local/lib/libbytehound.so \ RUST_ALLOC_TIMINGS=true \ - cargo test --test resource_tests resource_test::index::test_label_list_lifecycle + $TEST_BINARY resource_test::index::test_label_list_lifecycle bytehound server memory-profiling_*.dat -``` \ No newline at end of file +``` diff --git a/rust/lance/tests/resource_test/index.rs b/rust/lance/tests/resource_test/index.rs index cc8bbf878fd..48850c33026 100644 --- a/rust/lance/tests/resource_test/index.rs +++ b/rust/lance/tests/resource_test/index.rs @@ -73,7 +73,8 @@ async fn test_label_list_lifecycle() { 400 * 1024 * 1024, "Total memory allocation too high" ); - assert_eq!(mem_stats.net_bytes_allocated(), 0, "memory leak"); + // We do leak some memory for one-time initialization of DataFusion session. 
+ assert_le!(mem_stats.net_bytes_allocated(), 300_000, "memory leak"); // Drop everything, assert no leak From 89c266f5ab442c778d37a82b01a09a6f912ce812 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 10 Oct 2025 16:03:07 -0700 Subject: [PATCH 03/15] figure out how to test index cache size bugs --- rust/lance-core/src/cache.rs | 11 +- rust/lance/src/dataset.rs | 2 +- rust/lance/tests/resource_test/index_cache.rs | 143 ++++++++++++++++++ rust/lance/tests/resource_test/mod.rs | 1 + 4 files changed, 154 insertions(+), 3 deletions(-) create mode 100644 rust/lance/tests/resource_test/index_cache.rs diff --git a/rust/lance-core/src/cache.rs b/rust/lance-core/src/cache.rs index d771fe84f68..4910309c9cc 100644 --- a/rust/lance-core/src/cache.rs +++ b/rust/lance-core/src/cache.rs @@ -22,8 +22,8 @@ type ArcAny = Arc; #[derive(Clone)] pub struct SizedRecord { - record: ArcAny, - size_accessor: Arc usize + Send + Sync>, + pub record: ArcAny, + pub size_accessor: Arc usize + Send + Sync>, } impl std::fmt::Debug for SizedRecord { @@ -277,6 +277,13 @@ impl LanceCache { self.misses.store(0, Ordering::Relaxed); } + // For testing: get all entries in the cache + #[doc(hidden)] + pub async fn entries(&self) -> Vec<((String, TypeId), SizedRecord)> { + self.cache.run_pending_tasks().await; + self.cache.iter().map(|(k, v)| ((*k).clone(), v)).collect() + } + // CacheKey-based methods pub async fn insert_with_key(&self, cache_key: &K, metadata: Arc) where diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 3adbd30bda6..c1478901e97 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -157,7 +157,7 @@ pub struct Dataset { pub(crate) fragment_bitmap: Arc, // These are references to session caches, but with the dataset URI as a prefix. - pub(crate) index_cache: Arc, + pub index_cache: Arc, pub(crate) metadata_cache: Arc, /// File reader options to use when reading data files. 
diff --git a/rust/lance/tests/resource_test/index_cache.rs b/rust/lance/tests/resource_test/index_cache.rs new file mode 100644 index 00000000000..937170af3b9 --- /dev/null +++ b/rust/lance/tests/resource_test/index_cache.rs @@ -0,0 +1,143 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Tests to validate IndexCache memory accounting by creating real indices, +//! prewarming them to populate the cache, then evicting entries one-by-one +//! and verifying that memory reduction matches DeepSizeOf estimates. +//! +//! This approach tests DeepSizeOf in realistic conditions where items are +//! actually cached, catching issues like: +//! - Arc sharing and double-counting +//! - Trait object DeepSizeOf under-counting +//! - Cache overhead miscalculations + +use super::utils::AllocTracker; +use arrow_schema::{DataType, Field}; +use lance::dataset::InsertBuilder; +use lance_core::cache::LanceCache; +use lance_datafusion::datagen::DatafusionDatagenExt; +use lance_datagen::{array, gen_batch, BatchCount, RowCount}; +use lance_index::DatasetIndexExt; +use lance_index::{scalar::ScalarIndexParams, IndexType}; +use rand::seq::SliceRandom; +use std::sync::Arc; + +/// Test framework that validates DeepSizeOf by creating an index, prewarming cache, +/// then evicting entries one-by-one and verifying memory reduction +/// +/// # Arguments +/// * `cache` - The cache instance to test +/// * `prewarm_fn` - Function to call that populates the cache +/// * `test_name` - Name of the test for error messages +/// * `tolerance_per_entry` - Acceptable deviation in bytes per cache entry +async fn test_cache_accounting( + cache: LanceCache, + prewarm_fn: F, + test_name: &str, + tolerance_per_entry: usize, +) where + F: FnOnce() -> Fut, + Fut: std::future::Future, +{ + prewarm_fn().await; + if cache.size().await == 0 { + panic!("{}: Cache is empty after prewarm!", test_name); + } + + let mut entries = cache.entries().await; + 
entries.shuffle(&mut rand::rng()); + drop(cache); + + for ((key, _), entry) in entries { + assert_eq!( + Arc::strong_count(&entry.record), + 1, + "{}: Entry for key {:?} has unexpected strong count {}", + test_name, + key, + Arc::strong_count(&entry.record) + ); + let expected_freed = deepsize::DeepSizeOf::deep_size_of(&entry); + + let tracker = AllocTracker::new(); + { + let _guard = tracker.enter(); + // Evict the entry - this should free memory + drop(entry); + } + let stats = tracker.stats(); + + // Actual memory freed = deallocations - allocations during eviction + let actual_freed = stats + .total_bytes_deallocated + .saturating_sub(stats.total_bytes_allocated); + + assert!( + (expected_freed as isize - actual_freed).abs() <= tolerance_per_entry as isize, + "{}: Entry (key: {:?}): Expected to free {} bytes, but actually freed {} bytes (tolerance: {}). Stats: alloc={}, dealloc={}", + test_name, + key, + expected_freed, + actual_freed, + tolerance_per_entry, + stats.total_bytes_allocated, + stats.total_bytes_deallocated, + ); + } +} + +#[tokio::test] +async fn test_label_list_index_cache_accounting() { + // Create a dataset with a label list (inverted) index + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path = tmp_dir.path().to_str().unwrap(); + + // Create test data - list of uint8 values + // Using larger dataset to get bigger cache entries: ~50MB + let batch_size = 16_000; + let num_batches = BatchCount::from(50); + let data = gen_batch() + .col( + "labels", + array::rand_type(&DataType::List(Arc::new(Field::new( + "item", + DataType::UInt8, + false, + )))), + ) + .into_df_stream(RowCount::from(batch_size), num_batches); + + InsertBuilder::new(tmp_path) + .execute_stream(data) + .await + .unwrap(); + + // Build label list index + let mut dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); + dataset + .create_index_builder( + &["labels"], + IndexType::Scalar, + &ScalarIndexParams::new("labellist".to_string()), + ) + .await + .unwrap(); 
+ + // Reload dataset to get fresh index with cache + let dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); + + // Access the index cache (now public) + let cache = (*dataset.index_cache).clone(); + + // Test cache accounting by prewarming the index + test_cache_accounting( + cache, + || async { + dataset.prewarm_index("labels_idx").await.unwrap(); + drop(dataset); + }, + "LabelListIndex", + 10_000, // 10KB tolerance per entry - accounts for cache overhead + ) + .await; +} diff --git a/rust/lance/tests/resource_test/mod.rs b/rust/lance/tests/resource_test/mod.rs index 7a3657a8d54..52f5fd388ed 100644 --- a/rust/lance/tests/resource_test/mod.rs +++ b/rust/lance/tests/resource_test/mod.rs @@ -1,3 +1,4 @@ mod index; +mod index_cache; mod utils; mod write; From 90449b3afb97d04d8da16c4400dc6b0f6247f4a7 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 14 Oct 2025 14:47:16 -0700 Subject: [PATCH 04/15] fix test --- rust/lance-core/src/cache.rs | 5 +++- rust/lance/tests/resource_test/index_cache.rs | 10 +++++-- rust/lance/tests/resource_test/utils.rs | 27 ++++++++++++++----- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/rust/lance-core/src/cache.rs b/rust/lance-core/src/cache.rs index 4910309c9cc..37d8dce9287 100644 --- a/rust/lance-core/src/cache.rs +++ b/rust/lance-core/src/cache.rs @@ -24,12 +24,13 @@ type ArcAny = Arc; pub struct SizedRecord { pub record: ArcAny, pub size_accessor: Arc usize + Send + Sync>, + pub type_name: &'static str, } impl std::fmt::Debug for SizedRecord { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SizedRecord") - .field("record", &self.record) + .field("type", &self.type_name) .finish() } } @@ -42,12 +43,14 @@ impl DeepSizeOf for SizedRecord { impl SizedRecord { fn new(record: Arc) -> Self { + // Calculate size once and store it // +8 for the size of the Arc pointer itself let size_accessor = |record: &ArcAny| -> usize { record.downcast_ref::().unwrap().deep_size_of() + 
8 }; Self { record, size_accessor: Arc::new(size_accessor), + type_name: std::any::type_name::(), } } } diff --git a/rust/lance/tests/resource_test/index_cache.rs b/rust/lance/tests/resource_test/index_cache.rs index 937170af3b9..40763e3beca 100644 --- a/rust/lance/tests/resource_test/index_cache.rs +++ b/rust/lance/tests/resource_test/index_cache.rs @@ -58,6 +58,7 @@ async fn test_cache_accounting( Arc::strong_count(&entry.record) ); let expected_freed = deepsize::DeepSizeOf::deep_size_of(&entry); + let type_name = entry.type_name; let tracker = AllocTracker::new(); { @@ -72,13 +73,16 @@ async fn test_cache_accounting( .total_bytes_deallocated .saturating_sub(stats.total_bytes_allocated); + let deviation = (expected_freed as isize - actual_freed).abs(); assert!( - (expected_freed as isize - actual_freed).abs() <= tolerance_per_entry as isize, - "{}: Entry (key: {:?}): Expected to free {} bytes, but actually freed {} bytes (tolerance: {}). Stats: alloc={}, dealloc={}", + deviation <= tolerance_per_entry as isize, + "{}: Entry (key: {:?}, type: {}): Expected to free {} bytes, but actually freed {} bytes (deviation: {}, tolerance: {}). 
Stats: alloc={}, dealloc={}", test_name, key, + type_name, expected_freed, actual_freed, + deviation, tolerance_per_entry, stats.total_bytes_allocated, stats.total_bytes_deallocated, @@ -88,6 +92,8 @@ async fn test_cache_accounting( #[tokio::test] async fn test_label_list_index_cache_accounting() { + AllocTracker::init(); + // Create a dataset with a label list (inverted) index let tmp_dir = tempfile::tempdir().unwrap(); let tmp_path = tmp_dir.path().to_str().unwrap(); diff --git a/rust/lance/tests/resource_test/utils.rs b/rust/lance/tests/resource_test/utils.rs index 7b6b673cf74..c7c98a13217 100644 --- a/rust/lance/tests/resource_test/utils.rs +++ b/rust/lance/tests/resource_test/utils.rs @@ -39,9 +39,11 @@ impl AllocationTracker for MemoryTracker { wrapped_size: usize, group_id: AllocationGroupId, ) { + if group_id == AllocationGroupId::ROOT { + // We don't track root allocations + return; + } let mut guard = GLOBAL_STATS.lock().unwrap(); - // if - // dbg!(&group_id); let stats = guard.entry(group_id).or_default(); stats.total_bytes_allocated += wrapped_size as isize; stats.total_allocations += 1; @@ -54,10 +56,19 @@ impl AllocationTracker for MemoryTracker { _object_size: usize, wrapped_size: usize, source_group_id: AllocationGroupId, - _current_group_id: AllocationGroupId, + current_group_id: AllocationGroupId, ) { + let group_id = if source_group_id != AllocationGroupId::ROOT { + source_group_id + } else { + current_group_id + }; + if group_id == AllocationGroupId::ROOT { + // We don't track root allocations + return; + } let mut guard = GLOBAL_STATS.lock().unwrap(); - let stats = guard.entry(source_group_id).or_default(); + let stats = guard.entry(group_id).or_default(); stats.total_bytes_deallocated += wrapped_size as isize; stats.total_deallocations += 1; } @@ -72,13 +83,17 @@ pub struct AllocTracker { } impl AllocTracker { - pub fn new() -> Self { + pub fn init() { INIT.call_once(init_memory_tracking); + } - let span = tracing::span!(tracing::Level::INFO, 
"AllocTracker"); + pub fn new() -> Self { + Self::init(); let token = AllocationGroupToken::register().expect("failed to register token"); let group_id = token.id(); + + let span = tracing::span!(tracing::Level::INFO, "AllocTracker"); token.attach_to_span(&span); Self { group_id, span } From 26af669f96dbbecbd99c8b0d43995f394334ce70 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 14 Oct 2025 15:43:04 -0700 Subject: [PATCH 05/15] fix up test --- rust/lance-index/src/scalar/bitmap.rs | 18 ++++++---- rust/lance-index/src/scalar/label_list.rs | 1 + rust/lance/tests/resource_test/index_cache.rs | 33 +++++++++++++++---- 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 9f3779668f1..cd1081aab4b 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -104,8 +104,7 @@ pub struct BitmapIndex { value_type: DataType, - store: Arc, - + // store: Arc, index_cache: WeakLanceCache, frag_reuse_index: Option>, @@ -113,6 +112,13 @@ pub struct BitmapIndex { lazy_reader: LazyIndexReader, } +impl Drop for BitmapIndex { + fn drop(&mut self) { + println!("Dropping BitmapIndex"); + dbg!(DeepSizeOf::deep_size_of(self)); + } +} + #[derive(Debug, Clone)] pub struct BitmapKey { value: OrderableScalarValue, @@ -140,7 +146,7 @@ impl BitmapIndex { index_map, null_map, value_type, - store, + // store, index_cache, frag_reuse_index, lazy_reader, @@ -296,9 +302,9 @@ impl DeepSizeOf for BitmapIndex { fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { let mut total_size = 0; - total_size += self.index_map.deep_size_of_children(context); - total_size += self.store.deep_size_of_children(context); - + total_size += dbg!(self.index_map.deep_size_of_children(context)); + total_size += dbg!(self.null_map.deep_size_of_children(context)); + // total_size += self.store.deep_size_of_children(context); total_size } } diff --git 
a/rust/lance-index/src/scalar/label_list.rs b/rust/lance-index/src/scalar/label_list.rs index b22a12f8e4a..e356e463e81 100644 --- a/rust/lance-index/src/scalar/label_list.rs +++ b/rust/lance-index/src/scalar/label_list.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::sync::Weak; use std::{any::Any, collections::HashMap, fmt::Debug, pin::Pin, sync::Arc}; use arrow::array::AsArray; diff --git a/rust/lance/tests/resource_test/index_cache.rs b/rust/lance/tests/resource_test/index_cache.rs index 40763e3beca..56cd556fe5e 100644 --- a/rust/lance/tests/resource_test/index_cache.rs +++ b/rust/lance/tests/resource_test/index_cache.rs @@ -12,6 +12,7 @@ //! - Cache overhead miscalculations use super::utils::AllocTracker; +use arrow::datatypes::UInt8Type; use arrow_schema::{DataType, Field}; use lance::dataset::InsertBuilder; use lance_core::cache::LanceCache; @@ -74,6 +75,7 @@ async fn test_cache_accounting( .saturating_sub(stats.total_bytes_allocated); let deviation = (expected_freed as isize - actual_freed).abs(); + dbg!((expected_freed, actual_freed)); assert!( deviation <= tolerance_per_entry as isize, "{}: Entry (key: {:?}, type: {}): Expected to free {} bytes, but actually freed {} bytes (deviation: {}, tolerance: {}). 
Stats: alloc={}, dealloc={}", @@ -90,6 +92,26 @@ async fn test_cache_accounting( } } +// fn test_deep_size_of(value: impl deepsize::DeepSizeOf) { +// let tracker = AllocTracker::new(); +// let reported_size = deepsize::DeepSizeOf::deep_size_of(&value); +// { +// let _guard = tracker.enter(); +// drop(value); +// } +// let stats = tracker.stats(); +// let actual_freed = stats.total_bytes_deallocated.saturating_sub(stats.total_bytes_allocated) as usize; +// assert_eq!(reported_size, actual_freed); +// } + +// #[test] +// fn test_deep_size_of_label_list_index() { +// AllocTracker::init(); +// LabelListIndex::nemw +// let value = todo!(); +// test_deep_size_of(value); +// } + #[tokio::test] async fn test_label_list_index_cache_accounting() { AllocTracker::init(); @@ -100,16 +122,12 @@ async fn test_label_list_index_cache_accounting() { // Create test data - list of uint8 values // Using larger dataset to get bigger cache entries: ~50MB - let batch_size = 16_000; + let batch_size = 1_000_000; let num_batches = BatchCount::from(50); let data = gen_batch() .col( "labels", - array::rand_type(&DataType::List(Arc::new(Field::new( - "item", - DataType::UInt8, - false, - )))), + array::rand_list_any(array::cycle::(vec![1u8, 2]), false), ) .into_df_stream(RowCount::from(batch_size), num_batches); @@ -143,7 +161,8 @@ async fn test_label_list_index_cache_accounting() { drop(dataset); }, "LabelListIndex", - 10_000, // 10KB tolerance per entry - accounts for cache overhead + // TODO: if we impl DeepSizeOf for FileReader, then we should be able to reduce this tolerance + 60_000, // 60KB tolerance per entry - accounts for cache overhead ) .await; } From 8b0c4eed6d7020b67e4c30be65da89804ca891d2 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 15 Oct 2025 16:07:10 -0700 Subject: [PATCH 06/15] wip --- Cargo.lock | 1 + python/Cargo.lock | 1 + rust/lance-core/src/cache.rs | 30 ++++++++- rust/lance-encoding/Cargo.toml | 1 + rust/lance-encoding/build.rs | 63 +++++++++++++++++++ 
rust/lance-encoding/src/decoder.rs | 33 ++++++++-- rust/lance-file/src/v2/reader.rs | 35 ++++++++--- rust/lance-index/src/scalar.rs | 2 +- rust/lance-index/src/scalar/bitmap.rs | 34 ++++++---- rust/lance-index/src/scalar/label_list.rs | 1 - rust/lance-index/src/scalar/lance_format.rs | 10 +-- rust/lance/tests/resource_test/index_cache.rs | 8 +-- 12 files changed, 179 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index af5507e813d..1d1841291ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4485,6 +4485,7 @@ dependencies = [ "byteorder", "bytes", "criterion", + "deepsize", "fsst", "futures", "hex", diff --git a/python/Cargo.lock b/python/Cargo.lock index c88deac1132..ed48354b1d3 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -4028,6 +4028,7 @@ dependencies = [ "bytemuck", "byteorder", "bytes", + "deepsize", "fsst", "futures", "hex", diff --git a/rust/lance-core/src/cache.rs b/rust/lance-core/src/cache.rs index 37d8dce9287..c4491581702 100644 --- a/rust/lance-core/src/cache.rs +++ b/rust/lance-core/src/cache.rs @@ -350,9 +350,8 @@ pub struct WeakLanceCache { misses: Arc, } -impl WeakLanceCache { - /// Create a weak reference from a strong LanceCache - pub fn from(cache: &LanceCache) -> Self { +impl From<&LanceCache> for WeakLanceCache { + fn from(cache: &LanceCache) -> Self { Self { inner: Arc::downgrade(&cache.cache), prefix: cache.prefix.clone(), @@ -360,6 +359,31 @@ impl WeakLanceCache { misses: cache.misses.clone(), } } +} + +impl From for WeakLanceCache { + fn from(cache: LanceCache) -> Self { + Self { + inner: Arc::downgrade(&cache.cache), + prefix: cache.prefix, + hits: cache.hits, + misses: cache.misses, + } + } +} + +impl WeakLanceCache { + pub fn upgrade(self) -> LanceCache { + let Some(cache) = self.inner.upgrade() else { + return LanceCache::no_cache(); + }; + LanceCache { + cache, + prefix: self.prefix, + hits: self.hits, + misses: self.misses, + } + } /// Appends a prefix to the cache key pub fn with_key_prefix(&self, 
prefix: &str) -> Self { diff --git a/rust/lance-encoding/Cargo.toml b/rust/lance-encoding/Cargo.toml index 27278667a6f..adfe25e85dc 100644 --- a/rust/lance-encoding/Cargo.toml +++ b/rust/lance-encoding/Cargo.toml @@ -23,6 +23,7 @@ arrow-schema.workspace = true arrow-select.workspace = true lance-bitpacking = { workspace = true, optional = true } bytes.workspace = true +deepsize.workspace = true futures.workspace = true fsst.workspace = true hex = "0.4.3" diff --git a/rust/lance-encoding/build.rs b/rust/lance-encoding/build.rs index 92bf497beeb..1c45a7da2dd 100644 --- a/rust/lance-encoding/build.rs +++ b/rust/lance-encoding/build.rs @@ -14,6 +14,69 @@ fn main() -> Result<()> { prost_build.protoc_arg("--experimental_allow_proto3_optional"); prost_build.enable_type_names(); prost_build.bytes(["."]); // Enable Bytes type for all messages to avoid Vec clones. + + // Implement DeepSizeOf so we can keep metadata in cache + for path in &[ + "lance.encodings.ColumnEncoding", + "lance.encodings.Blob", + "lance.encodings.ZoneIndex", + "lance.encodings.ArrayEncoding", + "lance.encodings.Flat", + "lance.encodings.Nullable", + "lance.encodings.FixedSizeList", + "lance.encodings.List", + "lance.encodings.Struct", + "lance.encodings.Binary", + "lance.encodings.Dictionary", + "lance.encodings.PackedStruct", + "lance.encodings.SimpleStruct", + "lance.encodings.Bitpacked", + "lance.encodings.FixedSizeBinary", + "lance.encodings.BitpackedForNonNeg", + "lance.encodings.InlineBitpacking", + "lance.encodings.OutOfLineBitpacking", + "lance.encodings.Variable", + "lance.encodings.PackedStructFixedWidthMiniBlock", + "lance.encodings.Block", + "lance.encodings.Rle", + "lance.encodings.GeneralMiniBlock", + "lance.encodings.ByteStreamSplit", + "lance.encodings.Buffer", + "lance.encodings.Compression", + "lance.encodings.Nullable.NoNull", + "lance.encodings.Nullable.AllNull", + "lance.encodings.Nullable.SomeNull", + "lance.encodings21.MiniBlockLayout", + "lance.encodings21.CompressiveEncoding", 
+ "lance.encodings21.FullZipLayout", + "lance.encodings21.AllNullLayout", + "lance.encodings21.BlobLayout", + "lance.encodings21.PageLayout", + "lance.encodings21.BufferCompression", + "lance.encodings21.Flat", + "lance.encodings21.Variable", + "lance.encodings21.OutOfLineBitpacking", + "lance.encodings21.InlineBitpacking", + "lance.encodings21.Dictionary", + "lance.encodings21.Rle", + "lance.encodings21.FixedSizeList", + "lance.encodings21.PackedStruct", + "lance.encodings21.General", + "lance.encodings21.ByteStreamSplit", + ] { + prost_build.type_attribute(path, "#[derive(deepsize::DeepSizeOf)]"); + } + for path in &[ + "lance.encodings.ArrayEncoding.array_encoding", + "lance.encodings.ColumnEncoding.column_encoding", + "lance.encodings.Nullable.nullability", + "lance.encodings21.FullZipLayout.details", + "lance.encodings21.PageLayout.layout", + "lance.encodings21.CompressiveEncoding.compression", + ] { + prost_build.enum_attribute(path, "#[derive(deepsize::DeepSizeOf)]"); + } + prost_build.compile_protos(&["./protos/encodings_v2_0.proto"], &["./protos"])?; prost_build.compile_protos(&["./protos/encodings_v2_1.proto"], &["./protos"])?; diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index af87e068f9f..8b6a087fdcc 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -224,7 +224,7 @@ use futures::future::{maybe_done, BoxFuture, MaybeDone}; use futures::stream::{self, BoxStream}; use futures::{FutureExt, StreamExt}; use lance_arrow::DataTypeExt; -use lance_core::cache::LanceCache; +use lance_core::cache::{DeepSizeOf, LanceCache}; use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD}; use log::{debug, trace, warn}; use snafu::location; @@ -262,12 +262,37 @@ const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024; /// A file should only use one or the other and never both. 
/// 2.0 decoders can always assume this is pb::ArrayEncoding /// and 2.1+ decoders can always assume this is pb::PageLayout -#[derive(Debug)] +#[derive(Debug, DeepSizeOf)] pub enum PageEncoding { Legacy(pb::ArrayEncoding), Structural(pb21::PageLayout), } +// Implement these manually because there isn't yet an implementation for bytes::Bytes +impl DeepSizeOf for pb::Fsst { + fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize { + self.symbol_table.len() + } +} + +impl DeepSizeOf for pb::Constant { + fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize { + self.value.len() + } +} + +impl DeepSizeOf for pb21::Fsst { + fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize { + self.symbol_table.len() + } +} + +impl DeepSizeOf for pb21::Constant { + fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize { + self.value.as_ref().map(|v| v.len()).unwrap_or(0) + } +} + impl PageEncoding { pub fn as_legacy(&self) -> &pb::ArrayEncoding { match self { @@ -291,7 +316,7 @@ impl PageEncoding { /// Metadata describing a page in a file /// /// This is typically created by reading the metadata section of a Lance file -#[derive(Debug)] +#[derive(Debug, DeepSizeOf)] pub struct PageInfo { /// The number of rows in the page pub num_rows: u64, @@ -308,7 +333,7 @@ pub struct PageInfo { /// Metadata describing a column in a file /// /// This is typically created by reading the metadata section of a Lance file -#[derive(Debug, Clone)] +#[derive(Debug, Clone, DeepSizeOf)] pub struct ColumnInfo { /// The index of the column in the file pub index: u32, diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs index ec5f650bc0d..9c598ee5237 100644 --- a/rust/lance-file/src/v2/reader.rs +++ b/rust/lance-file/src/v2/reader.rs @@ -31,7 +31,7 @@ use prost::{Message, Name}; use snafu::location; use lance_core::{ - cache::LanceCache, + cache::{LanceCache, WeakLanceCache}, datatypes::{Field, Schema}, Error, Result, }; @@ 
-154,7 +154,7 @@ impl CachedFileMetadata { /// /// If users are not using the table format then they will need to figure /// out some way to do this themselves. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, DeepSizeOf)] pub struct ReaderProjection { /// The data types (schema) of the selected columns. The names /// of the schema are arbitrary and ignored. @@ -339,6 +339,12 @@ impl Default for FileReaderOptions { } } +impl DeepSizeOf for FileReaderOptions { + fn deep_size_of_children(&self, _context: &mut Context) -> usize { + 0 + } +} + #[derive(Debug)] pub struct FileReader { scheduler: Arc, @@ -347,9 +353,18 @@ pub struct FileReader { num_rows: u64, metadata: Arc, decoder_plugins: Arc, - cache: Arc, + cache: WeakLanceCache, options: FileReaderOptions, } + +impl DeepSizeOf for FileReader { + fn deep_size_of_children(&self, context: &mut Context) -> usize { + // self.scheduler.deep_size_of_children(context) + self.base_projection.deep_size_of_children(context) + + self.options.deep_size_of_children(context) + } +} + #[derive(Debug)] struct Footer { #[allow(dead_code)] @@ -812,7 +827,7 @@ impl FileReader { cache: &LanceCache, options: FileReaderOptions, ) -> Result { - let cache = Arc::new(cache.with_key_prefix(path.as_ref())); + let cache = cache.with_key_prefix(path.as_ref()).into(); if let Some(base_projection) = base_projection.as_ref() { Self::validate_projection(base_projection, &file_metadata)?; @@ -905,7 +920,7 @@ impl FileReader { Self::do_read_range( self.collect_columns_from_projection(&projection)?, self.scheduler.clone(), - self.cache.clone(), + Arc::new(self.cache.clone().upgrade()), self.num_rows, self.decoder_plugins.clone(), range, @@ -967,7 +982,7 @@ impl FileReader { Self::do_take_rows( self.collect_columns_from_projection(&projection)?, self.scheduler.clone(), - self.cache.clone(), + Arc::new(self.cache.clone().upgrade()), self.decoder_plugins.clone(), indices, batch_size, @@ -1030,7 +1045,7 @@ impl FileReader { Self::do_read_ranges( 
self.collect_columns_from_projection(&projection)?, self.scheduler.clone(), - self.cache.clone(), + Arc::new(self.cache.clone().upgrade()), self.decoder_plugins.clone(), ranges, batch_size, @@ -1186,7 +1201,7 @@ impl FileReader { let config = SchedulerDecoderConfig { batch_size, - cache: self.cache.clone(), + cache: Arc::new(self.cache.clone().upgrade()), decoder_plugins: self.decoder_plugins.clone(), io: self.scheduler.clone(), decoder_config: self.options.decoder_config.clone(), @@ -1225,7 +1240,7 @@ impl FileReader { let config = SchedulerDecoderConfig { batch_size, - cache: self.cache.clone(), + cache: Arc::new(self.cache.clone().upgrade()), decoder_plugins: self.decoder_plugins.clone(), io: self.scheduler.clone(), decoder_config: self.options.decoder_config.clone(), @@ -1264,7 +1279,7 @@ impl FileReader { let config = SchedulerDecoderConfig { batch_size, - cache: self.cache.clone(), + cache: Arc::new(self.cache.clone().upgrade()), decoder_plugins: self.decoder_plugins.clone(), io: self.scheduler.clone(), decoder_config: self.options.decoder_config.clone(), diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 69b5ee35cf0..1ea4b3e3d41 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -178,7 +178,7 @@ pub trait IndexWriter: Send { /// Trait for reading an index (or parts of an index) from storage #[async_trait] -pub trait IndexReader: Send + Sync { +pub trait IndexReader: Send + Sync + DeepSizeOf { /// Read the n-th record batch from the file async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result; /// Read the range of rows from the file. 
diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index cd1081aab4b..f25d9d21564 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -9,7 +9,7 @@ use std::{ sync::Arc, }; -use crate::pbold; +use crate::{frag_reuse, pbold}; use arrow::array::BinaryBuilder; use arrow_array::{new_null_array, Array, BinaryArray, RecordBatch, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; @@ -59,7 +59,7 @@ const BITMAP_INDEX_VERSION: u32 = 0; // bitmaps are cached we don't open it. If we do open it we should only open it once. #[derive(Clone)] struct LazyIndexReader { - index_reader: Arc>>>, + index_reader: Arc>>, store: Arc, } @@ -71,21 +71,31 @@ impl std::fmt::Debug for LazyIndexReader { } } +impl DeepSizeOf for LazyIndexReader { + fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { + let mut total_size = 0; + if let Some(reader) = self.index_reader.get() { + total_size += reader.deep_size_of_children(context); + } + total_size += self.index_reader.get().deep_size_of_children(context); + total_size + } +} + impl LazyIndexReader { fn new(store: Arc) -> Self { Self { - index_reader: Arc::new(tokio::sync::Mutex::new(None)), + index_reader: Arc::new(tokio::sync::OnceCell::new()), store, } } async fn get(&self) -> Result> { - let mut reader = self.index_reader.lock().await; - if reader.is_none() { - let index_reader = self.store.open_index_file(BITMAP_LOOKUP_NAME).await?; - *reader = Some(index_reader); - } - Ok(reader.as_ref().unwrap().clone()) + Ok(self + .index_reader + .get_or_try_init(|| async { self.store.open_index_file(BITMAP_LOOKUP_NAME).await }) + .await? 
+ .clone()) } } @@ -302,9 +312,11 @@ impl DeepSizeOf for BitmapIndex { fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { let mut total_size = 0; - total_size += dbg!(self.index_map.deep_size_of_children(context)); - total_size += dbg!(self.null_map.deep_size_of_children(context)); + total_size += self.index_map.deep_size_of_children(context); + total_size += self.null_map.deep_size_of_children(context); + total_size += self.lazy_reader.deep_size_of_children(context); // total_size += self.store.deep_size_of_children(context); + total_size } } diff --git a/rust/lance-index/src/scalar/label_list.rs b/rust/lance-index/src/scalar/label_list.rs index e356e463e81..b22a12f8e4a 100644 --- a/rust/lance-index/src/scalar/label_list.rs +++ b/rust/lance-index/src/scalar/label_list.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::sync::Weak; use std::{any::Any, collections::HashMap, fmt::Debug, pin::Pin, sync::Arc}; use arrow::array::AsArray; diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index d2ac7e1fcb7..d76f4e045b6 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -9,6 +9,7 @@ use arrow_schema::Schema; use async_trait::async_trait; use deepsize::DeepSizeOf; use futures::TryStreamExt; +use lance_core::cache::WeakLanceCache; use lance_core::{cache::LanceCache, Error, Result}; use lance_encoding::decoder::{DecoderPlugins, FilterExpression}; use lance_file::v2; @@ -35,7 +36,7 @@ use std::{any::Any, sync::Arc}; pub struct LanceIndexStore { object_store: Arc, index_dir: Path, - metadata_cache: Arc, + metadata_cache: WeakLanceCache, scheduler: Arc, } @@ -43,7 +44,6 @@ impl DeepSizeOf for LanceIndexStore { fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { self.object_store.deep_size_of_children(context) + 
self.index_dir.as_ref().deep_size_of_children(context) - + self.metadata_cache.deep_size_of_children(context) } } @@ -61,7 +61,7 @@ impl LanceIndexStore { Self { object_store, index_dir, - metadata_cache, + metadata_cache: metadata_cache.as_ref().into(), scheduler, } } @@ -234,7 +234,7 @@ impl IndexStore for LanceIndexStore { file_scheduler, None, Arc::::default(), - &self.metadata_cache, + &self.metadata_cache.clone().upgrade(), FileReaderOptions::default(), ) .await @@ -247,7 +247,7 @@ impl IndexStore for LanceIndexStore { let file_reader = FileReader::try_new_self_described( &self.object_store, &path, - Some(&self.metadata_cache), + Some(&self.metadata_cache.clone().upgrade()), ) .await?; Ok(Arc::new(file_reader)) diff --git a/rust/lance/tests/resource_test/index_cache.rs b/rust/lance/tests/resource_test/index_cache.rs index 56cd556fe5e..90b6f759fd9 100644 --- a/rust/lance/tests/resource_test/index_cache.rs +++ b/rust/lance/tests/resource_test/index_cache.rs @@ -13,7 +13,6 @@ use super::utils::AllocTracker; use arrow::datatypes::UInt8Type; -use arrow_schema::{DataType, Field}; use lance::dataset::InsertBuilder; use lance_core::cache::LanceCache; use lance_datafusion::datagen::DatafusionDatagenExt; @@ -75,7 +74,7 @@ async fn test_cache_accounting( .saturating_sub(stats.total_bytes_allocated); let deviation = (expected_freed as isize - actual_freed).abs(); - dbg!((expected_freed, actual_freed)); + dbg!((type_name, expected_freed, actual_freed, deviation)); assert!( deviation <= tolerance_per_entry as isize, "{}: Entry (key: {:?}, type: {}): Expected to free {} bytes, but actually freed {} bytes (deviation: {}, tolerance: {}). 
Stats: alloc={}, dealloc={}", @@ -122,7 +121,7 @@ async fn test_label_list_index_cache_accounting() { // Create test data - list of uint8 values // Using larger dataset to get bigger cache entries: ~50MB - let batch_size = 1_000_000; + let batch_size = 100_000; let num_batches = BatchCount::from(50); let data = gen_batch() .col( @@ -161,8 +160,7 @@ async fn test_label_list_index_cache_accounting() { drop(dataset); }, "LabelListIndex", - // TODO: if we impl DeepSizeOf for FileReader, then we should be able to reduce this tolerance - 60_000, // 60KB tolerance per entry - accounts for cache overhead + 5_000, // 5KB tolerance per entry ) .await; } From 9211071d022d018ef7a28e2405dfd59950739996 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 15 Oct 2025 16:34:28 -0700 Subject: [PATCH 07/15] add btree and fts --- rust/lance-index/src/scalar/bitmap.rs | 2 +- rust/lance-index/src/scalar/btree.rs | 4 +- rust/lance/tests/resource_test/index_cache.rs | 100 +++++++++++++++++- 3 files changed, 103 insertions(+), 3 deletions(-) diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index f25d9d21564..3745a43cccb 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -9,7 +9,7 @@ use std::{ sync::Arc, }; -use crate::{frag_reuse, pbold}; +use crate::pbold; use arrow::array::BinaryBuilder; use arrow_array::{new_null_array, Array, BinaryArray, RecordBatch, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index a9cd73dc5e5..4ee610ec566 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -766,7 +766,9 @@ impl DeepSizeOf for BTreeIndex { fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { // We don't include the index cache, or anything stored in it. For example: // sub_index and fri. 
- self.page_lookup.deep_size_of_children(context) + self.store.deep_size_of_children(context) + self.page_lookup.deep_size_of_children(context) + + self.store.deep_size_of_children(context) + + self.sub_index.deep_size_of_children(context) } } diff --git a/rust/lance/tests/resource_test/index_cache.rs b/rust/lance/tests/resource_test/index_cache.rs index 90b6f759fd9..4424aae845b 100644 --- a/rust/lance/tests/resource_test/index_cache.rs +++ b/rust/lance/tests/resource_test/index_cache.rs @@ -12,11 +12,12 @@ //! - Cache overhead miscalculations use super::utils::AllocTracker; -use arrow::datatypes::UInt8Type; +use arrow::datatypes::{UInt32Type, UInt8Type}; use lance::dataset::InsertBuilder; use lance_core::cache::LanceCache; use lance_datafusion::datagen::DatafusionDatagenExt; use lance_datagen::{array, gen_batch, BatchCount, RowCount}; +use lance_index::scalar::InvertedIndexParams; use lance_index::DatasetIndexExt; use lance_index::{scalar::ScalarIndexParams, IndexType}; use rand::seq::SliceRandom; @@ -164,3 +165,100 @@ async fn test_label_list_index_cache_accounting() { ) .await; } + +#[tokio::test] +async fn test_btree_index_cache_accounting() { + AllocTracker::init(); + + let batch_size = 100_000; + let num_batches = BatchCount::from(50); + let data = gen_batch() + .col("values", array::step::()) + .into_df_stream(RowCount::from(batch_size), num_batches); + + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path = tmp_dir.path().to_str().unwrap(); + InsertBuilder::new(tmp_path) + .execute_stream(data) + .await + .unwrap(); + + let mut dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); + dataset + .create_index_builder( + &["values"], + IndexType::Scalar, + &ScalarIndexParams::new("btree".to_string()), + ) + .await + .unwrap(); + + // Reload dataset to get fresh index with cache + let dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); + + // Access the index cache (now public) + let cache = (*dataset.index_cache).clone(); + + // 
Test cache accounting by prewarming the index + test_cache_accounting( + cache, + || async { + dataset.prewarm_index("values_idx").await.unwrap(); + drop(dataset); + }, + "BTreeIndex", + 5_000, // 5KB tolerance per entry + ) + .await; +} + +#[tokio::test] +async fn test_fts_index_cache_accounting() { + AllocTracker::init(); + + let batch_size = 10_000; + let num_batches = BatchCount::from(50); + // TODO: generate more realistic text data + let data = gen_batch() + .col( + "text", + array::rand_type(&arrow::datatypes::DataType::LargeUtf8), + ) + .into_df_stream(RowCount::from(batch_size), num_batches); + + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path = tmp_dir.path().to_str().unwrap(); + InsertBuilder::new(tmp_path) + .execute_stream(data) + .await + .unwrap(); + + let params = InvertedIndexParams::default(); + let mut dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); + dataset + .create_index_builder( + &["text"], + IndexType::Scalar, + &ScalarIndexParams::new("inverted".to_string()).with_params(¶ms), + ) + .await + .unwrap(); + + // Reload dataset to get fresh index with cache + let dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); + + // Access the index cache (now public) + let cache = (*dataset.index_cache).clone(); + + // Test cache accounting by prewarming the index + test_cache_accounting( + cache, + || async { + dataset.prewarm_index("text_idx").await.unwrap(); + drop(dataset); + }, + "FTSIndex", + 20_000, // 20KB tolerance per entry - FTS indices are larger and more complex + ) + .await; +} From e24ce9c6c42ea165f8b7a21046ffd077ff6352dd Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 16 Oct 2025 14:15:17 -0700 Subject: [PATCH 08/15] docs --- rust/lance-encoding/build.rs | 4 +++- rust/lance/tests/README.md | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/rust/lance-encoding/build.rs b/rust/lance-encoding/build.rs index 1c45a7da2dd..69eedb98bd6 100644 --- a/rust/lance-encoding/build.rs +++ 
b/rust/lance-encoding/build.rs @@ -15,7 +15,9 @@ fn main() -> Result<()> { prost_build.enable_type_names(); prost_build.bytes(["."]); // Enable Bytes type for all messages to avoid Vec clones. - // Implement DeepSizeOf so we can keep metadata in cache + // Implement DeepSizeOf so we can keep metadata in cache. + // Once https://github.com/nhtyy/deepsize2/pull/2 is merged and released, + // we can use that and just implement DeepSizeOf for `.` for path in &[ "lance.encodings.ColumnEncoding", "lance.encodings.Blob", diff --git a/rust/lance/tests/README.md b/rust/lance/tests/README.md index 74ee73ee97c..8d56b13b23c 100644 --- a/rust/lance/tests/README.md +++ b/rust/lance/tests/README.md @@ -10,6 +10,6 @@ bytehound on the binary, not on cargo, so we have to extract the test binary pat TEST_BINARY=$(cargo test --test resource_tests --no-run 2>&1 | tail -n1 | sed -n 's/.*(\([^)]*\)).*/\1/p') LD_PRELOAD=/usr/local/lib/libbytehound.so \ RUST_ALLOC_TIMINGS=true \ - $TEST_BINARY resource_test::index::test_label_list_lifecycle + $TEST_BINARY resource_test::index_cache::test_label_list_index_cache_accounting bytehound server memory-profiling_*.dat ``` From a016952518d220239b180987fd1161eb9e3a59a4 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 4 Nov 2025 16:30:00 -0800 Subject: [PATCH 09/15] start to clean up tests --- rust/lance/tests/resource_test/index.rs | 91 ------------------- rust/lance/tests/resource_test/index_cache.rs | 20 ---- rust/lance/tests/resource_test/mod.rs | 1 - rust/lance/tests/resource_test/utils.rs | 31 +++++++ rust/lance/tests/resource_test/write.rs | 13 +-- 5 files changed, 35 insertions(+), 121 deletions(-) delete mode 100644 rust/lance/tests/resource_test/index.rs diff --git a/rust/lance/tests/resource_test/index.rs b/rust/lance/tests/resource_test/index.rs deleted file mode 100644 index 48850c33026..00000000000 --- a/rust/lance/tests/resource_test/index.rs +++ /dev/null @@ -1,91 +0,0 @@ -use std::sync::Arc; - -use super::utils::AllocTracker; 
-use all_asserts::assert_le; -use arrow_schema::{DataType, Field}; -use lance::dataset::InsertBuilder; -use lance_datafusion::datagen::DatafusionDatagenExt; -use lance_datagen::{array, gen_batch, BatchCount, RowCount}; -use lance_index::DatasetIndexExt; -use lance_index::{scalar::ScalarIndexParams, IndexType}; - -// Key things to test -// - Getting index stats requires reading only the metadata (no data read) -// - - -// Ops with index -// - Build -// - Load -// - get stats -// - - -#[tokio::test] -async fn test_label_list_lifecycle() { - let tmp_dir = tempfile::tempdir().unwrap(); - let tmp_path = tmp_dir.path().to_str().unwrap(); - // Create a stream of 100MB of data, in batches - { - // 12 bytes per list entry, average 5 entries per list -> 60 bytes per row - // 1MB / 60 = ~16k rows per batch - let batch_size = 16_000; - let num_batches = BatchCount::from(100); - let data = gen_batch() - .col( - "value", - array::rand_type(&DataType::List(Arc::new(Field::new( - "item", - DataType::UInt8, - false, - )))), - ) - .into_df_stream(RowCount::from(batch_size), num_batches); - let _ = InsertBuilder::new(tmp_path) - .execute_stream(data) - .await - .unwrap(); - } - - // Build index on column - // let io_tracking = todo!(); - let alloc_tracker = AllocTracker::new(); - { - let _guard = alloc_tracker.enter(); - let mut dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); - - dataset - .create_index_builder( - &["value"], - IndexType::Scalar, - &ScalarIndexParams::new("labellist".to_string()), - ) - .await - .unwrap(); - } - - let mem_stats = alloc_tracker.stats(); - assert_le!( - mem_stats.max_bytes_allocated, - 70 * 1024 * 1024, - "Memory usage too high" - ); - assert_le!( - mem_stats.total_bytes_allocated, - 400 * 1024 * 1024, - "Total memory allocation too high" - ); - // We do leak some memory for one-time initialization of DataFusion session. 
- assert_le!(mem_stats.net_bytes_allocated(), 300_000, "memory leak"); - - // Drop everything, assert no leak - - // Call load index - - // assert minimal IO and memory usage done - - // Drop everything, assert no leak - - // Call get stats - // Assert IO and memory are small - - // Drop everything, assert no leak -} diff --git a/rust/lance/tests/resource_test/index_cache.rs b/rust/lance/tests/resource_test/index_cache.rs index 4424aae845b..9ef45bc487e 100644 --- a/rust/lance/tests/resource_test/index_cache.rs +++ b/rust/lance/tests/resource_test/index_cache.rs @@ -92,26 +92,6 @@ async fn test_cache_accounting( } } -// fn test_deep_size_of(value: impl deepsize::DeepSizeOf) { -// let tracker = AllocTracker::new(); -// let reported_size = deepsize::DeepSizeOf::deep_size_of(&value); -// { -// let _guard = tracker.enter(); -// drop(value); -// } -// let stats = tracker.stats(); -// let actual_freed = stats.total_bytes_deallocated.saturating_sub(stats.total_bytes_allocated) as usize; -// assert_eq!(reported_size, actual_freed); -// } - -// #[test] -// fn test_deep_size_of_label_list_index() { -// AllocTracker::init(); -// LabelListIndex::nemw -// let value = todo!(); -// test_deep_size_of(value); -// } - #[tokio::test] async fn test_label_list_index_cache_accounting() { AllocTracker::init(); diff --git a/rust/lance/tests/resource_test/mod.rs b/rust/lance/tests/resource_test/mod.rs index 52f5fd388ed..5c6fd82641e 100644 --- a/rust/lance/tests/resource_test/mod.rs +++ b/rust/lance/tests/resource_test/mod.rs @@ -1,4 +1,3 @@ -mod index; mod index_cache; mod utils; mod write; diff --git a/rust/lance/tests/resource_test/utils.rs b/rust/lance/tests/resource_test/utils.rs index c7c98a13217..f87178b7544 100644 --- a/rust/lance/tests/resource_test/utils.rs +++ b/rust/lance/tests/resource_test/utils.rs @@ -1,6 +1,7 @@ use std::alloc::System; use std::collections::HashMap; use std::sync::{Arc, LazyLock, Mutex, Once}; +use tracing::Instrument; use 
tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::Registry; use tracking_allocator::{ @@ -162,3 +163,33 @@ fn check_memory_leak() { assert_eq!(stats.total_allocations, 2); assert_eq!(stats.net_bytes_allocated(), 1024 + 8); } + +#[tokio::test] +async fn check_test_spawn_alloc() { + let tracker = AllocTracker::new(); + { + let _guard = tracker.enter(); + let future1 = async { + let v = vec![0u8; 256 * 1024]; + drop(v); + }; + let handle = tokio::spawn(future1.in_current_span()); + let future2 = async { + let v = vec![0u8; 512 * 1024]; + drop(v); + }; + let handle2 = tokio::spawn(future2.in_current_span()); + handle.await.unwrap(); + handle2.await.unwrap(); + } + let stats = tracker.stats(); + assert_eq!(stats.total_allocations, 4); + assert_eq!( + stats.total_bytes_allocated >= (256 * 1024 + 512 * 1024), + true + ); + assert_eq!( + stats.total_bytes_deallocated >= (256 * 1024 + 512 * 1024), + true + ); +} diff --git a/rust/lance/tests/resource_test/write.rs b/rust/lance/tests/resource_test/write.rs index ace24c4279e..416a0e80fe3 100644 --- a/rust/lance/tests/resource_test/write.rs +++ b/rust/lance/tests/resource_test/write.rs @@ -5,13 +5,11 @@ use lance::dataset::InsertBuilder; use lance_datafusion::datagen::DatafusionDatagenExt; use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RoundingBehavior}; -// TODO: also add IO - #[tokio::test] async fn test_insert_memory() { // Create a stream of 100MB of data, in batches - let batch_size = 1024 * 1024; // 1MB - let num_batches = BatchCount::from(100); + let batch_size = 10 * 1024 * 1024; // 10MB + let num_batches = BatchCount::from(10); let data = gen_batch() .col("a", array::rand_type(&DataType::Int32)) .into_df_stream_bytes( @@ -35,14 +33,11 @@ async fn test_insert_memory() { } let stats = alloc_tracker.stats(); - // Allow for 15x the batch size to account for: - // - Allocator metadata overhead (wrapped_size vs object_size) - // - Internal buffering and temporary allocations - // - Arrow array 
overhead + // Allow for 2x the batch size to account for overheads. // The key test is that we don't load all 100MB into memory at once assert_le!( stats.max_bytes_allocated, - (batch_size * 15) as isize, + (batch_size * 2) as isize, "Max memory usage exceeded" ); } From 04ae6dbb7b9766b284c5310ed758b92cae89e137 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 5 Nov 2025 13:13:48 -0800 Subject: [PATCH 10/15] focus PR just on new framework --- Cargo.lock | 1 - python/Cargo.lock | 1 - rust/lance-core/src/cache.rs | 46 +--- rust/lance-encoding/Cargo.toml | 1 - rust/lance-encoding/build.rs | 65 ----- rust/lance-encoding/src/decoder.rs | 33 +-- rust/lance-file/src/v2/reader.rs | 35 +-- rust/lance-index/src/scalar.rs | 2 +- rust/lance-index/src/scalar/bitmap.rs | 42 +-- rust/lance-index/src/scalar/btree.rs | 4 +- rust/lance-index/src/scalar/lance_format.rs | 10 +- rust/lance/src/dataset.rs | 2 +- rust/lance/tests/resource_test/index_cache.rs | 244 ------------------ rust/lance/tests/resource_test/mod.rs | 1 - 14 files changed, 40 insertions(+), 447 deletions(-) delete mode 100644 rust/lance/tests/resource_test/index_cache.rs diff --git a/Cargo.lock b/Cargo.lock index 1d1841291ae..af5507e813d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4485,7 +4485,6 @@ dependencies = [ "byteorder", "bytes", "criterion", - "deepsize", "fsst", "futures", "hex", diff --git a/python/Cargo.lock b/python/Cargo.lock index ed48354b1d3..c88deac1132 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -4028,7 +4028,6 @@ dependencies = [ "bytemuck", "byteorder", "bytes", - "deepsize", "fsst", "futures", "hex", diff --git a/rust/lance-core/src/cache.rs b/rust/lance-core/src/cache.rs index c4491581702..d771fe84f68 100644 --- a/rust/lance-core/src/cache.rs +++ b/rust/lance-core/src/cache.rs @@ -22,15 +22,14 @@ type ArcAny = Arc; #[derive(Clone)] pub struct SizedRecord { - pub record: ArcAny, - pub size_accessor: Arc usize + Send + Sync>, - pub type_name: &'static str, + record: ArcAny, + 
size_accessor: Arc usize + Send + Sync>, } impl std::fmt::Debug for SizedRecord { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SizedRecord") - .field("type", &self.type_name) + .field("record", &self.record) .finish() } } @@ -43,14 +42,12 @@ impl DeepSizeOf for SizedRecord { impl SizedRecord { fn new(record: Arc) -> Self { - // Calculate size once and store it // +8 for the size of the Arc pointer itself let size_accessor = |record: &ArcAny| -> usize { record.downcast_ref::().unwrap().deep_size_of() + 8 }; Self { record, size_accessor: Arc::new(size_accessor), - type_name: std::any::type_name::(), } } } @@ -280,13 +277,6 @@ impl LanceCache { self.misses.store(0, Ordering::Relaxed); } - // For testing: get all entries in the cache - #[doc(hidden)] - pub async fn entries(&self) -> Vec<((String, TypeId), SizedRecord)> { - self.cache.run_pending_tasks().await; - self.cache.iter().map(|(k, v)| ((*k).clone(), v)).collect() - } - // CacheKey-based methods pub async fn insert_with_key(&self, cache_key: &K, metadata: Arc) where @@ -350,8 +340,9 @@ pub struct WeakLanceCache { misses: Arc, } -impl From<&LanceCache> for WeakLanceCache { - fn from(cache: &LanceCache) -> Self { +impl WeakLanceCache { + /// Create a weak reference from a strong LanceCache + pub fn from(cache: &LanceCache) -> Self { Self { inner: Arc::downgrade(&cache.cache), prefix: cache.prefix.clone(), @@ -359,31 +350,6 @@ impl From<&LanceCache> for WeakLanceCache { misses: cache.misses.clone(), } } -} - -impl From for WeakLanceCache { - fn from(cache: LanceCache) -> Self { - Self { - inner: Arc::downgrade(&cache.cache), - prefix: cache.prefix, - hits: cache.hits, - misses: cache.misses, - } - } -} - -impl WeakLanceCache { - pub fn upgrade(self) -> LanceCache { - let Some(cache) = self.inner.upgrade() else { - return LanceCache::no_cache(); - }; - LanceCache { - cache, - prefix: self.prefix, - hits: self.hits, - misses: self.misses, - } - } /// Appends a prefix to the 
cache key pub fn with_key_prefix(&self, prefix: &str) -> Self { diff --git a/rust/lance-encoding/Cargo.toml b/rust/lance-encoding/Cargo.toml index adfe25e85dc..27278667a6f 100644 --- a/rust/lance-encoding/Cargo.toml +++ b/rust/lance-encoding/Cargo.toml @@ -23,7 +23,6 @@ arrow-schema.workspace = true arrow-select.workspace = true lance-bitpacking = { workspace = true, optional = true } bytes.workspace = true -deepsize.workspace = true futures.workspace = true fsst.workspace = true hex = "0.4.3" diff --git a/rust/lance-encoding/build.rs b/rust/lance-encoding/build.rs index 69eedb98bd6..92bf497beeb 100644 --- a/rust/lance-encoding/build.rs +++ b/rust/lance-encoding/build.rs @@ -14,71 +14,6 @@ fn main() -> Result<()> { prost_build.protoc_arg("--experimental_allow_proto3_optional"); prost_build.enable_type_names(); prost_build.bytes(["."]); // Enable Bytes type for all messages to avoid Vec clones. - - // Implement DeepSizeOf so we can keep metadata in cache. - // Once https://github.com/nhtyy/deepsize2/pull/2 is merged and released, - // we can use that and just implement DeepSizeOf for `.` - for path in &[ - "lance.encodings.ColumnEncoding", - "lance.encodings.Blob", - "lance.encodings.ZoneIndex", - "lance.encodings.ArrayEncoding", - "lance.encodings.Flat", - "lance.encodings.Nullable", - "lance.encodings.FixedSizeList", - "lance.encodings.List", - "lance.encodings.Struct", - "lance.encodings.Binary", - "lance.encodings.Dictionary", - "lance.encodings.PackedStruct", - "lance.encodings.SimpleStruct", - "lance.encodings.Bitpacked", - "lance.encodings.FixedSizeBinary", - "lance.encodings.BitpackedForNonNeg", - "lance.encodings.InlineBitpacking", - "lance.encodings.OutOfLineBitpacking", - "lance.encodings.Variable", - "lance.encodings.PackedStructFixedWidthMiniBlock", - "lance.encodings.Block", - "lance.encodings.Rle", - "lance.encodings.GeneralMiniBlock", - "lance.encodings.ByteStreamSplit", - "lance.encodings.Buffer", - "lance.encodings.Compression", - 
"lance.encodings.Nullable.NoNull", - "lance.encodings.Nullable.AllNull", - "lance.encodings.Nullable.SomeNull", - "lance.encodings21.MiniBlockLayout", - "lance.encodings21.CompressiveEncoding", - "lance.encodings21.FullZipLayout", - "lance.encodings21.AllNullLayout", - "lance.encodings21.BlobLayout", - "lance.encodings21.PageLayout", - "lance.encodings21.BufferCompression", - "lance.encodings21.Flat", - "lance.encodings21.Variable", - "lance.encodings21.OutOfLineBitpacking", - "lance.encodings21.InlineBitpacking", - "lance.encodings21.Dictionary", - "lance.encodings21.Rle", - "lance.encodings21.FixedSizeList", - "lance.encodings21.PackedStruct", - "lance.encodings21.General", - "lance.encodings21.ByteStreamSplit", - ] { - prost_build.type_attribute(path, "#[derive(deepsize::DeepSizeOf)]"); - } - for path in &[ - "lance.encodings.ArrayEncoding.array_encoding", - "lance.encodings.ColumnEncoding.column_encoding", - "lance.encodings.Nullable.nullability", - "lance.encodings21.FullZipLayout.details", - "lance.encodings21.PageLayout.layout", - "lance.encodings21.CompressiveEncoding.compression", - ] { - prost_build.enum_attribute(path, "#[derive(deepsize::DeepSizeOf)]"); - } - prost_build.compile_protos(&["./protos/encodings_v2_0.proto"], &["./protos"])?; prost_build.compile_protos(&["./protos/encodings_v2_1.proto"], &["./protos"])?; diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 8b6a087fdcc..af87e068f9f 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -224,7 +224,7 @@ use futures::future::{maybe_done, BoxFuture, MaybeDone}; use futures::stream::{self, BoxStream}; use futures::{FutureExt, StreamExt}; use lance_arrow::DataTypeExt; -use lance_core::cache::{DeepSizeOf, LanceCache}; +use lance_core::cache::LanceCache; use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD}; use log::{debug, trace, warn}; use snafu::location; @@ -262,37 +262,12 @@ const BATCH_SIZE_BYTES_WARNING: 
u64 = 10 * 1024 * 1024; /// A file should only use one or the other and never both. /// 2.0 decoders can always assume this is pb::ArrayEncoding /// and 2.1+ decoders can always assume this is pb::PageLayout -#[derive(Debug, DeepSizeOf)] +#[derive(Debug)] pub enum PageEncoding { Legacy(pb::ArrayEncoding), Structural(pb21::PageLayout), } -// Implement these manually because there isn't yet an implementation for bytes::Bytes -impl DeepSizeOf for pb::Fsst { - fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize { - self.symbol_table.len() - } -} - -impl DeepSizeOf for pb::Constant { - fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize { - self.value.len() - } -} - -impl DeepSizeOf for pb21::Fsst { - fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize { - self.symbol_table.len() - } -} - -impl DeepSizeOf for pb21::Constant { - fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize { - self.value.as_ref().map(|v| v.len()).unwrap_or(0) - } -} - impl PageEncoding { pub fn as_legacy(&self) -> &pb::ArrayEncoding { match self { @@ -316,7 +291,7 @@ impl PageEncoding { /// Metadata describing a page in a file /// /// This is typically created by reading the metadata section of a Lance file -#[derive(Debug, DeepSizeOf)] +#[derive(Debug)] pub struct PageInfo { /// The number of rows in the page pub num_rows: u64, @@ -333,7 +308,7 @@ pub struct PageInfo { /// Metadata describing a column in a file /// /// This is typically created by reading the metadata section of a Lance file -#[derive(Debug, Clone, DeepSizeOf)] +#[derive(Debug, Clone)] pub struct ColumnInfo { /// The index of the column in the file pub index: u32, diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs index 9c598ee5237..ec5f650bc0d 100644 --- a/rust/lance-file/src/v2/reader.rs +++ b/rust/lance-file/src/v2/reader.rs @@ -31,7 +31,7 @@ use prost::{Message, Name}; use snafu::location; use lance_core::{ - cache::{LanceCache, 
WeakLanceCache}, + cache::LanceCache, datatypes::{Field, Schema}, Error, Result, }; @@ -154,7 +154,7 @@ impl CachedFileMetadata { /// /// If users are not using the table format then they will need to figure /// out some way to do this themselves. -#[derive(Debug, Clone, DeepSizeOf)] +#[derive(Debug, Clone)] pub struct ReaderProjection { /// The data types (schema) of the selected columns. The names /// of the schema are arbitrary and ignored. @@ -339,12 +339,6 @@ impl Default for FileReaderOptions { } } -impl DeepSizeOf for FileReaderOptions { - fn deep_size_of_children(&self, _context: &mut Context) -> usize { - 0 - } -} - #[derive(Debug)] pub struct FileReader { scheduler: Arc, @@ -353,18 +347,9 @@ pub struct FileReader { num_rows: u64, metadata: Arc, decoder_plugins: Arc, - cache: WeakLanceCache, + cache: Arc, options: FileReaderOptions, } - -impl DeepSizeOf for FileReader { - fn deep_size_of_children(&self, context: &mut Context) -> usize { - // self.scheduler.deep_size_of_children(context) - self.base_projection.deep_size_of_children(context) - + self.options.deep_size_of_children(context) - } -} - #[derive(Debug)] struct Footer { #[allow(dead_code)] @@ -827,7 +812,7 @@ impl FileReader { cache: &LanceCache, options: FileReaderOptions, ) -> Result { - let cache = cache.with_key_prefix(path.as_ref()).into(); + let cache = Arc::new(cache.with_key_prefix(path.as_ref())); if let Some(base_projection) = base_projection.as_ref() { Self::validate_projection(base_projection, &file_metadata)?; @@ -920,7 +905,7 @@ impl FileReader { Self::do_read_range( self.collect_columns_from_projection(&projection)?, self.scheduler.clone(), - Arc::new(self.cache.clone().upgrade()), + self.cache.clone(), self.num_rows, self.decoder_plugins.clone(), range, @@ -982,7 +967,7 @@ impl FileReader { Self::do_take_rows( self.collect_columns_from_projection(&projection)?, self.scheduler.clone(), - Arc::new(self.cache.clone().upgrade()), + self.cache.clone(), self.decoder_plugins.clone(), 
indices, batch_size, @@ -1045,7 +1030,7 @@ impl FileReader { Self::do_read_ranges( self.collect_columns_from_projection(&projection)?, self.scheduler.clone(), - Arc::new(self.cache.clone().upgrade()), + self.cache.clone(), self.decoder_plugins.clone(), ranges, batch_size, @@ -1201,7 +1186,7 @@ impl FileReader { let config = SchedulerDecoderConfig { batch_size, - cache: Arc::new(self.cache.clone().upgrade()), + cache: self.cache.clone(), decoder_plugins: self.decoder_plugins.clone(), io: self.scheduler.clone(), decoder_config: self.options.decoder_config.clone(), @@ -1240,7 +1225,7 @@ impl FileReader { let config = SchedulerDecoderConfig { batch_size, - cache: Arc::new(self.cache.clone().upgrade()), + cache: self.cache.clone(), decoder_plugins: self.decoder_plugins.clone(), io: self.scheduler.clone(), decoder_config: self.options.decoder_config.clone(), @@ -1279,7 +1264,7 @@ impl FileReader { let config = SchedulerDecoderConfig { batch_size, - cache: Arc::new(self.cache.clone().upgrade()), + cache: self.cache.clone(), decoder_plugins: self.decoder_plugins.clone(), io: self.scheduler.clone(), decoder_config: self.options.decoder_config.clone(), diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 1ea4b3e3d41..69b5ee35cf0 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -178,7 +178,7 @@ pub trait IndexWriter: Send { /// Trait for reading an index (or parts of an index) from storage #[async_trait] -pub trait IndexReader: Send + Sync + DeepSizeOf { +pub trait IndexReader: Send + Sync { /// Read the n-th record batch from the file async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result; /// Read the range of rows from the file. 
diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 3745a43cccb..9f3779668f1 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -59,7 +59,7 @@ const BITMAP_INDEX_VERSION: u32 = 0; // bitmaps are cached we don't open it. If we do open it we should only open it once. #[derive(Clone)] struct LazyIndexReader { - index_reader: Arc>>, + index_reader: Arc>>>, store: Arc, } @@ -71,31 +71,21 @@ impl std::fmt::Debug for LazyIndexReader { } } -impl DeepSizeOf for LazyIndexReader { - fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { - let mut total_size = 0; - if let Some(reader) = self.index_reader.get() { - total_size += reader.deep_size_of_children(context); - } - total_size += self.index_reader.get().deep_size_of_children(context); - total_size - } -} - impl LazyIndexReader { fn new(store: Arc) -> Self { Self { - index_reader: Arc::new(tokio::sync::OnceCell::new()), + index_reader: Arc::new(tokio::sync::Mutex::new(None)), store, } } async fn get(&self) -> Result> { - Ok(self - .index_reader - .get_or_try_init(|| async { self.store.open_index_file(BITMAP_LOOKUP_NAME).await }) - .await? 
- .clone()) + let mut reader = self.index_reader.lock().await; + if reader.is_none() { + let index_reader = self.store.open_index_file(BITMAP_LOOKUP_NAME).await?; + *reader = Some(index_reader); + } + Ok(reader.as_ref().unwrap().clone()) } } @@ -114,7 +104,8 @@ pub struct BitmapIndex { value_type: DataType, - // store: Arc, + store: Arc, + index_cache: WeakLanceCache, frag_reuse_index: Option>, @@ -122,13 +113,6 @@ pub struct BitmapIndex { lazy_reader: LazyIndexReader, } -impl Drop for BitmapIndex { - fn drop(&mut self) { - println!("Dropping BitmapIndex"); - dbg!(DeepSizeOf::deep_size_of(self)); - } -} - #[derive(Debug, Clone)] pub struct BitmapKey { value: OrderableScalarValue, @@ -156,7 +140,7 @@ impl BitmapIndex { index_map, null_map, value_type, - // store, + store, index_cache, frag_reuse_index, lazy_reader, @@ -313,9 +297,7 @@ impl DeepSizeOf for BitmapIndex { let mut total_size = 0; total_size += self.index_map.deep_size_of_children(context); - total_size += self.null_map.deep_size_of_children(context); - total_size += self.lazy_reader.deep_size_of_children(context); - // total_size += self.store.deep_size_of_children(context); + total_size += self.store.deep_size_of_children(context); total_size } diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 4ee610ec566..a9cd73dc5e5 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -766,9 +766,7 @@ impl DeepSizeOf for BTreeIndex { fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { // We don't include the index cache, or anything stored in it. For example: // sub_index and fri. 
- self.page_lookup.deep_size_of_children(context) - + self.store.deep_size_of_children(context) - + self.sub_index.deep_size_of_children(context) + self.page_lookup.deep_size_of_children(context) + self.store.deep_size_of_children(context) } } diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index d76f4e045b6..d2ac7e1fcb7 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -9,7 +9,6 @@ use arrow_schema::Schema; use async_trait::async_trait; use deepsize::DeepSizeOf; use futures::TryStreamExt; -use lance_core::cache::WeakLanceCache; use lance_core::{cache::LanceCache, Error, Result}; use lance_encoding::decoder::{DecoderPlugins, FilterExpression}; use lance_file::v2; @@ -36,7 +35,7 @@ use std::{any::Any, sync::Arc}; pub struct LanceIndexStore { object_store: Arc, index_dir: Path, - metadata_cache: WeakLanceCache, + metadata_cache: Arc, scheduler: Arc, } @@ -44,6 +43,7 @@ impl DeepSizeOf for LanceIndexStore { fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { self.object_store.deep_size_of_children(context) + self.index_dir.as_ref().deep_size_of_children(context) + + self.metadata_cache.deep_size_of_children(context) } } @@ -61,7 +61,7 @@ impl LanceIndexStore { Self { object_store, index_dir, - metadata_cache: metadata_cache.as_ref().into(), + metadata_cache, scheduler, } } @@ -234,7 +234,7 @@ impl IndexStore for LanceIndexStore { file_scheduler, None, Arc::::default(), - &self.metadata_cache.clone().upgrade(), + &self.metadata_cache, FileReaderOptions::default(), ) .await @@ -247,7 +247,7 @@ impl IndexStore for LanceIndexStore { let file_reader = FileReader::try_new_self_described( &self.object_store, &path, - Some(&self.metadata_cache.clone().upgrade()), + Some(&self.metadata_cache), ) .await?; Ok(Arc::new(file_reader)) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index c1478901e97..3adbd30bda6 100644 --- 
a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -157,7 +157,7 @@ pub struct Dataset { pub(crate) fragment_bitmap: Arc, // These are references to session caches, but with the dataset URI as a prefix. - pub index_cache: Arc, + pub(crate) index_cache: Arc, pub(crate) metadata_cache: Arc, /// File reader options to use when reading data files. diff --git a/rust/lance/tests/resource_test/index_cache.rs b/rust/lance/tests/resource_test/index_cache.rs deleted file mode 100644 index 9ef45bc487e..00000000000 --- a/rust/lance/tests/resource_test/index_cache.rs +++ /dev/null @@ -1,244 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright The Lance Authors - -//! Tests to validate IndexCache memory accounting by creating real indices, -//! prewarming them to populate the cache, then evicting entries one-by-one -//! and verifying that memory reduction matches DeepSizeOf estimates. -//! -//! This approach tests DeepSizeOf in realistic conditions where items are -//! actually cached, catching issues like: -//! - Arc sharing and double-counting -//! - Trait object DeepSizeOf under-counting -//! 
- Cache overhead miscalculations - -use super::utils::AllocTracker; -use arrow::datatypes::{UInt32Type, UInt8Type}; -use lance::dataset::InsertBuilder; -use lance_core::cache::LanceCache; -use lance_datafusion::datagen::DatafusionDatagenExt; -use lance_datagen::{array, gen_batch, BatchCount, RowCount}; -use lance_index::scalar::InvertedIndexParams; -use lance_index::DatasetIndexExt; -use lance_index::{scalar::ScalarIndexParams, IndexType}; -use rand::seq::SliceRandom; -use std::sync::Arc; - -/// Test framework that validates DeepSizeOf by creating an index, prewarming cache, -/// then evicting entries one-by-one and verifying memory reduction -/// -/// # Arguments -/// * `cache` - The cache instance to test -/// * `prewarm_fn` - Function to call that populates the cache -/// * `test_name` - Name of the test for error messages -/// * `tolerance_per_entry` - Acceptable deviation in bytes per cache entry -async fn test_cache_accounting( - cache: LanceCache, - prewarm_fn: F, - test_name: &str, - tolerance_per_entry: usize, -) where - F: FnOnce() -> Fut, - Fut: std::future::Future, -{ - prewarm_fn().await; - if cache.size().await == 0 { - panic!("{}: Cache is empty after prewarm!", test_name); - } - - let mut entries = cache.entries().await; - entries.shuffle(&mut rand::rng()); - drop(cache); - - for ((key, _), entry) in entries { - assert_eq!( - Arc::strong_count(&entry.record), - 1, - "{}: Entry for key {:?} has unexpected strong count {}", - test_name, - key, - Arc::strong_count(&entry.record) - ); - let expected_freed = deepsize::DeepSizeOf::deep_size_of(&entry); - let type_name = entry.type_name; - - let tracker = AllocTracker::new(); - { - let _guard = tracker.enter(); - // Evict the entry - this should free memory - drop(entry); - } - let stats = tracker.stats(); - - // Actual memory freed = deallocations - allocations during eviction - let actual_freed = stats - .total_bytes_deallocated - .saturating_sub(stats.total_bytes_allocated); - - let deviation = 
(expected_freed as isize - actual_freed).abs(); - dbg!((type_name, expected_freed, actual_freed, deviation)); - assert!( - deviation <= tolerance_per_entry as isize, - "{}: Entry (key: {:?}, type: {}): Expected to free {} bytes, but actually freed {} bytes (deviation: {}, tolerance: {}). Stats: alloc={}, dealloc={}", - test_name, - key, - type_name, - expected_freed, - actual_freed, - deviation, - tolerance_per_entry, - stats.total_bytes_allocated, - stats.total_bytes_deallocated, - ); - } -} - -#[tokio::test] -async fn test_label_list_index_cache_accounting() { - AllocTracker::init(); - - // Create a dataset with a label list (inverted) index - let tmp_dir = tempfile::tempdir().unwrap(); - let tmp_path = tmp_dir.path().to_str().unwrap(); - - // Create test data - list of uint8 values - // Using larger dataset to get bigger cache entries: ~50MB - let batch_size = 100_000; - let num_batches = BatchCount::from(50); - let data = gen_batch() - .col( - "labels", - array::rand_list_any(array::cycle::(vec![1u8, 2]), false), - ) - .into_df_stream(RowCount::from(batch_size), num_batches); - - InsertBuilder::new(tmp_path) - .execute_stream(data) - .await - .unwrap(); - - // Build label list index - let mut dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); - dataset - .create_index_builder( - &["labels"], - IndexType::Scalar, - &ScalarIndexParams::new("labellist".to_string()), - ) - .await - .unwrap(); - - // Reload dataset to get fresh index with cache - let dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); - - // Access the index cache (now public) - let cache = (*dataset.index_cache).clone(); - - // Test cache accounting by prewarming the index - test_cache_accounting( - cache, - || async { - dataset.prewarm_index("labels_idx").await.unwrap(); - drop(dataset); - }, - "LabelListIndex", - 5_000, // 5KB tolerance per entry - ) - .await; -} - -#[tokio::test] -async fn test_btree_index_cache_accounting() { - AllocTracker::init(); - - let 
batch_size = 100_000; - let num_batches = BatchCount::from(50); - let data = gen_batch() - .col("values", array::step::()) - .into_df_stream(RowCount::from(batch_size), num_batches); - - let tmp_dir = tempfile::tempdir().unwrap(); - let tmp_path = tmp_dir.path().to_str().unwrap(); - InsertBuilder::new(tmp_path) - .execute_stream(data) - .await - .unwrap(); - - let mut dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); - dataset - .create_index_builder( - &["values"], - IndexType::Scalar, - &ScalarIndexParams::new("btree".to_string()), - ) - .await - .unwrap(); - - // Reload dataset to get fresh index with cache - let dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); - - // Access the index cache (now public) - let cache = (*dataset.index_cache).clone(); - - // Test cache accounting by prewarming the index - test_cache_accounting( - cache, - || async { - dataset.prewarm_index("values_idx").await.unwrap(); - drop(dataset); - }, - "BTreeIndex", - 5_000, // 5KB tolerance per entry - ) - .await; -} - -#[tokio::test] -async fn test_fts_index_cache_accounting() { - AllocTracker::init(); - - let batch_size = 10_000; - let num_batches = BatchCount::from(50); - // TODO: generate more realistic text data - let data = gen_batch() - .col( - "text", - array::rand_type(&arrow::datatypes::DataType::LargeUtf8), - ) - .into_df_stream(RowCount::from(batch_size), num_batches); - - let tmp_dir = tempfile::tempdir().unwrap(); - let tmp_path = tmp_dir.path().to_str().unwrap(); - InsertBuilder::new(tmp_path) - .execute_stream(data) - .await - .unwrap(); - - let params = InvertedIndexParams::default(); - let mut dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); - dataset - .create_index_builder( - &["text"], - IndexType::Scalar, - &ScalarIndexParams::new("inverted".to_string()).with_params(¶ms), - ) - .await - .unwrap(); - - // Reload dataset to get fresh index with cache - let dataset = lance::dataset::Dataset::open(tmp_path).await.unwrap(); 
- - // Access the index cache (now public) - let cache = (*dataset.index_cache).clone(); - - // Test cache accounting by prewarming the index - test_cache_accounting( - cache, - || async { - dataset.prewarm_index("text_idx").await.unwrap(); - drop(dataset); - }, - "FTSIndex", - 20_000, // 20KB tolerance per entry - FTS indices are larger and more complex - ) - .await; -} diff --git a/rust/lance/tests/resource_test/mod.rs b/rust/lance/tests/resource_test/mod.rs index 5c6fd82641e..c0360bd6c20 100644 --- a/rust/lance/tests/resource_test/mod.rs +++ b/rust/lance/tests/resource_test/mod.rs @@ -1,3 +1,2 @@ -mod index_cache; mod utils; mod write; From fe8a7e2b2a6aed6ace57a79d9149f9c9881b1510 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 5 Nov 2025 13:17:32 -0800 Subject: [PATCH 11/15] license header --- rust/lance/tests/resource_test/mod.rs | 2 ++ rust/lance/tests/resource_test/utils.rs | 2 ++ rust/lance/tests/resource_test/write.rs | 2 ++ rust/lance/tests/resource_tests.rs | 2 ++ 4 files changed, 8 insertions(+) diff --git a/rust/lance/tests/resource_test/mod.rs b/rust/lance/tests/resource_test/mod.rs index c0360bd6c20..80ec1ab9d20 100644 --- a/rust/lance/tests/resource_test/mod.rs +++ b/rust/lance/tests/resource_test/mod.rs @@ -1,2 +1,4 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors mod utils; mod write; diff --git a/rust/lance/tests/resource_test/utils.rs b/rust/lance/tests/resource_test/utils.rs index f87178b7544..526d0e2e7e8 100644 --- a/rust/lance/tests/resource_test/utils.rs +++ b/rust/lance/tests/resource_test/utils.rs @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors use std::alloc::System; use std::collections::HashMap; use std::sync::{Arc, LazyLock, Mutex, Once}; diff --git a/rust/lance/tests/resource_test/write.rs b/rust/lance/tests/resource_test/write.rs index 416a0e80fe3..9618dbc366c 100644 --- a/rust/lance/tests/resource_test/write.rs +++ 
b/rust/lance/tests/resource_test/write.rs @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors use super::utils::AllocTracker; use all_asserts::assert_le; use arrow_schema::DataType; diff --git a/rust/lance/tests/resource_tests.rs b/rust/lance/tests/resource_tests.rs index 1b4267ce9ec..0b64c2bd082 100644 --- a/rust/lance/tests/resource_tests.rs +++ b/rust/lance/tests/resource_tests.rs @@ -1 +1,3 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors mod resource_test; From 9fafe92ff1f4586f81bd97a87b7019c68cdb620d Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 5 Nov 2025 14:10:57 -0800 Subject: [PATCH 12/15] clippy and cleanup --- rust/lance/tests/README.md | 6 +++++- rust/lance/tests/resource_test/utils.rs | 14 ++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/rust/lance/tests/README.md b/rust/lance/tests/README.md index 8d56b13b23c..396a4254216 100644 --- a/rust/lance/tests/README.md +++ b/rust/lance/tests/README.md @@ -6,10 +6,14 @@ Once you've identified a test that is using too much memory, you can use bytehound to find the source of the memory usage. (Note: we need to run bytehound on the binary, not on cargo, so we have to extract the test binary path.) +The `RUST_ALLOC_TIMINGS` environment variable tells the tracking allocator +to log the start and end of each allocation tracking session, which makes it +easier to correlate the bytehound output with the code.
+ ```shell TEST_BINARY=$(cargo test --test resource_tests --no-run 2>&1 | tail -n1 | sed -n 's/.*(\([^)]*\)).*/\1/p') LD_PRELOAD=/usr/local/lib/libbytehound.so \ RUST_ALLOC_TIMINGS=true \ - $TEST_BINARY resource_test::index_cache::test_label_list_index_cache_accounting + $TEST_BINARY resource_test::write::test_memory_usage_write \ bytehound server memory-profiling_*.dat ``` diff --git a/rust/lance/tests/resource_test/utils.rs b/rust/lance/tests/resource_test/utils.rs index 526d0e2e7e8..cefd59afcc7 100644 --- a/rust/lance/tests/resource_test/utils.rs +++ b/rust/lance/tests/resource_test/utils.rs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use all_asserts::assert_ge; use std::alloc::System; use std::collections::HashMap; use std::sync::{Arc, LazyLock, Mutex, Once}; @@ -72,6 +73,9 @@ impl AllocationTracker for MemoryTracker { } let mut guard = GLOBAL_STATS.lock().unwrap(); let stats = guard.entry(group_id).or_default(); + // Track size of wrapped allocation and not total allocation size. The + // tracking_allocator keeps some bookkeeping data in addition to the requested + // allocation, which we don't want to count here. 
stats.total_bytes_deallocated += wrapped_size as isize; stats.total_deallocations += 1; } @@ -186,12 +190,6 @@ async fn check_test_spawn_alloc() { } let stats = tracker.stats(); assert_eq!(stats.total_allocations, 4); - assert_eq!( - stats.total_bytes_allocated >= (256 * 1024 + 512 * 1024), - true - ); - assert_eq!( - stats.total_bytes_deallocated >= (256 * 1024 + 512 * 1024), - true - ); + assert_ge!(stats.total_bytes_allocated, 256 * 1024 + 512 * 1024); + assert_ge!(stats.total_bytes_deallocated, 256 * 1024 + 512 * 1024); } From c27a2ed4b84bfe952395ce705c5c7f9cc6d62410 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 5 Nov 2025 15:45:43 -0800 Subject: [PATCH 13/15] exclude macos --- rust/lance/tests/resource_tests.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rust/lance/tests/resource_tests.rs b/rust/lance/tests/resource_tests.rs index 0b64c2bd082..886d93d0b9d 100644 --- a/rust/lance/tests/resource_tests.rs +++ b/rust/lance/tests/resource_tests.rs @@ -1,3 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors + +// The memory tests don't work currently on MacOS because they rely on thread +// local storage in the allocator, which seems to have some issues on MacOS. 
+#[cfg(not(target_os = "macos"))] mod resource_test; From f8681b1e19d486bf49d89b2c07a4b9c1d5fa5840 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 6 Nov 2025 09:04:39 -0800 Subject: [PATCH 14/15] simplify --- rust/lance/tests/resource_test/utils.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/rust/lance/tests/resource_test/utils.rs b/rust/lance/tests/resource_test/utils.rs index cefd59afcc7..4035cddc806 100644 --- a/rust/lance/tests/resource_test/utils.rs +++ b/rust/lance/tests/resource_test/utils.rs @@ -39,8 +39,8 @@ impl AllocationTracker for MemoryTracker { fn allocated( &self, _addr: usize, - _object_size: usize, - wrapped_size: usize, + object_size: usize, + _wrapped_size: usize, group_id: AllocationGroupId, ) { if group_id == AllocationGroupId::ROOT { @@ -49,7 +49,7 @@ impl AllocationTracker for MemoryTracker { } let mut guard = GLOBAL_STATS.lock().unwrap(); let stats = guard.entry(group_id).or_default(); - stats.total_bytes_allocated += wrapped_size as isize; + stats.total_bytes_allocated += object_size as isize; stats.total_allocations += 1; stats.max_bytes_allocated = stats.max_bytes_allocated.max(stats.net_bytes_allocated()); } @@ -57,8 +57,8 @@ impl AllocationTracker for MemoryTracker { fn deallocated( &self, _addr: usize, - _object_size: usize, - wrapped_size: usize, + object_size: usize, + _wrapped_size: usize, source_group_id: AllocationGroupId, current_group_id: AllocationGroupId, ) { @@ -73,10 +73,7 @@ impl AllocationTracker for MemoryTracker { } let mut guard = GLOBAL_STATS.lock().unwrap(); let stats = guard.entry(group_id).or_default(); - // Track size of wrapped allocation and not total allocation size. The - // tracking_allocator keeps some bookkeeping data in addition to the requested - // allocation, which we don't want to count here. 
- stats.total_bytes_deallocated += wrapped_size as isize; + stats.total_bytes_deallocated += object_size as isize; stats.total_deallocations += 1; } } @@ -163,11 +160,11 @@ fn check_memory_leak() { drop(v); } let stats = tracker.stats(); - assert_eq!(stats.max_bytes_allocated, (1024 * 1024) + 1024 + 16); - assert_eq!(stats.total_bytes_allocated, (1024 * 1024) + 1024 + 16); - assert_eq!(stats.total_bytes_deallocated, (1024 * 1024) + 8); + assert_eq!(stats.max_bytes_allocated, (1024 * 1024) + 1024); + assert_eq!(stats.total_bytes_allocated, (1024 * 1024) + 1024); + assert_eq!(stats.total_bytes_deallocated, (1024 * 1024)); assert_eq!(stats.total_allocations, 2); - assert_eq!(stats.net_bytes_allocated(), 1024 + 8); + assert_eq!(stats.net_bytes_allocated(), 1024); } #[tokio::test] From bdd5b9d1cdaa7f968a7cfbcb9f6c638f7c71dd50 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 6 Nov 2025 11:32:12 -0800 Subject: [PATCH 15/15] just run on linux for now --- rust/lance/tests/resource_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/tests/resource_tests.rs b/rust/lance/tests/resource_tests.rs index 886d93d0b9d..b48ab8e5729 100644 --- a/rust/lance/tests/resource_tests.rs +++ b/rust/lance/tests/resource_tests.rs @@ -3,5 +3,5 @@ // The memory tests don't work currently on MacOS because they rely on thread // local storage in the allocator, which seems to have some issues on MacOS. -#[cfg(not(target_os = "macos"))] +#[cfg(target_os = "linux")] mod resource_test;