From 4a10e5e020d37de82fd4074ac982ca7a73c1ed1b Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 20 Nov 2020 22:07:59 -0800 Subject: [PATCH 01/14] MINOR: Update Scala to 2.13.4 and 2.12.13 Also: * Enable -Xlint:strict-unsealed-patmat for Scala 2.13.x. * Update scala-collection-compat to 2.3.0. --- build.gradle | 2 +- gradle/dependencies.gradle | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/build.gradle b/build.gradle index f229699e9df65..dc486a17ee134 100644 --- a/build.gradle +++ b/build.gradle @@ -525,7 +525,7 @@ subprojects { scalaCompileOptions.additionalParameters += inlineFrom if (versions.baseScala != '2.12') { - scalaCompileOptions.additionalParameters += ["-opt-warnings"] + scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"] // Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"] } diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index b96c25c224b72..034426009c93b 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -27,8 +27,8 @@ ext { } // Add Scala version -def defaultScala212Version = '2.12.12' -def defaultScala213Version = '2.13.3' +def defaultScala212Version = '2.12.13' +def defaultScala213Version = '2.13.4' if (hasProperty('scalaVersion')) { if (scalaVersion == '2.12') { versions["scala"] = defaultScala212Version @@ -101,7 +101,7 @@ versions += [ powermock: "2.0.9", reflections: "0.9.12", rocksDB: "5.18.4", - scalaCollectionCompat: "2.2.0", + scalaCollectionCompat: "2.3.0", scalafmt: "1.5.1", scalaJava8Compat : "0.9.1", scalatest: "3.0.8", From 0d6d7aea7d6186f3655f98b5b70bddef968b3aaf Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 23 Nov 2020 07:20:53 -0800 Subject: [PATCH 02/14] Remove error suppression annotations that are no longer needed --- core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala | 1 - .../scala/kafka/security/authorizer/AclAuthorizer.scala | 6 +----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 36a5010cd35a1..2d7a53714276b 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -555,7 +555,6 @@ object ConsumerGroupCommand extends Logging { /** * Returns the state of the specified consumer group and partition assignment states */ - @nowarn("cat=optimizer") def collectGroupOffsets(groupId: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = { collectGroupsOffsets(List(groupId)).getOrElse(groupId, (None, None)) } diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala index f78e92513169b..0a60e51659c34 100644 --- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala @@ -287,12 +287,9 @@ class AclAuthorizer extends Authorizer with Logging { }.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava } - @nowarn("cat=optimizer") override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = { val aclBindings = new util.ArrayList[AclBinding]() - // Using `forKeyValue` triggers a scalac bug related to suppression of optimizer warnings, we - // should change this code once that's fixed - aclCache.foreach { case (resource, versionedAcls) => + aclCache.forKeyValue { case (resource, versionedAcls) => versionedAcls.acls.foreach { acl => val binding = new AclBinding(resource, acl.ace) if (filter.matches(binding)) @@ -542,7 +539,6 @@ class AclAuthorizer extends Authorizer with Logging { } } - @nowarn("cat=optimizer") private def getAclsFromCache(resource: ResourcePattern): VersionedAcls = { aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource")) } From 97f6a724d31d630ea73f85c7a0a81aea6aa04b97 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 23 Nov 2020 07:21:15 -0800 Subject: [PATCH 03/14] Update kafka-run-class --- bin/kafka-run-class.sh | 2 +- bin/windows/kafka-run-class.bat | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index b06fb12535f7f..f485f5b04459b 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -48,7 +48,7 @@ should_include_file() { base_dir=$(dirname $0)/.. if [ -z "$SCALA_VERSION" ]; then - SCALA_VERSION=2.13.3 + SCALA_VERSION=2.13.4 if [[ -f "$base_dir/gradle.properties" ]]; then SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2` fi diff --git a/bin/windows/kafka-run-class.bat b/bin/windows/kafka-run-class.bat index 4a516c026a078..3490588e37ded 100755 --- a/bin/windows/kafka-run-class.bat +++ b/bin/windows/kafka-run-class.bat @@ -27,7 +27,7 @@ set BASE_DIR=%CD% popd IF ["%SCALA_VERSION%"] EQU [""] ( - set SCALA_VERSION=2.13.3 + set SCALA_VERSION=2.13.4 ) IF ["%SCALA_BINARY_VERSION%"] EQU [""] ( From c1f5371bc873b1b1229c5185510b5efae5c1a0bf Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 23 Nov 2020 07:22:21 -0800 Subject: [PATCH 04/14] Handle non exhaustive matches --- .../main/scala/kafka/admin/ConfigCommand.scala | 3 +++ .../scala/kafka/admin/ConsumerGroupCommand.scala | 15 +++++++-------- .../coordinator/group/GroupMetadataManager.scala | 2 +- .../transaction/TransactionStateManager.scala | 2 +- .../scala/kafka/raft/KafkaNetworkChannel.scala | 6 ++++++ .../kafka/server/AbstractFetcherThread.scala | 2 +- .../kafka/server/BaseClientQuotaManagerTest.scala | 2 +- .../server/ThrottledChannelExpirationTest.scala | 2 +- 8 files changed, 21 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 0492f68acc26a..eb420dd4faa50 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -480,6 +480,7 @@ object ConfigCommand extends Config { describeResourceConfig(adminClient, entityTypes.head, entityNames.headOption, describeAll) case ConfigType.User | ConfigType.Client => describeClientQuotaAndUserScramCredentialConfigs(adminClient, entityTypes, entityNames) + case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") } } @@ -491,6 +492,7 @@ object ConfigCommand extends Config { adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq case ConfigType.Broker | BrokerLoggerConfigType => adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName + case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") }) entities.foreach { entity => @@ -530,6 +532,7 @@ object ConfigCommand extends Config { if (!entityName.isEmpty) validateBrokerId() (ConfigResource.Type.BROKER_LOGGER, None) + case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") } val configSourceFilter = if (describeAll) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 2d7a53714276b..a8769ec941d3a 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -44,8 +44,6 @@ import org.apache.kafka.common.requests.ListOffsetResponse import org.apache.kafka.common.ConsumerGroupState import joptsimple.OptionException -import scala.annotation.nowarn - object ConsumerGroupCommand extends Logging { def main(args: Array[String]): Unit = { @@ -151,21 +149,22 @@ object ConsumerGroupCommand extends Logging { private[admin] case class CsvUtils() { val mapper = new CsvMapper with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) - def readerFor[T <: CsvRecord: ClassTag] = { + def readerFor[T <: CsvRecord : ClassTag] = { val schema = getSchema[T] val clazz = implicitly[ClassTag[T]].runtimeClass mapper.readerFor(clazz).`with`(schema) } - def writerFor[T <: CsvRecord: ClassTag] = { + def writerFor[T <: CsvRecord : ClassTag] = { val schema = getSchema[T] val clazz = implicitly[ClassTag[T]].runtimeClass mapper.writerFor(clazz).`with`(schema) } - private def getSchema[T <: CsvRecord: ClassTag] = { + private def getSchema[T <: CsvRecord : ClassTag] = { val clazz = implicitly[ClassTag[T]].runtimeClass - val fields = clazz match { - case _ if classOf[CsvRecordWithGroup] == clazz => CsvRecordWithGroup.fields - case _ if classOf[CsvRecordNoGroup] == clazz => CsvRecordNoGroup.fields + val fields = { + if (classOf[CsvRecordWithGroup] == clazz) CsvRecordWithGroup.fields + else if (classOf[CsvRecordNoGroup] == clazz) CsvRecordNoGroup.fields + else throw new IllegalStateException(s"Unhandled class $clazz") } val schema = mapper.schemaFor(clazz).sortedBy(fields: _*) schema diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 71ae2476b9d0e..dd2eeb6f360ca 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -589,7 +589,7 @@ class GroupMetadataManager(brokerId: Int, readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0 - val memRecords = fetchDataInfo.records match { + val memRecords = (fetchDataInfo.records: @unchecked) match { case records: MemoryRecords => records case fileRecords: FileRecords => val sizeInBytes = fileRecords.sizeInBytes diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index b2835d778c38f..2882d8630249c 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -314,7 +314,7 @@ class TransactionStateManager(brokerId: Int, readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0 - val memRecords = fetchDataInfo.records match { + val memRecords = (fetchDataInfo.records: @unchecked) match { case records: MemoryRecords => records case fileRecords: FileRecords => val sizeInBytes = fileRecords.sizeInBytes diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala index 9a91b33cf7295..7f769c8463e09 100644 --- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala +++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala @@ -45,6 +45,8 @@ object KafkaNetworkChannel { new EndQuorumEpochResponse(endEpochResponse) case fetchResponse: FetchResponseData => new FetchResponse(fetchResponse) + case _ => + throw new IllegalArgumentException(s"Unexpected type for responseData: $responseData") } } @@ -61,6 +63,8 @@ object KafkaNetworkChannel { new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) { override def build(version: Short): FetchRequest = new FetchRequest(fetchRequest, version) } + case _ => + throw new IllegalArgumentException(s"Unexpected type for requestData: $requestData") } } @@ -70,6 +74,7 @@ object KafkaNetworkChannel { case beginEpochResponse: BeginQuorumEpochResponse => beginEpochResponse.data case endEpochResponse: EndQuorumEpochResponse => endEpochResponse.data case fetchResponse: FetchResponse[_] => fetchResponse.data + case _ => throw new IllegalArgumentException(s"Unexpected type for response: $response") } } @@ -79,6 +84,7 @@ object KafkaNetworkChannel { case beginEpochRequest: BeginQuorumEpochRequest => beginEpochRequest.data case endEpochRequest: EndQuorumEpochRequest => endEpochRequest.data case fetchRequest: FetchRequest => fetchRequest.data + case _ => throw new IllegalArgumentException(s"Unexpected type for request: $request") } } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 52985f9f7852c..edaa6b71e5b25 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -679,7 +679,7 @@ abstract class AbstractFetcherThread(name: String, } protected def toMemoryRecords(records: Records): MemoryRecords = { - records match { + (records: @unchecked) match { case r: MemoryRecords => r case r: FileRecords => val buffer = ByteBuffer.allocate(r.sizeInBytes) diff --git a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala index d2c6d4b2dec41..3a4dd9487d181 100644 --- a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala @@ -53,7 +53,7 @@ class BaseClientQuotaManagerTest { protected def callback(response: RequestChannel.Response): Unit = { // Count how many times this callback is called for notifyThrottlingDone(). - response match { + (response: @unchecked) match { case _: StartThrottlingResponse => case _: EndThrottlingResponse => numCallbacks += 1 } diff --git a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala index ff33084948c8f..02e36413b860a 100644 --- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala @@ -62,7 +62,7 @@ class ThrottledChannelExpirationTest { } def callback(response: Response): Unit = { - response match { + (response: @unchecked) match { case _: StartThrottlingResponse => numCallbacksForStartThrottling += 1 case _: EndThrottlingResponse => numCallbacksForEndThrottling += 1 } From 47463cb35dd007ce2f9849b7347a6fa02801270c Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 23 Nov 2020 07:22:58 -0800 Subject: [PATCH 05/14] Don't swallow exceptions in ReassignPartitionsCommand --- .../main/scala/kafka/admin/ReassignPartitionsCommand.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 3ae128976fdca..ad7b545417211 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -479,8 +479,7 @@ object ReassignPartitionsCommand extends Logging { private def topicDescriptionFutureToState(partition: Int, future: KafkaFuture[TopicDescription], - targetReplicas: Seq[Int]) - : PartitionReassignmentState = { + targetReplicas: Seq[Int]): PartitionReassignmentState = { try { val topicDescription = future.get() if (topicDescription.partitions().size() < partition) { @@ -494,7 +493,8 @@ object ReassignPartitionsCommand extends Logging { case t: ExecutionException => t.getCause match { case _: UnknownTopicOrPartitionException => - new PartitionReassignmentState(Seq(), targetReplicas, true) + PartitionReassignmentState(Seq(), targetReplicas, true) + case e => throw e } } } From e0b249356901af1b6b86a9bdcd0fc69759d7d8dc Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 23 Nov 2020 07:23:18 -0800 Subject: [PATCH 06/14] Add missing `sealed` modifier in RequestChannel.Response --- core/src/main/scala/kafka/network/RequestChannel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 064c0e85e7f2d..3945e8e15c17c 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -308,7 +308,7 @@ object RequestChannel extends Logging { } - abstract class Response(val request: Request) { + sealed abstract class Response(val request: Request) { def processor: Int = request.processor From 3eab2dfe3b5e46dc7f482c726bbe4f6da2606ad1 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 23 Nov 2020 07:24:01 -0800 Subject: [PATCH 07/14] Introduce sealed ClientQuotaManager.BaseUserEntity to avoid false positive exhaustiveness warning --- core/src/main/scala/kafka/server/ClientQuotaManager.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 69676ec3afef0..e32978cf1710e 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -80,7 +80,9 @@ object ClientQuotaManager { val DefaultUserQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None) val DefaultUserClientIdQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity)) - case class UserEntity(sanitizedUser: String) extends ClientQuotaEntity.ConfigEntity { + sealed trait BaseUserEntity extends ClientQuotaEntity.ConfigEntity + + case class UserEntity(sanitizedUser: String) extends BaseUserEntity { override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.USER override def name: String = Sanitizer.desanitize(sanitizedUser) override def toString: String = s"user $sanitizedUser" @@ -92,7 +94,7 @@ object ClientQuotaManager { override def toString: String = s"client-id $clientId" } - case object DefaultUserEntity extends ClientQuotaEntity.ConfigEntity { + case object DefaultUserEntity extends BaseUserEntity { override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER override def name: String = ConfigEntityName.Default override def toString: String = "default user" @@ -104,7 +106,7 @@ object ClientQuotaManager { override def toString: String = "default client-id" } - case class KafkaQuotaEntity(userEntity: Option[ClientQuotaEntity.ConfigEntity], + case class KafkaQuotaEntity(userEntity: Option[BaseUserEntity], clientIdEntity: Option[ClientQuotaEntity.ConfigEntity]) extends ClientQuotaEntity { override def configEntities: util.List[ClientQuotaEntity.ConfigEntity] = (userEntity.toList ++ clientIdEntity.toList).asJava From f5cfb389faf4efb09db377077ee248ea45f09a96 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 23 Nov 2020 07:30:39 -0800 Subject: [PATCH 08/14] More exhaustiveness handling fixes --- .../SaslPlainSslEndToEndAuthorizationTest.scala | 17 +++++++---------- .../api/SslEndToEndAuthorizationTest.scala | 17 +++++++---------- .../kafka/raft/KafkaNetworkChannelTest.scala | 2 +- 3 files changed, 15 insertions(+), 21 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala index d74805f067649..356de957e2236 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala @@ -38,16 +38,13 @@ object SaslPlainSslEndToEndAuthorizationTest { class TestPrincipalBuilder extends KafkaPrincipalBuilder { override def build(context: AuthenticationContext): KafkaPrincipal = { - context match { - case ctx: SaslAuthenticationContext => - ctx.server.getAuthorizationID match { - case KafkaPlainAdmin => - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin") - case KafkaPlainUser => - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user") - case _ => - KafkaPrincipal.ANONYMOUS - } + context.asInstanceOf[SaslAuthenticationContext].server.getAuthorizationID match { + case KafkaPlainAdmin => + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin") + case KafkaPlainUser => + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user") + case _ => + KafkaPrincipal.ANONYMOUS } } } diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala index 4567b713c5d76..b705050c1b111 100644 --- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala @@ -34,16 +34,13 @@ object SslEndToEndAuthorizationTest { // Use full DN as client principal to test special characters in principal // Use field from DN as server principal to test custom PrincipalBuilder override def build(context: AuthenticationContext): KafkaPrincipal = { - context match { - case ctx: SslAuthenticationContext => - val peerPrincipal = ctx.session.getPeerPrincipal.getName - peerPrincipal match { - case Pattern(name, _) => - val principal = if (name == "server") name else peerPrincipal - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal) - case _ => - KafkaPrincipal.ANONYMOUS - } + val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName + peerPrincipal match { + case Pattern(name, _) => + val principal = if (name == "server") name else peerPrincipal + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal) + case _ => + KafkaPrincipal.ANONYMOUS } } } diff --git a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala index 71697589650b8..699e8faa711bb 100644 --- a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala +++ b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala @@ -207,7 +207,7 @@ class KafkaNetworkChannelTest { } private def extractError(response: ApiMessage): Errors = { - val code = response match { + val code = (response: @unchecked) match { case res: BeginQuorumEpochResponseData => res.errorCode case res: EndQuorumEpochResponseData => res.errorCode case res: FetchResponseData => res.errorCode From d4272013f343de63c98e95ed31faa08a48b3dda5 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 23 Nov 2020 07:33:23 -0800 Subject: [PATCH 09/14] Workaround scalac bug related to exhaustiveness warnings --- core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index a131acdda7a6d..96ef6d5179368 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -195,7 +195,10 @@ class ZooKeeperClient(connectString: String, def responseMetadata(sendTimeMs: Long) = new ResponseMetadata(sendTimeMs, receivedTimeMs = time.hiResClockMs()) val sendTimeMs = time.hiResClockMs() - request match { + + // Cast to AsyncRequest to workaround a scalac bug that results in an false exhaustiveness warning + // with -Xlint:strict-unsealed-patmat + (request: AsyncRequest) match { case ExistsRequest(path, ctx) => zooKeeper.exists(path, shouldWatch(request), new StatCallback { def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = From 601d257ef07ceda9182fcd2127b4d85f30a21cde Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 23 Nov 2020 07:33:38 -0800 Subject: [PATCH 10/14] Bump scala version in gradle.properties --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 5ce35dbfb87a4..6c73c38857b41 100644 --- a/gradle.properties +++ b/gradle.properties @@ -21,6 +21,6 @@ group=org.apache.kafka # - tests/kafkatest/version.py (variable DEV_VERSION) # - kafka-merge-pr.py version=2.8.0-SNAPSHOT -scalaVersion=2.13.3 +scalaVersion=2.13.4 task=build org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC From 773455fa88efeb8115eb748364cf2bbe493a793b Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 23 Nov 2020 14:39:57 -0800 Subject: [PATCH 11/14] Fix spotBugs error in Log --- core/src/main/scala/kafka/log/Log.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 173357e0cc919..da1af82fc15fd 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2427,13 +2427,11 @@ class Log(@volatile private var _dir: File, info(s"Replacing overflowed segment $segment with split segments $newSegments") replaceSegments(newSegments.toList, List(segment)) newSegments.toList - } catch { - case e: Exception => - newSegments.foreach { splitSegment => - splitSegment.close() - splitSegment.deleteIfExists() - } - throw e + } finally { + newSegments.foreach { splitSegment => + splitSegment.close() + splitSegment.deleteIfExists() + } } } } @@ -2737,4 +2735,4 @@ case object LogDeletion extends SegmentDeletionReason { override def logReason(log: Log, toDelete: List[LogSegment]): Unit = { log.info(s"Deleting segments as the log has been deleted: ${toDelete.mkString(",")}") } -} \ No newline at end of file +} From 401c9b674926c19b6f3295f3ea61985002f69664 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 23 Nov 2020 19:59:39 -0800 Subject: [PATCH 12/14] Fix test failures --- core/src/main/scala/kafka/log/Log.scala | 12 +++++++----- gradle/spotbugs-exclude.xml | 8 ++++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index da1af82fc15fd..15dc9cedd1079 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2427,11 +2427,13 @@ class Log(@volatile private var _dir: File, info(s"Replacing overflowed segment $segment with split segments $newSegments") replaceSegments(newSegments.toList, List(segment)) newSegments.toList - } finally { - newSegments.foreach { splitSegment => - splitSegment.close() - splitSegment.deleteIfExists() - } + } catch { + case e: Exception => + newSegments.foreach { splitSegment => + splitSegment.close() + splitSegment.deleteIfExists() + } + throw e } } } diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index f395d0a776515..9115e0d59ae82 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -124,6 +124,14 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read + + + + + + From a3c80af930ba419c82d4eacbc1ae821cd1bdb2d6 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 24 Nov 2020 06:26:37 -0800 Subject: [PATCH 13/14] Revert Scala 2.12.13 change, it's not available yet --- gradle/dependencies.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 034426009c93b..f4868e95761a0 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -27,7 +27,7 @@ ext { } // Add Scala version -def defaultScala212Version = '2.12.13' +def defaultScala212Version = '2.12.12' def defaultScala213Version = '2.13.4' if (hasProperty('scalaVersion')) { if (scalaVersion == '2.12') { From 7d125d127cb10bb415c27c4e754c4386ab754c4f Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 24 Nov 2020 06:26:48 -0800 Subject: [PATCH 14/14] Remove unnecessary braces --- core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index a8769ec941d3a..d8a01abe1cbf2 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -161,11 +161,12 @@ object ConsumerGroupCommand extends Logging { } private def getSchema[T <: CsvRecord : ClassTag] = { val clazz = implicitly[ClassTag[T]].runtimeClass - val fields = { + + val fields = if (classOf[CsvRecordWithGroup] == clazz) CsvRecordWithGroup.fields else if (classOf[CsvRecordNoGroup] == clazz) CsvRecordNoGroup.fields else throw new IllegalStateException(s"Unhandled class $clazz") - } + val schema = mapper.schemaFor(clazz).sortedBy(fields: _*) schema }