Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected abstract void onJoinComplete(int generation,
*/
public void ensureCoordinatorKnown() {
while (coordinatorUnknown()) {
RequestFuture<Void> future = sendGroupMetadataRequest();
RequestFuture<Void> future = sendGroupCoordinatorRequest();
client.poll(future);

if (future.failed()) {
Expand Down Expand Up @@ -216,7 +216,7 @@ public void ensureActiveGroup() {
continue;
}

RequestFuture<ByteBuffer> future = performGroupJoin();
RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
client.poll(future);

if (future.succeeded()) {
Expand Down Expand Up @@ -299,21 +299,20 @@ public void onFailure(RuntimeException e) {
* elected leader by the coordinator.
* @return A request future which wraps the assignment returned from the group leader
*/
private RequestFuture<ByteBuffer> performGroupJoin() {
private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();

// send a join group request to the coordinator
log.debug("(Re-)joining group {}", groupId);
log.info("(Re-)joining group {}", groupId);
JoinGroupRequest request = new JoinGroupRequest(
groupId,
this.sessionTimeoutMs,
this.memberId,
protocolType(),
metadata());

// create the request for the coordinator
log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id());
log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we change the toString of Node to be more logging friendly?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It currently looks like "Node(id, host, port)," which doesn't seem too bad to me. What do you think?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I suggested it, I thought we could do a bit better, maybe something like (id: 5, www.example.com:9123), but maybe that's actually worse.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mostly trying to get rid of the word Node because it's a bit redundant when you look at the log messages.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I didn't use Node's toString() at all. I just filled in "{}:{}" with the host and port, but that got a little annoying after a couple times. But maybe it's actually better?

return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
.compose(new JoinGroupResponseHandler());
}
Expand All @@ -328,10 +327,9 @@ public JoinGroupResponse parse(ClientResponse response) {

@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
// process the response
short errorCode = joinResponse.errorCode();
if (errorCode == Errors.NONE.code()) {
log.debug("Joined group: {}", joinResponse.toStruct());
Errors error = Errors.forCode(joinResponse.errorCode());
if (error == Errors.NONE) {
log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
AbstractCoordinator.this.memberId = joinResponse.memberId();
AbstractCoordinator.this.generation = joinResponse.generationId();
AbstractCoordinator.this.rejoinNeeded = false;
Expand All @@ -342,37 +340,33 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut
} else {
onJoinFollower().chain(future);
}
} else if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
log.debug("Attempt to join group {} rejected since coordinator is loading the group.", groupId);
} else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
coordinator);
// backoff and retry
future.raise(Errors.forCode(errorCode));
} else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
// reset the member id and retry immediately
AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
log.info("Attempt to join group {} failed due to unknown member id, resetting and retrying.",
groupId);
log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
future.raise(Errors.UNKNOWN_MEMBER_ID);
} else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
|| errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {
// re-discover the coordinator and retry with backoff
coordinatorDead();
log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
groupId);
future.raise(Errors.forCode(errorCode));
} else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code()
|| errorCode == Errors.INVALID_SESSION_TIMEOUT.code()
|| errorCode == Errors.INVALID_GROUP_ID.code()) {
log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message());
future.raise(error);
} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
|| error == Errors.INVALID_SESSION_TIMEOUT
|| error == Errors.INVALID_GROUP_ID) {
// log the error and re-throw the exception
Errors error = Errors.forCode(errorCode);
log.error("Attempt to join group {} failed due to: {}",
groupId, error.exception().getMessage());
log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
future.raise(error);
} else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
// unexpected error, throw the exception
future.raise(new KafkaException("Unexpected error in join group response: "
+ Errors.forCode(joinResponse.errorCode()).exception().getMessage()));
future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
}
}
}
Expand All @@ -381,7 +375,7 @@ private RequestFuture<ByteBuffer> onJoinFollower() {
// send follower's sync group with an empty assignment
SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
memberId, Collections.<String, ByteBuffer>emptyMap());
log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
return sendSyncGroupRequest(request);
}

Expand All @@ -392,7 +386,7 @@ private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
joinResponse.members());

SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
return sendSyncGroupRequest(request);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
Expand All @@ -418,28 +412,28 @@ public void handle(SyncGroupResponse syncResponse,
RequestFuture<ByteBuffer> future) {
Errors error = Errors.forCode(syncResponse.errorCode());
if (error == Errors.NONE) {
log.debug("Received successful sync group response for group {}: {}", groupId, syncResponse.toStruct());
log.info("Successfully joined group {} with generation {}", groupId, generation);
sensors.syncLatency.record(response.requestLatencyMs());
future.complete(syncResponse.memberAssignment());
} else {
AbstractCoordinator.this.rejoinNeeded = true;
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("SyncGroup for group {} failed due to coordinator rebalance, rejoining the group", groupId);
log.debug("SyncGroup for group {} failed due to coordinator rebalance", groupId);
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID
|| error == Errors.ILLEGAL_GENERATION) {
log.info("SyncGroup for group {} failed due to {}, rejoining the group", groupId, error);
log.debug("SyncGroup for group {} failed due to {}", groupId, error);
AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
future.raise(error);
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {
log.info("SyncGroup for group {} failed due to {}, will find new coordinator and rejoin", groupId, error);
log.debug("SyncGroup for group {} failed due to {}", groupId, error);
coordinatorDead();
future.raise(error);
} else {
future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.exception().getMessage()));
future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
}
}
}
Expand All @@ -450,7 +444,7 @@ public void handle(SyncGroupResponse syncResponse,
* one of the brokers. The returned future should be polled to get the result of the request.
* @return A request future which indicates the completion of the metadata request
*/
private RequestFuture<Void> sendGroupMetadataRequest() {
private RequestFuture<Void> sendGroupCoordinatorRequest() {
// initiate the group metadata request
// find a node to ask about the coordinator
Node node = this.client.leastLoadedNode();
Expand All @@ -460,7 +454,7 @@ private RequestFuture<Void> sendGroupMetadataRequest() {
return RequestFuture.noBrokersAvailable();
} else {
// create a group metadata request
log.debug("Issuing group metadata request to broker {}", node.id());
log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
.compose(new RequestFutureAdapter<ClientResponse, Void>() {
Expand All @@ -473,7 +467,7 @@ public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
}

private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
log.debug("Group metadata response {}", resp);
log.debug("Received group coordinator response {}", resp);

if (!coordinatorUnknown()) {
// We already found the coordinator, so ignore the request
Expand All @@ -483,22 +477,24 @@ private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void
// use MAX_VALUE - node.id as the coordinator id to mimic separate connections
// for the coordinator in the underlying network client layer
// TODO: this needs to be better handled in KAFKA-1935
short errorCode = groupCoordinatorResponse.errorCode();
if (errorCode == Errors.NONE.code()) {
Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
if (error == Errors.NONE) {
this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
groupCoordinatorResponse.node().host(),
groupCoordinatorResponse.node().port());

log.info("Discovered coordinator {} for group {}.", coordinator, groupId);

client.tryConnect(coordinator);

// start sending heartbeats only if we have a valid generation
if (generation > 0)
heartbeatTask.reset();
future.complete(null);
} else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
future.raise(Errors.forCode(errorCode));
future.raise(error);
}
}
}
Expand All @@ -524,7 +520,7 @@ public boolean coordinatorUnknown() {
*/
protected void coordinatorDead() {
if (this.coordinator != null) {
log.info("Marking the coordinator {} dead.", this.coordinator.id());
log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
this.coordinator = null;
}
}
Expand Down Expand Up @@ -566,7 +562,7 @@ public void onSuccess(Void value) {}

@Override
public void onFailure(RuntimeException e) {
log.info("LeaveGroup request failed with error", e);
log.debug("LeaveGroup request for group {} failed with error", groupId, e);
}
});

Expand Down Expand Up @@ -608,33 +604,33 @@ public HeartbeatResponse parse(ClientResponse response) {
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
sensors.heartbeatLatency.record(response.requestLatencyMs());
short errorCode = heartbeatResponse.errorCode();
if (errorCode == Errors.NONE.code()) {
log.debug("Received successful heartbeat response.");
Errors error = Errors.forCode(heartbeatResponse.errorCode());
if (error == Errors.NONE) {
log.debug("Received successful heartbeat response for group {}", groupId);
future.complete(null);
} else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
|| errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {
log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.",
groupId, coordinator);
coordinatorDead();
future.raise(Errors.forCode(errorCode));
} else if (errorCode == Errors.REBALANCE_IN_PROGRESS.code()) {
log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group.");
future.raise(error);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId);
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.REBALANCE_IN_PROGRESS);
} else if (errorCode == Errors.ILLEGAL_GENERATION.code()) {
log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
} else if (error == Errors.ILLEGAL_GENERATION) {
log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId);
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.ILLEGAL_GENERATION);
} else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
log.info("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group.");
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId);
memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.UNKNOWN_MEMBER_ID);
} else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
future.raise(new KafkaException("Unexpected errorCode in heartbeat response: "
+ Errors.forCode(errorCode).exception().getMessage()));
future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
}
}
}
Expand Down
Loading