From 894ac0aecccf3be89bd9d8255de86f481fae3674 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 13 Oct 2023 15:35:06 -0400 Subject: [PATCH 01/12] After sending full metadata to brokers, delete the pending topic deletions --- .../zk/migration/ZkTopicMigrationClient.scala | 22 +++++-- .../kafka/zk/ZkMigrationIntegrationTest.scala | 66 +++++++++++++++++++ .../zk/migration/ZkMigrationClientTest.scala | 2 +- .../kafka/controller/QuorumController.java | 16 +++++ .../migration/KRaftMigrationDriver.java | 15 +++++ .../migration/KRaftMigrationZkWriter.java | 6 +- .../migration/TopicMigrationClient.java | 3 + .../metadata/migration/ZkRecordConsumer.java | 2 + .../CapturingTopicMigrationClient.java | 12 +++- .../migration/KRaftMigrationDriverTest.java | 5 ++ 10 files changed, 141 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala index 9459a83b2830d..70acc861c773c 100644 --- a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala @@ -47,8 +47,10 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie if (!interests.contains(TopicVisitorInterest.TOPICS)) { throw new IllegalArgumentException("Must specify at least TOPICS in topic visitor interests.") } - val topics = zkClient.getAllTopicsInCluster() - val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics) + val allTopics = zkClient.getAllTopicsInCluster() + val topicDeletions = zkClient.getTopicDeletions + info(s"Found ${topicDeletions.size} pending topic deletions: ${topicDeletions}. 
These will be migrated to KRaft and then deleted.") + val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(allTopics) replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) => val topicAssignment = partitionAssignments.map { case (partition, assignment) => partition.partition().asInstanceOf[Integer] -> assignment.replicas.map(Integer.valueOf).asJava @@ -195,6 +197,7 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie override def deleteTopic( topicName: String, + removePendingDeletion: Boolean, state: ZkMigrationLeadershipState ): ZkMigrationLeadershipState = wrapZkException { // Delete the partition state ZNodes recursively, then topic config, and finally the topic znode @@ -204,8 +207,15 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie DeleteRequest(childPath, ZkVersion.MatchAnyVersion) } ++ Seq( DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topicName), ZkVersion.MatchAnyVersion), - DeleteRequest(TopicZNode.path(topicName), ZkVersion.MatchAnyVersion) - ) + DeleteRequest(TopicZNode.path(topicName), ZkVersion.MatchAnyVersion), + ) ++ { + if (removePendingDeletion) { + Seq(DeleteRequest(DeleteTopicsTopicZNode.path(topicName), ZkVersion.MatchAnyVersion)) + } else { + Seq.empty + } + } + val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(deleteRequests, state) val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap if (responses.last.resultCode.equals(Code.OK)) { @@ -316,4 +326,8 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie val (path, data) = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch) SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition)) } + + override def readPendingTopicDeletions(): util.Set[String] = { + 
zkClient.getTopicDeletions.toSet.asJava + } } diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 4f5a2b8cb43a6..5b99c5564dd5e 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -265,6 +265,72 @@ class ZkMigrationIntegrationTest { migrationState = migrationClient.releaseControllerLeadership(migrationState) } + @ClusterTemplate("zkClustersForAllMigrationVersions") + def testMigrateTopicDeletions(zkCluster: ClusterInstance): Unit = { + // Create a topic in ZK mode + var admin = zkCluster.createAdminClient() + val newTopics = new util.ArrayList[NewTopic]() + newTopics.add(new NewTopic("test-topic-1", 100, 3.toShort)) + newTopics.add(new NewTopic("test-topic-2", 100, 3.toShort)) + newTopics.add(new NewTopic("test-topic-3", 100, 3.toShort)) + val createTopicResult = admin.createTopics(newTopics) + createTopicResult.all().get(300, TimeUnit.SECONDS) + admin.close() + val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient + + // Bootstrap the ZK cluster ID into KRaft + val clusterId = zkCluster.clusterId() + val kraftCluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setBootstrapMetadataVersion(zkCluster.config().metadataVersion()). + setClusterId(Uuid.fromString(clusterId)). + setNumBrokerNodes(0). 
+ setNumControllerNodes(1).build()) + .setConfigProp(KafkaConfig.MigrationEnabledProp, "true") + .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) + .build() + try { + kraftCluster.format() + kraftCluster.startup() + val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3) + + // Start a delete, but don't wait for it + admin = zkCluster.createAdminClient() + admin.deleteTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3").asJava) + admin.close() + + // Enable migration configs and restart brokers + log.info("Restart brokers in migration mode") + zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") + zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) + zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") + zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") + zkCluster.rollingBrokerRestart() + + zkCluster.waitForReadyBrokers() + readyFuture.get(30, TimeUnit.SECONDS) + + // Wait for migration to begin + log.info("Waiting for ZK migration to complete") + TestUtils.waitUntilTrue( + () => zkClient.getOrCreateMigrationState(ZkMigrationLeadershipState.EMPTY).initialZkMigrationComplete(), + "Timed out waiting for migration to complete", + 30000) + + Thread.sleep(30000) + + admin = zkCluster.createAdminClient() + TestUtils.waitUntilTrue( + () => admin.listTopics().names().get(30, TimeUnit.SECONDS).isEmpty, + "Timed out waiting for topics to be deleted", + 300000) + log.info("Topics after migration {}", admin.listTopics().names().get(30, TimeUnit.SECONDS)) + admin.close() + } finally { + shutdownInSequence(zkCluster, kraftCluster) + } + } + // SCRAM and Quota are intermixed. 
Test SCRAM Only here @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array( new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala index 773c42a66e4a6..ce6a3e6d1fd40 100644 --- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala @@ -97,7 +97,7 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness { assertEquals(List(3), partition1.isr) // Delete whole topic - migrationState = migrationClient.topicClient().deleteTopic("test", migrationState) + migrationState = migrationClient.topicClient().deleteTopic("test", false, migrationState) assertEquals(2, migrationState.migrationZkVersion()) } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 5b437cec754ac..c27e64eab83e3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1031,6 +1031,22 @@ public void abortMigration() { // next controller abort the migration transaction (if in use). 
fatalFaultHandler.handleFault("Aborting the ZK migration"); } + + @Override + public CompletableFuture deleteTopic(String topicName) { + Uuid topicId = QuorumController.this.replicationControl().getTopicId(topicName); + if (topicId == null) { + return CompletableFuture.completedFuture(null); + } + ControllerWriteEvent deleteEvent = new ControllerWriteEvent<>( + "Delete Topic " + topicName, + new MigrationWriteOperation(Collections.singletonList( + new ApiMessageAndVersion( + new RemoveTopicRecord().setTopicId(topicId), (short) 0)) + ), EnumSet.noneOf(ControllerOperationFlag.class)); + queue.append(deleteEvent); + return deleteEvent.future; + } } class QuorumMetaLogListener implements RaftClient.Listener { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index aa60390cc06d4..ec97d8f83ec94 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -722,6 +722,21 @@ public void run() throws Exception { log.info("Sending RPCs to broker before moving to dual-write mode using " + "at offset and epoch {}", image.highestOffsetAndEpoch()); propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch()); + + // After full metadata update, delete the pending topic deletions from KRaft. 
This will cause the + // next round of RPCs sent to brokers to include StopReplicas + zkMigrationClient.topicClient().readPendingTopicDeletions().forEach(topicToDelete -> { + try { + log.info("Deleting topic {} which was marked for deletion prior to the migration", topicToDelete); + CompletableFuture future = zkRecordConsumer.deleteTopic(topicToDelete); + FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "", + "the controller to delete topic " + topicToDelete, + future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time); + } catch (Throwable t) { + log.error("Failed to delete topic " + topicToDelete, t); + } + }); + // Migration leadership state doesn't change since we're not doing any Zk writes. transitionTo(MigrationDriverState.DUAL_WRITE); } else { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java index a57e1cadc65c5..90543cc3955d7 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java @@ -145,6 +145,7 @@ void handleTopicsSnapshot(TopicsImage topicsImage, KRaftMigrationOperationConsum Map> extraneousPartitionsInZk = new HashMap<>(); Map> changedPartitions = new HashMap<>(); Map> newPartitions = new HashMap<>(); + Set pendingTopicDeletions = migrationClient.topicClient().readPendingTopicDeletions(); migrationClient.topicClient().iterateTopics( EnumSet.of( @@ -234,7 +235,7 @@ public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistrat operationConsumer.accept( DELETE_TOPIC, "Delete Topic " + topicName + ", ID " + topicId, - migrationState -> migrationClient.topicClient().deleteTopic(topicName, migrationState) + migrationState -> migrationClient.topicClient().deleteTopic(topicName, pendingTopicDeletions.contains(topicName), 
migrationState) ); ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName); operationConsumer.accept( @@ -280,10 +281,11 @@ void handleTopicsDelta( TopicsDelta topicsDelta, KRaftMigrationOperationConsumer operationConsumer ) { + Set pendingTopicDeletions = migrationClient.topicClient().readPendingTopicDeletions(); topicsDelta.deletedTopicIds().forEach(topicId -> { String name = deletedTopicNameResolver.apply(topicId); operationConsumer.accept(DELETE_TOPIC, "Deleting topic " + name + ", ID " + topicId, - migrationState -> migrationClient.topicClient().deleteTopic(name, migrationState)); + migrationState -> migrationClient.topicClient().deleteTopic(name, pendingTopicDeletions.contains(name), migrationState)); }); topicsDelta.changedTopics().forEach((topicId, topicDelta) -> { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java index 505c78a10b4cc..15edc41a0e46d 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java @@ -42,8 +42,11 @@ default void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistra void iterateTopics(EnumSet interests, TopicVisitor visitor); + Set readPendingTopicDeletions(); + ZkMigrationLeadershipState deleteTopic( String topicName, + boolean removePendingDeletion, ZkMigrationLeadershipState state ); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java index 910a58bec796a..6b9dbfd4859e1 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java @@ -27,4 +27,6 @@ public interface ZkRecordConsumer { 
CompletableFuture acceptBatch(List recordBatch); CompletableFuture completeMigration(); void abortMigration(); + + CompletableFuture deleteTopic(String topicName); } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java index 30317b20ecc7d..cd32812486809 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java @@ -21,6 +21,7 @@ import org.apache.kafka.metadata.PartitionRegistration; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.List; @@ -51,7 +52,16 @@ public void iterateTopics(EnumSet interests, TopicVisitor } @Override - public ZkMigrationLeadershipState deleteTopic(String topicName, ZkMigrationLeadershipState state) { + public Set readPendingTopicDeletions() { + return Collections.emptySet(); + } + + @Override + public ZkMigrationLeadershipState deleteTopic( + String topicName, + boolean removePendingDeletion, + ZkMigrationLeadershipState state + ) { deletedTopics.add(topicName); return state; } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index 1bca3e2f2b2e8..5e48c5f3f015e 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -143,6 +143,11 @@ public CompletableFuture completeMigration() { public void abortMigration() { } + + @Override + public CompletableFuture deleteTopic(String topicName) { + return CompletableFuture.completedFuture(null); + } } static class CountingMetadataPropagator 
implements LegacyPropagator { From 139aa8a2d361a941086e81466f15e603a53f8619 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Wed, 18 Oct 2023 14:29:51 -0400 Subject: [PATCH 02/12] Add pending topic reconciliation to ZK broker --- .../common/requests/LeaderAndIsrRequest.java | 38 ++++++++++++- .../controller/ControllerChannelManager.scala | 18 ++++++- core/src/main/scala/kafka/log/LocalLog.scala | 37 ++++++++++--- .../src/main/scala/kafka/log/LogManager.scala | 32 ++++++++--- .../src/main/scala/kafka/log/UnifiedLog.scala | 4 ++ .../kafka/migration/MigrationPropagator.scala | 2 + .../scala/kafka/server/ReplicaManager.scala | 50 ++++++++++++++++- .../zk/migration/ZkTopicMigrationClient.scala | 38 ++++++++----- .../kafka/zk/ZkMigrationIntegrationTest.scala | 30 +++++++++-- .../kafka/server/ReplicaManagerTest.scala | 53 ++++++++++++++++++- .../zk/migration/ZkMigrationClientTest.scala | 2 +- .../kafka/controller/QuorumController.java | 16 ------ .../migration/KRaftMigrationDriver.java | 15 ------ .../migration/KRaftMigrationZkWriter.java | 14 +++-- .../migration/TopicMigrationClient.java | 6 ++- .../metadata/migration/ZkRecordConsumer.java | 2 - .../CapturingTopicMigrationClient.java | 9 +++- .../migration/KRaftMigrationDriverTest.java | 5 -- 18 files changed, 294 insertions(+), 77 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index 257d8e78bfcc3..3c81c7e802f5c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -42,26 +42,52 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { + public enum Type { + None(0), + Incremental(1), + Full(2); + + private final byte type; + Type(int type) { + this.type = (byte) type; + } + + public byte asByte() { + return type; + } + + public static Type 
fromByte(byte type) { + for (Type t : Type.values()) { + if (t.type == type) { + return t; + } + } + throw new IllegalArgumentException("No Type enum for value " + type); + } + } + public static class Builder extends AbstractControlRequest.Builder { private final List partitionStates; private final Map topicIds; private final Collection liveLeaders; + private final Type updateType; public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, List partitionStates, Map topicIds, Collection liveLeaders) { this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, topicIds, - liveLeaders, false); + liveLeaders, false, Type.None); } public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, List partitionStates, Map topicIds, - Collection liveLeaders, boolean kraftController) { + Collection liveLeaders, boolean kraftController, Type updateType) { super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch, kraftController); this.partitionStates = partitionStates; this.topicIds = topicIds; this.liveLeaders = liveLeaders; + this.updateType = updateType; } @Override @@ -82,6 +108,10 @@ public LeaderAndIsrRequest build(short version) { data.setIsKRaftController(kraftController); } + if (version >= 5) { + data.setType(updateType.asByte()); + } + if (version >= 2) { Map topicStatesMap = groupByTopic(partitionStates, topicIds); data.setTopicStates(new ArrayList<>(topicStatesMap.values())); @@ -210,6 +240,10 @@ public List liveLeaders() { return Collections.unmodifiableList(data.liveLeaders()); } + public Type requestType() { + return Type.fromByte(data.type()); + } + @Override public LeaderAndIsrRequestData data() { return data; diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 7e3b2a29227ea..cf93bf37d9c44 100755 --- 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -377,6 +377,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, val stopReplicaRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, StopReplicaPartitionState]] val updateMetadataRequestBrokerSet = mutable.Set.empty[Int] val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, UpdateMetadataPartitionState] + private var updateType: LeaderAndIsrRequest.Type = LeaderAndIsrRequest.Type.None private var metadataInstance: ControllerChannelContext = _ def sendRequest(brokerId: Int, @@ -398,6 +399,10 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, metadataInstance = metadataProvider() } + def setUpdateType(updateType: LeaderAndIsrRequest.Type): Unit = { + this.updateType = updateType + } + def clear(): Unit = { leaderAndIsrRequestMap.clear() stopReplicaRequestMap.clear() @@ -543,8 +548,17 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, .toSet[String] .map(topic => (topic, metadataInstance.topicIds.getOrElse(topic, Uuid.ZERO_UUID))) .toMap - val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId, - controllerEpoch, brokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava, leaders.asJava, kraftController) + val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder( + leaderAndIsrRequestVersion, + controllerId, + controllerEpoch, + brokerEpoch, + leaderAndIsrPartitionStates.values.toBuffer.asJava, + topicIds.asJava, + leaders.asJava, + kraftController, + updateType + ) sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => { val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse] handleLeaderAndIsrResponse(leaderAndIsrResponse, broker) diff --git a/core/src/main/scala/kafka/log/LocalLog.scala 
b/core/src/main/scala/kafka/log/LocalLog.scala index 6dfd92461c97b..aaaeb8787e930 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -610,8 +610,12 @@ object LocalLog extends Logging { /** a directory that is used for future partition */ private[log] val FutureDirSuffix = "-future" + /** a directory that is used for stray partition */ + private[log] val StrayDirSuffix = "-stray" + private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix") private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix") + private[log] val StrayDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$StrayDirSuffix") private[log] val UnknownOffset = -1L @@ -622,10 +626,17 @@ object LocalLog extends Logging { * from exceeding 255 characters. */ private[log] def logDeleteDirName(topicPartition: TopicPartition): String = { - val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "") - val suffix = s"-${topicPartition.partition()}.$uniqueId$DeleteDirSuffix" - val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size) - s"${topicPartition.topic().substring(0, prefixLength)}$suffix" + logDirNameWithSuffixCappedLength(topicPartition, DeleteDirSuffix) + } + + /** + * Return a directory name to rename the log directory to for stray partition deletion. + * The name will be in the following format: "topic-partitionId.uniqueId-stray". + * If the topic name is too long, it will be truncated to prevent the total name + * from exceeding 255 characters. + */ + private[log] def logStrayDirName(topicPartition: TopicPartition): String = { + logDirNameWithSuffixCappedLength(topicPartition, StrayDirSuffix) } /** @@ -636,6 +647,18 @@ object LocalLog extends Logging { logDirNameWithSuffix(topicPartition, FutureDirSuffix) } + /** + * Return a new directory name in the following format: "${topic}-${partitionId}.${uniqueId}${suffix}". 
+ * If the topic name is too long, it will be truncated to prevent the total name + * from exceeding 255 characters. + */ + private[log] def logDirNameWithSuffixCappedLength(topicPartition: TopicPartition, suffix: String): String = { + val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "") + val fullSuffix = s"-${topicPartition.partition()}.$uniqueId$suffix" + val prefixLength = Math.min(topicPartition.topic().size, 255 - fullSuffix.size) + s"${topicPartition.topic().substring(0, prefixLength)}$fullSuffix" + } + private[log] def logDirNameWithSuffix(topicPartition: TopicPartition, suffix: String): String = { val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "") s"${logDirName(topicPartition)}.$uniqueId$suffix" @@ -666,11 +689,13 @@ object LocalLog extends Logging { if (dirName == null || dirName.isEmpty || !dirName.contains('-')) throw exception(dir) if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches || - dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches) + dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches || + dirName.endsWith(StrayDirSuffix) && !StrayDirPattern.matcher(dirName).matches) throw exception(dir) val name: String = - if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix)) dirName.substring(0, dirName.lastIndexOf('.')) + if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix) || dirName.endsWith(StrayDirSuffix)) + dirName.substring(0, dirName.lastIndexOf('.')) else dirName val index = name.lastIndexOf('-') diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 89e1be0b60ba9..6aa91ad42f9f1 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -92,6 +92,10 @@ class LogManager(logDirs: Seq[File], // Each element in the queue contains the log object to be deleted and the time 
it is scheduled for deletion. private val logsToBeDeleted = new LinkedBlockingQueue[(UnifiedLog, Long)]() + // Map of stray partition to stray log. This holds all stray logs detected on the broker. + // Visible for testing + private val strayLogs = new Pool[TopicPartition, UnifiedLog]() + private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs) @volatile private var _currentDefaultConfig = initialDefaultConfig @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir @@ -302,6 +306,10 @@ class LogManager(logDirs: Seq[File], this.logsToBeDeleted.add((log, time.milliseconds())) } + def addStrayLog(strayPartition: TopicPartition, strayLog: UnifiedLog): Unit = { + this.strayLogs.put(strayPartition, strayLog) + } + // Only for testing private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty @@ -337,6 +345,9 @@ class LogManager(logDirs: Seq[File], if (logDir.getName.endsWith(UnifiedLog.DeleteDirSuffix)) { addLogToBeDeleted(log) + } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) { + addStrayLog(topicPartition, log) + warn(s"Loaded stray log: $logDir") } else { val previous = { if (log.isFuture) @@ -1202,7 +1213,8 @@ class LogManager(logDirs: Seq[File], */ def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false, - checkpoint: Boolean = true): Option[UnifiedLog] = { + checkpoint: Boolean = true, + isStray: Boolean = false): Option[UnifiedLog] = { val removedLog: Option[UnifiedLog] = logCreationOrDeletionLock synchronized { removeLogAndMetrics(if (isFuture) futureLogs else currentLogs, topicPartition) } @@ -1215,15 +1227,22 @@ class LogManager(logDirs: Seq[File], cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition)) } } - removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false) + if (isStray) { + // Move aside stray partitions, don't delete them + 
removedLog.renameDir(UnifiedLog.logStrayDirName(topicPartition), false) + info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath}") + } else { + removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false) + addLogToBeDeleted(removedLog) + + info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion") + } if (checkpoint) { val logDir = removedLog.parentDirFile val logsToCheckpoint = logsInDir(logDir) checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint) checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint) } - addLogToBeDeleted(removedLog) - info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion") case None => if (offlineLogDirs.nonEmpty) { @@ -1243,6 +1262,7 @@ class LogManager(logDirs: Seq[File], * topic-partition is raised */ def asyncDelete(topicPartitions: Set[TopicPartition], + isStray: Boolean, errorHandler: (TopicPartition, Throwable) => Unit): Unit = { val logDirs = mutable.Set.empty[File] @@ -1250,11 +1270,11 @@ class LogManager(logDirs: Seq[File], try { getLog(topicPartition).foreach { log => logDirs += log.parentDirFile - asyncDelete(topicPartition, checkpoint = false) + asyncDelete(topicPartition, checkpoint = false, isStray = isStray) } getLog(topicPartition, isFuture = true).foreach { log => logDirs += log.parentDirFile - asyncDelete(topicPartition, isFuture = true, checkpoint = false) + asyncDelete(topicPartition, isFuture = true, checkpoint = false, isStray = isStray) } } catch { case e: Throwable => errorHandler(topicPartition, e) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 601d444d0e69c..87298f6ea729f 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1867,6 +1867,8 @@ object UnifiedLog extends Logging 
{ val DeleteDirSuffix = LocalLog.DeleteDirSuffix + val StrayDirSuffix = LocalLog.StrayDirSuffix + val FutureDirSuffix = LocalLog.FutureDirSuffix private[log] val DeleteDirPattern = LocalLog.DeleteDirPattern @@ -1951,6 +1953,8 @@ object UnifiedLog extends Logging { def logFutureDirName(topicPartition: TopicPartition): String = LocalLog.logFutureDirName(topicPartition) + def logStrayDirName(topicPartition: TopicPartition): String = LocalLog.logStrayDirName(topicPartition) + def logDirName(topicPartition: TopicPartition): String = LocalLog.logDirName(topicPartition) def offsetIndexFile(dir: File, offset: Long, suffix: String = ""): File = LogFileUtils.offsetIndexFile(dir, offset, suffix) diff --git a/core/src/main/scala/kafka/migration/MigrationPropagator.scala b/core/src/main/scala/kafka/migration/MigrationPropagator.scala index c573991b84e4b..71e2e41166a88 100644 --- a/core/src/main/scala/kafka/migration/MigrationPropagator.scala +++ b/core/src/main/scala/kafka/migration/MigrationPropagator.scala @@ -22,6 +22,7 @@ import kafka.controller.{ControllerChannelContext, ControllerChannelManager, Rep import kafka.server.KafkaConfig import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.requests.LeaderAndIsrRequest import org.apache.kafka.common.utils.Time import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, TopicsImage} import org.apache.kafka.metadata.PartitionRegistration @@ -225,6 +226,7 @@ class MigrationPropagator( requestBatch.sendRequestsToBrokers(zkControllerEpoch) requestBatch.newBatch() + requestBatch.setUpdateType(LeaderAndIsrRequest.Type.Full) // When we need to send RPCs from the image, we're sending 'full' requests meaning we let // every broker know about all the metadata and all the LISR requests it needs to handle. // Note that we cannot send StopReplica requests from the image. 
We don't have any state diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index eb140eb31fdc1..86a6d771c6773 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -402,6 +402,47 @@ class ReplicaManager(val config: KafkaConfig, brokerTopicStats.removeMetrics(topic) } + private[server] def updateStrayLogs(strayPartitions: Set[TopicPartition]): Unit = { + if (strayPartitions.isEmpty) { + return + } + warn(s"Found stray partitions ${strayPartitions.mkString(",")}") + + // First, stop the partitions. This will shutdown the fetchers and other managers + val partitionsToStop = strayPartitions.map { tp => tp -> false }.toMap + stopPartitions(partitionsToStop).forKeyValue { (topicPartition, exception) => + error(s"Unable to stop stray partition $topicPartition", exception) + } + + // Next, delete the in-memory partition state. Normally, stopPartitions would do this, but since we're not + // actually deleting the log, so we can't rely on the "deleteLocalLog" behavior in stopPartitions. 
+ strayPartitions.foreach { topicPartition => + getPartition(topicPartition) match { + case hostedPartition: HostedPartition.Online => + if (allPartitions.remove(topicPartition, hostedPartition)) { + maybeRemoveTopicMetrics(topicPartition.topic) + hostedPartition.partition.delete() + } + case _ => + } + } + + // Mark the log as stray in-memory and rename the directory + strayPartitions.foreach { tp => + logManager.getLog(tp).foreach(logManager.addStrayLog(tp, _)) + logManager.getLog(tp, isFuture = true).foreach(logManager.addStrayLog(tp, _)) + } + logManager.asyncDelete(strayPartitions, isStray = true, (topicPartition, e) => { + error(s"Failed to delete stray partition $topicPartition due to " + + s"${e.getClass.getName} exception: ${e.getMessage}") + }) + } + + // Find logs which exist on the broker, but aren't present in the full LISR + private[server] def findStrayPartitionsFromLeaderAndIsr(partitionsFromRequest: Set[TopicPartition]): Set[TopicPartition] = { + logManager.allLogs.map(_.topicPartition).filterNot(partitionsFromRequest.contains).toSet + } + protected def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = { val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition) delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey) @@ -591,7 +632,7 @@ class ReplicaManager(val config: KafkaConfig, val errorMap = new mutable.HashMap[TopicPartition, Throwable]() if (partitionsToDelete.nonEmpty) { // Delete the logs and checkpoint. 
- logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e)) + logManager.asyncDelete(partitionsToDelete, isStray = false, (tp, e) => errorMap.put(tp, e)) } remoteLogManager.foreach { rlm => // exclude the partitions with offline/error state @@ -1800,6 +1841,13 @@ class ReplicaManager(val config: KafkaConfig, // have been completely populated before starting the checkpointing there by avoiding weird race conditions startHighWatermarkCheckPointThread() + // In migration mode, reconcile missed topic deletions when handling full LISR from KRaft controller. + // LISR "type" field was previously unspecified (0), so if we see it set to Full (2), then we know the + // request came from a KRaft controller. + if (config.migrationEnabled && leaderAndIsrRequest.requestType() == LeaderAndIsrRequest.Type.Full) { + updateStrayLogs(findStrayPartitionsFromLeaderAndIsr(partitions.map(_.topicPartition))) + } + maybeAddLogDirFetchers(partitions, highWatermarkCheckpoints, topicIdFromRequest) replicaFetcherManager.shutdownIdleFetcherThreads() diff --git a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala index 70acc861c773c..eb9e3a1f5989d 100644 --- a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala @@ -48,9 +48,13 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie throw new IllegalArgumentException("Must specify at least TOPICS in topic visitor interests.") } val allTopics = zkClient.getAllTopicsInCluster() - val topicDeletions = zkClient.getTopicDeletions - info(s"Found ${topicDeletions.size} pending topic deletions: ${topicDeletions}. 
These will be migrated to KRaft and then deleted.") - val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(allTopics) + val topicDeletions = readPendingTopicDeletions().asScala + val topicsToMigrated = allTopics -- topicDeletions + if (topicDeletions.nonEmpty) { + warn(s"Found ${topicDeletions.size} pending topic deletions: $topicDeletions. These will be not migrated " + + s"to KRaft. After the migration, the brokers will reconcile their logs with these pending topic deletions.") + } + val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topicsToMigrated) replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) => val topicAssignment = partitionAssignments.map { case (partition, assignment) => partition.partition().asInstanceOf[Integer] -> assignment.replicas.map(Integer.valueOf).asJava @@ -197,7 +201,6 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie override def deleteTopic( topicName: String, - removePendingDeletion: Boolean, state: ZkMigrationLeadershipState ): ZkMigrationLeadershipState = wrapZkException { // Delete the partition state ZNodes recursively, then topic config, and finally the topic znode @@ -207,14 +210,8 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie DeleteRequest(childPath, ZkVersion.MatchAnyVersion) } ++ Seq( DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topicName), ZkVersion.MatchAnyVersion), - DeleteRequest(TopicZNode.path(topicName), ZkVersion.MatchAnyVersion), - ) ++ { - if (removePendingDeletion) { - Seq(DeleteRequest(DeleteTopicsTopicZNode.path(topicName), ZkVersion.MatchAnyVersion)) - } else { - Seq.empty - } - } + DeleteRequest(TopicZNode.path(topicName), ZkVersion.MatchAnyVersion) + ) val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(deleteRequests, state) val resultCodes = responses.map { response => 
response.path -> response.resultCode }.toMap @@ -330,4 +327,21 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie override def readPendingTopicDeletions(): util.Set[String] = { zkClient.getTopicDeletions.toSet.asJava } + + override def clearPendingTopicDeletions( + pendingTopicDeletions: util.Set[String], + state: ZkMigrationLeadershipState + ): ZkMigrationLeadershipState = { + val deleteRequests = pendingTopicDeletions.asScala.map { topicName => + DeleteRequest(DeleteTopicsTopicZNode.path(topicName), ZkVersion.MatchAnyVersion) + }.toSeq + + val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(deleteRequests.toSeq, state) + val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap + if (resultCodes.forall { case (_, code) => code.equals(Code.OK) }) { + state.withMigrationZkVersion(migrationZkVersion) + } else { + throw new MigrationClientException(s"Failed to delete pending topic deletions: $pendingTopicDeletions. 
ZK transaction had results $resultCodes") + } + } } diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 5b99c5564dd5e..238d0bd5e8806 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -317,14 +317,38 @@ class ZkMigrationIntegrationTest { "Timed out waiting for migration to complete", 30000) - Thread.sleep(30000) - admin = zkCluster.createAdminClient() TestUtils.waitUntilTrue( () => admin.listTopics().names().get(30, TimeUnit.SECONDS).isEmpty, "Timed out waiting for topics to be deleted", 300000) - log.info("Topics after migration {}", admin.listTopics().names().get(30, TimeUnit.SECONDS)) + + val newTopics = new util.ArrayList[NewTopic]() + newTopics.add(new NewTopic("test-topic-1", 2, 3.toShort)) + newTopics.add(new NewTopic("test-topic-2", 1, 3.toShort)) + newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort)) + val createTopicResult = admin.createTopics(newTopics) + createTopicResult.all().get(60, TimeUnit.SECONDS) + + TestUtils.waitUntilTrue( + () => admin.listTopics().names().get(30, TimeUnit.SECONDS).size() == 3, + "Timed out waiting for topics to be created", + 300000) + + val topicDescriptions = admin.describeTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3").asJavaCollection) + .topicNameValues().asScala.map { case (name, description) => + name -> description.get(60, TimeUnit.SECONDS) + }.toMap + + assertEquals(2, topicDescriptions("test-topic-1").partitions().size()) + assertEquals(1, topicDescriptions("test-topic-2").partitions().size()) + assertEquals(10, topicDescriptions("test-topic-3").partitions().size()) + topicDescriptions.foreach { case (topic, description) => + description.partitions().forEach(partition => { + assertEquals(3, partition.replicas().size(), s"Unexpected number of replicas for 
${topic}-${partition.partition()}") + assertEquals(3, partition.isr().size(), s"Unexpected ISR for ${topic}-${partition.partition()}") + }) + } admin.close() } finally { shutdownInSequence(zkCluster, kraftCluster) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 0921a747bae21..c1e04c01419dc 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -561,7 +561,9 @@ class ReplicaManagerTest { .setIsNew(true)).asJava, Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava, - false).build() + false, + LeaderAndIsrRequest.Type.None + ).build() replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) replicaManager.getPartitionOrException(new TopicPartition(topic, partition)) .localLogOrException @@ -2543,6 +2545,55 @@ class ReplicaManagerTest { } } + @Test + def testUpdateStrayLogs(): Unit = { + val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), defaultConfig = new LogConfig(new Properties()), time = time) + val replicaManager = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = time.scheduler, + logManager = logManager, + quotaManagers = QuotaFactory.instantiate(config, metrics, time, ""), + metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager, + threadNamePrefix = Option(this.getClass.getName)) + + logManager.startup(Set.empty[String]) + + // Create a hosted topic, a hosted topic that will become stray, and a stray topic + val validLogs = createHostedLogs("hosted-topic", numLogs = 2, replicaManager).toSet + createHostedLogs("hosted-stray", numLogs = 10, replicaManager).toSet + 
createStrayLogs(10, logManager) + + val allReplicasFromLISR = Set(new TopicPartition("hosted-topic", 0), new TopicPartition("hosted-topic", 1)) + + replicaManager.updateStrayLogs(replicaManager.findStrayPartitionsFromLeaderAndIsr(allReplicasFromLISR)) + + assertEquals(validLogs, logManager.allLogs.toSet) + assertEquals(validLogs.size, replicaManager.partitionCount.value) + + replicaManager.shutdown() + logManager.shutdown() + } + + private def createHostedLogs(name: String, numLogs: Int, replicaManager: ReplicaManager): Seq[UnifiedLog] = { + for (i <- 0 until numLogs) yield { + val topicPartition = new TopicPartition(name, i) + val partition = replicaManager.createPartition(topicPartition) + partition.createLogIfNotExists(isNew = true, isFutureReplica = false, + new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), topicId = None) + partition.log.get + } + } + + private def createStrayLogs(numLogs: Int, logManager: LogManager): Seq[UnifiedLog] = { + val name = "stray" + for (i <- 0 until numLogs) + yield logManager.getOrCreateLog(new TopicPartition(name, i), topicId = None) + } + private def sendProducerAppend( replicaManager: ReplicaManager, topicPartition: TopicPartition, diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala index ce6a3e6d1fd40..773c42a66e4a6 100644 --- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala @@ -97,7 +97,7 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness { assertEquals(List(3), partition1.isr) // Delete whole topic - migrationState = migrationClient.topicClient().deleteTopic("test", false, migrationState) + migrationState = migrationClient.topicClient().deleteTopic("test", migrationState) assertEquals(2, migrationState.migrationZkVersion()) } diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index c27e64eab83e3..5b437cec754ac 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1031,22 +1031,6 @@ public void abortMigration() { // next controller abort the migration transaction (if in use). fatalFaultHandler.handleFault("Aborting the ZK migration"); } - - @Override - public CompletableFuture deleteTopic(String topicName) { - Uuid topicId = QuorumController.this.replicationControl().getTopicId(topicName); - if (topicId == null) { - return CompletableFuture.completedFuture(null); - } - ControllerWriteEvent deleteEvent = new ControllerWriteEvent<>( - "Delete Topic " + topicName, - new MigrationWriteOperation(Collections.singletonList( - new ApiMessageAndVersion( - new RemoveTopicRecord().setTopicId(topicId), (short) 0)) - ), EnumSet.noneOf(ControllerOperationFlag.class)); - queue.append(deleteEvent); - return deleteEvent.future; - } } class QuorumMetaLogListener implements RaftClient.Listener { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index ec97d8f83ec94..aa60390cc06d4 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -722,21 +722,6 @@ public void run() throws Exception { log.info("Sending RPCs to broker before moving to dual-write mode using " + "at offset and epoch {}", image.highestOffsetAndEpoch()); propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch()); - - // After full metadata update, delete the pending topic deletions from KRaft. 
This will cause the - // next round of RPCs sent to brokers to include StopReplicas - zkMigrationClient.topicClient().readPendingTopicDeletions().forEach(topicToDelete -> { - try { - log.info("Deleting topic {} which was marked for deletion prior to the migration", topicToDelete); - CompletableFuture future = zkRecordConsumer.deleteTopic(topicToDelete); - FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "", - "the controller to delete topic " + topicToDelete, - future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time); - } catch (Throwable t) { - log.error("Failed to delete topic " + topicToDelete, t); - } - }); - // Migration leadership state doesn't change since we're not doing any Zk writes. transitionTo(MigrationDriverState.DUAL_WRITE); } else { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java index 90543cc3955d7..6c82c9cb9ccf9 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java @@ -69,6 +69,7 @@ public class KRaftMigrationZkWriter { private static final String CREATE_TOPIC = "CreateTopic"; private static final String UPDATE_TOPIC = "UpdateTopic"; private static final String DELETE_TOPIC = "DeleteTopic"; + private static final String DELETE_PENDING_TOPIC_DELETION = "DeletePendingTopicDeletion"; private static final String UPDATE_PARTITION = "UpdatePartition"; private static final String DELETE_PARTITION = "DeletePartition"; private static final String UPDATE_BROKER_CONFIG = "UpdateBrokerConfig"; @@ -145,7 +146,15 @@ void handleTopicsSnapshot(TopicsImage topicsImage, KRaftMigrationOperationConsum Map> extraneousPartitionsInZk = new HashMap<>(); Map> changedPartitions = new HashMap<>(); Map> newPartitions = new HashMap<>(); + Set 
pendingTopicDeletions = migrationClient.topicClient().readPendingTopicDeletions(); + if (!pendingTopicDeletions.isEmpty()) { + operationConsumer.accept( + DELETE_PENDING_TOPIC_DELETION, + "Delete pending topic deletions", + migrationState -> migrationClient.topicClient().clearPendingTopicDeletions(pendingTopicDeletions, migrationState) + ); + } migrationClient.topicClient().iterateTopics( EnumSet.of( @@ -235,7 +244,7 @@ public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistrat operationConsumer.accept( DELETE_TOPIC, "Delete Topic " + topicName + ", ID " + topicId, - migrationState -> migrationClient.topicClient().deleteTopic(topicName, pendingTopicDeletions.contains(topicName), migrationState) + migrationState -> migrationClient.topicClient().deleteTopic(topicName, migrationState) ); ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName); operationConsumer.accept( @@ -281,11 +290,10 @@ void handleTopicsDelta( TopicsDelta topicsDelta, KRaftMigrationOperationConsumer operationConsumer ) { - Set pendingTopicDeletions = migrationClient.topicClient().readPendingTopicDeletions(); topicsDelta.deletedTopicIds().forEach(topicId -> { String name = deletedTopicNameResolver.apply(topicId); operationConsumer.accept(DELETE_TOPIC, "Deleting topic " + name + ", ID " + topicId, - migrationState -> migrationClient.topicClient().deleteTopic(name, pendingTopicDeletions.contains(name), migrationState)); + migrationState -> migrationClient.topicClient().deleteTopic(name, migrationState)); }); topicsDelta.changedTopics().forEach((topicId, topicDelta) -> { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java index 15edc41a0e46d..5eafd72b29854 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java +++ 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java @@ -44,9 +44,13 @@ default void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistra Set readPendingTopicDeletions(); + ZkMigrationLeadershipState clearPendingTopicDeletions( + Set pendingTopicDeletions, + ZkMigrationLeadershipState state + ); + ZkMigrationLeadershipState deleteTopic( String topicName, - boolean removePendingDeletion, ZkMigrationLeadershipState state ); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java index 6b9dbfd4859e1..910a58bec796a 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java @@ -27,6 +27,4 @@ public interface ZkRecordConsumer { CompletableFuture acceptBatch(List recordBatch); CompletableFuture completeMigration(); void abortMigration(); - - CompletableFuture deleteTopic(String topicName); } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java index cd32812486809..8b8e5acc5f35c 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java @@ -56,10 +56,17 @@ public Set readPendingTopicDeletions() { return Collections.emptySet(); } + @Override + public ZkMigrationLeadershipState clearPendingTopicDeletions( + Set pendingTopicDeletions, + ZkMigrationLeadershipState state + ) { + return state; + } + @Override public ZkMigrationLeadershipState deleteTopic( String topicName, - boolean removePendingDeletion, ZkMigrationLeadershipState state ) { deletedTopics.add(topicName); diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index 5e48c5f3f015e..1bca3e2f2b2e8 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -143,11 +143,6 @@ public CompletableFuture completeMigration() { public void abortMigration() { } - - @Override - public CompletableFuture deleteTopic(String topicName) { - return CompletableFuture.completedFuture(null); - } } static class CountingMetadataPropagator implements LegacyPropagator { From f399c3b8c8d187083754d58c7ea90f37e606ff48 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 19 Oct 2023 13:18:24 -0400 Subject: [PATCH 03/12] WIP --- .../src/main/scala/kafka/log/LogManager.scala | 3 +- .../migration/KRaftMigrationDriver.java | 37 +++++++++--- .../migration/KRaftMigrationDriverTest.java | 57 ++++++++++++++++++- 3 files changed, 87 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 6aa91ad42f9f1..929cdc6defaf8 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -1230,11 +1230,10 @@ class LogManager(logDirs: Seq[File], if (isStray) { // Move aside stray partitions, don't delete them removedLog.renameDir(UnifiedLog.logStrayDirName(topicPartition), false) - info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath}") + warn(s"Log for partition ${removedLog.topicPartition} is marked as stray and renamed to ${removedLog.dir.getAbsolutePath}") } else { removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false) addLogToBeDeleted(removedLog) - info(s"Log for partition ${removedLog.topicPartition} is renamed to 
${removedLog.dir.getAbsolutePath} and is scheduled for deletion") } if (checkpoint) { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index aa60390cc06d4..233e267d3456b 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -308,7 +308,8 @@ private boolean checkDriverState(MigrationDriverState expectedState) { } } - private void transitionTo(MigrationDriverState newState) { + // Visible for testing + void transitionTo(MigrationDriverState newState) { if (!isValidStateChange(newState)) { throw new IllegalStateException( String.format("Invalid transition in migration driver from %s to %s", migrationState, newState)); @@ -498,6 +499,17 @@ public void run() throws Exception { return; } + // Until the metadata has been migrated, the migrationLeadershipState offset is -1. We need to ignore + // metadata images until we see that the migration has happened and the image exceeds the offset of the + // migration + if (!migrationLeadershipState.initialZkMigrationComplete()) { + log.info("Ignoring {} {} since the migration has not finished.", metadataType, provenance); + completionHandler.accept(null); + return; + } + + // If the migration has finished, the migrationLeadershipState offset will be positive. Ignore any images + // which are older than the offset that has been written to ZK. 
if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) { log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance); completionHandler.accept(null); @@ -532,13 +544,17 @@ public void run() throws Exception { applyMigrationOperation("Updating ZK migration state after " + metadataType, state -> zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite)); - // TODO: Unhappy path: Probably relinquish leadership and let new controller - // retry the write? - if (delta.topicsDelta() != null || delta.clusterDelta() != null) { - log.trace("Sending RPCs to brokers for metadata {}.", metadataType); - propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch()); + if (isSnapshot) { + // When we load a snapshot, need to send full metadata updates to the brokers + propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch()); } else { - log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType); + // delta + if (delta.topicsDelta() != null || delta.clusterDelta() != null) { + log.trace("Sending RPCs to brokers for metadata {}.", metadataType); + propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch()); + } else { + log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType); + } } completionHandler.accept(null); @@ -699,6 +715,13 @@ class SyncKRaftMetadataEvent extends MigrationEvent { @Override public void run() throws Exception { if (checkDriverState(MigrationDriverState.SYNC_KRAFT_TO_ZK)) { + // The migration offset will be non-negative at this point, so we just need to check that the image + // we have actually includes the migration metadata. 
+ if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) { + log.info("Ignoring image {} which does not contain a superset of the metadata in ZK. Staying in " + + "SYNC_KRAFT_TO_ZK until a newer image is loaded", image.provenance()); + return; + } log.info("Performing a full metadata sync from KRaft to ZK."); Map dualWriteCounts = new TreeMap<>(); long startTime = time.nanoseconds(); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index 1bca3e2f2b2e8..eda9dc38ffa01 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -615,7 +615,7 @@ public void testTopicDualWriteDelta() throws Exception { // Wait for migration TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), - "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state"); + "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); // Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz provenance = new MetadataProvenance(200, 1, 1); @@ -634,6 +634,61 @@ public void testTopicDualWriteDelta() throws Exception { }); } + @Test + public void testNoDualWriteBeforeMigration() throws Exception { + setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> { + MetadataImage image = new MetadataImage( + MetadataProvenance.EMPTY, + FeaturesImage.EMPTY, + ClusterImage.EMPTY, + IMAGE1, + ConfigurationsImage.EMPTY, + ClientQuotasImage.EMPTY, + ProducerIdsImage.EMPTY, + AclsImage.EMPTY, + ScramImage.EMPTY, + DelegationTokenImage.EMPTY); + MetadataDelta delta = new MetadataDelta(image); + + driver.start(); + setupDeltaForMigration(delta, true); + 
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message()); + delta.replay(zkBrokerRecord(0)); + delta.replay(zkBrokerRecord(1)); + delta.replay(zkBrokerRecord(2)); + delta.replay(zkBrokerRecord(3)); + delta.replay(zkBrokerRecord(4)); + delta.replay(zkBrokerRecord(5)); + MetadataProvenance provenance = new MetadataProvenance(100, 1, 1); + image = delta.apply(provenance); + + // Publish a delta with this node (3000) as the leader + LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1); + driver.onControllerChange(newLeader); + + TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM), + "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); + + driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build()); + + driver.transitionTo(MigrationDriverState.WAIT_FOR_BROKERS); + driver.transitionTo(MigrationDriverState.BECOME_CONTROLLER); + driver.transitionTo(MigrationDriverState.ZK_MIGRATION); + driver.transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK); + + provenance = new MetadataProvenance(200, 1, 1); + delta = new MetadataDelta(image); + RecordTestUtils.replayAll(delta, DELTA1_RECORDS); + image = delta.apply(provenance); + driver.onMetadataUpdate(delta, image, new SnapshotManifest(provenance, 100)); + + + // Wait for migration + TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), + "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); + }); + } + @Test public void testControllerFailover() throws Exception { setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> { From 3918116b30c26b5f9f8b7f368cac2366dcd58490 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 19 Oct 2023 20:43:54 -0400 Subject: [PATCH 04/12] PR feedback, more tests --- .../common/requests/LeaderAndIsrRequest.java | 12 +-- 
.../scala/kafka/server/ReplicaManager.scala | 6 +- .../kafka/zk/ZkMigrationIntegrationTest.scala | 18 ++++- .../kafka/server/ReplicaManagerTest.scala | 76 +++++++++++++++++++ 4 files changed, 103 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index 3c81c7e802f5c..b8fcc6270369c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -43,16 +43,16 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { public enum Type { - None(0), + Unknown(0), Incremental(1), Full(2); private final byte type; - Type(int type) { + private Type(int type) { this.type = (byte) type; } - public byte asByte() { + public byte toByte() { return type; } @@ -62,7 +62,7 @@ public static Type fromByte(byte type) { return t; } } - throw new IllegalArgumentException("No Type enum for value " + type); + return Unknown; } } @@ -77,7 +77,7 @@ public Builder(short version, int controllerId, int controllerEpoch, long broker List partitionStates, Map topicIds, Collection liveLeaders) { this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, topicIds, - liveLeaders, false, Type.None); + liveLeaders, false, Type.Unknown); } public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, @@ -109,7 +109,7 @@ public LeaderAndIsrRequest build(short version) { } if (version >= 5) { - data.setType(updateType.asByte()); + data.setType(updateType.toByte()); } if (version >= 2) { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 86a6d771c6773..1c5611bd5e3a8 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1844,7 +1844,11 
@@ class ReplicaManager(val config: KafkaConfig, // In migration mode, reconcile missed topic deletions when handling full LISR from KRaft controller. // LISR "type" field was previously unspecified (0), so if we see it set to Full (2), then we know the // request came from a KRaft controller. - if (config.migrationEnabled && leaderAndIsrRequest.requestType() == LeaderAndIsrRequest.Type.Full) { + if ( + config.migrationEnabled && + leaderAndIsrRequest.isKRaftController && + leaderAndIsrRequest.requestType() == LeaderAndIsrRequest.Type.Full + ) { updateStrayLogs(findStrayPartitionsFromLeaderAndIsr(partitions.map(_.topicPartition))) } diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 238d0bd5e8806..ac6e92f335af3 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -47,7 +47,7 @@ import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState import org.apache.kafka.raft.RaftConfig import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue} -import org.junit.jupiter.api.Timeout +import org.junit.jupiter.api.{Assumptions, Timeout} import org.junit.jupiter.api.extension.ExtendWith import org.slf4j.LoggerFactory @@ -273,6 +273,8 @@ class ZkMigrationIntegrationTest { newTopics.add(new NewTopic("test-topic-1", 100, 3.toShort)) newTopics.add(new NewTopic("test-topic-2", 100, 3.toShort)) newTopics.add(new NewTopic("test-topic-3", 100, 3.toShort)) + newTopics.add(new NewTopic("test-topic-4", 100, 3.toShort)) + newTopics.add(new NewTopic("test-topic-5", 100, 3.toShort)) val createTopicResult = admin.createTopics(newTopics) createTopicResult.all().get(300, TimeUnit.SECONDS) admin.close() @@ 
-296,7 +298,7 @@ class ZkMigrationIntegrationTest { // Start a delete, but don't wait for it admin = zkCluster.createAdminClient() - admin.deleteTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3").asJava) + admin.deleteTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3", "test-topic-4", "test-topic-5").asJava) admin.close() // Enable migration configs and restart brokers @@ -310,6 +312,11 @@ class ZkMigrationIntegrationTest { zkCluster.waitForReadyBrokers() readyFuture.get(30, TimeUnit.SECONDS) + val topicDeletions = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkClient.getTopicDeletions + // This will mark the test as "skipped" instead of failed if the topics all get deleted in time. + Assumptions.assumeTrue(topicDeletions.nonEmpty, + "This test needs pending topic deletions after a migration in order to verify the behavior") + // Wait for migration to begin log.info("Waiting for ZK migration to complete") TestUtils.waitUntilTrue( @@ -349,6 +356,13 @@ class ZkMigrationIntegrationTest { assertEquals(3, partition.isr().size(), s"Unexpected ISR for ${topic}-${partition.partition()}") }) } + + val absentTopics = admin.listTopics().names().get(60, TimeUnit.SECONDS).asScala + assertTrue(absentTopics.contains("test-topic-1")) + assertTrue(absentTopics.contains("test-topic-2")) + assertTrue(absentTopics.contains("test-topic-3")) + assertFalse(absentTopics.contains("test-topic-4")) + assertFalse(absentTopics.contains("test-topic-5")) admin.close() } finally { shutdownInSequence(zkCluster, kraftCluster) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index c1e04c01419dc..445774e7b1e94 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -68,6 +68,7 @@ import org.junit.jupiter.params.provider.{EnumSource, ValueSource} import com.yammer.metrics.core.Gauge import 
kafka.log.remote.RemoteLogManager import org.apache.kafka.common.config.AbstractConfig +import org.apache.kafka.raft.RaftConfig import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig} import org.apache.kafka.server.util.timer.MockTimer import org.mockito.invocation.InvocationOnMock @@ -2545,6 +2546,81 @@ class ReplicaManagerTest { } } + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testFullLeaderAndIsrStrayPartitions(zkMigrationEnabled: Boolean): Unit = { + val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) + if (zkMigrationEnabled) { + props.put(KafkaConfig.MigrationEnabledProp, zkMigrationEnabled) + props.put(RaftConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9071") + props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") + props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") + config = KafkaConfig.fromProps(props) + } + + val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), defaultConfig = new LogConfig(new Properties()), time = time) + val replicaManager = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = time.scheduler, + logManager = logManager, + quotaManagers = QuotaFactory.instantiate(config, metrics, time, ""), + metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager, + threadNamePrefix = Option(this.getClass.getName)) + + logManager.startup(Set.empty[String]) + + // Create a hosted topic, a hosted topic that will become stray + createHostedLogs("hosted-topic", numLogs = 2, replicaManager).toSet + createHostedLogs("hosted-stray", numLogs = 10, replicaManager).toSet + + val lisr = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, + 3000, 0, brokerEpoch, + Seq( + new LeaderAndIsrPartitionState() + .setTopicName("hosted-topic") + .setPartitionIndex(0) + .setControllerEpoch(controllerEpoch) + .setLeader(0) + .setLeaderEpoch(10) + .setIsr(Seq[Integer](0, 1).asJava) + .setPartitionEpoch(0) + .setReplicas(Seq[Integer](0, 1).asJava) + .setIsNew(false), + new LeaderAndIsrPartitionState() + .setTopicName("hosted-topic") + .setPartitionIndex(1) + .setControllerEpoch(controllerEpoch) + .setLeader(1) + .setLeaderEpoch(10) + .setIsr(Seq[Integer](1, 0).asJava) + .setPartitionEpoch(0) + .setReplicas(Seq[Integer](1, 0).asJava) + .setIsNew(false) + ).asJava, + topicIds.asJava, + Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava, + true, + LeaderAndIsrRequest.Type.Full + ).build() + + replicaManager.becomeLeaderOrFollower(0, lisr, (_, _) => ()) + + val ht0 = replicaManager.getPartition(new TopicPartition("hosted-topic", 0)) + assertTrue(ht0.isInstanceOf[HostedPartition.Online]) + + val stray0 = replicaManager.getPartition(new TopicPartition("hosted-stray", 0)) + + if (zkMigrationEnabled) { + assertEquals(HostedPartition.None, stray0) + } else { + assertTrue(stray0.isInstanceOf[HostedPartition.Online]) + } + } + @Test def testUpdateStrayLogs(): Unit = { val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), defaultConfig = new LogConfig(new Properties()), time = time) From 49418aa2beced803838dca175d3b7a9f10fe6d6f Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 20 Oct 2023 10:37:43 -0400 Subject: [PATCH 05/12] fixup --- .../scala/kafka/controller/ControllerChannelManager.scala | 2 +- .../scala/kafka/zk/migration/ZkTopicMigrationClient.scala | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index cf93bf37d9c44..90b037283ef0f 100755 --- 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -377,7 +377,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, val stopReplicaRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, StopReplicaPartitionState]] val updateMetadataRequestBrokerSet = mutable.Set.empty[Int] val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, UpdateMetadataPartitionState] - private var updateType: LeaderAndIsrRequest.Type = LeaderAndIsrRequest.Type.None + private var updateType: LeaderAndIsrRequest.Type = LeaderAndIsrRequest.Type.Unknown private var metadataInstance: ControllerChannelContext = _ def sendRequest(brokerId: Int, diff --git a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala index eb9e3a1f5989d..18b3cdc94c0b4 100644 --- a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala @@ -51,9 +51,14 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie val topicDeletions = readPendingTopicDeletions().asScala val topicsToMigrated = allTopics -- topicDeletions if (topicDeletions.nonEmpty) { - warn(s"Found ${topicDeletions.size} pending topic deletions: $topicDeletions. These will be not migrated " + + warn(s"Found ${topicDeletions.size} pending topic deletions. These will be not migrated " + s"to KRaft. 
After the migration, the brokers will reconcile their logs with these pending topic deletions.") } + logger.whenTraceEnabled { + topicDeletions.foreach { + deletion => logger.trace(s"Not migrating pending deleted topic: $deletion") + } + } val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topicsToMigrated) replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) => val topicAssignment = partitionAssignments.map { case (partition, assignment) => From f6f44e4f35db106fb2ca0bbfbc47e1b3fe250875 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 20 Oct 2023 11:05:32 -0400 Subject: [PATCH 06/12] fixup --- core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 809bdfc187c6e..26a03f4897eed 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -563,7 +563,7 @@ class ReplicaManagerTest { Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava, false, - LeaderAndIsrRequest.Type.None + LeaderAndIsrRequest.Type.Unknown ).build() replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) replicaManager.getPartitionOrException(new TopicPartition(topic, partition)) From de6898cf14d3e14442de1bc74305ec0895a4311d Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 23 Oct 2023 08:30:02 -0400 Subject: [PATCH 07/12] decrease number of partitions in test --- .../kafka/zk/ZkMigrationIntegrationTest.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala 
index ac6e92f335af3..6fea6f054aeb8 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -270,11 +270,11 @@ class ZkMigrationIntegrationTest { // Create a topic in ZK mode var admin = zkCluster.createAdminClient() val newTopics = new util.ArrayList[NewTopic]() - newTopics.add(new NewTopic("test-topic-1", 100, 3.toShort)) - newTopics.add(new NewTopic("test-topic-2", 100, 3.toShort)) - newTopics.add(new NewTopic("test-topic-3", 100, 3.toShort)) - newTopics.add(new NewTopic("test-topic-4", 100, 3.toShort)) - newTopics.add(new NewTopic("test-topic-5", 100, 3.toShort)) + newTopics.add(new NewTopic("test-topic-1", 10, 3.toShort)) + newTopics.add(new NewTopic("test-topic-2", 10, 3.toShort)) + newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort)) + newTopics.add(new NewTopic("test-topic-4", 10, 3.toShort)) + newTopics.add(new NewTopic("test-topic-5", 10, 3.toShort)) val createTopicResult = admin.createTopics(newTopics) createTopicResult.all().get(300, TimeUnit.SECONDS) admin.close() From 5c545e6086e4c05e38354c5826367e7b8a9f2ae6 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 23 Oct 2023 08:33:56 -0400 Subject: [PATCH 08/12] increase some timeouts --- .../kafka/zk/ZkMigrationIntegrationTest.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 6fea6f054aeb8..687b0b35da9a7 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -267,7 +267,7 @@ class ZkMigrationIntegrationTest { @ClusterTemplate("zkClustersForAllMigrationVersions") def testMigrateTopicDeletions(zkCluster: ClusterInstance): Unit = { - // Create a topic in ZK mode + // Create some topics in 
ZK mode var admin = zkCluster.createAdminClient() val newTopics = new util.ArrayList[NewTopic]() newTopics.add(new NewTopic("test-topic-1", 10, 3.toShort)) @@ -296,7 +296,7 @@ class ZkMigrationIntegrationTest { kraftCluster.startup() val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3) - // Start a delete, but don't wait for it + // Start a deletion that will take some time, but don't wait for it admin = zkCluster.createAdminClient() admin.deleteTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3", "test-topic-4", "test-topic-5").asJava) admin.close() @@ -310,7 +310,7 @@ class ZkMigrationIntegrationTest { zkCluster.rollingBrokerRestart() zkCluster.waitForReadyBrokers() - readyFuture.get(30, TimeUnit.SECONDS) + readyFuture.get(60, TimeUnit.SECONDS) val topicDeletions = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkClient.getTopicDeletions // This will mark the test as "skipped" instead of failed if the topics all get deleted in time. 
@@ -326,7 +326,7 @@ class ZkMigrationIntegrationTest { admin = zkCluster.createAdminClient() TestUtils.waitUntilTrue( - () => admin.listTopics().names().get(30, TimeUnit.SECONDS).isEmpty, + () => admin.listTopics().names().get(60, TimeUnit.SECONDS).isEmpty, "Timed out waiting for topics to be deleted", 300000) @@ -338,7 +338,7 @@ class ZkMigrationIntegrationTest { createTopicResult.all().get(60, TimeUnit.SECONDS) TestUtils.waitUntilTrue( - () => admin.listTopics().names().get(30, TimeUnit.SECONDS).size() == 3, + () => admin.listTopics().names().get(60, TimeUnit.SECONDS).size() == 3, "Timed out waiting for topics to be created", 300000) From 07f59f8fd5e1884609ece751f5f156fe7b7decf9 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 23 Oct 2023 12:09:52 -0400 Subject: [PATCH 09/12] add retry to test --- .../kafka/zk/ZkMigrationIntegrationTest.scala | 50 +++++++++++-------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 687b0b35da9a7..25ae532752a93 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -312,8 +312,9 @@ class ZkMigrationIntegrationTest { zkCluster.waitForReadyBrokers() readyFuture.get(60, TimeUnit.SECONDS) + // Only continue with the test if there are some pending deletions to verify. If there are not any pending + // deletions, this will mark the test as "skipped" instead of failed. val topicDeletions = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkClient.getTopicDeletions - // This will mark the test as "skipped" instead of failed if the topics all get deleted in time. 
Assumptions.assumeTrue(topicDeletions.nonEmpty, "This test needs pending topic deletions after a migration in order to verify the behavior") @@ -324,6 +325,8 @@ class ZkMigrationIntegrationTest { "Timed out waiting for migration to complete", 30000) + // At this point, some of the topics may have been deleted by ZK controller and the rest will be + // implicitly deleted by the KRaft controller and removed from the ZK brokers as stray partitions admin = zkCluster.createAdminClient() TestUtils.waitUntilTrue( () => admin.listTopics().names().get(60, TimeUnit.SECONDS).isEmpty, @@ -337,32 +340,37 @@ class ZkMigrationIntegrationTest { val createTopicResult = admin.createTopics(newTopics) createTopicResult.all().get(60, TimeUnit.SECONDS) + val expectedNewTopics = Seq("test-topic-1", "test-topic-2", "test-topic-3") TestUtils.waitUntilTrue( - () => admin.listTopics().names().get(60, TimeUnit.SECONDS).size() == 3, + () => admin.listTopics().names().get(60, TimeUnit.SECONDS).equals(expectedNewTopics.toSet.asJava), "Timed out waiting for topics to be created", 300000) - val topicDescriptions = admin.describeTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3").asJavaCollection) - .topicNameValues().asScala.map { case (name, description) => - name -> description.get(60, TimeUnit.SECONDS) - }.toMap - - assertEquals(2, topicDescriptions("test-topic-1").partitions().size()) - assertEquals(1, topicDescriptions("test-topic-2").partitions().size()) - assertEquals(10, topicDescriptions("test-topic-3").partitions().size()) - topicDescriptions.foreach { case (topic, description) => - description.partitions().forEach(partition => { - assertEquals(3, partition.replicas().size(), s"Unexpected number of replicas for ${topic}-${partition.partition()}") - assertEquals(3, partition.isr().size(), s"Unexpected ISR for ${topic}-${partition.partition()}") - }) + TestUtils.retry(300000) { + // Need a retry here since topic metadata may be inconsistent between brokers + val topicDescriptions = 
admin.describeTopics(expectedNewTopics.asJavaCollection) + .topicNameValues().asScala.map { case (name, description) => + name -> description.get(60, TimeUnit.SECONDS) + }.toMap + + assertEquals(2, topicDescriptions("test-topic-1").partitions().size()) + assertEquals(1, topicDescriptions("test-topic-2").partitions().size()) + assertEquals(10, topicDescriptions("test-topic-3").partitions().size()) + topicDescriptions.foreach { case (topic, description) => + description.partitions().forEach(partition => { + assertEquals(3, partition.replicas().size(), s"Unexpected number of replicas for ${topic}-${partition.partition()}") + assertEquals(3, partition.isr().size(), s"Unexpected ISR for ${topic}-${partition.partition()}") + }) + } + + val absentTopics = admin.listTopics().names().get(60, TimeUnit.SECONDS).asScala + assertTrue(absentTopics.contains("test-topic-1")) + assertTrue(absentTopics.contains("test-topic-2")) + assertTrue(absentTopics.contains("test-topic-3")) + assertFalse(absentTopics.contains("test-topic-4")) + assertFalse(absentTopics.contains("test-topic-5")) } - val absentTopics = admin.listTopics().names().get(60, TimeUnit.SECONDS).asScala - assertTrue(absentTopics.contains("test-topic-1")) - assertTrue(absentTopics.contains("test-topic-2")) - assertTrue(absentTopics.contains("test-topic-3")) - assertFalse(absentTopics.contains("test-topic-4")) - assertFalse(absentTopics.contains("test-topic-5")) admin.close() } finally { shutdownInSequence(zkCluster, kraftCluster) From 3bf0a5a31271d310f887b345fadbcd96c2e9fc19 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 23 Oct 2023 17:29:45 -0400 Subject: [PATCH 10/12] fix a few things --- .../scala/kafka/controller/ControllerChannelManager.scala | 2 ++ core/src/main/scala/kafka/server/ReplicaManager.scala | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
index 90b037283ef0f..ad823516b6767 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -409,6 +409,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, updateMetadataRequestBrokerSet.clear() updateMetadataRequestPartitionInfoMap.clear() metadataInstance = null + updateType = LeaderAndIsrRequest.Type.Unknown } def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], @@ -566,6 +567,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, } } leaderAndIsrRequestMap.clear() + updateType = LeaderAndIsrRequest.Type.Unknown } def handleLeaderAndIsrResponse(response: LeaderAndIsrResponse, broker: Int): Unit diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1c5611bd5e3a8..76b6045e255d7 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1729,10 +1729,12 @@ class ReplicaManager(val config: KafkaConfig, val partitionsToBeLeader = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]() val partitionsToBeFollower = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]() val topicIdUpdateFollowerPartitions = new mutable.HashSet[Partition]() + val allTopicPartitionsInRequest = new mutable.HashSet[TopicPartition]() // First create the partition if it doesn't exist already requestPartitionStates.foreach { partitionState => val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex) + allTopicPartitionsInRequest += topicPartition val partitionOpt = getPartition(topicPartition) match { case HostedPartition.Offline => stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " + @@ -1849,7 +1851,7 @@ class ReplicaManager(val config: KafkaConfig, leaderAndIsrRequest.isKRaftController && leaderAndIsrRequest.requestType() == 
LeaderAndIsrRequest.Type.Full ) { - updateStrayLogs(findStrayPartitionsFromLeaderAndIsr(partitions.map(_.topicPartition))) + updateStrayLogs(findStrayPartitionsFromLeaderAndIsr(allTopicPartitionsInRequest)) } maybeAddLogDirFetchers(partitions, highWatermarkCheckpoints, topicIdFromRequest) From 85809fcbe06b2d27b2b8945f7a4f4e193034973a Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 26 Oct 2023 08:29:19 -0400 Subject: [PATCH 11/12] PR feedback --- .../kafka/common/requests/LeaderAndIsrRequest.java | 10 +++++----- .../kafka/controller/ControllerChannelManager.scala | 6 +++--- .../scala/kafka/migration/MigrationPropagator.scala | 2 +- core/src/main/scala/kafka/server/ReplicaManager.scala | 2 +- .../kafka/zk/migration/ZkTopicMigrationClient.scala | 6 ++---- .../scala/unit/kafka/server/ReplicaManagerTest.scala | 4 ++-- 6 files changed, 14 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index b8fcc6270369c..dbd59b5b69252 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -43,9 +43,9 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { public enum Type { - Unknown(0), - Incremental(1), - Full(2); + UNKNOWN(0), + INCREMENTAL(1), + FULL(2); private final byte type; private Type(int type) { @@ -62,7 +62,7 @@ public static Type fromByte(byte type) { return t; } } - return Unknown; + return UNKNOWN; } } @@ -77,7 +77,7 @@ public Builder(short version, int controllerId, int controllerEpoch, long broker List partitionStates, Map topicIds, Collection liveLeaders) { this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, topicIds, - liveLeaders, false, Type.Unknown); + liveLeaders, false, Type.UNKNOWN); } public Builder(short version, int controllerId, int 
controllerEpoch, long brokerEpoch, diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index ad823516b6767..33a335dcc2f60 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -377,7 +377,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, val stopReplicaRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, StopReplicaPartitionState]] val updateMetadataRequestBrokerSet = mutable.Set.empty[Int] val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, UpdateMetadataPartitionState] - private var updateType: LeaderAndIsrRequest.Type = LeaderAndIsrRequest.Type.Unknown + private var updateType: LeaderAndIsrRequest.Type = LeaderAndIsrRequest.Type.UNKNOWN private var metadataInstance: ControllerChannelContext = _ def sendRequest(brokerId: Int, @@ -409,7 +409,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, updateMetadataRequestBrokerSet.clear() updateMetadataRequestPartitionInfoMap.clear() metadataInstance = null - updateType = LeaderAndIsrRequest.Type.Unknown + updateType = LeaderAndIsrRequest.Type.UNKNOWN } def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], @@ -567,7 +567,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, } } leaderAndIsrRequestMap.clear() - updateType = LeaderAndIsrRequest.Type.Unknown + updateType = LeaderAndIsrRequest.Type.UNKNOWN } def handleLeaderAndIsrResponse(response: LeaderAndIsrResponse, broker: Int): Unit diff --git a/core/src/main/scala/kafka/migration/MigrationPropagator.scala b/core/src/main/scala/kafka/migration/MigrationPropagator.scala index 71e2e41166a88..1a18ca42fcb0c 100644 --- a/core/src/main/scala/kafka/migration/MigrationPropagator.scala +++ b/core/src/main/scala/kafka/migration/MigrationPropagator.scala @@ -226,7 
+226,7 @@ class MigrationPropagator( requestBatch.sendRequestsToBrokers(zkControllerEpoch) requestBatch.newBatch() - requestBatch.setUpdateType(LeaderAndIsrRequest.Type.Full) + requestBatch.setUpdateType(LeaderAndIsrRequest.Type.FULL) // When we need to send RPCs from the image, we're sending 'full' requests meaning we let // every broker know about all the metadata and all the LISR requests it needs to handle. // Note that we cannot send StopReplica requests from the image. We don't have any state diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 76b6045e255d7..8ac86bd675016 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1849,7 +1849,7 @@ class ReplicaManager(val config: KafkaConfig, if ( config.migrationEnabled && leaderAndIsrRequest.isKRaftController && - leaderAndIsrRequest.requestType() == LeaderAndIsrRequest.Type.Full + leaderAndIsrRequest.requestType() == LeaderAndIsrRequest.Type.FULL ) { updateStrayLogs(findStrayPartitionsFromLeaderAndIsr(allTopicPartitionsInRequest)) } diff --git a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala index 18b3cdc94c0b4..dd042ff96a767 100644 --- a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala @@ -54,10 +54,8 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie warn(s"Found ${topicDeletions.size} pending topic deletions. These will be not migrated " + s"to KRaft. 
After the migration, the brokers will reconcile their logs with these pending topic deletions.") } - logger.whenTraceEnabled { - topicDeletions.foreach { - deletion => logger.trace(s"Not migrating pending deleted topic: $deletion") - } + topicDeletions.foreach { + deletion => logger.info(s"Not migrating pending deleted topic: $deletion") } val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topicsToMigrated) replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) => diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 26a03f4897eed..c90d3cc869f47 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -563,7 +563,7 @@ class ReplicaManagerTest { Collections.singletonMap(topic, Uuid.randomUuid()), Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava, false, - LeaderAndIsrRequest.Type.Unknown + LeaderAndIsrRequest.Type.UNKNOWN ).build() replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) replicaManager.getPartitionOrException(new TopicPartition(topic, partition)) @@ -2604,7 +2604,7 @@ class ReplicaManagerTest { topicIds.asJava, Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava, true, - LeaderAndIsrRequest.Type.Full + LeaderAndIsrRequest.Type.FULL ).build() replicaManager.becomeLeaderOrFollower(0, lisr, (_, _) => ()) From d47c33ad42702a6869b2f21544c9e17914f158d8 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 26 Oct 2023 08:44:50 -0400 Subject: [PATCH 12/12] add another DEBUG log for full metadata --- .../apache/kafka/metadata/migration/KRaftMigrationDriver.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 233e267d3456b..879f7de40bd38 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -546,11 +546,12 @@ public void run() throws Exception { if (isSnapshot) { // When we load a snapshot, need to send full metadata updates to the brokers + log.debug("Sending full metadata RPCs to brokers for snapshot."); propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch()); } else { // delta if (delta.topicsDelta() != null || delta.clusterDelta() != null) { - log.trace("Sending RPCs to brokers for metadata {}.", metadataType); + log.trace("Sending incremental metadata RPCs to brokers for delta."); propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch()); } else { log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);