diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index 472d7ef550b44..b0936d12f3eea 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -17,11 +17,14 @@ package kafka.server.metadata +import kafka.coordinator.group.GroupCoordinator +import kafka.coordinator.transaction.TransactionCoordinator + import java.util.Collections.{singleton, singletonList, singletonMap} import java.util.Properties import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} -import kafka.log.UnifiedLog -import kafka.server.{BrokerServer, KafkaConfig} +import kafka.log.{LogManager, UnifiedLog} +import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager} import kafka.testkit.{KafkaClusterTestKit, TestKitNodes} import kafka.utils.TestUtils import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET @@ -35,7 +38,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.metadata.PartitionRegistration import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue} -import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.ArgumentMatchers.any import org.mockito.Mockito import org.mockito.Mockito.doThrow @@ -178,15 +181,21 @@ class BrokerMetadataPublisherTest { private def newMockPublisher( broker: BrokerServer, + logManager: LogManager, + replicaManager: ReplicaManager, + groupCoordinator: GroupCoordinator, + txnCoordinator: TransactionCoordinator, errorHandler: FaultHandler = new MockFaultHandler("publisher") ): BrokerMetadataPublisher = { + val mockLogManager = Mockito.mock(classOf[LogManager]) + Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog]) Mockito.spy(new BrokerMetadataPublisher( conf = broker.config, metadataCache = broker.metadataCache, - logManager = broker.logManager, - replicaManager = broker.replicaManager, - groupCoordinator = broker.groupCoordinator, - txnCoordinator = broker.transactionCoordinator, + logManager, + replicaManager, + groupCoordinator, + txnCoordinator, clientQuotaMetadataManager = broker.clientQuotaMetadataManager, dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap, _authorizer = Option.empty, @@ -195,7 +204,6 @@ class BrokerMetadataPublisherTest { )) } - @Disabled @Test def testReloadUpdatedFilesWithoutConfigChange(): Unit = { val cluster = new KafkaClusterTestKit.Builder( @@ -207,7 +215,14 @@ class BrokerMetadataPublisherTest { cluster.startup() cluster.waitForReadyBrokers() val broker = cluster.brokers().values().iterator().next() - val publisher = newMockPublisher(broker) + val mockLogManager = Mockito.mock(classOf[LogManager]) + Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog]) + val mockReplicaManager = Mockito.mock(classOf[ReplicaManager]) + val mockGroupCoordinator = Mockito.mock(classOf[GroupCoordinator]) + val mockTxnCoordinator = Mockito.mock(classOf[TransactionCoordinator]) + + val publisher = newMockPublisher(broker, mockLogManager, mockReplicaManager, mockGroupCoordinator, mockTxnCoordinator) + val numTimesReloadCalled = new AtomicInteger(0) Mockito.when(publisher.reloadUpdatedFilesWithoutConfigChange(any[Properties]())). thenAnswer(new Answer[Unit]() {