diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 84f80e638f638..be1e8686cf9fa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -149,7 +149,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}.toMap
}
- def lastReceivedBatchRecords: Map[Int, Long] = {
+ def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
(0 until numReceivers).map { receiverId =>
@@ -160,19 +160,19 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
- def receiverInfo(receiverId: Int): Option[ReceiverInfo] = {
+ def receiverInfo(receiverId: Int): Option[ReceiverInfo] = synchronized {
receiverInfos.get(receiverId)
}
- def lastCompletedBatch: Option[BatchInfo] = {
+ def lastCompletedBatch: Option[BatchInfo] = synchronized {
completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
}
- def lastReceivedBatch: Option[BatchInfo] = {
+ def lastReceivedBatch: Option[BatchInfo] = synchronized {
retainedBatches.lastOption
}
- private def retainedBatches: Seq[BatchInfo] = synchronized {
+ private def retainedBatches: Seq[BatchInfo] = {
(waitingBatchInfos.values.toSeq ++
runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index bfe8086fcf8fe..b6dcb62bfeec8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -37,11 +37,12 @@ private[ui] class StreamingPage(parent: StreamingTab)
/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
- val content =
+ val content = listener.synchronized {
generateBasicStats() ++
++