Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ public static UpdateFeaturesResponse createWithErrors(ApiError topLevelError, Ma
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(topLevelError.error().code())
.setErrorMessage(topLevelError.message())
.setResults(results)
.setThrottleTimeMs(throttleTimeMs);
.setResults(results);
return new UpdateFeaturesResponse(responseData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{
"apiKey": 57,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["zkBroker", "broker", "controller"],
"name": "UpdateFeaturesRequest",
"validVersions": "0",
"flexibleVersions": "0+",
Expand Down
25 changes: 21 additions & 4 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package kafka.server
import java.util
import java.util.Collections
import java.util.Map.Entry
import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}
import java.util.concurrent.{CompletableFuture, ExecutionException}
import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}

import kafka.network.RequestChannel
import kafka.raft.RaftManager
Expand All @@ -36,10 +36,11 @@ import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message.{CreateTopicsRequestData, _}
import org.apache.kafka.common.message._
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
Expand Down Expand Up @@ -108,6 +109,7 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request)
case ApiKeys.ELECT_LEADERS => handleElectLeaders(request)
case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
}
} catch {
Expand Down Expand Up @@ -183,7 +185,7 @@ class ControllerApis(val requestChannel: RequestChannel,
throw new TopicDeletionDisabledException()
}
}
val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS);
val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS)
// The first step is to load up the names and IDs that have been provided by the
// request. This is a bit messy because we support multiple ways of referring to
// topics (both by name and by id) and because we need to check for duplicates or
Expand Down Expand Up @@ -704,7 +706,7 @@ class ControllerApis(val requestChannel: RequestChannel,
hasClusterAuth: Boolean,
getCreatableTopics: Iterable[String] => Set[String])
: CompletableFuture[util.List[CreatePartitionsTopicResult]] = {
val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS);
val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS)
val responses = new util.ArrayList[CreatePartitionsTopicResult]()
val duplicateTopicNames = new util.HashSet[String]()
val topicNames = new util.HashSet[String]()
Expand Down Expand Up @@ -775,4 +777,19 @@ class ControllerApis(val requestChannel: RequestChannel,
}
})
}

/**
 * Handle an UpdateFeatures request on the controller.
 *
 * Requires ALTER on CLUSTER (a deny verdict surfaces as ClusterAuthorizationException).
 * On success, the controller's response data is wrapped in an UpdateFeaturesResponse
 * with the computed throttle time; on failure, the error is routed through the
 * standard request error path.
 */
def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
  val featuresRequest = request.body[UpdateFeaturesRequest]
  authHelper.authorizeClusterOperation(request, ALTER)
  controller.updateFeatures(featuresRequest.data).whenComplete { (responseData, throwable) =>
    if (throwable == null) {
      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
        new UpdateFeaturesResponse(responseData.setThrottleTimeMs(throttleTimeMs)))
    } else {
      requestHelper.handleError(request, throwable)
    }
  }
}
}
7 changes: 7 additions & 0 deletions core/src/test/java/kafka/test/MockController.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
Expand Down Expand Up @@ -308,6 +310,11 @@ public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(Al
throw new UnsupportedOperationException();
}

// Feature updates are not supported by this mock controller; any invocation
// fails unconditionally. Callers that should be rejected earlier (e.g. by
// authorization) must never reach this method.
@Override
public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(UpdateFeaturesRequestData request) {
throw new UnsupportedOperationException();
}

@Override
synchronized public CompletableFuture<List<CreatePartitionsTopicResult>>
createPartitions(long deadlineNs, List<CreatePartitionsTopic> topicList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,14 @@ class ControllerApisTest {
new ListPartitionReassignmentsRequestData()).build())))
}

@Test
def testUnauthorizedHandleUpdateFeatures(): Unit = {
  // With a deny-all authorizer, the handler must reject the request with
  // ClusterAuthorizationException before it ever reaches the controller.
  val controllerApis = createControllerApis(
    Some(createDenyAllAuthorizer()), new MockController.Builder().build())
  val updateFeaturesRequest = buildRequest(
    new UpdateFeaturesRequest.Builder(new UpdateFeaturesRequestData()).build())
  assertThrows(classOf[ClusterAuthorizationException],
    () => controllerApis.handleUpdateFeatures(updateFeaturesRequest))
}

@Test
def testCreateTopics(): Unit = {
val controller = new MockController.Builder().build()
Expand Down
11 changes: 11 additions & 0 deletions metadata/src/main/java/org/apache/kafka/controller/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
Expand Down Expand Up @@ -234,6 +236,15 @@ CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
AllocateProducerIdsRequestData request
);

/**
 * Apply a set of feature level changes to the cluster.
 *
 * @param request   The UpdateFeatures request data, containing one update entry
 *                  per feature whose finalized level should change.
 * @return          A future which yields the per-feature results of the update.
 */
CompletableFuture<UpdateFeaturesResponseData> updateFeatures(
UpdateFeaturesRequestData request
);

/**
* Begin writing a controller snapshot. If there was already an ongoing snapshot, it
* simply returns information about that snapshot rather than starting a new one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public boolean equals(Object o) {
if (o == null || (!o.getClass().equals(getClass()))) {
return false;
}
ControllerResult other = (ControllerResult) o;
@SuppressWarnings("unchecked")
ControllerResult<T> other = (ControllerResult<T>) o;
return records.equals(other.records) &&
Objects.equals(response, other.response) &&
Objects.equals(isAtomic, other.isAtomic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@

package org.apache.kafka.controller;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;

import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.FeatureMap;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.TreeMap;

import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;


Expand All @@ -59,13 +59,27 @@ public class FeatureControlManager {
}

ControllerResult<Map<String, ApiError>> updateFeatures(
Map<String, VersionRange> updates, Set<String> downgradeables,
Map<Integer, Map<String, VersionRange>> brokerFeatures) {
UpdateFeaturesRequestData.FeatureUpdateKeyCollection updates,
Map<Integer, Map<String, VersionRange>> brokerFeatures
) {
TreeMap<String, ApiError> results = new TreeMap<>();
List<ApiMessageAndVersion> records = new ArrayList<>();
for (Entry<String, VersionRange> entry : updates.entrySet()) {
results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
downgradeables.contains(entry.getKey()), brokerFeatures, records));

for (UpdateFeaturesRequestData.FeatureUpdateKey update : updates) {
VersionRange existingVersionRange = finalizedVersions.get(update.feature());
short minVersion = existingVersionRange == null ? 1 : existingVersionRange.min();
if (update.feature().isEmpty())
// Check that the feature name is not empty.
results.put(update.feature(),
new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty."));
else
results.put(update.feature(), updateFeature(
update.feature(),
new VersionRange(minVersion, update.maxVersionLevel()),
update.allowDowngrade(),
brokerFeatures,
records
));
}

return ControllerResult.atomicOf(records, results);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
Expand Down Expand Up @@ -86,6 +88,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
Expand Down Expand Up @@ -223,7 +226,6 @@ public Builder setConfigurationValidator(ConfigurationValidator configurationVal
return this;
}

@SuppressWarnings("unchecked")
public QuorumController build() throws Exception {
if (raftClient == null) {
throw new RuntimeException("You must set a raft client.");
Expand Down Expand Up @@ -542,13 +544,13 @@ ReplicationControlManager replicationControl() {

// VisibleForTesting
<T> CompletableFuture<T> appendReadEvent(String name, Supplier<T> handler) {
ControllerReadEvent<T> event = new ControllerReadEvent<T>(name, handler);
ControllerReadEvent<T> event = new ControllerReadEvent<>(name, handler);
queue.append(event);
return event.future();
}

<T> CompletableFuture<T> appendReadEvent(String name, long deadlineNs, Supplier<T> handler) {
ControllerReadEvent<T> event = new ControllerReadEvent<T>(name, handler);
ControllerReadEvent<T> event = new ControllerReadEvent<>(name, handler);
queue.appendWithDeadline(deadlineNs, event);
return event.future();
}
Expand Down Expand Up @@ -928,7 +930,6 @@ private void cancelMaybeFenceReplicas() {
queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
}

@SuppressWarnings("unchecked")
private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
try {
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
Expand Down Expand Up @@ -1392,6 +1393,29 @@ public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
.setProducerIdLen(result.producerIdLen()));
}

/**
 * Apply the requested feature level changes through the controller event queue.
 *
 * @param request   The UpdateFeatures request data, one entry per feature.
 * @return          A future yielding the response with per-feature results.
 */
@Override
public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(UpdateFeaturesRequestData request) {
    return appendWriteEvent("updateFeatures", () -> {
        // Snapshot each registered broker's supported feature ranges. This must
        // run inside the event (on the controller thread): elsewhere in this
        // class, clusterControl and the other managers are only touched from
        // appendReadEvent/appendWriteEvent, not from the request thread.
        Map<Integer, Map<String, VersionRange>> brokerFeatures = new HashMap<>();
        clusterControl.brokerRegistrations().forEach(
            (brokerId, registration) -> brokerFeatures.put(brokerId, registration.supportedFeatures()));
        return featureControl.updateFeatures(request.featureUpdates(), brokerFeatures);
    }).thenApply(updateErrors -> {
        // Translate the per-feature ApiError map into the wire-format response.
        UpdateFeaturesResponseData.UpdatableFeatureResultCollection results =
            new UpdateFeaturesResponseData.UpdatableFeatureResultCollection();
        for (Map.Entry<String, ApiError> entry : updateErrors.entrySet()) {
            ApiError error = entry.getValue();
            results.add(new UpdateFeaturesResponseData.UpdatableFeatureResult()
                .setFeature(entry.getKey())
                .setErrorCode(error.error().code())
                .setErrorMessage(error.message()));
        }
        return new UpdateFeaturesResponseData().setResults(results);
    });
}

@Override
public CompletableFuture<List<CreatePartitionsTopicResult>>
createPartitions(long deadlineNs, List<CreatePartitionsTopic> topics) {
Expand Down
Loading