Skip to content
Merged
2 changes: 1 addition & 1 deletion checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@
<allow pkg="org.apache.kafka.tools" />
<allow pkg="org.apache.kafka.server.config" />
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
<allow class="org.apache.kafka.coordinator.transaction.TransactionLogConfigs" />
<allow class="org.apache.kafka.coordinator.transaction.TransactionLogConfig" />
<allow pkg="org.apache.kafka.coordinator.group" />
<allow pkg="org.apache.kafka.network" />
</subpackage>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ object TransactionCoordinator {
metadataCache: MetadataCache,
time: Time): TransactionCoordinator = {

val txnConfig = TransactionConfig(config.transactionalIdExpirationMs,
config.transactionMaxTimeoutMs,
config.transactionTopicPartitions,
config.transactionTopicReplicationFactor,
config.transactionTopicSegmentBytes,
config.transactionsLoadBufferSize,
config.transactionTopicMinISR,
config.transactionAbortTimedOutTransactionCleanupIntervalMs,
config.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
val txnConfig = TransactionConfig(config.transactionStateManagerConfig.transactionalIdExpirationMs,
config.transactionStateManagerConfig.transactionMaxTimeoutMs,
config.transactionLogConfig.transactionTopicPartitions,
config.transactionLogConfig.transactionTopicReplicationFactor,
config.transactionLogConfig.transactionTopicSegmentBytes,
config.transactionLogConfig.transactionsLoadBufferSize,
config.transactionLogConfig.transactionTopicMinISR,
config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
config.requestTimeoutMs)

val txnStateManager = new TransactionStateManager(config.brokerId, scheduler, replicaManager, metadataCache, txnConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.server.common.TransactionVersion
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.record.BrokerCompressionType
Expand Down Expand Up @@ -92,13 +92,13 @@ class TransactionStateManager(brokerId: Int,
@volatile private var transactionTopicPartitionCount: Int = _

/** setup metrics*/
private val partitionLoadSensor = metrics.sensor(TransactionStateManagerConfigs.LOAD_TIME_SENSOR)
private val partitionLoadSensor = metrics.sensor(TransactionStateManagerConfig.LOAD_TIME_SENSOR)

partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
TransactionStateManagerConfigs.METRICS_GROUP,
TransactionStateManagerConfig.METRICS_GROUP,
"The max time it took to load the partitions in the last 30sec"), new Max())
partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
TransactionStateManagerConfigs.METRICS_GROUP,
TransactionStateManagerConfig.METRICS_GROUP,
"The avg time it took to load the partitions in the last 30sec"), new Avg())

private[transaction] def usesFlexibleRecords(): Boolean = {
Expand Down Expand Up @@ -809,15 +809,15 @@ private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int,
private[transaction] case class CoordinatorEpochAndTxnMetadata(coordinatorEpoch: Int,
transactionMetadata: TransactionMetadata)

private[transaction] case class TransactionConfig(transactionalIdExpirationMs: Int = TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT,
transactionMaxTimeoutMs: Int = TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT,
transactionLogNumPartitions: Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT,
transactionLogReplicationFactor: Short = TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT,
transactionLogSegmentBytes: Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT,
transactionLogLoadBufferSize: Int = TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT,
transactionLogMinInsyncReplicas: Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT,
abortTimedOutTransactionsIntervalMs: Int = TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
removeExpiredTransactionalIdsIntervalMs: Int = TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
private[transaction] case class TransactionConfig(transactionalIdExpirationMs: Int = TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT,
transactionMaxTimeoutMs: Int = TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT,
transactionLogNumPartitions: Int = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT,
transactionLogReplicationFactor: Short = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT,
transactionLogSegmentBytes: Int = TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT,
transactionLogLoadBufferSize: Int = TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT,
transactionLogMinInsyncReplicas: Int = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT,
abortTimedOutTransactionsIntervalMs: Int = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
removeExpiredTransactionalIdsIntervalMs: Int = TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
requestTimeoutMs: Int = ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT)

case class TransactionalIdAndProducerIdEpoch(transactionalId: String, producerId: Long, producerEpoch: Short) {
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1588,9 +1588,9 @@ object LogManager {
flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,
maxTransactionTimeoutMs = config.transactionMaxTimeoutMs,
producerStateManagerConfig = new ProducerStateManagerConfig(config.producerIdExpirationMs, config.transactionPartitionVerificationEnable),
producerIdExpirationCheckIntervalMs = config.producerIdExpirationCheckIntervalMs,
maxTransactionTimeoutMs = config.transactionStateManagerConfig.transactionMaxTimeoutMs,
producerStateManagerConfig = new ProducerStateManagerConfig(config.transactionLogConfig.producerIdExpirationMs, config.transactionLogConfig.transactionPartitionVerificationEnable),
producerIdExpirationCheckIntervalMs = config.transactionLogConfig.producerIdExpirationCheckIntervalMs,
scheduler = kafkaScheduler,
brokerTopicStats = brokerTopicStats,
logDirFailureChannel = logDirFailureChannel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ class DefaultAutoTopicCreationManager(
case TRANSACTION_STATE_TOPIC_NAME =>
new CreatableTopic()
.setName(topic)
.setNumPartitions(config.transactionTopicPartitions)
.setReplicationFactor(config.transactionTopicReplicationFactor)
.setNumPartitions(config.transactionLogConfig.transactionTopicPartitions)
.setReplicationFactor(config.transactionLogConfig.transactionTopicReplicationFactor)
.setConfigs(convertToTopicConfigCollections(
txnCoordinator.transactionTopicConfigs))
case topicName =>
Expand Down
20 changes: 10 additions & 10 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.{ConfigUtils, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ProcessRole
Expand Down Expand Up @@ -87,7 +87,7 @@ import scala.jdk.CollectionConverters._
object DynamicBrokerConfig {

private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
private[server] val DynamicProducerStateManagerConfig = Set(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)
private[server] val DynamicProducerStateManagerConfig = Set(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)

val AllDynamicConfigs = DynamicSecurityConfigs ++
LogCleaner.ReconfigurableConfigs ++
Expand Down Expand Up @@ -1128,19 +1128,19 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi

class DynamicProducerStateManagerConfig(val producerStateManagerConfig: ProducerStateManagerConfig) extends BrokerReconfigurable with Logging {
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
if (producerStateManagerConfig.producerIdExpirationMs != newConfig.producerIdExpirationMs) {
info(s"Reconfigure ${TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG} from ${producerStateManagerConfig.producerIdExpirationMs} to ${newConfig.producerIdExpirationMs}")
producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs)
if (producerStateManagerConfig.producerIdExpirationMs != newConfig.transactionLogConfig.producerIdExpirationMs) {
info(s"Reconfigure ${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} from ${producerStateManagerConfig.producerIdExpirationMs} to ${newConfig.transactionLogConfig.producerIdExpirationMs}")
producerStateManagerConfig.setProducerIdExpirationMs(newConfig.transactionLogConfig.producerIdExpirationMs)
}
if (producerStateManagerConfig.transactionVerificationEnabled != newConfig.transactionPartitionVerificationEnable) {
info(s"Reconfigure ${TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from ${producerStateManagerConfig.transactionVerificationEnabled} to ${newConfig.transactionPartitionVerificationEnable}")
producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionPartitionVerificationEnable)
if (producerStateManagerConfig.transactionVerificationEnabled != newConfig.transactionLogConfig.transactionPartitionVerificationEnable) {
info(s"Reconfigure ${TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from ${producerStateManagerConfig.transactionVerificationEnabled} to ${newConfig.transactionLogConfig.transactionPartitionVerificationEnable}")
producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionLogConfig.transactionPartitionVerificationEnable)
}
}

def validateReconfiguration(newConfig: KafkaConfig): Unit = {
if (newConfig.producerIdExpirationMs < 0)
throw new ConfigException(s"${TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG} cannot be less than 0, current value is ${producerStateManagerConfig.producerIdExpirationMs}, and new value is ${newConfig.producerIdExpirationMs}")
if (newConfig.transactionLogConfig.producerIdExpirationMs < 0)
throw new ConfigException(s"${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} cannot be less than 0, current value is ${producerStateManagerConfig.producerIdExpirationMs}, and new value is ${newConfig.transactionLogConfig.producerIdExpirationMs}")
}

override def reconfigurableConfigs: Set[String] = DynamicProducerStateManagerConfig
Expand Down
23 changes: 6 additions & 17 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AuthorizerUtils
Expand Down Expand Up @@ -238,6 +238,11 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
private val _shareGroupConfig = new ShareGroupConfig(this)
def shareGroupConfig: ShareGroupConfig = _shareGroupConfig

private val _transactionLogConfig = new TransactionLogConfig(this)
private val _transactionStateManagerConfig = new TransactionStateManagerConfig(this)
def transactionLogConfig: TransactionLogConfig = _transactionLogConfig
def transactionStateManagerConfig: TransactionStateManagerConfig = _transactionStateManagerConfig

private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = {
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
// Need to translate any system property value from true/false (String) to true/false (Boolean)
Expand Down Expand Up @@ -596,22 +601,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
protocols
}

/** ********* Transaction management configuration ***********/
val transactionalIdExpirationMs = getInt(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG)
val transactionMaxTimeoutMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG)
val transactionTopicMinISR = getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG)
val transactionsLoadBufferSize = getInt(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG)
val transactionTopicReplicationFactor = getShort(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG)
val transactionTopicPartitions = getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG)
val transactionTopicSegmentBytes = getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG)
val transactionAbortTimedOutTransactionCleanupIntervalMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG)
val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG)

def transactionPartitionVerificationEnable = getBoolean(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)

def producerIdExpirationMs = getInt(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG)
val producerIdExpirationCheckIntervalMs = getInt(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG)

/** ********* Metric Configuration **************/
val metricNumSamples = getInt(MetricConfigs.METRIC_NUM_SAMPLES_CONFIG)
val metricSampleWindowMs = getLong(MetricConfigs.METRIC_SAMPLE_WINDOW_MS_CONFIG)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ class KafkaServer(
transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(1, true, "transaction-log-manager-"),
() => producerIdManager, metrics, metadataCache, Time.SYSTEM)
transactionCoordinator.startup(
() => zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions))
() => zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogConfig.transactionTopicPartitions))

/* start auto topic creation manager */
this.autoTopicCreationManager = AutoTopicCreationManager(
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ class ReplicaManager(val config: KafkaConfig,
): Unit = {
// Skip verification if the request is not transactional or transaction verification is disabled.
if (transactionalId == null ||
!config.transactionPartitionVerificationEnable
!config.transactionLogConfig.transactionPartitionVerificationEnable
|| addPartitionsToTxnManager.isEmpty
) {
callback((Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, VerificationGuard]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class BrokerMetadataPublisher(
try {
// Start the transaction coordinator.
txnCoordinator.startup(() => metadataCache.numPartitions(
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions))
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogConfig.transactionTopicPartitions))
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting TransactionCoordinator", t)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.server.config.ServerLogConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, TestInfo, Timeout}
Expand Down Expand Up @@ -63,10 +63,10 @@ class AdminFenceProducersIntegrationTest extends IntegrationTestHarness {
val props = new Properties()
props.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, false.toString)
// Set a smaller value for the number of partitions for speed
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 1.toString)
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 1.toString)
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 1.toString)
props.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "2000")
props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 1.toString)
props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 1.toString)
props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 1.toString)
props.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "2000")
props
}

Expand Down
Loading