diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 3013635a33e52..cda4d49f5fc87 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -21,6 +21,7 @@ import java.util.Properties import kafka.api.{ApiVersion, KAFKA_0_8_2} import kafka.cluster.EndPoint +import kafka.log.LogConfig import kafka.message._ import kafka.utils.{CoreUtils, TestUtils} import org.apache.kafka.common.config.ConfigException @@ -780,6 +781,88 @@ class KafkaConfigTest { }) } + @Test + def testDynamicLogConfigs(): Unit = { + def getBaseProperties(): Properties = { + val validRequiredProperties = new Properties() + validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + validRequiredProperties + } + + val props = getBaseProperties() + val config = KafkaConfig.fromProps(props) + + def assertDynamic(property: String, value: Any, accessor: () => Any): Unit = { + val initial = accessor() + props.put(property, value.toString) + config.updateCurrentConfig(new KafkaConfig(props)) + assertNotEquals(initial, accessor()) + } + + // Test dynamic log config values can be correctly passed through via KafkaConfig to LogConfig + // Every log config prop must be explicitly accounted for here. + // A value other than the default value for this config should be set to ensure that we can check whether + // the value is dynamically updatable. + LogConfig.TopicConfigSynonyms.foreach { case (logConfig, kafkaConfigProp) => + logConfig match { + case LogConfig.CleanupPolicyProp => + assertDynamic(kafkaConfigProp, Defaults.Compact, () => config.logCleanupPolicy) + case LogConfig.CompressionTypeProp => + assertDynamic(kafkaConfigProp, "lz4", () => config.compressionType) + case LogConfig.SegmentBytesProp => + assertDynamic(kafkaConfigProp, 10000, () => config.logSegmentBytes) + case LogConfig.SegmentMsProp => + assertDynamic(kafkaConfigProp, 10001L, () => config.logRollTimeMillis) + case LogConfig.DeleteRetentionMsProp => + assertDynamic(kafkaConfigProp, 10002L, () => config.logCleanerDeleteRetentionMs) + case LogConfig.FileDeleteDelayMsProp => + assertDynamic(kafkaConfigProp, 10003L, () => config.logDeleteDelayMs) + case LogConfig.FlushMessagesProp => + assertDynamic(kafkaConfigProp, 10004L, () => config.logFlushIntervalMessages) + case LogConfig.FlushMsProp => + assertDynamic(kafkaConfigProp, 10005L, () => config.logFlushIntervalMs) + case LogConfig.MaxCompactionLagMsProp => + assertDynamic(kafkaConfigProp, 10006L, () => config.logCleanerMaxCompactionLagMs) + case LogConfig.IndexIntervalBytesProp => + assertDynamic(kafkaConfigProp, 10007, () => config.logIndexIntervalBytes) + case LogConfig.MaxMessageBytesProp => + assertDynamic(kafkaConfigProp, 10008, () => config.messageMaxBytes) + case LogConfig.MessageDownConversionEnableProp => + assertDynamic(kafkaConfigProp, false, () => config.logMessageDownConversionEnable) + case LogConfig.MessageTimestampDifferenceMaxMsProp => + assertDynamic(kafkaConfigProp, 10009, () => config.logMessageTimestampDifferenceMaxMs) + case LogConfig.MessageTimestampTypeProp => + assertDynamic(kafkaConfigProp, "LogAppendTime", () => config.logMessageTimestampType.name) + case LogConfig.MinCleanableDirtyRatioProp => + assertDynamic(kafkaConfigProp, 0.01, () => config.logCleanerMinCleanRatio) + case LogConfig.MinCompactionLagMsProp => + assertDynamic(kafkaConfigProp, 10010L, () => config.logCleanerMinCompactionLagMs) + case LogConfig.MinInSyncReplicasProp => + assertDynamic(kafkaConfigProp, 4, () => config.minInSyncReplicas) + case LogConfig.PreAllocateEnableProp => + assertDynamic(kafkaConfigProp, true, () => config.logPreAllocateEnable) + case LogConfig.RetentionBytesProp => + assertDynamic(kafkaConfigProp, 10011L, () => config.logRetentionBytes) + case LogConfig.RetentionMsProp => + assertDynamic(kafkaConfigProp, 10012L, () => config.logRetentionTimeMillis) + case LogConfig.SegmentIndexBytesProp => + assertDynamic(kafkaConfigProp, 10013, () => config.logIndexSizeMaxBytes) + case LogConfig.SegmentJitterMsProp => + assertDynamic(kafkaConfigProp, 10014L, () => config.logRollTimeJitterMillis) + case LogConfig.UncleanLeaderElectionEnableProp => + assertDynamic(kafkaConfigProp, true, () => config.uncleanLeaderElectionEnable) + case LogConfig.MessageFormatVersionProp => + // not dynamically updatable + case LogConfig.FollowerReplicationThrottledReplicasProp => + // topic only config + case LogConfig.LeaderReplicationThrottledReplicasProp => + // topic only config + case prop => + fail(prop + " must be explicitly checked for dynamic updatability. Note that LogConfig(s) require that KafkaConfig value lookups are dynamic and not static values.") + } + } + } + @Test def testSpecificProperties(): Unit = { val defaults = new Properties()