MINOR: Remove connection id from Send and consolidate request/message utils#9714
MINOR: Remove connection id from Send and consolidate request/message utils#9714ijuma merged 8 commits intoapache:trunkfrom
Conversation
23ebe94 to
87db298
Compare
87db298 to
9b849c4
Compare
|
|
||
| int headerSize = header.size(cache, headerVersion); | ||
| int messageSize = apiMessage.size(cache, apiVersion); | ||
| ByteBufferAccessor writable = new ByteBufferAccessor(ByteBuffer.allocate(4 + headerSize + messageSize)); |
There was a problem hiding this comment.
Why it need 4 byte if it exclude size prefix?
There was a problem hiding this comment.
Great catch. This was a copy and paste bug and we had no tests verifying this. I have fixed it and added a test.
| @@ -46,8 +45,7 @@ public class MultiRecordsSend implements Send { | |||
| * Construct a MultiRecordsSend for the given destination from a queue of Send objects. The queue will be | |||
There was a problem hiding this comment.
This comment need to be updated.
| @@ -24,11 +24,6 @@ | |||
| */ | |||
| public interface Send { | |||
There was a problem hiding this comment.
please remove "destination" from the docs
| // `openOrClosingChannel` can be None if the selector closed the connection because it was idle for too long | ||
| if (openOrClosingChannel(connectionId).isDefined) { | ||
| selector.send(responseSend) | ||
| selector.send(new NetworkSend(response.request.context.connectionId, responseSend)) |
There was a problem hiding this comment.
line#955 already has connectionId.
| handleChannelMuteEvent(send.destination, ChannelMuteEvent.RESPONSE_SENT) | ||
| tryUnmuteChannel(send.destination) | ||
| handleChannelMuteEvent(send.destinationId(), ChannelMuteEvent.RESPONSE_SENT) | ||
| tryUnmuteChannel(send.destinationId()) |
There was a problem hiding this comment.
How about keeping "parameterless"? fewer changes and more clear.
There was a problem hiding this comment.
Looks like the IDE added this when I renamed it or something. Fixed.
| final int numNodes, | ||
| final Map<String, Errors> topicErrors, | ||
| final Map<String, Integer> topicPartitionCounts, | ||
| final short responseVersion) { |
There was a problem hiding this comment.
Looks like a test bug from the previous PR. Will fix so that it's used.
| public static ByteBuffer serializeRequestHeader(RequestHeader header) { | ||
| ObjectSerializationCache serializationCache = new ObjectSerializationCache(); | ||
| ByteBuffer buffer = ByteBuffer.allocate(header.size(serializationCache)); | ||
| header.write(buffer, serializationCache); |
There was a problem hiding this comment.
RequestHeader#write has only 2 usages and both of them are in test scope. It should be fine to remove RequestHeader#write from production.
There was a problem hiding this comment.
It's about conceptual integrity for the class. It should provide a mechanism for serialization that doesn't require reaching into its internal structures.
|
@chia7712 Thanks for the review, I'm particularly glad you caught the 4 wasted bytes. :) I pushed updates and left a comment in the one case I haven't addressed. |
| /** | ||
| * A size delimited Send that consists of a 4 byte network-ordered size N followed by N bytes of content. | ||
| */ | ||
| public class SizeDelimitedSend extends ByteBufferSend { |
There was a problem hiding this comment.
How about making SizeDelimitedSend be a static method in ByteBufferSend? For example:
public static Send withSizeDelimited(ByteBuffer buffer) {
ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
sizeBuffer.putInt(0, buffer.remaining());
return new ByteBufferSend(sizeBuffer, buffer);
}There was a problem hiding this comment.
I had the same thought and initially did that, but it's an issue for many tests since they verify the properties of the ByteBuffer and it is not accessible if we do this.
There was a problem hiding this comment.
but it's an issue for many tests since they verify the properties of the ByteBuffer and it is not accessible if we do this.
Can it be resolved if we return ByteBuffer rather than Send?
There was a problem hiding this comment.
It has to be a Send too. I looked at this a bit closer and we can just return ByteBufferSend from the static factory method. The thing I attempted before was to combine at the NetworkSend level and that doesn't work since NetworkSend can work with Sends that are not backed by a ByteBuffer. But it actually works fine at ByteBufferSend level. Thanks for the suggestion, please check the update.
|
|
Tests passed, merging to trunk. |
Connection id is now only present in
NetworkSend, which is nowthe class used by
Selector/NetworkClient/KafkaChannel(whichworks well since
NetworkReceiveis the class used forreceived data).
The previous
NetworkSendwas also responsible for adding a sizeprefix. This logic is already present in
SendBuilder, but for theminority of cases where
SendBuilderis not used (includinga number of tests), we now have
ByteBufferSend.sizePrefixed().With regards to the request/message utilities:
toByteBuffer/toBytesinMessageUtiltotoVersionPrefixedByteBuffer/toVersionPrefixedBytesfor clarity.MessageUtil.toByteBufferthat does not includethe version as the prefix.
serializeBodyinAbstractRequest/Responsetoserializefor symmetry withparse.RequestTestUtilsand moved relevant methods fromTestUtils.serializeWithHeadermethods that were only used intests to
RequestTestUtils.MessageTestUtil.Finally, a couple of changes to simplify coding patterns:
flip()andbuffer()toByteBufferAccessor.MessageSizeAccumulator.sizeExcludingZeroCopy.TestCondition.Arrays.copyOfinstead ofSystem.arraycopyinMessageUtil.Committer Checklist (excluded from commit message)