Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, Buf
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
try {
return (OutputStream) ZstdConstructors.OUTPUT.invoke(buffer);
// Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
// in cases where the caller passes a small number of bytes to write (potentially a single byte)
return new BufferedOutputStream((OutputStream) ZstdConstructors.OUTPUT.invoke(buffer), 16 * 1024);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is BufferedOutputStream good to other compression also?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be safe. It is used already in this file to wrap the GZIPOutputStream which also does not buffer uncompressed data prior to compression.

} catch (Throwable e) {
throw new KafkaException(e);
}
Expand All @@ -128,7 +130,9 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
return (InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
// Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
// in cases where the caller reads a small number of bytes (potentially a single byte)
return new BufferedInputStream((InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer)), 16 * 1024);
} catch (Throwable e) {
throw new KafkaException(e);
}
Expand Down
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ versions += [
testRetryPlugin: "1.1.6",
zinc: "1.3.5",
zookeeper: "3.5.8",
zstd: "1.4.5-6"
zstd: "1.4.5-12"
]
libs += [
activation: "javax.activation:activation:$versions.activation",
Expand Down