15 changes: 15 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -5671,6 +5671,21 @@
"message" : [
"The input stream <className> is not supported in Real-time Mode."
]
},
"OPERATOR_OR_SINK_NOT_IN_ALLOWLIST" : {
"message" : [
"The <errorType>(s): <message> not in the <errorType> allowlist for Real-Time Mode. To bypass this check, set spark.sql.streaming.realTimeMode.allowlistCheck to false. By changing this, you agree to run the query at your own risk."
]
},
"OUTPUT_MODE_NOT_SUPPORTED" : {
"message" : [
"The output mode <outputMode> is not supported. To work around this limitation, set the output mode to Update. In the future, <outputMode> may be supported."
]
},
"SINK_NOT_SUPPORTED" : {
"message" : [
"The <className> sink is currently not supported. See the Real-Time Mode User Guide for a list of supported sinks."
]
}
},
"sqlState" : "0A000"
@@ -584,6 +584,25 @@ object UnsupportedOperationChecker extends Logging {
}
}

// Verifies that a query using real-time mode is valid. It is meant to be used in addition to
// the checkForStreaming method, which is why this method checks *additional*
// real-time mode constraints.
//
// It should be called during resolution of the WriteToStreamStatement if and only if
// the query is using the real-time trigger.
def checkAdditionalRealTimeModeConstraints(plan: LogicalPlan, outputMode: OutputMode): Unit = {
if (outputMode != InternalOutputModes.Update) {
throwRealTimeError("OUTPUT_MODE_NOT_SUPPORTED", Map("outputMode" -> outputMode.toString))
}
}

private def throwRealTimeError(subClass: String, args: Map[String, String]): Unit = {
throw new AnalysisException(
errorClass = s"STREAMING_REAL_TIME_MODE.$subClass",
messageParameters = args
)
}

private def throwErrorIf(
condition: Boolean,
msg: String)(implicit operator: LogicalPlan): Unit = {
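As a hedged aside (not part of the diff), a minimal sketch of what the new checker enforces; OneRowRelation is used only as a stand-in plan, since this particular check inspects nothing but the output mode.

// Minimal sketch, assuming the imports below resolve as in the Spark tree.
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
import org.apache.spark.sql.streaming.OutputMode

// Append mode is rejected with STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED.
try {
  UnsupportedOperationChecker.checkAdditionalRealTimeModeConstraints(
    OneRowRelation(), OutputMode.Append())
} catch {
  case e: AnalysisException => println(e.getMessage)
}

// Update mode passes this particular check.
UnsupportedOperationChecker.checkAdditionalRealTimeModeConstraints(
  OneRowRelation(), OutputMode.Update())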
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

/**
* A statement for Stream writing. It contains all necessary params and will be resolved in the
@@ -39,7 +39,9 @@ import org.apache.spark.sql.streaming.OutputMode
* @param sink Sink to write the streaming outputs.
* @param outputMode Output mode for the sink.
* @param hadoopConf The Hadoop Configuration to get a FileSystem instance
* @param isContinuousTrigger Whether the statement is triggered by a continuous query or not.
* @param trigger The trigger being used for this streaming query. It is not used to create the
* resolved [[WriteToStream]] node; rather, it is only used while checking the plan
* for unsupported operations, which happens during resolution.
* @param inputQuery The analyzed query plan from the streaming DataFrame.
* @param catalogAndIdent Catalog and identifier for the sink, set when it is a V2 catalog table
*/
@@ -51,7 +53,7 @@ case class WriteToStreamStatement(
sink: Table,
outputMode: OutputMode,
hadoopConf: Configuration,
isContinuousTrigger: Boolean,
trigger: Trigger,
inputQuery: LogicalPlan,
catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
catalogTable: Option[CatalogTable] = None) extends UnaryNode {
@@ -3094,6 +3094,13 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(5000)

val STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK = buildConf(
"spark.sql.streaming.realTimeMode.allowlistCheck")
.doc("Whether to check that all operators and sinks used in real-time mode are in the allowlist.")
.version("4.1.0")
.booleanConf
.createWithDefault(true)

Member:
nit. Indentation looks a little wrong to me for all realTimeMode configurations, @jerrypeng and @viirya.

  • spark.sql.streaming.realTimeMode.minBatchDuration
  • spark.sql.streaming.realTimeMode.allowlistCheck

It would be great if you could match the other code around here.

Member:
We need a version field.

.version("4.1.0")

val VARIABLE_SUBSTITUTE_ENABLED =
buildConf("spark.sql.variable.substitute")
.doc("This enables substitution using syntax like `${var}`, `${system:var}`, " +
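A hedged usage note (not in the diff): turning the new check off from a session would look roughly like this, assuming a `spark` SparkSession is in scope; as the error message says, doing so means running the query at your own risk.

// Hypothetical usage: bypass the Real-Time Mode allowlist check for this session.
spark.conf.set("spark.sql.streaming.realTimeMode.allowlistCheck", "false")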
@@ -853,6 +853,27 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
Deduplicate(Seq(attribute), streamRelation)), outputMode = Append)
}

/*
=======================================================================================
REAL-TIME STREAMING
=======================================================================================
*/

Member:
BTW, for the actual test methods, it seems that the following {...} grouping is used in this file, doesn't it?

// REAL-TIME STREAMING
{
   ...
}

{
assertNotSupportedForRealTime(
"real-time without operators - append mode",
streamRelation,
Append,
"STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED"
)

assertSupportedForRealTime(
"real-time with stream-batch join - update mode",
streamRelation.join(batchRelation, joinType = Inner),
Update
)
}

/*
=======================================================================================
TESTING FUNCTIONS
@@ -1017,6 +1038,31 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
}
}

/** Assert that the logical plan is supported for real-time mode */
def assertSupportedForRealTime(name: String, plan: LogicalPlan, outputMode: OutputMode): Unit = {
test(s"real-time trigger - $name: supported") {
UnsupportedOperationChecker.checkAdditionalRealTimeModeConstraints(plan, outputMode)
}
}

Member:
Hmm, where is it used? I don't find any.

Contributor Author:
ah forgot to push a commit. The test is added

/**
* Assert that the logical plan is not supported inside a streaming plan with the
* real-time trigger.
*/
def assertNotSupportedForRealTime(
name: String,
plan: LogicalPlan,
outputMode: OutputMode,
condition: String): Unit = {
testError(
s"real-time trigger - $name: not supported",
Seq("Streaming real-time mode"),
condition
) {
UnsupportedOperationChecker.checkAdditionalRealTimeModeConstraints(plan, outputMode)
}
}

/**
* Assert that the logical plan is not supported inside a streaming plan.
*
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDat
import org.apache.spark.sql.execution.datasources.v2.python.PythonDataSourceV2
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._
@@ -299,6 +300,14 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
recoverFromCheckpoint: Boolean = true,
catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
catalogTable: Option[CatalogTable] = None): StreamingQuery = {
if (trigger.isInstanceOf[RealTimeTrigger]) {
RealTimeModeAllowlist.checkAllowedSink(
sink,
ds.sparkSession.sessionState.conf.getConf(
SQLConf.STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK)
)
}

val useTempCheckpointLocation = DataStreamWriter.SOURCES_ALLOW_ONE_TIME_QUERY.contains(source)

ds.sparkSession.sessionState.streamingQueryManager.startQuery(
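A rough sketch of the path this guard sits on (assumptions: a `spark` session, placeholder Kafka options, and a `realTimeTrigger` value standing in for the real-time trigger's public API, which is not shown in this diff): the console sink is on the allowlist, so start() proceeds; a sink that is not allowlisted would throw or only warn depending on spark.sql.streaming.realTimeMode.allowlistCheck.

// Illustrative sketch only; option values are placeholders and `realTimeTrigger` is an
// assumed handle for the real-time trigger.
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")   // placeholder address
  .option("subscribe", "events")                    // placeholder topic
  .load()
  .writeStream
  .format("console")          // ConsoleTable$ is on the sink allowlist
  .outputMode("update")       // only Update passes checkAdditionalRealTimeModeConstraints
  .trigger(realTimeTrigger)
  .start()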
@@ -213,7 +213,7 @@ class StreamingQueryManager private[sql] (
sink,
outputMode,
df.sparkSession.sessionState.newHadoopConf(),
trigger.isInstanceOf[ContinuousTrigger],
trigger,
analyzedPlan,
catalogAndIdent,
catalogTable)
@@ -41,7 +41,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper}
import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper}
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeq, OffsetSeqMetadata}
import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler
@@ -436,7 +436,10 @@ class MicroBatchExecution(
}
}

if (containsStatefulOperator(analyzedPlan)) {
if (trigger.isInstanceOf[RealTimeTrigger]) {
logWarning(log"Disabling AQE since AQE is not supported for Real-time Mode.")
sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
} else if (containsStatefulOperator(analyzedPlan)) {
// SPARK-53941: We disable AQE for stateful workloads as of now.
logWarning(log"Disabling AQE since AQE is not supported in stateful workloads.")
sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")

Contributor Author:
@HeartSaVioR you recently added AQE support for streaming queries. Can you take a look to see if this logic is appropriate to disable AQE for RTM queries?

Contributor:
That should do the thing. Looks good to me.

Contributor Author:
thx
@@ -1042,6 +1045,14 @@ class MicroBatchExecution(

markMicroBatchExecutionStart(execCtx)

if (trigger.isInstanceOf[RealTimeTrigger]) {
RealTimeModeAllowlist.checkAllowedPhysicalOperator(
execCtx.executionPlan.executedPlan,
sparkSession.sessionState.conf.getConf(
SQLConf.STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK)
)
}

if (execCtx.previousContext.isEmpty) {
purgeStatefulMetadataAsync(execCtx.executionPlan.executedPlan)
}
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.internal.{Logging, LogKeys, MessageWithContext}
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.RealTimeStreamScanExec
import org.apache.spark.sql.execution.streaming.operators.stateful._

object RealTimeModeAllowlist extends Logging {
private val allowedSinks = Set(
"org.apache.spark.sql.execution.streaming.ConsoleTable$",
"org.apache.spark.sql.execution.streaming.sources.ContinuousMemorySink",
"org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable",
"org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable"
)

private val allowedOperators = Set(
"org.apache.spark.sql.execution.AppendColumnsExec",
"org.apache.spark.sql.execution.CollectMetricsExec",
"org.apache.spark.sql.execution.ColumnarToRowExec",
"org.apache.spark.sql.execution.DeserializeToObjectExec",
"org.apache.spark.sql.execution.ExpandExec",
"org.apache.spark.sql.execution.FileSourceScanExec",
"org.apache.spark.sql.execution.FilterExec",
"org.apache.spark.sql.execution.GenerateExec",
"org.apache.spark.sql.execution.InputAdapter",
"org.apache.spark.sql.execution.LocalTableScanExec",
"org.apache.spark.sql.execution.MapElementsExec",
"org.apache.spark.sql.execution.MapPartitionsExec",
"org.apache.spark.sql.execution.PlanLater",
"org.apache.spark.sql.execution.ProjectExec",
"org.apache.spark.sql.execution.RangeExec",
"org.apache.spark.sql.execution.SerializeFromObjectExec",
"org.apache.spark.sql.execution.UnionExec",
"org.apache.spark.sql.execution.WholeStageCodegenExec",
"org.apache.spark.sql.execution.datasources.v2.RealTimeStreamScanExec",
"org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec",
"org.apache.spark.sql.execution.exchange.BroadcastExchangeExec",
"org.apache.spark.sql.execution.exchange.ReusedExchangeExec",
"org.apache.spark.sql.execution.joins.BroadcastHashJoinExec",
classOf[EventTimeWatermarkExec].getName
)

Member:
I guess this was initially designed to be a sorted list alphabetically in the code side. Could you change like the following because it looks like the only exceptions?

-    "org.apache.spark.sql.execution.joins.BroadcastHashJoinExec",
     "org.apache.spark.sql.execution.exchange.ReusedExchangeExec",
+    "org.apache.spark.sql.execution.joins.BroadcastHashJoinExec",

private def classNamesString(classNames: Seq[String]): MessageWithContext = {
val sortedClassNames = classNames.sorted
var message = log"${MDC(LogKeys.CLASS_NAME, sortedClassNames.head)}"
sortedClassNames.tail.foreach(
name => message += log", ${MDC(LogKeys.CLASS_NAME, name)}"
)
if (sortedClassNames.size > 1) {
message + log" are"
} else {
message + log" is"
}
}

private def notInRTMAllowlistException(
errorType: String,
classNames: Seq[String]): SparkIllegalArgumentException = {
assert(classNames.nonEmpty)
new SparkIllegalArgumentException(
errorClass = "STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST",
messageParameters = Map(
"errorType" -> errorType,
"message" -> classNamesString(classNames).message
)
)
}

def checkAllowedSink(sink: Table, throwException: Boolean): Unit = {
if (!allowedSinks.contains(sink.getClass.getName)) {
if (throwException) {
throw notInRTMAllowlistException("sink", Seq(sink.getClass.getName))
} else {
logWarning(
log"The sink: " + classNamesString(Seq(sink.getClass.getName)) +
log" not in the sink allowlist for Real-Time Mode."
)
}
}
}

// Collect ALL nodes whose entire subtree contains RealTimeStreamScanExec.
private def collectRealtimeNodes(root: SparkPlan): Seq[SparkPlan] = {

def collectNodesWhoseSubtreeHasRTS(n: SparkPlan): (Boolean, List[SparkPlan]) = {
n match {
case _: RealTimeStreamScanExec =>
// Subtree has RTS, but we don't collect the RTS node itself.
(true, Nil)
case _ if n.children.isEmpty =>
(false, Nil)
case _ =>
val kidResults = n.children.map(collectNodesWhoseSubtreeHasRTS)
val anyChildHasRTS = kidResults.exists(_._1)
val collectedKids = kidResults.iterator.flatMap(_._2).toList
val collectedHere = if (anyChildHasRTS) n :: collectedKids else collectedKids
(anyChildHasRTS, collectedHere)
}
}

collectNodesWhoseSubtreeHasRTS(root)._2
}

def checkAllowedPhysicalOperator(operator: SparkPlan, throwException: Boolean): Unit = {
val nodesToCheck = collectRealtimeNodes(operator)
val violations = nodesToCheck
.collect {
case node =>
if (allowedOperators.contains(node.getClass.getName)) {
None
} else {
Some(node.getClass.getName)
}
}
.flatten
.distinct

if (violations.nonEmpty) {
if (throwException) {
throw notInRTMAllowlistException("operator", violations.toSet.toSeq)
} else {
logWarning(
log"The operator(s): " + classNamesString(violations.toSet.toSeq) +
log" not in the operator allowlist for Real-Time Mode."
)
}
}
}
}
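To make the traversal in collectRealtimeNodes easier to follow, here is a toy model (plain Scala, not Spark classes) of the same rule: every node whose subtree contains the real-time scan is collected, while the scan leaf itself is not.

// Toy model of collectRealtimeNodes; "RTS" marks the RealTimeStreamScanExec stand-in.
case class Node(name: String, children: List[Node] = Nil)

def collect(n: Node): (Boolean, List[Node]) = n match {
  case Node("RTS", _) => (true, Nil)   // subtree has the scan, but the scan is not collected
  case Node(_, Nil)   => (false, Nil)
  case _ =>
    val results = n.children.map(collect)
    val hasRTS = results.exists(_._1)
    val collected = results.flatMap(_._2)
    (hasRTS, if (hasRTS) n :: collected else collected)
}

// Project(Filter(RTS), LocalScan): Project and Filter are collected, RTS and LocalScan are not.
val plan = Node("Project", List(Node("Filter", List(Node("RTS"))), Node("LocalScan")))
assert(collect(plan)._2.map(_.name) == List("Project", "Filter"))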
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement}
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.streaming.{ContinuousTrigger, RealTimeTrigger}
import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
@@ -48,7 +49,12 @@ object ResolveWriteToStream extends Rule[LogicalPlan] {
}

if (conf.isUnsupportedOperationCheckEnabled) {
if (s.sink.isInstanceOf[SupportsWrite] && s.isContinuousTrigger) {
if (s.trigger.isInstanceOf[RealTimeTrigger]) {
UnsupportedOperationChecker.
checkAdditionalRealTimeModeConstraints(s.inputQuery, s.outputMode)
}

if (s.sink.isInstanceOf[SupportsWrite] && s.trigger.isInstanceOf[ContinuousTrigger]) {
UnsupportedOperationChecker.checkForContinuous(s.inputQuery, s.outputMode)
} else {
UnsupportedOperationChecker.checkForStreaming(s.inputQuery, s.outputMode)
Expand Down