Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/kafka-run-class.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ should_include_file() {
base_dir=$(dirname $0)/..

if [ -z "$SCALA_VERSION" ]; then
SCALA_VERSION=2.13.3
SCALA_VERSION=2.13.4
if [[ -f "$base_dir/gradle.properties" ]]; then
SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
fi
Expand Down
2 changes: 1 addition & 1 deletion bin/windows/kafka-run-class.bat
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ set BASE_DIR=%CD%
popd

IF ["%SCALA_VERSION%"] EQU [""] (
set SCALA_VERSION=2.13.3
set SCALA_VERSION=2.13.4
)

IF ["%SCALA_BINARY_VERSION%"] EQU [""] (
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ subprojects {
scalaCompileOptions.additionalParameters += inlineFrom

if (versions.baseScala != '2.12') {
scalaCompileOptions.additionalParameters += ["-opt-warnings"]
scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"]
// Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings
scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ object ConfigCommand extends Config {
describeResourceConfig(adminClient, entityTypes.head, entityNames.headOption, describeAll)
case ConfigType.User | ConfigType.Client =>
describeClientQuotaAndUserScramCredentialConfigs(adminClient, entityTypes, entityNames)
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
}
}

Expand All @@ -491,6 +492,7 @@ object ConfigCommand extends Config {
adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq
case ConfigType.Broker | BrokerLoggerConfigType =>
adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
})

entities.foreach { entity =>
Expand Down Expand Up @@ -530,6 +532,7 @@ object ConfigCommand extends Config {
if (!entityName.isEmpty)
validateBrokerId()
(ConfigResource.Type.BROKER_LOGGER, None)
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
}

val configSourceFilter = if (describeAll)
Expand Down
19 changes: 9 additions & 10 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ import org.apache.kafka.common.requests.ListOffsetResponse
import org.apache.kafka.common.ConsumerGroupState
import joptsimple.OptionException

import scala.annotation.nowarn

object ConsumerGroupCommand extends Logging {

def main(args: Array[String]): Unit = {
Expand Down Expand Up @@ -151,22 +149,24 @@ object ConsumerGroupCommand extends Logging {
private[admin] case class CsvUtils() {
val mapper = new CsvMapper with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
def readerFor[T <: CsvRecord: ClassTag] = {
def readerFor[T <: CsvRecord : ClassTag] = {
val schema = getSchema[T]
val clazz = implicitly[ClassTag[T]].runtimeClass
mapper.readerFor(clazz).`with`(schema)
}
def writerFor[T <: CsvRecord: ClassTag] = {
def writerFor[T <: CsvRecord : ClassTag] = {
val schema = getSchema[T]
val clazz = implicitly[ClassTag[T]].runtimeClass
mapper.writerFor(clazz).`with`(schema)
}
private def getSchema[T <: CsvRecord: ClassTag] = {
private def getSchema[T <: CsvRecord : ClassTag] = {
val clazz = implicitly[ClassTag[T]].runtimeClass
val fields = clazz match {
case _ if classOf[CsvRecordWithGroup] == clazz => CsvRecordWithGroup.fields
case _ if classOf[CsvRecordNoGroup] == clazz => CsvRecordNoGroup.fields
}

val fields =
if (classOf[CsvRecordWithGroup] == clazz) CsvRecordWithGroup.fields
else if (classOf[CsvRecordNoGroup] == clazz) CsvRecordNoGroup.fields
else throw new IllegalStateException(s"Unhandled class $clazz")

val schema = mapper.schemaFor(clazz).sortedBy(fields: _*)
schema
}
Expand Down Expand Up @@ -555,7 +555,6 @@ object ConsumerGroupCommand extends Logging {
/**
* Returns the state of the specified consumer group and partition assignment states
*/
@nowarn("cat=optimizer")
def collectGroupOffsets(groupId: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
collectGroupsOffsets(List(groupId)).getOrElse(groupId, (None, None))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,7 @@ object ReassignPartitionsCommand extends Logging {

private def topicDescriptionFutureToState(partition: Int,
future: KafkaFuture[TopicDescription],
targetReplicas: Seq[Int])
: PartitionReassignmentState = {
targetReplicas: Seq[Int]): PartitionReassignmentState = {
try {
val topicDescription = future.get()
if (topicDescription.partitions().size() < partition) {
Expand All @@ -494,7 +493,8 @@ object ReassignPartitionsCommand extends Logging {
case t: ExecutionException =>
t.getCause match {
case _: UnknownTopicOrPartitionException =>
new PartitionReassignmentState(Seq(), targetReplicas, true)
PartitionReassignmentState(Seq(), targetReplicas, true)
case e => throw e
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ class GroupMetadataManager(brokerId: Int,

readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0

val memRecords = fetchDataInfo.records match {
val memRecords = (fetchDataInfo.records: @unchecked) match {
case records: MemoryRecords => records
case fileRecords: FileRecords =>
val sizeInBytes = fileRecords.sizeInBytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ class TransactionStateManager(brokerId: Int,

readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0

val memRecords = fetchDataInfo.records match {
val memRecords = (fetchDataInfo.records: @unchecked) match {
case records: MemoryRecords => records
case fileRecords: FileRecords =>
val sizeInBytes = fileRecords.sizeInBytes
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2737,4 +2737,4 @@ case object LogDeletion extends SegmentDeletionReason {
override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
log.info(s"Deleting segments as the log has been deleted: ${toDelete.mkString(",")}")
}
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ object RequestChannel extends Logging {

}

abstract class Response(val request: Request) {
sealed abstract class Response(val request: Request) {

def processor: Int = request.processor

Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ object KafkaNetworkChannel {
new EndQuorumEpochResponse(endEpochResponse)
case fetchResponse: FetchResponseData =>
new FetchResponse(fetchResponse)
case _ =>
throw new IllegalArgumentException(s"Unexpected type for responseData: $responseData")
}
}

Expand All @@ -61,6 +63,8 @@ object KafkaNetworkChannel {
new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
override def build(version: Short): FetchRequest = new FetchRequest(fetchRequest, version)
}
case _ =>
throw new IllegalArgumentException(s"Unexpected type for requestData: $requestData")
}
}

Expand All @@ -70,6 +74,7 @@ object KafkaNetworkChannel {
case beginEpochResponse: BeginQuorumEpochResponse => beginEpochResponse.data
case endEpochResponse: EndQuorumEpochResponse => endEpochResponse.data
case fetchResponse: FetchResponse[_] => fetchResponse.data
case _ => throw new IllegalArgumentException(s"Unexpected type for response: $response")
}
}

Expand All @@ -79,6 +84,7 @@ object KafkaNetworkChannel {
case beginEpochRequest: BeginQuorumEpochRequest => beginEpochRequest.data
case endEpochRequest: EndQuorumEpochRequest => endEpochRequest.data
case fetchRequest: FetchRequest => fetchRequest.data
case _ => throw new IllegalArgumentException(s"Unexpected type for request: $request")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,9 @@ class AclAuthorizer extends Authorizer with Logging {
}.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
}

@nowarn("cat=optimizer")
override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
val aclBindings = new util.ArrayList[AclBinding]()
// Using `forKeyValue` triggers a scalac bug related to suppression of optimizer warnings, we
// should change this code once that's fixed
aclCache.foreach { case (resource, versionedAcls) =>
aclCache.forKeyValue { case (resource, versionedAcls) =>
versionedAcls.acls.foreach { acl =>
val binding = new AclBinding(resource, acl.ace)
if (filter.matches(binding))
Expand Down Expand Up @@ -542,7 +539,6 @@ class AclAuthorizer extends Authorizer with Logging {
}
}

@nowarn("cat=optimizer")
private def getAclsFromCache(resource: ResourcePattern): VersionedAcls = {
aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ abstract class AbstractFetcherThread(name: String,
}

protected def toMemoryRecords(records: Records): MemoryRecords = {
records match {
(records: @unchecked) match {
case r: MemoryRecords => r
case r: FileRecords =>
val buffer = ByteBuffer.allocate(r.sizeInBytes)
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ object ClientQuotaManager {
val DefaultUserQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None)
val DefaultUserClientIdQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity))

case class UserEntity(sanitizedUser: String) extends ClientQuotaEntity.ConfigEntity {
sealed trait BaseUserEntity extends ClientQuotaEntity.ConfigEntity

case class UserEntity(sanitizedUser: String) extends BaseUserEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.USER
override def name: String = Sanitizer.desanitize(sanitizedUser)
override def toString: String = s"user $sanitizedUser"
Expand All @@ -92,7 +94,7 @@ object ClientQuotaManager {
override def toString: String = s"client-id $clientId"
}

case object DefaultUserEntity extends ClientQuotaEntity.ConfigEntity {
case object DefaultUserEntity extends BaseUserEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER
override def name: String = ConfigEntityName.Default
override def toString: String = "default user"
Expand All @@ -104,7 +106,7 @@ object ClientQuotaManager {
override def toString: String = "default client-id"
}

case class KafkaQuotaEntity(userEntity: Option[ClientQuotaEntity.ConfigEntity],
case class KafkaQuotaEntity(userEntity: Option[BaseUserEntity],
clientIdEntity: Option[ClientQuotaEntity.ConfigEntity]) extends ClientQuotaEntity {
override def configEntities: util.List[ClientQuotaEntity.ConfigEntity] =
(userEntity.toList ++ clientIdEntity.toList).asJava
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,10 @@ class ZooKeeperClient(connectString: String,
def responseMetadata(sendTimeMs: Long) = new ResponseMetadata(sendTimeMs, receivedTimeMs = time.hiResClockMs())

val sendTimeMs = time.hiResClockMs()
request match {

// Cast to AsyncRequest to workaround a scalac bug that results in an false exhaustiveness warning
// with -Xlint:strict-unsealed-patmat
(request: AsyncRequest) match {
case ExistsRequest(path, ctx) =>
zooKeeper.exists(path, shouldWatch(request), new StatCallback {
def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,13 @@ object SaslPlainSslEndToEndAuthorizationTest {
class TestPrincipalBuilder extends KafkaPrincipalBuilder {

override def build(context: AuthenticationContext): KafkaPrincipal = {
context match {
case ctx: SaslAuthenticationContext =>
ctx.server.getAuthorizationID match {
case KafkaPlainAdmin =>
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin")
case KafkaPlainUser =>
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
case _ =>
KafkaPrincipal.ANONYMOUS
}
context.asInstanceOf[SaslAuthenticationContext].server.getAuthorizationID match {
case KafkaPlainAdmin =>
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin")
case KafkaPlainUser =>
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
case _ =>
KafkaPrincipal.ANONYMOUS
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,13 @@ object SslEndToEndAuthorizationTest {
// Use full DN as client principal to test special characters in principal
// Use field from DN as server principal to test custom PrincipalBuilder
override def build(context: AuthenticationContext): KafkaPrincipal = {
context match {
case ctx: SslAuthenticationContext =>
val peerPrincipal = ctx.session.getPeerPrincipal.getName
peerPrincipal match {
case Pattern(name, _) =>
val principal = if (name == "server") name else peerPrincipal
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal)
case _ =>
KafkaPrincipal.ANONYMOUS
}
val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName
peerPrincipal match {
case Pattern(name, _) =>
val principal = if (name == "server") name else peerPrincipal
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal)
case _ =>
KafkaPrincipal.ANONYMOUS
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class KafkaNetworkChannelTest {
}

private def extractError(response: ApiMessage): Errors = {
val code = response match {
val code = (response: @unchecked) match {
case res: BeginQuorumEpochResponseData => res.errorCode
case res: EndQuorumEpochResponseData => res.errorCode
case res: FetchResponseData => res.errorCode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class BaseClientQuotaManagerTest {

protected def callback(response: RequestChannel.Response): Unit = {
// Count how many times this callback is called for notifyThrottlingDone().
response match {
(response: @unchecked) match {
case _: StartThrottlingResponse =>
case _: EndThrottlingResponse => numCallbacks += 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ThrottledChannelExpirationTest {
}

def callback(response: Response): Unit = {
response match {
(response: @unchecked) match {
case _: StartThrottlingResponse => numCallbacksForStartThrottling += 1
case _: EndThrottlingResponse => numCallbacksForEndThrottling += 1
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ group=org.apache.kafka
# - tests/kafkatest/version.py (variable DEV_VERSION)
# - kafka-merge-pr.py
version=2.8.0-SNAPSHOT
scalaVersion=2.13.3
scalaVersion=2.13.4
task=build
org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC
4 changes: 2 additions & 2 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ ext {

// Add Scala version
def defaultScala212Version = '2.12.12'
def defaultScala213Version = '2.13.3'
def defaultScala213Version = '2.13.4'
if (hasProperty('scalaVersion')) {
if (scalaVersion == '2.12') {
versions["scala"] = defaultScala212Version
Expand Down Expand Up @@ -101,7 +101,7 @@ versions += [
powermock: "2.0.9",
reflections: "0.9.12",
rocksDB: "5.18.4",
scalaCollectionCompat: "2.2.0",
scalaCollectionCompat: "2.3.0",
scalafmt: "1.5.1",
scalaJava8Compat : "0.9.1",
scalatest: "3.0.8",
Expand Down
8 changes: 8 additions & 0 deletions gradle/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="NP_NULL_ON_SOME_PATH_EXCEPTION"/>
</Match>

<Match>
<!-- Scala doesn't have checked exceptions so one cannot catch RuntimeException and rely
on the compiler to fail if the code is changed to call a method that throws Exception.
Given that, this bug pattern doesn't make sense for Scala code. -->
<Class name="kafka.log.Log"/>
<Bug pattern="REC_CATCH_EXCEPTION"/>
</Match>

<Match>
<!-- A spurious null check after inlining by the scalac optimizer confuses spotBugs -->
<Class name="kafka.tools.StateChangeLogMerger$"/>
Expand Down