From 148510dff9412fcc555d662ed75046ea3f75d9fd Mon Sep 17 00:00:00 2001 From: Johnny Hsu Date: Wed, 19 Jun 2024 00:22:25 +0800 Subject: [PATCH 1/4] refactor with abstract config --- .../org/apache/kafka/raft/QuorumConfig.java | 48 +++++-------------- .../kafka/raft/RaftClientTestContext.java | 12 +---- .../kafka/raft/RaftEventSimulationTest.java | 17 +++---- .../server/config/AbstractKafkaConfig.java | 2 +- 4 files changed, 22 insertions(+), 57 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java index 05b27ada396e7..f2dd3c3c24af7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java @@ -47,7 +47,7 @@ * controller should be able to transition from standby to active without reloading all of * the metadata. The standby is a "hot" standby, not a "cold" one. */ -public class QuorumConfig { +public class QuorumConfig extends AbstractConfig { private static final String QUORUM_PREFIX = "controller.quorum."; @@ -99,7 +99,8 @@ public class QuorumConfig { public static final String QUORUM_RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC; public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20; - public static final ConfigDef CONFIG_DEF = new ConfigDef() + public static ConfigDef configDef() { + return new ConfigDef() .define(QUORUM_VOTERS_CONFIG, LIST, DEFAULT_QUORUM_VOTERS, new ControllerQuorumVotersValidator(), HIGH, QUORUM_VOTERS_DOC) .define(QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST, DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new ControllerQuorumBootstrapServersValidator(), HIGH, QUORUM_BOOTSTRAP_SERVERS_DOC) .define(QUORUM_ELECTION_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_TIMEOUT_MS, null, HIGH, QUORUM_ELECTION_TIMEOUT_MS_DOC) @@ -108,63 +109,36 @@ public class QuorumConfig { .define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS, null, MEDIUM, QUORUM_LINGER_MS_DOC) .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC) .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC); - - private final int requestTimeoutMs; - private final int retryBackoffMs; - private final int electionTimeoutMs; - private final int electionBackoffMaxMs; - private final int fetchTimeoutMs; - private final int appendLingerMs; - - public QuorumConfig(AbstractConfig abstractConfig) { - this( - abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG), - abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG), - abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG), - abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG), - abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG), - abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG) - ); } public QuorumConfig( - int requestTimeoutMs, - int retryBackoffMs, - int electionTimeoutMs, - int electionBackoffMaxMs, - int fetchTimeoutMs, - int appendLingerMs + Map props ) { - this.requestTimeoutMs = requestTimeoutMs; - this.retryBackoffMs = retryBackoffMs; - this.electionTimeoutMs = electionTimeoutMs; - this.electionBackoffMaxMs = electionBackoffMaxMs; - this.fetchTimeoutMs = fetchTimeoutMs; - this.appendLingerMs = appendLingerMs; + super(configDef(), props); } public int requestTimeoutMs() { - return requestTimeoutMs; + return getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); } public int retryBackoffMs() { - return retryBackoffMs; + return getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG); } public int electionTimeoutMs() { - return electionTimeoutMs; + return getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG); } public int electionBackoffMaxMs() { - return electionBackoffMaxMs; + return getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); } public int fetchTimeoutMs() { - return fetchTimeoutMs; + return getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); } public int appendLingerMs() { - return appendLingerMs; + return getInt(QUORUM_LINGER_MS_CONFIG); } private static Integer parseVoterId(String idString) { diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 31d696484f36b..21d1e900c05bb 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -69,17 +69,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.OptionalInt; -import java.util.OptionalLong; -import java.util.Set; +import java.util.*; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.UnaryOperator; diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index c57c441b98b5f..551a12fe320c7 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -60,6 +60,7 @@ import java.util.OptionalLong; import java.util.PriorityQueue; import java.util.Random; +import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -762,14 +763,14 @@ void start(int nodeId) { .stream() .collect(Collectors.toMap(Node::id, Cluster::nodeAddress)); - QuorumConfig quorumConfig = new QuorumConfig( - REQUEST_TIMEOUT_MS, - RETRY_BACKOFF_MS, - ELECTION_TIMEOUT_MS, - ELECTION_JITTER_MS, - FETCH_TIMEOUT_MS, - LINGER_MS - ); + Properties props = new Properties(); + props.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS); + props.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); + props.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, ELECTION_TIMEOUT_MS); + props.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, ELECTION_JITTER_MS); + props.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, FETCH_TIMEOUT_MS); + props.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, LINGER_MS); + QuorumConfig quorumConfig = new QuorumConfig(props); Metrics metrics = new Metrics(time); persistentState.log.reopen(); diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index ea7f3b78ed4dd..22ce26f0d49c6 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -59,7 +59,7 @@ public abstract class AbstractKafkaConfig extends AbstractConfig { ShareGroupConfig.CONFIG_DEF, TransactionLogConfigs.CONFIG_DEF, TransactionStateManagerConfigs.CONFIG_DEF, - QuorumConfig.CONFIG_DEF, + QuorumConfig.configDef(), MetricConfigs.CONFIG_DEF, QuotaConfigs.CONFIG_DEF, BrokerSecurityConfigs.CONFIG_DEF, From 8878b7f42ded1d2277ce7589b718f4bd8c9ee931 Mon Sep 17 00:00:00 2001 From: Johnny Hsu Date: Wed, 19 Jun 2024 01:21:08 +0800 Subject: [PATCH 2/4] fix error --- core/src/main/scala/kafka/raft/RaftManager.scala | 13 +++++++++---- .../apache/kafka/raft/RaftClientTestContext.java | 13 ++++++++++++- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index c57a95b1f8044..12bed9042b4d6 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -20,10 +20,8 @@ import java.io.File import java.net.InetSocketAddress import java.nio.file.Files import java.nio.file.Paths -import java.util.OptionalInt +import java.util.{OptionalInt, Properties, Collection => JCollection, Map => JMap} import java.util.concurrent.CompletableFuture -import java.util.{Map => JMap} -import java.util.{Collection => JCollection} import kafka.log.LogManager import kafka.log.UnifiedLog import kafka.server.KafkaConfig @@ -157,9 +155,16 @@ class KafkaRaftManager[T]( controllerListeners: Endpoints, fatalFaultHandler: FaultHandler ) extends RaftManager[T] with Logging { + val props = new Properties() + props.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG)) + props.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG)) + props.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG)) + props.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG)) + props.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG)) + props.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_LINGER_MS_CONFIG)) val apiVersions = new ApiVersions() - private val raftConfig = new QuorumConfig(config) + private val raftConfig = new QuorumConfig(props) private val threadNamePrefix = threadNamePrefixOpt.getOrElse("kafka-raft") private val logContext = new LogContext(s"[RaftManager id=${config.nodeId}] ") this.logIdent = logContext.logPrefix() diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 21d1e900c05bb..86aa3415cfb12 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -69,7 +69,18 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Properties; +import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.UnaryOperator; From d4b643cfd8ee092b773defc77c8269b232040997 Mon Sep 17 00:00:00 2001 From: Johnny Hsu Date: Wed, 19 Jun 2024 18:27:44 +0800 Subject: [PATCH 3/4] fix error --- .../java/org/apache/kafka/raft/RaftEventSimulationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index 551a12fe320c7..bf5feeb075997 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -59,8 +59,8 @@ import java.util.OptionalInt; import java.util.OptionalLong; import java.util.PriorityQueue; -import java.util.Random; import java.util.Properties; +import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; From d3c729c79c5174de68f2b626c2a4e97b7c72a0fd Mon Sep 17 00:00:00 2001 From: Johnny Hsu Date: Thu, 27 Jun 2024 20:55:03 +0800 Subject: [PATCH 4/4] save temp changes --- .../main/scala/kafka/server/KafkaConfig.scala | 2 ++ .../org/apache/kafka/raft/QuorumConfig.java | 26 +++++++++---------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 6d3587da145d5..a50e8f9f69bd1 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -676,6 +676,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val quorumLingerMs = getInt(QuorumConfig.QUORUM_LINGER_MS_CONFIG) val quorumRequestTimeoutMs = getInt(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG) val quorumRetryBackoffMs = getInt(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG) + private val _quorumConfig = new QuorumConfig(this) + def quorumConfig = _quorumConfig /** Internal Configurations **/ val unstableApiVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG) diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java index f2dd3c3c24af7..730ac5225f27a 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java @@ -47,7 +47,7 @@ * controller should be able to transition from standby to active without reloading all of * the metadata. The standby is a "hot" standby, not a "cold" one. */ -public class QuorumConfig extends AbstractConfig { +public class QuorumConfig { private static final String QUORUM_PREFIX = "controller.quorum."; @@ -99,6 +99,12 @@ public class QuorumConfig extends AbstractConfig { public static final String QUORUM_RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC; public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20; + private final AbstractConfig config; + + public QuorumConfig(AbstractConfig config) { + this.config = config; + } + public static ConfigDef configDef() { return new ConfigDef() .define(QUORUM_VOTERS_CONFIG, LIST, DEFAULT_QUORUM_VOTERS, new ControllerQuorumVotersValidator(), HIGH, QUORUM_VOTERS_DOC) @@ -111,34 +117,28 @@ public static ConfigDef configDef() { .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC); } - public QuorumConfig( - Map props - ) { - super(configDef(), props); - } - public int requestTimeoutMs() { - return getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); + return config.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); } public int retryBackoffMs() { - return getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG); + return config.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG); } public int electionTimeoutMs() { - return getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG); + return config.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG); } public int electionBackoffMaxMs() { - return getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); + return config.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); } public int fetchTimeoutMs() { - return getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); + return config.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); } public int appendLingerMs() { - return getInt(QUORUM_LINGER_MS_CONFIG); + return config.getInt(QUORUM_LINGER_MS_CONFIG); } private static Integer parseVoterId(String idString) {