From 885b721003923f701643ca7d93ef9edddc8a9961 Mon Sep 17 00:00:00 2001
From: Anbu Cheeralan
Date: Mon, 19 Dec 2016 13:12:11 -0500
Subject: [PATCH 1/2] Add flag to skip partition check on append

---
 .../execution/datasources/DataSource.scala    | 37 +++++++++++--------
 .../apache/spark/sql/internal/SQLConf.scala   | 10 +++++
 2 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5245c14a4c965..0969b7bef4d35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -445,21 +445,28 @@ case class DataSource(
     // If we are appending to a table that already exists, make sure the partitioning matches
     // up. If we fail to load the table for whatever reason, ignore the check.
-    if (mode == SaveMode.Append) {
-      val existingPartitionColumns = Try {
-        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
-      }.getOrElse(Seq.empty[String])
-      // TODO: Case sensitivity.
-      val sameColumns =
-        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-      if (existingPartitionColumns.nonEmpty && !sameColumns) {
-        throw new AnalysisException(
-          s"""Requested partitioning does not match existing partitioning.
-            |Existing partitioning columns:
-            |  ${existingPartitionColumns.mkString(", ")}
-            |Requested partitioning columns:
-            |  ${partitionColumns.mkString(", ")}
-            |""".stripMargin)
+
+    // SPARK-18917 Skip Partition Check to skip reading all leaf files
+    val skipPartitionCheckOnAppend = sparkSession.sessionState.conf.skipPartitionCheckOnAppend
+    if (skipPartitionCheckOnAppend) {
+      logInfo("Skipping partition check in append mode")
+    } else {
+      if (mode == SaveMode.Append) {
+        val existingPartitionColumns = Try {
+          getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
+        }.getOrElse(Seq.empty[String])
+        // TODO: Case sensitivity.
+        val sameColumns =
+          existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
+        if (existingPartitionColumns.nonEmpty && !sameColumns) {
+          throw new AnalysisException(
+            s"""Requested partitioning does not match existing partitioning.
+              |Existing partitioning columns:
+              |  ${existingPartitionColumns.mkString(", ")}
+              |Requested partitioning columns:
+              |  ${partitionColumns.mkString(", ")}
+              |""".stripMargin)
+        }
       }
     }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4d25f54caa130..b14096c6c884e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -546,6 +546,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val SKIP_PARTITION_CHECK_ON_APPEND =
+    SQLConfigBuilder("spark.sql.execution.skipPartitionCheckOnAppend")
+      .internal()
+      .doc("Decides whether to skip the partition check in append mode. " +
+        "Enable this when writing to object stores to avoid timeout issues.")
+      .booleanConf
+      .createWithDefault(false)
+
   val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
     .internal()
     .doc("Whether to delete the expired log files in file stream sink.")
@@ -813,6 +821,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)

+  def skipPartitionCheckOnAppend: Boolean = getConf(SKIP_PARTITION_CHECK_ON_APPEND)
+
   def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD)

   def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)

From 43e599eac828fae630d0ac0acd00255ac6c77ae4 Mon Sep 17 00:00:00 2001
From: Anbu Cheeralan
Date: Mon, 19 Dec 2016 13:23:03 -0500
Subject: [PATCH 2/2] Change comment description

---
 .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0969b7bef4d35..512c8f78eab6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -446,7 +446,7 @@ case class DataSource(
     // If we are appending to a table that already exists, make sure the partitioning matches
     // up. If we fail to load the table for whatever reason, ignore the check.

-    // SPARK-18917 Skip Partition Check to skip reading all leaf files
+    // SPARK-18917 Add skip partition check flag to avoid listing all leaf files in append mode
     val skipPartitionCheckOnAppend = sparkSession.sessionState.conf.skipPartitionCheckOnAppend
     if (skipPartitionCheckOnAppend) {
       logInfo("Skipping partition check in append mode")