From 9357c74bd159c8d359a4930b1a1de2618d176c31 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sat, 3 Aug 2024 11:39:29 -0400 Subject: [PATCH 1/9] Add CreateArray support --- native/Cargo.lock | 24 ++++++++++++++++++- native/Cargo.toml | 1 + native/core/Cargo.toml | 1 + .../core/src/execution/datafusion/planner.rs | 16 +++++++++++++ native/proto/src/proto/expr.proto | 6 +++++ .../apache/comet/serde/QueryPlanSerde.scala | 21 ++++++++++++++++ .../apache/comet/CometExpressionSuite.scala | 12 ++++++++++ 7 files changed, 80 insertions(+), 1 deletion(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 3f6b1d1c71..86cc193d79 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -882,6 +882,7 @@ dependencies = [ "datafusion-comet-spark-expr", "datafusion-common", "datafusion-expr", + "datafusion-functions-nested", "datafusion-physical-expr", "datafusion-physical-expr-common", "flate2", @@ -1054,6 +1055,27 @@ dependencies = [ "sqlparser", ] +[[package]] +name = "datafusion-functions-nested" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +dependencies = [ + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-ord", + "arrow-schema", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions", + "datafusion-functions-aggregate", + "itertools 0.12.1", + "log", + "paste", + "rand", +] + [[package]] name = "datafusion-optimizer" version = "40.0.0" @@ -2321,7 +2343,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.72", diff --git a/native/Cargo.toml b/native/Cargo.toml index c6cf571a76..0ea687e21d 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -42,6 +42,7 @@ parquet = { version = "52.2.0", default-features = false, features = ["experimen datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "35c2e7e" } datafusion = { default-features = false, git = "https://github.com/apache/datafusion.git", rev = "35c2e7e", features = ["unicode_expressions", "crypto_expressions"] } datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "35c2e7e", features = ["crypto_expressions"] } +datafusion-functions-nested = { git = "https://github.com/apache/datafusion.git", rev = "35c2e7e", default-features = false } datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "35c2e7e", default-features = false } datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "35c2e7e", default-features = false } datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "35c2e7e", default-features = false } diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 5b5c237ce1..55c46e1d6c 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -67,6 +67,7 @@ itertools = "0.11.0" paste = "1.0.14" datafusion-common = { workspace = true } datafusion = { workspace = true } +datafusion-functions-nested = { workspace = true } datafusion-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-expr = { workspace = true } diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index b4d723eb1f..ec433e52a6 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -55,6 +55,7 @@ use datafusion_common::{ }; use datafusion_expr::expr::find_df_window_func; use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition}; +use datafusion_functions_nested::make_array::make_array_udf; use datafusion_physical_expr::window::WindowExpr; use datafusion_physical_expr_common::aggregate::create_aggregate_expr; use datafusion_physical_expr_common::expressions::Literal; @@ -624,6 +625,21 @@ impl PhysicalPlanner { let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?; Ok(Arc::new(GetStructField::new(child, expr.ordinal as usize))) } + ExprStruct::CreateArray(expr) => { + let values = expr + .values + .iter() + .map(|expr| self.create_expr(expr, input_schema.clone())) + .collect::, _>>()?; + let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap()); + + Ok(Arc::new(ScalarFunctionExpr::new( + "make_array", + make_array_udf(), + values, + data_type, + ))) + } expr => Err(ExecutionError::GeneralError(format!( "Not implemented: {:?}", expr diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 485fdcfddc..799ed5c435 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -79,6 +79,7 @@ message Expr { BloomFilterMightContain bloom_filter_might_contain = 52; CreateNamedStruct create_named_struct = 53; GetStructField get_struct_field = 54; + CreateArray create_array = 55; } } @@ -498,6 +499,11 @@ message GetStructField { int32 ordinal = 2; } +message CreateArray { + repeated Expr values = 1; + DataType datatype = 2; +} + enum SortDirection { Ascending = 0; Descending = 1; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 98ca14047e..b757708170 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2330,6 +2330,27 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim .build() } + // datafusion's make_array only supports nullable element types + case array @ CreateArray(children, _) if array.dataType.containsNull => + val childExprs = children.map(exprToProto(_, inputs, binding)) + val dataType = serializeDataType(array.dataType) + + if (childExprs.forall(_.isDefined) && dataType.isDefined) { + val createArrayBuilder = ExprOuterClass.CreateArray + .newBuilder() + .addAllValues(childExprs.map(_.get).asJava) + .setDatatype(dataType.get) + + Some( + ExprOuterClass.Expr + .newBuilder() + .setCreateArray(createArrayBuilder) + .build()) + } else { + withInfo(expr, "unsupported arguments for CreateArray", children: _*) + None + } + case _ => withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*) None diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 27c046d741..ed8d8547c1 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1922,4 +1922,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } } + + test("create_array") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) + val df = spark.read.parquet(path.toString) + checkSparkAnswerAndOperator(df.select(array(col("_2"), col("_3"), col("_4")))) + checkSparkAnswerAndOperator(df.select(array(col("_4"), col("_11")))) + } + } + } } From 05b4a0ce652b203c8fa1249aaef0eaa93eb5bd0c Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 5 Aug 2024 19:58:36 -0400 Subject: [PATCH 2/9] Update Spark SQL test diffs --- dev/diffs/3.4.3.diff | 30 ++++++++++++++++++++---------- dev/diffs/3.5.1.diff | 26 ++++++++++++++++++-------- dev/diffs/4.0.0-preview1.diff | 26 ++++++++++++++++++-------- 3 files changed, 56 insertions(+), 26 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 3c88d6a7ee..c2a7488972 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -352,7 +352,7 @@ index daef11ae4d6..9f3cc9181f2 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..9cf7a9dd4e3 100644 +index f33432ddb6f..10702dba7a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -423,7 +423,17 @@ index f33432ddb6f..9cf7a9dd4e3 100644 Given("dynamic pruning filter on the build side") withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( -@@ -1311,7 +1320,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1279,7 +1288,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type") { ++ test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type", ++ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { + Seq(NO_CODEGEN, CODEGEN_ONLY).foreach { mode => + Seq(true, false).foreach { pruning => + withSQLConf( +@@ -1311,7 +1321,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -433,7 +443,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1470,7 +1480,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1470,7 +1481,8 @@ abstract class DynamicPartitionPruningSuiteBase checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil) } @@ -443,7 +453,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1485,7 +1496,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1485,7 +1497,7 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-38148: Do not add dynamic partition pruning if there exists static partition " + @@ -452,7 +462,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq( "f.store_id = 1" -> false, -@@ -1557,7 +1568,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1557,7 +1569,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -462,7 +472,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withTable("duplicate_keys") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US")) -@@ -1588,7 +1600,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1588,7 +1601,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -472,7 +482,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1617,7 +1630,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1617,7 +1631,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -482,7 +492,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1729,6 +1743,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1729,6 +1744,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -2430,7 +2440,7 @@ index abe606ad9c1..2d930b64cca 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index dd55fcfe42c..293e9dc2986 100644 +index dd55fcfe42c..e7fcd0a9e6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest @@ -2579,7 +2589,7 @@ index 1966e1e64fd..cde97a0aafe 100644 spark.sql( """ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala -index 07361cfdce9..25b0dc3ef7e 100644 +index 07361cfdce9..6673c141c9a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -55,25 +55,53 @@ object TestHive diff --git a/dev/diffs/3.5.1.diff b/dev/diffs/3.5.1.diff index 19eda87497..cb8d409c5b 100644 --- a/dev/diffs/3.5.1.diff +++ b/dev/diffs/3.5.1.diff @@ -331,7 +331,7 @@ index c2fe31520ac..0f54b233d14 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..9cf7a9dd4e3 100644 +index f33432ddb6f..10702dba7a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -402,7 +402,17 @@ index f33432ddb6f..9cf7a9dd4e3 100644 Given("dynamic pruning filter on the build side") withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( -@@ -1311,7 +1320,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1279,7 +1288,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type") { ++ test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type", ++ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { + Seq(NO_CODEGEN, CODEGEN_ONLY).foreach { mode => + Seq(true, false).foreach { pruning => + withSQLConf( +@@ -1311,7 +1321,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -412,7 +422,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1470,7 +1480,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1470,7 +1481,8 @@ abstract class DynamicPartitionPruningSuiteBase checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil) } @@ -422,7 +432,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1485,7 +1496,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1485,7 +1497,7 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-38148: Do not add dynamic partition pruning if there exists static partition " + @@ -431,7 +441,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq( "f.store_id = 1" -> false, -@@ -1557,7 +1568,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1557,7 +1569,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -441,7 +451,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withTable("duplicate_keys") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US")) -@@ -1588,7 +1600,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1588,7 +1601,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -451,7 +461,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1617,7 +1630,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1617,7 +1631,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -461,7 +471,7 @@ index f33432ddb6f..9cf7a9dd4e3 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1729,6 +1743,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1729,6 +1744,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff index 69016bc38e..f876f92eee 100644 --- a/dev/diffs/4.0.0-preview1.diff +++ b/dev/diffs/4.0.0-preview1.diff @@ -415,7 +415,7 @@ index 16a493b5290..3f0b70e2d59 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..d46dc5e138a 100644 +index 2c24cc7d570..e7ea9b4a824 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -486,7 +486,17 @@ index 2c24cc7d570..d46dc5e138a 100644 Given("dynamic pruning filter on the build side") withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( -@@ -1311,7 +1320,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1279,7 +1288,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type") { ++ test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type", ++ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { + Seq(NO_CODEGEN, CODEGEN_ONLY).foreach { mode => + Seq(true, false).foreach { pruning => + withSQLConf( +@@ -1311,7 +1321,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -496,7 +506,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1471,7 +1481,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1471,7 +1482,8 @@ abstract class DynamicPartitionPruningSuiteBase checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil) } @@ -506,7 +516,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1486,7 +1497,7 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1486,7 +1498,7 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-38148: Do not add dynamic partition pruning if there exists static partition " + @@ -515,7 +525,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq( "f.store_id = 1" -> false, -@@ -1558,7 +1569,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1558,7 +1570,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -525,7 +535,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withTable("duplicate_keys") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { Seq[(Int, String)]((1, "NL"), (1, "NL"), (3, "US"), (3, "US"), (3, "US")) -@@ -1589,7 +1601,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1589,7 +1602,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -535,7 +545,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1618,7 +1631,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1618,7 +1632,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -545,7 +555,7 @@ index 2c24cc7d570..d46dc5e138a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { val df = sql( """ -@@ -1730,6 +1744,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1730,6 +1745,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) From 661e673deee83f7af8e4afb6b536f4fa690f0873 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 8 Aug 2024 19:50:31 -0400 Subject: [PATCH 3/9] Use scalaExprToProto --- native/core/src/execution/datafusion/planner.rs | 16 ---------------- native/core/src/execution/jni_api.rs | 9 +++++---- native/proto/src/proto/expr.proto | 6 ------ .../org/apache/comet/serde/QueryPlanSerde.scala | 16 +++------------- .../org/apache/comet/CometExpressionSuite.scala | 7 ++++++- 5 files changed, 14 insertions(+), 40 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 818dabb485..a16ceda8c3 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -103,7 +103,6 @@ use datafusion_common::{ }; use datafusion_expr::expr::find_df_window_func; use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition}; -use datafusion_functions_nested::make_array::make_array_udf; use datafusion_physical_expr::window::WindowExpr; use datafusion_physical_expr_common::aggregate::create_aggregate_expr; use datafusion_physical_expr_common::expressions::Literal; @@ -624,21 +623,6 @@ impl PhysicalPlanner { let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?; Ok(Arc::new(GetStructField::new(child, expr.ordinal as usize))) } - ExprStruct::CreateArray(expr) => { - let values = expr - .values - .iter() - .map(|expr| self.create_expr(expr, input_schema.clone())) - .collect::, _>>()?; - let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap()); - - Ok(Arc::new(ScalarFunctionExpr::new( - "make_array", - make_array_udf(), - values, - data_type, - ))) - } expr => Err(ExecutionError::GeneralError(format!( "Not implemented: {:?}", expr diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index eb5f698b40..078f47a8f2 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -239,10 +239,11 @@ fn prepare_datafusion_session_context( let runtime = RuntimeEnv::new(rt_config).unwrap(); - Ok(SessionContext::new_with_config_rt( - session_config, - Arc::new(runtime), - )) + let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime)); + + datafusion_functions_nested::register_all(&mut session_ctx)?; + + Ok(session_ctx) } /// Prepares arrow arrays for output. diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 799ed5c435..485fdcfddc 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -79,7 +79,6 @@ message Expr { BloomFilterMightContain bloom_filter_might_contain = 52; CreateNamedStruct create_named_struct = 53; GetStructField get_struct_field = 54; - CreateArray create_array = 55; } } @@ -499,11 +498,6 @@ message GetStructField { int32 ordinal = 2; } -message CreateArray { - repeated Expr values = 1; - DataType datatype = 2; -} - enum SortDirection { Ascending = 0; Descending = 1; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 87243ebb34..04848efb42 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2349,21 +2349,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } // datafusion's make_array only supports nullable element types - case array @ CreateArray(children, _) if array.dataType.containsNull => + case array @ CreateArray(children, _) => val childExprs = children.map(exprToProto(_, inputs, binding)) - val dataType = serializeDataType(array.dataType) - if (childExprs.forall(_.isDefined) && dataType.isDefined) { - val createArrayBuilder = ExprOuterClass.CreateArray - .newBuilder() - .addAllValues(childExprs.map(_.get).asJava) - .setDatatype(dataType.get) - - Some( - ExprOuterClass.Expr - .newBuilder() - .setCreateArray(createArrayBuilder) - .build()) + if (childExprs.forall(_.isDefined)) { + scalarExprToProto("make_array", childExprs: _*) } else { withInfo(expr, "unsupported arguments for CreateArray", children: _*) None diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 0a0466c733..fa03670e81 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE -import org.apache.spark.sql.types.{Decimal, DecimalType} +import org.apache.spark.sql.types.{ArrayType, Decimal, DecimalType} import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus} @@ -1986,6 +1986,11 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkAnswerAndOperator(df.select(array(col("_2"), col("_3"), col("_4")))) checkSparkAnswerAndOperator(df.select(array(col("_4"), col("_11")))) } + + // Test non-nullable arrays with range + val df = spark.range(5).select(array(col("id"), col("id") + 1)) + checkSparkAnswerAndOperator(df) + assert(!df.schema.fields(0).dataType.asInstanceOf[ArrayType].containsNull) } } } From 0c9ac17bc6413000ba3c3e9ad760f9277727f31a Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 8 Aug 2024 19:58:17 -0400 Subject: [PATCH 4/9] Specify data type --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 04848efb42..fcfebf5ce2 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2348,12 +2348,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim .build() } - // datafusion's make_array only supports nullable element types case array @ CreateArray(children, _) => val childExprs = children.map(exprToProto(_, inputs, binding)) + // datafusion's make_array only supports nullable element types + val dataType = array.dataType.copy(containsNull = true) if (childExprs.forall(_.isDefined)) { - scalarExprToProto("make_array", childExprs: _*) + scalarExprToProtoWithReturnType("make_array", dataType, childExprs: _*) } else { withInfo(expr, "unsupported arguments for CreateArray", children: _*) None From 170ab0c628c9b3bab9cb0bcf77775a1a77c88985 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 8 Aug 2024 20:03:59 -0400 Subject: [PATCH 5/9] Only do nullable elements again --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 7 +++---- .../test/scala/org/apache/comet/CometExpressionSuite.scala | 5 ----- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index fcfebf5ce2..d857e74fc2 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2348,13 +2348,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim .build() } - case array @ CreateArray(children, _) => + // datafusion's make_array only supports nullable element types + case array @ CreateArray(children, _) if array.dataType.containsNull => val childExprs = children.map(exprToProto(_, inputs, binding)) - // datafusion's make_array only supports nullable element types - val dataType = array.dataType.copy(containsNull = true) if (childExprs.forall(_.isDefined)) { - scalarExprToProtoWithReturnType("make_array", dataType, childExprs: _*) + scalarExprToProtoWithReturnType("make_array", array.dataType, childExprs: _*) } else { withInfo(expr, "unsupported arguments for CreateArray", children: _*) None diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index fa03670e81..e2a59d6db1 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1986,11 +1986,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkAnswerAndOperator(df.select(array(col("_2"), col("_3"), col("_4")))) checkSparkAnswerAndOperator(df.select(array(col("_4"), col("_11")))) } - - // Test non-nullable arrays with range - val df = spark.range(5).select(array(col("id"), col("id") + 1)) - checkSparkAnswerAndOperator(df) - assert(!df.schema.fields(0).dataType.asInstanceOf[ArrayType].containsNull) } } } From 52d9e32f9ff28ba3a0b048b669620eeef7c478ab Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Fri, 9 Aug 2024 07:48:37 -0400 Subject: [PATCH 6/9] Remove unused import --- .../src/test/scala/org/apache/comet/CometExpressionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index e2a59d6db1..0a0466c733 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE -import org.apache.spark.sql.types.{ArrayType, Decimal, DecimalType} +import org.apache.spark.sql.types.{Decimal, DecimalType} import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus} From 9ccd9a2577f20669ee4cee83caf4edf4bbf0dc1e Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Fri, 9 Aug 2024 19:38:23 -0400 Subject: [PATCH 7/9] Add null to the test and add nullable element datafusion issue --- .../src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 1 + .../src/test/scala/org/apache/comet/CometExpressionSuite.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index d857e74fc2..f97de4aaf6 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2349,6 +2349,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } // datafusion's make_array only supports nullable element types + // https://github.com/apache/datafusion/issues/11923 case array @ CreateArray(children, _) if array.dataType.containsNull => val childExprs = children.map(exprToProto(_, inputs, binding)) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 0a0466c733..34eb2cb9ac 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1984,7 +1984,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) val df = spark.read.parquet(path.toString) checkSparkAnswerAndOperator(df.select(array(col("_2"), col("_3"), col("_4")))) - checkSparkAnswerAndOperator(df.select(array(col("_4"), col("_11")))) + checkSparkAnswerAndOperator(df.select(array(col("_4"), col("_11"), lit(null)))) } } } From 8afe7232a48b81c5ebb33f275725a1f95add0ef7 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 12 Aug 2024 20:07:13 -0400 Subject: [PATCH 8/9] Rename test --- .../src/test/scala/org/apache/comet/CometExpressionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 34eb2cb9ac..9701f88bca 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1977,7 +1977,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("create_array") { + test("CreateArray") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") From faa8affbbc093a7a4d5b674f58f2b3eb31a99ae6 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Wed, 14 Aug 2024 06:56:58 -0400 Subject: [PATCH 9/9] Update lock --- native/Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 0a06e57cb0..5d8dce283b 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -1056,8 +1056,8 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" -version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=bddb641#bddb6415a50746d2803dd908d19c3758952d74f9" +version = "41.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" dependencies = [ "arrow", "arrow-array",