From 1d80fbdce1155399d507c4798718caab7581de2d Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 11 Apr 2021 11:13:12 -0700 Subject: [PATCH] PARQUET-2022: ZstdDecompressorStream should close `zstdInputStream` `ZstdDecompressorStream` should close its resource because `CompressionInputStream.close` closes only the inner stream. ``` public class ZstdDecompressorStream extends CompressionInputStream { private ZstdInputStream zstdInputStream; public ZstdDecompressorStream(InputStream stream) throws IOException { super(stream); zstdInputStream = new ZstdInputStream(stream); } } ``` --- .../parquet/hadoop/codec/ZstdDecompressorStream.java | 9 +++++++++ .../org/apache/parquet/hadoop/TestZstandardCodec.java | 9 ++++----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java index a505e7bde0..a24612f2ee 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdDecompressorStream.java @@ -50,4 +50,13 @@ public int read() throws IOException { public void resetState() throws IOException { // no-opt, doesn't apply to ZSTD } + + @Override + public void close() throws IOException { + try { + zstdInputStream.close(); + } finally { + super.close(); + } + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java index c0d98266f7..b6bab26785 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestZstandardCodec.java @@ -79,8 +79,8 @@ private void testZstd(ZstandardCodec codec, int dataSize) throws IOException { byte[] data = new byte[dataSize]; (new Random()).nextBytes(data); BytesInput compressedData = compress(codec, BytesInput.from(data)); - BytesInput decompressedData = decompress(codec, compressedData, data.length); - Assert.assertArrayEquals(data, decompressedData.toByteArray()); + byte[] decompressedData = decompress(codec, compressedData, data.length); + Assert.assertArrayEquals(data, decompressedData); } private BytesInput compress(ZstandardCodec codec, BytesInput bytes) throws IOException { @@ -91,10 +91,9 @@ private BytesInput compress(ZstandardCodec codec, BytesInput bytes) throws IOExc return BytesInput.from(compressedOutBuffer); } - private BytesInput decompress(ZstandardCodec codec, BytesInput bytes, int uncompressedSize) throws IOException { - BytesInput decompressed; + private byte[] decompress(ZstandardCodec codec, BytesInput bytes, int uncompressedSize) throws IOException { InputStream is = codec.createInputStream(bytes.toInputStream(), null); - decompressed = BytesInput.from(is, uncompressedSize); + byte[] decompressed = BytesInput.from(is, uncompressedSize).toByteArray(); is.close(); return decompressed; }