diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index d248443e611af..bfde57c6706cf 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -247,7 +247,7 @@ class KafkaRaftManager[T]( private def buildNetworkChannel(): KafkaNetworkChannel = { val (listenerName, netClient) = buildNetworkClient() - new KafkaNetworkChannel(time, listenerName, netClient, config.quorumRequestTimeoutMs, threadNamePrefix) + new KafkaNetworkChannel(time, listenerName, netClient, config.quorumConfig.requestTimeoutMs, threadNamePrefix) } private def createDataDir(): File = { @@ -313,7 +313,7 @@ class KafkaRaftManager[T]( reconnectBackoffMsMs, Selectable.USE_DEFAULT_BUFFER_SIZE, config.socketReceiveBufferBytes, - config.quorumRequestTimeoutMs, + config.quorumConfig.requestTimeoutMs, config.connectionSetupTimeoutMs, config.connectionSetupTimeoutMaxMs, time, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 454308990c82b..237da5c8e3b2e 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -231,6 +231,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) def remoteLogManagerConfig = _remoteLogManagerConfig + private val _quorumConfig = new QuorumConfig(this) + def quorumConfig: QuorumConfig = _quorumConfig + private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this) def groupCoordinatorConfig: GroupCoordinatorConfig = _groupCoordinatorConfig @@ -664,16 +667,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) def lz4CompressionLevel = getInt(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG) def zstdCompressionLevel = getInt(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG) - /** ********* Raft Quorum Configuration *********/ - val quorumVoters = getList(QuorumConfig.QUORUM_VOTERS_CONFIG) - val quorumBootstrapServers = getList(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG) - val quorumElectionTimeoutMs = getInt(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG) - val quorumFetchTimeoutMs = getInt(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG) - val quorumElectionBackoffMs = getInt(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG) - val quorumLingerMs = getInt(QuorumConfig.QUORUM_LINGER_MS_CONFIG) - val quorumRequestTimeoutMs = getInt(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG) - val quorumRetryBackoffMs = getInt(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG) - /** Internal Configurations **/ val unstableApiVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG) val unstableFeatureVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG) @@ -878,9 +871,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet // validate KRaft-related configs - val voterIds = QuorumConfig.parseVoterIds(quorumVoters) + val voterIds = QuorumConfig.parseVoterIds(quorumConfig.voters) def validateQuorumVotersAndQuorumBootstrapServerForKRaft(): Unit = { - if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) { + if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) { throw new ConfigException( s"""If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must |contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable @@ -889,7 +882,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) } } def validateQuorumVotersAndQuorumBootstrapServerForMigration(): Unit = { - if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) { + if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) { throw new ConfigException( s"""If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must |contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index dbc6f763eb91f..7f56600f8b017 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -70,8 +70,8 @@ class KafkaRaftServer( metaPropsEnsemble, time, metrics, - CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)), - QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers), + CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)), + QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers), new StandardFaultHandlerFactory(), ) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 0a9edc32c6bcb..1ebf6c7e89935 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -429,7 +429,7 @@ class KafkaServer( logger.info("Successfully deleted local metadata log. It will be re-created.") // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller - val quorumVoters = QuorumConfig.parseVoterConnections(config.quorumVoters) + val quorumVoters = QuorumConfig.parseVoterConnections(config.quorumConfig.voters) raftManager = new KafkaRaftManager[ApiMessageAndVersion]( metaPropsEnsemble.clusterId().get(), config, @@ -442,7 +442,7 @@ class KafkaServer( metrics, threadNamePrefix, CompletableFuture.completedFuture(quorumVoters), - QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers), + QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers), // Endpoint information is only needed for KRaft controllers (voters). ZK brokers // (observers) can never be KRaft controllers Endpoints.empty(), diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 070a09f52d6dd..5cc4c249eb367 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -104,8 +104,8 @@ class TestRaftServer( time, metrics, Some(threadNamePrefix), - CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)), - QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers), + CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)), + QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers), endpoints, new ProcessTerminatingFaultHandler.Builder().build() ) diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala index 13a75007417f2..09b35318818e0 100644 --- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala +++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala @@ -128,8 +128,8 @@ class RaftManagerTest { Time.SYSTEM, new Metrics(Time.SYSTEM), Option.empty, - CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)), - QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers), + CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)), + QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers), endpoints, mock(classOf[FaultHandler]) ) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 66550cf140be8..c2fddfc33a53e 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1462,7 +1462,7 @@ class KafkaConfigTest { private def assertValidQuorumVoters(expectedVoters: util.Map[Integer, InetSocketAddress], value: String): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, value) - val addresses = QuorumConfig.parseVoterConnections(KafkaConfig.fromProps(props).quorumVoters) + val addresses = QuorumConfig.parseVoterConnections(KafkaConfig.fromProps(props).quorumConfig.voters) assertEquals(expectedVoters, addresses) } @@ -1477,7 +1477,7 @@ class KafkaConfigTest { props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092") val addresses = QuorumConfig.parseBootstrapServers( - KafkaConfig.fromProps(props).quorumBootstrapServers + KafkaConfig.fromProps(props).quorumConfig.bootstrapServers ) assertEquals(expected, addresses) diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java index 05b27ada396e7..1a7fff83ee6b0 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java @@ -109,6 +109,8 @@ public class QuorumConfig { .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC) .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC); + private final List voters; + private final List bootstrapServers; private final int requestTimeoutMs; private final int retryBackoffMs; private final int electionTimeoutMs; @@ -117,30 +119,22 @@ public class QuorumConfig { private final int appendLingerMs; public QuorumConfig(AbstractConfig abstractConfig) { - this( - abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG), - abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG), - abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG), - abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG), - abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG), - abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG) - ); + this.voters = abstractConfig.getList(QUORUM_VOTERS_CONFIG); + this.bootstrapServers = abstractConfig.getList(QUORUM_BOOTSTRAP_SERVERS_CONFIG); + this.requestTimeoutMs = abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); + this.retryBackoffMs = abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG); + this.electionTimeoutMs = abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG); + this.electionBackoffMaxMs = abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); + this.fetchTimeoutMs = abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); + this.appendLingerMs = abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG); } - public QuorumConfig( - int requestTimeoutMs, - int retryBackoffMs, - int electionTimeoutMs, - int electionBackoffMaxMs, - int fetchTimeoutMs, - int appendLingerMs - ) { - this.requestTimeoutMs = requestTimeoutMs; - this.retryBackoffMs = retryBackoffMs; - this.electionTimeoutMs = electionTimeoutMs; - this.electionBackoffMaxMs = electionBackoffMaxMs; - this.fetchTimeoutMs = fetchTimeoutMs; - this.appendLingerMs = appendLingerMs; + public List voters() { + return voters; + } + + public List bootstrapServers() { + return bootstrapServers; } public int requestTimeoutMs() { diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 0f1cfd6f3c43e..f9f1c9e0f95a0 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.message.AddRaftVoterRequestData; @@ -383,14 +384,14 @@ public RaftClientTestContext build() throws IOException { Endpoints.empty() : this.localListeners; - QuorumConfig quorumConfig = new QuorumConfig( - requestTimeoutMs, - RETRY_BACKOFF_MS, - electionTimeoutMs, - ELECTION_BACKOFF_MAX_MS, - FETCH_TIMEOUT_MS, - appendLingerMs - ); + Map configMap = new HashMap<>(); + configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); + configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); + configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, electionTimeoutMs); + configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, ELECTION_BACKOFF_MAX_MS); + configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, FETCH_TIMEOUT_MS); + configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, appendLingerMs); + QuorumConfig quorumConfig = new QuorumConfig(new AbstractConfig(QuorumConfig.CONFIG_DEF, configMap)); List computedBootstrapServers = bootstrapServers.orElseGet(() -> { if (isStartingVotersStatic) { diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index cd8743635ca3e..7ecd4dc4e222c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.message.ApiMessageType; import org.apache.kafka.common.metrics.Metrics; @@ -763,14 +764,14 @@ void start(int nodeId) { .stream() .collect(Collectors.toMap(Node::id, Cluster::nodeAddress)); - QuorumConfig quorumConfig = new QuorumConfig( - REQUEST_TIMEOUT_MS, - RETRY_BACKOFF_MS, - ELECTION_TIMEOUT_MS, - ELECTION_JITTER_MS, - FETCH_TIMEOUT_MS, - LINGER_MS - ); + Map configMap = new HashMap<>(); + configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS); + configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); + configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, ELECTION_TIMEOUT_MS); + configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, ELECTION_JITTER_MS); + configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, FETCH_TIMEOUT_MS); + configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, LINGER_MS); + QuorumConfig quorumConfig = new QuorumConfig(new AbstractConfig(QuorumConfig.CONFIG_DEF, configMap)); Metrics metrics = new Metrics(time); persistentState.log.reopen();