From 8f795a443b71e32a8995add5c4431e09616e0b7c Mon Sep 17 00:00:00 2001 From: dengziming Date: Thu, 17 Feb 2022 16:30:38 +0800 Subject: [PATCH 1/6] Support co-resident in Test Kit --- .../kafka/test/ClusterTestExtensionsTest.java | 4 ++ .../test/java/kafka/test/annotation/Type.java | 18 +++++++- .../junit/RaftClusterInvocationContext.java | 11 +++-- .../kafka/testkit/KafkaClusterTestKit.java | 43 +++++++++++++++---- .../test/java/kafka/testkit/TestKitNodes.java | 25 ++++++++++- .../admin/LeaderElectionCommandTest.scala | 2 +- .../kafka/server/BrokerMetricNamesTest.scala | 2 +- .../server/ClientQuotasRequestTest.scala | 2 +- 8 files changed, 87 insertions(+), 20 deletions(-) diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java index 33780f795eb85..63ca13725316d 100644 --- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java +++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java @@ -90,6 +90,10 @@ public void testClusterTemplate() { @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = { @ClusterConfigProperty(key = "foo", value = "baz"), @ClusterConfigProperty(key = "spam", value = "eggz") + }), + @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = { + @ClusterConfigProperty(key = "foo", value = "baz"), + @ClusterConfigProperty(key = "spam", value = "eggz") }) }) public void testClusterTests() { diff --git a/core/src/test/java/kafka/test/annotation/Type.java b/core/src/test/java/kafka/test/annotation/Type.java index 0d1a161dabe92..c80907e4d8610 100644 --- a/core/src/test/java/kafka/test/annotation/Type.java +++ b/core/src/test/java/kafka/test/annotation/Type.java @@ -31,7 +31,13 @@ public enum Type { KRAFT { @Override public void invocationContexts(ClusterConfig config, Consumer invocationConsumer) { - invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf())); + invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false)); + } + }, + CO_KRAFT { + @Override + public void invocationContexts(ClusterConfig config, Consumer invocationConsumer) { + invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), true)); } }, ZK { @@ -43,7 +49,15 @@ public void invocationContexts(ClusterConfig config, Consumer invocationConsumer) { - invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf())); + invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false)); + invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf())); + } + }, + ALL { + @Override + public void invocationContexts(ClusterConfig config, Consumer invocationConsumer) { + invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false)); + invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), true)); invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf())); } }, diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 73fe67836a3f8..cef71042d3f92 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -65,18 +65,20 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte private final ClusterConfig clusterConfig; private final AtomicReference clusterReference; + private final boolean isCoResident; - public RaftClusterInvocationContext(ClusterConfig clusterConfig) { + public RaftClusterInvocationContext(ClusterConfig clusterConfig, boolean isCoResident) { this.clusterConfig = clusterConfig; this.clusterReference = new AtomicReference<>(); + this.isCoResident = isCoResident; } @Override public String getDisplayName(int invocationIndex) { String clusterDesc = clusterConfig.nameTags().entrySet().stream() - .map(Object::toString) - .collect(Collectors.joining(", ")); - return String.format("[%d] Type=Raft, %s", invocationIndex, clusterDesc); + .map(Object::toString) + .collect(Collectors.joining(", ")); + return String.format("[%d] Type=Raft-%s, %s", invocationIndex, isCoResident ? "CoReside" : "Distributed", clusterDesc); } @Override @@ -86,6 +88,7 @@ public List getAdditionalExtensions() { (BeforeTestExecutionCallback) context -> { TestKitNodes nodes = new TestKitNodes.Builder(). setBootstrapMetadataVersion(clusterConfig.metadataVersion()). + setCoResident(isCoResident). setNumBrokerNodes(clusterConfig.numBrokers()). setNumControllerNodes(clusterConfig.numControllers()).build(); nodes.brokerNodes().forEach((brokerId, brokerNode) -> { diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java index a930bafde6378..c961d71bbe588 100644 --- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java +++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java @@ -150,15 +150,16 @@ public KafkaClusterTestKit build() throws Exception { ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false)); for (ControllerNode node : nodes.controllerNodes().values()) { Map props = new HashMap<>(configProps); - props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller"); + props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id())); props.put(KafkaConfig$.MODULE$.NodeIdProp(), Integer.toString(node.id())); props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(), node.metadataDirectory()); props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), - "CONTROLLER:PLAINTEXT"); - props.put(KafkaConfig$.MODULE$.ListenersProp(), - "CONTROLLER://localhost:0"); + "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); + props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id())); + props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), + nodes.interBrokerListenerName().value()); props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), "CONTROLLER"); // Note: we can't accurately set controller.quorum.voters yet, since we don't @@ -203,7 +204,7 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS } for (BrokerNode node : nodes.brokerNodes().values()) { Map props = new HashMap<>(configProps); - props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker"); + props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id())); props.put(KafkaConfig$.MODULE$.BrokerIdProp(), Integer.toString(node.id())); props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(), @@ -212,8 +213,7 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS String.join(",", node.logDataDirectories())); props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); - props.put(KafkaConfig$.MODULE$.ListenersProp(), - "EXTERNAL://localhost:0"); + props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id())); props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), nodes.interBrokerListenerName().value()); props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), @@ -231,9 +231,15 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS String threadNamePrefix = String.format("broker%d_", node.id()); MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id()); TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0); - KafkaRaftManager raftManager = new KafkaRaftManager<>( + KafkaRaftManager raftManager; + if (raftManagers.containsKey(node.id())) { + raftManager = raftManagers.get(node.id()); + } else { + raftManager = new KafkaRaftManager<>( metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(), Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future); + raftManagers.put(node.id(), raftManager); + } BrokerServer broker = new BrokerServer( config, nodes.brokerProperties(node.id()), @@ -245,7 +251,6 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS connectFutureManager.future ); brokers.put(node.id(), broker); - raftManagers.put(node.id(), raftManager); } } catch (Exception e) { if (executorService != null) { @@ -271,6 +276,26 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS brokers, raftManagers, connectFutureManager, baseDirectory); } + private String listeners(int node) { + if (nodes.isCoResidentNode(node)) { + return "EXTERNAL://localhost:0,CONTROLLER://localhost:0"; + } + if (nodes.controllerNodes().containsKey(node)) { + return "CONTROLLER://localhost:0"; + } + return "EXTERNAL://localhost:0"; + } + + private String roles(int node) { + if (nodes.isCoResidentNode(node)) { + return "broker,controller"; + } + if (nodes.controllerNodes().containsKey(node)) { + return "controller"; + } + return "broker"; + } + static private void setupNodeDirectories(File baseDirectory, String metadataDirectory, Collection logDataDirectories) throws Exception { diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java index f91e62d179815..14692ccc9624d 100644 --- a/core/src/test/java/kafka/testkit/TestKitNodes.java +++ b/core/src/test/java/kafka/testkit/TestKitNodes.java @@ -33,6 +33,7 @@ public class TestKitNodes { public static class Builder { + private boolean coResident = false; private Uuid clusterId = null; private MetadataVersion bootstrapMetadataVersion = null; private final NavigableMap controllerNodes = new TreeMap<>(); @@ -48,6 +49,11 @@ public Builder setBootstrapMetadataVersion(MetadataVersion metadataVersion) { return this; } + public Builder setCoResident(boolean coResident) { + this.coResident = coResident; + return this; + } + public Builder addNodes(TestKitNode[] nodes) { for (TestKitNode node : nodes) { addNode(node); @@ -78,7 +84,7 @@ public Builder setNumControllerNodes(int numControllerNodes) { controllerNodes.pollFirstEntry(); } while (controllerNodes.size() < numControllerNodes) { - int nextId = 3000; + int nextId = startControllerId(); if (!controllerNodes.isEmpty()) { nextId = controllerNodes.lastKey() + 1; } @@ -96,7 +102,7 @@ public Builder setNumBrokerNodes(int numBrokerNodes) { brokerNodes.pollFirstEntry(); } while (brokerNodes.size() < numBrokerNodes) { - int nextId = 0; + int nextId = startBrokerId(); if (!brokerNodes.isEmpty()) { nextId = brokerNodes.lastKey() + 1; } @@ -115,6 +121,17 @@ public TestKitNodes build() { } return new TestKitNodes(clusterId, bootstrapMetadataVersion, controllerNodes, brokerNodes); } + + private int startBrokerId() { + return 0; + } + + private int startControllerId() { + if (coResident) { + return startBrokerId(); + } + return startBrokerId() + 3000; + } } private final Uuid clusterId; @@ -122,6 +139,10 @@ public TestKitNodes build() { private final NavigableMap controllerNodes; private final NavigableMap brokerNodes; + public boolean isCoResidentNode(int node) { + return controllerNodes.containsKey(node) && brokerNodes.containsKey(node); + } + private TestKitNodes(Uuid clusterId, MetadataVersion bootstrapMetadataVersion, NavigableMap controllerNodes, diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala index 785054901d099..aebd479f18a92 100644 --- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala @@ -35,7 +35,7 @@ import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{BeforeEach, Tag} @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 3) +@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3) @Tag("integration") final class LeaderElectionCommandTest(cluster: ClusterInstance) { import LeaderElectionCommandTest._ diff --git a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala index c1322fe6fe2ec..dc69076619d5b 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala @@ -28,7 +28,7 @@ import org.junit.jupiter.api.extension.ExtendWith import scala.jdk.CollectionConverters._ -@ClusterTestDefaults(clusterType = Type.BOTH) +@ClusterTestDefaults(clusterType = Type.ALL) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) class BrokerMetricNamesTest(cluster: ClusterInstance) { @AfterEach diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala index 71321c1f20885..904fbbc21654e 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala @@ -37,7 +37,7 @@ import org.junit.jupiter.api.extension.ExtendWith import scala.jdk.CollectionConverters._ -@ClusterTestDefaults(clusterType = Type.BOTH) +@ClusterTestDefaults(clusterType = Type.ALL) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @Tag("integration") class ClientQuotasRequestTest(cluster: ClusterInstance) { From 0e76ad93e5ab30e532e00b40ae7222fc05733667 Mon Sep 17 00:00:00 2001 From: dengziming Date: Thu, 26 Aug 2021 10:07:53 +0800 Subject: [PATCH 2/6] KAFKA-13228: Ensure ApiVersionRequest is properly handled in co-resident KRaft --- .../kafka/server/AlterPartitionManager.scala | 18 ++----- .../scala/kafka/server/BrokerServer.scala | 47 ++++++++++++++----- .../BrokerToControllerChannelManager.scala | 12 +++-- .../main/scala/kafka/server/KafkaConfig.scala | 6 ++- .../main/scala/kafka/server/KafkaServer.scala | 40 +++++++++++----- 5 files changed, 77 insertions(+), 46 deletions(-) diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala index 9f89f47e824e0..ee4b6b1292e4d 100644 --- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala +++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala @@ -29,7 +29,6 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.Uuid import org.apache.kafka.common.errors.OperationNotAttemptedException import org.apache.kafka.common.message.AlterPartitionRequestData -import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.RequestHeader import org.apache.kafka.common.requests.{AlterPartitionRequest, AlterPartitionResponse} @@ -77,25 +76,14 @@ object AlterPartitionManager { def apply( config: KafkaConfig, metadataCache: MetadataCache, + alterPartitionChannelManager: BrokerToControllerChannelManager, scheduler: KafkaScheduler, time: Time, - metrics: Metrics, - threadNamePrefix: Option[String], - brokerEpochSupplier: () => Long, + brokerEpochSupplier: () => Long ): AlterPartitionManager = { - val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache) - val channelManager = BrokerToControllerChannelManager( - controllerNodeProvider = nodeProvider, - time = time, - metrics = metrics, - config = config, - channelName = "alterPartition", - threadNamePrefix = threadNamePrefix, - retryTimeoutMs = Long.MaxValue - ) new DefaultAlterPartitionManager( - controllerChannelManager = channelManager, + controllerChannelManager = alterPartitionChannelManager, scheduler = scheduler, time = time, brokerId = config.brokerId, diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index b62d118096b81..2e57e6411cef4 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -34,11 +34,14 @@ import kafka.server.KafkaRaftServer.ControllerRole import kafka.server.metadata.BrokerServerMetrics import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder} import kafka.utils.{CoreUtils, KafkaScheduler} +import org.apache.kafka.clients.NodeApiVersions import org.apache.kafka.common.feature.SupportedVersionRange import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.ApiVersionsResponse import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache @@ -129,7 +132,7 @@ class BrokerServer( var forwardingManager: ForwardingManager = null - var alterIsrManager: AlterPartitionManager = null + var alterPartitionManager: AlterPartitionManager = null var autoTopicCreationManager: AutoTopicCreationManager = null @@ -212,6 +215,11 @@ class BrokerServer( val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes) + val currentNodeControllerApiVersions = if (config.isKRaftCoResidentMode) { + Some(NodeApiVersions.create(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava)) + } else { + None + } clientToControllerChannelManager = BrokerToControllerChannelManager( controllerNodeProvider, @@ -220,7 +228,8 @@ class BrokerServer( config, channelName = "forwarding", threadNamePrefix, - retryTimeoutMs = 60000 + retryTimeoutMs = 60000, + currentNodeControllerApiVersions ) clientToControllerChannelManager.start() forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager) @@ -248,9 +257,10 @@ class BrokerServer( config, channelName = "alterIsr", threadNamePrefix, - retryTimeoutMs = Long.MaxValue + retryTimeoutMs = Long.MaxValue, + currentNodeControllerApiVersions ) - alterIsrManager = new DefaultAlterPartitionManager( + alterPartitionManager = new DefaultAlterPartitionManager( controllerChannelManager = alterIsrChannelManager, scheduler = kafkaScheduler, time = time, @@ -258,7 +268,7 @@ class BrokerServer( brokerEpochSupplier = () => lifecycleManager.brokerEpoch, metadataVersionSupplier = () => metadataCache.metadataVersion() ) - alterIsrManager.start() + alterPartitionManager.start() this._replicaManager = new ReplicaManager( config = config, @@ -269,7 +279,7 @@ class BrokerServer( quotaManagers = quotaManagers, metadataCache = metadataCache, logDirFailureChannel = logDirFailureChannel, - alterPartitionManager = alterIsrManager, + alterPartitionManager = alterPartitionManager, brokerTopicStats = brokerTopicStats, isShuttingDown = isShuttingDown, zkClient = None, @@ -343,10 +353,23 @@ class BrokerServer( k -> VersionRange.of(v.min, v.max) }.asJava - lifecycleManager.start(() => metadataListener.highestMetadataOffset, - BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config, - "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong), - metaProps.clusterId, networkListeners, featuresRemapped) + val brokerLifecycleChannelManager = BrokerToControllerChannelManager( + controllerNodeProvider, + time, + metrics, + config, + "heartbeat", + threadNamePrefix, + config.brokerSessionTimeoutMs.toLong, + currentNodeControllerApiVersions + ) + lifecycleManager.start( + () => metadataListener.highestMetadataOffset, + brokerLifecycleChannelManager, + metaProps.clusterId, + networkListeners, + featuresRemapped + ) // Register a listener with the Raft layer to receive metadata event notifications raftManager.register(metadataListener) @@ -544,8 +567,8 @@ class BrokerServer( if (replicaManager != null) CoreUtils.swallow(replicaManager.shutdown(), this) - if (alterIsrManager != null) - CoreUtils.swallow(alterIsrManager.shutdown(), this) + if (alterPartitionManager != null) + CoreUtils.swallow(alterPartitionManager.shutdown(), this) if (clientToControllerChannelManager != null) CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this) diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala index 86395f015c48a..c29635676b6a4 100644 --- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala @@ -122,7 +122,8 @@ object BrokerToControllerChannelManager { config: KafkaConfig, channelName: String, threadNamePrefix: Option[String], - retryTimeoutMs: Long + retryTimeoutMs: Long, + currentNodeControllerApiVersions: Option[NodeApiVersions] ): BrokerToControllerChannelManager = { new BrokerToControllerChannelManagerImpl( controllerNodeProvider, @@ -131,7 +132,8 @@ object BrokerToControllerChannelManager { config, channelName, threadNamePrefix, - retryTimeoutMs + retryTimeoutMs, + currentNodeControllerApiVersions ) } } @@ -160,12 +162,12 @@ class BrokerToControllerChannelManagerImpl( config: KafkaConfig, channelName: String, threadNamePrefix: Option[String], - retryTimeoutMs: Long + retryTimeoutMs: Long, + currentNodeControllerApiVersions: Option[NodeApiVersions] ) extends BrokerToControllerChannelManager with Logging { private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ") private val manualMetadataUpdater = new ManualMetadataUpdater() private val apiVersions = new ApiVersions() - private val currentNodeApiVersions = NodeApiVersions.create() private val requestThread = newRequestThread def start(): Unit = { @@ -254,7 +256,7 @@ class BrokerToControllerChannelManagerImpl( def controllerApiVersions(): Option[NodeApiVersions] = { requestThread.activeControllerAddress().flatMap { activeController => if (activeController.id == config.brokerId) - Some(currentNodeApiVersions) + currentNodeControllerApiVersions else Option(apiVersions.get(activeController.idString)) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b4e0b9449c054..a9fbda6c21079 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1637,6 +1637,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami distinctRoles } + def isKRaftCoResidentMode: Boolean = { + processRoles == Set(BrokerRole, ControllerRole) + } + def metadataLogDir: String = { Option(getString(KafkaConfig.MetadataLogDirProp)) match { case Some(dir) => dir @@ -2164,7 +2168,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami validateControllerQuorumVotersMustContainNodeIdForKRaftController() validateControllerListenerExistsForKRaftController() validateControllerListenerNamesMustAppearInListenersForKRaftController() - } else if (processRoles == Set(BrokerRole, ControllerRole)) { + } else if (isKRaftCoResidentMode) { // KRaft colocated broker and controller validateNonEmptyQuorumVotersForKRaft() validateControlPlaneListenerEmptyForKRaft() diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 64cf88d4eef3a..6f87fbefff1d2 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -34,14 +34,14 @@ import kafka.security.CredentialProvider import kafka.server.metadata.{ZkConfigRepository, ZkMetadataCache} import kafka.utils._ import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient} -import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils} +import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils, NodeApiVersions} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.message.ControlledShutdownRequestData import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network._ -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse} +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{ApiVersionsResponse, ControlledShutdownRequest, ControlledShutdownResponse} import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.security.{JaasContext, JaasUtils} @@ -140,7 +140,7 @@ class KafkaServer( var clientToControllerChannelManager: BrokerToControllerChannelManager = null - var alterIsrManager: AlterPartitionManager = null + var alterPartitionManager: AlterPartitionManager = null var kafkaScheduler: KafkaScheduler = null @@ -228,6 +228,9 @@ class KafkaServer( logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ") this.logIdent = logContext.logPrefix + val controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache) + val currentNodeControllerApiVersions = Some(NodeApiVersions.create(ApiKeys.zkBrokerApis.asScala.map(ApiVersionsResponse.toApiVersion).asJava)) + // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be // applied after ZkConfigManager starts. config.dynamicConfig.initialize(Some(zkClient)) @@ -276,13 +279,15 @@ class KafkaServer( credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) clientToControllerChannelManager = BrokerToControllerChannelManager( - controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache), + controllerNodeProvider = controllerNodeProvider, time = time, metrics = metrics, config = config, channelName = "forwarding", threadNamePrefix = threadNamePrefix, - retryTimeoutMs = config.requestTimeoutMs.longValue) + retryTimeoutMs = config.requestTimeoutMs.longValue, + currentNodeControllerApiVersions + ) clientToControllerChannelManager.start() /* start forwarding manager */ @@ -309,20 +314,29 @@ class KafkaServer( socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager) // Start alter partition manager based on the IBP version - alterIsrManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) { + alterPartitionManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) { + val alterPartitionChannelManager = BrokerToControllerChannelManager( + controllerNodeProvider, + time = time, + metrics = metrics, + config = config, + channelName = "alterIsr", + threadNamePrefix = threadNamePrefix, + retryTimeoutMs = Long.MaxValue, + currentNodeControllerApiVersions + ) AlterPartitionManager( config = config, metadataCache = metadataCache, + alterPartitionChannelManager, scheduler = kafkaScheduler, time = time, - metrics = metrics, - threadNamePrefix = threadNamePrefix, brokerEpochSupplier = () => kafkaController.brokerEpoch ) } else { AlterPartitionManager(kafkaScheduler, time, zkClient) } - alterIsrManager.start() + alterPartitionManager.start() // Start replica manager _replicaManager = createReplicaManager(isShuttingDown) @@ -478,7 +492,7 @@ class KafkaServer( quotaManagers = quotaManagers, metadataCache = metadataCache, logDirFailureChannel = logDirFailureChannel, - alterPartitionManager = alterIsrManager, + alterPartitionManager = alterPartitionManager, brokerTopicStats = brokerTopicStats, isShuttingDown = isShuttingDown, zkClient = Some(zkClient), @@ -755,8 +769,8 @@ class KafkaServer( if (replicaManager != null) CoreUtils.swallow(replicaManager.shutdown(), this) - if (alterIsrManager != null) - CoreUtils.swallow(alterIsrManager.shutdown(), this) + if (alterPartitionManager != null) + CoreUtils.swallow(alterPartitionManager.shutdown(), this) if (clientToControllerChannelManager != null) CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this) From ac07a785415fccedce6a4a72a15c58649521fab5 Mon Sep 17 00:00:00 2001 From: dengziming Date: Wed, 25 May 2022 17:29:59 +0800 Subject: [PATCH 3/6] Enable Kraft co-resident mode in ApiVersionTest --- core/src/test/java/kafka/test/annotation/Type.java | 7 ------- .../scala/unit/kafka/server/ApiVersionsRequestTest.scala | 2 +- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/core/src/test/java/kafka/test/annotation/Type.java b/core/src/test/java/kafka/test/annotation/Type.java index c80907e4d8610..933ca5011341b 100644 --- a/core/src/test/java/kafka/test/annotation/Type.java +++ b/core/src/test/java/kafka/test/annotation/Type.java @@ -46,13 +46,6 @@ public void invocationContexts(ClusterConfig config, Consumer invocationConsumer) { - invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false)); - invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf())); - } - }, ALL { @Override public void invocationContexts(ClusterConfig config, Consumer invocationConsumer) { diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala index b1778ba7dfb38..bc45b72077da0 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala @@ -29,7 +29,7 @@ import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 1) +@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { @BeforeEach From d4b9a04f2aeaf4a3bd0971274e50ca7393d836f1 Mon Sep 17 00:00:00 2001 From: dengziming Date: Thu, 30 Jun 2022 11:25:26 +0800 Subject: [PATCH 4/6] resolve comments --- .../kafka/server/AlterPartitionManager.scala | 4 ++-- .../main/scala/kafka/server/BrokerServer.scala | 17 +++-------------- .../BrokerToControllerChannelManager.scala | 14 ++++---------- .../main/scala/kafka/server/KafkaServer.scala | 13 +++++-------- 4 files changed, 14 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala index ee4b6b1292e4d..9bc24edbd9b72 100644 --- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala +++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala @@ -76,14 +76,14 @@ object AlterPartitionManager { def apply( config: KafkaConfig, metadataCache: MetadataCache, - alterPartitionChannelManager: BrokerToControllerChannelManager, + channelManager: BrokerToControllerChannelManager, scheduler: KafkaScheduler, time: Time, brokerEpochSupplier: () => Long ): AlterPartitionManager = { new DefaultAlterPartitionManager( - controllerChannelManager = alterPartitionChannelManager, + controllerChannelManager = channelManager, scheduler = scheduler, time = time, brokerId = config.brokerId, diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 2e57e6411cef4..a6e0cb375999d 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -34,14 +34,11 @@ import kafka.server.KafkaRaftServer.ControllerRole import kafka.server.metadata.BrokerServerMetrics import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder} import kafka.utils.{CoreUtils, KafkaScheduler} -import org.apache.kafka.clients.NodeApiVersions import org.apache.kafka.common.feature.SupportedVersionRange import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.protocol.ApiKeys -import org.apache.kafka.common.requests.ApiVersionsResponse import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache @@ -215,11 +212,6 @@ class BrokerServer( val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes) - val currentNodeControllerApiVersions = if (config.isKRaftCoResidentMode) { - Some(NodeApiVersions.create(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava)) - } else { - None - } clientToControllerChannelManager = BrokerToControllerChannelManager( controllerNodeProvider, @@ -228,8 +220,7 @@ class BrokerServer( config, channelName = "forwarding", threadNamePrefix, - retryTimeoutMs = 60000, - currentNodeControllerApiVersions + retryTimeoutMs = 60000 ) clientToControllerChannelManager.start() forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager) @@ -257,8 +248,7 @@ class BrokerServer( config, channelName = "alterIsr", threadNamePrefix, - retryTimeoutMs = Long.MaxValue, - currentNodeControllerApiVersions + retryTimeoutMs = Long.MaxValue ) alterPartitionManager = new DefaultAlterPartitionManager( controllerChannelManager = alterIsrChannelManager, @@ -360,8 +350,7 @@ class BrokerServer( config, "heartbeat", threadNamePrefix, - config.brokerSessionTimeoutMs.toLong, - currentNodeControllerApiVersions + config.brokerSessionTimeoutMs.toLong ) lifecycleManager.start( () => metadataListener.highestMetadataOffset, diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala index c29635676b6a4..37f3a47e29199 100644 --- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala @@ -122,8 +122,7 @@ object BrokerToControllerChannelManager { config: KafkaConfig, channelName: String, threadNamePrefix: Option[String], - retryTimeoutMs: Long, - currentNodeControllerApiVersions: Option[NodeApiVersions] + retryTimeoutMs: Long ): BrokerToControllerChannelManager = { new BrokerToControllerChannelManagerImpl( controllerNodeProvider, @@ -132,8 +131,7 @@ object BrokerToControllerChannelManager { config, channelName, threadNamePrefix, - retryTimeoutMs, - currentNodeControllerApiVersions + retryTimeoutMs ) } } @@ -162,8 +160,7 @@ class BrokerToControllerChannelManagerImpl( config: KafkaConfig, channelName: String, threadNamePrefix: Option[String], - retryTimeoutMs: Long, - currentNodeControllerApiVersions: Option[NodeApiVersions] + retryTimeoutMs: Long ) extends BrokerToControllerChannelManager with Logging { private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ") private val manualMetadataUpdater = new ManualMetadataUpdater() @@ -255,10 +252,7 @@ class BrokerToControllerChannelManagerImpl( def controllerApiVersions(): Option[NodeApiVersions] = { requestThread.activeControllerAddress().flatMap { activeController => - if (activeController.id == config.brokerId) - currentNodeControllerApiVersions - else - Option(apiVersions.get(activeController.idString)) + Option(apiVersions.get(activeController.idString)) } } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 6f87fbefff1d2..01561ce58d610 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -34,14 +34,14 @@ import kafka.security.CredentialProvider import kafka.server.metadata.{ZkConfigRepository, ZkMetadataCache} import kafka.utils._ import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient} -import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils, NodeApiVersions} +import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.message.ControlledShutdownRequestData import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network._ -import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.{ApiVersionsResponse, ControlledShutdownRequest, ControlledShutdownResponse} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse} import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.security.{JaasContext, JaasUtils} @@ -229,7 +229,6 @@ class KafkaServer( this.logIdent = logContext.logPrefix val controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache) - val currentNodeControllerApiVersions = Some(NodeApiVersions.create(ApiKeys.zkBrokerApis.asScala.map(ApiVersionsResponse.toApiVersion).asJava)) // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be // applied after ZkConfigManager starts. @@ -285,8 +284,7 @@ class KafkaServer( config = config, channelName = "forwarding", threadNamePrefix = threadNamePrefix, - retryTimeoutMs = config.requestTimeoutMs.longValue, - currentNodeControllerApiVersions + retryTimeoutMs = config.requestTimeoutMs.longValue ) clientToControllerChannelManager.start() @@ -322,8 +320,7 @@ class KafkaServer( config = config, channelName = "alterIsr", threadNamePrefix = threadNamePrefix, - retryTimeoutMs = Long.MaxValue, - currentNodeControllerApiVersions + retryTimeoutMs = Long.MaxValue ) AlterPartitionManager( config = config, From c631dae0e1c1a69ae1e7cfd2ea162a2e15bf8344 Mon Sep 17 00:00:00 2001 From: dengziming Date: Fri, 1 Jul 2022 10:38:36 +0800 Subject: [PATCH 5/6] resolve comments --- .../kafka/server/AlterPartitionManager.scala | 17 ++++++++++++++--- .../scala/kafka/server/BrokerServer.scala | 19 ++++++------------- .../main/scala/kafka/server/KafkaServer.scala | 13 +++---------- 3 files changed, 23 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala index 9bc24edbd9b72..61305f2410135 100644 --- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala +++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala @@ -29,6 +29,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.Uuid import org.apache.kafka.common.errors.OperationNotAttemptedException import org.apache.kafka.common.message.AlterPartitionRequestData +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.RequestHeader import org.apache.kafka.common.requests.{AlterPartitionRequest, AlterPartitionResponse} @@ -76,12 +77,22 @@ object AlterPartitionManager { def apply( config: KafkaConfig, metadataCache: MetadataCache, - channelManager: BrokerToControllerChannelManager, scheduler: KafkaScheduler, + controllerNodeProvider: ControllerNodeProvider, time: Time, - brokerEpochSupplier: () => Long + metrics: Metrics, + threadNamePrefix: Option[String], + brokerEpochSupplier: () => Long, ): AlterPartitionManager = { - + val channelManager = BrokerToControllerChannelManager( + controllerNodeProvider, + time = time, + metrics = metrics, + config = config, + channelName = "alterPartition", + threadNamePrefix = threadNamePrefix, + retryTimeoutMs = Long.MaxValue + ) new DefaultAlterPartitionManager( controllerChannelManager = channelManager, scheduler = scheduler, diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index a6e0cb375999d..d0d2a98b483f9 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -241,22 +241,15 @@ class BrokerServer( clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas) - val alterIsrChannelManager = BrokerToControllerChannelManager( - controllerNodeProvider, - time, - metrics, + alterPartitionManager = AlterPartitionManager( config, - channelName = "alterIsr", - threadNamePrefix, - retryTimeoutMs = Long.MaxValue - ) - alterPartitionManager = new DefaultAlterPartitionManager( - controllerChannelManager = alterIsrChannelManager, + metadataCache, scheduler = kafkaScheduler, + controllerNodeProvider, time = time, - brokerId = config.nodeId, - brokerEpochSupplier = () => lifecycleManager.brokerEpoch, - metadataVersionSupplier = () => metadataCache.metadataVersion() + metrics, + threadNamePrefix, + brokerEpochSupplier = () => lifecycleManager.brokerEpoch ) alterPartitionManager.start() diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 01561ce58d610..80323254f67f4 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -313,21 +313,14 @@ class KafkaServer( // Start alter partition manager based on the IBP version alterPartitionManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) { - val alterPartitionChannelManager = BrokerToControllerChannelManager( - controllerNodeProvider, - time = time, - metrics = metrics, - config = config, - channelName = "alterIsr", - threadNamePrefix = threadNamePrefix, - retryTimeoutMs = Long.MaxValue - ) AlterPartitionManager( config = config, metadataCache = metadataCache, - alterPartitionChannelManager, scheduler = kafkaScheduler, + controllerNodeProvider, time = time, + metrics = metrics, + threadNamePrefix = threadNamePrefix, brokerEpochSupplier = () => kafkaController.brokerEpoch ) } else { From 58af486f86d13b2d114af4edf849136a10edc6d0 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 1 Jul 2022 09:59:47 -0700 Subject: [PATCH 6/6] Create controller node provider after metadata cache --- core/src/main/scala/kafka/server/KafkaServer.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 80323254f67f4..6b52511c1bab3 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -228,8 +228,6 @@ class KafkaServer( logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ") this.logIdent = logContext.logPrefix - val controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache) - // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be // applied after ZkConfigManager starts. config.dynamicConfig.initialize(Some(zkClient)) @@ -265,6 +263,7 @@ class KafkaServer( logManager.startup(zkClient.getAllTopicsInCluster()) metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion, brokerFeatures) + val controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache) /* initialize feature change listener */ _featureChangeListener = new FinalizedFeatureChangeListener(metadataCache, _zkClient)