From f075df3520a22436cb2a9b5eb39ccf985dadb58d Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 15 Apr 2016 12:48:40 +0900 Subject: [PATCH 01/10] Add the initial style pattern --- .../apache/spark/deploy/worker/DriverRunner.scala | 4 +++- .../main/scala/org/apache/spark/rdd/BlockRDD.scala | 4 ++-- .../scala/org/apache/spark/rdd/CoalescedRDD.scala | 4 ++-- .../apache/spark/rdd/PartitionerAwareUnionRDD.scala | 4 ++-- .../flume/sink/SparkAvroCallbackHandler.scala | 4 ++-- .../spark/streaming/flume/sink/SparkSink.scala | 12 ++++++------ .../streaming/flume/sink/TransactionProcessor.scala | 12 ++++++------ .../streaming/flume/FlumePollingInputDStream.scala | 4 ++-- .../streaming/flume/PollingFlumeTestUtils.scala | 4 ++-- scalastyle-config.xml | 5 +++++ .../spark/sql/catalyst/expressions/Projection.scala | 4 ++-- .../org/apache/spark/streaming/dstream/DStream.scala | 4 ++-- 12 files changed, 36 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 9c6bc5c62f25c..5309f96689f75 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -68,7 +68,9 @@ private[deploy] class DriverRunner( private var clock: Clock = new SystemClock() private var sleeper = new Sleeper { - def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed}) + def sleep(seconds: Int): Unit = (0 until seconds).takeWhile { _ => + Thread.sleep(1000); !killed + } } /** Starts a thread to run and manage the driver. */ diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index 8358244987a6d..63d1d1767a8cb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -35,9 +35,9 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo override def getPartitions: Array[Partition] = { assertValid() - (0 until blockIds.length).map(i => { + (0 until blockIds.length).map { i => new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition] - }).toArray + }.toArray } override def compute(split: Partition, context: TaskContext): Iterator[T] = { diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 90d9735cb3f69..0ad0ee672d331 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -191,9 +191,9 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: // initializes/resets to start iterating from the beginning def resetIterator(): Iterator[(String, Partition)] = { val iterators = (0 to 2).map( x => - prev.partitions.iterator.flatMap(p => { + prev.partitions.iterator.flatMap { p => if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None - } ) + } ) iterators.reduceLeft((x, y) => x ++ y) } diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala index c3579d761d73c..0abba15bec9f7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala @@ -68,9 +68,9 @@ class PartitionerAwareUnionRDD[T: 
ClassTag]( override def getPartitions: Array[Partition] = { val numPartitions = partitioner.get.numPartitions - (0 until numPartitions).map(index => { + (0 until numPartitions).map { index => new PartitionerAwareUnionRDDPartition(rdds, index) - }).toArray + }.toArray } // Get the location where most of the partitions of parent RDDs are located diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala index 719fca0938b3a..8050ec357e261 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala @@ -129,9 +129,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha * @param success Whether the batch was successful or not. */ private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) { - removeAndGetProcessor(sequenceNumber).foreach(processor => { + removeAndGetProcessor(sequenceNumber).foreach { processor => processor.batchProcessed(success) - }) + } } /** diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala index 14dffb15fef98..41f27e937662f 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala @@ -88,23 +88,23 @@ class SparkSink extends AbstractSink with Logging with Configurable { // dependencies which are being excluded in the build. In practice, // Netty dependencies are already available on the JVM as Flume would have pulled them in. serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port))) - serverOpt.foreach(server => { + serverOpt.foreach { server => logInfo("Starting Avro server for sink: " + getName) server.start() - }) + } super.start() } override def stop() { logInfo("Stopping Spark Sink: " + getName) - handler.foreach(callbackHandler => { + handler.foreach { callbackHandler => callbackHandler.shutdown() - }) - serverOpt.foreach(server => { + } + serverOpt.foreach { server => logInfo("Stopping Avro Server for sink: " + getName) server.close() server.join() - }) + } blockingLatch.countDown() super.stop() } diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala index b15c2097e550c..19e736f016977 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala @@ -110,7 +110,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, eventBatch.setErrorMsg("Something went wrong. 
Channel was " + "unable to create a transaction!") } - txOpt.foreach(tx => { + txOpt.foreach { tx => tx.begin() val events = new util.ArrayList[SparkSinkEvent](maxBatchSize) val loop = new Breaks @@ -145,7 +145,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, // At this point, the events are available, so fill them into the event batch eventBatch = new EventBatch("", seqNum, events) } - }) + } } catch { case interrupted: InterruptedException => // Don't pollute logs if the InterruptedException came from this being stopped @@ -156,9 +156,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, logWarning("Error while processing transaction.", e) eventBatch.setErrorMsg(e.getMessage) try { - txOpt.foreach(tx => { + txOpt.foreach { tx => rollbackAndClose(tx, close = true) - }) + } } finally { txOpt = None } @@ -174,7 +174,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, */ private def processAckOrNack() { batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS) - txOpt.foreach(tx => { + txOpt.foreach { tx => if (batchSuccess) { try { logDebug("Committing transaction") @@ -197,7 +197,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String, // cause issues. This is required to ensure the TransactionProcessor instance is not leaked parent.removeAndGetProcessor(seqNum) } - }) + } } /** diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala index 250bfc1718dbe..54565840fa665 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -79,11 +79,11 @@ private[streaming] class FlumePollingReceiver( override def onStart(): Unit = { // Create the connections to each Flume agent. - addresses.foreach(host => { + addresses.foreach { host => val transceiver = new NettyTransceiver(host, channelFactory) val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver) connections.add(new FlumeConnection(transceiver, client)) - }) + } for (i <- 0 until parallelism) { logInfo("Starting Flume Polling Receiver worker threads..") // Threads that pull data from Flume. diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala index 1a96df6e94b95..6a4dafb8eddb4 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala @@ -123,9 +123,9 @@ private[flume] class PollingFlumeTestUtils { val latch = new CountDownLatch(batchCount * channels.size) sinks.foreach(_.countdownWhenBatchReceived(latch)) - channels.foreach(channel => { + channels.foreach { channel => executorCompletion.submit(new TxnSubmitter(channel)) - }) + } for (i <- 0 until channels.size) { executorCompletion.take() diff --git a/scalastyle-config.xml b/scalastyle-config.xml index a14e3e583f870..57ec95fff3997 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -233,6 +233,11 @@ This file is divided into 3 sections: Omit braces in case clauses. 
+ + (?m)\.[a-zA-Z_][a-zA-Z0-9]*+\s*[\(|\{]\s*[a-zA-Z_][a-zA-Z0-9]*+\s*\=>\s*\{\s*.+\s*\}\s*[\(|\}] + Extra closure in transformation functions. + + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index 354311c5e7449..860fc31af20a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -168,9 +168,9 @@ object FromUnsafeProjection { * Returns an UnsafeProjection for given Array of DataTypes. */ def apply(fields: Seq[DataType]): Projection = { - create(fields.zipWithIndex.map(x => { + create(fields.zipWithIndex.map { x => new BoundReference(x._2, x._1, true) - })) + }) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 58842f9c2f446..ee12a7d7d3985 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -895,9 +895,9 @@ abstract class DStream[T: ClassTag] ( logInfo(s"Slicing from $fromTime to $toTime" + s" (aligned to $alignedFromTime and $alignedToTime)") - alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => { + alignedFromTime.to(alignedToTime, slideDuration).flatMap { time => if (time >= zeroTime) getOrCompute(time) else None - }) + } } /** From 2ede93e7f5431d006fb6e0244e49788cd964b4ca Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 15 Apr 2016 13:46:35 +0900 Subject: [PATCH 02/10] Update pattern and correct more of them --- core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala | 4 ++-- .../scala/org/apache/spark/rdd/ParallelCollectionRDD.scala | 4 ++-- .../org/apache/spark/shuffle/BlockStoreShuffleReader.scala | 4 ++-- .../main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala | 4 ++-- scalastyle-config.xml | 6 +++--- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala | 6 ++---- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 4 ++-- .../org/apache/spark/streaming/dstream/StateDStream.scala | 4 ++-- .../apache/spark/streaming/scheduler/ReceiverTracker.scala | 4 ++-- 10 files changed, 20 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala index 526138093d3ea..5426bf80bafc5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala @@ -65,11 +65,11 @@ class JdbcRDD[T: ClassTag]( override def getPartitions: Array[Partition] = { // bounds are inclusive, hence the + 1 here and - 1 on end val length = BigInt(1) + upperBound - lowerBound - (0 until numPartitions).map(i => { + (0 until numPartitions).map { i => val start = lowerBound + ((i * length) / numPartitions) val end = lowerBound + (((i + 1) * length) / numPartitions) - 1 new JdbcPartition(i, start.toLong, end.toLong) - }).toArray + }.toArray } override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T] diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index 462fb39ea20b2..bb84e4af15b15 100644 --- 
a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -121,11 +121,11 @@ private object ParallelCollectionRDD { // Sequences need to be sliced at the same set of index positions for operations // like RDD.zip() to behave as expected def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { - (0 until numSlices).iterator.map(i => { + (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) - }) + } } seq match { case r: Range => diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 637b2dfc193b8..876cdfaa87601 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -69,10 +69,10 @@ private[spark] class BlockStoreShuffleReader[K, C]( // Update the context task metrics for each record read. val readMetrics = context.taskMetrics.registerTempShuffleReadMetrics() val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]]( - recordIter.map(record => { + recordIter.map { record => readMetrics.incRecordsRead(1) record - }), + }, context.taskMetrics().mergeShuffleReadMetrics()) // An interruptible iterator must be used here in order to support task cancellation diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 1304efd8f2ec7..f609fb4cd2e77 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -42,13 +42,13 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage var hasShuffleWrite = false var hasShuffleRead = false var hasBytesSpilled = false - stageData.foreach(data => { + stageData.foreach { data => hasInput = data.hasInput hasOutput = data.hasOutput hasShuffleRead = data.hasShuffleRead hasShuffleWrite = data.hasShuffleWrite hasBytesSpilled = data.hasBytesSpilled - }) + } diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 57ec95fff3997..9e81c06280526 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -233,9 +233,9 @@ This file is divided into 3 sections: Omit braces in case clauses. - - (?m)\.[a-zA-Z_][a-zA-Z0-9]*+\s*[\(|\{]\s*[a-zA-Z_][a-zA-Z0-9]*+\s*\=>\s*\{\s*.+\s*\}\s*[\(|\}] - Extra closure in transformation functions. + + (?m)\.[a-zA-Z_][a-zA-Z0-9]*\(\s*[^,]+s*=>\s*\{[^\}]+\}\s*\) + Remove an anonymous extra closure. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index de40ddde1bdd9..9fe86209d3897 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -212,7 +212,7 @@ class Analyzer( * represented as the bit masks. 
*/ def bitmasks(r: Rollup): Seq[Int] = { - Seq.tabulate(r.groupByExprs.length + 1)(idx => {(1 << idx) - 1}) + Seq.tabulate(r.groupByExprs.length + 1)(idx => (1 << idx) - 1) } /* diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 784b018353472..06d919b7ca1e0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -152,10 +152,8 @@ private[hive] case class HiveGenericUDF( var i = 0 while (i < children.length) { val idx = i - deferredObjects(i).asInstanceOf[DeferredObjectAdapter].set( - () => { - children(idx).eval(input) - }) + deferredObjects(i).asInstanceOf[DeferredObjectAdapter] + .set(() => children(idx).eval(input)) i += 1 } unwrap(function.evaluate(deferredObjects), returnInspector) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 5cc677d085108..98c88d6287413 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -247,10 +247,10 @@ class CheckpointWriter( // Delete old checkpoint files val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)) if (allCheckpointFiles.size > 10) { - allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => { + allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach { file => logInfo("Deleting " + file) fs.delete(file, true) - }) + } } // All done, print success diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index 28aed0ca45342..8efb09a8ce981 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -48,11 +48,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( // and then apply the update function val updateFuncLocal = updateFunc val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => { - val i = iterator.map(t => { + val i = iterator.map { t => val itr = t._2._2.iterator val headOption = if (itr.hasNext) Some(itr.next()) else None (t._1, t._2._1.toSeq, headOption) - }) + } updateFuncLocal(i) } val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 3b33a979df882..9aa2f0bbb9952 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -434,11 +434,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false * worker nodes as a parallel collection, and runs them. 
*/ private def launchReceivers(): Unit = { - val receivers = receiverInputStreams.map(nis => { + val receivers = receiverInputStreams.map { nis => val rcvr = nis.getReceiver() rcvr.setReceiverId(nis.id) rcvr - }) + } runDummySparkJob() From ff93278f821688e22b2b4a2bf91d3909ecd12e33 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 15 Apr 2016 16:30:16 +0900 Subject: [PATCH 03/10] Add more styles --- .../spark/deploy/master/ui/ApplicationPage.scala | 5 ++--- .../org/apache/spark/rdd/ParallelCollectionRDD.scala | 4 ++-- .../spark/examples/mllib/StreamingTestExample.scala | 4 ++-- .../mllib/api/python/Word2VecModelWrapper.scala | 4 +++- .../mllib/clustering/PowerIterationClustering.scala | 1 + .../spark/sql/catalyst/optimizer/Optimizer.scala | 12 +++++------- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 4 ++-- 7 files changed, 17 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 1b18cf0ded69d..96274958d1422 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -35,9 +35,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") def render(request: HttpServletRequest): Seq[Node] = { val appId = request.getParameter("appId") val state = master.askWithRetry[MasterStateResponse](RequestMasterState) - val app = state.activeApps.find(_.id == appId).getOrElse({ - state.completedApps.find(_.id == appId).getOrElse(null) - }) + val app = state.activeApps.find(_.id == appId) + .getOrElse(state.completedApps.find(_.id == appId).orNull) if (app == null) { val msg =
<div class="row-fluid">No running application with ID {appId}</div>
return UIUtils.basicSparkPage(msg, "Not Found") diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index bb84e4af15b15..9152e6b1edca1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -150,10 +150,10 @@ private object ParallelCollectionRDD { slices case _ => val array = seq.toArray // To prevent O(n^2) operations for List etc - positions(array.length, numSlices).map({ + positions(array.length, numSlices).map { case (start, end) => array.slice(start, end).toSeq - }).toSeq + }.toSeq } } } diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala index 49f5df39443e9..ae4dee24c6474 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala @@ -59,10 +59,10 @@ object StreamingTestExample { val conf = new SparkConf().setMaster("local").setAppName("StreamingTestExample") val ssc = new StreamingContext(conf, batchDuration) - ssc.checkpoint({ + ssc.checkpoint { val dir = Utils.createTempDir() dir.toString - }) + } // $example on$ val data = ssc.textFileStream(dataDir).map(line => line.split(",") match { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala index 05273c34347e8..4b4ed2291d139 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala @@ -56,7 +56,9 @@ private[python] class Word2VecModelWrapper(model: Word2VecModel) { } def getVectors: JMap[String, JList[Float]] = { - model.getVectors.map({case (k, v) => (k, v.toList.asJava)}).asJava + model.getVectors.map { case (k, v) => + (k, v.toList.asJava) + }.asJava } def save(sc: SparkContext, path: String): Unit = model.save(sc, path) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index 2e257ff9b7def..683935155c935 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -234,6 +234,7 @@ class PowerIterationClustering private[clustering] ( Assignment(id, cluster) } }, preservesPartitioning = true) + new PowerIterationClusteringModel(k, assignments) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 438cbabdbb8a8..1a34316082f6b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -286,10 +286,10 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper { assert(children.nonEmpty) if (projectList.forall(_.deterministic)) { val newFirstChild = Project(projectList, children.head) - val newOtherChildren = children.tail.map ( child => { + val newOtherChildren = children.tail.map { child 
=> val rewrites = buildRewrites(children.head, child) Project(projectList.map(pushToRight(_, rewrites)), child) - } ) + } Union(newFirstChild +: newOtherChildren) } else { p @@ -300,11 +300,9 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper { assert(children.nonEmpty) val (deterministic, nondeterministic) = partitionByDeterministic(condition) val newFirstChild = Filter(deterministic, children.head) - val newOtherChildren = children.tail.map { - child => { - val rewrites = buildRewrites(children.head, child) - Filter(pushToRight(deterministic, rewrites), child) - } + val newOtherChildren = children.tail.map { child => + val rewrites = buildRewrites(children.head, child) + Filter(pushToRight(deterministic, rewrites), child) } Filter(nondeterministic, Union(newFirstChild +: newOtherChildren)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index b7ff5f72427a4..065c8572b06a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -251,12 +251,12 @@ object JdbcUtils extends Logging { def schemaString(df: DataFrame, url: String): String = { val sb = new StringBuilder() val dialect = JdbcDialects.get(url) - df.schema.fields foreach { field => { + df.schema.fields foreach { field => val name = field.name val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition val nullable = if (field.nullable) "" else "NOT NULL" sb.append(s", $name $typ $nullable") - }} + } if (sb.length < 2) "" else sb.substring(2) } From bd7dec4c9d72e65917ab41c4bff005b9cae1cc6f Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 15 Apr 2016 18:08:39 +0900 Subject: [PATCH 04/10] Remove the change in the rules and correct weirdly written types --- core/src/main/scala/org/apache/spark/TaskEndReason.scala | 4 ++-- .../main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 4 ++-- .../scala/org/apache/spark/deploy/worker/DriverRunner.scala | 3 ++- .../examples/streaming/RecoverableNetworkWordCount.scala | 2 +- .../spark/examples/streaming/SqlNetworkWordCount.scala | 2 +- .../spark/mllib/clustering/PowerIterationClustering.scala | 1 - .../spark/mllib/evaluation/BinaryClassificationMetrics.scala | 5 +++-- scalastyle-config.xml | 5 ----- .../apache/spark/sql/execution/joins/ShuffledHashJoin.scala | 4 ++-- .../org/apache/spark/sql/execution/stat/StatFunctions.scala | 2 +- 10 files changed, 14 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 83af226bfd6f1..4c4f759ed2356 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -149,8 +149,8 @@ case class ExceptionFailure( this(e, accumUpdates, preserveCause = true) } - def exception: Option[Throwable] = exceptionWrapper.flatMap { - (w: ThrowableSerializationWrapper) => Option(w.exception) + def exception: Option[Throwable] = exceptionWrapper.flatMap { w => + Option(w.exception) } override def toErrorString: String = diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 4212027122544..780ada90e609e 100644 --- 
a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -131,7 +131,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = { def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala - new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue())) + new JavaDoubleRDD(rdd.flatMap(fn).map(x => x.doubleValue())) } /** @@ -173,7 +173,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def fn: (Iterator[T]) => Iterator[jl.Double] = { (x: Iterator[T]) => f.call(x.asJava).asScala } - new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue())) + new JavaDoubleRDD(rdd.mapPartitions(fn).map(x => x.doubleValue())) } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 74935793bce8a..0bb6e0ed8d6f3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -69,7 +69,8 @@ private[deploy] class DriverRunner( private var clock: Clock = new SystemClock() private var sleeper = new Sleeper { def sleep(seconds: Int): Unit = (0 until seconds).takeWhile { _ => - Thread.sleep(1000); !killed + Thread.sleep(1000) + !killed } } diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index bb2af9cd72e2a..1828ced013249 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -116,7 +116,7 @@ object RecoverableNetworkWordCount { val lines = ssc.socketTextStream(ip, port) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => + wordCounts.foreachRDD { (rdd, time) => // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala index 918e124065e4c..ad6a89e320f56 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala @@ -59,7 +59,7 @@ object SqlNetworkWordCount { val words = lines.flatMap(_.split(" ")) // Convert RDDs of the words DStream to DataFrame and run SQL query - words.foreachRDD { (rdd: RDD[String], time: Time) => + words.foreachRDD { (rdd, time) => // Get the singleton instance of SQLContext val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) import sqlContext.implicits._ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index 683935155c935..2e257ff9b7def 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -234,7 +234,6 @@ class PowerIterationClustering private[clustering] ( Assignment(id, cluster) } }, preservesPartitioning = true) - new PowerIterationClusteringModel(k, assignments) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index 0a7a45b4f4e94..44bf9c0eae5bd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -189,8 +189,9 @@ class BinaryClassificationMetrics @Since("1.3.0") ( Iterator(agg) }.collect() val partitionwiseCumulativeCounts = - agg.scanLeft(new BinaryLabelCounter())( - (agg: BinaryLabelCounter, c: BinaryLabelCounter) => agg.clone() += c) + agg.scanLeft(new BinaryLabelCounter()) { (agg, c) => + agg.clone() += c + } val totalCount = partitionwiseCumulativeCounts.last logInfo(s"Total counts: $totalCount") val cumulativeCounts = binnedCounts.mapPartitionsWithIndex( diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 9e81c06280526..a14e3e583f870 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -233,11 +233,6 @@ This file is divided into 3 sections: Omit braces in case clauses.
- - (?m)\.[a-zA-Z_][a-zA-Z0-9]*\(\s*[^,]+s*=>\s*\{[^\}]+\}\s*\) - Remove an anonymous extra closure. - - diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala index 0c3e3c3fc18a1..28316846288d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala @@ -60,9 +60,9 @@ case class ShuffledHashJoin( val context = TaskContext.get() val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager()) // This relation is usually used until the end of task. - context.addTaskCompletionListener((t: TaskContext) => + context.addTaskCompletionListener { _ => relation.close() - ) + } relation } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index d603f63a08501..9afbd0e994ff4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -431,7 +431,7 @@ private[sql] object StatFunctions extends Logging { s"exceed 1e4. Currently $columnSize") val table = counts.groupBy(_.get(0)).map { case (col1Item, rows) => val countsRow = new GenericMutableRow(columnSize + 1) - rows.foreach { (row: Row) => + rows.foreach { row => // row.get(0) is column 1 // row.get(1) is column 2 // row.get(2) is the frequency From 03eabb364006807cc659d12bcd11916ac084df16 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 15 Apr 2016 20:15:48 +0900 Subject: [PATCH 05/10] Sorrect some more minor nits --- .../main/scala/org/apache/spark/TaskEndReason.scala | 4 +--- .../org/apache/spark/api/java/JavaRDDLike.scala | 10 +++++----- .../apache/spark/deploy/worker/DriverRunner.scala | 2 +- .../scala/org/apache/spark/rdd/CoalescedRDD.scala | 12 ++++++------ .../org/apache/spark/rdd/ParallelCollectionRDD.scala | 4 ++-- .../streaming/RecoverableNetworkWordCount.scala | 6 ++---- .../spark/sql/catalyst/analysis/Analyzer.scala | 6 +++--- .../spark/sql/catalyst/expressions/Projection.scala | 4 +--- .../spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../spark/sql/execution/joins/ShuffledHashJoin.scala | 4 +--- .../scala/org/apache/spark/sql/hive/hiveUDFs.scala | 2 +- .../org/apache/spark/streaming/dstream/DStream.scala | 12 ++++++------ 12 files changed, 30 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 4c4f759ed2356..7487cfe9c5509 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -149,9 +149,7 @@ case class ExceptionFailure( this(e, accumUpdates, preserveCause = true) } - def exception: Option[Throwable] = exceptionWrapper.flatMap { w => - Option(w.exception) - } + def exception: Option[Throwable] = exceptionWrapper.flatMap(w => Option(w.exception)) override def toErrorString: String = if (fullStackTrace == null) { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 780ada90e609e..6f3b8faf03b04 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ 
b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -105,7 +105,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to all elements of this RDD. */ def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = { - new JavaDoubleRDD(rdd.map(x => f.call(x).doubleValue())) + new JavaDoubleRDD(rdd.map(f.call(_).doubleValue())) } /** @@ -131,7 +131,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = { def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala - new JavaDoubleRDD(rdd.flatMap(fn).map(x => x.doubleValue())) + new JavaDoubleRDD(rdd.flatMap(fn).map(_.doubleValue())) } /** @@ -173,7 +173,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def fn: (Iterator[T]) => Iterator[jl.Double] = { (x: Iterator[T]) => f.call(x.asJava).asScala } - new JavaDoubleRDD(rdd.mapPartitions(fn).map(x => x.doubleValue())) + new JavaDoubleRDD(rdd.mapPartitions(fn).map(_.doubleValue())) } /** @@ -196,7 +196,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { (x: Iterator[T]) => f.call(x.asJava).asScala } new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning) - .map(x => x.doubleValue())) + .map(_.doubleValue())) } /** @@ -215,7 +215,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Applies a function f to each partition of this RDD. */ def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) { - rdd.foreachPartition((x => f.call(x.asJava))) + rdd.foreachPartition(x => f.call(x.asJava)) } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 0bb6e0ed8d6f3..f4376dedea725 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -119,7 +119,7 @@ private[deploy] class DriverRunner( /** Terminate this driver (or prevent it from ever starting if not yet started) */ private[worker] def kill() { synchronized { - process.foreach(p => p.destroy()) + process.foreach(_.destroy()) killed = true } } diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 0ad0ee672d331..936ff195985f0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -175,7 +175,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones) def currPrefLocs(part: Partition): Seq[String] = { - prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host) + prev.context.getPreferredLocs(prev, part.index).map(_.host) } // this class just keeps iterating and rotating infinitely over the partitions of the RDD @@ -190,12 +190,12 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: // initializes/resets to start iterating from the beginning def resetIterator(): Iterator[(String, Partition)] = { - val iterators = (0 to 2).map( x => + val iterators = (0 to 2).map { x => prev.partitions.iterator.flatMap { p => if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None } - ) - iterators.reduceLeft((x, y) => x ++ y) + } + iterators.reduceLeft(_ 
++ _) } // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD @@ -241,7 +241,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: // deal with empty case, just create targetLen partition groups with no preferred location if (!rotIt.hasNext) { - (1 to targetLen).foreach(x => groupArr += PartitionGroup()) + (1 to targetLen).foreach(groupArr += PartitionGroup()) return } @@ -330,7 +330,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: } } - def getPartitions: Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray + def getPartitions: Array[PartitionGroup] = groupArr.filter(_.size > 0).toArray /** * Runs the packing algorithm and returns an array of PartitionGroups that if possible are diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index 9152e6b1edca1..e96a3e7a0296c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -129,7 +129,7 @@ private object ParallelCollectionRDD { } seq match { case r: Range => - positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) => + positions(r.length, numSlices).zipWithIndex.map{ case ((start, end), index) => // If the range is inclusive, use inclusive range for the last slice if (r.isInclusive && index == numSlices - 1) { new Range.Inclusive(r.start + start * r.step, r.end, r.step) @@ -137,7 +137,7 @@ private object ParallelCollectionRDD { else { new Range(r.start + start * r.step, r.start + end * r.step, r.step) } - }).toSeq.asInstanceOf[Seq[Seq[T]]] + }.toSeq.asInstanceOf[Seq[Seq[T]]] case nr: NumericRange[_] => // For ranges of Long, Double, BigInteger, etc val slices = new ArrayBuffer[Seq[T]](numSlices) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index 1828ced013249..aa762b27dc3bb 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -115,7 +115,7 @@ object RecoverableNetworkWordCount { // words in input stream of \n delimited text (eg. 
generated by 'nc') val lines = ssc.socketTextStream(ip, port) val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + val wordCounts = words.map((_, 1)).reduceByKey(_ + _) wordCounts.foreachRDD { (rdd, time) => // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) @@ -158,9 +158,7 @@ object RecoverableNetworkWordCount { } val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args val ssc = StreamingContext.getOrCreate(checkpointDirectory, - () => { - createContext(ip, port, outputPath, checkpointDirectory) - }) + () => createContext(ip, port, outputPath, checkpointDirectory)) ssc.start() ssc.awaitTermination() } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9fe86209d3897..b2e0fa9c010fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -169,8 +169,8 @@ class Analyzer( private def assignAliases(exprs: Seq[NamedExpression]) = { exprs.zipWithIndex.map { case (expr, i) => - expr transformUp { - case u @ UnresolvedAlias(child, optionalAliasName) => child match { + expr.transformUp { case u @ UnresolvedAlias(child, optionalAliasName) => + child match { case ne: NamedExpression => ne case e if !e.resolved => u case g: Generator => MultiAlias(g, Nil) @@ -225,7 +225,7 @@ class Analyzer( * represented as the bit masks. */ def bitmasks(c: Cube): Seq[Int] = { - Seq.tabulate(1 << c.groupByExprs.length)(i => i) + Seq.tabulate(1 << c.groupByExprs.length)(_) } private def hasGroupingAttribute(expr: Expression): Boolean = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index 860fc31af20a8..27ad8e4cf22ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -168,9 +168,7 @@ object FromUnsafeProjection { * Returns an UnsafeProjection for given Array of DataTypes. 
*/ def apply(fields: Seq[DataType]): Projection = { - create(fields.zipWithIndex.map { x => - new BoundReference(x._2, x._1, true) - }) + create(fields.zipWithIndex.map(x => new BoundReference(x._2, x._1, true))) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 963855b4247c8..07839f559ceb3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -344,7 +344,7 @@ object ColumnPruning extends Rule[LogicalPlan] { case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet -- a.references).nonEmpty => val newOutput = e.output.filter(a.references.contains(_)) val newProjects = e.projections.map { proj => - proj.zip(e.output).filter { case (e, a) => + proj.zip(e.output).filter { case (_, a) => newOutput.contains(a) }.unzip._1 } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala index 28316846288d8..f021f3758c52c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala @@ -60,9 +60,7 @@ case class ShuffledHashJoin( val context = TaskContext.get() val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager()) // This relation is usually used until the end of task. - context.addTaskCompletionListener { _ => - relation.close() - } + context.addTaskCompletionListener(_ => relation.close()) relation } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 06d919b7ca1e0..5aab4132bc4ce 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -82,7 +82,7 @@ private[hive] case class HiveSimpleUDF( // TODO: Finish input output types. override def eval(input: InternalRow): Any = { - val inputs = wrap(children.map(c => c.eval(input)), arguments, cached, inputDataTypes) + val inputs = wrap(children.map(_.eval(input)), arguments, cached, inputDataTypes) val ret = FunctionRegistry.invoke( method, function, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index ee12a7d7d3985..583f5a48d1a6c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -593,7 +593,7 @@ abstract class DStream[T: ClassTag] ( * of this DStream. 
*/ def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope { - this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) + this.map((null, _)).reduceByKey(reduceFunc, 1).map(_._2) } /** @@ -615,7 +615,7 @@ abstract class DStream[T: ClassTag] ( */ def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null) : DStream[(T, Long)] = ssc.withScope { - this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) + this.map((_, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) } /** @@ -624,7 +624,7 @@ abstract class DStream[T: ClassTag] ( */ def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope { val cleanedF = context.sparkContext.clean(foreachFunc, false) - foreachRDD((r: RDD[T], t: Time) => cleanedF(r), displayInnerRDDOps = true) + foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true) } /** @@ -663,7 +663,7 @@ abstract class DStream[T: ClassTag] ( // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean val cleanedF = context.sparkContext.clean(transformFunc, false) - transform((r: RDD[T], t: Time) => cleanedF(r)) + transform((r: RDD[T], _: Time) => cleanedF(r)) } /** @@ -806,7 +806,7 @@ abstract class DStream[T: ClassTag] ( windowDuration: Duration, slideDuration: Duration ): DStream[T] = ssc.withScope { - this.map(x => (1, x)) + this.map((1, _)) .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) .map(_._2) } @@ -845,7 +845,7 @@ abstract class DStream[T: ClassTag] ( numPartitions: Int = ssc.sc.defaultParallelism) (implicit ord: Ordering[T] = null) : DStream[(T, Long)] = ssc.withScope { - this.map(x => (x, 1L)).reduceByKeyAndWindow( + this.map((_, 1L)).reduceByKeyAndWindow( (x: Long, y: Long) => x + y, (x: Long, y: Long) => x - y, windowDuration, From 85ace0ed8a4bbd40a4d42d5a4bd0e3058e25c6d9 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 15 Apr 2016 20:17:47 +0900 Subject: [PATCH 06/10] Leave the type notation trick --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b2e0fa9c010fd..96880e8927ec3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -225,7 +225,7 @@ class Analyzer( * represented as the bit masks. 
*/ def bitmasks(c: Cube): Seq[Int] = { - Seq.tabulate(1 << c.groupByExprs.length)(_) + Seq.tabulate(1 << c.groupByExprs.length)(i => i) } private def hasGroupingAttribute(expr: Expression): Boolean = { From 5394138c1ba5adb379bc7be2a053eff23ca54431 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 15 Apr 2016 20:23:42 +0900 Subject: [PATCH 07/10] Leave conversion tricks except for parentheses corrections --- .../main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 936ff195985f0..35665ab7c030c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -175,7 +175,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones) def currPrefLocs(part: Partition): Seq[String] = { - prev.context.getPreferredLocs(prev, part.index).map(_.host) + prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host) } // this class just keeps iterating and rotating infinitely over the partitions of the RDD @@ -195,7 +195,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None } } - iterators.reduceLeft(_ ++ _) + iterators.reduceLeft((x, y) => x ++ y) } // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD @@ -241,7 +241,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: // deal with empty case, just create targetLen partition groups with no preferred location if (!rotIt.hasNext) { - (1 to targetLen).foreach(groupArr += PartitionGroup()) + (1 to targetLen).foreach(x => groupArr += PartitionGroup()) return } @@ -330,7 +330,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: } } - def getPartitions: Array[PartitionGroup] = groupArr.filter(_.size > 0).toArray + def getPartitions: Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray /** * Runs the packing algorithm and returns an array of PartitionGroups that if possible are From c6b3cb886bf8b25bb0ed2f8ae3ae6e85bef07352 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 15 Apr 2016 20:26:27 +0900 Subject: [PATCH 08/10] Make a long function to a short function --- .../spark/mllib/evaluation/BinaryClassificationMetrics.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index 44bf9c0eae5bd..92cd7f22dc439 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -189,9 +189,7 @@ class BinaryClassificationMetrics @Since("1.3.0") ( Iterator(agg) }.collect() val partitionwiseCumulativeCounts = - agg.scanLeft(new BinaryLabelCounter()) { (agg, c) => - agg.clone() += c - } + agg.scanLeft(new BinaryLabelCounter())((agg, c) => agg.clone() += c) val totalCount = partitionwiseCumulativeCounts.last logInfo(s"Total counts: $totalCount") val cumulativeCounts = binnedCounts.mapPartitionsWithIndex( From 
0ffb4d7ea2f6fd6ecbc67b8a1f669f7d2179b0fe Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 15 Apr 2016 20:34:34 +0900 Subject: [PATCH 09/10] Add a space between map and `{` --- .../main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index e96a3e7a0296c..4d6f58d813418 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -129,7 +129,7 @@ private object ParallelCollectionRDD { } seq match { case r: Range => - positions(r.length, numSlices).zipWithIndex.map{ case ((start, end), index) => + positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) => // If the range is inclusive, use inclusive range for the last slice if (r.isInclusive && index == numSlices - 1) { new Range.Inclusive(r.start + start * r.step, r.end, r.step) From 08de54d0101715e1742f34cbcad66712e1a5793b Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 15 Apr 2016 20:39:16 +0900 Subject: [PATCH 10/10] Move case within map above --- .../scala/org/apache/spark/rdd/ParallelCollectionRDD.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index 4d6f58d813418..34a1c112cbcd0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -150,8 +150,7 @@ private object ParallelCollectionRDD { slices case _ => val array = seq.toArray // To prevent O(n^2) operations for List etc - positions(array.length, numSlices).map { - case (start, end) => + positions(array.length, numSlices).map { case (start, end) => array.slice(start, end).toSeq }.toSeq }
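
For illustration, here is a minimal sketch of the closure style this series enforces: passing a multi-line function as `.map { x => ... }` rather than wrapping it in an extra anonymous closure like `.map(x => { ... })`. The object name, sample data, and `local[*]` master below are illustrative only and are not taken from the patches.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ClosureStyleExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("closure-style").setMaster("local[*]"))
    val nums = sc.parallelize(1 to 10)

    // Discouraged: an extra anonymous closure, parentheses wrapping braces.
    val doubledOld = nums.map(i => {
      i * 2
    })

    // Preferred: pass the multi-line closure with braces directly.
    val doubledNew = nums.map { i =>
      i * 2
    }

    // Both forms compute the same result; only the syntax differs.
    assert(doubledOld.collect().sameElements(doubledNew.collect()))
    sc.stop()
  }
}
```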