Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,10 @@ class BrokerServer(
authorizer
),
sharedServer.initialBrokerMetadataLoadFaultHandler,
sharedServer.metadataPublishingFaultHandler,
lifecycleManager
sharedServer.metadataPublishingFaultHandler
)
metadataPublishers.add(brokerMetadataPublisher)
brokerRegistrationTracker = new BrokerRegistrationTracker(config.brokerId,
logManager.directoryIdsSet.toList.asJava,
() => lifecycleManager.resendBrokerRegistrationUnlessZkMode())
metadataPublishers.add(brokerRegistrationTracker)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package kafka.server.metadata
import java.util.{OptionalInt, Properties}
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager
import kafka.server.{BrokerLifecycleManager, KafkaConfig, ReplicaManager, RequestLocal}
import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
Expand Down Expand Up @@ -73,7 +73,6 @@ class BrokerMetadataPublisher(
aclPublisher: AclPublisher,
fatalFaultHandler: FaultHandler,
metadataPublishingFaultHandler: FaultHandler,
brokerLifecycleManager: BrokerLifecycleManager,
) extends MetadataPublisher with Logging {
logIdent = s"[BrokerMetadataPublisher id=${config.nodeId}] "

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.Collections.{singleton, singletonList, singletonMap}
import java.util.Properties
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import kafka.log.LogManager
import kafka.server.{BrokerLifecycleManager, BrokerServer, KafkaConfig, ReplicaManager}
import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
Expand Down Expand Up @@ -200,8 +200,7 @@ class BrokerMetadataPublisherTest {
mock(classOf[DelegationTokenPublisher]),
mock(classOf[AclPublisher]),
faultHandler,
faultHandler,
mock(classOf[BrokerLifecycleManager]),
faultHandler
)

val image = MetadataImage.EMPTY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.kafka.image.publisher;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
Expand All @@ -26,8 +25,6 @@
import org.apache.kafka.server.common.MetadataVersion;
import org.slf4j.Logger;

import java.util.List;

/**
* Tracks the registration of a specific broker, and executes a callback if it should be refreshed.
*
Expand All @@ -49,12 +46,10 @@ public class BrokerRegistrationTracker implements MetadataPublisher {
* Create the tracker.
*
* @param id The ID of this broker.
* @param targetDirectories The directories managed by this broker.
* @param refreshRegistrationCallback Callback to run if we need to refresh the registration.
*/
public BrokerRegistrationTracker(
int id,
List<Uuid> targetDirectories,
Runnable refreshRegistrationCallback
) {
this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] ").
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ public class BrokerRegistrationTrackerTest {

static class BrokerRegistrationTrackerTestContext {
AtomicInteger numCalls = new AtomicInteger(0);
BrokerRegistrationTracker tracker = new BrokerRegistrationTracker(1,
Arrays.asList(B, A), () -> numCalls.incrementAndGet());
BrokerRegistrationTracker tracker = new BrokerRegistrationTracker(1, () -> numCalls.incrementAndGet());

MetadataImage image = MetadataImage.EMPTY;

Expand Down