diff --git a/rust/lance-encoding/Cargo.toml b/rust/lance-encoding/Cargo.toml index 2e233b170ca..c8f23f9b914 100644 --- a/rust/lance-encoding/Cargo.toml +++ b/rust/lance-encoding/Cargo.toml @@ -76,6 +76,10 @@ features = ["protoc"] name = "decoder" harness = false +[[bench]] +name = "encoder" +harness = false + [[bench]] name = "buffer" harness = false diff --git a/rust/lance-encoding/benches/decoder.rs b/rust/lance-encoding/benches/decoder.rs index 16d3faa9d97..c59ef23820a 100644 --- a/rust/lance-encoding/benches/decoder.rs +++ b/rust/lance-encoding/benches/decoder.rs @@ -6,13 +6,17 @@ use arrow_array::{RecordBatch, UInt32Array}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use arrow_select::take::take; use criterion::{criterion_group, criterion_main, Criterion}; +use futures::StreamExt; use lance_core::cache::LanceCache; use lance_datagen::ArrayGeneratorExt; use lance_encoding::{ - decoder::{DecoderPlugins, FilterExpression}, + decoder::{ + create_decode_stream, DecodeBatchScheduler, DecoderConfig, DecoderPlugins, FilterExpression, + }, encoder::{default_encoding_strategy, encode_batch, EncodingOptions}, version::LanceFileVersion, }; +use tokio::sync::mpsc::unbounded_channel; use rand::Rng; @@ -348,18 +352,225 @@ fn bench_decode_str_with_fixed_size_binary_encoding(c: &mut Criterion) { }); } +fn bench_decode_compressed(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut group = c.benchmark_group("decode_compressed"); + + const NUM_ROWS: usize = 5_000_000; + const NUM_COLUMNS: usize = 10; + + // Generate compressible string data - high cardinality but compressible + // (unique values to avoid dictionary encoding, repeated prefix for compression) + let array: Arc = Arc::new(arrow_array::StringArray::from_iter_values( + (0..NUM_ROWS).map(|i| format!("prefix_that_compresses_well_{}", i)), + )); + + for compression in ["zstd", "lz4"] { + let mut metadata = HashMap::new(); + metadata.insert( + "lance-encoding:compression".to_string(), + compression.to_string(), + ); + // Disable dictionary encoding to ensure we hit the compression path + metadata.insert( + "lance-encoding:dict-divisor".to_string(), + "100000".to_string(), + ); + // Force miniblock encoding (the path that benefits from compressor caching) + metadata.insert( + "lance-encoding:structural-encoding".to_string(), + "miniblock".to_string(), + ); + let fields: Vec = (0..NUM_COLUMNS) + .map(|i| { + Field::new(format!("s{}", i), DataType::Utf8, false).with_metadata(metadata.clone()) + }) + .collect(); + let columns: Vec> = + (0..NUM_COLUMNS).map(|_| array.clone()).collect(); + let schema = Arc::new(Schema::new(fields)); + let data = RecordBatch::try_new(schema.clone(), columns).unwrap(); + + let lance_schema = + Arc::new(lance_core::datatypes::Schema::try_from(schema.as_ref()).unwrap()); + // V2_2+ required for general compression + let encoding_strategy = default_encoding_strategy(LanceFileVersion::V2_2); + + // Encode once during setup + let encoded = rt + .block_on(encode_batch( + &data, + lance_schema, + encoding_strategy.as_ref(), + &EncodingOptions::default(), + )) + .unwrap(); + + group.throughput(criterion::Throughput::Elements( + (NUM_ROWS * NUM_COLUMNS) as u64, + )); + group.bench_function( + format!("{}_strings_{}cols", compression, NUM_COLUMNS), + |b| { + b.iter(|| { + let batch = rt + .block_on(lance_encoding::decoder::decode_batch( + &encoded, + &FilterExpression::no_filter(), + Arc::::default(), + false, + LanceFileVersion::V2_2, + Some(Arc::new(LanceCache::no_cache())), + )) + .unwrap(); + assert_eq!(data.num_rows(), batch.num_rows()); + }) + }, + ); + } +} + +/// Benchmark parallel decoding with multiple concurrent batch decode tasks. +/// This creates contention on the shared decompressor mutex when multiple +/// batches from the same page are decoded in parallel. +fn bench_decode_compressed_parallel(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut group = c.benchmark_group("decode_compressed_parallel"); + + const NUM_ROWS: u64 = 1_000_000; + const NUM_COLUMNS: usize = 10; + // Small batch size to create many batches that will contend on the same decompressor + const BATCH_SIZE: u32 = 100_000; + + let array: Arc = Arc::new(arrow_array::StringArray::from_iter_values( + (0..NUM_ROWS as usize).map(|i| format!("prefix_that_compresses_well_{}", i)), + )); + + for compression in ["zstd", "lz4"] { + let mut metadata = HashMap::new(); + metadata.insert( + "lance-encoding:compression".to_string(), + compression.to_string(), + ); + metadata.insert( + "lance-encoding:dict-divisor".to_string(), + "100000".to_string(), + ); + metadata.insert( + "lance-encoding:structural-encoding".to_string(), + "miniblock".to_string(), + ); + let fields: Vec = (0..NUM_COLUMNS) + .map(|i| { + Field::new(format!("s{}", i), DataType::Utf8, false).with_metadata(metadata.clone()) + }) + .collect(); + let columns: Vec> = + (0..NUM_COLUMNS).map(|_| array.clone()).collect(); + let schema = Arc::new(Schema::new(fields)); + let data = RecordBatch::try_new(schema.clone(), columns).unwrap(); + + let lance_schema = + Arc::new(lance_core::datatypes::Schema::try_from(schema.as_ref()).unwrap()); + let encoding_strategy = default_encoding_strategy(LanceFileVersion::V2_2); + + let encoded = rt + .block_on(encode_batch( + &data, + lance_schema, + encoding_strategy.as_ref(), + &EncodingOptions::default(), + )) + .unwrap(); + + let encoded = Arc::new(encoded); + + // Test with different parallelism levels to see impact of mutex contention + // parallelism=1 is sequential (no contention), higher values cause contention + for parallelism in [1, 8] { + group.throughput(criterion::Throughput::Elements( + NUM_ROWS * NUM_COLUMNS as u64, + )); + group.bench_function( + format!( + "{}_{}cols_parallel_{}", + compression, NUM_COLUMNS, parallelism + ), + |b| { + b.iter(|| { + rt.block_on(async { + let io_scheduler = Arc::new(lance_encoding::BufferScheduler::new( + encoded.data.clone(), + )) + as Arc; + let cache = Arc::new(LanceCache::no_cache()); + let filter = FilterExpression::no_filter(); + + let mut decode_scheduler = DecodeBatchScheduler::try_new( + encoded.schema.as_ref(), + &encoded.top_level_columns, + &encoded.page_table, + &vec![], + encoded.num_rows, + Arc::::default(), + io_scheduler.clone(), + cache, + &filter, + &DecoderConfig::default(), + ) + .await + .unwrap(); + + let (tx, rx) = unbounded_channel(); + decode_scheduler.schedule_range( + 0..encoded.num_rows, + &filter, + tx, + io_scheduler, + ); + + let decode_stream = create_decode_stream( + &encoded.schema, + encoded.num_rows, + BATCH_SIZE, + true, // is_structural for V2_2 + false, + rx, + ) + .unwrap(); + + // Buffer multiple batch decodes in parallel - this causes contention + let batches: Vec<_> = decode_stream + .map(|task| task.task) + .buffered(parallelism) + .collect() + .await; + + let total_rows: usize = + batches.iter().map(|b| b.as_ref().unwrap().num_rows()).sum(); + assert_eq!(total_rows, NUM_ROWS as usize); + }) + }) + }, + ); + } + } +} + #[cfg(target_os = "linux")] criterion_group!( name=benches; config = Criterion::default().significance_level(0.1).sample_size(10) .with_profiler(pprof::criterion::PProfProfiler::new(100, pprof::criterion::Output::Flamegraph(None))); targets = bench_decode, bench_decode_fsl, bench_decode_str_with_dict_encoding, bench_decode_packed_struct, - bench_decode_str_with_fixed_size_binary_encoding); + bench_decode_str_with_fixed_size_binary_encoding, bench_decode_compressed, + bench_decode_compressed_parallel); // Non-linux version does not support pprof. #[cfg(not(target_os = "linux"))] criterion_group!( name=benches; config = Criterion::default().significance_level(0.1).sample_size(10); - targets = bench_decode, bench_decode_fsl, bench_decode_str_with_dict_encoding, bench_decode_packed_struct); + targets = bench_decode, bench_decode_fsl, bench_decode_str_with_dict_encoding, bench_decode_packed_struct, + bench_decode_compressed, bench_decode_compressed_parallel); criterion_main!(benches); diff --git a/rust/lance-encoding/benches/encoder.rs b/rust/lance-encoding/benches/encoder.rs new file mode 100644 index 00000000000..6a0d5b94ad2 --- /dev/null +++ b/rust/lance-encoding/benches/encoder.rs @@ -0,0 +1,91 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::{collections::HashMap, sync::Arc}; + +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema}; +use criterion::{criterion_group, criterion_main, Criterion}; +use lance_encoding::{ + encoder::{default_encoding_strategy, encode_batch, EncodingOptions}, + version::LanceFileVersion, +}; + +fn bench_encode_compressed(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut group = c.benchmark_group("encode_compressed"); + + const NUM_ROWS: usize = 5_000_000; + const NUM_COLUMNS: usize = 10; + + // Generate compressible string data - high cardinality but compressible + // (unique values to avoid dictionary encoding, repeated prefix for compression) + let array: Arc = Arc::new(arrow_array::StringArray::from_iter_values( + (0..NUM_ROWS).map(|i| format!("prefix_that_compresses_well_{}", i)), + )); + + for compression in ["zstd", "lz4"] { + let mut metadata = HashMap::new(); + metadata.insert( + "lance-encoding:compression".to_string(), + compression.to_string(), + ); + // Disable dictionary encoding to ensure we hit the compression path + metadata.insert( + "lance-encoding:dict-divisor".to_string(), + "100000".to_string(), + ); + // Force miniblock encoding (the path that benefits from compressor caching) + metadata.insert( + "lance-encoding:structural-encoding".to_string(), + "miniblock".to_string(), + ); + let fields: Vec = (0..NUM_COLUMNS) + .map(|i| { + Field::new(format!("s{}", i), DataType::Utf8, false).with_metadata(metadata.clone()) + }) + .collect(); + let columns: Vec> = + (0..NUM_COLUMNS).map(|_| array.clone()).collect(); + let schema = Arc::new(Schema::new(fields)); + let data = RecordBatch::try_new(schema.clone(), columns).unwrap(); + + let lance_schema = + Arc::new(lance_core::datatypes::Schema::try_from(schema.as_ref()).unwrap()); + // V2_2+ required for general compression + let encoding_strategy = default_encoding_strategy(LanceFileVersion::V2_2); + + group.throughput(criterion::Throughput::Elements( + (NUM_ROWS * NUM_COLUMNS) as u64, + )); + group.bench_function( + format!("{}_strings_{}cols", compression, NUM_COLUMNS), + |b| { + b.iter(|| { + rt.block_on(encode_batch( + &data, + lance_schema.clone(), + encoding_strategy.as_ref(), + &EncodingOptions::default(), + )) + .unwrap() + }) + }, + ); + } +} + +#[cfg(target_os = "linux")] +criterion_group!( + name=benches; + config = Criterion::default().significance_level(0.1).sample_size(10) + .with_profiler(pprof::criterion::PProfProfiler::new(100, pprof::criterion::Output::Flamegraph(None))); + targets = bench_encode_compressed); + +#[cfg(not(target_os = "linux"))] +criterion_group!( + name=benches; + config = Criterion::default().significance_level(0.1).sample_size(10); + targets = bench_encode_compressed); + +criterion_main!(benches); diff --git a/rust/lance-encoding/src/encodings/physical/block.rs b/rust/lance-encoding/src/encodings/physical/block.rs index fa48bee33c9..9104fd9bdd1 100644 --- a/rust/lance-encoding/src/encodings/physical/block.rs +++ b/rust/lance-encoding/src/encodings/physical/block.rs @@ -137,20 +137,59 @@ pub trait BufferCompressor: std::fmt::Debug + Send + Sync { #[cfg(feature = "zstd")] mod zstd { use std::io::{Cursor, Write}; + use std::sync::{Mutex, OnceLock}; use super::*; - use ::zstd::bulk::decompress_to_buffer; + use ::zstd::bulk::{decompress_to_buffer, Compressor}; use ::zstd::stream::copy_decode; - #[derive(Debug, Default)] + /// A zstd buffer compressor that lazily creates and reuses compression contexts. + /// + /// The compression context is cached to enable reuse across chunks within a + /// page. It is lazily initialized to prevent it from getting initialized on + /// decode-only codepaths. + /// + /// Reuse is not implemented for decompression, only for compression: + /// * The single-threaded benefit of reuse was negligible when measured. + /// * Decompressors can get shared across threads, leading to mutex + /// contention if the same strategy is used as for compression here. This + /// should be mitigable with pooling but we can skip the complexity until a + /// need is demonstrated. The multithreaded decode benchmark effectively + /// demonstrates this scenario. pub struct ZstdBufferCompressor { compression_level: i32, + compressor: OnceLock>, String>>, + } + + impl std::fmt::Debug for ZstdBufferCompressor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ZstdBufferCompressor") + .field("compression_level", &self.compression_level) + .finish() + } } impl ZstdBufferCompressor { pub fn new(compression_level: i32) -> Self { - Self { compression_level } + Self { + compression_level, + compressor: OnceLock::new(), + } + } + + fn get_compressor(&self) -> Result<&Mutex>> { + self.compressor + .get_or_init(|| { + Compressor::new(self.compression_level) + .map(Mutex::new) + .map_err(|e| e.to_string()) + }) + .as_ref() + .map_err(|e| Error::Internal { + message: format!("Failed to create zstd compressor: {}", e), + location: location!(), + }) } // https://datatracker.ietf.org/doc/html/rfc8878 @@ -213,13 +252,23 @@ mod zstd { impl BufferCompressor for ZstdBufferCompressor { fn compress(&self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { output_buf.write_all(&(input_buf.len() as u64).to_le_bytes())?; - let mut encoder = ::zstd::stream::Encoder::new(output_buf, self.compression_level)?; - encoder.write_all(input_buf)?; - match encoder.finish() { - Ok(_) => Ok(()), - Err(e) => Err(e.into()), - } + let max_compressed_size = ::zstd::zstd_safe::compress_bound(input_buf.len()); + let start_pos = output_buf.len(); + output_buf.resize(start_pos + max_compressed_size, 0); + + let compressed_size = self + .get_compressor()? + .lock() + .unwrap() + .compress_to_buffer(input_buf, &mut output_buf[start_pos..]) + .map_err(|e| Error::Internal { + message: format!("Zstd compression error: {}", e), + location: location!(), + })?; + + output_buf.truncate(start_pos + compressed_size); + Ok(()) } fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> {