diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index c57a95b1f8044..12bed9042b4d6 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -20,10 +20,8 @@ import java.io.File import java.net.InetSocketAddress import java.nio.file.Files import java.nio.file.Paths -import java.util.OptionalInt +import java.util.{OptionalInt, Properties, Collection => JCollection, Map => JMap} import java.util.concurrent.CompletableFuture -import java.util.{Map => JMap} -import java.util.{Collection => JCollection} import kafka.log.LogManager import kafka.log.UnifiedLog import kafka.server.KafkaConfig @@ -157,9 +155,16 @@ class KafkaRaftManager[T]( controllerListeners: Endpoints, fatalFaultHandler: FaultHandler ) extends RaftManager[T] with Logging { + val props = new Properties() + props.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG)) + props.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG)) + props.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG)) + props.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG)) + props.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG)) + props.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_LINGER_MS_CONFIG)) val apiVersions = new ApiVersions() - private val raftConfig = new QuorumConfig(config) + private val raftConfig = new QuorumConfig(props) private val threadNamePrefix = threadNamePrefixOpt.getOrElse("kafka-raft") private val logContext = new LogContext(s"[RaftManager id=${config.nodeId}] ") this.logIdent = logContext.logPrefix() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 6d3587da145d5..a50e8f9f69bd1 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -676,6 +676,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val quorumLingerMs = getInt(QuorumConfig.QUORUM_LINGER_MS_CONFIG) val quorumRequestTimeoutMs = getInt(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG) val quorumRetryBackoffMs = getInt(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG) + private val _quorumConfig = new QuorumConfig(this) + def quorumConfig = _quorumConfig /** Internal Configurations **/ val unstableApiVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG) diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java index 05b27ada396e7..730ac5225f27a 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java @@ -99,7 +99,14 @@ public class QuorumConfig { public static final String QUORUM_RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC; public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20; - public static final ConfigDef CONFIG_DEF = new ConfigDef() + private final AbstractConfig config; + + public QuorumConfig(AbstractConfig config) { + this.config = config; + } + + public static ConfigDef configDef() { + return new ConfigDef() .define(QUORUM_VOTERS_CONFIG, LIST, DEFAULT_QUORUM_VOTERS, new ControllerQuorumVotersValidator(), HIGH, QUORUM_VOTERS_DOC) .define(QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST, DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new ControllerQuorumBootstrapServersValidator(), HIGH, QUORUM_BOOTSTRAP_SERVERS_DOC) .define(QUORUM_ELECTION_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_TIMEOUT_MS, null, HIGH, QUORUM_ELECTION_TIMEOUT_MS_DOC) @@ -108,63 +115,30 @@ public class QuorumConfig { .define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS, null, MEDIUM, QUORUM_LINGER_MS_DOC) .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC) .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC); - - private final int requestTimeoutMs; - private final int retryBackoffMs; - private final int electionTimeoutMs; - private final int electionBackoffMaxMs; - private final int fetchTimeoutMs; - private final int appendLingerMs; - - public QuorumConfig(AbstractConfig abstractConfig) { - this( - abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG), - abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG), - abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG), - abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG), - abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG), - abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG) - ); - } - - public QuorumConfig( - int requestTimeoutMs, - int retryBackoffMs, - int electionTimeoutMs, - int electionBackoffMaxMs, - int fetchTimeoutMs, - int appendLingerMs - ) { - this.requestTimeoutMs = requestTimeoutMs; - this.retryBackoffMs = retryBackoffMs; - this.electionTimeoutMs = electionTimeoutMs; - this.electionBackoffMaxMs = electionBackoffMaxMs; - this.fetchTimeoutMs = fetchTimeoutMs; - this.appendLingerMs = appendLingerMs; } public int requestTimeoutMs() { - return requestTimeoutMs; + return config.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG); } public int retryBackoffMs() { - return retryBackoffMs; + return config.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG); } public int electionTimeoutMs() { - return electionTimeoutMs; + return config.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG); } public int electionBackoffMaxMs() { - return electionBackoffMaxMs; + return config.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG); } public int fetchTimeoutMs() { - return fetchTimeoutMs; + return config.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG); } public int appendLingerMs() { - return appendLingerMs; + return config.getInt(QUORUM_LINGER_MS_CONFIG); } private static Integer parseVoterId(String idString) { diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 31d696484f36b..86aa3415cfb12 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -79,6 +79,7 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; +import java.util.Properties; import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index c57c441b98b5f..bf5feeb075997 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -59,6 +59,7 @@ import java.util.OptionalInt; import java.util.OptionalLong; import java.util.PriorityQueue; +import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -762,14 +763,14 @@ void start(int nodeId) { .stream() .collect(Collectors.toMap(Node::id, Cluster::nodeAddress)); - QuorumConfig quorumConfig = new QuorumConfig( - REQUEST_TIMEOUT_MS, - RETRY_BACKOFF_MS, - ELECTION_TIMEOUT_MS, - ELECTION_JITTER_MS, - FETCH_TIMEOUT_MS, - LINGER_MS - ); + Properties props = new Properties(); + props.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS); + props.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); + props.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, ELECTION_TIMEOUT_MS); + props.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, ELECTION_JITTER_MS); + props.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, FETCH_TIMEOUT_MS); + props.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, LINGER_MS); + QuorumConfig quorumConfig = new QuorumConfig(props); Metrics metrics = new Metrics(time); persistentState.log.reopen(); diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index ea7f3b78ed4dd..22ce26f0d49c6 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -59,7 +59,7 @@ public abstract class AbstractKafkaConfig extends AbstractConfig { ShareGroupConfig.CONFIG_DEF, TransactionLogConfigs.CONFIG_DEF, TransactionStateManagerConfigs.CONFIG_DEF, - QuorumConfig.CONFIG_DEF, + QuorumConfig.configDef(), MetricConfigs.CONFIG_DEF, QuotaConfigs.CONFIG_DEF, BrokerSecurityConfigs.CONFIG_DEF,