From e1a97229ab474f959c28efcbe0a10a1593913234 Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Tue, 16 Mar 2021 09:24:07 +0100
Subject: [PATCH 1/8] [SPARK-34756][SQL] Fix FileScan equality check

---
 .../apache/spark/sql/execution/datasources/v2/FileScan.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 363dd154b5fbb..50265df61f7c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -86,7 +86,7 @@ trait FileScan extends Scan
 
   override def equals(obj: Any): Boolean = obj match {
     case f: FileScan =>
-      fileIndex == f.fileIndex && readSchema == f.readSchema
+      fileIndex == f.fileIndex && readSchema == f.readSchema &&
       ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) &&
         ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters)
 
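
Note on the bug fixed above: in Scala, a boolean expression that stands alone as a statement is evaluated and then discarded, so without the trailing `&&` the fileIndex/readSchema comparison had no effect and `equals` only compared the filter sets. A minimal, self-contained sketch of the failure mode (illustration only, not Spark code):

    object MissingAndDemo {
      // Mirrors the buggy shape of FileScan.equals: the first comparison is a
      // discarded statement; only the last expression is the return value.
      def equalsBuggy(a: Int, b: Int, c: Int, d: Int): Boolean = {
        a == b // evaluated, then silently thrown away: no trailing &&
        c == d
      }

      def main(args: Array[String]): Unit = {
        println(equalsBuggy(1, 2, 3, 3)) // prints true although a != b
      }
    }
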
From f65ebe3f397053e103d482c50d7f250f798bb825 Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Tue, 16 Mar 2021 19:46:26 +0100
Subject: [PATCH 2/8] normalize filters in `FileScan.equals()`

---
 .../execution/datasources/v2/FileScan.scala   | 20 ++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 50265df61f7c9..2a6f3d616790f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -24,8 +24,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD
 import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet}
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ExpressionSet}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.read.{Batch, InputPartition, Scan, Statistics, SupportsReportStatistics}
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
@@ -84,11 +85,24 @@ trait FileScan extends Scan
 
   protected def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
 
+  private lazy val (normalizedPartitionFilters, normalizedDataFilters) = {
+    val output = readSchema().toAttributes
+    val partitionFilterAttributes = AttributeSet(partitionFilters).map(a => a.name -> a).toMap
+    val dataFiltersAttributes = AttributeSet(dataFilters).map(a => a.name -> a).toMap
+    val normalizedPartitionFilters = ExpressionSet(partitionFilters.map(
+      QueryPlan.normalizeExpressions(_, output.map(a =>
+        partitionFilterAttributes.getOrElse(a.name, a)))))
+    val normalizedDataFilters = ExpressionSet(dataFilters.map(
+      QueryPlan.normalizeExpressions(_, output.map(a =>
+        dataFiltersAttributes.getOrElse(a.name, a)))))
+    (normalizedPartitionFilters, normalizedDataFilters)
+  }
+
   override def equals(obj: Any): Boolean = obj match {
     case f: FileScan =>
       fileIndex == f.fileIndex && readSchema == f.readSchema &&
-      ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) &&
-        ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters)
+      normalizedPartitionFilters == f.normalizedPartitionFilters &&
+      normalizedDataFilters == f.normalizedDataFilters
 
     case _ => false
   }

From 776828e5741807a0e7d70f437f1968f1299f3185 Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Wed, 17 Mar 2021 18:09:01 +0100
Subject: [PATCH 3/8] add attribute name normalization

---
 .../sql/execution/datasources/v2/FileScan.scala   | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 2a6f3d616790f..24bd920634b2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -86,15 +86,16 @@ trait FileScan extends Scan
   protected def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
 
   private lazy val (normalizedPartitionFilters, normalizedDataFilters) = {
-    val output = readSchema().toAttributes
-    val partitionFilterAttributes = AttributeSet(partitionFilters).map(a => a.name -> a).toMap
-    val dataFiltersAttributes = AttributeSet(dataFilters).map(a => a.name -> a).toMap
+    val output = readSchema().toAttributes.map(a => a.withName(normalizeName(a.name)))
+    val partitionFilterAttributes =
+      AttributeSet(partitionFilters).map(a => normalizeName(a.name) -> a).toMap
+    val dataFiltersAttributes = AttributeSet(dataFilters).map(a => normalizeName(a.name) -> a).toMap
     val normalizedPartitionFilters = ExpressionSet(partitionFilters.map(
-      QueryPlan.normalizeExpressions(_, output.map(a =>
-        partitionFilterAttributes.getOrElse(a.name, a)))))
+      QueryPlan.normalizeExpressions(_,
+        output.map(a => partitionFilterAttributes.getOrElse(a.name, a)))))
     val normalizedDataFilters = ExpressionSet(dataFilters.map(
-      QueryPlan.normalizeExpressions(_, output.map(a =>
-        dataFiltersAttributes.getOrElse(a.name, a)))))
+      QueryPlan.normalizeExpressions(_,
+        output.map(a => dataFiltersAttributes.getOrElse(a.name, a)))))
     (normalizedPartitionFilters, normalizedDataFilters)
   }
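
Why the normalization above is needed: two semantically identical scans can hold filter expressions whose `AttributeReference`s differ only in `exprId`, so a plain `ExpressionSet` comparison treats them as different and exchange reuse never kicks in. `QueryPlan.normalizeExpressions` rewrites each attribute's `exprId` to its ordinal in the given output, making the comparison positional. A small sketch of the effect, assuming a test scope where these Spark-internal catalyst classes are on the classpath:

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpressionSet, LessThan, Literal}
    import org.apache.spark.sql.catalyst.plans.QueryPlan
    import org.apache.spark.sql.types.IntegerType

    // Two references to the same column; each instantiation gets a fresh exprId.
    val a1 = AttributeReference("data", IntegerType)()
    val a2 = AttributeReference("data", IntegerType)()
    val f1 = LessThan(a1, Literal(0))
    val f2 = LessThan(a2, Literal(0))

    // Unequal, although the filters mean the same thing.
    assert(ExpressionSet(Seq(f1)) != ExpressionSet(Seq(f2)))

    // After normalizing against the scan's own output they compare equal.
    val n1 = QueryPlan.normalizeExpressions(f1, Seq(a1))
    val n2 = QueryPlan.normalizeExpressions(f2, Seq(a2))
    assert(ExpressionSet(Seq(n1)) == ExpressionSet(Seq(n2)))
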
canonicalization") { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") { + withTempPath { path => + spark.range(5).toDF().write.mode("overwrite").parquet(path.toString) + withTempView("t") { + spark.read.parquet(path.toString).createOrReplaceTempView("t") + val df = sql( + """ + |SELECT * + |FROM t AS t1 + |JOIN t AS t2 ON t2.id = t1.id + |JOIN t AS t3 ON t3.id = t2.id + |""".stripMargin) + df.collect() + val reusedExchanges = collect(df.queryExecution.executedPlan) { + case r: ReusedExchangeExec => r + } + assert(reusedExchanges.size == 1) + } + } + } + } } case class Foo(bar: Option[String]) From 9711da02b01d900bd25b3814e11358f2c36453f2 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 18 Mar 2021 18:33:53 +0100 Subject: [PATCH 5/8] add FileScanSuite and AvroScanSuite --- .../apache/spark/sql/avro/AvroScanSuite.scala | 30 ++ .../org/apache/spark/sql/FileScanSuite.scala | 385 ++++++++++++++++++ 2 files changed, 415 insertions(+) create mode 100644 external/avro/src/test/scala/org/apache/spark/sql/avro/AvroScanSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroScanSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroScanSuite.scala new file mode 100644 index 0000000000000..98a7190ba984e --- /dev/null +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroScanSuite.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.avro + +import org.apache.spark.sql.FileScanSuiteBase +import org.apache.spark.sql.v2.avro.AvroScan + +class AvroScanSuite extends FileScanSuiteBase { + val scanBuilders = Seq[(String, ScanBuilder, Seq[String])]( + ("AvroScan", + (s, fi, ds, rds, rps, f, o, pf, df) => AvroScan(s, fi, ds, rds, rps, o, f, pf, df), + Seq.empty)) + + run(scanBuilders) +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala new file mode 100644 index 0000000000000..8c6dc6b4829ff --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
From 9711da02b01d900bd25b3814e11358f2c36453f2 Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Thu, 18 Mar 2021 18:33:53 +0100
Subject: [PATCH 5/8] add FileScanSuite and AvroScanSuite

---
 .../apache/spark/sql/avro/AvroScanSuite.scala |  30 ++
 .../org/apache/spark/sql/FileScanSuite.scala  | 385 ++++++++++++++++++
 2 files changed, 415 insertions(+)
 create mode 100644 external/avro/src/test/scala/org/apache/spark/sql/avro/AvroScanSuite.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala

diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroScanSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroScanSuite.scala
new file mode 100644
index 0000000000000..98a7190ba984e
--- /dev/null
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroScanSuite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.spark.sql.FileScanSuiteBase
+import org.apache.spark.sql.v2.avro.AvroScan
+
+class AvroScanSuite extends FileScanSuiteBase {
+  val scanBuilders = Seq[(String, ScanBuilder, Seq[String])](
+    ("AvroScan",
+      (s, fi, ds, rds, rps, f, o, pf, df) => AvroScan(s, fi, ds, rds, rps, o, f, pf, df),
+      Seq.empty))
+
+  run(scanBuilders)
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala
new file mode 100644
index 0000000000000..8c6dc6b4829ff
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.mutable
+
+import com.google.common.collect.ImmutableMap
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNull, LessThan}
+import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.v2.FileScan
+import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
+import org.apache.spark.sql.execution.datasources.v2.json.JsonScan
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+import org.apache.spark.sql.execution.datasources.v2.text.TextScan
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+trait FileScanSuiteBase extends SharedSparkSession {
+  private def newPartitioningAwareFileIndex() = {
+    new PartitioningAwareFileIndex(spark, Map.empty, None) {
+      override def partitionSpec(): PartitionSpec = {
+        PartitionSpec.emptySpec
+      }
+
+      override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
+        mutable.LinkedHashMap.empty
+      }
+
+      override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
+        Map.empty
+      }
+
+      override def rootPaths: Seq[Path] = {
+        Seq.empty
+      }
+
+      override def refresh(): Unit = {}
+    }
+  }
+
+  type ScanBuilder = (
+    SparkSession,
+    PartitioningAwareFileIndex,
+    StructType,
+    StructType,
+    StructType,
+    Array[Filter],
+    CaseInsensitiveStringMap,
+    Seq[Expression],
+    Seq[Expression]) => FileScan
+
+  def run(scanBuilders: Seq[(String, ScanBuilder, Seq[String])]): Unit = {
+    val dataSchema = StructType(Seq(
+      StructField("data", IntegerType, false),
+      StructField("partition", IntegerType, false),
+      StructField("other", IntegerType, false)))
+    val dataSchemaNotEqual = StructType(Seq(
+      StructField("data", IntegerType, false),
+      StructField("partition", IntegerType, false),
+      StructField("other", IntegerType, false),
+      StructField("new", IntegerType, false)))
+    val readDataSchema = StructType(Seq(StructField("data", IntegerType, false)))
+    val readDataSchemaNotEqual = StructType(Seq(
+      StructField("data", IntegerType, false),
+      StructField("other", IntegerType, false)))
+    val readPartitionSchema = StructType(Seq(StructField("partition", IntegerType, false)))
+    val readPartitionSchemaNotEqual = StructType(Seq(
+      StructField("partition", IntegerType, false),
+      StructField("other", IntegerType, false)))
+    val pushedFilters =
+      Array[Filter](sources.And(sources.IsNull("data"), sources.LessThan("data", 0)))
+    val pushedFiltersNotEqual =
+      Array[Filter](sources.And(sources.IsNull("data"), sources.LessThan("data", 1)))
+    val optionsMap = ImmutableMap.of("key", "value")
+    val options = new CaseInsensitiveStringMap(ImmutableMap.copyOf(optionsMap))
+    val optionsNotEqual =
+      new CaseInsensitiveStringMap(ImmutableMap.copyOf(ImmutableMap.of("key2", "value2")))
+    val partitionFilters = Seq(And(IsNull('data.int), LessThan('data.int, 0)))
+    val partitionFiltersNotEqual = Seq(And(IsNull('data.int), LessThan('data.int, 1)))
+    val dataFilters = Seq(And(IsNull('data.int), LessThan('data.int, 0)))
+    val dataFiltersNotEqual = Seq(And(IsNull('data.int), LessThan('data.int, 1)))
+
+    scanBuilders.foreach { case (name, scanBuilder, exclusions) =>
+      test(s"SPARK-33482: Test $name equals") {
+        val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
+
+        val parquetScan = scanBuilder(
+          spark,
+          partitioningAwareFileIndex,
+          dataSchema,
+          readDataSchema,
+          readPartitionSchema,
+          pushedFilters,
+          options,
+          partitionFilters,
+          dataFilters)
+
+        val parquetScanEquals = scanBuilder(
+          spark,
+          partitioningAwareFileIndex,
+          dataSchema.copy(),
+          readDataSchema.copy(),
+          readPartitionSchema.copy(),
+          pushedFilters.clone(),
+          new CaseInsensitiveStringMap(ImmutableMap.copyOf(optionsMap)),
+          Seq(partitionFilters: _*),
+          Seq(dataFilters: _*))
+
+        assert(parquetScan === parquetScanEquals)
+      }
+
+      test(s"SPARK-33482: Test $name fileIndex not equals") {
+        val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
+
+        val parquetScan = scanBuilder(
+          spark,
+          partitioningAwareFileIndex,
+          dataSchema,
+          readDataSchema,
+          readPartitionSchema,
+          pushedFilters,
+          options,
+          partitionFilters,
+          dataFilters)
+
+        val partitioningAwareFileIndexNotEqual = newPartitioningAwareFileIndex()
+
+        val parquetScanNotEqual = scanBuilder(
+          spark,
+          partitioningAwareFileIndexNotEqual,
+          dataSchema,
+          readDataSchema,
+          readPartitionSchema,
+          pushedFilters,
+          options,
+          partitionFilters,
+          dataFilters)
+
+        assert(parquetScan !== parquetScanNotEqual)
+      }
+
+      if (!exclusions.contains("dataSchema")) {
+        test(s"SPARK-33482: Test $name dataSchema not equals") {
+          val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
+
+          val parquetScan = scanBuilder(
+            spark,
+            partitioningAwareFileIndex,
+            dataSchema,
+            readDataSchema,
+            readPartitionSchema,
+            pushedFilters,
+            options,
+            partitionFilters,
+            dataFilters)
+
+          val parquetScanNotEqual = scanBuilder(
+            spark,
+            partitioningAwareFileIndex,
+            dataSchemaNotEqual,
+            readDataSchema,
+            readPartitionSchema,
+            pushedFilters,
+            options,
+            partitionFilters,
+            dataFilters)
+
+          assert(parquetScan !== parquetScanNotEqual)
+        }
+      }
+
+      test(s"SPARK-33482: Test $name readDataSchema not equals") {
+        val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
+
+        val parquetScan = scanBuilder(
+          spark,
+          partitioningAwareFileIndex,
+          dataSchema,
+          readDataSchema,
+          readPartitionSchema,
+          pushedFilters,
+          options,
+          partitionFilters,
+          dataFilters)
+
+        val parquetScanNotEqual = scanBuilder(
+          spark,
+          partitioningAwareFileIndex,
+          dataSchema,
+          readDataSchemaNotEqual,
+          readPartitionSchema,
+          pushedFilters,
+          options,
+          partitionFilters,
+          dataFilters)
+
+        assert(parquetScan !== parquetScanNotEqual)
+      }
+
+      test(s"SPARK-33482: Test $name readPartitionSchema not equals") {
+        val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
+
+        val parquetScan = scanBuilder(
+          spark,
+          partitioningAwareFileIndex,
+          dataSchema,
+          readDataSchema,
+          readPartitionSchema,
+          pushedFilters,
+          options,
+          partitionFilters,
+          dataFilters)
+
+        val parquetScanNotEqual = scanBuilder(
+          spark,
+          partitioningAwareFileIndex,
+          dataSchema,
+          readDataSchema,
+          readPartitionSchemaNotEqual,
+          pushedFilters,
+          options,
+          partitionFilters,
+          dataFilters)
+
+        assert(parquetScan !== parquetScanNotEqual)
+      }
+
+      if (!exclusions.contains("pushedFilters")) {
+        test(s"SPARK-33482: Test $name pushedFilters not equals") {
+          val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
+
+          val parquetScan = scanBuilder(
+            spark,
+            partitioningAwareFileIndex,
+            dataSchema,
+            readDataSchema,
+            readPartitionSchema,
+            pushedFilters,
+            options,
+            partitionFilters,
+            dataFilters)
+
+          val parquetScanNotEqual = scanBuilder(
+            spark,
+            partitioningAwareFileIndex,
+            dataSchema,
+            readDataSchema,
+            readPartitionSchema,
+            pushedFiltersNotEqual,
+            options,
+            partitionFilters,
+            dataFilters)
+
+          assert(parquetScan !== parquetScanNotEqual)
+        }
+      }
+
+      test(s"SPARK-33482: Test $name options not equals") {
+        val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
+
+        val parquetScan = scanBuilder(
+          spark,
+          partitioningAwareFileIndex,
+          dataSchema,
+          readDataSchema,
+          readPartitionSchema,
+          pushedFilters,
+          options,
+          partitionFilters,
+          dataFilters)
+
+        val parquetScanNotEqual = scanBuilder(
+          spark,
+          partitioningAwareFileIndex,
+          dataSchema,
+          readDataSchema,
+          readPartitionSchema,
+          pushedFilters,
+          optionsNotEqual,
+          partitionFilters,
+          dataFilters)
+
+        assert(parquetScan !== parquetScanNotEqual)
+      }
+
+      test(s"SPARK-33482: Test $name partitionFilters not equals") {
+        val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
+
+        val parquetScan = scanBuilder(
+          spark,
+          partitioningAwareFileIndex,
+          dataSchema,
+          readDataSchema,
+          readPartitionSchema,
+          pushedFilters,
+          options,
+          partitionFilters,
+          dataFilters)
+
+        val parquetScanNotEqual = scanBuilder(
+          spark,
+          partitioningAwareFileIndex,
+          dataSchema,
+          readDataSchema,
+          readPartitionSchema,
+          pushedFilters,
+          options,
+          partitionFiltersNotEqual,
+          dataFilters)
+        assert(parquetScan !== parquetScanNotEqual)
+      }
+
+      test(s"SPARK-33482: Test $name dataFilters not equals") {
+        val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
+
+        val parquetScan = scanBuilder(
+          spark,
+          partitioningAwareFileIndex,
+          dataSchema,
+          readDataSchema,
+          readPartitionSchema,
+          pushedFilters,
+          options,
+          partitionFilters,
+          dataFilters)
+
+        val parquetScanNotEqual = scanBuilder(
+          spark,
+          partitioningAwareFileIndex,
+          dataSchema,
+          readDataSchema,
+          readPartitionSchema,
+          pushedFilters,
+          options,
+          partitionFilters,
+          dataFiltersNotEqual)
+        assert(parquetScan !== parquetScanNotEqual)
+      }
+    }
+  }
+}
+
+class FileScanSuite extends FileScanSuiteBase {
+  val scanBuilders = Seq[(String, ScanBuilder, Seq[String])](
+    ("ParquetScan",
+      (s, fi, ds, rds, rps, f, o, pf, df) =>
+        ParquetScan(s, s.sessionState.newHadoopConf(), fi, ds, rds, rps, f, o, pf, df),
+      Seq.empty),
+    ("OrcScan",
+      (s, fi, ds, rds, rps, f, o, pf, df) =>
+        OrcScan(s, s.sessionState.newHadoopConf(), fi, ds, rds, rps, o, f, pf, df),
+      Seq.empty),
+    ("CSVScan",
+      (s, fi, ds, rds, rps, f, o, pf, df) => CSVScan(s, fi, ds, rds, rps, o, f, pf, df),
+      Seq.empty),
+    ("JsonScan",
+      (s, fi, ds, rds, rps, f, o, pf, df) => JsonScan(s, fi, ds, rds, rps, o, f, pf, df),
+      Seq.empty),
+    ("TextScan",
+      (s, fi, _, rds, rps, _, o, pf, df) => TextScan(s, fi, rds, rps, o, pf, df),
+      Seq("dataSchema", "pushedFilters")))
+
+  run(scanBuilders)
+}
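
The third tuple element lists test cases a format must skip: `TextScan`, for example, takes neither a `dataSchema` nor `pushedFilters`, so the corresponding inequality tests do not apply to it. Extending the matrix to a new format is one more tuple; a hypothetical sketch (`MyScan` and its constructor argument order are made up for illustration):

    class MyScanSuite extends FileScanSuiteBase {
      val scanBuilders = Seq[(String, ScanBuilder, Seq[String])](
        ("MyScan",
          // Hypothetical scan; assumes an (options, pushedFilters) argument order
          // like AvroScan's.
          (s, fi, ds, rds, rps, f, o, pf, df) => MyScan(s, fi, ds, rds, rps, o, f, pf, df),
          Seq.empty))

      run(scanBuilders)
    }
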
From d782723ee5728f54ad8884dab45e3d263a5fa442 Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Fri, 19 Mar 2021 10:43:54 +0100
Subject: [PATCH 6/8] compact test code

---
 .../org/apache/spark/sql/FileScanSuite.scala  | 25 ++++++-------------
 1 file changed, 7 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala
index 8c6dc6b4829ff..433242289c7a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 trait FileScanSuiteBase extends SharedSparkSession {
@@ -71,23 +71,12 @@ trait FileScanSuiteBase extends SharedSparkSession {
     Seq[Expression]) => FileScan
 
   def run(scanBuilders: Seq[(String, ScanBuilder, Seq[String])]): Unit = {
-    val dataSchema = StructType(Seq(
-      StructField("data", IntegerType, false),
-      StructField("partition", IntegerType, false),
-      StructField("other", IntegerType, false)))
-    val dataSchemaNotEqual = StructType(Seq(
-      StructField("data", IntegerType, false),
-      StructField("partition", IntegerType, false),
-      StructField("other", IntegerType, false),
-      StructField("new", IntegerType, false)))
-    val readDataSchema = StructType(Seq(StructField("data", IntegerType, false)))
-    val readDataSchemaNotEqual = StructType(Seq(
-      StructField("data", IntegerType, false),
-      StructField("other", IntegerType, false)))
-    val readPartitionSchema = StructType(Seq(StructField("partition", IntegerType, false)))
-    val readPartitionSchemaNotEqual = StructType(Seq(
-      StructField("partition", IntegerType, false),
-      StructField("other", IntegerType, false)))
+    val dataSchema = StructType.fromDDL("data INT, partition INT, other INT")
+    val dataSchemaNotEqual = StructType.fromDDL("data INT, partition INT, other INT, new INT")
+    val readDataSchema = StructType.fromDDL("data INT")
+    val readDataSchemaNotEqual = StructType.fromDDL("data INT, other INT")
+    val readPartitionSchema = StructType.fromDDL("partition INT")
+    val readPartitionSchemaNotEqual = StructType.fromDDL("partition INT, other INT")
     val pushedFilters =
       Array[Filter](sources.And(sources.IsNull("data"), sources.LessThan("data", 0)))
     val pushedFiltersNotEqual =
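
`StructType.fromDDL` parses a DDL column list into the same kind of schema the replaced code spelled out field by field, which is why the test bodies stay unchanged. One nuance, harmless for these equality tests: fields parsed from DDL default to nullable, while the old code passed `nullable = false`. A quick sketch of the equivalence:

    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    val fromDdl = StructType.fromDDL("data INT, partition INT")
    val explicit = StructType(Seq(
      StructField("data", IntegerType, nullable = true),
      StructField("partition", IntegerType, nullable = true)))
    assert(fromDdl == explicit) // same schema, far less code
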
From 30d2d8b38ae95661d9461e2f6fb961e01d144e64 Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Mon, 22 Mar 2021 10:48:28 +0100
Subject: [PATCH 7/8] remove the case sensitivity handling

---
 .../spark/sql/execution/datasources/v2/FileScan.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 24bd920634b2c..ac63725b774d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -86,10 +86,9 @@ trait FileScan extends Scan
   protected def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
 
   private lazy val (normalizedPartitionFilters, normalizedDataFilters) = {
-    val output = readSchema().toAttributes.map(a => a.withName(normalizeName(a.name)))
-    val partitionFilterAttributes =
-      AttributeSet(partitionFilters).map(a => normalizeName(a.name) -> a).toMap
-    val dataFiltersAttributes = AttributeSet(dataFilters).map(a => normalizeName(a.name) -> a).toMap
+    val output = readSchema().toAttributes
+    val partitionFilterAttributes = AttributeSet(partitionFilters).map(a => a.name -> a).toMap
+    val dataFiltersAttributes = AttributeSet(dataFilters).map(a => a.name -> a).toMap
     val normalizedPartitionFilters = ExpressionSet(partitionFilters.map(
       QueryPlan.normalizeExpressions(_,
        output.map(a => partitionFilterAttributes.getOrElse(a.name, a)))))
From 0d38ac27dec27b77e0057113fbb57de8ea4cd3ca Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Mon, 22 Mar 2021 14:35:48 +0100
Subject: [PATCH 8/8] fix naming

---
 .../org/apache/spark/sql/FileScanSuite.scala | 54 +++++++++----------
 1 file changed, 27 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala
index 433242289c7a1..4e7fe8455ff93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileScanSuite.scala
@@ -94,7 +94,7 @@ trait FileScanSuiteBase extends SharedSparkSession {
       test(s"SPARK-33482: Test $name equals") {
         val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
 
-        val parquetScan = scanBuilder(
+        val scan = scanBuilder(
           spark,
           partitioningAwareFileIndex,
           dataSchema,
@@ -105,7 +105,7 @@ trait FileScanSuiteBase extends SharedSparkSession {
           partitionFilters,
           dataFilters)
 
-        val parquetScanEquals = scanBuilder(
+        val scanEquals = scanBuilder(
           spark,
           partitioningAwareFileIndex,
           dataSchema.copy(),
@@ -116,13 +116,13 @@ trait FileScanSuiteBase extends SharedSparkSession {
           Seq(partitionFilters: _*),
           Seq(dataFilters: _*))
 
-        assert(parquetScan === parquetScanEquals)
+        assert(scan === scanEquals)
       }
 
       test(s"SPARK-33482: Test $name fileIndex not equals") {
         val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
 
-        val parquetScan = scanBuilder(
+        val scan = scanBuilder(
           spark,
           partitioningAwareFileIndex,
           dataSchema,
@@ -135,7 +135,7 @@ trait FileScanSuiteBase extends SharedSparkSession {
 
         val partitioningAwareFileIndexNotEqual = newPartitioningAwareFileIndex()
 
-        val parquetScanNotEqual = scanBuilder(
+        val scanNotEqual = scanBuilder(
           spark,
           partitioningAwareFileIndexNotEqual,
           dataSchema,
@@ -146,14 +146,14 @@ trait FileScanSuiteBase extends SharedSparkSession {
           partitionFilters,
           dataFilters)
 
-        assert(parquetScan !== parquetScanNotEqual)
+        assert(scan !== scanNotEqual)
       }
 
       if (!exclusions.contains("dataSchema")) {
         test(s"SPARK-33482: Test $name dataSchema not equals") {
           val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
 
-          val parquetScan = scanBuilder(
+          val scan = scanBuilder(
             spark,
             partitioningAwareFileIndex,
             dataSchema,
@@ -164,7 +164,7 @@ trait FileScanSuiteBase extends SharedSparkSession {
             partitionFilters,
             dataFilters)
 
-          val parquetScanNotEqual = scanBuilder(
+          val scanNotEqual = scanBuilder(
             spark,
             partitioningAwareFileIndex,
             dataSchemaNotEqual,
@@ -175,14 +175,14 @@ trait FileScanSuiteBase extends SharedSparkSession {
             partitionFilters,
             dataFilters)
 
-          assert(parquetScan !== parquetScanNotEqual)
+          assert(scan !== scanNotEqual)
         }
       }
 
       test(s"SPARK-33482: Test $name readDataSchema not equals") {
         val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
 
-        val parquetScan = scanBuilder(
+        val scan = scanBuilder(
           spark,
           partitioningAwareFileIndex,
           dataSchema,
@@ -193,7 +193,7 @@ trait FileScanSuiteBase extends SharedSparkSession {
           partitionFilters,
           dataFilters)
 
-        val parquetScanNotEqual = scanBuilder(
+        val scanNotEqual = scanBuilder(
           spark,
           partitioningAwareFileIndex,
           dataSchema,
@@ -204,13 +204,13 @@ trait FileScanSuiteBase extends SharedSparkSession {
           partitionFilters,
           dataFilters)
 
-        assert(parquetScan !== parquetScanNotEqual)
+        assert(scan !== scanNotEqual)
       }
 
       test(s"SPARK-33482: Test $name readPartitionSchema not equals") {
         val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
 
-        val parquetScan = scanBuilder(
+        val scan = scanBuilder(
           spark,
           partitioningAwareFileIndex,
           dataSchema,
@@ -221,7 +221,7 @@ trait FileScanSuiteBase extends SharedSparkSession {
           partitionFilters,
           dataFilters)
 
-        val parquetScanNotEqual = scanBuilder(
+        val scanNotEqual = scanBuilder(
           spark,
           partitioningAwareFileIndex,
           dataSchema,
@@ -232,14 +232,14 @@ trait FileScanSuiteBase extends SharedSparkSession {
           partitionFilters,
           dataFilters)
 
-        assert(parquetScan !== parquetScanNotEqual)
+        assert(scan !== scanNotEqual)
       }
 
       if (!exclusions.contains("pushedFilters")) {
         test(s"SPARK-33482: Test $name pushedFilters not equals") {
           val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
 
-          val parquetScan = scanBuilder(
+          val scan = scanBuilder(
             spark,
             partitioningAwareFileIndex,
             dataSchema,
@@ -250,7 +250,7 @@ trait FileScanSuiteBase extends SharedSparkSession {
             partitionFilters,
             dataFilters)
 
-          val parquetScanNotEqual = scanBuilder(
+          val scanNotEqual = scanBuilder(
             spark,
             partitioningAwareFileIndex,
             dataSchema,
@@ -261,14 +261,14 @@ trait FileScanSuiteBase extends SharedSparkSession {
             partitionFilters,
             dataFilters)
 
-          assert(parquetScan !== parquetScanNotEqual)
+          assert(scan !== scanNotEqual)
         }
       }
 
       test(s"SPARK-33482: Test $name options not equals") {
         val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
 
-        val parquetScan = scanBuilder(
+        val scan = scanBuilder(
           spark,
           partitioningAwareFileIndex,
           dataSchema,
@@ -279,7 +279,7 @@ trait FileScanSuiteBase extends SharedSparkSession {
           partitionFilters,
           dataFilters)
 
-        val parquetScanNotEqual = scanBuilder(
+        val scanNotEqual = scanBuilder(
           spark,
           partitioningAwareFileIndex,
           dataSchema,
@@ -290,13 +290,13 @@ trait FileScanSuiteBase extends SharedSparkSession {
           partitionFilters,
           dataFilters)
 
-        assert(parquetScan !== parquetScanNotEqual)
+        assert(scan !== scanNotEqual)
       }
 
       test(s"SPARK-33482: Test $name partitionFilters not equals") {
         val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
 
-        val parquetScan = scanBuilder(
+        val scan = scanBuilder(
           spark,
           partitioningAwareFileIndex,
           dataSchema,
@@ -307,7 +307,7 @@ trait FileScanSuiteBase extends SharedSparkSession {
           partitionFilters,
           dataFilters)
 
-        val parquetScanNotEqual = scanBuilder(
+        val scanNotEqual = scanBuilder(
           spark,
           partitioningAwareFileIndex,
           dataSchema,
@@ -317,13 +317,13 @@ trait FileScanSuiteBase extends SharedSparkSession {
           options,
           partitionFiltersNotEqual,
           dataFilters)
-        assert(parquetScan !== parquetScanNotEqual)
+        assert(scan !== scanNotEqual)
       }
 
       test(s"SPARK-33482: Test $name dataFilters not equals") {
         val partitioningAwareFileIndex = newPartitioningAwareFileIndex()
 
-        val parquetScan = scanBuilder(
+        val scan = scanBuilder(
           spark,
           partitioningAwareFileIndex,
           dataSchema,
@@ -334,7 +334,7 @@ trait FileScanSuiteBase extends SharedSparkSession {
           partitionFilters,
           dataFilters)
 
-        val parquetScanNotEqual = scanBuilder(
+        val scanNotEqual = scanBuilder(
           spark,
           partitioningAwareFileIndex,
           dataSchema,
@@ -344,7 +344,7 @@ trait FileScanSuiteBase extends SharedSparkSession {
           options,
           partitionFilters,
           dataFiltersNotEqual)
-        assert(parquetScan !== parquetScanNotEqual)
+        assert(scan !== scanNotEqual)
       }
     }
   }