Skip to content
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 22 additions & 1 deletion rust/lance-datafusion/src/datagen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@

use std::sync::Arc;

use arrow_array::RecordBatchReader;
use datafusion::{
execution::SendableRecordBatchStream,
physical_plan::{stream::RecordBatchStreamAdapter, ExecutionPlan},
};
use datafusion_common::DataFusionError;
use futures::TryStreamExt;
use lance_datagen::{BatchCount, BatchGeneratorBuilder, RowCount};
use lance_core::Error;
use lance_datagen::{BatchCount, BatchGeneratorBuilder, ByteCount, RoundingBehavior, RowCount};

use crate::exec::OneShotExec;

Expand All @@ -20,6 +22,13 @@ pub trait DatafusionDatagenExt {
num_batches: BatchCount,
) -> SendableRecordBatchStream;

fn into_df_stream_bytes(
self,
batch_size: ByteCount,
num_batches: BatchCount,
rounding_behavior: RoundingBehavior,
) -> Result<SendableRecordBatchStream, Error>;

fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc<dyn ExecutionPlan>;
}

Expand All @@ -34,6 +43,18 @@ impl DatafusionDatagenExt for BatchGeneratorBuilder {
Box::pin(RecordBatchStreamAdapter::new(schema, stream))
}

fn into_df_stream_bytes(
    self,
    batch_size: ByteCount,
    num_batches: BatchCount,
    rounding_behavior: RoundingBehavior,
) -> Result<SendableRecordBatchStream, Error> {
    // Build a synchronous reader whose batches are capped by byte size
    // rather than row count.
    let reader = self.into_reader_bytes(batch_size, num_batches, rounding_behavior)?;
    let schema = reader.schema();
    // Lift the blocking iterator into a stream, mapping Arrow errors into
    // DataFusion errors so the adapter's item type lines up.
    let batches = futures::stream::iter(reader).map_err(DataFusionError::from);
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, batches)))
}

fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc<dyn ExecutionPlan> {
let stream = self.into_df_stream(batch_size, num_batches);
Arc::new(OneShotExec::new(stream))
Expand Down
1 change: 1 addition & 0 deletions rust/lance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ tempfile.workspace = true
test-log.workspace = true
tracing-chrome = "0.7.1"
rstest = { workspace = true }
tracking-allocator = { version = "0.4", features = ["tracing-compat"] }
# For S3 / DynamoDB tests
aws-config = { workspace = true }
aws-sdk-s3 = { workspace = true }
Expand Down
19 changes: 19 additions & 0 deletions rust/lance/tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Tests for memory and IO usage.

## Debugging memory usage

Once you've identified a test that is using too much memory, you can use
bytehound to find the source of the memory usage. (Note: we need to run
bytehound on the binary, not on cargo, so we have to extract the test binary path.)

The `RUST_ALLOC_TIMINGS` environment variable tells the tracking allocator
to log the start and end of each allocation tracking session, which makes it
easier to correlate the bytehound output with the code.

```shell
TEST_BINARY=$(cargo test --test resource_tests --no-run 2>&1 | tail -n1 | sed -n 's/.*(\([^)]*\)).*/\1/p')
LD_PRELOAD=/usr/local/lib/libbytehound.so \
RUST_ALLOC_TIMINGS=true \
$TEST_BINARY resource_test::write::test_insert_memory
bytehound server memory-profiling_*.dat
```
4 changes: 4 additions & 0 deletions rust/lance/tests/resource_test/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
// Allocation-tracking helpers shared by the resource tests.
mod utils;
// Memory-usage tests for the dataset write path.
mod write;
192 changes: 192 additions & 0 deletions rust/lance/tests/resource_test/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
use all_asserts::assert_ge;
use std::alloc::System;
use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Mutex, Once};
use tracing::Instrument;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Registry;
use tracking_allocator::{
AllocationGroupId, AllocationGroupToken, AllocationLayer, AllocationRegistry,
AllocationTracker, Allocator,
};

#[global_allocator]
static GLOBAL: Allocator<System> = Allocator::system();

/// Aggregate allocation statistics collected for a single tracking group.
#[derive(Default, Clone, Debug)]
pub struct AllocStats {
    // High-water mark of net bytes held at any single point in time.
    pub max_bytes_allocated: isize,
    // Cumulative bytes handed out by the allocator.
    pub total_bytes_allocated: isize,
    // Cumulative bytes returned to the allocator.
    pub total_bytes_deallocated: isize,
    // Count of allocation events observed.
    pub total_allocations: usize,
    // Count of deallocation events observed.
    pub total_deallocations: usize,
}

impl AllocStats {
    /// Bytes still outstanding: everything allocated minus everything freed.
    pub fn net_bytes_allocated(&self) -> isize {
        let AllocStats {
            total_bytes_allocated,
            total_bytes_deallocated,
            ..
        } = self;
        total_bytes_allocated - total_bytes_deallocated
    }
}

static GLOBAL_STATS: LazyLock<Arc<Mutex<HashMap<AllocationGroupId, AllocStats>>>> =
std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));

// Callback sink that records every allocation event into `GLOBAL_STATS`,
// keyed by the allocation group it was attributed to.
struct MemoryTracker;

impl AllocationTracker for MemoryTracker {
    fn allocated(
        &self,
        _addr: usize,
        object_size: usize,
        _wrapped_size: usize,
        group_id: AllocationGroupId,
    ) {
        // Allocations attributed to the root group happen outside any tracked
        // span and are intentionally ignored.
        if group_id == AllocationGroupId::ROOT {
            return;
        }
        let mut groups = GLOBAL_STATS.lock().unwrap();
        let entry = groups.entry(group_id).or_default();
        entry.total_allocations += 1;
        entry.total_bytes_allocated += object_size as isize;
        // Refresh the high-water mark now that the net figure may have grown.
        entry.max_bytes_allocated = entry.max_bytes_allocated.max(entry.net_bytes_allocated());
    }

    fn deallocated(
        &self,
        _addr: usize,
        object_size: usize,
        _wrapped_size: usize,
        source_group_id: AllocationGroupId,
        current_group_id: AllocationGroupId,
    ) {
        // Charge the free to the group that originally made the allocation;
        // if that was the root group, fall back to whichever group is active.
        let group_id = if source_group_id == AllocationGroupId::ROOT {
            current_group_id
        } else {
            source_group_id
        };
        // A root-to-root free means nothing tracked is involved; skip it.
        if group_id == AllocationGroupId::ROOT {
            return;
        }
        let mut groups = GLOBAL_STATS.lock().unwrap();
        let entry = groups.entry(group_id).or_default();
        entry.total_deallocations += 1;
        entry.total_bytes_deallocated += object_size as isize;
    }
}

// One-shot guard so the global tracking machinery is installed only once
// per process, no matter how many trackers the tests create.
static INIT: Once = Once::new();

/// Pairs a tracing span with the allocation group registered for it.
///
/// Allocations made while the span is entered (via [`AllocTracker::enter`])
/// are attributed to the group, and the totals can be read back afterwards
/// with [`AllocTracker::stats`].
pub struct AllocTracker {
    group_id: AllocationGroupId,
    span: tracing::Span,
}

impl AllocTracker {
    /// Installs the global tracing subscriber and allocation tracker exactly once.
    pub fn init() {
        INIT.call_once(init_memory_tracking);
    }

    /// Registers a fresh allocation group and ties it to a new INFO-level span.
    pub fn new() -> Self {
        Self::init();

        let token = AllocationGroupToken::register().expect("failed to register token");
        // Capture the id before `attach_to_span`, which takes the token.
        let group_id = token.id();

        let span = tracing::span!(tracing::Level::INFO, "AllocTracker");
        token.attach_to_span(&span);

        Self { group_id, span }
    }

    /// Begins attributing allocations on this thread to this tracker's group.
    pub fn enter(&self) -> AllocGuard<'_> {
        AllocGuard::new(self)
    }

    /// Consumes the tracker and returns (and clears) its accumulated stats.
    pub fn stats(self) -> AllocStats {
        GLOBAL_STATS
            .lock()
            .unwrap()
            .remove(&self.group_id)
            .unwrap_or_default()
    }
}

/// RAII guard that keeps the tracker's span entered for its lifetime, so
/// allocations made while it lives are attributed to the tracker's group.
pub struct AllocGuard<'a> {
    _guard: tracing::span::Entered<'a>,
}

// Emits a timing marker to stderr when RUST_ALLOC_TIMINGS is set, which makes
// it easier to correlate external profiler output with tracked regions.
#[allow(clippy::print_stderr)]
fn log_timing(event: &str) {
    if std::env::var("RUST_ALLOC_TIMINGS").is_ok() {
        eprintln!("alloc:{}:{}", event, chrono::Utc::now().to_rfc3339());
    }
}

impl<'a> AllocGuard<'a> {
    /// Enters the tracker's span, optionally logging the session start.
    pub fn new(tracker: &'a AllocTracker) -> Self {
        log_timing("enter");
        AllocGuard {
            _guard: tracker.span.enter(),
        }
    }
}

impl Drop for AllocGuard<'_> {
    fn drop(&mut self) {
        // Log the session end; the span itself is exited by `Entered`'s Drop.
        log_timing("exit");
    }
}

/// Installs the tracing subscriber (with the allocation layer) and the global
/// allocation tracker, then switches tracking on. Must run at most once per
/// process — callers should go through [`AllocTracker::init`].
pub fn init_memory_tracking() {
    let subscriber = Registry::default().with(AllocationLayer::new());
    tracing::subscriber::set_global_default(subscriber)
        .expect("failed to install tracing subscriber");

    AllocationRegistry::set_global_tracker(MemoryTracker)
        .expect("failed to set global tracker");
    AllocationRegistry::enable_tracking();
}

#[test]
fn check_memory_leak() {
    // Make sure AllocTracker can detect leaks.
    // `leaked` is created before the guard but grown inside it, so its 1 KiB
    // buffer is allocated (and never freed) within the tracked region, while
    // `v`'s 1 MiB buffer is both allocated and freed inside it.
    // NOTE: the exact-count asserts below depend on each container performing
    // exactly one heap allocation; restructuring this test can break them.
    let mut leaked = Vec::new();
    let tracker = AllocTracker::new();
    {
        let _guard = tracker.enter();
        let v = vec![0u8; 1024 * 1024];
        leaked.resize(1024, 0u8);
        drop(v);
    }
    let stats = tracker.stats();
    assert_eq!(stats.max_bytes_allocated, (1024 * 1024) + 1024);
    assert_eq!(stats.total_bytes_allocated, (1024 * 1024) + 1024);
    assert_eq!(stats.total_bytes_deallocated, (1024 * 1024));
    assert_eq!(stats.total_allocations, 2);
    // The leaked kilobyte is exactly what remains outstanding.
    assert_eq!(stats.net_bytes_allocated(), 1024);
}

#[tokio::test]
async fn check_test_spawn_alloc() {
    // Verify that allocations made inside spawned tasks are still attributed
    // to the tracker, provided the tasks inherit the tracker's span via
    // `in_current_span()`.
    let tracker = AllocTracker::new();
    {
        let _guard = tracker.enter();
        let future1 = async {
            let v = vec![0u8; 256 * 1024];
            drop(v);
        };
        let handle = tokio::spawn(future1.in_current_span());
        let future2 = async {
            let v = vec![0u8; 512 * 1024];
            drop(v);
        };
        let handle2 = tokio::spawn(future2.in_current_span());
        handle.await.unwrap();
        handle2.await.unwrap();
    }
    let stats = tracker.stats();
    // 4 = two vector buffers plus one allocation per spawned task; the byte
    // totals use >= because the runtime adds its own small allocations.
    assert_eq!(stats.total_allocations, 4);
    assert_ge!(stats.total_bytes_allocated, 256 * 1024 + 512 * 1024);
    assert_ge!(stats.total_bytes_deallocated, 256 * 1024 + 512 * 1024);
}
45 changes: 45 additions & 0 deletions rust/lance/tests/resource_test/write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
use super::utils::AllocTracker;
use all_asserts::assert_le;
use arrow_schema::DataType;
use lance::dataset::InsertBuilder;
use lance_datafusion::datagen::DatafusionDatagenExt;
use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RoundingBehavior};

#[tokio::test]
async fn test_insert_memory() {
    // Generate ~100MB of data as a stream of ten ~10MB batches.
    let batch_size = 10 * 1024 * 1024; // 10MB
    let num_batches = BatchCount::from(10);
    let data = gen_batch()
        .col("a", array::rand_type(&DataType::Int32))
        .into_df_stream_bytes(
            ByteCount::from(batch_size),
            num_batches,
            RoundingBehavior::RoundDown,
        )
        .unwrap();

    let tracker = AllocTracker::new();
    {
        let _guard = tracker.enter();

        // Write the stream into a dataset in a temporary directory.
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path().to_str().unwrap();
        let _dataset = InsertBuilder::new(tmp_path)
            .execute_stream(data)
            .await
            .unwrap();
    }

    let stats = tracker.stats();
    // Allow for 2x the batch size to account for overheads.
    // The key test is that we don't load all 100MB into memory at once
    assert_le!(
        stats.max_bytes_allocated,
        (batch_size * 2) as isize,
        "Max memory usage exceeded"
    );
}
7 changes: 7 additions & 0 deletions rust/lance/tests/resource_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

// Entry point for the resource-usage (memory/IO) integration tests.
// The memory tests don't work currently on MacOS because they rely on thread
// local storage in the allocator, which seems to have some issues on MacOS.
#[cfg(target_os = "linux")]
mod resource_test;
Loading