From fa181882ba2c734b938cde3640c3fc4ae367dfbc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 17 Aug 2025 08:54:48 -0600 Subject: [PATCH 1/9] stop setting scan impl to native_comet in CometTestBase --- spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index cf11bdf590..35fb0681a8 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -82,10 +82,6 @@ abstract class CometTestBase conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true") conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true") conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true") - // set the scan impl to SCAN_NATIVE_COMET because many tests are implemented - // with the assumption that this is the default and would need updating if we - // change the default - conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, CometConf.SCAN_NATIVE_COMET) conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true") conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g") conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true") From 9e501d81214fa26dbe4ae275e947472f0575135c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 17 Aug 2025 09:28:39 -0600 Subject: [PATCH 2/9] Update tests to explicitly choose native_comet scan --- .../org/apache/comet/CometFuzzTestSuite.scala | 5 +- .../comet/parquet/ParquetReadSuite.scala | 273 +++++++++--------- .../sql/comet/CometPlanStabilitySuite.scala | 1 + .../comet/ParquetDatetimeRebaseSuite.scala | 3 + 4 files changed, 148 insertions(+), 134 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index 217cd322dd..ed250e141c 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -374,7 +374,10 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit pos: Position): Unit = { Seq("native", "jvm").foreach { shuffleMode => - Seq("native_comet", "native_datafusion", "native_iceberg_compat").foreach { scanImpl => + Seq( + CometConf.SCAN_NATIVE_COMET, + CometConf.SCAN_NATIVE_DATAFUSION, + CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { scanImpl => super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) { withSQLConf( CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl, diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 2574e12e13..35cb57c1da 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -86,67 +86,74 @@ abstract class ParquetReadSuite extends CometTestBase { } test("unsupported Spark types") { - // for native iceberg compat, CometScanExec supports some types that native_comet does not. - // note that native_datafusion does not use CometScanExec so we need not include that in - // the check - val isDataFusionScan = usingDataSourceExec(conf) - Seq( - NullType -> false, - BooleanType -> true, - ByteType -> true, - ShortType -> true, - IntegerType -> true, - LongType -> true, - FloatType -> true, - DoubleType -> true, - BinaryType -> true, - StringType -> true, - // Timestamp here arbitrary for picking a concrete data type to from ArrayType - // Any other type works - ArrayType(TimestampType) -> isDataFusionScan, - StructType( - Seq( - StructField("f1", DecimalType.SYSTEM_DEFAULT), - StructField("f2", StringType))) -> isDataFusionScan, - MapType(keyType = LongType, valueType = DateType) -> isDataFusionScan, - StructType( - Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> isDataFusionScan, - MapType(keyType = IntegerType, valueType = BinaryType) -> isDataFusionScan) - .foreach { case (dt, expected) => - val fallbackReasons = new ListBuffer[String]() - assert( - CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) - .isTypeSupported(dt, "", fallbackReasons) == expected) - // usingDataFusionParquetExec does not support CometBatchScanExec yet - if (!isDataFusionScan) { - assert(CometBatchScanExec.isTypeSupported(dt, "", fallbackReasons) == expected) + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { + // for native iceberg compat, CometScanExec supports some types that native_comet does not. + // note that native_datafusion does not use CometScanExec so we need not include that in + // the check + val isDataFusionScan = usingDataSourceExec(conf) + Seq( + NullType -> false, + BooleanType -> true, + ByteType -> true, + ShortType -> true, + IntegerType -> true, + LongType -> true, + FloatType -> true, + DoubleType -> true, + BinaryType -> true, + StringType -> true, + // Timestamp here arbitrary for picking a concrete data type to from ArrayType + // Any other type works + ArrayType(TimestampType) -> isDataFusionScan, + StructType( + Seq( + StructField("f1", DecimalType.SYSTEM_DEFAULT), + StructField("f2", StringType))) -> isDataFusionScan, + MapType(keyType = LongType, valueType = DateType) -> isDataFusionScan, + StructType( + Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> isDataFusionScan, + MapType(keyType = IntegerType, valueType = BinaryType) -> isDataFusionScan) + .foreach { case (dt, expected) => + val fallbackReasons = new ListBuffer[String]() + assert( + CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) + .isTypeSupported(dt, "", fallbackReasons) == expected) + // usingDataFusionParquetExec does not support CometBatchScanExec yet + if (!isDataFusionScan) { + assert(CometBatchScanExec.isTypeSupported(dt, "", fallbackReasons) == expected) + } } - } + } } test("unsupported Spark schema") { - val schemaDDLs = - Seq("f1 int, f2 boolean", "f1 int, f2 array", "f1 map, f2 array") - .map(s => StructType.fromDDL(s)) - - // Arrays support for iceberg compat native and for Parquet V1 - val cometScanExecSupported = - if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite]) - Seq(true, true, true) - else Seq(true, false, false) - - val cometBatchScanExecSupported = Seq(true, false, false) - val fallbackReasons = new ListBuffer[String]() - - schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) => - assert( - CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) - .isSchemaSupported(StructType(schema), fallbackReasons) == expected) - } + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { + val schemaDDLs = + Seq( + "f1 int, f2 boolean", + "f1 int, f2 array", + "f1 map, f2 array") + .map(s => StructType.fromDDL(s)) + + // Arrays support for iceberg compat native and for Parquet V1 + val cometScanExecSupported = + if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite]) + Seq(true, true, true) + else Seq(true, false, false) + + val cometBatchScanExecSupported = Seq(true, false, false) + val fallbackReasons = new ListBuffer[String]() + + schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) => + assert( + CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) + .isSchemaSupported(StructType(schema), fallbackReasons) == expected) + } - schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema, expected) => - assert( - CometBatchScanExec.isSchemaSupported(StructType(schema), fallbackReasons) == expected) + schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema, expected) => + assert( + CometBatchScanExec.isSchemaSupported(StructType(schema), fallbackReasons) == expected) + } } } @@ -354,8 +361,6 @@ abstract class ParquetReadSuite extends CometTestBase { } test("test multiple pages with different sizes and nulls") { - // https://github.com/apache/datafusion-comet/issues/1441 - assume(!usingDataSourceExec) def makeRawParquetFile( path: Path, dictionaryEnabled: Boolean, @@ -425,40 +430,42 @@ abstract class ParquetReadSuite extends CometTestBase { expected } - Seq(64, 128, 256, 512, 1024, 4096, 5000).foreach { pageSize => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "part-r-0.parquet") - val expected = makeRawParquetFile(path, dictionaryEnabled = false, 10000, pageSize) - readParquetFile(path.toString) { df => - checkAnswer( - df, - expected.map { - case None => - Row(null, null, null, null, null, null, null, null, null, null, null, null, null, - null) - case Some(i) => - val flba_field = Array.fill(3)(i % 10 + 48) // char '0' is 48 in ascii - Row( - i % 2 == 0, - i.toByte, - i.toShort, - i, - i.toLong, - i.toFloat, - i.toDouble, - i.toString * 48, - (-i).toByte, - (-i).toShort, - java.lang.Integer.toUnsignedLong(-i), - new BigDecimal(UnsignedLong.fromLongBits((-i).toLong).bigIntegerValue()), - i.toString, - flba_field) - }) - } - readParquetFile(path.toString) { df => - assert( - df.filter("_8 IS NOT NULL AND _4 % 256 == 255").count() == - expected.flatten.count(_ % 256 == 255)) + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { + Seq(64, 128, 256, 512, 1024, 4096, 5000).foreach { pageSize => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "part-r-0.parquet") + val expected = makeRawParquetFile(path, dictionaryEnabled = false, 10000, pageSize) + readParquetFile(path.toString) { df => + checkAnswer( + df, + expected.map { + case None => + Row(null, null, null, null, null, null, null, null, null, null, null, null, + null, null) + case Some(i) => + val flba_field = Array.fill(3)(i % 10 + 48) // char '0' is 48 in ascii + Row( + i % 2 == 0, + i.toByte, + i.toShort, + i, + i.toLong, + i.toFloat, + i.toDouble, + i.toString * 48, + (-i).toByte, + (-i).toShort, + java.lang.Integer.toUnsignedLong(-i), + new BigDecimal(UnsignedLong.fromLongBits((-i).toLong).bigIntegerValue()), + i.toString, + flba_field) + }) + } + readParquetFile(path.toString) { df => + assert( + df.filter("_8 IS NOT NULL AND _4 % 256 == 255").count() == + expected.flatten.count(_ % 256 == 255)) + } } } } @@ -1330,51 +1337,51 @@ abstract class ParquetReadSuite extends CometTestBase { } test("scan metrics") { - // https://github.com/apache/datafusion-comet/issues/1441 - assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT) - - val cometScanMetricNames = Seq( - "ParquetRowGroups", - "ParquetNativeDecodeTime", - "ParquetNativeLoadTime", - "ParquetLoadRowGroupTime", - "ParquetInputFileReadTime", - "ParquetInputFileReadSize", - "ParquetInputFileReadThroughput") - - val cometNativeScanMetricNames = Seq( - "time_elapsed_scanning_total", - "bytes_scanned", - "output_rows", - "time_elapsed_opening", - "time_elapsed_processing", - "time_elapsed_scanning_until_data") - - withParquetTable((0 until 10000).map(i => (i, i.toDouble)), "tbl") { - val df = sql("SELECT * FROM tbl WHERE _1 > 0") - val scans = df.queryExecution.executedPlan collect { - case s: CometScanExec => s - case s: CometBatchScanExec => s - case s: CometNativeScanExec => s - } - assert(scans.size == 1, s"Expect one scan node but found ${scans.size}") - val metrics = scans.head.metrics + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { + + val cometScanMetricNames = Seq( + "ParquetRowGroups", + "ParquetNativeDecodeTime", + "ParquetNativeLoadTime", + "ParquetLoadRowGroupTime", + "ParquetInputFileReadTime", + "ParquetInputFileReadSize", + "ParquetInputFileReadThroughput") + + val cometNativeScanMetricNames = Seq( + "time_elapsed_scanning_total", + "bytes_scanned", + "output_rows", + "time_elapsed_opening", + "time_elapsed_processing", + "time_elapsed_scanning_until_data") + + withParquetTable((0 until 10000).map(i => (i, i.toDouble)), "tbl") { + val df = sql("SELECT * FROM tbl WHERE _1 > 0") + val scans = df.queryExecution.executedPlan collect { + case s: CometScanExec => s + case s: CometBatchScanExec => s + case s: CometNativeScanExec => s + } + assert(scans.size == 1, s"Expect one scan node but found ${scans.size}") + val metrics = scans.head.metrics - val metricNames = scans.head match { - case _: CometNativeScanExec => cometNativeScanMetricNames - case _ => cometScanMetricNames - } + val metricNames = scans.head match { + case _: CometNativeScanExec => cometNativeScanMetricNames + case _ => cometScanMetricNames + } - metricNames.foreach { metricName => - assert(metrics.contains(metricName), s"metric $metricName was not found") - } + metricNames.foreach { metricName => + assert(metrics.contains(metricName), s"metric $metricName was not found") + } - df.collect() + df.collect() - metricNames.foreach { metricName => - assert( - metrics(metricName).value > 0, - s"Expect metric value for $metricName to be positive") + metricNames.foreach { metricName => + assert( + metrics(metricName).value > 0, + s"Expect metric value for $metricName to be positive") + } } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala index cf887a1013..aa80251490 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala @@ -300,6 +300,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true") + conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, CometConf.SCAN_NATIVE_COMET) conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala index a988467076..bdb4a9d4b1 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala @@ -40,6 +40,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase { test("reading ancient dates before 1582") { Seq(true, false).foreach { exceptionOnRebase => withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET, CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.key -> exceptionOnRebase.toString) { Seq("2_4_5", "2_4_6", "3_2_0").foreach { sparkVersion => @@ -65,6 +66,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase { assume(!usingDataSourceExec(conf)) Seq(true, false).foreach { exceptionOnRebase => withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET, CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.key -> exceptionOnRebase.toString) { Seq("2_4_5", "2_4_6", "3_2_0").foreach { sparkVersion => @@ -91,6 +93,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase { assume(!usingDataSourceExec(conf)) Seq(true, false).foreach { exceptionOnRebase => withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET, CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.key -> exceptionOnRebase.toString) { Seq("2_4_5", "2_4_6", "3_2_0").foreach { sparkVersion => From 314b2ef52bc7ad7f0ade01b2dd3f1699d4186a38 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 17 Aug 2025 09:35:57 -0600 Subject: [PATCH 3/9] format --- .../comet/parquet/ParquetReadSuite.scala | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 35cb57c1da..7f3450629a 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -1337,26 +1337,28 @@ abstract class ParquetReadSuite extends CometTestBase { } test("scan metrics") { - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { - val cometScanMetricNames = Seq( - "ParquetRowGroups", - "ParquetNativeDecodeTime", - "ParquetNativeLoadTime", - "ParquetLoadRowGroupTime", - "ParquetInputFileReadTime", - "ParquetInputFileReadSize", - "ParquetInputFileReadThroughput") - - val cometNativeScanMetricNames = Seq( - "time_elapsed_scanning_total", - "bytes_scanned", - "output_rows", - "time_elapsed_opening", - "time_elapsed_processing", - "time_elapsed_scanning_until_data") - - withParquetTable((0 until 10000).map(i => (i, i.toDouble)), "tbl") { + val cometScanMetricNames = Seq( + "ParquetRowGroups", + "ParquetNativeDecodeTime", + "ParquetNativeLoadTime", + "ParquetLoadRowGroupTime", + "ParquetInputFileReadTime", + "ParquetInputFileReadSize", + "ParquetInputFileReadThroughput") + + val cometNativeScanMetricNames = Seq( + "time_elapsed_scanning_total", + "bytes_scanned", + "output_rows", + "time_elapsed_opening", + "time_elapsed_processing", + "time_elapsed_scanning_until_data") + + withParquetTable((0 until 10000).map(i => (i, i.toDouble)), "tbl") { + // TODO need to implement metrics for SCAN_NATIVE_ICEBERG_COMPAT + // https://github.com/apache/datafusion-comet/issues/1882 + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { val df = sql("SELECT * FROM tbl WHERE _1 > 0") val scans = df.queryExecution.executedPlan collect { case s: CometScanExec => s From a50214218217e1fd3a4cc16630ab2f53e8062549 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 17 Aug 2025 09:43:45 -0600 Subject: [PATCH 4/9] update more tests --- .../comet/CometArrayExpressionSuite.scala | 86 ++++++++++--------- .../org/apache/comet/CometCastSuite.scala | 47 +++++----- 2 files changed, 71 insertions(+), 62 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 9976ecd748..ba25dae67a 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -232,43 +232,47 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } test("array_contains - test all types (native Parquet reader)") { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - val filename = path.toString - val random = new Random(42) - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - ParquetGenerator.makeParquetFile( - random, - spark, - filename, - 100, - DataGenOptions( - allowNull = true, - generateNegativeZero = true, - generateArray = true, - generateStruct = true, - generateMap = false)) - } - val table = spark.read.parquet(filename) - table.createOrReplaceTempView("t1") - val complexTypeFields = - table.schema.fields.filter(field => isComplexType(field.dataType)) - val primitiveTypeFields = - table.schema.fields.filterNot(field => isComplexType(field.dataType)) - for (field <- primitiveTypeFields) { - val fieldName = field.name - val typeName = field.dataType.typeName - sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1") - .createOrReplaceTempView("t2") - checkSparkAnswerAndOperator(sql("SELECT array_contains(a, b) FROM t2")) - checkSparkAnswerAndOperator( - sql(s"SELECT array_contains(a, cast(null as $typeName)) FROM t2")) - } - for (field <- complexTypeFields) { - val fieldName = field.name - sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1") - .createOrReplaceTempView("t3") - checkSparkAnswer(sql("SELECT array_contains(a, b) FROM t3")) + // TODO test fails if scan is native_iceberg_compat + // https://github.com/apache/datafusion-comet/issues/2173 + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + DataGenOptions( + allowNull = true, + generateNegativeZero = true, + generateArray = true, + generateStruct = true, + generateMap = false)) + } + val table = spark.read.parquet(filename) + table.createOrReplaceTempView("t1") + val complexTypeFields = + table.schema.fields.filter(field => isComplexType(field.dataType)) + val primitiveTypeFields = + table.schema.fields.filterNot(field => isComplexType(field.dataType)) + for (field <- primitiveTypeFields) { + val fieldName = field.name + val typeName = field.dataType.typeName + sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1") + .createOrReplaceTempView("t2") + checkSparkAnswerAndOperator(sql("SELECT array_contains(a, b) FROM t2")) + checkSparkAnswerAndOperator( + sql(s"SELECT array_contains(a, cast(null as $typeName)) FROM t2")) + } + for (field <- complexTypeFields) { + val fieldName = field.name + sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1") + .createOrReplaceTempView("t3") + checkSparkAnswer(sql("SELECT array_contains(a, b) FROM t3")) + } } } } @@ -406,9 +410,11 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } test("array_intersect") { - // https://github.com/apache/datafusion-comet/issues/1441 - assume(!usingDataSourceExec) - withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + // TODO test fails if scan is native_iceberg_compat + // https://github.com/apache/datafusion-comet/issues/2174 + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET, + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index b3fa3db8cf..d4107e13e8 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -944,28 +944,31 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { // Complex Types test("cast StructType to StringType") { - // https://github.com/apache/datafusion-comet/issues/1441 - assume(!usingDataSourceExec) - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - withParquetTable(path.toString, "tbl") { - // primitives - checkSparkAnswerAndOperator( - "SELECT CAST(struct(_1, _2, _3, _4, _5, _6, _7, _8) as string) FROM tbl") - checkSparkAnswerAndOperator("SELECT CAST(struct(_9, _10, _11, _12) as string) FROM tbl") - // decimals - // TODO add _16 when https://github.com/apache/datafusion-comet/issues/1068 is resolved - checkSparkAnswerAndOperator("SELECT CAST(struct(_15, _17) as string) FROM tbl") - // dates & timestamps - checkSparkAnswerAndOperator("SELECT CAST(struct(_18, _19, _20) as string) FROM tbl") - // named struct - checkSparkAnswerAndOperator( - "SELECT CAST(named_struct('a', _1, 'b', _2) as string) FROM tbl") - // nested struct - checkSparkAnswerAndOperator( - "SELECT CAST(named_struct('a', named_struct('b', _1, 'c', _2)) as string) FROM tbl") + // TODO test fails if scan is native_iceberg_compat + // https://github.com/apache/datafusion-comet/issues/2175 + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) + withParquetTable(path.toString, "tbl") { + // primitives + checkSparkAnswerAndOperator( + "SELECT CAST(struct(_1, _2, _3, _4, _5, _6, _7, _8) as string) FROM tbl") + checkSparkAnswerAndOperator( + "SELECT CAST(struct(_9, _10, _11, _12) as string) FROM tbl") + // decimals + // TODO add _16 when https://github.com/apache/datafusion-comet/issues/1068 is resolved + checkSparkAnswerAndOperator("SELECT CAST(struct(_15, _17) as string) FROM tbl") + // dates & timestamps + checkSparkAnswerAndOperator("SELECT CAST(struct(_18, _19, _20) as string) FROM tbl") + // named struct + checkSparkAnswerAndOperator( + "SELECT CAST(named_struct('a', _1, 'b', _2) as string) FROM tbl") + // nested struct + checkSparkAnswerAndOperator( + "SELECT CAST(named_struct('a', named_struct('b', _1, 'c', _2)) as string) FROM tbl") + } } } } From 13e62f6ae36e5e9554fbd12f79b7896e7cc7d24d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 18 Aug 2025 15:17:12 -0600 Subject: [PATCH 5/9] update comments --- .../apache/comet/CometArrayExpressionSuite.scala | 4 ++-- .../scala/org/apache/comet/CometCastSuite.scala | 2 +- .../org/apache/comet/parquet/ParquetReadSuite.scala | 13 +++++++++---- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index ba25dae67a..ccece3dde9 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -232,7 +232,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } test("array_contains - test all types (native Parquet reader)") { - // TODO test fails if scan is native_iceberg_compat + // TODO test fails if scan is auto // https://github.com/apache/datafusion-comet/issues/2173 withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { withTempDir { dir => @@ -410,7 +410,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } test("array_intersect") { - // TODO test fails if scan is native_iceberg_compat + // TODO test fails if scan is auto // https://github.com/apache/datafusion-comet/issues/2174 withSQLConf( CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET, diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index d4107e13e8..fbf38e2e03 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -944,7 +944,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { // Complex Types test("cast StructType to StringType") { - // TODO test fails if scan is native_iceberg_compat + // TODO test fails if scan is auto // https://github.com/apache/datafusion-comet/issues/2175 withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { Seq(true, false).foreach { dictionaryEnabled => diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 7f3450629a..532d62d9da 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -86,7 +86,9 @@ abstract class ParquetReadSuite extends CometTestBase { } test("unsupported Spark types") { - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { + // TODO test fails in auto scan mode + // https://github.com/apache/datafusion-comet/issues/2183 + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { // for native iceberg compat, CometScanExec supports some types that native_comet does not. // note that native_datafusion does not use CometScanExec so we need not include that in // the check @@ -115,9 +117,12 @@ abstract class ParquetReadSuite extends CometTestBase { MapType(keyType = IntegerType, valueType = BinaryType) -> isDataFusionScan) .foreach { case (dt, expected) => val fallbackReasons = new ListBuffer[String]() - assert( - CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) - .isTypeSupported(dt, "", fallbackReasons) == expected) + val isSupported = CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) + .isTypeSupported(dt, "", fallbackReasons) + if (isSupported != expected) { + fail( + s"Failed on isTypeSupported check for ${dt}; expected=$expected, actual=$isSupported") + } // usingDataFusionParquetExec does not support CometBatchScanExec yet if (!isDataFusionScan) { assert(CometBatchScanExec.isTypeSupported(dt, "", fallbackReasons) == expected) From 6b1f2a11f53cb246e32c4993506dd515bbf21e7c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 18 Aug 2025 15:21:19 -0600 Subject: [PATCH 6/9] fix --- .../apache/comet/rules/CometScanRule.scala | 4 + .../comet/parquet/ParquetReadSuite.scala | 134 ++++++++++-------- 2 files changed, 75 insertions(+), 63 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 6a328f4be2..6c7209e5f2 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -316,6 +316,10 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { } case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport { + + // this class is intended to be used with a specific scan impl + assert(scanImpl != CometConf.SCAN_AUTO) + override def isTypeSupported( dt: DataType, name: String, diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 532d62d9da..7ca225cacd 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -86,78 +86,86 @@ abstract class ParquetReadSuite extends CometTestBase { } test("unsupported Spark types") { - // TODO test fails in auto scan mode - // https://github.com/apache/datafusion-comet/issues/2183 - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { - // for native iceberg compat, CometScanExec supports some types that native_comet does not. - // note that native_datafusion does not use CometScanExec so we need not include that in - // the check - val isDataFusionScan = usingDataSourceExec(conf) - Seq( - NullType -> false, - BooleanType -> true, - ByteType -> true, - ShortType -> true, - IntegerType -> true, - LongType -> true, - FloatType -> true, - DoubleType -> true, - BinaryType -> true, - StringType -> true, - // Timestamp here arbitrary for picking a concrete data type to from ArrayType - // Any other type works - ArrayType(TimestampType) -> isDataFusionScan, - StructType( - Seq( - StructField("f1", DecimalType.SYSTEM_DEFAULT), - StructField("f2", StringType))) -> isDataFusionScan, - MapType(keyType = LongType, valueType = DateType) -> isDataFusionScan, - StructType( - Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> isDataFusionScan, - MapType(keyType = IntegerType, valueType = BinaryType) -> isDataFusionScan) - .foreach { case (dt, expected) => - val fallbackReasons = new ListBuffer[String]() - val isSupported = CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) - .isTypeSupported(dt, "", fallbackReasons) - if (isSupported != expected) { - fail( - s"Failed on isTypeSupported check for ${dt}; expected=$expected, actual=$isSupported") - } - // usingDataFusionParquetExec does not support CometBatchScanExec yet - if (!isDataFusionScan) { - assert(CometBatchScanExec.isTypeSupported(dt, "", fallbackReasons) == expected) + for (scan <- Seq( + CometConf.SCAN_NATIVE_COMET, + CometConf.SCAN_NATIVE_ICEBERG_COMPAT, + CometConf.SCAN_NATIVE_DATAFUSION)) { + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scan) { + // for native iceberg compat, CometScanExec supports some types that native_comet does not. + // note that native_datafusion does not use CometScanExec so we need not include that in + // the check + val isDataFusionScan = usingDataSourceExec(conf) + Seq( + NullType -> false, + BooleanType -> true, + ByteType -> true, + ShortType -> true, + IntegerType -> true, + LongType -> true, + FloatType -> true, + DoubleType -> true, + BinaryType -> true, + StringType -> true, + // Timestamp here arbitrary for picking a concrete data type to from ArrayType + // Any other type works + ArrayType(TimestampType) -> isDataFusionScan, + StructType( + Seq( + StructField("f1", DecimalType.SYSTEM_DEFAULT), + StructField("f2", StringType))) -> isDataFusionScan, + MapType(keyType = LongType, valueType = DateType) -> isDataFusionScan, + StructType( + Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> isDataFusionScan, + MapType(keyType = IntegerType, valueType = BinaryType) -> isDataFusionScan) + .foreach { case (dt, expected) => + val fallbackReasons = new ListBuffer[String]() + val isSupported = CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) + .isTypeSupported(dt, "", fallbackReasons) + if (isSupported != expected) { + fail( + s"Failed on isTypeSupported check for ${dt}; expected=$expected, actual=$isSupported") + } + // usingDataFusionParquetExec does not support CometBatchScanExec yet + if (!isDataFusionScan) { + assert(CometBatchScanExec.isTypeSupported(dt, "", fallbackReasons) == expected) + } } - } + } } } test("unsupported Spark schema") { - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { - val schemaDDLs = - Seq( - "f1 int, f2 boolean", - "f1 int, f2 array", - "f1 map, f2 array") - .map(s => StructType.fromDDL(s)) + for (scan <- Seq( + CometConf.SCAN_NATIVE_COMET, + CometConf.SCAN_NATIVE_ICEBERG_COMPAT, + CometConf.SCAN_NATIVE_DATAFUSION)) { + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scan) { + val schemaDDLs = + Seq( + "f1 int, f2 boolean", + "f1 int, f2 array", + "f1 map, f2 array") + .map(s => StructType.fromDDL(s)) - // Arrays support for iceberg compat native and for Parquet V1 - val cometScanExecSupported = - if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite]) - Seq(true, true, true) - else Seq(true, false, false) + // Arrays support for iceberg compat native and for Parquet V1 + val cometScanExecSupported = + if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite]) + Seq(true, true, true) + else Seq(true, false, false) - val cometBatchScanExecSupported = Seq(true, false, false) - val fallbackReasons = new ListBuffer[String]() + val cometBatchScanExecSupported = Seq(true, false, false) + val fallbackReasons = new ListBuffer[String]() - schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) => - assert( - CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) - .isSchemaSupported(StructType(schema), fallbackReasons) == expected) - } + schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) => + assert( + CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) + .isSchemaSupported(StructType(schema), fallbackReasons) == expected) + } - schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema, expected) => - assert( - CometBatchScanExec.isSchemaSupported(StructType(schema), fallbackReasons) == expected) + schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema, expected) => + assert( + CometBatchScanExec.isSchemaSupported(StructType(schema), fallbackReasons) == expected) + } } } } From 72c45d7cb2a4228e926d8e6f572599bdbb91011b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 18 Aug 2025 15:51:37 -0600 Subject: [PATCH 7/9] fix --- .../org/apache/comet/parquet/ParquetReadSuite.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 7ca225cacd..612b547eac 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -147,11 +147,12 @@ abstract class ParquetReadSuite extends CometTestBase { "f1 map, f2 array") .map(s => StructType.fromDDL(s)) - // Arrays support for iceberg compat native and for Parquet V1 - val cometScanExecSupported = - if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite]) - Seq(true, true, true) - else Seq(true, false, false) + // Arrays support for iceberg compat native + val cometScanExecSupported = if (usingDataSourceExec(conf)) { + Seq(true, true, true) + } else { + Seq(true, false, false) + } val cometBatchScanExecSupported = Seq(true, false, false) val fallbackReasons = new ListBuffer[String]() From d5048abd0013aca0071778e9ce625a6b00b23b4b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 19 Aug 2025 07:57:02 -0600 Subject: [PATCH 8/9] revert changes to two tests and make them use NATIVE_COMET for now --- .../exec/CometColumnarShuffleSuite.scala | 4 +- .../comet/parquet/ParquetReadSuite.scala | 136 +++++++++--------- 2 files changed, 66 insertions(+), 74 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index b5f074cc8d..b0b87509fb 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -587,8 +587,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar } test("fix: native Unsafe row accessors return incorrect results") { - // https://github.com/apache/datafusion-comet/issues/1538 - assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + // TODO byte/short issue + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_COMET) Seq(10, 201).foreach { numPartitions => withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 612b547eac..a1ee62776f 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -86,87 +86,79 @@ abstract class ParquetReadSuite extends CometTestBase { } test("unsupported Spark types") { - for (scan <- Seq( - CometConf.SCAN_NATIVE_COMET, - CometConf.SCAN_NATIVE_ICEBERG_COMPAT, - CometConf.SCAN_NATIVE_DATAFUSION)) { - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scan) { - // for native iceberg compat, CometScanExec supports some types that native_comet does not. - // note that native_datafusion does not use CometScanExec so we need not include that in - // the check - val isDataFusionScan = usingDataSourceExec(conf) - Seq( - NullType -> false, - BooleanType -> true, - ByteType -> true, - ShortType -> true, - IntegerType -> true, - LongType -> true, - FloatType -> true, - DoubleType -> true, - BinaryType -> true, - StringType -> true, - // Timestamp here arbitrary for picking a concrete data type to from ArrayType - // Any other type works - ArrayType(TimestampType) -> isDataFusionScan, - StructType( - Seq( - StructField("f1", DecimalType.SYSTEM_DEFAULT), - StructField("f2", StringType))) -> isDataFusionScan, - MapType(keyType = LongType, valueType = DateType) -> isDataFusionScan, - StructType( - Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> isDataFusionScan, - MapType(keyType = IntegerType, valueType = BinaryType) -> isDataFusionScan) - .foreach { case (dt, expected) => - val fallbackReasons = new ListBuffer[String]() - val isSupported = CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) - .isTypeSupported(dt, "", fallbackReasons) - if (isSupported != expected) { - fail( - s"Failed on isTypeSupported check for ${dt}; expected=$expected, actual=$isSupported") - } - // usingDataFusionParquetExec does not support CometBatchScanExec yet - if (!isDataFusionScan) { - assert(CometBatchScanExec.isTypeSupported(dt, "", fallbackReasons) == expected) - } + // TODO this test is not correctly implemented for scan implementations other than SCAN_NATIVE_COMET + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { + // for native iceberg compat, CometScanExec supports some types that native_comet does not. + // note that native_datafusion does not use CometScanExec so we need not include that in + // the check + val isDataFusionScan = usingDataSourceExec(conf) + Seq( + NullType -> false, + BooleanType -> true, + ByteType -> true, + ShortType -> true, + IntegerType -> true, + LongType -> true, + FloatType -> true, + DoubleType -> true, + BinaryType -> true, + StringType -> true, + // Timestamp here arbitrary for picking a concrete data type to from ArrayType + // Any other type works + ArrayType(TimestampType) -> isDataFusionScan, + StructType( + Seq( + StructField("f1", DecimalType.SYSTEM_DEFAULT), + StructField("f2", StringType))) -> isDataFusionScan, + MapType(keyType = LongType, valueType = DateType) -> isDataFusionScan, + StructType( + Seq(StructField("f1", ByteType), StructField("f2", StringType))) -> isDataFusionScan, + MapType(keyType = IntegerType, valueType = BinaryType) -> isDataFusionScan) + .foreach { case (dt, expected) => + val fallbackReasons = new ListBuffer[String]() + // TODO CometScanTypeChecker should only be used for ParquetReadSuiteV1Suite + assert( + CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) + .isTypeSupported(dt, "", fallbackReasons) == expected) + // usingDataFusionParquetExec does not support CometBatchScanExec yet + // TODO CometBatchScanExec should only be used for ParquetReadSuiteV2Suite + if (!isDataFusionScan) { + assert(CometBatchScanExec.isTypeSupported(dt, "", fallbackReasons) == expected) } - } + } } } test("unsupported Spark schema") { - for (scan <- Seq( - CometConf.SCAN_NATIVE_COMET, - CometConf.SCAN_NATIVE_ICEBERG_COMPAT, - CometConf.SCAN_NATIVE_DATAFUSION)) { - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scan) { - val schemaDDLs = - Seq( - "f1 int, f2 boolean", - "f1 int, f2 array", - "f1 map, f2 array") - .map(s => StructType.fromDDL(s)) - - // Arrays support for iceberg compat native - val cometScanExecSupported = if (usingDataSourceExec(conf)) { + // TODO this test is not correctly implemented for scan implementations other than SCAN_NATIVE_COMET + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { + val schemaDDLs = + Seq( + "f1 int, f2 boolean", + "f1 int, f2 array", + "f1 map, f2 array") + .map(s => StructType.fromDDL(s)) + + // Arrays support for iceberg compat native and for Parquet V1 + val cometScanExecSupported = + if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite]) Seq(true, true, true) - } else { - Seq(true, false, false) - } + else Seq(true, false, false) - val cometBatchScanExecSupported = Seq(true, false, false) - val fallbackReasons = new ListBuffer[String]() + val cometBatchScanExecSupported = Seq(true, false, false) + val fallbackReasons = new ListBuffer[String]() - schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) => - assert( - CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) - .isSchemaSupported(StructType(schema), fallbackReasons) == expected) - } + // TODO CometScanTypeChecker should only be used for ParquetReadSuiteV1Suite + schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) => + assert( + CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get()) + .isSchemaSupported(StructType(schema), fallbackReasons) == expected) + } - schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema, expected) => - assert( - CometBatchScanExec.isSchemaSupported(StructType(schema), fallbackReasons) == expected) - } + // TODO CometBatchScanExec should only be used for ParquetReadSuiteV2Suite + schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema, expected) => + assert( + CometBatchScanExec.isSchemaSupported(StructType(schema), fallbackReasons) == expected) } } } From 4fba4a8d504c2d2076193b1a541a38373b1eb8fa Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 19 Aug 2025 07:59:53 -0600 Subject: [PATCH 9/9] link to issue --- .../test/scala/org/apache/comet/parquet/ParquetReadSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index a1ee62776f..f21b59ac1a 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -87,6 +87,7 @@ abstract class ParquetReadSuite extends CometTestBase { test("unsupported Spark types") { // TODO this test is not correctly implemented for scan implementations other than SCAN_NATIVE_COMET + // https://github.com/apache/datafusion-comet/issues/2188 withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { // for native iceberg compat, CometScanExec supports some types that native_comet does not. // note that native_datafusion does not use CometScanExec so we need not include that in @@ -131,6 +132,7 @@ abstract class ParquetReadSuite extends CometTestBase { test("unsupported Spark schema") { // TODO this test is not correctly implemented for scan implementations other than SCAN_NATIVE_COMET + // https://github.com/apache/datafusion-comet/issues/2188 withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) { val schemaDDLs = Seq(