From bab8d4d365cc9dd98881ebd225f39ca0d8cc2f11 Mon Sep 17 00:00:00 2001 From: Lan Ding Date: Fri, 20 Jun 2025 15:48:03 +0800 Subject: [PATCH 1/2] fix reassign command bug --- .../kafka/tools/reassign/ReassignPartitionsCommand.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index 72c49410e13eb..4893bb2583010 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -1271,7 +1271,7 @@ static Entry, Set> cancelAssignment(A Set targetPartsSet = targetParts.stream().map(t -> t.getKey()).collect(Collectors.toSet()); Set curReassigningParts = new HashSet<>(); adminClient.listPartitionReassignments(targetPartsSet).reassignments().get().forEach((part, reassignment) -> { - if (reassignment.addingReplicas().isEmpty() || !reassignment.removingReplicas().isEmpty()) + if (!reassignment.addingReplicas().isEmpty() || !reassignment.removingReplicas().isEmpty()) curReassigningParts.add(part); }); if (!curReassigningParts.isEmpty()) { @@ -1440,7 +1440,7 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) { } OptionSpec action = allActions.get(0); - + if (opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt)) CommandLineUtils.printUsageAndExit(opts.parser, "Please don't specify both --bootstrap-server and --bootstrap-controller"); else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bootstrapControllerOpt)) From c939887edf2a8848fd3528567408799e148b1bd9 Mon Sep 17 00:00:00 2001 From: Lan Ding Date: Mon, 23 Jun 2025 09:50:48 +0800 Subject: [PATCH 2/2] add UT --- .../ReassignPartitionsCommandTest.java | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java index 1ee55c6eacea1..069a64234c678 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java @@ -316,7 +316,7 @@ public void testCancellationWithBootstrapController() throws Exception { } @ClusterTest - public void testCancellationWithAddingReplicaInIsr() throws Exception { + public void testCancellationWithAddingAndRemovingReplicaInIsr() throws Exception { createTopics(); TopicPartition foo0 = new TopicPartition("foo", 0); produceMessages(foo0.topic(), foo0.partition(), 200); @@ -351,6 +351,42 @@ public void testCancellationWithAddingReplicaInIsr() throws Exception { verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4)); } + @ClusterTest + public void testCancellationWithAddingReplicaInIsr() throws Exception { + createTopics(); + TopicPartition foo0 = new TopicPartition("foo", 0); + produceMessages(foo0.topic(), foo0.partition(), 200); + + // The reassignment will bring replicas 3 and 4 into the replica set. + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\",\"any\",\"any\"]}" + + "]}"; + + // We will throttle replica 4 so that only replica 3 joins the ISR + setReplicationThrottleForPartitions(foo0); + + // Execute the assignment and wait for replica 3 (only) to join the ISR + runExecuteAssignment(false, assignment, -1L, -1L); + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + TestUtils.waitForCondition( + () -> { + Set isr = admin.describeTopics(Collections.singleton(foo0.topic())) + .allTopicNames().get().get(foo0.topic()).partitions().stream() + .filter(p -> p.partition() == foo0.partition()) + .flatMap(p -> p.isr().stream()) + .map(Node::id).collect(Collectors.toSet()); + return isr.containsAll(Arrays.asList(0, 1, 2, 3)); + }, + "Timed out while waiting for replica 3 to join the ISR" + ); + } + + // Now cancel the assignment and verify that the partition is removed from cancelled replicas + assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), Collections.emptySet()), runCancelAssignment(assignment, true, true)); + verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3)); + verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4)); + } + /** * Test moving partitions between directories. */