Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ public static UpdateFeaturesResponse createWithErrors(ApiError topLevelError, Ma
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(topLevelError.error().code())
.setErrorMessage(topLevelError.message())
.setResults(results)
.setThrottleTimeMs(throttleTimeMs);
.setResults(results);
return new UpdateFeaturesResponse(responseData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{
"apiKey": 57,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["zkBroker", "broker", "controller"],
"name": "UpdateFeaturesRequest",
"validVersions": "0",
"flexibleVersions": "0+",
Expand Down
21 changes: 19 additions & 2 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request)
case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
}
} catch {
Expand Down Expand Up @@ -183,7 +184,7 @@ class ControllerApis(val requestChannel: RequestChannel,
throw new TopicDeletionDisabledException()
}
}
val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS);
val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS)
// The first step is to load up the names and IDs that have been provided by the
// request. This is a bit messy because we support multiple ways of referring to
// topics (both by name and by id) and because we need to check for duplicates or
Expand Down Expand Up @@ -686,7 +687,7 @@ class ControllerApis(val requestChannel: RequestChannel,
hasClusterAuth: Boolean,
getCreatableTopics: Iterable[String] => Set[String])
: CompletableFuture[util.List[CreatePartitionsTopicResult]] = {
val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS);
val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS)
val responses = new util.ArrayList[CreatePartitionsTopicResult]()
val duplicateTopicNames = new util.HashSet[String]()
val topicNames = new util.HashSet[String]()
Expand Down Expand Up @@ -757,4 +758,20 @@ class ControllerApis(val requestChannel: RequestChannel,
}
})
}

def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
  // Deserialize the body, then require ALTER on CLUSTER before forwarding the
  // feature updates to the controller. An authorization failure throws here and
  // is converted to an error response by the surrounding request dispatcher.
  val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
  authHelper.authorizeClusterOperation(request, ALTER)
  controller.updateFeatures(updateFeaturesRequest.data())
    .whenComplete((response, throwable) => {
      if (throwable == null) {
        // Success: attach the computed throttle time and send the response.
        requestHelper.sendResponseMaybeThrottle(request, throttleMs => {
          response.setThrottleTimeMs(throttleMs)
          new UpdateFeaturesResponse(response)
        })
      } else {
        requestHelper.handleError(request, throwable)
      }
    })
}
}
7 changes: 7 additions & 0 deletions core/src/test/java/kafka/test/MockController.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
Expand Down Expand Up @@ -308,6 +310,11 @@ public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(Al
throw new UnsupportedOperationException();
}

@Override
public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(UpdateFeaturesRequestData request) {
    // MockController does not implement feature updates; tests that reach this
    // path are expected to assert on (or avoid triggering) this exception.
    throw new UnsupportedOperationException();
}

@Override
synchronized public CompletableFuture<List<CreatePartitionsTopicResult>>
createPartitions(long deadlineNs, List<CreatePartitionsTopic> topicList) {
Expand Down
10 changes: 10 additions & 0 deletions core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,14 @@ class ControllerApisTest {
new ListPartitionReassignmentsRequestData()).build())))
}

@Test
def testUnauthorizedHandleUpdateFeatures(): Unit = {
  // With a deny-all authorizer, handleUpdateFeatures must reject the caller
  // with ClusterAuthorizationException before reaching the controller.
  val controllerApis = createControllerApis(
    Some(createDenyAllAuthorizer()), new MockController.Builder().build())
  val request = buildRequest(
    new UpdateFeaturesRequest.Builder(new UpdateFeaturesRequestData()).build())
  assertThrows(classOf[ClusterAuthorizationException],
    () => controllerApis.handleUpdateFeatures(request))
}

@Test
def testCreateTopics(): Unit = {
val controller = new MockController.Builder().build()
Expand Down Expand Up @@ -732,6 +740,8 @@ class ControllerApisTest {
controllerApis.createPartitions(request, false, _ => Set("foo", "bar")).get().asScala.toSet)
}

// TODO: add a success-path test for handleUpdateFeatures once MockController
// implements updateFeatures (it currently throws UnsupportedOperationException).

@AfterEach
def tearDown(): Unit = {
quotas.shutdown()
Expand Down
10 changes: 10 additions & 0 deletions metadata/src/main/java/org/apache/kafka/controller/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
Expand Down Expand Up @@ -234,6 +236,14 @@ CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
AllocateProducerIdsRequestData request
);

/**
 * Attempt to finalize the requested feature-level updates.
 *
 * @param request   The UpdateFeatures request payload, listing the per-feature
 *                  target max version levels and downgrade permissions.
 * @return          A future yielding the response payload with a per-feature
 *                  result for each requested update.
 */
CompletableFuture<UpdateFeaturesResponseData> updateFeatures(
    UpdateFeaturesRequestData request
);

/**
* Begin writing a controller snapshot. If there was already an ongoing snapshot, it
* simply returns information about that snapshot rather than starting a new one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public boolean equals(Object o) {
if (o == null || (!o.getClass().equals(getClass()))) {
return false;
}
ControllerResult other = (ControllerResult) o;
@SuppressWarnings("unchecked")
ControllerResult<T> other = (ControllerResult<T>) o;
return records.equals(other.records) &&
Objects.equals(response, other.response) &&
Objects.equals(isAtomic, other.isAtomic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
Expand All @@ -28,9 +29,13 @@
import java.util.Set;
import java.util.TreeMap;

import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.UpdateFeaturesRequest;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.FeatureMap;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
Expand All @@ -42,10 +47,6 @@


public class FeatureControlManager {
/**
* An immutable map containing the features supported by this controller's software.
*/
private final Map<String, VersionRange> supportedFeatures;

/**
* Maps feature names to finalized version ranges.
Expand All @@ -54,18 +55,34 @@ public class FeatureControlManager {

FeatureControlManager(Map<String, VersionRange> supportedFeatures,
SnapshotRegistry snapshotRegistry) {
this.supportedFeatures = supportedFeatures;
this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
finalizedVersions.putAll(supportedFeatures);
}

ControllerResult<Map<String, ApiError>> updateFeatures(
Map<String, VersionRange> updates, Set<String> downgradeables,
Map<Integer, Map<String, VersionRange>> brokerFeatures) {
UpdateFeaturesRequestData.FeatureUpdateKeyCollection updates,
Map<Integer, Map<String, VersionRange>> brokerFeatures
) {
TreeMap<String, ApiError> results = new TreeMap<>();
List<ApiMessageAndVersion> records = new ArrayList<>();
for (Entry<String, VersionRange> entry : updates.entrySet()) {
results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
downgradeables.contains(entry.getKey()), brokerFeatures, records));

for (UpdateFeaturesRequestData.FeatureUpdateKey update : updates) {
VersionRange existingVersionRange = finalizedVersions.get(update.feature());
if (update.feature().isEmpty())
// Check that the feature name is not empty.
results.put(update.feature(),
new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty."));
else if (existingVersionRange == null)
results.put(update.feature(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"The controller does not support the given feature range."));
else
results.put(update.feature(), updateFeature(
update.feature(),
new VersionRange(existingVersionRange.min(), update.maxVersionLevel()),
update.allowDowngrade(),
brokerFeatures,
records
));
}

return ControllerResult.atomicOf(records, results);
Expand All @@ -84,7 +101,7 @@ private ApiError updateFeature(String featureName,
return new ApiError(Errors.INVALID_UPDATE_VERSION,
"The upper value for the new range cannot be less than 1.");
}
VersionRange localRange = supportedFeatures.get(featureName);
VersionRange localRange = finalizedVersions.get(featureName);
if (localRange == null || !localRange.contains(newRange)) {
return new ApiError(Errors.INVALID_UPDATE_VERSION,
"The controller does not support the given feature range.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult;
import org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResultCollection;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
Expand All @@ -54,6 +58,7 @@
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
Expand Down Expand Up @@ -83,6 +88,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
Expand Down Expand Up @@ -201,7 +207,6 @@ public Builder setMetrics(ControllerMetrics controllerMetrics) {
return this;
}

@SuppressWarnings("unchecked")
public QuorumController build() throws Exception {
if (raftClient == null) {
throw new RuntimeException("You must set a raft client.");
Expand Down Expand Up @@ -1325,6 +1330,30 @@ public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
.setProducerIdLen(result.producerIdLen()));
}

@Override
public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(UpdateFeaturesRequestData request) {
    // Take the snapshot of broker-supported feature ranges INSIDE the write
    // operation so it runs on the controller event thread. Reading
    // clusterControl.brokerRegistrations() on the caller thread (as before)
    // could race with concurrent registration changes and pass a snapshot that
    // is stale relative to the records generated by this write.
    return appendWriteEvent("updateFeatures", () -> {
        Map<Integer, Map<String, VersionRange>> brokerFeatures = new HashMap<>();
        clusterControl.brokerRegistrations().forEach((brokerId, registration) ->
            brokerFeatures.put(brokerId, registration.supportedFeatures()));
        return featureControl.updateFeatures(request.featureUpdates(), brokerFeatures);
    }).thenApply(updateErrors -> {
        // Translate the per-feature ApiError map into the wire-format response.
        UpdatableFeatureResultCollection results = new UpdatableFeatureResultCollection();
        for (Map.Entry<String, ApiError> updateError : updateErrors.entrySet()) {
            ApiError error = updateError.getValue();
            results.add(new UpdatableFeatureResult()
                .setFeature(updateError.getKey())
                .setErrorCode(error.error().code())
                .setErrorMessage(error.message()));
        }
        return new UpdateFeaturesResponseData()
            .setResults(results);
    });
}

@Override
public CompletableFuture<List<CreatePartitionsTopicResult>>
createPartitions(long deadlineNs, List<CreatePartitionsTopic> topics) {
Expand Down
Loading