From 7182498b0af11bd20e7615b5c66f8722be4044a3 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sat, 11 Apr 2015 21:31:21 +0800 Subject: [PATCH 1/2] Add synchronized to make sure we have a consistent view of StreamingJobProgressListener when creating the content --- .../scala/org/apache/spark/streaming/ui/StreamingPage.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index bfe8086fcf8fe..b6dcb62bfeec8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -37,11 +37,12 @@ private[ui] class StreamingPage(parent: StreamingTab) /** Render the page */ def render(request: HttpServletRequest): Seq[Node] = { - val content = + val content = listener.synchronized { generateBasicStats() ++

++

Statistics over last {listener.retainedCompletedBatches.size} processed batches

++ generateReceiverStats() ++ generateBatchStatsTable() + } UIUtils.headerSparkPage("Streaming", content, parent, Some(5000)) } From cec6f92fb3a1cb27176f8237dfbde8e37a5ba2f4 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sat, 11 Apr 2015 23:36:58 +0800 Subject: [PATCH 2/2] Add missing 'synchronized' in StreamingJobProgressListener --- .../streaming/ui/StreamingJobProgressListener.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 84f80e638f638..be1e8686cf9fa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -149,7 +149,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) }.toMap } - def lastReceivedBatchRecords: Map[Int, Long] = { + def lastReceivedBatchRecords: Map[Int, Long] = synchronized { val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo) lastReceivedBlockInfoOption.map { lastReceivedBlockInfo => (0 until numReceivers).map { receiverId => @@ -160,19 +160,19 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) } } - def receiverInfo(receiverId: Int): Option[ReceiverInfo] = { + def receiverInfo(receiverId: Int): Option[ReceiverInfo] = synchronized { receiverInfos.get(receiverId) } - def lastCompletedBatch: Option[BatchInfo] = { + def lastCompletedBatch: Option[BatchInfo] = synchronized { completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption } - def lastReceivedBatch: Option[BatchInfo] = { + def lastReceivedBatch: Option[BatchInfo] = synchronized { retainedBatches.lastOption } - private def retainedBatches: Seq[BatchInfo] = synchronized { + private def retainedBatches: Seq[BatchInfo] = { (waitingBatchInfos.values.toSeq ++ runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering) }