From 78bbcf20936cb527d573ab6eb9b8add343911568 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Wed, 25 Sep 2019 16:47:16 -0700 Subject: [PATCH 01/15] temp temp temp --- .../org/apache/spark/MapOutputTracker.scala | 91 +++++++++++++------ .../apache/spark/MapOutputTrackerSuite.scala | 23 +++++ .../spark/ml/stat/SummarizerSuite.scala | 2 +- 3 files changed, 88 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 53329f0a937bd..1a4a8ed24d75d 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -19,10 +19,12 @@ package org.apache.spark import java.io._ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} -import java.util.zip.{GZIPInputStream, GZIPOutputStream} +import java.util.concurrent.locks.{ReadWriteLock, ReentrantReadWriteLock} +import com.github.luben.zstd.ZstdInputStream +import com.github.luben.zstd.ZstdOutputStream import scala.collection.JavaConverters._ -import scala.collection.mutable.{HashMap, HashSet, ListBuffer, Map} +import scala.collection.mutable.{HashMap, ListBuffer, Map} import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import scala.reflect.ClassTag @@ -48,7 +50,29 @@ import org.apache.spark.util._ */ private class ShuffleStatus(numPartitions: Int) { - // All accesses to the following state must be guarded with `this.synchronized`. + private val (readLock, writeLock) = { + val lock = new ReentrantReadWriteLock() + (lock.readLock(), lock.writeLock()) + } + + // All accesses to the following state must be guarded with `withReadLock` or `withWriteLock`. + private def withReadLock[B](fn: => B): B = { + readLock.lock() + try { + fn + } finally { + readLock.unlock() + } + } + + private def withWriteLock[B](fn: => B): B = { + writeLock.lock() + try { + fn + } finally { + writeLock.unlock() + } + } /** * MapStatus for each partition. The index of the array is the map partition id. @@ -88,7 +112,7 @@ private class ShuffleStatus(numPartitions: Int) { * Register a map output. If there is already a registered location for the map output then it * will be replaced by the new location. */ - def addMapOutput(mapIndex: Int, status: MapStatus): Unit = synchronized { + def addMapOutput(mapIndex: Int, status: MapStatus): Unit = withWriteLock { if (mapStatuses(mapIndex) == null) { _numAvailableOutputs += 1 invalidateSerializedMapOutputStatusCache() @@ -101,7 +125,7 @@ private class ShuffleStatus(numPartitions: Int) { * This is a no-op if there is no registered map output or if the registered output is from a * different block manager. */ - def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = synchronized { + def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = withWriteLock { if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) { _numAvailableOutputs -= 1 mapStatuses(mapIndex) = null @@ -113,7 +137,7 @@ private class ShuffleStatus(numPartitions: Int) { * Removes all shuffle outputs associated with this host. Note that this will also remove * outputs which are served by an external shuffle server (if one exists). 
*/ - def removeOutputsOnHost(host: String): Unit = { + def removeOutputsOnHost(host: String): Unit = withWriteLock { removeOutputsByFilter(x => x.host == host) } @@ -122,7 +146,7 @@ private class ShuffleStatus(numPartitions: Int) { * remove outputs which are served by an external shuffle server (if one exists), as they are * still registered with that execId. */ - def removeOutputsOnExecutor(execId: String): Unit = synchronized { + def removeOutputsOnExecutor(execId: String): Unit = withWriteLock { removeOutputsByFilter(x => x.executorId == execId) } @@ -130,8 +154,8 @@ private class ShuffleStatus(numPartitions: Int) { * Removes all shuffle outputs which satisfies the filter. Note that this will also * remove outputs which are served by an external shuffle server (if one exists). */ - def removeOutputsByFilter(f: (BlockManagerId) => Boolean): Unit = synchronized { - for (mapIndex <- 0 until mapStatuses.length) { + def removeOutputsByFilter(f: BlockManagerId => Boolean): Unit = withWriteLock { + for (mapIndex <- mapStatuses.indices) { if (mapStatuses(mapIndex) != null && f(mapStatuses(mapIndex).location)) { _numAvailableOutputs -= 1 mapStatuses(mapIndex) = null @@ -143,14 +167,14 @@ private class ShuffleStatus(numPartitions: Int) { /** * Number of partitions that have shuffle outputs. */ - def numAvailableOutputs: Int = synchronized { + def numAvailableOutputs: Int = withReadLock { _numAvailableOutputs } /** * Returns the sequence of partition ids that are missing (i.e. needs to be computed). */ - def findMissingPartitions(): Seq[Int] = synchronized { + def findMissingPartitions(): Seq[Int] = withReadLock { val missing = (0 until numPartitions).filter(id => mapStatuses(id) == null) assert(missing.size == numPartitions - _numAvailableOutputs, s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}") @@ -169,18 +193,31 @@ private class ShuffleStatus(numPartitions: Int) { def serializedMapStatus( broadcastManager: BroadcastManager, isLocal: Boolean, - minBroadcastSize: Int): Array[Byte] = synchronized { - if (cachedSerializedMapStatus eq null) { - val serResult = MapOutputTracker.serializeMapStatuses( + minBroadcastSize: Int): Array[Byte] = { + var result: Array[Byte] = null + + withReadLock { + if (cachedSerializedMapStatus != null) { + result = cachedSerializedMapStatus + } + } + + if (result == null) withWriteLock { + if (cachedSerializedMapStatus == null) { + val serResult = MapOutputTracker.serializeMapStatuses( mapStatuses, broadcastManager, isLocal, minBroadcastSize) - cachedSerializedMapStatus = serResult._1 - cachedSerializedBroadcast = serResult._2 + cachedSerializedMapStatus = serResult._1 + cachedSerializedBroadcast = serResult._2 + } + // The following line has to be outside if statement since it's possible that another thread + // initializes cachedSerializedMapStatus in-between `withReadLock` and `withWriteLock`. + result = cachedSerializedMapStatus } - cachedSerializedMapStatus + result } // Used in testing. - def hasCachedSerializedBroadcast: Boolean = synchronized { + def hasCachedSerializedBroadcast: Boolean = withReadLock { cachedSerializedBroadcast != null } @@ -188,14 +225,14 @@ private class ShuffleStatus(numPartitions: Int) { * Helper function which provides thread-safe access to the mapStatuses array. * The function should NOT mutate the array. 
*/ - def withMapStatuses[T](f: Array[MapStatus] => T): T = synchronized { + def withMapStatuses[T](f: Array[MapStatus] => T): T = withReadLock { f(mapStatuses) } /** * Clears the cached serialized map output statuses. */ - def invalidateSerializedMapOutputStatusCache(): Unit = synchronized { + def invalidateSerializedMapOutputStatusCache(): Unit = withWriteLock { if (cachedSerializedBroadcast != null) { // Prevent errors during broadcast cleanup from crashing the DAGScheduler (see SPARK-21444) Utils.tryLogNonFatalError { @@ -274,7 +311,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging /** Send a one-way message to the trackerEndpoint, to which we expect it to reply with true. */ protected def sendTracker(message: Any) { val response = askTracker[Boolean](message) - if (response != true) { + if (!response) { throw new SparkException( "Error reply received from MapOutputTracker. Expecting true, got " + response.toString) } @@ -321,8 +358,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging */ private[spark] class MapOutputTrackerMaster( conf: SparkConf, - broadcastManager: BroadcastManager, - isLocal: Boolean) + private[spark] val broadcastManager: BroadcastManager, + private[spark] val isLocal: Boolean) extends MapOutputTracker(conf) { // The size at which we use Broadcast to send the map output statuses to the executors @@ -771,13 +808,13 @@ private[spark] object MapOutputTracker extends Logging { private val BROADCAST = 1 // Serialize an array of map output locations into an efficient byte format so that we can send - // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will + // it to reduce tasks. We do this by compressing the serialized bytes using Zstd. They will // generally be pretty compressible because many map outputs will be on the same hostname. 
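  // A minimal sketch of the round trip this method implements, assuming only the
  // zstd-jni streams imported above (`statuses` stands in for a real MapStatus array):
  //
  //   val bytes = new ByteArrayOutputStream()
  //   val objOut = new ObjectOutputStream(new ZstdOutputStream(bytes))
  //   objOut.writeObject(statuses)
  //   objOut.close()
  //   val objIn = new ObjectInputStream(new ZstdInputStream(
  //     new ByteArrayInputStream(bytes.toByteArray)))
  //   val restored = objIn.readObject().asInstanceOf[Array[MapStatus]]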
def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager, isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = { val out = new ByteArrayOutputStream out.write(DIRECT) - val objOut = new ObjectOutputStream(new GZIPOutputStream(out)) + val objOut = new ObjectOutputStream(new ZstdOutputStream(out)) Utils.tryWithSafeFinally { // Since statuses can be modified in parallel, sync on it statuses.synchronized { @@ -794,7 +831,7 @@ private[spark] object MapOutputTracker extends Logging { // toByteArray creates copy, so we can reuse out out.reset() out.write(BROADCAST) - val oos = new ObjectOutputStream(new GZIPOutputStream(out)) + val oos = new ObjectOutputStream(new ZstdOutputStream(out)) oos.writeObject(bcast) oos.close() val outArr = out.toByteArray @@ -810,7 +847,7 @@ private[spark] object MapOutputTracker extends Logging { assert (bytes.length > 0) def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = { - val objIn = new ObjectInputStream(new GZIPInputStream( + val objIn = new ObjectInputStream(new ZstdInputStream( new ByteArrayInputStream(arr, off, len))) Utils.tryWithSafeFinally { objIn.readObject() diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index da2ba2165bb0c..1e54314079e4a 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -333,4 +333,27 @@ class MapOutputTrackerSuite extends SparkFunSuite { rpcEnv.shutdown() } + test("performance test in serialization") { + val numOfTry = 20 + val numOfWarmUp = 10 + +// val tracker: MapOutputTrackerMaster = newTrackerMaster() +// val shuffleId = 10 +// val numMaps = 20000 +// tracker.registerShuffle(shuffleId, numMaps) +// val compressedSize10000 = MapStatus.compressSize(10000L) +// +// (0 until numMaps).foreach { i => +// tracker.registerMapOutput(10, i, +// MapStatus(BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000), +// Array(compressedSize10000, compressedSize10000), 5)) +// } +// +// val shuffleStatus: ShuffleStatus = tracker.shuffleStatuses.get(shuffleId).head +// +// +// shuffleStatus.serializedMapStatus(tracker.broadcastManager, tracker.isLocal, 1000000) +// shuffleStatus.invalidateSerializedMapOutputStatusCache() +// tracker.stop() + } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala index 5e4f402989697..ea94daa0d4343 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala @@ -531,7 +531,7 @@ class SummarizerSuite extends SparkFunSuite with MLlibTestSparkContext { assert(summarizer3.min ~== Vectors.dense(0.0, -10.0) absTol 1e-14) } - ignore("performance test") { + test("performance test") { /* Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12 MacBook Pro (15-inch, 2016) CPU 2.9 GHz Intel Core i7 From 18e5bda3bbf7153048db3b6c2f24b9ff333d9f2d Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 10 Oct 2019 14:43:47 -0700 Subject: [PATCH 02/15] finish --- .../org/apache/spark/MapOutputTracker.scala | 54 ++++++++++--- .../apache/spark/MapOutputTrackerSuite.scala | 81 ++++++++++++++----- 2 files changed, 100 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 2993110553049..d339e83b40555 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -21,7 +21,6 @@ import java.io._ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.zip.{GZIPInputStream, GZIPOutputStream} -import java.util.concurrent.locks.{ReadWriteLock, ReentrantReadWriteLock} import com.github.luben.zstd.ZstdInputStream import com.github.luben.zstd.ZstdOutputStream @@ -360,8 +359,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging */ private[spark] class MapOutputTrackerMaster( conf: SparkConf, - broadcastManager: BroadcastManager, - isLocal: Boolean) + private[spark] val broadcastManager: BroadcastManager, + private[spark] val isLocal: Boolean) extends MapOutputTracker(conf) { // The size at which we use Broadcast to send the map output statuses to the executors @@ -814,9 +813,13 @@ private[spark] object MapOutputTracker extends Logging { // generally be pretty compressible because many map outputs will be on the same hostname. def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager, isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = { - val out = new ByteArrayOutputStream - out.write(DIRECT) - val objOut = new ObjectOutputStream(new ZstdOutputStream(out)) + import scala.language.reflectiveCalls + val out = new ByteArrayOutputStream(4096) { + // exposing `buf` directly to avoid copy + def getBuf: Array[Byte] = buf + } + val objOut = new ObjectOutputStream(out) + Utils.tryWithSafeFinally { // Since statuses can be modified in parallel, sync on it statuses.synchronized { @@ -825,18 +828,43 @@ private[spark] object MapOutputTracker extends Logging { } { objOut.close() } - val arr = out.toByteArray + + val arr: Array[Byte] = { + val compressedOut = new ByteArrayOutputStream(4096) + val zos = new ZstdOutputStream(compressedOut) + Utils.tryWithSafeFinally { + compressedOut.write(DIRECT) + zos.write(out.getBuf, 0, out.size()) + } { + zos.close() + } + // We don't want to use the internal `buf` of `compressedOut` as it can be larger than + // the actual used size since it's a buffer kept growing. + compressedOut.toByteArray + } if (arr.length >= minBroadcastSize) { // Use broadcast instead. // Important arr(0) is the tag == DIRECT, ignore that while deserializing ! 
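      // Wire format: a 1-byte tag (DIRECT or BROADCAST) followed by the
      // Zstd-compressed ObjectOutputStream payload; deserializeMapStatuses
      // dispatches on bytes(0) and decompresses from offset 1 onwards.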
val bcast = broadcastManager.newBroadcast(arr, isLocal) // toByteArray creates copy, so we can reuse out out.reset() - out.write(BROADCAST) - val oos = new ObjectOutputStream(new ZstdOutputStream(out)) - oos.writeObject(bcast) - oos.close() - val outArr = out.toByteArray + val oos = new ObjectOutputStream(out) + Utils.tryWithSafeFinally { + oos.writeObject(bcast) + } { + oos.close() + } + val outArr = { + val result = new ByteArrayOutputStream(4096) + val zos = new ZstdOutputStream(result) + Utils.tryWithSafeFinally { + result.write(BROADCAST) + zos.write(out.getBuf, 0, out.size()) + } { + zos.close() + } + result.toByteArray + } logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length) (outArr, bcast) } else { @@ -863,7 +891,7 @@ private[spark] object MapOutputTracker extends Logging { deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[MapStatus]] case BROADCAST => // deserialize the Broadcast, pull .value array out of it, and then deserialize that - val bcast = deserializeObject(bytes, 1, bytes.length - 1). + val bcast = deserializeObject(bytes, 2, bytes.length - 1). asInstanceOf[Broadcast[Array[Byte]]] logInfo("Broadcast mapstatuses size = " + bytes.length + ", actual size = " + bcast.value.length) diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 1e54314079e4a..865f4a59b279b 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -333,27 +333,64 @@ class MapOutputTrackerSuite extends SparkFunSuite { rpcEnv.shutdown() } - test("performance test in serialization") { - val numOfTry = 20 - val numOfWarmUp = 10 - -// val tracker: MapOutputTrackerMaster = newTrackerMaster() -// val shuffleId = 10 -// val numMaps = 20000 -// tracker.registerShuffle(shuffleId, numMaps) -// val compressedSize10000 = MapStatus.compressSize(10000L) -// -// (0 until numMaps).foreach { i => -// tracker.registerMapOutput(10, i, -// MapStatus(BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000), -// Array(compressedSize10000, compressedSize10000), 5)) -// } -// -// val shuffleStatus: ShuffleStatus = tracker.shuffleStatuses.get(shuffleId).head -// -// -// shuffleStatus.serializedMapStatus(tracker.broadcastManager, tracker.isLocal, 1000000) -// shuffleStatus.invalidateSerializedMapOutputStatusCache() -// tracker.stop() + ignore("Benchmarking `MapOutputTracker.serializeMapStatuses`") { + val newConf = new SparkConf + + // needs TorrentBroadcast so need a SparkContext + withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc => + val tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + val rpcEnv = sc.env.rpcEnv + val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, newConf) + rpcEnv.stop(tracker.trackerEndpoint) + rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) + + val shuffleId = 10 + val numMaps = 20000 + val minBroadcastSize = Int.MaxValue + + tracker.registerShuffle(shuffleId, numMaps) + val compressedSize10000 = MapStatus.compressSize(10000L) + + (0 until numMaps).foreach { i => + tracker.registerMapOutput(shuffleId, i, + MapStatus(BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000), + Array(compressedSize10000, compressedSize10000), numMaps)) + } + + val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head + + val numOfTry = 200 + val numOfWarmUp = 20 + + var 
serializedMapStatusSizes = 0 + var serializedBroadcastSizes = 0 + + for (i <- 1 to numOfWarmUp) { + val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeMapStatuses( + shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) + if (i == 1) { + serializedMapStatusSizes = serializedMapStatus.length + if (serializedBroadcast != null) { + serializedBroadcastSizes = serializedBroadcast.value.length + } + } + } + + val start = System.nanoTime() + for (i <- 1 to numOfTry) { + MapOutputTracker.serializeMapStatuses( + shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) + } + val throughput = (numOfTry.toDouble / (System.nanoTime() - start)) * 10E6 + + // scalastyle:off println + println("serialized MapStatus sizes : " + serializedMapStatusSizes) + println("serialized Broadcast MapStatus sizes : " + serializedBroadcastSizes) + println(s"Throughput : $throughput ops/ms") + // scalastyle:on println + + tracker.unregisterShuffle(shuffleId) + tracker.stop() + } } } From 39502ed05348d9b380a43eb5942e5a4aabe8d7c8 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 10 Oct 2019 15:09:40 -0700 Subject: [PATCH 03/15] tune test --- .../scala/org/apache/spark/MapOutputTrackerSuite.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 865f4a59b279b..58440210504d4 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -349,17 +349,16 @@ class MapOutputTrackerSuite extends SparkFunSuite { val minBroadcastSize = Int.MaxValue tracker.registerShuffle(shuffleId, numMaps) - val compressedSize10000 = MapStatus.compressSize(10000L) - + val r = new scala.util.Random(912) (0 until numMaps).foreach { i => tracker.registerMapOutput(shuffleId, i, - MapStatus(BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000), - Array(compressedSize10000, compressedSize10000), numMaps)) + new CompressedMapStatus(BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000), + Array.range(0, 500).map(i => math.abs(r.nextLong())), i)) } val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head - val numOfTry = 200 + val numOfTry = 100 val numOfWarmUp = 20 var serializedMapStatusSizes = 0 From 3958c01a1d4a4bb00532392520269a42ba815bb7 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 10 Oct 2019 15:21:26 -0700 Subject: [PATCH 04/15] ready to submit PR --- .../main/scala/org/apache/spark/MapOutputTracker.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index d339e83b40555..0e373de23684b 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -855,15 +855,15 @@ private[spark] object MapOutputTracker extends Logging { oos.close() } val outArr = { - val result = new ByteArrayOutputStream(4096) - val zos = new ZstdOutputStream(result) + val compressedOut = new ByteArrayOutputStream(4096) + val zos = new ZstdOutputStream(compressedOut) Utils.tryWithSafeFinally { - result.write(BROADCAST) + compressedOut.write(BROADCAST) zos.write(out.getBuf, 0, out.size()) } { zos.close() } - result.toByteArray + compressedOut.toByteArray } logInfo("Broadcast mapstatuses size = " + 
outArr.length + ", actual size = " + arr.length) (outArr, bcast) From a4a807edaadde98818afedb2cff3192a045b532c Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 10 Oct 2019 15:29:51 -0700 Subject: [PATCH 05/15] removed unused import --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 0e373de23684b..2121d6be6c1cc 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -20,7 +20,6 @@ package org.apache.spark import java.io._ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} import java.util.concurrent.locks.ReentrantReadWriteLock -import java.util.zip.{GZIPInputStream, GZIPOutputStream} import com.github.luben.zstd.ZstdInputStream import com.github.luben.zstd.ZstdOutputStream From d0c3532a3d3f5260ce63d69e6531675bf33aeb18 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 10 Oct 2019 15:50:54 -0700 Subject: [PATCH 06/15] revert --- .../test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala index ea94daa0d4343..5e4f402989697 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala @@ -531,7 +531,7 @@ class SummarizerSuite extends SparkFunSuite with MLlibTestSparkContext { assert(summarizer3.min ~== Vectors.dense(0.0, -10.0) absTol 1e-14) } - test("performance test") { + ignore("performance test") { /* Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12 MacBook Pro (15-inch, 2016) CPU 2.9 GHz Intel Core i7 From ed08f2eac245aae8c7e1ab705598ccaa697284c1 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 10 Oct 2019 15:55:43 -0700 Subject: [PATCH 07/15] fix --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 2121d6be6c1cc..d3b022d42c01e 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -890,7 +890,7 @@ private[spark] object MapOutputTracker extends Logging { deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[MapStatus]] case BROADCAST => // deserialize the Broadcast, pull .value array out of it, and then deserialize that - val bcast = deserializeObject(bytes, 2, bytes.length - 1). + val bcast = deserializeObject(bytes, 1, bytes.length - 1). 
asInstanceOf[Broadcast[Array[Byte]]] logInfo("Broadcast mapstatuses size = " + bytes.length + ", actual size = " + bcast.value.length) From a60135614364fdbe0a2953a47b908176e740a8db Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Fri, 11 Oct 2019 16:43:02 -0700 Subject: [PATCH 08/15] address feedback --- .../org/apache/spark/MapOutputTracker.scala | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index d3b022d42c01e..4f7c24b0280bc 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -17,12 +17,10 @@ package org.apache.spark -import java.io._ +import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream} import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} import java.util.concurrent.locks.ReentrantReadWriteLock -import com.github.luben.zstd.ZstdInputStream -import com.github.luben.zstd.ZstdOutputStream import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, ListBuffer, Map} import scala.concurrent.{ExecutionContext, Future} @@ -30,6 +28,10 @@ import scala.concurrent.duration.Duration import scala.reflect.ClassTag import scala.util.control.NonFatal +import com.github.luben.zstd.ZstdInputStream +import com.github.luben.zstd.ZstdOutputStream +import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream} + import org.apache.spark.broadcast.{Broadcast, BroadcastManager} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -812,13 +814,14 @@ private[spark] object MapOutputTracker extends Logging { // generally be pretty compressible because many map outputs will be on the same hostname. def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager, isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = { - import scala.language.reflectiveCalls - val out = new ByteArrayOutputStream(4096) { - // exposing `buf` directly to avoid copy - def getBuf: Array[Byte] = buf - } - val objOut = new ObjectOutputStream(out) + // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one + // This implementation doesn't reallocate the whole memory block but allocates + // additional buffers. This way no buffers need to be garbage collected and + // the contents don't have to be copied to the new buffer. + val out = new ApacheByteArrayOutputStream() + val compressedOut = new ApacheByteArrayOutputStream() + val objOut = new ObjectOutputStream(out) Utils.tryWithSafeFinally { // Since statuses can be modified in parallel, sync on it statuses.synchronized { @@ -829,16 +832,15 @@ private[spark] object MapOutputTracker extends Logging { } val arr: Array[Byte] = { - val compressedOut = new ByteArrayOutputStream(4096) val zos = new ZstdOutputStream(compressedOut) Utils.tryWithSafeFinally { compressedOut.write(DIRECT) - zos.write(out.getBuf, 0, out.size()) + // `out.writeTo(zos)` will write the uncompressed data from `out` to `zos` + // without copying to avoid unnecessary allocation and copy of byte[]. + out.writeTo(zos) } { zos.close() } - // We don't want to use the internal `buf` of `compressedOut` as it can be larger than - // the actual used size since it's a buffer kept growing. 
compressedOut.toByteArray } if (arr.length >= minBroadcastSize) { @@ -854,11 +856,11 @@ private[spark] object MapOutputTracker extends Logging { oos.close() } val outArr = { - val compressedOut = new ByteArrayOutputStream(4096) + compressedOut.reset() val zos = new ZstdOutputStream(compressedOut) Utils.tryWithSafeFinally { compressedOut.write(BROADCAST) - zos.write(out.getBuf, 0, out.size()) + out.writeTo(zos) } { zos.close() } From 8dc8fadf8657e1e5f8c144d29be1f87d089932af Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Wed, 16 Oct 2019 15:08:22 -0700 Subject: [PATCH 09/15] update --- .../apache/spark/MapOutputTrackerSuite.scala | 60 ---------- .../MapStatusesSerializationBenchmark.scala | 103 ++++++++++++++++++ .../apache/spark/benchmark/Benchmark.scala | 2 +- 3 files changed, 104 insertions(+), 61 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/MapStatusesSerializationBenchmark.scala diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 58440210504d4..1bbae05b190f5 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -332,64 +332,4 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.stop() rpcEnv.shutdown() } - - ignore("Benchmarking `MapOutputTracker.serializeMapStatuses`") { - val newConf = new SparkConf - - // needs TorrentBroadcast so need a SparkContext - withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc => - val tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] - val rpcEnv = sc.env.rpcEnv - val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, newConf) - rpcEnv.stop(tracker.trackerEndpoint) - rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) - - val shuffleId = 10 - val numMaps = 20000 - val minBroadcastSize = Int.MaxValue - - tracker.registerShuffle(shuffleId, numMaps) - val r = new scala.util.Random(912) - (0 until numMaps).foreach { i => - tracker.registerMapOutput(shuffleId, i, - new CompressedMapStatus(BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000), - Array.range(0, 500).map(i => math.abs(r.nextLong())), i)) - } - - val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head - - val numOfTry = 100 - val numOfWarmUp = 20 - - var serializedMapStatusSizes = 0 - var serializedBroadcastSizes = 0 - - for (i <- 1 to numOfWarmUp) { - val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeMapStatuses( - shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) - if (i == 1) { - serializedMapStatusSizes = serializedMapStatus.length - if (serializedBroadcast != null) { - serializedBroadcastSizes = serializedBroadcast.value.length - } - } - } - - val start = System.nanoTime() - for (i <- 1 to numOfTry) { - MapOutputTracker.serializeMapStatuses( - shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) - } - val throughput = (numOfTry.toDouble / (System.nanoTime() - start)) * 10E6 - - // scalastyle:off println - println("serialized MapStatus sizes : " + serializedMapStatusSizes) - println("serialized Broadcast MapStatus sizes : " + serializedBroadcastSizes) - println(s"Throughput : $throughput ops/ms") - // scalastyle:on println - - tracker.unregisterShuffle(shuffleId) - tracker.stop() - } - } } diff --git a/core/src/test/scala/org/apache/spark/MapStatusesSerializationBenchmark.scala 
b/core/src/test/scala/org/apache/spark/MapStatusesSerializationBenchmark.scala new file mode 100644 index 0000000000000..09e3b27a486be --- /dev/null +++ b/core/src/test/scala/org/apache/spark/MapStatusesSerializationBenchmark.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.benchmark.BenchmarkBase +import org.apache.spark.scheduler.CompressedMapStatus +import org.apache.spark.storage.BlockManagerId + +/** + * Benchmark for MapStatuses serialization performance. + * {{{ + * To run this benchmark: + * 1. without sbt: bin/spark-submit --class + * --jars ,, + * 2. build/sbt "avro/test:runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain " + * Results will be written to "benchmarks/AvroReadBenchmark-results.txt". + * }}} + */ +object MapStatusesSerializationBenchmark extends BenchmarkBase { + + var sc: SparkContext = null + + def serializationBenchmark(numMaps: Int, blockSize: Int, + minBroadcastSize: Int = Int.MaxValue): Unit = { + val benchmark = new Benchmark(s"MapStatuses Serialization with $numMaps MapOutput", + numMaps, output = output) + + val shuffleId = 10 + val tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + val rpcEnv = sc.env.rpcEnv + val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, sc.getConf) + rpcEnv.stop(tracker.trackerEndpoint) + rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) + + + tracker.registerShuffle(shuffleId, numMaps) + val r = new scala.util.Random(912) + (0 until numMaps).foreach { i => + tracker.registerMapOutput(shuffleId, i, + new CompressedMapStatus(BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000), + Array.range(0, 500).map(i => math.abs(r.nextLong())), i)) + } + + val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head + + + var serializedMapStatusSizes = 0 + var serializedBroadcastSizes = 0 + + val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeMapStatuses( + shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) + serializedMapStatusSizes = serializedMapStatus.length + if (serializedBroadcast != null) { + serializedBroadcastSizes = serializedBroadcast.value.length + } + + + benchmark.addCase("Serialization") { _ => + MapOutputTracker.serializeMapStatuses( + shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) + } + + benchmark.run() + tracker.unregisterShuffle(shuffleId) + tracker.stop() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + createSparkContext() + serializationBenchmark(200000, 500) + } + + def createSparkContext(): Unit = { + val 
conf = new SparkConf() + if (sc != null) { + sc.stop() + } + sc = new SparkContext("local", "MapStatusesSerializationBenchmark", conf) + } + + override def afterAll(): Unit = { + if (sc != null) { + sc.stop() + } + } +} \ No newline at end of file diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala index 022fcbb25b0af..523b50c32279b 100644 --- a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala +++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala @@ -166,7 +166,7 @@ private[spark] class Benchmark( val stdev = if (runTimes.size > 1) { math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1)) } else 0 - Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0) + Result(avg / 1E6, num / (best / 1E3), best / 1E6, stdev / 1E6) } } From 0bf182a7c55b27db862a046076d358bb04ab6978 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 17 Oct 2019 16:51:30 -0700 Subject: [PATCH 10/15] Fix styling and typo --- .../apache/spark/MapStatusesSerializationBenchmark.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/MapStatusesSerializationBenchmark.scala b/core/src/test/scala/org/apache/spark/MapStatusesSerializationBenchmark.scala index 09e3b27a486be..6d7d5e9485a77 100644 --- a/core/src/test/scala/org/apache/spark/MapStatusesSerializationBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/MapStatusesSerializationBenchmark.scala @@ -27,10 +27,10 @@ import org.apache.spark.storage.BlockManagerId * {{{ * To run this benchmark: * 1. without sbt: bin/spark-submit --class - * --jars ,, - * 2. build/sbt "avro/test:runMain " + * --jars , + * 2. build/sbt "core/test:runMain " * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain " - * Results will be written to "benchmarks/AvroReadBenchmark-results.txt". + * Results will be written to "benchmarks/MapStatusesSerializationBenchmark-results.txt". * }}} */ object MapStatusesSerializationBenchmark extends BenchmarkBase { @@ -100,4 +100,4 @@ object MapStatusesSerializationBenchmark extends BenchmarkBase { sc.stop() } } -} \ No newline at end of file +} From bd88abd39c68d9acaeb9f1214c23a8ab4eb76ffe Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Fri, 18 Oct 2019 12:06:02 -0700 Subject: [PATCH 11/15] temp --- ...ala => MapStatusesSerDeserBenchmark.scala} | 64 +++++++++++++------ 1 file changed, 46 insertions(+), 18 deletions(-) rename core/src/test/scala/org/apache/spark/{MapStatusesSerializationBenchmark.scala => MapStatusesSerDeserBenchmark.scala} (64%) diff --git a/core/src/test/scala/org/apache/spark/MapStatusesSerializationBenchmark.scala b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala similarity index 64% rename from core/src/test/scala/org/apache/spark/MapStatusesSerializationBenchmark.scala rename to core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala index 6d7d5e9485a77..b14075add8cff 100644 --- a/core/src/test/scala/org/apache/spark/MapStatusesSerializationBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala @@ -23,44 +23,47 @@ import org.apache.spark.scheduler.CompressedMapStatus import org.apache.spark.storage.BlockManagerId /** - * Benchmark for MapStatuses serialization performance. + * Benchmark for MapStatuses serialization & deserialization performance. * {{{ * To run this benchmark: * 1. 
without sbt: bin/spark-submit --class * --jars , * 2. build/sbt "core/test:runMain " * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain " - * Results will be written to "benchmarks/MapStatusesSerializationBenchmark-results.txt". + * Results will be written to "benchmarks/MapStatusesSerDeserBenchmark-results.txt". * }}} */ -object MapStatusesSerializationBenchmark extends BenchmarkBase { +object MapStatusesSerDeserBenchmark extends BenchmarkBase { var sc: SparkContext = null + var tracker: MapOutputTrackerMaster = null - def serializationBenchmark(numMaps: Int, blockSize: Int, - minBroadcastSize: Int = Int.MaxValue): Unit = { - val benchmark = new Benchmark(s"MapStatuses Serialization with $numMaps MapOutput", - numMaps, output = output) + def serDeserBenchmark(numMaps: Int, blockSize: Int, enableBroadcast: Boolean): Unit = { + val minBroadcastSize = if (enableBroadcast) { + 0 + } else { + Int.MaxValue + } - val shuffleId = 10 - val tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] - val rpcEnv = sc.env.rpcEnv - val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, sc.getConf) - rpcEnv.stop(tracker.trackerEndpoint) - rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) + val benchmark = new Benchmark(s"$numMaps MapOutputs, $blockSize blocks " + { + if (enableBroadcast) "w/ " else "w/o " + } + "broadcast", numMaps, output = output) + val shuffleId = 10 tracker.registerShuffle(shuffleId, numMaps) val r = new scala.util.Random(912) (0 until numMaps).foreach { i => tracker.registerMapOutput(shuffleId, i, new CompressedMapStatus(BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000), - Array.range(0, 500).map(i => math.abs(r.nextLong())), i)) + Array.fill(blockSize) { + // Creating block size ranging from 0byte to 1GB + (r.nextDouble() * 1024 * 1024 * 1024).toLong + }, i)) } val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head - var serializedMapStatusSizes = 0 var serializedBroadcastSizes = 0 @@ -71,20 +74,44 @@ object MapStatusesSerializationBenchmark extends BenchmarkBase { serializedBroadcastSizes = serializedBroadcast.value.length } - benchmark.addCase("Serialization") { _ => MapOutputTracker.serializeMapStatuses( shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) } + benchmark.addCase("Deserialization") { _ => + val result = MapOutputTracker.deserializeMapStatuses(serializedMapStatus) + assert(result.length == numMaps) + } + benchmark.run() + // scalastyle:off + import org.apache.commons.io.FileUtils + benchmark.out.println("Compressed Serialized MapStatus sizes: " + + FileUtils.byteCountToDisplaySize(serializedMapStatusSizes)) + benchmark.out.println(s"Compressed Serialized Broadcast MapStatus sizes: " + + FileUtils.byteCountToDisplaySize(serializedBroadcastSizes) + "\n\n") + // scalastyle:on + tracker.unregisterShuffle(shuffleId) - tracker.stop() } override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { createSparkContext() - serializationBenchmark(200000, 500) + tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + val rpcEnv = sc.env.rpcEnv + val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, sc.getConf) + rpcEnv.stop(tracker.trackerEndpoint) + rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) + + serDeserBenchmark(200000, 10, true) + serDeserBenchmark(200000, 10, false) + + serDeserBenchmark(200000, 100, true) + serDeserBenchmark(200000, 100, false) + + 
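    // The 1000-block cases below are the heaviest: per the committed results files,
    // their compressed payloads reach ~123 MB, versus ~2 MB for the 10-block cases.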
serDeserBenchmark(200000, 1000, true) + serDeserBenchmark(200000, 1000, false) } def createSparkContext(): Unit = { @@ -96,6 +123,7 @@ object MapStatusesSerializationBenchmark extends BenchmarkBase { } override def afterAll(): Unit = { + tracker.stop() if (sc != null) { sc.stop() } From f184c4c384845263bbb5f653a3bedb3fcc56b35e Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Fri, 18 Oct 2019 14:33:46 -0700 Subject: [PATCH 12/15] remove MapStatusesSerDeserBenchmark.scala --- .../spark/MapStatusesSerDeserBenchmark.scala | 131 ------------------ 1 file changed, 131 deletions(-) delete mode 100644 core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala diff --git a/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala deleted file mode 100644 index b14075add8cff..0000000000000 --- a/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -import org.apache.spark.benchmark.Benchmark -import org.apache.spark.benchmark.BenchmarkBase -import org.apache.spark.scheduler.CompressedMapStatus -import org.apache.spark.storage.BlockManagerId - -/** - * Benchmark for MapStatuses serialization & deserialization performance. - * {{{ - * To run this benchmark: - * 1. without sbt: bin/spark-submit --class - * --jars , - * 2. build/sbt "core/test:runMain " - * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain " - * Results will be written to "benchmarks/MapStatusesSerDeserBenchmark-results.txt". 
- * }}} - */ -object MapStatusesSerDeserBenchmark extends BenchmarkBase { - - var sc: SparkContext = null - var tracker: MapOutputTrackerMaster = null - - def serDeserBenchmark(numMaps: Int, blockSize: Int, enableBroadcast: Boolean): Unit = { - val minBroadcastSize = if (enableBroadcast) { - 0 - } else { - Int.MaxValue - } - - val benchmark = new Benchmark(s"$numMaps MapOutputs, $blockSize blocks " + { - if (enableBroadcast) "w/ " else "w/o " - } + "broadcast", numMaps, output = output) - - val shuffleId = 10 - - tracker.registerShuffle(shuffleId, numMaps) - val r = new scala.util.Random(912) - (0 until numMaps).foreach { i => - tracker.registerMapOutput(shuffleId, i, - new CompressedMapStatus(BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000), - Array.fill(blockSize) { - // Creating block size ranging from 0byte to 1GB - (r.nextDouble() * 1024 * 1024 * 1024).toLong - }, i)) - } - - val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head - - var serializedMapStatusSizes = 0 - var serializedBroadcastSizes = 0 - - val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeMapStatuses( - shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) - serializedMapStatusSizes = serializedMapStatus.length - if (serializedBroadcast != null) { - serializedBroadcastSizes = serializedBroadcast.value.length - } - - benchmark.addCase("Serialization") { _ => - MapOutputTracker.serializeMapStatuses( - shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) - } - - benchmark.addCase("Deserialization") { _ => - val result = MapOutputTracker.deserializeMapStatuses(serializedMapStatus) - assert(result.length == numMaps) - } - - benchmark.run() - // scalastyle:off - import org.apache.commons.io.FileUtils - benchmark.out.println("Compressed Serialized MapStatus sizes: " + - FileUtils.byteCountToDisplaySize(serializedMapStatusSizes)) - benchmark.out.println(s"Compressed Serialized Broadcast MapStatus sizes: " + - FileUtils.byteCountToDisplaySize(serializedBroadcastSizes) + "\n\n") - // scalastyle:on - - tracker.unregisterShuffle(shuffleId) - } - - override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { - createSparkContext() - tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] - val rpcEnv = sc.env.rpcEnv - val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, sc.getConf) - rpcEnv.stop(tracker.trackerEndpoint) - rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) - - serDeserBenchmark(200000, 10, true) - serDeserBenchmark(200000, 10, false) - - serDeserBenchmark(200000, 100, true) - serDeserBenchmark(200000, 100, false) - - serDeserBenchmark(200000, 1000, true) - serDeserBenchmark(200000, 1000, false) - } - - def createSparkContext(): Unit = { - val conf = new SparkConf() - if (sc != null) { - sc.stop() - } - sc = new SparkContext("local", "MapStatusesSerializationBenchmark", conf) - } - - override def afterAll(): Unit = { - tracker.stop() - if (sc != null) { - sc.stop() - } - } -} From 5aadd8f089a00ceddc179804063cf058646f8a48 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Fri, 18 Oct 2019 14:52:30 -0700 Subject: [PATCH 13/15] revert unneeded change --- .../src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 1 + core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index ea7c71e85a3c7..d5ee19bde8edf 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -332,4 +332,5 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.stop() rpcEnv.shutdown() } + } diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala index 523b50c32279b..022fcbb25b0af 100644 --- a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala +++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala @@ -166,7 +166,7 @@ private[spark] class Benchmark( val stdev = if (runTimes.size > 1) { math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1)) } else 0 - Result(avg / 1E6, num / (best / 1E3), best / 1E6, stdev / 1E6) + Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0) } } From d7fce8202a48ecdfa32da08e5df1447482c36a0a Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 18 Oct 2019 22:10:04 +0000 Subject: [PATCH 14/15] Add JDK11 --- ...tatusesSerDeserBenchmark-jdk11-results.txt | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt index 747aae09272f9..7a6cfb7b23b94 100644 --- a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt +++ b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt @@ -2,21 +2,21 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 609 631 22 0.3 3043.8 1.0X -Deserialization 840 897 67 0.2 4201.2 0.7X +Serialization 205 213 13 1.0 1023.6 1.0X +Deserialization 908 939 27 0.2 4540.2 0.2X -Compressed Serialized MapStatus sizes: 393 bytes -Compressed Serialized Broadcast MapStatus sizes: 3 MB +Compressed Serialized MapStatus sizes: 400 bytes +Compressed Serialized Broadcast MapStatus sizes: 2 MB OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 591 599 8 0.3 2955.3 1.0X -Deserialization 878 913 31 0.2 4392.2 0.7X +Serialization 195 204 24 1.0 976.9 1.0X +Deserialization 913 940 33 0.2 4566.7 0.2X -Compressed Serialized MapStatus sizes: 3 MB +Compressed Serialized MapStatus sizes: 2 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -24,21 +24,21 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 1776 1778 2 0.1 8880.5 1.0X -Deserialization 1086 1086 0 0.2 5427.9 1.6X +Serialization 616 619 3 
0.3 3079.1 1.0X +Deserialization 936 954 22 0.2 4680.5 0.7X -Compressed Serialized MapStatus sizes: 411 bytes -Compressed Serialized Broadcast MapStatus sizes: 15 MB +Compressed Serialized MapStatus sizes: 418 bytes +Compressed Serialized Broadcast MapStatus sizes: 14 MB OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 1725 1726 1 0.1 8624.9 1.0X -Deserialization 1093 1094 2 0.2 5463.6 1.6X +Serialization 586 588 3 0.3 2928.8 1.0X +Deserialization 929 933 4 0.2 4647.0 0.6X -Compressed Serialized MapStatus sizes: 15 MB +Compressed Serialized MapStatus sizes: 14 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -46,21 +46,21 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 12421 12522 142 0.0 62104.4 1.0X -Deserialization 3020 3043 32 0.1 15102.0 4.1X +Serialization 4740 4916 249 0.0 23698.5 1.0X +Deserialization 1578 1597 27 0.1 7890.6 3.0X -Compressed Serialized MapStatus sizes: 544 bytes -Compressed Serialized Broadcast MapStatus sizes: 131 MB +Compressed Serialized MapStatus sizes: 546 bytes +Compressed Serialized Broadcast MapStatus sizes: 123 MB OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 11719 11737 26 0.0 58595.3 1.0X -Deserialization 3018 3051 46 0.1 15091.7 3.9X +Serialization 4492 4573 115 0.0 22458.3 1.0X +Deserialization 1533 1547 20 0.1 7664.8 2.9X -Compressed Serialized MapStatus sizes: 131 MB +Compressed Serialized MapStatus sizes: 123 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes From 7095b60a3bff4d1b856bd5c710e2f3f46391f658 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 18 Oct 2019 22:18:23 +0000 Subject: [PATCH 15/15] Add JDK8 result --- .../MapStatusesSerDeserBenchmark-results.txt | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt index 1f479a49d5860..0c649694f6b6e 100644 --- a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt +++ b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt @@ -2,21 +2,21 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. 
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 625 639 9 0.3 3127.2 1.0X -Deserialization 875 931 49 0.2 4376.2 0.7X +Serialization 236 245 18 0.8 1179.1 1.0X +Deserialization 842 885 37 0.2 4211.4 0.3X -Compressed Serialized MapStatus sizes: 393 bytes -Compressed Serialized Broadcast MapStatus sizes: 3 MB +Compressed Serialized MapStatus sizes: 400 bytes +Compressed Serialized Broadcast MapStatus sizes: 2 MB OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 604 640 71 0.3 3018.4 1.0X -Deserialization 889 903 17 0.2 4443.8 0.7X +Serialization 213 219 8 0.9 1065.1 1.0X +Deserialization 846 870 33 0.2 4228.6 0.3X -Compressed Serialized MapStatus sizes: 3 MB +Compressed Serialized MapStatus sizes: 2 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -24,21 +24,21 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 1879 1880 2 0.1 9394.9 1.0X -Deserialization 1147 1150 5 0.2 5733.8 1.6X +Serialization 624 709 167 0.3 3121.1 1.0X +Deserialization 885 908 22 0.2 4427.0 0.7X -Compressed Serialized MapStatus sizes: 411 bytes -Compressed Serialized Broadcast MapStatus sizes: 15 MB +Compressed Serialized MapStatus sizes: 418 bytes +Compressed Serialized Broadcast MapStatus sizes: 14 MB OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 1825 1826 1 0.1 9123.3 1.0X -Deserialization 1147 1281 189 0.2 5735.7 1.6X +Serialization 603 604 2 0.3 3014.9 1.0X +Deserialization 892 895 5 0.2 4458.7 0.7X -Compressed Serialized MapStatus sizes: 15 MB +Compressed Serialized MapStatus sizes: 14 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -46,21 +46,21 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. 
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 12327 12518 270 0.0 61634.3 1.0X -Deserialization 3120 3133 18 0.1 15600.8 4.0X +Serialization 4612 4945 471 0.0 23061.0 1.0X +Deserialization 1493 1495 2 0.1 7466.3 3.1X -Compressed Serialized MapStatus sizes: 544 bytes -Compressed Serialized Broadcast MapStatus sizes: 131 MB +Compressed Serialized MapStatus sizes: 546 bytes +Compressed Serialized Broadcast MapStatus sizes: 123 MB OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 11928 11986 82 0.0 59642.2 1.0X -Deserialization 3137 3138 2 0.1 15683.3 3.8X +Serialization 4452 4595 202 0.0 22261.4 1.0X +Deserialization 1464 1477 18 0.1 7321.4 3.0X -Compressed Serialized MapStatus sizes: 131 MB +Compressed Serialized MapStatus sizes: 123 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes
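
For reference, the ShuffleStatus locking change in the first patch follows a classic
double-checked caching pattern on a ReentrantReadWriteLock: readers take the cheap
shared read lock, and only a cache miss escalates to the write lock, where the cache
must be re-checked because another thread may have filled it in between. A minimal
standalone sketch of that pattern (the CachedValue class and compute function are
illustrative names, not Spark APIs):

import java.util.concurrent.locks.ReentrantReadWriteLock

// Caches the result of an expensive computation, e.g. serializing map statuses.
class CachedValue[T >: Null <: AnyRef](compute: () => T) {
  private val lock = new ReentrantReadWriteLock()
  private var cached: T = null

  def get(): T = {
    lock.readLock().lock()
    try {
      if (cached != null) return cached // fast path: shared read lock only
    } finally {
      lock.readLock().unlock()
    }
    lock.writeLock().lock()
    try {
      // Re-check under the write lock: another thread may have populated the
      // cache between our read-lock release and write-lock acquisition.
      if (cached == null) {
        cached = compute()
      }
      cached
    } finally {
      lock.writeLock().unlock()
    }
  }

  // Invalidation mirrors invalidateSerializedMapOutputStatusCache() above.
  def invalidate(): Unit = {
    lock.writeLock().lock()
    try {
      cached = null
    } finally {
      lock.writeLock().unlock()
    }
  }
}

In ShuffleStatus the same shape guards cachedSerializedMapStatus, with
serializedMapStatus() as the getter and the write-locked mutators invalidating
the cache.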