[SPARK-53823][SS] Implement allow list for real time mode#52891
[SPARK-53823][SS] Implement allow list for real time mode#52891jerrypeng wants to merge 3 commits intoapache:masterfrom
Conversation
e2047f0 to
1dab9ec
Compare
| @@ -436,7 +436,10 @@ class MicroBatchExecution( | |||
| } | |||
| } | |||
|
|
|||
| if (containsStatefulOperator(analyzedPlan)) { | |||
| if (trigger.isInstanceOf[RealTimeTrigger]) { | |||
There was a problem hiding this comment.
@HeartSaVioR you recently added AQE support streaming queries. Can you take a look to see if this logic is appropriate to disable AQE for RTM quieries?
There was a problem hiding this comment.
That should do the thing. Looks good to me.
| } | ||
| } | ||
|
|
||
| // test("exactly once sink not supported") { |
There was a problem hiding this comment.
Do we want to keep commented code here?
| "real-time without operators - append mode", | ||
| streamRelation, Append, | ||
| "STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED" | ||
| // Map("outputMode" -> "Append") |
There was a problem hiding this comment.
forgot the push a commit that removes this :)
| @@ -1017,6 +1030,30 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { | |||
| } | |||
| } | |||
|
|
|||
| /** Assert that the logical plan is supported for real-time mode */ | |||
| def assertSupportedForRealTime(name: String, plan: LogicalPlan, outputMode: OutputMode): Unit = { | |||
There was a problem hiding this comment.
Hmm, where is it used? I don't find any.
There was a problem hiding this comment.
ah forgot to push a commit. The test is added
|
Note that this will be merged into master and branch-4.1. |
| @@ -3094,6 +3032,12 @@ object SQLConf { | |||
| .timeConf(TimeUnit.MILLISECONDS) | |||
| .createWithDefault(5000) | |||
|
|
|||
| val STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK = buildConf( | |||
| "spark.sql.streaming.realTimeMode.allowlistCheck") | |||
There was a problem hiding this comment.
nit. Indentation looks a little wrong to me for all realTimeMode configurations, @jerrypeng and @viirya .
- spark.sql.streaming.realTimeMode.minBatchDuration
- spark.sql.streaming.realTimeMode.allowlistCheck
It would be great if you can match with the other code around here.
| @@ -3094,6 +3032,12 @@ object SQLConf { | |||
| .timeConf(TimeUnit.MILLISECONDS) | |||
| .createWithDefault(5000) | |||
|
|
|||
| val STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK = buildConf( | |||
| "spark.sql.streaming.realTimeMode.allowlistCheck") | |||
| .doc("Whether to check all operators, sinks used in real-time mode are in the allowlist.") | |||
There was a problem hiding this comment.
We need a version field.
.version("4.1.0")| ======================================================================================= | ||
| REAL-TIME STREAMING | ||
| ======================================================================================= | ||
| */ |
| /* | ||
| ======================================================================================= | ||
| REAL-TIME STREAMING | ||
| ======================================================================================= |
There was a problem hiding this comment.
Also, this is a little mismatched.
| @@ -853,6 +853,24 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { | |||
| Deduplicate(Seq(attribute), streamRelation)), outputMode = Append) | |||
| } | |||
|
|
|||
| /* | |||
| ======================================================================================= | |||
| REAL-TIME STREAMING | |||
There was a problem hiding this comment.
BTW, for the actual test methods, it seems that the following {...} grouping is used in this file, doesn't it?
// REAL-TIME STREAMING
{
...
}
| } | ||
| } | ||
|
|
||
| /** Assert that the logical plan is not supported inside a streaming plan with the |
There was a problem hiding this comment.
nit. Maybe, the following?
/**
* Assert that the logical plan is not supported inside a streaming plan with the
...
| "org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec", | ||
| "org.apache.spark.sql.execution.exchange.BroadcastExchangeExec", | ||
| "org.apache.spark.sql.execution.joins.BroadcastHashJoinExec", | ||
| "org.apache.spark.sql.execution.exchange.ReusedExchangeExec", |
There was a problem hiding this comment.
I guess this was initially designed to be a sorted list alphabetically in the code side. Could you change like the following because it looks like the only exceptions?
- "org.apache.spark.sql.execution.joins.BroadcastHashJoinExec",
"org.apache.spark.sql.execution.exchange.ReusedExchangeExec",
+ "org.apache.spark.sql.execution.joins.BroadcastHashJoinExec",
| } | ||
|
|
||
| test("rtm operator allowlist") { | ||
| withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") { |
There was a problem hiding this comment.
nit.
- withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
| } | ||
| } | ||
|
|
||
| // TODO : Remove this test after RTM can shuffle to multiple stages |
There was a problem hiding this comment.
Please file a JIRA issue and make this IDed TODO like TODO(SPARK-XXX). It helps the community move forward from your contributions.
| } | ||
| } | ||
|
|
||
| // TODO : Remove this test after RTM supports stateful queries |
dongjoon-hyun
left a comment
There was a problem hiding this comment.
+1, LGTM (except a few minor style comments)
ab913ef to
9703b2e
Compare
|
@dongjoon-hyun thank you for your review! I have addressed your comments. |
|
Thank you, @jerrypeng and @viirya . |
### What changes were proposed in this pull request? Add an allowlist and some guardrails to help users understand what is supported and what is not supported in Real-time Mode. This should improve the user experience of real-time mode and minimize the chance of a new user use it in a way that is unexpected or untested which may produce undesirable outcomes. ### Why are the changes needed? To help guide users on what is currently supported and what is not. ### Does this PR introduce _any_ user-facing change? Yes, running a query with currently unsupported features in RTM will throw an exception. ### How was this patch tested? Several tests are added ### Was this patch authored or co-authored using generative AI tooling? No Closes #52891 from jerrypeng/SPARK-53823. Authored-by: Jerry Peng <jerry.peng@databricks.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 178f0f4) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
|
Thank you @jerrypeng @dongjoon-hyun |
### What changes were proposed in this pull request? Add an allowlist and some guardrails to help users understand what is supported and what is not supported in Real-time Mode. This should improve the user experience of real-time mode and minimize the chance of a new user use it in a way that is unexpected or untested which may produce undesirable outcomes. ### Why are the changes needed? To help guide users on what is currently supported and what is not. ### Does this PR introduce _any_ user-facing change? Yes, running a query with currently unsupported features in RTM will throw an exception. ### How was this patch tested? Several tests are added ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52891 from jerrypeng/SPARK-53823. Authored-by: Jerry Peng <jerry.peng@databricks.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request? Add an allowlist and some guardrails to help users understand what is supported and what is not supported in Real-time Mode. This should improve the user experience of real-time mode and minimize the chance of a new user use it in a way that is unexpected or untested which may produce undesirable outcomes. ### Why are the changes needed? To help guide users on what is currently supported and what is not. ### Does this PR introduce _any_ user-facing change? Yes, running a query with currently unsupported features in RTM will throw an exception. ### How was this patch tested? Several tests are added ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52891 from jerrypeng/SPARK-53823. Authored-by: Jerry Peng <jerry.peng@databricks.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
* Move GlutenStreamingQuerySuite to correct package * Add Spark 4.1 new test suites for Gluten * Enable new and existing Gluten test suites for Spark 4.1 UT * Update workflow trigger paths to exclude Spark 4.0 and 4.1 shims directories for clickhouse backend * Add support for Spark 4.1 in build script * Merge Spark 4.1.0 sql-tests into Gluten Spark 4.1 (three-way merge) Three-way merge performed using Git: - Base: Spark 4.0.1 (29434ea766b) - Left: Spark 4.1.0 (e221b56be7b) - Right: Gluten Spark 4.1 backends-velox Summary: - Auto-merged: 165 files - New tests added: 31 files (collations, edge cases, recursion, spatial, etc.) - Modified tests: 134 files - Deleted tests: 2 files (collations.sql -> split into 4 files, timestamp-ntz.sql) Conflicts resolved: - inputs/timestamp-ntz.sql: Right deleted + Left modified -> DELETED (per resolution rule) New test suites from Spark 4.1.0: - Collations (4 files): aliases, basic, padding-trim, string-functions - Edge cases (6 files): alias-resolution, extract-value, join-resolution, etc. - Advanced features: cte-recursion, generators, kllquantiles, thetasketch, time - Name resolution: order-by-alias, session-variable-precedence, runtime-replaceable - Spatial functions: st-functions (ANSI and non-ANSI variants) - Various resolution edge cases Total files after merge: 671 (up from 613) * Enable additional Spark 4.1 SQL tests by resolving TODOs * Add new Spark 4.1 test files to VeloxSQLQueryTestSettings * [Fix] Replace `RuntimeReplaceable` with its `replacement` to fix UT. see apache/spark#50287 * [4.1.0] Exclude "infer shredding with mixed scale" see apache/spark#52406 * [Fix] Implement Kryo serialization for CachedColumnarBatch see apache/spark#50599 * [4.1.0] Exclude GlutenMapStatusEndToEndSuite and configure parallelism see apache/spark#50230 * [4.1.0] Exclude Spark Structure Steaming tests in Gluten see - apache/spark#52473 - apache/spark#52870 - apache/spark#52891 * [4.1.0] Exclude failing SQL tests on Spark 4.1 * Replace SparkException.require with standard require in ColumnarCachedBatchSerializer to work across different spark versions * [Fix] Replace `RuntimeReplaceable` with its `replacement` to fix UT. see apache/spark#50287 * Exclude Spark 4.0 and 4.1 paths in clickhouse_be_trigger using `!` prefix * [Fix] Update GlutenShowNamespacesParserSuite to use GlutenSQLTestsBaseTrait
What changes were proposed in this pull request?
Add an allowlist and some guardrails to help users understand what is supported and what is not supported in Real-time Mode. This should improve the user experience of real-time mode and minimize the chance of a new user use it in a way that is unexpected or untested which may produce undesirable outcomes.
Why are the changes needed?
To help guide users on what is currently supported and what is not.
Does this PR introduce any user-facing change?
Yes, running a query with currently unsupported features in RTM will throw an exception.
How was this patch tested?
Several tests are added
Was this patch authored or co-authored using generative AI tooling?
No