From 8cc5e835b209d5796b044978ec4221ee22a8b9d2 Mon Sep 17 00:00:00 2001 From: frreiss Date: Thu, 8 Sep 2016 12:59:15 -0700 Subject: [PATCH 1/2] Added purge() call to scheduler --- .../apache/spark/sql/execution/streaming/MetadataLog.scala | 1 + .../spark/sql/execution/streaming/StreamExecution.scala | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala index 78d6be17df05a..8de77025b1736 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala @@ -24,6 +24,7 @@ package org.apache.spark.sql.execution.streaming * - Allow the user to query the latest batch id. * - Allow the user to query the metadata object of a specified batch id. * - Allow the user to query metadata objects in a range of batch ids. + * - Allow the user to remove obsolete metadata */ trait MetadataLog[T] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 5e1e5eeb50936..1c3c342187208 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -290,6 +290,12 @@ class StreamExecution( assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId.") + + // Now that we have logged the new batch, no further processing will happen for + // the previous batch, and it is safe to discard the old metadata.
+ // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in + // flight at the same time), this cleanup logic will need to change. + offsetLog.purge(currentBatchId - 1) } else { awaitBatchLock.lock() try { From d71366d958334ebbc81e45c7f469bad2a68d0a2d Mon Sep 17 00:00:00 2001 From: frreiss Date: Fri, 9 Sep 2016 21:23:58 -0700 Subject: [PATCH 2/2] Added test case and corrected off-by-one error. --- .../execution/streaming/StreamExecution.scala | 2 +- .../sql/streaming/StreamingQuerySuite.scala | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 1c3c342187208..c7f8cef409462 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -295,7 +295,7 @@ class StreamExecution( // the previous batch, and it is safe to discard the old metadata. // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in // flight at the same time), this cleanup logic will need to change.
- offsetLog.purge(currentBatchId - 1) + offsetLog.purge(currentBatchId) } else { awaitBatchLock.lock() try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 9d58315c20031..879f993cf06f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -125,6 +125,32 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } + testQuietly("StreamExecution metadata garbage collection") { + val inputData = MemoryStream[Int] + val mapped = inputData.toDS().map(6 / _) + + // Run a few batches through the application + testStream(mapped)( + AddData(inputData, 1, 2), + CheckAnswer(6, 3), + AddData(inputData, 1, 2), + CheckAnswer(6, 3, 6, 3), + AddData(inputData, 4, 6), + CheckAnswer(6, 3, 6, 3, 1, 1), + + // Three batches have run, but only one set of metadata should be present + AssertOnQuery( + q => { + val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) + val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) + val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475 + assert(toTest.size == 1 && toTest.head == "2") + true + } + ) + ) + } + /** * A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`. *