From 313fdf30f2fc9d0dd94accb8ed4f4fdfb156c415 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 30 Dec 2025 08:51:37 -0800 Subject: [PATCH 1/2] perf: add benchmarks for compressed encode/decode This adds some simple benchmarks for encoding/decoding of a string-valued column under lz4 and zstd compression. A benchmark for parallel decoding is also included. --- rust/lance-encoding/Cargo.toml | 4 + rust/lance-encoding/benches/decoder.rs | 217 ++++++++++++++++++++++++- rust/lance-encoding/benches/encoder.rs | 91 +++++++++++ 3 files changed, 309 insertions(+), 3 deletions(-) create mode 100644 rust/lance-encoding/benches/encoder.rs diff --git a/rust/lance-encoding/Cargo.toml b/rust/lance-encoding/Cargo.toml index 2e233b170ca..c8f23f9b914 100644 --- a/rust/lance-encoding/Cargo.toml +++ b/rust/lance-encoding/Cargo.toml @@ -76,6 +76,10 @@ features = ["protoc"] name = "decoder" harness = false +[[bench]] +name = "encoder" +harness = false + [[bench]] name = "buffer" harness = false diff --git a/rust/lance-encoding/benches/decoder.rs b/rust/lance-encoding/benches/decoder.rs index 16d3faa9d97..c59ef23820a 100644 --- a/rust/lance-encoding/benches/decoder.rs +++ b/rust/lance-encoding/benches/decoder.rs @@ -6,13 +6,17 @@ use arrow_array::{RecordBatch, UInt32Array}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use arrow_select::take::take; use criterion::{criterion_group, criterion_main, Criterion}; +use futures::StreamExt; use lance_core::cache::LanceCache; use lance_datagen::ArrayGeneratorExt; use lance_encoding::{ - decoder::{DecoderPlugins, FilterExpression}, + decoder::{ + create_decode_stream, DecodeBatchScheduler, DecoderConfig, DecoderPlugins, FilterExpression, + }, encoder::{default_encoding_strategy, encode_batch, EncodingOptions}, version::LanceFileVersion, }; +use tokio::sync::mpsc::unbounded_channel; use rand::Rng; @@ -348,18 +352,225 @@ fn bench_decode_str_with_fixed_size_binary_encoding(c: &mut Criterion) { }); } +fn bench_decode_compressed(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut group = c.benchmark_group("decode_compressed"); + + const NUM_ROWS: usize = 5_000_000; + const NUM_COLUMNS: usize = 10; + + // Generate compressible string data - high cardinality but compressible + // (unique values to avoid dictionary encoding, repeated prefix for compression) + let array: Arc = Arc::new(arrow_array::StringArray::from_iter_values( + (0..NUM_ROWS).map(|i| format!("prefix_that_compresses_well_{}", i)), + )); + + for compression in ["zstd", "lz4"] { + let mut metadata = HashMap::new(); + metadata.insert( + "lance-encoding:compression".to_string(), + compression.to_string(), + ); + // Disable dictionary encoding to ensure we hit the compression path + metadata.insert( + "lance-encoding:dict-divisor".to_string(), + "100000".to_string(), + ); + // Force miniblock encoding (the path that benefits from compressor caching) + metadata.insert( + "lance-encoding:structural-encoding".to_string(), + "miniblock".to_string(), + ); + let fields: Vec = (0..NUM_COLUMNS) + .map(|i| { + Field::new(format!("s{}", i), DataType::Utf8, false).with_metadata(metadata.clone()) + }) + .collect(); + let columns: Vec> = + (0..NUM_COLUMNS).map(|_| array.clone()).collect(); + let schema = Arc::new(Schema::new(fields)); + let data = RecordBatch::try_new(schema.clone(), columns).unwrap(); + + let lance_schema = + Arc::new(lance_core::datatypes::Schema::try_from(schema.as_ref()).unwrap()); + // V2_2+ required for general compression + let encoding_strategy = default_encoding_strategy(LanceFileVersion::V2_2); + + // Encode once during setup + let encoded = rt + .block_on(encode_batch( + &data, + lance_schema, + encoding_strategy.as_ref(), + &EncodingOptions::default(), + )) + .unwrap(); + + group.throughput(criterion::Throughput::Elements( + (NUM_ROWS * NUM_COLUMNS) as u64, + )); + group.bench_function( + format!("{}_strings_{}cols", compression, NUM_COLUMNS), + |b| { + b.iter(|| { + let batch = rt + .block_on(lance_encoding::decoder::decode_batch( + &encoded, + &FilterExpression::no_filter(), + Arc::::default(), + false, + LanceFileVersion::V2_2, + Some(Arc::new(LanceCache::no_cache())), + )) + .unwrap(); + assert_eq!(data.num_rows(), batch.num_rows()); + }) + }, + ); + } +} + +/// Benchmark parallel decoding with multiple concurrent batch decode tasks. +/// This creates contention on the shared decompressor mutex when multiple +/// batches from the same page are decoded in parallel. +fn bench_decode_compressed_parallel(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut group = c.benchmark_group("decode_compressed_parallel"); + + const NUM_ROWS: u64 = 1_000_000; + const NUM_COLUMNS: usize = 10; + // Small batch size to create many batches that will contend on the same decompressor + const BATCH_SIZE: u32 = 100_000; + + let array: Arc = Arc::new(arrow_array::StringArray::from_iter_values( + (0..NUM_ROWS as usize).map(|i| format!("prefix_that_compresses_well_{}", i)), + )); + + for compression in ["zstd", "lz4"] { + let mut metadata = HashMap::new(); + metadata.insert( + "lance-encoding:compression".to_string(), + compression.to_string(), + ); + metadata.insert( + "lance-encoding:dict-divisor".to_string(), + "100000".to_string(), + ); + metadata.insert( + "lance-encoding:structural-encoding".to_string(), + "miniblock".to_string(), + ); + let fields: Vec = (0..NUM_COLUMNS) + .map(|i| { + Field::new(format!("s{}", i), DataType::Utf8, false).with_metadata(metadata.clone()) + }) + .collect(); + let columns: Vec> = + (0..NUM_COLUMNS).map(|_| array.clone()).collect(); + let schema = Arc::new(Schema::new(fields)); + let data = RecordBatch::try_new(schema.clone(), columns).unwrap(); + + let lance_schema = + Arc::new(lance_core::datatypes::Schema::try_from(schema.as_ref()).unwrap()); + let encoding_strategy = default_encoding_strategy(LanceFileVersion::V2_2); + + let encoded = rt + .block_on(encode_batch( + &data, + lance_schema, + encoding_strategy.as_ref(), + &EncodingOptions::default(), + )) + .unwrap(); + + let encoded = Arc::new(encoded); + + // Test with different parallelism levels to see impact of mutex contention + // parallelism=1 is sequential (no contention), higher values cause contention + for parallelism in [1, 8] { + group.throughput(criterion::Throughput::Elements( + NUM_ROWS * NUM_COLUMNS as u64, + )); + group.bench_function( + format!( + "{}_{}cols_parallel_{}", + compression, NUM_COLUMNS, parallelism + ), + |b| { + b.iter(|| { + rt.block_on(async { + let io_scheduler = Arc::new(lance_encoding::BufferScheduler::new( + encoded.data.clone(), + )) + as Arc; + let cache = Arc::new(LanceCache::no_cache()); + let filter = FilterExpression::no_filter(); + + let mut decode_scheduler = DecodeBatchScheduler::try_new( + encoded.schema.as_ref(), + &encoded.top_level_columns, + &encoded.page_table, + &vec![], + encoded.num_rows, + Arc::::default(), + io_scheduler.clone(), + cache, + &filter, + &DecoderConfig::default(), + ) + .await + .unwrap(); + + let (tx, rx) = unbounded_channel(); + decode_scheduler.schedule_range( + 0..encoded.num_rows, + &filter, + tx, + io_scheduler, + ); + + let decode_stream = create_decode_stream( + &encoded.schema, + encoded.num_rows, + BATCH_SIZE, + true, // is_structural for V2_2 + false, + rx, + ) + .unwrap(); + + // Buffer multiple batch decodes in parallel - this causes contention + let batches: Vec<_> = decode_stream + .map(|task| task.task) + .buffered(parallelism) + .collect() + .await; + + let total_rows: usize = + batches.iter().map(|b| b.as_ref().unwrap().num_rows()).sum(); + assert_eq!(total_rows, NUM_ROWS as usize); + }) + }) + }, + ); + } + } +} + #[cfg(target_os = "linux")] criterion_group!( name=benches; config = Criterion::default().significance_level(0.1).sample_size(10) .with_profiler(pprof::criterion::PProfProfiler::new(100, pprof::criterion::Output::Flamegraph(None))); targets = bench_decode, bench_decode_fsl, bench_decode_str_with_dict_encoding, bench_decode_packed_struct, - bench_decode_str_with_fixed_size_binary_encoding); + bench_decode_str_with_fixed_size_binary_encoding, bench_decode_compressed, + bench_decode_compressed_parallel); // Non-linux version does not support pprof. #[cfg(not(target_os = "linux"))] criterion_group!( name=benches; config = Criterion::default().significance_level(0.1).sample_size(10); - targets = bench_decode, bench_decode_fsl, bench_decode_str_with_dict_encoding, bench_decode_packed_struct); + targets = bench_decode, bench_decode_fsl, bench_decode_str_with_dict_encoding, bench_decode_packed_struct, + bench_decode_compressed, bench_decode_compressed_parallel); criterion_main!(benches); diff --git a/rust/lance-encoding/benches/encoder.rs b/rust/lance-encoding/benches/encoder.rs new file mode 100644 index 00000000000..6a0d5b94ad2 --- /dev/null +++ b/rust/lance-encoding/benches/encoder.rs @@ -0,0 +1,91 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::{collections::HashMap, sync::Arc}; + +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema}; +use criterion::{criterion_group, criterion_main, Criterion}; +use lance_encoding::{ + encoder::{default_encoding_strategy, encode_batch, EncodingOptions}, + version::LanceFileVersion, +}; + +fn bench_encode_compressed(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut group = c.benchmark_group("encode_compressed"); + + const NUM_ROWS: usize = 5_000_000; + const NUM_COLUMNS: usize = 10; + + // Generate compressible string data - high cardinality but compressible + // (unique values to avoid dictionary encoding, repeated prefix for compression) + let array: Arc = Arc::new(arrow_array::StringArray::from_iter_values( + (0..NUM_ROWS).map(|i| format!("prefix_that_compresses_well_{}", i)), + )); + + for compression in ["zstd", "lz4"] { + let mut metadata = HashMap::new(); + metadata.insert( + "lance-encoding:compression".to_string(), + compression.to_string(), + ); + // Disable dictionary encoding to ensure we hit the compression path + metadata.insert( + "lance-encoding:dict-divisor".to_string(), + "100000".to_string(), + ); + // Force miniblock encoding (the path that benefits from compressor caching) + metadata.insert( + "lance-encoding:structural-encoding".to_string(), + "miniblock".to_string(), + ); + let fields: Vec = (0..NUM_COLUMNS) + .map(|i| { + Field::new(format!("s{}", i), DataType::Utf8, false).with_metadata(metadata.clone()) + }) + .collect(); + let columns: Vec> = + (0..NUM_COLUMNS).map(|_| array.clone()).collect(); + let schema = Arc::new(Schema::new(fields)); + let data = RecordBatch::try_new(schema.clone(), columns).unwrap(); + + let lance_schema = + Arc::new(lance_core::datatypes::Schema::try_from(schema.as_ref()).unwrap()); + // V2_2+ required for general compression + let encoding_strategy = default_encoding_strategy(LanceFileVersion::V2_2); + + group.throughput(criterion::Throughput::Elements( + (NUM_ROWS * NUM_COLUMNS) as u64, + )); + group.bench_function( + format!("{}_strings_{}cols", compression, NUM_COLUMNS), + |b| { + b.iter(|| { + rt.block_on(encode_batch( + &data, + lance_schema.clone(), + encoding_strategy.as_ref(), + &EncodingOptions::default(), + )) + .unwrap() + }) + }, + ); + } +} + +#[cfg(target_os = "linux")] +criterion_group!( + name=benches; + config = Criterion::default().significance_level(0.1).sample_size(10) + .with_profiler(pprof::criterion::PProfProfiler::new(100, pprof::criterion::Output::Flamegraph(None))); + targets = bench_encode_compressed); + +#[cfg(not(target_os = "linux"))] +criterion_group!( + name=benches; + config = Criterion::default().significance_level(0.1).sample_size(10); + targets = bench_encode_compressed); + +criterion_main!(benches); From c7320f709e1781e6bb13e84f2a87ee42e3e40db7 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 30 Dec 2025 11:49:16 -0800 Subject: [PATCH 2/2] perf: reuse zstd compressors in encoding Prior to this commit, the ZstdBufferCompressor would construct a new zstd stream encoder on every call to compress. With this change we create one compression context for the ZstdBufferCompressor, and reuse it across calls to compress. Reuse is not implemented for lz4 compression or for decompression. These were both explored but did not bring meaningful benefits over the existing code. --- .../src/encodings/physical/block.rs | 67 ++++++++++++++++--- 1 file changed, 58 insertions(+), 9 deletions(-) diff --git a/rust/lance-encoding/src/encodings/physical/block.rs b/rust/lance-encoding/src/encodings/physical/block.rs index fa48bee33c9..9104fd9bdd1 100644 --- a/rust/lance-encoding/src/encodings/physical/block.rs +++ b/rust/lance-encoding/src/encodings/physical/block.rs @@ -137,20 +137,59 @@ pub trait BufferCompressor: std::fmt::Debug + Send + Sync { #[cfg(feature = "zstd")] mod zstd { use std::io::{Cursor, Write}; + use std::sync::{Mutex, OnceLock}; use super::*; - use ::zstd::bulk::decompress_to_buffer; + use ::zstd::bulk::{decompress_to_buffer, Compressor}; use ::zstd::stream::copy_decode; - #[derive(Debug, Default)] + /// A zstd buffer compressor that lazily creates and reuses compression contexts. + /// + /// The compression context is cached to enable reuse across chunks within a + /// page. It is lazily initialized to prevent it from getting initialized on + /// decode-only codepaths. + /// + /// Reuse is not implemented for decompression, only for compression: + /// * The single-threaded benefit of reuse was negligible when measured. + /// * Decompressors can get shared across threads, leading to mutex + /// contention if the same strategy is used as for compression here. This + /// should be mitigable with pooling but we can skip the complexity until a + /// need is demonstrated. The multithreaded decode benchmark effectively + /// demonstrates this scenario. pub struct ZstdBufferCompressor { compression_level: i32, + compressor: OnceLock>, String>>, + } + + impl std::fmt::Debug for ZstdBufferCompressor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ZstdBufferCompressor") + .field("compression_level", &self.compression_level) + .finish() + } } impl ZstdBufferCompressor { pub fn new(compression_level: i32) -> Self { - Self { compression_level } + Self { + compression_level, + compressor: OnceLock::new(), + } + } + + fn get_compressor(&self) -> Result<&Mutex>> { + self.compressor + .get_or_init(|| { + Compressor::new(self.compression_level) + .map(Mutex::new) + .map_err(|e| e.to_string()) + }) + .as_ref() + .map_err(|e| Error::Internal { + message: format!("Failed to create zstd compressor: {}", e), + location: location!(), + }) } // https://datatracker.ietf.org/doc/html/rfc8878 @@ -213,13 +252,23 @@ mod zstd { impl BufferCompressor for ZstdBufferCompressor { fn compress(&self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { output_buf.write_all(&(input_buf.len() as u64).to_le_bytes())?; - let mut encoder = ::zstd::stream::Encoder::new(output_buf, self.compression_level)?; - encoder.write_all(input_buf)?; - match encoder.finish() { - Ok(_) => Ok(()), - Err(e) => Err(e.into()), - } + let max_compressed_size = ::zstd::zstd_safe::compress_bound(input_buf.len()); + let start_pos = output_buf.len(); + output_buf.resize(start_pos + max_compressed_size, 0); + + let compressed_size = self + .get_compressor()? + .lock() + .unwrap() + .compress_to_buffer(input_buf, &mut output_buf[start_pos..]) + .map_err(|e| Error::Internal { + message: format!("Zstd compression error: {}", e), + location: location!(), + })?; + + output_buf.truncate(start_pos + compressed_size); + Ok(()) } fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> {