diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java index cb44ad40e87b4..e6fc0d624a0cd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java @@ -97,22 +97,21 @@ public LookupResult handleResponse( failedKeys.put(key, new InvalidGroupIdException("The given group id '" + key.idValue + "' cannot be represented in a request.")); } - FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; - if (batch) { - for (Coordinator coordinator : response.data().coordinators()) { - CoordinatorKey key = (type == CoordinatorType.GROUP) + + for (Coordinator coordinator : ((FindCoordinatorResponse) abstractResponse).coordinators()) { + CoordinatorKey key; + if (coordinator.key() == null) // old version without batching + key = requireSingletonAndType(keys); + else { + key = (type == CoordinatorType.GROUP) ? CoordinatorKey.byGroupId(coordinator.key()) : CoordinatorKey.byTransactionalId(coordinator.key()); - handleError(Errors.forCode(coordinator.errorCode()), - key, - coordinator.nodeId(), - mappedKeys, - failedKeys); } - } else { - CoordinatorKey key = requireSingletonAndType(keys); - Errors error = response.error(); - handleError(error, key, response.node().id(), mappedKeys, failedKeys); + handleError(Errors.forCode(coordinator.errorCode()), + key, + coordinator.nodeId(), + mappedKeys, + failedKeys); } return new LookupResult<>(failedKeys, mappedKeys); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 4123914c6c9e2..9608b8fe3c3aa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator; import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; @@ -52,7 +53,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; -import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.HeartbeatResponse; @@ -139,7 +139,6 @@ public boolean hasNotJoinedGroup() { private RequestFuture joinFuture = null; private RequestFuture findCoordinatorFuture = null; private volatile RuntimeException fatalFindCoordinatorException = null; - private volatile boolean batchFindCoordinator = true; private Generation generation = Generation.NO_GENERATION; private long lastRebalanceStartMs = -1L; private long lastRebalanceEndMs = -1L; @@ -815,56 +814,38 @@ public void handle(SyncGroupResponse syncResponse, */ private RequestFuture sendFindCoordinatorRequest(Node node) { // initiate the group metadata request - log.debug("Sending FindCoordinator request to broker {} with batch={}", node, batchFindCoordinator); + log.debug("Sending FindCoordinator request to broker {}", node); FindCoordinatorRequestData data = new FindCoordinatorRequestData() - .setKeyType(CoordinatorType.GROUP.id()); - if (batchFindCoordinator) { - data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId)); - } else { - data.setKey(this.rebalanceConfig.groupId); - } + .setKeyType(CoordinatorType.GROUP.id()) + .setKey(this.rebalanceConfig.groupId); FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(data); return client.send(node, requestBuilder) - .compose(new FindCoordinatorResponseHandler(batchFindCoordinator)); + .compose(new FindCoordinatorResponseHandler()); } private class FindCoordinatorResponseHandler extends RequestFutureAdapter { - private boolean batch; - FindCoordinatorResponseHandler(boolean batch) { - this.batch = batch; - } @Override public void onSuccess(ClientResponse resp, RequestFuture future) { log.debug("Received FindCoordinator response {}", resp); - FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody(); - if (batch && findCoordinatorResponse.data().coordinators().size() != 1) { + List coordinators = ((FindCoordinatorResponse) resp.responseBody()).coordinators(); + if (coordinators.size() != 1) { log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator"); future.raise(new IllegalStateException("Group coordinator lookup failed: Invalid response containing more than a single coordinator")); } - Errors error = batch - ? Errors.forCode(findCoordinatorResponse.data().coordinators().get(0).errorCode()) - : findCoordinatorResponse.error(); + Coordinator coordinatorData = coordinators.get(0); + Errors error = Errors.forCode(coordinatorData.errorCode()); if (error == Errors.NONE) { synchronized (AbstractCoordinator.this) { - int nodeId = batch - ? findCoordinatorResponse.data().coordinators().get(0).nodeId() - : findCoordinatorResponse.data().nodeId(); - String host = batch - ? findCoordinatorResponse.data().coordinators().get(0).host() - : findCoordinatorResponse.data().host(); - int port = batch - ? findCoordinatorResponse.data().coordinators().get(0).port() - : findCoordinatorResponse.data().port(); // use MAX_VALUE - node.id as the coordinator id to allow separate connections // for the coordinator in the underlying network client layer - int coordinatorConnectionId = Integer.MAX_VALUE - nodeId; + int coordinatorConnectionId = Integer.MAX_VALUE - coordinatorData.nodeId(); AbstractCoordinator.this.coordinator = new Node( coordinatorConnectionId, - host, - port); + coordinatorData.host(), + coordinatorData.port()); log.info("Discovered group coordinator {}", coordinator); client.tryConnect(coordinator); heartbeat.resetSessionTimeout(); @@ -873,10 +854,7 @@ public void onSuccess(ClientResponse resp, RequestFuture future) { } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else { - String errorMessage = batch - ? findCoordinatorResponse.data().coordinators().get(0).errorMessage() - : findCoordinatorResponse.data().errorMessage(); - log.debug("Group coordinator lookup failed: {}", errorMessage); + log.debug("Group coordinator lookup failed: {}", coordinatorData.errorMessage()); future.raise(error); } } @@ -885,12 +863,6 @@ public void onSuccess(ClientResponse resp, RequestFuture future) { public void onFailure(RuntimeException e, RequestFuture future) { log.debug("FindCoordinator request failed due to {}", e.toString()); - if (e instanceof NoBatchedFindCoordinatorsException) { - batchFindCoordinator = false; - clearFindCoordinatorFuture(); - lookupCoordinator(); - return; - } if (!(e instanceof RetriableException)) { // Remember the exception if fatal so we can ensure it gets thrown by the main thread fatalFindCoordinatorException = e; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 2febc2ec89a5c..2de31a03c5694 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.utils.ProducerIdAndEpoch; import org.apache.kafka.common.KafkaException; @@ -58,7 +59,6 @@ import org.apache.kafka.common.requests.EndTxnResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; -import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; @@ -72,11 +72,11 @@ import org.slf4j.Logger; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.OptionalInt; @@ -99,7 +99,6 @@ public class TransactionManager { private final String transactionalId; private final int transactionTimeoutMs; private final ApiVersions apiVersions; - private boolean batchFindCoordinator = true; private static class TopicPartitionBookkeeper { @@ -1144,12 +1143,8 @@ private void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, Stri } FindCoordinatorRequestData data = new FindCoordinatorRequestData() - .setKeyType(type.id()); - if (batchFindCoordinator) { - data.setCoordinatorKeys(Collections.singletonList(coordinatorKey)); - } else { - data.setKey(coordinatorKey); - } + .setKeyType(type.id()) + .setKey(coordinatorKey); FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(data); enqueueRequest(new FindCoordinatorHandler(builder)); } @@ -1284,9 +1279,6 @@ public void onComplete(ClientResponse response) { if (this.needsCoordinator()) lookupCoordinator(this.coordinatorType(), this.coordinatorKey()); reenqueue(); - } else if (response.versionMismatch() instanceof NoBatchedFindCoordinatorsException && response.requestHeader().apiKey() == ApiKeys.FIND_COORDINATOR) { - batchFindCoordinator = false; - reenqueue(); } else if (response.versionMismatch() != null) { fatalError(response.versionMismatch()); } else if (response.hasResponse()) { @@ -1533,30 +1525,19 @@ String coordinatorKey() { @Override public void handleResponse(AbstractResponse response) { - FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response; CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType()); - if (batchFindCoordinator && findCoordinatorResponse.data().coordinators().size() != 1) { + List coordinators = ((FindCoordinatorResponse) response).coordinators(); + if (coordinators.size() != 1) { log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator"); fatalError(new IllegalStateException("Group coordinator lookup failed: Invalid response containing more than a single coordinator")); } - String key = batchFindCoordinator - ? findCoordinatorResponse.data().coordinators().get(0).key() - : builder.data().key(); - Errors error = batchFindCoordinator - ? Errors.forCode(findCoordinatorResponse.data().coordinators().get(0).errorCode()) - : findCoordinatorResponse.error(); + Coordinator coordinatorData = coordinators.get(0); + // For older versions without batching, obtain key from request data since it is not included in response + String key = coordinatorData.key() == null ? builder.data().key() : coordinatorData.key(); + Errors error = Errors.forCode(coordinatorData.errorCode()); if (error == Errors.NONE) { - int nodeId = batchFindCoordinator - ? findCoordinatorResponse.data().coordinators().get(0).nodeId() - : findCoordinatorResponse.data().nodeId(); - String host = batchFindCoordinator - ? findCoordinatorResponse.data().coordinators().get(0).host() - : findCoordinatorResponse.data().host(); - int port = batchFindCoordinator - ? findCoordinatorResponse.data().coordinators().get(0).port() - : findCoordinatorResponse.data().port(); - Node node = new Node(nodeId, host, port); + Node node = new Node(coordinatorData.nodeId(), coordinatorData.host(), coordinatorData.port()); switch (coordinatorType) { case GROUP: consumerGroupCoordinator = node; @@ -1574,12 +1555,9 @@ public void handleResponse(AbstractResponse response) { } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { abortableError(GroupAuthorizationException.forGroupId(key)); } else { - String errorMessage = batchFindCoordinator - ? findCoordinatorResponse.data().coordinators().get(0).errorMessage() - : findCoordinatorResponse.data().errorMessage(); fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to " + "unexpected error: %s", coordinatorType, key, - errorMessage))); + coordinatorData.errorMessage()))); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java index 30f98ba0007e2..fcac7de545c2f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java @@ -26,9 +26,12 @@ import org.apache.kafka.common.protocol.Errors; import java.nio.ByteBuffer; +import java.util.Collections; public class FindCoordinatorRequest extends AbstractRequest { + public static final short MIN_BATCHED_VERSION = 4; + public static class Builder extends AbstractRequest.Builder { private final FindCoordinatorRequestData data; @@ -43,9 +46,18 @@ public FindCoordinatorRequest build(short version) { throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " + "because we require features supported only in 2 or later."); } - if (version < 4 && !data.coordinatorKeys().isEmpty()) { - throw new NoBatchedFindCoordinatorsException("Cannot create a v" + version + " FindCoordinator request " + - "because we require features supported only in 4 or later."); + int batchedKeys = data.coordinatorKeys().size(); + if (version < MIN_BATCHED_VERSION) { + if (batchedKeys > 1) + throw new NoBatchedFindCoordinatorsException("Cannot create a v" + version + " FindCoordinator request " + + "because we require features supported only in " + MIN_BATCHED_VERSION + " or later."); + if (batchedKeys == 1) { + data.setKey(data.coordinatorKeys().get(0)); + data.setCoordinatorKeys(Collections.emptyList()); + } + } else if (batchedKeys == 0 && data.key() != null) { + data.setCoordinatorKeys(Collections.singletonList(data.key())); + data.setKey(""); // default value } return new FindCoordinatorRequest(data, version); } @@ -90,7 +102,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { response.setThrottleTimeMs(throttleTimeMs); } Errors error = Errors.forException(e); - if (version() < 4) { + if (version() < MIN_BATCHED_VERSION) { return FindCoordinatorResponse.prepareOldResponse(error, Node.noNode()); } else { return FindCoordinatorResponse.prepareErrorResponse(error, data.coordinatorKeys()); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java index 222f78ccb988b..156277a0e8de2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; @@ -89,6 +90,21 @@ public boolean shouldClientThrottle(short version) { return version >= 2; } + public List coordinators() { + if (!data.coordinators().isEmpty()) + return data.coordinators(); + else { + FindCoordinatorResponseData.Coordinator coordinator = new Coordinator() + .setErrorCode(data.errorCode()) + .setErrorMessage(data.errorMessage()) + .setKey(null) + .setNodeId(data.nodeId()) + .setHost(data.host()) + .setPort(data.port()); + return Collections.singletonList(coordinator); + } + } + public static FindCoordinatorResponse prepareOldResponse(Errors error, Node node) { FindCoordinatorResponseData data = new FindCoordinatorResponseData(); data.setErrorCode(error.code()) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 446e22d17ff93..53e326ad8955f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3155,8 +3155,6 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups))); - // dummy response for MockCLient to handle the UnsupportedVersionException correctly - env.kafkaClient().prepareResponse(null); //Retriable FindCoordinatorResponse errors should be retried env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); @@ -3177,8 +3175,6 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { final KafkaFuture results = result.deletedGroups().get("groupId"); assertNull(results.get()); - // dummy response for MockCLient to handle the UnsupportedVersionException correctly - env.kafkaClient().prepareResponse(null); //should throw error for non-retriable errors env.kafkaClient().prepareResponse( prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); @@ -3186,8 +3182,6 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"), GroupAuthorizationException.class); - // dummy response for MockCLient to handle the UnsupportedVersionException correctly - env.kafkaClient().prepareResponse(null); //Retriable errors should be retried env.kafkaClient().prepareResponse( prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); @@ -3236,6 +3230,54 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { } } + @Test + public void testDeleteMultipleConsumerGroupsWithOlderBroker() throws Exception { + final List groupIds = asList("group1", "group2"); + ApiVersion findCoordinatorV3 = new ApiVersion() + .setApiKey(ApiKeys.FIND_COORDINATOR.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 3); + ApiVersion describeGroups = new ApiVersion() + .setApiKey(ApiKeys.DESCRIBE_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion(ApiKeys.DELETE_GROUPS.latestVersion()); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions( + NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups))); + + // Dummy response for MockClient to handle the UnsupportedVersionException correctly to switch from batched to un-batched + env.kafkaClient().prepareResponse(null); + // Retriable FindCoordinatorResponse errors should be retried + for (int i = 0; i < groupIds.size(); i++) { + env.kafkaClient().prepareResponse( + prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); + } + for (int i = 0; i < groupIds.size(); i++) { + env.kafkaClient().prepareResponse( + prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + } + + final DeletableGroupResultCollection validResponse = new DeletableGroupResultCollection(); + validResponse.add(new DeletableGroupResult() + .setGroupId("group1") + .setErrorCode(Errors.NONE.code())); + validResponse.add(new DeletableGroupResult() + .setGroupId("group2") + .setErrorCode(Errors.NONE.code())); + env.kafkaClient().prepareResponse(new DeleteGroupsResponse( + new DeleteGroupsResponseData() + .setResults(validResponse) + )); + + final DeleteConsumerGroupsResult result = env.adminClient() + .deleteConsumerGroups(groupIds); + + final KafkaFuture results = result.deletedGroups().get("group1"); + assertNull(results.get(5, TimeUnit.SECONDS)); + } + } + @Test public void testDeleteConsumerGroupOffsetsNumRetries() throws Exception { final Cluster cluster = mockCluster(3, 0); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 2fabba1843d5c..4d8e69e7026b7 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -194,6 +194,7 @@ import org.apache.kafka.common.requests.CreateTopicsRequest.Builder; import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigType; import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; +import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourcePatternFilter; @@ -227,6 +228,7 @@ import static org.apache.kafka.common.protocol.ApiKeys.DELETE_TOPICS; import static org.apache.kafka.common.protocol.ApiKeys.DESCRIBE_CONFIGS; import static org.apache.kafka.common.protocol.ApiKeys.FETCH; +import static org.apache.kafka.common.protocol.ApiKeys.FIND_COORDINATOR; import static org.apache.kafka.common.protocol.ApiKeys.JOIN_GROUP; import static org.apache.kafka.common.protocol.ApiKeys.LEADER_AND_ISR; import static org.apache.kafka.common.protocol.ApiKeys.LIST_GROUPS; @@ -249,12 +251,6 @@ public class RequestResponseTest { @Test public void testSerialization() throws Exception { - checkRequest(createFindCoordinatorRequest(0), true); - checkRequest(createFindCoordinatorRequest(1), true); - checkErrorResponse(createFindCoordinatorRequest(0), unknownServerException, true); - checkErrorResponse(createFindCoordinatorRequest(1), unknownServerException, true); - checkResponse(createFindCoordinatorResponse(), 0, true); - checkResponse(createFindCoordinatorResponse(), 1, true); checkRequest(createControlledShutdownRequest(), true); checkResponse(createControlledShutdownResponse(), 1, true); checkErrorResponse(createControlledShutdownRequest(), unknownServerException, true); @@ -596,6 +592,22 @@ public void testUnregisterBrokerSerialization() { } } + @Test + public void testFindCoordinatorRequestSerialization() { + for (short version : ApiKeys.FIND_COORDINATOR.allVersions()) { + checkRequest(createFindCoordinatorRequest(version), true); + checkRequest(createBatchedFindCoordinatorRequest(Collections.singletonList("group1"), version), true); + if (version < FindCoordinatorRequest.MIN_BATCHED_VERSION) { + assertThrows(NoBatchedFindCoordinatorsException.class, () -> + createBatchedFindCoordinatorRequest(Arrays.asList("group1", "group2"), version)); + } else { + checkRequest(createBatchedFindCoordinatorRequest(Arrays.asList("group1", "group2"), version), true); + } + checkErrorResponse(createFindCoordinatorRequest(version), unknownServerException, true); + checkResponse(createFindCoordinatorResponse(version), version, true); + } + } + private DescribeClusterRequest createDescribeClusterRequest(short version) { return new DescribeClusterRequest.Builder( new DescribeClusterRequestData() @@ -1190,9 +1202,20 @@ private FindCoordinatorRequest createFindCoordinatorRequest(int version) { .build((short) version); } - private FindCoordinatorResponse createFindCoordinatorResponse() { + private FindCoordinatorRequest createBatchedFindCoordinatorRequest(List coordinatorKeys, int version) { + return new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(CoordinatorType.GROUP.id()) + .setCoordinatorKeys(coordinatorKeys)) + .build((short) version); + } + + private FindCoordinatorResponse createFindCoordinatorResponse(short version) { Node node = new Node(10, "host1", 2014); - return FindCoordinatorResponse.prepareOldResponse(Errors.NONE, node); + if (version < FindCoordinatorRequest.MIN_BATCHED_VERSION) + return FindCoordinatorResponse.prepareOldResponse(Errors.NONE, node); + else + return FindCoordinatorResponse.prepareResponse(Errors.NONE, "group", node); } private FetchRequest createFetchRequest(int version, FetchMetadata metadata, List toForget) { @@ -2826,7 +2849,8 @@ public void testErrorCountsIncludesNone() { assertEquals(Integer.valueOf(1), createEndTxnResponse().errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(1), createExpireTokenResponse().errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(3), createFetchResponse(123).errorCounts().get(Errors.NONE)); - assertEquals(Integer.valueOf(1), createFindCoordinatorResponse().errorCounts().get(Errors.NONE)); + assertEquals(Integer.valueOf(1), createFindCoordinatorResponse(FIND_COORDINATOR.oldestVersion()).errorCounts().get(Errors.NONE)); + assertEquals(Integer.valueOf(1), createFindCoordinatorResponse(FIND_COORDINATOR.latestVersion()).errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(1), createHeartBeatResponse().errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(1), createIncrementalAlterConfigsResponse().errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(1), createJoinGroupResponse(JOIN_GROUP.latestVersion()).errorCounts().get(Errors.NONE));