4 changes: 4 additions & 0 deletions rust/lance-encoding/Cargo.toml
@@ -76,6 +76,10 @@ features = ["protoc"]
name = "decoder"
harness = false

[[bench]]
name = "encoder"
harness = false

[[bench]]
name = "buffer"
harness = false
217 changes: 214 additions & 3 deletions rust/lance-encoding/benches/decoder.rs
@@ -6,13 +6,17 @@ use arrow_array::{RecordBatch, UInt32Array};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use arrow_select::take::take;
use criterion::{criterion_group, criterion_main, Criterion};
use futures::StreamExt;
use lance_core::cache::LanceCache;
use lance_datagen::ArrayGeneratorExt;
use lance_encoding::{
decoder::{
create_decode_stream, DecodeBatchScheduler, DecoderConfig, DecoderPlugins, FilterExpression,
},
encoder::{default_encoding_strategy, encode_batch, EncodingOptions},
version::LanceFileVersion,
};
use tokio::sync::mpsc::unbounded_channel;

use rand::Rng;

@@ -348,18 +352,225 @@ fn bench_decode_str_with_fixed_size_binary_encoding(c: &mut Criterion) {
});
}

fn bench_decode_compressed(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut group = c.benchmark_group("decode_compressed");

const NUM_ROWS: usize = 5_000_000;
const NUM_COLUMNS: usize = 10;

// Generate high-cardinality but compressible string data
// (unique values avoid dictionary encoding; the repeated prefix compresses well)
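// For illustration, the generated values look like "prefix_that_compresses_well_0",
// "prefix_that_compresses_well_1", ...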
let array: Arc<dyn arrow_array::Array> = Arc::new(arrow_array::StringArray::from_iter_values(
(0..NUM_ROWS).map(|i| format!("prefix_that_compresses_well_{}", i)),
));

for compression in ["zstd", "lz4"] {
let mut metadata = HashMap::new();
metadata.insert(
"lance-encoding:compression".to_string(),
compression.to_string(),
);
// Disable dictionary encoding to ensure we hit the compression path
metadata.insert(
"lance-encoding:dict-divisor".to_string(),
"100000".to_string(),
);
// Force miniblock encoding (the path that benefits from compressor caching)
metadata.insert(
"lance-encoding:structural-encoding".to_string(),
"miniblock".to_string(),
);
let fields: Vec<Field> = (0..NUM_COLUMNS)
.map(|i| {
Field::new(format!("s{}", i), DataType::Utf8, false).with_metadata(metadata.clone())
})
.collect();
let columns: Vec<Arc<dyn arrow_array::Array>> =
(0..NUM_COLUMNS).map(|_| array.clone()).collect();
let schema = Arc::new(Schema::new(fields));
let data = RecordBatch::try_new(schema.clone(), columns).unwrap();

let lance_schema =
Arc::new(lance_core::datatypes::Schema::try_from(schema.as_ref()).unwrap());
// V2_2+ required for general compression
let encoding_strategy = default_encoding_strategy(LanceFileVersion::V2_2);

// Encode once during setup
let encoded = rt
.block_on(encode_batch(
&data,
lance_schema,
encoding_strategy.as_ref(),
&EncodingOptions::default(),
))
.unwrap();

group.throughput(criterion::Throughput::Elements(
(NUM_ROWS * NUM_COLUMNS) as u64,
));
group.bench_function(
format!("{}_strings_{}cols", compression, NUM_COLUMNS),
|b| {
b.iter(|| {
let batch = rt
.block_on(lance_encoding::decoder::decode_batch(
&encoded,
&FilterExpression::no_filter(),
Arc::<DecoderPlugins>::default(),
false,
LanceFileVersion::V2_2,
Some(Arc::new(LanceCache::no_cache())),
))
.unwrap();
assert_eq!(data.num_rows(), batch.num_rows());
})
},
);
}
}

/// Benchmark parallel decoding with multiple concurrent batch decode tasks.
/// This creates contention on the shared decompressor mutex when multiple
/// batches from the same page are decoded in parallel.
fn bench_decode_compressed_parallel(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut group = c.benchmark_group("decode_compressed_parallel");

const NUM_ROWS: u64 = 1_000_000;
const NUM_COLUMNS: usize = 10;
// Small batch size to create many batches that will contend on the same decompressor
const BATCH_SIZE: u32 = 100_000;
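// 1_000_000 rows / 100_000 rows per batch = 10 record batches per iteration,
// decoded with up to `parallelism` tasks in flight at once.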

let array: Arc<dyn arrow_array::Array> = Arc::new(arrow_array::StringArray::from_iter_values(
(0..NUM_ROWS as usize).map(|i| format!("prefix_that_compresses_well_{}", i)),
));

for compression in ["zstd", "lz4"] {
let mut metadata = HashMap::new();
metadata.insert(
"lance-encoding:compression".to_string(),
compression.to_string(),
);
metadata.insert(
"lance-encoding:dict-divisor".to_string(),
"100000".to_string(),
);
metadata.insert(
"lance-encoding:structural-encoding".to_string(),
"miniblock".to_string(),
);
let fields: Vec<Field> = (0..NUM_COLUMNS)
.map(|i| {
Field::new(format!("s{}", i), DataType::Utf8, false).with_metadata(metadata.clone())
})
.collect();
let columns: Vec<Arc<dyn arrow_array::Array>> =
(0..NUM_COLUMNS).map(|_| array.clone()).collect();
let schema = Arc::new(Schema::new(fields));
let data = RecordBatch::try_new(schema.clone(), columns).unwrap();

let lance_schema =
Arc::new(lance_core::datatypes::Schema::try_from(schema.as_ref()).unwrap());
let encoding_strategy = default_encoding_strategy(LanceFileVersion::V2_2);

let encoded = rt
.block_on(encode_batch(
&data,
lance_schema,
encoding_strategy.as_ref(),
&EncodingOptions::default(),
))
.unwrap();

let encoded = Arc::new(encoded);

// Test with different parallelism levels to see the impact of mutex contention:
// parallelism=1 is sequential (no contention); higher values cause contention.
for parallelism in [1, 8] {
group.throughput(criterion::Throughput::Elements(
NUM_ROWS * NUM_COLUMNS as u64,
));
group.bench_function(
format!(
"{}_{}cols_parallel_{}",
compression, NUM_COLUMNS, parallelism
),
|b| {
b.iter(|| {
rt.block_on(async {
let io_scheduler = Arc::new(lance_encoding::BufferScheduler::new(
encoded.data.clone(),
))
as Arc<dyn lance_encoding::EncodingsIo>;
let cache = Arc::new(LanceCache::no_cache());
let filter = FilterExpression::no_filter();

let mut decode_scheduler = DecodeBatchScheduler::try_new(
encoded.schema.as_ref(),
&encoded.top_level_columns,
&encoded.page_table,
&vec![],
encoded.num_rows,
Arc::<DecoderPlugins>::default(),
io_scheduler.clone(),
cache,
&filter,
&DecoderConfig::default(),
)
.await
.unwrap();

let (tx, rx) = unbounded_channel();
decode_scheduler.schedule_range(
0..encoded.num_rows,
&filter,
tx,
io_scheduler,
);

let decode_stream = create_decode_stream(
&encoded.schema,
encoded.num_rows,
BATCH_SIZE,
true, // is_structural for V2_2
false,
rx,
)
.unwrap();

// Buffer multiple batch decodes in parallel; this causes contention on the shared decompressor
let batches: Vec<_> = decode_stream
.map(|task| task.task)
.buffered(parallelism)
.collect()
.await;

let total_rows: usize =
batches.iter().map(|b| b.as_ref().unwrap().num_rows()).sum();
assert_eq!(total_rows, NUM_ROWS as usize);
})
})
},
);
}
}
}
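
// Profiling note: with the pprof profiler configured below (Linux only), passing
// Criterion's `--profile-time` flag, e.g. `cargo bench --bench decoder -- --profile-time 10`,
// collects flamegraphs instead of timing samples.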

#[cfg(target_os = "linux")]
criterion_group!(
name=benches;
config = Criterion::default().significance_level(0.1).sample_size(10)
.with_profiler(pprof::criterion::PProfProfiler::new(100, pprof::criterion::Output::Flamegraph(None)));
targets = bench_decode, bench_decode_fsl, bench_decode_str_with_dict_encoding, bench_decode_packed_struct,
bench_decode_str_with_fixed_size_binary_encoding, bench_decode_compressed,
bench_decode_compressed_parallel);

// Non-linux version does not support pprof.
#[cfg(not(target_os = "linux"))]
criterion_group!(
name=benches;
config = Criterion::default().significance_level(0.1).sample_size(10);
targets = bench_decode, bench_decode_fsl, bench_decode_str_with_dict_encoding, bench_decode_packed_struct,
bench_decode_compressed, bench_decode_compressed_parallel);
criterion_main!(benches);
91 changes: 91 additions & 0 deletions rust/lance-encoding/benches/encoder.rs
@@ -0,0 +1,91 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{collections::HashMap, sync::Arc};

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};
use lance_encoding::{
encoder::{default_encoding_strategy, encode_batch, EncodingOptions},
version::LanceFileVersion,
};

fn bench_encode_compressed(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut group = c.benchmark_group("encode_compressed");

const NUM_ROWS: usize = 5_000_000;
const NUM_COLUMNS: usize = 10;

// Generate high-cardinality but compressible string data
// (unique values avoid dictionary encoding; the repeated prefix compresses well)
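// Each value is roughly 29-35 bytes (a 28-byte prefix plus up to 7 digits), so a
// column holds on the order of 150 MB of string data before compression.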
let array: Arc<dyn arrow_array::Array> = Arc::new(arrow_array::StringArray::from_iter_values(
(0..NUM_ROWS).map(|i| format!("prefix_that_compresses_well_{}", i)),
));

for compression in ["zstd", "lz4"] {
let mut metadata = HashMap::new();
metadata.insert(
"lance-encoding:compression".to_string(),
compression.to_string(),
);
// Disable dictionary encoding to ensure we hit the compression path
metadata.insert(
"lance-encoding:dict-divisor".to_string(),
"100000".to_string(),
);
// Force miniblock encoding (the path that benefits from compressor caching)
metadata.insert(
"lance-encoding:structural-encoding".to_string(),
"miniblock".to_string(),
);
let fields: Vec<Field> = (0..NUM_COLUMNS)
.map(|i| {
Field::new(format!("s{}", i), DataType::Utf8, false).with_metadata(metadata.clone())
})
.collect();
let columns: Vec<Arc<dyn arrow_array::Array>> =
(0..NUM_COLUMNS).map(|_| array.clone()).collect();
let schema = Arc::new(Schema::new(fields));
let data = RecordBatch::try_new(schema.clone(), columns).unwrap();

let lance_schema =
Arc::new(lance_core::datatypes::Schema::try_from(schema.as_ref()).unwrap());
// V2_2+ required for general compression
let encoding_strategy = default_encoding_strategy(LanceFileVersion::V2_2);

group.throughput(criterion::Throughput::Elements(
(NUM_ROWS * NUM_COLUMNS) as u64,
));
group.bench_function(
format!("{}_strings_{}cols", compression, NUM_COLUMNS),
|b| {
b.iter(|| {
rt.block_on(encode_batch(
&data,
lance_schema.clone(),
encoding_strategy.as_ref(),
&EncodingOptions::default(),
))
.unwrap()
})
},
);
}
}
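
// This file is registered as the `encoder` bench target in Cargo.toml;
// run it with `cargo bench --bench encoder`.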

#[cfg(target_os = "linux")]
criterion_group!(
name=benches;
config = Criterion::default().significance_level(0.1).sample_size(10)
.with_profiler(pprof::criterion::PProfProfiler::new(100, pprof::criterion::Output::Flamegraph(None)));
targets = bench_encode_compressed);

#[cfg(not(target_os = "linux"))]
criterion_group!(
name=benches;
config = Criterion::default().significance_level(0.1).sample_size(10);
targets = bench_encode_compressed);

criterion_main!(benches);