From 4126c1b8d0feefe721b1dd481f171d662d03cd7d Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Fri, 24 Apr 2020 14:58:19 -0700
Subject: [PATCH 01/30] Add support for migrating shuffle files

---
 .../org/apache/spark/MapOutputTracker.scala | 23 ++-
 .../scala/org/apache/spark/SparkContext.scala | 15 +-
 .../scala/org/apache/spark/SparkEnv.scala | 3 +-
 .../spark/internal/config/package.scala | 15 ++
 .../apache/spark/scheduler/MapStatus.scala | 12 +-
 .../shuffle/IndexShuffleBlockResolver.scala | 104 ++++++++++-
 .../org/apache/spark/storage/BlockId.scala | 3 +
 .../apache/spark/storage/BlockManager.scala | 171 ++++++++++++++++--
 .../spark/storage/BlockManagerMaster.scala | 1 -
 .../storage/BlockManagerMasterEndpoint.scala | 21 ++-
 ...nDecommissionedBlockManagerException.scala | 21 +++
 .../sort/IndexShuffleBlockResolverSuite.scala | 3 +
 .../BlockManagerDecommissionSuite.scala | 48 ++++-
 .../BlockManagerReplicationSuite.scala | 2 +-
 .../spark/storage/BlockManagerSuite.scala | 2 +-
 .../integrationtest/DecommissionSuite.scala | 13 +-
 .../k8s/integrationtest/KubernetesSuite.scala | 26 ++-
 .../tests/decommissioning.py | 23 ++-
 .../streaming/ReceivedBlockHandlerSuite.scala | 2 +-
 19 files changed, 462 insertions(+), 46 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/storage/RDDBlockSavedOnDecommissionedBlockManagerException.scala

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ec8621bc55cf3..382f4a764feca 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util._
  *
  * All public methods of this class are thread-safe.
  */
-private class ShuffleStatus(numPartitions: Int) {
+private class ShuffleStatus(numPartitions: Int) extends Logging {

   private val (readLock, writeLock) = {
     val lock = new ReentrantReadWriteLock()
@@ -121,6 +121,20 @@ private class ShuffleStatus(numPartitions: Int) {
     mapStatuses(mapIndex) = status
   }

+  /**
+   * Update the map output location (e.g. during migration).
+   */
+  def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock {
+    val mapStatusOpt = mapStatuses.find(_.mapId == mapId)
+    mapStatusOpt match {
+      case Some(mapStatus) =>
+        mapStatus.updateLocation(bmAddress)
+        invalidateSerializedMapOutputStatusCache()
+      case None =>
+        logError(s"Asked to update map output ${mapId} for untracked map status.")
+    }
+  }
+
   /**
    * Remove the map output which was served by the specified block manager.
 * This is a no-op if there is no registered map output or if the registered output is from a
@@ -479,6 +493,13 @@ private[spark] class MapOutputTrackerMaster(
     }
   }

+  def updateMapOutput(shuffleId: Int, mapId: Long, bmAddress: BlockManagerId): Unit = {
+    shuffleStatuses.get(shuffleId) match {
+      case Some(shuffleStatus) => shuffleStatus.updateMapOutput(mapId, bmAddress)
+      case None => logError(s"Asked to update map output for unknown shuffle ${shuffleId}")
+    }
+  }
+
   def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus): Unit = {
     shuffleStatuses(shuffleId).addMapOutput(mapIndex, status)
   }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5c92527b7b80e..0b9ebf75d1b03 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -57,7 +57,7 @@ import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.shuffle.ShuffleDataIOUtils
 import org.apache.spark.shuffle.api.ShuffleDriverComponents
@@ -1586,7 +1586,7 @@ class SparkContext(config: SparkConf) extends Logging {
     listenerBus.removeListener(listener)
   }

-  private[spark] def getExecutorIds(): Seq[String] = {
+  def getExecutorIds(): Seq[String] = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
         b.getExecutorIds()
@@ -1725,6 +1725,17 @@ class SparkContext(config: SparkConf) extends Logging {
     }
   }

+
+  @DeveloperApi
+  def decommissionExecutors(executorIds: Seq[String]): Unit = {
+    schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        executorIds.foreach(b.decommissionExecutor)
+      case _ =>
+        logWarning("Decommissioning executors is not supported by current scheduler.")
+    }
+  }
+
   /** The version of Spark on which this application is running. */
   def version: String = SPARK_VERSION

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 8ba1739831803..d543359f4dedf 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -367,7 +367,8 @@ object SparkEnv extends Logging {
             externalShuffleClient
           } else {
             None
-          }, blockManagerInfo)),
+          }, blockManagerInfo,
+          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])),
       registerOrLookupEndpoint(
         BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
         new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1bc273477ba93..7a88126699d8c 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -420,6 +420,21 @@ package object config {
     .booleanConf
     .createWithDefault(false)

+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =
+    ConfigBuilder("spark.storage.decommission.shuffle_blocks")
+      .doc("Whether to transfer shuffle blocks during block manager decommissioning. 
Requires " + + "an indexed shuffle resolver (like sort based shuffe)") + .version("3.1.0") + .booleanConf + .createWithDefault(true) + + private[spark] val STORAGE_RDD_DECOMMISSION_ENABLED = + ConfigBuilder("spark.storage.decommission.rdd_blocks") + .doc("Whether to transfer RDD blocks during block manager decommissioning.") + .version("3.1.0") + .booleanConf + .createWithDefault(true) + private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK = ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock") .internal() diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 7f8893ff3b9d8..9dee1f779bcb5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -33,9 +33,11 @@ import org.apache.spark.util.Utils * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks. */ private[spark] sealed trait MapStatus { - /** Location where this task was run. */ + /** Location where this task output is. */ def location: BlockManagerId + def updateLocation(bm: BlockManagerId): Unit + /** * Estimated size for the reduce block, in bytes. * @@ -126,6 +128,10 @@ private[spark] class CompressedMapStatus( override def location: BlockManagerId = loc + override def updateLocation(bm: BlockManagerId): Unit = { + loc = bm + } + override def getSizeForBlock(reduceId: Int): Long = { MapStatus.decompressSize(compressedSizes(reduceId)) } @@ -178,6 +184,10 @@ private[spark] class HighlyCompressedMapStatus private ( override def location: BlockManagerId = loc + override def updateLocation(bm: BlockManagerId): Unit = { + loc = bm + } + override def getSizeForBlock(reduceId: Int): Long = { assert(hugeBlockSizes != null) if (emptyBlocks.contains(reduceId)) { diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index af2c82e771970..b959e83599d14 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -18,6 +18,7 @@ package org.apache.spark.shuffle import java.io._ +import java.nio.ByteBuffer import java.nio.channels.Channels import java.nio.file.Files @@ -25,8 +26,10 @@ import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.internal.Logging import org.apache.spark.io.NioBufferedFileInputStream import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} +import org.apache.spark.network.client.StreamCallbackWithID import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.ExecutorDiskUtils +import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID import org.apache.spark.storage._ import org.apache.spark.util.Utils @@ -55,6 +58,25 @@ private[spark] class IndexShuffleBlockResolver( def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, mapId, None) + /** + * Get the shuffle files that are stored locally. Used for block migrations. 
+ */ + def getStoredShuffles(): Set[(Int, Long)] = { + // Matches ShuffleIndexBlockId name + val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r + val rootDirs = blockManager.diskBlockManager.localDirs + // ExecutorDiskUtil puts things inside one level hashed sub directories + val searchDirs = rootDirs.flatMap(_.listFiles()).filter(_.isDirectory()) ++ rootDirs + val filenames = searchDirs.flatMap(_.list()) + logDebug(s"Got block files ${filenames.toList}") + filenames.flatMap{ fname => + pattern.findAllIn(fname).matchData.map { + matched => (matched.group(1).toInt, matched.group(2).toLong) + } + }.toSet + } + + /** * Get the shuffle data file. * @@ -148,6 +170,86 @@ private[spark] class IndexShuffleBlockResolver( } } + /** + * Write a provided shuffle block as a stream. Used for block migrations. + * ShuffleBlockBatchIds must contain the full range represented in the ShuffleIndexBlock. + * Requires the caller to delete any shuffle index blocks where the shuffle block fails to + * put. + */ + def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager): + StreamCallbackWithID = { + val file = blockId match { + case ShuffleIndexBlockId(shuffleId, mapId, _) => + getIndexFile(shuffleId, mapId) + case ShuffleBlockBatchId(shuffleId, mapId, _, _) => + getDataFile(shuffleId, mapId) + case _ => + throw new Exception(s"Unexpected shuffle block transfer ${blockId}") + } + val fileTmp = Utils.tempFileWith(file) + val channel = Channels.newChannel( + serializerManager.wrapStream(blockId, + new FileOutputStream(fileTmp))) + + new StreamCallbackWithID { + + override def getID: String = blockId.name + + override def onData(streamId: String, buf: ByteBuffer): Unit = { + while (buf.hasRemaining) { + channel.write(buf) + } + } + + override def onComplete(streamId: String): Unit = { + logTrace(s"Done receiving block $blockId, now putting into local shuffle service") + channel.close() + val diskSize = fileTmp.length() + this.synchronized { + if (file.exists()) { + file.delete() + } + if (!fileTmp.renameTo(file)) { + throw new IOException(s"fail to rename file ${fileTmp} to ${file}") + } + } + blockManager.reportBlockStatus(blockId, BlockStatus( + StorageLevel( + useDisk = true, + useMemory = false, + useOffHeap = false, + deserialized = false, + replication = 0) + , 0, diskSize)) + } + + override def onFailure(streamId: String, cause: Throwable): Unit = { + // the framework handles the connection itself, we just need to do local cleanup + channel.close() + fileTmp.delete() + } + } + } + + /** + * Get the index & data block for migration. + */ + def getMigrationBlocks(shuffleId: Int, mapId: Long): + ((BlockId, ManagedBuffer), (BlockId, ManagedBuffer)) = { + // Load the index block + val indexFile = getIndexFile(shuffleId, mapId) + val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, 0) + val indexFileSize = indexFile.length() + val indexBlockData = new FileSegmentManagedBuffer(transportConf, indexFile, 0, indexFileSize) + + // Load the data block + val dataFile = getDataFile(shuffleId, mapId) + val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, 0) + val dataBlockData = new FileSegmentManagedBuffer(transportConf, dataFile, 0, dataFile.length()) + ((indexBlockId, indexBlockData), (dataBlockId, dataBlockData)) + } + + /** * Write an index file with the offsets of each block, plus a final offset at the end for the * end of the output file. 
This will be used by getBlockData to figure out where each block @@ -169,7 +271,7 @@ private[spark] class IndexShuffleBlockResolver( val dataFile = getDataFile(shuffleId, mapId) // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure // the following check and rename are atomic. - synchronized { + this.synchronized { val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length) if (existingLengths != null) { // Another attempt for the same task has already written our map outputs successfully, diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 68ed3aa5b062f..398db1f4875ee 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -40,6 +40,9 @@ sealed abstract class BlockId { def isRDD: Boolean = isInstanceOf[RDDBlockId] def isShuffle: Boolean = isInstanceOf[ShuffleBlockId] || isInstanceOf[ShuffleBlockBatchId] def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId] + def isInternalShuffle: Boolean = { + isInstanceOf[ShuffleDataBlockId] || isInstanceOf[ShuffleIndexBlockId] + } override def toString: String = name } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index aa15d1253b3f7..303e796ab8047 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -26,6 +26,7 @@ import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, TimeUnit} import scala.collection.mutable import scala.collection.mutable.HashMap +import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.reflect.ClassTag @@ -53,6 +54,7 @@ import org.apache.spark.network.util.TransportConf import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.serializer.{SerializerInstance, SerializerManager} +import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleManager, ShuffleWriteMetricsReporter} import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter} import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock import org.apache.spark.storage.memory._ @@ -254,6 +256,10 @@ private[spark] class BlockManager( var hostLocalDirManager: Option[HostLocalDirManager] = None + private lazy val indexShuffleResolver: IndexShuffleBlockResolver = { + shuffleManager.shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver] + } + /** * Abstraction for storing blocks from bytes, whether they start in memory or on disk. 
 *
@@ -650,6 +656,19 @@ private[spark] class BlockManager(
       blockId: BlockId,
       level: StorageLevel,
       classTag: ClassTag[_]): StreamCallbackWithID = {
+    // Delegate shuffle blocks here to resolver if supported
+    if (blockId.isShuffle || blockId.isInternalShuffle) {
+      logDebug(s"Putting shuffle block ${blockId}")
+      try {
+        return indexShuffleResolver.putShuffleBlockAsStream(blockId, serializerManager)
+      } catch {
+        case e: ClassCastException => throw new Exception(
+          s"Unexpected shuffle block ${blockId} with unsupported shuffle " +
+          s"resolver ${shuffleManager.shuffleBlockResolver}")
+      }
+    }
+    logDebug(s"Putting regular block ${blockId}")
+    // All other blocks
     val (_, tmpFile) = diskBlockManager.createTempLocalBlock()
     val channel = new CountingWritableChannel(
       Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile))))
@@ -720,7 +739,7 @@ private[spark] class BlockManager(
    * it is still valid). This ensures that update in master will compensate for the increase in
    * memory on slave.
    */
-  private def reportBlockStatus(
+  private[spark] def reportBlockStatus(
       blockId: BlockId,
       status: BlockStatus,
       droppedMemorySize: Long = 0L): Unit = {
@@ -1285,6 +1304,9 @@ private[spark] class BlockManager(
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")

+    if (blockManagerDecommissioning && blockId.isRDD) {
+      throw new RDDBlockSavedOnDecommissionedBlockManagerException(blockId.asRDDId.get)
+    }
     val putBlockInfo = {
       val newInfo = new BlockInfo(level, classTag, tellMaster)
@@ -1777,7 +1799,7 @@ private[spark] class BlockManager(

   def decommissionBlockManager(): Unit = {
     if (!blockManagerDecommissioning) {
-      logInfo("Starting block manager decommissioning process")
+      logInfo("Starting block manager decommissioning process...")
       blockManagerDecommissioning = true
       decommissionManager = Some(new BlockManagerDecommissionManager(conf))
       decommissionManager.foreach(_.start())
@@ -1786,6 +1808,109 @@ private[spark] class BlockManager(
     }
   }

+
+  // Shuffles which are either in queue for migrations or migrated
+  private val migratingShuffles = mutable.HashSet[(Int, Long)]()
+  // Shuffles which are queued for migration
+  private val shufflesToMigrate = new java.util.concurrent.ConcurrentLinkedQueue[(Int, Long)]()
+
+
+  private class ShuffleMigrationRunnable(peer: BlockManagerId) extends Runnable {
+    @volatile var running = true
+    override def run(): Unit = {
+      var migrating: Option[(Int, Long)] = None
+      // Once a block fails to transfer to an executor stop trying to transfer more blocks
+      try {
+        while (running) {
+          migrating = Option(shufflesToMigrate.poll())
+          migrating match {
+            case None =>
+              // Nothing to do right now, but maybe a transfer will fail or a new block
+              // will finish being committed.
+              val SLEEP_TIME_SECS = 5
+              Thread.sleep(SLEEP_TIME_SECS * 1000L)
+            case Some((shuffleId, mapId)) =>
+              logInfo(s"Trying to migrate ${shuffleId},${mapId} to ${peer}")
+              val ((indexBlockId, indexBuffer), (dataBlockId, dataBuffer)) =
+                indexShuffleResolver.getMigrationBlocks(shuffleId, mapId)
+              blockTransferService.uploadBlockSync(
+                peer.host,
+                peer.port,
+                peer.executorId,
+                indexBlockId,
+                indexBuffer,
+                StorageLevel(
+                  useDisk=true,
+                  useMemory=false,
+                  useOffHeap=false,
+                  deserialized=false,
+                  replication=1),
+                null)// class tag, we don't need for shuffle
+              blockTransferService.uploadBlockSync(
+                peer.host,
+                peer.port,
+                peer.executorId,
+                dataBlockId,
+                dataBuffer,
+                StorageLevel(
+                  useDisk=true,
+                  useMemory=false,
+                  useOffHeap=false,
+                  deserialized=false,
+                  replication=1),
+                null)// class tag, we don't need for shuffle
+          }
+        }
+      } catch {
+        case e: Exception =>
+          migrating match {
+            case Some(shuffleMap) =>
+              logError(s"Error ${e} during migration, adding ${shuffleMap} back to migration queue")
+              shufflesToMigrate.add(shuffleMap)
+            case None =>
+              logError(s"Error ${e} while waiting for block to migrate")
+          }
+      }
+    }
+  }
+
+  private val migrationPeers = mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
+
+  /**
+   * Tries to offload all shuffle blocks that are registered with the shuffle service locally.
+   * Note: this does not delete the shuffle files in-case there is an in-progress fetch
+   * but rather shadows them.
+   * Requires an Indexed based shuffle resolver.
+   */
+  def offloadShuffleBlocks(): Unit = {
+    // Update the queue of shuffles to be migrated
+    logDebug("Offloading shuffle blocks")
+    val localShuffles = indexShuffleResolver.getStoredShuffles()
+    logDebug(s"My local shuffles are ${localShuffles.toList}")
+    val newShufflesToMigrate = localShuffles.&~(migratingShuffles).toSeq
+    logDebug(s"My new shuffles to migrate ${newShufflesToMigrate.toList}")
+    shufflesToMigrate.addAll(newShufflesToMigrate.asJava)
+    migratingShuffles ++= newShufflesToMigrate
+
+    // Update the threads doing migrations
+    // TODO: Sort & only start as many threads as min(||blocks||, ||targets||) using location pref
+    val livePeerSet = getPeers(false).toSet
+    val currentPeerSet = migrationPeers.keys.toSet
+    val deadPeers = currentPeerSet.&~(livePeerSet)
+    val newPeers = livePeerSet.&~(currentPeerSet)
+    migrationPeers ++= newPeers.map{peer =>
+      logDebug(s"Starting thread to migrate shuffle blocks to ${peer}")
+      val executor = ThreadUtils.newDaemonSingleThreadExecutor(s"migrate-shuffle-to-${peer}")
+      val runnable = new ShuffleMigrationRunnable(peer)
+      executor.submit(runnable)
+      (peer, runnable)
+    }
+    // A peer may have entered a decommissioning state, don't transfer any new blocks
+    deadPeers.map{peer =>
+      migrationPeers.get(peer).map(_.running = false)
+    }
+  }
+
   /**
    * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
    * Visible for testing
@@ -1794,7 +1919,7 @@
     val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)

     if (replicateBlocksInfo.nonEmpty) {
-      logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
+      logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " +
        "for block manager decommissioning")
     }
@@ -1901,18 +2026,36 @@
    */
   private class BlockManagerDecommissionManager(conf: SparkConf) {
     @volatile private var stopped = false
-    private val blockReplicationThread = new Thread {
+    private val blockMigrationThread = new Thread {
+      val sleepInterval = 
conf.get( + config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL) + override def run(): Unit = { while (blockManagerDecommissioning && !stopped) { + logInfo("Iterating on migrating from the block manager.") try { - logDebug("Attempting to replicate all cached RDD blocks") - decommissionRddCacheBlocks() - logInfo("Attempt to replicate all cached blocks done") - val sleepInterval = conf.get( - config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL) + // If enabled we migrate shuffle blocks first as they are more expensive. + if (conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED)) { + logDebug(s"Attempting to replicate all cached RDD blocks") + offloadShuffleBlocks() + logInfo(s"Attempt to replicate all cached blocks done") + } + if (conf.get(config.STORAGE_RDD_DECOMMISSION_ENABLED)) { + logDebug(s"Attempting to replicate all cached RDD blocks") + decommissionRddCacheBlocks() + logInfo(s"Attempt to replicate all cached blocks done") + } + if (!conf.get(config.STORAGE_RDD_DECOMMISSION_ENABLED) && + !conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED)) { + logWarning("Decommissioning, but no task configured set one or both:\n" + + "spark.storage.decommission.shuffle_blocks\n" + + "spark.storage.decommission.rdd_blocks") + } + logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.") Thread.sleep(sleepInterval) } catch { case _: InterruptedException => + logInfo("Interrupted during migration, will not refresh migrations.") // no-op case NonFatal(e) => logError("Error occurred while trying to " + @@ -1921,20 +2064,20 @@ private[spark] class BlockManager( } } } - blockReplicationThread.setDaemon(true) - blockReplicationThread.setName("block-replication-thread") + blockMigrationThread.setDaemon(true) + blockMigrationThread.setName("block-replication-thread") def start(): Unit = { logInfo("Starting block replication thread") - blockReplicationThread.start() + blockMigrationThread.start() } def stop(): Unit = { if (!stopped) { stopped = true logInfo("Stopping block replication thread") - blockReplicationThread.interrupt() - blockReplicationThread.join() + blockMigrationThread.interrupt() + blockMigrationThread.join() } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 3cfa5d2a25818..ff0b099e44b51 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.spark.storage import scala.collection.generic.CanBuildFrom diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index d936420a99276..bad3d5de5e077 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -28,7 +28,7 @@ import scala.util.Random import com.google.common.cache.CacheBuilder -import org.apache.spark.SparkConf +import org.apache.spark.{MapOutputTrackerMaster, SparkConf} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.{config, Logging} import org.apache.spark.network.shuffle.ExternalBlockStoreClient @@ -48,7 +48,8 @@ class BlockManagerMasterEndpoint( conf: SparkConf, listenerBus: LiveListenerBus, externalBlockStoreClient: Option[ExternalBlockStoreClient], - blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo]) + blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo], + mapOutputTracker: MapOutputTrackerMaster) extends IsolatedRpcEndpoint with Logging { // Mapping from executor id to the block manager's local disk directories. @@ -157,7 +158,8 @@ class BlockManagerMasterEndpoint( context.reply(true) case DecommissionBlockManagers(executorIds) => - decommissionBlockManagers(executorIds.flatMap(blockManagerIdByExecutor.get)) + val bmIds = executorIds.flatMap(blockManagerIdByExecutor.get) + decommissionBlockManagers(bmIds) context.reply(true) case GetReplicateInfoForRDDBlocks(blockManagerId) => @@ -489,6 +491,7 @@ class BlockManagerMasterEndpoint( storageLevel: StorageLevel, memSize: Long, diskSize: Long): Boolean = { + logDebug(s"Updating block info on master ${blockId}") if (!blockManagerInfo.contains(blockManagerId)) { if (blockManagerId.isDriver && !isLocal) { @@ -505,6 +508,18 @@ class BlockManagerMasterEndpoint( return true } + if (blockId.isInternalShuffle && storageLevel.isValid) { + blockId match { + case ShuffleIndexBlockId(shuffleId, mapId, _) => + // Don't update the map output on just the index block + logDebug("Received shuffle index block update for ${shuffleId} ${mapId}") + case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) => + mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId) + case _ => + logError(s"Unexpected shuffle block type ${blockId}") + } + } + blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize) var locations: mutable.HashSet[BlockManagerId] = null diff --git a/core/src/main/scala/org/apache/spark/storage/RDDBlockSavedOnDecommissionedBlockManagerException.scala b/core/src/main/scala/org/apache/spark/storage/RDDBlockSavedOnDecommissionedBlockManagerException.scala new file mode 100644 index 0000000000000..e6cef4dcc5e38 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/RDDBlockSavedOnDecommissionedBlockManagerException.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +class RDDBlockSavedOnDecommissionedBlockManagerException(blockId: RDDBlockId) + extends Exception(s"RDD Block $blockId cannot be saved on decommissioned executor") diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala index 27bb06b4e0636..b4092be466e27 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala @@ -48,6 +48,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa when(blockManager.diskBlockManager).thenReturn(diskBlockManager) when(diskBlockManager.getFile(any[BlockId])).thenAnswer( (invocation: InvocationOnMock) => new File(tempDir, invocation.getArguments.head.toString)) + when(diskBlockManager.localDirs).thenReturn(Array(tempDir)) } override def afterEach(): Unit = { @@ -73,6 +74,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa } resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths, dataTmp) + val storedShuffles = resolver.getStoredShuffles() val indexFile = new File(tempDir.getAbsolutePath, idxName) val dataFile = resolver.getDataFile(shuffleId, mapId) @@ -81,6 +83,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa assert(dataFile.exists()) assert(dataFile.length() === 30) assert(!dataTmp.exists()) + assert(storedShuffles === Set((1, 2))) val lengths2 = new Array[Long](3) val dataTmp2 = File.createTempFile("shuffle", null, tempDir) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 59fb056dae628..d65d3b9660736 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -41,6 +41,14 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext test(s"verify that an already running task which is going to cache data succeeds " + s"on a decommissioned executor") { + // runDecomTest(true, false) + } + + test(s"verify that shuffle blocks are migrated.") { + runDecomTest(false, true) + } + + private def runDecomTest(persist: Boolean, shuffle: Boolean) = { // Create input RDD with 10 partitions val input = sc.parallelize(1 to 10, 10) val accum = sc.longAccumulator("mapperRunAccumulator") @@ -52,7 +60,11 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext val sleepyRdd = input.mapPartitions { x => Thread.sleep(500) accum.add(1) - x + x.map(y => (y, y)) + } + val testRdd = shuffle match { + case true => sleepyRdd.reduceByKey(_ + _) + case false => sleepyRdd } // Listen for the job @@ -69,10 +81,12 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext }) // Cache the RDD lazily - sleepyRdd.persist() + if (persist) { + 
testRdd.persist() + } // Start the computation of RDD - this step will also cache the RDD - val asyncCount = sleepyRdd.countAsync() + val asyncCount = testRdd.countAsync() // Wait for the job to have started sem.acquire(1) @@ -82,23 +96,41 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext val execs = sched.getExecutorIds() assert(execs.size == 3, s"Expected 3 executors but found ${execs.size}") val execToDecommission = execs.head - sched.decommissionExecutor(execToDecommission) + // sched.decommissionExecutor(execToDecommission) // Wait for job to finish - val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 3.seconds) + val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 5.seconds) assert(asyncCountResult === 10) // All 10 tasks finished, so accum should have been increased 10 times assert(accum.value === 10) // All tasks should be successful, nothing should have failed sc.listenerBus.waitUntilEmpty() - assert(taskEndEvents.size === 10) // 10 mappers + if (shuffle) { + assert(taskEndEvents.size === 20) // 10 mappers & 10 reducers + } else { + assert(taskEndEvents.size === 10) // 10 mappers + } assert(taskEndEvents.map(_.reason).toSet === Set(Success)) - // Since the RDD is cached, so further usage of same RDD should use the + // all blocks should have been shifted from decommissioned block manager + // after some time + Thread.sleep(1000) + + // Since the RDD is cached or shuffled so further usage of same RDD should use the // cached data. Original RDD partitions should not be recomputed i.e. accum // should have same value like before - assert(sleepyRdd.count() === 10) + assert(testRdd.count() === 10) assert(accum.value === 10) + + val storageStatus = sc.env.blockManager.master.getStorageStatus + val execIdToBlocksMapping = storageStatus.map( + status => (status.blockManagerId.executorId, status.blocks)).toMap + // No cached blocks should be present on executor which was decommissioned + assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq()) + if (persist) { + // There should still be all 10 RDD blocks cached + assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === 10) + } } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 660bfcfc48267..d18d84dfaa9e5 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -103,7 +103,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]() master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(conf), None, blockManagerInfo)), + new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker)), rpcEnv.setupEndpoint("blockmanagerHeartbeat", new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true) allStores.clear() diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index eb875dcc44223..f9e29d4ddedc7 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -149,7 +149,7 @@ class BlockManagerSuite extends 
SparkFunSuite with Matchers with BeforeAndAfterE liveListenerBus = spy(new LiveListenerBus(conf)) master = spy(new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - liveListenerBus, None, blockManagerInfo)), + liveListenerBus, None, blockManagerInfo, mapOutputTracker)), rpcEnv.setupEndpoint("blockmanagerHeartbeat", new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true)) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala index becf9415c7506..87580a753f273 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala @@ -28,18 +28,29 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => .set(Worker.WORKER_DECOMMISSION_ENABLED.key, "true") .set("spark.kubernetes.pyspark.pythonVersion", "3") .set("spark.kubernetes.container.image", pyImage) + .set("spark.storage.decommission.enabled", "true") + .set("spark.storage.decommission.shuffle_blocks", "true") + .set("spark.storage.decommission.shuffle_blocks", "true") + //Ensure we have somewhere to migrate our data too + .set("spark.executor.instances", "3") + // The default of 30 seconds is fine, but for testing we just want to get this done fast. + .set("spark.storage.decommission.replicationReattemptInterval", "1s") + .set("spark.storage.decommission.rdd_blocks", "true") runSparkApplicationAndVerifyCompletion( appResource = PYSPARK_DECOMISSIONING, mainClass = "", expectedLogOnCompletion = Seq( "Finished waiting, stopping Spark", - "decommissioning executor"), + "decommissioning executor", + "Final accumulator value is: 100"), appArgs = Array.empty[String], driverPodChecker = doBasicDriverPyPodCheck, executorPodChecker = doBasicExecutorPyPodCheck, appLocator = appLocator, isJVM = false, + pyFiles = None, + executorPatience = None, decommissioningTest = true) } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 4de7e70c1f409..739c91bb0b1f3 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -40,9 +40,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite - with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite + with BeforeAndAfterAll with BeforeAndAfter /* with BasicTestsSuite with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite - with DepsTestsSuite with DecommissionSuite with RTestsSuite with Logging with Eventually + with DepsTestsSuite */ with DecommissionSuite /* with RTestsSuite */ with Logging with Eventually with Matchers { @@ -325,7 +325,7 @@ class KubernetesSuite extends SparkFunSuite val 
result = checkPodReady(namespace, name) result shouldBe (true) } - // Look for the string that indicates we're good to clean up + // Look for the string that indicates we're good to trigger decom // on the driver logDebug("Waiting for first collect...") Eventually.eventually(TIMEOUT, INTERVAL) { @@ -333,13 +333,29 @@ class KubernetesSuite extends SparkFunSuite .pods() .withName(driverPodName) .getLog - .contains("Waiting to give nodes time to finish."), + .contains("Waiting to give nodes time to finish migration, decom exec 1."), "Decommission test did not complete first collect.") } // Delete the pod to simulate cluster scale down/migration. - val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name) + // This will allow the pod to remain up for the grace period + val pod = kubernetesTestComponents.kubernetesClient.pods() + .withName(name) pod.delete() logDebug(s"Triggered pod decom/delete: $name deleted") + // Look for the string that indicates we should force kill the first + // Executor. This simulates the pod being fully lost. + logDebug("Waiting for second collect...") + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(kubernetesTestComponents.kubernetesClient + .pods() + .withName(driverPodName) + .getLog + .contains("Waiting some more, please kill exec 1."), + "Decommission test did not complete second collect.") + } + logDebug("Force deleting") + val podNoGrace = pod.withGracePeriod(0) + podNoGrace.delete() } case Action.DELETED | Action.ERROR => execPods.remove(name) diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index f68f24d49763d..9d9b0c4261886 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -31,14 +31,27 @@ .appName("PyMemoryTest") \ .getOrCreate() sc = spark._sc - rdd = sc.parallelize(range(10)) + acc = sc.accumulator(0) + def addToAcc(x): + acc.add(1) + return x + initialRdd = sc.parallelize(range(100), 5) + accRdd = initialRdd.map(addToAcc) + # Trigger a shuffle so there are shuffle blocks to migrate + rdd = accRdd.map(lambda x: (x, x)).groupByKey() rdd.collect() - print("Waiting to give nodes time to finish.") + print("1st accumulator value is: "+ str(acc.value)) + print("Waiting to give nodes time to finish migration, decom exec 1.") + print("...") time.sleep(5) + rdd.count() + print("Waiting some more, please kill exec 1.") + print("...") + time.sleep(5) + print("Executor node should be deleted now") + rdd.count() rdd.collect() - print("Waiting some more....") - time.sleep(10) - rdd.collect() + print("Final accumulator value is: "+ str(acc.value)) print("Finished waiting, stopping Spark.") spark.stop() print("Done, exiting Python") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 0976494b6d094..558e2c99e0442 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -91,7 +91,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]() blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - 
new LiveListenerBus(conf), None, blockManagerInfo)), + new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker)), rpcEnv.setupEndpoint("blockmanagerHeartbeat", new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true) From 8ee89499dfd41f40c20681cc95bae93eb2529d29 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Apr 2020 15:15:03 -0700 Subject: [PATCH 02/30] Style fixes --- .../apache/spark/storage/BlockManager.scala | 22 ++++++++----------- .../integrationtest/DecommissionSuite.scala | 2 +- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 303e796ab8047..65d1efb82a8f4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -24,9 +24,9 @@ import java.nio.channels.Channels import java.util.Collections import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, TimeUnit} +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.HashMap -import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.reflect.ClassTag @@ -1819,6 +1819,12 @@ private[spark] class BlockManager( @volatile var running = true override def run(): Unit = { var migrating: Option[(Int, Long)] = None + val storageLevel = StorageLevel( + useDisk = true, + useMemory = false, + useOffHeap = false, + deserialized = false, + replication = 1) // Once a block fails to transfer to an executor stop trying to transfer more blocks try { while (running) { @@ -1839,12 +1845,7 @@ private[spark] class BlockManager( peer.executorId, indexBlockId, indexBuffer, - StorageLevel( - useDisk=true, - useMemory=false, - useOffHeap=false, - deserialized=false, - replication=1), + storageLevel, null)// class tag, we don't need for shuffle blockTransferService.uploadBlockSync( peer.host, @@ -1852,12 +1853,7 @@ private[spark] class BlockManager( peer.executorId, dataBlockId, dataBuffer, - StorageLevel( - useDisk=true, - useMemory=false, - useOffHeap=false, - deserialized=false, - replication=1), + storageLevel, null)// class tag, we don't need for shuffle } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala index 87580a753f273..e71d9ea127d25 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala @@ -31,7 +31,7 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => .set("spark.storage.decommission.enabled", "true") .set("spark.storage.decommission.shuffle_blocks", "true") .set("spark.storage.decommission.shuffle_blocks", "true") - //Ensure we have somewhere to migrate our data too + // Ensure we have somewhere to migrate our data too .set("spark.executor.instances", "3") // The default of 30 seconds is fine, but for testing we just want to get this done fast. 
.set("spark.storage.decommission.replicationReattemptInterval", "1s") From afb1b1aa7a0ffbc19f314fa16a062159d354253f Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Apr 2020 15:18:04 -0700 Subject: [PATCH 03/30] Re-enable the rest of the K8s tests. --- .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 739c91bb0b1f3..f0c10668aa563 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -40,9 +40,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite - with BeforeAndAfterAll with BeforeAndAfter /* with BasicTestsSuite with SecretsTestsSuite + with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite - with DepsTestsSuite */ with DecommissionSuite /* with RTestsSuite */ with Logging with Eventually + with DepsTestsSuite with DecommissionSuite with RTestsSuite with Logging with Eventually with Matchers { From 4071ae2bf0582880e6f051296c0fdc875368ab32 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Apr 2020 15:21:30 -0700 Subject: [PATCH 04/30] Python style fix --- .../kubernetes/integration-tests/tests/decommissioning.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index 9d9b0c4261886..84dc514b8988b 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -32,15 +32,17 @@ .getOrCreate() sc = spark._sc acc = sc.accumulator(0) + def addToAcc(x): acc.add(1) return x + initialRdd = sc.parallelize(range(100), 5) accRdd = initialRdd.map(addToAcc) # Trigger a shuffle so there are shuffle blocks to migrate rdd = accRdd.map(lambda x: (x, x)).groupByKey() rdd.collect() - print("1st accumulator value is: "+ str(acc.value)) + print("1st accumulator value is: " + str(acc.value)) print("Waiting to give nodes time to finish migration, decom exec 1.") print("...") time.sleep(5) @@ -51,7 +53,7 @@ def addToAcc(x): print("Executor node should be deleted now") rdd.count() rdd.collect() - print("Final accumulator value is: "+ str(acc.value)) + print("Final accumulator value is: " + str(acc.value)) print("Finished waiting, stopping Spark.") spark.stop() print("Done, exiting Python") From ff620ba346563b62a24c8d2398b8867b73ee0b19 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 30 Apr 2020 18:20:40 -0700 Subject: [PATCH 05/30] Don't join the block migration thread, we'll block on that --- .../main/scala/org/apache/spark/storage/BlockManager.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 
65d1efb82a8f4..78a05d2c4900a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -2050,8 +2050,9 @@ private[spark] class BlockManager( logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.") Thread.sleep(sleepInterval) } catch { - case _: InterruptedException => + case e: InterruptedException => logInfo("Interrupted during migration, will not refresh migrations.") + stopped = true // no-op case NonFatal(e) => logError("Error occurred while trying to " + @@ -2073,7 +2074,6 @@ private[spark] class BlockManager( stopped = true logInfo("Stopping block replication thread") blockMigrationThread.interrupt() - blockMigrationThread.join() } } } From adb03dbf44f10e4e254e8a5c3957a058a0dd8de7 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 1 May 2020 16:16:08 -0700 Subject: [PATCH 06/30] We only need two executors for this test. --- .../apache/spark/storage/BlockManagerDecommissionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index d65d3b9660736..0c8259cd77449 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -36,7 +36,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 100L) .set(config.STORAGE_DECOMMISSION_ENABLED, true) - sc = new SparkContext("local-cluster[3, 1, 1024]", "test", conf) + sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf) } test(s"verify that an already running task which is going to cache data succeeds " + From be2a5e736e051ca0497906b2a2e904c7b4033596 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 1 May 2020 17:32:11 -0700 Subject: [PATCH 07/30] Try and update the tests some more, switch migration to not make a new forkjoinpool on each iteration --- .../spark/internal/config/package.scala | 4 +- .../apache/spark/storage/BlockManager.scala | 4 +- .../storage/BlockManagerMasterEndpoint.scala | 5 ++- .../BlockManagerDecommissionSuite.scala | 39 ++++++++++++------- 4 files changed, 31 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7a88126699d8c..f1b7c8997b899 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -426,14 +426,14 @@ package object config { "an indexed shuffle resolver (like sort based shuffe)") .version("3.1.0") .booleanConf - .createWithDefault(true) + .createWithDefault(false) private[spark] val STORAGE_RDD_DECOMMISSION_ENABLED = ConfigBuilder("spark.storage.decommission.rdd_blocks") .doc("Whether to transfer RDD blocks during block manager decommissioning.") .version("3.1.0") .booleanConf - .createWithDefault(true) + .createWithDefault(false) private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK = ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 78a05d2c4900a..47ec54ea07902 100644 
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1925,8 +1925,7 @@ private[spark] class BlockManager( // TODO: We can sort these blocks based on some policy (LRU/blockSize etc) // so that we end up prioritize them over each other - val blocksFailedReplication = ThreadUtils.parmap( - replicateBlocksInfo, "decommissionRddCacheBlocks", 4) { + val blocksFailedReplication = replicateBlocksInfo.map{ case ReplicateBlock(blockId, existingReplicas, maxReplicas) => val replicatedSuccessfully = replicateBlock( blockId, @@ -2053,7 +2052,6 @@ private[spark] class BlockManager( case e: InterruptedException => logInfo("Interrupted during migration, will not refresh migrations.") stopped = true - // no-op case NonFatal(e) => logError("Error occurred while trying to " + "replicate cached RDD blocks for block manager decommissioning", e) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index bad3d5de5e077..c61a62f86a682 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -336,13 +336,14 @@ class BlockManagerMasterEndpoint( val info = blockManagerInfo(blockManagerId) val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD) - rddBlocks.map { blockId => + val result = rddBlocks.map { blockId => val currentBlockLocations = blockLocations.get(blockId) val maxReplicas = currentBlockLocations.size + 1 val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId) val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas) replicateMsg }.toSeq + result } // Remove a block from the slaves that have it. 
This can only be used to remove @@ -491,7 +492,7 @@ class BlockManagerMasterEndpoint( storageLevel: StorageLevel, memSize: Long, diskSize: Long): Boolean = { - logDebug(s"Updating block info on master ${blockId}") + logDebug(s"Updating block info on master ${blockId} for ${blockManagerId}") if (!blockManagerInfo.contains(blockManagerId)) { if (blockManagerId.isDriver && !isLocal) { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 0c8259cd77449..e327256b7e16f 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -30,18 +30,11 @@ import org.apache.spark.util.ThreadUtils class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext { - override def beforeEach(): Unit = { - val conf = new SparkConf().setAppName("test").setMaster("local") - .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) - .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 100L) - .set(config.STORAGE_DECOMMISSION_ENABLED, true) - - sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf) - } + val numExecs = 2 test(s"verify that an already running task which is going to cache data succeeds " + s"on a decommissioned executor") { - // runDecomTest(true, false) + runDecomTest(true, false) } test(s"verify that shuffle blocks are migrated.") { @@ -49,6 +42,16 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext } private def runDecomTest(persist: Boolean, shuffle: Boolean) = { + val master = s"local-cluster[${numExecs}, 1, 1024]" + val conf = new SparkConf().setAppName("test").setMaster(master) + .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 100L) + .set(config.STORAGE_DECOMMISSION_ENABLED, true) + .set(config.STORAGE_RDD_DECOMMISSION_ENABLED, persist) + .set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, shuffle) + + sc = new SparkContext(master, "test", conf) + // Create input RDD with 10 partitions val input = sc.parallelize(1 to 10, 10) val accum = sc.longAccumulator("mapperRunAccumulator") @@ -91,15 +94,19 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // Wait for the job to have started sem.acquire(1) + // Give Spark a tiny bit to start the tasks after the listener says hello + Thread.sleep(100) + // Decommission one of the executor val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() - assert(execs.size == 3, s"Expected 3 executors but found ${execs.size}") + assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}") val execToDecommission = execs.head - // sched.decommissionExecutor(execToDecommission) + logDebug(s"Decommissioning executor ${execToDecommission}") + sched.decommissionExecutor(execToDecommission) // Wait for job to finish - val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 5.seconds) + val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 6.seconds) assert(asyncCountResult === 10) // All 10 tasks finished, so accum should have been increased 10 times assert(accum.value === 10) @@ -107,9 +114,13 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // All tasks should be successful, nothing should have failed sc.listenerBus.waitUntilEmpty() 
if (shuffle) { - assert(taskEndEvents.size === 20) // 10 mappers & 10 reducers + // 10 mappers & 10 reducers + assert(taskEndEvents.size === 20, + s"Expected 20 tasks got ${taskEndEvents.size} (${taskEndEvents})") } else { - assert(taskEndEvents.size === 10) // 10 mappers + // 10 mappers + assert(taskEndEvents.size === 10, + s"Expected 10 tasks got ${taskEndEvents.size} (${taskEndEvents})") } assert(taskEndEvents.map(_.reason).toSet === Set(Success)) From 783114b6d7c951da282d6ba1b3fc7f88765c090d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 26 May 2020 11:49:01 -0700 Subject: [PATCH 08/30] Code cleanups (swap some maps for foreach where we didn't need the results & fix some potential test flakes) as suggested by @attilapiros during review (thanks) --- .../scala/org/apache/spark/storage/BlockManager.scala | 4 ++-- .../spark/storage/BlockManagerDecommissionSuite.scala | 9 ++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 47ec54ea07902..ac8d9310dd8d3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1902,8 +1902,8 @@ private[spark] class BlockManager( (peer, runnable) } // A peer may have entered a decommissioning state, don't transfer any new blocks - deadPeers.map{peer => - migrationPeers.get(peer).map(_.running = false) + deadPeers.foreach{peer => + migrationPeers.get(peer).foreach(_.running = false) } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index e327256b7e16f..5d4558855601b 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -114,15 +114,14 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // All tasks should be successful, nothing should have failed sc.listenerBus.waitUntilEmpty() if (shuffle) { - // 10 mappers & 10 reducers - assert(taskEndEvents.size === 20, + // 10 mappers & 10 reducers which succeeded + assert(taskEndEvents.count(_.reason == Success) === 20, s"Expected 20 tasks got ${taskEndEvents.size} (${taskEndEvents})") } else { - // 10 mappers - assert(taskEndEvents.size === 10, + // 10 mappers which executed successfully + assert(taskEndEvents.count(_.reason == Success) === 10, s"Expected 10 tasks got ${taskEndEvents.size} (${taskEndEvents})") } - assert(taskEndEvents.map(_.reason).toSet === Set(Success)) // all blocks should have been shifted from decommissioned block manager // after some time From a240f9857dcf53631b269422b4652a05304691be Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 26 May 2020 16:59:06 -0700 Subject: [PATCH 09/30] Add missing follow up commit from merge --- .../org/apache/spark/storage/BlockManagerDecommissionSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 01ce4d50c1bd7..90d5efdcc61fe 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -51,7 +51,6 @@ class 
BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext .set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, shuffle) sc = new SparkContext(master, "test", conf) - } // Create input RDD with 10 partitions val input = sc.parallelize(1 to 10, 10) From ef8fcc5ace22bf8c9d36f3da7e963fc76024a0da Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 28 May 2020 13:21:49 -0700 Subject: [PATCH 10/30] Use NOOP_REDUCE_ID --- .../org/apache/spark/shuffle/IndexShuffleBlockResolver.scala | 4 ++-- .../apache/spark/storage/BlockManagerDecommissionSuite.scala | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index b959e83599d14..dd991373b81ac 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -238,13 +238,13 @@ private[spark] class IndexShuffleBlockResolver( ((BlockId, ManagedBuffer), (BlockId, ManagedBuffer)) = { // Load the index block val indexFile = getIndexFile(shuffleId, mapId) - val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, 0) + val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID) val indexFileSize = indexFile.length() val indexBlockData = new FileSegmentManagedBuffer(transportConf, indexFile, 0, indexFileSize) // Load the data block val dataFile = getDataFile(shuffleId, mapId) - val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, 0) + val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID) val dataBlockData = new FileSegmentManagedBuffer(transportConf, dataFile, 0, dataFile.length()) ((indexBlockId, indexBlockData), (dataBlockId, dataBlockData)) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 90d5efdcc61fe..38c3f88e8fce1 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -138,7 +138,8 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext val execIdToBlocksMapping = storageStatus.map( status => (status.blockManagerId.executorId, status.blocks)).toMap // No cached blocks should be present on executor which was decommissioned - assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq()) + assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(), + "Cache blocks should be migrated") if (persist) { // There should still be all 10 RDD blocks cached assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === 10) From 838a346bc27e5d9b7892866ac6faf3188b93b615 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 28 May 2020 13:30:32 -0700 Subject: [PATCH 11/30] Config shorter interval for testing. 
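For reference, the decommissioning settings this series keeps adjusting in the suite, collected in one place as a minimal sketch. This is not part of any patch; the values are illustrative, and these config entries are private[spark], so a snippet like this only compiles inside Spark's own packages (as the test suite does):

  import org.apache.spark.SparkConf
  import org.apache.spark.internal.config

  // Sketch of the conf assembled by runDecomTest: enable worker decommissioning,
  // storage decommissioning, and the RDD/shuffle migration paths under test.
  val conf = new SparkConf()
    .setAppName("test")
    .setMaster("local-cluster[2, 1, 1024]")
    .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
    .set(config.STORAGE_DECOMMISSION_ENABLED, true)
    .set(config.STORAGE_RDD_DECOMMISSION_ENABLED, true)
    .set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, true)
    // The suite sets the reattempt interval to 1 so migrations retry almost
    // immediately; a real deployment would use a much larger value.
    .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
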
--- .../apache/spark/storage/BlockManagerDecommissionSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 38c3f88e8fce1..b77cab3c9029d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -49,6 +49,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext .set(config.STORAGE_DECOMMISSION_ENABLED, true) .set(config.STORAGE_RDD_DECOMMISSION_ENABLED, persist) .set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, shuffle) + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L) sc = new SparkContext(master, "test", conf) @@ -125,7 +126,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext } // all blocks should have been shifted from decommissioned block manager - // after some time + // after some time. Thread.sleep(1000) // Since the RDD is cached or shuffled so further usage of same RDD should use the From e85c8efa4806fb82a0da228c7f9d4194775eb937 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 28 May 2020 13:36:24 -0700 Subject: [PATCH 12/30] Wait for the desired number of executors to be present. --- .../spark/storage/BlockManagerDecommissionSuite.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index b77cab3c9029d..8241bcbc29748 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -22,7 +22,8 @@ import java.util.concurrent.Semaphore import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, Success} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, Success, + TestUtils} import org.apache.spark.internal.config import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart} import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend @@ -89,6 +90,11 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext testRdd.persist() } + // Wait for all of the executors to start + TestUtils.waitUntilExecutorsUp(sc = sc, + numExecutors = numExecs, + timeout = 10000) // 10s + // Start the computation of RDD - this step will also cache the RDD val asyncCount = testRdd.countAsync() From 2da0f2d761659a92ebf44a6f134d9640cff0138a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 28 May 2020 14:36:15 -0700 Subject: [PATCH 13/30] Make a MigratableResolver interface so custom shuffle implementations can experiment with this. 
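To illustrate the extension point this patch introduces, here is a rough sketch of what a custom shuffle implementation might provide. It is not part of the patch: the class and its in-memory storage are hypothetical, and it assumes only the three methods of the trait added in the following patch.

  import scala.collection.concurrent.TrieMap

  import org.apache.spark.network.buffer.ManagedBuffer
  import org.apache.spark.network.client.StreamCallbackWithID
  import org.apache.spark.serializer.SerializerManager
  import org.apache.spark.shuffle.MigratableResolver
  import org.apache.spark.storage.BlockId

  // Hypothetical resolver for a shuffle implementation that keeps its output
  // somewhere other than the usual local-disk index/data files.
  class InMemoryMigratableResolver extends MigratableResolver {
    // (shuffleId, mapId) -> the blocks written for that map task
    private val stored = new TrieMap[(Int, Long), List[(BlockId, ManagedBuffer)]]()

    // Tell the block manager which map outputs this executor currently holds.
    override def getStoredShuffles(): Set[(Int, Long)] = stored.keySet.toSet

    // Accept a block pushed from a decommissioning executor; a real implementation
    // would return a callback that buffers the stream and records it in `stored`.
    override def putShuffleBlockAsStream(
        blockId: BlockId,
        serializerManager: SerializerManager): StreamCallbackWithID = ???

    // Hand back the blocks that should be uploaded to another executor.
    override def getMigrationBlocks(shuffleId: Int, mapId: Long): List[(BlockId, ManagedBuffer)] =
      stored.getOrElse((shuffleId, mapId), Nil)
  }

The block manager only needs these three hooks: IndexShuffleBlockResolver mixes the trait in below, and other resolvers can opt in the same way.
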
--- .../shuffle/IndexShuffleBlockResolver.scala | 11 +++-- .../apache/spark/storage/BlockManager.scala | 40 ++++++++----------- 2 files changed, 22 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index dd991373b81ac..8fce9253ce8ba 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -49,7 +49,7 @@ private[spark] class IndexShuffleBlockResolver( conf: SparkConf, _blockManager: BlockManager = null) extends ShuffleBlockResolver - with Logging { + with Logging with MigratableResolver { private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager) @@ -61,7 +61,7 @@ private[spark] class IndexShuffleBlockResolver( /** * Get the shuffle files that are stored locally. Used for block migrations. */ - def getStoredShuffles(): Set[(Int, Long)] = { + override def getStoredShuffles(): Set[(Int, Long)] = { // Matches ShuffleIndexBlockId name val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r val rootDirs = blockManager.diskBlockManager.localDirs @@ -176,7 +176,7 @@ private[spark] class IndexShuffleBlockResolver( * Requires the caller to delete any shuffle index blocks where the shuffle block fails to * put. */ - def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager): + override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager): StreamCallbackWithID = { val file = blockId match { case ShuffleIndexBlockId(shuffleId, mapId, _) => @@ -234,8 +234,7 @@ private[spark] class IndexShuffleBlockResolver( /** * Get the index & data block for migration. 
*/ - def getMigrationBlocks(shuffleId: Int, mapId: Long): - ((BlockId, ManagedBuffer), (BlockId, ManagedBuffer)) = { + def getMigrationBlocks(shuffleId: Int, mapId: Long): List[(BlockId, ManagedBuffer)] = { // Load the index block val indexFile = getIndexFile(shuffleId, mapId) val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID) @@ -246,7 +245,7 @@ private[spark] class IndexShuffleBlockResolver( val dataFile = getDataFile(shuffleId, mapId) val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID) val dataBlockData = new FileSegmentManagedBuffer(transportConf, dataFile, 0, dataFile.length()) - ((indexBlockId, indexBlockData), (dataBlockId, dataBlockData)) + List((indexBlockId, indexBlockData), (dataBlockId, dataBlockData)) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index fe4548e0fe688..3ba109bd65ec6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -54,7 +54,7 @@ import org.apache.spark.network.util.TransportConf import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.serializer.{SerializerInstance, SerializerManager} -import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleManager, ShuffleWriteMetricsReporter} +import org.apache.spark.shuffle.{MigratableResolver, ShuffleManager, ShuffleWriteMetricsReporter} import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter} import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock import org.apache.spark.storage.memory._ @@ -256,8 +256,8 @@ private[spark] class BlockManager( var hostLocalDirManager: Option[HostLocalDirManager] = None - private lazy val indexShuffleResolver: IndexShuffleBlockResolver = { - shuffleManager.shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver] + private lazy val migratableResolver: MigratableResolver = { + shuffleManager.shuffleBlockResolver.asInstanceOf[MigratableResolver] } /** @@ -660,7 +660,7 @@ private[spark] class BlockManager( if (blockId.isShuffle || blockId.isInternalShuffle) { logDebug(s"Putting shuffle block ${blockId}") try { - return indexShuffleResolver.putShuffleBlockAsStream(blockId, serializerManager) + return migratableResolver.putShuffleBlockAsStream(blockId, serializerManager) } catch { case e: ClassCastException => throw new Exception( s"Unexpected shuffle block ${blockId} with unsupported shuffle " + @@ -1841,24 +1841,18 @@ private[spark] class BlockManager( Thread.sleep(SLEEP_TIME_SECS * 1000L) case Some((shuffleId, mapId)) => logInfo(s"Trying to migrate ${shuffleId},${mapId} to ${peer}") - val ((indexBlockId, indexBuffer), (dataBlockId, dataBuffer)) = - indexShuffleResolver.getMigrationBlocks(shuffleId, mapId) - blockTransferService.uploadBlockSync( - peer.host, - peer.port, - peer.executorId, - indexBlockId, - indexBuffer, - storageLevel, - null)// class tag, we don't need for shuffle - blockTransferService.uploadBlockSync( - peer.host, - peer.port, - peer.executorId, - dataBlockId, - dataBuffer, - storageLevel, - null)// class tag, we don't need for shuffle + val blocks = + migratableResolver.getMigrationBlocks(shuffleId, mapId) + blocks.foreach { case (blockId, buffer) => + blockTransferService.uploadBlockSync( + peer.host, + peer.port, + peer.executorId, + blockId, + buffer, + storageLevel, + null)// class tag, we don't need for shuffle + } } } } catch 
{ @@ -1885,7 +1879,7 @@ private[spark] class BlockManager( def offloadShuffleBlocks(): Unit = { // Update the queue of shuffles to be migrated logDebug("Offloading shuffle blocks") - val localShuffles = indexShuffleResolver.getStoredShuffles() + val localShuffles = migratableResolver.getStoredShuffles() logDebug(s"My local shuffles are ${localShuffles.toList}") val newShufflesToMigrate = localShuffles.&~(migratingShuffles).toSeq logDebug(s"My new shuffles to migrate ${newShufflesToMigrate.toList}") From 9d317466d4f3f03f4f2431bda5052e78ab3c8ead Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 28 May 2020 14:52:11 -0700 Subject: [PATCH 14/30] Add in the trait I refactored the code to use (forgot the git add :p) --- .../spark/shuffle/MigratableResolver.scala | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala diff --git a/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala new file mode 100644 index 0000000000000..1ecba0dd4a173 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.client.StreamCallbackWithID +import org.apache.spark.serializer.SerializerManager +import org.apache.spark.storage.BlockId + +/** + * An experimental trait to allow Spark to migrate shuffle blocks. + */ +trait MigratableResolver { + /** + * Get the shuffle ids that are stored locally. Used for block migrations. + */ + def getStoredShuffles(): Set[(Int, Long)] + + /** + * Write a provided shuffle block as a stream. Used for block migrations. + */ + def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager): + StreamCallbackWithID + + /** + * Get the blocks for migration for a particular shuffle and map. 
+ */ + def getMigrationBlocks(shuffleId: Int, mapId: Long): List[(BlockId, ManagedBuffer)] +} From 38ff8be4c6020ce14578bcbecb7f392d5126d29c Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 28 May 2020 15:25:46 -0700 Subject: [PATCH 15/30] Use block updates to make sure our desired blocks are being moved & also remove a thread sleep --- .../BlockManagerDecommissionSuite.scala | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 8241bcbc29748..8d5c5f4c84295 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -22,15 +22,17 @@ import java.util.concurrent.Semaphore import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ +import org.scalatest.concurrent.Eventually + import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, Success, TestUtils} import org.apache.spark.internal.config -import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart} +import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend import org.apache.spark.util.{ResetSystemProperties, ThreadUtils} class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext - with ResetSystemProperties { + with ResetSystemProperties with Eventually { val numExecs = 2 @@ -72,9 +74,10 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext case false => sleepyRdd } - // Listen for the job + // Listen for the job & block updates val sem = new Semaphore(0) val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd] + val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated] sc.addSparkListener(new SparkListener { override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { sem.release() @@ -83,8 +86,13 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { taskEndEvents.append(taskEnd) } + + override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { + blocksUpdated.append(blockUpdated) + } }) + // Cache the RDD lazily if (persist) { testRdd.persist() @@ -131,9 +139,26 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext s"Expected 10 tasks got ${taskEndEvents.size} (${taskEndEvents})") } - // all blocks should have been shifted from decommissioned block manager - // after some time. - Thread.sleep(1000) + // Wait for our respective blocks to have migrated + eventually(timeout(10.seconds), interval(10.milliseconds)) { + // If cached we expect to see updates for rdd_1_1 on two different BMs + if (persist) { + val numLocs = blocksUpdated.filter{ update => + update.blockUpdatedInfo.blockId.name == "rdd_1_1" + }.map {update => + update.blockUpdatedInfo.blockManagerId }.toSet.size + assert(numLocs > 1, s"Block rdd_1_1 should have been on multiple BMs got ${numLocs}") + } + // If we're migrating shuffles we look for any shuffle block updates + // as there is no block update on the initial shuffle block write. 
+ if (shuffle) { + val numLocs = blocksUpdated.filter{ update => + val blockId = update.blockUpdatedInfo.blockId + blockId.isShuffle || blockId.isInternalShuffle + }.toSet.size + assert(numLocs > 0, s"No shuffle block updates in ${blocksUpdated}") + } + } // Since the RDD is cached or shuffled so further usage of same RDD should use the // cached data. Original RDD partitions should not be recomputed i.e. accum From 13ec43a5aadca0c9c2c849f09b8b2a544086692d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 28 May 2020 15:37:45 -0700 Subject: [PATCH 16/30] Don't hardcode the blockId because it is not constant. --- .../storage/BlockManagerDecommissionSuite.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 8d5c5f4c84295..0bf99e7511ba8 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -140,14 +140,15 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext } // Wait for our respective blocks to have migrated - eventually(timeout(10.seconds), interval(10.milliseconds)) { - // If cached we expect to see updates for rdd_1_1 on two different BMs + eventually(timeout(15.seconds), interval(10.milliseconds)) { if (persist) { - val numLocs = blocksUpdated.filter{ update => - update.blockUpdatedInfo.blockId.name == "rdd_1_1" - }.map {update => - update.blockUpdatedInfo.blockManagerId }.toSet.size - assert(numLocs > 1, s"Block rdd_1_1 should have been on multiple BMs got ${numLocs}") + // One of our blocks should have moved. + val blockLocs = blocksUpdated.map{ update => + (update.blockUpdatedInfo.blockId.name, + update.blockUpdatedInfo.blockManagerId)} + val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.toSet.size) + assert(!blocksToManagers.filter(_._2 > 1).isEmpty, + s"We should have a block that has been on multiple BMs in ${blocksUpdated}") } // If we're migrating shuffles we look for any shuffle block updates // as there is no block update on the initial shuffle block write. 
From a92025c024cb0cf8df29d490dc53a9b04c16e69b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 28 May 2020 15:42:08 -0700 Subject: [PATCH 17/30] Test both migrations at the same time & reduce some of the sleeps --- .../spark/storage/BlockManagerDecommissionSuite.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 0bf99e7511ba8..82f749d74fe5b 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -45,6 +45,10 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext runDecomTest(false, true) } + test(s"verify that both migrations can work at the same time.") { + runDecomTest(true, true) + } + private def runDecomTest(persist: Boolean, shuffle: Boolean) = { val master = s"local-cluster[${numExecs}, 1, 1024]" val conf = new SparkConf().setAppName("test").setMaster(master) .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) @@ -69,7 +69,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // Create a new RDD where we have sleep in each partition, we are also increasing // the value of accumulator in each partition val sleepyRdd = input.mapPartitions { x => - Thread.sleep(500) + Thread.sleep(300) accum.add(1) x.map(y => (y, y)) } @@ -110,7 +110,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext sem.acquire(1) // Give Spark a tiny bit to start the tasks after the listener says hello - Thread.sleep(100) + Thread.sleep(20) // Decommission one of the executor val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] From fe265d7f576b4c3752bfb061060328a8c32ea2ad Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 28 May 2020 16:33:25 -0700 Subject: [PATCH 18/30] Tag new APIs --- core/src/main/scala/org/apache/spark/SparkContext.scala | 5 +++++ .../scala/org/apache/spark/shuffle/MigratableResolver.scala | 3 +++ 2 files changed, 8 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e68849ce6d224..3f105d19a35ba 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1586,6 +1586,11 @@ class SparkContext(config: SparkConf) extends Logging { listenerBus.removeListener(listener) } + /** + * :: DeveloperApi :: + * Return the list of executor IDs currently known to the scheduler backend. + */ + @DeveloperApi def getExecutorIds(): Seq[String] = { schedulerBackend match { case b: ExecutorAllocationClient => diff --git a/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala index 1ecba0dd4a173..768d4f8db4364 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala @@ -17,14 +17,17 @@ package org.apache.spark.shuffle +import org.apache.spark.annotation.Experimental import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.network.client.StreamCallbackWithID import org.apache.spark.serializer.SerializerManager import org.apache.spark.storage.BlockId /** + * :: Experimental :: * An experimental trait to allow Spark to migrate shuffle blocks. 
*/ +@Experimental trait MigratableResolver { /** * Get the shuffle ids that are stored locally. Used for block migrations. From 70c3871ed801ed2b5e964e321bcfaba33f8735af Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 28 May 2020 19:28:07 -0700 Subject: [PATCH 19/30] Increase the number of execs and decrease the thread sleep time while increasing the max allowed job time to try and avoid flakiness. --- .../spark/storage/BlockManagerDecommissionSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 82f749d74fe5b..eb0d28aeb2571 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.util.{ResetSystemProperties, ThreadUtils} class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties with Eventually { - val numExecs = 2 + val numExecs = 3 test(s"verify that an already running task which is going to cache data succeeds " + s"on a decommissioned executor") { @@ -69,7 +69,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // Create a new RDD where we have sleep in each partition, we are also increasing // the value of accumulator in each partition val sleepyRdd = input.mapPartitions { x => - Thread.sleep(300) + Thread.sleep(250) accum.add(1) x.map(y => (y, y)) } @@ -114,7 +114,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext sem.acquire(1) // Give Spark a tiny bit to start the tasks after the listener says hello - Thread.sleep(20) + Thread.sleep(50) // Decommission one of the executor val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] @@ -126,7 +126,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext sched.decommissionExecutor(execToDecommission) // Wait for job to finish - val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 6.seconds) + val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds) assert(asyncCountResult === 10) // All 10 tasks finished, so accum should have been increased 10 times assert(accum.value === 10) From 069dd3b849b3c6f4bf78426eff70bd040d9849a2 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 29 May 2020 11:18:35 -0700 Subject: [PATCH 20/30] Update core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala add more information on unexpected block. 
Co-authored-by: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com> --- .../org/apache/spark/shuffle/IndexShuffleBlockResolver.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 8fce9253ce8ba..fe39af7b9fd6a 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -184,7 +184,8 @@ private[spark] class IndexShuffleBlockResolver( case ShuffleBlockBatchId(shuffleId, mapId, _, _) => getDataFile(shuffleId, mapId) case _ => - throw new Exception(s"Unexpected shuffle block transfer ${blockId}") + throw new Exception(s"Unexpected shuffle block transfer ${blockId} as " + + s"${blockId.getClass().getSimpleName()}") } val fileTmp = Utils.tempFileWith(file) val channel = Channels.newChannel( From 6340f9b9c76f1796d87dfb5813eeb26d350ab511 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 29 May 2020 14:48:23 -0700 Subject: [PATCH 21/30] Fix the migration to store ShuffleDataBlockId, check that data and index blocks have both been migrated, check that RDD blocks are duplicated not just broadcast blocks, make the number of partitions smaller so the test can run faster, avoid the Thread.sleep for all of the tests except for the midflight test where we need it, check for the broadcast blocks landing (further along in scheduling) beyond just task start, force fetching the shuffle block to local disk if in shuffle block test mode, start the job as soon as the first executor comes online. --- .../shuffle/IndexShuffleBlockResolver.scala | 4 +- .../BlockManagerDecommissionSuite.scala | 106 ++++++++++++------ 2 files changed, 74 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index fe39af7b9fd6a..79323be45845c 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -180,8 +180,8 @@ private[spark] class IndexShuffleBlockResolver( StreamCallbackWithID = { val file = blockId match { case ShuffleIndexBlockId(shuffleId, mapId, _) => - getIndexFile(shuffleId, mapId) - case ShuffleBlockBatchId(shuffleId, mapId, _, _) => + getIndexFile(shuffleId, mapId) + case ShuffleDataBlockId(shuffleId, mapId, _) => getDataFile(shuffleId, mapId) case _ => throw new Exception(s"Unexpected shuffle block transfer ${blockId} as " + diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index eb0d28aeb2571..2219a32216075 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -24,8 +24,7 @@ import scala.concurrent.duration._ import org.scalatest.concurrent.Eventually -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, Success, - TestUtils} +import org.apache.spark._ import org.apache.spark.internal.config import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend @@ -35,33 +34,41 @@ class BlockManagerDecommissionSuite extends 
SparkFunSuite with LocalSparkContext with ResetSystemProperties with Eventually { val numExecs = 3 + val numParts = 3 test(s"verify that an already running task which is going to cache data succeeds " + s"on a decommissioned executor") { - runDecomTest(true, false) + runDecomTest(true, false, true) } test(s"verify that shuffle blocks are migrated.") { - runDecomTest(false, true) + runDecomTest(false, true, false) } test(s"verify that both migrations can work at the same time.") { - runDecomTest(true, true) + runDecomTest(true, true, false) } - private def runDecomTest(persist: Boolean, shuffle: Boolean) = { + private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean) = { val master = s"local-cluster[${numExecs}, 1, 1024]" val conf = new SparkConf().setAppName("test").setMaster(master) .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) .set(config.STORAGE_DECOMMISSION_ENABLED, true) .set(config.STORAGE_RDD_DECOMMISSION_ENABLED, persist) .set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, shuffle) + // Just replicate blocks as fast as we can during testing, there isn't another + // workload we need to worry about. .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L) + // Force fetching to local disk + if (shuffle) { + conf.set("spark.network.maxRemoteBlockSizeFetchToMem", "1") + } + sc = new SparkContext(master, "test", conf) // Create input RDD with 10 partitions - val input = sc.parallelize(1 to 10, 10) + val input = sc.parallelize(1 to numParts, numParts) val accum = sc.longAccumulator("mapperRunAccumulator") // Do a count to wait for the executors to be registered. input.count() @@ -69,7 +76,9 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // Create a new RDD where we have sleep in each partition, we are also increasing // the value of accumulator in each partition val sleepyRdd = input.mapPartitions { x => - Thread.sleep(250) + if (migrateDuring) { + Thread.sleep(500) + } accum.add(1) x.map(y => (y, y)) } @@ -79,12 +88,14 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext } // Listen for the job & block updates - val sem = new Semaphore(0) + val taskStartSem = new Semaphore(0) + val broadcastSem = new Semaphore(0) val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd] val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated] sc.addSparkListener(new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { - sem.release() + taskStartSem.release() } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { @@ -92,6 +103,11 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext } override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { + // Once broadcast start landing on the executors we're good to proceed. + // We don't only use task start as it can occur before the work is on the executor. 
+ if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) { + broadcastSem.release() + } blocksUpdated.append(blockUpdated) } }) @@ -102,19 +118,32 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext testRdd.persist() } - // Wait for all of the executors to start + // Wait for the first executor to start TestUtils.waitUntilExecutorsUp(sc = sc, - numExecutors = numExecs, + numExecutors = 1, timeout = 10000) // 10s // Start the computation of RDD - this step will also cache the RDD val asyncCount = testRdd.countAsync() - // Wait for the job to have started - sem.acquire(1) + // Wait for all of the executors to start + TestUtils.waitUntilExecutorsUp(sc = sc, + numExecutors = numExecs, + timeout = 10000) // 10s - // Give Spark a tiny bit to start the tasks after the listener says hello - Thread.sleep(50) + // Wait for the job to have started. + taskStartSem.acquire(1) + // Wait for each executor + driver to have it's broadcast info delivered. + broadcastSem.acquire((numExecs + 1)) + + // Make sure the job is either mid run or otherwise has data to migrate. + if (migrateDuring) { + // Give Spark a tiny bit to start executing after the broadcast blocks land. + // For me this works at 100, set to 300 for system variance. + Thread.sleep(300) + } else { + ThreadUtils.awaitResult(asyncCount, 15.seconds) + } // Decommission one of the executor val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] @@ -127,49 +156,58 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // Wait for job to finish val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds) - assert(asyncCountResult === 10) - // All 10 tasks finished, so accum should have been increased 10 times - assert(accum.value === 10) + assert(asyncCountResult === numParts) + // All tasks finished, so accum should have been increased numParts times + assert(accum.value === numParts) // All tasks should be successful, nothing should have failed sc.listenerBus.waitUntilEmpty() if (shuffle) { - // 10 mappers & 10 reducers which succeeded - assert(taskEndEvents.count(_.reason == Success) === 20, - s"Expected 20 tasks got ${taskEndEvents.size} (${taskEndEvents})") + // mappers & reducers which succeeded + assert(taskEndEvents.count(_.reason == Success) === 2 * numParts, + s"Expected ${2 * numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})") } else { - // 10 mappers which executed successfully - assert(taskEndEvents.count(_.reason == Success) === 10, - s"Expected 10 tasks got ${taskEndEvents.size} (${taskEndEvents})") + // only mappers which executed successfully + assert(taskEndEvents.count(_.reason == Success) === numParts, + s"Expected ${numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})") } // Wait for our respective blocks to have migrated eventually(timeout(15.seconds), interval(10.milliseconds)) { if (persist) { // One of our blocks should have moved. 
- val blockLocs = blocksUpdated.map{ update => + val rddUpdates = blocksUpdated.filter{update => + val blockId = update.blockUpdatedInfo.blockId + blockId.isRDD} + val blockLocs = rddUpdates.map{ update => (update.blockUpdatedInfo.blockId.name, update.blockUpdatedInfo.blockManagerId)} val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.toSet.size) assert(!blocksToManagers.filter(_._2 > 1).isEmpty, - s"We should have a block that has been on multiple BMs in ${blocksUpdated}") + s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" + + s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}") } // If we're migrating shuffles we look for any shuffle block updates // as there is no block update on the initial shuffle block write. if (shuffle) { - val numLocs = blocksUpdated.filter{ update => + val numDataLocs = blocksUpdated.filter{ update => + val blockId = update.blockUpdatedInfo.blockId + blockId.isInstanceOf[ShuffleDataBlockId] + }.toSet.size + val numIndexLocs = blocksUpdated.filter{ update => val blockId = update.blockUpdatedInfo.blockId - blockId.isShuffle || blockId.isInternalShuffle + blockId.isInstanceOf[ShuffleIndexBlockId] }.toSet.size - assert(numLocs > 0, s"No shuffle block updates in ${blocksUpdated}") + assert(numDataLocs >= 1, s"Expect shuffle data block updates in ${blocksUpdated}") + assert(numIndexLocs >= 1, s"Expect shuffle index block updates in ${blocksUpdated}") } } // Since the RDD is cached or shuffled so further usage of same RDD should use the // cached data. Original RDD partitions should not be recomputed i.e. accum // should have same value like before - assert(testRdd.count() === 10) - assert(accum.value === 10) + assert(testRdd.count() === numParts) + assert(accum.value === numParts) val storageStatus = sc.env.blockManager.master.getStorageStatus val execIdToBlocksMapping = storageStatus.map( @@ -178,8 +216,8 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(), "Cache blocks should be migrated") if (persist) { - // There should still be all 10 RDD blocks cached - assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === 10) + // There should still be all the RDD blocks cached + assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts) } } } From 4cfeb8e2d282a647ac5319bfa80fbd51e226924e Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 1 Jun 2020 11:40:05 -0700 Subject: [PATCH 22/30] We don't need the to set operations, also sleepyRdd isn't always sleepy so lets call it baseRdd, and test both small blocksize to mem and not --- .../BlockManagerDecommissionSuite.scala | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 2219a32216075..b07c4653e51e0 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -41,7 +41,11 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext runDecomTest(true, false, true) } - test(s"verify that shuffle blocks are migrated.") { + test(s"verify that shuffle blocks are migrated with force to disk") { + runDecomTest(false, true, false, remoteBlockSize = "1") + } + + test(s"verify 
that shuffle blocks are migrated") { runDecomTest(false, true, false) } @@ -49,7 +53,9 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext runDecomTest(true, true, false) } - private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean) = { + private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean, + remoteBlockSize: String = "100000") = { + val master = s"local-cluster[${numExecs}, 1, 1024]" val conf = new SparkConf().setAppName("test").setMaster(master) .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) @@ -75,7 +81,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // Create a new RDD where we have sleep in each partition, we are also increasing // the value of accumulator in each partition - val sleepyRdd = input.mapPartitions { x => + val baseRdd = input.mapPartitions { x => if (migrateDuring) { Thread.sleep(500) } @@ -83,8 +89,8 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext x.map(y => (y, y)) } val testRdd = shuffle match { - case true => sleepyRdd.reduceByKey(_ + _) - case false => sleepyRdd + case true => baseRdd.reduceByKey(_ + _) + case false => baseRdd } // Listen for the job & block updates @@ -182,7 +188,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext val blockLocs = rddUpdates.map{ update => (update.blockUpdatedInfo.blockId.name, update.blockUpdatedInfo.blockManagerId)} - val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.toSet.size) + val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size) assert(!blocksToManagers.filter(_._2 > 1).isEmpty, s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" + s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}") @@ -193,11 +199,11 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext val numDataLocs = blocksUpdated.filter{ update => val blockId = update.blockUpdatedInfo.blockId blockId.isInstanceOf[ShuffleDataBlockId] - }.toSet.size + }.size val numIndexLocs = blocksUpdated.filter{ update => val blockId = update.blockUpdatedInfo.blockId blockId.isInstanceOf[ShuffleIndexBlockId] - }.toSet.size + }.size assert(numDataLocs >= 1, s"Expect shuffle data block updates in ${blocksUpdated}") assert(numIndexLocs >= 1, s"Expect shuffle index block updates in ${blocksUpdated}") } From e81aa5ae428e6326bcda8b72e2ee4916ab70ba5a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 1 Jun 2020 11:56:01 -0700 Subject: [PATCH 23/30] Use the remoteBlockSize param in the tests instead of conditioning on if we're testing shuffles or not --- .../spark/storage/BlockManagerDecommissionSuite.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index b07c4653e51e0..488e538252257 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -66,10 +66,8 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // workload we need to worry about. 
.set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L) - // Force fetching to local disk - if (shuffle) { - conf.set("spark.network.maxRemoteBlockSizeFetchToMem", "1") - } + // Allow force fetching to local disk + conf.set("spark.network.maxRemoteBlockSizeFetchToMem", remoteBlockSize) sc = new SparkContext(master, "test", conf) From 841d44312294bf4d9eceb39b68b7bf534fd9dfc4 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 1 Jun 2020 12:24:48 -0700 Subject: [PATCH 24/30] Add a part of the test where we kill the original exec and recount. Note: this fails in forced migrate to disk --- .../cluster/StandaloneSchedulerBackend.scala | 2 +- .../BlockManagerDecommissionSuite.scala | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 42c46464d79e1..ff41957779c83 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -44,7 +44,7 @@ private[spark] class StandaloneSchedulerBackend( with StandaloneAppClientListener with Logging { - private var client: StandaloneAppClient = null + private[spark] var client: StandaloneAppClient = null private val stopping = new AtomicBoolean(false) private val launcherBackend = new LauncherBackend() { override protected def conf: SparkConf = sc.conf diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 488e538252257..c791315ea362e 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -94,10 +94,15 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // Listen for the job & block updates val taskStartSem = new Semaphore(0) val broadcastSem = new Semaphore(0) + val executorRemovedSem = new Semaphore(0) val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd] val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated] sc.addSparkListener(new SparkListener { + override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = { + executorRemovedSem.release() + } + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { taskStartSem.release() } @@ -223,5 +228,18 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // There should still be all the RDD blocks cached assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts) } + + // Make the executor we decommissioned exit + sched.client.killExecutors(List(execToDecommission)) + + // Wait for the executor to be removed + executorRemovedSem.acquire(1) + + // Since the RDD is cached or shuffled so further usage of same RDD should use the + // cached data. Original RDD partitions should not be recomputed i.e. accum + // should have same value like before + assert(testRdd.count() === numParts) + assert(accum.value === numParts) + } } From ba20ec06f57575c3deae3616d753639a060bb662 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 1 Jun 2020 13:18:05 -0700 Subject: [PATCH 25/30] Add a part of the test where we kill the original exec and recount. 
Note: this fails in forced migrate to disk + some logging --- .../apache/spark/storage/BlockManager.scala | 27 ++++++++++++------- .../BlockManagerDecommissionSuite.scala | 8 +++--- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 3ba109bd65ec6..a91b27a0ef295 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -623,6 +623,7 @@ private[spark] class BlockManager( */ override def getLocalBlockData(blockId: BlockId): ManagedBuffer = { if (blockId.isShuffle) { + logInfo(s"Getting local shuffle block ${blockId}") shuffleManager.shuffleBlockResolver.getBlockData(blockId) } else { getLocalBytes(blockId) match { @@ -658,7 +659,7 @@ private[spark] class BlockManager( classTag: ClassTag[_]): StreamCallbackWithID = { // Delegate shuffle blocks here to resolver if supported if (blockId.isShuffle || blockId.isInternalShuffle) { - logDebug(s"Putting shuffle block ${blockId}") + logInfo(s"Putting shuffle block ${blockId}") try { return migratableResolver.putShuffleBlockAsStream(blockId, serializerManager) } catch { @@ -667,7 +668,7 @@ private[spark] class BlockManager( s"resolver ${shuffleManager.shuffleBlockResolver}") } } - logDebug(s"Putting regular block ${blockId}") + logInfo(s"Putting regular block ${blockId}") // All other blocks val (_, tmpFile) = diskBlockManager.createTempLocalBlock() val channel = new CountingWritableChannel( @@ -1829,21 +1830,25 @@ private[spark] class BlockManager( useOffHeap = false, deserialized = false, replication = 1) + logInfo(s"Starting migration thread for ${peer}") // Once a block fails to transfer to an executor stop trying to transfer more blocks try { while (running) { val migrating = Option(shufflesToMigrate.poll()) migrating match { case None => + logInfo("Nothing to migrate") // Nothing to do right now, but maybe a transfer will fail or a new block // will finish being committed. - val SLEEP_TIME_SECS = 5 + val SLEEP_TIME_SECS = 1 Thread.sleep(SLEEP_TIME_SECS * 1000L) case Some((shuffleId, mapId)) => - logInfo(s"Trying to migrate ${shuffleId},${mapId} to ${peer}") + logInfo(s"Trying to migrate shuffle ${shuffleId},${mapId} to ${peer}") val blocks = migratableResolver.getMigrationBlocks(shuffleId, mapId) + logInfo(s"Got migration sub-blocks ${blocks}") blocks.foreach { case (blockId, buffer) => + logInfo(s"Migrating sub-block ${blockId}") blockTransferService.uploadBlockSync( peer.host, peer.port, @@ -1852,9 +1857,13 @@ private[spark] class BlockManager( buffer, storageLevel, null)// class tag, we don't need for shuffle + logInfo(s"Migrated sub block ${blockId}") } + logInfo(s"Migrated ${shuffleId},${mapId} to ${peer}") } } + // This catch is intentionally outside of the while running block. + // if we encounter errors migrating to an executor we want to stop. 
} catch { case e: Exception => migrating match { @@ -1878,11 +1887,11 @@ private[spark] class BlockManager( */ def offloadShuffleBlocks(): Unit = { // Update the queue of shuffles to be migrated - logDebug("Offloading shuffle blocks") + logInfo("Offloading shuffle blocks") val localShuffles = migratableResolver.getStoredShuffles() - logDebug(s"My local shuffles are ${localShuffles.toList}") + logInfo(s"My local shuffles are ${localShuffles.toList}") val newShufflesToMigrate = localShuffles.&~(migratingShuffles).toSeq - logDebug(s"My new shuffles to migrate ${newShufflesToMigrate.toList}") + logInfo(s"My new shuffles to migrate ${newShufflesToMigrate.toList}") shufflesToMigrate.addAll(newShufflesToMigrate.asJava) migratingShuffles ++= newShufflesToMigrate @@ -2036,9 +2045,9 @@ private[spark] class BlockManager( try { // If enabled we migrate shuffle blocks first as they are more expensive. if (conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED)) { - logDebug(s"Attempting to replicate all cached RDD blocks") + logDebug(s"Attempting to replicate all shuffle blocks") offloadShuffleBlocks() - logInfo(s"Attempt to replicate all cached blocks done") + logInfo(s"Done starting workers to migrate shuffle blocks") } if (conf.get(config.STORAGE_RDD_DECOMMISSION_ENABLED)) { logDebug(s"Attempting to replicate all cached RDD blocks") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index c791315ea362e..a960684167fde 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -35,16 +35,16 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext val numExecs = 3 val numParts = 3 - +/* test(s"verify that an already running task which is going to cache data succeeds " + s"on a decommissioned executor") { runDecomTest(true, false, true) } - +*/ test(s"verify that shuffle blocks are migrated with force to disk") { runDecomTest(false, true, false, remoteBlockSize = "1") } - +/* test(s"verify that shuffle blocks are migrated") { runDecomTest(false, true, false) } @@ -52,7 +52,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext test(s"verify that both migrations can work at the same time.") { runDecomTest(true, true, false) } - + */ private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean, remoteBlockSize: String = "100000") = { From 17a6a3f0d0eee94900105c9fb4f8e56227471159 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 1 Jun 2020 14:27:43 -0700 Subject: [PATCH 26/30] Fix the map output update logic which was getting tramped on (also the test now passes so yay), re-enable the other tests I disabled while debugging. Add a bit more logging. 
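The driver-side ordering this patch settles on (handle shuffle block updates before any early return, update the map output location only for the data block, and drop the cached serialized statuses) can be sketched roughly as follows. This is a simplified, standalone illustration under assumed names, not the actual endpoint code:

  import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleDataBlockId, ShuffleIndexBlockId}

  object MapOutputUpdateSketch {
    // Simplified view of what BlockManagerMasterEndpoint.updateBlockInfo now does
    // for migrated shuffle blocks before the usual block-manager bookkeeping.
    def onShuffleBlockReported(
        blockId: BlockId,
        bmId: BlockManagerId,
        updateMapOutput: (Int, Long, BlockManagerId) => Unit): Unit = blockId match {
      case ShuffleIndexBlockId(_, _, _) =>
        // Index block alone: don't move the map output yet, wait for the data block.
      case ShuffleDataBlockId(shuffleId, mapId, _) =>
        // Data block landed on the new executor: repoint reducers at it (the tracker
        // also invalidates its cached serialized map statuses so they are re-fetched).
        updateMapOutput(shuffleId, mapId, bmId)
      case _ =>
        // Not an internal shuffle block; nothing to do here.
    }
  }
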
--- .../org/apache/spark/MapOutputTracker.scala | 14 +++++++--- .../storage/BlockManagerMasterEndpoint.scala | 27 ++++++++++--------- .../BlockManagerDecommissionSuite.scala | 8 +++--- 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 382f4a764feca..1b861f69ddaaf 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -128,6 +128,7 @@ private class ShuffleStatus(numPartitions: Int) extends Logging { val mapStatusOpt = mapStatuses.find(_.mapId == mapId) mapStatusOpt match { case Some(mapStatus) => + logInfo("Updating map output for ${mapId} to ${bmAddress}") mapStatus.updateLocation(bmAddress) invalidateSerializedMapOutputStatusCache() case None => @@ -141,6 +142,7 @@ private class ShuffleStatus(numPartitions: Int) extends Logging { * different block manager. */ def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = withWriteLock { + logDebug(s"Removing existing map output ${mapIndex} ${bmAddress}") if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) { _numAvailableOutputs -= 1 mapStatuses(mapIndex) = null @@ -153,6 +155,7 @@ private class ShuffleStatus(numPartitions: Int) extends Logging { * outputs which are served by an external shuffle server (if one exists). */ def removeOutputsOnHost(host: String): Unit = withWriteLock { + logDebug(s"Removing outputs for host ${host}") removeOutputsByFilter(x => x.host == host) } @@ -162,6 +165,7 @@ private class ShuffleStatus(numPartitions: Int) extends Logging { * still registered with that execId. */ def removeOutputsOnExecutor(execId: String): Unit = withWriteLock { + logDebug(s"Removing outputs for execId ${execId}") removeOutputsByFilter(x => x.executorId == execId) } @@ -279,7 +283,6 @@ private[spark] class MapOutputTrackerMasterEndpoint( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case GetMapOutputStatuses(shuffleId: Int) => val hostPort = context.senderAddress.hostPort - logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort) tracker.post(new GetMapOutputMessage(shuffleId, context)) case StopMapOutputTracker => @@ -463,7 +466,7 @@ private[spark] class MapOutputTrackerMaster( val context = data.context val shuffleId = data.shuffleId val hostPort = context.senderAddress.hostPort - logDebug("Handling request to send map output locations for shuffle " + shuffleId + + logInfo("Handling request to send map output locations for shuffle " + shuffleId + " to " + hostPort) val shuffleStatus = shuffleStatuses.get(shuffleId).head context.reply( @@ -495,8 +498,11 @@ private[spark] class MapOutputTrackerMaster( def updateMapOutput(shuffleId: Int, mapId: Long, bmAddress: BlockManagerId): Unit = { shuffleStatuses.get(shuffleId) match { - case Some(shuffleStatus) => shuffleStatus.updateMapOutput(mapId, bmAddress) - case None => logError("Asked to update map output for unknown shuffle ${shuffleId}") + case Some(shuffleStatus) => + shuffleStatus.updateMapOutput(mapId, bmAddress) + shuffleStatus.invalidateSerializedMapOutputStatusCache() + case None => + logError(s"Asked to update map output for unknown shuffle ${shuffleId}") } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 
c61a62f86a682..3699118c55dd6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -492,7 +492,20 @@ class BlockManagerMasterEndpoint( storageLevel: StorageLevel, memSize: Long, diskSize: Long): Boolean = { - logDebug(s"Updating block info on master ${blockId} for ${blockManagerId}") + logInfo(s"Updating block info on master ${blockId} for ${blockManagerId}") + + if (blockId.isInternalShuffle) { + blockId match { + case ShuffleIndexBlockId(shuffleId, mapId, _) => + // Don't update the map output on just the index block + logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}") + case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) => + logInfo(s"Received shuffle data block update for ${shuffleId} ${mapId}, performing update") + mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId) + case _ => + logDebug(s"Unexpected shuffle block type ${blockId}") + } + } if (!blockManagerInfo.contains(blockManagerId)) { if (blockManagerId.isDriver && !isLocal) { @@ -509,18 +522,6 @@ class BlockManagerMasterEndpoint( return true } - if (blockId.isInternalShuffle && storageLevel.isValid) { - blockId match { - case ShuffleIndexBlockId(shuffleId, mapId, _) => - // Don't update the map output on just the index block - logDebug("Received shuffle index block update for ${shuffleId} ${mapId}") - case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) => - mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId) - case _ => - logError(s"Unexpected shuffle block type ${blockId}") - } - } - blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize) var locations: mutable.HashSet[BlockManagerId] = null diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index a960684167fde..c791315ea362e 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -35,16 +35,16 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext val numExecs = 3 val numParts = 3 -/* + test(s"verify that an already running task which is going to cache data succeeds " + s"on a decommissioned executor") { runDecomTest(true, false, true) } -*/ + test(s"verify that shuffle blocks are migrated with force to disk") { runDecomTest(false, true, false, remoteBlockSize = "1") } -/* + test(s"verify that shuffle blocks are migrated") { runDecomTest(false, true, false) } @@ -52,7 +52,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext test(s"verify that both migrations can work at the same time.") { runDecomTest(true, true, false) } - */ + private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean, remoteBlockSize: String = "100000") = { From 155aeb279e41fb28b0840a49712770737b678561 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 1 Jun 2020 14:30:45 -0700 Subject: [PATCH 27/30] Saw a test failure of the executors not coming up in time, bumping the timeouts.
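(Context note, not part of the patch: the timeouts below feed a poll-until-deadline helper. This is a rough standalone sketch of that pattern; WaitFor and currentExecutorCount are illustrative names, not Spark's actual TestUtils API.)

// Poll a condition until it holds or the deadline passes.
object WaitFor {
  def waitUntil(timeoutMs: Long, pollMs: Long = 100)(cond: => Boolean): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (System.currentTimeMillis() < deadline) {
      if (cond) return true
      Thread.sleep(pollMs)
    }
    cond  // one final check at the deadline
  }
}

// Why the budget matters: an executor that registers at 12s passes with a 20s
// budget but flakes with a 10s one, which is what motivates the bump below.
// WaitFor.waitUntil(timeoutMs = 20000) { currentExecutorCount() >= 1 }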
--- .../apache/spark/storage/BlockManagerDecommissionSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index c791315ea362e..8fc651065b9b0 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -130,7 +130,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // Wait for the first executor to start TestUtils.waitUntilExecutorsUp(sc = sc, numExecutors = 1, - timeout = 10000) // 10s + timeout = 20000) // 20s // Start the computation of RDD - this step will also cache the RDD val asyncCount = testRdd.countAsync() @@ -138,7 +138,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // Wait for all of the executors to start TestUtils.waitUntilExecutorsUp(sc = sc, numExecutors = numExecs, - timeout = 10000) // 10s + timeout = 30000) // 30s // Wait for the job to have started. taskStartSem.acquire(1) From 7f93df624fb604074bdd3cab132d2fa85b47a879 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 1 Jun 2020 17:27:30 -0700 Subject: [PATCH 28/30] Bump the timeout, and also don't wait for the full set if we don't need it --- .../org/apache/spark/scheduler/WorkerDecommissionSuite.scala | 2 +- .../apache/spark/storage/BlockManagerDecommissionSuite.scala | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 148d20ee659a2..cd3ab4db77f85 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -58,7 +58,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { }) TestUtils.waitUntilExecutorsUp(sc = sc, numExecutors = 2, - timeout = 10000) // 10s + timeout = 30000) // 30s val sleepyRdd = input.mapPartitions{ x => Thread.sleep(5000) // 5s x diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala index 8fc651065b9b0..f70aecd8793bb 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -137,7 +137,9 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext // Wait for all of the executors to start TestUtils.waitUntilExecutorsUp(sc = sc, - numExecutors = numExecs, + // We need to make sure there is the original executor plus one to migrate to; we don't + // need the full set. + numExecutors = 2, timeout = 30000) // 30s // Wait for the job to have started. 
From 7e32341b33f565460b0bd159297dc973802dda93 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 1 Jun 2020 17:38:02 -0700 Subject: [PATCH 29/30] Return faster with shuffle blocks since we don't need the rest of the logic in update block :) --- .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 3699118c55dd6..8c9dd42995064 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -499,11 +499,14 @@ class BlockManagerMasterEndpoint( case ShuffleIndexBlockId(shuffleId, mapId, _) => // Don't update the map output on just the index block logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}") + return true case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) => logInfo(s"Received shuffle data block update for ${shuffleId} ${mapId}, performing update") mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId) + return true case _ => logDebug(s"Unexpected shuffle block type ${blockId}") + return false } } From a3aa8ebe11de6011fc77daa5b805af1193992d74 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 2 Jun 2020 11:20:02 -0700 Subject: [PATCH 30/30] Small cleanups --- .../main/scala/org/apache/spark/MapOutputTracker.scala | 3 ++- .../src/main/scala/org/apache/spark/SparkContext.scala | 10 ++-------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 1b861f69ddaaf..43370ee31abde 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -283,6 +283,7 @@ private[spark] class MapOutputTrackerMasterEndpoint( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case GetMapOutputStatuses(shuffleId: Int) => val hostPort = context.senderAddress.hostPort + logInfo(s"Asked to send map output locations for shuffle ${shuffleId} to ${hostPort}") tracker.post(new GetMapOutputMessage(shuffleId, context)) case StopMapOutputTracker => @@ -466,7 +467,7 @@ private[spark] class MapOutputTrackerMaster( val context = data.context val shuffleId = data.shuffleId val hostPort = context.senderAddress.hostPort - logInfo("Handling request to send map output locations for shuffle " + shuffleId + + logDebug("Handling request to send map output locations for shuffle " + shuffleId + " to " + hostPort) val shuffleStatus = shuffleStatuses.get(shuffleId).head context.reply( diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3f105d19a35ba..c36bbe9b0810f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1586,12 +1586,7 @@ class SparkContext(config: SparkConf) extends Logging { listenerBus.removeListener(listener) } - /** - * :: DeveloperApi :: - * Deregister the listener from Spark's listener bus. 
- */ - @DeveloperApi - def getExecutorIds(): Seq[String] = { + private[spark] def getExecutorIds(): Seq[String] = { schedulerBackend match { case b: ExecutorAllocationClient => b.getExecutorIds() @@ -1731,8 +1726,7 @@ class SparkContext(config: SparkConf) extends Logging { } - @DeveloperApi - def decommissionExecutors(executorIds: Seq[String]): Unit = { + private[spark] def decommissionExecutors(executorIds: Seq[String]): Unit = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => executorIds.foreach(b.decommissionExecutor)