From 9c5fa648cde2694b212b46a1faebeecc57982d7b Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 21 Feb 2019 13:22:44 +0900
Subject: [PATCH 1/4] [SPARK-26949][SS] Prevent 'purge' to remove needed batch files in CompactibleFileStreamLog

---
 .../streaming/CompactibleFileStreamLog.scala | 10 ++++++++
 .../CompactibleFileStreamLogSuite.scala      | 23 +++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 77bc0ba5548dd..2fa188a33e411 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -163,6 +163,16 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     batchAdded
   }
 
+  /**
+   * CompactibleFileStreamLog maintains logs by itself, and manual purging might break internal
+   * state, specifically which latest compaction batch is purged.
+   *
+   * To simplify the situation, this method just throws UnsupportedOperationException regardless
+   * of given parameter, and let CompactibleFileStreamLog handles purging by itself.
+   */
+  override def purge(thresholdBatchId: Long): Unit = throw new UnsupportedOperationException(
+    s"'purge' might break internal state of CompactibleFileStreamLog hence not supported")
+
   /**
    * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the
    * corresponding `batchId` file. It will delete expired files as well if enabled.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index ec961a9ecb592..76a99c26d8952 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -232,6 +232,29 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     })
   }
 
+  test("prevent removing metadata files via method purge") {
+    withFakeCompactibleFileStreamLog(
+      fileCleanupDelayMs = 10000,
+      defaultCompactInterval = 2,
+      defaultMinBatchesToRetain = 3,
+      compactibleLog => {
+        // compaction batches: 1
+        compactibleLog.add(0, Array("some_path_0"))
+        compactibleLog.add(1, Array("some_path_1"))
+        compactibleLog.add(2, Array("some_path_2"))
+
+        val exc = intercept[UnsupportedOperationException] {
+          compactibleLog.purge(2)
+        }
+        assert(exc.getMessage.contains("breaks internal state of CompactibleFileStreamLog"))
+
+        // Below line would fail with IllegalStateException if we don't prevent purge:
+        // - purge(2) would delete batch 0 and 1 which batch 1 is compaction batch
+        // - allFiles() would read batch 1 (latest compaction) and 2 which batch 1 is deleted
+        compactibleLog.allFiles()
+      })
+  }
+
   private def withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs: Long,
       defaultCompactInterval: Int,

From a51564f1ad08690ee8971ef12d938d479ed52f77 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 21 Feb 2019 18:09:21 +0900
Subject: [PATCH 2/4] Fix a silly mistake

---
 .../sql/execution/streaming/CompactibleFileStreamLogSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index 76a99c26d8952..d998ac82c6e1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -246,7 +246,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
         val exc = intercept[UnsupportedOperationException] {
           compactibleLog.purge(2)
         }
-        assert(exc.getMessage.contains("breaks internal state of CompactibleFileStreamLog"))
+        assert(exc.getMessage.contains("break internal state of CompactibleFileStreamLog"))
 
         // Below line would fail with IllegalStateException if we don't prevent purge:
         // - purge(2) would delete batch 0 and 1 which batch 1 is compaction batch

From ba62794bae0735762d0b4a2b4cd577c19383c6b3 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Wed, 12 Jun 2019 17:43:11 +0900
Subject: [PATCH 3/4] Refine sentence on exception

---
 .../sql/execution/streaming/CompactibleFileStreamLog.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 2fa188a33e411..16f279cd49e61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -171,7 +171,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
    * of given parameter, and let CompactibleFileStreamLog handles purging by itself.
    */
   override def purge(thresholdBatchId: Long): Unit = throw new UnsupportedOperationException(
-    s"'purge' might break internal state of CompactibleFileStreamLog hence not supported")
+    s"Cannot purge as it might break internal state.")
 
   /**
    * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the

From f8adab32b0eec1ae96f124a563f4393bf3db6659 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Wed, 12 Jun 2019 21:31:17 +0900
Subject: [PATCH 4/4] Fix UT

---
 .../sql/execution/streaming/CompactibleFileStreamLogSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index d998ac82c6e1e..71dc3776bcaf6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -246,7 +246,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
         val exc = intercept[UnsupportedOperationException] {
           compactibleLog.purge(2)
         }
-        assert(exc.getMessage.contains("break internal state of CompactibleFileStreamLog"))
+        assert(exc.getMessage.contains("Cannot purge as it might break internal state"))
 
         // Below line would fail with IllegalStateException if we don't prevent purge:
         // - purge(2) would delete batch 0 and 1 which batch 1 is compaction batch
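
To make the intent of the series easier to see outside the diff context, here is a minimal, self-contained sketch of the pattern the patch enforces. The names `SimpleMetadataLog`, `CompactibleLog`, and `PurgeSketch` are hypothetical stand-ins, not the real Spark `HDFSMetadataLog`/`CompactibleFileStreamLog` API; only the idea is taken from the patch: the compactible log overrides the inherited `purge` to throw `UnsupportedOperationException`, so external callers cannot delete batch files that its own compaction and `allFiles()`-style reads still depend on.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a generic metadata log
// (NOT the real Spark HDFSMetadataLog).
class SimpleMetadataLog {
  protected val batches = mutable.Map.empty[Long, Array[String]]

  def add(batchId: Long, files: Array[String]): Boolean = {
    batches.put(batchId, files)
    true
  }

  // Generic purge: drops every batch strictly older than the threshold.
  def purge(thresholdBatchId: Long): Unit = {
    batches.keys.filter(_ < thresholdBatchId).toList.foreach(batches.remove)
  }
}

// Hypothetical stand-in for the compactible log in the patch.
class CompactibleLog extends SimpleMetadataLog {
  // Mirrors the patch: an external purge is rejected because it could delete
  // the latest compaction batch that later reads still need; cleanup is left
  // to the log's own compaction logic.
  override def purge(thresholdBatchId: Long): Unit = {
    throw new UnsupportedOperationException(
      "Cannot purge as it might break internal state.")
  }
}

object PurgeSketch {
  def main(args: Array[String]): Unit = {
    val log = new CompactibleLog
    log.add(0, Array("some_path_0"))
    log.add(1, Array("some_path_1"))
    try {
      log.purge(2) // would silently drop batches 0 and 1 on the generic log
    } catch {
      case e: UnsupportedOperationException => println(s"rejected: ${e.getMessage}")
    }
  }
}
```

The design choice mirrored here is to fail loudly rather than let an external caller guess which batches are safe to drop; expired files are removed by the compaction path itself, as the patched `compact` documentation and the `fileCleanupDelayMs` parameter in the test helper indicate.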