From 28cde037d1257ac729014161f76b233ed4b25fa1 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 14 Sep 2022 21:11:48 +0800 Subject: [PATCH 1/2] KAFKA-14233: do not init managers twice to avoid resource leak --- .../kafka/server/metadata/BrokerMetadataPublisherTest.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index b571e2d0abb86..5863454045f8b 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -207,6 +207,10 @@ class BrokerMetadataPublisherTest { cluster.waitForReadyBrokers() val broker = cluster.brokers().values().iterator().next() val publisher = newMockPublisher(broker) + // Since the we've initialize managers during cluster#startup above, + // we don't want the mock publisher to create them again to cause resource leak after cluster closed + publisher._firstPublish = false + val numTimesReloadCalled = new AtomicInteger(0) Mockito.when(publisher.reloadUpdatedFilesWithoutConfigChange(any[Properties]())). thenAnswer(new Answer[Unit]() { From 98628718708e84eb0407e45e3e35103edd58274f Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 29 Sep 2022 20:33:36 +0800 Subject: [PATCH 2/2] KAFKA-14242: mock logmanager and other managers to avoid duplicate resource allocation --- .../BrokerMetadataPublisherTest.scala | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index 86f22db8e5947..b0936d12f3eea 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -17,11 +17,14 @@ package kafka.server.metadata +import kafka.coordinator.group.GroupCoordinator +import kafka.coordinator.transaction.TransactionCoordinator + import java.util.Collections.{singleton, singletonList, singletonMap} import java.util.Properties import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} -import kafka.log.UnifiedLog -import kafka.server.{BrokerServer, KafkaConfig} +import kafka.log.{LogManager, UnifiedLog} +import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager} import kafka.testkit.{KafkaClusterTestKit, TestKitNodes} import kafka.utils.TestUtils import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET @@ -35,7 +38,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.metadata.PartitionRegistration import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue} -import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.ArgumentMatchers.any import org.mockito.Mockito import org.mockito.Mockito.doThrow @@ -178,15 +181,21 @@ class BrokerMetadataPublisherTest { private def newMockPublisher( broker: BrokerServer, + logManager: LogManager, + replicaManager: ReplicaManager, + groupCoordinator: GroupCoordinator, + txnCoordinator: TransactionCoordinator, errorHandler: FaultHandler = new MockFaultHandler("publisher") ): BrokerMetadataPublisher = { + val mockLogManager = Mockito.mock(classOf[LogManager]) + Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog]) Mockito.spy(new BrokerMetadataPublisher( conf = broker.config, metadataCache = broker.metadataCache, - logManager = broker.logManager, - replicaManager = broker.replicaManager, - groupCoordinator = broker.groupCoordinator, - txnCoordinator = broker.transactionCoordinator, + logManager, + replicaManager, + groupCoordinator, + txnCoordinator, clientQuotaMetadataManager = broker.clientQuotaMetadataManager, dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap, _authorizer = Option.empty, @@ -195,7 +204,6 @@ class BrokerMetadataPublisherTest { )) } - @Disabled @Test def testReloadUpdatedFilesWithoutConfigChange(): Unit = { val cluster = new KafkaClusterTestKit.Builder( @@ -207,10 +215,13 @@ class BrokerMetadataPublisherTest { cluster.startup() cluster.waitForReadyBrokers() val broker = cluster.brokers().values().iterator().next() - val publisher = newMockPublisher(broker) - // Since the we've initialize managers during cluster#startup above, - // we don't want the mock publisher to create them again to cause resource leak after cluster closed - publisher._firstPublish = false + val mockLogManager = Mockito.mock(classOf[LogManager]) + Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog]) + val mockReplicaManager = Mockito.mock(classOf[ReplicaManager]) + val mockGroupCoordinator = Mockito.mock(classOf[GroupCoordinator]) + val mockTxnCoordinator = Mockito.mock(classOf[TransactionCoordinator]) + + val publisher = newMockPublisher(broker, mockLogManager, mockReplicaManager, mockGroupCoordinator, mockTxnCoordinator) val numTimesReloadCalled = new AtomicInteger(0) Mockito.when(publisher.reloadUpdatedFilesWithoutConfigChange(any[Properties]())).