From 2e08d24618fa628c173c0c9484e53e9f7530611c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 11 Mar 2026 13:09:21 -0600
Subject: [PATCH 1/2] chore: remove deprecated SCAN_NATIVE_COMET constant and
 related test code

Remove the deprecated `CometConf.SCAN_NATIVE_COMET` constant (`native_comet`)
and all test code that references it, including the
`usingLegacyNativeCometScan` helper. The native_comet scan was deprecated in
0.9.0 and has been superseded by native_iceberg_compat and native_datafusion
scan implementations.

- Remove `SCAN_NATIVE_COMET` constant and deprecation annotations
- Remove `usingLegacyNativeCometScan` helper methods from CometTestBase
- Remove `ParquetReadV2Suite` class (all tests were ignored)
- Remove 12 ignored/dead tests across 6 test suites
- Simplify conditional branches that checked for SCAN_NATIVE_COMET
- Remove `assume(usingLegacyNativeCometScan)` guards (always false)
- Simplify `if (!usingLegacyNativeCometScan)` blocks (always true)
---
 .../scala/org/apache/comet/CometConf.scala    |   6 -
 .../apache/spark/sql/comet/util/Utils.scala   |   2 +-
 .../spark/sql/comet/CometScanExec.scala       |   2 +-
 .../comet/CometArrayExpressionSuite.scala     |   8 +-
 .../org/apache/comet/CometCastSuite.scala     |  10 +-
 .../apache/comet/CometExpressionSuite.scala   | 191 +----
 .../comet/CometFuzzAggregateSuite.scala       |  36 +-
 .../org/apache/comet/CometFuzzTestSuite.scala |  74 ++---
 .../comet/CometMapExpressionSuite.scala       |  11 +-
 .../exec/CometColumnarShuffleSuite.scala      |  61 ----
 .../apache/comet/exec/CometExecSuite.scala    |  35 --
 .../apache/comet/exec/CometJoinSuite.scala    |  63 ----
 .../comet/parquet/ParquetReadSuite.scala      | 309 +-----------
 .../comet/rules/CometScanRuleSuite.scala      |  39 ---
 .../org/apache/spark/sql/CometTestBase.scala  |   6 -
 .../spark/sql/CometToPrettyStringSuite.scala  |   6 +-
 .../spark/sql/CometToPrettyStringSuite.scala  |   6 +-
 17 files changed, 45 insertions(+), 820 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 38a9d0b2ce..4d2e37924a 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -114,12 +114,6 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithEnvVarOrDefault("ENABLE_COMET_WRITE", false)
 
-  // Deprecated: native_comet uses mutable buffers incompatible with Arrow FFI best practices
-  // and does not support complex types. Use native_iceberg_compat or auto instead.
-  // This will be removed in a future release.
-  // See: https://github.com/apache/datafusion-comet/issues/2186
-  @deprecated("Use SCAN_AUTO instead. native_comet will be removed in a future release.", "0.9.0")
-  val SCAN_NATIVE_COMET = "native_comet"
   val SCAN_NATIVE_DATAFUSION = "native_datafusion"
   val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
   val SCAN_AUTO = "auto"
diff --git a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
index 7662b219c4..6eaa9cad44 100644
--- a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
+++ b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
@@ -271,7 +271,7 @@ object Utils extends CometTypeShim {
       throw new SparkException(
         s"Comet execution only takes Arrow Arrays, but got ${c.getClass}. " +
           "This typically happens when a Comet scan falls back to Spark due to unsupported " +
-          "data types (e.g., complex types like structs, arrays, or maps with native_comet). " +
+          "data types (e.g., complex types like structs, arrays, or maps). " +
           "To resolve this, you can: " +
           "(1) enable spark.comet.scan.allowIncompatible=true to use a compatible native " +
           "scan variant, or " +
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index 2707f0c040..6151a43797 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -51,7 +51,7 @@ import org.apache.comet.parquet.CometParquetFileFormat
  *
  * This is a hybrid scan where the native plan will contain a `ScanExec` that reads batches of
  * data from the JVM via JNI. The ultimate source of data may be a JVM implementation such as
- * Spark readers, or could be the `native_comet` or `native_iceberg_compat` native scans.
+ * Spark readers, or could be the `native_iceberg_compat` native scan.
  *
  * Note that scanImpl can only be `native_datafusion` after CometScanRule runs and before
  * CometExecRule runs. It will never be set to `native_datafusion` at execution time
diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index 21b4276a64..65b2c85379 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -812,13 +812,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
 
   // https://github.com/apache/datafusion-comet/issues/2612
   test("array_reverse - fallback for binary array") {
-    val fallbackReason =
-      if (CometConf.COMET_NATIVE_SCAN_IMPL.key == CometConf.SCAN_NATIVE_COMET || sys.env
-          .getOrElse("COMET_PARQUET_SCAN_IMPL", "") == CometConf.SCAN_NATIVE_COMET) {
-        "Unsupported schema"
-      } else {
-        CometArrayReverse.unsupportedReason
-      }
+    val fallbackReason = CometArrayReverse.unsupportedReason
     withTable("t1") {
       sql("""create table t1 using parquet as select cast(null as array) c1,
             cast(array() as array) c2
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 2fb3fc062b..3d9acc39e4 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -1205,14 +1205,8 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         |USING parquet
       """.stripMargin)
       sql("INSERT INTO TABLE tab1 SELECT named_struct('col1','1','col2','2')")
-      if (!usingLegacyNativeCometScan) {
-        checkSparkAnswerAndOperator(
-          "SELECT CAST(s AS struct) AS new_struct FROM tab1")
-      } else {
-        // Should just fall back to Spark since non-DataSourceExec scan does not support nested types.
- checkSparkAnswer( - "SELECT CAST(s AS struct) AS new_struct FROM tab1") - } + checkSparkAnswerAndOperator( + "SELECT CAST(s AS struct) AS new_struct FROM tab1") } } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 339061f5be..570db1795c 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -185,22 +185,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - // ignored: native_comet scan is no longer supported - ignore("basic data type support") { - // this test requires native_comet scan due to unsigned u8/u16 issue - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - withParquetTable(path.toString, "tbl") { - checkSparkAnswerAndOperator("select * FROM tbl WHERE _2 > 100") - } - } - } - } - } - test("basic data type support - excluding u8/u16") { // variant that skips _9 (UINT_8) and _10 (UINT_16) for default scan impl Seq(true, false).foreach { dictionaryEnabled => @@ -217,27 +201,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - // ignored: native_comet scan is no longer supported - ignore("uint data type support") { - // this test requires native_comet scan due to unsigned u8/u16 issue - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "testuint.parquet") - makeParquetFileAllPrimitiveTypes( - path, - dictionaryEnabled = dictionaryEnabled, - Byte.MinValue, - Byte.MaxValue) - withParquetTable(path.toString, "tbl") { - val qry = "select _9 from tbl order by _11" - checkSparkAnswerAndOperator(qry) - } - } - } - } - } - test("uint data type support - excluding u8/u16") { // variant that tests UINT_32 and UINT_64, skipping _9 (UINT_8) and _10 (UINT_16) Seq(true, false).foreach { dictionaryEnabled => @@ -1491,57 +1454,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - // ignored: native_comet scan is no longer supported - ignore("round") { - // https://github.com/apache/datafusion-comet/issues/1441 - assume(usingLegacyNativeCometScan) - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllPrimitiveTypes( - path, - dictionaryEnabled = dictionaryEnabled, - -128, - 128, - randomSize = 100) - // this test requires native_comet scan due to unsigned u8/u16 issue - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { - withParquetTable(path.toString, "tbl") { - for (s <- Seq(-5, -1, 0, 1, 5, -1000, 1000, -323, -308, 308, -15, 15, -16, 16, - null)) { - // array tests - // TODO: enable test for floats (_6, _7, _8, _13) - for (c <- Seq(2, 3, 4, 5, 9, 10, 11, 12, 15, 16, 17)) { - checkSparkAnswerAndOperator(s"select _${c}, round(_${c}, ${s}) FROM tbl") - } - // scalar tests - // Exclude the constant folding optimizer in order to actually execute the native round - // operations for scalar (literal) values. 
- // TODO: comment in the tests for float once supported - withSQLConf( - "spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { - for (n <- Seq("0.0", "-0.0", "0.5", "-0.5", "1.2", "-1.2")) { - checkSparkAnswerAndOperator( - s"select round(cast(${n} as tinyint), ${s}) FROM tbl") - // checkSparkAnswerAndCometOperators(s"select round(cast(${n} as float), ${s}) FROM tbl") - checkSparkAnswerAndOperator( - s"select round(cast(${n} as decimal(38, 18)), ${s}) FROM tbl") - checkSparkAnswerAndOperator( - s"select round(cast(${n} as decimal(20, 0)), ${s}) FROM tbl") - } - // checkSparkAnswer(s"select round(double('infinity'), ${s}) FROM tbl") - // checkSparkAnswer(s"select round(double('-infinity'), ${s}) FROM tbl") - // checkSparkAnswer(s"select round(double('NaN'), ${s}) FROM tbl") - // checkSparkAnswer( - // s"select round(double('0.000000000000000000000000000000000001'), ${s}) FROM tbl") - } - } - } - } - } - } - } - test("md5") { Seq(false, true).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { @@ -1556,25 +1468,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - // ignored: native_comet scan is no longer supported - ignore("hex") { - // https://github.com/apache/datafusion-comet/issues/1441 - assume(usingLegacyNativeCometScan) - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "hex.parquet") - // this test requires native_comet scan due to unsigned u8/u16 issue - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { - makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - withParquetTable(path.toString, "tbl") { - checkSparkAnswerAndOperator( - "SELECT hex(_1), hex(_2), hex(_3), hex(_4), hex(_5), hex(_6), hex(_7), hex(_8), hex(_9), hex(_10), hex(_11), hex(_12), hex(_13), hex(_14), hex(_15), hex(_16), hex(_17), hex(_18), hex(_19), hex(_20) FROM tbl") - } - } - } - } - } - test("unhex") { val table = "unhex_table" withTable(table) { @@ -2442,13 +2335,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { df.write.parquet(dir.toString()) } val df = spark.read.parquet(dir.toString()).select("nested1.id") - // Comet's original scan does not support structs. - // The plan will have a Comet Scan only if scan impl is native_full or native_recordbatch - if (!scanImpl.equals(CometConf.SCAN_NATIVE_COMET)) { - checkSparkAnswerAndOperator(df) - } else { - checkSparkAnswer(df) - } + checkSparkAnswerAndOperator(df) } } @@ -2474,19 +2361,10 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } val df = spark.read.parquet(dir.toString()) - // Comet's original scan does not support structs. 
- // The plan will have a Comet Scan only if scan impl is native_full or native_recordbatch - if (scanImpl != CometConf.SCAN_NATIVE_COMET) { - checkSparkAnswerAndOperator(df.select("nested1.id")) - checkSparkAnswerAndOperator(df.select("nested1.nested2")) - checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) - checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id")) - } else { - checkSparkAnswer(df.select("nested1.id")) - checkSparkAnswer(df.select("nested1.nested2")) - checkSparkAnswer(df.select("nested1.nested2.id")) - checkSparkAnswer(df.select("nested1.id", "nested1.nested2.id")) - } + checkSparkAnswerAndOperator(df.select("nested1.id")) + checkSparkAnswerAndOperator(df.select("nested1.nested2")) + checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) + checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id")) } } @@ -2512,13 +2390,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } val df = spark.read.parquet(dir.toString()).select("nested1.id") - // Comet's original scan does not support structs. - // The plan will have a Comet Scan only if scan impl is native_full or native_recordbatch - if (scanImpl != CometConf.SCAN_NATIVE_COMET) { - checkSparkAnswerAndOperator(df) - } else { - checkSparkAnswer(df) - } + checkSparkAnswerAndOperator(df) } } @@ -2595,7 +2467,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("get_struct_field with DataFusion ParquetExec - read entire struct") { - assume(!usingLegacyNativeCometScan(conf)) withTempPath { dir => // create input file with Comet disabled withSQLConf(CometConf.COMET_ENABLED.key -> "false") { @@ -2632,7 +2503,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("read array[int] from parquet") { - assume(!usingLegacyNativeCometScan(conf)) withTempPath { dir => // create input file with Comet disabled @@ -2773,55 +2643,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - // ignored: native_comet scan is no longer supported - ignore("test integral divide") { - // this test requires native_comet scan due to unsigned u8/u16 issue - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path1 = new Path(dir.toURI.toString, "test1.parquet") - val path2 = new Path(dir.toURI.toString, "test2.parquet") - makeParquetFileAllPrimitiveTypes( - path1, - dictionaryEnabled = dictionaryEnabled, - 0, - 0, - randomSize = 10000) - makeParquetFileAllPrimitiveTypes( - path2, - dictionaryEnabled = dictionaryEnabled, - 0, - 0, - randomSize = 10000) - withParquetTable(path1.toString, "tbl1") { - withParquetTable(path2.toString, "tbl2") { - checkSparkAnswerAndOperator(""" - |select - | t1._2 div t2._2, div(t1._2, t2._2), - | t1._3 div t2._3, div(t1._3, t2._3), - | t1._4 div t2._4, div(t1._4, t2._4), - | t1._5 div t2._5, div(t1._5, t2._5), - | t1._9 div t2._9, div(t1._9, t2._9), - | t1._10 div t2._10, div(t1._10, t2._10), - | t1._11 div t2._11, div(t1._11, t2._11) - | from tbl1 t1 join tbl2 t2 on t1._id = t2._id - | order by t1._id""".stripMargin) - - checkSparkAnswerAndOperator(""" - |select - | t1._12 div t2._12, div(t1._12, t2._12), - | t1._15 div t2._15, div(t1._15, t2._15), - | t1._16 div t2._16, div(t1._16, t2._16), - | t1._17 div t2._17, div(t1._17, t2._17) - | from tbl1 t1 join tbl2 t2 on t1._id = t2._id - | order by t1._id""".stripMargin) - } - } - } - 
} - } - } - test("ANSI support for add") { val data = Seq((Integer.MAX_VALUE, 1), (Integer.MIN_VALUE, -1)) withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala index 191ebd908a..3674887f80 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala @@ -29,9 +29,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase { for (col <- df.schema.fields.filterNot(f => isComplexType(f.dataType)).map(_.name)) { val sql = s"SELECT count(distinct $col) FROM t1" val (_, cometPlan) = checkSparkAnswer(sql) - if (!usingLegacyNativeCometScan) { - assert(1 == collectNativeScans(cometPlan).length) - } + assert(1 == collectNativeScans(cometPlan).length) checkSparkAnswerAndOperator(sql) } @@ -45,9 +43,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase { for (col <- df.schema.fields.filter(f => isComplexType(f.dataType)).map(_.name)) { val sql = s"SELECT count(distinct $col) FROM t1" val (_, cometPlan) = checkSparkAnswer(sql) - if (!usingLegacyNativeCometScan) { - assert(1 == collectNativeScans(cometPlan).length) - } + assert(1 == collectNativeScans(cometPlan).length) } } @@ -57,9 +53,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase { for (col <- df.schema.fields.filterNot(f => isComplexType(f.dataType)).map(_.name)) { val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1, c2, c3" val (_, cometPlan) = checkSparkAnswer(sql) - if (!usingLegacyNativeCometScan) { - assert(1 == collectNativeScans(cometPlan).length) - } + assert(1 == collectNativeScans(cometPlan).length) checkSparkAnswerAndOperator(sql) } @@ -73,9 +67,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase { for (col <- df.schema.fields.filter(f => isComplexType(f.dataType)).map(_.name)) { val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1, c2, c3" val (_, cometPlan) = checkSparkAnswer(sql) - if (!usingLegacyNativeCometScan) { - assert(1 == collectNativeScans(cometPlan).length) - } + assert(1 == collectNativeScans(cometPlan).length) } } @@ -87,9 +79,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase { for (col <- df.columns) { val sql = s"SELECT c1, c2, c3, count(distinct $col, c4, c5) FROM t1 group by c1, c2, c3" val (_, cometPlan) = checkSparkAnswer(sql) - if (!usingLegacyNativeCometScan) { - assert(1 == collectNativeScans(cometPlan).length) - } + assert(1 == collectNativeScans(cometPlan).length) } } @@ -99,9 +89,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase { for (col <- df.columns) { val sql = s"SELECT $col, count(*) FROM t1 GROUP BY $col ORDER BY $col" val (_, cometPlan) = checkSparkAnswer(sql) - if (!usingLegacyNativeCometScan) { - assert(1 == collectNativeScans(cometPlan).length) - } + assert(1 == collectNativeScans(cometPlan).length) } } @@ -112,9 +100,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase { for (col <- df.columns.drop(1)) { val sql = s"SELECT $groupCol, count($col) FROM t1 GROUP BY $groupCol ORDER BY $groupCol" val (_, cometPlan) = checkSparkAnswer(sql) - if (!usingLegacyNativeCometScan) { - assert(1 == collectNativeScans(cometPlan).length) - } + assert(1 == collectNativeScans(cometPlan).length) } } @@ -126,9 +112,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase { val sql = s"SELECT $groupCol, count(${otherCol.mkString(", ")}) FROM t1 " + s"GROUP BY $groupCol ORDER BY 
$groupCol" val (_, cometPlan) = checkSparkAnswer(sql) - if (!usingLegacyNativeCometScan) { - assert(1 == collectNativeScans(cometPlan).length) - } + assert(1 == collectNativeScans(cometPlan).length) } test("min/max aggregate") { @@ -138,9 +122,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase { // cannot run fully native due to HashAggregate val sql = s"SELECT min($col), max($col) FROM t1" val (_, cometPlan) = checkSparkAnswer(sql) - if (!usingLegacyNativeCometScan) { - assert(1 == collectNativeScans(cometPlan).length) - } + assert(1 == collectNativeScans(cometPlan).length) } } } diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index fe6032414e..c1d7d8e72e 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -37,33 +37,21 @@ class CometFuzzTestSuite extends CometFuzzTestBase { val df = spark.read.parquet(filename) df.createOrReplaceTempView("t1") val sql = "SELECT * FROM t1" - if (!usingLegacyNativeCometScan) { - checkSparkAnswerAndOperator(sql) - } else { - checkSparkAnswer(sql) - } + checkSparkAnswerAndOperator(sql) } test("select * with deeply nested complex types") { val df = spark.read.parquet(complexTypesFilename) df.createOrReplaceTempView("t1") val sql = "SELECT * FROM t1" - if (CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_COMET) { - checkSparkAnswerAndOperator(sql) - } else { - checkSparkAnswer(sql) - } + checkSparkAnswerAndOperator(sql) } test("select * with limit") { val df = spark.read.parquet(filename) df.createOrReplaceTempView("t1") val sql = "SELECT * FROM t1 LIMIT 500" - if (!usingLegacyNativeCometScan) { - checkSparkAnswerAndOperator(sql) - } else { - checkSparkAnswer(sql) - } + checkSparkAnswerAndOperator(sql) } test("select column with default value") { @@ -112,11 +100,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase { s"alter table t2 add column col2 $defaultValueType default $defaultValueString") // Verify that our default value matches Spark's answer val sql = "select col2 from t2" - if (!usingLegacyNativeCometScan) { - checkSparkAnswerAndOperator(sql) - } else { - checkSparkAnswer(sql) - } + checkSparkAnswerAndOperator(sql) // Verify that our default value matches what we originally selected out of t1. 
if (defaultValueType == "BINARY") { assert( @@ -139,9 +123,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase { val sql = s"SELECT $col FROM t1 ORDER BY $col" // cannot run fully natively due to range partitioning and sort val (_, cometPlan) = checkSparkAnswer(sql) - if (!usingLegacyNativeCometScan) { - assert(1 == collectNativeScans(cometPlan).length) - } + assert(1 == collectNativeScans(cometPlan).length) } } @@ -152,9 +134,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase { val sql = s"SELECT $allCols FROM t1 ORDER BY $allCols" // cannot run fully natively due to range partitioning and sort val (_, cometPlan) = checkSparkAnswer(sql) - if (!usingLegacyNativeCometScan) { - assert(1 == collectNativeScans(cometPlan).length) - } + assert(1 == collectNativeScans(cometPlan).length) } test("order by random columns") { @@ -186,18 +166,12 @@ class CometFuzzTestSuite extends CometFuzzTestBase { // check for Comet shuffle val plan = df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan val cometShuffleExchanges = collectCometShuffleExchanges(plan) - val expectedNumCometShuffles = CometConf.COMET_NATIVE_SCAN_IMPL.get() match { - case CometConf.SCAN_NATIVE_COMET => - // native_comet does not support reading complex types + val expectedNumCometShuffles = CometConf.COMET_SHUFFLE_MODE.get() match { + case "jvm" => + 1 + case "native" => + // native shuffle does not support complex types as partitioning keys 0 - case _ => - CometConf.COMET_SHUFFLE_MODE.get() match { - case "jvm" => - 1 - case "native" => - // native shuffle does not support complex types as partitioning keys - 0 - } } assert(cometShuffleExchanges.length == expectedNumCometShuffles) } @@ -207,22 +181,14 @@ class CometFuzzTestSuite extends CometFuzzTestBase { val df = spark.read.parquet(filename) val df2 = df.repartition(8, df.col("c0")).sort("c1") df2.collect() - if (!usingLegacyNativeCometScan) { - val cometShuffles = collectCometShuffleExchanges(df2.queryExecution.executedPlan) - val expectedNumCometShuffles = CometConf.COMET_NATIVE_SCAN_IMPL.get() match { - case CometConf.SCAN_NATIVE_COMET => - // native_comet does not support reading complex types - 0 - case _ => - CometConf.COMET_SHUFFLE_MODE.get() match { - case "jvm" => - 1 - case "native" => - 2 - } - } - assert(cometShuffles.length == expectedNumCometShuffles) + val cometShuffles = collectCometShuffleExchanges(df2.queryExecution.executedPlan) + val expectedNumCometShuffles = CometConf.COMET_SHUFFLE_MODE.get() match { + case "jvm" => + 1 + case "native" => + 2 } + assert(cometShuffles.length == expectedNumCometShuffles) } test("join") { @@ -233,9 +199,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase { // cannot run fully native due to HashAggregate val sql = s"SELECT count(*) FROM t1 JOIN t2 ON t1.$col = t2.$col" val (_, cometPlan) = checkSparkAnswer(sql) - if (!usingLegacyNativeCometScan) { - assert(2 == collectNativeScans(cometPlan).length) - } + assert(2 == collectNativeScans(cometPlan).length) } } diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index d35eeb0b60..03db26e566 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -33,7 +33,6 @@ import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOpti class CometMapExpressionSuite extends CometTestBase { test("read map[int, int] from parquet") { - 
assume(!usingLegacyNativeCometScan(conf)) withTempPath { dir => // create input file with Comet disabled @@ -65,7 +64,6 @@ class CometMapExpressionSuite extends CometTestBase { // repro for https://github.com/apache/datafusion-comet/issues/1754 test("read map[struct, struct] from parquet") { - assume(!usingLegacyNativeCometScan(conf)) withTempPath { dir => // create input file with Comet disabled @@ -224,14 +222,7 @@ class CometMapExpressionSuite extends CometTestBase { } test("map_from_entries - fallback for binary type") { - def fallbackReason(reason: String) = { - if (CometConf.COMET_NATIVE_SCAN_IMPL.key == CometConf.SCAN_NATIVE_COMET || sys.env - .getOrElse("COMET_PARQUET_SCAN_IMPL", "") == CometConf.SCAN_NATIVE_COMET) { - "Unsupported schema" - } else { - reason - } - } + def fallbackReason(reason: String) = reason val table = "t2" withTable(table) { sql( diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index ed204ef776..f632e10df0 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -608,67 +608,6 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar } } - test("fix: native Unsafe row accessors return incorrect results") { - // TODO byte/short issue - assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_COMET) - Seq(10, 201).foreach { numPartitions => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllPrimitiveTypes(path, false, 10000, 10010) - // TODO: revisit this when we have resolution of https://github.com/apache/arrow-rs/issues/7040 - // and https://github.com/apache/arrow-rs/issues/7097 - val fieldsToTest = - if (!usingLegacyNativeCometScan(conf)) { - Seq( - $"_1", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_11", - $"_12", - $"_13", - $"_14", - $"_15", - $"_16", - $"_17", - $"_18", - $"_19", - $"_20") - } else { - Seq( - $"_1", - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_12", - $"_13", - $"_14", - $"_15", - $"_16", - $"_17", - $"_18", - $"_19", - $"_20") - } - fieldsToTest.foreach { col => - readParquetFile(path.toString) { df => - val shuffled = df.select(col).repartition(numPartitions, col) - checkShuffleAnswer(shuffled, 1) - } - } - } - } - } - test("fix: StreamReader should always set useDecimal128 as true") { Seq(10, 201).foreach { numPartitions => withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index bcbbdb7f92..91a02d69f7 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -382,41 +382,6 @@ class CometExecSuite extends CometTestBase { } } - // ignored: native_comet scan is no longer supported - ignore("ReusedExchangeExec should work on CometBroadcastExchangeExec with V2 scan") { - withSQLConf( - CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET, - SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", - SQLConf.USE_V1_SOURCE_LIST.key -> "") { - withTempPath { path => - spark - .range(5) - .withColumn("p", $"id" % 2) - .write - .mode("overwrite") - .partitionBy("p") 
- .parquet(path.toString) - withTempView("t") { - spark.read.parquet(path.toString).createOrReplaceTempView("t") - val df = sql(""" - |SELECT t1.id, t2.id, t3.id - |FROM t AS t1 - |JOIN t AS t2 ON t2.id = t1.id - |JOIN t AS t3 ON t3.id = t2.id - |WHERE t1.p = 1 AND t2.p = 1 AND t3.p = 1 - |""".stripMargin) - val reusedPlan = ReuseExchangeAndSubquery.apply(df.queryExecution.executedPlan) - val reusedExchanges = collect(reusedPlan) { case r: ReusedExchangeExec => - r - } - assert(reusedExchanges.size == 1) - assert(reusedExchanges.head.child.isInstanceOf[CometBroadcastExchangeExec]) - } - } - } - } - test("CometShuffleExchangeExec logical link should be correct") { withTempView("v") { spark.sparkContext diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index 6111b9c0d4..d5a8387be7 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.Decimal import org.apache.comet.CometConf @@ -197,68 +196,6 @@ class CometJoinSuite extends CometTestBase { } } - test("HashJoin struct key") { - // https://github.com/apache/datafusion-comet/issues/1441 - assume(usingLegacyNativeCometScan) - withSQLConf( - "spark.sql.join.forceApplyShuffledHashJoin" -> "true", - SQLConf.PREFER_SORTMERGEJOIN.key -> "false", - SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { - - def manyTypes(idx: Int, v: Int) = - ( - idx, - v, - v.toLong, - v.toFloat, - v.toDouble, - v.toString, - v % 2 == 0, - v.toString.getBytes, - Decimal(v)) - - withParquetTable((0 until 10).map(i => manyTypes(i, i % 5)), "tbl_a") { - withParquetTable((0 until 10).map(i => manyTypes(i, i % 10)), "tbl_b") { - // Full join: struct key - val df1 = - sql( - "SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " + - "ON named_struct('1', tbl_a._2) = named_struct('1', tbl_b._1)") - checkSparkAnswerAndOperator(df1) - - // Full join: struct key with nulls - val df2 = - sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " + - "ON IF(tbl_a._1 > 5, named_struct('2', tbl_a._2), NULL) = IF(tbl_b._2 > 5, named_struct('2', tbl_b._1), NULL)") - checkSparkAnswerAndOperator(df2) - - // Full join: struct key with nulls in the struct - val df3 = - sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " + - "ON named_struct('2', IF(tbl_a._1 > 5, tbl_a._2, NULL)) = named_struct('2', IF(tbl_b._2 > 5, tbl_b._1, NULL))") - checkSparkAnswerAndOperator(df3) - - // Full join: nested structs - val df4 = - sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " + - "ON named_struct('1', named_struct('2', tbl_a._2)) = named_struct('1', named_struct('2', tbl_b._1))") - checkSparkAnswerAndOperator(df4) - - val columnCount = manyTypes(0, 0).productArity - def key(tbl: String) = - (1 to columnCount).map(i => s"${tbl}._$i").mkString("struct(", ", ", ")") - // Using several different types in the struct key - val df5 = - sql( - "SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b " + - s"ON ${key("tbl_a")} = ${key("tbl_b")}") - checkSparkAnswerAndOperator(df5) - } - } - } - } - 
test("HashJoin with join filter") { withSQLConf( SQLConf.PREFER_SORTMERGEJOIN.key -> "false", diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 4a049afbf0..b9caa94308 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -23,7 +23,6 @@ import java.io.File import java.math.{BigDecimal, BigInteger} import java.time.{ZoneId, ZoneOffset} -import scala.collection.mutable.ListBuffer import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag import scala.util.control.Breaks.breakable @@ -37,7 +36,7 @@ import org.apache.parquet.schema.MessageTypeParser import org.apache.spark.SparkException import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec} +import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.internal.SQLConf @@ -46,8 +45,6 @@ import org.apache.spark.sql.types._ import com.google.common.primitives.UnsignedLong import org.apache.comet.CometConf -import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus -import org.apache.comet.rules.CometScanTypeChecker abstract class ParquetReadSuite extends CometTestBase { import testImplicits._ @@ -82,88 +79,6 @@ abstract class ParquetReadSuite extends CometTestBase { } } - // ignored: native_comet scan is no longer supported - ignore("unsupported Spark types") { - // TODO this test is not correctly implemented for scan implementations other than SCAN_NATIVE_COMET - // https://github.com/apache/datafusion-comet/issues/2188 - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { - // for native iceberg compat, CometScanExec supports some types that native_comet does not. 
- // note that native_datafusion does not use CometScanExec so we need not include that in - // the check - val isDataFusionScan = !usingLegacyNativeCometScan(conf) - Seq( - NullType -> false, - BooleanType -> true, - ByteType -> true, - ShortType -> true, - IntegerType -> true, - LongType -> true, - FloatType -> true, - DoubleType -> true, - BinaryType -> true, - StringType -> true, - // Timestamp here arbitrary for picking a concrete data type to from ArrayType - // Any other type works - ArrayType(TimestampType) -> isDataFusionScan, - StructType( - Seq( - StructField("f1", DecimalType.SYSTEM_DEFAULT), - StructField("f2", StringType))) -> isDataFusionScan, - MapType(keyType = LongType, valueType = DateType) -> isDataFusionScan, - StructType( - Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> isDataFusionScan, - MapType(keyType = IntegerType, valueType = BinaryType) -> isDataFusionScan) - .foreach { case (dt, expected) => - val fallbackReasons = new ListBuffer[String]() - // TODO CometScanTypeChecker should only be used for ParquetReadSuiteV1Suite - assert( - CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) - .isTypeSupported(dt, "", fallbackReasons) == expected) - // usingDataFusionParquetExec does not support CometBatchScanExec yet - // TODO CometBatchScanExec should only be used for ParquetReadSuiteV2Suite - if (!isDataFusionScan) { - assert(CometBatchScanExec.isTypeSupported(dt, "", fallbackReasons) == expected) - } - } - } - } - - // ignored: native_comet scan is no longer supported - ignore("unsupported Spark schema") { - // TODO this test is not correctly implemented for scan implementations other than SCAN_NATIVE_COMET - // https://github.com/apache/datafusion-comet/issues/2188 - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { - val schemaDDLs = - Seq( - "f1 int, f2 boolean", - "f1 int, f2 array", - "f1 map, f2 array") - .map(s => StructType.fromDDL(s)) - - // Arrays support for iceberg compat native and for Parquet V1 - val cometScanExecSupported = - if (!usingLegacyNativeCometScan(conf) && this.isInstanceOf[ParquetReadV1Suite]) - Seq(true, true, true) - else Seq(true, false, false) - - val cometBatchScanExecSupported = Seq(true, false, false) - val fallbackReasons = new ListBuffer[String]() - - // TODO CometScanTypeChecker should only be used for ParquetReadSuiteV1Suite - schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) => - assert( - CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) - .isSchemaSupported(StructType(schema), fallbackReasons) == expected) - } - - // TODO CometBatchScanExec should only be used for ParquetReadSuiteV2Suite - schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema, expected) => - assert( - CometBatchScanExec.isSchemaSupported(StructType(schema), fallbackReasons) == expected) - } - } - } - test("simple count") { withParquetTable((0 until 10).map(i => (i, i.toString)), "tbl") { assert(sql("SELECT * FROM tbl WHERE _1 % 2 == 0").count() == 5) @@ -367,118 +282,6 @@ abstract class ParquetReadSuite extends CometTestBase { checkParquetFile(data) } - // ignored: native_comet scan is no longer supported - ignore("test multiple pages with different sizes and nulls") { - def makeRawParquetFile( - path: Path, - dictionaryEnabled: Boolean, - n: Int, - pageSize: Int): Seq[Option[Int]] = { - val schemaStr = { - """ - |message root { - | optional boolean _1; - | optional int32 _2(INT_8); - | optional int32 _3(INT_16); - | optional int32 _4; - | optional 
int64 _5; - | optional float _6; - | optional double _7; - | optional binary _8(UTF8); - | optional int32 _9(UINT_8); - | optional int32 _10(UINT_16); - | optional int32 _11(UINT_32); - | optional int64 _12(UINT_64); - | optional binary _13(ENUM); - | optional FIXED_LEN_BYTE_ARRAY(3) _14; - |} - """.stripMargin - } - - val schema = MessageTypeParser.parseMessageType(schemaStr) - val writer = createParquetWriter( - schema, - path, - dictionaryEnabled = dictionaryEnabled, - pageSize = pageSize, - dictionaryPageSize = pageSize) - - val rand = new scala.util.Random(42) - val expected = (0 until n).map { i => - if (rand.nextBoolean()) { - None - } else { - Some(i) - } - } - expected.foreach { opt => - val record = new SimpleGroup(schema) - opt match { - case Some(i) => - record.add(0, i % 2 == 0) - record.add(1, i.toByte) - record.add(2, i.toShort) - record.add(3, i) - record.add(4, i.toLong) - record.add(5, i.toFloat) - record.add(6, i.toDouble) - record.add(7, i.toString * 48) - record.add(8, (-i).toByte) - record.add(9, (-i).toShort) - record.add(10, -i) - record.add(11, (-i).toLong) - record.add(12, i.toString) - record.add(13, (i % 10).toString * 3) - case _ => - } - writer.write(record) - } - - writer.close() - expected - } - - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { - Seq(64, 128, 256, 512, 1024, 4096, 5000).foreach { pageSize => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "part-r-0.parquet") - val expected = makeRawParquetFile(path, dictionaryEnabled = false, 10000, pageSize) - readParquetFile(path.toString) { df => - checkAnswer( - df, - expected.map { - case None => - Row(null, null, null, null, null, null, null, null, null, null, null, null, - null, null) - case Some(i) => - val flba_field = Array.fill(3)(i % 10 + 48) // char '0' is 48 in ascii - Row( - i % 2 == 0, - i.toByte, - i.toShort, - i, - i.toLong, - i.toFloat, - i.toDouble, - i.toString * 48, - (-i).toByte, - (-i).toShort, - java.lang.Integer.toUnsignedLong(-i), - new BigDecimal(UnsignedLong.fromLongBits((-i).toLong).bigIntegerValue()), - i.toString, - flba_field) - }) - } - readParquetFile(path.toString) { df => - assert( - df.filter("_8 IS NOT NULL AND _4 % 256 == 255").count() == - expected.flatten.count(_ % 256 == 255)) - } - } - } - } - } - test("vector reloading with all non-null values") { def makeRawParquetFile( path: Path, @@ -1274,61 +1077,6 @@ abstract class ParquetReadSuite extends CometTestBase { } } - // ignored: native_comet scan is no longer supported - ignore("scan metrics") { - - val cometScanMetricNames = Seq( - "ParquetRowGroups", - "ParquetNativeDecodeTime", - "ParquetNativeLoadTime", - "ParquetLoadRowGroupTime", - "ParquetInputFileReadTime", - "ParquetInputFileReadSize", - "ParquetInputFileReadThroughput") - - val cometNativeScanMetricNames = Seq( - "time_elapsed_scanning_total", - "bytes_scanned", - "output_rows", - "time_elapsed_opening", - "time_elapsed_processing", - "time_elapsed_scanning_until_data") - - withParquetTable((0 until 10000).map(i => (i, i.toDouble)), "tbl") { - // TODO need to implement metrics for SCAN_NATIVE_ICEBERG_COMPAT - // https://github.com/apache/datafusion-comet/issues/1882 - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { - val df = sql("SELECT * FROM tbl WHERE _1 > 0") - val scans = df.queryExecution.executedPlan collect { - case s: CometScanExec => s - case s: CometBatchScanExec => s - case s: CometNativeScanExec => s - } - assert(scans.size == 1, s"Expect one scan node but 
found ${scans.size}") - val metrics = scans.head.metrics - - val metricNames = scans.head match { - case _: CometNativeScanExec => cometNativeScanMetricNames - case s: CometScanExec if s.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT => - cometNativeScanMetricNames - case _ => cometScanMetricNames - } - - metricNames.foreach { metricName => - assert(metrics.contains(metricName), s"metric $metricName was not found") - } - - df.collect() - - metricNames.foreach { metricName => - assert( - metrics(metricName).value > 0, - s"Expect metric value for $metricName to be positive") - } - } - } - } - test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") { // In this test, data is encoded using Parquet page v2 format, but with PLAIN encoding checkAnswer( @@ -1443,25 +1191,6 @@ abstract class ParquetReadSuite extends CometTestBase { } } - test("row group skipping doesn't overflow when reading into larger type") { - // Spark 4.0 no longer fails for widening types SPARK-40876 - // https://github.com/apache/spark/commit/3361f25dc0ff6e5233903c26ee105711b79ba967 - assume(!isSpark40Plus && usingLegacyNativeCometScan(conf)) - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - // Reading integer 'a' as a long isn't supported. Check that an exception is raised instead - // of incorrectly skipping the single row group and producing incorrect results. - val exception = intercept[SparkException] { - spark.read - .schema("a LONG") - .parquet(path.toString) - .where(s"a < ${Long.MaxValue}") - .collect() - } - assert(exception.getMessage.contains("Column: [a], Expected: bigint, Found: INT32")) - } - } - test("test merge scan range") { def makeRawParquetFile(path: Path, n: Int): Seq[Option[Int]] = { val dictionaryPageSize = 1024 @@ -1845,39 +1574,3 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper { } } - -// ignored: native_comet scan is no longer supported -class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper { - override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit - pos: Position): Unit = { - super.ignore(testName, testTags: _*)( - withSQLConf( - SQLConf.USE_V1_SOURCE_LIST.key -> "", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { - testFun - })(pos) - } - - override def checkParquetScan[T <: Product: ClassTag: TypeTag]( - data: Seq[T], - f: Row => Boolean = _ => true): Unit = { - withParquetDataFrame(data) { r => - val scans = collect(r.filter(f).queryExecution.executedPlan) { case p: CometBatchScanExec => - p.scan - } - assert(scans.isEmpty) - } - } - - // ignored: native_comet scan is no longer supported - ignore("Test V2 parquet scan uses respective scanner") { - Seq(("false", "BatchScan"), ("true", "CometBatchScan")).foreach { - case (cometEnabled, expectedScanner) => - testScanner( - cometEnabled, - CometConf.SCAN_NATIVE_COMET, - scanner = expectedScanner, - v1 = None) - } - } -} diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala index 18dec68171..6214513f48 100644 --- a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala @@ -25,8 +25,6 @@ import org.apache.spark.sql._ import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.QueryStageExec -import 
org.apache.spark.sql.execution.datasources.v2.BatchScanExec -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.comet.CometConf @@ -100,43 +98,6 @@ class CometScanRuleSuite extends CometTestBase { } } - // ignored: native_comet scan is no longer supported - ignore("CometScanRule should replace V2 BatchScanExec, but only when Comet is enabled") { - withTempPath { path => - createTestDataFrame.write.parquet(path.toString) - withTempView("test_data") { - withSQLConf( - SQLConf.USE_V1_SOURCE_LIST.key -> "", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { - spark.read.parquet(path.toString).createOrReplaceTempView("test_data") - - val sparkPlan = - createSparkPlan( - spark, - "SELECT id, id * 2 as doubled FROM test_data WHERE id % 2 == 0") - - // Count original Spark operators - assert(countOperators(sparkPlan, classOf[BatchScanExec]) == 1) - - for (cometEnabled <- Seq(true, false)) { - withSQLConf(CometConf.COMET_ENABLED.key -> cometEnabled.toString) { - - val transformedPlan = applyCometScanRule(sparkPlan) - - if (cometEnabled) { - assert(countOperators(transformedPlan, classOf[BatchScanExec]) == 0) - assert(countOperators(transformedPlan, classOf[CometBatchScanExec]) == 1) - } else { - assert(countOperators(transformedPlan, classOf[BatchScanExec]) == 1) - assert(countOperators(transformedPlan, classOf[CometBatchScanExec]) == 0) - } - } - } - } - } - } - } - test("CometScanRule should fallback to Spark for ShortType when safety check enabled") { withTempPath { path => // Create test data with ShortType which may be from unsigned UINT_8 diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index f831d53bfe..33c1d444b9 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -1267,13 +1267,7 @@ abstract class CometTestBase writer.close() } - def usingLegacyNativeCometScan: Boolean = usingLegacyNativeCometScan(SQLConf.get) - - def usingLegacyNativeCometScan(conf: SQLConf): Boolean = - CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_COMET - def hasUnsignedSmallIntSafetyCheck(conf: SQLConf): Boolean = { - !usingLegacyNativeCometScan(conf) && CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get(conf) } diff --git a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala index 70119f44a7..5dd956116f 100644 --- a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala +++ b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, ToPrettyString} import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.types.DataTypes -import org.apache.comet.{CometConf, CometFuzzTestBase} +import org.apache.comet.CometFuzzTestBase import org.apache.comet.expressions.{CometCast, CometEvalMode} import org.apache.comet.serde.Compatible @@ -48,9 +48,7 @@ class CometToPrettyStringSuite extends CometFuzzTestBase { Some(spark.sessionState.conf.sessionLocalTimeZone), CometEvalMode.TRY) supportLevel match { - case _: Compatible - if CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_COMET => - checkSparkAnswerAndOperator(result) + case _: Compatible => 
checkSparkAnswerAndOperator(result)
         case _ => checkSparkAnswer(result)
       }
     }
diff --git a/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala
index b0f40edf76..e7f1757bf6 100644
--- a/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala
+++ b/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.BinaryOutputStyle
 import org.apache.spark.sql.types.DataTypes
 
-import org.apache.comet.{CometConf, CometFuzzTestBase}
+import org.apache.comet.CometFuzzTestBase
 import org.apache.comet.expressions.{CometCast, CometEvalMode}
 import org.apache.comet.serde.Compatible
 
@@ -59,9 +59,7 @@ class CometToPrettyStringSuite extends CometFuzzTestBase {
           Some(spark.sessionState.conf.sessionLocalTimeZone),
           CometEvalMode.TRY)
       supportLevel match {
-        case _: Compatible
-            if CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_COMET =>
-          checkSparkAnswerAndOperator(result)
+        case _: Compatible => checkSparkAnswerAndOperator(result)
         case _ => checkSparkAnswer(result)
       }
     }

From ee303663dd3d5ce999e51d08e134bb5c73da135a Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 11 Mar 2026 13:54:50 -0600
Subject: [PATCH 2/2] -trigger CI
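
Migration note: after PATCH 1/2, `native_comet` is no longer a supported scan
implementation; jobs that pinned it should move to `auto`, or pin
`native_iceberg_compat` or `native_datafusion` explicitly. Below is a minimal
sketch of the one-line config change, not part of either commit. It assumes
the key strings `spark.comet.enabled` and `spark.comet.scan.impl` (the usual
values behind `CometConf.COMET_ENABLED.key` and
`CometConf.COMET_NATIVE_SCAN_IMPL.key`) and the plugin class
`org.apache.spark.CometPlugin` from the Comet installation docs; verify all
three against the Comet version in use.

    import org.apache.spark.sql.SparkSession

    object CometScanImplMigration {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("comet-scan-impl-migration")
          // Assumed plugin class and config keys; check your Comet version.
          .config("spark.plugins", "org.apache.spark.CometPlugin")
          .config("spark.comet.enabled", "true")
          // "native_comet" is no longer supported after this patch; "auto"
          // lets Comet choose between native_iceberg_compat and
          // native_datafusion on a per-scan basis.
          .config("spark.comet.scan.impl", "auto")
          .getOrCreate()

        // Parquet reads now go through one of the remaining native scans,
        // or fall back to Spark when the schema is unsupported.
        spark.read.parquet("/path/to/data.parquet").show()
        spark.stop()
      }
    }

CI environments that exported COMET_PARQUET_SCAN_IMPL=native_comet need the
same substitution, since the helpers that special-cased that value
(`usingLegacyNativeCometScan` and the "Unsupported schema" fallback-reason
branches) are removed by this patch.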