From 54b09c9f9c650d04477dffe1c41892717375bd50 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Tue, 6 Feb 2018 23:45:32 -0800
Subject: [PATCH 01/13] SPARK-23271 Parquet output contains only _SUCCESS file after writing an empty dataframe

---
 .../datasources/FileFormatWriter.scala        | 10 +++++++---
 .../datasources/FileFormatWriterSuite.scala   | 19 +++++++++++++++++++
 .../sql/test/DataFrameReaderWriterSuite.scala |  1 -
 3 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 1d80a69bc5a1d..66abf8f81c5d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -190,9 +190,13 @@ object FileFormatWriter extends Logging {
           global = false,
           child = plan).execute()
       }
-      val ret = new Array[WriteTaskResult](rdd.partitions.length)
+
+      // SPARK-23271 If we are attempting to write a zero partition rdd, change the number of
+      // partition to 1 to make sure we at least set up one write task to write the metadata.
+      val finalRdd = if (rdd.partitions.length == 0) rdd.repartition(1) else rdd
+      val ret = new Array[WriteTaskResult](finalRdd.partitions.length)
       sparkSession.sparkContext.runJob(
-        rdd,
+        finalRdd,
         (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
           executeTask(
             description = description,
@@ -202,7 +206,7 @@ object FileFormatWriter extends Logging {
             committer,
             iterator = iter)
         },
-        0 until rdd.partitions.length,
+        0 until finalRdd.partitions.length,
         (index, res: WriteTaskResult) => {
           committer.onTaskCommit(res.commitMsg)
           ret(index) = res
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index 13f0e0bca86c7..3f011636fc2c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -32,6 +33,24 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-23271 empty dataframe when saved in parquet should write a metadata only file") {
+    withTempDir { inputPath =>
+      withTempPath { outputPath =>
+        val anySchema = StructType(StructField("anyName", StringType) :: Nil)
+        val df = spark.read.schema(anySchema).csv(inputPath.toString)
+        df.write.parquet(outputPath.toString)
+        val partFiles = outputPath.listFiles()
+          .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+        assert(partFiles.length === 1)
+
+        // Now read the file.
+          val df1 = spark.read.parquet(outputPath.toString)
+        checkAnswer(df1, Seq.empty[Row])
+        assert(df1.schema.equals(anySchema))
+      }
+    }
+  }
+
   test("SPARK-22252: FileFormatWriter should respect the input query schema") {
     withTable("t1", "t2", "t3", "t4") {
       spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 8c9bb7d56a35f..a707a88dfa670 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -301,7 +301,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       intercept[AnalysisException] {
         spark.range(10).write.format("csv").mode("overwrite").partitionBy("id").save(path)
       }
-      spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path)
     }
   }
 

From c80afc45ba5761a64b5690d63e263a6f67990228 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Tue, 6 Feb 2018 23:58:42 -0800
Subject: [PATCH 02/13] review comment

---
 .../spark/sql/execution/datasources/FileFormatWriterSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index 3f011636fc2c6..c9c0fc75a576f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -44,7 +44,7 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
         assert(partFiles.length === 1)
 
         // Now read the file.
-          val df1 = spark.read.parquet(outputPath.toString)
+        val df1 = spark.read.parquet(outputPath.toString)
         checkAnswer(df1, Seq.empty[Row])
         assert(df1.schema.equals(anySchema))
       }

From 804d159177bb7af66e09d2ffb13e00cede93ede0 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Wed, 7 Feb 2018 17:00:12 -0800
Subject: [PATCH 03/13] Review comments

---
 .../spark/sql/FileBasedDataSourceSuite.scala  | 21 +++++++++++++++++++
 .../datasources/FileFormatWriterSuite.scala   | 18 ----------------
 2 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 73e3df3b6202e..a24f2acb88e4d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll {
@@ -89,6 +90,26 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     }
   }
 
+  Seq("orc", "parquet").foreach { format =>
+    test(s"SPARK-23271 empty RDD when saved should write a metadata only file - $format") {
+      withTempDir { inputPath =>
+        withTempPath { outputPath =>
+          val anySchema = StructType(StructField("anyName", StringType) :: Nil)
+          val df = spark.read.schema(anySchema).csv(inputPath.toString)
+          df.write.format(format).save(outputPath.toString)
+          val partFiles = outputPath.listFiles()
+            .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+          assert(partFiles.length === 1)
+
+          // Now read the file.
+          val df1 = spark.read.format(format).load(outputPath.toString)
+          checkAnswer(df1, Seq.empty[Row])
+          assert(df1.schema.equals(anySchema))
+        }
+      }
+    }
+  }
+
   allFileBasedDataSources.foreach { format =>
     test(s"SPARK-22146 read files containing special characters using $format") {
       withTempDir { dir =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index c9c0fc75a576f..98c9116f77e1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -33,24 +33,6 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-23271 empty dataframe when saved in parquet should write a metadata only file") {
-    withTempDir { inputPath =>
-      withTempPath { outputPath =>
-        val anySchema = StructType(StructField("anyName", StringType) :: Nil)
-        val df = spark.read.schema(anySchema).csv(inputPath.toString)
-        df.write.parquet(outputPath.toString)
-        val partFiles = outputPath.listFiles()
-          .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
-        assert(partFiles.length === 1)
-
-        // Now read the file.
-        val df1 = spark.read.parquet(outputPath.toString)
-        checkAnswer(df1, Seq.empty[Row])
-        assert(df1.schema.equals(anySchema))
-      }
-    }
-  }
-
   test("SPARK-22252: FileFormatWriter should respect the input query schema") {
     withTable("t1", "t2", "t3", "t4") {
       spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")

From d79b05e412c4b730761d6671ce462f7e31397d53 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Wed, 7 Feb 2018 22:10:37 -0800
Subject: [PATCH 04/13] Create a dummy single partition RDD to pass to write task

---
 .../sql/execution/datasources/FileFormatWriter.scala | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 66abf8f81c5d8..b60eacb6d721c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -191,9 +191,14 @@ object FileFormatWriter extends Logging {
           child = plan).execute()
       }
 
-      // SPARK-23271 If we are attempting to write a zero partition rdd, change the number of
-      // partition to 1 to make sure we at least set up one write task to write the metadata.
-      val finalRdd = if (rdd.partitions.length == 0) rdd.repartition(1) else rdd
+      // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
+      // partition rdd to make sure we at least set up one write task to write the metadata.
+      val finalRdd = if (rdd.partitions.length == 0) {
+        sparkSession.sparkContext.parallelize(Array.empty[InternalRow])
+      } else {
+        rdd
+      }
+
       val ret = new Array[WriteTaskResult](finalRdd.partitions.length)
       sparkSession.sparkContext.runJob(
         finalRdd,

From 20851c766dca6ba8811ecbe026f251b003346465 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Wed, 7 Feb 2018 23:20:12 -0800
Subject: [PATCH 05/13] Code review

---
 .../datasources/FileFormatWriter.scala        |  2 +-
 .../spark/sql/FileBasedDataSourceSuite.scala  | 27 +++++++++----------
 2 files changed, 14 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index b60eacb6d721c..b943cd9e7450d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -194,7 +194,7 @@ object FileFormatWriter extends Logging {
       // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
      // partition rdd to make sure we at least set up one write task to write the metadata.
       val finalRdd = if (rdd.partitions.length == 0) {
-        sparkSession.sparkContext.parallelize(Array.empty[InternalRow])
+        sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1)
       } else {
         rdd
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index a24f2acb88e4d..d433b9748b968 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -92,22 +93,20 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   Seq("orc", "parquet").foreach { format =>
     test(s"SPARK-23271 empty RDD when saved should write a metadata only file - $format") {
-      withTempDir { inputPath =>
-        withTempPath { outputPath =>
-          val anySchema = StructType(StructField("anyName", StringType) :: Nil)
-          val df = spark.read.schema(anySchema).csv(inputPath.toString)
-          df.write.format(format).save(outputPath.toString)
-          val partFiles = outputPath.listFiles()
-            .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
-          assert(partFiles.length === 1)
-
-          // Now read the file.
-          val df1 = spark.read.format(format).load(outputPath.toString)
-          checkAnswer(df1, Seq.empty[Row])
-          assert(df1.schema.equals(anySchema))
-        }
+      withTempPath { outputPath =>
+        val df = spark.emptyDataFrame.select(lit(1).as("i"))
+        df.write.format(format).save(outputPath.toString)
+        val partFiles = outputPath.listFiles()
+          .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+        assert(partFiles.length === 1)
+
+        // Now read the file.
+        val df1 = spark.read.format(format).load(outputPath.toString)
+        checkAnswer(df1, Seq.empty[Row])
+        assert(df1.schema.equals(df.schema.asNullable))
+      }
     }
   }

From 1c98244d78505c5f4384c679ae30368e0308cff4 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Wed, 7 Feb 2018 23:21:36 -0800
Subject: [PATCH 06/13] Code review

---
 .../scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index d433b9748b968..5f583facbf043 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -106,7 +106,6 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
         assert(df1.schema.equals(df.schema.asNullable))
       }
     }
-  }
 
   allFileBasedDataSources.foreach { format =>

From e09c12a972d230c19a11ea46e2d6f769df2dbe4d Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Wed, 7 Feb 2018 23:40:00 -0800
Subject: [PATCH 07/13] Code review

---
 .../sql/execution/datasources/FileFormatWriter.scala    | 8 ++++----
 .../org/apache/spark/sql/FileBasedDataSourceSuite.scala | 1 -
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index b943cd9e7450d..5996b2ba91cdd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -193,15 +193,15 @@ object FileFormatWriter extends Logging {
 
       // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
      // partition rdd to make sure we at least set up one write task to write the metadata.
-      val finalRdd = if (rdd.partitions.length == 0) {
+      val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) {
         sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1)
       } else {
         rdd
       }
 
-      val ret = new Array[WriteTaskResult](finalRdd.partitions.length)
+      val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
       sparkSession.sparkContext.runJob(
-        finalRdd,
+        rddWithNonEmptyPartitions,
         (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
           executeTask(
             description = description,
@@ -211,7 +211,7 @@ object FileFormatWriter extends Logging {
             committer,
             iterator = iter)
         },
-        0 until finalRdd.partitions.length,
+        0 until rddWithNonEmptyPartitions.partitions.length,
         (index, res: WriteTaskResult) => {
           committer.onTaskCommit(res.commitMsg)
           ret(index) = res
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 5f583facbf043..bd3071bcf9010 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll {

From 943ca799ec3f8a95f3833e3e01490d80a84d42f6 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Thu, 8 Feb 2018 10:46:18 -0800
Subject: [PATCH 08/13] Document the empty dataframe write semantics

---
 docs/sql-programming-guide.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 451b814ab6c53..177e047f80b60 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1963,6 +1963,9 @@ working with timestamps in `pandas_udf`s to get the best performance, see
 - The configuration `spark.sql.decimalOperations.allowPrecisionLoss` has been introduced. It defaults to `true`, which means the new behavior described here; if set to `false`, Spark uses previous rules, ie. it doesn't adjust the needed scale to represent the values and it returns NULL if an exact representation of the value is not possible.
 - In PySpark, `df.replace` does not allow to omit `value` when `to_replace` is not a dictionary. Previously, `value` could be omitted in the other cases and had `None` by default, which is counterintuitive and error prone.
+ - Since Spark 2.3, writing an empty dataframe (a dataframe with 0 partitions) in parquet or orc format, creates a format specific metadata only file. In prior versions the metadata only file was not created. As a result, subsequent attempt to read from this directory fails with AnalysisException while inferring schema of the file. For example : df.write.format("parquet").save("outDir")
+followed by df.read.format("parquet").load("outDir") results in AnalysisException in prior versions.
+
 
 ## Upgrading From Spark SQL 2.1 to 2.2
 
 - Spark 2.1.1 introduced a new configuration key: `spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of `NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting's default value to `INFER_AND_SAVE` to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to avoid the initial overhead of schema inference. Note that with the new default `INFER_AND_SAVE` setting, the results of the schema inference are saved as a metastore key for future use. Therefore, the initial schema inference occurs only at a table's first access.

From fcdc7a9e5f442ffdb77ad22755e17c5a7c9e15a1 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Fri, 9 Feb 2018 00:12:08 -0800
Subject: [PATCH 09/13] doc fix

---
 docs/sql-programming-guide.md | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 177e047f80b60..0c648673a27af 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1963,8 +1963,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
 - The configuration `spark.sql.decimalOperations.allowPrecisionLoss` has been introduced. It defaults to `true`, which means the new behavior described here; if set to `false`, Spark uses previous rules, ie. it doesn't adjust the needed scale to represent the values and it returns NULL if an exact representation of the value is not possible.
 - In PySpark, `df.replace` does not allow to omit `value` when `to_replace` is not a dictionary. Previously, `value` could be omitted in the other cases and had `None` by default, which is counterintuitive and error prone.
- - Since Spark 2.3, writing an empty dataframe (a dataframe with 0 partitions) in parquet or orc format, creates a format specific metadata only file. In prior versions the metadata only file was not created. As a result, subsequent attempt to read from this directory fails with AnalysisException while inferring schema of the file. For example : df.write.format("parquet").save("outDir")
-followed by df.read.format("parquet").load("outDir") results in AnalysisException in prior versions.
+ - Since Spark 2.3, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change that for self-describing file formats like Parquet and Orc, Spark creates a metadata-only file in the target directory when writing an empty dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent regarding writing empty dataframe.
 
 ## Upgrading From Spark SQL 2.1 to 2.2
 
 - Spark 2.1.1 introduced a new configuration key: `spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of `NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting's default value to `INFER_AND_SAVE` to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to avoid the initial overhead of schema inference. Note that with the new default `INFER_AND_SAVE` setting, the results of the schema inference are saved as a metastore key for future use. Therefore, the initial schema inference occurs only at a table's first access.

From d00b83627ae79eafacad7758b7f7e5b2a73071ea Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Fri, 9 Feb 2018 00:22:21 -0800
Subject: [PATCH 10/13] doc fix

---
 docs/sql-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 0c648673a27af..7921aabc035a7 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1963,7 +1963,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
 - The configuration `spark.sql.decimalOperations.allowPrecisionLoss` has been introduced. It defaults to `true`, which means the new behavior described here; if set to `false`, Spark uses previous rules, ie. it doesn't adjust the needed scale to represent the values and it returns NULL if an exact representation of the value is not possible.
 - In PySpark, `df.replace` does not allow to omit `value` when `to_replace` is not a dictionary. Previously, `value` could be omitted in the other cases and had `None` by default, which is counterintuitive and error prone.
- - Since Spark 2.3, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change that for self-describing file formats like Parquet and Orc, Spark creates a metadata-only file in the target directory when writing an empty dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent regarding writing empty dataframe.
+ - Since Spark 2.3, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change that for self-describing file formats like Parquet and Orc, Spark creates a metadata-only file in the target directory when writing a 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent regarding writing empty dataframe.
 
 ## Upgrading From Spark SQL 2.1 to 2.2
 
 - Spark 2.1.1 introduced a new configuration key: `spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of `NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting's default value to `INFER_AND_SAVE` to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to avoid the initial overhead of schema inference. Note that with the new default `INFER_AND_SAVE` setting, the results of the schema inference are saved as a metastore key for future use. Therefore, the initial schema inference occurs only at a table's first access.

From bc19030ec8aeb275acdd3add8135bf822f38cdf3 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Fri, 9 Feb 2018 00:29:36 -0800
Subject: [PATCH 11/13] remove un-used import

---
 .../spark/sql/execution/datasources/FileFormatWriterSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index 98c9116f77e1f..13f0e0bca86c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
   import testImplicits._

From ed718665e27535541632612e1503e41203368729 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Wed, 7 Mar 2018 15:13:03 -0800
Subject: [PATCH 12/13] Re-target the fix to 2.4

---
 docs/sql-programming-guide.md | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 7921aabc035a7..d2132d2ae7441 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1805,6 +1805,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
 - Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
 - In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unabled to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`.
+ - Since Spark 2.4, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change that for self-describing file formats like Parquet and Orc, Spark creates a metadata-only file in the target directory when writing a 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent regarding writing empty dataframe.
 
 ## Upgrading From Spark SQL 2.2 to 2.3
 
@@ -1963,8 +1964,6 @@ working with timestamps in `pandas_udf`s to get the best performance, see
 - The configuration `spark.sql.decimalOperations.allowPrecisionLoss` has been introduced. It defaults to `true`, which means the new behavior described here; if set to `false`, Spark uses previous rules, ie. it doesn't adjust the needed scale to represent the values and it returns NULL if an exact representation of the value is not possible.
 - In PySpark, `df.replace` does not allow to omit `value` when `to_replace` is not a dictionary. Previously, `value` could be omitted in the other cases and had `None` by default, which is counterintuitive and error prone.
- - Since Spark 2.3, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change that for self-describing file formats like Parquet and Orc, Spark creates a metadata-only file in the target directory when writing a 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent regarding writing empty dataframe.
-
 ## Upgrading From Spark SQL 2.1 to 2.2
 
 - Spark 2.1.1 introduced a new configuration key: `spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of `NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting's default value to `INFER_AND_SAVE` to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to avoid the initial overhead of schema inference. Note that with the new default `INFER_AND_SAVE` setting, the results of the schema inference are saved as a metastore key for future use. Therefore, the initial schema inference occurs only at a table's first access.

From bc48bbd99809db38501ab5d06df1d7cc33efc30b Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Thu, 8 Mar 2018 00:02:47 -0800
Subject: [PATCH 13/13] review ..

---
 .../spark/sql/execution/datasources/FileFormatWriter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 5996b2ba91cdd..401597f967218 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -211,7 +211,7 @@ object FileFormatWriter extends Logging {
             committer,
             iterator = iter)
         },
-        0 until rddWithNonEmptyPartitions.partitions.length,
+        rddWithNonEmptyPartitions.partitions.indices,
         (index, res: WriteTaskResult) => {
           committer.onTaskCommit(res.commitMsg)
           ret(index) = res