KAFKA-9437: KIP-559: Make the Kafka Protocol Friendlier with L7 Proxies#7994
KAFKA-9437: KIP-559: Make the Kafka Protocol Friendlier with L7 Proxies#7994hachikuji merged 8 commits intoapache:trunkfrom
Conversation
|
ok to test |
|
retest this please |
|
ok to test |
|
retest this please |
hachikuji
left a comment
There was a problem hiding this comment.
Looks good overall. Left a few small comments.
| } | ||
|
|
||
| public boolean areProtocolTypeAndNamePresent() { | ||
| return version() >= 5 && (data.protocolType() == null || data.protocolName() == null); |
There was a problem hiding this comment.
The version check is probably not necessary. Also, shouldn't we be checking for non-null entries?
There was a problem hiding this comment.
I think that we need the version check. ProtocolType and ProtocolName are null by default thus they will be null here for all the versions oldest than version 5 and we must accept them, right? However, for newer version, we want to ensure that both fields are provided.
Regarding the non-null entries, the verification (== provided values match the group's ones) is done in the GroupCoordinator when the fields are provided. Does it answer your question?
There was a problem hiding this comment.
Maybe I'm missing something, but the function name suggests that it should return true if both the protocol type and name are non-null. The current logic returns true if either field is null. Are we missing a !?
There was a problem hiding this comment.
Oh... The name is indeed misleading. I have reworked this part a bit. Now, the function is called areMandatoryProtocolTypeAndNamePresent. It returns true for any version above 5, and verifies that both fields are non-null for versions from 5 and above. The call is the Api layer has been updated accordingly. Apparently, my unit tests were broken as well as I was not calling the right function. Good catch!
| leaderResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, | ||
| protocolSuperset, leaderInstanceId, DefaultSessionTimeout) | ||
|
|
||
| // JoinGroup(follower): The Protocol Type is Defined when there is not error |
There was a problem hiding this comment.
nit: I'd suggest making each one of these "stanzas" a separate test case
There was a problem hiding this comment.
You're right. Let me separate them.
| val capturedCallback = EasyMock.newCapture[JoinGroupCallback]() | ||
|
|
||
| EasyMock.expect(groupCoordinator.handleJoinGroup( | ||
| anyString, |
There was a problem hiding this comment.
Would it be worth adding explicit matchers for these?
There was a problem hiding this comment.
Good point. Being explicit is way better for sure.
| } | ||
|
|
||
| private boolean isProtocolNameInconsistent(String protocolName) { | ||
| return protocolName != null && !protocolName.equals(generation().protocol); |
There was a problem hiding this comment.
Should we change Generation.protocol to protocolName as well?
|
@hachikuji I just pushed an update which, I think, addresses all your points. Thanks for the review! |
|
ok to test |
|
retest this please |
|
@hachikuji I just pushed a fix for Do you have an idea why the CI jobs are not linked to the PR? |
|
retest this please |
1 similar comment
|
retest this please |
|
ok to test |
|
Couple of tests have failed. It seems there are all flaky ones. I have ran them locally to verify and they all pass. For the record, failed tests are: JDK 11 and Scala 2.13
JDK 8 and Scala 2.12
|
hachikuji
left a comment
There was a problem hiding this comment.
Thanks for the updates. Just a few additional comments.
| if (error == Errors.NONE) { | ||
| if (isProtocolTypeInconsistent(syncResponse.data.protocolType()) | ||
| || isProtocolNameInconsistent(syncResponse.data.protocolName())) { | ||
| log.debug("SyngGroup failed: Received inconsistent ProtocolType ({}) and/or ProtocolName ({})", |
There was a problem hiding this comment.
nit: I'd suggest breaking this into two separate cases so that the user doesn't have to guess what was inconsistent. Also, can we include the expected value?
There was a problem hiding this comment.
That makes sense. I will also include the expected value.
| RequestFuture<ByteBuffer> future) { | ||
| Errors error = syncResponse.error(); | ||
| if (error == Errors.NONE) { | ||
| if (isProtocolTypeInconsistent(syncResponse.data.protocolType()) |
There was a problem hiding this comment.
I think the error code should take precedence over this validation. Perhaps we do this only when the error is NONE? Similarly for the JoinGroup response.
There was a problem hiding this comment.
You're right. Let me do this.
| } | ||
|
|
||
| private def propagateAssignment(group: GroupMetadata, error: Errors): Unit = { | ||
| val protocolType = if (error == Errors.NONE) group.protocolType else None |
There was a problem hiding this comment.
nit: we could do this in a single line with a dual assignment val (protocolType, protocolName) = ...
| memberId = memberId, | ||
| generationId = GroupCoordinator.NoGeneration, | ||
| protocolType = None, | ||
| protocolName = GroupCoordinator.NoProtocol, |
There was a problem hiding this comment.
The inconsistency with SyncGroup is a little annoying, but I guess we have to keep it for compatibility. I was considering if it's worthwhile making the this field nullable going forward.
There was a problem hiding this comment.
You bring a really good point that I haven't considered. I have been looking at it and I think that it makes sense to make it nullable going forward. It means that Protocol Name would be null in case of an error instead of an empty string for versions 7 and above. Overall, it makes the internals better, the API more consistent and the changes in the API layer to handle the backward compatibility are small. Moreover, the impact on clients is very limited as likely few or none of them relies on the Protocol Name in case of an error. We don't for instance. Long story short, I am up for it.
I went ahead and made the changes, let me know what do you think.
There was a problem hiding this comment.
From a non OO language perspective, changing the protocol name field in the join group response from non-null to nullable isn't the greatest / is backwards incompatible.
Effectively, in Go, either I consider the string type in protocol to be a legitimate object that can at some future point be nullable, or I use the Go string primitive and hope that the Java side never changes the equation.
This will be the first instance of a primitive (non array) switching from non-nullable to nullable. So, while from a backwards compatibility perspective, there are no problems for the Java client, there are problems for clients of non-OO languages.
Because of the incompatibility, I can add support for decoding null as non-null, but I can't really add support for allowing the field itself in the JoinGroupResponse struct to be null. To do so would effectively necessitate adding a second field (non-null field v6 and below, nullable field v7 up).
There was a problem hiding this comment.
Thank you for your feedback. You bring a point that we haven't considered. I think that it is more challenging for languages where the string type is not an object such a Go.
Have you considered changing the field type to a pointer to a string? Usually, the internal protocol is not exposed to the users of the consumer thus changing the type should only impact the internals which seems acceptable. That being said, I am not aware of your project so I may be completely wrong.
There was a problem hiding this comment.
I think switching to a pointer now is alright. I agree about the internal protocol not usually being exposed, so it's probably alright to have more backwards incompatible changes in that. The API of the protocol is public, though, to theoretically allow anybody to build on top of it.
It's a work in progress, but barring integration tests showing clear failures, the API is complete.
| } | ||
|
|
||
| @Test | ||
| def testJoinGroupReturnsAnNoneProtocolTypeWhenAnErrorOccurs(): Unit = { |
There was a problem hiding this comment.
nit: "an none" seems ungrammatical
There was a problem hiding this comment.
Indeed. I am not sure how I managed to write this ;)
| <Match> | ||
| <!-- The code generator generates useless condition. Disable the check temporarily. --> | ||
| <Class name="org.apache.kafka.common.message.JoinGroupResponseData"/> | ||
| <Bug pattern="UC_USELESS_CONDITION"/> | ||
| </Match> | ||
|
|
There was a problem hiding this comment.
With the nullable Protocol Name, the code generator generates unreachable condition. I have added an exception for the time being and will address this in a follow up PR if you don't mind. It does not impact the behaviour of this PR.
|
@hachikuji Thanks for the review. I have updated the PR to address your points. |
|
ok to test |
|
retest this please |
hachikuji
left a comment
There was a problem hiding this comment.
LGTM. Thanks for the patch.
|
Two test failures in latest build https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/438/testReport/ They pass locally for me |
Conflicts and/or compiler errors due to the fact that we temporarily reverted the commit that removes Scala 2.11 support: * SslAdminIntegrationTest: keep using JAdminClient, take upstream changes otherwise. * ReassignPartitionsClusterTest: keep using JAdminClient, take upstream changes otherwise. * KafkaApis: use `asScala.foreach` instead of `forEach`. # By Ismael Juma (3) and others # Via GitHub * apache-github/trunk: (22 commits) KAFKA-9437; Make the Kafka Protocol Friendlier with L7 Proxies [KIP-559] (apache#7994) KAFKA-9375: Add names to all Connect threads (apache#7901) MINOR: Introduce 2.5-IV0 IBP (apache#8010) KAFKA-8503; Add default api timeout to AdminClient (KIP-533) (apache#8011) Add retries to release.py script (apache#8021) KAFKA-8162: IBM JDK Class not found error when handling SASL (apache#6524) MINOR: Add explicit result type in public defs/vals (apache#7993) KAFKA-9408: Use StandardCharsets.UTF-8 instead of "UTF-8" (apache#7940) KAFKA-9474: Adds 'float64' to the RPC protocol types (apache#8012) KAFKA-9360: Allow disabling MM2 heartbeat and checkpoint emissions (apache#7887) KAFKA-7658: Add KStream#toTable to the Streams DSL (apache#7985) KAFKA-9445: Allow adding changes to allow serving from a specific partition (apache#7984) KAFKA-9422: Track the set of topics a connector is using (KIP-558) (apache#8017) KAFKA-9040; Add --all option to config command (apache#7607) KAFKA-4203: Align broker default for max.message.bytes with Java producer default (apache#4154) KAFKA-9426: Use switch instead of chained if/else in OffsetsForLeaderEpochClient (apache#7959) KAFKA-9405: Use Map.computeIfAbsent where applicable (apache#7937) KAFKA-9026: Use automatic RPC generation in DescribeAcls (apache#7560) MINOR: Remove unused fields in StreamsMetricsImpl (apache#7992) KAFKA-9460: Enable only TLSv1.2 by default and disable other TLS protocol versions (KIP-553) (apache#7998) ...
| // Version 6 is the first flexible version. | ||
| "validVersions": "0-6", | ||
| // | ||
| // Starting from version 7, the broker sends back the Protocol Type to the client (KIP-599). |
| "validVersions": "0-4", | ||
| // | ||
| // Starting from version 5, the client sends the Protocol Type and the Protocol Name | ||
| // to the broker (KIP-599). The broker will reject the request if they are inconsistent |
…8324) This bug was incurred by #7994 with a too-strong consistency check. It is because a reset generation operation could be called in between the joinGroupRequest -> joinGroupResponse -> SyncGroupRequest -> SyncGroupResponse sequence of events, if user calls unsubscribe in the middle of consumer#poll(). Proper fix is to avoid the protocol name check when the generation is invalid. Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
…8324) This bug was incurred by #7994 with a too-strong consistency check. It is because a reset generation operation could be called in between the joinGroupRequest -> joinGroupResponse -> SyncGroupRequest -> SyncGroupResponse sequence of events, if user calls unsubscribe in the middle of consumer#poll(). Proper fix is to avoid the protocol name check when the generation is invalid. Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

This PR implements the KIP-559: https://cwiki.apache.org/confluence/display/KAFKA/KIP-559%3A+Make+the+Kafka+Protocol+Friendlier+with+L7+Proxies
ProtocolTypeandProtocolNameare used across the board in the coordinator instead of having a mix of protocol type, protocol name, subprotocol, protocol, etc.Committer Checklist (excluded from commit message)