From 67d63f798f02e498d18037270623547c6416af72 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 12 Dec 2025 16:16:59 +0800 Subject: [PATCH 1/5] safeDecompressor --- .../scala/org/apache/spark/io/CompressionCodec.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index b81e46667323e..2fa05d9204953 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -153,11 +153,11 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec { override def compressedInputStream(s: InputStream): InputStream = { val disableConcatenationOfByteStream = false - new LZ4BlockInputStream( - s, - lz4Factory.fastDecompressor(), - xxHashFactory.newStreamingHash32(defaultSeed).asChecksum, - disableConcatenationOfByteStream) + LZ4BlockInputStream.newBuilder() + .withDecompressor(lz4Factory.safeDecompressor()) + .withChecksum(xxHashFactory.newStreamingHash32(defaultSeed).asChecksum) + .withStopOnEmptyBlock(disableConcatenationOfByteStream) + .build(s) } } From 73693263a89f1814feeba93679628fc9e4e0cb6d Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 17 Dec 2025 15:08:19 +0800 Subject: [PATCH 2/5] Switch to LZ4 safeDecompressor in Statistics --- .../spark/sql/catalyst/plans/logical/Statistics.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala index a2850a0b179f7..d88bc62d0bc18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala @@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da import java.math.{MathContext, RoundingMode} import java.util.Base64 -import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} +import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory} import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat import org.apache.spark.sql.catalyst.expressions._ @@ -210,7 +210,10 @@ object HistogramSerializer { final def deserialize(str: String): Histogram = { val bytes = Base64.getDecoder().decode(str) val bis = new ByteArrayInputStream(bytes) - val ins = new DataInputStream(new LZ4BlockInputStream(bis)) + val ins = new DataInputStream( + LZ4BlockInputStream.newBuilder() + .withDecompressor(LZ4Factory.fastestInstance().safeDecompressor()) + .build(bis)) val height = ins.readDouble() val numBins = ins.readInt() From def6e39d5160c0d9074f4ccf37deddd2ec2a2196 Mon Sep 17 00:00:00 2001 From: pan3793 Date: Wed, 17 Dec 2025 07:53:35 +0000 Subject: [PATCH 3/5] Benchmark results for org.apache.spark.io.LZ4TPCDSDataBenchmark (JDK 17, Scala 2.13, split 1 of 1) --- core/benchmarks/LZ4TPCDSDataBenchmark-results.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt b/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt index 7a5c89955eeb0..8d909dfc7f492 100644 --- a/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt +++ b/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt @@ -6,12 +6,12 @@ OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.14.0-1017-azure AMD EPYC 7763 64-Core Processor Compression: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Compression 4 times 2605 2611 8 0.0 651243236.8 1.0X +Compression 4 times 2602 2609 10 0.0 650438397.0 1.0X OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.14.0-1017-azure AMD EPYC 7763 64-Core Processor Decompression: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Decompression 4 times 2361 2367 10 0.0 590134148.0 1.0X +Decompression 4 times 938 966 33 0.0 234424499.5 1.0X From c0515084493fdb5752dd67e98cf898b5f40a9d2a Mon Sep 17 00:00:00 2001 From: pan3793 Date: Wed, 17 Dec 2025 07:45:52 +0000 Subject: [PATCH 4/5] Benchmark results for org.apache.spark.io.LZ4TPCDSDataBenchmark (JDK 21, Scala 2.13, split 1 of 1) --- core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt b/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt index 578f710b1da5e..a066ced506b57 100644 --- a/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt +++ b/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt @@ -6,12 +6,12 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.14.0-1017-azure AMD EPYC 7763 64-Core Processor Compression: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Compression 4 times 2612 2624 17 0.0 652960433.5 1.0X +Compression 4 times 2611 2619 11 0.0 652760335.3 1.0X OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.14.0-1017-azure AMD EPYC 7763 64-Core Processor Decompression: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Decompression 4 times 2219 2220 1 0.0 554762743.5 1.0X +Decompression 4 times 896 912 20 0.0 224050201.0 1.0X From 0d1bde084f1287d8c6acf84ede96fc3a7ee85f41 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 26 Feb 2026 19:33:38 +0800 Subject: [PATCH 5/5] inline --- core/src/main/scala/org/apache/spark/io/CompressionCodec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 2fa05d9204953..243ee689cb5f2 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -152,11 +152,10 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec { } override def compressedInputStream(s: InputStream): InputStream = { - val disableConcatenationOfByteStream = false LZ4BlockInputStream.newBuilder() .withDecompressor(lz4Factory.safeDecompressor()) .withChecksum(xxHashFactory.newStreamingHash32(defaultSeed).asChecksum) - .withStopOnEmptyBlock(disableConcatenationOfByteStream) + .withStopOnEmptyBlock(false) .build(s) } }