From 304543da3f7f539928e3bc79c13055ce5e64ee2d Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 18 Feb 2019 21:42:08 +0800
Subject: [PATCH 1/2] reenable file source v2

---
 .../apache/spark/sql/internal/SQLConf.scala   |  2 +-
 .../apache/spark/sql/DataFrameWriter.scala    |  7 ++++---
 .../spark/sql/FileBasedDataSourceSuite.scala  | 19 +++++++++++++++++++
 3 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0b7b67ed56d28..d285e007dac1b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1452,7 +1452,7 @@ object SQLConf {
       " register class names for which data source V2 write paths are disabled. Writes from these" +
       " sources will fall back to the V1 sources.")
     .stringConf
-    .createWithDefault("orc")
+    .createWithDefault("")
 
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 450828172b934..6bd4e0fb29a6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable,
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
@@ -266,13 +266,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       provider.getTable(dsOptions) match {
         case table: SupportsBatchWrite =>
           lazy val relation = DataSourceV2Relation.create(table, options)
+          val isFileSource = table.isInstanceOf[FileTable]
           mode match {
-            case SaveMode.Append =>
+            case SaveMode.Append if !isFileSource =>
               runCommand(df.sparkSession, "save") {
                 AppendData.byName(relation, df.logicalPlan)
               }
 
-            case SaveMode.Overwrite =>
+            case SaveMode.Overwrite if !isFileSource =>
               // truncate the table
               runCommand(df.sparkSession, "save") {
                 OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 58522f7b13769..a059113cf354d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -469,6 +469,25 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     }
   }
 
+  test("File data sources V2 supports overwriting with different schema") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> "") {
+      Seq("orc", "parquet", "json").foreach { format =>
+        withTempPath { p =>
+          val path = p.getCanonicalPath
+          spark.range(10).write.format(format).save(path)
+          val newDF = spark.range(20).map(id => (id.toDouble, id.toString)).toDF("double", "string")
+          newDF.write.format(format).mode("overwrite").save(path)
+
+          val readDF = spark.read.format(format).load(path)
+          val expectedSchema = StructType(Seq(
+            StructField("double", DoubleType, true), StructField("string", StringType, true)))
+          assert(readDF.schema == expectedSchema)
+          checkAnswer(readDF, newDF)
+        }
+      }
+    }
+  }
+
   test("SPARK-25237 compute correct input metrics in FileScanRDD") {
     withTempPath { p =>
       val path = p.getAbsolutePath

From 8316c07b6415a65901b5479f8b01f0ec548bfb40 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 18 Feb 2019 21:42:15 +0800
Subject: [PATCH 2/2] Revert "[SPARK-26744][SPARK-26744][SQL][HOTFOX] Disable
 schema validation tests for FileDataSourceV2 (partially revert )"

This reverts commit a0e81fcfe8a6dbb246f8b170b6f5e203ab194d7e.
---
 .../scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index a059113cf354d..b6176a371f98a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -329,7 +329,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") {
     withTempDir { dir =>
       val tempDir = new File(dir, "files").getCanonicalPath
-      Seq(true).foreach { useV1 =>
+      Seq(true, false).foreach { useV1 =>
        val useV1List = if (useV1) {
          "orc"
        } else {
@@ -374,7 +374,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   }
 
   test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
-    Seq(true).foreach { useV1 =>
+    Seq(true, false).foreach { useV1 =>
       val useV1List = if (useV1) {
         "orc"
       } else {
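
Note (not part of the patches above): the following is a minimal, standalone Scala sketch of the behavior the new test exercises once USE_V1_SOURCE_WRITER_LIST defaults to "" -- overwriting a path with a DataFrame whose schema differs from the data already there. The object name, the choice of Parquet, the local SparkSession, and the temp-path handling are illustrative assumptions rather than code from the patch; per the conf doc string changed in SQLConf.scala, setting the list back to "orc" (the old default) would route ORC writes through the V1 path again.

// Hypothetical demo, not part of the patch: exercises overwrite-with-different-schema
// through the file source V2 write path enabled by this change.
import java.io.File
import java.nio.file.Files

import org.apache.spark.sql.SparkSession

object FileSourceV2OverwriteDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("file-source-v2-overwrite-demo")
      .getOrCreate()
    import spark.implicits._

    // Use a path that does not exist yet so the first save (ErrorIfExists) succeeds.
    val path = new File(Files.createTempDirectory("fsv2-demo").toFile, "data").getCanonicalPath

    // First write: a single LongType column named "id".
    spark.range(10).write.format("parquet").save(path)

    // Overwrite the same path with a completely different schema.
    val newDF = spark.range(20).map(id => (id.toDouble, id.toString)).toDF("double", "string")
    newDF.write.format("parquet").mode("overwrite").save(path)

    // The schema read back should come from the second write: double, string.
    spark.read.format("parquet").load(path).printSchema()

    spark.stop()
  }
}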