Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class KafkaRaftManager[T](

private def buildNetworkChannel(): KafkaNetworkChannel = {
val (listenerName, netClient) = buildNetworkClient()
new KafkaNetworkChannel(time, listenerName, netClient, config.quorumRequestTimeoutMs, threadNamePrefix)
new KafkaNetworkChannel(time, listenerName, netClient, config.quorumConfig.requestTimeoutMs, threadNamePrefix)
}

private def createDataDir(): File = {
Expand Down Expand Up @@ -313,7 +313,7 @@ class KafkaRaftManager[T](
reconnectBackoffMsMs,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.socketReceiveBufferBytes,
config.quorumRequestTimeoutMs,
config.quorumConfig.requestTimeoutMs,
config.connectionSetupTimeoutMs,
config.connectionSetupTimeoutMaxMs,
time,
Expand Down
19 changes: 6 additions & 13 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
def remoteLogManagerConfig = _remoteLogManagerConfig

private val _quorumConfig = new QuorumConfig(this)
def quorumConfig: QuorumConfig = _quorumConfig

private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this)

def groupCoordinatorConfig: GroupCoordinatorConfig = _groupCoordinatorConfig
Expand Down Expand Up @@ -664,16 +667,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def lz4CompressionLevel = getInt(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG)
def zstdCompressionLevel = getInt(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG)

/** ********* Raft Quorum Configuration *********/
val quorumVoters = getList(QuorumConfig.QUORUM_VOTERS_CONFIG)
val quorumBootstrapServers = getList(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG)
val quorumElectionTimeoutMs = getInt(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG)
val quorumFetchTimeoutMs = getInt(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG)
val quorumElectionBackoffMs = getInt(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG)
val quorumLingerMs = getInt(QuorumConfig.QUORUM_LINGER_MS_CONFIG)
val quorumRequestTimeoutMs = getInt(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG)
val quorumRetryBackoffMs = getInt(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG)

/** Internal Configurations **/
val unstableApiVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG)
val unstableFeatureVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG)
Expand Down Expand Up @@ -878,9 +871,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet

// validate KRaft-related configs
val voterIds = QuorumConfig.parseVoterIds(quorumVoters)
val voterIds = QuorumConfig.parseVoterIds(quorumConfig.voters)
def validateQuorumVotersAndQuorumBootstrapServerForKRaft(): Unit = {
if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) {
throw new ConfigException(
s"""If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
|contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
Expand All @@ -889,7 +882,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}
}
def validateQuorumVotersAndQuorumBootstrapServerForMigration(): Unit = {
if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) {
throw new ConfigException(
s"""If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
|contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/KafkaRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ class KafkaRaftServer(
metaPropsEnsemble,
time,
metrics,
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
new StandardFaultHandlerFactory(),
)

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ class KafkaServer(
logger.info("Successfully deleted local metadata log. It will be re-created.")

// If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
val quorumVoters = QuorumConfig.parseVoterConnections(config.quorumVoters)
val quorumVoters = QuorumConfig.parseVoterConnections(config.quorumConfig.voters)
raftManager = new KafkaRaftManager[ApiMessageAndVersion](
metaPropsEnsemble.clusterId().get(),
config,
Expand All @@ -442,7 +442,7 @@ class KafkaServer(
metrics,
threadNamePrefix,
CompletableFuture.completedFuture(quorumVoters),
QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
// Endpoint information is only needed for KRaft controllers (voters). ZK brokers
// (observers) can never be KRaft controllers
Endpoints.empty(),
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ class TestRaftServer(
time,
metrics,
Some(threadNamePrefix),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
endpoints,
new ProcessTerminatingFaultHandler.Builder().build()
)
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ class RaftManagerTest {
Time.SYSTEM,
new Metrics(Time.SYSTEM),
Option.empty,
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
endpoints,
mock(classOf[FaultHandler])
)
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1462,7 +1462,7 @@ class KafkaConfigTest {
private def assertValidQuorumVoters(expectedVoters: util.Map[Integer, InetSocketAddress], value: String): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, value)
val addresses = QuorumConfig.parseVoterConnections(KafkaConfig.fromProps(props).quorumVoters)
val addresses = QuorumConfig.parseVoterConnections(KafkaConfig.fromProps(props).quorumConfig.voters)
assertEquals(expectedVoters, addresses)
}

Expand All @@ -1477,7 +1477,7 @@ class KafkaConfigTest {
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092")

val addresses = QuorumConfig.parseBootstrapServers(
KafkaConfig.fromProps(props).quorumBootstrapServers
KafkaConfig.fromProps(props).quorumConfig.bootstrapServers
)

assertEquals(expected, addresses)
Expand Down
38 changes: 16 additions & 22 deletions raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ public class QuorumConfig {
.define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC)
.define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC);

private final List<String> voters;
private final List<String> bootstrapServers;
private final int requestTimeoutMs;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please keep those local variables since they are not dynamic?

private final int retryBackoffMs;
private final int electionTimeoutMs;
Expand All @@ -117,30 +119,22 @@ public class QuorumConfig {
private final int appendLingerMs;

public QuorumConfig(AbstractConfig abstractConfig) {
this(
abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG),
abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG),
abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG),
abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG),
abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG),
abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG)
);
this.voters = abstractConfig.getList(QUORUM_VOTERS_CONFIG);
this.bootstrapServers = abstractConfig.getList(QUORUM_BOOTSTRAP_SERVERS_CONFIG);
this.requestTimeoutMs = abstractConfig.getInt(QUORUM_REQUEST_TIMEOUT_MS_CONFIG);
this.retryBackoffMs = abstractConfig.getInt(QUORUM_RETRY_BACKOFF_MS_CONFIG);
this.electionTimeoutMs = abstractConfig.getInt(QUORUM_ELECTION_TIMEOUT_MS_CONFIG);
this.electionBackoffMaxMs = abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
this.fetchTimeoutMs = abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
this.appendLingerMs = abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG);
}

public QuorumConfig(
int requestTimeoutMs,
int retryBackoffMs,
int electionTimeoutMs,
int electionBackoffMaxMs,
int fetchTimeoutMs,
int appendLingerMs
) {
this.requestTimeoutMs = requestTimeoutMs;
this.retryBackoffMs = retryBackoffMs;
this.electionTimeoutMs = electionTimeoutMs;
this.electionBackoffMaxMs = electionBackoffMaxMs;
this.fetchTimeoutMs = fetchTimeoutMs;
this.appendLingerMs = appendLingerMs;
public List<String> voters() {
return voters;
}

public List<String> bootstrapServers() {
return bootstrapServers;
}

public int requestTimeoutMs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
Expand Down Expand Up @@ -383,14 +384,14 @@ public RaftClientTestContext build() throws IOException {
Endpoints.empty() :
this.localListeners;

QuorumConfig quorumConfig = new QuorumConfig(
requestTimeoutMs,
RETRY_BACKOFF_MS,
electionTimeoutMs,
ELECTION_BACKOFF_MAX_MS,
FETCH_TIMEOUT_MS,
appendLingerMs
);
Map<String, Integer> configMap = new HashMap<>();
configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, electionTimeoutMs);
configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, ELECTION_BACKOFF_MAX_MS);
configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, FETCH_TIMEOUT_MS);
configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, appendLingerMs);
QuorumConfig quorumConfig = new QuorumConfig(new AbstractConfig(QuorumConfig.CONFIG_DEF, configMap));

List<InetSocketAddress> computedBootstrapServers = bootstrapServers.orElseGet(() -> {
if (isStartingVotersStatic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.metrics.Metrics;
Expand Down Expand Up @@ -763,14 +764,14 @@ void start(int nodeId) {
.stream()
.collect(Collectors.toMap(Node::id, Cluster::nodeAddress));

QuorumConfig quorumConfig = new QuorumConfig(
REQUEST_TIMEOUT_MS,
RETRY_BACKOFF_MS,
ELECTION_TIMEOUT_MS,
ELECTION_JITTER_MS,
FETCH_TIMEOUT_MS,
LINGER_MS
);
Map<String, Integer> configMap = new HashMap<>();
configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS);
configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, ELECTION_TIMEOUT_MS);
configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, ELECTION_JITTER_MS);
configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, FETCH_TIMEOUT_MS);
configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, LINGER_MS);
QuorumConfig quorumConfig = new QuorumConfig(new AbstractConfig(QuorumConfig.CONFIG_DEF, configMap));
Metrics metrics = new Metrics(time);

persistentState.log.reopen();
Expand Down