Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,7 @@ static Entry<Set<TopicPartition>, Set<TopicPartitionReplica>> cancelAssignment(A
Set<TopicPartition> targetPartsSet = targetParts.stream().map(t -> t.getKey()).collect(Collectors.toSet());
Set<TopicPartition> curReassigningParts = new HashSet<>();
adminClient.listPartitionReassignments(targetPartsSet).reassignments().get().forEach((part, reassignment) -> {
if (reassignment.addingReplicas().isEmpty() || !reassignment.removingReplicas().isEmpty())
if (!reassignment.addingReplicas().isEmpty() || !reassignment.removingReplicas().isEmpty())
curReassigningParts.add(part);
});
if (!curReassigningParts.isEmpty()) {
Expand Down Expand Up @@ -1440,7 +1440,7 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) {
}

OptionSpec<?> action = allActions.get(0);

if (opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "Please don't specify both --bootstrap-server and --bootstrap-controller");
else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bootstrapControllerOpt))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public void testCancellationWithBootstrapController() throws Exception {
}

@ClusterTest
public void testCancellationWithAddingReplicaInIsr() throws Exception {
public void testCancellationWithAddingAndRemovingReplicaInIsr() throws Exception {
createTopics();
TopicPartition foo0 = new TopicPartition("foo", 0);
produceMessages(foo0.topic(), foo0.partition(), 200);
Expand Down Expand Up @@ -351,6 +351,42 @@ public void testCancellationWithAddingReplicaInIsr() throws Exception {
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4));
}

@ClusterTest
public void testCancellationWithAddingReplicaInIsr() throws Exception {
createTopics();
TopicPartition foo0 = new TopicPartition("foo", 0);
produceMessages(foo0.topic(), foo0.partition(), 200);

// The reassignment will bring replicas 3 and 4 into the replica set.
String assignment = "{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\",\"any\",\"any\"]}" +
"]}";

// We will throttle replica 4 so that only replica 3 joins the ISR
setReplicationThrottleForPartitions(foo0);

// Execute the assignment and wait for replica 3 (only) to join the ISR
runExecuteAssignment(false, assignment, -1L, -1L);
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
TestUtils.waitForCondition(
() -> {
Set<Integer> isr = admin.describeTopics(Collections.singleton(foo0.topic()))
.allTopicNames().get().get(foo0.topic()).partitions().stream()
.filter(p -> p.partition() == foo0.partition())
.flatMap(p -> p.isr().stream())
.map(Node::id).collect(Collectors.toSet());
return isr.containsAll(Arrays.asList(0, 1, 2, 3));
},
"Timed out while waiting for replica 3 to join the ISR"
);
}

// Now cancel the assignment and verify that the partition is removed from cancelled replicas
assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), Collections.emptySet()), runCancelAssignment(assignment, true, true));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

runCancelAssignment returns SimpleImmutableEntry, so you use SimpleImmutableEntry to verify the value, right? If so, that is fine in this PR. However, could you please file a PR to replace SimpleImmutableEntry by Map.entry?

verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3));
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4));
}

/**
* Test moving partitions between directories.
*/
Expand Down