Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
}

case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport {

// This class must be constructed with a concrete scan implementation, never CometConf.SCAN_AUTO.
assert(scanImpl != CometConf.SCAN_AUTO)

override def isTypeSupported(
dt: DataType,
name: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,43 +232,47 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
}

// Checks array_contains against every column type produced by ParquetGenerator:
// primitive columns are compared with checkSparkAnswerAndOperator, complex
// (array/struct) columns only with checkSparkAnswer.
test("array_contains - test all types (native Parquet reader)") {
  // TODO test fails if scan is auto
  // https://github.com/apache/datafusion-comet/issues/2173
  withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
    withTempDir { dir =>
      val path = new Path(dir.toURI.toString, "test.parquet")
      val filename = path.toString
      // Fixed seed so the generated data is the same on every run.
      val random = new Random(42)
      // Comet is switched off only while the input file is being written.
      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
        ParquetGenerator.makeParquetFile(
          random,
          spark,
          filename,
          100,
          DataGenOptions(
            allowNull = true,
            generateNegativeZero = true,
            generateArray = true,
            generateStruct = true,
            generateMap = false))
      }
      val table = spark.read.parquet(filename)
      table.createOrReplaceTempView("t1")
      // Split the schema once instead of running filter and filterNot separately.
      val (complexTypeFields, primitiveTypeFields) =
        table.schema.fields.partition(field => isComplexType(field.dataType))
      for (field <- primitiveTypeFields) {
        val fieldName = field.name
        val typeName = field.dataType.typeName
        sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
          .createOrReplaceTempView("t2")
        checkSparkAnswerAndOperator(sql("SELECT array_contains(a, b) FROM t2"))
        // Also probe with a typed NULL as the searched value.
        checkSparkAnswerAndOperator(
          sql(s"SELECT array_contains(a, cast(null as $typeName)) FROM t2"))
      }
      for (field <- complexTypeFields) {
        val fieldName = field.name
        sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
          .createOrReplaceTempView("t3")
        // Complex element types: only the query result is compared with Spark here.
        checkSparkAnswer(sql("SELECT array_contains(a, b) FROM t3"))
      }
    }
  }
}
Expand Down Expand Up @@ -406,9 +410,11 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
}

test("array_intersect") {
// https://github.com/apache/datafusion-comet/issues/1441
assume(!usingDataSourceExec)
withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
// TODO test fails if scan is auto
// https://github.com/apache/datafusion-comet/issues/2174
withSQLConf(
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {

Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
Expand Down
47 changes: 25 additions & 22 deletions spark/src/test/scala/org/apache/comet/CometCastSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -944,28 +944,31 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// Complex Types

test("cast StructType to StringType") {
// https://github.com/apache/datafusion-comet/issues/1441
assume(!usingDataSourceExec)
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
withParquetTable(path.toString, "tbl") {
// primitives
checkSparkAnswerAndOperator(
"SELECT CAST(struct(_1, _2, _3, _4, _5, _6, _7, _8) as string) FROM tbl")
checkSparkAnswerAndOperator("SELECT CAST(struct(_9, _10, _11, _12) as string) FROM tbl")
// decimals
// TODO add _16 when https://github.com/apache/datafusion-comet/issues/1068 is resolved
checkSparkAnswerAndOperator("SELECT CAST(struct(_15, _17) as string) FROM tbl")
// dates & timestamps
checkSparkAnswerAndOperator("SELECT CAST(struct(_18, _19, _20) as string) FROM tbl")
// named struct
checkSparkAnswerAndOperator(
"SELECT CAST(named_struct('a', _1, 'b', _2) as string) FROM tbl")
// nested struct
checkSparkAnswerAndOperator(
"SELECT CAST(named_struct('a', named_struct('b', _1, 'c', _2)) as string) FROM tbl")
// TODO test fails if scan is auto
// https://github.com/apache/datafusion-comet/issues/2175
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
withParquetTable(path.toString, "tbl") {
// primitives
checkSparkAnswerAndOperator(
"SELECT CAST(struct(_1, _2, _3, _4, _5, _6, _7, _8) as string) FROM tbl")
checkSparkAnswerAndOperator(
"SELECT CAST(struct(_9, _10, _11, _12) as string) FROM tbl")
// decimals
// TODO add _16 when https://github.com/apache/datafusion-comet/issues/1068 is resolved
checkSparkAnswerAndOperator("SELECT CAST(struct(_15, _17) as string) FROM tbl")
// dates & timestamps
checkSparkAnswerAndOperator("SELECT CAST(struct(_18, _19, _20) as string) FROM tbl")
// named struct
checkSparkAnswerAndOperator(
"SELECT CAST(named_struct('a', _1, 'b', _2) as string) FROM tbl")
// nested struct
checkSparkAnswerAndOperator(
"SELECT CAST(named_struct('a', named_struct('b', _1, 'c', _2)) as string) FROM tbl")
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,10 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
Seq("native", "jvm").foreach { shuffleMode =>
Seq("native_comet", "native_datafusion", "native_iceberg_compat").foreach { scanImpl =>
Seq(
CometConf.SCAN_NATIVE_COMET,
CometConf.SCAN_NATIVE_DATAFUSION,
CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { scanImpl =>
super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) {
withSQLConf(
CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
}

test("fix: native Unsafe row accessors return incorrect results") {
// https://github.com/apache/datafusion-comet/issues/1538
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
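// TODO: this test is only run with the native_comet scan because of a byte/short issue
// with other scan implementations (details are not recorded here; link a tracking issue).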
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_COMET)
Seq(10, 201).foreach { numPartitions =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
Expand Down
Loading
Loading