From ec490e8a61898ed36478e62abf2f04ae470aa55a Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 26 Feb 2014 19:13:36 -0500 Subject: [PATCH 1/4] prevent Spark from overwriting directory silently and leaving dirty directory --- .../apache/spark/rdd/PairRDDFunctions.scala | 19 ++++++++++------ .../scala/org/apache/spark/FileSuite.scala | 22 +++++++++++++++++++ 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index d29a1a9881cd4..0ddb2b23ed1b3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -34,14 +34,11 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} -import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} -import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob} -import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} // SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark. import org.apache.hadoop.mapred.SparkHadoopWriter -import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil import org.apache.spark._ import org.apache.spark.Partitioner.defaultPartitioner @@ -604,8 +601,12 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) val job = new NewAPIHadoopJob(conf) job.setOutputKeyClass(keyClass) job.setOutputValueClass(valueClass) + val wrappedConf = new SerializableWritable(job.getConfiguration) - NewFileOutputFormat.setOutputPath(job, new Path(path)) + val outpath = new Path(path) + NewFileOutputFormat.setOutputPath(job, outpath) + val jobFormat = outputFormatClass.newInstance + jobFormat.checkOutputSpecs(new JobContext(wrappedConf.value, job.getJobID)) val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) val stageId = self.id @@ -633,7 +634,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) committer.commitTask(hadoopContext) return 1 } - val jobFormat = outputFormatClass.newInstance + /* apparently we need a TaskAttemptID to construct an OutputCommitter; * however we're only going to use this local OutputCommitter for * setupJob/commitJob, so we just use a dummy "map" task. 
@@ -642,7 +643,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId) val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) jobCommitter.setupJob(jobTaskContext) - val count = self.context.runJob(self, writeShard _).sum + self.context.runJob(self, writeShard _).sum jobCommitter.commitJob(jobTaskContext) } @@ -712,6 +713,10 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")") + val path = new Path(conf.get("mapred.output.dir")) + val fs = path.getFileSystem(conf) + conf.getOutputFormat.checkOutputSpecs(fs, conf) + val writer = new SparkHadoopWriter(conf) writer.preSetup() diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 8ff02aef67aa0..0ae11738b3bba 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -24,6 +24,7 @@ import scala.io.Source import com.google.common.io.Files import org.apache.hadoop.io._ import org.apache.hadoop.io.compress.DefaultCodec +import org.apache.hadoop.mapred.FileAlreadyExistsException import org.scalatest.FunSuite import org.apache.spark.SparkContext._ @@ -208,4 +209,25 @@ class FileSuite extends FunSuite with LocalSparkContext { assert(rdd.count() === 3) assert(rdd.count() === 3) } + + test ("prevent user from overwriting the empty directory") { + sc = new SparkContext("local", "test") + val tempdir = Files.createTempDir() + var randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1) + intercept[FileAlreadyExistsException] { + randomRDD.saveAsTextFile(tempdir.getPath) + } + } + + test ("prevent user from overwriting the non-empty directory") { + sc = new SparkContext("local", "test") + val tempdir = Files.createTempDir() + var randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1) + randomRDD.saveAsTextFile(tempdir.getPath + "/output") + assert(new File(tempdir.getPath + "/output/part-00000").exists() === true) + randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1) + intercept[FileAlreadyExistsException] { + randomRDD.saveAsTextFile(tempdir.getPath + "/output") + } + } } From ac631366c22c8191ebc2f31b92905e165f6ad210 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 28 Feb 2014 22:27:12 -0500 Subject: [PATCH 2/4] checkOutputSpecs not applicable to FSOutputFormat --- .../org/apache/spark/rdd/PairRDDFunctions.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 0ddb2b23ed1b3..1b92814a97678 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -713,10 +713,16 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")") - val path = new Path(conf.get("mapred.output.dir")) - val fs = path.getFileSystem(conf) - conf.getOutputFormat.checkOutputSpecs(fs, conf) - + if (outputFormatClass.isInstanceOf[FileOutputFormat[_, _]]) { + val outputPath = conf.get("mapred.output.dir") + if (outputPath == null) { + throw new SparkException("mapred.output.dir not 
set") + } + val path = new Path(outputPath) + val fs = path.getFileSystem(conf) + conf.getOutputFormat.checkOutputSpecs(fs, conf) + } + val writer = new SparkHadoopWriter(conf) writer.preSetup() From ef2d43fea5089e5107f51c3453eb31674ad55b1f Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 1 Mar 2014 16:33:53 -0500 Subject: [PATCH 3/4] add new test cases and code clean --- .../apache/spark/rdd/PairRDDFunctions.scala | 22 ++++++-------- .../scala/org/apache/spark/FileSuite.scala | 30 +++++++++++++++---- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 1b92814a97678..cd83bd5b92472 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -30,7 +30,7 @@ import scala.reflect.ClassTag import com.clearspring.analytics.stream.cardinality.HyperLogLog import org.apache.hadoop.conf.{Configurable, Configuration} -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} @@ -606,7 +606,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) val outpath = new Path(path) NewFileOutputFormat.setOutputPath(job, outpath) val jobFormat = outputFormatClass.newInstance - jobFormat.checkOutputSpecs(new JobContext(wrappedConf.value, job.getJobID)) + jobFormat.checkOutputSpecs(job) val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) val stageId = self.id @@ -697,10 +697,10 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * MapReduce job. 
*/ def saveAsHadoopDataset(conf: JobConf) { - val outputFormatClass = conf.getOutputFormat + val outputFormatInstance = conf.getOutputFormat val keyClass = conf.getOutputKeyClass val valueClass = conf.getOutputValueClass - if (outputFormatClass == null) { + if (outputFormatInstance == null) { throw new SparkException("Output format class not set") } if (keyClass == null) { @@ -713,16 +713,12 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")") - if (outputFormatClass.isInstanceOf[FileOutputFormat[_, _]]) { - val outputPath = conf.get("mapred.output.dir") - if (outputPath == null) { - throw new SparkException("mapred.output.dir not set") - } - val path = new Path(outputPath) - val fs = path.getFileSystem(conf) - conf.getOutputFormat.checkOutputSpecs(fs, conf) + if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) { + // FileOutputFormat ignores the filesystem parameter + val ignoredFs = FileSystem.get(conf) + conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf) } - + val writer = new SparkHadoopWriter(conf) writer.preSetup() diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 0ae11738b3bba..76173608e9f70 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.mapred.FileAlreadyExistsException import org.scalatest.FunSuite import org.apache.spark.SparkContext._ +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat class FileSuite extends FunSuite with LocalSparkContext { @@ -210,24 +211,43 @@ class FileSuite extends FunSuite with LocalSparkContext { assert(rdd.count() === 3) } - test ("prevent user from overwriting the empty directory") { + test ("prevent user from overwriting the empty directory (old Hadoop API)") { sc = new SparkContext("local", "test") val tempdir = Files.createTempDir() - var randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1) + val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1) intercept[FileAlreadyExistsException] { randomRDD.saveAsTextFile(tempdir.getPath) } } - test ("prevent user from overwriting the non-empty directory") { + test ("prevent user from overwriting the non-empty directory (old Hadoop API)") { sc = new SparkContext("local", "test") val tempdir = Files.createTempDir() - var randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1) + val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1) randomRDD.saveAsTextFile(tempdir.getPath + "/output") assert(new File(tempdir.getPath + "/output/part-00000").exists() === true) - randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1) intercept[FileAlreadyExistsException] { randomRDD.saveAsTextFile(tempdir.getPath + "/output") } } + + test ("prevent user from overwriting the empty directory (new Hadoop API)") { + sc = new SparkContext("local", "test") + val tempdir = Files.createTempDir() + val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) + intercept[FileAlreadyExistsException] { + randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath) + } + } + + test ("prevent user from overwriting the non-empty directory (new Hadoop API)") { + sc = new SparkContext("local", "test") + val tempdir = 
Files.createTempDir() + val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) + randomRDD.saveAsTextFile(tempdir.getPath + "/output") + assert(new File(tempdir.getPath + "/output/part-00000").exists() === true) + intercept[FileAlreadyExistsException] { + randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath) + } + } } From 6a4e3a30f6029027e1f4f9594307047c7c0e564b Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 1 Mar 2014 18:55:20 -0500 Subject: [PATCH 4/4] code clean --- core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index cd83bd5b92472..5aa0b030dbdd8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -643,7 +643,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId) val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) jobCommitter.setupJob(jobTaskContext) - self.context.runJob(self, writeShard _).sum + self.context.runJob(self, writeShard _) jobCommitter.commitJob(jobTaskContext) }
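Taken together, the series makes both save paths validate the output directory before any task writes: the old mapred API path (saveAsHadoopDataset) and the new mapreduce API path (saveAsNewAPIHadoopFile) now invoke the output format's checkOutputSpecs up front. Writing into an existing directory therefore fails fast with FileAlreadyExistsException instead of silently overwriting files or leaving a dirty, half-written directory behind, which is what the new FileSuite cases assert for both APIs. Below is a minimal, self-contained sketch of the resulting behavior; it assumes a Spark build with these patches applied, and the object name OverwriteCheckDemo plus the "local" master URL are illustrative only, not part of the change.

import java.io.File

import com.google.common.io.Files
import org.apache.hadoop.mapred.FileAlreadyExistsException
import org.apache.spark.SparkContext

object OverwriteCheckDemo {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "overwrite-check-demo")
    val tempdir = Files.createTempDir()
    val out = tempdir.getPath + "/output"
    val rdd = sc.parallelize(Seq((1, "a"), (2, "b")), 1)

    // The first save succeeds and leaves part files under the output directory.
    rdd.saveAsTextFile(out)
    assert(new File(out + "/part-00000").exists())

    // With the patches applied, a second save to the same path is rejected by
    // checkOutputSpecs before any task runs, so the existing output is untouched.
    try {
      rdd.saveAsTextFile(out)
    } catch {
      case e: FileAlreadyExistsException =>
        println("save rejected as expected: " + e.getMessage)
    } finally {
      sc.stop()
    }
  }
}

Because the check simply delegates to the configured OutputFormat's own checkOutputSpecs, the failure reported here should match what a plain Hadoop MapReduce job raises at submission time for the same output path, and both the old- and new-API paths surface org.apache.hadoop.mapred.FileAlreadyExistsException, which is why the tests intercept that single exception type in all four cases.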