Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

package kafka.server.metadata

import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator

import java.util.Collections.{singleton, singletonList, singletonMap}
import java.util.Properties
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import kafka.log.UnifiedLog
import kafka.server.{BrokerServer, KafkaConfig}
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
Expand All @@ -35,7 +38,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito
import org.mockito.Mockito.doThrow
Expand Down Expand Up @@ -178,15 +181,21 @@ class BrokerMetadataPublisherTest {

private def newMockPublisher(
broker: BrokerServer,
logManager: LogManager,
replicaManager: ReplicaManager,
groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator,
errorHandler: FaultHandler = new MockFaultHandler("publisher")
): BrokerMetadataPublisher = {
val mockLogManager = Mockito.mock(classOf[LogManager])
Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog])
Mockito.spy(new BrokerMetadataPublisher(
conf = broker.config,
metadataCache = broker.metadataCache,
logManager = broker.logManager,
replicaManager = broker.replicaManager,
groupCoordinator = broker.groupCoordinator,
txnCoordinator = broker.transactionCoordinator,
Comment on lines 192 to -189
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feed mock publisher with mock log manager and other managers to avoid duplicate resource allocation in initManager

logManager,
replicaManager,
groupCoordinator,
txnCoordinator,
clientQuotaMetadataManager = broker.clientQuotaMetadataManager,
dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap,
_authorizer = Option.empty,
Expand All @@ -195,7 +204,6 @@ class BrokerMetadataPublisherTest {
))
}

@Disabled
@Test
def testReloadUpdatedFilesWithoutConfigChange(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
Expand All @@ -207,7 +215,14 @@ class BrokerMetadataPublisherTest {
cluster.startup()
cluster.waitForReadyBrokers()
val broker = cluster.brokers().values().iterator().next()
val publisher = newMockPublisher(broker)
val mockLogManager = Mockito.mock(classOf[LogManager])
Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog])
val mockReplicaManager = Mockito.mock(classOf[ReplicaManager])
val mockGroupCoordinator = Mockito.mock(classOf[GroupCoordinator])
val mockTxnCoordinator = Mockito.mock(classOf[TransactionCoordinator])

val publisher = newMockPublisher(broker, mockLogManager, mockReplicaManager, mockGroupCoordinator, mockTxnCoordinator)

val numTimesReloadCalled = new AtomicInteger(0)
Mockito.when(publisher.reloadUpdatedFilesWithoutConfigChange(any[Properties]())).
thenAnswer(new Answer[Unit]() {
Expand Down