From 553fea18e9b4db2fe934bdafb8751959da5c2668 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 10 Oct 2018 18:38:55 +0800
Subject: [PATCH 1/5] Creates ReadSupport only in Append mode in Data Source V2 write path

---
 .../apache/spark/sql/DataFrameWriter.scala    |  2 +-
 .../sql/sources/v2/DataSourceV2Suite.scala    | 20 ++++++++++++++-----
 .../sources/v2/SimpleWritableDataSource.scala |  5 ++---
 3 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 188fce72efac5..55e538f49feda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -246,8 +246,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             df.sparkSession.sessionState.conf)
           val options = sessionOptions ++ extraOptions
 
-          val relation = DataSourceV2Relation.create(source, options)
           if (mode == SaveMode.Append) {
+            val relation = DataSourceV2Relation.create(source, options)
             runCommand(df.sparkSession, "save") {
               AppendData.byName(relation, df.logicalPlan)
             }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 7cc8abc9f0428..76e8a87d928aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -190,12 +190,13 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
   test("simple writable data source") {
     // TODO: java implementation.
+    val writeOnlySource = classOf[SimpleWriteOnlyDataSource]
     Seq(classOf[SimpleWritableDataSource]).foreach { cls =>
       withTempPath { file =>
         val path = file.getCanonicalPath
         assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
 
-        spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
+        spark.range(10).select('id as 'i, -'id as 'j).write.format(writeOnlySource.getName)
           .option("path", path).save()
         checkAnswer(
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(10).select('id, -'id))
@@ -208,20 +209,20 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(10).union(spark.range(10)).select('id, -'id))
 
-        spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
+        spark.range(5).select('id as 'i, -'id as 'j).write.format(writeOnlySource.getName)
          .option("path", path).mode("overwrite").save()
         checkAnswer(
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(5).select('id, -'id))
 
-        spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
+        spark.range(5).select('id as 'i, -'id as 'j).write.format(writeOnlySource.getName)
           .option("path", path).mode("ignore").save()
         checkAnswer(
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(5).select('id, -'id))
 
         val e = intercept[Exception] {
-          spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
+          spark.range(5).select('id as 'i, -'id as 'j).write.format(writeOnlySource.getName)
             .option("path", path).mode("error").save()
         }
         assert(e.getMessage.contains("data already exists"))
@@ -240,7 +241,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
         // this input data will fail midway through reading.
         val input = spark.range(10).select(failingUdf('id).as('i)).select('i, -'i as 'j)
         val e2 = intercept[SparkException] {
-          input.write.format(cls.getName).option("path", path).mode("overwrite").save()
+          input.write.format(writeOnlySource.getName).option("path", path).mode("overwrite").save()
         }
         assert(e2.getMessage.contains("Writing job aborted"))
         // make sure we don't have partial data.
@@ -640,3 +641,12 @@ object SpecificReaderFactory extends PartitionReaderFactory {
     }
   }
 }
+
+class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
+  override def fullSchema(): StructType = {
+    // This is a bit hacky: this source implements read support but throws
+    // during schema retrieval. It might need a rewrite, but it is done this
+    // way to keep the changes minimal.
+    throw new UnsupportedOperationException("read is not supported")
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index a0f4404f46140..a7dfc2d1deacc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -43,13 +43,13 @@ class SimpleWritableDataSource extends DataSourceV2
   with BatchWriteSupportProvider
   with SessionConfigSupport {
 
-  private val schema = new StructType().add("i", "long").add("j", "long")
+  protected def fullSchema(): StructType = new StructType().add("i", "long").add("j", "long")
 
   override def keyPrefix: String = "simpleWritableDataSource"
 
   class ReadSupport(path: String, conf: Configuration) extends SimpleReadSupport {
 
-    override def fullSchema(): StructType = schema
+    override def fullSchema(): StructType = SimpleWritableDataSource.this.fullSchema()
 
     override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
       val dataPath = new Path(path)
@@ -116,7 +116,6 @@ class SimpleWritableDataSource extends DataSourceV2
       schema: StructType,
       mode: SaveMode,
       options: DataSourceOptions): Optional[BatchWriteSupport] = {
-    assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable))
     assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false))
 
     val path = new Path(options.get("path").get())

From b28afe2d2d9c7824f06797ab5dcd2263c06acfbf Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 11 Oct 2018 01:16:32 +0800
Subject: [PATCH 2/5] add a separate test

---
 .../sql/sources/v2/DataSourceV2Suite.scala    | 30 ++++++++++++++-----
 1 file changed, 23 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 76e8a87d928aa..6d96ff3d389d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -190,13 +190,12 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
   test("simple writable data source") {
     // TODO: java implementation.
-    val writeOnlySource = classOf[SimpleWriteOnlyDataSource]
     Seq(classOf[SimpleWritableDataSource]).foreach { cls =>
       withTempPath { file =>
         val path = file.getCanonicalPath
         assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
 
-        spark.range(10).select('id as 'i, -'id as 'j).write.format(writeOnlySource.getName)
+        spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
           .option("path", path).save()
         checkAnswer(
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(10).select('id, -'id))
@@ -209,20 +208,20 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(10).union(spark.range(10)).select('id, -'id))
 
-        spark.range(5).select('id as 'i, -'id as 'j).write.format(writeOnlySource.getName)
+        spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
           .option("path", path).mode("overwrite").save()
         checkAnswer(
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(5).select('id, -'id))
 
-        spark.range(5).select('id as 'i, -'id as 'j).write.format(writeOnlySource.getName)
+        spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
           .option("path", path).mode("ignore").save()
         checkAnswer(
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(5).select('id, -'id))
 
         val e = intercept[Exception] {
-          spark.range(5).select('id as 'i, -'id as 'j).write.format(writeOnlySource.getName)
+          spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
             .option("path", path).mode("error").save()
         }
         assert(e.getMessage.contains("data already exists"))
@@ -241,7 +240,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
         // this input data will fail midway through reading.
         val input = spark.range(10).select(failingUdf('id).as('i)).select('i, -'i as 'j)
         val e2 = intercept[SparkException] {
-          input.write.format(writeOnlySource.getName).option("path", path).mode("overwrite").save()
+          input.write.format(cls.getName).option("path", path).mode("overwrite").save()
         }
         assert(e2.getMessage.contains("Writing job aborted"))
         // make sure we don't have partial data.
@@ -352,6 +351,21 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-25700: do not read schema when writing in modes other than append") {
+    withTempPath { file =>
+      val cls = classOf[SimpleWriteOnlyDataSource]
+      val path = file.getCanonicalPath
+      val df = spark.range(5)
+      try {
+        df.write.format(cls.getName).option("path", path).mode("error").save()
+        df.write.format(cls.getName).option("path", path).mode("overwrite").save()
+        df.write.format(cls.getName).option("path", path).mode("ignore").save()
+      } catch {
+        case e: SchemaReadAttemptException => fail("Schema read was attempted.", e)
+      }
+    }
+  }
 }
 
 
@@ -642,11 +656,13 @@ object SpecificReaderFactory extends PartitionReaderFactory {
   }
 }
 
+class SchemaReadAttemptException(m: String) extends RuntimeException(m)
+
 class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
   override def fullSchema(): StructType = {
     // This is a bit hacky: this source implements read support but throws
     // during schema retrieval. It might need a rewrite, but it is done this
     // way to keep the changes minimal.
-    throw new UnsupportedOperationException("read is not supported")
+    throw new SchemaReadAttemptException("read is not supported")
   }
 }

From fa69f9c1660a0e240b0a827d08389dc05fe44612 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 11 Oct 2018 13:06:08 +0800
Subject: [PATCH 3/5] Fix the test

---
 .../org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 6d96ff3d389d5..783d82aac49d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -356,7 +356,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
     withTempPath { file =>
       val cls = classOf[SimpleWriteOnlyDataSource]
       val path = file.getCanonicalPath
-      val df = spark.range(5)
+      val df = spark.range(5).select($"id", $"id")
       try {
         df.write.format(cls.getName).option("path", path).mode("error").save()
         df.write.format(cls.getName).option("path", path).mode("overwrite").save()

From ded852c3f99d9fe904a6b54691ac6c170da9a298 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 11 Oct 2018 13:10:47 +0800
Subject: [PATCH 4/5] nit: use different column name

---
 .../org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 783d82aac49d6..38557264629cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -356,7 +356,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
     withTempPath { file =>
       val cls = classOf[SimpleWriteOnlyDataSource]
       val path = file.getCanonicalPath
-      val df = spark.range(5).select($"id", $"id")
+      val df = spark.range(5).select('id as 'i, -'id as 'j)
       try {
         df.write.format(cls.getName).option("path", path).mode("error").save()
         df.write.format(cls.getName).option("path", path).mode("overwrite").save()

From 2a422535451c186546a2ce3da66d422805f7db32 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 11 Oct 2018 14:34:53 +0800
Subject: [PATCH 5/5] Add a test for append mode as well

---
 .../org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 38557264629cf..e8f291af13baf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -364,6 +364,9 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       } catch {
         case e: SchemaReadAttemptException => fail("Schema read was attempted.", e)
       }
+      intercept[SchemaReadAttemptException] {
+        df.write.format(cls.getName).option("path", path).mode("append").save()
+      }
     }
   }
 }
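
Taken together, the five patches make DataFrameWriter.save create a DataSourceV2Relation (and, through it, a ReadSupport that resolves the source's schema) only when the save mode is SaveMode.Append; every other mode goes straight to the write path. That is what lets a write-only source, one whose fullSchema() throws, save successfully in error, overwrite, and ignore modes. The sketch below is a minimal, self-contained model of that dispatch, not Spark's actual code: SaveModeDispatchSketch, WriteOnlySource, readSchema, and write are invented stand-ins for illustration.

object SaveModeDispatchSketch {

  sealed trait SaveMode
  case object Append extends SaveMode
  case object Overwrite extends SaveMode
  case object ErrorIfExists extends SaveMode
  case object Ignore extends SaveMode

  // Stand-in for a write-only data source: it can write, but throws if
  // anything asks for its schema, like SimpleWriteOnlyDataSource above.
  class WriteOnlySource {
    def readSchema(): Seq[String] =
      throw new UnsupportedOperationException("read is not supported")
    def write(rows: Seq[(Long, Long)], mode: SaveMode): Unit =
      println(s"wrote ${rows.size} rows in mode $mode")
  }

  // The dispatch the patches implement: only the append branch resolves the
  // existing schema (the analogue of creating a DataSourceV2Relation for
  // AppendData.byName); all other modes never touch the read side.
  def save(source: WriteOnlySource, rows: Seq[(Long, Long)], mode: SaveMode): Unit =
    mode match {
      case Append =>
        source.readSchema() // may throw for a write-only source
        source.write(rows, Append)
      case other =>
        source.write(rows, other) // no ReadSupport, no schema read
    }

  def main(args: Array[String]): Unit = {
    val source = new WriteOnlySource
    val rows = (0L until 5L).map(i => (i, -i))
    save(source, rows, Overwrite) // succeeds: schema is never read
    try save(source, rows, Append) // throws: append resolves the schema first
    catch {
      case e: UnsupportedOperationException =>
        println(s"append failed: ${e.getMessage}")
    }
  }
}

Running main mirrors the final shape of the test suite: the non-append modes never touch readSchema, while the append branch, like patch 5's intercept[SchemaReadAttemptException], is expected to attempt the schema read and fail for a write-only source.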