From 080a2698928366e4a17d165cebebf4f44c797f40 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 6 Jan 2017 15:01:04 -0800 Subject: [PATCH 1/3] Set UncaughtExceptionHandler in onQueryStarted to ensure catching fatal errors during query initialization --- .../spark/sql/streaming/StreamTest.scala | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 709050d29bb00..4c23797e277d8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -248,6 +248,21 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { @volatile var streamThreadDeathCause: Throwable = null + // Set UncaughtExceptionHandler in `onQueryStarted` so that we can ensure catching fatal errors + // during query initialization. + val listener = new StreamingQueryListener { + override def onQueryStarted(event: QueryStartedEvent): Unit = { + Thread.currentThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler { + override def uncaughtException(t: Thread, e: Throwable): Unit = { + streamThreadDeathCause = e + } + }) + } + + override def onQueryProgress(event: QueryProgressEvent): Unit = {} + override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {} + } + sparkSession.streams.addListener(listener) // If the test doesn't manually start the stream, we do it automatically at the beginning. val startedManually = @@ -364,12 +379,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { triggerClock = triggerClock) .asInstanceOf[StreamingQueryWrapper] .streamingQuery - currentStream.microBatchThread.setUncaughtExceptionHandler( - new UncaughtExceptionHandler { - override def uncaughtException(t: Thread, e: Throwable): Unit = { - streamThreadDeathCause = e - } - }) // Wait until the initialization finishes, because some tests need to use `logicalPlan` // after starting the query. currentStream.awaitInitialization(streamingTimeout.toMillis) @@ -545,6 +554,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { case (key, Some(value)) => sparkSession.conf.set(key, value) case (key, None) => sparkSession.conf.unset(key) } + sparkSession.streams.removeListener(listener) } } From 59a11611999fddd0670218b16b991e691bcc574e Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 6 Jan 2017 16:26:58 -0800 Subject: [PATCH 2/3] Address TD's comments --- .../scala/org/apache/spark/sql/streaming/StreamSuite.scala | 7 +++++-- .../scala/org/apache/spark/sql/streaming/StreamTest.scala | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 34b0ee8064c3f..a3c89ad6548d0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -238,7 +238,7 @@ class StreamSuite extends StreamTest { } } - testQuietly("fatal errors from a source should be sent to the user") { + testQuietly("handle fatal errors thrown from the stream thread correctly") { for (e <- Seq( new VirtualMachineError {}, new ThreadDeath, @@ -259,8 +259,11 @@ class StreamSuite extends StreamTest { override def stop(): Unit = {} } val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source)) - // These error are fatal errors and should be ignored in `testStream` to not fail the test. testStream(df)( + // `ExpectFailure(isFatalError = true)` verifies two things: + // - Fatal errors can be propagated to `StreamingQuery.exception` and + // `StreamingQuery.awaitTermination` like non fatal errors. + // - Fatal errors can be caught by UncaughtExceptionHandler. ExpectFailure(isFatalError = true)(ClassTag(e.getClass)) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 4c23797e277d8..3fe99c3ef0e9e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -235,7 +235,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { */ def testStream( _stream: Dataset[_], - outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = { + outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = synchronized { + // `synchronized` is added to prevent the user from calling `testStream` concurrently because + // this method uses `StreamingQueryListener` and it may not work correctly when `testStream` + // runs concurrently. val stream = _stream.toDF() val sparkSession = stream.sparkSession // use the session in DF, not the default session From fca042436e97667187f9b7a63207901f436b5aae Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 9 Jan 2017 10:54:31 -0800 Subject: [PATCH 3/3] Update comments --- .../scala/org/apache/spark/sql/streaming/StreamSuite.scala | 2 +- .../scala/org/apache/spark/sql/streaming/StreamTest.scala | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index a3c89ad6548d0..e964e646d22aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -238,7 +238,7 @@ class StreamSuite extends StreamTest { } } - testQuietly("handle fatal errors thrown from the stream thread correctly") { + testQuietly("handle fatal errors thrown from the stream thread") { for (e <- Seq( new VirtualMachineError {}, new ThreadDeath, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 3fe99c3ef0e9e..4aa4100522cde 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -236,9 +236,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { def testStream( _stream: Dataset[_], outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = synchronized { - // `synchronized` is added to prevent the user from calling `testStream` concurrently because - // this method uses `StreamingQueryListener` and it may not work correctly when `testStream` - // runs concurrently. + // `synchronized` is added to prevent the user from calling multiple `testStream`s concurrently + // because this method assumes there is only one active query in its `StreamingQueryListener` + // and it may not work correctly when multiple `testStream`s run concurrently. val stream = _stream.toDF() val sparkSession = stream.sparkSession // use the session in DF, not the default session @@ -255,6 +255,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { // during query initialization. val listener = new StreamingQueryListener { override def onQueryStarted(event: QueryStartedEvent): Unit = { + // Note: this assumes there is only one query active in the `testStream` method. Thread.currentThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler { override def uncaughtException(t: Thread, e: Throwable): Unit = { streamThreadDeathCause = e