From 4bfa378d13ebc44cbc2d042b14ec989cc5c9c994 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 8 Mar 2016 17:42:11 -0800 Subject: [PATCH 1/5] KAFKA-3318: clean up consumer logging and error messages --- .../internals/AbstractCoordinator.java | 48 +++++++++---------- .../internals/ConsumerCoordinator.java | 29 ++++++----- .../clients/consumer/internals/Fetcher.java | 6 +-- .../apache/kafka/common/protocol/Errors.java | 10 ++++ 4 files changed, 53 insertions(+), 40 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index c6492bc66d2a6..1b49f43821134 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -216,7 +216,7 @@ public void ensureActiveGroup() { continue; } - RequestFuture future = performGroupJoin(); + RequestFuture future = sendJoinGroupRequest(); client.poll(future); if (future.succeeded()) { @@ -299,12 +299,12 @@ public void onFailure(RuntimeException e) { * elected leader by the coordinator. * @return A request future which wraps the assignment returned from the group leader */ - private RequestFuture performGroupJoin() { + private RequestFuture sendJoinGroupRequest() { if (coordinatorUnknown()) return RequestFuture.coordinatorNotAvailable(); // send a join group request to the coordinator - log.debug("(Re-)joining group {}", groupId); + log.info("(Re-)joining group {}", groupId); JoinGroupRequest request = new JoinGroupRequest( groupId, this.sessionTimeoutMs, @@ -313,7 +313,7 @@ private RequestFuture performGroupJoin() { metadata()); // create the request for the coordinator - log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id()); + log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator.id()); return client.send(coordinator, ApiKeys.JOIN_GROUP, request) .compose(new JoinGroupResponseHandler()); } @@ -349,30 +349,27 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture fut } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) { // reset the member id and retry immediately AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; - log.info("Attempt to join group {} failed due to unknown member id, resetting and retrying.", - groupId); + log.debug("Attempt to join group {} failed due to unknown member id, resetting and retrying.", groupId); future.raise(Errors.UNKNOWN_MEMBER_ID); } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code() || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) { // re-discover the coordinator and retry with backoff coordinatorDead(); - log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.", - groupId); + log.debug("Attempt to join group {} failed due to obsolete coordinator information, retrying.", groupId); future.raise(Errors.forCode(errorCode)); } else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code() || errorCode == Errors.INVALID_SESSION_TIMEOUT.code() || errorCode == Errors.INVALID_GROUP_ID.code()) { // log the error and re-throw the exception Errors error = Errors.forCode(errorCode); - log.error("Attempt to join group {} failed due to: {}", - groupId, error.exception().getMessage()); + log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message()); future.raise(error); } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { future.raise(new GroupAuthorizationException(groupId)); } else { // unexpected error, throw the exception future.raise(new KafkaException("Unexpected error in join group response: " - + Errors.forCode(joinResponse.errorCode()).exception().getMessage())); + + Errors.forCode(joinResponse.errorCode()).message())); } } } @@ -381,7 +378,7 @@ private RequestFuture onJoinFollower() { // send follower's sync group with an empty assignment SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, Collections.emptyMap()); - log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id()); + log.debug("Sending follower SyncGroup ({}) to coordinator {}", request, this.coordinator.id()); return sendSyncGroupRequest(request); } @@ -392,7 +389,7 @@ private RequestFuture onJoinLeader(JoinGroupResponse joinResponse) { joinResponse.members()); SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment); - log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id()); + log.debug("Sending leader SyncGroup ({}) to coordinator {}", request, this.coordinator.id()); return sendSyncGroupRequest(request); } catch (RuntimeException e) { return RequestFuture.failure(e); @@ -418,6 +415,7 @@ public void handle(SyncGroupResponse syncResponse, RequestFuture future) { Errors error = Errors.forCode(syncResponse.errorCode()); if (error == Errors.NONE) { + log.info("Successfully joined group {} with generation {}", groupId, generation); log.debug("Received successful sync group response for group {}: {}", groupId, syncResponse.toStruct()); sensors.syncLatency.record(response.requestLatencyMs()); future.complete(syncResponse.memberAssignment()); @@ -426,20 +424,20 @@ public void handle(SyncGroupResponse syncResponse, if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else if (error == Errors.REBALANCE_IN_PROGRESS) { - log.info("SyncGroup for group {} failed due to coordinator rebalance, rejoining the group", groupId); + log.debug("SyncGroup for group {} failed due to coordinator rebalance, rejoining the group", groupId); future.raise(error); } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) { - log.info("SyncGroup for group {} failed due to {}, rejoining the group", groupId, error); + log.debug("SyncGroup for group {} failed due to {}, rejoining the group", groupId, error); AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; future.raise(error); } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP) { - log.info("SyncGroup for group {} failed due to {}, will find new coordinator and rejoin", groupId, error); + log.debug("SyncGroup for group {} failed due to {}, will find new coordinator and rejoin", groupId, error); coordinatorDead(); future.raise(error); } else { - future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.exception().getMessage())); + future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message())); } } } @@ -473,7 +471,7 @@ public void onSuccess(ClientResponse response, RequestFuture future) { } private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture future) { - log.debug("Group metadata response {}", resp); + log.debug("Received group metadata response {}", resp); if (!coordinatorUnknown()) { // We already found the coordinator, so ignore the request @@ -524,7 +522,7 @@ public boolean coordinatorUnknown() { */ protected void coordinatorDead() { if (this.coordinator != null) { - log.info("Marking the coordinator {} dead.", this.coordinator.id()); + log.info("Marking the coordinator {} dead. Find the new coordinator and retry.", this.coordinator.id()); this.coordinator = null; } } @@ -566,7 +564,7 @@ public void onSuccess(Void value) {} @Override public void onFailure(RuntimeException e) { - log.info("LeaveGroup request failed with error", e); + log.debug("LeaveGroup request failed with error", e); } }); @@ -614,19 +612,19 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture futu future.complete(null); } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code() || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) { - log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead."); + log.debug("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead."); coordinatorDead(); future.raise(Errors.forCode(errorCode)); } else if (errorCode == Errors.REBALANCE_IN_PROGRESS.code()) { - log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group."); + log.debug("Attempt to heart beat failed since the group is rebalancing, try to re-join group."); AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.REBALANCE_IN_PROGRESS); } else if (errorCode == Errors.ILLEGAL_GENERATION.code()) { - log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group."); + log.debug("Attempt to heart beat failed since generation id is not legal, try to re-join group."); AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.ILLEGAL_GENERATION); } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) { - log.info("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group."); + log.debug("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group."); memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.UNKNOWN_MEMBER_ID); @@ -634,7 +632,7 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture futu future.raise(new GroupAuthorizationException(groupId)); } else { future.raise(new KafkaException("Unexpected errorCode in heartbeat response: " - + Errors.forCode(errorCode).exception().getMessage())); + + Errors.forCode(errorCode).message())); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index aa39e11929116..5425321313872 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -447,7 +447,7 @@ private void maybeAutoCommitOffsetsSync() { throw e; } catch (Exception e) { // consistent with async auto-commit failures, we do not propagate the exception - log.warn("Auto offset commit failed: ", e.getMessage()); + log.warn("Auto offset commit failed: {}", e.getMessage()); } } } @@ -525,7 +525,7 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu // update the local cache only if the partition is still assigned subscriptions.committed(tp, offsetAndMetadata); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - log.error("Unauthorized to commit for group {}", groupId); + log.error("Unauthorized to commit offsets for group {}", groupId); future.raise(new GroupAuthorizationException(groupId)); return; } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { @@ -533,18 +533,19 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu } else if (error == Errors.OFFSET_METADATA_TOO_LARGE || error == Errors.INVALID_COMMIT_OFFSET_SIZE) { // raise the error to the user - log.info("Offset commit for group {} failed on partition {} due to {}, will retry", groupId, tp, error); + log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message()); future.raise(error); return; } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) { // just retry - log.info("Offset commit for group {} failed due to {}, will retry", groupId, error); + log.debug("Offset commit for group {} failed due to {}, will retry", groupId, error); future.raise(error); return; } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP || error == Errors.REQUEST_TIMED_OUT) { - log.info("Offset commit for group {} failed due to {}, will find new coordinator and retry", groupId, error); + log.debug("Offset commit for group {} failed due to {}, which indicates that the coordinator" + + "is not available, will find new coordinator and retry", groupId, error); coordinatorDead(); future.raise(error); return; @@ -552,13 +553,18 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu || error == Errors.ILLEGAL_GENERATION || error == Errors.REBALANCE_IN_PROGRESS) { // need to re-join group - log.error("Error {} occurred while committing offsets for group {}", error, groupId); + log.info("Offset commit for group {} failed due to {}, will find new coordinator and retry", groupId, error); subscriptions.needReassignment(); - future.raise(new CommitFailedException("Commit cannot be completed due to group rebalance")); + future.raise(new CommitFailedException("Commit cannot be completed since the group has already " + + "rebalanced and assigned the partitions to another member. This most likely means that " + + "the consumer has taken longer than the configured session.timeout.ms to handle a batch " + + "of messages returned from poll(). You can address this either by increasing the session " + + "timeout or by reducing the maximum size of batches returned in poll() by with " + + "max.poll.records.")); return; } else { - log.error("Error committing partition {} at offset {}: {}", tp, offset, error.exception().getMessage()); - future.raise(new KafkaException("Unexpected error in commit: " + error.exception().getMessage())); + log.error("Error committing partition {} at offset {}: {}", tp, offset, error.message()); + future.raise(new KafkaException("Unexpected error in commit: " + error.message())); return; } } @@ -607,8 +613,7 @@ public void handle(OffsetFetchResponse response, RequestFuture= 0) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 7a1a72090a3a3..b4d5c02a08ac3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -501,12 +501,12 @@ private void handleListOffsetResponse(TopicPartition topicPartition, future.complete(offset); } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { - log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", + log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", topicPartition); future.raise(Errors.forCode(errorCode)); } else { - log.error("Attempt to fetch offsets for partition {} failed due to: {}", - topicPartition, Errors.forCode(errorCode).exception().getMessage()); + log.warn("Attempt to fetch offsets for partition {} failed due to: {}", + topicPartition, Errors.forCode(errorCode).message()); future.raise(new StaleMetadataException()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index ab299af47486c..90be0144109af 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -182,6 +182,16 @@ public void maybeThrow() { } } + /** + * Get a friendly description of the error (if one is available). + * @return the error message + */ + public String message() { + if (exception != null) + return exception.getMessage(); + return toString(); + } + /** * Throw the exception if there is one */ From 56777217ae797d57f5d3273f8566290e2182d4c8 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 9 Mar 2016 10:09:59 -0800 Subject: [PATCH 2/5] fix typo in CommitFailedException message --- .../kafka/clients/consumer/internals/ConsumerCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 5425321313872..733be0ea5911e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -559,7 +559,7 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu "rebalanced and assigned the partitions to another member. This most likely means that " + "the consumer has taken longer than the configured session.timeout.ms to handle a batch " + "of messages returned from poll(). You can address this either by increasing the session " + - "timeout or by reducing the maximum size of batches returned in poll() by with " + + "timeout or by reducing the maximum size of batches returned in poll() with " + "max.poll.records.")); return; } else { From e7617ce8f9b0fb04aa49210470147e93ee88b1e2 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 9 Mar 2016 14:21:24 -0800 Subject: [PATCH 3/5] more logging additions including ensuring groupId and coordinator information available --- .../internals/AbstractCoordinator.java | 105 +++++++++--------- .../internals/ConsumerCoordinator.java | 80 ++++++------- 2 files changed, 92 insertions(+), 93 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 1b49f43821134..589fea31692bb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -174,7 +174,7 @@ protected abstract void onJoinComplete(int generation, */ public void ensureCoordinatorKnown() { while (coordinatorUnknown()) { - RequestFuture future = sendGroupMetadataRequest(); + RequestFuture future = sendGroupCoordinatorRequest(); client.poll(future); if (future.failed()) { @@ -312,8 +312,7 @@ private RequestFuture sendJoinGroupRequest() { protocolType(), metadata()); - // create the request for the coordinator - log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator.id()); + log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator); return client.send(coordinator, ApiKeys.JOIN_GROUP, request) .compose(new JoinGroupResponseHandler()); } @@ -328,10 +327,9 @@ public JoinGroupResponse parse(ClientResponse response) { @Override public void handle(JoinGroupResponse joinResponse, RequestFuture future) { - // process the response - short errorCode = joinResponse.errorCode(); - if (errorCode == Errors.NONE.code()) { - log.debug("Joined group: {}", joinResponse.toStruct()); + Errors error = Errors.forCode(joinResponse.errorCode()); + if (error == Errors.NONE) { + log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct()); AbstractCoordinator.this.memberId = joinResponse.memberId(); AbstractCoordinator.this.generation = joinResponse.generationId(); AbstractCoordinator.this.rejoinNeeded = false; @@ -342,34 +340,33 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture fut } else { onJoinFollower().chain(future); } - } else if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) { - log.debug("Attempt to join group {} rejected since coordinator is loading the group.", groupId); + } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) { + log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId, + coordinator); // backoff and retry - future.raise(Errors.forCode(errorCode)); - } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) { + future.raise(error); + } else if (error == Errors.UNKNOWN_MEMBER_ID) { // reset the member id and retry immediately AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; - log.debug("Attempt to join group {} failed due to unknown member id, resetting and retrying.", groupId); + log.debug("Attempt to join group {} failed due to unknown member id.", groupId); future.raise(Errors.UNKNOWN_MEMBER_ID); - } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code() - || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) { + } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE + || error == Errors.NOT_COORDINATOR_FOR_GROUP) { // re-discover the coordinator and retry with backoff coordinatorDead(); - log.debug("Attempt to join group {} failed due to obsolete coordinator information, retrying.", groupId); - future.raise(Errors.forCode(errorCode)); - } else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code() - || errorCode == Errors.INVALID_SESSION_TIMEOUT.code() - || errorCode == Errors.INVALID_GROUP_ID.code()) { + log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message()); + future.raise(error); + } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL + || error == Errors.INVALID_SESSION_TIMEOUT + || error == Errors.INVALID_GROUP_ID) { // log the error and re-throw the exception - Errors error = Errors.forCode(errorCode); log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message()); future.raise(error); - } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { + } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { // unexpected error, throw the exception - future.raise(new KafkaException("Unexpected error in join group response: " - + Errors.forCode(joinResponse.errorCode()).message())); + future.raise(new KafkaException("Unexpected error in join group response: " + error.message())); } } } @@ -378,7 +375,7 @@ private RequestFuture onJoinFollower() { // send follower's sync group with an empty assignment SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, Collections.emptyMap()); - log.debug("Sending follower SyncGroup ({}) to coordinator {}", request, this.coordinator.id()); + log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request); return sendSyncGroupRequest(request); } @@ -389,7 +386,7 @@ private RequestFuture onJoinLeader(JoinGroupResponse joinResponse) { joinResponse.members()); SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment); - log.debug("Sending leader SyncGroup ({}) to coordinator {}", request, this.coordinator.id()); + log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request); return sendSyncGroupRequest(request); } catch (RuntimeException e) { return RequestFuture.failure(e); @@ -424,16 +421,16 @@ public void handle(SyncGroupResponse syncResponse, if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else if (error == Errors.REBALANCE_IN_PROGRESS) { - log.debug("SyncGroup for group {} failed due to coordinator rebalance, rejoining the group", groupId); + log.debug("SyncGroup for group {} failed due to coordinator rebalance", groupId); future.raise(error); } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) { - log.debug("SyncGroup for group {} failed due to {}, rejoining the group", groupId, error); + log.debug("SyncGroup for group {} failed due to {}", groupId, error); AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; future.raise(error); } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP) { - log.debug("SyncGroup for group {} failed due to {}, will find new coordinator and rejoin", groupId, error); + log.debug("SyncGroup for group {} failed due to {}", groupId, error); coordinatorDead(); future.raise(error); } else { @@ -448,7 +445,7 @@ public void handle(SyncGroupResponse syncResponse, * one of the brokers. The returned future should be polled to get the result of the request. * @return A request future which indicates the completion of the metadata request */ - private RequestFuture sendGroupMetadataRequest() { + private RequestFuture sendGroupCoordinatorRequest() { // initiate the group metadata request // find a node to ask about the coordinator Node node = this.client.leastLoadedNode(); @@ -458,7 +455,7 @@ private RequestFuture sendGroupMetadataRequest() { return RequestFuture.noBrokersAvailable(); } else { // create a group metadata request - log.debug("Issuing group metadata request to broker {}", node.id()); + log.debug("Sending coordinator request for group {} to broker {}", groupId, node); GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId); return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest) .compose(new RequestFutureAdapter() { @@ -471,7 +468,7 @@ public void onSuccess(ClientResponse response, RequestFuture future) { } private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture future) { - log.debug("Received group metadata response {}", resp); + log.debug("Received group coordinator response {}", resp); if (!coordinatorUnknown()) { // We already found the coordinator, so ignore the request @@ -481,22 +478,24 @@ private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture 0) heartbeatTask.reset(); future.complete(null); - } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { + } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { - future.raise(Errors.forCode(errorCode)); + future.raise(error); } } } @@ -522,7 +521,7 @@ public boolean coordinatorUnknown() { */ protected void coordinatorDead() { if (this.coordinator != null) { - log.info("Marking the coordinator {} dead. Find the new coordinator and retry.", this.coordinator.id()); + log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId); this.coordinator = null; } } @@ -564,7 +563,7 @@ public void onSuccess(Void value) {} @Override public void onFailure(RuntimeException e) { - log.debug("LeaveGroup request failed with error", e); + log.debug("LeaveGroup request for group {} failed with error", groupId, e); } }); @@ -606,33 +605,33 @@ public HeartbeatResponse parse(ClientResponse response) { @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture future) { sensors.heartbeatLatency.record(response.requestLatencyMs()); - short errorCode = heartbeatResponse.errorCode(); - if (errorCode == Errors.NONE.code()) { - log.debug("Received successful heartbeat response."); + Errors error = Errors.forCode(heartbeatResponse.errorCode()); + if (error == Errors.NONE) { + log.debug("Received successful heartbeat response for group {}", groupId); future.complete(null); - } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code() - || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) { - log.debug("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead."); + } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE + || error == Errors.NOT_COORDINATOR_FOR_GROUP) { + log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.", + groupId, coordinator); coordinatorDead(); - future.raise(Errors.forCode(errorCode)); - } else if (errorCode == Errors.REBALANCE_IN_PROGRESS.code()) { - log.debug("Attempt to heart beat failed since the group is rebalancing, try to re-join group."); + future.raise(error); + } else if (error == Errors.REBALANCE_IN_PROGRESS) { + log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId); AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.REBALANCE_IN_PROGRESS); - } else if (errorCode == Errors.ILLEGAL_GENERATION.code()) { - log.debug("Attempt to heart beat failed since generation id is not legal, try to re-join group."); + } else if (error == Errors.ILLEGAL_GENERATION) { + log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId); AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.ILLEGAL_GENERATION); - } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) { - log.debug("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group."); + } else if (error == Errors.UNKNOWN_MEMBER_ID) { + log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId); memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.UNKNOWN_MEMBER_ID); - } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { + } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { - future.raise(new KafkaException("Unexpected errorCode in heartbeat response: " - + Errors.forCode(errorCode).message())); + future.raise(new KafkaException("Unexpected errorCode in heartbeat response: " + error.message())); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 733be0ea5911e..f069eb544c709 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -188,15 +188,15 @@ protected void onJoinComplete(int generation, // execute the user's callback after rebalance ConsumerRebalanceListener listener = subscriptions.listener(); - log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions()); + log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId); try { Set assigned = new HashSet<>(subscriptions.assignedPartitions()); listener.onPartitionsAssigned(assigned); } catch (WakeupException e) { throw e; } catch (Exception e) { - log.error("User provided listener " + listener.getClass().getName() - + " failed on partition assignment: ", e); + log.error("User provided listener {} for group {} failed on partition assignment", + listener.getClass().getName(), groupId, e); } } @@ -222,11 +222,12 @@ protected Map performAssignment(String leaderId, metadata.setTopics(this.subscriptions.groupSubscription()); client.ensureFreshMetadata(); - log.debug("Performing {} assignment for subscriptions {}", assignor.name(), subscriptions); + log.debug("Performing assignment for group {} using strategy {} with subscriptions {}", + groupId, assignor.name(), subscriptions); Map assignment = assignor.assign(metadata.fetch(), subscriptions); - log.debug("Finished assignment: {}", assignment); + log.debug("Finished assignment for group {}: {}", groupId, assignment); Map groupAssignment = new HashMap<>(); for (Map.Entry assignmentEntry : assignment.entrySet()) { @@ -244,15 +245,15 @@ protected void onJoinPrepare(int generation, String memberId) { // execute the user's callback before rebalance ConsumerRebalanceListener listener = subscriptions.listener(); - log.debug("Revoking previously assigned partitions {}", subscriptions.assignedPartitions()); + log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId); try { Set revoked = new HashSet<>(subscriptions.assignedPartitions()); listener.onPartitionsRevoked(revoked); } catch (WakeupException e) { throw e; } catch (Exception e) { - log.error("User provided listener " + listener.getClass().getName() - + " failed on partition revocation: ", e); + log.error("User provided listener {} for group {} failed on partition revocation", + listener.getClass().getName(), groupId, e); } subscriptions.needReassignment(); @@ -410,7 +411,7 @@ public void run(final long now) { return; if (coordinatorUnknown()) { - log.debug("Cannot auto-commit offsets now since the coordinator is unknown, will retry after backoff"); + log.debug("Cannot auto-commit offsets for group {} since the coordinator is unknown", groupId); client.schedule(this, now + retryBackoffMs); return; } @@ -423,10 +424,10 @@ public void onComplete(Map offsets, Exception if (exception == null) { reschedule(now + interval); } else if (exception instanceof SendFailedException) { - log.debug("Failed to send automatic offset commit, will retry immediately"); + log.debug("Failed to send automatic offset commit for group {}", groupId); reschedule(now); } else { - log.warn("Auto offset commit failed: {}", exception.getMessage()); + log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage()); reschedule(now + interval); } } @@ -447,7 +448,7 @@ private void maybeAutoCommitOffsetsSync() { throw e; } catch (Exception e) { // consistent with async auto-commit failures, we do not propagate the exception - log.warn("Auto offset commit failed: {}", e.getMessage()); + log.warn("Auto offset commit failed for group {}: {}", groupId, e.getMessage()); } } } @@ -481,7 +482,7 @@ private RequestFuture sendOffsetCommitRequest(final Map futu Errors error = Errors.forCode(entry.getValue()); if (error == Errors.NONE) { - log.debug("Committed offset {} for partition {}", offset, tp); + log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp); if (subscriptions.isAssigned(tp)) // update the local cache only if the partition is still assigned subscriptions.committed(tp, offsetAndMetadata); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - log.error("Unauthorized to commit offsets for group {}", groupId); + log.error("Not authorized to commit offsets for group {}", groupId); future.raise(new GroupAuthorizationException(groupId)); return; } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { @@ -538,14 +539,13 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu return; } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) { // just retry - log.debug("Offset commit for group {} failed due to {}, will retry", groupId, error); + log.debug("Offset commit for group {} failed: {}", groupId, error.message()); future.raise(error); return; } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP || error == Errors.REQUEST_TIMED_OUT) { - log.debug("Offset commit for group {} failed due to {}, which indicates that the coordinator" + - "is not available, will find new coordinator and retry", groupId, error); + log.debug("Offset commit for group {} failed: {}", groupId, error.message()); coordinatorDead(); future.raise(error); return; @@ -553,24 +553,24 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu || error == Errors.ILLEGAL_GENERATION || error == Errors.REBALANCE_IN_PROGRESS) { // need to re-join group - log.info("Offset commit for group {} failed due to {}, will find new coordinator and retry", groupId, error); + log.debug("Offset commit for group {} failed: {}", groupId, error.message()); subscriptions.needReassignment(); future.raise(new CommitFailedException("Commit cannot be completed since the group has already " + - "rebalanced and assigned the partitions to another member. This most likely means that " + - "the consumer has taken longer than the configured session.timeout.ms to handle a batch " + - "of messages returned from poll(). You can address this either by increasing the session " + - "timeout or by reducing the maximum size of batches returned in poll() with " + - "max.poll.records.")); + "rebalanced and assigned the partitions to another member. This means that the time " + + "between subsequent calls to poll() was longer than the configured session.timeout.ms, " + + "which typically implies that the poll loop is spending too much time message processing. " + + "You can address this either by increasing the session timeout or by reducing the maximum " + + "size of batches returned in poll() with max.poll.records.")); return; } else { - log.error("Error committing partition {} at offset {}: {}", tp, offset, error.message()); + log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message()); future.raise(new KafkaException("Unexpected error in commit: " + error.message())); return; } } if (!unauthorizedTopics.isEmpty()) { - log.error("Unauthorized to commit to topics {}", unauthorizedTopics); + log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId); future.raise(new TopicAuthorizationException(unauthorizedTopics)); } else { future.complete(null); @@ -589,9 +589,9 @@ private RequestFuture> sendOffsetFetchReq if (coordinatorUnknown()) return RequestFuture.coordinatorNotAvailable(); - log.debug("Fetching committed offsets for partitions: {}", partitions); + log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions); // construct the request - OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList(partitions)); + OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<>(partitions)); // send the request with a callback return client.send(coordinator, ApiKeys.OFFSET_FETCH, request) @@ -612,30 +612,30 @@ public void handle(OffsetFetchResponse response, RequestFuture= 0) { // record the position with the offset (-1 indicates no committed offset to fetch) offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata)); } else { - log.debug("No committed offset for partition " + tp); + log.debug("Group {} has no committed offset for partition {}", groupId, tp); } } From df48d5102ef0d1aa49c0131115296af23ffd570d Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 10 Mar 2016 11:11:52 -0800 Subject: [PATCH 4/5] change Node toString() to be more logging friendly --- clients/src/main/java/org/apache/kafka/common/Node.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java index 644cd71c8cb4f..24cf6f4616f7a 100644 --- a/clients/src/main/java/org/apache/kafka/common/Node.java +++ b/clients/src/main/java/org/apache/kafka/common/Node.java @@ -96,7 +96,7 @@ public boolean equals(Object obj) { @Override public String toString() { - return "Node(" + id + ", " + host + ", " + port + ")"; + return host + ":" + port + " (id: " + idString + ")"; } } From 6c0fe62838687cd0ae57ed0519dd61885dff87f3 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 10 Mar 2016 11:12:10 -0800 Subject: [PATCH 5/5] minor log message tweaks --- .../kafka/clients/consumer/internals/AbstractCoordinator.java | 3 +-- .../kafka/clients/consumer/internals/ConsumerCoordinator.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 589fea31692bb..c79d8e7d30393 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -413,7 +413,6 @@ public void handle(SyncGroupResponse syncResponse, Errors error = Errors.forCode(syncResponse.errorCode()); if (error == Errors.NONE) { log.info("Successfully joined group {} with generation {}", groupId, generation); - log.debug("Received successful sync group response for group {}: {}", groupId, syncResponse.toStruct()); sensors.syncLatency.record(response.requestLatencyMs()); future.complete(syncResponse.memberAssignment()); } else { @@ -631,7 +630,7 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture futu } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { - future.raise(new KafkaException("Unexpected errorCode in heartbeat response: " + error.message())); + future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message())); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index f069eb544c709..b6b46c135a579 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -613,7 +613,7 @@ public void handle(OffsetFetchResponse response, RequestFuture