fix: replace an inefficient loop in kafka internals #13162
dpcollins-google wants to merge 4 commits into apache:trunk from
Conversation
Instead use Channels.newChannel to write in larger chunks
```diff
-        int pos = buffer.position();
-        for (int i = pos; i < length + pos; i++)
-            out.writeByte(buffer.get(i));
+        Channels.newChannel(out).write(buffer);
```
- Is this preserving the same behaviour, i.e. copying the range [pos, pos + length]?
- This mutates the position of the buffer, unlike the current implementation.
- Yes, per the docs of `WritableByteChannel`:

> Writes a sequence of bytes to this channel from the given buffer.
> An attempt is made to write up to *r* bytes to the channel, where *r* is the number of bytes remaining in the buffer, that is, `src.remaining()`, at the moment this method is invoked.
> Suppose that a byte sequence of length *n* is written, where 0 <= n <= r. This byte sequence will be transferred from the buffer starting at index *p*, where *p* is the buffer's position at the moment this method is invoked; the index of the last byte written will be p + n - 1. Upon return the buffer's position will be equal to p + n; its limit will not have changed.
- Good point, although I don't think this has an effect on any of the 4 current users (DefaultRecord.writeTo and LegacyRecord.writeTo for writing key and value), I've added a defensive call to asReadOnlyBuffer.
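The position-mutation point discussed above can be checked with a small standalone sketch (class and variable names here are mine, not from the PR): `WritableByteChannel.write` consumes the position of whichever buffer it is handed, so writing a `duplicate()` or `asReadOnlyBuffer()` copy leaves the caller's buffer untouched.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;

public class PositionDemo {
    public static void main(String[] args) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // Writing a duplicate consumes the duplicate's position...
        ByteBuffer copy = buffer.duplicate();
        Channels.newChannel(out).write(copy);
        System.out.println(copy.remaining());   // 0: fully consumed

        // ...but the original buffer's position is untouched, which is
        // what the defensive asReadOnlyBuffer() call relies on.
        Channels.newChannel(out).write(buffer.asReadOnlyBuffer());
        System.out.println(buffer.remaining()); // 5: original unchanged
    }
}
```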
Thanks for the follow-up.
1. This new implementation ignores the `length` argument provided to the method if the buffer is not backed by an array. What if `length` does not equal the number of remaining bytes in the buffer?
2. Is there an actual optimization offered by calling `write`? The implementation for direct buffers uses a similar linear iteration. Do you have data showing performance improvements with this implementation?
Per 1): This parameter is always `buffer.remaining()`, so I've cleaned up the call sites and removed it.
Per 2): Yes, it's substantial. The reason is that `WritableByteChannelImpl` writes in 8 KB chunks when feasible, instead of 1-byte chunks: https://github.com/AdoptOpenJDK/openjdk-jdk8u/blob/2544d2a351eca1a3d62276f969dd2d95e4a4d2b6/jdk/src/share/classes/java/nio/channels/Channels.java#L442
Unfortunately I can't share benchmarks demonstrating this, as they are from a production application and were collected using internal tooling.
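For context on the chunking claim above: the JDK's `Channels.newChannel(OutputStream)` wrapper drains the buffer through a fixed-size transfer array rather than issuing one `OutputStream` call per byte. A simplified sketch of that strategy (my own code, not the JDK's) looks like:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class ChunkedWriter {
    // 8 KB transfer size, matching the constant in the JDK's Channels impl.
    private static final int TRANSFER_SIZE = 8192;

    static void writeFully(OutputStream out, ByteBuffer src) throws IOException {
        byte[] chunk = new byte[Math.min(src.remaining(), TRANSFER_SIZE)];
        while (src.hasRemaining()) {
            int len = Math.min(src.remaining(), chunk.length);
            src.get(chunk, 0, len);   // bulk copy out of the buffer
            out.write(chunk, 0, len); // one OutputStream call per chunk, not per byte
        }
    }
}
```

Compared with the old loop, this turns roughly N calls to `writeByte` into N/8192 calls to `write(byte[], int, int)`, which is where the speedup comes from.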
Got it. Thanks.
Could you share the performance gains at a high level, without sharing details of the application?
Publicly available data showing the expected gains would further back up the PR. Has this been discussed on the dev mailing list already?
Thanks for the PR. Can you please provide a bit more detail regarding the workload? I assume we're talking about the producer here. If so, can you please share whether compression was used, the compression algorithm (if applicable) and the average message size.
This is producer code with no compression (the data is pre-encrypted, so compression would be useless anyway), and the message size is 1-2 kB.
ping @ijuma / @Hangleton , are there any blockers to getting this merged?
Apologies for the delay - would you have any JMH benchmark for this change? E.g. something like in #13312.
Sorry for the delay. I was trying to understand how to show the improvement, but we seem to always pass a heap byte buffer. How can I reproduce this improvement?
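One way to see why a heap buffer may not show the same win: heap buffers expose a backing array, so a single bulk `write(byte[], int, int)` is already possible without a channel, while direct buffers do not and must be copied out through a channel or a temporary array. A tiny illustrative check (class name is mine):

```java
import java.nio.ByteBuffer;

public class BufferKinds {
    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(16);
        ByteBuffer direct = ByteBuffer.allocateDirect(16);
        System.out.println(heap.hasArray());   // true: array() fast path applies
        System.out.println(direct.hasArray()); // false: no accessible backing array
    }
}
```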
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch).
This PR is being marked as stale since it has not had any activity in 90 days. If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
This PR has been closed since it has not had any activity in 120 days. If you feel like this …