KAFKA-13029; Set appropriate fields for FindCoordinatorRequest based on version#10965
Conversation
ba2bee9 to
59adf86
Compare
showuon
left a comment
There was a problem hiding this comment.
@rajinisivaram , thanks for the PR. Nice tests! Left some comments.
|
|
||
| public class FindCoordinatorRequest extends AbstractRequest { | ||
|
|
||
| public static final short MIN_BATCHED_VERSION = 4; |
| if (version < MIN_BATCHED_VERSION) { | ||
| if (batchedKeys > 1) | ||
| throw new NoBatchedFindCoordinatorsException("Cannot create a v" + version + " FindCoordinator request " + | ||
| "because we require features supported only in 4 or later."); |
There was a problem hiding this comment.
nit: "because we require features supported only in" + MIN_BATCHED_VERSION + "or later.");
| abstract AbstractRequest.Builder<?> requestBuilder(); | ||
|
|
||
| abstract void handleResponse(AbstractResponse responseBody); | ||
| abstract void handleResponse(AbstractResponse responseBody, short requestVersion); |
There was a problem hiding this comment.
maybe we can add a comment or java doc here to describe why requestVersion is necessary here.
| } | ||
| FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; | ||
| if (batch) { | ||
| if (!response.data().coordinators().isEmpty()) { |
There was a problem hiding this comment.
could we add a comment here to explain what case it is (ex: batch response case)
|
@showuon Thanks for the review, addressed comments. |
mimaison
left a comment
There was a problem hiding this comment.
Thanks @rajinisivaram for this fix!
chia7712
left a comment
There was a problem hiding this comment.
@rajinisivaram thanks for this nice patch. This patch also resolves the slow poll when requesting to older broker (#10963). a couple of comments below. PTAL
| } else { | ||
| data.setKey(this.rebalanceConfig.groupId); | ||
| } | ||
| data.setKey(this.rebalanceConfig.groupId); |
There was a problem hiding this comment.
this line can be merged. for example:
FindCoordinatorRequestData data = new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id())
.setKey(this.rebalanceConfig.groupId);| FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; | ||
| if (batch) { | ||
| // Handle response based on whether batched version is used by checking `coordinators()` in the response data | ||
| if (!response.data().coordinators().isEmpty()) { |
There was a problem hiding this comment.
Both of consumer coordinator and transaction coordinator parse response according to version. By contrast, it check coordinators. Could we unify the behavior? Could consumer coordinator and transaction coordinator check coordinators instead? It seems checking coordinators can simplify the code as we don't need to pass request version to each handler.
| } else { | ||
| data.setKey(coordinatorKey); | ||
| } | ||
| data.setKey(coordinatorKey); |
dajac
left a comment
There was a problem hiding this comment.
@rajinisivaram Thanks for the fix! I left few suggestions.
| public void onSuccess(ClientResponse resp, RequestFuture<Void> future) { | ||
| log.debug("Received FindCoordinator response {}", resp); | ||
|
|
||
| boolean batch = resp.requestHeader().apiVersion() >= FindCoordinatorRequest.MIN_BATCHED_VERSION; |
There was a problem hiding this comment.
How about adding a coordinators method to FindCoordinatorResponse which would either return the list of coordinators (data.coordinators()) if not empty or would return a list containing a Coordinator created from the top level information. That would remove all the batch checks below.
| public void handleResponse(AbstractResponse response, short requestVersion) { | ||
| FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response; | ||
| CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType()); | ||
| boolean batchFindCoordinator = requestVersion >= FindCoordinatorRequest.MIN_BATCHED_VERSION; |
There was a problem hiding this comment.
The above suggestion would also us to avoid having to pass the requestVersion down here.
| env.kafkaClient().setNodeApiVersions( | ||
| NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups))); | ||
|
|
||
| // dummy response for MockClient to handle the UnsupportedVersionException correctly to switch from batched to un-batched |
|
|
||
| // dummy response for MockClient to handle the UnsupportedVersionException correctly to switch from batched to un-batched | ||
| env.kafkaClient().prepareResponse(null); | ||
| //Retriable FindCoordinatorResponse errors should be retried |
There was a problem hiding this comment.
nit: Should we add a space before Retriable?
dajac
left a comment
There was a problem hiding this comment.
LGTM, assuming tests pass. Thanks!
…on version (apache#10965) KIP-699 added support for batching in FindCoordinatorRequest using a new protocol that changes the wire format for both batched and unbatched requests. Clients were updated to try the new format first and switch irreversibly to the old format if the new format is not supported on one broker. During rolling upgrade (or a downgrade), it is possible that a broker doesn't support new format at some point while other brokers do at a later point. Clients end up in a bad state until restarted since they use new version with old format. This PR changes FindCoordinatorRequest to set data based on actual version when a single group is used. This is always the case for consumer coordinator and transaction manager. For admin API, we still switch to unbatched mode on failure, but the data is set based on actual version, so we never fail even if brokers are upgraded/downgraded. Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
KIP-699 added support for batching in FindCoordinatorRequest using a new protocol that changes the wire format for both batched and unbatched requests. Clients were updated to try the new format first and switch irreversibly to the old format if the new format is not supported on one broker. During rolling upgrade (or a downgrade), it is possible that a broker doesn't support new format at some point while other brokers do at a later point. Clients end up in a bad state until restarted since use new version with old format. This PR changes FindCoordinatorRequest to set data based on actual version when a single group is used. This is always the case for consumer coordinator and transaction coordinator. For admin API, we still switch to unbatched mode on failure, but the data is set based on actual version, so we never fail even if brokers are upgraded/downgraded.
Committer Checklist (excluded from commit message)