Conversation
@hachikuji, due to the fast-approaching feature freeze, I thought I'd ask your opinion: the KIP proposes an error at the high level and then per-group errors. But I now think that the high-level error is not really something that could apply to all groups that are requested to be deleted. For example, … Just wanted to get your feedback on whether my understanding is correct, and if so, on the course of action. Thanks a lot.
@vahidhashemian Yes, I was in fact wondering about that when I read through the KIP. The only case I could think of when we'd take advantage of it would be unhandled errors. If we don't have a good use case for it at the moment, I think it would be fine to drop it. |
@hachikuji thanks a lot for the quick response. Should I just update the KIP with this? Any notification/revote required?
I'd suggest updating the KIP and sending a message to the discussion thread. We often change minor details during implementation, so I don't think a revote will be needed, but we can see if anyone has feedback. |
Force-pushed 8dd6874 to ae247cf
Force-pushed ae247cf to b282a21
@hachikuji, would appreciate your feedback on this PR when you get a chance. Thanks!
hachikuji left a comment:
Thanks for the patch. Left some comments.
    @Override
    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
        Errors error = Errors.forException(e);
        Map<String, Errors> groupErrors = new HashMap<>();

nit: may as well initialize with the right size
        return new DeleteGroupsResponse(throttleTimeMs, groupErrors);
    default:
        throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
            version(), this.getClass().getSimpleName(), ApiKeys.DELETE_GROUPS.latestVersion()));

nit: instead of the class name, maybe use ApiKeys.DELETE_GROUPS.name.
    /**
     * Possible error codes:

I think NOT_COORDINATOR should also be possible?

Correct. Also COORDINATOR_LOAD_IN_PROGRESS if I'm not mistaken?
    def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = {
      var errors: Map[String, Errors] = Map()
      val groupsPerCoordinator = groups.map { group =>

I think I'd suggest moving coordinator lookup to a separate function. You might also consider using the Either class to distinguish errors, since the mixture of functional logic and updates to the mutable errors collection is a little odd.

I tried to improve upon this in the new commit. Please let me know what you think.
    }

    override def deleteGroups(): Map[String, Errors] = {
      val groupsToDelete = opts.options.valuesOf(opts.groupOpt).asScala.toList

Hmm.. It's a little weird that we allow multiple groups to be passed when using the new consumer, but we expect a single group for the old consumer. If we're to stay consistent, do you think it would be restrictive in practice to only support deletion of a single group at a time?
In the meantime I'll look at your other feedback (thanks btw). Regarding this one, it seems the old consumer also supports deleting multiple groups, i.e. ... --delete --group group1 --group group2 works and attempts to remove both groups.
I originally wanted to support single-group deletion only, but after considering the existing behavior of the old consumer decided otherwise.

Ah, you are right. The name deleteForGroup is kind of misleading. Maybe it just needs to be pluralized. I wouldn't hate it if we came up with better names for all of these deleteForXXX APIs.

Sure, I gave this a quick try. Let me know if you have better suggestions.
    var result: Map[String, Errors] = Map()

    groupIds.foreach { groupId =>
      if (!groupMetadataCache.contains(groupId))

This "check and act" is not safe since we're not holding a lock. It would be better to get the GroupMetadata object and check if it is null. If it is not null, then we need to grab the group lock before checking its state and attempting to delete its state.
That's correct. It seems because of this lock we need to delete groups one by one then (as in the new commit)?
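To illustrate the "check and act" point, here is a hypothetical simplified sketch (GroupMetadata is reduced to a state plus a lock, and all names are illustrative only): the state check and the transition happen inside the group's own lock, so no other thread can change the group between the check and the delete.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical sketch: fetch the group object first, then check its state and
// transition it while holding the group's own lock, instead of the racy
// contains-then-act pattern.
object LockedDeleteSketch {
  sealed trait State
  case object Empty extends State
  case object Stable extends State
  case object Dead extends State

  class Group(var state: State) {
    private val lock = new ReentrantLock()
    def inLock[T](body: => T): T = { lock.lock(); try body finally lock.unlock() }
  }

  // Returns true iff the group existed, was Empty, and was transitioned to Dead.
  def tryMarkDead(cache: Map[String, Group], groupId: String): Boolean =
    cache.get(groupId) match {
      case None => false
      case Some(group) => group.inLock {
        if (group.state == Empty) { group.state = Dead; true } else false
      }
    }
}
```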
    }

    if (eligibleGroups.nonEmpty) {
      cleanupGroupMetadata(None, eligibleGroups, Long.MaxValue)

I don't think passing None works. Looking at cleanupGroupMetadata, that would just result in removal of the expired offsets.

Correct, but since we pass Long.MaxValue as the current time, all offsets in the passed groups expire. Would that work?
    def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]]) {
      val startMs = time.milliseconds()
    def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]],
                             groups: Iterable[GroupMetadata] = groupMetadataCache.values,

It would be better not to have optional arguments. Let's make the caller provide the values explicitly.
    groups.foreach { group =>
      if (!authorize(request.session, Delete, new Resource(Group, group))) {
        unauthorizedGroupsDeletionResult += (group -> Errors.GROUP_AUTHORIZATION_FAILED)
        groups -= group

Perhaps we can partition to split the incoming group list into the authorized and unauthorized groups.

Thanks for the suggestion, makes a lot of sense.
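The partition idea could look roughly like this (a hypothetical sketch with simplified types; isAuthorized stands in for the real authorize call, and the error string for the Errors enum):

```scala
// Hypothetical sketch: split the requested groups into authorized and
// unauthorized in one pass with partition, and map the unauthorized ones to
// an authorization error, instead of mutating the collection in a foreach.
object AuthPartitionSketch {
  def deletionPlan(groups: List[String],
                   isAuthorized: String => Boolean): (List[String], Map[String, String]) = {
    val (authorized, unauthorized) = groups.partition(isAuthorized)
    (authorized, unauthorized.map(_ -> "GROUP_AUTHORIZATION_FAILED").toMap)
  }
}
```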
    }

    @Test
    def testDeleteEmptyGroup() {

We should have a test case which tests removal when there are stored offsets.

I added one in the new commit.
vahidhashemian left a comment:
@hachikuji thanks for the feedback. I tried to address them in the new commit.
    else if (opts.options.has(opts.topicOpt))
      deleteAllForTopic()

    Map()

Sounds good. I updated this in the new commit.
    authorize(request.session, Delete, new Resource(Group, group))
    }

    val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups)._2 ++

Looks like we are ignoring the groupCoordinator.handleDeleteGroups(authorizedGroups)._1 error here. handleDeleteGroups(authorizedGroups) can return Errors.COORDINATOR_NOT_AVAILABLE.

Thanks for catching this. I missed updating this with the recent change to the protocol. Will update it in the next commit.
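One hedged sketch of what honoring the top-level error could look like (types are simplified and mergeDeleteResults is a made-up helper, not the actual KafkaApis code): if the coordinator-level call fails outright, every requested group gets that error; otherwise the per-group results are used.

```scala
// Hypothetical sketch: handleDeleteGroups returns a top-level error plus
// per-group errors; instead of dropping the top-level error (._1), fan it
// out to every requested group when it is set.
object MergeResultsSketch {
  def mergeDeleteResults(requested: List[String],
                         topLevelError: Option[String],
                         perGroup: Map[String, String]): Map[String, String] =
    topLevelError match {
      case Some(err) => requested.map(_ -> err).toMap
      case None      => perGroup
    }
}
```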
Force-pushed d37d459 to 6d3a0b7
hachikuji left a comment:
Left a few more comments. I think we still need a little work to make the deletion safe for edge cases around coordinator failover.
    }.filter(_._1 != null)
    }

    val groupCoordinator = groups.map(group => (group -> coordinatorLookup(group)))
    errors += (group -> error)
    case Left(coordinator) =>
      groupsPerCoordinator.get(coordinator) match {
        case Some(gList: List[String]) =>

nit: I don't think you need the type.
    val responseBody = send(coordinator, ApiKeys.DELETE_GROUPS, new DeleteGroupsRequest.Builder(groups.toSet.asJava))
    val response = responseBody.asInstanceOf[DeleteGroupsResponse]
    groups.foreach {
      case group if (response.hasError(group)) => errors += (group -> response.errors.get(group))

nit: unneeded parentheses around response.hasError(group)
    deleteAllForTopic()
    deleteAllGroupsInfoForTopic()

    Map()

Seems like you were intending to use the results of the deleteGroupsInfo and such. We should probably have a test case (could be done in a follow-up).

Sure, I'll submit a separate PR with proper test(s) after this is merged.
    if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic)) {
      println(s"Deleted consumer group information for group '$group' topic '$topic' in zookeeper.")
    else
      (group -> Errors.NONE)

nit: unneeded parentheses. A few more like this.
    }
    }

    val groupCoordinator = groups.map(group => (group -> coordinatorLookup(group)))
    groupIds.foreach { groupId =>
      if (!validGroupId(groupId))
        groupErrors += (groupId -> Errors.INVALID_GROUP_ID)
      else if (!isCoordinatorForGroup(groupId)) {

nit: looks weird that only this branch has braces
    var groupErrors: Map[String, Errors] = Map()
    var eligibleGroups: Seq[String] = Seq()

    groupIds.foreach { groupId =>

In fact, this is also a "check and act." It is possible for an eligible group to be unloaded between these checks and the call to deleteGroups.
I think we should follow a structure more similar to the other API handlers. I would suggest moving the state checking that we currently have in GroupMetadataManager.deleteGroups into the else case below. We should do the following:
- Check if there is no group or if the group is Dead. If so, it could mean that it has already been moved to another broker or it could mean that the group doesn't exist. I am not sure we have a bulletproof way to distinguish these cases, but maybe we could just check again if the coordinator is still correct?
- Check if the group is not empty. If so, return the GROUP_NOT_EMPTY error code.
- If the group is empty, we should transition to Dead. Once we do so, we are ensured that no other thread will attempt to use the GroupMetadata object. We can then collect this eligible group in a collection (as is currently done) and send it to cleanupGroupMetadata outside of the lock.
I'm working on this and have a couple of questions for now:
- It seems all this can be done here and we could get rid of GroupMetadataManager.deleteGroups(). Do you see an issue with it?
- Could you please clarify what you mean by "check again if the coordinator is still correct" when the group cannot be found or is Dead?

Thanks.
1. Seems reasonable to me.
2. We are trying to address the case in which a group gets deleted or migrated in between the time that we check if the coordinator is assigned and the time we delete the group metadata. We have the Dead state for this purpose, so whenever we check GroupMetadata, the first thing we should check is whether it is already Dead. If it is, then we know it was either already deleted or already migrated. My suggestion is to check again whether we are still the coordinator for the group to disambiguate the two cases.

Note that I am not sure that this is 100% bulletproof. For example, it may not handle the case when the coordinator is migrated away and then back very quickly. A spurious NOT_COORDINATOR error is not a big deal because clients are expected to handle it, but I am not too sure about the GROUP_ID_NOT_FOUND error. Maybe clients just have to treat it with the same skepticism that they treat UNKNOWN_TOPIC_OR_PARTITION errors.
Thanks for clarifying this. I'll try to do just that in the new patch. Will submit shortly.
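Putting the suggested structure together as a hypothetical sketch (states and error strings are simplified stand-ins for the real GroupCoordinator types): missing-or-Dead resolves to GROUP_ID_NOT_FOUND or NOT_COORDINATOR depending on whether we still own the group, a non-empty group is rejected, and an empty group is eligible for deletion (where the real code would transition it to Dead under its lock before cleanup).

```scala
// Hypothetical sketch of the per-group decision described above.
object DeleteClassifySketch {
  sealed trait State
  case object Empty extends State
  case object Stable extends State
  case object Dead extends State

  // Right(()) means the group is eligible for deletion; Left carries the error.
  def classify(state: Option[State], stillCoordinator: Boolean): Either[String, Unit] =
    state match {
      case None | Some(Dead) =>
        // Already deleted or migrated; disambiguate by re-checking ownership.
        Left(if (stillCoordinator) "GROUP_ID_NOT_FOUND" else "NOT_COORDINATOR")
      case Some(Empty) => Right(())
      case Some(_)     => Left("GROUP_NOT_EMPTY")
    }
}
```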
vahidhashemian left a comment:
@hachikuji thanks for another review. Could you please clarify a couple of questions inline before I submit another patch? Thanks.
hachikuji left a comment:
Thanks for the updates. A few more comments.
    else {
      groupManager.getGroup(groupId) match {
        case None =>
          groupErrors += groupId -> Errors.GROUP_ID_NOT_FOUND

This case should be handled the same as if the group is dead. You can probably add a little helper to avoid the duplication.
On second thought, this probably doesn't solve the underlying issue. I'm trying to think how we can be sure that we're returning this error code correctly. Maybe we need to check for group existence while holding the ownedPartitions lock in GroupMetadataManager.
If ownedPartitions includes the corresponding topic partition for the group, and if the cached group either doesn't exist or is Dead, then I think it is safe to return GROUP_ID_NOT_FOUND. Maybe we can just add a method like the following to GroupMetadataManager:

    // return true iff group is owned and the group doesn't exist
    def groupNotExists(groupId: String) = inLock(partitionLock) {
      isGroupLocal(groupId) && (!groupMetadataCache.contains(groupId) || groupMetadataCache.get(groupId).is(Dead))
    }

Then we can use this function here instead of checking the coordinator again. The name could probably be improved.
For both case None and case Dead we already know the second part ((!groupMetadataCache.contains(groupId) || groupMetadataCache.get(groupId).is(Dead))) is true. So, it suffices to check isGroupLocal(groupId) (to avoid redundant checks). Is that correct? If so, we wouldn't need this helper (at least here).
The point is to check it while holding the partition lock so that it is an atomic operation. This ensures that we will not have any race conditions with partition loading/unloading.
Aah, right. That makes sense.
    }

    @Test
    def testDeleteNonEmptyGroup() {

Why remove these test cases?

They made a call to GroupMetadataManager.deleteGroups(...) that we just deleted. Similar tests exist in GroupCoordinatorTest.
    }

    if (eligibleGroups.nonEmpty) {
      groupManager.cleanupGroupMetadata(None, eligibleGroups, Long.MaxValue)

This still feels a bit hacky. As an alternative, maybe we can let the offset selector be provided as a function. Something like this:

    def cleanupGroupMetadata(
        groups: Iterable[GroupMetadata],
        collectOffsetsToRemove: Group => Map[TopicPartition, OffsetAndMetadata])

What do you think?
I'm not sure which part you consider hacky, and am trying to understand your suggestion.
For the sake of the deleteGroups functionality, we can use group.allOffsets, which conforms to the function signature above. But how about the existing functionality, where we want to delete specific topic partitions from a group: groupManager.cleanupGroupMetadata(Some(topicPartitions), groupManager.currentGroups, time.milliseconds()) and populate the corresponding OffsetAndMetadata values? I'm assuming we want to reuse the same cleanupGroupMetadata method for both cases.
On the same assumption, we also need to factor in the concept of current time so we can determine the expired offsets for the existing functionality.
On the other hand, if you are proposing to create a new cleanupGroupMetadata method that calls the existing method, we would have to make this call once per group (since topic partitions are group-specific).
Or maybe I'm missing the point :)
It's not that big of a deal. I just thought it was a mild abuse to reuse the expiration logic to delete all offsets. Alternatively, what I was suggesting is to let the caller choose the offsets to delete.
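A hypothetical sketch of the selector-function idea (all types here are toy stand-ins for GroupMetadata and OffsetAndMetadata, and the counting return value is just for illustration): the deletion path passes an all-offsets selector, while the expiration path passes a time-based one, so Long.MaxValue is no longer needed as a fake current time.

```scala
// Hypothetical sketch: cleanupGroupMetadata takes a function choosing which
// offsets to remove from each group, so each caller expresses intent directly.
object CleanupSelectorSketch {
  case class OffsetAndMetadata(offset: Long, expireTimestamp: Long)
  case class Group(id: String, offsets: Map[String, OffsetAndMetadata])

  // Returns how many offsets were selected for removal per group.
  def cleanupGroupMetadata(groups: Iterable[Group],
                           collectOffsetsToRemove: Group => Map[String, OffsetAndMetadata]): Map[String, Int] =
    groups.map(g => g.id -> collectOffsetsToRemove(g).size).toMap

  // Deletion path: remove every offset the group holds.
  val allOffsets: Group => Map[String, OffsetAndMetadata] = _.offsets

  // Expiration path: remove only offsets whose expiry time has passed.
  def expiredOffsets(now: Long): Group => Map[String, OffsetAndMetadata] =
    g => g.offsets.filter { case (_, om) => om.expireTimestamp <= now }
}
```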
@vahidhashemian If you can update the patch this morning, we may still be able to get it into this release. The main thing from my perspective is ensuring that the …
@hachikuji I just updated the patch, without the improvement on …
    // return true iff group is owned and the group doesn't exist
    def groupNotExists(groupId: String) = inLock(partitionLock) {
      isGroupLocal(groupId) && (!groupMetadataCache.contains(groupId) || groupMetadataCache.get(groupId).is(Dead))

Should have mentioned before, but we do need to grab the group lock to check the state.

Correct, thanks for catching. Hopefully the new commit works.
    // return true iff group is owned and the group doesn't exist
    def groupNotExists(groupId: String) = inLock(partitionLock) {
      isGroupLocal(groupId) && (!groupMetadataCache.contains(groupId) || groupMetadataCache.get(groupId).is(Dead))
      isGroupLocal(groupId) && (!groupMetadataCache.contains(groupId) || {

Can you write a short test case to make sure this function works correctly? Also, I think this is a bit more concise:

    isGroupLocal(groupId) && getGroup(groupId).forall { group =>
      group.inLock(group.is(Dead))
    }

Thanks for the code improvement suggestion. I added a basic unit test in the new commit.
    // group is not owned
    assertFalse(groupMetadataManager.groupNotExists(groupId))

    groupMetadataManager.addPartitionOwnership(groupPartitionId)

Following this and prior to adding the group, we should see groupNotExists return true?

Yes, I'll add that. Thanks!
hachikuji left a comment:
LGTM. Thanks for the patch!
The test failures appear unrelated. Merging to trunk.
Great, and thanks for quick reviews!
This PR implements KIP-229.