Skip to content

MINOR: enable topic deletion in the KIP-500 controller#10184

Merged
hachikuji merged 5 commits intoapache:trunkfrom
cmccabe:kip500_delete
Mar 4, 2021
Merged

MINOR: enable topic deletion in the KIP-500 controller#10184
hachikuji merged 5 commits intoapache:trunkfrom
cmccabe:kip500_delete

Conversation

@cmccabe
Copy link
Copy Markdown
Contributor

@cmccabe cmccabe commented Feb 23, 2021

Enable the new KIP-500 controller to delete topics.

Fix a bug where feature level records were not correctly replayed.

Fix a bug in TimelineHashMap#remove where the wrong type was being
returned.

@cmccabe cmccabe added the kraft label Feb 23, 2021
Comment thread config/log4j.properties Outdated
@rondagostino
Copy link
Copy Markdown
Contributor

Successfully ran the system test that was previously failing due to an inability to delete topics:

TC_PATHS="tests/kafkatest/tests/core/replica_scale_test.py::ReplicaScaleTest.test_clean_bounce" bash tests/docker/run_tests.sh

test_id:    kafkatest.tests.core.replica_scale_test.ReplicaScaleTest.test_clean_bounce.topic_count=50.partition_count=34.replication_factor=3.metadata_quorum=REMOTE_RAFT
status:     PASS
run time:   8 minutes 17.307 seconds

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall LGTM. some minor questions are left. Please take a look.

Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
@cmccabe
Copy link
Copy Markdown
Contributor Author

cmccabe commented Feb 24, 2021

rebased on trunk

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating code. Some minor comments are left. Please take a look.

Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread metadata/src/main/java/org/apache/kafka/controller/Controller.java Outdated
@cmccabe
Copy link
Copy Markdown
Contributor Author

cmccabe commented Feb 25, 2021

rebased on trunk

@cmccabe
Copy link
Copy Markdown
Contributor Author

cmccabe commented Feb 26, 2021

rebased on trunk

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe thanks for updating code. please take a look at following minor questions.

Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java Outdated
Comment thread metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/test/java/kafka/test/MockController.java Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error should be TOPIC_AUTHORIZATION_FAILED if the client does not have describe permission regardless of existence.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are 5 cases:

  1. name provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
  2. name provided, topic doesn't exist, can't delete, can describe => UNKNOWN_TOPIC_OR_PARTITION
  3. name provided, topic doesn't exist, can't delete, can't describe => TOPIC_AUTHORIZATION_FAILED
  4. id provided, topic exists, can't delete, maybe can describe => TOPIC_AUTHORIZATION_FAILED
  5. id provided, topic doesn't exists, can't delete, maybe can describe => UNKNOWN_TOPIC_ID

Copy link
Copy Markdown
Member

@jolshan jolshan Feb 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For case 4, are we exposing the topic exists by returning a different error (than case 5) in the case where we can't describe?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, the earlier list was not quite right. I've revised this a bit. It's now:

no name or id => INVALID_REQUEST

duplicate name or id => INVALID_REQUEST

can't resolve topic id => UNKNOWN_TOPIC_ID

can't locate topic name => UNKNOWN_TOPIC_OR_PARTITION

no delete permission, no describe permission => UNKNOWN_TOPIC_ID (if topic id was provided) or UNKNOWN_TOPIC_OR_PARTITION (if topic name was provided)

no delete permission, describe permission => TOPIC_AUTHORIZATION_FAILED with both name and id filled out correctly

The new code should implement this correctly...

Copy link
Copy Markdown
Member

@jolshan jolshan Mar 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe Oh sorry this just got changed due to https://issues.apache.org/jira/browse/KAFKA-12394
So the case of no delete permission, no describe permission, topic ID provided is now TOPIC_AUTHORIZATION_FAILED. This may have been what you had initially.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe : Thanks for the PR. Added a couple of comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update brokersToIsrs too?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ISRs should already have been updated by BrokerChangeRecords that were previously replayed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, you mean PartitionChangeRecord? I don't see PartitionChangeRecord being generated from the topicDeletion request.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, you're right: we need to remove this from brokersToIsrs. Fixed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we haven't hooked up the logic to trigger the deletion of the replicas of the deleted topic in the broker?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We haven't hooked that up yet, correct. But that logic is in BrokerMetadataListener. It would probably be better to have a separate PR for that.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe : Thanks for the explanation. A couple of more comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, you mean PartitionChangeRecord? I don't see PartitionChangeRecord being generated from the topicDeletion request.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to delete the configuration associated with the topic.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the deletion of topic configurations implicit based on the DeleteTopic record? I know we discussed this, but I'm unsure what the final outcome was. I don't see any logic for this in the broker listener, but the implementation looks incomplete anyway.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should be implicit based on the DeleteTopic record. I will fix the controller to do the right thing here. We'll also need to have the broker do that too.

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe Thanks for updating PR.

this code ends up being complicated. Could it be separated and then reused by ControlApis and KafkaApis? I left same comment on #10223 (#10223 (comment))

Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
@ijuma
Copy link
Copy Markdown
Member

ijuma commented Feb 27, 2021

@cmccabe I pushed a commit that removes a bunch of unnecessary asScala conversions (even though these don't copy the collection, they add a shallow allocation and indirection and the code is shorter with these changes - win/win).

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe Thanks for handling this very, very, very complicated code. a couple of comments are left. Please take a look.

Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the deletion of topic configurations implicit based on the DeleteTopic record? I know we discussed this, but I'm unsure what the final outcome was. I don't see any logic for this in the broker listener, but the implementation looks incomplete anyway.

@cmccabe cmccabe force-pushed the kip500_delete branch 2 times, most recently from 0274964 to 9424061 Compare March 2, 2021 20:30
@rondagostino
Copy link
Copy Markdown
Contributor

The below, if added as tests/kafkatest/sanity_checks/test_delete_topic.py, fails for the Raft cases on this PR branch as of this moment because the broker fails to shutdown. The following appears in the controller log:

[2021-03-02 21:41:13,354] INFO [Controller 1] Unfenced broker 1 has requested and been granted a controlled shutdown. (org.apache.kafka.controller.BrokerHeartbeatManager)
[2021-03-02 21:41:13,355] WARN [Controller 1] org.apache.kafka.controller.QuorumController@3fa533f1: failed with unknown server exception RuntimeException at epoch 1 in 802 us.  Reverting to last committed offset 5. (org.apache.kafka.controller.QuorumController)
java.lang.RuntimeException: Topic ID VnD54LHq2t3qq_m1WLasZg existed in isrMembers, but not in the topics map.
	at org.apache.kafka.controller.ReplicationControlManager.handleNodeDeactivated(ReplicationControlManager.java:752)
	at org.apache.kafka.controller.ReplicationControlManager.processBrokerHeartbeat(ReplicationControlManager.java:931)
	at org.apache.kafka.controller.QuorumController$1.generateRecordsAndResult(QuorumController.java:911)
	at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:419)
	at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
	at java.lang.Thread.run(Thread.java:748)

Maybe add this system test to this PR as tests/kafkatest/sanity_checks/test_delete_topic.py?

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test

from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.zookeeper import ZookeeperService
import time

class TestDeleteTopic(Test):
    """Sanity checks that we can create and delete a topic and then shutdown."""
    def __init__(self, test_context):
        super(TestDeleteTopic, self).__init__(test_context)

        self.topic = "test_topic"
        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}},
                                  controller_num_nodes_override=1)
    def setUp(self):
        if self.zk:
            self.zk.start()

    @cluster(num_nodes=2)
    @matrix(metadata_quorum=quorum.all)
    def test_delete_topic(self, metadata_quorum):
        """
        Test that we can create and delete a topic and then shutdown
        """
        self.kafka.start()
        self.kafka.delete_topic(self.topic)
        time.sleep(10) # give it a bit to take effect
        self.kafka.stop() # explicit stop so that failure to stop fails the test

@hachikuji
Copy link
Copy Markdown
Contributor

@rondagostino I believe that error will be fixed by https://issues.apache.org/jira/browse/KAFKA-12403.

@cmccabe
Copy link
Copy Markdown
Contributor Author

cmccabe commented Mar 2, 2021

Thanks for the reviews! I reworked the authentication, validation, and de-duplication code a lot. The new logic should take into account the issues pointed out here. I resolved a few comment threads since they refer to code that was refactored-- please take another look if you get a chance.

To clarify a bit, RemoveTopicRecord should imply some other effects:

  • All topic configs for the affected topic should be deleted
  • We should delete all the partitions of the deleted topic
  • We should remove the topic from brokersToIsrs

The fact that it wasn't doing these things was a bug... it's fixed now. This should also allow the ducktape test to work (cc @rondagostino )

We also have a JIRA to follow up on the broker side: https://issues.apache.org/jira/browse/KAFKA-12403

Enable the new KIP-500 controller to delete topics.

Fix a bug where feature level records were not correctly replayed.

Fix a bug in TimelineHashMap#remove where the wrong type was being
returned.
@rondagostino
Copy link
Copy Markdown
Contributor

[2021-03-02T22:42:17.438Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10184/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:76:8: Unused import - java.util.Set. [UnusedImports]

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe thanks for updating code.

The following comments are used to make sure the error handle (and response) is consistent to #10223

please take a look.

Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ControllerApis.scala Outdated
Comment thread metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java Outdated
Comment thread metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java Outdated
Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. @junrao Would you mind checking over the last two commits I added?

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji : Thanks for the updated PR. Just a couple of more comments.

Comment thread core/src/test/scala/unit/kafka/server/ControllerApisTest.scala Outdated
brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
}
}
brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, why do we need to remove for -1 broker? It doesn't seem that brokersToIsrs tracks that.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test case BrokersToIsrsTest.testNoLeader suggests that it is a possible case. It looks like the path through ReplicationControlManager.handleNodeDeactivated could result in a PartitionChangeRecord which has leaderId set to -1.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's true and a partition could have isr and no leader. However, in that case, isrMembers in brokersToIsrs will still be updated with key from replicaId in isr and isr will never have -1 in its list. The noLeader info is only stored in the value of isrMembers.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I stepped through testNoLeader and it seems that -1 can indeed be a key in isrMembers. The noLeaderIterator makes the expectation explicit.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. The following comment confirmed this.

    /**
     * A map of broker IDs to the partitions that the broker is in the ISR for.
     * Partitions with no isr members appear in this map under id NO_LEADER.
     */
    private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> isrMembers;

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to this nice patch. Some trivial comments are left. Please take a look.

Comment thread core/src/test/scala/unit/kafka/server/ControllerApisTest.scala Outdated
val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
val controller = new MockController.Builder().
newInitialTopic("foo", fooId).build()
controller.setActive(false)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this controller is mock so disabling active works well for this test. However, I did not observe the check of control activity in production code. Could you share that with me?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See QuorumController.QuorumMetaLogListener for the callbacks that make the controller active or inactive.

Comment thread metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java Outdated
Comment thread metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java Outdated
Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji : Thanks for the updated PR. LGTM too.

brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
}
}
brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. The following comment confirmed this.

    /**
     * A map of broker IDs to the partitions that the broker is in the ISR for.
     * Partitions with no isr members appear in this map under id NO_LEADER.
     */
    private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> isrMembers;

@hachikuji
Copy link
Copy Markdown
Contributor

Thanks for reviews. I will merge this on behalf of @cmccabe to trunk and 2.8.

@hachikuji hachikuji merged commit eebc6f2 into apache:trunk Mar 4, 2021
@chia7712
Copy link
Copy Markdown
Member

chia7712 commented Mar 4, 2021

@hachikuji this PR breaks the build (this PR was merged after #10253). Could you file a hot-fix?

hachikuji added a commit that referenced this pull request Mar 4, 2021
This patch enables delete topic support for the new KIP-500 controller. Also fixes the following:
- Fix a bug where feature level records were not correctly replayed.
- Fix a bug in TimelineHashMap#remove where the wrong type was being returned.

Reviewers: Jason Gustafson <jason@confluent.io>, Justine Olshan <jolshan@confluent.io>, Ron Dagostino <rdagostino@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>

Co-authored-by: Jason Gustafson <jason@confluent.io>
hachikuji added a commit that referenced this pull request Mar 4, 2021
Topic deletions should be atomic. This fixes a build error caused by merging of both #10253 and #10184 at about the same time. 

Reviewers: David Arthur <mumrah@gmail.com>
mumrah pushed a commit that referenced this pull request Mar 4, 2021
Topic deletions should be atomic. This fixes a build error caused by merging of both #10253 and #10184 at about the same time. 

Reviewers: David Arthur <mumrah@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants