Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,21 @@ public LookupResult<CoordinatorKey> handleResponse(
failedKeys.put(key, new InvalidGroupIdException("The given group id '" +
key.idValue + "' cannot be represented in a request."));
}
FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
if (batch) {
for (Coordinator coordinator : response.data().coordinators()) {
CoordinatorKey key = (type == CoordinatorType.GROUP)

for (Coordinator coordinator : ((FindCoordinatorResponse) abstractResponse).coordinators()) {
CoordinatorKey key;
if (coordinator.key() == null) // old version without batching
key = requireSingletonAndType(keys);
else {
key = (type == CoordinatorType.GROUP)
? CoordinatorKey.byGroupId(coordinator.key())
: CoordinatorKey.byTransactionalId(coordinator.key());
handleError(Errors.forCode(coordinator.errorCode()),
key,
coordinator.nodeId(),
mappedKeys,
failedKeys);
}
} else {
CoordinatorKey key = requireSingletonAndType(keys);
Errors error = response.error();
handleError(error, key, response.node().id(), mappedKeys, failedKeys);
handleError(Errors.forCode(coordinator.errorCode()),
key,
coordinator.nodeId(),
mappedKeys,
failedKeys);
}
return new LookupResult<>(failedKeys, mappedKeys);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
Expand All @@ -52,7 +53,6 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
Expand Down Expand Up @@ -139,7 +139,6 @@ public boolean hasNotJoinedGroup() {
private RequestFuture<ByteBuffer> joinFuture = null;
private RequestFuture<Void> findCoordinatorFuture = null;
private volatile RuntimeException fatalFindCoordinatorException = null;
private volatile boolean batchFindCoordinator = true;
private Generation generation = Generation.NO_GENERATION;
private long lastRebalanceStartMs = -1L;
private long lastRebalanceEndMs = -1L;
Expand Down Expand Up @@ -815,56 +814,38 @@ public void handle(SyncGroupResponse syncResponse,
*/
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
// initiate the group metadata request
log.debug("Sending FindCoordinator request to broker {} with batch={}", node, batchFindCoordinator);
log.debug("Sending FindCoordinator request to broker {}", node);
FindCoordinatorRequestData data = new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id());
if (batchFindCoordinator) {
data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId));
} else {
data.setKey(this.rebalanceConfig.groupId);
}
.setKeyType(CoordinatorType.GROUP.id())
.setKey(this.rebalanceConfig.groupId);
FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(data);
return client.send(node, requestBuilder)
.compose(new FindCoordinatorResponseHandler(batchFindCoordinator));
.compose(new FindCoordinatorResponseHandler());
}

private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
private boolean batch;
FindCoordinatorResponseHandler(boolean batch) {
this.batch = batch;
}

@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
log.debug("Received FindCoordinator response {}", resp);

FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
if (batch && findCoordinatorResponse.data().coordinators().size() != 1) {
List<Coordinator> coordinators = ((FindCoordinatorResponse) resp.responseBody()).coordinators();
if (coordinators.size() != 1) {
log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator");
future.raise(new IllegalStateException("Group coordinator lookup failed: Invalid response containing more than a single coordinator"));
}
Errors error = batch
? Errors.forCode(findCoordinatorResponse.data().coordinators().get(0).errorCode())
: findCoordinatorResponse.error();
Coordinator coordinatorData = coordinators.get(0);
Errors error = Errors.forCode(coordinatorData.errorCode());
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
int nodeId = batch
? findCoordinatorResponse.data().coordinators().get(0).nodeId()
: findCoordinatorResponse.data().nodeId();
String host = batch
? findCoordinatorResponse.data().coordinators().get(0).host()
: findCoordinatorResponse.data().host();
int port = batch
? findCoordinatorResponse.data().coordinators().get(0).port()
: findCoordinatorResponse.data().port();
// use MAX_VALUE - node.id as the coordinator id to allow separate connections
// for the coordinator in the underlying network client layer
int coordinatorConnectionId = Integer.MAX_VALUE - nodeId;
int coordinatorConnectionId = Integer.MAX_VALUE - coordinatorData.nodeId();

AbstractCoordinator.this.coordinator = new Node(
coordinatorConnectionId,
host,
port);
coordinatorData.host(),
coordinatorData.port());
log.info("Discovered group coordinator {}", coordinator);
client.tryConnect(coordinator);
heartbeat.resetSessionTimeout();
Expand All @@ -873,10 +854,7 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else {
String errorMessage = batch
? findCoordinatorResponse.data().coordinators().get(0).errorMessage()
: findCoordinatorResponse.data().errorMessage();
log.debug("Group coordinator lookup failed: {}", errorMessage);
log.debug("Group coordinator lookup failed: {}", coordinatorData.errorMessage());
future.raise(error);
}
}
Expand All @@ -885,12 +863,6 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
log.debug("FindCoordinator request failed due to {}", e.toString());

if (e instanceof NoBatchedFindCoordinatorsException) {
batchFindCoordinator = false;
clearFindCoordinatorFuture();
lookupCoordinator();
return;
}
if (!(e instanceof RetriableException)) {
// Remember the exception if fatal so we can ensure it gets thrown by the main thread
fatalFindCoordinatorException = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.KafkaException;
Expand Down Expand Up @@ -58,7 +59,6 @@
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
Expand All @@ -72,11 +72,11 @@
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.OptionalInt;
Expand All @@ -99,7 +99,6 @@ public class TransactionManager {
private final String transactionalId;
private final int transactionTimeoutMs;
private final ApiVersions apiVersions;
private boolean batchFindCoordinator = true;

private static class TopicPartitionBookkeeper {

Expand Down Expand Up @@ -1144,12 +1143,8 @@ private void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, Stri
}

FindCoordinatorRequestData data = new FindCoordinatorRequestData()
.setKeyType(type.id());
if (batchFindCoordinator) {
data.setCoordinatorKeys(Collections.singletonList(coordinatorKey));
} else {
data.setKey(coordinatorKey);
}
.setKeyType(type.id())
.setKey(coordinatorKey);
FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(data);
enqueueRequest(new FindCoordinatorHandler(builder));
}
Expand Down Expand Up @@ -1284,9 +1279,6 @@ public void onComplete(ClientResponse response) {
if (this.needsCoordinator())
lookupCoordinator(this.coordinatorType(), this.coordinatorKey());
reenqueue();
} else if (response.versionMismatch() instanceof NoBatchedFindCoordinatorsException && response.requestHeader().apiKey() == ApiKeys.FIND_COORDINATOR) {
batchFindCoordinator = false;
reenqueue();
} else if (response.versionMismatch() != null) {
fatalError(response.versionMismatch());
} else if (response.hasResponse()) {
Expand Down Expand Up @@ -1533,30 +1525,19 @@ String coordinatorKey() {

@Override
public void handleResponse(AbstractResponse response) {
FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response;
CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType());

if (batchFindCoordinator && findCoordinatorResponse.data().coordinators().size() != 1) {
List<Coordinator> coordinators = ((FindCoordinatorResponse) response).coordinators();
if (coordinators.size() != 1) {
log.error("Group coordinator lookup failed: Invalid response containing more than a single coordinator");
fatalError(new IllegalStateException("Group coordinator lookup failed: Invalid response containing more than a single coordinator"));
}
String key = batchFindCoordinator
? findCoordinatorResponse.data().coordinators().get(0).key()
: builder.data().key();
Errors error = batchFindCoordinator
? Errors.forCode(findCoordinatorResponse.data().coordinators().get(0).errorCode())
: findCoordinatorResponse.error();
Coordinator coordinatorData = coordinators.get(0);
// For older versions without batching, obtain key from request data since it is not included in response
String key = coordinatorData.key() == null ? builder.data().key() : coordinatorData.key();
Errors error = Errors.forCode(coordinatorData.errorCode());
if (error == Errors.NONE) {
int nodeId = batchFindCoordinator
? findCoordinatorResponse.data().coordinators().get(0).nodeId()
: findCoordinatorResponse.data().nodeId();
String host = batchFindCoordinator
? findCoordinatorResponse.data().coordinators().get(0).host()
: findCoordinatorResponse.data().host();
int port = batchFindCoordinator
? findCoordinatorResponse.data().coordinators().get(0).port()
: findCoordinatorResponse.data().port();
Node node = new Node(nodeId, host, port);
Node node = new Node(coordinatorData.nodeId(), coordinatorData.host(), coordinatorData.port());
switch (coordinatorType) {
case GROUP:
consumerGroupCoordinator = node;
Expand All @@ -1574,12 +1555,9 @@ public void handleResponse(AbstractResponse response) {
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(GroupAuthorizationException.forGroupId(key));
} else {
String errorMessage = batchFindCoordinator
? findCoordinatorResponse.data().coordinators().get(0).errorMessage()
: findCoordinatorResponse.data().errorMessage();
fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to " +
"unexpected error: %s", coordinatorType, key,
errorMessage)));
coordinatorData.errorMessage())));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Collections;

public class FindCoordinatorRequest extends AbstractRequest {

public static final short MIN_BATCHED_VERSION = 4;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice refactor!


public static class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> {
private final FindCoordinatorRequestData data;

Expand All @@ -43,9 +46,18 @@ public FindCoordinatorRequest build(short version) {
throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
"because we require features supported only in 2 or later.");
}
if (version < 4 && !data.coordinatorKeys().isEmpty()) {
throw new NoBatchedFindCoordinatorsException("Cannot create a v" + version + " FindCoordinator request " +
"because we require features supported only in 4 or later.");
int batchedKeys = data.coordinatorKeys().size();
if (version < MIN_BATCHED_VERSION) {
if (batchedKeys > 1)
throw new NoBatchedFindCoordinatorsException("Cannot create a v" + version + " FindCoordinator request " +
"because we require features supported only in " + MIN_BATCHED_VERSION + " or later.");
if (batchedKeys == 1) {
data.setKey(data.coordinatorKeys().get(0));
data.setCoordinatorKeys(Collections.emptyList());
}
} else if (batchedKeys == 0 && data.key() != null) {
data.setCoordinatorKeys(Collections.singletonList(data.key()));
data.setKey(""); // default value
}
return new FindCoordinatorRequest(data, version);
}
Expand Down Expand Up @@ -90,7 +102,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
response.setThrottleTimeMs(throttleTimeMs);
}
Errors error = Errors.forException(e);
if (version() < 4) {
if (version() < MIN_BATCHED_VERSION) {
return FindCoordinatorResponse.prepareOldResponse(error, Node.noNode());
} else {
return FindCoordinatorResponse.prepareErrorResponse(error, data.coordinatorKeys());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
Expand Down Expand Up @@ -89,6 +90,21 @@ public boolean shouldClientThrottle(short version) {
return version >= 2;
}

public List<FindCoordinatorResponseData.Coordinator> coordinators() {
    // Batched (newer-version) responses carry the coordinator list directly.
    List<FindCoordinatorResponseData.Coordinator> batched = data.coordinators();
    if (!batched.isEmpty())
        return batched;
    // Older, non-batched responses expose a single coordinator through the
    // top-level fields; adapt it into a singleton list. The key is left null
    // to let callers detect that this came from the old response format.
    return Collections.singletonList(new Coordinator()
        .setErrorCode(data.errorCode())
        .setErrorMessage(data.errorMessage())
        .setKey(null)
        .setNodeId(data.nodeId())
        .setHost(data.host())
        .setPort(data.port()));
}

public static FindCoordinatorResponse prepareOldResponse(Errors error, Node node) {
FindCoordinatorResponseData data = new FindCoordinatorResponseData();
data.setErrorCode(error.code())
Expand Down
Loading