From 748dbd70dd378f50f679a0014518b0bac3b14ce5 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 17 Jun 2015 15:40:20 -0700 Subject: [PATCH 01/11] Adding UUID to output file name to avoid accidental overwriting --- .../apache/spark/sql/parquet/newParquet.scala | 46 ++++--------------- .../spark/sql/hive/orc/OrcRelation.scala | 4 +- .../sql/sources/hadoopFsRelationSuites.scala | 29 +++++++++++- 3 files changed, 38 insertions(+), 41 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index c9de45e0ddfbb..6d28df954df39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.parquet import java.net.URI -import java.util.{List => JList} +import java.util.{List => JList, UUID} import scala.collection.JavaConversions._ import scala.util.Try @@ -41,8 +41,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.util.{SerializableConfiguration, Utils} -import org.apache.spark.{Logging, SparkException, Partition => SparkPartition} +import org.apache.spark.util.Utils +import org.apache.spark.{Logging, Partition => SparkPartition, SerializableWritable, SparkException} private[sql] class DefaultSource extends HadoopFsRelationProvider { override def createRelation( @@ -60,50 +60,20 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext extends OutputWriter { private val recordWriter: RecordWriter[Void, InternalRow] = { - val conf = context.getConfiguration val outputFormat = { - // When appending new Parquet files to an existing Parquet file directory, to avoid - // overwriting existing data files, we need to find out the max task ID encoded in these data - // file names. - // TODO Make this snippet a utility function for other data source developers - val maxExistingTaskId = { - // Note that `path` may point to a temporary location. Here we retrieve the real - // destination path from the configuration - val outputPath = new Path(conf.get("spark.sql.sources.output.path")) - val fs = outputPath.getFileSystem(conf) - - if (fs.exists(outputPath)) { - // Pattern used to match task ID in part file names, e.g.: - // - // part-r-00001.gz.parquet - // ^~~~~ - val partFilePattern = """part-.-(\d{1,}).*""".r - - fs.listStatus(outputPath).map(_.getPath.getName).map { - case partFilePattern(id) => id.toInt - case name if name.startsWith("_") => 0 - case name if name.startsWith(".") => 0 - case name => throw new AnalysisException( - s"Trying to write Parquet files to directory $outputPath, " + - s"but found items with illegal name '$name'.") - }.reduceOption(_ max _).getOrElse(0) - } else { - 0 - } - } - new ParquetOutputFormat[InternalRow]() { // Here we override `getDefaultWorkFile` for two reasons: // - // 1. To allow appending. We need to generate output file name based on the max available - // task ID computed above. + // 1. To allow appending. We need to generate unique output file names to avoid + // overwriting existing files (either exist before the write job, or are just written + // by other tasks within the same write job). // // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all // partitions in the case of dynamic partitioning. override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1 - new Path(path, f"part-r-$split%05d$extension") + val split = context.getTaskAttemptID.getTaskID.getId + new Path(path, f"part-r-$split%05d-${UUID.randomUUID()}$extension") } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index dbce39f21d271..7f04eae7036fc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.orc -import java.util.Properties +import java.util.{Properties, UUID} import com.google.common.base.Objects import org.apache.hadoop.conf.Configuration @@ -106,7 +106,7 @@ private[orc] class OrcOutputWriter( val conf = context.getConfiguration val partition = context.getTaskAttemptID.getTaskID.getId - val filename = f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc" + val filename = f"part-r-$partition%05d-${UUID.randomUUID()}.orc" new OrcOutputFormat().getRecordWriter( new Path(path, filename).getFileSystem(conf), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 76469d7a3d6a5..b08490e72f45b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -35,7 +35,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { import sqlContext.sql import sqlContext.implicits._ - val dataSourceName = classOf[SimpleTextSource].getCanonicalName + val dataSourceName: String val dataSchema = StructType( @@ -470,6 +470,33 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { checkAnswer(sqlContext.table("t"), df.select('b, 'c, 'a).collect()) } } + + // NOTE: This test suite is not super deterministic. On nodes with only relatively few cores + // (4 or even 1), it's hard to reproduce the data loss issue. But on nodes with for example 8 or + // more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this + // requirement. We probably want to move this test case to spark-integration-tests or spark-perf + // later. + test("SPARK-8406") { + withTempPath { dir => + val path = dir.getCanonicalPath + sqlContext + .range(10000) + .repartition(250) + .write + .mode(SaveMode.Overwrite) + .format(dataSourceName) + .save(path) + + assertResult(10000) { + sqlContext + .read + .format(dataSourceName) + .option("dataSchema", StructType(StructField("id", LongType) :: Nil).json) + .load(path) + .count() + } + } + } } class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { From 3806190480693f15412deb9fe3c93a8ebdff4eb0 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 17 Jun 2015 15:46:00 -0700 Subject: [PATCH 02/11] Lets TestHive use all cores by default --- .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index f901bd8171508..1452045faa9f9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -49,7 +49,7 @@ import scala.collection.JavaConversions._ object TestHive extends TestHiveContext( new SparkContext( - System.getProperty("spark.sql.test.master", "local[2]"), + System.getProperty("spark.sql.test.master", "local[*]"), "TestSQLContext", new SparkConf() .set("spark.sql.test", "") From 18b70033d11976917a1845944598b836c975462e Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 19 Jun 2015 02:44:12 -0700 Subject: [PATCH 03/11] Uses job level UUID to take speculative tasks into account --- .../apache/spark/sql/parquet/newParquet.scala | 9 +++++---- .../apache/spark/sql/sources/commands.scala | 19 +++++++++++++++---- .../spark/sql/hive/orc/OrcRelation.scala | 3 ++- .../sql/sources/hadoopFsRelationSuites.scala | 2 +- 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 6d28df954df39..e049d54bf55dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.parquet import java.net.URI -import java.util.{List => JList, UUID} +import java.util.{List => JList} import scala.collection.JavaConversions._ import scala.util.Try @@ -41,8 +41,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.util.Utils -import org.apache.spark.{Logging, Partition => SparkPartition, SerializableWritable, SparkException} +import org.apache.spark.util.{SerializableConfiguration, Utils} +import org.apache.spark.{Logging, Partition => SparkPartition, SparkException} private[sql] class DefaultSource extends HadoopFsRelationProvider { override def createRelation( @@ -72,8 +72,9 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all // partitions in the case of dynamic partitioning. override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID") val split = context.getTaskAttemptID.getTaskID.getId - new Path(path, f"part-r-$split%05d-${UUID.randomUUID()}$extension") + new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension") } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index c16bd9ae52c81..f1949482adffb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -17,13 +17,14 @@ package org.apache.spark.sql.sources -import java.util.Date +import java.util.{Date, UUID} import scala.collection.mutable +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat} import org.apache.parquet.hadoop.util.ContextUtil import org.apache.spark._ @@ -70,7 +71,7 @@ private[sql] case class InsertIntoHadoopFsRelation( relation.paths.length == 1, s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}") - val hadoopConf = sqlContext.sparkContext.hadoopConfiguration + val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) val outputPath = new Path(relation.paths.head) val fs = outputPath.getFileSystem(hadoopConf) val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) @@ -263,6 +264,13 @@ private[sql] abstract class BaseWriterContainer( protected val serializableConf = new SerializableConfiguration(ContextUtil.getConfiguration(job)) + // This UUID is used to avoid output file name collision between different appending write jobs. + // These jobs may belong to different SparkContext instances. Concrete data source implementations + // may use this UUID to generate unique file names (e.g., `part-r--.parquet`). + // The reason why this ID is used to identify a job rather than a single task output file is + // that, speculative tasks must generate the same output file name as the original task. + private val uniqueWriteJobId = UUID.randomUUID() + // This is only used on driver side. @transient private val jobContext: JobContext = job @@ -290,6 +298,9 @@ private[sql] abstract class BaseWriterContainer( setupIDs(0, 0, 0) setupConf() + ContextUtil.getConfiguration(job).set( + "spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString) + // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor // clones the Configuration object passed in. If we initialize the TaskAttemptContext first, // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext. @@ -305,7 +316,7 @@ private[sql] abstract class BaseWriterContainer( outputCommitter.setupJob(jobContext) } - def executorSideSetup(taskContext: TaskContext): Unit = { + def executorSideSetup(taskContext:TaskContext): Unit = { setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber()) setupConf() taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 7f04eae7036fc..a5114c8c3f8ec 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -105,8 +105,9 @@ private[orc] class OrcOutputWriter( recordWriterInstantiated = true val conf = context.getConfiguration + val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID") val partition = context.getTaskAttemptID.getTaskID.getId - val filename = f"part-r-$partition%05d-${UUID.randomUUID()}.orc" + val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc" new OrcOutputFormat().getRecordWriter( new Path(path, filename).getFileSystem(conf), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index b08490e72f45b..e1033b319426d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -476,7 +476,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { // more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this // requirement. We probably want to move this test case to spark-integration-tests or spark-perf // later. - test("SPARK-8406") { + test("SPARK-8406: Avoids name collision while writing Parquet files") { withTempPath { dir => val path = dir.getCanonicalPath sqlContext From 8966bbb2a6c9bacadb7459559caff68b060484da Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 19 Jun 2015 03:07:17 -0700 Subject: [PATCH 04/11] Fixes Scala style issue --- .../src/main/scala/org/apache/spark/sql/sources/commands.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index f1949482adffb..38b469ed17d9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -316,7 +316,7 @@ private[sql] abstract class BaseWriterContainer( outputCommitter.setupJob(jobContext) } - def executorSideSetup(taskContext:TaskContext): Unit = { + def executorSideSetup(taskContext: TaskContext): Unit = { setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber()) setupConf() taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) From 1d7d206382cc1ebabfeb89da62766ab1f913f38d Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 19 Jun 2015 18:13:09 -0700 Subject: [PATCH 05/11] Adds more logs --- .../org/apache/spark/sql/hive/orc/OrcFileOperator.scala | 9 +++++---- .../org/apache/spark/sql/hive/orc/OrcRelation.scala | 4 ++-- .../apache/spark/sql/sources/SimpleTextRelation.scala | 1 + 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index 1e51173a19882..e3ab9442b4821 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -27,13 +27,13 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.hive.HiveMetastoreTypes import org.apache.spark.sql.types.StructType -private[orc] object OrcFileOperator extends Logging{ +private[orc] object OrcFileOperator extends Logging { def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = { val conf = config.getOrElse(new Configuration) val fspath = new Path(pathStr) val fs = fspath.getFileSystem(conf) val orcFiles = listOrcFiles(pathStr, conf) - + logDebug(s"Creating ORC Reader from ${orcFiles.head}") // TODO Need to consider all files when schema evolution is taken into account. OrcFile.createReader(fs, orcFiles.head) } @@ -42,6 +42,7 @@ private[orc] object OrcFileOperator extends Logging{ val reader = getFileReader(path, conf) val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector] val schema = readerInspector.getTypeName + logDebug(s"Reading schema from file $path, got Hive schema string: $schema") HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType] } @@ -52,14 +53,14 @@ private[orc] object OrcFileOperator extends Logging{ def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = { val origPath = new Path(pathStr) val fs = origPath.getFileSystem(conf) - val path = origPath.makeQualified(fs) + val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory) val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath) .filterNot(_.isDir) .map(_.getPath) .filterNot(_.getName.startsWith("_")) .filterNot(_.getName.startsWith(".")) - if (paths == null || paths.size == 0) { + if (paths == null || paths.isEmpty) { throw new IllegalArgumentException( s"orcFileOperator: path $path does not have valid orc files matching the pattern") } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index a5114c8c3f8ec..705f48f1cd9f0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.orc -import java.util.{Properties, UUID} +import java.util.Properties import com.google.common.base.Objects import org.apache.hadoop.conf.Configuration @@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, Reco import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.{HadoopRDD, RDD} @@ -39,7 +40,6 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType import org.apache.spark.sql.sources.{Filter, _} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext} -import org.apache.spark.{Logging} import org.apache.spark.util.SerializableConfiguration /* Implicit conversions */ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 0f959b3d0b86d..c783634d864b5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -156,6 +156,7 @@ class CommitFailureTestRelation( context: TaskAttemptContext): OutputWriter = { new SimpleTextOutputWriter(path, context) { override def close(): Unit = { + super.close() sys.error("Intentional task commitment failure for testing purpose.") } } From 4088226ad5acb715f3eafbe440b885c435310b47 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 20 Jun 2015 01:25:58 -0700 Subject: [PATCH 06/11] Works around SPARK-8501 --- .../apache/spark/sql/hive/test/TestHive.scala | 2 +- .../spark/sql/hive/orc/OrcSourceSuite.scala | 22 +++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 1452045faa9f9..ea325cc93cb85 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -49,7 +49,7 @@ import scala.collection.JavaConversions._ object TestHive extends TestHiveContext( new SparkContext( - System.getProperty("spark.sql.test.master", "local[*]"), + System.getProperty("spark.sql.test.master", "local[32]"), "TestSQLContext", new SparkConf() .set("spark.sql.test", "") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 82e08caf46457..433ea9b8534ef 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -44,7 +44,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { import org.apache.spark.sql.hive.test.TestHive.implicits._ sparkContext - .makeRDD(1 to 10) + .makeRDD(1 to 100) .map(i => OrcData(i, s"part-$i")) .toDF() .registerTempTable(s"orc_temp_table") @@ -70,35 +70,35 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { } test("create temporary orc table") { - checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10)) + checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100)) checkAnswer( sql("SELECT * FROM normal_orc_source"), - (1 to 10).map(i => Row(i, s"part-$i"))) + (1 to 100).map(i => Row(i, s"part-$i"))) checkAnswer( sql("SELECT * FROM normal_orc_source where intField > 5"), - (6 to 10).map(i => Row(i, s"part-$i"))) + (6 to 100).map(i => Row(i, s"part-$i"))) checkAnswer( sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"), - (1 to 10).map(i => Row(1, s"part-$i"))) + (1 to 100).map(i => Row(1, s"part-$i"))) } test("create temporary orc table as") { - checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10)) + checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100)) checkAnswer( sql("SELECT * FROM normal_orc_source"), - (1 to 10).map(i => Row(i, s"part-$i"))) + (1 to 100).map(i => Row(i, s"part-$i"))) checkAnswer( sql("SELECT * FROM normal_orc_source WHERE intField > 5"), - (6 to 10).map(i => Row(i, s"part-$i"))) + (6 to 100).map(i => Row(i, s"part-$i"))) checkAnswer( sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"), - (1 to 10).map(i => Row(1, s"part-$i"))) + (1 to 100).map(i => Row(1, s"part-$i"))) } test("appending insert") { @@ -106,7 +106,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { checkAnswer( sql("SELECT * FROM normal_orc_source"), - (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i => + (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i => Seq.fill(2)(Row(i, s"part-$i")) }) } @@ -119,7 +119,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { checkAnswer( sql("SELECT * FROM normal_orc_as_source"), - (6 to 10).map(i => Row(i, s"part-$i"))) + (6 to 100).map(i => Row(i, s"part-$i"))) } } From 99a5e7e6d3e06d1c2a3cb05df5d5d3e6f00c2845 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 21 Jun 2015 15:17:32 -0700 Subject: [PATCH 07/11] Uses job level UUID in SimpleTextRelation and avoids double task abortion --- .../apache/spark/sql/sources/commands.scala | 21 +++++++++++-------- .../sql/sources/SimpleTextRelation.scala | 3 ++- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 38b469ed17d9e..da4afd4122194 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -428,15 +428,14 @@ private[sql] class DefaultWriterContainer( assert(writer != null, "OutputWriter instance should have been initialized") writer.close() super.commitTask() - } catch { - case cause: Throwable => - super.abortTask() - throw new RuntimeException("Failed to commit task", cause) + } catch { case cause: Throwable => + throw new RuntimeException("Failed to commit task", cause) } } override def abortTask(): Unit = { try { + // It's possible that the task fails before `writer` gets initialized if (writer != null) { writer.close() } @@ -480,21 +479,25 @@ private[sql] class DynamicPartitionWriterContainer( }) } - override def commitTask(): Unit = { - try { + private def clearOutputWriters(): Unit = { + if (outputWriters.nonEmpty) { outputWriters.values.foreach(_.close()) outputWriters.clear() + } + } + + override def commitTask(): Unit = { + try { + clearOutputWriters() super.commitTask() } catch { case cause: Throwable => - super.abortTask() throw new RuntimeException("Failed to commit task", cause) } } override def abortTask(): Unit = { try { - outputWriters.values.foreach(_.close()) - outputWriters.clear() + clearOutputWriters() } finally { super.abortTask() } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index c783634d864b5..5d7cd16c129cd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -53,9 +53,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW numberFormat.setGroupingUsed(false) override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID") val split = context.getTaskAttemptID.getTaskID.getId val name = FileOutputFormat.getOutputName(context) - new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}") + new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId") } } From 088c76c0c68abd892ffd52eca79485d1632607a2 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 21 Jun 2015 15:51:36 -0700 Subject: [PATCH 08/11] Adds comment about SPARK-8501 --- .../org/apache/spark/sql/hive/orc/OrcSourceSuite.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 433ea9b8534ef..a0cdd0db42d65 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -43,6 +43,12 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { orcTableDir.mkdir() import org.apache.spark.sql.hive.test.TestHive.implicits._ + // Originally we were using a 10-row RDD for testing. However, when default parallelism is + // greater than 10 (e.g., running on a node with 32 cores), this RDD contains empty partitions, + // which result in empty ORC files. Unfortunately, ORC doesn't handle empty files properly and + // causes build failure on Jenkins, which happens to have 32 cores. Please refer to SPARK-8501 + // for more details. To workaround this issue before fixing SPARK-8501, we simply increase row + // number in this RDD to avoid empty partitions. sparkContext .makeRDD(1 to 100) .map(i => OrcData(i, s"part-$i")) From 85c478edc9d3a84a2657658f8249022c966921df Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 21 Jun 2015 18:41:03 -0700 Subject: [PATCH 09/11] Workarounds SPARK-8513 --- .../apache/spark/sql/sources/hadoopFsRelationSuites.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index e1033b319426d..e0d8277a8ed3f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -529,15 +529,17 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { } class CommitFailureTestRelationSuite extends SparkFunSuite with SQLTestUtils { - import TestHive.implicits._ - override val sqlContext = TestHive + // When committing a task, `CommitFailureTestSource` throws an exception for testing purpose. val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName test("SPARK-7684: commitTask() failure should fallback to abortTask()") { withTempPath { file => - val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b") + // Here we coalesce partition number to 1 to ensure that only a single task is issued. This + // prevents race condition happened when FileOutputCommitter tries to remove the `_temporary` + // directory while committing/aborting the job. See SPARK-8513 for more details. + val df = sqlContext.range(0, 10).coalesce(1) intercept[SparkException] { df.write.format(dataSourceName).save(file.getCanonicalPath) } From f5c1133b7604671b2238008809b70e34debba406 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 21 Jun 2015 23:23:27 -0700 Subject: [PATCH 10/11] Addresses comments --- .../org/apache/spark/sql/sources/commands.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index da4afd4122194..3a4d04ed793b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -21,11 +21,9 @@ import java.util.{Date, UUID} import scala.collection.mutable -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat} -import org.apache.parquet.hadoop.util.ContextUtil import org.apache.spark._ import org.apache.spark.mapred.SparkHadoopMapRedUtil @@ -71,7 +69,7 @@ private[sql] case class InsertIntoHadoopFsRelation( relation.paths.length == 1, s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}") - val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + val hadoopConf = sqlContext.sparkContext.hadoopConfiguration val outputPath = new Path(relation.paths.head) val fs = outputPath.getFileSystem(hadoopConf) val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) @@ -262,7 +260,7 @@ private[sql] abstract class BaseWriterContainer( with Logging with Serializable { - protected val serializableConf = new SerializableConfiguration(ContextUtil.getConfiguration(job)) + protected val serializableConf = new SerializableConfiguration(job.getConfiguration) // This UUID is used to avoid output file name collision between different appending write jobs. // These jobs may belong to different SparkContext instances. Concrete data source implementations @@ -298,8 +296,10 @@ private[sql] abstract class BaseWriterContainer( setupIDs(0, 0, 0) setupConf() - ContextUtil.getConfiguration(job).set( - "spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString) + // This UUID is sent to executor side together with the serialized `Configuration` object within + // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate + // unique task output files. + job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString) // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor // clones the Configuration object passed in. If we initialize the TaskAttemptContext first, From db7a46a169d1789ff221d7f84315edc3df04511a Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 22 Jun 2015 00:45:39 -0700 Subject: [PATCH 11/11] More comments --- .../apache/spark/sql/sources/commands.scala | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 3a4d04ed793b6..215e53c020849 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -58,6 +58,28 @@ private[sql] case class InsertIntoDataSource( } } +/** + * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending. + * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a + * single write job, and owns a UUID that identifies this job. Each concrete implementation of + * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for + * each task output file. This UUID is passed to executor side via a property named + * `spark.sql.sources.writeJobUUID`. + * + * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]] + * are used to write to normal tables and tables with dynamic partitions. + * + * Basic work flow of this command is: + * + * 1. Driver side setup, including output committer initialization and data source specific + * preparation work for the write job to be issued. + * 2. Issues a write job consists of one or more executor side tasks, each of which writes all + * rows within an RDD partition. + * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any + * exception is thrown during task commitment, also aborts that task. + * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is + * thrown during job commitment, also aborts the job. + */ private[sql] case class InsertIntoHadoopFsRelation( @transient relation: HadoopFsRelation, @transient query: LogicalPlan, @@ -429,6 +451,8 @@ private[sql] class DefaultWriterContainer( writer.close() super.commitTask() } catch { case cause: Throwable => + // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will + // cause `abortTask()` to be invoked. throw new RuntimeException("Failed to commit task", cause) } }