From 320146c1dc8a521704d24e3e746cb36162abb6d5 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Thu, 16 May 2024 15:18:00 +0800 Subject: [PATCH] MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestExtensions --- checkstyle/import-control-storage.xml | 1 + .../test/java/kafka/test/ClusterInstance.java | 77 ++++++++--- .../junit/RaftClusterInvocationContext.java | 75 +++-------- .../junit/ZkClusterInvocationContext.java | 65 +++------ .../AllocateProducerIdsRequestTest.scala | 8 +- .../BrokerRegistrationRequestTest.scala | 4 +- .../ConsumerGroupHeartbeatRequestTest.scala | 16 +-- .../GroupCoordinatorBaseRequestTest.scala | 18 +-- .../RemoteLogMetadataManagerTestUtils.java | 124 ++++++++++++++++++ ...cBasedRemoteLogMetadataManagerHarness.java | 83 +----------- ...opicBasedRemoteLogMetadataManagerTest.java | 123 ++++++++--------- 11 files changed, 292 insertions(+), 302 deletions(-) create mode 100644 storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml index 97294dd7c7e7d..39c45342ea8ad 100644 --- a/checkstyle/import-control-storage.xml +++ b/checkstyle/import-control-storage.xml @@ -53,6 +53,7 @@ + diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java index e7ef9822e8f9a..8cb5ae3d2152b 100644 --- a/core/src/test/java/kafka/test/ClusterInstance.java +++ b/core/src/test/java/kafka/test/ClusterInstance.java @@ -18,12 +18,15 @@ package kafka.test; import kafka.network.SocketServer; -import kafka.server.BrokerFeatures; +import kafka.server.BrokerServer; +import kafka.server.ControllerServer; +import kafka.server.KafkaBroker; import kafka.test.annotation.ClusterTest; import kafka.test.annotation.Type; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.test.TestUtils; import java.util.Collection; import java.util.Collections; @@ -32,6 +35,7 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; @@ -46,6 +50,10 @@ default boolean isKRaftTest() { return type() == Type.KRAFT || type() == Type.CO_KRAFT; } + Map brokers(); + + Map controllers(); + /** * The immutable cluster configuration used to create this cluster. */ @@ -61,7 +69,9 @@ default boolean isKRaftTest() { /** * Return the set of all broker IDs configured for this test. */ - Set brokerIds(); + default Set brokerIds() { + return brokers().keySet(); + } /** * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If @@ -97,7 +107,11 @@ default Optional controlPlaneListenerName() { * A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is * acting as the controller (since ZK controllers serve both broker and controller roles). */ - Collection brokerSocketServers(); + default Collection brokerSocketServers() { + return brokers().values().stream() + .map(KafkaBroker::socketServer) + .collect(Collectors.toList()); + } /** * A collection of all controllers in the cluster. 
For ZK-based clusters, this will return the broker which is also @@ -108,17 +122,20 @@ default Optional controlPlaneListenerName() { /** * Return any one of the broker servers. Throw an error if none are found */ - SocketServer anyBrokerSocketServer(); + default SocketServer anyBrokerSocketServer() { + return brokerSocketServers().stream() + .findFirst() + .orElseThrow(() -> new RuntimeException("No broker SocketServers found")); + } /** * Return any one of the controller servers. Throw an error if none are found */ - SocketServer anyControllerSocketServer(); - - /** - * Return a mapping of the underlying broker IDs to their supported features - */ - Map brokerFeatures(); + default SocketServer anyControllerSocketServer() { + return controllerSocketServers().stream() + .findFirst() + .orElseThrow(() -> new RuntimeException("No controller SocketServers found")); + } String clusterId(); @@ -137,16 +154,6 @@ default Admin createAdminClient() { return createAdminClient(new Properties()); } - void start(); - - void stop(); - - void shutdownBroker(int brokerId); - - void startBroker(int brokerId); - - void waitForReadyBrokers() throws InterruptedException; - default Set supportedGroupProtocols() { Map serverProperties = config().serverProperties(); Set supportedGroupProtocols = new HashSet<>(); @@ -160,4 +167,34 @@ default Set supportedGroupProtocols() { return Collections.unmodifiableSet(supportedGroupProtocols); } + + //---------------------------[modify]---------------------------// + + void start(); + + void stop(); + + void shutdownBroker(int brokerId); + + void startBroker(int brokerId); + + //---------------------------[wait]---------------------------// + + void waitForReadyBrokers() throws InterruptedException; + + default void waitForTopic(String topic, int partitions) throws InterruptedException { + // wait for metadata + TestUtils.waitForCondition( + () -> brokers().values().stream().allMatch(broker -> partitions == 0 ? 
+ broker.metadataCache().numPartitions(topic).isEmpty() : + broker.metadataCache().numPartitions(topic).contains(partitions) + ), 60000L, topic + " metadata not propagated after 60000 ms"); + + for (ControllerServer controller : controllers().values()) { + long controllerOffset = controller.raftManager().replicatedLog().endOffset().offset - 1; + TestUtils.waitForCondition( + () -> brokers().values().stream().allMatch(broker -> ((BrokerServer) broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset), + 60000L, "Timeout waiting for controller metadata propagating to brokers"); + } + } } diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index ad4d549e6ae80..ec981312c5937 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -18,9 +18,9 @@ package kafka.test.junit; import kafka.network.SocketServer; -import kafka.server.BrokerFeatures; import kafka.server.BrokerServer; import kafka.server.ControllerServer; +import kafka.server.KafkaBroker; import kafka.test.annotation.Type; import kafka.test.ClusterConfig; import kafka.test.ClusterInstance; @@ -39,6 +39,7 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -151,13 +152,6 @@ public String bootstrapControllers() { return clusterReference.get().bootstrapControllers(); } - @Override - public Collection brokerSocketServers() { - return brokers() - .map(BrokerServer::socketServer) - .collect(Collectors.toList()); - } - @Override public ListenerName clientListener() { return ListenerName.normalised("EXTERNAL"); @@ -165,50 +159,24 @@ public ListenerName clientListener() { @Override public Optional controllerListenerName() { - return OptionConverters.toJava(controllers().findAny().get().config().controllerListenerNames().headOption().map(ListenerName::new)); + return controllers().values().stream() + .findAny() + .flatMap(s -> OptionConverters.toJava(s.config().controllerListenerNames().headOption())) + .map(ListenerName::new); } @Override public Collection controllerSocketServers() { - return controllers() + return controllers().values().stream() .map(ControllerServer::socketServer) .collect(Collectors.toList()); } - @Override - public SocketServer anyBrokerSocketServer() { - return brokers() - .map(BrokerServer::socketServer) - .findFirst() - .orElseThrow(() -> new RuntimeException("No broker SocketServers found")); - } - - @Override - public SocketServer anyControllerSocketServer() { - return controllers() - .map(ControllerServer::socketServer) - .findFirst() - .orElseThrow(() -> new RuntimeException("No controller SocketServers found")); - } - - @Override - public Map brokerFeatures() { - return brokers().collect(Collectors.toMap( - brokerServer -> brokerServer.config().nodeId(), - BrokerServer::brokerFeatures - )); - } - @Override public String clusterId() { - return controllers().findFirst().map(ControllerServer::clusterId).orElse( - brokers().findFirst().map(BrokerServer::clusterId).orElseThrow( - () -> new RuntimeException("No controllers or brokers!")) - ); - } - - public Collection controllerServers() { - return controllers().collect(Collectors.toList()); + return Stream.concat(controllers().values().stream().map(ControllerServer::clusterId), + 
brokers().values().stream().map(KafkaBroker::clusterId)).findFirst() + .orElseThrow(() -> new RuntimeException("No controllers or brokers!")); } @Override @@ -223,16 +191,7 @@ public ClusterConfig config() { @Override public Set controllerIds() { - return controllers() - .map(controllerServer -> controllerServer.config().nodeId()) - .collect(Collectors.toSet()); - } - - @Override - public Set brokerIds() { - return brokers() - .map(brokerServer -> brokerServer.config().nodeId()) - .collect(Collectors.toSet()); + return controllers().keySet(); } @Override @@ -295,12 +254,16 @@ private BrokerServer findBrokerOrThrow(int brokerId) { .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); } - public Stream brokers() { - return clusterReference.get().brokers().values().stream(); + @Override + public Map brokers() { + return clusterReference.get().brokers().entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - public Stream controllers() { - return clusterReference.get().controllers().values().stream(); + @Override + public Map controllers() { + return Collections.unmodifiableMap(clusterReference.get().controllers()); } } diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java index de1cf53ae597b..69ce7475c1972 100644 --- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java @@ -19,7 +19,8 @@ import kafka.api.IntegrationTestHarness; import kafka.network.SocketServer; -import kafka.server.BrokerFeatures; +import kafka.server.ControllerServer; +import kafka.server.KafkaBroker; import kafka.server.KafkaServer; import kafka.test.annotation.Type; import kafka.test.ClusterConfig; @@ -52,7 +53,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import java.util.stream.Stream; import static java.util.Objects.requireNonNull; import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG; @@ -132,13 +132,6 @@ public String bootstrapControllers() { throw new RuntimeException("Cannot use --bootstrap-controller with ZK-based clusters."); } - @Override - public Collection brokerSocketServers() { - return servers() - .map(KafkaServer::socketServer) - .collect(Collectors.toList()); - } - @Override public ListenerName clientListener() { return clusterReference.get().listenerName(); @@ -152,40 +145,15 @@ public Optional controlPlaneListenerName() { @Override public Collection controllerSocketServers() { - return servers() - .filter(broker -> broker.kafkaController().isActive()) - .map(KafkaServer::socketServer) + return brokers().values().stream() + .filter(s -> ((KafkaServer) s).kafkaController().isActive()) + .map(KafkaBroker::socketServer) .collect(Collectors.toList()); } - @Override - public SocketServer anyBrokerSocketServer() { - return servers() - .map(KafkaServer::socketServer) - .findFirst() - .orElseThrow(() -> new RuntimeException("No broker SocketServers found")); - } - - @Override - public SocketServer anyControllerSocketServer() { - return servers() - .filter(broker -> broker.kafkaController().isActive()) - .map(KafkaServer::socketServer) - .findFirst() - .orElseThrow(() -> new RuntimeException("No controller SocketServers found")); - } - - @Override - public Map brokerFeatures() { - return 
servers().collect(Collectors.toMap( - brokerServer -> brokerServer.config().nodeId(), - KafkaServer::brokerFeatures - )); - } - @Override public String clusterId() { - return servers().findFirst().map(KafkaServer::clusterId).orElseThrow( + return brokers().values().stream().findFirst().map(KafkaBroker::clusterId).orElseThrow( () -> new RuntimeException("No broker instances found")); } @@ -204,13 +172,6 @@ public Set controllerIds() { return brokerIds(); } - @Override - public Set brokerIds() { - return servers() - .map(brokerServer -> brokerServer.config().nodeId()) - .collect(Collectors.toSet()); - } - @Override public IntegrationTestHarness getUnderlying() { return clusterReference.get(); @@ -274,14 +235,22 @@ public void waitForReadyBrokers() throws InterruptedException { } private KafkaServer findBrokerOrThrow(int brokerId) { - return servers() + return brokers().values().stream() .filter(server -> server.config().brokerId() == brokerId) + .map(s -> (KafkaServer) s) .findFirst() .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); } - public Stream servers() { - return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream(); + @Override + public Map controllers() { + return Collections.emptyMap(); + } + + @Override + public Map brokers() { + return JavaConverters.asJavaCollection(clusterReference.get().servers()) + .stream().collect(Collectors.toMap(s -> s.config().brokerId(), s -> s)); } } diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala index cea7835187af2..ad5c1eacb5993 100644 --- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala @@ -39,10 +39,10 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) { @ClusterTest def testAllocateProducersIdSentToController(): Unit = { val raftCluster = cluster.asInstanceOf[RaftClusterInstance] - val sourceBroker = raftCluster.brokers.findFirst().get() + val sourceBroker = raftCluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer] val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt - val controllerServer = raftCluster.controllers() + val controllerServer = raftCluster.controllers.values().stream() .filter(_.config.nodeId == controllerId) .findFirst() .get() @@ -56,10 +56,10 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) { @ClusterTest(controllers = 3) def testAllocateProducersIdSentToNonController(): Unit = { val raftCluster = cluster.asInstanceOf[RaftClusterInstance] - val sourceBroker = raftCluster.brokers.findFirst().get() + val sourceBroker = raftCluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer] val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt - val controllerServer = raftCluster.controllers() + val controllerServer = raftCluster.controllers().values().stream() .filter(_.config.nodeId != controllerId) .findFirst() .get() diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala index 73cab9cd0f90f..575c611072f92 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala @@ -202,7 +202,7 @@ class BrokerRegistrationRequestTest { 
autoStart = AutoStart.NO, serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true"))) def testRegisterZkWithKRaftMigrationEnabled(clusterInstance: ClusterInstance): Unit = { - clusterInstance.asInstanceOf[RaftClusterInstance].controllers().forEach(_.startup()) + clusterInstance.asInstanceOf[RaftClusterInstance].controllers().values().forEach(_.startup()) val clusterId = clusterInstance.clusterId() val channelManager = brokerToControllerChannelManager(clusterInstance) @@ -239,7 +239,7 @@ class BrokerRegistrationRequestTest { serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true"))) )) def testNoMetadataChangesInPreMigrationMode(clusterInstance: ClusterInstance): Unit = { - clusterInstance.asInstanceOf[RaftClusterInstance].controllers().forEach(_.startup()) + clusterInstance.asInstanceOf[RaftClusterInstance].controllers().values().forEach(_.startup()) val channelManager = brokerToControllerChannelManager(clusterInstance) try { diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index af67f774690b6..16e0f3d0ef0ee 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -25,11 +25,9 @@ import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, Consu import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse} import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull} -import org.junit.jupiter.api.Tag -import org.junit.jupiter.api.Timeout +import org.junit.jupiter.api.{Tag, Timeout} import org.junit.jupiter.api.extension.ExtendWith -import java.util.stream.Collectors import scala.jdk.CollectionConverters._ @Timeout(120) @@ -61,8 +59,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { // in this test because it does not use FindCoordinator API. TestUtils.createOffsetsTopicWithAdmin( admin = admin, - brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala, - controllers = raftCluster.controllerServers().asScala.toSeq + brokers = raftCluster.brokers.values().asScala.toSeq, + controllers = raftCluster.controllers().values().asScala.toSeq ) // Heartbeat request to join the group. Note that the member subscribes @@ -150,8 +148,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { // in this test because it does not use FindCoordinator API. TestUtils.createOffsetsTopicWithAdmin( admin = admin, - brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala, - controllers = raftCluster.controllerServers().asScala.toSeq + brokers = raftCluster.brokers.values().asScala.toSeq, + controllers = raftCluster.controllers().values().asScala.toSeq ) // Heartbeat request so that a static member joins the group @@ -266,8 +264,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { // in this test because it does not use FindCoordinator API. 
TestUtils.createOffsetsTopicWithAdmin( admin = admin, - brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala, - controllers = raftCluster.controllerServers().asScala.toSeq + brokers = raftCluster.brokers.values().asScala.toSeq, + controllers = raftCluster.controllers().values().asScala.toSeq ) // Heartbeat request to join the group. Note that the member subscribes diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index 6fea4c0237b37..d4717d78ebcf3 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -17,8 +17,6 @@ package kafka.server import kafka.test.ClusterInstance -import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance -import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance import kafka.utils.{NotNothing, TestUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection} @@ -36,21 +34,9 @@ import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { - private def brokers(): Seq[KafkaBroker] = { - if (cluster.isKRaftTest) { - cluster.asInstanceOf[RaftClusterInstance].brokers.collect(Collectors.toList[KafkaBroker]).asScala.toSeq - } else { - cluster.asInstanceOf[ZkClusterInstance].servers.collect(Collectors.toList[KafkaBroker]).asScala.toSeq - } - } + private def brokers(): Seq[KafkaBroker] = cluster.brokers.values().stream().collect(Collectors.toList[KafkaBroker]).asScala.toSeq - private def controllerServers(): Seq[ControllerServer] = { - if (cluster.isKRaftTest) { - cluster.asInstanceOf[RaftClusterInstance].controllerServers().asScala.toSeq - } else { - Seq.empty - } - } + private def controllerServers(): Seq[ControllerServer] = cluster.controllers().values().asScala.toSeq protected def createOffsetsTopic(): Unit = { TestUtils.createOffsetsTopicWithAdmin( diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java new file mode 100644 index 0000000000000..73de6a2608530 --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP; + +class RemoteLogMetadataManagerTestUtils { + private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataManagerTestUtils.class); + + private static final int METADATA_TOPIC_PARTITIONS_COUNT = 3; + private static final short METADATA_TOPIC_REPLICATION_FACTOR = 2; + private static final long METADATA_TOPIC_RETENTION_MS = 24 * 60 * 60 * 1000L; + + static Builder builder() { + return new Builder(); + } + + static class Builder { + private String bootstrapServers; + private boolean startConsumerThread; + private Map overrideRemoteLogMetadataManagerProps = Collections.emptyMap(); + private Set topicIdPartitions = Collections.emptySet(); + private Supplier remotePartitionMetadataStore = RemotePartitionMetadataStore::new; + private Function remoteLogMetadataTopicPartitioner = RemoteLogMetadataTopicPartitioner::new; + + private Builder() { + } + + public Builder bootstrapServers(String bootstrapServers) { + this.bootstrapServers = Objects.requireNonNull(bootstrapServers); + return this; + } + + public Builder startConsumerThread(boolean startConsumerThread) { + this.startConsumerThread = startConsumerThread; + return this; + } + + public Builder remotePartitionMetadataStore(Supplier remotePartitionMetadataStore) { + this.remotePartitionMetadataStore = remotePartitionMetadataStore; + return this; + } + + public Builder remoteLogMetadataTopicPartitioner(Function remoteLogMetadataTopicPartitioner) { + this.remoteLogMetadataTopicPartitioner = Objects.requireNonNull(remoteLogMetadataTopicPartitioner); + return this; + } + + public Builder overrideRemoteLogMetadataManagerProps(Map overrideRemoteLogMetadataManagerProps) { + this.overrideRemoteLogMetadataManagerProps = Objects.requireNonNull(overrideRemoteLogMetadataManagerProps); + return this; + } + + public Builder topicIdPartitions(Set topicIdPartitions) { + this.topicIdPartitions = Objects.requireNonNull(topicIdPartitions); + return this; + } + + public TopicBasedRemoteLogMetadataManager build() { + Objects.requireNonNull(bootstrapServers); + String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath(); + TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = 
+ new TopicBasedRemoteLogMetadataManager(startConsumerThread, + remoteLogMetadataTopicPartitioner, remotePartitionMetadataStore); + + // Initialize TopicBasedRemoteLogMetadataManager. + Map configs = new HashMap<>(); + configs.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configs.put(BROKER_ID, 0); + configs.put(LOG_DIR, logDir); + configs.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, METADATA_TOPIC_PARTITIONS_COUNT); + configs.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, METADATA_TOPIC_REPLICATION_FACTOR); + configs.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, METADATA_TOPIC_RETENTION_MS); + + log.debug("TopicBasedRemoteLogMetadataManager configs before adding overridden properties: {}", configs); + // Add override properties. + configs.putAll(overrideRemoteLogMetadataManagerProps); + log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs); + + topicBasedRemoteLogMetadataManager.configure(configs); + + Assertions.assertDoesNotThrow(() -> TestUtils.waitForCondition(topicBasedRemoteLogMetadataManager::isInitialized, 60_000L, + "Time out reached before it is initialized successfully")); + + topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(topicIdPartitions, Collections.emptySet()); + + return topicBasedRemoteLogMetadataManager; + } + } +} diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java index f585fa5ba3a8f..a063fa8820a82 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java @@ -18,41 +18,20 @@ import kafka.api.IntegrationTestHarness; import kafka.utils.EmptyTestInfo; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.test.TestUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.function.Supplier; -import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID; -import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR; -import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX; -import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP; -import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP; -import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP; - /** * A test harness class that brings up 3 brokers and 
registers {@link TopicBasedRemoteLogMetadataManager} on broker with id as 0. */ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHarness { - private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerHarness.class); - - protected static final int METADATA_TOPIC_PARTITIONS_COUNT = 3; - protected static final short METADATA_TOPIC_REPLICATION_FACTOR = 2; - protected static final long METADATA_TOPIC_RETENTION_MS = 24 * 60 * 60 * 1000L; private TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager; @@ -84,62 +63,14 @@ public void initializeRemoteLogMetadataManager(Set topicIdPart boolean startConsumerThread, Function remoteLogMetadataTopicPartitioner, Supplier remotePartitionMetadataStoreSupplier) { - String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath(); - topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread, remoteLogMetadataTopicPartitioner, remotePartitionMetadataStoreSupplier) { - @Override - public void onPartitionLeadershipChanges(Set leaderPartitions, - Set followerPartitions) { - Set allReplicas = new HashSet<>(leaderPartitions); - allReplicas.addAll(followerPartitions); - // Make sure the topic partition dirs exist as the topics might not have been created on this broker. - for (TopicIdPartition topicIdPartition : allReplicas) { - // Create partition directory in the log directory created by topicBasedRemoteLogMetadataManager. - File partitionDir = new File(new File(config().logDir()), topicIdPartition.topicPartition().topic() + "-" + topicIdPartition.topicPartition().partition()); - partitionDir.mkdirs(); - if (!partitionDir.exists()) { - throw new KafkaException("Partition directory:[" + partitionDir + "] could not be created successfully."); - } - } - - super.onPartitionLeadershipChanges(leaderPartitions, followerPartitions); - } - }; - - // Initialize TopicBasedRemoteLogMetadataManager. - Map configs = new HashMap<>(); - configs.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(listenerName())); - configs.put(BROKER_ID, 0); - configs.put(LOG_DIR, logDir); - configs.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, METADATA_TOPIC_PARTITIONS_COUNT); - configs.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, METADATA_TOPIC_REPLICATION_FACTOR); - configs.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, METADATA_TOPIC_RETENTION_MS); - - log.debug("TopicBasedRemoteLogMetadataManager configs before adding overridden properties: {}", configs); - // Add override properties. - configs.putAll(overrideRemoteLogMetadataManagerProps()); - log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs); - - topicBasedRemoteLogMetadataManager.configure(configs); - try { - waitUntilInitialized(60_000); - } catch (TimeoutException e) { - throw new KafkaException(e); - } - - topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(topicIdPartitions, Collections.emptySet()); - } - // Visible for testing. 
- public void waitUntilInitialized(long waitTimeMs) throws TimeoutException { - long startMs = System.currentTimeMillis(); - while (!topicBasedRemoteLogMetadataManager.isInitialized()) { - long currentTimeMs = System.currentTimeMillis(); - if (currentTimeMs > startMs + waitTimeMs) { - throw new TimeoutException("Time out reached before it is initialized successfully"); - } - - Utils.sleep(100); - } + topicBasedRemoteLogMetadataManager = RemoteLogMetadataManagerTestUtils.builder() + .topicIdPartitions(topicIdPartitions) + .bootstrapServers(bootstrapServers(listenerName())) + .startConsumerThread(startConsumerThread) + .remoteLogMetadataTopicPartitioner(remoteLogMetadataTopicPartitioner) + .remotePartitionMetadataStore(remotePartitionMetadataStoreSupplier) + .build(); } @Override diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java index bb3def893cd6d..183a84934127d 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java @@ -16,11 +16,15 @@ */ package org.apache.kafka.server.log.remote.metadata.storage; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.junit.ClusterTestExtensions; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; @@ -29,111 +33,88 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.JavaConverters; -import scala.collection.Seq; +import org.junit.jupiter.api.extension.ExtendWith; import java.io.IOException; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; -@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters +@ExtendWith(ClusterTestExtensions.class) +@ClusterTestDefaults(brokers = 3) public class TopicBasedRemoteLogMetadataManagerTest { - private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerTest.class); - private static final int SEG_SIZE = 1024 * 1024; + private final ClusterInstance clusterInstance; + private final RemotePartitionMetadataStore spyRemotePartitionMetadataEventHandler = spy(new 
RemotePartitionMetadataStore()); private final Time time = new MockTime(1); - private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness(); - private RemotePartitionMetadataStore spyRemotePartitionMetadataEventHandler; - - @BeforeEach - public void setup() { - // Start the cluster and initialize TopicBasedRemoteLogMetadataManager. - spyRemotePartitionMetadataEventHandler = spy(new RemotePartitionMetadataStore()); - remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true, () -> spyRemotePartitionMetadataEventHandler); + private TopicBasedRemoteLogMetadataManager remoteLogMetadataManager; + + TopicBasedRemoteLogMetadataManagerTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; } - @AfterEach - public void teardown() throws IOException { - remoteLogMetadataManagerHarness.close(); + private TopicBasedRemoteLogMetadataManager topicBasedRlmm() { + if (remoteLogMetadataManager == null) + remoteLogMetadataManager = RemoteLogMetadataManagerTestUtils.builder() + .bootstrapServers(clusterInstance.bootstrapServers()) + .startConsumerThread(true) + .remotePartitionMetadataStore(() -> spyRemotePartitionMetadataEventHandler) + .build(); + return remoteLogMetadataManager; } - public TopicBasedRemoteLogMetadataManager topicBasedRlmm() { - return remoteLogMetadataManagerHarness.remoteLogMetadataManager(); + @AfterEach + public void teardown() throws IOException { + if (remoteLogMetadataManager != null) remoteLogMetadataManager.close(); } - @Test - public void testDoesTopicExist() { - Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig(); - ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName(); - try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) { + @ClusterTest + public void testDoesTopicExist() throws ExecutionException, InterruptedException { + try (Admin admin = clusterInstance.createAdminClient()) { String topic = "test-topic-exist"; - remoteLogMetadataManagerHarness.createTopic(topic, 1, 1, new Properties(), - listenerName, adminConfig); + admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))).all().get(); + clusterInstance.waitForTopic(topic, 1); boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic); Assertions.assertTrue(doesTopicExist); } } - @Test + @ClusterTest public void testTopicDoesNotExist() { - Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig(); - ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName(); - try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) { + try (Admin admin = clusterInstance.createAdminClient()) { String topic = "dummy-test-topic"; boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic); Assertions.assertFalse(doesTopicExist); } } - @Test - public void testWithNoAssignedPartitions() throws Exception { + @ClusterTest + public void testWithNoAssignedPartitions() { // This test checks simple lifecycle of TopicBasedRemoteLogMetadataManager with out assigning any leader/follower partitions. // This should close successfully releasing the resources. - log.info("Not assigning any partitions on TopicBasedRemoteLogMetadataManager"); + topicBasedRlmm(); } - @Test + @ClusterTest public void testNewPartitionUpdates() throws Exception { // Create topics. 
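// (Descriptive note, not part of the original patch.) Both topics below are created with explicit
// single-partition replica assignments: broker 0 is listed first for "new-leader" and broker 1 first
// for "new-follower", so each becomes the preferred leader of its topic. waitForTopic then blocks
// until every broker's metadata cache reports the expected partition count.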
String leaderTopic = "new-leader"; - HashMap> assignedLeaderTopicReplicas = new HashMap<>(); - List leaderTopicReplicas = new ArrayList<>(); - // Set broker id 0 as the first entry which is taken as the leader. - leaderTopicReplicas.add(0); - leaderTopicReplicas.add(1); - leaderTopicReplicas.add(2); - assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas)); - remoteLogMetadataManagerHarness.createTopicWithAssignment( - leaderTopic, JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas), - remoteLogMetadataManagerHarness.listenerName()); - String followerTopic = "new-follower"; - HashMap> assignedFollowerTopicReplicas = new HashMap<>(); - List followerTopicReplicas = new ArrayList<>(); - // Set broker id 1 as the first entry which is taken as the leader. - followerTopicReplicas.add(1); - followerTopicReplicas.add(2); - followerTopicReplicas.add(0); - assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas)); - remoteLogMetadataManagerHarness.createTopicWithAssignment( - followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas), - remoteLogMetadataManagerHarness.listenerName()); + try (Admin admin = clusterInstance.createAdminClient()) { + // Set broker id 0 as the first entry which is taken as the leader. + admin.createTopics(Collections.singletonList(new NewTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2))))).all().get(); + clusterInstance.waitForTopic(leaderTopic, 1); + admin.createTopics(Collections.singletonList(new NewTopic(followerTopic, Collections.singletonMap(0, Arrays.asList(1, 2, 0))))).all().get(); + clusterInstance.waitForTopic(followerTopic, 1); + } final TopicIdPartition newLeaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0)); final TopicIdPartition newFollowerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); @@ -185,14 +166,14 @@ public void testNewPartitionUpdates() throws Exception { Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext()); } - @Test + @ClusterTest public void testRemoteLogSizeCalculationForUnknownTopicIdPartitionThrows() { TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0)); Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().remoteLogSize(topicIdPartition, 0)); } - @Test - public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws RemoteStorageException, TimeoutException, InterruptedException { + @ClusterTest + public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws RemoteStorageException, InterruptedException { TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0)); TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm(); @@ -237,8 +218,8 @@ public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws Remo Assertions.assertEquals(SEG_SIZE * 6, remoteLogSize); } - @Test - public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws RemoteStorageException, TimeoutException, InterruptedException { + @ClusterTest + public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws RemoteStorageException, InterruptedException { TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0)); 
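// (Descriptive note, not part of the original patch.) topicBasedRlmm() lazily builds the manager
// against the shared cluster. The segments registered later in this test all share a single leader
// epoch, so remoteLogSize for that epoch is expected to equal their combined size
// (SEG_SIZE * 6, asserted at the end of the test).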
TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm(); CountDownLatch initializationLatch = new CountDownLatch(1); @@ -282,8 +263,8 @@ public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws R Assertions.assertEquals(SEG_SIZE * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2)); } - @Test - public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() throws RemoteStorageException, TimeoutException, InterruptedException { + @ClusterTest + public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() throws RemoteStorageException, InterruptedException { TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0)); TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm(); CountDownLatch initializationLatch = new CountDownLatch(1);