diff --git a/.github/actions/java-test/action.yaml b/.github/actions/java-test/action.yaml index 16f8bf6799..53775da0df 100644 --- a/.github/actions/java-test/action.yaml +++ b/.github/actions/java-test/action.yaml @@ -54,8 +54,27 @@ runs: - name: Run tests shell: bash run: | - SPARK_HOME=`pwd` ./mvnw -B clean install ${{ inputs.maven_opts }} - + MAVEN_OPTS="-XX:+UnlockDiagnosticVMOptions -XX:+ShowMessageBoxOnError -XX:+HeapDumpOnOutOfMemoryError -XX:ErrorFile=./hs_err_pid%p.log" SPARK_HOME=`pwd` ./mvnw -B clean install ${{ inputs.maven_opts }} + - name: Upload crash logs + if: failure() + uses: actions/upload-artifact@v4 + with: + name: crash-logs-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} + path: "**/hs_err_pid*.log" + - name: Debug listing + if: failure() + shell: bash + run: | + echo "CWD: $(pwd)" + ls -lah . + ls -lah target + find . -name 'unit-tests.log' + - name: Upload unit-tests.log + if: failure() + uses: actions/upload-artifact@v4 + with: + name: unit-tests-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} + path: "**/target/unit-tests.log" - name: Upload test results if: ${{ inputs.upload-test-reports == 'true' }} uses: actions/upload-artifact@v4 diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 2fefed133d..cefc138bc3 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -410,14 +410,14 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } } - ignore("array_repeat") { + test("array_repeat") { withSQLConf( CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true", CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled, 10000) + makeParquetFileAllTypes(path, dictionaryEnabled, 100) spark.read.parquet(path.toString).createOrReplaceTempView("t1") checkSparkAnswerAndOperator(sql("SELECT array_repeat(_4, null) from t1")) @@ -425,7 +425,6 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp checkSparkAnswerAndOperator( sql("SELECT array_repeat(_2, 5) from t1 where _2 is not null")) checkSparkAnswerAndOperator(sql("SELECT array_repeat(_2, 5) from t1 where _2 is null")) - checkSparkAnswerAndOperator(sql("SELECT array_repeat(_3, _4) from t1 where _3 is null")) checkSparkAnswerAndOperator( sql("SELECT array_repeat(_3, _4) from t1 where _3 is not null")) checkSparkAnswerAndOperator(sql("SELECT array_repeat(cast(_3 as string), 2) from t1")) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 07c94f6300..02801f8bc7 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -19,6 +19,7 @@ package org.apache.comet.exec +import scala.reflect.runtime.universe._ import scala.util.Random import org.scalactic.source.Position @@ -248,340 +249,149 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar } } - test("columnar shuffle on map") { - // https://github.com/apache/datafusion-comet/issues/1538 - assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) - - def genTuples[K](num: Int, keys: Seq[K]): Seq[( - Int, - Map[K, Boolean], - Map[K, Byte], - Map[K, Short], - Map[K, Int], - Map[K, Long], - Map[K, Float], - Map[K, Double], - Map[K, java.sql.Date], - Map[K, java.sql.Timestamp], - Map[K, java.math.BigDecimal], - Map[K, Array[Byte]], - Map[K, String])] = { - (0 until num).map(i => - ( - i + 1, - Map(keys(0) -> (i > 10), keys(1) -> (i > 20)), - Map(keys(0) -> i.toByte, keys(1) -> (i + 1).toByte), - Map(keys(0) -> i.toShort, keys(1) -> (i + 1).toShort), - Map(keys(0) -> i, keys(1) -> (i + 1)), - Map(keys(0) -> i.toLong, keys(1) -> (i + 1).toLong), - Map(keys(0) -> i.toFloat, keys(1) -> (i + 1).toFloat), - Map(keys(0) -> i.toDouble, keys(1) -> (i + 1).toDouble), - Map( - keys(0) -> new java.sql.Date(i.toLong), - keys(1) -> new java.sql.Date((i + 1).toLong)), - Map( - keys(0) -> new java.sql.Timestamp(i.toLong), - keys(1) -> new java.sql.Timestamp((i + 1).toLong)), - Map( - keys(0) -> new java.math.BigDecimal(i.toLong), - keys(1) -> new java.math.BigDecimal((i + 1).toLong)), - Map(keys(0) -> i.toString.getBytes(), keys(1) -> (i + 1).toString.getBytes()), - Map(keys(0) -> i.toString, keys(1) -> (i + 1).toString))) - } - + def genTuples[K](num: Int, keys: Seq[K]): Seq[( + Int, + Map[K, Boolean], + Map[K, Byte], + Map[K, Short], + Map[K, Int], + Map[K, Long], + Map[K, Float], + Map[K, Double], + Map[K, java.sql.Date], + Map[K, java.sql.Timestamp], + Map[K, java.math.BigDecimal], + Map[K, Array[Byte]], + Map[K, String])] = { + (0 until num).map(i => + ( + i + 1, + Map(keys(0) -> (i > 10), keys(1) -> (i > 20)), + Map(keys(0) -> i.toByte, keys(1) -> (i + 1).toByte), + Map(keys(0) -> i.toShort, keys(1) -> (i + 1).toShort), + Map(keys(0) -> i, keys(1) -> (i + 1)), + Map(keys(0) -> i.toLong, keys(1) -> (i + 1).toLong), + Map(keys(0) -> i.toFloat, keys(1) -> (i + 1).toFloat), + Map(keys(0) -> i.toDouble, keys(1) -> (i + 1).toDouble), + Map(keys(0) -> new java.sql.Date(i.toLong), keys(1) -> new java.sql.Date((i + 1).toLong)), + Map( + keys(0) -> new java.sql.Timestamp(i.toLong), + keys(1) -> new java.sql.Timestamp((i + 1).toLong)), + Map( + keys(0) -> new java.math.BigDecimal(i.toLong), + keys(1) -> new java.math.BigDecimal((i + 1).toLong)), + Map(keys(0) -> i.toString.getBytes(), keys(1) -> (i + 1).toString.getBytes()), + Map(keys(0) -> i.toString, keys(1) -> (i + 1).toString))) + } + + def repartitionAndSort(numPartitions: Int): Unit = { + val df = sql("SELECT * FROM tbl") + .filter($"_1" > 10) + .repartition( + numPartitions, + $"_2", + $"_3", + $"_4", + $"_5", + $"_6", + $"_7", + $"_8", + $"_9", + $"_10", + $"_11", + $"_12", + $"_13") + .sortWithinPartitions($"_1") + + checkShuffleAnswer(df, 1) + } + + def columnarShuffleOnMapTest[K: TypeTag](num: Int, keys: Seq[K]): Unit = { Seq(10, 201).foreach { numPartitions => Seq("1.0", "10.0").foreach { ratio => withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { - // Boolean key - withParquetTable(genTuples(50, Seq(true, false)), "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" > 10) - .repartition( - numPartitions, - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_12", - $"_13") - .sortWithinPartitions($"_1") - - checkShuffleAnswer(df, 1) - } - - // Byte key - withParquetTable(genTuples(50, Seq(0.toByte, 1.toByte)), "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" > 10) - .repartition( - numPartitions, - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_12", - $"_13") - .sortWithinPartitions($"_1") - - checkShuffleAnswer(df, 1) - } - - // Short key - withParquetTable(genTuples(50, Seq(0.toShort, 1.toShort)), "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" > 10) - .repartition( - numPartitions, - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_12", - $"_13") - .sortWithinPartitions($"_1") - - checkShuffleAnswer(df, 1) - } - - // Int key - withParquetTable(genTuples(50, Seq(0, 1)), "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" > 10) - .repartition( - numPartitions, - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_12", - $"_13") - .sortWithinPartitions($"_1") - - checkShuffleAnswer(df, 1) - } - - // Long key - withParquetTable(genTuples(50, Seq(0.toLong, 1.toLong)), "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" > 10) - .repartition( - numPartitions, - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_12", - $"_13") - .sortWithinPartitions($"_1") - - checkShuffleAnswer(df, 1) - } - - // Float key - withParquetTable(genTuples(50, Seq(0.toFloat, 1.toFloat)), "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" > 10) - .repartition( - numPartitions, - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_12", - $"_13") - .sortWithinPartitions($"_1") - - checkShuffleAnswer(df, 1) + withParquetTable(genTuples(num, keys), "tbl") { + repartitionAndSort(numPartitions) } + } + } + } + } - // Double key - withParquetTable(genTuples(50, Seq(0.toDouble, 1.toDouble)), "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" > 10) - .repartition( - numPartitions, - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_12", - $"_13") - .sortWithinPartitions($"_1") + test("columnar shuffle on map [bool]") { + // https://github.com/apache/datafusion-comet/issues/1538 + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + columnarShuffleOnMapTest(50, Seq(true, false)) + } - checkShuffleAnswer(df, 1) - } + test("columnar shuffle on map [byte]") { + // https://github.com/apache/datafusion-comet/issues/1538 + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + columnarShuffleOnMapTest(50, Seq(0.toByte, 1.toByte)) + } - // Date key - withParquetTable( - genTuples(50, Seq(new java.sql.Date(0.toLong), new java.sql.Date(1.toLong))), - "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" > 10) - .repartition( - numPartitions, - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_12", - $"_13") - .sortWithinPartitions($"_1") + test("columnar shuffle on map [short]") { + // https://github.com/apache/datafusion-comet/issues/1538 + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + columnarShuffleOnMapTest(50, Seq(0.toShort, 1.toShort)) + } - checkShuffleAnswer(df, 1) - } + test("columnar shuffle on map [int]") { + // https://github.com/apache/datafusion-comet/issues/1538 + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + columnarShuffleOnMapTest(50, Seq(0, 1)) + } - // Timestamp key - withParquetTable( - genTuples( - 50, - Seq(new java.sql.Timestamp(0.toLong), new java.sql.Timestamp(1.toLong))), - "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" > 10) - .repartition( - numPartitions, - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_12", - $"_13") - .sortWithinPartitions($"_1") + test("columnar shuffle on map [long]") { + // https://github.com/apache/datafusion-comet/issues/1538 + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + columnarShuffleOnMapTest(50, Seq(0.toLong, 1.toLong)) + } - checkShuffleAnswer(df, 1) - } + test("columnar shuffle on map [float]") { + // https://github.com/apache/datafusion-comet/issues/1538 + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + columnarShuffleOnMapTest(50, Seq(0.toFloat, 1.toFloat)) + } - // Decimal key - withParquetTable( - genTuples( - 50, - Seq(new java.math.BigDecimal(0.toLong), new java.math.BigDecimal(1.toLong))), - "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" > 10) - .repartition( - numPartitions, - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_12", - $"_13") - .sortWithinPartitions($"_1") + test("columnar shuffle on map [double]") { + // https://github.com/apache/datafusion-comet/issues/1538 + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + columnarShuffleOnMapTest(50, Seq(0.toDouble, 1.toDouble)) + } - checkShuffleAnswer(df, 1) - } + test("columnar shuffle on map [date]") { + // https://github.com/apache/datafusion-comet/issues/1538 + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + columnarShuffleOnMapTest(50, Seq(new java.sql.Date(0.toLong), new java.sql.Date(1.toLong))) + } - // String key - withParquetTable(genTuples(50, Seq(0.toString, 1.toString)), "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" > 10) - .repartition( - numPartitions, - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_12", - $"_13") - .sortWithinPartitions($"_1") + test("columnar shuffle on map [timestamp]") { + // https://github.com/apache/datafusion-comet/issues/1538 + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + columnarShuffleOnMapTest( + 50, + Seq(new java.sql.Timestamp(0.toLong), new java.sql.Timestamp(1.toLong))) + } - checkShuffleAnswer(df, 1) - } + test("columnar shuffle on map [decimal]") { + // https://github.com/apache/datafusion-comet/issues/1538 + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + columnarShuffleOnMapTest( + 50, + Seq(new java.math.BigDecimal(0.toLong), new java.math.BigDecimal(1.toLong))) + } - // Binary key - withParquetTable( - genTuples(50, Seq(0.toString.getBytes(), 1.toString.getBytes())), - "tbl") { - val df = sql("SELECT * FROM tbl") - .filter($"_1" > 10) - .repartition( - numPartitions, - $"_2", - $"_3", - $"_4", - $"_5", - $"_6", - $"_7", - $"_8", - $"_9", - $"_10", - $"_11", - $"_12", - $"_13") - .sortWithinPartitions($"_1") + test("columnar shuffle on map [string]") { + // https://github.com/apache/datafusion-comet/issues/1538 + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + columnarShuffleOnMapTest(50, Seq(0.toString, 1.toString)) + } - checkShuffleAnswer(df, 1) - } - } - } - } + test("columnar shuffle on map [binary]") { + // https://github.com/apache/datafusion-comet/issues/1538 + assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) + columnarShuffleOnMapTest(50, Seq(0.toString.getBytes(), 1.toString.getBytes())) } test("columnar shuffle on array") {