From 1a90011ef8f2cbb87aab40a4bb2067b718006d9a Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 4 Mar 2021 12:08:05 -0800 Subject: [PATCH] HOTFIX: Controller topic deletion should be atomic --- .../controller/ReplicationControlManager.java | 2 +- .../ReplicationControlManagerTest.java | 24 ++++++++++--------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 59798c4ac61da..df16479df8737 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -624,7 +624,7 @@ ControllerResult> deleteTopics(Collection ids) { results.put(id, ApiError.fromThrowable(e)); } } - return new ControllerResult<>(records, results); + return ControllerResult.atomicOf(records, results); } void deleteTopic(Uuid id, List records) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index da6e4afe527b2..8b00c10eea31e 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -67,6 +67,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(40) @@ -451,19 +452,19 @@ public void testDeleteTopics() throws Exception { unfenceBroker(0, ctx); registerBroker(1, ctx); unfenceBroker(1, ctx); - ControllerResult result = + ControllerResult createResult = replicationControl.createTopics(request); CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - Uuid topicId = result.response().topics().find("foo").topicId(); + Uuid topicId = createResult.response().topics().find("foo").topicId(); expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). setNumPartitions(3).setReplicationFactor((short) 2). setErrorMessage(null).setErrorCode((short) 0). setTopicId(topicId)); - assertEquals(expectedResponse, result.response()); + assertEquals(expectedResponse, createResult.response()); // Until the records are replayed, no changes are made assertNull(replicationControl.getPartition(topicId, 0)); assertEmptyTopicConfigs(ctx, "foo"); - ctx.replay(result.records()); + ctx.replay(createResult.records()); assertNotNull(replicationControl.getPartition(topicId, 0)); assertNotNull(replicationControl.getPartition(topicId, 1)); assertNotNull(replicationControl.getPartition(topicId, 2)); @@ -483,17 +484,18 @@ public void testDeleteTopics() throws Exception { new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))), replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar"))); - ControllerResult> result1 = replicationControl. + ControllerResult> invalidDeleteResult = replicationControl. deleteTopics(Collections.singletonList(invalidId)); - assertEquals(0, result1.records().size()); + assertEquals(0, invalidDeleteResult.records().size()); assertEquals(Collections.singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)), - result1.response()); - ControllerResult> result2 = replicationControl. + invalidDeleteResult.response()); + ControllerResult> deleteResult = replicationControl. deleteTopics(Collections.singletonList(topicId)); + assertTrue(deleteResult.isAtomic()); assertEquals(Collections.singletonMap(topicId, new ApiError(NONE, null)), - result2.response()); - assertEquals(1, result2.records().size()); - ctx.replay(result2.records()); + deleteResult.response()); + assertEquals(1, deleteResult.records().size()); + ctx.replay(deleteResult.records()); assertNull(replicationControl.getPartition(topicId, 0)); assertNull(replicationControl.getPartition(topicId, 1)); assertNull(replicationControl.getPartition(topicId, 2));