Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static NodeApiVersions create() {
*/
public static NodeApiVersions create(Collection<ApiVersion> overrides) {
List<ApiVersion> apiVersions = new LinkedList<>(overrides);
for (ApiKeys apiKey : ApiKeys.enabledApis()) {
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
boolean exists = false;
for (ApiVersion apiVersion : apiVersions) {
if (apiVersion.apiKey() == apiKey.id) {
Expand Down Expand Up @@ -170,7 +170,7 @@ public String toString(boolean lineBreaks) {

// Also handle the case where some apiKey types are not specified at all in the given ApiVersions,
// which may happen when the remote is too old.
for (ApiKeys apiKey : ApiKeys.enabledApis()) {
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
if (!apiKeysText.containsKey(apiKey.id)) {
StringBuilder bld = new StringBuilder();
bld.append(apiKey.name).append("(").
Expand Down
30 changes: 15 additions & 15 deletions clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ public enum ApiKeys {
ALTER_CLIENT_QUOTAS(ApiMessageType.ALTER_CLIENT_QUOTAS, false, true),
DESCRIBE_USER_SCRAM_CREDENTIALS(ApiMessageType.DESCRIBE_USER_SCRAM_CREDENTIALS),
ALTER_USER_SCRAM_CREDENTIALS(ApiMessageType.ALTER_USER_SCRAM_CREDENTIALS, false, true),
VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, false),
BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, false),
END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, false),
DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false, false),
VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false, true),
ALTER_ISR(ApiMessageType.ALTER_ISR, true),
UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, false),
FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, false),
DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER, false, RecordBatch.MAGIC_VALUE_V0, false, true);
ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, true),
DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER);

// The generator ensures every `ApiMessageType` has a unique id
private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values())
Expand All @@ -116,8 +116,8 @@ public enum ApiKeys {
/** indicates the minimum required inter broker magic required to support the API */
public final byte minRequiredInterBrokerMagic;

/** indicates whether the API is enabled and should be exposed in ApiVersions **/
public final boolean isEnabled;
/** indicates whether this is an API which is only exposed by the KIP-500 controller **/
public final boolean isControllerOnlyApi;
Comment thread
hachikuji marked this conversation as resolved.
Outdated

/** indicates whether the API is enabled for forwarding **/
public final boolean forwardable;
Expand All @@ -139,22 +139,22 @@ public enum ApiKeys {
}

ApiKeys(ApiMessageType messageType, boolean clusterAction, byte minRequiredInterBrokerMagic, boolean forwardable) {
this(messageType, clusterAction, minRequiredInterBrokerMagic, forwardable, true);
this(messageType, clusterAction, minRequiredInterBrokerMagic, forwardable, false);
}

ApiKeys(
ApiMessageType messageType,
boolean clusterAction,
byte minRequiredInterBrokerMagic,
boolean forwardable,
boolean isEnabled
boolean isControllerOnlyApi
) {
this.messageType = messageType;
this.id = messageType.apiKey();
this.name = messageType.name;
this.clusterAction = clusterAction;
this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic;
this.isEnabled = isEnabled;
this.isControllerOnlyApi = isControllerOnlyApi;

this.requiresDelayedAllocation = forwardable || shouldRetainsBufferReference(messageType.requestSchemas());
this.forwardable = forwardable;
Expand Down Expand Up @@ -210,7 +210,7 @@ private static String toHtml() {
b.append("<th>Name</th>\n");
b.append("<th>Key</th>\n");
b.append("</tr>");
for (ApiKeys key : ApiKeys.enabledApis()) {
for (ApiKeys key : ApiKeys.brokerApis()) {
b.append("<tr>\n");
b.append("<td>");
b.append("<a href=\"#The_Messages_" + key.name + "\">" + key.name + "</a>");
Expand Down Expand Up @@ -242,9 +242,9 @@ public void visit(Type field) {
return hasBuffer.get();
}

public static List<ApiKeys> enabledApis() {
public static List<ApiKeys> brokerApis() {
return Arrays.stream(values())
.filter(api -> api.isEnabled)
.filter(api -> !api.isControllerOnlyApi)
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public static String toHtml() {
b.append("</pre>\n");
schemaToFieldTableHtml(ResponseHeaderData.SCHEMAS[i], b);
}
for (ApiKeys key : ApiKeys.enabledApis()) {
for (ApiKeys key : ApiKeys.brokerApis()) {
// Key
b.append("<h5>");
b.append("<a name=\"The_Messages_" + key.name + "\">");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private static ApiVersionsResponse createApiVersionsResponse(

public static ApiVersionCollection defaultApiKeys(final byte minMagic) {
ApiVersionCollection apiKeys = new ApiVersionCollection();
for (ApiKeys apiKey : ApiKeys.enabledApis()) {
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
apiKeys.add(ApiVersionsResponse.toApiVersion(apiKey));
}
Expand All @@ -137,7 +137,7 @@ public static ApiVersionCollection defaultApiKeys(final byte minMagic) {
public static ApiVersionCollection intersectControllerApiVersions(final byte minMagic,
final Map<ApiKeys, ApiVersion> activeControllerApiVersions) {
ApiVersionCollection apiKeys = new ApiVersionCollection();
for (ApiKeys apiKey : ApiKeys.enabledApis()) {
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
ApiVersion brokerApiVersion = toApiVersion(apiKey);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void testUnsupportedVersionsToString() {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection());
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.enabledApis()) {
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
bld.append(prefix).append(apiKey.name).
append("(").append(apiKey.id).append("): UNSUPPORTED");
prefix = ", ";
Expand Down Expand Up @@ -143,10 +143,10 @@ public void testUsableVersionLatestVersions() {
.setMaxVersion((short) 1));
NodeApiVersions versions = new NodeApiVersions(versionList);
for (ApiKeys apiKey: ApiKeys.values()) {
if (apiKey.isEnabled) {
assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
} else {
if (apiKey.isControllerOnlyApi) {
assertNull(versions.apiVersion(apiKey));
} else {
assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class ApiVersionsResponseTest {

@Test
public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() {
assertEquals(apiKeysInResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE), new HashSet<>(ApiKeys.enabledApis()));
assertEquals(apiKeysInResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE), new HashSet<>(ApiKeys.brokerApis()));
assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().supportedFeatures().isEmpty());
assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeatures().isEmpty());
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeaturesEpoch());
Expand All @@ -49,9 +49,9 @@ public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() {
@Test
public void shouldHaveCorrectDefaultApiVersionsResponse() {
Collection<ApiVersion> apiVersions = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys();
assertEquals(apiVersions.size(), ApiKeys.enabledApis().size(), "API versions for all API keys must be maintained.");
assertEquals(apiVersions.size(), ApiKeys.brokerApis().size(), "API versions for all API keys must be maintained.");

for (ApiKeys key : ApiKeys.enabledApis()) {
for (ApiKeys key : ApiKeys.brokerApis()) {
ApiVersion version = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.apiVersion(key.id);
assertNotNull(version, "Could not find ApiVersion for API " + key.name);
assertEquals(version.minVersion(), key.oldestVersion(), "Incorrect min version for Api " + key.name);
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,12 @@ object Kafka extends Logging {

private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, false)
if (config.processRoles.isEmpty) {
if (config.requiresZookeeper) {
Comment thread
abbccdda marked this conversation as resolved.
Outdated
new KafkaServer(
config,
Time.SYSTEM,
threadNamePrefix = None
threadNamePrefix = None,
enableForwarding = false
)
} else {
new KafkaRaftServer(
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ object RequestChannel extends Logging {
val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
}

class Metrics(allowDisabledApis: Boolean = false) {
class Metrics(allowControllerOnlyApis: Boolean = false) {

private val metricsMap = mutable.Map[String, RequestMetrics]()

(ApiKeys.values.toSeq.filter(_.isEnabled || allowDisabledApis).map(_.name) ++
(ApiKeys.values.toSeq.filter(!_.isControllerOnlyApi || allowControllerOnlyApis).map(_.name) ++
Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName)).foreach { name =>
metricsMap.put(name, new RequestMetrics(name))
}
Expand Down Expand Up @@ -337,9 +337,9 @@ object RequestChannel extends Logging {
class RequestChannel(val queueSize: Int,
val metricNamePrefix: String,
time: Time,
allowDisabledApis: Boolean = false) extends KafkaMetricsGroup {
allowControllerOnlyApis: Boolean = false) extends KafkaMetricsGroup {
import RequestChannel._
val metrics = new RequestChannel.Metrics(allowDisabledApis)
val metrics = new RequestChannel.Metrics(allowControllerOnlyApis)
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]()
val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
Expand Down
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class SocketServer(val config: KafkaConfig,
val metrics: Metrics,
val time: Time,
val credentialProvider: CredentialProvider,
val allowDisabledApis: Boolean = false)
val allowControllerOnlyApis: Boolean = false)
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {

private val maxQueuedRequests = config.queuedMaxRequests
Expand All @@ -93,12 +93,12 @@ class SocketServer(val config: KafkaConfig,
// data-plane
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, allowDisabledApis)
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, allowControllerOnlyApis)
// control-plane
private var controlPlaneProcessorOpt : Option[Processor] = None
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
new RequestChannel(20, ControlPlaneMetricPrefix, time, allowDisabledApis))
new RequestChannel(20, ControlPlaneMetricPrefix, time, allowControllerOnlyApis))

private var nextProcessorId = 0
val connectionQuotas = new ConnectionQuotas(config, time, metrics)
Expand Down Expand Up @@ -429,7 +429,7 @@ class SocketServer(val config: KafkaConfig,
memoryPool,
logContext,
isPrivilegedListener = isPrivilegedListener,
allowDisabledApis = allowDisabledApis
allowControllerOnlyApis = allowControllerOnlyApis
)
}

Expand Down Expand Up @@ -790,7 +790,7 @@ private[kafka] class Processor(val id: Int,
logContext: LogContext,
connectionQueueSize: Int = ConnectionQueueSize,
isPrivilegedListener: Boolean = false,
allowDisabledApis: Boolean = false) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
allowControllerOnlyApis: Boolean = false) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

private object ConnectionId {
def fromString(s: String): Option[ConnectionId] = s.split("-") match {
Expand Down Expand Up @@ -981,10 +981,10 @@ private[kafka] class Processor(val id: Int,

protected def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
val header = RequestHeader.parse(buffer)
if (header.apiKey.isEnabled || allowDisabledApis) {
if (!header.apiKey.isControllerOnlyApi || allowControllerOnlyApis) {
header
} else {
throw new InvalidRequestException("Received request for disabled api key " + header.apiKey)
throw new InvalidRequestException("Received request for KIP-500 controller-only api key " + header.apiKey)
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ForwardingManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ trait ForwardingManager {
responseCallback: Option[AbstractResponse] => Unit
): Unit

def controllerApiVersions(): Option[NodeApiVersions]
def controllerApiVersions: Option[NodeApiVersions]

def start(): Unit = {}

Expand Down Expand Up @@ -140,7 +140,7 @@ class ForwardingManagerImpl(
channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler)
}

override def controllerApiVersions(): Option[NodeApiVersions] =
override def controllerApiVersions: Option[NodeApiVersions] =
channelManager.controllerApiVersions()

private def parseResponse(
Expand Down
28 changes: 12 additions & 16 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val groupCoordinator: GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
val controller: KafkaController,
val forwardingManager: ForwardingManager,
val forwardingManager: Option[ForwardingManager],
val zkClient: KafkaZkClient,
val brokerId: Int,
val config: KafkaConfig,
Expand Down Expand Up @@ -131,7 +131,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}

private def isForwardingEnabled(request: RequestChannel.Request): Boolean = {
config.metadataQuorumEnabled && request.context.principalSerde.isPresent
forwardingManager.isDefined && request.context.principalSerde.isPresent
}

private def maybeForwardToController(
Expand All @@ -149,12 +149,12 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

if (!request.isForwarded && !controller.isActive && isForwardingEnabled(request)) {
forwardingManager.forwardRequest(request, responseCallback)
} else {
// When the KIP-500 mode is off or the principal serde is undefined, forwarding is not supported,
// therefore requests are handled directly.
handler(request)
forwardingManager match {
case Some(mgr) if !request.isForwarded && !controller.isActive =>
mgr.forwardRequest(request, responseCallback)

case _ =>
handler(request)
}
}

Expand Down Expand Up @@ -1742,11 +1742,7 @@ class KafkaApis(val requestChannel: RequestChannel,
else {
val supportedFeatures = brokerFeatures.supportedFeatures
val finalizedFeaturesOpt = finalizedFeatureCache.get
val controllerApiVersions = if (isForwardingEnabled(request)) {
forwardingManager.controllerApiVersions()
} else {
None
}
val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)

val apiVersionsResponse =
finalizedFeaturesOpt match {
Expand Down Expand Up @@ -3251,9 +3247,9 @@ class KafkaApis(val requestChannel: RequestChannel,

// If forwarding is not yet enabled or this request has been received on an invalid endpoint,
// then we treat the request as unparsable and close the connection.
if (!config.metadataQuorumEnabled) {
if (!isForwardingEnabled(request)) {
info(s"Closing connection ${request.context.connectionId} because it sent an `Envelope` " +
s"request, which is not accepted without enabling the internal config ${KafkaConfig.EnableMetadataQuorumProp}")
"request even though forwarding has not been enabled")
requestHelper.closeConnection(request, Collections.emptyMap())
return
} else if (!request.context.fromPrivilegedListener) {
Expand All @@ -3279,7 +3275,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer)

val forwardedApi = forwardedRequestHeader.apiKey
if (!forwardedApi.forwardable || !forwardedApi.isEnabled) {
if (!forwardedApi.forwardable) {
throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding")
}

Expand Down
Loading