From 87aadcd413b369724250be7e8bd0418c765d27a6 Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Wed, 25 Feb 2026 15:59:05 -0800
Subject: [PATCH] fix: use LZ4 safeDecompressor via Builder API in
 LZ4CompressionCodec

---
 .../scala/org/apache/spark/io/CompressionCodec.scala  | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index b81e46667323e..243ee689cb5f2 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -152,12 +152,11 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
   }
 
   override def compressedInputStream(s: InputStream): InputStream = {
-    val disableConcatenationOfByteStream = false
-    new LZ4BlockInputStream(
-      s,
-      lz4Factory.fastDecompressor(),
-      xxHashFactory.newStreamingHash32(defaultSeed).asChecksum,
-      disableConcatenationOfByteStream)
+    LZ4BlockInputStream.newBuilder()
+      .withDecompressor(lz4Factory.safeDecompressor())
+      .withChecksum(xxHashFactory.newStreamingHash32(defaultSeed).asChecksum)
+      .withStopOnEmptyBlock(false)
+      .build(s)
   }
 }
 