diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml
index 97294dd7c7e7d..39c45342ea8ad 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -53,6 +53,7 @@
+
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index e7ef9822e8f9a..8cb5ae3d2152b 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -18,12 +18,15 @@
package kafka.test;
import kafka.network.SocketServer;
-import kafka.server.BrokerFeatures;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.KafkaBroker;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.Type;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.test.TestUtils;
import java.util.Collection;
import java.util.Collections;
@@ -32,6 +35,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
@@ -46,6 +50,10 @@ default boolean isKRaftTest() {
return type() == Type.KRAFT || type() == Type.CO_KRAFT;
}
+ Map brokers();
+
+ Map controllers();
+
/**
* The immutable cluster configuration used to create this cluster.
*/
@@ -61,7 +69,9 @@ default boolean isKRaftTest() {
/**
* Return the set of all broker IDs configured for this test.
*/
- Set brokerIds();
+ default Set brokerIds() {
+ return brokers().keySet();
+ }
/**
* The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If
@@ -97,7 +107,11 @@ default Optional controlPlaneListenerName() {
* A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is
* acting as the controller (since ZK controllers serve both broker and controller roles).
*/
- Collection brokerSocketServers();
+ default Collection brokerSocketServers() {
+ return brokers().values().stream()
+ .map(KafkaBroker::socketServer)
+ .collect(Collectors.toList());
+ }
/**
* A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also
@@ -108,17 +122,20 @@ default Optional controlPlaneListenerName() {
/**
* Return any one of the broker servers. Throw an error if none are found
*/
- SocketServer anyBrokerSocketServer();
+ default SocketServer anyBrokerSocketServer() {
+ return brokerSocketServers().stream()
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
+ }
/**
* Return any one of the controller servers. Throw an error if none are found
*/
- SocketServer anyControllerSocketServer();
-
- /**
- * Return a mapping of the underlying broker IDs to their supported features
- */
- Map brokerFeatures();
+ default SocketServer anyControllerSocketServer() {
+ return controllerSocketServers().stream()
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
+ }
String clusterId();
@@ -137,16 +154,6 @@ default Admin createAdminClient() {
return createAdminClient(new Properties());
}
- void start();
-
- void stop();
-
- void shutdownBroker(int brokerId);
-
- void startBroker(int brokerId);
-
- void waitForReadyBrokers() throws InterruptedException;
-
default Set supportedGroupProtocols() {
Map serverProperties = config().serverProperties();
Set supportedGroupProtocols = new HashSet<>();
@@ -160,4 +167,34 @@ default Set supportedGroupProtocols() {
return Collections.unmodifiableSet(supportedGroupProtocols);
}
+
+ //---------------------------[modify]---------------------------//
+
+ void start();
+
+ void stop();
+
+ void shutdownBroker(int brokerId);
+
+ void startBroker(int brokerId);
+
+ //---------------------------[wait]---------------------------//
+
+ void waitForReadyBrokers() throws InterruptedException;
+
+ default void waitForTopic(String topic, int partitions) throws InterruptedException {
+ // wait for metadata
+ TestUtils.waitForCondition(
+ () -> brokers().values().stream().allMatch(broker -> partitions == 0 ?
+ broker.metadataCache().numPartitions(topic).isEmpty() :
+ broker.metadataCache().numPartitions(topic).contains(partitions)
+ ), 60000L, topic + " metadata not propagated after 60000 ms");
+
+ for (ControllerServer controller : controllers().values()) {
+ long controllerOffset = controller.raftManager().replicatedLog().endOffset().offset - 1;
+ TestUtils.waitForCondition(
+ () -> brokers().values().stream().allMatch(broker -> ((BrokerServer) broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset),
+ 60000L, "Timeout waiting for controller metadata propagating to brokers");
+ }
+ }
}
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index ad4d549e6ae80..ec981312c5937 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -18,9 +18,9 @@
package kafka.test.junit;
import kafka.network.SocketServer;
-import kafka.server.BrokerFeatures;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
+import kafka.server.KafkaBroker;
import kafka.test.annotation.Type;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
@@ -39,6 +39,7 @@
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -151,13 +152,6 @@ public String bootstrapControllers() {
return clusterReference.get().bootstrapControllers();
}
- @Override
- public Collection brokerSocketServers() {
- return brokers()
- .map(BrokerServer::socketServer)
- .collect(Collectors.toList());
- }
-
@Override
public ListenerName clientListener() {
return ListenerName.normalised("EXTERNAL");
@@ -165,50 +159,24 @@ public ListenerName clientListener() {
@Override
public Optional controllerListenerName() {
- return OptionConverters.toJava(controllers().findAny().get().config().controllerListenerNames().headOption().map(ListenerName::new));
+ return controllers().values().stream()
+ .findAny()
+ .flatMap(s -> OptionConverters.toJava(s.config().controllerListenerNames().headOption()))
+ .map(ListenerName::new);
}
@Override
public Collection controllerSocketServers() {
- return controllers()
+ return controllers().values().stream()
.map(ControllerServer::socketServer)
.collect(Collectors.toList());
}
- @Override
- public SocketServer anyBrokerSocketServer() {
- return brokers()
- .map(BrokerServer::socketServer)
- .findFirst()
- .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
- }
-
- @Override
- public SocketServer anyControllerSocketServer() {
- return controllers()
- .map(ControllerServer::socketServer)
- .findFirst()
- .orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
- }
-
- @Override
- public Map brokerFeatures() {
- return brokers().collect(Collectors.toMap(
- brokerServer -> brokerServer.config().nodeId(),
- BrokerServer::brokerFeatures
- ));
- }
-
@Override
public String clusterId() {
- return controllers().findFirst().map(ControllerServer::clusterId).orElse(
- brokers().findFirst().map(BrokerServer::clusterId).orElseThrow(
- () -> new RuntimeException("No controllers or brokers!"))
- );
- }
-
- public Collection controllerServers() {
- return controllers().collect(Collectors.toList());
+ return Stream.concat(controllers().values().stream().map(ControllerServer::clusterId),
+ brokers().values().stream().map(KafkaBroker::clusterId)).findFirst()
+ .orElseThrow(() -> new RuntimeException("No controllers or brokers!"));
}
@Override
@@ -223,16 +191,7 @@ public ClusterConfig config() {
@Override
public Set controllerIds() {
- return controllers()
- .map(controllerServer -> controllerServer.config().nodeId())
- .collect(Collectors.toSet());
- }
-
- @Override
- public Set brokerIds() {
- return brokers()
- .map(brokerServer -> brokerServer.config().nodeId())
- .collect(Collectors.toSet());
+ return controllers().keySet();
}
@Override
@@ -295,12 +254,16 @@ private BrokerServer findBrokerOrThrow(int brokerId) {
.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
}
- public Stream brokers() {
- return clusterReference.get().brokers().values().stream();
+ @Override
+ public Map brokers() {
+ return clusterReference.get().brokers().entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
- public Stream controllers() {
- return clusterReference.get().controllers().values().stream();
+ @Override
+ public Map controllers() {
+ return Collections.unmodifiableMap(clusterReference.get().controllers());
}
}
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index de1cf53ae597b..69ce7475c1972 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -19,7 +19,8 @@
import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
-import kafka.server.BrokerFeatures;
+import kafka.server.ControllerServer;
+import kafka.server.KafkaBroker;
import kafka.server.KafkaServer;
import kafka.test.annotation.Type;
import kafka.test.ClusterConfig;
@@ -52,7 +53,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG;
@@ -132,13 +132,6 @@ public String bootstrapControllers() {
throw new RuntimeException("Cannot use --bootstrap-controller with ZK-based clusters.");
}
- @Override
- public Collection brokerSocketServers() {
- return servers()
- .map(KafkaServer::socketServer)
- .collect(Collectors.toList());
- }
-
@Override
public ListenerName clientListener() {
return clusterReference.get().listenerName();
@@ -152,40 +145,15 @@ public Optional controlPlaneListenerName() {
@Override
public Collection controllerSocketServers() {
- return servers()
- .filter(broker -> broker.kafkaController().isActive())
- .map(KafkaServer::socketServer)
+ return brokers().values().stream()
+ .filter(s -> ((KafkaServer) s).kafkaController().isActive())
+ .map(KafkaBroker::socketServer)
.collect(Collectors.toList());
}
- @Override
- public SocketServer anyBrokerSocketServer() {
- return servers()
- .map(KafkaServer::socketServer)
- .findFirst()
- .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
- }
-
- @Override
- public SocketServer anyControllerSocketServer() {
- return servers()
- .filter(broker -> broker.kafkaController().isActive())
- .map(KafkaServer::socketServer)
- .findFirst()
- .orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
- }
-
- @Override
- public Map brokerFeatures() {
- return servers().collect(Collectors.toMap(
- brokerServer -> brokerServer.config().nodeId(),
- KafkaServer::brokerFeatures
- ));
- }
-
@Override
public String clusterId() {
- return servers().findFirst().map(KafkaServer::clusterId).orElseThrow(
+ return brokers().values().stream().findFirst().map(KafkaBroker::clusterId).orElseThrow(
() -> new RuntimeException("No broker instances found"));
}
@@ -204,13 +172,6 @@ public Set controllerIds() {
return brokerIds();
}
- @Override
- public Set brokerIds() {
- return servers()
- .map(brokerServer -> brokerServer.config().nodeId())
- .collect(Collectors.toSet());
- }
-
@Override
public IntegrationTestHarness getUnderlying() {
return clusterReference.get();
@@ -274,14 +235,22 @@ public void waitForReadyBrokers() throws InterruptedException {
}
private KafkaServer findBrokerOrThrow(int brokerId) {
- return servers()
+ return brokers().values().stream()
.filter(server -> server.config().brokerId() == brokerId)
+ .map(s -> (KafkaServer) s)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
}
- public Stream servers() {
- return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream();
+ @Override
+ public Map controllers() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Map brokers() {
+ return JavaConverters.asJavaCollection(clusterReference.get().servers())
+ .stream().collect(Collectors.toMap(s -> s.config().brokerId(), s -> s));
}
}
diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
index cea7835187af2..ad5c1eacb5993 100644
--- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
@@ -39,10 +39,10 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
@ClusterTest
def testAllocateProducersIdSentToController(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
- val sourceBroker = raftCluster.brokers.findFirst().get()
+ val sourceBroker = raftCluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer]
val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt
- val controllerServer = raftCluster.controllers()
+ val controllerServer = raftCluster.controllers.values().stream()
.filter(_.config.nodeId == controllerId)
.findFirst()
.get()
@@ -56,10 +56,10 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
@ClusterTest(controllers = 3)
def testAllocateProducersIdSentToNonController(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
- val sourceBroker = raftCluster.brokers.findFirst().get()
+ val sourceBroker = raftCluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer]
val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt
- val controllerServer = raftCluster.controllers()
+ val controllerServer = raftCluster.controllers().values().stream()
.filter(_.config.nodeId != controllerId)
.findFirst()
.get()
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index 73cab9cd0f90f..575c611072f92 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -202,7 +202,7 @@ class BrokerRegistrationRequestTest {
autoStart = AutoStart.NO,
serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true")))
def testRegisterZkWithKRaftMigrationEnabled(clusterInstance: ClusterInstance): Unit = {
- clusterInstance.asInstanceOf[RaftClusterInstance].controllers().forEach(_.startup())
+ clusterInstance.asInstanceOf[RaftClusterInstance].controllers().values().forEach(_.startup())
val clusterId = clusterInstance.clusterId()
val channelManager = brokerToControllerChannelManager(clusterInstance)
@@ -239,7 +239,7 @@ class BrokerRegistrationRequestTest {
serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true")))
))
def testNoMetadataChangesInPreMigrationMode(clusterInstance: ClusterInstance): Unit = {
- clusterInstance.asInstanceOf[RaftClusterInstance].controllers().forEach(_.startup())
+ clusterInstance.asInstanceOf[RaftClusterInstance].controllers().values().forEach(_.startup())
val channelManager = brokerToControllerChannelManager(clusterInstance)
try {
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index af67f774690b6..16e0f3d0ef0ee 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -25,11 +25,9 @@ import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, Consu
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull}
-import org.junit.jupiter.api.Tag
-import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.{Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
-import java.util.stream.Collectors
import scala.jdk.CollectionConverters._
@Timeout(120)
@@ -61,8 +59,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
- brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
- controllers = raftCluster.controllerServers().asScala.toSeq
+ brokers = raftCluster.brokers.values().asScala.toSeq,
+ controllers = raftCluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
@@ -150,8 +148,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
- brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
- controllers = raftCluster.controllerServers().asScala.toSeq
+ brokers = raftCluster.brokers.values().asScala.toSeq,
+ controllers = raftCluster.controllers().values().asScala.toSeq
)
// Heartbeat request so that a static member joins the group
@@ -266,8 +264,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
- brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
- controllers = raftCluster.controllerServers().asScala.toSeq
+ brokers = raftCluster.brokers.values().asScala.toSeq,
+ controllers = raftCluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 6fea4c0237b37..d4717d78ebcf3 100644
--- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -17,8 +17,6 @@
package kafka.server
import kafka.test.ClusterInstance
-import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
-import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import kafka.utils.{NotNothing, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
@@ -36,21 +34,9 @@ import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
- private def brokers(): Seq[KafkaBroker] = {
- if (cluster.isKRaftTest) {
- cluster.asInstanceOf[RaftClusterInstance].brokers.collect(Collectors.toList[KafkaBroker]).asScala.toSeq
- } else {
- cluster.asInstanceOf[ZkClusterInstance].servers.collect(Collectors.toList[KafkaBroker]).asScala.toSeq
- }
- }
+ private def brokers(): Seq[KafkaBroker] = cluster.brokers.values().stream().collect(Collectors.toList[KafkaBroker]).asScala.toSeq
- private def controllerServers(): Seq[ControllerServer] = {
- if (cluster.isKRaftTest) {
- cluster.asInstanceOf[RaftClusterInstance].controllerServers().asScala.toSeq
- } else {
- Seq.empty
- }
- }
+ private def controllerServers(): Seq[ControllerServer] = cluster.controllers().values().asScala.toSeq
protected def createOffsetsTopic(): Unit = {
TestUtils.createOffsetsTopicWithAdmin(
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
new file mode 100644
index 0000000000000..73de6a2608530
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
+
+class RemoteLogMetadataManagerTestUtils {
+ private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataManagerTestUtils.class);
+
+ private static final int METADATA_TOPIC_PARTITIONS_COUNT = 3;
+ private static final short METADATA_TOPIC_REPLICATION_FACTOR = 2;
+ private static final long METADATA_TOPIC_RETENTION_MS = 24 * 60 * 60 * 1000L;
+
+ static Builder builder() {
+ return new Builder();
+ }
+
+ static class Builder {
+ private String bootstrapServers;
+ private boolean startConsumerThread;
+ private Map overrideRemoteLogMetadataManagerProps = Collections.emptyMap();
+ private Set topicIdPartitions = Collections.emptySet();
+ private Supplier remotePartitionMetadataStore = RemotePartitionMetadataStore::new;
+ private Function remoteLogMetadataTopicPartitioner = RemoteLogMetadataTopicPartitioner::new;
+
+ private Builder() {
+ }
+
+ public Builder bootstrapServers(String bootstrapServers) {
+ this.bootstrapServers = Objects.requireNonNull(bootstrapServers);
+ return this;
+ }
+
+ public Builder startConsumerThread(boolean startConsumerThread) {
+ this.startConsumerThread = startConsumerThread;
+ return this;
+ }
+
+ public Builder remotePartitionMetadataStore(Supplier remotePartitionMetadataStore) {
+ this.remotePartitionMetadataStore = remotePartitionMetadataStore;
+ return this;
+ }
+
+ public Builder remoteLogMetadataTopicPartitioner(Function remoteLogMetadataTopicPartitioner) {
+ this.remoteLogMetadataTopicPartitioner = Objects.requireNonNull(remoteLogMetadataTopicPartitioner);
+ return this;
+ }
+
+ public Builder overrideRemoteLogMetadataManagerProps(Map overrideRemoteLogMetadataManagerProps) {
+ this.overrideRemoteLogMetadataManagerProps = Objects.requireNonNull(overrideRemoteLogMetadataManagerProps);
+ return this;
+ }
+
+ public Builder topicIdPartitions(Set topicIdPartitions) {
+ this.topicIdPartitions = Objects.requireNonNull(topicIdPartitions);
+ return this;
+ }
+
+ public TopicBasedRemoteLogMetadataManager build() {
+ Objects.requireNonNull(bootstrapServers);
+ String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
+ TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager =
+ new TopicBasedRemoteLogMetadataManager(startConsumerThread,
+ remoteLogMetadataTopicPartitioner, remotePartitionMetadataStore);
+
+ // Initialize TopicBasedRemoteLogMetadataManager.
+ Map configs = new HashMap<>();
+ configs.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ configs.put(BROKER_ID, 0);
+ configs.put(LOG_DIR, logDir);
+ configs.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, METADATA_TOPIC_PARTITIONS_COUNT);
+ configs.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, METADATA_TOPIC_REPLICATION_FACTOR);
+ configs.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, METADATA_TOPIC_RETENTION_MS);
+
+ log.debug("TopicBasedRemoteLogMetadataManager configs before adding overridden properties: {}", configs);
+ // Add override properties.
+ configs.putAll(overrideRemoteLogMetadataManagerProps);
+ log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs);
+
+ topicBasedRemoteLogMetadataManager.configure(configs);
+
+ Assertions.assertDoesNotThrow(() -> TestUtils.waitForCondition(topicBasedRemoteLogMetadataManager::isInitialized, 60_000L,
+ "Time out reached before it is initialized successfully"));
+
+ topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(topicIdPartitions, Collections.emptySet());
+
+ return topicBasedRemoteLogMetadataManager;
+ }
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
index f585fa5ba3a8f..a063fa8820a82 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
@@ -18,41 +18,20 @@
import kafka.api.IntegrationTestHarness;
import kafka.utils.EmptyTestInfo;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.test.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
-import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
-import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
-import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
-import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
-import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
-import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
-
/**
* A test harness class that brings up 3 brokers and registers {@link TopicBasedRemoteLogMetadataManager} on broker with id as 0.
*/
public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHarness {
- private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerHarness.class);
-
- protected static final int METADATA_TOPIC_PARTITIONS_COUNT = 3;
- protected static final short METADATA_TOPIC_REPLICATION_FACTOR = 2;
- protected static final long METADATA_TOPIC_RETENTION_MS = 24 * 60 * 60 * 1000L;
private TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager;
@@ -84,62 +63,14 @@ public void initializeRemoteLogMetadataManager(Set topicIdPart
boolean startConsumerThread,
Function remoteLogMetadataTopicPartitioner,
Supplier remotePartitionMetadataStoreSupplier) {
- String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
- topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread, remoteLogMetadataTopicPartitioner, remotePartitionMetadataStoreSupplier) {
- @Override
- public void onPartitionLeadershipChanges(Set leaderPartitions,
- Set followerPartitions) {
- Set allReplicas = new HashSet<>(leaderPartitions);
- allReplicas.addAll(followerPartitions);
- // Make sure the topic partition dirs exist as the topics might not have been created on this broker.
- for (TopicIdPartition topicIdPartition : allReplicas) {
- // Create partition directory in the log directory created by topicBasedRemoteLogMetadataManager.
- File partitionDir = new File(new File(config().logDir()), topicIdPartition.topicPartition().topic() + "-" + topicIdPartition.topicPartition().partition());
- partitionDir.mkdirs();
- if (!partitionDir.exists()) {
- throw new KafkaException("Partition directory:[" + partitionDir + "] could not be created successfully.");
- }
- }
-
- super.onPartitionLeadershipChanges(leaderPartitions, followerPartitions);
- }
- };
-
- // Initialize TopicBasedRemoteLogMetadataManager.
- Map configs = new HashMap<>();
- configs.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(listenerName()));
- configs.put(BROKER_ID, 0);
- configs.put(LOG_DIR, logDir);
- configs.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, METADATA_TOPIC_PARTITIONS_COUNT);
- configs.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, METADATA_TOPIC_REPLICATION_FACTOR);
- configs.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, METADATA_TOPIC_RETENTION_MS);
-
- log.debug("TopicBasedRemoteLogMetadataManager configs before adding overridden properties: {}", configs);
- // Add override properties.
- configs.putAll(overrideRemoteLogMetadataManagerProps());
- log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs);
-
- topicBasedRemoteLogMetadataManager.configure(configs);
- try {
- waitUntilInitialized(60_000);
- } catch (TimeoutException e) {
- throw new KafkaException(e);
- }
-
- topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(topicIdPartitions, Collections.emptySet());
- }
- // Visible for testing.
- public void waitUntilInitialized(long waitTimeMs) throws TimeoutException {
- long startMs = System.currentTimeMillis();
- while (!topicBasedRemoteLogMetadataManager.isInitialized()) {
- long currentTimeMs = System.currentTimeMillis();
- if (currentTimeMs > startMs + waitTimeMs) {
- throw new TimeoutException("Time out reached before it is initialized successfully");
- }
-
- Utils.sleep(100);
- }
+ topicBasedRemoteLogMetadataManager = RemoteLogMetadataManagerTestUtils.builder()
+ .topicIdPartitions(topicIdPartitions)
+ .bootstrapServers(bootstrapServers(listenerName()))
+ .startConsumerThread(startConsumerThread)
+ .remoteLogMetadataTopicPartitioner(remoteLogMetadataTopicPartitioner)
+ .remotePartitionMetadataStore(remotePartitionMetadataStoreSupplier)
+ .build();
}
@Override
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index bb3def893cd6d..183a84934127d 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -16,11 +16,15 @@
*/
package org.apache.kafka.server.log.remote.metadata.storage;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
@@ -29,111 +33,88 @@
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
-@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
+@ExtendWith(ClusterTestExtensions.class)
+@ClusterTestDefaults(brokers = 3)
public class TopicBasedRemoteLogMetadataManagerTest {
- private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerTest.class);
-
private static final int SEG_SIZE = 1024 * 1024;
+ private final ClusterInstance clusterInstance;
+ private final RemotePartitionMetadataStore spyRemotePartitionMetadataEventHandler = spy(new RemotePartitionMetadataStore());
private final Time time = new MockTime(1);
- private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness();
- private RemotePartitionMetadataStore spyRemotePartitionMetadataEventHandler;
-
- @BeforeEach
- public void setup() {
- // Start the cluster and initialize TopicBasedRemoteLogMetadataManager.
- spyRemotePartitionMetadataEventHandler = spy(new RemotePartitionMetadataStore());
- remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true, () -> spyRemotePartitionMetadataEventHandler);
+ private TopicBasedRemoteLogMetadataManager remoteLogMetadataManager;
+
+ TopicBasedRemoteLogMetadataManagerTest(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
}
- @AfterEach
- public void teardown() throws IOException {
- remoteLogMetadataManagerHarness.close();
+ private TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
+ if (remoteLogMetadataManager == null)
+ remoteLogMetadataManager = RemoteLogMetadataManagerTestUtils.builder()
+ .bootstrapServers(clusterInstance.bootstrapServers())
+ .startConsumerThread(true)
+ .remotePartitionMetadataStore(() -> spyRemotePartitionMetadataEventHandler)
+ .build();
+ return remoteLogMetadataManager;
}
- public TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
- return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+ @AfterEach
+ public void teardown() throws IOException {
+ if (remoteLogMetadataManager != null) remoteLogMetadataManager.close();
}
- @Test
- public void testDoesTopicExist() {
- Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig();
- ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName();
- try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
+ @ClusterTest
+ public void testDoesTopicExist() throws ExecutionException, InterruptedException {
+ try (Admin admin = clusterInstance.createAdminClient()) {
String topic = "test-topic-exist";
- remoteLogMetadataManagerHarness.createTopic(topic, 1, 1, new Properties(),
- listenerName, adminConfig);
+ admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))).all().get();
+ clusterInstance.waitForTopic(topic, 1);
boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic);
Assertions.assertTrue(doesTopicExist);
}
}
- @Test
+ @ClusterTest
public void testTopicDoesNotExist() {
- Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig();
- ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName();
- try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
+ try (Admin admin = clusterInstance.createAdminClient()) {
String topic = "dummy-test-topic";
boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic);
Assertions.assertFalse(doesTopicExist);
}
}
- @Test
- public void testWithNoAssignedPartitions() throws Exception {
+ @ClusterTest
+ public void testWithNoAssignedPartitions() {
// This test checks simple lifecycle of TopicBasedRemoteLogMetadataManager with out assigning any leader/follower partitions.
// This should close successfully releasing the resources.
- log.info("Not assigning any partitions on TopicBasedRemoteLogMetadataManager");
+ topicBasedRlmm();
}
- @Test
+ @ClusterTest
public void testNewPartitionUpdates() throws Exception {
// Create topics.
String leaderTopic = "new-leader";
- HashMap