From b5bcf6807e3066a81071672dc670ebc7322afae8 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Sat, 27 Mar 2021 04:13:31 +0000 Subject: [PATCH 1/5] ARROW-12110: [Java] Implement ZSTD compression --- dev/archery/archery/integration/runner.py | 2 -- docs/source/status.rst | 8 ++++++-- java/compression/pom.xml | 7 ++++--- .../compression/CommonsCompressionFactory.java | 6 +++++- .../arrow/compression/Lz4CompressionCodec.java | 13 ++++--------- .../arrow/compression/TestCompressionCodec.java | 4 ++++ 6 files changed, 23 insertions(+), 17 deletions(-) diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 819f978a862c..23ebd0f01b2e 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -137,8 +137,6 @@ def _gold_tests(self, gold_dir): skip.add("Go") skip.add("JS") skip.add("Rust") - if name == 'zstd': - skip.add("Java") yield datagen.File(name, None, None, skip=skip, path=out_path) def _run_test_cases(self, producer, consumer, case_runner, diff --git a/docs/source/status.rst b/docs/source/status.rst index 2e062122ba75..1c6262274e65 100644 --- a/docs/source/status.rst +++ b/docs/source/status.rst @@ -126,7 +126,7 @@ IPC Format +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | Sparse tensors | ✓ | | | | | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ -| Buffer compression | ✓ | | | | | | ✓ | +| Buffer compression | ✓ | ✓ (3) | | | | | ✓ | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | Endianness conversion | ✓ (2) | | | | | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ @@ -139,6 +139,8 @@ Notes: * \(2) Data with non-native endianness can be byte-swapped automatically when reading. +* \(3) LZ4 Codec currently is quite inefficient. ARROW-11901 tracks improving performance. + .. seealso:: The :ref:`format-ipc` specification. @@ -221,7 +223,7 @@ Third-Party Data Formats +-----------------------------+---------+---------+-------+------------+-------+---------+-------+ | ORC | R | | | | | | | +-----------------------------+---------+---------+-------+------------+-------+---------+-------+ -| Parquet | R/W | | | | | R/W (1) | | +| Parquet | R/W | R (2) | | | | R/W (1) | | +-----------------------------+---------+---------+-------+------------+-------+---------+-------+ Notes: @@ -231,3 +233,5 @@ Notes: * *W* = Write supported * \(1) Nested read/write not supported + +* \(2) Through JNI bindings to datasets. diff --git a/java/compression/pom.xml b/java/compression/pom.xml index 9a6ab3508ed4..dc0a9586539e 100644 --- a/java/compression/pom.xml +++ b/java/compression/pom.xml @@ -44,8 +44,9 @@ 1.20 - io.netty - netty-common - + com.github.luben + zstd-jni + 1.4.9-1 + diff --git a/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java b/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java index 4becbbe78c96..867e9f418b2b 100644 --- a/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java +++ b/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java @@ -21,7 +21,9 @@ import org.apache.arrow.vector.compression.CompressionUtil; /** - * A factory implementation based on Apache Commons library. + * Default implementation of factory supported LZ4 and ZSTD compression. + * + * // TODO(ARROW-12115): Rename this class. */ public class CommonsCompressionFactory implements CompressionCodec.Factory { @@ -32,6 +34,8 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType) { switch (codecType) { case LZ4_FRAME: return new Lz4CompressionCodec(); + case ZSTD: + return new ZstdCompressionCodec(); default: throw new IllegalArgumentException("Compression type not supported: " + codecType); } diff --git a/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java b/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java index 2d4b48e5ee52..daa35b7e15be 100644 --- a/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java +++ b/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java @@ -32,8 +32,6 @@ import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; import org.apache.commons.compress.utils.IOUtils; -import io.netty.util.internal.PlatformDependent; - /** * Compression codec for the LZ4 algorithm. */ @@ -45,7 +43,7 @@ protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBu "The uncompressed buffer size exceeds the integer limit %s.", Integer.MAX_VALUE); byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; - PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); + uncompressedBuffer.getBytes(/*index=*/0, inBytes); ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (InputStream in = new ByteArrayInputStream(inBytes); OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { @@ -57,8 +55,7 @@ protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBu byte[] outBytes = baos.toByteArray(); ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); - PlatformDependent.copyMemory( - outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); + compressedBuffer.setBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes); compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); return compressedBuffer; } @@ -71,8 +68,7 @@ protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBu long decompressedLength = readUncompressedLength(compressedBuffer); byte[] inBytes = new byte[(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)]; - PlatformDependent.copyMemory( - compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes, 0, inBytes.length); + compressedBuffer.getBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes); ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength); try (InputStream in = new FramedLZ4CompressorInputStream(new ByteArrayInputStream(inBytes))) { IOUtils.copy(in, out); @@ -82,8 +78,7 @@ protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBu byte[] outBytes = out.toByteArray(); ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length); - PlatformDependent.copyMemory(outBytes, 0, decompressedBuffer.memoryAddress(), outBytes.length); - decompressedBuffer.writerIndex(decompressedLength); + decompressedBuffer.setBytes(/*index=*/0, outBytes); return decompressedBuffer; } diff --git a/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java index 52f24e20533e..1f6d64d47610 100644 --- a/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java +++ b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java @@ -80,6 +80,10 @@ public static Collection getCodecs() { CompressionCodec lz4Codec = new Lz4CompressionCodec(); params.add(new Object[]{lz4Codec.getCodecType(), len, lz4Codec}); + + CompressionCodec zstdCodec = new ZstdCompressionCodec(); + params.add(new Object[]{zstdCodec.getCodecType(), len, zstdCodec}); + } return params; } From c3245274e21cc49dfcba0168cdf952b321f3ecfa Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Sat, 27 Mar 2021 04:15:59 +0000 Subject: [PATCH 2/5] add missing file --- .../compression/ZstdCompressionCodec.java | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java diff --git a/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java b/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java new file mode 100644 index 000000000000..6ae172078595 --- /dev/null +++ b/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.compression.AbstractCompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; + +import com.github.luben.zstd.Zstd; + +/** + * Compression codec for the ZSTD algorithm. + */ +public class ZstdCompressionCodec extends AbstractCompressionCodec { + + @Override + protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { + long maxSize = Zstd.compressBound(uncompressedBuffer.writerIndex()); + long dstSize = CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + maxSize; + ArrowBuf compressedBuffer = allocator.buffer(dstSize); + long bytesWritten = Zstd.compressUnsafe( + compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, dstSize, + /*src*/uncompressedBuffer.memoryAddress(), /*srcSize=*/uncompressedBuffer.writerIndex(), + /*level=*/3); + if (Zstd.isError(bytesWritten)) { + compressedBuffer.close(); + throw new RuntimeException("Error compressing: " + Zstd.getErrorName(bytesWritten)); + } + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + bytesWritten); + return compressedBuffer; + } + + @Override + protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { + long decompressedLength = readUncompressedLength(compressedBuffer); + ArrowBuf uncompressedBuffer = allocator.buffer(decompressedLength); + long decompressedSize = Zstd.decompressUnsafe(uncompressedBuffer.memoryAddress(), decompressedLength, + /*src=*/compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, + compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + if (Zstd.isError(decompressedLength)) { + uncompressedBuffer.close(); + throw new RuntimeException("Error decompressing: " + Zstd.getErrorName(decompressedLength)); + } + return uncompressedBuffer; + } + + @Override + public CompressionUtil.CodecType getCodecType() { + return CompressionUtil.CodecType.ZSTD; + } +} From b97e83f5105fcb8055976c7a1a2472fa480ee93f Mon Sep 17 00:00:00 2001 From: emkornfield Date: Fri, 26 Mar 2021 21:28:51 -0700 Subject: [PATCH 3/5] Update runner.py Add link back to this PR for disabling specific codecs. --- dev/archery/archery/integration/runner.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 23ebd0f01b2e..f17a6bcd2fae 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -137,6 +137,8 @@ def _gold_tests(self, gold_dir): skip.add("Go") skip.add("JS") skip.add("Rust") + # See https://github.com/apache/arrow/pull/9822 for how to + # disable specific compression type tests. yield datagen.File(name, None, None, skip=skip, path=out_path) def _run_test_cases(self, producer, consumer, case_runner, From cd0f4baf7e2666eb720982714ec5eefd0b1804c4 Mon Sep 17 00:00:00 2001 From: emkornfield Date: Fri, 26 Mar 2021 21:41:25 -0700 Subject: [PATCH 4/5] Update ZstdCompressionCodec.java check decompressed length. --- .../apache/arrow/compression/ZstdCompressionCodec.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java b/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java index 6ae172078595..38717843ef86 100644 --- a/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java +++ b/java/compression/src/main/java/org/apache/arrow/compression/ZstdCompressionCodec.java @@ -54,10 +54,16 @@ protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBu long decompressedSize = Zstd.decompressUnsafe(uncompressedBuffer.memoryAddress(), decompressedLength, /*src=*/compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); - if (Zstd.isError(decompressedLength)) { + if (Zstd.isError(decompressedSize)) { uncompressedBuffer.close(); throw new RuntimeException("Error decompressing: " + Zstd.getErrorName(decompressedLength)); } + if (decompressedLength != decompressedSize) { + uncompressedBuffer.close(); + throw new RuntimeException("Expected != actual decompressed length: " + + decompressedLength + " != " + decompressedSize); + } + uncompressedBuffer.writerIndex(decompressedLength); return uncompressedBuffer; } From c9216a80d9cf319c27a997a55e179cee2378692a Mon Sep 17 00:00:00 2001 From: emkornfield Date: Sat, 27 Mar 2021 10:42:13 -0700 Subject: [PATCH 5/5] Update runner.py --- dev/archery/archery/integration/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index f17a6bcd2fae..7fbf2fb3ba96 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -137,7 +137,7 @@ def _gold_tests(self, gold_dir): skip.add("Go") skip.add("JS") skip.add("Rust") - # See https://github.com/apache/arrow/pull/9822 for how to + # See https://github.com/apache/arrow/pull/9822 for how to # disable specific compression type tests. yield datagen.File(name, None, None, skip=skip, path=out_path)