diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index b81e46667323e..243ee689cb5f2 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -152,12 +152,11 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec { } override def compressedInputStream(s: InputStream): InputStream = { - val disableConcatenationOfByteStream = false - new LZ4BlockInputStream( - s, - lz4Factory.fastDecompressor(), - xxHashFactory.newStreamingHash32(defaultSeed).asChecksum, - disableConcatenationOfByteStream) + LZ4BlockInputStream.newBuilder() + .withDecompressor(lz4Factory.safeDecompressor()) + .withChecksum(xxHashFactory.newStreamingHash32(defaultSeed).asChecksum) + .withStopOnEmptyBlock(false) + .build(s) } }