From f3e70fe7285bae66ed524a5f72e46b00f898f638 Mon Sep 17 00:00:00 2001 From: likangning <422766572@qq.com> Date: Thu, 19 Sep 2024 10:07:36 +0800 Subject: [PATCH 1/4] finish coding --- .../org/apache/kafka/raft/QuorumConfig.java | 47 ++++--------------- .../kafka/raft/RaftClientTestContext.java | 17 +++---- .../kafka/raft/RaftEventSimulationTest.java | 17 +++---- 3 files changed, 28 insertions(+), 53 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java index 05b27ada396e7..8470551781b45 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java @@ -47,7 +47,7 @@ * controller should be able to transition from standby to active without reloading all of * the metadata. The standby is a "hot" standby, not a "cold" one. */ -public class QuorumConfig { +public class QuorumConfig extends AbstractConfig { private static final String QUORUM_PREFIX = "controller.quorum."; @@ -109,62 +109,35 @@ public class QuorumConfig { .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC) .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC); - private final int requestTimeoutMs; - private final int retryBackoffMs; - private final int electionTimeoutMs; - private final int electionBackoffMaxMs; - private final int fetchTimeoutMs; - private final int appendLingerMs; + private final AbstractConfig config; public QuorumConfig(AbstractConfig abstractConfig) { - this( - abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG), - abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG), - abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG), - abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG), - abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG), - abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG) - ); - } - - public QuorumConfig( - int requestTimeoutMs, - int retryBackoffMs, - int electionTimeoutMs, - int electionBackoffMaxMs, - int fetchTimeoutMs, - int appendLingerMs - ) { - this.requestTimeoutMs = requestTimeoutMs; - this.retryBackoffMs = retryBackoffMs; - this.electionTimeoutMs = electionTimeoutMs; - this.electionBackoffMaxMs = electionBackoffMaxMs; - this.fetchTimeoutMs = fetchTimeoutMs; - this.appendLingerMs = appendLingerMs; + super(CONFIG_DEF, abstractConfig.originals()); + this.config = abstractConfig; } public int requestTimeoutMs() { - return requestTimeoutMs; + return config.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); } public int retryBackoffMs() { - return retryBackoffMs; + return config.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG); } public int electionTimeoutMs() { - return electionTimeoutMs; + return config.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG); } public int electionBackoffMaxMs() { - return electionBackoffMaxMs; + return config.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); } public int fetchTimeoutMs() { - return fetchTimeoutMs; + return config.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); } public int appendLingerMs() { - return appendLingerMs; + return config.getInt(QUORUM_LINGER_MS_CONFIG); } private static Integer parseVoterId(String idString) { diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 0f1cfd6f3c43e..f9f1c9e0f95a0 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.message.AddRaftVoterRequestData; @@ -383,14 +384,14 @@ public RaftClientTestContext build() throws IOException { Endpoints.empty() : this.localListeners; - QuorumConfig quorumConfig = new QuorumConfig( - requestTimeoutMs, - RETRY_BACKOFF_MS, - electionTimeoutMs, - ELECTION_BACKOFF_MAX_MS, - FETCH_TIMEOUT_MS, - appendLingerMs - ); + Map configMap = new HashMap<>(); + configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); + configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); + configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, electionTimeoutMs); + configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, ELECTION_BACKOFF_MAX_MS); + configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, FETCH_TIMEOUT_MS); + configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, appendLingerMs); + QuorumConfig quorumConfig = new QuorumConfig(new AbstractConfig(QuorumConfig.CONFIG_DEF, configMap)); List computedBootstrapServers = bootstrapServers.orElseGet(() -> { if (isStartingVotersStatic) { diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index cd8743635ca3e..7ecd4dc4e222c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.message.ApiMessageType; import org.apache.kafka.common.metrics.Metrics; @@ -763,14 +764,14 @@ void start(int nodeId) { .stream() .collect(Collectors.toMap(Node::id, Cluster::nodeAddress)); - QuorumConfig quorumConfig = new QuorumConfig( - REQUEST_TIMEOUT_MS, - RETRY_BACKOFF_MS, - ELECTION_TIMEOUT_MS, - ELECTION_JITTER_MS, - FETCH_TIMEOUT_MS, - LINGER_MS - ); + Map configMap = new HashMap<>(); + configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS); + configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); + configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, ELECTION_TIMEOUT_MS); + configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, ELECTION_JITTER_MS); + configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, FETCH_TIMEOUT_MS); + configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, LINGER_MS); + QuorumConfig quorumConfig = new QuorumConfig(new AbstractConfig(QuorumConfig.CONFIG_DEF, configMap)); Metrics metrics = new Metrics(time); persistentState.log.reopen(); From 92133406890ad59b7774a58026519e49bcdec001 Mon Sep 17 00:00:00 2001 From: likangning <422766572@qq.com> Date: Fri, 20 Sep 2024 10:13:37 +0800 Subject: [PATCH 2/4] remove extend AbstractConfig --- raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java index 8470551781b45..1ce6474de3529 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java @@ -47,7 +47,7 @@ * controller should be able to transition from standby to active without reloading all of * the metadata. The standby is a "hot" standby, not a "cold" one. */ -public class QuorumConfig extends AbstractConfig { +public class QuorumConfig { private static final String QUORUM_PREFIX = "controller.quorum."; @@ -112,7 +112,6 @@ public class QuorumConfig extends AbstractConfig { private final AbstractConfig config; public QuorumConfig(AbstractConfig abstractConfig) { - super(CONFIG_DEF, abstractConfig.originals()); this.config = abstractConfig; } From 93d4567ea312735580ea040cd169ffc8ca3c8a26 Mon Sep 17 00:00:00 2001 From: likangning <422766572@qq.com> Date: Mon, 23 Sep 2024 20:36:27 +0800 Subject: [PATCH 3/4] remove getter methods from KafkaConfig.scala --- .../main/scala/kafka/raft/RaftManager.scala | 4 +- .../main/scala/kafka/server/KafkaConfig.scala | 19 +++------- .../scala/kafka/server/KafkaRaftServer.scala | 4 +- .../main/scala/kafka/server/KafkaServer.scala | 4 +- .../scala/kafka/tools/TestRaftServer.scala | 4 +- .../unit/kafka/raft/RaftManagerTest.scala | 4 +- .../unit/kafka/server/KafkaConfigTest.scala | 4 +- .../org/apache/kafka/raft/QuorumConfig.java | 38 +++++++++++++++---- 8 files changed, 48 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index d248443e611af..bfde57c6706cf 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -247,7 +247,7 @@ class KafkaRaftManager[T]( private def buildNetworkChannel(): KafkaNetworkChannel = { val (listenerName, netClient) = buildNetworkClient() - new KafkaNetworkChannel(time, listenerName, netClient, config.quorumRequestTimeoutMs, threadNamePrefix) + new KafkaNetworkChannel(time, listenerName, netClient, config.quorumConfig.requestTimeoutMs, threadNamePrefix) } private def createDataDir(): File = { @@ -313,7 +313,7 @@ class KafkaRaftManager[T]( reconnectBackoffMsMs, Selectable.USE_DEFAULT_BUFFER_SIZE, config.socketReceiveBufferBytes, - config.quorumRequestTimeoutMs, + config.quorumConfig.requestTimeoutMs, config.connectionSetupTimeoutMs, config.connectionSetupTimeoutMaxMs, time, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 454308990c82b..be581e89905b1 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -231,6 +231,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) def remoteLogManagerConfig = _remoteLogManagerConfig + private val _quorumConfig = new QuorumConfig(this) + def quorumConfig = _quorumConfig + private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this) def groupCoordinatorConfig: GroupCoordinatorConfig = _groupCoordinatorConfig @@ -664,16 +667,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) def lz4CompressionLevel = getInt(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG) def zstdCompressionLevel = getInt(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG) - /** ********* Raft Quorum Configuration *********/ - val quorumVoters = getList(QuorumConfig.QUORUM_VOTERS_CONFIG) - val quorumBootstrapServers = getList(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG) - val quorumElectionTimeoutMs = getInt(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG) - val quorumFetchTimeoutMs = getInt(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG) - val quorumElectionBackoffMs = getInt(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG) - val quorumLingerMs = getInt(QuorumConfig.QUORUM_LINGER_MS_CONFIG) - val quorumRequestTimeoutMs = getInt(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG) - val quorumRetryBackoffMs = getInt(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG) - /** Internal Configurations **/ val unstableApiVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG) val unstableFeatureVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG) @@ -878,9 +871,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet // validate KRaft-related configs - val voterIds = QuorumConfig.parseVoterIds(quorumVoters) + val voterIds = QuorumConfig.parseVoterIds(quorumConfig.voters) def validateQuorumVotersAndQuorumBootstrapServerForKRaft(): Unit = { - if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) { + if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) { throw new ConfigException( s"""If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must |contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable @@ -889,7 +882,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) } } def validateQuorumVotersAndQuorumBootstrapServerForMigration(): Unit = { - if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) { + if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) { throw new ConfigException( s"""If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must |contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index dbc6f763eb91f..7f56600f8b017 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -70,8 +70,8 @@ class KafkaRaftServer( metaPropsEnsemble, time, metrics, - CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)), - QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers), + CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)), + QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers), new StandardFaultHandlerFactory(), ) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 0a9edc32c6bcb..1ebf6c7e89935 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -429,7 +429,7 @@ class KafkaServer( logger.info("Successfully deleted local metadata log. It will be re-created.") // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller - val quorumVoters = QuorumConfig.parseVoterConnections(config.quorumVoters) + val quorumVoters = QuorumConfig.parseVoterConnections(config.quorumConfig.voters) raftManager = new KafkaRaftManager[ApiMessageAndVersion]( metaPropsEnsemble.clusterId().get(), config, @@ -442,7 +442,7 @@ class KafkaServer( metrics, threadNamePrefix, CompletableFuture.completedFuture(quorumVoters), - QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers), + QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers), // Endpoint information is only needed for KRaft controllers (voters). ZK brokers // (observers) can never be KRaft controllers Endpoints.empty(), diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 070a09f52d6dd..5cc4c249eb367 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -104,8 +104,8 @@ class TestRaftServer( time, metrics, Some(threadNamePrefix), - CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)), - QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers), + CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)), + QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers), endpoints, new ProcessTerminatingFaultHandler.Builder().build() ) diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala index 13a75007417f2..09b35318818e0 100644 --- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala +++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala @@ -128,8 +128,8 @@ class RaftManagerTest { Time.SYSTEM, new Metrics(Time.SYSTEM), Option.empty, - CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)), - QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers), + CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)), + QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers), endpoints, mock(classOf[FaultHandler]) ) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 66550cf140be8..c2fddfc33a53e 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1462,7 +1462,7 @@ class KafkaConfigTest { private def assertValidQuorumVoters(expectedVoters: util.Map[Integer, InetSocketAddress], value: String): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, value) - val addresses = QuorumConfig.parseVoterConnections(KafkaConfig.fromProps(props).quorumVoters) + val addresses = QuorumConfig.parseVoterConnections(KafkaConfig.fromProps(props).quorumConfig.voters) assertEquals(expectedVoters, addresses) } @@ -1477,7 +1477,7 @@ class KafkaConfigTest { props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092") val addresses = QuorumConfig.parseBootstrapServers( - KafkaConfig.fromProps(props).quorumBootstrapServers + KafkaConfig.fromProps(props).quorumConfig.bootstrapServers ) assertEquals(expected, addresses) diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java index 1ce6474de3529..1a7fff83ee6b0 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java @@ -109,34 +109,56 @@ public class QuorumConfig { .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC) .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC); - private final AbstractConfig config; + private final List voters; + private final List bootstrapServers; + private final int requestTimeoutMs; + private final int retryBackoffMs; + private final int electionTimeoutMs; + private final int electionBackoffMaxMs; + private final int fetchTimeoutMs; + private final int appendLingerMs; public QuorumConfig(AbstractConfig abstractConfig) { - this.config = abstractConfig; + this.voters = abstractConfig.getList(QUORUM_VOTERS_CONFIG); + this.bootstrapServers = abstractConfig.getList(QUORUM_BOOTSTRAP_SERVERS_CONFIG); + this.requestTimeoutMs = abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); + this.retryBackoffMs = abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG); + this.electionTimeoutMs = abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG); + this.electionBackoffMaxMs = abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); + this.fetchTimeoutMs = abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); + this.appendLingerMs = abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG); + } + + public List voters() { + return voters; + } + + public List bootstrapServers() { + return bootstrapServers; } public int requestTimeoutMs() { - return config.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); + return requestTimeoutMs; } public int retryBackoffMs() { - return config.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG); + return retryBackoffMs; } public int electionTimeoutMs() { - return config.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG); + return electionTimeoutMs; } public int electionBackoffMaxMs() { - return config.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); + return electionBackoffMaxMs; } public int fetchTimeoutMs() { - return config.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); + return fetchTimeoutMs; } public int appendLingerMs() { - return config.getInt(QUORUM_LINGER_MS_CONFIG); + return appendLingerMs; } private static Integer parseVoterId(String idString) { From 7510bd545a9b2cce341e05ce3f710c24c8fd82bc Mon Sep 17 00:00:00 2001 From: likangning <422766572@qq.com> Date: Wed, 25 Sep 2024 09:10:22 +0800 Subject: [PATCH 4/4] declare the type explicitly --- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index be581e89905b1..237da5c8e3b2e 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -232,7 +232,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) def remoteLogManagerConfig = _remoteLogManagerConfig private val _quorumConfig = new QuorumConfig(this) - def quorumConfig = _quorumConfig + def quorumConfig: QuorumConfig = _quorumConfig private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this)