From 15f3218d98e9cd98c63ce410249da5b6a892a8ea Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Wed, 30 Oct 2019 15:39:58 -0700 Subject: [PATCH 1/5] MINOR: improve test coverage for dynamic LogConfig(s) --- .../unit/kafka/server/KafkaConfigTest.scala | 139 ++++++++++++++++++ 1 file changed, 139 insertions(+) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 2709ee6047014..2fa9b5fd001ef 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -17,10 +17,12 @@ package kafka.server +import java.util.Collections import java.util.Properties import kafka.api.{ApiVersion, KAFKA_0_8_2} import kafka.cluster.EndPoint +import kafka.log.LogConfig import kafka.message._ import kafka.utils.{CoreUtils, TestUtils} import org.apache.kafka.common.config.ConfigException @@ -766,6 +768,143 @@ class KafkaConfigTest { }) } + @Test + def testDynamicLogConfigs(): Unit = { + def getBaseProperties(): Properties = { + val validRequiredProperties = new Properties() + validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + validRequiredProperties + } + + val props = getBaseProperties() + val config = KafkaConfig.fromProps(props) + + // Test dynamic log config values can be correctly passed through via KafkaConfig to LogConfig + // Every log config prop must be explicitly accounted for here + LogConfig.configNames.foreach { + case LogConfig.CleanupPolicyProp => + props.put(KafkaConfig.LogCleanupPolicyProp, Defaults.Compact) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(Collections.singletonList(Defaults.Compact), config.logCleanupPolicy) + case LogConfig.CompressionTypeProp => + val update = "lz4" + props.put(KafkaConfig.CompressionTypeProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.compressionType) + case LogConfig.SegmentBytesProp => + val update = 10000 + props.put(KafkaConfig.LogSegmentBytesProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logSegmentBytes) + case LogConfig.SegmentMsProp => + val update = 10001L + props.put(KafkaConfig.LogRollTimeMillisProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logRollTimeMillis) + case LogConfig.DeleteRetentionMsProp => + val update = 10002L + props.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logCleanerDeleteRetentionMs) + case LogConfig.FileDeleteDelayMsProp => + val update = 10003L + props.put(KafkaConfig.LogDeleteDelayMsProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logDeleteDelayMs) + case LogConfig.FlushMessagesProp => + val update = 10004L + props.put(KafkaConfig.LogFlushIntervalMessagesProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logFlushIntervalMessages) + case LogConfig.FlushMsProp => + val update = 10005L + props.put(KafkaConfig.LogFlushIntervalMsProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logFlushIntervalMs) + case LogConfig.MaxCompactionLagMsProp => + val update = 10006L + props.put(KafkaConfig.LogCleanerMaxCompactionLagMsProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logCleanerMaxCompactionLagMs) + case LogConfig.IndexIntervalBytesProp => + val update = 10007 + props.put(KafkaConfig.LogIndexIntervalBytesProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logIndexIntervalBytes) + case LogConfig.MaxMessageBytesProp => + val update = 10008 + props.put(KafkaConfig.MessageMaxBytesProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.messageMaxBytes) + case LogConfig.MessageDownConversionEnableProp => + props.put(KafkaConfig.LogMessageDownConversionEnableProp, "false") + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(false, config.logMessageDownConversionEnable) + case LogConfig.MessageTimestampDifferenceMaxMsProp => + val update = 10009 + props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logMessageTimestampDifferenceMaxMs) + case LogConfig.MessageTimestampTypeProp => + props.put(KafkaConfig.LogMessageTimestampTypeProp, "LogAppendTime") + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals("LogAppendTime", config.logMessageTimestampType.name) + case LogConfig.MinCleanableDirtyRatioProp => + val update = 0.01 + props.put(KafkaConfig.LogCleanerMinCleanRatioProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logCleanerMinCleanRatio) + case LogConfig.MinCompactionLagMsProp => + val update = 10010L + props.put(KafkaConfig.LogCleanerMinCompactionLagMsProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logCleanerMinCompactionLagMs) + case LogConfig.MinInSyncReplicasProp => + val update = 4 + props.put(KafkaConfig.MinInSyncReplicasProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.minInSyncReplicas) + case LogConfig.PreAllocateEnableProp => + val update = true + props.put(KafkaConfig.LogPreAllocateProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logPreAllocateEnable) + case LogConfig.RetentionBytesProp => + val update = 10011L + props.put(KafkaConfig.LogRetentionBytesProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logRetentionBytes) + case LogConfig.RetentionMsProp => + val update = 10012L + props.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logCleanerDeleteRetentionMs) + case LogConfig.SegmentIndexBytesProp => + val update = 10013 + props.put(KafkaConfig.LogIndexIntervalBytesProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logIndexIntervalBytes) + case LogConfig.SegmentJitterMsProp => + val update = 10014L + props.put(KafkaConfig.LogRollTimeJitterMillisProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.logRollTimeJitterMillis) + case LogConfig.UncleanLeaderElectionEnableProp => + val update = true + props.put(KafkaConfig.UncleanLeaderElectionEnableProp, update) + config.updateCurrentConfig(new KafkaConfig(props)) + assertEquals(update, config.uncleanLeaderElectionEnable) + case LogConfig.MessageFormatVersionProp => + // not dynamically updatable + case LogConfig.FollowerReplicationThrottledReplicasProp => + // topic only config + case LogConfig.LeaderReplicationThrottledReplicasProp => + // topic only config + case prop => + fail(prop + " must be explicitly checked for dynamic updatability") + } + } + @Test def testSpecificProperties(): Unit = { val defaults = new Properties() From 90c35b41e20d85f4d583ba01b77dfb9ff73f27a6 Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Wed, 30 Oct 2019 15:44:41 -0700 Subject: [PATCH 2/5] Improve comment --- core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 2fa9b5fd001ef..09770ebb31e53 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -780,7 +780,9 @@ class KafkaConfigTest { val config = KafkaConfig.fromProps(props) // Test dynamic log config values can be correctly passed through via KafkaConfig to LogConfig - // Every log config prop must be explicitly accounted for here + // Every log config prop must be explicitly accounted for here. + // A value other than the default value for this config should be set to ensure that we can check whether + // the value is dynamically updatable. LogConfig.configNames.foreach { case LogConfig.CleanupPolicyProp => props.put(KafkaConfig.LogCleanupPolicyProp, Defaults.Compact) @@ -901,7 +903,7 @@ class KafkaConfigTest { case LogConfig.LeaderReplicationThrottledReplicasProp => // topic only config case prop => - fail(prop + " must be explicitly checked for dynamic updatability") + fail(prop + " must be explicitly checked for dynamic updatability. Note that LogConfig(s) require that KafkaConfig value lookups are dynamic and not static values.") } } From aa7d509f99d18bb53dc35344175e4a43b5d3b8f3 Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Thu, 31 Oct 2019 10:17:55 -0700 Subject: [PATCH 3/5] Extract some boilerplate, fix jdk 1.8 compile --- .../unit/kafka/server/KafkaConfigTest.scala | 83 ++++++++----------- 1 file changed, 34 insertions(+), 49 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 09770ebb31e53..b7905f7c59541 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -779,122 +779,107 @@ class KafkaConfigTest { val props = getBaseProperties() val config = KafkaConfig.fromProps(props) + def setConfig(property: String, value: Any): Unit = { + props.put(property, value.toString) + config.updateCurrentConfig(new KafkaConfig(props)) + } + // Test dynamic log config values can be correctly passed through via KafkaConfig to LogConfig // Every log config prop must be explicitly accounted for here. // A value other than the default value for this config should be set to ensure that we can check whether // the value is dynamically updatable. LogConfig.configNames.foreach { case LogConfig.CleanupPolicyProp => - props.put(KafkaConfig.LogCleanupPolicyProp, Defaults.Compact) - config.updateCurrentConfig(new KafkaConfig(props)) - assertEquals(Collections.singletonList(Defaults.Compact), config.logCleanupPolicy) + val update = Defaults.Compact + setConfig(KafkaConfig.LogCleanupPolicyProp, update) + assertEquals(Collections.singletonList(update), config.logCleanupPolicy) case LogConfig.CompressionTypeProp => val update = "lz4" - props.put(KafkaConfig.CompressionTypeProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.CompressionTypeProp, update) assertEquals(update, config.compressionType) case LogConfig.SegmentBytesProp => val update = 10000 - props.put(KafkaConfig.LogSegmentBytesProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogSegmentBytesProp, update) assertEquals(update, config.logSegmentBytes) case LogConfig.SegmentMsProp => val update = 10001L - props.put(KafkaConfig.LogRollTimeMillisProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogRollTimeMillisProp, update) assertEquals(update, config.logRollTimeMillis) case LogConfig.DeleteRetentionMsProp => val update = 10002L - props.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogCleanerDeleteRetentionMsProp, update) assertEquals(update, config.logCleanerDeleteRetentionMs) case LogConfig.FileDeleteDelayMsProp => val update = 10003L - props.put(KafkaConfig.LogDeleteDelayMsProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogDeleteDelayMsProp, update) assertEquals(update, config.logDeleteDelayMs) case LogConfig.FlushMessagesProp => val update = 10004L - props.put(KafkaConfig.LogFlushIntervalMessagesProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogFlushIntervalMessagesProp, update) assertEquals(update, config.logFlushIntervalMessages) case LogConfig.FlushMsProp => val update = 10005L - props.put(KafkaConfig.LogFlushIntervalMsProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogFlushIntervalMsProp, update) assertEquals(update, config.logFlushIntervalMs) case LogConfig.MaxCompactionLagMsProp => val update = 10006L - props.put(KafkaConfig.LogCleanerMaxCompactionLagMsProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogCleanerMaxCompactionLagMsProp, update) assertEquals(update, config.logCleanerMaxCompactionLagMs) case LogConfig.IndexIntervalBytesProp => val update = 10007 - props.put(KafkaConfig.LogIndexIntervalBytesProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogIndexIntervalBytesProp, update) assertEquals(update, config.logIndexIntervalBytes) case LogConfig.MaxMessageBytesProp => val update = 10008 - props.put(KafkaConfig.MessageMaxBytesProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.MessageMaxBytesProp, update) assertEquals(update, config.messageMaxBytes) case LogConfig.MessageDownConversionEnableProp => - props.put(KafkaConfig.LogMessageDownConversionEnableProp, "false") - config.updateCurrentConfig(new KafkaConfig(props)) - assertEquals(false, config.logMessageDownConversionEnable) + val update = false + setConfig(KafkaConfig.LogMessageDownConversionEnableProp, update) + assertEquals(update, config.logMessageDownConversionEnable) case LogConfig.MessageTimestampDifferenceMaxMsProp => val update = 10009 - props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, update) assertEquals(update, config.logMessageTimestampDifferenceMaxMs) case LogConfig.MessageTimestampTypeProp => - props.put(KafkaConfig.LogMessageTimestampTypeProp, "LogAppendTime") - config.updateCurrentConfig(new KafkaConfig(props)) - assertEquals("LogAppendTime", config.logMessageTimestampType.name) + val update = "LogAppendTime" + setConfig(KafkaConfig.LogMessageTimestampTypeProp, update) + assertEquals(update, config.logMessageTimestampType.name) case LogConfig.MinCleanableDirtyRatioProp => val update = 0.01 - props.put(KafkaConfig.LogCleanerMinCleanRatioProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogCleanerMinCleanRatioProp, update) assertEquals(update, config.logCleanerMinCleanRatio) case LogConfig.MinCompactionLagMsProp => val update = 10010L - props.put(KafkaConfig.LogCleanerMinCompactionLagMsProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogCleanerMinCompactionLagMsProp, update) assertEquals(update, config.logCleanerMinCompactionLagMs) case LogConfig.MinInSyncReplicasProp => val update = 4 - props.put(KafkaConfig.MinInSyncReplicasProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.MinInSyncReplicasProp, update) assertEquals(update, config.minInSyncReplicas) case LogConfig.PreAllocateEnableProp => val update = true - props.put(KafkaConfig.LogPreAllocateProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogPreAllocateProp, update) assertEquals(update, config.logPreAllocateEnable) case LogConfig.RetentionBytesProp => val update = 10011L - props.put(KafkaConfig.LogRetentionBytesProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogRetentionBytesProp, update) assertEquals(update, config.logRetentionBytes) case LogConfig.RetentionMsProp => val update = 10012L - props.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogCleanerDeleteRetentionMsProp, update) assertEquals(update, config.logCleanerDeleteRetentionMs) case LogConfig.SegmentIndexBytesProp => val update = 10013 - props.put(KafkaConfig.LogIndexIntervalBytesProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogIndexIntervalBytesProp, update) assertEquals(update, config.logIndexIntervalBytes) case LogConfig.SegmentJitterMsProp => val update = 10014L - props.put(KafkaConfig.LogRollTimeJitterMillisProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.LogRollTimeJitterMillisProp, update) assertEquals(update, config.logRollTimeJitterMillis) case LogConfig.UncleanLeaderElectionEnableProp => val update = true - props.put(KafkaConfig.UncleanLeaderElectionEnableProp, update) - config.updateCurrentConfig(new KafkaConfig(props)) + setConfig(KafkaConfig.UncleanLeaderElectionEnableProp, update) assertEquals(update, config.uncleanLeaderElectionEnable) case LogConfig.MessageFormatVersionProp => // not dynamically updatable From 239333f1490175e9b4f28841a7e744493570cb6d Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Thu, 31 Oct 2019 10:28:54 -0700 Subject: [PATCH 4/5] Check for actual change more explicitly --- .../unit/kafka/server/KafkaConfigTest.scala | 96 +++++-------------- 1 file changed, 26 insertions(+), 70 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index b7905f7c59541..0e190fb54ab82 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -779,9 +779,11 @@ class KafkaConfigTest { val props = getBaseProperties() val config = KafkaConfig.fromProps(props) - def setConfig(property: String, value: Any): Unit = { + def assertDynamic(property: String, value: Any, accessor: () => Any): Unit = { + val initial = accessor() props.put(property, value.toString) config.updateCurrentConfig(new KafkaConfig(props)) + assertNotEquals(initial, accessor()) } // Test dynamic log config values can be correctly passed through via KafkaConfig to LogConfig @@ -790,97 +792,51 @@ class KafkaConfigTest { // the value is dynamically updatable. LogConfig.configNames.foreach { case LogConfig.CleanupPolicyProp => - val update = Defaults.Compact - setConfig(KafkaConfig.LogCleanupPolicyProp, update) - assertEquals(Collections.singletonList(update), config.logCleanupPolicy) + assertDynamic(KafkaConfig.LogCleanupPolicyProp, Defaults.Compact, () => config.logCleanupPolicy) case LogConfig.CompressionTypeProp => - val update = "lz4" - setConfig(KafkaConfig.CompressionTypeProp, update) - assertEquals(update, config.compressionType) + assertDynamic(KafkaConfig.CompressionTypeProp, "lz4", () => config.compressionType) case LogConfig.SegmentBytesProp => - val update = 10000 - setConfig(KafkaConfig.LogSegmentBytesProp, update) - assertEquals(update, config.logSegmentBytes) + assertDynamic(KafkaConfig.LogSegmentBytesProp, 10000, () => config.logSegmentBytes) case LogConfig.SegmentMsProp => - val update = 10001L - setConfig(KafkaConfig.LogRollTimeMillisProp, update) - assertEquals(update, config.logRollTimeMillis) + assertDynamic(KafkaConfig.LogRollTimeMillisProp, 10001L, () => config.logRollTimeMillis) case LogConfig.DeleteRetentionMsProp => - val update = 10002L - setConfig(KafkaConfig.LogCleanerDeleteRetentionMsProp, update) - assertEquals(update, config.logCleanerDeleteRetentionMs) + assertDynamic(KafkaConfig.LogCleanerDeleteRetentionMsProp, 10002L, () => config.logCleanerDeleteRetentionMs) case LogConfig.FileDeleteDelayMsProp => - val update = 10003L - setConfig(KafkaConfig.LogDeleteDelayMsProp, update) - assertEquals(update, config.logDeleteDelayMs) + assertDynamic(KafkaConfig.LogDeleteDelayMsProp, 10003L, () => config.logDeleteDelayMs) case LogConfig.FlushMessagesProp => - val update = 10004L - setConfig(KafkaConfig.LogFlushIntervalMessagesProp, update) - assertEquals(update, config.logFlushIntervalMessages) + assertDynamic(KafkaConfig.LogFlushIntervalMessagesProp, 10004L, () => config.logFlushIntervalMessages) case LogConfig.FlushMsProp => - val update = 10005L - setConfig(KafkaConfig.LogFlushIntervalMsProp, update) - assertEquals(update, config.logFlushIntervalMs) + assertDynamic(KafkaConfig.LogFlushIntervalMsProp, 10005L, () => config.logFlushIntervalMs) case LogConfig.MaxCompactionLagMsProp => - val update = 10006L - setConfig(KafkaConfig.LogCleanerMaxCompactionLagMsProp, update) - assertEquals(update, config.logCleanerMaxCompactionLagMs) + assertDynamic(KafkaConfig.LogCleanerMaxCompactionLagMsProp, 10006L, () => config.logCleanerMaxCompactionLagMs) case LogConfig.IndexIntervalBytesProp => - val update = 10007 - setConfig(KafkaConfig.LogIndexIntervalBytesProp, update) - assertEquals(update, config.logIndexIntervalBytes) + assertDynamic(KafkaConfig.LogIndexIntervalBytesProp, 10007, () => config.logIndexIntervalBytes) case LogConfig.MaxMessageBytesProp => - val update = 10008 - setConfig(KafkaConfig.MessageMaxBytesProp, update) - assertEquals(update, config.messageMaxBytes) + assertDynamic(KafkaConfig.MessageMaxBytesProp, 10008, () => config.messageMaxBytes) case LogConfig.MessageDownConversionEnableProp => - val update = false - setConfig(KafkaConfig.LogMessageDownConversionEnableProp, update) - assertEquals(update, config.logMessageDownConversionEnable) + assertDynamic(KafkaConfig.LogMessageDownConversionEnableProp, false, () => config.logMessageDownConversionEnable) case LogConfig.MessageTimestampDifferenceMaxMsProp => - val update = 10009 - setConfig(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, update) - assertEquals(update, config.logMessageTimestampDifferenceMaxMs) + assertDynamic(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, 10009, () => config.logMessageTimestampDifferenceMaxMs) case LogConfig.MessageTimestampTypeProp => - val update = "LogAppendTime" - setConfig(KafkaConfig.LogMessageTimestampTypeProp, update) - assertEquals(update, config.logMessageTimestampType.name) + assertDynamic(KafkaConfig.LogMessageTimestampTypeProp, "LogAppendTime", () => config.logMessageTimestampType.name) case LogConfig.MinCleanableDirtyRatioProp => - val update = 0.01 - setConfig(KafkaConfig.LogCleanerMinCleanRatioProp, update) - assertEquals(update, config.logCleanerMinCleanRatio) + assertDynamic(KafkaConfig.LogCleanerMinCleanRatioProp, 0.01, () => config.logCleanerMinCleanRatio) case LogConfig.MinCompactionLagMsProp => - val update = 10010L - setConfig(KafkaConfig.LogCleanerMinCompactionLagMsProp, update) - assertEquals(update, config.logCleanerMinCompactionLagMs) + assertDynamic(KafkaConfig.LogCleanerMinCompactionLagMsProp, 10010L, () => config.logCleanerMinCompactionLagMs) case LogConfig.MinInSyncReplicasProp => - val update = 4 - setConfig(KafkaConfig.MinInSyncReplicasProp, update) - assertEquals(update, config.minInSyncReplicas) + assertDynamic(KafkaConfig.MinInSyncReplicasProp, 4, () => config.minInSyncReplicas) case LogConfig.PreAllocateEnableProp => - val update = true - setConfig(KafkaConfig.LogPreAllocateProp, update) - assertEquals(update, config.logPreAllocateEnable) + assertDynamic(KafkaConfig.LogPreAllocateProp, true, () => config.logPreAllocateEnable) case LogConfig.RetentionBytesProp => - val update = 10011L - setConfig(KafkaConfig.LogRetentionBytesProp, update) - assertEquals(update, config.logRetentionBytes) + assertDynamic(KafkaConfig.LogRetentionBytesProp, 10011L, () => config.logRetentionBytes) case LogConfig.RetentionMsProp => - val update = 10012L - setConfig(KafkaConfig.LogCleanerDeleteRetentionMsProp, update) - assertEquals(update, config.logCleanerDeleteRetentionMs) + assertDynamic(KafkaConfig.LogCleanerDeleteRetentionMsProp, 10012L, () => config.logCleanerDeleteRetentionMs) case LogConfig.SegmentIndexBytesProp => - val update = 10013 - setConfig(KafkaConfig.LogIndexIntervalBytesProp, update) - assertEquals(update, config.logIndexIntervalBytes) + assertDynamic(KafkaConfig.LogIndexIntervalBytesProp, 10013, () => config.logIndexIntervalBytes) case LogConfig.SegmentJitterMsProp => - val update = 10014L - setConfig(KafkaConfig.LogRollTimeJitterMillisProp, update) - assertEquals(update, config.logRollTimeJitterMillis) + assertDynamic(KafkaConfig.LogRollTimeJitterMillisProp, 10014L, () => config.logRollTimeJitterMillis) case LogConfig.UncleanLeaderElectionEnableProp => - val update = true - setConfig(KafkaConfig.UncleanLeaderElectionEnableProp, update) - assertEquals(update, config.uncleanLeaderElectionEnable) + assertDynamic(KafkaConfig.UncleanLeaderElectionEnableProp, true, () => config.uncleanLeaderElectionEnable) case LogConfig.MessageFormatVersionProp => // not dynamically updatable case LogConfig.FollowerReplicationThrottledReplicasProp => From 246989aab37fe8cca769ee7d454692626f3ed44e Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Sun, 3 Nov 2019 17:14:29 -0800 Subject: [PATCH 5/5] Use LogConfig.TopicConfigSynonyms to supply kafka config props. Picked up two bugs in the checks --- .../unit/kafka/server/KafkaConfigTest.scala | 113 +++++++++--------- 1 file changed, 57 insertions(+), 56 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 0e190fb54ab82..725da19c800d7 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -17,7 +17,6 @@ package kafka.server -import java.util.Collections import java.util.Properties import kafka.api.{ApiVersion, KAFKA_0_8_2} @@ -790,61 +789,63 @@ class KafkaConfigTest { // Every log config prop must be explicitly accounted for here. // A value other than the default value for this config should be set to ensure that we can check whether // the value is dynamically updatable. - LogConfig.configNames.foreach { - case LogConfig.CleanupPolicyProp => - assertDynamic(KafkaConfig.LogCleanupPolicyProp, Defaults.Compact, () => config.logCleanupPolicy) - case LogConfig.CompressionTypeProp => - assertDynamic(KafkaConfig.CompressionTypeProp, "lz4", () => config.compressionType) - case LogConfig.SegmentBytesProp => - assertDynamic(KafkaConfig.LogSegmentBytesProp, 10000, () => config.logSegmentBytes) - case LogConfig.SegmentMsProp => - assertDynamic(KafkaConfig.LogRollTimeMillisProp, 10001L, () => config.logRollTimeMillis) - case LogConfig.DeleteRetentionMsProp => - assertDynamic(KafkaConfig.LogCleanerDeleteRetentionMsProp, 10002L, () => config.logCleanerDeleteRetentionMs) - case LogConfig.FileDeleteDelayMsProp => - assertDynamic(KafkaConfig.LogDeleteDelayMsProp, 10003L, () => config.logDeleteDelayMs) - case LogConfig.FlushMessagesProp => - assertDynamic(KafkaConfig.LogFlushIntervalMessagesProp, 10004L, () => config.logFlushIntervalMessages) - case LogConfig.FlushMsProp => - assertDynamic(KafkaConfig.LogFlushIntervalMsProp, 10005L, () => config.logFlushIntervalMs) - case LogConfig.MaxCompactionLagMsProp => - assertDynamic(KafkaConfig.LogCleanerMaxCompactionLagMsProp, 10006L, () => config.logCleanerMaxCompactionLagMs) - case LogConfig.IndexIntervalBytesProp => - assertDynamic(KafkaConfig.LogIndexIntervalBytesProp, 10007, () => config.logIndexIntervalBytes) - case LogConfig.MaxMessageBytesProp => - assertDynamic(KafkaConfig.MessageMaxBytesProp, 10008, () => config.messageMaxBytes) - case LogConfig.MessageDownConversionEnableProp => - assertDynamic(KafkaConfig.LogMessageDownConversionEnableProp, false, () => config.logMessageDownConversionEnable) - case LogConfig.MessageTimestampDifferenceMaxMsProp => - assertDynamic(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, 10009, () => config.logMessageTimestampDifferenceMaxMs) - case LogConfig.MessageTimestampTypeProp => - assertDynamic(KafkaConfig.LogMessageTimestampTypeProp, "LogAppendTime", () => config.logMessageTimestampType.name) - case LogConfig.MinCleanableDirtyRatioProp => - assertDynamic(KafkaConfig.LogCleanerMinCleanRatioProp, 0.01, () => config.logCleanerMinCleanRatio) - case LogConfig.MinCompactionLagMsProp => - assertDynamic(KafkaConfig.LogCleanerMinCompactionLagMsProp, 10010L, () => config.logCleanerMinCompactionLagMs) - case LogConfig.MinInSyncReplicasProp => - assertDynamic(KafkaConfig.MinInSyncReplicasProp, 4, () => config.minInSyncReplicas) - case LogConfig.PreAllocateEnableProp => - assertDynamic(KafkaConfig.LogPreAllocateProp, true, () => config.logPreAllocateEnable) - case LogConfig.RetentionBytesProp => - assertDynamic(KafkaConfig.LogRetentionBytesProp, 10011L, () => config.logRetentionBytes) - case LogConfig.RetentionMsProp => - assertDynamic(KafkaConfig.LogCleanerDeleteRetentionMsProp, 10012L, () => config.logCleanerDeleteRetentionMs) - case LogConfig.SegmentIndexBytesProp => - assertDynamic(KafkaConfig.LogIndexIntervalBytesProp, 10013, () => config.logIndexIntervalBytes) - case LogConfig.SegmentJitterMsProp => - assertDynamic(KafkaConfig.LogRollTimeJitterMillisProp, 10014L, () => config.logRollTimeJitterMillis) - case LogConfig.UncleanLeaderElectionEnableProp => - assertDynamic(KafkaConfig.UncleanLeaderElectionEnableProp, true, () => config.uncleanLeaderElectionEnable) - case LogConfig.MessageFormatVersionProp => - // not dynamically updatable - case LogConfig.FollowerReplicationThrottledReplicasProp => - // topic only config - case LogConfig.LeaderReplicationThrottledReplicasProp => - // topic only config - case prop => - fail(prop + " must be explicitly checked for dynamic updatability. Note that LogConfig(s) require that KafkaConfig value lookups are dynamic and not static values.") + LogConfig.TopicConfigSynonyms.foreach { case (logConfig, kafkaConfigProp) => + logConfig match { + case LogConfig.CleanupPolicyProp => + assertDynamic(kafkaConfigProp, Defaults.Compact, () => config.logCleanupPolicy) + case LogConfig.CompressionTypeProp => + assertDynamic(kafkaConfigProp, "lz4", () => config.compressionType) + case LogConfig.SegmentBytesProp => + assertDynamic(kafkaConfigProp, 10000, () => config.logSegmentBytes) + case LogConfig.SegmentMsProp => + assertDynamic(kafkaConfigProp, 10001L, () => config.logRollTimeMillis) + case LogConfig.DeleteRetentionMsProp => + assertDynamic(kafkaConfigProp, 10002L, () => config.logCleanerDeleteRetentionMs) + case LogConfig.FileDeleteDelayMsProp => + assertDynamic(kafkaConfigProp, 10003L, () => config.logDeleteDelayMs) + case LogConfig.FlushMessagesProp => + assertDynamic(kafkaConfigProp, 10004L, () => config.logFlushIntervalMessages) + case LogConfig.FlushMsProp => + assertDynamic(kafkaConfigProp, 10005L, () => config.logFlushIntervalMs) + case LogConfig.MaxCompactionLagMsProp => + assertDynamic(kafkaConfigProp, 10006L, () => config.logCleanerMaxCompactionLagMs) + case LogConfig.IndexIntervalBytesProp => + assertDynamic(kafkaConfigProp, 10007, () => config.logIndexIntervalBytes) + case LogConfig.MaxMessageBytesProp => + assertDynamic(kafkaConfigProp, 10008, () => config.messageMaxBytes) + case LogConfig.MessageDownConversionEnableProp => + assertDynamic(kafkaConfigProp, false, () => config.logMessageDownConversionEnable) + case LogConfig.MessageTimestampDifferenceMaxMsProp => + assertDynamic(kafkaConfigProp, 10009, () => config.logMessageTimestampDifferenceMaxMs) + case LogConfig.MessageTimestampTypeProp => + assertDynamic(kafkaConfigProp, "LogAppendTime", () => config.logMessageTimestampType.name) + case LogConfig.MinCleanableDirtyRatioProp => + assertDynamic(kafkaConfigProp, 0.01, () => config.logCleanerMinCleanRatio) + case LogConfig.MinCompactionLagMsProp => + assertDynamic(kafkaConfigProp, 10010L, () => config.logCleanerMinCompactionLagMs) + case LogConfig.MinInSyncReplicasProp => + assertDynamic(kafkaConfigProp, 4, () => config.minInSyncReplicas) + case LogConfig.PreAllocateEnableProp => + assertDynamic(kafkaConfigProp, true, () => config.logPreAllocateEnable) + case LogConfig.RetentionBytesProp => + assertDynamic(kafkaConfigProp, 10011L, () => config.logRetentionBytes) + case LogConfig.RetentionMsProp => + assertDynamic(kafkaConfigProp, 10012L, () => config.logRetentionTimeMillis) + case LogConfig.SegmentIndexBytesProp => + assertDynamic(kafkaConfigProp, 10013, () => config.logIndexSizeMaxBytes) + case LogConfig.SegmentJitterMsProp => + assertDynamic(kafkaConfigProp, 10014L, () => config.logRollTimeJitterMillis) + case LogConfig.UncleanLeaderElectionEnableProp => + assertDynamic(kafkaConfigProp, true, () => config.uncleanLeaderElectionEnable) + case LogConfig.MessageFormatVersionProp => + // not dynamically updatable + case LogConfig.FollowerReplicationThrottledReplicasProp => + // topic only config + case LogConfig.LeaderReplicationThrottledReplicasProp => + // topic only config + case prop => + fail(prop + " must be explicitly checked for dynamic updatability. Note that LogConfig(s) require that KafkaConfig value lookups are dynamic and not static values.") + } } }