[GLUTEN-3154][CORE] Override doCanonicalize in scan operators #3155
ulysses-you merged 3 commits into apache:main
Conversation
Run Gluten Clickhouse CI |
Thanks for your fix! Does
@philo-he It seems that both HiveTableScanExecTransformer and FileSourceScanExecTransformer may have this issue. |
Run Gluten Clickhouse CI |
Yes, I have added doCanonicalize in FileSourceScanExecTransformer and HiveTableScanExecTransformer. |
@ulysses-you, @zzcclp, could you take a look?
```scala
canonicalized.scan,
canonicalized.runtimeFilters,
QueryPlan.normalizePredicates(
  pushdownFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
```
Not related to this PR, but it seems `pushdownFilters` is never set or used... I'm not sure of the original intent; maybe we can just remove `pushdownFilters`.
Yes, we can remove it in this PR. Thanks!
@ulysses-you @philo-he I encountered an error during testing. This parameter cannot be deleted here: although it is unused, it may have been added precisely to avoid this error. We may need to add `keyGroupedPartitioning` to `BatchScanExecTransformer`; the `pushedFilters` parameter name was ambiguous.
```
Exception in thread "main" java.lang.IllegalStateException:
Failed to copy node.
Is otherCopyArgs specified correctly for BatchScanExecTransformer.
Exception message: wrong number of arguments
ctor: public io.glutenproject.execution.BatchScanExecTransformer(scala.collection.Seq,org.apache.spark.sql.connector.read.Scan,scala.collection.Seq)?
types: class scala.collection.immutable.$colon$colon, class org.apache.iceberg.spark.source.SparkBatchQueryScan, class scala.collection.immutable.$colon$colon, class scala.None$
args: List(xxxxx#10037, xxxxx#10046, xxxxx#10047L, xxxxx#10048L, xxxxx#10079, xxxxx#10080, xxxxx#10081), IcebergScan(table=default_iceberg.xxxxx.xxxxx, type=struct<1: xxxxx: optional string, 10: xxxxx: optional string, 11: xxxxx: optional long, 12: xxxxx: optional long, 43: xxxxx: optional string, 44: xxxxx: optional string, 45: xxxxx: optional string>, filters=[not_null(ref(name="xxxxx")), not_null(ref(name="xxxxx")), ref(name="xxxxx") == "2023-08-01", not(ref(name="xxxxx") == "xxxxx"), not_null(ref(name="xxxxx")), not_null(ref(name="xxxxx"))], runtimeFilters=[], caseSensitive=false), List(dynamicpruningexpression(true), dynamicpruningexpression(xxxxx#10080 IN dynamicpruning#10622)), None
at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:854)
at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:797)
at org.apache.spark.sql.execution.SparkPlan.super$makeCopy(SparkPlan.scala:100)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$makeCopy$1(SparkPlan.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:100)
at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:60)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:223)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUpWithPruning(QueryPlan.scala:188)
at org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery$$anonfun$org$apache$spark$sql$execution$reuse$ReuseExchangeAndSubquery$$reuse$1$1.applyOrElse(ReuseExchangeAndSubquery.scala:54)
at org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery$$anonfun$org$apache$spark$sql$execution$reuse$ReuseExchangeAndSubquery$$reuse$1$1.applyOrElse(ReuseExchangeAndSubquery.scala:44)
```
It may be a little difficult to explain, so let me try to make it clear. When Spark runs `TreeNode.makeCopy`, it calls `Product`'s `productArity` method to get the number of constructor parameters of the case class. `BatchScanExecTransformer` currently has 3 parameters, but `productArity` returns 4. From my testing, `productArity` returns the parameter count of the nearest concrete case class in the hierarchy, i.e. `BatchScanExec`, which does not match the `BatchScanExecTransformer` constructor.
I think the standard approach here is: if we need to inherit one of Spark's case class plan nodes, we must keep the number of constructor parameters consistent, otherwise `plan.transform` will fail when it calls `makeCopy`. Alternatively, implement the `otherCopyArgs` method so that the arguments not produced by transformation are still copied.
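To make the failure mode concrete, here is a plain-Scala sketch of the same mismatch. `Scan` and `ScanTransformer` are hypothetical names for illustration, not the actual Gluten or Spark classes:

```scala
// Plain-Scala sketch of the arity mismatch described above.
// "Scan" / "ScanTransformer" are hypothetical names, not the actual
// Gluten or Spark classes.
case class Scan(output: Seq[String], filters: Seq[String])

// A plain class extending a case class inherits the case class's
// compiler-generated productArity (2 here), even though its own
// constructor takes 3 arguments. A makeCopy-style reflective call that
// feeds productIterator's 2 elements to the 3-arg constructor fails
// with "wrong number of arguments".
class ScanTransformer(
    output: Seq[String],
    filters: Seq[String],
    val pushdownFilters: Seq[String])
  extends Scan(output, filters)
```

With `val t = new ScanTransformer(...)`, `t.productArity` reports 2 while `t.getClass.getConstructors.head.getParameterCount` is 3, which is exactly the mismatch `makeCopy` trips over.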
It's a subtle issue. Why does `productArity` return 4 for `BatchScanExec`? It looks like `BatchScanExec` has only 3 constructor args.
> It's a subtle issue. Why does `productArity` return 4 for `BatchScanExec`? It looks like `BatchScanExec` has only 3 constructor args.
Spark 3.3 has 4 args.
https://github.com/apache/spark/blob/b170a670b68616af49b26cbcc84894c939713837/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L40
```scala
case class A(a1: Int, a2: Int) extends Product
class B(a1: Int, a2: Int, a3: Int) extends A(a1, a2)
val b = new B(1, 2, 3)
```
`b.productArity` will return 2.
I see. Please help fix it in a separate PR and leave some comments in the source code to explain. Thanks!
Run Gluten Clickhouse CI |
ad69c5f to 07d1c74
Run Gluten Clickhouse CI |
```scala
}

override def doCanonicalize(): BatchScanExecTransformer = {
  val canonicalized = super.doCanonicalize()
```
Just wondering whether we can directly use the members of `BatchScanExecTransformer` to `doCanonicalize`. There may be some differences. Thanks!
BatchScanExecTransformer inherits BatchScanExec, and their parameters should be exactly the same.
===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
What changes were proposed in this pull request?
Fix #3154
Spark will try to get reused queryStage based on plan.canonicalized.
https://github.com/apache/spark/blob/822f58f0d26b7d760469151a65eaf9ee863a07a1/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L523-L523
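As a simplified illustration of why the override matters (hypothetical names, plain Scala, not the actual patch): canonicalization normalizes volatile details such as expression ids, and the transformer node must canonicalize to its own type so that two equivalent transformer plans compare equal and the query stage can be reused.

```scala
// Simplified sketch of plan canonicalization (hypothetical names, not
// Spark's actual API): normalize volatile details (here, fake expression
// ids) so that semantically equal plans become structurally equal.
trait PlanNode { def doCanonicalize(): PlanNode }

case class Scan(exprIds: Seq[Long]) extends PlanNode {
  // Replace per-query expression ids with a stable placeholder.
  override def doCanonicalize(): PlanNode = Scan(exprIds.map(_ => 0L))
}

case class ScanTransformer(exprIds: Seq[Long]) extends PlanNode {
  // Re-wrap the canonical form in the transformer type, so canonicalized
  // transformer plans compare equal to each other and stage reuse hits.
  override def doCanonicalize(): PlanNode = {
    val c = Scan(exprIds).doCanonicalize().asInstanceOf[Scan]
    ScanTransformer(c.exprIds)
  }
}
```

Under this sketch, `ScanTransformer(Seq(1L, 2L)).doCanonicalize()` equals `ScanTransformer(Seq(7L, 8L)).doCanonicalize()`, which is the property the reused-query-stage lookup relies on.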
How was this patch tested?