From ff4c4496a485c91f113ae8662a31261dd6f41987 Mon Sep 17 00:00:00 2001
From: zuotingbing
Date: Tue, 19 Sep 2017 19:23:49 +0800
Subject: [PATCH 1/4] [SPARK-22058][CORE] The BufferedInputStream will not be closed if an exception occurs

---
 .../spark/scheduler/EventLoggingListener.scala | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 00ab2a393e17f..e2474f2bac023 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -347,15 +347,14 @@ private[spark] object EventLoggingListener extends Logging {
 
   def openEventLog(log: Path, fs: FileSystem): InputStream = {
     val in = new BufferedInputStream(fs.open(log))
-    // Compression codec is encoded as an extension, e.g. app_123.lzf
-    // Since we sanitize the app ID to not include periods, it is safe to split on it
-    val logName = log.getName.stripSuffix(IN_PROGRESS)
-    val codecName: Option[String] = logName.split("\\.").tail.lastOption
-    val codec = codecName.map { c =>
-      codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
-    }
-
     try {
+      // Compression codec is encoded as an extension, e.g. app_123.lzf
+      // Since we sanitize the app ID to not include periods, it is safe to split on it
+      val logName = log.getName.stripSuffix(IN_PROGRESS)
+      val codecName: Option[String] = logName.split("\\.").tail.lastOption
+      val codec = codecName.map { c =>
+        codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
+      }
       codec.map(_.compressedInputStream(in)).getOrElse(in)
     } catch {
       case e: Exception =>

From 824856b9f2080cbc2e99ac75a9592f7386322e0b Mon Sep 17 00:00:00 2001
From: zuotingbing
Date: Tue, 19 Sep 2017 19:33:32 +0800
Subject: [PATCH 2/4] fix code review

---
 .../apache/spark/scheduler/EventLoggingListener.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index e2474f2bac023..06051bc240bb6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -347,11 +347,12 @@ private[spark] object EventLoggingListener extends Logging {
 
   def openEventLog(log: Path, fs: FileSystem): InputStream = {
     val in = new BufferedInputStream(fs.open(log))
+    // Compression codec is encoded as an extension, e.g. app_123.lzf
+    // Since we sanitize the app ID to not include periods, it is safe to split on it
+    val logName = log.getName.stripSuffix(IN_PROGRESS)
+    val codecName: Option[String] = logName.split("\\.").tail.lastOption
+    
     try {
-      // Compression codec is encoded as an extension, e.g. app_123.lzf
-      // Since we sanitize the app ID to not include periods, it is safe to split on it
-      val logName = log.getName.stripSuffix(IN_PROGRESS)
-      val codecName: Option[String] = logName.split("\\.").tail.lastOption
       val codec = codecName.map { c =>
         codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
       }

From e3f8e0d0c45c89f0da31a5faa1c4d682ad257912 Mon Sep 17 00:00:00 2001
From: zuotingbing
Date: Wed, 20 Sep 2017 16:37:16 +0800
Subject: [PATCH 3/4] handle Throwable

---
 .../scala/org/apache/spark/scheduler/EventLoggingListener.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 06051bc240bb6..6bb950088e796 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -358,7 +358,7 @@ private[spark] object EventLoggingListener extends Logging {
       }
       codec.map(_.compressedInputStream(in)).getOrElse(in)
     } catch {
-      case e: Exception =>
+      case e: Throwable =>
         in.close()
         throw e
     }

From 2e5f21a1a89bbf69431f918d3043ff6b58420dd8 Mon Sep 17 00:00:00 2001
From: zuotingbing
Date: Fri, 22 Sep 2017 16:19:43 +0800
Subject: [PATCH 4/4] fix Scala style checks

---
 .../scala/org/apache/spark/scheduler/EventLoggingListener.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 6bb950088e796..9dafa0b7646bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -351,7 +351,7 @@ private[spark] object EventLoggingListener extends Logging {
     // Since we sanitize the app ID to not include periods, it is safe to split on it
     val logName = log.getName.stripSuffix(IN_PROGRESS)
     val codecName: Option[String] = logName.split("\\.").tail.lastOption
-    
+
     try {
       val codec = codecName.map { c =>
         codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
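
For reference, here is a sketch of how openEventLog reads once all four patches above are applied. This is not the verbatim upstream file: the wrapping object name, the IN_PROGRESS suffix value and the codecMap field are scaffolding assumed here only to make the sketch self-contained; the method body follows the diffs above. The point of the series is that codec creation and stream wrapping can fail, so the catch handles Throwable and closes the BufferedInputStream before rethrowing instead of leaking the open file handle.

package org.apache.spark.scheduler

import java.io.{BufferedInputStream, InputStream}

import scala.collection.mutable

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec

// Stand-in for the relevant pieces of EventLoggingListener; the real object has
// many more members. IN_PROGRESS and codecMap mirror its fields (assumed values).
private[spark] object OpenEventLogSketch {

  val IN_PROGRESS = ".inprogress"
  private val codecMap = new mutable.HashMap[String, CompressionCodec]

  def openEventLog(log: Path, fs: FileSystem): InputStream = {
    val in = new BufferedInputStream(fs.open(log))
    // Compression codec is encoded as an extension, e.g. app_123.lzf
    // Since we sanitize the app ID to not include periods, it is safe to split on it
    val logName = log.getName.stripSuffix(IN_PROGRESS)
    val codecName: Option[String] = logName.split("\\.").tail.lastOption

    try {
      // Both codec creation and stream wrapping may throw; doing them inside the
      // try lets the catch below release the underlying file stream.
      val codec = codecName.map { c =>
        codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
      }
      codec.map(_.compressedInputStream(in)).getOrElse(in)
    } catch {
      case e: Throwable =>
        in.close()
        throw e
    }
  }
}

The switch from Exception to Throwable in patch 3 means that even non-Exception failures during codec lookup or wrapping release the file handle before the error propagates to the caller.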