Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ import java.io.File
import java.net.InetSocketAddress
import java.nio.file.Files
import java.nio.file.Paths
import java.util.OptionalInt
import java.util.{OptionalInt, Properties, Collection => JCollection, Map => JMap}
import java.util.concurrent.CompletableFuture
import java.util.{Map => JMap}
import java.util.{Collection => JCollection}
import kafka.log.LogManager
import kafka.log.UnifiedLog
import kafka.server.KafkaConfig
Expand Down Expand Up @@ -157,9 +155,16 @@ class KafkaRaftManager[T](
controllerListeners: Endpoints,
fatalFaultHandler: FaultHandler
) extends RaftManager[T] with Logging {
val props = new Properties()
props.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG))
props.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG))
props.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG))
props.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG))
props.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG))
props.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, config.getInt(QuorumConfig.QUORUM_LINGER_MS_CONFIG))

val apiVersions = new ApiVersions()
private val raftConfig = new QuorumConfig(config)
private val raftConfig = new QuorumConfig(props)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please fix the build error

private val threadNamePrefix = threadNamePrefixOpt.getOrElse("kafka-raft")
private val logContext = new LogContext(s"[RaftManager id=${config.nodeId}] ")
this.logIdent = logContext.logPrefix()
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val quorumLingerMs = getInt(QuorumConfig.QUORUM_LINGER_MS_CONFIG)
val quorumRequestTimeoutMs = getInt(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG)
val quorumRetryBackoffMs = getInt(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG)
private val _quorumConfig = new QuorumConfig(this)
def quorumConfig = _quorumConfig
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please use this to return quorum-related configs and then remove all related getters from KafkaConfig? for example: quorumElectionBackoffMs, quorumFetchTimeoutMs, quorumElectionTimeoutMs, quorumLingerMs and quorumRetryBackoffMs can be removed. quorumRequestTimeoutMs can be removed and its' callers should use QuorumConfig#requestTimeoutMs instead


/** Internal Configurations **/
val unstableApiVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG)
Expand Down
54 changes: 14 additions & 40 deletions raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,14 @@ public class QuorumConfig {
public static final String QUORUM_RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20;

public static final ConfigDef CONFIG_DEF = new ConfigDef()
private final AbstractConfig config;

public QuorumConfig(AbstractConfig config) {
this.config = config;
}

public static ConfigDef configDef() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need this change?

return new ConfigDef()
.define(QUORUM_VOTERS_CONFIG, LIST, DEFAULT_QUORUM_VOTERS, new ControllerQuorumVotersValidator(), HIGH, QUORUM_VOTERS_DOC)
.define(QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST, DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new ControllerQuorumBootstrapServersValidator(), HIGH, QUORUM_BOOTSTRAP_SERVERS_DOC)
.define(QUORUM_ELECTION_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_TIMEOUT_MS, null, HIGH, QUORUM_ELECTION_TIMEOUT_MS_DOC)
Expand All @@ -108,63 +115,30 @@ public class QuorumConfig {
.define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS, null, MEDIUM, QUORUM_LINGER_MS_DOC)
.define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC)
.define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC);

private final int requestTimeoutMs;
private final int retryBackoffMs;
private final int electionTimeoutMs;
private final int electionBackoffMaxMs;
private final int fetchTimeoutMs;
private final int appendLingerMs;

public QuorumConfig(AbstractConfig abstractConfig) {
this(
abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG),
abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG),
abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG),
abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG),
abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG),
abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG)
);
}

public QuorumConfig(
int requestTimeoutMs,
int retryBackoffMs,
int electionTimeoutMs,
int electionBackoffMaxMs,
int fetchTimeoutMs,
int appendLingerMs
) {
this.requestTimeoutMs = requestTimeoutMs;
this.retryBackoffMs = retryBackoffMs;
this.electionTimeoutMs = electionTimeoutMs;
this.electionBackoffMaxMs = electionBackoffMaxMs;
this.fetchTimeoutMs = fetchTimeoutMs;
this.appendLingerMs = appendLingerMs;
}

public int requestTimeoutMs() {
return requestTimeoutMs;
return config.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
}

public int retryBackoffMs() {
return retryBackoffMs;
return config.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
}

public int electionTimeoutMs() {
return electionTimeoutMs;
return config.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
}

public int electionBackoffMaxMs() {
return electionBackoffMaxMs;
return config.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
}

public int fetchTimeoutMs() {
return fetchTimeoutMs;
return config.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
}

public int appendLingerMs() {
return appendLingerMs;
return config.getInt(QUORUM_LINGER_MS_CONFIG);
}

private static Integer parseVoterId(String idString) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Properties;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove unnecessary change

import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -762,14 +763,14 @@ void start(int nodeId) {
.stream()
.collect(Collectors.toMap(Node::id, Cluster::nodeAddress));

QuorumConfig quorumConfig = new QuorumConfig(
REQUEST_TIMEOUT_MS,
RETRY_BACKOFF_MS,
ELECTION_TIMEOUT_MS,
ELECTION_JITTER_MS,
FETCH_TIMEOUT_MS,
LINGER_MS
);
Properties props = new Properties();
props.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS);
props.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
props.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, ELECTION_TIMEOUT_MS);
props.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, ELECTION_JITTER_MS);
props.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, FETCH_TIMEOUT_MS);
props.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, LINGER_MS);
QuorumConfig quorumConfig = new QuorumConfig(props);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please fix compile error

Metrics metrics = new Metrics(time);

persistentState.log.reopen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
ShareGroupConfig.CONFIG_DEF,
TransactionLogConfigs.CONFIG_DEF,
TransactionStateManagerConfigs.CONFIG_DEF,
QuorumConfig.CONFIG_DEF,
QuorumConfig.configDef(),
MetricConfigs.CONFIG_DEF,
QuotaConfigs.CONFIG_DEF,
BrokerSecurityConfigs.CONFIG_DEF,
Expand Down