diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index 80fa1af5894e8..4fe5020a97445 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -45,7 +45,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time, Utils} import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog} import org.apache.kafka.server.ProcessRole -import org.apache.kafka.server.common.Features +import org.apache.kafka.server.common.Feature import org.apache.kafka.server.common.serialization.RecordSerde import org.apache.kafka.server.util.{FileLock, KafkaScheduler} import org.apache.kafka.server.fault.FaultHandler @@ -240,7 +240,7 @@ class KafkaRaftManager[T]( clusterId, bootstrapServers, localListeners, - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), raftConfig ) } diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index b3ff5321625a4..0131b1f1248d6 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -27,7 +27,7 @@ import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace, Subpa import net.sourceforge.argparse4j.internal.HelpScreenException import org.apache.kafka.common.Uuid import org.apache.kafka.common.utils.{Exit, Utils} -import org.apache.kafka.server.common.{Features, MetadataVersion} +import org.apache.kafka.server.common.{Feature, MetadataVersion} import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils} import org.apache.kafka.metadata.storage.{Formatter, FormatterException} import org.apache.kafka.raft.{DynamicVoters, QuorumConfig} @@ -88,11 +88,11 @@ object StorageTool extends Logging { 0 case "version-mapping" => - runVersionMappingCommand(namespace, printStream, Features.PRODUCTION_FEATURES) + runVersionMappingCommand(namespace, printStream, Feature.PRODUCTION_FEATURES) 0 case "feature-dependencies" => - runFeatureDependenciesCommand(namespace, printStream, Features.PRODUCTION_FEATURES) + runFeatureDependenciesCommand(namespace, printStream, Feature.PRODUCTION_FEATURES) 0 case "random-uuid" => @@ -171,7 +171,7 @@ object StorageTool extends Logging { def runVersionMappingCommand( namespace: Namespace, printStream: PrintStream, - validFeatures: java.util.List[Features] + validFeatures: java.util.List[Feature] ): Unit = { val releaseVersion = Option(namespace.getString("release_version")).getOrElse(MetadataVersion.LATEST_PRODUCTION.toString) try { @@ -181,7 +181,7 @@ object StorageTool extends Logging { printStream.print(f"metadata.version=$metadataVersionLevel%d ($releaseVersion%s)%n") for (feature <- validFeatures.asScala) { - val featureLevel = feature.defaultValue(metadataVersion) + val featureLevel = feature.defaultLevel(metadataVersion) printStream.print(f"${feature.featureName}%s=$featureLevel%d%n") } } catch { @@ -194,7 +194,7 @@ object StorageTool extends Logging { def runFeatureDependenciesCommand( namespace: Namespace, printStream: PrintStream, - validFeatures: java.util.List[Features] + validFeatures: java.util.List[Feature] ): Unit = { val featureArgs = Option(namespace.getList[String]("feature")).map(_.asScala.toList).getOrElse(List.empty) diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala index 3fec68da273b8..1a24eeb460a91 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{Node, Uuid} -import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, Features, MetadataVersion, NodeToControllerChannelManager} +import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, Feature, MetadataVersion, NodeToControllerChannelManager} import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.extension.ExtendWith @@ -102,7 +102,7 @@ class BrokerRegistrationRequestTest { .setMaxSupportedVersion(max.featureLevel()) ) } - Features.PRODUCTION_FEATURES.stream().filter(_.featureName != MetadataVersion.FEATURE_NAME).forEach { + Feature.PRODUCTION_FEATURES.stream().filter(_.featureName != MetadataVersion.FEATURE_NAME).forEach { feature => features.add(new BrokerRegistrationRequestData.Feature() .setName(feature.featureName) diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala index 47e287a7166a3..8753ceb78dc81 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.resource.ResourceType import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.security.authorizer.AclEntry -import org.apache.kafka.server.common.Features +import org.apache.kafka.server.common.Feature import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse} import org.junit.jupiter.api.extension.ExtendWith @@ -50,7 +50,7 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") ), features = Array( - new ClusterFeature(feature = Features.GROUP_VERSION, version = 0) + new ClusterFeature(feature = Feature.GROUP_VERSION, version = 0) ) ) def testConsumerGroupDescribeWhenFeatureFlagNotEnabled(): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index e597d1cf7c28a..23b5589225dd7 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, Consu import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse} import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} -import org.apache.kafka.server.common.Features +import org.apache.kafka.server.common.Feature import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull} import org.junit.jupiter.api.extension.ExtendWith @@ -60,7 +60,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { @ClusterTest( features = Array( - new ClusterFeature(feature = Features.GROUP_VERSION, version = 0) + new ClusterFeature(feature = Feature.GROUP_VERSION, version = 0) ) ) def testConsumerGroupHeartbeatIsInaccessibleWhenFeatureFlagNotEnabled(): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala index 7ac8966d3630a..34720797e11e8 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala @@ -33,7 +33,7 @@ import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT import org.apache.kafka.controller.ControllerRequestContextUtil -import org.apache.kafka.server.common.{Features, MetadataVersion} +import org.apache.kafka.server.common.{Feature, MetadataVersion} import org.apache.kafka.server.config.QuotaConfig import org.apache.kafka.server.quota.QuotaType import org.junit.jupiter.api.Assertions._ @@ -282,7 +282,7 @@ class ReplicationQuotasTest extends QuorumTestHarness { .setName(MetadataVersion.FEATURE_NAME) .setMinSupportedVersion(MetadataVersion.latestProduction().featureLevel()) .setMaxSupportedVersion(MetadataVersion.latestTesting().featureLevel())) - Features.PRODUCTION_FEATURES.forEach { feature => + Feature.PRODUCTION_FEATURES.forEach { feature => features.add(new BrokerRegistrationRequestData.Feature() .setName(feature.featureName()) .setMinSupportedVersion(feature.minimumProduction()) diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index b38a2178bae72..996e80834bac7 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -27,7 +27,7 @@ import kafka.utils.TestUtils import net.sourceforge.argparse4j.inf.ArgumentParserException import org.apache.kafka.common.metadata.UserScramCredentialRecord import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.common.{Features, MetadataVersion} +import org.apache.kafka.server.common.{Feature, MetadataVersion} import org.apache.kafka.metadata.bootstrap.BootstrapDirectory import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, PropertiesUtils} import org.apache.kafka.metadata.storage.FormatterException @@ -54,7 +54,7 @@ class StorageToolTest { properties } - val testingFeatures = Features.FEATURES.toList.asJava + val testingFeatures = Feature.FEATURES.toList.asJava @Test def testConfigToLogDirectories(): Unit = { @@ -571,8 +571,8 @@ Found problem: s"Output did not contain expected Metadata Version: $output" ) - for (feature <- Features.PRODUCTION_FEATURES.asScala) { - val featureLevel = feature.defaultValue(metadataVersion) + for (feature <- Feature.PRODUCTION_FEATURES.asScala) { + val featureLevel = feature.defaultLevel(metadataVersion) assertTrue(output.contains(s"${feature.featureName()}=$featureLevel"), s"Output did not contain expected feature mapping: $output" ) @@ -594,8 +594,8 @@ Found problem: s"Output did not contain expected Metadata Version: $output" ) - for (feature <- Features.PRODUCTION_FEATURES.asScala) { - val featureLevel = feature.defaultValue(metadataVersion) + for (feature <- Feature.PRODUCTION_FEATURES.asScala) { + val featureLevel = feature.defaultLevel(metadataVersion) assertTrue(output.contains(s"${feature.featureName()}=$featureLevel"), s"Output did not contain expected feature mapping: $output" ) diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java index 436c9d868cf71..9d3481cee7e18 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java @@ -27,7 +27,7 @@ import org.apache.kafka.metadata.VersionRange; import org.apache.kafka.metadata.migration.ZkMigrationState; import org.apache.kafka.server.common.ApiMessageAndVersion; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.mutable.BoundedList; import org.apache.kafka.timeline.SnapshotRegistry; @@ -251,9 +251,9 @@ private ApiError updateFeature( } else { // Validate dependencies for features that are not metadata.version try { - Features.validateVersion( + Feature.validateVersion( // Allow unstable feature versions is true because the version range is already checked above. - Features.featureFromName(featureName).fromFeatureLevel(newVersion, true), + Feature.featureFromName(featureName).fromFeatureLevel(newVersion, true), proposedUpdatedVersions); } catch (IllegalArgumentException e) { return invalidUpdateVersion(featureName, newVersion, e.getMessage()); diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java index 17ec3acd6a257..9b79b5760443f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java @@ -19,7 +19,7 @@ import org.apache.kafka.metadata.ControllerRegistration; import org.apache.kafka.metadata.VersionRange; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.MetadataVersion; import java.util.ArrayList; @@ -62,7 +62,7 @@ public static Map defaultFeatureMap(boolean enableUnstable enableUnstable ? MetadataVersion.latestTesting().featureLevel() : MetadataVersion.latestProduction().featureLevel())); - for (Features feature : Features.PRODUCTION_FEATURES) { + for (Feature feature : Feature.PRODUCTION_FEATURES) { short maxVersion = enableUnstable ? feature.latestTesting() : feature.latestProduction(); if (maxVersion > 0) { features.put(feature.featureName(), VersionRange.of(feature.minimumProduction(), maxVersion)); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java index d512545384a40..79437d4da6d3c 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java @@ -29,8 +29,8 @@ import org.apache.kafka.raft.KafkaRaftClient; import org.apache.kafka.raft.VoterSet; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.FeatureVersion; -import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.snapshot.FileRawSnapshotWriter; @@ -69,7 +69,7 @@ public class Formatter { /** * The features that are supported. */ - private List supportedFeatures = Features.PRODUCTION_FEATURES; + private List supportedFeatures = Feature.PRODUCTION_FEATURES; /** * The current node id. @@ -139,7 +139,7 @@ public Formatter setPrintStream(PrintStream printStream) { return this; } - public Formatter setSupportedFeatures(List supportedFeatures) { + public Formatter setSupportedFeatures(List supportedFeatures) { this.supportedFeatures = supportedFeatures; return this; } @@ -298,7 +298,7 @@ MetadataVersion verifyReleaseVersion(MetadataVersion metadataVersion) { } Map calculateEffectiveFeatureLevels() { - Map nameToSupportedFeature = new TreeMap<>(); + Map nameToSupportedFeature = new TreeMap<>(); supportedFeatures.forEach(feature -> nameToSupportedFeature.put(feature.featureName(), feature)); Map newFeatureLevels = new TreeMap<>(); // Verify that all specified features are known to us. @@ -321,7 +321,7 @@ Map calculateEffectiveFeatureLevels() { Optional.ofNullable(newFeatureLevels.get(KRaftVersion.FEATURE_NAME)))); } else if (!newFeatureLevels.containsKey(supportedFeature.featureName())) { newFeatureLevels.put(supportedFeature.featureName(), - supportedFeature.defaultValue(releaseVersion)); + supportedFeature.defaultLevel(releaseVersion)); } }); // Verify that the specified features support the given levels. This requires the full @@ -330,10 +330,10 @@ Map calculateEffectiveFeatureLevels() { String featureName = entry.getKey(); if (!featureName.equals(MetadataVersion.FEATURE_NAME)) { short level = entry.getValue(); - Features supportedFeature = nameToSupportedFeature.get(featureName); + Feature supportedFeature = nameToSupportedFeature.get(featureName); FeatureVersion featureVersion = supportedFeature.fromFeatureLevel(level, unstableFeatureVersionsEnabled); - Features.validateVersion(featureVersion, newFeatureLevels); + Feature.validateVersion(featureVersion, newFeatureLevels); } } return newFeatureLevels; diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java index 378f6367cc629..31221da79ef8f 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java @@ -28,7 +28,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.metadata.migration.ZkMigrationState; import org.apache.kafka.server.common.ApiMessageAndVersion; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.TestFeatureVersion; import org.apache.kafka.server.common.TransactionVersion; @@ -381,32 +381,32 @@ public void testCreateFeatureLevelRecords() { Map localSupportedFeatures = new HashMap<>(); localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of( MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.latestTesting().featureLevel())); - localSupportedFeatures.put(Features.TEST_VERSION.featureName(), VersionRange.of(0, 2)); + localSupportedFeatures.put(Feature.TEST_VERSION.featureName(), VersionRange.of(0, 2)); FeatureControlManager manager = new FeatureControlManager.Builder(). setQuorumFeatures(new QuorumFeatures(0, localSupportedFeatures, emptyList())). setClusterFeatureSupportDescriber(createFakeClusterFeatureSupportDescriber( - Collections.singletonList(new SimpleImmutableEntry<>(1, Collections.singletonMap(Features.TEST_VERSION.featureName(), VersionRange.of(0, 3)))), + Collections.singletonList(new SimpleImmutableEntry<>(1, Collections.singletonMap(Feature.TEST_VERSION.featureName(), VersionRange.of(0, 3)))), emptyList())). build(); ControllerResult result = manager.updateFeatures( - Collections.singletonMap(Features.TEST_VERSION.featureName(), (short) 1), - Collections.singletonMap(Features.TEST_VERSION.featureName(), FeatureUpdate.UpgradeType.UPGRADE), + Collections.singletonMap(Feature.TEST_VERSION.featureName(), (short) 1), + Collections.singletonMap(Feature.TEST_VERSION.featureName(), FeatureUpdate.UpgradeType.UPGRADE), false); assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion( - new FeatureLevelRecord().setName(Features.TEST_VERSION.featureName()).setFeatureLevel((short) 1), (short) 0)), + new FeatureLevelRecord().setName(Feature.TEST_VERSION.featureName()).setFeatureLevel((short) 1), (short) 0)), ApiError.NONE), result); RecordTestUtils.replayAll(manager, result.records()); - assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get(Features.TEST_VERSION.featureName())); + assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get(Feature.TEST_VERSION.featureName())); ControllerResult result2 = manager.updateFeatures( - Collections.singletonMap(Features.TEST_VERSION.featureName(), (short) 0), - Collections.singletonMap(Features.TEST_VERSION.featureName(), FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), + Collections.singletonMap(Feature.TEST_VERSION.featureName(), (short) 0), + Collections.singletonMap(Feature.TEST_VERSION.featureName(), FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), false); assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion( - new FeatureLevelRecord().setName(Features.TEST_VERSION.featureName()).setFeatureLevel((short) 0), (short) 0)), + new FeatureLevelRecord().setName(Feature.TEST_VERSION.featureName()).setFeatureLevel((short) 0), (short) 0)), ApiError.NONE), result2); RecordTestUtils.replayAll(manager, result2.records()); - assertEquals(Optional.empty(), manager.finalizedFeatures(Long.MAX_VALUE).get(Features.TEST_VERSION.featureName())); + assertEquals(Optional.empty(), manager.finalizedFeatures(Long.MAX_VALUE).get(Feature.TEST_VERSION.featureName())); } @Test diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index 6e64483a3116f..a5c16641859a4 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -107,7 +107,7 @@ import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.TopicIdPartition; @@ -696,7 +696,7 @@ public void testRegisterBrokerKRaftVersions(short finalizedKraftVersion, short b if (brokerMaxSupportedKraftVersion != 0) { brokerFeatures.add(new BrokerRegistrationRequestData.Feature() .setName(KRaftVersion.FEATURE_NAME) - .setMinSupportedVersion(Features.KRAFT_VERSION.minimumProduction()) + .setMinSupportedVersion(Feature.KRAFT_VERSION.minimumProduction()) .setMaxSupportedVersion(brokerMaxSupportedKraftVersion)); } BrokerRegistrationRequestData request = new BrokerRegistrationRequestData(). diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java index 5df71a043ae8e..044a161018559 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.metadata.ControllerRegistration; import org.apache.kafka.metadata.VersionRange; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.MetadataVersion; import org.junit.jupiter.api.Test; @@ -60,8 +60,8 @@ public void testDefaultFeatureMap() { expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.LATEST_PRODUCTION.featureLevel())); - for (Features feature : Features.PRODUCTION_FEATURES) { - short maxVersion = feature.defaultValue(MetadataVersion.LATEST_PRODUCTION); + for (Feature feature : Feature.PRODUCTION_FEATURES) { + short maxVersion = feature.defaultLevel(MetadataVersion.LATEST_PRODUCTION); if (maxVersion > 0) { expectedFeatures.put(feature.featureName(), VersionRange.of( feature.minimumProduction(), @@ -78,8 +78,8 @@ public void testDefaultFeatureMapWithUnstable() { expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latestTesting().featureLevel())); - for (Features feature : Features.PRODUCTION_FEATURES) { - short maxVersion = feature.defaultValue(MetadataVersion.latestTesting()); + for (Feature feature : Feature.PRODUCTION_FEATURES) { + short maxVersion = feature.defaultLevel(MetadataVersion.latestTesting()); if (maxVersion > 0) { expectedFeatures.put(feature.featureName(), VersionRange.of( feature.minimumProduction(), diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index 39f5925a0d29f..fd0a4086adddd 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -29,7 +29,7 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; import org.apache.kafka.raft.DynamicVoters; import org.apache.kafka.server.common.ApiMessageAndVersion; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.GroupVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.TestFeatureVersion; @@ -331,7 +331,7 @@ public void testFormatWithScram() throws Exception { public void testFeatureFlag(short version) throws Exception { try (TestEnv testEnv = new TestEnv(1)) { FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setSupportedFeatures(Arrays.asList(Features.values())); + formatter1.formatter.setSupportedFeatures(Feature.TEST_AND_PRODUCTION_FEATURES); formatter1.formatter.setFeatureLevel(TestFeatureVersion.FEATURE_NAME, version); formatter1.formatter.run(); BootstrapMetadata bootstrapMetadata = @@ -357,7 +357,7 @@ public void testFeatureFlag(short version) throws Exception { public void testInvalidFeatureFlag() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setSupportedFeatures(Arrays.asList(Features.values())); + formatter1.formatter.setSupportedFeatures(Feature.TEST_AND_PRODUCTION_FEATURES); formatter1.formatter.setFeatureLevel("nonexistent.feature", (short) 1); assertEquals("Unsupported feature: nonexistent.feature. Supported features " + "are: eligible.leader.replicas.version, group.version, kraft.version, " + diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java index 0443af88845c2..8dddba6a10dc0 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java @@ -36,7 +36,7 @@ import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.BufferSupplier; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.snapshot.RecordsSnapshotReader; import org.apache.kafka.snapshot.SnapshotReader; @@ -1573,7 +1573,7 @@ void testUpdateVoter() throws Exception { context.deliverRequest( context.updateVoterRequest( follower, - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), newListeners ) ); @@ -1631,7 +1631,7 @@ void testLeaderUpdatesVoter() throws Exception { VoterSet.VoterNode.of( local, localListeners, - Features.KRAFT_VERSION.supportedVersionRange() + Feature.KRAFT_VERSION.supportedVersionRange() ) ); assertEquals(updatedVoterSet, context.listener.lastCommittedVoterSet()); @@ -1659,7 +1659,7 @@ public void testUpdateVoterInvalidClusterId() throws Exception { "", follower, epoch, - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), Endpoints.empty() ) ); @@ -1676,7 +1676,7 @@ public void testUpdateVoterInvalidClusterId() throws Exception { "invalid-uuid", follower, epoch, - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), Endpoints.empty() ) ); @@ -1709,7 +1709,7 @@ void testUpdateVoterOldEpoch() throws Exception { context.clusterId, follower, epoch - 1, - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), Endpoints.empty() ) ); @@ -1742,7 +1742,7 @@ void testUpdateVoterNewEpoch() throws Exception { context.clusterId, follower, epoch + 1, - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), Endpoints.empty() ) ); @@ -1771,7 +1771,7 @@ void testUpdateVoterToNotLeader() throws Exception { context.deliverRequest( context.updateVoterRequest( follower, - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), Endpoints.empty() ) ); @@ -1815,7 +1815,7 @@ void testUpdateVoterWithoutFencedPreviousLeaders() throws Exception { context.deliverRequest( context.updateVoterRequest( follower, - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), newListeners ) ); @@ -1867,7 +1867,7 @@ void testUpdateVoterWithKraftVersion0() throws Exception { context.deliverRequest( context.updateVoterRequest( follower, - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), newListeners ) ); @@ -1918,7 +1918,7 @@ void testUpdateVoterWithNoneVoter() throws Exception { context.deliverRequest( context.updateVoterRequest( replicaKey(follower.id(), true), - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), newListeners ) ); @@ -1969,7 +1969,7 @@ void testUpdateVoterWithNoneVoterId() throws Exception { context.deliverRequest( context.updateVoterRequest( ReplicaKey.of(follower.id() + 1, follower.directoryId().get()), - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), newListeners ) ); @@ -2039,7 +2039,7 @@ void testUpdateVoterWithPendingAddVoter() throws Exception { context.deliverRequest( context.updateVoterRequest( follower, - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), newListeners ) ); @@ -2105,7 +2105,7 @@ void testFollowerSendsUpdateVoter() throws Exception { RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest( local, epoch, - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), localListeners ); context.deliverResponse( @@ -2226,7 +2226,7 @@ void testUpdateVoterResponseCausesEpochChange() throws Exception { RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest( local, epoch, - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), localListeners ); context.deliverResponse( @@ -2331,7 +2331,7 @@ private int randomReplicaId() { } private static ApiVersionsResponseData apiVersionsResponse(Errors error) { - return apiVersionsResponse(error, Features.KRAFT_VERSION.supportedVersionRange()); + return apiVersionsResponse(error, Feature.KRAFT_VERSION.supportedVersionRange()); } private static ApiVersionsResponseData apiVersionsResponse(Errors error, SupportedVersionRange supportedVersions) { diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index 3038a775074eb..360c3bba62a80 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.raft.internals.BatchAccumulator; import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.KRaftVersion; import org.junit.jupiter.params.ParameterizedTest; @@ -79,7 +79,7 @@ private QuorumState buildQuorumState( localDirectoryId, mockPartitionState, localId.isPresent() ? voterSet.listeners(localId.getAsInt()) : Endpoints.empty(), - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), electionTimeoutMs, fetchTimeoutMs, store, diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index dc8e978abfcb7..f338b1df0817e 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -65,7 +65,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.internals.BatchBuilder; import org.apache.kafka.raft.internals.StringSerde; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.RecordsSnapshotWriter; @@ -420,7 +420,7 @@ public RaftClientTestContext build() throws IOException { clusterId, computedBootstrapServers, localListeners, - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), logContext, random, quorumConfig diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index 8aa599079f702..a011ddc438c6e 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -35,7 +35,7 @@ import org.apache.kafka.raft.MockLog.LogBatch; import org.apache.kafka.raft.MockLog.LogEntry; import org.apache.kafka.raft.internals.BatchMemoryPool; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.RecordsSnapshotReader; import org.apache.kafka.snapshot.SnapshotReader; @@ -793,7 +793,7 @@ void start(int nodeId) { clusterId, Collections.emptyList(), endpointsFromId(nodeId, channel.listenerName()), - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), logContext, random, quorumConfig diff --git a/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java b/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java index 1c1287344f142..0c6b8bfb57ec8 100644 --- a/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.network.ListenerName; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -370,7 +370,7 @@ public static VoterSet.VoterNode voterNode(ReplicaKey replicaKey) { ) ) ), - Features.KRAFT_VERSION.supportedVersionRange() + Feature.KRAFT_VERSION.supportedVersionRange() ); } diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index 86f18a1aefae4..fc632672f9849 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -29,7 +29,7 @@ import org.apache.kafka.raft.ReplicaKey; import org.apache.kafka.raft.VoterSet; import org.apache.kafka.raft.VoterSetTest; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.KRaftVersion; import org.junit.jupiter.api.AfterEach; @@ -86,7 +86,7 @@ private QuorumState buildQuorumState(VoterSet voterSet, KRaftVersion kraftVersio localDirectoryId, mockPartitionState, voterSet.listeners(localId), - Features.KRAFT_VERSION.supportedVersionRange(), + Feature.KRAFT_VERSION.supportedVersionRange(), electionTimeoutMs, fetchTimeoutMs, new MockQuorumStateStore(), diff --git a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java index fd10690a66364..68dabd2594ade 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java @@ -29,6 +29,8 @@ public enum EligibleLeaderReplicasVersion implements FeatureVersion { public static final String FEATURE_NAME = "eligible.leader.replicas.version"; + public static final EligibleLeaderReplicasVersion LATEST_PRODUCTION = ELRV_0; + private final short featureLevel; private final MetadataVersion bootstrapMetadataVersion; private final Map dependencies; diff --git a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java new file mode 100644 index 0000000000000..8a812fe521d8b --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import org.apache.kafka.common.feature.SupportedVersionRange; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.kafka.server.common.UnitTestFeatureVersion.FV0.UT_FV0_0; + +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + *
+ * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum Feature { + + /** + * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature. + */ + KRAFT_VERSION(KRaftVersion.FEATURE_NAME, KRaftVersion.values(), KRaftVersion.LATEST_PRODUCTION), + TRANSACTION_VERSION(TransactionVersion.FEATURE_NAME, TransactionVersion.values(), TransactionVersion.LATEST_PRODUCTION), + GROUP_VERSION(GroupVersion.FEATURE_NAME, GroupVersion.values(), GroupVersion.LATEST_PRODUCTION), + ELIGIBLE_LEADER_REPLICAS_VERSION(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.values(), EligibleLeaderReplicasVersion.LATEST_PRODUCTION), + + /** + * Features defined only for unit tests and are not used in production. + */ + TEST_VERSION(TestFeatureVersion.FEATURE_NAME, TestFeatureVersion.values(), TestFeatureVersion.LATEST_PRODUCTION), + UNIT_TEST_VERSION_0(UnitTestFeatureVersion.FV0.FEATURE_NAME, new FeatureVersion[]{UT_FV0_0}, UnitTestFeatureVersion.FV0.LATEST_PRODUCTION), + UNIT_TEST_VERSION_1(UnitTestFeatureVersion.FV1.FEATURE_NAME, UnitTestFeatureVersion.FV1.values(), UnitTestFeatureVersion.FV1.LATEST_PRODUCTION), + UNIT_TEST_VERSION_2(UnitTestFeatureVersion.FV2.FEATURE_NAME, UnitTestFeatureVersion.FV2.values(), UnitTestFeatureVersion.FV2.LATEST_PRODUCTION), + UNIT_TEST_VERSION_3(UnitTestFeatureVersion.FV3.FEATURE_NAME, UnitTestFeatureVersion.FV3.values(), UnitTestFeatureVersion.FV3.LATEST_PRODUCTION), + UNIT_TEST_VERSION_4(UnitTestFeatureVersion.FV4.FEATURE_NAME, UnitTestFeatureVersion.FV4.values(), UnitTestFeatureVersion.FV4.LATEST_PRODUCTION), + UNIT_TEST_VERSION_5(UnitTestFeatureVersion.FV5.FEATURE_NAME, UnitTestFeatureVersion.FV5.values(), UnitTestFeatureVersion.FV5.LATEST_PRODUCTION), + UNIT_TEST_VERSION_6(UnitTestFeatureVersion.FV6.FEATURE_NAME, UnitTestFeatureVersion.FV6.values(), UnitTestFeatureVersion.FV6.LATEST_PRODUCTION), + UNIT_TEST_VERSION_7(UnitTestFeatureVersion.FV7.FEATURE_NAME, UnitTestFeatureVersion.FV7.values(), UnitTestFeatureVersion.FV7.LATEST_PRODUCTION); + + public static final Feature[] FEATURES; + + // The list of features that are not unit test features. + public static final List TEST_AND_PRODUCTION_FEATURES; + + public static final List PRODUCTION_FEATURES; + + public static final List PRODUCTION_FEATURE_NAMES; + private final String name; + private final FeatureVersion[] featureVersions; + + // The latest production version of the feature, owned and updated by the feature owner + // in the respective feature definition. The value should not be smaller than the default + // value calculated with {@link #defaultValue(MetadataVersion)}. + public final FeatureVersion latestProduction; + + Feature(String name, + FeatureVersion[] featureVersions, + FeatureVersion latestProduction) { + this.name = name; + this.featureVersions = featureVersions; + this.latestProduction = latestProduction; + } + + static { + Feature[] enumValues = Feature.values(); + FEATURES = Arrays.copyOf(enumValues, enumValues.length); + + TEST_AND_PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> + !feature.name.startsWith("unit." + TestFeatureVersion.FEATURE_NAME) + ).collect(Collectors.toList()); + + PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> + !feature.name.equals(TEST_VERSION.featureName()) && + !feature.name.startsWith("unit." + TestFeatureVersion.FEATURE_NAME) + ).collect(Collectors.toList()); + PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature -> + feature.name).collect(Collectors.toList()); + + validateDefaultValueAndLatestProductionValue(TEST_VERSION); + for (Feature feature : PRODUCTION_FEATURES) { + validateDefaultValueAndLatestProductionValue(feature); + } + } + + public String featureName() { + return name; + } + + public FeatureVersion[] featureVersions() { + return featureVersions; + } + + public short latestProduction() { + return latestProduction.featureLevel(); + } + + public short minimumProduction() { + return featureVersions[0].featureLevel(); + } + + public short latestTesting() { + return featureVersions[featureVersions.length - 1].featureLevel(); + } + + public SupportedVersionRange supportedVersionRange() { + return new SupportedVersionRange( + minimumProduction(), + latestTesting() + ); + } + + /** + * Creates a FeatureVersion from a level. + * + * @param level the level of the feature + * @param allowUnstableFeatureVersions whether unstable versions can be used + * @return the FeatureVersionUtils.FeatureVersion for the feature the enum is based on. + * @throws IllegalArgumentException if the feature is not known. + */ + public FeatureVersion fromFeatureLevel(short level, + boolean allowUnstableFeatureVersions) { + return Arrays.stream(featureVersions).filter(featureVersion -> + featureVersion.featureLevel() == level && (allowUnstableFeatureVersions || level <= latestProduction())).findFirst().orElseThrow( + () -> new IllegalArgumentException("No feature:" + featureName() + " with feature level " + level)); + } + + /** + * A method to validate the feature can be set. If a given feature relies on another feature, the dependencies should be + * captured in {@link FeatureVersion#dependencies()} + *

+ * For example, say feature X level x relies on feature Y level y: + * if feature X >= x then throw an error if feature Y < y. + * + * All feature levels above 0 in kraft require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster. + * + * @param feature the feature we are validating + * @param features the feature versions we have (or want to set) + * @throws IllegalArgumentException if the feature is not valid + */ + public static void validateVersion(FeatureVersion feature, Map features) { + Short metadataVersion = features.get(MetadataVersion.FEATURE_NAME); + + if (feature.featureLevel() >= 1 && (metadataVersion == null || metadataVersion < MetadataVersion.IBP_3_3_IV0.featureLevel())) + throw new IllegalArgumentException(feature.featureName() + " could not be set to " + feature.featureLevel() + + " because it depends on metadata.version=4 (" + MetadataVersion.IBP_3_3_IV0 + ")"); + + for (Map.Entry dependency: feature.dependencies().entrySet()) { + Short featureLevel = features.get(dependency.getKey()); + + if (featureLevel == null || featureLevel < dependency.getValue()) { + throw new IllegalArgumentException(feature.featureName() + " could not be set to " + feature.featureLevel() + + " because it depends on " + dependency.getKey() + " level " + dependency.getValue()); + } + } + } + + /** + * A method to return the default (latest production) version of a feature based on the metadata version provided. + * + * Every time a new feature is added, it should create a mapping from metadata version to feature version + * with {@link FeatureVersion#bootstrapMetadataVersion()}. The feature version should be marked as production ready + * before the metadata version is made production ready. + * + * @param metadataVersion the metadata version we want to use to set the default. + * @return the default version given the feature and provided metadata version + */ + public FeatureVersion defaultVersion(MetadataVersion metadataVersion) { + FeatureVersion version = featureVersions[0]; + for (Iterator it = Arrays.stream(featureVersions).iterator(); it.hasNext(); ) { + FeatureVersion feature = it.next(); + if (feature.bootstrapMetadataVersion().isLessThan(metadataVersion) || feature.bootstrapMetadataVersion().equals(metadataVersion)) + version = feature; + else + return version; + } + return version; + } + + public short defaultLevel(MetadataVersion metadataVersion) { + return defaultVersion(metadataVersion).featureLevel(); + } + + public static Feature featureFromName(String featureName) { + for (Feature feature : FEATURES) { + if (feature.name.equals(featureName)) + return feature; + } + throw new IllegalArgumentException("Feature " + featureName + " not found."); + } + + public boolean isProductionReady(short featureVersion) { + return featureVersion <= latestProduction(); + } + + public boolean hasFeatureVersion(FeatureVersion featureVersion) { + for (FeatureVersion v : featureVersions()) { + if (v == featureVersion) { + return true; + } + } + return false; + } + + /** + * The method ensures that the following statements are met: + * 1. The latest production value is one of the feature values. + * 2. The latest production value >= the default value. + * 3. The dependencies of the latest production value <= their latest production values. + * 4. The dependencies of all default values <= their default values. + * 5. If the latest production depends on MetadataVersion, the value should be <= MetadataVersion.LATEST_PRODUCTION. + * 6. If any default value depends on MetadataVersion, the value should be <= the default value bootstrap MV. + * + * Suppose we have feature X as the feature being validated. + * Invalid examples: + * - The feature X has default version = XV_10 (dependency = {}), latest production = XV_5 (dependency = {}) + * (Violating rule 2. The latest production value XV_5 is smaller than the default value) + * - The feature X has latest production = XV_11 (dependency = {Y: YV_4}) + * The feature Y has latest production = YV_3 (dependency = {}) + * (Violating rule 3. For latest production XV_11, Y's latest production YV_3 is smaller than the dependency value YV_4) + * - The feature X has default version = XV_10 (dependency = {Y: YV_4}) + * The feature Y has default version = YV_3 (dependency = {}) + * (Violating rule 4. For default version XV_10, Y's default value YV_3 is smaller than the dependency value YV_4) + * - The feature X has latest production = XV_11 (dependency = {MetadataVersion: IBP_4_0_IV1}), MetadataVersion.LATEST_PRODUCTION is IBP_4_0_IV0 + * (Violating rule 5. The dependency MV IBP_4_0_IV1 is behind MV latest production IBP_4_0_IV0) + * - The feature X has default version = XV_10 (dependency = {MetadataVersion: IBP_4_0_IV1}) and bootstrap MV = IBP_4_0_IV0 + * (Violating rule 6. When MV latest production is IBP_4_0_IV0, feature X will be set to XV_10 by default whereas it depends on MV IBP_4_0_IV1) + * Valid examples: + * - The feature X has default version = XV_10 (dependency = {}), latest production = XV_10 (dependency = {}) + * - The feature X has default version = XV_10 (dependency = {Y: YV_3}), latest production = XV_11 (dependency = {Y: YV_4}) + * The feature Y has default version = YV_3 (dependency = {}), latest production = YV_4 (dependency = {}) + * - The feature X has default version = XV_10 (dependency = {MetadataVersion: IBP_4_0_IV0}), boostrap MV = IBP_4_0_IV0, + * latest production = XV_11 (dependency = {MetadataVersion: IBP_4_0_IV1}), MV latest production = IBP_4_0_IV1 + * + * @param feature the feature to validate. + * @return true if the feature is valid, false otherwise. + * @throws IllegalArgumentException if the feature violates any of the rules thus is not valid. + */ + public static void validateDefaultValueAndLatestProductionValue( + Feature feature + ) throws IllegalArgumentException { + FeatureVersion defaultVersion = feature.defaultVersion(MetadataVersion.LATEST_PRODUCTION); + FeatureVersion latestProduction = feature.latestProduction; + + if (!feature.hasFeatureVersion(latestProduction)) { + throw new IllegalArgumentException(String.format("Feature %s has latest production version %s " + + "which is not one of its feature versions.", feature.name(), latestProduction)); + } + + if (latestProduction.featureLevel() < defaultVersion.featureLevel()) { + throw new IllegalArgumentException(String.format("Feature %s has latest production value %s " + + "smaller than its default version %s with latest production MV.", + feature.name(), latestProduction, defaultVersion)); + } + + for (Map.Entry dependency: latestProduction.dependencies().entrySet()) { + String dependencyFeatureName = dependency.getKey(); + if (!dependencyFeatureName.equals(MetadataVersion.FEATURE_NAME)) { + Feature dependencyFeature = featureFromName(dependencyFeatureName); + if (!dependencyFeature.isProductionReady(dependency.getValue())) { + throw new IllegalArgumentException(String.format("Feature %s has latest production FeatureVersion %s " + + "with dependency %s that is not production ready. (%s latest production: %s)", + feature.name(), latestProduction, dependencyFeature.fromFeatureLevel(dependency.getValue(), true), + dependencyFeature, dependencyFeature.latestProduction)); + } + } else { + if (dependency.getValue() > MetadataVersion.LATEST_PRODUCTION.featureLevel()) { + throw new IllegalArgumentException(String.format("Feature %s has latest production FeatureVersion %s " + + "with MV dependency %s that is not production ready. (MV latest production: %s)", + feature.name(), latestProduction, MetadataVersion.fromFeatureLevel(dependency.getValue()), + MetadataVersion.LATEST_PRODUCTION)); + } + } + } + + for (MetadataVersion metadataVersion: MetadataVersion.values()) { + // Only checking the kraft metadata versions. + if (metadataVersion.compareTo(MetadataVersion.MINIMUM_KRAFT_VERSION) < 0) { + continue; + } + + defaultVersion = feature.defaultVersion(metadataVersion); + for (Map.Entry dependency: defaultVersion.dependencies().entrySet()) { + String dependencyFeatureName = dependency.getKey(); + if (!dependencyFeatureName.equals(MetadataVersion.FEATURE_NAME)) { + Feature dependencyFeature = featureFromName(dependencyFeatureName); + if (dependency.getValue() > dependencyFeature.defaultLevel(metadataVersion)) { + throw new IllegalArgumentException(String.format("Feature %s has default FeatureVersion %s " + + "when MV=%s with dependency %s that is behind its default version %s.", + feature.name(), defaultVersion, metadataVersion, + dependencyFeature.fromFeatureLevel(dependency.getValue(), true), + dependencyFeature.defaultVersion(metadataVersion))); + } + } else { + if (dependency.getValue() > defaultVersion.bootstrapMetadataVersion().featureLevel()) { + throw new IllegalArgumentException(String.format("Feature %s has default FeatureVersion %s " + + "when MV=%s with MV dependency %s that is behind its bootstrap MV %s.", + feature.name(), defaultVersion, metadataVersion, + MetadataVersion.fromFeatureLevel(dependency.getValue()), + defaultVersion.bootstrapMetadataVersion())); + } + } + } + } + } +} diff --git a/server-common/src/main/java/org/apache/kafka/server/common/Features.java b/server-common/src/main/java/org/apache/kafka/server/common/Features.java deleted file mode 100644 index bd4fa0c861509..0000000000000 --- a/server-common/src/main/java/org/apache/kafka/server/common/Features.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.server.common; - -import org.apache.kafka.common.feature.SupportedVersionRange; - -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * This is enum for the various features implemented for Kafka clusters. - * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. - * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, - * they need to be specified via the StorageTool or FeatureCommand tools. - *
- * Having a unified enum for the features that will use a shared type in the API used to set and update them - * makes it easier to process these features. - */ -public enum Features { - - /** - * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when - * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. - * - * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature. - */ - TEST_VERSION("test.feature.version", TestFeatureVersion.values()), - KRAFT_VERSION("kraft.version", KRaftVersion.values()), - TRANSACTION_VERSION("transaction.version", TransactionVersion.values()), - GROUP_VERSION("group.version", GroupVersion.values()), - ELIGIBLE_LEADER_REPLICAS_VERSION("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.values()); - - public static final Features[] FEATURES; - public static final List PRODUCTION_FEATURES; - - public static final List PRODUCTION_FEATURE_NAMES; - private final String name; - private final FeatureVersion[] featureVersions; - - Features(String name, - FeatureVersion[] featureVersions) { - this.name = name; - this.featureVersions = featureVersions; - } - - static { - Features[] enumValues = Features.values(); - FEATURES = Arrays.copyOf(enumValues, enumValues.length); - - PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> - !feature.name.equals(TEST_VERSION.featureName())).collect(Collectors.toList()); - PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature -> - feature.name).collect(Collectors.toList()); - } - - public String featureName() { - return name; - } - - public FeatureVersion[] featureVersions() { - return featureVersions; - } - - public short latestProduction() { - return defaultValue(MetadataVersion.LATEST_PRODUCTION); - } - - public short minimumProduction() { - return featureVersions[0].featureLevel(); - } - - public short latestTesting() { - return featureVersions[featureVersions.length - 1].featureLevel(); - } - - public SupportedVersionRange supportedVersionRange() { - return new SupportedVersionRange( - minimumProduction(), - latestTesting() - ); - } - - /** - * Creates a FeatureVersion from a level. - * - * @param level the level of the feature - * @param allowUnstableFeatureVersions whether unstable versions can be used - * @return the FeatureVersionUtils.FeatureVersion for the feature the enum is based on. - * @throws IllegalArgumentException if the feature is not known. - */ - public FeatureVersion fromFeatureLevel(short level, - boolean allowUnstableFeatureVersions) { - return Arrays.stream(featureVersions).filter(featureVersion -> - featureVersion.featureLevel() == level && (allowUnstableFeatureVersions || level <= latestProduction())).findFirst().orElseThrow( - () -> new IllegalArgumentException("No feature:" + featureName() + " with feature level " + level)); - } - - /** - * A method to validate the feature can be set. If a given feature relies on another feature, the dependencies should be - * captured in {@link FeatureVersion#dependencies()} - *

- * For example, say feature X level x relies on feature Y level y: - * if feature X >= x then throw an error if feature Y < y. - * - * All feature levels above 0 in kraft require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster. - * - * @param feature the feature we are validating - * @param features the feature versions we have (or want to set) - * @throws IllegalArgumentException if the feature is not valid - */ - public static void validateVersion(FeatureVersion feature, Map features) { - Short metadataVersion = features.get(MetadataVersion.FEATURE_NAME); - - if (feature.featureLevel() >= 1 && (metadataVersion == null || metadataVersion < MetadataVersion.IBP_3_3_IV0.featureLevel())) - throw new IllegalArgumentException(feature.featureName() + " could not be set to " + feature.featureLevel() + - " because it depends on metadata.version=4 (" + MetadataVersion.IBP_3_3_IV0 + ")"); - - for (Map.Entry dependency: feature.dependencies().entrySet()) { - Short featureLevel = features.get(dependency.getKey()); - - if (featureLevel == null || featureLevel < dependency.getValue()) { - throw new IllegalArgumentException(feature.featureName() + " could not be set to " + feature.featureLevel() + - " because it depends on " + dependency.getKey() + " level " + dependency.getValue()); - } - } - } - - /** - * A method to return the default (latest production) level of a feature based on the metadata version provided. - * - * Every time a new feature is added, it should create a mapping from metadata version to feature version - * with {@link FeatureVersion#bootstrapMetadataVersion()}. When the feature version is production ready, the metadata - * version should be made production ready as well. - * - * @param metadataVersion the metadata version we want to use to set the default. - * @return the default version level given the feature and provided metadata version - */ - public short defaultValue(MetadataVersion metadataVersion) { - short level = 0; - for (Iterator it = Arrays.stream(featureVersions).iterator(); it.hasNext(); ) { - FeatureVersion feature = it.next(); - if (feature.bootstrapMetadataVersion().isLessThan(metadataVersion) || feature.bootstrapMetadataVersion().equals(metadataVersion)) - level = feature.featureLevel(); - else - return level; - } - return level; - } - - public static Features featureFromName(String featureName) { - for (Features features : FEATURES) { - if (features.name.equals(featureName)) - return features; - } - throw new IllegalArgumentException("Feature " + featureName + " not found."); - } - - /** - * Utility method to map a list of FeatureVersion to a map of feature name to feature level - */ - public static Map featureImplsToMap(List features) { - return features.stream().collect(Collectors.toMap(FeatureVersion::featureName, FeatureVersion::featureLevel)); - } -} diff --git a/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java index 3ddbe4d15f03e..881031e6ecfce 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java @@ -29,6 +29,8 @@ public enum GroupVersion implements FeatureVersion { public static final String FEATURE_NAME = "group.version"; + public static final GroupVersion LATEST_PRODUCTION = GV_1; + private final short featureLevel; private final MetadataVersion bootstrapMetadataVersion; private final Map dependencies; diff --git a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java index a55dc7318c4a1..734b515b5a835 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java @@ -28,6 +28,8 @@ public enum KRaftVersion implements FeatureVersion { public static final String FEATURE_NAME = "kraft.version"; + public static final KRaftVersion LATEST_PRODUCTION = KRAFT_VERSION_1; + private final short featureLevel; private final MetadataVersion bootstrapMetadataVersion; diff --git a/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java index 2d929d198977d..e9d54d0f21114 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java @@ -23,7 +23,7 @@ public enum TestFeatureVersion implements FeatureVersion { TEST_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()), // TEST_1 released right before MV 3.7-IVO was released, and it has no dependencies TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()), - // TEST_2 is not yet released and maps to the latest testing version, and it depends on this metadata version + // TEST_2 is not yet set to be the default version and maps to the latest testing version, and it depends on this metadata version TEST_2(2, MetadataVersion.latestTesting(), Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.latestTesting().featureLevel())); private final short featureLevel; @@ -32,6 +32,9 @@ public enum TestFeatureVersion implements FeatureVersion { public static final String FEATURE_NAME = "test.feature.version"; + public static final TestFeatureVersion LATEST_PRODUCTION = + MetadataVersion.latestProduction() == MetadataVersion.latestTesting() ? TEST_2 : TEST_1; + TestFeatureVersion(int featureLevel, MetadataVersion metadataVersionMapping, Map dependencies) { this.featureLevel = (short) featureLevel; this.metadataVersionMapping = metadataVersionMapping; diff --git a/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java index 36dadb5cf11d6..069440d35c9b9 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java @@ -30,6 +30,8 @@ public enum TransactionVersion implements FeatureVersion { public static final String FEATURE_NAME = "transaction.version"; + public static final TransactionVersion LATEST_PRODUCTION = TV_0; + private final short featureLevel; private final MetadataVersion bootstrapMetadataVersion; private final Map dependencies; @@ -50,7 +52,7 @@ public short featureLevel() { } public static TransactionVersion fromFeatureLevel(short version) { - return (TransactionVersion) Features.TRANSACTION_VERSION.fromFeatureLevel(version, true); + return (TransactionVersion) Feature.TRANSACTION_VERSION.fromFeatureLevel(version, true); } @Override diff --git a/server-common/src/main/java/org/apache/kafka/server/common/UnitTestFeatureVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/UnitTestFeatureVersion.java new file mode 100644 index 0000000000000..ea107998e7611 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/common/UnitTestFeatureVersion.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +/** + * Test versions only used for unit test FeatureTest.java. + */ +public class UnitTestFeatureVersion { + /** + * The feature is used for testing latest production is not one of the feature versions. + */ + public enum FV0 implements FeatureVersion { + UT_FV0_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()), + UT_FV0_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()); + + private final short featureLevel; + private final MetadataVersion bootstrapMetadataVersion; + private final Map dependencies; + + public static final String FEATURE_NAME = "unit.test.feature.version.0"; + + public static final FV0 LATEST_PRODUCTION = UT_FV0_1; + + FV0(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map dependencies) { + this.featureLevel = (short) featureLevel; + this.bootstrapMetadataVersion = bootstrapMetadataVersion; + this.dependencies = dependencies; + } + + @Override + public short featureLevel() { + return featureLevel; + } + + @Override + public String featureName() { + return FEATURE_NAME; + } + + @Override + public MetadataVersion bootstrapMetadataVersion() { + return bootstrapMetadataVersion; + } + + @Override + public Map dependencies() { + return dependencies; + } + } + + /** + * The feature is used to test latest production lags behind the default value. + */ + public enum FV1 implements FeatureVersion { + UT_FV1_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()), + UT_FV1_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()); + + private final short featureLevel; + private final MetadataVersion bootstrapMetadataVersion; + private final Map dependencies; + + public static final String FEATURE_NAME = "unit.test.feature.version.1"; + + public static final FV1 LATEST_PRODUCTION = UT_FV1_0; + + FV1(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map dependencies) { + this.featureLevel = (short) featureLevel; + this.bootstrapMetadataVersion = bootstrapMetadataVersion; + this.dependencies = dependencies; + } + + @Override + public short featureLevel() { + return featureLevel; + } + + @Override + public String featureName() { + return FEATURE_NAME; + } + + @Override + public MetadataVersion bootstrapMetadataVersion() { + return bootstrapMetadataVersion; + } + + @Override + public Map dependencies() { + return dependencies; + } + } + + /** + * The feature is used to test the dependency of the latest production that is not yet production ready. + */ + public enum FV2 implements FeatureVersion { + UT_FV2_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()), + UT_FV2_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()); + + private final short featureLevel; + private final MetadataVersion bootstrapMetadataVersion; + private final Map dependencies; + + public static final String FEATURE_NAME = "unit.test.feature.version.2"; + + public static final FV2 LATEST_PRODUCTION = UT_FV2_0; + + FV2(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map dependencies) { + this.featureLevel = (short) featureLevel; + this.bootstrapMetadataVersion = bootstrapMetadataVersion; + this.dependencies = dependencies; + } + + @Override + public short featureLevel() { + return featureLevel; + } + + @Override + public String featureName() { + return FEATURE_NAME; + } + + @Override + public MetadataVersion bootstrapMetadataVersion() { + return bootstrapMetadataVersion; + } + + @Override + public Map dependencies() { + return dependencies; + } + } + + /** + * The feature is used to test the dependency of the latest production that is not yet production ready. + */ + public enum FV3 implements FeatureVersion { + UT_FV3_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()), + UT_FV3_1(1, MetadataVersion.IBP_3_7_IV0, Collections.singletonMap(FV2.FEATURE_NAME, (short) 1)); + + private final short featureLevel; + private final MetadataVersion bootstrapMetadataVersion; + private final Map dependencies; + + public static final String FEATURE_NAME = "unit.test.feature.version.3"; + + public static final FV3 LATEST_PRODUCTION = UT_FV3_1; + + FV3(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map dependencies) { + this.featureLevel = (short) featureLevel; + this.bootstrapMetadataVersion = bootstrapMetadataVersion; + this.dependencies = dependencies; + } + + @Override + public short featureLevel() { + return featureLevel; + } + + @Override + public String featureName() { + return FEATURE_NAME; + } + + @Override + public MetadataVersion bootstrapMetadataVersion() { + return bootstrapMetadataVersion; + } + + @Override + public Map dependencies() { + return dependencies; + } + } + + /** + * The feature is used to test the dependency of the default value that is not yet default ready. + */ + public enum FV4 implements FeatureVersion { + UT_FV4_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()), + UT_FV4_1(1, MetadataVersion.latestTesting(), Collections.emptyMap()); + + private final short featureLevel; + private final MetadataVersion bootstrapMetadataVersion; + private final Map dependencies; + + public static final String FEATURE_NAME = "unit.test.feature.version.4"; + + public static final FV4 LATEST_PRODUCTION = UT_FV4_1; + + FV4(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map dependencies) { + this.featureLevel = (short) featureLevel; + this.bootstrapMetadataVersion = bootstrapMetadataVersion; + this.dependencies = dependencies; + } + + @Override + public short featureLevel() { + return featureLevel; + } + + @Override + public String featureName() { + return FEATURE_NAME; + } + + @Override + public MetadataVersion bootstrapMetadataVersion() { + return bootstrapMetadataVersion; + } + + @Override + public Map dependencies() { + return dependencies; + } + } + + /** + * The feature is used to test the dependency of the default value that is not yet default ready. + */ + public enum FV5 implements FeatureVersion { + UT_FV5_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()), + UT_FV5_1(1, MetadataVersion.IBP_3_7_IV0, Collections.singletonMap(FV4.FEATURE_NAME, (short) 1)); + + private final short featureLevel; + private final MetadataVersion bootstrapMetadataVersion; + private final Map dependencies; + + public static final String FEATURE_NAME = "unit.test.feature.version.5"; + + public static final FV5 LATEST_PRODUCTION = UT_FV5_1; + + FV5(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map dependencies) { + this.featureLevel = (short) featureLevel; + this.bootstrapMetadataVersion = bootstrapMetadataVersion; + this.dependencies = dependencies; + } + + @Override + public short featureLevel() { + return featureLevel; + } + + @Override + public String featureName() { + return FEATURE_NAME; + } + + @Override + public MetadataVersion bootstrapMetadataVersion() { + return bootstrapMetadataVersion; + } + + @Override + public Map dependencies() { + return dependencies; + } + } + + /** + * The feature is used to test the latest production has MV dependency that is not yet production ready. + */ + public enum FV6 implements FeatureVersion { + UT_FV6_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()), + UT_FV6_1(1, MetadataVersion.latestTesting(), Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.latestTesting().featureLevel())); + + private final short featureLevel; + private final MetadataVersion bootstrapMetadataVersion; + private final Map dependencies; + + public static final String FEATURE_NAME = "unit.test.feature.version.6"; + + public static final FV6 LATEST_PRODUCTION = UT_FV6_1; + + FV6(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map dependencies) { + this.featureLevel = (short) featureLevel; + this.bootstrapMetadataVersion = bootstrapMetadataVersion; + this.dependencies = dependencies; + } + + @Override + public short featureLevel() { + return featureLevel; + } + + @Override + public String featureName() { + return FEATURE_NAME; + } + + @Override + public MetadataVersion bootstrapMetadataVersion() { + return bootstrapMetadataVersion; + } + + @Override + public Map dependencies() { + return dependencies; + } + } + + /** + * The feature is used to test the default value has MV dependency that is behind the bootstrap MV. + */ + public enum FV7 implements FeatureVersion { + UT_FV7_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_7_IV0.featureLevel())), + UT_FV7_1(1, MetadataVersion.IBP_3_8_IV0, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_8_IV0.featureLevel())); + + private final short featureLevel; + private final MetadataVersion bootstrapMetadataVersion; + private final Map dependencies; + + public static final String FEATURE_NAME = "unit.test.feature.version.7"; + + public static final FV7 LATEST_PRODUCTION = UT_FV7_1; + + FV7(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map dependencies) { + this.featureLevel = (short) featureLevel; + this.bootstrapMetadataVersion = bootstrapMetadataVersion; + this.dependencies = dependencies; + } + + @Override + public short featureLevel() { + return featureLevel; + } + + @Override + public String featureName() { + return FEATURE_NAME; + } + + @Override + public MetadataVersion bootstrapMetadataVersion() { + return bootstrapMetadataVersion; + } + + @Override + public Map dependencies() { + return dependencies; + } + } +} diff --git a/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java b/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java new file mode 100644 index 0000000000000..462107c9b93a7 --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.kafka.server.common.Feature.validateDefaultValueAndLatestProductionValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class FeatureTest { + @ParameterizedTest + @EnumSource(value = Feature.class, names = { + "UNIT_TEST_VERSION_0", + "UNIT_TEST_VERSION_1", + "UNIT_TEST_VERSION_2", + "UNIT_TEST_VERSION_3", + "UNIT_TEST_VERSION_4", + "UNIT_TEST_VERSION_5", + "UNIT_TEST_VERSION_6", + "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE) + public void testV0SupportedInEarliestMV(Feature feature) { + assertTrue(feature.featureVersions().length >= 1); + assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, + feature.featureVersions()[0].bootstrapMetadataVersion()); + } + + @ParameterizedTest + @EnumSource(value = Feature.class, names = { + "UNIT_TEST_VERSION_0", + "UNIT_TEST_VERSION_1", + "UNIT_TEST_VERSION_2", + "UNIT_TEST_VERSION_3", + "UNIT_TEST_VERSION_4", + "UNIT_TEST_VERSION_5", + "UNIT_TEST_VERSION_6", + "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE) + public void testFromFeatureLevelAllFeatures(Feature feature) { + FeatureVersion[] featureImplementations = feature.featureVersions(); + int numFeatures = featureImplementations.length; + short latestProductionLevel = feature.latestProduction(); + + for (short i = 0; i < numFeatures; i++) { + short level = i; + if (latestProductionLevel < i) { + assertEquals(featureImplementations[i], feature.fromFeatureLevel(level, true)); + assertThrows(IllegalArgumentException.class, () -> feature.fromFeatureLevel(level, false)); + } else { + assertEquals(featureImplementations[i], feature.fromFeatureLevel(level, false)); + } + } + } + + @ParameterizedTest + @EnumSource(value = Feature.class, names = { + "UNIT_TEST_VERSION_0", + "UNIT_TEST_VERSION_1", + "UNIT_TEST_VERSION_2", + "UNIT_TEST_VERSION_3", + "UNIT_TEST_VERSION_4", + "UNIT_TEST_VERSION_5", + "UNIT_TEST_VERSION_6", + "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE) + public void testValidateVersionAllFeatures(Feature feature) { + for (FeatureVersion featureImpl : feature.featureVersions()) { + // Ensure the minimum bootstrap metadata version is included if no metadata version dependency. + Map deps = new HashMap<>(); + deps.putAll(featureImpl.dependencies()); + if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) { + deps.put(MetadataVersion.FEATURE_NAME, MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.featureLevel()); + } + + // Ensure that the feature is valid given the typical metadataVersionMapping and the dependencies. + // Note: Other metadata versions are valid, but this one should always be valid. + Feature.validateVersion(featureImpl, deps); + } + } + + @Test + public void testInvalidValidateVersion() { + // No MetadataVersion is invalid + assertThrows(IllegalArgumentException.class, + () -> Feature.validateVersion( + TestFeatureVersion.TEST_1, + Collections.emptyMap() + ) + ); + + // Using too low of a MetadataVersion is invalid + assertThrows(IllegalArgumentException.class, + () -> Feature.validateVersion( + TestFeatureVersion.TEST_1, + Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_2_8_IV0.featureLevel()) + ) + ); + + // Using a version that is lower than the dependency will fail. + assertThrows(IllegalArgumentException.class, + () -> Feature.validateVersion( + TestFeatureVersion.TEST_2, + Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_7_IV0.featureLevel()) + ) + ); + } + + @ParameterizedTest + @EnumSource(value = Feature.class, names = { + "UNIT_TEST_VERSION_0", + "UNIT_TEST_VERSION_1", + "UNIT_TEST_VERSION_2", + "UNIT_TEST_VERSION_3", + "UNIT_TEST_VERSION_4", + "UNIT_TEST_VERSION_5", + "UNIT_TEST_VERSION_6", + "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE) + public void testDefaultLevelAllFeatures(Feature feature) { + for (FeatureVersion featureImpl : feature.featureVersions()) { + // If features have the same bootstrapMetadataVersion, the highest level feature should be chosen. + short defaultLevel = feature.defaultLevel(featureImpl.bootstrapMetadataVersion()); + if (defaultLevel != featureImpl.featureLevel()) { + FeatureVersion otherFeature = feature.fromFeatureLevel(defaultLevel, true); + assertEquals(featureImpl.bootstrapMetadataVersion(), otherFeature.bootstrapMetadataVersion()); + assertTrue(defaultLevel > featureImpl.featureLevel()); + } + } + } + + @ParameterizedTest + @EnumSource(value = Feature.class, names = { + "UNIT_TEST_VERSION_0", + "UNIT_TEST_VERSION_1", + "UNIT_TEST_VERSION_2", + "UNIT_TEST_VERSION_3", + "UNIT_TEST_VERSION_4", + "UNIT_TEST_VERSION_5", + "UNIT_TEST_VERSION_6", + "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE) + public void testLatestProductionIsOneOfFeatureValues(Feature feature) { + assertTrue(feature.hasFeatureVersion(feature.latestProduction)); + } + + @ParameterizedTest + @EnumSource(value = Feature.class, names = { + "UNIT_TEST_VERSION_0", + "UNIT_TEST_VERSION_1", + "UNIT_TEST_VERSION_2", + "UNIT_TEST_VERSION_3", + "UNIT_TEST_VERSION_4", + "UNIT_TEST_VERSION_5", + "UNIT_TEST_VERSION_6", + "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE) + public void testLatestProductionIsNotBehindLatestMetadataVersion(Feature feature) { + assertTrue(feature.latestProduction() >= feature.defaultLevel(MetadataVersion.latestProduction())); + } + + @ParameterizedTest + @EnumSource(value = Feature.class, names = { + "UNIT_TEST_VERSION_0", + "UNIT_TEST_VERSION_1", + "UNIT_TEST_VERSION_2", + "UNIT_TEST_VERSION_3", + "UNIT_TEST_VERSION_4", + "UNIT_TEST_VERSION_5", + "UNIT_TEST_VERSION_6", + "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE) + public void testLatestProductionDependencyIsProductionReady(Feature feature) { + for (Map.Entry dependency: feature.latestProduction.dependencies().entrySet()) { + String featureName = dependency.getKey(); + if (!featureName.equals(MetadataVersion.FEATURE_NAME)) { + Feature dependencyFeature = Feature.featureFromName(featureName); + assertTrue(dependencyFeature.isProductionReady(dependency.getValue())); + } + } + } + + @ParameterizedTest + @EnumSource(value = Feature.class, names = { + "UNIT_TEST_VERSION_0", + "UNIT_TEST_VERSION_1", + "UNIT_TEST_VERSION_2", + "UNIT_TEST_VERSION_3", + "UNIT_TEST_VERSION_4", + "UNIT_TEST_VERSION_5", + "UNIT_TEST_VERSION_6", + "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE) + public void testDefaultVersionDependencyIsDefaultReady(Feature feature) { + for (Map.Entry dependency: feature.defaultVersion(MetadataVersion.LATEST_PRODUCTION).dependencies().entrySet()) { + String featureName = dependency.getKey(); + if (!featureName.equals(MetadataVersion.FEATURE_NAME)) { + Feature dependencyFeature = Feature.featureFromName(featureName); + assertTrue(dependency.getValue() <= dependencyFeature.defaultLevel(MetadataVersion.LATEST_PRODUCTION)); + } + } + } + + @ParameterizedTest + @EnumSource(MetadataVersion.class) + public void testDefaultTestVersion(MetadataVersion metadataVersion) { + short expectedVersion; + if (!metadataVersion.isLessThan(MetadataVersion.latestTesting())) { + expectedVersion = 2; + } else if (!metadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV0)) { + expectedVersion = 1; + } else { + expectedVersion = 0; + } + assertEquals(expectedVersion, Feature.TEST_VERSION.defaultLevel(metadataVersion)); + } + + @Test + public void testUnstableTestVersion() { + // If the latest MetadataVersion is stable, we don't throw an error. In that case, we don't worry about unstable feature + // versions since all feature versions are stable. + if (MetadataVersion.latestProduction().isLessThan(MetadataVersion.latestTesting())) { + assertThrows(IllegalArgumentException.class, () -> + Feature.TEST_VERSION.fromFeatureLevel(Feature.TEST_VERSION.latestTesting(), false)); + } + Feature.TEST_VERSION.fromFeatureLevel(Feature.TEST_VERSION.latestTesting(), true); + } + + @Test + public void testValidateWithNonExistentLatestProduction() { + assertThrows(IllegalArgumentException.class, () -> + validateDefaultValueAndLatestProductionValue(Feature.UNIT_TEST_VERSION_0), + "Feature UNIT_TEST_VERSION_0 has latest production version UT_FV0_1 " + + "which is not one of its feature versions."); + } + + @Test + public void testValidateWithLaggingLatestProduction() { + assertThrows(IllegalArgumentException.class, () -> + validateDefaultValueAndLatestProductionValue(Feature.UNIT_TEST_VERSION_1), + "Feature UNIT_TEST_VERSION_1 has latest production value UT_FV1_0 " + + "smaller than its default version UT_FV1_1 with latest production MV."); + } + + @Test + public void testValidateWithDependencyNotProductionReady() { + assertThrows(IllegalArgumentException.class, () -> + validateDefaultValueAndLatestProductionValue(Feature.UNIT_TEST_VERSION_3), + "Feature UNIT_TEST_VERSION_3 has latest production FeatureVersion UT_FV3_1 with dependency " + + "UT_FV2_1 that is not production ready. (UNIT_TEST_VERSION_2 latest production: UT_FV2_0)"); + } + + @Test + public void testValidateWithDefaultValueDependencyAheadOfItsDefaultLevel() { + if (MetadataVersion.latestProduction().isLessThan(MetadataVersion.latestTesting())) { + assertThrows(IllegalArgumentException.class, () -> + validateDefaultValueAndLatestProductionValue(Feature.UNIT_TEST_VERSION_5), + "Feature UNIT_TEST_VERSION_5 has default FeatureVersion UT_FV5_1 when MV=3.7-IV0 with " + + "dependency UT_FV4_1 that is behind its default version UT_FV4_0."); + } + } + + @Test + public void testValidateWithMVDependencyNotProductionReady() { + if (MetadataVersion.latestProduction().isLessThan(MetadataVersion.latestTesting())) { + assertThrows(IllegalArgumentException.class, () -> + validateDefaultValueAndLatestProductionValue(Feature.UNIT_TEST_VERSION_6), + "Feature UNIT_TEST_VERSION_6 has latest production FeatureVersion UT_FV6_1 with " + + "MV dependency 4.0-IV3 that is not production ready. (MV latest production: 4.0-IV0)"); + } + } + + @Test + public void testValidateWithMVDependencyAheadOfBootstrapMV() { + assertThrows(IllegalArgumentException.class, () -> + validateDefaultValueAndLatestProductionValue(Feature.UNIT_TEST_VERSION_7), + "Feature UNIT_TEST_VERSION_7 has default FeatureVersion UT_FV7_0 when MV=3.0-IV1 with " + + "MV dependency 3.7-IV0 that is behind its bootstrap MV 3.0-IV1."); + } +} diff --git a/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java b/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java deleted file mode 100644 index dd74b6e2d8a9c..0000000000000 --- a/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.server.common; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class FeaturesTest { - @ParameterizedTest - @EnumSource(Features.class) - public void testV0SupportedInEarliestMV(Features feature) { - assertTrue(feature.featureVersions().length >= 1); - assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, - feature.featureVersions()[0].bootstrapMetadataVersion()); - } - - @ParameterizedTest - @EnumSource(Features.class) - public void testFromFeatureLevelAllFeatures(Features feature) { - FeatureVersion[] featureImplementations = feature.featureVersions(); - int numFeatures = featureImplementations.length; - short latestProductionLevel = feature.latestProduction(); - - for (short i = 0; i < numFeatures; i++) { - short level = i; - if (latestProductionLevel < i) { - assertEquals(featureImplementations[i], feature.fromFeatureLevel(level, true)); - assertThrows(IllegalArgumentException.class, () -> feature.fromFeatureLevel(level, false)); - } else { - assertEquals(featureImplementations[i], feature.fromFeatureLevel(level, false)); - } - } - } - - @ParameterizedTest - @EnumSource(Features.class) - public void testValidateVersionAllFeatures(Features feature) { - for (FeatureVersion featureImpl : feature.featureVersions()) { - // Ensure the minimum bootstrap metadata version is included if no metadata version dependency. - Map deps = new HashMap<>(); - deps.putAll(featureImpl.dependencies()); - if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) { - deps.put(MetadataVersion.FEATURE_NAME, MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.featureLevel()); - } - - // Ensure that the feature is valid given the typical metadataVersionMapping and the dependencies. - // Note: Other metadata versions are valid, but this one should always be valid. - Features.validateVersion(featureImpl, deps); - } - } - - @Test - public void testInvalidValidateVersion() { - // No MetadataVersion is invalid - assertThrows(IllegalArgumentException.class, - () -> Features.validateVersion( - TestFeatureVersion.TEST_1, - Collections.emptyMap() - ) - ); - - // Using too low of a MetadataVersion is invalid - assertThrows(IllegalArgumentException.class, - () -> Features.validateVersion( - TestFeatureVersion.TEST_1, - Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_2_8_IV0.featureLevel()) - ) - ); - - // Using a version that is lower than the dependency will fail. - assertThrows(IllegalArgumentException.class, - () -> Features.validateVersion( - TestFeatureVersion.TEST_2, - Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_7_IV0.featureLevel()) - ) - ); - } - - @ParameterizedTest - @EnumSource(Features.class) - public void testDefaultValueAllFeatures(Features feature) { - for (FeatureVersion featureImpl : feature.featureVersions()) { - // If features have the same bootstrapMetadataVersion, the highest level feature should be chosen. - short defaultLevel = feature.defaultValue(featureImpl.bootstrapMetadataVersion()); - if (defaultLevel != featureImpl.featureLevel()) { - FeatureVersion otherFeature = feature.fromFeatureLevel(defaultLevel, true); - assertEquals(featureImpl.bootstrapMetadataVersion(), otherFeature.bootstrapMetadataVersion()); - assertTrue(defaultLevel > featureImpl.featureLevel()); - } - } - } - - @ParameterizedTest - @EnumSource(Features.class) - public void testLatestProductionMapsToLatestMetadataVersion(Features features) { - assertEquals(features.latestProduction(), features.defaultValue(MetadataVersion.LATEST_PRODUCTION)); - } - - @ParameterizedTest - @EnumSource(MetadataVersion.class) - public void testDefaultTestVersion(MetadataVersion metadataVersion) { - short expectedVersion; - if (!metadataVersion.isLessThan(MetadataVersion.latestTesting())) { - expectedVersion = 2; - } else if (!metadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV0)) { - expectedVersion = 1; - } else { - expectedVersion = 0; - } - assertEquals(expectedVersion, Features.TEST_VERSION.defaultValue(metadataVersion)); - } - - @Test - public void testUnstableTestVersion() { - // If the latest MetadataVersion is stable, we don't throw an error. In that case, we don't worry about unstable feature - // versions since all feature versions are stable. - if (MetadataVersion.latestProduction().isLessThan(MetadataVersion.latestTesting())) { - assertThrows(IllegalArgumentException.class, () -> - Features.TEST_VERSION.fromFeatureLevel(Features.TEST_VERSION.latestTesting(), false)); - } - Features.TEST_VERSION.fromFeatureLevel(Features.TEST_VERSION.latestTesting(), true); - } -} diff --git a/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java b/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java index 95777effe5e81..197ebea427a58 100644 --- a/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java +++ b/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java @@ -29,7 +29,7 @@ import java.util.Map; import java.util.stream.Collectors; -import static org.apache.kafka.server.common.Features.PRODUCTION_FEATURES; +import static org.apache.kafka.server.common.Feature.PRODUCTION_FEATURES; /** * A class that encapsulates the latest features supported by the Broker and also provides APIs to diff --git a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java index 8963b40fceb90..4bf9934457128 100644 --- a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java +++ b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java @@ -27,9 +27,9 @@ import java.util.HashMap; import java.util.Map; -import static org.apache.kafka.server.common.Features.ELIGIBLE_LEADER_REPLICAS_VERSION; -import static org.apache.kafka.server.common.Features.GROUP_VERSION; -import static org.apache.kafka.server.common.Features.TRANSACTION_VERSION; +import static org.apache.kafka.server.common.Feature.ELIGIBLE_LEADER_REPLICAS_VERSION; +import static org.apache.kafka.server.common.Feature.GROUP_VERSION; +import static org.apache.kafka.server.common.Feature.TRANSACTION_VERSION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java index 1697a1fe0a690..1edf465cf345a 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.MetadataVersion; import java.io.File; @@ -66,7 +66,7 @@ public class ClusterConfig { private final Map saslClientProperties; private final List tags; private final Map> perServerProperties; - private final Map features; + private final Map features; @SuppressWarnings("checkstyle:ParameterNumber") private ClusterConfig(Set types, int brokers, int controllers, int disksPerBroker, boolean autoStart, @@ -75,7 +75,7 @@ private ClusterConfig(Set types, int brokers, int controllers, int disksPe MetadataVersion metadataVersion, Map serverProperties, Map producerProperties, Map consumerProperties, Map adminClientProperties, Map saslServerProperties, Map saslClientProperties, Map> perServerProperties, List tags, - Map features) { + Map features) { // do fail fast. the following values are invalid for kraft modes. if (brokers < 0) throw new IllegalArgumentException("Number of brokers must be greater or equal to zero."); if (controllers < 0) throw new IllegalArgumentException("Number of controller must be greater or equal to zero."); @@ -179,7 +179,7 @@ public List tags() { return tags; } - public Map features() { + public Map features() { return features; } @@ -255,7 +255,7 @@ public static class Builder { private Map saslClientProperties = Collections.emptyMap(); private Map> perServerProperties = Collections.emptyMap(); private List tags = Collections.emptyList(); - private Map features = Collections.emptyMap(); + private Map features = Collections.emptyMap(); private Builder() {} @@ -356,7 +356,7 @@ public Builder setTags(List tags) { return this; } - public Builder setFeatures(Map features) { + public Builder setFeatures(Map features) { this.features = Collections.unmodifiableMap(features); return this; } diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterFeature.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterFeature.java index ba0eee508c420..ab1893ab05b4d 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterFeature.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterFeature.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.test.api; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -28,6 +28,6 @@ @Target({ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface ClusterFeature { - Features feature(); + Feature feature(); short version(); } diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java index 180acd9630610..65cfac0e0804a 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java @@ -17,7 +17,7 @@ package org.apache.kafka.common.test.api; import org.apache.kafka.common.network.ListenerName; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.util.timer.SystemTimer; import org.junit.jupiter.api.extension.AfterEachCallback; @@ -240,7 +240,7 @@ private List processClusterTestInternal( .collect(Collectors.groupingBy(ClusterConfigProperty::id, Collectors.mapping(Function.identity(), Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b)))); - Map features = Arrays.stream(clusterTest.features()) + Map features = Arrays.stream(clusterTest.features()) .collect(Collectors.toMap(ClusterFeature::feature, ClusterFeature::version)); ClusterConfig config = ClusterConfig.builder() diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java index de491c7f7195d..22a009b394e6e 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java @@ -29,8 +29,8 @@ import org.apache.kafka.metadata.BrokerState; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.metadata.storage.FormatterException; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.FeatureVersion; -import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.fault.FaultHandlerException; @@ -236,12 +236,12 @@ public Map controllers() { public void format() throws Exception { if (formated.compareAndSet(false, true)) { - Map nameToSupportedFeature = new TreeMap<>(); - Features.PRODUCTION_FEATURES.forEach(feature -> nameToSupportedFeature.put(feature.featureName(), feature)); + Map nameToSupportedFeature = new TreeMap<>(); + Feature.PRODUCTION_FEATURES.forEach(feature -> nameToSupportedFeature.put(feature.featureName(), feature)); Map newFeatureLevels = new TreeMap<>(); // Verify that all specified features are known to us. - for (Map.Entry entry : clusterConfig.features().entrySet()) { + for (Map.Entry entry : clusterConfig.features().entrySet()) { String featureName = entry.getKey().featureName(); short level = entry.getValue(); if (!featureName.equals(MetadataVersion.FEATURE_NAME)) { @@ -255,10 +255,10 @@ public void format() throws Exception { newFeatureLevels.put(MetadataVersion.FEATURE_NAME, clusterConfig.metadataVersion().featureLevel()); // Add default values for features that were not specified. - Features.PRODUCTION_FEATURES.forEach(supportedFeature -> { + Feature.PRODUCTION_FEATURES.forEach(supportedFeature -> { if (!newFeatureLevels.containsKey(supportedFeature.featureName())) { newFeatureLevels.put(supportedFeature.featureName(), - supportedFeature.defaultValue(clusterConfig.metadataVersion())); + supportedFeature.defaultLevel(clusterConfig.metadataVersion())); } }); @@ -268,10 +268,10 @@ public void format() throws Exception { String featureName = entry.getKey(); if (!featureName.equals(MetadataVersion.FEATURE_NAME)) { short level = entry.getValue(); - Features supportedFeature = nameToSupportedFeature.get(featureName); + Feature supportedFeature = nameToSupportedFeature.get(featureName); FeatureVersion featureVersion = supportedFeature.fromFeatureLevel(level, true); - Features.validateVersion(featureVersion, newFeatureLevels); + Feature.validateVersion(featureVersion, newFeatureLevels); } } diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java index 7464e68b8ad37..3e81126d8cd01 100644 --- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java @@ -24,8 +24,8 @@ import org.apache.kafka.clients.admin.UpdateFeaturesResult; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.FeatureVersion; -import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.util.CommandLineUtils; @@ -118,10 +118,10 @@ static void execute(String... args) throws Exception { handleDisable(namespace, adminClient); break; case "version-mapping": - handleVersionMapping(namespace, Features.PRODUCTION_FEATURES); + handleVersionMapping(namespace, Feature.PRODUCTION_FEATURES); break; case "feature-dependencies": - handleFeatureDependencies(namespace, Features.PRODUCTION_FEATURES); + handleFeatureDependencies(namespace, Feature.PRODUCTION_FEATURES); break; default: throw new TerseException("Unknown command " + command); @@ -301,8 +301,8 @@ private static void handleUpgradeOrDowngrade(String op, Namespace namespace, Adm MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, MetadataVersion.latestProduction())); } try { - for (Features feature : Features.PRODUCTION_FEATURES) { - short featureLevel = feature.defaultValue(metadataVersion); + for (Feature feature : Feature.PRODUCTION_FEATURES) { + short featureLevel = feature.defaultLevel(metadataVersion); // Don't send a request to upgrade a feature to 0. if (upgradeType != FeatureUpdate.UpgradeType.UPGRADE || featureLevel > 0) { updates.put(feature.featureName(), new FeatureUpdate(featureLevel, upgradeType)); @@ -356,7 +356,7 @@ static void handleDisable(Namespace namespace, Admin adminClient) throws TerseEx update("disable", adminClient, updates, namespace.getBoolean("dry_run")); } - static void handleVersionMapping(Namespace namespace, List validFeatures) throws TerseException { + static void handleVersionMapping(Namespace namespace, List validFeatures) throws TerseException { // Get the release version from the command-line arguments or default to the latest stable version String releaseVersion = Optional.ofNullable(namespace.getString("release_version")) .orElseGet(() -> MetadataVersion.latestProduction().version()); @@ -367,8 +367,8 @@ static void handleVersionMapping(Namespace namespace, List validFeatur short metadataVersionLevel = version.featureLevel(); System.out.printf("metadata.version=%d (%s)%n", metadataVersionLevel, releaseVersion); - for (Features feature : validFeatures) { - short featureLevel = feature.defaultValue(version); + for (Feature feature : validFeatures) { + short featureLevel = feature.defaultLevel(version); System.out.printf("%s=%d%n", feature.featureName(), featureLevel); } } catch (IllegalArgumentException e) { @@ -378,7 +378,7 @@ static void handleVersionMapping(Namespace namespace, List validFeatur } } - static void handleFeatureDependencies(Namespace namespace, List validFeatures) throws TerseException { + static void handleFeatureDependencies(Namespace namespace, List validFeatures) throws TerseException { List featureArgs = namespace.getList("feature"); // Iterate over each feature specified with --feature @@ -400,7 +400,7 @@ static void handleFeatureDependencies(Namespace namespace, List validF // Assuming metadata versions do not have dependencies. System.out.printf("%s=%d (%s) has no dependencies.%n", featureName, featureLevel, metadataVersion.version()); } else { - Features featureEnum = validFeatures.stream() + Feature featureEnum = validFeatures.stream() .filter(f -> f.featureName().equals(featureName)) .findFirst() .orElseThrow(() -> new TerseException("Unknown feature: " + featureName)); diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java index 1aafc7db994bf..d923e46cf3138 100644 --- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTestExtensions; import org.apache.kafka.common.test.api.Type; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.MetadataVersion; import net.sourceforge.argparse4j.inf.Namespace; @@ -49,7 +49,7 @@ @ExtendWith(value = ClusterTestExtensions.class) public class FeatureCommandTest { - private final List testingFeatures = Arrays.stream(Features.FEATURES).collect(Collectors.toList()); + private final List testingFeatures = Arrays.stream(Feature.FEATURES).collect(Collectors.toList()); @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1) public void testDescribeWithKRaft(ClusterInstance cluster) { @@ -390,8 +390,8 @@ public void testHandleVersionMappingWithValidReleaseVersion() { assertTrue(versionMappingOutput.contains("metadata.version=" + metadataVersion.featureLevel() + " (" + metadataVersion.version() + ")"), "Output did not contain expected Metadata Version: " + versionMappingOutput); - for (Features feature : Features.values()) { - int featureLevel = feature.defaultValue(metadataVersion); + for (Feature feature : Feature.values()) { + int featureLevel = feature.defaultLevel(metadataVersion); assertTrue(versionMappingOutput.contains(feature.featureName() + "=" + featureLevel), "Output did not contain expected feature mapping: " + versionMappingOutput); } @@ -414,8 +414,8 @@ public void testHandleVersionMappingWithNoReleaseVersion() { assertTrue(versionMappingOutput.contains("metadata.version=" + metadataVersion.featureLevel() + " (" + metadataVersion.version() + ")"), "Output did not contain expected Metadata Version: " + versionMappingOutput); - for (Features feature : Features.values()) { - int featureLevel = feature.defaultValue(metadataVersion); + for (Feature feature : Feature.values()) { + int featureLevel = feature.defaultLevel(metadataVersion); assertTrue(versionMappingOutput.contains(feature.featureName() + "=" + featureLevel), "Output did not contain expected feature mapping: " + versionMappingOutput); } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java index 6abe2d2e8cf6e..0ed2e2ed25440 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.test.api.ClusterConfig; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.Feature; import java.time.Duration; import java.util.ArrayList; @@ -85,8 +85,8 @@ static List generator() { .setServerProperties(serverProperties) .setTags(Collections.singletonList("kraftGroupCoordinator")) .setFeatures(Utils.mkMap( - Utils.mkEntry(Features.TRANSACTION_VERSION, (short) 2), - Utils.mkEntry(Features.GROUP_VERSION, (short) 1))) + Utils.mkEntry(Feature.TRANSACTION_VERSION, (short) 2), + Utils.mkEntry(Feature.GROUP_VERSION, (short) 1))) .build()); }