From 9dc30067086f23e18bb495542d65549dfd1adb4b Mon Sep 17 00:00:00 2001
From: duripeng
Date: Fri, 20 Dec 2019 22:50:07 +0800
Subject: [PATCH 1/2] [SPARK-30320][SQL] Fix insert overwrite to DataSource table with dynamic partition error when running multiple task attempts

---
 .../scala/org/apache/spark/SparkContext.scala |  9 +--
 .../spark/internal/config/package.scala       |  6 ++
 .../io/HadoopMapReduceCommitProtocol.scala    | 45 ++++++-----
 .../InsertWithMultipleTaskAttemptSuite.scala  | 78 +++++++++++++++++++
 4 files changed, 113 insertions(+), 25 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/InsertWithMultipleTaskAttemptSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 94a0ce78c9826..9ef5b1681f0eb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2768,9 +2768,6 @@ object SparkContext extends Logging {
       deployMode: String): (SchedulerBackend, TaskScheduler) = {
     import SparkMasterRegex._
 
-    // When running locally, don't try to re-execute tasks on failure.
-    val MAX_LOCAL_TASK_FAILURES = 1
-
     // Ensure that executor's resources satisfies one or more tasks requirement.
     def checkResourcesPerTask(clusterMode: Boolean, executorCores: Option[Int]): Unit = {
       val taskCores = sc.conf.get(CPUS_PER_TASK)
@@ -2851,7 +2848,8 @@
     master match {
       case "local" =>
         checkResourcesPerTask(clusterMode = false, Some(1))
-        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val scheduler =
+          new TaskSchedulerImpl(sc, sc.conf.get(MAX_LOCAL_TASK_FAILURES), isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
         scheduler.initialize(backend)
         (backend, scheduler)
@@ -2864,7 +2862,8 @@
           throw new SparkException(s"Asked to run locally with $threadCount threads")
         }
         checkResourcesPerTask(clusterMode = false, Some(threadCount))
-        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val scheduler =
+          new TaskSchedulerImpl(sc, sc.conf.get(MAX_LOCAL_TASK_FAILURES), isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
         (backend, scheduler)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2078965e88246..67102ea0c6f4b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1521,4 +1521,10 @@ package object config {
     .bytesConf(ByteUnit.BYTE)
     .createOptional
 
+  private[spark] val MAX_LOCAL_TASK_FAILURES = ConfigBuilder("spark.max.local.task.failures")
+    .doc("The max failure time for a task while SparkContext running in Local mode, " +
+      "sometimes we need to ignore non-fatal task failures, or to test some cases " +
+      "when speculation or task re-execution is enabled.")
+    .intConf
+    .createWithDefault(1)
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 11ce608f52ee2..d3d38f8212f02 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -110,7 +110,9 @@ class HadoopMapReduceCommitProtocol(
         assert(dir.isDefined,
           "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
         partitionPaths += dir.get
-        this.stagingDir
+        // For a speculative or re-executed task, we need to assign a different
+        // staging dir to each task attempt.
+        new Path(this.stagingDir, taskContext.getTaskAttemptID.toString)
       // For FileOutputCommitter it has its own staging path called "work path".
       case f: FileOutputCommitter =>
         new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
@@ -167,8 +169,8 @@ class HadoopMapReduceCommitProtocol(
     committer.commitJob(jobContext)
 
     if (hasValidPath) {
-      val (allAbsPathFiles, allPartitionPaths) =
-        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
+      val (allAbsPathFiles, allPartitionPaths, successAttemptIDs) =
+        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String], String)]).unzip3
       val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
 
       val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
@@ -183,22 +185,25 @@ class HadoopMapReduceCommitProtocol(
       }
 
       if (dynamicPartitionOverwrite) {
-        val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
-        logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
-        for (part <- partitionPaths) {
-          val finalPartPath = new Path(path, part)
-          if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
-            // According to the official hadoop FileSystem API spec, delete op should assume
-            // the destination is no longer present regardless of return value, thus we do not
-            // need to double check if finalPartPath exists before rename.
-            // Also in our case, based on the spec, delete returns false only when finalPartPath
-            // does not exist. When this happens, we need to take action if parent of finalPartPath
-            // also does not exist(e.g. the scenario described on SPARK-23815), because
-            // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
-            // a parent that exists, otherwise we may get unexpected result on the rename.
-            fs.mkdirs(finalPartPath.getParent)
-          }
-          fs.rename(new Path(stagingDir, part), finalPartPath)
+        allPartitionPaths.zip(successAttemptIDs).foreach {
+          case (allPartitionPath, successAttempID) =>
+            allPartitionPath.foreach(part => {
+              val finalPartPath = new Path(path, part)
+              if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+                // According to the official hadoop FileSystem API spec, delete op should assume
+                // the destination is no longer present regardless of return value, thus we do not
+                // need to double check if finalPartPath exists before rename.
+                // Also in our case, based on the spec, delete returns false only when finalPartPath
+                // does not exist. When this happens, we need to take action if parent of
+                // finalPartPath also does not exist(e.g. the scenario described on SPARK-23815),
+                // because FileSystem API spec on rename op says the rename dest(finalPartPath)
+                // must have a parent that exists, otherwise we may get unexpected result
+                // on the rename.
+                fs.mkdirs(finalPartPath.getParent)
+              }
+              fs.rename(new Path(stagingDir, s"$successAttempID/$part"), finalPartPath)
+            })
+          case _ =>
         }
       }
 
@@ -243,7 +248,7 @@ class HadoopMapReduceCommitProtocol(
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
-    new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
+    new TaskCommitMessage((addedAbsPathFiles.toMap, partitionPaths.toSet, attemptId.toString))
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertWithMultipleTaskAttemptSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertWithMultipleTaskAttemptSuite.scala
new file mode 100644
index 0000000000000..8734636c061ad
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertWithMultipleTaskAttemptSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
+import org.apache.spark.sql.test.SharedSparkSession
+
+class InsertWithMultipleTaskAttemptSuite extends QueryTest with SharedSparkSession {
+  override def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+    conf.set(config.MAX_LOCAL_TASK_FAILURES, 3)
+  }
+
+  test("it is allowed to insert into a table for dynamic partition overwrite " +
+    "while speculation on") {
+    withSQLConf(
+      SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> classOf[InsertExceptionCommitProtocol].getName) {
+      withTable("insertTable") {
+        sql(
+          """
+            |CREATE TABLE insertTable(i int, part1 int, part2 int) USING PARQUET
+            |PARTITIONED BY (part1, part2)
+          """.stripMargin)
+
+        sql(
+          """
+            |INSERT OVERWRITE TABLE insertTable Partition(part1=1, part2)
+            |SELECT 1,2
+          """.stripMargin)
+        checkAnswer(spark.table("insertTable"), Row(1, 1, 2))
+      }
+    }
+  }
+}
+
+class InsertExceptionCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean = false)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
+  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage =
+    if (InsertExceptionCommitProtocol.inited) {
+      throw new Exception("test")
+    } else {
+      super.commitTask(taskContext)
+    }
+}
+
+object InsertExceptionCommitProtocol {
+  private val initedFlag = new AtomicBoolean(false)
+  def inited: Boolean = initedFlag.compareAndSet(false, true)
+}
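
Editor's note (not part of the patch series): a minimal, self-contained sketch of the staging
layout that [PATCH 1/2] introduces. The paths, job id and attempt id below are made up for
illustration; only org.apache.hadoop.fs.Path is used.

import org.apache.hadoop.fs.Path

object StagingLayoutSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical values, for illustration only.
    val stagingDir = new Path("/warehouse/t/.spark-staging-8d9e732a")
    val attemptId = "attempt_20191220225007_0010_m_000001_3"
    val partition = "part1=1/part2=2"

    // Before the patch, every attempt of the same task staged its dynamic-partition output
    // under stagingDir/<partition>, so a speculative or re-executed attempt could collide
    // with files left behind by an earlier attempt.
    val sharedStaging = new Path(stagingDir, partition)

    // With the patch, each attempt writes under its own attempt-id subdirectory, and
    // commitJob later renames only the committed attempt's directory into the final path.
    val perAttemptStaging = new Path(new Path(stagingDir, attemptId), partition)
    val finalPartPath = new Path("/warehouse/t", partition)

    println(s"shared staging (before): $sharedStaging")
    println(s"per-attempt staging (after): $perAttemptStaging")
    println(s"rename target: $finalPartPath")
  }
}
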
From 84ffc72ea77e4814b7118488f177317d6921896b Mon Sep 17 00:00:00 2001
From: duripeng
Date: Mon, 13 Jan 2020 19:41:51 +0800
Subject: [PATCH 2/2] Rename the local max task failures config and introduce an implicit conversion

---
 .../spark/internal/config/package.scala      |  4 ++--
 .../io/HadoopMapReduceCommitProtocol.scala   | 21 ++++++++++++-------
 2 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 67102ea0c6f4b..8ec445de6e38f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1521,8 +1521,8 @@ package object config {
     .bytesConf(ByteUnit.BYTE)
     .createOptional
 
-  private[spark] val MAX_LOCAL_TASK_FAILURES = ConfigBuilder("spark.max.local.task.failures")
-    .doc("The max failure time for a task while SparkContext running in Local mode, " +
+  private[spark] val MAX_LOCAL_TASK_FAILURES = ConfigBuilder("spark.task.local.maxFailures")
+    .doc("The max number of failures allowed for a task while SparkContext is in local mode; " +
      "sometimes we need to ignore non-fatal task failures, or to test some cases " +
      "when speculation or task re-execution is enabled.")
     .intConf
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index d3d38f8212f02..65041b13ae4b8 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 import java.util.{Date, UUID}
 
 import scala.collection.mutable
+import scala.language.implicitConversions
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configurable
@@ -167,10 +168,11 @@ class HadoopMapReduceCommitProtocol(
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     committer.commitJob(jobContext)
 
-    if (hasValidPath) {
-      val (allAbsPathFiles, allPartitionPaths, successAttemptIDs) =
-        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String], String)]).unzip3
+    implicit def asPair(x: (Map[String, String], Set[String], String))
+      : (Map[String, String], (Set[String], String)) = (x._1, (x._2, x._3))
+    val (allAbsPathFiles, partitionPathsAttemptIDPair) =
+      taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String], String)]).unzip
       val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
 
       val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
@@ -185,8 +187,8 @@ class HadoopMapReduceCommitProtocol(
       }
 
       if (dynamicPartitionOverwrite) {
-        allPartitionPaths.zip(successAttemptIDs).foreach {
-          case (allPartitionPath, successAttempID) =>
+        val allPartitionPaths = partitionPathsAttemptIDPair.map {
+          case (allPartitionPath, successAttemptID) =>
             allPartitionPath.foreach(part => {
               val finalPartPath = new Path(path, part)
               if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
@@ -201,9 +203,14 @@
                 // on the rename.
                 fs.mkdirs(finalPartPath.getParent)
               }
-              fs.rename(new Path(stagingDir, s"$successAttempID/$part"), finalPartPath)
+              fs.rename(new Path(s"$stagingDir/$successAttemptID", part), finalPartPath)
             })
-          case _ =>
+            allPartitionPath
+          case _ => Set.empty
+        }
+        if (log.isDebugEnabled) {
+          val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
+          logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
         }
       }
 
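
Editor's note (not part of the patch series): [PATCH 2/2] replaces unzip3 with unzip plus an
implicit conversion, so the task-commit payload splits into the absolute-path file map and a
(partitionPaths, attemptId) pair. A minimal sketch of that trick, with made-up commit data:

import scala.language.implicitConversions

object UnzipWithImplicitSketch {
  // Same shape as the TaskCommitMessage payload in the patch:
  // (absolute-path files to move, partition paths, task attempt id).
  type Commit = (Map[String, String], Set[String], String)

  // Seq#unzip needs an implicit A => (A1, A2); this groups the last two elements together.
  implicit def asPair(x: Commit): (Map[String, String], (Set[String], String)) =
    (x._1, (x._2, x._3))

  def main(args: Array[String]): Unit = {
    val taskCommits: Seq[Commit] = Seq(
      (Map("/tmp/a" -> "/final/a"), Set("part1=1/part2=2"), "attempt_0"),
      (Map.empty[String, String], Set("part1=1/part2=3"), "attempt_1"))

    // The implicit conversion is picked up here, so no unzip3 is needed.
    val (allAbsPathFiles, partitionPathsWithAttemptId) = taskCommits.unzip

    println(allAbsPathFiles)              // List(Map(/tmp/a -> /final/a), Map())
    println(partitionPathsWithAttemptId)  // List((Set(part1=1/part2=2),attempt_0), (Set(part1=1/part2=3),attempt_1))
  }
}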