Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/server/AlterPartitionManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,14 @@ object AlterPartitionManager {
config: KafkaConfig,
metadataCache: MetadataCache,
scheduler: KafkaScheduler,
controllerNodeProvider: ControllerNodeProvider,
time: Time,
metrics: Metrics,
threadNamePrefix: Option[String],
brokerEpochSupplier: () => Long,
): AlterPartitionManager = {
val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)

val channelManager = BrokerToControllerChannelManager(
controllerNodeProvider = nodeProvider,
controllerNodeProvider,
time = time,
metrics = metrics,
config = config,
Expand Down
49 changes: 27 additions & 22 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class BrokerServer(

var forwardingManager: ForwardingManager = null

var alterIsrManager: AlterPartitionManager = null
var alterPartitionManager: AlterPartitionManager = null

var autoTopicCreationManager: AutoTopicCreationManager = null

Expand Down Expand Up @@ -241,24 +241,17 @@ class BrokerServer(

clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)

val alterIsrChannelManager = BrokerToControllerChannelManager(
controllerNodeProvider,
time,
metrics,
alterPartitionManager = AlterPartitionManager(
config,
channelName = "alterIsr",
threadNamePrefix,
retryTimeoutMs = Long.MaxValue
)
alterIsrManager = new DefaultAlterPartitionManager(
controllerChannelManager = alterIsrChannelManager,
metadataCache,
scheduler = kafkaScheduler,
controllerNodeProvider,
time = time,
brokerId = config.nodeId,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
metadataVersionSupplier = () => metadataCache.metadataVersion()
metrics,
threadNamePrefix,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch
)
alterIsrManager.start()
alterPartitionManager.start()

this._replicaManager = new ReplicaManager(
config = config,
Expand All @@ -269,7 +262,7 @@ class BrokerServer(
quotaManagers = quotaManagers,
metadataCache = metadataCache,
logDirFailureChannel = logDirFailureChannel,
alterPartitionManager = alterIsrManager,
alterPartitionManager = alterPartitionManager,
brokerTopicStats = brokerTopicStats,
isShuttingDown = isShuttingDown,
zkClient = None,
Expand Down Expand Up @@ -343,10 +336,22 @@ class BrokerServer(
k -> VersionRange.of(v.min, v.max)
}.asJava

lifecycleManager.start(() => metadataListener.highestMetadataOffset,
BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
"heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
metaProps.clusterId, networkListeners, featuresRemapped)
val brokerLifecycleChannelManager = BrokerToControllerChannelManager(
controllerNodeProvider,
time,
metrics,
config,
"heartbeat",
threadNamePrefix,
config.brokerSessionTimeoutMs.toLong
)
lifecycleManager.start(
() => metadataListener.highestMetadataOffset,
brokerLifecycleChannelManager,
metaProps.clusterId,
networkListeners,
featuresRemapped
)

// Register a listener with the Raft layer to receive metadata event notifications
raftManager.register(metadataListener)
Expand Down Expand Up @@ -544,8 +549,8 @@ class BrokerServer(
if (replicaManager != null)
CoreUtils.swallow(replicaManager.shutdown(), this)

if (alterIsrManager != null)
CoreUtils.swallow(alterIsrManager.shutdown(), this)
if (alterPartitionManager != null)
CoreUtils.swallow(alterPartitionManager.shutdown(), this)

if (clientToControllerChannelManager != null)
CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ class BrokerToControllerChannelManagerImpl(
private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
private val manualMetadataUpdater = new ManualMetadataUpdater()
private val apiVersions = new ApiVersions()
private val currentNodeApiVersions = NodeApiVersions.create()
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the source of the bug: we always assume currentNodeApiVersions = ApiKeys.zkBrokerApis.

private val requestThread = newRequestThread

def start(): Unit = {
Expand Down Expand Up @@ -253,10 +252,7 @@ class BrokerToControllerChannelManagerImpl(

def controllerApiVersions(): Option[NodeApiVersions] = {
requestThread.activeControllerAddress().flatMap { activeController =>
if (activeController.id == config.brokerId)
Some(currentNodeApiVersions)
else
Option(apiVersions.get(activeController.idString))
Option(apiVersions.get(activeController.idString))
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1637,6 +1637,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
distinctRoles
}

/** True when this node runs both the broker and the controller role
  * (KRaft "combined" / co-resident mode), i.e. process.roles=broker,controller. */
def isKRaftCoResidentMode: Boolean = {
  // Co-resident mode means exactly these two roles are configured, no more, no fewer.
  val combinedRoles = Set(BrokerRole, ControllerRole)
  processRoles == combinedRoles
}

def metadataLogDir: String = {
Option(getString(KafkaConfig.MetadataLogDirProp)) match {
case Some(dir) => dir
Expand Down Expand Up @@ -2164,7 +2168,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
validateControllerListenerExistsForKRaftController()
validateControllerListenerNamesMustAppearInListenersForKRaftController()
} else if (processRoles == Set(BrokerRole, ControllerRole)) {
} else if (isKRaftCoResidentMode) {
// KRaft colocated broker and controller
validateNonEmptyQuorumVotersForKRaft()
validateControlPlaneListenerEmptyForKRaft()
Expand Down
19 changes: 11 additions & 8 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class KafkaServer(

var clientToControllerChannelManager: BrokerToControllerChannelManager = null

var alterIsrManager: AlterPartitionManager = null
var alterPartitionManager: AlterPartitionManager = null

var kafkaScheduler: KafkaScheduler = null

Expand Down Expand Up @@ -263,6 +263,7 @@ class KafkaServer(
logManager.startup(zkClient.getAllTopicsInCluster())

metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion, brokerFeatures)
val controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)

/* initialize feature change listener */
_featureChangeListener = new FinalizedFeatureChangeListener(metadataCache, _zkClient)
Expand All @@ -276,13 +277,14 @@ class KafkaServer(
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

clientToControllerChannelManager = BrokerToControllerChannelManager(
controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache),
controllerNodeProvider = controllerNodeProvider,
time = time,
metrics = metrics,
config = config,
channelName = "forwarding",
threadNamePrefix = threadNamePrefix,
retryTimeoutMs = config.requestTimeoutMs.longValue)
retryTimeoutMs = config.requestTimeoutMs.longValue
)
clientToControllerChannelManager.start()

/* start forwarding manager */
Expand All @@ -309,11 +311,12 @@ class KafkaServer(
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)

// Start alter partition manager based on the IBP version
alterIsrManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) {
alterPartitionManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) {
AlterPartitionManager(
config = config,
metadataCache = metadataCache,
scheduler = kafkaScheduler,
controllerNodeProvider,
time = time,
metrics = metrics,
threadNamePrefix = threadNamePrefix,
Expand All @@ -322,7 +325,7 @@ class KafkaServer(
} else {
AlterPartitionManager(kafkaScheduler, time, zkClient)
}
alterIsrManager.start()
alterPartitionManager.start()

// Start replica manager
_replicaManager = createReplicaManager(isShuttingDown)
Expand Down Expand Up @@ -478,7 +481,7 @@ class KafkaServer(
quotaManagers = quotaManagers,
metadataCache = metadataCache,
logDirFailureChannel = logDirFailureChannel,
alterPartitionManager = alterIsrManager,
alterPartitionManager = alterPartitionManager,
brokerTopicStats = brokerTopicStats,
isShuttingDown = isShuttingDown,
zkClient = Some(zkClient),
Expand Down Expand Up @@ -755,8 +758,8 @@ class KafkaServer(
if (replicaManager != null)
CoreUtils.swallow(replicaManager.shutdown(), this)

if (alterIsrManager != null)
CoreUtils.swallow(alterIsrManager.shutdown(), this)
if (alterPartitionManager != null)
CoreUtils.swallow(alterPartitionManager.shutdown(), this)

if (clientToControllerChannelManager != null)
CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
Expand Down
4 changes: 4 additions & 0 deletions core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public void testClusterTemplate() {
@ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz")
}),
@ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz")
})
})
public void testClusterTests() {
Expand Down
13 changes: 10 additions & 3 deletions core/src/test/java/kafka/test/annotation/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ public enum Type {
KRAFT {
@Override
public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf()));
invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false));
}
},
CO_KRAFT {
@Override
public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), true));
}
},
ZK {
Expand All @@ -40,10 +46,11 @@ public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvoca
invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf()));
}
},
BOTH {
ALL {
@Override
public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf()));
invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false));
invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), true));
invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf()));
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,20 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte

private final ClusterConfig clusterConfig;
private final AtomicReference<KafkaClusterTestKit> clusterReference;
private final boolean isCoResident;

public RaftClusterInvocationContext(ClusterConfig clusterConfig) {
public RaftClusterInvocationContext(ClusterConfig clusterConfig, boolean isCoResident) {
this.clusterConfig = clusterConfig;
this.clusterReference = new AtomicReference<>();
this.isCoResident = isCoResident;
}

@Override
public String getDisplayName(int invocationIndex) {
String clusterDesc = clusterConfig.nameTags().entrySet().stream()
.map(Object::toString)
.collect(Collectors.joining(", "));
return String.format("[%d] Type=Raft, %s", invocationIndex, clusterDesc);
.map(Object::toString)
.collect(Collectors.joining(", "));
return String.format("[%d] Type=Raft-%s, %s", invocationIndex, isCoResident ? "CoReside" : "Distributed", clusterDesc);
}

@Override
Expand All @@ -86,6 +88,7 @@ public List<Extension> getAdditionalExtensions() {
(BeforeTestExecutionCallback) context -> {
TestKitNodes nodes = new TestKitNodes.Builder().
setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
setCoResident(isCoResident).
setNumBrokerNodes(clusterConfig.numBrokers()).
setNumControllerNodes(clusterConfig.numControllers()).build();
nodes.brokerNodes().forEach((brokerId, brokerNode) -> {
Expand Down
43 changes: 34 additions & 9 deletions core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,16 @@ public KafkaClusterTestKit build() throws Exception {
ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
for (ControllerNode node : nodes.controllerNodes().values()) {
Map<String, String> props = new HashMap<>(configProps);
props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id()));
props.put(KafkaConfig$.MODULE$.NodeIdProp(),
Integer.toString(node.id()));
props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
node.metadataDirectory());
props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
"CONTROLLER:PLAINTEXT");
props.put(KafkaConfig$.MODULE$.ListenersProp(),
"CONTROLLER://localhost:0");
"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
nodes.interBrokerListenerName().value());
props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
"CONTROLLER");
// Note: we can't accurately set controller.quorum.voters yet, since we don't
Expand Down Expand Up @@ -203,7 +204,7 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS
}
for (BrokerNode node : nodes.brokerNodes().values()) {
Map<String, String> props = new HashMap<>(configProps);
props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id()));
props.put(KafkaConfig$.MODULE$.BrokerIdProp(),
Integer.toString(node.id()));
props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
Expand All @@ -212,8 +213,7 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS
String.join(",", node.logDataDirectories()));
props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
props.put(KafkaConfig$.MODULE$.ListenersProp(),
"EXTERNAL://localhost:0");
props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
nodes.interBrokerListenerName().value());
props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
Expand All @@ -231,9 +231,15 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS
String threadNamePrefix = String.format("broker%d_", node.id());
MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id());
TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
KafkaRaftManager<ApiMessageAndVersion> raftManager;
if (raftManagers.containsKey(node.id())) {
raftManager = raftManagers.get(node.id());
} else {
raftManager = new KafkaRaftManager<>(
metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
raftManagers.put(node.id(), raftManager);
}
BrokerServer broker = new BrokerServer(
config,
nodes.brokerProperties(node.id()),
Expand All @@ -245,7 +251,6 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS
connectFutureManager.future
);
brokers.put(node.id(), broker);
raftManagers.put(node.id(), raftManager);
}
} catch (Exception e) {
if (executorService != null) {
Expand All @@ -271,6 +276,26 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS
brokers, raftManagers, connectFutureManager, baseDirectory);
}

/**
 * Builds the {@code listeners} config value for the given node id. Co-resident
 * nodes expose both the EXTERNAL and CONTROLLER listeners; controller-only nodes
 * expose just CONTROLLER; broker-only nodes expose just EXTERNAL. Port 0 lets the
 * OS pick an ephemeral port for each listener.
 */
private String listeners(int node) {
    final boolean coResident = nodes.isCoResidentNode(node);
    final boolean isController = nodes.controllerNodes().containsKey(node);
    if (coResident) {
        return "EXTERNAL://localhost:0,CONTROLLER://localhost:0";
    }
    return isController ? "CONTROLLER://localhost:0" : "EXTERNAL://localhost:0";
}

/**
 * Returns the {@code process.roles} config value for the given node id:
 * "broker,controller" for co-resident nodes, "controller" for controller-only
 * nodes, and "broker" for everything else.
 */
private String roles(int node) {
    if (nodes.isCoResidentNode(node)) {
        return "broker,controller";
    } else if (nodes.controllerNodes().containsKey(node)) {
        return "controller";
    } else {
        return "broker";
    }
}

static private void setupNodeDirectories(File baseDirectory,
String metadataDirectory,
Collection<String> logDataDirectories) throws Exception {
Expand Down
Loading