From 1ab49ca90b7cae82efa26e018d9d285c948bf25c Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Wed, 12 Mar 2014 09:11:46 -0500 Subject: [PATCH 1/5] Add support for running pipe tasks is separate directories --- core/pom.xml | 9 ++- .../scala/org/apache/spark/rdd/PipedRDD.scala | 62 ++++++++++++++++++- .../main/scala/org/apache/spark/rdd/RDD.scala | 7 ++- .../scala/org/apache/spark/util/Utils.scala | 21 +++++++ .../org/apache/spark/PipedRDDSuite.scala | 22 +++++++ project/SparkBuild.scala | 4 +- 6 files changed, 113 insertions(+), 12 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 2248f9d0446c0..cf19461def2c2 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -183,14 +183,13 @@ com.codahale.metrics metrics-graphite - - org.apache.derby - derby - test - commons-io commons-io + + + org.apache.derby + derby test diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 4250a9d02f764..3318e439c9162 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -17,6 +17,8 @@ package org.apache.spark.rdd +import java.io.File +import java.io.FilenameFilter import java.io.PrintWriter import java.util.StringTokenizer @@ -26,7 +28,9 @@ import scala.collection.mutable.ArrayBuffer import scala.io.Source import scala.reflect.ClassTag +import org.apache.commons.io.FileUtils import org.apache.spark.{Partition, SparkEnv, TaskContext} +import org.apache.spark.util.Utils /** @@ -38,7 +42,8 @@ class PipedRDD[T: ClassTag]( command: Seq[String], envVars: Map[String, String], printPipeContext: (String => Unit) => Unit, - printRDDElement: (T, String => Unit) => Unit) + printRDDElement: (T, String => Unit) => Unit, + separateWorkingDir: Boolean) extends RDD[String](prev) { // Similar to Runtime.exec(), if we are given a single string, split it into words @@ -48,12 +53,24 @@ class PipedRDD[T: ClassTag]( command: String, envVars: Map[String, String] = Map(), printPipeContext: (String => Unit) => Unit = null, - printRDDElement: (T, String => Unit) => Unit = null) = - this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement) + printRDDElement: (T, String => Unit) => Unit = null, + separateWorkingDir: Boolean = false) = + this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement, + separateWorkingDir) override def getPartitions: Array[Partition] = firstParent[T].partitions + /** + * A FilenameFilter that accepts anything that isn't equal to the name passed in. + * @param name of file or directory to leave out + */ + class NotEqualsFileNameFilter(name: String) extends FilenameFilter { + def accept(dir: File, name: String): Boolean = { + !name.equals(name) + } + } + override def compute(split: Partition, context: TaskContext): Iterator[String] = { val pb = new ProcessBuilder(command) // Add the environmental variables to the process. @@ -67,6 +84,38 @@ class PipedRDD[T: ClassTag]( currentEnvVars.putAll(hadoopSplit.getPipeEnvVars()) } + // When spark.worker.separated.working.directory option is turned on, each + // task will be run in separate directory. 
This should be resolve file + // access conflict issue + val taskDirectory = "./tasks/" + java.util.UUID.randomUUID.toString + var workInTaskDirectory = false + logDebug("taskDirectory = " + taskDirectory) + if (separateWorkingDir == true) { + val currentDir = new File(".") + logDebug("currentDir = " + currentDir) + val taskDirFile = new File(taskDirectory) + taskDirFile.mkdirs() + + try { + val tasksDirFilter = new NotEqualsFileNameFilter("tasks") + + // Need to add symlinks to jars, files, and directories. On Yarn we could have + // directories and other files not known to the SparkContext that were added via the + // Hadoop distributed cache. We also don't want to symlink to the /tasks directories we + // are creating here. + for (file <- currentDir.list(tasksDirFilter)) { + val fileWithDir = new File(currentDir, file) + Utils.symlink(new File(fileWithDir.getAbsolutePath()), + new File(taskDirectory + "/" + fileWithDir.getName())) + } + pb.directory(taskDirFile) + workInTaskDirectory = true + } catch { + case e: Exception => logError("Unable to setup task working directory: " + e.getMessage + + " (" + taskDirectory + ")") + } + } + val proc = pb.start() val env = SparkEnv.get @@ -112,6 +161,13 @@ class PipedRDD[T: ClassTag]( if (exitStatus != 0) { throw new Exception("Subprocess exited with status " + exitStatus) } + + // cleanup task working directory if used + if(workInTaskDirectory == true) { + FileUtils.deleteQuietly(new File(taskDirectory)) + logDebug("Removed task working directory " + taskDirectory) + } + false } } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 4afa7523dd802..bd77d14c95433 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -478,16 +478,19 @@ abstract class RDD[T: ClassTag]( * instead of constructing a huge String to concat all the elements: * def printRDDElement(record:(String, Seq[String]), f:String=>Unit) = * for (e <- record._2){f(e)} + * @param separateWorkingDir Use separate working directories for each task. * @return the result RDD */ def pipe( command: Seq[String], env: Map[String, String] = Map(), printPipeContext: (String => Unit) => Unit = null, - printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = { + printRDDElement: (T, String => Unit) => Unit = null, + separateWorkingDir: Boolean = false): RDD[String] = { new PipedRDD(this, command, env, if (printPipeContext ne null) sc.clean(printPipeContext) else null, - if (printRDDElement ne null) sc.clean(printRDDElement) else null) + if (printRDDElement ne null) sc.clean(printRDDElement) else null, + separateWorkingDir) } /** diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 38a275d438959..b11bd8c5ecc50 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -26,6 +26,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor} import scala.collection.JavaConversions._ import scala.collection.Map import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.SortedSet import scala.io.Source import scala.reflect.ClassTag @@ -895,4 +896,24 @@ private[spark] object Utils extends Logging { } count } + + /** + * Creates a symlink. Note jdk1.7 has Files.createSymbolicLink but not used here + * for jdk1.6 support. 
Doesn't support windows or any other platform without 'ln'. + * @param src absolute path to the source + * @param dst relative path for the destination + */ + def symlink(src: File, dst: File) { + if (!src.isAbsolute()) { + throw new IOException("Source must be absolute") + } + if (dst.isAbsolute()) { + throw new IOException("Destination must be relative") + } + import scala.sys.process._ + ("ln -sf " + src.getAbsolutePath() + " " + dst.getPath()) lines_! ProcessLogger(line => + (logInfo(line))) + } + + } diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala index 0bac78d8a6bdf..37c02c16284ee 100644 --- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala @@ -123,6 +123,28 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext { } } + test("basic pipe with separate working directory") { + if (testCommandAvailable("cat")) { + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + + val piped = nums.pipe(Seq("cat"), separateWorkingDir = true) + + val c = piped.collect() + assert(c.size === 4) + assert(c(0) === "1") + assert(c(1) === "2") + assert(c(2) === "3") + assert(c(3) === "4") + val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true) + val collectPwd = pipedPwd.collect() + println("collect pwd is: " + collectPwd(0)) + assert(collectPwd(0).contains("tasks/")) + assert(collectPwd(0).matches("tasks/")) + } else { + assert(true) + } + } + test("test pipe exports map_input_file") { testExportInputFile("map_input_file") } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index b0c3bf29dfd4f..fbe320cda59fe 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -238,12 +238,12 @@ object SparkBuild extends Build { "org.eclipse.jetty" % "jetty-security" % "7.6.8.v20121106", /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. 
*/ "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"), + "commons-io" % "commons-io" % "2.4", "org.scalatest" %% "scalatest" % "1.9.1" % "test", "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", "com.novocode" % "junit-interface" % "0.10" % "test", "org.easymock" % "easymock" % "3.1" % "test", - "org.mockito" % "mockito-all" % "1.8.5" % "test", - "commons-io" % "commons-io" % "2.4" % "test" + "org.mockito" % "mockito-all" % "1.8.5" % "test" ), testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"), From 6b783bdb5e09b7c96cbb76111876fbb6c9ca9a6f Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Wed, 12 Mar 2014 09:47:13 -0500 Subject: [PATCH 2/5] style fixes --- core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala | 2 +- core/src/test/scala/org/apache/spark/PipedRDDSuite.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 3318e439c9162..0d11211b07ae6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -163,7 +163,7 @@ class PipedRDD[T: ClassTag]( } // cleanup task working directory if used - if(workInTaskDirectory == true) { + if (workInTaskDirectory == true) { FileUtils.deleteQuietly(new File(taskDirectory)) logDebug("Removed task working directory " + taskDirectory) } diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala index 37c02c16284ee..8279b3db6cd2d 100644 --- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala @@ -139,7 +139,6 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext { val collectPwd = pipedPwd.collect() println("collect pwd is: " + collectPwd(0)) assert(collectPwd(0).contains("tasks/")) - assert(collectPwd(0).matches("tasks/")) } else { assert(true) } From 61be2713696401fe47a7c8108cbda71261c971cc Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Wed, 12 Mar 2014 12:26:56 -0500 Subject: [PATCH 3/5] Fix file name filter --- core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 0d11211b07ae6..4ae01574bfa55 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -65,9 +65,9 @@ class PipedRDD[T: ClassTag]( * A FilenameFilter that accepts anything that isn't equal to the name passed in. 
* @param name of file or directory to leave out */ - class NotEqualsFileNameFilter(name: String) extends FilenameFilter { + class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter { def accept(dir: File, name: String): Boolean = { - !name.equals(name) + !name.equals(filterName) } } @@ -92,7 +92,7 @@ class PipedRDD[T: ClassTag]( logDebug("taskDirectory = " + taskDirectory) if (separateWorkingDir == true) { val currentDir = new File(".") - logDebug("currentDir = " + currentDir) + logDebug("currentDir = " + currentDir.getAbsolutePath()) val taskDirFile = new File(taskDirectory) taskDirFile.mkdirs() From ba23fc05fa45cdc66972e1913912bce1ccfa6e38 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 4 Apr 2014 13:35:18 -0500 Subject: [PATCH 4/5] Add support for symlink on windows, remove commons-io usage --- .../scala/org/apache/spark/rdd/PipedRDD.scala | 6 ++-- .../scala/org/apache/spark/util/Utils.scala | 31 ++++++++++++++++--- .../org/apache/spark/PipedRDDSuite.scala | 13 +++++--- 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 4ae01574bfa55..41ae0fec823e7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -19,6 +19,7 @@ package org.apache.spark.rdd import java.io.File import java.io.FilenameFilter +import java.io.IOException import java.io.PrintWriter import java.util.StringTokenizer @@ -28,7 +29,6 @@ import scala.collection.mutable.ArrayBuffer import scala.io.Source import scala.reflect.ClassTag -import org.apache.commons.io.FileUtils import org.apache.spark.{Partition, SparkEnv, TaskContext} import org.apache.spark.util.Utils @@ -164,7 +164,9 @@ class PipedRDD[T: ClassTag]( // cleanup task working directory if used if (workInTaskDirectory == true) { - FileUtils.deleteQuietly(new File(taskDirectory)) + scala.util.control.Exception.ignoring(classOf[IOException]) { + Utils.deleteRecursively(new File(taskDirectory)) + } logDebug("Removed task working directory " + taskDirectory) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d3ba77a560079..737b765e2aed6 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -44,6 +44,8 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream, */ private[spark] object Utils extends Logging { + val osName = System.getProperty("os.name") + /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { val bos = new ByteArrayOutputStream() @@ -522,9 +524,10 @@ private[spark] object Utils extends Logging { /** * Delete a file or directory and its contents recursively. + * Don't follow directories if they are symlinks. */ def deleteRecursively(file: File) { - if (file.isDirectory) { + if ((file.isDirectory) && !isSymlink(file)) { for (child <- listFilesSafely(file)) { deleteRecursively(child) } @@ -537,6 +540,25 @@ private[spark] object Utils extends Logging { } } + /** + * Check to see if file is a symbolic link. 
+ */ + def isSymlink(file: File): Boolean = { + if (file == null) throw new NullPointerException("File must not be null") + if (osName.startsWith("Windows")) return false + val fileInCanonicalDir = if (file.getParent() == null) { + file + } else { + new File(file.getParentFile().getCanonicalFile(), file.getName()) + } + + if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) { + return false; + } else { + return true; + } + } + /** * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. */ @@ -901,7 +923,7 @@ private[spark] object Utils extends Logging { /** * Creates a symlink. Note jdk1.7 has Files.createSymbolicLink but not used here - * for jdk1.6 support. Doesn't support windows or any other platform without 'ln'. + * for jdk1.6 support. Supports windows by doing copy, everything else uses "ln -sf". * @param src absolute path to the source * @param dst relative path for the destination */ @@ -912,9 +934,10 @@ private[spark] object Utils extends Logging { if (dst.isAbsolute()) { throw new IOException("Destination must be relative") } + val linkCmd = if (osName.startsWith("Windows")) "copy" else "ln -sf" import scala.sys.process._ - ("ln -sf " + src.getAbsolutePath() + " " + dst.getPath()) lines_! ProcessLogger(line => - (logInfo(line))) + (linkCmd + " " + src.getAbsolutePath() + " " + dst.getPath()) lines_! ProcessLogger(line => + (logInfo(line))) } diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala index 6f52b6f89267b..627e9b5cd9060 100644 --- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala @@ -17,8 +17,11 @@ package org.apache.spark -import org.scalatest.FunSuite +import java.io.File + +import com.google.common.io.Files +import org.scalatest.FunSuite import org.apache.spark.rdd.{HadoopRDD, PipedRDD, HadoopPartition} import org.apache.hadoop.mapred.{JobConf, TextInputFormat, FileSplit} @@ -129,9 +132,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext { test("basic pipe with separate working directory") { if (testCommandAvailable("cat")) { val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val piped = nums.pipe(Seq("cat"), separateWorkingDir = true) - val c = piped.collect() assert(c.size === 4) assert(c(0) === "1") @@ -140,8 +141,12 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext { assert(c(3) === "4") val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true) val collectPwd = pipedPwd.collect() - println("collect pwd is: " + collectPwd(0)) assert(collectPwd(0).contains("tasks/")) + val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect() + // make sure symlinks were created + assert(pipedLs.length > 0) + // clean up top level tasks directory + new File("tasks").delete() } else { assert(true) } From abc128917d419e298c51aeeafbf6b5b4d4aa0e02 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 4 Apr 2014 13:37:31 -0500 Subject: [PATCH 5/5] remove extra tag in pom file --- core/pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/core/pom.xml b/core/pom.xml index e6a3ba66093ac..e4c32eff0cd77 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -195,7 +195,6 @@ com.codahale.metrics metrics-graphite - org.apache.derby derby