diff --git a/rust/lance-table/Cargo.toml b/rust/lance-table/Cargo.toml index 8f44e75f364..c9cfc828f70 100644 --- a/rust/lance-table/Cargo.toml +++ b/rust/lance-table/Cargo.toml @@ -72,5 +72,9 @@ features = ["protoc"] name = "row_id_index" harness = false +[[bench]] +name = "manifest_intern" +harness = false + [lints] workspace = true diff --git a/rust/lance-table/benches/manifest_intern.rs b/rust/lance-table/benches/manifest_intern.rs new file mode 100644 index 00000000000..09ba9264b21 --- /dev/null +++ b/rust/lance-table/benches/manifest_intern.rs @@ -0,0 +1,261 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +// Benchmarks use eprintln! to report memory stats alongside criterion output. +#![allow(clippy::print_stderr)] + +//! Benchmark for manifest fragment interning. +//! +//! Measures memory savings and deserialization throughput when interning +//! `DataFile.fields`, `DataFile.column_indices`, and +//! `RowDatasetVersionMeta::Inline` bytes across many fragments. + +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use deepsize::DeepSizeOf; +use prost::Message; + +use lance_table::format::pb; +use lance_table::format::{DataFileFieldInterner, Fragment}; + +fn num_fragments() -> u64 { + std::env::var("BENCH_NUM_FRAGMENTS") + .map(|s| s.parse().unwrap()) + .unwrap_or(100_000) +} + +/// Build a vector of protobuf DataFragment messages that simulate a +/// homogeneous, post-compaction table: every fragment has the same field +/// list, column indices, and version metadata bytes. +fn make_uniform_pb_fragments(n: u64, num_fields: usize) -> Vec { + let fields: Vec = (0..num_fields as i32).collect(); + let column_indices: Vec = (0..num_fields as i32).collect(); + + // Simulate version metadata: a small protobuf-encoded payload + // (identical across all fragments post-compaction) + let version_bytes: Vec = { + let seq = pb::RowDatasetVersionSequence { + runs: vec![pb::RowDatasetVersionRun { + span: Some(pb::U64Segment { + segment: Some(pb::u64_segment::Segment::Range(pb::u64_segment::Range { + start: 0, + end: 1000, + })), + }), + version: 42, + }], + }; + seq.encode_to_vec() + }; + + (0..n) + .map(|i| pb::DataFragment { + id: i, + files: vec![pb::DataFile { + path: format!("data/{i}.lance"), + fields: fields.clone(), + column_indices: column_indices.clone(), + file_major_version: 2, + file_minor_version: 0, + file_size_bytes: 0, + base_id: None, + }], + deletion_file: None, + row_id_sequence: None, + physical_rows: 1000, + last_updated_at_version_sequence: Some( + pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions( + version_bytes.clone(), + ), + ), + created_at_version_sequence: Some( + pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions( + version_bytes.clone(), + ), + ), + }) + .collect() +} + +/// Deserialize protobuf fragments WITHOUT interning (baseline). +fn deserialize_without_interning(protos: &[pb::DataFragment]) -> Vec { + protos + .iter() + .map(|p| Fragment::try_from(p.clone()).unwrap()) + .collect() +} + +/// Deserialize protobuf fragments WITH interning. +fn deserialize_with_interning(protos: &[pb::DataFragment]) -> Vec { + let mut interner = DataFileFieldInterner::default(); + protos + .iter() + .map(|p| interner.intern_fragment(p.clone()).unwrap()) + .collect() +} + +/// Build fragments where each group shares the same version metadata, +/// simulating many small appends without compaction. +fn make_diverse_pb_fragments( + n: u64, + num_fields: usize, + unique_versions: u64, +) -> Vec { + let fields: Vec = (0..num_fields as i32).collect(); + let column_indices: Vec = (0..num_fields as i32).collect(); + let group_size = n / unique_versions; + + let version_payloads: Vec> = (0..unique_versions) + .map(|v| { + let seq = pb::RowDatasetVersionSequence { + runs: vec![pb::RowDatasetVersionRun { + span: Some(pb::U64Segment { + segment: Some(pb::u64_segment::Segment::Range(pb::u64_segment::Range { + start: 0, + end: 1000, + })), + }), + version: v, + }], + }; + seq.encode_to_vec() + }) + .collect(); + + (0..n) + .map(|i| { + let version_idx = (i / group_size).min(unique_versions - 1) as usize; + pb::DataFragment { + id: i, + files: vec![pb::DataFile { + path: format!("data/{i}.lance"), + fields: fields.clone(), + column_indices: column_indices.clone(), + file_major_version: 2, + file_minor_version: 0, + file_size_bytes: 0, + base_id: None, + }], + deletion_file: None, + row_id_sequence: None, + physical_rows: 1000, + last_updated_at_version_sequence: Some( + pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions( + version_payloads[version_idx].clone(), + ), + ), + created_at_version_sequence: Some( + pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions( + version_payloads[version_idx].clone(), + ), + ), + } + }) + .collect() +} + +fn bench_deserialization(c: &mut Criterion) { + let mut group = c.benchmark_group("manifest_intern"); + let n = num_fragments(); + + for num_fields in [10, 50] { + let protos = make_uniform_pb_fragments(n, num_fields); + + group.bench_with_input( + BenchmarkId::new("deserialize_no_intern", num_fields), + &num_fields, + |b, _| { + b.iter(|| deserialize_without_interning(&protos)); + }, + ); + + group.bench_with_input( + BenchmarkId::new("deserialize_with_intern", num_fields), + &num_fields, + |b, _| { + b.iter(|| deserialize_with_interning(&protos)); + }, + ); + } + + // Benchmark with many unique version payloads + for unique_versions in [10, 100, 500] { + let protos = make_diverse_pb_fragments(n, 10, unique_versions); + + group.bench_with_input( + BenchmarkId::new("deserialize_no_intern_diverse", unique_versions), + &unique_versions, + |b, _| { + b.iter(|| deserialize_without_interning(&protos)); + }, + ); + + group.bench_with_input( + BenchmarkId::new("deserialize_with_intern_diverse", unique_versions), + &unique_versions, + |b, _| { + b.iter(|| deserialize_with_interning(&protos)); + }, + ); + } + + group.finish(); +} + +fn bench_memory(c: &mut Criterion) { + let mut group = c.benchmark_group("manifest_memory"); + let n = num_fragments(); + + for num_fields in [10, 50] { + let protos = make_uniform_pb_fragments(n, num_fields); + + let no_intern = deserialize_without_interning(&protos); + let with_intern = deserialize_with_interning(&protos); + + let size_no_intern = no_intern.deep_size_of(); + let size_with_intern = with_intern.deep_size_of(); + + eprintln!( + "\n[{} fragments, {} fields] Memory without interning: {:.2} MB", + n, + num_fields, + size_no_intern as f64 / 1_048_576.0 + ); + eprintln!( + "[{} fragments, {} fields] Memory with interning: {:.2} MB", + n, + num_fields, + size_with_intern as f64 / 1_048_576.0 + ); + eprintln!( + "[{} fragments, {} fields] Savings: {:.2} MB ({:.1}%)", + n, + num_fields, + (size_no_intern - size_with_intern) as f64 / 1_048_576.0, + (1.0 - size_with_intern as f64 / size_no_intern as f64) * 100.0 + ); + + // Benchmark deep_size_of measurement itself (sanity check) + group.bench_with_input( + BenchmarkId::new("deep_size_of_interned", num_fields), + &num_fields, + |b, _| { + b.iter(|| with_intern.deep_size_of()); + }, + ); + + drop(no_intern); + drop(with_intern); + } + + group.finish(); +} + +#[cfg(target_os = "linux")] +criterion_group!( + name = benches; + config = Criterion::default().with_profiler(pprof::criterion::PProfProfiler::new(100, pprof::criterion::Output::Flamegraph(None))); + targets = bench_deserialization, bench_memory +); +#[cfg(not(target_os = "linux"))] +criterion_group!(benches, bench_deserialization, bench_memory); +criterion_main!(benches); diff --git a/rust/lance-table/src/format/fragment.rs b/rust/lance-table/src/format/fragment.rs index 621198e329e..e1643b0712e 100644 --- a/rust/lance-table/src/format/fragment.rs +++ b/rust/lance-table/src/format/fragment.rs @@ -233,33 +233,119 @@ impl TryFrom for DataFile { } } -/// Interns `fields` and `column_indices` slices so that `DataFile` instances -/// with identical content share a single `Arc<[i32]>` allocation. +/// Interns repeated data so that fragments with identical content share a +/// single heap allocation via `Arc`. /// /// At 20M fragments the deduplication typically saves multiple GB of heap -/// because every fragment in a homogeneous table carries the same field list. +/// because every fragment in a homogeneous table carries the same field list, +/// and post-compaction fragments share identical version metadata bytes. +/// +/// Uses a `Vec`-based linear scan when the cache is small (<=16 entries) +/// and upgrades to `HashMap` for larger caches. In the common homogeneous +/// case (1-3 unique values), linear scan avoids per-fragment hashing overhead. #[derive(Default)] pub struct DataFileFieldInterner { - fields: HashMap, Arc<[i32]>>, - column_indices: HashMap, Arc<[i32]>>, + fields: InternCache, + column_indices: InternCache, + inline_bytes: InternCache, +} + +/// A cache that uses linear scan for small sizes and HashMap for large. +/// The threshold is chosen so that scan + compare is cheaper than hash for +/// typical payload sizes (20-200 bytes). +enum InternCache { + Small(Vec>), + Large(HashMap, ()>), +} + +const INTERN_CACHE_UPGRADE_THRESHOLD: usize = 16; + +impl Default for InternCache { + fn default() -> Self { + Self::Small(Vec::new()) + } +} + +impl InternCache { + fn intern(&mut self, v: Vec) -> Arc<[T]> { + match self { + Self::Small(entries) => { + for existing in entries.iter() { + if existing.as_ref() == v.as_slice() { + return existing.clone(); + } + } + let arc: Arc<[T]> = Arc::from(v); + entries.push(arc.clone()); + if entries.len() > INTERN_CACHE_UPGRADE_THRESHOLD { + let mut map = HashMap::with_capacity(entries.len()); + for e in entries.drain(..) { + map.insert(e, ()); + } + *self = Self::Large(map); + } + arc + } + Self::Large(map) => { + if let Some((existing, _)) = map.get_key_value(v.as_slice()) { + existing.clone() + } else { + let arc: Arc<[T]> = Arc::from(v); + map.insert(arc.clone(), ()); + arc + } + } + } + } } impl DataFileFieldInterner { - /// Intern a `Vec`: if the same content was seen before, return a - /// clone of the existing `Arc`; otherwise store and return a new one. - fn intern(cache: &mut HashMap, Arc<[i32]>>, v: Vec) -> Arc<[i32]> { - cache - .entry(v) - .or_insert_with_key(|k| Arc::from(k.as_slice())) - .clone() + /// Intern a `RowDatasetVersionMeta`, deduplicating inline byte payloads. + /// Accepts the protobuf oneof value directly to avoid an intermediate + /// `Arc<[u8]>` allocation that would need to be `.to_vec()`'d for the key lookup. + fn intern_last_updated_version_meta( + cache: &mut InternCache, + pb: pb::data_fragment::LastUpdatedAtVersionSequence, + ) -> Result { + match pb { + pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(data) => { + Ok(RowDatasetVersionMeta::Inline(cache.intern(data))) + } + pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions( + file, + ) => Ok(RowDatasetVersionMeta::External(ExternalFile { + path: file.path, + offset: file.offset, + size: file.size, + })), + } + } + + /// Intern a `RowDatasetVersionMeta`, deduplicating inline byte payloads. + fn intern_created_version_meta( + cache: &mut InternCache, + pb: pb::data_fragment::CreatedAtVersionSequence, + ) -> Result { + match pb { + pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data) => { + Ok(RowDatasetVersionMeta::Inline(cache.intern(data))) + } + pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(file) => { + Ok(RowDatasetVersionMeta::External(ExternalFile { + path: file.path, + offset: file.offset, + size: file.size, + })) + } + } } /// Convert a protobuf `DataFile`, interning `fields` and `column_indices`. pub fn intern_data_file(&mut self, proto: pb::DataFile) -> Result { Ok(DataFile { path: proto.path, - fields: Self::intern(&mut self.fields, proto.fields), - column_indices: Self::intern(&mut self.column_indices, proto.column_indices), + fields: self.fields.intern(proto.fields), + column_indices: self.column_indices.intern(proto.column_indices), file_major_version: proto.file_major_version, file_minor_version: proto.file_minor_version, file_size_bytes: CachedFileSize::new(proto.file_size_bytes), @@ -267,13 +353,21 @@ impl DataFileFieldInterner { }) } - /// Convert a protobuf `DataFragment`, interning fields within its data files. + /// Convert a protobuf `DataFragment`, interning fields and version metadata. pub fn intern_fragment(&mut self, p: pb::DataFragment) -> Result { let physical_rows = if p.physical_rows > 0 { Some(p.physical_rows as usize) } else { None }; + let last_updated_at_version_meta = p + .last_updated_at_version_sequence + .map(|pb| Self::intern_last_updated_version_meta(&mut self.inline_bytes, pb)) + .transpose()?; + let created_at_version_meta = p + .created_at_version_sequence + .map(|pb| Self::intern_created_version_meta(&mut self.inline_bytes, pb)) + .transpose()?; Ok(Fragment { id: p.id, files: p @@ -284,14 +378,8 @@ impl DataFileFieldInterner { deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?, row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?, physical_rows, - last_updated_at_version_meta: p - .last_updated_at_version_sequence - .map(RowDatasetVersionMeta::try_from) - .transpose()?, - created_at_version_meta: p - .created_at_version_sequence - .map(RowDatasetVersionMeta::try_from) - .transpose()?, + last_updated_at_version_meta, + created_at_version_meta, }) } } diff --git a/rust/lance-table/src/rowids/version.rs b/rust/lance-table/src/rowids/version.rs index f1c528c91ef..80f3d06db60 100644 --- a/rust/lance-table/src/rowids/version.rs +++ b/rust/lance-table/src/rowids/version.rs @@ -7,10 +7,14 @@ //! update version for each row in a Lance dataset, enabling efficient //! cross-version diff operations. +use std::sync::Arc; + use deepsize::DeepSizeOf; use lance_core::Error; use lance_core::Result; use prost::Message; +use serde::de::Deserializer; +use serde::ser::Serializer; use serde::{Deserialize, Serialize}; use crate::format::{ExternalFile, Fragment, pb}; @@ -210,19 +214,59 @@ impl<'a> Iterator for VersionsIter<'a> { /// Metadata about the location of dataset version sequence data /// Following the same pattern as RowIdMeta -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)] +/// +/// When stored inline, identical byte sequences are shared across fragments +/// via `Arc<[u8]>` to reduce manifest memory for large tables. +#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)] pub enum RowDatasetVersionMeta { /// Small sequences stored inline in the fragment metadata - Inline(Vec), + Inline(Arc<[u8]>), /// Large sequences stored in external files External(ExternalFile), } +// Custom Serialize: convert Arc<[u8]> to slice for transparent JSON output +impl Serialize for RowDatasetVersionMeta { + fn serialize(&self, serializer: S) -> std::result::Result { + #[derive(Serialize)] + #[serde(untagged)] + enum Helper<'a> { + Inline { inline: &'a [u8] }, + External { external: &'a ExternalFile }, + } + + match self { + Self::Inline(data) => Helper::Inline { + inline: data.as_ref(), + } + .serialize(serializer), + Self::External(file) => Helper::External { external: file }.serialize(serializer), + } + } +} + +// Custom Deserialize: read Vec and convert to Arc<[u8]> +impl<'de> Deserialize<'de> for RowDatasetVersionMeta { + fn deserialize>(deserializer: D) -> std::result::Result { + #[derive(Deserialize)] + #[serde(untagged)] + enum Helper { + Inline { inline: Vec }, + External { external: ExternalFile }, + } + + match Helper::deserialize(deserializer)? { + Helper::Inline { inline } => Ok(Self::Inline(Arc::from(inline))), + Helper::External { external } => Ok(Self::External(external)), + } + } +} + impl RowDatasetVersionMeta { /// Create inline metadata from a version sequence pub fn from_sequence(sequence: &RowDatasetVersionSequence) -> lance_core::Result { let bytes = write_dataset_versions(sequence); - Ok(Self::Inline(bytes)) + Ok(Self::Inline(Arc::from(bytes))) } /// Create external metadata reference @@ -248,7 +292,7 @@ pub fn last_updated_at_version_meta_to_pb( meta.as_ref().map(|m| match m { RowDatasetVersionMeta::Inline(data) => { pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions( - data.clone(), + data.to_vec(), ) } RowDatasetVersionMeta::External(file) => { @@ -269,7 +313,7 @@ pub fn created_at_version_meta_to_pb( ) -> Option { meta.as_ref().map(|m| match m { RowDatasetVersionMeta::Inline(data) => { - pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data.clone()) + pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data.to_vec()) } RowDatasetVersionMeta::External(file) => { pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions( @@ -561,7 +605,7 @@ impl TryFrom for RowDatasetVers fn try_from(value: pb::data_fragment::LastUpdatedAtVersionSequence) -> Result { match value { pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(data) => { - Ok(Self::Inline(data)) + Ok(Self::Inline(Arc::from(data))) } pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions( file, @@ -580,7 +624,7 @@ impl TryFrom for RowDatasetVersionM fn try_from(value: pb::data_fragment::CreatedAtVersionSequence) -> Result { match value { pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data) => { - Ok(Self::Inline(data)) + Ok(Self::Inline(Arc::from(data))) } pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(file) => { Ok(Self::External(ExternalFile {