KAFKA-8225 & KIP-345 part-2: fencing static member instances with conflicting group.instance.id #6650

Merged
guozhangwang merged 21 commits into apache:trunk from abbccdda:fencing_instance
May 18, 2019

Conversation

@abbccdda

@abbccdda abbccdda commented Apr 29, 2019

For static members join/rejoin, we encode the current timestamp in the new member.id. The format looks like group.instance.id-timestamp.

During the consumer/broker interaction logic (Join, Sync, Heartbeat, Commit), we check whether the group.instance.id is known to the group. If yes, we match the member.id stored in the static membership map against the request's member.id. A mismatch indicates that a conflicting consumer has used the same group.instance.id, and it will receive a fatal exception to shut down.

Right now the only missing part is the system test. Will work on it offline while getting the major logic changes reviewed.
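The encoding and fencing check described above can be sketched roughly as follows; the class and method names here are hypothetical stand-ins, not the actual GroupCoordinator code:

```java
import java.util.HashMap;
import java.util.Map;

// Rough sketch of the static-member fencing check described above.
// StaticFencingSketch, memberIdFor and isFenced are hypothetical names,
// not the actual Kafka implementation.
class StaticFencingSketch {
    // group.instance.id -> member.id currently registered for that instance
    private final Map<String, String> staticMembers = new HashMap<>();

    // Encode the join timestamp into the member.id: "<group.instance.id>-<timestamp>"
    String memberIdFor(String groupInstanceId, long timestampMs) {
        return groupInstanceId + "-" + timestampMs;
    }

    void register(String groupInstanceId, String memberId) {
        staticMembers.put(groupInstanceId, memberId);
    }

    // On Join/Sync/Heartbeat/Commit: if the group.instance.id is known but
    // the request's member.id does not match the stored one, a conflicting
    // consumer reused the same group.instance.id and must be fenced.
    boolean isFenced(String groupInstanceId, String requestMemberId) {
        String registered = staticMembers.get(groupInstanceId);
        return registered != null && !registered.equals(requestMemberId);
    }
}
```

A fenced consumer would then receive a fatal FENCED_INSTANCE_ID error rather than being allowed to retry.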

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@abbccdda abbccdda changed the title KAFKA-8225: fencing static member instances with conflicting group.instance.id KAFKA-8225: fencing static member instances with conflicting group.instance.id [WIP] Apr 29, 2019
@abbccdda abbccdda force-pushed the fencing_instance branch 2 times, most recently from fd37311 to 322225d on April 30, 2019 01:51
@abbccdda
Author

Retest this please

@abbccdda abbccdda force-pushed the fencing_instance branch 5 times, most recently from caf3ff5 to a8bf61e on May 1, 2019 19:29
@abbccdda
Author

abbccdda commented May 1, 2019

@hachikuji @guozhangwang Could you do a review when you get time? Thanks!

@abbccdda
Author

abbccdda commented May 1, 2019

Retest this please.

@abbccdda abbccdda changed the title KAFKA-8225: fencing static member instances with conflicting group.instance.id [WIP] KAFKA-8225: fencing static member instances with conflicting group.instance.id May 2, 2019
Contributor

@guozhangwang guozhangwang left a comment


Made a pass over the non-testing code.

Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Contributor


Should this happen under normal case? If not we should log it as ERROR and return false.

Author


We are being extra cautious here since we don't want to unexpectedly fence any member without this structure. Returning true means we don't check this case in static member.id validation, but it is not guaranteed to be valid.

Contributor


If we think that client code should never set member.ids themselves, i.e. they should always be set by brokers and broker code always sets them in the *-* format, then this should not happen and I think it is okay to treat it as a fatal error and reject the client request.

Comment thread core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
@abbccdda abbccdda force-pushed the fencing_instance branch 4 times, most recently from cbe5e4e to 15bdaa7 on May 9, 2019 17:37
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java Outdated
Contributor


I'm actually thinking we should just reject the request all together if its member-id is ill-formatted (of course, not in this function then, but rather do that at the very beginning of every request handling). But if you feel it is good enough with sound rationales, I'm fine to leave it as is.

Author


@guozhangwang yeah, the reasoning is that we would need to do a lot of unit test refactoring in this case. I could do that in a separate PR.

Contributor


Sounds good. I agree it may be out of the scope for this PR.

@guozhangwang
Contributor

The non-testing code looks promising to me now except a few minor questions above.

Will move on to the testing code itself. Maybe @hachikuji can take another look.

@abbccdda
Author

abbccdda commented May 9, 2019

@guozhangwang Thank you for the review!

@abbccdda abbccdda changed the title KAFKA-8225: fencing static member instances with conflicting group.instance.id KAFKA-8225 & KIP-345 part-2: fencing static member instances with conflicting group.instance.id May 10, 2019
Contributor

@guozhangwang guozhangwang left a comment


Made a pass on non-testing code as well. It lgtm modulo a few minor comments.

cc @hachikuji

Contributor

@hachikuji hachikuji left a comment


@abbccdda Thanks, left a few comments.

Contributor


Hmm.. Why are we throwing this instead of invoking the callback?

Author


The goal here is to fail immediately: since we have detected a fenced exception in the commit response queue, there is no point in retrying.

Contributor


SGTM. It is symmetric to our ProducerFencedException: fencing triggered by a past event is thrown to callers directly; only when this call itself triggers the first fenced error is it marked in the callback.

Contributor


Hmm one sec, a not-very-common-but-possible-pattern may be:

  1. The user first calls commitAsync; the response sent back has the Fenced error, which is kept.
  2. The user calls commitAsync a second time; we poll the completion object and throw immediately.
  3. However, suppose the user swallows the exception and calls commitAsync again; in this case we would proceed, since the previous completion has been polled and there is no fence error any more, right?

I think the contract should be, once the consumer falls into the fenced state, it should always be in that state and hence always throw immediately for any following function calls. In this sense, we should probably change this logic a bit.

Contributor


I think the contract should be, once the consumer falls into the fenced state, it should always be in that state and hence always throw immediately for any following function calls. In this sense, we should probably change this logic a bit.

Yes, that is what I was trying to get to above. Adding a FENCED state to MemberState would be a nice way to achieve this.
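The permanently-fenced contract discussed here could be sketched like this; the enum values and method names are illustrative, not the actual AbstractCoordinator code:

```java
// Sketch of the FENCED member state suggested above. MemberStateSketch,
// onFencedError and ensureNotFenced are hypothetical names, not Kafka's API.
class MemberStateSketch {
    enum MemberState { UNJOINED, REBALANCING, STABLE, FENCED }

    private MemberState state = MemberState.STABLE;

    // Once fenced, the consumer stays fenced permanently.
    void onFencedError() {
        state = MemberState.FENCED;
    }

    // Every subsequent API call checks the state first, so the error keeps
    // surfacing even after the original completion object has been polled.
    void ensureNotFenced() {
        if (state == MemberState.FENCED)
            throw new IllegalStateException(
                "This consumer instance was fenced by a conflicting group.instance.id");
    }
}
```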

Contributor


This might be clearer if we separate the static and dynamic cases explicitly:

groupInstanceId match {
  case Some(instanceId) =>  
    // Static member
  case None => 
    // Dynamic member
}

Comment thread core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala Outdated
Contributor

@hachikuji hachikuji May 11, 2019


I think I still slightly prefer changing the protocols of SyncGroup, OffsetCommit, and Heartbeat so that the instance Id is an explicitly provided field. This implementation is not too terrible in terms of complexity, but it feels a tad hacky/dangerous to mix internal state with user-provided strings. Do you think there are any major downsides to modifying the protocols?

Author


I think our previous discussion covers the following trade-offs (by choosing the timestamp solution):

  1. Avoiding bumping many group protocols all at once
  2. Making debugging easier

Modifying many protocols all at once seems like overkill when we already have a good checking methodology. Honestly I'm not fully convinced by the negative impact of bumping protocols; maybe @guozhangwang could chime in here and explain the pros and cons a bit.

Here I still second the current approach because of 2): right now we don't have a good mechanism to debug static membership issues, and a meaningful member.id with a tracking timestamp seems very helpful compared with a randomly generated id. WDYT?

Contributor

@hachikuji hachikuji May 13, 2019


I think using the generation scheme we have here for member.id is a good idea in any case. Whether we use it for fencing detection is a separate thing. My argument is basically that we've identified a shortcoming of the protocol and we should try to fix it through protocol instead of by a side channel. I personally think bumping the protocol is not too big of a deal. (I realize the side channel was my idea, but I agreed with general feedback that it was a bit of a hack.)

Comment thread core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala Outdated
Comment thread core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala Outdated
Comment thread core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala Outdated
@abbccdda abbccdda force-pushed the fencing_instance branch from c22ae3b to 4341085 on May 17, 2019 04:36
Contributor

@hachikuji hachikuji left a comment


Thanks, left a few comments.

  || error == Errors.GROUP_AUTHORIZATION_FAILED
- || error == Errors.GROUP_MAX_SIZE_REACHED) {
+ || error == Errors.GROUP_MAX_SIZE_REACHED
+ || error == Errors.FENCED_INSTANCE_ID) {
Contributor


If we are fenced, should we keep track of that somewhere so that we do not keep sending RPCs to the coordinator?

Author


As long as we don't reset our generation info, all subsequent requests should fail once another consumer joins the group, right? Eventually this will lead to a complete crash, IIUC.

Contributor


For JoinResponse specifically, it should be caught in line 427 above and then falls into else if (!future.isRetriable()) to throw the exception to the callers immediately. So I agree with @abbccdda that no extra logic would be needed.

Contributor


I think I wasn't clear. What I'm asking is whether the consumer should remember the fact that it was fenced. So if the user continues trying to do stuff, we fail immediately instead of sending additional requests to the broker.

Author

@abbccdda abbccdda May 17, 2019


For Join/Sync/OffsetCommitSync the failures are immediate; for heartbeat/commitAsync the failure would not be immediate but would be quickly surfaced. If we do want a global variable indicating the failure, we would potentially need to add a new MemberState.

Contributor


Yes, a new MemberState would be a nice way to handle this. We can do this later if you do not think it is important now.

Author


SG!

Comment thread clients/src/main/resources/common/message/SyncGroupRequest.json Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Comment thread core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala Outdated
Comment thread core/src/main/scala/kafka/server/KafkaApis.scala Outdated

def getGroupInstanceId(rawInstanceId: String): Option[String] = {
  if (rawInstanceId == null ||
      config.interBrokerProtocolVersion < KAFKA_2_3_IV0)
Contributor


I wonder if it's sufficient to do this for JoinGroup only. Basically we just try to gate entrance into the group.

By the way, it's worth adding a comment in the JoinGroup handler explaining the purpose of the IBP check. It took me a little while to recall that it was tied to the schema we use in the offsets topic.

Author


This pre-check should keep the behavior in GroupCoordinator consistent IMO.

Contributor


The problem is that a static member which already received a JoinGroup response could be downgraded to a dynamic member. Have you thought through what the implications of this are?

Author


We have the IBP to guard against turning a member into a static member. So as long as IBP < 2.3, we will not see a downgrade, correct?

@abbccdda abbccdda force-pushed the fencing_instance branch 2 times, most recently from 3ad191d to bc1d096 on May 17, 2019 17:27
@abbccdda abbccdda force-pushed the fencing_instance branch from bc1d096 to 046d2a0 on May 17, 2019 18:16
Contributor

@guozhangwang guozhangwang left a comment


Made another pass; I only have a question about the commit fenced error handling. The key is that once a consumer is fenced, it should always stay in that state so that all following calls throw immediately. Others are minor.


Comment thread core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@abbccdda abbccdda force-pushed the fencing_instance branch from 7f53609 to b8ef1cd on May 17, 2019 20:45
Comment thread core/src/main/scala/kafka/server/KafkaApis.scala Outdated
@abbccdda
Author

@guozhangwang @hachikuji Addressed all comments and replicated the fencing logic to all group-related protocols. Filed a separate JIRA to track the group error change:
https://issues.apache.org/jira/browse/KAFKA-8386

Contributor

@hachikuji hachikuji left a comment


A few more small comments.

heartbeat.receiveHeartbeat();
} else if (e instanceof FencedInstanceIdException) {
log.error("Caught fenced group.instance.id {} error in heartbeat thread", groupInstanceId);
heartbeatThread.failed.set(e);
Contributor


Should we return after we fail the heartbeat thread? We do not want it to keep running, I assume.

Author


We are in an if-else branch here, but I agree, in case someone adds logic after the if-else block in the future.

Author


Oh, actually it's against code style, so just leave it.

Contributor


We do need a way to stop the heartbeat thread still, right? Perhaps we can invoke disable()?

Author


Oh, got it. Let's stop it through disable() then
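Under the approach agreed here, the heartbeat thread would record the fatal error and then disable itself. A rough sketch with hypothetical names (not the actual AbstractCoordinator.HeartbeatThread):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch only: HeartbeatThreadSketch, onFencedError and
// shouldHeartbeat are invented names, not Kafka's heartbeat thread API.
class HeartbeatThreadSketch {
    private final AtomicBoolean enabled = new AtomicBoolean(true);
    private final AtomicReference<RuntimeException> failed = new AtomicReference<>();

    void disable() {
        enabled.set(false);
    }

    // On a fatal fenced error: record it so the caller sees it on the next
    // poll, and disable the thread so no further heartbeats are sent.
    void onFencedError(RuntimeException e) {
        failed.set(e);
        disable();
    }

    boolean shouldHeartbeat() {
        return enabled.get() && failed.get() == null;
    }

    RuntimeException failure() {
        return failed.get();
    }
}
```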

log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
log.error("Received fatal exception: group.instance.id {} gets fenced", groupInstanceId);
Contributor


We can leave the instance id out of this message since we added it to the log context. There are a few more of these below.

Author


Sounds good!

Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java Outdated

Comment thread clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java Outdated
@abbccdda
Author

@hachikuji Addressed the new comments, and filed another JIRA to track the Fenced state :)
https://issues.apache.org/jira/browse/KAFKA-8387

Contributor

@hachikuji hachikuji left a comment


LGTM. Thanks for the patch!

@abbccdda abbccdda force-pushed the fencing_instance branch from 792edbc to 82f1f32 on May 18, 2019 01:08
@guozhangwang
Contributor

LGTM. Waiting for green builds

@guozhangwang guozhangwang merged commit 9fa331b into apache:trunk May 18, 2019
@guozhangwang
Contributor

Merged to trunk, kudos @abbccdda !!

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…flicting group.instance.id (apache#6650)

Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>