From 1e0b5411ddf98e9bda9751fd0281f9116770c6cc Mon Sep 17 00:00:00 2001 From: Joshi Date: Fri, 27 May 2016 17:09:42 -0700 Subject: [PATCH 1/4] [KAFKA-3765] Minor Code style corrections --- core/src/main/scala/kafka/admin/AclCommand.scala | 8 ++++---- core/src/main/scala/kafka/admin/AdminClient.scala | 14 +++++++------- core/src/main/scala/kafka/admin/AdminUtils.scala | 4 ++-- .../scala/kafka/admin/ConsumerGroupCommand.scala | 2 +- .../kafka/admin/ReassignPartitionsCommand.scala | 2 +- .../scala/kafka/admin/ZkSecurityMigrator.scala | 14 ++++++-------- .../kafka/api/ControlledShutdownRequest.scala | 2 +- .../src/main/scala/kafka/api/ProducerRequest.scala | 2 +- core/src/main/scala/kafka/log/FileMessageSet.scala | 2 +- core/src/main/scala/kafka/log/Log.scala | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 5 ++--- core/src/main/scala/kafka/log/OffsetIndex.scala | 8 ++++++-- core/src/main/scala/kafka/log/OffsetMap.scala | 1 + .../main/scala/kafka/tools/ConsoleProducer.scala | 2 +- 14 files changed, 35 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 966c4beee0eba..2ce19f579236e 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -85,7 +85,7 @@ object AclCommand { for ((resource, acls) <- resourceToAcl) { val acls = resourceToAcl(resource) - println(s"Adding ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + println(s"Adding ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") authorizer.addAcls(acls, resource) } @@ -99,7 +99,7 @@ object AclCommand { for ((resource, acls) <- resourceToAcl) { if (acls.isEmpty) { - if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource `${resource}`? (y/n)")) + if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource `$resource`? (y/n)")) authorizer.removeAcls(resource) } else { if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource `${resource}`? (y/n)")) @@ -117,10 +117,10 @@ object AclCommand { val resourceToAcls: Iterable[(Resource, Set[Acl])] = if (resources.isEmpty) authorizer.getAcls() - else resources.map(resource => (resource -> authorizer.getAcls(resource))) + else resources.map(resource => resource -> authorizer.getAcls(resource)) for ((resource, acls) <- resourceToAcls) - println(s"Current ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") } } diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index ef76ffc40ee1e..9fc248347dedd 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -49,7 +49,7 @@ class AdminClient(val time: Time, client.poll(future) if (future.succeeded()) - return future.value().responseBody() + future.value().responseBody() else throw future.exception() } @@ -61,10 +61,10 @@ class AdminClient(val time: Time, return send(broker, api, request) } catch { case e: Exception => - debug(s"Request ${api} failed against node ${broker}", e) + debug(s"Request $api failed against node $broker", e) } } - throw new RuntimeException(s"Request ${api} failed on brokers ${bootstrapBrokers}") + throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers") } private def findCoordinator(groupId: String): Node = { @@ -88,7 +88,7 @@ class AdminClient(val time: Time, val response = new MetadataResponse(responseBody) val errors = response.errors() if (!errors.isEmpty) - debug(s"Metadata request contained errors: ${errors}") + debug(s"Metadata request contained errors: $errors") response.cluster().nodes().asScala.toList } @@ -100,7 +100,7 @@ class AdminClient(val time: Time, listGroups(broker) } catch { case e: Exception => - debug(s"Failed to find groups from broker ${broker}", e) + debug(s"Failed to find groups from broker $broker", e) List[GroupOverview]() } } @@ -127,7 +127,7 @@ class AdminClient(val time: Time, val response = new DescribeGroupsResponse(responseBody) val metadata = response.groups().get(groupId) if (metadata == null) - throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}") + throw new KafkaException(s"Response from broker contained no metadata for group $groupId") Errors.forCode(metadata.errorCode()).maybeThrow() val members = metadata.members().map { member => @@ -149,7 +149,7 @@ class AdminClient(val time: Time, return List.empty[ConsumerSummary] if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE) - throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group") + throw new IllegalArgumentException(s"Group $groupId with protocol type '$group.protocolType' is not a valid consumer group") if (group.state == "Stable") { group.members.map { member => diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index a8a282e2c13a1..3507f07f7b3fa 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -274,7 +274,7 @@ object AdminUtils extends Logging { existingPartitionsReplicaList.size, checkBrokerAvailable) // check if manual assignment has the right replication factor - val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size)) + val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => p.size != existingReplicaListForPartitionZero.size) if (unmatchedRepFactorList.size != 0) throw new AdminOperationException("The replication factor in manual replication assignment " + " is not equal to the existing replication factor for the topic " + existingReplicaListForPartitionZero.size) @@ -443,7 +443,7 @@ object AdminUtils extends Logging { private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) { try { val zkPath = getTopicPath(topic) - val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2))) + val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2)) if (!update) { info("Topic creation " + jsonPartitionData.toString) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 414e7baee449d..8dd734cc7b790 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -314,7 +314,7 @@ object ConsumerGroupCommand { protected def describeGroup(group: String) { val consumerSummaries = adminClient.describeConsumerGroup(group) if (consumerSummaries.isEmpty) - println(s"Consumer group `${group}` does not exist or is rebalancing.") + println(s"Consumer group `$group` does not exist or is rebalancing.") else { val consumer = getConsumer() printDescribeHeader() diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 1bf351a05e0a5..fae0a4045209e 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -113,7 +113,7 @@ object ReassignPartitionsCommand extends Logging { val (_, replicas) = assignment.head val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size) partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) => - (TopicAndPartition(topic, partition) -> replicas) + TopicAndPartition(topic, partition) -> replicas } } diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index 20808796dada9..a87e5b7c836b2 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -18,20 +18,18 @@ package kafka.admin import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.ThreadPoolExecutor -import java.util.concurrent.TimeUnit + import joptsimple.OptionParser import org.I0Itec.zkclient.exception.ZkException -import kafka.utils.{Logging, ZkUtils, CommandLineUtils} -import org.apache.log4j.Level +import kafka.utils.{CommandLineUtils, Logging, ZkUtils} import org.apache.kafka.common.security.JaasUtils import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback} import org.apache.zookeeper.data.Stat import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.Code + import scala.annotation.tailrec import scala.collection.JavaConverters._ -import scala.collection._ import scala.collection.mutable.Queue import scala.concurrent._ import scala.concurrent.duration._ @@ -83,9 +81,9 @@ object ZkSecurityMigrator extends Logging { if (options.has(helpOpt)) CommandLineUtils.printUsageAndDie(parser, usageMessage) - if ((jaasFile == null)) { - val errorMsg = ("No JAAS configuration file has been specified. Please make sure that you have set " + - "the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)) + if (jaasFile == null) { + val errorMsg = "No JAAS configuration file has been specified. Please make sure that you have set " + + "the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) System.out.println("ERROR: %s".format(errorMsg)) throw new IllegalArgumentException("Incorrect configuration") } diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala index b875e3e0a3626..42a17e67fca33 100644 --- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala +++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala @@ -19,7 +19,7 @@ package kafka.api import java.nio.ByteBuffer -import kafka.common.{TopicAndPartition} +import kafka.common.TopicAndPartition import kafka.api.ApiUtils._ import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 30af8410b6afc..d284801e194ba 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -134,7 +134,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, } else { val producerResponseStatus = data.map { - case (topicAndPartition, data) => + case (topicAndPartition, data: ByteBufferMessageSet) => (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l, Message.NoTimestamp)) } val errorResponse = ProducerResponse(correlationId, producerResponseStatus) diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index a164b4b96730c..bd0783053c858 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -83,7 +83,7 @@ class FileMessageSet private[kafka](@volatile var file: File, this(file, channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate), start = 0, - end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue), + end = if (!fileAlreadyExists && preallocate) 0 else Int.MaxValue, isSlice = false) /** diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index a7549dc134a79..164dba1957295 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -35,7 +35,7 @@ import com.yammer.metrics.core.Gauge import org.apache.kafka.common.utils.Utils object LogAppendInfo { - val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, false) + val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false) } /** diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 749c6229a7d09..4357ef4c5d36e 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -132,10 +132,9 @@ class LogManager(val logDirs: Array[File], try { recoveryPoints = this.recoveryPointCheckpoints(dir).read } catch { - case e: Exception => { + case e: Exception => warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e) warn("Resetting the recovery checkpoint to 0") - } } val jobsForDir = for { @@ -282,7 +281,7 @@ class LogManager(val logDirs: Array[File], // If the log does not exist, skip it if (log != null) { //May need to abort and pause the cleaning of the log, and resume after truncation is done. - val needToStopCleaner: Boolean = (truncateOffset < log.activeSegment.baseOffset) + val needToStopCleaner: Boolean = truncateOffset < log.activeSegment.baseOffset if (needToStopCleaner && cleaner != null) cleaner.abortAndPauseCleaning(topicAndPartition) log.truncateTo(truncateOffset) diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index ce35d6874c7c0..f4327324e0637 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -24,9 +24,11 @@ import java.io._ import java.nio._ import java.nio.channels._ import java.util.concurrent.locks._ + import kafka.utils._ import kafka.utils.CoreUtils.inLock import kafka.common.InvalidOffsetException +import sun.nio.ch.DirectBuffer /** * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse: @@ -306,8 +308,10 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, */ private def forceUnmap(m: MappedByteBuffer) { try { - if(m.isInstanceOf[sun.nio.ch.DirectBuffer]) - (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean() + m match { + case buffer: DirectBuffer => buffer.cleaner().clean() + case _ => + } } catch { case t: Throwable => warn("Error when freeing index buffer", t) } diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala index 3893b2c7b1c28..a27e02242ff72 100755 --- a/core/src/main/scala/kafka/log/OffsetMap.scala +++ b/core/src/main/scala/kafka/log/OffsetMap.scala @@ -20,6 +20,7 @@ package kafka.log import java.util.Arrays import java.security.MessageDigest import java.nio.ByteBuffer + import kafka.utils._ import org.apache.kafka.common.utils.Utils diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index e6476015f1901..4cc7c2000bc14 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -311,7 +311,7 @@ object ConsoleProducer { line.indexOf(keySeparator) match { case -1 => if (ignoreError) new ProducerRecord(topic, line.getBytes) - else throw new KafkaException(s"No key found on line ${lineNumber}: $line") + else throw new KafkaException(s"No key found on line $lineNumber: $line") case n => val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes new ProducerRecord(topic, line.substring(0, n).getBytes, value) From 4fc8b1e1bda47fc58b453c5c4e0900a513b84300 Mon Sep 17 00:00:00 2001 From: Joshi Date: Fri, 27 May 2016 17:30:04 -0700 Subject: [PATCH 2/4] [KAFKA-3765] Kafka Code style corrections --- core/src/main/scala/kafka/admin/AdminUtils.scala | 12 ++++++------ core/src/main/scala/kafka/admin/TopicCommand.scala | 4 ++-- core/src/main/scala/kafka/log/FileMessageSet.scala | 2 +- core/src/main/scala/kafka/log/Log.scala | 2 +- core/src/main/scala/kafka/log/LogCleaner.scala | 4 ++-- .../src/main/scala/kafka/log/LogCleanerManager.scala | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 2 +- .../src/main/scala/kafka/tools/ConsoleProducer.scala | 2 +- core/src/main/scala/kafka/tools/GetOffsetShell.scala | 2 +- core/src/main/scala/kafka/tools/JmxTool.scala | 8 ++++---- core/src/main/scala/kafka/tools/MirrorMaker.scala | 2 +- 11 files changed, 21 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 3507f07f7b3fa..7854a1d513499 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -250,7 +250,7 @@ object AdminUtils extends Logging { checkBrokerAvailable: Boolean = true, rackAwareMode: RackAwareMode = RackAwareMode.Enforced) { val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic)) - if (existingPartitionsReplicaList.size == 0) + if (existingPartitionsReplicaList.isEmpty) throw new AdminOperationException("The topic %s does not exist".format(topic)) val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p => p._1.partition == 0) match { @@ -275,7 +275,7 @@ object AdminUtils extends Logging { // check if manual assignment has the right replication factor val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => p.size != existingReplicaListForPartitionZero.size) - if (unmatchedRepFactorList.size != 0) + if (unmatchedRepFactorList.nonEmpty) throw new AdminOperationException("The replication factor in manual replication assignment " + " is not equal to the existing replication factor for the topic " + existingReplicaListForPartitionZero.size) @@ -290,12 +290,12 @@ object AdminUtils extends Logging { var partitionList = replicaAssignmentList.split(",") val ret = new mutable.HashMap[Int, List[Int]]() var partitionId = startPartitionId - partitionList = partitionList.takeRight(partitionList.size - partitionId) - for (i <- 0 until partitionList.size) { + partitionList = partitionList.takeRight(partitionList.length - partitionId) + for (i <- partitionList.indices) { val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) - if (brokerList.size <= 0) + if (brokerList.length <= 0) throw new AdminOperationException("replication factor must be larger than 0") - if (brokerList.size != brokerList.toSet.size) + if (brokerList.length != brokerList.toSet.size) throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList) if (checkBrokerAvailable && !brokerList.toSet.subsetOf(availableBrokerList)) throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString + diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index e6ebb96c872b1..c643a9df688d5 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -118,7 +118,7 @@ object TopicCommand extends Logging { def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) { val topics = getTopics(zkUtils, opts) val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false - if (topics.length == 0 && !ifExists) { + if (topics.isEmpty && !ifExists) { throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt), opts.options.valueOf(opts.zkConnectOpt))) } @@ -165,7 +165,7 @@ object TopicCommand extends Logging { def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) { val topics = getTopics(zkUtils, opts) val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false - if (topics.length == 0 && !ifExists) { + if (topics.isEmpty && !ifExists) { throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt), opts.options.valueOf(opts.zkConnectOpt))) } diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index bd0783053c858..a454f2cbe2ae3 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -224,7 +224,7 @@ class FileMessageSet private[kafka](@volatile var file: File, } } - if (sizeInBytes > 0 && newMessages.size == 0) { + if (sizeInBytes > 0 && newMessages.isEmpty) { // This indicates that the message is too large. We just return all the bytes in the file message set. this } else { diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 164dba1957295..62dc7a1788408 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -228,7 +228,7 @@ class Log(val dir: File, replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true) } - if(logSegments.size == 0) { + if(logSegments.isEmpty) { // no existing segments, create a new mutable segment beginning at offset 0 segments.put(0L, new LogSegment(dir = dir, startOffset = 0, diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index c6636be09428e..4c0db0d9d451b 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -578,12 +578,12 @@ private[log] class Cleaner(val id: Int, private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = { var grouped = List[List[LogSegment]]() var segs = segments.toList - while(!segs.isEmpty) { + while(segs.nonEmpty) { var group = List(segs.head) var logSize = segs.head.size var indexSize = segs.head.index.sizeInBytes segs = segs.tail - while(!segs.isEmpty && + while(segs.nonEmpty && logSize + segs.head.size <= maxSize && indexSize + segs.head.index.sizeInBytes <= maxIndexSize && segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) { diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index f92db4ed844fb..72757c083d44d 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -100,7 +100,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To LogToClean(topicAndPartition, log, firstDirtyOffset) }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs - this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0 + this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0 // and must meet the minimum threshold for dirty byte ratio val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio) if(cleanableLogs.isEmpty) { diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 4357ef4c5d36e..6cf710e2a4e67 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -402,7 +402,7 @@ class LogManager(val logDirs: Array[File], * data directory with the fewest partitions. */ private def nextLogDir(): File = { - if(logDirs.size == 1) { + if(logDirs.length == 1) { logDirs(0) } else { // count the number of logs in each parent directory (including 0 for empty directories diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 4cc7c2000bc14..26df6d7a02ce0 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -313,7 +313,7 @@ object ConsoleProducer { if (ignoreError) new ProducerRecord(topic, line.getBytes) else throw new KafkaException(s"No key found on line $lineNumber: $line") case n => - val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes + val value = (if (n + keySeparator.length > line.length) "" else line.substring(n + keySeparator.length)).getBytes new ProducerRecord(topic, line.substring(0, n).getBytes, value) } case (line, false) => diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 30c7afe7edca8..f7207eca2fc84 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -77,7 +77,7 @@ object GetOffsetShell { val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue() val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata - if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { + if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) { System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) + "kafka-list-topic.sh to verify") System.exit(1) diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala index bd7ca0e85365e..a9226419f1e12 100644 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ b/core/src/main/scala/kafka/tools/JmxTool.scala @@ -89,11 +89,11 @@ object JmxTool extends Logging { else List(null) - val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten + val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]) val numExpectedAttributes: Map[ObjectName, Int] = attributesWhitelistExists match { - case true => queries.map((_, attributesWhitelist.get.size)).toMap + case true => queries.map((_, attributesWhitelist.get.length)).toMap case false => names.map{(name: ObjectName) => val mbean = mbsc.getMBeanInfo(name) (name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)}.toMap @@ -101,7 +101,7 @@ object JmxTool extends Logging { // print csv header val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted - if(keys.size == numExpectedAttributes.map(_._2).sum + 1) + if(keys.size == numExpectedAttributes.values.sum + 1) println(keys.map("\"" + _ + "\"").mkString(",")) while(true) { @@ -111,7 +111,7 @@ object JmxTool extends Logging { case Some(dFormat) => dFormat.format(new Date) case None => System.currentTimeMillis().toString } - if(attributes.keySet.size == numExpectedAttributes.map(_._2).sum + 1) + if(attributes.keySet.size == numExpectedAttributes.values.sum + 1) println(keys.map(attributes(_)).mkString(",")) val sleep = max(0, interval - (System.currentTimeMillis - start)) Thread.sleep(sleep) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 87f3cc53bafb1..9d5f7e6040c40 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -494,7 +494,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // Creating one stream per each connector instance val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()) require(streams.size == 1) - val stream = streams(0) + val stream = streams.head iter = stream.iterator() } From c8766b0e5b5493afa7a7ccf50ba1ec1e83f45cbb Mon Sep 17 00:00:00 2001 From: Joshi Date: Sat, 28 May 2016 11:23:39 -0700 Subject: [PATCH 3/4] [KAFKA-3765] Kafka Code style updated as review --- core/src/main/scala/kafka/admin/AclCommand.scala | 6 +++--- core/src/main/scala/kafka/admin/AdminClient.scala | 2 +- core/src/main/scala/kafka/admin/AdminUtils.scala | 4 ++-- core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala | 2 +- core/src/main/scala/kafka/api/ProducerRequest.scala | 2 +- core/src/main/scala/kafka/log/OffsetMap.scala | 1 - 6 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 2ce19f579236e..080f8097c7281 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -85,7 +85,7 @@ object AclCommand { for ((resource, acls) <- resourceToAcl) { val acls = resourceToAcl(resource) - println(s"Adding ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + println(s"Adding ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") authorizer.addAcls(acls, resource) } @@ -99,7 +99,7 @@ object AclCommand { for ((resource, acls) <- resourceToAcl) { if (acls.isEmpty) { - if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource `$resource`? (y/n)")) + if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource `${resource}`? (y/n)")) authorizer.removeAcls(resource) } else { if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource `${resource}`? (y/n)")) @@ -120,7 +120,7 @@ object AclCommand { else resources.map(resource => resource -> authorizer.getAcls(resource)) for ((resource, acls) <- resourceToAcls) - println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + println(s"Current ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") } } diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 9fc248347dedd..5dc6885e7aaa7 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -149,7 +149,7 @@ class AdminClient(val time: Time, return List.empty[ConsumerSummary] if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE) - throw new IllegalArgumentException(s"Group $groupId with protocol type '$group.protocolType' is not a valid consumer group") + throw new IllegalArgumentException(s"Group $groupId with protocol type '${group.protocolType}' is not a valid consumer group") if (group.state == "Stable") { group.members.map { member => diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 7854a1d513499..1039ba5891c8f 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -293,9 +293,9 @@ object AdminUtils extends Logging { partitionList = partitionList.takeRight(partitionList.length - partitionId) for (i <- partitionList.indices) { val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) - if (brokerList.length <= 0) + if (brokerList.isEmpty) throw new AdminOperationException("replication factor must be larger than 0") - if (brokerList.length != brokerList.toSet.size) + if (brokerList.size != brokerList.toSet.size) throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList) if (checkBrokerAvailable && !brokerList.toSet.subsetOf(availableBrokerList)) throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString + diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 8dd734cc7b790..414e7baee449d 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -314,7 +314,7 @@ object ConsumerGroupCommand { protected def describeGroup(group: String) { val consumerSummaries = adminClient.describeConsumerGroup(group) if (consumerSummaries.isEmpty) - println(s"Consumer group `$group` does not exist or is rebalancing.") + println(s"Consumer group `${group}` does not exist or is rebalancing.") else { val consumer = getConsumer() printDescribeHeader() diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index d284801e194ba..30af8410b6afc 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -134,7 +134,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, } else { val producerResponseStatus = data.map { - case (topicAndPartition, data: ByteBufferMessageSet) => + case (topicAndPartition, data) => (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l, Message.NoTimestamp)) } val errorResponse = ProducerResponse(correlationId, producerResponseStatus) diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala index a27e02242ff72..3893b2c7b1c28 100755 --- a/core/src/main/scala/kafka/log/OffsetMap.scala +++ b/core/src/main/scala/kafka/log/OffsetMap.scala @@ -20,7 +20,6 @@ package kafka.log import java.util.Arrays import java.security.MessageDigest import java.nio.ByteBuffer - import kafka.utils._ import org.apache.kafka.common.utils.Utils From 0a7513f1ab0c2222230389575f0dab9c520712a3 Mon Sep 17 00:00:00 2001 From: Joshi Date: Sat, 28 May 2016 16:22:18 -0700 Subject: [PATCH 4/4] [KAFKA-3765] Kafka Code style updated as review --- core/src/main/scala/kafka/admin/AdminUtils.scala | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 2 +- core/src/main/scala/kafka/tools/ConsoleProducer.scala | 2 +- core/src/main/scala/kafka/tools/JmxTool.scala | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 1039ba5891c8f..53b6dd72a0b6b 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -290,7 +290,7 @@ object AdminUtils extends Logging { var partitionList = replicaAssignmentList.split(",") val ret = new mutable.HashMap[Int, List[Int]]() var partitionId = startPartitionId - partitionList = partitionList.takeRight(partitionList.length - partitionId) + partitionList = partitionList.takeRight(partitionList.size - partitionId) for (i <- partitionList.indices) { val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) if (brokerList.isEmpty) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 6cf710e2a4e67..4357ef4c5d36e 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -402,7 +402,7 @@ class LogManager(val logDirs: Array[File], * data directory with the fewest partitions. */ private def nextLogDir(): File = { - if(logDirs.length == 1) { + if(logDirs.size == 1) { logDirs(0) } else { // count the number of logs in each parent directory (including 0 for empty directories diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 26df6d7a02ce0..4cc7c2000bc14 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -313,7 +313,7 @@ object ConsoleProducer { if (ignoreError) new ProducerRecord(topic, line.getBytes) else throw new KafkaException(s"No key found on line $lineNumber: $line") case n => - val value = (if (n + keySeparator.length > line.length) "" else line.substring(n + keySeparator.length)).getBytes + val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes new ProducerRecord(topic, line.substring(0, n).getBytes, value) } case (line, false) => diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala index a9226419f1e12..8112f9ea66ef1 100644 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ b/core/src/main/scala/kafka/tools/JmxTool.scala @@ -93,7 +93,7 @@ object JmxTool extends Logging { val numExpectedAttributes: Map[ObjectName, Int] = attributesWhitelistExists match { - case true => queries.map((_, attributesWhitelist.get.length)).toMap + case true => queries.map((_, attributesWhitelist.get.size)).toMap case false => names.map{(name: ObjectName) => val mbean = mbsc.getMBeanInfo(name) (name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)}.toMap