diff --git a/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt b/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt index 578f710b1da5e..a066ced506b57 100644 --- a/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt +++ b/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt @@ -6,12 +6,12 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.14.0-1017-azure AMD EPYC 7763 64-Core Processor Compression: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Compression 4 times 2612 2624 17 0.0 652960433.5 1.0X +Compression 4 times 2611 2619 11 0.0 652760335.3 1.0X OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.14.0-1017-azure AMD EPYC 7763 64-Core Processor Decompression: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Decompression 4 times 2219 2220 1 0.0 554762743.5 1.0X +Decompression 4 times 896 912 20 0.0 224050201.0 1.0X diff --git a/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt b/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt index 7a5c89955eeb0..8d909dfc7f492 100644 --- a/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt +++ b/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt @@ -6,12 +6,12 @@ OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.14.0-1017-azure AMD EPYC 7763 64-Core Processor Compression: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Compression 4 times 2605 2611 8 0.0 651243236.8 1.0X +Compression 4 times 2602 2609 10 0.0 650438397.0 1.0X OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.14.0-1017-azure AMD EPYC 7763 64-Core Processor Decompression: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Decompression 4 times 2361 2367 10 0.0 590134148.0 1.0X +Decompression 4 times 938 966 33 0.0 234424499.5 1.0X diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index b81e46667323e..243ee689cb5f2 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -152,12 +152,11 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec { } override def compressedInputStream(s: InputStream): InputStream = { - val disableConcatenationOfByteStream = false - new LZ4BlockInputStream( - s, - lz4Factory.fastDecompressor(), - xxHashFactory.newStreamingHash32(defaultSeed).asChecksum, - disableConcatenationOfByteStream) + LZ4BlockInputStream.newBuilder() + .withDecompressor(lz4Factory.safeDecompressor()) + .withChecksum(xxHashFactory.newStreamingHash32(defaultSeed).asChecksum) + .withStopOnEmptyBlock(false) + .build(s) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala index a2850a0b179f7..d88bc62d0bc18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala @@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da import java.math.{MathContext, RoundingMode} import java.util.Base64 -import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} +import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory} import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat import org.apache.spark.sql.catalyst.expressions._ @@ -210,7 +210,10 @@ object HistogramSerializer { final def deserialize(str: String): Histogram = { val bytes = Base64.getDecoder().decode(str) val bis = new ByteArrayInputStream(bytes) - val ins = new DataInputStream(new LZ4BlockInputStream(bis)) + val ins = new DataInputStream( + LZ4BlockInputStream.newBuilder() + .withDecompressor(LZ4Factory.fastestInstance().safeDecompressor()) + .build(bis)) val height = ins.readDouble() val numBins = ins.readInt()