6 changes: 0 additions & 6 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -114,12 +114,6 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithEnvVarOrDefault("ENABLE_COMET_WRITE", false)
 
-  // Deprecated: native_comet uses mutable buffers incompatible with Arrow FFI best practices
-  // and does not support complex types. Use native_iceberg_compat or auto instead.
-  // This will be removed in a future release.
-  // See: https://github.com/apache/datafusion-comet/issues/2186
-  @deprecated("Use SCAN_AUTO instead. native_comet will be removed in a future release.", "0.9.0")
-  val SCAN_NATIVE_COMET = "native_comet"
   val SCAN_NATIVE_DATAFUSION = "native_datafusion"
   val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
   val SCAN_AUTO = "auto"
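With `SCAN_NATIVE_COMET` removed, the remaining constants are the only valid scan implementations. A minimal usage sketch, assuming the config key backing `COMET_NATIVE_SCAN_IMPL` is `spark.comet.scan.impl` and `spark` is an active SparkSession (neither appears in this diff):

```scala
import org.apache.comet.CometConf

// Assumed config key for COMET_NATIVE_SCAN_IMPL; not shown in this diff.
spark.conf.set("spark.comet.scan.impl", CometConf.SCAN_AUTO)
// or pin a specific native scan:
spark.conf.set("spark.comet.scan.impl", CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
```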
@@ -271,7 +271,7 @@ object Utils extends CometTypeShim {
         throw new SparkException(
           s"Comet execution only takes Arrow Arrays, but got ${c.getClass}. " +
             "This typically happens when a Comet scan falls back to Spark due to unsupported " +
-            "data types (e.g., complex types like structs, arrays, or maps with native_comet). " +
+            "data types (e.g., complex types like structs, arrays, or maps). " +
             "To resolve this, you can: " +
             "(1) enable spark.comet.scan.allowIncompatible=true to use a compatible native " +
             "scan variant, or " +
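The first remediation in the message can be applied directly in a session. A sketch using only the config key named in the error text above; `spark` is assumed to be an active SparkSession:

```scala
// Option (1) from the error message: let Comet substitute a compatible
// native scan variant instead of falling back to Spark for complex types.
spark.conf.set("spark.comet.scan.allowIncompatible", "true")
```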
@@ -51,7 +51,7 @@ import org.apache.comet.parquet.CometParquetFileFormat
  *
  * This is a hybrid scan where the native plan will contain a `ScanExec` that reads batches of
  * data from the JVM via JNI. The ultimate source of data may be a JVM implementation such as
- * Spark readers, or could be the `native_comet` or `native_iceberg_compat` native scans.
+ * Spark readers, or could be the `native_iceberg_compat` native scan.
  *
  * Note that scanImpl can only be `native_datafusion` after CometScanRule runs and before
  * CometExecRule runs. It will never be set to `native_datafusion` at execution time
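An illustrative sketch of the invariant this Scaladoc describes: execution-time code branching on `scanImpl` should never observe `native_datafusion`, per the comment above. Only the constant names come from `CometConf`; the `scan` value and match arms are hypothetical:

```scala
// Hypothetical sketch, not code from this PR.
scan.scanImpl match {
  case CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
  // batches come from the native scan, handed to ScanExec over JNI
  case CometConf.SCAN_NATIVE_DATAFUSION =>
    sys.error("unreachable at execution time, per the Scaladoc invariant")
  case _ =>
  // a JVM-side reader (e.g. Spark's Parquet reader) feeding batches over JNI
}
```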
@@ -812,13 +812,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   // https://github.com/apache/datafusion-comet/issues/2612
   test("array_reverse - fallback for binary array") {
-    val fallbackReason =
-      if (CometConf.COMET_NATIVE_SCAN_IMPL.key == CometConf.SCAN_NATIVE_COMET || sys.env
-          .getOrElse("COMET_PARQUET_SCAN_IMPL", "") == CometConf.SCAN_NATIVE_COMET) {
-        "Unsupported schema"
-      } else {
-        CometArrayReverse.unsupportedReason
-      }
+    val fallbackReason = CometArrayReverse.unsupportedReason
     withTable("t1") {
       sql("""create table t1 using parquet as
         select cast(null as array<binary>) c1, cast(array() as array<binary>) c2
10 changes: 2 additions & 8 deletions spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -1205,14 +1205,8 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         |USING parquet
         """.stripMargin)
       sql("INSERT INTO TABLE tab1 SELECT named_struct('col1','1','col2','2')")
-      if (!usingLegacyNativeCometScan) {
-        checkSparkAnswerAndOperator(
-          "SELECT CAST(s AS struct<field1:string, field2:string>) AS new_struct FROM tab1")
-      } else {
-        // Should just fall back to Spark since non-DataSourceExec scan does not support nested types.
-        checkSparkAnswer(
-          "SELECT CAST(s AS struct<field1:string, field2:string>) AS new_struct FROM tab1")
-      }
+      checkSparkAnswerAndOperator(
+        "SELECT CAST(s AS struct<field1:string, field2:string>) AS new_struct FROM tab1")
     }
   }
 
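With the legacy-scan branch gone, the struct cast is always validated through Comet's operator path. For reference, a DataFrame-API sketch of the same cast, using standard Spark APIs rather than the suite's helpers:

```scala
import org.apache.spark.sql.functions.col

// Equivalent of the SQL above; expected to stay on Comet's operator path.
val newStruct = spark
  .table("tab1")
  .select(col("s").cast("struct<field1:string,field2:string>").as("new_struct"))
```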
191 changes: 6 additions & 185 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -185,22 +185,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
-  // ignored: native_comet scan is no longer supported
-  ignore("basic data type support") {
-    // this test requires native_comet scan due to unsigned u8/u16 issue
-    withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
-      Seq(true, false).foreach { dictionaryEnabled =>
-        withTempDir { dir =>
-          val path = new Path(dir.toURI.toString, "test.parquet")
-          makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
-          withParquetTable(path.toString, "tbl") {
-            checkSparkAnswerAndOperator("select * FROM tbl WHERE _2 > 100")
-          }
-        }
-      }
-    }
-  }
-
   test("basic data type support - excluding u8/u16") {
     // variant that skips _9 (UINT_8) and _10 (UINT_16) for default scan impl
     Seq(true, false).foreach { dictionaryEnabled =>
@@ -217,27 +201,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
-  // ignored: native_comet scan is no longer supported
-  ignore("uint data type support") {
-    // this test requires native_comet scan due to unsigned u8/u16 issue
-    withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
-      Seq(true, false).foreach { dictionaryEnabled =>
-        withTempDir { dir =>
-          val path = new Path(dir.toURI.toString, "testuint.parquet")
-          makeParquetFileAllPrimitiveTypes(
-            path,
-            dictionaryEnabled = dictionaryEnabled,
-            Byte.MinValue,
-            Byte.MaxValue)
-          withParquetTable(path.toString, "tbl") {
-            val qry = "select _9 from tbl order by _11"
-            checkSparkAnswerAndOperator(qry)
-          }
-        }
-      }
-    }
-  }
-
   test("uint data type support - excluding u8/u16") {
     // variant that tests UINT_32 and UINT_64, skipping _9 (UINT_8) and _10 (UINT_16)
     Seq(true, false).foreach { dictionaryEnabled =>
@@ -1491,57 +1454,6 @@
     }
   }
 
-  // ignored: native_comet scan is no longer supported
-  ignore("round") {
-    // https://github.com/apache/datafusion-comet/issues/1441
-    assume(usingLegacyNativeCometScan)
-    Seq(true, false).foreach { dictionaryEnabled =>
-      withTempDir { dir =>
-        val path = new Path(dir.toURI.toString, "test.parquet")
-        makeParquetFileAllPrimitiveTypes(
-          path,
-          dictionaryEnabled = dictionaryEnabled,
-          -128,
-          128,
-          randomSize = 100)
-        // this test requires native_comet scan due to unsigned u8/u16 issue
-        withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
-          withParquetTable(path.toString, "tbl") {
-            for (s <- Seq(-5, -1, 0, 1, 5, -1000, 1000, -323, -308, 308, -15, 15, -16, 16,
-                null)) {
-              // array tests
-              // TODO: enable test for floats (_6, _7, _8, _13)
-              for (c <- Seq(2, 3, 4, 5, 9, 10, 11, 12, 15, 16, 17)) {
-                checkSparkAnswerAndOperator(s"select _${c}, round(_${c}, ${s}) FROM tbl")
-              }
-              // scalar tests
-              // Exclude the constant folding optimizer in order to actually execute the native round
-              // operations for scalar (literal) values.
-              // TODO: comment in the tests for float once supported
-              withSQLConf(
-                "spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
-                for (n <- Seq("0.0", "-0.0", "0.5", "-0.5", "1.2", "-1.2")) {
-                  checkSparkAnswerAndOperator(
-                    s"select round(cast(${n} as tinyint), ${s}) FROM tbl")
-                  // checkSparkAnswerAndCometOperators(s"select round(cast(${n} as float), ${s}) FROM tbl")
-                  checkSparkAnswerAndOperator(
-                    s"select round(cast(${n} as decimal(38, 18)), ${s}) FROM tbl")
-                  checkSparkAnswerAndOperator(
-                    s"select round(cast(${n} as decimal(20, 0)), ${s}) FROM tbl")
-                }
-                // checkSparkAnswer(s"select round(double('infinity'), ${s}) FROM tbl")
-                // checkSparkAnswer(s"select round(double('-infinity'), ${s}) FROM tbl")
-                // checkSparkAnswer(s"select round(double('NaN'), ${s}) FROM tbl")
-                // checkSparkAnswer(
-                //   s"select round(double('0.000000000000000000000000000000000001'), ${s}) FROM tbl")
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
   test("md5") {
     Seq(false, true).foreach { dictionary =>
       withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
@@ -1556,25 +1468,6 @@
     }
   }
 
-  // ignored: native_comet scan is no longer supported
-  ignore("hex") {
-    // https://github.com/apache/datafusion-comet/issues/1441
-    assume(usingLegacyNativeCometScan)
-    Seq(true, false).foreach { dictionaryEnabled =>
-      withTempDir { dir =>
-        val path = new Path(dir.toURI.toString, "hex.parquet")
-        // this test requires native_comet scan due to unsigned u8/u16 issue
-        withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
-          makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
-          withParquetTable(path.toString, "tbl") {
-            checkSparkAnswerAndOperator(
-              "SELECT hex(_1), hex(_2), hex(_3), hex(_4), hex(_5), hex(_6), hex(_7), hex(_8), hex(_9), hex(_10), hex(_11), hex(_12), hex(_13), hex(_14), hex(_15), hex(_16), hex(_17), hex(_18), hex(_19), hex(_20) FROM tbl")
-          }
-        }
-      }
-    }
-  }
-
   test("unhex") {
     val table = "unhex_table"
     withTable(table) {
@@ -2442,13 +2335,7 @@
         df.write.parquet(dir.toString())
       }
       val df = spark.read.parquet(dir.toString()).select("nested1.id")
-      // Comet's original scan does not support structs.
-      // The plan will have a Comet Scan only if scan impl is native_full or native_recordbatch
-      if (!scanImpl.equals(CometConf.SCAN_NATIVE_COMET)) {
-        checkSparkAnswerAndOperator(df)
-      } else {
-        checkSparkAnswer(df)
-      }
+      checkSparkAnswerAndOperator(df)
     }
   }
 
@@ -2474,19 +2361,10 @@
       }
 
       val df = spark.read.parquet(dir.toString())
-      // Comet's original scan does not support structs.
-      // The plan will have a Comet Scan only if scan impl is native_full or native_recordbatch
-      if (scanImpl != CometConf.SCAN_NATIVE_COMET) {
-        checkSparkAnswerAndOperator(df.select("nested1.id"))
-        checkSparkAnswerAndOperator(df.select("nested1.nested2"))
-        checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
-        checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id"))
-      } else {
-        checkSparkAnswer(df.select("nested1.id"))
-        checkSparkAnswer(df.select("nested1.nested2"))
-        checkSparkAnswer(df.select("nested1.nested2.id"))
-        checkSparkAnswer(df.select("nested1.id", "nested1.nested2.id"))
-      }
+      checkSparkAnswerAndOperator(df.select("nested1.id"))
+      checkSparkAnswerAndOperator(df.select("nested1.nested2"))
+      checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
+      checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id"))
     }
   }
 
@@ -2512,13 +2390,7 @@
       }
 
      val df = spark.read.parquet(dir.toString()).select("nested1.id")
-      // Comet's original scan does not support structs.
-      // The plan will have a Comet Scan only if scan impl is native_full or native_recordbatch
-      if (scanImpl != CometConf.SCAN_NATIVE_COMET) {
-        checkSparkAnswerAndOperator(df)
-      } else {
-        checkSparkAnswer(df)
-      }
+      checkSparkAnswerAndOperator(df)
     }
   }
 
@@ -2595,7 +2467,6 @@
   }
 
   test("get_struct_field with DataFusion ParquetExec - read entire struct") {
-    assume(!usingLegacyNativeCometScan(conf))
     withTempPath { dir =>
       // create input file with Comet disabled
       withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2632,7 +2503,6 @@
   }
 
   test("read array[int] from parquet") {
-    assume(!usingLegacyNativeCometScan(conf))
 
     withTempPath { dir =>
       // create input file with Comet disabled
@@ -2773,55 +2643,6 @@
     }
   }
 
-  // ignored: native_comet scan is no longer supported
-  ignore("test integral divide") {
-    // this test requires native_comet scan due to unsigned u8/u16 issue
-    withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
-      Seq(true, false).foreach { dictionaryEnabled =>
-        withTempDir { dir =>
-          val path1 = new Path(dir.toURI.toString, "test1.parquet")
-          val path2 = new Path(dir.toURI.toString, "test2.parquet")
-          makeParquetFileAllPrimitiveTypes(
-            path1,
-            dictionaryEnabled = dictionaryEnabled,
-            0,
-            0,
-            randomSize = 10000)
-          makeParquetFileAllPrimitiveTypes(
-            path2,
-            dictionaryEnabled = dictionaryEnabled,
-            0,
-            0,
-            randomSize = 10000)
-          withParquetTable(path1.toString, "tbl1") {
-            withParquetTable(path2.toString, "tbl2") {
-              checkSparkAnswerAndOperator("""
-                |select
-                | t1._2 div t2._2, div(t1._2, t2._2),
-                | t1._3 div t2._3, div(t1._3, t2._3),
-                | t1._4 div t2._4, div(t1._4, t2._4),
-                | t1._5 div t2._5, div(t1._5, t2._5),
-                | t1._9 div t2._9, div(t1._9, t2._9),
-                | t1._10 div t2._10, div(t1._10, t2._10),
-                | t1._11 div t2._11, div(t1._11, t2._11)
-                | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
-                | order by t1._id""".stripMargin)
-
-              checkSparkAnswerAndOperator("""
-                |select
-                | t1._12 div t2._12, div(t1._12, t2._12),
-                | t1._15 div t2._15, div(t1._15, t2._15),
-                | t1._16 div t2._16, div(t1._16, t2._16),
-                | t1._17 div t2._17, div(t1._17, t2._17)
-                | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
-                | order by t1._id""".stripMargin)
-            }
-          }
-        }
-      }
-    }
-  }
-
   test("ANSI support for add") {
     val data = Seq((Integer.MAX_VALUE, 1), (Integer.MIN_VALUE, -1))
     withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
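Context for the trailing hunk: `test("ANSI support for add")` exercises addition at the integer boundaries with ANSI mode on, where overflow raises instead of wrapping. A standalone sketch of that standard Spark behavior (not code from this PR; `spark` is assumed to be an active SparkSession):

```scala
// With ANSI mode enabled, Integer.MAX_VALUE + 1 throws instead of wrapping.
spark.conf.set("spark.sql.ansi.enabled", "true")
// Throws a SparkArithmeticException (ARITHMETIC_OVERFLOW) at runtime:
spark.sql("SELECT CAST(2147483647 AS INT) + 1").show()
```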