Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class StreamSuite extends StreamTest {
}
}

testQuietly("fatal errors from a source should be sent to the user") {
testQuietly("handle fatal errors thrown from the stream thread") {
for (e <- Seq(
new VirtualMachineError {},
new ThreadDeath,
Expand All @@ -259,8 +259,11 @@ class StreamSuite extends StreamTest {
override def stop(): Unit = {}
}
val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
// These error are fatal errors and should be ignored in `testStream` to not fail the test.
testStream(df)(
// `ExpectFailure(isFatalError = true)` verifies two things:
// - Fatal errors can be propagated to `StreamingQuery.exception` and
// `StreamingQuery.awaitTermination` like non fatal errors.
// - Fatal errors can be caught by UncaughtExceptionHandler.
ExpectFailure(isFatalError = true)(ClassTag(e.getClass))
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
*/
def testStream(
_stream: Dataset[_],
outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = {
outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = synchronized {
// `synchronized` is added to prevent the user from calling multiple `testStream`s concurrently
// because this method assumes there is only one active query in its `StreamingQueryListener`
// and it may not work correctly when multiple `testStream`s run concurrently.

val stream = _stream.toDF()
val sparkSession = stream.sparkSession // use the session in DF, not the default session
Expand All @@ -248,6 +251,22 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {

@volatile
var streamThreadDeathCause: Throwable = null
// Set UncaughtExceptionHandler in `onQueryStarted` so that we can ensure catching fatal errors
// during query initialization.
val listener = new StreamingQueryListener {
override def onQueryStarted(event: QueryStartedEvent): Unit = {
// Note: this assumes there is only one query active in the `testStream` method.
Thread.currentThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
streamThreadDeathCause = e
}
})
}

override def onQueryProgress(event: QueryProgressEvent): Unit = {}
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
sparkSession.streams.addListener(listener)

// If the test doesn't manually start the stream, we do it automatically at the beginning.
val startedManually =
Expand Down Expand Up @@ -364,12 +383,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
triggerClock = triggerClock)
.asInstanceOf[StreamingQueryWrapper]
.streamingQuery
currentStream.microBatchThread.setUncaughtExceptionHandler(
new UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
streamThreadDeathCause = e
}
})
// Wait until the initialization finishes, because some tests need to use `logicalPlan`
// after starting the query.
currentStream.awaitInitialization(streamingTimeout.toMillis)
Expand Down Expand Up @@ -545,6 +558,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
case (key, Some(value)) => sparkSession.conf.set(key, value)
case (key, None) => sparkSession.conf.unset(key)
}
sparkSession.streams.removeListener(listener)
}
}

Expand Down