From 06fcb5ec0372164002645172de8908027caf3f2c Mon Sep 17 00:00:00 2001 From: zebsme Date: Thu, 27 Mar 2025 15:42:21 +0800 Subject: [PATCH 1/8] add skip_validation and comment --- Cargo.lock | 75 +++++++++++++---------- Cargo.toml | 2 +- datafusion/physical-plan/src/spill/mod.rs | 5 +- 3 files changed, 46 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3e7064c9a0470..e71c1d1ff1e69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -246,9 +246,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc208515aa0151028e464cc94a692156e945ce5126abd3537bb7fd6ba2143ed1" +checksum = "84ef243634a39fb6e9d1710737e7a5ef96c9bacabd2326859ff889bc9ef755e5" dependencies = [ "arrow-arith", "arrow-array", @@ -270,9 +270,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e07e726e2b3f7816a85c6a45b6ec118eeeabf0b2a8c208122ad949437181f49a" +checksum = "8f420c6aef51dad2e4a96ce29c0ec90ad84880bdb60b321c74c652a6be07b93f" dependencies = [ "arrow-array", "arrow-buffer", @@ -284,9 +284,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2262eba4f16c78496adfd559a29fe4b24df6088efc9985a873d58e92be022d5" +checksum = "24bda5ff6461a4ff9739959b3d57b377f45e3f878f7be1a4f28137c0a8f339fa" dependencies = [ "ahash 0.8.11", "arrow-buffer", @@ -301,9 +301,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e899dade2c3b7f5642eb8366cfd898958bcca099cde6dfea543c7e8d3ad88d4" +checksum = "bc6ed265c73f134a583d02c3cab5e16afab9446d8048ede8707e31f85fad58a0" dependencies = [ "bytes", "half", @@ -312,9 +312,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4103d88c5b441525ed4ac23153be7458494c2b0c9a11115848fdb9b81f6f886a" +checksum = "01c648572391edcef10e5fd458db70ba27ed6f71bcaee04397d0cfb100b34f8b" dependencies = [ "arrow-array", "arrow-buffer", @@ -333,9 +333,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43d3cb0914486a3cae19a5cad2598e44e225d53157926d0ada03c20521191a65" +checksum = "a02fb265a6d8011a7d3ad1a36f25816ad0a3bb04cb8e9fe7929c165b98c0cbcd" dependencies = [ "arrow-array", "arrow-cast", @@ -349,9 +349,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a329fb064477c9ec5f0870d2f5130966f91055c7c5bce2b3a084f116bc28c3b" +checksum = "5f2cebf504bb6a92a134a87fff98f01b14fbb3a93ecf7aef90cd0f888c5fffa4" dependencies = [ "arrow-buffer", "arrow-schema", @@ -388,9 +388,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddecdeab02491b1ce88885986e25002a3da34dd349f682c7cfe67bab7cc17b86" +checksum = "8e6405b287671c88846e7751f7291f717b164911474cabac6d3d8614d5aa7374" dependencies = [ "arrow-array", "arrow-buffer", @@ -402,9 +402,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d03b9340013413eb84868682ace00a1098c81a5ebc96d279f7ebf9a4cac3c0fd" +checksum = "5329bf9e7390cbb6b117ddd4d82e94c5362ea4cab5095697139429f36a38350c" dependencies = [ "arrow-array", "arrow-buffer", @@ -415,16 +415,18 @@ dependencies = [ "half", "indexmap 2.8.0", "lexical-core", + "memchr", "num", "serde", "serde_json", + "simdutf8", ] [[package]] name = "arrow-ord" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f841bfcc1997ef6ac48ee0305c4dfceb1f7c786fe31e67c1186edf775e1f1160" +checksum = "e103c13d4b80da28339c1d7aa23dd85bd59f42158acc45d39eeb6770627909ce" dependencies = [ "arrow-array", "arrow-buffer", @@ -435,9 +437,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1eeb55b0a0a83851aa01f2ca5ee5648f607e8506ba6802577afdda9d75cdedcd" +checksum = "170549a11b8534f3097a0619cfe89c42812345dc998bcf81128fc700b84345b8" dependencies = [ "arrow-array", "arrow-buffer", @@ -448,9 +450,9 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85934a9d0261e0fa5d4e2a5295107d743b543a6e0484a835d4b8db2da15306f9" +checksum = "a5c53775bba63f319189f366d2b86e9a8889373eb198f07d8544938fc9f8ed9a" dependencies = [ "bitflags 2.8.0", "serde", @@ -458,9 +460,9 @@ dependencies = [ [[package]] name = "arrow-select" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e2932aece2d0c869dd2125feb9bd1709ef5c445daa3838ac4112dcfa0fda52c" +checksum = "0a99003b2eb562b8d9c99dfb672306f15e94b20d3734179d596895703e821dcf" dependencies = [ "ahash 0.8.11", "arrow-array", @@ -472,9 +474,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "912e38bd6a7a7714c1d9b61df80315685553b7455e8a6045c27531d8ecd5b458" +checksum = "90fdb130ee8325f4cd8262e19bb6baa3cbcef2b2573c4bee8c6fda7ea08199d7" dependencies = [ "arrow-array", "arrow-buffer", @@ -1347,9 +1349,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.39" +version = "0.4.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" +checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1357,7 +1359,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.6", + "windows-link", ] [[package]] @@ -4346,9 +4348,9 @@ dependencies = [ [[package]] name = "parquet" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f88838dca3b84d41444a0341b19f347e8098a3898b0f21536654b8b799e11abd" +checksum = "94243778210509a5a5e9e012872127180c155d73a9cd6e2df9243d213e81e100" dependencies = [ "ahash 0.8.11", "arrow-array", @@ -4378,7 +4380,6 @@ dependencies = [ "tokio", "twox-hash", "zstd", - "zstd-sys", ] [[package]] @@ -7108,6 +7109,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "windows-link" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" + [[package]] name = "windows-registry" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index b6164f89d31e8..fc2439da914be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,7 @@ ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } apache-avro = { version = "0.17", default-features = false } -arrow = { version = "54.2.1", features = [ +arrow = { version = "54.3.0", features = [ "prettyprint", "chrono-tz", ] } diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 88bf7953daeb4..e9a36990a452e 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -35,7 +35,10 @@ use datafusion_common::{exec_datafusion_err, HashSet, Result}; fn read_spill(sender: Sender>, path: &Path) -> Result<()> { let file = BufReader::new(File::open(path)?); - let reader = StreamReader::try_new(file, None)?; + // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications + // with validated schemas and buffers. Skip redundant validation during read + // to speedup read operation. This is a deliberate safety-performance tradeoff. + let reader = unsafe { StreamReader::try_new(file, None)?.with_skip_validation(true) }; for batch in reader { sender .blocking_send(batch.map_err(Into::into)) From 46f967ed7f19078b03d5f5f09785d85e45297e5b Mon Sep 17 00:00:00 2001 From: zebsme Date: Thu, 27 Mar 2025 15:56:28 +0800 Subject: [PATCH 2/8] add benchmark --- Cargo.lock | 1 + datafusion/physical-plan/Cargo.toml | 5 + .../physical-plan/benches/read_spill.rs | 152 ++++++++++++++++++ 3 files changed, 158 insertions(+) create mode 100644 datafusion/physical-plan/benches/read_spill.rs diff --git a/Cargo.lock b/Cargo.lock index e71c1d1ff1e69..80e69b7c290a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2476,6 +2476,7 @@ dependencies = [ "rand 0.8.5", "rstest", "rstest_reuse", + "tempfile", "tokio", ] diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 1f38e2ed31263..58e07ccdf524b 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -77,7 +77,12 @@ tokio = { workspace = true, features = [ "fs", "parking_lot", ] } +tempfile = "3.19.1" [[bench]] harness = false name = "partial_ordering" + +[[bench]] +harness = false +name = "read_spill" diff --git a/datafusion/physical-plan/benches/read_spill.rs b/datafusion/physical-plan/benches/read_spill.rs new file mode 100644 index 0000000000000..6b8ed7b4ea15c --- /dev/null +++ b/datafusion/physical-plan/benches/read_spill.rs @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + Date32Builder, Decimal128Builder, Int32Builder, RecordBatch, StringBuilder, +}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::ipc::reader::StreamReader; +use arrow::ipc::writer::StreamWriter; +use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion}; +use datafusion_common::{exec_datafusion_err, Result}; +use std::fs::File; +use std::path::Path; +use std::sync::mpsc::{channel, Sender}; +use std::sync::Arc; +use std::thread; +use tempfile::NamedTempFile; + +fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("c0", DataType::Int32, true), + Field::new("c1", DataType::Utf8, true), + Field::new("c2", DataType::Date32, true), + Field::new("c3", DataType::Decimal128(11, 2), true), + ])); + + let mut a = Int32Builder::new(); + let mut b = StringBuilder::new(); + let mut c = Date32Builder::new(); + let mut d = Decimal128Builder::new() + .with_precision_and_scale(11, 2) + .unwrap(); + + for i in 0..num_rows { + a.append_value(i as i32); + c.append_value(i as i32); + d.append_value((i * 1000000) as i128); + if allow_nulls && i % 10 == 0 { + b.append_null(); + } else { + b.append_value(format!("this is string number {i}")); + } + } + + let a = a.finish(); + let b = b.finish(); + let c = c.finish(); + let d = d.finish(); + + RecordBatch::try_new( + schema.clone(), + vec![Arc::new(a), Arc::new(b), Arc::new(c), Arc::new(d)], + ) + .unwrap() +} + +/// Return a temporary file that contains an IPC file with 100 [`RecordBatch`]es +fn ipc_file() -> NamedTempFile { + let mut file = NamedTempFile::new().unwrap(); + let batch = create_batch(8192, true); + let mut writer = + StreamWriter::try_new(file.as_file_mut(), batch.schema().as_ref()).unwrap(); + for _ in 0..100 { + writer.write(&batch).unwrap(); + } + writer.finish().unwrap(); + file +} + +fn bench_spill_read(c: &mut Criterion) { + let file = ipc_file(); + let path = file.path(); + + let mut group = c.benchmark_group("read_spill"); + + let benches = &[ + ("StreamReader/with_validation", read_spill as fn(_, _) -> _), + ("StreamReader/skip_validation", read_spill_skip_validation), + ]; + + for &(name, func) in benches { + group.bench_with_input(BenchmarkId::new(name, ""), &path, |b, path| { + b.iter_batched( + // Setup phase: Create fresh state for each benchmark iteration. + // - A channel to send/receive RecordBatch results between threads. + // - A background thread to consume received RecordBatches. + // This ensures each iteration starts with clean resources. + || { + let (sender, receiver) = + channel::>(); + let join_handle = thread::spawn(move || { + while let Ok(batch) = receiver.recv() { + let _ = batch.unwrap(); + } + }); + (sender, join_handle) + }, + // Benchmark phase: + // - Execute the target function to send batches via the channel + // - Wait for the consumer thread to finish processing + |(sender, join_handle)| { + func(sender, path).unwrap(); + join_handle.join().unwrap(); + }, + BatchSize::LargeInput, + ) + }); + } + + group.finish(); +} +criterion_group!(benches, bench_spill_read); +criterion_main!(benches); + +fn read_spill(sender: Sender>, path: &Path) -> Result<()> { + let file = File::open(path)?; + let reader = StreamReader::try_new(file, None)?; + for batch in reader { + sender + .send(batch.map_err(Into::into)) + .map_err(|e| exec_datafusion_err!("{e}"))?; + } + Ok(()) +} + +fn read_spill_skip_validation( + sender: Sender>, + path: &Path, +) -> Result<()> { + let file = File::open(path)?; + let reader = unsafe { StreamReader::try_new(file, None)?.with_skip_validation(true) }; + for batch in reader { + sender + .send(batch.map_err(Into::into)) + .map_err(|e| exec_datafusion_err!("{e}"))?; + } + Ok(()) +} From b9226fbae2ff270569d378c00e6dc532a7211710 Mon Sep 17 00:00:00 2001 From: zebsme Date: Thu, 27 Mar 2025 16:34:28 +0800 Subject: [PATCH 3/8] fmt --- datafusion/physical-plan/benches/read_spill.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/benches/read_spill.rs b/datafusion/physical-plan/benches/read_spill.rs index 6b8ed7b4ea15c..da39ae0d64076 100644 --- a/datafusion/physical-plan/benches/read_spill.rs +++ b/datafusion/physical-plan/benches/read_spill.rs @@ -88,8 +88,14 @@ fn bench_spill_read(c: &mut Criterion) { let mut group = c.benchmark_group("read_spill"); let benches = &[ - ("StreamReader/with_validation", read_spill as fn(_, _) -> _), - ("StreamReader/skip_validation", read_spill_skip_validation), + ( + "StreamReader/read_100/with_validation", + read_spill as fn(_, _) -> _, + ), + ( + "StreamReader/read_100/skip_validation", + read_spill_skip_validation, + ), ]; for &(name, func) in benches { @@ -100,8 +106,7 @@ fn bench_spill_read(c: &mut Criterion) { // - A background thread to consume received RecordBatches. // This ensures each iteration starts with clean resources. || { - let (sender, receiver) = - channel::>(); + let (sender, receiver) = channel::>(); let join_handle = thread::spawn(move || { while let Ok(batch) = receiver.recv() { let _ = batch.unwrap(); From ca20b794f1bd32748c28750737ae9ead07b28fb9 Mon Sep 17 00:00:00 2001 From: zebsme Date: Thu, 27 Mar 2025 18:01:59 +0800 Subject: [PATCH 4/8] update sqllogictests and fmt Cargo.toml --- datafusion/physical-plan/Cargo.toml | 2 +- datafusion/sqllogictest/test_files/dates.slt | 2 +- datafusion/sqllogictest/test_files/timestamps.slt | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 58e07ccdf524b..5161539caf76f 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -72,12 +72,12 @@ insta = { workspace = true } rand = { workspace = true } rstest = { workspace = true } rstest_reuse = "0.7.0" +tempfile = "3.19.1" tokio = { workspace = true, features = [ "rt-multi-thread", "fs", "parking_lot", ] } -tempfile = "3.19.1" [[bench]] harness = false diff --git a/datafusion/sqllogictest/test_files/dates.slt b/datafusion/sqllogictest/test_files/dates.slt index 4425eee333735..148f0dfe64bb7 100644 --- a/datafusion/sqllogictest/test_files/dates.slt +++ b/datafusion/sqllogictest/test_files/dates.slt @@ -183,7 +183,7 @@ query error input contains invalid characters SELECT to_date('2020-09-08 12/00/00+00:00', '%c', '%+') # to_date with broken formatting -query error bad or unsupported format string +query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input SELECT to_date('2020-09-08 12/00/00+00:00', '%q') statement ok diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index dcbcfbfa439d5..e3f8d2e4c8bb2 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -2241,23 +2241,23 @@ query error input contains invalid characters SELECT to_timestamp_seconds('2020-09-08 12/00/00+00:00', '%c', '%+') # to_timestamp with broken formatting -query error bad or unsupported format string +query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input SELECT to_timestamp('2020-09-08 12/00/00+00:00', '%q') # to_timestamp_nanos with broken formatting -query error bad or unsupported format string +query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input SELECT to_timestamp_nanos('2020-09-08 12/00/00+00:00', '%q') # to_timestamp_millis with broken formatting -query error bad or unsupported format string +query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input SELECT to_timestamp_millis('2020-09-08 12/00/00+00:00', '%q') # to_timestamp_micros with broken formatting -query error bad or unsupported format string +query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input SELECT to_timestamp_micros('2020-09-08 12/00/00+00:00', '%q') # to_timestamp_seconds with broken formatting -query error bad or unsupported format string +query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input SELECT to_timestamp_seconds('2020-09-08 12/00/00+00:00', '%q') # Create string timestamp table with different formats From 8e02b9f51c7db2713f79a4f9ae970de6d71d3385 Mon Sep 17 00:00:00 2001 From: zebsme Date: Fri, 28 Mar 2025 14:38:47 +0800 Subject: [PATCH 5/8] use SpillManager in benchmark --- datafusion/physical-plan/Cargo.toml | 2 +- .../physical-plan/benches/read_spill.rs | 157 ------------------ datafusion/physical-plan/benches/spill_io.rs | 124 ++++++++++++++ datafusion/physical-plan/src/lib.rs | 1 + 4 files changed, 126 insertions(+), 158 deletions(-) delete mode 100644 datafusion/physical-plan/benches/read_spill.rs create mode 100644 datafusion/physical-plan/benches/spill_io.rs diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 5161539caf76f..5210ee26755c9 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -85,4 +85,4 @@ name = "partial_ordering" [[bench]] harness = false -name = "read_spill" +name = "spill_io" diff --git a/datafusion/physical-plan/benches/read_spill.rs b/datafusion/physical-plan/benches/read_spill.rs deleted file mode 100644 index da39ae0d64076..0000000000000 --- a/datafusion/physical-plan/benches/read_spill.rs +++ /dev/null @@ -1,157 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::array::{ - Date32Builder, Decimal128Builder, Int32Builder, RecordBatch, StringBuilder, -}; -use arrow::datatypes::{DataType, Field, Schema}; -use arrow::ipc::reader::StreamReader; -use arrow::ipc::writer::StreamWriter; -use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion}; -use datafusion_common::{exec_datafusion_err, Result}; -use std::fs::File; -use std::path::Path; -use std::sync::mpsc::{channel, Sender}; -use std::sync::Arc; -use std::thread; -use tempfile::NamedTempFile; - -fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch { - let schema = Arc::new(Schema::new(vec![ - Field::new("c0", DataType::Int32, true), - Field::new("c1", DataType::Utf8, true), - Field::new("c2", DataType::Date32, true), - Field::new("c3", DataType::Decimal128(11, 2), true), - ])); - - let mut a = Int32Builder::new(); - let mut b = StringBuilder::new(); - let mut c = Date32Builder::new(); - let mut d = Decimal128Builder::new() - .with_precision_and_scale(11, 2) - .unwrap(); - - for i in 0..num_rows { - a.append_value(i as i32); - c.append_value(i as i32); - d.append_value((i * 1000000) as i128); - if allow_nulls && i % 10 == 0 { - b.append_null(); - } else { - b.append_value(format!("this is string number {i}")); - } - } - - let a = a.finish(); - let b = b.finish(); - let c = c.finish(); - let d = d.finish(); - - RecordBatch::try_new( - schema.clone(), - vec![Arc::new(a), Arc::new(b), Arc::new(c), Arc::new(d)], - ) - .unwrap() -} - -/// Return a temporary file that contains an IPC file with 100 [`RecordBatch`]es -fn ipc_file() -> NamedTempFile { - let mut file = NamedTempFile::new().unwrap(); - let batch = create_batch(8192, true); - let mut writer = - StreamWriter::try_new(file.as_file_mut(), batch.schema().as_ref()).unwrap(); - for _ in 0..100 { - writer.write(&batch).unwrap(); - } - writer.finish().unwrap(); - file -} - -fn bench_spill_read(c: &mut Criterion) { - let file = ipc_file(); - let path = file.path(); - - let mut group = c.benchmark_group("read_spill"); - - let benches = &[ - ( - "StreamReader/read_100/with_validation", - read_spill as fn(_, _) -> _, - ), - ( - "StreamReader/read_100/skip_validation", - read_spill_skip_validation, - ), - ]; - - for &(name, func) in benches { - group.bench_with_input(BenchmarkId::new(name, ""), &path, |b, path| { - b.iter_batched( - // Setup phase: Create fresh state for each benchmark iteration. - // - A channel to send/receive RecordBatch results between threads. - // - A background thread to consume received RecordBatches. - // This ensures each iteration starts with clean resources. - || { - let (sender, receiver) = channel::>(); - let join_handle = thread::spawn(move || { - while let Ok(batch) = receiver.recv() { - let _ = batch.unwrap(); - } - }); - (sender, join_handle) - }, - // Benchmark phase: - // - Execute the target function to send batches via the channel - // - Wait for the consumer thread to finish processing - |(sender, join_handle)| { - func(sender, path).unwrap(); - join_handle.join().unwrap(); - }, - BatchSize::LargeInput, - ) - }); - } - - group.finish(); -} -criterion_group!(benches, bench_spill_read); -criterion_main!(benches); - -fn read_spill(sender: Sender>, path: &Path) -> Result<()> { - let file = File::open(path)?; - let reader = StreamReader::try_new(file, None)?; - for batch in reader { - sender - .send(batch.map_err(Into::into)) - .map_err(|e| exec_datafusion_err!("{e}"))?; - } - Ok(()) -} - -fn read_spill_skip_validation( - sender: Sender>, - path: &Path, -) -> Result<()> { - let file = File::open(path)?; - let reader = unsafe { StreamReader::try_new(file, None)?.with_skip_validation(true) }; - for batch in reader { - sender - .send(batch.map_err(Into::into)) - .map_err(|e| exec_datafusion_err!("{e}"))?; - } - Ok(()) -} diff --git a/datafusion/physical-plan/benches/spill_io.rs b/datafusion/physical-plan/benches/spill_io.rs new file mode 100644 index 0000000000000..86dc1f6843b40 --- /dev/null +++ b/datafusion/physical-plan/benches/spill_io.rs @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + Date32Builder, Decimal128Builder, Int32Builder, RecordBatch, StringBuilder, +}; +use arrow::datatypes::{DataType, Field, Schema}; +use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion}; +use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_physical_plan::common::collect; +use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics}; +use datafusion_physical_plan::SpillManager; +use std::sync::Arc; +use tokio::runtime::Runtime; + +pub fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("c0", DataType::Int32, true), + Field::new("c1", DataType::Utf8, true), + Field::new("c2", DataType::Date32, true), + Field::new("c3", DataType::Decimal128(11, 2), true), + ])); + + let mut a = Int32Builder::new(); + let mut b = StringBuilder::new(); + let mut c = Date32Builder::new(); + let mut d = Decimal128Builder::new() + .with_precision_and_scale(11, 2) + .unwrap(); + + for i in 0..num_rows { + a.append_value(i as i32); + c.append_value(i as i32); + d.append_value((i * 1000000) as i128); + if allow_nulls && i % 10 == 0 { + b.append_null(); + } else { + b.append_value(format!("this is string number {i}")); + } + } + + let a = a.finish(); + let b = b.finish(); + let c = c.finish(); + let d = d.finish(); + + RecordBatch::try_new( + schema.clone(), + vec![Arc::new(a), Arc::new(b), Arc::new(c), Arc::new(d)], + ) + .unwrap() +} + +// BENCHMARK: REVALIDATION OVERHEAD COMPARISON +// --------------------------------------------------------- +// To compare performance with/without Arrow IPC validation: +// +// 1. Locate the function `read_spill` +// 2. Modify the `skip_validation` flag: +// - Set to `false` to enable validation +// 3. Rerun `cargo bench --bench spill_io` +fn bench_spill_io(c: &mut Criterion) { + let env = Arc::new(RuntimeEnv::default()); + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let schema = Arc::new(Schema::new(vec![ + Field::new("c0", DataType::Int32, true), + Field::new("c1", DataType::Utf8, true), + Field::new("c2", DataType::Date32, true), + Field::new("c3", DataType::Decimal128(11, 2), true), + ])); + let spill_manager = SpillManager::new(env, metrics, schema); + + let mut group = c.benchmark_group("spill_manager"); + let rt = Runtime::new().unwrap(); + + group.bench_with_input( + BenchmarkId::new("StreamReader/read_100", ""), + &spill_manager, + |b, spill_manager| { + b.iter_batched( + // Setup phase: Create fresh state for each benchmark iteration. + // - generate an ipc file. + // This ensures each iteration starts with clean resources. + || { + let batch = create_batch(8192, true); + let spill_file = spill_manager + .spill_record_batch_and_finish(&vec![batch; 100], "Test") + .unwrap() + .unwrap(); + spill_file + }, + // Benchmark phase: + // - Execute the read operation via SpillManager + // - Wait for the consumer to finish processing + |spill_file| { + rt.block_on(async { + let stream = + spill_manager.read_spill_as_stream(spill_file).unwrap(); + let _ = collect(stream).await.unwrap(); + }) + }, + BatchSize::LargeInput, + ) + }, + ); + group.finish(); +} + +criterion_group!(benches, bench_spill_io); +criterion_main!(benches); diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 04fbd06fabcde..b256e615b2320 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -50,6 +50,7 @@ pub use crate::ordering::InputOrderMode; pub use crate::stream::EmptyRecordBatchStream; pub use crate::topk::TopK; pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; +pub use spill::spill_manager::SpillManager; mod ordering; mod render_tree; From 3ac770e1e85197b302b5e0242286c9e3f60693fd Mon Sep 17 00:00:00 2001 From: zebsme Date: Fri, 28 Mar 2025 15:03:21 +0800 Subject: [PATCH 6/8] fix clippy --- datafusion/physical-plan/benches/spill_io.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/benches/spill_io.rs b/datafusion/physical-plan/benches/spill_io.rs index 86dc1f6843b40..3b877671ad583 100644 --- a/datafusion/physical-plan/benches/spill_io.rs +++ b/datafusion/physical-plan/benches/spill_io.rs @@ -84,7 +84,7 @@ fn bench_spill_io(c: &mut Criterion) { ])); let spill_manager = SpillManager::new(env, metrics, schema); - let mut group = c.benchmark_group("spill_manager"); + let mut group = c.benchmark_group("spill_io"); let rt = Runtime::new().unwrap(); group.bench_with_input( @@ -97,11 +97,10 @@ fn bench_spill_io(c: &mut Criterion) { // This ensures each iteration starts with clean resources. || { let batch = create_batch(8192, true); - let spill_file = spill_manager + spill_manager .spill_record_batch_and_finish(&vec![batch; 100], "Test") .unwrap() - .unwrap(); - spill_file + .unwrap() }, // Benchmark phase: // - Execute the read operation via SpillManager From 0c52e80846ce60bd4b68326ba23322585a62f2f0 Mon Sep 17 00:00:00 2001 From: zeb Date: Mon, 31 Mar 2025 23:51:23 +0800 Subject: [PATCH 7/8] upgrade dependencies --- Cargo.lock | 4 ++-- Cargo.toml | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 80e69b7c290a8..f20116f196c59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -361,9 +361,9 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "54.2.1" +version = "54.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7408f2bf3b978eddda272c7699f439760ebc4ac70feca25fefa82c5b8ce808d" +checksum = "aecab6e2ed46c553354ffbc6cb76ac3eaa61dc07bd13d7de8ece233933bcf679" dependencies = [ "arrow-arith", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index fc2439da914be..1350a4654dbc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,15 +91,15 @@ arrow = { version = "54.3.0", features = [ "prettyprint", "chrono-tz", ] } -arrow-buffer = { version = "54.1.0", default-features = false } -arrow-flight = { version = "54.2.1", features = [ +arrow-buffer = { version = "54.3.0", default-features = false } +arrow-flight = { version = "54.3.0", features = [ "flight-sql-experimental", ] } -arrow-ipc = { version = "54.2.0", default-features = false, features = [ +arrow-ipc = { version = "54.3.0", default-features = false, features = [ "lz4", ] } -arrow-ord = { version = "54.1.0", default-features = false } -arrow-schema = { version = "54.1.0", default-features = false } +arrow-ord = { version = "54.3.0", default-features = false } +arrow-schema = { version = "54.3.0", default-features = false } async-trait = "0.1.88" bigdecimal = "0.4.7" bytes = "1.10" @@ -149,7 +149,7 @@ itertools = "0.14" log = "^0.4" object_store = { version = "0.11.0", default-features = false } parking_lot = "0.12" -parquet = { version = "54.2.1", default-features = false, features = [ +parquet = { version = "54.3.0", default-features = false, features = [ "arrow", "async", "object_store", From 5a30981d4ca1219482c074a5f16bde167e8a3dc2 Mon Sep 17 00:00:00 2001 From: zeb Date: Tue, 1 Apr 2025 16:57:17 +0800 Subject: [PATCH 8/8] Update comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Daniƫl Heres --- datafusion/physical-plan/src/spill/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index e9a36990a452e..c32711a96b975 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -37,7 +37,7 @@ fn read_spill(sender: Sender>, path: &Path) -> Result<()> { let file = BufReader::new(File::open(path)?); // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications // with validated schemas and buffers. Skip redundant validation during read - // to speedup read operation. This is a deliberate safety-performance tradeoff. + // to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written. let reader = unsafe { StreamReader::try_new(file, None)?.with_skip_validation(true) }; for batch in reader { sender