diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 95f1a3b606633..b06fb12535f7f 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -48,7 +48,7 @@ should_include_file() { base_dir=$(dirname $0)/.. if [ -z "$SCALA_VERSION" ]; then - SCALA_VERSION=2.13.2 + SCALA_VERSION=2.13.3 if [[ -f "$base_dir/gradle.properties" ]]; then SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2` fi diff --git a/bin/windows/kafka-run-class.bat b/bin/windows/kafka-run-class.bat index 5c0c26450851b..4a516c026a078 100755 --- a/bin/windows/kafka-run-class.bat +++ b/bin/windows/kafka-run-class.bat @@ -27,7 +27,7 @@ set BASE_DIR=%CD% popd IF ["%SCALA_VERSION%"] EQU [""] ( - set SCALA_VERSION=2.13.2 + set SCALA_VERSION=2.13.3 ) IF ["%SCALA_BINARY_VERSION%"] EQU [""] ( diff --git a/build.gradle b/build.gradle index 180e1fc4e7242..9b0806d3517b7 100644 --- a/build.gradle +++ b/build.gradle @@ -465,7 +465,6 @@ subprojects { "-Xlint:delayedinit-select", "-Xlint:doc-detached", "-Xlint:missing-interpolator", - "-Xlint:nullary-override", "-Xlint:nullary-unit", "-Xlint:option-implicit", "-Xlint:package-object-classes", @@ -503,6 +502,7 @@ subprojects { if (versions.baseScala == '2.12') { scalaCompileOptions.additionalParameters += [ "-Xlint:by-name-right-associative", + "-Xlint:nullary-override", "-Xlint:unsound-match" ] } diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 7b4001c5a6a59..8162535b5de13 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -77,7 +77,7 @@ object Kafka extends Logging { } // attach shutdown handler to catch terminating signals as well as normal termination - Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown) + Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown()) kafkaServerStartable.startup() kafkaServerStartable.awaitShutdown() diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index fdc83fd9b9730..ea7021c409303 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -627,7 +627,7 @@ object ConfigCommand extends Config { } } - val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) Some(sanitizeName(t, sortedNames.next)) else None)) + val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) Some(sanitizeName(t, sortedNames.next())) else None)) ConfigEntity(entities.head, if (entities.size > 1) Some(entities(1)) else None) } @@ -711,12 +711,12 @@ object ConfigCommand extends Config { (userDefaults, ConfigType.User), (brokerDefaults, ConfigType.Broker)) - private[admin] def entityTypes(): List[String] = { + private[admin] def entityTypes: List[String] = { options.valuesOf(entityType).asScala.toList ++ (entityFlags ++ entityDefaultsFlags).filter(entity => options.has(entity._1)).map(_._2) } - private[admin] def entityNames(): List[String] = { + private[admin] def entityNames: List[String] = { val namesIterator = options.valuesOf(entityName).iterator options.specs.asScala .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default")) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index a0fd20621471e..0defefa1cc2e3 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -44,6 +44,8 @@ import org.apache.kafka.common.requests.ListOffsetResponse import org.apache.kafka.common.ConsumerGroupState import joptsimple.OptionException +import scala.annotation.nowarn + object ConsumerGroupCommand extends Logging { def main(args: Array[String]): Unit = { @@ -566,6 +568,7 @@ object ConsumerGroupCommand extends Logging { /** * Returns the state of the specified consumer group and partition assignment states */ + @nowarn("cat=optimizer") def collectGroupOffsets(groupId: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = { collectGroupsOffsets(List(groupId)).getOrElse(groupId, (None, None)) } diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index eabc58c5417d3..7403a61b52c79 100755 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -178,7 +178,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { case Some(partitions) => partitions.map(_.topic).toSet case None => - zkClient.getAllPartitions().map(_.topic) + zkClient.getAllPartitions.map(_.topic) } val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap{ case (topic, partitions) => @@ -190,7 +190,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { case Some(partitions) => partitions.partition(partitionsFromZk.contains) case None => - (zkClient.getAllPartitions(), Set.empty) + (zkClient.getAllPartitions, Set.empty) } PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 2d7f2dc13cb4c..ea46411b6ae7e 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -1111,7 +1111,7 @@ object ReassignPartitionsCommand extends Logging { // Check for the presence of the legacy partition reassignment ZNode. This actually // won't detect all rebalances... only ones initiated by the legacy method. // This is a limitation of the legacy ZK API. - val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress() + val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress if (reassignPartitionsInProgress) { // Note: older versions of this tool would modify the broker quotas here (but not // topic quotas, for some reason). This behavior wasn't documented in the --execute diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index 7a6ccd430be3c..28e3468a8d3cf 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -176,7 +176,7 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging { } private def setAclIndividually(path: String): Unit = { - val setPromise = Promise[String] + val setPromise = Promise[String]() futures.synchronized { futures += setPromise.future } @@ -184,8 +184,8 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging { } private def setAclsRecursively(path: String): Unit = { - val setPromise = Promise[String] - val childrenPromise = Promise[String] + val setPromise = Promise[String]() + val childrenPromise = Promise[String]() futures.synchronized { futures += setPromise.future futures += childrenPromise.future @@ -279,15 +279,15 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging { future match { case Some(a) => Await.result(a, 6000 millis) - futures.synchronized { futures.dequeue } - recurse + futures.synchronized { futures.dequeue() } + recurse() case None => } } recurse() } finally { - zkClient.close + zkClient.close() } } diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index 42341cf72e47b..c49ec085cc7f0 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -117,7 +117,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient, } class ChangeNotification { - def process(): Unit = processNotifications + def process(): Unit = processNotifications() } /** @@ -143,17 +143,17 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient, private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong class ChangeEventProcessThread(name: String) extends ShutdownableThread(name = name) { - override def doWork(): Unit = queue.take().process + override def doWork(): Unit = queue.take().process() } object ChangeNotificationHandler extends ZNodeChildChangeHandler { override val path: String = seqNodeRoot - override def handleChildChange(): Unit = addChangeNotification + override def handleChildChange(): Unit = addChangeNotification() } object ZkStateChangeHandler extends StateChangeHandler { override val name: String = StateChangeHandlers.zkNodeChangeListenerHandler(seqNodeRoot) - override def afterInitializingSession(): Unit = addChangeNotification + override def afterInitializingSession(): Unit = addChangeNotification() } } diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala index 530df2f3e18e9..bf217eba9df86 100644 --- a/core/src/main/scala/kafka/controller/ControllerContext.scala +++ b/core/src/main/scala/kafka/controller/ControllerContext.scala @@ -416,15 +416,13 @@ class ControllerContext { partitionLeadershipInfo.get(partition) } - def partitionsLeadershipInfo(): Iterable[(TopicPartition, LeaderIsrAndControllerEpoch)] = { + def partitionsLeadershipInfo: Map[TopicPartition, LeaderIsrAndControllerEpoch] = partitionLeadershipInfo - } - def partitionsWithLeaders(): Set[TopicPartition] = { - partitionLeadershipInfo.keys.filter(tp => !isTopicQueuedUpForDeletion(tp.topic)).toSet - } + def partitionsWithLeaders: Set[TopicPartition] = + partitionLeadershipInfo.keySet.filter(tp => !isTopicQueuedUpForDeletion(tp.topic)) - def partitionsWithOfflineLeader(): Set[TopicPartition] = { + def partitionsWithOfflineLeader: Set[TopicPartition] = { partitionLeadershipInfo.filter { case (topicPartition, leaderIsrAndControllerEpoch) => !isReplicaOnline(leaderIsrAndControllerEpoch.leaderAndIsr.leader, topicPartition) && !isTopicQueuedUpForDeletion(topicPartition.topic) @@ -439,13 +437,9 @@ class ControllerContext { }.keySet } - def clearPartitionLeadershipInfo(): Unit = { - partitionLeadershipInfo.clear() - } + def clearPartitionLeadershipInfo(): Unit = partitionLeadershipInfo.clear() - def partitionWithLeadersCount(): Int = { - partitionLeadershipInfo.size - } + def partitionWithLeadersCount: Int = partitionLeadershipInfo.size private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition, oldReplicaAssignment: Option[ReplicaAssignment], diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index f5757b37d5502..a535eec3e74e2 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -260,7 +260,7 @@ class KafkaController(val config: KafkaConfig, info("starting the token expiry check scheduler") tokenCleanScheduler.startup() tokenCleanScheduler.schedule(name = "delete-expired-tokens", - fun = () => tokenManager.expireTokens, + fun = () => tokenManager.expireTokens(), period = config.delegationTokenExpiryCheckIntervalMs, unit = TimeUnit.MILLISECONDS) } @@ -439,7 +439,7 @@ class KafkaController(val config: KafkaConfig, val (newOfflineReplicasForDeletion, newOfflineReplicasNotForDeletion) = newOfflineReplicas.partition(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic)) - val partitionsWithOfflineLeader = controllerContext.partitionsWithOfflineLeader() + val partitionsWithOfflineLeader = controllerContext.partitionsWithOfflineLeader // trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas partitionStateMachine.handleStateChanges(partitionsWithOfflineLeader.toSeq, OfflinePartition) @@ -931,10 +931,10 @@ class KafkaController(val config: KafkaConfig, * @param shouldRemoveReassignment Predicate indicating which partition reassignments should be removed */ private def maybeRemoveFromZkReassignment(shouldRemoveReassignment: (TopicPartition, Seq[Int]) => Boolean): Unit = { - if (!zkClient.reassignPartitionsInProgress()) + if (!zkClient.reassignPartitionsInProgress) return - val reassigningPartitions = zkClient.getPartitionReassignment() + val reassigningPartitions = zkClient.getPartitionReassignment val (removingPartitions, updatedPartitionsBeingReassigned) = reassigningPartitions.partition { case (tp, replicas) => shouldRemoveReassignment(tp, replicas) } @@ -1516,7 +1516,7 @@ class KafkaController(val config: KafkaConfig, val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] - zkClient.getPartitionReassignment().foreach { case (tp, targetReplicas) => + zkClient.getPartitionReassignment.foreach { case (tp, targetReplicas) => maybeBuildReassignment(tp, Some(targetReplicas)) match { case Some(context) => partitionsToReassign.put(tp, context) case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS)) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 9a548250d7b83..6883ceb15adff 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -174,7 +174,7 @@ class GroupMetadataManager(brokerId: Int, scheduler.startup() if (enableMetadataExpiration) { scheduler.schedule(name = "delete-expired-group-metadata", - fun = () => cleanupGroupMetadata, + fun = () => cleanupGroupMetadata(), period = config.offsetsRetentionCheckIntervalMs, unit = TimeUnit.MILLISECONDS) } @@ -752,7 +752,7 @@ class GroupMetadataManager(brokerId: Int, onGroupUnloaded: GroupMetadata => Unit): Unit = { val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition) info(s"Scheduling unloading of offsets and group metadata from $topicPartition") - scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets) + scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets()) def removeGroupsAndOffsets(): Unit = { var numOffsetsRemoved = 0 diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index decff4d86b089..cbc8847ac10f3 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -785,7 +785,7 @@ class Log(@volatile private var _dir: File, var truncated = false while (unflushed.hasNext && !truncated) { - val segment = unflushed.next + val segment = unflushed.next() info(s"Recovering unflushed segment ${segment.baseOffset}") val truncatedBytes = try { @@ -2270,7 +2270,7 @@ class Log(@volatile private var _dir: File, if (asyncDelete) { info(s"Scheduling segments for deletion ${segments.mkString(",")}") - scheduler.schedule("delete-file", () => deleteSegments, delay = config.fileDeleteDelayMs) + scheduler.schedule("delete-file", () => deleteSegments(), delay = config.fileDeleteDelayMs) } else { deleteSegments() } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 8dc6370e19213..16bb15721f9d4 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -318,7 +318,7 @@ class LogManager(logDirs: Seq[File], var recoveryPoints = Map[TopicPartition, Long]() try { - recoveryPoints = this.recoveryPointCheckpoints(dir).read + recoveryPoints = this.recoveryPointCheckpoints(dir).read() } catch { case e: Exception => warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory $dir", e) @@ -327,7 +327,7 @@ class LogManager(logDirs: Seq[File], var logStartOffsets = Map[TopicPartition, Long]() try { - logStartOffsets = this.logStartOffsetCheckpoints(dir).read + logStartOffsets = this.logStartOffsetCheckpoints(dir).read() } catch { case e: Exception => warn(s"Error occurred while reading log-start-offset-checkpoint file of directory $dir", e) @@ -1039,7 +1039,7 @@ class LogManager(logDirs: Seq[File], debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" + s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush") if(timeSinceLastFlush >= log.config.flushMs) - log.flush + log.flush() } catch { case e: Throwable => error(s"Error flushing topic ${topicPartition.topic}", e) diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala index 9e239785ee51d..d584148769cb2 100644 --- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala @@ -68,7 +68,7 @@ object AclAuthorizer { def find(p: AclEntry => Boolean): Option[AclEntry] = { // Lazily iterate through the inner `Seq` elements and stop as soon as we find a match val it = seqs.iterator.flatMap(_.find(p)) - if (it.hasNext) Some(it.next) + if (it.hasNext) Some(it.next()) else None } @@ -367,7 +367,8 @@ class AclAuthorizer extends Authorizer with Logging { } else false } - @nowarn("cat=deprecation&cat=optimizer") + @nowarn("cat=deprecation") + @nowarn("cat=optimizer") private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = { // this code is performance sensitive, make sure to run AclAuthorizerBenchmark after any changes @@ -523,7 +524,7 @@ class AclAuthorizer extends Authorizer with Logging { } } - if(!writeComplete) + if (!writeComplete) throw new IllegalStateException(s"Failed to update ACLs for $resource after trying a maximum of $maxUpdateRetries times") if (newVersionedAcls.acls != currentVersionedAcls.acls) { @@ -538,6 +539,7 @@ class AclAuthorizer extends Authorizer with Logging { } } + @nowarn("cat=optimizer") private def getAclsFromCache(resource: ResourcePattern): VersionedAcls = { aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource")) } @@ -548,9 +550,9 @@ class AclAuthorizer extends Authorizer with Logging { private def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { if (versionedAcls.acls.nonEmpty) { - aclCache = aclCache + (resource -> versionedAcls) + aclCache = aclCache.updated(resource, versionedAcls) } else { - aclCache = aclCache - resource + aclCache -= resource } } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index f80d6fa9d678a..7dc77e499aeaa 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -651,7 +651,7 @@ abstract class AbstractFetcherThread(name: String, } finally partitionMapLock.unlock() } - def partitionCount(): Int = { + def partitionCount: Int = { partitionMapLock.lockInterruptibly() try partitionStates.size finally partitionMapLock.unlock() diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala b/core/src/main/scala/kafka/server/DelegationTokenManager.scala index 3b8fc6704f437..9a7002bca8431 100644 --- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala +++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala @@ -185,10 +185,10 @@ class DelegationTokenManager(val config: KafkaConfig, def startup() = { if (config.tokenAuthEnabled) { - zkClient.createDelegationTokenPaths - loadCache + zkClient.createDelegationTokenPaths() + loadCache() tokenChangeListener = new ZkNodeChangeNotificationListener(zkClient, DelegationTokenChangeNotificationZNode.path, DelegationTokenChangeNotificationSequenceZNode.SequenceNumberPrefix, TokenChangedNotificationHandler) - tokenChangeListener.init + tokenChangeListener.init() } } @@ -267,7 +267,7 @@ class DelegationTokenManager(val config: KafkaConfig, responseCallback(CreateTokenResult(-1, -1, -1, "", Array[Byte](), Errors.DELEGATION_TOKEN_AUTH_DISABLED)) } else { lock.synchronized { - val tokenId = CoreUtils.generateUuidAsBase64 + val tokenId = CoreUtils.generateUuidAsBase64() val issueTimeStamp = time.milliseconds val maxLifeTime = if (maxLifeTimeMs <= 0) tokenMaxLifetime else Math.min(maxLifeTimeMs, tokenMaxLifetime) @@ -464,16 +464,10 @@ class DelegationTokenManager(val config: KafkaConfig, } } - /** - * - * @return - */ - def getAllTokenInformation(): List[TokenInformation] = { - tokenCache.tokens.asScala.toList - } + def getAllTokenInformation: List[TokenInformation] = tokenCache.tokens.asScala.toList def getTokens(filterToken: TokenInformation => Boolean): List[DelegationToken] = { - getAllTokenInformation().filter(filterToken).map(token => getToken(token)) + getAllTokenInformation.filter(filterToken).map(token => getToken(token)) } object TokenChangedNotificationHandler extends NotificationHandler { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index dc85dcffd4470..46bd890b459dc 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1528,7 +1528,7 @@ class KafkaApis(val requestChannel: RequestChannel, Option(syncGroupRequest.data.protocolType), Option(syncGroupRequest.data.protocolName), Option(syncGroupRequest.data.groupInstanceId), - assignmentMap.result, + assignmentMap.result(), sendResponseCallback ) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a8b54ad2c2de1..01051037c3e0e 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1210,7 +1210,7 @@ object KafkaConfig { .define(PasswordEncoderIterationsProp, INT, Defaults.PasswordEncoderIterations, atLeast(1024), LOW, PasswordEncoderIterationsDoc) } - def configNames() = configDef.names().asScala.toList.sorted + def configNames: Seq[String] = configDef.names.asScala.toBuffer.sorted private[server] def defaultValues: Map[String, _] = configDef.defaultValues.asScala private[server] def configKeys: Map[String, ConfigKey] = configDef.configKeys.asScala diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 09fd4e7661e08..de38102c8df98 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -457,7 +457,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP } private def getOrGenerateClusterId(zkClient: KafkaZkClient): String = { - zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64)) + zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64())) } private[server] def createBrokerInfo: BrokerInfo = { @@ -745,7 +745,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP */ def awaitShutdown(): Unit = shutdownLatch.await() - def getLogManager(): LogManager = logManager + def getLogManager: LogManager = logManager def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName) diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala index c941163d017c4..039f22ede6d96 100644 --- a/core/src/main/scala/kafka/server/QuotaFactory.scala +++ b/core/src/main/scala/kafka/server/QuotaFactory.scala @@ -49,9 +49,9 @@ object QuotaFactory extends Logging { alterLogDirs: ReplicationQuotaManager, clientQuotaCallback: Option[ClientQuotaCallback]) { def shutdown(): Unit = { - fetch.shutdown - produce.shutdown - request.shutdown + fetch.shutdown() + produce.shutdown() + request.shutdown() clientQuotaCallback.foreach(_.close()) } } diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala index 7098849cc1fc6..0af9b19bd72b1 100644 --- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala @@ -174,10 +174,10 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig, * * @return */ - def upperBound(): Long = { + def upperBound: Long = { inReadLock(lock) { if (quota != null) - quota.bound().toLong + quota.bound.toLong else Long.MaxValue } diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala index 9d14ae6f2bd09..38a009a5fb0c7 100644 --- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala +++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala @@ -227,9 +227,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, } // Visible for testing - def epochEntries: Seq[EpochEntry] = { - epochs - } + def epochEntries: Seq[EpochEntry] = epochs private def latestEntry: Option[EpochEntry] = epochs.lastOption diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 22d33e642c461..197e94407280d 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -334,14 +334,14 @@ private class ReplicaBuffer(expectedReplicasPerTopicPartition: collection.Map[To MessageInfo(replicaId, batch.lastOffset, batch.nextOffset, batch.checksum)) case Some(messageInfoFromFirstReplica) => if (messageInfoFromFirstReplica.offset != batch.lastOffset) { - println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicPartition + println(ReplicaVerificationTool.getCurrentTimeString() + ": partition " + topicPartition + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset " + messageInfoFromFirstReplica.offset + " doesn't match replica " + replicaId + "'s offset " + batch.lastOffset) Exit.exit(1) } if (messageInfoFromFirstReplica.checksum != batch.checksum) - println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + println(ReplicaVerificationTool.getCurrentTimeString() + ": partition " + topicPartition + " has unmatched checksum at offset " + batch.lastOffset + "; replica " + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum + "; replica " + replicaId + "'s checksum " + batch.checksum) diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala index 49eaccbe537d5..de711e5d02f4b 100755 --- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -137,7 +137,7 @@ object StateChangeLogMerger extends Logging { */ val pqueue = new mutable.PriorityQueue[LineIterator]()(dateBasedOrdering) val output: OutputStream = new BufferedOutputStream(System.out, 1024*1024) - val lineIterators = files.map(scala.io.Source.fromFile(_).getLines) + val lineIterators = files.map(scala.io.Source.fromFile(_).getLines()) var lines: List[LineIterator] = List() for (itr <- lineIterators) { @@ -166,7 +166,7 @@ object StateChangeLogMerger extends Logging { */ def getNextLine(itr: Iterator[String]): LineIterator = { while (itr != null && itr.hasNext) { - val nextLine = itr.next + val nextLine = itr.next() dateRegex.findFirstIn(nextLine).foreach { d => val date = dateFormat.parse(d) if ((date.equals(startDate) || date.after(startDate)) && (date.equals(endDate) || date.before(endDate))) { diff --git a/core/src/main/scala/kafka/utils/json/DecodeJson.scala b/core/src/main/scala/kafka/utils/json/DecodeJson.scala index 7f00c366af4f0..71bfd61cee11e 100644 --- a/core/src/main/scala/kafka/utils/json/DecodeJson.scala +++ b/core/src/main/scala/kafka/utils/json/DecodeJson.scala @@ -100,7 +100,7 @@ object DecodeJson { private def decodeIterator[S, T, C](it: Iterator[S])(f: S => Either[String, T])(implicit factory: Factory[T, C]): Either[String, C] = { val result = factory.newBuilder while (it.hasNext) { - f(it.next) match { + f(it.next()) match { case Right(x) => result += x case Left(x) => return Left(x) } diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index be212dd4c170d..efdf9bf6a98e7 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -105,7 +105,7 @@ class SystemTimer(executorName: String, writeLock.lock() try { while (bucket != null) { - timingWheel.advanceClock(bucket.getExpiration()) + timingWheel.advanceClock(bucket.getExpiration) bucket.flush(reinsert) bucket = delayQueue.poll() } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala index 66238540d34d6..a1995c1df0a68 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTask.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala @@ -40,8 +40,6 @@ trait TimerTask extends Runnable { } } - private[timer] def getTimerTaskEntry(): TimerTaskEntry = { - timerTaskEntry - } + private[timer] def getTimerTaskEntry: TimerTaskEntry = timerTaskEntry } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index e463167268626..5892dfaed5a64 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -43,9 +43,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { } // Get the bucket's expiration time - def getExpiration(): Long = { - expiration.get() - } + def getExpiration: Long = expiration.get // Apply the supplied function to each of tasks in this list def foreach(f: (TimerTask)=>Unit): Unit = { diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 1272108461690..189568ef0e9cd 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -81,7 +81,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo private[kafka] def createSequentialPersistentPath(path: String, data: Array[Byte]): String = { val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL) val createResponse = retryRequestUntilConnected(createRequest) - createResponse.maybeThrow + createResponse.maybeThrow() createResponse.name } @@ -364,7 +364,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo val path = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName) try createRecursive(path, configData) catch { - case _: NodeExistsException => set(configData).maybeThrow + case _: NodeExistsException => set(configData).maybeThrow() } } @@ -373,7 +373,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo val setDataResponse = set(configData) setDataResponse.resultCode match { case Code.NONODE => createOrSet(configData) - case _ => setDataResponse.maybeThrow + case _ => setDataResponse.maybeThrow() } } @@ -506,7 +506,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo assignment: Map[TopicPartition, ReplicaAssignment], expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion) = { val setDataResponse = setTopicAssignmentRaw(topic, assignment, expectedControllerEpochZkVersion) - setDataResponse.maybeThrow + setDataResponse.maybeThrow() } /** @@ -561,7 +561,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo if (getChildrenResponse.resultCode == Code.OK) { deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber), expectedControllerEpochZkVersion) } else if (getChildrenResponse.resultCode != Code.NONODE) { - getChildrenResponse.maybeThrow + getChildrenResponse.maybeThrow() } } @@ -666,7 +666,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * Gets all partitions in the cluster * @return all partitions in the cluster */ - def getAllPartitions(): Set[TopicPartition] = { + def getAllPartitions: Set[TopicPartition] = { val topics = getChildren(TopicsZNode.path) if (topics == null) Set.empty else { @@ -810,10 +810,8 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo /** * Returns all reassignments. * @return the reassignments for each partition. - * @deprecated Use the PartitionReassignment Kafka API instead */ - @Deprecated - def getPartitionReassignment(): collection.Map[TopicPartition, Seq[Int]] = { + def getPartitionReassignment: collection.Map[TopicPartition, Seq[Int]] = { val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path) val getDataResponse = retryRequestUntilConnected(getDataRequest) getDataResponse.resultCode match { @@ -857,18 +855,16 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo setDataResponse.resultCode match { case Code.NONODE => val createDataResponse = create(reassignmentData) - createDataResponse.maybeThrow - case _ => setDataResponse.maybeThrow + createDataResponse.maybeThrow() + case _ => setDataResponse.maybeThrow() } } /** * Creates the partition reassignment znode with the given reassignment. * @param reassignment the reassignment to set on the reassignment znode. - * @throws KeeperException if there is an error while creating the znode - * @deprecated Use the PartitionReassignment Kafka API instead + * @throws KeeperException if there is an error while creating the znode. */ - @Deprecated def createPartitionReassignment(reassignment: Map[TopicPartition, Seq[Int]]) = { createRecursive(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment)) } @@ -876,20 +872,16 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo /** * Deletes the partition reassignment znode. * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. - * @deprecated Use the PartitionReassignment Kafka API instead */ - @Deprecated def deletePartitionReassignment(expectedControllerEpochZkVersion: Int): Unit = { deletePath(ReassignPartitionsZNode.path, expectedControllerEpochZkVersion) } /** - * Checks if reassign partitions is in progress - * @return true if reassign partitions is in progress, else false - * @deprecated Use the PartitionReassignment Kafka API instead + * Checks if reassign partitions is in progress. + * @return true if reassign partitions is in progress, else false. */ - @Deprecated - def reassignPartitionsInProgress(): Boolean = { + def reassignPartitionsInProgress: Boolean = { pathExists(ReassignPartitionsZNode.path) } @@ -993,7 +985,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo if (getChildrenResponse.resultCode == Code.OK) { deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber), expectedControllerEpochZkVersion) } else if (getChildrenResponse.resultCode != Code.NONODE) { - getChildrenResponse.maybeThrow + getChildrenResponse.maybeThrow() } } @@ -1183,7 +1175,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo val aclChange = ZkAclStore(resource.patternType).changeStore.createChangeNode(resource) val createRequest = CreateRequest(aclChange.path, aclChange.bytes, defaultAcls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL) val createResponse = retryRequestUntilConnected(createRequest) - createResponse.maybeThrow + createResponse.maybeThrow() } def propagateLogDirEvent(brokerId: Int): Unit = { @@ -1209,7 +1201,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo if (getChildrenResponse.resultCode == Code.OK) { deleteAclChangeNotifications(store.aclChangePath, getChildrenResponse.children) } else if (getChildrenResponse.resultCode != Code.NONODE) { - getChildrenResponse.maybeThrow + getChildrenResponse.maybeThrow() } }) } @@ -1228,7 +1220,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo val deleteResponses = retryRequestsUntilConnected(deleteRequests) deleteResponses.foreach { deleteResponse => if (deleteResponse.resultCode != Code.NONODE) { - deleteResponse.maybeThrow + deleteResponse.maybeThrow() } } } @@ -1349,8 +1341,8 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo setDataResponse.resultCode match { case Code.NONODE => val createDataResponse = create(tokenInfo) - createDataResponse.maybeThrow - case _ => setDataResponse.maybeThrow + createDataResponse.maybeThrow() + case _ => setDataResponse.maybeThrow() } } @@ -1478,7 +1470,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo if (setDataResponse.resultCode == Code.NONODE) { createConsumerOffset(group, topicPartition, offset) } else { - setDataResponse.maybeThrow + setDataResponse.maybeThrow() } } @@ -1518,7 +1510,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo def setAcl(path: String, acl: Seq[ACL]): Unit = { val setAclRequest = SetAclRequest(path, acl, ZkVersion.MatchAnyVersion) val setAclResponse = retryRequestUntilConnected(setAclRequest) - setAclResponse.maybeThrow + setAclResponse.maybeThrow() } /** @@ -1575,7 +1567,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo defaultAcls(FeatureZNode.path), CreateMode.PERSISTENT) val response = retryRequestUntilConnected(createRequest) - response.maybeThrow + response.maybeThrow() } def updateFeatureZNode(nodeContents: FeatureZNode): Unit = { @@ -1584,7 +1576,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo FeatureZNode.encode(nodeContents), ZkVersion.MatchAnyVersion) val response = retryRequestUntilConnected(setRequest) - response.maybeThrow + response.maybeThrow() } def deleteFeatureZNode(): Unit = { @@ -1659,14 +1651,14 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo var createResponse = retryRequestUntilConnected(createRequest) if (throwIfPathExists && createResponse.resultCode == Code.NODEEXISTS) { - createResponse.maybeThrow + createResponse.maybeThrow() } else if (createResponse.resultCode == Code.NONODE) { createRecursive0(parentPath(path)) createResponse = retryRequestUntilConnected(createRequest) if (throwIfPathExists || createResponse.resultCode != Code.NODEEXISTS) - createResponse.maybeThrow + createResponse.maybeThrow() } else if (createResponse.resultCode != Code.NODEEXISTS) - createResponse.maybeThrow + createResponse.maybeThrow() } diff --git a/core/src/main/scala/org/apache/zookeeper/ZooKeeperMainWithTlsSupportForKafka.scala b/core/src/main/scala/org/apache/zookeeper/ZooKeeperMainWithTlsSupportForKafka.scala index eb29361b6ef8b..93674ad39e240 100644 --- a/core/src/main/scala/org/apache/zookeeper/ZooKeeperMainWithTlsSupportForKafka.scala +++ b/core/src/main/scala/org/apache/zookeeper/ZooKeeperMainWithTlsSupportForKafka.scala @@ -57,12 +57,12 @@ class ZooKeeperMainWithTlsSupportForKafka(args: Array[String], val zkClientConfi val args = co.getArgArray val cmd = co.getCommand if (args.length < 1) { - kafkaTlsUsage + kafkaTlsUsage() throw new MalformedCommandException("No command entered") } if (!ZooKeeperMain.commandMap.containsKey(cmd)) { - kafkaTlsUsage + kafkaTlsUsage() throw new CommandNotFoundException(s"Command not found $cmd") } super.processZKCmd(co) diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala index 28b1f069a2710..f5408920b2f2a 100644 --- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala +++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala @@ -42,7 +42,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness { val content = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8) val lineIter = content.split("\n").iterator assertTrue(lineIter.hasNext) - assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next) + assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next()) val nodeApiVersions = NodeApiVersions.create for (apiKey <- ApiKeys.values) { val apiVersion = nodeApiVersions.apiVersion(apiKey) @@ -54,10 +54,10 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness { val usableVersion = nodeApiVersions.latestUsableVersion(apiKey) val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator" assertTrue(lineIter.hasNext) - assertEquals(line, lineIter.next) + assertEquals(line, lineIter.next()) } assertTrue(lineIter.hasNext) - assertEquals(")", lineIter.next) + assertEquals(")", lineIter.next()) assertFalse(lineIter.hasNext) } } diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala index 900fc81f8edae..ea98f0b9c74d0 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala @@ -49,7 +49,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with @Before override def setUp(): Unit = { - super.setUp + super.setUp() TestUtils.waitUntilBrokerMetadataIsPropagated(servers) } diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index 6dff4294ed7a2..a1fc75bebf8b1 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -68,7 +68,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg @Test def testCreateDeleteTopics(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val topics = Seq("mytopic", "mytopic2", "mytopic3") val newTopics = Seq( new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava), @@ -155,7 +155,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg @Test def testAuthorizedOperations(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) // without includeAuthorizedOperations flag var result = client.describeCluster @@ -181,9 +181,8 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg assertEquals(expectedOperations, topicResult.authorizedOperations) } - def configuredClusterPermissions(): Set[AclOperation] = { + def configuredClusterPermissions: Set[AclOperation] = AclEntry.supportedOperations(ResourceType.CLUSTER) - } override def modifyConfigs(configs: Seq[Properties]): Unit = { super.modifyConfigs(configs) @@ -199,7 +198,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg } } - def createConfig(): util.Map[String, Object] = { + def createConfig: util.Map[String, Object] = { val config = new util.HashMap[String, Object] config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000") diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index bab4452d7733c..b181dc2efea37 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -82,14 +82,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testClose(): Unit = { - val client = Admin.create(createConfig()) + val client = Admin.create(createConfig) client.close() client.close() // double close has no effect } @Test def testListNodes(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val brokerStrs = brokerList.split(",").toList.sorted var nodeStrs: List[String] = null do { @@ -101,7 +101,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testCreateExistingTopicsThrowTopicExistsException(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val topic = "mytopic" val topics = Seq(topic) val newTopics = Seq(new NewTopic(topic, 1, 1.toShort)) @@ -118,7 +118,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testMetadataRefresh(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val topics = Seq("mytopic") val newTopics = Seq(new NewTopic("mytopic", 3, 3.toShort)) client.createTopics(newTopics.asJava).all.get() @@ -136,7 +136,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { */ @Test def testDescribeNonExistingTopic(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val existingTopic = "existing-topic" client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1.toShort)).asJava).all.get() @@ -151,7 +151,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testDescribeCluster(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val result = client.describeCluster val nodes = result.nodes.get() val clusterId = result.clusterId().get() @@ -169,7 +169,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testDescribeLogDirs(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val topic = "topic" val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1) val partitionsByBroker = leaderByPartition.groupBy { case (_, leaderId) => leaderId }.map { case (k, v) => @@ -197,7 +197,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testDescribeReplicaLogDirs(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val topic = "topic" val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1) val replicas = leaderByPartition.map { case (partition, brokerId) => @@ -214,7 +214,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testAlterReplicaLogDirs(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val topic = "topic" val tp = new TopicPartition(topic, 0) val randomNums = servers.map(server => server -> Random.nextInt(2)).toMap @@ -925,7 +925,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testAclOperations(): Unit = { val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) - client = Admin.create(createConfig()) + client = Admin.create(createConfig) assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).values(), classOf[SecurityDisabledException]) assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(acl)).all(), classOf[SecurityDisabledException]) @@ -939,7 +939,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { */ @Test def testDelayedClose(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val topics = Seq("mytopic", "mytopic2") val newTopics = topics.map(new NewTopic(_, 1, 1.toShort)) val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() @@ -956,7 +956,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { */ @Test def testForceClose(): Unit = { - val config = createConfig() + val config = createConfig config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}") client = Admin.create(config) // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be @@ -973,7 +973,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { */ @Test def testMinimumRequestTimeouts(): Unit = { - val config = createConfig() + val config = createConfig config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}") config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0") client = Admin.create(config) @@ -990,7 +990,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { */ @Test def testCallInFlightTimeouts(): Unit = { - val config = createConfig() + val config = createConfig config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000") val factory = new KafkaAdminClientTest.FailureInjectingTimeoutProcessorFactory() client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory) @@ -1008,7 +1008,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { */ @Test def testConsumerGroups(): Unit = { - val config = createConfig() + val config = createConfig client = Admin.create(config) try { // Verify that initially there are no consumer groups to list. @@ -1224,7 +1224,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testDeleteConsumerGroupOffsets(): Unit = { - val config = createConfig() + val config = createConfig client = Admin.create(config) try { val testTopicName = "test_topic" @@ -1662,7 +1662,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testListReassignmentsDoesNotShowNonReassigningPartitions(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) // Create topics val topic = "list-reassignments-no-reassignments" @@ -1678,7 +1678,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testListReassignmentsDoesNotShowDeletedPartitions(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val topic = "list-reassignments-no-reassignments" val tp = new TopicPartition(topic, 0) @@ -1802,7 +1802,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0") client.incrementalAlterConfigs(Map(broker0Resource -> Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "123"), @@ -1839,7 +1839,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testIncrementalAlterConfigsDeleteBrokerConfigs(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0") client.incrementalAlterConfigs(Map(broker0Resource -> Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "123"), @@ -2029,7 +2029,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(ApiVersion.latestVersion, logConfig.messageFormatVersion) } - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val invalidConfigs = Map[String, String](LogConfig.MessageFormatVersionProp -> null, LogConfig.CompressionTypeProp -> "producer").asJava val newTopic = new NewTopic(topic, 2, brokerCount.toShort) @@ -2057,7 +2057,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testDescribeConfigsForLog4jLogLevels(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val loggerConfig = describeBrokerLoggers() val rootLogLevel = loggerConfig.get(Log4jController.ROOT_LOGGER).value() @@ -2073,7 +2073,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test @Ignore // To be re-enabled once KAFKA-8779 is resolved def testIncrementalAlterConfigsForLog4jLogLevels(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val initialLoggerConfig = describeBrokerLoggers() val initialRootLogLevel = initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value() @@ -2137,7 +2137,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test @Ignore // To be re-enabled once KAFKA-8779 is resolved def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) // step 1 - configure root logger val initialRootLogLevel = LogLevelConfig.TRACE_LOG_LEVEL val alterRootLoggerEntry = Seq( @@ -2179,7 +2179,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test @Ignore // To be re-enabled once KAFKA-8779 is resolved def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val deleteRootLoggerEntry = Seq( new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, ""), AlterConfigOp.OpType.DELETE) ).asJavaCollection @@ -2190,7 +2190,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test @Ignore // To be re-enabled once KAFKA-8779 is resolved def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val validLoggerName = "kafka.server.KafkaRequestHandler" val expectedValidLoggerLogLevel = describeBrokerLoggers().get(validLoggerName) def assertLogLevelDidNotChange(): Unit = { @@ -2236,7 +2236,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test @Ignore // To be re-enabled once KAFKA-8779 is resolved def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val alterLogLevelsEntries = Seq( new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL) diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index 14fd5a58b04e8..4cbe7e58d635d 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -163,7 +163,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with val consumer = createConsumer() consumer.subscribe(List(topic).asJava) - verifyAuthenticationException(consumerGroupService.listGroups) + verifyAuthenticationException(consumerGroupService.listGroups()) consumerGroupService.close() } @@ -176,7 +176,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with consumer.subscribe(List(topic).asJava) verifyWithRetry(consumer.poll(Duration.ofMillis(1000))) - assertEquals(1, consumerGroupService.listConsumerGroups.size) + assertEquals(1, consumerGroupService.listConsumerGroups().size) consumerGroupService.close() } diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala index fa9f1ca47fd74..66a1a15284415 100644 --- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala @@ -76,6 +76,6 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { case e: TopicAuthorizationException => assertTrue(e.unauthorizedTopics.contains(topic)) case e: GroupAuthorizationException => assertEquals(group, e.groupId) } - confirmReauthenticationMetrics + confirmReauthenticationMetrics() } } diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala index eb98b65c1d90b..c4e291f3ab92b 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala @@ -96,7 +96,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu @Test def testAclOperations(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) assertEquals(7, getAcls(AclBindingFilter.ANY).size) @@ -117,7 +117,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu @Test def testAclOperations2(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val results = client.createAcls(List(acl2, acl2, transactionalIdAcl).asJava) assertEquals(Set(acl2, acl2, transactionalIdAcl), results.values.keySet.asScala) results.all.get() @@ -143,7 +143,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu @Test def testAclDescribe(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.ANY), AccessControlEntryFilter.ANY) @@ -170,7 +170,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu @Test def testAclDelete(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.MATCH), AccessControlEntryFilter.ANY) @@ -220,7 +220,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu //noinspection ScalaDeprecation - test explicitly covers clients using legacy / deprecated constructors @Test def testLegacyAclOpsNeverAffectOrReturnPrefixed(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) // <-- prefixed exists, but should never be returned. val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.MATCH), AccessControlEntryFilter.ANY) @@ -257,7 +257,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu @Test def testAttemptToCreateInvalidAcls(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "foobar", PatternType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) val emptyResourceNameAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "", PatternType.LITERAL), @@ -268,7 +268,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException]) } - override def configuredClusterPermissions(): Set[AclOperation] = { + override def configuredClusterPermissions: Set[AclOperation] = { Set(AclOperation.ALTER, AclOperation.CREATE, AclOperation.CLUSTER_ACTION, AclOperation.ALTER_CONFIGS, AclOperation.DESCRIBE, AclOperation.DESCRIBE_CONFIGS) } @@ -356,7 +356,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu @Test def testAclAuthorizationDenied(): Unit = { - client = Admin.create(createConfig()) + client = Admin.create(createConfig) // Test that we cannot create or delete ACLs when ALTER is denied. authorizationAdmin.addClusterAcl(DENY, ALTER) @@ -393,7 +393,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu val denyAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic2, PatternType.LITERAL), new AccessControlEntry("User:*", "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.DENY)) - client = Admin.create(createConfig()) + client = Admin.create(createConfig) client.createAcls(List(denyAcl).asJava, new CreateAclsOptions()).all().get() val topics = Seq(topic1, topic2) diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala index 2e02fefbcb074..4ab20fd411158 100644 --- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala @@ -214,7 +214,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { val testSemaphore = new Semaphore(0) SslAdminIntegrationTest.semaphore = Some(testSemaphore) - client = Admin.create(createConfig()) + client = Admin.create(createConfig) val results = client.createAcls(List(acl2, acl3).asJava).values assertEquals(Set(acl2, acl3), results.keySet().asScala) assertFalse(results.values.asScala.exists(_.isDone)) @@ -237,7 +237,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { } private def createAdminClient: Admin = { - val config = createConfig() + val config = createConfig config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "40000") val client = Admin.create(config) adminClients += client diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala index 03e609f59ce1d..ea8815ddcccaf 100644 --- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala +++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala @@ -390,7 +390,7 @@ object MiniKdc { | """.stripMargin println(infoMessage) - Exit.addShutdownHook("minikdc-shutdown-hook", miniKdc.stop) + Exit.addShutdownHook("minikdc-shutdown-hook", miniKdc.stop()) miniKdc } diff --git a/core/src/test/scala/kafka/utils/ExitTest.scala b/core/src/test/scala/kafka/utils/ExitTest.scala index 902acaf5bd1e6..b95a721cab275 100644 --- a/core/src/test/scala/kafka/utils/ExitTest.scala +++ b/core/src/test/scala/kafka/utils/ExitTest.scala @@ -89,7 +89,7 @@ class ExitTest { array(1) = array(1).asInstanceOf[Int] + 1 } try { - Exit.addShutdownHook(name, sideEffect) // by-name parameter, only invoked due to above shutdownHookAdder + Exit.addShutdownHook(name, sideEffect()) // by-name parameter, only invoked due to above shutdownHookAdder assertEquals(1, array(1)) assertEquals(name * array(1).asInstanceOf[Int], array(0).toString) Exit.addShutdownHook(name, array(1) = array(1).asInstanceOf[Int] + 1) // by-name parameter, only invoked due to above shutdownHookAdder @@ -109,7 +109,7 @@ class ExitTest { // mutate the first element array(0) = array(0) + name } - Exit.addShutdownHook(name, sideEffect) // by-name parameter, not invoked + Exit.addShutdownHook(name, sideEffect()) // by-name parameter, not invoked // make sure the first element wasn't mutated assertEquals(name, array(0)) Exit.addShutdownHook(name, sideEffect()) // by-name parameter, not invoked diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala index 58e16ef63af81..3f72afd240489 100644 --- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala +++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala @@ -80,7 +80,7 @@ object ReplicationQuotasTestRig { def run(config: ExperimentDef, journal: Journal, displayChartsOnScreen: Boolean): Unit = { val experiment = new Experiment() try { - experiment.setUp + experiment.setUp() experiment.run(config, journal, displayChartsOnScreen) journal.footer() } @@ -88,7 +88,7 @@ object ReplicationQuotasTestRig { case e: Exception => e.printStackTrace() } finally { - experiment.tearDown + experiment.tearDown() } } diff --git a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala index 1060c828436be..df68ca6a52c9f 100644 --- a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala @@ -56,7 +56,7 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup { props.map(KafkaConfig.fromProps) } - private def createAdminConfig():util.Map[String, Object] = { + private def createAdminConfig: util.Map[String, Object] = { val config = new util.HashMap[String, Object] config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) val securityProps: util.Map[Object, Object] = diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index d59d942ef83e3..63597537bbd40 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -74,7 +74,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // check if all replicas but the one that is shut down has deleted the log TestUtils.waitUntilTrue(() => servers.filter(s => s.config.brokerId != follower.config.brokerId) - .forall(_.getLogManager().getLog(topicPartition).isEmpty), "Replicas 0,1 have not deleted log.") + .forall(_.getLogManager.getLog(topicPartition).isEmpty), "Replicas 0,1 have not deleted log.") // ensure topic deletion is halted TestUtils.waitUntilTrue(() => zkClient.isTopicMarkedForDeletion(topic), "Admin path /admin/delete_topics/test path deleted even when a follower replica is down") @@ -122,7 +122,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // create the topic TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) // wait until replica log is created on every broker - TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined), + TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined), "Replicas for topic test not created.") val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 0)) assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) @@ -206,7 +206,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // create the topic TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) // wait until replica log is created on every broker - TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined), + TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined), "Replicas for topic test not created.") // shutdown a broker to make sure the following topic deletion will be suspended val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition) @@ -279,7 +279,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) // verify that new partition doesn't exist on any broker either TestUtils.waitUntilTrue(() => - servers.forall(_.getLogManager().getLog(newPartition).isEmpty), + servers.forall(_.getLogManager.getLog(newPartition).isEmpty), "Replica logs not for new partition [test,1] not deleted after delete topic is complete.") } @@ -298,7 +298,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) // verify that new partition doesn't exist on any broker either assertTrue("Replica logs not deleted after delete topic is complete", - servers.forall(_.getLogManager().getLog(newPartition).isEmpty)) + servers.forall(_.getLogManager.getLog(newPartition).isEmpty)) } @Test @@ -313,7 +313,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // re-create topic on same replicas TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) // check if all replica logs are created - TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined), + TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined), "Replicas for topic test not created.") } @@ -332,7 +332,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // verify delete topic path for test2 is removed from ZooKeeper TestUtils.verifyTopicDeletion(zkClient, "test2", 1, servers) // verify that topic test is untouched - TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined), + TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined), "Replicas for topic test not created") // test the topic path exists assertTrue("Topic test mistakenly deleted", zkClient.topicExists(topic)) @@ -403,7 +403,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // create the topic TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) // wait until replica log is created on every broker - TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined), + TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined), "Replicas for topic test not created") servers } @@ -428,7 +428,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { TestUtils.waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic), "Admin path /admin/delete_topics/%s path not deleted even if deleteTopic is disabled".format(topic)) // verify that topic test is untouched - assertTrue(servers.forall(_.getLogManager().getLog(topicPartition).isDefined)) + assertTrue(servers.forall(_.getLogManager.getLog(topicPartition).isDefined)) // test the topic path exists assertTrue("Topic path disappeared", zkClient.topicExists(topic)) // topic test should have a leader diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala index 1fb326792cbb6..7a3f3748b44c8 100644 --- a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala @@ -79,7 +79,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit () => servers.forall { server => partitionsAndAssignments.forall { partitionAndAssignment => - server.getLogManager().getLog(partitionAndAssignment._1).isDefined + server.getLogManager.getLog(partitionAndAssignment._1).isDefined } }, "Replicas for topic test not created" diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index 4e9f1f1d8c01f..b95485a1a9037 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -47,14 +47,14 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { @Before override def setUp(): Unit = { - super.setUp + super.setUp() servers = Seq.empty[KafkaServer] } @After override def tearDown(): Unit = { TestUtils.shutdownServers(servers) - super.tearDown + super.tearDown() } @Test @@ -297,7 +297,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { "failed to get expected partition state after partition reassignment") TestUtils.waitUntilTrue(() => zkClient.getFullReplicaAssignmentForTopics(Set(tp.topic)) == reassignment, "failed to get updated partition assignment on topic znode after partition reassignment") - TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress(), + TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress, "failed to remove reassign partitions path after completion") val updatedTimerCount = timer(metricName).count @@ -319,7 +319,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { zkClient.setOrCreatePartitionReassignment(reassignment, controller.kafkaController.controllerContext.epochZkVersion) waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1, "failed to get expected partition state during partition reassignment with offline replica") - TestUtils.waitUntilTrue(() => zkClient.reassignPartitionsInProgress(), + TestUtils.waitUntilTrue(() => zkClient.reassignPartitionsInProgress, "partition reassignment path should remain while reassignment in progress") } @@ -342,7 +342,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { "failed to get expected partition state after partition reassignment") TestUtils.waitUntilTrue(() => zkClient.getFullReplicaAssignmentForTopics(Set(tp.topic)) == reassignment, "failed to get updated partition assignment on topic znode after partition reassignment") - TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress(), + TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress, "failed to remove reassign partitions path after completion") } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala index a16aef7ea14a1..72de4a1fb60a2 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala @@ -159,7 +159,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest val responseFutures = new ConcurrentHashMap[GroupMember, Future[R]]() def setUpCallback(member: GroupMember): C = { - val responsePromise = Promise[R] + val responsePromise = Promise[R]() val responseFuture = responsePromise.future responseFutures.put(member, responseFuture) responseCallback(responsePromise) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 33ee73052f1b4..d8c4ce97867b0 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -3745,35 +3745,35 @@ class GroupCoordinatorTest { } private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = { - val responsePromise = Promise[JoinGroupResult] + val responsePromise = Promise[JoinGroupResult]() val responseFuture = responsePromise.future val responseCallback: JoinGroupCallback = responsePromise.success (responseFuture, responseCallback) } private def setupSyncGroupCallback: (Future[SyncGroupResult], SyncGroupCallback) = { - val responsePromise = Promise[SyncGroupResult] + val responsePromise = Promise[SyncGroupResult]() val responseFuture = responsePromise.future val responseCallback: SyncGroupCallback = responsePromise.success (responseFuture, responseCallback) } private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = { - val responsePromise = Promise[HeartbeatCallbackParams] + val responsePromise = Promise[HeartbeatCallbackParams]() val responseFuture = responsePromise.future val responseCallback: HeartbeatCallback = error => responsePromise.success(error) (responseFuture, responseCallback) } private def setupCommitOffsetsCallback: (Future[CommitOffsetCallbackParams], CommitOffsetCallback) = { - val responsePromise = Promise[CommitOffsetCallbackParams] + val responsePromise = Promise[CommitOffsetCallbackParams]() val responseFuture = responsePromise.future val responseCallback: CommitOffsetCallback = offsets => responsePromise.success(offsets) (responseFuture, responseCallback) } private def setupLeaveGroupCallback: (Future[LeaveGroupResult], LeaveGroupCallback) = { - val responsePromise = Promise[LeaveGroupResult] + val responsePromise = Promise[LeaveGroupResult]() val responseFuture = responsePromise.future val responseCallback: LeaveGroupCallback = result => responsePromise.success(result) (responseFuture, responseCallback) diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala index 066725cea3d9d..003a2c4b48a1b 100644 --- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala @@ -62,7 +62,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with metricName.foreach(KafkaYammerMetrics.defaultRegistry.removeMetric) } - super.setUp + super.setUp() } /* diff --git a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala index 8ea5af789e8f6..a1b2386163af7 100644 --- a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala +++ b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala @@ -31,7 +31,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness { @Test def testDefaultKafkaConfig(): Unit = { - assert(servers.head.getLogManager().initialDefaultConfig.minInSyncReplicas == 5) + assert(servers.head.getLogManager.initialDefaultConfig.minInSyncReplicas == 5) } } diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 2f16abaaa28c9..ca59380fa7e23 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -57,7 +57,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] val random = new Random() - val topic = "topic" + random.nextLong + val topic = "topic" + random.nextLong() val partitionId = 0 val kafkaApisLogger = Logger.getLogger(classOf[kafka.server.KafkaApis]) @@ -112,7 +112,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { // create topic with 1 partition, 2 replicas, one on each broker TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers) - verifyUncleanLeaderElectionEnabled + verifyUncleanLeaderElectionEnabled() } @Test @@ -123,7 +123,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { // create topic with 1 partition, 2 replicas, one on each broker TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers) - verifyUncleanLeaderElectionDisabled + verifyUncleanLeaderElectionDisabled() } @Test @@ -138,7 +138,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { topicProps.put("unclean.leader.election.enable", "true") TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers, topicProps) - verifyUncleanLeaderElectionEnabled + verifyUncleanLeaderElectionEnabled() } @Test @@ -153,7 +153,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { topicProps.put("unclean.leader.election.enable", "false") TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers, topicProps) - verifyUncleanLeaderElectionDisabled + verifyUncleanLeaderElectionDisabled() } @Test @@ -277,7 +277,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { val brokerList = TestUtils.bootstrapServers(servers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) // Don't rely on coordinator as it may be down when this method is called val consumer = TestUtils.createConsumer(brokerList, - groupId = "group" + random.nextLong, + groupId = "group" + random.nextLong(), enableAutoCommit = false, valueDeserializer = new StringDeserializer) try { diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index a2b4b0875d984..548b7b6f06bad 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -182,7 +182,7 @@ class LogCleanerTest { cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset)) assertTrue("Cleaned segment file should be trimmed to its real size.", - log.logSegments.iterator.next.log.channel().size() < originalMaxFileSize) + log.logSegments.iterator.next().log.channel.size < originalMaxFileSize) } @Test diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index d0c44d0243c03..d6342f346b25e 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -334,7 +334,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { val acls1 = Set[Acl](acl2) simpleAclAuthorizer.addAcls(acls1, resource1) - zkClient.deleteAclChangeNotifications + zkClient.deleteAclChangeNotifications() val authorizer = new SimpleAclAuthorizer try { authorizer.configure(config.originals) diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala index 89d2095e6edbb..163b1fb366c76 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala @@ -362,7 +362,7 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { val acls1 = Set(acl2) addAcls(aclAuthorizer, acls1, resource1) - zkClient.deleteAclChangeNotifications + zkClient.deleteAclChangeNotifications() val authorizer = new AclAuthorizer try { authorizer.configure(config.originals) @@ -1087,9 +1087,8 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { op != AclOperation.ANY && op != AclOperation.UNKNOWN } - private def prepareDefaultConfig(): String = { + private def prepareDefaultConfig: String = prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere")) - } private def prepareConfig(lines : Array[String]): String = { val file = File.createTempFile("kafkatest", ".properties") diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala index 7bffba410b3bf..038d2927aa05b 100644 --- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala +++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala @@ -99,7 +99,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness { def testCreateToken(): Unit = { val config = KafkaConfig.fromProps(props) val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient) - tokenManager.startup + tokenManager.startup() tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack) val issueTime = time.milliseconds @@ -116,7 +116,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness { def testRenewToken(): Unit = { val config = KafkaConfig.fromProps(props) val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient) - tokenManager.startup + tokenManager.startup() tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack) val issueTime = time.milliseconds @@ -164,7 +164,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness { def testExpireToken(): Unit = { val config = KafkaConfig.fromProps(props) val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient) - tokenManager.startup + tokenManager.startup() tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack) val issueTime = time.milliseconds @@ -199,7 +199,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness { def testRemoveTokenHmac():Unit = { val config = KafkaConfig.fromProps(props) val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient) - tokenManager.startup + tokenManager.startup() tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack) val issueTime = time.milliseconds @@ -240,7 +240,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness { var hostSession = new Session(owner1, InetAddress.getByName("192.168.1.1")) val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient) - tokenManager.startup + tokenManager.startup() //create tokens tokenManager.createToken(owner1, List(renewer1, renewer2), 1 * 60 * 60 * 1000L, createTokenResultCallBack) @@ -253,7 +253,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness { tokenManager.createToken(owner4, List(owner1, renewer4), 2 * 60 * 60 * 1000L, createTokenResultCallBack) - assert(tokenManager.getAllTokenInformation().size == 4 ) + assert(tokenManager.getAllTokenInformation.size == 4 ) //get tokens non-exiting owner var tokens = getTokens(tokenManager, aclAuthorizer, hostSession, owner1, List(SecurityUtils.parseKafkaPrincipal("User:unknown"))) @@ -330,18 +330,18 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness { def testPeriodicTokenExpiry(): Unit = { val config = KafkaConfig.fromProps(props) val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient) - tokenManager.startup + tokenManager.startup() //create tokens tokenManager.createToken(owner, renewer, 1 * 60 * 60 * 1000L, createTokenResultCallBack) tokenManager.createToken(owner, renewer, 1 * 60 * 60 * 1000L, createTokenResultCallBack) tokenManager.createToken(owner, renewer, 2 * 60 * 60 * 1000L, createTokenResultCallBack) tokenManager.createToken(owner, renewer, 2 * 60 * 60 * 1000L, createTokenResultCallBack) - assert(tokenManager.getAllTokenInformation().size == 4 ) + assert(tokenManager.getAllTokenInformation.size == 4 ) time.sleep(2 * 60 * 60 * 1000L) tokenManager.expireTokens() - assert(tokenManager.getAllTokenInformation().size == 2 ) + assert(tokenManager.getAllTokenInformation.size == 2 ) } diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala index 3a8c34cdd00b9..a5025351e16a7 100644 --- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala @@ -36,7 +36,7 @@ class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest { super.setUp() } - def createAdminConfig():util.Map[String, Object] = { + def createAdminConfig: util.Map[String, Object] = { val config = new util.HashMap[String, Object] config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) val securityProps: util.Map[Object, Object] = diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala index 47b677122564c..96115d6adfc2b 100644 --- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala @@ -56,7 +56,7 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup { props.map(KafkaConfig.fromProps) } - private def createAdminConfig():util.Map[String, Object] = { + private def createAdminConfig: util.Map[String, Object] = { val config = new util.HashMap[String, Object] config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) val securityProps: util.Map[Object, Object] = diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala index 70ed789f64938..12c2138fbc4f7 100644 --- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala @@ -44,7 +44,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest super.setUp() } - def createAdminConfig():util.Map[String, Object] = { + def createAdminConfig: util.Map[String, Object] = { val config = new util.HashMap[String, Object] config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) val securityProps: util.Map[Object, Object] = diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index c745c92f4b6ef..d8ecd787bb815 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -195,7 +195,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @Test def testConfigChangeOnNonExistingTopic(): Unit = { - val topic = TestUtils.tempTopic + val topic = TestUtils.tempTopic() try { val logProps = new Properties() logProps.put(FlushMessagesProp, 10000: java.lang.Integer) @@ -208,7 +208,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @Test def testConfigChangeOnNonExistingTopicWithAdminClient(): Unit = { - val topic = TestUtils.tempTopic + val topic = TestUtils.tempTopic() val admin = createAdminClient() try { val resource = new ConfigResource(ConfigResource.Type.TOPIC, topic) diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala index 43cff1eaf6fec..c876b9056d2fc 100755 --- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala @@ -52,7 +52,7 @@ class FetchSessionTest { assertEquals(sessionIds.size, cache.size) } - private def dummyCreate(size: Int)() = { + private def dummyCreate(size: Int): FetchSession.CACHE_MAP = { val cacheMap = new FetchSession.CACHE_MAP(size) for (i <- 0 until size) { cacheMap.add(new CachedPartition("test", i)) diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala index 8169c691e1163..e4bba50f41276 100644 --- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala +++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala @@ -191,7 +191,7 @@ class FinalizedFeatureChangeListenerTest extends ZooKeeperTestHarness { */ @Test def testNotificationFailureDueToFeatureIncompatibility(): Unit = { - createSupportedFeatures + createSupportedFeatures() val initialFinalizedFeatures = createFinalizedFeatures() val listener = createListener(Some(initialFinalizedFeatures)) diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index fed489854c270..153a2e2fc5e71 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -60,7 +60,7 @@ class HighwatermarkPersistenceTest { // create kafka scheduler val scheduler = new KafkaScheduler(2) - scheduler.startup + scheduler.startup() val metrics = new Metrics val time = new MockTime // create replica manager @@ -110,7 +110,7 @@ class HighwatermarkPersistenceTest { EasyMock.replay(zkClient) // create kafka scheduler val scheduler = new KafkaScheduler(2) - scheduler.startup + scheduler.startup() val metrics = new Metrics val time = new MockTime // create replica manager @@ -178,7 +178,7 @@ class HighwatermarkPersistenceTest { } private def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = { - replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read.getOrElse( + replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read().getOrElse( new TopicPartition(topic, partition), 0L) } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index cda4d49f5fc87..52edc71320e06 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -575,23 +575,23 @@ class KafkaConfigTest { @Test def testFromPropsInvalid(): Unit = { - def getBaseProperties(): Properties = { + def baseProperties: Properties = { val validRequiredProperties = new Properties() validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") validRequiredProperties } // to ensure a basis is valid - bootstraps all needed validation - KafkaConfig.fromProps(getBaseProperties()) + KafkaConfig.fromProps(baseProperties) - KafkaConfig.configNames().foreach(name => { + KafkaConfig.configNames.foreach { name => name match { case KafkaConfig.ZkConnectProp => // ignore string - case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean") - case KafkaConfig.ZkMaxInFlightRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.ZkSslClientEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean") + case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean") + case KafkaConfig.ZkMaxInFlightRequestsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.ZkSslClientEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean") case KafkaConfig.ZkClientCnxnSocketProp => //ignore string case KafkaConfig.ZkSslKeyStoreLocationProp => //ignore string case KafkaConfig.ZkSslKeyStorePasswordProp => //ignore string @@ -603,115 +603,115 @@ class KafkaConfigTest { case KafkaConfig.ZkSslEnabledProtocolsProp => //ignore string case KafkaConfig.ZkSslCipherSuitesProp => //ignore string case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => //ignore string - case KafkaConfig.ZkSslCrlEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean") - case KafkaConfig.ZkSslOcspEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean") - - case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.NumReplicaAlterLogDirsThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.QueuedMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ZkSslCrlEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean") + case KafkaConfig.ZkSslOcspEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean") + + case KafkaConfig.BrokerIdProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.NumReplicaAlterLogDirsThreadsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.QueuedMaxBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") case KafkaConfig.AuthorizerClassNameProp => //ignore string case KafkaConfig.CreateTopicPolicyClassNameProp => //ignore string - case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.PortProp => assertPropertyInvalid(baseProperties, name, "not_a_number") case KafkaConfig.HostNameProp => // ignore string case KafkaConfig.AdvertisedHostNameProp => //ignore string - case KafkaConfig.AdvertisedPortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.SocketSendBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.SocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.AdvertisedPortProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.SocketSendBufferBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.SocketReceiveBufferBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number") case KafkaConfig.MaxConnectionsPerIpOverridesProp => - assertPropertyInvalid(getBaseProperties(), name, "127.0.0.1:not_a_number") - case KafkaConfig.ConnectionsMaxIdleMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.FailedAuthenticationDelayMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1") + assertPropertyInvalid(baseProperties, name, "127.0.0.1:not_a_number") + case KafkaConfig.ConnectionsMaxIdleMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.FailedAuthenticationDelayMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1") - case KafkaConfig.NumPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.NumPartitionsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case KafkaConfig.LogDirsProp => // ignore string case KafkaConfig.LogDirProp => // ignore string - case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", Records.LOG_OVERHEAD - 1) - - case KafkaConfig.LogRollTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.LogRollTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - - case KafkaConfig.LogRetentionTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.LogRetentionTimeMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.LogRetentionTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - - case KafkaConfig.LogRetentionBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogCleanupIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.LogCleanupPolicyProp => assertPropertyInvalid(getBaseProperties(), name, "unknown_policy", "0") - case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogCleanerDedupeBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "1024") - case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean") - case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogCleanerMinCompactionLagMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogCleanerMaxCompactionLagMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "3") - case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogMessageTimestampDifferenceMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") - case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.ControllerSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2") - case KafkaConfig.ReplicaSocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaFetchMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaFetchResponseMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", Records.LOG_OVERHEAD - 1) + + case KafkaConfig.LogRollTimeMillisProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.LogRollTimeHoursProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + + case KafkaConfig.LogRetentionTimeMillisProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.LogRetentionTimeMinutesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.LogRetentionTimeHoursProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + + case KafkaConfig.LogRetentionBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.LogCleanupIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.LogCleanupPolicyProp => assertPropertyInvalid(baseProperties, name, "unknown_policy", "0") + case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.LogCleanerDedupeBufferSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "1024") + case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean") + case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.LogCleanerMinCompactionLagMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.LogCleanerMaxCompactionLagMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "3") + case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.LogMessageTimestampDifferenceMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0") + case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.ControllerSocketTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-2") + case KafkaConfig.ReplicaSocketReceiveBufferBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.ReplicaFetchMaxBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.ReplicaFetchResponseMaxBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number") case KafkaConfig.ReplicaSelectorClassProp => // Ignore string - case KafkaConfig.NumReplicaFetchersProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.AutoLeaderRebalanceEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") - case KafkaConfig.LeaderImbalancePerBrokerPercentageProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.UncleanLeaderElectionEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") - case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") - case KafkaConfig.GroupMinSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.GroupMaxSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.GroupInitialRebalanceDelayMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.GroupMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-1") - case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetsTopicPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetsTopicSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetsTopicCompressionCodecProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1") - case KafkaConfig.OffsetsRetentionMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2") - case KafkaConfig.TransactionalIdExpirationMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2") - case KafkaConfig.TransactionsMaxTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2") - case KafkaConfig.TransactionsTopicMinISRProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2") - case KafkaConfig.TransactionsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2") - case KafkaConfig.TransactionsTopicPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2") - case KafkaConfig.TransactionsTopicSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2") - case KafkaConfig.TransactionsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0", "-2") - case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") - - case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") - case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") + case KafkaConfig.NumReplicaFetchersProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.AutoLeaderRebalanceEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0") + case KafkaConfig.LeaderImbalancePerBrokerPercentageProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.UncleanLeaderElectionEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0") + case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0") + case KafkaConfig.GroupMinSessionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.GroupMaxSessionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.GroupInitialRebalanceDelayMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.GroupMaxSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-1") + case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.OffsetsTopicPartitionsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.OffsetsTopicSegmentBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.OffsetsTopicCompressionCodecProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1") + case KafkaConfig.OffsetsRetentionMinutesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-2") + case KafkaConfig.TransactionalIdExpirationMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case KafkaConfig.TransactionsMaxTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case KafkaConfig.TransactionsTopicMinISRProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case KafkaConfig.TransactionsLoadBufferSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case KafkaConfig.TransactionsTopicPartitionsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case KafkaConfig.TransactionsTopicSegmentBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case KafkaConfig.TransactionsTopicReplicationFactorProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") + case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0") + + case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") + case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") case KafkaConfig.MetricReporterClassesProp => // ignore string case KafkaConfig.MetricRecordingLevelProp => // ignore string case KafkaConfig.RackProp => // ignore string @@ -763,33 +763,33 @@ class KafkaConfigTest { case KafkaConfig.PasswordEncoderOldSecretProp => case KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp => case KafkaConfig.PasswordEncoderCipherAlgorithmProp => - case KafkaConfig.PasswordEncoderKeyLengthProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") - case KafkaConfig.PasswordEncoderIterationsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") + case KafkaConfig.PasswordEncoderKeyLengthProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") + case KafkaConfig.PasswordEncoderIterationsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") //delegation token configs case KafkaConfig.DelegationTokenMasterKeyProp => // ignore - case KafkaConfig.DelegationTokenMaxLifeTimeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.DelegationTokenMaxLifeTimeProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") //Kafka Yammer metrics reporter configs case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore - case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1") + case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1") } - }) + } } @Test def testDynamicLogConfigs(): Unit = { - def getBaseProperties(): Properties = { + def baseProperties: Properties = { val validRequiredProperties = new Properties() validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") validRequiredProperties } - val props = getBaseProperties() + val props = baseProperties val config = KafkaConfig.fromProps(props) def assertDynamic(property: String, value: Any, accessor: () => Any): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index cd76bca0831d9..484e1637c70d8 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -110,9 +110,9 @@ class LogRecoveryTest extends ZooKeeperTestHarness { "Failed to update high watermark for follower after timeout") servers.foreach(_.replicaManager.checkpointHighWatermarks()) - val leaderHW = hwFile1.read.getOrElse(topicPartition, 0L) + val leaderHW = hwFile1.read().getOrElse(topicPartition, 0L) assertEquals(numMessages, leaderHW) - val followerHW = hwFile2.read.getOrElse(topicPartition, 0L) + val followerHW = hwFile2.read().getOrElse(topicPartition, 0L) assertEquals(numMessages, followerHW) } @@ -120,7 +120,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { def testHWCheckpointWithFailuresSingleLogSegment(): Unit = { var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) - assertEquals(0L, hwFile1.read.getOrElse(topicPartition, 0L)) + assertEquals(0L, hwFile1.read().getOrElse(topicPartition, 0L)) sendMessages(1) Thread.sleep(1000) @@ -128,7 +128,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { // kill the server hosting the preferred replica server1.shutdown() - assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L)) + assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L)) // check if leader moves to the other server leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader)) @@ -143,7 +143,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { assertTrue("Leader must remain on broker 1, in case of ZooKeeper session expiration it can move to broker 0", leader == 0 || leader == 1) - assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L)) + assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L)) /** We plan to shutdown server2 and transfer the leadership to server1. * With unclean leader election turned off, a prerequisite for the successful leadership transition * is that server1 has caught up on the topicPartition, and has joined the ISR. @@ -155,7 +155,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet server2.shutdown() - assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L)) + assertEquals(hw, hwFile2.read().getOrElse(topicPartition, 0L)) server2.startup() updateProducer() @@ -172,8 +172,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness { "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(_.shutdown()) - assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L)) - assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L)) + assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L)) + assertEquals(hw, hwFile2.read().getOrElse(topicPartition, 0L)) } @Test @@ -186,9 +186,9 @@ class LogRecoveryTest extends ZooKeeperTestHarness { "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(_.shutdown()) - val leaderHW = hwFile1.read.getOrElse(topicPartition, 0L) + val leaderHW = hwFile1.read().getOrElse(topicPartition, 0L) assertEquals(hw, leaderHW) - val followerHW = hwFile2.read.getOrElse(topicPartition, 0L) + val followerHW = hwFile2.read().getOrElse(topicPartition, 0L) assertEquals(hw, followerHW) } @@ -206,8 +206,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness { // kill the server hosting the preferred replica server1.shutdown() server2.shutdown() - assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L)) - assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L)) + assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L)) + assertEquals(hw, hwFile2.read().getOrElse(topicPartition, 0L)) server2.startup() updateProducer() @@ -215,14 +215,14 @@ class LogRecoveryTest extends ZooKeeperTestHarness { leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader)) assertEquals("Leader must move to broker 1", 1, leader) - assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L)) + assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L)) // bring the preferred replica back server1.startup() updateProducer() - assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L)) - assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L)) + assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L)) + assertEquals(hw, hwFile2.read().getOrElse(topicPartition, 0L)) sendMessages(2) hw += 2 @@ -236,8 +236,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness { "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(_.shutdown()) - assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L)) - assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L)) + assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L)) + assertEquals(hw, hwFile2.read().getOrElse(topicPartition, 0L)) } private def sendMessages(n: Int): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index c2f9033ace6a9..f1b6a9aa56183 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -72,7 +72,7 @@ class ReplicaAlterLogDirsThreadTest { val addedPartitions = thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L))) assertEquals(Set.empty, addedPartitions) - assertEquals(0, thread.partitionCount()) + assertEquals(0, thread.partitionCount) assertEquals(None, thread.fetchState(t1p0)) } @@ -132,18 +132,18 @@ class ReplicaAlterLogDirsThreadTest { // Initially we add the partition with an older epoch which results in an error thread.addPartitions(Map(t1p0 -> offsetAndEpoch(fetchOffset = 0L, leaderEpoch - 1))) assertTrue(thread.fetchState(t1p0).isDefined) - assertEquals(1, thread.partitionCount()) + assertEquals(1, thread.partitionCount) thread.doWork() assertTrue(failedPartitions.contains(t1p0)) assertEquals(None, thread.fetchState(t1p0)) - assertEquals(0, thread.partitionCount()) + assertEquals(0, thread.partitionCount) // Next we update the epoch and assert that we can continue thread.addPartitions(Map(t1p0 -> offsetAndEpoch(fetchOffset = 0L, leaderEpoch))) assertEquals(Some(leaderEpoch), thread.fetchState(t1p0).map(_.currentLeaderEpoch)) - assertEquals(1, thread.partitionCount()) + assertEquals(1, thread.partitionCount) val requestData = new FetchRequest.PartitionData(0L, 0L, config.replicaFetchMaxBytes, Optional.of(leaderEpoch)) @@ -162,7 +162,7 @@ class ReplicaAlterLogDirsThreadTest { assertFalse(failedPartitions.contains(t1p0)) assertEquals(None, thread.fetchState(t1p0)) - assertEquals(0, thread.partitionCount()) + assertEquals(0, thread.partitionCount) } @Test @@ -220,12 +220,12 @@ class ReplicaAlterLogDirsThreadTest { thread.addPartitions(Map(t1p0 -> offsetAndEpoch(fetchOffset = 0L, leaderEpoch))) assertTrue(thread.fetchState(t1p0).isDefined) - assertEquals(1, thread.partitionCount()) + assertEquals(1, thread.partitionCount) thread.doWork() assertEquals(None, thread.fetchState(t1p0)) - assertEquals(0, thread.partitionCount()) + assertEquals(0, thread.partitionCount) } private def mockFetchFromCurrentLog(topicPartition: TopicPartition, diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index f6f7ff129823e..a3a83bc8633f9 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -69,9 +69,9 @@ class ReplicaFetchTest extends ZooKeeperTestHarness { var result = true for (topic <- List(topic1, topic2)) { val tp = new TopicPartition(topic, partition) - val expectedOffset = brokers.head.getLogManager().getLog(tp).get.logEndOffset + val expectedOffset = brokers.head.getLogManager.getLog(tp).get.logEndOffset result = result && expectedOffset > 0 && brokers.forall { item => - expectedOffset == item.getLogManager().getLog(tp).get.logEndOffset + expectedOffset == item.getLogManager.getLog(tp).get.logEndOffset } } result diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala index 24b33e8a61835..e8518ec229e54 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala @@ -134,7 +134,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness { //Check that throttled config correctly migrated to the new brokers (106 to 107).foreach { brokerId => - assertEquals(throttle, brokerFor(brokerId).quotaManagers.follower.upperBound()) + assertEquals(throttle, brokerFor(brokerId).quotaManagers.follower.upperBound) } if (!leaderThrottle) { (0 to 2).foreach { partition => assertTrue(brokerFor(106).quotaManagers.follower.isThrottled(tp(partition))) } diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 6bf49e3f54871..436dc9e85d69c 100755 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -118,7 +118,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness { producer.close() server.shutdown() CoreUtils.delete(server.config.logDirs) - verifyNonDaemonThreadsStatus + verifyNonDaemonThreadsStatus() } @Test @@ -131,7 +131,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness { server.shutdown() server.awaitShutdown() CoreUtils.delete(server.config.logDirs) - verifyNonDaemonThreadsStatus + verifyNonDaemonThreadsStatus() } @Test @@ -177,7 +177,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness { server.awaitShutdown() } CoreUtils.delete(server.config.logDirs) - verifyNonDaemonThreadsStatus + verifyNonDaemonThreadsStatus() } private[this] def isNonDaemonKafkaThread(t: Thread): Boolean = { diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala index 3d636cda9fd0a..cc8eb097a9831 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala @@ -86,8 +86,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness producer.send(new ProducerRecord(topic, 0, null, msg)).get //The message should have epoch 0 stamped onto it in both leader and follower - assertEquals(0, latestRecord(leader).partitionLeaderEpoch()) - assertEquals(0, latestRecord(follower).partitionLeaderEpoch()) + assertEquals(0, latestRecord(leader).partitionLeaderEpoch) + assertEquals(0, latestRecord(follower).partitionLeaderEpoch) //Both leader and follower should have recorded Epoch 0 at Offset 0 assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries) @@ -452,16 +452,16 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = -1) } - private def leader(): KafkaServer = { + private def leader: KafkaServer = { assertEquals(2, brokers.size) val leaderId = zkClient.getLeaderForPartition(new TopicPartition(topic, 0)).get - brokers.filter(_.config.brokerId == leaderId)(0) + brokers.filter(_.config.brokerId == leaderId).head } - private def follower(): KafkaServer = { + private def follower: KafkaServer = { assertEquals(2, brokers.size) val leader = zkClient.getLeaderForPartition(new TopicPartition(topic, 0)).get - brokers.filter(_.config.brokerId != leader)(0) + brokers.filter(_.config.brokerId != leader).head } private def createBroker(id: Int, enableUncleanLeaderElection: Boolean = false): KafkaServer = { diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala index 8b35149587db4..29da8d5eb8495 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala @@ -239,9 +239,9 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging { var result = true for (topic <- List(topic1, topic2)) { val tp = new TopicPartition(topic, 0) - val leo = broker.getLogManager().getLog(tp).get.logEndOffset + val leo = broker.getLogManager.getLog(tp).get.logEndOffset result = result && leo > 0 && brokers.forall { broker => - broker.getLogManager().getLog(tp).get.logSegments.iterator.forall { segment => + broker.getLogManager.getLog(tp).get.logSegments.iterator.forall { segment => if (segment.read(minOffset, Integer.MAX_VALUE) == null) { false } else { diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 7bfbeab028e99..0f166080a051a 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -115,7 +115,7 @@ class DumpLogSegmentsTest { // only increment the offset if it's not a batch if (isBatch(index)) { assertTrue(s"Not a valid batch-level message record: $line", line.startsWith(s"baseOffset: $offset lastOffset: ")) - batch = batchIterator.next + batch = batchIterator.next() } else { assertTrue(s"Not a valid message record: $line", line.startsWith(s"${DumpLogSegments.RecordIndent} offset: $offset")) if (checkKeysAndValues) { diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala index cfbbe0f01a4c4..de3dfd7a03add 100644 --- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala +++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala @@ -89,7 +89,7 @@ class MockScheduler(val time: Time) extends Scheduler { private def poll(predicate: MockTask => Boolean): Option[MockTask] = { this synchronized { if (tasks.nonEmpty && predicate.apply(tasks.head)) - Some(tasks.dequeue) + Some(tasks.dequeue()) else None } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 9123967224c7d..76169b9ea23c3 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -467,14 +467,14 @@ object TestUtils extends Logging { var length = 0 while(expected.hasNext && actual.hasNext) { length += 1 - assertEquals(expected.next, actual.next) + assertEquals(expected.next(), actual.next()) } // check if the expected iterator is longer if (expected.hasNext) { var length1 = length while (expected.hasNext) { - expected.next + expected.next() length1 += 1 } assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true) @@ -484,7 +484,7 @@ object TestUtils extends Logging { if (actual.hasNext) { var length2 = length while (actual.hasNext) { - actual.next + actual.next() length2 += 1 } assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true) @@ -498,8 +498,8 @@ object TestUtils extends Logging { def checkLength[T](s1: Iterator[T], expectedLength:Int): Unit = { var n = 0 while (s1.hasNext) { - n+=1 - s1.next + n += 1 + s1.next() } assertEquals(expectedLength, n) } @@ -524,7 +524,7 @@ object TestUtils extends Logging { while (true) { if (cur == null) { if (topIterator.hasNext) - cur = topIterator.next + cur = topIterator.next() else return false } @@ -536,7 +536,7 @@ object TestUtils extends Logging { throw new RuntimeException("should not reach here") } - def next() : T = cur.next + def next() : T = cur.next() } } diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala index 02a0403db3ca5..6d7f9de01ced3 100644 --- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala @@ -188,7 +188,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware assertEquals(props, savedProps) } - TestUtils.assertConcurrent("Concurrent topic creation failed", Seq(() => createTopic, () => createTopic), + TestUtils.assertConcurrent("Concurrent topic creation failed", Seq(() => createTopic(), () => createTopic()), JTestUtils.DEFAULT_MAX_WAIT_MS.toInt) } diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 10157f47953b5..67445004c0f8c 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -550,7 +550,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { }) // create acl paths - zkClient.createAclPaths + zkClient.createAclPaths() ZkAclStore.stores.foreach(store => { assertTrue(zkClient.pathExists(store.aclPath)) @@ -1184,7 +1184,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def testClusterIdMethods(): Unit = { - val clusterId = CoreUtils.generateUuidAsBase64 + val clusterId = CoreUtils.generateUuidAsBase64() zkClient.createOrGetClusterId(clusterId) assertEquals(clusterId, zkClient.getClusterId.getOrElse(fail("No cluster id found"))) @@ -1193,7 +1193,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def testBrokerSequenceIdMethods(): Unit = { val sequenceId = zkClient.generateBrokerSequenceId() - assertEquals(sequenceId + 1, zkClient.generateBrokerSequenceId) + assertEquals(sequenceId + 1, zkClient.generateBrokerSequenceId()) } @Test @@ -1235,7 +1235,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertFalse(zkClient.pathExists(DelegationTokensZNode.path)) assertFalse(zkClient.pathExists(DelegationTokenChangeNotificationZNode.path)) - zkClient.createDelegationTokenPaths + zkClient.createDelegationTokenPaths() assertTrue(zkClient.pathExists(DelegationTokensZNode.path)) assertTrue(zkClient.pathExists(DelegationTokenChangeNotificationZNode.path)) diff --git a/gradle.properties b/gradle.properties index 4f41e8a44245f..613a8fa50dbaa 100644 --- a/gradle.properties +++ b/gradle.properties @@ -21,6 +21,6 @@ group=org.apache.kafka # - tests/kafkatest/version.py (variable DEV_VERSION) # - kafka-merge-pr.py version=2.7.0-SNAPSHOT -scalaVersion=2.13.2 +scalaVersion=2.13.3 task=build org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 57f95e3591b38..6c861fc15a547 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -28,7 +28,7 @@ ext { // Add Scala version def defaultScala212Version = '2.12.11' -def defaultScala213Version = '2.13.2' +def defaultScala213Version = '2.13.3' if (hasProperty('scalaVersion')) { if (scalaVersion == '2.12') { versions["scala"] = defaultScala212Version diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala index 3a51c93c6597f..3871d3b77bddd 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala @@ -165,7 +165,7 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { .stream[String, String](sourceTopic) .groupByKey .windowedBy(window) - .count + .count() .suppress(suppression) table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic) @@ -222,7 +222,7 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { .stream[String, String](sourceTopic) .groupByKey .windowedBy(window) - .count + .count() .suppress(suppression) table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic) @@ -280,7 +280,7 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { .stream[String, String](sourceTopic) .groupByKey .windowedBy(window) - .count + .count() .suppress(suppression) table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic) @@ -348,7 +348,7 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { val table: KTable[String, Long] = builder .stream[String, String](sourceTopic) .groupByKey - .count + .count() .suppress(suppression) table.toStream.to(sinkTopic)