From 4ebf0c4267a2702346747ad5b6f2d470e041a949 Mon Sep 17 00:00:00 2001 From: Kaifei Yi Date: Tue, 8 Apr 2025 17:00:58 +0800 Subject: [PATCH] Refine the test with specified spark version --- ...HouseIcebergMOREqualityDeletionSuite.scala | 2 +- .../iceberg/ClickHouseIcebergSuite.scala | 17 +++--- .../GlutenClickHouseExcelFormatSuite.scala | 4 +- .../GlutenSQLCollectLimitExecSuite.scala | 20 +++---- .../JsonFunctionsValidateSuite.scala | 22 ++++---- .../gluten/execution/MiscOperatorSuite.scala | 2 +- .../ScalarFunctionsValidateSuite.scala | 26 ++++----- .../VeloxAggregateFunctionsSuite.scala | 8 +-- .../execution/VeloxColumnarCacheSuite.scala | 4 +- .../gluten/execution/VeloxHashJoinSuite.scala | 2 +- .../execution/VeloxStringFunctionsSuite.scala | 2 +- .../sql/execution/VeloxParquetReadSuite.scala | 2 +- .../VeloxParquetWriteForHiveSuite.scala | 4 +- .../apache/gluten/execution/DeltaSuite.scala | 20 ++++--- .../apache/gluten/execution/HudiSuite.scala | 8 +-- .../gluten/execution/IcebergSuite.scala | 18 +++---- .../apache/spark/sql/GlutenQueryTest.scala | 54 +++++++++++++++---- .../GlutenExpressionMappingSuite.scala | 8 +-- 18 files changed, 120 insertions(+), 103 deletions(-) diff --git a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergMOREqualityDeletionSuite.scala b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergMOREqualityDeletionSuite.scala index 52b69df542cc..fae32bf447b5 100644 --- a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergMOREqualityDeletionSuite.scala +++ b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergMOREqualityDeletionSuite.scala @@ -59,7 +59,7 @@ class ClickHouseIcebergMOREqualityDeletionSuite extends GlutenClickHouseWholeSta } testWithSpecifiedSparkVersion( - "iceberg read mor table with equality deletion", Array("3.3", "3.5")) { + "iceberg read mor table with equality deletion", "3.3", "3.5") { // The table 'test_upsert_query' was generated by Flink + Iceberg from the iceberg ut, // the root path must be the '/tmp/junit6640909127060857423/default' val testTableName = "local.db.test_upsert_query" diff --git a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala index 3e44047b7424..dafb5dada86d 100644 --- a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala +++ b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala @@ -57,7 +57,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite } testWithSpecifiedSparkVersion( - "iceberg bucketed join", Array("3.3", "3.5")) { + "iceberg bucketed join", "3.3", "3.5") { val leftTable = "p_str_tb" val rightTable = "p_int_tb" withTable(leftTable, rightTable) { @@ -140,7 +140,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite } testWithSpecifiedSparkVersion( - "iceberg bucketed join with partition", Array("3.3", "3.5")) { + "iceberg bucketed join with partition", "3.3", "3.5") { val leftTable = "p_str_tb" val rightTable = "p_int_tb" withTable(leftTable, rightTable) { @@ -222,8 +222,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite } } - testWithSpecifiedSparkVersion("iceberg bucketed join partition value not exists", - Array("3.4", "3.5")) { + testWithMinSparkVersion("iceberg bucketed join partition value not exists", "3.5") { val leftTable = "p_str_tb" val rightTable = "p_int_tb" withTable(leftTable, rightTable) { @@ -302,8 +301,8 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite } } - testWithSpecifiedSparkVersion( - "iceberg bucketed join partition value not exists partial cluster", Array("3.4", "3.5")) { + testWithMinSparkVersion( + "iceberg bucketed join partition value not exists partial cluster", "3.4") { val leftTable = "p_str_tb" val rightTable = "p_int_tb" withTable(leftTable, rightTable) { @@ -383,7 +382,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite } testWithSpecifiedSparkVersion( - "iceberg bucketed join with partition filter", Array("3.3", "3.5")) { + "iceberg bucketed join with partition filter", "3.3", "3.5") { val leftTable = "p_str_tb" val rightTable = "p_int_tb" withTable(leftTable, rightTable) { @@ -466,7 +465,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite } } - testWithSpecifiedSparkVersion("iceberg: time travel") { + test("iceberg: time travel") { withTable("iceberg_tm") { spark.sql( s""" @@ -765,7 +764,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite // Spark configuration spark.sql.iceberg.handle-timestamp-without-timezone is not supported // in Spark 3.4 // TODO: there is a bug when using timestamp type as the partition column - testWithSpecifiedSparkVersion("iceberg partition type - timestamp", Array("")) { + ignore("iceberg partition type - timestamp") { Seq("true", "false").foreach { flag => withSQLConf( diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala index cb7bcb430daf..eb9e13e4517d 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala @@ -1485,9 +1485,7 @@ class GlutenClickHouseExcelFormatSuite } // TODO: pass spark configuration to FileFormatWriter in Spark 3.3 and 3.2 - testWithSpecifiedSparkVersion( - "write succeed even if set wrong snappy compression codec level", - Some("3.5")) { + testWithMinSparkVersion("write succeed even if set wrong snappy compression codec level", "3.5") { // TODO: remove duplicated test codes val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/failed_test/" val format = "parquet" diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala index 8af7b544e284..7ac89c33de4f 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala @@ -56,9 +56,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite { assert(assertionCondition, assertionMessage) } - testWithSpecifiedSparkVersion( - "ColumnarCollectLimitExec - basic limit test", - Array("3.2", "3.3")) { + testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - basic limit test", "3.2", "3.3") { val df = spark.range(0, 1000, 1).toDF("id").limit(5) val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L)) @@ -67,7 +65,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite { assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true) } - testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with filter", Array("3.2", "3.3")) { + testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with filter", "3.2", "3.3") { val df = spark .range(0, 20, 1) .toDF("id") @@ -80,9 +78,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite { assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true) } - testWithSpecifiedSparkVersion( - "ColumnarCollectLimitExec - range with repartition", - Array("3.2", "3.3")) { + testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - range with repartition", "3.2", "3.3") { val df = spark .range(0, 10, 1) @@ -96,9 +92,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite { assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true) } - testWithSpecifiedSparkVersion( - "ColumnarCollectLimitExec - with distinct values", - Array("3.2", "3.3")) { + testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with distinct values", "3.2", "3.3") { val df = spark .range(0, 10, 1) .toDF("id") @@ -112,7 +106,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite { assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true) } - testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - chained limit", Array("3.2", "3.3")) { + testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - chained limit", "3.2", "3.3") { val df = spark .range(0, 10, 1) .toDF("id") @@ -125,9 +119,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite { assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true) } - testWithSpecifiedSparkVersion( - "ColumnarCollectLimitExec - limit after union", - Array("3.2", "3.3")) { + testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - limit after union", "3.2", "3.3") { val df1 = spark.range(0, 5).toDF("id") val df2 = spark.range(5, 10).toDF("id") val unionDf = df1.union(df2).limit(3) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/JsonFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/JsonFunctionsValidateSuite.scala index 3d7c2554082e..260fe586168d 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/JsonFunctionsValidateSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/JsonFunctionsValidateSuite.scala @@ -79,7 +79,7 @@ class JsonFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("from_json function bool", Some("3.4")) { + testWithMinSparkVersion("from_json function bool", "3.4") { withTempPath { path => Seq[(String)]( @@ -101,7 +101,7 @@ class JsonFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("from_json function small int", Some("3.4")) { + testWithMinSparkVersion("from_json function small int", "3.4") { withTempPath { path => Seq[(String)]( @@ -123,7 +123,7 @@ class JsonFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("from_json function int", Some("3.4")) { + testWithMinSparkVersion("from_json function int", "3.4") { withTempPath { path => Seq[(String)]( @@ -145,7 +145,7 @@ class JsonFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("from_json function big int", Some("3.4")) { + testWithMinSparkVersion("from_json function big int", "3.4") { withTempPath { path => Seq[(String)]( @@ -167,7 +167,7 @@ class JsonFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("from_json function float", Some("3.4")) { + testWithMinSparkVersion("from_json function float", "3.4") { withTempPath { path => Seq[(String)]( @@ -191,7 +191,7 @@ class JsonFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("from_json function double", Some("3.4")) { + testWithMinSparkVersion("from_json function double", "3.4") { withTempPath { path => Seq[(String)]( @@ -215,7 +215,7 @@ class JsonFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("from_json function string", Some("3.4")) { + testWithMinSparkVersion("from_json function string", "3.4") { withTempPath { path => Seq[(String)]( @@ -237,7 +237,7 @@ class JsonFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("from_json function array", Some("3.4")) { + testWithMinSparkVersion("from_json function array", "3.4") { withTempPath { path => Seq[(String)]( @@ -257,7 +257,7 @@ class JsonFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("from_json function map", Some("3.4")) { + testWithMinSparkVersion("from_json function map", "3.4") { withTempPath { path => Seq[(String)]( @@ -279,7 +279,7 @@ class JsonFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("from_json function row", Some("3.4")) { + testWithMinSparkVersion("from_json function row", "3.4") { withTempPath { path => Seq[(String)]( @@ -319,7 +319,7 @@ class JsonFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("from_json function duplicate key", Some("3.4")) { + testWithMinSparkVersion("from_json function duplicate key", "3.4") { withTempPath { path => Seq[(String)]( diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala index 43ae1cee0bee..b7d7abe88254 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala @@ -312,7 +312,7 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa checkLengthAndPlan(df, 5) } - testWithSpecifiedSparkVersion("coalesce validation", Some("3.4")) { + testWithMinSparkVersion("coalesce validation", "3.4") { withTempPath { path => val data = "2019-09-09 01:02:03.456789" diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala index c5cbfe7b8367..8b1ae56893b5 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala @@ -123,7 +123,7 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("Test array_append function - INT", Some("3.4")) { + testWithMinSparkVersion("Test array_append function - INT", "3.4") { withTempPath { path => Seq[(Array[Int], Int)]( @@ -148,7 +148,7 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("Test array_append function - STRING", Some("3.4")) { + testWithMinSparkVersion("Test array_append function - STRING", "3.4") { withTempPath { path => Seq[(Array[String], String)]( @@ -186,7 +186,7 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("null input for array_size", Some("3.3")) { + testWithMinSparkVersion("null input for array_size", "3.3") { withTempPath { path => Seq[(Array[Int])]( @@ -841,7 +841,7 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("Test width_bucket function", Some("3.4")) { + testWithMinSparkVersion("Test width_bucket function", "3.4") { withTempPath { path => Seq[(Integer, Integer, Integer, Integer)]( @@ -859,7 +859,7 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("Test url_decode function", Some("3.4")) { + testWithMinSparkVersion("Test url_decode function", "3.4") { withTempPath { path => Seq("https%3A%2F%2Fspark.apache.org") @@ -874,7 +874,7 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("Test url_encode function", Some("3.4")) { + testWithMinSparkVersion("Test url_encode function", "3.4") { withTempPath { path => Seq("https://spark.apache.org") @@ -1005,7 +1005,7 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("mask", Some("3.4")) { + testWithMinSparkVersion("mask", "3.4") { runQueryAndCompare("SELECT mask(c_comment) FROM customer limit 50") { checkGlutenOperatorMatch[ProjectExecTransformer] } @@ -1119,7 +1119,7 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("try_subtract", Some("3.3")) { + testWithMinSparkVersion("try_subtract", "3.3") { runQueryAndCompare( "select try_subtract(2147483647, cast(l_orderkey as int)), " + "try_subtract(-2147483648, cast(l_orderkey as int)) from lineitem") { @@ -1135,7 +1135,7 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("try_multiply", Some("3.3")) { + testWithMinSparkVersion("try_multiply", "3.3") { runQueryAndCompare( "select try_multiply(2147483647, cast(l_orderkey as int)), " + "try_multiply(-2147483648, cast(l_orderkey as int)) from lineitem") { @@ -1301,7 +1301,7 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("get", Some("3.4")) { + testWithMinSparkVersion("get", "3.4") { withTempPath { path => Seq[Seq[Integer]](Seq(1, null, 5, 4), Seq(5, -1, 8, 9, -7, 2), Seq.empty, null) @@ -1392,7 +1392,7 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("levenshtein with limit", Some("3.5")) { + testWithMinSparkVersion("levenshtein with limit", "3.5") { runQueryAndCompare("select levenshtein(c_comment, c_address, 3) from customer limit 50") { checkGlutenOperatorMatch[ProjectExecTransformer] } @@ -1506,7 +1506,7 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("array insert", Some("3.4")) { + testWithMinSparkVersion("array insert", "3.4") { withTempPath { path => Seq[Seq[Integer]](Seq(1, null, 5, 4), Seq(5, -1, 8, 9, -7, 2), Seq.empty, null) @@ -1560,7 +1560,7 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } - testWithSpecifiedSparkVersion("Test try_cast", Some("3.4")) { + testWithMinSparkVersion("Test try_cast", "3.4") { withTempView("try_cast_table") { withTempPath { path => diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala index a2117a78aaf4..fe3bb035f3fe 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala @@ -390,7 +390,7 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu } } - testWithSpecifiedSparkVersion("regr_r2", Some("3.3")) { + testWithMinSparkVersion("regr_r2", "3.3") { runQueryAndCompare(""" |select regr_r2(l_partkey, l_suppkey) from lineitem; |""".stripMargin) { @@ -409,7 +409,7 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu } } - testWithSpecifiedSparkVersion("regr_slope", Some("3.4")) { + testWithMinSparkVersion("regr_slope", "3.4") { runQueryAndCompare(""" |select regr_slope(l_partkey, l_suppkey) from lineitem; |""".stripMargin) { @@ -428,7 +428,7 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu } } - testWithSpecifiedSparkVersion("regr_intercept", Some("3.4")) { + testWithMinSparkVersion("regr_intercept", "3.4") { runQueryAndCompare(""" |select regr_intercept(l_partkey, l_suppkey) from lineitem; |""".stripMargin) { @@ -447,7 +447,7 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu } } - testWithSpecifiedSparkVersion("regr_sxy regr_sxx regr_syy", Some("3.4")) { + testWithMinSparkVersion("regr_sxy regr_sxx regr_syy", "3.4") { runQueryAndCompare(""" |select regr_sxy(l_quantity, l_tax) from lineitem; |""".stripMargin) { diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala index f595420793f0..e4f59051bdd3 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala @@ -79,7 +79,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt } } - testWithSpecifiedSparkVersion("input row", Some("3.2")) { + testWithMinSparkVersion("input row", "3.2") { withTable("t") { sql("CREATE TABLE t USING json AS SELECT * FROM values(1, 'a', (2, 'b'), (3, 'c'))") runQueryAndCompare("SELECT * FROM t", cache = true) { @@ -102,7 +102,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt } // See issue https://github.com/apache/incubator-gluten/issues/8497. - testWithSpecifiedSparkVersion("Input fallen back vanilla Spark columnar scan", Some("3.3")) { + testWithMinSparkVersion("Input fallen back vanilla Spark columnar scan", "3.3") { def withId(id: Int): Metadata = new MetadataBuilder().putLong("parquet.field.id", id).build() diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala index 9ec2ec33a68e..3388e7fc6571 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala @@ -73,7 +73,7 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion("generate hash join plan - v2", Some("3.2")) { + testWithMinSparkVersion("generate hash join plan - v2", "3.2") { withSQLConf( ("spark.sql.autoBroadcastJoinThreshold", "-1"), ("spark.sql.adaptive.enabled", "false"), diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala index ff2967b1c38a..3cb8fa91734c 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala @@ -532,7 +532,7 @@ class VeloxStringFunctionsSuite extends VeloxWholeStageTransformerSuite { s"from $LINEITEM_TABLE limit 5") { _ => } } - testWithSpecifiedSparkVersion("split", Some("3.4")) { + testWithMinSparkVersion("split", "3.4") { runQueryAndCompare( s"select l_orderkey, l_comment, split(l_comment, '') " + s"from $LINEITEM_TABLE limit 5") { diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetReadSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetReadSuite.scala index 552b173fbbbd..7cd2ba088031 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetReadSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetReadSuite.scala @@ -32,7 +32,7 @@ class VeloxParquetReadSuite extends VeloxWholeStageTransformerSuite { .set(VeloxConfig.LOAD_QUANTUM.key, "256m") } - testWithSpecifiedSparkVersion("read example parquet files", Some("3.5"), Some("3.5")) { + testWithMinSparkVersion("read example parquet files", "3.5") { withTable("test_table") { val dir = new File(getClass.getResource(resourcePath).getFile) val files = dir.listFiles diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala index 44e36df09ae0..1315a3ef9252 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala @@ -331,9 +331,9 @@ class VeloxParquetWriteForHiveSuite } } - testWithSpecifiedSparkVersion( + testWithMaxSparkVersion( "Native writer should keep the same compression codec if `hive.exec.compress.output` is true", - maxSparkVersion = Some("3.3")) { + "3.3") { Seq(false, true).foreach { enableNativeWrite => withSQLConf("spark.gluten.sql.native.writer.enabled" -> enableNativeWrite.toString) { diff --git a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala index 9046ba41409e..fb135023eb99 100644 --- a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala +++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala @@ -43,7 +43,7 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { } // IdMapping is supported in Delta 2.2 (related to Spark3.3.1) - testWithSpecifiedSparkVersion("column mapping mode = id", Some("3.3")) { + testWithMinSparkVersion("column mapping mode = id", "3.3") { withTable("delta_cm1") { spark.sql(s""" |create table delta_cm1 (id int, name string) using delta @@ -63,7 +63,7 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { } // NameMapping is supported in Delta 2.0 (related to Spark3.2.0) - testWithSpecifiedSparkVersion("column mapping mode = name", Some("3.2")) { + testWithMinSparkVersion("column mapping mode = name", "3.2") { withTable("delta_cm2") { spark.sql(s""" |create table delta_cm2 (id int, name string) using delta @@ -82,7 +82,7 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion("delta: time travel", Some("3.3")) { + testWithMinSparkVersion("delta: time travel", "3.3") { withTable("delta_tm") { spark.sql(s""" |create table delta_tm (id int, name string) using delta @@ -107,7 +107,7 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion("delta: partition filters", Some("3.2")) { + testWithMinSparkVersion("delta: partition filters", "3.2") { withTable("delta_pf") { spark.sql(s""" |create table delta_pf (id int, name string) using delta partitioned by (name) @@ -126,7 +126,7 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion("basic test with stats.skipping disabled", Some("3.2")) { + testWithMinSparkVersion("basic test with stats.skipping disabled", "3.2") { withTable("delta_test2") { withSQLConf("spark.databricks.delta.stats.skipping" -> "false") { spark.sql(s""" @@ -146,7 +146,7 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion("column mapping with complex type", Some("3.2")) { + testWithMinSparkVersion("column mapping with complex type", "3.2") { withTable("t1") { val simpleNestedSchema = new StructType() .add("a", StringType, true) @@ -196,7 +196,7 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion("deletion vector", Some("3.4")) { + testWithMinSparkVersion("deletion vector", "3.4") { withTempPath { p => import testImplicits._ @@ -213,7 +213,7 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion("delta: push down input_file_name expression", Some("3.2")) { + testWithMinSparkVersion("delta: push down input_file_name expression", "3.2") { withTable("source_table") { withTable("target_table") { spark.sql(s""" @@ -251,9 +251,7 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion( - "delta: need to validate delta expression before execution", - Some("3.2")) { + testWithMinSparkVersion("delta: need to validate delta expression before execution", "3.2") { withTable("source_table") { withTable("target_table") { spark.sql(s""" diff --git a/gluten-hudi/src/test/scala/org/apache/gluten/execution/HudiSuite.scala b/gluten-hudi/src/test/scala/org/apache/gluten/execution/HudiSuite.scala index 97633fa064cc..471622f445a0 100644 --- a/gluten-hudi/src/test/scala/org/apache/gluten/execution/HudiSuite.scala +++ b/gluten-hudi/src/test/scala/org/apache/gluten/execution/HudiSuite.scala @@ -38,7 +38,7 @@ abstract class HudiSuite extends WholeStageTransformerSuite { .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") } - testWithSpecifiedSparkVersion("hudi: time travel", Some("3.2")) { + testWithMinSparkVersion("hudi: time travel", "3.2") { withTable("hudi_tm") { spark.sql(s""" |create table hudi_tm (id int, name string) using hudi @@ -65,7 +65,7 @@ abstract class HudiSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion("hudi: soft delete", Some("3.2")) { + testWithMinSparkVersion("hudi: soft delete", "3.2") { withTable("hudi_pf") { spark.sql(s""" |create table hudi_pf (id int, name string) using hudi @@ -85,7 +85,7 @@ abstract class HudiSuite extends WholeStageTransformerSuite { } // FIXME: flaky leaked file systems issue - ignore("hudi: mor", Some("3.2")) { + ignore("hudi: mor") { withTable("hudi_mor") { spark.sql(s""" |create table hudi_mor (id int, name string, ts bigint) @@ -111,7 +111,7 @@ abstract class HudiSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion("hudi: partition filters", Some("3.2")) { + testWithMinSparkVersion("hudi: partition filters", "3.2") { withTable("hudi_pf") { spark.sql(s""" |create table hudi_pf (id int, name string) using hudi partitioned by (name) diff --git a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala index 6016001a060d..ec071fff7395 100644 --- a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala +++ b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala @@ -57,7 +57,7 @@ abstract class IcebergSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion("iceberg bucketed join", Some("3.4")) { + testWithMinSparkVersion("iceberg bucketed join", "3.4") { val leftTable = "p_str_tb" val rightTable = "p_int_tb" withTable(leftTable, rightTable) { @@ -131,7 +131,7 @@ abstract class IcebergSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion("iceberg bucketed join with partition", Some("3.4")) { + testWithMinSparkVersion("iceberg bucketed join with partition", "3.4") { val leftTable = "p_str_tb" val rightTable = "p_int_tb" withTable(leftTable, rightTable) { @@ -205,9 +205,7 @@ abstract class IcebergSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion( - "iceberg bucketed join partition value not exists", - Array("3.4", "3.5")) { + testWithMinSparkVersion("iceberg bucketed join partition value not exists", "3.4") { val leftTable = "p_str_tb" val rightTable = "p_int_tb" withTable(leftTable, rightTable) { @@ -282,9 +280,9 @@ abstract class IcebergSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion( + testWithMinSparkVersion( "iceberg bucketed join partition value not exists partial cluster", - Array("3.4", "3.5")) { + "3.4") { val leftTable = "p_str_tb" val rightTable = "p_int_tb" withTable(leftTable, rightTable) { @@ -359,7 +357,7 @@ abstract class IcebergSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion("iceberg bucketed join with partition filter", Some("3.4")) { + testWithMinSparkVersion("iceberg bucketed join with partition filter", "3.4") { val leftTable = "p_str_tb" val rightTable = "p_int_tb" withTable(leftTable, rightTable) { @@ -435,7 +433,7 @@ abstract class IcebergSuite extends WholeStageTransformerSuite { } } - testWithSpecifiedSparkVersion("iceberg: time travel") { + test("iceberg: time travel") { withTable("iceberg_tm") { spark.sql(s""" |create table iceberg_tm (id int, name string) using iceberg @@ -593,7 +591,7 @@ abstract class IcebergSuite extends WholeStageTransformerSuite { // Spark configuration spark.sql.iceberg.handle-timestamp-without-timezone is not supported // in Spark 3.4 - testWithSpecifiedSparkVersion("iceberg partition type - timestamp", Array("3.2", "3.3", "3.5")) { + testWithSpecifiedSparkVersion("iceberg partition type - timestamp", "3.2", "3.3", "3.5") { Seq("true", "false").foreach { flag => withSQLConf( diff --git a/gluten-substrait/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala b/gluten-substrait/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala index 1d4a63bdbc28..21185a358179 100644 --- a/gluten-substrait/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala +++ b/gluten-substrait/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala @@ -60,7 +60,21 @@ abstract class GlutenQueryTest extends PlanTest { maxVersion(0) > version(0) || maxVersion(0) == version(0) && maxVersion(1) >= version(1) } - def shouldRun( + /** + * Check if the current spark version is between the minVersion and maxVersion. + * + * If the maxVersion is not specified, it will check if the current spark version is greater than + * or equal to, and if the minVersion is not specified, it will check if the current spark version + * is less than or equal to. + * + * @param minSparkVersion + * the minimum spark version + * @param maxSparkVersion + * the maximum spark version + * @return + * true if the current spark version is between the minVersion and maxVersion + */ + def matchSparkVersion( minSparkVersion: Option[String] = None, maxSparkVersion: Option[String] = None): Boolean = { var shouldRun = true @@ -77,31 +91,49 @@ abstract class GlutenQueryTest extends PlanTest { shouldRun } - def ignore( + /** Ignore the test if the current spark version is between the minVersion and maxVersion */ + def ignoreWithSpecifiedSparkVersion( testName: String, minSparkVersion: Option[String] = None, maxSparkVersion: Option[String] = None)(testFun: => Any): Unit = { - if (shouldRun(minSparkVersion, maxSparkVersion)) { + if (matchSparkVersion(minSparkVersion, maxSparkVersion)) { ignore(testName) { testFun } } } - def testWithSpecifiedSparkVersion( - testName: String, - minSparkVersion: Option[String] = None, - maxSparkVersion: Option[String] = None)(testFun: => Any): Unit = { - if (shouldRun(minSparkVersion, maxSparkVersion)) { + /** Run the test if the current spark version is between the minVersion and maxVersion */ + def testWithRangeSparkVersion(testName: String, minSparkVersion: String, maxSparkVersion: String)( + testFun: => Any): Unit = { + if (matchSparkVersion(Some(minSparkVersion), Some(maxSparkVersion))) { test(testName) { testFun } } } - def testWithSpecifiedSparkVersion(testName: String, versions: Array[String])( - testFun: => Any): Unit = { - if (versions.exists(v => shouldRun(Some(v), Some(v)))) { + /** Run the test if the current spark version less than the maxVersion */ + def testWithMaxSparkVersion(testName: String, maxVersion: String)(testFun: => Any): Unit = { + if (matchSparkVersion(maxSparkVersion = Some(maxVersion))) { + test(testName) { + testFun + } + } + } + + /** Run the test if the current spark version greater than the minVersion */ + def testWithMinSparkVersion(testName: String, minVersion: String)(testFun: => Any): Unit = { + if (matchSparkVersion(Some(minVersion))) { + test(testName) { + testFun + } + } + } + + /** Run the test on the specified spark version */ + def testWithSpecifiedSparkVersion(testName: String, versions: String*)(testFun: => Any): Unit = { + if (versions.exists(v => matchSparkVersion(Some(v), Some(v)))) { test(testName) { testFun } diff --git a/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala b/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala index b2fbd11c0abc..6841a87d1373 100644 --- a/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala +++ b/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala @@ -47,7 +47,7 @@ class GlutenExpressionMappingSuite conf } - testWithSpecifiedSparkVersion("test expression blacklist", Some("3.2")) { + testWithMinSparkVersion("test expression blacklist", "3.2") { val names = ExpressionMappings.expressionsMap.values.toSet assert(names.contains("regexp_replace")) assert(names.contains("regexp_extract")) @@ -68,7 +68,7 @@ class GlutenExpressionMappingSuite } } - testWithSpecifiedSparkVersion("test blacklisting regexp expressions", Some("3.2")) { + testWithMinSparkVersion("test blacklisting regexp expressions", "3.2") { val names = ExpressionMappings.expressionsMap.values.toSet assert(names.contains("rlike")) assert(names.contains("regexp_replace")) @@ -94,9 +94,9 @@ class GlutenExpressionMappingSuite } } - testWithSpecifiedSparkVersion( + testWithMinSparkVersion( "GLUTEN-7213: Check fallback reason with CheckOverflowInTableInsert", - Some("3.4")) { + "3.4") { withSQLConf(GlutenConfig.RAS_ENABLED.key -> "false") { withTable("t1", "t2") { sql("create table t1 (a float) using parquet")