diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 352d12d834977..cf6a026d0a526 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -119,7 +119,9 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, Buf @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { try { - return (OutputStream) ZstdConstructors.OUTPUT.invoke(buffer); + // Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance + // in cases where the caller passes a small number of bytes to write (potentially a single byte) + return new BufferedOutputStream((OutputStream) ZstdConstructors.OUTPUT.invoke(buffer), 16 * 1024); } catch (Throwable e) { throw new KafkaException(e); } @@ -128,7 +130,9 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer @Override public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { try { - return (InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer)); + // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance + // in cases where the caller reads a small number of bytes (potentially a single byte) + return new BufferedInputStream((InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer)), 16 * 1024); } catch (Throwable e) { throw new KafkaException(e); } diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index bb9d4d421752f..5a6ec990d8ace 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -116,7 +116,7 @@ versions += [ testRetryPlugin: "1.1.6", zinc: "1.3.5", zookeeper: "3.5.8", - zstd: "1.4.5-6" + zstd: "1.4.5-12" ] libs += [ activation: "javax.activation:activation:$versions.activation",