From d91288d6804db8feed210cfba01cb8f0f6780493 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sat, 14 Jun 2014 20:49:14 -0400
Subject: [PATCH 1/4] rename "conf" parameters in the saveAsHadoop functions to distinguish with SparkConf

---
 .../apache/spark/rdd/PairRDDFunctions.scala        | 48 +++++++++----------
 1 file changed, 24 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index b6ad9b6c3e168..eea2ee5a0c8bd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -719,9 +719,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = self.context.hadoopConfiguration)
+      hadoopConf: Configuration = self.context.hadoopConfiguration)
   {
-    val job = new NewAPIHadoopJob(conf)
+    val job = new NewAPIHadoopJob(hadoopConf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     job.setOutputFormatClass(outputFormatClass)
@@ -752,24 +752,24 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
+      hadoopConf: JobConf = new JobConf(self.context.hadoopConfiguration),
       codec: Option[Class[_ <: CompressionCodec]] = None) {
-    conf.setOutputKeyClass(keyClass)
-    conf.setOutputValueClass(valueClass)
+    hadoopConf.setOutputKeyClass(keyClass)
+    hadoopConf.setOutputValueClass(valueClass)
     // Doesn't work in Scala 2.9 due to what may be a generics bug
     // TODO: Should we uncomment this for Scala 2.10?
     // conf.setOutputFormat(outputFormatClass)
-    conf.set("mapred.output.format.class", outputFormatClass.getName)
+    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
     for (c <- codec) {
-      conf.setCompressMapOutput(true)
-      conf.set("mapred.output.compress", "true")
-      conf.setMapOutputCompressorClass(c)
-      conf.set("mapred.output.compression.codec", c.getCanonicalName)
-      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+      hadoopConf.setCompressMapOutput(true)
+      hadoopConf.set("mapred.output.compress", "true")
+      hadoopConf.setMapOutputCompressorClass(c)
+      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
+      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-    conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
-    saveAsHadoopDataset(conf)
+    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(hadoopConf, SparkHadoopWriter.createPathFromString(path, hadoopConf))
+    saveAsHadoopDataset(hadoopConf)
   }
 
   /**
@@ -778,8 +778,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * output paths required (e.g. a table name to write to) in the same way as it would be
    * configured for a Hadoop MapReduce job.
    */
-  def saveAsNewAPIHadoopDataset(conf: Configuration) {
-    val job = new NewAPIHadoopJob(conf)
+  def saveAsNewAPIHadoopDataset(hadoopConf: Configuration) {
+    val job = new NewAPIHadoopJob(hadoopConf)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
@@ -836,10 +836,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
    * MapReduce job.
    */
-  def saveAsHadoopDataset(conf: JobConf) {
-    val outputFormatInstance = conf.getOutputFormat
-    val keyClass = conf.getOutputKeyClass
-    val valueClass = conf.getOutputValueClass
+  def saveAsHadoopDataset(hadoopConf: JobConf) {
+    val outputFormatInstance = hadoopConf.getOutputFormat
+    val keyClass = hadoopConf.getOutputKeyClass
+    val valueClass = hadoopConf.getOutputValueClass
     if (outputFormatInstance == null) {
       throw new SparkException("Output format class not set")
     }
@@ -849,7 +849,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    SparkHadoopUtil.get.addCredentials(conf)
+    SparkHadoopUtil.get.addCredentials(hadoopConf)
 
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
@@ -857,11 +857,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
       outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
-      val ignoredFs = FileSystem.get(conf)
-      conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
+      val ignoredFs = FileSystem.get(hadoopConf)
+      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
     }
 
-    val writer = new SparkHadoopWriter(conf)
+    val writer = new SparkHadoopWriter(hadoopConf)
     writer.preSetup()
 
     def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
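Note that the rename above also changes the public method signatures, which is what the rest of the series walks back: Scala callers may pass the Hadoop configuration by name, so changing the parameter name from `conf` to `hadoopConf` is a source-incompatible change even though the types are unchanged. Below is a minimal sketch of such a caller, not taken from the patch; the application name, output path, and key/value classes are illustrative only.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._ // brings rddToPairRDDFunctions into scope

    object NamedArgumentCaller {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("named-arg-example"))
        val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
          .map { case (k, v) => (new Text(k), new IntWritable(v)) }

        // A Hadoop Configuration derived from the context's default one.
        val myHadoopConf = new Configuration(sc.hadoopConfiguration)

        // This call site names the last parameter explicitly. It compiles while the
        // parameter is called `conf`, but would stop compiling if the parameter were
        // renamed to `hadoopConf` as in PATCH 1 above.
        pairs.saveAsNewAPIHadoopFile(
          "/tmp/example-output",
          keyClass = classOf[Text],
          valueClass = classOf[IntWritable],
          outputFormatClass = classOf[TextOutputFormat[Text, IntWritable]],
          conf = myHadoopConf)

        sc.stop()
      }
    }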
From 763975ff7b6cdea32ebd9f2770f28febd44b7174 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sat, 14 Jun 2014 20:56:58 -0400
Subject: [PATCH 2/4] style fix

---
 .../src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index eea2ee5a0c8bd..d3a8d910c88e0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -768,7 +768,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
     hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(hadoopConf, SparkHadoopWriter.createPathFromString(path, hadoopConf))
+    FileOutputFormat.setOutputPath(hadoopConf,
+      SparkHadoopWriter.createPathFromString(path, hadoopConf))
     saveAsHadoopDataset(hadoopConf)
   }
 
From 7ee5712e91861b902814f3e2d76a059e39e4d3c9 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Thu, 19 Jun 2014 15:45:01 -0400
Subject: [PATCH 3/4] to keep the source-compatibility

---
 .../org/apache/spark/rdd/PairRDDFunctions.scala    | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index d3a8d910c88e0..856c064d7b30d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -719,8 +719,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      hadoopConf: Configuration = self.context.hadoopConfiguration)
+      conf: Configuration = self.context.hadoopConfiguration)
   {
+    // keep the source-compatibility and distinct with the SparkConf
+    val hadoopConf = conf
     val job = new NewAPIHadoopJob(hadoopConf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     job.setOutputFormatClass(outputFormatClass)
@@ -752,8 +754,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      hadoopConf: JobConf = new JobConf(self.context.hadoopConfiguration),
+      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
       codec: Option[Class[_ <: CompressionCodec]] = None) {
+    // keep the source-compatibility and distinct with the SparkConf
+    val hadoopConf = conf
     hadoopConf.setOutputKeyClass(keyClass)
     hadoopConf.setOutputValueClass(valueClass)
     // Doesn't work in Scala 2.9 due to what may be a generics bug
@@ -779,7 +783,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * output paths required (e.g. a table name to write to) in the same way as it would be
    * configured for a Hadoop MapReduce job.
    */
-  def saveAsNewAPIHadoopDataset(hadoopConf: Configuration) {
+  def saveAsNewAPIHadoopDataset(conf: Configuration) {
+    // keep the source-compatibility and distinct with the SparkConf
+    val hadoopConf = conf
     val job = new NewAPIHadoopJob(hadoopConf)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
@@ -837,7 +843,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
    * MapReduce job.
    */
-  def saveAsHadoopDataset(hadoopConf: JobConf) {
+  def saveAsHadoopDataset(conf: JobConf) {
+    // keep the source-compatibility and distinct with the SparkConf
+    val hadoopConf = conf
     val outputFormatInstance = hadoopConf.getOutputFormat
     val keyClass = hadoopConf.getOutputKeyClass
     val valueClass = hadoopConf.getOutputValueClass
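The pattern introduced in this patch keeps the externally visible parameter name `conf`, so existing call sites (including named-argument calls) keep compiling, and immediately aliases it to a local `val hadoopConf` so the Hadoop configuration cannot be confused with the RDD's SparkConf, which the same file reads as `self.conf` (for example in the `spark.hadoop.validateOutputSpecs` check in saveAsHadoopDataset). The snippet below is a standalone illustration of that pattern only; the types and names are invented for the example and are not Spark's.

    // Stand-ins for the real types: AppSettings plays the role of SparkConf,
    // JobSettings the role of a Hadoop JobConf.
    final case class AppSettings(validateOutputSpecs: Boolean)
    final case class JobSettings(outputPath: String)

    class Saver(val conf: AppSettings) {
      // The public parameter keeps its old name so that call sites such as
      // saver.save(conf = JobSettings("/tmp/out")) continue to compile.
      def save(conf: JobSettings = JobSettings("/tmp/default")): Unit = {
        // Rename internally so the job-level settings are not confused with
        // the instance-level `conf` above.
        val hadoopConf = conf
        if (this.conf.validateOutputSpecs) {
          println(s"validating output location ${hadoopConf.outputPath}")
        }
        println(s"writing to ${hadoopConf.outputPath}")
      }
    }

    object SaverDemo extends App {
      val saver = new Saver(AppSettings(validateOutputSpecs = true))
      saver.save(conf = JobSettings("/tmp/out")) // named argument still compiles
    }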
From 11abeba11f2df9f8ff380c935a50e049a7c5a9fd Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Thu, 19 Jun 2014 17:32:34 -0400
Subject: [PATCH 4/4] revise the comments

---
 .../scala/org/apache/spark/rdd/PairRDDFunctions.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 856c064d7b30d..89c2080e7b2e4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -721,7 +721,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
       conf: Configuration = self.context.hadoopConfiguration)
   {
-    // keep the source-compatibility and distinct with the SparkConf
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
     val job = new NewAPIHadoopJob(hadoopConf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
@@ -756,7 +756,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
       conf: JobConf = new JobConf(self.context.hadoopConfiguration),
       codec: Option[Class[_ <: CompressionCodec]] = None) {
-    // keep the source-compatibility and distinct with the SparkConf
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
     hadoopConf.setOutputKeyClass(keyClass)
     hadoopConf.setOutputValueClass(valueClass)
     // Doesn't work in Scala 2.9 due to what may be a generics bug
@@ -784,7 +784,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * output paths required (e.g. a table name to write to) in the same way as it would be
    * configured for a Hadoop MapReduce job.
    */
   def saveAsNewAPIHadoopDataset(conf: Configuration) {
-    // keep the source-compatibility and distinct with the SparkConf
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
     val job = new NewAPIHadoopJob(hadoopConf)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -844,7 +844,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
    * MapReduce job.
    */
   def saveAsHadoopDataset(conf: JobConf) {
-    // keep the source-compatibility and distinct with the SparkConf
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
     val outputFormatInstance = hadoopConf.getOutputFormat
     val keyClass = hadoopConf.getOutputKeyClass
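After this final patch the user-facing API is the same as before the series: the dataset-level save methods still take a plain Hadoop `JobConf`/`Configuration` parameter named `conf`, and only the local value inside PairRDDFunctions is called `hadoopConf`. Below is a usage sketch for saveAsHadoopDataset, configured the way the doc comment above describes (output key/value classes, an OutputFormat, and an output path); the paths and application name are illustrative, not from the patch.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._ // brings rddToPairRDDFunctions into scope

    object DatasetSaveExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("save-as-hadoop-dataset"))
        val pairs = sc.parallelize(Seq(("x", 1), ("y", 2)))
          .map { case (k, v) => (new Text(k), new IntWritable(v)) }

        // The JobConf carries everything the old-API OutputFormat needs.
        val jobConf = new JobConf(sc.hadoopConfiguration)
        jobConf.setOutputKeyClass(classOf[Text])
        jobConf.setOutputValueClass(classOf[IntWritable])
        jobConf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
        FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/dataset-output"))

        // The parameter is still named `conf` on the Spark side; callers just pass a JobConf.
        pairs.saveAsHadoopDataset(jobConf)

        sc.stop()
      }
    }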