From 59adf863202a61f04c1e2d0f0de3502d215798cd Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Sat, 3 Jul 2021 11:07:03 +0100 Subject: [PATCH 1/3] KAFKA-13029; Set appropriate fields for FindCoordinatorRequest based on version --- .../admin/internals/CoordinatorStrategy.java | 2 +- .../internals/AbstractCoordinator.java | 23 ++------ .../internals/TransactionManager.java | 29 ++++------ .../requests/FindCoordinatorRequest.java | 18 +++++-- .../clients/admin/KafkaAdminClientTest.java | 54 ++++++++++++++++--- .../common/requests/RequestResponseTest.java | 42 +++++++++++---- 6 files changed, 111 insertions(+), 57 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java index cb44ad40e87b4..e46174bfeded6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java @@ -98,7 +98,7 @@ public LookupResult handleResponse( key.idValue + "' cannot be represented in a request.")); } FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; - if (batch) { + if (!response.data().coordinators().isEmpty()) { for (Coordinator coordinator : response.data().coordinators()) { CoordinatorKey key = (type == CoordinatorType.GROUP) ? CoordinatorKey.byGroupId(coordinator.key()) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 4123914c6c9e2..e59d40ece2535 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -52,7 +52,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; -import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.HeartbeatResponse; @@ -139,7 +138,6 @@ public boolean hasNotJoinedGroup() { private RequestFuture joinFuture = null; private RequestFuture findCoordinatorFuture = null; private volatile RuntimeException fatalFindCoordinatorException = null; - private volatile boolean batchFindCoordinator = true; private Generation generation = Generation.NO_GENERATION; private long lastRebalanceStartMs = -1L; private long lastRebalanceEndMs = -1L; @@ -815,29 +813,22 @@ public void handle(SyncGroupResponse syncResponse, */ private RequestFuture sendFindCoordinatorRequest(Node node) { // initiate the group metadata request - log.debug("Sending FindCoordinator request to broker {} with batch={}", node, batchFindCoordinator); + log.debug("Sending FindCoordinator request to broker {}", node); FindCoordinatorRequestData data = new FindCoordinatorRequestData() .setKeyType(CoordinatorType.GROUP.id()); - if (batchFindCoordinator) { - data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId)); - } else { - data.setKey(this.rebalanceConfig.groupId); - } + data.setKey(this.rebalanceConfig.groupId); FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(data); return client.send(node, requestBuilder) - .compose(new FindCoordinatorResponseHandler(batchFindCoordinator)); + .compose(new FindCoordinatorResponseHandler()); } private class FindCoordinatorResponseHandler extends RequestFutureAdapter { - private boolean batch; - FindCoordinatorResponseHandler(boolean batch) { - this.batch = batch; - } @Override public void onSuccess(ClientResponse resp, RequestFuture future) { log.debug("Received FindCoordinator response {}", resp); + boolean batch = resp.requestHeader().apiVersion() >= FindCoordinatorRequest.MIN_BATCHED_VERSION; FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody(); if (batch && findCoordinatorResponse.data().coordinators().size() != 1) { log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator"); @@ -885,12 +876,6 @@ public void onSuccess(ClientResponse resp, RequestFuture future) { public void onFailure(RuntimeException e, RequestFuture future) { log.debug("FindCoordinator request failed due to {}", e.toString()); - if (e instanceof NoBatchedFindCoordinatorsException) { - batchFindCoordinator = false; - clearFindCoordinatorFuture(); - lookupCoordinator(); - return; - } if (!(e instanceof RetriableException)) { // Remember the exception if fatal so we can ensure it gets thrown by the main thread fatalFindCoordinatorException = e; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 2febc2ec89a5c..65266908c521e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -58,7 +58,6 @@ import org.apache.kafka.common.requests.EndTxnResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; -import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; @@ -72,7 +71,6 @@ import org.slf4j.Logger; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -99,7 +97,6 @@ public class TransactionManager { private final String transactionalId; private final int transactionTimeoutMs; private final ApiVersions apiVersions; - private boolean batchFindCoordinator = true; private static class TopicPartitionBookkeeper { @@ -1145,11 +1142,7 @@ private void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, Stri FindCoordinatorRequestData data = new FindCoordinatorRequestData() .setKeyType(type.id()); - if (batchFindCoordinator) { - data.setCoordinatorKeys(Collections.singletonList(coordinatorKey)); - } else { - data.setKey(coordinatorKey); - } + data.setKey(coordinatorKey); FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(data); enqueueRequest(new FindCoordinatorHandler(builder)); } @@ -1284,16 +1277,13 @@ public void onComplete(ClientResponse response) { if (this.needsCoordinator()) lookupCoordinator(this.coordinatorType(), this.coordinatorKey()); reenqueue(); - } else if (response.versionMismatch() instanceof NoBatchedFindCoordinatorsException && response.requestHeader().apiKey() == ApiKeys.FIND_COORDINATOR) { - batchFindCoordinator = false; - reenqueue(); } else if (response.versionMismatch() != null) { fatalError(response.versionMismatch()); } else if (response.hasResponse()) { log.trace("Received transactional response {} for request {}", response.responseBody(), requestBuilder()); synchronized (TransactionManager.this) { - handleResponse(response.responseBody()); + handleResponse(response.responseBody(), response.requestHeader().apiVersion()); } } else { fatalError(new KafkaException("Could not execute transactional request for unknown reasons")); @@ -1327,7 +1317,7 @@ boolean isEndTxn() { abstract AbstractRequest.Builder requestBuilder(); - abstract void handleResponse(AbstractResponse responseBody); + abstract void handleResponse(AbstractResponse responseBody, short requestVersion); abstract Priority priority(); } @@ -1362,7 +1352,7 @@ FindCoordinatorRequest.CoordinatorType coordinatorType() { } @Override - public void handleResponse(AbstractResponse response) { + public void handleResponse(AbstractResponse response, short requestVersion) { InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response; Errors error = initProducerIdResponse.error(); @@ -1415,7 +1405,7 @@ Priority priority() { } @Override - public void handleResponse(AbstractResponse response) { + public void handleResponse(AbstractResponse response, short requestVersion) { AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response; Map errors = addPartitionsToTxnResponse.errors(); boolean hasPartitionErrors = false; @@ -1532,9 +1522,10 @@ String coordinatorKey() { } @Override - public void handleResponse(AbstractResponse response) { + public void handleResponse(AbstractResponse response, short requestVersion) { FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response; CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType()); + boolean batchFindCoordinator = requestVersion >= FindCoordinatorRequest.MIN_BATCHED_VERSION; if (batchFindCoordinator && findCoordinatorResponse.data().coordinators().size() != 1) { log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator"); @@ -1608,7 +1599,7 @@ boolean isEndTxn() { } @Override - public void handleResponse(AbstractResponse response) { + public void handleResponse(AbstractResponse response, short requestVersion) { EndTxnResponse endTxnResponse = (EndTxnResponse) response; Errors error = endTxnResponse.error(); @@ -1661,7 +1652,7 @@ Priority priority() { } @Override - public void handleResponse(AbstractResponse response) { + public void handleResponse(AbstractResponse response, short requestVersion) { AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response; Errors error = Errors.forCode(addOffsetsToTxnResponse.data().errorCode()); @@ -1723,7 +1714,7 @@ String coordinatorKey() { } @Override - public void handleResponse(AbstractResponse response) { + public void handleResponse(AbstractResponse response, short requestVersion) { TxnOffsetCommitResponse txnOffsetCommitResponse = (TxnOffsetCommitResponse) response; boolean coordinatorReloaded = false; Map errors = txnOffsetCommitResponse.errors(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java index 30f98ba0007e2..a10db273de542 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java @@ -26,9 +26,12 @@ import org.apache.kafka.common.protocol.Errors; import java.nio.ByteBuffer; +import java.util.Collections; public class FindCoordinatorRequest extends AbstractRequest { + public static final short MIN_BATCHED_VERSION = 4; + public static class Builder extends AbstractRequest.Builder { private final FindCoordinatorRequestData data; @@ -43,9 +46,18 @@ public FindCoordinatorRequest build(short version) { throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " + "because we require features supported only in 2 or later."); } - if (version < 4 && !data.coordinatorKeys().isEmpty()) { - throw new NoBatchedFindCoordinatorsException("Cannot create a v" + version + " FindCoordinator request " + + int batchedKeys = data.coordinatorKeys().size(); + if (version < MIN_BATCHED_VERSION) { + if (batchedKeys > 1) + throw new NoBatchedFindCoordinatorsException("Cannot create a v" + version + " FindCoordinator request " + "because we require features supported only in 4 or later."); + if (batchedKeys == 1) { + data.setKey(data.coordinatorKeys().get(0)); + data.setCoordinatorKeys(Collections.emptyList()); + } + } else if (batchedKeys == 0 && data.key() != null) { + data.setCoordinatorKeys(Collections.singletonList(data.key())); + data.setKey(""); // default value } return new FindCoordinatorRequest(data, version); } @@ -90,7 +102,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { response.setThrottleTimeMs(throttleTimeMs); } Errors error = Errors.forException(e); - if (version() < 4) { + if (version() < MIN_BATCHED_VERSION) { return FindCoordinatorResponse.prepareOldResponse(error, Node.noNode()); } else { return FindCoordinatorResponse.prepareErrorResponse(error, data.coordinatorKeys()); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 446e22d17ff93..6808fe6a4e6d3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3155,8 +3155,6 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups))); - // dummy response for MockCLient to handle the UnsupportedVersionException correctly - env.kafkaClient().prepareResponse(null); //Retriable FindCoordinatorResponse errors should be retried env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); @@ -3177,8 +3175,6 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { final KafkaFuture results = result.deletedGroups().get("groupId"); assertNull(results.get()); - // dummy response for MockCLient to handle the UnsupportedVersionException correctly - env.kafkaClient().prepareResponse(null); //should throw error for non-retriable errors env.kafkaClient().prepareResponse( prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); @@ -3186,8 +3182,6 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"), GroupAuthorizationException.class); - // dummy response for MockCLient to handle the UnsupportedVersionException correctly - env.kafkaClient().prepareResponse(null); //Retriable errors should be retried env.kafkaClient().prepareResponse( prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); @@ -3236,6 +3230,54 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { } } + @Test + public void testDeleteMultipleConsumerGroupsWithOlderBroker() throws Exception { + final List groupIds = asList("group1", "group2"); + ApiVersion findCoordinatorV3 = new ApiVersion() + .setApiKey(ApiKeys.FIND_COORDINATOR.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 3); + ApiVersion describeGroups = new ApiVersion() + .setApiKey(ApiKeys.DESCRIBE_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion(ApiKeys.DELETE_GROUPS.latestVersion()); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions( + NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups))); + + // dummy response for MockClient to handle the UnsupportedVersionException correctly to switch from batched to un-batched + env.kafkaClient().prepareResponse(null); + //Retriable FindCoordinatorResponse errors should be retried + for (int i = 0; i < groupIds.size(); i++) { + env.kafkaClient().prepareResponse( + prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); + } + for (int i = 0; i < groupIds.size(); i++) { + env.kafkaClient().prepareResponse( + prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + } + + final DeletableGroupResultCollection validResponse = new DeletableGroupResultCollection(); + validResponse.add(new DeletableGroupResult() + .setGroupId("group1") + .setErrorCode(Errors.NONE.code())); + validResponse.add(new DeletableGroupResult() + .setGroupId("group2") + .setErrorCode(Errors.NONE.code())); + env.kafkaClient().prepareResponse(new DeleteGroupsResponse( + new DeleteGroupsResponseData() + .setResults(validResponse) + )); + + final DeleteConsumerGroupsResult result = env.adminClient() + .deleteConsumerGroups(groupIds); + + final KafkaFuture results = result.deletedGroups().get("group1"); + assertNull(results.get(5, TimeUnit.SECONDS)); + } + } + @Test public void testDeleteConsumerGroupOffsetsNumRetries() throws Exception { final Cluster cluster = mockCluster(3, 0); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 2fabba1843d5c..4d8e69e7026b7 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -194,6 +194,7 @@ import org.apache.kafka.common.requests.CreateTopicsRequest.Builder; import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigType; import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; +import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourcePatternFilter; @@ -227,6 +228,7 @@ import static org.apache.kafka.common.protocol.ApiKeys.DELETE_TOPICS; import static org.apache.kafka.common.protocol.ApiKeys.DESCRIBE_CONFIGS; import static org.apache.kafka.common.protocol.ApiKeys.FETCH; +import static org.apache.kafka.common.protocol.ApiKeys.FIND_COORDINATOR; import static org.apache.kafka.common.protocol.ApiKeys.JOIN_GROUP; import static org.apache.kafka.common.protocol.ApiKeys.LEADER_AND_ISR; import static org.apache.kafka.common.protocol.ApiKeys.LIST_GROUPS; @@ -249,12 +251,6 @@ public class RequestResponseTest { @Test public void testSerialization() throws Exception { - checkRequest(createFindCoordinatorRequest(0), true); - checkRequest(createFindCoordinatorRequest(1), true); - checkErrorResponse(createFindCoordinatorRequest(0), unknownServerException, true); - checkErrorResponse(createFindCoordinatorRequest(1), unknownServerException, true); - checkResponse(createFindCoordinatorResponse(), 0, true); - checkResponse(createFindCoordinatorResponse(), 1, true); checkRequest(createControlledShutdownRequest(), true); checkResponse(createControlledShutdownResponse(), 1, true); checkErrorResponse(createControlledShutdownRequest(), unknownServerException, true); @@ -596,6 +592,22 @@ public void testUnregisterBrokerSerialization() { } } + @Test + public void testFindCoordinatorRequestSerialization() { + for (short version : ApiKeys.FIND_COORDINATOR.allVersions()) { + checkRequest(createFindCoordinatorRequest(version), true); + checkRequest(createBatchedFindCoordinatorRequest(Collections.singletonList("group1"), version), true); + if (version < FindCoordinatorRequest.MIN_BATCHED_VERSION) { + assertThrows(NoBatchedFindCoordinatorsException.class, () -> + createBatchedFindCoordinatorRequest(Arrays.asList("group1", "group2"), version)); + } else { + checkRequest(createBatchedFindCoordinatorRequest(Arrays.asList("group1", "group2"), version), true); + } + checkErrorResponse(createFindCoordinatorRequest(version), unknownServerException, true); + checkResponse(createFindCoordinatorResponse(version), version, true); + } + } + private DescribeClusterRequest createDescribeClusterRequest(short version) { return new DescribeClusterRequest.Builder( new DescribeClusterRequestData() @@ -1190,9 +1202,20 @@ private FindCoordinatorRequest createFindCoordinatorRequest(int version) { .build((short) version); } - private FindCoordinatorResponse createFindCoordinatorResponse() { + private FindCoordinatorRequest createBatchedFindCoordinatorRequest(List coordinatorKeys, int version) { + return new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(CoordinatorType.GROUP.id()) + .setCoordinatorKeys(coordinatorKeys)) + .build((short) version); + } + + private FindCoordinatorResponse createFindCoordinatorResponse(short version) { Node node = new Node(10, "host1", 2014); - return FindCoordinatorResponse.prepareOldResponse(Errors.NONE, node); + if (version < FindCoordinatorRequest.MIN_BATCHED_VERSION) + return FindCoordinatorResponse.prepareOldResponse(Errors.NONE, node); + else + return FindCoordinatorResponse.prepareResponse(Errors.NONE, "group", node); } private FetchRequest createFetchRequest(int version, FetchMetadata metadata, List toForget) { @@ -2826,7 +2849,8 @@ public void testErrorCountsIncludesNone() { assertEquals(Integer.valueOf(1), createEndTxnResponse().errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(1), createExpireTokenResponse().errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(3), createFetchResponse(123).errorCounts().get(Errors.NONE)); - assertEquals(Integer.valueOf(1), createFindCoordinatorResponse().errorCounts().get(Errors.NONE)); + assertEquals(Integer.valueOf(1), createFindCoordinatorResponse(FIND_COORDINATOR.oldestVersion()).errorCounts().get(Errors.NONE)); + assertEquals(Integer.valueOf(1), createFindCoordinatorResponse(FIND_COORDINATOR.latestVersion()).errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(1), createHeartBeatResponse().errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(1), createIncrementalAlterConfigsResponse().errorCounts().get(Errors.NONE)); assertEquals(Integer.valueOf(1), createJoinGroupResponse(JOIN_GROUP.latestVersion()).errorCounts().get(Errors.NONE)); From 6ab08c21211457ecdb0f09cd9762c6bb6a3c4b51 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Sun, 4 Jul 2021 12:11:48 +0100 Subject: [PATCH 2/3] Address review comments --- .../kafka/clients/admin/internals/CoordinatorStrategy.java | 1 + .../kafka/clients/producer/internals/TransactionManager.java | 3 +++ .../apache/kafka/common/requests/FindCoordinatorRequest.java | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java index e46174bfeded6..22912418e347e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java @@ -98,6 +98,7 @@ public LookupResult handleResponse( key.idValue + "' cannot be represented in a request.")); } FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; + // Handle response based on whether batched version is used by checking `coordinators()` in the response data if (!response.data().coordinators().isEmpty()) { for (Coordinator coordinator : response.data().coordinators()) { CoordinatorKey key = (type == CoordinatorType.GROUP) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 65266908c521e..4e6a980f8ce45 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -1317,6 +1317,9 @@ boolean isEndTxn() { abstract AbstractRequest.Builder requestBuilder(); + // Propagate requestVersion for parsing response since some responses use + // different fields depending on the API version. For example, FindCoordinator + // uses different format for versions that support batching. abstract void handleResponse(AbstractResponse responseBody, short requestVersion); abstract Priority priority(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java index a10db273de542..fcac7de545c2f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java @@ -50,7 +50,7 @@ public FindCoordinatorRequest build(short version) { if (version < MIN_BATCHED_VERSION) { if (batchedKeys > 1) throw new NoBatchedFindCoordinatorsException("Cannot create a v" + version + " FindCoordinator request " + - "because we require features supported only in 4 or later."); + "because we require features supported only in " + MIN_BATCHED_VERSION + " or later."); if (batchedKeys == 1) { data.setKey(data.coordinatorKeys().get(0)); data.setCoordinatorKeys(Collections.emptyList()); From ee4e4ba3f8972d878a2d92ad98bebb69417d055e Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Mon, 5 Jul 2021 10:12:30 +0100 Subject: [PATCH 3/3] Address review comments --- .../admin/internals/CoordinatorStrategy.java | 26 ++++----- .../internals/AbstractCoordinator.java | 35 ++++-------- .../internals/TransactionManager.java | 56 +++++++------------ .../requests/FindCoordinatorResponse.java | 16 ++++++ .../clients/admin/KafkaAdminClientTest.java | 4 +- 5 files changed, 61 insertions(+), 76 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java index 22912418e347e..e6fc0d624a0cd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java @@ -97,23 +97,21 @@ public LookupResult handleResponse( failedKeys.put(key, new InvalidGroupIdException("The given group id '" + key.idValue + "' cannot be represented in a request.")); } - FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; - // Handle response based on whether batched version is used by checking `coordinators()` in the response data - if (!response.data().coordinators().isEmpty()) { - for (Coordinator coordinator : response.data().coordinators()) { - CoordinatorKey key = (type == CoordinatorType.GROUP) + + for (Coordinator coordinator : ((FindCoordinatorResponse) abstractResponse).coordinators()) { + CoordinatorKey key; + if (coordinator.key() == null) // old version without batching + key = requireSingletonAndType(keys); + else { + key = (type == CoordinatorType.GROUP) ? CoordinatorKey.byGroupId(coordinator.key()) : CoordinatorKey.byTransactionalId(coordinator.key()); - handleError(Errors.forCode(coordinator.errorCode()), - key, - coordinator.nodeId(), - mappedKeys, - failedKeys); } - } else { - CoordinatorKey key = requireSingletonAndType(keys); - Errors error = response.error(); - handleError(error, key, response.node().id(), mappedKeys, failedKeys); + handleError(Errors.forCode(coordinator.errorCode()), + key, + coordinator.nodeId(), + mappedKeys, + failedKeys); } return new LookupResult<>(failedKeys, mappedKeys); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index e59d40ece2535..9608b8fe3c3aa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator; import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; @@ -815,8 +816,8 @@ private RequestFuture sendFindCoordinatorRequest(Node node) { // initiate the group metadata request log.debug("Sending FindCoordinator request to broker {}", node); FindCoordinatorRequestData data = new FindCoordinatorRequestData() - .setKeyType(CoordinatorType.GROUP.id()); - data.setKey(this.rebalanceConfig.groupId); + .setKeyType(CoordinatorType.GROUP.id()) + .setKey(this.rebalanceConfig.groupId); FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(data); return client.send(node, requestBuilder) .compose(new FindCoordinatorResponseHandler()); @@ -828,34 +829,23 @@ private class FindCoordinatorResponseHandler extends RequestFutureAdapter future) { log.debug("Received FindCoordinator response {}", resp); - boolean batch = resp.requestHeader().apiVersion() >= FindCoordinatorRequest.MIN_BATCHED_VERSION; - FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody(); - if (batch && findCoordinatorResponse.data().coordinators().size() != 1) { + List coordinators = ((FindCoordinatorResponse) resp.responseBody()).coordinators(); + if (coordinators.size() != 1) { log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator"); future.raise(new IllegalStateException("Group coordinator lookup failed: Invalid response containing more than a single coordinator")); } - Errors error = batch - ? Errors.forCode(findCoordinatorResponse.data().coordinators().get(0).errorCode()) - : findCoordinatorResponse.error(); + Coordinator coordinatorData = coordinators.get(0); + Errors error = Errors.forCode(coordinatorData.errorCode()); if (error == Errors.NONE) { synchronized (AbstractCoordinator.this) { - int nodeId = batch - ? findCoordinatorResponse.data().coordinators().get(0).nodeId() - : findCoordinatorResponse.data().nodeId(); - String host = batch - ? findCoordinatorResponse.data().coordinators().get(0).host() - : findCoordinatorResponse.data().host(); - int port = batch - ? findCoordinatorResponse.data().coordinators().get(0).port() - : findCoordinatorResponse.data().port(); // use MAX_VALUE - node.id as the coordinator id to allow separate connections // for the coordinator in the underlying network client layer - int coordinatorConnectionId = Integer.MAX_VALUE - nodeId; + int coordinatorConnectionId = Integer.MAX_VALUE - coordinatorData.nodeId(); AbstractCoordinator.this.coordinator = new Node( coordinatorConnectionId, - host, - port); + coordinatorData.host(), + coordinatorData.port()); log.info("Discovered group coordinator {}", coordinator); client.tryConnect(coordinator); heartbeat.resetSessionTimeout(); @@ -864,10 +854,7 @@ public void onSuccess(ClientResponse resp, RequestFuture future) { } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else { - String errorMessage = batch - ? findCoordinatorResponse.data().coordinators().get(0).errorMessage() - : findCoordinatorResponse.data().errorMessage(); - log.debug("Group coordinator lookup failed: {}", errorMessage); + log.debug("Group coordinator lookup failed: {}", coordinatorData.errorMessage()); future.raise(error); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 4e6a980f8ce45..2de31a03c5694 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.utils.ProducerIdAndEpoch; import org.apache.kafka.common.KafkaException; @@ -75,6 +76,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.OptionalInt; @@ -1141,8 +1143,8 @@ private void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, Stri } FindCoordinatorRequestData data = new FindCoordinatorRequestData() - .setKeyType(type.id()); - data.setKey(coordinatorKey); + .setKeyType(type.id()) + .setKey(coordinatorKey); FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(data); enqueueRequest(new FindCoordinatorHandler(builder)); } @@ -1283,7 +1285,7 @@ public void onComplete(ClientResponse response) { log.trace("Received transactional response {} for request {}", response.responseBody(), requestBuilder()); synchronized (TransactionManager.this) { - handleResponse(response.responseBody(), response.requestHeader().apiVersion()); + handleResponse(response.responseBody()); } } else { fatalError(new KafkaException("Could not execute transactional request for unknown reasons")); @@ -1317,10 +1319,7 @@ boolean isEndTxn() { abstract AbstractRequest.Builder requestBuilder(); - // Propagate requestVersion for parsing response since some responses use - // different fields depending on the API version. For example, FindCoordinator - // uses different format for versions that support batching. - abstract void handleResponse(AbstractResponse responseBody, short requestVersion); + abstract void handleResponse(AbstractResponse responseBody); abstract Priority priority(); } @@ -1355,7 +1354,7 @@ FindCoordinatorRequest.CoordinatorType coordinatorType() { } @Override - public void handleResponse(AbstractResponse response, short requestVersion) { + public void handleResponse(AbstractResponse response) { InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response; Errors error = initProducerIdResponse.error(); @@ -1408,7 +1407,7 @@ Priority priority() { } @Override - public void handleResponse(AbstractResponse response, short requestVersion) { + public void handleResponse(AbstractResponse response) { AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response; Map errors = addPartitionsToTxnResponse.errors(); boolean hasPartitionErrors = false; @@ -1525,32 +1524,20 @@ String coordinatorKey() { } @Override - public void handleResponse(AbstractResponse response, short requestVersion) { - FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response; + public void handleResponse(AbstractResponse response) { CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType()); - boolean batchFindCoordinator = requestVersion >= FindCoordinatorRequest.MIN_BATCHED_VERSION; - if (batchFindCoordinator && findCoordinatorResponse.data().coordinators().size() != 1) { + List coordinators = ((FindCoordinatorResponse) response).coordinators(); + if (coordinators.size() != 1) { log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator"); fatalError(new IllegalStateException("Group coordinator lookup failed: Invalid response containing more than a single coordinator")); } - String key = batchFindCoordinator - ? findCoordinatorResponse.data().coordinators().get(0).key() - : builder.data().key(); - Errors error = batchFindCoordinator - ? Errors.forCode(findCoordinatorResponse.data().coordinators().get(0).errorCode()) - : findCoordinatorResponse.error(); + Coordinator coordinatorData = coordinators.get(0); + // For older versions without batching, obtain key from request data since it is not included in response + String key = coordinatorData.key() == null ? builder.data().key() : coordinatorData.key(); + Errors error = Errors.forCode(coordinatorData.errorCode()); if (error == Errors.NONE) { - int nodeId = batchFindCoordinator - ? findCoordinatorResponse.data().coordinators().get(0).nodeId() - : findCoordinatorResponse.data().nodeId(); - String host = batchFindCoordinator - ? findCoordinatorResponse.data().coordinators().get(0).host() - : findCoordinatorResponse.data().host(); - int port = batchFindCoordinator - ? findCoordinatorResponse.data().coordinators().get(0).port() - : findCoordinatorResponse.data().port(); - Node node = new Node(nodeId, host, port); + Node node = new Node(coordinatorData.nodeId(), coordinatorData.host(), coordinatorData.port()); switch (coordinatorType) { case GROUP: consumerGroupCoordinator = node; @@ -1568,12 +1555,9 @@ public void handleResponse(AbstractResponse response, short requestVersion) { } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { abortableError(GroupAuthorizationException.forGroupId(key)); } else { - String errorMessage = batchFindCoordinator - ? findCoordinatorResponse.data().coordinators().get(0).errorMessage() - : findCoordinatorResponse.data().errorMessage(); fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to " + "unexpected error: %s", coordinatorType, key, - errorMessage))); + coordinatorData.errorMessage()))); } } } @@ -1602,7 +1586,7 @@ boolean isEndTxn() { } @Override - public void handleResponse(AbstractResponse response, short requestVersion) { + public void handleResponse(AbstractResponse response) { EndTxnResponse endTxnResponse = (EndTxnResponse) response; Errors error = endTxnResponse.error(); @@ -1655,7 +1639,7 @@ Priority priority() { } @Override - public void handleResponse(AbstractResponse response, short requestVersion) { + public void handleResponse(AbstractResponse response) { AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response; Errors error = Errors.forCode(addOffsetsToTxnResponse.data().errorCode()); @@ -1717,7 +1701,7 @@ String coordinatorKey() { } @Override - public void handleResponse(AbstractResponse response, short requestVersion) { + public void handleResponse(AbstractResponse response) { TxnOffsetCommitResponse txnOffsetCommitResponse = (TxnOffsetCommitResponse) response; boolean coordinatorReloaded = false; Map errors = txnOffsetCommitResponse.errors(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java index 222f78ccb988b..156277a0e8de2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; @@ -89,6 +90,21 @@ public boolean shouldClientThrottle(short version) { return version >= 2; } + public List coordinators() { + if (!data.coordinators().isEmpty()) + return data.coordinators(); + else { + FindCoordinatorResponseData.Coordinator coordinator = new Coordinator() + .setErrorCode(data.errorCode()) + .setErrorMessage(data.errorMessage()) + .setKey(null) + .setNodeId(data.nodeId()) + .setHost(data.host()) + .setPort(data.port()); + return Collections.singletonList(coordinator); + } + } + public static FindCoordinatorResponse prepareOldResponse(Errors error, Node node) { FindCoordinatorResponseData data = new FindCoordinatorResponseData(); data.setErrorCode(error.code()) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 6808fe6a4e6d3..53e326ad8955f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3246,9 +3246,9 @@ public void testDeleteMultipleConsumerGroupsWithOlderBroker() throws Exception { env.kafkaClient().setNodeApiVersions( NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups))); - // dummy response for MockClient to handle the UnsupportedVersionException correctly to switch from batched to un-batched + // Dummy response for MockClient to handle the UnsupportedVersionException correctly to switch from batched to un-batched env.kafkaClient().prepareResponse(null); - //Retriable FindCoordinatorResponse errors should be retried + // Retriable FindCoordinatorResponse errors should be retried for (int i = 0; i < groupIds.size(); i++) { env.kafkaClient().prepareResponse( prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));