diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala index 9f89f47e824e0..61305f2410135 100644 --- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala +++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala @@ -78,15 +78,14 @@ object AlterPartitionManager { config: KafkaConfig, metadataCache: MetadataCache, scheduler: KafkaScheduler, + controllerNodeProvider: ControllerNodeProvider, time: Time, metrics: Metrics, threadNamePrefix: Option[String], brokerEpochSupplier: () => Long, ): AlterPartitionManager = { - val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache) - val channelManager = BrokerToControllerChannelManager( - controllerNodeProvider = nodeProvider, + controllerNodeProvider, time = time, metrics = metrics, config = config, diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index b62d118096b81..d0d2a98b483f9 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -129,7 +129,7 @@ class BrokerServer( var forwardingManager: ForwardingManager = null - var alterIsrManager: AlterPartitionManager = null + var alterPartitionManager: AlterPartitionManager = null var autoTopicCreationManager: AutoTopicCreationManager = null @@ -241,24 +241,17 @@ class BrokerServer( clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas) - val alterIsrChannelManager = BrokerToControllerChannelManager( - controllerNodeProvider, - time, - metrics, + alterPartitionManager = AlterPartitionManager( config, - channelName = "alterIsr", - threadNamePrefix, - retryTimeoutMs = Long.MaxValue - ) - alterIsrManager = new DefaultAlterPartitionManager( - controllerChannelManager = alterIsrChannelManager, + metadataCache, scheduler = kafkaScheduler, + controllerNodeProvider, time = time, - brokerId = config.nodeId, - brokerEpochSupplier = () => lifecycleManager.brokerEpoch, - metadataVersionSupplier = () => metadataCache.metadataVersion() + metrics, + threadNamePrefix, + brokerEpochSupplier = () => lifecycleManager.brokerEpoch ) - alterIsrManager.start() + alterPartitionManager.start() this._replicaManager = new ReplicaManager( config = config, @@ -269,7 +262,7 @@ class BrokerServer( quotaManagers = quotaManagers, metadataCache = metadataCache, logDirFailureChannel = logDirFailureChannel, - alterPartitionManager = alterIsrManager, + alterPartitionManager = alterPartitionManager, brokerTopicStats = brokerTopicStats, isShuttingDown = isShuttingDown, zkClient = None, @@ -343,10 +336,22 @@ class BrokerServer( k -> VersionRange.of(v.min, v.max) }.asJava - lifecycleManager.start(() => metadataListener.highestMetadataOffset, - BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config, - "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong), - metaProps.clusterId, networkListeners, featuresRemapped) + val brokerLifecycleChannelManager = BrokerToControllerChannelManager( + controllerNodeProvider, + time, + metrics, + config, + "heartbeat", + threadNamePrefix, + config.brokerSessionTimeoutMs.toLong + ) + lifecycleManager.start( + () => metadataListener.highestMetadataOffset, + brokerLifecycleChannelManager, + metaProps.clusterId, + networkListeners, + featuresRemapped + ) // Register a listener with the Raft layer to receive metadata event notifications raftManager.register(metadataListener) @@ -544,8 +549,8 @@ class BrokerServer( if (replicaManager != null) CoreUtils.swallow(replicaManager.shutdown(), this) - if (alterIsrManager != null) - CoreUtils.swallow(alterIsrManager.shutdown(), this) + if (alterPartitionManager != null) + CoreUtils.swallow(alterPartitionManager.shutdown(), this) if (clientToControllerChannelManager != null) CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this) diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala index 86395f015c48a..37f3a47e29199 100644 --- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala @@ -165,7 +165,6 @@ class BrokerToControllerChannelManagerImpl( private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ") private val manualMetadataUpdater = new ManualMetadataUpdater() private val apiVersions = new ApiVersions() - private val currentNodeApiVersions = NodeApiVersions.create() private val requestThread = newRequestThread def start(): Unit = { @@ -253,10 +252,7 @@ class BrokerToControllerChannelManagerImpl( def controllerApiVersions(): Option[NodeApiVersions] = { requestThread.activeControllerAddress().flatMap { activeController => - if (activeController.id == config.brokerId) - Some(currentNodeApiVersions) - else - Option(apiVersions.get(activeController.idString)) + Option(apiVersions.get(activeController.idString)) } } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b4e0b9449c054..a9fbda6c21079 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1637,6 +1637,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami distinctRoles } + def isKRaftCoResidentMode: Boolean = { + processRoles == Set(BrokerRole, ControllerRole) + } + def metadataLogDir: String = { Option(getString(KafkaConfig.MetadataLogDirProp)) match { case Some(dir) => dir @@ -2164,7 +2168,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami validateControllerQuorumVotersMustContainNodeIdForKRaftController() validateControllerListenerExistsForKRaftController() validateControllerListenerNamesMustAppearInListenersForKRaftController() - } else if (processRoles == Set(BrokerRole, ControllerRole)) { + } else if (isKRaftCoResidentMode) { // KRaft colocated broker and controller validateNonEmptyQuorumVotersForKRaft() validateControlPlaneListenerEmptyForKRaft() diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 64cf88d4eef3a..6b52511c1bab3 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -140,7 +140,7 @@ class KafkaServer( var clientToControllerChannelManager: BrokerToControllerChannelManager = null - var alterIsrManager: AlterPartitionManager = null + var alterPartitionManager: AlterPartitionManager = null var kafkaScheduler: KafkaScheduler = null @@ -263,6 +263,7 @@ class KafkaServer( logManager.startup(zkClient.getAllTopicsInCluster()) metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion, brokerFeatures) + val controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache) /* initialize feature change listener */ _featureChangeListener = new FinalizedFeatureChangeListener(metadataCache, _zkClient) @@ -276,13 +277,14 @@ class KafkaServer( credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) clientToControllerChannelManager = BrokerToControllerChannelManager( - controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache), + controllerNodeProvider = controllerNodeProvider, time = time, metrics = metrics, config = config, channelName = "forwarding", threadNamePrefix = threadNamePrefix, - retryTimeoutMs = config.requestTimeoutMs.longValue) + retryTimeoutMs = config.requestTimeoutMs.longValue + ) clientToControllerChannelManager.start() /* start forwarding manager */ @@ -309,11 +311,12 @@ class KafkaServer( socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager) // Start alter partition manager based on the IBP version - alterIsrManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) { + alterPartitionManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) { AlterPartitionManager( config = config, metadataCache = metadataCache, scheduler = kafkaScheduler, + controllerNodeProvider, time = time, metrics = metrics, threadNamePrefix = threadNamePrefix, @@ -322,7 +325,7 @@ class KafkaServer( } else { AlterPartitionManager(kafkaScheduler, time, zkClient) } - alterIsrManager.start() + alterPartitionManager.start() // Start replica manager _replicaManager = createReplicaManager(isShuttingDown) @@ -478,7 +481,7 @@ class KafkaServer( quotaManagers = quotaManagers, metadataCache = metadataCache, logDirFailureChannel = logDirFailureChannel, - alterPartitionManager = alterIsrManager, + alterPartitionManager = alterPartitionManager, brokerTopicStats = brokerTopicStats, isShuttingDown = isShuttingDown, zkClient = Some(zkClient), @@ -755,8 +758,8 @@ class KafkaServer( if (replicaManager != null) CoreUtils.swallow(replicaManager.shutdown(), this) - if (alterIsrManager != null) - CoreUtils.swallow(alterIsrManager.shutdown(), this) + if (alterPartitionManager != null) + CoreUtils.swallow(alterPartitionManager.shutdown(), this) if (clientToControllerChannelManager != null) CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this) diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java index 33780f795eb85..63ca13725316d 100644 --- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java +++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java @@ -90,6 +90,10 @@ public void testClusterTemplate() { @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = { @ClusterConfigProperty(key = "foo", value = "baz"), @ClusterConfigProperty(key = "spam", value = "eggz") + }), + @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = { + @ClusterConfigProperty(key = "foo", value = "baz"), + @ClusterConfigProperty(key = "spam", value = "eggz") }) }) public void testClusterTests() { diff --git a/core/src/test/java/kafka/test/annotation/Type.java b/core/src/test/java/kafka/test/annotation/Type.java index 0d1a161dabe92..933ca5011341b 100644 --- a/core/src/test/java/kafka/test/annotation/Type.java +++ b/core/src/test/java/kafka/test/annotation/Type.java @@ -31,7 +31,13 @@ public enum Type { KRAFT { @Override public void invocationContexts(ClusterConfig config, Consumer invocationConsumer) { - invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf())); + invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false)); + } + }, + CO_KRAFT { + @Override + public void invocationContexts(ClusterConfig config, Consumer invocationConsumer) { + invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), true)); } }, ZK { @@ -40,10 +46,11 @@ public void invocationContexts(ClusterConfig config, Consumer invocationConsumer) { - invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf())); + invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false)); + invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), true)); invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf())); } }, diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 73fe67836a3f8..cef71042d3f92 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -65,18 +65,20 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte private final ClusterConfig clusterConfig; private final AtomicReference clusterReference; + private final boolean isCoResident; - public RaftClusterInvocationContext(ClusterConfig clusterConfig) { + public RaftClusterInvocationContext(ClusterConfig clusterConfig, boolean isCoResident) { this.clusterConfig = clusterConfig; this.clusterReference = new AtomicReference<>(); + this.isCoResident = isCoResident; } @Override public String getDisplayName(int invocationIndex) { String clusterDesc = clusterConfig.nameTags().entrySet().stream() - .map(Object::toString) - .collect(Collectors.joining(", ")); - return String.format("[%d] Type=Raft, %s", invocationIndex, clusterDesc); + .map(Object::toString) + .collect(Collectors.joining(", ")); + return String.format("[%d] Type=Raft-%s, %s", invocationIndex, isCoResident ? "CoReside" : "Distributed", clusterDesc); } @Override @@ -86,6 +88,7 @@ public List getAdditionalExtensions() { (BeforeTestExecutionCallback) context -> { TestKitNodes nodes = new TestKitNodes.Builder(). setBootstrapMetadataVersion(clusterConfig.metadataVersion()). + setCoResident(isCoResident). setNumBrokerNodes(clusterConfig.numBrokers()). setNumControllerNodes(clusterConfig.numControllers()).build(); nodes.brokerNodes().forEach((brokerId, brokerNode) -> { diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java index a930bafde6378..c961d71bbe588 100644 --- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java +++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java @@ -150,15 +150,16 @@ public KafkaClusterTestKit build() throws Exception { ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false)); for (ControllerNode node : nodes.controllerNodes().values()) { Map props = new HashMap<>(configProps); - props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller"); + props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id())); props.put(KafkaConfig$.MODULE$.NodeIdProp(), Integer.toString(node.id())); props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(), node.metadataDirectory()); props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), - "CONTROLLER:PLAINTEXT"); - props.put(KafkaConfig$.MODULE$.ListenersProp(), - "CONTROLLER://localhost:0"); + "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); + props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id())); + props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), + nodes.interBrokerListenerName().value()); props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), "CONTROLLER"); // Note: we can't accurately set controller.quorum.voters yet, since we don't @@ -203,7 +204,7 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS } for (BrokerNode node : nodes.brokerNodes().values()) { Map props = new HashMap<>(configProps); - props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker"); + props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id())); props.put(KafkaConfig$.MODULE$.BrokerIdProp(), Integer.toString(node.id())); props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(), @@ -212,8 +213,7 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS String.join(",", node.logDataDirectories())); props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); - props.put(KafkaConfig$.MODULE$.ListenersProp(), - "EXTERNAL://localhost:0"); + props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id())); props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), nodes.interBrokerListenerName().value()); props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), @@ -231,9 +231,15 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS String threadNamePrefix = String.format("broker%d_", node.id()); MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id()); TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0); - KafkaRaftManager raftManager = new KafkaRaftManager<>( + KafkaRaftManager raftManager; + if (raftManagers.containsKey(node.id())) { + raftManager = raftManagers.get(node.id()); + } else { + raftManager = new KafkaRaftManager<>( metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(), Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future); + raftManagers.put(node.id(), raftManager); + } BrokerServer broker = new BrokerServer( config, nodes.brokerProperties(node.id()), @@ -245,7 +251,6 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS connectFutureManager.future ); brokers.put(node.id(), broker); - raftManagers.put(node.id(), raftManager); } } catch (Exception e) { if (executorService != null) { @@ -271,6 +276,26 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS brokers, raftManagers, connectFutureManager, baseDirectory); } + private String listeners(int node) { + if (nodes.isCoResidentNode(node)) { + return "EXTERNAL://localhost:0,CONTROLLER://localhost:0"; + } + if (nodes.controllerNodes().containsKey(node)) { + return "CONTROLLER://localhost:0"; + } + return "EXTERNAL://localhost:0"; + } + + private String roles(int node) { + if (nodes.isCoResidentNode(node)) { + return "broker,controller"; + } + if (nodes.controllerNodes().containsKey(node)) { + return "controller"; + } + return "broker"; + } + static private void setupNodeDirectories(File baseDirectory, String metadataDirectory, Collection logDataDirectories) throws Exception { diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java index f91e62d179815..14692ccc9624d 100644 --- a/core/src/test/java/kafka/testkit/TestKitNodes.java +++ b/core/src/test/java/kafka/testkit/TestKitNodes.java @@ -33,6 +33,7 @@ public class TestKitNodes { public static class Builder { + private boolean coResident = false; private Uuid clusterId = null; private MetadataVersion bootstrapMetadataVersion = null; private final NavigableMap controllerNodes = new TreeMap<>(); @@ -48,6 +49,11 @@ public Builder setBootstrapMetadataVersion(MetadataVersion metadataVersion) { return this; } + public Builder setCoResident(boolean coResident) { + this.coResident = coResident; + return this; + } + public Builder addNodes(TestKitNode[] nodes) { for (TestKitNode node : nodes) { addNode(node); @@ -78,7 +84,7 @@ public Builder setNumControllerNodes(int numControllerNodes) { controllerNodes.pollFirstEntry(); } while (controllerNodes.size() < numControllerNodes) { - int nextId = 3000; + int nextId = startControllerId(); if (!controllerNodes.isEmpty()) { nextId = controllerNodes.lastKey() + 1; } @@ -96,7 +102,7 @@ public Builder setNumBrokerNodes(int numBrokerNodes) { brokerNodes.pollFirstEntry(); } while (brokerNodes.size() < numBrokerNodes) { - int nextId = 0; + int nextId = startBrokerId(); if (!brokerNodes.isEmpty()) { nextId = brokerNodes.lastKey() + 1; } @@ -115,6 +121,17 @@ public TestKitNodes build() { } return new TestKitNodes(clusterId, bootstrapMetadataVersion, controllerNodes, brokerNodes); } + + private int startBrokerId() { + return 0; + } + + private int startControllerId() { + if (coResident) { + return startBrokerId(); + } + return startBrokerId() + 3000; + } } private final Uuid clusterId; @@ -122,6 +139,10 @@ public TestKitNodes build() { private final NavigableMap controllerNodes; private final NavigableMap brokerNodes; + public boolean isCoResidentNode(int node) { + return controllerNodes.containsKey(node) && brokerNodes.containsKey(node); + } + private TestKitNodes(Uuid clusterId, MetadataVersion bootstrapMetadataVersion, NavigableMap controllerNodes, diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala index 785054901d099..aebd479f18a92 100644 --- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala @@ -35,7 +35,7 @@ import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{BeforeEach, Tag} @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 3) +@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3) @Tag("integration") final class LeaderElectionCommandTest(cluster: ClusterInstance) { import LeaderElectionCommandTest._ diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala index b1778ba7dfb38..bc45b72077da0 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala @@ -29,7 +29,7 @@ import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 1) +@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { @BeforeEach diff --git a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala index c1322fe6fe2ec..dc69076619d5b 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala @@ -28,7 +28,7 @@ import org.junit.jupiter.api.extension.ExtendWith import scala.jdk.CollectionConverters._ -@ClusterTestDefaults(clusterType = Type.BOTH) +@ClusterTestDefaults(clusterType = Type.ALL) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) class BrokerMetricNamesTest(cluster: ClusterInstance) { @AfterEach diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala index 71321c1f20885..904fbbc21654e 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala @@ -37,7 +37,7 @@ import org.junit.jupiter.api.extension.ExtendWith import scala.jdk.CollectionConverters._ -@ClusterTestDefaults(clusterType = Type.BOTH) +@ClusterTestDefaults(clusterType = Type.ALL) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @Tag("integration") class ClientQuotasRequestTest(cluster: ClusterInstance) {