From 8b3c352df1374bba936662bfc9a9961e0e109834 Mon Sep 17 00:00:00 2001 From: Leonard Ge Date: Wed, 22 Apr 2020 17:45:10 +0100 Subject: [PATCH 1/3] Fixed bug in log validator tests. --- .../scala/unit/kafka/log/LogValidatorTest.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 7d2738b91ae3d..41babb7f9f890 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -112,6 +112,11 @@ class LogValidatorTest { checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1) } + @Test + def testLogAppendTimeNonCompressedV2(): Unit = { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2) + } + private def checkLogAppendTimeNonCompressed(magic: Byte): Unit = { val now = System.currentTimeMillis() // The timestamps should be overwritten @@ -135,17 +140,16 @@ class LogValidatorTest { assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size) validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L, batch)) assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) - assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp) assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged) + // we index from last offset in version 2 instead of base offset + val expectedMaxTimestampOffset = if (magic >= RecordBatch.MAGIC_VALUE_V2) 2 else 0 + assertEquals(s"The offset of max timestamp should be $expectedMaxTimestampOffset", + expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp) verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records, compressed = false) } - def testLogAppendTimeNonCompressedV2(): Unit = { - checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2) - } - @Test def testLogAppendTimeWithRecompressionV1(): Unit = { checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V1) From d7436de345e0f6ae550a6d646efa8f8c8e5ba578 Mon Sep 17 00:00:00 2001 From: Leonard Ge Date: Wed, 24 Jun 2020 09:05:30 +0100 Subject: [PATCH 2/3] Fixed memory leak. --- .../java/org/apache/kafka/clients/producer/MockProducer.java | 2 +- .../kafka/common/memory/GarbageCollectedMemoryPoolTest.java | 1 + .../org/apache/kafka/common/network/SslTransportLayerTest.java | 1 + .../expiring/ExpiringCredentialRefreshingLoginTest.java | 1 + 4 files changed, 4 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index ba89ab7317be5..9e3223ba5743c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -69,7 +69,7 @@ public class MockProducer implements Producer { private boolean producerFenced; private boolean sentOffsets; private long commitCount = 0L; - private Map mockMetrics; + private final Map mockMetrics; public RuntimeException initTransactionException = null; public RuntimeException beginTransactionException = null; diff --git a/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java b/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java index 788d447551125..02b38564e30a3 100644 --- a/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java @@ -80,6 +80,7 @@ public void testReleaseForeignBuffer() throws Exception { GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null); ByteBuffer fellOffATruck = ByteBuffer.allocate(1); pool.release(fellOffATruck); + pool.close(); } @Test diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 473881a18ca65..007302e094619 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -926,6 +926,7 @@ SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); server.close(); selector.close(); + serverChannelBuilder.close(); } } diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java index 3a39242927d43..f87405a1f8a77 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java @@ -368,6 +368,7 @@ public void testRefresh() throws Exception { inOrder.verify(mockLoginContext).login(); } } + testExpiringCredentialRefreshingLogin.close(); } } } From 895e5ffbbf6ed07ae50fb7f70aae849b19a04269 Mon Sep 17 00:00:00 2001 From: Leonard Ge Date: Wed, 24 Jun 2020 09:05:30 +0100 Subject: [PATCH 3/3] Fixed memory leak. --- .../java/org/apache/kafka/clients/producer/MockProducer.java | 2 +- .../kafka/common/memory/GarbageCollectedMemoryPoolTest.java | 1 + .../org/apache/kafka/common/network/SslTransportLayerTest.java | 1 + .../expiring/ExpiringCredentialRefreshingLoginTest.java | 1 + 4 files changed, 4 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index ba89ab7317be5..9e3223ba5743c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -69,7 +69,7 @@ public class MockProducer implements Producer { private boolean producerFenced; private boolean sentOffsets; private long commitCount = 0L; - private Map mockMetrics; + private final Map mockMetrics; public RuntimeException initTransactionException = null; public RuntimeException beginTransactionException = null; diff --git a/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java b/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java index 788d447551125..02b38564e30a3 100644 --- a/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java @@ -80,6 +80,7 @@ public void testReleaseForeignBuffer() throws Exception { GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null); ByteBuffer fellOffATruck = ByteBuffer.allocate(1); pool.release(fellOffATruck); + pool.close(); } @Test diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index ac94817dc8ffb..5f4f27b323fd2 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -903,6 +903,7 @@ SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); server.close(); selector.close(); + serverChannelBuilder.close(); } } diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java index 3a39242927d43..f87405a1f8a77 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java @@ -368,6 +368,7 @@ public void testRefresh() throws Exception { inOrder.verify(mockLoginContext).login(); } } + testExpiringCredentialRefreshingLogin.close(); } } }