From b179459bc6de43419d3652bac9d2f3208ddc7cdd Mon Sep 17 00:00:00 2001 From: ZhongYujiang <42907416+zhongyujiang@users.noreply.github.com> Date: Mon, 8 Aug 2022 10:33:01 +0800 Subject: [PATCH 1/2] PARQUET-2160: Close ZstdInputStream to free off-heap memory in time. --- .../main/java/org/apache/parquet/hadoop/CodecFactory.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index f0e7af35c5..49bdde8a97 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -37,6 +37,7 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.hadoop.codec.ZstandardCodec; import org.apache.parquet.hadoop.metadata.CompressionCodecName; public class CodecFactory implements CompressionCodecFactory { @@ -109,7 +110,12 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx decompressor.reset(); } InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); - decompressed = BytesInput.from(is, uncompressedSize); + if (codec instanceof ZstandardCodec) { + decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize)); + is.close(); + } else { + decompressed = BytesInput.from(is, uncompressedSize); + } } else { decompressed = bytes; } From b813ca72d117a5402c852aef9fdef1983677f910 Mon Sep 17 00:00:00 2001 From: ZhongYujiang <42907416+zhongyujiang@users.noreply.github.com> Date: Mon, 15 Aug 2022 13:07:47 +0800 Subject: [PATCH 2/2] Add comment. 
--- .../main/java/org/apache/parquet/hadoop/CodecFactory.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index 49bdde8a97..1998ea09dc 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -110,6 +110,11 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx decompressor.reset(); } InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); + + // We need to explicitly close the ZstdDecompressorStream here to release the resources it holds and avoid an + // off-heap memory fragmentation issue; see https://issues.apache.org/jira/browse/PARQUET-2160. + // Closing the stream requires copying the decompressed bytes onto the heap a little earlier than before. + // Because the problem only occurs with the ZSTD codec, this change is applied to ZSTD streams only. if (codec instanceof ZstandardCodec) { decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize)); is.close();