From 7162aab6c31d82960f2a2d09a231178d5c27a5a9 Mon Sep 17 00:00:00 2001
From: Jerry Peng
Date: Tue, 4 Nov 2025 22:37:46 -0800
Subject: [PATCH 1/3] [SPARK-53823] Implement allow list for real time mode

---
 .../resources/error/error-conditions.json    |  15 ++
 .../UnsupportedOperationChecker.scala        |  19 +++
 .../streaming/WriteToStreamStatement.scala   |   8 +-
 .../apache/spark/sql/internal/SQLConf.scala  |   6 +
 .../analysis/UnsupportedOperationsSuite.scala |  37 +++++
 .../spark/sql/classic/DataStreamWriter.scala |   9 ++
 .../sql/classic/StreamingQueryManager.scala  |   2 +-
 .../runtime/MicroBatchExecution.scala        |  15 +-
 .../runtime/RealTimeModeAllowlist.scala      | 147 ++++++++++++++++++
 .../runtime/ResolveWriteToStream.scala       |   8 +-
 .../StreamRealTimeModeAllowlistSuite.scala   | 133 ++++++++++++++++
 11 files changed, 392 insertions(+), 7 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/RealTimeModeAllowlist.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeAllowlistSuite.scala

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index b16fe84ae27ff..4b6bf754c3ec2 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5671,6 +5671,21 @@
       "message" : [
         "The input stream <className> is not supported in Real-time Mode."
       ]
+    },
+    "OPERATOR_OR_SINK_NOT_IN_ALLOWLIST" : {
+      "message" : [
+        "The <errorType>(s): <message> not in the allowlist for Real-Time Mode. To bypass this check, set spark.sql.streaming.realTimeMode.allowlistCheck to false. By changing this, you agree to run the query at your own risk."
+      ]
+    },
+    "OUTPUT_MODE_NOT_SUPPORTED" : {
+      "message" : [
+        "The output mode <outputMode> is not supported. To work around this limitation, set the output mode to Update. In the future, <outputMode> may be supported."
+      ]
+    },
+    "SINK_NOT_SUPPORTED" : {
+      "message" : [
+        "The sink <sink> is currently not supported. See the Real-Time Mode User Guide for a list of supported sinks."
+      ]
+    }
   },
   "sqlState" : "0A000"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index fd4e081c91b52..d658d83f066f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -584,6 +584,25 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  // Verifies that a query using real-time mode is valid. It is meant to be used in addition
+  // to the checkForStreaming method; for this reason, we describe it as checking *additional*
+  // real-time mode constraints.
+  //
+  // It should be called during resolution of the WriteToStreamStatement if and only if
+  // the query is using the real-time trigger.
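+  //
+  // For example, a real-time query started with Append output mode fails analysis here
+  // with STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED, because the check below
+  // currently accepts only Update mode.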
+ def checkAdditionalRealTimeModeConstraints(plan: LogicalPlan, outputMode: OutputMode): Unit = { + if (outputMode != InternalOutputModes.Update) { + throwRealTimeError("OUTPUT_MODE_NOT_SUPPORTED", Map("outputMode" -> outputMode.toString)) + } + } + + private def throwRealTimeError(subClass: String, args: Map[String, String]): Unit = { + throw new AnalysisException( + errorClass = s"STREAMING_REAL_TIME_MODE.$subClass", + messageParameters = args + ) + } + private def throwErrorIf( condition: Boolean, msg: String)(implicit operator: LogicalPlan): Unit = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala index a6204b317d249..7015d0dd3b2cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} -import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.streaming.{OutputMode, Trigger} /** * A statement for Stream writing. It contains all neccessary param and will be resolved in the @@ -39,7 +39,9 @@ import org.apache.spark.sql.streaming.OutputMode * @param sink Sink to write the streaming outputs. * @param outputMode Output mode for the sink. * @param hadoopConf The Hadoop Configuration to get a FileSystem instance - * @param isContinuousTrigger Whether the statement is triggered by a continuous query or not. + * @param trigger The trigger being used for this streaming query. It is not used to create the + * resolved [[WriteToStream]] node; rather, it is only used while checking the plan + * for unsupported operations, which happens during resolution. * @param inputQuery The analyzed query plan from the streaming DataFrame. 
* @param catalogAndIdent Catalog and identifier for the sink, set when it is a V2 catalog table
  */
@@ -51,7 +53,7 @@ case class WriteToStreamStatement(
     sink: Table,
     outputMode: OutputMode,
     hadoopConf: Configuration,
-    isContinuousTrigger: Boolean,
+    trigger: Trigger,
     inputQuery: LogicalPlan,
     catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
     catalogTable: Option[CatalogTable] = None) extends UnaryNode {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 38e823a96cbf4..f336fbf5b4f31 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3094,6 +3094,12 @@ object SQLConf {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefault(5000)
 
+  val STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK = buildConf(
+    "spark.sql.streaming.realTimeMode.allowlistCheck")
+    .doc("Whether to check that all operators and sinks used in real-time mode are in the allowlist.")
+    .booleanConf
+    .createWithDefault(true)
+
   val VARIABLE_SUBSTITUTE_ENABLED =
     buildConf("spark.sql.variable.substitute")
       .doc("This enables substitution using syntax like `${var}`, `${system:var}`, " +
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 6ee19bab5180a..71643ce6a0e84 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -853,6 +853,19 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       Deduplicate(Seq(attribute), streamRelation)), outputMode = Append)
   }
 
+  /*
+  =======================================================================================
+                                  REAL-TIME STREAMING
+  =======================================================================================
+  */
+
+  assertNotSupportedForRealTime(
+    "real-time without operators - append mode",
+    streamRelation, Append,
+    "STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED"
+    // Map("outputMode" -> "Append")
+  )
+
   /*
   =======================================================================================
                                    TESTING FUNCTIONS
   =======================================================================================
   */
@@ -1017,6 +1030,30 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
     }
   }
 
+  /** Assert that the logical plan is supported for real-time mode */
+  def assertSupportedForRealTime(name: String, plan: LogicalPlan, outputMode: OutputMode): Unit = {
+    test(s"real-time trigger - $name: supported") {
+      UnsupportedOperationChecker.checkAdditionalRealTimeModeConstraints(plan, outputMode)
+    }
+  }
+
+  /** Assert that the logical plan is not supported inside a streaming plan with the
+   * real-time trigger.
+   */
+  def assertNotSupportedForRealTime(
+      name: String,
+      plan: LogicalPlan,
+      outputMode: OutputMode,
+      condition: String): Unit = {
+    testError(
+      s"real-time trigger - $name: not supported",
+      Seq("Streaming real-time mode"),
+      condition
+    ) {
+      UnsupportedOperationChecker.checkAdditionalRealTimeModeConstraints(plan, outputMode)
+    }
+  }
+
   /**
    * Assert that the logical plan is not supported inside a streaming plan.
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala index 471c5feadaabc..38483395ec8c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala @@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDat import org.apache.spark.sql.execution.datasources.v2.python.PythonDataSourceV2 import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.ArrayImplicits._ @@ -299,6 +300,14 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D recoverFromCheckpoint: Boolean = true, catalogAndIdent: Option[(TableCatalog, Identifier)] = None, catalogTable: Option[CatalogTable] = None): StreamingQuery = { + if (trigger.isInstanceOf[RealTimeTrigger]) { + RealTimeModeAllowlist.checkAllowedSink( + sink, + ds.sparkSession.sessionState.conf.getConf( + SQLConf.STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK) + ) + } + val useTempCheckpointLocation = DataStreamWriter.SOURCES_ALLOW_ONE_TIME_QUERY.contains(source) ds.sparkSession.sessionState.streamingQueryManager.startQuery( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala index bef09703025ef..72ae3b21d662a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala @@ -213,7 +213,7 @@ class StreamingQueryManager private[sql] ( sink, outputMode, df.sparkSession.sessionState.newHadoopConf(), - trigger.isInstanceOf[ContinuousTrigger], + trigger, analyzedPlan, catalogAndIdent, catalogTable) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index d4f9dc8cea93a..cf2fca3d3cd8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} -import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper} +import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper} import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeq, OffsetSeqMetadata} import 
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter} import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler @@ -436,7 +436,10 @@ class MicroBatchExecution( } } - if (containsStatefulOperator(analyzedPlan)) { + if (trigger.isInstanceOf[RealTimeTrigger]) { + logWarning(log"Disabling AQE since AQE is not supported for Real-time Mode.") + sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") + } else if (containsStatefulOperator(analyzedPlan)) { // SPARK-53941: We disable AQE for stateful workloads as of now. logWarning(log"Disabling AQE since AQE is not supported in stateful workloads.") sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") @@ -1042,6 +1045,14 @@ class MicroBatchExecution( markMicroBatchExecutionStart(execCtx) + if (trigger.isInstanceOf[RealTimeTrigger]) { + RealTimeModeAllowlist.checkAllowedPhysicalOperator( + execCtx.executionPlan.executedPlan, + sparkSession.sessionState.conf.getConf( + SQLConf.STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK) + ) + } + if (execCtx.previousContext.isEmpty) { purgeStatefulMetadataAsync(execCtx.executionPlan.executedPlan) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/RealTimeModeAllowlist.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/RealTimeModeAllowlist.scala new file mode 100644 index 0000000000000..d342e4241f415 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/RealTimeModeAllowlist.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.SparkIllegalArgumentException +import org.apache.spark.internal.{Logging, LogKeys, MessageWithContext} +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.RealTimeStreamScanExec +import org.apache.spark.sql.execution.streaming.operators.stateful._ + +object RealTimeModeAllowlist extends Logging { + private val allowedSinks = Set( + "org.apache.spark.sql.execution.streaming.ConsoleTable$", + "org.apache.spark.sql.execution.streaming.sources.ContinuousMemorySink", + "org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable", + "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable" + ) + + private val allowedOperators = Set( + "org.apache.spark.sql.execution.AppendColumnsExec", + "org.apache.spark.sql.execution.CollectMetricsExec", + "org.apache.spark.sql.execution.ColumnarToRowExec", + "org.apache.spark.sql.execution.DeserializeToObjectExec", + "org.apache.spark.sql.execution.ExpandExec", + "org.apache.spark.sql.execution.FileSourceScanExec", + "org.apache.spark.sql.execution.FilterExec", + "org.apache.spark.sql.execution.GenerateExec", + "org.apache.spark.sql.execution.InputAdapter", + "org.apache.spark.sql.execution.LocalTableScanExec", + "org.apache.spark.sql.execution.MapElementsExec", + "org.apache.spark.sql.execution.MapPartitionsExec", + "org.apache.spark.sql.execution.PlanLater", + "org.apache.spark.sql.execution.ProjectExec", + "org.apache.spark.sql.execution.RangeExec", + "org.apache.spark.sql.execution.SerializeFromObjectExec", + "org.apache.spark.sql.execution.UnionExec", + "org.apache.spark.sql.execution.WholeStageCodegenExec", + "org.apache.spark.sql.execution.datasources.v2.RealTimeStreamScanExec", + "org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec", + "org.apache.spark.sql.execution.exchange.BroadcastExchangeExec", + "org.apache.spark.sql.execution.exchange.ReusedExchangeExec", + classOf[EventTimeWatermarkExec].getName + ) + + private def classNamesString(classNames: Seq[String]): MessageWithContext = { + val sortedClassNames = classNames.sorted + var message = log"${MDC(LogKeys.CLASS_NAME, sortedClassNames.head)}" + sortedClassNames.tail.foreach( + name => message += log", ${MDC(LogKeys.CLASS_NAME, name)}" + ) + if (sortedClassNames.size > 1) { + message + log" are" + } else { + message + log" is" + } + } + + private def notInRTMAllowlistException( + errorType: String, + classNames: Seq[String]): SparkIllegalArgumentException = { + assert(classNames.nonEmpty) + new SparkIllegalArgumentException( + errorClass = "STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST", + messageParameters = Map( + "errorType" -> errorType, + "message" -> classNamesString(classNames).message + ) + ) + } + + def checkAllowedSink(sink: Table, throwException: Boolean): Unit = { + if (!allowedSinks.contains(sink.getClass.getName)) { + if (throwException) { + throw notInRTMAllowlistException("sink", Seq(sink.getClass.getName)) + } else { + logWarning( + log"The sink: " + classNamesString(Seq(sink.getClass.getName)) + + log" not in the sink allowlist for Real-Time Mode." + ) + } + } + } + + // Collect ALL nodes whose entire subtree contains RealTimeStreamScanExec. 
+ private def collectRealtimeNodes(root: SparkPlan): Seq[SparkPlan] = { + + def collectNodesWhoseSubtreeHasRTS(n: SparkPlan): (Boolean, List[SparkPlan]) = { + n match { + case _: RealTimeStreamScanExec => + // Subtree has RTS, but we don't collect the RTS node itself. + (true, Nil) + case _ if n.children.isEmpty => + (false, Nil) + case _ => + val kidResults = n.children.map(collectNodesWhoseSubtreeHasRTS) + val anyChildHasRTS = kidResults.exists(_._1) + val collectedKids = kidResults.iterator.flatMap(_._2).toList + val collectedHere = if (anyChildHasRTS) n :: collectedKids else collectedKids + (anyChildHasRTS, collectedHere) + } + } + + collectNodesWhoseSubtreeHasRTS(root)._2 + } + + def checkAllowedPhysicalOperator(operator: SparkPlan, throwException: Boolean): Unit = { + val nodesToCheck = collectRealtimeNodes(operator) + val violations = nodesToCheck + .collect { + case node => + if (allowedOperators.contains(node.getClass.getName)) { + None + } else { + Some(node.getClass.getName) + } + } + .flatten + .distinct + + if (violations.nonEmpty) { + if (throwException) { + throw notInRTMAllowlistException("operator", violations.toSet.toSeq) + } else { + logWarning( + log"The operator(s): " + classNamesString(violations.toSet.toSeq) + + log" not in the operator allowlist for Real-Time Mode." + ) + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala index ee7bf67eb9121..86d48b1e88c5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement} import org.apache.spark.sql.connector.catalog.SupportsWrite import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.execution.streaming.{ContinuousTrigger, RealTimeTrigger} import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.Utils @@ -48,7 +49,12 @@ object ResolveWriteToStream extends Rule[LogicalPlan] { } if (conf.isUnsupportedOperationCheckEnabled) { - if (s.sink.isInstanceOf[SupportsWrite] && s.isContinuousTrigger) { + if (s.trigger.isInstanceOf[RealTimeTrigger]) { + UnsupportedOperationChecker. + checkAdditionalRealTimeModeConstraints(s.inputQuery, s.outputMode) + } + + if (s.sink.isInstanceOf[SupportsWrite] && s.trigger.isInstanceOf[ContinuousTrigger]) { UnsupportedOperationChecker.checkForContinuous(s.inputQuery, s.outputMode) } else { UnsupportedOperationChecker.checkForStreaming(s.inputQuery, s.outputMode) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeAllowlistSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeAllowlistSuite.scala new file mode 100644 index 0000000000000..77c90e1cd8181 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeAllowlistSuite.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import scala.concurrent.duration._ + +import org.apache.spark.SparkIllegalArgumentException +import org.apache.spark.sql.execution.streaming.LowLatencyMemoryStream +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf + +class StreamRealTimeModeAllowlistSuite extends StreamRealTimeModeE2ESuiteBase { + import testImplicits._ + + test("rtm source allowlist") { + val query = spark.readStream + .format("rate") + .option("numPartitions", 1) + .load() + .writeStream + .format("console") + .outputMode("update") + .trigger(defaultTrigger) + .start() + + eventually(timeout(60.seconds)) { + checkError( + exception = query.exception.get.getCause.asInstanceOf[SparkIllegalArgumentException], + condition = "STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED", + parameters = Map( + "className" -> + "org.apache.spark.sql.execution.streaming.sources.RateStreamMicroBatchStream") + ) + } + } + + test("rtm operator allowlist") { + withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") { + val inputData = LowLatencyMemoryStream[Int](2) + val staticDf = spark.range(1, 31, 1, 10).selectExpr("id AS join_key", "id AS join_value") + + val df = inputData.toDF() + .select(col("value").as("key"), col("value").as("value")) + .join(staticDf, col("value") === col("join_key")) + .select( + concat(col("key"), lit("-"), col("value"), lit("-"), col("join_value")).as("output")) + + val query = runStreamingQuery("operation_allowlist", df) + + eventually(timeout(60.seconds)) { + checkError( + exception = query.exception.get.getCause.asInstanceOf[SparkIllegalArgumentException], + condition = "STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST", + parameters = Map( + "errorType" -> "operator", + "message" -> ( + "org.apache.spark.sql.execution.SortExec, " + + "org.apache.spark.sql.execution.exchange.ShuffleExchangeExec, " + + "org.apache.spark.sql.execution.joins.SortMergeJoinExec are" + ) + ) + ) + } + } + } + + test("rtm sink allowlist") { + val read = LowLatencyMemoryStream[Int](2) + + val query = read + .toDF() + .writeStream + .format("noop") + .outputMode(OutputMode.Update()) + .trigger(defaultTrigger) + .queryName("rtm_sink_allowlist") + + checkError( + exception = intercept[SparkIllegalArgumentException] { + query.start() + }, + condition = "STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST", + parameters = Map( + "errorType" -> "sink", + "message" -> "org.apache.spark.sql.execution.datasources.noop.NoopTable$ is" + )) + + withSQLConf(SQLConf.STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK.key -> "false") { + val tmp = query.start() + Thread.sleep(5000) + tmp.stop() + } + } + +// test("exactly once sink not supported") { +// val read = LowLatencyMemoryStream[Int](2) +// +// val query = read +// .toDF() +// .writeStream +// .format("memory") +// .outputMode(OutputMode.Update()) +// .trigger(defaultTrigger) +// .queryName("rtm_sink_allowlist") +// +// 
checkError(
+//      exception = intercept[SparkUnsupportedOperationException] {
+//        query.start()
+//      },
+//      condition = "STREAMING_REAL_TIME_MODE.EXACTLY_ONCE_SINK_NOT_SUPPORTED",
+//      parameters = Map(
+//        "sink" -> "org.apache.spark.sql.execution.streaming.sources.MemorySink"
+//      ))
+//
+//    val tmp = query.option("mode", "atLeastOnce").start()
+//    tmp.stop()
+//  }
 }

From caf3a50eefc3932261dff7eae821b6f5c3a5735c Mon Sep 17 00:00:00 2001
From: Jerry Peng
Date: Thu, 6 Nov 2025 16:32:46 -0800
Subject: [PATCH 2/3] adding additional tests and cleaning up

---
 .../analysis/UnsupportedOperationsSuite.scala |  7 +-
 .../runtime/RealTimeModeAllowlist.scala       |  1 +
 .../StreamRealTimeModeAllowlistSuite.scala    | 76 +++++++++++++------
 3 files changed, 60 insertions(+), 24 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 71643ce6a0e84..855be987291e8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -863,7 +863,12 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
     "real-time without operators - append mode",
     streamRelation, Append,
     "STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED"
-    // Map("outputMode" -> "Append")
+  )
+
+  assertSupportedForRealTime(
+    "real-time with stream-batch join - update mode",
+    streamRelation.join(batchRelation, joinType = Inner),
+    Update
   )
 
   /*
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/RealTimeModeAllowlist.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/RealTimeModeAllowlist.scala
index d342e4241f415..8ed128e32abd9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/RealTimeModeAllowlist.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/RealTimeModeAllowlist.scala
@@ -54,6 +54,7 @@ object RealTimeModeAllowlist extends Logging {
     "org.apache.spark.sql.execution.datasources.v2.RealTimeStreamScanExec",
     "org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec",
     "org.apache.spark.sql.execution.exchange.BroadcastExchangeExec",
+    "org.apache.spark.sql.execution.joins.BroadcastHashJoinExec",
     "org.apache.spark.sql.execution.exchange.ReusedExchangeExec",
     classOf[EventTimeWatermarkExec].getName
   )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeAllowlistSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeAllowlistSuite.scala
index 77c90e1cd8181..59efa8c33435f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeAllowlistSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeAllowlistSuite.scala
@@ -107,27 +107,57 @@ class StreamRealTimeModeAllowlistSuite extends StreamRealTimeModeE2ESuiteBase {
     }
   }
 
-//  test("exactly once sink not supported") {
-//    val read = LowLatencyMemoryStream[Int](2)
-//
-//    val query = read
-//      .toDF()
-//      .writeStream
-//      .format("memory")
-//      .outputMode(OutputMode.Update())
-//      .trigger(defaultTrigger)
-//      .queryName("rtm_sink_allowlist")
-//
-//    checkError(
-//      exception = intercept[SparkUnsupportedOperationException] {
-//        query.start()
-//      },
-//      condition = "STREAMING_REAL_TIME_MODE.EXACTLY_ONCE_SINK_NOT_SUPPORTED",
-//      parameters = Map(
-//        "sink" -> "org.apache.spark.sql.execution.streaming.sources.MemorySink"
-//      ))
-//
-//    val tmp = query.option("mode", "atLeastOnce").start()
-//    tmp.stop()
-//  }
+  // TODO : Remove this test after RTM can shuffle to multiple stages
+  test("repartition not allowed") {
+    val inputData = LowLatencyMemoryStream[Int](2)
+
+    val df = inputData.toDF()
+      .select(col("value").as("key"))
+      .repartition(4, col("key"))
+
+    val query = runStreamingQuery("repartition_allowlist", df)
+
+    eventually(timeout(60.seconds)) {
+      checkError(
+        exception = query.exception.get.getCause.asInstanceOf[SparkIllegalArgumentException],
+        condition = "STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST",
+        parameters = Map(
+          "errorType" -> "operator",
+          "message" -> (
+            "org.apache.spark.sql.execution.exchange.ShuffleExchangeExec is"
+          )
+        )
+      )
+    }
+  }
+
+  // TODO : Remove this test after RTM supports stateful queries
+  test("stateful queries not allowed") {
+    val inputData = LowLatencyMemoryStream[Int](2)
+
+    val df = inputData.toDF()
+      .select(col("value").as("key"))
+      .groupBy(col("key"))
+      .count()
+      .select(concat(col("key"), lit("-"), col("count")))
+
+    val query = runStreamingQuery("stateful_allowlist", df)
+
+    eventually(timeout(60.seconds)) {
+      checkError(
+        exception = query.exception.get.getCause.asInstanceOf[SparkIllegalArgumentException],
+        condition = "STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST",
+        parameters = Map(
+          "errorType" -> "operator",
+          "message" -> (
+            "org.apache.spark.sql.execution.aggregate.HashAggregateExec, " +
+              "org.apache.spark.sql.execution.exchange.ShuffleExchangeExec, " +
+              "org.apache.spark.sql.execution.streaming" +
+              ".operators.stateful.StateStoreRestoreExec, " +
+              "org.apache.spark.sql.execution.streaming.operators.stateful.StateStoreSaveExec are"
+          )
+        )
+      )
+    }
+  }
 }

From 9703b2ec80138ff03b5f00ee5551232428798040 Mon Sep 17 00:00:00 2001
From: Jerry Peng
Date: Thu, 6 Nov 2025 23:44:05 -0800
Subject: [PATCH 3/3] addressing comments

---
 .../apache/spark/sql/internal/SQLConf.scala   |  1 +
 .../analysis/UnsupportedOperationsSuite.scala | 32 +++++++++++--------
 .../runtime/RealTimeModeAllowlist.scala       |  2 +-
 .../StreamRealTimeModeAllowlistSuite.scala    |  6 ++--
 4 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f336fbf5b4f31..6dfacb37fc1d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3097,6 +3097,7 @@ object SQLConf {
   val STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK = buildConf(
     "spark.sql.streaming.realTimeMode.allowlistCheck")
     .doc("Whether to check that all operators and sinks used in real-time mode are in the allowlist.")
+    .version("4.1.0")
     .booleanConf
     .createWithDefault(true)
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 855be987291e8..425df0856a58a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -854,22 +854,25 @@ class UnsupportedOperationsSuite extends 
SparkFunSuite with SQLHelper { } /* - ======================================================================================= + ======================================================================================= REAL-TIME STREAMING - ======================================================================================= - */ + ======================================================================================= + */ - assertNotSupportedForRealTime( - "real-time without operators - append mode", - streamRelation, Append, - "STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED" - ) + { + assertNotSupportedForRealTime( + "real-time without operators - append mode", + streamRelation, + Append, + "STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED" + ) - assertSupportedForRealTime( - "real-time with stream-batch join - update mode", - streamRelation.join(batchRelation, joinType = Inner), - Update - ) + assertSupportedForRealTime( + "real-time with stream-batch join - update mode", + streamRelation.join(batchRelation, joinType = Inner), + Update + ) + } /* ======================================================================================= @@ -1042,7 +1045,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { } } - /** Assert that the logical plan is not supported inside a streaming plan with the + /** + * Assert that the logical plan is not supported inside a streaming plan with the * real-time trigger. */ def assertNotSupportedForRealTime( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/RealTimeModeAllowlist.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/RealTimeModeAllowlist.scala index 8ed128e32abd9..443c7fa1a1cf6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/RealTimeModeAllowlist.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/RealTimeModeAllowlist.scala @@ -54,8 +54,8 @@ object RealTimeModeAllowlist extends Logging { "org.apache.spark.sql.execution.datasources.v2.RealTimeStreamScanExec", "org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec", "org.apache.spark.sql.execution.exchange.BroadcastExchangeExec", - "org.apache.spark.sql.execution.joins.BroadcastHashJoinExec", "org.apache.spark.sql.execution.exchange.ReusedExchangeExec", + "org.apache.spark.sql.execution.joins.BroadcastHashJoinExec", classOf[EventTimeWatermarkExec].getName ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeAllowlistSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeAllowlistSuite.scala index 59efa8c33435f..4306e5a860120 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeAllowlistSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeAllowlistSuite.scala @@ -50,7 +50,7 @@ class StreamRealTimeModeAllowlistSuite extends StreamRealTimeModeE2ESuiteBase { } test("rtm operator allowlist") { - withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") { val inputData = LowLatencyMemoryStream[Int](2) val staticDf = spark.range(1, 31, 1, 10).selectExpr("id AS join_key", "id AS join_value") @@ -107,7 +107,7 @@ class StreamRealTimeModeAllowlistSuite extends StreamRealTimeModeE2ESuiteBase { } } - // TODO : Remove this test after RTM can shuffle to multiple stages + // TODO(SPARK-54237) : Remove this test after RTM can shuffle to 
multiple stages test("repartition not allowed") { val inputData = LowLatencyMemoryStream[Int](2) @@ -131,7 +131,7 @@ class StreamRealTimeModeAllowlistSuite extends StreamRealTimeModeE2ESuiteBase { } } - // TODO : Remove this test after RTM supports stateful queries + // TODO(SPARK-54236) : Remove this test after RTM supports stateful queries test("stateful queries not allowed") { val inputData = LowLatencyMemoryStream[Int](2)
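
A note on the traversal introduced in patch 1: RealTimeModeAllowlist.collectRealtimeNodes collects exactly the operators that have the real-time scan somewhere beneath them, in a single bottom-up pass. The following self-contained Scala sketch models that recursion; Node, Scan, and Op are illustrative stand-ins for SparkPlan and RealTimeStreamScanExec, not Spark classes:

object CollectRealtimeNodesSketch {
  sealed trait Node { def name: String; def children: Seq[Node] }
  // Stand-in for RealTimeStreamScanExec.
  case class Scan(name: String) extends Node { val children: Seq[Node] = Nil }
  // Stand-in for any other physical operator.
  case class Op(name: String, children: Node*) extends Node

  // Mirrors collectNodesWhoseSubtreeHasRTS: one bottom-up pass returning, for each
  // subtree, whether it contains a Scan plus the nodes collected so far.
  def collectRealtimeNodes(root: Node): List[Node] = {
    def go(n: Node): (Boolean, List[Node]) = n match {
      case _: Scan => (true, Nil) // subtree has the scan, but the scan itself is not collected
      case _ if n.children.isEmpty => (false, Nil)
      case _ =>
        val results = n.children.map(go)
        val hasScan = results.exists(_._1)
        val collected = results.flatMap(_._2).toList
        (hasScan, if (hasScan) n :: collected else collected)
    }
    go(root)._2
  }

  def main(args: Array[String]): Unit = {
    // Stream-static join: only operators above the real-time scan are collected;
    // the static Sort branch is left alone.
    val plan = Op("Join", Op("Filter", Scan("RealTimeScan")), Op("Sort", Op("StaticScan")))
    println(collectRealtimeNodes(plan).map(_.name)) // prints List(Join, Filter)
  }
}

Threading the Boolean alongside the collected list is what lets the check decide membership for every node in one walk, without re-scanning each subtree.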
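The bypass mentioned in the OPERATOR_OR_SINK_NOT_IN_ALLOWLIST error text is exercised at the end of the "rtm sink allowlist" test via withSQLConf. For an end user, the equivalent would be a session-level setting, sketched below; only the conf key comes from this patch, and spark is the usual SparkSession handle:

spark.conf.set("spark.sql.streaming.realTimeMode.allowlistCheck", "false")
// With the check disabled, RealTimeModeAllowlist downgrades the failure to a warning
// ("... not in the ... allowlist for Real-Time Mode") and, per the error message,
// the query then runs at your own risk.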