From 03a41d1204577102779bcf4ecaf1a5613d0f8ecd Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Tue, 17 Dec 2019 17:07:12 +0800
Subject: [PATCH 1/6] Fix race condition between LiveListenerBus#stop and
 AsyncEventQueue#removeListenerOnError

---
 .../apache/spark/SparkFirehoseListener.java   |  8 +++
 .../spark/scheduler/AsyncEventQueue.scala     | 20 +++++-
 .../spark/scheduler/LiveListenerBus.scala     |  6 ++
 .../spark/scheduler/SparkListener.scala       |  2 +
 .../spark/scheduler/SparkListenerSuite.scala  | 69 +++++++++++++++++++
 5 files changed, 102 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 731f6fc767dfd..66ba455d1c36b 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -30,6 +30,14 @@
  */
 public class SparkFirehoseListener implements SparkListenerInterface {
 
+    @Override
+    public boolean dead() {
+        return false;
+    }
+
+    @Override
+    public void dead_$eq(boolean dead) { }
+
     public void onEvent(SparkListenerEvent event) { }
 
     @Override
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index 1bcddaceb3576..83570a40b7ab9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -201,10 +201,24 @@ private class AsyncEventQueue(
     true
   }
 
+  override def doPostEvent(listener: SparkListenerInterface, event: SparkListenerEvent): Unit = {
+    // If listener is dead, we don't post any event to it.
+    if (!listener.dead) {
+      super.doPostEvent(listener, event)
+    }
+  }
+
   override def removeListenerOnError(listener: SparkListenerInterface): Unit = {
-    // the listener failed in an unrecoverably way, we want to remove it from the entire
-    // LiveListenerBus (potentially stopping a queue if it is empty)
-    bus.removeListener(listener)
+    if (bus.isInStop) {
+      // If bus is in the progress of stop, we just mark the listener as dead instead of removing
+      // via calling `bus.removeListener` to avoid race condition
+      // dead listeners will be removed eventually in `bus.stop`
+      listener.dead = true
+    } else {
+      // the listener failed in an unrecoverably way, we want to remove it from the entire
+      // LiveListenerBus (potentially stopping a queue if it is empty)
+      bus.removeListener(listener)
+    }
   }
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index bbbddd86cad39..a12e48fa0cf93 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -53,6 +53,10 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
   private val started = new AtomicBoolean(false)
   // Indicate if `stop()` is called
   private val stopped = new AtomicBoolean(false)
+  // Indicate `stop()` is called but not finished
+  private val inStop = new AtomicBoolean(false)
+
+  private[spark] def isInStop: Boolean = inStop.get()
 
   /** A counter for dropped events. It will be reset every time we log it. */
   private val droppedEventsCounter = new AtomicLong(0L)
@@ -226,10 +230,12 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
       return
     }
 
+    inStop.set(true)
     synchronized {
       queues.asScala.foreach(_.stop())
       queues.clear()
     }
+    inStop.set(false)
   }
 
   // For testing only.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index c150b0341500c..9335fdb268758 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -215,6 +215,8 @@ case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEven
  */
 private[spark] trait SparkListenerInterface {
 
+  @volatile var dead = false
+
   /**
    * Called when a stage completes successfully or fails, with information on the completed stage.
    */
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 7221623f89e1b..e5a1478667761 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -529,6 +529,46 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     }
   }
 
+  Seq(true, false).foreach { throwInterruptedException =>
+    val suffix = if (throwInterruptedException) "throw interrupt" else "set Thread interrupted"
+    test(s"SPARK-30285: Fix race condition in AsyncEventQueue.removeListenerOnError: $suffix") {
+      val conf = new SparkConf(false)
+        .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
+      val bus = new LiveListenerBus(conf)
+      val counter1 = new BasicJobCounter()
+      val counter2 = new BasicJobCounter()
+      val interruptingListener = new DelayInterruptingJobCounter(throwInterruptedException, 3)
+      bus.addToSharedQueue(counter1)
+      bus.addToSharedQueue(interruptingListener)
+      bus.addToEventLogQueue(counter2)
+      assert(bus.activeQueues() === Set(SHARED_QUEUE, EVENT_LOG_QUEUE))
+      assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+      assert(bus.findListenersByClass[DelayInterruptingJobCounter]().size === 1)
+
+      bus.start(mockSparkContext, mockMetricsSystem)
+
+      (0 until 5).foreach { jobId =>
+        bus.post(SparkListenerJobEnd(jobId, jobCompletionTime, JobSucceeded))
+      }
+
+      // Call bus.stop in a separate thread, otherwise we will block here until bus is stopped
+      val stoppingThread = new Thread(() => {
+        bus.stop()
+      })
+      stoppingThread.start()
+      // Notify interrupting listener starts to work
+      interruptingListener.sleep = false
+      // Wait for bus to stop
+      stoppingThread.join()
+
+      // All queues are removed
+      assert(bus.activeQueues() === Set.empty)
+      assert(counter1.count === 5)
+      assert(counter2.count === 5)
+      assert(interruptingListener.count === 3)
+    }
+  }
+
   test("event queue size can be configued through spark conf") {
     // configure the shared queue size to be 1, event log queue size to be 2,
     // and listner bus event queue size to be 5
@@ -627,6 +667,35 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
       }
     }
   }
+
+  /**
+   * A simple listener that works as follows:
+   * 1. sleep and wait when `sleep` is true
+   * 2. when `sleep` is false, start to work:
+   *    if it is interruptOnJobId, interrupt
+   *    else count SparkListenerJobEnd numbers
+   */
+  private class DelayInterruptingJobCounter(
+    val throwInterruptedException: Boolean,
+    interruptOnJobId: Int) extends SparkListener {
+    @volatile var sleep = true
+    var count = 0
+
+    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+      while (sleep) {
+        Thread.sleep(10)
+      }
+      if (interruptOnJobId == jobEnd.jobId) {
+        if (throwInterruptedException) {
+          throw new InterruptedException("got interrupted")
+        } else {
+          Thread.currentThread().interrupt()
+        }
+      } else {
+        count += 1
+      }
+    }
+  }
 }
 
 // These classes can't be declared inside of the SparkListenerSuite class because we don't want
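The comments added to removeListenerOnError above describe the hazard this first patch works around: while the bus is stopping, a failing listener must not call back into bus.removeListener. The sketch below is not Spark code; every name in it is invented for illustration, and it simply assumes a stop() that holds the bus monitor while joining the dispatch thread plus an error path on the dispatch thread that needs the same monitor. Running it hangs by design:

    // Minimal deadlock sketch: stop() holds the Bus monitor and waits for the
    // dispatcher, while the dispatcher waits for the Bus monitor in removeListener().
    object DeadlockSketch {
      class Bus {
        @volatile private var listenerFailed = false

        private val dispatcher = new Thread(() => {
          while (!listenerFailed) Thread.sleep(10)
          removeListener()            // needs the Bus monitor, which stop() is holding
        })
        dispatcher.setDaemon(true)
        dispatcher.start()

        def removeListener(): Unit = synchronized {
          println("listener removed")
        }

        def stop(): Unit = synchronized {
          listenerFailed = true       // the dispatcher now wants the monitor too
          dispatcher.join()           // never returns: each thread waits on the other
        }
      }

      def main(args: Array[String]): Unit = new Bus().stop()
    }

Patch 1 breaks this cycle by never taking the bus-level removal path while stop() is in progress (the dead flag plus the isInStop check); the later patches in the series take the other route and stop holding the lock instead.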
From 781caba4e6890b2d26dd6d3618d581fae18a04cf Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Wed, 18 Dec 2019 10:19:47 +0800
Subject: [PATCH 2/6] Fix UT

---
 .../scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index e5a1478667761..4dbc807b6138e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -677,7 +677,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
    */
   private class DelayInterruptingJobCounter(
     val throwInterruptedException: Boolean,
-    interruptOnJobId: Int) extends SparkListener {
+    val interruptOnJobId: Int) extends SparkListener {
     @volatile var sleep = true
     var count = 0
 

From 7b8b1fa7ddd70d039cbb020cf94af796fb733681 Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Wed, 18 Dec 2019 10:18:12 +0800
Subject: [PATCH 3/6] Address review feedback

---
 .../main/java/org/apache/spark/SparkFirehoseListener.java | 8 ++++++--
 .../org/apache/spark/scheduler/AsyncEventQueue.scala      | 6 +++---
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 66ba455d1c36b..22635361a4e5d 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -30,13 +30,17 @@
  */
 public class SparkFirehoseListener implements SparkListenerInterface {
 
+    protected volatile boolean dead;
+
     @Override
     public boolean dead() {
-        return false;
+        return dead;
     }
 
     @Override
-    public void dead_$eq(boolean dead) { }
+    public void dead_$eq(boolean dead) {
+        this.dead = dead;
+    }
 
     public void onEvent(SparkListenerEvent event) { }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index 83570a40b7ab9..b8ad3bbcaad7e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -210,9 +210,9 @@ private class AsyncEventQueue(
 
   override def removeListenerOnError(listener: SparkListenerInterface): Unit = {
     if (bus.isInStop) {
-      // If bus is in the progress of stop, we just mark the listener as dead instead of removing
-      // via calling `bus.removeListener` to avoid race condition
-      // dead listeners will be removed eventually in `bus.stop`
+      // If we're in the middle of stopping the bus, we just mark the listener as dead,
+      // instead of removing, to avoid a deadlock.
+      // Dead listeners will be removed eventually in `bus.stop`.
       listener.dead = true
     } else {
       // the listener failed in an unrecoverably way, we want to remove it from the entire

From c2afd632e6d00f693e626e36ae430ceea5b78c47 Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Wed, 18 Dec 2019 12:50:20 +0800
Subject: [PATCH 4/6] Fix UT

---
 .../scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 4dbc807b6138e..bf2573b5bd2d3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -531,7 +531,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
   Seq(true, false).foreach { throwInterruptedException =>
     val suffix = if (throwInterruptedException) "throw interrupt" else "set Thread interrupted"
-    test(s"SPARK-30285: Fix race condition in AsyncEventQueue.removeListenerOnError: $suffix") {
+    test(s"SPARK-30285: Fix deadlock in AsyncEventQueue.removeListenerOnError: $suffix") {
       val conf = new SparkConf(false)
         .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
       val bus = new LiveListenerBus(conf)

From 3260be14b30d7d277441066a2e4dc897cba08522 Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Fri, 20 Dec 2019 15:24:03 +0800
Subject: [PATCH 5/6] Remove synchronized in LiveListenerBus.stop

---
 .../apache/spark/SparkFirehoseListener.java   | 12 ------------
 .../spark/scheduler/AsyncEventQueue.scala     | 20 +++-----------------
 .../spark/scheduler/LiveListenerBus.scala     | 12 ++----------
 .../spark/scheduler/SparkListener.scala       |  2 --
 4 files changed, 5 insertions(+), 41 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 22635361a4e5d..731f6fc767dfd 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -30,18 +30,6 @@
  */
 public class SparkFirehoseListener implements SparkListenerInterface {
 
-    protected volatile boolean dead;
-
-    @Override
-    public boolean dead() {
-        return dead;
-    }
-
-    @Override
-    public void dead_$eq(boolean dead) {
-        this.dead = dead;
-    }
-
     public void onEvent(SparkListenerEvent event) { }
 
     @Override
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index b8ad3bbcaad7e..1bcddaceb3576 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -201,24 +201,10 @@ private class AsyncEventQueue(
     true
   }
 
-  override def doPostEvent(listener: SparkListenerInterface, event: SparkListenerEvent): Unit = {
-    // If listener is dead, we don't post any event to it.
-    if (!listener.dead) {
-      super.doPostEvent(listener, event)
-    }
-  }
-
   override def removeListenerOnError(listener: SparkListenerInterface): Unit = {
-    if (bus.isInStop) {
-      // If we're in the middle of stopping the bus, we just mark the listener as dead,
-      // instead of removing, to avoid a deadlock.
-      // Dead listeners will be removed eventually in `bus.stop`.
-      listener.dead = true
-    } else {
-      // the listener failed in an unrecoverably way, we want to remove it from the entire
-      // LiveListenerBus (potentially stopping a queue if it is empty)
-      bus.removeListener(listener)
-    }
+    // the listener failed in an unrecoverably way, we want to remove it from the entire
+    // LiveListenerBus (potentially stopping a queue if it is empty)
+    bus.removeListener(listener)
   }
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index a12e48fa0cf93..95b0096cade38 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -53,10 +53,6 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
   private val started = new AtomicBoolean(false)
   // Indicate if `stop()` is called
   private val stopped = new AtomicBoolean(false)
-  // Indicate `stop()` is called but not finished
-  private val inStop = new AtomicBoolean(false)
-
-  private[spark] def isInStop: Boolean = inStop.get()
 
   /** A counter for dropped events. It will be reset every time we log it. */
   private val droppedEventsCounter = new AtomicLong(0L)
@@ -230,12 +226,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
       return
     }
 
-    inStop.set(true)
-    synchronized {
-      queues.asScala.foreach(_.stop())
-      queues.clear()
-    }
-    inStop.set(false)
+    queues.asScala.foreach(_.stop())
+    queues.clear()
   }
 
   // For testing only.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 9335fdb268758..c150b0341500c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -215,8 +215,6 @@ case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEven
  */
 private[spark] trait SparkListenerInterface {
 
-  @volatile var dead = false
-
   /**
    * Called when a stage completes successfully or fails, with information on the completed stage.
   */
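Patch 5 switches strategy: instead of tracking dead listeners and an isInStop flag, it stops holding a bus-wide lock while the queues shut down, so a failing listener can still call bus.removeListener without blocking on a monitor held by stop(). The sketch below is a rough, non-Spark illustration of why a lock is not needed just for once-only stop semantics; the names and types here are assumptions made for the example (it assumes the early-return guard above the hunk is backed by the stopped AtomicBoolean declared earlier in the file, and uses a concurrent list standing in for the queue collection), not a claim about the real class:

    import java.util.concurrent.CopyOnWriteArrayList
    import java.util.concurrent.atomic.AtomicBoolean

    // Not Spark code: a toy bus whose stop() is idempotent without a monitor.
    class SketchBus {
      private val stopped = new AtomicBoolean(false)
      private val queues = new CopyOnWriteArrayList[String]()

      def addQueue(name: String): Unit = queues.add(name)

      def stop(): Unit = {
        // Only the first caller gets past this guard, so no synchronized block is
        // needed just to keep stop() from running twice.
        if (!stopped.compareAndSet(false, true)) {
          return
        }
        val it = queues.iterator()
        while (it.hasNext) {
          println(s"stopping ${it.next()}")  // no bus-wide lock held while queues drain
        }
        queues.clear()
      }
    }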
From 3d7f435f8452faff71b98a9163cd8e86e77c0a79 Mon Sep 17 00:00:00 2001
From: Wang Shuo
Date: Sat, 21 Dec 2019 15:32:42 +0800
Subject: [PATCH 6/6] Fix UT, address some review feedback

---
 .../apache/spark/scheduler/SparkListenerSuite.scala | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index bf2573b5bd2d3..12fa86c5483af 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -532,9 +532,8 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   Seq(true, false).foreach { throwInterruptedException =>
     val suffix = if (throwInterruptedException) "throw interrupt" else "set Thread interrupted"
     test(s"SPARK-30285: Fix deadlock in AsyncEventQueue.removeListenerOnError: $suffix") {
-      val conf = new SparkConf(false)
-        .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
-      val bus = new LiveListenerBus(conf)
+      val LISTENER_BUS_STOP_WAITING_TIMEOUT_MILLIS = 10 * 1000L // 10 seconds
+      val bus = new LiveListenerBus(new SparkConf(false))
       val counter1 = new BasicJobCounter()
       val counter2 = new BasicJobCounter()
       val interruptingListener = new DelayInterruptingJobCounter(throwInterruptedException, 3)
@@ -559,8 +558,10 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
       // Notify interrupting listener starts to work
       interruptingListener.sleep = false
       // Wait for bus to stop
-      stoppingThread.join()
+      stoppingThread.join(LISTENER_BUS_STOP_WAITING_TIMEOUT_MILLIS)
 
+      // Stopping has been finished
+      assert(stoppingThread.isAlive === false)
       // All queues are removed
       assert(bus.activeQueues() === Set.empty)
       assert(counter1.count === 5)
@@ -676,8 +677,8 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
    *    else count SparkListenerJobEnd numbers
    */
   private class DelayInterruptingJobCounter(
-    val throwInterruptedException: Boolean,
-    val interruptOnJobId: Int) extends SparkListener {
+      val throwInterruptedException: Boolean,
+      val interruptOnJobId: Int) extends SparkListener {
     @volatile var sleep = true
     var count = 0
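The final test change relies on standard java.lang.Thread semantics: join(millis) returns either when the thread has finished or when the timeout elapses, so on its own it cannot fail a hung test; the added isAlive assertion is what turns a hang into a test failure rather than a build that blocks forever. A minimal standalone illustration follows, with a hypothetical worker thread rather than the Spark test's stopping thread:

    object JoinWithTimeoutExample {
      def main(args: Array[String]): Unit = {
        val worker = new Thread(() => Thread.sleep(100))
        worker.start()
        worker.join(10 * 1000L)  // returns as soon as the worker finishes, after 10s at worst
        assert(!worker.isAlive, "worker did not finish within the timeout")
      }
    }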