Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
<subpackage name="record">
<allow pkg="net.jpountz" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.errors" />
</subpackage>

<subpackage name="requests">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* The version is not supported
*/
public class UnsupportedVersionException extends ApiException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is added in #986 as well. The later patch to go in will have to rebase, or we can extract out the common code as a separate PR. I am fine with either way. FYI @becketqin.


private static final long serialVersionUID = 1L;

public UnsupportedVersionException(String message) {
super(message);
}

public UnsupportedVersionException(String message, Throwable throwable) {
super(message, throwable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ public enum Errors {
new ApiException("The committing offset data size is not valid")),
AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized.")),
REBALANCE_IN_PROGRESS(30,
new ApiException("The group is rebalancing, so a rejoin is needed."));
new ApiException("The group is rebalancing, so a rejoin is needed.")),
UNSUPPORTED_VERSION(31,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that 31 and 32 are already taken, you might want to rebase.

new UnsupportedVersionException("The version is not supported."));

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.nio.ByteBuffer;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;

Expand Down Expand Up @@ -68,4 +69,10 @@ public static Struct parseResponse(int apiKey, int version, ByteBuffer buffer) {
return (Struct) responseSchema(apiKey, version).read(buffer);
}

public static void validateApiVersion(int apiKey, int versionId) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to add a min version check as well, based on #986 changes.

if (versionId < 0 || latestVersion(apiKey) < versionId)
throw new UnsupportedVersionException("The version " + versionId + " for " + ApiKeys.forId(apiKey).name +
" is higher than highest supported version " + ProtoUtils.latestVersion(apiKey));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.nio.ByteBuffer;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Crc32;
import org.apache.kafka.common.utils.Utils;

Expand Down Expand Up @@ -222,11 +223,15 @@ public boolean isValid() {
* Throw an InvalidRecordException if isValid is false for this record
*/
public void ensureValid() {
// Check CRC before checking the magic byte.
if (!isValid())
throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
+ ", computed crc = "
+ computeChecksum()
+ ")");
if (magic() > Record.CURRENT_MAGIC_VALUE)
throw new UnsupportedVersionException("Magic byte of record is " + magic() +
", which is higher than supported magic byte " + Record.CURRENT_MAGIC_VALUE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;

import java.nio.ByteBuffer;
Expand All @@ -36,6 +37,7 @@ public AbstractRequest(Struct struct) {
* Factory method for getting a request object based on ApiKey ID and a buffer
*/
public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
ProtoUtils.validateApiVersion(requestId, versionId);
switch (ApiKeys.forId(requestId)) {
case PRODUCE:
return ProduceRequest.parse(buffer, versionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collection;
import java.util.List;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -81,6 +82,14 @@ public void testChecksum() {
}
}

@Test (expected = UnsupportedVersionException.class)
public void testInvalidMagicByte() {
Record copy = copyOf(record);
copy.buffer().put(Record.MAGIC_OFFSET, new Integer(Record.CURRENT_MAGIC_VALUE + 1).byteValue());
copy.buffer().putInt(Record.CRC_OFFSET, (int) (copy.computeChecksum() & 0xffffffffL));
new Record(copy.buffer()).ensureValid();
}

private Record copyOf(Record record) {
ByteBuffer buffer = ByteBuffer.allocate(record.size());
record.buffer().put(buffer);
Expand Down
7 changes: 5 additions & 2 deletions core/src/main/scala/kafka/common/ErrorMapping.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kafka.common
import java.nio.ByteBuffer

import kafka.message.InvalidMessageException
import org.apache.kafka.common.errors.UnsupportedVersionException

import scala.Predef._

Expand Down Expand Up @@ -59,7 +60,8 @@ object ErrorMapping {
// 26: INVALID_SESSION_TIMEOUT
// 27: COMMITTING_PARTITIONS_NOT_ASSIGNED
// 28: INVALID_COMMIT_OFFSET_SIZE
val AuthorizationCode: Short = 29;
val AuthorizationCode: Short = 29
val UnsupportedVersionCode: Short = 30

private val exceptionToCode =
Map[Class[Throwable], Short](
Expand All @@ -82,7 +84,8 @@ object ErrorMapping {
classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode,
classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode,
classOf[AuthorizationException].asInstanceOf[Class[Throwable]] -> AuthorizationCode
classOf[AuthorizationException].asInstanceOf[Class[Throwable]] -> AuthorizationCode,
classOf[UnsupportedVersionException].asInstanceOf[Class[Throwable]] -> UnsupportedVersionCode
).withDefaultValue(UnknownCode)

/* invert the mapping */
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/scala/kafka/message/Message.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package kafka.message

import java.nio._
import org.apache.kafka.common.errors.UnsupportedVersionException

import scala.math._
import kafka.utils._
import org.apache.kafka.common.utils.Utils
Expand Down Expand Up @@ -83,7 +85,7 @@ object Message {
class Message(val buffer: ByteBuffer) {

import kafka.message.Message._

/**
* A constructor to create a Message
* @param bytes The payload of the message
Expand Down Expand Up @@ -165,6 +167,9 @@ class Message(val buffer: ByteBuffer) {
def ensureValid() {
if(!isValid)
throw new InvalidMessageException("Message is corrupt (stored crc = " + checksum + ", computed crc = " + computeChecksum() + ")")
if (magic > Message.CurrentMagicValue)
throw new UnsupportedVersionException("Magic byte of message is " + magic +
", which is higher than supported magic byte " + Message.CurrentMagicValue)
}

/**
Expand Down
13 changes: 11 additions & 2 deletions core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import kafka.common.TopicAndPartition
import kafka.message.ByteBufferMessageSet
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.{Logging, SystemTime}
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.network.Send
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader}
Expand Down Expand Up @@ -69,8 +70,16 @@ object RequestChannel extends Logging {
} else
null
val body: AbstractRequest =
if (requestObj == null)
AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
if (requestObj == null) {
try {
// If the request version is higher than supported version, UnsupportedVersionException might be thrown,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we add check on min version as well based on #986, we should reflect that in the comment here as well.

// we ignore it here and let KafkaApis to handle it.
AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
} catch {
case e : UnsupportedVersionException =>
null
}
}
else
null

Expand Down
52 changes: 39 additions & 13 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,39 @@

package kafka.server

import kafka.message.MessageSet
import kafka.security.auth.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.TopicPartition
import kafka.api._
import kafka.admin.AdminUtils
import kafka.api._
import kafka.common._
import kafka.controller.KafkaController
import kafka.coordinator.ConsumerCoordinator
import kafka.log._
import kafka.message.MessageSet
import kafka.network.RequestChannel.{Response, Session}
import kafka.network._
import kafka.network.RequestChannel.{Session, Response}
import org.apache.kafka.common.requests.{JoinGroupRequest, JoinGroupResponse, HeartbeatRequest, HeartbeatResponse, ResponseHeader, ResponseSend}
import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging}
import scala.collection._
import kafka.security.auth.{Authorizer, ClusterAction, ConsumerGroup, Create, Describe, Operation, Read, Resource, Topic, Write}
import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import kafka.security.auth.{Authorizer, Read, Write, Create, ClusterAction, Describe, Resource, Topic, Operation, ConsumerGroup}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ProtoUtils, SecurityProtocol}
import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, ResponseHeader, ResponseSend}

import scala.collection._

object KafkaApis {
//TODO: this method should only use request.header after all the requests are migrated to use client java request class.
// For the requests using old scala class, we need to pass in the API version explicitly because the Request.header will
// be null. For requests using new java class. the API version will be None.
def validateRequestVersion(request: RequestChannel.Request, apiVersion: Option[Short]) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this all be handled in RequestChannel or before the big match statement in KafkaApis so we are guaranteed to have all messages checked?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it would be better if we can do it that way.

val requestApiVersion = Option(request.header) match {
//requests using new java classes.
case Some(header) => header.apiVersion()
//requests using old scala classes.
case None => apiVersion.getOrElse(throw new IllegalArgumentException("apiVersion should be defined if request.header is null"))
}
ProtoUtils.validateApiVersion(request.requestId, requestApiVersion)
}
}

/**
* Logic to handle the various Kafka requests
Expand Down Expand Up @@ -103,6 +117,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
// stop serving data to clients for the topic being deleted
val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
KafkaApis.validateRequestVersion(request, Some(leaderAndIsrRequest.versionId))

authorizeClusterAction(request)

Expand Down Expand Up @@ -137,6 +152,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
// stop serving data to clients for the topic being deleted
val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
KafkaApis.validateRequestVersion(request, Some(stopReplicaRequest.versionId))

authorizeClusterAction(request)

Expand All @@ -148,6 +164,7 @@ class KafkaApis(val requestChannel: RequestChannel,

def handleUpdateMetadataRequest(request: RequestChannel.Request) {
val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
KafkaApis.validateRequestVersion(request, Some(updateMetadataRequest.versionId))

authorizeClusterAction(request)

Expand All @@ -162,6 +179,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
// stop serving data to clients for the topic being deleted
val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
KafkaApis.validateRequestVersion(request, Some(controlledShutdownRequest.versionId))

authorizeClusterAction(request)

Expand All @@ -177,6 +195,7 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleOffsetCommitRequest(request: RequestChannel.Request) {
val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
KafkaApis.validateRequestVersion(request, Some(offsetCommitRequest.versionId))

// filter non-exist topics
val invalidRequestsInfo = offsetCommitRequest.requestInfo.filter { case (topicAndPartition, offsetMetadata) =>
Expand Down Expand Up @@ -282,6 +301,7 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleProducerRequest(request: RequestChannel.Request) {
val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
KafkaApis.validateRequestVersion(request, Some(produceRequest.versionId))
val numBytesAppended = produceRequest.sizeInBytes

val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.data.partition {
Expand Down Expand Up @@ -359,6 +379,7 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
KafkaApis.validateRequestVersion(request, Some(fetchRequest.versionId))

val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition {
case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
Expand Down Expand Up @@ -414,6 +435,7 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleOffsetRequest(request: RequestChannel.Request) {
val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
KafkaApis.validateRequestVersion(request, Some(offsetRequest.versionId))

val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.requestInfo.partition {
case (topicAndPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic))
Expand Down Expand Up @@ -566,6 +588,7 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
KafkaApis.validateRequestVersion(request, Some(metadataRequest.versionId))

//if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized
val topics = if (metadataRequest.topics.isEmpty) {
Expand Down Expand Up @@ -606,6 +629,7 @@ class KafkaApis(val requestChannel: RequestChannel,

def handleOffsetFetchRequest(request: RequestChannel.Request) {
val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
KafkaApis.validateRequestVersion(request, Some(offsetFetchRequest.versionId))

val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition =>
authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic)) &&
Expand Down Expand Up @@ -660,6 +684,7 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleConsumerMetadataRequest(request: RequestChannel.Request) {
val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest]
KafkaApis.validateRequestVersion(request, Some(consumerMetadataRequest.versionId))

if (!authorize(request.session, Read, new Resource(ConsumerGroup, consumerMetadataRequest.group))) {
val response = ConsumerMetadataResponse(None, ErrorMapping.AuthorizationCode, consumerMetadataRequest.correlationId)
Expand All @@ -686,8 +711,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}

def handleJoinGroupRequest(request: RequestChannel.Request) {
import JavaConversions._

import scala.collection.JavaConversions._
KafkaApis.validateRequestVersion(request, None)
val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
val respHeader = new ResponseHeader(request.header.correlationId)

Expand Down Expand Up @@ -723,6 +748,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}

def handleHeartbeatRequest(request: RequestChannel.Request) {
KafkaApis.validateRequestVersion(request, None)
val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest]
val respHeader = new ResponseHeader(request.header.correlationId)

Expand Down
Loading