From 8429ffc6860291f98de7a5d81f33c33688e7a6b3 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 25 Oct 2020 21:24:16 -0400 Subject: [PATCH 1/5] Use new version of zstd-jni --- gradle/dependencies.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index e5bc56173ad74..246526c0965c3 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -116,7 +116,7 @@ versions += [ testRetryPlugin: "1.1.6", zinc: "1.3.5", zookeeper: "3.5.8", - zstd: "1.4.5-6" + zstd: "1.4.5-8" ] libs += [ activation: "javax.activation:activation:$versions.activation", From 9bf3dce45cce37680abdfba17ad85d625987cea1 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 25 Oct 2020 21:24:32 -0400 Subject: [PATCH 2/5] add buffering for zstd-jni --- .../apache/kafka/common/record/CompressionType.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 352d12d834977..752ea87d489c4 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -119,7 +119,10 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, Buf @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { try { - return (OutputStream) ZstdConstructors.OUTPUT.invoke(buffer); + // Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to + // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small + // number of bytes to write (potentially a single byte) + return new BufferedOutputStream((OutputStream) ZstdConstructors.OUTPUT.invoke(buffer), 16 * 1024); } catch (Throwable e) { throw new KafkaException(e); } @@ -128,7 +131,10 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer @Override public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { try { - return (InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer)); + // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to + // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small + // number of bytes (potentially a single byte) + return new BufferedInputStream((InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer)), 16 * 1024); } catch (Throwable e) { throw new KafkaException(e); } From 87d625fd38e79df43053e4812820cfe53e928757 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Sun, 25 Oct 2020 23:26:45 -0400 Subject: [PATCH 3/5] remove gzip-specific comments --- .../apache/kafka/common/record/CompressionType.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 752ea87d489c4..cf6a026d0a526 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -119,9 +119,8 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, Buf @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { try { - // Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to - // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small - // number of bytes to write (potentially a single byte) + // Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance + // in cases where the caller passes a small number of bytes to write (potentially a single byte) return new BufferedOutputStream((OutputStream) ZstdConstructors.OUTPUT.invoke(buffer), 16 * 1024); } catch (Throwable e) { throw new KafkaException(e); @@ -131,9 +130,8 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer @Override public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { try { - // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to - // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small - // number of bytes (potentially a single byte) + // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance + // in cases where the caller reads a small number of bytes (potentially a single byte) return new BufferedInputStream((InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer)), 16 * 1024); } catch (Throwable e) { throw new KafkaException(e); From 9d827c512ea72fb786e43071d411bcce07b386b8 Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Mon, 26 Oct 2020 15:24:05 -0400 Subject: [PATCH 4/5] use newer zstd-jni --- gradle/dependencies.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 246526c0965c3..be31691c469e5 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -116,7 +116,7 @@ versions += [ testRetryPlugin: "1.1.6", zinc: "1.3.5", zookeeper: "3.5.8", - zstd: "1.4.5-8" + zstd: "1.4.5-10" ] libs += [ activation: "javax.activation:activation:$versions.activation", From f7b3621ec19f75cd6f647ec2575f09c631be5fde Mon Sep 17 00:00:00 2001 From: James Yuzawa Date: Tue, 10 Nov 2020 09:22:18 -0500 Subject: [PATCH 5/5] Update dependencies.gradle --- gradle/dependencies.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 9760c9b5a9790..5a6ec990d8ace 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -116,7 +116,7 @@ versions += [ testRetryPlugin: "1.1.6", zinc: "1.3.5", zookeeper: "3.5.8", - zstd: "1.4.5-10" + zstd: "1.4.5-12" ] libs += [ activation: "javax.activation:activation:$versions.activation",