From 5aad61995ffc351a7a362eaaa3a46906de976a93 Mon Sep 17 00:00:00 2001 From: Leonard Ge Date: Wed, 22 Apr 2020 17:45:10 +0100 Subject: [PATCH 1/4] Fixed bug in log validator tests. --- .../scala/unit/kafka/log/LogValidatorTest.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 7d2738b91ae3d..41babb7f9f890 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -112,6 +112,11 @@ class LogValidatorTest { checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1) } + @Test + def testLogAppendTimeNonCompressedV2(): Unit = { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2) + } + private def checkLogAppendTimeNonCompressed(magic: Byte): Unit = { val now = System.currentTimeMillis() // The timestamps should be overwritten @@ -135,17 +140,16 @@ class LogValidatorTest { assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size) validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L, batch)) assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) - assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp) assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged) + // we index from last offset in version 2 instead of base offset + val expectedMaxTimestampOffset = if (magic >= RecordBatch.MAGIC_VALUE_V2) 2 else 0 + assertEquals(s"The offset of max timestamp should be $expectedMaxTimestampOffset", + expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp) verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records, compressed = false) } - def testLogAppendTimeNonCompressedV2(): Unit = { - checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2) - } - @Test def testLogAppendTimeWithRecompressionV1(): Unit = { checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V1) From 8b3c352df1374bba936662bfc9a9961e0e109834 Mon Sep 17 00:00:00 2001 From: Leonard Ge Date: Wed, 22 Apr 2020 17:45:10 +0100 Subject: [PATCH 2/4] Fixed bug in log validator tests. --- .../scala/unit/kafka/log/LogValidatorTest.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 7d2738b91ae3d..41babb7f9f890 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -112,6 +112,11 @@ class LogValidatorTest { checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1) } + @Test + def testLogAppendTimeNonCompressedV2(): Unit = { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2) + } + private def checkLogAppendTimeNonCompressed(magic: Byte): Unit = { val now = System.currentTimeMillis() // The timestamps should be overwritten @@ -135,17 +140,16 @@ class LogValidatorTest { assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size) validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L, batch)) assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) - assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp) assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged) + // we index from last offset in version 2 instead of base offset + val expectedMaxTimestampOffset = if (magic >= RecordBatch.MAGIC_VALUE_V2) 2 else 0 + assertEquals(s"The offset of max timestamp should be $expectedMaxTimestampOffset", + expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp) verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records, compressed = false) } - def testLogAppendTimeNonCompressedV2(): Unit = { - checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2) - } - @Test def testLogAppendTimeWithRecompressionV1(): Unit = { checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V1) From 4db31750a0dcdc51d962ba4fcb8d3043223aa9b3 Mon Sep 17 00:00:00 2001 From: Leonard Ge Date: Mon, 27 Apr 2020 23:29:10 +0100 Subject: [PATCH 3/4] Removed duplicated check in controller. --- core/src/main/scala/kafka/controller/KafkaController.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 82be66ae2cbe2..c6916a2ac34f9 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1065,7 +1065,7 @@ class KafkaController(val config: KafkaConfig, if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { // do this check only if the broker is live and there are no partitions being reassigned currently // and preferred replica election is not in progress - val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) && + val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.partitionsBeingReassigned.isEmpty && !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) && controllerContext.allTopics.contains(tp.topic) && From ad25337d3555ec5e7ec3ec48a3ccf56d7d14ae83 Mon Sep 17 00:00:00 2001 From: Leonard Ge Date: Mon, 27 Apr 2020 23:33:14 +0100 Subject: [PATCH 4/4] Revert previous change. --- .../scala/unit/kafka/log/LogValidatorTest.scala | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 41babb7f9f890..7d2738b91ae3d 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -112,11 +112,6 @@ class LogValidatorTest { checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1) } - @Test - def testLogAppendTimeNonCompressedV2(): Unit = { - checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2) - } - private def checkLogAppendTimeNonCompressed(magic: Byte): Unit = { val now = System.currentTimeMillis() // The timestamps should be overwritten @@ -140,16 +135,17 @@ class LogValidatorTest { assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size) validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L, batch)) assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) + assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp) assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged) - // we index from last offset in version 2 instead of base offset - val expectedMaxTimestampOffset = if (magic >= RecordBatch.MAGIC_VALUE_V2) 2 else 0 - assertEquals(s"The offset of max timestamp should be $expectedMaxTimestampOffset", - expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp) verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records, compressed = false) } + def testLogAppendTimeNonCompressedV2(): Unit = { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2) + } + @Test def testLogAppendTimeWithRecompressionV1(): Unit = { checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V1)