From 651f59687f20c6c458ace0868ab21054406a04b3 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Wed, 24 Oct 2018 11:22:02 +0300 Subject: [PATCH 1/2] MINOR: Reduce intermittent test failures for testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics The problem was that the segment size for the logs was too small, triggering ~60 log rolls per tests. Sometimes that would pass the 15s timeout --- .../unit/kafka/log/AbstractLogCleanerIntegrationTest.scala | 2 +- .../kafka/log/LogCleanerParameterizedIntegrationTest.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index 2a483fa95d90d..82480808a2982 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -42,7 +42,7 @@ abstract class AbstractLogCleanerIntegrationTest { private val defaultMinCleanableDirtyRatio = 0.0F private val defaultCompactionLag = 0L private val defaultDeleteDelay = 1000 - private val defaultSegmentSize = 256 + private val defaultSegmentSize = 5120 def time: MockTime diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala index 232cfdbf1c181..143d70a9adca8 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala @@ -96,7 +96,7 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A logProps.put(LogConfig.CleanupPolicyProp, "compact,delete") def runCleanerAndCheckCompacted(numKeys: Int): (Log, Seq[(Int, String, Long)]) = { - cleaner = makeCleaner(partitions = topicPartitions.take(1), propertyOverrides = logProps, backOffMs = 100L) + cleaner = makeCleaner(partitions = topicPartitions.take(1), propertyOverrides = logProps, backOffMs = 100L, segmentSize = 2560) val log = cleaner.logs.get(topicPartitions(0)) val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec) @@ -190,10 +190,10 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A return val maxMessageSize = 192 - cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize) + cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize, segmentSize = 256) val log = cleaner.logs.get(topicPartitions(0)) - val props = logConfigProperties(maxMessageSize = maxMessageSize) + val props = logConfigProperties(maxMessageSize = maxMessageSize, segmentSize = 256) props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version) log.config = new LogConfig(props) From 31c8f69008e4970f7fe7174fd5bb6688b04ab318 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Sat, 27 Oct 2018 20:14:49 +0300 Subject: [PATCH 2/2] Change LogCleaner integration tests' default segment size to 2048 --- .../unit/kafka/log/AbstractLogCleanerIntegrationTest.scala | 2 +- .../unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index 82480808a2982..cc35f1d9ff655 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -42,7 +42,7 @@ abstract class AbstractLogCleanerIntegrationTest { private val defaultMinCleanableDirtyRatio = 0.0F private val defaultCompactionLag = 0L private val defaultDeleteDelay = 1000 - private val defaultSegmentSize = 5120 + private val defaultSegmentSize = 2048 def time: MockTime diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala index 143d70a9adca8..61e3ea57604d0 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala @@ -96,7 +96,7 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A logProps.put(LogConfig.CleanupPolicyProp, "compact,delete") def runCleanerAndCheckCompacted(numKeys: Int): (Log, Seq[(Int, String, Long)]) = { - cleaner = makeCleaner(partitions = topicPartitions.take(1), propertyOverrides = logProps, backOffMs = 100L, segmentSize = 2560) + cleaner = makeCleaner(partitions = topicPartitions.take(1), propertyOverrides = logProps, backOffMs = 100L) val log = cleaner.logs.get(topicPartitions(0)) val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)