[SPARK-35378][SQL] Eagerly execute commands in QueryExecution instead of caller sides #32513
Conversation
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #138416 has finished for PR 32513 at commit
ping @cloud-fan @wangyum @maropu @viirya
// We can't clone `logical` here, which will reset the `_analyzed` flag.
sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker) match {
  case c: Command => c
Why do we leave out the root node Command? Can we remove the command execution logic in Dataset.logicalPlan and execute all the commands here?
I want to keep handling Command here and preserve its current behavior.
OK. Let's unify the behavior and eagerly execute the commands.
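The behavior being settled in this thread — run a command as soon as the plan is analyzed, and hand later phases a materialized result — can be sketched in plain Scala. All types below (`ShowTables`, `LocalRelation`, etc.) are toy stand-ins for illustration, not Spark's actual classes:

```scala
// Toy stand-ins for the real plan nodes; none of these are Spark classes.
sealed trait LogicalPlan
case class Project(table: String) extends LogicalPlan            // an ordinary lazy query
trait Command extends LogicalPlan { def run(): Seq[String] }     // side-effecting command
case class ShowTables() extends Command { def run() = Seq("t1", "t2") }
case class LocalRelation(rows: Seq[String]) extends LogicalPlan  // materialized result

// Analogue of QueryExecution eagerly executing commands after analysis:
// a command runs immediately and is replaced by its result; anything else
// stays lazy and untouched.
def commandExecuted(analyzed: LogicalPlan): LogicalPlan = analyzed match {
  case c: Command => LocalRelation(c.run())
  case other      => other
}

assert(commandExecuted(ShowTables()) == LocalRelation(Seq("t1", "t2")))
assert(commandExecuted(Project("t")) == Project("t"))
```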
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #138543 has finished for PR 32513 at commit
sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker) transform {
  // SPARK-35378: Eagerly execute LeafRunnableCommand so that query command with CTE
  case r: LeafRunnableCommand =>
We should run all Commands, not just LeafRunnableCommand.
OK
Test build #138624 has finished for PR 32513 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #138626 has finished for PR 32513 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
Kubernetes integration test unable to build dist. exiting with code: 1
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
case c: Command =>
  val subQueryExecution = sparkSession.sessionState.executePlan(c)
  LocalRelation(c.output,
    subQueryExecution.executedPlan.executeCollect(), false, Some(subQueryExecution.id))
How about we create new query plan nodes, CommandResult and CommandResultExec?

case class CommandResult(qe: QueryExecution) extends LeafNode {
  def innerChildren = Seq(qe.analyzedPlan)
  def output = qe.logicalPlan.output
}

case class CommandResultExec(qe: QueryExecution) extends LeafExecNode {
  def innerChildren = Seq(qe.executedPlan)
  def output = qe.logicalPlan.output
}

Then both the UI and EXPLAIN can have pretty output.
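To make the suggestion concrete, here is a self-contained sketch (toy types, not Spark's real TreeNode API) of why keeping the original plan as an "inner child" pays off: a generic tree printer can descend into it, so EXPLAIN and the UI show the executed command underneath the CommandResult wrapper:

```scala
// Minimal tree node with "inner children": plans attached for display
// purposes that are not part of the operator's regular children.
trait TreeNode {
  def nodeName: String
  def innerChildren: Seq[TreeNode] = Nil
  // Tiny EXPLAIN-style printer that also descends into innerChildren.
  def treeString(indent: Int = 0): String = {
    val self = ("  " * indent) + nodeName
    (self +: innerChildren.map(_.treeString(indent + 1))).mkString("\n")
  }
}

case class ShowTablesCommand() extends TreeNode {
  def nodeName = "ShowTablesCommand"
}

// The wrapper holds the already-computed rows, yet still renders the
// command plan it came from.
case class CommandResult(commandPlan: TreeNode, rows: Seq[String]) extends TreeNode {
  def nodeName = "CommandResult"
  override def innerChildren = Seq(commandPlan)
}

val result = CommandResult(ShowTablesCommand(), rows = Seq("t1", "t2"))
println(result.treeString())
// CommandResult
//   ShowTablesCommand
```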
Test build #138628 has finished for PR 32513 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Test build #139462 has finished for PR 32513 at commit
 * callback functions.
 */
private def runCommand(name: String)(command: LogicalPlan): Unit = {
private def runCommand()(command: LogicalPlan): Unit = {
nit: shall this just be private def runCommand(command: LogicalPlan)?
Kubernetes integration test starting
Kubernetes integration test status success
Test build #139494 has finished for PR 32513 at commit
thanks, merging to master!
@cloud-fan Thanks for your hard work! @viirya @yaooqinn Thanks for the review too.
 * limitations under the License.
 */

package org.apache.spark.sql.expressions
This is a public package which makes CommandResult a public API. This is unexpected. We should move this class to org.apache.spark.sql.catalyst.plans.logical. @beliefer can you help to make this change?
OK
…ataFrameWriterV2

### What changes were proposed in this pull request?

This is a followup of #32513. It's hard to keep the command execution name for `DataFrameWriter`, as the command logical plan is a bit messy (DS v1, file source and Hive have different command logical plans) and sometimes it's hard to distinguish "insert" and "save". However, `DataFrameWriterV2` only produces v2 commands, which are pretty clean. It's easy to keep the command execution name for them.

### Why are the changes needed?

Fewer breaking changes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

N/A

Closes #32919 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ical

### What changes were proposed in this pull request?

#32513 added the case class `CommandResult` in package `org.apache.spark.sql.expression`. It is not suitable, so this PR moves `CommandResult` from `org.apache.spark.sql.expression` to `org.apache.spark.sql.catalyst.plans.logical`.

### Why are the changes needed?

Make `CommandResult` live in a suitable package.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No need.

Closes #32942 from beliefer/SPARK-35378-followup.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

#32513 added the case class `CommandResult` so that we can eagerly execute commands locally. But we forgot to update `isLocal` of `Dataset`.

### Why are the changes needed?

`Dataset.isLocal` should consider `CommandResult`.

### Does this PR introduce _any_ user-facing change?

Yes. If the SQL plan is `CommandResult`, `Dataset.isLocal` returns true.

### How was this patch tested?

No test.

Closes #32963 from beliefer/SPARK-35378-followup2.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ultExec.executeCollect()

### What changes were proposed in this pull request?

This PR is a follow-up for #32513 and fixes an issue introduced by that patch. `CommandResultExec` is supposed to return `UnsafeRow` records in all of the `executeXYZ` methods, but `executeCollect` was left out, which causes issues like this one:

```
Error in SQL statement: ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
```

We need to return `unsafeRows` instead of `rows` in `executeCollect`, similar to the other methods in the class.

### Why are the changes needed?

Fixes a bug in `CommandResultExec`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added a unit test to check the return type of all commands.

Closes #36632 from sadikovi/fix-command-exec.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit a0decfc)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?

Currently, Spark eagerly executes commands on the caller side of QueryExecution, which is a bit hacky, as QueryExecution is not aware of it, and this leads to confusion. For example, if you run sql("show tables").collect(), you will see two queries with identical query plans in the web UI. The first query is triggered at Dataset.logicalPlan, which eagerly executes the command. The second query is triggered at Dataset.collect, which is the normal query execution. From the web UI, it's hard to tell that these two queries are caused by eager command execution.

This PR proposes to move the eager command execution into QueryExecution, and turn the command plan into CommandResult to indicate that the command has been executed already. Now sql("show tables").collect() still triggers two queries, but the query plans are no longer identical.

In addition to the UI improvements, this PR has another benefit: the caller side doesn't need to eagerly execute commands anymore, as QueryExecution takes care of it.

Why are the changes needed?

Explained above.

Does this PR introduce any user-facing change?

No

How was this patch tested?

existing tests
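As a closing illustration of the design described above, the following plain-Scala sketch (toy types, not Spark's API) shows the key property of the new behavior: the command body runs exactly once, eagerly, and any subsequent collect() just reads the rows cached in the CommandResult wrapper:

```scala
// Count how many times the command body actually executes.
var runs = 0
def runShowTables(): Seq[String] = { runs += 1; Seq("t1", "t2") }

// Stand-in for the CommandResult logical plan: it carries the rows that
// were produced when the command was executed eagerly.
case class CommandResult(rows: Seq[String])

// Eager execution happens once, when the plan is built...
val plan = CommandResult(runShowTables())

// ...and collect() only reads the pre-computed rows.
def collect(p: CommandResult): Seq[String] = p.rows

assert(collect(plan) == Seq("t1", "t2"))
assert(collect(plan) == Seq("t1", "t2"))
assert(runs == 1) // the command itself ran exactly once
```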