From e9896cd51fe4aac4aa92254c4652ca92ae26a99f Mon Sep 17 00:00:00 2001 From: dengziming Date: Fri, 27 Aug 2021 22:06:48 +0800 Subject: [PATCH] KAFKA-13242: Handle UpdateFeatureRequest in kraft --- .../requests/UpdateFeaturesResponse.java | 3 +- .../common/message/UpdateFeaturesRequest.json | 2 +- .../scala/kafka/server/ControllerApis.scala | 21 ++++++- .../test/java/kafka/test/MockController.java | 7 +++ .../kafka/server/ControllerApisTest.scala | 10 ++++ .../apache/kafka/controller/Controller.java | 10 ++++ .../kafka/controller/ControllerResult.java | 3 +- .../controller/FeatureControlManager.java | 39 ++++++++---- .../kafka/controller/QuorumController.java | 31 +++++++++- .../controller/FeatureControlManagerTest.java | 60 ++++++++++++++----- 10 files changed, 154 insertions(+), 32 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java index 26825a0c24763..586bf862dc11c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java @@ -92,8 +92,7 @@ public static UpdateFeaturesResponse createWithErrors(ApiError topLevelError, Ma .setThrottleTimeMs(throttleTimeMs) .setErrorCode(topLevelError.error().code()) .setErrorMessage(topLevelError.message()) - .setResults(results) - .setThrottleTimeMs(throttleTimeMs); + .setResults(results); return new UpdateFeaturesResponse(responseData); } } diff --git a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json index 2b3181362d7c5..21e1bf663dda5 100644 --- a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json +++ b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json @@ -16,7 +16,7 @@ { "apiKey": 57, "type": "request", - "listeners": ["zkBroker", 
"broker"], + "listeners": ["zkBroker", "broker", "controller"], "name": "UpdateFeaturesRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 2f86e1f7cfd03..2b39a0c517e67 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -108,6 +108,7 @@ class ControllerApis(val requestChannel: RequestChannel, case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request) case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request) case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request) + case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request) case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}") } } catch { @@ -183,7 +184,7 @@ class ControllerApis(val requestChannel: RequestChannel, throw new TopicDeletionDisabledException() } } - val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS); + val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS) // The first step is to load up the names and IDs that have been provided by the // request. 
This is a bit messy because we support multiple ways of referring to // topics (both by name and by id) and because we need to check for duplicates or @@ -686,7 +687,7 @@ class ControllerApis(val requestChannel: RequestChannel, hasClusterAuth: Boolean, getCreatableTopics: Iterable[String] => Set[String]) : CompletableFuture[util.List[CreatePartitionsTopicResult]] = { - val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS); + val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS) val responses = new util.ArrayList[CreatePartitionsTopicResult]() val duplicateTopicNames = new util.HashSet[String]() val topicNames = new util.HashSet[String]() @@ -757,4 +758,20 @@ class ControllerApis(val requestChannel: RequestChannel, } }) } + + def handleUpdateFeatures(request: RequestChannel.Request): Unit = { + val updateFeaturesRequest = request.body[UpdateFeaturesRequest] + authHelper.authorizeClusterOperation(request, ALTER) + controller.updateFeatures(updateFeaturesRequest.data()) + .whenComplete((results, exception) => { + if (exception != null) { + requestHelper.handleError(request, exception) + } else { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + results.setThrottleTimeMs(requestThrottleMs) + new UpdateFeaturesResponse(results) + }) + } + }) + } } diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index f56e2cb677cfd..fae9632f6e733 100644 --- a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -39,6 +39,8 @@ import org.apache.kafka.common.message.ElectLeadersResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; +import org.apache.kafka.common.message.UpdateFeaturesRequestData; +import 
org.apache.kafka.common.message.UpdateFeaturesResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; @@ -308,6 +310,11 @@ public CompletableFuture allocateProducerIds(Al throw new UnsupportedOperationException(); } + @Override + public CompletableFuture updateFeatures(UpdateFeaturesRequestData request) { + throw new UnsupportedOperationException(); + } + @Override synchronized public CompletableFuture> createPartitions(long deadlineNs, List topicList) { diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 768c06a9ec3b2..3e862a7992000 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -488,6 +488,14 @@ class ControllerApisTest { new ListPartitionReassignmentsRequestData()).build()))) } + @Test + def testUnauthorizedHandleUpdateFeatures(): Unit = { + assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis( + Some(createDenyAllAuthorizer()), new MockController.Builder().build()). 
+ handleUpdateFeatures(buildRequest(new UpdateFeaturesRequest.Builder( + new UpdateFeaturesRequestData()).build()))) + } + @Test def testCreateTopics(): Unit = { val controller = new MockController.Builder().build() @@ -732,6 +740,8 @@ class ControllerApisTest { controllerApis.createPartitions(request, false, _ => Set("foo", "bar")).get().asScala.toSet) } + // TODO: add a positive-path (authorized) test for handleUpdateFeatures + @AfterEach def tearDown(): Unit = { quotas.shutdown() diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index f06b1088baa1e..42d2856c0f2f7 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -36,6 +36,8 @@ import org.apache.kafka.common.message.ElectLeadersResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; +import org.apache.kafka.common.message.UpdateFeaturesRequestData; +import org.apache.kafka.common.message.UpdateFeaturesResponseData; import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.requests.ApiError; @@ -234,6 +236,14 @@ CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds( AllocateProducerIdsRequestData request ); + /** + * Update the set of finalized feature levels in the cluster. + * @return a future yielding the response, with a per-feature error code for each requested update. + */ + CompletableFuture<UpdateFeaturesResponseData> updateFeatures( + UpdateFeaturesRequestData request + ); + /** * Begin writing a controller snapshot. If there was already an ongoing snapshot, it * simply returns information about that snapshot rather than starting a new one. 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java index d130de59282d4..739a24de1f5a1 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java @@ -54,7 +54,8 @@ public boolean equals(Object o) { if (o == null || (!o.getClass().equals(getClass()))) { return false; } - ControllerResult other = (ControllerResult) o; + @SuppressWarnings("unchecked") + ControllerResult other = (ControllerResult) o; return records.equals(other.records) && Objects.equals(response, other.response) && Objects.equals(isAtomic, other.isAtomic); diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java index ed7c98cbb6d24..92fa262c9f573 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; @@ -28,9 +29,13 @@ import java.util.Set; import java.util.TreeMap; +import org.apache.kafka.common.message.UpdateFeaturesRequestData; +import org.apache.kafka.common.message.UpdateFeaturesResponseData; import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.requests.UpdateFeaturesRequest; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metadata.FeatureMap; import org.apache.kafka.metadata.FeatureMapAndEpoch; @@ -42,10 +47,6 @@ public class 
FeatureControlManager { - /** - * An immutable map containing the features supported by this controller's software. - */ - private final Map supportedFeatures; /** * Maps feature names to finalized version ranges. @@ -54,18 +55,34 @@ public class FeatureControlManager { FeatureControlManager(Map supportedFeatures, SnapshotRegistry snapshotRegistry) { - this.supportedFeatures = supportedFeatures; this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0); + finalizedVersions.putAll(supportedFeatures); } ControllerResult> updateFeatures( - Map updates, Set downgradeables, - Map> brokerFeatures) { + UpdateFeaturesRequestData.FeatureUpdateKeyCollection updates, + Map> brokerFeatures + ) { TreeMap results = new TreeMap<>(); List records = new ArrayList<>(); - for (Entry entry : updates.entrySet()) { - results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(), - downgradeables.contains(entry.getKey()), brokerFeatures, records)); + + for (UpdateFeaturesRequestData.FeatureUpdateKey update : updates) { + VersionRange existingVersionRange = finalizedVersions.get(update.feature()); + if (update.feature().isEmpty()) + // Check that the feature name is not empty. 
+ results.put(update.feature(), + new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty.")); + else if (existingVersionRange == null) + results.put(update.feature(), new ApiError(Errors.INVALID_UPDATE_VERSION, + "The controller does not support the given feature range.")); + else + results.put(update.feature(), updateFeature( + update.feature(), + new VersionRange(existingVersionRange.min(), update.maxVersionLevel()), + update.allowDowngrade(), + brokerFeatures, + records + )); } return ControllerResult.atomicOf(records, results); @@ -84,7 +101,7 @@ private ApiError updateFeature(String featureName, return new ApiError(Errors.INVALID_UPDATE_VERSION, "The upper value for the new range cannot be less than 1."); } - VersionRange localRange = supportedFeatures.get(featureName); + VersionRange localRange = finalizedVersions.get(featureName); if (localRange == null || !localRange.contains(newRange)) { return new ApiError(Errors.INVALID_UPDATE_VERSION, "The controller does not support the given feature range."); diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index d2ff546a7a09b..cb038701aa29a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -40,6 +40,10 @@ import org.apache.kafka.common.message.ElectLeadersResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; +import org.apache.kafka.common.message.UpdateFeaturesRequestData; +import org.apache.kafka.common.message.UpdateFeaturesResponseData; +import org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult; +import org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResultCollection; import 
org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.ClientQuotaRecord; import org.apache.kafka.common.metadata.FeatureLevelRecord; @@ -54,6 +58,7 @@ import org.apache.kafka.common.metadata.UnfenceBrokerRecord; import org.apache.kafka.common.metadata.UnregisterBrokerRecord; import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.requests.ApiError; @@ -83,6 +88,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map.Entry; import java.util.Map; @@ -201,7 +207,6 @@ public Builder setMetrics(ControllerMetrics controllerMetrics) { return this; } - @SuppressWarnings("unchecked") public QuorumController build() throws Exception { if (raftClient == null) { throw new RuntimeException("You must set a raft client."); @@ -1325,6 +1330,30 @@ public CompletableFuture allocateProducerIds( .setProducerIdLen(result.producerIdLen())); } + @Override + public CompletableFuture updateFeatures(UpdateFeaturesRequestData request) { + Map> brokerFeatures = new HashMap<>(); + clusterControl.brokerRegistrations().forEach((key, value) -> brokerFeatures.put(key, value.supportedFeatures())); + + return appendWriteEvent("updateFeatures", + () -> featureControl.updateFeatures(request.featureUpdates(), brokerFeatures) + ).thenApply(updateErrors -> { + UpdatableFeatureResultCollection results = new UpdatableFeatureResultCollection(); + for (Map.Entry updateError : updateErrors.entrySet()) { + String feature = updateError.getKey(); + ApiError error = updateError.getValue(); + UpdatableFeatureResult result = new UpdatableFeatureResult(); + result.setFeature(feature) + .setErrorCode(error.error().code()) + .setErrorMessage(error.message()); + results.add(result); + } + return 
new UpdateFeaturesResponseData() + .setResults(results); + } + ); + } + @Override public CompletableFuture> createPartitions(long deadlineNs, List topics) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java index 680253c712547..2f0ecbd68d472 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java @@ -23,6 +23,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection; +import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKey; import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; @@ -41,7 +44,7 @@ @Timeout(value = 40) public class FeatureControlManagerTest { - @SuppressWarnings("unchecked") + private static Map rangeMap(Object... args) { Map result = new HashMap<>(); for (int i = 0; i < args.length; i += 3) { @@ -53,6 +56,20 @@ private static Map rangeMap(Object... args) { return result; } + private static FeatureUpdateKeyCollection featureUpdateKeys(Object... 
args) { + FeatureUpdateKeyCollection result = new FeatureUpdateKeyCollection(); + for (int i = 0; i < args.length; i += 3) { + String feature = (String) args[i]; + Integer max = (Integer) args[i + 1]; + Boolean downgradable = (Boolean) args[i + 2]; + result.add(new FeatureUpdateKey() + .setFeature(feature) + .setMaxVersionLevel(max.shortValue()) + .setAllowDowngrade(downgradable)); + } + return result; + } + @Test public void testUpdateFeatures() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); @@ -64,12 +81,10 @@ public void testUpdateFeatures() { assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections. singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION, "The controller does not support the given feature range."))), - manager.updateFeatures(rangeMap("foo", 1, 3), - Collections.singleton("foo"), + manager.updateFeatures(featureUpdateKeys("foo", 3, true), Collections.emptyMap())); ControllerResult> result = manager.updateFeatures( - rangeMap("foo", 1, 2, "bar", 1, 1), Collections.emptySet(), - Collections.emptyMap()); + featureUpdateKeys("foo", 2, false, "bar", 1, false), Collections.emptyMap()); Map expectedMap = new HashMap<>(); expectedMap.put("foo", ApiError.NONE); expectedMap.put("bar", new ApiError(Errors.INVALID_UPDATE_VERSION, @@ -102,6 +117,23 @@ public void testUpdateFeaturesErrorCases() { FeatureControlManager manager = new FeatureControlManager( rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry); + assertEquals( + ControllerResult.atomicOf( + Collections.emptyList(), + Collections.singletonMap( + "", + new ApiError( + Errors.INVALID_REQUEST, + "Feature name can not be empty." 
+ ) + ) + ), + manager.updateFeatures( + featureUpdateKeys("", 3, true), + Collections.singletonMap(5, rangeMap()) + ) + ); + assertEquals( ControllerResult.atomicOf( Collections.emptyList(), @@ -114,14 +146,13 @@ public void testUpdateFeaturesErrorCases() { ) ), manager.updateFeatures( - rangeMap("foo", 1, 3), - Collections.singleton("foo"), + featureUpdateKeys("foo", 3, true), Collections.singletonMap(5, rangeMap()) ) ); ControllerResult> result = manager.updateFeatures( - rangeMap("foo", 1, 3), Collections.emptySet(), Collections.emptyMap()); + featureUpdateKeys("foo", 3, true), Collections.emptyMap()); assertEquals(Collections.singletonMap("foo", ApiError.NONE), result.response()); manager.replay((FeatureLevelRecord) result.records().get(0).message()); snapshotRegistry.getOrCreateSnapshot(3); @@ -130,8 +161,8 @@ public void testUpdateFeaturesErrorCases() { singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION, "Can't downgrade the maximum version of this feature without " + "setting downgradable to true."))), - manager.updateFeatures(rangeMap("foo", 1, 2), - Collections.emptySet(), Collections.emptyMap())); + manager.updateFeatures(featureUpdateKeys("foo", 2, false), + Collections.emptyMap())); assertEquals( ControllerResult.atomicOf( @@ -147,8 +178,7 @@ public void testUpdateFeaturesErrorCases() { Collections.singletonMap("foo", ApiError.NONE) ), manager.updateFeatures( - rangeMap("foo", 1, 2), - Collections.singleton("foo"), + featureUpdateKeys("foo", 2, true), Collections.emptyMap() ) ); @@ -160,8 +190,8 @@ public void testFeatureControlIterator() throws Exception { FeatureControlManager manager = new FeatureControlManager( rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry); ControllerResult> result = manager. 
- updateFeatures(rangeMap("foo", 1, 5, "bar", 1, 1), - Collections.emptySet(), Collections.emptyMap()); + updateFeatures(featureUpdateKeys("foo", 5, false, "bar", 1, true), + Collections.emptyMap()); RecordTestUtils.replayAll(manager, result.records()); RecordTestUtils.assertBatchIteratorContains(Arrays.asList( Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord(). @@ -174,4 +204,6 @@ public void testFeatureControlIterator() throws Exception { setMaxFeatureLevel((short) 1), (short) 0))), manager.iterator(Long.MAX_VALUE)); } + + // TODO: add tests covering the brokerFeatures validation path in updateFeatures }