diff --git a/Cargo.lock b/Cargo.lock index 9cb8d37275d..af5507e813d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4350,6 +4350,7 @@ dependencies = [ "tracing", "tracing-chrome", "tracing-subscriber", + "tracking-allocator", "url", "uuid", ] @@ -8667,6 +8668,16 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tracking-allocator" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b61e0cb3385e17df7db29c565b40fd0350dfe8a076c7eea83d416e30cfd0581" +dependencies = [ + "tracing", + "tracing-subscriber", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/rust/lance-datafusion/src/datagen.rs b/rust/lance-datafusion/src/datagen.rs index 70b07b9a20b..e3370088a2a 100644 --- a/rust/lance-datafusion/src/datagen.rs +++ b/rust/lance-datafusion/src/datagen.rs @@ -3,13 +3,15 @@ use std::sync::Arc; +use arrow_array::RecordBatchReader; use datafusion::{ execution::SendableRecordBatchStream, physical_plan::{stream::RecordBatchStreamAdapter, ExecutionPlan}, }; use datafusion_common::DataFusionError; use futures::TryStreamExt; -use lance_datagen::{BatchCount, BatchGeneratorBuilder, RowCount}; +use lance_core::Error; +use lance_datagen::{BatchCount, BatchGeneratorBuilder, ByteCount, RoundingBehavior, RowCount}; use crate::exec::OneShotExec; @@ -20,6 +22,13 @@ pub trait DatafusionDatagenExt { num_batches: BatchCount, ) -> SendableRecordBatchStream; + fn into_df_stream_bytes( + self, + batch_size: ByteCount, + num_batches: BatchCount, + rounding_behavior: RoundingBehavior, + ) -> Result; + fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc; } @@ -34,6 +43,18 @@ impl DatafusionDatagenExt for BatchGeneratorBuilder { Box::pin(RecordBatchStreamAdapter::new(schema, stream)) } + fn into_df_stream_bytes( + self, + batch_size: ByteCount, + num_batches: BatchCount, + rounding_behavior: RoundingBehavior, + ) -> Result { + let stream = self.into_reader_bytes(batch_size, num_batches, rounding_behavior)?; + 
let schema = stream.schema(); + let stream = futures::stream::iter(stream).map_err(DataFusionError::from); + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } + fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc<dyn ExecutionPlan> { let stream = self.into_df_stream(batch_size, num_batches); Arc::new(OneShotExec::new(stream)) diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 548f7c33312..a738758532d 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -103,6 +103,7 @@ tempfile.workspace = true test-log.workspace = true tracing-chrome = "0.7.1" rstest = { workspace = true } +tracking-allocator = { version = "0.4", features = ["tracing-compat"] } # For S3 / DynamoDB tests aws-config = { workspace = true } aws-sdk-s3 = { workspace = true } diff --git a/rust/lance/tests/README.md b/rust/lance/tests/README.md new file mode 100644 index 00000000000..396a4254216 --- /dev/null +++ b/rust/lance/tests/README.md @@ -0,0 +1,19 @@ +Tests for memory and IO usage. + +## Debugging memory usage + +Once you've identified a test that is using too much memory, you can use +bytehound to find the source of the memory usage. (Note: we need to run +bytehound on the binary, not on cargo, so we have to extract the test binary path.) + +The `RUST_ALLOC_TIMINGS` environment variable tells the tracking allocator +to log the start and end of each allocation tracking session, which makes it +easier to correlate the bytehound output with the code. 
+ +```shell +TEST_BINARY=$(cargo test --test resource_tests --no-run 2>&1 | tail -n1 | sed -n 's/.*(\([^)]*\)).*/\1/p') +LD_PRELOAD=/usr/local/lib/libbytehound.so \ + RUST_ALLOC_TIMINGS=true \ + $TEST_BINARY resource_test::write::test_insert_memory +bytehound server memory-profiling_*.dat +``` diff --git a/rust/lance/tests/resource_test/mod.rs b/rust/lance/tests/resource_test/mod.rs new file mode 100644 index 00000000000..80ec1ab9d20 --- /dev/null +++ b/rust/lance/tests/resource_test/mod.rs @@ -0,0 +1,4 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors +mod utils; +mod write; diff --git a/rust/lance/tests/resource_test/utils.rs b/rust/lance/tests/resource_test/utils.rs new file mode 100644 index 00000000000..4035cddc806 --- /dev/null +++ b/rust/lance/tests/resource_test/utils.rs @@ -0,0 +1,192 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors +use all_asserts::assert_ge; +use std::alloc::System; +use std::collections::HashMap; +use std::sync::{Arc, LazyLock, Mutex, Once}; +use tracing::Instrument; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::Registry; +use tracking_allocator::{ + AllocationGroupId, AllocationGroupToken, AllocationLayer, AllocationRegistry, + AllocationTracker, Allocator, +}; + +#[global_allocator] +static GLOBAL: Allocator<System> = Allocator::system(); + +#[derive(Default, Clone, Debug)] +pub struct AllocStats { + pub max_bytes_allocated: isize, + pub total_bytes_allocated: isize, + pub total_bytes_deallocated: isize, + pub total_allocations: usize, + pub total_deallocations: usize, +} + +impl AllocStats { + pub fn net_bytes_allocated(&self) -> isize { + self.total_bytes_allocated - self.total_bytes_deallocated + } +} + +static GLOBAL_STATS: LazyLock<Arc<Mutex<HashMap<AllocationGroupId, AllocStats>>>> = + std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new()))); + +struct MemoryTracker; + +impl AllocationTracker for MemoryTracker { + fn allocated( + &self, + _addr:
usize, + object_size: usize, + _wrapped_size: usize, + group_id: AllocationGroupId, + ) { + if group_id == AllocationGroupId::ROOT { + // We don't track root allocations + return; + } + let mut guard = GLOBAL_STATS.lock().unwrap(); + let stats = guard.entry(group_id).or_default(); + stats.total_bytes_allocated += object_size as isize; + stats.total_allocations += 1; + stats.max_bytes_allocated = stats.max_bytes_allocated.max(stats.net_bytes_allocated()); + } + + fn deallocated( + &self, + _addr: usize, + object_size: usize, + _wrapped_size: usize, + source_group_id: AllocationGroupId, + current_group_id: AllocationGroupId, + ) { + let group_id = if source_group_id != AllocationGroupId::ROOT { + source_group_id + } else { + current_group_id + }; + if group_id == AllocationGroupId::ROOT { + // We don't track root allocations + return; + } + let mut guard = GLOBAL_STATS.lock().unwrap(); + let stats = guard.entry(group_id).or_default(); + stats.total_bytes_deallocated += object_size as isize; + stats.total_deallocations += 1; + } +} + +static INIT: Once = Once::new(); + +// The alloc tracker holds a span and an associated allocation group id. 
+pub struct AllocTracker { + group_id: AllocationGroupId, + span: tracing::Span, +} + +impl AllocTracker { + pub fn init() { + INIT.call_once(init_memory_tracking); + } + + pub fn new() -> Self { + Self::init(); + + let token = AllocationGroupToken::register().expect("failed to register token"); + let group_id = token.id(); + + let span = tracing::span!(tracing::Level::INFO, "AllocTracker"); + token.attach_to_span(&span); + + Self { group_id, span } + } + + pub fn enter(&self) -> AllocGuard<'_> { + AllocGuard::new(self) + } + + pub fn stats(self) -> AllocStats { + let mut stats = GLOBAL_STATS.lock().unwrap(); + stats.remove(&self.group_id).unwrap_or_default() + } +} + +pub struct AllocGuard<'a> { + _guard: tracing::span::Entered<'a>, +} + +impl<'a> AllocGuard<'a> { + #[allow(clippy::print_stderr)] + pub fn new(tracker: &'a AllocTracker) -> Self { + if std::env::var("RUST_ALLOC_TIMINGS").is_ok() { + eprintln!("alloc:enter:{}", chrono::Utc::now().to_rfc3339()); + } + AllocGuard { + _guard: tracker.span.enter(), + } + } +} + +impl Drop for AllocGuard<'_> { + #[allow(clippy::print_stderr)] + fn drop(&mut self) { + if std::env::var("RUST_ALLOC_TIMINGS").is_ok() { + eprintln!("alloc:exit:{}", chrono::Utc::now().to_rfc3339()); + } + } +} + +pub fn init_memory_tracking() { + let registry = Registry::default().with(AllocationLayer::new()); + tracing::subscriber::set_global_default(registry) + .expect("failed to install tracing subscriber"); + + let tracker = MemoryTracker; + AllocationRegistry::set_global_tracker(tracker).expect("failed to set global tracker"); + AllocationRegistry::enable_tracking(); +} + +#[test] +fn check_memory_leak() { + // Make sure AllocTracker can detect leaks + let mut leaked = Vec::new(); + let tracker = AllocTracker::new(); + { + let _guard = tracker.enter(); + let v = vec![0u8; 1024 * 1024]; + leaked.resize(1024, 0u8); + drop(v); + } + let stats = tracker.stats(); + assert_eq!(stats.max_bytes_allocated, (1024 * 1024) + 1024); + 
assert_eq!(stats.total_bytes_allocated, (1024 * 1024) + 1024); + assert_eq!(stats.total_bytes_deallocated, (1024 * 1024)); + assert_eq!(stats.total_allocations, 2); + assert_eq!(stats.net_bytes_allocated(), 1024); +} + +#[tokio::test] +async fn check_test_spawn_alloc() { + let tracker = AllocTracker::new(); + { + let _guard = tracker.enter(); + let future1 = async { + let v = vec![0u8; 256 * 1024]; + drop(v); + }; + let handle = tokio::spawn(future1.in_current_span()); + let future2 = async { + let v = vec![0u8; 512 * 1024]; + drop(v); + }; + let handle2 = tokio::spawn(future2.in_current_span()); + handle.await.unwrap(); + handle2.await.unwrap(); + } + let stats = tracker.stats(); + assert_eq!(stats.total_allocations, 4); + assert_ge!(stats.total_bytes_allocated, 256 * 1024 + 512 * 1024); + assert_ge!(stats.total_bytes_deallocated, 256 * 1024 + 512 * 1024); +} diff --git a/rust/lance/tests/resource_test/write.rs b/rust/lance/tests/resource_test/write.rs new file mode 100644 index 00000000000..9618dbc366c --- /dev/null +++ b/rust/lance/tests/resource_test/write.rs @@ -0,0 +1,45 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors +use super::utils::AllocTracker; +use all_asserts::assert_le; +use arrow_schema::DataType; +use lance::dataset::InsertBuilder; +use lance_datafusion::datagen::DatafusionDatagenExt; +use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RoundingBehavior}; + +#[tokio::test] +async fn test_insert_memory() { + // Create a stream of 100MB of data, in batches + let batch_size = 10 * 1024 * 1024; // 10MB + let num_batches = BatchCount::from(10); + let data = gen_batch() + .col("a", array::rand_type(&DataType::Int32)) + .into_df_stream_bytes( + ByteCount::from(batch_size), + num_batches, + RoundingBehavior::RoundDown, + ) + .unwrap(); + + let alloc_tracker = AllocTracker::new(); + { + let _guard = alloc_tracker.enter(); + + // write out to temporary directory + let tmp_dir = 
tempfile::tempdir().unwrap(); + let tmp_path = tmp_dir.path().to_str().unwrap(); + let _dataset = InsertBuilder::new(tmp_path) + .execute_stream(data) + .await + .unwrap(); + } + + let stats = alloc_tracker.stats(); + // Allow for 2x the batch size to account for overheads. + // The key test is that we don't load all 100MB into memory at once + assert_le!( + stats.max_bytes_allocated, + (batch_size * 2) as isize, + "Max memory usage exceeded" + ); +} diff --git a/rust/lance/tests/resource_tests.rs b/rust/lance/tests/resource_tests.rs new file mode 100644 index 00000000000..b48ab8e5729 --- /dev/null +++ b/rust/lance/tests/resource_tests.rs @@ -0,0 +1,7 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +// The memory tests don't work currently on MacOS because they rely on thread +// local storage in the allocator, which seems to have some issues on MacOS. +#[cfg(target_os = "linux")] +mod resource_test;