Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ object AclCommand {

val resourceToAcls: Iterable[(Resource, Set[Acl])] =
if (resources.isEmpty) authorizer.getAcls()
else resources.map(resource => (resource -> authorizer.getAcls(resource)))
else resources.map(resource => resource -> authorizer.getAcls(resource))

for ((resource, acls) <- resourceToAcls)
println(s"Current ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
Expand Down
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class AdminClient(val time: Time,
client.poll(future)

if (future.succeeded())
return future.value().responseBody()
future.value().responseBody()
else
throw future.exception()
}
Expand All @@ -61,10 +61,10 @@ class AdminClient(val time: Time,
return send(broker, api, request)
} catch {
case e: Exception =>
debug(s"Request ${api} failed against node ${broker}", e)
debug(s"Request $api failed against node $broker", e)
}
}
throw new RuntimeException(s"Request ${api} failed on brokers ${bootstrapBrokers}")
throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers")
}

private def findCoordinator(groupId: String): Node = {
Expand All @@ -88,7 +88,7 @@ class AdminClient(val time: Time,
val response = new MetadataResponse(responseBody)
val errors = response.errors()
if (!errors.isEmpty)
debug(s"Metadata request contained errors: ${errors}")
debug(s"Metadata request contained errors: $errors")
response.cluster().nodes().asScala.toList
}

Expand All @@ -100,7 +100,7 @@ class AdminClient(val time: Time,
listGroups(broker)
} catch {
case e: Exception =>
debug(s"Failed to find groups from broker ${broker}", e)
debug(s"Failed to find groups from broker $broker", e)
List[GroupOverview]()
}
}
Expand All @@ -127,7 +127,7 @@ class AdminClient(val time: Time,
val response = new DescribeGroupsResponse(responseBody)
val metadata = response.groups().get(groupId)
if (metadata == null)
throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}")
throw new KafkaException(s"Response from broker contained no metadata for group $groupId")

Errors.forCode(metadata.errorCode()).maybeThrow()
val members = metadata.members().map { member =>
Expand All @@ -149,7 +149,7 @@ class AdminClient(val time: Time,
return List.empty[ConsumerSummary]

if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group")
throw new IllegalArgumentException(s"Group $groupId with protocol type '${group.protocolType}' is not a valid consumer group")

if (group.state == "Stable") {
group.members.map { member =>
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/admin/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ object AdminUtils extends Logging {
checkBrokerAvailable: Boolean = true,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
if (existingPartitionsReplicaList.size == 0)
if (existingPartitionsReplicaList.isEmpty)
throw new AdminOperationException("The topic %s does not exist".format(topic))

val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p => p._1.partition == 0) match {
Expand All @@ -274,8 +274,8 @@ object AdminUtils extends Logging {
existingPartitionsReplicaList.size, checkBrokerAvailable)

// check if manual assignment has the right replication factor
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size))
if (unmatchedRepFactorList.size != 0)
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => p.size != existingReplicaListForPartitionZero.size)
if (unmatchedRepFactorList.nonEmpty)
throw new AdminOperationException("The replication factor in manual replication assignment " +
" is not equal to the existing replication factor for the topic " + existingReplicaListForPartitionZero.size)

Expand All @@ -291,9 +291,9 @@ object AdminUtils extends Logging {
val ret = new mutable.HashMap[Int, List[Int]]()
var partitionId = startPartitionId
partitionList = partitionList.takeRight(partitionList.size - partitionId)
for (i <- 0 until partitionList.size) {
for (i <- partitionList.indices) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
if (brokerList.size <= 0)
if (brokerList.isEmpty)
throw new AdminOperationException("replication factor must be larger than 0")
if (brokerList.size != brokerList.toSet.size)
throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
Expand Down Expand Up @@ -443,7 +443,7 @@ object AdminUtils extends Logging {
private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
try {
val zkPath = getTopicPath(topic)
val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2))

if (!update) {
info("Topic creation " + jsonPartitionData.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ object ReassignPartitionsCommand extends Logging {
val (_, replicas) = assignment.head
val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
(TopicAndPartition(topic, partition) -> replicas)
TopicAndPartition(topic, partition) -> replicas
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ object TopicCommand extends Logging {
def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
val topics = getTopics(zkUtils, opts)
val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
if (topics.length == 0 && !ifExists) {
if (topics.isEmpty && !ifExists) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
opts.options.valueOf(opts.zkConnectOpt)))
}
Expand Down Expand Up @@ -165,7 +165,7 @@ object TopicCommand extends Logging {
def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
val topics = getTopics(zkUtils, opts)
val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
if (topics.length == 0 && !ifExists) {
if (topics.isEmpty && !ifExists) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
opts.options.valueOf(opts.zkConnectOpt)))
}
Expand Down
14 changes: 6 additions & 8 deletions core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,18 @@
package kafka.admin

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

import joptsimple.OptionParser
import org.I0Itec.zkclient.exception.ZkException
import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
import org.apache.log4j.Level
import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection._
import scala.collection.mutable.Queue
import scala.concurrent._
import scala.concurrent.duration._
Expand Down Expand Up @@ -83,9 +81,9 @@ object ZkSecurityMigrator extends Logging {
if (options.has(helpOpt))
CommandLineUtils.printUsageAndDie(parser, usageMessage)

if ((jaasFile == null)) {
val errorMsg = ("No JAAS configuration file has been specified. Please make sure that you have set " +
"the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))
if (jaasFile == null) {
val errorMsg = "No JAAS configuration file has been specified. Please make sure that you have set " +
"the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
System.out.println("ERROR: %s".format(errorMsg))
throw new IllegalArgumentException("Incorrect configuration")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kafka.api

import java.nio.ByteBuffer

import kafka.common.{TopicAndPartition}
import kafka.common.TopicAndPartition
import kafka.api.ApiUtils._
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/FileMessageSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
this(file,
channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
start = 0,
end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
end = if (!fileAlreadyExists && preallocate) 0 else Int.MaxValue,
isSlice = false)

/**
Expand Down Expand Up @@ -224,7 +224,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
}
}

if (sizeInBytes > 0 && newMessages.size == 0) {
if (sizeInBytes > 0 && newMessages.isEmpty) {
// This indicates that the message is too large. We just return all the bytes in the file message set.
this
} else {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.utils.Utils

object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, false)
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
}

/**
Expand Down Expand Up @@ -228,7 +228,7 @@ class Log(val dir: File,
replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
}

if(logSegments.size == 0) {
if(logSegments.isEmpty) {
// no existing segments, create a new mutable segment beginning at offset 0
segments.put(0L, new LogSegment(dir = dir,
startOffset = 0,
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,12 @@ private[log] class Cleaner(val id: Int,
private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
var grouped = List[List[LogSegment]]()
var segs = segments.toList
while(!segs.isEmpty) {
while(segs.nonEmpty) {
var group = List(segs.head)
var logSize = segs.head.size
var indexSize = segs.head.index.sizeInBytes
segs = segs.tail
while(!segs.isEmpty &&
while(segs.nonEmpty &&
logSize + segs.head.size <= maxSize &&
indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogCleanerManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
LogToClean(topicAndPartition, log, firstDirtyOffset)
}.filter(ltc => ltc.totalBytes > 0) // skip any empty logs

this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
// and must meet the minimum threshold for dirty byte ratio
val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
if(cleanableLogs.isEmpty) {
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,9 @@ class LogManager(val logDirs: Array[File],
try {
recoveryPoints = this.recoveryPointCheckpoints(dir).read
} catch {
case e: Exception => {
case e: Exception =>
warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
warn("Resetting the recovery checkpoint to 0")
}
}

val jobsForDir = for {
Expand Down Expand Up @@ -282,7 +281,7 @@ class LogManager(val logDirs: Array[File],
// If the log does not exist, skip it
if (log != null) {
//May need to abort and pause the cleaning of the log, and resume after truncation is done.
val needToStopCleaner: Boolean = (truncateOffset < log.activeSegment.baseOffset)
val needToStopCleaner: Boolean = truncateOffset < log.activeSegment.baseOffset
if (needToStopCleaner && cleaner != null)
cleaner.abortAndPauseCleaning(topicAndPartition)
log.truncateTo(truncateOffset)
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/scala/kafka/log/OffsetIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import java.io._
import java.nio._
import java.nio.channels._
import java.util.concurrent.locks._

import kafka.utils._
import kafka.utils.CoreUtils.inLock
import kafka.common.InvalidOffsetException
import sun.nio.ch.DirectBuffer

/**
* An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
Expand Down Expand Up @@ -306,8 +308,10 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long,
*/
private def forceUnmap(m: MappedByteBuffer) {
try {
if(m.isInstanceOf[sun.nio.ch.DirectBuffer])
(m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean()
m match {
case buffer: DirectBuffer => buffer.cleaner().clean()
case _ =>
}
} catch {
case t: Throwable => warn("Error when freeing index buffer", t)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/ConsoleProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ object ConsoleProducer {
line.indexOf(keySeparator) match {
case -1 =>
if (ignoreError) new ProducerRecord(topic, line.getBytes)
else throw new KafkaException(s"No key found on line ${lineNumber}: $line")
else throw new KafkaException(s"No key found on line $lineNumber: $line")
case n =>
val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
new ProducerRecord(topic, line.substring(0, n).getBytes, value)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/GetOffsetShell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object GetOffsetShell {
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()

val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
"kafka-list-topic.sh to verify")
System.exit(1)
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/tools/JmxTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ object JmxTool extends Logging {
else
List(null)

val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName])

val numExpectedAttributes: Map[ObjectName, Int] =
attributesWhitelistExists match {
Expand All @@ -101,7 +101,7 @@ object JmxTool extends Logging {

// print csv header
val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
if(keys.size == numExpectedAttributes.map(_._2).sum + 1)
if(keys.size == numExpectedAttributes.values.sum + 1)
println(keys.map("\"" + _ + "\"").mkString(","))

while(true) {
Expand All @@ -111,7 +111,7 @@ object JmxTool extends Logging {
case Some(dFormat) => dFormat.format(new Date)
case None => System.currentTimeMillis().toString
}
if(attributes.keySet.size == numExpectedAttributes.map(_._2).sum + 1)
if(attributes.keySet.size == numExpectedAttributes.values.sum + 1)
println(keys.map(attributes(_)).mkString(","))
val sleep = max(0, interval - (System.currentTimeMillis - start))
Thread.sleep(sleep)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/MirrorMaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// Creating one stream per each connector instance
val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
require(streams.size == 1)
val stream = streams(0)
val stream = streams.head
iter = stream.iterator()
}

Expand Down