diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 285163816f2ae..f7d7ef798bdec 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -406,7 +406,7 @@ - + diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index b3d5d9ef4805c..586ba1b4f9885 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -44,15 +44,15 @@ object TransactionCoordinator { metadataCache: MetadataCache, time: Time): TransactionCoordinator = { - val txnConfig = TransactionConfig(config.transactionalIdExpirationMs, - config.transactionMaxTimeoutMs, - config.transactionTopicPartitions, - config.transactionTopicReplicationFactor, - config.transactionTopicSegmentBytes, - config.transactionsLoadBufferSize, - config.transactionTopicMinISR, - config.transactionAbortTimedOutTransactionCleanupIntervalMs, - config.transactionRemoveExpiredTransactionalIdCleanupIntervalMs, + val txnConfig = TransactionConfig(config.transactionStateManagerConfig.transactionalIdExpirationMs, + config.transactionStateManagerConfig.transactionMaxTimeoutMs, + config.transactionLogConfig.transactionTopicPartitions, + config.transactionLogConfig.transactionTopicReplicationFactor, + config.transactionLogConfig.transactionTopicSegmentBytes, + config.transactionLogConfig.transactionsLoadBufferSize, + config.transactionLogConfig.transactionTopicMinISR, + config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs, + config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs, config.requestTimeoutMs) val txnStateManager = new TransactionStateManager(config.brokerId, scheduler, replicaManager, metadataCache, txnConfig, diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index 2894bbdd94b1e..3daf966d8c3ec 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -35,7 +35,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} +import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} import org.apache.kafka.server.common.TransactionVersion import org.apache.kafka.server.config.ServerConfigs import org.apache.kafka.server.record.BrokerCompressionType @@ -92,13 +92,13 @@ class TransactionStateManager(brokerId: Int, @volatile private var transactionTopicPartitionCount: Int = _ /** setup metrics*/ - private val partitionLoadSensor = metrics.sensor(TransactionStateManagerConfigs.LOAD_TIME_SENSOR) + private val partitionLoadSensor = metrics.sensor(TransactionStateManagerConfig.LOAD_TIME_SENSOR) partitionLoadSensor.add(metrics.metricName("partition-load-time-max", - TransactionStateManagerConfigs.METRICS_GROUP, + TransactionStateManagerConfig.METRICS_GROUP, "The max time it took to load the partitions in the last 30sec"), new Max()) partitionLoadSensor.add(metrics.metricName("partition-load-time-avg", - TransactionStateManagerConfigs.METRICS_GROUP, + TransactionStateManagerConfig.METRICS_GROUP, "The avg time it took to load the partitions in the last 30sec"), new Avg()) private[transaction] def usesFlexibleRecords(): Boolean = { @@ -809,15 +809,15 @@ private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int, private[transaction] case class CoordinatorEpochAndTxnMetadata(coordinatorEpoch: Int, transactionMetadata: TransactionMetadata) -private[transaction] case class TransactionConfig(transactionalIdExpirationMs: Int = TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT, - transactionMaxTimeoutMs: Int = TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, - transactionLogNumPartitions: Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, - transactionLogReplicationFactor: Short = TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, - transactionLogSegmentBytes: Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, - transactionLogLoadBufferSize: Int = TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, - transactionLogMinInsyncReplicas: Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, - abortTimedOutTransactionsIntervalMs: Int = TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT, - removeExpiredTransactionalIdsIntervalMs: Int = TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT, +private[transaction] case class TransactionConfig(transactionalIdExpirationMs: Int = TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT, + transactionMaxTimeoutMs: Int = TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, + transactionLogNumPartitions: Int = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, + transactionLogReplicationFactor: Short = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, + transactionLogSegmentBytes: Int = TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, + transactionLogLoadBufferSize: Int = TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, + transactionLogMinInsyncReplicas: Int = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, + abortTimedOutTransactionsIntervalMs: Int = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT, + removeExpiredTransactionalIdsIntervalMs: Int = TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT, requestTimeoutMs: Int = ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT) case class TransactionalIdAndProducerIdEpoch(transactionalId: String, producerId: Long, producerEpoch: Short) { diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index e9d782bba9d37..3a447e22ef81a 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -1588,9 +1588,9 @@ object LogManager { flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs, flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs, retentionCheckMs = config.logCleanupIntervalMs, - maxTransactionTimeoutMs = config.transactionMaxTimeoutMs, - producerStateManagerConfig = new ProducerStateManagerConfig(config.producerIdExpirationMs, config.transactionPartitionVerificationEnable), - producerIdExpirationCheckIntervalMs = config.producerIdExpirationCheckIntervalMs, + maxTransactionTimeoutMs = config.transactionStateManagerConfig.transactionMaxTimeoutMs, + producerStateManagerConfig = new ProducerStateManagerConfig(config.transactionLogConfig.producerIdExpirationMs, config.transactionLogConfig.transactionPartitionVerificationEnable), + producerIdExpirationCheckIntervalMs = config.transactionLogConfig.producerIdExpirationCheckIntervalMs, scheduler = kafkaScheduler, brokerTopicStats = brokerTopicStats, logDirFailureChannel = logDirFailureChannel, diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala index a5c0146f84c06..d0efdb910b878 100644 --- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala +++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala @@ -240,8 +240,8 @@ class DefaultAutoTopicCreationManager( case TRANSACTION_STATE_TOPIC_NAME => new CreatableTopic() .setName(topic) - .setNumPartitions(config.transactionTopicPartitions) - .setReplicationFactor(config.transactionTopicReplicationFactor) + .setNumPartitions(config.transactionLogConfig.transactionTopicPartitions) + .setReplicationFactor(config.transactionLogConfig.transactionTopicReplicationFactor) .setConfigs(convertToTopicConfigCollections( txnCoordinator.transactionTopicConfigs)) case topicName => diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 23c99e5bcb3c0..a1d03cf9bce7a 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -36,7 +36,7 @@ import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable} import org.apache.kafka.common.security.authenticator.LoginManager import org.apache.kafka.common.utils.{ConfigUtils, Utils} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.security.PasswordEncoder import org.apache.kafka.server.ProcessRole @@ -87,7 +87,7 @@ import scala.jdk.CollectionConverters._ object DynamicBrokerConfig { private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala - private[server] val DynamicProducerStateManagerConfig = Set(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG) + private[server] val DynamicProducerStateManagerConfig = Set(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG) val AllDynamicConfigs = DynamicSecurityConfigs ++ LogCleaner.ReconfigurableConfigs ++ @@ -1128,19 +1128,19 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi class DynamicProducerStateManagerConfig(val producerStateManagerConfig: ProducerStateManagerConfig) extends BrokerReconfigurable with Logging { def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { - if (producerStateManagerConfig.producerIdExpirationMs != newConfig.producerIdExpirationMs) { - info(s"Reconfigure ${TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG} from ${producerStateManagerConfig.producerIdExpirationMs} to ${newConfig.producerIdExpirationMs}") - producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs) + if (producerStateManagerConfig.producerIdExpirationMs != newConfig.transactionLogConfig.producerIdExpirationMs) { + info(s"Reconfigure ${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} from ${producerStateManagerConfig.producerIdExpirationMs} to ${newConfig.transactionLogConfig.producerIdExpirationMs}") + producerStateManagerConfig.setProducerIdExpirationMs(newConfig.transactionLogConfig.producerIdExpirationMs) } - if (producerStateManagerConfig.transactionVerificationEnabled != newConfig.transactionPartitionVerificationEnable) { - info(s"Reconfigure ${TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from ${producerStateManagerConfig.transactionVerificationEnabled} to ${newConfig.transactionPartitionVerificationEnable}") - producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionPartitionVerificationEnable) + if (producerStateManagerConfig.transactionVerificationEnabled != newConfig.transactionLogConfig.transactionPartitionVerificationEnable) { + info(s"Reconfigure ${TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from ${producerStateManagerConfig.transactionVerificationEnabled} to ${newConfig.transactionLogConfig.transactionPartitionVerificationEnable}") + producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionLogConfig.transactionPartitionVerificationEnable) } } def validateReconfiguration(newConfig: KafkaConfig): Unit = { - if (newConfig.producerIdExpirationMs < 0) - throw new ConfigException(s"${TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG} cannot be less than 0, current value is ${producerStateManagerConfig.producerIdExpirationMs}, and new value is ${newConfig.producerIdExpirationMs}") + if (newConfig.transactionLogConfig.producerIdExpirationMs < 0) + throw new ConfigException(s"${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} cannot be less than 0, current value is ${producerStateManagerConfig.producerIdExpirationMs}, and new value is ${newConfig.transactionLogConfig.producerIdExpirationMs}") } override def reconfigurableConfigs: Set[String] = DynamicProducerStateManagerConfig diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 5f099b3172469..46bebf3c116e0 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -35,7 +35,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} -import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} +import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.authorizer.AuthorizerUtils @@ -238,6 +238,11 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) private val _shareGroupConfig = new ShareGroupConfig(this) def shareGroupConfig: ShareGroupConfig = _shareGroupConfig + private val _transactionLogConfig = new TransactionLogConfig(this) + private val _transactionStateManagerConfig = new TransactionStateManagerConfig(this) + def transactionLogConfig: TransactionLogConfig = _transactionLogConfig + def transactionStateManagerConfig: TransactionStateManagerConfig = _transactionStateManagerConfig + private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = { // Use the system property if it exists and the Kafka config value was defaulted rather than actually provided // Need to translate any system property value from true/false (String) to true/false (Boolean) @@ -596,22 +601,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) protocols } - /** ********* Transaction management configuration ***********/ - val transactionalIdExpirationMs = getInt(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG) - val transactionMaxTimeoutMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG) - val transactionTopicMinISR = getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG) - val transactionsLoadBufferSize = getInt(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG) - val transactionTopicReplicationFactor = getShort(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG) - val transactionTopicPartitions = getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG) - val transactionTopicSegmentBytes = getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG) - val transactionAbortTimedOutTransactionCleanupIntervalMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG) - val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG) - - def transactionPartitionVerificationEnable = getBoolean(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG) - - def producerIdExpirationMs = getInt(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG) - val producerIdExpirationCheckIntervalMs = getInt(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG) - /** ********* Metric Configuration **************/ val metricNumSamples = getInt(MetricConfigs.METRIC_NUM_SAMPLES_CONFIG) val metricSampleWindowMs = getLong(MetricConfigs.METRIC_SAMPLE_WINDOW_MS_CONFIG) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 7edd80467ed97..a34c075822f0a 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -525,7 +525,7 @@ class KafkaServer( transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(1, true, "transaction-log-manager-"), () => producerIdManager, metrics, metadataCache, Time.SYSTEM) transactionCoordinator.startup( - () => zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions)) + () => zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogConfig.transactionTopicPartitions)) /* start auto topic creation manager */ this.autoTopicCreationManager = AutoTopicCreationManager( diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 09c3b1dc4d424..e9c114567d957 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1053,7 +1053,7 @@ class ReplicaManager(val config: KafkaConfig, ): Unit = { // Skip verification if the request is not transactional or transaction verification is disabled. if (transactionalId == null || - !config.transactionPartitionVerificationEnable + !config.transactionLogConfig.transactionPartitionVerificationEnable || addPartitionsToTxnManager.isEmpty ) { callback((Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, VerificationGuard])) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 680b88335815c..386d355a49c57 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -308,7 +308,7 @@ class BrokerMetadataPublisher( try { // Start the transaction coordinator. txnCoordinator.startup(() => metadataCache.numPartitions( - Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions)) + Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogConfig.transactionTopicPartitions)) } catch { case t: Throwable => fatalFaultHandler.handleFault("Error starting TransactionCoordinator", t) } diff --git a/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala index 5754dd12787fd..2e2847b3f4f06 100644 --- a/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala @@ -24,7 +24,7 @@ import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} +import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} import org.apache.kafka.server.config.ServerLogConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, TestInfo, Timeout} @@ -63,10 +63,10 @@ class AdminFenceProducersIntegrationTest extends IntegrationTestHarness { val props = new Properties() props.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, false.toString) // Set a smaller value for the number of partitions for speed - props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 1.toString) - props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 1.toString) - props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 1.toString) - props.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "2000") + props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 1.toString) + props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 1.toString) + props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 1.toString) + props.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "2000") props } diff --git a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala index 5e181b9e82366..42addf1dd5477 100644 --- a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala @@ -29,7 +29,7 @@ import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, TRA import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal} import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.authorizer.StandardAuthorizer import org.apache.kafka.server.config.ServerConfigs import org.junit.jupiter.api.{BeforeEach, TestInfo} @@ -114,9 +114,9 @@ class AbstractAuthorizerIntegrationTest extends BaseRequestTest { properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1") properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") - properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "1") - properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") - properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1") + properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "1") + properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") + properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1") properties.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true") properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[PrincipalBuilder].getName) } diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala index e053966c07989..7723c38ac5eb0 100644 --- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala @@ -28,8 +28,8 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal} import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.authorizer.StandardAuthorizer import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST import org.apache.kafka.server.config.ServerConfigs @@ -91,9 +91,9 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1") properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") - properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "1") - properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") - properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1") + properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "1") + properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") + properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1") properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName) } diff --git a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala index f9e5d79169543..69184a5e1580f 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala @@ -30,7 +30,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.{InvalidPidMappingException, TransactionalIdNotFoundException} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} +import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.test.{TestUtils => JTestUtils} import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} @@ -204,7 +204,7 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness { } private def producerIdExpirationConfig(configValue: String): util.Map[ConfigResource, util.Collection[AlterConfigOp]] = { - val producerIdCfg = new ConfigEntry(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, configValue) + val producerIdCfg = new ConfigEntry(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, configValue) val configs = Collections.singletonList(new AlterConfigOp(producerIdCfg, AlterConfigOp.OpType.SET)) Collections.singletonMap(configResource, configs) } @@ -230,18 +230,18 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness { // Set a smaller value for the number of partitions for the __consumer_offsets topic // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long. serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString) - serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 3.toString) - serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toString) - serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 2.toString) + serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 3.toString) + serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toString) + serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 2.toString) serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, true.toString) serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, false.toString) serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, false.toString) serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0") - serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200") - serverProps.put(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, "5000") - serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, "500") - serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, "10000") - serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, "500") + serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200") + serverProps.put(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, "5000") + serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, "500") + serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, "10000") + serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, "500") serverProps } } diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index f8440226caa51..7190c939b2fc0 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -27,8 +27,8 @@ import org.junit.jupiter.api.Assertions._ import kafka.utils.TestUtils import kafka.zk.ConfigEntityChangeNotificationZNode import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -43,8 +43,8 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { val brokerCount = 1 this.serverConfig.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") - this.serverConfig.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") - this.serverConfig.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1") + this.serverConfig.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") + this.serverConfig.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1") this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") val topic = "topic" diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala index 3cb508958a482..ee47dcae05f18 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala @@ -24,8 +24,8 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.common.TopicPartition -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.util.ShutdownableThread import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs} import org.junit.jupiter.api.Assertions._ @@ -57,8 +57,8 @@ class TransactionsBounceTest extends IntegrationTestHarness { overridingProps.put(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, 2.toString) overridingProps.put(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "10") // set small enough session timeout overridingProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0") - overridingProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 1.toString) - overridingProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 3.toString) + overridingProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 1.toString) + overridingProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 3.toString) // This is the one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient // failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing diff --git a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala index a31385224fd08..6063d522992f1 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala @@ -27,7 +27,7 @@ import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{InvalidPidMappingException, TransactionalIdNotFoundException} -import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} +import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs} import org.junit.jupiter.api.Assertions.assertEquals @@ -205,18 +205,18 @@ class TransactionsExpirationTest extends KafkaServerTestHarness { // Set a smaller value for the number of partitions for the __consumer_offsets topic // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long. serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString) - serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 3.toString) - serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toString) - serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 2.toString) + serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 3.toString) + serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toString) + serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 2.toString) serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, true.toString) serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, false.toString) serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, false.toString) serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0") - serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200") - serverProps.put(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, "10000") - serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, "500") - serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, "5000") - serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, "500") + serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200") + serverProps.put(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, "10000") + serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, "500") + serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, "5000") + serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, "500") serverProps } } diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 9c8b93012a45e..0a64ec9ed516a 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} +import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} @@ -63,14 +63,14 @@ class TransactionsTest extends IntegrationTestHarness { props.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, false.toString) // Set a smaller value for the number of partitions for the __consumer_offsets topic + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString) - props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 3.toString) - props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toString) - props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 2.toString) + props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 3.toString) + props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toString) + props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 2.toString) props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, true.toString) props.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, false.toString) props.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, false.toString) props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0") - props.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200") + props.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200") props } diff --git a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala index ae73cde5e848f..4167485254902 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala @@ -25,7 +25,7 @@ import kafka.utils.TestUtils.consumeRecords import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} +import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs} import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} @@ -107,14 +107,14 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness { serverProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, false.toString) serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString) serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 1.toString) - serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 1.toString) - serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 1.toString) - serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 1.toString) + serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 1.toString) + serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 1.toString) + serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 1.toString) serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, true.toString) serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, false.toString) serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, false.toString) serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0") - serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200") + serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200") serverProps } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 01aef960dd4db..03a56d9a9e0dc 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -61,7 +61,7 @@ import org.apache.kafka.common.requests.MetadataRequest import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.ScramCredential import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs} import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs} @@ -1274,7 +1274,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup // Dynamically turn verification off. val configPrefix = listenerPrefix(SecureExternal) val updatedProps = securityProps(sslProperties1, KEYSTORE_PROPS, configPrefix) - updatedProps.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false") + updatedProps.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false") alterConfigsUsingConfigCommand(updatedProps) verifyConfiguration(false) @@ -1286,7 +1286,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup verifyConfiguration(false) // Turn verification back on. - updatedProps.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true") + updatedProps.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true") alterConfigsUsingConfigCommand(updatedProps) verifyConfiguration(true) } diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index 78fda569a36ae..a604884a06e00 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -25,7 +25,7 @@ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record.FileRecords import org.apache.kafka.common.utils.{Exit, Utils} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig} import org.apache.kafka.storage.log.metrics.BrokerTopicStats @@ -52,8 +52,8 @@ object StressTestLog { scheduler = time.scheduler, time = time, maxTransactionTimeoutMs = 5 * 60 * 1000, - producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), - producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(10), topicId = None, diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 0811cec64c528..adef2c6380986 100755 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.compress.{Compression, GzipCompression, Lz4Compre import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{Exit, Time, Utils} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.util.{KafkaScheduler, Scheduler} import org.apache.kafka.server.util.CommandLineUtils import org.apache.kafka.storage.internals.log.{LogConfig, LogDirFailureChannel, ProducerStateManagerConfig} @@ -235,8 +235,8 @@ object TestLinearWriteSpeed { brokerTopicStats = new BrokerTopicStats, time = Time.SYSTEM, maxTransactionTimeoutMs = 5 * 60 * 1000, - producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), - producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 1f70ccfccd3d2..9711913036b38 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -33,7 +33,7 @@ import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord} import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{TopicPartition, Uuid} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.LeaderAndIsr import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.config.ReplicationConfigs @@ -304,7 +304,7 @@ class PartitionLockTest extends Logging { val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "", None, mockTime.scheduler) val maxTransactionTimeout = 5 * 60 * 1000 - val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false) + val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false) val producerStateManager = new ProducerStateManager( log.topicPartition, log.dir, diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 1d5e2677c67ab..27e343d5f0377 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -53,7 +53,7 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.replica.ClientMetadata import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 @@ -439,7 +439,7 @@ class PartitionTest extends AbstractPartitionTest { val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "", None, time.scheduler) val maxTransactionTimeoutMs = 5 * 60 * 1000 - val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, true) + val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, true) val producerStateManager = new ProducerStateManager( log.topicPartition, log.dir, diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala index 8ba9b1a8f798c..584a49bbc2c2d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala @@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult} import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch} -import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs +import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig import org.apache.kafka.server.util.MockScheduler import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -1004,7 +1004,7 @@ class TransactionCoordinatorTest { .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata)))) val expectedTransition = TxnTransitMetadata(producerId, producerId, (producerEpoch + 1).toShort, RecordBatch.NO_PRODUCER_EPOCH, - txnTimeoutMs, PrepareAbort, partitions.toSet, now, now + TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT) + txnTimeoutMs, PrepareAbort, partitions.toSet, now, now + TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT) when(transactionManager.appendTransactionToLog(ArgumentMatchers.eq(transactionalId), ArgumentMatchers.eq(coordinatorEpoch), @@ -1015,7 +1015,7 @@ class TransactionCoordinatorTest { ).thenAnswer(_ => {}) coordinator.startup(() => transactionStatePartitionCount, false) - time.sleep(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT) + time.sleep(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT) scheduler.tick() verify(transactionManager).timedOutTransactions() verify(transactionManager, times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId)) @@ -1064,7 +1064,7 @@ class TransactionCoordinatorTest { .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, metadata)))) coordinator.startup(() => transactionStatePartitionCount, false) - time.sleep(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT) + time.sleep(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT) scheduler.tick() verify(transactionManager).timedOutTransactions() verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId)) @@ -1088,7 +1088,7 @@ class TransactionCoordinatorTest { val bumpedEpoch = (producerEpoch + 1).toShort val expectedTransition = TxnTransitMetadata(producerId, producerId, bumpedEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, - PrepareAbort, partitions.toSet, now, now + TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT) + PrepareAbort, partitions.toSet, now, now + TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT) when(transactionManager.appendTransactionToLog(ArgumentMatchers.eq(transactionalId), ArgumentMatchers.eq(coordinatorEpoch), @@ -1099,7 +1099,7 @@ class TransactionCoordinatorTest { ).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NOT_ENOUGH_REPLICAS)) coordinator.startup(() => transactionStatePartitionCount, false) - time.sleep(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT) + time.sleep(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT) scheduler.tick() verify(transactionManager).timedOutTransactions() diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index ff94377b2f1ff..874701685274c 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig} import org.apache.kafka.storage.log.metrics.BrokerTopicStats @@ -114,8 +114,8 @@ abstract class AbstractLogCleanerIntegrationTest { time = time, brokerTopicStats = new BrokerTopicStats, maxTransactionTimeoutMs = 5 * 60 * 1000, - producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), - producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true) diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index 1495d688d5c02..36a08b875f4f6 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -22,7 +22,7 @@ import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig} @@ -65,8 +65,8 @@ class BrokerCompressionTest { time = time, brokerTopicStats = new BrokerTopicStats, maxTransactionTimeoutMs = 5 * 60 * 1000, - producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), - producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 978b856a9fa11..44a8a90a39c05 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig} import org.apache.kafka.storage.log.metrics.BrokerTopicStats @@ -56,7 +56,7 @@ class LogCleanerManagerTest extends Logging { val logConfig: LogConfig = new LogConfig(logProps) val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs` val offset = 999 - val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false) + val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false) val cleanerCheckpoints: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]() @@ -107,7 +107,7 @@ class LogCleanerManagerTest extends Logging { val logDirFailureChannel = new LogDirFailureChannel(10) val config = createLowRetentionLogConfig(logSegmentSize, TopicConfig.CLEANUP_POLICY_COMPACT) val maxTransactionTimeoutMs = 5 * 60 * 1000 - val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT + val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT val segments = new LogSegments(tp) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( tpDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler) @@ -817,7 +817,7 @@ class LogCleanerManagerTest extends Logging { brokerTopicStats = new BrokerTopicStats, maxTransactionTimeoutMs = 5 * 60 * 1000, producerStateManagerConfig = producerStateManagerConfig, - producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true) @@ -871,7 +871,7 @@ class LogCleanerManagerTest extends Logging { brokerTopicStats = new BrokerTopicStats, maxTransactionTimeoutMs = 5 * 60 * 1000, producerStateManagerConfig = producerStateManagerConfig, - producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index f8350cf74278f..8db79e8186b48 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig} @@ -64,7 +64,7 @@ class LogCleanerTest extends Logging { val throttler = new Throttler(Double.MaxValue, Long.MaxValue, "throttler", "entries", time) val tombstoneRetentionMs = 86400000 val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1 - val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false) + val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false) @AfterEach def teardown(): Unit = { @@ -187,7 +187,7 @@ class LogCleanerTest extends Logging { val topicPartition = UnifiedLog.parseTopicPartitionName(dir) val logDirFailureChannel = new LogDirFailureChannel(10) val maxTransactionTimeoutMs = 5 * 60 * 1000 - val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT + val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT val logSegments = new LogSegments(topicPartition) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( dir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler) @@ -2057,7 +2057,7 @@ class LogCleanerTest extends Logging { brokerTopicStats = new BrokerTopicStats, maxTransactionTimeoutMs = 5 * 60 * 1000, producerStateManagerConfig = producerStateManagerConfig, - producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala index 0793768bfbd89..126ebdd4e6971 100644 --- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala @@ -23,7 +23,7 @@ import kafka.utils.TestUtils import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record.SimpleRecord import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.util.KafkaScheduler import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig} import org.apache.kafka.storage.log.metrics.BrokerTopicStats @@ -152,8 +152,8 @@ class LogConcurrencyTest { brokerTopicStats = brokerTopicStats, time = Time.SYSTEM, maxTransactionTimeoutMs = 5 * 60 * 1000, - producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), - producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 5c0bcd3b4d7d3..10b7ba4a507a1 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -30,7 +30,7 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, MemoryRecords, RecordBatch, RecordVersion, SimpleRecord, TimestampType} import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0 import org.apache.kafka.server.util.{MockTime, Scheduler} @@ -60,8 +60,8 @@ class LogLoaderTest { var config: KafkaConfig = _ val brokerTopicStats = new BrokerTopicStats val maxTransactionTimeoutMs: Int = 5 * 60 * 1000 - val producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false) - val producerIdExpirationCheckIntervalMs: Int = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT + val producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false) + val producerIdExpirationCheckIntervalMs: Int = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT val tmpDir = TestUtils.tempDir() val logDir = TestUtils.randomPartitionLogDir(tmpDir) var logsToClose: Seq[UnifiedLog] = Seq() @@ -103,7 +103,7 @@ class LogLoaderTest { val logDirFailureChannel = new LogDirFailureChannel(logDirs.size) val maxTransactionTimeoutMs = 5 * 60 * 1000 - val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT + val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT // Create a LogManager with some overridden methods to facilitate interception of clean shutdown // flag and to inject an error @@ -353,7 +353,7 @@ class LogLoaderTest { def createLogWithInterceptedReads(recoveryPoint: Long): UnifiedLog = { val maxTransactionTimeoutMs = 5 * 60 * 1000 - val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT + val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT val topicPartition = UnifiedLog.parseTopicPartitionName(logDir) val logDirFailureChannel = new LogDirFailureChannel(10) // Intercept all segment read calls @@ -512,7 +512,7 @@ class LogLoaderTest { firstAppendTimestamp, coordinatorEpoch = coordinatorEpoch) assertEquals(firstAppendTimestamp, log.producerStateManager.lastEntry(producerId).get.lastTimestamp) - val maxProducerIdExpirationMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT + val maxProducerIdExpirationMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT mockTime.sleep(maxProducerIdExpirationMs) assertEquals(Optional.empty(), log.producerStateManager.lastEntry(producerId)) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 9e96ec16bdce1..2d5d6a849b846 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrTopic import org.apache.kafka.common.requests.{AbstractControlRequest, LeaderAndIsrRequest} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{DirectoryId, KafkaException, TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.image.{TopicImage, TopicsImage} import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration} import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils} @@ -829,7 +829,7 @@ class LogManagerTest { val segmentBytes = 1024 val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0, - 5 * 60 * 1000, new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT) + 5 * 60 * 1000, new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT) assertTrue(expectedSegmentsPerLog > 0) // calculate numMessages to append to logs. It'll create "expectedSegmentsPerLog" log segments with segment.bytes=1024 @@ -965,7 +965,7 @@ class LogManagerTest { recoveryPoint = 0, maxTransactionTimeoutMs = 5 * 60 * 1000, producerStateManagerConfig = new ProducerStateManagerConfig(5 * 60 * 1000, false), - producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, scheduler = mock(classOf[Scheduler]), time = mockTime, brokerTopicStats = mockBrokerTopicStats, @@ -1375,8 +1375,8 @@ class LogManagerTest { flushStartOffsetCheckpointMs = 10000L, retentionCheckMs = 1000L, maxTransactionTimeoutMs = 5 * 60 * 1000, - producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), - producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, scheduler = scheduler, time = Time.SYSTEM, brokerTopicStats = new BrokerTopicStats, diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index e2272941ab3d6..a715bcc1ba96c 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -23,7 +23,7 @@ import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{MockTime, Time, Utils} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.util.MockScheduler import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache @@ -640,7 +640,7 @@ class LogSegmentTest { topicPartition, logDir, 5 * 60 * 1000, - new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), new MockTime() ) } diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index 2803786125391..12562e4aac302 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -31,7 +31,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse} import java.nio.file.Files import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import org.apache.kafka.common.config.TopicConfig -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile @@ -99,8 +99,8 @@ object LogTestUtils { logStartOffset: Long = 0L, recoveryPoint: Long = 0L, maxTransactionTimeoutMs: Int = 5 * 60 * 1000, - producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), - producerIdExpirationCheckIntervalMs: Int = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, + producerStateManagerConfig: ProducerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + producerIdExpirationCheckIntervalMs: Int = TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, lastShutdownClean: Boolean = true, topicId: Option[Uuid] = None, keepPartitionMetadataFile: Boolean = true, diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 3c5a8ab468b37..1c617c20d36ac 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse} import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler} @@ -66,7 +66,7 @@ class UnifiedLogTest { val logDir = TestUtils.randomPartitionLogDir(tmpDir) val mockTime = new MockTime() var logsToClose: Seq[UnifiedLog] = Seq() - val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false) + val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false) def metricsKeySet = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala @BeforeEach @@ -4452,7 +4452,7 @@ class UnifiedLogTest { time: Time = mockTime, maxTransactionTimeoutMs: Int = 60 * 60 * 1000, producerStateManagerConfig: ProducerStateManagerConfig = producerStateManagerConfig, - producerIdExpirationCheckIntervalMs: Int = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + producerIdExpirationCheckIntervalMs: Int = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, lastShutdownClean: Boolean = true, topicId: Option[Uuid] = None, keepPartitionMetadataFile: Boolean = true, diff --git a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala index 9732b0c025dc4..dbd7febaaed6b 100644 --- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala @@ -36,8 +36,8 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.utils.{SecurityUtils, Utils} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig} +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.config.ServerConfigs import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} @@ -69,10 +69,10 @@ class AutoTopicCreationManagerTest { props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString) props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicPartitions.toString) - props.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicPartitions.toString) + props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicPartitions.toString) props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString) - props.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString) + props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString) config = KafkaConfig.fromProps(props) val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1)) @@ -358,7 +358,7 @@ class AutoTopicCreationManagerTest { case Topic.GROUP_METADATA_TOPIC_NAME => getNewTopic(topicName, numPartitions = config.groupCoordinatorConfig.offsetsTopicPartitions, replicationFactor = config.groupCoordinatorConfig.offsetsTopicReplicationFactor) case Topic.TRANSACTION_STATE_TOPIC_NAME => getNewTopic(topicName, - numPartitions = config.transactionTopicPartitions, replicationFactor = config.transactionTopicReplicationFactor) + numPartitions = config.transactionLogConfig.transactionTopicPartitions, replicationFactor = config.transactionLogConfig.transactionTopicReplicationFactor) } } else { getNewTopic(topicName) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index bb460bac25dba..f7102d1952781 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -76,7 +76,7 @@ import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAndEpoch, SecurityUtils, Utils} import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG} import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.LeaderAndIsr import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.authorizer.AclEntry @@ -1339,8 +1339,8 @@ class KafkaApisTest extends Logging { groupId, AuthorizationResult.ALLOWED) Topic.GROUP_METADATA_TOPIC_NAME case CoordinatorType.TRANSACTION => - topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, numBrokersNeeded.toString) - topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, numBrokersNeeded.toString) + topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, numBrokersNeeded.toString) + topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, numBrokersNeeded.toString) when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties) authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.TRANSACTIONAL_ID, groupId, AuthorizationResult.ALLOWED) @@ -1458,8 +1458,8 @@ class KafkaApisTest extends Logging { true case Topic.TRANSACTION_STATE_TOPIC_NAME => - topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, numBrokersNeeded.toString) - topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, numBrokersNeeded.toString) + topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, numBrokersNeeded.toString) + topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, numBrokersNeeded.toString) when(txnCoordinator.transactionTopicConfigs).thenReturn(new Properties) true case _ => @@ -8879,7 +8879,7 @@ class KafkaApisTest extends Logging { ).build() val requestChannelRequest = buildRequest(offsetCommitRequest) - + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1) brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_2_2_IV1) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index f120784870ab9..b056c19b3f13c 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} +import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.PasswordEncoderConfigs @@ -982,13 +982,13 @@ class KafkaConfigTest { case GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") - case TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") - case TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") - case TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") - case TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") - case TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") - case TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") - case TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") case QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0") diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 29c4c602c17e0..fa77e38c7c00e 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -51,7 +51,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{Exit, LogContext, Time, Utils} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.image._ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState} @@ -2549,9 +2549,9 @@ class ReplicaManagerTest { // Dynamically enable verification. config.dynamicConfig.initialize(None, None) val props = new Properties() - props.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true") + props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true") config.dynamicConfig.updateBrokerConfig(config.brokerId, props) - TestUtils.waitUntilTrue(() => config.transactionPartitionVerificationEnable == true, "Config did not dynamically update.") + TestUtils.waitUntilTrue(() => config.transactionLogConfig.transactionPartitionVerificationEnable == true, "Config did not dynamically update.") // Try to append more records. We don't need to send a request since the transaction is already ongoing. val moreTransactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence + 1, @@ -2601,9 +2601,9 @@ class ReplicaManagerTest { // Disable verification config.dynamicConfig.initialize(None, None) val props = new Properties() - props.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false") + props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false") config.dynamicConfig.updateBrokerConfig(config.brokerId, props) - TestUtils.waitUntilTrue(() => config.transactionPartitionVerificationEnable == false, "Config did not dynamically update.") + TestUtils.waitUntilTrue(() => config.transactionLogConfig.transactionPartitionVerificationEnable == false, "Config did not dynamically update.") // Confirm we did not write to the log and instead returned error. val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue() diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 04d42dadd75be..f8f5eb8b62a76 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -41,7 +41,7 @@ import org.apache.kafka.common.utils.{Exit, Utils} import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue} -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch, VoterSetTest} import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion} @@ -93,8 +93,8 @@ class DumpLogSegmentsTest { time = time, brokerTopicStats = new BrokerTopicStats, maxTransactionTimeoutMs = 5 * 60 * 1000, - producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), - producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, logDirFailureChannel = new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala index 917f53eed605d..d9004824080ff 100644 --- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic._ import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import kafka.log.{LocalLog, LogLoader, UnifiedLog} import kafka.utils.TestUtils.retry -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.storage.internals.log.{LogConfig, LogDirFailureChannel, LogSegments, ProducerStateManager, ProducerStateManagerConfig} import org.apache.kafka.storage.log.metrics.BrokerTopicStats @@ -134,8 +134,8 @@ class SchedulerTest { val logConfig = new LogConfig(new Properties()) val brokerTopicStats = new BrokerTopicStats val maxTransactionTimeoutMs = 5 * 60 * 1000 - val maxProducerIdExpirationMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT - val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT + val maxProducerIdExpirationMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT + val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT val topicPartition = UnifiedLog.parseTopicPartitionName(logDir) val logDirFailureChannel = new LogDirFailureChannel(10) val segments = new LogSegments(topicPartition) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c29769e99edd2..c6f20214f32f1 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -51,7 +51,7 @@ import org.apache.kafka.common.serialization._ import org.apache.kafka.common.utils.Utils.formatAddress import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.LeaderAndIsr import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.queue.KafkaEventQueue @@ -1194,8 +1194,8 @@ object TestUtils extends Logging { flushStartOffsetCheckpointMs = 10000L, retentionCheckMs = 1000L, maxTransactionTimeoutMs = 5 * 60 * 1000, - producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, transactionVerificationEnabled), - producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, transactionVerificationEnabled), + producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, scheduler = time.scheduler, time = time, brokerTopicStats = new BrokerTopicStats, diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index ea7f3b78ed4dd..b8479642e724b 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -21,8 +21,8 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs; -import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig; import org.apache.kafka.network.SocketServerConfigs; import org.apache.kafka.raft.QuorumConfig; import org.apache.kafka.security.PasswordEncoderConfigs; @@ -57,8 +57,8 @@ public abstract class AbstractKafkaConfig extends AbstractConfig { CleanerConfig.CONFIG_DEF, LogConfig.SERVER_CONFIG_DEF, ShareGroupConfig.CONFIG_DEF, - TransactionLogConfigs.CONFIG_DEF, - TransactionStateManagerConfigs.CONFIG_DEF, + TransactionLogConfig.CONFIG_DEF, + TransactionStateManagerConfig.CONFIG_DEF, QuorumConfig.CONFIG_DEF, MetricConfigs.CONFIG_DEF, QuotaConfigs.CONFIG_DEF, diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java index 9f5eee05c9942..acff9bc6f2969 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java @@ -59,7 +59,7 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; -import static org.apache.kafka.coordinator.transaction.TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT; import static org.apache.kafka.storage.internals.log.ProducerStateManager.LATE_TRANSACTION_BUFFER_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index dc82464107bbc..f4f03b9833076 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; import org.apache.kafka.network.SocketServerConfigs; import org.apache.kafka.server.config.ConfigType; import org.apache.kafka.server.config.ServerConfigs; @@ -124,7 +124,7 @@ public void start() throws IOException { putIfAbsent(brokerConfig, GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0); putIfAbsent(brokerConfig, GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 1); putIfAbsent(brokerConfig, GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 5); - putIfAbsent(brokerConfig, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 5); + putIfAbsent(brokerConfig, TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 5); putIfAbsent(brokerConfig, ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, true); for (int i = 0; i < brokers.length; i++) { diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigs.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java similarity index 59% rename from transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigs.java rename to transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java index b80a87c6e869e..c8f236f4d6a79 100644 --- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigs.java +++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.coordinator.transaction; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; @@ -25,7 +26,7 @@ import static org.apache.kafka.common.config.ConfigDef.Type.INT; import static org.apache.kafka.common.config.ConfigDef.Type.SHORT; -public final class TransactionLogConfigs { +public final class TransactionLogConfig { // Log-level config and default values public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions"; public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50; @@ -61,15 +62,67 @@ public final class TransactionLogConfigs { public static final int PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT = 600000; public static final String PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC = "The interval at which to remove producer IDs that have expired due to producer.id.expiration.ms passing."; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DOC) - .define(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DOC) - .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC) - .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DOC) - .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC) + .define(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, INT, TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_TOPIC_MIN_ISR_DOC) + .define(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_LOAD_BUFFER_SIZE_DOC) + .define(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC) + .define(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_TOPIC_PARTITIONS_DOC) + .define(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC) - .define(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, BOOLEAN, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC) + .define(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, BOOLEAN, TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC) - .define(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DOC) + .define(PRODUCER_ID_EXPIRATION_MS_CONFIG, INT, PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, PRODUCER_ID_EXPIRATION_MS_DOC) // Configuration for testing only as default value should be sufficient for typical usage - .defineInternal(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC); + .defineInternal(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, INT, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC); + + private final AbstractConfig config; + private final int transactionTopicMinISR; + private final int transactionsLoadBufferSize; + private final short transactionTopicReplicationFactor; + private final int transactionTopicPartitions; + private final int transactionTopicSegmentBytes; + private final int producerIdExpirationCheckIntervalMs; + + public TransactionLogConfig(AbstractConfig config) { + this.config = config; + this.transactionTopicMinISR = config.getInt(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG); + this.transactionsLoadBufferSize = config.getInt(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG); + this.transactionTopicReplicationFactor = config.getShort(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG); + this.transactionTopicPartitions = config.getInt(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG); + this.transactionTopicSegmentBytes = config.getInt(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG); + this.producerIdExpirationCheckIntervalMs = config.getInt(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG); + } + + public int transactionTopicMinISR() { + return transactionTopicMinISR; + } + + public int transactionsLoadBufferSize() { + return transactionsLoadBufferSize; + } + + public short transactionTopicReplicationFactor() { + return transactionTopicReplicationFactor; + } + + public int transactionTopicPartitions() { + return transactionTopicPartitions; + } + + public int transactionTopicSegmentBytes() { + return transactionTopicSegmentBytes; + } + + public int producerIdExpirationCheckIntervalMs() { + return producerIdExpirationCheckIntervalMs; + } + + // This is a broker dynamic config used for DynamicProducerStateManagerConfig + public Boolean transactionPartitionVerificationEnable() { + return config.getBoolean(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG); + } + + // This is a broker dynamic config used for DynamicProducerStateManagerConfig + public int producerIdExpirationMs() { + return config.getInt(PRODUCER_ID_EXPIRATION_MS_CONFIG); + } } diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfigs.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfig.java similarity index 61% rename from transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfigs.java rename to transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfig.java index 4da091afc52eb..46dfb46d129c3 100644 --- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfigs.java +++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfig.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.coordinator.transaction; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import java.util.concurrent.TimeUnit; @@ -25,7 +26,7 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Type.INT; -public final class TransactionStateManagerConfigs { +public final class TransactionStateManagerConfig { // Transaction management configs and default values public static final String TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG = "transaction.max.timeout.ms"; public static final int TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT = (int) TimeUnit.MINUTES.toMillis(15); @@ -49,9 +50,35 @@ public final class TransactionStateManagerConfigs { public static final String METRICS_GROUP = "transaction-coordinator-metrics"; public static final String LOAD_TIME_SENSOR = "TransactionsPartitionLoadTime"; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT, atLeast(1), HIGH, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DOC) - .define(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DOC) - .define(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS_DOC) - .define(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC); + .define(TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT, atLeast(1), HIGH, TRANSACTIONAL_ID_EXPIRATION_MS_DOC) + .define(TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG, INT, TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_MAX_TIMEOUT_MS_DOC) + .define(TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, INT, TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS_DOC) + .define(TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, INT, TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC); + private final int transactionalIdExpirationMs; + private final int transactionMaxTimeoutMs; + private final int transactionAbortTimedOutTransactionCleanupIntervalMs; + private final int transactionRemoveExpiredTransactionalIdCleanupIntervalMs; + + public TransactionStateManagerConfig(AbstractConfig config) { + transactionalIdExpirationMs = config.getInt(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG); + transactionMaxTimeoutMs = config.getInt(TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG); + transactionAbortTimedOutTransactionCleanupIntervalMs = config.getInt(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG); + transactionRemoveExpiredTransactionalIdCleanupIntervalMs = config.getInt(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG); + } + public int transactionalIdExpirationMs() { + return transactionalIdExpirationMs; + } + + public int transactionMaxTimeoutMs() { + return transactionMaxTimeoutMs; + } + + public int transactionAbortTimedOutTransactionCleanupIntervalMs() { + return transactionAbortTimedOutTransactionCleanupIntervalMs; + } + + public int transactionRemoveExpiredTransactionalIdCleanupIntervalMs() { + return transactionRemoveExpiredTransactionalIdCleanupIntervalMs; + } }