From 4b8a2613be74e4bc386296087457cb80f8a3119d Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 31 Jan 2026 21:17:28 +0800 Subject: [PATCH] MINOR: add Admin#updateFeatures overload method Signed-off-by: Kuan-Po Tseng --- .../clients/MetadataVersionIntegrationTest.java | 7 ++----- .../java/org/apache/kafka/clients/admin/Admin.java | 13 +++++++++++++ .../kafka/clients/admin/KafkaAdminClientTest.java | 6 ++---- .../kafka/api/PlaintextAdminIntegrationTest.scala | 3 +-- .../integration/UncleanLeaderElectionTest.scala | 5 ++--- .../server/BootstrapControllersIntegrationTest.java | 4 +--- .../EligibleLeaderReplicasIntegrationTest.java | 13 ++++--------- .../org/apache/kafka/server/KRaftClusterTest.java | 4 +--- .../kafka/tools/streams/DeleteStreamsGroupTest.java | 3 +-- 9 files changed, 27 insertions(+), 31 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetadataVersionIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetadataVersionIntegrationTest.java index 3d560892fe87d..864f8ed7422a3 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetadataVersionIntegrationTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetadataVersionIntegrationTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.clients; import org.apache.kafka.clients.admin.FeatureUpdate; -import org.apache.kafka.clients.admin.UpdateFeaturesOptions; import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTests; @@ -47,8 +46,7 @@ public void testBasicMetadataVersionUpgrade(ClusterInstance clusterInstance) thr // Update to new version short updateVersion = MetadataVersion.IBP_3_7_IV1.featureLevel(); var updateResult = admin.updateFeatures( - Map.of("metadata.version", new FeatureUpdate(updateVersion, FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions()); + Map.of("metadata.version", new FeatureUpdate(updateVersion, FeatureUpdate.UpgradeType.UPGRADE))); updateResult.all().get(); // Verify that new version is visible on broker @@ -69,8 +67,7 @@ public void testUpgradeSameVersion(ClusterInstance clusterInstance) throws Excep try (var admin = clusterInstance.admin()) { short updateVersion = MetadataVersion.IBP_3_9_IV0.featureLevel(); var updateResult = admin.updateFeatures( - Map.of("metadata.version", new FeatureUpdate(updateVersion, FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions()); + Map.of("metadata.version", new FeatureUpdate(updateVersion, FeatureUpdate.UpgradeType.UPGRADE))); updateResult.all().get(); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index c5ed11d8237ef..313c29ca49ee1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1543,6 +1543,19 @@ default DescribeFeaturesResult describeFeatures() { */ DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); + /** + * Applies specified updates to finalized features. + *

+ * This is a convenience method for {@link #updateFeatures(Map, UpdateFeaturesOptions)} with default options. + * See the overload for more details. + * + * @param featureUpdates the map of finalized feature name to {@link FeatureUpdate} + * @return the {@link UpdateFeaturesResult} containing the result + */ + default UpdateFeaturesResult updateFeatures(Map featureUpdates) { + return updateFeatures(featureUpdates, new UpdateFeaturesOptions()); + } + /** * Applies specified updates to finalized features. This operation is not transactional so some * updates may succeed while the rest may fail. diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 93f6daaf8ec8f..1b274c95adde4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -8845,8 +8845,7 @@ public void testUpdateFeaturesShouldFailRequestForEmptyUpdates() { try (final AdminClientUnitTestEnv env = mockClientEnv()) { assertThrows( IllegalArgumentException.class, - () -> env.adminClient().updateFeatures( - new HashMap<>(), new UpdateFeaturesOptions())); + () -> env.adminClient().updateFeatures(new HashMap<>())); } } @@ -8857,8 +8856,7 @@ public void testUpdateFeaturesShouldFailRequestForInvalidFeatureName() { IllegalArgumentException.class, () -> env.adminClient().updateFeatures( Utils.mkMap(Utils.mkEntry("feature", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE)), - Utils.mkEntry("", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE))), - new UpdateFeaturesOptions())); + Utils.mkEntry("", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE))))); } } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 28202cb04d67d..43b8a4f702edf 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4114,8 +4114,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { private def disableEligibleLeaderReplicas(admin: Admin): Unit = { if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) { admin.updateFeatures( - util.Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)), - new UpdateFeaturesOptions()).all().get() + util.Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE))).all().get() } } diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 11271d17c9389..e9c500822e1aa 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.{InvalidConfigurationException, TimeoutExc import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsResult, ConfigEntry, FeatureUpdate, UpdateFeaturesOptions} +import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsResult, ConfigEntry, FeatureUpdate} import org.apache.kafka.metadata.MetadataCache import org.apache.kafka.server.config.ReplicationConfigs import org.apache.kafka.server.metrics.KafkaYammerMetrics @@ -127,8 +127,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { private def disableEligibleLeaderReplicas(): Unit = { if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) { admin.updateFeatures( - util.Map.of("eligible.leader.replicas.version", new FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)), - new UpdateFeaturesOptions()).all().get() + util.Map.of("eligible.leader.replicas.version", new FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE))).all().get() } } diff --git a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java index a97a4cb3bed5c..c889ba74d9f72 100644 --- a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java @@ -32,7 +32,6 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.clients.admin.UpdateFeaturesOptions; import org.apache.kafka.clients.admin.UpdateFeaturesResult; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -173,8 +172,7 @@ public void testUpdateFeatures(ClusterInstance clusterInstance) { private void testUpdateFeatures(ClusterInstance clusterInstance, boolean usingBootstrapControllers) { try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) { UpdateFeaturesResult result = admin.updateFeatures(Map.of("foo.bar.feature", - new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions()); + new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE))); ExecutionException exception = assertThrows(ExecutionException.class, () -> result.all().get(1, TimeUnit.MINUTES)); diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index 61863688f2e42..093323ed5d046 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.clients.admin.UpdateFeaturesOptions; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -97,8 +96,7 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc String testTopicName = String.format("%s-%s", "testHighWatermarkShouldNotAdvanceIfUnderMinIsr", "ELR-test"); admin.updateFeatures( Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, - new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions()).all().get(); + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE))).all().get(); admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); clusterInstance.waitTopicCreation(testTopicName, 1); @@ -164,8 +162,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx admin.updateFeatures( Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, - new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions()).all().get(); + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE))).all().get(); admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); clusterInstance.waitTopicCreation(testTopicName, 1); @@ -234,8 +231,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx admin.updateFeatures( Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, - new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions()).all().get(); + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE))).all().get(); admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); clusterInstance.waitTopicCreation(testTopicName, 1); @@ -293,8 +289,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep admin.updateFeatures( Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, - new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions()).all().get(); + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE))).all().get(); admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); clusterInstance.waitTopicCreation(testTopicName, 1); diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java index f89e5089156fb..4513c87da6821 100644 --- a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java +++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java @@ -34,7 +34,6 @@ import org.apache.kafka.clients.admin.QuorumInfo; import org.apache.kafka.clients.admin.SupportedVersionRange; import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.clients.admin.UpdateFeaturesOptions; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.KafkaFuture; @@ -1142,8 +1141,7 @@ public void testUpdateMetadataVersion() throws Exception { try (Admin admin = cluster.admin()) { admin.updateFeatures( Map.of(MetadataVersion.FEATURE_NAME, - new FeatureUpdate(MetadataVersion.latestTesting().featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions() + new FeatureUpdate(MetadataVersion.latestTesting().featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)) ); assertEquals(new SupportedVersionRange((short) 0, (short) 1), admin.describeFeatures().featureMetadata().get() .supportedFeatures().get(KRaftVersion.FEATURE_NAME)); diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java index 8730d23066eb1..a620d63a697bf 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.admin.FeatureUpdate; import org.apache.kafka.clients.admin.GroupListing; import org.apache.kafka.clients.admin.ListGroupsOptions; -import org.apache.kafka.clients.admin.UpdateFeaturesOptions; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.errors.GroupNotEmptyException; @@ -446,7 +445,7 @@ private void updateStreamsGroupProtocol(short version) { try (Admin admin = cluster.createAdminClient()) { Map updates = Utils.mkMap( Utils.mkEntry("streams.version", new FeatureUpdate(version, version == 0 ? FeatureUpdate.UpgradeType.SAFE_DOWNGRADE : FeatureUpdate.UpgradeType.UPGRADE))); - admin.updateFeatures(updates, new UpdateFeaturesOptions()).all().get(); + admin.updateFeatures(updates).all().get(); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); }