Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/kafka-run-class.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ should_include_file() {
base_dir=$(dirname $0)/..

if [ -z "$SCALA_VERSION" ]; then
SCALA_VERSION=2.13.2
SCALA_VERSION=2.13.3
if [[ -f "$base_dir/gradle.properties" ]]; then
SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
fi
Expand Down
2 changes: 1 addition & 1 deletion bin/windows/kafka-run-class.bat
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ set BASE_DIR=%CD%
popd

IF ["%SCALA_VERSION%"] EQU [""] (
set SCALA_VERSION=2.13.2
set SCALA_VERSION=2.13.3
)

IF ["%SCALA_BINARY_VERSION%"] EQU [""] (
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,6 @@ subprojects {
"-Xlint:delayedinit-select",
"-Xlint:doc-detached",
"-Xlint:missing-interpolator",
"-Xlint:nullary-override",
"-Xlint:nullary-unit",
"-Xlint:option-implicit",
"-Xlint:package-object-classes",
Expand Down Expand Up @@ -503,6 +502,7 @@ subprojects {
if (versions.baseScala == '2.12') {
scalaCompileOptions.additionalParameters += [
"-Xlint:by-name-right-associative",
"-Xlint:nullary-override",
"-Xlint:unsound-match"
]
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object Kafka extends Logging {
}

// attach shutdown handler to catch terminating signals as well as normal termination
Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown)
Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())

kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ object ConfigCommand extends Config {
}
}

val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) Some(sanitizeName(t, sortedNames.next)) else None))
val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) Some(sanitizeName(t, sortedNames.next())) else None))
ConfigEntity(entities.head, if (entities.size > 1) Some(entities(1)) else None)
}

Expand Down Expand Up @@ -711,12 +711,12 @@ object ConfigCommand extends Config {
(userDefaults, ConfigType.User),
(brokerDefaults, ConfigType.Broker))

private[admin] def entityTypes(): List[String] = {
private[admin] def entityTypes: List[String] = {
options.valuesOf(entityType).asScala.toList ++
(entityFlags ++ entityDefaultsFlags).filter(entity => options.has(entity._1)).map(_._2)
}

private[admin] def entityNames(): List[String] = {
private[admin] def entityNames: List[String] = {
val namesIterator = options.valuesOf(entityName).iterator
options.specs.asScala
.filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default"))
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import org.apache.kafka.common.requests.ListOffsetResponse
import org.apache.kafka.common.ConsumerGroupState
import joptsimple.OptionException

import scala.annotation.nowarn

object ConsumerGroupCommand extends Logging {

def main(args: Array[String]): Unit = {
Expand Down Expand Up @@ -566,6 +568,7 @@ object ConsumerGroupCommand extends Logging {
/**
* Returns the state of the specified consumer group and partition assignment states
*/
@nowarn("cat=optimizer")
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New warning flagged by the compiler.

def collectGroupOffsets(groupId: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
collectGroupsOffsets(List(groupId)).getOrElse(groupId, (None, None))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
case Some(partitions) =>
partitions.map(_.topic).toSet
case None =>
zkClient.getAllPartitions().map(_.topic)
zkClient.getAllPartitions.map(_.topic)
}

val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap{ case (topic, partitions) =>
Expand All @@ -190,7 +190,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
case Some(partitions) =>
partitions.partition(partitionsFromZk.contains)
case None =>
(zkClient.getAllPartitions(), Set.empty)
(zkClient.getAllPartitions, Set.empty)
}
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ object ReassignPartitionsCommand extends Logging {
// Check for the presence of the legacy partition reassignment ZNode. This actually
// won't detect all rebalances... only ones initiated by the legacy method.
// This is a limitation of the legacy ZK API.
val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress()
val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress
if (reassignPartitionsInProgress) {
// Note: older versions of this tool would modify the broker quotas here (but not
// topic quotas, for some reason). This behavior wasn't documented in the --execute
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,16 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
}

private def setAclIndividually(path: String): Unit = {
val setPromise = Promise[String]
val setPromise = Promise[String]()
futures.synchronized {
futures += setPromise.future
}
setAcl(path, setPromise)
}

private def setAclsRecursively(path: String): Unit = {
val setPromise = Promise[String]
val childrenPromise = Promise[String]
val setPromise = Promise[String]()
val childrenPromise = Promise[String]()
futures.synchronized {
futures += setPromise.future
futures += childrenPromise.future
Expand Down Expand Up @@ -279,15 +279,15 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
future match {
case Some(a) =>
Await.result(a, 6000 millis)
futures.synchronized { futures.dequeue }
recurse
futures.synchronized { futures.dequeue() }
recurse()
case None =>
}
}
recurse()

} finally {
zkClient.close
zkClient.close()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
}

class ChangeNotification {
def process(): Unit = processNotifications
def process(): Unit = processNotifications()
}

/**
Expand All @@ -143,17 +143,17 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong

class ChangeEventProcessThread(name: String) extends ShutdownableThread(name = name) {
override def doWork(): Unit = queue.take().process
override def doWork(): Unit = queue.take().process()
}

object ChangeNotificationHandler extends ZNodeChildChangeHandler {
override val path: String = seqNodeRoot
override def handleChildChange(): Unit = addChangeNotification
override def handleChildChange(): Unit = addChangeNotification()
}

object ZkStateChangeHandler extends StateChangeHandler {
override val name: String = StateChangeHandlers.zkNodeChangeListenerHandler(seqNodeRoot)
override def afterInitializingSession(): Unit = addChangeNotification
override def afterInitializingSession(): Unit = addChangeNotification()
}
}

18 changes: 6 additions & 12 deletions core/src/main/scala/kafka/controller/ControllerContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -416,15 +416,13 @@ class ControllerContext {
partitionLeadershipInfo.get(partition)
}

def partitionsLeadershipInfo(): Iterable[(TopicPartition, LeaderIsrAndControllerEpoch)] = {
def partitionsLeadershipInfo: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
partitionLeadershipInfo
}

def partitionsWithLeaders(): Set[TopicPartition] = {
partitionLeadershipInfo.keys.filter(tp => !isTopicQueuedUpForDeletion(tp.topic)).toSet
}
def partitionsWithLeaders: Set[TopicPartition] =
partitionLeadershipInfo.keySet.filter(tp => !isTopicQueuedUpForDeletion(tp.topic))

def partitionsWithOfflineLeader(): Set[TopicPartition] = {
def partitionsWithOfflineLeader: Set[TopicPartition] = {
partitionLeadershipInfo.filter { case (topicPartition, leaderIsrAndControllerEpoch) =>
!isReplicaOnline(leaderIsrAndControllerEpoch.leaderAndIsr.leader, topicPartition) &&
!isTopicQueuedUpForDeletion(topicPartition.topic)
Expand All @@ -439,13 +437,9 @@ class ControllerContext {
}.keySet
}

def clearPartitionLeadershipInfo(): Unit = {
partitionLeadershipInfo.clear()
}
def clearPartitionLeadershipInfo(): Unit = partitionLeadershipInfo.clear()

def partitionWithLeadersCount(): Int = {
partitionLeadershipInfo.size
}
def partitionWithLeadersCount: Int = partitionLeadershipInfo.size

private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition,
oldReplicaAssignment: Option[ReplicaAssignment],
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ class KafkaController(val config: KafkaConfig,
info("starting the token expiry check scheduler")
tokenCleanScheduler.startup()
tokenCleanScheduler.schedule(name = "delete-expired-tokens",
fun = () => tokenManager.expireTokens,
fun = () => tokenManager.expireTokens(),
period = config.delegationTokenExpiryCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
Expand Down Expand Up @@ -439,7 +439,7 @@ class KafkaController(val config: KafkaConfig,
val (newOfflineReplicasForDeletion, newOfflineReplicasNotForDeletion) =
newOfflineReplicas.partition(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))

val partitionsWithOfflineLeader = controllerContext.partitionsWithOfflineLeader()
val partitionsWithOfflineLeader = controllerContext.partitionsWithOfflineLeader

// trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas
partitionStateMachine.handleStateChanges(partitionsWithOfflineLeader.toSeq, OfflinePartition)
Expand Down Expand Up @@ -931,10 +931,10 @@ class KafkaController(val config: KafkaConfig,
* @param shouldRemoveReassignment Predicate indicating which partition reassignments should be removed
*/
private def maybeRemoveFromZkReassignment(shouldRemoveReassignment: (TopicPartition, Seq[Int]) => Boolean): Unit = {
if (!zkClient.reassignPartitionsInProgress())
if (!zkClient.reassignPartitionsInProgress)
return

val reassigningPartitions = zkClient.getPartitionReassignment()
val reassigningPartitions = zkClient.getPartitionReassignment
val (removingPartitions, updatedPartitionsBeingReassigned) = reassigningPartitions.partition { case (tp, replicas) =>
shouldRemoveReassignment(tp, replicas)
}
Expand Down Expand Up @@ -1516,7 +1516,7 @@ class KafkaController(val config: KafkaConfig,
val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]

zkClient.getPartitionReassignment().foreach { case (tp, targetReplicas) =>
zkClient.getPartitionReassignment.foreach { case (tp, targetReplicas) =>
maybeBuildReassignment(tp, Some(targetReplicas)) match {
case Some(context) => partitionsToReassign.put(tp, context)
case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class GroupMetadataManager(brokerId: Int,
scheduler.startup()
if (enableMetadataExpiration) {
scheduler.schedule(name = "delete-expired-group-metadata",
fun = () => cleanupGroupMetadata,
fun = () => cleanupGroupMetadata(),
period = config.offsetsRetentionCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
Expand Down Expand Up @@ -752,7 +752,7 @@ class GroupMetadataManager(brokerId: Int,
onGroupUnloaded: GroupMetadata => Unit): Unit = {
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
info(s"Scheduling unloading of offsets and group metadata from $topicPartition")
scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets)
scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets())

def removeGroupsAndOffsets(): Unit = {
var numOffsetsRemoved = 0
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ class Log(@volatile private var _dir: File,
var truncated = false

while (unflushed.hasNext && !truncated) {
val segment = unflushed.next
val segment = unflushed.next()
info(s"Recovering unflushed segment ${segment.baseOffset}")
val truncatedBytes =
try {
Expand Down Expand Up @@ -2270,7 +2270,7 @@ class Log(@volatile private var _dir: File,

if (asyncDelete) {
info(s"Scheduling segments for deletion ${segments.mkString(",")}")
scheduler.schedule("delete-file", () => deleteSegments, delay = config.fileDeleteDelayMs)
scheduler.schedule("delete-file", () => deleteSegments(), delay = config.fileDeleteDelayMs)
} else {
deleteSegments()
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ class LogManager(logDirs: Seq[File],

var recoveryPoints = Map[TopicPartition, Long]()
try {
recoveryPoints = this.recoveryPointCheckpoints(dir).read
recoveryPoints = this.recoveryPointCheckpoints(dir).read()
} catch {
case e: Exception =>
warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory $dir", e)
Expand All @@ -327,7 +327,7 @@ class LogManager(logDirs: Seq[File],

var logStartOffsets = Map[TopicPartition, Long]()
try {
logStartOffsets = this.logStartOffsetCheckpoints(dir).read
logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
} catch {
case e: Exception =>
warn(s"Error occurred while reading log-start-offset-checkpoint file of directory $dir", e)
Expand Down Expand Up @@ -1039,7 +1039,7 @@ class LogManager(logDirs: Seq[File],
debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" +
s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush")
if(timeSinceLastFlush >= log.config.flushMs)
log.flush
log.flush()
} catch {
case e: Throwable =>
error(s"Error flushing topic ${topicPartition.topic}", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object AclAuthorizer {
def find(p: AclEntry => Boolean): Option[AclEntry] = {
// Lazily iterate through the inner `Seq` elements and stop as soon as we find a match
val it = seqs.iterator.flatMap(_.find(p))
if (it.hasNext) Some(it.next)
if (it.hasNext) Some(it.next())
else None
}

Expand Down Expand Up @@ -367,7 +367,8 @@ class AclAuthorizer extends Authorizer with Logging {
} else false
}

@nowarn("cat=deprecation&cat=optimizer")
@nowarn("cat=deprecation")
@nowarn("cat=optimizer")
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behavior of & was changed so that both conditions have to evaluate to true for a warning to match the exclusion.

private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = {
// this code is performance sensitive, make sure to run AclAuthorizerBenchmark after any changes

Expand Down Expand Up @@ -523,7 +524,7 @@ class AclAuthorizer extends Authorizer with Logging {
}
}

if(!writeComplete)
if (!writeComplete)
throw new IllegalStateException(s"Failed to update ACLs for $resource after trying a maximum of $maxUpdateRetries times")

if (newVersionedAcls.acls != currentVersionedAcls.acls) {
Expand All @@ -538,6 +539,7 @@ class AclAuthorizer extends Authorizer with Logging {
}
}

@nowarn("cat=optimizer")
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New warning flagged by the compiler.

private def getAclsFromCache(resource: ResourcePattern): VersionedAcls = {
aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource"))
}
Expand All @@ -548,9 +550,9 @@ class AclAuthorizer extends Authorizer with Logging {

private def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = {
if (versionedAcls.acls.nonEmpty) {
aclCache = aclCache + (resource -> versionedAcls)
aclCache = aclCache.updated(resource, versionedAcls)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This avoids the tuple creation.

} else {
aclCache = aclCache - resource
aclCache -= resource
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ abstract class AbstractFetcherThread(name: String,
} finally partitionMapLock.unlock()
}

def partitionCount(): Int = {
def partitionCount: Int = {
partitionMapLock.lockInterruptibly()
try partitionStates.size
finally partitionMapLock.unlock()
Expand Down
Loading