From df086731952c669e12673fd673d829b9fdd790a2 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Tue, 1 Jul 2014 18:39:46 +0800 Subject: [PATCH 1/2] [SPARK-2324] SparkContext should not exit directly when spark.local.dir is a list of multiple paths and one of them has error --- .../apache/spark/storage/DiskBlockManager.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 2ec46d416f37d..f4619d2008ede 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -26,6 +26,8 @@ import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.network.netty.{PathResolver, ShuffleSender} import org.apache.spark.util.Utils +import scala.collection.mutable.ArrayBuffer + /** * Creates and maintains the logical mapping between logical blocks and physical on-disk * locations. By default, one block is mapped to one file with a name given by its BlockId. @@ -44,6 +46,10 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD * directory, create multiple subdirectories that we will hash files into, in order to avoid * having really large inodes at the top level. */ private val localDirs: Array[File] = createLocalDirs() + if (localDirs.isEmpty) { + logError("Failed to create any local dir") + System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) + } private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) private var shuffleSender : ShuffleSender = null @@ -115,8 +121,9 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD private def createLocalDirs(): Array[File] = { logDebug(s"Creating local directories at root dirs '$rootDirs'") + val localDirsResult = ArrayBuffer[File]() val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") - rootDirs.split(",").map { rootDir => + rootDirs.split(",").foreach { rootDir => var foundLocalDir = false var localDir: File = null var localDirId: String = null @@ -137,11 +144,12 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD } if (!foundLocalDir) { logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir") - System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) + } else { + logInfo(s"Created local directory at $localDir") + localDirsResult += localDir } - logInfo(s"Created local directory at $localDir") - localDir } + localDirsResult.toArray } private def addShutdownHook() { From 609bf48fbbaa412e5d2f677569cd8d29b1fe2f18 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Wed, 2 Jul 2014 12:57:42 +0800 Subject: [PATCH 2/2] [SPARK-2324] SparkContext should not exit directly when spark.local.dir is a list of multiple paths and one of them has error --- .../apache/spark/storage/DiskBlockManager.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index f4619d2008ede..673fc19c060a4 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -26,8 +26,6 @@ import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.network.netty.{PathResolver, ShuffleSender} import org.apache.spark.util.Utils -import scala.collection.mutable.ArrayBuffer - /** * Creates and maintains the logical mapping between logical blocks and physical on-disk * locations. By default, one block is mapped to one file with a name given by its BlockId. @@ -47,7 +45,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD * having really large inodes at the top level. */ private val localDirs: Array[File] = createLocalDirs() if (localDirs.isEmpty) { - logError("Failed to create any local dir") + logError("Failed to create any local dir.") System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) } private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) @@ -121,9 +119,8 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD private def createLocalDirs(): Array[File] = { logDebug(s"Creating local directories at root dirs '$rootDirs'") - val localDirsResult = ArrayBuffer[File]() val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") - rootDirs.split(",").foreach { rootDir => + rootDirs.split(",").flatMap { rootDir => var foundLocalDir = false var localDir: File = null var localDirId: String = null @@ -143,13 +140,14 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD } } if (!foundLocalDir) { - logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir") + logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir." + + " Ignoring this directory.") + None } else { logInfo(s"Created local directory at $localDir") - localDirsResult += localDir + Some(localDir) } } - localDirsResult.toArray } private def addShutdownHook() {