Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.Features
import org.apache.kafka.server.common.Feature
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
import org.apache.kafka.server.fault.FaultHandler
Expand Down Expand Up @@ -240,7 +240,7 @@ class KafkaRaftManager[T](
clusterId,
bootstrapServers,
localListeners,
Features.KRAFT_VERSION.supportedVersionRange(),
Feature.KRAFT_VERSION.supportedVersionRange(),
raftConfig
)
}
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/tools/StorageTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace, Subpa
import net.sourceforge.argparse4j.internal.HelpScreenException
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.{Feature, MetadataVersion}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.metadata.storage.{Formatter, FormatterException}
import org.apache.kafka.raft.{DynamicVoters, QuorumConfig}
Expand Down Expand Up @@ -88,11 +88,11 @@ object StorageTool extends Logging {
0

case "version-mapping" =>
runVersionMappingCommand(namespace, printStream, Features.PRODUCTION_FEATURES)
runVersionMappingCommand(namespace, printStream, Feature.PRODUCTION_FEATURES)
0

case "feature-dependencies" =>
runFeatureDependenciesCommand(namespace, printStream, Features.PRODUCTION_FEATURES)
runFeatureDependenciesCommand(namespace, printStream, Feature.PRODUCTION_FEATURES)
0

case "random-uuid" =>
Expand Down Expand Up @@ -171,7 +171,7 @@ object StorageTool extends Logging {
def runVersionMappingCommand(
namespace: Namespace,
printStream: PrintStream,
validFeatures: java.util.List[Features]
validFeatures: java.util.List[Feature]
): Unit = {
val releaseVersion = Option(namespace.getString("release_version")).getOrElse(MetadataVersion.LATEST_PRODUCTION.toString)
try {
Expand All @@ -181,7 +181,7 @@ object StorageTool extends Logging {
printStream.print(f"metadata.version=$metadataVersionLevel%d ($releaseVersion%s)%n")

for (feature <- validFeatures.asScala) {
val featureLevel = feature.defaultValue(metadataVersion)
val featureLevel = feature.defaultLevel(metadataVersion)
printStream.print(f"${feature.featureName}%s=$featureLevel%d%n")
}
} catch {
Expand All @@ -194,7 +194,7 @@ object StorageTool extends Logging {
def runFeatureDependenciesCommand(
namespace: Namespace,
printStream: PrintStream,
validFeatures: java.util.List[Features]
validFeatures: java.util.List[Feature]
): Unit = {
val featureArgs = Option(namespace.getList[String]("feature")).map(_.asScala.toList).getOrElse(List.empty)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, Features, MetadataVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, Feature, MetadataVersion, NodeToControllerChannelManager}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith

Expand Down Expand Up @@ -102,7 +102,7 @@ class BrokerRegistrationRequestTest {
.setMaxSupportedVersion(max.featureLevel())
)
}
Features.PRODUCTION_FEATURES.stream().filter(_.featureName != MetadataVersion.FEATURE_NAME).forEach {
Feature.PRODUCTION_FEATURES.stream().filter(_.featureName != MetadataVersion.FEATURE_NAME).forEach {
feature =>
features.add(new BrokerRegistrationRequestData.Feature()
.setName(feature.featureName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.Features
import org.apache.kafka.server.common.Feature
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
import org.junit.jupiter.api.extension.ExtendWith

Expand All @@ -50,7 +50,7 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
),
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 0)
new ClusterFeature(feature = Feature.GROUP_VERSION, version = 0)
)
)
def testConsumerGroupDescribeWhenFeatureFlagNotEnabled(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, Consu
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse}
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.server.common.Features
import org.apache.kafka.server.common.Feature
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull}
import org.junit.jupiter.api.extension.ExtendWith

Expand Down Expand Up @@ -60,7 +60,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {

@ClusterTest(
features = Array(
new ClusterFeature(feature = Features.GROUP_VERSION, version = 0)
new ClusterFeature(feature = Feature.GROUP_VERSION, version = 0)
)
)
def testConsumerGroupHeartbeatIsInaccessibleWhenFeatureFlagNotEnabled(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener,
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT
import org.apache.kafka.controller.ControllerRequestContextUtil
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.{Feature, MetadataVersion}
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.server.quota.QuotaType
import org.junit.jupiter.api.Assertions._
Expand Down Expand Up @@ -282,7 +282,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
.setName(MetadataVersion.FEATURE_NAME)
.setMinSupportedVersion(MetadataVersion.latestProduction().featureLevel())
.setMaxSupportedVersion(MetadataVersion.latestTesting().featureLevel()))
Features.PRODUCTION_FEATURES.forEach { feature =>
Feature.PRODUCTION_FEATURES.forEach { feature =>
features.add(new BrokerRegistrationRequestData.Feature()
.setName(feature.featureName())
.setMinSupportedVersion(feature.minimumProduction())
Expand Down
12 changes: 6 additions & 6 deletions core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import kafka.utils.TestUtils
import net.sourceforge.argparse4j.inf.ArgumentParserException
import org.apache.kafka.common.metadata.UserScramCredentialRecord
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.{Feature, MetadataVersion}
import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, PropertiesUtils}
import org.apache.kafka.metadata.storage.FormatterException
Expand All @@ -54,7 +54,7 @@ class StorageToolTest {
properties
}

val testingFeatures = Features.FEATURES.toList.asJava
val testingFeatures = Feature.FEATURES.toList.asJava

@Test
def testConfigToLogDirectories(): Unit = {
Expand Down Expand Up @@ -571,8 +571,8 @@ Found problem:
s"Output did not contain expected Metadata Version: $output"
)

for (feature <- Features.PRODUCTION_FEATURES.asScala) {
val featureLevel = feature.defaultValue(metadataVersion)
for (feature <- Feature.PRODUCTION_FEATURES.asScala) {
val featureLevel = feature.defaultLevel(metadataVersion)
assertTrue(output.contains(s"${feature.featureName()}=$featureLevel"),
s"Output did not contain expected feature mapping: $output"
)
Expand All @@ -594,8 +594,8 @@ Found problem:
s"Output did not contain expected Metadata Version: $output"
)

for (feature <- Features.PRODUCTION_FEATURES.asScala) {
val featureLevel = feature.defaultValue(metadataVersion)
for (feature <- Feature.PRODUCTION_FEATURES.asScala) {
val featureLevel = feature.defaultLevel(metadataVersion)
assertTrue(output.contains(s"${feature.featureName()}=$featureLevel"),
s"Output did not contain expected feature mapping: $output"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.timeline.SnapshotRegistry;
Expand Down Expand Up @@ -251,9 +251,9 @@ private ApiError updateFeature(
} else {
// Validate dependencies for features that are not metadata.version
try {
Features.validateVersion(
Feature.validateVersion(
// Allow unstable feature versions is true because the version range is already checked above.
Features.featureFromName(featureName).fromFeatureLevel(newVersion, true),
Feature.featureFromName(featureName).fromFeatureLevel(newVersion, true),
proposedUpdatedVersions);
} catch (IllegalArgumentException e) {
return invalidUpdateVersion(featureName, newVersion, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.kafka.metadata.ControllerRegistration;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;

import java.util.ArrayList;
Expand Down Expand Up @@ -62,7 +62,7 @@ public static Map<String, VersionRange> defaultFeatureMap(boolean enableUnstable
enableUnstable ?
MetadataVersion.latestTesting().featureLevel() :
MetadataVersion.latestProduction().featureLevel()));
for (Features feature : Features.PRODUCTION_FEATURES) {
for (Feature feature : Feature.PRODUCTION_FEATURES) {
short maxVersion = enableUnstable ? feature.latestTesting() : feature.latestProduction();
if (maxVersion > 0) {
features.put(feature.featureName(), VersionRange.of(feature.minimumProduction(), maxVersion));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.kafka.raft.KafkaRaftClient;
import org.apache.kafka.raft.VoterSet;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.FeatureVersion;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.snapshot.FileRawSnapshotWriter;
Expand Down Expand Up @@ -69,7 +69,7 @@ public class Formatter {
/**
* The features that are supported.
*/
private List<Features> supportedFeatures = Features.PRODUCTION_FEATURES;
private List<Feature> supportedFeatures = Feature.PRODUCTION_FEATURES;

/**
* The current node id.
Expand Down Expand Up @@ -139,7 +139,7 @@ public Formatter setPrintStream(PrintStream printStream) {
return this;
}

public Formatter setSupportedFeatures(List<Features> supportedFeatures) {
public Formatter setSupportedFeatures(List<Feature> supportedFeatures) {
this.supportedFeatures = supportedFeatures;
return this;
}
Expand Down Expand Up @@ -298,7 +298,7 @@ MetadataVersion verifyReleaseVersion(MetadataVersion metadataVersion) {
}

Map<String, Short> calculateEffectiveFeatureLevels() {
Map<String, Features> nameToSupportedFeature = new TreeMap<>();
Map<String, Feature> nameToSupportedFeature = new TreeMap<>();
supportedFeatures.forEach(feature -> nameToSupportedFeature.put(feature.featureName(), feature));
Map<String, Short> newFeatureLevels = new TreeMap<>();
// Verify that all specified features are known to us.
Expand All @@ -321,7 +321,7 @@ Map<String, Short> calculateEffectiveFeatureLevels() {
Optional.ofNullable(newFeatureLevels.get(KRaftVersion.FEATURE_NAME))));
} else if (!newFeatureLevels.containsKey(supportedFeature.featureName())) {
newFeatureLevels.put(supportedFeature.featureName(),
supportedFeature.defaultValue(releaseVersion));
supportedFeature.defaultLevel(releaseVersion));
}
});
// Verify that the specified features support the given levels. This requires the full
Comment thread
dongnuo123 marked this conversation as resolved.
Expand All @@ -330,10 +330,10 @@ Map<String, Short> calculateEffectiveFeatureLevels() {
String featureName = entry.getKey();
if (!featureName.equals(MetadataVersion.FEATURE_NAME)) {
short level = entry.getValue();
Features supportedFeature = nameToSupportedFeature.get(featureName);
Feature supportedFeature = nameToSupportedFeature.get(featureName);
FeatureVersion featureVersion =
supportedFeature.fromFeatureLevel(level, unstableFeatureVersionsEnabled);
Features.validateVersion(featureVersion, newFeatureLevels);
Feature.validateVersion(featureVersion, newFeatureLevels);
}
}
return newFeatureLevels;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TestFeatureVersion;
import org.apache.kafka.server.common.TransactionVersion;
Expand Down Expand Up @@ -381,32 +381,32 @@ public void testCreateFeatureLevelRecords() {
Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.latestTesting().featureLevel()));
localSupportedFeatures.put(Features.TEST_VERSION.featureName(), VersionRange.of(0, 2));
localSupportedFeatures.put(Feature.TEST_VERSION.featureName(), VersionRange.of(0, 2));
FeatureControlManager manager = new FeatureControlManager.Builder().
setQuorumFeatures(new QuorumFeatures(0, localSupportedFeatures, emptyList())).
setClusterFeatureSupportDescriber(createFakeClusterFeatureSupportDescriber(
Collections.singletonList(new SimpleImmutableEntry<>(1, Collections.singletonMap(Features.TEST_VERSION.featureName(), VersionRange.of(0, 3)))),
Collections.singletonList(new SimpleImmutableEntry<>(1, Collections.singletonMap(Feature.TEST_VERSION.featureName(), VersionRange.of(0, 3)))),
emptyList())).
build();
ControllerResult<ApiError> result = manager.updateFeatures(
Collections.singletonMap(Features.TEST_VERSION.featureName(), (short) 1),
Collections.singletonMap(Features.TEST_VERSION.featureName(), FeatureUpdate.UpgradeType.UPGRADE),
Collections.singletonMap(Feature.TEST_VERSION.featureName(), (short) 1),
Collections.singletonMap(Feature.TEST_VERSION.featureName(), FeatureUpdate.UpgradeType.UPGRADE),
false);
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new FeatureLevelRecord().setName(Features.TEST_VERSION.featureName()).setFeatureLevel((short) 1), (short) 0)),
new FeatureLevelRecord().setName(Feature.TEST_VERSION.featureName()).setFeatureLevel((short) 1), (short) 0)),
ApiError.NONE), result);
RecordTestUtils.replayAll(manager, result.records());
assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get(Features.TEST_VERSION.featureName()));
assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get(Feature.TEST_VERSION.featureName()));

ControllerResult<ApiError> result2 = manager.updateFeatures(
Collections.singletonMap(Features.TEST_VERSION.featureName(), (short) 0),
Collections.singletonMap(Features.TEST_VERSION.featureName(), FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
Collections.singletonMap(Feature.TEST_VERSION.featureName(), (short) 0),
Collections.singletonMap(Feature.TEST_VERSION.featureName(), FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
false);
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new FeatureLevelRecord().setName(Features.TEST_VERSION.featureName()).setFeatureLevel((short) 0), (short) 0)),
new FeatureLevelRecord().setName(Feature.TEST_VERSION.featureName()).setFeatureLevel((short) 0), (short) 0)),
ApiError.NONE), result2);
RecordTestUtils.replayAll(manager, result2.records());
assertEquals(Optional.empty(), manager.finalizedFeatures(Long.MAX_VALUE).get(Features.TEST_VERSION.featureName()));
assertEquals(Optional.empty(), manager.finalizedFeatures(Long.MAX_VALUE).get(Feature.TEST_VERSION.featureName()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;
Expand Down Expand Up @@ -696,7 +696,7 @@ public void testRegisterBrokerKRaftVersions(short finalizedKraftVersion, short b
if (brokerMaxSupportedKraftVersion != 0) {
brokerFeatures.add(new BrokerRegistrationRequestData.Feature()
.setName(KRaftVersion.FEATURE_NAME)
.setMinSupportedVersion(Features.KRAFT_VERSION.minimumProduction())
.setMinSupportedVersion(Feature.KRAFT_VERSION.minimumProduction())
.setMaxSupportedVersion(brokerMaxSupportedKraftVersion));
}
BrokerRegistrationRequestData request = new BrokerRegistrationRequestData().
Expand Down
Loading