diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java index 26825a0c24763..586bf862dc11c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java @@ -92,8 +92,7 @@ public static UpdateFeaturesResponse createWithErrors(ApiError topLevelError, Ma .setThrottleTimeMs(throttleTimeMs) .setErrorCode(topLevelError.error().code()) .setErrorMessage(topLevelError.message()) - .setResults(results) - .setThrottleTimeMs(throttleTimeMs); + .setResults(results); return new UpdateFeaturesResponse(responseData); } } diff --git a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json index 2b3181362d7c5..21e1bf663dda5 100644 --- a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json +++ b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json @@ -16,7 +16,7 @@ { "apiKey": 57, "type": "request", - "listeners": ["zkBroker", "broker"], + "listeners": ["zkBroker", "broker", "controller"], "name": "UpdateFeaturesRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index ed9b55a1a7c39..d4fc1f6b983df 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -20,8 +20,8 @@ package kafka.server import java.util import java.util.Collections import java.util.Map.Entry -import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS} import java.util.concurrent.{CompletableFuture, ExecutionException} +import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS} import kafka.network.RequestChannel import kafka.raft.RaftManager @@ -36,10 +36,11 @@ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse} import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult +import org.apache.kafka.common.message.CreateTopicsRequestData import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection} import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse -import org.apache.kafka.common.message.{CreateTopicsRequestData, _} +import org.apache.kafka.common.message._ import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} import org.apache.kafka.common.requests._ @@ -108,6 +109,7 @@ class ControllerApis(val requestChannel: RequestChannel, case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request) case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request) case ApiKeys.ELECT_LEADERS => handleElectLeaders(request) + case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request) case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}") } } catch { @@ -183,7 +185,7 @@ class ControllerApis(val requestChannel: RequestChannel, throw new TopicDeletionDisabledException() } } - val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS); + val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS) // The first step is to load up the names and IDs that have been provided by the // request. This is a bit messy because we support multiple ways of referring to // topics (both by name and by id) and because we need to check for duplicates or @@ -704,7 +706,7 @@ class ControllerApis(val requestChannel: RequestChannel, hasClusterAuth: Boolean, getCreatableTopics: Iterable[String] => Set[String]) : CompletableFuture[util.List[CreatePartitionsTopicResult]] = { - val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS); + val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS) val responses = new util.ArrayList[CreatePartitionsTopicResult]() val duplicateTopicNames = new util.HashSet[String]() val topicNames = new util.HashSet[String]() @@ -775,4 +777,19 @@ class ControllerApis(val requestChannel: RequestChannel, } }) } + + def handleUpdateFeatures(request: RequestChannel.Request): Unit = { + val updateFeaturesRequest = request.body[UpdateFeaturesRequest] + authHelper.authorizeClusterOperation(request, ALTER) + controller.updateFeatures(updateFeaturesRequest.data) + .whenComplete((response, exception) => { + if (exception != null) { + requestHelper.handleError(request, exception) + } else { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + new UpdateFeaturesResponse(response.setThrottleTimeMs(requestThrottleMs)) + }) + } + }) + } } diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index f56e2cb677cfd..fae9632f6e733 100644 --- a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -39,6 +39,8 @@ import org.apache.kafka.common.message.ElectLeadersResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; +import org.apache.kafka.common.message.UpdateFeaturesRequestData; +import org.apache.kafka.common.message.UpdateFeaturesResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; @@ -308,6 +310,11 @@ public CompletableFuture allocateProducerIds(Al throw new UnsupportedOperationException(); } + @Override + public CompletableFuture updateFeatures(UpdateFeaturesRequestData request) { + throw new UnsupportedOperationException(); + } + @Override synchronized public CompletableFuture> createPartitions(long deadlineNs, List topicList) { diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 2176d23bb748b..7830f19671e16 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -486,6 +486,14 @@ class ControllerApisTest { new ListPartitionReassignmentsRequestData()).build()))) } + @Test + def testUnauthorizedHandleUpdateFeatures(): Unit = { + assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis( + Some(createDenyAllAuthorizer()), new MockController.Builder().build()). + handleUpdateFeatures(buildRequest(new UpdateFeaturesRequest.Builder( + new UpdateFeaturesRequestData()).build()))) + } + @Test def testCreateTopics(): Unit = { val controller = new MockController.Builder().build() diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index f06b1088baa1e..d4b3f46ef780a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -36,6 +36,8 @@ import org.apache.kafka.common.message.ElectLeadersResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; +import org.apache.kafka.common.message.UpdateFeaturesRequestData; +import org.apache.kafka.common.message.UpdateFeaturesResponseData; import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.requests.ApiError; @@ -234,6 +236,15 @@ CompletableFuture allocateProducerIds( AllocateProducerIdsRequestData request ); + /** + * Perform some features changes + * + * @return A future which yields feature update results as a response + */ + CompletableFuture updateFeatures( + UpdateFeaturesRequestData request + ); + /** * Begin writing a controller snapshot. If there was already an ongoing snapshot, it * simply returns information about that snapshot rather than starting a new one. diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java index d130de59282d4..739a24de1f5a1 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java @@ -54,7 +54,8 @@ public boolean equals(Object o) { if (o == null || (!o.getClass().equals(getClass()))) { return false; } - ControllerResult other = (ControllerResult) o; + @SuppressWarnings("unchecked") + ControllerResult other = (ControllerResult) o; return records.equals(other.records) && Objects.equals(response, other.response) && Objects.equals(isAtomic, other.isAtomic); diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java index ed7c98cbb6d24..605e515fdab61 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java @@ -17,27 +17,27 @@ package org.apache.kafka.controller; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.TreeMap; - +import org.apache.kafka.common.message.UpdateFeaturesRequestData; import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; -import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metadata.FeatureMap; import org.apache.kafka.metadata.FeatureMapAndEpoch; import org.apache.kafka.metadata.VersionRange; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.TreeMap; + import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD; @@ -59,13 +59,27 @@ public class FeatureControlManager { } ControllerResult> updateFeatures( - Map updates, Set downgradeables, - Map> brokerFeatures) { + UpdateFeaturesRequestData.FeatureUpdateKeyCollection updates, + Map> brokerFeatures + ) { TreeMap results = new TreeMap<>(); List records = new ArrayList<>(); - for (Entry entry : updates.entrySet()) { - results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(), - downgradeables.contains(entry.getKey()), brokerFeatures, records)); + + for (UpdateFeaturesRequestData.FeatureUpdateKey update : updates) { + VersionRange existingVersionRange = finalizedVersions.get(update.feature()); + short minVersion = existingVersionRange == null ? 1 : existingVersionRange.min(); + if (update.feature().isEmpty()) + // Check that the feature name is not empty. + results.put(update.feature(), + new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty.")); + else + results.put(update.feature(), updateFeature( + update.feature(), + new VersionRange(minVersion, update.maxVersionLevel()), + update.allowDowngrade(), + brokerFeatures, + records + )); } return ControllerResult.atomicOf(records, results); diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 662da8ab9bfd2..6d75e7caa2b1e 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -42,6 +42,8 @@ import org.apache.kafka.common.message.ElectLeadersResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; +import org.apache.kafka.common.message.UpdateFeaturesRequestData; +import org.apache.kafka.common.message.UpdateFeaturesResponseData; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.ClientQuotaRecord; import org.apache.kafka.common.metadata.FeatureLevelRecord; @@ -86,6 +88,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map.Entry; import java.util.Map; @@ -223,7 +226,6 @@ public Builder setConfigurationValidator(ConfigurationValidator configurationVal return this; } - @SuppressWarnings("unchecked") public QuorumController build() throws Exception { if (raftClient == null) { throw new RuntimeException("You must set a raft client."); @@ -542,13 +544,13 @@ ReplicationControlManager replicationControl() { // VisibleForTesting CompletableFuture appendReadEvent(String name, Supplier handler) { - ControllerReadEvent event = new ControllerReadEvent(name, handler); + ControllerReadEvent event = new ControllerReadEvent<>(name, handler); queue.append(event); return event.future(); } CompletableFuture appendReadEvent(String name, long deadlineNs, Supplier handler) { - ControllerReadEvent event = new ControllerReadEvent(name, handler); + ControllerReadEvent event = new ControllerReadEvent<>(name, handler); queue.appendWithDeadline(deadlineNs, event); return event.future(); } @@ -928,7 +930,6 @@ private void cancelMaybeFenceReplicas() { queue.cancelDeferred(MAYBE_FENCE_REPLICAS); } - @SuppressWarnings("unchecked") private void replay(ApiMessage message, Optional snapshotId, long offset) { try { MetadataRecordType type = MetadataRecordType.fromId(message.apiKey()); @@ -1392,6 +1393,29 @@ public CompletableFuture allocateProducerIds( .setProducerIdLen(result.producerIdLen())); } + @Override + public CompletableFuture updateFeatures(UpdateFeaturesRequestData request) { + Map> brokerFeatures = new HashMap<>(); + clusterControl.brokerRegistrations().forEach((key, value) -> brokerFeatures.put(key, value.supportedFeatures())); + + return appendWriteEvent("updateFeatures", + () -> featureControl.updateFeatures(request.featureUpdates(), brokerFeatures) + ).thenApply(updateErrors -> { + UpdateFeaturesResponseData.UpdatableFeatureResultCollection results = new UpdateFeaturesResponseData.UpdatableFeatureResultCollection(); + for (Map.Entry updateError : updateErrors.entrySet()) { + String feature = updateError.getKey(); + ApiError error = updateError.getValue(); + UpdateFeaturesResponseData.UpdatableFeatureResult result = new UpdateFeaturesResponseData.UpdatableFeatureResult(); + result.setFeature(feature) + .setErrorCode(error.error().code()) + .setErrorMessage(error.message()); + results.add(result); + } + return new UpdateFeaturesResponseData() + .setResults(results); + }); + } + @Override public CompletableFuture> createPartitions(long deadlineNs, List topics) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java index 680253c712547..8b494d0fb9b56 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java @@ -23,6 +23,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection; +import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKey; import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; @@ -41,7 +44,7 @@ @Timeout(value = 40) public class FeatureControlManagerTest { - @SuppressWarnings("unchecked") + private static Map rangeMap(Object... args) { Map result = new HashMap<>(); for (int i = 0; i < args.length; i += 3) { @@ -53,6 +56,20 @@ private static Map rangeMap(Object... args) { return result; } + private static FeatureUpdateKeyCollection featureUpdateKeys(Object... args) { + FeatureUpdateKeyCollection result = new FeatureUpdateKeyCollection(); + for (int i = 0; i < args.length; i += 3) { + String feature = (String) args[i]; + Integer max = (Integer) args[i + 1]; + Boolean downgradable = (Boolean) args[i + 2]; + result.add(new FeatureUpdateKey() + .setFeature(feature) + .setMaxVersionLevel(max.shortValue()) + .setAllowDowngrade(downgradable)); + } + return result; + } + @Test public void testUpdateFeatures() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); @@ -64,12 +81,10 @@ public void testUpdateFeatures() { assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections. singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION, "The controller does not support the given feature range."))), - manager.updateFeatures(rangeMap("foo", 1, 3), - Collections.singleton("foo"), + manager.updateFeatures(featureUpdateKeys("foo", 3, false), Collections.emptyMap())); ControllerResult> result = manager.updateFeatures( - rangeMap("foo", 1, 2, "bar", 1, 1), Collections.emptySet(), - Collections.emptyMap()); + featureUpdateKeys("foo", 2, false, "bar", 1, false), Collections.emptyMap()); Map expectedMap = new HashMap<>(); expectedMap.put("foo", ApiError.NONE); expectedMap.put("bar", new ApiError(Errors.INVALID_UPDATE_VERSION, @@ -102,6 +117,23 @@ public void testUpdateFeaturesErrorCases() { FeatureControlManager manager = new FeatureControlManager( rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry); + assertEquals( + ControllerResult.atomicOf( + Collections.emptyList(), + Collections.singletonMap( + "", + new ApiError( + Errors.INVALID_REQUEST, + "Feature name can not be empty." + ) + ) + ), + manager.updateFeatures( + featureUpdateKeys("", 3, true), + Collections.singletonMap(5, rangeMap()) + ) + ); + assertEquals( ControllerResult.atomicOf( Collections.emptyList(), @@ -114,14 +146,13 @@ public void testUpdateFeaturesErrorCases() { ) ), manager.updateFeatures( - rangeMap("foo", 1, 3), - Collections.singleton("foo"), + featureUpdateKeys("foo", 3, false), Collections.singletonMap(5, rangeMap()) ) ); ControllerResult> result = manager.updateFeatures( - rangeMap("foo", 1, 3), Collections.emptySet(), Collections.emptyMap()); + featureUpdateKeys("foo", 3, false), Collections.emptyMap()); assertEquals(Collections.singletonMap("foo", ApiError.NONE), result.response()); manager.replay((FeatureLevelRecord) result.records().get(0).message()); snapshotRegistry.getOrCreateSnapshot(3); @@ -130,8 +161,8 @@ public void testUpdateFeaturesErrorCases() { singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION, "Can't downgrade the maximum version of this feature without " + "setting downgradable to true."))), - manager.updateFeatures(rangeMap("foo", 1, 2), - Collections.emptySet(), Collections.emptyMap())); + manager.updateFeatures(featureUpdateKeys("foo", 2, false), + Collections.emptyMap())); assertEquals( ControllerResult.atomicOf( @@ -147,8 +178,7 @@ public void testUpdateFeaturesErrorCases() { Collections.singletonMap("foo", ApiError.NONE) ), manager.updateFeatures( - rangeMap("foo", 1, 2), - Collections.singleton("foo"), + featureUpdateKeys("foo", 2, true), Collections.emptyMap() ) ); @@ -160,8 +190,8 @@ public void testFeatureControlIterator() throws Exception { FeatureControlManager manager = new FeatureControlManager( rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry); ControllerResult> result = manager. - updateFeatures(rangeMap("foo", 1, 5, "bar", 1, 1), - Collections.emptySet(), Collections.emptyMap()); + updateFeatures(featureUpdateKeys("foo", 5, false, "bar", 1, false), + Collections.emptyMap()); RecordTestUtils.replayAll(manager, result.records()); RecordTestUtils.assertBatchIteratorContains(Arrays.asList( Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().