Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.clients;

import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
Expand Down Expand Up @@ -47,8 +46,7 @@ public void testBasicMetadataVersionUpgrade(ClusterInstance clusterInstance) thr
// Update to new version
short updateVersion = MetadataVersion.IBP_3_7_IV1.featureLevel();
var updateResult = admin.updateFeatures(
Map.of("metadata.version", new FeatureUpdate(updateVersion, FeatureUpdate.UpgradeType.UPGRADE)),
new UpdateFeaturesOptions());
Map.of("metadata.version", new FeatureUpdate(updateVersion, FeatureUpdate.UpgradeType.UPGRADE)));
updateResult.all().get();

// Verify that new version is visible on broker
Expand All @@ -69,8 +67,7 @@ public void testUpgradeSameVersion(ClusterInstance clusterInstance) throws Excep
try (var admin = clusterInstance.admin()) {
short updateVersion = MetadataVersion.IBP_3_9_IV0.featureLevel();
var updateResult = admin.updateFeatures(
Map.of("metadata.version", new FeatureUpdate(updateVersion, FeatureUpdate.UpgradeType.UPGRADE)),
new UpdateFeaturesOptions());
Map.of("metadata.version", new FeatureUpdate(updateVersion, FeatureUpdate.UpgradeType.UPGRADE)));
updateResult.all().get();
}
}
Expand Down
13 changes: 13 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1543,6 +1543,19 @@ default DescribeFeaturesResult describeFeatures() {
*/
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

/**
* Applies specified updates to finalized features.
* <p>
* This is a convenience method for {@link #updateFeatures(Map, UpdateFeaturesOptions)} with default options.
* See the overload for more details.
*
* @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
* @return the {@link UpdateFeaturesResult} containing the result
*/
default UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates) {
return updateFeatures(featureUpdates, new UpdateFeaturesOptions());
}

/**
* Applies specified updates to finalized features. This operation is not transactional so some
* updates may succeed while the rest may fail.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8845,8 +8845,7 @@ public void testUpdateFeaturesShouldFailRequestForEmptyUpdates() {
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
assertThrows(
IllegalArgumentException.class,
() -> env.adminClient().updateFeatures(
new HashMap<>(), new UpdateFeaturesOptions()));
() -> env.adminClient().updateFeatures(new HashMap<>()));
}
}

Expand All @@ -8857,8 +8856,7 @@ public void testUpdateFeaturesShouldFailRequestForInvalidFeatureName() {
IllegalArgumentException.class,
() -> env.adminClient().updateFeatures(
Utils.mkMap(Utils.mkEntry("feature", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE)),
Utils.mkEntry("", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE))),
new UpdateFeaturesOptions()));
Utils.mkEntry("", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE)))));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4114,8 +4114,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
private def disableEligibleLeaderReplicas(admin: Admin): Unit = {
if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) {
admin.updateFeatures(
util.Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)),
new UpdateFeaturesOptions()).all().get()
util.Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE))).all().get()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.{InvalidConfigurationException, TimeoutExc
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsResult, ConfigEntry, FeatureUpdate, UpdateFeaturesOptions}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsResult, ConfigEntry, FeatureUpdate}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.metrics.KafkaYammerMetrics
Expand Down Expand Up @@ -127,8 +127,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
private def disableEligibleLeaderReplicas(): Unit = {
if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) {
admin.updateFeatures(
util.Map.of("eligible.leader.replicas.version", new FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)),
new UpdateFeaturesOptions()).all().get()
util.Map.of("eligible.leader.replicas.version", new FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE))).all().get()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.admin.UpdateFeaturesResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -173,8 +172,7 @@ public void testUpdateFeatures(ClusterInstance clusterInstance) {
private void testUpdateFeatures(ClusterInstance clusterInstance, boolean usingBootstrapControllers) {
try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
UpdateFeaturesResult result = admin.updateFeatures(Map.of("foo.bar.feature",
new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE)),
new UpdateFeaturesOptions());
new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE)));
ExecutionException exception =
assertThrows(ExecutionException.class,
() -> result.all().get(1, TimeUnit.MINUTES));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand Down Expand Up @@ -97,8 +96,7 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc
String testTopicName = String.format("%s-%s", "testHighWatermarkShouldNotAdvanceIfUnderMinIsr", "ELR-test");
admin.updateFeatures(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)),
new UpdateFeaturesOptions()).all().get();
new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE))).all().get();

admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
clusterInstance.waitTopicCreation(testTopicName, 1);
Expand Down Expand Up @@ -164,8 +162,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx

admin.updateFeatures(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)),
new UpdateFeaturesOptions()).all().get();
new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE))).all().get();
admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
clusterInstance.waitTopicCreation(testTopicName, 1);

Expand Down Expand Up @@ -234,8 +231,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx

admin.updateFeatures(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)),
new UpdateFeaturesOptions()).all().get();
new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE))).all().get();
admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
clusterInstance.waitTopicCreation(testTopicName, 1);

Expand Down Expand Up @@ -293,8 +289,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep

admin.updateFeatures(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)),
new UpdateFeaturesOptions()).all().get();
new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE))).all().get();
admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
clusterInstance.waitTopicCreation(testTopicName, 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.SupportedVersionRange;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaFuture;
Expand Down Expand Up @@ -1142,8 +1141,7 @@ public void testUpdateMetadataVersion() throws Exception {
try (Admin admin = cluster.admin()) {
admin.updateFeatures(
Map.of(MetadataVersion.FEATURE_NAME,
new FeatureUpdate(MetadataVersion.latestTesting().featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)),
new UpdateFeaturesOptions()
new FeatureUpdate(MetadataVersion.latestTesting().featureLevel(), FeatureUpdate.UpgradeType.UPGRADE))
);
assertEquals(new SupportedVersionRange((short) 0, (short) 1), admin.describeFeatures().featureMetadata().get()
.supportedFeatures().get(KRaftVersion.FEATURE_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.errors.GroupNotEmptyException;
Expand Down Expand Up @@ -446,7 +445,7 @@ private void updateStreamsGroupProtocol(short version) {
try (Admin admin = cluster.createAdminClient()) {
Map<String, FeatureUpdate> updates = Utils.mkMap(
Utils.mkEntry("streams.version", new FeatureUpdate(version, version == 0 ? FeatureUpdate.UpgradeType.SAFE_DOWNGRADE : FeatureUpdate.UpgradeType.UPGRADE)));
admin.updateFeatures(updates, new UpdateFeaturesOptions()).all().get();
admin.updateFeatures(updates).all().get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down