diff --git a/.github/workflows/ci-benchmarks.yml b/.github/workflows/ci-benchmarks.yml index 3277c7e7f71..ab1da5a121b 100644 --- a/.github/workflows/ci-benchmarks.yml +++ b/.github/workflows/ci-benchmarks.yml @@ -46,15 +46,31 @@ jobs: source venv/bin/activate pip install maturin duckdb requests pytest pytest-benchmark maturin develop --locked --release + - name: Build memtest + run: | + source venv/bin/activate + cd memtest + make build-release - name: Generate datasets run: | - python -m venv venv source venv/bin/activate python python/ci_benchmarks/datagen/gen_all.py - name: Run benchmarks run: | - python -m venv venv source venv/bin/activate bencher run --project weston-lancedb --token ${{ secrets.LANCE_BENCHER_TOKEN }} --adapter python_pytest \ --branch main --testbed google-genoa --err --file results.json "python -mpytest --benchmark-json \ results.json python/ci_benchmarks" + - name: Run IO/memory benchmarks + run: | + source venv/bin/activate + LIB_PATH=$(lance-memtest) + LD_PRELOAD=$LIB_PATH pytest python/ci_benchmarks \ + -k "io_mem_" \ + --benchmark-stats-json io_mem_stats.json + - name: Upload IO/memory stats to bencher + run: | + source venv/bin/activate + bencher run --project weston-lancedb --token ${{ secrets.LANCE_BENCHER_TOKEN }} \ + --adapter json --branch main --testbed google-genoa \ + --err --file io_mem_stats.json diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 0abe4b80f85..6755d38087c 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -118,6 +118,8 @@ jobs: with: args: "--profile ci" - uses: ./.github/workflows/run_tests + with: + memtest: true - name: Upload wheels as artifacts if: ${{ matrix.python-minor-version == '13' }} uses: actions/upload-artifact@v4 diff --git a/.github/workflows/run_tests/action.yml b/.github/workflows/run_tests/action.yml index 14c4b3d6f46..935a5a7e099 100644 --- a/.github/workflows/run_tests/action.yml +++ b/.github/workflows/run_tests/action.yml @@ -9,6 
+9,10 @@ inputs: required: false description: "Skip pytorch tests" default: "false" + memtest: + required: false + description: "Run memtest" + default: "false" runs: using: "composite" steps: @@ -24,6 +28,13 @@ runs: run: | # Install cpu only pytorch pip install torch --index-url https://download.pytorch.org/whl/cpu + - name: Install memtest + working-directory: memtest + if: inputs.memtest == 'true' + shell: bash + run: | + make build-release + echo "LD_PRELOAD=$(lance-memtest)" >> $GITHUB_ENV - name: Run python tests shell: bash working-directory: python diff --git a/memtest/.gitignore b/memtest/.gitignore new file mode 100644 index 00000000000..171315214e2 --- /dev/null +++ b/memtest/.gitignore @@ -0,0 +1,19 @@ +# Rust +target/ +Cargo.lock + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.pytest_cache/ +*.egg-info/ +dist/ +build/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo diff --git a/memtest/Cargo.toml b/memtest/Cargo.toml new file mode 100644 index 00000000000..ef4cd5736ab --- /dev/null +++ b/memtest/Cargo.toml @@ -0,0 +1,23 @@ +[workspace] + +[package] +name = "lance-memtest" +version = "0.1.0" +edition = "2021" +authors = ["Lance Developers"] +description = "Memory allocation testing utilities for Python" +license = "Apache-2.0" + +[lints.clippy] +arithmetic_side_effects = "deny" + +[lib] +name = "memtest" +crate-type = ["cdylib", "rlib"] + +[dependencies] +libc = "0.2" + +[profile.release] +lto = true +codegen-units = 1 diff --git a/memtest/Makefile b/memtest/Makefile new file mode 100644 index 00000000000..071eb6f271d --- /dev/null +++ b/memtest/Makefile @@ -0,0 +1,29 @@ +.PHONY: build test lint format clean + +build: + cargo build + cp target/debug/libmemtest.so python/memtest/ + pip install -e . + +build-release: + cargo build --release + cp target/release/libmemtest.so python/memtest/ + pip install -e . 
+ +test: + LD_PRELOAD=./python/memtest/libmemtest.so pytest python/tests/ -v + +lint: + cargo clippy -- -D warnings + ruff check python/ + +format: + cargo fmt + ruff format python/ + +clean: + cargo clean + rm -rf target/ + find . -type d -name __pycache__ -exec rm -rf {} + + find . -type f -name "*.pyc" -delete + find . -type f -name "*.so" -delete diff --git a/memtest/README.md b/memtest/README.md new file mode 100644 index 00000000000..4d0766d0732 --- /dev/null +++ b/memtest/README.md @@ -0,0 +1,35 @@ +# lance-memtest + +Memory allocation testing utilities for Python test suites. This package provides tools to track memory allocations made by the Python interpreter and any Python libraries during test execution. + +## Usage + +Install with: + +```shell +make build-release +``` + +To activate the memory tracking, you need to set the `LD_PRELOAD` environment variable: + +```shell +export LD_PRELOAD=$(lance-memtest) +``` + +Then you can write Python code that tracks memory allocations: + +```python +import memtest + +def test_memory(): + with memtest.track() as get_stats: + # Your code that allocates memory + data = [0] * 1000000 + + stats = get_stats() + assert stats['peak_bytes'] < 10**7 # Assert peak memory usage +``` + +## How this works + +The library uses dynamic linking to intercept memory allocation calls (like `malloc`, `free`, etc.) made by the Python interpreter and its extensions. It keeps track of the total number of allocations, deallocations, and the peak memory usage during the execution of your code. 
diff --git a/memtest/pyproject.toml b/memtest/pyproject.toml new file mode 100644 index 00000000000..396d7c442e0 --- /dev/null +++ b/memtest/pyproject.toml @@ -0,0 +1,37 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "lance-memtest" +version = "0.1.0" +description = "Memory allocation testing utilities for Python test suites" +readme = "README.md" +requires-python = ">=3.9" +license = { text = "Apache-2.0" } +authors = [ + { name = "Lance Developers" } +] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Rust", +] + +[project.scripts] +lance-memtest = "memtest.__main__:main" + +[tool.setuptools] +packages = ["memtest"] + +[tool.setuptools.package-dir] +memtest = "python/memtest" + +[tool.setuptools.package-data] +memtest = ["*.so", "*.dylib", "*.dll"] diff --git a/memtest/python/memtest/__init__.py b/memtest/python/memtest/__init__.py new file mode 100644 index 00000000000..b5ffcac6537 --- /dev/null +++ b/memtest/python/memtest/__init__.py @@ -0,0 +1,249 @@ +"""Memory allocation testing utilities for Python.""" + +import ctypes +import platform +import warnings +from pathlib import Path +from typing import Dict, Optional +from contextlib import contextmanager + +__version__ = "0.1.0" + +# Platform support check +_SUPPORTED_PLATFORM = platform.system() == "Linux" +if not _SUPPORTED_PLATFORM: + warnings.warn( + f"lance-memtest only supports Linux (current platform: {platform.system()}). 
" + "Memory statistics will not be available.", + RuntimeWarning, + stacklevel=2, + ) + + +class _MemtestStats(ctypes.Structure): + """C struct matching MemtestStats in Rust.""" + + _fields_ = [ + ("total_allocations", ctypes.c_uint64), + ("total_deallocations", ctypes.c_uint64), + ("total_bytes_allocated", ctypes.c_uint64), + ("total_bytes_deallocated", ctypes.c_uint64), + ("current_bytes", ctypes.c_uint64), + ("peak_bytes", ctypes.c_uint64), + ] + + +def _load_library(): + """Load the memtest shared library.""" + if not _SUPPORTED_PLATFORM: + return None, None + + # Find the library relative to this module + module_dir = Path(__file__).parent + + lib_path = module_dir / "libmemtest.so" + if lib_path.exists(): + lib = ctypes.CDLL(str(lib_path)) + + # Define function signatures + lib.memtest_get_stats.argtypes = [ctypes.POINTER(_MemtestStats)] + lib.memtest_get_stats.restype = None + + lib.memtest_reset_stats.argtypes = [] + lib.memtest_reset_stats.restype = None + + return lib, lib_path + + raise RuntimeError("memtest library not found. Run 'make build' to build it.") + + +# Load library at module import +_lib, _lib_path = _load_library() + + +def _empty_stats() -> Dict[str, int]: + """Return empty stats for unsupported platforms.""" + return { + "total_allocations": 0, + "total_deallocations": 0, + "total_bytes_allocated": 0, + "total_bytes_deallocated": 0, + "current_bytes": 0, + "peak_bytes": 0, + } + + +def get_library_path() -> Optional[Path]: + """Get the path to the memtest shared library for use with LD_PRELOAD. + + Returns: + Path to the .so file that can be used with LD_PRELOAD, or None on + unsupported platforms. + + Example: + >>> lib_path = get_library_path() + >>> if lib_path: + ... os.environ['LD_PRELOAD'] = str(lib_path) + """ + return _lib_path + + +def get_stats() -> Dict[str, int]: + """Get current memory allocation statistics. 
+ + Returns: + Dictionary containing: + - total_allocations: Total number of malloc/calloc calls + - total_deallocations: Total number of free calls + - total_bytes_allocated: Total bytes allocated + - total_bytes_deallocated: Total bytes freed + - current_bytes: Current memory usage (allocated - deallocated) + - peak_bytes: Peak memory usage observed + + On unsupported platforms, all values will be 0. + + Example: + >>> stats = get_stats() + >>> print(f"Current memory: {stats['current_bytes']} bytes") + >>> print(f"Peak memory: {stats['peak_bytes']} bytes") + """ + if _lib is None: + return _empty_stats() + + stats = _MemtestStats() + _lib.memtest_get_stats(ctypes.byref(stats)) + + return { + "total_allocations": stats.total_allocations, + "total_deallocations": stats.total_deallocations, + "total_bytes_allocated": stats.total_bytes_allocated, + "total_bytes_deallocated": stats.total_bytes_deallocated, + "current_bytes": stats.current_bytes, + "peak_bytes": stats.peak_bytes, + } + + +def reset_stats() -> None: + """Reset all allocation statistics to zero. + + This is useful for measuring allocations in a specific section of code. + On unsupported platforms, this is a no-op. + + Example: + >>> reset_stats() + >>> # ... run code to measure ... + >>> stats = get_stats() + """ + if _lib is None: + return + _lib.memtest_reset_stats() + + +@contextmanager +def track(reset: bool = True): + """Context manager to track allocations within a code block. + + Args: + reset: Whether to reset statistics before entering the context + + Yields: + A function that returns current statistics + + Example: + >>> with track() as get: + ... data = [0] * 1000 + ... stats = get() + ... print(f"Allocated: {stats['total_bytes_allocated']} bytes") + """ + if reset: + reset_stats() + + yield get_stats + + +def format_bytes(num_bytes: int) -> str: + """Format byte count as human-readable string. 
+ + Args: + num_bytes: Number of bytes + + Returns: + Formatted string (e.g., "1.5 MB") + """ + for unit in ["B", "KB", "MB", "GB", "TB"]: + if abs(num_bytes) < 1024.0: + return f"{num_bytes:.1f} {unit}" + num_bytes /= 1024.0 + return f"{num_bytes:.1f} PB" + + +def print_stats(stats: Optional[Dict[str, int]] = None) -> None: + """Print allocation statistics in a readable format. + + Args: + stats: Statistics dictionary. If None, fetches current stats. + + Example: + >>> print_stats() + Memory Allocation Statistics: + Total allocations: 1,234 + Total deallocations: 1,100 + Total bytes allocated: 128.5 KB + Total bytes freed: 120.0 KB + Current memory usage: 8.5 KB + Peak memory usage: 15.2 KB + """ + if stats is None: + stats = get_stats() + + print("Memory Allocation Statistics:") + print(f" Total allocations: {stats['total_allocations']:,}") + print(f" Total deallocations: {stats['total_deallocations']:,}") + print(f" Total bytes allocated: {format_bytes(stats['total_bytes_allocated'])}") + print(f" Total bytes freed: {format_bytes(stats['total_bytes_deallocated'])}") + print(f" Current memory usage: {format_bytes(stats['current_bytes'])}") + print(f" Peak memory usage: {format_bytes(stats['peak_bytes'])}") + + +def is_preloaded() -> bool: + """Check if libmemtest.so is preloaded and actively tracking allocations. + + Returns: + True if the library is preloaded via LD_PRELOAD, False otherwise. + + Example: + >>> if is_preloaded(): + ... stats = get_stats() + ... print(f"Tracking {stats['total_allocations']} allocations") + """ + import os + + ld_preload = os.environ.get("LD_PRELOAD", "") + return "libmemtest" in ld_preload + + +def is_supported() -> bool: + """Check if memory tracking is supported on this platform. + + Returns: + True if on Linux (the only supported platform), False otherwise. + + Example: + >>> if is_supported(): + ... with track() as get: + ... # ... do work ... + ... 
stats = get() + """ + return _SUPPORTED_PLATFORM + + +__all__ = [ + "get_library_path", + "get_stats", + "reset_stats", + "track", + "format_bytes", + "print_stats", + "is_preloaded", + "is_supported", +] diff --git a/memtest/python/memtest/__main__.py b/memtest/python/memtest/__main__.py new file mode 100644 index 00000000000..4e684e2c01f --- /dev/null +++ b/memtest/python/memtest/__main__.py @@ -0,0 +1,28 @@ +"""CLI for lance-memtest.""" + +import sys +from memtest import get_library_path + + +def main(): + """Main CLI entry point.""" + args = sys.argv[1:] + + if not args or args[0] == "path": + lib_path = get_library_path() + if lib_path is None: + print( + "lance-memtest is not supported on this platform", + file=sys.stderr, + ) + return 1 + print(lib_path) + return 0 + else: + print(f"Unknown command: {args[0]}", file=sys.stderr) + print("Usage: lance-memtest [path]", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/memtest/python/tests/__init__.py b/memtest/python/tests/__init__.py new file mode 100644 index 00000000000..3263fffd5fe --- /dev/null +++ b/memtest/python/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for lance-memtest.""" diff --git a/memtest/python/tests/test_basic.py b/memtest/python/tests/test_basic.py new file mode 100644 index 00000000000..625840b79b7 --- /dev/null +++ b/memtest/python/tests/test_basic.py @@ -0,0 +1,125 @@ +"""Basic tests for memtest functionality.""" + +import subprocess +import sys + +import memtest + + +def test_get_library_path(): + """Test that we can get the library path.""" + lib_path = memtest.get_library_path() + assert lib_path.exists() + assert lib_path.suffix == ".so" + + +def test_get_stats(): + """Test that we can get statistics.""" + stats = memtest.get_stats() + + assert isinstance(stats, dict) + assert "total_allocations" in stats + assert "total_deallocations" in stats + assert "total_bytes_allocated" in stats + assert "total_bytes_deallocated" in stats + assert 
"current_bytes" in stats + assert "peak_bytes" in stats + + # All values should be non-negative integers + for key, value in stats.items(): + assert isinstance(value, int) + assert value >= 0 + + +def test_reset_stats(): + """Test that we can reset statistics.""" + # Get initial stats + _ = memtest.get_stats() + + # Reset + memtest.reset_stats() + + # All stats should be zero after reset + stats = memtest.get_stats() + assert stats["total_allocations"] == 0 + assert stats["total_deallocations"] == 0 + assert stats["total_bytes_allocated"] == 0 + assert stats["total_bytes_deallocated"] == 0 + assert stats["current_bytes"] == 0 + assert stats["peak_bytes"] == 0 + + +def test_track_context_manager(): + """Test the track context manager.""" + with memtest.track() as get_stats: + # Allocate some memory + _ = [0] * 1000 + + # Get stats within the context + stats = get_stats() + + # We should see some allocations + assert stats["total_allocations"] > 0 + assert stats["total_bytes_allocated"] > 0 + + +def test_format_bytes(): + """Test byte formatting.""" + assert "B" in memtest.format_bytes(100) + assert "KB" in memtest.format_bytes(1024) + assert "MB" in memtest.format_bytes(1024 * 1024) + assert "GB" in memtest.format_bytes(1024 * 1024 * 1024) + + +def test_print_stats(): + """Test that print_stats doesn't crash.""" + # This should not raise an exception + memtest.print_stats() + + # Should also work with explicit stats + stats = memtest.get_stats() + memtest.print_stats(stats) + + +def test_allocation_tracking(): + """Test that allocations are actually tracked.""" + memtest.reset_stats() + + initial_stats = memtest.get_stats() + assert initial_stats["total_allocations"] == 0 + + # Allocate a large list + _ = [0] * 10000 + + stats_after = memtest.get_stats() + + # We should see allocations (though the exact number depends on Python internals) + assert stats_after["total_allocations"] > 0 + assert stats_after["total_bytes_allocated"] > 0 + + # Peak should be at least as 
much as current + assert stats_after["peak_bytes"] >= stats_after["current_bytes"] + + +def test_cli_path(): + """Test the CLI path command.""" + result = subprocess.run( + [sys.executable, "-m", "memtest", "path"], + capture_output=True, + text=True, + ) + + assert result.returncode == 0 + assert ".so" in result.stdout + + +def test_cli_stats(): + """Test that an unsupported CLI subcommand fails with an error.""" + result = subprocess.run( + [sys.executable, "-m", "memtest", "stats"], + capture_output=True, + text=True, + ) + + assert result.returncode == 1 + assert "Unknown command" in result.stderr diff --git a/memtest/python/tests/test_integration.py b/memtest/python/tests/test_integration.py new file mode 100644 index 00000000000..f0ab8ac9f90 --- /dev/null +++ b/memtest/python/tests/test_integration.py @@ -0,0 +1,128 @@ +"""Integration tests for memtest with real allocations.""" + +import os +import subprocess +import sys +import tempfile +import pytest + +import memtest + + +def test_preload_environment(): + """Test that LD_PRELOAD works correctly.""" + lib_path = memtest.get_library_path() + + # Create a small Python script that uses memtest + test_script = """ +import memtest + +memtest.reset_stats() + +# Allocate some data +data = [i for i in range(1000)] + +stats = memtest.get_stats() +print(f"Allocations: {stats['total_allocations']}") +print(f"Bytes: {stats['total_bytes_allocated']}") + +assert stats['total_allocations'] > 0, "Should see allocations" +assert stats['total_bytes_allocated'] > 0, "Should see bytes allocated" +""" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: + f.write(test_script) + script_path = f.name + + try: + env = os.environ.copy() + env["LD_PRELOAD"] = str(lib_path) + + result = subprocess.run( + [sys.executable, script_path], + env=env, + capture_output=True, + text=True, + ) + + assert result.returncode == 0, f"Script failed: {result.stderr}" + assert "Allocations:" in result.stdout + assert "Bytes:" in
result.stdout + + finally: + os.unlink(script_path) + + +def test_repeated_allocations(): + """Test tracking repeated allocations and deallocations.""" + memtest.reset_stats() + + # Do several allocation/deallocation cycles + for i in range(10): + data = [0] * 1000 + del data + + stats = memtest.get_stats() + + # Should see multiple allocations + assert stats["total_allocations"] >= 10 + assert stats["total_deallocations"] > 0 + assert stats["total_bytes_allocated"] > 0 + assert stats["total_bytes_deallocated"] > 0 + + +def test_peak_tracking(): + """Test that peak memory usage is tracked correctly.""" + memtest.reset_stats() + + # Allocate progressively larger arrays + arrays = [] + for size in [100, 1000, 10000]: + arrays.append([0] * size) + + stats = memtest.get_stats() + + # Peak should be higher than or equal to current + assert stats["peak_bytes"] >= stats["current_bytes"] + + # Free the arrays + arrays.clear() + + stats_after = memtest.get_stats() + + # Peak should remain the same (doesn't decrease) + assert stats_after["peak_bytes"] == stats["peak_bytes"] + + +def test_with_numpy(): + """Test tracking NumPy allocations if NumPy is available.""" + try: + import numpy as np + except ImportError: + pytest.skip("NumPy not available") + + memtest.reset_stats() + + # Create a large NumPy array + _ = np.zeros((1000, 1000), dtype=np.float64) + + stats = memtest.get_stats() + + # NumPy uses malloc internally, so we should see allocations + assert stats["total_allocations"] > 0 + assert stats["total_bytes_allocated"] > 0 + + +def test_context_manager_integration(): + """Test the context manager with real workload.""" + results = [] + + with memtest.track() as get_stats: + # Allocate in stages and track progress + for i in range(5): + _ = [0] * 1000 + results.append(get_stats()) + + # Each measurement should show increasing allocations + for i in range(1, len(results)): + assert results[i]["total_allocations"] >= results[i - 1]["total_allocations"] diff --git 
a/memtest/src/allocator.rs b/memtest/src/allocator.rs new file mode 100644 index 00000000000..d3c9fca75cc --- /dev/null +++ b/memtest/src/allocator.rs @@ -0,0 +1,229 @@ +use crate::stats::STATS; +use libc::{c_void, size_t}; + +extern "C" { + #[link_name = "__libc_malloc"] + fn libc_malloc(size: size_t) -> *mut c_void; + #[link_name = "__libc_calloc"] + fn libc_calloc(count: size_t, element_size: size_t) -> *mut c_void; + #[link_name = "__libc_realloc"] + fn libc_realloc(ptr: *mut c_void, size: size_t) -> *mut c_void; + #[link_name = "__libc_free"] + fn libc_free(ptr: *mut c_void); + #[link_name = "__libc_memalign"] + fn libc_memalign(alignment: size_t, size: size_t) -> *mut c_void; +} + +// Magic number to identify our allocations +const MAGIC: u64 = 0xDEADBEEF_CAFEBABE; + +/// Header stored before each tracked allocation +#[repr(C)] +struct AllocationHeader { + magic: u64, + size: u64, + alignment: u64, + /// For aligned allocations, stores the actual pointer returned by libc_memalign + /// For unaligned allocations, this is unused (but present for consistent size) + actual_ptr: u64, +} + +const HEADER_SIZE: usize = std::mem::size_of::<AllocationHeader>(); + +/// Check if a pointer was allocated by us +unsafe fn is_ours(virtual_ptr: *mut c_void) -> bool { + if virtual_ptr.is_null() { + return false; + } + let header_ptr = (virtual_ptr as *mut u8).sub(HEADER_SIZE) as *const AllocationHeader; + (*header_ptr).magic == MAGIC +} + +/// Extract size, alignment, and actual pointer from a virtual pointer +unsafe fn extract(virtual_ptr: *mut c_void) -> (usize, usize, *mut c_void) { + let header_ptr = (virtual_ptr as *mut u8).sub(HEADER_SIZE) as *const AllocationHeader; + let header = &*header_ptr; + + let size = header.size as usize; + let alignment = header.alignment as usize; + + let actual_ptr = if alignment > 0 { + // For aligned allocations, the actual pointer is stored in the header + header.actual_ptr as *mut c_void + } else { + // For unaligned allocations, the actual pointer is the
header itself + header_ptr as *mut c_void + }; + + (size, alignment, actual_ptr) +} + +/// Take an allocated pointer and size, store header, and return the adjusted pointer +unsafe fn to_virtual(actual_ptr: *mut c_void, size: usize, alignment: usize) -> *mut c_void { + if actual_ptr.is_null() { + return std::ptr::null_mut(); + } + + if alignment > 0 { + // For aligned allocations: + // 1. Find the first aligned position after we have room for the header + // 2. Store the header just before that position + // 3. Store the actual_ptr in the header so we can free it later + + let actual_addr = actual_ptr as usize; + // Find the first address >= actual_addr + HEADER_SIZE that is aligned + let min_virtual_addr = actual_addr.saturating_add(HEADER_SIZE); + let virtual_addr = (min_virtual_addr.saturating_add(alignment).saturating_sub(1)) + & !(alignment.saturating_sub(1)); + + // Write header just before the aligned virtual address + let header_ptr = (virtual_addr.saturating_sub(HEADER_SIZE)) as *mut AllocationHeader; + *header_ptr = AllocationHeader { + magic: MAGIC, + size: size as u64, + alignment: alignment as u64, + actual_ptr: actual_addr as u64, + }; + + virtual_addr as *mut c_void + } else { + // Unaligned allocation - header is at the start + let header_ptr = actual_ptr as *mut AllocationHeader; + *header_ptr = AllocationHeader { + magic: MAGIC, + size: size as u64, + alignment: 0, + actual_ptr: 0, // Unused for unaligned allocations + }; + (actual_ptr as *mut u8).add(HEADER_SIZE) as *mut c_void + } +} + +#[no_mangle] +pub unsafe extern "C" fn malloc(size: size_t) -> *mut c_void { + STATS.record_allocation(size); + to_virtual(libc_malloc(size.saturating_add(HEADER_SIZE)), size, 0) +} + +#[no_mangle] +pub unsafe extern "C" fn calloc(size: size_t, element_size: size_t) -> *mut c_void { + let Some(total_size) = size.checked_mul(element_size) else { + return std::ptr::null_mut(); + }; + STATS.record_allocation(total_size); + to_virtual( + 
libc_calloc(total_size.saturating_add(HEADER_SIZE), 1), + total_size, + 0, + ) +} + +#[no_mangle] +pub unsafe extern "C" fn free(ptr: *mut c_void) { + if ptr.is_null() { + return; + } + + if is_ours(ptr) { + // It's ours - extract size and track + let (size, _alignment, actual_ptr) = extract(ptr); + STATS.record_deallocation(size); + libc_free(actual_ptr); + } else { + // Not ours - just free it without tracking + libc_free(ptr); + } +} + +#[no_mangle] +pub unsafe extern "C" fn realloc(ptr: *mut c_void, size: size_t) -> *mut c_void { + let (old_size, actual_ptr) = if ptr.is_null() || !is_ours(ptr) { + // Either null or not ours - don't track + if ptr.is_null() { + (0, std::ptr::null_mut()) + } else { + // Not ours - just realloc without tracking + return libc_realloc(ptr, size); + } + } else { + let (s, _align, a) = extract(ptr); + (s, a) + }; + + STATS.record_deallocation(old_size); + STATS.record_allocation(size); + + let new_ptr = libc_realloc(actual_ptr, size.saturating_add(HEADER_SIZE)); + to_virtual(new_ptr, size, 0) +} + +#[no_mangle] +pub unsafe extern "C" fn memalign(alignment: size_t, size: size_t) -> *mut c_void { + STATS.record_allocation(size); + // Allocate extra space for header + padding to maintain alignment + // We need: header (24 bytes) + actual_ptr (8 bytes) + padding to reach alignment + let extra = alignment.saturating_add(HEADER_SIZE).saturating_add(8); + let actual_ptr = libc_memalign(alignment, size.saturating_add(extra)); + to_virtual(actual_ptr, size, alignment) +} + +#[no_mangle] +pub unsafe extern "C" fn posix_memalign( + memptr: *mut *mut c_void, + alignment: size_t, + size: size_t, +) -> i32 { + STATS.record_allocation(size); + let extra = alignment.saturating_add(HEADER_SIZE).saturating_add(8); + let actual_ptr = libc_memalign(alignment, size.saturating_add(extra)); + if actual_ptr.is_null() { + return libc::ENOMEM; + } + *memptr = to_virtual(actual_ptr, size, alignment); + 0 +} + +#[no_mangle] +pub unsafe extern "C" fn 
aligned_alloc(alignment: size_t, size: size_t) -> *mut c_void { + STATS.record_allocation(size); + let extra = alignment.saturating_add(HEADER_SIZE).saturating_add(8); + let actual_ptr = libc_memalign(alignment, size.saturating_add(extra)); + to_virtual(actual_ptr, size, alignment) +} + +#[no_mangle] +pub unsafe extern "C" fn valloc(size: size_t) -> *mut c_void { + STATS.record_allocation(size); + let page_size = libc::sysconf(libc::_SC_PAGESIZE) as size_t; + let extra = page_size.saturating_add(HEADER_SIZE).saturating_add(8); + let actual_ptr = libc_memalign(page_size, size.saturating_add(extra)); + to_virtual(actual_ptr, size, page_size) +} + +#[no_mangle] +pub unsafe extern "C" fn reallocarray( + old_ptr: *mut c_void, + count: size_t, + element_size: size_t, +) -> *mut c_void { + let Some(size) = count.checked_mul(element_size) else { + return std::ptr::null_mut(); + }; + realloc(old_ptr, size) +} + +#[no_mangle] +pub unsafe extern "C" fn malloc_usable_size(ptr: *mut c_void) -> size_t { + if ptr.is_null() { + return 0; + } + + if is_ours(ptr) { + let (size, _, _) = extract(ptr); + size + } else { + // Not our allocation - return 0 as we don't know the size + // (there's no __libc_malloc_usable_size to call) + 0 + } +} diff --git a/memtest/src/lib.rs b/memtest/src/lib.rs new file mode 100644 index 00000000000..4c869864552 --- /dev/null +++ b/memtest/src/lib.rs @@ -0,0 +1,49 @@ +mod allocator; +mod stats; + +use stats::STATS; + +/// C-compatible statistics struct +#[repr(C)] +pub struct MemtestStats { + pub total_allocations: u64, + pub total_deallocations: u64, + pub total_bytes_allocated: u64, + pub total_bytes_deallocated: u64, + pub current_bytes: u64, + pub peak_bytes: u64, +} + +/// Get all statistics in a single call +/// +/// # Safety +/// The `stats` pointer must be valid and properly aligned +#[no_mangle] +pub unsafe extern "C" fn memtest_get_stats(stats: *mut MemtestStats) { + if stats.is_null() { + return; + } + + (*stats).total_allocations = STATS + 
.total_allocations + .load(std::sync::atomic::Ordering::Relaxed); + (*stats).total_deallocations = STATS + .total_deallocations + .load(std::sync::atomic::Ordering::Relaxed); + (*stats).total_bytes_allocated = STATS + .total_bytes_allocated + .load(std::sync::atomic::Ordering::Relaxed); + (*stats).total_bytes_deallocated = STATS + .total_bytes_deallocated + .load(std::sync::atomic::Ordering::Relaxed); + (*stats).current_bytes = STATS + .current_bytes + .load(std::sync::atomic::Ordering::Relaxed); + (*stats).peak_bytes = STATS.peak_bytes.load(std::sync::atomic::Ordering::Relaxed); +} + +/// Reset all statistics to zero +#[no_mangle] +pub extern "C" fn memtest_reset_stats() { + STATS.reset(); +} diff --git a/memtest/src/stats.rs b/memtest/src/stats.rs new file mode 100644 index 00000000000..76c0253e843 --- /dev/null +++ b/memtest/src/stats.rs @@ -0,0 +1,59 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +/// Global allocation statistics tracked using atomic operations for thread safety +pub struct AllocationStats { + pub total_allocations: AtomicU64, + pub total_deallocations: AtomicU64, + pub total_bytes_allocated: AtomicU64, + pub total_bytes_deallocated: AtomicU64, + pub current_bytes: AtomicU64, + pub peak_bytes: AtomicU64, +} + +impl AllocationStats { + pub const fn new() -> Self { + Self { + total_allocations: AtomicU64::new(0), + total_deallocations: AtomicU64::new(0), + total_bytes_allocated: AtomicU64::new(0), + total_bytes_deallocated: AtomicU64::new(0), + current_bytes: AtomicU64::new(0), + peak_bytes: AtomicU64::new(0), + } + } + + pub fn record_allocation(&self, size: usize) { + self.total_allocations.fetch_add(1, Ordering::Relaxed); + self.total_bytes_allocated + .fetch_add(size as u64, Ordering::Relaxed); + + let prev = self.current_bytes.fetch_add(size as u64, Ordering::Relaxed); + let current = prev.saturating_add(size as u64); + self.peak_bytes.fetch_max(current, Ordering::Relaxed); + } + + pub fn record_deallocation(&self, size: usize) { + 
self.total_deallocations.fetch_add(1, Ordering::Relaxed); + self.total_bytes_deallocated + .fetch_add(size as u64, Ordering::Relaxed); + + // Use fetch_update to perform saturating subtraction atomically + self.current_bytes + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { + Some(current.saturating_sub(size as u64)) + }) + .ok(); + } + + pub fn reset(&self) { + self.total_allocations.store(0, Ordering::Relaxed); + self.total_deallocations.store(0, Ordering::Relaxed); + self.total_bytes_allocated.store(0, Ordering::Relaxed); + self.total_bytes_deallocated.store(0, Ordering::Relaxed); + self.current_bytes.store(0, Ordering::Relaxed); + self.peak_bytes.store(0, Ordering::Relaxed); + } +} + +/// Global statistics instance +pub static STATS: AllocationStats = AllocationStats::new(); diff --git a/memtest/tests/integration_test.rs b/memtest/tests/integration_test.rs new file mode 100644 index 00000000000..b83b50cd3d9 --- /dev/null +++ b/memtest/tests/integration_test.rs @@ -0,0 +1,447 @@ +use libc::{c_void, size_t}; +use std::ptr; + +// Import from the library we're testing +use memtest::{memtest_get_stats, memtest_reset_stats, MemtestStats}; + +extern "C" { + fn malloc(size: size_t) -> *mut c_void; + fn calloc(count: size_t, element_size: size_t) -> *mut c_void; + fn realloc(ptr: *mut c_void, size: size_t) -> *mut c_void; + fn free(ptr: *mut c_void); + fn memalign(alignment: size_t, size: size_t) -> *mut c_void; + fn posix_memalign(memptr: *mut *mut c_void, alignment: size_t, size: size_t) -> i32; + fn aligned_alloc(alignment: size_t, size: size_t) -> *mut c_void; +} + +fn get_stats() -> MemtestStats { + let mut stats = MemtestStats { + total_allocations: 0, + total_deallocations: 0, + total_bytes_allocated: 0, + total_bytes_deallocated: 0, + current_bytes: 0, + peak_bytes: 0, + }; + unsafe { + memtest_get_stats(&mut stats as *mut MemtestStats); + } + stats +} + +fn reset_stats() { + memtest_reset_stats(); +} + +#[test] +fn test_malloc_free() { + unsafe 
{ + reset_stats(); + let stats_after_reset = get_stats(); + + let size = 1024; + let ptr = malloc(size); + assert!(!ptr.is_null()); + + let stats_after_alloc = get_stats(); + // Check delta from reset + assert_eq!( + stats_after_alloc + .total_allocations + .saturating_sub(stats_after_reset.total_allocations), + 1 + ); + assert_eq!( + stats_after_alloc + .total_bytes_allocated + .saturating_sub(stats_after_reset.total_bytes_allocated), + size as u64 + ); + + free(ptr); + + let stats_after_free = get_stats(); + assert_eq!( + stats_after_free + .total_deallocations + .saturating_sub(stats_after_reset.total_deallocations), + 1 + ); + assert_eq!( + stats_after_free + .total_bytes_deallocated + .saturating_sub(stats_after_reset.total_bytes_deallocated), + size as u64 + ); + } +} + +#[test] +fn test_calloc_free() { + unsafe { + reset_stats(); + let stats_baseline = get_stats(); + + let count = 10; + let element_size = 100; + let total_size = count * element_size; + + let ptr = calloc(count, element_size); + assert!(!ptr.is_null()); + + // Verify memory is zeroed + let slice = std::slice::from_raw_parts(ptr as *const u8, total_size); + assert!(slice.iter().all(|&b| b == 0)); + + let stats = get_stats(); + assert_eq!( + stats + .total_allocations + .saturating_sub(stats_baseline.total_allocations), + 1 + ); + assert_eq!( + stats + .total_bytes_allocated + .saturating_sub(stats_baseline.total_bytes_allocated), + total_size as u64 + ); + + free(ptr); + + let stats = get_stats(); + assert_eq!( + stats + .total_deallocations + .saturating_sub(stats_baseline.total_deallocations), + 1 + ); + } +} + +#[test] +fn test_realloc() { + reset_stats(); + + unsafe { + // Start with malloc + let ptr1 = malloc(100); + assert!(!ptr1.is_null()); + + let stats = get_stats(); + assert_eq!(stats.total_allocations, 1); + assert_eq!(stats.total_bytes_allocated, 100); + + // Grow the allocation + let ptr2 = realloc(ptr1, 200); + assert!(!ptr2.is_null()); + + let stats = get_stats(); + 
assert_eq!(stats.total_allocations, 2); // realloc counts as new allocation + assert_eq!(stats.total_deallocations, 1); // old allocation freed + assert_eq!(stats.total_bytes_allocated, 300); // 100 + 200 + assert_eq!(stats.total_bytes_deallocated, 100); + assert_eq!(stats.current_bytes, 200); + + // Shrink the allocation + let ptr3 = realloc(ptr2, 50); + assert!(!ptr3.is_null()); + + let stats = get_stats(); + assert_eq!(stats.total_allocations, 3); + assert_eq!(stats.total_deallocations, 2); + assert_eq!(stats.current_bytes, 50); + + free(ptr3); + + let stats = get_stats(); + assert_eq!(stats.current_bytes, 0); + } +} + +#[test] +fn test_realloc_null_is_malloc() { + reset_stats(); + + unsafe { + // realloc(NULL, size) should behave like malloc + let ptr = realloc(ptr::null_mut(), 100); + assert!(!ptr.is_null()); + + let stats = get_stats(); + assert_eq!(stats.total_allocations, 1); + assert_eq!(stats.total_bytes_allocated, 100); + + free(ptr); + } +} + +#[test] +fn test_peak_tracking() { + unsafe { + reset_stats(); + let stats_baseline = get_stats(); + + let ptr1 = malloc(1000); + let ptr2 = malloc(500); + let ptr3 = malloc(2000); + + let stats = get_stats(); + let current_bytes = stats + .current_bytes + .saturating_sub(stats_baseline.current_bytes); + let peak_bytes = stats.peak_bytes.saturating_sub(stats_baseline.peak_bytes); + assert_eq!(current_bytes, 3500); + assert_eq!(peak_bytes, 3500); + + free(ptr3); + + let stats = get_stats(); + let current_bytes = stats + .current_bytes + .saturating_sub(stats_baseline.current_bytes); + let peak_bytes = stats.peak_bytes.saturating_sub(stats_baseline.peak_bytes); + assert_eq!(current_bytes, 1500); + assert_eq!(peak_bytes, 3500); // Peak should remain + + let ptr4 = malloc(1000); + + let stats = get_stats(); + let current_bytes = stats + .current_bytes + .saturating_sub(stats_baseline.current_bytes); + let peak_bytes = stats.peak_bytes.saturating_sub(stats_baseline.peak_bytes); + assert_eq!(current_bytes, 2500); + 
assert_eq!(peak_bytes, 3500); // Still the peak + + free(ptr1); + free(ptr2); + free(ptr4); + } +} + +#[test] +fn test_memalign() { + unsafe { + reset_stats(); + let stats_baseline = get_stats(); + + let alignment = 128; + let size = 1024; + + let ptr = memalign(alignment, size); + assert!(!ptr.is_null()); + + // Verify alignment + assert_eq!(ptr as usize % alignment, 0); + + let stats = get_stats(); + assert_eq!( + stats + .total_allocations + .saturating_sub(stats_baseline.total_allocations), + 1 + ); + assert_eq!( + stats + .total_bytes_allocated + .saturating_sub(stats_baseline.total_bytes_allocated), + size as u64 + ); + + free(ptr); + + let stats = get_stats(); + assert_eq!( + stats + .total_deallocations + .saturating_sub(stats_baseline.total_deallocations), + 1 + ); + } +} + +#[test] +fn test_posix_memalign() { + unsafe { + reset_stats(); + let stats_baseline = get_stats(); + + let alignment = 256; + let size = 2048; + let mut ptr: *mut c_void = ptr::null_mut(); + + let ret = posix_memalign(&mut ptr as *mut *mut c_void, alignment, size); + assert_eq!(ret, 0); + assert!(!ptr.is_null()); + + // Verify alignment + assert_eq!(ptr as usize % alignment, 0); + + let stats = get_stats(); + assert_eq!( + stats + .total_allocations + .saturating_sub(stats_baseline.total_allocations), + 1 + ); + assert_eq!( + stats + .total_bytes_allocated + .saturating_sub(stats_baseline.total_bytes_allocated), + size as u64 + ); + + free(ptr); + } +} + +#[test] +fn test_aligned_alloc() { + reset_stats(); + + unsafe { + let alignment = 64; + let size = 512; + + let ptr = aligned_alloc(alignment, size); + assert!(!ptr.is_null()); + + // Verify alignment + assert_eq!(ptr as usize % alignment, 0); + + let stats = get_stats(); + assert_eq!(stats.total_allocations, 1); + assert_eq!(stats.total_bytes_allocated, size as u64); + + free(ptr); + } +} + +#[test] +fn test_large_alignment() { + reset_stats(); + + unsafe { + // Test with page-sized alignment (4096 bytes) + let alignment = 4096; + 
let size = 8192; + + let ptr = memalign(alignment, size); + assert!(!ptr.is_null()); + assert_eq!(ptr as usize % alignment, 0); + + // Write to the memory to ensure it's actually usable + let slice = std::slice::from_raw_parts_mut(ptr as *mut u8, size); + slice[0] = 42; + slice[size - 1] = 43; + assert_eq!(slice[0], 42); + assert_eq!(slice[size - 1], 43); + + let stats = get_stats(); + assert_eq!(stats.total_allocations, 1); + assert_eq!(stats.total_bytes_allocated, size as u64); + + free(ptr); + + let stats = get_stats(); + assert_eq!(stats.current_bytes, 0); + } +} + +#[test] +fn test_mixed_aligned_unaligned() { + reset_stats(); + + unsafe { + let ptr1 = malloc(1000); // Unaligned + let ptr2 = memalign(128, 2000); // Aligned + let ptr3 = malloc(500); // Unaligned + let ptr4 = aligned_alloc(64, 1500); // Aligned + + let stats = get_stats(); + assert_eq!(stats.total_allocations, 4); + assert_eq!(stats.total_bytes_allocated, 5000); + assert_eq!(stats.current_bytes, 5000); + + // Verify alignments + assert_eq!(ptr2 as usize % 128, 0); + assert_eq!(ptr4 as usize % 64, 0); + + free(ptr1); + free(ptr2); + free(ptr3); + free(ptr4); + + let stats = get_stats(); + assert_eq!(stats.total_deallocations, 4); + assert_eq!(stats.current_bytes, 0); + } +} + +#[test] +fn test_free_null() { + reset_stats(); + + unsafe { + // Freeing null should not crash or affect stats + free(ptr::null_mut()); + + let stats = get_stats(); + assert_eq!(stats.total_deallocations, 0); + } +} + +#[test] +fn test_reset_stats() { + unsafe { + let ptr1 = malloc(1000); + let ptr2 = malloc(2000); + + let stats = get_stats(); + assert!(stats.total_allocations > 0); + assert!(stats.total_bytes_allocated > 0); + + reset_stats(); + + let stats = get_stats(); + assert_eq!(stats.total_allocations, 0); + assert_eq!(stats.total_deallocations, 0); + assert_eq!(stats.total_bytes_allocated, 0); + assert_eq!(stats.total_bytes_deallocated, 0); + assert_eq!(stats.current_bytes, 0); + assert_eq!(stats.peak_bytes, 0); + 
+ // Clean up (stats won't count these since we reset) + free(ptr1); + free(ptr2); + } +} + +#[test] +fn test_alignment_with_write() { + reset_stats(); + + unsafe { + // Test that aligned allocations are actually writable + let alignment = 256; + let size = 1024; + + let ptr = memalign(alignment, size); + assert!(!ptr.is_null()); + assert_eq!(ptr as usize % alignment, 0); + + // Write pattern to memory + let slice = std::slice::from_raw_parts_mut(ptr as *mut u8, size); + for (i, byte) in slice.iter_mut().enumerate() { + *byte = (i % 256) as u8; + } + + // Verify pattern + for (i, byte) in slice.iter().enumerate() { + assert_eq!(*byte, (i % 256) as u8); + } + + free(ptr); + } +} diff --git a/python/python/ci_benchmarks/README.md b/python/python/ci_benchmarks/README.md new file mode 100644 index 00000000000..0245d29166f --- /dev/null +++ b/python/python/ci_benchmarks/README.md @@ -0,0 +1,114 @@ +# CI Benchmarks + +This directory contains benchmarks that run in CI and report results to [bencher.dev](https://bencher.dev). + +## Structure + +``` +ci_benchmarks/ +├── benchmarks/ # Benchmark tests +│ ├── test_scan.py +│ ├── test_search.py +│ └── test_random_access.py +├── datagen/ # Dataset generation scripts +│ ├── gen_all.py # Generate all datasets +│ ├── basic.py # 10M row dataset +│ └── lineitems.py # TPC-H lineitem dataset +├── benchmark.py # IO/memory benchmark infrastructure +├── conftest.py # Pytest configuration +└── datasets.py # Dataset URI resolver (local vs GCS) +``` + +## Running Benchmarks Locally + +### 1. Generate test datasets + +```bash +python python/ci_benchmarks/datagen/gen_all.py +``` + +This creates datasets in `~/lance-benchmarks-ci-datasets/`. + +### 2. 
Run pytest-benchmark tests + +```bash +pytest python/ci_benchmarks/ --benchmark-only +``` + +To save timing results as JSON: + +```bash +pytest python/ci_benchmarks/ --benchmark-json results.json +``` + +## IO/Memory Benchmarks + +The `io_memory_benchmark` marker provides benchmarks that track both IO statistics +and memory allocations during the benchmark execution (not setup/teardown). + +### Writing IO/Memory Benchmarks + +```python +@pytest.mark.io_memory_benchmark() +def test_full_scan(io_mem_benchmark): + dataset_uri = get_dataset_uri("basic") + ds = lance.dataset(dataset_uri) + + def bench(dataset): + dataset.to_table() + + io_mem_benchmark(bench, ds) +``` + +The `io_mem_benchmark` fixture: +- Runs an optional warmup iteration (not measured) +- Tracks IO stats via `dataset.io_stats_incremental()` +- Optionally tracks memory via `lance-memtest` if preloaded + +### Running IO/Memory Benchmarks + +Without memory tracking: +```bash +pytest python/ci_benchmarks/benchmarks/test_search.py::test_io_mem_basic_btree_search -v +``` + +With memory tracking (Linux only): +```bash +LD_PRELOAD=$(lance-memtest) pytest python/ci_benchmarks/benchmarks/test_search.py::test_io_mem_basic_btree_search -v +``` + +### Output + +Terminal output shows a summary table: +``` +======================== IO/Memory Benchmark Statistics ======================== +Test Peak Mem Allocs Read IOPS Read Bytes +--------------------------------------------------------------------------------------- +test_io_mem_basic_btree_search[...] 3.6 MB 135,387 2 1.8 MB +``` + +To save results as JSON (Bencher Metric Format): +```bash +pytest ... --benchmark-stats-json stats.json +``` + +## Investigating memory use for a particular benchmark + +To investigate memory use for a particular benchmark, you can use the `bytehound` library. 
+After installing it, you can run a benchmark with memory profiling enabled: + +```shell +LD_PRELOAD=/usr/local/lib/libbytehound.so \ + pytest 'python/ci_benchmarks/benchmarks/test_search.py::test_io_mem_basic_btree_search[small_strings-equal]' -v +``` + +Then use the `bytehound` server to visualize the memory profiling data: + +```shell +bytehound server memory-profiling_*.dat +``` + +You can use time filters on the allocations view to see memory allocations at a specific point in time, +which can help you filter out allocations from setup. Once you have filters in place, you can use +the Flamegraph view (available from the menu in the upper right corner) to get a flamegraph of the +memory allocations in that time range. diff --git a/python/python/ci_benchmarks/benchmark.py b/python/python/ci_benchmarks/benchmark.py new file mode 100644 index 00000000000..e735b8e382f --- /dev/null +++ b/python/python/ci_benchmarks/benchmark.py @@ -0,0 +1,278 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +""" +Custom benchmark infrastructure for tracking IO and memory stats. 
+
+This module provides an `io_memory_benchmark` marker and fixture that tracks:
+- Peak memory usage
+- Total allocations
+- Read IOPS and bytes
+- Write IOPS and bytes
+
+Usage:
+    @pytest.mark.io_memory_benchmark()
+    def test_something(io_mem_benchmark):
+        def workload(dataset):
+            dataset.to_table()
+        io_mem_benchmark(workload, dataset)
+"""
+
+import json
+from dataclasses import dataclass
+from typing import Any, Callable, List
+
+import pytest
+
+# Try to import memtest, but don't fail if not available
+try:
+    import memtest
+
+    MEMTEST_AVAILABLE = memtest.is_preloaded()
+except ImportError:
+    MEMTEST_AVAILABLE = False
+
+
+@dataclass
+class BenchmarkStats:
+    """Statistics collected during a benchmark run."""
+
+    # Memory stats (only populated if memtest is preloaded)
+    peak_bytes: int = 0
+    total_allocations: int = 0
+
+    # IO stats
+    read_iops: int = 0
+    read_bytes: int = 0
+    write_iops: int = 0
+    write_bytes: int = 0
+
+
+@dataclass
+class BenchmarkResult:
+    """Result of a single benchmark test."""
+
+    name: str
+    stats: BenchmarkStats
+
+
+# Global storage for benchmark results
+_benchmark_results: List[BenchmarkResult] = []
+
+
+def _format_bytes(num_bytes: int) -> str:
+    """Format byte count as human-readable string."""
+    for unit in ["B", "KB", "MB", "GB", "TB"]:
+        if abs(num_bytes) < 1024.0:
+            return f"{num_bytes:.1f} {unit}"
+        num_bytes /= 1024.0
+    return f"{num_bytes:.1f} PB"
+
+
+class IOMemoryBenchmark:
+    """Benchmark fixture that tracks IO and memory during execution."""
+
+    def __init__(self, test_name: str):
+        self._test_name = test_name
+        self._stats = BenchmarkStats()
+
+    def __call__(
+        self,
+        func: Callable,
+        dataset: Any,
+        warmup: bool = True,
+    ) -> Any:
+        """
+        Run a benchmark function with IO and memory tracking.
+
+        Parameters
+        ----------
+        func : Callable
+            The function to benchmark. Should accept a dataset as first argument.
+        dataset : lance.LanceDataset
+            The dataset to pass to the function.
+ warmup : bool, default True + Whether to run a warmup iteration before measuring. + + Returns + ------- + Any + The return value of the benchmark function. + """ + # Warmup run (not measured) + if warmup: + func(dataset) + + # Reset IO stats before the measured run + dataset.io_stats_incremental() + + # Run with memory tracking if available + if MEMTEST_AVAILABLE: + memtest.reset_stats() + result = func(dataset) + mem_stats = memtest.get_stats() + self._stats.peak_bytes = mem_stats["peak_bytes"] + self._stats.total_allocations = mem_stats["total_allocations"] + else: + result = func(dataset) + + # Capture IO stats + io_stats = dataset.io_stats_incremental() + self._stats.read_iops = io_stats.read_iops + self._stats.read_bytes = io_stats.read_bytes + self._stats.write_iops = io_stats.write_iops + self._stats.write_bytes = io_stats.written_bytes + + return result + + def get_stats(self) -> BenchmarkStats: + """Get the collected statistics.""" + return self._stats + + +@pytest.fixture +def io_mem_benchmark(request): + """ + Fixture that provides IO and memory benchmarking. + + Only active for tests marked with @pytest.mark.io_memory_benchmark(). + For other tests, returns a no-op benchmark that just calls the function. 
+ + Usage: + @pytest.mark.io_memory_benchmark() + def test_something(io_mem_benchmark): + def workload(dataset): + dataset.to_table() + io_mem_benchmark(workload, dataset) + """ + marker = request.node.get_closest_marker("io_memory_benchmark") + + if marker is None: + # Not an io_memory_benchmark test, return a simple passthrough + class PassthroughBenchmark: + def __call__(self, func, dataset, warmup=True): + return func(dataset) + + yield PassthroughBenchmark() + return + + test_name = request.node.name + tracker = IOMemoryBenchmark(test_name) + + yield tracker + + # Store results after test completes + stats = tracker.get_stats() + _benchmark_results.append(BenchmarkResult(name=test_name, stats=stats)) + + +def pytest_configure(config): + """Register the io_memory_benchmark marker.""" + config.addinivalue_line( + "markers", + "io_memory_benchmark(): Mark test as an IO/memory benchmark", + ) + + +def pytest_addoption(parser): + """Add command-line options for benchmark output.""" + group = parser.getgroup("io_memory_benchmark", "IO/memory benchmark options") + group.addoption( + "--benchmark-stats-json", + action="store", + default=None, + metavar="PATH", + help="Output path for benchmark stats JSON in Bencher Metric Format (BMF)", + ) + + +def pytest_terminal_summary(terminalreporter, exitstatus, config): + """Print benchmark statistics summary at the end of the test run.""" + if not _benchmark_results: + return + + terminalreporter.write_sep("=", "IO/Memory Benchmark Statistics") + + # Calculate column widths + name_width = max(len(r.name) for r in _benchmark_results) + name_width = max(name_width, len("Test")) + + # Header + if MEMTEST_AVAILABLE: + terminalreporter.write_line( + f"{'Test':<{name_width}} {'Peak Mem':>10} {'Allocs':>10} " + f"{'Read IOPS':>10} {'Read Bytes':>12} " + f"{'Write IOPS':>10} {'Write Bytes':>12}" + ) + terminalreporter.write_line("-" * (name_width + 72)) + else: + terminalreporter.write_line( + f"{'Test':<{name_width}} " + f"{'Read 
IOPS':>10} {'Read Bytes':>12} " + f"{'Write IOPS':>10} {'Write Bytes':>12}" + ) + terminalreporter.write_line("-" * (name_width + 50)) + + # Results sorted by read bytes (descending) + sorted_results = sorted( + _benchmark_results, key=lambda r: r.stats.read_bytes, reverse=True + ) + + for result in sorted_results: + s = result.stats + if MEMTEST_AVAILABLE: + terminalreporter.write_line( + f"{result.name:<{name_width}} " + f"{_format_bytes(s.peak_bytes):>10} " + f"{s.total_allocations:>10,} " + f"{s.read_iops:>10,} " + f"{_format_bytes(s.read_bytes):>12} " + f"{s.write_iops:>10,} " + f"{_format_bytes(s.write_bytes):>12}" + ) + else: + terminalreporter.write_line( + f"{result.name:<{name_width}} " + f"{s.read_iops:>10,} " + f"{_format_bytes(s.read_bytes):>12} " + f"{s.write_iops:>10,} " + f"{_format_bytes(s.write_bytes):>12}" + ) + + if not MEMTEST_AVAILABLE: + terminalreporter.write_line("") + terminalreporter.write_line( + "Note: Memory tracking not available. " + "Run with LD_PRELOAD=$(lance-memtest) to enable." 
+ ) + + terminalreporter.write_line("") + + +def pytest_sessionfinish(session, exitstatus): + """Write benchmark results to JSON file if --benchmark-stats-json was specified.""" + if not _benchmark_results: + return + + output_path = session.config.getoption("--benchmark-stats-json") + if not output_path: + return + + # Convert to Bencher Metric Format (BMF) + bmf_output = {} + for result in _benchmark_results: + s = result.stats + bmf_output[result.name] = { + "read_iops": {"value": s.read_iops}, + "read_bytes": {"value": s.read_bytes}, + "write_iops": {"value": s.write_iops}, + "write_bytes": {"value": s.write_bytes}, + } + if MEMTEST_AVAILABLE: + bmf_output[result.name]["peak_memory_bytes"] = {"value": s.peak_bytes} + bmf_output[result.name]["total_allocations"] = { + "value": s.total_allocations + } + + with open(output_path, "w") as f: + json.dump(bmf_output, f, indent=2) diff --git a/python/python/ci_benchmarks/benchmarks/test_search.py b/python/python/ci_benchmarks/benchmarks/test_search.py index 7f0eb2f84b3..cf23de5324d 100644 --- a/python/python/ci_benchmarks/benchmarks/test_search.py +++ b/python/python/ci_benchmarks/benchmarks/test_search.py @@ -1,7 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright The Lance Authors -import re import lance import pytest @@ -201,22 +200,10 @@ def test_basic_bitmap_search( do_basic_search(benchmark, filt, payload, use_cache) -IOPS = 0.0 - - -def set_iops(iops: float): - global IOPS - IOPS = iops - - -def iops_timer(): - return IOPS - - -@pytest.mark.benchmark(warmup=False, timer=iops_timer) +@pytest.mark.io_memory_benchmark() @pytest.mark.parametrize("filt", BASIC_BTREE_FILTERS, ids=BASIC_BTREE_FILTER_LABELS) @pytest.mark.parametrize("payload", ["small_strings", "integers"]) -def test_iops_basic_btree_search(benchmark, filt: str | None, payload: str): +def test_io_mem_basic_btree_search(io_mem_benchmark, filt: str | None, payload: str): dataset_uri = get_dataset_uri("basic") ds = 
lance.dataset(dataset_uri) @@ -224,23 +211,12 @@ def test_iops_basic_btree_search(benchmark, filt: str | None, payload: str): if payload is not None: columns = [payload] - def bench(): - plan = ds.scanner( + def bench(dataset): + dataset.to_table( columns=columns, filter=filt, with_row_id=True, batch_size=32 * 1024, - ).analyze_plan() - iops = re.search(r"iops=(\d+)", plan) - if iops is not None: - set_iops(float(iops.group(1))) - else: - set_iops(0.0) - - def clear_timer(): - set_iops(0.0) + ) - # We still do a warmup since caching may reduce IOPS and not just latency - benchmark.pedantic( - bench, warmup_rounds=1, rounds=1, iterations=1, setup=clear_timer - ) + io_mem_benchmark(bench, ds) diff --git a/python/python/ci_benchmarks/conftest.py b/python/python/ci_benchmarks/conftest.py new file mode 100644 index 00000000000..7ea42b773bb --- /dev/null +++ b/python/python/ci_benchmarks/conftest.py @@ -0,0 +1,5 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +# Import the benchmark plugin to register hooks and fixtures +pytest_plugins = ["ci_benchmarks.benchmark"] diff --git a/python/python/ci_benchmarks/datagen/lineitems.py b/python/python/ci_benchmarks/datagen/lineitems.py index b91c1c3b422..b79e37eda1c 100644 --- a/python/python/ci_benchmarks/datagen/lineitems.py +++ b/python/python/ci_benchmarks/datagen/lineitems.py @@ -9,7 +9,7 @@ from ci_benchmarks.datasets import get_dataset_uri -NUM_ROWS = 59986052 +NUM_ROWS = 59_986_052 def _gen_data(scale_factor: int): diff --git a/python/python/tests/test_memory.py b/python/python/tests/test_memory.py new file mode 100644 index 00000000000..39485c13f35 --- /dev/null +++ b/python/python/tests/test_memory.py @@ -0,0 +1,36 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +from pathlib import Path + +import lance +import pyarrow as pa +import pytest + +memtest = pytest.importorskip( + "memtest", reason="memtest is not available. 
Please install from ../memtest" +) + + +def test_insert_memory(tmp_path: Path): + def batch_generator(): + # 5MB batches -> 100MB total + for _ in range(20): + yield pa.RecordBatch.from_arrays( + [pa.array([b"x" * 1024 * 1024] * 5)], names=["data"] + ) + + reader = pa.RecordBatchReader.from_batches( + schema=pa.schema([("data", pa.binary())]), + batches=batch_generator(), + ) + + with memtest.track() as get_stats: + lance.write_dataset( + reader, + tmp_path / "test.lance", + ) + stats = get_stats() + + assert stats["peak_bytes"] >= 5 * 1024 * 1024 + assert stats["peak_bytes"] < 30 * 1024 * 1024